1pub mod conflict;
16pub mod delivery;
17pub mod learnings;
18pub mod messages;
19pub mod publish;
20pub mod server;
21pub mod watcher;
22
23use std::collections::HashMap;
24use std::path::PathBuf;
25use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
26use std::sync::{Arc, RwLock};
27use std::thread::JoinHandle;
28use std::time::Instant;
29
30use serde::Serialize;
31
32use crate::config::{BrokerConfig, ConflictConfig};
33pub use messages::BrokerMessage;
34
/// A worktree the broker should watch on behalf of one agent.
///
/// For every target passed to [`start_broker`] / [`start_broker_with`], the
/// broker records the agent's CLI in `BrokerStateInner::agent_clis`, creates an
/// (initially empty) queue for it, and spawns a [`watcher::watch_worktree`] task.
#[derive(Debug, Clone)]
pub struct WatchTarget {
    /// Identifier of the agent that owns this worktree.
    pub agent_id: String,
    /// Name of the CLI driving the agent.
    pub cli: String,
    /// Filesystem path of the agent's worktree.
    pub worktree_path: PathBuf,
}
47
/// Per-agent bookkeeping stored in `BrokerStateInner::agents`.
#[derive(Debug, Clone)]
pub struct AgentRecord {
    /// Identifier of the agent this record describes.
    pub agent_id: String,
    /// Latest status string for the agent. The set of possible values is
    /// defined by the code that updates it (not visible in this module).
    pub status: String,
    /// Monotonic timestamp of the last time the agent was seen.
    pub last_seen: Instant,
    /// Most recent message associated with the agent, if any
    /// (presumably the last one it published — confirm against `publish`).
    pub last_message: Option<BrokerMessage>,
}
60
/// One row of an agent-status report, serializable with serde.
///
/// `last_seen` exists for in-process use only and is skipped when serializing.
/// `phase` is omitted from the output entirely when it is `None`.
#[derive(Debug, Clone, Serialize)]
pub struct AgentStatusEntry {
    /// Identifier of the agent.
    pub agent_id: String,
    /// CLI driving the agent.
    pub cli: String,
    /// Current status string.
    pub status: String,
    /// Whole seconds since the agent was last seen (presumably derived from
    /// `last_seen` by the code that builds the entry).
    pub last_seen_seconds: u64,
    /// Short summary text for the agent.
    pub summary: String,
    /// Monotonic last-seen timestamp; never serialized.
    #[serde(skip)]
    pub last_seen: Instant,
    /// Optional phase label; left out of the serialized output when `None`.
    // NOTE(review): `default` only affects deserialization, and this type
    // derives only `Serialize`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub phase: Option<String>,
}
84
/// The mutable part of the broker state, guarded by the `RwLock` inside
/// [`BrokerState`] and accessed through [`BrokerState::read`] / [`BrokerState::write`].
#[derive(Debug)]
pub struct BrokerStateInner {
    /// Known agents, presumably keyed by agent id.
    pub agents: HashMap<String, AgentRecord>,
    /// CLI name per agent id; seeded from the watch targets when the broker starts.
    pub agent_clis: HashMap<String, String>,
    /// Message queue per agent id. Entries are `(seq, message)` pairs, with `seq`
    /// presumably taken from [`BrokerState::next_seq`].
    pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
    /// Message log as `(seq, wall-clock time, message)` entries. Presumably this
    /// is persisted to `BrokerState::log_path` by the flush thread; confirm in
    /// `delivery`.
    pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
}
97
/// Shared broker state.
///
/// Holds the lock-protected [`BrokerStateInner`], lock-free metadata (a
/// sequence counter and the start time), and the optional persistence sinks
/// that decide which background threads [`start_broker_with`] spawns.
#[derive(Debug)]
pub struct BrokerState {
    /// Mutable state; use [`BrokerState::read`] / [`BrokerState::write`].
    inner: RwLock<BrokerStateInner>,
    /// Last sequence number handed out by [`BrokerState::next_seq`] (starts at 0).
    next_seq: AtomicU64,
    /// Message-log destination. When `Some`, the broker spawns a flush thread.
    pub log_path: Option<PathBuf>,
    /// Creation time, used by [`BrokerState::uptime_seconds`].
    started_at: Instant,
    /// Optional learnings aggregator. When `Some`, the broker spawns a thread
    /// that flushes it periodically and once more at shutdown.
    pub learnings: Option<learnings::SharedLearnings>,
}
118
119impl BrokerState {
120 pub fn new(log_path: Option<PathBuf>) -> Self {
122 Self {
123 inner: RwLock::new(BrokerStateInner {
124 agents: HashMap::new(),
125 agent_clis: HashMap::new(),
126 queues: HashMap::new(),
127 message_log: Vec::new(),
128 }),
129 next_seq: AtomicU64::new(0),
130 log_path,
131 started_at: Instant::now(),
132 learnings: None,
133 }
134 }
135
136 pub fn attach_learnings(&mut self, aggregator: learnings::SharedLearnings) {
140 self.learnings = Some(aggregator);
141 }
142
143 pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
149 self.inner.read().expect("broker state lock poisoned")
150 }
151
152 pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
158 self.inner.write().expect("broker state lock poisoned")
159 }
160
161 pub fn next_seq(&self) -> u64 {
163 self.next_seq.fetch_add(1, Ordering::Relaxed) + 1
164 }
165
166 pub fn uptime_seconds(&self) -> u64 {
170 self.started_at.elapsed().as_secs()
171 }
172}
173
/// Errors returned when starting the broker.
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
    /// The configured port answered HTTP but is not a git-paw broker.
    #[error(
        "port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
    )]
    PortInUse {
        /// Port that was requested.
        port: u16,
        /// Underlying I/O error. `thiserror` exposes a field named `source` as
        /// the error's source.
        source: std::io::Error,
    },

    /// The port accepted a TCP connection but did not answer the `/status`
    /// probe with a recognisable response.
    #[error("broker probe timed out on port {port} — check for stuck processes on this port")]
    ProbeTimeout {
        /// Port that was probed.
        port: u16,
    },

    /// The bind address could not be parsed, or the socket could not be
    /// created, configured, bound or put into listening mode.
    #[error("failed to bind broker: {0}")]
    BindFailed(std::io::Error),

    /// The Tokio runtime could not be built.
    #[error("failed to create broker runtime: {0}")]
    RuntimeFailed(std::io::Error),
}
203
/// Handle to a running (or reattached) broker. Dropping it shuts the broker down.
///
/// A handle built by [`start_broker_with`] owns the Tokio runtime, the shutdown
/// channels and the background flush threads. A reattached handle is returned
/// when a live broker already answers on the configured URL. It owns none of
/// these, so dropping it does not stop that other broker.
pub struct BrokerHandle {
    /// Shared broker state.
    pub state: Arc<BrokerState>,
    /// Runtime hosting the HTTP server, worktree watchers and conflict detector.
    /// `None` when reattached.
    runtime: Option<tokio::runtime::Runtime>,
    /// Fires the HTTP server's graceful shutdown. `None` when reattached.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Sends `true` to the watcher tasks and conflict detector to signal them to stop.
    watcher_shutdown: Option<tokio::sync::watch::Sender<bool>>,
    /// Broker base URL, as returned by `BrokerConfig::url`.
    pub url: String,
    /// Set on drop to tell the background flush threads to exit.
    stop_flag: Arc<AtomicBool>,
    /// Message-log flush thread. Present only when `state.log_path` was set.
    flush_thread: Option<JoinHandle<()>>,
    /// Learnings flush thread. Present only when `state.learnings` was set.
    learnings_thread: Option<JoinHandle<()>>,
}
229
230impl BrokerHandle {
231 fn reattached(url: String, state: Arc<BrokerState>) -> Self {
233 Self {
234 state,
235 runtime: None,
236 shutdown_tx: None,
237 watcher_shutdown: None,
238 url,
239 stop_flag: Arc::new(AtomicBool::new(false)),
240 flush_thread: None,
241 learnings_thread: None,
242 }
243 }
244}
245
246impl Drop for BrokerHandle {
247 fn drop(&mut self) {
248 self.stop_flag.store(true, Ordering::Release);
250 if let Some(handle) = self.flush_thread.take() {
251 let _ = handle.join();
252 }
253 if let Some(handle) = self.learnings_thread.take() {
256 let _ = handle.join();
257 }
258 if let Some(tx) = self.watcher_shutdown.take() {
260 let _ = tx.send(true);
261 }
262 if let Some(tx) = self.shutdown_tx.take() {
264 let _ = tx.send(());
265 }
266 if let Some(rt) = self.runtime.take() {
268 rt.shutdown_timeout(std::time::Duration::from_secs(2));
269 }
270 }
271}
272
/// Outcome of probing the broker URL before starting a new broker.
#[derive(Debug, PartialEq, Eq)]
pub enum ProbeResult {
    /// The address could not be resolved, or the TCP connect failed within 500 ms.
    NoListener,
    /// The `/status` response contained `"git_paw":true`, so a git-paw broker is already running.
    LiveBroker,
    /// An HTTP server answered, but it is not a git-paw broker.
    ForeignServer,
    /// A connection was accepted, but the request could not be written or no
    /// recognisable HTTP response came back. Despite the name, this is not
    /// only a real timeout.
    Timeout,
}
285
/// Checks whether a git-paw broker is already serving `url`.
///
/// This is the public form of the check [`start_broker_with`] runs before
/// binding. It sends `GET /status` with 500 ms connect/read/write timeouts and
/// classifies the reply as a [`ProbeResult`]. `url` may carry an `http://` prefix.
pub fn probe_broker(url: &str) -> ProbeResult {
    probe_existing_broker(url)
}
297
298fn probe_existing_broker(url: &str) -> ProbeResult {
299 use std::io::{Read, Write};
300 use std::net::TcpStream;
301 use std::time::Duration;
302
303 let addr = url.strip_prefix("http://").unwrap_or(url);
305
306 let socket_addr = if let Ok(a) = addr.parse() {
307 a
308 } else {
309 use std::net::ToSocketAddrs;
310 match addr.to_socket_addrs() {
311 Ok(mut addrs) => match addrs.next() {
312 Some(a) => a,
313 None => return ProbeResult::NoListener,
314 },
315 Err(_) => return ProbeResult::NoListener,
316 }
317 };
318
319 let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, Duration::from_millis(500))
320 else {
321 return ProbeResult::NoListener;
322 };
323
324 stream
325 .set_read_timeout(Some(Duration::from_millis(500)))
326 .ok();
327 stream
328 .set_write_timeout(Some(Duration::from_millis(500)))
329 .ok();
330
331 let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
332 if stream.write_all(request.as_bytes()).is_err() {
333 return ProbeResult::Timeout;
334 }
335
336 let mut response = String::new();
337 if stream.read_to_string(&mut response).is_err() && response.is_empty() {
338 return ProbeResult::Timeout;
339 }
340
341 if response.contains("\"git_paw\":true") || response.contains("\"git_paw\": true") {
342 ProbeResult::LiveBroker
343 } else if response.starts_with("HTTP/") {
344 ProbeResult::ForeignServer
345 } else {
346 ProbeResult::Timeout
347 }
348}
349
350pub fn start_broker(
363 config: &BrokerConfig,
364 state: BrokerState,
365 watch_targets: Vec<WatchTarget>,
366) -> Result<BrokerHandle, BrokerError> {
367 start_broker_with(config, state, watch_targets, None, 60)
368}
369
370pub fn start_broker_with(
383 config: &BrokerConfig,
384 state: BrokerState,
385 watch_targets: Vec<WatchTarget>,
386 conflict: Option<ConflictConfig>,
387 learnings_flush_interval_seconds: u64,
388) -> Result<BrokerHandle, BrokerError> {
389 let url = config.url();
390 let state = Arc::new(state);
391 let stop_flag = Arc::new(AtomicBool::new(false));
392
393 match probe_existing_broker(&url) {
394 ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
395 ProbeResult::ForeignServer => {
396 return Err(BrokerError::PortInUse {
397 port: config.port,
398 source: std::io::Error::new(
399 std::io::ErrorKind::AddrInUse,
400 "port occupied by non-broker process",
401 ),
402 });
403 }
404 ProbeResult::Timeout => {
405 return Err(BrokerError::ProbeTimeout { port: config.port });
406 }
407 ProbeResult::NoListener => {}
408 }
409
410 let flush_thread = if state.log_path.is_some() {
412 let s = Arc::clone(&state);
413 let f = Arc::clone(&stop_flag);
414 Some(std::thread::spawn(move || {
415 delivery::flush_loop(&s, &f);
416 }))
417 } else {
418 None
419 };
420
421 let learnings_thread = if state.learnings.is_some() {
425 let s = Arc::clone(&state);
426 let f = Arc::clone(&stop_flag);
427 Some(std::thread::spawn(move || {
428 learnings_flush_loop(&s, &f, learnings_flush_interval_seconds);
429 }))
430 } else {
431 None
432 };
433
434 let runtime = tokio::runtime::Builder::new_multi_thread()
435 .enable_all()
436 .build()
437 .map_err(BrokerError::RuntimeFailed)?;
438
439 let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
440 |e: std::net::AddrParseError| {
441 BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
442 },
443 )?;
444
445 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
446
447 let router = server::router(Arc::clone(&state));
448
449 let listener = runtime.block_on(async {
450 let socket = tokio::net::TcpSocket::new_v4().map_err(BrokerError::BindFailed)?;
451 socket
452 .set_reuseaddr(true)
453 .map_err(BrokerError::BindFailed)?;
454 socket.bind(addr).map_err(BrokerError::BindFailed)?;
455 socket.listen(1024).map_err(BrokerError::BindFailed)
456 })?;
457
458 runtime.spawn(async {
461 let _ = tokio::signal::ctrl_c().await;
462 });
463
464 runtime.spawn(async move {
465 axum::serve(listener, router)
466 .with_graceful_shutdown(async {
467 let _ = shutdown_rx.await;
468 })
469 .await
470 .ok();
471 });
472
473 {
479 let mut inner = state.write();
480 for target in &watch_targets {
481 inner
482 .agent_clis
483 .insert(target.agent_id.clone(), target.cli.clone());
484 inner.queues.entry(target.agent_id.clone()).or_default();
485 }
486 }
487
488 let (watcher_tx, watcher_rx) = tokio::sync::watch::channel(false);
493 for target in watch_targets {
494 let s = Arc::clone(&state);
495 let rx = watcher_rx.clone();
496 runtime.spawn(watcher::watch_worktree(s, target, rx));
497 }
498 if let Some(conflict_cfg) = conflict {
499 let s = Arc::clone(&state);
500 let rx = watcher_rx.clone();
501 runtime.spawn(conflict::run_detector_loop(s, conflict_cfg, rx));
502 }
503
504 Ok(BrokerHandle {
505 state,
506 runtime: Some(runtime),
507 shutdown_tx: Some(shutdown_tx),
508 watcher_shutdown: Some(watcher_tx),
509 url,
510 stop_flag,
511 flush_thread,
512 learnings_thread,
513 })
514}
515
516fn learnings_flush_loop(
522 state: &Arc<BrokerState>,
523 stop: &Arc<AtomicBool>,
524 flush_interval_seconds: u64,
525) {
526 let Some(aggregator) = state.learnings.clone() else {
527 return;
528 };
529 let interval = std::time::Duration::from_secs(flush_interval_seconds.max(1));
530 let tick = std::time::Duration::from_millis(100);
531
532 loop {
533 let mut elapsed = std::time::Duration::ZERO;
534 while elapsed < interval {
535 if stop.load(Ordering::Acquire) {
536 if let Ok(mut agg) = aggregator.lock() {
537 let _ = agg.flush_at_shutdown();
538 }
539 return;
540 }
541 std::thread::sleep(tick);
542 elapsed += tick;
543 }
544 if let Ok(mut agg) = aggregator.lock() {
545 let _ = agg.flush();
546 }
547 }
548}
549
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a loopback port the OS reported as free a moment ago.
    ///
    /// PID-derived ports let different tests land on the same port, so one
    /// would reattach to another's broker. An OS-assigned ephemeral port leaves
    /// only the tiny window between release and reuse.
    fn free_port() -> u16 {
        std::net::TcpListener::bind("127.0.0.1:0")
            .and_then(|listener| listener.local_addr())
            .map(|addr| addr.port())
            .expect("failed to reserve an ephemeral loopback port")
    }

    /// Broker config bound to loopback on `port`.
    fn loopback_config(port: u16) -> BrokerConfig {
        BrokerConfig {
            enabled: true,
            port,
            bind: "127.0.0.1".to_string(),
        }
    }

    /// Serves one canned `reply` on a fresh loopback port; returns its URL.
    fn serve_once(reply: &'static [u8]) -> String {
        use std::io::{Read, Write};
        let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind test listener");
        let port = listener.local_addr().expect("test listener addr").port();
        std::thread::spawn(move || {
            let Ok((mut sock, _)) = listener.accept() else {
                return;
            };
            // Drain the request so closing the socket sends FIN rather than RST.
            let mut seen = Vec::new();
            let mut buf = [0u8; 512];
            while !seen.windows(4).any(|w| w == b"\r\n\r\n") {
                match sock.read(&mut buf) {
                    Ok(0) | Err(_) => break,
                    Ok(n) => seen.extend_from_slice(&buf[..n]),
                }
            }
            let _ = sock.write_all(reply);
        });
        format!("http://127.0.0.1:{port}")
    }

    #[test]
    fn broker_state_new_is_empty() {
        let state = BrokerState::new(None);
        let inner = state.read();
        assert!(inner.agents.is_empty());
        assert!(inner.queues.is_empty());
        assert!(inner.message_log.is_empty());
    }

    #[test]
    fn next_seq_starts_at_one() {
        let state = BrokerState::new(None);
        assert_eq!(state.next_seq(), 1);
        assert_eq!(state.next_seq(), 2);
        assert_eq!(state.next_seq(), 3);
    }

    #[test]
    fn probe_no_listener() {
        let url = format!("http://127.0.0.1:{}", free_port());
        assert_eq!(probe_existing_broker(&url), ProbeResult::NoListener);
    }

    #[test]
    fn probe_detects_live_broker() {
        let url = serve_once(
            b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n{\"git_paw\":true}",
        );
        assert_eq!(probe_broker(&url), ProbeResult::LiveBroker);
    }

    #[test]
    fn probe_detects_foreign_server() {
        let url = serve_once(b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n");
        assert_eq!(probe_existing_broker(&url), ProbeResult::ForeignServer);
    }

    #[test]
    fn reattached_handle_has_no_runtime() {
        let state = Arc::new(BrokerState::new(None));
        let h = BrokerHandle::reattached("http://127.0.0.1:9119".into(), state);
        assert!(h.runtime.is_none());
        assert!(h.shutdown_tx.is_none());
        assert!(h.flush_thread.is_none());
    }

    #[test]
    fn start_broker_on_free_port() {
        let config = loopback_config(free_port());
        let state = BrokerState::new(None);
        // As before, a startup failure (e.g. the port was grabbed in between) is tolerated.
        if let Ok(h) = start_broker(&config, state, Vec::new()) {
            assert!(h.url.contains(&config.port.to_string()));
            drop(h);
        }
    }

    #[test]
    fn start_broker_no_log_path_no_flush_thread() {
        let config = loopback_config(free_port());
        let state = BrokerState::new(None);
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_none());
            drop(handle);
        }
    }

    #[test]
    fn start_broker_with_log_path_spawns_flush_thread() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("broker.log");
        let config = loopback_config(free_port());
        let state = BrokerState::new(Some(log_path));
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_some());
            drop(handle);
        }
    }
}