Skip to main content

git_paw/broker/
mod.rs

1//! HTTP broker for agent coordination.
2//!
3//! Provides an HTTP server that agents use to publish messages, poll for
4//! incoming messages, and report status. The broker runs on a background
5//! tokio runtime and is managed through [`BrokerHandle`].
6//!
7//! # Lock discipline
8//!
9//! [`BrokerState`] wraps its inner state in an `RwLock`. **Guards MUST NOT be
10//! held across `.await` boundaries.** The `clippy::await_holding_lock` lint is
11//! enabled project-wide to catch violations at compile time. Use the
12//! `read()` / `write()` methods to obtain guards inside synchronous closures
13//! only.
14
15pub mod delivery;
16pub mod messages;
17pub mod publish;
18pub mod server;
19pub mod watcher;
20
21use std::collections::HashMap;
22use std::path::PathBuf;
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, RwLock};
25use std::thread::JoinHandle;
26use std::time::Instant;
27
28use serde::Serialize;
29
30use crate::config::BrokerConfig;
31pub use messages::BrokerMessage;
32
/// Worktree to watch for git-status changes.
///
/// The broker spawns one [`watcher::watch_worktree`] task per target. At
/// broker start each target also seeds the shared state: `cli` is recorded
/// under `agent_id`, and an empty inbox queue is created for `agent_id`.
#[derive(Debug, Clone)]
pub struct WatchTarget {
    /// Agent identifier (slugified branch name) that owns this worktree.
    ///
    /// Used as the key into the broker's per-agent maps.
    pub agent_id: String,
    /// CLI name running in this agent's pane (e.g. `"claude"`).
    pub cli: String,
    /// Absolute path to the worktree root.
    pub worktree_path: PathBuf,
}
45
/// Record of a known agent's latest state.
///
/// Stored in [`BrokerStateInner::agents`], keyed by agent ID.
#[derive(Debug, Clone)]
pub struct AgentRecord {
    /// Agent identifier (slugified branch name).
    pub agent_id: String,
    /// Last reported status label.
    pub status: String,
    /// When the agent last published a message (monotonic clock).
    pub last_seen: Instant,
    /// The most recent message from this agent, if one has been recorded.
    pub last_message: Option<BrokerMessage>,
}
58
/// JSON-serializable snapshot of an agent's status for the `/status` endpoint
/// and the dashboard TUI.
///
/// Every field except `last_seen` is serialized; the serialized form carries
/// the agent's age as `last_seen_seconds` instead.
#[derive(Debug, Clone, Serialize)]
pub struct AgentStatusEntry {
    /// Agent identifier (slugified branch name).
    pub agent_id: String,
    /// CLI name running in this agent's pane (e.g. "claude").
    pub cli: String,
    /// Current status label (e.g. "working", "done", "blocked").
    pub status: String,
    /// Seconds since the agent was last seen.
    pub last_seen_seconds: u64,
    /// One-line summary from the last message.
    pub summary: String,
    /// When the agent was last seen (for age calculations in the dashboard).
    ///
    /// Excluded from serialization via `#[serde(skip)]`; only in-process
    /// consumers can read it.
    #[serde(skip)]
    pub last_seen: Instant,
}
77
/// Mutable broker state protected by an `RwLock`.
///
/// Never shared directly: it lives inside [`BrokerState`] and is reached
/// through the guards returned by [`BrokerState::read`] and
/// [`BrokerState::write`].
#[derive(Debug)]
pub struct BrokerStateInner {
    /// Known agents keyed by agent ID.
    pub agents: HashMap<String, AgentRecord>,
    /// CLI label per agent, populated from [`WatchTarget`] at broker start.
    pub agent_clis: HashMap<String, String>,
    /// Per-agent message inboxes: `(sequence_number, message)`.
    ///
    /// An empty inbox is created for every [`WatchTarget`] at broker start.
    pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
    /// Append-only message log for disk flush:
    /// `(sequence_number, timestamp, message)`.
    pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
}
90
/// Shared broker state.
///
/// Wraps [`BrokerStateInner`] in an `RwLock` for concurrent read access.
/// The sequence counter is a standalone [`AtomicU64`] outside the lock so
/// that sequence numbers can be allocated without coupling to the write
/// lock.
///
/// Guards obtained from this type must not be held across `.await` points.
#[derive(Debug)]
pub struct BrokerState {
    /// Protected mutable state.
    inner: RwLock<BrokerStateInner>,
    /// Global sequence counter (starts at 0; first assigned value is 1).
    next_seq: AtomicU64,
    /// Optional path for periodic log flush to disk.
    ///
    /// When `Some`, starting the broker also spawns the background flush
    /// thread.
    pub log_path: Option<PathBuf>,
    /// Monotonic instant at which the broker state was created; used for
    /// uptime reporting. (`Instant` is not a wall-clock time.)
    started_at: Instant,
}
108
109impl BrokerState {
110    /// Creates a new empty broker state.
111    pub fn new(log_path: Option<PathBuf>) -> Self {
112        Self {
113            inner: RwLock::new(BrokerStateInner {
114                agents: HashMap::new(),
115                agent_clis: HashMap::new(),
116                queues: HashMap::new(),
117                message_log: Vec::new(),
118            }),
119            next_seq: AtomicU64::new(0),
120            log_path,
121            started_at: Instant::now(),
122        }
123    }
124
125    /// Acquires a read lock on the inner state.
126    ///
127    /// # Panics
128    ///
129    /// Panics if the lock is poisoned (a thread panicked while holding it).
130    pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
131        self.inner.read().expect("broker state lock poisoned")
132    }
133
134    /// Acquires a write lock on the inner state.
135    ///
136    /// # Panics
137    ///
138    /// Panics if the lock is poisoned (a thread panicked while holding it).
139    pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
140        self.inner.write().expect("broker state lock poisoned")
141    }
142
143    /// Atomically allocates the next sequence number (starting at 1).
144    pub fn next_seq(&self) -> u64 {
145        self.next_seq.fetch_add(1, Ordering::Relaxed) + 1
146    }
147
148    /// Returns the number of seconds since the broker was started.
149    ///
150    /// Used by the HTTP `/status` handler to report uptime.
151    pub fn uptime_seconds(&self) -> u64 {
152        self.started_at.elapsed().as_secs()
153    }
154}
155
/// Errors specific to broker operations.
///
/// All variants are produced by [`start_broker`].
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
    /// The configured port is already in use by a non-broker process.
    ///
    /// Returned when the pre-bind probe gets an HTTP response that does not
    /// identify itself as a git-paw broker.
    #[error(
        "port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
    )]
    PortInUse {
        /// The port that was occupied.
        port: u16,
        /// The underlying I/O error.
        source: std::io::Error,
    },

    /// A probe to an existing listener on the port timed out.
    ///
    /// Something accepted the connection but did not produce a recognisable
    /// HTTP response.
    #[error("broker probe timed out on port {port} — check for stuck processes on this port")]
    ProbeTimeout {
        /// The port that timed out.
        port: u16,
    },

    /// Binding to the address failed.
    ///
    /// Also used when the configured bind address cannot be parsed.
    #[error("failed to bind broker: {0}")]
    BindFailed(std::io::Error),

    /// Creating the tokio runtime failed.
    #[error("failed to create broker runtime: {0}")]
    RuntimeFailed(std::io::Error),
}
185
/// Handle to a running broker, including the optional flush thread.
///
/// When dropped, signals the flush thread to stop and joins it, then
/// shuts down the tokio runtime. If the handle is in "reattached" mode
/// (connected to an existing broker), dropping it is a no-op: every
/// optional resource below is `None`, so there is nothing to stop.
pub struct BrokerHandle {
    /// Shared broker state.
    pub state: Arc<BrokerState>,
    /// The tokio runtime powering the broker server.
    /// `None` when reattached to an existing broker.
    runtime: Option<tokio::runtime::Runtime>,
    /// Sends a shutdown signal to the server task.
    /// `None` when reattached to an existing broker.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Broadcasts the watcher shutdown signal to all watcher tasks.
    /// `None` when reattached to an existing broker.
    watcher_shutdown: Option<tokio::sync::watch::Sender<bool>>,
    /// The URL the broker is listening on.
    pub url: String,
    /// Flag to signal the flush thread to exit (set to `true` on drop).
    stop_flag: Arc<AtomicBool>,
    /// Flush thread join handle (present only when `log_path` is `Some`).
    flush_thread: Option<JoinHandle<()>>,
}
208
209impl BrokerHandle {
210    /// Creates a handle that reattaches to an existing broker (no owned runtime).
211    fn reattached(url: String, state: Arc<BrokerState>) -> Self {
212        Self {
213            state,
214            runtime: None,
215            shutdown_tx: None,
216            watcher_shutdown: None,
217            url,
218            stop_flag: Arc::new(AtomicBool::new(false)),
219            flush_thread: None,
220        }
221    }
222}
223
224impl Drop for BrokerHandle {
225    fn drop(&mut self) {
226        // 1. Signal flush thread to stop and join it.
227        self.stop_flag.store(true, Ordering::Release);
228        if let Some(handle) = self.flush_thread.take() {
229            let _ = handle.join();
230        }
231        // 2. Signal watcher tasks to stop.
232        if let Some(tx) = self.watcher_shutdown.take() {
233            let _ = tx.send(true);
234        }
235        // 3. Signal shutdown to the server task.
236        if let Some(tx) = self.shutdown_tx.take() {
237            let _ = tx.send(());
238        }
239        // 4. Give in-flight requests up to 2 seconds to drain, then drop runtime.
240        if let Some(rt) = self.runtime.take() {
241            rt.shutdown_timeout(std::time::Duration::from_secs(2));
242        }
243    }
244}
245
/// Result of probing an existing listener on the broker port.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProbeResult {
    /// Nothing is listening — safe to bind.
    NoListener,
    /// A git-paw broker is already running.
    LiveBroker,
    /// Something else is using the port.
    ForeignServer,
    /// The probe timed out.
    Timeout,
}

