use serde::{Deserialize, Serialize};
use std::path::PathBuf;
/// Protocol version written into the `version` field of the requests built by
/// the client helpers in this file.
pub const PROTOCOL_VERSION: u32 = 1;
/// Serializable wrapper around a [`LifecycleSourceKind`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LifecycleSource {
/// Which kind of source this value describes.
pub kind: LifecycleSourceKind,
}
impl LifecycleSource {
pub fn new(kind: LifecycleSourceKind) -> Self {
Self { kind }
}
}
/// Kinds of lifecycle source.
///
/// Variants are (de)serialized in `snake_case`, i.e. `claude_hook`, `atm_mcp`,
/// `agent_hook` and `unknown`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LifecycleSourceKind {
ClaudeHook,
AtmMcp,
AgentHook,
Unknown,
}
/// Request envelope sent to the daemon.
///
/// The Unix client in this file serializes it to a single JSON line.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SocketRequest {
/// Protocol version; the helpers in this file set it to [`PROTOCOL_VERSION`].
pub version: u32,
/// Caller-chosen identifier for this request.
pub request_id: String,
/// Command name, e.g. `"launch"`, `"agent-state"` or `"list-agents"`.
pub command: String,
/// Command-specific JSON payload.
pub payload: serde_json::Value,
}
/// Response envelope received from the daemon.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SocketResponse {
/// Protocol version reported in the response.
pub version: u32,
/// Request identifier carried by the response.
pub request_id: String,
/// Status string; `"ok"` is what [`SocketResponse::is_ok`] treats as success.
pub status: String,
/// Optional command-specific payload; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub payload: Option<serde_json::Value>,
/// Optional error details; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<SocketError>,
}
impl SocketResponse {
    /// Returns `true` exactly when `status` is the string `"ok"`.
    pub fn is_ok(&self) -> bool {
        matches!(self.status.as_str(), "ok")
    }
}
/// Error details attached to a [`SocketResponse`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SocketError {
/// Error code string (callers in this file format it as `code: message`).
pub code: String,
/// Human-readable error message.
pub message: String,
}
/// Payload decoded by [`query_agent_state`] from an `agent-state` response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentStateInfo {
/// State name as reported by the daemon.
pub state: String,
/// Optional transition marker; NOTE(review): looks like a timestamp string — confirm format with the daemon.
pub last_transition: Option<String>,
}
/// One entry of the list decoded from a `list-agents` response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentSummary {
/// Agent name.
pub agent: String,
/// State name as reported by the daemon.
pub state: String,
}
/// Per-member state record decoded by [`query_team_member_states`].
///
/// Only `agent` and `state` are required when deserializing; every other field
/// falls back to its default when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CanonicalMemberState {
/// Agent name.
pub agent: String,
/// State name; the label helpers in this file recognize `"active"`, `"idle"`,
/// `"offline"` and `"dead"`.
pub state: String,
/// Activity name; the label helper recognizes `"busy"` and `"idle"`. Defaults to `""`.
#[serde(default)]
pub activity: String,
/// Optional session identifier; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
/// Optional process id; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub process_id: Option<u32>,
/// Free-form reason string. Defaults to `""`.
#[serde(default)]
pub reason: String,
/// Free-form source string. Defaults to `""`.
#[serde(default)]
pub source: String,
}
/// Maps a member's `state` string to a display label.
///
/// `"active"` → `"Active"`, `"idle"` → `"Idle"`, `"offline"`/`"dead"` → `"Dead"`;
/// anything else, including `None`, yields `"Unknown"`.
pub fn canonical_status_label(state: Option<&CanonicalMemberState>) -> &'static str {
    let Some(member) = state else {
        return "Unknown";
    };
    match member.state.as_str() {
        "active" => "Active",
        "idle" => "Idle",
        "offline" | "dead" => "Dead",
        _ => "Unknown",
    }
}
/// Maps a member's `activity` string to a display label.
///
/// `"busy"` → `"Busy"`, `"idle"` → `"Idle"`; anything else, including `None`,
/// yields `"Unknown"`.
pub fn canonical_activity_label(state: Option<&CanonicalMemberState>) -> &'static str {
    let Some(member) = state else {
        return "Unknown";
    };
    match member.activity.as_str() {
        "busy" => "Busy",
        "idle" => "Idle",
        _ => "Unknown",
    }
}
/// Derives a liveness flag from a member's `state` string.
///
/// Returns `Some(true)` for `"active"`/`"idle"`, `Some(false)` for
/// `"offline"`/`"dead"`, and `None` for any other state or when `state` is `None`.
pub fn canonical_liveness_bool(state: Option<&CanonicalMemberState>) -> Option<bool> {
    let member = state?;
    match member.state.as_str() {
        "active" | "idle" => Some(true),
        "offline" | "dead" => Some(false),
        _ => None,
    }
}
/// Launch parameters; [`launch_agent`] serializes this as the payload of a
/// `launch` request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LaunchConfig {
/// Agent name.
pub agent: String,
/// Team name.
pub team: String,
/// Command string. NOTE(review): presumably the command the daemon runs for the agent — confirm.
pub command: String,
/// Optional prompt text.
pub prompt: Option<String>,
/// Timeout in seconds. NOTE(review): what is being timed is decided by the daemon — confirm.
pub timeout_secs: u32,
/// Environment variables as name → value pairs.
pub env_vars: std::collections::HashMap<String, String>,
/// Optional runtime name.
#[serde(default)]
pub runtime: Option<String>,
/// Optional session identifier to resume.
#[serde(default)]
pub resume_session_id: Option<String>,
}
/// Payload decoded by [`launch_agent`] from a successful `launch` response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LaunchResult {
/// Agent name.
pub agent: String,
/// Pane identifier reported by the daemon.
pub pane_id: String,
/// State name reported by the daemon.
pub state: String,
/// Optional warning message.
pub warning: Option<String>,
}
/// Sends a `launch` request built from `config` and decodes the result.
///
/// # Returns
/// * `Ok(Some(result))` when the daemon answers `ok` with a decodable payload.
/// * `Ok(None)` when [`query_daemon`] yields no response, or the response has
///   no payload.
///
/// # Errors
/// Fails when `config` cannot be serialized, when [`query_daemon`] fails, when
/// the daemon answers with a non-`ok` status, or when the payload is not a
/// valid [`LaunchResult`].
pub fn launch_agent(config: &LaunchConfig) -> anyhow::Result<Option<LaunchResult>> {
    let payload = match serde_json::to_value(config) {
        Err(e) => anyhow::bail!("Failed to serialize LaunchConfig: {e}"),
        Ok(value) => value,
    };
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("launch"),
        payload,
    };

    let Some(response) = query_daemon(&request)? else {
        return Ok(None);
    };

    if !response.is_ok() {
        let msg = match response.error {
            Some(e) => format!("{}: {}", e.code, e.message),
            None => "unknown daemon error".to_string(),
        };
        anyhow::bail!("Daemon returned error for launch command: {msg}");
    }

    let Some(body) = response.payload else {
        return Ok(None);
    };
    match serde_json::from_value::<LaunchResult>(body) {
        Err(e) => anyhow::bail!("Failed to parse LaunchResult from daemon response: {e}"),
        Ok(launched) => Ok(Some(launched)),
    }
}
/// Returns `<home>/.claude/daemon/atm-daemon.sock`, where `<home>` comes from
/// `crate::home::get_home_dir`.
///
/// # Errors
/// Propagates the error from `get_home_dir`.
pub fn daemon_socket_path() -> anyhow::Result<PathBuf> {
    let socket = crate::home::get_home_dir()?.join(".claude/daemon/atm-daemon.sock");
    Ok(socket)
}
/// Returns `<home>/.claude/daemon/atm-daemon.pid`, where `<home>` comes from
/// `crate::home::get_home_dir`.
///
/// # Errors
/// Propagates the error from `get_home_dir`.
pub fn daemon_pid_path() -> anyhow::Result<PathBuf> {
    let pid_file = crate::home::get_home_dir()?.join(".claude/daemon/atm-daemon.pid");
    Ok(pid_file)
}
pub fn daemon_is_running() -> bool {
#[cfg(unix)]
{
let pid_path = match daemon_pid_path() {
Ok(p) => p,
Err(_) => return false,
};
if let Ok(content) = std::fs::read_to_string(&pid_path) {
if let Ok(pid) = content.trim().parse::<i32>() {
return pid_alive(pid);
}
}
false
}
#[cfg(not(unix))]
{
false
}
}
/// Makes sure the daemon is running, starting it if necessary.
///
/// On Unix this delegates to `ensure_daemon_running_unix`; on other platforms
/// it does nothing and returns `Ok(())`.
pub fn ensure_daemon_running() -> anyhow::Result<()> {
    #[cfg(unix)]
    let outcome = ensure_daemon_running_unix();
    #[cfg(not(unix))]
    let outcome = Ok(());
    outcome
}
/// Sends `request` to the daemon and returns its response, if any.
///
/// On Unix this delegates to `query_daemon_unix`; on other platforms it always
/// returns `Ok(None)`.
pub fn query_daemon(request: &SocketRequest) -> anyhow::Result<Option<SocketResponse>> {
    #[cfg(unix)]
    let outcome = query_daemon_unix(request);
    #[cfg(not(unix))]
    let outcome = Ok(None);
    outcome
}
/// Asks the daemon for the state of `agent` in `team` (`agent-state` command).
///
/// Returns `Ok(None)` when there is no response, the status is not `ok`, the
/// payload is missing, or the payload does not decode as [`AgentStateInfo`].
///
/// # Errors
/// Propagates errors from [`query_daemon`].
pub fn query_agent_state(agent: &str, team: &str) -> anyhow::Result<Option<AgentStateInfo>> {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("agent-state"),
        payload: serde_json::json!({ "agent": agent, "team": team }),
    };

    let Some(response) = query_daemon(&request)? else {
        return Ok(None);
    };
    if !response.is_ok() {
        return Ok(None);
    }

    // A missing or undecodable payload is reported as "no information".
    let info = response
        .payload
        .and_then(|body| serde_json::from_value::<AgentStateInfo>(body).ok());
    Ok(info)
}
/// Sends a `subscribe` request registering `subscriber` for `events` of
/// `agent` in `team`, and returns the daemon's raw response.
///
/// The result is whatever [`query_daemon`] returns for the request.
pub fn subscribe_to_agent(
    subscriber: &str,
    agent: &str,
    team: &str,
    events: &[String],
) -> anyhow::Result<Option<SocketResponse>> {
    let version = PROTOCOL_VERSION;
    let request_id = new_request_id();
    let payload = serde_json::json!({
        "subscriber": subscriber,
        "agent": agent,
        "team": team,
        "events": events,
    });
    query_daemon(&SocketRequest {
        version,
        request_id,
        command: String::from("subscribe"),
        payload,
    })
}
/// Sends an `unsubscribe` request for `subscriber` on `agent` in `team`, and
/// returns the daemon's raw response.
///
/// The result is whatever [`query_daemon`] returns for the request.
pub fn unsubscribe_from_agent(
    subscriber: &str,
    agent: &str,
    team: &str,
) -> anyhow::Result<Option<SocketResponse>> {
    let version = PROTOCOL_VERSION;
    let request_id = new_request_id();
    let payload = serde_json::json!({
        "subscriber": subscriber,
        "agent": agent,
        "team": team,
    });
    query_daemon(&SocketRequest {
        version,
        request_id,
        command: String::from("unsubscribe"),
        payload,
    })
}
/// Requests the agent list from the daemon (`list-agents` with an empty
/// object payload).
///
/// Returns `Ok(None)` when there is no response, the status is not `ok`, the
/// payload is missing, or the payload does not decode as a list of
/// [`AgentSummary`].
///
/// # Errors
/// Propagates errors from [`query_daemon`].
pub fn query_list_agents() -> anyhow::Result<Option<Vec<AgentSummary>>> {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("list-agents"),
        payload: serde_json::Value::Object(Default::default()),
    };

    let agents = query_daemon(&request)?
        .filter(|response| response.is_ok())
        .and_then(|response| response.payload)
        .and_then(|body| serde_json::from_value::<Vec<AgentSummary>>(body).ok());
    Ok(agents)
}
/// Requests the agent list for `team` (`list-agents` with a `team` payload).
///
/// Returns `Ok(None)` when there is no response, the status is not `ok`, the
/// payload is missing, or the payload does not decode as a list of
/// [`AgentSummary`].
///
/// # Errors
/// Propagates errors from [`query_daemon`].
pub fn query_list_agents_for_team(team: &str) -> anyhow::Result<Option<Vec<AgentSummary>>> {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("list-agents"),
        payload: serde_json::json!({ "team": team }),
    };

    let Some(response) = query_daemon(&request)? else {
        return Ok(None);
    };
    if !response.is_ok() {
        return Ok(None);
    }
    let Some(body) = response.payload else {
        return Ok(None);
    };
    Ok(serde_json::from_value::<Vec<AgentSummary>>(body).ok())
}
pub fn query_team_member_states(team: &str) -> anyhow::Result<Option<Vec<CanonicalMemberState>>> {
let request = SocketRequest {
version: PROTOCOL_VERSION,
request_id: new_request_id(),
command: "list-agents".to_string(),
payload: serde_json::json!({ "team": team }),
};
let response = match query_daemon(&request)? {
Some(r) => r,
None => return Ok(None),
};
if !response.is_ok() {
return Ok(None);
}
let payload = match response.payload {
Some(p) => p,
None => return Ok(None),
};
decode_canonical_member_states_payload(payload).map(Some)
}
/// Decodes `payload` as a list of [`CanonicalMemberState`].
///
/// # Errors
/// Returns an error describing the invalid payload when decoding fails.
fn decode_canonical_member_states_payload(
    payload: serde_json::Value,
) -> anyhow::Result<Vec<CanonicalMemberState>> {
    match serde_json::from_value::<Vec<CanonicalMemberState>>(payload) {
        Ok(states) => Ok(states),
        Err(err) => Err(anyhow::anyhow!(
            "invalid canonical member-state payload from daemon list-agents(team): {err}"
        )),
    }
}
/// Payload decoded by [`query_agent_pane`] from an `agent-pane` response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentPaneInfo {
/// Pane identifier reported by the daemon.
pub pane_id: String,
/// Log path reported by the daemon.
pub log_path: String,
}
/// Asks the daemon for pane information about `agent` (`agent-pane` command).
///
/// Returns `Ok(None)` when there is no response, the status is not `ok`, the
/// payload is missing, or the payload does not decode as [`AgentPaneInfo`].
///
/// # Errors
/// Propagates errors from [`query_daemon`].
pub fn query_agent_pane(agent: &str) -> anyhow::Result<Option<AgentPaneInfo>> {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("agent-pane"),
        payload: serde_json::json!({ "agent": agent }),
    };

    let pane = query_daemon(&request)?
        .filter(|response| response.is_ok())
        .and_then(|response| response.payload)
        .and_then(|body| serde_json::from_value::<AgentPaneInfo>(body).ok());
    Ok(pane)
}
/// Payload decoded by [`query_session`] and [`query_session_for_team`].
///
/// The optional fields default to `None` when absent and are omitted from
/// serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionQueryResult {
/// Session identifier.
pub session_id: String,
/// Process id associated with the session.
pub process_id: u32,
/// Liveness flag reported by the daemon.
pub alive: bool,
/// Optional runtime name.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub runtime: Option<String>,
/// Optional runtime-specific session identifier.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub runtime_session_id: Option<String>,
/// Optional pane identifier.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pane_id: Option<String>,
/// Optional runtime home. NOTE(review): presumably a directory path — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub runtime_home: Option<String>,
}
/// Looks up the session registered under `name` (`session-query` command).
///
/// Returns `Ok(None)` when there is no response, the status is not `ok`, the
/// payload is missing, or the payload does not decode as [`SessionQueryResult`].
///
/// # Errors
/// Propagates errors from [`query_daemon`].
pub fn query_session(name: &str) -> anyhow::Result<Option<SessionQueryResult>> {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("session-query"),
        payload: serde_json::json!({ "name": name }),
    };

    let Some(response) = query_daemon(&request)? else {
        return Ok(None);
    };
    if !response.is_ok() {
        return Ok(None);
    }
    let session = response
        .payload
        .and_then(|body| serde_json::from_value::<SessionQueryResult>(body).ok());
    Ok(session)
}
/// Looks up the session registered under `name` within `team`
/// (`session-query-team` command).
///
/// Returns `Ok(None)` when there is no response, the status is not `ok`, the
/// payload is missing, or the payload does not decode as [`SessionQueryResult`].
///
/// # Errors
/// Propagates errors from [`query_daemon`].
pub fn query_session_for_team(
    team: &str,
    name: &str,
) -> anyhow::Result<Option<SessionQueryResult>> {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("session-query-team"),
        payload: serde_json::json!({ "team": team, "name": name }),
    };

    let session = query_daemon(&request)?
        .filter(|response| response.is_ok())
        .and_then(|response| response.payload)
        .and_then(|body| serde_json::from_value::<SessionQueryResult>(body).ok());
    Ok(session)
}
/// Asks the daemon for the stream state of `agent` (`agent-stream-state`
/// command).
///
/// Returns `Ok(None)` when there is no response, the status is not `ok`, the
/// payload is missing, or the payload does not decode as `AgentStreamState`.
///
/// # Errors
/// Propagates errors from [`query_daemon`].
pub fn query_agent_stream_state(
    agent: &str,
) -> anyhow::Result<Option<crate::daemon_stream::AgentStreamState>> {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("agent-stream-state"),
        payload: serde_json::json!({ "agent": agent }),
    };

    let Some(response) = query_daemon(&request)? else {
        return Ok(None);
    };
    if !response.is_ok() {
        return Ok(None);
    }
    let Some(body) = response.payload else {
        return Ok(None);
    };
    Ok(serde_json::from_value::<crate::daemon_stream::AgentStreamState>(body).ok())
}
/// Handle to a running stream subscription.
///
/// Events arrive on `rx`. Dropping the handle sets the shared stop flag, which
/// the reader thread spawned by `subscribe_stream_events_unix` checks at the
/// top of each loop iteration.
pub struct StreamSubscription {
    /// Receiving end of the channel the reader thread sends events on.
    pub rx: std::sync::mpsc::Receiver<crate::daemon_stream::DaemonStreamEvent>,
    /// Stop flag shared with the reader thread.
    stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
}

