Skip to main content

git_paw/broker/
mod.rs

1//! HTTP broker for agent coordination.
2//!
3//! Provides an HTTP server that agents use to publish messages, poll for
4//! incoming messages, and report status. The broker runs on a background
5//! tokio runtime and is managed through [`BrokerHandle`].
6//!
7//! # Lock discipline
8//!
9//! [`BrokerState`] wraps its inner state in an `RwLock`. **Guards MUST NOT be
10//! held across `.await` boundaries.** The `clippy::await_holding_lock` lint is
11//! enabled project-wide to catch violations at compile time. Use the
12//! `read()` / `write()` methods to obtain guards inside synchronous closures
13//! only.
14
15pub mod delivery;
16pub mod messages;
17pub mod server;
18
19use std::collections::HashMap;
20use std::path::PathBuf;
21use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
22use std::sync::{Arc, RwLock};
23use std::thread::JoinHandle;
24use std::time::Instant;
25
26use serde::Serialize;
27
28use crate::config::BrokerConfig;
29pub use messages::BrokerMessage;
30
31/// Record of a known agent's latest state.
32#[derive(Debug, Clone)]
33pub struct AgentRecord {
34    /// Agent identifier (slugified branch name).
35    pub agent_id: String,
36    /// Last reported status label.
37    pub status: String,
38    /// When the agent last published a message.
39    pub last_seen: Instant,
40    /// The most recent message from this agent.
41    pub last_message: Option<BrokerMessage>,
42}
43
44/// JSON-serializable snapshot of an agent's status for the `/status` endpoint
45/// and the dashboard TUI.
46#[derive(Debug, Clone, Serialize)]
47pub struct AgentStatusEntry {
48    /// Agent identifier (slugified branch name).
49    pub agent_id: String,
50    /// CLI name running in this agent's pane (e.g. "claude").
51    pub cli: String,
52    /// Current status label (e.g. "working", "done", "blocked").
53    pub status: String,
54    /// Seconds since the agent was last seen.
55    pub last_seen_seconds: u64,
56    /// One-line summary from the last message.
57    pub summary: String,
58    /// When the agent was last seen (for age calculations in the dashboard).
59    #[serde(skip)]
60    pub last_seen: Instant,
61}
62
63/// Mutable broker state protected by an `RwLock`.
64#[derive(Debug)]
65pub struct BrokerStateInner {
66    /// Known agents keyed by agent ID.
67    pub agents: HashMap<String, AgentRecord>,
68    /// Per-agent message inboxes: `(sequence_number, message)`.
69    pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
70    /// Append-only message log for disk flush.
71    pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
72}
73
74/// Shared broker state.
75///
76/// Wraps [`BrokerStateInner`] in an `RwLock` for concurrent read access.
77/// The sequence counter is a standalone [`AtomicU64`] outside the lock so
78/// that sequence numbers can be allocated without coupling to the write
79/// lock.
80#[derive(Debug)]
81pub struct BrokerState {
82    /// Protected mutable state.
83    inner: RwLock<BrokerStateInner>,
84    /// Global sequence counter (starts at 0; first assigned value is 1).
85    next_seq: AtomicU64,
86    /// Optional path for periodic log flush to disk.
87    pub log_path: Option<PathBuf>,
88}
89
90impl BrokerState {
91    /// Creates a new empty broker state.
92    pub fn new(log_path: Option<PathBuf>) -> Self {
93        Self {
94            inner: RwLock::new(BrokerStateInner {
95                agents: HashMap::new(),
96                queues: HashMap::new(),
97                message_log: Vec::new(),
98            }),
99            next_seq: AtomicU64::new(0),
100            log_path,
101        }
102    }
103
104    /// Acquires a read lock on the inner state.
105    ///
106    /// # Panics
107    ///
108    /// Panics if the lock is poisoned (a thread panicked while holding it).
109    pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
110        self.inner.read().expect("broker state lock poisoned")
111    }
112
113    /// Acquires a write lock on the inner state.
114    ///
115    /// # Panics
116    ///
117    /// Panics if the lock is poisoned (a thread panicked while holding it).
118    pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
119        self.inner.write().expect("broker state lock poisoned")
120    }
121
122    /// Atomically allocates the next sequence number (starting at 1).
123    pub fn next_seq(&self) -> u64 {
124        self.next_seq.fetch_add(1, Ordering::Relaxed) + 1
125    }
126
127    /// Returns the number of seconds since the broker was started.
128    ///
129    /// Used by the HTTP `/status` handler to report uptime. This value is
130    /// informational only; callers should handle `0` gracefully.
131    pub fn uptime_seconds(&self) -> u64 {
132        // A dedicated `started_at` field would be more precise, but since
133        // `uptime_seconds` is a best-effort diagnostic metric we return 0 here
134        // to keep the struct lean.
135        0
136    }
137}
138
139/// Errors specific to broker operations.
140#[derive(Debug, thiserror::Error)]
141pub enum BrokerError {
142    /// The configured port is already in use by a non-broker process.
143    #[error(
144        "port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
145    )]
146    PortInUse {
147        /// The port that was occupied.
148        port: u16,
149        /// The underlying I/O error.
150        source: std::io::Error,
151    },
152
153    /// A probe to an existing listener on the port timed out.
154    #[error("broker probe timed out on port {port} — check for stuck processes on this port")]
155    ProbeTimeout {
156        /// The port that timed out.
157        port: u16,
158    },
159
160    /// Binding to the address failed.
161    #[error("failed to bind broker: {0}")]
162    BindFailed(std::io::Error),
163
164    /// Creating the tokio runtime failed.
165    #[error("failed to create broker runtime: {0}")]
166    RuntimeFailed(std::io::Error),
167}
168
169/// Handle to a running broker, including the optional flush thread.
170///
171/// When dropped, signals the flush thread to stop and joins it, then
172/// shuts down the tokio runtime. If the handle is in "reattached" mode
173/// (connected to an existing broker), dropping it is a no-op.
174pub struct BrokerHandle {
175    /// Shared broker state.
176    pub state: Arc<BrokerState>,
177    /// The tokio runtime powering the broker server.
178    /// `None` when reattached to an existing broker.
179    runtime: Option<tokio::runtime::Runtime>,
180    /// Sends a shutdown signal to the server task.
181    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
182    /// The URL the broker is listening on.
183    pub url: String,
184    /// Flag to signal the flush thread to exit.
185    stop_flag: Arc<AtomicBool>,
186    /// Flush thread join handle (present only when `log_path` is `Some`).
187    flush_thread: Option<JoinHandle<()>>,
188}
189
190impl BrokerHandle {
191    /// Creates a handle that reattaches to an existing broker (no owned runtime).
192    fn reattached(url: String, state: Arc<BrokerState>) -> Self {
193        Self {
194            state,
195            runtime: None,
196            shutdown_tx: None,
197            url,
198            stop_flag: Arc::new(AtomicBool::new(false)),
199            flush_thread: None,
200        }
201    }
202}
203
204impl Drop for BrokerHandle {
205    fn drop(&mut self) {
206        // 1. Signal flush thread to stop and join it.
207        self.stop_flag.store(true, Ordering::Release);
208        if let Some(handle) = self.flush_thread.take() {
209            let _ = handle.join();
210        }
211        // 2. Signal shutdown to the server task.
212        if let Some(tx) = self.shutdown_tx.take() {
213            let _ = tx.send(());
214        }
215        // 3. Give in-flight requests up to 2 seconds to drain, then drop runtime.
216        if let Some(rt) = self.runtime.take() {
217            rt.shutdown_timeout(std::time::Duration::from_secs(2));
218        }
219    }
220}
221
/// Result of probing an existing listener on the broker port.
#[derive(Debug, PartialEq, Eq)]
pub enum ProbeResult {
    /// Nothing is listening — safe to bind.
    NoListener,
    /// A git-paw broker is already running.
    LiveBroker,
    /// Something else is using the port.
    ForeignServer,
    /// The probe timed out.
    Timeout,
}

