1pub mod delivery;
16pub mod messages;
17pub mod publish;
18pub mod server;
19pub mod watcher;
20
21use std::collections::HashMap;
22use std::path::PathBuf;
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24use std::sync::{Arc, RwLock};
25use std::thread::JoinHandle;
26use std::time::Instant;
27
28use serde::Serialize;
29
30use crate::config::BrokerConfig;
31pub use messages::BrokerMessage;
32
/// A worktree that the broker should watch on behalf of one agent.
///
/// [`start_broker`] records each target's `agent_id`/`cli` pair in the shared
/// state, makes sure the agent has a queue, and spawns one
/// `watcher::watch_worktree` task per target.
#[derive(Debug, Clone)]
pub struct WatchTarget {
    /// Agent identifier; used as the key into the broker's `agent_clis` and
    /// `queues` maps.
    pub agent_id: String,
    /// CLI name recorded for this agent in `agent_clis`.
    pub cli: String,
    /// Path of the worktree; handed to the watcher task together with the
    /// rest of the target.
    pub worktree_path: PathBuf,
}
45
/// Per-agent bookkeeping stored in `BrokerStateInner::agents`.
#[derive(Debug, Clone)]
pub struct AgentRecord {
    /// Identifier of the agent this record describes.
    pub agent_id: String,
    /// Status string for the agent; the set of valid values is not defined in
    /// this module.
    pub status: String,
    /// Monotonic timestamp.
    /// NOTE(review): presumably the last time the agent was heard from —
    /// confirm against the code that updates it.
    pub last_seen: Instant,
    /// A message associated with the agent, if any.
    /// NOTE(review): presumably the most recent one it published — confirm.
    pub last_message: Option<BrokerMessage>,
}
58
/// Serializable per-agent status row.
///
/// Derives `Serialize`; every field except `last_seen` is emitted.
#[derive(Debug, Clone, Serialize)]
pub struct AgentStatusEntry {
    /// Identifier of the agent.
    pub agent_id: String,
    /// CLI name associated with the agent.
    pub cli: String,
    /// Status string for the agent.
    pub status: String,
    /// NOTE(review): presumably whole seconds elapsed since `last_seen` —
    /// confirm against the code that builds this entry.
    pub last_seen_seconds: u64,
    /// Short summary text; where it comes from is not visible in this module.
    pub summary: String,
    /// Monotonic timestamp kept for in-process use only; `#[serde(skip)]`
    /// leaves it out of the serialized output.
    #[serde(skip)]
    pub last_seen: Instant,
}
77
/// The mutable broker data that lives behind the lock in [`BrokerState`].
#[derive(Debug)]
pub struct BrokerStateInner {
    /// Agent records keyed by string.
    /// NOTE(review): presumably keyed by agent id — confirm against writers.
    pub agents: HashMap<String, AgentRecord>,
    /// Agent id → CLI name; seeded from the watch targets in [`start_broker`].
    pub agent_clis: HashMap<String, String>,
    /// Per-agent message queues keyed by agent id; [`start_broker`] creates an
    /// empty queue for every watch target.
    /// NOTE(review): the `u64` is presumably a sequence number from
    /// `BrokerState::next_seq` — confirm.
    pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
    /// Log of messages, each with a `u64` tag and a wall-clock timestamp.
    /// NOTE(review): the `u64` is presumably the message's sequence number —
    /// confirm.
    pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
}
90
/// Shared broker state: lock-protected data plus a few lock-free fields.
///
/// [`start_broker`] wraps it in an `Arc` and shares it with the HTTP router,
/// the watcher tasks and (when `log_path` is set) the flush thread.
#[derive(Debug)]
pub struct BrokerState {
    /// Lock-protected mutable data; reached through `read()` / `write()`.
    inner: RwLock<BrokerStateInner>,
    /// Counter backing `next_seq()`; starts at 0 so the first value handed
    /// out is 1.
    next_seq: AtomicU64,
    /// Optional log file path. When it is `Some`, [`start_broker`] spawns a
    /// background thread running `delivery::flush_loop`.
    pub log_path: Option<PathBuf>,
    /// Moment this state was constructed; basis for `uptime_seconds()`.
    started_at: Instant,
}
108
109impl BrokerState {
110 pub fn new(log_path: Option<PathBuf>) -> Self {
112 Self {
113 inner: RwLock::new(BrokerStateInner {
114 agents: HashMap::new(),
115 agent_clis: HashMap::new(),
116 queues: HashMap::new(),
117 message_log: Vec::new(),
118 }),
119 next_seq: AtomicU64::new(0),
120 log_path,
121 started_at: Instant::now(),
122 }
123 }
124
125 pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
131 self.inner.read().expect("broker state lock poisoned")
132 }
133
134 pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
140 self.inner.write().expect("broker state lock poisoned")
141 }
142
143 pub fn next_seq(&self) -> u64 {
145 self.next_seq.fetch_add(1, Ordering::Relaxed) + 1
146 }
147
148 pub fn uptime_seconds(&self) -> u64 {
152 self.started_at.elapsed().as_secs()
153 }
154}
155
/// Errors that [`start_broker`] can return.
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
    /// The probe found an HTTP server on the configured port that did not
    /// identify itself as a broker.
    #[error(
        "port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
    )]
    PortInUse {
        /// The configured broker port.
        port: u16,
        /// Underlying I/O error; the field name `source` makes `thiserror`
        /// expose it through `Error::source()`.
        source: std::io::Error,
    },

    /// The probe connected to the port but did not obtain a recognisable
    /// HTTP response.
    #[error("broker probe timed out on port {port} — check for stuck processes on this port")]
    ProbeTimeout {
        /// The configured broker port.
        port: u16,
    },

    /// The bind address could not be parsed, or creating, configuring,
    /// binding or listening on the socket failed.
    #[error("failed to bind broker: {0}")]
    BindFailed(std::io::Error),

    /// The tokio runtime could not be built.
    #[error("failed to create broker runtime: {0}")]
    RuntimeFailed(std::io::Error),
}
185
/// Owner of a running (or re-attached) broker.
///
/// Dropping the handle stops the flush thread, signals the watcher tasks and
/// the HTTP server to shut down, and then shuts the runtime down. A handle
/// built by `BrokerHandle::reattached` owns none of those resources, so
/// dropping it only sets the stop flag.
pub struct BrokerHandle {
    /// Shared broker state.
    pub state: Arc<BrokerState>,
    /// Tokio runtime hosting the server and watcher tasks; `None` for a
    /// re-attached handle.
    runtime: Option<tokio::runtime::Runtime>,
    /// Sender that triggers the HTTP server's graceful shutdown.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Watch channel whose value is set to `true` on drop to tell watcher
    /// tasks to stop.
    watcher_shutdown: Option<tokio::sync::watch::Sender<bool>>,
    /// Broker URL as produced by `BrokerConfig::url()`.
    pub url: String,
    /// Flag shared with the flush thread; set to `true` on drop.
    stop_flag: Arc<AtomicBool>,
    /// Join handle of the background flush thread, when one was spawned.
    flush_thread: Option<JoinHandle<()>>,
}
208
209impl BrokerHandle {
210 fn reattached(url: String, state: Arc<BrokerState>) -> Self {
212 Self {
213 state,
214 runtime: None,
215 shutdown_tx: None,
216 watcher_shutdown: None,
217 url,
218 stop_flag: Arc::new(AtomicBool::new(false)),
219 flush_thread: None,
220 }
221 }
222}
223
224impl Drop for BrokerHandle {
225 fn drop(&mut self) {
226 self.stop_flag.store(true, Ordering::Release);
228 if let Some(handle) = self.flush_thread.take() {
229 let _ = handle.join();
230 }
231 if let Some(tx) = self.watcher_shutdown.take() {
233 let _ = tx.send(true);
234 }
235 if let Some(tx) = self.shutdown_tx.take() {
237 let _ = tx.send(());
238 }
239 if let Some(rt) = self.runtime.take() {
241 rt.shutdown_timeout(std::time::Duration::from_secs(2));
242 }
243 }
244}
245
/// Outcome of probing a URL for an already-running broker.
#[derive(Debug, PartialEq, Eq)]
pub enum ProbeResult {
    /// The address could not be resolved or nothing accepted the connection.
    NoListener,
    /// The peer answered with a payload carrying the `"git_paw":true` marker.
    LiveBroker,
    /// The peer answered with an HTTP response that lacks the broker marker.
    ForeignServer,
    /// A connection was made, but the request could not be written or no
    /// recognisable HTTP response came back.
    Timeout,
}

