git-paw 0.7.0

Parallel AI Worktrees — orchestrate multiple AI coding CLI sessions across git worktrees
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
//! HTTP broker for agent coordination.
//!
//! Provides an HTTP server that agents use to publish messages, poll for
//! incoming messages, and report status. The broker runs on a background
//! tokio runtime and is managed through [`BrokerHandle`].
//!
//! # Lock discipline
//!
//! [`BrokerState`] wraps its inner state in an `RwLock`. **Guards MUST NOT be
//! held across `.await` boundaries.** The `clippy::await_holding_lock` lint is
//! enabled project-wide to catch violations at compile time. Use the
//! `read()` / `write()` methods to obtain guards inside synchronous closures
//! only.

pub mod conflict;
pub mod delivery;
pub mod learnings;
pub mod messages;
pub mod publish;
pub mod server;
pub mod watcher;

use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, OnceLock, RwLock};
use std::thread::JoinHandle;
use std::time::Instant;

use serde::Serialize;

use crate::config::{BrokerConfig, ConflictConfig};
pub use messages::BrokerMessage;

/// Worktree to watch for git-status changes.
///
/// The broker spawns one [`watcher::watch_worktree`] task per target. Targets
/// are either handed to [`start_broker_with`] at start-up or registered at
/// runtime through [`BrokerState::register_watch_target`].
#[derive(Debug, Clone)]
pub struct WatchTarget {
    /// Agent identifier (slugified branch name) that owns this worktree.
    /// Used as the key for the agent's CLI label and inbox queue.
    pub agent_id: String,
    /// CLI name running in this agent's pane (e.g. `"claude"`).
    /// May be empty; [`BrokerState::register_watch_target`] does not record a
    /// CLI label for an empty value.
    pub cli: String,
    /// Absolute path to the worktree root. Also the idempotency key for live
    /// registration (see [`BrokerStateInner::watched_paths`]).
    pub worktree_path: PathBuf,
}

/// Record of a known agent's latest state.
///
/// Stored in [`BrokerStateInner::agents`], keyed by agent ID.
#[derive(Debug, Clone)]
pub struct AgentRecord {
    /// Agent identifier (slugified branch name).
    pub agent_id: String,
    /// Last reported status label.
    pub status: String,
    /// When the agent last published a message.
    pub last_seen: Instant,
    /// The most recent message from this agent, or `None` if no message has
    /// been recorded for it.
    pub last_message: Option<BrokerMessage>,
    /// When the agent most recently published an
    /// `agent.artifact { status: "committed" }` event, or `None` if it never
    /// has.
    ///
    /// Bug 8 (`auto-approve-scope-v0-6-x`): the filesystem watcher consults
    /// this to decide whether a subsequent file modification should
    /// re-publish `working` (within the configured TTL window). Transient —
    /// not serialized; reset only when overwritten by a newer committed event.
    pub last_committed_at: Option<Instant>,
}

/// JSON-serializable snapshot of an agent's status for the `/status` endpoint
/// and the dashboard TUI.
///
/// Only `Serialize` is derived; `last_seen` is excluded from the serialized
/// form and `phase` is omitted when unset (see the field attributes).
#[derive(Debug, Clone, Serialize)]
pub struct AgentStatusEntry {
    /// Agent identifier (slugified branch name).
    pub agent_id: String,
    /// CLI name running in this agent's pane (e.g. "claude").
    pub cli: String,
    /// Current status label (e.g. "working", "done", "blocked").
    pub status: String,
    /// Seconds since the agent was last seen.
    pub last_seen_seconds: u64,
    /// One-line summary from the last message.
    pub summary: String,
    /// When the agent was last seen (for age calculations in the dashboard).
    /// Skipped during serialization, so it never appears in the JSON output.
    #[serde(skip)]
    pub last_seen: Instant,
    /// Most recently published `payload.phase` on an `agent.status`, if any.
    /// The dashboard prefers this label over the message-type-derived
    /// `status_label()` when rendering the agent's row.
    /// Left out of the serialized output entirely when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub phase: Option<String>,
}

/// Mutable broker state protected by an `RwLock`.
///
/// Reached through [`BrokerState::read`] and [`BrokerState::write`]; the
/// guards they return must not be held across `.await` points.
#[derive(Debug)]
pub struct BrokerStateInner {
    /// Known agents keyed by agent ID.
    pub agents: HashMap<String, AgentRecord>,
    /// CLI label per agent, populated from [`WatchTarget`] at broker start,
    /// from [`BrokerState::with_seeded_cli`], and from live registration via
    /// [`BrokerState::register_watch_target`].
    pub agent_clis: HashMap<String, String>,
    /// Per-agent message inboxes: `(sequence_number, message)`.
    pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
    /// Append-only message log for disk flush:
    /// `(sequence_number, timestamp, message)`.
    pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
    /// Post-commit re-entry TTL for the filesystem watcher (bug 8).
    ///
    /// A file modification observed within this window after an agent's
    /// `committed` event re-publishes `working`. `Duration::ZERO` disables
    /// the behaviour (v0.5.0 model). Defaults to 60s; overwritten from
    /// `[broker.watcher].republish_working_ttl_seconds` at broker start.
    pub republish_working_ttl: std::time::Duration,
    /// Worktree paths the broker is currently watching, for idempotent live
    /// registration via `POST /watch`. Seeded with the start-time watch
    /// targets in [`start_broker_with`] and extended at runtime by
    /// [`BrokerState::register_watch_target`]. A vanished worktree is pruned
    /// by its watcher task (see [`watcher::watch_worktree`]), so re-adding the
    /// same path later spawns a fresh watcher.
    pub watched_paths: HashSet<PathBuf>,
}

