akribes_types/event.rs
1use crate::ast::{ActorHint, Span, TypeRef};
2use crate::error::{ErrorCode, ErrorKind, ErrorSource};
3use crate::value::Value;
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use std::collections::HashMap;
6
/// Identifier of a compiled DAG node, carried by `EngineEvent::NodeStart`,
/// `NodeEnd` and the `Breakpoint*` events.
///
/// Shares its plain `usize` representation with
/// `akribes_core::compiler::NodeId`. The alias is declared here so the
/// SDK-facing `EngineEvent` can name the type without depending on the
/// compiler module.
pub type NodeId = usize;
13
14/// Caches the LLM-emitted tool-use blocks so a replay rebuilds the same
15/// `tool_use_id`s and the downstream `ToolCallEnd` lookups stay stable.
16/// Mirror of `akribes_core::replay_cache::CachedToolCall` — same wire
17/// shape; the type lives here so it can be embedded in
18/// `EngineEvent::LLMResponse` without an akribes-core dependency.
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct CachedToolCall {
21 pub tool_use_id: String,
22 pub name: String,
23 pub args: serde_json::Value,
24}
25
26/// Outcome of a child execution observed by its parent at the
27/// `call(...)` boundary. Mirror of
28/// `akribes_core::replay_cache::ChildOutcome` — same wire shape.
29#[derive(Debug, Clone, Serialize, Deserialize)]
30#[serde(tag = "kind", content = "detail")]
31pub enum ChildOutcome {
32 Ok { value: serde_json::Value },
33 Err { kind: String, message: String, code: Option<String> },
34}
35
36/// Default for `EngineEvent::Error::code` on payloads from older SDK
37/// versions that didn't include the field.
38fn default_error_code_other() -> ErrorCode {
39 ErrorCode::Other
40}
41
42/// Token usage from a single LLM call.
43///
44/// # Normalized superset convention
45///
46/// `input_tokens` is the **total** number of input tokens processed — it
47/// is a superset of `cached_input_tokens` and `cache_write_input_tokens`.
48/// Downstream cost logic derives the "fresh" (non-cached) portion by
49/// subtracting the two cache counts. This matches OpenAI and Gemini's
50/// native reporting; Anthropic's API reports the three groups as disjoint
51/// so the Anthropic parser normalizes by summing before assigning.
52///
53/// # Prompt-caching semantics per provider
54/// - **OpenAI** — `cached_input_tokens` counts cache READS (billed at a
55/// discount, typically 0.1x base input). Cache writes are free per
56/// OpenAI's caching docs. `cache_write_input_tokens` is always 0.
57/// - **Anthropic** — the API returns three token groups:
58/// `input_tokens` (fresh, 1x), `cache_read_input_tokens` (0.1x), and
59/// `cache_creation_input_tokens` (1.25x at the default 5-minute TTL,
60/// 2.0x at 1-hour TTL). The parser remaps these to the superset
61/// convention above. The per-TTL split is surfaced as
62/// `cache_write_5m_input_tokens` and `cache_write_1h_input_tokens`
63/// (parsed from `usage.cache_creation.ephemeral_5m_input_tokens` /
64/// `ephemeral_1h_input_tokens`); these two fields sum to
65/// `cache_write_input_tokens`. Akribes workflows opt into the 1h TTL
66/// via the `extended-cache-ttl-2025-04-11` beta header, so this split
67/// matters for cost accounting (#1091).
68/// - **Gemini** — only cache reads are reported; writes are not
69/// separately billed. `cache_write_input_tokens` is always 0.
70#[derive(Debug, Clone, Serialize, Deserialize, Default)]
71pub struct TokenUsage {
72 /// Total input tokens processed (superset of the two cache counts).
73 pub input_tokens: u64,
74 pub output_tokens: u64,
75 pub model: String,
76 pub provider: String,
77 /// Cache-READ tokens (billed at `CACHE_READ_RATE`, ~0.1x input).
78 pub cached_input_tokens: u64,
79 /// Cache-WRITE / creation tokens (Anthropic only today; billed at
80 /// `CACHE_WRITE_RATE`, 1.25x input at 5m TTL or 2.0x at 1h TTL).
81 /// This is the **total** across both TTL buckets; the breakdown
82 /// lives on [`Self::cache_write_5m_input_tokens`] and
83 /// [`Self::cache_write_1h_input_tokens`] (#1091). Serialized
84 /// default for backward-compatibility with events predating this
85 /// field.
86 #[serde(default)]
87 pub cache_write_input_tokens: u64,
88 /// Anthropic cache-WRITE tokens at the default 5-minute TTL,
89 /// parsed from `usage.cache_creation.ephemeral_5m_input_tokens`.
90 /// Subset of [`Self::cache_write_input_tokens`] — sums with
91 /// [`Self::cache_write_1h_input_tokens`] to the total. `0` on
92 /// providers that don't report the per-TTL breakdown (OpenAI,
93 /// Gemini, mock) and for pre-#1091 events that omit the field.
94 #[serde(default)]
95 pub cache_write_5m_input_tokens: u64,
96 /// Anthropic cache-WRITE tokens at the 1-hour TTL, parsed from
97 /// `usage.cache_creation.ephemeral_1h_input_tokens`. Subset of
98 /// [`Self::cache_write_input_tokens`] — sums with
99 /// [`Self::cache_write_5m_input_tokens`] to the total. `0` on
100 /// providers without per-TTL reporting (OpenAI, Gemini, mock) and
101 /// for pre-#1091 events that omit the field. The 1h-TTL bucket
102 /// bills at 2.0x base input vs. 1.25x for 5m — `pricing::compute_cost`
103 /// uses this split for accurate cost attribution (#1091).
104 #[serde(default)]
105 pub cache_write_1h_input_tokens: u64,
106 /// The provider-reported stop reason for the underlying call, when
107 /// known. Anthropic surfaces values like `"end_turn"`, `"max_tokens"`,
108 /// `"tool_use"`, `"stop_sequence"`. OpenAI: `"stop"`, `"length"`,
109 /// `"tool_calls"`. Gemini: `"STOP"`, `"MAX_TOKENS"`, etc.
110 ///
111 /// Carried alongside usage so the engine's validation-failure path can
112 /// distinguish "model truncated mid-output" (`max_tokens` / `length` /
113 /// `MAX_TOKENS`) from "model finished cleanly but produced an
114 /// invalid shape" — see issue #320 / #321. `None` for providers that
115 /// don't surface a stop reason or for paths that haven't been threaded
116 /// (e.g. the mock provider). Serialized with `#[serde(default)]` so old
117 /// wire payloads that omit the field still deserialize.
118 ///
119 /// Today this field carries the RAW provider value when the
120 /// `parse_*_usage` path produced the `TokenUsage` (the common case
121 /// for non-streamed calls). The `usage_from_outcome` rebuild path
122 /// (streaming + some retry paths) writes the OTel-canonical form
123 /// (`"stop"` / `"max_tokens"` / `"tool_use"` / `"content_filter"` /
124 /// `"other"`) because `LlmCallOutcome` only carries the canonical
125 /// form. Consumers that need a deterministic-by-provider raw value
126 /// should prefer [`Self::raw_stop_reason`] (#1077).
127 #[serde(default)]
128 pub stop_reason: Option<String>,
129 /// Raw provider stop reason, never lossy-mapped to OTel canonical
130 /// form. Set to the same value as [`Self::stop_reason`] when the
131 /// `parse_*_usage` path produced the usage; `None` otherwise
132 /// (mock, streaming rebuilds via `usage_from_outcome`).
133 ///
134 /// Bench / observability code that needs to distinguish Gemini's
135 /// `"STOP"` from `"RECITATION"` (both collapse to `"stop"` under
136 /// the canonical mapping) or Anthropic's `"stop_sequence"` from
137 /// `"end_turn"` should read this field. #1077.
138 #[serde(default)]
139 pub raw_stop_reason: Option<String>,
140 /// Reasoning / thinking tokens — a SUBSET of [`Self::output_tokens`],
141 /// not in addition. Captured from:
142 /// * OpenAI o-series + GPT-5: `usage.completion_tokens_details.reasoning_tokens`
143 /// * Anthropic extended-thinking: `usage.thinking_tokens` (when present)
144 /// * Gemini with `thinkingBudget` set: `usageMetadata.thoughtsTokenCount`
145 ///
146 /// `0` when the model didn't engage reasoning or the provider didn't
147 /// surface the breakdown. `#[serde(default)]` keeps wire-compat with
148 /// pre-#322 events that omit the field entirely.
149 #[serde(default)]
150 pub reasoning_tokens: u64,
151}
152
153/// One ancestor frame on a flattened [`EngineEvent::SubScript`] payload.
154///
155/// Issue #993: `EngineEvent::SubScript` used to wrap a nested
156/// `Box<EngineEvent>` per call-stack level, so a depth-N chain produced an
157/// event whose serialized payload was `O(N)` deep — a 10-level art_123_2
158/// fanout could blow a single SSE frame past a megabyte and choke
159/// reconnect logic. The new wire shape carries the leaf event flat and
160/// names ancestors via an ordered `parent_path` of these frames so SDKs
161/// rebuild the tree off the side without paying the recursion cost on
162/// every envelope.
163///
164/// Frame ordering: `parent_path[0]` is the outermost ancestor (direct
165/// child of the top-level workflow); `parent_path[len-1]` is the
166/// immediate parent of the currently-emitting sub-script (whose own
167/// `script_name` lives on the [`EngineEvent::SubScript`] fields, NOT
168/// inside `parent_path`).
169#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
170pub struct SubScriptFrame {
171 /// Called script's name as it lives in the parent's project. Matches
172 /// the `script_name` an outer SubScript envelope would have carried
173 /// in the legacy recursive shape.
174 pub script_name: String,
175 /// Variable name on the parent side that received the call result —
176 /// same semantics as the top-level [`EngineEvent::SubScript::parent_task`].
177 pub parent_task: String,
178 /// Compiler-stable id of the parent's call(...) node, when known.
179 /// Lets consumers correlate retries of the same call site (issue
180 /// #845) across the ancestor chain.
181 #[serde(default, skip_serializing_if = "Option::is_none")]
182 pub parent_node_id: Option<u64>,
183 /// 1-indexed attempt counter for author-raise retries at the same
184 /// call site. Matches [`EngineEvent::SubScript::attempt`] semantics.
185 #[serde(default, skip_serializing_if = "Option::is_none")]
186 pub attempt: Option<u8>,
187}
188
189/// Aggregate token + cost rollup emitted on [`EngineEvent::WorkflowEnd`].
190///
191/// Issue #1173: today's `WorkflowEnd` carries just the workflow's return
192/// value, so dashboards and the CLI re-walk every `TaskEnd` to compute
193/// per-execution totals. This struct surfaces the same numbers once on
194/// the terminating event so consumers can size the run without parsing
195/// the full event log.
196///
197/// The engine populates this by summing [`TokenUsage`] fields across
198/// every `TaskEnd` emitted in the workflow scope. Sub-script TaskEnds
199/// (events wrapped inside [`EngineEvent::SubScript`]) DO contribute —
200/// the engine relay forwards them to the parent counter so a chain's
201/// outer `WorkflowEnd` reflects the entire chain's spend.
202///
203/// `total_cost_usd` is the only field the engine can't always populate
204/// on its own — pricing tables live in `akribes-server` to keep
205/// `akribes-core` free of provider rate metadata. The field defaults to
206/// `0.0` and stays `0.0` when the engine has no cost data; downstream
207/// enrichers may overwrite it on the server side before persistence.
208#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
209pub struct WorkflowTotals {
210 /// Sum of [`TokenUsage::input_tokens`] across every `TaskEnd` in the
211 /// workflow scope (including sub-script `TaskEnd`s). Superset of
212 /// cached + cache-write — see [`TokenUsage`] for the convention.
213 #[serde(default)]
214 pub total_input_tokens: u64,
215 /// Sum of [`TokenUsage::output_tokens`].
216 #[serde(default)]
217 pub total_output_tokens: u64,
218 /// Sum of [`TokenUsage::cached_input_tokens`] (cache READS).
219 #[serde(default)]
220 pub total_cached_input_tokens: u64,
221 /// Sum of [`TokenUsage::reasoning_tokens`] (extended thinking /
222 /// reasoning tokens — a SUBSET of `total_output_tokens`).
223 #[serde(default)]
224 pub total_thinking_tokens: u64,
225 /// Tokens spent on tool-call traffic. Reserved for future per-tool
226 /// breakdown — the engine doesn't track this separately today, so
227 /// it stays `0`. Present on the wire so SDKs can render the slot
228 /// without a schema bump when the engine starts populating it.
229 #[serde(default)]
230 pub total_tool_tokens: u64,
231 /// Sum of per-task USD cost. Always `0.0` when emitted by the
232 /// engine — pricing lives in `akribes-server`. Server-side
233 /// enrichment may overwrite before persistence.
234 #[serde(default)]
235 pub total_cost_usd: f64,
236 /// Number of `TaskEnd` events folded into the totals above.
237 #[serde(default)]
238 pub task_count: u32,
239}
240
241impl WorkflowTotals {
242 /// Fold a single [`TokenUsage`] into the running totals. No-op for
243 /// `None` so the engine can call it on every `TaskEnd` without
244 /// branching on the optional `usage` field.
245 pub fn accumulate(&mut self, usage: Option<&TokenUsage>) {
246 self.task_count = self.task_count.saturating_add(1);
247 if let Some(u) = usage {
248 self.total_input_tokens = self.total_input_tokens.saturating_add(u.input_tokens);
249 self.total_output_tokens = self.total_output_tokens.saturating_add(u.output_tokens);
250 self.total_cached_input_tokens =
251 self.total_cached_input_tokens.saturating_add(u.cached_input_tokens);
252 self.total_thinking_tokens =
253 self.total_thinking_tokens.saturating_add(u.reasoning_tokens);
254 }
255 }
256
257 /// Merge another `WorkflowTotals` (e.g. from a sub-script) into self.
258 /// Used by the engine when the relay surfaces a child workflow's
259 /// rollup — the outer chain's totals subsume every sub-script's.
260 pub fn merge(&mut self, other: &WorkflowTotals) {
261 self.total_input_tokens =
262 self.total_input_tokens.saturating_add(other.total_input_tokens);
263 self.total_output_tokens =
264 self.total_output_tokens.saturating_add(other.total_output_tokens);
265 self.total_cached_input_tokens = self
266 .total_cached_input_tokens
267 .saturating_add(other.total_cached_input_tokens);
268 self.total_thinking_tokens =
269 self.total_thinking_tokens.saturating_add(other.total_thinking_tokens);
270 self.total_tool_tokens =
271 self.total_tool_tokens.saturating_add(other.total_tool_tokens);
272 self.total_cost_usd += other.total_cost_usd;
273 self.task_count = self.task_count.saturating_add(other.task_count);
274 }
275}
276
277/// Payload of [`EngineEvent::WorkflowEnd`]. Pairs the workflow's
278/// terminal output value with an aggregate [`WorkflowTotals`] rollup.
279///
280/// Serialized with a hand-written `Serialize` / `Deserialize` so the
281/// wire stays bridge-compatible between the legacy (pre-#1173) shape
282/// (`payload = <bare-value>`) and the new shape
283/// (`payload = {"value": <bare-value>, "total_input_tokens": N, ...}`).
284/// See the [`EngineEvent::WorkflowEnd`] doc for the disambiguation
285/// rule.
286#[derive(Debug, Clone, PartialEq)]
287pub struct WorkflowEndPayload {
288 /// The workflow's final return value (the historical payload).
289 pub value: Value,
290 /// Aggregate rollup across every `TaskEnd` in the workflow scope.
291 pub totals: WorkflowTotals,
292}
293
294impl Default for WorkflowEndPayload {
295 fn default() -> Self {
296 Self { value: Value::Null, totals: WorkflowTotals::default() }
297 }
298}
299
300impl WorkflowEndPayload {
301 /// Construct a payload from just a value (totals default to zero).
302 /// Used by call sites that don't have totals yet — they get the
303 /// historical behaviour automatically.
304 pub fn new(value: Value) -> Self {
305 Self { value, totals: WorkflowTotals::default() }
306 }
307
308 /// Construct a payload with both value and totals.
309 pub fn with_totals(value: Value, totals: WorkflowTotals) -> Self {
310 Self { value, totals }
311 }
312}
313
314// Allow call sites that have historically passed a bare `Value` to keep
315// doing so via `EngineEvent::WorkflowEnd(value.into())`. Tests and the
316// CLI fixture use this; the engine emit path uses the explicit
317// `WorkflowEndPayload::with_totals` constructor.
318impl From<Value> for WorkflowEndPayload {
319 fn from(value: Value) -> Self {
320 Self::new(value)
321 }
322}
323
324impl Serialize for WorkflowEndPayload {
325 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
326 use serde::ser::SerializeMap;
327 let mut map = s.serialize_map(Some(8))?;
328 // Emit the workflow output under "value" using the clean wire
329 // form (Value::to_wire_json) so downstream consumers see the
330 // same shape they always did, just one level nested.
331 map.serialize_entry("value", &self.value.to_wire_json())?;
332 // Aggregate totals — flat siblings of `value`. Matches issue
333 // #1173's wire shape exactly.
334 map.serialize_entry("total_input_tokens", &self.totals.total_input_tokens)?;
335 map.serialize_entry("total_output_tokens", &self.totals.total_output_tokens)?;
336 map.serialize_entry(
337 "total_cached_input_tokens",
338 &self.totals.total_cached_input_tokens,
339 )?;
340 map.serialize_entry("total_thinking_tokens", &self.totals.total_thinking_tokens)?;
341 map.serialize_entry("total_tool_tokens", &self.totals.total_tool_tokens)?;
342 map.serialize_entry("total_cost_usd", &self.totals.total_cost_usd)?;
343 map.serialize_entry("task_count", &self.totals.task_count)?;
344 map.end()
345 }
346}
347
348impl<'de> Deserialize<'de> for WorkflowEndPayload {
349 fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
350 // Read into a raw JSON value first so we can dispatch on shape.
351 let raw = serde_json::Value::deserialize(d)?;
352 // New shape disambiguator: an object that carries BOTH a
353 // `value` key AND at least one `total_*` aggregate key (or
354 // `task_count`). Anything else is the legacy bare-value form.
355 const AGG_KEYS: &[&str] = &[
356 "total_input_tokens",
357 "total_output_tokens",
358 "total_cached_input_tokens",
359 "total_thinking_tokens",
360 "total_tool_tokens",
361 "total_cost_usd",
362 "task_count",
363 ];
364 if let serde_json::Value::Object(map) = &raw {
365 let has_value = map.contains_key("value");
366 let has_any_agg = AGG_KEYS.iter().any(|k| map.contains_key(*k));
367 if has_value && has_any_agg {
368 let value = Value::from_json(map.get("value").unwrap());
369 let totals = WorkflowTotals {
370 total_input_tokens: map
371 .get("total_input_tokens")
372 .and_then(|v| v.as_u64())
373 .unwrap_or(0),
374 total_output_tokens: map
375 .get("total_output_tokens")
376 .and_then(|v| v.as_u64())
377 .unwrap_or(0),
378 total_cached_input_tokens: map
379 .get("total_cached_input_tokens")
380 .and_then(|v| v.as_u64())
381 .unwrap_or(0),
382 total_thinking_tokens: map
383 .get("total_thinking_tokens")
384 .and_then(|v| v.as_u64())
385 .unwrap_or(0),
386 total_tool_tokens: map
387 .get("total_tool_tokens")
388 .and_then(|v| v.as_u64())
389 .unwrap_or(0),
390 total_cost_usd: map
391 .get("total_cost_usd")
392 .and_then(|v| v.as_f64())
393 .unwrap_or(0.0),
394 task_count: map
395 .get("task_count")
396 .and_then(|v| v.as_u64())
397 .map(|n| n as u32)
398 .unwrap_or(0),
399 };
400 return Ok(WorkflowEndPayload { value, totals });
401 }
402 }
403 // Legacy bare-value form: the payload IS the workflow output.
404 Ok(WorkflowEndPayload {
405 value: Value::from_json(&raw),
406 totals: WorkflowTotals::default(),
407 })
408 }
409}
410
411/// Wire-format twin of [`crate::validation::ValidationError`]. Owned +
412/// serializable; the `stage` discriminator is a string (`"parse"`,
413/// `"schema"`, `"custom:<rule>"`) so SDK consumers don't need to round-trip
414/// through the internal enum.
415///
416/// Produced by [`crate::validation::ValidationError::to_wire`]. The internal
417/// `SchemaCompile` stage is intentionally not representable here — those
418/// errors short-circuit to [`crate::value::Value::FatalError`] before any
419/// [`EngineEvent::Suspended`] would be emitted (they're engine bugs, not
420/// model bugs; author can't fix them by reviewing a payload). See the
421/// authoritative decision in
422/// `docs/superpowers/plans/2026-04-18-epa-feature-tracker.md`
423/// ("Wave-1 M1 ship notes + cross-cutting decisions (2026-04-18, round 3)").
424#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
425pub struct ValidationErrorWire {
426 pub stage: String,
427 pub message: String,
428 pub path: Option<String>,
429}
430
431/// Why the engine suspended execution at a checkpoint. This is the canonical
432/// wire-shape for the `trigger` discriminator on [`EngineEvent::Suspended`];
433/// Stream 4 (#149 + #156 approach C) populates the `AgentUnable` variant,
434/// and Stream 6 (this stream, #156 approach B) populates
435/// `ValidationExhausted`.
436///
437/// Serde-tagged with an internal `"kind"` discriminator so deserializers can
438/// match on a single field without peeking at shape. Each variant carries
439/// its payload inline — there is no sidecar `unable_payload` / `exhaustion`
440/// field on `Suspended` itself.
441///
442/// Spec: `docs/superpowers/specs/2026-04-18-epa-checkpoint-validation-design.md`.
443/// The wire shape is locked per the Wave-1 round-3 tracker decision
444/// ("Suspended wire shape (S4 ↔ S6) = S6's embedded-payload shape").
445#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
446#[serde(tag = "kind")]
447pub enum SuspendTrigger {
448 /// The DAG reached an explicit `checkpoint cp(...)` call site. This is
449 /// today's only behaviour; the variant carries no payload because the
450 /// checkpoint's own `expects:` schema fully describes what comes back
451 /// on resume.
452 DagPosition,
453 /// The task's `on_validation_exhausted:` property fired: all
454 /// validation retries consumed without producing a payload that passes
455 /// the parse→schema→custom pipeline. Studio / SDKs render the last
456 /// failing attempt + its errors so the human can correct in place.
457 ValidationExhausted {
458 task_name: String,
459 retry_count: u32,
460 last_attempt: String,
461 validation_errors: Vec<ValidationErrorWire>,
462 },
463 /// Reserved for Stream 4 (#149 + #156 approach C) — emitted when a
464 /// task with a `T | Unable` return type produces an `Unable` value and
465 /// the flow routes it to a checkpoint via `on unable <cp>`. Shape is
466 /// invariant across Stream 4's four `on unable` forms: the payload is
467 /// always the `Unable` record (reason/missing/category). Not produced
468 /// by the engine in Stream 6 — defined here because Stream 6 owns the
469 /// canonical `SuspendTrigger` type so Stream 4's engine-emit site is
470 /// pure code addition, no wire-shape change.
471 AgentUnable {
472 task_name: String,
473 unable: crate::value::UnableRecord,
474 },
475 /// Emitted when a task whose declared return type is a discriminated
476 /// union `A | B | ... | Unable` produces a non-Unable variant and the
477 /// flow routes that variant to a checkpoint via `on <Variant> <cp>`.
478 /// `variant` is the canonical record name (PascalCase, matches the
479 /// source identifier); `payload` is the parsed record (with the
480 /// `kind` discriminator stripped). Studio renders a generic
481 /// "agent returned variant <X>" badge on this trigger.
482 ///
483 /// `AgentUnable` remains a specialization for the `Unable` arm —
484 /// rather than `AgentVariant { variant: "Unable", ... }` — to
485 /// preserve the existing Studio rendering path from #157 (zero-day
486 /// compat).
487 AgentVariant {
488 task_name: String,
489 variant: String,
490 payload: serde_json::Value,
491 },
492}
493
494impl Default for SuspendTrigger {
495 fn default() -> Self {
496 // Every `Suspended` event had this shape before Stream 6 landed;
497 // `#[serde(default)]` on the field means old servers' payloads
498 // (which omit the field entirely) deserialize as `DagPosition`.
499 SuspendTrigger::DagPosition
500 }
501}
502
503/// Mid-loop checkpoint context attached to [`EngineEvent::Suspended`] when a
504/// suspension fires inside a `loop` block's per-turn dispatch (a skill task
505/// invoked as a loop tool whose `on_validation_exhausted` / `on unable`
506/// handler routes to a checkpoint).
507///
508/// This metadata travels alongside the regular `SuspendTrigger`: the trigger
509/// describes *why* the engine paused (DagPosition / ValidationExhausted /
510/// AgentUnable / AgentVariant), and `LoopSuspendContext` tells consumers
511/// *which loop turn* the suspension belongs to so resumption can be
512/// rendered in the loop's UI lane and so the spawn handler's persisted
513/// envelope carries enough context to reconstruct the loop's identity if the
514/// execution is later restarted (driver state versioning is a follow-up; the
515/// current cycle relies on the in-process await-point to hold the loop's
516/// stack).
517///
518/// Serialized as a flat object (`{loop_id, loop_name, turn}`) on the wire.
519/// `#[serde(default)]` on the field on [`EngineEvent::Suspended`] keeps
520/// wire-compat with older servers / SDKs that don't emit it (they
521/// deserialize as `None` — a non-loop suspension).
522#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
523pub struct LoopSuspendContext {
524 /// UUID v4 generated when the loop driver started; stable for the
525 /// lifetime of one `loop NAME(...)` invocation. Used by the spawn
526 /// handler / Studio to correlate `LoopStart` → `Suspended` → `Resumed`
527 /// → subsequent `LoopTurn` events into the same per-loop UI lane.
528 pub loop_id: String,
529 /// The declared `loop NAME(...)` identifier. Same value as
530 /// [`EngineEvent::LoopStart::name`] / [`EngineEvent::LoopTurn::name`].
531 pub loop_name: String,
532 /// 1-indexed turn the suspension occurred during. Matches the next
533 /// [`EngineEvent::LoopTurn::turn`] the engine will emit when the
534 /// suspension resumes and the turn settles.
535 pub turn: u32,
536}
537
538/// Discriminator for [`EngineEvent::TaskEnd`] that tells consumers *how* a
539/// task finished without having to introspect the `value` payload. Extracted
540/// in #206 (Stream 4 follow-up): before this, a caller had to inspect the
541/// `value` for a `Value::Unable` envelope to distinguish "the agent said I
542/// can't" from a well-typed successful return. Serde-tagged on a `"variant"`
543/// field so new arms ship without wire-shape churn.
544///
545/// `#[serde(other)]` on [`TaskEndVariant::Unknown`] preserves forward-compat:
546/// a future engine that adds e.g. `Partial` (#205) still deserializes on
547/// older SDKs — the unknown variant surfaces as `Unknown` and the stream
548/// keeps flowing. Do *not* match without a wildcard on this enum.
549#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
550#[serde(rename_all = "snake_case")]
551pub enum TaskEndVariant {
552 /// The task produced a well-typed value that passed every stage of the
553 /// parse → schema → custom validation pipeline. This is the pre-#206
554 /// default and the variant carried when `#[serde(default)]` fires on
555 /// payloads that omit the field entirely (older servers).
556 Success,
557 /// The task's declared return type was `T | Unable` and the agent
558 /// emitted a canonical `{"unable": {...}}` envelope. The `value` field
559 /// on [`EngineEvent::TaskEnd`] carries the full [`Value::Unable`] record
560 /// so consumers can render reason/missing/category without re-parsing.
561 Unable,
562 /// The task ended with a dispatch-level failure — provider error,
563 /// sandbox timeout, OOM kill, schema-validation budget exhausted,
564 /// or any other path where the `value` on [`EngineEvent::TaskEnd`]
565 /// is a [`Value::FatalError`]. Consumers grouping by task can use
566 /// this to render a failure UI without inspecting `value`. Emitted
567 /// from the `runtime` dispatch path; LLM tasks may also adopt it in
568 /// a follow-up. Older SDKs that don't know this variant will see it
569 /// as `Unknown` via `#[serde(other)]` and behave as today.
570 Failed,
571 /// Catch-all for future variants the SDK doesn't know yet. `#[serde(other)]`
572 /// routes unknown discriminants here so consumers never crash on a
573 /// newer engine — e.g. `Partial` lands in #205 and an older SDK will
574 /// see it as `Unknown` until its own upgrade.
575 #[serde(other)]
576 Unknown,
577}
578
579impl Default for TaskEndVariant {
580 fn default() -> Self {
581 // Wire-compat with pre-#206 payloads: the field is absent, and
582 // `#[serde(default)]` carries it to `Success`. Every existing
583 // `TaskEnd` event without the discriminator was, by definition, a
584 // success (the Unable path didn't exist yet).
585 TaskEndVariant::Success
586 }
587}
588
589/// `serde` adapters that project a [`Value`] (or a container of it) to and
590/// from the canonical wire form documented in
591/// `docs/src/content/docs/reference/engine-events.mdx`.
592///
593/// The default `Serialize` / `Deserialize` derive on [`Value`] emits the
594/// internal tagged-enum shape (`{"String":"hi"}`, `{"Object":{...}}`, …),
595/// which is fine for caching / hashing but leaks the engine's internal
596/// representation onto the wire. Every [`EngineEvent`] field that carries
597/// a workflow-visible value uses these adapters so SDK consumers see the
598/// clean form spec'd in the docs.
599///
600/// On the read path we reconstruct a [`Value`] from clean JSON via
601/// [`Value::from_json`] — this is shape-preserving (e.g. an `Object` round-
602/// trips, a number becomes `Value::Int` / `Value::Decimal`). Variants that
603/// the engine emits with semantic meaning beyond shape — `Unable`, `Union`,
604/// `FatalError` — are NOT reconstructed back from their wire envelopes on
605/// the deserialize path because no in-process consumer relies on it: the
606/// engine never reads its own emitted JSON back as a `Value`, the Rust SDK
607/// converts via `to_wire_json` again before exposing it, and the durable
608/// replay path branches on a different set of events (`LLMResponse`,
609/// `ToolCall*`, `SubScriptResult`, `CheckpointResolution`) that already
610/// store their payloads as `serde_json::Value`.
611mod value_wire {
612 use super::*;
613
614 pub(super) fn serialize<S: Serializer>(v: &Value, s: S) -> Result<S::Ok, S::Error> {
615 v.to_wire_json().serialize(s)
616 }
617
618 pub(super) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Value, D::Error> {
619 let j = serde_json::Value::deserialize(d)?;
620 Ok(Value::from_json(&j))
621 }
622}
623
624mod opt_value_wire {
625 use super::*;
626
627 pub(super) fn serialize<S: Serializer>(v: &Option<Value>, s: S) -> Result<S::Ok, S::Error> {
628 match v {
629 Some(val) => val.to_wire_json().serialize(s),
630 None => s.serialize_none(),
631 }
632 }
633
634 pub(super) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Option<Value>, D::Error> {
635 let j = Option::<serde_json::Value>::deserialize(d)?;
636 Ok(j.map(|v| Value::from_json(&v)))
637 }
638}
639
640mod value_map_wire {
641 use super::*;
642
643 pub(super) fn serialize<S: Serializer>(
644 m: &HashMap<String, Value>,
645 s: S,
646 ) -> Result<S::Ok, S::Error> {
647 use serde::ser::SerializeMap;
648 let mut map = s.serialize_map(Some(m.len()))?;
649 for (k, v) in m {
650 map.serialize_entry(k, &v.to_wire_json())?;
651 }
652 map.end()
653 }
654
655 pub(super) fn deserialize<'de, D: Deserializer<'de>>(
656 d: D,
657 ) -> Result<HashMap<String, Value>, D::Error> {
658 let raw = HashMap::<String, serde_json::Value>::deserialize(d)?;
659 Ok(raw.into_iter().map(|(k, v)| (k, Value::from_json(&v))).collect())
660 }
661}
662
663#[derive(Debug, Serialize, Deserialize, Clone)]
664#[serde(tag = "type", content = "payload")]
665pub enum EngineEvent {
666 Log(String),
667 /// Structured log line. Parallel to [`EngineEvent::Log`] but carries a
668 /// severity so consumers (Studio's trace panel, the bench `why_failed`
669 /// runner, the future eval-failure dashboard) can highlight WARNs and
670 /// ERRORs without string-sniffing.
671 ///
672 /// Added in the "why-did-it-fail" infra round so that
673 /// `tracing::warn!` calls in providers.rs / engine.rs ALSO show up in
674 /// the execution event stream — previously they only went to the
675 /// akribes-server stdout that nobody actively watches, which is how the
676 /// `max_tokens=4096` truncation hid for a week. Pre-existing
677 /// [`EngineEvent::Log`] is retained verbatim for wire compat with
678 /// older SDKs that match on the bare-string variant.
679 ///
680 /// `level` is a free-form short string (`"WARN"`, `"ERROR"`,
681 /// `"INFO"`, …) — kept as `String` rather than an enum so a newer
682 /// engine adding `"DEBUG"` doesn't crash an older SDK.
683 LogLevel {
684 level: String,
685 message: String,
686 },
687 StateUpdate(String, #[serde(with = "value_wire")] Value),
688 WorkflowStart(usize), // total tasks
689 TaskStart(String, Option<String>), // (name, on_error policy label)
690 TaskPrompt(String, String), // (task_name, rendered_prompt)
691 TaskEnd {
692 task: String,
693 on_error_label: Option<String>,
694 #[serde(with = "value_wire")]
695 value: Value,
696 /// The declared return type of the task, if any. `None` when the task
697 /// has no `-> Type` annotation (e.g. plain `str` tasks or untyped tasks).
698 value_type: Option<TypeRef>,
699 duration: std::time::Duration,
700 /// 1-indexed attempt count: `1` = first call succeeded, `2` = first
701 /// validation retry succeeded, etc. Resets on task-level `on_error`
702 /// retries (which are orthogonal to validation retries).
703 attempt: u8,
704 usage: Option<TokenUsage>,
705 /// How the task finished. Explicit discriminator so consumers don't
706 /// have to inspect `value` to distinguish Success from Unable.
707 /// `#[serde(default)]` keeps pre-#206 wire payloads deserialising
708 /// cleanly (they become [`TaskEndVariant::Success`]). Forward-compat
709 /// for later expansion (e.g. `Partial` in #205) is provided by
710 /// [`TaskEndVariant::Unknown`].
711 #[serde(default)]
712 variant: TaskEndVariant,
713 },
714 AgentOutput {
715 task_name: String,
716 agent_name: Option<String>,
717 task_id: String,
718 schema_type: Option<String>,
719 chunk: String,
720 },
721 /// Streaming extended-thinking / reasoning fragment from the LLM,
722 /// emitted alongside `AgentOutput` for providers that interleave
723 /// reasoning blocks into a streamed response (Anthropic extended
724 /// thinking, OpenAI o-series + GPT-5 reasoning, Gemini 2.5 thinking).
725 ///
726 /// Issue #1176. Pre-fix the streaming path collapsed every non-text
727 /// rig variant into an empty `AgentOutput` chunk; reasoning content
728 /// was simply lost, so Studio's "thinking" inline panel stayed dark
729 /// for streamed runs even when `reasoning_tokens` showed up on the
730 /// trailing usage. This variant surfaces the same content
731 /// non-streaming consumers see via the provider's message-level
732 /// reasoning block, but as it arrives.
733 ///
734 /// Fields mirror [`Self::AgentOutput`] so a consumer that wants to
735 /// render reasoning identically to text can switch on the variant
736 /// name only. The `chunk` is plain text — encrypted / redacted
737 /// reasoning blocks (Anthropic's opaque thinking signatures) are
738 /// dropped at the provider boundary rather than surfaced here, as
739 /// they have no plain-text equivalent.
740 AgentReasoning {
741 task_name: String,
742 agent_name: Option<String>,
743 task_id: String,
744 /// Mirrors [`Self::AgentOutput::schema_type`] — the declared
745 /// return type, when present. Reasoning is associated with the
746 /// task the engine is currently dispatching.
747 schema_type: Option<String>,
748 chunk: String,
749 },
750 /// Auto cache-breakpoint engine emitted a placement decision for
751 /// the upcoming dispatch. One event per Anthropic structured-output
752 /// call; absent for non-Anthropic providers and for any dispatch
753 /// where `EngineOptions::auto_cache_enabled` is `false`.
754 ///
755 /// Fields mirror `engine_cache::CachePlan` plus a human-readable
756 /// agent label. Captured by the rig path (and any in-process
757 /// observer) to track cache-hit rates across iterations.
758 ///
759 /// # All four marker slots are now reported
760 ///
761 /// Anthropic's 4-marker per-request budget covers four canonical
762 /// slots: `tools`, `system`, `user-msg #1`, `user-msg #2`. Issue
763 /// #472 item 1 brought the tools and system slots under engine
764 /// ownership; the two `*_marker_placed` boolean fields below
765 /// report the engine's decision for each slot. Pre-#472 payloads
766 /// reported only the user-message slots (`markers_placed` /
767 /// `markers_placed_at`); the new booleans default to `false` on
768 /// older wire payloads via `#[serde(default)]`, which preserves
769 /// the historical "engine reports user-message markers only" view
770 /// for legacy consumers reading the JSON.
771 ///
772 /// # Engine intent vs. wire-level cache footprint
773 ///
774 /// Every `*_marker_placed` field describes the engine's INTENT to
775 /// stamp `cache_control` on that block. Anthropic's response is
776 /// what determines actual `cache_creation_input_tokens` /
777 /// `cache_read_input_tokens`. Anthropic enforces a per-prefix
778 /// minimum cacheable size (1024 tokens for sonnet/haiku-class
779 /// models). When a `cache_control` marker sits at a prefix BELOW
780 /// that minimum (e.g. the system block by itself is ~30 tokens),
781 /// Anthropic extends the cache write forward to the next eligible
782 /// boundary. So a "system marker only" cold dispatch can still
783 /// report `cache_creation_input_tokens` ≈ entire prompt size.
784 /// This is Anthropic's documented behavior; the engine's plan is
785 /// correct, but the `*_marker_placed` fields describe INTENT, not
786 /// the resulting wire-level cache footprint.
787 ///
788 /// Set `AKRIBES_DEBUG_CACHE_BODY=1` on the process running the
789 /// dispatcher to print a per-call summary of which body sections
790 /// carry markers.
791 CachePlanned {
792 /// Agent name as declared in the source (`agent classifier`).
793 agent: String,
794 /// Number of segments the engine assembled for this dispatch.
795 /// Includes the static prefix (docstrings/rules/examples), one
796 /// segment per `{placeholder}` boundary in the body template,
797 /// and the trailing structured-output instruction.
798 n_segments: usize,
799 /// How many segments the engine considered "stable" — i.e.
800 /// either previously cached for this agent or marked stable by
801 /// the DAG-aware peek (referenced by an upcoming dispatch).
802 n_stable: usize,
803 /// Total character length of the longest stable run at the
804 /// HEAD of the segment list. `0` when no leading prefix was
805 /// stable.
806 longest_stable_prefix_len_chars: usize,
        /// Number of `cache_control` markers the engine placed on the
        /// user message. `0`, `1`, or `2` (Anthropic's user-message
        /// budget within the 4-marker per-request cap).
810 ///
811 /// Engine-level intent only; see the type-level docstring
812 /// above for the relationship to wire markers and Anthropic's
813 /// observed `cache_creation_input_tokens`.
814 markers_placed: usize,
815 /// Segment indices the engine stamped with a `cache_control`
816 /// marker, sorted ascending. Length always equals
817 /// `markers_placed`. Empty on the cold-cache / no-stable-prefix
818 /// paths. Captured for the DAG-aware integration tests so they
819 /// can assert the boundary picked by the placement algorithm
820 /// without scraping the outbound HTTP body.
821 ///
822 /// `#[serde(default)]` for backwards compat with payloads
823 /// emitted before the field existed.
824 #[serde(default)]
825 markers_placed_at: Vec<usize>,
        /// Provider name the dispatch is going to (`anthropic`,
        /// `openai`, `gemini`, ...). Lets SDK consumers route each
        /// `CachePlanned` event to a provider-specific renderer
        /// without scraping `agent` / model state. Issue #1019:
        /// before this field existed, every event was implicitly an
        /// Anthropic event, and the OpenAI / Gemini caching paths were
        /// invisible. Carries the lower-cased provider id so renderers
        /// match against `"anthropic"` / `"openai"` / `"gemini"`
        /// directly. Empty string in old wire payloads (the
        /// `#[serde(default)]` default).
        ///
        /// Note: the `tools_marker_placed` field declared after this one
        /// records whether the engine asked the provider to stamp
        /// `cache_control` on the `tools` block (issue #472 item 1).
        /// It is `true` on every Anthropic structured-output dispatch
        /// today, because the synthetic `return_result` tool is stable per
        /// task and always benefits from caching. The flag is reserved for
        /// future per-call unique-tool flows that may flip it to
        /// `false`.
843 /// default).
844 #[serde(default)]
845 provider: String,
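        /// Whether the engine asked the provider to stamp
        /// `cache_control` on the `tools` block (issue #472 item 1).
        /// `true` on every Anthropic structured-output dispatch today.
        ///
        /// `#[serde(default)]` for backwards compat with payloads
        /// emitted before the field existed. Pre-#472 payloads
        /// silently treat this as `false`, which under-reports the
        /// markers the engine actually requested. The field is the
        /// canonical truth from #472 onward.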
851 #[serde(default)]
852 tools_marker_placed: bool,
853 /// Whether the engine asked the provider to stamp
854 /// `cache_control` on the `system` block (issue #472
855 /// item 1). `true` on any dispatch with a non-empty agent
856 /// system prompt. Reserved for future one-shot
857 /// system-prompt flows (e.g. agent definitions that include
858 /// per-call data) that may flip it to `false`.
859 ///
860 /// `#[serde(default)]` for backwards compat — see
861 /// `tools_marker_placed`.
862 #[serde(default)]
863 system_marker_placed: bool,
864 },
865 Suspended {
866 checkpoint_name: String,
867 token: String,
868 prompt: String,
869 schema: serde_json::Value,
870 actor_hint: ActorHint,
871 timeout_secs: Option<u64>,
872 /// Why we suspended. Defaults to [`SuspendTrigger::DagPosition`] on
873 /// older wire payloads that omit the field (e.g. an older server
874 /// serializing against a pre-Stream-6 SDK, or vice versa). The
875 /// `#[serde(default)]` here is what guarantees backwards compat.
876 #[serde(default)]
877 trigger: SuspendTrigger,
878 /// `Some(ctx)` when the suspension fired inside a `loop` block's
879 /// per-turn dispatch — see [`LoopSuspendContext`] for the carried
880 /// fields. `None` for the existing top-level / DAG checkpoint
881 /// path, which is the wire shape every pre-loop-checkpoint SDK
882 /// emits. `#[serde(default, skip_serializing_if = "Option::is_none")]`
883 /// keeps both directions wire-compatible: old servers serializing
884 /// against new SDKs simply omit the field, and new servers emit it
885 /// only when there's a loop context to carry.
886 #[serde(default, skip_serializing_if = "Option::is_none")]
887 loop_context: Option<LoopSuspendContext>,
888 },
889 Resumed {
890 checkpoint_name: String,
891 token: String,
892 },
893 /// Terminal event of a workflow run.
894 ///
895 /// # Aggregate rollup (issue #1173)
896 ///
897 /// Pre-#1173 this was the tuple variant `WorkflowEnd(Value)` whose
898 /// payload was the bare workflow output. Consumers re-walked every
899 /// `TaskEnd` to compute totals; cluster dashboards and the CLI's
900 /// trailer line did this on every execution. The variant still has
901 /// a single payload slot — a [`WorkflowEndPayload`] — but the
902 /// payload itself now carries both the output value AND a
903 /// [`WorkflowTotals`] rollup so consumers can size the run without
904 /// touching the per-task stream.
905 ///
906 /// # Wire compatibility
907 ///
908 /// New wire shape:
909 /// `{"type": "WorkflowEnd", "payload": {"value": <output>, "total_input_tokens": N, ...}}`.
910 /// Legacy wire shape (pre-#1173):
911 /// `{"type": "WorkflowEnd", "payload": <output>}` — payload IS the
912 /// bare output value.
913 ///
914 /// [`WorkflowEndPayload`] implements `Serialize`/`Deserialize` by
915 /// hand so it accepts both shapes: new shape when the payload is a
916 /// JSON object with a `value` key plus at least one `total_*`
917 /// aggregate key, legacy bare-value otherwise. Aggregate fields
918 /// default to `0` on legacy reads; serialization always emits the
919 /// new shape.
920 WorkflowEnd(WorkflowEndPayload),
921 /// Structured failure surfaced to subscribers (SSE, WebSocket, OTel).
922 /// `message` and `kind` are kept for back-compat; `code`,
923 /// `user_message`, `retry_after_ms`, and `source` carry the richer
924 /// detail consumers should branch on. New construction goes through
925 /// [`EngineEvent::error_from_detail`] so all fields stay in sync.
926 Error {
927 message: String,
928 kind: ErrorKind,
929 #[serde(default = "default_error_code_other")]
930 code: ErrorCode,
931 #[serde(default)]
932 user_message: String,
933 #[serde(default, skip_serializing_if = "Option::is_none")]
934 retry_after_ms: Option<u64>,
935 #[serde(default, skip_serializing_if = "ErrorSource::is_empty")]
936 source: ErrorSource,
937 },
938 NodeStart(NodeId, Span),
939 NodeEnd {
940 node_id: NodeId,
941 span: Span,
942 target_var: Option<String>,
943 #[serde(with = "opt_value_wire")]
944 value: Option<Value>,
945 duration: std::time::Duration,
946 },
947 Breakpoint {
948 node_id: NodeId,
949 span: Span,
950 token: String,
951 #[serde(with = "value_map_wire")]
952 env_snapshot: std::collections::HashMap<String, Value>,
953 },
954 BreakpointResumed {
955 node_id: NodeId,
956 token: String,
957 },
958 ToolCallStart {
959 task_name: String,
960 tool_name: String,
961 server_name: String,
962 input: serde_json::Value,
963 /// LLM-issued `tool_use_id`. Empty string on pre-durable-execution
964 /// payloads (preserved by `#[serde(default)]`). Always present on
965 /// events written by v1+ engines; the cache lookup at the
966 /// `ToolCallEnd` site keys on this.
967 #[serde(default)]
968 tool_use_id: String,
969 },
970 ToolCallEnd {
971 task_name: String,
972 tool_name: String,
973 #[serde(default)]
974 tool_use_id: String,
975 output: serde_json::Value,
976 duration: std::time::Duration,
977 },
978 /// An MCP server's circuit breaker tripped open.
979 McpServerDegraded {
980 alias: String,
981 reason: String,
982 },
983 /// An MCP server recovered (circuit breaker closed again).
984 McpServerRecovered {
985 alias: String,
986 },
987 /// A destructive MCP tool invocation is awaiting operator approval.
988 /// Mirrors the checkpoint suspension protocol — the execution is
989 /// resumed by `POST /executions/{id}/resume` with
990 /// `{ approve: bool, args_override?: Value }` keyed on `token`.
991 ToolApprovalPending {
992 execution_id: Option<String>,
993 node_id: Option<u64>,
994 token: String,
995 tool_ref: String,
996 args: serde_json::Value,
997 },
998 /// Audit-trail companion to [`EngineEvent::ToolApprovalPending`]
999 /// (issue #857). Emitted on every resume path — both approval and
1000 /// rejection — so trace replay can reconstruct the decision and
1001 /// Studio's approval inbox can render a checkmark/X next to the
1002 /// resolved row. Without this event the only way to distinguish
1003 /// "approved" from "rejected" after the fact is to observe whether
1004 /// a subsequent `ToolCallStart` fired against the same token,
1005 /// which is fragile and loses the rejection reason.
1006 ToolApprovalResolved {
1007 /// Matches the `token` on the originating
1008 /// [`EngineEvent::ToolApprovalPending`].
1009 token: String,
1010 /// True on approve, false on reject.
1011 approved: bool,
1012 /// Operator-supplied argument override on approval, when the
1013 /// approver chose to edit the tool args before dispatch. None
1014 /// on rejection or on plain approve-as-proposed.
1015 #[serde(default, skip_serializing_if = "Option::is_none")]
1016 args_override: Option<serde_json::Value>,
1017 /// Optional reason string the approver attached to the decision.
1018 /// Surfaced by Studio's inbox; persisted for compliance audits.
1019 #[serde(default, skip_serializing_if = "Option::is_none")]
1020 reason: Option<String>,
1021 },
1022 /// Emitted when a tool that would normally have required approval
1023 /// was dispatched without prompting because a pre-configured policy
1024 /// (allowlist / read-only classification / project-level
1025 /// auto-approval) covered it. Operator-audit gap closer (issue
1026 /// #1110): without this event the only way to spot auto-approval
1027 /// is to compare the policy config against the trace stream
1028 /// out-of-band.
1029 ///
1030 /// `reason` is a short policy-driven discriminator (e.g.
1031 /// `"policy:read_only"`, `"policy:allowlisted"`,
1032 /// `"policy:internal"`) so SDK consumers can render a badge
1033 /// without re-classifying the tool themselves.
1034 ToolApprovalSkipped {
1035 execution_id: Option<String>,
1036 node_id: Option<u64>,
1037 tool_ref: String,
1038 reason: String,
1039 },
1040 /// Emitted during durable replay when the replay cache holds a
1041 /// `ToolCallStart` for a given `tool_use_id` but no matching
1042 /// `ToolCallEnd`. The tool MAY have fired once on a previous run before
1043 /// the server crashed mid-call, and we are about to re-fire it — which
1044 /// will cause a duplicate side effect for non-idempotent tools (e.g.
1045 /// `send_email`, `create_pr`).
1046 ///
1047 /// This previously emitted only a `tracing::warn!(target =
1048 /// "tool_replay_uncertain", ...)`, which is invisible to SDK / Studio
1049 /// consumers. The structured event lets the operator surface a
1050 /// "replay-uncertain tool" badge inline in the execution timeline and
1051 /// decide whether to acknowledge before the re-run continues (#872).
1052 ///
1053 /// `args` is the raw JSON-encoded tool input from the cached start
1054 /// event so the operator can compare against the impending re-fire's
1055 /// args. `#[serde(default)]` on the deserializer keeps wire-compat
1056 /// with pre-#872 events that omit the variant entirely (they decode
1057 /// through the SDK's `Other` catch-all).
1058 ToolReplayUncertain {
1059 execution_id: Option<String>,
1060 tool_use_id: String,
1061 tool_name: String,
1062 #[serde(default)]
1063 args: serde_json::Value,
1064 },
1065 VerificationStart {
1066 workflow_name: String,
1067 },
1068 VerificationResult {
1069 workflow_name: String,
1070 results: serde_json::Value,
1071 duration: std::time::Duration,
1072 },
1073 /// A structured-output task's response failed validation. Emitted in
1074 /// addition to the existing `Log` line so SDK consumers without the new
1075 /// event still render the human-readable summary, but tooling that knows
1076 /// about this variant can render the model's actual response, the
1077 /// schema-validator's structured error breakdown, and the provider's
1078 /// `stop_reason` (so e.g. a `max_tokens` truncation isn't misdiagnosed
1079 /// as "schema overflow" — see issue #320).
1080 ///
1081 /// Fields:
1082 /// * `task_name` — the task whose validation failed.
1083 /// * `attempt` — 1-indexed attempt number (1 = first call, 2 = first
1084 /// retry, …).
1085 /// * `model_response` — the raw text / JSON-serialized tool input the
1086 /// model emitted, exactly as the validator saw it. May be empty (`""`)
1087 /// or `"{}"` when the model truncated mid-output.
1088 ///
1089 /// **Issue #1139 — bounded.** Capped at
1090 /// [`VALIDATION_FAILURE_RESPONSE_CAP_BYTES`] bytes on emit. If the
1091 /// raw response was longer, `truncated` is `true` and `total_length`
1092 /// carries the original byte count so consumers can surface "model
1093 /// emitted 4 MB; first 64 KB shown" instead of silently logging a
1094 /// megabyte to `execution_events` three times in a retry loop.
1095 /// * `truncated` — `true` when `model_response` was truncated to fit
1096 /// the cap. `#[serde(default)]` keeps pre-#1139 wire payloads
1097 /// decoding cleanly (they decode as `false`).
1098 /// * `total_length` — original byte length of the model response
1099 /// before truncation. Equal to `model_response.len()` when
1100 /// `truncated` is `false`. `#[serde(default)]` for wire-compat.
1101 /// * `missing_fields` — JSON-pointer paths to required fields the schema
1102 /// validator flagged as absent.
1103 /// * `extra_fields` — paths to fields rejected by `additionalProperties:
1104 /// false`.
1105 /// * `type_errors` — human-readable type / value mismatches (e.g.
1106 /// `"expected string, got null at /name"`). Includes any non-missing,
1107 /// non-additional-property schema errors plus parse / custom-validator
1108 /// messages.
1109 /// * `stop_reason` — the provider's stop reason, when known. For
1110 /// Anthropic this is `"end_turn"` / `"max_tokens"` / `"tool_use"` etc.
1111 /// `None` when the upstream call didn't surface one.
1112 ValidationFailure {
1113 task_name: String,
1114 attempt: u32,
1115 model_response: String,
1116 #[serde(default)]
1117 truncated: bool,
1118 #[serde(default)]
1119 total_length: u64,
1120 missing_fields: Vec<String>,
1121 extra_fields: Vec<String>,
1122 type_errors: Vec<String>,
1123 stop_reason: Option<String>,
1124 },
1125 /// Envelope that wraps an event emitted by a sub-script invoked from a
1126 /// parent task via the `call("name", inputs={...})` script-composition
1127 /// primitive (roadmap item A).
1128 ///
1129 /// # Flat wire shape (issue #993)
1130 ///
1131 /// Before #993 the envelope nested a `Box<EngineEvent>` per call-stack
1132 /// level, so a depth-N chain produced a single event whose serialized
1133 /// size was `O(N)` (a 10-level fanout could push one SSE frame past a
1134 /// megabyte and choke reconnect logic in cluster cards). The new shape
1135 /// is flat: `child` is always the LEAF event the innermost sub-engine
1136 /// emitted (never another `SubScript`), and `parent_path` carries the
1137 /// outer ancestor chain by id.
1138 ///
1139 /// * `script_name` — the immediately-running sub-script (innermost
1140 /// frame). Same field semantics as the legacy wire shape.
1141 /// * `parent_task` — the variable name on the immediate-parent side
1142 /// that received the call's result.
1143 /// * `parent_node_id` / `attempt` — the immediate parent's
1144 /// call(...) node id + author-raise attempt counter (#845).
1145 /// * `parent_path` — ordered `[outermost, ..., immediate_parent]`;
1146 /// empty when the current sub-script is a direct child of the
1147 /// top-level workflow.
1148 /// * `child` — the leaf event the innermost sub-engine emitted.
1149 /// Boxed to keep the variant cheap on the stack. Field name kept
1150 /// for wire-compat with pre-#993 consumers; the meaning narrowed
1151 /// from "wrapped event (potentially another SubScript)" to "leaf
1152 /// event".
1153 ///
1154 /// Consumers that want the full call tree walk `parent_path` once
1155 /// and consume `child` as a leaf. The top-level reducer in each SDK
1156 /// is responsible for stitching frames back into a tree.
1157 ///
1158 /// # Back-compat read path
1159 ///
1160 /// Legacy emissions (pre-#993) put a nested `SubScript` inside
1161 /// `child`, with `parent_path` absent or empty. [`Self::flatten_subscript_chain`]
1162 /// is the canonical post-deserialize step that walks any nested
1163 /// SubScripts into `parent_path`, leaving the resulting envelope
1164 /// in the new flat form. The Rust SDK runs it on every inbound
1165 /// event before projecting to `WorkflowEvent`; the TS and Python
1166 /// SDKs flatten equivalently inside their reducers.
1167 SubScript {
1168 /// Script name of the innermost (currently emitting) sub-script.
1169 script_name: String,
1170 /// Parent-side variable name that received the immediate parent's
1171 /// call result.
1172 parent_task: String,
1173 /// Compiler-stable id of the immediate parent's call(...) node.
1174 /// Lets SDK consumers correlate retries of the same call site
1175 /// (issue #845). `#[serde(default)]` keeps pre-#845 wire payloads
1176 /// decoding cleanly.
1177 #[serde(default, skip_serializing_if = "Option::is_none")]
1178 parent_node_id: Option<u64>,
1179 /// 1-indexed attempt counter for author-raise retries at the
1180 /// same call site. See [`SubScriptFrame::attempt`] for the same
1181 /// semantics on ancestor frames.
1182 #[serde(default, skip_serializing_if = "Option::is_none")]
1183 attempt: Option<u8>,
1184 /// Ancestor chain from outermost to immediate parent. Empty
1185 /// when the emitting sub-script sits directly under the
1186 /// top-level workflow. Skipped from the wire when empty so a
1187 /// depth-1 envelope stays as compact as it was pre-#993, and
1188 /// `#[serde(default)]` keeps pre-#993 payloads (which omit the
1189 /// field) decoding cleanly.
1190 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1191 parent_path: Vec<SubScriptFrame>,
1192 /// The leaf event the innermost sub-engine emitted. New
1193 /// emissions guarantee this is never another `SubScript`; legacy
1194 /// payloads may still nest one — call [`EngineEvent::flatten_subscript_chain`]
1195 /// to normalize.
1196 child: Box<EngineEvent>,
1197 },
1198 /// A `loop` block began executing. Emitted exactly once per loop call,
1199 /// before any `LoopTurn`. `max_turns` is the resolved upper-bound budget
1200 /// (declared `max_turns:` if present, else the engine's
1201 /// `LOOP_MAX_TURNS_DEFAULT`). Additive event — older SDKs treat the
1202 /// JSON payload as an unknown variant and ignore it.
1203 LoopStart {
1204 name: String,
1205 max_turns: u32,
1206 },
1207 /// A single turn of a `loop` block settled (provider call returned and
1208 /// every `tool_use` block was dispatched). `turn` is 1-indexed.
1209 /// `tool_calls` is the names of the tools the model invoked this turn,
1210 /// in dispatch order — including the synthetic `state_get`,
1211 /// `state_update`, `return` and any user `skills:` entries.
1212 LoopTurn {
1213 name: String,
1214 turn: u32,
1215 tool_calls: Vec<String>,
1216 /// Per-turn token usage reported by the provider for the
1217 /// dispatch this turn produced. `None` on providers that don't
1218 /// surface per-call usage (mock provider) and on pre-#829
1219 /// wire payloads. Lets Studio's loop card show "turn 3 spent
1220 /// 4500 tokens" without walking the wrapped `LLMResponse`
1221 /// sub-tree of events.
1222 #[serde(default, skip_serializing_if = "Option::is_none")]
1223 usage: Option<TokenUsage>,
1224 },
1225 /// A `loop` block exited. `value` is the agent's submitted return value
1226 /// (from `return(...)`), the final state on natural `stop_when` exit
1227 /// without a return, or a `Value::FatalError` envelope when the loop
1228 /// exhausted its `max_turns` budget without ever calling `return`.
1229 /// `turn_count` is the number of turns actually executed (1-indexed).
1230 LoopEnd {
1231 name: String,
1232 turn_count: u32,
1233 #[serde(with = "value_wire")]
1234 value: Value,
1235 },
1236 /// Emitted when a compaction step runs — once per primitive
1237 /// activation. `provider_native: true` means Anthropic / OpenAI
1238 /// performed the compaction server-side; the engine surfaces the
1239 /// before/after counts from the response. `strategy` is the
1240 /// primitive name (`drop_thinking_blocks`, `drop_oldest_tool_results`,
1241 /// `summarize_to_state`, `provider_native`) or the user task name
1242 /// for a custom compactor task.
1243 ///
1244 /// `cache_ttl` is `Some("5m")` or `Some("1h")` on the Anthropic
1245 /// `provider_native` path (the engine pins `ttl: "1h"` via the
1246 /// `extended-cache-ttl-2025-04-11` beta header — see
1247 /// `providers.rs:1772`), `None` for every non-native compaction
1248 /// primitive (the engine-driven primitives don't write any cache
1249 /// block themselves). Downstream cost dashboards multiply
1250 /// cache-write tokens by the 1h-vs-5m rate (issue #1130); without
1251 /// this field the wire envelope leaves the TTL ambiguous.
1252 /// `#[serde(default, skip_serializing_if = "Option::is_none")]`
1253 /// keeps pre-#1130 wire payloads decodable as `cache_ttl = None`.
1254 ///
1255 /// See the compaction design at
1256 /// `docs/superpowers/specs/2026-05-12-compaction-design.md` for the
1257 /// "Observability + cost" contract; the chain emits one
1258 /// `ContextCompacted` per primitive activation and `ContextOverflow`
1259 /// at chain exhaustion.
1260 ContextCompacted {
1261 agent: String,
1262 loop_id: Option<String>,
1263 turn: Option<u32>,
1264 threshold_pct: Option<u8>,
1265 threshold_abs: Option<u32>,
1266 strategy: String,
1267 before_tokens: u32,
1268 after_tokens: u32,
1269 provider_native: bool,
1270 #[serde(default, skip_serializing_if = "Option::is_none")]
1271 cache_ttl: Option<String>,
1272 },
1273 /// Emitted when the compaction chain runs to exhaustion (or when
1274 /// `compaction: none` and the request would exceed the model context).
1275 /// Carries the chain log so users can diagnose which primitives ran
1276 /// before the engine gave up.
1277 ///
1278 /// `terminated_by_hard_error` distinguishes the user-authored
1279 /// `hard_error()` early-exit (issue #1056) from the implicit
1280 /// chain-exhaustion exit. SDKs can render a stronger "author
1281 /// intentionally bailed out at threshold X" message instead of the
1282 /// generic "all arms exhausted" copy.
1283 ///
1284 /// `#[serde(default)]` on the new field keeps pre-#1056 wire
1285 /// payloads decodable (they decode as `terminated_by_hard_error =
1286 /// false`, which matches their original semantics — every old
1287 /// emission was a chain-exhaustion exit).
1288 ContextOverflow {
1289 agent: String,
1290 attempted_strategies: Vec<String>,
1291 configured_cap_tokens: u32,
1292 model_context_window: u32,
1293 /// `true` when the chain hit a user-authored `hard_error()`
1294 /// step; `false` when the chain ran past every step without
1295 /// freeing enough budget. See the type-level docs for the
1296 /// distinction and the back-compat default.
1297 #[serde(default)]
1298 terminated_by_hard_error: bool,
1299 },
1300 /// Emitted by the engine when a task's result is served from
1301 /// [`crate::engine_persistent_cache::PersistentTaskCache`] instead of
1302 /// being dispatched to a provider. Lets trace inspectors, the bench
1303 /// UI, and the MCP show "stages 1-3 were free, stage 4 ran" without
1304 /// having to parse prompt-segment internals.
1305 ///
1306 /// `agent` is the agent name the task ran under (matches
1307 /// [`EngineEvent::AgentOutput::agent_name`] for the same task).
1308 /// `key_prefix` is the first 6 hex chars of the (u64) cache key so
1309 /// consumers can cluster repeated hits without persisting the full
1310 /// key. The prefix is informational only — collisions are harmless,
1311 /// the key itself is the source of truth.
1312 ///
1313 /// Emitted on the cache-hit branch of the engine's task dispatch
1314 /// loop, before the corresponding `TaskEnd` event for the same task.
1315 TaskCacheHit {
1316 agent: String,
1317 key_prefix: String,
1318 },
1319 /// LLM provider response captured for durable replay. Carries the full
1320 /// response (text + tool-use blocks + usage) keyed by `(node_id,
1321 /// call_index)`. See `crates/akribes-core/src/replay_cache.rs`.
1322 LLMResponse {
1323 node_id: String,
1324 call_index: u32,
1325 text: String,
1326 tool_calls: Vec<CachedToolCall>,
1327 usage: Option<TokenUsage>,
1328 },
1329 /// Emitted on the cache-hit branch of `call_provider_*` —
1330 /// distinguishes "this task's underlying LLM call was served from
1331 /// the replay cache" from "the task's full result was served from
1332 /// the persistent task cache" (the latter already has
1333 /// [`EngineEvent::TaskCacheHit`]). Issue #815: a reconnecting SSE
1334 /// client otherwise can't tell a cache-served task from a slow
1335 /// first-token, because the engine deliberately does not re-emit
1336 /// streaming AgentOutput chunks on replay.
1337 ///
1338 /// `node_id` + `call_index` mirror the keying of the underlying
1339 /// `LLMResponse` variant so consumers can correlate the hit with
1340 /// the cached response.
1341 LLMReplayCacheHit {
1342 node_id: String,
1343 call_index: u32,
1344 },
1345 /// A child execution row was just inserted at the parent's `call(...)`
1346 /// node. The parent's event log carries this *intent* event; the
1347 /// child's own log records its lifecycle independently.
1348 SubScriptSpawned {
1349 child_execution_id: String,
1350 parent_node_id: String,
1351 args: serde_json::Value,
1352 },
1353 /// Child execution finished and the parent observed its terminal
1354 /// state. Synthesised by the parent reading the child's terminal
1355 /// event; never written by the child itself.
1356 SubScriptResult {
1357 parent_node_id: String,
1358 child_execution_id: String,
1359 outcome: ChildOutcome,
1360 },
1361 /// A `Suspended` checkpoint resolved. Written when `POST
1362 /// /executions/:id/resume` lands a payload; replay re-derives the
1363 /// engine state from this event instead of re-suspending.
1364 CheckpointResolution {
1365 checkpoint_id: String,
1366 payload: serde_json::Value,
1367 },
1368 /// Emitted when the engine starts dispatching a `runtime` block (the
1369 /// container code-execution construct — see the
1370 /// "AI-driven container code execution" feature). One event per
1371 /// runtime call site, before any `RuntimeStdout`/`RuntimeStderr`
1372 /// chunks. `task_name` is the call-site identifier the engine uses
1373 /// to attribute the call (matches the wrapping `TaskStart`/`TaskEnd`
1374 /// pair's `task` field so SDK reducers that group by task keep
1375 /// working). `runtime_name` is the declared `runtime NAME(...)`
1376 /// identifier. `language` is the source-form keyword
1377 /// (`"python"` / `"bash"` / `"node"` / `"rust"` / `"java"`).
1378 RuntimeStart {
1379 task_name: String,
1380 runtime_name: String,
1381 language: String,
1382 },
1383 /// A chunk of stdout produced by the running sandbox. `chunk` is the
1384 /// raw byte slice decoded as UTF-8 (lossy). Multiple events fire in
1385 /// arrival order; SDK reducers concatenate to reconstruct the full
1386 /// stream.
1387 RuntimeStdout {
1388 task_name: String,
1389 chunk: String,
1390 },
1391 /// A chunk of stderr produced by the running sandbox. Mirrors
1392 /// [`EngineEvent::RuntimeStdout`] for the error stream.
1393 RuntimeStderr {
1394 task_name: String,
1395 chunk: String,
1396 },
1397 /// The runtime call finished successfully. `exit_code` is the
1398 /// container's process exit code (0 for clean exit; non-zero on
1399 /// crash / panic / explicit non-zero exit). `duration_ms` is the
1400 /// wall-clock time the sandbox reported between dispatch and exit
1401 /// — does not include the time the engine spent waiting on its own
1402 /// semaphore.
1403 RuntimeEnd {
1404 task_name: String,
1405 exit_code: i32,
1406 duration_ms: u64,
1407 },
1408 /// The runtime call failed before producing an exit code.
1409 /// `kind` is a stable wire-form discriminator:
1410 /// `"NotConfigured"` (no sandbox URL set),
1411 /// `"Timeout"` (execution exceeded the declared timeout),
1412 /// `"SandboxUnavailable"` (network / connect error to the sandbox),
1413 /// `"OomKilled"` (the container hit its memory cap),
1414 /// `"Internal"` (any other sandbox-side failure).
1415 /// `message` is human-readable detail forwarded from the sandbox's
1416 /// `error` SSE event (or synthesised by the engine for client-side
1417 /// errors like `NotConfigured`).
1418 RuntimeError {
1419 task_name: String,
1420 kind: String,
1421 message: String,
1422 },
1423}
1424
/// Maximum number of bytes of
/// [`EngineEvent::ValidationFailure::model_response`] kept when the event
/// is built (issue #1139): 64 KiB.
///
/// Large enough to show the validator's view of any reasonable model
/// output, small enough to keep `execution_events` rows bounded. Before the
/// cap, a 4 MB tool input on a three-retry validation loop grew the
/// persisted log by about 12 MB per task; with the cap the same loop stores
/// at most 3 × 64 KiB = 192 KiB of response text.
pub const VALIDATION_FAILURE_RESPONSE_CAP_BYTES: usize = 1 << 16;
1432
1433impl EngineEvent {
1434 /// Flatten any legacy nested [`EngineEvent::SubScript`] chain into the
1435 /// new `parent_path + child(leaf)` shape (issue #993).
1436 ///
1437 /// Pre-#993 emissions wrapped each call-stack level in its own
1438 /// `SubScript` envelope (`SubScript { child: Box<SubScript{ child: ... }> }`).
1439 /// The new shape carries the chain via `parent_path` and reserves
1440 /// `child` for the innermost leaf event. This method walks down
1441 /// `child` for as long as it is itself a `SubScript`, accumulating
1442 /// each frame into a growing `parent_path` (so the resulting list
1443 /// reads `[outermost_ancestor, …, immediate_parent_of_leaf]`), and
1444 /// finally sets `child` to the recovered leaf.
1445 ///
1446 /// Events that are not `SubScript` envelopes are returned unchanged.
1447 /// SDK consumers (Rust / TS / Python) call this on every inbound
1448 /// event so reducers see a uniform flat shape regardless of which
1449 /// engine version emitted the wire log.
1450 pub fn flatten_subscript_chain(self) -> Self {
1451 // We treat the current SubScript as a stack of `(script_name,
1452 // parent_task, parent_node_id, attempt)` frames anchored above
1453 // a leaf. Walk inward, accumulating frames; the OUTER frame at
1454 // each step is the immediate parent of the next inward step.
1455 let (script_name, parent_task, parent_node_id, attempt, outer_path, child) =
1456 match self {
1457 EngineEvent::SubScript {
1458 script_name,
1459 parent_task,
1460 parent_node_id,
1461 attempt,
1462 parent_path,
1463 child,
1464 } => (script_name, parent_task, parent_node_id, attempt, parent_path, child),
1465 other => return other,
1466 };
1467
1468 // Path order is `[outermost_ancestor, ..., immediate_parent_of_innermost]`.
1469 // `outer_path` is already in that order (legacy emissions have it empty).
1470 // The current top-level frame (cur_*) sits AFTER everything in outer_path
1471 // and is the immediate parent of whatever lives in `child`. As we walk
1472 // inward, cur_* moves down into `outer_path`'s suffix.
1473 let mut frames: Vec<SubScriptFrame> = outer_path;
1474 let mut cur_script = script_name;
1475 let mut cur_task = parent_task;
1476 let mut cur_node = parent_node_id;
1477 let mut cur_attempt = attempt;
1478 let mut cur_child: Box<EngineEvent> = child;
1479
1480 loop {
1481 match *cur_child {
1482 EngineEvent::SubScript {
1483 script_name: inner_script,
1484 parent_task: inner_task,
1485 parent_node_id: inner_node,
1486 attempt: inner_attempt,
1487 parent_path: inner_path,
1488 child: inner_child,
1489 } => {
1490 // Promote cur_* into the frame list — it now sits
1491 // ABOVE the inner frame in the ancestor chain.
1492 // Order: existing frames, then any frames the inner
1493 // envelope already carried in its own parent_path
1494 // (legacy emissions have this empty), then cur_*.
1495 frames.extend(inner_path);
1496 frames.push(SubScriptFrame {
1497 script_name: cur_script,
1498 parent_task: cur_task,
1499 parent_node_id: cur_node,
1500 attempt: cur_attempt,
1501 });
1502 cur_script = inner_script;
1503 cur_task = inner_task;
1504 cur_node = inner_node;
1505 cur_attempt = inner_attempt;
1506 cur_child = inner_child;
1507 }
1508 leaf => {
1509 return EngineEvent::SubScript {
1510 script_name: cur_script,
1511 parent_task: cur_task,
1512 parent_node_id: cur_node,
1513 attempt: cur_attempt,
1514 parent_path: frames,
1515 child: Box::new(leaf),
1516 };
1517 }
1518 }
1519 }
1520 }
1521
1522 /// Build a [`EngineEvent::ValidationFailure`] with `model_response`
1523 /// capped to [`VALIDATION_FAILURE_RESPONSE_CAP_BYTES`] (issue
1524 /// #1139). The original byte length is preserved on `total_length`
1525 /// and `truncated` is set when the cap fired so consumers can
1526 /// surface "first 64 KB shown; original was N bytes" without
1527 /// having to re-derive it. UTF-8 boundary is respected — the
1528 /// truncation slices at the closest char boundary below the cap.
1529 pub fn validation_failure(
1530 task_name: impl Into<String>,
1531 attempt: u32,
1532 model_response: String,
1533 missing_fields: Vec<String>,
1534 extra_fields: Vec<String>,
1535 type_errors: Vec<String>,
1536 stop_reason: Option<String>,
1537 ) -> Self {
1538 let total_length = model_response.len() as u64;
1539 let (response, truncated) = if model_response.len() > VALIDATION_FAILURE_RESPONSE_CAP_BYTES {
1540 // Walk down to the nearest char boundary so we don't
1541 // produce a fragment of a multi-byte UTF-8 sequence.
1542 let mut end = VALIDATION_FAILURE_RESPONSE_CAP_BYTES;
1543 while end > 0 && !model_response.is_char_boundary(end) {
1544 end -= 1;
1545 }
1546 (model_response[..end].to_string(), true)
1547 } else {
1548 (model_response, false)
1549 };
1550 EngineEvent::ValidationFailure {
1551 task_name: task_name.into(),
1552 attempt,
1553 model_response: response,
1554 truncated,
1555 total_length,
1556 missing_fields,
1557 extra_fields,
1558 type_errors,
1559 stop_reason,
1560 }
1561 }
1562
1563 /// Build an [`EngineEvent::Error`] from a fully-formed
1564 /// [`crate::error::ErrorDetail`]. Use this at every error-emission
1565 /// site so SDK / OTel / DB consumers all see the same structured
1566 /// fields.
1567 pub fn error(detail: crate::error::ErrorDetail) -> Self {
1568 EngineEvent::Error {
1569 message: detail.message,
1570 kind: detail.kind,
1571 code: detail.code,
1572 user_message: detail.user_message,
1573 retry_after_ms: detail.retry_after_ms,
1574 source: detail.source,
1575 }
1576 }
1577
1578 /// Quick constructor for sites that don't yet have a specific
1579 /// [`ErrorCode`]. Picks the closest "Other" code for the kind via
1580 /// [`crate::error::ErrorDetail::from_kind`].
1581 pub fn error_kind(kind: ErrorKind, message: impl Into<String>) -> Self {
1582 EngineEvent::error(crate::error::ErrorDetail::from_kind(kind, message))
1583 }
1584
1585 /// Quick constructor from a specific [`ErrorCode`].
1586 pub fn error_code(code: ErrorCode, message: impl Into<String>) -> Self {
1587 EngineEvent::error(crate::error::ErrorDetail::new(code, message))
1588 }
1589}
1590
1591#[cfg(test)]
1592mod tests {
1593 use super::*;
1594 use crate::ast::TypeRef;
1595 use crate::value::Value;
1596
1597 #[test]
1598 fn task_end_event_serializes_with_value_and_value_type_and_attempt() {
1599 let e = EngineEvent::TaskEnd {
1600 task: "t".into(),
1601 on_error_label: None,
1602 value: Value::String("x".into()),
1603 value_type: Some(TypeRef::primitive("int")),
1604 duration: std::time::Duration::from_millis(10),
1605 attempt: 2,
1606 usage: None,
1607 variant: TaskEndVariant::Success,
1608 };
1609 let s = serde_json::to_string(&e).unwrap();
1610 assert!(s.contains("\"value\""), "serialized event should contain 'value' key: {}", s);
1611 assert!(s.contains("\"value_type\""), "serialized event should contain 'value_type' key: {}", s);
1612 assert!(s.contains("\"attempt\":2"), "serialized event should contain 'attempt':2: {}", s);
1613 assert!(s.contains("\"variant\":\"success\""), "variant should round-trip as snake_case 'success': {}", s);
1614 }
1615
1616 #[test]
1617 fn task_end_value_emits_clean_wire_form_not_tagged_value() {
1618 // Spec: `docs/src/content/docs/reference/engine-events.mdx` — every
1619 // `Value`-carrying field on an `EngineEvent` serializes as clean
1620 // JSON (the form `Value::to_wire_json` produces), not the internal
1621 // tagged-enum form (`{"String": ...}`, `{"Object": {...}}`).
1622 let mut obj = std::collections::HashMap::new();
1623 obj.insert("exit_code".to_string(), Value::Int(0));
1624 obj.insert("stdout".to_string(), Value::String("hi\n".into()));
1625 let e = EngineEvent::TaskEnd {
1626 task: "run".into(),
1627 on_error_label: None,
1628 value: Value::Object(obj),
1629 value_type: None,
1630 duration: std::time::Duration::from_millis(10),
1631 attempt: 1,
1632 usage: None,
1633 variant: TaskEndVariant::Success,
1634 };
1635 let j: serde_json::Value = serde_json::from_str(&serde_json::to_string(&e).unwrap()).unwrap();
1636 let value = &j["payload"]["value"];
1637 assert_eq!(value["exit_code"], serde_json::json!(0));
1638 assert_eq!(value["stdout"], serde_json::json!("hi\n"));
1639 assert!(
1640 value.get("Object").is_none(),
1641 "wire form must not carry the tagged-enum 'Object' key: {value}",
1642 );
1643 assert!(
1644 value["exit_code"].get("Int").is_none(),
1645 "wire form must not carry the tagged-enum 'Int' key for scalars: {value}",
1646 );
1647 }
1648
1649 #[test]
1650 fn workflow_end_payload_emits_clean_wire_form_not_tagged_value() {
1651 // Issue #1173 reshape: `WorkflowEnd.payload` is now an object
1652 // with a `value` sub-field carrying the workflow's clean output
1653 // alongside the aggregate `total_*` rollup. The output under
1654 // `value` is still in the clean wire form (no `"Object"` /
1655 // `"Int"` tagged-enum keys) per the engine-events reference.
1656 let mut obj = std::collections::HashMap::new();
1657 obj.insert("exit_code".to_string(), Value::Int(0));
1658 obj.insert("duration_ms".to_string(), Value::Int(12));
1659 let e = EngineEvent::WorkflowEnd(WorkflowEndPayload::new(Value::Object(obj)));
1660 let j: serde_json::Value = serde_json::from_str(&serde_json::to_string(&e).unwrap()).unwrap();
1661 let payload = &j["payload"];
1662 // The workflow output is now nested under `payload.value`.
1663 let value = &payload["value"];
1664 assert_eq!(value["exit_code"], serde_json::json!(0));
1665 assert_eq!(value["duration_ms"], serde_json::json!(12));
1666 assert!(
1667 value.get("Object").is_none(),
1668 "wire form must not carry the tagged-enum 'Object' key: {value}",
1669 );
1670 // Aggregate rollup is present and defaults to zero on a
1671 // payload built without explicit totals.
1672 assert_eq!(payload["total_input_tokens"], serde_json::json!(0));
1673 assert_eq!(payload["total_output_tokens"], serde_json::json!(0));
1674 assert_eq!(payload["task_count"], serde_json::json!(0));
1675 }
1676
1677 #[test]
1678 fn workflow_end_value_deserialise_round_trip_keeps_clean_shape() {
1679 // A consumer that round-trips `EngineEvent` via serde (e.g. the
1680 // durable event log on the server) should see the same clean JSON
1681 // shape both directions. The inner `Value` is reconstructed via
1682 // `Value::from_json`, which preserves shape but not semantic
1683 // variants (`Object`/`Int`/`String`/`List`/`Null`/`Decimal`/`Bool`).
1684 let mut obj = std::collections::HashMap::new();
1685 obj.insert("exit_code".to_string(), Value::Int(0));
1686 obj.insert("stdout".to_string(), Value::String("hi".into()));
1687 let e = EngineEvent::WorkflowEnd(WorkflowEndPayload::new(Value::Object(obj)));
1688 let s = serde_json::to_string(&e).unwrap();
1689 let back: EngineEvent = serde_json::from_str(&s).unwrap();
1690 let s_again = serde_json::to_string(&back).unwrap();
1691 // Re-serialising the round-tripped event must produce identical JSON,
1692 // i.e. the wire form is its own fixed point.
1693 assert_eq!(s, s_again, "wire form should round-trip without drift");
1694 }
1695
1696 #[test]
1697 fn workflow_end_back_compat_legacy_bare_value_payload_parses() {
1698 // Issue #1173 back-compat: an event log captured against a
1699 // pre-#1173 server emits `payload` as the bare workflow value.
1700 // The custom `Deserialize` impl must accept that shape and
1701 // default the aggregate rollup to all-zero.
1702 let legacy = r#"{"type":"WorkflowEnd","payload":{"exit_code":0,"stdout":"hi"}}"#;
1703 let evt: EngineEvent = serde_json::from_str(legacy).unwrap();
1704 match evt {
1705 EngineEvent::WorkflowEnd(payload) => {
1706 // Output recovered from bare payload.
1707 assert!(matches!(payload.value, Value::Object(_)));
1708 // Totals default to zero on legacy reads.
1709 assert_eq!(payload.totals.total_input_tokens, 0);
1710 assert_eq!(payload.totals.total_output_tokens, 0);
1711 assert_eq!(payload.totals.task_count, 0);
1712 }
1713 other => panic!("expected WorkflowEnd, got {other:?}"),
1714 }
1715 }
1716
1717 #[test]
1718 fn workflow_end_scalar_legacy_payload_parses_as_bare_value() {
1719 // Edge case: a workflow that returns a scalar (e.g.
1720 // `return "hi"`) emits `payload: "hi"` on the legacy wire.
1721 // Make sure the deserializer doesn't try to interpret that
1722 // as the new struct shape.
1723 let legacy = r#"{"type":"WorkflowEnd","payload":"hi"}"#;
1724 let evt: EngineEvent = serde_json::from_str(legacy).unwrap();
1725 match evt {
1726 EngineEvent::WorkflowEnd(payload) => {
1727 assert!(matches!(payload.value, Value::String(ref s) if s == "hi"));
1728 }
1729 other => panic!("expected WorkflowEnd, got {other:?}"),
1730 }
1731 }
1732
1733 #[test]
1734 fn workflow_end_new_wire_shape_round_trips_with_totals() {
1735 // The new wire shape carries both `value` and `total_*` keys.
1736 // Round-trip serialize → deserialize → serialize must preserve
1737 // every field exactly.
1738 let totals = WorkflowTotals {
1739 total_input_tokens: 1234,
1740 total_output_tokens: 567,
1741 total_cached_input_tokens: 100,
1742 total_thinking_tokens: 25,
1743 total_tool_tokens: 0,
1744 total_cost_usd: 0.42,
1745 task_count: 3,
1746 };
1747 let mut obj = std::collections::HashMap::new();
1748 obj.insert("answer".to_string(), Value::Int(42));
1749 let payload = WorkflowEndPayload::with_totals(Value::Object(obj), totals.clone());
1750 let evt = EngineEvent::WorkflowEnd(payload);
1751 let s = serde_json::to_string(&evt).unwrap();
1752 let back: EngineEvent = serde_json::from_str(&s).unwrap();
1753 match back {
1754 EngineEvent::WorkflowEnd(p) => {
1755 assert_eq!(p.totals.total_input_tokens, 1234);
1756 assert_eq!(p.totals.total_output_tokens, 567);
1757 assert_eq!(p.totals.total_cached_input_tokens, 100);
1758 assert_eq!(p.totals.total_thinking_tokens, 25);
1759 assert_eq!(p.totals.task_count, 3);
1760 assert!((p.totals.total_cost_usd - 0.42).abs() < 1e-9);
1761 }
1762 other => panic!("expected WorkflowEnd, got {other:?}"),
1763 }
1764 }
1765
1766 #[test]
1767 fn workflow_totals_accumulate_folds_token_usage() {
1768 // Engine's `emit(TaskEnd)` path delegates to `accumulate`;
1769 // each call adds usage tokens and bumps `task_count` even
1770 // when usage is `None`.
1771 let mut tot = WorkflowTotals::default();
1772 tot.accumulate(Some(&TokenUsage {
1773 input_tokens: 100,
1774 output_tokens: 50,
1775 cached_input_tokens: 10,
1776 reasoning_tokens: 7,
1777 ..Default::default()
1778 }));
1779 tot.accumulate(Some(&TokenUsage {
1780 input_tokens: 200,
1781 output_tokens: 80,
1782 cached_input_tokens: 30,
1783 reasoning_tokens: 3,
1784 ..Default::default()
1785 }));
1786 tot.accumulate(None);
1787 assert_eq!(tot.total_input_tokens, 300);
1788 assert_eq!(tot.total_output_tokens, 130);
1789 assert_eq!(tot.total_cached_input_tokens, 40);
1790 assert_eq!(tot.total_thinking_tokens, 10);
1791 assert_eq!(tot.task_count, 3);
1792 }
1793
1794 #[test]
1795 fn sub_script_flatten_chain_collapses_legacy_nested_envelopes() {
1796 // Build the legacy 3-deep shape:
1797 // SubScript{ name=A, child= SubScript{ name=B, child= SubScript{ name=C, child=Log("hi") } } }
1798 // Expected after `flatten_subscript_chain`:
1799 // SubScript{
1800 // name=C, parent_path=[A, B], child=Log("hi")
1801 // }
1802 let leaf = EngineEvent::Log("hi".into());
1803 let depth3 = EngineEvent::SubScript {
1804 script_name: "C".into(),
1805 parent_task: "c_task".into(),
1806 parent_node_id: Some(3),
1807 attempt: None,
1808 parent_path: Vec::new(),
1809 child: Box::new(leaf),
1810 };
1811 let depth2 = EngineEvent::SubScript {
1812 script_name: "B".into(),
1813 parent_task: "b_task".into(),
1814 parent_node_id: Some(2),
1815 attempt: None,
1816 parent_path: Vec::new(),
1817 child: Box::new(depth3),
1818 };
1819 let legacy = EngineEvent::SubScript {
1820 script_name: "A".into(),
1821 parent_task: "a_task".into(),
1822 parent_node_id: Some(1),
1823 attempt: None,
1824 parent_path: Vec::new(),
1825 child: Box::new(depth2),
1826 };
1827
1828 let flat = legacy.flatten_subscript_chain();
1829 match flat {
1830 EngineEvent::SubScript {
1831 script_name,
1832 parent_task,
1833 parent_node_id,
1834 parent_path,
1835 child,
1836 ..
1837 } => {
1838 // Innermost frame surfaces on the envelope itself.
1839 assert_eq!(script_name, "C");
1840 assert_eq!(parent_task, "c_task");
1841 assert_eq!(parent_node_id, Some(3));
1842 // Ancestor chain: [outermost A, then B].
1843 assert_eq!(parent_path.len(), 2);
1844 assert_eq!(parent_path[0].script_name, "A");
1845 assert_eq!(parent_path[0].parent_task, "a_task");
1846 assert_eq!(parent_path[0].parent_node_id, Some(1));
1847 assert_eq!(parent_path[1].script_name, "B");
1848 assert_eq!(parent_path[1].parent_task, "b_task");
1849 assert_eq!(parent_path[1].parent_node_id, Some(2));
1850 // Leaf is the original Log event.
1851 assert!(matches!(*child, EngineEvent::Log(ref s) if s == "hi"));
1852 }
1853 other => panic!("expected SubScript, got {other:?}"),
1854 }
1855 }
1856
1857 #[test]
1858 fn sub_script_flatten_leaves_already_flat_envelopes_alone() {
1859 // A SubScript whose `child` is already a leaf event is the
1860 // canonical shape — no transformation needed.
1861 let evt = EngineEvent::SubScript {
1862 script_name: "child".into(),
1863 parent_task: "result".into(),
1864 parent_node_id: Some(7),
1865 attempt: Some(1),
1866 parent_path: Vec::new(),
1867 child: Box::new(EngineEvent::Log("hi".into())),
1868 };
1869 let flat = evt.flatten_subscript_chain();
1870 match flat {
1871 EngineEvent::SubScript { parent_path, child, .. } => {
1872 assert!(parent_path.is_empty());
1873 assert!(matches!(*child, EngineEvent::Log(_)));
1874 }
1875 other => panic!("expected SubScript, got {other:?}"),
1876 }
1877 }
1878
1879 #[test]
1880 fn sub_script_flatten_no_op_for_non_subscript_events() {
1881 // Non-SubScript events pass through `flatten_subscript_chain`
1882 // unchanged.
1883 let evt = EngineEvent::Log("hi".into());
1884 let flat = evt.flatten_subscript_chain();
1885 assert!(matches!(flat, EngineEvent::Log(ref s) if s == "hi"));
1886 }
1887
1888 #[test]
1889 fn task_end_event_value_type_none_serializes_null() {
1890 let e = EngineEvent::TaskEnd {
1891 task: "t".into(),
1892 on_error_label: None,
1893 value: Value::Null,
1894 value_type: None,
1895 duration: std::time::Duration::from_millis(5),
1896 attempt: 1,
1897 usage: None,
1898 variant: TaskEndVariant::Success,
1899 };
1900 let s = serde_json::to_string(&e).unwrap();
1901 assert!(s.contains("\"attempt\":1"), "attempt should be 1: {}", s);
1902 // value_type: None serializes as null
1903 assert!(s.contains("\"value_type\":null"), "value_type should be null: {}", s);
1904 }
1905
1906 #[test]
1907 fn task_end_event_without_variant_deserializes_as_success() {
1908 // Wire-compat guard: pre-#206 servers emit `TaskEnd` with no
1909 // `variant` field. `#[serde(default)]` must carry it to `Success`.
1910 //
1911 // The `value` field uses the clean wire form spec'd in
1912 // `docs/src/content/docs/reference/engine-events.mdx` — a bare JSON
1913 // value, not the engine's internal tagged `{"String": "x"}` shape.
1914 let json = r#"{
1915 "type": "TaskEnd",
1916 "payload": {
1917 "task": "t",
1918 "on_error_label": null,
1919 "value": "x",
1920 "value_type": null,
1921 "duration": {"secs": 0, "nanos": 10000000},
1922 "attempt": 1,
1923 "usage": null
1924 }
1925 }"#;
1926 let e: EngineEvent = serde_json::from_str(json).unwrap();
1927 match e {
1928 EngineEvent::TaskEnd { variant, .. } => {
1929 assert_eq!(variant, TaskEndVariant::Success);
1930 }
1931 _ => panic!("expected TaskEnd"),
1932 }
1933 }
1934
1935 #[test]
1936 fn task_end_event_with_unable_variant_roundtrips() {
1937 let e = EngineEvent::TaskEnd {
1938 task: "decompose".into(),
1939 on_error_label: None,
1940 value: Value::Unable(crate::value::UnableRecord {
1941 reason: "image too blurry".into(),
1942 missing: vec!["claim_text".into()],
1943 category: crate::value::UnableCategory::InputAmbiguous,
1944 }),
1945 value_type: None,
1946 duration: std::time::Duration::from_millis(10),
1947 attempt: 1,
1948 usage: None,
1949 variant: TaskEndVariant::Unable,
1950 };
1951 let s = serde_json::to_string(&e).unwrap();
1952 assert!(s.contains("\"variant\":\"unable\""), "{s}");
1953 let back: EngineEvent = serde_json::from_str(&s).unwrap();
1954 match back {
1955 EngineEvent::TaskEnd { variant, .. } => {
1956 assert_eq!(variant, TaskEndVariant::Unable);
1957 }
1958 _ => panic!("expected TaskEnd"),
1959 }
1960 }
1961
1962 #[test]
1963 fn task_end_variant_unknown_discriminator_deserializes_as_unknown() {
1964 // Forward-compat: a newer engine adds (say) `partial` for #205 and
1965 // an older SDK must not crash. `#[serde(other)]` routes the unknown
1966 // tag to `TaskEndVariant::Unknown`.
1967 let json = r#"{
1968 "type": "TaskEnd",
1969 "payload": {
1970 "task": "t",
1971 "on_error_label": null,
1972 "value": null,
1973 "value_type": null,
1974 "duration": {"secs": 0, "nanos": 0},
1975 "attempt": 1,
1976 "usage": null,
1977 "variant": "partial"
1978 }
1979 }"#;
1980 let e: EngineEvent = serde_json::from_str(json).unwrap();
1981 match e {
1982 EngineEvent::TaskEnd { variant, .. } => {
1983 assert_eq!(variant, TaskEndVariant::Unknown);
1984 }
1985 _ => panic!("expected TaskEnd"),
1986 }
1987 }
1988
1989 #[test]
1990 fn suspended_event_with_validation_exhausted_trigger_serializes() {
1991 let e = EngineEvent::Suspended {
1992 checkpoint_name: "human_review".into(),
1993 token: "tok".into(),
1994 prompt: "Please review".into(),
1995 schema: serde_json::json!({"type": "integer"}),
1996 actor_hint: ActorHint::Human,
1997 timeout_secs: Some(3600),
1998 trigger: SuspendTrigger::ValidationExhausted {
1999 task_name: "decompose_claims".into(),
2000 retry_count: 3,
2001 last_attempt: "{\"bad\": true}".into(),
2002 validation_errors: vec![ValidationErrorWire {
2003 stage: "schema".into(),
2004 message: "required property \"number\" missing".into(),
2005 path: Some("/0".into()),
2006 }],
2007 },
2008 loop_context: None,
2009 };
2010 let s = serde_json::to_string(&e).unwrap();
2011 assert!(s.contains("\"kind\":\"ValidationExhausted\""), "{s}");
2012 assert!(s.contains("\"retry_count\":3"), "{s}");
2013 assert!(s.contains("\"task_name\":\"decompose_claims\""), "{s}");
2014 assert!(s.contains("\"stage\":\"schema\""), "{s}");
2015 // `loop_context` is None on a top-level checkpoint; the field
2016 // skips serialization so older SDKs see the same wire shape.
2017 assert!(!s.contains("\"loop_context\""), "{s}");
2018 }
2019
2020 #[test]
2021 fn suspended_event_with_loop_context_roundtrips() {
2022 // Mid-loop suspension carries a `loop_context` envelope that
2023 // SDK consumers use to render the suspension in the loop's UI
2024 // lane and that the spawn handler persists alongside the event.
2025 let e = EngineEvent::Suspended {
2026 checkpoint_name: "review".into(),
2027 token: "tok".into(),
2028 prompt: "Triage skill failure".into(),
2029 schema: serde_json::json!({}),
2030 actor_hint: ActorHint::Human,
2031 timeout_secs: None,
2032 trigger: SuspendTrigger::AgentUnable {
2033 task_name: "summarize".into(),
2034 unable: crate::value::UnableRecord {
2035 reason: "input ambiguous".into(),
2036 missing: vec![],
2037 category: crate::value::UnableCategory::InputAmbiguous,
2038 },
2039 },
2040 loop_context: Some(LoopSuspendContext {
2041 loop_id: "11111111-2222-3333-4444-555555555555".into(),
2042 loop_name: "research".into(),
2043 turn: 2,
2044 }),
2045 };
2046 let s = serde_json::to_string(&e).unwrap();
2047 assert!(s.contains("\"loop_context\""), "{s}");
2048 assert!(s.contains("\"loop_name\":\"research\""), "{s}");
2049 assert!(s.contains("\"turn\":2"), "{s}");
2050 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2051 match back {
2052 EngineEvent::Suspended { loop_context: Some(ctx), .. } => {
2053 assert_eq!(ctx.loop_name, "research");
2054 assert_eq!(ctx.turn, 2);
2055 assert_eq!(ctx.loop_id, "11111111-2222-3333-4444-555555555555");
2056 }
2057 _ => panic!("expected Suspended with loop_context"),
2058 }
2059 }
2060
2061 #[test]
2062 fn suspended_event_without_trigger_deserializes_as_dag_position() {
2063 // Wire-compat guard: old servers / old SDKs omit `trigger`.
2064 // `#[serde(default)]` must carry the field to `DagPosition`.
2065 let json = r#"{
2066 "type": "Suspended",
2067 "payload": {
2068 "checkpoint_name": "cp",
2069 "token": "t",
2070 "prompt": "p",
2071 "schema": {},
2072 "actor_hint": "Human",
2073 "timeout_secs": null
2074 }
2075 }"#;
2076 let e: EngineEvent = serde_json::from_str(json).unwrap();
2077 match e {
2078 EngineEvent::Suspended { trigger, .. } => {
2079 assert!(matches!(trigger, SuspendTrigger::DagPosition));
2080 }
2081 _ => panic!("expected Suspended"),
2082 }
2083 }
2084
2085 #[test]
2086 fn suspended_event_with_dag_position_trigger_roundtrips() {
2087 let e = EngineEvent::Suspended {
2088 checkpoint_name: "cp".into(),
2089 token: "t".into(),
2090 prompt: "p".into(),
2091 schema: serde_json::json!({}),
2092 actor_hint: ActorHint::Human,
2093 timeout_secs: None,
2094 trigger: SuspendTrigger::DagPosition,
2095 loop_context: None,
2096 };
2097 let s = serde_json::to_string(&e).unwrap();
2098 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2099 match back {
2100 EngineEvent::Suspended { trigger, .. } => {
2101 assert!(matches!(trigger, SuspendTrigger::DagPosition));
2102 }
2103 _ => panic!("expected Suspended"),
2104 }
2105 }
2106
2107 #[test]
2108 fn suspend_trigger_agent_variant_roundtrips_with_payload() {
2109 // #226 M5: non-Unable arm of a discriminated union routed to a
2110 // checkpoint emits AgentVariant, carrying the variant name and
2111 // the parsed record as JSON. Studio renders a generic
2112 // "agent returned variant <X>" badge on this trigger.
2113 let trigger = SuspendTrigger::AgentVariant {
2114 task_name: "decompose".into(),
2115 variant: "ClaimErr".into(),
2116 payload: serde_json::json!({
2117 "message": "claim unsupported by evidence",
2118 "claim_id": "c-7",
2119 }),
2120 };
2121 let s = serde_json::to_string(&trigger).unwrap();
2122 assert!(s.contains("\"kind\":\"AgentVariant\""), "{s}");
2123 assert!(s.contains("\"variant\":\"ClaimErr\""), "{s}");
2124 let back: SuspendTrigger = serde_json::from_str(&s).unwrap();
2125 match back {
2126 SuspendTrigger::AgentVariant { task_name, variant, payload } => {
2127 assert_eq!(task_name, "decompose");
2128 assert_eq!(variant, "ClaimErr");
2129 assert_eq!(
2130 payload["message"].as_str(),
2131 Some("claim unsupported by evidence"),
2132 );
2133 }
2134 other => panic!("expected AgentVariant, got {other:?}"),
2135 }
2136 }
2137
2138 #[test]
2139 fn validation_failure_event_serializes_full_payload() {
2140 // #320: Studio + tooling consume the structured fields. Round-trip
2141 // ensures missing/extra/type breakdowns and the stop_reason carry
2142 // across the wire without losing shape.
2143 let e = EngineEvent::ValidationFailure {
2144 task_name: "classify_features".into(),
2145 attempt: 2,
2146 model_response: "{}".into(),
2147 truncated: false,
2148 total_length: 2,
2149 missing_fields: vec!["/classifications".into()],
2150 extra_fields: vec![],
2151 type_errors: vec![
2152 "expected string, got null at /summary".into(),
2153 ],
2154 stop_reason: Some("max_tokens".into()),
2155 };
2156 let s = serde_json::to_string(&e).unwrap();
2157 assert!(s.contains("\"type\":\"ValidationFailure\""), "{s}");
2158 assert!(s.contains("\"task_name\":\"classify_features\""), "{s}");
2159 assert!(s.contains("\"attempt\":2"), "{s}");
2160 assert!(s.contains("\"model_response\":\"{}\""), "{s}");
2161 assert!(s.contains("\"/classifications\""), "{s}");
2162 assert!(s.contains("\"stop_reason\":\"max_tokens\""), "{s}");
2163
2164 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2165 match back {
2166 EngineEvent::ValidationFailure {
2167 task_name,
2168 attempt,
2169 model_response,
2170 missing_fields,
2171 extra_fields,
2172 type_errors,
2173 stop_reason,
2174 ..
2175 } => {
2176 assert_eq!(task_name, "classify_features");
2177 assert_eq!(attempt, 2);
2178 assert_eq!(model_response, "{}");
2179 assert_eq!(missing_fields, vec!["/classifications"]);
2180 assert!(extra_fields.is_empty());
2181 assert_eq!(type_errors.len(), 1);
2182 assert_eq!(stop_reason.as_deref(), Some("max_tokens"));
2183 }
2184 other => panic!("expected ValidationFailure, got {other:?}"),
2185 }
2186 }
2187
2188 #[test]
2189 fn suspend_trigger_agent_unable_roundtrips_with_payload() {
2190 // Reserved variant — Stream 4 populates this in a follow-up. The
2191 // shape is locked here so Stream 4's engine-emit site is pure code
2192 // addition, no wire-shape churn.
2193 let trigger = SuspendTrigger::AgentUnable {
2194 task_name: "escalate".into(),
2195 unable: crate::value::UnableRecord {
2196 reason: "image too blurry to OCR".into(),
2197 missing: vec!["claim_text".into()],
2198 category: crate::value::UnableCategory::InputAmbiguous,
2199 },
2200 };
2201 let s = serde_json::to_string(&trigger).unwrap();
2202 assert!(s.contains("\"kind\":\"AgentUnable\""), "{s}");
2203 let back: SuspendTrigger = serde_json::from_str(&s).unwrap();
2204 match back {
2205 SuspendTrigger::AgentUnable { task_name, unable } => {
2206 assert_eq!(task_name, "escalate");
2207 assert_eq!(unable.category, crate::value::UnableCategory::InputAmbiguous);
2208 }
2209 other => panic!("expected AgentUnable, got {other:?}"),
2210 }
2211 }
2212
2213 #[test]
2214 fn error_event_with_code_serializes_with_code_field() {
2215 let e = EngineEvent::error(crate::error::ErrorDetail::new(
2216 crate::error::ErrorCode::ScriptDepthExceeded,
2217 "boom",
2218 ));
2219 let s = serde_json::to_string(&e).unwrap();
2220 assert!(s.contains("\"code\":\"ScriptDepthExceeded\""), "{s}");
2221 }
2222
2223 #[test]
2224 fn error_event_default_code_is_other_for_kind_only_construction() {
2225 // `error_kind` derives the code from the kind via
2226 // `ErrorDetail::from_kind`. ScriptError → ScriptError code.
2227 let e = EngineEvent::error_kind(crate::error::ErrorKind::ScriptError, "plain");
2228 let s = serde_json::to_string(&e).unwrap();
2229 assert!(s.contains("\"code\":\"ScriptError\""), "{s}");
2230 }
2231
2232 #[test]
2233 fn context_compacted_event_roundtrips_full_payload() {
2234 let e = EngineEvent::ContextCompacted {
2235 agent: "researcher".into(),
2236 loop_id: Some("11111111-2222-3333-4444-555555555555".into()),
2237 turn: Some(3),
2238 threshold_pct: Some(70),
2239 threshold_abs: Some(140_000),
2240 strategy: "drop_thinking_blocks".into(),
2241 before_tokens: 142_000,
2242 after_tokens: 96_000,
2243 provider_native: false,
2244 cache_ttl: None,
2245 };
2246 let s = serde_json::to_string(&e).unwrap();
2247 assert!(s.contains("\"type\":\"ContextCompacted\""), "{s}");
2248 assert!(s.contains("\"agent\":\"researcher\""), "{s}");
2249 assert!(s.contains("\"strategy\":\"drop_thinking_blocks\""), "{s}");
2250 assert!(s.contains("\"before_tokens\":142000"), "{s}");
2251 assert!(s.contains("\"after_tokens\":96000"), "{s}");
2252 assert!(s.contains("\"provider_native\":false"), "{s}");
2253 assert!(s.contains("\"turn\":3"), "{s}");
2254
2255 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2256 match back {
2257 EngineEvent::ContextCompacted {
2258 agent,
2259 loop_id,
2260 turn,
2261 threshold_pct,
2262 threshold_abs,
2263 strategy,
2264 before_tokens,
2265 after_tokens,
2266 provider_native,
2267 cache_ttl: _,
2268 } => {
2269 assert_eq!(agent, "researcher");
2270 assert_eq!(loop_id.as_deref(), Some("11111111-2222-3333-4444-555555555555"));
2271 assert_eq!(turn, Some(3));
2272 assert_eq!(threshold_pct, Some(70));
2273 assert_eq!(threshold_abs, Some(140_000));
2274 assert_eq!(strategy, "drop_thinking_blocks");
2275 assert_eq!(before_tokens, 142_000);
2276 assert_eq!(after_tokens, 96_000);
2277 assert!(!provider_native);
2278 }
2279 other => panic!("expected ContextCompacted, got {other:?}"),
2280 }
2281 }
2282
2283 #[test]
2284 fn context_compacted_event_provider_native_roundtrips() {
2285 // Anthropic / OpenAI server-side compaction surfaces with
2286 // provider_native = true and the loop/turn context is absent
2287 // (compaction happens inside a provider call, not at a loop
2288 // boundary).
2289 let e = EngineEvent::ContextCompacted {
2290 agent: "summarizer".into(),
2291 loop_id: None,
2292 turn: None,
2293 threshold_pct: None,
2294 threshold_abs: None,
2295 strategy: "provider_native".into(),
2296 before_tokens: 180_000,
2297 after_tokens: 42_000,
2298 provider_native: true,
2299 cache_ttl: Some("1h".to_string()),
2300 };
2301 let s = serde_json::to_string(&e).unwrap();
2302 assert!(s.contains("\"provider_native\":true"), "{s}");
2303 assert!(s.contains("\"strategy\":\"provider_native\""), "{s}");
2304 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2305 match back {
2306 EngineEvent::ContextCompacted { provider_native, loop_id, turn, .. } => {
2307 assert!(provider_native);
2308 assert!(loop_id.is_none());
2309 assert!(turn.is_none());
2310 }
2311 other => panic!("expected ContextCompacted, got {other:?}"),
2312 }
2313 }
2314
2315 #[test]
2316 fn context_overflow_event_roundtrips_full_payload() {
2317 let e = EngineEvent::ContextOverflow {
2318 agent: "researcher".into(),
2319 attempted_strategies: vec![
2320 "drop_thinking_blocks".into(),
2321 "drop_oldest_tool_results".into(),
2322 "summarize_to_state".into(),
2323 ],
2324 configured_cap_tokens: 200_000,
2325 model_context_window: 200_000,
2326 terminated_by_hard_error: false,
2327 };
2328 let s = serde_json::to_string(&e).unwrap();
2329 assert!(s.contains("\"type\":\"ContextOverflow\""), "{s}");
2330 assert!(s.contains("\"agent\":\"researcher\""), "{s}");
2331 assert!(s.contains("\"configured_cap_tokens\":200000"), "{s}");
2332 assert!(s.contains("\"model_context_window\":200000"), "{s}");
2333 assert!(s.contains("\"drop_thinking_blocks\""), "{s}");
2334
2335 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2336 match back {
2337 EngineEvent::ContextOverflow {
2338 agent,
2339 attempted_strategies,
2340 configured_cap_tokens,
2341 model_context_window,
2342 terminated_by_hard_error,
2343 } => {
2344 assert_eq!(agent, "researcher");
2345 assert_eq!(attempted_strategies.len(), 3);
2346 assert_eq!(attempted_strategies[0], "drop_thinking_blocks");
2347 assert_eq!(configured_cap_tokens, 200_000);
2348 assert_eq!(model_context_window, 200_000);
2349 assert!(!terminated_by_hard_error);
2350 }
2351 other => panic!("expected ContextOverflow, got {other:?}"),
2352 }
2353 }
2354
2355 #[test]
2356 fn runtime_start_serializes_with_expected_fields() {
2357 let ev = EngineEvent::RuntimeStart {
2358 task_name: "analyze".into(),
2359 runtime_name: "run_python".into(),
2360 language: "python".into(),
2361 };
2362 let j = serde_json::to_value(&ev).unwrap();
2363 assert_eq!(j["type"], "RuntimeStart");
2364 assert_eq!(j["payload"]["task_name"], "analyze");
2365 assert_eq!(j["payload"]["runtime_name"], "run_python");
2366 assert_eq!(j["payload"]["language"], "python");
2367
2368 let s = serde_json::to_string(&ev).unwrap();
2369 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2370 match back {
2371 EngineEvent::RuntimeStart { task_name, runtime_name, language } => {
2372 assert_eq!(task_name, "analyze");
2373 assert_eq!(runtime_name, "run_python");
2374 assert_eq!(language, "python");
2375 }
2376 other => panic!("expected RuntimeStart, got {other:?}"),
2377 }
2378 }
2379
2380 #[test]
2381 fn runtime_stdout_stderr_roundtrip() {
2382 let stdout = EngineEvent::RuntimeStdout {
2383 task_name: "t".into(),
2384 chunk: "hello\n".into(),
2385 };
2386 let stderr = EngineEvent::RuntimeStderr {
2387 task_name: "t".into(),
2388 chunk: "warn\n".into(),
2389 };
2390 let j_out = serde_json::to_value(&stdout).unwrap();
2391 let j_err = serde_json::to_value(&stderr).unwrap();
2392 assert_eq!(j_out["type"], "RuntimeStdout");
2393 assert_eq!(j_err["type"], "RuntimeStderr");
2394 assert_eq!(j_out["payload"]["chunk"], "hello\n");
2395 assert_eq!(j_err["payload"]["chunk"], "warn\n");
2396 }
2397
2398 #[test]
2399 fn runtime_end_serializes_with_exit_code_and_duration() {
2400 let ev = EngineEvent::RuntimeEnd {
2401 task_name: "analyze".into(),
2402 exit_code: 0,
2403 duration_ms: 1234,
2404 };
2405 let j = serde_json::to_value(&ev).unwrap();
2406 assert_eq!(j["type"], "RuntimeEnd");
2407 assert_eq!(j["payload"]["exit_code"], 0);
2408 assert_eq!(j["payload"]["duration_ms"], 1234);
2409
2410 let back: EngineEvent = serde_json::from_value(j).unwrap();
2411 match back {
2412 EngineEvent::RuntimeEnd { exit_code, duration_ms, .. } => {
2413 assert_eq!(exit_code, 0);
2414 assert_eq!(duration_ms, 1234);
2415 }
2416 other => panic!("expected RuntimeEnd, got {other:?}"),
2417 }
2418 }
2419
2420 #[test]
2421 fn runtime_error_serializes_with_kind_and_message() {
2422 let ev = EngineEvent::RuntimeError {
2423 task_name: "t".into(),
2424 kind: "Timeout".into(),
2425 message: "exceeded 30s".into(),
2426 };
2427 let j = serde_json::to_value(&ev).unwrap();
2428 assert_eq!(j["type"], "RuntimeError");
2429 assert_eq!(j["payload"]["kind"], "Timeout");
2430 assert_eq!(j["payload"]["message"], "exceeded 30s");
2431 }
2432
2433 #[test]
2434 fn task_cache_hit_serializes_with_agent_and_key_prefix() {
2435 // P3: when `PersistentTaskCache::get` returns a hit, the engine
2436 // emits this event so trace inspectors / Studio / MCP can show
2437 // "stage N was cached" without parsing prompt-segment internals.
2438 let ev = EngineEvent::TaskCacheHit {
2439 agent: "Researcher".into(),
2440 key_prefix: "f7d3a9".into(),
2441 };
2442 let j = serde_json::to_value(&ev).unwrap();
2443 assert_eq!(j["type"], "TaskCacheHit");
2444 assert_eq!(j["payload"]["agent"], "Researcher");
2445 assert_eq!(j["payload"]["key_prefix"], "f7d3a9");
2446
2447 // Round-trip
2448 let s = serde_json::to_string(&ev).unwrap();
2449 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2450 match back {
2451 EngineEvent::TaskCacheHit { agent, key_prefix } => {
2452 assert_eq!(agent, "Researcher");
2453 assert_eq!(key_prefix, "f7d3a9");
2454 }
2455 other => panic!("expected TaskCacheHit, got {other:?}"),
2456 }
2457 }
2458
2459 #[test]
2460 fn validation_failure_caps_oversized_response() {
2461 // Issue #1139: emit-time cap prevents megabyte tool outputs
2462 // from bloating execution_events. A response larger than the
2463 // cap is truncated, `truncated=true`, and `total_length`
2464 // preserves the original byte count.
2465 let huge = "x".repeat(VALIDATION_FAILURE_RESPONSE_CAP_BYTES + 1024);
2466 let ev = EngineEvent::validation_failure(
2467 "t".to_string(),
2468 1,
2469 huge.clone(),
2470 vec![],
2471 vec![],
2472 vec![],
2473 None,
2474 );
2475 match ev {
2476 EngineEvent::ValidationFailure {
2477 model_response,
2478 truncated,
2479 total_length,
2480 ..
2481 } => {
2482 assert!(truncated, "should be truncated");
2483 assert_eq!(total_length, huge.len() as u64);
2484 assert!(model_response.len() <= VALIDATION_FAILURE_RESPONSE_CAP_BYTES);
2485 }
2486 other => panic!("expected ValidationFailure, got {other:?}"),
2487 }
2488 }
2489
2490 #[test]
2491 fn validation_failure_under_cap_is_unchanged() {
2492 let body = "{\"k\": \"v\"}".to_string();
2493 let ev = EngineEvent::validation_failure(
2494 "t".to_string(),
2495 1,
2496 body.clone(),
2497 vec![],
2498 vec![],
2499 vec![],
2500 None,
2501 );
2502 match ev {
2503 EngineEvent::ValidationFailure {
2504 model_response,
2505 truncated,
2506 total_length,
2507 ..
2508 } => {
2509 assert!(!truncated);
2510 assert_eq!(total_length, body.len() as u64);
2511 assert_eq!(model_response, body);
2512 }
2513 other => panic!("expected ValidationFailure, got {other:?}"),
2514 }
2515 }
2516
2517 #[test]
2518 fn tool_approval_resolved_round_trips() {
2519 // Issue #857: audit trail companion to ToolApprovalPending.
2520 let ev = EngineEvent::ToolApprovalResolved {
2521 token: "tok_abc".into(),
2522 approved: true,
2523 args_override: Some(serde_json::json!({"safe": true})),
2524 reason: Some("operator approved".into()),
2525 };
2526 let s = serde_json::to_string(&ev).unwrap();
2527 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2528 match back {
2529 EngineEvent::ToolApprovalResolved { token, approved, reason, .. } => {
2530 assert_eq!(token, "tok_abc");
2531 assert!(approved);
2532 assert_eq!(reason.as_deref(), Some("operator approved"));
2533 }
2534 other => panic!("expected ToolApprovalResolved, got {other:?}"),
2535 }
2536 }
2537
2538 #[test]
2539 fn tool_approval_skipped_round_trips() {
2540 // Issue #1110: auto-approval audit gap closer.
2541 let ev = EngineEvent::ToolApprovalSkipped {
2542 execution_id: Some("exec_1".into()),
2543 node_id: Some(7),
2544 tool_ref: "gh.list_issues".into(),
2545 reason: "policy:read_only".into(),
2546 };
2547 let s = serde_json::to_string(&ev).unwrap();
2548 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2549 match back {
2550 EngineEvent::ToolApprovalSkipped { tool_ref, reason, .. } => {
2551 assert_eq!(tool_ref, "gh.list_issues");
2552 assert_eq!(reason, "policy:read_only");
2553 }
2554 other => panic!("expected ToolApprovalSkipped, got {other:?}"),
2555 }
2556 }
2557
2558 #[test]
2559 fn tool_replay_uncertain_round_trips() {
2560 // Issue #872: structured event for durable-replay tool ambiguity.
2561 let args = serde_json::json!({"channel": "general", "text": "hi"});
2562 let ev = EngineEvent::ToolReplayUncertain {
2563 execution_id: Some("exec_42".into()),
2564 tool_use_id: "tu_abc".into(),
2565 tool_name: "send_message".into(),
2566 args: args.clone(),
2567 };
2568 let s = serde_json::to_string(&ev).unwrap();
2569 assert!(s.contains("\"type\":\"ToolReplayUncertain\""), "{s}");
2570 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2571 match back {
2572 EngineEvent::ToolReplayUncertain {
2573 execution_id,
2574 tool_use_id,
2575 tool_name,
2576 args: a,
2577 } => {
2578 assert_eq!(execution_id.as_deref(), Some("exec_42"));
2579 assert_eq!(tool_use_id, "tu_abc");
2580 assert_eq!(tool_name, "send_message");
2581 assert_eq!(a, args);
2582 }
2583 other => panic!("expected ToolReplayUncertain, got {other:?}"),
2584 }
2585 }
2586
2587 #[test]
2588 fn llm_replay_cache_hit_round_trips() {
2589 // Issue #815: explicit replay-cache-hit signal for SDK consumers.
2590 let ev = EngineEvent::LLMReplayCacheHit {
2591 node_id: "n42".into(),
2592 call_index: 3,
2593 };
2594 let s = serde_json::to_string(&ev).unwrap();
2595 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2596 match back {
2597 EngineEvent::LLMReplayCacheHit { node_id, call_index } => {
2598 assert_eq!(node_id, "n42");
2599 assert_eq!(call_index, 3);
2600 }
2601 other => panic!("expected LLMReplayCacheHit, got {other:?}"),
2602 }
2603 }
2604
2605 #[test]
2606 fn loop_turn_carries_usage_when_present_and_omits_when_none() {
2607 // Issue #829: per-turn token usage on LoopTurn lets consumers
2608 // skip walking the LLMResponse sub-tree for per-turn cost.
2609 let ev = EngineEvent::LoopTurn {
2610 name: "review".into(),
2611 turn: 4,
2612 tool_calls: vec!["fetch".into()],
2613 usage: Some(TokenUsage {
2614 input_tokens: 100,
2615 output_tokens: 25,
2616 model: "claude".into(),
2617 provider: "anthropic".into(),
2618 ..Default::default()
2619 }),
2620 };
2621 let s = serde_json::to_string(&ev).unwrap();
2622 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2623 match back {
2624 EngineEvent::LoopTurn { usage, .. } => {
2625 let u = usage.expect("usage present");
2626 assert_eq!(u.input_tokens, 100);
2627 assert_eq!(u.output_tokens, 25);
2628 }
2629 other => panic!("expected LoopTurn, got {other:?}"),
2630 }
2631 // `None` usage omits the field from the wire shape.
2632 let ev2 = EngineEvent::LoopTurn {
2633 name: "review".into(),
2634 turn: 1,
2635 tool_calls: vec![],
2636 usage: None,
2637 };
2638 let s2 = serde_json::to_string(&ev2).unwrap();
2639 assert!(!s2.contains("\"usage\""), "usage should be skipped when None: {s2}");
2640 }
2641
2642 #[test]
2643 fn sub_script_carries_parent_node_id_and_attempt() {
2644 // Issue #845: parent retry attribution on SubScript envelopes.
2645 let inner = EngineEvent::Log("hi".into());
2646 let ev = EngineEvent::SubScript {
2647 script_name: "child".into(),
2648 parent_task: "result".into(),
2649 parent_node_id: Some(13),
2650 attempt: Some(2),
2651 parent_path: Vec::new(),
2652 child: Box::new(inner),
2653 };
2654 let s = serde_json::to_string(&ev).unwrap();
2655 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2656 match back {
2657 EngineEvent::SubScript { parent_node_id, attempt, .. } => {
2658 assert_eq!(parent_node_id, Some(13));
2659 assert_eq!(attempt, Some(2));
2660 }
2661 other => panic!("expected SubScript, got {other:?}"),
2662 }
2663 }
2664
2665 #[test]
2666 fn sub_script_back_compat_omits_new_fields_when_none() {
2667 // The new fields default to None and are skipped from the
2668 // wire shape when unset — preserving wire compat for pre-#845
2669 // payloads.
2670 let inner = EngineEvent::Log("hi".into());
2671 let ev = EngineEvent::SubScript {
2672 script_name: "child".into(),
2673 parent_task: "result".into(),
2674 parent_node_id: None,
2675 attempt: None,
2676 parent_path: Vec::new(),
2677 child: Box::new(inner),
2678 };
2679 let s = serde_json::to_string(&ev).unwrap();
2680 assert!(!s.contains("\"parent_node_id\""), "should be omitted when None: {s}");
2681 assert!(!s.contains("\"attempt\""), "should be omitted when None: {s}");
2682 }
2683
2684 #[test]
2685 fn token_usage_raw_stop_reason_round_trips() {
2686 // Issue #1077: `raw_stop_reason` is a new field that mirrors
2687 // `stop_reason` when the parse path produced the usage. It must
2688 // serialize alongside the canonical field and round-trip with
2689 // serde-default for back-compat.
2690 let u = TokenUsage {
2691 input_tokens: 10,
2692 output_tokens: 5,
2693 model: "claude-sonnet-4-6".into(),
2694 provider: "anthropic".into(),
2695 stop_reason: Some("end_turn".into()),
2696 raw_stop_reason: Some("end_turn".into()),
2697 ..Default::default()
2698 };
2699 let s = serde_json::to_string(&u).unwrap();
2700 assert!(s.contains("\"raw_stop_reason\":\"end_turn\""), "{s}");
2701 let back: TokenUsage = serde_json::from_str(&s).unwrap();
2702 assert_eq!(back.raw_stop_reason.as_deref(), Some("end_turn"));
2703 assert_eq!(back.stop_reason.as_deref(), Some("end_turn"));
2704 }
2705
2706 #[test]
2707 fn token_usage_raw_stop_reason_back_compat_pre_field() {
2708 // A wire payload predating #1077 omits `raw_stop_reason` entirely.
2709 // It must decode as `None` rather than failing.
2710 let json = r#"{
2711 "input_tokens": 10,
2712 "output_tokens": 5,
2713 "model": "m",
2714 "provider": "p",
2715 "cached_input_tokens": 0,
2716 "stop_reason": "stop"
2717 }"#;
2718 let u: TokenUsage = serde_json::from_str(json).unwrap();
2719 assert_eq!(u.raw_stop_reason, None);
2720 assert_eq!(u.stop_reason.as_deref(), Some("stop"));
2721 }
2722}