/// Public entry point for probing `url`; see [`ProbeResult`] for the possible
/// outcomes.
pub fn probe_broker(url: &str) -> ProbeResult {
    probe_existing_broker(url)
}

/// Sends `GET /status` to `url` over a plain TCP connection and classifies
/// the peer.
///
/// `url` may be `http://host:port` or a bare `host:port`; anything after the
/// authority (a path, query or fragment, e.g. a trailing `/`) is ignored when
/// working out where to connect.
///
/// The read is bounded in both size and total time, so a peer that keeps
/// streaming data cannot stall the caller. The response is decoded lossily,
/// which means an HTTP server with a non-UTF-8 body is still recognised as a
/// foreign server.
fn probe_existing_broker(url: &str) -> ProbeResult {
    use std::io::{ErrorKind, Read, Write};
    use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
    use std::time::{Duration, Instant};

    /// Timeout for connecting and for each individual read/write.
    const IO_TIMEOUT: Duration = Duration::from_millis(500);
    /// Upper bound on the total time spent reading the response.
    const READ_BUDGET: Duration = Duration::from_secs(2);
    /// Upper bound on how much of the response is kept; the status payload
    /// is tiny, so this is generous.
    const MAX_RESPONSE_BYTES: usize = 64 * 1024;

    let after_scheme = url.strip_prefix("http://").unwrap_or(url);
    // Keep only `host:port`; a trailing path would otherwise make the address
    // unparseable and a live broker would be reported as `NoListener`.
    let addr = after_scheme
        .split(['/', '?', '#'])
        .next()
        .unwrap_or(after_scheme);

    // Try a literal socket address first, then fall back to name resolution.
    let resolved = addr
        .parse::<SocketAddr>()
        .ok()
        .or_else(|| addr.to_socket_addrs().ok()?.next());
    let Some(socket_addr) = resolved else {
        return ProbeResult::NoListener;
    };

    let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, IO_TIMEOUT) else {
        return ProbeResult::NoListener;
    };

    // Best effort: setting a timeout only fails for a zero duration.
    stream.set_read_timeout(Some(IO_TIMEOUT)).ok();
    stream.set_write_timeout(Some(IO_TIMEOUT)).ok();

    let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
    if stream.write_all(request.as_bytes()).is_err() {
        return ProbeResult::Timeout;
    }

    // Read until EOF, an I/O error (including a read timeout), the size cap,
    // or the overall deadline — whichever comes first. Whatever arrived
    // before an error is still classified below.
    let mut raw = Vec::new();
    let mut chunk = [0u8; 4096];
    let deadline = Instant::now() + READ_BUDGET;
    while raw.len() < MAX_RESPONSE_BYTES && Instant::now() < deadline {
        match stream.read(&mut chunk) {
            Ok(0) => break,
            Ok(n) => raw.extend_from_slice(&chunk[..n]),
            Err(e) if e.kind() == ErrorKind::Interrupted => {}
            Err(_) => break,
        }
    }

    let response = String::from_utf8_lossy(&raw);
    if response.contains("\"git_paw\":true") || response.contains("\"git_paw\": true") {
        ProbeResult::LiveBroker
    } else if response.starts_with("HTTP/") {
        ProbeResult::ForeignServer
    } else {
        // Empty or non-HTTP data: something is listening but not answering.
        ProbeResult::Timeout
    }
}
322
323pub fn start_broker(
333 config: &BrokerConfig,
334 state: BrokerState,
335 watch_targets: Vec<WatchTarget>,
336) -> Result<BrokerHandle, BrokerError> {
337 let url = config.url();
338 let state = Arc::new(state);
339 let stop_flag = Arc::new(AtomicBool::new(false));
340
341 match probe_existing_broker(&url) {
342 ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
343 ProbeResult::ForeignServer => {
344 return Err(BrokerError::PortInUse {
345 port: config.port,
346 source: std::io::Error::new(
347 std::io::ErrorKind::AddrInUse,
348 "port occupied by non-broker process",
349 ),
350 });
351 }
352 ProbeResult::Timeout => {
353 return Err(BrokerError::ProbeTimeout { port: config.port });
354 }
355 ProbeResult::NoListener => {}
356 }
357
358 let flush_thread = if state.log_path.is_some() {
360 let s = Arc::clone(&state);
361 let f = Arc::clone(&stop_flag);
362 Some(std::thread::spawn(move || {
363 delivery::flush_loop(&s, &f);
364 }))
365 } else {
366 None
367 };
368
369 let runtime = tokio::runtime::Builder::new_multi_thread()
370 .enable_all()
371 .build()
372 .map_err(BrokerError::RuntimeFailed)?;
373
374 let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
375 |e: std::net::AddrParseError| {
376 BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
377 },
378 )?;
379
380 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
381
382 let router = server::router(Arc::clone(&state));
383
384 let listener = runtime.block_on(async {
385 let socket = tokio::net::TcpSocket::new_v4().map_err(BrokerError::BindFailed)?;
386 socket
387 .set_reuseaddr(true)
388 .map_err(BrokerError::BindFailed)?;
389 socket.bind(addr).map_err(BrokerError::BindFailed)?;
390 socket.listen(1024).map_err(BrokerError::BindFailed)
391 })?;
392
393 runtime.spawn(async {
396 let _ = tokio::signal::ctrl_c().await;
397 });
398
399 runtime.spawn(async move {
400 axum::serve(listener, router)
401 .with_graceful_shutdown(async {
402 let _ = shutdown_rx.await;
403 })
404 .await
405 .ok();
406 });
407
408 {
414 let mut inner = state.write();
415 for target in &watch_targets {
416 inner
417 .agent_clis
418 .insert(target.agent_id.clone(), target.cli.clone());
419 inner.queues.entry(target.agent_id.clone()).or_default();
420 }
421 }
422
423 let (watcher_tx, watcher_rx) = tokio::sync::watch::channel(false);
427 for target in watch_targets {
428 let s = Arc::clone(&state);
429 let rx = watcher_rx.clone();
430 runtime.spawn(watcher::watch_worktree(s, target, rx));
431 }
432
433 Ok(BrokerHandle {
434 state,
435 runtime: Some(runtime),
436 shutdown_tx: Some(shutdown_tx),
437 watcher_shutdown: Some(watcher_tx),
438 url,
439 stop_flag,
440 flush_thread,
441 })
442}
443
#[cfg(test)]
mod tests {
    use super::*;

