use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock, RwLock};
use serde::{Deserialize, Serialize};
use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
use crate::tool_annotations::ToolKind;
/// Batch entry for a filesystem-watch notification, delivered to clients via
/// [`AgentEvent::FsWatch`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FsWatchEvent {
    /// Normalized event kind string.
    // NOTE(review): the exact vocabulary of `kind` vs `raw_kind` is set by the
    // watcher that produces these events — confirm against that code.
    pub kind: String,
    /// Paths affected by this event.
    pub paths: Vec<String>,
    /// The same paths, presumably relative to the watched root — verify producer.
    pub relative_paths: Vec<String>,
    /// Backend-specific kind string, preserved verbatim.
    pub raw_kind: String,
    /// Error message when the watcher reported a failure instead of a change.
    pub error: Option<String>,
}
/// Lifecycle transition of a delegated worker, carried by
/// [`AgentEvent::WorkerUpdate`].
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum WorkerEvent {
    /// Worker was created and started running.
    WorkerSpawned,
    /// Worker reported forward progress.
    WorkerProgressed,
    /// Worker is blocked waiting for external input.
    WorkerWaitingForInput,
    /// Worker finished successfully (terminal).
    WorkerCompleted,
    /// Worker finished with an error (terminal).
    WorkerFailed,
    /// Worker was cancelled (terminal).
    WorkerCancelled,
}
impl WorkerEvent {
pub fn as_status(self) -> &'static str {
match self {
Self::WorkerSpawned => "running",
Self::WorkerProgressed => "progressed",
Self::WorkerWaitingForInput => "awaiting_input",
Self::WorkerCompleted => "completed",
Self::WorkerFailed => "failed",
Self::WorkerCancelled => "cancelled",
}
}
pub fn as_str(self) -> &'static str {
match self {
Self::WorkerSpawned => "WorkerSpawned",
Self::WorkerProgressed => "WorkerProgressed",
Self::WorkerWaitingForInput => "WorkerWaitingForInput",
Self::WorkerCompleted => "WorkerCompleted",
Self::WorkerFailed => "WorkerFailed",
Self::WorkerCancelled => "WorkerCancelled",
}
}
pub fn is_terminal(self) -> bool {
matches!(
self,
Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
)
}
}
/// Lifecycle state of a single tool call; serialized as snake_case strings
/// ("pending", "in_progress", "completed", "failed").
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    Pending,
    InProgress,
    Completed,
    Failed,
}
/// Public (wire-facing) classification of tool-call failures; a coarser view
/// of the internal `crate::value::ErrorCategory` taxonomy (see
/// [`ToolCallErrorCategory::from_internal`]). Serialized as snake_case.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallErrorCategory {
    SchemaValidation,
    ToolError,
    McpServerError,
    HostBridgeError,
    PermissionDenied,
    RejectedLoop,
    ParseAborted,
    Timeout,
    Network,
    Cancelled,
    Unknown,
}
impl ToolCallErrorCategory {
    /// Wire string for this category; kept in lockstep with the serde
    /// `snake_case` encoding (asserted by the serde round-trip test).
    pub fn as_str(self) -> &'static str {
        match self {
            Self::SchemaValidation => "schema_validation",
            Self::ToolError => "tool_error",
            Self::McpServerError => "mcp_server_error",
            Self::HostBridgeError => "host_bridge_error",
            Self::PermissionDenied => "permission_denied",
            Self::RejectedLoop => "rejected_loop",
            Self::ParseAborted => "parse_aborted",
            Self::Timeout => "timeout",
            Self::Network => "network",
            Self::Cancelled => "cancelled",
            Self::Unknown => "unknown",
        }
    }

    /// Collapse the fine-grained internal error taxonomy into the public one.
    pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
        use crate::value::ErrorCategory as Internal;
        match category {
            Internal::Timeout => Self::Timeout,
            // Transient provider/network failures are reported as one family.
            Internal::RateLimit
            | Internal::Overloaded
            | Internal::ServerError
            | Internal::TransientNetwork => Self::Network,
            Internal::SchemaValidation => Self::SchemaValidation,
            Internal::ToolError => Self::ToolError,
            Internal::ToolRejected => Self::PermissionDenied,
            Internal::Cancelled => Self::Cancelled,
            // Remaining infrastructure-side failures fall back to the
            // host-bridge bucket.
            Internal::Auth
            | Internal::EgressBlocked
            | Internal::NotFound
            | Internal::CircuitOpen
            | Internal::BudgetExceeded
            | Internal::Generic => Self::HostBridgeError,
        }
    }
}
/// Which subsystem actually executed a tool call. Serialized with an internal
/// `kind` tag in snake_case (e.g. `{"kind": "mcp_server", "server_name": …}`).
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ToolExecutor {
    /// Built-in tool handled inside this crate.
    HarnBuiltin,
    /// Tool forwarded to the host bridge.
    HostBridge,
    /// Tool served by a named MCP server.
    McpServer { server_name: String },
    /// Tool executed natively by the model provider.
    ProviderNative,
}
/// Streamed agent lifecycle event.
///
/// Serialized with an internal `type` tag in snake_case (e.g.
/// `"type": "tool_call_update"`). Every variant carries the `session_id`
/// it belongs to, exposed uniformly via [`AgentEvent::session_id`].
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// Incremental chunk of the agent's user-visible reply text.
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// Incremental chunk of the agent's thought/reasoning stream.
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// A tool call was issued.
    ToolCall {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        raw_input: serde_json::Value,
        // NOTE(review): presumably `true` while arguments are still being
        // streamed/parsed — confirm with the emitter.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// Progress/terminal update for a previously announced tool call.
    /// Optional fields default to `None` on deserialize for back-compat and
    /// are omitted on serialize when absent.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        raw_output: Option<serde_json::Value>,
        error: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        duration_ms: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        execution_duration_ms: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error_category: Option<ToolCallErrorCategory>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        executor: Option<ToolExecutor>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input: Option<serde_json::Value>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input_partial: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// The agent published or updated a plan (opaque JSON payload).
    Plan {
        session_id: String,
        plan: serde_json::Value,
    },
    /// A loop iteration ("turn") began.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// A loop iteration finished; `turn_info` is an opaque summary payload.
    TurnEnd {
        session_id: String,
        iteration: usize,
        turn_info: serde_json::Value,
    },
    /// Outcome of the judge model's evaluation of an iteration.
    JudgeDecision {
        session_id: String,
        iteration: usize,
        verdict: String,
        reasoning: String,
        next_step: Option<String>,
        judge_duration_ms: u64,
    },
    /// A typed checkpoint payload was recorded.
    TypedCheckpoint {
        session_id: String,
        checkpoint: serde_json::Value,
    },
    /// Feedback of some `kind` was injected into the conversation.
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// The iteration budget was used up.
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// The loop made no progress despite nudging.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        tail_excerpt: String,
    },
    /// The daemon watchdog fired after repeated attempts.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
    /// A skill was activated for the session.
    SkillActivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
        reason: String,
    },
    /// A previously active skill was deactivated.
    SkillDeactivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
    },
    /// A skill narrowed the set of tools the agent may use.
    SkillScopeTools {
        session_id: String,
        skill_name: String,
        allowed_tools: Vec<String>,
    },
    /// A tool-search query was issued.
    ToolSearchQuery {
        session_id: String,
        tool_use_id: String,
        name: String,
        query: serde_json::Value,
        strategy: String,
        mode: String,
    },
    /// Result of a tool search; `promoted` lists tools made available.
    ToolSearchResult {
        session_id: String,
        tool_use_id: String,
        promoted: Vec<String>,
        strategy: String,
        mode: String,
    },
    /// The transcript was compacted to reclaim context tokens.
    TranscriptCompacted {
        session_id: String,
        mode: String,
        strategy: String,
        archived_messages: usize,
        estimated_tokens_before: usize,
        estimated_tokens_after: usize,
        snapshot_asset_id: Option<String>,
    },
    /// A handoff artifact was produced (boxed: large payload).
    Handoff {
        session_id: String,
        artifact_id: String,
        handoff: Box<HandoffArtifact>,
    },
    /// A batch of filesystem-watch events for a subscription.
    FsWatch {
        session_id: String,
        subscription_id: String,
        events: Vec<FsWatchEvent>,
    },
    /// Status update for a delegated worker; `status` mirrors
    /// `event.as_status()` (see the worker routing test).
    WorkerUpdate {
        session_id: String,
        worker_id: String,
        worker_name: String,
        worker_task: String,
        worker_mode: String,
        event: WorkerEvent,
        status: String,
        metadata: serde_json::Value,
        audit: Option<serde_json::Value>,
    },
    /// A human-in-the-loop request was raised.
    HitlRequested {
        session_id: String,
        request_id: String,
        kind: String,
        payload: serde_json::Value,
    },
    /// A human-in-the-loop request was resolved.
    HitlResolved {
        session_id: String,
        request_id: String,
        kind: String,
        outcome: String,
    },
    /// The loop controller changed (or declined to change) the iteration limit.
    LoopControlDecision {
        session_id: String,
        iteration: usize,
        action: String,
        old_limit: usize,
        new_limit: usize,
        reason: String,
        status: String,
    },
}
impl AgentEvent {
    /// Session this event belongs to; used by `emit_event` to route the event
    /// to the sinks registered for that session.
    ///
    /// Exhaustive over all variants (no `_` arm) so adding a variant without a
    /// `session_id` fails to compile rather than silently mis-routing.
    pub fn session_id(&self) -> &str {
        match self {
            Self::AgentMessageChunk { session_id, .. }
            | Self::AgentThoughtChunk { session_id, .. }
            | Self::ToolCall { session_id, .. }
            | Self::ToolCallUpdate { session_id, .. }
            | Self::Plan { session_id, .. }
            | Self::TurnStart { session_id, .. }
            | Self::TurnEnd { session_id, .. }
            | Self::JudgeDecision { session_id, .. }
            | Self::TypedCheckpoint { session_id, .. }
            | Self::FeedbackInjected { session_id, .. }
            | Self::BudgetExhausted { session_id, .. }
            | Self::LoopStuck { session_id, .. }
            | Self::DaemonWatchdogTripped { session_id, .. }
            | Self::SkillActivated { session_id, .. }
            | Self::SkillDeactivated { session_id, .. }
            | Self::SkillScopeTools { session_id, .. }
            | Self::ToolSearchQuery { session_id, .. }
            | Self::ToolSearchResult { session_id, .. }
            | Self::TranscriptCompacted { session_id, .. }
            | Self::Handoff { session_id, .. }
            | Self::FsWatch { session_id, .. }
            | Self::WorkerUpdate { session_id, .. }
            | Self::HitlRequested { session_id, .. }
            | Self::HitlResolved { session_id, .. }
            | Self::LoopControlDecision { session_id, .. } => session_id,
        }
    }
}
/// Consumer of agent events. Implementations must not panic: callers invoke
/// `handle_event` on the hot path with locks held (see `emit_event`).
pub trait AgentEventSink: Send + Sync {
    /// Process one event; implementations decide whether to buffer, forward,
    /// or drop it.
    fn handle_event(&self, event: &AgentEvent);
}
/// On-disk envelope for one JSONL line written by `JsonlEventSink`: the event
/// payload is flattened alongside a per-sink sequence number and a wall-clock
/// timestamp.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
    /// Per-sink monotonically increasing sequence number, starting at 0.
    pub index: u64,
    /// Milliseconds since the Unix epoch at write time (0 if the clock reads
    /// before the epoch).
    pub emitted_at_ms: i64,
    /// Reserved nesting depth; currently always written as `None`.
    pub frame_depth: Option<u32>,
    // Flattened: the AgentEvent's own `type` tag and fields appear at the top
    // level of the JSON object.
    #[serde(flatten)]
    pub event: AgentEvent,
}
/// Sink that appends each event as one JSON line to a file, rotating the file
/// once it grows past [`JsonlEventSink::ROTATE_BYTES`].
pub struct JsonlEventSink {
    /// Writer plus counters, guarded by one mutex so lines are never interleaved.
    state: Mutex<JsonlEventSinkState>,
    /// Path the sink was opened at; rotated files are derived from it.
    base_path: std::path::PathBuf,
}
/// Mutable innards of [`JsonlEventSink`].
struct JsonlEventSinkState {
    writer: std::io::BufWriter<std::fs::File>,
    /// Next envelope index to assign (== number of events written so far).
    index: u64,
    /// Bytes written to the *current* file; reset on rotation.
    bytes_written: u64,
    /// How many times the sink has rotated; used in rotated file names.
    rotation: u32,
}
impl JsonlEventSink {
    /// Rotate to a fresh file once the current one reaches 100 MiB.
    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
    /// Create the JSONL log at `base_path`, creating parent directories as
    /// needed.
    ///
    /// NOTE(review): `truncate(true)` discards any pre-existing file at this
    /// path — confirm callers never reopen a path whose contents they want
    /// preserved.
    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
        let base_path = base_path.into();
        if let Some(parent) = base_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let file = std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(&base_path)?;
        Ok(Arc::new(Self {
            state: Mutex::new(JsonlEventSinkState {
                writer: std::io::BufWriter::new(file),
                index: 0,
                bytes_written: 0,
                rotation: 0,
            }),
            base_path,
        }))
    }
    /// Flush buffered lines to the OS.
    pub fn flush(&self) -> std::io::Result<()> {
        use std::io::Write as _;
        self.state
            .lock()
            .expect("jsonl sink mutex poisoned")
            .writer
            .flush()
    }
    /// Number of events written so far.
    pub fn event_count(&self) -> u64 {
        self.state.lock().expect("jsonl sink mutex poisoned").index
    }
    /// If the current file has reached `ROTATE_BYTES`: flush it, bump the
    /// rotation counter, and direct subsequent writes to a sibling file named
    /// `<stem>-NNNNNN.<ext>`. Earlier files are left in place and the envelope
    /// `index` keeps counting across rotations.
    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
        use std::io::Write as _;
        if state.bytes_written < Self::ROTATE_BYTES {
            return Ok(());
        }
        state.writer.flush()?;
        state.rotation += 1;
        let suffix = format!("-{:06}", state.rotation);
        let rotated = self.base_path.with_file_name({
            // Fall back to defaults if the base path has no stem/extension.
            let stem = self
                .base_path
                .file_stem()
                .and_then(|s| s.to_str())
                .unwrap_or("event_log");
            let ext = self
                .base_path
                .extension()
                .and_then(|e| e.to_str())
                .unwrap_or("jsonl");
            format!("{stem}{suffix}.{ext}")
        });
        let file = std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(&rotated)?;
        state.writer = std::io::BufWriter::new(file);
        state.bytes_written = 0;
        Ok(())
    }
}
/// Sink that forwards events to the shared event log under a per-session
/// topic (see [`EventLogSink::new`] for the topic naming scheme).
pub struct EventLogSink {
    log: Arc<AnyEventLog>,
    /// Precomputed topic: `observability.agent_events.<sanitized session id>`.
    topic: Topic,
    session_id: String,
}
impl EventLogSink {
    /// Build a sink that appends to topic
    /// `observability.agent_events.<sanitized session id>`.
    ///
    /// # Panics
    /// Panics if the sanitized session id still fails `Topic::new` validation
    /// (treated as an invariant of `sanitize_topic_component`).
    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
        let session_id = session_id.into();
        let topic = Topic::new(format!(
            "observability.agent_events.{}",
            crate::event_log::sanitize_topic_component(&session_id)
        ))
        .expect("session id should sanitize to a valid topic");
        Arc::new(Self {
            log,
            topic,
            session_id,
        })
    }
}
impl AgentEventSink for JsonlEventSink {
    /// Serialize the event into a [`PersistedAgentEvent`] envelope and append
    /// it as one JSONL line, rotating the file once it exceeds `ROTATE_BYTES`.
    ///
    /// Fix: the index is advanced only after the envelope serializes
    /// successfully. Previously a serialization failure permanently burned an
    /// index, breaking the contiguous-index invariant the sink's own test
    /// asserts and making `event_count()` over-report written events.
    fn handle_event(&self, event: &AgentEvent) {
        use std::io::Write as _;
        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
        // Wall-clock timestamp; 0 if the system clock reads before the epoch.
        let emitted_at_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        let envelope = PersistedAgentEvent {
            index: state.index,
            emitted_at_ms,
            frame_depth: None,
            event: event.clone(),
        };
        if let Ok(line) = serde_json::to_string(&envelope) {
            // Count the event only once we know the line exists.
            state.index += 1;
            // Best-effort writes: an event sink must never panic the agent loop.
            let _ = state.writer.write_all(line.as_bytes());
            let _ = state.writer.write_all(b"\n");
            state.bytes_written += line.len() as u64 + 1;
            let _ = self.rotate_if_needed(&mut state);
        }
    }
}
impl AgentEventSink for EventLogSink {
    /// Wrap the event in a log record and append it to the per-session topic.
    ///
    /// Works both inside and outside a tokio runtime: when a runtime handle is
    /// available the append is spawned (fire-and-forget), otherwise it is run
    /// synchronously via `block_on`. Append errors are ignored in both paths.
    fn handle_event(&self, event: &AgentEvent) {
        // Events that fail to serialize are silently dropped.
        let event_json = match serde_json::to_value(event) {
            Ok(value) => value,
            Err(_) => return,
        };
        // Record kind comes from the serde `type` tag on AgentEvent.
        let event_kind = event_json
            .get("type")
            .and_then(|value| value.as_str())
            .unwrap_or("agent_event")
            .to_string();
        let payload = serde_json::json!({
            // NOTE(review): despite the name, `index_hint` carries a
            // millisecond timestamp (now_ms()), not a sequence number —
            // confirm consumers expect that.
            "index_hint": now_ms(),
            "session_id": self.session_id,
            "event": event_json,
        });
        let mut headers = std::collections::BTreeMap::new();
        headers.insert("session_id".to_string(), self.session_id.clone());
        // Clone the pieces the async task needs to own.
        let log = self.log.clone();
        let topic = self.topic.clone();
        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            handle.spawn(async move {
                let _ = log.append(&topic, record).await;
            });
        } else {
            let _ = futures::executor::block_on(log.append(&topic, record));
        }
    }
}
impl Drop for JsonlEventSink {
    /// Best-effort flush of buffered lines on teardown. A poisoned lock means
    /// another thread panicked mid-write; we skip flushing rather than panic
    /// inside `drop`.
    fn drop(&mut self) {
        use std::io::Write as _;
        let Ok(mut state) = self.state.lock() else {
            return;
        };
        let _ = state.writer.flush();
    }
}
/// Fan-out sink: forwards each event to every sink that has been `push`ed,
/// in registration order.
pub struct MultiSink {
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
impl MultiSink {
pub fn new() -> Self {
Self {
sinks: Mutex::new(Vec::new()),
}
}
pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
self.sinks.lock().expect("sink mutex poisoned").push(sink);
}
pub fn len(&self) -> usize {
self.sinks.lock().expect("sink mutex poisoned").len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl Default for MultiSink {
fn default() -> Self {
Self::new()
}
}
impl AgentEventSink for MultiSink {
    /// Deliver the event to every registered sink, in registration order.
    /// The sink list is cloned up front so the lock is not held while the
    /// individual sinks run.
    fn handle_event(&self, event: &AgentEvent) {
        let snapshot = {
            let guard = self.sinks.lock().expect("sink mutex poisoned");
            guard.clone()
        };
        for sink in snapshot.into_iter() {
            sink.handle_event(event);
        }
    }
}
// In tests, each registered sink remembers the thread that registered it so
// that parallel test threads do not observe each other's sinks (see
// `emit_event` / `clear_session_sinks`).
#[cfg(test)]
#[derive(Clone)]
struct RegisteredSink {
    /// Thread that called `register_sink`; used to scope delivery in tests.
    owner: std::thread::ThreadId,
    sink: Arc<dyn AgentEventSink>,
}
// Outside tests there is no ownership tracking: a registered sink is just the
// trait object itself.
#[cfg(not(test))]
type RegisteredSink = Arc<dyn AgentEventSink>;
/// Global registry: session id -> sinks registered for that session.
type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
fn external_sinks() -> &'static ExternalSinkRegistry {
static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
}
/// Register `sink` to receive all events whose `session_id` matches.
/// Multiple sinks may be registered per session; they are invoked in
/// registration order by `emit_event`.
///
/// Fix: the test-mode `RegisteredSink` wrapper (which captures the current
/// thread id) is now built *before* taking the registry write lock, so the
/// global lock is held only for the insertion itself.
pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
    let session_id = session_id.into();
    // In tests, tag the sink with the registering thread for isolation.
    #[cfg(test)]
    let sink = RegisteredSink {
        owner: std::thread::current().id(),
        sink,
    };
    let mut reg = external_sinks().write().expect("sink registry poisoned");
    reg.entry(session_id).or_default().push(sink);
}
/// Remove the sinks registered for `session_id`.
///
/// In tests only the *current thread's* sinks are removed (other test threads
/// keep theirs); outside tests the whole entry is dropped.
pub fn clear_session_sinks(session_id: &str) {
    #[cfg(test)]
    {
        let owner = std::thread::current().id();
        let mut reg = external_sinks().write().expect("sink registry poisoned");
        if let Some(sinks) = reg.get_mut(session_id) {
            sinks.retain(|sink| sink.owner != owner);
            // Drop the map entry entirely once no thread owns sinks here.
            if sinks.is_empty() {
                reg.remove(session_id);
            }
        }
    }
    #[cfg(not(test))]
    {
        external_sinks()
            .write()
            .expect("sink registry poisoned")
            .remove(session_id);
    }
}
/// Clear the sink registry and the session store.
///
/// In tests, only sinks registered by the *current thread* are removed, so
/// parallel tests can each call this without clobbering one another; outside
/// tests everything is cleared.
pub fn reset_all_sinks() {
    #[cfg(test)]
    {
        let owner = std::thread::current().id();
        let mut reg = external_sinks().write().expect("sink registry poisoned");
        reg.retain(|_, sinks| {
            sinks.retain(|sink| sink.owner != owner);
            // Keep the entry only while other threads still own sinks in it.
            !sinks.is_empty()
        });
        crate::agent_sessions::reset_session_store();
    }
    #[cfg(not(test))]
    {
        external_sinks()
            .write()
            .expect("sink registry poisoned")
            .clear();
        crate::agent_sessions::reset_session_store();
    }
}
/// Copy every sink registered for `source_session_id` onto
/// `target_session_id`, so events for the target session also reach the
/// source session's sinks (e.g. a child session inheriting its parent's
/// sinks). No-op for empty or identical ids. Sinks already present on the
/// target (by `Arc` pointer identity) are not duplicated.
pub fn mirror_session_sinks(source_session_id: &str, target_session_id: &str) {
    if source_session_id.is_empty() || target_session_id.is_empty() {
        return;
    }
    if source_session_id == target_session_id {
        return;
    }
    let mut reg = external_sinks().write().expect("sink registry poisoned");
    // Nothing to mirror if the source has no sinks.
    let Some(source_sinks) = reg.get(source_session_id).cloned() else {
        return;
    };
    let target = reg.entry(target_session_id.to_string()).or_default();
    #[cfg(test)]
    {
        for source in source_sinks {
            // Dedup by pointer identity on the inner trait object.
            let already_present = target
                .iter()
                .any(|existing| Arc::ptr_eq(&existing.sink, &source.sink));
            if !already_present {
                target.push(source);
            }
        }
    }
    #[cfg(not(test))]
    {
        for source in source_sinks {
            let already_present = target.iter().any(|existing| Arc::ptr_eq(existing, &source));
            if !already_present {
                target.push(source);
            }
        }
    }
}
/// Deliver `event` to every sink registered for its session id.
///
/// The sink list is snapshotted under the read lock and the lock is released
/// before any sink runs, so a sink may itself register/clear sinks without
/// deadlocking. In tests, only sinks registered by the current thread are
/// invoked (test isolation).
pub fn emit_event(event: &AgentEvent) {
    let sinks: Vec<Arc<dyn AgentEventSink>> = {
        let reg = external_sinks().read().expect("sink registry poisoned");
        #[cfg(test)]
        {
            let owner = std::thread::current().id();
            reg.get(event.session_id())
                .map(|sinks| {
                    sinks
                        .iter()
                        .filter(|sink| sink.owner == owner)
                        .map(|sink| sink.sink.clone())
                        .collect()
                })
                .unwrap_or_default()
        }
        #[cfg(not(test))]
        {
            reg.get(event.session_id()).cloned().unwrap_or_default()
        }
    };
    for sink in sinks {
        sink.handle_event(event);
    }
}
/// Milliseconds since the Unix epoch, or 0 if the system clock reads before
/// the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
/// Number of sinks currently registered for `session_id`.
///
/// In tests, counts only sinks registered by the current thread (mirroring
/// the delivery filtering in `emit_event`).
pub fn session_external_sink_count(session_id: &str) -> usize {
    #[cfg(test)]
    {
        let owner = std::thread::current().id();
        return external_sinks()
            .read()
            .expect("sink registry poisoned")
            .get(session_id)
            .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
            .unwrap_or(0);
    }
    #[cfg(not(test))]
    {
        external_sinks()
            .read()
            .expect("sink registry poisoned")
            .get(session_id)
            .map(|v| v.len())
            .unwrap_or(0)
    }
}
/// Number of closure-based subscribers for `session_id`; thin delegation to
/// the session store.
pub fn session_closure_subscriber_count(session_id: &str) -> usize {
    crate::agent_sessions::subscriber_count(session_id)
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
// Test double that just counts how many events it receives.
struct CountingSink(Arc<AtomicUsize>);
impl AgentEventSink for CountingSink {
    fn handle_event(&self, _event: &AgentEvent) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
// MultiSink must deliver one event to every registered sink.
#[test]
fn multi_sink_fans_out_in_order() {
    let multi = MultiSink::new();
    let a = Arc::new(AtomicUsize::new(0));
    let b = Arc::new(AtomicUsize::new(0));
    multi.push(Arc::new(CountingSink(a.clone())));
    multi.push(Arc::new(CountingSink(b.clone())));
    let event = AgentEvent::TurnStart {
        session_id: "s1".into(),
        iteration: 1,
    };
    multi.handle_event(&event);
    assert_eq!(a.load(Ordering::SeqCst), 1);
    assert_eq!(b.load(Ordering::SeqCst), 1);
}
// Events are routed only to sinks registered under the event's session id,
// and clear_session_sinks removes exactly one session's sinks.
#[test]
fn session_scoped_sink_routing() {
    reset_all_sinks();
    let a = Arc::new(AtomicUsize::new(0));
    let b = Arc::new(AtomicUsize::new(0));
    register_sink("session-a", Arc::new(CountingSink(a.clone())));
    register_sink("session-b", Arc::new(CountingSink(b.clone())));
    emit_event(&AgentEvent::TurnStart {
        session_id: "session-a".into(),
        iteration: 0,
    });
    assert_eq!(a.load(Ordering::SeqCst), 1);
    assert_eq!(b.load(Ordering::SeqCst), 0);
    emit_event(&AgentEvent::TurnEnd {
        session_id: "session-b".into(),
        iteration: 0,
        turn_info: serde_json::json!({}),
    });
    assert_eq!(a.load(Ordering::SeqCst), 1);
    assert_eq!(b.load(Ordering::SeqCst), 1);
    clear_session_sinks("session-a");
    assert_eq!(session_external_sink_count("session-a"), 0);
    assert_eq!(session_external_sink_count("session-b"), 1);
    reset_all_sinks();
}
// A child session opened while a parent session is current should inherit
// the parent's external sinks (via mirroring in the session store).
#[test]
fn newly_opened_child_session_inherits_current_external_sinks() {
    reset_all_sinks();
    let delivered = Arc::new(AtomicUsize::new(0));
    register_sink("outer-session", Arc::new(CountingSink(delivered.clone())));
    {
        let _guard = crate::agent_sessions::enter_current_session("outer-session");
        let inner = crate::agent_sessions::open_or_create(None);
        assert_ne!(inner, "outer-session");
        emit_event(&AgentEvent::TurnStart {
            session_id: inner,
            iteration: 0,
        });
    }
    assert_eq!(delivered.load(Ordering::SeqCst), 1);
    reset_all_sinks();
}
// The JSONL sink must assign contiguous indices starting at 0 and
// non-decreasing timestamps, with the event's type tag flattened to the top
// level of each line.
#[test]
fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
    use std::io::{BufRead, BufReader};
    let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
    std::fs::create_dir_all(&dir).unwrap();
    let path = dir.join("event_log.jsonl");
    let sink = JsonlEventSink::open(&path).unwrap();
    for i in 0..5 {
        sink.handle_event(&AgentEvent::TurnStart {
            session_id: "s".into(),
            iteration: i,
        });
    }
    assert_eq!(sink.event_count(), 5);
    sink.flush().unwrap();
    let file = std::fs::File::open(&path).unwrap();
    let mut last_idx: i64 = -1;
    let mut last_ts: i64 = 0;
    for line in BufReader::new(file).lines() {
        let line = line.unwrap();
        let val: serde_json::Value = serde_json::from_str(&line).unwrap();
        let idx = val["index"].as_i64().unwrap();
        let ts = val["emitted_at_ms"].as_i64().unwrap();
        assert_eq!(idx, last_idx + 1, "indices must be contiguous");
        assert!(ts >= last_ts, "timestamps must be non-decreasing");
        last_idx = idx;
        last_ts = ts;
        assert_eq!(val["type"], "turn_start");
    }
    assert_eq!(last_idx, 4);
    let _ = std::fs::remove_file(&path);
}
// A JudgeDecision event written by the sink must deserialize back into the
// same variant with all fields intact.
#[test]
fn judge_decision_round_trips_through_jsonl_sink() {
    use std::io::{BufRead, BufReader};
    let dir =
        std::env::temp_dir().join(format!("harn-judge-event-log-{}", uuid::Uuid::now_v7()));
    std::fs::create_dir_all(&dir).unwrap();
    let path = dir.join("event_log.jsonl");
    let sink = JsonlEventSink::open(&path).unwrap();
    sink.handle_event(&AgentEvent::JudgeDecision {
        session_id: "s".into(),
        iteration: 2,
        verdict: "continue".into(),
        reasoning: "needs a concrete next step".into(),
        next_step: Some("run the verifier".into()),
        judge_duration_ms: 17,
    });
    sink.flush().unwrap();
    let file = std::fs::File::open(&path).unwrap();
    let line = BufReader::new(file).lines().next().unwrap().unwrap();
    let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
    match recovered.event {
        AgentEvent::JudgeDecision {
            session_id,
            iteration,
            verdict,
            reasoning,
            next_step,
            judge_duration_ms,
        } => {
            assert_eq!(session_id, "s");
            assert_eq!(iteration, 2);
            assert_eq!(verdict, "continue");
            assert_eq!(reasoning, "needs a concrete next step");
            assert_eq!(next_step.as_deref(), Some("run the verifier"));
            assert_eq!(judge_duration_ms, 17);
        }
        other => panic!("expected JudgeDecision, got {other:?}"),
    }
    let value: serde_json::Value = serde_json::from_str(&line).unwrap();
    assert_eq!(value["type"], "judge_decision");
    let _ = std::fs::remove_file(&path);
    let _ = std::fs::remove_dir(&dir);
}
// Duration fields must appear when Some and be completely absent when None
// (skip_serializing_if behavior).
#[test]
fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
    let terminal = AgentEvent::ToolCallUpdate {
        session_id: "s".into(),
        tool_call_id: "tc-1".into(),
        tool_name: "read".into(),
        status: ToolCallStatus::Completed,
        raw_output: None,
        error: None,
        duration_ms: Some(42),
        execution_duration_ms: Some(7),
        error_category: None,
        executor: None,
        parsing: None,
        raw_input: None,
        raw_input_partial: None,
        audit: None,
    };
    let value = serde_json::to_value(&terminal).unwrap();
    assert_eq!(value["duration_ms"], serde_json::json!(42));
    assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
    let intermediate = AgentEvent::ToolCallUpdate {
        session_id: "s".into(),
        tool_call_id: "tc-1".into(),
        tool_name: "read".into(),
        status: ToolCallStatus::InProgress,
        raw_output: None,
        error: None,
        duration_ms: None,
        execution_duration_ms: None,
        error_category: None,
        executor: None,
        parsing: None,
        raw_input: None,
        raw_input_partial: None,
        audit: None,
    };
    let value = serde_json::to_value(&intermediate).unwrap();
    let object = value.as_object().expect("update serializes as object");
    assert!(
        !object.contains_key("duration_ms"),
        "duration_ms must be omitted when None: {value}"
    );
    assert!(
        !object.contains_key("execution_duration_ms"),
        "execution_duration_ms must be omitted when None: {value}"
    );
}
// Old payloads written before the duration fields existed must still parse
// (serde(default) back-compat).
#[test]
fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
    let raw = serde_json::json!({
        "type": "tool_call_update",
        "session_id": "s",
        "tool_call_id": "tc-1",
        "tool_name": "read",
        "status": "completed",
        "raw_output": null,
        "error": null,
    });
    let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
    match event {
        AgentEvent::ToolCallUpdate {
            duration_ms,
            execution_duration_ms,
            ..
        } => {
            assert!(duration_ms.is_none());
            assert!(execution_duration_ms.is_none());
        }
        other => panic!("expected ToolCallUpdate, got {other:?}"),
    }
}
// ToolCallStatus variants serialize to their snake_case wire strings.
#[test]
fn tool_call_status_serde() {
    assert_eq!(
        serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
        "\"pending\""
    );
    assert_eq!(
        serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
        "\"in_progress\""
    );
    assert_eq!(
        serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
        "\"completed\""
    );
    assert_eq!(
        serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
        "\"failed\""
    );
}
// Serde output, as_str(), and round-trip decoding must agree for every
// error-category variant.
#[test]
fn tool_call_error_category_serializes_as_snake_case() {
    let pairs = [
        (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
        (ToolCallErrorCategory::ToolError, "tool_error"),
        (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
        (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
        (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
        (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
        (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
        (ToolCallErrorCategory::Timeout, "timeout"),
        (ToolCallErrorCategory::Network, "network"),
        (ToolCallErrorCategory::Cancelled, "cancelled"),
        (ToolCallErrorCategory::Unknown, "unknown"),
    ];
    for (variant, wire) in pairs {
        let encoded = serde_json::to_string(&variant).unwrap();
        assert_eq!(encoded, format!("\"{wire}\""));
        assert_eq!(variant.as_str(), wire);
        let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, variant);
    }
}
// ToolExecutor uses an internal "kind" tag; every variant must round-trip.
#[test]
fn tool_executor_round_trips_with_adjacent_tag() {
    for executor in [
        ToolExecutor::HarnBuiltin,
        ToolExecutor::HostBridge,
        ToolExecutor::McpServer {
            server_name: "linear".to_string(),
        },
        ToolExecutor::ProviderNative,
    ] {
        let json = serde_json::to_value(&executor).unwrap();
        let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
        match &executor {
            ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
            ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
            ToolExecutor::McpServer { server_name } => {
                assert_eq!(kind, "mcp_server");
                assert_eq!(json["server_name"], *server_name);
            }
            ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
        }
        let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
        assert_eq!(recovered, executor);
    }
}
// Verifies each internal error category maps to the intended public one,
// exhaustively covering the Network and HostBridgeError families.
#[test]
fn tool_call_error_category_from_internal_collapses_transient_family() {
    use crate::value::ErrorCategory as Internal;
    assert_eq!(
        ToolCallErrorCategory::from_internal(&Internal::Timeout),
        ToolCallErrorCategory::Timeout
    );
    for net in [
        Internal::RateLimit,
        Internal::Overloaded,
        Internal::ServerError,
        Internal::TransientNetwork,
    ] {
        assert_eq!(
            ToolCallErrorCategory::from_internal(&net),
            ToolCallErrorCategory::Network,
            "{net:?} should map to Network",
        );
    }
    assert_eq!(
        ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
        ToolCallErrorCategory::SchemaValidation
    );
    assert_eq!(
        ToolCallErrorCategory::from_internal(&Internal::ToolError),
        ToolCallErrorCategory::ToolError
    );
    assert_eq!(
        ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
        ToolCallErrorCategory::PermissionDenied
    );
    assert_eq!(
        ToolCallErrorCategory::from_internal(&Internal::Cancelled),
        ToolCallErrorCategory::Cancelled
    );
    // Fix: Internal::BudgetExceeded was missing from this list even though
    // from_internal maps it to HostBridgeError, leaving that arm untested.
    for bridge in [
        Internal::Auth,
        Internal::EgressBlocked,
        Internal::NotFound,
        Internal::CircuitOpen,
        Internal::BudgetExceeded,
        Internal::Generic,
    ] {
        assert_eq!(
            ToolCallErrorCategory::from_internal(&bridge),
            ToolCallErrorCategory::HostBridgeError,
            "{bridge:?} should map to HostBridgeError",
        );
    }
}
// error_category must not appear in the JSON at all when it is None.
#[test]
fn tool_call_update_event_omits_error_category_when_none() {
    let event = AgentEvent::ToolCallUpdate {
        session_id: "s".into(),
        tool_call_id: "t".into(),
        tool_name: "read".into(),
        status: ToolCallStatus::Completed,
        raw_output: None,
        error: None,
        duration_ms: None,
        execution_duration_ms: None,
        error_category: None,
        executor: None,
        parsing: None,
        raw_input: None,
        raw_input_partial: None,
        audit: None,
    };
    let v = serde_json::to_value(&event).unwrap();
    assert_eq!(v["type"], "tool_call_update");
    assert!(v.get("error_category").is_none());
}
// error_category serializes as its snake_case string when present.
#[test]
fn tool_call_update_event_serializes_error_category_when_set() {
    let event = AgentEvent::ToolCallUpdate {
        session_id: "s".into(),
        tool_call_id: "t".into(),
        tool_name: "read".into(),
        status: ToolCallStatus::Failed,
        raw_output: None,
        error: Some("missing required field".into()),
        duration_ms: None,
        execution_duration_ms: None,
        error_category: Some(ToolCallErrorCategory::SchemaValidation),
        executor: None,
        parsing: None,
        raw_input: None,
        raw_input_partial: None,
        audit: None,
    };
    let v = serde_json::to_value(&event).unwrap();
    assert_eq!(v["error_category"], "schema_validation");
    assert_eq!(v["error"], "missing required field");
}
// executor must not appear in the JSON at all when it is None.
#[test]
fn tool_call_update_omits_executor_when_absent() {
    let event = AgentEvent::ToolCallUpdate {
        session_id: "s".into(),
        tool_call_id: "tc-1".into(),
        tool_name: "read".into(),
        status: ToolCallStatus::Completed,
        raw_output: None,
        error: None,
        duration_ms: None,
        execution_duration_ms: None,
        error_category: None,
        executor: None,
        parsing: None,
        raw_input: None,
        raw_input_partial: None,
        audit: None,
    };
    let json = serde_json::to_value(&event).unwrap();
    assert!(json.get("executor").is_none(), "got: {json}");
}
// as_status() and is_terminal() must cover every WorkerEvent variant.
#[test]
fn worker_event_status_strings_cover_all_variants() {
    assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
    assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
    assert_eq!(
        WorkerEvent::WorkerWaitingForInput.as_status(),
        "awaiting_input"
    );
    assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
    assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
    assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
    for terminal in [
        WorkerEvent::WorkerCompleted,
        WorkerEvent::WorkerFailed,
        WorkerEvent::WorkerCancelled,
    ] {
        assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
    }
    for non_terminal in [
        WorkerEvent::WorkerSpawned,
        WorkerEvent::WorkerProgressed,
        WorkerEvent::WorkerWaitingForInput,
    ] {
        assert!(
            !non_terminal.is_terminal(),
            "{non_terminal:?} should not be terminal"
        );
    }
}
// WorkerUpdate events must only reach sinks registered under their session id.
#[test]
fn worker_update_event_routes_through_session_keyed_sink() {
    reset_all_sinks();
    let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
    // Test double that records every event it receives.
    struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
    impl AgentEventSink for CapturingSink {
        fn handle_event(&self, event: &AgentEvent) {
            self.0
                .lock()
                .expect("captured sink mutex poisoned")
                .push(event.clone());
        }
    }
    register_sink(
        "worker-session-1",
        Arc::new(CapturingSink(captured.clone())),
    );
    emit_event(&AgentEvent::WorkerUpdate {
        session_id: "worker-session-1".into(),
        worker_id: "worker_42".into(),
        worker_name: "review_captain".into(),
        worker_task: "review pr".into(),
        worker_mode: "delegated_stage".into(),
        event: WorkerEvent::WorkerWaitingForInput,
        status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
        metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
        audit: None,
    });
    // Event for a different session must NOT be captured.
    emit_event(&AgentEvent::WorkerUpdate {
        session_id: "other-session".into(),
        worker_id: "w2".into(),
        worker_name: "n2".into(),
        worker_task: "t2".into(),
        worker_mode: "delegated_stage".into(),
        event: WorkerEvent::WorkerCompleted,
        status: "completed".into(),
        metadata: serde_json::json!({}),
        audit: None,
    });
    let received = captured.lock().unwrap().clone();
    assert_eq!(received.len(), 1, "got: {received:?}");
    match &received[0] {
        AgentEvent::WorkerUpdate {
            session_id,
            worker_id,
            event,
            status,
            ..
        } => {
            assert_eq!(session_id, "worker-session-1");
            assert_eq!(worker_id, "worker_42");
            assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
            assert_eq!(status, "awaiting_input");
        }
        other => panic!("expected WorkerUpdate, got {other:?}"),
    }
    reset_all_sinks();
}
// WorkerUpdate must serialize with the snake_case type tag and round-trip.
#[test]
fn worker_update_event_serializes_to_canonical_shape() {
    let event = AgentEvent::WorkerUpdate {
        session_id: "s".into(),
        worker_id: "w".into(),
        worker_name: "n".into(),
        worker_task: "t".into(),
        worker_mode: "delegated_stage".into(),
        event: WorkerEvent::WorkerProgressed,
        status: "progressed".into(),
        metadata: serde_json::json!({"started_at": "0193..."}),
        audit: Some(serde_json::json!({"run_id": "run_x"})),
    };
    let value = serde_json::to_value(&event).unwrap();
    assert_eq!(value["type"], "worker_update");
    assert_eq!(value["session_id"], "s");
    assert_eq!(value["worker_id"], "w");
    assert_eq!(value["status"], "progressed");
    assert_eq!(value["audit"]["run_id"], "run_x");
    let recovered: AgentEvent = serde_json::from_value(value).unwrap();
    match recovered {
        AgentEvent::WorkerUpdate {
            event: recovered_event,
            ..
        } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
        other => panic!("expected WorkerUpdate, got {other:?}"),
    }
}
// A present executor serializes as a tagged object with its fields inline.
#[test]
fn tool_call_update_includes_executor_when_present() {
    let event = AgentEvent::ToolCallUpdate {
        session_id: "s".into(),
        tool_call_id: "tc-1".into(),
        tool_name: "read".into(),
        status: ToolCallStatus::Completed,
        raw_output: None,
        error: None,
        duration_ms: None,
        execution_duration_ms: None,
        error_category: None,
        executor: Some(ToolExecutor::McpServer {
            server_name: "github".into(),
        }),
        parsing: None,
        raw_input: None,
        raw_input_partial: None,
        audit: None,
    };
    let json = serde_json::to_value(&event).unwrap();
    assert_eq!(json["executor"]["kind"], "mcp_server");
    assert_eq!(json["executor"]["server_name"], "github");
}
// audit must be absent from the JSON when None.
#[test]
fn tool_call_update_omits_audit_when_absent() {
    let event = AgentEvent::ToolCallUpdate {
        session_id: "s".into(),
        tool_call_id: "tc-1".into(),
        tool_name: "read".into(),
        status: ToolCallStatus::Completed,
        raw_output: None,
        error: None,
        duration_ms: None,
        execution_duration_ms: None,
        error_category: None,
        executor: None,
        parsing: None,
        raw_input: None,
        raw_input_partial: None,
        audit: None,
    };
    let json = serde_json::to_value(&event).unwrap();
    assert!(json.get("audit").is_none(), "got: {json}");
}
// A present MutationSessionRecord serializes under "audit" with its fields.
#[test]
fn tool_call_update_includes_audit_when_present() {
    let audit = MutationSessionRecord {
        session_id: "session_42".into(),
        run_id: Some("run_42".into()),
        mutation_scope: "apply_workspace".into(),
        execution_kind: Some("worker".into()),
        ..Default::default()
    };
    let event = AgentEvent::ToolCallUpdate {
        session_id: "s".into(),
        tool_call_id: "tc-1".into(),
        tool_name: "edit_file".into(),
        status: ToolCallStatus::Completed,
        raw_output: None,
        error: None,
        duration_ms: None,
        execution_duration_ms: None,
        error_category: None,
        executor: Some(ToolExecutor::HostBridge),
        parsing: None,
        raw_input: None,
        raw_input_partial: None,
        audit: Some(audit),
    };
    let json = serde_json::to_value(&event).unwrap();
    assert_eq!(json["audit"]["session_id"], "session_42");
    assert_eq!(json["audit"]["run_id"], "run_42");
    assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
    assert_eq!(json["audit"]["execution_kind"], "worker");
}
// Old payloads without an "audit" key must still parse (serde(default)).
#[test]
fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
    let raw = serde_json::json!({
        "type": "tool_call_update",
        "session_id": "s",
        "tool_call_id": "tc-1",
        "tool_name": "read",
        "status": "completed",
        "raw_output": null,
        "error": null,
    });
    let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
    match event {
        AgentEvent::ToolCallUpdate { audit, .. } => {
            assert!(audit.is_none());
        }
        other => panic!("expected ToolCallUpdate, got {other:?}"),
    }
}
}