pub mod delivery;
pub mod messages;
pub mod server;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use std::time::Instant;
use serde::Serialize;
use crate::config::BrokerConfig;
pub use messages::BrokerMessage;
/// Broker-side record for a single agent; values of the `agents` map in
/// [`BrokerStateInner`].
#[derive(Debug, Clone)]
pub struct AgentRecord {
/// Identifier of the agent this record describes.
pub agent_id: String,
/// Status label for the agent. This type puts no constraint on its contents.
pub status: String,
/// Monotonic timestamp for the agent.
/// NOTE(review): presumably refreshed whenever the agent contacts the broker;
/// the update sites are outside this file — confirm.
pub last_seen: Instant,
/// Most recent message associated with the agent, if there is one.
pub last_message: Option<BrokerMessage>,
}
/// Serializable per-agent status row.
///
/// NOTE(review): where these rows are built and served is outside this file
/// (presumably the status endpoint) — confirm against the producer.
#[derive(Debug, Clone, Serialize)]
pub struct AgentStatusEntry {
/// Identifier of the agent.
pub agent_id: String,
/// NOTE(review): presumably the name of the CLI the agent runs — confirm.
pub cli: String,
/// Status label for the agent.
pub status: String,
/// NOTE(review): presumably whole seconds since the agent was last seen — confirm.
pub last_seen_seconds: u64,
/// Summary text for the agent.
pub summary: String,
/// Raw last-seen instant; left out of the serialized output by `#[serde(skip)]`.
#[serde(skip)]
pub last_seen: Instant,
}
/// The mutable part of the broker state, guarded by the `RwLock` inside
/// [`BrokerState`].
#[derive(Debug)]
pub struct BrokerStateInner {
/// Agent records keyed by a string.
/// NOTE(review): presumably the key is the agent id — confirm against the writers.
pub agents: HashMap<String, AgentRecord>,
/// Per-key message queues; each entry pairs a `u64` with the message.
/// NOTE(review): the `u64` is presumably a sequence number from
/// `BrokerState::next_seq` — confirm.
pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
/// Log of messages, each stored with a `u64` and a wall-clock timestamp.
/// NOTE(review): presumably what the flush thread writes to `log_path` — confirm.
pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
}
#[derive(Debug)]
pub struct BrokerState {
inner: RwLock<BrokerStateInner>,
next_seq: AtomicU64,
pub log_path: Option<PathBuf>,
}
impl BrokerState {
pub fn new(log_path: Option<PathBuf>) -> Self {
Self {
inner: RwLock::new(BrokerStateInner {
agents: HashMap::new(),
queues: HashMap::new(),
message_log: Vec::new(),
}),
next_seq: AtomicU64::new(0),
log_path,
}
}
pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
self.inner.read().expect("broker state lock poisoned")
}
pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
self.inner.write().expect("broker state lock poisoned")
}
pub fn next_seq(&self) -> u64 {
self.next_seq.fetch_add(1, Ordering::Relaxed) + 1
}
pub fn uptime_seconds(&self) -> u64 {
0
}
}
/// Errors returned by [`start_broker`].
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
/// The probe reached an HTTP server on the configured port that did not
/// identify itself as a git-paw broker.
#[error(
"port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
)]
PortInUse {
/// Port taken from the broker configuration.
port: u16,
/// Underlying I/O error; thiserror exposes a field named `source` as the
/// error's source.
source: std::io::Error,
},
/// Something accepted the connection on the configured port but the probe
/// could not classify it (write failed, nothing readable, or a non-HTTP reply).
#[error("broker probe timed out on port {port} — check for stuck processes on this port")]
ProbeTimeout {
/// Port taken from the broker configuration.
port: u16,
},
/// The bind address could not be parsed, or creating, configuring, binding,
/// or listening on the socket failed.
#[error("failed to bind broker: {0}")]
BindFailed(std::io::Error),
/// The tokio runtime for the server could not be built.
#[error("failed to create broker runtime: {0}")]
RuntimeFailed(std::io::Error),
}
/// Handle returned by [`start_broker`]. Dropping it stops whatever this handle
/// started (flush thread, HTTP server, runtime).
pub struct BrokerHandle {
/// Shared broker state.
pub state: Arc<BrokerState>,
/// Runtime that drives the HTTP server; `None` when the handle reattached to
/// a broker that was already running.
runtime: Option<tokio::runtime::Runtime>,
/// Sender that triggers the server's graceful shutdown; `None` for a
/// reattached handle.
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
/// Broker URL as produced by `BrokerConfig::url`.
pub url: String,
/// Set to `true` when the handle is dropped; a clone is handed to the flush
/// thread when one is spawned.
stop_flag: Arc<AtomicBool>,
/// Join handle of the log flush thread; `Some` only when this handle started
/// the broker and the state has a `log_path`.
flush_thread: Option<JoinHandle<()>>,
}
impl BrokerHandle {
    /// Builds a handle for a broker that is already being served elsewhere.
    ///
    /// Such a handle owns no runtime, no shutdown channel, and no flush
    /// thread, so dropping it only flips its own (unshared) stop flag.
    fn reattached(url: String, state: Arc<BrokerState>) -> Self {
        // Fresh flag that nothing else observes: no flush thread was spawned.
        let stop_flag = Arc::new(AtomicBool::new(false));
        Self {
            url,
            state,
            stop_flag,
            runtime: None,
            shutdown_tx: None,
            flush_thread: None,
        }
    }
}
impl Drop for BrokerHandle {
    /// Tears down everything this handle started, in a fixed order:
    /// raise the stop flag, join the flush thread, request graceful server
    /// shutdown, then give the runtime up to two seconds to wind down.
    ///
    /// Every step is best-effort: a panicked flush thread or an already
    /// closed shutdown channel is ignored.
    fn drop(&mut self) {
        let Self {
            stop_flag,
            flush_thread,
            shutdown_tx,
            runtime,
            ..
        } = self;

        // NOTE(review): the flush loop is expected to poll this flag and exit;
        // its implementation lives in `delivery` — confirm.
        stop_flag.store(true, Ordering::Release);

        if let Some(flusher) = flush_thread.take() {
            flusher.join().ok();
        }

        if let Some(shutdown) = shutdown_tx.take() {
            shutdown.send(()).ok();
        }

        if let Some(server_runtime) = runtime.take() {
            let grace = std::time::Duration::from_secs(2);
            server_runtime.shutdown_timeout(grace);
        }
    }
}
/// Outcome of probing a broker URL with a `GET /status` request.
#[derive(Debug, PartialEq, Eq)]
pub enum ProbeResult {
/// The address could not be resolved or the TCP connection failed.
NoListener,
/// The response contained the `"git_paw": true` marker.
LiveBroker,
/// The response started with `HTTP/` but carried no git-paw marker.
ForeignServer,
/// The connection was accepted but the request could not be written, nothing
/// could be read, or the reply was not HTTP.
Timeout,
}
/// Probes `url` and classifies whatever is listening there.
///
/// Public entry point; delegates directly to `probe_existing_broker`.
pub fn probe_broker(url: &str) -> ProbeResult {
probe_existing_broker(url)
}
/// Sends `GET /status` to `url` and classifies the reply.
///
/// `url` may be given with or without a leading `http://`; the remainder must
/// be a `host:port` pair. Connect, read, and write each use a 500 ms timeout.
///
/// Returns:
/// - [`ProbeResult::NoListener`] if the address cannot be resolved or the
///   connection fails,
/// - [`ProbeResult::LiveBroker`] if the reply contains the `"git_paw": true`
///   marker (with or without the space),
/// - [`ProbeResult::ForeignServer`] if the reply starts with `HTTP/` but has
///   no marker,
/// - [`ProbeResult::Timeout`] otherwise (write failure, empty read, non-HTTP).
fn probe_existing_broker(url: &str) -> ProbeResult {
    use std::io::{Read, Write};
    use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
    use std::time::Duration;

    const IO_TIMEOUT: Duration = Duration::from_millis(500);
    const BROKER_MARKERS: [&str; 2] = ["\"git_paw\":true", "\"git_paw\": true"];

    let addr = url.strip_prefix("http://").unwrap_or(url);

    // Try a literal `ip:port` first, then fall back to name resolution and
    // take the first address it yields.
    let resolved: Option<SocketAddr> = addr.parse().ok().or_else(|| {
        addr.to_socket_addrs()
            .ok()
            .and_then(|mut candidates| candidates.next())
    });
    let Some(target) = resolved else {
        return ProbeResult::NoListener;
    };

    let mut conn = match TcpStream::connect_timeout(&target, IO_TIMEOUT) {
        Ok(conn) => conn,
        Err(_) => return ProbeResult::NoListener,
    };
    // Failing to set a timeout is not fatal for the probe.
    let _ = conn.set_read_timeout(Some(IO_TIMEOUT));
    let _ = conn.set_write_timeout(Some(IO_TIMEOUT));

    let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
    if conn.write_all(request.as_bytes()).is_err() {
        return ProbeResult::Timeout;
    }

    // A read error is tolerated as long as some data arrived before it.
    let mut reply = String::new();
    let read_failed = conn.read_to_string(&mut reply).is_err();
    if read_failed && reply.is_empty() {
        return ProbeResult::Timeout;
    }

    let is_broker = BROKER_MARKERS.iter().any(|marker| reply.contains(*marker));
    match (is_broker, reply.starts_with("HTTP/")) {
        (true, _) => ProbeResult::LiveBroker,
        (false, true) => ProbeResult::ForeignServer,
        (false, false) => ProbeResult::Timeout,
    }
}
/// Starts the broker HTTP server described by `config`, or reattaches to a
/// broker that is already serving on the configured URL.
///
/// The URL from `config.url()` is probed first:
/// - a live git-paw broker yields a reattached handle that owns no runtime,
///   shutdown channel, or flush thread;
/// - a foreign HTTP server or an unclassifiable listener is reported as an
///   error;
/// - if nothing is listening, a new multi-threaded tokio runtime is built,
///   the listener is bound on `config.bind:config.port`, and the axum server
///   is spawned with graceful shutdown wired to the returned handle.
///
/// When `state.log_path` is set, a flush thread running
/// `delivery::flush_loop` is spawned. It is started only after every fallible
/// setup step has succeeded, so an early `Err` return cannot leave a detached
/// thread running with a stop flag nobody will ever set.
///
/// # Errors
///
/// - [`BrokerError::PortInUse`] if a non-broker HTTP server answers the probe.
/// - [`BrokerError::ProbeTimeout`] if the probe cannot classify the listener.
/// - [`BrokerError::RuntimeFailed`] if the tokio runtime cannot be built.
/// - [`BrokerError::BindFailed`] if the bind address does not parse or the
///   socket cannot be created, configured, bound, or put into listening mode.
pub fn start_broker(
    config: &BrokerConfig,
    state: BrokerState,
) -> Result<BrokerHandle, BrokerError> {
    let url = config.url();
    let state = Arc::new(state);

    match probe_existing_broker(&url) {
        ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
        ProbeResult::ForeignServer => {
            return Err(BrokerError::PortInUse {
                port: config.port,
                source: std::io::Error::new(
                    std::io::ErrorKind::AddrInUse,
                    "port occupied by non-broker process",
                ),
            });
        }
        ProbeResult::Timeout => {
            return Err(BrokerError::ProbeTimeout { port: config.port });
        }
        ProbeResult::NoListener => {}
    }

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .map_err(BrokerError::RuntimeFailed)?;

    let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
        |e: std::net::AddrParseError| {
            BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
        },
    )?;

    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
    let router = server::router(Arc::clone(&state));

    // The socket is created inside `block_on` so it is set up within the
    // runtime's context.
    let listener = runtime.block_on(async {
        let socket = tokio::net::TcpSocket::new_v4().map_err(BrokerError::BindFailed)?;
        socket
            .set_reuseaddr(true)
            .map_err(BrokerError::BindFailed)?;
        socket.bind(addr).map_err(BrokerError::BindFailed)?;
        socket.listen(1024).map_err(BrokerError::BindFailed)
    })?;

    // Every fallible step is behind us: only now start the background flush
    // thread, so no error path above can leak it.
    let stop_flag = Arc::new(AtomicBool::new(false));
    let flush_thread = if state.log_path.is_some() {
        let s = Arc::clone(&state);
        let f = Arc::clone(&stop_flag);
        Some(std::thread::spawn(move || {
            delivery::flush_loop(&s, &f);
        }))
    } else {
        None
    };

    // NOTE(review): this task waits for Ctrl-C and then does nothing. Tokio
    // documents that registering a signal listener replaces the default
    // handling of that signal for the whole process, so this looks like a
    // deliberate way to keep Ctrl-C from killing the broker — confirm.
    runtime.spawn(async {
        let _ = tokio::signal::ctrl_c().await;
    });

    runtime.spawn(async move {
        axum::serve(listener, router)
            .with_graceful_shutdown(async {
                // Resolves when the handle sends on, or drops, `shutdown_tx`.
                let _ = shutdown_rx.await;
            })
            .await
            .ok();
    });

    Ok(BrokerHandle {
        state,
        runtime: Some(runtime),
        shutdown_tx: Some(shutdown_tx),
        url,
        stop_flag,
        flush_thread,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Asks the OS for a loopback port that is currently unused.
    ///
    /// Each test gets its own port this way. Fixed, PID-derived port ranges
    /// can overlap between tests, letting one test reattach to another
    /// test's broker when the test harness runs them in parallel.
    fn free_port() -> u16 {
        std::net::TcpListener::bind("127.0.0.1:0")
            // The temporary listener is dropped when this closure returns,
            // which releases the port again.
            .and_then(|listener| listener.local_addr())
            .map(|addr| addr.port())
            .expect("binding an ephemeral loopback port should succeed")
    }

    /// Broker configuration bound to loopback on the given port.
    fn loopback_config(port: u16) -> BrokerConfig {
        BrokerConfig {
            enabled: true,
            port,
            bind: "127.0.0.1".to_string(),
        }
    }

    #[test]
    fn broker_state_new_is_empty() {
        let state = BrokerState::new(None);
        let inner = state.read();
        assert!(inner.agents.is_empty());
        assert!(inner.queues.is_empty());
        assert!(inner.message_log.is_empty());
    }

    #[test]
    fn next_seq_starts_at_one() {
        let state = BrokerState::new(None);
        assert_eq!(state.next_seq(), 1);
        assert_eq!(state.next_seq(), 2);
        assert_eq!(state.next_seq(), 3);
    }

    #[test]
    fn probe_no_listener() {
        let url = format!("http://127.0.0.1:{}", free_port());
        let result = probe_existing_broker(&url);
        assert_eq!(result, ProbeResult::NoListener);
    }

    #[test]
    fn reattached_handle_has_no_runtime() {
        let state = Arc::new(BrokerState::new(None));
        let h = BrokerHandle::reattached("http://127.0.0.1:9119".into(), state);
        assert!(h.runtime.is_none());
        assert!(h.shutdown_tx.is_none());
        assert!(h.flush_thread.is_none());
    }

    // The `start_broker` tests stay tolerant of a start failure: another
    // process may grab the port between `free_port()` and the bind.

    #[test]
    fn start_broker_on_free_port() {
        let config = loopback_config(free_port());
        let state = BrokerState::new(None);
        let handle = start_broker(&config, state);
        if let Ok(h) = handle {
            assert!(h.url.contains(&config.port.to_string()));
            drop(h);
        }
    }

    #[test]
    fn start_broker_no_log_path_no_flush_thread() {
        let config = loopback_config(free_port());
        let state = BrokerState::new(None);
        if let Ok(handle) = start_broker(&config, state) {
            assert!(handle.flush_thread.is_none());
            drop(handle);
        }
    }

    #[test]
    fn start_broker_with_log_path_spawns_flush_thread() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("broker.log");
        let config = loopback_config(free_port());
        let state = BrokerState::new(Some(log_path));
        if let Ok(handle) = start_broker(&config, state) {
            assert!(handle.flush_thread.is_some());
            drop(handle);
        }
    }
}