/// Shared broker state.
///
/// Wraps [`BrokerStateInner`] in an `RwLock` for concurrent read access.
/// The sequence counter is a standalone [`AtomicU64`] outside the lock so
/// that sequence numbers can be allocated without coupling to the write
/// lock.
///
/// Built with [`BrokerState::new`] and customised through the `with_*`
/// builder methods before being handed to [`start_broker`] /
/// [`start_broker_with`], which wrap it in an `Arc`.
#[derive(Debug)]
pub struct BrokerState {
    /// Protected mutable state.
    inner: RwLock<BrokerStateInner>,
    /// Global sequence counter (starts at 0; first assigned value is 1).
    next_seq: AtomicU64,
    /// Optional path for periodic log flush to disk. When `None`, no log
    /// flush thread is spawned.
    pub log_path: Option<PathBuf>,
    /// Monotonic instant the broker state was created; used for uptime
    /// reporting.
    started_at: Instant,
    /// Optional learnings aggregator. Populated when supervisor + learnings
    /// mode is active; the publish path forwards every observed message.
    pub learnings: Option<learnings::SharedLearnings>,
    /// When `true`, an `agent.artifact { status: "committed" }` triggers a
    /// broker-emitted [`BrokerMessage::VerifyNow`] nudge to the supervisor
    /// inbox so per-commit verification fires on an explicit event. Resolved
    /// from `[supervisor].verify_on_commit_nudge` (default `true`) at session
    /// boot and threaded in via [`BrokerState::with_verify_on_commit_nudge`].
    /// [`BrokerState::new`] itself initialises this to `false`.
    pub verify_on_commit_nudge: bool,
    /// opsx role-gating context. When `Some` and active (`OpenSpec` engine +
    /// non-`off` mode), the publish path runs the role-gating guard on every
    /// `agent.artifact { status: "committed" }`. `None` (the default, and
    /// non-OpenSpec sessions) leaves the guard inert. Threaded in via
    /// [`BrokerState::with_role_gating`].
    pub role_gating: Option<crate::opsx::RoleGatingContext>,
    /// Watcher shutdown receiver, populated once in [`start_broker_with`]
    /// before the start-time watchers spawn. The `POST /watch` handler clones
    /// it to enroll a live-registered watcher in the same shared shutdown
    /// signal, so a hot-added watcher stops in lockstep with the rest on
    /// [`BrokerHandle`] drop. `None` until the broker has spawned its watchers
    /// (e.g. in router-only unit tests), in which case live registration still
    /// records the target but spawns no watcher.
    watcher_shutdown_rx: OnceLock<tokio::sync::watch::Receiver<bool>>,
}

impl BrokerState {
    /// Builds a broker state with no agents, queues, logged messages, or
    /// watched paths.
    ///
    /// The re-publish TTL starts at the watcher config default, the sequence
    /// counter at zero, and every optional feature (learnings, role gating,
    /// verify-on-commit nudge) is off until enabled through the builders.
    pub fn new(log_path: Option<PathBuf>) -> Self {
        let default_ttl = std::time::Duration::from_secs(
            crate::config::WatcherConfig::DEFAULT_REPUBLISH_TTL_SECONDS,
        );
        let inner = BrokerStateInner {
            agents: HashMap::new(),
            agent_clis: HashMap::new(),
            queues: HashMap::new(),
            message_log: Vec::new(),
            republish_working_ttl: default_ttl,
            watched_paths: HashSet::new(),
        };

        Self {
            inner: RwLock::new(inner),
            next_seq: AtomicU64::new(0),
            log_path,
            started_at: Instant::now(),
            learnings: None,
            // The state-level default is deliberately conservative: the nudge
            // stays off here and is switched on from config (which itself
            // defaults to `true`) through `with_verify_on_commit_nudge`.
            verify_on_commit_nudge: false,
            role_gating: None,
            watcher_shutdown_rx: OnceLock::new(),
        }
    }

    /// Installs the opsx role-gating context.
    ///
    /// Call before [`start_broker`] so the very first committed artifact is
    /// seen by the guard. A context with mode `off` or a non-`OpenSpec` engine
    /// keeps the guard inert, because the guard re-checks
    /// [`crate::opsx::RoleGatingContext::is_active`] itself.
    #[must_use]
    pub fn with_role_gating(mut self, ctx: crate::opsx::RoleGatingContext) -> Self {
        self.role_gating = Some(ctx);
        self
    }

    /// Chooses whether committed artifacts send a
    /// [`BrokerMessage::VerifyNow`] nudge to the supervisor inbox.
    ///
    /// Call before [`start_broker`] so the publish path sees the resolved
    /// value from the first message onward.
    #[must_use]
    pub fn with_verify_on_commit_nudge(mut self, enabled: bool) -> Self {
        self.verify_on_commit_nudge = enabled;
        self
    }

    /// Records an agent's CLI in the roster's CLI map as an authoritative
    /// value.
    ///
    /// git-paw already knows each pane's CLI at launch (the supervisor's from
    /// `[supervisor].cli`/`default_cli`, every agent's from its resolved
    /// agent CLI), so the launcher seeds them here instead of waiting for the
    /// agent to self-report through `agent.status`. For the supervisor this
    /// is the only source of the CLI column, because the supervisor is not a
    /// filesystem watch target (W15-15). An empty `cli` is skipped so a
    /// missing config value cannot overwrite an existing entry. Call before
    /// [`start_broker`] so the first `/status` snapshot is already correct.
    #[must_use]
    pub fn with_seeded_cli(self, agent_id: &str, cli: &str) -> Self {
        if cli.is_empty() {
            return self;
        }
        // A poisoned lock is tolerated here: the seed is simply skipped.
        if let Ok(mut inner) = self.inner.write() {
            inner
                .agent_clis
                .insert(agent_id.to_string(), cli.to_string());
        }
        self
    }

