Skip to main content

git_paw/broker/
mod.rs

1//! HTTP broker for agent coordination.
2//!
3//! Provides an HTTP server that agents use to publish messages, poll for
4//! incoming messages, and report status. The broker runs on a background
5//! tokio runtime and is managed through [`BrokerHandle`].
6//!
7//! # Lock discipline
8//!
9//! [`BrokerState`] wraps its inner state in an `RwLock`. **Guards MUST NOT be
10//! held across `.await` boundaries.** The `clippy::await_holding_lock` lint is
11//! enabled project-wide to catch violations at compile time. Use the
12//! `read()` / `write()` methods to obtain guards inside synchronous closures
13//! only.
14
15pub mod conflict;
16pub mod delivery;
17pub mod learnings;
18pub mod messages;
19pub mod publish;
20pub mod server;
21pub mod watcher;
22
23use std::collections::{HashMap, HashSet};
24use std::path::PathBuf;
25use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
26use std::sync::{Arc, OnceLock, RwLock};
27use std::thread::JoinHandle;
28use std::time::Instant;
29
30use serde::Serialize;
31
32use crate::config::{BrokerConfig, ConflictConfig};
33pub use messages::BrokerMessage;
34
/// A single worktree whose git status the broker keeps an eye on.
///
/// One [`watcher::watch_worktree`] task is spawned by the broker for each
/// target it is given.
#[derive(Clone, Debug)]
pub struct WatchTarget {
    /// Owning agent's identifier (the slugified branch name).
    pub agent_id: String,
    /// Name of the CLI running in the agent's pane, e.g. `"claude"`.
    pub cli: String,
    /// Absolute path of the worktree root.
    pub worktree_path: PathBuf,
}
47
48/// Record of a known agent's latest state.
49#[derive(Debug, Clone)]
50pub struct AgentRecord {
51    /// Agent identifier (slugified branch name).
52    pub agent_id: String,
53    /// Last reported status label.
54    pub status: String,
55    /// When the agent last published a message.
56    pub last_seen: Instant,
57    /// The most recent message from this agent.
58    pub last_message: Option<BrokerMessage>,
59    /// When the agent most recently published an `agent.artifact status:
60    /// "committed"` event, if ever.
61    ///
62    /// Bug 8 (`auto-approve-scope-v0-6-x`): the filesystem watcher consults
63    /// this to decide whether a subsequent file modification should
64    /// re-publish `working` (within the configured TTL window). Transient —
65    /// not serialized; reset only when overwritten by a newer committed event.
66    pub last_committed_at: Option<Instant>,
67}
68
69/// JSON-serializable snapshot of an agent's status for the `/status` endpoint
70/// and the dashboard TUI.
71#[derive(Debug, Clone, Serialize)]
72pub struct AgentStatusEntry {
73    /// Agent identifier (slugified branch name).
74    pub agent_id: String,
75    /// CLI name running in this agent's pane (e.g. "claude").
76    pub cli: String,
77    /// Current status label (e.g. "working", "done", "blocked").
78    pub status: String,
79    /// Seconds since the agent was last seen.
80    pub last_seen_seconds: u64,
81    /// One-line summary from the last message.
82    pub summary: String,
83    /// When the agent was last seen (for age calculations in the dashboard).
84    #[serde(skip)]
85    pub last_seen: Instant,
86    /// Most recently published `payload.phase` on an `agent.status`, if any.
87    /// The dashboard prefers this label over the message-type-derived
88    /// `status_label()` when rendering the agent's row.
89    #[serde(default, skip_serializing_if = "Option::is_none")]
90    pub phase: Option<String>,
91}
92
93/// Mutable broker state protected by an `RwLock`.
94#[derive(Debug)]
95pub struct BrokerStateInner {
96    /// Known agents keyed by agent ID.
97    pub agents: HashMap<String, AgentRecord>,
98    /// CLI label per agent, populated from [`WatchTarget`] at broker start.
99    pub agent_clis: HashMap<String, String>,
100    /// Per-agent message inboxes: `(sequence_number, message)`.
101    pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
102    /// Append-only message log for disk flush.
103    pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
104    /// Post-commit re-entry TTL for the filesystem watcher (bug 8).
105    ///
106    /// A file modification observed within this window after an agent's
107    /// `committed` event re-publishes `working`. `Duration::ZERO` disables
108    /// the behaviour (v0.5.0 model). Defaults to 60s; overwritten from
109    /// `[broker.watcher].republish_working_ttl_seconds` at broker start.
110    pub republish_working_ttl: std::time::Duration,
111    /// Worktree paths the broker is currently watching, for idempotent live
112    /// registration via `POST /watch`. Seeded with the start-time watch
113    /// targets in [`start_broker_with`] and extended at runtime by
114    /// [`BrokerState::register_watch_target`]. A vanished worktree is pruned
115    /// by its watcher task (see [`watcher::watch_worktree`]), so re-adding the
116    /// same path later spawns a fresh watcher.
117    pub watched_paths: HashSet<PathBuf>,
118}
119
120/// Shared broker state.
121///
122/// Wraps [`BrokerStateInner`] in an `RwLock` for concurrent read access.
123/// The sequence counter is a standalone [`AtomicU64`] outside the lock so
124/// that sequence numbers can be allocated without coupling to the write
125/// lock.
126#[derive(Debug)]
127pub struct BrokerState {
128    /// Protected mutable state.
129    inner: RwLock<BrokerStateInner>,
130    /// Global sequence counter (starts at 0; first assigned value is 1).
131    next_seq: AtomicU64,
132    /// Optional path for periodic log flush to disk.
133    pub log_path: Option<PathBuf>,
134    /// Wall-clock instant the broker state was created; used for uptime reporting.
135    started_at: Instant,
136    /// Optional learnings aggregator. Populated when supervisor + learnings
137    /// mode is active; the publish path forwards every observed message.
138    pub learnings: Option<learnings::SharedLearnings>,
139    /// When `true`, an `agent.artifact { status: "committed" }` triggers a
140    /// broker-emitted [`BrokerMessage::VerifyNow`] nudge to the supervisor
141    /// inbox so per-commit verification fires on an explicit event. Resolved
142    /// from `[supervisor].verify_on_commit_nudge` (default `true`) at session
143    /// boot and threaded in via [`BrokerState::with_verify_on_commit_nudge`].
144    pub verify_on_commit_nudge: bool,
145    /// opsx role-gating context. When `Some` and active (`OpenSpec` engine +
146    /// non-`off` mode), the publish path runs the role-gating guard on every
147    /// `agent.artifact { status: "committed" }`. `None` (the default, and
148    /// non-OpenSpec sessions) leaves the guard inert. Threaded in via
149    /// [`BrokerState::with_role_gating`].
150    pub role_gating: Option<crate::opsx::RoleGatingContext>,
151    /// Watcher shutdown receiver, populated once in [`start_broker_with`]
152    /// before the start-time watchers spawn. The `POST /watch` handler clones
153    /// it to enroll a live-registered watcher in the same shared shutdown
154    /// signal, so a hot-added watcher stops in lockstep with the rest on
155    /// [`BrokerHandle`] drop. `None` until the broker has spawned its watchers
156    /// (e.g. in router-only unit tests), in which case live registration still
157    /// records the target but spawns no watcher.
158    watcher_shutdown_rx: OnceLock<tokio::sync::watch::Receiver<bool>>,
159}
160
161impl BrokerState {
162    /// Creates a new empty broker state.
163    pub fn new(log_path: Option<PathBuf>) -> Self {
164        Self {
165            inner: RwLock::new(BrokerStateInner {
166                agents: HashMap::new(),
167                agent_clis: HashMap::new(),
168                queues: HashMap::new(),
169                message_log: Vec::new(),
170                republish_working_ttl: std::time::Duration::from_secs(
171                    crate::config::WatcherConfig::DEFAULT_REPUBLISH_TTL_SECONDS,
172                ),
173                watched_paths: HashSet::new(),
174            }),
175            next_seq: AtomicU64::new(0),
176            log_path,
177            started_at: Instant::now(),
178            learnings: None,
179            // Conservative broker-level default: the nudge is opt-in at the
180            // state level and explicitly enabled from config (whose own
181            // default is `true`) via `with_verify_on_commit_nudge`.
182            verify_on_commit_nudge: false,
183            role_gating: None,
184            watcher_shutdown_rx: OnceLock::new(),
185        }
186    }
187
188    /// Attaches the opsx role-gating context. Call before [`start_broker`] so
189    /// the publish path observes it from the first committed artifact. Passing
190    /// a context whose mode is `off` or whose engine is not `OpenSpec` leaves
191    /// the guard inert (the guard re-checks [`crate::opsx::RoleGatingContext::is_active`]).
192    #[must_use]
193    pub fn with_role_gating(mut self, ctx: crate::opsx::RoleGatingContext) -> Self {
194        self.role_gating = Some(ctx);
195        self
196    }
197
198    /// Sets whether committed artifacts emit a [`BrokerMessage::VerifyNow`]
199    /// nudge to the supervisor inbox. Call before [`start_broker`] so the
200    /// publish path observes the resolved value from the first message.
201    #[must_use]
202    pub fn with_verify_on_commit_nudge(mut self, enabled: bool) -> Self {
203        self.verify_on_commit_nudge = enabled;
204        self
205    }
206
207    /// Authoritatively seeds an agent's CLI into the roster's CLI map.
208    ///
209    /// git-paw knows every pane's CLI at launch (the supervisor's from
210    /// `[supervisor].cli`/`default_cli`, each agent's from the resolved
211    /// agent CLI), so the launcher seeds those known values rather than
212    /// depending on the agent to self-report via `agent.status`. This is
213    /// the only source for the supervisor's CLI column, since the
214    /// supervisor is not a filesystem watch target (W15-15). A blank `cli`
215    /// is ignored so a missing config value never clobbers the map. Call
216    /// before [`start_broker`] so the first `/status` snapshot is correct.
217    #[must_use]
218    pub fn with_seeded_cli(self, agent_id: &str, cli: &str) -> Self {
219        if !cli.is_empty()
220            && let Ok(mut inner) = self.inner.write()
221        {
222            inner
223                .agent_clis
224                .insert(agent_id.to_string(), cli.to_string());
225        }
226        self
227    }
228
229    /// Attaches a learnings aggregator. Replaces any previously attached
230    /// instance. Must be called before [`start_broker`] so the publish path
231    /// observes every message from the first one.
232    pub fn attach_learnings(&mut self, aggregator: learnings::SharedLearnings) {
233        self.learnings = Some(aggregator);
234    }
235
236    /// Sets the post-commit re-entry TTL consulted by the filesystem watcher
237    /// and the `committed -> working` transition in `update_agent_record`
238    /// (bug 8). `Duration::ZERO` disables the auto-republish behaviour.
239    pub fn set_republish_working_ttl(&self, ttl: std::time::Duration) {
240        self.write().republish_working_ttl = ttl;
241    }
242
243    /// Records the shared watcher shutdown receiver so the `POST /watch`
244    /// handler can clone it when spawning a live-registered watcher. Called
245    /// once in [`start_broker_with`] before the start-time watchers spawn;
246    /// subsequent calls are ignored (the receiver is set-once).
247    pub fn set_watcher_shutdown_rx(&self, rx: tokio::sync::watch::Receiver<bool>) {
248        let _ = self.watcher_shutdown_rx.set(rx);
249    }
250
251    /// Returns a clone of the shared watcher shutdown receiver if the broker
252    /// has spawned its watchers, or `None` otherwise (e.g. router-only unit
253    /// tests). The `POST /watch` handler uses this to enroll a hot-added
254    /// watcher in the shared shutdown signal.
255    #[must_use]
256    pub fn watcher_shutdown_rx(&self) -> Option<tokio::sync::watch::Receiver<bool>> {
257        self.watcher_shutdown_rx.get().cloned()
258    }
259
260    /// Registers a worktree as a live watch target for `POST /watch`.
261    ///
262    /// Bookkeeping only — the caller spawns the actual [`watcher::watch_worktree`]
263    /// task when this returns `true`. Idempotent: the worktree path is the key,
264    /// so re-registering an already-watched path returns `false` and the caller
265    /// spawns nothing (no duplicate watcher). On a fresh path it seeds the
266    /// agent's CLI label and inbox queue exactly as the start-time targets are
267    /// seeded in [`start_broker_with`], so the agent surfaces in `/status` and
268    /// receives peer broadcasts on the same terms.
269    pub fn register_watch_target(&self, target: &WatchTarget) -> bool {
270        let mut inner = self.write();
271        if !inner.watched_paths.insert(target.worktree_path.clone()) {
272            return false;
273        }
274        if !target.cli.is_empty() {
275            inner
276                .agent_clis
277                .insert(target.agent_id.clone(), target.cli.clone());
278        }
279        inner.queues.entry(target.agent_id.clone()).or_default();
280        true
281    }
282
283    /// Drops a worktree from the live watch-target set so a later
284    /// re-registration of the same path spawns a fresh watcher. Called by a
285    /// watcher task when it detects its worktree has disappeared (the prune
286    /// path for `git paw remove`).
287    pub fn forget_watch_target(&self, worktree_path: &std::path::Path) {
288        self.write().watched_paths.remove(worktree_path);
289    }
290
291    /// Acquires a read lock on the inner state.
292    ///
293    /// # Panics
294    ///
295    /// Panics if the lock is poisoned (a thread panicked while holding it).
296    pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
297        self.inner.read().expect("broker state lock poisoned")
298    }
299
300    /// Acquires a write lock on the inner state.
301    ///
302    /// # Panics
303    ///
304    /// Panics if the lock is poisoned (a thread panicked while holding it).
305    pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
306        self.inner.write().expect("broker state lock poisoned")
307    }
308
309    /// Atomically allocates the next sequence number (starting at 1).
310    pub fn next_seq(&self) -> u64 {
311        self.next_seq.fetch_add(1, Ordering::Relaxed) + 1
312    }
313
314    /// Returns the number of seconds since the broker was started.
315    ///
316    /// Used by the HTTP `/status` handler to report uptime.
317    pub fn uptime_seconds(&self) -> u64 {
318        self.started_at.elapsed().as_secs()
319    }
320}
321
322/// Errors specific to broker operations.
323#[derive(Debug, thiserror::Error)]
324pub enum BrokerError {
325    /// The configured port is already in use by a non-broker process.
326    #[error(
327        "port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
328    )]
329    PortInUse {
330        /// The port that was occupied.
331        port: u16,
332        /// The underlying I/O error.
333        source: std::io::Error,
334    },
335
336    /// A probe to an existing listener on the port timed out.
337    #[error("broker probe timed out on port {port} — check for stuck processes on this port")]
338    ProbeTimeout {
339        /// The port that timed out.
340        port: u16,
341    },
342
343    /// Binding to the address failed.
344    #[error("failed to bind broker: {0}")]
345    BindFailed(std::io::Error),
346
347    /// Creating the tokio runtime failed.
348    #[error("failed to create broker runtime: {0}")]
349    RuntimeFailed(std::io::Error),
350}
351
352/// Handle to a running broker, including the optional flush thread.
353///
354/// When dropped, signals the flush thread to stop and joins it, then
355/// shuts down the tokio runtime. If the handle is in "reattached" mode
356/// (connected to an existing broker), dropping it is a no-op.
357pub struct BrokerHandle {
358    /// Shared broker state.
359    pub state: Arc<BrokerState>,
360    /// The tokio runtime powering the broker server.
361    /// `None` when reattached to an existing broker.
362    runtime: Option<tokio::runtime::Runtime>,
363    /// Sends a shutdown signal to the server task.
364    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
365    /// Broadcasts the watcher shutdown signal to all watcher tasks.
366    watcher_shutdown: Option<tokio::sync::watch::Sender<bool>>,
367    /// The URL the broker is listening on.
368    pub url: String,
369    /// Flag to signal the flush thread to exit.
370    stop_flag: Arc<AtomicBool>,
371    /// Flush thread join handle (present only when `log_path` is `Some`).
372    flush_thread: Option<JoinHandle<()>>,
373    /// Periodic flush thread for the learnings aggregator (present only
374    /// when `state.learnings.is_some()`).
375    learnings_thread: Option<JoinHandle<()>>,
376}
377
378impl BrokerHandle {
379    /// Creates a handle that reattaches to an existing broker (no owned runtime).
380    fn reattached(url: String, state: Arc<BrokerState>) -> Self {
381        Self {
382            state,
383            runtime: None,
384            shutdown_tx: None,
385            watcher_shutdown: None,
386            url,
387            stop_flag: Arc::new(AtomicBool::new(false)),
388            flush_thread: None,
389            learnings_thread: None,
390        }
391    }
392}
393
394impl Drop for BrokerHandle {
395    fn drop(&mut self) {
396        // 1. Signal both flush threads to stop and join the log flush thread.
397        self.stop_flag.store(true, Ordering::Release);
398        if let Some(handle) = self.flush_thread.take() {
399            let _ = handle.join();
400        }
401        // 2. Join the learnings flush thread — it performs the final
402        //    shutdown flush before returning.
403        if let Some(handle) = self.learnings_thread.take() {
404            let _ = handle.join();
405        }
406        // 3. Signal watcher tasks to stop.
407        if let Some(tx) = self.watcher_shutdown.take() {
408            let _ = tx.send(true);
409        }
410        // 4. Signal shutdown to the server task.
411        if let Some(tx) = self.shutdown_tx.take() {
412            let _ = tx.send(());
413        }
414        // 5. Give in-flight requests up to 2 seconds to drain, then drop runtime.
415        if let Some(rt) = self.runtime.take() {
416            rt.shutdown_timeout(std::time::Duration::from_secs(2));
417        }
418    }
419}
420
/// Result of probing an existing listener on the broker port.
#[derive(Debug, PartialEq, Eq)]
pub enum ProbeResult {
    /// Nothing is listening — safe to bind.
    NoListener,
    /// A git-paw broker is already running.
    LiveBroker,
    /// Something else is using the port.
    ForeignServer,
    /// The probe timed out.
    Timeout,
}