/// Probes a URL to determine what broker (if any) is running there.
///
/// Public entry point for callers that need to inspect broker status without
/// starting a new server (e.g. the `status` subcommand).
pub fn probe_broker(url: &str) -> ProbeResult {
    probe_existing_broker(url)
}

/// Probes an existing listener at the given URL to determine what is running.
///
/// Uses a lightweight `TcpStream` with a manual HTTP/1.1 `GET /status`
/// to avoid pulling in a full HTTP client dependency.
///
/// `url` is expected to look like `http://127.0.0.1:9119`; the `http://`
/// prefix is optional and anything from the first `/` after the authority
/// (a trailing slash or a path) is ignored.
///
/// Classification:
/// - [`ProbeResult::NoListener`]: the address cannot be resolved or the
///   connection attempt fails.
/// - [`ProbeResult::LiveBroker`]: the response contains `"git_paw":true`
///   (with or without a space after the colon).
/// - [`ProbeResult::ForeignServer`]: the response starts with `HTTP/` but
///   carries no broker marker.
/// - [`ProbeResult::Timeout`]: the request could not be written, nothing
///   could be read, or the reply does not look like HTTP.
fn probe_existing_broker(url: &str) -> ProbeResult {
    use std::io::{Read, Write};
    use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
    use std::time::Duration;

    /// Timeout applied to the connect and to each read/write call.
    const IO_TIMEOUT: Duration = Duration::from_millis(500);
    /// Upper bound on how much of the response is buffered; keeps a chatty
    /// foreign server from making the probe allocate without limit.
    const MAX_RESPONSE_BYTES: u64 = 1024 * 1024;

    // Reduce "http://host:port/any/path" to "host:port".
    let without_scheme = url.strip_prefix("http://").unwrap_or(url);
    let authority = without_scheme
        .split('/')
        .next()
        .unwrap_or(without_scheme);

    // Prefer a literal socket address; fall back to name resolution.
    let socket_addr = match authority.parse::<SocketAddr>() {
        Ok(parsed) => parsed,
        Err(_) => {
            let resolved = authority
                .to_socket_addrs()
                .ok()
                .and_then(|mut candidates| candidates.next());
            match resolved {
                Some(first) => first,
                None => return ProbeResult::NoListener,
            }
        }
    };

    let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, IO_TIMEOUT) else {
        return ProbeResult::NoListener;
    };

    // Best effort: if the timeouts cannot be set the probe still proceeds.
    stream.set_read_timeout(Some(IO_TIMEOUT)).ok();
    stream.set_write_timeout(Some(IO_TIMEOUT)).ok();

    let request =
        format!("GET /status HTTP/1.1\r\nHost: {authority}\r\nConnection: close\r\n\r\n");
    if stream.write_all(request.as_bytes()).is_err() {
        return ProbeResult::Timeout;
    }

    // Read raw bytes rather than a `String`: a non-UTF-8 body from a foreign
    // server must not hide its (ASCII) HTTP status line. On a read error the
    // bytes received so far are kept, so a partial response is still usable.
    let mut raw = Vec::new();
    let read_outcome = Read::take(&stream, MAX_RESPONSE_BYTES).read_to_end(&mut raw);
    if read_outcome.is_err() && raw.is_empty() {
        return ProbeResult::Timeout;
    }
    let response = String::from_utf8_lossy(&raw);

    if response.contains("\"git_paw\":true") || response.contains("\"git_paw\": true") {
        ProbeResult::LiveBroker
    } else if response.starts_with("HTTP/") {
        ProbeResult::ForeignServer
    } else {
        ProbeResult::Timeout
    }
}
298
299/// Starts the HTTP broker server.
300///
301/// Probes the configured port first:
302/// - If a live git-paw broker is found, returns a reattached handle.
303/// - If a foreign server occupies the port, returns [`BrokerError::PortInUse`].
304/// - If the probe times out, returns [`BrokerError::ProbeTimeout`].
305/// - If nothing is listening, binds and starts the server.
306///
307/// Also spawns the background flush thread if `state.log_path` is set.
308pub fn start_broker(
309    config: &BrokerConfig,
310    state: BrokerState,
311) -> Result<BrokerHandle, BrokerError> {
312    let url = config.url();
313    let state = Arc::new(state);
314    let stop_flag = Arc::new(AtomicBool::new(false));
315
316    match probe_existing_broker(&url) {
317        ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
318        ProbeResult::ForeignServer => {
319            return Err(BrokerError::PortInUse {
320                port: config.port,
321                source: std::io::Error::new(
322                    std::io::ErrorKind::AddrInUse,
323                    "port occupied by non-broker process",
324                ),
325            });
326        }
327        ProbeResult::Timeout => {
328            return Err(BrokerError::ProbeTimeout { port: config.port });
329        }
330        ProbeResult::NoListener => {}
331    }
332
333    // Spawn flush thread if log_path is configured.
334    let flush_thread = if state.log_path.is_some() {
335        let s = Arc::clone(&state);
336        let f = Arc::clone(&stop_flag);
337        Some(std::thread::spawn(move || {
338            delivery::flush_loop(&s, &f);
339        }))
340    } else {
341        None
342    };
343
344    let runtime = tokio::runtime::Builder::new_multi_thread()
345        .enable_all()
346        .build()
347        .map_err(BrokerError::RuntimeFailed)?;
348
349    let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
350        |e: std::net::AddrParseError| {
351            BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
352        },
353    )?;
354
355    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
356
357    let router = server::router(Arc::clone(&state));
358
359    let listener = runtime.block_on(async {
360        let socket = tokio::net::TcpSocket::new_v4().map_err(BrokerError::BindFailed)?;
361        socket
362            .set_reuseaddr(true)
363            .map_err(BrokerError::BindFailed)?;
364        socket.bind(addr).map_err(BrokerError::BindFailed)?;
365        socket.listen(1024).map_err(BrokerError::BindFailed)
366    })?;
367
368    // Install SIGINT handler so the broker does not die on Ctrl+C.
369    // The dashboard process is responsible for user-facing Ctrl+C handling.
370    runtime.spawn(async {
371        let _ = tokio::signal::ctrl_c().await;
372    });
373
374    runtime.spawn(async move {
375        axum::serve(listener, router)
376            .with_graceful_shutdown(async {
377                let _ = shutdown_rx.await;
378            })
379            .await
380            .ok();
381    });
382
383    Ok(BrokerHandle {
384        state,
385        runtime: Some(runtime),
386        shutdown_tx: Some(shutdown_tx),
387        url,
388        stop_flag,
389        flush_thread,
390    })
391}
392
#[cfg(test)]
mod tests {
    use super::*;

