use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum DaemonStreamEvent {
TurnStarted {
agent: String,
thread_id: String,
turn_id: String,
transport: String,
},
TurnCompleted {
agent: String,
thread_id: String,
turn_id: String,
status: TurnStatusWire,
transport: String,
},
TurnIdle {
agent: String,
turn_id: String,
transport: String,
},
StreamError {
agent_id: String,
session_id: String,
error_summary: String,
},
DroppedCounters {
agent_id: String,
dropped: u64,
unknown: u64,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum TurnStatusWire {
Completed,
Interrupted,
Failed,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentStreamState {
pub turn_id: Option<String>,
pub thread_id: Option<String>,
pub transport: Option<String>,
pub turn_status: StreamTurnStatus,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum StreamTurnStatus {
#[default]
Idle,
Busy,
Terminal,
}
impl std::fmt::Display for StreamTurnStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Idle => write!(f, "idle"),
Self::Busy => write!(f, "busy"),
Self::Terminal => write!(f, "terminal"),
}
}
}
impl AgentStreamState {
pub fn apply(&mut self, event: &DaemonStreamEvent) {
match event {
DaemonStreamEvent::TurnStarted {
thread_id,
turn_id,
transport,
..
} => {
self.turn_id = Some(turn_id.clone());
self.thread_id = Some(thread_id.clone());
self.transport = Some(transport.clone());
self.turn_status = StreamTurnStatus::Busy;
}
DaemonStreamEvent::TurnCompleted {
thread_id,
turn_id,
transport,
..
} => {
self.turn_id = Some(turn_id.clone());
self.thread_id = Some(thread_id.clone());
self.transport = Some(transport.clone());
self.turn_status = StreamTurnStatus::Terminal;
}
DaemonStreamEvent::TurnIdle {
turn_id, transport, ..
} => {
self.turn_id = Some(turn_id.clone());
self.transport = Some(transport.clone());
self.turn_status = StreamTurnStatus::Idle;
}
DaemonStreamEvent::StreamError { .. } | DaemonStreamEvent::DroppedCounters { .. } => {
}
}
}
pub fn agent_from_event(event: &DaemonStreamEvent) -> &str {
match event {
DaemonStreamEvent::TurnStarted { agent, .. }
| DaemonStreamEvent::TurnCompleted { agent, .. }
| DaemonStreamEvent::TurnIdle { agent, .. } => agent,
DaemonStreamEvent::StreamError { agent_id, .. }
| DaemonStreamEvent::DroppedCounters { agent_id, .. } => agent_id,
}
}
}
impl DaemonStreamEvent {
pub fn agent(&self) -> &str {
match self {
Self::TurnStarted { agent, .. } => agent,
Self::TurnCompleted { agent, .. } => agent,
Self::TurnIdle { agent, .. } => agent,
Self::StreamError { agent_id, .. } => agent_id,
Self::DroppedCounters { agent_id, .. } => agent_id,
}
}
}
impl std::fmt::Display for DaemonStreamEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::TurnStarted {
agent,
turn_id,
transport,
..
} => write!(
f,
"TurnStarted(agent={agent}, turn_id={turn_id}, transport={transport})"
),
Self::TurnCompleted {
agent,
turn_id,
transport,
..
} => write!(
f,
"TurnCompleted(agent={agent}, turn_id={turn_id}, transport={transport})"
),
Self::TurnIdle {
agent, transport, ..
} => write!(f, "TurnIdle(agent={agent}, transport={transport})"),
Self::StreamError {
agent_id,
session_id,
error_summary,
} => write!(
f,
"StreamError(agent_id={agent_id}, session_id={session_id}, error_summary={error_summary})"
),
Self::DroppedCounters {
agent_id,
dropped,
unknown,
} => write!(
f,
"DroppedCounters(agent_id={agent_id}, dropped={dropped}, unknown={unknown})"
),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn daemon_stream_event_serialization_round_trip() {
let events = vec![
DaemonStreamEvent::TurnStarted {
agent: "arch-ctm".to_string(),
thread_id: "th-1".to_string(),
turn_id: "turn-abc".to_string(),
transport: "app-server".to_string(),
},
DaemonStreamEvent::TurnCompleted {
agent: "arch-ctm".to_string(),
thread_id: "th-1".to_string(),
turn_id: "turn-abc".to_string(),
status: TurnStatusWire::Completed,
transport: "app-server".to_string(),
},
DaemonStreamEvent::TurnIdle {
agent: "arch-ctm".to_string(),
turn_id: "turn-abc".to_string(),
transport: "cli-json".to_string(),
},
DaemonStreamEvent::StreamError {
agent_id: "arch-ctm".to_string(),
session_id: "th-1".to_string(),
error_summary: "socket closed".to_string(),
},
DaemonStreamEvent::DroppedCounters {
agent_id: "proxy:all".to_string(),
dropped: 3,
unknown: 2,
},
];
for event in &events {
let json = serde_json::to_string(event).expect("serialize");
let deserialized: DaemonStreamEvent = serde_json::from_str(&json).expect("deserialize");
assert_eq!(&deserialized, event, "round-trip mismatch for {json}");
}
}
#[test]
fn turn_status_wire_serialization() {
assert_eq!(
serde_json::to_string(&TurnStatusWire::Completed).unwrap(),
"\"completed\""
);
assert_eq!(
serde_json::to_string(&TurnStatusWire::Interrupted).unwrap(),
"\"interrupted\""
);
assert_eq!(
serde_json::to_string(&TurnStatusWire::Failed).unwrap(),
"\"failed\""
);
}
#[test]
fn agent_stream_state_apply_turn_started() {
let mut state = AgentStreamState::default();
let event = DaemonStreamEvent::TurnStarted {
agent: "a".to_string(),
thread_id: "th1".to_string(),
turn_id: "t1".to_string(),
transport: "app-server".to_string(),
};
state.apply(&event);
assert_eq!(state.turn_status, StreamTurnStatus::Busy);
assert_eq!(state.turn_id.as_deref(), Some("t1"));
assert_eq!(state.thread_id.as_deref(), Some("th1"));
assert_eq!(state.transport.as_deref(), Some("app-server"));
}
#[test]
fn agent_stream_state_apply_turn_completed() {
let mut state = AgentStreamState {
turn_status: StreamTurnStatus::Busy,
turn_id: Some("t1".into()),
..Default::default()
};
let event = DaemonStreamEvent::TurnCompleted {
agent: "a".to_string(),
thread_id: "th1".to_string(),
turn_id: "t1".to_string(),
status: TurnStatusWire::Failed,
transport: "cli-json".to_string(),
};
state.apply(&event);
assert_eq!(state.turn_status, StreamTurnStatus::Terminal);
}
#[test]
fn agent_stream_state_apply_turn_idle() {
let mut state = AgentStreamState {
turn_status: StreamTurnStatus::Terminal,
..Default::default()
};
let event = DaemonStreamEvent::TurnIdle {
agent: "a".to_string(),
turn_id: "t1".to_string(),
transport: "mcp".to_string(),
};
state.apply(&event);
assert_eq!(state.turn_status, StreamTurnStatus::Idle);
}
#[test]
fn agent_stream_state_apply_observability_events_no_state_change() {
let mut state = AgentStreamState {
turn_status: StreamTurnStatus::Busy,
turn_id: Some("t1".into()),
thread_id: Some("th1".into()),
transport: Some("app-server".into()),
};
state.apply(&DaemonStreamEvent::StreamError {
agent_id: "a".to_string(),
session_id: "th1".to_string(),
error_summary: "err".to_string(),
});
assert_eq!(state.turn_status, StreamTurnStatus::Busy);
state.apply(&DaemonStreamEvent::DroppedCounters {
agent_id: "proxy:all".to_string(),
dropped: 1,
unknown: 2,
});
assert_eq!(state.turn_status, StreamTurnStatus::Busy);
assert_eq!(state.turn_id.as_deref(), Some("t1"));
}
#[test]
fn agent_from_event_extracts_agent() {
let event = DaemonStreamEvent::TurnStarted {
agent: "test-agent".to_string(),
thread_id: String::new(),
turn_id: String::new(),
transport: String::new(),
};
assert_eq!(AgentStreamState::agent_from_event(&event), "test-agent");
}
#[test]
fn stream_turn_status_display() {
assert_eq!(format!("{}", StreamTurnStatus::Idle), "idle");
assert_eq!(format!("{}", StreamTurnStatus::Busy), "busy");
assert_eq!(format!("{}", StreamTurnStatus::Terminal), "terminal");
}
#[test]
fn stream_turn_status_default_is_idle() {
assert_eq!(StreamTurnStatus::default(), StreamTurnStatus::Idle);
}
#[test]
fn agent_stream_state_default_is_idle() {
let state = AgentStreamState::default();
assert_eq!(state.turn_status, StreamTurnStatus::Idle);
assert!(state.turn_id.is_none());
assert!(state.thread_id.is_none());
assert!(state.transport.is_none());
}
#[test]
fn daemon_stream_event_display_turn_started() {
let event = DaemonStreamEvent::TurnStarted {
agent: "arch-ctm".to_string(),
thread_id: "th-1".to_string(),
turn_id: "turn-abc".to_string(),
transport: "app-server".to_string(),
};
let s = event.to_string();
assert_eq!(
s,
"TurnStarted(agent=arch-ctm, turn_id=turn-abc, transport=app-server)"
);
}
#[test]
fn daemon_stream_event_display_turn_completed() {
let event = DaemonStreamEvent::TurnCompleted {
agent: "arch-ctm".to_string(),
thread_id: "th-1".to_string(),
turn_id: "turn-abc".to_string(),
status: TurnStatusWire::Completed,
transport: "mcp".to_string(),
};
let s = event.to_string();
assert_eq!(
s,
"TurnCompleted(agent=arch-ctm, turn_id=turn-abc, transport=mcp)"
);
}
#[test]
fn daemon_stream_event_display_turn_idle() {
let event = DaemonStreamEvent::TurnIdle {
agent: "arch-ctm".to_string(),
turn_id: "turn-abc".to_string(),
transport: "cli-json".to_string(),
};
let s = event.to_string();
assert_eq!(s, "TurnIdle(agent=arch-ctm, transport=cli-json)");
}
#[test]
fn daemon_stream_event_display_stream_error() {
let event = DaemonStreamEvent::StreamError {
agent_id: "arch-ctm".to_string(),
session_id: "th-1".to_string(),
error_summary: "socket closed".to_string(),
};
let s = event.to_string();
assert_eq!(
s,
"StreamError(agent_id=arch-ctm, session_id=th-1, error_summary=socket closed)"
);
}
}