/// Probes a URL to determine what broker (if any) is running there.
///
/// Public entry point for callers that need to inspect broker status without
/// starting a new server (e.g. the `status` subcommand).
pub fn probe_broker(url: &str) -> ProbeResult {
    probe_existing_broker(url)
}

/// Probes an existing listener at the given URL to determine what is running.
///
/// Uses a lightweight `TcpStream` with a manual HTTP/1.1 GET to `/status`
/// to avoid pulling in a full HTTP client dependency.
///
/// `url` may be `http://host:port`, a bare `host:port`, and may carry a
/// trailing slash or path; only the `host:port` part is used.
///
/// Classification:
/// - the address cannot be resolved, or the connection fails →
///   [`ProbeResult::NoListener`]
/// - the request cannot be written, or nothing at all can be read →
///   [`ProbeResult::Timeout`]
/// - the response contains the `"git_paw": true` marker →
///   [`ProbeResult::LiveBroker`]
/// - any other response starting with `HTTP/` → [`ProbeResult::ForeignServer`]
/// - anything else → [`ProbeResult::Timeout`]
fn probe_existing_broker(url: &str) -> ProbeResult {
    use std::io::{Read, Write};
    use std::net::{TcpStream, ToSocketAddrs};
    use std::time::Duration;

    // Applied separately to connect, each read and each write.
    const IO_TIMEOUT: Duration = Duration::from_millis(500);

    // Reduce "http://127.0.0.1:9119/status" to "127.0.0.1:9119". Without
    // dropping the path, a trailing slash would make the port unparsable and
    // a live broker would be reported as `NoListener`.
    let authority = url.strip_prefix("http://").unwrap_or(url);
    let addr = authority
        .split_once('/')
        .map_or(authority, |(host_port, _path)| host_port);

    // `to_socket_addrs` on a `str` first tries a literal socket address and
    // only then falls back to name resolution.
    let Some(socket_addr) = addr
        .to_socket_addrs()
        .ok()
        .and_then(|mut candidates| candidates.next())
    else {
        return ProbeResult::NoListener;
    };

    let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, IO_TIMEOUT) else {
        return ProbeResult::NoListener;
    };

    // Best effort: if the timeouts cannot be set we still attempt the probe.
    stream.set_read_timeout(Some(IO_TIMEOUT)).ok();
    stream.set_write_timeout(Some(IO_TIMEOUT)).ok();

    let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
    if stream.write_all(request.as_bytes()).is_err() {
        return ProbeResult::Timeout;
    }

    // Read raw bytes rather than a `String`: `read_to_string` rejects the
    // whole response on a single non-UTF-8 byte, which used to turn a
    // foreign server with a binary body into a bogus `Timeout`. Bytes read
    // before an error (e.g. a read timeout) are kept and still classified.
    let mut raw = Vec::new();
    if stream.read_to_end(&mut raw).is_err() && raw.is_empty() {
        return ProbeResult::Timeout;
    }
    let response = String::from_utf8_lossy(&raw);

    if response.contains("\"git_paw\":true") || response.contains("\"git_paw\": true") {
        ProbeResult::LiveBroker
    } else if response.starts_with("HTTP/") {
        ProbeResult::ForeignServer
    } else {
        ProbeResult::Timeout
    }
}
322
323/// Starts the HTTP broker server.
324///
325/// Probes the configured port first:
326/// - If a live git-paw broker is found, returns a reattached handle.
327/// - If a foreign server occupies the port, returns [`BrokerError::PortInUse`].
328/// - If the probe times out, returns [`BrokerError::ProbeTimeout`].
329/// - If nothing is listening, binds and starts the server.
330///
331/// Also spawns the background flush thread if `state.log_path` is set.
332pub fn start_broker(
333    config: &BrokerConfig,
334    state: BrokerState,
335    watch_targets: Vec<WatchTarget>,
336) -> Result<BrokerHandle, BrokerError> {
337    let url = config.url();
338    let state = Arc::new(state);
339    let stop_flag = Arc::new(AtomicBool::new(false));
340
341    match probe_existing_broker(&url) {
342        ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
343        ProbeResult::ForeignServer => {
344            return Err(BrokerError::PortInUse {
345                port: config.port,
346                source: std::io::Error::new(
347                    std::io::ErrorKind::AddrInUse,
348                    "port occupied by non-broker process",
349                ),
350            });
351        }
352        ProbeResult::Timeout => {
353            return Err(BrokerError::ProbeTimeout { port: config.port });
354        }
355        ProbeResult::NoListener => {}
356    }
357
358    // Spawn flush thread if log_path is configured.
359    let flush_thread = if state.log_path.is_some() {
360        let s = Arc::clone(&state);
361        let f = Arc::clone(&stop_flag);
362        Some(std::thread::spawn(move || {
363            delivery::flush_loop(&s, &f);
364        }))
365    } else {
366        None
367    };
368
369    let runtime = tokio::runtime::Builder::new_multi_thread()
370        .enable_all()
371        .build()
372        .map_err(BrokerError::RuntimeFailed)?;
373
374    let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
375        |e: std::net::AddrParseError| {
376            BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
377        },
378    )?;
379
380    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
381
382    let router = server::router(Arc::clone(&state));
383
384    let listener = runtime.block_on(async {
385        let socket = tokio::net::TcpSocket::new_v4().map_err(BrokerError::BindFailed)?;
386        socket
387            .set_reuseaddr(true)
388            .map_err(BrokerError::BindFailed)?;
389        socket.bind(addr).map_err(BrokerError::BindFailed)?;
390        socket.listen(1024).map_err(BrokerError::BindFailed)
391    })?;
392
393    // Install SIGINT handler so the broker does not die on Ctrl+C.
394    // The dashboard process is responsible for user-facing Ctrl+C handling.
395    runtime.spawn(async {
396        let _ = tokio::signal::ctrl_c().await;
397    });
398
399    runtime.spawn(async move {
400        axum::serve(listener, router)
401            .with_graceful_shutdown(async {
402                let _ = shutdown_rx.await;
403            })
404            .await
405            .ok();
406    });
407
408    // Pre-populate the CLI label AND the inbox queue for every watched
409    // agent so (a) the dashboard shows the CLI before any status messages
410    // arrive, and (b) peer `agent.artifact` broadcasts — which only target
411    // already-existing queues — actually reach the watched agent even
412    // before it has published anything itself.
413    {
414        let mut inner = state.write();
415        for target in &watch_targets {
416            inner
417                .agent_clis
418                .insert(target.agent_id.clone(), target.cli.clone());
419            inner.queues.entry(target.agent_id.clone()).or_default();
420        }
421    }
422
423    // Spawn one watcher task per target. All watchers share a single
424    // `tokio::sync::watch` channel; flipping it to `true` on drop signals
425    // every watcher to exit on its next tick.
426    let (watcher_tx, watcher_rx) = tokio::sync::watch::channel(false);
427    for target in watch_targets {
428        let s = Arc::clone(&state);
429        let rx = watcher_rx.clone();
430        runtime.spawn(watcher::watch_worktree(s, target, rx));
431    }
432
433    Ok(BrokerHandle {
434        state,
435        runtime: Some(runtime),
436        shutdown_tx: Some(shutdown_tx),
437        watcher_shutdown: Some(watcher_tx),
438        url,
439        stop_flag,
440        flush_thread,
441    })
442}
443
#[cfg(test)]
mod tests {
    use super::*;

