pub mod conflict;
pub mod delivery;
pub mod learnings;
pub mod messages;
pub mod publish;
pub mod server;
pub mod watcher;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, OnceLock, RwLock};
use std::thread::JoinHandle;
use std::time::Instant;
use serde::Serialize;
use crate::config::{BrokerConfig, ConflictConfig};
pub use messages::BrokerMessage;
/// A worktree the broker watches on behalf of one agent.
#[derive(Debug, Clone)]
pub struct WatchTarget {
/// Identifier of the agent; used as the key for the broker's CLI map and inbox queues.
pub agent_id: String,
/// CLI label recorded for the agent. May be empty, in which case
/// `BrokerState::register_watch_target` records no label.
pub cli: String,
/// Worktree path; tracked in the broker's set of watched paths.
pub worktree_path: PathBuf,
}
/// Per-agent record stored in `BrokerStateInner::agents`.
#[derive(Debug, Clone)]
pub struct AgentRecord {
/// Identifier of the agent this record describes.
pub agent_id: String,
/// Status string for the agent.
pub status: String,
/// Monotonic timestamp of when the agent was last seen.
pub last_seen: Instant,
/// NOTE(review): presumably the most recent message published by the agent — confirm against the delivery code.
pub last_message: Option<BrokerMessage>,
/// NOTE(review): presumably the time of the agent's latest commit, `None` until one is observed — confirm against the watcher.
pub last_committed_at: Option<Instant>,
}
/// Serializable status snapshot of a single agent.
#[derive(Debug, Clone, Serialize)]
pub struct AgentStatusEntry {
/// Identifier of the agent.
pub agent_id: String,
/// CLI label associated with the agent.
pub cli: String,
/// Status string for the agent.
pub status: String,
/// NOTE(review): presumably whole seconds since the agent was last seen — confirm against the code that builds this entry.
pub last_seen_seconds: u64,
/// Summary text for the agent.
pub summary: String,
/// Monotonic timestamp kept in memory only; excluded from serialized output.
#[serde(skip)]
pub last_seen: Instant,
/// Optional phase label; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub phase: Option<String>,
}
/// Mutable broker data guarded by the `RwLock` inside `BrokerState`.
#[derive(Debug)]
pub struct BrokerStateInner {
/// Agent records keyed by agent id.
pub agents: HashMap<String, AgentRecord>,
/// CLI label per agent id; seeded from watch targets and `with_seeded_cli`.
pub agent_clis: HashMap<String, String>,
/// Per-agent inbox keyed by agent id.
/// NOTE(review): the `u64` is presumably the message sequence number — confirm against the delivery code.
pub queues: HashMap<String, Vec<(u64, BrokerMessage)>>,
/// Log of messages with a wall-clock timestamp.
/// NOTE(review): the `u64` is presumably the message sequence number — confirm against the delivery code.
pub message_log: Vec<(u64, std::time::SystemTime, BrokerMessage)>,
/// TTL set through `BrokerState::set_republish_working_ttl`; defaults to
/// `WatcherConfig::DEFAULT_REPUBLISH_TTL_SECONDS`.
pub republish_working_ttl: std::time::Duration,
/// Worktree paths currently registered as watched.
pub watched_paths: HashSet<PathBuf>,
}
/// Shared broker state: lock-protected data plus configuration fixed at startup.
#[derive(Debug)]
pub struct BrokerState {
/// Lock-protected mutable data; accessed through `read()` / `write()`.
inner: RwLock<BrokerStateInner>,
/// Counter behind `next_seq()`; starts at zero so the first issued value is 1.
next_seq: AtomicU64,
/// Optional log file path; when set, `start_broker_with` spawns the delivery flush thread.
pub log_path: Option<PathBuf>,
/// Creation time of this state; basis for `uptime_seconds()`.
started_at: Instant,
/// Optional learnings aggregator; when set, `start_broker_with` spawns the learnings flush thread.
pub learnings: Option<learnings::SharedLearnings>,
/// Flag set via `with_verify_on_commit_nudge`; off by default.
pub verify_on_commit_nudge: bool,
/// Optional role-gating context set via `with_role_gating`.
pub role_gating: Option<crate::opsx::RoleGatingContext>,
/// Watcher shutdown receiver; can be set only once (later `set` calls are ignored).
watcher_shutdown_rx: OnceLock<tokio::sync::watch::Receiver<bool>>,
}
impl BrokerState {
    /// Builds a broker state with no agents, queues, logged messages or
    /// watched paths.
    ///
    /// The republish TTL starts at
    /// `WatcherConfig::DEFAULT_REPUBLISH_TTL_SECONDS`, the sequence counter
    /// at zero, and learnings, role gating and the verify-on-commit nudge are
    /// all disabled until configured.
    pub fn new(log_path: Option<PathBuf>) -> Self {
        let default_ttl = std::time::Duration::from_secs(
            crate::config::WatcherConfig::DEFAULT_REPUBLISH_TTL_SECONDS,
        );
        let empty = BrokerStateInner {
            agents: HashMap::new(),
            agent_clis: HashMap::new(),
            queues: HashMap::new(),
            message_log: Vec::new(),
            republish_working_ttl: default_ttl,
            watched_paths: HashSet::new(),
        };
        Self {
            inner: RwLock::new(empty),
            next_seq: AtomicU64::new(0),
            log_path,
            started_at: Instant::now(),
            learnings: None,
            verify_on_commit_nudge: false,
            role_gating: None,
            watcher_shutdown_rx: OnceLock::new(),
        }
    }

