Skip to main content

git_paw/broker/
mod.rs

//! HTTP broker for agent coordination.
//!
//! Provides an HTTP server that agents use to publish messages, poll for
//! incoming messages, and report status. The broker runs on a background
//! tokio runtime and is managed through [`BrokerHandle`].
//!
//! # Lock discipline
//!
//! [`BrokerState`] wraps its inner state in an `RwLock`. **Guards MUST NOT be
//! held across `.await` boundaries.** The `clippy::await_holding_lock` lint is
//! enabled project-wide to catch violations at compile time. Use the
//! `read()` / `write()` methods to obtain guards inside synchronous closures
//! only.
14
15pub mod conflict;
16pub mod delivery;
17pub mod learnings;
18pub mod messages;
19pub mod publish;
20pub mod server;
21pub mod watcher;
22
23use std::collections::{HashMap, HashSet};
24use std::path::PathBuf;
25use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
26use std::sync::{Arc, OnceLock, RwLock};
27use std::thread::JoinHandle;
28use std::time::Instant;
29
30use serde::Serialize;
31
32use crate::config::{BrokerConfig, ConflictConfig};
33pub use messages::BrokerMessage;
34
/// A worktree whose git status the broker monitors.
///
/// One [`watcher::watch_worktree`] task is spawned for each target.
#[derive(Debug, Clone)]
pub struct WatchTarget {
    /// Identifier of the owning agent (the slugified branch name).
    pub agent_id: String,
    /// Name of the CLI running in the agent's pane, e.g. `"claude"`.
    pub cli: String,
    /// Absolute path of the worktree's root directory.
    pub worktree_path: PathBuf,
}
47
48/// Record of a known agent's latest state.
49#[derive(Debug, Clone)]
50pub struct AgentRecord {
51    /// Agent identifier (slugified branch name).
52    pub agent_id: String,
53    /// Last reported status label.
54    pub status: String,
55    /// When the agent last published a message.
56    pub last_seen: Instant,
57    /// The most recent message from this agent.
58    pub last_message: Option<BrokerMessage>,
59    /// When the agent most recently published an `agent.artifact status:
60    /// "committed"` event, if ever.
61    ///
62    /// Bug 8 (`auto-approve-scope-v0-6-x`): the filesystem watcher consults
63    /// this to decide whether a subsequent file modification should
64    /// re-publish `working` (within the configured TTL window). Transient —
65    /// not serialized; reset only when overwritten by a newer committed event.
66    pub last_committed_at: Option<Instant>,
67}
68
69/// JSON-serializable snapshot of an agent's status for the `/status` endpoint
70/// and the dashboard TUI.
71#[derive(Debug, Clone, Serialize)]
72pub struct AgentStatusEntry {
73    /// Agent identifier (slugified branch name).
74    pub agent_id: String,
75    /// CLI name running in this agent's pane (e.g. "claude").
76    pub cli: String,
77    /// Current status label (e.g. "working", "done", "blocked").
78    pub status: String,
79    /// Seconds since the agent was last seen.
80    pub last_seen_seconds: u64,
81    /// When the agent was last seen (for age calculations in the dashboard).
82    #[serde(skip)]
83    pub last_seen: Instant,
84    /// Most recently published `payload.phase` on an `agent.status`, if any.
85    /// The dashboard prefers this label over the message-type-derived
86    /// `status_label()` when rendering the agent's row.
87    #[serde(default, skip_serializing_if = "Option::is_none")]
88    pub phase: Option<String>,
89}
90
91/// Mutable broker state protected by an `RwLock`.
92#[derive(Debug)]
93pub struct BrokerStateInner {
94    /// Known agents keyed by agent ID.
95    pub agents: HashMap<String, AgentRecord>,
96    /// CLI label per agent, populated from [`WatchTarget`] at broker start.
97    pub agent_clis: HashMap<String, String>,
98    /// Per-agent message inboxes: `(sequence_number, message)`.
99    pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
100    /// Append-only message log for disk flush.
101    pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
102    /// Post-commit re-entry TTL for the filesystem watcher (bug 8).
103    ///
104    /// A file modification observed within this window after an agent's
105    /// `committed` event re-publishes `working`. `Duration::ZERO` disables
106    /// the behaviour (v0.5.0 model). Defaults to 60s; overwritten from
107    /// `[broker.watcher].republish_working_ttl_seconds` at broker start.
108    pub republish_working_ttl: std::time::Duration,
109    /// Worktree paths the broker is currently watching, for idempotent live
110    /// registration via `POST /watch`. Seeded with the start-time watch
111    /// targets in [`start_broker_with`] and extended at runtime by
112    /// [`BrokerState::register_watch_target`]. A vanished worktree is pruned
113    /// by its watcher task (see [`watcher::watch_worktree`]), so re-adding the
114    /// same path later spawns a fresh watcher.
115    pub watched_paths: HashSet<PathBuf>,
116}
117
118/// Shared broker state.
119///
120/// Wraps [`BrokerStateInner`] in an `RwLock` for concurrent read access.
121/// The sequence counter is a standalone [`AtomicU64`] outside the lock so
122/// that sequence numbers can be allocated without coupling to the write
123/// lock.
124#[derive(Debug)]
125pub struct BrokerState {
126    /// Protected mutable state.
127    inner: RwLock<BrokerStateInner>,
128    /// Global sequence counter (starts at 0; first assigned value is 1).
129    next_seq: AtomicU64,
130    /// Optional path for periodic log flush to disk.
131    pub log_path: Option<PathBuf>,
132    /// Wall-clock instant the broker state was created; used for uptime reporting.
133    started_at: Instant,
134    /// Optional learnings aggregator. Populated when supervisor + learnings
135    /// mode is active; the publish path forwards every observed message.
136    pub learnings: Option<learnings::SharedLearnings>,
137    /// When `true`, an `agent.artifact { status: "committed" }` triggers a
138    /// broker-emitted [`BrokerMessage::VerifyNow`] nudge to the supervisor
139    /// inbox so per-commit verification fires on an explicit event. Resolved
140    /// from `[supervisor].verify_on_commit_nudge` (default `true`) at session
141    /// boot and threaded in via [`BrokerState::with_verify_on_commit_nudge`].
142    pub verify_on_commit_nudge: bool,
143    /// opsx role-gating context. When `Some` and active (`OpenSpec` engine +
144    /// non-`off` mode), the publish path runs the role-gating guard on every
145    /// `agent.artifact { status: "committed" }`. `None` (the default, and
146    /// non-OpenSpec sessions) leaves the guard inert. Threaded in via
147    /// [`BrokerState::with_role_gating`].
148    pub role_gating: Option<crate::opsx::RoleGatingContext>,
149    /// Watcher shutdown receiver, populated once in [`start_broker_with`]
150    /// before the start-time watchers spawn. The `POST /watch` handler clones
151    /// it to enroll a live-registered watcher in the same shared shutdown
152    /// signal, so a hot-added watcher stops in lockstep with the rest on
153    /// [`BrokerHandle`] drop. `None` until the broker has spawned its watchers
154    /// (e.g. in router-only unit tests), in which case live registration still
155    /// records the target but spawns no watcher.
156    watcher_shutdown_rx: OnceLock<tokio::sync::watch::Receiver<bool>>,
157}
158
159impl BrokerState {
160    /// Creates a new empty broker state.
161    pub fn new(log_path: Option<PathBuf>) -> Self {
162        Self {
163            inner: RwLock::new(BrokerStateInner {
164                agents: HashMap::new(),
165                agent_clis: HashMap::new(),
166                queues: HashMap::new(),
167                message_log: Vec::new(),
168                republish_working_ttl: std::time::Duration::from_secs(
169                    crate::config::WatcherConfig::DEFAULT_REPUBLISH_TTL_SECONDS,
170                ),
171                watched_paths: HashSet::new(),
172            }),
173            next_seq: AtomicU64::new(0),
174            log_path,
175            started_at: Instant::now(),
176            learnings: None,
177            // Conservative broker-level default: the nudge is opt-in at the
178            // state level and explicitly enabled from config (whose own
179            // default is `true`) via `with_verify_on_commit_nudge`.
180            verify_on_commit_nudge: false,
181            role_gating: None,
182            watcher_shutdown_rx: OnceLock::new(),
183        }
184    }
185
186    /// Attaches the opsx role-gating context. Call before [`start_broker`] so
187    /// the publish path observes it from the first committed artifact. Passing
188    /// a context whose mode is `off` or whose engine is not `OpenSpec` leaves
189    /// the guard inert (the guard re-checks [`crate::opsx::RoleGatingContext::is_active`]).
190    #[must_use]
191    pub fn with_role_gating(mut self, ctx: crate::opsx::RoleGatingContext) -> Self {
192        self.role_gating = Some(ctx);
193        self
194    }
195
196    /// Sets whether committed artifacts emit a [`BrokerMessage::VerifyNow`]
197    /// nudge to the supervisor inbox. Call before [`start_broker`] so the
198    /// publish path observes the resolved value from the first message.
199    #[must_use]
200    pub fn with_verify_on_commit_nudge(mut self, enabled: bool) -> Self {
201        self.verify_on_commit_nudge = enabled;
202        self
203    }
204
205    /// Authoritatively seeds an agent's CLI into the roster's CLI map.
206    ///
207    /// git-paw knows every pane's CLI at launch (the supervisor's from
208    /// `[supervisor].cli`/`default_cli`, each agent's from the resolved
209    /// agent CLI), so the launcher seeds those known values rather than
210    /// depending on the agent to self-report via `agent.status`. This is
211    /// the only source for the supervisor's CLI column, since the
212    /// supervisor is not a filesystem watch target (W15-15). A blank `cli`
213    /// is ignored so a missing config value never clobbers the map. Call
214    /// before [`start_broker`] so the first `/status` snapshot is correct.
215    #[must_use]
216    pub fn with_seeded_cli(self, agent_id: &str, cli: &str) -> Self {
217        if !cli.is_empty()
218            && let Ok(mut inner) = self.inner.write()
219        {
220            inner
221                .agent_clis
222                .insert(agent_id.to_string(), cli.to_string());
223        }
224        self
225    }
226
227    /// Attaches a learnings aggregator. Replaces any previously attached
228    /// instance. Must be called before [`start_broker`] so the publish path
229    /// observes every message from the first one.
230    pub fn attach_learnings(&mut self, aggregator: learnings::SharedLearnings) {
231        self.learnings = Some(aggregator);
232    }
233
234    /// Sets the post-commit re-entry TTL consulted by the filesystem watcher
235    /// and the `committed -> working` transition in `update_agent_record`
236    /// (bug 8). `Duration::ZERO` disables the auto-republish behaviour.
237    pub fn set_republish_working_ttl(&self, ttl: std::time::Duration) {
238        self.write().republish_working_ttl = ttl;
239    }
240
241    /// Records the shared watcher shutdown receiver so the `POST /watch`
242    /// handler can clone it when spawning a live-registered watcher. Called
243    /// once in [`start_broker_with`] before the start-time watchers spawn;
244    /// subsequent calls are ignored (the receiver is set-once).
245    pub fn set_watcher_shutdown_rx(&self, rx: tokio::sync::watch::Receiver<bool>) {
246        let _ = self.watcher_shutdown_rx.set(rx);
247    }
248
249    /// Returns a clone of the shared watcher shutdown receiver if the broker
250    /// has spawned its watchers, or `None` otherwise (e.g. router-only unit
251    /// tests). The `POST /watch` handler uses this to enroll a hot-added
252    /// watcher in the shared shutdown signal.
253    #[must_use]
254    pub fn watcher_shutdown_rx(&self) -> Option<tokio::sync::watch::Receiver<bool>> {
255        self.watcher_shutdown_rx.get().cloned()
256    }
257
258    /// Registers a worktree as a live watch target for `POST /watch`.
259    ///
260    /// Bookkeeping only — the caller spawns the actual [`watcher::watch_worktree`]
261    /// task when this returns `true`. Idempotent: the worktree path is the key,
262    /// so re-registering an already-watched path returns `false` and the caller
263    /// spawns nothing (no duplicate watcher). On a fresh path it seeds the
264    /// agent's CLI label and inbox queue exactly as the start-time targets are
265    /// seeded in [`start_broker_with`], so the agent surfaces in `/status` and
266    /// receives peer broadcasts on the same terms.
267    pub fn register_watch_target(&self, target: &WatchTarget) -> bool {
268        let mut inner = self.write();
269        if !inner.watched_paths.insert(target.worktree_path.clone()) {
270            return false;
271        }
272        if !target.cli.is_empty() {
273            inner
274                .agent_clis
275                .insert(target.agent_id.clone(), target.cli.clone());
276        }
277        inner.queues.entry(target.agent_id.clone()).or_default();
278        true
279    }
280
281    /// Drops a worktree from the live watch-target set so a later
282    /// re-registration of the same path spawns a fresh watcher. Called by a
283    /// watcher task when it detects its worktree has disappeared (the prune
284    /// path for `git paw remove`).
285    pub fn forget_watch_target(&self, worktree_path: &std::path::Path) {
286        self.write().watched_paths.remove(worktree_path);
287    }
288
289    /// Acquires a read lock on the inner state.
290    ///
291    /// # Panics
292    ///
293    /// Panics if the lock is poisoned (a thread panicked while holding it).
294    pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
295        self.inner.read().expect("broker state lock poisoned")
296    }
297
298    /// Acquires a write lock on the inner state.
299    ///
300    /// # Panics
301    ///
302    /// Panics if the lock is poisoned (a thread panicked while holding it).
303    pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
304        self.inner.write().expect("broker state lock poisoned")
305    }
306
307    /// Atomically allocates the next sequence number (starting at 1).
308    pub fn next_seq(&self) -> u64 {
309        self.next_seq.fetch_add(1, Ordering::Relaxed) + 1
310    }
311
312    /// Returns the number of seconds since the broker was started.
313    ///
314    /// Used by the HTTP `/status` handler to report uptime.
315    pub fn uptime_seconds(&self) -> u64 {
316        self.started_at.elapsed().as_secs()
317    }
318}
319
320/// Errors specific to broker operations.
321#[derive(Debug, thiserror::Error)]
322pub enum BrokerError {
323    /// The configured port is already in use by a non-broker process.
324    #[error(
325        "port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
326    )]
327    PortInUse {
328        /// The port that was occupied.
329        port: u16,
330        /// The underlying I/O error.
331        source: std::io::Error,
332    },
333
334    /// A probe to an existing listener on the port timed out.
335    #[error("broker probe timed out on port {port} — check for stuck processes on this port")]
336    ProbeTimeout {
337        /// The port that timed out.
338        port: u16,
339    },
340
341    /// Binding to the address failed.
342    #[error("failed to bind broker: {0}")]
343    BindFailed(std::io::Error),
344
345    /// Creating the tokio runtime failed.
346    #[error("failed to create broker runtime: {0}")]
347    RuntimeFailed(std::io::Error),
348}
349
350/// Handle to a running broker, including the optional flush thread.
351///
352/// When dropped, signals the flush thread to stop and joins it, then
353/// shuts down the tokio runtime. If the handle is in "reattached" mode
354/// (connected to an existing broker), dropping it is a no-op.
355pub struct BrokerHandle {
356    /// Shared broker state.
357    pub state: Arc<BrokerState>,
358    /// The tokio runtime powering the broker server.
359    /// `None` when reattached to an existing broker.
360    runtime: Option<tokio::runtime::Runtime>,
361    /// Sends a shutdown signal to the server task.
362    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
363    /// Broadcasts the watcher shutdown signal to all watcher tasks.
364    watcher_shutdown: Option<tokio::sync::watch::Sender<bool>>,
365    /// The URL the broker is listening on.
366    pub url: String,
367    /// Flag to signal the flush thread to exit.
368    stop_flag: Arc<AtomicBool>,
369    /// Flush thread join handle (present only when `log_path` is `Some`).
370    flush_thread: Option<JoinHandle<()>>,
371    /// Periodic flush thread for the learnings aggregator (present only
372    /// when `state.learnings.is_some()`).
373    learnings_thread: Option<JoinHandle<()>>,
374}
375
376impl BrokerHandle {
377    /// Creates a handle that reattaches to an existing broker (no owned runtime).
378    fn reattached(url: String, state: Arc<BrokerState>) -> Self {
379        Self {
380            state,
381            runtime: None,
382            shutdown_tx: None,
383            watcher_shutdown: None,
384            url,
385            stop_flag: Arc::new(AtomicBool::new(false)),
386            flush_thread: None,
387            learnings_thread: None,
388        }
389    }
390}
391
392impl Drop for BrokerHandle {
393    fn drop(&mut self) {
394        // 1. Signal both flush threads to stop and join the log flush thread.
395        self.stop_flag.store(true, Ordering::Release);
396        if let Some(handle) = self.flush_thread.take() {
397            let _ = handle.join();
398        }
399        // 2. Join the learnings flush thread — it performs the final
400        //    shutdown flush before returning.
401        if let Some(handle) = self.learnings_thread.take() {
402            let _ = handle.join();
403        }
404        // 3. Signal watcher tasks to stop.
405        if let Some(tx) = self.watcher_shutdown.take() {
406            let _ = tx.send(true);
407        }
408        // 4. Signal shutdown to the server task.
409        if let Some(tx) = self.shutdown_tx.take() {
410            let _ = tx.send(());
411        }
412        // 5. Give in-flight requests up to 2 seconds to drain, then drop runtime.
413        if let Some(rt) = self.runtime.take() {
414            rt.shutdown_timeout(std::time::Duration::from_secs(2));
415        }
416    }
417}
418
/// What a probe of the broker port found.
#[derive(Debug, PartialEq, Eq)]
pub enum ProbeResult {
    /// The port is free — binding is safe.
    NoListener,
    /// A git-paw broker already answers on the port.
    LiveBroker,
    /// The port is taken by something that is not a git-paw broker.
    ForeignServer,
    /// The probe did not get an answer in time.
    Timeout,
}
431
432/// Probes an existing listener at the given URL to determine what is running.
433///
434/// Uses a lightweight `TcpStream` with a manual HTTP/1.1 GET to `/status`
435/// to avoid pulling in a full HTTP client dependency.
436/// Probes a URL to determine what broker (if any) is running there.
437///
438/// Public entry point for callers that need to inspect broker status without
439/// starting a new server (e.g. the `status` subcommand).
440pub fn probe_broker(url: &str) -> ProbeResult {
441    probe_existing_broker(url)
442}
443
444fn probe_existing_broker(url: &str) -> ProbeResult {
445    use std::io::{Read, Write};
446    use std::net::TcpStream;
447    use std::time::Duration;
448
449    // Parse host:port from URL like "http://127.0.0.1:9119"
450    let addr = url.strip_prefix("http://").unwrap_or(url);
451
452    let socket_addr = if let Ok(a) = addr.parse() {
453        a
454    } else {
455        use std::net::ToSocketAddrs;
456        match addr.to_socket_addrs() {
457            Ok(mut addrs) => match addrs.next() {
458                Some(a) => a,
459                None => return ProbeResult::NoListener,
460            },
461            Err(_) => return ProbeResult::NoListener,
462        }
463    };
464
465    let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, Duration::from_millis(500))
466    else {
467        return ProbeResult::NoListener;
468    };
469
470    stream
471        .set_read_timeout(Some(Duration::from_millis(500)))
472        .ok();
473    stream
474        .set_write_timeout(Some(Duration::from_millis(500)))
475        .ok();
476
477    let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
478    if stream.write_all(request.as_bytes()).is_err() {
479        return ProbeResult::Timeout;
480    }
481
482    let mut response = String::new();
483    if stream.read_to_string(&mut response).is_err() && response.is_empty() {
484        return ProbeResult::Timeout;
485    }
486
487    if response.contains("\"git_paw\":true") || response.contains("\"git_paw\": true") {
488        ProbeResult::LiveBroker
489    } else if response.starts_with("HTTP/") {
490        ProbeResult::ForeignServer
491    } else {
492        ProbeResult::Timeout
493    }
494}
495
496/// Starts the HTTP broker server.
497///
498/// Probes the configured port first:
499/// - If a live git-paw broker is found, returns a reattached handle.
500/// - If a foreign server occupies the port, returns [`BrokerError::PortInUse`].
501/// - If the probe times out, returns [`BrokerError::ProbeTimeout`].
502/// - If nothing is listening, binds and starts the server.
503///
504/// Also spawns the background flush thread if `state.log_path` is set.
505///
506/// Calls [`start_broker`] without a conflict-detector. Equivalent to
507/// `start_broker_with(config, state, watch_targets, None)`.
508pub fn start_broker(
509    config: &BrokerConfig,
510    state: BrokerState,
511    watch_targets: Vec<WatchTarget>,
512) -> Result<BrokerHandle, BrokerError> {
513    start_broker_with(config, state, watch_targets, None, 60)
514}
515
516/// Starts the HTTP broker server with optional conflict-detector wiring
517/// and a configurable learnings-flush interval.
518///
519/// When `conflict` is `Some`, a background tokio task running the
520/// detector loop is spawned alongside the watcher tasks. The detector
521/// shuts down with the rest of the broker when [`BrokerHandle`] is
522/// dropped.
523///
524/// `learnings_flush_interval_seconds` controls how often the learnings
525/// aggregator flushes to `.git-paw/session-learnings.md` when
526/// `state.learnings` is `Some`. Default for [`start_broker`] is 60s;
527/// tests override to drive flush behaviour without waiting on real time.
528#[allow(clippy::too_many_lines)]
529pub fn start_broker_with(
530    config: &BrokerConfig,
531    state: BrokerState,
532    watch_targets: Vec<WatchTarget>,
533    conflict: Option<ConflictConfig>,
534    learnings_flush_interval_seconds: u64,
535) -> Result<BrokerHandle, BrokerError> {
536    let url = config.url();
537    let state = Arc::new(state);
538    // Apply the configured post-commit re-entry TTL (bug 8) before any
539    // watcher task or publish observes the state.
540    state.set_republish_working_ttl(std::time::Duration::from_secs(
541        config.watcher.republish_working_ttl_seconds(),
542    ));
543    let stop_flag = Arc::new(AtomicBool::new(false));
544
545    match probe_existing_broker(&url) {
546        ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
547        ProbeResult::ForeignServer => {
548            return Err(BrokerError::PortInUse {
549                port: config.port,
550                source: std::io::Error::new(
551                    std::io::ErrorKind::AddrInUse,
552                    "port occupied by non-broker process",
553                ),
554            });
555        }
556        ProbeResult::Timeout => {
557            return Err(BrokerError::ProbeTimeout { port: config.port });
558        }
559        ProbeResult::NoListener => {}
560    }
561
562    // Spawn flush thread if log_path is configured.
563    let flush_thread = if state.log_path.is_some() {
564        let s = Arc::clone(&state);
565        let f = Arc::clone(&stop_flag);
566        Some(std::thread::spawn(move || {
567            delivery::flush_loop(&s, &f);
568        }))
569    } else {
570        None
571    };
572
573    // Spawn learnings flush thread when an aggregator is attached. The
574    // thread performs a final `flush_at_shutdown` after the stop flag is
575    // raised so unresolved blocks and recovery cycles end up in the file.
576    let learnings_thread = if state.learnings.is_some() {
577        let s = Arc::clone(&state);
578        let f = Arc::clone(&stop_flag);
579        Some(std::thread::spawn(move || {
580            learnings_flush_loop(&s, &f, learnings_flush_interval_seconds);
581        }))
582    } else {
583        None
584    };
585
586    let runtime = tokio::runtime::Builder::new_multi_thread()
587        .enable_all()
588        .build()
589        .map_err(BrokerError::RuntimeFailed)?;
590
591    let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
592        |e: std::net::AddrParseError| {
593            BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
594        },
595    )?;
596
597    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
598
599    let router = server::router(Arc::clone(&state));
600
601    let listener = runtime.block_on(async {
602        let socket = tokio::net::TcpSocket::new_v4().map_err(BrokerError::BindFailed)?;
603        socket
604            .set_reuseaddr(true)
605            .map_err(BrokerError::BindFailed)?;
606        socket.bind(addr).map_err(BrokerError::BindFailed)?;
607        socket.listen(1024).map_err(BrokerError::BindFailed)
608    })?;
609
610    // Install SIGINT handler so the broker does not die on Ctrl+C.
611    // The dashboard process is responsible for user-facing Ctrl+C handling.
612    runtime.spawn(async {
613        let _ = tokio::signal::ctrl_c().await;
614    });
615
616    runtime.spawn(async move {
617        axum::serve(listener, router)
618            .with_graceful_shutdown(async {
619                let _ = shutdown_rx.await;
620            })
621            .await
622            .ok();
623    });
624
625    // Pre-populate the CLI label AND the inbox queue for every watched
626    // agent so (a) the dashboard shows the CLI before any status messages
627    // arrive, and (b) peer `agent.artifact` broadcasts — which only target
628    // already-existing queues — actually reach the watched agent even
629    // before it has published anything itself.
630    {
631        let mut inner = state.write();
632        for target in &watch_targets {
633            inner
634                .agent_clis
635                .insert(target.agent_id.clone(), target.cli.clone());
636            inner.queues.entry(target.agent_id.clone()).or_default();
637            // Seed the live target set so a `POST /watch` for a start-time path is a no-op.
638            inner.watched_paths.insert(target.worktree_path.clone());
639        }
640    }
641
642    // Spawn one watcher task per target. All watchers share a single
643    // `tokio::sync::watch` channel; flipping it to `true` on drop signals
644    // every watcher to exit on its next tick. The conflict detector
645    // shares the same shutdown channel so it stops in lockstep.
646    let (watcher_tx, watcher_rx) = tokio::sync::watch::channel(false);
647    // Publish the shutdown receiver so `POST /watch` can enroll live watchers.
648    state.set_watcher_shutdown_rx(watcher_rx.clone());
649    for target in watch_targets {
650        let s = Arc::clone(&state);
651        let rx = watcher_rx.clone();
652        runtime.spawn(watcher::watch_worktree(s, target, rx));
653    }
654    if let Some(conflict_cfg) = conflict {
655        let s = Arc::clone(&state);
656        let rx = watcher_rx.clone();
657        runtime.spawn(conflict::run_detector_loop(s, conflict_cfg, rx));
658    }
659
660    Ok(BrokerHandle {
661        state,
662        runtime: Some(runtime),
663        shutdown_tx: Some(shutdown_tx),
664        watcher_shutdown: Some(watcher_tx),
665        url,
666        stop_flag,
667        flush_thread,
668        learnings_thread,
669    })
670}
671
672/// Background loop driving the learnings aggregator's periodic flush.
673///
674/// Sleeps in small slices so it can react to the broker stop flag within
675/// ~100ms. When the stop flag is raised, it performs one final
676/// [`learnings::LearningsAggregator::flush_at_shutdown`] before exiting.
677fn learnings_flush_loop(
678    state: &Arc<BrokerState>,
679    stop: &Arc<AtomicBool>,
680    flush_interval_seconds: u64,
681) {
682    let Some(aggregator) = state.learnings.clone() else {
683        return;
684    };
685    let interval = std::time::Duration::from_secs(flush_interval_seconds.max(1));
686    let tick = std::time::Duration::from_millis(100);
687
688    loop {
689        let mut elapsed = std::time::Duration::ZERO;
690        while elapsed < interval {
691            if stop.load(Ordering::Acquire) {
692                if let Ok(mut agg) = aggregator.lock() {
693                    let _ = agg.flush_at_shutdown();
694                }
695                publish_pending_learnings(state, &aggregator);
696                return;
697            }
698            std::thread::sleep(tick);
699            elapsed += tick;
700        }
701        if let Ok(mut agg) = aggregator.lock() {
702            let _ = agg.flush();
703        }
704        publish_pending_learnings(state, &aggregator);
705    }
706}
707
708/// Drains the aggregator's pending broker records and publishes each as an
709/// `agent.learning` message.
710///
711/// Critically, the aggregator lock is acquired *only* to drain the queue and
712/// is released before any publish: `publish_message` re-enters the aggregator
713/// via `observe`, so publishing while holding the lock would deadlock the
714/// non-reentrant `Mutex`. When broker publish is disabled the queue is always
715/// empty, so this is a cheap no-op.
716fn publish_pending_learnings(state: &Arc<BrokerState>, aggregator: &learnings::SharedLearnings) {
717    let records = match aggregator.lock() {
718        Ok(mut agg) => agg.take_pending_publish(),
719        Err(_) => return,
720    };
721    for record in &records {
722        delivery::publish_message(state, &BrokerMessage::from(record));
723    }
724}
725
726#[cfg(test)]
727mod tests {
728    use super::*;
729
730    #[test]
731    fn broker_state_new_is_empty() {
732        let state = BrokerState::new(None);
733        let inner = state.read();
734        assert!(inner.agents.is_empty());
735        assert!(inner.queues.is_empty());
736        assert!(inner.message_log.is_empty());
737    }
738
739    #[test]
740    fn register_watch_target_is_idempotent_and_seeds_roster() {
741        let state = BrokerState::new(None);
742        let target = WatchTarget {
743            agent_id: "feat-hot".to_string(),
744            cli: "claude".to_string(),
745            worktree_path: PathBuf::from("/tmp/feat-hot"),
746        };
747        // First registration is fresh — the caller should spawn a watcher.
748        assert!(
749            state.register_watch_target(&target),
750            "first registration must return true"
751        );
752        // Re-registering the same path is a no-op — no duplicate watcher.
753        assert!(
754            !state.register_watch_target(&target),
755            "duplicate registration must return false"
756        );
757        let inner = state.read();
758        assert_eq!(inner.watched_paths.len(), 1, "path recorded exactly once");
759        assert_eq!(
760            inner.agent_clis.get("feat-hot").map(String::as_str),
761            Some("claude"),
762            "registration seeds the CLI label"
763        );
764        assert!(
765            inner.queues.contains_key("feat-hot"),
766            "registration seeds the inbox queue"
767        );
768    }
769
770    #[test]
771    fn forget_watch_target_allows_re_registration() {
772        let state = BrokerState::new(None);
773        let target = WatchTarget {
774            agent_id: "feat-hot".to_string(),
775            cli: "claude".to_string(),
776            worktree_path: PathBuf::from("/tmp/feat-hot"),
777        };
778        assert!(state.register_watch_target(&target));
779        state.forget_watch_target(&target.worktree_path);
780        assert!(
781            state.register_watch_target(&target),
782            "after forgetting, the same path registers fresh again"
783        );
784    }
785
786    #[test]
787    fn next_seq_starts_at_one() {
788        let state = BrokerState::new(None);
789        assert_eq!(state.next_seq(), 1);
790        assert_eq!(state.next_seq(), 2);
791        assert_eq!(state.next_seq(), 3);
792    }
793
794    #[test]
795    fn probe_no_listener() {
796        // Use a port that is almost certainly not in use.
797        let result = probe_existing_broker("http://127.0.0.1:19999");
798        assert_eq!(result, ProbeResult::NoListener);
799    }
800
801    #[test]
802    fn reattached_handle_has_no_runtime() {
803        let state = Arc::new(BrokerState::new(None));
804        let h = BrokerHandle::reattached("http://127.0.0.1:9119".into(), state);
805        assert!(h.runtime.is_none());
806        assert!(h.shutdown_tx.is_none());
807        assert!(h.flush_thread.is_none());
808    }
809
810    #[test]
811    fn start_broker_on_free_port() {
812        let config = BrokerConfig {
813            enabled: true,
814            // Use a high random port to avoid conflicts.
815            #[allow(clippy::cast_possible_truncation)]
816            port: 19_000 + (std::process::id() as u16 % 1000),
817            bind: "127.0.0.1".to_string(),
818            ..Default::default()
819        };
820        let state = BrokerState::new(None);
821        let handle = start_broker(&config, state, Vec::new());
822        // If the port happens to be in use, the test is inconclusive — not a failure.
823        if let Ok(h) = handle {
824            assert!(h.url.contains(&config.port.to_string()));
825            drop(h);
826        }
827    }
828
829    #[test]
830    fn start_broker_no_log_path_no_flush_thread() {
831        let config = BrokerConfig {
832            enabled: true,
833            #[allow(clippy::cast_possible_truncation)]
834            port: 19_100 + (std::process::id() as u16 % 100),
835            bind: "127.0.0.1".to_string(),
836            ..Default::default()
837        };
838        let state = BrokerState::new(None);
839        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
840            assert!(handle.flush_thread.is_none());
841            drop(handle);
842        }
843    }
844
845    #[test]
846    fn start_broker_with_log_path_spawns_flush_thread() {
847        let tmp = tempfile::tempdir().unwrap();
848        let log_path = tmp.path().join("broker.log");
849        let config = BrokerConfig {
850            enabled: true,
851            #[allow(clippy::cast_possible_truncation)]
852            port: 19_200 + (std::process::id() as u16 % 100),
853            bind: "127.0.0.1".to_string(),
854            ..Default::default()
855        };
856        let state = BrokerState::new(Some(log_path));
857        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
858            assert!(handle.flush_thread.is_some());
859            drop(handle);
860        }
861    }
862
863    // === agent-learning-variant: dual-output through the real publish path ===
864
865    fn conflict_feedback(target: &str, other: &str) -> BrokerMessage {
866        BrokerMessage::Feedback {
867            agent_id: target.to_string(),
868            payload: messages::FeedbackPayload {
869                from: "supervisor".to_string(),
870                errors: vec![format!(
871                    "[conflict-detector] in-flight conflict with {other} on src/a.rs"
872                )],
873            },
874        }
875    }
876
877    fn learning_payloads(state: &Arc<BrokerState>) -> Vec<messages::LearningPayload> {
878        state
879            .read()
880            .message_log
881            .iter()
882            .filter_map(|(_, _, m)| match m {
883                BrokerMessage::Learning { payload } => Some(payload.clone()),
884                _ => None,
885            })
886            .collect()
887    }
888
889    /// Drives one aggregator tick (flush + publish-pending) the way the flush
890    /// loop does, against an attached aggregator.
891    fn tick(state: &Arc<BrokerState>) {
892        let aggregator = state.learnings.clone().expect("aggregator attached");
893        if let Ok(mut a) = aggregator.lock() {
894            a.flush().unwrap();
895        }
896        publish_pending_learnings(state, &aggregator);
897    }
898
899    fn state_with_aggregator(path: PathBuf, broker_publish: bool) -> Arc<BrokerState> {
900        let mut agg = learnings::LearningsAggregator::new(path);
901        agg.set_broker_publish(broker_publish);
902        agg.register_agent("feat-x");
903        agg.register_agent("feat-y");
904        let mut state = BrokerState::new(None);
905        state.attach_learnings(Arc::new(std::sync::Mutex::new(agg)));
906        Arc::new(state)
907    }
908
909    // Task 7.1: broker on — a conflict scenario yields an `agent.learning`
910    // record on the broker AND a matching entry in the learnings file.
911    #[test]
912    fn dual_output_publishes_learning_and_writes_file_when_broker_on() {
913        let tmp = tempfile::tempdir().unwrap();
914        let path = tmp.path().join("session-learnings.md");
915        let state = state_with_aggregator(path.clone(), true);
916
917        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
918        tick(&state);
919
920        let md = std::fs::read_to_string(&path).unwrap();
921        assert!(
922            md.contains("### Conflict events"),
923            "file missing conflict:\n{md}"
924        );
925
926        let learnings = learning_payloads(&state);
927        assert_eq!(learnings.len(), 1, "expected one agent.learning record");
928        assert_eq!(learnings[0].category, "conflict_event");
929        assert_eq!(learnings[0].id.len(), 16);
930        assert!(
931            md.contains(&learnings[0].title),
932            "file title must match broker record title: {}",
933            learnings[0].title
934        );
935    }
936
937    // Task 7.2: broker off — same scenario writes the file but attempts no
938    // broker publish.
939    #[test]
940    fn no_broker_publish_when_disabled_but_file_still_written() {
941        let tmp = tempfile::tempdir().unwrap();
942        let path = tmp.path().join("session-learnings.md");
943        let state = state_with_aggregator(path.clone(), false);
944
945        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
946        tick(&state);
947
948        let md = std::fs::read_to_string(&path).unwrap();
949        assert!(md.contains("### Conflict events"));
950        assert!(
951            learning_payloads(&state).is_empty(),
952            "no agent.learning record should be published when broker publish is off"
953        );
954    }
955
956    // Task 7.4: re-ticking after the queue is drained does not re-publish —
957    // each record reaches the broker exactly once.
958    #[test]
959    fn re_ticking_does_not_duplicate_learning_records() {
960        let tmp = tempfile::tempdir().unwrap();
961        let path = tmp.path().join("session-learnings.md");
962        let state = state_with_aggregator(path, true);
963
964        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
965        tick(&state);
966        tick(&state); // second tick: queue already drained, nothing new flushed
967
968        assert_eq!(
969            learning_payloads(&state).len(),
970            1,
971            "the conflict record must be published exactly once"
972        );
973    }
974
975    // The branch-scoped record is retrievable via `messages/<branch_id>`.
976    #[test]
977    fn branch_scoped_learning_is_routed_to_branch_inbox() {
978        let tmp = tempfile::tempdir().unwrap();
979        let path = tmp.path().join("session-learnings.md");
980        let state = state_with_aggregator(path, true);
981
982        // A stuck-duration scenario is branch-scoped to "feat-x".
983        delivery::publish_message(
984            &state,
985            &BrokerMessage::Blocked {
986                agent_id: "feat-x".to_string(),
987                payload: messages::BlockedPayload {
988                    needs: "types".to_string(),
989                    from: "feat-y".to_string(),
990                },
991            },
992        );
993        delivery::publish_message(
994            &state,
995            &BrokerMessage::Artifact {
996                agent_id: "feat-x".to_string(),
997                payload: messages::ArtifactPayload {
998                    status: "done".to_string(),
999                    exports: vec![],
1000                    modified_files: vec![],
1001                },
1002            },
1003        );
1004        tick(&state);
1005
1006        let (msgs, _) = delivery::poll_messages(&state, "feat-x", 0);
1007        assert!(
1008            msgs.iter().any(|m| matches!(
1009                m,
1010                BrokerMessage::Learning { payload } if payload.category == "stuck_duration"
1011            )),
1012            "stuck_duration learning should land in feat-x's inbox"
1013        );
1014    }
1015}