    /// Asks the OS for a loopback port that is unused right now.
    ///
    /// Tests in this module run in parallel inside one process, so ports
    /// derived from the process ID would be identical across tests and could
    /// collide (a second `start_broker` would then silently reattach to the
    /// first test's broker). Letting the OS pick gives each test its own
    /// port. The port is released before it is returned, so a small race
    /// with unrelated processes remains.
    fn free_port() -> u16 {
        std::net::TcpListener::bind("127.0.0.1:0")
            .and_then(|listener| listener.local_addr())
            .map(|addr| addr.port())
            .expect("loopback interface must be bindable in tests")
    }

    /// Broker config bound to loopback on the given port.
    fn loopback_config(port: u16) -> BrokerConfig {
        BrokerConfig {
            enabled: true,
            port,
            bind: "127.0.0.1".to_string(),
        }
    }

    #[test]
    fn broker_state_new_is_empty() {
        let state = BrokerState::new(None);
        let inner = state.read();
        assert!(inner.agents.is_empty());
        assert!(inner.queues.is_empty());
        assert!(inner.message_log.is_empty());
    }

    #[test]
    fn next_seq_starts_at_one() {
        let state = BrokerState::new(None);
        assert_eq!(state.next_seq(), 1);
        assert_eq!(state.next_seq(), 2);
        assert_eq!(state.next_seq(), 3);
    }