    /// Installs a learnings aggregator, replacing any earlier one.
    ///
    /// Must happen before [`start_broker`] so the publish path observes every
    /// message, including the first.
    pub fn attach_learnings(&mut self, aggregator: learnings::SharedLearnings) {
        self.learnings = Some(aggregator);
    }

    /// Updates the post-commit re-entry TTL used by the filesystem watcher
    /// and by the `committed -> working` transition in `update_agent_record`
    /// (bug 8). Passing `Duration::ZERO` turns the auto-republish off.
    pub fn set_republish_working_ttl(&self, ttl: std::time::Duration) {
        let mut inner = self.write();
        inner.republish_working_ttl = ttl;
    }

    /// Stores the shared watcher shutdown receiver for later cloning by the
    /// `POST /watch` handler.
    ///
    /// [`start_broker_with`] calls this once, before the start-time watchers
    /// spawn. The slot is set-once: any later call leaves the first receiver
    /// in place.
    pub fn set_watcher_shutdown_rx(&self, rx: tokio::sync::watch::Receiver<bool>) {
        // An `Err` only means the slot was already filled; that is fine.
        let _ = self.watcher_shutdown_rx.set(rx);
    }

    /// Hands out a clone of the shared watcher shutdown receiver.
    ///
    /// Returns `None` while the broker has not spawned its watchers (e.g.
    /// router-only unit tests). The `POST /watch` handler uses the clone to
    /// tie a hot-added watcher to the shared shutdown signal.
    #[must_use]
    pub fn watcher_shutdown_rx(&self) -> Option<tokio::sync::watch::Receiver<bool>> {
        self.watcher_shutdown_rx.get().cloned()
    }

    /// Records a worktree as a live watch target on behalf of `POST /watch`.
    ///
    /// This is bookkeeping only: when it returns `true` the caller is
    /// responsible for spawning the [`watcher::watch_worktree`] task. The
    /// worktree path is the idempotency key, so a path that is already
    /// watched yields `false` and nothing else changes (no duplicate
    /// watcher). For a new path the agent's CLI label (when non-empty) and an
    /// inbox queue are seeded, so the agent shows up in `/status` and
    /// receives peer broadcasts.
    pub fn register_watch_target(&self, target: &WatchTarget) -> bool {
        let mut inner = self.write();
        let newly_watched = inner.watched_paths.insert(target.worktree_path.clone());
        if newly_watched {
            if !target.cli.is_empty() {
                inner
                    .agent_clis
                    .insert(target.agent_id.clone(), target.cli.clone());
            }
            inner.queues.entry(target.agent_id.clone()).or_default();
        }
        newly_watched
    }

    /// Removes a worktree from the live watch-target set.
    ///
    /// A watcher task calls this once it notices its worktree is gone (the
    /// prune path for `git paw remove`), so that registering the same path
    /// again later spawns a fresh watcher.
    pub fn forget_watch_target(&self, worktree_path: &std::path::Path) {
        let mut inner = self.write();
        inner.watched_paths.remove(worktree_path);
    }

    /// Takes a shared (read) lock on the inner state.
    ///
    /// # Panics
    ///
    /// Panics when the lock is poisoned, i.e. a thread panicked while holding
    /// it.
    pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
        let guard = self.inner.read();
        guard.expect("broker state lock poisoned")
    }

    /// Takes an exclusive (write) lock on the inner state.
    ///
    /// # Panics
    ///
    /// Panics when the lock is poisoned, i.e. a thread panicked while holding
    /// it.
    pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
        let guard = self.inner.write();
        guard.expect("broker state lock poisoned")
    }

    /// Atomically hands out the next sequence number; the first one is 1.
    pub fn next_seq(&self) -> u64 {
        let previous = self.next_seq.fetch_add(1, Ordering::Relaxed);
        previous + 1
    }

    /// Whole seconds elapsed since this state was created.
    ///
    /// The HTTP `/status` handler reports this as the broker uptime.
    pub fn uptime_seconds(&self) -> u64 {
        let since_start = self.started_at.elapsed();
        since_start.as_secs()
    }
}

/// Errors specific to broker operations.
///
/// Returned by [`start_broker`] and [`start_broker_with`]. The `Display` text
/// of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
    /// The configured port is already in use by a non-broker process.
    /// Produced when the start-up probe finds an HTTP server that is not a
    /// git-paw broker.
    #[error(
        "port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
    )]
    PortInUse {
        /// The port that was occupied.
        port: u16,
        /// The underlying I/O error.
        source: std::io::Error,
    },

    /// A probe to an existing listener on the port timed out.
    #[error("broker probe timed out on port {port} — check for stuck processes on this port")]
    ProbeTimeout {
        /// The port that timed out.
        port: u16,
    },

    /// Binding to the address failed. Also used when the configured
    /// bind address and port do not parse as a socket address.
    #[error("failed to bind broker: {0}")]
    BindFailed(std::io::Error),

    /// Creating the tokio runtime failed.
    #[error("failed to create broker runtime: {0}")]
    RuntimeFailed(std::io::Error),
}

