use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use super::*;
use crate::llm::receipts::ToolCallReceipt;
use crate::orchestration::MutationSessionRecord;
struct CountingSink(Arc<AtomicUsize>);
impl AgentEventSink for CountingSink {
fn handle_event(&self, _event: &AgentEvent) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
#[test]
fn multi_sink_fans_out_in_order() {
let multi = MultiSink::new();
let a = Arc::new(AtomicUsize::new(0));
let b = Arc::new(AtomicUsize::new(0));
multi.push(Arc::new(CountingSink(a.clone())));
multi.push(Arc::new(CountingSink(b.clone())));
let event = AgentEvent::IterationStart {
session_id: "s1".into(),
iteration: 1,
provider: String::new(),
model: String::new(),
};
multi.handle_event(&event);
assert_eq!(a.load(Ordering::SeqCst), 1);
assert_eq!(b.load(Ordering::SeqCst), 1);
}
#[test]
fn session_scoped_sink_routing() {
reset_all_sinks();
let a = Arc::new(AtomicUsize::new(0));
let b = Arc::new(AtomicUsize::new(0));
register_sink("session-a", Arc::new(CountingSink(a.clone())));
register_sink("session-b", Arc::new(CountingSink(b.clone())));
emit_event(&AgentEvent::IterationStart {
session_id: "session-a".into(),
iteration: 0,
provider: String::new(),
model: String::new(),
});
assert_eq!(a.load(Ordering::SeqCst), 1);
assert_eq!(b.load(Ordering::SeqCst), 0);
emit_event(&AgentEvent::IterationEnd {
session_id: "session-b".into(),
iteration: 0,
iteration_info: serde_json::json!({}),
});
assert_eq!(a.load(Ordering::SeqCst), 1);
assert_eq!(b.load(Ordering::SeqCst), 1);
clear_session_sinks("session-a");
assert_eq!(session_external_sink_count("session-a"), 0);
assert_eq!(session_external_sink_count("session-b"), 1);
reset_all_sinks();
}
#[test]
fn newly_opened_child_session_inherits_current_external_sinks() {
reset_all_sinks();
let delivered = Arc::new(AtomicUsize::new(0));
register_sink("outer-session", Arc::new(CountingSink(delivered.clone())));
{
let _guard = crate::agent_sessions::enter_current_session("outer-session");
let inner = crate::agent_sessions::open_or_create(None);
assert_ne!(inner, "outer-session");
emit_event(&AgentEvent::IterationStart {
session_id: inner,
iteration: 0,
provider: String::new(),
model: String::new(),
});
}
assert_eq!(delivered.load(Ordering::SeqCst), 1);
reset_all_sinks();
}
#[test]
fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
use std::io::{BufRead, BufReader};
let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
std::fs::create_dir_all(&dir).unwrap();
let path = dir.join("event_log.jsonl");
let sink = JsonlEventSink::open(&path).unwrap();
for i in 0..5 {
sink.handle_event(&AgentEvent::IterationStart {
session_id: "s".into(),
iteration: i,
provider: String::new(),
model: String::new(),
});
}
assert_eq!(sink.event_count(), 5);
sink.flush().unwrap();
let file = std::fs::File::open(&path).unwrap();
let mut last_idx: i64 = -1;
let mut last_ts: i64 = 0;
for line in BufReader::new(file).lines() {
let line = line.unwrap();
let val: serde_json::Value = serde_json::from_str(&line).unwrap();
let idx = val["index"].as_i64().unwrap();
let ts = val["emitted_at_ms"].as_i64().unwrap();
assert_eq!(idx, last_idx + 1, "indices must be contiguous");
assert!(ts >= last_ts, "timestamps must be non-decreasing");
last_idx = idx;
last_ts = ts;
assert_eq!(val["type"], "iteration_start");
}
assert_eq!(last_idx, 4);
let _ = std::fs::remove_file(&path);
}
#[test]
fn judge_decision_round_trips_through_jsonl_sink() {
use std::io::{BufRead, BufReader};
let dir = std::env::temp_dir().join(format!("harn-judge-event-log-{}", uuid::Uuid::now_v7()));
std::fs::create_dir_all(&dir).unwrap();
let path = dir.join("event_log.jsonl");
let sink = JsonlEventSink::open(&path).unwrap();
sink.handle_event(&AgentEvent::JudgeDecision {
session_id: "s".into(),
iteration: 2,
verdict: "continue".into(),
reasoning: "needs a concrete next step".into(),
next_step: Some("run the verifier".into()),
judge_duration_ms: 17,
trigger: Some("stalled".into()),
});
sink.flush().unwrap();
let file = std::fs::File::open(&path).unwrap();
let line = BufReader::new(file).lines().next().unwrap().unwrap();
let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
match recovered.event {
AgentEvent::JudgeDecision {
session_id,
iteration,
verdict,
reasoning,
next_step,
judge_duration_ms,
trigger,
} => {
assert_eq!(session_id, "s");
assert_eq!(iteration, 2);
assert_eq!(verdict, "continue");
assert_eq!(reasoning, "needs a concrete next step");
assert_eq!(next_step.as_deref(), Some("run the verifier"));
assert_eq!(judge_duration_ms, 17);
assert_eq!(trigger.as_deref(), Some("stalled"));
}
other => panic!("expected JudgeDecision, got {other:?}"),
}
let value: serde_json::Value = serde_json::from_str(&line).unwrap();
assert_eq!(value["type"], "judge_decision");
let _ = std::fs::remove_file(&path);
let _ = std::fs::remove_dir(&dir);
}
#[test]
fn structural_validator_decision_round_trips_through_jsonl_sink() {
use std::io::{BufRead, BufReader};
let dir = std::env::temp_dir().join(format!(
"harn-structural-validator-event-log-{}",
uuid::Uuid::now_v7()
));
std::fs::create_dir_all(&dir).unwrap();
let path = dir.join("event_log.jsonl");
let sink = JsonlEventSink::open(&path).unwrap();
sink.handle_event(&AgentEvent::StructuralValidatorDecision {
session_id: "s".into(),
iteration: 2,
rule: "non_empty_when_writes_expected".into(),
diagnostic: "Assistant emitted no tool calls while writable tools were available.".into(),
recommended_action: "Emit the concrete write or edit tool call needed for the task.".into(),
vetoed: true,
skipped: false,
reason: None,
on_failure: "regenerate_with_feedback".into(),
attempts: 1,
max_attempts: 3,
});
sink.flush().unwrap();
let file = std::fs::File::open(&path).unwrap();
let line = BufReader::new(file).lines().next().unwrap().unwrap();
let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
match recovered.event {
AgentEvent::StructuralValidatorDecision {
session_id,
iteration,
rule,
diagnostic,
recommended_action,
vetoed,
skipped,
reason,
on_failure,
attempts,
max_attempts,
} => {
assert_eq!(session_id, "s");
assert_eq!(iteration, 2);
assert_eq!(rule, "non_empty_when_writes_expected");
assert_eq!(
diagnostic,
"Assistant emitted no tool calls while writable tools were available."
);
assert_eq!(
recommended_action,
"Emit the concrete write or edit tool call needed for the task."
);
assert!(vetoed);
assert!(!skipped);
assert!(reason.is_none());
assert_eq!(on_failure, "regenerate_with_feedback");
assert_eq!(attempts, 1);
assert_eq!(max_attempts, 3);
}
other => panic!("expected StructuralValidatorDecision, got {other:?}"),
}
let value: serde_json::Value = serde_json::from_str(&line).unwrap();
assert_eq!(value["type"], "structural_validator_decision");
let _ = std::fs::remove_file(&path);
let _ = std::fs::remove_dir(&dir);
}
#[test]
fn scope_classifier_verdict_round_trips_through_jsonl_sink() {
use std::io::{BufRead, BufReader};
let dir = std::env::temp_dir().join(format!(
"harn-scope-classifier-event-log-{}",
uuid::Uuid::now_v7()
));
std::fs::create_dir_all(&dir).unwrap();
let path = dir.join("event_log.jsonl");
let sink = JsonlEventSink::open(&path).unwrap();
sink.handle_event(&AgentEvent::ScopeClassifierVerdict {
session_id: "s".into(),
iteration: 1,
label: "out_of_scope".into(),
original_label: "out_of_scope".into(),
confidence: 0.91,
confidence_threshold: 0.65,
evidence: "mentions /workspace/other".into(),
skip_main_turn: true,
classifier_kind: Some("custom".into()),
model: None,
error: None,
});
sink.flush().unwrap();
let file = std::fs::File::open(&path).unwrap();
let line = BufReader::new(file).lines().next().unwrap().unwrap();
let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
match recovered.event {
AgentEvent::ScopeClassifierVerdict {
session_id,
iteration,
label,
confidence,
evidence,
skip_main_turn,
classifier_kind,
..
} => {
assert_eq!(session_id, "s");
assert_eq!(iteration, 1);
assert_eq!(label, "out_of_scope");
assert_eq!(confidence, 0.91);
assert_eq!(evidence, "mentions /workspace/other");
assert!(skip_main_turn);
assert_eq!(classifier_kind.as_deref(), Some("custom"));
}
other => panic!("expected ScopeClassifierVerdict, got {other:?}"),
}
let value: serde_json::Value = serde_json::from_str(&line).unwrap();
assert_eq!(value["type"], "scope_classifier_verdict");
let _ = std::fs::remove_file(&path);
let _ = std::fs::remove_dir(&dir);
}
#[test]
fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
let terminal = AgentEvent::ToolCallUpdate {
session_id: "s".into(),
tool_call_id: "tc-1".into(),
tool_name: "read".into(),
status: ToolCallStatus::Completed,
raw_output: None,
error: None,
duration_ms: Some(42),
execution_duration_ms: Some(7),
error_category: None,
executor: None,
parsing: None,
raw_input: None,
raw_input_partial: None,
audit: None,
};
let value = serde_json::to_value(&terminal).unwrap();
assert_eq!(value["duration_ms"], serde_json::json!(42));
assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
let intermediate = AgentEvent::ToolCallUpdate {
session_id: "s".into(),
tool_call_id: "tc-1".into(),
tool_name: "read".into(),
status: ToolCallStatus::InProgress,
raw_output: None,
error: None,
duration_ms: None,
execution_duration_ms: None,
error_category: None,
executor: None,
parsing: None,
raw_input: None,
raw_input_partial: None,
audit: None,
};
let value = serde_json::to_value(&intermediate).unwrap();
let object = value.as_object().expect("update serializes as object");
assert!(
!object.contains_key("duration_ms"),
"duration_ms must be omitted when None: {value}"
);
assert!(
!object.contains_key("execution_duration_ms"),
"execution_duration_ms must be omitted when None: {value}"
);
}
#[test]
fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
let raw = serde_json::json!({
"type": "tool_call_update",
"session_id": "s",
"tool_call_id": "tc-1",
"tool_name": "read",
"status": "completed",
"raw_output": null,
"error": null,
});
let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
match event {
AgentEvent::ToolCallUpdate {
duration_ms,
execution_duration_ms,
..
} => {
assert!(duration_ms.is_none());
assert!(execution_duration_ms.is_none());
}
other => panic!("expected ToolCallUpdate, got {other:?}"),
}
}
#[test]
fn tool_call_status_serde() {
assert_eq!(
serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
"\"pending\""
);
assert_eq!(
serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
"\"in_progress\""
);
assert_eq!(
serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
"\"completed\""
);
assert_eq!(
serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
"\"failed\""
);
}
#[test]
fn tool_call_error_category_serializes_as_snake_case() {
let pairs = [
(ToolCallErrorCategory::SchemaValidation, "schema_validation"),
(ToolCallErrorCategory::ToolError, "tool_error"),
(ToolCallErrorCategory::McpServerError, "mcp_server_error"),
(ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
(ToolCallErrorCategory::PermissionDenied, "permission_denied"),
(ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
(ToolCallErrorCategory::ParseAborted, "parse_aborted"),
(ToolCallErrorCategory::Timeout, "timeout"),
(ToolCallErrorCategory::Network, "network"),
(ToolCallErrorCategory::Cancelled, "cancelled"),
(ToolCallErrorCategory::Unknown, "unknown"),
];
for (variant, wire) in pairs {
let encoded = serde_json::to_string(&variant).unwrap();
assert_eq!(encoded, format!("\"{wire}\""));
assert_eq!(variant.as_str(), wire);
let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded, variant);
}
}
#[test]
fn tool_executor_round_trips_with_adjacent_tag() {
for executor in [
ToolExecutor::HarnBuiltin,
ToolExecutor::HostBridge,
ToolExecutor::McpServer {
server_name: "linear".to_string(),
},
ToolExecutor::ProviderNative,
] {
let json = serde_json::to_value(&executor).unwrap();
let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
match &executor {
ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
ToolExecutor::McpServer { server_name } => {
assert_eq!(kind, "mcp_server");
assert_eq!(json["server_name"], *server_name);
}
ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
}
let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
assert_eq!(recovered, executor);
}
}
#[test]
fn tool_call_error_category_from_internal_collapses_transient_family() {
use crate::value::ErrorCategory as Internal;
assert_eq!(
ToolCallErrorCategory::from_internal(&Internal::Timeout),
ToolCallErrorCategory::Timeout
);
for net in [
Internal::RateLimit,
Internal::Overloaded,
Internal::ServerError,
Internal::TransientNetwork,
] {
assert_eq!(
ToolCallErrorCategory::from_internal(&net),
ToolCallErrorCategory::Network,
"{net:?} should map to Network",
);
}
assert_eq!(
ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
ToolCallErrorCategory::SchemaValidation
);
assert_eq!(
ToolCallErrorCategory::from_internal(&Internal::ToolError),
ToolCallErrorCategory::ToolError
);
assert_eq!(
ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
ToolCallErrorCategory::PermissionDenied
);
assert_eq!(
ToolCallErrorCategory::from_internal(&Internal::Cancelled),
ToolCallErrorCategory::Cancelled
);
for bridge in [
Internal::Auth,
Internal::EgressBlocked,
Internal::NotFound,
Internal::CircuitOpen,
Internal::Generic,
] {
assert_eq!(
ToolCallErrorCategory::from_internal(&bridge),
ToolCallErrorCategory::HostBridgeError,
"{bridge:?} should map to HostBridgeError",
);
}
}
#[test]
fn tool_call_update_event_omits_error_category_when_none() {
let event = AgentEvent::ToolCallUpdate {
session_id: "s".into(),
tool_call_id: "t".into(),
tool_name: "read".into(),
status: ToolCallStatus::Completed,
raw_output: None,
error: None,
duration_ms: None,
execution_duration_ms: None,
error_category: None,
executor: None,
parsing: None,
raw_input: None,
raw_input_partial: None,
audit: None,
};
let v = serde_json::to_value(&event).unwrap();
assert_eq!(v["type"], "tool_call_update");
assert!(v.get("error_category").is_none());
}
#[test]
fn tool_call_update_event_serializes_error_category_when_set() {
let event = AgentEvent::ToolCallUpdate {
session_id: "s".into(),
tool_call_id: "t".into(),
tool_name: "read".into(),
status: ToolCallStatus::Failed,
raw_output: None,
error: Some("missing required field".into()),
duration_ms: None,
execution_duration_ms: None,
error_category: Some(ToolCallErrorCategory::SchemaValidation),
executor: None,
parsing: None,
raw_input: None,
raw_input_partial: None,
audit: None,
};
let v = serde_json::to_value(&event).unwrap();
assert_eq!(v["error_category"], "schema_validation");
assert_eq!(v["error"], "missing required field");
}
#[test]
fn tool_call_update_omits_executor_when_absent() {
let event = AgentEvent::ToolCallUpdate {
session_id: "s".into(),
tool_call_id: "tc-1".into(),
tool_name: "read".into(),
status: ToolCallStatus::Completed,
raw_output: None,
error: None,
duration_ms: None,
execution_duration_ms: None,
error_category: None,
executor: None,
parsing: None,
raw_input: None,
raw_input_partial: None,
audit: None,
};
let json = serde_json::to_value(&event).unwrap();
assert!(json.get("executor").is_none(), "got: {json}");
}
#[test]
fn worker_event_status_strings_cover_all_variants() {
assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
assert_eq!(
WorkerEvent::WorkerWaitingForInput.as_status(),
"awaiting_input"
);
assert_eq!(WorkerEvent::WorkerSuspended.as_status(), "suspended");
assert_eq!(WorkerEvent::WorkerResumed.as_status(), "running");
assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
for terminal in [
WorkerEvent::WorkerCompleted,
WorkerEvent::WorkerFailed,
WorkerEvent::WorkerCancelled,
] {
assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
}
for non_terminal in [
WorkerEvent::WorkerSpawned,
WorkerEvent::WorkerProgressed,
WorkerEvent::WorkerWaitingForInput,
WorkerEvent::WorkerSuspended,
WorkerEvent::WorkerResumed,
] {
assert!(
!non_terminal.is_terminal(),
"{non_terminal:?} should not be terminal"
);
}
let collected: Vec<&'static str> = WorkerEvent::ALL
.iter()
.map(|event| event.as_status())
.collect();
assert_eq!(
collected,
vec![
"running",
"progressed",
"awaiting_input",
"suspended",
"running",
"completed",
"failed",
"cancelled",
]
);
}
#[test]
fn worker_update_event_routes_through_session_keyed_sink() {
reset_all_sinks();
let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
impl AgentEventSink for CapturingSink {
fn handle_event(&self, event: &AgentEvent) {
self.0
.lock()
.expect("captured sink mutex poisoned")
.push(event.clone());
}
}
register_sink(
"worker-session-1",
Arc::new(CapturingSink(captured.clone())),
);
emit_event(&AgentEvent::WorkerUpdate {
session_id: "worker-session-1".into(),
worker_id: "worker_42".into(),
worker_name: "review_captain".into(),
worker_task: "review pr".into(),
worker_mode: "delegated_stage".into(),
event: WorkerEvent::WorkerWaitingForInput,
status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
audit: None,
});
emit_event(&AgentEvent::WorkerUpdate {
session_id: "other-session".into(),
worker_id: "w2".into(),
worker_name: "n2".into(),
worker_task: "t2".into(),
worker_mode: "delegated_stage".into(),
event: WorkerEvent::WorkerCompleted,
status: "completed".into(),
metadata: serde_json::json!({}),
audit: None,
});
let received = captured.lock().unwrap().clone();
assert_eq!(received.len(), 1, "got: {received:?}");
match &received[0] {
AgentEvent::WorkerUpdate {
session_id,
worker_id,
event,
status,
..
} => {
assert_eq!(session_id, "worker-session-1");
assert_eq!(worker_id, "worker_42");
assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
assert_eq!(status, "awaiting_input");
}
other => panic!("expected WorkerUpdate, got {other:?}"),
}
reset_all_sinks();
}
#[test]
fn worker_update_event_serializes_to_canonical_shape() {
let event = AgentEvent::WorkerUpdate {
session_id: "s".into(),
worker_id: "w".into(),
worker_name: "n".into(),
worker_task: "t".into(),
worker_mode: "delegated_stage".into(),
event: WorkerEvent::WorkerProgressed,
status: "progressed".into(),
metadata: serde_json::json!({"started_at": "0193..."}),
audit: Some(serde_json::json!({"run_id": "run_x"})),
};
let value = serde_json::to_value(&event).unwrap();
assert_eq!(value["type"], "worker_update");
assert_eq!(value["session_id"], "s");
assert_eq!(value["worker_id"], "w");
assert_eq!(value["status"], "progressed");
assert_eq!(value["audit"]["run_id"], "run_x");
let recovered: AgentEvent = serde_json::from_value(value).unwrap();
match recovered {
AgentEvent::WorkerUpdate {
event: recovered_event,
..
} => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
other => panic!("expected WorkerUpdate, got {other:?}"),
}
}
#[test]
fn tool_call_update_includes_executor_when_present() {
let event = AgentEvent::ToolCallUpdate {
session_id: "s".into(),
tool_call_id: "tc-1".into(),
tool_name: "read".into(),
status: ToolCallStatus::Completed,
raw_output: None,
error: None,
duration_ms: None,
execution_duration_ms: None,
error_category: None,
executor: Some(ToolExecutor::McpServer {
server_name: "github".into(),
}),
parsing: None,
raw_input: None,
raw_input_partial: None,
audit: None,
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["executor"]["kind"], "mcp_server");
assert_eq!(json["executor"]["server_name"], "github");
}
#[test]
fn tool_call_update_omits_audit_when_absent() {
let event = AgentEvent::ToolCallUpdate {
session_id: "s".into(),
tool_call_id: "tc-1".into(),
tool_name: "read".into(),
status: ToolCallStatus::Completed,
raw_output: None,
error: None,
duration_ms: None,
execution_duration_ms: None,
error_category: None,
executor: None,
parsing: None,
raw_input: None,
raw_input_partial: None,
audit: None,
};
let json = serde_json::to_value(&event).unwrap();
assert!(json.get("audit").is_none(), "got: {json}");
}
#[test]
fn tool_call_update_includes_audit_when_present() {
let audit = MutationSessionRecord {
session_id: "session_42".into(),
run_id: Some("run_42".into()),
mutation_scope: "apply_workspace".into(),
execution_kind: Some("worker".into()),
..Default::default()
};
let event = AgentEvent::ToolCallUpdate {
session_id: "s".into(),
tool_call_id: "tc-1".into(),
tool_name: "edit_file".into(),
status: ToolCallStatus::Completed,
raw_output: None,
error: None,
duration_ms: None,
execution_duration_ms: None,
error_category: None,
executor: Some(ToolExecutor::HostBridge),
parsing: None,
raw_input: None,
raw_input_partial: None,
audit: Some(audit),
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["audit"]["session_id"], "session_42");
assert_eq!(json["audit"]["run_id"], "run_42");
assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
assert_eq!(json["audit"]["execution_kind"], "worker");
}
#[test]
fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
let raw = serde_json::json!({
"type": "tool_call_update",
"session_id": "s",
"tool_call_id": "tc-1",
"tool_name": "read",
"status": "completed",
"raw_output": null,
"error": null,
});
let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
match event {
AgentEvent::ToolCallUpdate { audit, .. } => {
assert!(audit.is_none());
}
other => panic!("expected ToolCallUpdate, got {other:?}"),
}
}
#[test]
fn tool_call_audit_serializes_with_free_form_audit_payload() {
let audit = serde_json::json!({
"summary": "Searched codebase",
"kind": "search",
"consent": {"decision": "approved", "decided_by": "auto"},
"layers": [{"name": "with_required_reason", "status": "ok"}],
});
let event = AgentEvent::ToolCallAudit {
session_id: "s".into(),
tool_call_id: "tc-1".into(),
tool_name: "search_files".into(),
audit: audit.clone(),
receipt: None,
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["type"], "tool_call_audit");
assert_eq!(json["session_id"], "s");
assert_eq!(json["tool_call_id"], "tc-1");
assert_eq!(json["tool_name"], "search_files");
assert_eq!(json["audit"], audit);
}
#[test]
fn tool_call_audit_session_id_routes_correctly() {
let event = AgentEvent::ToolCallAudit {
session_id: "abc".into(),
tool_call_id: "tc".into(),
tool_name: "read".into(),
audit: serde_json::Value::Null,
receipt: None,
};
assert_eq!(event.session_id(), "abc");
}
#[test]
fn tool_call_audit_serializes_typed_receipt_when_present() {
let receipt = ToolCallReceipt {
schema_version: 1,
session_id: "s".into(),
run_id: None,
tool_call_id: "tc-1".into(),
tool_name: "search_files".into(),
iteration: 3,
turn_index: Some(2),
emit_order: 0,
reason: Some("Search for middleware".into()),
kind: Some("search".into()),
executor: Some("harn".into()),
status: "ok".into(),
error_category: None,
duration_ms: 9,
args_hash: "0".repeat(64),
result_hash: Some("1".repeat(64)),
audit: serde_json::json!({"summary": "Search for middleware"}),
emitted_at: "2026-05-16T00:00:00Z".into(),
model: Some("mock".into()),
provider: Some("mock".into()),
};
let event = AgentEvent::ToolCallAudit {
session_id: "s".into(),
tool_call_id: "tc-1".into(),
tool_name: "search_files".into(),
audit: receipt.audit.clone(),
receipt: Some(receipt.clone()),
};
let json = serde_json::to_value(&event).unwrap();
assert_eq!(json["receipt"], serde_json::to_value(receipt).unwrap());
assert_eq!(json["receipt"]["args_hash"], "0".repeat(64));
}
#[test]
fn wildcard_sink_receives_events_across_sessions() {
reset_wildcard_sinks();
let counter = Arc::new(AtomicUsize::new(0));
let handle = register_wildcard_sink(Arc::new(CountingSink(counter.clone())));
emit_event(&AgentEvent::IterationStart {
session_id: "session-w".into(),
iteration: 0,
provider: String::new(),
model: String::new(),
});
emit_event(&AgentEvent::IterationEnd {
session_id: "session-w-other".into(),
iteration: 0,
iteration_info: serde_json::json!({}),
});
assert_eq!(counter.load(Ordering::SeqCst), 2);
unregister_wildcard_sink(handle);
emit_event(&AgentEvent::IterationStart {
session_id: "session-w".into(),
iteration: 1,
provider: String::new(),
model: String::new(),
});
assert_eq!(
counter.load(Ordering::SeqCst),
2,
"unregister stops delivery"
);
reset_wildcard_sinks();
}
#[test]
fn wildcard_sink_unregister_unknown_handle_is_noop() {
reset_wildcard_sinks();
let counter = Arc::new(AtomicUsize::new(0));
let handle = register_wildcard_sink(Arc::new(CountingSink(counter.clone())));
unregister_wildcard_sink(handle);
unregister_wildcard_sink(handle);
let bogus = WildcardSinkHandle(u64::MAX);
unregister_wildcard_sink(bogus);
emit_event(&AgentEvent::IterationStart {
session_id: "s".into(),
iteration: 0,
provider: String::new(),
model: String::new(),
});
assert_eq!(counter.load(Ordering::SeqCst), 0);
reset_wildcard_sinks();
}