1pub mod delivery;
16pub mod messages;
17pub mod server;
18
19use std::collections::HashMap;
20use std::path::PathBuf;
21use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
22use std::sync::{Arc, RwLock};
23use std::thread::JoinHandle;
24use std::time::Instant;
25
26use serde::Serialize;
27
28use crate::config::BrokerConfig;
29pub use messages::BrokerMessage;
30
/// Registry entry the broker keeps for a single agent.
#[derive(Debug, Clone)]
pub struct AgentRecord {
    /// Identifier of the agent (presumably also its key in `BrokerStateInner::agents` — confirm).
    pub agent_id: String,
    /// Free-form status string; the set of possible values is not visible in this module.
    pub status: String,
    /// Monotonic timestamp of when the agent was last seen.
    pub last_seen: Instant,
    /// Most recent message associated with this agent, if any (presumably the last one it
    /// sent — confirm against the server handlers).
    pub last_message: Option<BrokerMessage>,
}
43
/// One agent's row in serialized status output.
///
/// Field order here is the order fields appear in the serialized form.
#[derive(Debug, Clone, Serialize)]
pub struct AgentStatusEntry {
    /// Identifier of the agent.
    pub agent_id: String,
    /// Name of the CLI associated with the agent (exact meaning not visible here — confirm).
    pub cli: String,
    /// Status string, as reported for the agent.
    pub status: String,
    /// Seconds since the agent was last seen; presumably derived from `last_seen` — confirm.
    pub last_seen_seconds: u64,
    /// Short human-readable summary; its source is not visible in this module.
    pub summary: String,
    // serde has no Serialize impl for `Instant`, so the raw timestamp is kept out of the
    // serialized form; `last_seen_seconds` is what gets emitted instead.
    #[serde(skip)]
    /// Monotonic timestamp of the last sighting, kept for in-process sorting/age computation.
    pub last_seen: Instant,
}
62
/// The lock-protected part of [`BrokerState`]; reach it through `BrokerState::read`/`write`.
#[derive(Debug)]
pub struct BrokerStateInner {
    /// Known agents, keyed by a string id (presumably the agent id — confirm).
    pub agents: HashMap<String, AgentRecord>,
    /// Pending messages grouped by a string key (presumably the recipient agent id), each
    /// tagged with a `u64` that is presumably the value from `BrokerState::next_seq`.
    pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
    /// Message history: (sequence number, wall-clock time, message).
    pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
}
73
74#[derive(Debug)]
81pub struct BrokerState {
82 inner: RwLock<BrokerStateInner>,
84 next_seq: AtomicU64,
86 pub log_path: Option<PathBuf>,
88}
89
90impl BrokerState {
91 pub fn new(log_path: Option<PathBuf>) -> Self {
93 Self {
94 inner: RwLock::new(BrokerStateInner {
95 agents: HashMap::new(),
96 queues: HashMap::new(),
97 message_log: Vec::new(),
98 }),
99 next_seq: AtomicU64::new(0),
100 log_path,
101 }
102 }
103
104 pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
110 self.inner.read().expect("broker state lock poisoned")
111 }
112
113 pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
119 self.inner.write().expect("broker state lock poisoned")
120 }
121
122 pub fn next_seq(&self) -> u64 {
124 self.next_seq.fetch_add(1, Ordering::Relaxed) + 1
125 }
126
127 pub fn uptime_seconds(&self) -> u64 {
132 0
136 }
137}
138
/// Errors that can occur while starting the broker.
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
    /// The configured port is held by some other process that is not a git-paw broker.
    #[error(
        "port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
    )]
    PortInUse {
        /// The configured broker port.
        port: u16,
        /// Underlying I/O error; thiserror exposes a field named `source` as the error source.
        source: std::io::Error,
    },

    /// The probe connected to the port but did not get a usable reply.
    #[error("broker probe timed out on port {port} — check for stuck processes on this port")]
    ProbeTimeout {
        /// The configured broker port.
        port: u16,
    },

    /// The bind address could not be parsed, or creating/binding/listening on the socket failed.
    #[error("failed to bind broker: {0}")]
    BindFailed(std::io::Error),

    /// Building the tokio runtime failed.
    #[error("failed to create broker runtime: {0}")]
    RuntimeFailed(std::io::Error),
}
168
/// Handle to a broker started (or reattached to) by this process.
///
/// Dropping it tears down whatever it owns: the server task, the tokio runtime and the
/// log-flush thread. A reattached handle owns none of these.
pub struct BrokerHandle {
    /// Shared broker state; for a handle that started the broker, this is the same `Arc`
    /// given to the HTTP router and the flush thread.
    pub state: Arc<BrokerState>,
    /// Runtime hosting the HTTP server; `None` for a reattached handle.
    runtime: Option<tokio::runtime::Runtime>,
    /// Completes the server's graceful-shutdown future when sent; `None` for a reattached handle.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Base URL of the broker, as returned by `BrokerConfig::url`.
    pub url: String,
    /// Set on drop to tell the flush thread to stop.
    stop_flag: Arc<AtomicBool>,
    /// Log-flush thread; present only when this handle started the broker with a `log_path`.
    flush_thread: Option<JoinHandle<()>>,
}
189
190impl BrokerHandle {
191 fn reattached(url: String, state: Arc<BrokerState>) -> Self {
193 Self {
194 state,
195 runtime: None,
196 shutdown_tx: None,
197 url,
198 stop_flag: Arc::new(AtomicBool::new(false)),
199 flush_thread: None,
200 }
201 }
202}
203
204impl Drop for BrokerHandle {
205 fn drop(&mut self) {
206 self.stop_flag.store(true, Ordering::Release);
208 if let Some(handle) = self.flush_thread.take() {
209 let _ = handle.join();
210 }
211 if let Some(tx) = self.shutdown_tx.take() {
213 let _ = tx.send(());
214 }
215 if let Some(rt) = self.runtime.take() {
217 rt.shutdown_timeout(std::time::Duration::from_secs(2));
218 }
219 }
220}
221
/// What a probe of the broker URL found.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProbeResult {
    /// The address did not resolve, or nothing accepted a TCP connection within the timeout.
    NoListener,
    /// A server replied to `GET /status` with a body containing `"git_paw":true`
    /// (with or without a space after the colon).
    LiveBroker,
    /// Something accepted the connection and sent bytes, but it is not a git-paw broker.
    ForeignServer,
    /// A connection was accepted, but the request could not be written or nothing came back.
    Timeout,
}