/// Handle to a running broker, including the optional flush threads.
///
/// When dropped, it raises the stop flag, joins the log flush thread and the
/// learnings flush thread, signals the watcher tasks and the server task to
/// stop, and finally shuts down the tokio runtime with a 2-second grace
/// period. If the handle is in "reattached" mode (connected to an existing
/// broker), it owns no runtime, channels, or threads, so dropping it stops
/// nothing.
pub struct BrokerHandle {
    /// Shared broker state.
    pub state: Arc<BrokerState>,
    /// The tokio runtime powering the broker server.
    /// `None` when reattached to an existing broker.
    runtime: Option<tokio::runtime::Runtime>,
    /// Sends a shutdown signal to the server task.
    /// `None` when reattached to an existing broker.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Broadcasts the watcher shutdown signal to all watcher tasks (and to
    /// the conflict detector, which shares the channel).
    /// `None` when reattached to an existing broker.
    watcher_shutdown: Option<tokio::sync::watch::Sender<bool>>,
    /// The URL the broker is listening on.
    pub url: String,
    /// Flag to signal the flush threads to exit; shared by the log flush
    /// thread and the learnings flush thread.
    stop_flag: Arc<AtomicBool>,
    /// Flush thread join handle (present only when `log_path` is `Some`).
    flush_thread: Option<JoinHandle<()>>,
    /// Periodic flush thread for the learnings aggregator (present only
    /// when `state.learnings.is_some()`).
    learnings_thread: Option<JoinHandle<()>>,
}

impl BrokerHandle {
    /// Builds a handle for a broker that is already running elsewhere.
    ///
    /// The handle carries only the URL and the caller's state: it owns no
    /// runtime, no shutdown channels, and no flush threads, so dropping it
    /// leaves the existing broker untouched.
    fn reattached(url: String, state: Arc<BrokerState>) -> Self {
        // No thread ever observes this flag in reattached mode; it exists
        // only because the field is not optional.
        let stop_flag = Arc::new(AtomicBool::new(false));

        Self {
            url,
            state,
            stop_flag,
            runtime: None,
            shutdown_tx: None,
            watcher_shutdown: None,
            flush_thread: None,
            learnings_thread: None,
        }
    }
}

/// Tears the broker down in a fixed order: flush threads first, then the
/// watcher tasks, then the HTTP server task, and finally the tokio runtime.
///
/// For a reattached handle every optional field is `None`, so only the stop
/// flag store runs and nothing is stopped.
impl Drop for BrokerHandle {
    fn drop(&mut self) {
        // 1. Signal both flush threads to stop and join the log flush thread.
        //    The single `stop_flag` is shared by both threads; join errors
        //    (a panicked thread) are deliberately ignored during teardown.
        self.stop_flag.store(true, Ordering::Release);
        if let Some(handle) = self.flush_thread.take() {
            let _ = handle.join();
        }
        // 2. Join the learnings flush thread — it performs the final
        //    shutdown flush before returning.
        //    NOTE(review): that final flush also publishes pending learnings,
        //    which happens after the log flush thread above has been joined —
        //    confirm whether those late messages need to reach the disk log.
        if let Some(handle) = self.learnings_thread.take() {
            let _ = handle.join();
        }
        // 3. Signal watcher tasks to stop. A send error only means no
        //    receiver is left, which is harmless here.
        if let Some(tx) = self.watcher_shutdown.take() {
            let _ = tx.send(true);
        }
        // 4. Signal shutdown to the server task.
        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.send(());
        }
        // 5. Give in-flight requests up to 2 seconds to drain, then drop runtime.
        if let Some(rt) = self.runtime.take() {
            rt.shutdown_timeout(std::time::Duration::from_secs(2));
        }
    }
}

/// Result of probing an existing listener on the broker port.
#[derive(Debug, PartialEq, Eq)]
pub enum ProbeResult {
    /// Nothing is listening — safe to bind. Also returned when the address
    /// cannot be parsed or resolved, or the connection attempt fails.
    NoListener,
    /// A git-paw broker is already running.
    LiveBroker,
    /// Something else is using the port.
    ForeignServer,
    /// The probe timed out, or the listener answered with nothing that looks
    /// like an HTTP response.
    Timeout,
}

/// Probes a URL to determine what broker (if any) is running there.
///
/// Public entry point for callers that need to inspect broker status without
/// starting a new server (e.g. the `status` subcommand).
pub fn probe_broker(url: &str) -> ProbeResult {
    probe_existing_broker(url)
}

/// Probes an existing listener at the given URL to determine what is running.
///
/// Uses a lightweight `TcpStream` with a manual HTTP/1.1 GET to `/status`
/// to avoid pulling in a full HTTP client dependency. Connect, read, and
/// write each time out after 500ms.
///
/// `url` may be `http://host:port` or a bare `host:port`; a trailing `/` or
/// path component is ignored. The response is classified as follows:
///
/// - contains the `"git_paw":true` marker → [`ProbeResult::LiveBroker`]
/// - otherwise starts with `HTTP/` → [`ProbeResult::ForeignServer`]
/// - anything else, or no bytes at all → [`ProbeResult::Timeout`]
fn probe_existing_broker(url: &str) -> ProbeResult {
    use std::io::{Read, Write};
    use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
    use std::time::Duration;

    const IO_TIMEOUT: Duration = Duration::from_millis(500);

    // Reduce a URL like "http://127.0.0.1:9119/" to its "host:port" part. A
    // trailing slash or path would otherwise make the address unparseable
    // and be misreported as "nothing listening".
    let without_scheme = url.strip_prefix("http://").unwrap_or(url);
    let addr = without_scheme.split('/').next().unwrap_or(without_scheme);

    // Literal socket addresses parse directly; host names go through the
    // resolver and the first resolved address is used.
    let socket_addr: SocketAddr = if let Ok(parsed) = addr.parse() {
        parsed
    } else {
        let Some(resolved) = addr
            .to_socket_addrs()
            .ok()
            .and_then(|mut candidates| candidates.next())
        else {
            return ProbeResult::NoListener;
        };
        resolved
    };

    let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, IO_TIMEOUT) else {
        return ProbeResult::NoListener;
    };

    // Best effort: if a timeout cannot be set the probe still proceeds.
    stream.set_read_timeout(Some(IO_TIMEOUT)).ok();
    stream.set_write_timeout(Some(IO_TIMEOUT)).ok();

    let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
    if stream.write_all(request.as_bytes()).is_err() {
        return ProbeResult::Timeout;
    }

    // Read raw bytes and decode lossily. `read_to_string` would throw the
    // whole response away on a single invalid UTF-8 byte, turning a foreign
    // server with a binary body into a bogus `Timeout`. A read error after
    // some bytes arrived (e.g. a keep-alive peer hitting the read timeout)
    // still leaves those bytes available for classification.
    let mut raw = Vec::new();
    if stream.read_to_end(&mut raw).is_err() && raw.is_empty() {
        return ProbeResult::Timeout;
    }
    let response = String::from_utf8_lossy(&raw);

    if response.contains("\"git_paw\":true") || response.contains("\"git_paw\": true") {
        ProbeResult::LiveBroker
    } else if response.starts_with("HTTP/") {
        ProbeResult::ForeignServer
    } else {
        ProbeResult::Timeout
    }
}

