use crate::ast::{ActorHint, Span, TypeRef};
use crate::error::{ErrorCode, ErrorKind, ErrorSource};
use crate::value::Value;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::collections::HashMap;
/// Node identifier carried by the `NodeStart`, `NodeEnd`, `Breakpoint` and
/// `BreakpointResumed` events; a plain alias for `usize`.
pub type NodeId = usize;
/// A tool call recorded in the `tool_calls` list of `EngineEvent::LLMResponse`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedToolCall {
/// Identifier of the tool-use request.
/// NOTE(review): presumably assigned by the model provider — confirm.
pub tool_use_id: String,
/// Name of the tool.
pub name: String,
/// Call arguments, kept as untyped JSON.
pub args: serde_json::Value,
}
/// Result reported by `EngineEvent::SubScriptResult`.
///
/// Serde encodes this in adjacently tagged form: the variant name is stored
/// under `"kind"` and the variant's fields under `"detail"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", content = "detail")]
pub enum ChildOutcome {
/// Success, carrying the resulting value as untyped JSON.
Ok {
value: serde_json::Value,
},
/// Failure description. All three fields are plain strings here.
/// NOTE(review): `kind`/`code` presumably mirror `ErrorKind`/`ErrorCode` names — confirm.
Err {
kind: String,
message: String,
code: Option<String>,
},
}
/// Serde default hook for `EngineEvent::Error::code`: yields `ErrorCode::Other`
/// when the `code` field is missing from the input.
fn default_error_code_other() -> ErrorCode {
ErrorCode::Other
}
/// Token usage record carried in the `usage` field of the `TaskEnd`, `LoopTurn`
/// and `LLMResponse` events, and folded into `WorkflowTotals::accumulate`.
///
/// When deserializing, fields annotated `#[serde(default)]` may be absent and
/// fall back to their type's default; the un-annotated fields (`input_tokens`,
/// `output_tokens`, `model`, `provider`, `cached_input_tokens`) are required.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TokenUsage {
/// Input token count.
pub input_tokens: u64,
/// Output token count.
pub output_tokens: u64,
/// Model name.
pub model: String,
/// Provider name.
pub provider: String,
/// Input tokens served from cache.
/// NOTE(review): whether this is a subset of `input_tokens` is not visible here — confirm.
pub cached_input_tokens: u64,
/// Cache-write input tokens; optional on input (defaults to 0).
#[serde(default)]
pub cache_write_input_tokens: u64,
/// Cache-write input tokens, presumably for a 5-minute TTL — confirm; optional on input.
#[serde(default)]
pub cache_write_5m_input_tokens: u64,
/// Cache-write input tokens, presumably for a 1-hour TTL — confirm; optional on input.
#[serde(default)]
pub cache_write_1h_input_tokens: u64,
/// Stop reason; optional on input.
/// NOTE(review): presumably a normalized form of `raw_stop_reason` — confirm.
#[serde(default)]
pub stop_reason: Option<String>,
/// Raw stop reason; optional on input.
#[serde(default)]
pub raw_stop_reason: Option<String>,
/// Reasoning token count; optional on input. Summed into
/// `WorkflowTotals::total_thinking_tokens` by `WorkflowTotals::accumulate`.
#[serde(default)]
pub reasoning_tokens: u64,
}
/// One entry of `EngineEvent::SubScript::parent_path`.
///
/// `EngineEvent::flatten_subscript_chain` creates one frame for every enclosing
/// `SubScript` envelope it collapses, copying that envelope's four header fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SubScriptFrame {
/// `script_name` of the collapsed envelope.
pub script_name: String,
/// `parent_task` of the collapsed envelope.
pub parent_task: String,
/// `parent_node_id` of the collapsed envelope; omitted from output when `None`
/// and optional on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub parent_node_id: Option<u64>,
/// `attempt` of the collapsed envelope; omitted from output when `None`
/// and optional on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub attempt: Option<u8>,
}
/// Aggregate counters reported with `WorkflowEndPayload`.
///
/// Every field is `#[serde(default)]`, so any of them may be missing on input
/// and then reads as zero.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct WorkflowTotals {
/// Sum of `TokenUsage::input_tokens`.
#[serde(default)]
pub total_input_tokens: u64,
/// Sum of `TokenUsage::output_tokens`.
#[serde(default)]
pub total_output_tokens: u64,
/// Sum of `TokenUsage::cached_input_tokens`.
#[serde(default)]
pub total_cached_input_tokens: u64,
/// Sum of `TokenUsage::reasoning_tokens`.
#[serde(default)]
pub total_thinking_tokens: u64,
/// Tool token total. Not touched by `accumulate`; only combined by `merge`.
#[serde(default)]
pub total_tool_tokens: u64,
/// Cost total in USD. Not touched by `accumulate`; only combined by `merge`.
#[serde(default)]
pub total_cost_usd: f64,
/// Number of `accumulate` calls folded in (plus merged counts).
#[serde(default)]
pub task_count: u32,
}
impl WorkflowTotals {
pub fn accumulate(&mut self, usage: Option<&TokenUsage>) {
self.task_count = self.task_count.saturating_add(1);
if let Some(u) = usage {
self.total_input_tokens = self.total_input_tokens.saturating_add(u.input_tokens);
self.total_output_tokens = self.total_output_tokens.saturating_add(u.output_tokens);
self.total_cached_input_tokens = self
.total_cached_input_tokens
.saturating_add(u.cached_input_tokens);
self.total_thinking_tokens = self
.total_thinking_tokens
.saturating_add(u.reasoning_tokens);
}
}
pub fn merge(&mut self, other: &WorkflowTotals) {
self.total_input_tokens = self
.total_input_tokens
.saturating_add(other.total_input_tokens);
self.total_output_tokens = self
.total_output_tokens
.saturating_add(other.total_output_tokens);
self.total_cached_input_tokens = self
.total_cached_input_tokens
.saturating_add(other.total_cached_input_tokens);
self.total_thinking_tokens = self
.total_thinking_tokens
.saturating_add(other.total_thinking_tokens);
self.total_tool_tokens = self
.total_tool_tokens
.saturating_add(other.total_tool_tokens);
self.total_cost_usd += other.total_cost_usd;
self.task_count = self.task_count.saturating_add(other.task_count);
}
}
/// Payload of `EngineEvent::WorkflowEnd`: a value together with aggregate totals.
///
/// `Serialize` and `Deserialize` are implemented by hand elsewhere in this file
/// rather than derived.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowEndPayload {
/// The value reported at workflow end.
pub value: Value,
/// Aggregate counters reported alongside the value.
pub totals: WorkflowTotals,
}
impl Default for WorkflowEndPayload {
    /// A payload holding `Value::Null` and all-zero totals.
    fn default() -> Self {
        let totals = WorkflowTotals::default();
        Self {
            value: Value::Null,
            totals,
        }
    }
}
impl WorkflowEndPayload {
pub fn new(value: Value) -> Self {
Self {
value,
totals: WorkflowTotals::default(),
}
}
pub fn with_totals(value: Value, totals: WorkflowTotals) -> Self {
Self { value, totals }
}
}
impl From<Value> for WorkflowEndPayload {
fn from(value: Value) -> Self {
Self::new(value)
}
}
impl Serialize for WorkflowEndPayload {
fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
use serde::ser::SerializeMap;
let mut map = s.serialize_map(Some(8))?;
map.serialize_entry("value", &self.value.to_wire_json())?;
map.serialize_entry("total_input_tokens", &self.totals.total_input_tokens)?;
map.serialize_entry("total_output_tokens", &self.totals.total_output_tokens)?;
map.serialize_entry(
"total_cached_input_tokens",
&self.totals.total_cached_input_tokens,
)?;
map.serialize_entry("total_thinking_tokens", &self.totals.total_thinking_tokens)?;
map.serialize_entry("total_tool_tokens", &self.totals.total_tool_tokens)?;
map.serialize_entry("total_cost_usd", &self.totals.total_cost_usd)?;
map.serialize_entry("task_count", &self.totals.task_count)?;
map.end()
}
}
impl<'de> Deserialize<'de> for WorkflowEndPayload {
    /// Reads a payload from either of two shapes.
    ///
    /// * **Structured shape** — a JSON object that has a `"value"` key *and* at
    ///   least one of the aggregate keys listed in `AGG_KEYS`. `"value"` becomes
    ///   the payload value and the aggregate keys fill `totals`. An aggregate
    ///   key that is missing, or whose JSON value is not of the expected numeric
    ///   kind, reads as zero.
    /// * **Bare shape** — anything else (a scalar, an array, or an object that
    ///   does not match the rule above). The whole input becomes the payload
    ///   value and `totals` is all zeros.
    ///
    /// A `task_count` larger than `u32::MAX` is clamped to `u32::MAX` rather than
    /// wrapped, in line with the saturating arithmetic `WorkflowTotals` uses for
    /// that counter.
    fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
        const AGG_KEYS: &[&str] = &[
            "total_input_tokens",
            "total_output_tokens",
            "total_cached_input_tokens",
            "total_thinking_tokens",
            "total_tool_tokens",
            "total_cost_usd",
            "task_count",
        ];

        let raw = serde_json::Value::deserialize(d)?;

