Skip to main content

git_paw/broker/
mod.rs

1//! HTTP broker for agent coordination.
2//!
3//! Provides an HTTP server that agents use to publish messages, poll for
4//! incoming messages, and report status. The broker runs on a background
5//! tokio runtime and is managed through [`BrokerHandle`].
6//!
7//! # Lock discipline
8//!
9//! [`BrokerState`] wraps its inner state in an `RwLock`. **Guards MUST NOT be
10//! held across `.await` boundaries.** The `clippy::await_holding_lock` lint is
11//! enabled project-wide to catch violations at compile time. Use the
12//! `read()` / `write()` methods to obtain guards inside synchronous closures
13//! only.
14
15pub mod conflict;
16pub mod delivery;
17pub mod learnings;
18pub mod messages;
19pub mod publish;
20pub mod server;
21pub mod watcher;
22
23use std::collections::HashMap;
24use std::path::PathBuf;
25use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
26use std::sync::{Arc, RwLock};
27use std::thread::JoinHandle;
28use std::time::Instant;
29
30use serde::Serialize;
31
32use crate::config::{BrokerConfig, ConflictConfig};
33pub use messages::BrokerMessage;
34
/// A worktree whose git status the broker should monitor.
///
/// [`watcher::watch_worktree`] is spawned once for every target handed to
/// the broker at start-up.
#[derive(Debug, Clone)]
pub struct WatchTarget {
    /// Slugified branch name of the agent that owns the worktree.
    pub agent_id: String,
    /// Name of the CLI running in the agent's pane (for example `"claude"`).
    pub cli: String,
    /// Absolute filesystem path of the worktree root.
    pub worktree_path: PathBuf,
}
47
48/// Record of a known agent's latest state.
49#[derive(Debug, Clone)]
50pub struct AgentRecord {
51    /// Agent identifier (slugified branch name).
52    pub agent_id: String,
53    /// Last reported status label.
54    pub status: String,
55    /// When the agent last published a message.
56    pub last_seen: Instant,
57    /// The most recent message from this agent.
58    pub last_message: Option<BrokerMessage>,
59    /// When the agent most recently published an `agent.artifact status:
60    /// "committed"` event, if ever.
61    ///
62    /// Bug 8 (`auto-approve-scope-v0-6-x`): the filesystem watcher consults
63    /// this to decide whether a subsequent file modification should
64    /// re-publish `working` (within the configured TTL window). Transient —
65    /// not serialized; reset only when overwritten by a newer committed event.
66    pub last_committed_at: Option<Instant>,
67}
68
69/// JSON-serializable snapshot of an agent's status for the `/status` endpoint
70/// and the dashboard TUI.
71#[derive(Debug, Clone, Serialize)]
72pub struct AgentStatusEntry {
73    /// Agent identifier (slugified branch name).
74    pub agent_id: String,
75    /// CLI name running in this agent's pane (e.g. "claude").
76    pub cli: String,
77    /// Current status label (e.g. "working", "done", "blocked").
78    pub status: String,
79    /// Seconds since the agent was last seen.
80    pub last_seen_seconds: u64,
81    /// One-line summary from the last message.
82    pub summary: String,
83    /// When the agent was last seen (for age calculations in the dashboard).
84    #[serde(skip)]
85    pub last_seen: Instant,
86    /// Most recently published `payload.phase` on an `agent.status`, if any.
87    /// The dashboard prefers this label over the message-type-derived
88    /// `status_label()` when rendering the agent's row.
89    #[serde(default, skip_serializing_if = "Option::is_none")]
90    pub phase: Option<String>,
91}
92
93/// Mutable broker state protected by an `RwLock`.
94#[derive(Debug)]
95pub struct BrokerStateInner {
96    /// Known agents keyed by agent ID.
97    pub agents: HashMap<String, AgentRecord>,
98    /// CLI label per agent, populated from [`WatchTarget`] at broker start.
99    pub agent_clis: HashMap<String, String>,
100    /// Per-agent message inboxes: `(sequence_number, message)`.
101    pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
102    /// Append-only message log for disk flush.
103    pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
104    /// Post-commit re-entry TTL for the filesystem watcher (bug 8).
105    ///
106    /// A file modification observed within this window after an agent's
107    /// `committed` event re-publishes `working`. `Duration::ZERO` disables
108    /// the behaviour (v0.5.0 model). Defaults to 60s; overwritten from
109    /// `[broker.watcher].republish_working_ttl_seconds` at broker start.
110    pub republish_working_ttl: std::time::Duration,
111}
112
113/// Shared broker state.
114///
115/// Wraps [`BrokerStateInner`] in an `RwLock` for concurrent read access.
116/// The sequence counter is a standalone [`AtomicU64`] outside the lock so
117/// that sequence numbers can be allocated without coupling to the write
118/// lock.
119#[derive(Debug)]
120pub struct BrokerState {
121    /// Protected mutable state.
122    inner: RwLock<BrokerStateInner>,
123    /// Global sequence counter (starts at 0; first assigned value is 1).
124    next_seq: AtomicU64,
125    /// Optional path for periodic log flush to disk.
126    pub log_path: Option<PathBuf>,
127    /// Wall-clock instant the broker state was created; used for uptime reporting.
128    started_at: Instant,
129    /// Optional learnings aggregator. Populated when supervisor + learnings
130    /// mode is active; the publish path forwards every observed message.
131    pub learnings: Option<learnings::SharedLearnings>,
132    /// When `true`, an `agent.artifact { status: "committed" }` triggers a
133    /// broker-emitted [`BrokerMessage::VerifyNow`] nudge to the supervisor
134    /// inbox so per-commit verification fires on an explicit event. Resolved
135    /// from `[supervisor].verify_on_commit_nudge` (default `true`) at session
136    /// boot and threaded in via [`BrokerState::with_verify_on_commit_nudge`].
137    pub verify_on_commit_nudge: bool,
138    /// opsx role-gating context. When `Some` and active (`OpenSpec` engine +
139    /// non-`off` mode), the publish path runs the role-gating guard on every
140    /// `agent.artifact { status: "committed" }`. `None` (the default, and
141    /// non-OpenSpec sessions) leaves the guard inert. Threaded in via
142    /// [`BrokerState::with_role_gating`].
143    pub role_gating: Option<crate::opsx::RoleGatingContext>,
144}
145
146impl BrokerState {
147    /// Creates a new empty broker state.
148    pub fn new(log_path: Option<PathBuf>) -> Self {
149        Self {
150            inner: RwLock::new(BrokerStateInner {
151                agents: HashMap::new(),
152                agent_clis: HashMap::new(),
153                queues: HashMap::new(),
154                message_log: Vec::new(),
155                republish_working_ttl: std::time::Duration::from_secs(
156                    crate::config::WatcherConfig::DEFAULT_REPUBLISH_TTL_SECONDS,
157                ),
158            }),
159            next_seq: AtomicU64::new(0),
160            log_path,
161            started_at: Instant::now(),
162            learnings: None,
163            // Conservative broker-level default: the nudge is opt-in at the
164            // state level and explicitly enabled from config (whose own
165            // default is `true`) via `with_verify_on_commit_nudge`.
166            verify_on_commit_nudge: false,
167            role_gating: None,
168        }
169    }
170
171    /// Attaches the opsx role-gating context. Call before [`start_broker`] so
172    /// the publish path observes it from the first committed artifact. Passing
173    /// a context whose mode is `off` or whose engine is not `OpenSpec` leaves
174    /// the guard inert (the guard re-checks [`crate::opsx::RoleGatingContext::is_active`]).
175    #[must_use]
176    pub fn with_role_gating(mut self, ctx: crate::opsx::RoleGatingContext) -> Self {
177        self.role_gating = Some(ctx);
178        self
179    }
180
181    /// Sets whether committed artifacts emit a [`BrokerMessage::VerifyNow`]
182    /// nudge to the supervisor inbox. Call before [`start_broker`] so the
183    /// publish path observes the resolved value from the first message.
184    #[must_use]
185    pub fn with_verify_on_commit_nudge(mut self, enabled: bool) -> Self {
186        self.verify_on_commit_nudge = enabled;
187        self
188    }
189
190    /// Authoritatively seeds an agent's CLI into the roster's CLI map.
191    ///
192    /// git-paw knows every pane's CLI at launch (the supervisor's from
193    /// `[supervisor].cli`/`default_cli`, each agent's from the resolved
194    /// agent CLI), so the launcher seeds those known values rather than
195    /// depending on the agent to self-report via `agent.status`. This is
196    /// the only source for the supervisor's CLI column, since the
197    /// supervisor is not a filesystem watch target (W15-15). A blank `cli`
198    /// is ignored so a missing config value never clobbers the map. Call
199    /// before [`start_broker`] so the first `/status` snapshot is correct.
200    #[must_use]
201    pub fn with_seeded_cli(self, agent_id: &str, cli: &str) -> Self {
202        if !cli.is_empty()
203            && let Ok(mut inner) = self.inner.write()
204        {
205            inner
206                .agent_clis
207                .insert(agent_id.to_string(), cli.to_string());
208        }
209        self
210    }
211
212    /// Attaches a learnings aggregator. Replaces any previously attached
213    /// instance. Must be called before [`start_broker`] so the publish path
214    /// observes every message from the first one.
215    pub fn attach_learnings(&mut self, aggregator: learnings::SharedLearnings) {
216        self.learnings = Some(aggregator);
217    }
218
219    /// Sets the post-commit re-entry TTL consulted by the filesystem watcher
220    /// and the `committed -> working` transition in `update_agent_record`
221    /// (bug 8). `Duration::ZERO` disables the auto-republish behaviour.
222    pub fn set_republish_working_ttl(&self, ttl: std::time::Duration) {
223        self.write().republish_working_ttl = ttl;
224    }
225
226    /// Acquires a read lock on the inner state.
227    ///
228    /// # Panics
229    ///
230    /// Panics if the lock is poisoned (a thread panicked while holding it).
231    pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
232        self.inner.read().expect("broker state lock poisoned")
233    }
234
235    /// Acquires a write lock on the inner state.
236    ///
237    /// # Panics
238    ///
239    /// Panics if the lock is poisoned (a thread panicked while holding it).
240    pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
241        self.inner.write().expect("broker state lock poisoned")
242    }
243
244    /// Atomically allocates the next sequence number (starting at 1).
245    pub fn next_seq(&self) -> u64 {
246        self.next_seq.fetch_add(1, Ordering::Relaxed) + 1
247    }
248
249    /// Returns the number of seconds since the broker was started.
250    ///
251    /// Used by the HTTP `/status` handler to report uptime.
252    pub fn uptime_seconds(&self) -> u64 {
253        self.started_at.elapsed().as_secs()
254    }
255}
256
257/// Errors specific to broker operations.
258#[derive(Debug, thiserror::Error)]
259pub enum BrokerError {
260    /// The configured port is already in use by a non-broker process.
261    #[error(
262        "port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
263    )]
264    PortInUse {
265        /// The port that was occupied.
266        port: u16,
267        /// The underlying I/O error.
268        source: std::io::Error,
269    },
270
271    /// A probe to an existing listener on the port timed out.
272    #[error("broker probe timed out on port {port} — check for stuck processes on this port")]
273    ProbeTimeout {
274        /// The port that timed out.
275        port: u16,
276    },
277
278    /// Binding to the address failed.
279    #[error("failed to bind broker: {0}")]
280    BindFailed(std::io::Error),
281
282    /// Creating the tokio runtime failed.
283    #[error("failed to create broker runtime: {0}")]
284    RuntimeFailed(std::io::Error),
285}
286
287/// Handle to a running broker, including the optional flush thread.
288///
289/// When dropped, signals the flush thread to stop and joins it, then
290/// shuts down the tokio runtime. If the handle is in "reattached" mode
291/// (connected to an existing broker), dropping it is a no-op.
292pub struct BrokerHandle {
293    /// Shared broker state.
294    pub state: Arc<BrokerState>,
295    /// The tokio runtime powering the broker server.
296    /// `None` when reattached to an existing broker.
297    runtime: Option<tokio::runtime::Runtime>,
298    /// Sends a shutdown signal to the server task.
299    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
300    /// Broadcasts the watcher shutdown signal to all watcher tasks.
301    watcher_shutdown: Option<tokio::sync::watch::Sender<bool>>,
302    /// The URL the broker is listening on.
303    pub url: String,
304    /// Flag to signal the flush thread to exit.
305    stop_flag: Arc<AtomicBool>,
306    /// Flush thread join handle (present only when `log_path` is `Some`).
307    flush_thread: Option<JoinHandle<()>>,
308    /// Periodic flush thread for the learnings aggregator (present only
309    /// when `state.learnings.is_some()`).
310    learnings_thread: Option<JoinHandle<()>>,
311}
312
313impl BrokerHandle {
314    /// Creates a handle that reattaches to an existing broker (no owned runtime).
315    fn reattached(url: String, state: Arc<BrokerState>) -> Self {
316        Self {
317            state,
318            runtime: None,
319            shutdown_tx: None,
320            watcher_shutdown: None,
321            url,
322            stop_flag: Arc::new(AtomicBool::new(false)),
323            flush_thread: None,
324            learnings_thread: None,
325        }
326    }
327}
328
329impl Drop for BrokerHandle {
330    fn drop(&mut self) {
331        // 1. Signal both flush threads to stop and join the log flush thread.
332        self.stop_flag.store(true, Ordering::Release);
333        if let Some(handle) = self.flush_thread.take() {
334            let _ = handle.join();
335        }
336        // 2. Join the learnings flush thread — it performs the final
337        //    shutdown flush before returning.
338        if let Some(handle) = self.learnings_thread.take() {
339            let _ = handle.join();
340        }
341        // 3. Signal watcher tasks to stop.
342        if let Some(tx) = self.watcher_shutdown.take() {
343            let _ = tx.send(true);
344        }
345        // 4. Signal shutdown to the server task.
346        if let Some(tx) = self.shutdown_tx.take() {
347            let _ = tx.send(());
348        }
349        // 5. Give in-flight requests up to 2 seconds to drain, then drop runtime.
350        if let Some(rt) = self.runtime.take() {
351            rt.shutdown_timeout(std::time::Duration::from_secs(2));
352        }
353    }
354}
355
/// Outcome of probing whatever is listening on the broker port.
#[derive(Debug, PartialEq, Eq)]
pub enum ProbeResult {
    /// No listener accepted the connection, so the port is free to bind.
    NoListener,
    /// The listener answered as a git-paw broker.
    LiveBroker,
    /// The listener answered, but not as a git-paw broker.
    ForeignServer,
    /// The listener did not produce a usable answer in time.
    Timeout,
}
368
369/// Probes an existing listener at the given URL to determine what is running.
370///
371/// Uses a lightweight `TcpStream` with a manual HTTP/1.1 GET to `/status`
372/// to avoid pulling in a full HTTP client dependency.
373/// Probes a URL to determine what broker (if any) is running there.
374///
375/// Public entry point for callers that need to inspect broker status without
376/// starting a new server (e.g. the `status` subcommand).
377pub fn probe_broker(url: &str) -> ProbeResult {
378    probe_existing_broker(url)
379}
380
381fn probe_existing_broker(url: &str) -> ProbeResult {
382    use std::io::{Read, Write};
383    use std::net::TcpStream;
384    use std::time::Duration;
385
386    // Parse host:port from URL like "http://127.0.0.1:9119"
387    let addr = url.strip_prefix("http://").unwrap_or(url);
388
389    let socket_addr = if let Ok(a) = addr.parse() {
390        a
391    } else {
392        use std::net::ToSocketAddrs;
393        match addr.to_socket_addrs() {
394            Ok(mut addrs) => match addrs.next() {
395                Some(a) => a,
396                None => return ProbeResult::NoListener,
397            },
398            Err(_) => return ProbeResult::NoListener,
399        }
400    };
401
402    let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, Duration::from_millis(500))
403    else {
404        return ProbeResult::NoListener;
405    };
406
407    stream
408        .set_read_timeout(Some(Duration::from_millis(500)))
409        .ok();
410    stream
411        .set_write_timeout(Some(Duration::from_millis(500)))
412        .ok();
413
414    let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
415    if stream.write_all(request.as_bytes()).is_err() {
416        return ProbeResult::Timeout;
417    }
418
419    let mut response = String::new();
420    if stream.read_to_string(&mut response).is_err() && response.is_empty() {
421        return ProbeResult::Timeout;
422    }
423
424    if response.contains("\"git_paw\":true") || response.contains("\"git_paw\": true") {
425        ProbeResult::LiveBroker
426    } else if response.starts_with("HTTP/") {
427        ProbeResult::ForeignServer
428    } else {
429        ProbeResult::Timeout
430    }
431}
432
433/// Starts the HTTP broker server.
434///
435/// Probes the configured port first:
436/// - If a live git-paw broker is found, returns a reattached handle.
437/// - If a foreign server occupies the port, returns [`BrokerError::PortInUse`].
438/// - If the probe times out, returns [`BrokerError::ProbeTimeout`].
439/// - If nothing is listening, binds and starts the server.
440///
441/// Also spawns the background flush thread if `state.log_path` is set.
442///
443/// Calls [`start_broker`] without a conflict-detector. Equivalent to
444/// `start_broker_with(config, state, watch_targets, None)`.
445pub fn start_broker(
446    config: &BrokerConfig,
447    state: BrokerState,
448    watch_targets: Vec<WatchTarget>,
449) -> Result<BrokerHandle, BrokerError> {
450    start_broker_with(config, state, watch_targets, None, 60)
451}
452
453/// Starts the HTTP broker server with optional conflict-detector wiring
454/// and a configurable learnings-flush interval.
455///
456/// When `conflict` is `Some`, a background tokio task running the
457/// detector loop is spawned alongside the watcher tasks. The detector
458/// shuts down with the rest of the broker when [`BrokerHandle`] is
459/// dropped.
460///
461/// `learnings_flush_interval_seconds` controls how often the learnings
462/// aggregator flushes to `.git-paw/session-learnings.md` when
463/// `state.learnings` is `Some`. Default for [`start_broker`] is 60s;
464/// tests override to drive flush behaviour without waiting on real time.
465pub fn start_broker_with(
466    config: &BrokerConfig,
467    state: BrokerState,
468    watch_targets: Vec<WatchTarget>,
469    conflict: Option<ConflictConfig>,
470    learnings_flush_interval_seconds: u64,
471) -> Result<BrokerHandle, BrokerError> {
472    let url = config.url();
473    let state = Arc::new(state);
474    // Apply the configured post-commit re-entry TTL (bug 8) before any
475    // watcher task or publish observes the state.
476    state.set_republish_working_ttl(std::time::Duration::from_secs(
477        config.watcher.republish_working_ttl_seconds(),
478    ));
479    let stop_flag = Arc::new(AtomicBool::new(false));
480
481    match probe_existing_broker(&url) {
482        ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
483        ProbeResult::ForeignServer => {
484            return Err(BrokerError::PortInUse {
485                port: config.port,
486                source: std::io::Error::new(
487                    std::io::ErrorKind::AddrInUse,
488                    "port occupied by non-broker process",
489                ),
490            });
491        }
492        ProbeResult::Timeout => {
493            return Err(BrokerError::ProbeTimeout { port: config.port });
494        }
495        ProbeResult::NoListener => {}
496    }
497
498    // Spawn flush thread if log_path is configured.
499    let flush_thread = if state.log_path.is_some() {
500        let s = Arc::clone(&state);
501        let f = Arc::clone(&stop_flag);
502        Some(std::thread::spawn(move || {
503            delivery::flush_loop(&s, &f);
504        }))
505    } else {
506        None
507    };
508
509    // Spawn learnings flush thread when an aggregator is attached. The
510    // thread performs a final `flush_at_shutdown` after the stop flag is
511    // raised so unresolved blocks and recovery cycles end up in the file.
512    let learnings_thread = if state.learnings.is_some() {
513        let s = Arc::clone(&state);
514        let f = Arc::clone(&stop_flag);
515        Some(std::thread::spawn(move || {
516            learnings_flush_loop(&s, &f, learnings_flush_interval_seconds);
517        }))
518    } else {
519        None
520    };
521
522    let runtime = tokio::runtime::Builder::new_multi_thread()
523        .enable_all()
524        .build()
525        .map_err(BrokerError::RuntimeFailed)?;
526
527    let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
528        |e: std::net::AddrParseError| {
529            BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
530        },
531    )?;
532
533    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
534
535    let router = server::router(Arc::clone(&state));
536
537    let listener = runtime.block_on(async {
538        let socket = tokio::net::TcpSocket::new_v4().map_err(BrokerError::BindFailed)?;
539        socket
540            .set_reuseaddr(true)
541            .map_err(BrokerError::BindFailed)?;
542        socket.bind(addr).map_err(BrokerError::BindFailed)?;
543        socket.listen(1024).map_err(BrokerError::BindFailed)
544    })?;
545
546    // Install SIGINT handler so the broker does not die on Ctrl+C.
547    // The dashboard process is responsible for user-facing Ctrl+C handling.
548    runtime.spawn(async {
549        let _ = tokio::signal::ctrl_c().await;
550    });
551
552    runtime.spawn(async move {
553        axum::serve(listener, router)
554            .with_graceful_shutdown(async {
555                let _ = shutdown_rx.await;
556            })
557            .await
558            .ok();
559    });
560
561    // Pre-populate the CLI label AND the inbox queue for every watched
562    // agent so (a) the dashboard shows the CLI before any status messages
563    // arrive, and (b) peer `agent.artifact` broadcasts — which only target
564    // already-existing queues — actually reach the watched agent even
565    // before it has published anything itself.
566    {
567        let mut inner = state.write();
568        for target in &watch_targets {
569            inner
570                .agent_clis
571                .insert(target.agent_id.clone(), target.cli.clone());
572            inner.queues.entry(target.agent_id.clone()).or_default();
573        }
574    }
575
576    // Spawn one watcher task per target. All watchers share a single
577    // `tokio::sync::watch` channel; flipping it to `true` on drop signals
578    // every watcher to exit on its next tick. The conflict detector
579    // shares the same shutdown channel so it stops in lockstep.
580    let (watcher_tx, watcher_rx) = tokio::sync::watch::channel(false);
581    for target in watch_targets {
582        let s = Arc::clone(&state);
583        let rx = watcher_rx.clone();
584        runtime.spawn(watcher::watch_worktree(s, target, rx));
585    }
586    if let Some(conflict_cfg) = conflict {
587        let s = Arc::clone(&state);
588        let rx = watcher_rx.clone();
589        runtime.spawn(conflict::run_detector_loop(s, conflict_cfg, rx));
590    }
591
592    Ok(BrokerHandle {
593        state,
594        runtime: Some(runtime),
595        shutdown_tx: Some(shutdown_tx),
596        watcher_shutdown: Some(watcher_tx),
597        url,
598        stop_flag,
599        flush_thread,
600        learnings_thread,
601    })
602}
603
604/// Background loop driving the learnings aggregator's periodic flush.
605///
606/// Sleeps in small slices so it can react to the broker stop flag within
607/// ~100ms. When the stop flag is raised, it performs one final
608/// [`learnings::LearningsAggregator::flush_at_shutdown`] before exiting.
609fn learnings_flush_loop(
610    state: &Arc<BrokerState>,
611    stop: &Arc<AtomicBool>,
612    flush_interval_seconds: u64,
613) {
614    let Some(aggregator) = state.learnings.clone() else {
615        return;
616    };
617    let interval = std::time::Duration::from_secs(flush_interval_seconds.max(1));
618    let tick = std::time::Duration::from_millis(100);
619
620    loop {
621        let mut elapsed = std::time::Duration::ZERO;
622        while elapsed < interval {
623            if stop.load(Ordering::Acquire) {
624                if let Ok(mut agg) = aggregator.lock() {
625                    let _ = agg.flush_at_shutdown();
626                }
627                publish_pending_learnings(state, &aggregator);
628                return;
629            }
630            std::thread::sleep(tick);
631            elapsed += tick;
632        }
633        if let Ok(mut agg) = aggregator.lock() {
634            let _ = agg.flush();
635        }
636        publish_pending_learnings(state, &aggregator);
637    }
638}
639
640/// Drains the aggregator's pending broker records and publishes each as an
641/// `agent.learning` message.
642///
643/// Critically, the aggregator lock is acquired *only* to drain the queue and
644/// is released before any publish: `publish_message` re-enters the aggregator
645/// via `observe`, so publishing while holding the lock would deadlock the
646/// non-reentrant `Mutex`. When broker publish is disabled the queue is always
647/// empty, so this is a cheap no-op.
648fn publish_pending_learnings(state: &Arc<BrokerState>, aggregator: &learnings::SharedLearnings) {
649    let records = match aggregator.lock() {
650        Ok(mut agg) => agg.take_pending_publish(),
651        Err(_) => return,
652    };
653    for record in &records {
654        delivery::publish_message(state, &BrokerMessage::from(record));
655    }
656}
657
658#[cfg(test)]
659mod tests {
660    use super::*;
661
662    #[test]
663    fn broker_state_new_is_empty() {
664        let state = BrokerState::new(None);
665        let inner = state.read();
666        assert!(inner.agents.is_empty());
667        assert!(inner.queues.is_empty());
668        assert!(inner.message_log.is_empty());
669    }
670
671    #[test]
672    fn next_seq_starts_at_one() {
673        let state = BrokerState::new(None);
674        assert_eq!(state.next_seq(), 1);
675        assert_eq!(state.next_seq(), 2);
676        assert_eq!(state.next_seq(), 3);
677    }
678
679    #[test]
680    fn probe_no_listener() {
681        // Use a port that is almost certainly not in use.
682        let result = probe_existing_broker("http://127.0.0.1:19999");
683        assert_eq!(result, ProbeResult::NoListener);
684    }
685
686    #[test]
687    fn reattached_handle_has_no_runtime() {
688        let state = Arc::new(BrokerState::new(None));
689        let h = BrokerHandle::reattached("http://127.0.0.1:9119".into(), state);
690        assert!(h.runtime.is_none());
691        assert!(h.shutdown_tx.is_none());
692        assert!(h.flush_thread.is_none());
693    }
694
695    #[test]
696    fn start_broker_on_free_port() {
697        let config = BrokerConfig {
698            enabled: true,
699            // Use a high random port to avoid conflicts.
700            #[allow(clippy::cast_possible_truncation)]
701            port: 19_000 + (std::process::id() as u16 % 1000),
702            bind: "127.0.0.1".to_string(),
703            ..Default::default()
704        };
705        let state = BrokerState::new(None);
706        let handle = start_broker(&config, state, Vec::new());
707        // If the port happens to be in use, the test is inconclusive — not a failure.
708        if let Ok(h) = handle {
709            assert!(h.url.contains(&config.port.to_string()));
710            drop(h);
711        }
712    }
713
714    #[test]
715    fn start_broker_no_log_path_no_flush_thread() {
716        let config = BrokerConfig {
717            enabled: true,
718            #[allow(clippy::cast_possible_truncation)]
719            port: 19_100 + (std::process::id() as u16 % 100),
720            bind: "127.0.0.1".to_string(),
721            ..Default::default()
722        };
723        let state = BrokerState::new(None);
724        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
725            assert!(handle.flush_thread.is_none());
726            drop(handle);
727        }
728    }
729
730    #[test]
731    fn start_broker_with_log_path_spawns_flush_thread() {
732        let tmp = tempfile::tempdir().unwrap();
733        let log_path = tmp.path().join("broker.log");
734        let config = BrokerConfig {
735            enabled: true,
736            #[allow(clippy::cast_possible_truncation)]
737            port: 19_200 + (std::process::id() as u16 % 100),
738            bind: "127.0.0.1".to_string(),
739            ..Default::default()
740        };
741        let state = BrokerState::new(Some(log_path));
742        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
743            assert!(handle.flush_thread.is_some());
744            drop(handle);
745        }
746    }
747
748    // === agent-learning-variant: dual-output through the real publish path ===
749
750    fn conflict_feedback(target: &str, other: &str) -> BrokerMessage {
751        BrokerMessage::Feedback {
752            agent_id: target.to_string(),
753            payload: messages::FeedbackPayload {
754                from: "supervisor".to_string(),
755                errors: vec![format!(
756                    "[conflict-detector] in-flight conflict with {other} on src/a.rs"
757                )],
758            },
759        }
760    }
761
762    fn learning_payloads(state: &Arc<BrokerState>) -> Vec<messages::LearningPayload> {
763        state
764            .read()
765            .message_log
766            .iter()
767            .filter_map(|(_, _, m)| match m {
768                BrokerMessage::Learning { payload } => Some(payload.clone()),
769                _ => None,
770            })
771            .collect()
772    }
773
774    /// Drives one aggregator tick (flush + publish-pending) the way the flush
775    /// loop does, against an attached aggregator.
776    fn tick(state: &Arc<BrokerState>) {
777        let aggregator = state.learnings.clone().expect("aggregator attached");
778        if let Ok(mut a) = aggregator.lock() {
779            a.flush().unwrap();
780        }
781        publish_pending_learnings(state, &aggregator);
782    }
783
784    fn state_with_aggregator(path: PathBuf, broker_publish: bool) -> Arc<BrokerState> {
785        let mut agg = learnings::LearningsAggregator::new(path);
786        agg.set_broker_publish(broker_publish);
787        agg.register_agent("feat-x");
788        agg.register_agent("feat-y");
789        let mut state = BrokerState::new(None);
790        state.attach_learnings(Arc::new(std::sync::Mutex::new(agg)));
791        Arc::new(state)
792    }
793
794    // Task 7.1: broker on — a conflict scenario yields an `agent.learning`
795    // record on the broker AND a matching entry in the learnings file.
796    #[test]
797    fn dual_output_publishes_learning_and_writes_file_when_broker_on() {
798        let tmp = tempfile::tempdir().unwrap();
799        let path = tmp.path().join("session-learnings.md");
800        let state = state_with_aggregator(path.clone(), true);
801
802        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
803        tick(&state);
804
805        let md = std::fs::read_to_string(&path).unwrap();
806        assert!(
807            md.contains("### Conflict events"),
808            "file missing conflict:\n{md}"
809        );
810
811        let learnings = learning_payloads(&state);
812        assert_eq!(learnings.len(), 1, "expected one agent.learning record");
813        assert_eq!(learnings[0].category, "conflict_event");
814        assert_eq!(learnings[0].id.len(), 16);
815        assert!(
816            md.contains(&learnings[0].title),
817            "file title must match broker record title: {}",
818            learnings[0].title
819        );
820    }
821
822    // Task 7.2: broker off — same scenario writes the file but attempts no
823    // broker publish.
824    #[test]
825    fn no_broker_publish_when_disabled_but_file_still_written() {
826        let tmp = tempfile::tempdir().unwrap();
827        let path = tmp.path().join("session-learnings.md");
828        let state = state_with_aggregator(path.clone(), false);
829
830        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
831        tick(&state);
832
833        let md = std::fs::read_to_string(&path).unwrap();
834        assert!(md.contains("### Conflict events"));
835        assert!(
836            learning_payloads(&state).is_empty(),
837            "no agent.learning record should be published when broker publish is off"
838        );
839    }
840
841    // Task 7.4: re-ticking after the queue is drained does not re-publish —
842    // each record reaches the broker exactly once.
843    #[test]
844    fn re_ticking_does_not_duplicate_learning_records() {
845        let tmp = tempfile::tempdir().unwrap();
846        let path = tmp.path().join("session-learnings.md");
847        let state = state_with_aggregator(path, true);
848
849        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
850        tick(&state);
851        tick(&state); // second tick: queue already drained, nothing new flushed
852
853        assert_eq!(
854            learning_payloads(&state).len(),
855            1,
856            "the conflict record must be published exactly once"
857        );
858    }
859
860    // The branch-scoped record is retrievable via `messages/<branch_id>`.
861    #[test]
862    fn branch_scoped_learning_is_routed_to_branch_inbox() {
863        let tmp = tempfile::tempdir().unwrap();
864        let path = tmp.path().join("session-learnings.md");
865        let state = state_with_aggregator(path, true);
866
867        // A stuck-duration scenario is branch-scoped to "feat-x".
868        delivery::publish_message(
869            &state,
870            &BrokerMessage::Blocked {
871                agent_id: "feat-x".to_string(),
872                payload: messages::BlockedPayload {
873                    needs: "types".to_string(),
874                    from: "feat-y".to_string(),
875                },
876            },
877        );
878        delivery::publish_message(
879            &state,
880            &BrokerMessage::Artifact {
881                agent_id: "feat-x".to_string(),
882                payload: messages::ArtifactPayload {
883                    status: "done".to_string(),
884                    exports: vec![],
885                    modified_files: vec![],
886                },
887            },
888        );
889        tick(&state);
890
891        let (msgs, _) = delivery::poll_messages(&state, "feat-x", 0);
892        assert!(
893            msgs.iter().any(|m| matches!(
894                m,
895                BrokerMessage::Learning { payload } if payload.category == "stuck_duration"
896            )),
897            "stuck_duration learning should land in feat-x's inbox"
898        );
899    }
900}