/// Starts the HTTP broker server.
///
/// Probes the configured port first:
/// - If a live git-paw broker is found, returns a reattached handle.
/// - If a foreign server occupies the port, returns [`BrokerError::PortInUse`].
/// - If the probe times out, returns [`BrokerError::ProbeTimeout`].
/// - If nothing is listening, binds and starts the server.
///
/// Also spawns the background flush thread if `state.log_path` is set.
///
/// Calls [`start_broker`] without a conflict-detector. Equivalent to
/// `start_broker_with(config, state, watch_targets, None)`.
pub fn start_broker(
    config: &BrokerConfig,
    state: BrokerState,
    watch_targets: Vec<WatchTarget>,
) -> Result<BrokerHandle, BrokerError> {
    start_broker_with(config, state, watch_targets, None, 60)
}

/// Starts the HTTP broker server with optional conflict-detector wiring
/// and a configurable learnings-flush interval.
///
/// When `conflict` is `Some`, a background tokio task running the
/// detector loop is spawned alongside the watcher tasks. The detector
/// shuts down with the rest of the broker when [`BrokerHandle`] is
/// dropped.
///
/// `learnings_flush_interval_seconds` controls how often the learnings
/// aggregator flushes to `.git-paw/session-learnings.md` when
/// `state.learnings` is `Some`. Default for [`start_broker`] is 60s;
/// tests override to drive flush behaviour without waiting on real time.
#[allow(clippy::too_many_lines)]
pub fn start_broker_with(
    config: &BrokerConfig,
    state: BrokerState,
    watch_targets: Vec<WatchTarget>,
    conflict: Option<ConflictConfig>,
    learnings_flush_interval_seconds: u64,
) -> Result<BrokerHandle, BrokerError> {
    let url = config.url();
    let state = Arc::new(state);
    // Apply the configured post-commit re-entry TTL (bug 8) before any
    // watcher task or publish observes the state.
    state.set_republish_working_ttl(std::time::Duration::from_secs(
        config.watcher.republish_working_ttl_seconds(),
    ));
    let stop_flag = Arc::new(AtomicBool::new(false));

    match probe_existing_broker(&url) {
        ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
        ProbeResult::ForeignServer => {
            return Err(BrokerError::PortInUse {
                port: config.port,
                source: std::io::Error::new(
                    std::io::ErrorKind::AddrInUse,
                    "port occupied by non-broker process",
                ),
            });
        }
        ProbeResult::Timeout => {
            return Err(BrokerError::ProbeTimeout { port: config.port });
        }
        ProbeResult::NoListener => {}
    }

    // Spawn flush thread if log_path is configured.
    let flush_thread = if state.log_path.is_some() {
        let s = Arc::clone(&state);
        let f = Arc::clone(&stop_flag);
        Some(std::thread::spawn(move || {
            delivery::flush_loop(&s, &f);
        }))
    } else {
        None
    };

    // Spawn learnings flush thread when an aggregator is attached. The
    // thread performs a final `flush_at_shutdown` after the stop flag is
    // raised so unresolved blocks and recovery cycles end up in the file.
    let learnings_thread = if state.learnings.is_some() {
        let s = Arc::clone(&state);
        let f = Arc::clone(&stop_flag);
        Some(std::thread::spawn(move || {
            learnings_flush_loop(&s, &f, learnings_flush_interval_seconds);
        }))
    } else {
        None
    };

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .map_err(BrokerError::RuntimeFailed)?;

    let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
        |e: std::net::AddrParseError| {
            BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
        },
    )?;

    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();

    let router = server::router(Arc::clone(&state));

    let listener = runtime.block_on(async {
        let socket = tokio::net::TcpSocket::new_v4().map_err(BrokerError::BindFailed)?;
        socket
            .set_reuseaddr(true)
            .map_err(BrokerError::BindFailed)?;
        socket.bind(addr).map_err(BrokerError::BindFailed)?;
        socket.listen(1024).map_err(BrokerError::BindFailed)
    })?;

    // Install SIGINT handler so the broker does not die on Ctrl+C.
    // The dashboard process is responsible for user-facing Ctrl+C handling.
    runtime.spawn(async {
        let _ = tokio::signal::ctrl_c().await;
    });

    runtime.spawn(async move {
        axum::serve(listener, router)
            .with_graceful_shutdown(async {
                let _ = shutdown_rx.await;
            })
            .await
            .ok();
    });

    // Pre-populate the CLI label AND the inbox queue for every watched
    // agent so (a) the dashboard shows the CLI before any status messages
    // arrive, and (b) peer `agent.artifact` broadcasts — which only target
    // already-existing queues — actually reach the watched agent even
    // before it has published anything itself.
    {
        let mut inner = state.write();
        for target in &watch_targets {
            inner
                .agent_clis
                .insert(target.agent_id.clone(), target.cli.clone());
            inner.queues.entry(target.agent_id.clone()).or_default();
            // Seed the live target set so a `POST /watch` for a start-time path is a no-op.
            inner.watched_paths.insert(target.worktree_path.clone());
        }
    }

    // Spawn one watcher task per target. All watchers share a single
    // `tokio::sync::watch` channel; flipping it to `true` on drop signals
    // every watcher to exit on its next tick. The conflict detector
    // shares the same shutdown channel so it stops in lockstep.
    let (watcher_tx, watcher_rx) = tokio::sync::watch::channel(false);
    // Publish the shutdown receiver so `POST /watch` can enroll live watchers.
    state.set_watcher_shutdown_rx(watcher_rx.clone());
    for target in watch_targets {
        let s = Arc::clone(&state);
        let rx = watcher_rx.clone();
        runtime.spawn(watcher::watch_worktree(s, target, rx));
    }
    if let Some(conflict_cfg) = conflict {
        let s = Arc::clone(&state);
        let rx = watcher_rx.clone();
        runtime.spawn(conflict::run_detector_loop(s, conflict_cfg, rx));
    }

    Ok(BrokerHandle {
        state,
        runtime: Some(runtime),
        shutdown_tx: Some(shutdown_tx),
        watcher_shutdown: Some(watcher_tx),
        url,
        stop_flag,
        flush_thread,
        learnings_thread,
    })
}