/// Probes `url` for a running git-paw broker.
///
/// This is the same check `start_broker` performs before binding.
pub fn probe_broker(url: &str) -> ProbeResult {
    probe_existing_broker(url)
}

/// Connects to `url` (`http://host:port[/...]` or bare `host:port`), sends `GET /status`, and
/// classifies the reply. Every network step is bounded by a 500 ms timeout.
fn probe_existing_broker(url: &str) -> ProbeResult {
    use std::io::{Read, Write};
    use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
    use std::time::Duration;

    const PROBE_TIMEOUT: Duration = Duration::from_millis(500);

    // Only the authority matters for the connection. Drop the scheme and any trailing path.
    let without_scheme = url.strip_prefix("http://").unwrap_or(url);
    let addr = without_scheme
        .split_once('/')
        .map_or(without_scheme, |(authority, _)| authority);

    // Fast path for IP literals; otherwise fall back to name resolution and take the first result.
    let socket_addr: SocketAddr = match addr.parse() {
        Ok(a) => a,
        Err(_) => match addr.to_socket_addrs().ok().and_then(|mut it| it.next()) {
            Some(a) => a,
            None => return ProbeResult::NoListener,
        },
    };

    let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, PROBE_TIMEOUT) else {
        return ProbeResult::NoListener;
    };

    // Best effort: if the timeouts cannot be set, the probe still works, just unbounded.
    let _ = stream.set_read_timeout(Some(PROBE_TIMEOUT));
    let _ = stream.set_write_timeout(Some(PROBE_TIMEOUT));

    let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
    if stream.write_all(request.as_bytes()).is_err() {
        return ProbeResult::Timeout;
    }

    // Read raw bytes. `read_to_string` would discard everything on invalid UTF-8, making a
    // binary-speaking foreign server look like a timeout. A read error (for example the read
    // timeout, against a server that ignores `Connection: close`) still leaves whatever
    // arrived in `raw`, so the error itself is not interesting.
    let mut raw = Vec::new();
    let _ = stream.read_to_end(&mut raw);
    if raw.is_empty() {
        return ProbeResult::Timeout;
    }
    let response = String::from_utf8_lossy(&raw);

    if response.contains("\"git_paw\":true") || response.contains("\"git_paw\": true") {
        ProbeResult::LiveBroker
    } else {
        // HTTP or not: something that is not our broker answered, so the port is taken.
        ProbeResult::ForeignServer
    }
}
298
299pub fn start_broker(
309 config: &BrokerConfig,
310 state: BrokerState,
311) -> Result<BrokerHandle, BrokerError> {
312 let url = config.url();
313 let state = Arc::new(state);
314 let stop_flag = Arc::new(AtomicBool::new(false));
315
316 match probe_existing_broker(&url) {
317 ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
318 ProbeResult::ForeignServer => {
319 return Err(BrokerError::PortInUse {
320 port: config.port,
321 source: std::io::Error::new(
322 std::io::ErrorKind::AddrInUse,
323 "port occupied by non-broker process",
324 ),
325 });
326 }
327 ProbeResult::Timeout => {
328 return Err(BrokerError::ProbeTimeout { port: config.port });
329 }
330 ProbeResult::NoListener => {}
331 }
332
333 let flush_thread = if state.log_path.is_some() {
335 let s = Arc::clone(&state);
336 let f = Arc::clone(&stop_flag);
337 Some(std::thread::spawn(move || {
338 delivery::flush_loop(&s, &f);
339 }))
340 } else {
341 None
342 };
343
344 let runtime = tokio::runtime::Builder::new_multi_thread()
345 .enable_all()
346 .build()
347 .map_err(BrokerError::RuntimeFailed)?;
348
349 let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
350 |e: std::net::AddrParseError| {
351 BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
352 },
353 )?;
354
355 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
356
357 let router = server::router(Arc::clone(&state));
358
359 let listener = runtime.block_on(async {
360 let socket = tokio::net::TcpSocket::new_v4().map_err(BrokerError::BindFailed)?;
361 socket
362 .set_reuseaddr(true)
363 .map_err(BrokerError::BindFailed)?;
364 socket.bind(addr).map_err(BrokerError::BindFailed)?;
365 socket.listen(1024).map_err(BrokerError::BindFailed)
366 })?;
367
368 runtime.spawn(async {
371 let _ = tokio::signal::ctrl_c().await;
372 });
373
374 runtime.spawn(async move {
375 axum::serve(listener, router)
376 .with_graceful_shutdown(async {
377 let _ = shutdown_rx.await;
378 })
379 .await
380 .ok();
381 });
382
383 Ok(BrokerHandle {
384 state,
385 runtime: Some(runtime),
386 shutdown_tx: Some(shutdown_tx),
387 url,
388 stop_flag,
389 flush_thread,
390 })
391}
392
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};
    use std::net::TcpListener;

    /// Returns a loopback port the OS reports as free right now.
    ///
    /// Ports come from the OS rather than from the pid, so concurrently running tests do not
    /// fight over the same port. The probe listener is dropped before returning, so reuse by
    /// another process is still possible in principle.
    fn free_port() -> u16 {
        TcpListener::bind("127.0.0.1:0")
            .and_then(|listener| listener.local_addr())
            .expect("OS should hand out an ephemeral loopback port")
            .port()
    }

    /// Broker config bound to loopback on `port`.
    fn loopback_config(port: u16) -> BrokerConfig {
        BrokerConfig {
            enabled: true,
            port,
            bind: "127.0.0.1".to_string(),
        }
    }

    /// Accepts one connection on an ephemeral loopback port, consumes the request headers,
    /// writes `reply` and closes. Returns the server's `http://` URL and its thread.
    fn serve_once(reply: &'static [u8]) -> (String, std::thread::JoinHandle<()>) {
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
        let url = format!("http://{}", listener.local_addr().expect("local addr"));
        let server = std::thread::spawn(move || {
            if let Ok((mut conn, _)) = listener.accept() {
                // Drain the request first. Closing with unread input could send an RST
                // that races the reply.
                let mut request = Vec::new();
                let mut chunk = [0u8; 256];
                while !request.windows(4).any(|w| w == b"\r\n\r\n") {
                    match conn.read(&mut chunk) {
                        Ok(0) | Err(_) => break,
                        Ok(n) => request.extend_from_slice(&chunk[..n]),
                    }
                }
                let _ = conn.write_all(reply);
            }
        });
        (url, server)
    }

    #[test]
    fn broker_state_new_is_empty() {
        let state = BrokerState::new(None);
        let inner = state.read();
        assert!(inner.agents.is_empty());
        assert!(inner.queues.is_empty());
        assert!(inner.message_log.is_empty());
    }

    #[test]
    fn next_seq_starts_at_one() {
        let state = BrokerState::new(None);
        assert_eq!(state.next_seq(), 1);
        assert_eq!(state.next_seq(), 2);
        assert_eq!(state.next_seq(), 3);
    }

    #[test]
    fn probe_no_listener() {
        let url = format!("http://127.0.0.1:{}", free_port());
        assert_eq!(probe_existing_broker(&url), ProbeResult::NoListener);
    }

    #[test]
    fn probe_recognises_live_broker() {
        let (url, server) = serve_once(b"HTTP/1.1 200 OK\r\n\r\n{\"git_paw\":true}");
        assert_eq!(probe_existing_broker(&url), ProbeResult::LiveBroker);
        server.join().expect("test server thread");
    }

    #[test]
    fn probe_flags_foreign_http_server() {
        let (url, server) = serve_once(b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n");
        assert_eq!(probe_existing_broker(&url), ProbeResult::ForeignServer);
        server.join().expect("test server thread");
    }

    #[test]
    fn reattached_handle_has_no_runtime() {
        let state = Arc::new(BrokerState::new(None));
        let h = BrokerHandle::reattached("http://127.0.0.1:9119".into(), state);
        assert!(h.runtime.is_none());
        assert!(h.shutdown_tx.is_none());
        assert!(h.flush_thread.is_none());
    }

    #[test]
    fn start_broker_on_free_port() {
        let config = loopback_config(free_port());
        let handle = start_broker(&config, BrokerState::new(None))
            .expect("broker should start on a free port");
        assert!(handle.url.contains(&config.port.to_string()));
        assert!(handle.runtime.is_some());
        drop(handle);
    }

    #[test]
    fn start_broker_no_log_path_no_flush_thread() {
        let config = loopback_config(free_port());
        let handle = start_broker(&config, BrokerState::new(None))
            .expect("broker should start on a free port");
        assert!(handle.flush_thread.is_none());
        drop(handle);
    }

    #[test]
    fn start_broker_with_log_path_spawns_flush_thread() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("broker.log");
        let config = loopback_config(free_port());
        let handle = start_broker(&config, BrokerState::new(Some(log_path)))
            .expect("broker should start on a free port");
        assert!(handle.flush_thread.is_some());
        drop(handle);
    }
}