    /// Picks a port inside the 100-port block starting at `base`, offset by
    /// the process id so that concurrent test *processes* rarely collide.
    ///
    /// Every test that binds a port uses its own block (19_000, 19_100,
    /// 19_200), and `probe_no_listener` uses 19_999, so tests running in
    /// parallel inside one process never share a port. With overlapping
    /// ranges one test could re-attach to another test's broker and then
    /// fail its flush-thread assertion.
    fn port_in_block(base: u16) -> u16 {
        let offset =
            u16::try_from(std::process::id() % 100).expect("a value below 100 fits in u16");
        base + offset
    }

    #[test]
    fn broker_state_new_is_empty() {
        let state = BrokerState::new(None);
        let inner = state.read();
        assert!(inner.agents.is_empty());
        assert!(inner.queues.is_empty());
        assert!(inner.message_log.is_empty());
    }

    #[test]
    fn next_seq_starts_at_one() {
        let state = BrokerState::new(None);
        assert_eq!(state.next_seq(), 1);
        assert_eq!(state.next_seq(), 2);
        assert_eq!(state.next_seq(), 3);
    }

    #[test]
    fn probe_no_listener() {
        // 19_999 lies outside every block handed out by `port_in_block`.
        let result = probe_existing_broker("http://127.0.0.1:19999");
        assert_eq!(result, ProbeResult::NoListener);
    }