/// Probes a URL to determine what broker (if any) is running there.
///
/// Public entry point for callers that need to inspect broker status without
/// starting a new server (e.g. the `status` subcommand).
pub fn probe_broker(url: &str) -> ProbeResult {
    probe_existing_broker(url)
}

/// Probes an existing listener at the given URL to determine what is running.
///
/// Uses a lightweight `TcpStream` with a manual HTTP/1.1 `GET /status` to
/// avoid pulling in a full HTTP client dependency.
///
/// Classification:
/// - [`ProbeResult::NoListener`] — the address cannot be resolved or the
///   connection is not established within 500ms.
/// - [`ProbeResult::LiveBroker`] — the response contains `"git_paw":true`
///   (with or without a space after the colon).
/// - [`ProbeResult::ForeignServer`] — the response starts with `HTTP/` but
///   carries no broker marker.
/// - [`ProbeResult::Timeout`] — the request could not be written, nothing was
///   read, or the reply is not HTTP.
fn probe_existing_broker(url: &str) -> ProbeResult {
    use std::io::{Read, Write};
    use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
    use std::time::Duration;

    /// Budget applied separately to connect, write and read.
    const IO_TIMEOUT: Duration = Duration::from_millis(500);

    // Reduce a URL like "http://127.0.0.1:9119" or "http://127.0.0.1:9119/"
    // to its `host:port` authority. Without trimming the path, a trailing
    // slash makes the address unparsable and a live broker would be
    // misreported as `NoListener`.
    let authority = url.strip_prefix("http://").unwrap_or(url);
    let addr = authority.split('/').next().unwrap_or(authority);

    // Prefer a literal socket address; fall back to name resolution.
    let resolved = addr.parse::<SocketAddr>().ok().or_else(|| {
        addr.to_socket_addrs()
            .ok()
            .and_then(|mut candidates| candidates.next())
    });
    let Some(socket_addr) = resolved else {
        return ProbeResult::NoListener;
    };

    let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, IO_TIMEOUT) else {
        return ProbeResult::NoListener;
    };

    // Best effort: if a timeout cannot be set the probe still proceeds.
    stream.set_read_timeout(Some(IO_TIMEOUT)).ok();
    stream.set_write_timeout(Some(IO_TIMEOUT)).ok();

    let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
    if stream.write_all(request.as_bytes()).is_err() {
        return ProbeResult::Timeout;
    }

    // Read raw bytes rather than a `String`: `read_to_string` throws away the
    // whole response if any byte is not valid UTF-8, which would misclassify
    // a foreign server with a binary body as a timeout. A read error (e.g.
    // the read timeout on a kept-alive connection) is tolerated as long as
    // some bytes arrived.
    let mut raw = Vec::new();
    if stream.read_to_end(&mut raw).is_err() && raw.is_empty() {
        return ProbeResult::Timeout;
    }
    let response = String::from_utf8_lossy(&raw);

    if response.contains("\"git_paw\":true") || response.contains("\"git_paw\": true") {
        ProbeResult::LiveBroker
    } else if response.starts_with("HTTP/") {
        ProbeResult::ForeignServer
    } else {
        ProbeResult::Timeout
    }
}
497
498/// Starts the HTTP broker server.
499///
500/// Probes the configured port first:
501/// - If a live git-paw broker is found, returns a reattached handle.
502/// - If a foreign server occupies the port, returns [`BrokerError::PortInUse`].
503/// - If the probe times out, returns [`BrokerError::ProbeTimeout`].
504/// - If nothing is listening, binds and starts the server.
505///
506/// Also spawns the background flush thread if `state.log_path` is set.
507///
508/// Calls [`start_broker`] without a conflict-detector. Equivalent to
509/// `start_broker_with(config, state, watch_targets, None)`.
510pub fn start_broker(
511    config: &BrokerConfig,
512    state: BrokerState,
513    watch_targets: Vec<WatchTarget>,
514) -> Result<BrokerHandle, BrokerError> {
515    start_broker_with(config, state, watch_targets, None, 60)
516}
517
518/// Starts the HTTP broker server with optional conflict-detector wiring
519/// and a configurable learnings-flush interval.
520///
521/// When `conflict` is `Some`, a background tokio task running the
522/// detector loop is spawned alongside the watcher tasks. The detector
523/// shuts down with the rest of the broker when [`BrokerHandle`] is
524/// dropped.
525///
526/// `learnings_flush_interval_seconds` controls how often the learnings
527/// aggregator flushes to `.git-paw/session-learnings.md` when
528/// `state.learnings` is `Some`. Default for [`start_broker`] is 60s;
529/// tests override to drive flush behaviour without waiting on real time.
530#[allow(clippy::too_many_lines)]
531pub fn start_broker_with(
532    config: &BrokerConfig,
533    state: BrokerState,
534    watch_targets: Vec<WatchTarget>,
535    conflict: Option<ConflictConfig>,
536    learnings_flush_interval_seconds: u64,
537) -> Result<BrokerHandle, BrokerError> {
538    let url = config.url();
539    let state = Arc::new(state);
540    // Apply the configured post-commit re-entry TTL (bug 8) before any
541    // watcher task or publish observes the state.
542    state.set_republish_working_ttl(std::time::Duration::from_secs(
543        config.watcher.republish_working_ttl_seconds(),
544    ));
545    let stop_flag = Arc::new(AtomicBool::new(false));
546
547    match probe_existing_broker(&url) {
548        ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
549        ProbeResult::ForeignServer => {
550            return Err(BrokerError::PortInUse {
551                port: config.port,
552                source: std::io::Error::new(
553                    std::io::ErrorKind::AddrInUse,
554                    "port occupied by non-broker process",
555                ),
556            });
557        }
558        ProbeResult::Timeout => {
559            return Err(BrokerError::ProbeTimeout { port: config.port });
560        }
561        ProbeResult::NoListener => {}
562    }
563
564    // Spawn flush thread if log_path is configured.
565    let flush_thread = if state.log_path.is_some() {
566        let s = Arc::clone(&state);
567        let f = Arc::clone(&stop_flag);
568        Some(std::thread::spawn(move || {
569            delivery::flush_loop(&s, &f);
570        }))
571    } else {
572        None
573    };
574
575    // Spawn learnings flush thread when an aggregator is attached. The
576    // thread performs a final `flush_at_shutdown` after the stop flag is
577    // raised so unresolved blocks and recovery cycles end up in the file.
578    let learnings_thread = if state.learnings.is_some() {
579        let s = Arc::clone(&state);
580        let f = Arc::clone(&stop_flag);
581        Some(std::thread::spawn(move || {
582            learnings_flush_loop(&s, &f, learnings_flush_interval_seconds);
583        }))
584    } else {
585        None
586    };
587
588    let runtime = tokio::runtime::Builder::new_multi_thread()
589        .enable_all()
590        .build()
591        .map_err(BrokerError::RuntimeFailed)?;
592
593    let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
594        |e: std::net::AddrParseError| {
595            BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
596        },
597    )?;
598
599    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
600
601    let router = server::router(Arc::clone(&state));
602
603    let listener = runtime.block_on(async {
604        let socket = tokio::net::TcpSocket::new_v4().map_err(BrokerError::BindFailed)?;
605        socket
606            .set_reuseaddr(true)
607            .map_err(BrokerError::BindFailed)?;
608        socket.bind(addr).map_err(BrokerError::BindFailed)?;
609        socket.listen(1024).map_err(BrokerError::BindFailed)
610    })?;
611
612    // Install SIGINT handler so the broker does not die on Ctrl+C.
613    // The dashboard process is responsible for user-facing Ctrl+C handling.
614    runtime.spawn(async {
615        let _ = tokio::signal::ctrl_c().await;
616    });
617
618    runtime.spawn(async move {
619        axum::serve(listener, router)
620            .with_graceful_shutdown(async {
621                let _ = shutdown_rx.await;
622            })
623            .await
624            .ok();
625    });
626
627    // Pre-populate the CLI label AND the inbox queue for every watched
628    // agent so (a) the dashboard shows the CLI before any status messages
629    // arrive, and (b) peer `agent.artifact` broadcasts — which only target
630    // already-existing queues — actually reach the watched agent even
631    // before it has published anything itself.
632    {
633        let mut inner = state.write();
634        for target in &watch_targets {
635            inner
636                .agent_clis
637                .insert(target.agent_id.clone(), target.cli.clone());
638            inner.queues.entry(target.agent_id.clone()).or_default();
639            // Seed the live target set so a `POST /watch` for a start-time path is a no-op.
640            inner.watched_paths.insert(target.worktree_path.clone());
641        }
642    }
643
644    // Spawn one watcher task per target. All watchers share a single
645    // `tokio::sync::watch` channel; flipping it to `true` on drop signals
646    // every watcher to exit on its next tick. The conflict detector
647    // shares the same shutdown channel so it stops in lockstep.
648    let (watcher_tx, watcher_rx) = tokio::sync::watch::channel(false);
649    // Publish the shutdown receiver so `POST /watch` can enroll live watchers.
650    state.set_watcher_shutdown_rx(watcher_rx.clone());
651    for target in watch_targets {
652        let s = Arc::clone(&state);
653        let rx = watcher_rx.clone();
654        runtime.spawn(watcher::watch_worktree(s, target, rx));
655    }
656    if let Some(conflict_cfg) = conflict {
657        let s = Arc::clone(&state);
658        let rx = watcher_rx.clone();
659        runtime.spawn(conflict::run_detector_loop(s, conflict_cfg, rx));
660    }
661
662    Ok(BrokerHandle {
663        state,
664        runtime: Some(runtime),
665        shutdown_tx: Some(shutdown_tx),
666        watcher_shutdown: Some(watcher_tx),
667        url,
668        stop_flag,
669        flush_thread,
670        learnings_thread,
671    })
672}
673
674/// Background loop driving the learnings aggregator's periodic flush.
675///
676/// Sleeps in small slices so it can react to the broker stop flag within
677/// ~100ms. When the stop flag is raised, it performs one final
678/// [`learnings::LearningsAggregator::flush_at_shutdown`] before exiting.
679fn learnings_flush_loop(
680    state: &Arc<BrokerState>,
681    stop: &Arc<AtomicBool>,
682    flush_interval_seconds: u64,
683) {
684    let Some(aggregator) = state.learnings.clone() else {
685        return;
686    };
687    let interval = std::time::Duration::from_secs(flush_interval_seconds.max(1));
688    let tick = std::time::Duration::from_millis(100);
689
690    loop {
691        let mut elapsed = std::time::Duration::ZERO;
692        while elapsed < interval {
693            if stop.load(Ordering::Acquire) {
694                if let Ok(mut agg) = aggregator.lock() {
695                    let _ = agg.flush_at_shutdown();
696                }
697                publish_pending_learnings(state, &aggregator);
698                return;
699            }
700            std::thread::sleep(tick);
701            elapsed += tick;
702        }
703        if let Ok(mut agg) = aggregator.lock() {
704            let _ = agg.flush();
705        }
706        publish_pending_learnings(state, &aggregator);
707    }
708}
709
710/// Drains the aggregator's pending broker records and publishes each as an
711/// `agent.learning` message.
712///
713/// Critically, the aggregator lock is acquired *only* to drain the queue and
714/// is released before any publish: `publish_message` re-enters the aggregator
715/// via `observe`, so publishing while holding the lock would deadlock the
716/// non-reentrant `Mutex`. When broker publish is disabled the queue is always
717/// empty, so this is a cheap no-op.
718fn publish_pending_learnings(state: &Arc<BrokerState>, aggregator: &learnings::SharedLearnings) {
719    let records = match aggregator.lock() {
720        Ok(mut agg) => agg.take_pending_publish(),
721        Err(_) => return,
722    };
723    for record in &records {
724        delivery::publish_message(state, &BrokerMessage::from(record));
725    }
726}
727
728#[cfg(test)]
729mod tests {
730    use super::*;
731
732    #[test]
733    fn broker_state_new_is_empty() {
734        let state = BrokerState::new(None);
735        let inner = state.read();
736        assert!(inner.agents.is_empty());
737        assert!(inner.queues.is_empty());
738        assert!(inner.message_log.is_empty());
739    }
740
741    #[test]
742    fn register_watch_target_is_idempotent_and_seeds_roster() {
743        let state = BrokerState::new(None);
744        let target = WatchTarget {
745            agent_id: "feat-hot".to_string(),
746            cli: "claude".to_string(),
747            worktree_path: PathBuf::from("/tmp/feat-hot"),
748        };
749        // First registration is fresh — the caller should spawn a watcher.
750        assert!(
751            state.register_watch_target(&target),
752            "first registration must return true"
753        );
754        // Re-registering the same path is a no-op — no duplicate watcher.
755        assert!(
756            !state.register_watch_target(&target),
757            "duplicate registration must return false"
758        );
759        let inner = state.read();
760        assert_eq!(inner.watched_paths.len(), 1, "path recorded exactly once");
761        assert_eq!(
762            inner.agent_clis.get("feat-hot").map(String::as_str),
763            Some("claude"),
764            "registration seeds the CLI label"
765        );
766        assert!(
767            inner.queues.contains_key("feat-hot"),
768            "registration seeds the inbox queue"
769        );
770    }
771
772    #[test]
773    fn forget_watch_target_allows_re_registration() {
774        let state = BrokerState::new(None);
775        let target = WatchTarget {
776            agent_id: "feat-hot".to_string(),
777            cli: "claude".to_string(),
778            worktree_path: PathBuf::from("/tmp/feat-hot"),
779        };
780        assert!(state.register_watch_target(&target));
781        state.forget_watch_target(&target.worktree_path);
782        assert!(
783            state.register_watch_target(&target),
784            "after forgetting, the same path registers fresh again"
785        );
786    }
787
788    #[test]
789    fn next_seq_starts_at_one() {
790        let state = BrokerState::new(None);
791        assert_eq!(state.next_seq(), 1);
792        assert_eq!(state.next_seq(), 2);
793        assert_eq!(state.next_seq(), 3);
794    }
795
796    #[test]
797    fn probe_no_listener() {
798        // Use a port that is almost certainly not in use.
799        let result = probe_existing_broker("http://127.0.0.1:19999");
800        assert_eq!(result, ProbeResult::NoListener);
801    }
802
803    #[test]
804    fn reattached_handle_has_no_runtime() {
805        let state = Arc::new(BrokerState::new(None));
806        let h = BrokerHandle::reattached("http://127.0.0.1:9119".into(), state);
807        assert!(h.runtime.is_none());
808        assert!(h.shutdown_tx.is_none());
809        assert!(h.flush_thread.is_none());
810    }
811
812    #[test]
813    fn start_broker_on_free_port() {
814        let config = BrokerConfig {
815            enabled: true,
816            // Use a high random port to avoid conflicts.
817            #[allow(clippy::cast_possible_truncation)]
818            port: 19_000 + (std::process::id() as u16 % 1000),
819            bind: "127.0.0.1".to_string(),
820            ..Default::default()
821        };
822        let state = BrokerState::new(None);
823        let handle = start_broker(&config, state, Vec::new());
824        // If the port happens to be in use, the test is inconclusive — not a failure.
825        if let Ok(h) = handle {
826            assert!(h.url.contains(&config.port.to_string()));
827            drop(h);
828        }
829    }
830
831    #[test]
832    fn start_broker_no_log_path_no_flush_thread() {
833        let config = BrokerConfig {
834            enabled: true,
835            #[allow(clippy::cast_possible_truncation)]
836            port: 19_100 + (std::process::id() as u16 % 100),
837            bind: "127.0.0.1".to_string(),
838            ..Default::default()
839        };
840        let state = BrokerState::new(None);
841        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
842            assert!(handle.flush_thread.is_none());
843            drop(handle);
844        }
845    }
846
847    #[test]
848    fn start_broker_with_log_path_spawns_flush_thread() {
849        let tmp = tempfile::tempdir().unwrap();
850        let log_path = tmp.path().join("broker.log");
851        let config = BrokerConfig {
852            enabled: true,
853            #[allow(clippy::cast_possible_truncation)]
854            port: 19_200 + (std::process::id() as u16 % 100),
855            bind: "127.0.0.1".to_string(),
856            ..Default::default()
857        };
858        let state = BrokerState::new(Some(log_path));
859        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
860            assert!(handle.flush_thread.is_some());
861            drop(handle);
862        }
863    }
864
865    // === agent-learning-variant: dual-output through the real publish path ===
866
867    fn conflict_feedback(target: &str, other: &str) -> BrokerMessage {
868        BrokerMessage::Feedback {
869            agent_id: target.to_string(),
870            payload: messages::FeedbackPayload {
871                from: "supervisor".to_string(),
872                errors: vec![format!(
873                    "[conflict-detector] in-flight conflict with {other} on src/a.rs"
874                )],
875            },
876        }
877    }
878
879    fn learning_payloads(state: &Arc<BrokerState>) -> Vec<messages::LearningPayload> {
880        state
881            .read()
882            .message_log
883            .iter()
884            .filter_map(|(_, _, m)| match m {
885                BrokerMessage::Learning { payload } => Some(payload.clone()),
886                _ => None,
887            })
888            .collect()
889    }
890
891    /// Drives one aggregator tick (flush + publish-pending) the way the flush
892    /// loop does, against an attached aggregator.
893    fn tick(state: &Arc<BrokerState>) {
894        let aggregator = state.learnings.clone().expect("aggregator attached");
895        if let Ok(mut a) = aggregator.lock() {
896            a.flush().unwrap();
897        }
898        publish_pending_learnings(state, &aggregator);
899    }
900
901    fn state_with_aggregator(path: PathBuf, broker_publish: bool) -> Arc<BrokerState> {
902        let mut agg = learnings::LearningsAggregator::new(path);
903        agg.set_broker_publish(broker_publish);
904        agg.register_agent("feat-x");
905        agg.register_agent("feat-y");
906        let mut state = BrokerState::new(None);
907        state.attach_learnings(Arc::new(std::sync::Mutex::new(agg)));
908        Arc::new(state)
909    }
910
911    // Task 7.1: broker on — a conflict scenario yields an `agent.learning`
912    // record on the broker AND a matching entry in the learnings file.
913    #[test]
914    fn dual_output_publishes_learning_and_writes_file_when_broker_on() {
915        let tmp = tempfile::tempdir().unwrap();
916        let path = tmp.path().join("session-learnings.md");
917        let state = state_with_aggregator(path.clone(), true);
918
919        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
920        tick(&state);
921
922        let md = std::fs::read_to_string(&path).unwrap();
923        assert!(
924            md.contains("### Conflict events"),
925            "file missing conflict:\n{md}"
926        );
927
928        let learnings = learning_payloads(&state);
929        assert_eq!(learnings.len(), 1, "expected one agent.learning record");
930        assert_eq!(learnings[0].category, "conflict_event");
931        assert_eq!(learnings[0].id.len(), 16);
932        assert!(
933            md.contains(&learnings[0].title),
934            "file title must match broker record title: {}",
935            learnings[0].title
936        );
937    }
938
939    // Task 7.2: broker off — same scenario writes the file but attempts no
940    // broker publish.
941    #[test]
942    fn no_broker_publish_when_disabled_but_file_still_written() {
943        let tmp = tempfile::tempdir().unwrap();
944        let path = tmp.path().join("session-learnings.md");
945        let state = state_with_aggregator(path.clone(), false);
946
947        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
948        tick(&state);
949
950        let md = std::fs::read_to_string(&path).unwrap();
951        assert!(md.contains("### Conflict events"));
952        assert!(
953            learning_payloads(&state).is_empty(),
954            "no agent.learning record should be published when broker publish is off"
955        );
956    }
957
958    // Task 7.4: re-ticking after the queue is drained does not re-publish —
959    // each record reaches the broker exactly once.
960    #[test]
961    fn re_ticking_does_not_duplicate_learning_records() {
962        let tmp = tempfile::tempdir().unwrap();
963        let path = tmp.path().join("session-learnings.md");
964        let state = state_with_aggregator(path, true);
965
966        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
967        tick(&state);
968        tick(&state); // second tick: queue already drained, nothing new flushed
969
970        assert_eq!(
971            learning_payloads(&state).len(),
972            1,
973            "the conflict record must be published exactly once"
974        );
975    }
976
977    // The branch-scoped record is retrievable via `messages/<branch_id>`.
978    #[test]
979    fn branch_scoped_learning_is_routed_to_branch_inbox() {
980        let tmp = tempfile::tempdir().unwrap();
981        let path = tmp.path().join("session-learnings.md");
982        let state = state_with_aggregator(path, true);
983
984        // A stuck-duration scenario is branch-scoped to "feat-x".
985        delivery::publish_message(
986            &state,
987            &BrokerMessage::Blocked {
988                agent_id: "feat-x".to_string(),
989                payload: messages::BlockedPayload {
990                    needs: "types".to_string(),
991                    from: "feat-y".to_string(),
992                },
993            },
994        );
995        delivery::publish_message(
996            &state,
997            &BrokerMessage::Artifact {
998                agent_id: "feat-x".to_string(),
999                payload: messages::ArtifactPayload {
1000                    status: "done".to_string(),
1001                    exports: vec![],
1002                    modified_files: vec![],
1003                },
1004            },
1005        );
1006        tick(&state);
1007
1008        let (msgs, _) = delivery::poll_messages(&state, "feat-x", 0);
1009        assert!(
1010            msgs.iter().any(|m| matches!(
1011                m,
1012                BrokerMessage::Learning { payload } if payload.category == "stuck_duration"
1013            )),
1014            "stuck_duration learning should land in feat-x's inbox"
1015        );
1016    }
1017}