    /// Builder: attaches a role-gating context.
    #[must_use]
    pub fn with_role_gating(mut self, ctx: crate::opsx::RoleGatingContext) -> Self {
        self.role_gating = Some(ctx);
        self
    }

    /// Builder: switches the verify-on-commit nudge on or off.
    #[must_use]
    pub fn with_verify_on_commit_nudge(mut self, enabled: bool) -> Self {
        self.verify_on_commit_nudge = enabled;
        self
    }

    /// Builder: records `cli` as the CLI label for `agent_id`.
    ///
    /// An empty `cli` leaves the state untouched, as does a poisoned lock.
    #[must_use]
    pub fn with_seeded_cli(mut self, agent_id: &str, cli: &str) -> Self {
        if cli.is_empty() {
            return self;
        }
        // `self` is owned here, so the data can be reached without locking;
        // `get_mut` reports poisoning exactly like `write` would.
        if let Ok(inner) = self.inner.get_mut() {
            inner
                .agent_clis
                .insert(agent_id.to_string(), cli.to_string());
        }
        self
    }

    /// Installs the learnings aggregator.
    pub fn attach_learnings(&mut self, aggregator: learnings::SharedLearnings) {
        self.learnings = Some(aggregator);
    }

    /// Replaces the republish TTL stored in the shared state.
    ///
    /// # Panics
    /// Panics if the state lock is poisoned.
    pub fn set_republish_working_ttl(&self, ttl: std::time::Duration) {
        let mut guard = self.write();
        guard.republish_working_ttl = ttl;
    }

    /// Stores the watcher shutdown receiver. Only the first call has an
    /// effect; later calls are silently ignored.
    pub fn set_watcher_shutdown_rx(&self, rx: tokio::sync::watch::Receiver<bool>) {
        let _ = self.watcher_shutdown_rx.set(rx);
    }

    /// Returns a clone of the watcher shutdown receiver, or `None` if none
    /// has been stored yet.
    #[must_use]
    pub fn watcher_shutdown_rx(&self) -> Option<tokio::sync::watch::Receiver<bool>> {
        let stored = self.watcher_shutdown_rx.get();
        stored.cloned()
    }

    /// Registers a worktree for watching.
    ///
    /// Returns `true` when the path was not yet tracked; in that case the
    /// agent's CLI label (if non-empty) and an inbox queue are seeded too.
    /// Returns `false`, changing nothing, when the path was already tracked.
    ///
    /// # Panics
    /// Panics if the state lock is poisoned.
    pub fn register_watch_target(&self, target: &WatchTarget) -> bool {
        let mut guard = self.write();
        let newly_watched = guard.watched_paths.insert(target.worktree_path.clone());
        if newly_watched {
            if !target.cli.is_empty() {
                guard
                    .agent_clis
                    .insert(target.agent_id.clone(), target.cli.clone());
            }
            guard.queues.entry(target.agent_id.clone()).or_default();
        }
        newly_watched
    }

