use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::Duration;
use crate::broker::messages::{BrokerMessage, StatusPayload};
use crate::error::PawError;
/// Assembles a [`BrokerMessage::Status`] describing `agent_id`.
///
/// `status`, `message` and `cli` are copied into the [`StatusPayload`]; the
/// string slices are converted to owned `String`s. The remaining payload
/// fields are always initialised the same way: `modified_files` is an empty
/// vector and both `phase` and `detail` are `None`.
pub fn build_status_message(
    agent_id: &str,
    status: &str,
    message: Option<String>,
    cli: Option<&str>,
) -> BrokerMessage {
    // Build the payload first so the enum construction below stays compact.
    let payload = StatusPayload {
        status: String::from(status),
        modified_files: Vec::new(),
        message,
        cli: cli.map(String::from),
        phase: None,
        detail: None,
    };

    BrokerMessage::Status {
        agent_id: String::from(agent_id),
        payload,
    }
}
/// Publishes `msg` to the broker by POSTing its JSON encoding to `/publish`.
///
/// `broker_url` may be a bare `host:port` or carry an `http://` prefix, which
/// is stripped before use. The remainder is first parsed as a literal socket
/// address and, if that fails, resolved via `ToSocketAddrs` (the first
/// resolved address is used). The request is a hand-written HTTP/1.1 message
/// sent over a plain [`TcpStream`] with `Connection: close`.
///
/// Timeouts: 500 ms for connecting, 2 s each for reads and writes (setting the
/// read/write timeouts is best-effort; failures to set them are ignored).
///
/// The call succeeds only when the response begins with `HTTP/1.1 202` or
/// `HTTP/1.0 202`.
///
/// # Errors
///
/// Returns [`PawError::SessionError`] when the message cannot be serialized,
/// the address is invalid or resolves to nothing, the connection or the write
/// fails, no response could be read, or the status line is not a 202.
pub fn publish_to_broker_http(broker_url: &str, msg: &BrokerMessage) -> Result<(), PawError> {
    let body = serde_json::to_string(msg)
        .map_err(|e| PawError::SessionError(format!("failed to serialize broker message: {e}")))?;
    let addr = broker_url.strip_prefix("http://").unwrap_or(broker_url);
    let socket_addr = if let Ok(a) = addr.parse() {
        a
    } else {
        use std::net::ToSocketAddrs;
        addr.to_socket_addrs()
            .map_err(|e| PawError::SessionError(format!("invalid broker address {addr}: {e}")))?
            .next()
            .ok_or_else(|| {
                PawError::SessionError(format!("broker address {addr} resolved to no addrs"))
            })?
    };
    let mut stream = TcpStream::connect_timeout(&socket_addr, Duration::from_millis(500))
        .map_err(|e| PawError::SessionError(format!("failed to connect to broker: {e}")))?;
    stream.set_read_timeout(Some(Duration::from_secs(2))).ok();
    stream.set_write_timeout(Some(Duration::from_secs(2))).ok();
    // `body.len()` is the byte length, which is what Content-Length requires.
    let request = format!(
        "POST /publish HTTP/1.1\r\nHost: {addr}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
        body.len()
    );
    stream
        .write_all(request.as_bytes())
        .map_err(|e| PawError::SessionError(format!("failed to write broker request: {e}")))?;
    let mut response = String::new();
    // Best-effort read: only the status line matters, and bytes received
    // before a timeout or reset may already contain it, so a read error is
    // not fatal by itself. Keep the result so it can be reported if nothing
    // usable arrived.
    let read_result = stream.read_to_string(&mut response);
    if !(response.starts_with("HTTP/1.1 202") || response.starts_with("HTTP/1.0 202")) {
        let reason = match read_result {
            // No bytes at all plus an I/O error means the broker never
            // answered; report the real cause instead of a fake rejection.
            Err(e) if response.is_empty() => format!("failed to read broker response: {e}"),
            _ => format!(
                "broker rejected publish: {}",
                response.lines().next().unwrap_or("<empty>")
            ),
        };
        return Err(PawError::SessionError(reason));
    }
    Ok(())
}
/// Registers a worktree with the broker by POSTing a JSON body to `/watch`.
///
/// The body has three keys: `agent_id`, `worktree_path` (the path converted
/// with `to_string_lossy`) and `cli`.
///
/// `broker_url` may be a bare `host:port` or carry an `http://` prefix, which
/// is stripped before use. The remainder is first parsed as a literal socket
/// address and, if that fails, resolved via `ToSocketAddrs` (the first
/// resolved address is used). The request is a hand-written HTTP/1.1 message
/// sent over a plain [`TcpStream`] with `Connection: close`.
///
/// Timeouts: 500 ms for connecting, 2 s each for reads and writes (setting the
/// read/write timeouts is best-effort; failures to set them are ignored).
///
/// The call succeeds only when the response begins with `HTTP/1.1 202` or
/// `HTTP/1.0 202`.
///
/// # Errors
///
/// Returns [`PawError::SessionError`] when the address is invalid or resolves
/// to nothing, the connection or the write fails, no response could be read,
/// or the status line is not a 202.
pub fn register_watch_target_http(
    broker_url: &str,
    agent_id: &str,
    worktree_path: &std::path::Path,
    cli: &str,
) -> Result<(), PawError> {
    let body = serde_json::json!({
        "agent_id": agent_id,
        "worktree_path": worktree_path.to_string_lossy(),
        "cli": cli,
    })
    .to_string();
    let addr = broker_url.strip_prefix("http://").unwrap_or(broker_url);
    let socket_addr = if let Ok(a) = addr.parse() {
        a
    } else {
        use std::net::ToSocketAddrs;
        addr.to_socket_addrs()
            .map_err(|e| PawError::SessionError(format!("invalid broker address {addr}: {e}")))?
            .next()
            .ok_or_else(|| {
                PawError::SessionError(format!("broker address {addr} resolved to no addrs"))
            })?
    };
    let mut stream = TcpStream::connect_timeout(&socket_addr, Duration::from_millis(500))
        .map_err(|e| PawError::SessionError(format!("failed to connect to broker: {e}")))?;
    stream.set_read_timeout(Some(Duration::from_secs(2))).ok();
    stream.set_write_timeout(Some(Duration::from_secs(2))).ok();
    // `body.len()` is the byte length, which is what Content-Length requires.
    let request = format!(
        "POST /watch HTTP/1.1\r\nHost: {addr}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
        body.len()
    );
    stream
        .write_all(request.as_bytes())
        .map_err(|e| PawError::SessionError(format!("failed to write broker request: {e}")))?;
    let mut response = String::new();
    // Best-effort read: only the status line matters, and bytes received
    // before a timeout or reset may already contain it, so a read error is
    // not fatal by itself. Keep the result so it can be reported if nothing
    // usable arrived.
    let read_result = stream.read_to_string(&mut response);
    if !(response.starts_with("HTTP/1.1 202") || response.starts_with("HTTP/1.0 202")) {
        let reason = match read_result {
            // No bytes at all plus an I/O error means the broker never
            // answered; report the real cause instead of a fake rejection.
            Err(e) if response.is_empty() => format!("failed to read broker response: {e}"),
            _ => format!(
                "broker rejected watch registration: {}",
                response.lines().next().unwrap_or("<empty>")
            ),
        };
        return Err(PawError::SessionError(reason));
    }
    Ok(())
}
/// One decoded element of the `entries` array in the broker's `/log`
/// response, as produced by `fetch_log_entries_over_http`.
#[derive(Debug, Clone)]
pub struct BrokerLogEntry {
/// Value of the entry's `seq` JSON field. `fetch_log_entries_over_http`
/// stores 0 when the field is absent or is not an unsigned integer.
pub seq: u64,
/// Value of the entry's `timestamp_unix_secs` JSON field, with the same
/// fallback to 0. Presumably seconds since the Unix epoch, going by the
/// name — confirm against the broker.
pub timestamp_unix_secs: u64,
/// The entry's `message` JSON field deserialized as a [`BrokerMessage`].
pub message: BrokerMessage,
}
/// Fetches the broker's message log via `GET /log` and decodes its entries.
///
/// `broker_url` may be a bare `host:port` or carry an `http://` prefix, which
/// is stripped. The remainder is parsed as a literal socket address or, when
/// that fails, resolved via `ToSocketAddrs` (first result wins). The request
/// is sent over a plain [`TcpStream`] with `Connection: close`, using a 500 ms
/// connect timeout and 2 s read/write timeouts (set on a best-effort basis).
///
/// The response must start with `HTTP/1.1 200` or `HTTP/1.0 200`. Everything
/// after the first blank line (`\r\n\r\n`) is parsed as JSON and must contain
/// an `entries` array. Elements whose `message` field is missing or does not
/// deserialize as a [`BrokerMessage`] are skipped; `seq` and
/// `timestamp_unix_secs` fall back to 0 when absent or not unsigned integers.
///
/// # Errors
///
/// Returns [`PawError::SessionError`] when the address is invalid or resolves
/// to nothing, on connect/write/read failure, on a non-200 status, when the
/// header/body separator is missing, when the body is not valid JSON, or when
/// the `entries` array is absent.
pub fn fetch_log_entries_over_http(broker_url: &str) -> Result<Vec<BrokerLogEntry>, PawError> {
    let addr = broker_url.strip_prefix("http://").unwrap_or(broker_url);

    // Literal `ip:port` first; fall back to name resolution otherwise.
    let socket_addr = match addr.parse::<std::net::SocketAddr>() {
        Ok(literal) => literal,
        Err(_) => {
            use std::net::ToSocketAddrs;
            let mut resolved = addr.to_socket_addrs().map_err(|e| {
                PawError::SessionError(format!("invalid broker address {addr}: {e}"))
            })?;
            match resolved.next() {
                Some(first) => first,
                None => {
                    return Err(PawError::SessionError(format!(
                        "broker address {addr} resolved to no addrs"
                    )));
                }
            }
        }
    };

    let mut stream = TcpStream::connect_timeout(&socket_addr, Duration::from_millis(500))
        .map_err(|e| PawError::SessionError(format!("failed to connect to broker: {e}")))?;
    // Timeouts are best-effort; a failure to set them is ignored.
    let _ = stream.set_read_timeout(Some(Duration::from_secs(2)));
    let _ = stream.set_write_timeout(Some(Duration::from_secs(2)));

    let request = format!(
        "GET /log HTTP/1.1\r\nHost: {addr}\r\nAccept: application/json\r\nConnection: close\r\n\r\n",
    );
    if let Err(e) = stream.write_all(request.as_bytes()) {
        return Err(PawError::SessionError(format!(
            "failed to write broker request: {e}"
        )));
    }

    let mut response = String::new();
    if let Err(e) = stream.read_to_string(&mut response) {
        return Err(PawError::SessionError(format!(
            "failed to read broker response: {e}"
        )));
    }

    let status_ok = response.starts_with("HTTP/1.1 200") || response.starts_with("HTTP/1.0 200");
    if !status_ok {
        let status_line = response.lines().next().unwrap_or("<empty>");
        return Err(PawError::SessionError(format!(
            "broker /log returned non-200: {status_line}"
        )));
    }

    // The body is whatever follows the first blank line after the headers.
    let Some((_, body)) = response.split_once("\r\n\r\n") else {
        return Err(PawError::SessionError(
            "broker /log response missing body separator".to_string(),
        ));
    };

    let parsed: serde_json::Value = match serde_json::from_str(body) {
        Ok(value) => value,
        Err(e) => {
            return Err(PawError::SessionError(format!(
                "broker /log returned invalid JSON: {e}"
            )));
        }
    };

    let Some(entries) = parsed.get("entries").and_then(serde_json::Value::as_array) else {
        return Err(PawError::SessionError(
            "broker /log response missing entries array".to_string(),
        ));
    };

    let mut out = Vec::with_capacity(entries.len());
    out.extend(entries.iter().filter_map(|entry| {
        // Entries without a decodable `message` are skipped, not reported.
        let message = entry
            .get("message")
            .and_then(|raw| serde_json::from_value::<BrokerMessage>(raw.clone()).ok())?;
        let read_u64 = |key: &str| {
            entry
                .get(key)
                .and_then(serde_json::Value::as_u64)
                .unwrap_or(0)
        };
        Some(BrokerLogEntry {
            seq: read_u64("seq"),
            timestamp_unix_secs: read_u64("timestamp_unix_secs"),
            message,
        })
    }));
    Ok(out)
}
/// Fetches the broker log and returns only the messages, in the order the
/// entries were returned by [`fetch_log_entries_over_http`].
///
/// # Errors
///
/// Propagates any [`PawError`] returned by [`fetch_log_entries_over_http`].
pub fn fetch_log_over_http(broker_url: &str) -> Result<Vec<BrokerMessage>, PawError> {
    let entries = fetch_log_entries_over_http(broker_url)?;
    let mut messages = Vec::with_capacity(entries.len());
    // Drop the per-entry metadata and keep just the message.
    messages.extend(entries.into_iter().map(|entry| entry.message));
    Ok(messages)
}
// Unit tests for the status-message builder, plus one integration test that
// starts an in-process broker and registers a watch target over HTTP.
#[cfg(test)]
mod tests {
use super::*;
/// A `Some` cli argument must end up in the payload's `cli` field, and the
/// other arguments must be copied through unchanged.
#[test]
fn build_status_message_with_explicit_cli_populates_cli_field() {
let msg = build_status_message(
"supervisor",
"working",
Some("Supervisor booting".to_string()),
Some("claude"),
);
let BrokerMessage::Status { agent_id, payload } = msg else {
panic!("expected BrokerMessage::Status");
};
assert_eq!(agent_id, "supervisor");
assert_eq!(payload.status, "working");
assert_eq!(payload.message.as_deref(), Some("Supervisor booting"));
assert_eq!(payload.cli.as_deref(), Some("claude"));
assert_eq!(payload.phase, None);
}
/// Starts a broker with no initial watch targets, registers a worktree via
/// `register_watch_target_http`, writes a file into it, and expects the
/// agent id to appear in the broker state within the polling window.
#[test]
fn register_watch_target_http_surfaces_hot_added_agent() {
use std::process::Command;
use crate::broker::{BrokerState, ProbeResult, probe_broker, start_broker};
use crate::config::BrokerConfig;
// Create a throwaway git repository with a single empty root commit.
let tmp = tempfile::tempdir().unwrap();
// Runs `git <args>` inside the temp dir. Only a failure to spawn git
// panics; a non-zero exit status from git is not checked here.
let run = |args: &[&str]| {
Command::new("git")
.args(args)
.current_dir(tmp.path())
.output()
.expect("git command failed");
};
run(&["init", "-q", "-b", "main"]);
run(&["config", "user.email", "test@example.com"]);
run(&["config", "user.name", "test"]);
run(&["commit", "--allow-empty", "-m", "root", "-q"]);
let config = BrokerConfig {
enabled: true,
// Port in 20300..=20399 derived from the process id — presumably to
// reduce collisions between concurrently running test processes.
#[allow(clippy::cast_possible_truncation)]
port: 20_300 + (std::process::id() as u16 % 100),
bind: "127.0.0.1".to_string(),
..Default::default()
};
// Return early (the test passes vacuously) unless the probe reports that
// nothing is listening on the chosen address.
if probe_broker(&config.url()) != ProbeResult::NoListener {
return;
}
let state = BrokerState::new(None);
// Likewise return early when the broker cannot be started.
let Ok(handle) = start_broker(&config, state, Vec::new()) else {
return;
};
register_watch_target_http(&config.url(), "feat-hot", tmp.path(), "claude")
.expect("broker must accept the live watch registration");
// Touch a file in the worktree only after the registration was accepted.
std::fs::write(tmp.path().join("hot.rs"), "fn hot() {}").unwrap();
// Poll the broker state up to 40 times at 250 ms intervals (about 10 s).
let mut found = false;
for _ in 0..40 {
std::thread::sleep(Duration::from_millis(250));
if handle.state.read().agents.contains_key("feat-hot") {
found = true;
break;
}
}
assert!(
found,
"the hot-added worktree must surface feat-hot in /status via the watcher"
);
drop(handle);
}
/// With `cli` set to `None`, the serialized JSON must contain neither a
/// `"cli"` nor a `"phase"` key.
#[test]
fn build_status_message_with_none_cli_omits_cli_key_from_json() {
let msg = build_status_message("feat-x", "working", None, None);
let BrokerMessage::Status { ref payload, .. } = msg else {
panic!("expected BrokerMessage::Status");
};
assert_eq!(payload.cli, None);
assert_eq!(payload.phase, None);
let json = serde_json::to_string(&msg).unwrap();
assert!(
!json.contains("\"cli\""),
"cli key must be omitted from JSON when None; got {json}"
);
assert!(
!json.contains("\"phase\""),
"phase key must be omitted from JSON when None; got {json}"
);
}
}