    /// Asks the OS for a loopback port that is free right now.
    ///
    /// The listener is dropped before returning, so the port is available
    /// for the caller. Unlike PID-derived ports, two tests running in
    /// parallel in the same process do not deterministically pick the same
    /// one.
    fn free_port() -> u16 {
        std::net::TcpListener::bind("127.0.0.1:0")
            .and_then(|listener| listener.local_addr())
            .map(|addr| addr.port())
            .expect("binding an ephemeral loopback port should succeed")
    }

    /// Broker configuration bound to loopback on a fresh free port.
    fn loopback_config() -> BrokerConfig {
        BrokerConfig {
            enabled: true,
            port: free_port(),
            bind: "127.0.0.1".to_string(),
        }
    }

    #[test]
    fn broker_state_new_is_empty() {
        let state = BrokerState::new(None);
        let inner = state.read();
        assert!(inner.agents.is_empty());
        assert!(inner.queues.is_empty());
        assert!(inner.message_log.is_empty());
    }

    #[test]
    fn next_seq_starts_at_one() {
        let state = BrokerState::new(None);
        assert_eq!(state.next_seq(), 1);
        assert_eq!(state.next_seq(), 2);
        assert_eq!(state.next_seq(), 3);
    }

    #[test]
    fn probe_no_listener() {
        // A port the OS just handed out and we released: nothing listens on it.
        let url = format!("http://127.0.0.1:{}", free_port());
        let result = probe_existing_broker(&url);
        assert_eq!(result, ProbeResult::NoListener);
    }