/// Background loop that drives the learnings aggregator's periodic flush.
///
/// Returns immediately when no aggregator is attached. Otherwise each flush
/// interval (at least one second) is slept through in 100ms slices, checking
/// the broker stop flag before every slice so shutdown is noticed within
/// ~100ms. Once the flag is raised the loop performs one final
/// [`learnings::LearningsAggregator::flush_at_shutdown`], publishes whatever
/// that queued, and exits. A poisoned aggregator lock skips the flush but
/// not the publish attempt.
fn learnings_flush_loop(
    state: &Arc<BrokerState>,
    stop: &Arc<AtomicBool>,
    flush_interval_seconds: u64,
) {
    use std::time::Duration;

    let Some(aggregator) = state.learnings.clone() else {
        return;
    };

    let tick = Duration::from_millis(100);
    let interval = Duration::from_secs(flush_interval_seconds.max(1));
    // The interval is a whole number of seconds, so it divides evenly into
    // 100ms slices.
    let slices_per_interval = interval.as_millis() / tick.as_millis();

    loop {
        for _ in 0..slices_per_interval {
            if stop.load(Ordering::Acquire) {
                // Shutdown: flush once more, then publish and leave.
                if let Ok(mut agg) = aggregator.lock() {
                    let _ = agg.flush_at_shutdown();
                }
                publish_pending_learnings(state, &aggregator);
                return;
            }
            std::thread::sleep(tick);
        }

        // A full interval elapsed without a stop request: periodic flush.
        if let Ok(mut agg) = aggregator.lock() {
            let _ = agg.flush();
        }
        publish_pending_learnings(state, &aggregator);
    }
}

