use serde::{Deserialize, Serialize};
use std::path::PathBuf;
/// Protocol version written into the `version` field of every `SocketRequest`
/// that this module builds.
pub const PROTOCOL_VERSION: u32 = 1;
/// Identity record for a daemon process.
///
/// `write_daemon_lock_metadata` serializes this as pretty-printed JSON into the
/// lock metadata file under the daemon runtime directory.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DaemonLockMetadata {
/// Process id of the writer (`std::process::id()` at write time).
pub pid: u32,
/// Canonicalized path of the writing executable, or `"<unknown>"` when it
/// cannot be resolved.
pub executable_path: String,
/// Home directory the metadata was written for (canonicalized when possible).
pub home_scope: String,
/// Version string supplied by the caller of `write_daemon_lock_metadata`.
pub version: String,
/// Time of writing; `write_daemon_lock_metadata` stores an RFC 3339 UTC timestamp.
pub written_at: String,
}
/// Thin serializable wrapper around a `LifecycleSourceKind`.
///
/// NOTE(review): presumably identifies where a lifecycle event originated —
/// confirm against the callers, which are not visible here.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LifecycleSource {
/// The kind of source.
pub kind: LifecycleSourceKind,
}
impl LifecycleSource {
pub fn new(kind: LifecycleSourceKind) -> Self {
Self { kind }
}
}
/// Kinds of lifecycle source.
///
/// Serialized in `snake_case`, e.g. `ClaudeHook` is written as `"claude_hook"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LifecycleSourceKind {
/// Serialized as `"claude_hook"`.
ClaudeHook,
/// Serialized as `"atm_mcp"`.
AtmMcp,
/// Serialized as `"agent_hook"`.
AgentHook,
/// Serialized as `"unknown"`.
Unknown,
}
/// Request envelope sent to the daemon.
///
/// `query_daemon_unix` writes it to the socket as a single JSON line.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SocketRequest {
/// Protocol version; the builders in this module use `PROTOCOL_VERSION`.
pub version: u32,
/// Caller-generated identifier for this request.
pub request_id: String,
/// Command name, e.g. `"launch"` or `"list-agents"`.
pub command: String,
/// Command-specific JSON arguments.
pub payload: serde_json::Value,
}
/// Response envelope read back from the daemon as a single JSON line.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SocketResponse {
/// Protocol version reported in the response.
pub version: u32,
/// Request identifier carried in the response.
pub request_id: String,
/// Status string; `"ok"` is what `is_ok` treats as success.
pub status: String,
/// Command-specific result; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub payload: Option<serde_json::Value>,
/// Error details; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<SocketError>,
}
impl SocketResponse {
    /// Returns `true` when the response status is exactly `"ok"`.
    pub fn is_ok(&self) -> bool {
        matches!(self.status.as_str(), "ok")
    }
}
/// Error details attached to a non-ok `SocketResponse`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SocketError {
/// Error code, e.g. `"UNKNOWN_COMMAND"`.
pub code: String,
/// Error message text.
pub message: String,
}
/// Payload that `query_agent_state` decodes from the `agent-state` command.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentStateInfo {
/// State name as reported by the daemon.
pub state: String,
/// NOTE(review): presumably a timestamp of the last state change — confirm
/// against the daemon side.
pub last_transition: Option<String>,
}
/// One element of the list that `query_list_agents` and
/// `query_list_agents_for_team` decode from the `list-agents` command.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentSummary {
/// Agent name.
pub agent: String,
/// State name as reported by the daemon.
pub state: String,
}
/// One element of the list that `query_team_member_states` decodes from the
/// team-scoped `list-agents` command.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CanonicalMemberState {
/// Agent name.
pub agent: String,
/// State string; `canonical_status_label` recognizes `"active"`, `"idle"`,
/// `"offline"` and `"dead"`.
pub state: String,
/// Activity string; `canonical_activity_label` recognizes `"busy"` and
/// `"idle"`. Defaults to the empty string when absent from the input.
#[serde(default)]
pub activity: String,
/// Optional session id; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
/// Optional process id; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub process_id: Option<u32>,
/// NOTE(review): presumably a timestamp of the last liveness observation —
/// confirm. Omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_alive_at: Option<String>,
/// Defaults to the empty string when absent from the input.
#[serde(default)]
pub reason: String,
/// Defaults to the empty string when absent from the input.
#[serde(default)]
pub source: String,
/// Defaults to `true` when absent from the input and is omitted from
/// serialized output when `true`.
#[serde(default = "default_in_config_true", skip_serializing_if = "is_true")]
pub in_config: bool,
}
/// Serde default for `CanonicalMemberState::in_config`: always `true`.
fn default_in_config_true() -> bool {
    let in_config = true;
    in_config
}
/// Serde `skip_serializing_if` predicate: returns the referenced boolean.
fn is_true(value: &bool) -> bool {
    let &flag = value;
    flag
}
/// Maps a member's `state` string to a display label.
///
/// Returns `"Active"`, `"Idle"` or `"Dead"` (for both `"offline"` and
/// `"dead"`); a missing state or any other value yields `"Unknown"`.
pub fn canonical_status_label(state: Option<&CanonicalMemberState>) -> &'static str {
    let Some(member) = state else {
        return "Unknown";
    };
    match member.state.as_str() {
        "active" => "Active",
        "idle" => "Idle",
        "offline" | "dead" => "Dead",
        _ => "Unknown",
    }
}
/// Maps a member's `activity` string to a display label.
///
/// Returns `"Busy"` or `"Idle"`; a missing state or any other value yields
/// `"Unknown"`.
pub fn canonical_activity_label(state: Option<&CanonicalMemberState>) -> &'static str {
    let Some(member) = state else {
        return "Unknown";
    };
    match member.activity.as_str() {
        "busy" => "Busy",
        "idle" => "Idle",
        _ => "Unknown",
    }
}
/// Collapses a member's `state` string into a liveness flag.
///
/// `"active"`/`"idle"` give `Some(true)`, `"offline"`/`"dead"` give
/// `Some(false)`; a missing state or any other value gives `None`.
pub fn canonical_liveness_bool(state: Option<&CanonicalMemberState>) -> Option<bool> {
    let raw_state = state?.state.as_str();
    match raw_state {
        "active" | "idle" => Some(true),
        "offline" | "dead" => Some(false),
        _ => None,
    }
}
/// Arguments for the `launch` daemon command; `launch_agent` serializes this
/// struct as the request payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LaunchConfig {
/// Agent name.
pub agent: String,
/// Team name.
pub team: String,
/// Command string. NOTE(review): presumably the command the daemon runs to
/// start the agent — confirm against the daemon side.
pub command: String,
/// Optional prompt text.
pub prompt: Option<String>,
/// Timeout in seconds; its exact meaning is defined by the daemon and is not
/// visible here.
pub timeout_secs: u32,
/// Environment variable names mapped to values.
pub env_vars: std::collections::HashMap<String, String>,
/// Optional runtime name; `None` when absent from the input.
#[serde(default)]
pub runtime: Option<String>,
/// Optional session id to resume; `None` when absent from the input.
#[serde(default)]
pub resume_session_id: Option<String>,
}
/// Payload that `launch_agent` decodes from a successful `launch` response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LaunchResult {
/// Agent name.
pub agent: String,
/// Pane identifier reported by the daemon.
pub pane_id: String,
/// State name reported by the daemon.
pub state: String,
/// Optional warning text.
pub warning: Option<String>,
}
/// Sends a `launch` command carrying `config` to the daemon.
///
/// Returns `Ok(None)` when the daemon yields no response or the response has
/// no payload, and `Ok(Some(result))` on success.
///
/// # Errors
///
/// Fails when `config` cannot be serialized, when `query_daemon` fails, when
/// the daemon reports a non-ok status, or when the payload cannot be parsed as
/// a `LaunchResult`.
pub fn launch_agent(config: &LaunchConfig) -> anyhow::Result<Option<LaunchResult>> {
    let payload = serde_json::to_value(config)
        .map_err(|e| anyhow::anyhow!("Failed to serialize LaunchConfig: {e}"))?;
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("launch"),
        payload,
    };

    let Some(response) = query_daemon(&request)? else {
        return Ok(None);
    };

    if !response.is_ok() {
        let msg = match response.error {
            Some(e) => format!("{}: {}", e.code, e.message),
            None => "unknown daemon error".to_string(),
        };
        anyhow::bail!("Daemon returned error for launch command: {msg}");
    }

    let Some(body) = response.payload else {
        return Ok(None);
    };
    serde_json::from_value::<LaunchResult>(body)
        .map(Some)
        .map_err(|e| anyhow::anyhow!("Failed to parse LaunchResult from daemon response: {e}"))
}
/// Returns `<home>/.atm/daemon`, where `<home>` comes from
/// `crate::home::get_home_dir`.
///
/// # Errors
///
/// Propagates the failure of `crate::home::get_home_dir`.
pub fn daemon_runtime_dir() -> anyhow::Result<PathBuf> {
    let base = crate::home::get_home_dir()?;
    let runtime_dir = base.join(".atm/daemon");
    Ok(runtime_dir)
}
/// Returns `<home>/.atm/daemon` for an explicitly supplied home directory.
pub fn daemon_runtime_dir_for(home: &std::path::Path) -> PathBuf {
    let mut runtime_dir = home.to_path_buf();
    runtime_dir.push(".atm/daemon");
    runtime_dir
}
/// Returns the path of `atm-daemon.sock` inside the daemon runtime directory.
///
/// # Errors
///
/// Propagates the failure of `daemon_runtime_dir`.
pub fn daemon_socket_path() -> anyhow::Result<PathBuf> {
    daemon_runtime_dir().map(|dir| dir.join("atm-daemon.sock"))
}
/// Returns the path of `atm-daemon.pid` inside the daemon runtime directory.
///
/// # Errors
///
/// Propagates the failure of `daemon_runtime_dir`.
pub fn daemon_pid_path() -> anyhow::Result<PathBuf> {
    daemon_runtime_dir().map(|dir| dir.join("atm-daemon.pid"))
}
/// Returns the path of `status.json` inside the daemon runtime directory.
///
/// # Errors
///
/// Propagates the failure of `daemon_runtime_dir`.
pub fn daemon_status_path() -> anyhow::Result<PathBuf> {
    daemon_runtime_dir().map(|dir| dir.join("status.json"))
}
/// Returns the path of `status.json` inside the daemon runtime directory of
/// the given home directory.
pub fn daemon_status_path_for(home: &std::path::Path) -> PathBuf {
    let mut status_path = daemon_runtime_dir_for(home);
    status_path.push("status.json");
    status_path
}
/// Returns the path of `daemon.lock` inside the daemon runtime directory.
///
/// # Errors
///
/// Propagates the failure of `daemon_runtime_dir`.
pub fn daemon_lock_path() -> anyhow::Result<PathBuf> {
    daemon_runtime_dir().map(|dir| dir.join("daemon.lock"))
}
/// Returns the path of `daemon.lock.meta.json` inside the daemon runtime
/// directory.
///
/// # Errors
///
/// Propagates the failure of `daemon_runtime_dir`.
pub fn daemon_lock_metadata_path() -> anyhow::Result<PathBuf> {
    daemon_runtime_dir().map(|dir| dir.join("daemon.lock.meta.json"))
}
/// Returns the path of `daemon.lock.meta.json` inside the daemon runtime
/// directory of the given home directory.
pub fn daemon_lock_metadata_path_for(home: &std::path::Path) -> PathBuf {
    let mut metadata_path = daemon_runtime_dir_for(home);
    metadata_path.push("daemon.lock.meta.json");
    metadata_path
}
/// Returns the path of `daemon-start.lock` inside the daemon runtime
/// directory.
///
/// # Errors
///
/// Propagates the failure of `daemon_runtime_dir`.
pub fn daemon_start_lock_path() -> anyhow::Result<PathBuf> {
    daemon_runtime_dir().map(|dir| dir.join("daemon-start.lock"))
}
/// Returns the path of `dedup.jsonl` inside the daemon runtime directory.
///
/// # Errors
///
/// Propagates the failure of `daemon_runtime_dir`.
pub fn daemon_dedup_path() -> anyhow::Result<PathBuf> {
    daemon_runtime_dir().map(|dir| dir.join("dedup.jsonl"))
}
/// Returns the path of `gh-monitor-health.json` inside the daemon runtime
/// directory.
///
/// # Errors
///
/// Propagates the failure of `daemon_runtime_dir`.
pub fn daemon_gh_monitor_health_path() -> anyhow::Result<PathBuf> {
    daemon_runtime_dir().map(|dir| dir.join("gh-monitor-health.json"))
}
/// Returns the path of `gh-monitor-health.json` inside the daemon runtime
/// directory of the given home directory.
pub fn daemon_gh_monitor_health_path_for(home: &std::path::Path) -> PathBuf {
    let mut health_path = daemon_runtime_dir_for(home);
    health_path.push("gh-monitor-health.json");
    health_path
}
/// Writes a `DaemonLockMetadata` record for the current process into the lock
/// metadata file under `home`.
///
/// The record is first written to a sibling `.json.tmp` file and then renamed
/// over the final path. The parent directory is created when missing.
///
/// # Errors
///
/// Fails when the directory cannot be created, the record cannot be
/// serialized, or the temporary file cannot be written or renamed.
pub fn write_daemon_lock_metadata(home: &std::path::Path, version: &str) -> anyhow::Result<()> {
    let metadata_path = daemon_lock_metadata_path_for(home);
    if let Some(parent) = metadata_path.parent() {
        std::fs::create_dir_all(parent)?;
    }

    // Fall back to a placeholder when the executable path cannot be resolved.
    let executable_path = match std::env::current_exe().and_then(std::fs::canonicalize) {
        Ok(resolved) => resolved.to_string_lossy().to_string(),
        Err(_) => "<unknown>".to_string(),
    };

    // Fall back to the path as given when it cannot be canonicalized.
    let resolved_home = match std::fs::canonicalize(home) {
        Ok(resolved) => resolved,
        Err(_) => home.to_path_buf(),
    };
    let home_scope = resolved_home.to_string_lossy().to_string();

    let metadata = DaemonLockMetadata {
        pid: std::process::id(),
        executable_path,
        home_scope,
        version: version.to_string(),
        written_at: chrono::Utc::now().to_rfc3339(),
    };
    let encoded = serde_json::to_vec_pretty(&metadata)?;

    let staging_path = metadata_path.with_extension("json.tmp");
    std::fs::write(&staging_path, encoded)?;
    std::fs::rename(staging_path, metadata_path)?;
    Ok(())
}
/// Reports whether the PID recorded in the daemon PID file belongs to a live
/// process.
///
/// Returns `false` when the PID file path cannot be resolved, the file cannot
/// be read, or its contents are not a positive integer. On non-Unix targets
/// this always returns `false`.
pub fn daemon_is_running() -> bool {
    #[cfg(unix)]
    {
        let Ok(pid_path) = daemon_pid_path() else {
            return false;
        };
        let Ok(content) = std::fs::read_to_string(&pid_path) else {
            return false;
        };
        match content.trim().parse::<i32>() {
            // Zero and negative values are never a valid daemon PID; in
            // kill(2) they address process groups, so a corrupt PID file must
            // not be probed as if it named a single process.
            Ok(pid) if pid > 0 => pid_alive(pid),
            _ => false,
        }
    }
    #[cfg(not(unix))]
    {
        false
    }
}
/// Ensures the daemon is running by delegating to `ensure_daemon_running_unix`
/// on Unix; on other targets this is a no-op that returns `Ok(())`.
///
/// # Errors
///
/// Propagates the failure of `ensure_daemon_running_unix`.
pub fn ensure_daemon_running() -> anyhow::Result<()> {
    #[cfg(unix)]
    return ensure_daemon_running_unix();
    #[cfg(not(unix))]
    return Ok(());
}
/// Sends `request` to the daemon with a 500 ms read timeout.
///
/// On Unix this delegates to `query_daemon_unix`; on other targets it returns
/// `Ok(None)` without doing anything.
///
/// # Errors
///
/// Propagates the failure of `query_daemon_unix`.
pub fn query_daemon(request: &SocketRequest) -> anyhow::Result<Option<SocketResponse>> {
    #[cfg(unix)]
    return query_daemon_unix(request, std::time::Duration::from_millis(500));
    #[cfg(not(unix))]
    return Ok(None);
}
/// Sends `request` to the daemon using the caller-supplied `read_timeout`.
///
/// On Unix this delegates to `query_daemon_unix`; on other targets it returns
/// `Ok(None)` without doing anything.
///
/// # Errors
///
/// Propagates the failure of `query_daemon_unix`.
pub fn query_daemon_with_timeout(
    request: &SocketRequest,
    read_timeout: std::time::Duration,
) -> anyhow::Result<Option<SocketResponse>> {
    #[cfg(unix)]
    return query_daemon_unix(request, read_timeout);
    #[cfg(not(unix))]
    return Ok(None);
}
/// Asks the daemon for the state of `agent` in `team` via the `agent-state`
/// command.
///
/// Returns `Ok(None)` when there is no response, the status is not ok, the
/// payload is missing, or the payload does not decode as `AgentStateInfo`.
///
/// # Errors
///
/// Propagates the failure of `query_daemon`.
pub fn query_agent_state(agent: &str, team: &str) -> anyhow::Result<Option<AgentStateInfo>> {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("agent-state"),
        payload: serde_json::json!({ "agent": agent, "team": team }),
    };

    let Some(response) = query_daemon(&request)? else {
        return Ok(None);
    };
    if !response.is_ok() {
        return Ok(None);
    }

    // Decode failures are deliberately folded into `None`.
    let info = response
        .payload
        .and_then(|body| serde_json::from_value::<AgentStateInfo>(body).ok());
    Ok(info)
}
/// Sends a `subscribe` command registering `subscriber` for `events` of
/// `agent` in `team`, and returns the raw daemon response.
///
/// # Errors
///
/// Propagates the failure of `query_daemon`.
pub fn subscribe_to_agent(
    subscriber: &str,
    agent: &str,
    team: &str,
    events: &[String],
) -> anyhow::Result<Option<SocketResponse>> {
    let request_id = new_request_id();
    let payload = serde_json::json!({
        "subscriber": subscriber,
        "agent": agent,
        "team": team,
        "events": events,
    });
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id,
        command: String::from("subscribe"),
        payload,
    };
    query_daemon(&request)
}
/// Sends an `unsubscribe` command for `subscriber` on `agent` in `team`, and
/// returns the raw daemon response.
///
/// # Errors
///
/// Propagates the failure of `query_daemon`.
pub fn unsubscribe_from_agent(
    subscriber: &str,
    agent: &str,
    team: &str,
) -> anyhow::Result<Option<SocketResponse>> {
    let request_id = new_request_id();
    let payload = serde_json::json!({
        "subscriber": subscriber,
        "agent": agent,
        "team": team,
    });
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id,
        command: String::from("unsubscribe"),
        payload,
    };
    query_daemon(&request)
}
/// Lists agents via the `list-agents` command with an empty object payload.
///
/// Returns `Ok(None)` when there is no response, the status is not ok, the
/// payload is missing, or the payload does not decode as a list of
/// `AgentSummary`.
///
/// # Errors
///
/// Propagates the failure of `query_daemon`.
pub fn query_list_agents() -> anyhow::Result<Option<Vec<AgentSummary>>> {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("list-agents"),
        payload: serde_json::Value::Object(Default::default()),
    };

    let agents = match query_daemon(&request)? {
        Some(response) if response.is_ok() => response
            .payload
            .and_then(|body| serde_json::from_value::<Vec<AgentSummary>>(body).ok()),
        _ => None,
    };
    Ok(agents)
}
/// Lists the agents of `team` via the `list-agents` command.
///
/// Returns `Ok(None)` when there is no response, the status is not ok, the
/// payload is missing, or the payload does not decode as a list of
/// `AgentSummary`.
///
/// # Errors
///
/// Propagates the failure of `query_daemon`.
pub fn query_list_agents_for_team(team: &str) -> anyhow::Result<Option<Vec<AgentSummary>>> {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("list-agents"),
        payload: serde_json::json!({ "team": team }),
    };

    let agents = match query_daemon(&request)? {
        Some(response) if response.is_ok() => response
            .payload
            .and_then(|body| serde_json::from_value::<Vec<AgentSummary>>(body).ok()),
        _ => None,
    };
    Ok(agents)
}
pub fn query_team_member_states(team: &str) -> anyhow::Result<Option<Vec<CanonicalMemberState>>> {
let request = SocketRequest {
version: PROTOCOL_VERSION,
request_id: new_request_id(),
command: "list-agents".to_string(),
payload: serde_json::json!({ "team": team }),
};
let response = match query_daemon(&request)? {
Some(r) => r,
None => return Ok(None),
};
if !response.is_ok() {
return Ok(None);
}
let payload = match response.payload {
Some(p) => p,
None => return Ok(None),
};
decode_canonical_member_states_payload(payload).map(Some)
}
/// Decodes a `list-agents` payload into a list of `CanonicalMemberState`.
///
/// # Errors
///
/// Fails with a descriptive message when the JSON does not match the expected
/// shape.
fn decode_canonical_member_states_payload(
    payload: serde_json::Value,
) -> anyhow::Result<Vec<CanonicalMemberState>> {
    match serde_json::from_value::<Vec<CanonicalMemberState>>(payload) {
        Ok(states) => Ok(states),
        Err(err) => Err(anyhow::anyhow!(
            "invalid canonical member-state payload from daemon list-agents(team): {err}"
        )),
    }
}
/// Payload that `query_agent_pane` decodes from the `agent-pane` command.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentPaneInfo {
/// Pane identifier reported by the daemon.
pub pane_id: String,
/// Log path reported by the daemon.
pub log_path: String,
}
/// Asks the daemon for the pane information of `agent` via the `agent-pane`
/// command.
///
/// Returns `Ok(None)` when there is no response, the status is not ok, the
/// payload is missing, or the payload does not decode as `AgentPaneInfo`.
///
/// # Errors
///
/// Propagates the failure of `query_daemon`.
pub fn query_agent_pane(agent: &str) -> anyhow::Result<Option<AgentPaneInfo>> {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("agent-pane"),
        payload: serde_json::json!({ "agent": agent }),
    };

    let Some(response) = query_daemon(&request)? else {
        return Ok(None);
    };
    if !response.is_ok() {
        return Ok(None);
    }

    let pane = response
        .payload
        .and_then(|body| serde_json::from_value::<AgentPaneInfo>(body).ok());
    Ok(pane)
}
/// Payload that `query_session` and `query_session_for_team` decode from the
/// `session-query` and `session-query-team` commands.
///
/// Every optional field is `None` when absent from the input and is omitted
/// from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionQueryResult {
/// Session identifier.
pub session_id: String,
/// Process id associated with the session.
pub process_id: u32,
/// Liveness flag reported by the daemon.
pub alive: bool,
/// NOTE(review): presumably a timestamp of the last observation — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_seen_at: Option<String>,
/// Optional runtime name.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub runtime: Option<String>,
/// Optional runtime-specific session identifier.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub runtime_session_id: Option<String>,
/// Optional pane identifier.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pane_id: Option<String>,
/// Optional runtime home value.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub runtime_home: Option<String>,
}
/// Result of `register_hint`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegisterHintOutcome {
/// The daemon answered with an ok status.
Registered,
/// `query_daemon` produced no response.
DaemonUnavailable,
/// The daemon answered with the error code `"UNKNOWN_COMMAND"`.
UnsupportedDaemon,
}
/// Kind of target referenced by the gh-monitor requests and status.
///
/// Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum GhMonitorTargetKind {
/// Serialized as `"pr"`.
Pr,
/// Serialized as `"workflow"`.
Workflow,
/// Serialized as `"run"`.
Run,
}
/// Payload of the `gh-monitor` command sent by `gh_monitor`.
///
/// Optional fields are `None` when absent from the input and are omitted from
/// serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GhMonitorRequest {
/// Team name.
pub team: String,
/// Kind of the target.
pub target_kind: GhMonitorTargetKind,
/// Target identifier; its format depends on `target_kind` and is not
/// visible here.
pub target: String,
/// Optional reference string.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reference: Option<String>,
/// Optional start timeout in seconds; `gh_monitor` assumes 120 when unset
/// while computing its socket read timeout.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub start_timeout_secs: Option<u64>,
/// Optional working directory used for configuration lookup.
/// NOTE(review): inferred from the field name — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config_cwd: Option<String>,
}
/// Payload of the `gh-status` command sent by `gh_status`.
///
/// Optional fields are `None` when absent from the input and are omitted from
/// serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GhStatusRequest {
/// Team name.
pub team: String,
/// Kind of the target.
pub target_kind: GhMonitorTargetKind,
/// Target identifier; its format depends on `target_kind` and is not
/// visible here.
pub target: String,
/// Optional reference string.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reference: Option<String>,
/// Optional working directory used for configuration lookup.
/// NOTE(review): inferred from the field name — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config_cwd: Option<String>,
}
/// Action carried by a `GhMonitorControlRequest`.
///
/// Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum GhMonitorLifecycleAction {
/// Serialized as `"start"`.
Start,
/// Serialized as `"stop"`.
Stop,
/// Serialized as `"restart"`.
Restart,
}
/// Payload of the `gh-monitor-control` command sent by `gh_monitor_control`.
///
/// Optional fields are `None` when absent from the input and are omitted from
/// serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GhMonitorControlRequest {
/// Team name.
pub team: String,
/// Lifecycle action to perform.
pub action: GhMonitorLifecycleAction,
/// Optional drain timeout in seconds; `gh_monitor_control` assumes 30 when
/// unset while computing its socket read timeout.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub drain_timeout_secs: Option<u64>,
/// Optional working directory used for configuration lookup.
/// NOTE(review): inferred from the field name — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config_cwd: Option<String>,
}
/// Payload decoded by `decode_gh_monitor_health_response` for the
/// `gh-monitor-control` and `gh-monitor-health` commands.
///
/// Optional fields are `None` when absent from the input and are omitted from
/// serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GhMonitorHealth {
/// Team name.
pub team: String,
/// Defaults to `false` when absent from the input.
#[serde(default)]
pub configured: bool,
/// Defaults to `false` when absent from the input.
#[serde(default)]
pub enabled: bool,
/// Optional configuration source description.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config_source: Option<String>,
/// Optional configuration path.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config_path: Option<String>,
/// Lifecycle state name; the set of values is defined by the daemon.
pub lifecycle_state: String,
/// Availability state name; the set of values is defined by the daemon.
pub availability_state: String,
/// NOTE(review): presumably the number of in-flight monitor operations —
/// confirm against the daemon side.
pub in_flight: u64,
/// Update time as a string; the format is not visible here.
pub updated_at: String,
/// Optional human-readable message.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
}
/// Payload decoded by `decode_gh_monitor_response` for the `gh-monitor` and
/// `gh-status` commands.
///
/// Optional fields are `None` when absent from the input and are omitted from
/// serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GhMonitorStatus {
/// Team name.
pub team: String,
/// Defaults to `false` when absent from the input.
#[serde(default)]
pub configured: bool,
/// Defaults to `false` when absent from the input.
#[serde(default)]
pub enabled: bool,
/// Optional configuration source description.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config_source: Option<String>,
/// Optional configuration path.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config_path: Option<String>,
/// Kind of the monitored target.
pub target_kind: GhMonitorTargetKind,
/// Target identifier.
pub target: String,
/// State name; the set of values is defined by the daemon.
pub state: String,
/// Optional numeric run id.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub run_id: Option<u64>,
/// Optional reference string.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reference: Option<String>,
/// Update time as a string; the format is not visible here.
pub updated_at: String,
/// Optional human-readable message.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
}
/// Looks up the session registered under `name` via the `session-query`
/// command.
///
/// Returns `Ok(None)` when there is no response, the status is not ok, the
/// payload is missing, or the payload does not decode as `SessionQueryResult`.
///
/// # Errors
///
/// Propagates the failure of `query_daemon`.
pub fn query_session(name: &str) -> anyhow::Result<Option<SessionQueryResult>> {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("session-query"),
        payload: serde_json::json!({ "name": name }),
    };

    let session = match query_daemon(&request)? {
        Some(response) if response.is_ok() => response
            .payload
            .and_then(|body| serde_json::from_value::<SessionQueryResult>(body).ok()),
        _ => None,
    };
    Ok(session)
}
/// Looks up the session registered under `name` within `team` via the
/// `session-query-team` command.
///
/// Returns `Ok(None)` when there is no response, the status is not ok, the
/// payload is missing, or the payload does not decode as `SessionQueryResult`.
///
/// # Errors
///
/// Propagates the failure of `query_daemon`.
pub fn query_session_for_team(
    team: &str,
    name: &str,
) -> anyhow::Result<Option<SessionQueryResult>> {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("session-query-team"),
        payload: serde_json::json!({ "team": team, "name": name }),
    };

    let session = match query_daemon(&request)? {
        Some(response) if response.is_ok() => response
            .payload
            .and_then(|body| serde_json::from_value::<SessionQueryResult>(body).ok()),
        _ => None,
    };
    Ok(session)
}
/// Asks the daemon for the stream state of `agent` via the
/// `agent-stream-state` command.
///
/// Returns `Ok(None)` when there is no response, the status is not ok, the
/// payload is missing, or the payload does not decode as `AgentStreamState`.
///
/// # Errors
///
/// Propagates the failure of `query_daemon`.
pub fn query_agent_stream_state(
    agent: &str,
) -> anyhow::Result<Option<crate::daemon_stream::AgentStreamState>> {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("agent-stream-state"),
        payload: serde_json::json!({ "agent": agent }),
    };

    let Some(response) = query_daemon(&request)? else {
        return Ok(None);
    };
    if !response.is_ok() {
        return Ok(None);
    }

    let stream_state = response.payload.and_then(|body| {
        serde_json::from_value::<crate::daemon_stream::AgentStreamState>(body).ok()
    });
    Ok(stream_state)
}
/// Handle for a stream-event subscription returned by
/// `subscribe_stream_events`.
///
/// Dropping the handle sets the shared `stop` flag to `true`.
pub struct StreamSubscription {
/// Receiving end of the channel that delivers `DaemonStreamEvent` values.
pub rx: std::sync::mpsc::Receiver<crate::daemon_stream::DaemonStreamEvent>,
// Shared stop flag raised on drop.
// NOTE(review): presumably polled by the producer side — confirm.
stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
impl Drop for StreamSubscription {
    /// Raises the shared stop flag (relaxed ordering) when the subscription
    /// handle goes away.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        let stop_flag = &self.stop;
        stop_flag.store(true, Ordering::Relaxed);
    }
}
/// Opens a stream-event subscription by delegating to
/// `subscribe_stream_events_unix` on Unix; on other targets it returns
/// `Ok(None)`.
///
/// # Errors
///
/// Propagates the failure of `subscribe_stream_events_unix`.
pub fn subscribe_stream_events() -> anyhow::Result<Option<StreamSubscription>> {
    #[cfg(unix)]
    return subscribe_stream_events_unix();
    #[cfg(not(unix))]
    return Ok(None);
}
pub fn send_control(
request: &crate::control::ControlRequest,
) -> anyhow::Result<crate::control::ControlAck> {
let payload = serde_json::to_value(request)
.map_err(|e| anyhow::anyhow!("Failed to serialize ControlRequest: {e}"))?;
let socket_request = SocketRequest {
version: PROTOCOL_VERSION,
request_id: format!(
"sock-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos()
),
command: "control".to_string(),
payload,
};
let response = match query_daemon(&socket_request)? {
Some(r) => r,
None => anyhow::bail!("Daemon not reachable (socket not found or connection refused)"),
};
if !response.is_ok() {
let msg = response
.error
.map(|e| format!("{}: {}", e.code, e.message))
.unwrap_or_else(|| "unknown daemon error".to_string());
anyhow::bail!("Daemon returned error for control command: {msg}");
}
let payload = response
.payload
.ok_or_else(|| anyhow::anyhow!("Daemon returned ok status but no payload"))?;
serde_json::from_value::<crate::control::ControlAck>(payload)
.map_err(|e| anyhow::anyhow!("Failed to parse ControlAck from daemon response: {e}"))
}
/// Sends a `register-hint` command describing a running agent session.
///
/// The payload also carries an `identity` entry taken from the
/// `ATM_IDENTITY` environment variable (trimmed; `null` when unset or blank).
///
/// Returns `DaemonUnavailable` when `query_daemon` yields no response;
/// otherwise the outcome is decided by `decode_register_hint_response`.
///
/// # Errors
///
/// Propagates failures of `query_daemon` and `decode_register_hint_response`.
#[allow(clippy::too_many_arguments)]
pub fn register_hint(
    team: &str,
    agent: &str,
    session_id: &str,
    process_id: u32,
    runtime: Option<&str>,
    runtime_session_id: Option<&str>,
    pane_id: Option<&str>,
    runtime_home: Option<&str>,
) -> anyhow::Result<RegisterHintOutcome> {
    let request_id = new_request_id();

    let identity = match std::env::var("ATM_IDENTITY") {
        Ok(raw) => Some(raw.trim().to_string()).filter(|v| !v.is_empty()),
        Err(_) => None,
    };
    let payload = serde_json::json!({
        "team": team,
        "agent": agent,
        "session_id": session_id,
        "process_id": process_id,
        "runtime": runtime,
        "runtime_session_id": runtime_session_id,
        "pane_id": pane_id,
        "runtime_home": runtime_home,
        "identity": identity,
    });

    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id,
        command: String::from("register-hint"),
        payload,
    };

    match query_daemon(&request)? {
        Some(response) => decode_register_hint_response(response),
        None => Ok(RegisterHintOutcome::DaemonUnavailable),
    }
}
/// Sends a `gh-monitor` command and returns the resulting status.
///
/// The socket read timeout is the requested start timeout (120 s when unset)
/// plus 30 s, capped at 600 s. Returns `Ok(None)` when the daemon yields no
/// response.
///
/// # Errors
///
/// Fails when `request` cannot be serialized, when the daemon query fails, or
/// when `decode_gh_monitor_response` rejects the response.
pub fn gh_monitor(request: &GhMonitorRequest) -> anyhow::Result<Option<GhMonitorStatus>> {
    let socket_request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: "gh-monitor".to_string(),
        payload: serde_json::to_value(request)?,
    };
    let start_timeout_secs = request.start_timeout_secs.unwrap_or(120);
    // Saturating add: a caller-supplied timeout near `u64::MAX` must clamp to
    // the 600 s cap instead of overflowing (panic in debug builds, wrap-around
    // to a tiny timeout in release builds).
    let read_timeout_secs = start_timeout_secs.saturating_add(30).min(600);
    let read_timeout = std::time::Duration::from_secs(read_timeout_secs);
    let response = match query_daemon_with_timeout(&socket_request, read_timeout)? {
        Some(r) => r,
        None => return Ok(None),
    };
    decode_gh_monitor_response(response).map(Some)
}
/// Sends a `gh-status` command and returns the resulting status.
///
/// Returns `Ok(None)` when the daemon yields no response.
///
/// # Errors
///
/// Fails when `request` cannot be serialized, when `query_daemon` fails, or
/// when `decode_gh_monitor_response` rejects the response.
pub fn gh_status(request: &GhStatusRequest) -> anyhow::Result<Option<GhMonitorStatus>> {
    let socket_request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: String::from("gh-status"),
        payload: serde_json::to_value(request)?,
    };

    let Some(response) = query_daemon(&socket_request)? else {
        return Ok(None);
    };
    let status = decode_gh_monitor_response(response)?;
    Ok(Some(status))
}
/// Sends a `gh-monitor-control` command and returns the resulting health
/// record.
///
/// The socket read timeout is the requested drain timeout (30 s when unset)
/// plus 30 s, capped at 600 s. Returns `Ok(None)` when the daemon yields no
/// response.
///
/// # Errors
///
/// Fails when `request` cannot be serialized, when the daemon query fails, or
/// when `decode_gh_monitor_health_response` rejects the response.
pub fn gh_monitor_control(
    request: &GhMonitorControlRequest,
) -> anyhow::Result<Option<GhMonitorHealth>> {
    let socket_request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: "gh-monitor-control".to_string(),
        payload: serde_json::to_value(request)?,
    };
    let drain_timeout_secs = request.drain_timeout_secs.unwrap_or(30);
    // Saturating add: a caller-supplied timeout near `u64::MAX` must clamp to
    // the 600 s cap instead of overflowing (panic in debug builds, wrap-around
    // to a tiny timeout in release builds).
    let read_timeout_secs = drain_timeout_secs.saturating_add(30).min(600);
    let read_timeout = std::time::Duration::from_secs(read_timeout_secs);
    let response = match query_daemon_with_timeout(&socket_request, read_timeout)? {
        Some(r) => r,
        None => return Ok(None),
    };
    decode_gh_monitor_health_response(response).map(Some)
}
/// Queries the gh-monitor health of `team` without a configuration directory;
/// shorthand for `gh_monitor_health_with_context(team, None)`.
///
/// # Errors
///
/// Propagates the failure of `gh_monitor_health_with_context`.
pub fn gh_monitor_health(team: &str) -> anyhow::Result<Option<GhMonitorHealth>> {
    let no_config_cwd: Option<String> = None;
    gh_monitor_health_with_context(team, no_config_cwd)
}
/// Queries the gh-monitor health of `team` via the `gh-monitor-health`
/// command, passing `config_cwd` along in the payload.
///
/// Returns `Ok(None)` when the daemon yields no response.
///
/// # Errors
///
/// Fails when `query_daemon` fails or when
/// `decode_gh_monitor_health_response` rejects the response.
pub fn gh_monitor_health_with_context(
    team: &str,
    config_cwd: Option<String>,
) -> anyhow::Result<Option<GhMonitorHealth>> {
    let request_id = new_request_id();
    let payload = serde_json::json!({
        "team": team,
        "config_cwd": config_cwd,
    });
    let socket_request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id,
        command: String::from("gh-monitor-health"),
        payload,
    };

    let Some(response) = query_daemon(&socket_request)? else {
        return Ok(None);
    };
    let health = decode_gh_monitor_health_response(response)?;
    Ok(Some(health))
}
fn decode_gh_monitor_response(response: SocketResponse) -> anyhow::Result<GhMonitorStatus> {
if !response.is_ok() {
let Some(err) = response.error else {
anyhow::bail!("Daemon returned gh-monitor error status without error payload");
};
anyhow::bail!(
"Daemon returned error for {} command: {}: {}",
response.request_id,
err.code,
err.message
);
}
let payload = response
.payload
.ok_or_else(|| anyhow::anyhow!("Daemon returned ok status but no payload"))?;
serde_json::from_value::<GhMonitorStatus>(payload)
.map_err(|e| anyhow::anyhow!("Failed to parse GhMonitorStatus from daemon response: {e}"))
}
fn decode_gh_monitor_health_response(response: SocketResponse) -> anyhow::Result<GhMonitorHealth> {
if !response.is_ok() {
let Some(err) = response.error else {
anyhow::bail!("Daemon returned gh-monitor health error status without error payload");
};
anyhow::bail!(
"Daemon returned error for {} command: {}: {}",
response.request_id,
err.code,
err.message
);
}
let payload = response
.payload
.ok_or_else(|| anyhow::anyhow!("Daemon returned ok status but no payload"))?;
serde_json::from_value::<GhMonitorHealth>(payload)
.map_err(|e| anyhow::anyhow!("Failed to parse GhMonitorHealth from daemon response: {e}"))
}
fn decode_register_hint_response(response: SocketResponse) -> anyhow::Result<RegisterHintOutcome> {
if response.is_ok() {
return Ok(RegisterHintOutcome::Registered);
}
let Some(err) = response.error else {
anyhow::bail!("Daemon returned register-hint error status without error payload");
};
if err.code == "UNKNOWN_COMMAND" {
return Ok(RegisterHintOutcome::UnsupportedDaemon);
}
anyhow::bail!(
"Daemon returned error for register-hint command: {}: {}",
err.code,
err.message
)
}
/// Builds a request identifier of the form `req-<pid>-<epoch nanos>-<seq>`.
///
/// The timestamp is the full number of nanoseconds since the Unix epoch
/// (zero if the clock reads earlier than the epoch). A per-process counter is
/// appended so that two ids produced by the same process never collide, even
/// when the clock does not advance between calls.
fn new_request_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Only uniqueness of the counter values matters, so relaxed ordering is
    // sufficient.
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let id = std::process::id();
    format!("req-{id}-{nanos}-{seq}")
}
/// Sends `request` to the daemon socket as one JSON line and reads one JSON
/// line back.
///
/// Connection handling when the first connect attempt fails:
/// - with auto-start enabled, the daemon is started via
///   `ensure_daemon_running_unix` and the connect is retried every 100 ms for
///   up to 5 seconds;
/// - with auto-start disabled, up to three more attempts are made (100 ms
///   apart) if the socket file exists, otherwise `Ok(None)` is returned.
///
/// Returns `Ok(None)` when the response cannot be read, is empty, or is not a
/// valid `SocketResponse`.
///
/// # Errors
///
/// Fails when the socket path cannot be resolved, when auto-start fails or the
/// socket stays unavailable afterwards, when the request cannot be serialized,
/// or when writing the request fails.
#[cfg(unix)]
fn query_daemon_unix(
    request: &SocketRequest,
    read_timeout: std::time::Duration,
) -> anyhow::Result<Option<SocketResponse>> {
    use std::io::{BufRead, BufReader, Write};
    use std::os::unix::net::UnixStream;
    use std::time::{Duration, Instant};

    let socket_path = daemon_socket_path()?;

    let stream = if let Ok(first_attempt) = UnixStream::connect(&socket_path) {
        first_attempt
    } else if daemon_autostart_enabled() {
        ensure_daemon_running_unix()?;
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            let e = match UnixStream::connect(&socket_path) {
                Ok(connected) => break connected,
                Err(e) => e,
            };
            if Instant::now() >= deadline {
                anyhow::bail!(
                    "daemon auto-start attempted but socket remained unavailable at {}: {e}",
                    socket_path.display()
                )
            }
            std::thread::sleep(Duration::from_millis(100));
        }
    } else {
        if !socket_path.exists() {
            return Ok(None);
        }
        // The socket file exists: give the daemon a few short retries.
        let retried = (0..3).find_map(|_| match UnixStream::connect(&socket_path) {
            Ok(connected) => Some(connected),
            Err(_) => {
                std::thread::sleep(Duration::from_millis(100));
                None
            }
        });
        match retried {
            Some(connected) => connected,
            None => return Ok(None),
        }
    };

    // Timeout configuration is best-effort; failures are ignored.
    let _ = stream.set_read_timeout(Some(read_timeout));
    let _ = stream.set_write_timeout(Some(Duration::from_millis(500)));

    let request_line = serde_json::to_string(request)?;
    {
        let mut out = std::io::BufWriter::new(&stream);
        out.write_all(request_line.as_bytes())?;
        out.write_all(b"\n")?;
        out.flush()?;
    }

    let mut response_line = String::new();
    let read_outcome = BufReader::new(&stream).read_line(&mut response_line);
    if !matches!(read_outcome, Ok(n) if n > 0) {
        return Ok(None);
    }

    // An unparseable response is treated the same as no response.
    Ok(serde_json::from_str::<SocketResponse>(response_line.trim()).ok())
}
/// Reports whether daemon auto-start is enabled.
///
/// Auto-start is on unless the `ATM_DAEMON_AUTOSTART` environment variable,
/// trimmed and compared case-insensitively, is `0`, `false` or `no`. An unset
/// or unreadable variable counts as enabled.
#[cfg(unix)]
fn daemon_autostart_enabled() -> bool {
    match std::env::var("ATM_DAEMON_AUTOSTART") {
        Err(_) => true,
        Ok(raw) => {
            let normalized = raw.trim().to_ascii_lowercase();
            !["0", "false", "no"].contains(&normalized.as_str())
        }
    }
}
/// Chooses the daemon executable to launch.
///
/// Resolution order:
/// 1. a non-empty `ATM_DAEMON_BIN` environment variable, used verbatim;
/// 2. an `atm-daemon` file next to the current executable, if it exists;
/// 3. the bare name `atm-daemon`.
#[cfg(unix)]
fn resolve_daemon_binary() -> std::ffi::OsString {
    match std::env::var_os("ATM_DAEMON_BIN") {
        Some(override_bin) if !override_bin.is_empty() => return override_bin,
        _ => {}
    }

    let fallback = std::ffi::OsString::from("atm-daemon");
    let sibling = std::env::current_exe()
        .ok()
        .and_then(|exe| exe.parent().map(|dir| dir.join(&fallback)));
    match sibling {
        Some(candidate) if candidate.exists() => candidate.into_os_string(),
        _ => fallback,
    }
}
/// Makes sure a daemon is reachable, spawning one when necessary.
///
/// Steps, in order:
/// 1. Return immediately when auto-start is disabled.
/// 2. If a daemon appears to be running (PID file or connectable socket) and
///    no identity mismatch is detected, return; on a mismatch the daemon is
///    restarted via `restart_mismatched_daemon` and the function continues.
/// 3. Clean up stale runtime files and take the startup lock.
/// 4. Re-check the daemon under the lock.
/// 5. Spawn the daemon binary and poll for up to 5 seconds until it is
///    reachable.
///
/// Events are emitted through `emit_event_best_effort` for the attempt, and
/// for success, failure and timeout.
///
/// # Errors
///
/// Fails when the home directory or lock path cannot be resolved, when the
/// startup lock cannot be acquired, when the binary cannot be spawned, when
/// the child exits during startup, or when the daemon is still unreachable
/// after 5 seconds.
#[cfg(unix)]
fn ensure_daemon_running_unix() -> anyhow::Result<()> {
use crate::event_log::{EventFields, emit_event_best_effort};
use crate::io::InboxError;
use std::io::ErrorKind;
use std::io::Read;
use std::process::{Command, Stdio};
use std::time::{Duration, Instant};
// Auto-start disabled: report success without touching anything.
if !daemon_autostart_enabled() {
return Ok(());
}
let home = crate::home::get_home_dir()?;
// Fast path: a daemon that is already up is kept unless its identity does
// not match; after a mismatch restart, control falls through to the spawn
// logic below.
let daemon_running = daemon_is_running();
let socket_connectable = daemon_socket_connectable(&home);
if daemon_running || socket_connectable {
if let Some(reason) = detect_daemon_identity_mismatch(&home, socket_connectable) {
restart_mismatched_daemon(&home, &reason)?;
} else {
return Ok(());
}
}
cleanup_stale_daemon_runtime_files(&home);
let startup_lock_path = daemon_start_lock_path()?;
if let Some(parent) = startup_lock_path.parent() {
std::fs::create_dir_all(parent)?;
}
// The lock guard is held until the function returns.
// NOTE(review): presumably this serializes concurrent starters — confirm
// against `crate::io::lock::acquire_lock`.
let _startup_lock = match crate::io::lock::acquire_lock(&startup_lock_path, 3) {
Ok(lock) => Some(lock),
Err(InboxError::LockTimeout { .. }) => {
// The lock timed out: poll up to 10 times, 100 ms apart, for a daemon to
// come online before trying the lock once more.
for _ in 0..10 {
if daemon_is_running() || daemon_socket_connectable(&home) {
return Ok(());
}
std::thread::sleep(Duration::from_millis(100));
}
match crate::io::lock::acquire_lock(&startup_lock_path, 10) {
Ok(lock) => Some(lock),
Err(e) => anyhow::bail!(
"timed out waiting for daemon startup lock holder to bring daemon online: {} ({e})",
startup_lock_path.display()
),
}
}
Err(e) => anyhow::bail!(
"failed to acquire daemon startup lock {}: {e}",
startup_lock_path.display()
),
};
// Re-check under the lock: the daemon may have come up while waiting.
let daemon_running = daemon_is_running();
let socket_connectable = daemon_socket_connectable(&home);
if daemon_running || socket_connectable {
if let Some(reason) = detect_daemon_identity_mismatch(&home, socket_connectable) {
restart_mismatched_daemon(&home, &reason)?;
} else {
return Ok(());
}
}
let daemon_bin = resolve_daemon_binary();
emit_event_best_effort(EventFields {
level: "info",
source: "atm",
action: "daemon_autostart_attempt",
result: Some("attempt".to_string()),
target: Some(std::path::PathBuf::from(&daemon_bin).display().to_string()),
..Default::default()
});
// stderr is piped so that its tail can be reported if the child exits
// during startup; stdin and stdout are discarded.
let mut child = match Command::new(&daemon_bin)
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::piped())
.spawn()
{
Ok(child) => child,
Err(e) => {
let error = if e.kind() == ErrorKind::NotFound {
format!(
"failed to auto-start daemon: binary '{}' not found in PATH (or ATM_DAEMON_BIN override)",
std::path::PathBuf::from(&daemon_bin).display()
)
} else {
format!(
"failed to auto-start daemon via '{}': {e}",
std::path::PathBuf::from(&daemon_bin).display()
)
};
emit_event_best_effort(EventFields {
level: "error",
source: "atm",
action: "daemon_autostart_failure",
result: Some("spawn_error".to_string()),
target: Some(std::path::PathBuf::from(&daemon_bin).display().to_string()),
error: Some(error.clone()),
..Default::default()
});
anyhow::bail!("{error}");
}
};
// Poll every 100 ms for up to 5 seconds until the daemon is reachable or the
// child exits.
let deadline = Instant::now() + Duration::from_secs(5);
while Instant::now() < deadline {
if daemon_is_running() || daemon_socket_connectable(&home) {
emit_event_best_effort(EventFields {
level: "info",
source: "atm",
action: "daemon_autostart_success",
result: Some("ok".to_string()),
target: Some(std::path::PathBuf::from(&daemon_bin).display().to_string()),
..Default::default()
});
return Ok(());
}
if let Some(status) = child.try_wait()? {
// The child has exited: collect whatever it wrote to stderr.
let stderr_tail = child.stderr.take().and_then(|mut stderr| {
let mut buf = Vec::new();
stderr.read_to_end(&mut buf).ok()?;
if buf.is_empty() {
return None;
}
// Keep only the last 4096 bytes of stderr.
let trimmed = if buf.len() > 4096 {
&buf[buf.len() - 4096..]
} else {
&buf
};
let text = String::from_utf8_lossy(trimmed).trim().to_string();
if text.is_empty() { None } else { Some(text) }
});
let error = match stderr_tail {
Some(tail) => {
format!(
"daemon process exited during startup with status {status}; stderr_tail={tail}"
)
}
None => format!("daemon process exited during startup with status {status}"),
};
emit_event_best_effort(EventFields {
level: "error",
source: "atm",
action: "daemon_autostart_failure",
result: Some("process_exit".to_string()),
target: Some(std::path::PathBuf::from(&daemon_bin).display().to_string()),
error: Some(error.clone()),
..Default::default()
});
anyhow::bail!("{error}");
}
std::thread::sleep(Duration::from_millis(100));
}
// Timed out: the child is neither waited on nor killed here.
let socket_path = daemon_socket_path()?;
let pid_path = daemon_pid_path()?;
let timeout_error = format!(
"daemon startup timed out after 5s; pid_file_exists={}, socket_exists={}, pid_path={}, socket_path={}",
pid_path.exists(),
socket_path.exists(),
pid_path.display(),
socket_path.display()
);
// Both a warn-level timeout event and an error-level failure event are
// emitted for the same timeout.
emit_event_best_effort(EventFields {
level: "warn",
source: "atm",
action: "daemon_autostart_timeout",
result: Some("timeout".to_string()),
target: Some(std::path::PathBuf::from(&daemon_bin).display().to_string()),
error: Some(timeout_error.clone()),
..Default::default()
});
emit_event_best_effort(EventFields {
level: "error",
source: "atm",
action: "daemon_autostart_failure",
result: Some("timeout".to_string()),
target: Some(std::path::PathBuf::from(&daemon_bin).display().to_string()),
error: Some(timeout_error.clone()),
..Default::default()
});
anyhow::bail!("{timeout_error}")
}
/// Returns `true` when a connection to `<home>/.atm/daemon/atm-daemon.sock`
/// succeeds. The probe connection is closed immediately.
#[cfg(unix)]
fn daemon_socket_connectable(home: &std::path::Path) -> bool {
    use std::os::unix::net::UnixStream;

    let mut socket_path = home.to_path_buf();
    socket_path.push(".atm/daemon/atm-daemon.sock");
    match UnixStream::connect(&socket_path) {
        Ok(_probe) => true,
        Err(_) => false,
    }
}
/// Removes leftover daemon runtime files under `home`.
///
/// The PID file is removed when its state is `Dead`, `Missing` or
/// `Malformed`. The socket file is removed only when it exists, the PID state
/// is one of those three, and the socket is not connectable. Removal failures
/// are ignored.
#[cfg(unix)]
fn cleanup_stale_daemon_runtime_files(home: &std::path::Path) {
    let socket_path = home.join(".atm/daemon/atm-daemon.sock");
    let pid_path = home.join(".atm/daemon/atm-daemon.pid");

    // `Unreadable` and `Alive` leave the files untouched.
    let owner_gone = match read_daemon_pid_state(&pid_path) {
        PidState::Dead | PidState::Missing | PidState::Malformed => true,
        PidState::Unreadable | PidState::Alive => false,
    };

    if owner_gone {
        let _ = std::fs::remove_file(&pid_path);
    }
    if socket_path.exists() && owner_gone && !daemon_socket_connectable(home) {
        let _ = std::fs::remove_file(&socket_path);
    }
}
/// Classification of the daemon PID file produced by `read_daemon_pid_state`.
#[cfg(unix)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PidState {
/// The PID file does not exist.
Missing,
/// The file contents do not parse as a PID.
Malformed,
/// The file exists but could not be read.
Unreadable,
/// The recorded PID is not alive according to `pid_alive`.
Dead,
/// The recorded PID is alive according to `pid_alive`.
Alive,
}
/// Classifies the daemon PID file at `pid_path`.
///
/// Returns:
/// - `PidState::Missing` when the file does not exist,
/// - `PidState::Unreadable` when it exists but cannot be read,
/// - `PidState::Malformed` when the content is not a positive integer,
/// - `PidState::Alive` / `PidState::Dead` according to `pid_alive`.
#[cfg(unix)]
fn read_daemon_pid_state(pid_path: &std::path::Path) -> PidState {
    if !pid_path.exists() {
        return PidState::Missing;
    }
    let Ok(content) = std::fs::read_to_string(pid_path) else {
        return PidState::Unreadable;
    };
    // Zero and negative values are not process ids: `kill(2)` interprets them
    // as "caller's process group" / "every process" / "process group |pid|",
    // so probing them would wrongly report a live daemon.
    let pid = match content.trim().parse::<i32>() {
        Ok(pid) if pid > 0 => pid,
        _ => return PidState::Malformed,
    };
    if pid_alive(pid) {
        PidState::Alive
    } else {
        PidState::Dead
    }
}
/// Point-in-time view of the on-disk daemon identity evidence, consumed by
/// `evaluate_daemon_identity_mismatch`.
#[cfg(unix)]
#[derive(Debug, Clone, Default)]
struct DaemonIdentitySnapshot {
// PID parsed from the daemon PID file, if it was readable and numeric.
pid_from_file: Option<u32>,
// PID taken from the "pid" key of the daemon status JSON, if present.
pid_from_status: Option<u32>,
// Version string taken from the "version" key of the daemon status JSON.
version_from_status: Option<String>,
// Parsed lock metadata file, if it existed and deserialized.
metadata: Option<DaemonLockMetadata>,
// Whether the caller found the daemon socket connectable.
socket_connectable: bool,
}
/// Decides whether the daemon described by `snapshot` is the daemon we expect.
///
/// Returns `None` when no mismatch is detected, otherwise a human-readable
/// reason. Checks run in this order:
///
/// 1. No lock metadata: `None` if the socket is not connectable, otherwise a
///    "lock metadata missing" mismatch.
/// 2. The metadata PID must be a valid, live process id (`pid_alive_fn`).
/// 3. The PID file, when present, must agree with the metadata PID.
/// 4. A non-empty metadata `home_scope` must equal `expected_home`.
/// 5. When `pid_command_line_fn` yields a command line that can be compared,
///    it must match `expected_bin`.
/// 6. The status-file version, when present, must equal `expected_version`.
///
/// `pid_alive_fn` and `pid_command_line_fn` are injected so the decision can be
/// exercised without real processes.
#[cfg(unix)]
fn evaluate_daemon_identity_mismatch(
    snapshot: &DaemonIdentitySnapshot,
    expected_home: &str,
    expected_bin: &std::ffi::OsStr,
    expected_version: &str,
    pid_alive_fn: impl Fn(i32) -> bool,
    pid_command_line_fn: impl Fn(i32) -> Option<String>,
) -> Option<String> {
    let Some(meta) = snapshot.metadata.as_ref() else {
        if !snapshot.socket_connectable {
            return None;
        }
        return Some(
            "daemon identity mismatch: lock metadata missing (soft mismatch, restart required)"
                .to_string(),
        );
    };

    // Metadata is present from here on, so its PID is authoritative. A value
    // that does not fit a positive `i32` cannot name a process and must not be
    // handed to the probe (it would wrap into a process-group id).
    let live_pid = i32::try_from(meta.pid)
        .ok()
        .filter(|pid| *pid > 0 && pid_alive_fn(*pid));
    let Some(pid) = live_pid else {
        return Some(format!(
            "daemon identity mismatch: pid {} is not alive",
            meta.pid
        ));
    };

    if let Some(file_pid) = snapshot.pid_from_file.filter(|p| *p != meta.pid) {
        return Some(format!(
            "daemon identity mismatch: pid file ({file_pid}) != lock metadata ({})",
            meta.pid
        ));
    }

    if !meta.home_scope.is_empty() && meta.home_scope != expected_home {
        return Some(format!(
            "daemon identity mismatch: home scope '{}' != expected '{}'",
            meta.home_scope, expected_home
        ));
    }

    // An unavailable command line or an undecidable comparison is not treated
    // as a mismatch; only a definite "different binary" is.
    if let Some(cmdline) = pid_command_line_fn(pid) {
        if pid_command_matches_expected_binary(&cmdline, expected_bin) == Some(false) {
            return Some(format!(
                "daemon identity mismatch: running command '{}' != expected daemon binary '{}'",
                cmdline,
                std::path::PathBuf::from(expected_bin).display()
            ));
        }
    }

    match snapshot.version_from_status.as_deref() {
        Some(ver) if ver != expected_version => Some(format!(
            "daemon version mismatch: running={ver} expected={expected_version}"
        )),
        _ => None,
    }
}
/// Gathers daemon identity evidence from disk under `home` and evaluates it.
///
/// Reads the PID file, the status JSON ("pid" and "version" keys) and the lock
/// metadata file, then delegates to `evaluate_daemon_identity_mismatch` with
/// the canonicalized home path, the resolved daemon binary and this crate's
/// version. Returns the mismatch reason, or `None` when nothing is wrong.
///
/// If the metadata file cannot be read or parsed while a candidate PID is
/// alive, the read is retried once after 150 ms.
#[cfg(unix)]
fn detect_daemon_identity_mismatch(
    home: &std::path::Path,
    socket_connectable: bool,
) -> Option<String> {
    let pid_path = home.join(".atm/daemon/atm-daemon.pid");
    let status_path = daemon_status_path_for(home);
    let metadata_path = daemon_lock_metadata_path_for(home);

    let pid_from_file = std::fs::read_to_string(&pid_path)
        .ok()
        .and_then(|s| s.trim().parse::<u32>().ok());

    let status_json = std::fs::read_to_string(&status_path)
        .ok()
        .and_then(|content| serde_json::from_str::<serde_json::Value>(&content).ok());
    // Out-of-range values are dropped rather than truncated into some other pid.
    let pid_from_status = status_json
        .as_ref()
        .and_then(|json| json.get("pid").and_then(serde_json::Value::as_u64))
        .and_then(|pid| u32::try_from(pid).ok());
    let version_from_status = status_json
        .as_ref()
        .and_then(|json| json.get("version").and_then(serde_json::Value::as_str))
        .map(std::string::ToString::to_string);

    let read_metadata = || {
        std::fs::read_to_string(&metadata_path)
            .ok()
            .and_then(|raw| serde_json::from_str::<DaemonLockMetadata>(&raw).ok())
    };
    let mut metadata = read_metadata();
    if metadata.is_none() {
        let candidate_alive = pid_from_file
            .or(pid_from_status)
            .and_then(|pid| i32::try_from(pid).ok())
            .is_some_and(pid_alive);
        if candidate_alive {
            // Retry once after a short pause before concluding metadata is absent.
            std::thread::sleep(std::time::Duration::from_millis(150));
            metadata = read_metadata();
        }
    }

    let expected_home = std::fs::canonicalize(home)
        .unwrap_or_else(|_| home.to_path_buf())
        .to_string_lossy()
        .to_string();
    let expected_bin = resolve_daemon_binary();
    let snapshot = DaemonIdentitySnapshot {
        pid_from_file,
        pid_from_status,
        version_from_status,
        metadata,
        socket_connectable,
    };
    evaluate_daemon_identity_mismatch(
        &snapshot,
        &expected_home,
        expected_bin.as_os_str(),
        env!("CARGO_PKG_VERSION"),
        pid_alive,
        pid_command_line,
    )
}
/// Asks `ps` for the command line of process `pid`.
///
/// Runs `ps -p <pid> -o command=` and returns its trimmed stdout. Yields `None`
/// when `ps` cannot be run, exits unsuccessfully, or prints nothing.
#[cfg(unix)]
fn pid_command_line(pid: i32) -> Option<String> {
    let pid_arg = pid.to_string();
    let ps = std::process::Command::new("ps")
        .args(["-p", pid_arg.as_str(), "-o", "command="])
        .output()
        .ok()
        .filter(|out| out.status.success())?;
    let stdout = String::from_utf8_lossy(&ps.stdout);
    let cmdline = stdout.trim();
    (!cmdline.is_empty()).then(|| cmdline.to_string())
}
/// Compares the executable named in `cmdline` against `expected_bin`.
///
/// Only the first whitespace-separated token of `cmdline` is considered.
/// - `None` when `cmdline` has no token, `expected_bin` is empty, or
///   `expected_bin` is a single component without a file name.
/// - When `expected_bin` has more than one path component, both sides are
///   canonicalized (falling back to the path as given) and compared whole.
/// - Otherwise only the file names are compared.
#[cfg(unix)]
fn pid_command_matches_expected_binary(
    cmdline: &str,
    expected_bin: &std::ffi::OsStr,
) -> Option<bool> {
    use std::path::{Path, PathBuf};

    let running = Path::new(cmdline.split_whitespace().next()?);
    if expected_bin.is_empty() {
        return None;
    }
    let wanted = Path::new(expected_bin);

    let is_bare_name = wanted.components().count() <= 1;
    if is_bare_name {
        return wanted
            .file_name()
            .map(|name| running.file_name() == Some(name));
    }

    let resolve =
        |p: &Path| -> PathBuf { std::fs::canonicalize(p).unwrap_or_else(|_| p.to_path_buf()) };
    Some(resolve(wanted) == resolve(running))
}
/// Stops a daemon whose identity does not match and clears its runtime files.
///
/// Steps:
/// 1. Emit a `daemon_identity_restart` warning carrying `reason`.
/// 2. If the PID file names a live process, send SIGTERM, wait up to ~2 s,
///    then SIGKILL and wait up to ~2 s more. If it is still alive, log
///    `kill_incomplete` and continue anyway.
/// 3. Acquire the daemon lock, then remove the socket, PID file and status
///    file and run `cleanup_stale_daemon_runtime_files`.
///
/// # Errors
/// Fails only when the daemon lock cannot be acquired.
#[cfg(unix)]
fn restart_mismatched_daemon(home: &std::path::Path, reason: &str) -> anyhow::Result<()> {
    use crate::event_log::{EventFields, emit_event_best_effort};
    use std::time::Duration;

    const SIGTERM: i32 = 15;
    const SIGKILL: i32 = 9;
    const EXIT_POLLS: u32 = 20;
    const EXIT_POLL_INTERVAL: Duration = Duration::from_millis(100);

    let daemon_dir = home.join(".atm/daemon");

    // Only strictly positive values are process ids. Zero or negative values
    // would make `kill(2)` address a process group or every process, so a
    // corrupt PID file must never reach the signalling code below.
    let pid = std::fs::read_to_string(daemon_dir.join("atm-daemon.pid"))
        .ok()
        .and_then(|s| s.trim().parse::<i32>().ok())
        .filter(|pid| *pid > 0);

    emit_event_best_effort(EventFields {
        level: "warn",
        source: "atm",
        action: "daemon_identity_restart",
        result: Some("restart_attempt".to_string()),
        error: Some(reason.to_string()),
        ..Default::default()
    });

    // Polls until the process is gone; returns whether it exited in time.
    let wait_for_exit = |pid: i32| -> bool {
        for _ in 0..EXIT_POLLS {
            if !pid_alive(pid) {
                return true;
            }
            std::thread::sleep(EXIT_POLL_INTERVAL);
        }
        !pid_alive(pid)
    };

    if let Some(pid) = pid {
        if pid_alive(pid) {
            send_signal(pid, SIGTERM);
            let mut exited = wait_for_exit(pid);
            if !exited {
                send_signal(pid, SIGKILL);
                exited = wait_for_exit(pid);
            }
            if !exited {
                emit_event_best_effort(EventFields {
                    level: "warn",
                    source: "atm",
                    action: "daemon_identity_restart",
                    result: Some("kill_incomplete".to_string()),
                    error: Some(format!(
                        "stale daemon pid {pid} still alive after SIGTERM/SIGKILL; proceeding with runtime file replacement"
                    )),
                    ..Default::default()
                });
            }
        }
    }

    let lock_path = daemon_dir.join("daemon.lock");
    let _ = std::fs::create_dir_all(&daemon_dir);
    // Hold the lock for the whole cleanup so the files are not removed
    // concurrently with another holder of the same lock.
    let _lock_guard = crate::io::lock::acquire_lock(&lock_path, 5).map_err(|e| {
        anyhow::anyhow!(
            "failed to acquire daemon lock at {} before runtime cleanup: {e}",
            lock_path.display()
        )
    })?;

    for name in ["atm-daemon.sock", "atm-daemon.pid", "status.json"] {
        let _ = std::fs::remove_file(daemon_dir.join(name));
    }
    cleanup_stale_daemon_runtime_files(home);
    Ok(())
}
/// Sends signal `sig` to the single process `pid`, ignoring any failure.
///
/// Non-positive `pid` values are rejected without calling `kill(2)`: there they
/// mean "caller's process group" (0), "every process" (-1) or "process group
/// |pid|", none of which this helper is meant to address.
#[cfg(unix)]
fn send_signal(pid: i32, sig: i32) {
    unsafe extern "C" {
        fn kill(pid: i32, sig: i32) -> i32;
    }
    if pid <= 0 {
        return;
    }
    // SAFETY: `kill` takes two plain integers and has no memory-safety
    // preconditions; an invalid pid or signal only yields an error return.
    let _ = unsafe { kill(pid, sig) };
}
/// Opens a streaming subscription to the daemon over its Unix socket.
///
/// Sends a `stream-subscribe` request, waits for an acknowledgement line whose
/// JSON has `"status": "ok"` and `"streaming": true`, then spawns a thread
/// that forwards each subsequent JSON line parsed as a `DaemonStreamEvent`
/// through a channel.
///
/// Returns `Ok(None)` when the socket cannot be connected, the connection
/// closes before the ack, or the ack is not valid/affirmative. Returns `Err`
/// for socket path, serialization and write/read I/O failures.
///
/// The reader thread ends when the stop flag is set, the peer closes the
/// connection, a non-timeout read error occurs, or the receiver is dropped.
#[cfg(unix)]
fn subscribe_stream_events_unix() -> anyhow::Result<Option<StreamSubscription>> {
    use std::io::{BufRead, BufReader, ErrorKind, Write};
    use std::os::unix::net::UnixStream;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    let socket_path = daemon_socket_path()?;
    let Ok(mut stream) = UnixStream::connect(&socket_path) else {
        return Ok(None);
    };

    let req = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: new_request_id(),
        command: "stream-subscribe".to_string(),
        payload: serde_json::json!({}),
    };
    let req_line = serde_json::to_string(&req)?;
    stream.write_all(req_line.as_bytes())?;
    stream.write_all(b"\n")?;
    stream.flush()?;

    // One buffered reader serves both the ack and the event stream. A separate
    // throwaway reader for the ack could pull event bytes into its buffer and
    // lose them when dropped.
    let mut reader = BufReader::new(stream);

    let mut ack_line = String::new();
    if reader.read_line(&mut ack_line)? == 0 {
        return Ok(None);
    }
    let Ok(ack_json) = serde_json::from_str::<serde_json::Value>(ack_line.trim()) else {
        return Ok(None);
    };
    let ok = ack_json.get("status").and_then(|v| v.as_str()) == Some("ok");
    let streaming = ack_json
        .get("streaming")
        .and_then(|v| v.as_bool())
        .unwrap_or(false);
    if !(ok && streaming) {
        return Ok(None);
    }

    let (tx, rx) = std::sync::mpsc::channel::<crate::daemon_stream::DaemonStreamEvent>();
    let stop = Arc::new(AtomicBool::new(false));
    let stop_thread = Arc::clone(&stop);

    // The timeout lets the thread wake up periodically to observe the stop flag.
    reader
        .get_ref()
        .set_read_timeout(Some(std::time::Duration::from_millis(500)))
        .ok();

    std::thread::spawn(move || {
        // Kept across iterations: when a read times out mid-line, the bytes
        // received so far stay here and the next read appends the remainder.
        let mut line = String::new();
        loop {
            if stop_thread.load(Ordering::Relaxed) {
                break;
            }
            match reader.read_line(&mut line) {
                Ok(0) => break,
                Ok(_) => {}
                Err(e) if matches!(e.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => {
                    continue;
                }
                Err(_) => break,
            }
            let trimmed = line.trim();
            if !trimmed.is_empty() {
                // Lines that do not parse as an event are skipped.
                if let Ok(event) =
                    serde_json::from_str::<crate::daemon_stream::DaemonStreamEvent>(trimmed)
                {
                    if tx.send(event).is_err() {
                        break;
                    }
                }
            }
            line.clear();
        }
    });

    Ok(Some(StreamSubscription { rx, stop }))
}
/// Reports whether `pid` names a process this caller can signal.
///
/// Uses `kill(pid, 0)`, which performs the existence/permission check without
/// delivering a signal. Non-positive values are never alive: for `kill(2)` they
/// address process groups or all processes rather than one process, and the
/// probe would succeed for them.
#[cfg(unix)]
fn pid_alive(pid: i32) -> bool {
    unsafe extern "C" {
        fn kill(pid: i32, sig: i32) -> i32;
    }
    if pid <= 0 {
        return false;
    }
    // SAFETY: `kill` takes two plain integers and has no memory-safety
    // preconditions; signal 0 delivers nothing.
    unsafe { kill(pid, 0) == 0 }
}
#[cfg(test)]
mod tests {
use super::*;
/// Polls for up to two seconds until the PID file exists and the daemon socket
/// under `home` accepts a connection. Returns whether that happened in time.
#[cfg(unix)]
fn wait_for_daemon_runtime_ready(home: &std::path::Path) -> bool {
    use std::time::{Duration, Instant};

    let pid_file = home.join(".atm/daemon/atm-daemon.pid");
    let started = Instant::now();
    while started.elapsed() < Duration::from_secs(2) {
        let ready = pid_file.exists() && super::daemon_socket_connectable(home);
        if ready {
            return true;
        }
        std::thread::sleep(Duration::from_millis(25));
    }
    false
}
#[cfg(unix)]
fn wait_for_daemon_version(home: &std::path::Path, expected_version: &str) -> Option<i32> {
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
let pid_path = home.join(".atm/daemon/atm-daemon.pid");
let status_path = home.join(".atm/daemon/status.json");
while std::time::Instant::now() < deadline {
let pid = std::fs::read_to_string(&pid_path)
.ok()
.and_then(|raw| raw.trim().parse::<i32>().ok());
let status_version = std::fs::read_to_string(&status_path)
.ok()
.and_then(|raw| serde_json::from_str::<serde_json::Value>(&raw).ok())
.and_then(|json| {
json.get("version")
.and_then(serde_json::Value::as_str)
.map(str::to_string)
});
if let Some(pid) = pid
&& super::pid_alive(pid)
&& status_version.as_deref() == Some(expected_version)
{
return Some(pid);
}
std::thread::sleep(std::time::Duration::from_millis(25));
}
None
}
/// Builds lock metadata for tests: the given `home` scope and `pid`, a
/// placeholder executable under the temp directory, version "0.0.1" and the
/// current UTC time as the write timestamp.
#[cfg(unix)]
fn fake_lock_metadata(home: &str, pid: u32) -> DaemonLockMetadata {
    let executable_path = std::env::temp_dir()
        .join("fake-atm-daemon")
        .to_string_lossy()
        .into_owned();
    DaemonLockMetadata {
        pid,
        executable_path,
        home_scope: home.to_owned(),
        version: String::from("0.0.1"),
        written_at: chrono::Utc::now().to_rfc3339(),
    }
}
use serial_test::serial;
/// Runs `f` with `ATM_DAEMON_AUTOSTART` set to `"0"` and returns its result.
///
/// The previous value (or absence) of the variable is restored afterwards,
/// including when `f` panics, so a failing test cannot leak the override into
/// later tests.
fn with_autostart_disabled<T>(f: impl FnOnce() -> T) -> T {
    const VAR: &str = "ATM_DAEMON_AUTOSTART";

    // Restores the saved value on drop, which also runs during unwinding.
    struct Restore(Option<String>);
    impl Drop for Restore {
        fn drop(&mut self) {
            // SAFETY: NOTE(review): assumes callers serialize tests that touch
            // the process environment (e.g. via `#[serial]`) — confirm.
            unsafe {
                match self.0.take() {
                    Some(v) => std::env::set_var(VAR, v),
                    None => std::env::remove_var(VAR),
                }
            }
        }
    }

    let _restore = Restore(std::env::var(VAR).ok());
    // SAFETY: same assumption as in `Restore::drop`.
    unsafe { std::env::set_var(VAR, "0") };
    f()
}
// A request serialized to JSON and parsed back keeps its scalar fields.
#[test]
fn test_socket_request_serialization() {
    let outgoing = SocketRequest {
        version: 1,
        request_id: String::from("req-123"),
        command: String::from("agent-state"),
        payload: serde_json::json!({ "agent": "arch-ctm", "team": "atm-dev" }),
    };
    let wire = serde_json::to_string(&outgoing).unwrap();
    let SocketRequest {
        version,
        request_id,
        command,
        ..
    } = serde_json::from_str(&wire).unwrap();
    assert_eq!(version, 1);
    assert_eq!(request_id, "req-123");
    assert_eq!(command, "agent-state");
}
// An "ok" response parses with a payload and without an error object.
#[test]
fn test_socket_response_ok_deserialization() {
    let wire = r#"{"version":1,"request_id":"req-123","status":"ok","payload":{"state":"idle","last_transition":"2026-02-16T22:30:00Z"}}"#;
    let parsed = serde_json::from_str::<SocketResponse>(wire).unwrap();
    assert!(parsed.is_ok());
    assert_eq!(parsed.request_id, "req-123");
    assert!(parsed.payload.is_some());
    assert!(parsed.error.is_none());
}
// An "error" response is not ok and exposes the error code.
#[test]
fn test_socket_response_error_deserialization() {
    let wire = r#"{"version":1,"request_id":"req-456","status":"error","error":{"code":"AGENT_NOT_FOUND","message":"Agent 'unknown' is not tracked"}}"#;
    let parsed = serde_json::from_str::<SocketResponse>(wire).unwrap();
    assert!(!parsed.is_ok());
    let SocketError { code, .. } = parsed.error.unwrap();
    assert_eq!(code, "AGENT_NOT_FOUND");
}
// Both the state and the optional transition timestamp are parsed.
#[test]
fn test_agent_state_info_deserialization() {
    let wire = r#"{"state":"idle","last_transition":"2026-02-16T22:30:00Z"}"#;
    let AgentStateInfo {
        state,
        last_transition,
    } = serde_json::from_str(wire).unwrap();
    assert_eq!(state, "idle");
    assert_eq!(last_transition.as_deref(), Some("2026-02-16T22:30:00Z"));
}
// A missing `last_transition` key deserializes to `None`.
#[test]
fn test_agent_state_info_missing_transition() {
    let parsed = serde_json::from_str::<AgentStateInfo>(r#"{"state":"launching"}"#).unwrap();
    assert_eq!(parsed.state, "launching");
    assert!(parsed.last_transition.is_none());
}
// With autostart disabled, querying the daemon must not produce an error.
// NOTE(review): only `is_ok()` is asserted; the `None` in the test name is not
// checked here — confirm whether that is intentional.
#[test]
#[serial]
fn test_query_daemon_no_socket_returns_none() {
    with_autostart_disabled(|| {
        let probe = SocketRequest {
            version: PROTOCOL_VERSION,
            request_id: String::from("req-test"),
            command: String::from("agent-state"),
            payload: serde_json::json!({}),
        };
        assert!(query_daemon(&probe).is_ok());
    });
}
// Autostart is enabled when the variable is unset and for every value except
// "0", "false" and "no".
#[cfg(unix)]
#[test]
#[serial]
fn test_daemon_autostart_flag_parsing() {
    const VAR: &str = "ATM_DAEMON_AUTOSTART";
    let saved = std::env::var(VAR).ok();

    unsafe { std::env::remove_var(VAR) };
    assert!(daemon_autostart_enabled());

    let cases = [
        ("1", true),
        ("true", true),
        ("yes", true),
        ("0", false),
        ("false", false),
        ("no", false),
        ("maybe", true),
    ];
    for (value, expected) in cases {
        unsafe { std::env::set_var(VAR, value) };
        assert_eq!(daemon_autostart_enabled(), expected);
    }

    unsafe {
        match saved {
            Some(previous) => std::env::set_var(VAR, previous),
            None => std::env::remove_var(VAR),
        }
    }
}
// `ATM_DAEMON_BIN` pointing at an existing file is returned as the daemon binary.
#[cfg(unix)]
#[test]
#[serial]
fn test_resolve_daemon_binary_honors_override() {
    const VAR: &str = "ATM_DAEMON_BIN";
    let saved = std::env::var(VAR).ok();

    let sandbox = tempfile::tempdir().unwrap();
    let override_bin = sandbox.path().join("custom-atm-daemon");
    std::fs::write(&override_bin, "#!/bin/sh\nexit 0\n").unwrap();

    unsafe { std::env::set_var(VAR, &override_bin) };
    let picked = std::path::PathBuf::from(resolve_daemon_binary());
    assert_eq!(picked, override_bin);

    unsafe {
        match saved {
            Some(previous) => std::env::set_var(VAR, previous),
            None => std::env::remove_var(VAR),
        }
    }
}
// A PID file naming a process that is not alive is deleted by cleanup.
// NOTE(review): relies on pid 999999 not being in use on the test host.
#[cfg(unix)]
#[test]
#[serial]
fn test_cleanup_stale_runtime_files_removes_dead_pid_file() {
    let sandbox = tempfile::tempdir().unwrap();
    let runtime_dir = sandbox.path().join(".atm/daemon");
    std::fs::create_dir_all(&runtime_dir).unwrap();

    let stale_pid_file = runtime_dir.join("atm-daemon.pid");
    std::fs::write(&stale_pid_file, "999999\n").unwrap();
    assert!(stale_pid_file.exists());

    cleanup_stale_daemon_runtime_files(sandbox.path());

    assert!(
        !stale_pid_file.exists(),
        "stale PID file should be removed when PID is not alive"
    );
}
// A non-numeric PID file counts as "no owner": both it and the (regular-file)
// socket placeholder are removed.
#[cfg(unix)]
#[test]
#[serial]
fn test_cleanup_stale_runtime_files_handles_malformed_pid() {
    let sandbox = tempfile::tempdir().unwrap();
    let runtime_dir = sandbox.path().join(".atm/daemon");
    std::fs::create_dir_all(&runtime_dir).unwrap();

    let bad_pid_file = runtime_dir.join("atm-daemon.pid");
    let leftover_socket = runtime_dir.join("atm-daemon.sock");
    std::fs::write(&bad_pid_file, "not-a-pid\n").unwrap();
    std::fs::write(&leftover_socket, "stale").unwrap();

    cleanup_stale_daemon_runtime_files(sandbox.path());

    assert!(
        !bad_pid_file.exists(),
        "malformed PID file should be removed during cleanup"
    );
    assert!(
        !leftover_socket.exists(),
        "stale socket should be removed when PID ownership is known-dead"
    );
}
// When the PID file cannot be read, ownership is unknown and the socket stays.
// NOTE(review): mode 0o000 presumably does not block reads for a privileged
// user, so this may behave differently when run as root — confirm.
#[cfg(unix)]
#[test]
#[serial]
fn test_cleanup_stale_runtime_files_unreadable_pid_does_not_remove_socket() {
    use std::os::unix::fs::PermissionsExt;

    let sandbox = tempfile::tempdir().unwrap();
    let runtime_dir = sandbox.path().join(".atm/daemon");
    std::fs::create_dir_all(&runtime_dir).unwrap();

    let locked_pid_file = runtime_dir.join("atm-daemon.pid");
    let leftover_socket = runtime_dir.join("atm-daemon.sock");
    std::fs::write(&locked_pid_file, "123\n").unwrap();
    std::fs::write(&leftover_socket, "stale").unwrap();

    let mut no_access = std::fs::metadata(&locked_pid_file).unwrap().permissions();
    no_access.set_mode(0o000);
    std::fs::set_permissions(&locked_pid_file, no_access).unwrap();

    cleanup_stale_daemon_runtime_files(sandbox.path());

    assert!(
        leftover_socket.exists(),
        "socket must not be removed when PID ownership cannot be read"
    );

    // Put owner access back on the PID file.
    let mut owner_rw = std::fs::metadata(&locked_pid_file).unwrap().permissions();
    owner_rw.set_mode(0o600);
    std::fs::set_permissions(&locked_pid_file, owner_rw).unwrap();
}
// A daemon binary that prints to stderr and exits immediately must surface
// that stderr text in the startup error.
#[cfg(unix)]
#[test]
#[serial]
fn test_ensure_daemon_running_includes_stderr_tail_on_startup_exit() {
    use std::os::unix::fs::PermissionsExt;

    const OVERRIDDEN: [&str; 3] = ["ATM_HOME", "ATM_DAEMON_BIN", "ATM_DAEMON_AUTOSTART"];

    let sandbox = tempfile::tempdir().unwrap();
    let home = sandbox.path().to_path_buf();

    let failing_daemon = home.join("fake-daemon-fail.sh");
    std::fs::write(
        &failing_daemon,
        r#"#!/bin/sh
set -eu
echo "fatal: invalid plugin config" >&2
exit 42
"#,
    )
    .unwrap();
    let mut exec_mode = std::fs::metadata(&failing_daemon).unwrap().permissions();
    exec_mode.set_mode(0o755);
    std::fs::set_permissions(&failing_daemon, exec_mode).unwrap();

    let saved = OVERRIDDEN.map(|key| (key, std::env::var(key).ok()));
    unsafe {
        std::env::set_var("ATM_HOME", &home);
        std::env::set_var("ATM_DAEMON_BIN", &failing_daemon);
        std::env::set_var("ATM_DAEMON_AUTOSTART", "1");
    }

    let msg = ensure_daemon_running_unix()
        .expect_err("startup should fail")
        .to_string();
    assert!(
        msg.contains("stderr_tail="),
        "error must include captured stderr tail: {msg}"
    );
    assert!(
        msg.contains("invalid plugin config"),
        "stderr tail should include daemon stderr content: {msg}"
    );

    for (key, previous) in saved {
        unsafe {
            match previous {
                Some(v) => std::env::set_var(key, v),
                None => std::env::remove_var(key),
            }
        }
    }
}
// A daemon binary that only sleeps never publishes runtime files, so startup
// must fail with the timeout message that names the PID and socket paths.
#[cfg(unix)]
#[test]
#[serial]
fn test_ensure_daemon_running_timeout_when_spawned_process_never_creates_runtime_files() {
    use std::os::unix::fs::PermissionsExt;

    const OVERRIDDEN: [&str; 3] = ["ATM_HOME", "ATM_DAEMON_BIN", "ATM_DAEMON_AUTOSTART"];

    let sandbox = tempfile::tempdir().unwrap();
    let home = sandbox.path().to_path_buf();

    let idle_daemon = home.join("fake-daemon-never-ready.sh");
    std::fs::write(
        &idle_daemon,
        r#"#!/bin/sh
set -eu
sleep 10
"#,
    )
    .unwrap();
    let mut exec_mode = std::fs::metadata(&idle_daemon).unwrap().permissions();
    exec_mode.set_mode(0o755);
    std::fs::set_permissions(&idle_daemon, exec_mode).unwrap();

    let saved = OVERRIDDEN.map(|key| (key, std::env::var(key).ok()));
    unsafe {
        std::env::set_var("ATM_HOME", &home);
        std::env::set_var("ATM_DAEMON_BIN", &idle_daemon);
        std::env::set_var("ATM_DAEMON_AUTOSTART", "1");
    }

    let msg = ensure_daemon_running_unix()
        .expect_err("startup should time out")
        .to_string();
    assert!(
        msg.contains("daemon startup timed out after 5s"),
        "timeout error should include actionable timeout details: {msg}"
    );
    assert!(
        msg.contains("pid_path="),
        "timeout error should include pid path"
    );
    assert!(
        msg.contains("socket_path="),
        "timeout error should include socket path"
    );

    for (key, previous) in saved {
        unsafe {
            match previous {
                Some(v) => std::env::set_var(key, v),
                None => std::env::remove_var(key),
            }
        }
    }
}
// Two threads call `ensure_daemon_running_unix` at the same moment; the fake
// daemon drops one marker file per spawned process, and exactly one marker is
// expected afterwards.
#[cfg(unix)]
#[test]
#[serial]
fn test_ensure_daemon_running_serializes_concurrent_start() {
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::sync::Arc;
use std::thread;
let tmp = tempfile::tempdir().unwrap();
let home = tmp.path().to_path_buf();
let script_path = home.join("fake-daemon.sh");
// Fake daemon: records a per-process marker, writes its PID file, then sleeps.
let script = r#"#!/bin/sh
set -eu
home="${ATM_HOME:?}"
mkdir -p "$home/.atm/daemon"
mkdir -p "$home/spawn-markers"
touch "$home/spawn-markers/spawn.$$"
echo $$ > "$home/.atm/daemon/atm-daemon.pid"
sleep 2
"#;
fs::write(&script_path, script).unwrap();
let mut perms = fs::metadata(&script_path).unwrap().permissions();
perms.set_mode(0o755);
fs::set_permissions(&script_path, perms).unwrap();
// Save the current environment so it can be put back at the end.
let old_home = std::env::var("ATM_HOME").ok();
let old_bin = std::env::var("ATM_DAEMON_BIN").ok();
let old_auto = std::env::var("ATM_DAEMON_AUTOSTART").ok();
unsafe {
std::env::set_var("ATM_HOME", &home);
std::env::set_var("ATM_DAEMON_BIN", &script_path);
std::env::set_var("ATM_DAEMON_AUTOSTART", "1");
}
let mut handles = Vec::new();
// The barrier releases both threads together to provoke the race.
let barrier = Arc::new(std::sync::Barrier::new(2));
for _ in 0..2 {
let b = Arc::clone(&barrier);
handles.push(thread::spawn(move || {
b.wait();
ensure_daemon_running_unix().unwrap();
}));
}
for h in handles {
h.join().unwrap();
}
// Count marker files; a missing directory counts as zero markers.
let count = fs::read_dir(home.join("spawn-markers"))
.ok()
.into_iter()
.flatten()
.filter_map(Result::ok)
.count();
assert_eq!(
count, 1,
"concurrent startup attempts should spawn at most one daemon process"
);
// Restore the environment captured above.
unsafe {
match old_home {
Some(v) => std::env::set_var("ATM_HOME", v),
None => std::env::remove_var("ATM_HOME"),
}
match old_bin {
Some(v) => std::env::set_var("ATM_DAEMON_BIN", v),
None => std::env::remove_var("ATM_DAEMON_BIN"),
}
match old_auto {
Some(v) => std::env::set_var("ATM_DAEMON_AUTOSTART", v),
None => std::env::remove_var("ATM_DAEMON_AUTOSTART"),
}
}
}
// The written lock metadata carries this process's PID, the given version, and
// non-blank executable path and home scope.
#[cfg(unix)]
#[test]
#[serial]
fn test_write_daemon_lock_metadata_contains_identity_fields() {
    let sandbox = tempfile::tempdir().unwrap();
    write_daemon_lock_metadata(sandbox.path(), "9.9.9-test").expect("write lock metadata");

    let meta_file = sandbox.path().join(".atm/daemon/daemon.lock.meta.json");
    let contents = std::fs::read_to_string(&meta_file).expect("read lock metadata");
    let written =
        serde_json::from_str::<DaemonLockMetadata>(&contents).expect("parse lock metadata");

    assert_eq!(written.pid, std::process::id());
    assert_eq!(written.version, "9.9.9-test");
    assert!(
        !written.executable_path.trim().is_empty(),
        "executable path must be populated"
    );
    assert!(
        !written.home_scope.trim().is_empty(),
        "home scope must be populated"
    );
}
// With neither lock metadata nor a connectable socket there is nothing to
// compare, so no mismatch is reported.
#[cfg(unix)]
#[test]
fn test_evaluate_daemon_identity_mismatch_requires_metadata_or_socket() {
    let sandbox = tempfile::tempdir().unwrap();
    let home = sandbox.path().to_string_lossy().into_owned();
    let empty = DaemonIdentitySnapshot::default();

    let verdict = evaluate_daemon_identity_mismatch(
        &empty,
        &home,
        std::ffi::OsStr::new("atm-daemon"),
        env!("CARGO_PKG_VERSION"),
        |_| true,
        |_| Some(String::from("atm-daemon")),
    );

    assert_eq!(verdict, None);
}
// A connectable socket without lock metadata is reported as a mismatch.
#[cfg(unix)]
#[test]
fn test_evaluate_daemon_identity_mismatch_reports_missing_metadata_when_socket_live() {
    let sandbox = tempfile::tempdir().unwrap();
    let home = sandbox.path().to_string_lossy().into_owned();
    let socket_only = DaemonIdentitySnapshot {
        socket_connectable: true,
        ..Default::default()
    };

    let verdict = evaluate_daemon_identity_mismatch(
        &socket_only,
        &home,
        std::ffi::OsStr::new("atm-daemon"),
        env!("CARGO_PKG_VERSION"),
        |_| true,
        |_| Some(String::from("atm-daemon")),
    );

    let why = verdict.expect("expected mismatch");
    assert!(why.contains("lock metadata missing"));
}
// A live PID whose command line names a different executable is a mismatch.
#[cfg(unix)]
#[test]
fn test_evaluate_daemon_identity_mismatch_reports_command_mismatch() {
    let sandbox = tempfile::tempdir().unwrap();
    let home = sandbox.path().to_string_lossy().into_owned();
    let observed = DaemonIdentitySnapshot {
        metadata: Some(fake_lock_metadata(&home, 4242)),
        pid_from_file: Some(4242),
        socket_connectable: true,
        ..Default::default()
    };

    let verdict = evaluate_daemon_identity_mismatch(
        &observed,
        &home,
        std::ffi::OsStr::new("atm-daemon"),
        env!("CARGO_PKG_VERSION"),
        |_| true,
        |_| Some(String::from("/usr/bin/python3 -m stale-daemon")),
    );

    let why = verdict.expect("expected mismatch");
    assert!(why.contains("running command"));
    assert!(why.contains("expected daemon binary"));
}
// A status-file version different from the expected one is reported, and the
// message names the running version.
#[cfg(unix)]
#[test]
fn test_evaluate_daemon_identity_mismatch_reports_version_mismatch() {
    let sandbox = tempfile::tempdir().unwrap();
    let home = sandbox.path().to_string_lossy().into_owned();
    let observed = DaemonIdentitySnapshot {
        metadata: Some(fake_lock_metadata(&home, 4242)),
        pid_from_file: Some(4242),
        version_from_status: Some(String::from("0.0.1")),
        socket_connectable: true,
        ..Default::default()
    };

    let verdict = evaluate_daemon_identity_mismatch(
        &observed,
        &home,
        std::ffi::OsStr::new("atm-daemon"),
        env!("CARGO_PKG_VERSION"),
        |_| true,
        |_| Some(String::from("atm-daemon")),
    );

    let why = verdict.expect("expected mismatch");
    assert!(why.contains("daemon version mismatch"));
    assert!(why.contains("running=0.0.1"));
}
// Matching PID, home scope, binary name and version yield no mismatch.
#[cfg(unix)]
#[test]
fn test_evaluate_daemon_identity_mismatch_accepts_matching_identity() {
    let sandbox = tempfile::tempdir().unwrap();
    let home = sandbox.path().to_string_lossy().into_owned();
    let observed = DaemonIdentitySnapshot {
        metadata: Some(fake_lock_metadata(&home, 4242)),
        pid_from_file: Some(4242),
        version_from_status: Some(String::from(env!("CARGO_PKG_VERSION"))),
        socket_connectable: true,
        ..Default::default()
    };

    let verdict = evaluate_daemon_identity_mismatch(
        &observed,
        &home,
        std::ffi::OsStr::new("atm-daemon"),
        env!("CARGO_PKG_VERSION"),
        |_| true,
        |_| Some(String::from("atm-daemon --serve")),
    );

    assert_eq!(verdict, None);
}
// Seeds the home directory with a PID file and lock metadata that both name
// pid 999999, then checks that `ensure_daemon_running_unix` succeeds and the
// replacement daemon script runs (it drops a "started-ok" marker).
// NOTE(review): relies on pid 999999 not being in use on the test host.
#[cfg(unix)]
#[test]
#[serial]
fn test_ensure_daemon_running_recovers_from_dead_pid_metadata() {
use std::fs;
use std::os::unix::fs::PermissionsExt;
let tmp = tempfile::tempdir().unwrap();
let home = tmp.path().to_path_buf();
fs::create_dir_all(home.join(".atm/daemon")).unwrap();
fs::write(home.join(".atm/daemon/atm-daemon.pid"), "999999\n").unwrap();
// Stale lock metadata agreeing with the stale PID file.
let stale = DaemonLockMetadata {
pid: 999999,
executable_path: std::env::temp_dir()
.join("old-atm-daemon")
.to_string_lossy()
.to_string(),
home_scope: home.to_string_lossy().to_string(),
version: "0.0.1".to_string(),
written_at: chrono::Utc::now().to_rfc3339(),
};
fs::write(
home.join(".atm/daemon/daemon.lock.meta.json"),
serde_json::to_string_pretty(&stale).unwrap(),
)
.unwrap();
let script_path = home.join("fake-daemon-start.sh");
// Replacement daemon: writes its PID file and a status.json carrying this
// crate's version, creates the "started-ok" marker, then sleeps.
let script = format!(
r#"#!/bin/sh
set -eu
home="${{ATM_HOME:?}}"
mkdir -p "$home/.atm/daemon"
pid=$$
echo "$pid" > "$home/.atm/daemon/atm-daemon.pid"
cat > "$home/.atm/daemon/status.json" <<'JSON'
{{"timestamp":"2026-01-01T00:00:00Z","pid":0,"version":"{}","uptime_secs":1,"plugins":[],"teams":[]}}
JSON
python3 - <<'PY'
import json, os
home=os.environ["ATM_HOME"]
path=os.path.join(home, ".atm", "daemon", "status.json")
with open(path, "r", encoding="utf-8") as f:
obj=json.load(f)
obj["pid"]=os.getpid()
with open(path, "w", encoding="utf-8") as f:
json.dump(obj, f)
open(os.path.join(home, "started-ok"), "w").write("ok")
PY
sleep 8
"#,
env!("CARGO_PKG_VERSION")
);
fs::write(&script_path, script).unwrap();
let mut perms = fs::metadata(&script_path).unwrap().permissions();
perms.set_mode(0o755);
fs::set_permissions(&script_path, perms).unwrap();
// Save the current environment so it can be put back at the end.
let old_home = std::env::var("ATM_HOME").ok();
let old_bin = std::env::var("ATM_DAEMON_BIN").ok();
let old_auto = std::env::var("ATM_DAEMON_AUTOSTART").ok();
unsafe {
std::env::set_var("ATM_HOME", &home);
std::env::set_var("ATM_DAEMON_BIN", &script_path);
std::env::set_var("ATM_DAEMON_AUTOSTART", "1");
}
ensure_daemon_running_unix().expect("must recover from dead stale pid metadata");
// Give the script up to two seconds to create its marker.
let marker = home.join("started-ok");
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
while std::time::Instant::now() < deadline && !marker.exists() {
std::thread::sleep(std::time::Duration::from_millis(50));
}
assert!(marker.exists(), "expected replacement daemon to start");
// Send signal 15 to the process named in the PID file if it is still alive.
if let Ok(pid_str) = std::fs::read_to_string(home.join(".atm/daemon/atm-daemon.pid"))
&& let Ok(pid) = pid_str.trim().parse::<i32>()
&& pid_alive(pid)
{
send_signal(pid, 15);
}
// Restore the environment captured above.
unsafe {
match old_home {
Some(v) => std::env::set_var("ATM_HOME", v),
None => std::env::remove_var("ATM_HOME"),
}
match old_bin {
Some(v) => std::env::set_var("ATM_DAEMON_BIN", v),
None => std::env::remove_var("ATM_DAEMON_BIN"),
}
match old_auto {
Some(v) => std::env::set_var("ATM_DAEMON_AUTOSTART", v),
None => std::env::remove_var("ATM_DAEMON_AUTOSTART"),
}
}
}
// Smoke test (ignored by default): starts a "stale" daemon that reports
// version 0.0.1 and binds the daemon socket, then points `ATM_DAEMON_BIN` at
// an "expected" daemon script and checks that `ensure_daemon_running_unix`
// replaces the stale process with one reporting this crate's version.
#[cfg(unix)]
#[test]
#[serial]
#[ignore = "smoke coverage only; exercises real subprocess and socket timing"]
fn test_ensure_daemon_running_restarts_identity_mismatch_daemon() {
use std::fs;
use std::os::unix::fs::PermissionsExt;
let tmp = tempfile::tempdir().unwrap();
let home = tmp.path().to_path_buf();
fs::create_dir_all(home.join(".atm/daemon")).unwrap();
let stale_script = home.join("stale-daemon.sh");
// Stale daemon: writes its PID file and a status.json with version 0.0.1,
// then execs a Python process that listens on the daemon socket.
let stale = r#"#!/bin/sh
set -eu
home="${ATM_HOME:?}"
mkdir -p "$home/.atm/daemon"
pid=$$
echo "$pid" > "$home/.atm/daemon/atm-daemon.pid"
cat > "$home/.atm/daemon/status.json" <<'JSON'
{"timestamp":"2026-01-01T00:00:00Z","pid":0,"version":"0.0.1","uptime_secs":1,"plugins":[],"teams":[]}
JSON
python3 - <<'PY'
import json, os
home=os.environ["ATM_HOME"]
path=os.path.join(home, ".atm", "daemon", "status.json")
with open(path, "r", encoding="utf-8") as f:
obj=json.load(f)
obj["pid"]=os.getpid()
with open(path, "w", encoding="utf-8") as f:
json.dump(obj, f)
PY
exec python3 - "$home/.atm/daemon/atm-daemon.sock" <<'PY'
import os, signal, socket, sys, time
path=sys.argv[1]
try:
os.unlink(path)
except FileNotFoundError:
pass
srv=socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
srv.bind(path)
srv.listen(1)
def shutdown(*_):
try:
srv.close()
finally:
try:
os.unlink(path)
except FileNotFoundError:
pass
sys.exit(0)
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
while True:
time.sleep(1)
PY
"#;
fs::write(&stale_script, stale).unwrap();
let mut stale_perms = fs::metadata(&stale_script).unwrap().permissions();
stale_perms.set_mode(0o755);
fs::set_permissions(&stale_script, stale_perms).unwrap();
let expected_script = home.join("expected-daemon.sh");
// Expected daemon: writes its PID file and a status.json carrying this
// crate's version, creates the "replacement-started" marker, then sleeps.
let expected = format!(
r#"#!/bin/sh
set -eu
home="${{ATM_HOME:?}}"
mkdir -p "$home/.atm/daemon"
pid=$$
echo "$pid" > "$home/.atm/daemon/atm-daemon.pid"
cat > "$home/.atm/daemon/status.json" <<'JSON'
{{"timestamp":"2026-01-01T00:00:00Z","pid":0,"version":"{}","uptime_secs":1,"plugins":[],"teams":[]}}
JSON
python3 - <<'PY'
import json, os
home=os.environ["ATM_HOME"]
path=os.path.join(home, ".atm", "daemon", "status.json")
with open(path, "r", encoding="utf-8") as f:
obj=json.load(f)
obj["pid"]=os.getpid()
with open(path, "w", encoding="utf-8") as f:
json.dump(obj, f)
open(os.path.join(home, "replacement-started"), "w").write("ok")
with open(os.path.join(home, "replacement-started"), "a", encoding="utf-8") as f:
f.flush()
os.fsync(f.fileno())
PY
sleep 8
"#,
env!("CARGO_PKG_VERSION")
);
fs::write(&expected_script, expected).unwrap();
let mut expected_perms = fs::metadata(&expected_script).unwrap().permissions();
expected_perms.set_mode(0o755);
fs::set_permissions(&expected_script, expected_perms).unwrap();
// Save the current environment so it can be put back at the end.
let old_home = std::env::var("ATM_HOME").ok();
let old_bin = std::env::var("ATM_DAEMON_BIN").ok();
let old_auto = std::env::var("ATM_DAEMON_AUTOSTART").ok();
// Autostart stays off while the stale daemon is launched by hand.
unsafe {
std::env::set_var("ATM_HOME", &home);
std::env::set_var("ATM_DAEMON_AUTOSTART", "0");
}
let mut stale_child = std::process::Command::new(&stale_script)
.env("ATM_HOME", &home)
.spawn()
.expect("spawn stale daemon");
assert!(
wait_for_daemon_runtime_ready(&home),
"stale daemon must publish pid file and bind socket before mismatch check"
);
let stale_pid: u32 = std::fs::read_to_string(home.join(".atm/daemon/atm-daemon.pid"))
.unwrap()
.trim()
.parse()
.unwrap();
// Lock metadata describing the stale daemon (its script path, version 0.0.1).
let stale_metadata = DaemonLockMetadata {
pid: stale_pid,
executable_path: stale_script.to_string_lossy().to_string(),
home_scope: std::fs::canonicalize(&home)
.unwrap_or_else(|_| home.clone())
.to_string_lossy()
.to_string(),
version: "0.0.1".to_string(),
written_at: chrono::Utc::now().to_rfc3339(),
};
std::fs::write(
home.join(".atm/daemon/daemon.lock.meta.json"),
serde_json::to_string_pretty(&stale_metadata).unwrap(),
)
.unwrap();
// Now expect a different binary and allow autostart.
unsafe {
std::env::set_var("ATM_DAEMON_BIN", &expected_script);
std::env::set_var("ATM_DAEMON_AUTOSTART", "1");
}
ensure_daemon_running_unix().expect("mismatch daemon should be restarted");
// Allow up to two seconds for the stale child to be observed as exited.
let stale_exit_deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
let mut stale_exited = false;
while std::time::Instant::now() < stale_exit_deadline {
if stale_child.try_wait().ok().flatten().is_some() {
stale_exited = true;
break;
}
std::thread::sleep(std::time::Duration::from_millis(50));
}
let new_pid = wait_for_daemon_version(&home, env!("CARGO_PKG_VERSION"))
.expect("replacement daemon missing");
// Reap the stale child before asserting so a failure does not leave it running.
if !stale_exited && stale_child.try_wait().ok().flatten().is_none() {
let _ = stale_child.kill();
let _ = stale_child.wait();
}
assert!(
stale_exited,
"stale daemon must exit during mismatch restart"
);
assert!(new_pid > 1, "replacement daemon pid must be valid");
// Send signal 15 to the replacement daemon if it is still alive.
if pid_alive(new_pid) {
send_signal(new_pid, 15);
}
// Restore the environment captured above.
unsafe {
match old_home {
Some(v) => std::env::set_var("ATM_HOME", v),
None => std::env::remove_var("ATM_HOME"),
}
match old_bin {
Some(v) => std::env::set_var("ATM_DAEMON_BIN", v),
None => std::env::remove_var("ATM_DAEMON_BIN"),
}
match old_auto {
Some(v) => std::env::set_var("ATM_DAEMON_AUTOSTART", v),
None => std::env::remove_var("ATM_DAEMON_AUTOSTART"),
}
}
}
/// Two request ids generated back-to-back must be non-empty and distinct.
#[test]
fn test_new_request_id_is_unique() {
    let id1 = new_request_id();
    // Brief pause between the two calls; presumably lets a clock-derived id
    // component advance (verify against `new_request_id`).
    std::thread::sleep(std::time::Duration::from_nanos(1000));
    let id2 = new_request_id();
    assert!(!id1.is_empty(), "first request id must not be empty");
    assert!(!id2.is_empty(), "second request id must not be empty");
    // The uniqueness promised by the test name was previously never checked.
    assert_ne!(id1, id2, "consecutive request ids must differ");
}
/// The daemon socket path ends with `atm-daemon.sock` and contains `.atm/daemon`.
#[test]
fn test_daemon_socket_path_contains_expected_suffix() {
    let socket_path = daemon_socket_path().unwrap();
    let rendered = socket_path.to_string_lossy();
    assert!(rendered.ends_with("atm-daemon.sock"));
    assert!(rendered.contains(".atm/daemon"));
}
/// The daemon PID file path ends with `atm-daemon.pid` and contains `.atm/daemon`.
#[test]
fn test_daemon_pid_path_contains_expected_suffix() {
    let pid_path = daemon_pid_path().unwrap();
    let rendered = pid_path.to_string_lossy();
    assert!(rendered.ends_with("atm-daemon.pid"));
    assert!(rendered.contains(".atm/daemon"));
}
/// `query_agent_state` must not return an error when run inside
/// `with_autostart_disabled`.
#[test]
#[serial]
fn test_query_agent_state_no_daemon_returns_none() {
    with_autostart_disabled(|| {
        assert!(query_agent_state("arch-ctm", "atm-dev").is_ok());
    });
}
/// With `ATM_HOME` pointed at an empty temp directory,
/// `query_team_member_states` must yield `Ok(None)`.
#[test]
#[serial]
fn test_query_team_member_states_offline_returns_none() {
    with_autostart_disabled(|| {
        let scratch = tempfile::tempdir().expect("tempdir");
        let saved_home = std::env::var("ATM_HOME").ok();
        unsafe { std::env::set_var("ATM_HOME", scratch.path()) };

        let outcome = query_team_member_states("atm-dev");

        // Put ATM_HOME back before asserting so a failure does not leak the override.
        unsafe {
            if let Some(previous) = saved_home {
                std::env::set_var("ATM_HOME", previous);
            } else {
                std::env::remove_var("ATM_HOME");
            }
        }

        assert!(
            matches!(outcome, Ok(None)),
            "offline daemon must map to Ok(None), got: {outcome:?}"
        );
    });
}
/// Runs a mock daemon on a Unix socket under a temporary `ATM_HOME` that
/// answers `list-agents` with a single JSON object payload, then checks that
/// `query_team_member_states` returns an error mentioning
/// "invalid canonical member-state payload".
#[cfg(unix)]
#[test]
#[serial]
fn test_query_team_member_states_invalid_payload_returns_err() {
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixListener;
with_autostart_disabled(|| {
let tmp = tempfile::tempdir().expect("tempdir");
let daemon_dir = tmp.path().join(".atm/daemon");
std::fs::create_dir_all(&daemon_dir).expect("create daemon dir");
let socket_path = daemon_dir.join("atm-daemon.sock");
let listener = UnixListener::bind(&socket_path).expect("bind socket");
// Mock daemon: serves one newline-delimited JSON request per connection.
let handle = std::thread::spawn(move || {
// Accept up to 32 connections; the thread panics below if none of them
// carries a `list-agents` request.
for _ in 0..32 {
let (mut stream, _) = listener.accept().expect("accept");
let mut request_line = String::new();
// Read through a cloned handle so `stream` stays available for the reply.
let mut reader = BufReader::new(stream.try_clone().expect("clone stream"));
reader.read_line(&mut request_line).expect("read request");
if request_line.contains("\"command\":\"list-agents\"") {
// Reply with status "ok" but an object payload, which the caller is
// expected to reject as an invalid canonical member-state payload.
let response = SocketResponse {
version: PROTOCOL_VERSION,
request_id: "req-test".to_string(),
status: "ok".to_string(),
payload: Some(serde_json::json!({
"agent": "arch-ctm",
"state": "active"
})),
error: None,
};
let line = serde_json::to_string(&response).expect("serialize response");
stream.write_all(line.as_bytes()).expect("write response");
stream.write_all(b"\n").expect("write newline");
// The request under test has been answered; stop serving.
return;
}
// Any other command gets an error reply and the loop keeps accepting.
let ignored = SocketResponse {
version: PROTOCOL_VERSION,
request_id: "req-ignored".to_string(),
status: "error".to_string(),
payload: None,
error: Some(SocketError {
code: "IGNORED_FOR_TEST".to_string(),
message: "ignored non-list-agents request".to_string(),
}),
};
let line = serde_json::to_string(&ignored).expect("serialize ignored");
stream.write_all(line.as_bytes()).expect("write ignored");
stream.write_all(b"\n").expect("write newline");
}
panic!("expected list-agents request within retry budget");
});
// Point ATM_HOME at the temp dir for the duration of the query only.
let old_home = std::env::var("ATM_HOME").ok();
unsafe { std::env::set_var("ATM_HOME", tmp.path()) };
let result = query_team_member_states("atm-dev");
// Restore ATM_HOME before joining/asserting so a failure below does not
// leave the override in place.
unsafe {
match old_home {
Some(v) => std::env::set_var("ATM_HOME", v),
None => std::env::remove_var("ATM_HOME"),
}
}
// Propagates any panic raised inside the mock daemon thread.
handle.join().expect("mock daemon thread");
let err = result.expect_err("invalid payload must return Err");
assert!(
err.to_string()
.contains("invalid canonical member-state payload"),
"unexpected error: {err}"
);
});
}
/// `AgentPaneInfo` decodes `pane_id` and `log_path` from a JSON object.
#[test]
fn test_agent_pane_info_deserialization() {
    let raw = r#"{"pane_id":"%42","log_path":"/home/user/.claude/logs/arch-ctm.log"}"#;
    let parsed: AgentPaneInfo = serde_json::from_str(raw).unwrap();
    assert_eq!(parsed.pane_id, "%42");
    assert_eq!(parsed.log_path, "/home/user/.claude/logs/arch-ctm.log");
}
/// `query_agent_pane` must not return an error when run inside
/// `with_autostart_disabled`.
#[test]
#[serial]
fn test_query_agent_pane_no_daemon_returns_none() {
    with_autostart_disabled(|| {
        assert!(query_agent_pane("arch-ctm").is_ok());
    });
}
/// Round-trips a fully populated `LaunchConfig` through JSON and checks each field.
#[test]
fn test_launch_config_serialization() {
    let original = LaunchConfig {
        agent: String::from("arch-ctm"),
        team: String::from("atm-dev"),
        command: String::from("codex --yolo"),
        prompt: Some(String::from("Review the bridge module")),
        timeout_secs: 30,
        env_vars: std::collections::HashMap::from([(
            String::from("EXTRA_VAR"),
            String::from("value"),
        )]),
        runtime: Some(String::from("codex")),
        resume_session_id: None,
    };

    let wire = serde_json::to_string(&original).unwrap();
    let restored: LaunchConfig = serde_json::from_str(&wire).unwrap();

    assert_eq!(restored.agent, "arch-ctm");
    assert_eq!(restored.team, "atm-dev");
    assert_eq!(restored.command, "codex --yolo");
    assert_eq!(restored.prompt.as_deref(), Some("Review the bridge module"));
    assert_eq!(restored.timeout_secs, 30);
    assert_eq!(restored.runtime.as_deref(), Some("codex"));
    assert!(restored.resume_session_id.is_none());
    assert_eq!(
        restored.env_vars.get("EXTRA_VAR").map(|v| v.as_str()),
        Some("value")
    );
}
/// Round-trips a `LaunchConfig` with no prompt, no runtime, and empty env vars,
/// but with a resume session id set.
#[test]
fn test_launch_config_no_prompt_serialization() {
    let original = LaunchConfig {
        agent: String::from("worker-1"),
        team: String::from("my-team"),
        command: String::from("codex --yolo"),
        prompt: None,
        timeout_secs: 60,
        env_vars: std::collections::HashMap::new(),
        runtime: None,
        resume_session_id: Some(String::from("sess-123")),
    };

    let wire = serde_json::to_string(&original).unwrap();
    let restored: LaunchConfig = serde_json::from_str(&wire).unwrap();

    assert_eq!(restored.agent, "worker-1");
    assert!(restored.prompt.is_none());
    assert!(restored.env_vars.is_empty());
    assert!(restored.runtime.is_none());
    assert_eq!(restored.resume_session_id.as_deref(), Some("sess-123"));
}
/// Round-trips a `LaunchResult` without a warning through JSON.
#[test]
fn test_launch_result_serialization() {
    let original = LaunchResult {
        agent: String::from("arch-ctm"),
        pane_id: String::from("%42"),
        state: String::from("launching"),
        warning: None,
    };

    let wire = serde_json::to_string(&original).unwrap();
    let restored: LaunchResult = serde_json::from_str(&wire).unwrap();

    assert_eq!(restored.agent, "arch-ctm");
    assert_eq!(restored.pane_id, "%42");
    assert_eq!(restored.state, "launching");
    assert!(restored.warning.is_none());
}
/// A `LaunchResult` warning survives a JSON round trip.
#[test]
fn test_launch_result_with_warning_serialization() {
    let original = LaunchResult {
        agent: String::from("arch-ctm"),
        pane_id: String::from("%7"),
        state: String::from("launching"),
        warning: Some(String::from("Readiness timeout reached")),
    };

    let wire = serde_json::to_string(&original).unwrap();
    let restored: LaunchResult = serde_json::from_str(&wire).unwrap();

    let warning = restored.warning.as_deref();
    assert_eq!(warning, Some("Readiness timeout reached"));
}
/// Round-trips a live `SessionQueryResult` through JSON and checks the core fields.
#[test]
fn test_session_query_result_serialization() {
    let original = SessionQueryResult {
        session_id: String::from("abc123"),
        process_id: 12345,
        alive: true,
        last_seen_at: Some(String::from("2026-03-10T00:00:00Z")),
        runtime: None,
        runtime_session_id: None,
        pane_id: None,
        runtime_home: None,
    };

    let wire = serde_json::to_string(&original).unwrap();
    let restored: SessionQueryResult = serde_json::from_str(&wire).unwrap();

    assert_eq!(restored.session_id, "abc123");
    assert_eq!(restored.process_id, 12345);
    assert!(restored.alive);
}
/// A minimal JSON object (only `session_id`, `process_id`, `alive`) decodes
/// with the omitted optional fields set to `None`.
#[test]
fn test_session_query_result_dead() {
    let raw = r#"{"session_id":"xyz789","process_id":99,"alive":false}"#;
    let parsed: SessionQueryResult = serde_json::from_str(raw).unwrap();

    assert_eq!(parsed.session_id, "xyz789");
    assert_eq!(parsed.process_id, 99);
    assert!(!parsed.alive);
    assert!(parsed.last_seen_at.is_none());
    assert!(parsed.runtime.is_none());
    assert!(parsed.runtime_session_id.is_none());
}
/// `query_session` must not return an error when run inside
/// `with_autostart_disabled`.
#[test]
#[serial]
fn test_query_session_no_daemon_returns_none() {
    with_autostart_disabled(|| {
        assert!(query_session("team-lead").is_ok());
    });
}
/// When `daemon_is_running()` is false, `launch_agent` must not return an error.
/// The check is skipped entirely if a daemon is detected.
#[test]
#[serial]
fn test_launch_agent_no_daemon_returns_none() {
    with_autostart_disabled(|| {
        if !daemon_is_running() {
            let request = LaunchConfig {
                agent: String::from("test-agent"),
                team: String::from("test-team"),
                command: String::from("codex --yolo"),
                prompt: None,
                timeout_secs: 5,
                env_vars: std::collections::HashMap::new(),
                runtime: Some(String::from("codex")),
                resume_session_id: None,
            };
            assert!(launch_agent(&request).is_ok());
        }
    });
}
/// When `daemon_is_running()` is false, `register_hint` must succeed with
/// `RegisterHintOutcome::DaemonUnavailable`. Skipped if a daemon is detected.
#[test]
#[serial]
fn test_register_hint_no_daemon_is_silent_skip() {
    with_autostart_disabled(|| {
        if !daemon_is_running() {
            let hint_result = register_hint(
                "atm-dev",
                "arch-ctm",
                "sess-arch-ctm-test-1234",
                1234,
                Some("codex"),
                Some("thread-id:arch-ctm-test-1234"),
                None,
                None,
            );
            let outcome =
                hint_result.expect("register-hint must not error when daemon unavailable");
            assert_eq!(outcome, RegisterHintOutcome::DaemonUnavailable);
        }
    });
}
/// Round-trips an `AgentSummary` through JSON.
#[test]
fn test_agent_summary_serialization() {
    let original = AgentSummary {
        agent: String::from("arch-ctm"),
        state: String::from("idle"),
    };

    let wire = serde_json::to_string(&original).unwrap();
    let restored: AgentSummary = serde_json::from_str(&wire).unwrap();

    assert_eq!(restored.agent, "arch-ctm");
    assert_eq!(restored.state, "idle");
}
/// Round-trips a fully populated `CanonicalMemberState` through JSON.
#[test]
fn test_canonical_member_state_serialization() {
    let original = CanonicalMemberState {
        agent: String::from("arch-ctm"),
        state: String::from("active"),
        activity: String::from("busy"),
        session_id: Some(String::from("sess-123")),
        process_id: Some(4242),
        last_alive_at: Some(String::from("2026-03-08T00:00:00Z")),
        reason: String::from("session active with live pid"),
        source: String::from("session_registry"),
        in_config: true,
    };

    let wire = serde_json::to_string(&original).unwrap();
    let restored: CanonicalMemberState = serde_json::from_str(&wire).unwrap();

    assert_eq!(restored.agent, "arch-ctm");
    assert_eq!(restored.state, "active");
    assert_eq!(restored.activity, "busy");
    assert_eq!(restored.session_id.as_deref(), Some("sess-123"));
    assert_eq!(restored.process_id, Some(4242));
    let last_alive = restored.last_alive_at.as_deref();
    assert_eq!(last_alive, Some("2026-03-08T00:00:00Z"));
    assert!(restored.in_config);
}
/// Checks the status label, activity label, and liveness flag derived from
/// active, idle, offline, and absent member states.
#[test]
fn test_canonical_status_activity_labels_and_liveness() {
    let active = CanonicalMemberState {
        agent: String::from("arch-ctm"),
        state: String::from("active"),
        activity: String::from("busy"),
        session_id: None,
        process_id: None,
        last_alive_at: None,
        reason: String::new(),
        source: String::new(),
        in_config: true,
    };
    let idle = CanonicalMemberState {
        state: String::from("idle"),
        activity: String::from("idle"),
        ..active.clone()
    };
    let dead = CanonicalMemberState {
        state: String::from("offline"),
        activity: String::from("unknown"),
        ..active.clone()
    };

    // (input, expected status label, expected activity label, expected liveness)
    let cases = [
        (Some(&active), "Active", "Busy", Some(true)),
        (Some(&idle), "Idle", "Idle", Some(true)),
        (Some(&dead), "Dead", "Unknown", Some(false)),
        (None, "Unknown", "Unknown", None),
    ];
    for (member, status, activity, alive) in cases {
        assert_eq!(canonical_status_label(member), status);
        assert_eq!(canonical_activity_label(member), activity);
        assert_eq!(canonical_liveness_bool(member), alive);
    }
}
/// A bare object payload is rejected with an
/// "invalid canonical member-state payload" error.
#[test]
fn test_decode_canonical_member_states_payload_rejects_invalid_schema() {
    let object_payload = serde_json::json!({
        "agent": "arch-ctm",
        "state": "active"
    });
    let failure = decode_canonical_member_states_payload(object_payload).unwrap_err();
    let message = failure.to_string();
    assert!(message.contains("invalid canonical member-state payload"));
}
/// A one-element array payload decodes into a single `CanonicalMemberState`.
#[test]
fn test_decode_canonical_member_states_payload_accepts_valid_schema() {
    let array_payload = serde_json::json!([
        {
            "agent": "arch-ctm",
            "state": "active",
            "activity": "busy",
            "session_id": "sess-1",
            "process_id": 1234,
            "reason": "session active",
            "source": "session_registry",
            "in_config": false
        }
    ]);
    let decoded = decode_canonical_member_states_payload(array_payload).expect("valid payload");
    assert_eq!(decoded.len(), 1);
    let only = &decoded[0];
    assert_eq!(only.agent, "arch-ctm");
    assert_eq!(only.state, "active");
    assert!(!only.in_config);
}
/// `in_config` decodes as `true` when the JSON omits it.
#[test]
fn test_decode_canonical_member_state_defaults_in_config_true_when_missing() {
    let raw = r#"{
"agent":"arch-ctm",
"state":"active",
"activity":"busy",
"reason":"session active",
"source":"session_registry"
}"#;
    let parsed: CanonicalMemberState = serde_json::from_str(raw).expect("decode");
    assert!(parsed.in_config);
}
/// An `ok` response with `{"processed": true}` decodes to
/// `RegisterHintOutcome::Registered`.
#[test]
fn test_decode_register_hint_response_ok_registered() {
    let ok_response = SocketResponse {
        version: PROTOCOL_VERSION,
        request_id: String::from("req-1"),
        status: String::from("ok"),
        payload: Some(serde_json::json!({ "processed": true })),
        error: None,
    };
    let decoded = decode_register_hint_response(ok_response).expect("ok response");
    assert_eq!(decoded, RegisterHintOutcome::Registered);
}
/// An `UNKNOWN_COMMAND` error response decodes to
/// `RegisterHintOutcome::UnsupportedDaemon` rather than an error.
#[test]
fn test_decode_register_hint_response_unknown_command_maps_to_unsupported() {
    let unknown_command = SocketError {
        code: String::from("UNKNOWN_COMMAND"),
        message: String::from("Unknown command: 'register-hint'"),
    };
    let error_response = SocketResponse {
        version: PROTOCOL_VERSION,
        request_id: String::from("req-1"),
        status: String::from("error"),
        payload: None,
        error: Some(unknown_command),
    };
    let decoded = decode_register_hint_response(error_response).expect("unknown command handled");
    assert_eq!(decoded, RegisterHintOutcome::UnsupportedDaemon);
}
/// The test process's own PID must be reported alive.
#[cfg(unix)]
#[test]
fn test_pid_alive_current_process() {
    assert!(pid_alive(std::process::id() as i32));
}
/// `i32::MAX` is used as a PID that is expected not to be alive.
#[cfg(unix)]
#[test]
fn test_pid_alive_nonexistent_pid() {
    let unlikely_pid = i32::MAX;
    assert!(!pid_alive(unlikely_pid));
}
/// With `ATM_DAEMON_BIN` pointing at a nonexistent binary,
/// `ensure_daemon_running` must fail on Unix and succeed on other platforms.
///
/// Skipped when a daemon is already detected. The previous value of
/// `ATM_DAEMON_BIN` is saved and restored, so the test no longer wipes an
/// override that was set before it ran.
#[test]
#[serial]
fn test_ensure_daemon_running_reads_atm_daemon_bin() {
    if daemon_is_running() {
        return;
    }

    // `var_os` keeps the prior value intact even if it is not valid UTF-8.
    let old_bin = std::env::var_os("ATM_DAEMON_BIN");
    unsafe {
        std::env::set_var("ATM_DAEMON_BIN", "/nonexistent-bin-for-atm-test");
    }

    let result = ensure_daemon_running();

    // Restore before asserting so a failing assertion cannot leak the override.
    unsafe {
        match old_bin {
            Some(previous) => std::env::set_var("ATM_DAEMON_BIN", previous),
            None => std::env::remove_var("ATM_DAEMON_BIN"),
        }
    }

    #[cfg(unix)]
    assert!(
        result.is_err(),
        "spawn of nonexistent binary must return Err on Unix"
    );
    #[cfg(not(unix))]
    assert!(
        result.is_ok(),
        "ensure_daemon_running is a no-op on non-Unix"
    );
}
/// Every `LifecycleSourceKind` variant serializes to its snake_case JSON string.
#[test]
fn lifecycle_source_kind_serializes_snake_case() {
    let expectations = [
        (LifecycleSourceKind::ClaudeHook, "\"claude_hook\""),
        (LifecycleSourceKind::AtmMcp, "\"atm_mcp\""),
        (LifecycleSourceKind::AgentHook, "\"agent_hook\""),
        (LifecycleSourceKind::Unknown, "\"unknown\""),
    ];
    for (kind, wire) in &expectations {
        assert_eq!(serde_json::to_string(kind).unwrap(), *wire);
    }
}
/// Every snake_case JSON string deserializes to the matching
/// `LifecycleSourceKind` variant.
#[test]
fn lifecycle_source_kind_deserializes_snake_case() {
    let expectations = [
        ("\"claude_hook\"", LifecycleSourceKind::ClaudeHook),
        ("\"atm_mcp\"", LifecycleSourceKind::AtmMcp),
        ("\"agent_hook\"", LifecycleSourceKind::AgentHook),
        ("\"unknown\"", LifecycleSourceKind::Unknown),
    ];
    for (wire, expected) in expectations {
        let parsed: LifecycleSourceKind = serde_json::from_str(wire).unwrap();
        assert_eq!(parsed, expected);
    }
}
/// A `LifecycleSource` built with `AtmMcp` survives a JSON round trip.
#[test]
fn lifecycle_source_round_trip() {
    let original = LifecycleSource::new(LifecycleSourceKind::AtmMcp);
    let json = serde_json::to_string(&original).unwrap();
    assert!(json.contains("\"atm_mcp\""), "serialized: {json}");

    let restored: LifecycleSource = serde_json::from_str(&json).unwrap();
    assert_eq!(restored.kind, LifecycleSourceKind::AtmMcp);
}
/// A `hook-event` request whose payload has no `source` key still parses as a
/// `SocketRequest`.
#[test]
fn hook_event_payload_without_source_is_backward_compatible() {
    let raw = r#"{
"version": 1,
"request_id": "req-test",
"command": "hook-event",
"payload": {
"event": "session_start",
"agent": "team-lead",
"team": "atm-dev",
"session_id": "abc-123"
}
}"#;
    let request: SocketRequest = serde_json::from_str(raw).unwrap();
    let source_field = request.payload.get("source");
    assert!(source_field.is_none());
    assert_eq!(request.command, "hook-event");
}
/// The `source` object inside a `hook-event` payload decodes into a
/// `LifecycleSource` of kind `AtmMcp`.
#[test]
fn hook_event_payload_with_atm_mcp_source_parses() {
    let raw = r#"{
"version": 1,
"request_id": "req-mcp",
"command": "hook-event",
"payload": {
"event": "session_start",
"agent": "arch-ctm",
"team": "atm-dev",
"session_id": "codex:abc-123",
"source": {"kind": "atm_mcp"}
}
}"#;
    let request: SocketRequest = serde_json::from_str(raw).unwrap();
    let source_value = request.payload["source"].clone();
    let parsed: LifecycleSource = serde_json::from_value(source_value).unwrap();
    assert_eq!(parsed.kind, LifecycleSourceKind::AtmMcp);
}
/// When `daemon_is_running()` is false, `send_control` must return an error.
/// Skipped if a daemon is detected.
#[test]
fn test_send_control_no_daemon_returns_err() {
    use crate::control::{CONTROL_SCHEMA_VERSION, ControlAction, ControlRequest};

    if daemon_is_running() {
        return;
    }

    let stdin_request = ControlRequest {
        v: CONTROL_SCHEMA_VERSION,
        request_id: String::from("req-test-ctrl"),
        msg_type: String::from("control.stdin.request"),
        signal: None,
        sent_at: String::from("2026-02-21T00:00:00Z"),
        team: String::from("atm-dev"),
        session_id: String::new(),
        agent_id: String::from("arch-ctm"),
        sender: String::from("tui"),
        action: ControlAction::Stdin,
        payload: Some(String::from("hello")),
        content_ref: None,
        elicitation_id: None,
        decision: None,
    };

    let outcome = send_control(&stdin_request);
    assert!(
        outcome.is_err(),
        "send_control should return Err when daemon is not running"
    );
}
/// On Windows, `query_daemon` must return `Ok(None)` for an `agent-state` request.
#[cfg(windows)]
#[test]
#[serial]
fn windows_query_daemon_returns_ok_none() {
    let request = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: String::from("req-win-test"),
        command: String::from("agent-state"),
        payload: serde_json::json!({ "agent": "arch-ctm", "team": "atm-dev" }),
    };
    match query_daemon(&request) {
        Err(_) => panic!("query_daemon must not return Err on Windows"),
        Ok(Some(_)) => {
            panic!("query_daemon must return Ok(None) on Windows (no Unix socket available)")
        }
        Ok(None) => {}
    }
}
/// On Windows, `daemon_is_running` must report `false`.
#[cfg(windows)]
#[test]
fn windows_daemon_is_running_returns_false() {
    let running = daemon_is_running();
    assert!(!running, "daemon_is_running must return false on Windows");
}
/// On Windows, `subscribe_stream_events` must return `Ok(None)`.
#[cfg(windows)]
#[test]
fn windows_subscribe_stream_events_returns_ok_none() {
    match subscribe_stream_events() {
        Err(_) => panic!("subscribe_stream_events must not return Err on Windows"),
        Ok(Some(_)) => panic!("subscribe_stream_events must return Ok(None) on Windows"),
        Ok(None) => {}
    }
}
/// On Windows, `query_agent_state` must return `Ok(None)`.
#[cfg(windows)]
#[test]
#[serial]
fn windows_query_agent_state_returns_ok_none() {
    let outcome = query_agent_state("arch-ctm", "atm-dev");
    assert!(
        outcome.is_ok(),
        "query_agent_state must not return Err on Windows"
    );
    assert!(
        matches!(outcome, Ok(None)),
        "query_agent_state must return Ok(None) on Windows"
    );
}
/// On Windows, `query_session` must return `Ok(None)`.
#[cfg(windows)]
#[test]
#[serial]
fn windows_query_session_returns_ok_none() {
    let outcome = query_session("team-lead");
    assert!(
        outcome.is_ok(),
        "query_session must not return Err on Windows"
    );
    assert!(
        matches!(outcome, Ok(None)),
        "query_session must return Ok(None) on Windows"
    );
}
/// On Windows, `launch_agent` must return `Ok(None)`.
#[cfg(windows)]
#[test]
#[serial]
fn windows_launch_agent_returns_ok_none() {
    let request = LaunchConfig {
        agent: String::from("test-agent"),
        team: String::from("test-team"),
        command: String::from("codex --yolo"),
        prompt: None,
        timeout_secs: 5,
        env_vars: std::collections::HashMap::new(),
        runtime: Some(String::from("codex")),
        resume_session_id: None,
    };
    match launch_agent(&request) {
        Err(_) => panic!("launch_agent must not return Err on Windows (no daemon socket)"),
        Ok(Some(_)) => panic!("launch_agent must return Ok(None) on Windows"),
        Ok(None) => {}
    }
}
/// The startup lock file can be acquired, dropped, and acquired again on Windows.
#[cfg(windows)]
#[test]
fn windows_startup_lock_acquires_and_releases() {
    use crate::io::lock::acquire_lock;

    let scratch = tempfile::tempdir().unwrap();
    let lock_dir = scratch.path().join("config").join("atm");
    std::fs::create_dir_all(&lock_dir).unwrap();
    let lock_path = lock_dir.join("daemon-start.lock");

    let first = acquire_lock(&lock_path, 3);
    assert!(
        first.is_ok(),
        "startup lock must be acquirable on Windows: {:?}",
        first.err()
    );
    // Release the first guard before attempting the second acquisition.
    drop(first.unwrap());

    let second = acquire_lock(&lock_path, 1);
    assert!(
        second.is_ok(),
        "startup lock must be re-acquirable after release on Windows"
    );
}
/// On Windows the socket path ends with `atm-daemon.sock` and mentions both
/// `.atm` and `daemon` (checked separately rather than as `.atm/daemon`).
#[cfg(windows)]
#[test]
fn windows_daemon_socket_path_has_correct_suffix() {
    let socket_path = daemon_socket_path().unwrap();
    let rendered = socket_path.to_string_lossy();
    let has_suffix = rendered.ends_with("atm-daemon.sock");
    let has_components = rendered.contains(".atm") && rendered.contains("daemon");
    assert!(
        has_suffix,
        "daemon_socket_path must end with 'atm-daemon.sock' on Windows, got: {rendered}"
    );
    assert!(
        has_components,
        "daemon_socket_path must contain '.atm/daemon' on Windows, got: {rendered}"
    );
}
/// On Windows the PID path ends with `atm-daemon.pid` and mentions both
/// `.atm` and `daemon` (checked separately rather than as `.atm/daemon`).
#[cfg(windows)]
#[test]
fn windows_daemon_pid_path_has_correct_suffix() {
    let pid_path = daemon_pid_path().unwrap();
    let rendered = pid_path.to_string_lossy();
    let has_suffix = rendered.ends_with("atm-daemon.pid");
    let has_components = rendered.contains(".atm") && rendered.contains("daemon");
    assert!(
        has_suffix,
        "daemon_pid_path must end with 'atm-daemon.pid' on Windows, got: {rendered}"
    );
    assert!(
        has_components,
        "daemon_pid_path must contain '.atm/daemon' on Windows, got: {rendered}"
    );
}
/// On Windows, `send_control` must return an error for a stdin control request.
#[cfg(windows)]
#[test]
fn windows_send_control_no_daemon_returns_err() {
    use crate::control::{CONTROL_SCHEMA_VERSION, ControlAction, ControlRequest};

    let stdin_request = ControlRequest {
        v: CONTROL_SCHEMA_VERSION,
        request_id: String::from("req-win-ctrl"),
        msg_type: String::from("control.stdin.request"),
        signal: None,
        sent_at: String::from("2026-02-21T00:00:00Z"),
        team: String::from("atm-dev"),
        session_id: String::new(),
        agent_id: String::from("arch-ctm"),
        sender: String::from("tui"),
        action: ControlAction::Stdin,
        payload: Some(String::from("hello")),
        content_ref: None,
        elicitation_id: None,
        decision: None,
    };

    let outcome = send_control(&stdin_request);
    assert!(
        outcome.is_err(),
        "send_control must return Err on Windows when daemon is not reachable"
    );
}
/// Wraps a serialized `ControlRequest` in a `SocketRequest` with command
/// `control` and checks that the envelope id and the payload id stay distinct
/// and both appear in the JSON.
#[test]
fn test_send_control_builds_correct_socket_request() {
    use crate::control::{CONTROL_SCHEMA_VERSION, ControlAction, ControlRequest};

    let control = ControlRequest {
        v: CONTROL_SCHEMA_VERSION,
        request_id: String::from("req-ctrl-check"),
        msg_type: String::from("control.interrupt.request"),
        signal: Some(String::from("interrupt")),
        sent_at: String::from("2026-02-21T00:00:00Z"),
        team: String::from("atm-dev"),
        session_id: String::new(),
        agent_id: String::from("arch-ctm"),
        sender: String::from("tui"),
        action: ControlAction::Interrupt,
        payload: None,
        content_ref: None,
        elicitation_id: None,
        decision: None,
    };
    let envelope = SocketRequest {
        version: PROTOCOL_VERSION,
        request_id: String::from("sock-test-123"),
        command: String::from("control"),
        payload: serde_json::to_value(&control).expect("serialize ControlRequest"),
    };

    assert_ne!(
        envelope.request_id, control.request_id,
        "socket-level request_id must differ from control payload request_id"
    );
    assert_eq!(envelope.request_id, "sock-test-123");

    let wire = serde_json::to_string(&envelope).expect("serialize SocketRequest");
    // (substring that must appear in the JSON, failure message if it does not)
    let expectations = [
        ("\"command\":\"control\"", "command field missing"),
        (
            "\"request_id\":\"req-ctrl-check\"",
            "control payload request_id must appear inside the payload body",
        ),
        (
            "\"request_id\":\"sock-test-123\"",
            "socket-level request_id must appear in the outer envelope",
        ),
        (
            "\"type\":\"control.interrupt.request\"",
            "msg_type field missing from control payload",
        ),
        ("\"interrupt\"", "interrupt signal missing"),
    ];
    for (needle, failure) in expectations {
        assert!(wire.contains(needle), "{failure}");
    }
}
}