        if let serde_json::Value::Object(map) = &raw {
            if let Some(value_json) = map.get("value") {
                if AGG_KEYS.iter().any(|k| map.contains_key(*k)) {
                    // Missing or non-u64 entries fall back to 0.
                    let read_u64 = |key: &str| {
                        map.get(key)
                            .and_then(serde_json::Value::as_u64)
                            .unwrap_or(0)
                    };
                    let totals = WorkflowTotals {
                        total_input_tokens: read_u64("total_input_tokens"),
                        total_output_tokens: read_u64("total_output_tokens"),
                        total_cached_input_tokens: read_u64("total_cached_input_tokens"),
                        total_thinking_tokens: read_u64("total_thinking_tokens"),
                        total_tool_tokens: read_u64("total_tool_tokens"),
                        total_cost_usd: map
                            .get("total_cost_usd")
                            .and_then(serde_json::Value::as_f64)
                            .unwrap_or(0.0),
                        // Clamp before narrowing so the cast below is lossless.
                        task_count: read_u64("task_count").min(u64::from(u32::MAX)) as u32,
                    };
                    return Ok(WorkflowEndPayload {
                        value: Value::from_json(value_json),
                        totals,
                    });
                }
            }
        }

        Ok(WorkflowEndPayload {
            value: Value::from_json(&raw),
            totals: WorkflowTotals::default(),
        })
    }
}
/// One validation error as listed in
/// `SuspendTrigger::ValidationExhausted::validation_errors`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ValidationErrorWire {
/// Name of the validation stage that produced the error.
pub stage: String,
/// Error text.
pub message: String,
/// Optional location of the error.
/// NOTE(review): the tests use a JSON-pointer-like string (`"/0"`) — confirm the format.
pub path: Option<String>,
}
/// The `trigger` field of `EngineEvent::Suspended`.
///
/// Serde encodes this as an internally tagged enum: the variant name is stored
/// under `"kind"` next to the variant's own fields. The default variant is
/// `DagPosition`, which is also what a `Suspended` event without a `trigger`
/// field deserializes to.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "kind")]
pub enum SuspendTrigger {
    /// Default trigger; carries no data.
    #[default]
    DagPosition,
    /// Carries the task name, a retry count, the last attempt's text and the
    /// validation errors collected for it.
    ValidationExhausted {
        task_name: String,
        retry_count: u32,
        last_attempt: String,
        validation_errors: Vec<ValidationErrorWire>,
    },
    /// Carries the task name and the associated `UnableRecord`.
    AgentUnable {
        task_name: String,
        unable: crate::value::UnableRecord,
    },
    /// Carries the task name, a variant name and an untyped JSON payload.
    AgentVariant {
        task_name: String,
        variant: String,
        payload: serde_json::Value,
    },
}
/// Optional loop information attached to `EngineEvent::Suspended` through its
/// `loop_context` field.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct LoopSuspendContext {
/// Identifier of the loop (the tests use a UUID-formatted string).
pub loop_id: String,
/// Name of the loop.
pub loop_name: String,
/// Turn number.
/// NOTE(review): whether turns are 0- or 1-based is not visible here — confirm.
pub turn: u32,
}
/// The `variant` field of `EngineEvent::TaskEnd`.
///
/// Variant names are serialized in `snake_case` (`"success"`, `"unable"`,
/// `"failed"`). The default is `Success`, which is also used when the field is
/// absent from a `TaskEnd` event. Any unrecognized string deserializes to
/// `Unknown` instead of failing.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TaskEndVariant {
    #[default]
    Success,
    Unable,
    Failed,
    /// Catch-all for discriminators this build does not know.
    #[serde(other)]
    Unknown,
}
/// `#[serde(with = "value_wire")]` adapter for a `Value` field: writes the value
/// through `Value::to_wire_json` and reads it back through `Value::from_json`.
mod value_wire {
    use super::*;

    /// Serializes `v` in its wire-JSON form.
    pub(super) fn serialize<S: Serializer>(v: &Value, s: S) -> Result<S::Ok, S::Error> {
        let wire = v.to_wire_json();
        wire.serialize(s)
    }

    /// Reads arbitrary JSON and converts it into a `Value`.
    pub(super) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Value, D::Error> {
        serde_json::Value::deserialize(d).map(|json| Value::from_json(&json))
    }
}
/// `#[serde(with = "opt_value_wire")]` adapter for an `Option<Value>` field.
mod opt_value_wire {
    use super::*;

    /// Serializes `Some` as the value's wire-JSON form and `None` as a unit/none.
    pub(super) fn serialize<S: Serializer>(v: &Option<Value>, s: S) -> Result<S::Ok, S::Error> {
        if let Some(inner) = v {
            inner.to_wire_json().serialize(s)
        } else {
            s.serialize_none()
        }
    }

    /// Reads optional JSON; a present value is converted with `Value::from_json`.
    pub(super) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Option<Value>, D::Error> {
        match Option::<serde_json::Value>::deserialize(d)? {
            Some(json) => Ok(Some(Value::from_json(&json))),
            None => Ok(None),
        }
    }
}
/// `#[serde(with = "value_map_wire")]` adapter for a `HashMap<String, Value>` field.
mod value_map_wire {
    use super::*;

    /// Serializes the map with every value in its wire-JSON form.
    pub(super) fn serialize<S: Serializer>(
        m: &HashMap<String, Value>,
        s: S,
    ) -> Result<S::Ok, S::Error> {
        use serde::ser::SerializeMap;

        let mut out = s.serialize_map(Some(m.len()))?;
        m.iter()
            .try_for_each(|(key, val)| out.serialize_entry(key, &val.to_wire_json()))?;
        out.end()
    }