    #[test]
    fn reattached_handle_has_no_runtime() {
        let state = Arc::new(BrokerState::new(None));
        let h = BrokerHandle::reattached("http://127.0.0.1:9119".into(), state);
        assert!(h.runtime.is_none());
        assert!(h.shutdown_tx.is_none());
        assert!(h.flush_thread.is_none());
    }

    #[test]
    fn start_broker_on_free_port() {
        let config = BrokerConfig {
            enabled: true,
            port: port_in_block(19_000),
            bind: "127.0.0.1".to_string(),
        };
        let state = BrokerState::new(None);
        let handle = start_broker(&config, state, Vec::new());
        // Best effort: the port may be taken by an unrelated process, in
        // which case there is nothing to assert.
        if let Ok(h) = handle {
            assert!(h.url.contains(&config.port.to_string()));
            drop(h);
        }
    }

    #[test]
    fn start_broker_no_log_path_no_flush_thread() {
        let config = BrokerConfig {
            enabled: true,
            port: port_in_block(19_100),
            bind: "127.0.0.1".to_string(),
        };
        let state = BrokerState::new(None);
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_none());
            drop(handle);
        }
    }

    #[test]
    fn start_broker_with_log_path_spawns_flush_thread() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("broker.log");
        let config = BrokerConfig {
            enabled: true,
            port: port_in_block(19_200),
            bind: "127.0.0.1".to_string(),
        };
        let state = BrokerState::new(Some(log_path));
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_some());
            drop(handle);
        }
    }
}