impl Drop for StreamSubscription {
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        // Signal the reader thread to exit.
        self.stop.store(true, Ordering::Relaxed);
    }
}
/// Opens a stream-event subscription with the daemon.
///
/// On Unix this delegates to `subscribe_stream_events_unix`; on other
/// platforms it always returns `Ok(None)`.
pub fn subscribe_stream_events() -> anyhow::Result<Option<StreamSubscription>> {
    #[cfg(unix)]
    let outcome = subscribe_stream_events_unix();
    #[cfg(not(unix))]
    let outcome = Ok(None);
    outcome
}
pub fn send_control(
request: &crate::control::ControlRequest,
) -> anyhow::Result<crate::control::ControlAck> {
let payload = serde_json::to_value(request)
.map_err(|e| anyhow::anyhow!("Failed to serialize ControlRequest: {e}"))?;
let socket_request = SocketRequest {
version: PROTOCOL_VERSION,
request_id: format!(
"sock-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos()
),
command: "control".to_string(),
payload,
};
let response = match query_daemon(&socket_request)? {
Some(r) => r,
None => anyhow::bail!("Daemon not reachable (socket not found or connection refused)"),
};
if !response.is_ok() {
let msg = response
.error
.map(|e| format!("{}: {}", e.code, e.message))
.unwrap_or_else(|| "unknown daemon error".to_string());
anyhow::bail!("Daemon returned error for control command: {msg}");
}
let payload = response
.payload
.ok_or_else(|| anyhow::anyhow!("Daemon returned ok status but no payload"))?;
serde_json::from_value::<crate::control::ControlAck>(payload)
.map_err(|e| anyhow::anyhow!("Failed to parse ControlAck from daemon response: {e}"))
}
/// Generates a request identifier that is unique within this process.
///
/// The id has the form `req-<pid>-<nanos>-<seq>`, where `<nanos>` is the
/// number of nanoseconds since the Unix epoch (0 if the clock reads before the
/// epoch) and `<seq>` is a process-wide counter.
///
/// The counter is what guarantees uniqueness: a timestamp alone can repeat
/// when two threads call this at the same instant or when the system clock has
/// coarse resolution.
fn new_request_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let pid = std::process::id();
    // Only atomicity of the increment matters; no other memory is synchronized.
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    format!("req-{pid}-{nanos}-{seq}")
}
/// Unix implementation of [`query_daemon`]: one request line out, one response
/// line in, over the daemon's Unix socket.
///
/// # Returns
/// * `Ok(Some(response))` when a response line was read and parsed.
/// * `Ok(None)` when the socket cannot be connected and autostart is disabled,
///   when the response is empty or unreadable, or when it is not valid JSON for
///   [`SocketResponse`].
///
/// # Errors
/// Fails when the socket path cannot be resolved, when autostart is enabled but
/// the daemon cannot be started or its socket stays unavailable for 5 seconds,
/// when the request cannot be serialized, or when writing the request fails.
#[cfg(unix)]
fn query_daemon_unix(request: &SocketRequest) -> anyhow::Result<Option<SocketResponse>> {
    use std::io::{BufRead, BufReader, Write};
    use std::os::unix::net::UnixStream;
    use std::time::{Duration, Instant};

    let socket_path = daemon_socket_path()?;
    let stream = match UnixStream::connect(&socket_path) {
        Ok(conn) => conn,
        Err(_) if !daemon_autostart_enabled() => return Ok(None),
        Err(_) => {
            ensure_daemon_running_unix()?;
            // The daemon may report "running" slightly before its socket
            // accepts connections, so retry for up to 5 seconds.
            let deadline = Instant::now() + Duration::from_secs(5);
            loop {
                match UnixStream::connect(&socket_path) {
                    Ok(conn) => break conn,
                    Err(e) => {
                        if Instant::now() >= deadline {
                            anyhow::bail!(
                                "daemon auto-start attempted but socket remained unavailable at {}: {e}",
                                socket_path.display()
                            )
                        }
                        std::thread::sleep(Duration::from_millis(100));
                    }
                }
            }
        }
    };

    // Timeouts are best-effort; failure to set them is ignored.
    let io_timeout = Duration::from_millis(500);
    stream.set_read_timeout(Some(io_timeout)).ok();
    stream.set_write_timeout(Some(io_timeout)).ok();

    let mut wire = serde_json::to_string(request)?;
    wire.push('\n');
    let mut outbound = &stream;
    outbound.write_all(wire.as_bytes())?;
    outbound.flush()?;

    let mut response_line = String::new();
    let read_outcome = BufReader::new(&stream).read_line(&mut response_line);
    if !matches!(read_outcome, Ok(n) if n > 0) {
        // EOF or a read error (including timeout) means "no response".
        return Ok(None);
    }

    Ok(serde_json::from_str::<SocketResponse>(response_line.trim()).ok())
}
/// Reports whether daemon autostart is enabled via `ATM_DAEMON_AUTOSTART`.
///
/// The values `1`, `true` and `yes` enable autostart; `true` and `yes` are
/// matched ASCII case-insensitively, so `True` and `Yes` work as well as the
/// all-lowercase and all-uppercase spellings. An unset, non-UTF-8 or any other
/// value disables it.
#[cfg(unix)]
fn daemon_autostart_enabled() -> bool {
    const TRUTHY: [&str; 3] = ["1", "true", "yes"];
    std::env::var("ATM_DAEMON_AUTOSTART")
        .is_ok_and(|raw| TRUTHY.iter().any(|accepted| raw.eq_ignore_ascii_case(accepted)))
}
/// Chooses the daemon executable to spawn.
///
/// Resolution order:
/// 1. a non-empty `ATM_DAEMON_BIN` environment variable;
/// 2. an existing `atm-daemon` file next to the current executable;
/// 3. the bare name `atm-daemon`.
#[cfg(unix)]
fn resolve_daemon_binary() -> std::ffi::OsString {
    match std::env::var_os("ATM_DAEMON_BIN") {
        Some(override_bin) if !override_bin.is_empty() => return override_bin,
        _ => {}
    }

    let name = std::ffi::OsString::from("atm-daemon");
    if let Ok(current_exe) = std::env::current_exe() {
        if let Some(dir) = current_exe.parent() {
            let sibling = dir.join(&name);
            if sibling.exists() {
                return sibling.into_os_string();
            }
        }
    }
    name
}
/// Unix implementation of [`ensure_daemon_running`].
///
/// Steps:
/// 1. Return immediately if the daemon is already running.
/// 2. Acquire the startup lock at `<home>/.config/atm/daemon-start.lock`. On a
///    lock timeout, poll for up to ~1 second for the lock holder to bring the
///    daemon up, then try the lock once more.
/// 3. Re-check under the lock; if the daemon is now up, return.
/// 4. Remove stale PID/socket files, spawn the daemon binary with null stdio,
///    and poll for up to 5 seconds until it is running or its socket accepts
///    connections.
///
/// Stale-file cleanup is done only while holding the startup lock. Doing it
/// before the lock can delete a PID file or socket that a concurrent starter's
/// freshly spawned daemon is in the middle of creating.
///
/// The spawned child is intentionally neither killed nor waited on when this
/// function returns.
///
/// # Errors
/// Fails when the home directory cannot be resolved, the lock directory cannot
/// be created, the lock cannot be acquired, the binary cannot be spawned, the
/// child exits during startup, or startup does not complete within 5 seconds.
#[cfg(unix)]
fn ensure_daemon_running_unix() -> anyhow::Result<()> {
    use crate::event_log::{EventFields, emit_event_best_effort};
    use crate::io::InboxError;
    use std::io::ErrorKind;
    use std::process::{Command, Stdio};
    use std::time::{Duration, Instant};

    if daemon_is_running() {
        return Ok(());
    }

    let home = crate::home::get_home_dir()?;
    let startup_lock_path = home.join(".config/atm/daemon-start.lock");
    if let Some(parent) = startup_lock_path.parent() {
        std::fs::create_dir_all(parent)?;
    }

    let _startup_lock = match crate::io::lock::acquire_lock(&startup_lock_path, 3) {
        Ok(lock) => Some(lock),
        Err(InboxError::LockTimeout { .. }) => {
            // Someone else is starting the daemon; give them a moment to finish.
            for _ in 0..10 {
                if daemon_is_running() || daemon_socket_connectable(&home) {
                    return Ok(());
                }
                std::thread::sleep(Duration::from_millis(100));
            }
            match crate::io::lock::acquire_lock(&startup_lock_path, 10) {
                Ok(lock) => Some(lock),
                Err(e) => anyhow::bail!(
                    "timed out waiting for daemon startup lock holder to bring daemon online: {} ({e})",
                    startup_lock_path.display()
                ),
            }
        }
        Err(e) => anyhow::bail!(
            "failed to acquire daemon startup lock {}: {e}",
            startup_lock_path.display()
        ),
    };

    // Re-check now that we hold the lock: a previous holder may have finished.
    if daemon_is_running() || daemon_socket_connectable(&home) {
        return Ok(());
    }

    // We hold the lock and nothing is running, so leftover runtime files are
    // safe to remove.
    cleanup_stale_daemon_runtime_files(&home);

    let daemon_bin = resolve_daemon_binary();
    let daemon_bin_display = std::path::Path::new(&daemon_bin).display().to_string();

    emit_event_best_effort(EventFields {
        level: "info",
        source: "atm",
        action: "daemon_autostart_attempt",
        result: Some("attempt".to_string()),
        target: Some(daemon_bin_display.clone()),
        ..Default::default()
    });

    let mut child = Command::new(&daemon_bin)
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .spawn()
        .map_err(|e| {
            if e.kind() == ErrorKind::NotFound {
                anyhow::anyhow!(
                    "failed to auto-start daemon: binary '{daemon_bin_display}' not found in PATH (or ATM_DAEMON_BIN override)"
                )
            } else {
                anyhow::anyhow!("failed to auto-start daemon via '{daemon_bin_display}': {e}")
            }
        })?;

    let deadline = Instant::now() + Duration::from_secs(5);
    while Instant::now() < deadline {
        if daemon_is_running() || daemon_socket_connectable(&home) {
            emit_event_best_effort(EventFields {
                level: "info",
                source: "atm",
                action: "daemon_autostart_success",
                result: Some("ok".to_string()),
                target: Some(daemon_bin_display.clone()),
                ..Default::default()
            });
            return Ok(());
        }
        // A child that already exited will never become ready; fail fast.
        if let Some(status) = child.try_wait()? {
            anyhow::bail!("daemon process exited during startup with status {status}");
        }
        std::thread::sleep(Duration::from_millis(100));
    }

    let socket_path = home.join(".claude/daemon/atm-daemon.sock");
    let pid_path = home.join(".claude/daemon/atm-daemon.pid");
    let timeout_error = format!(
        "daemon startup timed out after 5s; pid_file_exists={}, socket_exists={}, pid_path={}, socket_path={}",
        pid_path.exists(),
        socket_path.exists(),
        pid_path.display(),
        socket_path.display()
    );
    emit_event_best_effort(EventFields {
        level: "warn",
        source: "atm",
        action: "daemon_autostart_timeout",
        result: Some("timeout".to_string()),
        target: Some(daemon_bin_display),
        error: Some(timeout_error.clone()),
        ..Default::default()
    });
    anyhow::bail!("{timeout_error}")
}
/// Returns `true` when a Unix-socket connection to
/// `<home>/.claude/daemon/atm-daemon.sock` succeeds.
///
/// The probe connection is dropped immediately.
#[cfg(unix)]
fn daemon_socket_connectable(home: &std::path::Path) -> bool {
    let candidate = home.join(".claude/daemon/atm-daemon.sock");
    match std::os::unix::net::UnixStream::connect(candidate) {
        Ok(_probe) => true,
        Err(_) => false,
    }
}
/// Removes leftover daemon PID and socket files under `home`.
///
/// The PID file is removed when its state is `Dead`, `Missing` or `Malformed`.
/// The socket file is removed only when the PID state is one of those three,
/// the socket file exists, and a connection attempt to it fails. An
/// `Unreadable` or `Alive` PID state leaves both files untouched. Removal
/// errors are ignored.
#[cfg(unix)]
fn cleanup_stale_daemon_runtime_files(home: &std::path::Path) {
    let daemon_dir = home.join(".claude/daemon");
    let socket_path = daemon_dir.join("atm-daemon.sock");
    let pid_path = daemon_dir.join("atm-daemon.pid");

    let ownership_known_dead = match read_daemon_pid_state(&pid_path) {
        PidState::Dead | PidState::Missing | PidState::Malformed => true,
        PidState::Unreadable | PidState::Alive => false,
    };
    if !ownership_known_dead {
        return;
    }

    let _ = std::fs::remove_file(&pid_path);
    if socket_path.exists() && !daemon_socket_connectable(home) {
        let _ = std::fs::remove_file(&socket_path);
    }
}
/// Classification of the daemon PID file produced by `read_daemon_pid_state`.
#[cfg(unix)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PidState {
/// The PID file does not exist.
Missing,
/// The file was read but its trimmed contents do not parse as an `i32`.
Malformed,
/// The file exists but could not be read.
Unreadable,
/// The file holds a PID for which `pid_alive` returned `false`.
Dead,
/// The file holds a PID for which `pid_alive` returned `true`.
Alive,
}
/// Classifies the PID file at `pid_path`.
///
/// Checks, in order: existence (`Missing`), readability (`Unreadable`),
/// whether the trimmed contents parse as an `i32` (`Malformed`), and finally
/// whether `pid_alive` reports the PID as live (`Alive` / `Dead`).
#[cfg(unix)]
fn read_daemon_pid_state(pid_path: &std::path::Path) -> PidState {
    if !pid_path.exists() {
        return PidState::Missing;
    }
    let Ok(raw) = std::fs::read_to_string(pid_path) else {
        return PidState::Unreadable;
    };
    match raw.trim().parse::<i32>() {
        Err(_) => PidState::Malformed,
        Ok(pid) if pid_alive(pid) => PidState::Alive,
        Ok(_) => PidState::Dead,
    }
}
/// Unix implementation of [`subscribe_stream_events`].
///
/// Connects to the daemon socket, sends a `stream-subscribe` request, and waits
/// for an acknowledgement line whose JSON has `"status": "ok"` and
/// `"streaming": true`. On success a background thread reads newline-delimited
/// JSON events from the socket and forwards those that decode as
/// `DaemonStreamEvent` to the returned subscription's channel.
///
/// The reader thread stops when the subscription is dropped (checked at least
/// every 500 ms via the read timeout), the receiver is gone, the socket reaches
/// EOF, or a non-timeout read error occurs.
///
/// A single buffered reader is used for both the acknowledgement and the event
/// stream, so event bytes that arrive in the same read as the acknowledgement
/// are not lost. The line buffer persists across read timeouts, so a line that
/// arrives in pieces is reassembled instead of being dropped.
///
/// # Returns
/// `Ok(None)` when the socket cannot be connected, the daemon closes the
/// connection before acknowledging, or the acknowledgement is not a positive
/// streaming ack.
///
/// # Errors
/// Fails when the socket path cannot be resolved, the request cannot be
/// serialized or written, or reading the acknowledgement fails.
#[cfg(unix)]
fn subscribe_stream_events_unix() -> anyhow::Result<Option<StreamSubscription>> {
    use std::io::{BufRead, BufReader, Write};
    use std::os::unix::net::UnixStream;

    let socket_path = daemon_socket_path()?;
    let mut stream = match UnixStream::connect(&socket_path) {
        Ok(s) => s,
        Err(_) => return Ok(None),
    };

    let req = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: "stream-subscribe".to_string(),
        payload: serde_json::json!({}),
    };
    let req_line = serde_json::to_string(&req)?;
    stream.write_all(req_line.as_bytes())?;
    stream.write_all(b"\n")?;
    stream.flush()?;

    // One reader for the whole connection: anything it buffers past the ack
    // line stays available to the event loop below.
    let mut reader = BufReader::new(stream);

    let mut ack_line = String::new();
    if reader.read_line(&mut ack_line)? == 0 {
        return Ok(None);
    }
    let ack_json: serde_json::Value = match serde_json::from_str(ack_line.trim()) {
        Ok(v) => v,
        Err(_) => return Ok(None),
    };
    let ok = ack_json
        .get("status")
        .and_then(|v| v.as_str())
        .map(|s| s == "ok")
        .unwrap_or(false);
    let streaming = ack_json
        .get("streaming")
        .and_then(|v| v.as_bool())
        .unwrap_or(false);
    if !(ok && streaming) {
        return Ok(None);
    }

    let (tx, rx) = std::sync::mpsc::channel::<crate::daemon_stream::DaemonStreamEvent>();
    let stop = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let stop_thread = std::sync::Arc::clone(&stop);

    // The timeout lets the reader thread notice the stop flag periodically.
    reader
        .get_ref()
        .set_read_timeout(Some(std::time::Duration::from_millis(500)))
        .ok();

    std::thread::spawn(move || {
        // Raw bytes of the line being assembled; kept across read timeouts.
        let mut line: Vec<u8> = Vec::new();
        loop {
            if stop_thread.load(std::sync::atomic::Ordering::Relaxed) {
                break;
            }
            match reader.read_until(b'\n', &mut line) {
                Ok(0) => break,
                Ok(_) => {}
                Err(e)
                    if e.kind() == std::io::ErrorKind::WouldBlock
                        || e.kind() == std::io::ErrorKind::TimedOut =>
                {
                    // Keep any partial bytes; the rest of the line follows.
                    continue;
                }
                Err(_) => break,
            }

            let event = match std::str::from_utf8(&line) {
                Ok(text) => {
                    let trimmed = text.trim();
                    if trimmed.is_empty() {
                        None
                    } else {
                        serde_json::from_str::<crate::daemon_stream::DaemonStreamEvent>(trimmed)
                            .ok()
                    }
                }
                // Invalid UTF-8 ends the subscription.
                Err(_) => break,
            };
            line.clear();

            if let Some(event) = event {
                if tx.send(event).is_err() {
                    break;
                }
            }
        }
    });

    Ok(Some(StreamSubscription { rx, stop }))
}
/// Reports whether `pid` refers to a process this process can signal.
///
/// Uses `kill(pid, 0)`, which performs the existence and permission checks
/// without delivering a signal. Non-positive values are rejected up front: for
/// `kill`, `0` and negative PIDs address process groups (or every process), so
/// the call would succeed without saying anything about a single daemon.
///
/// A process that exists but belongs to another user makes `kill` fail, so it
/// is reported as not alive.
#[cfg(unix)]
fn pid_alive(pid: i32) -> bool {
    unsafe extern "C" {
        fn kill(pid: i32, sig: i32) -> i32;
    }
    if pid <= 0 {
        return false;
    }
    // SAFETY: `kill` takes two plain integers and does not touch memory; with
    // signal 0 it only performs error checking and delivers no signal.
    let result = unsafe { kill(pid, 0) };
    result == 0
}
#[cfg(test)]
mod tests {
use super::*;
use serial_test::serial;
/// Runs `f` with `ATM_DAEMON_AUTOSTART` set to `"0"` and restores the previous
/// value afterwards.
///
/// Restoration happens in a drop guard, so the variable is put back even when
/// `f` panics (for example on a failed assertion). The previous value is
/// captured with `var_os`, so a non-UTF-8 value is restored as it was.
fn with_autostart_disabled<T>(f: impl FnOnce() -> T) -> T {
    struct RestoreAutostart(Option<std::ffi::OsString>);

    impl Drop for RestoreAutostart {
        fn drop(&mut self) {
            unsafe {
                match self.0.take() {
                    Some(v) => std::env::set_var("ATM_DAEMON_AUTOSTART", v),
                    None => std::env::remove_var("ATM_DAEMON_AUTOSTART"),
                }
            }
        }
    }

    let _restore = RestoreAutostart(std::env::var_os("ATM_DAEMON_AUTOSTART"));
    unsafe { std::env::set_var("ATM_DAEMON_AUTOSTART", "0") };
    f()
}
#[test]
fn test_socket_request_serialization() {
    // A request must survive a JSON round trip with its scalar fields intact.
    let original = SocketRequest {
        version: 1,
        request_id: String::from("req-123"),
        command: String::from("agent-state"),
        payload: serde_json::json!({ "agent": "arch-ctm", "team": "atm-dev" }),
    };
    let encoded = serde_json::to_string(&original).unwrap();
    let round_tripped: SocketRequest = serde_json::from_str(&encoded).unwrap();
    assert_eq!(round_tripped.version, 1);
    assert_eq!(round_tripped.request_id, "req-123");
    assert_eq!(round_tripped.command, "agent-state");
}
#[test]
fn test_socket_response_ok_deserialization() {
    // An "ok" response carries a payload and no error.
    let raw = r#"{"version":1,"request_id":"req-123","status":"ok","payload":{"state":"idle","last_transition":"2026-02-16T22:30:00Z"}}"#;
    let parsed = serde_json::from_str::<SocketResponse>(raw).unwrap();
    assert!(parsed.is_ok());
    assert_eq!(parsed.request_id, "req-123");
    assert!(parsed.payload.is_some());
    assert!(parsed.error.is_none());
}
#[test]
fn test_socket_response_error_deserialization() {
    // An "error" response is not ok and exposes the error code.
    let raw = r#"{"version":1,"request_id":"req-456","status":"error","error":{"code":"AGENT_NOT_FOUND","message":"Agent 'unknown' is not tracked"}}"#;
    let parsed = serde_json::from_str::<SocketResponse>(raw).unwrap();
    assert!(!parsed.is_ok());
    let failure = parsed.error.unwrap();
    assert_eq!(failure.code, "AGENT_NOT_FOUND");
}
#[test]
fn test_agent_state_info_deserialization() {
    // Both fields are populated when present in the JSON.
    let raw = r#"{"state":"idle","last_transition":"2026-02-16T22:30:00Z"}"#;
    let parsed = serde_json::from_str::<AgentStateInfo>(raw).unwrap();
    assert_eq!(parsed.state, "idle");
    let transition = parsed.last_transition.as_deref();
    assert_eq!(transition, Some("2026-02-16T22:30:00Z"));
}
#[test]
fn test_agent_state_info_missing_transition() {
    // A missing `last_transition` key deserializes to `None`.
    let raw = r#"{"state":"launching"}"#;
    let parsed = serde_json::from_str::<AgentStateInfo>(raw).unwrap();
    assert_eq!(parsed.state, "launching");
    assert!(parsed.last_transition.is_none());
}
#[test]
#[serial]
fn test_query_daemon_no_socket_returns_none() {
    // With autostart off, querying must not fail regardless of daemon presence.
    with_autostart_disabled(|| {
        let probe = SocketRequest {
            version: PROTOCOL_VERSION,
            request_id: String::from("req-test"),
            command: String::from("agent-state"),
            payload: serde_json::json!({}),
        };
        let outcome = query_daemon(&probe);
        assert!(outcome.is_ok());
    });
}
// Checks that `ATM_DAEMON_AUTOSTART` values "1" and "true" enable autostart
// and "0" disables it.
#[cfg(unix)]
#[test]
#[serial]
fn test_daemon_autostart_flag_parsing() {
// Remember the current value so it can be restored at the end.
let old = std::env::var("ATM_DAEMON_AUTOSTART").ok();
unsafe { std::env::set_var("ATM_DAEMON_AUTOSTART", "1") };
assert!(daemon_autostart_enabled());
unsafe { std::env::set_var("ATM_DAEMON_AUTOSTART", "true") };
assert!(daemon_autostart_enabled());
unsafe { std::env::set_var("ATM_DAEMON_AUTOSTART", "0") };
assert!(!daemon_autostart_enabled());
// Restore the environment (not reached if an assertion above panics).
unsafe {
match old {
Some(v) => std::env::set_var("ATM_DAEMON_AUTOSTART", v),
None => std::env::remove_var("ATM_DAEMON_AUTOSTART"),
}
}
}
// Checks that a non-empty `ATM_DAEMON_BIN` is returned verbatim by
// `resolve_daemon_binary`.
#[cfg(unix)]
#[test]
#[serial]
fn test_resolve_daemon_binary_honors_override() {
// Remember the current value so it can be restored at the end.
let old = std::env::var("ATM_DAEMON_BIN").ok();
let tmp = tempfile::tempdir().unwrap();
let custom = tmp.path().join("custom-atm-daemon");
std::fs::write(&custom, "#!/bin/sh\nexit 0\n").unwrap();
unsafe { std::env::set_var("ATM_DAEMON_BIN", &custom) };
let resolved = resolve_daemon_binary();
assert_eq!(std::path::PathBuf::from(resolved), custom);
// Restore the environment (not reached if the assertion above panics).
unsafe {
match old {
Some(v) => std::env::set_var("ATM_DAEMON_BIN", v),
None => std::env::remove_var("ATM_DAEMON_BIN"),
}
}
}
#[cfg(unix)]
#[test]
#[serial]
fn test_cleanup_stale_runtime_files_removes_dead_pid_file() {
    // A PID file naming a process that is not alive must be deleted.
    let sandbox = tempfile::tempdir().unwrap();
    let daemon_dir = sandbox.path().join(".claude/daemon");
    std::fs::create_dir_all(&daemon_dir).unwrap();
    let pid_file = daemon_dir.join("atm-daemon.pid");
    std::fs::write(&pid_file, "999999\n").unwrap();
    assert!(pid_file.exists());

    cleanup_stale_daemon_runtime_files(sandbox.path());

    let still_there = pid_file.exists();
    assert!(
        !still_there,
        "stale PID file should be removed when PID is not alive"
    );
}
#[cfg(unix)]
#[test]
#[serial]
fn test_cleanup_stale_runtime_files_handles_malformed_pid() {
    // A malformed PID file counts as "owner known dead": both the PID file
    // and the (non-connectable) socket file must be removed.
    let sandbox = tempfile::tempdir().unwrap();
    let daemon_dir = sandbox.path().join(".claude/daemon");
    std::fs::create_dir_all(&daemon_dir).unwrap();
    let pid_file = daemon_dir.join("atm-daemon.pid");
    let socket_file = daemon_dir.join("atm-daemon.sock");
    std::fs::write(&pid_file, "not-a-pid\n").unwrap();
    std::fs::write(&socket_file, "stale").unwrap();

    cleanup_stale_daemon_runtime_files(sandbox.path());

    let pid_left = pid_file.exists();
    assert!(
        !pid_left,
        "malformed PID file should be removed during cleanup"
    );
    let socket_left = socket_file.exists();
    assert!(
        !socket_left,
        "stale socket should be removed when PID ownership is known-dead"
    );
}
#[cfg(unix)]
#[test]
#[serial]
fn test_cleanup_stale_runtime_files_unreadable_pid_does_not_remove_socket() {
    use std::os::unix::fs::PermissionsExt;

    // When the PID file cannot be read, ownership is unknown and the socket
    // file must be left alone.
    let sandbox = tempfile::tempdir().unwrap();
    let daemon_dir = sandbox.path().join(".claude/daemon");
    std::fs::create_dir_all(&daemon_dir).unwrap();
    let pid_file = daemon_dir.join("atm-daemon.pid");
    let socket_file = daemon_dir.join("atm-daemon.sock");
    std::fs::write(&pid_file, "123\n").unwrap();
    std::fs::write(&socket_file, "stale").unwrap();

    // Strip all permissions so reading the PID file fails.
    let mut locked = std::fs::metadata(&pid_file).unwrap().permissions();
    locked.set_mode(0o000);
    std::fs::set_permissions(&pid_file, locked).unwrap();

    cleanup_stale_daemon_runtime_files(sandbox.path());

    let socket_kept = socket_file.exists();
    assert!(
        socket_kept,
        "socket must not be removed when PID ownership cannot be read"
    );

    // Make the file accessible again so the temp dir can be cleaned up.
    let mut unlocked = std::fs::metadata(&pid_file).unwrap().permissions();
    unlocked.set_mode(0o600);
    std::fs::set_permissions(&pid_file, unlocked).unwrap();
}
// Checks that `ensure_daemon_running_unix` fails with a descriptive timeout
// error when the spawned "daemon" never writes a PID file or opens a socket.
#[cfg(unix)]
#[test]
#[serial]
fn test_ensure_daemon_running_timeout_when_spawned_process_never_creates_runtime_files() {
use std::fs;
use std::os::unix::fs::PermissionsExt;
let tmp = tempfile::tempdir().unwrap();
let home = tmp.path().to_path_buf();
// Fake daemon: just sleeps, creating no runtime files.
let script_path = home.join("fake-daemon-never-ready.sh");
let script = r#"#!/bin/sh
set -eu
sleep 10
"#;
fs::write(&script_path, script).unwrap();
let mut perms = fs::metadata(&script_path).unwrap().permissions();
perms.set_mode(0o755);
fs::set_permissions(&script_path, perms).unwrap();
// Save the environment, then point the code under test at the temp home and
// the fake daemon.
let old_home = std::env::var("ATM_HOME").ok();
let old_bin = std::env::var("ATM_DAEMON_BIN").ok();
let old_auto = std::env::var("ATM_DAEMON_AUTOSTART").ok();
unsafe {
std::env::set_var("ATM_HOME", &home);
std::env::set_var("ATM_DAEMON_BIN", &script_path);
std::env::set_var("ATM_DAEMON_AUTOSTART", "1");
}
let err = ensure_daemon_running_unix().expect_err("startup should time out");
let msg = err.to_string();
assert!(
msg.contains("daemon startup timed out after 5s"),
"timeout error should include actionable timeout details: {msg}"
);
assert!(
msg.contains("pid_path="),
"timeout error should include pid path"
);
assert!(
msg.contains("socket_path="),
"timeout error should include socket path"
);
// Restore the environment (not reached if an assertion above panics).
unsafe {
match old_home {
Some(v) => std::env::set_var("ATM_HOME", v),
None => std::env::remove_var("ATM_HOME"),
}
match old_bin {
Some(v) => std::env::set_var("ATM_DAEMON_BIN", v),
None => std::env::remove_var("ATM_DAEMON_BIN"),
}
match old_auto {
Some(v) => std::env::set_var("ATM_DAEMON_AUTOSTART", v),
None => std::env::remove_var("ATM_DAEMON_AUTOSTART"),
}
}
}
// Checks that two threads calling `ensure_daemon_running_unix` at the same
// time result in exactly one spawned daemon process.
#[cfg(unix)]
#[test]
#[serial]
fn test_ensure_daemon_running_serializes_concurrent_start() {
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::sync::Arc;
use std::thread;
let tmp = tempfile::tempdir().unwrap();
let home = tmp.path().to_path_buf();
// Fake daemon: drops one marker file per spawn, writes its PID file, then
// stays alive for two seconds.
let script_path = home.join("fake-daemon.sh");
let script = r#"#!/bin/sh
set -eu
home="${ATM_HOME:?}"
mkdir -p "$home/.claude/daemon"
mkdir -p "$home/spawn-markers"
touch "$home/spawn-markers/spawn.$$"
echo $$ > "$home/.claude/daemon/atm-daemon.pid"
sleep 2
"#;
fs::write(&script_path, script).unwrap();
let mut perms = fs::metadata(&script_path).unwrap().permissions();
perms.set_mode(0o755);
fs::set_permissions(&script_path, perms).unwrap();
// Save the environment, then point the code under test at the temp home and
// the fake daemon.
let old_home = std::env::var("ATM_HOME").ok();
let old_bin = std::env::var("ATM_DAEMON_BIN").ok();
let old_auto = std::env::var("ATM_DAEMON_AUTOSTART").ok();
unsafe {
std::env::set_var("ATM_HOME", &home);
std::env::set_var("ATM_DAEMON_BIN", &script_path);
std::env::set_var("ATM_DAEMON_AUTOSTART", "1");
}
let mut handles = Vec::new();
// The barrier releases both threads together to maximize contention.
let barrier = Arc::new(std::sync::Barrier::new(2));
for _ in 0..2 {
let b = Arc::clone(&barrier);
handles.push(thread::spawn(move || {
b.wait();
ensure_daemon_running_unix().unwrap();
}));
}
for h in handles {
h.join().unwrap();
}
// Count marker files; a missing marker directory counts as zero.
let count = fs::read_dir(home.join("spawn-markers"))
.ok()
.into_iter()
.flatten()
.filter_map(Result::ok)
.count();
assert_eq!(
count, 1,
"concurrent startup attempts should spawn at most one daemon process"
);
// Restore the environment (not reached if an assertion above panics).
unsafe {
match old_home {
Some(v) => std::env::set_var("ATM_HOME", v),
None => std::env::remove_var("ATM_HOME"),
}
match old_bin {
Some(v) => std::env::set_var("ATM_DAEMON_BIN", v),
None => std::env::remove_var("ATM_DAEMON_BIN"),
}
match old_auto {
Some(v) => std::env::set_var("ATM_DAEMON_AUTOSTART", v),
None => std::env::remove_var("ATM_DAEMON_AUTOSTART"),
}
}
}
#[test]
fn test_new_request_id_is_unique() {
    // Two ids generated a short moment apart must be non-empty and distinct.
    let id1 = new_request_id();
    std::thread::sleep(std::time::Duration::from_nanos(1000));
    let id2 = new_request_id();
    assert!(!id1.is_empty());
    assert!(!id2.is_empty());
    assert_ne!(id1, id2, "consecutive request ids must differ");
}
#[test]
fn test_daemon_socket_path_contains_expected_suffix() {
    // The socket path lives under `.claude/daemon` and ends with the socket file name.
    let socket = daemon_socket_path().unwrap();
    let rendered = socket.to_string_lossy();
    assert!(rendered.ends_with("atm-daemon.sock"));
    assert!(rendered.contains(".claude/daemon"));
}
#[test]
fn test_daemon_pid_path_contains_expected_suffix() {
    // The PID path lives under `.claude/daemon` and ends with the PID file name.
    let pid_file = daemon_pid_path().unwrap();
    let rendered = pid_file.to_string_lossy();
    assert!(rendered.ends_with("atm-daemon.pid"));
    assert!(rendered.contains(".claude/daemon"));
}
#[test]
#[serial]
fn test_query_agent_state_no_daemon_returns_none() {
    // With autostart off, the query must not fail regardless of daemon presence.
    with_autostart_disabled(|| {
        let outcome = query_agent_state("arch-ctm", "atm-dev");
        assert!(outcome.is_ok());
    });
}
// Checks that `query_team_member_states` returns `Ok(None)` when the home
// directory points at an empty temp dir (no daemon socket) and autostart is off.
#[test]
#[serial]
fn test_query_team_member_states_offline_returns_none() {
with_autostart_disabled(|| {
let tmp = tempfile::tempdir().expect("tempdir");
// Point ATM_HOME at the empty temp dir for the duration of the query.
let old_home = std::env::var("ATM_HOME").ok();
unsafe { std::env::set_var("ATM_HOME", tmp.path()) };
let result = query_team_member_states("atm-dev");
// Restore ATM_HOME before asserting so a failure does not leak the override.
unsafe {
match old_home {
Some(v) => std::env::set_var("ATM_HOME", v),
None => std::env::remove_var("ATM_HOME"),
}
}
assert!(
matches!(result, Ok(None)),
"offline daemon must map to Ok(None), got: {result:?}"
);
});
}
// Runs `query_team_member_states` against a mock daemon socket that replies
// with status "ok" but a payload that is a single JSON object, and expects an
// error mentioning "invalid canonical member-state payload".
#[cfg(unix)]
#[test]
#[serial]
fn test_query_team_member_states_invalid_payload_returns_err() {
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixListener;
with_autostart_disabled(|| {
// Build `<tmp>/.claude/daemon/atm-daemon.sock` and listen on it before the
// client is pointed at the temp home.
let tmp = tempfile::tempdir().expect("tempdir");
let daemon_dir = tmp.path().join(".claude/daemon");
std::fs::create_dir_all(&daemon_dir).expect("create daemon dir");
let socket_path = daemon_dir.join("atm-daemon.sock");
let listener = UnixListener::bind(&socket_path).expect("bind socket");
// Mock daemon: accepts exactly one connection, reads one request line,
// and writes one newline-terminated JSON response.
let handle = std::thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("accept");
let mut request_line = String::new();
// Read through a cloned handle so `stream` stays available for writing.
let mut reader = BufReader::new(stream.try_clone().expect("clone stream"));
reader.read_line(&mut request_line).expect("read request");
assert!(
request_line.contains("\"command\":\"list-agents\""),
"expected list-agents request, got: {request_line}"
);
// The payload is a bare object rather than an array of member states.
let response = SocketResponse {
version: PROTOCOL_VERSION,
request_id: "req-test".to_string(),
status: "ok".to_string(),
payload: Some(serde_json::json!({
"agent": "arch-ctm",
"state": "active"
})),
error: None,
};
let line = serde_json::to_string(&response).expect("serialize response");
stream.write_all(line.as_bytes()).expect("write response");
stream.write_all(b"\n").expect("write newline");
});
// Redirect ATM_HOME to the temp directory for the duration of the query,
// then restore the previous value before any assertion can fail.
let old_home = std::env::var("ATM_HOME").ok();
unsafe { std::env::set_var("ATM_HOME", tmp.path()) };
let result = query_team_member_states("atm-dev");
unsafe {
match old_home {
Some(v) => std::env::set_var("ATM_HOME", v),
None => std::env::remove_var("ATM_HOME"),
}
}
// Joining surfaces any assertion failure raised inside the mock thread.
// NOTE(review): if the client never connects, `accept` blocks and this join
// would hang — confirm that is acceptable for this test.
handle.join().expect("mock daemon thread");
let err = result.expect_err("invalid payload must return Err");
assert!(
err.to_string()
.contains("invalid canonical member-state payload"),
"unexpected error: {err}"
);
});
}
/// Deserializes an `AgentPaneInfo` from a JSON literal and checks both fields.
#[test]
fn test_agent_pane_info_deserialization() {
    let raw = r#"{"pane_id":"%42","log_path":"/home/user/.claude/logs/arch-ctm.log"}"#;
    let pane: AgentPaneInfo = serde_json::from_str(raw).unwrap();
    assert_eq!(pane.pane_id, "%42");
    assert_eq!(pane.log_path, "/home/user/.claude/logs/arch-ctm.log");
}
/// Calls `query_agent_pane` with autostart disabled and requires a non-error result.
///
/// Only `is_ok()` is asserted; the inner value is not inspected.
#[test]
#[serial]
fn test_query_agent_pane_no_daemon_returns_none() {
    with_autostart_disabled(|| {
        let outcome = query_agent_pane("arch-ctm");
        assert!(outcome.is_ok());
    });
}
/// Round-trips a fully populated `LaunchConfig` through JSON and checks every
/// field that was set, including the single `env_vars` entry.
#[test]
fn test_launch_config_serialization() {
    let original = LaunchConfig {
        agent: "arch-ctm".to_string(),
        team: "atm-dev".to_string(),
        command: "codex --yolo".to_string(),
        prompt: Some("Review the bridge module".to_string()),
        timeout_secs: 30,
        env_vars: std::collections::HashMap::from([(
            "EXTRA_VAR".to_string(),
            "value".to_string(),
        )]),
        runtime: Some("codex".to_string()),
        resume_session_id: None,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let restored: LaunchConfig = serde_json::from_str(&encoded).unwrap();

    assert_eq!(restored.agent, "arch-ctm");
    assert_eq!(restored.team, "atm-dev");
    assert_eq!(restored.command, "codex --yolo");
    assert_eq!(restored.prompt.as_deref(), Some("Review the bridge module"));
    assert_eq!(restored.timeout_secs, 30);
    assert_eq!(restored.runtime.as_deref(), Some("codex"));
    assert!(restored.resume_session_id.is_none());
    assert_eq!(
        restored.env_vars.get("EXTRA_VAR").map(String::as_str),
        Some("value")
    );
}
/// Round-trips a `LaunchConfig` with no prompt, no runtime and no env vars,
/// but with a resume session id, and checks those fields survive unchanged.
#[test]
fn test_launch_config_no_prompt_serialization() {
    let original = LaunchConfig {
        agent: "worker-1".to_string(),
        team: "my-team".to_string(),
        command: "codex --yolo".to_string(),
        prompt: None,
        timeout_secs: 60,
        env_vars: std::collections::HashMap::new(),
        runtime: None,
        resume_session_id: Some("sess-123".to_string()),
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let restored: LaunchConfig = serde_json::from_str(&encoded).unwrap();

    assert_eq!(restored.agent, "worker-1");
    assert!(restored.prompt.is_none());
    assert!(restored.env_vars.is_empty());
    assert!(restored.runtime.is_none());
    assert_eq!(restored.resume_session_id.as_deref(), Some("sess-123"));
}
/// Round-trips a `LaunchResult` without a warning through JSON.
#[test]
fn test_launch_result_serialization() {
    let original = LaunchResult {
        agent: "arch-ctm".to_string(),
        pane_id: "%42".to_string(),
        state: "launching".to_string(),
        warning: None,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let restored: LaunchResult = serde_json::from_str(&encoded).unwrap();

    assert_eq!(restored.agent, "arch-ctm");
    assert_eq!(restored.pane_id, "%42");
    assert_eq!(restored.state, "launching");
    assert!(restored.warning.is_none());
}
/// Round-trips a `LaunchResult` carrying a warning and checks the warning text.
#[test]
fn test_launch_result_with_warning_serialization() {
    let original = LaunchResult {
        agent: "arch-ctm".to_string(),
        pane_id: "%7".to_string(),
        state: "launching".to_string(),
        warning: Some("Readiness timeout reached".to_string()),
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let restored: LaunchResult = serde_json::from_str(&encoded).unwrap();

    let warning = restored.warning.as_deref();
    assert_eq!(warning, Some("Readiness timeout reached"));
}
/// Round-trips a live `SessionQueryResult` (all optional fields unset) and
/// checks the session id, process id and liveness flag.
#[test]
fn test_session_query_result_serialization() {
    let original = SessionQueryResult {
        session_id: "abc123".to_string(),
        process_id: 12345,
        alive: true,
        runtime: None,
        runtime_session_id: None,
        pane_id: None,
        runtime_home: None,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let restored: SessionQueryResult = serde_json::from_str(&encoded).unwrap();

    assert_eq!(restored.session_id, "abc123");
    assert_eq!(restored.process_id, 12345);
    assert!(restored.alive);
}
/// Parses a minimal JSON object for a dead session and checks that the
/// omitted `runtime` and `runtime_session_id` fields come back as `None`.
#[test]
fn test_session_query_result_dead() {
    let raw = r#"{"session_id":"xyz789","process_id":99,"alive":false}"#;
    let parsed: SessionQueryResult = serde_json::from_str(raw).unwrap();

    assert_eq!(parsed.session_id, "xyz789");
    assert_eq!(parsed.process_id, 99);
    assert!(!parsed.alive);
    assert!(parsed.runtime.is_none());
    assert!(parsed.runtime_session_id.is_none());
}
/// Calls `query_session` with autostart disabled and requires a non-error result.
///
/// Only `is_ok()` is asserted; the inner value is not inspected.
#[test]
#[serial]
fn test_query_session_no_daemon_returns_none() {
    with_autostart_disabled(|| {
        let outcome = query_session("team-lead");
        assert!(outcome.is_ok());
    });
}
/// With autostart disabled and no daemon running, `launch_agent` must not
/// return an error. The body is skipped entirely when a daemon is running.
#[test]
#[serial]
fn test_launch_agent_no_daemon_returns_none() {
    with_autostart_disabled(|| {
        if !daemon_is_running() {
            let launch = LaunchConfig {
                agent: "test-agent".to_string(),
                team: "test-team".to_string(),
                command: "codex --yolo".to_string(),
                prompt: None,
                timeout_secs: 5,
                env_vars: std::collections::HashMap::new(),
                runtime: Some("codex".to_string()),
                resume_session_id: None,
            };
            let outcome = launch_agent(&launch);
            assert!(outcome.is_ok());
        }
    });
}
/// Round-trips an `AgentSummary` through JSON and checks both fields.
#[test]
fn test_agent_summary_serialization() {
    let original = AgentSummary {
        agent: "arch-ctm".to_string(),
        state: "idle".to_string(),
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let restored: AgentSummary = serde_json::from_str(&encoded).unwrap();

    assert_eq!(restored.agent, "arch-ctm");
    assert_eq!(restored.state, "idle");
}
/// Round-trips a fully populated `CanonicalMemberState` through JSON and
/// checks that every field, including `reason` and `source`, is preserved.
#[test]
fn test_canonical_member_state_serialization() {
    let state = CanonicalMemberState {
        agent: "arch-ctm".to_string(),
        state: "active".to_string(),
        activity: "busy".to_string(),
        session_id: Some("sess-123".to_string()),
        process_id: Some(4242),
        reason: "session active with live pid".to_string(),
        source: "session_registry".to_string(),
    };
    let json = serde_json::to_string(&state).unwrap();
    let decoded: CanonicalMemberState = serde_json::from_str(&json).unwrap();
    assert_eq!(decoded.agent, "arch-ctm");
    assert_eq!(decoded.state, "active");
    assert_eq!(decoded.activity, "busy");
    assert_eq!(decoded.session_id.as_deref(), Some("sess-123"));
    assert_eq!(decoded.process_id, Some(4242));
    // `reason` and `source` are populated above, so verify they round-trip too.
    assert_eq!(decoded.reason, "session active with live pid");
    assert_eq!(decoded.source, "session_registry");
}
/// Checks the status label, activity label and liveness flag derived from an
/// active, an idle and an offline member state, plus the `None` case.
#[test]
fn test_canonical_status_activity_labels_and_liveness() {
    let active = CanonicalMemberState {
        agent: "arch-ctm".to_string(),
        state: "active".to_string(),
        activity: "busy".to_string(),
        session_id: None,
        process_id: None,
        reason: String::new(),
        source: String::new(),
    };
    let idle = CanonicalMemberState {
        state: "idle".to_string(),
        activity: "idle".to_string(),
        ..active.clone()
    };
    let dead = CanonicalMemberState {
        state: "offline".to_string(),
        activity: "unknown".to_string(),
        ..active.clone()
    };

    // (input, expected status label, expected activity label, expected liveness)
    let cases = [
        (Some(&active), "Active", "Busy", Some(true)),
        (Some(&idle), "Idle", "Idle", Some(true)),
        (Some(&dead), "Dead", "Unknown", Some(false)),
        (None, "Unknown", "Unknown", None),
    ];
    for (member, status, activity, liveness) in cases {
        assert_eq!(canonical_status_label(member), status);
        assert_eq!(canonical_activity_label(member), activity);
        assert_eq!(canonical_liveness_bool(member), liveness);
    }
}
/// A bare JSON object must be rejected by `decode_canonical_member_states_payload`
/// with an error mentioning "invalid canonical member-state payload".
#[test]
fn test_decode_canonical_member_states_payload_rejects_invalid_schema() {
    let bare_object = serde_json::json!({
        "agent": "arch-ctm",
        "state": "active"
    });
    let failure = decode_canonical_member_states_payload(bare_object).unwrap_err();
    let rendered = failure.to_string();
    assert!(rendered.contains("invalid canonical member-state payload"));
}
/// A one-element JSON array of member-state objects must decode into a single
/// entry with the expected agent and state.
#[test]
fn test_decode_canonical_member_states_payload_accepts_valid_schema() {
    let payload = serde_json::json!([
        {
            "agent": "arch-ctm",
            "state": "active",
            "activity": "busy",
            "session_id": "sess-1",
            "process_id": 1234,
            "reason": "session active",
            "source": "session_registry"
        }
    ]);
    let decoded = decode_canonical_member_states_payload(payload).expect("valid payload");
    assert_eq!(decoded.len(), 1);
    let only = &decoded[0];
    assert_eq!(only.agent, "arch-ctm");
    assert_eq!(only.state, "active");
}
/// The current process must be reported as alive by `pid_alive`.
#[cfg(unix)]
#[test]
fn test_pid_alive_current_process() {
    // Checked conversion: an `as` cast would silently wrap an out-of-range id
    // and probe some unrelated pid instead of failing loudly.
    let pid = i32::try_from(std::process::id()).expect("current pid fits in i32");
    assert!(pid_alive(pid));
}
/// `pid_alive` must report `false` for `i32::MAX`.
#[cfg(unix)]
#[test]
fn test_pid_alive_nonexistent_pid() {
    // presumably no real process ever has this pid — TODO confirm for exotic platforms
    let unlikely_pid = i32::MAX;
    let alive = pid_alive(unlikely_pid);
    assert!(!alive);
}
/// Points `ATM_DAEMON_BIN` at a nonexistent binary and checks how
/// `ensure_daemon_running` reacts: an error on Unix, `Ok` elsewhere.
///
/// The test returns early when a daemon is already running. Any value
/// `ATM_DAEMON_BIN` had beforehand is restored afterwards, matching the other
/// env-mutating tests in this module, instead of being removed unconditionally.
#[test]
#[serial]
fn test_ensure_daemon_running_reads_atm_daemon_bin() {
    if daemon_is_running() {
        return;
    }
    // `var_os` keeps a pre-existing value even when it is not valid Unicode.
    let old_bin = std::env::var_os("ATM_DAEMON_BIN");
    unsafe {
        std::env::set_var("ATM_DAEMON_BIN", "/nonexistent-bin-for-atm-test");
    }
    let result = ensure_daemon_running();
    // Restore before asserting so a failed assertion does not leak the override.
    unsafe {
        match old_bin {
            Some(v) => std::env::set_var("ATM_DAEMON_BIN", v),
            None => std::env::remove_var("ATM_DAEMON_BIN"),
        }
    }
    #[cfg(unix)]
    assert!(
        result.is_err(),
        "spawn of nonexistent binary must return Err on Unix"
    );
    #[cfg(not(unix))]
    assert!(
        result.is_ok(),
        "ensure_daemon_running is a no-op on non-Unix"
    );
}
/// Every `LifecycleSourceKind` variant must serialize to its snake_case JSON string.
#[test]
fn lifecycle_source_kind_serializes_snake_case() {
    let expectations = [
        (LifecycleSourceKind::ClaudeHook, "\"claude_hook\""),
        (LifecycleSourceKind::AtmMcp, "\"atm_mcp\""),
        (LifecycleSourceKind::AgentHook, "\"agent_hook\""),
        (LifecycleSourceKind::Unknown, "\"unknown\""),
    ];
    for (kind, wire) in expectations {
        assert_eq!(serde_json::to_string(&kind).unwrap(), wire);
    }
}
/// Every snake_case JSON string must deserialize to the matching
/// `LifecycleSourceKind` variant.
#[test]
fn lifecycle_source_kind_deserializes_snake_case() {
    let expectations = [
        ("\"claude_hook\"", LifecycleSourceKind::ClaudeHook),
        ("\"atm_mcp\"", LifecycleSourceKind::AtmMcp),
        ("\"agent_hook\"", LifecycleSourceKind::AgentHook),
        ("\"unknown\"", LifecycleSourceKind::Unknown),
    ];
    for (wire, expected) in expectations {
        let parsed: LifecycleSourceKind = serde_json::from_str(wire).unwrap();
        assert_eq!(parsed, expected);
    }
}
/// Round-trips a `LifecycleSource` built with `AtmMcp` and checks both the
/// serialized text and the decoded kind.
#[test]
fn lifecycle_source_round_trip() {
    let original = LifecycleSource::new(LifecycleSourceKind::AtmMcp);
    let json = serde_json::to_string(&original).unwrap();
    assert!(json.contains("\"atm_mcp\""), "serialized: {json}");

    let restored: LifecycleSource = serde_json::from_str(&json).unwrap();
    assert_eq!(restored.kind, LifecycleSourceKind::AtmMcp);
}
/// A `hook-event` request whose payload has no `source` key must still parse
/// as a `SocketRequest`.
#[test]
fn hook_event_payload_without_source_is_backward_compatible() {
    let raw = r#"{
"version": 1,
"request_id": "req-test",
"command": "hook-event",
"payload": {
"event": "session_start",
"agent": "team-lead",
"team": "atm-dev",
"session_id": "abc-123"
}
}"#;
    let request: SocketRequest = serde_json::from_str(raw).unwrap();
    let source_field = request.payload.get("source");
    assert!(source_field.is_none());
    assert_eq!(request.command, "hook-event");
}
/// A `hook-event` request whose payload carries `{"kind": "atm_mcp"}` under
/// `source` must yield a `LifecycleSource` of kind `AtmMcp`.
#[test]
fn hook_event_payload_with_atm_mcp_source_parses() {
    let raw = r#"{
"version": 1,
"request_id": "req-mcp",
"command": "hook-event",
"payload": {
"event": "session_start",
"agent": "arch-ctm",
"team": "atm-dev",
"session_id": "codex:abc-123",
"source": {"kind": "atm_mcp"}
}
}"#;
    let request: SocketRequest = serde_json::from_str(raw).unwrap();
    // Pull the nested `source` value out of the untyped payload, then decode it.
    let source_value = request.payload["source"].clone();
    let decoded: LifecycleSource = serde_json::from_value(source_value).unwrap();
    assert_eq!(decoded.kind, LifecycleSourceKind::AtmMcp);
}
/// `send_control` must return `Err` when no daemon is running.
///
/// The test returns early when a daemon is already running. It is marked
/// `#[serial]` because other serial tests in this module rewrite `ATM_HOME`
/// and bind mock daemon sockets; running alongside them could make this test
/// talk to a mock listener or observe a half-updated environment.
// NOTE(review): assumes `send_control` resolves the daemon socket from the
// process environment like the query helpers do — confirm.
#[test]
#[serial]
fn test_send_control_no_daemon_returns_err() {
    use crate::control::{CONTROL_SCHEMA_VERSION, ControlAction, ControlRequest};
    if daemon_is_running() {
        return;
    }
    let req = ControlRequest {
        v: CONTROL_SCHEMA_VERSION,
        request_id: "req-test-ctrl".to_string(),
        msg_type: "control.stdin.request".to_string(),
        signal: None,
        sent_at: "2026-02-21T00:00:00Z".to_string(),
        team: "atm-dev".to_string(),
        session_id: String::new(),
        agent_id: "arch-ctm".to_string(),
        sender: "tui".to_string(),
        action: ControlAction::Stdin,
        payload: Some("hello".to_string()),
        content_ref: None,
        elicitation_id: None,
        decision: None,
    };
    let result = send_control(&req);
    assert!(
        result.is_err(),
        "send_control should return Err when daemon is not running"
    );
}
/// On Windows, `query_daemon` must return `Ok(None)` rather than an error.
#[cfg(windows)]
#[test]
#[serial]
fn windows_query_daemon_returns_ok_none() {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: "req-win-test".to_string(),
        command: "agent-state".to_string(),
        payload: serde_json::json!({ "agent": "arch-ctm", "team": "atm-dev" }),
    };
    let outcome = query_daemon(&request);
    assert!(
        outcome.is_ok(),
        "query_daemon must not return Err on Windows"
    );
    let response = outcome.unwrap();
    assert!(
        response.is_none(),
        "query_daemon must return Ok(None) on Windows (no Unix socket available)"
    );
}
/// On Windows, `daemon_is_running` must report `false`.
#[cfg(windows)]
#[test]
fn windows_daemon_is_running_returns_false() {
    let running = daemon_is_running();
    assert!(!running, "daemon_is_running must return false on Windows");
}
/// On Windows, `subscribe_stream_events` must return `Ok(None)` rather than an error.
#[cfg(windows)]
#[test]
fn windows_subscribe_stream_events_returns_ok_none() {
    let outcome = subscribe_stream_events();
    assert!(
        outcome.is_ok(),
        "subscribe_stream_events must not return Err on Windows"
    );
    let subscription = outcome.unwrap();
    assert!(
        subscription.is_none(),
        "subscribe_stream_events must return Ok(None) on Windows"
    );
}
/// On Windows, `query_agent_state` must return `Ok(None)` rather than an error.
#[cfg(windows)]
#[test]
#[serial]
fn windows_query_agent_state_returns_ok_none() {
    let outcome = query_agent_state("arch-ctm", "atm-dev");
    assert!(
        outcome.is_ok(),
        "query_agent_state must not return Err on Windows"
    );
    let state = outcome.unwrap();
    assert!(
        state.is_none(),
        "query_agent_state must return Ok(None) on Windows"
    );
}
/// On Windows, `query_session` must return `Ok(None)` rather than an error.
#[cfg(windows)]
#[test]
#[serial]
fn windows_query_session_returns_ok_none() {
    let outcome = query_session("team-lead");
    assert!(
        outcome.is_ok(),
        "query_session must not return Err on Windows"
    );
    let session = outcome.unwrap();
    assert!(
        session.is_none(),
        "query_session must return Ok(None) on Windows"
    );
}
/// On Windows, `launch_agent` must return `Ok(None)` rather than an error.
#[cfg(windows)]
#[test]
#[serial]
fn windows_launch_agent_returns_ok_none() {
    let launch = LaunchConfig {
        agent: "test-agent".to_string(),
        team: "test-team".to_string(),
        command: "codex --yolo".to_string(),
        prompt: None,
        timeout_secs: 5,
        env_vars: std::collections::HashMap::new(),
        runtime: Some("codex".to_string()),
        resume_session_id: None,
    };
    let outcome = launch_agent(&launch);
    assert!(
        outcome.is_ok(),
        "launch_agent must not return Err on Windows (no daemon socket)"
    );
    let launched = outcome.unwrap();
    assert!(
        launched.is_none(),
        "launch_agent must return Ok(None) on Windows"
    );
}
/// Acquires the startup lock file in a temp directory, drops it, and checks
/// that a second acquisition then succeeds.
#[cfg(windows)]
#[test]
fn windows_startup_lock_acquires_and_releases() {
    use crate::io::lock::acquire_lock;
    use std::fs;

    let scratch = tempfile::tempdir().unwrap();
    let lock_dir = scratch.path().join("config").join("atm");
    fs::create_dir_all(&lock_dir).unwrap();
    let lock_file = lock_dir.join("daemon-start.lock");

    let first = acquire_lock(&lock_file, 3);
    assert!(
        first.is_ok(),
        "startup lock must be acquirable on Windows: {:?}",
        first.err()
    );
    // Dropping the guard releases the lock before the second attempt.
    drop(first.unwrap());

    let second = acquire_lock(&lock_file, 1);
    assert!(
        second.is_ok(),
        "startup lock must be re-acquirable after release on Windows"
    );
}
/// On Windows, the rendered `daemon_socket_path` must end with
/// `atm-daemon.sock` and mention both `.claude` and `daemon`.
#[cfg(windows)]
#[test]
fn windows_daemon_socket_path_has_correct_suffix() {
    let socket_path = daemon_socket_path().unwrap();
    let s = socket_path.to_string_lossy();
    assert!(
        s.ends_with("atm-daemon.sock"),
        "daemon_socket_path must end with 'atm-daemon.sock' on Windows, got: {s}"
    );
    // Checked as two separate substrings so the path separator does not matter.
    let has_expected_dirs = s.contains(".claude") && s.contains("daemon");
    assert!(
        has_expected_dirs,
        "daemon_socket_path must contain '.claude/daemon' on Windows, got: {s}"
    );
}
/// On Windows, the rendered `daemon_pid_path` must end with `atm-daemon.pid`
/// and mention both `.claude` and `daemon`.
#[cfg(windows)]
#[test]
fn windows_daemon_pid_path_has_correct_suffix() {
    let pid_path = daemon_pid_path().unwrap();
    let s = pid_path.to_string_lossy();
    assert!(
        s.ends_with("atm-daemon.pid"),
        "daemon_pid_path must end with 'atm-daemon.pid' on Windows, got: {s}"
    );
    // Checked as two separate substrings so the path separator does not matter.
    let has_expected_dirs = s.contains(".claude") && s.contains("daemon");
    assert!(
        has_expected_dirs,
        "daemon_pid_path must contain '.claude/daemon' on Windows, got: {s}"
    );
}
/// On Windows, `send_control` must return `Err` for a stdin control request.
#[cfg(windows)]
#[test]
fn windows_send_control_no_daemon_returns_err() {
    use crate::control::{CONTROL_SCHEMA_VERSION, ControlAction, ControlRequest};

    let control = ControlRequest {
        v: CONTROL_SCHEMA_VERSION,
        request_id: "req-win-ctrl".to_string(),
        msg_type: "control.stdin.request".to_string(),
        signal: None,
        sent_at: "2026-02-21T00:00:00Z".to_string(),
        team: "atm-dev".to_string(),
        session_id: String::new(),
        agent_id: "arch-ctm".to_string(),
        sender: "tui".to_string(),
        action: ControlAction::Stdin,
        payload: Some("hello".to_string()),
        content_ref: None,
        elicitation_id: None,
        decision: None,
    };

    let outcome = send_control(&control);
    assert!(
        outcome.is_err(),
        "send_control must return Err on Windows when daemon is not reachable"
    );
}
/// Builds a `control` `SocketRequest` by hand around a serialized
/// `ControlRequest` and checks the shape of the resulting JSON envelope.
///
/// `send_control` itself is not called; the test only exercises serialization.
#[test]
fn test_send_control_builds_correct_socket_request() {
    use crate::control::{CONTROL_SCHEMA_VERSION, ControlAction, ControlRequest};

    let control = ControlRequest {
        v: CONTROL_SCHEMA_VERSION,
        request_id: "req-ctrl-check".to_string(),
        msg_type: "control.interrupt.request".to_string(),
        signal: Some("interrupt".to_string()),
        sent_at: "2026-02-21T00:00:00Z".to_string(),
        team: "atm-dev".to_string(),
        session_id: String::new(),
        agent_id: "arch-ctm".to_string(),
        sender: "tui".to_string(),
        action: ControlAction::Interrupt,
        payload: None,
        content_ref: None,
        elicitation_id: None,
        decision: None,
    };
    let envelope = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: "sock-test-123".to_string(),
        command: "control".to_string(),
        payload: serde_json::to_value(&control).expect("serialize ControlRequest"),
    };

    // The envelope id and the control payload id are independent values.
    assert_ne!(
        envelope.request_id, control.request_id,
        "socket-level request_id must differ from control payload request_id"
    );
    assert_eq!(envelope.request_id, "sock-test-123");

    let json = serde_json::to_string(&envelope).expect("serialize SocketRequest");
    // (substring that must appear in the JSON, failure message)
    let required_fragments = [
        ("\"command\":\"control\"", "command field missing"),
        (
            "\"request_id\":\"req-ctrl-check\"",
            "control payload request_id must appear inside the payload body",
        ),
        (
            "\"request_id\":\"sock-test-123\"",
            "socket-level request_id must appear in the outer envelope",
        ),
        (
            "\"type\":\"control.interrupt.request\"",
            "msg_type field missing from control payload",
        ),
        ("\"interrupt\"", "interrupt signal missing"),
    ];
    for (fragment, failure) in required_fragments {
        assert!(json.contains(fragment), "{failure}");
    }
}
}