    /// Reads a string-keyed JSON map and converts every value with `Value::from_json`.
    pub(super) fn deserialize<'de, D: Deserializer<'de>>(
        d: D,
    ) -> Result<HashMap<String, Value>, D::Error> {
        let parsed = HashMap::<String, serde_json::Value>::deserialize(d)?;
        let mut converted = HashMap::with_capacity(parsed.len());
        for (key, json) in parsed {
            converted.insert(key, Value::from_json(&json));
        }
        Ok(converted)
    }
}
/// Engine event.
///
/// Serde encodes every event in adjacently tagged form: the variant name is
/// stored under `"type"` and the variant's data under `"payload"`.
///
/// Field attributes used throughout:
/// * `#[serde(default)]` — the field may be absent on input and then takes its
///   type's default value.
/// * `skip_serializing_if = ...` — the field is left out of the output when the
///   named predicate holds (for example `Option::is_none`).
/// * `with = "value_wire"` / `"opt_value_wire"` / `"value_map_wire"` — the
///   `Value` is written via `Value::to_wire_json` and read via `Value::from_json`.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", content = "payload")]
pub enum EngineEvent {
/// A log message.
Log(String),
/// A log message with an explicit level string.
LogLevel {
level: String,
message: String,
},
/// A name paired with a `Value`; the value uses the wire-JSON form.
/// NOTE(review): the `String` is presumably the state variable name — confirm.
StateUpdate(String, #[serde(with = "value_wire")] Value),
// The next source line declares three tuple variants — `WorkflowStart(usize)`,
// `TaskStart(String, Option<String>)` and `TaskPrompt(String, String)` — and
// then opens the struct variant `TaskEnd`, whose fields follow.
// NOTE(review): the meaning of the tuple fields is not visible in this file — confirm.
WorkflowStart(usize), TaskStart(String, Option<String>), TaskPrompt(String, String), TaskEnd {
task: String,
on_error_label: Option<String>,
#[serde(with = "value_wire")]
value: Value,
value_type: Option<TypeRef>,
duration: std::time::Duration,
attempt: u8,
usage: Option<TokenUsage>,
// Absent on input => `TaskEndVariant::Success` (the enum's default).
#[serde(default)]
variant: TaskEndVariant,
},
/// A chunk of agent output text.
AgentOutput {
task_name: String,
agent_name: Option<String>,
task_id: String,
schema_type: Option<String>,
chunk: String,
},
/// A chunk of agent reasoning text; same fields as `AgentOutput`.
AgentReasoning {
task_name: String,
agent_name: Option<String>,
task_id: String,
schema_type: Option<String>,
chunk: String,
},
/// Cache planning summary. The last four fields are optional on input.
CachePlanned {
agent: String,
n_segments: usize,
n_stable: usize,
longest_stable_prefix_len_chars: usize,
markers_placed: usize,
#[serde(default)]
markers_placed_at: Vec<usize>,
#[serde(default)]
provider: String,
#[serde(default)]
tools_marker_placed: bool,
#[serde(default)]
system_marker_placed: bool,
},
/// Suspension at a checkpoint. `trigger` defaults to
/// `SuspendTrigger::DagPosition` when absent; `loop_context` is omitted from
/// the output when `None`.
Suspended {
checkpoint_name: String,
token: String,
prompt: String,
schema: serde_json::Value,
actor_hint: ActorHint,
timeout_secs: Option<u64>,
#[serde(default)]
trigger: SuspendTrigger,
#[serde(default, skip_serializing_if = "Option::is_none")]
loop_context: Option<LoopSuspendContext>,
},
/// Counterpart of `Suspended`, identified by checkpoint name and token.
Resumed {
checkpoint_name: String,
token: String,
},
/// Workflow end; see `WorkflowEndPayload` for the hand-written wire format.
WorkflowEnd(WorkflowEndPayload),
/// An error. `code` falls back to `ErrorCode::Other` and `user_message` to the
/// empty string when absent on input; `retry_after_ms` and `source` are
/// omitted from the output when `None` / `ErrorSource::is_empty` respectively.
Error {
message: String,
kind: ErrorKind,
#[serde(default = "default_error_code_other")]
code: ErrorCode,
#[serde(default)]
user_message: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
retry_after_ms: Option<u64>,
#[serde(default, skip_serializing_if = "ErrorSource::is_empty")]
source: ErrorSource,
},
/// Start of a node, identified by id and source span.
NodeStart(NodeId, Span),
/// End of a node; `value`, when present, uses the wire-JSON form.
NodeEnd {
node_id: NodeId,
span: Span,
target_var: Option<String>,
#[serde(with = "opt_value_wire")]
value: Option<Value>,
duration: std::time::Duration,
},
/// Breakpoint hit; `env_snapshot` values use the wire-JSON form.
Breakpoint {
node_id: NodeId,
span: Span,
token: String,
#[serde(with = "value_map_wire")]
env_snapshot: std::collections::HashMap<String, Value>,
},
/// Counterpart of `Breakpoint`, identified by node id and token.
BreakpointResumed {
node_id: NodeId,
token: String,
},
/// Start of a tool call; `tool_use_id` defaults to the empty string when absent.
ToolCallStart {
task_name: String,
tool_name: String,
server_name: String,
input: serde_json::Value,
#[serde(default)]
tool_use_id: String,
},
/// End of a tool call; `tool_use_id` defaults to the empty string when absent.
ToolCallEnd {
task_name: String,
tool_name: String,
#[serde(default)]
tool_use_id: String,
output: serde_json::Value,
duration: std::time::Duration,
},
/// An MCP server, named by alias, is degraded for the given reason.
McpServerDegraded {
alias: String,
reason: String,
},
/// An MCP server, named by alias, has recovered.
McpServerRecovered {
alias: String,
},
/// A tool approval is pending under `token`.
ToolApprovalPending {
execution_id: Option<String>,
node_id: Option<u64>,
token: String,
tool_ref: String,
args: serde_json::Value,
},
/// A tool approval identified by `token` was resolved; the two optional
/// fields are omitted from the output when `None`.
ToolApprovalResolved {
token: String,
approved: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
args_override: Option<serde_json::Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
reason: Option<String>,
},
/// A tool approval was skipped for the given reason.
ToolApprovalSkipped {
execution_id: Option<String>,
node_id: Option<u64>,
tool_ref: String,
reason: String,
},
/// NOTE(review): presumably emitted when replaying a tool call cannot be
/// proven safe — confirm. `args` defaults to JSON `null` when absent.
ToolReplayUncertain {
execution_id: Option<String>,
tool_use_id: String,
tool_name: String,
#[serde(default)]
args: serde_json::Value,
},
/// Start of verification for a workflow.
VerificationStart {
workflow_name: String,
},
/// Verification results (untyped JSON) and how long verification took.
VerificationResult {
workflow_name: String,
results: serde_json::Value,
duration: std::time::Duration,
},
/// Validation failure report. `EngineEvent::validation_failure` builds this
/// variant and fills `truncated` / `total_length` from the response it was
/// given; both default (to `false` / `0`) when absent on input.
ValidationFailure {
task_name: String,
attempt: u32,
model_response: String,
#[serde(default)]
truncated: bool,
#[serde(default)]
total_length: u64,
missing_fields: Vec<String>,
extra_fields: Vec<String>,
type_errors: Vec<String>,
stop_reason: Option<String>,
},
/// Envelope wrapping a `child` event. `parent_path` lists enclosing
/// envelopes (see `flatten_subscript_chain`) and is omitted from the output
/// when empty; `parent_node_id` and `attempt` are omitted when `None`.
SubScript {
script_name: String,
parent_task: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
parent_node_id: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
attempt: Option<u8>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
parent_path: Vec<SubScriptFrame>,
child: Box<EngineEvent>,
},
/// Start of a named loop with its turn limit.
LoopStart {
name: String,
max_turns: u32,
},
/// One loop turn; `usage` is omitted from the output when `None`.
LoopTurn {
name: String,
turn: u32,
tool_calls: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
usage: Option<TokenUsage>,
},
/// End of a named loop; `value` uses the wire-JSON form.
LoopEnd {
name: String,
turn_count: u32,
#[serde(with = "value_wire")]
value: Value,
},
/// Context compaction report with before/after token counts; `cache_ttl` is
/// omitted from the output when `None`.
ContextCompacted {
agent: String,
loop_id: Option<String>,
turn: Option<u32>,
threshold_pct: Option<u8>,
threshold_abs: Option<u32>,
strategy: String,
before_tokens: u32,
after_tokens: u32,
provider_native: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
cache_ttl: Option<String>,
},
/// Context overflow report; `terminated_by_hard_error` defaults to `false`
/// when absent on input.
ContextOverflow {
agent: String,
attempted_strategies: Vec<String>,
configured_cap_tokens: u32,
model_context_window: u32,
#[serde(default)]
terminated_by_hard_error: bool,
},
/// A task cache hit for `agent`.
TaskCacheHit {
agent: String,
key_prefix: String,
},
/// A recorded LLM response: text, tool calls and optional usage.
LLMResponse {
node_id: String,
call_index: u32,
text: String,
tool_calls: Vec<CachedToolCall>,
usage: Option<TokenUsage>,
},
/// NOTE(review): presumably a replay served from a recorded `LLMResponse` — confirm.
LLMReplayCacheHit {
node_id: String,
call_index: u32,
},
/// A child execution was spawned with the given arguments.
SubScriptSpawned {
child_execution_id: String,
parent_node_id: String,
args: serde_json::Value,
},
/// Outcome of a child execution.
SubScriptResult {
parent_node_id: String,
child_execution_id: String,
outcome: ChildOutcome,
},
/// Resolution payload (untyped JSON) for a checkpoint.
CheckpointResolution {
checkpoint_id: String,
payload: serde_json::Value,
},
/// Start of a runtime for a task.
RuntimeStart {
task_name: String,
runtime_name: String,
language: String,
},
/// A chunk of runtime standard output.
RuntimeStdout {
task_name: String,
chunk: String,
},
/// A chunk of runtime standard error.
RuntimeStderr {
task_name: String,
chunk: String,
},
/// End of a runtime with its exit code and duration in milliseconds.
RuntimeEnd {
task_name: String,
exit_code: i32,
duration_ms: u64,
},
/// A runtime error described by a kind string and a message.
RuntimeError {
task_name: String,
kind: String,
message: String,
},
}
/// Maximum number of bytes of the model response that
/// `EngineEvent::validation_failure` keeps (64 KiB); longer responses are cut
/// at or just below this limit, on a UTF-8 character boundary.
pub const VALIDATION_FAILURE_RESPONSE_CAP_BYTES: usize = 64 * 1024;
impl EngineEvent {
    /// Collapses a chain of directly nested `SubScript` envelopes into one.
    ///
    /// Any event that is not a `SubScript` is returned unchanged. For a
    /// `SubScript`, nested `SubScript` children are unwrapped until the first
    /// non-`SubScript` event (the leaf) is reached. The result is a single
    /// `SubScript` whose header fields (`script_name`, `parent_task`,
    /// `parent_node_id`, `attempt`) come from the innermost envelope, whose
    /// `child` is the leaf, and whose `parent_path` lists every enclosing
    /// envelope as a `SubScriptFrame`, outermost first.
    ///
    /// An envelope's own `parent_path` describes the ancestors that sit
    /// directly above *that* envelope. When an inner envelope already carries a
    /// path, those frames therefore belong between the enclosing envelope and
    /// the inner one, so the enclosing frame is recorded before the inner path
    /// is appended. (Appending the inner path first would put descendants ahead
    /// of their ancestor, e.g. `[B, A]` instead of `[A, B]` when an already
    /// flattened `C [B]` event is wrapped in `A`.)
    pub fn flatten_subscript_chain(self) -> Self {
        let (
            mut cur_script,
            mut cur_task,
            mut cur_node,
            mut cur_attempt,
            mut frames,
            mut cur_child,
        ) = match self {
            EngineEvent::SubScript {
                script_name,
                parent_task,
                parent_node_id,
                attempt,
                parent_path,
                child,
            } => (
                script_name,
                parent_task,
                parent_node_id,
                attempt,
                parent_path,
                child,
            ),
            other => return other,
        };

        loop {
            match *cur_child {
                EngineEvent::SubScript {
                    script_name: inner_script,
                    parent_task: inner_task,
                    parent_node_id: inner_node,
                    attempt: inner_attempt,
                    parent_path: inner_path,
                    child: inner_child,
                } => {
                    // The current envelope encloses everything in the inner
                    // envelope, including the inner envelope's own ancestors.
                    frames.push(SubScriptFrame {
                        script_name: cur_script,
                        parent_task: cur_task,
                        parent_node_id: cur_node,
                        attempt: cur_attempt,
                    });
                    frames.extend(inner_path);

                    cur_script = inner_script;
                    cur_task = inner_task;
                    cur_node = inner_node;
                    cur_attempt = inner_attempt;
                    cur_child = inner_child;
                }
                leaf => {
                    return EngineEvent::SubScript {
                        script_name: cur_script,
                        parent_task: cur_task,
                        parent_node_id: cur_node,
                        attempt: cur_attempt,
                        parent_path: frames,
                        child: Box::new(leaf),
                    };
                }
            }
        }
    }