/// Publishes every broker record the aggregator has queued, each as an
/// `agent.learning` message.
///
/// The aggregator lock is held *only* long enough to take the pending queue
/// and is dropped before the first publish. `publish_message` calls back into
/// the aggregator via `observe`, so publishing with the non-reentrant `Mutex`
/// still held would deadlock. With broker publish disabled the queue is always
/// empty, which makes this a cheap no-op. A poisoned lock results in nothing
/// being published.
fn publish_pending_learnings(state: &Arc<BrokerState>, aggregator: &learnings::SharedLearnings) {
    let Ok(mut guard) = aggregator.lock() else {
        return;
    };
    let drained = guard.take_pending_publish();
    // Release the lock before publishing; see the deadlock note above.
    drop(guard);

    for record in &drained {
        let message = BrokerMessage::from(record);
        delivery::publish_message(state, &message);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed state has no agents, no queues, and an empty
    /// message log.
    #[test]
    fn broker_state_new_is_empty() {
        let fresh = BrokerState::new(None);
        let snapshot = fresh.read();
        assert!(snapshot.agents.is_empty());
        assert!(snapshot.queues.is_empty());
        assert!(snapshot.message_log.is_empty());
    }

    /// Registering a watch target returns `true` only the first time, records
    /// the path once, and seeds the agent's CLI label and inbox queue.
    #[test]
    fn register_watch_target_is_idempotent_and_seeds_roster() {
        let broker = BrokerState::new(None);
        let hot = WatchTarget {
            agent_id: String::from("feat-hot"),
            cli: String::from("claude"),
            worktree_path: PathBuf::from("/tmp/feat-hot"),
        };

        // A fresh registration tells the caller to spawn a watcher.
        let first_attempt = broker.register_watch_target(&hot);
        assert!(first_attempt, "first registration must return true");

        // The same path again must be a no-op so no duplicate watcher starts.
        let second_attempt = broker.register_watch_target(&hot);
        assert!(!second_attempt, "duplicate registration must return false");

        let snapshot = broker.read();
        assert_eq!(
            snapshot.watched_paths.len(),
            1,
            "path recorded exactly once"
        );
        let cli_label = snapshot.agent_clis.get("feat-hot").map(String::as_str);
        assert_eq!(
            cli_label,
            Some("claude"),
            "registration seeds the CLI label"
        );
        let has_inbox = snapshot.queues.contains_key("feat-hot");
        assert!(has_inbox, "registration seeds the inbox queue");
    }

    /// Once a watched path is forgotten, registering it again counts as a
    /// fresh registration.
    #[test]
    fn forget_watch_target_allows_re_registration() {
        let broker = BrokerState::new(None);
        let hot = WatchTarget {
            agent_id: String::from("feat-hot"),
            cli: String::from("claude"),
            worktree_path: PathBuf::from("/tmp/feat-hot"),
        };

        let registered = broker.register_watch_target(&hot);
        assert!(registered);

        broker.forget_watch_target(&hot.worktree_path);

        let registered_again = broker.register_watch_target(&hot);
        assert!(
            registered_again,
            "after forgetting, the same path registers fresh again"
        );
    }

    /// Sequence numbers begin at 1 and increase by one per call.
    #[test]
    fn next_seq_starts_at_one() {
        let broker = BrokerState::new(None);
        for expected in 1..=3 {
            assert_eq!(broker.next_seq(), expected);
        }
    }

    /// Probing a loopback address nobody listens on reports
    /// `ProbeResult::NoListener`.
    #[test]
    fn probe_no_listener() {
        // Ask the OS for a free loopback port and release it again. A fixed
        // port number may be occupied by another process or by another test in
        // this binary, which would make the probe see a listener.
        let listener = std::net::TcpListener::bind(("127.0.0.1", 0))
            .expect("binding an ephemeral loopback port");
        let port = listener
            .local_addr()
            .expect("bound listener has a local address")
            .port();
        drop(listener);

        let url = format!("http://127.0.0.1:{port}");
        let result = probe_existing_broker(&url);
        assert_eq!(result, ProbeResult::NoListener);
    }

    /// A re-attached handle carries no runtime, shutdown sender, or flush
    /// thread.
    #[test]
    fn reattached_handle_has_no_runtime() {
        let shared = Arc::new(BrokerState::new(None));
        let handle = BrokerHandle::reattached("http://127.0.0.1:9119".into(), shared);

        let owns_runtime = handle.runtime.is_some();
        let owns_shutdown = handle.shutdown_tx.is_some();
        let owns_flush_thread = handle.flush_thread.is_some();

        assert!(!owns_runtime);
        assert!(!owns_shutdown);
        assert!(!owns_flush_thread);
    }

    /// Starting a broker on a port the OS just reported as free yields a handle
    /// whose URL names that port.
    #[test]
    fn start_broker_on_free_port() {
        // Ask the OS for a currently free port. A port derived from the process
        // id is the same for every test in this binary and may already be
        // taken, which would silently skip the assertions below.
        let probe = std::net::TcpListener::bind(("127.0.0.1", 0))
            .expect("binding an ephemeral loopback port");
        let port = probe
            .local_addr()
            .expect("bound listener has a local address")
            .port();
        drop(probe);

        let config = BrokerConfig {
            enabled: true,
            port,
            bind: "127.0.0.1".to_string(),
            ..Default::default()
        };
        let state = BrokerState::new(None);
        let handle = start_broker(&config, state, Vec::new());
        // If the port was grabbed in the meantime, the test is inconclusive —
        // not a failure.
        if let Ok(h) = handle {
            assert!(h.url.contains(&config.port.to_string()));
            drop(h);
        }
    }

    /// A broker started from a state without a log path has no flush thread.
    #[test]
    fn start_broker_no_log_path_no_flush_thread() {
        // Ask the OS for a currently free port. A port derived from the process
        // id is the same for every test in this binary and may already be
        // taken, which would silently skip the assertion below.
        let probe = std::net::TcpListener::bind(("127.0.0.1", 0))
            .expect("binding an ephemeral loopback port");
        let port = probe
            .local_addr()
            .expect("bound listener has a local address")
            .port();
        drop(probe);

        let config = BrokerConfig {
            enabled: true,
            port,
            bind: "127.0.0.1".to_string(),
            ..Default::default()
        };
        let state = BrokerState::new(None);
        // A start failure leaves the test inconclusive rather than failed.
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_none());
            drop(handle);
        }
    }

    /// A broker started from a state with a log path has a flush thread.
    #[test]
    fn start_broker_with_log_path_spawns_flush_thread() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("broker.log");

        // Ask the OS for a currently free port. A port derived from the process
        // id is the same for every test in this binary and may already be
        // taken, which would silently skip the assertion below.
        let probe = std::net::TcpListener::bind(("127.0.0.1", 0))
            .expect("binding an ephemeral loopback port");
        let port = probe
            .local_addr()
            .expect("bound listener has a local address")
            .port();
        drop(probe);

        let config = BrokerConfig {
            enabled: true,
            port,
            bind: "127.0.0.1".to_string(),
            ..Default::default()
        };
        let state = BrokerState::new(Some(log_path));
        // A start failure leaves the test inconclusive rather than failed.
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_some());
            drop(handle);
        }
    }

    // === agent-learning-variant: dual-output through the real publish path ===

    /// Builds a `Feedback` message addressed to `target`, sent from
    /// "supervisor", whose single error line is a conflict-detector notice
    /// naming `other`.
    fn conflict_feedback(target: &str, other: &str) -> BrokerMessage {
        let notice = format!("[conflict-detector] in-flight conflict with {other} on src/a.rs");
        let payload = messages::FeedbackPayload {
            from: String::from("supervisor"),
            errors: vec![notice],
        };
        BrokerMessage::Feedback {
            agent_id: target.to_owned(),
            payload,
        }
    }

    /// Collects, in log order, a clone of the payload of every `Learning`
    /// message currently in the state's message log.
    fn learning_payloads(state: &Arc<BrokerState>) -> Vec<messages::LearningPayload> {
        let inner = state.read();
        let mut collected = Vec::new();
        for (_, _, message) in inner.message_log.iter() {
            if let BrokerMessage::Learning { payload } = message {
                collected.push(payload.clone());
            }
        }
        collected
    }

    /// Runs a single flush-then-publish cycle against the attached aggregator,
    /// mirroring one iteration of the flush loop.
    ///
    /// Panics if no aggregator is attached or if the flush itself fails.
    fn tick(state: &Arc<BrokerState>) {
        let shared = state.learnings.clone().expect("aggregator attached");
        // The guard must be gone before publishing, which locks the aggregator
        // again.
        if let Ok(mut guard) = shared.lock() {
            guard.flush().unwrap();
        }
        publish_pending_learnings(state, &shared);
    }

    /// Builds a shared broker state with a learnings aggregator attached.
    ///
    /// The aggregator writes to `path`, has broker publishing set to
    /// `broker_publish`, and knows the agents "feat-x" and "feat-y".
    fn state_with_aggregator(path: PathBuf, broker_publish: bool) -> Arc<BrokerState> {
        let mut aggregator = learnings::LearningsAggregator::new(path);
        aggregator.set_broker_publish(broker_publish);
        for agent in ["feat-x", "feat-y"] {
            aggregator.register_agent(agent);
        }

        let mut broker = BrokerState::new(None);
        let shared = Arc::new(std::sync::Mutex::new(aggregator));
        broker.attach_learnings(shared);
        Arc::new(broker)
    }

    // Task 7.1: with broker publish enabled, a conflict scenario must surface
    // twice — as an `agent.learning` record in the broker log and as a
    // matching entry in the learnings file.
    #[test]
    fn dual_output_publishes_learning_and_writes_file_when_broker_on() {
        let dir = tempfile::tempdir().unwrap();
        let learnings_file = dir.path().join("session-learnings.md");
        let state = state_with_aggregator(learnings_file.clone(), true);

        let feedback = conflict_feedback("feat-x", "feat-y");
        delivery::publish_message(&state, &feedback);
        tick(&state);

        // File side of the dual output.
        let md = std::fs::read_to_string(&learnings_file).unwrap();
        assert!(
            md.contains("### Conflict events"),
            "file missing conflict:\n{md}"
        );

        // Broker side: exactly one record, and it agrees with the file.
        let published = learning_payloads(&state);
        assert_eq!(published.len(), 1, "expected one agent.learning record");
        let record = &published[0];
        assert_eq!(record.category, "conflict_event");
        assert_eq!(record.id.len(), 16);
        assert!(
            md.contains(&record.title),
            "file title must match broker record title: {}",
            record.title
        );
    }

    // Task 7.2: with broker publish disabled, the same conflict scenario still
    // writes the learnings file but leaves no `agent.learning` record in the
    // broker log.
    #[test]
    fn no_broker_publish_when_disabled_but_file_still_written() {
        let dir = tempfile::tempdir().unwrap();
        let learnings_file = dir.path().join("session-learnings.md");
        let state = state_with_aggregator(learnings_file.clone(), false);

        let feedback = conflict_feedback("feat-x", "feat-y");
        delivery::publish_message(&state, &feedback);
        tick(&state);

        let contents = std::fs::read_to_string(&learnings_file).unwrap();
        assert!(contents.contains("### Conflict events"));

        let published = learning_payloads(&state);
        assert!(
            published.is_empty(),
            "no agent.learning record should be published when broker publish is off"
        );
    }

    // Task 7.4: a second tick after the queue has been drained must not
    // publish again — every record reaches the broker exactly once.
    #[test]
    fn re_ticking_does_not_duplicate_learning_records() {
        let dir = tempfile::tempdir().unwrap();
        let learnings_file = dir.path().join("session-learnings.md");
        let state = state_with_aggregator(learnings_file, true);

        let feedback = conflict_feedback("feat-x", "feat-y");
        delivery::publish_message(&state, &feedback);

        // The first pass drains the queue; the second finds nothing new.
        for _ in 0..2 {
            tick(&state);
        }

        let published_count = learning_payloads(&state).len();
        assert_eq!(
            published_count, 1,
            "the conflict record must be published exactly once"
        );
    }

    // A branch-scoped learning record must be retrievable by polling that
    // branch's messages (`messages/<branch_id>`).
    #[test]
    fn branch_scoped_learning_is_routed_to_branch_inbox() {
        let dir = tempfile::tempdir().unwrap();
        let learnings_file = dir.path().join("session-learnings.md");
        let state = state_with_aggregator(learnings_file, true);

        // Blocked followed by a finished artifact: a stuck-duration scenario
        // scoped to "feat-x".
        let blocked = BrokerMessage::Blocked {
            agent_id: String::from("feat-x"),
            payload: messages::BlockedPayload {
                needs: String::from("types"),
                from: String::from("feat-y"),
            },
        };
        let finished = BrokerMessage::Artifact {
            agent_id: String::from("feat-x"),
            payload: messages::ArtifactPayload {
                status: String::from("done"),
                exports: vec![],
                modified_files: vec![],
            },
        };
        delivery::publish_message(&state, &blocked);
        delivery::publish_message(&state, &finished);
        tick(&state);

        let (inbox, _) = delivery::poll_messages(&state, "feat-x", 0);
        let has_stuck_duration = inbox.iter().any(|m| {
            matches!(
                m,
                BrokerMessage::Learning { payload } if payload.category == "stuck_duration"
            )
        });
        assert!(
            has_stuck_duration,
            "stuck_duration learning should land in feat-x's inbox"
        );
    }
}