    #[test]
    fn probe_no_listener() {
        // A port the OS just handed back is not being listened on.
        let url = format!("http://127.0.0.1:{}", free_port());
        let result = probe_existing_broker(&url);
        assert_eq!(result, ProbeResult::NoListener);
    }

    #[test]
    fn reattached_handle_has_no_runtime() {
        let state = Arc::new(BrokerState::new(None));
        let h = BrokerHandle::reattached("http://127.0.0.1:9119".into(), state);
        assert!(h.runtime.is_none());
        assert!(h.shutdown_tx.is_none());
        assert!(h.flush_thread.is_none());
    }

    #[test]
    fn start_broker_on_free_port() {
        let config = loopback_config(free_port());
        let state = BrokerState::new(None);
        let handle = start_broker(&config, state, Vec::new());
        // If the port was grabbed in the meantime, the test is inconclusive — not a failure.
        if let Ok(h) = handle {
            assert!(h.url.contains(&config.port.to_string()));
            drop(h);
        }
    }

    #[test]
    fn start_broker_no_log_path_no_flush_thread() {
        let config = loopback_config(free_port());
        let state = BrokerState::new(None);
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_none());
            drop(handle);
        }
    }

    #[test]
    fn start_broker_with_log_path_spawns_flush_thread() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("broker.log");
        let config = loopback_config(free_port());
        let state = BrokerState::new(Some(log_path));
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_some());
            drop(handle);
        }
    }
}