    #[test]
    fn reattached_handle_has_no_runtime() {
        let state = Arc::new(BrokerState::new(None));
        let h = BrokerHandle::reattached("http://127.0.0.1:9119".into(), state);
        assert!(h.runtime.is_none());
        assert!(h.shutdown_tx.is_none());
        assert!(h.flush_thread.is_none());
    }

    #[test]
    fn start_broker_on_free_port() {
        let config = loopback_config();
        let state = BrokerState::new(None);
        let handle = start_broker(&config, state);
        // If the port was taken in the meantime, the test is inconclusive — not a failure.
        if let Ok(h) = handle {
            assert!(h.url.contains(&config.port.to_string()));
            drop(h);
        }
    }

    #[test]
    fn start_broker_no_log_path_no_flush_thread() {
        let config = loopback_config();
        let state = BrokerState::new(None);
        if let Ok(handle) = start_broker(&config, state) {
            assert!(handle.flush_thread.is_none());
            drop(handle);
        }
    }

    #[test]
    fn start_broker_with_log_path_spawns_flush_thread() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("broker.log");
        let config = loopback_config();
        let state = BrokerState::new(Some(log_path));
        if let Ok(handle) = start_broker(&config, state) {
            assert!(handle.flush_thread.is_some());
            drop(handle);
        }
    }
}