    /// Builds a `ValidationFailure` event, capping the stored model response.
    ///
    /// `total_length` always records the byte length of the response as
    /// passed in. If that length exceeds
    /// `VALIDATION_FAILURE_RESPONSE_CAP_BYTES`, the stored `model_response` is
    /// cut at the largest UTF-8 character boundary not above the cap and
    /// `truncated` is set; otherwise the response is stored unchanged and
    /// `truncated` is `false`.
    pub fn validation_failure(
        task_name: impl Into<String>,
        attempt: u32,
        model_response: String,
        missing_fields: Vec<String>,
        extra_fields: Vec<String>,
        type_errors: Vec<String>,
        stop_reason: Option<String>,
    ) -> Self {
        let total_length = model_response.len() as u64;
        let (response, truncated) = if model_response.len() > VALIDATION_FAILURE_RESPONSE_CAP_BYTES
        {
            // Back off to a character boundary so slicing cannot split a
            // multi-byte UTF-8 sequence (which would panic).
            let mut end = VALIDATION_FAILURE_RESPONSE_CAP_BYTES;
            while end > 0 && !model_response.is_char_boundary(end) {
                end -= 1;
            }
            (model_response[..end].to_string(), true)
        } else {
            (model_response, false)
        };
        EngineEvent::ValidationFailure {
            task_name: task_name.into(),
            attempt,
            model_response: response,
            truncated,
            total_length,
            missing_fields,
            extra_fields,
            type_errors,
            stop_reason,
        }
    }

    /// Builds an `Error` event by moving every field out of `detail`.
    pub fn error(detail: crate::error::ErrorDetail) -> Self {
        EngineEvent::Error {
            message: detail.message,
            kind: detail.kind,
            code: detail.code,
            user_message: detail.user_message,
            retry_after_ms: detail.retry_after_ms,
            source: detail.source,
        }
    }

    /// Builds an `Error` event from `ErrorDetail::from_kind(kind, message)`.
    pub fn error_kind(kind: ErrorKind, message: impl Into<String>) -> Self {
        EngineEvent::error(crate::error::ErrorDetail::from_kind(kind, message))
    }

