Skip to main content

git_paw/broker/
mod.rs

1//! HTTP broker for agent coordination.
2//!
3//! Provides an HTTP server that agents use to publish messages, poll for
4//! incoming messages, and report status. The broker runs on a background
5//! tokio runtime and is managed through [`BrokerHandle`].
6//!
7//! # Lock discipline
8//!
9//! [`BrokerState`] wraps its inner state in an `RwLock`. **Guards MUST NOT be
10//! held across `.await` boundaries.** The `clippy::await_holding_lock` lint is
11//! enabled project-wide to catch violations at compile time. Use the
12//! `read()` / `write()` methods to obtain guards inside synchronous closures
13//! only.
14
15pub mod conflict;
16pub mod delivery;
17pub mod learnings;
18pub mod messages;
19pub mod publish;
20pub mod server;
21pub mod watcher;
22
23use std::collections::HashMap;
24use std::path::PathBuf;
25use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
26use std::sync::{Arc, RwLock};
27use std::thread::JoinHandle;
28use std::time::Instant;
29
30use serde::Serialize;
31
32use crate::config::{BrokerConfig, ConflictConfig};
33pub use messages::BrokerMessage;
34
/// A worktree the broker observes for git-status changes.
///
/// One [`watcher::watch_worktree`] task is spawned for every target handed
/// to the broker at start-up.
#[derive(Clone, Debug)]
pub struct WatchTarget {
    /// Slugified branch name identifying the agent that owns the worktree.
    pub agent_id: String,
    /// Name of the CLI running in the agent's pane, such as `"claude"`.
    pub cli: String,
    /// Absolute filesystem path of the worktree root.
    pub worktree_path: PathBuf,
}
47
48/// Record of a known agent's latest state.
49#[derive(Debug, Clone)]
50pub struct AgentRecord {
51    /// Agent identifier (slugified branch name).
52    pub agent_id: String,
53    /// Last reported status label.
54    pub status: String,
55    /// When the agent last published a message.
56    pub last_seen: Instant,
57    /// The most recent message from this agent.
58    pub last_message: Option<BrokerMessage>,
59}
60
61/// JSON-serializable snapshot of an agent's status for the `/status` endpoint
62/// and the dashboard TUI.
63#[derive(Debug, Clone, Serialize)]
64pub struct AgentStatusEntry {
65    /// Agent identifier (slugified branch name).
66    pub agent_id: String,
67    /// CLI name running in this agent's pane (e.g. "claude").
68    pub cli: String,
69    /// Current status label (e.g. "working", "done", "blocked").
70    pub status: String,
71    /// Seconds since the agent was last seen.
72    pub last_seen_seconds: u64,
73    /// One-line summary from the last message.
74    pub summary: String,
75    /// When the agent was last seen (for age calculations in the dashboard).
76    #[serde(skip)]
77    pub last_seen: Instant,
78    /// Most recently published `payload.phase` on an `agent.status`, if any.
79    /// The dashboard prefers this label over the message-type-derived
80    /// `status_label()` when rendering the agent's row.
81    #[serde(default, skip_serializing_if = "Option::is_none")]
82    pub phase: Option<String>,
83}
84
85/// Mutable broker state protected by an `RwLock`.
86#[derive(Debug)]
87pub struct BrokerStateInner {
88    /// Known agents keyed by agent ID.
89    pub agents: HashMap<String, AgentRecord>,
90    /// CLI label per agent, populated from [`WatchTarget`] at broker start.
91    pub agent_clis: HashMap<String, String>,
92    /// Per-agent message inboxes: `(sequence_number, message)`.
93    pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
94    /// Append-only message log for disk flush.
95    pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
96}
97
98/// Shared broker state.
99///
100/// Wraps [`BrokerStateInner`] in an `RwLock` for concurrent read access.
101/// The sequence counter is a standalone [`AtomicU64`] outside the lock so
102/// that sequence numbers can be allocated without coupling to the write
103/// lock.
104#[derive(Debug)]
105pub struct BrokerState {
106    /// Protected mutable state.
107    inner: RwLock<BrokerStateInner>,
108    /// Global sequence counter (starts at 0; first assigned value is 1).
109    next_seq: AtomicU64,
110    /// Optional path for periodic log flush to disk.
111    pub log_path: Option<PathBuf>,
112    /// Wall-clock instant the broker state was created; used for uptime reporting.
113    started_at: Instant,
114    /// Optional learnings aggregator. Populated when supervisor + learnings
115    /// mode is active; the publish path forwards every observed message.
116    pub learnings: Option<learnings::SharedLearnings>,
117}
118
119impl BrokerState {
120    /// Creates a new empty broker state.
121    pub fn new(log_path: Option<PathBuf>) -> Self {
122        Self {
123            inner: RwLock::new(BrokerStateInner {
124                agents: HashMap::new(),
125                agent_clis: HashMap::new(),
126                queues: HashMap::new(),
127                message_log: Vec::new(),
128            }),
129            next_seq: AtomicU64::new(0),
130            log_path,
131            started_at: Instant::now(),
132            learnings: None,
133        }
134    }
135
136    /// Attaches a learnings aggregator. Replaces any previously attached
137    /// instance. Must be called before [`start_broker`] so the publish path
138    /// observes every message from the first one.
139    pub fn attach_learnings(&mut self, aggregator: learnings::SharedLearnings) {
140        self.learnings = Some(aggregator);
141    }
142
143    /// Acquires a read lock on the inner state.
144    ///
145    /// # Panics
146    ///
147    /// Panics if the lock is poisoned (a thread panicked while holding it).
148    pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
149        self.inner.read().expect("broker state lock poisoned")
150    }
151
152    /// Acquires a write lock on the inner state.
153    ///
154    /// # Panics
155    ///
156    /// Panics if the lock is poisoned (a thread panicked while holding it).
157    pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
158        self.inner.write().expect("broker state lock poisoned")
159    }
160
161    /// Atomically allocates the next sequence number (starting at 1).
162    pub fn next_seq(&self) -> u64 {
163        self.next_seq.fetch_add(1, Ordering::Relaxed) + 1
164    }
165
166    /// Returns the number of seconds since the broker was started.
167    ///
168    /// Used by the HTTP `/status` handler to report uptime.
169    pub fn uptime_seconds(&self) -> u64 {
170        self.started_at.elapsed().as_secs()
171    }
172}
173
174/// Errors specific to broker operations.
175#[derive(Debug, thiserror::Error)]
176pub enum BrokerError {
177    /// The configured port is already in use by a non-broker process.
178    #[error(
179        "port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
180    )]
181    PortInUse {
182        /// The port that was occupied.
183        port: u16,
184        /// The underlying I/O error.
185        source: std::io::Error,
186    },
187
188    /// A probe to an existing listener on the port timed out.
189    #[error("broker probe timed out on port {port} — check for stuck processes on this port")]
190    ProbeTimeout {
191        /// The port that timed out.
192        port: u16,
193    },
194
195    /// Binding to the address failed.
196    #[error("failed to bind broker: {0}")]
197    BindFailed(std::io::Error),
198
199    /// Creating the tokio runtime failed.
200    #[error("failed to create broker runtime: {0}")]
201    RuntimeFailed(std::io::Error),
202}
203
204/// Handle to a running broker, including the optional flush thread.
205///
206/// When dropped, signals the flush thread to stop and joins it, then
207/// shuts down the tokio runtime. If the handle is in "reattached" mode
208/// (connected to an existing broker), dropping it is a no-op.
209pub struct BrokerHandle {
210    /// Shared broker state.
211    pub state: Arc<BrokerState>,
212    /// The tokio runtime powering the broker server.
213    /// `None` when reattached to an existing broker.
214    runtime: Option<tokio::runtime::Runtime>,
215    /// Sends a shutdown signal to the server task.
216    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
217    /// Broadcasts the watcher shutdown signal to all watcher tasks.
218    watcher_shutdown: Option<tokio::sync::watch::Sender<bool>>,
219    /// The URL the broker is listening on.
220    pub url: String,
221    /// Flag to signal the flush thread to exit.
222    stop_flag: Arc<AtomicBool>,
223    /// Flush thread join handle (present only when `log_path` is `Some`).
224    flush_thread: Option<JoinHandle<()>>,
225    /// Periodic flush thread for the learnings aggregator (present only
226    /// when `state.learnings.is_some()`).
227    learnings_thread: Option<JoinHandle<()>>,
228}
229
230impl BrokerHandle {
231    /// Creates a handle that reattaches to an existing broker (no owned runtime).
232    fn reattached(url: String, state: Arc<BrokerState>) -> Self {
233        Self {
234            state,
235            runtime: None,
236            shutdown_tx: None,
237            watcher_shutdown: None,
238            url,
239            stop_flag: Arc::new(AtomicBool::new(false)),
240            flush_thread: None,
241            learnings_thread: None,
242        }
243    }
244}
245
246impl Drop for BrokerHandle {
247    fn drop(&mut self) {
248        // 1. Signal both flush threads to stop and join the log flush thread.
249        self.stop_flag.store(true, Ordering::Release);
250        if let Some(handle) = self.flush_thread.take() {
251            let _ = handle.join();
252        }
253        // 2. Join the learnings flush thread — it performs the final
254        //    shutdown flush before returning.
255        if let Some(handle) = self.learnings_thread.take() {
256            let _ = handle.join();
257        }
258        // 3. Signal watcher tasks to stop.
259        if let Some(tx) = self.watcher_shutdown.take() {
260            let _ = tx.send(true);
261        }
262        // 4. Signal shutdown to the server task.
263        if let Some(tx) = self.shutdown_tx.take() {
264            let _ = tx.send(());
265        }
266        // 5. Give in-flight requests up to 2 seconds to drain, then drop runtime.
267        if let Some(rt) = self.runtime.take() {
268            rt.shutdown_timeout(std::time::Duration::from_secs(2));
269        }
270    }
271}
272
/// Outcome of probing whatever is (or is not) listening on the broker port.
///
/// A fieldless value type, so it is `Copy`; callers can match on it or
/// compare it without borrowing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProbeResult {
    /// Nothing is listening, so it is safe to bind.
    NoListener,
    /// A git-paw broker is already running there.
    LiveBroker,
    /// Some other server is using the port.
    ForeignServer,
    /// The probe did not get a usable answer in time.
    Timeout,
}
285
286/// Probes an existing listener at the given URL to determine what is running.
287///
288/// Uses a lightweight `TcpStream` with a manual HTTP/1.1 GET to `/status`
289/// to avoid pulling in a full HTTP client dependency.
290/// Probes a URL to determine what broker (if any) is running there.
291///
292/// Public entry point for callers that need to inspect broker status without
293/// starting a new server (e.g. the `status` subcommand).
294pub fn probe_broker(url: &str) -> ProbeResult {
295    probe_existing_broker(url)
296}
297
298fn probe_existing_broker(url: &str) -> ProbeResult {
299    use std::io::{Read, Write};
300    use std::net::TcpStream;
301    use std::time::Duration;
302
303    // Parse host:port from URL like "http://127.0.0.1:9119"
304    let addr = url.strip_prefix("http://").unwrap_or(url);
305
306    let socket_addr = if let Ok(a) = addr.parse() {
307        a
308    } else {
309        use std::net::ToSocketAddrs;
310        match addr.to_socket_addrs() {
311            Ok(mut addrs) => match addrs.next() {
312                Some(a) => a,
313                None => return ProbeResult::NoListener,
314            },
315            Err(_) => return ProbeResult::NoListener,
316        }
317    };
318
319    let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, Duration::from_millis(500))
320    else {
321        return ProbeResult::NoListener;
322    };
323
324    stream
325        .set_read_timeout(Some(Duration::from_millis(500)))
326        .ok();
327    stream
328        .set_write_timeout(Some(Duration::from_millis(500)))
329        .ok();
330
331    let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
332    if stream.write_all(request.as_bytes()).is_err() {
333        return ProbeResult::Timeout;
334    }
335
336    let mut response = String::new();
337    if stream.read_to_string(&mut response).is_err() && response.is_empty() {
338        return ProbeResult::Timeout;
339    }
340
341    if response.contains("\"git_paw\":true") || response.contains("\"git_paw\": true") {
342        ProbeResult::LiveBroker
343    } else if response.starts_with("HTTP/") {
344        ProbeResult::ForeignServer
345    } else {
346        ProbeResult::Timeout
347    }
348}
349
350/// Starts the HTTP broker server.
351///
352/// Probes the configured port first:
353/// - If a live git-paw broker is found, returns a reattached handle.
354/// - If a foreign server occupies the port, returns [`BrokerError::PortInUse`].
355/// - If the probe times out, returns [`BrokerError::ProbeTimeout`].
356/// - If nothing is listening, binds and starts the server.
357///
358/// Also spawns the background flush thread if `state.log_path` is set.
359///
360/// Calls [`start_broker`] without a conflict-detector. Equivalent to
361/// `start_broker_with(config, state, watch_targets, None)`.
362pub fn start_broker(
363    config: &BrokerConfig,
364    state: BrokerState,
365    watch_targets: Vec<WatchTarget>,
366) -> Result<BrokerHandle, BrokerError> {
367    start_broker_with(config, state, watch_targets, None, 60)
368}
369
370/// Starts the HTTP broker server with optional conflict-detector wiring
371/// and a configurable learnings-flush interval.
372///
373/// When `conflict` is `Some`, a background tokio task running the
374/// detector loop is spawned alongside the watcher tasks. The detector
375/// shuts down with the rest of the broker when [`BrokerHandle`] is
376/// dropped.
377///
378/// `learnings_flush_interval_seconds` controls how often the learnings
379/// aggregator flushes to `.git-paw/session-learnings.md` when
380/// `state.learnings` is `Some`. Default for [`start_broker`] is 60s;
381/// tests override to drive flush behaviour without waiting on real time.
382pub fn start_broker_with(
383    config: &BrokerConfig,
384    state: BrokerState,
385    watch_targets: Vec<WatchTarget>,
386    conflict: Option<ConflictConfig>,
387    learnings_flush_interval_seconds: u64,
388) -> Result<BrokerHandle, BrokerError> {
389    let url = config.url();
390    let state = Arc::new(state);
391    let stop_flag = Arc::new(AtomicBool::new(false));
392
393    match probe_existing_broker(&url) {
394        ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
395        ProbeResult::ForeignServer => {
396            return Err(BrokerError::PortInUse {
397                port: config.port,
398                source: std::io::Error::new(
399                    std::io::ErrorKind::AddrInUse,
400                    "port occupied by non-broker process",
401                ),
402            });
403        }
404        ProbeResult::Timeout => {
405            return Err(BrokerError::ProbeTimeout { port: config.port });
406        }
407        ProbeResult::NoListener => {}
408    }
409
410    // Spawn flush thread if log_path is configured.
411    let flush_thread = if state.log_path.is_some() {
412        let s = Arc::clone(&state);
413        let f = Arc::clone(&stop_flag);
414        Some(std::thread::spawn(move || {
415            delivery::flush_loop(&s, &f);
416        }))
417    } else {
418        None
419    };
420
421    // Spawn learnings flush thread when an aggregator is attached. The
422    // thread performs a final `flush_at_shutdown` after the stop flag is
423    // raised so unresolved blocks and recovery cycles end up in the file.
424    let learnings_thread = if state.learnings.is_some() {
425        let s = Arc::clone(&state);
426        let f = Arc::clone(&stop_flag);
427        Some(std::thread::spawn(move || {
428            learnings_flush_loop(&s, &f, learnings_flush_interval_seconds);
429        }))
430    } else {
431        None
432    };
433
434    let runtime = tokio::runtime::Builder::new_multi_thread()
435        .enable_all()
436        .build()
437        .map_err(BrokerError::RuntimeFailed)?;
438
439    let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
440        |e: std::net::AddrParseError| {
441            BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
442        },
443    )?;
444
445    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
446
447    let router = server::router(Arc::clone(&state));
448
449    let listener = runtime.block_on(async {
450        let socket = tokio::net::TcpSocket::new_v4().map_err(BrokerError::BindFailed)?;
451        socket
452            .set_reuseaddr(true)
453            .map_err(BrokerError::BindFailed)?;
454        socket.bind(addr).map_err(BrokerError::BindFailed)?;
455        socket.listen(1024).map_err(BrokerError::BindFailed)
456    })?;
457
458    // Install SIGINT handler so the broker does not die on Ctrl+C.
459    // The dashboard process is responsible for user-facing Ctrl+C handling.
460    runtime.spawn(async {
461        let _ = tokio::signal::ctrl_c().await;
462    });
463
464    runtime.spawn(async move {
465        axum::serve(listener, router)
466            .with_graceful_shutdown(async {
467                let _ = shutdown_rx.await;
468            })
469            .await
470            .ok();
471    });
472
473    // Pre-populate the CLI label AND the inbox queue for every watched
474    // agent so (a) the dashboard shows the CLI before any status messages
475    // arrive, and (b) peer `agent.artifact` broadcasts — which only target
476    // already-existing queues — actually reach the watched agent even
477    // before it has published anything itself.
478    {
479        let mut inner = state.write();
480        for target in &watch_targets {
481            inner
482                .agent_clis
483                .insert(target.agent_id.clone(), target.cli.clone());
484            inner.queues.entry(target.agent_id.clone()).or_default();
485        }
486    }
487
488    // Spawn one watcher task per target. All watchers share a single
489    // `tokio::sync::watch` channel; flipping it to `true` on drop signals
490    // every watcher to exit on its next tick. The conflict detector
491    // shares the same shutdown channel so it stops in lockstep.
492    let (watcher_tx, watcher_rx) = tokio::sync::watch::channel(false);
493    for target in watch_targets {
494        let s = Arc::clone(&state);
495        let rx = watcher_rx.clone();
496        runtime.spawn(watcher::watch_worktree(s, target, rx));
497    }
498    if let Some(conflict_cfg) = conflict {
499        let s = Arc::clone(&state);
500        let rx = watcher_rx.clone();
501        runtime.spawn(conflict::run_detector_loop(s, conflict_cfg, rx));
502    }
503
504    Ok(BrokerHandle {
505        state,
506        runtime: Some(runtime),
507        shutdown_tx: Some(shutdown_tx),
508        watcher_shutdown: Some(watcher_tx),
509        url,
510        stop_flag,
511        flush_thread,
512        learnings_thread,
513    })
514}
515
516/// Background loop driving the learnings aggregator's periodic flush.
517///
518/// Sleeps in small slices so it can react to the broker stop flag within
519/// ~100ms. When the stop flag is raised, it performs one final
520/// [`learnings::LearningsAggregator::flush_at_shutdown`] before exiting.
521fn learnings_flush_loop(
522    state: &Arc<BrokerState>,
523    stop: &Arc<AtomicBool>,
524    flush_interval_seconds: u64,
525) {
526    let Some(aggregator) = state.learnings.clone() else {
527        return;
528    };
529    let interval = std::time::Duration::from_secs(flush_interval_seconds.max(1));
530    let tick = std::time::Duration::from_millis(100);
531
532    loop {
533        let mut elapsed = std::time::Duration::ZERO;
534        while elapsed < interval {
535            if stop.load(Ordering::Acquire) {
536                if let Ok(mut agg) = aggregator.lock() {
537                    let _ = agg.flush_at_shutdown();
538                }
539                return;
540            }
541            std::thread::sleep(tick);
542            elapsed += tick;
543        }
544        if let Ok(mut agg) = aggregator.lock() {
545            let _ = agg.flush();
546        }
547    }
548}
549
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a port that was free a moment ago.
    ///
    /// It binds port 0, reads the assigned port and releases it. This is far
    /// less collision-prone than a PID-derived port, though another process
    /// could still grab it in between.
    fn ephemeral_port() -> u16 {
        std::net::TcpListener::bind("127.0.0.1:0")
            .and_then(|l| l.local_addr())
            .expect("bind an ephemeral port")
            .port()
    }

    /// Loopback broker config on `port`.
    fn local_config(port: u16) -> BrokerConfig {
        BrokerConfig {
            enabled: true,
            port,
            bind: "127.0.0.1".to_string(),
        }
    }

    /// Fake server for one connection: reads the request, writes `reply`,
    /// then closes the connection. Returns its `http://` URL.
    fn serve_once(reply: &'static [u8]) -> String {
        use std::io::{Read, Write};
        let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind fake server");
        let addr = listener.local_addr().expect("fake server addr");
        std::thread::spawn(move || {
            if let Ok((mut stream, _)) = listener.accept() {
                let mut buf = [0u8; 1024];
                let _ = stream.read(&mut buf);
                let _ = stream.write_all(reply);
            }
        });
        format!("http://{addr}")
    }

    #[test]
    fn broker_state_new_is_empty() {
        let state = BrokerState::new(None);
        let inner = state.read();
        assert!(inner.agents.is_empty());
        assert!(inner.agent_clis.is_empty());
        assert!(inner.queues.is_empty());
        assert!(inner.message_log.is_empty());
    }

    #[test]
    fn next_seq_starts_at_one() {
        let state = BrokerState::new(None);
        assert_eq!(state.next_seq(), 1);
        assert_eq!(state.next_seq(), 2);
        assert_eq!(state.next_seq(), 3);
    }

    #[test]
    fn probe_no_listener() {
        let url = format!("http://127.0.0.1:{}", ephemeral_port());
        assert_eq!(probe_existing_broker(&url), ProbeResult::NoListener);
    }

    #[test]
    fn probe_detects_live_broker() {
        let url = serve_once(b"HTTP/1.1 200 OK\r\nConnection: close\r\n\r\n{\"git_paw\":true}");
        assert_eq!(probe_existing_broker(&url), ProbeResult::LiveBroker);
    }

    #[test]
    fn probe_detects_foreign_server() {
        let url = serve_once(b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n");
        assert_eq!(probe_existing_broker(&url), ProbeResult::ForeignServer);
    }

    #[test]
    fn reattached_handle_has_no_runtime() {
        let state = Arc::new(BrokerState::new(None));
        let h = BrokerHandle::reattached("http://127.0.0.1:9119".into(), state);
        assert!(h.runtime.is_none());
        assert!(h.shutdown_tx.is_none());
        assert!(h.watcher_shutdown.is_none());
        assert!(h.flush_thread.is_none());
        assert!(h.learnings_thread.is_none());
    }

    #[test]
    fn start_broker_on_free_port() {
        let config = local_config(ephemeral_port());
        let state = BrokerState::new(None);
        // If the port was grabbed in the meantime the test is inconclusive,
        // not a failure.
        if let Ok(h) = start_broker(&config, state, Vec::new()) {
            assert!(h.url.contains(&config.port.to_string()));
            drop(h);
        }
    }

    #[test]
    fn start_broker_no_log_path_no_flush_thread() {
        let config = local_config(ephemeral_port());
        let state = BrokerState::new(None);
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_none());
            drop(handle);
        }
    }

    #[test]
    fn start_broker_with_log_path_spawns_flush_thread() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("broker.log");
        let config = local_config(ephemeral_port());
        let state = BrokerState::new(Some(log_path));
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_some());
            drop(handle);
        }
    }
}