    /// Stops tracking `worktree_path` so it can be registered again later.
    ///
    /// # Panics
    /// Panics if the state lock is poisoned.
    pub fn forget_watch_target(&self, worktree_path: &std::path::Path) {
        let mut guard = self.write();
        guard.watched_paths.remove(worktree_path);
    }

    /// Acquires the shared read lock.
    ///
    /// # Panics
    /// Panics if the state lock is poisoned.
    pub fn read(&self) -> std::sync::RwLockReadGuard<'_, BrokerStateInner> {
        let locked = self.inner.read();
        locked.expect("broker state lock poisoned")
    }

    /// Acquires the exclusive write lock.
    ///
    /// # Panics
    /// Panics if the state lock is poisoned.
    pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, BrokerStateInner> {
        let locked = self.inner.write();
        locked.expect("broker state lock poisoned")
    }

    /// Hands out the next sequence number; the first value issued is 1.
    pub fn next_seq(&self) -> u64 {
        let previous = self.next_seq.fetch_add(1, Ordering::Relaxed);
        previous + 1
    }

    /// Whole seconds elapsed since this state was created.
    pub fn uptime_seconds(&self) -> u64 {
        let alive_for = self.started_at.elapsed();
        alive_for.as_secs()
    }
}
/// Errors returned when starting the broker.
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
/// The startup probe found an HTTP server on the port that is not a git-paw broker.
#[error(
"port {port} is already in use by another process — change [broker] port in .git-paw/config.toml"
)]
PortInUse {
/// Port the broker was configured to use.
port: u16,
/// Underlying I/O error describing the conflict.
source: std::io::Error,
},
/// The startup probe connected to the port but got no recognisable answer.
#[error("broker probe timed out on port {port} — check for stuck processes on this port")]
ProbeTimeout {
/// Port that was probed.
port: u16,
},
/// The bind address could not be parsed, or creating/binding/listening on the socket failed.
#[error("failed to bind broker: {0}")]
BindFailed(std::io::Error),
/// The Tokio runtime could not be built.
#[error("failed to create broker runtime: {0}")]
RuntimeFailed(std::io::Error),
}
/// Owner of a running (or reattached) broker; dropping it shuts the broker down.
pub struct BrokerHandle {
/// Shared broker state.
pub state: Arc<BrokerState>,
/// Tokio runtime hosting the server and watcher tasks; `None` for a reattached handle.
runtime: Option<tokio::runtime::Runtime>,
/// Fires the HTTP server's graceful shutdown; `None` for a reattached handle.
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
/// Watch channel on which `true` is sent at drop time to stop watcher tasks;
/// `None` for a reattached handle.
watcher_shutdown: Option<tokio::sync::watch::Sender<bool>>,
/// URL of the broker.
pub url: String,
/// Stop flag polled by the background flush threads; set on drop.
stop_flag: Arc<AtomicBool>,
/// Delivery flush thread; present only when the state has a log path.
flush_thread: Option<JoinHandle<()>>,
/// Learnings flush thread; present only when a learnings aggregator is attached.
learnings_thread: Option<JoinHandle<()>>,
}
impl BrokerHandle {
    /// Builds a handle for a broker that is already running elsewhere.
    ///
    /// Such a handle owns no runtime, no shutdown channels and no background
    /// threads, so dropping it only flips its own (unobserved) stop flag.
    fn reattached(url: String, state: Arc<BrokerState>) -> Self {
        let idle_stop_flag = Arc::new(AtomicBool::new(false));
        Self {
            state,
            url,
            stop_flag: idle_stop_flag,
            runtime: None,
            shutdown_tx: None,
            watcher_shutdown: None,
            flush_thread: None,
            learnings_thread: None,
        }
    }
}
impl Drop for BrokerHandle {
    /// Shuts the broker down in a fixed order: raise the stop flag, join the
    /// flush thread and then the learnings thread, signal the watchers,
    /// trigger the server's graceful shutdown, and finally give the runtime
    /// up to two seconds to wind down.
    fn drop(&mut self) {
        self.stop_flag.store(true, Ordering::Release);

        // Flush thread first, learnings thread second; a panicked worker is
        // ignored rather than propagated out of `drop`.
        let workers = self
            .flush_thread
            .take()
            .into_iter()
            .chain(self.learnings_thread.take());
        for worker in workers {
            let _ = worker.join();
        }

        // Send failures only mean the receiving side is already gone.
        if let Some(watchers) = self.watcher_shutdown.take() {
            let _ = watchers.send(true);
        }
        if let Some(server) = self.shutdown_tx.take() {
            let _ = server.send(());
        }

        if let Some(runtime) = self.runtime.take() {
            let grace = std::time::Duration::from_secs(2);
            runtime.shutdown_timeout(grace);
        }
    }
}
/// Outcome of probing a broker URL for an already-running server.
#[derive(Debug, PartialEq, Eq)]
pub enum ProbeResult {
    /// The address could not be resolved or nothing accepted the connection.
    NoListener,
    /// The peer's reply carried the `"git_paw":true` marker.
    LiveBroker,
    /// The peer replied with HTTP but without the marker.
    ForeignServer,
    /// The peer accepted the connection, but the request could not be sent
    /// or no recognisable HTTP reply arrived.
    Timeout,
}