    /// Builds an `Error` event from `ErrorDetail::new(code, message)`.
    pub fn error_code(code: ErrorCode, message: impl Into<String>) -> Self {
        EngineEvent::error(crate::error::ErrorDetail::new(code, message))
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ast::TypeRef;
use crate::value::Value;
/// A serialized `TaskEnd` exposes `value`, `value_type`, `attempt` and a
/// snake_case `variant`.
#[test]
fn task_end_event_serializes_with_value_and_value_type_and_attempt() {
    let event = EngineEvent::TaskEnd {
        task: "t".into(),
        on_error_label: None,
        value: Value::String("x".into()),
        value_type: Some(TypeRef::primitive("int")),
        duration: std::time::Duration::from_millis(10),
        attempt: 2,
        usage: None,
        variant: TaskEndVariant::Success,
    };
    let wire = serde_json::to_string(&event).unwrap();

    let has_value_key = wire.contains("\"value\"");
    assert!(
        has_value_key,
        "serialized event should contain 'value' key: {}",
        wire
    );

    let has_value_type_key = wire.contains("\"value_type\"");
    assert!(
        has_value_type_key,
        "serialized event should contain 'value_type' key: {}",
        wire
    );

    let has_attempt = wire.contains("\"attempt\":2");
    assert!(
        has_attempt,
        "serialized event should contain 'attempt':2: {}",
        wire
    );

    let has_variant = wire.contains("\"variant\":\"success\"");
    assert!(
        has_variant,
        "variant should round-trip as snake_case 'success': {}",
        wire
    );
}
/// `TaskEnd.value` is written as plain JSON, without the enum tags
/// (`Object`, `Int`) of `Value`.
#[test]
fn task_end_value_emits_clean_wire_form_not_tagged_value() {
    let fields: std::collections::HashMap<String, Value> = vec![
        ("exit_code".to_string(), Value::Int(0)),
        ("stdout".to_string(), Value::String("hi\n".into())),
    ]
    .into_iter()
    .collect();
    let event = EngineEvent::TaskEnd {
        task: "run".into(),
        on_error_label: None,
        value: Value::Object(fields),
        value_type: None,
        duration: std::time::Duration::from_millis(10),
        attempt: 1,
        usage: None,
        variant: TaskEndVariant::Success,
    };

    let encoded = serde_json::to_string(&event).unwrap();
    let decoded: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    let value = &decoded["payload"]["value"];

    assert_eq!(value["exit_code"], serde_json::json!(0));
    assert_eq!(value["stdout"], serde_json::json!("hi\n"));
    assert!(
        value.get("Object").is_none(),
        "wire form must not carry the tagged-enum 'Object' key: {value}",
    );
    assert!(
        value["exit_code"].get("Int").is_none(),
        "wire form must not carry the tagged-enum 'Int' key for scalars: {value}",
    );
}
/// `WorkflowEnd` writes its value as plain JSON and the totals as sibling keys.
#[test]
fn workflow_end_payload_emits_clean_wire_form_not_tagged_value() {
    let fields: std::collections::HashMap<String, Value> = vec![
        ("exit_code".to_string(), Value::Int(0)),
        ("duration_ms".to_string(), Value::Int(12)),
    ]
    .into_iter()
    .collect();
    let event = EngineEvent::WorkflowEnd(WorkflowEndPayload::new(Value::Object(fields)));

    let encoded = serde_json::to_string(&event).unwrap();
    let decoded: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    let payload = &decoded["payload"];
    let value = &payload["value"];

    assert_eq!(value["exit_code"], serde_json::json!(0));
    assert_eq!(value["duration_ms"], serde_json::json!(12));
    assert!(
        value.get("Object").is_none(),
        "wire form must not carry the tagged-enum 'Object' key: {value}",
    );
    assert_eq!(payload["total_input_tokens"], serde_json::json!(0));
    assert_eq!(payload["total_output_tokens"], serde_json::json!(0));
    assert_eq!(payload["task_count"], serde_json::json!(0));
}
/// Serializing, deserializing and serializing again yields the same text.
#[test]
fn workflow_end_value_deserialise_round_trip_keeps_clean_shape() {
    let fields: std::collections::HashMap<String, Value> = vec![
        ("exit_code".to_string(), Value::Int(0)),
        ("stdout".to_string(), Value::String("hi".into())),
    ]
    .into_iter()
    .collect();
    let original = EngineEvent::WorkflowEnd(WorkflowEndPayload::new(Value::Object(fields)));

    let first_pass = serde_json::to_string(&original).unwrap();
    let reparsed: EngineEvent = serde_json::from_str(&first_pass).unwrap();
    let second_pass = serde_json::to_string(&reparsed).unwrap();

    assert_eq!(
        first_pass, second_pass,
        "wire form should round-trip without drift"
    );
}
/// An object payload without a `value` key is read as a bare value with zero totals.
#[test]
fn workflow_end_back_compat_legacy_bare_value_payload_parses() {
    let legacy = r#"{"type":"WorkflowEnd","payload":{"exit_code":0,"stdout":"hi"}}"#;
    let payload = match serde_json::from_str::<EngineEvent>(legacy).unwrap() {
        EngineEvent::WorkflowEnd(payload) => payload,
        other => panic!("expected WorkflowEnd, got {other:?}"),
    };

    assert!(matches!(payload.value, Value::Object(_)));
    assert_eq!(payload.totals.total_input_tokens, 0);
    assert_eq!(payload.totals.total_output_tokens, 0);
    assert_eq!(payload.totals.task_count, 0);
}
/// A scalar payload is read as a bare value.
#[test]
fn workflow_end_scalar_legacy_payload_parses_as_bare_value() {
    let legacy = r#"{"type":"WorkflowEnd","payload":"hi"}"#;
    let payload = match serde_json::from_str::<EngineEvent>(legacy).unwrap() {
        EngineEvent::WorkflowEnd(payload) => payload,
        other => panic!("expected WorkflowEnd, got {other:?}"),
    };

    assert!(matches!(payload.value, Value::String(ref s) if s == "hi"));
}
/// Non-zero totals survive a serialize/deserialize round trip.
#[test]
fn workflow_end_new_wire_shape_round_trips_with_totals() {
    let totals = WorkflowTotals {
        total_input_tokens: 1234,
        total_output_tokens: 567,
        total_cached_input_tokens: 100,
        total_thinking_tokens: 25,
        total_tool_tokens: 0,
        total_cost_usd: 0.42,
        task_count: 3,
    };
    let fields: std::collections::HashMap<String, Value> =
        vec![("answer".to_string(), Value::Int(42))]
            .into_iter()
            .collect();
    let evt = EngineEvent::WorkflowEnd(WorkflowEndPayload::with_totals(
        Value::Object(fields),
        totals,
    ));

    let encoded = serde_json::to_string(&evt).unwrap();
    let restored = match serde_json::from_str::<EngineEvent>(&encoded).unwrap() {
        EngineEvent::WorkflowEnd(p) => p.totals,
        other => panic!("expected WorkflowEnd, got {other:?}"),
    };

    assert_eq!(restored.total_input_tokens, 1234);
    assert_eq!(restored.total_output_tokens, 567);
    assert_eq!(restored.total_cached_input_tokens, 100);
    assert_eq!(restored.total_thinking_tokens, 25);
    assert_eq!(restored.task_count, 3);
    assert!((restored.total_cost_usd - 0.42).abs() < 1e-9);
}
/// `accumulate` sums token counts and counts every call, including `None`.
#[test]
fn workflow_totals_accumulate_folds_token_usage() {
    let samples = vec![
        TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            cached_input_tokens: 10,
            reasoning_tokens: 7,
            ..Default::default()
        },
        TokenUsage {
            input_tokens: 200,
            output_tokens: 80,
            cached_input_tokens: 30,
            reasoning_tokens: 3,
            ..Default::default()
        },
    ];

    let mut tot = WorkflowTotals::default();
    for sample in &samples {
        tot.accumulate(Some(sample));
    }
    tot.accumulate(None);

    assert_eq!(tot.total_input_tokens, 300);
    assert_eq!(tot.total_output_tokens, 130);
    assert_eq!(tot.total_cached_input_tokens, 40);
    assert_eq!(tot.total_thinking_tokens, 10);
    assert_eq!(tot.task_count, 3);
}
/// Three nested envelopes A > B > C collapse into a single `C` envelope whose
/// `parent_path` is `[A, B]`.
#[test]
fn sub_script_flatten_chain_collapses_legacy_nested_envelopes() {
    // Wraps `child` in an envelope with an empty `parent_path` and no attempt.
    let wrap = |script: &str, task: &str, node: u64, child: EngineEvent| EngineEvent::SubScript {
        script_name: script.into(),
        parent_task: task.into(),
        parent_node_id: Some(node),
        attempt: None,
        parent_path: Vec::new(),
        child: Box::new(child),
    };

    let depth3 = wrap("C", "c_task", 3, EngineEvent::Log("hi".into()));
    let depth2 = wrap("B", "b_task", 2, depth3);
    let legacy = wrap("A", "a_task", 1, depth2);

    let (script_name, parent_task, parent_node_id, parent_path, child) =
        match legacy.flatten_subscript_chain() {
            EngineEvent::SubScript {
                script_name,
                parent_task,
                parent_node_id,
                parent_path,
                child,
                ..
            } => (script_name, parent_task, parent_node_id, parent_path, child),
            other => panic!("expected SubScript, got {other:?}"),
        };

    assert_eq!(script_name, "C");
    assert_eq!(parent_task, "c_task");
    assert_eq!(parent_node_id, Some(3));

    assert_eq!(parent_path.len(), 2);
    let expected_frames = [("A", "a_task", 1u64), ("B", "b_task", 2u64)];
    for (frame, &(script, task, node)) in parent_path.iter().zip(expected_frames.iter()) {
        assert_eq!(frame.script_name, script);
        assert_eq!(frame.parent_task, task);
        assert_eq!(frame.parent_node_id, Some(node));
    }

    assert!(matches!(*child, EngineEvent::Log(ref s) if s == "hi"));
}
/// An envelope whose child is not a `SubScript` keeps an empty `parent_path`.
#[test]
fn sub_script_flatten_leaves_already_flat_envelopes_alone() {
    let already_flat = EngineEvent::SubScript {
        script_name: "child".into(),
        parent_task: "result".into(),
        parent_node_id: Some(7),
        attempt: Some(1),
        parent_path: Vec::new(),
        child: Box::new(EngineEvent::Log("hi".into())),
    };

    let (parent_path, child) = match already_flat.flatten_subscript_chain() {
        EngineEvent::SubScript {
            parent_path, child, ..
        } => (parent_path, child),
        other => panic!("expected SubScript, got {other:?}"),
    };

    assert!(parent_path.is_empty());
    assert!(matches!(*child, EngineEvent::Log(_)));
}
/// Flattening an event that is not a `SubScript` returns it unchanged.
#[test]
fn sub_script_flatten_no_op_for_non_subscript_events() {
    let flat = EngineEvent::Log("hi".into()).flatten_subscript_chain();
    assert!(matches!(flat, EngineEvent::Log(ref s) if s == "hi"));
}
/// A `None` `value_type` is written as JSON `null`, not omitted.
#[test]
fn task_end_event_value_type_none_serializes_null() {
    let event = EngineEvent::TaskEnd {
        task: "t".into(),
        on_error_label: None,
        value: Value::Null,
        value_type: None,
        duration: std::time::Duration::from_millis(5),
        attempt: 1,
        usage: None,
        variant: TaskEndVariant::Success,
    };
    let wire = serde_json::to_string(&event).unwrap();

    let has_attempt = wire.contains("\"attempt\":1");
    assert!(has_attempt, "attempt should be 1: {}", wire);

    let has_null_type = wire.contains("\"value_type\":null");
    assert!(has_null_type, "value_type should be null: {}", wire);
}
/// A `TaskEnd` payload without a `variant` key deserializes as `Success`.
#[test]
fn task_end_event_without_variant_deserializes_as_success() {
    let json = r#"{
"type": "TaskEnd",
"payload": {
"task": "t",
"on_error_label": null,
"value": "x",
"value_type": null,
"duration": {"secs": 0, "nanos": 10000000},
"attempt": 1,
"usage": null
}
}"#;
    let variant = match serde_json::from_str::<EngineEvent>(json).unwrap() {
        EngineEvent::TaskEnd { variant, .. } => variant,
        _ => panic!("expected TaskEnd"),
    };
    assert_eq!(variant, TaskEndVariant::Success);
}
/// The `Unable` variant is written as `"unable"` and read back unchanged.
#[test]
fn task_end_event_with_unable_variant_roundtrips() {
    let record = crate::value::UnableRecord {
        reason: "image too blurry".into(),
        missing: vec!["claim_text".into()],
        category: crate::value::UnableCategory::InputAmbiguous,
    };
    let event = EngineEvent::TaskEnd {
        task: "decompose".into(),
        on_error_label: None,
        value: Value::Unable(record),
        value_type: None,
        duration: std::time::Duration::from_millis(10),
        attempt: 1,
        usage: None,
        variant: TaskEndVariant::Unable,
    };

    let s = serde_json::to_string(&event).unwrap();
    assert!(s.contains("\"variant\":\"unable\""), "{s}");

    let variant = match serde_json::from_str::<EngineEvent>(&s).unwrap() {
        EngineEvent::TaskEnd { variant, .. } => variant,
        _ => panic!("expected TaskEnd"),
    };
    assert_eq!(variant, TaskEndVariant::Unable);
}
/// An unrecognized `variant` string deserializes as `TaskEndVariant::Unknown`.
#[test]
fn task_end_variant_unknown_discriminator_deserializes_as_unknown() {
    let json = r#"{
"type": "TaskEnd",
"payload": {
"task": "t",
"on_error_label": null,
"value": null,
"value_type": null,
"duration": {"secs": 0, "nanos": 0},
"attempt": 1,
"usage": null,
"variant": "partial"
}
}"#;
    let variant = match serde_json::from_str::<EngineEvent>(json).unwrap() {
        EngineEvent::TaskEnd { variant, .. } => variant,
        _ => panic!("expected TaskEnd"),
    };
    assert_eq!(variant, TaskEndVariant::Unknown);
}
/// A `ValidationExhausted` trigger is written with its `kind` tag and fields,
/// and a `None` `loop_context` is omitted.
#[test]
fn suspended_event_with_validation_exhausted_trigger_serializes() {
    let trigger = SuspendTrigger::ValidationExhausted {
        task_name: "decompose_claims".into(),
        retry_count: 3,
        last_attempt: "{\"bad\": true}".into(),
        validation_errors: vec![ValidationErrorWire {
            stage: "schema".into(),
            message: "required property \"number\" missing".into(),
            path: Some("/0".into()),
        }],
    };
    let event = EngineEvent::Suspended {
        checkpoint_name: "human_review".into(),
        token: "tok".into(),
        prompt: "Please review".into(),
        schema: serde_json::json!({"type": "integer"}),
        actor_hint: ActorHint::Human,
        timeout_secs: Some(3600),
        trigger,
        loop_context: None,
    };

    let s = serde_json::to_string(&event).unwrap();

    assert!(s.contains("\"kind\":\"ValidationExhausted\""), "{s}");
    assert!(s.contains("\"retry_count\":3"), "{s}");
    assert!(s.contains("\"task_name\":\"decompose_claims\""), "{s}");
    assert!(s.contains("\"stage\":\"schema\""), "{s}");
    assert!(!s.contains("\"loop_context\""), "{s}");
}
/// A present `loop_context` is written out and survives a round trip.
#[test]
fn suspended_event_with_loop_context_roundtrips() {
    let trigger = SuspendTrigger::AgentUnable {
        task_name: "summarize".into(),
        unable: crate::value::UnableRecord {
            reason: "input ambiguous".into(),
            missing: vec![],
            category: crate::value::UnableCategory::InputAmbiguous,
        },
    };
    let loop_context = Some(LoopSuspendContext {
        loop_id: "11111111-2222-3333-4444-555555555555".into(),
        loop_name: "research".into(),
        turn: 2,
    });
    let event = EngineEvent::Suspended {
        checkpoint_name: "review".into(),
        token: "tok".into(),
        prompt: "Triage skill failure".into(),
        schema: serde_json::json!({}),
        actor_hint: ActorHint::Human,
        timeout_secs: None,
        trigger,
        loop_context,
    };

    let s = serde_json::to_string(&event).unwrap();
    assert!(s.contains("\"loop_context\""), "{s}");
    assert!(s.contains("\"loop_name\":\"research\""), "{s}");
    assert!(s.contains("\"turn\":2"), "{s}");

    let ctx = match serde_json::from_str::<EngineEvent>(&s).unwrap() {
        EngineEvent::Suspended {
            loop_context: Some(ctx),
            ..
        } => ctx,
        _ => panic!("expected Suspended with loop_context"),
    };
    assert_eq!(ctx.loop_name, "research");
    assert_eq!(ctx.turn, 2);
    assert_eq!(ctx.loop_id, "11111111-2222-3333-4444-555555555555");
}
/// A `Suspended` payload without a `trigger` key deserializes as `DagPosition`.
#[test]
fn suspended_event_without_trigger_deserializes_as_dag_position() {
    let json = r#"{
"type": "Suspended",
"payload": {
"checkpoint_name": "cp",
"token": "t",
"prompt": "p",
"schema": {},
"actor_hint": "Human",
"timeout_secs": null
}
}"#;
    let trigger = match serde_json::from_str::<EngineEvent>(json).unwrap() {
        EngineEvent::Suspended { trigger, .. } => trigger,
        _ => panic!("expected Suspended"),
    };
    assert!(matches!(trigger, SuspendTrigger::DagPosition));
}
/// An explicit `DagPosition` trigger survives a round trip.
#[test]
fn suspended_event_with_dag_position_trigger_roundtrips() {
    let event = EngineEvent::Suspended {
        checkpoint_name: "cp".into(),
        token: "t".into(),
        prompt: "p".into(),
        schema: serde_json::json!({}),
        actor_hint: ActorHint::Human,
        timeout_secs: None,
        trigger: SuspendTrigger::DagPosition,
        loop_context: None,
    };

    let encoded = serde_json::to_string(&event).unwrap();
    let trigger = match serde_json::from_str::<EngineEvent>(&encoded).unwrap() {
        EngineEvent::Suspended { trigger, .. } => trigger,
        _ => panic!("expected Suspended"),
    };
    assert!(matches!(trigger, SuspendTrigger::DagPosition));
}
/// `SuspendTrigger::AgentVariant` keeps its tag, variant name and JSON payload
/// across a round trip.
#[test]
fn suspend_trigger_agent_variant_roundtrips_with_payload() {
    let original = SuspendTrigger::AgentVariant {
        task_name: "decompose".into(),
        variant: "ClaimErr".into(),
        payload: serde_json::json!({
            "message": "claim unsupported by evidence",
            "claim_id": "c-7",
        }),
    };

    let s = serde_json::to_string(&original).unwrap();
    assert!(s.contains("\"kind\":\"AgentVariant\""), "{s}");
    assert!(s.contains("\"variant\":\"ClaimErr\""), "{s}");

    let (task_name, variant, payload) = match serde_json::from_str::<SuspendTrigger>(&s).unwrap() {
        SuspendTrigger::AgentVariant {
            task_name,
            variant,
            payload,
        } => (task_name, variant, payload),
        other => panic!("expected AgentVariant, got {other:?}"),
    };
    assert_eq!(task_name, "decompose");
    assert_eq!(variant, "ClaimErr");
    assert_eq!(
        payload["message"].as_str(),
        Some("claim unsupported by evidence"),
    );
}
/// A `ValidationFailure` event writes all of its fields and reads them back.
#[test]
fn validation_failure_event_serializes_full_payload() {
    let event = EngineEvent::ValidationFailure {
        task_name: "classify_features".into(),
        attempt: 2,
        model_response: "{}".into(),
        truncated: false,
        total_length: 2,
        missing_fields: vec!["/classifications".into()],
        extra_fields: vec![],
        type_errors: vec!["expected string, got null at /summary".into()],
        stop_reason: Some("max_tokens".into()),
    };

    let s = serde_json::to_string(&event).unwrap();
    assert!(s.contains("\"type\":\"ValidationFailure\""), "{s}");
    assert!(s.contains("\"task_name\":\"classify_features\""), "{s}");
    assert!(s.contains("\"attempt\":2"), "{s}");
    assert!(s.contains("\"model_response\":\"{}\""), "{s}");
    assert!(s.contains("\"/classifications\""), "{s}");
    assert!(s.contains("\"stop_reason\":\"max_tokens\""), "{s}");

    let reparsed: EngineEvent = serde_json::from_str(&s).unwrap();
    let (task_name, attempt, model_response, missing_fields, extra_fields, type_errors, stop_reason) =
        match reparsed {
            EngineEvent::ValidationFailure {
                task_name,
                attempt,
                model_response,
                missing_fields,
                extra_fields,
                type_errors,
                stop_reason,
                ..
            } => (
                task_name,
                attempt,
                model_response,
                missing_fields,
                extra_fields,
                type_errors,
                stop_reason,
            ),
            other => panic!("expected ValidationFailure, got {other:?}"),
        };

    assert_eq!(task_name, "classify_features");
    assert_eq!(attempt, 2);
    assert_eq!(model_response, "{}");
    assert_eq!(missing_fields, vec!["/classifications"]);
    assert!(extra_fields.is_empty());
    assert_eq!(type_errors.len(), 1);
    assert_eq!(stop_reason.as_deref(), Some("max_tokens"));
}
/// `SuspendTrigger::AgentUnable` keeps its tag, task name and record category
/// across a round trip.
#[test]
fn suspend_trigger_agent_unable_roundtrips_with_payload() {
    let original = SuspendTrigger::AgentUnable {
        task_name: "escalate".into(),
        unable: crate::value::UnableRecord {
            reason: "image too blurry to OCR".into(),
            missing: vec!["claim_text".into()],
            category: crate::value::UnableCategory::InputAmbiguous,
        },
    };

    let s = serde_json::to_string(&original).unwrap();
    assert!(s.contains("\"kind\":\"AgentUnable\""), "{s}");

    let (task_name, unable) = match serde_json::from_str::<SuspendTrigger>(&s).unwrap() {
        SuspendTrigger::AgentUnable { task_name, unable } => (task_name, unable),
        other => panic!("expected AgentUnable, got {other:?}"),
    };
    assert_eq!(task_name, "escalate");
    assert_eq!(
        unable.category,
        crate::value::UnableCategory::InputAmbiguous
    );
}
/// An error event built from an `ErrorDetail` carrying
/// `ErrorCode::ScriptDepthExceeded` must serialize that code by name.
#[test]
fn error_event_with_code_serializes_with_code_field() {
    let detail = crate::error::ErrorDetail::new(
        crate::error::ErrorCode::ScriptDepthExceeded,
        "boom",
    );
    let event = EngineEvent::error(detail);
    let s = serde_json::to_string(&event).unwrap();
    assert!(s.contains("\"code\":\"ScriptDepthExceeded\""), "{s}");
}
/// Builds an error event from an `ErrorKind` alone and checks the `code`
/// value that ends up in the serialized JSON.
///
/// NOTE(review): the test name says the default code is `Other`, yet the
/// assertion expects `"ScriptError"` — confirm which of the two is intended.
#[test]
fn error_event_default_code_is_other_for_kind_only_construction() {
    let event = EngineEvent::error_kind(crate::error::ErrorKind::ScriptError, "plain");
    let s = serde_json::to_string(&event).unwrap();
    let expected_fragment = "\"code\":\"ScriptError\"";
    assert!(s.contains(expected_fragment), "{s}");
}
/// Round-trips a `ContextCompacted` event whose optional fields (except
/// `cache_ttl`) are all set, checking both the JSON text and the decoded
/// values. `cache_ttl` is deliberately ignored after decoding.
#[test]
fn context_compacted_event_roundtrips_full_payload() {
    let event = EngineEvent::ContextCompacted {
        agent: "researcher".into(),
        loop_id: Some("11111111-2222-3333-4444-555555555555".into()),
        turn: Some(3),
        threshold_pct: Some(70),
        threshold_abs: Some(140_000),
        strategy: "drop_thinking_blocks".into(),
        before_tokens: 142_000,
        after_tokens: 96_000,
        provider_native: false,
        cache_ttl: None,
    };

    let s = serde_json::to_string(&event).unwrap();
    // Each fragment must appear verbatim in the serialized text.
    for fragment in [
        "\"type\":\"ContextCompacted\"",
        "\"agent\":\"researcher\"",
        "\"strategy\":\"drop_thinking_blocks\"",
        "\"before_tokens\":142000",
        "\"after_tokens\":96000",
        "\"provider_native\":false",
        "\"turn\":3",
    ] {
        assert!(s.contains(fragment), "{s}");
    }

    let decoded: EngineEvent = serde_json::from_str(&s).unwrap();
    match decoded {
        EngineEvent::ContextCompacted {
            agent,
            loop_id,
            turn,
            threshold_pct,
            threshold_abs,
            strategy,
            before_tokens,
            after_tokens,
            provider_native,
            cache_ttl: _,
        } => {
            assert_eq!(agent, "researcher");
            assert_eq!(
                loop_id.as_deref(),
                Some("11111111-2222-3333-4444-555555555555")
            );
            assert_eq!(turn, Some(3));
            assert_eq!(threshold_pct, Some(70));
            assert_eq!(threshold_abs, Some(140_000));
            assert_eq!(strategy, "drop_thinking_blocks");
            assert_eq!(before_tokens, 142_000);
            assert_eq!(after_tokens, 96_000);
            assert!(!provider_native);
        }
        other => panic!("expected ContextCompacted, got {other:?}"),
    }
}
/// Round-trips a provider-native `ContextCompacted` event with the loop,
/// turn and threshold fields left as `None`, and checks that the flag and
/// the absent loop/turn values survive decoding.
#[test]
fn context_compacted_event_provider_native_roundtrips() {
    let event = EngineEvent::ContextCompacted {
        agent: "summarizer".into(),
        loop_id: None,
        turn: None,
        threshold_pct: None,
        threshold_abs: None,
        strategy: "provider_native".into(),
        before_tokens: 180_000,
        after_tokens: 42_000,
        provider_native: true,
        cache_ttl: Some(String::from("1h")),
    };

    let s = serde_json::to_string(&event).unwrap();
    for fragment in [
        "\"provider_native\":true",
        "\"strategy\":\"provider_native\"",
    ] {
        assert!(s.contains(fragment), "{s}");
    }

    let decoded: EngineEvent = serde_json::from_str(&s).unwrap();
    match decoded {
        EngineEvent::ContextCompacted {
            provider_native,
            loop_id,
            turn,
            ..
        } => {
            assert!(provider_native);
            assert!(loop_id.is_none());
            assert!(turn.is_none());
        }
        other => panic!("expected ContextCompacted, got {other:?}"),
    }
}
/// Round-trips a `ContextOverflow` event listing three attempted strategies
/// and verifies both the JSON text and every decoded field.
#[test]
fn context_overflow_event_roundtrips_full_payload() {
    let strategies = vec![
        "drop_thinking_blocks".into(),
        "drop_oldest_tool_results".into(),
        "summarize_to_state".into(),
    ];
    let event = EngineEvent::ContextOverflow {
        agent: "researcher".into(),
        attempted_strategies: strategies,
        configured_cap_tokens: 200_000,
        model_context_window: 200_000,
        terminated_by_hard_error: false,
    };

    let s = serde_json::to_string(&event).unwrap();
    // Each fragment must appear verbatim in the serialized text.
    for fragment in [
        "\"type\":\"ContextOverflow\"",
        "\"agent\":\"researcher\"",
        "\"configured_cap_tokens\":200000",
        "\"model_context_window\":200000",
        "\"drop_thinking_blocks\"",
    ] {
        assert!(s.contains(fragment), "{s}");
    }

    let decoded: EngineEvent = serde_json::from_str(&s).unwrap();
    match decoded {
        EngineEvent::ContextOverflow {
            agent,
            attempted_strategies,
            configured_cap_tokens,
            model_context_window,
            terminated_by_hard_error,
        } => {
            assert_eq!(agent, "researcher");
            assert_eq!(attempted_strategies.len(), 3);
            assert_eq!(attempted_strategies[0], "drop_thinking_blocks");
            assert_eq!(configured_cap_tokens, 200_000);
            assert_eq!(model_context_window, 200_000);
            assert!(!terminated_by_hard_error);
        }
        other => panic!("expected ContextOverflow, got {other:?}"),
    }
}
/// Checks the `type`/`payload` JSON layout of a `RuntimeStart` event and
/// that the event survives a string round trip.
#[test]
fn runtime_start_serializes_with_expected_fields() {
    let event = EngineEvent::RuntimeStart {
        task_name: "analyze".into(),
        runtime_name: "run_python".into(),
        language: "python".into(),
    };

    // Structured view: tag under "type", fields under "payload".
    let tree = serde_json::to_value(&event).unwrap();
    assert_eq!(tree["type"], "RuntimeStart");
    let payload = &tree["payload"];
    assert_eq!(payload["task_name"], "analyze");
    assert_eq!(payload["runtime_name"], "run_python");
    assert_eq!(payload["language"], "python");

    // Textual round trip.
    let text = serde_json::to_string(&event).unwrap();
    let decoded: EngineEvent = serde_json::from_str(&text).unwrap();
    match decoded {
        EngineEvent::RuntimeStart {
            task_name,
            runtime_name,
            language,
        } => {
            assert_eq!(task_name, "analyze");
            assert_eq!(runtime_name, "run_python");
            assert_eq!(language, "python");
        }
        other => panic!("expected RuntimeStart, got {other:?}"),
    }
}
/// Serializes `RuntimeStdout` and `RuntimeStderr` events, checks the
/// `type` tag and `payload.chunk` in the JSON, and then deserializes each
/// value back to confirm the variant and its fields are recovered.
#[test]
fn runtime_stdout_stderr_roundtrip() {
    let stdout = EngineEvent::RuntimeStdout {
        task_name: "t".into(),
        chunk: "hello\n".into(),
    };
    let stderr = EngineEvent::RuntimeStderr {
        task_name: "t".into(),
        chunk: "warn\n".into(),
    };

    let j_out = serde_json::to_value(&stdout).unwrap();
    let j_err = serde_json::to_value(&stderr).unwrap();
    assert_eq!(j_out["type"], "RuntimeStdout");
    assert_eq!(j_err["type"], "RuntimeStderr");
    assert_eq!(j_out["payload"]["chunk"], "hello\n");
    assert_eq!(j_err["payload"]["chunk"], "warn\n");

    // Deserialize as well, so the test exercises the full round trip its
    // name promises rather than serialization alone.
    match serde_json::from_value::<EngineEvent>(j_out).unwrap() {
        EngineEvent::RuntimeStdout { task_name, chunk } => {
            assert_eq!(task_name, "t");
            assert_eq!(chunk, "hello\n");
        }
        other => panic!("expected RuntimeStdout, got {other:?}"),
    }
    match serde_json::from_value::<EngineEvent>(j_err).unwrap() {
        EngineEvent::RuntimeStderr { task_name, chunk } => {
            assert_eq!(task_name, "t");
            assert_eq!(chunk, "warn\n");
        }
        other => panic!("expected RuntimeStderr, got {other:?}"),
    }
}
/// Checks that `RuntimeEnd` exposes `exit_code` and `duration_ms` under
/// `payload`, and that both values come back when decoding the JSON value.
#[test]
fn runtime_end_serializes_with_exit_code_and_duration() {
    let event = EngineEvent::RuntimeEnd {
        task_name: "analyze".into(),
        exit_code: 0,
        duration_ms: 1234,
    };

    let tree = serde_json::to_value(&event).unwrap();
    assert_eq!(tree["type"], "RuntimeEnd");
    let payload = &tree["payload"];
    assert_eq!(payload["exit_code"], 0);
    assert_eq!(payload["duration_ms"], 1234);

    // Decode straight from the value tree (consumes it).
    let decoded: EngineEvent = serde_json::from_value(tree).unwrap();
    match decoded {
        EngineEvent::RuntimeEnd {
            exit_code,
            duration_ms,
            ..
        } => {
            assert_eq!(exit_code, 0);
            assert_eq!(duration_ms, 1234);
        }
        other => panic!("expected RuntimeEnd, got {other:?}"),
    }
}
/// Checks that a `RuntimeError` event serializes its `kind` and `message`
/// under `payload` with the `RuntimeError` type tag.
#[test]
fn runtime_error_serializes_with_kind_and_message() {
    let event = EngineEvent::RuntimeError {
        task_name: "t".into(),
        kind: "Timeout".into(),
        message: "exceeded 30s".into(),
    };

    let tree = serde_json::to_value(&event).unwrap();
    assert_eq!(tree["type"], "RuntimeError");
    let payload = &tree["payload"];
    assert_eq!(payload["kind"], "Timeout");
    assert_eq!(payload["message"], "exceeded 30s");
}
/// Checks the JSON layout of `TaskCacheHit` and that `agent` and
/// `key_prefix` survive a string round trip.
#[test]
fn task_cache_hit_serializes_with_agent_and_key_prefix() {
    let event = EngineEvent::TaskCacheHit {
        agent: "Researcher".into(),
        key_prefix: "f7d3a9".into(),
    };

    let tree = serde_json::to_value(&event).unwrap();
    assert_eq!(tree["type"], "TaskCacheHit");
    let payload = &tree["payload"];
    assert_eq!(payload["agent"], "Researcher");
    assert_eq!(payload["key_prefix"], "f7d3a9");

    let text = serde_json::to_string(&event).unwrap();
    let decoded: EngineEvent = serde_json::from_str(&text).unwrap();
    match decoded {
        EngineEvent::TaskCacheHit { agent, key_prefix } => {
            assert_eq!(agent, "Researcher");
            assert_eq!(key_prefix, "f7d3a9");
        }
        other => panic!("expected TaskCacheHit, got {other:?}"),
    }
}
/// Feeds `EngineEvent::validation_failure` a response 1024 bytes longer than
/// `VALIDATION_FAILURE_RESPONSE_CAP_BYTES` and expects the event to be
/// flagged as truncated, to report the original length, and to store no more
/// than the cap.
#[test]
fn validation_failure_caps_oversized_response() {
    let oversized_len = VALIDATION_FAILURE_RESPONSE_CAP_BYTES + 1024;
    let huge = "x".repeat(oversized_len);

    let event = EngineEvent::validation_failure(
        String::from("t"),
        1,
        huge.clone(),
        Vec::new(),
        Vec::new(),
        Vec::new(),
        None,
    );

    match event {
        EngineEvent::ValidationFailure {
            model_response,
            truncated,
            total_length,
            ..
        } => {
            assert!(truncated, "should be truncated");
            assert_eq!(total_length, huge.len() as u64);
            assert!(model_response.len() <= VALIDATION_FAILURE_RESPONSE_CAP_BYTES);
        }
        other => panic!("expected ValidationFailure, got {other:?}"),
    }
}
/// Feeds `EngineEvent::validation_failure` a short response and expects it
/// to be stored verbatim, not marked truncated, with its length recorded.
#[test]
fn validation_failure_under_cap_is_unchanged() {
    let body = String::from("{\"k\": \"v\"}");

    let event = EngineEvent::validation_failure(
        String::from("t"),
        1,
        body.clone(),
        Vec::new(),
        Vec::new(),
        Vec::new(),
        None,
    );

    match event {
        EngineEvent::ValidationFailure {
            model_response,
            truncated,
            total_length,
            ..
        } => {
            assert!(!truncated);
            assert_eq!(total_length, body.len() as u64);
            assert_eq!(model_response, body);
        }
        other => panic!("expected ValidationFailure, got {other:?}"),
    }
}
/// Round-trips a `ToolApprovalResolved` event and checks the token, the
/// approval flag and the reason. `args_override` is populated but not
/// inspected after decoding.
#[test]
fn tool_approval_resolved_round_trips() {
    let event = EngineEvent::ToolApprovalResolved {
        token: "tok_abc".into(),
        approved: true,
        args_override: Some(serde_json::json!({"safe": true})),
        reason: Some("operator approved".into()),
    };

    let json = serde_json::to_string(&event).unwrap();
    let decoded: EngineEvent = serde_json::from_str(&json).unwrap();
    match decoded {
        EngineEvent::ToolApprovalResolved {
            token,
            approved,
            reason,
            ..
        } => {
            assert_eq!(token, "tok_abc");
            assert!(approved);
            assert_eq!(reason.as_deref(), Some("operator approved"));
        }
        other => panic!("expected ToolApprovalResolved, got {other:?}"),
    }
}
/// Round-trips a `ToolApprovalSkipped` event and checks that the tool
/// reference and skip reason are preserved.
#[test]
fn tool_approval_skipped_round_trips() {
    let event = EngineEvent::ToolApprovalSkipped {
        execution_id: Some("exec_1".into()),
        node_id: Some(7),
        tool_ref: "gh.list_issues".into(),
        reason: "policy:read_only".into(),
    };

    let json = serde_json::to_string(&event).unwrap();
    let decoded: EngineEvent = serde_json::from_str(&json).unwrap();
    match decoded {
        EngineEvent::ToolApprovalSkipped {
            tool_ref, reason, ..
        } => {
            assert_eq!(tool_ref, "gh.list_issues");
            assert_eq!(reason, "policy:read_only");
        }
        other => panic!("expected ToolApprovalSkipped, got {other:?}"),
    }
}
/// Round-trips a `ToolReplayUncertain` event, checking the type tag in the
/// JSON text and every decoded field including the JSON `args`.
#[test]
fn tool_replay_uncertain_round_trips() {
    let expected_args = serde_json::json!({"channel": "general", "text": "hi"});
    let event = EngineEvent::ToolReplayUncertain {
        execution_id: Some("exec_42".into()),
        tool_use_id: "tu_abc".into(),
        tool_name: "send_message".into(),
        args: expected_args.clone(),
    };

    let s = serde_json::to_string(&event).unwrap();
    assert!(s.contains("\"type\":\"ToolReplayUncertain\""), "{s}");

    let decoded: EngineEvent = serde_json::from_str(&s).unwrap();
    match decoded {
        EngineEvent::ToolReplayUncertain {
            execution_id,
            tool_use_id,
            tool_name,
            args,
        } => {
            assert_eq!(execution_id.as_deref(), Some("exec_42"));
            assert_eq!(tool_use_id, "tu_abc");
            assert_eq!(tool_name, "send_message");
            assert_eq!(args, expected_args);
        }
        other => panic!("expected ToolReplayUncertain, got {other:?}"),
    }
}
/// Round-trips an `LLMReplayCacheHit` event and checks that `node_id` and
/// `call_index` are preserved.
#[test]
fn llm_replay_cache_hit_round_trips() {
    let event = EngineEvent::LLMReplayCacheHit {
        node_id: "n42".into(),
        call_index: 3,
    };

    let json = serde_json::to_string(&event).unwrap();
    let decoded: EngineEvent = serde_json::from_str(&json).unwrap();
    match decoded {
        EngineEvent::LLMReplayCacheHit {
            node_id,
            call_index,
        } => {
            assert_eq!(node_id, "n42");
            assert_eq!(call_index, 3);
        }
        other => panic!("expected LLMReplayCacheHit, got {other:?}"),
    }
}
/// Covers both shapes of `LoopTurn::usage`: when it is `Some`, the token
/// counts survive a round trip; when it is `None`, the serialized JSON has
/// no `"usage"` key at all.
#[test]
fn loop_turn_carries_usage_when_present_and_omits_when_none() {
    // Case 1: usage present.
    let usage_in = TokenUsage {
        input_tokens: 100,
        output_tokens: 25,
        model: "claude".into(),
        provider: "anthropic".into(),
        ..Default::default()
    };
    let with_usage = EngineEvent::LoopTurn {
        name: "review".into(),
        turn: 4,
        tool_calls: vec!["fetch".into()],
        usage: Some(usage_in),
    };
    let json = serde_json::to_string(&with_usage).unwrap();
    let decoded: EngineEvent = serde_json::from_str(&json).unwrap();
    match decoded {
        EngineEvent::LoopTurn { usage, .. } => {
            let recovered = usage.expect("usage present");
            assert_eq!(recovered.input_tokens, 100);
            assert_eq!(recovered.output_tokens, 25);
        }
        other => panic!("expected LoopTurn, got {other:?}"),
    }

    // Case 2: usage absent, so the key must not be emitted.
    let without_usage = EngineEvent::LoopTurn {
        name: "review".into(),
        turn: 1,
        tool_calls: Vec::new(),
        usage: None,
    };
    let s2 = serde_json::to_string(&without_usage).unwrap();
    let mentions_usage = s2.contains("\"usage\"");
    assert!(
        !mentions_usage,
        "usage should be skipped when None: {s2}"
    );
}
/// Round-trips a `SubScript` event wrapping a `Log` child and checks that
/// `parent_node_id` and `attempt` are preserved when set.
#[test]
fn sub_script_carries_parent_node_id_and_attempt() {
    let event = EngineEvent::SubScript {
        script_name: "child".into(),
        parent_task: "result".into(),
        parent_node_id: Some(13),
        attempt: Some(2),
        parent_path: Vec::new(),
        child: Box::new(EngineEvent::Log("hi".into())),
    };

    let json = serde_json::to_string(&event).unwrap();
    let decoded: EngineEvent = serde_json::from_str(&json).unwrap();
    match decoded {
        EngineEvent::SubScript {
            parent_node_id,
            attempt,
            ..
        } => {
            assert_eq!(parent_node_id, Some(13));
            assert_eq!(attempt, Some(2));
        }
        other => panic!("expected SubScript, got {other:?}"),
    }
}
/// When `parent_node_id` and `attempt` are `None`, the serialized
/// `SubScript` event must not mention either key.
#[test]
fn sub_script_back_compat_omits_new_fields_when_none() {
    let event = EngineEvent::SubScript {
        script_name: "child".into(),
        parent_task: "result".into(),
        parent_node_id: None,
        attempt: None,
        parent_path: Vec::new(),
        child: Box::new(EngineEvent::Log("hi".into())),
    };

    let s = serde_json::to_string(&event).unwrap();
    // Neither optional key may show up in the output.
    for absent_key in ["\"parent_node_id\"", "\"attempt\""] {
        assert!(
            !s.contains(absent_key),
            "should be omitted when None: {s}"
        );
    }
}
/// Round-trips a `TokenUsage` with both `stop_reason` and `raw_stop_reason`
/// set, checking the JSON text and the decoded values.
#[test]
fn token_usage_raw_stop_reason_round_trips() {
    let usage = TokenUsage {
        input_tokens: 10,
        output_tokens: 5,
        model: String::from("claude-sonnet-4-6"),
        provider: String::from("anthropic"),
        stop_reason: Some(String::from("end_turn")),
        raw_stop_reason: Some(String::from("end_turn")),
        ..Default::default()
    };

    let s = serde_json::to_string(&usage).unwrap();
    assert!(s.contains("\"raw_stop_reason\":\"end_turn\""), "{s}");

    let decoded: TokenUsage = serde_json::from_str(&s).unwrap();
    assert_eq!(decoded.raw_stop_reason.as_deref(), Some("end_turn"));
    assert_eq!(decoded.stop_reason.as_deref(), Some("end_turn"));
}
/// Decodes a `TokenUsage` JSON document that has no `raw_stop_reason` key
/// and expects that field to come back as `None` while `stop_reason` is
/// still read.
#[test]
fn token_usage_raw_stop_reason_back_compat_pre_field() {
    // Payload without the `raw_stop_reason` key.
    let legacy_json = r#"{
"input_tokens": 10,
"output_tokens": 5,
"model": "m",
"provider": "p",
"cached_input_tokens": 0,
"stop_reason": "stop"
}"#;

    let usage = serde_json::from_str::<TokenUsage>(legacy_json).unwrap();
    assert_eq!(usage.raw_stop_reason, None);
    assert_eq!(usage.stop_reason.as_deref(), Some("stop"));
}
}