1pub mod conflict;
16pub mod delivery;
17pub mod learnings;
18pub mod messages;
19pub mod publish;
20pub mod server;
21pub mod watcher;
22
23use std::collections::HashMap;
24use std::path::PathBuf;
25use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
26use std::sync::{Arc, RwLock};
27use std::thread::JoinHandle;
28use std::time::Instant;
29
30use serde::Serialize;
31
32use crate::config::{BrokerConfig, ConflictConfig};
33pub use messages::BrokerMessage;
34
/// A worktree the broker watches on behalf of one agent.
///
/// For each target, `start_broker_with` records `cli` under `agent_id` in
/// `BrokerStateInner::agent_clis`, creates an empty inbox queue for the agent,
/// and spawns `watcher::watch_worktree` for it.
#[derive(Debug, Clone)]
pub struct WatchTarget {
    /// Identifier of the agent that owns this worktree.
    pub agent_id: String,
    /// CLI associated with the agent; copied into `agent_clis` at broker start.
    pub cli: String,
    /// Filesystem path of the agent's worktree.
    pub worktree_path: PathBuf,
}
47
/// Broker-side bookkeeping for one agent, stored in `BrokerStateInner::agents`
/// keyed by agent id.
///
/// NOTE(review): records are created/updated outside this file — the field
/// semantics below are inferred from names and should be confirmed.
#[derive(Debug, Clone)]
pub struct AgentRecord {
    /// Agent identifier (presumably identical to the map key).
    pub agent_id: String,
    /// Current status string; the set of possible values is defined elsewhere.
    pub status: String,
    /// Presumably the moment the broker last heard from this agent.
    pub last_seen: Instant,
    /// Most recent message attributed to this agent, if any.
    pub last_message: Option<BrokerMessage>,
    /// Presumably when the agent's last commit was observed, if ever.
    pub last_committed_at: Option<Instant>,
}
68
/// Serializable per-agent status entry.
///
/// NOTE(review): presumably one row of the broker's `/status` response —
/// confirm against `server`. Field order here is the serialized field order.
#[derive(Debug, Clone, Serialize)]
pub struct AgentStatusEntry {
    /// Agent identifier.
    pub agent_id: String,
    /// CLI associated with the agent.
    pub cli: String,
    /// Status string.
    pub status: String,
    /// Age of the last sighting in whole seconds (presumably derived from `last_seen`).
    pub last_seen_seconds: u64,
    /// Short human-readable summary.
    pub summary: String,
    // Raw timestamp kept for in-process use; excluded from serialization
    // because `Instant` is not serializable.
    #[serde(skip)]
    /// Raw last-seen timestamp (not serialized).
    pub last_seen: Instant,
    // Omitted from the output when `None`. `default` only affects
    // deserialization, which this type does not derive.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    /// Optional phase label.
    pub phase: Option<String>,
}
92
/// Mutable broker data, guarded by the `RwLock` inside [`BrokerState`].
#[derive(Debug)]
pub struct BrokerStateInner {
    /// Known agents keyed by agent id. NOTE(review): not written in this
    /// file; populated by message handlers elsewhere.
    pub agents: HashMap<String, AgentRecord>,
    /// CLI per agent id, seeded from watch targets at broker start and by
    /// `BrokerState::with_seeded_cli`.
    pub agent_clis: HashMap<String, String>,
    /// Per-agent inboxes of `(u64, message)` pairs; `start_broker_with`
    /// creates an empty queue for every watch target. The `u64` is presumably
    /// the sequence number from `BrokerState::next_seq` — confirm.
    pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
    /// Published messages as `(u64, wall-clock time, message)` entries
    /// (the `u64` is presumably the sequence number).
    pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
    /// Working-status republish TTL. Starts at
    /// `WatcherConfig::DEFAULT_REPUBLISH_TTL_SECONDS` and is overwritten from
    /// `config.watcher` by `start_broker_with`; its consumer is outside this file.
    pub republish_working_ttl: std::time::Duration,
}
112
/// Shared broker state.
///
/// `start_broker_with` wraps it in an `Arc` that is shared by the HTTP
/// router, the worktree watcher tasks, and the background flush threads.
/// Mutable data lives behind an `RwLock`; the remaining fields are configured
/// through the builder methods before the broker starts.
#[derive(Debug)]
pub struct BrokerState {
    /// Lock-protected mutable data; use [`BrokerState::read`] / [`BrokerState::write`].
    inner: RwLock<BrokerStateInner>,
    /// Counter behind [`BrokerState::next_seq`], which hands out 1, 2, 3, ...
    next_seq: AtomicU64,
    /// Optional log path; the `delivery::flush_loop` thread is spawned only
    /// when this is `Some`.
    pub log_path: Option<PathBuf>,
    /// Construction time, used by [`BrokerState::uptime_seconds`].
    started_at: Instant,
    /// Optional learnings aggregator; the learnings-flush thread is spawned
    /// only when this is `Some`.
    pub learnings: Option<learnings::SharedLearnings>,
    /// Set by `with_verify_on_commit_nudge`; read elsewhere (not in this file).
    pub verify_on_commit_nudge: bool,
    /// Optional role-gating context set by `with_role_gating`; `None` by default.
    pub role_gating: Option<crate::opsx::RoleGatingContext>,
}
145
146impl BrokerState {
147 pub fn new(log_path: Option<PathBuf>) -> Self {
149 Self {
150 inner: RwLock::new(BrokerStateInner {
151 agents: HashMap::new(),
152 agent_clis: HashMap::new(),
153 queues: HashMap::new(),
154 message_log: Vec::new(),
155 republish_working_ttl: std::time::Duration::from_secs(
156 crate::config::WatcherConfig::DEFAULT_REPUBLISH_TTL_SECONDS,
157 ),
158 }),
159 next_seq: AtomicU64::new(0),
160 log_path,
161 started_at: Instant::now(),
162 learnings: None,
163 verify_on_commit_nudge: false,
167 role_gating: None,
168 }
169 }
170
171 #[must_use]
176 pub fn with_role_gating(mut self, ctx: crate::opsx::RoleGatingContext) -> Self {
177 self.role_gating = Some(ctx);
178 self
179 }
180
181 #[must_use]
185 pub fn with_verify_on_commit_nudge(mut self, enabled: bool) -> Self {
186 self.verify_on_commit_nudge = enabled;
187 self
188 }
189
190 #[must_use]
201 pub fn with_seeded_cli(self, agent_id: &str, cli: &str) -> Self {
202 if !cli.is_empty()
203 && let Ok(mut inner) = self.inner.write()
204 {
205 inner
206 .agent_clis
207 .insert(agent_id.to_string(), cli.to_string());
208 }
209 self
210 }
211
212 pub fn attach_learnings(&mut self, aggregator: learnings::SharedLearnings) {
216 self.learnings = Some(aggregator);
217 }
218
219 pub fn set_republish_working_ttl(&self, ttl: std::time::Duration) {
223 self.write().republish_working_ttl = ttl;
224 }
225
226 pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
232 self.inner.read().expect("broker state lock poisoned")
233 }
234
235 pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
241 self.inner.write().expect("broker state lock poisoned")
242 }
243
244 pub fn next_seq(&self) -> u64 {
246 self.next_seq.fetch_add(1, Ordering::Relaxed) + 1
247 }
248
249 pub fn uptime_seconds(&self) -> u64 {
253 self.started_at.elapsed().as_secs()
254 }
255}
256
/// Errors that can occur while starting the broker.
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
    /// The configured port is already taken by something that is not a
    /// git-paw broker.
    #[error(
        "port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
    )]
    PortInUse {
        /// The configured broker port.
        port: u16,
        /// Underlying I/O error. `thiserror` treats a field named `source`
        /// as the error's `source()`.
        source: std::io::Error,
    },

    /// A connection to the port was accepted, but the `/status` probe got no
    /// usable HTTP reply.
    #[error("broker probe timed out on port {port} — check for stuck processes on this port")]
    ProbeTimeout {
        /// The configured broker port.
        port: u16,
    },

    /// The bind address could not be parsed, or the listening socket could
    /// not be created, configured, bound, or put into listening mode.
    #[error("failed to bind broker: {0}")]
    BindFailed(std::io::Error),

    /// The Tokio runtime could not be built.
    #[error("failed to create broker runtime: {0}")]
    RuntimeFailed(std::io::Error),
}
286
/// Handle to a broker started (or reattached to) by `start_broker_with`.
///
/// Dropping the handle tears down everything it owns. A handle for an
/// already-running broker owns no runtime, channels, or threads.
pub struct BrokerHandle {
    /// Shared broker state.
    pub state: Arc<BrokerState>,
    /// Tokio runtime hosting the HTTP server, watchers, and conflict
    /// detector; `None` for a reattached handle.
    runtime: Option<tokio::runtime::Runtime>,
    /// Triggers the HTTP server's graceful shutdown; `None` when reattached.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Tells watcher and conflict-detector tasks to stop; `None` when reattached.
    watcher_shutdown: Option<tokio::sync::watch::Sender<bool>>,
    /// Broker base URL (the value of `config.url()`).
    pub url: String,
    /// Shared with the background flush threads; set on drop so they exit.
    stop_flag: Arc<AtomicBool>,
    /// Log-flush thread; present only when `state.log_path` was `Some`.
    flush_thread: Option<JoinHandle<()>>,
    /// Learnings-flush thread; present only when an aggregator was attached.
    learnings_thread: Option<JoinHandle<()>>,
}
312
313impl BrokerHandle {
314 fn reattached(url: String, state: Arc<BrokerState>) -> Self {
316 Self {
317 state,
318 runtime: None,
319 shutdown_tx: None,
320 watcher_shutdown: None,
321 url,
322 stop_flag: Arc::new(AtomicBool::new(false)),
323 flush_thread: None,
324 learnings_thread: None,
325 }
326 }
327}
328
/// Shuts down whatever this handle owns, in a fixed order.
///
/// 1. Raise `stop_flag` and join the log-flush and learnings-flush threads.
/// 2. Signal the watcher and conflict-detector tasks over the `watch` channel.
/// 3. Trigger the HTTP server's graceful shutdown over the oneshot channel.
/// 4. Shut the runtime down, waiting at most 2 seconds.
///
/// A reattached handle has every optional field set to `None`, so dropping
/// it only sets its own private stop flag.
impl Drop for BrokerHandle {
    fn drop(&mut self) {
        // Both background threads were handed this flag; the learnings
        // thread polls it and does a final flush + publish before returning.
        self.stop_flag.store(true, Ordering::Release);
        // NOTE(review): both threads observe the flag concurrently, so the
        // learnings thread's final publish (and anything watchers publish
        // before step 2) may land after the log-flush thread has exited.
        // Confirm whether `delivery::flush_loop` flushes on exit and
        // whether that ordering matters.
        if let Some(handle) = self.flush_thread.take() {
            // A panicked thread yields `Err`; ignored so drop never panics.
            let _ = handle.join();
        }
        if let Some(handle) = self.learnings_thread.take() {
            let _ = handle.join();
        }
        if let Some(tx) = self.watcher_shutdown.take() {
            // `send` only fails when every receiver is already gone.
            let _ = tx.send(true);
        }
        if let Some(tx) = self.shutdown_tx.take() {
            // Fails only if the server task already dropped its receiver.
            let _ = tx.send(());
        }
        if let Some(rt) = self.runtime.take() {
            // Bounded wait so a stuck task cannot hang the drop.
            rt.shutdown_timeout(std::time::Duration::from_secs(2));
        }
    }
}
355
/// Outcome of probing a broker URL with [`probe_broker`].
#[derive(Debug, PartialEq, Eq)]
pub enum ProbeResult {
    /// The address did not resolve, or no TCP connection could be made.
    NoListener,
    /// The `/status` reply contained the `"git_paw":true` marker.
    LiveBroker,
    /// An HTTP reply came back without the git-paw marker.
    ForeignServer,
    /// A connection was accepted, but the request could not be written,
    /// nothing could be read back, or the reply was not HTTP.
    Timeout,
}
368
369pub fn probe_broker(url: &str) -> ProbeResult {
378 probe_existing_broker(url)
379}
380
381fn probe_existing_broker(url: &str) -> ProbeResult {
382 use std::io::{Read, Write};
383 use std::net::TcpStream;
384 use std::time::Duration;
385
386 let addr = url.strip_prefix("http://").unwrap_or(url);
388
389 let socket_addr = if let Ok(a) = addr.parse() {
390 a
391 } else {
392 use std::net::ToSocketAddrs;
393 match addr.to_socket_addrs() {
394 Ok(mut addrs) => match addrs.next() {
395 Some(a) => a,
396 None => return ProbeResult::NoListener,
397 },
398 Err(_) => return ProbeResult::NoListener,
399 }
400 };
401
402 let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, Duration::from_millis(500))
403 else {
404 return ProbeResult::NoListener;
405 };
406
407 stream
408 .set_read_timeout(Some(Duration::from_millis(500)))
409 .ok();
410 stream
411 .set_write_timeout(Some(Duration::from_millis(500)))
412 .ok();
413
414 let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
415 if stream.write_all(request.as_bytes()).is_err() {
416 return ProbeResult::Timeout;
417 }
418
419 let mut response = String::new();
420 if stream.read_to_string(&mut response).is_err() && response.is_empty() {
421 return ProbeResult::Timeout;
422 }
423
424 if response.contains("\"git_paw\":true") || response.contains("\"git_paw\": true") {
425 ProbeResult::LiveBroker
426 } else if response.starts_with("HTTP/") {
427 ProbeResult::ForeignServer
428 } else {
429 ProbeResult::Timeout
430 }
431}
432
433pub fn start_broker(
446 config: &BrokerConfig,
447 state: BrokerState,
448 watch_targets: Vec<WatchTarget>,
449) -> Result<BrokerHandle, BrokerError> {
450 start_broker_with(config, state, watch_targets, None, 60)
451}
452
453pub fn start_broker_with(
466 config: &BrokerConfig,
467 state: BrokerState,
468 watch_targets: Vec<WatchTarget>,
469 conflict: Option<ConflictConfig>,
470 learnings_flush_interval_seconds: u64,
471) -> Result<BrokerHandle, BrokerError> {
472 let url = config.url();
473 let state = Arc::new(state);
474 state.set_republish_working_ttl(std::time::Duration::from_secs(
477 config.watcher.republish_working_ttl_seconds(),
478 ));
479 let stop_flag = Arc::new(AtomicBool::new(false));
480
481 match probe_existing_broker(&url) {
482 ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
483 ProbeResult::ForeignServer => {
484 return Err(BrokerError::PortInUse {
485 port: config.port,
486 source: std::io::Error::new(
487 std::io::ErrorKind::AddrInUse,
488 "port occupied by non-broker process",
489 ),
490 });
491 }
492 ProbeResult::Timeout => {
493 return Err(BrokerError::ProbeTimeout { port: config.port });
494 }
495 ProbeResult::NoListener => {}
496 }
497
498 let flush_thread = if state.log_path.is_some() {
500 let s = Arc::clone(&state);
501 let f = Arc::clone(&stop_flag);
502 Some(std::thread::spawn(move || {
503 delivery::flush_loop(&s, &f);
504 }))
505 } else {
506 None
507 };
508
509 let learnings_thread = if state.learnings.is_some() {
513 let s = Arc::clone(&state);
514 let f = Arc::clone(&stop_flag);
515 Some(std::thread::spawn(move || {
516 learnings_flush_loop(&s, &f, learnings_flush_interval_seconds);
517 }))
518 } else {
519 None
520 };
521
522 let runtime = tokio::runtime::Builder::new_multi_thread()
523 .enable_all()
524 .build()
525 .map_err(BrokerError::RuntimeFailed)?;
526
527 let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
528 |e: std::net::AddrParseError| {
529 BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
530 },
531 )?;
532
533 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
534
535 let router = server::router(Arc::clone(&state));
536
537 let listener = runtime.block_on(async {
538 let socket = tokio::net::TcpSocket::new_v4().map_err(BrokerError::BindFailed)?;
539 socket
540 .set_reuseaddr(true)
541 .map_err(BrokerError::BindFailed)?;
542 socket.bind(addr).map_err(BrokerError::BindFailed)?;
543 socket.listen(1024).map_err(BrokerError::BindFailed)
544 })?;
545
546 runtime.spawn(async {
549 let _ = tokio::signal::ctrl_c().await;
550 });
551
552 runtime.spawn(async move {
553 axum::serve(listener, router)
554 .with_graceful_shutdown(async {
555 let _ = shutdown_rx.await;
556 })
557 .await
558 .ok();
559 });
560
561 {
567 let mut inner = state.write();
568 for target in &watch_targets {
569 inner
570 .agent_clis
571 .insert(target.agent_id.clone(), target.cli.clone());
572 inner.queues.entry(target.agent_id.clone()).or_default();
573 }
574 }
575
576 let (watcher_tx, watcher_rx) = tokio::sync::watch::channel(false);
581 for target in watch_targets {
582 let s = Arc::clone(&state);
583 let rx = watcher_rx.clone();
584 runtime.spawn(watcher::watch_worktree(s, target, rx));
585 }
586 if let Some(conflict_cfg) = conflict {
587 let s = Arc::clone(&state);
588 let rx = watcher_rx.clone();
589 runtime.spawn(conflict::run_detector_loop(s, conflict_cfg, rx));
590 }
591
592 Ok(BrokerHandle {
593 state,
594 runtime: Some(runtime),
595 shutdown_tx: Some(shutdown_tx),
596 watcher_shutdown: Some(watcher_tx),
597 url,
598 stop_flag,
599 flush_thread,
600 learnings_thread,
601 })
602}
603
604fn learnings_flush_loop(
610 state: &Arc<BrokerState>,
611 stop: &Arc<AtomicBool>,
612 flush_interval_seconds: u64,
613) {
614 let Some(aggregator) = state.learnings.clone() else {
615 return;
616 };
617 let interval = std::time::Duration::from_secs(flush_interval_seconds.max(1));
618 let tick = std::time::Duration::from_millis(100);
619
620 loop {
621 let mut elapsed = std::time::Duration::ZERO;
622 while elapsed < interval {
623 if stop.load(Ordering::Acquire) {
624 if let Ok(mut agg) = aggregator.lock() {
625 let _ = agg.flush_at_shutdown();
626 }
627 publish_pending_learnings(state, &aggregator);
628 return;
629 }
630 std::thread::sleep(tick);
631 elapsed += tick;
632 }
633 if let Ok(mut agg) = aggregator.lock() {
634 let _ = agg.flush();
635 }
636 publish_pending_learnings(state, &aggregator);
637 }
638}
639
640fn publish_pending_learnings(state: &Arc<BrokerState>, aggregator: &learnings::SharedLearnings) {
649 let records = match aggregator.lock() {
650 Ok(mut agg) => agg.take_pending_publish(),
651 Err(_) => return,
652 };
653 for record in &records {
654 delivery::publish_message(state, &BrokerMessage::from(record));
655 }
656}
657
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a loopback port that the OS reported as free a moment ago.
    ///
    /// Binding to port 0 lets the OS pick an unused ephemeral port. The
    /// listener is dropped immediately so the broker under test can take it.
    /// A fixed or process-id-derived port is shared by every test in the run,
    /// so parallel tests could land on the same port. They would then
    /// silently reattach to each other's broker instead of starting their own.
    fn free_port() -> u16 {
        std::net::TcpListener::bind("127.0.0.1:0")
            .and_then(|listener| listener.local_addr())
            .map(|addr| addr.port())
            .expect("the OS should hand out a free loopback port")
    }

    /// Enabled broker config bound to loopback on `port`.
    fn loopback_config(port: u16) -> BrokerConfig {
        BrokerConfig {
            enabled: true,
            port,
            bind: "127.0.0.1".to_string(),
            ..Default::default()
        }
    }

    #[test]
    fn broker_state_new_is_empty() {
        let state = BrokerState::new(None);
        let inner = state.read();
        assert!(inner.agents.is_empty());
        assert!(inner.queues.is_empty());
        assert!(inner.message_log.is_empty());
    }

    #[test]
    fn next_seq_starts_at_one() {
        let state = BrokerState::new(None);
        assert_eq!(state.next_seq(), 1);
        assert_eq!(state.next_seq(), 2);
        assert_eq!(state.next_seq(), 3);
    }

    #[test]
    fn probe_no_listener() {
        // `free_port` released this port and nothing in this test binds it.
        let url = format!("http://127.0.0.1:{}", free_port());
        assert_eq!(probe_existing_broker(&url), ProbeResult::NoListener);
    }

    #[test]
    fn reattached_handle_has_no_runtime() {
        let state = Arc::new(BrokerState::new(None));
        let h = BrokerHandle::reattached("http://127.0.0.1:9119".into(), state);
        assert!(h.runtime.is_none());
        assert!(h.shutdown_tx.is_none());
        assert!(h.flush_thread.is_none());
    }

    #[test]
    fn start_broker_on_free_port() {
        let config = loopback_config(free_port());
        let state = BrokerState::new(None);
        // Another process could still grab the port between `free_port` and
        // the bind; only assert when the broker actually started.
        if let Ok(h) = start_broker(&config, state, Vec::new()) {
            assert!(h.url.contains(&config.port.to_string()));
            drop(h);
        }
    }

    #[test]
    fn start_broker_no_log_path_no_flush_thread() {
        let config = loopback_config(free_port());
        let state = BrokerState::new(None);
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_none());
            drop(handle);
        }
    }

    #[test]
    fn start_broker_with_log_path_spawns_flush_thread() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("broker.log");
        let config = loopback_config(free_port());
        let state = BrokerState::new(Some(log_path));
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_some());
            drop(handle);
        }
    }

    /// Supervisor feedback reporting an in-flight conflict between `target`
    /// and `other` on `src/a.rs`.
    fn conflict_feedback(target: &str, other: &str) -> BrokerMessage {
        BrokerMessage::Feedback {
            agent_id: target.to_string(),
            payload: messages::FeedbackPayload {
                from: "supervisor".to_string(),
                errors: vec![format!(
                    "[conflict-detector] in-flight conflict with {other} on src/a.rs"
                )],
            },
        }
    }

    /// Payloads of every `Learning` message in the broker's message log.
    fn learning_payloads(state: &Arc<BrokerState>) -> Vec<messages::LearningPayload> {
        state
            .read()
            .message_log
            .iter()
            .filter_map(|(_, _, m)| match m {
                BrokerMessage::Learning { payload } => Some(payload.clone()),
                _ => None,
            })
            .collect()
    }

    /// Runs one learnings flush + publish cycle synchronously, mirroring a
    /// single iteration of `learnings_flush_loop`.
    fn tick(state: &Arc<BrokerState>) {
        let aggregator = state.learnings.clone().expect("aggregator attached");
        if let Ok(mut a) = aggregator.lock() {
            a.flush().unwrap();
        }
        publish_pending_learnings(state, &aggregator);
    }

    /// Broker state with a learnings aggregator writing to `path`, with
    /// agents `feat-x` and `feat-y` registered.
    fn state_with_aggregator(path: PathBuf, broker_publish: bool) -> Arc<BrokerState> {
        let mut agg = learnings::LearningsAggregator::new(path);
        agg.set_broker_publish(broker_publish);
        agg.register_agent("feat-x");
        agg.register_agent("feat-y");
        let mut state = BrokerState::new(None);
        state.attach_learnings(Arc::new(std::sync::Mutex::new(agg)));
        Arc::new(state)
    }

    #[test]
    fn dual_output_publishes_learning_and_writes_file_when_broker_on() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("session-learnings.md");
        let state = state_with_aggregator(path.clone(), true);

        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
        tick(&state);

        let md = std::fs::read_to_string(&path).unwrap();
        assert!(
            md.contains("### Conflict events"),
            "file missing conflict:\n{md}"
        );

        let learnings = learning_payloads(&state);
        assert_eq!(learnings.len(), 1, "expected one agent.learning record");
        assert_eq!(learnings[0].category, "conflict_event");
        assert_eq!(learnings[0].id.len(), 16);
        assert!(
            md.contains(&learnings[0].title),
            "file title must match broker record title: {}",
            learnings[0].title
        );
    }

    #[test]
    fn no_broker_publish_when_disabled_but_file_still_written() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("session-learnings.md");
        let state = state_with_aggregator(path.clone(), false);

        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
        tick(&state);

        let md = std::fs::read_to_string(&path).unwrap();
        assert!(md.contains("### Conflict events"));
        assert!(
            learning_payloads(&state).is_empty(),
            "no agent.learning record should be published when broker publish is off"
        );
    }

    #[test]
    fn re_ticking_does_not_duplicate_learning_records() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("session-learnings.md");
        let state = state_with_aggregator(path, true);

        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
        tick(&state);
        // A second tick must not republish what the first one already sent.
        tick(&state);
        assert_eq!(
            learning_payloads(&state).len(),
            1,
            "the conflict record must be published exactly once"
        );
    }

    #[test]
    fn branch_scoped_learning_is_routed_to_branch_inbox() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("session-learnings.md");
        let state = state_with_aggregator(path, true);

        // feat-x reports being blocked on feat-y, then finishes; the
        // aggregator should derive a stuck-duration learning from that.
        delivery::publish_message(
            &state,
            &BrokerMessage::Blocked {
                agent_id: "feat-x".to_string(),
                payload: messages::BlockedPayload {
                    needs: "types".to_string(),
                    from: "feat-y".to_string(),
                },
            },
        );
        delivery::publish_message(
            &state,
            &BrokerMessage::Artifact {
                agent_id: "feat-x".to_string(),
                payload: messages::ArtifactPayload {
                    status: "done".to_string(),
                    exports: vec![],
                    modified_files: vec![],
                },
            },
        );
        tick(&state);

        let (msgs, _) = delivery::poll_messages(&state, "feat-x", 0);
        assert!(
            msgs.iter().any(|m| matches!(
                m,
                BrokerMessage::Learning { payload } if payload.category == "stuck_duration"
            )),
            "stuck_duration learning should land in feat-x's inbox"
        );
    }
}