/// Probes `url` and reports what, if anything, is listening there.
///
/// Public wrapper around the internal probe; see [`ProbeResult`] for the
/// possible outcomes.
pub fn probe_broker(url: &str) -> ProbeResult {
    probe_existing_broker(url)
}

/// Sends `GET /status` to the host named by `url` and classifies the reply.
///
/// `url` may carry an `http://` prefix and a trailing path; only the
/// `host:port` part is used for connecting. Connect, read and write each
/// use a 500 ms timeout. The reply is read as raw bytes, capped at 1 MiB,
/// and decoded lossily, so a server that answers with non-UTF-8 content or
/// never stops sending is still classified instead of being mistaken for a
/// timeout or stalling the caller.
fn probe_existing_broker(url: &str) -> ProbeResult {
    use std::io::{Read, Write};
    use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
    use std::time::Duration;

    const IO_TIMEOUT: Duration = Duration::from_millis(500);
    // Upper bound on how much of the reply is inspected.
    const MAX_RESPONSE_BYTES: u64 = 1024 * 1024;

    let without_scheme = url.strip_prefix("http://").unwrap_or(url);
    // Drop any path component ("host:port/..."): only the authority can be
    // turned into a socket address.
    let addr = without_scheme.split('/').next().unwrap_or(without_scheme);

    let socket_addr = if let Ok(literal) = addr.parse::<SocketAddr>() {
        literal
    } else {
        // Not an IP literal; fall back to name resolution.
        let resolved = addr.to_socket_addrs().ok().and_then(|mut found| found.next());
        let Some(first) = resolved else {
            return ProbeResult::NoListener;
        };
        first
    };

    let Ok(mut stream) = TcpStream::connect_timeout(&socket_addr, IO_TIMEOUT) else {
        return ProbeResult::NoListener;
    };
    // Best effort: if a timeout cannot be set the probe still works, it may
    // just block longer on an unresponsive peer.
    stream.set_read_timeout(Some(IO_TIMEOUT)).ok();
    stream.set_write_timeout(Some(IO_TIMEOUT)).ok();

    let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
    if stream.write_all(request.as_bytes()).is_err() {
        return ProbeResult::Timeout;
    }

    let mut raw = Vec::new();
    let mut limited = Read::take(&stream, MAX_RESPONSE_BYTES);
    // A read error after some bytes arrived (e.g. the peer kept the
    // connection open past the timeout) still leaves usable data in `raw`.
    if limited.read_to_end(&mut raw).is_err() && raw.is_empty() {
        return ProbeResult::Timeout;
    }
    let response = String::from_utf8_lossy(&raw);

    if response.contains("\"git_paw\":true") || response.contains("\"git_paw\": true") {
        ProbeResult::LiveBroker
    } else if response.starts_with("HTTP/") {
        ProbeResult::ForeignServer
    } else {
        ProbeResult::Timeout
    }
}
pub fn start_broker(
config: &BrokerConfig,
state: BrokerState,
watch_targets: Vec<WatchTarget>,
) -> Result<BrokerHandle, BrokerError> {
start_broker_with(config, state, watch_targets, None, 60)
}
/// Starts the broker, or reattaches to one that is already serving `config`'s URL.
///
/// Steps, in order:
/// 1. Probe the URL. A live broker yields a reattached handle; a foreign
///    HTTP server or an unresponsive listener is an error.
/// 2. Build the Tokio runtime and bind the listening socket.
/// 3. Only once the socket is bound, spawn the background flush threads
///    (delivery log flush when the state has a log path, learnings flush
///    when an aggregator is attached). Spawning them earlier would leave
///    them running forever if a later step failed, because nothing would
///    ever raise their stop flag.
/// 4. Serve HTTP, seed the roster from `watch_targets`, and spawn one
///    watcher task per target plus the optional conflict detector.
///
/// # Errors
/// * [`BrokerError::PortInUse`] — a non-broker HTTP server answered the probe.
/// * [`BrokerError::ProbeTimeout`] — something accepted the probe connection
///   but did not answer recognisably.
/// * [`BrokerError::RuntimeFailed`] — the Tokio runtime could not be built.
/// * [`BrokerError::BindFailed`] — the bind address is invalid, or the socket
///   could not be created, bound or put into listening mode.
// The startup sequence is deliberately kept in one place so its ordering
// stays visible.
#[allow(clippy::too_many_lines)]
pub fn start_broker_with(
    config: &BrokerConfig,
    state: BrokerState,
    watch_targets: Vec<WatchTarget>,
    conflict: Option<ConflictConfig>,
    learnings_flush_interval_seconds: u64,
) -> Result<BrokerHandle, BrokerError> {
    let url = config.url();
    let state = Arc::new(state);
    state.set_republish_working_ttl(std::time::Duration::from_secs(
        config.watcher.republish_working_ttl_seconds(),
    ));

    match probe_existing_broker(&url) {
        ProbeResult::LiveBroker => return Ok(BrokerHandle::reattached(url, state)),
        ProbeResult::ForeignServer => {
            return Err(BrokerError::PortInUse {
                port: config.port,
                source: std::io::Error::new(
                    std::io::ErrorKind::AddrInUse,
                    "port occupied by non-broker process",
                ),
            });
        }
        ProbeResult::Timeout => {
            return Err(BrokerError::ProbeTimeout { port: config.port });
        }
        ProbeResult::NoListener => {}
    }

    // Everything that can fail happens before any background thread exists.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .map_err(BrokerError::RuntimeFailed)?;
    let addr: std::net::SocketAddr = format!("{}:{}", config.bind, config.port).parse().map_err(
        |e: std::net::AddrParseError| {
            BrokerError::BindFailed(std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
        },
    )?;
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
    let router = server::router(Arc::clone(&state));
    // The socket is created inside the runtime so it registers with the
    // runtime's I/O driver.
    let listener = runtime.block_on(async {
        // Match the socket family to the configured address; a v4 socket
        // cannot bind an IPv6 address.
        let new_socket = if addr.is_ipv6() {
            tokio::net::TcpSocket::new_v6()
        } else {
            tokio::net::TcpSocket::new_v4()
        };
        let socket = new_socket.map_err(BrokerError::BindFailed)?;
        socket
            .set_reuseaddr(true)
            .map_err(BrokerError::BindFailed)?;
        socket.bind(addr).map_err(BrokerError::BindFailed)?;
        socket.listen(1024).map_err(BrokerError::BindFailed)
    })?;

    // From here on nothing returns early, so the threads below are always
    // handed to the `BrokerHandle`, whose `Drop` stops and joins them.
    let stop_flag = Arc::new(AtomicBool::new(false));
    let flush_thread = if state.log_path.is_some() {
        let s = Arc::clone(&state);
        let f = Arc::clone(&stop_flag);
        Some(std::thread::spawn(move || {
            delivery::flush_loop(&s, &f);
        }))
    } else {
        None
    };
    let learnings_thread = if state.learnings.is_some() {
        let s = Arc::clone(&state);
        let f = Arc::clone(&stop_flag);
        Some(std::thread::spawn(move || {
            learnings_flush_loop(&s, &f, learnings_flush_interval_seconds);
        }))
    } else {
        None
    };

    // NOTE(review): presumably this registers Tokio's Ctrl-C handler so that
    // SIGINT no longer terminates the process by default — confirm intent.
    runtime.spawn(async {
        let _ = tokio::signal::ctrl_c().await;
    });
    runtime.spawn(async move {
        axum::serve(listener, router)
            .with_graceful_shutdown(async {
                let _ = shutdown_rx.await;
            })
            .await
            .ok();
    });

    // Seed the CLI label, inbox queue and watched path for every target
    // before the watcher tasks start.
    {
        let mut inner = state.write();
        for target in &watch_targets {
            inner
                .agent_clis
                .insert(target.agent_id.clone(), target.cli.clone());
            inner.queues.entry(target.agent_id.clone()).or_default();
            inner.watched_paths.insert(target.worktree_path.clone());
        }
    }

    let (watcher_tx, watcher_rx) = tokio::sync::watch::channel(false);
    state.set_watcher_shutdown_rx(watcher_rx.clone());
    for target in watch_targets {
        let s = Arc::clone(&state);
        let rx = watcher_rx.clone();
        runtime.spawn(watcher::watch_worktree(s, target, rx));
    }
    if let Some(conflict_cfg) = conflict {
        let s = Arc::clone(&state);
        let rx = watcher_rx.clone();
        runtime.spawn(conflict::run_detector_loop(s, conflict_cfg, rx));
    }

    Ok(BrokerHandle {
        state,
        runtime: Some(runtime),
        shutdown_tx: Some(shutdown_tx),
        watcher_shutdown: Some(watcher_tx),
        url,
        stop_flag,
        flush_thread,
        learnings_thread,
    })
}
/// Body of the learnings flush thread.
///
/// Returns immediately when no aggregator is attached. Otherwise it flushes
/// the aggregator and publishes its pending records once per interval (at
/// least one second), checking `stop` every 100 ms in between. When `stop`
/// is raised it performs a final shutdown flush, publishes whatever is
/// pending, and returns. A poisoned aggregator lock skips the flush.
fn learnings_flush_loop(
    state: &Arc<BrokerState>,
    stop: &Arc<AtomicBool>,
    flush_interval_seconds: u64,
) {
    let Some(aggregator) = state.learnings.clone() else {
        return;
    };
    let period = std::time::Duration::from_secs(flush_interval_seconds.max(1));
    let poll_step = std::time::Duration::from_millis(100);
    let mut waited = std::time::Duration::ZERO;

    loop {
        // A full period has passed: flush, publish, start a new period.
        if waited >= period {
            if let Ok(mut guard) = aggregator.lock() {
                let _ = guard.flush();
            }
            publish_pending_learnings(state, &aggregator);
            waited = std::time::Duration::ZERO;
        }

        if stop.load(Ordering::Acquire) {
            if let Ok(mut guard) = aggregator.lock() {
                let _ = guard.flush_at_shutdown();
            }
            publish_pending_learnings(state, &aggregator);
            return;
        }

        std::thread::sleep(poll_step);
        waited += poll_step;
    }
}
/// Drains the aggregator's pending records and publishes each one as a
/// broker message.
///
/// Does nothing when the aggregator lock is poisoned. The lock is released
/// before anything is published.
fn publish_pending_learnings(state: &Arc<BrokerState>, aggregator: &learnings::SharedLearnings) {
    let Ok(mut guard) = aggregator.lock() else {
        return;
    };
    let pending = guard.take_pending_publish();
    // Release the aggregator before publishing so the publish path never
    // runs while this function still holds the lock.
    drop(guard);

    for record in &pending {
        let outgoing = BrokerMessage::from(record);
        delivery::publish_message(state, &outgoing);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Picks a per-process port inside the 100-port window starting at `base`.
    ///
    /// Each broker-starting test uses its own window, so tests running in
    /// parallel inside one process can never land on the same port (and
    /// none of the windows reaches the fixed port used by
    /// `probe_no_listener`).
    fn pid_port(base: u16) -> u16 {
        let offset =
            u16::try_from(std::process::id() % 100).expect("a value below 100 fits in u16");
        base + offset
    }

    #[test]
    fn broker_state_new_is_empty() {
        let state = BrokerState::new(None);
        let inner = state.read();
        assert!(inner.agents.is_empty());
        assert!(inner.queues.is_empty());
        assert!(inner.message_log.is_empty());
    }

    #[test]
    fn register_watch_target_is_idempotent_and_seeds_roster() {
        let state = BrokerState::new(None);
        let target = WatchTarget {
            agent_id: "feat-hot".to_string(),
            cli: "claude".to_string(),
            worktree_path: PathBuf::from("/tmp/feat-hot"),
        };
        assert!(
            state.register_watch_target(&target),
            "first registration must return true"
        );
        assert!(
            !state.register_watch_target(&target),
            "duplicate registration must return false"
        );
        let inner = state.read();
        assert_eq!(inner.watched_paths.len(), 1, "path recorded exactly once");
        assert_eq!(
            inner.agent_clis.get("feat-hot").map(String::as_str),
            Some("claude"),
            "registration seeds the CLI label"
        );
        assert!(
            inner.queues.contains_key("feat-hot"),
            "registration seeds the inbox queue"
        );
    }

    #[test]
    fn forget_watch_target_allows_re_registration() {
        let state = BrokerState::new(None);
        let target = WatchTarget {
            agent_id: "feat-hot".to_string(),
            cli: "claude".to_string(),
            worktree_path: PathBuf::from("/tmp/feat-hot"),
        };
        assert!(state.register_watch_target(&target));
        state.forget_watch_target(&target.worktree_path);
        assert!(
            state.register_watch_target(&target),
            "after forgetting, the same path registers fresh again"
        );
    }

    #[test]
    fn next_seq_starts_at_one() {
        let state = BrokerState::new(None);
        assert_eq!(state.next_seq(), 1);
        assert_eq!(state.next_seq(), 2);
        assert_eq!(state.next_seq(), 3);
    }

    #[test]
    fn probe_no_listener() {
        let result = probe_existing_broker("http://127.0.0.1:19999");
        assert_eq!(result, ProbeResult::NoListener);
    }

    #[test]
    fn reattached_handle_has_no_runtime() {
        let state = Arc::new(BrokerState::new(None));
        let h = BrokerHandle::reattached("http://127.0.0.1:9119".into(), state);
        assert!(h.runtime.is_none());
        assert!(h.shutdown_tx.is_none());
        assert!(h.flush_thread.is_none());
    }

    // The three tests below tolerate a failed start (e.g. the port is taken
    // by an unrelated process) and only check the handle when one exists.

    #[test]
    fn start_broker_on_free_port() {
        let config = BrokerConfig {
            enabled: true,
            port: pid_port(19_000),
            bind: "127.0.0.1".to_string(),
            ..Default::default()
        };
        let state = BrokerState::new(None);
        let handle = start_broker(&config, state, Vec::new());
        if let Ok(h) = handle {
            assert!(h.url.contains(&config.port.to_string()));
            drop(h);
        }
    }

    #[test]
    fn start_broker_no_log_path_no_flush_thread() {
        let config = BrokerConfig {
            enabled: true,
            port: pid_port(19_100),
            bind: "127.0.0.1".to_string(),
            ..Default::default()
        };
        let state = BrokerState::new(None);
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_none());
            drop(handle);
        }
    }

    #[test]
    fn start_broker_with_log_path_spawns_flush_thread() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("broker.log");
        let config = BrokerConfig {
            enabled: true,
            port: pid_port(19_200),
            bind: "127.0.0.1".to_string(),
            ..Default::default()
        };
        let state = BrokerState::new(Some(log_path));
        if let Ok(handle) = start_broker(&config, state, Vec::new()) {
            assert!(handle.flush_thread.is_some());
            drop(handle);
        }
    }

    /// Builds the supervisor feedback message the conflict detector would
    /// send to `target` about a clash with `other`.
    fn conflict_feedback(target: &str, other: &str) -> BrokerMessage {
        BrokerMessage::Feedback {
            agent_id: target.to_string(),
            payload: messages::FeedbackPayload {
                from: "supervisor".to_string(),
                errors: vec![format!(
                    "[conflict-detector] in-flight conflict with {other} on src/a.rs"
                )],
            },
        }
    }

    /// Collects the payload of every `Learning` message in the message log.
    fn learning_payloads(state: &Arc<BrokerState>) -> Vec<messages::LearningPayload> {
        state
            .read()
            .message_log
            .iter()
            .filter_map(|(_, _, m)| match m {
                BrokerMessage::Learning { payload } => Some(payload.clone()),
                _ => None,
            })
            .collect()
    }

    /// Runs one flush-and-publish cycle, as the learnings thread would.
    fn tick(state: &Arc<BrokerState>) {
        let aggregator = state.learnings.clone().expect("aggregator attached");
        if let Ok(mut a) = aggregator.lock() {
            a.flush().unwrap();
        }
        publish_pending_learnings(state, &aggregator);
    }

    /// Builds a state with an aggregator writing to `path` and two
    /// registered agents, `feat-x` and `feat-y`.
    fn state_with_aggregator(path: PathBuf, broker_publish: bool) -> Arc<BrokerState> {
        let mut agg = learnings::LearningsAggregator::new(path);
        agg.set_broker_publish(broker_publish);
        agg.register_agent("feat-x");
        agg.register_agent("feat-y");
        let mut state = BrokerState::new(None);
        state.attach_learnings(Arc::new(std::sync::Mutex::new(agg)));
        Arc::new(state)
    }

    #[test]
    fn dual_output_publishes_learning_and_writes_file_when_broker_on() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("session-learnings.md");
        let state = state_with_aggregator(path.clone(), true);
        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
        tick(&state);
        let md = std::fs::read_to_string(&path).unwrap();
        assert!(
            md.contains("### Conflict events"),
            "file missing conflict:\n{md}"
        );
        let learnings = learning_payloads(&state);
        assert_eq!(learnings.len(), 1, "expected one agent.learning record");
        assert_eq!(learnings[0].category, "conflict_event");
        assert_eq!(learnings[0].id.len(), 16);
        assert!(
            md.contains(&learnings[0].title),
            "file title must match broker record title: {}",
            learnings[0].title
        );
    }

    #[test]
    fn no_broker_publish_when_disabled_but_file_still_written() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("session-learnings.md");
        let state = state_with_aggregator(path.clone(), false);
        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
        tick(&state);
        let md = std::fs::read_to_string(&path).unwrap();
        assert!(md.contains("### Conflict events"));
        assert!(
            learning_payloads(&state).is_empty(),
            "no agent.learning record should be published when broker publish is off"
        );
    }

    #[test]
    fn re_ticking_does_not_duplicate_learning_records() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("session-learnings.md");
        let state = state_with_aggregator(path, true);
        delivery::publish_message(&state, &conflict_feedback("feat-x", "feat-y"));
        tick(&state);
        tick(&state);
        assert_eq!(
            learning_payloads(&state).len(),
            1,
            "the conflict record must be published exactly once"
        );
    }

    #[test]
    fn branch_scoped_learning_is_routed_to_branch_inbox() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("session-learnings.md");
        let state = state_with_aggregator(path, true);
        delivery::publish_message(
            &state,
            &BrokerMessage::Blocked {
                agent_id: "feat-x".to_string(),
                payload: messages::BlockedPayload {
                    needs: "types".to_string(),
                    from: "feat-y".to_string(),
                },
            },
        );
        delivery::publish_message(
            &state,
            &BrokerMessage::Artifact {
                agent_id: "feat-x".to_string(),
                payload: messages::ArtifactPayload {
                    status: "done".to_string(),
                    exports: vec![],
                    modified_files: vec![],
                },
            },
        );
        tick(&state);
        let (msgs, _) = delivery::poll_messages(&state, "feat-x", 0);
        assert!(
            msgs.iter().any(|m| matches!(
                m,
                BrokerMessage::Learning { payload } if payload.category == "stuck_duration"
            )),
            "stuck_duration learning should land in feat-x's inbox"
        );
    }
}