akribes_types/event.rs
1use crate::ast::{ActorHint, Span, TypeRef};
2use crate::error::{ErrorCode, ErrorKind, ErrorSource};
3use crate::value::Value;
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use std::collections::HashMap;
6
/// Node-id type used by `EngineEvent::NodeStart` / `NodeEnd` /
/// `Breakpoint*`. Mirror of `akribes_core::compiler::NodeId` — same
/// underlying `usize` representation; the alias lives here so the
/// SDK-facing `EngineEvent` doesn't need to depend on the compiler
/// module.
///
/// NOTE(review): this is a plain type alias, not a newtype, so the
/// compiler cannot detect drift if the `akribes_core` definition ever
/// changes representation — the two must be kept in sync by hand.
pub type NodeId = usize;
13
/// Caches the LLM-emitted tool-use blocks so a replay rebuilds the same
/// `tool_use_id`s and the downstream `ToolCallEnd` lookups stay stable.
/// Mirror of `akribes_core::replay_cache::CachedToolCall` — same wire
/// shape; the type lives here so it can be embedded in
/// `EngineEvent::LLMResponse` without an akribes-core dependency.
///
/// All three fields are required on the wire (none carries
/// `#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedToolCall {
    /// Identifier of the tool-use block. Persisted verbatim so a replay
    /// reproduces the same id (see the type-level doc).
    pub tool_use_id: String,
    /// Tool name — presumably the tool the LLM asked to invoke; confirm
    /// against the `akribes_core` original.
    pub name: String,
    /// Tool-call arguments, kept as untyped JSON.
    pub args: serde_json::Value,
}
25
/// Outcome of a child execution observed by its parent at the
/// `call(...)` boundary. Mirror of
/// `akribes_core::replay_cache::ChildOutcome` — same wire shape.
///
/// Adjacently tagged: the variant name goes under `"kind"` and the
/// variant's fields under `"detail"`, e.g.
/// `{"kind": "Ok", "detail": {"value": ...}}` or
/// `{"kind": "Err", "detail": {"kind": ..., "message": ..., "code": ...}}`.
/// The `kind` field of `Err` therefore lives inside `"detail"` and does
/// not collide with the outer `"kind"` tag.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", content = "detail")]
pub enum ChildOutcome {
    /// The child completed; `value` is its result as untyped JSON.
    Ok {
        value: serde_json::Value,
    },
    /// The child failed. `kind` / `message` / `code` are free-form
    /// strings here — presumably stringified forms of the engine's
    /// error kind and error code; confirm against the core type.
    /// `code` has no `skip_serializing_if`, so `None` is written as
    /// `null`.
    Err {
        kind: String,
        message: String,
        code: Option<String>,
    },
}
41
/// Default for `EngineEvent::Error::code` on payloads from older SDK
/// versions that didn't include the field.
///
/// Always returns [`ErrorCode::Other`]. Presumably wired up through a
/// `#[serde(default = "default_error_code_other")]` attribute on that
/// field — the attribute itself is outside this part of the file, so
/// confirm before renaming or removing this function.
fn default_error_code_other() -> ErrorCode {
    ErrorCode::Other
}
47
/// Token usage from a single LLM call.
///
/// # Normalized superset convention
///
/// `input_tokens` is the **total** number of input tokens processed — it
/// is a superset of `cached_input_tokens` and `cache_write_input_tokens`.
/// Downstream cost logic derives the "fresh" (non-cached) portion by
/// subtracting the two cache counts. This matches OpenAI and Gemini's
/// native reporting; Anthropic's API reports the three groups as disjoint
/// so the Anthropic parser normalizes by summing before assigning.
///
/// # Prompt-caching semantics per provider
/// - **OpenAI** — `cached_input_tokens` counts cache READS (billed at a
///   discount, typically 0.1x base input). Cache writes are free per
///   OpenAI's caching docs. `cache_write_input_tokens` is always 0.
/// - **Anthropic** — the API returns three token groups:
///   `input_tokens` (fresh, 1x), `cache_read_input_tokens` (0.1x), and
///   `cache_creation_input_tokens` (1.25x at the default 5-minute TTL,
///   2.0x at 1-hour TTL). The parser remaps these to the superset
///   convention above. The per-TTL split is surfaced as
///   `cache_write_5m_input_tokens` and `cache_write_1h_input_tokens`
///   (parsed from `usage.cache_creation.ephemeral_5m_input_tokens` /
///   `ephemeral_1h_input_tokens`); these two fields sum to
///   `cache_write_input_tokens`. Akribes workflows opt into the 1h TTL
///   via the `extended-cache-ttl-2025-04-11` beta header, so this split
///   matters for cost accounting (#1091).
/// - **Gemini** — only cache reads are reported; writes are not
///   separately billed. `cache_write_input_tokens` is always 0.
///
/// # Wire compatibility
///
/// The first five fields (`input_tokens`, `output_tokens`, `model`,
/// `provider`, `cached_input_tokens`) carry no `#[serde(default)]` and
/// are therefore required when deserializing. Every later field is
/// marked `#[serde(default)]` so payloads that predate it still parse.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TokenUsage {
    /// Total input tokens processed (superset of the two cache counts).
    pub input_tokens: u64,
    /// Total output tokens for the call. [`Self::reasoning_tokens`] is a
    /// subset of this number, not an addition to it.
    pub output_tokens: u64,
    /// Model identifier for the call — presumably the provider-facing
    /// model name; TODO confirm the exact format against the parsers.
    pub model: String,
    /// Provider identifier for the call — presumably one of the
    /// providers discussed above (OpenAI / Anthropic / Gemini / mock);
    /// TODO confirm the exact spelling used on the wire.
    pub provider: String,
    /// Cache-READ tokens (billed at `CACHE_READ_RATE`, ~0.1x input).
    pub cached_input_tokens: u64,
    /// Cache-WRITE / creation tokens (Anthropic only today; billed at
    /// `CACHE_WRITE_RATE`, 1.25x input at 5m TTL or 2.0x at 1h TTL).
    /// This is the **total** across both TTL buckets; the breakdown
    /// lives on [`Self::cache_write_5m_input_tokens`] and
    /// [`Self::cache_write_1h_input_tokens`] (#1091). Marked
    /// `#[serde(default)]` for backward-compatibility with events
    /// predating this field (they deserialize as `0`).
    #[serde(default)]
    pub cache_write_input_tokens: u64,
    /// Anthropic cache-WRITE tokens at the default 5-minute TTL,
    /// parsed from `usage.cache_creation.ephemeral_5m_input_tokens`.
    /// Subset of [`Self::cache_write_input_tokens`] — sums with
    /// [`Self::cache_write_1h_input_tokens`] to the total. `0` on
    /// providers that don't report the per-TTL breakdown (OpenAI,
    /// Gemini, mock) and for pre-#1091 events that omit the field.
    #[serde(default)]
    pub cache_write_5m_input_tokens: u64,
    /// Anthropic cache-WRITE tokens at the 1-hour TTL, parsed from
    /// `usage.cache_creation.ephemeral_1h_input_tokens`. Subset of
    /// [`Self::cache_write_input_tokens`] — sums with
    /// [`Self::cache_write_5m_input_tokens`] to the total. `0` on
    /// providers without per-TTL reporting (OpenAI, Gemini, mock) and
    /// for pre-#1091 events that omit the field. The 1h-TTL bucket
    /// bills at 2.0x base input vs. 1.25x for 5m — `pricing::compute_cost`
    /// uses this split for accurate cost attribution (#1091).
    #[serde(default)]
    pub cache_write_1h_input_tokens: u64,
    /// The provider-reported stop reason for the underlying call, when
    /// known. Anthropic surfaces values like `"end_turn"`, `"max_tokens"`,
    /// `"tool_use"`, `"stop_sequence"`. OpenAI: `"stop"`, `"length"`,
    /// `"tool_calls"`. Gemini: `"STOP"`, `"MAX_TOKENS"`, etc.
    ///
    /// Carried alongside usage so the engine's validation-failure path can
    /// distinguish "model truncated mid-output" (`max_tokens` / `length` /
    /// `MAX_TOKENS`) from "model finished cleanly but produced an
    /// invalid shape" — see issue #320 / #321. `None` for providers that
    /// don't surface a stop reason or for paths that haven't been threaded
    /// (e.g. the mock provider). Serialized with `#[serde(default)]` so old
    /// wire payloads that omit the field still deserialize.
    ///
    /// Today this field carries the RAW provider value when the
    /// `parse_*_usage` path produced the `TokenUsage` (the common case
    /// for non-streamed calls). The `usage_from_outcome` rebuild path
    /// (streaming + some retry paths) writes the OTel-canonical form
    /// (`"stop"` / `"max_tokens"` / `"tool_use"` / `"content_filter"` /
    /// `"other"`) because `LlmCallOutcome` only carries the canonical
    /// form. Consumers that need a deterministic-by-provider raw value
    /// should prefer [`Self::raw_stop_reason`] (#1077).
    #[serde(default)]
    pub stop_reason: Option<String>,
    /// Raw provider stop reason, never lossy-mapped to OTel canonical
    /// form. Set to the same value as [`Self::stop_reason`] when the
    /// `parse_*_usage` path produced the usage; `None` otherwise
    /// (mock, streaming rebuilds via `usage_from_outcome`).
    ///
    /// Bench / observability code that needs to distinguish Gemini's
    /// `"STOP"` from `"RECITATION"` (both collapse to `"stop"` under
    /// the canonical mapping) or Anthropic's `"stop_sequence"` from
    /// `"end_turn"` should read this field. #1077.
    #[serde(default)]
    pub raw_stop_reason: Option<String>,
    /// Reasoning / thinking tokens — a SUBSET of [`Self::output_tokens`],
    /// not in addition. Captured from:
    ///   * OpenAI o-series + GPT-5: `usage.completion_tokens_details.reasoning_tokens`
    ///   * Anthropic extended-thinking: `usage.thinking_tokens` (when present)
    ///   * Gemini with `thinkingBudget` set: `usageMetadata.thoughtsTokenCount`
    ///
    /// `0` when the model didn't engage reasoning or the provider didn't
    /// surface the breakdown. `#[serde(default)]` keeps wire-compat with
    /// pre-#322 events that omit the field entirely.
    #[serde(default)]
    pub reasoning_tokens: u64,
}
158
/// One ancestor frame on a flattened [`EngineEvent::SubScript`] payload.
///
/// Issue #993: `EngineEvent::SubScript` used to wrap a nested
/// `Box<EngineEvent>` per call-stack level, so a depth-N chain produced an
/// event whose serialized payload was `O(N)` deep — a 10-level art_123_2
/// fanout could blow a single SSE frame past a megabyte and choke
/// reconnect logic. The new wire shape carries the leaf event flat and
/// names ancestors via an ordered `parent_path` of these frames so SDKs
/// rebuild the tree off the side without paying the recursion cost on
/// every envelope.
///
/// Frame ordering: `parent_path[0]` is the outermost ancestor (direct
/// child of the top-level workflow); `parent_path[len-1]` is the
/// immediate parent of the currently-emitting sub-script (whose own
/// `script_name` lives on the [`EngineEvent::SubScript`] fields, NOT
/// inside `parent_path`).
///
/// Wire notes: `script_name` and `parent_task` are required; the two
/// `Option` fields are omitted from the output when `None` and read
/// back as `None` when absent.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SubScriptFrame {
    /// Called script's name as it lives in the parent's project. Matches
    /// the `script_name` an outer SubScript envelope would have carried
    /// in the legacy recursive shape.
    pub script_name: String,
    /// Variable name on the parent side that received the call result —
    /// same semantics as the top-level [`EngineEvent::SubScript::parent_task`].
    pub parent_task: String,
    /// Compiler-stable id of the parent's call(...) node, when known.
    /// Lets consumers correlate retries of the same call site (issue
    /// #845) across the ancestor chain.
    ///
    /// NOTE(review): typed `u64` here while the [`NodeId`] alias in this
    /// file is `usize` — presumably deliberate so the wire width does
    /// not depend on the platform; confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parent_node_id: Option<u64>,
    /// 1-indexed attempt counter for author-raise retries at the same
    /// call site. Matches [`EngineEvent::SubScript::attempt`] semantics.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub attempt: Option<u8>,
}
194
195/// Aggregate token + cost rollup emitted on [`EngineEvent::WorkflowEnd`].
196///
197/// Issue #1173: today's `WorkflowEnd` carries just the workflow's return
198/// value, so dashboards and the CLI re-walk every `TaskEnd` to compute
199/// per-execution totals. This struct surfaces the same numbers once on
200/// the terminating event so consumers can size the run without parsing
201/// the full event log.
202///
203/// The engine populates this by summing [`TokenUsage`] fields across
204/// every `TaskEnd` emitted in the workflow scope. Sub-script TaskEnds
205/// (events wrapped inside [`EngineEvent::SubScript`]) DO contribute —
206/// the engine relay forwards them to the parent counter so a chain's
207/// outer `WorkflowEnd` reflects the entire chain's spend.
208///
209/// `total_cost_usd` is the only field the engine can't always populate
210/// on its own — pricing tables live in `akribes-server` to keep
211/// `akribes-core` free of provider rate metadata. The field defaults to
212/// `0.0` and stays `0.0` when the engine has no cost data; downstream
213/// enrichers may overwrite it on the server side before persistence.
214#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
215pub struct WorkflowTotals {
216 /// Sum of [`TokenUsage::input_tokens`] across every `TaskEnd` in the
217 /// workflow scope (including sub-script `TaskEnd`s). Superset of
218 /// cached + cache-write — see [`TokenUsage`] for the convention.
219 #[serde(default)]
220 pub total_input_tokens: u64,
221 /// Sum of [`TokenUsage::output_tokens`].
222 #[serde(default)]
223 pub total_output_tokens: u64,
224 /// Sum of [`TokenUsage::cached_input_tokens`] (cache READS).
225 #[serde(default)]
226 pub total_cached_input_tokens: u64,
227 /// Sum of [`TokenUsage::reasoning_tokens`] (extended thinking /
228 /// reasoning tokens — a SUBSET of `total_output_tokens`).
229 #[serde(default)]
230 pub total_thinking_tokens: u64,
231 /// Tokens spent on tool-call traffic. Reserved for future per-tool
232 /// breakdown — the engine doesn't track this separately today, so
233 /// it stays `0`. Present on the wire so SDKs can render the slot
234 /// without a schema bump when the engine starts populating it.
235 #[serde(default)]
236 pub total_tool_tokens: u64,
237 /// Sum of per-task USD cost. Always `0.0` when emitted by the
238 /// engine — pricing lives in `akribes-server`. Server-side
239 /// enrichment may overwrite before persistence.
240 #[serde(default)]
241 pub total_cost_usd: f64,
242 /// Number of `TaskEnd` events folded into the totals above.
243 #[serde(default)]
244 pub task_count: u32,
245}
246
247impl WorkflowTotals {
248 /// Fold a single [`TokenUsage`] into the running totals. No-op for
249 /// `None` so the engine can call it on every `TaskEnd` without
250 /// branching on the optional `usage` field.
251 pub fn accumulate(&mut self, usage: Option<&TokenUsage>) {
252 self.task_count = self.task_count.saturating_add(1);
253 if let Some(u) = usage {
254 self.total_input_tokens = self.total_input_tokens.saturating_add(u.input_tokens);
255 self.total_output_tokens = self.total_output_tokens.saturating_add(u.output_tokens);
256 self.total_cached_input_tokens = self
257 .total_cached_input_tokens
258 .saturating_add(u.cached_input_tokens);
259 self.total_thinking_tokens = self
260 .total_thinking_tokens
261 .saturating_add(u.reasoning_tokens);
262 }
263 }
264
265 /// Merge another `WorkflowTotals` (e.g. from a sub-script) into self.
266 /// Used by the engine when the relay surfaces a child workflow's
267 /// rollup — the outer chain's totals subsume every sub-script's.
268 pub fn merge(&mut self, other: &WorkflowTotals) {
269 self.total_input_tokens = self
270 .total_input_tokens
271 .saturating_add(other.total_input_tokens);
272 self.total_output_tokens = self
273 .total_output_tokens
274 .saturating_add(other.total_output_tokens);
275 self.total_cached_input_tokens = self
276 .total_cached_input_tokens
277 .saturating_add(other.total_cached_input_tokens);
278 self.total_thinking_tokens = self
279 .total_thinking_tokens
280 .saturating_add(other.total_thinking_tokens);
281 self.total_tool_tokens = self
282 .total_tool_tokens
283 .saturating_add(other.total_tool_tokens);
284 self.total_cost_usd += other.total_cost_usd;
285 self.task_count = self.task_count.saturating_add(other.task_count);
286 }
287}
288
/// Payload of [`EngineEvent::WorkflowEnd`]. Pairs the workflow's
/// terminal output value with an aggregate [`WorkflowTotals`] rollup.
///
/// Serialized with a hand-written `Serialize` / `Deserialize` so the
/// wire stays bridge-compatible between the legacy (pre-#1173) shape
/// (`payload = <bare-value>`) and the new shape
/// (`payload = {"value": <bare-value>, "total_input_tokens": N, ...}`).
/// See the [`EngineEvent::WorkflowEnd`] doc for the disambiguation
/// rule.
///
/// Note that serde is deliberately NOT derived here — only `Debug`,
/// `Clone` and `PartialEq` are — so the on-wire layout is defined
/// entirely by the manual impls, not by the field layout below.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowEndPayload {
    /// The workflow's final return value (the historical payload).
    pub value: Value,
    /// Aggregate rollup across every `TaskEnd` in the workflow scope.
    /// Written on the wire as flat siblings of `value`, not as a nested
    /// object.
    pub totals: WorkflowTotals,
}
305
306impl Default for WorkflowEndPayload {
307 fn default() -> Self {
308 Self {
309 value: Value::Null,
310 totals: WorkflowTotals::default(),
311 }
312 }
313}
314
315impl WorkflowEndPayload {
316 /// Construct a payload from just a value (totals default to zero).
317 /// Used by call sites that don't have totals yet — they get the
318 /// historical behaviour automatically.
319 pub fn new(value: Value) -> Self {
320 Self {
321 value,
322 totals: WorkflowTotals::default(),
323 }
324 }
325
326 /// Construct a payload with both value and totals.
327 pub fn with_totals(value: Value, totals: WorkflowTotals) -> Self {
328 Self { value, totals }
329 }
330}
331
332// Allow call sites that have historically passed a bare `Value` to keep
333// doing so via `EngineEvent::WorkflowEnd(value.into())`. Tests and the
334// CLI fixture use this; the engine emit path uses the explicit
335// `WorkflowEndPayload::with_totals` constructor.
336impl From<Value> for WorkflowEndPayload {
337 fn from(value: Value) -> Self {
338 Self::new(value)
339 }
340}
341
342impl Serialize for WorkflowEndPayload {
343 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
344 use serde::ser::SerializeMap;
345 let mut map = s.serialize_map(Some(8))?;
346 // Emit the workflow output under "value" using the clean wire
347 // form (Value::to_wire_json) so downstream consumers see the
348 // same shape they always did, just one level nested.
349 map.serialize_entry("value", &self.value.to_wire_json())?;
350 // Aggregate totals — flat siblings of `value`. Matches issue
351 // #1173's wire shape exactly.
352 map.serialize_entry("total_input_tokens", &self.totals.total_input_tokens)?;
353 map.serialize_entry("total_output_tokens", &self.totals.total_output_tokens)?;
354 map.serialize_entry(
355 "total_cached_input_tokens",
356 &self.totals.total_cached_input_tokens,
357 )?;
358 map.serialize_entry("total_thinking_tokens", &self.totals.total_thinking_tokens)?;
359 map.serialize_entry("total_tool_tokens", &self.totals.total_tool_tokens)?;
360 map.serialize_entry("total_cost_usd", &self.totals.total_cost_usd)?;
361 map.serialize_entry("task_count", &self.totals.task_count)?;
362 map.end()
363 }
364}
365
366impl<'de> Deserialize<'de> for WorkflowEndPayload {
367 fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
368 // Read into a raw JSON value first so we can dispatch on shape.
369 let raw = serde_json::Value::deserialize(d)?;
370 // New shape disambiguator: an object that carries BOTH a
371 // `value` key AND at least one `total_*` aggregate key (or
372 // `task_count`). Anything else is the legacy bare-value form.
373 const AGG_KEYS: &[&str] = &[
374 "total_input_tokens",
375 "total_output_tokens",
376 "total_cached_input_tokens",
377 "total_thinking_tokens",
378 "total_tool_tokens",
379 "total_cost_usd",
380 "task_count",
381 ];
382 if let serde_json::Value::Object(map) = &raw {
383 let has_value = map.contains_key("value");
384 let has_any_agg = AGG_KEYS.iter().any(|k| map.contains_key(*k));
385 if has_value && has_any_agg {
386 let value = Value::from_json(map.get("value").unwrap());
387 let totals = WorkflowTotals {
388 total_input_tokens: map
389 .get("total_input_tokens")
390 .and_then(|v| v.as_u64())
391 .unwrap_or(0),
392 total_output_tokens: map
393 .get("total_output_tokens")
394 .and_then(|v| v.as_u64())
395 .unwrap_or(0),
396 total_cached_input_tokens: map
397 .get("total_cached_input_tokens")
398 .and_then(|v| v.as_u64())
399 .unwrap_or(0),
400 total_thinking_tokens: map
401 .get("total_thinking_tokens")
402 .and_then(|v| v.as_u64())
403 .unwrap_or(0),
404 total_tool_tokens: map
405 .get("total_tool_tokens")
406 .and_then(|v| v.as_u64())
407 .unwrap_or(0),
408 total_cost_usd: map
409 .get("total_cost_usd")
410 .and_then(|v| v.as_f64())
411 .unwrap_or(0.0),
412 task_count: map
413 .get("task_count")
414 .and_then(|v| v.as_u64())
415 .map(|n| n as u32)
416 .unwrap_or(0),
417 };
418 return Ok(WorkflowEndPayload { value, totals });
419 }
420 }
421 // Legacy bare-value form: the payload IS the workflow output.
422 Ok(WorkflowEndPayload {
423 value: Value::from_json(&raw),
424 totals: WorkflowTotals::default(),
425 })
426 }
427}
428
/// Wire-format twin of [`crate::validation::ValidationError`]. Owned +
/// serializable; the `stage` discriminator is a string (`"parse"`,
/// `"schema"`, `"custom:<rule>"`) so SDK consumers don't need to round-trip
/// through the internal enum.
///
/// Produced by [`crate::validation::ValidationError::to_wire`]. The internal
/// `SchemaCompile` stage is intentionally not representable here — those
/// errors short-circuit to [`crate::value::Value::FatalError`] before any
/// [`EngineEvent::Suspended`] would be emitted (they're engine bugs, not
/// model bugs; author can't fix them by reviewing a payload). See the
/// authoritative decision in
/// `docs/superpowers/plans/2026-04-18-epa-feature-tracker.md`
/// ("Wave-1 M1 ship notes + cross-cutting decisions (2026-04-18, round 3)").
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ValidationErrorWire {
    /// Pipeline stage that rejected the payload: `"parse"`, `"schema"`,
    /// or `"custom:<rule>"` (see the type-level doc).
    pub stage: String,
    /// Error text for this failure — presumably human-readable and
    /// meant for display; confirm against `ValidationError::to_wire`.
    pub message: String,
    /// Optional location of the failure — presumably a path into the
    /// validated payload; the exact syntax is not visible from this
    /// file (TODO confirm). Has no `skip_serializing_if`, so `None` is
    /// written as `null`.
    pub path: Option<String>,
}
448
449/// Why the engine suspended execution at a checkpoint. This is the canonical
450/// wire-shape for the `trigger` discriminator on [`EngineEvent::Suspended`];
451/// Stream 4 (#149 + #156 approach C) populates the `AgentUnable` variant,
452/// and Stream 6 (this stream, #156 approach B) populates
453/// `ValidationExhausted`.
454///
455/// Serde-tagged with an internal `"kind"` discriminator so deserializers can
456/// match on a single field without peeking at shape. Each variant carries
457/// its payload inline — there is no sidecar `unable_payload` / `exhaustion`
458/// field on `Suspended` itself.
459///
460/// Spec: `docs/superpowers/specs/2026-04-18-epa-checkpoint-validation-design.md`.
461/// The wire shape is locked per the Wave-1 round-3 tracker decision
462/// ("Suspended wire shape (S4 ↔ S6) = S6's embedded-payload shape").
463#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
464#[serde(tag = "kind")]
465#[derive(Default)]
466pub enum SuspendTrigger {
467 /// The DAG reached an explicit `checkpoint cp(...)` call site. This is
468 /// today's only behaviour; the variant carries no payload because the
469 /// checkpoint's own `expects:` schema fully describes what comes back
470 /// on resume.
471 #[default]
472 DagPosition,
473 /// The task's `on_validation_exhausted:` property fired: all
474 /// validation retries consumed without producing a payload that passes
475 /// the parse→schema→custom pipeline. Studio / SDKs render the last
476 /// failing attempt + its errors so the human can correct in place.
477 ValidationExhausted {
478 task_name: String,
479 retry_count: u32,
480 last_attempt: String,
481 validation_errors: Vec<ValidationErrorWire>,
482 },
483 /// Reserved for Stream 4 (#149 + #156 approach C) — emitted when a
484 /// task with a `T | Unable` return type produces an `Unable` value and
485 /// the flow routes it to a checkpoint via `on unable <cp>`. Shape is
486 /// invariant across Stream 4's four `on unable` forms: the payload is
487 /// always the `Unable` record (reason/missing/category). Not produced
488 /// by the engine in Stream 6 — defined here because Stream 6 owns the
489 /// canonical `SuspendTrigger` type so Stream 4's engine-emit site is
490 /// pure code addition, no wire-shape change.
491 AgentUnable {
492 task_name: String,
493 unable: crate::value::UnableRecord,
494 },
495 /// Emitted when a task whose declared return type is a discriminated
496 /// union `A | B | ... | Unable` produces a non-Unable variant and the
497 /// flow routes that variant to a checkpoint via `on <Variant> <cp>`.
498 /// `variant` is the canonical record name (PascalCase, matches the
499 /// source identifier); `payload` is the parsed record (with the
500 /// `kind` discriminator stripped). Studio renders a generic
501 /// "agent returned variant <X>" badge on this trigger.
502 ///
503 /// `AgentUnable` remains a specialization for the `Unable` arm —
504 /// rather than `AgentVariant { variant: "Unable", ... }` — to
505 /// preserve the existing Studio rendering path from #157 (zero-day
506 /// compat).
507 AgentVariant {
508 task_name: String,
509 variant: String,
510 payload: serde_json::Value,
511 },
512}
513
/// Mid-loop checkpoint context attached to [`EngineEvent::Suspended`] when a
/// suspension fires inside a `loop` block's per-turn dispatch (a skill task
/// invoked as a loop tool whose `on_validation_exhausted` / `on unable`
/// handler routes to a checkpoint).
///
/// This metadata travels alongside the regular `SuspendTrigger`: the trigger
/// describes *why* the engine paused (DagPosition / ValidationExhausted /
/// AgentUnable / AgentVariant), and `LoopSuspendContext` tells consumers
/// *which loop turn* the suspension belongs to so resumption can be
/// rendered in the loop's UI lane and so the spawn handler's persisted
/// envelope carries enough context to reconstruct the loop's identity if the
/// execution is later restarted (driver state versioning is a follow-up; the
/// current cycle relies on the in-process await-point to hold the loop's
/// stack).
///
/// Serialized as a flat object (`{loop_id, loop_name, turn}`) on the wire.
/// `#[serde(default)]` on the field on [`EngineEvent::Suspended`] keeps
/// wire-compat with older servers / SDKs that don't emit it (they
/// deserialize as `None` — a non-loop suspension).
///
/// When the context object IS present, all three of its fields are
/// required — none of them carries a serde default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct LoopSuspendContext {
    /// UUID v4 generated when the loop driver started; stable for the
    /// lifetime of one `loop NAME(...)` invocation. Used by the spawn
    /// handler / Studio to correlate `LoopStart` → `Suspended` → `Resumed`
    /// → subsequent `LoopTurn` events into the same per-loop UI lane.
    /// Stored as a plain `String`; this type does not validate the
    /// UUID format.
    pub loop_id: String,
    /// The declared `loop NAME(...)` identifier. Same value as
    /// [`EngineEvent::LoopStart::name`] / [`EngineEvent::LoopTurn::name`].
    pub loop_name: String,
    /// 1-indexed turn the suspension occurred during. Matches the next
    /// [`EngineEvent::LoopTurn::turn`] the engine will emit when the
    /// suspension resumes and the turn settles.
    pub turn: u32,
}
548
549/// Discriminator for [`EngineEvent::TaskEnd`] that tells consumers *how* a
550/// task finished without having to introspect the `value` payload. Extracted
551/// in #206 (Stream 4 follow-up): before this, a caller had to inspect the
552/// `value` for a `Value::Unable` envelope to distinguish "the agent said I
553/// can't" from a well-typed successful return. Serde-tagged on a `"variant"`
554/// field so new arms ship without wire-shape churn.
555///
556/// `#[serde(other)]` on [`TaskEndVariant::Unknown`] preserves forward-compat:
557/// a future engine that adds e.g. `Partial` (#205) still deserializes on
558/// older SDKs — the unknown variant surfaces as `Unknown` and the stream
559/// keeps flowing. Do *not* match without a wildcard on this enum.
560#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
561#[serde(rename_all = "snake_case")]
562#[derive(Default)]
563pub enum TaskEndVariant {
564 /// The task produced a well-typed value that passed every stage of the
565 /// parse → schema → custom validation pipeline. This is the pre-#206
566 /// default and the variant carried when `#[serde(default)]` fires on
567 /// payloads that omit the field entirely (older servers).
568 #[default]
569 Success,
570 /// The task's declared return type was `T | Unable` and the agent
571 /// emitted a canonical `{"unable": {...}}` envelope. The `value` field
572 /// on [`EngineEvent::TaskEnd`] carries the full [`Value::Unable`] record
573 /// so consumers can render reason/missing/category without re-parsing.
574 Unable,
575 /// The task ended with a dispatch-level failure — provider error,
576 /// sandbox timeout, OOM kill, schema-validation budget exhausted,
577 /// or any other path where the `value` on [`EngineEvent::TaskEnd`]
578 /// is a [`Value::FatalError`]. Consumers grouping by task can use
579 /// this to render a failure UI without inspecting `value`. Emitted
580 /// from the `runtime` dispatch path; LLM tasks may also adopt it in
581 /// a follow-up. Older SDKs that don't know this variant will see it
582 /// as `Unknown` via `#[serde(other)]` and behave as today.
583 Failed,
584 /// Catch-all for future variants the SDK doesn't know yet. `#[serde(other)]`
585 /// routes unknown discriminants here so consumers never crash on a
586 /// newer engine — e.g. `Partial` lands in #205 and an older SDK will
587 /// see it as `Unknown` until its own upgrade.
588 #[serde(other)]
589 Unknown,
590}
591
592/// `serde` adapters that project a [`Value`] (or a container of it) to and
593/// from the canonical wire form documented in
594/// `docs/src/content/docs/reference/engine-events.mdx`.
595///
596/// The default `Serialize` / `Deserialize` derive on [`Value`] emits the
597/// internal tagged-enum shape (`{"String":"hi"}`, `{"Object":{...}}`, …),
598/// which is fine for caching / hashing but leaks the engine's internal
599/// representation onto the wire. Every [`EngineEvent`] field that carries
600/// a workflow-visible value uses these adapters so SDK consumers see the
601/// clean form spec'd in the docs.
602///
603/// On the read path we reconstruct a [`Value`] from clean JSON via
604/// [`Value::from_json`] — this is shape-preserving (e.g. an `Object` round-
605/// trips, a number becomes `Value::Int` / `Value::Decimal`). Variants that
606/// the engine emits with semantic meaning beyond shape — `Unable`, `Union`,
607/// `FatalError` — are NOT reconstructed back from their wire envelopes on
608/// the deserialize path because no in-process consumer relies on it: the
609/// engine never reads its own emitted JSON back as a `Value`, the Rust SDK
610/// converts via `to_wire_json` again before exposing it, and the durable
611/// replay path branches on a different set of events (`LLMResponse`,
612/// `ToolCall*`, `SubScriptResult`, `CheckpointResolution`) that already
613/// store their payloads as `serde_json::Value`.
614mod value_wire {
615 use super::*;
616
617 pub(super) fn serialize<S: Serializer>(v: &Value, s: S) -> Result<S::Ok, S::Error> {
618 v.to_wire_json().serialize(s)
619 }
620
621 pub(super) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Value, D::Error> {
622 let j = serde_json::Value::deserialize(d)?;
623 Ok(Value::from_json(&j))
624 }
625}
626
627mod opt_value_wire {
628 use super::*;
629
630 pub(super) fn serialize<S: Serializer>(v: &Option<Value>, s: S) -> Result<S::Ok, S::Error> {
631 match v {
632 Some(val) => val.to_wire_json().serialize(s),
633 None => s.serialize_none(),
634 }
635 }
636
637 pub(super) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Option<Value>, D::Error> {
638 let j = Option::<serde_json::Value>::deserialize(d)?;
639 Ok(j.map(|v| Value::from_json(&v)))
640 }
641}
642
643mod value_map_wire {
644 use super::*;
645
646 pub(super) fn serialize<S: Serializer>(
647 m: &HashMap<String, Value>,
648 s: S,
649 ) -> Result<S::Ok, S::Error> {
650 use serde::ser::SerializeMap;
651 let mut map = s.serialize_map(Some(m.len()))?;
652 for (k, v) in m {
653 map.serialize_entry(k, &v.to_wire_json())?;
654 }
655 map.end()
656 }
657
658 pub(super) fn deserialize<'de, D: Deserializer<'de>>(
659 d: D,
660 ) -> Result<HashMap<String, Value>, D::Error> {
661 let raw = HashMap::<String, serde_json::Value>::deserialize(d)?;
662 Ok(raw
663 .into_iter()
664 .map(|(k, v)| (k, Value::from_json(&v)))
665 .collect())
666 }
667}
668
669#[derive(Debug, Serialize, Deserialize, Clone)]
670#[serde(tag = "type", content = "payload")]
671pub enum EngineEvent {
672 Log(String),
673 /// Structured log line. Parallel to [`EngineEvent::Log`] but carries a
674 /// severity so consumers (Studio's trace panel, the bench `why_failed`
675 /// runner, the future eval-failure dashboard) can highlight WARNs and
676 /// ERRORs without string-sniffing.
677 ///
678 /// Added in the "why-did-it-fail" infra round so that
679 /// `tracing::warn!` calls in providers.rs / engine.rs ALSO show up in
680 /// the execution event stream — previously they only went to the
681 /// akribes-server stdout that nobody actively watches, which is how the
682 /// `max_tokens=4096` truncation hid for a week. Pre-existing
683 /// [`EngineEvent::Log`] is retained verbatim for wire compat with
684 /// older SDKs that match on the bare-string variant.
685 ///
686 /// `level` is a free-form short string (`"WARN"`, `"ERROR"`,
687 /// `"INFO"`, …) — kept as `String` rather than an enum so a newer
688 /// engine adding `"DEBUG"` doesn't crash an older SDK.
689 LogLevel {
690 level: String,
691 message: String,
692 },
693 StateUpdate(String, #[serde(with = "value_wire")] Value),
694 WorkflowStart(usize), // total tasks
695 TaskStart(String, Option<String>), // (name, on_error policy label)
696 TaskPrompt(String, String), // (task_name, rendered_prompt)
697 TaskEnd {
698 task: String,
699 on_error_label: Option<String>,
700 #[serde(with = "value_wire")]
701 value: Value,
702 /// The declared return type of the task, if any. `None` when the task
703 /// has no `-> Type` annotation (e.g. plain `str` tasks or untyped tasks).
704 value_type: Option<TypeRef>,
705 duration: std::time::Duration,
706 /// 1-indexed attempt count: `1` = first call succeeded, `2` = first
707 /// validation retry succeeded, etc. Resets on task-level `on_error`
708 /// retries (which are orthogonal to validation retries).
709 attempt: u8,
710 usage: Option<TokenUsage>,
711 /// How the task finished. Explicit discriminator so consumers don't
712 /// have to inspect `value` to distinguish Success from Unable.
713 /// `#[serde(default)]` keeps pre-#206 wire payloads deserialising
714 /// cleanly (they become [`TaskEndVariant::Success`]). Forward-compat
715 /// for later expansion (e.g. `Partial` in #205) is provided by
716 /// [`TaskEndVariant::Unknown`].
717 #[serde(default)]
718 variant: TaskEndVariant,
719 },
720 AgentOutput {
721 task_name: String,
722 agent_name: Option<String>,
723 task_id: String,
724 schema_type: Option<String>,
725 chunk: String,
726 },
727 /// Streaming extended-thinking / reasoning fragment from the LLM,
728 /// emitted alongside `AgentOutput` for providers that interleave
729 /// reasoning blocks into a streamed response (Anthropic extended
730 /// thinking, OpenAI o-series + GPT-5 reasoning, Gemini 2.5 thinking).
731 ///
732 /// Issue #1176. Pre-fix the streaming path collapsed every non-text
733 /// rig variant into an empty `AgentOutput` chunk; reasoning content
734 /// was simply lost, so Studio's "thinking" inline panel stayed dark
735 /// for streamed runs even when `reasoning_tokens` showed up on the
736 /// trailing usage. This variant surfaces the same content
737 /// non-streaming consumers see via the provider's message-level
738 /// reasoning block, but as it arrives.
739 ///
740 /// Fields mirror [`Self::AgentOutput`] so a consumer that wants to
741 /// render reasoning identically to text can switch on the variant
742 /// name only. The `chunk` is plain text — encrypted / redacted
743 /// reasoning blocks (Anthropic's opaque thinking signatures) are
744 /// dropped at the provider boundary rather than surfaced here, as
745 /// they have no plain-text equivalent.
746 AgentReasoning {
747 task_name: String,
748 agent_name: Option<String>,
749 task_id: String,
750 /// Mirrors [`Self::AgentOutput::schema_type`] — the declared
751 /// return type, when present. Reasoning is associated with the
752 /// task the engine is currently dispatching.
753 schema_type: Option<String>,
754 chunk: String,
755 },
756 /// Auto cache-breakpoint engine emitted a placement decision for
757 /// the upcoming dispatch. One event per Anthropic structured-output
758 /// call; absent for non-Anthropic providers and for any dispatch
759 /// where `EngineOptions::auto_cache_enabled` is `false`.
760 ///
761 /// Fields mirror `engine_cache::CachePlan` plus a human-readable
762 /// agent label. Captured by the rig path (and any in-process
763 /// observer) to track cache-hit rates across iterations.
764 ///
765 /// # All four marker slots are now reported
766 ///
767 /// Anthropic's 4-marker per-request budget covers four canonical
768 /// slots: `tools`, `system`, `user-msg #1`, `user-msg #2`. Issue
769 /// #472 item 1 brought the tools and system slots under engine
770 /// ownership; the two `*_marker_placed` boolean fields below
771 /// report the engine's decision for each slot. Pre-#472 payloads
772 /// reported only the user-message slots (`markers_placed` /
773 /// `markers_placed_at`); the new booleans default to `false` on
774 /// older wire payloads via `#[serde(default)]`, which preserves
775 /// the historical "engine reports user-message markers only" view
776 /// for legacy consumers reading the JSON.
777 ///
778 /// # Engine intent vs. wire-level cache footprint
779 ///
780 /// Every `*_marker_placed` field describes the engine's INTENT to
781 /// stamp `cache_control` on that block. Anthropic's response is
782 /// what determines actual `cache_creation_input_tokens` /
783 /// `cache_read_input_tokens`. Anthropic enforces a per-prefix
784 /// minimum cacheable size (1024 tokens for sonnet/haiku-class
785 /// models). When a `cache_control` marker sits at a prefix BELOW
786 /// that minimum (e.g. the system block by itself is ~30 tokens),
787 /// Anthropic extends the cache write forward to the next eligible
788 /// boundary. So a "system marker only" cold dispatch can still
789 /// report `cache_creation_input_tokens` ≈ entire prompt size.
790 /// This is Anthropic's documented behavior; the engine's plan is
791 /// correct, but the `*_marker_placed` fields describe INTENT, not
792 /// the resulting wire-level cache footprint.
793 ///
794 /// Set `AKRIBES_DEBUG_CACHE_BODY=1` on the process running the
795 /// dispatcher to print a per-call summary of which body sections
796 /// carry markers.
797 CachePlanned {
798 /// Agent name as declared in the source (`agent classifier`).
799 agent: String,
800 /// Number of segments the engine assembled for this dispatch.
801 /// Includes the static prefix (docstrings/rules/examples), one
802 /// segment per `{placeholder}` boundary in the body template,
803 /// and the trailing structured-output instruction.
804 n_segments: usize,
805 /// How many segments the engine considered "stable" — i.e.
806 /// either previously cached for this agent or marked stable by
807 /// the DAG-aware peek (referenced by an upcoming dispatch).
808 n_stable: usize,
809 /// Total character length of the longest stable run at the
810 /// HEAD of the segment list. `0` when no leading prefix was
811 /// stable.
812 longest_stable_prefix_len_chars: usize,
813 /// Number of `cache_control` markers actually placed on the
814 /// user message. `0`, `1`, or `2` (Anthropic's user-message
815 /// budget within the 4-marker per-request cap).
816 ///
817 /// Engine-level intent only; see the type-level docstring
818 /// above for the relationship to wire markers and Anthropic's
819 /// observed `cache_creation_input_tokens`.
820 markers_placed: usize,
821 /// Segment indices the engine stamped with a `cache_control`
822 /// marker, sorted ascending. Length always equals
823 /// `markers_placed`. Empty on the cold-cache / no-stable-prefix
824 /// paths. Captured for the DAG-aware integration tests so they
825 /// can assert the boundary picked by the placement algorithm
826 /// without scraping the outbound HTTP body.
827 ///
828 /// `#[serde(default)]` for backwards compat with payloads
829 /// emitted before the field existed.
830 #[serde(default)]
831 markers_placed_at: Vec<usize>,
        /// Provider name the dispatch is going to (`anthropic`,
        /// `openai`, `gemini`, ...). Lets SDK consumers route each
        /// `CachePlanned` event to a provider-specific renderer
        /// without scraping `agent` / model state. Issue #1019:
        /// before this field every event was an Anthropic event,
        /// invisible for OpenAI / Gemini caching paths. Carries the
        /// lower-cased provider id so renderers match against
        /// `"anthropic"` / `"openai"` / `"gemini"` directly. Empty
        /// string in old wire payloads (the `#[serde(default)]`
        /// default).
        #[serde(default)]
        provider: String,
        /// Whether the engine asked the provider to stamp
        /// `cache_control` on the `tools` block (issue #472 item 1).
        /// `true` on every Anthropic structured-output dispatch
        /// today — the synthetic `return_result` tool is stable per
        /// task and always benefits from caching. Reserved for
        /// future per-call unique-tool flows that may flip it to
        /// `false`.
        ///
        /// `#[serde(default)]` for backwards compat with payloads
        /// emitted before the field existed (pre-#472 payloads
        /// silently treat this as `false`, which under-reports the
        /// real wire footprint — the field is the canonical truth
        /// from this PR onward).
857 #[serde(default)]
858 tools_marker_placed: bool,
859 /// Whether the engine asked the provider to stamp
860 /// `cache_control` on the `system` block (issue #472
861 /// item 1). `true` on any dispatch with a non-empty agent
862 /// system prompt. Reserved for future one-shot
863 /// system-prompt flows (e.g. agent definitions that include
864 /// per-call data) that may flip it to `false`.
865 ///
866 /// `#[serde(default)]` for backwards compat — see
867 /// `tools_marker_placed`.
868 #[serde(default)]
869 system_marker_placed: bool,
870 },
871 Suspended {
872 checkpoint_name: String,
873 token: String,
874 prompt: String,
875 schema: serde_json::Value,
876 actor_hint: ActorHint,
877 timeout_secs: Option<u64>,
878 /// Why we suspended. Defaults to [`SuspendTrigger::DagPosition`] on
879 /// older wire payloads that omit the field (e.g. an older server
880 /// serializing against a pre-Stream-6 SDK, or vice versa). The
881 /// `#[serde(default)]` here is what guarantees backwards compat.
882 #[serde(default)]
883 trigger: SuspendTrigger,
884 /// `Some(ctx)` when the suspension fired inside a `loop` block's
885 /// per-turn dispatch — see [`LoopSuspendContext`] for the carried
886 /// fields. `None` for the existing top-level / DAG checkpoint
887 /// path, which is the wire shape every pre-loop-checkpoint SDK
888 /// emits. `#[serde(default, skip_serializing_if = "Option::is_none")]`
889 /// keeps both directions wire-compatible: old servers serializing
890 /// against new SDKs simply omit the field, and new servers emit it
891 /// only when there's a loop context to carry.
892 #[serde(default, skip_serializing_if = "Option::is_none")]
893 loop_context: Option<LoopSuspendContext>,
894 },
895 Resumed {
896 checkpoint_name: String,
897 token: String,
898 },
899 /// Terminal event of a workflow run.
900 ///
901 /// # Aggregate rollup (issue #1173)
902 ///
903 /// Pre-#1173 this was the tuple variant `WorkflowEnd(Value)` whose
904 /// payload was the bare workflow output. Consumers re-walked every
905 /// `TaskEnd` to compute totals; cluster dashboards and the CLI's
906 /// trailer line did this on every execution. The variant still has
907 /// a single payload slot — a [`WorkflowEndPayload`] — but the
908 /// payload itself now carries both the output value AND a
909 /// [`WorkflowTotals`] rollup so consumers can size the run without
910 /// touching the per-task stream.
911 ///
912 /// # Wire compatibility
913 ///
914 /// New wire shape:
915 /// `{"type": "WorkflowEnd", "payload": {"value": <output>, "total_input_tokens": N, ...}}`.
916 /// Legacy wire shape (pre-#1173):
917 /// `{"type": "WorkflowEnd", "payload": <output>}` — payload IS the
918 /// bare output value.
919 ///
920 /// [`WorkflowEndPayload`] implements `Serialize`/`Deserialize` by
921 /// hand so it accepts both shapes: new shape when the payload is a
922 /// JSON object with a `value` key plus at least one `total_*`
923 /// aggregate key, legacy bare-value otherwise. Aggregate fields
924 /// default to `0` on legacy reads; serialization always emits the
925 /// new shape.
926 WorkflowEnd(WorkflowEndPayload),
927 /// Structured failure surfaced to subscribers (SSE, WebSocket, OTel).
928 /// `message` and `kind` are kept for back-compat; `code`,
929 /// `user_message`, `retry_after_ms`, and `source` carry the richer
930 /// detail consumers should branch on. New construction goes through
931 /// [`EngineEvent::error_from_detail`] so all fields stay in sync.
932 Error {
933 message: String,
934 kind: ErrorKind,
935 #[serde(default = "default_error_code_other")]
936 code: ErrorCode,
937 #[serde(default)]
938 user_message: String,
939 #[serde(default, skip_serializing_if = "Option::is_none")]
940 retry_after_ms: Option<u64>,
941 #[serde(default, skip_serializing_if = "ErrorSource::is_empty")]
942 source: ErrorSource,
943 },
944 NodeStart(NodeId, Span),
945 NodeEnd {
946 node_id: NodeId,
947 span: Span,
948 target_var: Option<String>,
949 #[serde(with = "opt_value_wire")]
950 value: Option<Value>,
951 duration: std::time::Duration,
952 },
953 Breakpoint {
954 node_id: NodeId,
955 span: Span,
956 token: String,
957 #[serde(with = "value_map_wire")]
958 env_snapshot: std::collections::HashMap<String, Value>,
959 },
960 BreakpointResumed {
961 node_id: NodeId,
962 token: String,
963 },
964 ToolCallStart {
965 task_name: String,
966 tool_name: String,
967 server_name: String,
968 input: serde_json::Value,
969 /// LLM-issued `tool_use_id`. Empty string on pre-durable-execution
970 /// payloads (preserved by `#[serde(default)]`). Always present on
971 /// events written by v1+ engines; the cache lookup at the
972 /// `ToolCallEnd` site keys on this.
973 #[serde(default)]
974 tool_use_id: String,
975 },
976 ToolCallEnd {
977 task_name: String,
978 tool_name: String,
979 #[serde(default)]
980 tool_use_id: String,
981 output: serde_json::Value,
982 duration: std::time::Duration,
983 },
984 /// An MCP server's circuit breaker tripped open.
985 McpServerDegraded {
986 alias: String,
987 reason: String,
988 },
989 /// An MCP server recovered (circuit breaker closed again).
990 McpServerRecovered {
991 alias: String,
992 },
993 /// A destructive MCP tool invocation is awaiting operator approval.
994 /// Mirrors the checkpoint suspension protocol — the execution is
995 /// resumed by `POST /executions/{id}/resume` with
996 /// `{ approve: bool, args_override?: Value }` keyed on `token`.
997 ToolApprovalPending {
998 execution_id: Option<String>,
999 node_id: Option<u64>,
1000 token: String,
1001 tool_ref: String,
1002 args: serde_json::Value,
1003 },
1004 /// Audit-trail companion to [`EngineEvent::ToolApprovalPending`]
1005 /// (issue #857). Emitted on every resume path — both approval and
1006 /// rejection — so trace replay can reconstruct the decision and
1007 /// Studio's approval inbox can render a checkmark/X next to the
1008 /// resolved row. Without this event the only way to distinguish
1009 /// "approved" from "rejected" after the fact is to observe whether
1010 /// a subsequent `ToolCallStart` fired against the same token,
1011 /// which is fragile and loses the rejection reason.
1012 ToolApprovalResolved {
1013 /// Matches the `token` on the originating
1014 /// [`EngineEvent::ToolApprovalPending`].
1015 token: String,
1016 /// True on approve, false on reject.
1017 approved: bool,
1018 /// Operator-supplied argument override on approval, when the
1019 /// approver chose to edit the tool args before dispatch. None
1020 /// on rejection or on plain approve-as-proposed.
1021 #[serde(default, skip_serializing_if = "Option::is_none")]
1022 args_override: Option<serde_json::Value>,
1023 /// Optional reason string the approver attached to the decision.
1024 /// Surfaced by Studio's inbox; persisted for compliance audits.
1025 #[serde(default, skip_serializing_if = "Option::is_none")]
1026 reason: Option<String>,
1027 },
1028 /// Emitted when a tool that would normally have required approval
1029 /// was dispatched without prompting because a pre-configured policy
1030 /// (allowlist / read-only classification / project-level
1031 /// auto-approval) covered it. Operator-audit gap closer (issue
1032 /// #1110): without this event the only way to spot auto-approval
1033 /// is to compare the policy config against the trace stream
1034 /// out-of-band.
1035 ///
1036 /// `reason` is a short policy-driven discriminator (e.g.
1037 /// `"policy:read_only"`, `"policy:allowlisted"`,
1038 /// `"policy:internal"`) so SDK consumers can render a badge
1039 /// without re-classifying the tool themselves.
1040 ToolApprovalSkipped {
1041 execution_id: Option<String>,
1042 node_id: Option<u64>,
1043 tool_ref: String,
1044 reason: String,
1045 },
1046 /// Emitted during durable replay when the replay cache holds a
1047 /// `ToolCallStart` for a given `tool_use_id` but no matching
1048 /// `ToolCallEnd`. The tool MAY have fired once on a previous run before
1049 /// the server crashed mid-call, and we are about to re-fire it — which
1050 /// will cause a duplicate side effect for non-idempotent tools (e.g.
1051 /// `send_email`, `create_pr`).
1052 ///
1053 /// This previously emitted only a `tracing::warn!(target =
1054 /// "tool_replay_uncertain", ...)`, which is invisible to SDK / Studio
1055 /// consumers. The structured event lets the operator surface a
1056 /// "replay-uncertain tool" badge inline in the execution timeline and
1057 /// decide whether to acknowledge before the re-run continues (#872).
1058 ///
1059 /// `args` is the raw JSON-encoded tool input from the cached start
1060 /// event so the operator can compare against the impending re-fire's
1061 /// args. `#[serde(default)]` on the deserializer keeps wire-compat
1062 /// with pre-#872 events that omit the variant entirely (they decode
1063 /// through the SDK's `Other` catch-all).
1064 ToolReplayUncertain {
1065 execution_id: Option<String>,
1066 tool_use_id: String,
1067 tool_name: String,
1068 #[serde(default)]
1069 args: serde_json::Value,
1070 },
1071 VerificationStart {
1072 workflow_name: String,
1073 },
1074 VerificationResult {
1075 workflow_name: String,
1076 results: serde_json::Value,
1077 duration: std::time::Duration,
1078 },
1079 /// A structured-output task's response failed validation. Emitted in
1080 /// addition to the existing `Log` line so SDK consumers without the new
1081 /// event still render the human-readable summary, but tooling that knows
1082 /// about this variant can render the model's actual response, the
1083 /// schema-validator's structured error breakdown, and the provider's
1084 /// `stop_reason` (so e.g. a `max_tokens` truncation isn't misdiagnosed
1085 /// as "schema overflow" — see issue #320).
1086 ///
1087 /// Fields:
1088 /// * `task_name` — the task whose validation failed.
1089 /// * `attempt` — 1-indexed attempt number (1 = first call, 2 = first
1090 /// retry, …).
1091 /// * `model_response` — the raw text / JSON-serialized tool input the
1092 /// model emitted, exactly as the validator saw it. May be empty (`""`)
1093 /// or `"{}"` when the model truncated mid-output.
1094 ///
1095 /// **Issue #1139 — bounded.** Capped at
1096 /// [`VALIDATION_FAILURE_RESPONSE_CAP_BYTES`] bytes on emit. If the
1097 /// raw response was longer, `truncated` is `true` and `total_length`
1098 /// carries the original byte count so consumers can surface "model
1099 /// emitted 4 MB; first 64 KB shown" instead of silently logging a
1100 /// megabyte to `execution_events` three times in a retry loop.
1101 /// * `truncated` — `true` when `model_response` was truncated to fit
1102 /// the cap. `#[serde(default)]` keeps pre-#1139 wire payloads
1103 /// decoding cleanly (they decode as `false`).
1104 /// * `total_length` — original byte length of the model response
1105 /// before truncation. Equal to `model_response.len()` when
1106 /// `truncated` is `false`. `#[serde(default)]` for wire-compat.
1107 /// * `missing_fields` — JSON-pointer paths to required fields the schema
1108 /// validator flagged as absent.
1109 /// * `extra_fields` — paths to fields rejected by `additionalProperties:
1110 /// false`.
1111 /// * `type_errors` — human-readable type / value mismatches (e.g.
1112 /// `"expected string, got null at /name"`). Includes any non-missing,
1113 /// non-additional-property schema errors plus parse / custom-validator
1114 /// messages.
1115 /// * `stop_reason` — the provider's stop reason, when known. For
1116 /// Anthropic this is `"end_turn"` / `"max_tokens"` / `"tool_use"` etc.
1117 /// `None` when the upstream call didn't surface one.
1118 ValidationFailure {
1119 task_name: String,
1120 attempt: u32,
1121 model_response: String,
1122 #[serde(default)]
1123 truncated: bool,
1124 #[serde(default)]
1125 total_length: u64,
1126 missing_fields: Vec<String>,
1127 extra_fields: Vec<String>,
1128 type_errors: Vec<String>,
1129 stop_reason: Option<String>,
1130 },
1131 /// Envelope that wraps an event emitted by a sub-script invoked from a
1132 /// parent task via the `call("name", inputs={...})` script-composition
1133 /// primitive (roadmap item A).
1134 ///
1135 /// # Flat wire shape (issue #993)
1136 ///
1137 /// Before #993 the envelope nested a `Box<EngineEvent>` per call-stack
1138 /// level, so a depth-N chain produced a single event whose serialized
1139 /// size was `O(N)` (a 10-level fanout could push one SSE frame past a
1140 /// megabyte and choke reconnect logic in cluster cards). The new shape
1141 /// is flat: `child` is always the LEAF event the innermost sub-engine
1142 /// emitted (never another `SubScript`), and `parent_path` carries the
1143 /// outer ancestor chain by id.
1144 ///
1145 /// * `script_name` — the immediately-running sub-script (innermost
1146 /// frame). Same field semantics as the legacy wire shape.
1147 /// * `parent_task` — the variable name on the immediate-parent side
1148 /// that received the call's result.
1149 /// * `parent_node_id` / `attempt` — the immediate parent's
1150 /// call(...) node id + author-raise attempt counter (#845).
1151 /// * `parent_path` — ordered `[outermost, ..., immediate_parent]`;
1152 /// empty when the current sub-script is a direct child of the
1153 /// top-level workflow.
1154 /// * `child` — the leaf event the innermost sub-engine emitted.
1155 /// Boxed to keep the variant cheap on the stack. Field name kept
1156 /// for wire-compat with pre-#993 consumers; the meaning narrowed
1157 /// from "wrapped event (potentially another SubScript)" to "leaf
1158 /// event".
1159 ///
1160 /// Consumers that want the full call tree walk `parent_path` once
1161 /// and consume `child` as a leaf. The top-level reducer in each SDK
1162 /// is responsible for stitching frames back into a tree.
1163 ///
1164 /// # Back-compat read path
1165 ///
1166 /// Legacy emissions (pre-#993) put a nested `SubScript` inside
1167 /// `child`, with `parent_path` absent or empty. [`Self::flatten_subscript_chain`]
1168 /// is the canonical post-deserialize step that walks any nested
1169 /// SubScripts into `parent_path`, leaving the resulting envelope
1170 /// in the new flat form. The Rust SDK runs it on every inbound
1171 /// event before projecting to `WorkflowEvent`; the TS and Python
1172 /// SDKs flatten equivalently inside their reducers.
1173 SubScript {
1174 /// Script name of the innermost (currently emitting) sub-script.
1175 script_name: String,
1176 /// Parent-side variable name that received the immediate parent's
1177 /// call result.
1178 parent_task: String,
1179 /// Compiler-stable id of the immediate parent's call(...) node.
1180 /// Lets SDK consumers correlate retries of the same call site
1181 /// (issue #845). `#[serde(default)]` keeps pre-#845 wire payloads
1182 /// decoding cleanly.
1183 #[serde(default, skip_serializing_if = "Option::is_none")]
1184 parent_node_id: Option<u64>,
1185 /// 1-indexed attempt counter for author-raise retries at the
1186 /// same call site. See [`SubScriptFrame::attempt`] for the same
1187 /// semantics on ancestor frames.
1188 #[serde(default, skip_serializing_if = "Option::is_none")]
1189 attempt: Option<u8>,
1190 /// Ancestor chain from outermost to immediate parent. Empty
1191 /// when the emitting sub-script sits directly under the
1192 /// top-level workflow. Skipped from the wire when empty so a
1193 /// depth-1 envelope stays as compact as it was pre-#993, and
1194 /// `#[serde(default)]` keeps pre-#993 payloads (which omit the
1195 /// field) decoding cleanly.
1196 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1197 parent_path: Vec<SubScriptFrame>,
1198 /// The leaf event the innermost sub-engine emitted. New
1199 /// emissions guarantee this is never another `SubScript`; legacy
1200 /// payloads may still nest one — call [`EngineEvent::flatten_subscript_chain`]
1201 /// to normalize.
1202 child: Box<EngineEvent>,
1203 },
1204 /// A `loop` block began executing. Emitted exactly once per loop call,
1205 /// before any `LoopTurn`. `max_turns` is the resolved upper-bound budget
1206 /// (declared `max_turns:` if present, else the engine's
1207 /// `LOOP_MAX_TURNS_DEFAULT`). Additive event — older SDKs treat the
1208 /// JSON payload as an unknown variant and ignore it.
1209 LoopStart {
1210 name: String,
1211 max_turns: u32,
1212 },
1213 /// A single turn of a `loop` block settled (provider call returned and
1214 /// every `tool_use` block was dispatched). `turn` is 1-indexed.
1215 /// `tool_calls` is the names of the tools the model invoked this turn,
1216 /// in dispatch order — including the synthetic `state_get`,
1217 /// `state_update`, `return` and any user `skills:` entries.
1218 LoopTurn {
1219 name: String,
1220 turn: u32,
1221 tool_calls: Vec<String>,
1222 /// Per-turn token usage reported by the provider for the
1223 /// dispatch this turn produced. `None` on providers that don't
1224 /// surface per-call usage (mock provider) and on pre-#829
1225 /// wire payloads. Lets Studio's loop card show "turn 3 spent
1226 /// 4500 tokens" without walking the wrapped `LLMResponse`
1227 /// sub-tree of events.
1228 #[serde(default, skip_serializing_if = "Option::is_none")]
1229 usage: Option<TokenUsage>,
1230 },
1231 /// A `loop` block exited. `value` is the agent's submitted return value
1232 /// (from `return(...)`), the final state on natural `stop_when` exit
1233 /// without a return, or a `Value::FatalError` envelope when the loop
1234 /// exhausted its `max_turns` budget without ever calling `return`.
1235 /// `turn_count` is the number of turns actually executed (1-indexed).
1236 LoopEnd {
1237 name: String,
1238 turn_count: u32,
1239 #[serde(with = "value_wire")]
1240 value: Value,
1241 },
1242 /// Emitted when a compaction step runs — once per primitive
1243 /// activation. `provider_native: true` means Anthropic / OpenAI
1244 /// performed the compaction server-side; the engine surfaces the
1245 /// before/after counts from the response. `strategy` is the
1246 /// primitive name (`drop_thinking_blocks`, `drop_oldest_tool_results`,
1247 /// `summarize_to_state`, `provider_native`) or the user task name
1248 /// for a custom compactor task.
1249 ///
1250 /// `cache_ttl` is `Some("5m")` or `Some("1h")` on the Anthropic
1251 /// `provider_native` path (the engine pins `ttl: "1h"` via the
1252 /// `extended-cache-ttl-2025-04-11` beta header — see
1253 /// `providers.rs:1772`), `None` for every non-native compaction
1254 /// primitive (the engine-driven primitives don't write any cache
1255 /// block themselves). Downstream cost dashboards multiply
1256 /// cache-write tokens by the 1h-vs-5m rate (issue #1130); without
1257 /// this field the wire envelope leaves the TTL ambiguous.
1258 /// `#[serde(default, skip_serializing_if = "Option::is_none")]`
1259 /// keeps pre-#1130 wire payloads decodable as `cache_ttl = None`.
1260 ///
1261 /// See the compaction design at
1262 /// `docs/superpowers/specs/2026-05-12-compaction-design.md` for the
1263 /// "Observability + cost" contract; the chain emits one
1264 /// `ContextCompacted` per primitive activation and `ContextOverflow`
1265 /// at chain exhaustion.
1266 ContextCompacted {
1267 agent: String,
1268 loop_id: Option<String>,
1269 turn: Option<u32>,
1270 threshold_pct: Option<u8>,
1271 threshold_abs: Option<u32>,
1272 strategy: String,
1273 before_tokens: u32,
1274 after_tokens: u32,
1275 provider_native: bool,
1276 #[serde(default, skip_serializing_if = "Option::is_none")]
1277 cache_ttl: Option<String>,
1278 },
1279 /// Emitted when the compaction chain runs to exhaustion (or when
1280 /// `compaction: none` and the request would exceed the model context).
1281 /// Carries the chain log so users can diagnose which primitives ran
1282 /// before the engine gave up.
1283 ///
1284 /// `terminated_by_hard_error` distinguishes the user-authored
1285 /// `hard_error()` early-exit (issue #1056) from the implicit
1286 /// chain-exhaustion exit. SDKs can render a stronger "author
1287 /// intentionally bailed out at threshold X" message instead of the
1288 /// generic "all arms exhausted" copy.
1289 ///
1290 /// `#[serde(default)]` on the new field keeps pre-#1056 wire
1291 /// payloads decodable (they decode as `terminated_by_hard_error =
1292 /// false`, which matches their original semantics — every old
1293 /// emission was a chain-exhaustion exit).
1294 ContextOverflow {
1295 agent: String,
1296 attempted_strategies: Vec<String>,
1297 configured_cap_tokens: u32,
1298 model_context_window: u32,
1299 /// `true` when the chain hit a user-authored `hard_error()`
1300 /// step; `false` when the chain ran past every step without
1301 /// freeing enough budget. See the type-level docs for the
1302 /// distinction and the back-compat default.
1303 #[serde(default)]
1304 terminated_by_hard_error: bool,
1305 },
1306 /// Emitted by the engine when a task's result is served from
1307 /// [`crate::engine_persistent_cache::PersistentTaskCache`] instead of
1308 /// being dispatched to a provider. Lets trace inspectors, the bench
1309 /// UI, and the MCP show "stages 1-3 were free, stage 4 ran" without
1310 /// having to parse prompt-segment internals.
1311 ///
1312 /// `agent` is the agent name the task ran under (matches
1313 /// [`EngineEvent::AgentOutput::agent_name`] for the same task).
1314 /// `key_prefix` is the first 6 hex chars of the (u64) cache key so
1315 /// consumers can cluster repeated hits without persisting the full
1316 /// key. The prefix is informational only — collisions are harmless,
1317 /// the key itself is the source of truth.
1318 ///
1319 /// Emitted on the cache-hit branch of the engine's task dispatch
1320 /// loop, before the corresponding `TaskEnd` event for the same task.
1321 TaskCacheHit {
1322 agent: String,
1323 key_prefix: String,
1324 },
1325 /// LLM provider response captured for durable replay. Carries the full
1326 /// response (text + tool-use blocks + usage) keyed by `(node_id,
1327 /// call_index)`. See `crates/akribes-core/src/replay_cache.rs`.
1328 LLMResponse {
1329 node_id: String,
1330 call_index: u32,
1331 text: String,
1332 tool_calls: Vec<CachedToolCall>,
1333 usage: Option<TokenUsage>,
1334 },
1335 /// Emitted on the cache-hit branch of `call_provider_*` —
1336 /// distinguishes "this task's underlying LLM call was served from
1337 /// the replay cache" from "the task's full result was served from
1338 /// the persistent task cache" (the latter already has
1339 /// [`EngineEvent::TaskCacheHit`]). Issue #815: a reconnecting SSE
1340 /// client otherwise can't tell a cache-served task from a slow
1341 /// first-token, because the engine deliberately does not re-emit
1342 /// streaming AgentOutput chunks on replay.
1343 ///
1344 /// `node_id` + `call_index` mirror the keying of the underlying
1345 /// `LLMResponse` variant so consumers can correlate the hit with
1346 /// the cached response.
1347 LLMReplayCacheHit {
1348 node_id: String,
1349 call_index: u32,
1350 },
1351 /// A child execution row was just inserted at the parent's `call(...)`
1352 /// node. The parent's event log carries this *intent* event; the
1353 /// child's own log records its lifecycle independently.
1354 SubScriptSpawned {
1355 child_execution_id: String,
1356 parent_node_id: String,
1357 args: serde_json::Value,
1358 },
1359 /// Child execution finished and the parent observed its terminal
1360 /// state. Synthesised by the parent reading the child's terminal
1361 /// event; never written by the child itself.
1362 SubScriptResult {
1363 parent_node_id: String,
1364 child_execution_id: String,
1365 outcome: ChildOutcome,
1366 },
1367 /// A `Suspended` checkpoint resolved. Written when `POST
1368 /// /executions/:id/resume` lands a payload; replay re-derives the
1369 /// engine state from this event instead of re-suspending.
1370 CheckpointResolution {
1371 checkpoint_id: String,
1372 payload: serde_json::Value,
1373 },
1374 /// Emitted when the engine starts dispatching a `runtime` block (the
1375 /// container code-execution construct — see the
1376 /// "AI-driven container code execution" feature). One event per
1377 /// runtime call site, before any `RuntimeStdout`/`RuntimeStderr`
1378 /// chunks. `task_name` is the call-site identifier the engine uses
1379 /// to attribute the call (matches the wrapping `TaskStart`/`TaskEnd`
1380 /// pair's `task` field so SDK reducers that group by task keep
1381 /// working). `runtime_name` is the declared `runtime NAME(...)`
1382 /// identifier. `language` is the source-form keyword
1383 /// (`"python"` / `"bash"` / `"node"` / `"rust"` / `"java"`).
1384 RuntimeStart {
1385 task_name: String,
1386 runtime_name: String,
1387 language: String,
1388 },
1389 /// A chunk of stdout produced by the running sandbox. `chunk` is the
1390 /// raw byte slice decoded as UTF-8 (lossy). Multiple events fire in
1391 /// arrival order; SDK reducers concatenate to reconstruct the full
1392 /// stream.
1393 RuntimeStdout {
1394 task_name: String,
1395 chunk: String,
1396 },
1397 /// A chunk of stderr produced by the running sandbox. Mirrors
1398 /// [`EngineEvent::RuntimeStdout`] for the error stream.
1399 RuntimeStderr {
1400 task_name: String,
1401 chunk: String,
1402 },
    /// The runtime call ran to completion and produced an exit code
    /// (contrast [`EngineEvent::RuntimeError`], which covers calls that
    /// failed before producing one). `exit_code` is the container's
    /// process exit code (0 for clean exit; non-zero on crash / panic /
    /// explicit non-zero exit). `duration_ms` is the wall-clock time the
    /// sandbox reported between dispatch and exit — it does not include
    /// the time the engine spent waiting on its own semaphore.
1409 RuntimeEnd {
1410 task_name: String,
1411 exit_code: i32,
1412 duration_ms: u64,
1413 },
1414 /// The runtime call failed before producing an exit code.
1415 /// `kind` is a stable wire-form discriminator:
1416 /// `"NotConfigured"` (no sandbox URL set),
1417 /// `"Timeout"` (execution exceeded the declared timeout),
1418 /// `"SandboxUnavailable"` (network / connect error to the sandbox),
1419 /// `"OomKilled"` (the container hit its memory cap),
1420 /// `"Internal"` (any other sandbox-side failure).
1421 /// `message` is human-readable detail forwarded from the sandbox's
1422 /// `error` SSE event (or synthesised by the engine for client-side
1423 /// errors like `NotConfigured`).
1424 RuntimeError {
1425 task_name: String,
1426 kind: String,
1427 message: String,
1428 },
1429}
1430
/// Upper bound, in bytes, on the `model_response` text carried by an
/// [`EngineEvent::ValidationFailure`] at emit time (issue #1139).
///
/// The limit is 64 KiB: large enough to hold the validator's view of any
/// reasonable model output, small enough to keep `execution_events` rows
/// bounded. Before the cap, a 4 MB tool input on a three-retry
/// validation loop added 12 MB to the persisted log per task; with the
/// cap the same loop emits at most 192 KB.
pub const VALIDATION_FAILURE_RESPONSE_CAP_BYTES: usize = 64 * 1024;
1438
1439impl EngineEvent {
1440 /// Flatten any legacy nested [`EngineEvent::SubScript`] chain into the
1441 /// new `parent_path + child(leaf)` shape (issue #993).
1442 ///
1443 /// Pre-#993 emissions wrapped each call-stack level in its own
1444 /// `SubScript` envelope (`SubScript { child: Box<SubScript{ child: ... }> }`).
1445 /// The new shape carries the chain via `parent_path` and reserves
1446 /// `child` for the innermost leaf event. This method walks down
1447 /// `child` for as long as it is itself a `SubScript`, accumulating
1448 /// each frame into a growing `parent_path` (so the resulting list
1449 /// reads `[outermost_ancestor, …, immediate_parent_of_leaf]`), and
1450 /// finally sets `child` to the recovered leaf.
1451 ///
1452 /// Events that are not `SubScript` envelopes are returned unchanged.
1453 /// SDK consumers (Rust / TS / Python) call this on every inbound
1454 /// event so reducers see a uniform flat shape regardless of which
1455 /// engine version emitted the wire log.
1456 pub fn flatten_subscript_chain(self) -> Self {
1457 // We treat the current SubScript as a stack of `(script_name,
1458 // parent_task, parent_node_id, attempt)` frames anchored above
1459 // a leaf. Walk inward, accumulating frames; the OUTER frame at
1460 // each step is the immediate parent of the next inward step.
1461 let (script_name, parent_task, parent_node_id, attempt, outer_path, child) = match self {
1462 EngineEvent::SubScript {
1463 script_name,
1464 parent_task,
1465 parent_node_id,
1466 attempt,
1467 parent_path,
1468 child,
1469 } => (
1470 script_name,
1471 parent_task,
1472 parent_node_id,
1473 attempt,
1474 parent_path,
1475 child,
1476 ),
1477 other => return other,
1478 };
1479
1480 // Path order is `[outermost_ancestor, ..., immediate_parent_of_innermost]`.
1481 // `outer_path` is already in that order (legacy emissions have it empty).
1482 // The current top-level frame (cur_*) sits AFTER everything in outer_path
1483 // and is the immediate parent of whatever lives in `child`. As we walk
1484 // inward, cur_* moves down into `outer_path`'s suffix.
1485 let mut frames: Vec<SubScriptFrame> = outer_path;
1486 let mut cur_script = script_name;
1487 let mut cur_task = parent_task;
1488 let mut cur_node = parent_node_id;
1489 let mut cur_attempt = attempt;
1490 let mut cur_child: Box<EngineEvent> = child;
1491
1492 loop {
1493 match *cur_child {
1494 EngineEvent::SubScript {
1495 script_name: inner_script,
1496 parent_task: inner_task,
1497 parent_node_id: inner_node,
1498 attempt: inner_attempt,
1499 parent_path: inner_path,
1500 child: inner_child,
1501 } => {
1502 // Promote cur_* into the frame list — it now sits
1503 // ABOVE the inner frame in the ancestor chain.
1504 // Order: existing frames, then any frames the inner
1505 // envelope already carried in its own parent_path
1506 // (legacy emissions have this empty), then cur_*.
1507 frames.extend(inner_path);
1508 frames.push(SubScriptFrame {
1509 script_name: cur_script,
1510 parent_task: cur_task,
1511 parent_node_id: cur_node,
1512 attempt: cur_attempt,
1513 });
1514 cur_script = inner_script;
1515 cur_task = inner_task;
1516 cur_node = inner_node;
1517 cur_attempt = inner_attempt;
1518 cur_child = inner_child;
1519 }
1520 leaf => {
1521 return EngineEvent::SubScript {
1522 script_name: cur_script,
1523 parent_task: cur_task,
1524 parent_node_id: cur_node,
1525 attempt: cur_attempt,
1526 parent_path: frames,
1527 child: Box::new(leaf),
1528 };
1529 }
1530 }
1531 }
1532 }
1533
1534 /// Build a [`EngineEvent::ValidationFailure`] with `model_response`
1535 /// capped to [`VALIDATION_FAILURE_RESPONSE_CAP_BYTES`] (issue
1536 /// #1139). The original byte length is preserved on `total_length`
1537 /// and `truncated` is set when the cap fired so consumers can
1538 /// surface "first 64 KB shown; original was N bytes" without
1539 /// having to re-derive it. UTF-8 boundary is respected — the
1540 /// truncation slices at the closest char boundary below the cap.
1541 pub fn validation_failure(
1542 task_name: impl Into<String>,
1543 attempt: u32,
1544 model_response: String,
1545 missing_fields: Vec<String>,
1546 extra_fields: Vec<String>,
1547 type_errors: Vec<String>,
1548 stop_reason: Option<String>,
1549 ) -> Self {
1550 let total_length = model_response.len() as u64;
1551 let (response, truncated) = if model_response.len() > VALIDATION_FAILURE_RESPONSE_CAP_BYTES
1552 {
1553 // Walk down to the nearest char boundary so we don't
1554 // produce a fragment of a multi-byte UTF-8 sequence.
1555 let mut end = VALIDATION_FAILURE_RESPONSE_CAP_BYTES;
1556 while end > 0 && !model_response.is_char_boundary(end) {
1557 end -= 1;
1558 }
1559 (model_response[..end].to_string(), true)
1560 } else {
1561 (model_response, false)
1562 };
1563 EngineEvent::ValidationFailure {
1564 task_name: task_name.into(),
1565 attempt,
1566 model_response: response,
1567 truncated,
1568 total_length,
1569 missing_fields,
1570 extra_fields,
1571 type_errors,
1572 stop_reason,
1573 }
1574 }
1575
1576 /// Build an [`EngineEvent::Error`] from a fully-formed
1577 /// [`crate::error::ErrorDetail`]. Use this at every error-emission
1578 /// site so SDK / OTel / DB consumers all see the same structured
1579 /// fields.
1580 pub fn error(detail: crate::error::ErrorDetail) -> Self {
1581 EngineEvent::Error {
1582 message: detail.message,
1583 kind: detail.kind,
1584 code: detail.code,
1585 user_message: detail.user_message,
1586 retry_after_ms: detail.retry_after_ms,
1587 source: detail.source,
1588 }
1589 }
1590
1591 /// Quick constructor for sites that don't yet have a specific
1592 /// [`ErrorCode`]. Picks the closest "Other" code for the kind via
1593 /// [`crate::error::ErrorDetail::from_kind`].
1594 pub fn error_kind(kind: ErrorKind, message: impl Into<String>) -> Self {
1595 EngineEvent::error(crate::error::ErrorDetail::from_kind(kind, message))
1596 }
1597
1598 /// Quick constructor from a specific [`ErrorCode`].
1599 pub fn error_code(code: ErrorCode, message: impl Into<String>) -> Self {
1600 EngineEvent::error(crate::error::ErrorDetail::new(code, message))
1601 }
1602}
1603
1604#[cfg(test)]
1605mod tests {
1606 use super::*;
1607 use crate::ast::TypeRef;
1608 use crate::value::Value;
1609
1610 #[test]
1611 fn task_end_event_serializes_with_value_and_value_type_and_attempt() {
1612 let e = EngineEvent::TaskEnd {
1613 task: "t".into(),
1614 on_error_label: None,
1615 value: Value::String("x".into()),
1616 value_type: Some(TypeRef::primitive("int")),
1617 duration: std::time::Duration::from_millis(10),
1618 attempt: 2,
1619 usage: None,
1620 variant: TaskEndVariant::Success,
1621 };
1622 let s = serde_json::to_string(&e).unwrap();
1623 assert!(
1624 s.contains("\"value\""),
1625 "serialized event should contain 'value' key: {}",
1626 s
1627 );
1628 assert!(
1629 s.contains("\"value_type\""),
1630 "serialized event should contain 'value_type' key: {}",
1631 s
1632 );
1633 assert!(
1634 s.contains("\"attempt\":2"),
1635 "serialized event should contain 'attempt':2: {}",
1636 s
1637 );
1638 assert!(
1639 s.contains("\"variant\":\"success\""),
1640 "variant should round-trip as snake_case 'success': {}",
1641 s
1642 );
1643 }
1644
1645 #[test]
1646 fn task_end_value_emits_clean_wire_form_not_tagged_value() {
1647 // Spec: `docs/src/content/docs/reference/engine-events.mdx` — every
1648 // `Value`-carrying field on an `EngineEvent` serializes as clean
1649 // JSON (the form `Value::to_wire_json` produces), not the internal
1650 // tagged-enum form (`{"String": ...}`, `{"Object": {...}}`).
1651 let mut obj = std::collections::HashMap::new();
1652 obj.insert("exit_code".to_string(), Value::Int(0));
1653 obj.insert("stdout".to_string(), Value::String("hi\n".into()));
1654 let e = EngineEvent::TaskEnd {
1655 task: "run".into(),
1656 on_error_label: None,
1657 value: Value::Object(obj),
1658 value_type: None,
1659 duration: std::time::Duration::from_millis(10),
1660 attempt: 1,
1661 usage: None,
1662 variant: TaskEndVariant::Success,
1663 };
1664 let j: serde_json::Value =
1665 serde_json::from_str(&serde_json::to_string(&e).unwrap()).unwrap();
1666 let value = &j["payload"]["value"];
1667 assert_eq!(value["exit_code"], serde_json::json!(0));
1668 assert_eq!(value["stdout"], serde_json::json!("hi\n"));
1669 assert!(
1670 value.get("Object").is_none(),
1671 "wire form must not carry the tagged-enum 'Object' key: {value}",
1672 );
1673 assert!(
1674 value["exit_code"].get("Int").is_none(),
1675 "wire form must not carry the tagged-enum 'Int' key for scalars: {value}",
1676 );
1677 }
1678
1679 #[test]
1680 fn workflow_end_payload_emits_clean_wire_form_not_tagged_value() {
1681 // Issue #1173 reshape: `WorkflowEnd.payload` is now an object
1682 // with a `value` sub-field carrying the workflow's clean output
1683 // alongside the aggregate `total_*` rollup. The output under
1684 // `value` is still in the clean wire form (no `"Object"` /
1685 // `"Int"` tagged-enum keys) per the engine-events reference.
1686 let mut obj = std::collections::HashMap::new();
1687 obj.insert("exit_code".to_string(), Value::Int(0));
1688 obj.insert("duration_ms".to_string(), Value::Int(12));
1689 let e = EngineEvent::WorkflowEnd(WorkflowEndPayload::new(Value::Object(obj)));
1690 let j: serde_json::Value =
1691 serde_json::from_str(&serde_json::to_string(&e).unwrap()).unwrap();
1692 let payload = &j["payload"];
1693 // The workflow output is now nested under `payload.value`.
1694 let value = &payload["value"];
1695 assert_eq!(value["exit_code"], serde_json::json!(0));
1696 assert_eq!(value["duration_ms"], serde_json::json!(12));
1697 assert!(
1698 value.get("Object").is_none(),
1699 "wire form must not carry the tagged-enum 'Object' key: {value}",
1700 );
1701 // Aggregate rollup is present and defaults to zero on a
1702 // payload built without explicit totals.
1703 assert_eq!(payload["total_input_tokens"], serde_json::json!(0));
1704 assert_eq!(payload["total_output_tokens"], serde_json::json!(0));
1705 assert_eq!(payload["task_count"], serde_json::json!(0));
1706 }
1707
1708 #[test]
1709 fn workflow_end_value_deserialise_round_trip_keeps_clean_shape() {
1710 // A consumer that round-trips `EngineEvent` via serde (e.g. the
1711 // durable event log on the server) should see the same clean JSON
1712 // shape both directions. The inner `Value` is reconstructed via
1713 // `Value::from_json`, which preserves shape but not semantic
1714 // variants (`Object`/`Int`/`String`/`List`/`Null`/`Decimal`/`Bool`).
1715 let mut obj = std::collections::HashMap::new();
1716 obj.insert("exit_code".to_string(), Value::Int(0));
1717 obj.insert("stdout".to_string(), Value::String("hi".into()));
1718 let e = EngineEvent::WorkflowEnd(WorkflowEndPayload::new(Value::Object(obj)));
1719 let s = serde_json::to_string(&e).unwrap();
1720 let back: EngineEvent = serde_json::from_str(&s).unwrap();
1721 let s_again = serde_json::to_string(&back).unwrap();
1722 // Re-serialising the round-tripped event must produce identical JSON,
1723 // i.e. the wire form is its own fixed point.
1724 assert_eq!(s, s_again, "wire form should round-trip without drift");
1725 }
1726
1727 #[test]
1728 fn workflow_end_back_compat_legacy_bare_value_payload_parses() {
1729 // Issue #1173 back-compat: an event log captured against a
1730 // pre-#1173 server emits `payload` as the bare workflow value.
1731 // The custom `Deserialize` impl must accept that shape and
1732 // default the aggregate rollup to all-zero.
1733 let legacy = r#"{"type":"WorkflowEnd","payload":{"exit_code":0,"stdout":"hi"}}"#;
1734 let evt: EngineEvent = serde_json::from_str(legacy).unwrap();
1735 match evt {
1736 EngineEvent::WorkflowEnd(payload) => {
1737 // Output recovered from bare payload.
1738 assert!(matches!(payload.value, Value::Object(_)));
1739 // Totals default to zero on legacy reads.
1740 assert_eq!(payload.totals.total_input_tokens, 0);
1741 assert_eq!(payload.totals.total_output_tokens, 0);
1742 assert_eq!(payload.totals.task_count, 0);
1743 }
1744 other => panic!("expected WorkflowEnd, got {other:?}"),
1745 }
1746 }
1747
1748 #[test]
1749 fn workflow_end_scalar_legacy_payload_parses_as_bare_value() {
1750 // Edge case: a workflow that returns a scalar (e.g.
1751 // `return "hi"`) emits `payload: "hi"` on the legacy wire.
1752 // Make sure the deserializer doesn't try to interpret that
1753 // as the new struct shape.
1754 let legacy = r#"{"type":"WorkflowEnd","payload":"hi"}"#;
1755 let evt: EngineEvent = serde_json::from_str(legacy).unwrap();
1756 match evt {
1757 EngineEvent::WorkflowEnd(payload) => {
1758 assert!(matches!(payload.value, Value::String(ref s) if s == "hi"));
1759 }
1760 other => panic!("expected WorkflowEnd, got {other:?}"),
1761 }
1762 }
1763
1764 #[test]
1765 fn workflow_end_new_wire_shape_round_trips_with_totals() {
1766 // The new wire shape carries both `value` and `total_*` keys.
1767 // Round-trip serialize → deserialize → serialize must preserve
1768 // every field exactly.
1769 let totals = WorkflowTotals {
1770 total_input_tokens: 1234,
1771 total_output_tokens: 567,
1772 total_cached_input_tokens: 100,
1773 total_thinking_tokens: 25,
1774 total_tool_tokens: 0,
1775 total_cost_usd: 0.42,
1776 task_count: 3,
1777 };
1778 let mut obj = std::collections::HashMap::new();
1779 obj.insert("answer".to_string(), Value::Int(42));
1780 let payload = WorkflowEndPayload::with_totals(Value::Object(obj), totals.clone());
1781 let evt = EngineEvent::WorkflowEnd(payload);
1782 let s = serde_json::to_string(&evt).unwrap();
1783 let back: EngineEvent = serde_json::from_str(&s).unwrap();
1784 match back {
1785 EngineEvent::WorkflowEnd(p) => {
1786 assert_eq!(p.totals.total_input_tokens, 1234);
1787 assert_eq!(p.totals.total_output_tokens, 567);
1788 assert_eq!(p.totals.total_cached_input_tokens, 100);
1789 assert_eq!(p.totals.total_thinking_tokens, 25);
1790 assert_eq!(p.totals.task_count, 3);
1791 assert!((p.totals.total_cost_usd - 0.42).abs() < 1e-9);
1792 }
1793 other => panic!("expected WorkflowEnd, got {other:?}"),
1794 }
1795 }
1796
1797 #[test]
1798 fn workflow_totals_accumulate_folds_token_usage() {
1799 // Engine's `emit(TaskEnd)` path delegates to `accumulate`;
1800 // each call adds usage tokens and bumps `task_count` even
1801 // when usage is `None`.
1802 let mut tot = WorkflowTotals::default();
1803 tot.accumulate(Some(&TokenUsage {
1804 input_tokens: 100,
1805 output_tokens: 50,
1806 cached_input_tokens: 10,
1807 reasoning_tokens: 7,
1808 ..Default::default()
1809 }));
1810 tot.accumulate(Some(&TokenUsage {
1811 input_tokens: 200,
1812 output_tokens: 80,
1813 cached_input_tokens: 30,
1814 reasoning_tokens: 3,
1815 ..Default::default()
1816 }));
1817 tot.accumulate(None);
1818 assert_eq!(tot.total_input_tokens, 300);
1819 assert_eq!(tot.total_output_tokens, 130);
1820 assert_eq!(tot.total_cached_input_tokens, 40);
1821 assert_eq!(tot.total_thinking_tokens, 10);
1822 assert_eq!(tot.task_count, 3);
1823 }
1824
1825 #[test]
1826 fn sub_script_flatten_chain_collapses_legacy_nested_envelopes() {
1827 // Build the legacy 3-deep shape:
1828 // SubScript{ name=A, child= SubScript{ name=B, child= SubScript{ name=C, child=Log("hi") } } }
1829 // Expected after `flatten_subscript_chain`:
1830 // SubScript{
1831 // name=C, parent_path=[A, B], child=Log("hi")
1832 // }
1833 let leaf = EngineEvent::Log("hi".into());
1834 let depth3 = EngineEvent::SubScript {
1835 script_name: "C".into(),
1836 parent_task: "c_task".into(),
1837 parent_node_id: Some(3),
1838 attempt: None,
1839 parent_path: Vec::new(),
1840 child: Box::new(leaf),
1841 };
1842 let depth2 = EngineEvent::SubScript {
1843 script_name: "B".into(),
1844 parent_task: "b_task".into(),
1845 parent_node_id: Some(2),
1846 attempt: None,
1847 parent_path: Vec::new(),
1848 child: Box::new(depth3),
1849 };
1850 let legacy = EngineEvent::SubScript {
1851 script_name: "A".into(),
1852 parent_task: "a_task".into(),
1853 parent_node_id: Some(1),
1854 attempt: None,
1855 parent_path: Vec::new(),
1856 child: Box::new(depth2),
1857 };
1858
1859 let flat = legacy.flatten_subscript_chain();
1860 match flat {
1861 EngineEvent::SubScript {
1862 script_name,
1863 parent_task,
1864 parent_node_id,
1865 parent_path,
1866 child,
1867 ..
1868 } => {
1869 // Innermost frame surfaces on the envelope itself.
1870 assert_eq!(script_name, "C");
1871 assert_eq!(parent_task, "c_task");
1872 assert_eq!(parent_node_id, Some(3));
1873 // Ancestor chain: [outermost A, then B].
1874 assert_eq!(parent_path.len(), 2);
1875 assert_eq!(parent_path[0].script_name, "A");
1876 assert_eq!(parent_path[0].parent_task, "a_task");
1877 assert_eq!(parent_path[0].parent_node_id, Some(1));
1878 assert_eq!(parent_path[1].script_name, "B");
1879 assert_eq!(parent_path[1].parent_task, "b_task");
1880 assert_eq!(parent_path[1].parent_node_id, Some(2));
1881 // Leaf is the original Log event.
1882 assert!(matches!(*child, EngineEvent::Log(ref s) if s == "hi"));
1883 }
1884 other => panic!("expected SubScript, got {other:?}"),
1885 }
1886 }
1887
1888 #[test]
1889 fn sub_script_flatten_leaves_already_flat_envelopes_alone() {
1890 // A SubScript whose `child` is already a leaf event is the
1891 // canonical shape — no transformation needed.
1892 let evt = EngineEvent::SubScript {
1893 script_name: "child".into(),
1894 parent_task: "result".into(),
1895 parent_node_id: Some(7),
1896 attempt: Some(1),
1897 parent_path: Vec::new(),
1898 child: Box::new(EngineEvent::Log("hi".into())),
1899 };
1900 let flat = evt.flatten_subscript_chain();
1901 match flat {
1902 EngineEvent::SubScript {
1903 parent_path, child, ..
1904 } => {
1905 assert!(parent_path.is_empty());
1906 assert!(matches!(*child, EngineEvent::Log(_)));
1907 }
1908 other => panic!("expected SubScript, got {other:?}"),
1909 }
1910 }
1911
1912 #[test]
1913 fn sub_script_flatten_no_op_for_non_subscript_events() {
1914 // Non-SubScript events pass through `flatten_subscript_chain`
1915 // unchanged.
1916 let evt = EngineEvent::Log("hi".into());
1917 let flat = evt.flatten_subscript_chain();
1918 assert!(matches!(flat, EngineEvent::Log(ref s) if s == "hi"));
1919 }
1920
1921 #[test]
1922 fn task_end_event_value_type_none_serializes_null() {
1923 let e = EngineEvent::TaskEnd {
1924 task: "t".into(),
1925 on_error_label: None,
1926 value: Value::Null,
1927 value_type: None,
1928 duration: std::time::Duration::from_millis(5),
1929 attempt: 1,
1930 usage: None,
1931 variant: TaskEndVariant::Success,
1932 };
1933 let s = serde_json::to_string(&e).unwrap();
1934 assert!(s.contains("\"attempt\":1"), "attempt should be 1: {}", s);
1935 // value_type: None serializes as null
1936 assert!(
1937 s.contains("\"value_type\":null"),
1938 "value_type should be null: {}",
1939 s
1940 );
1941 }
1942
1943 #[test]
1944 fn task_end_event_without_variant_deserializes_as_success() {
1945 // Wire-compat guard: pre-#206 servers emit `TaskEnd` with no
1946 // `variant` field. `#[serde(default)]` must carry it to `Success`.
1947 //
1948 // The `value` field uses the clean wire form spec'd in
1949 // `docs/src/content/docs/reference/engine-events.mdx` — a bare JSON
1950 // value, not the engine's internal tagged `{"String": "x"}` shape.
1951 let json = r#"{
1952 "type": "TaskEnd",
1953 "payload": {
1954 "task": "t",
1955 "on_error_label": null,
1956 "value": "x",
1957 "value_type": null,
1958 "duration": {"secs": 0, "nanos": 10000000},
1959 "attempt": 1,
1960 "usage": null
1961 }
1962 }"#;
1963 let e: EngineEvent = serde_json::from_str(json).unwrap();
1964 match e {
1965 EngineEvent::TaskEnd { variant, .. } => {
1966 assert_eq!(variant, TaskEndVariant::Success);
1967 }
1968 _ => panic!("expected TaskEnd"),
1969 }
1970 }
1971
1972 #[test]
1973 fn task_end_event_with_unable_variant_roundtrips() {
1974 let e = EngineEvent::TaskEnd {
1975 task: "decompose".into(),
1976 on_error_label: None,
1977 value: Value::Unable(crate::value::UnableRecord {
1978 reason: "image too blurry".into(),
1979 missing: vec!["claim_text".into()],
1980 category: crate::value::UnableCategory::InputAmbiguous,
1981 }),
1982 value_type: None,
1983 duration: std::time::Duration::from_millis(10),
1984 attempt: 1,
1985 usage: None,
1986 variant: TaskEndVariant::Unable,
1987 };
1988 let s = serde_json::to_string(&e).unwrap();
1989 assert!(s.contains("\"variant\":\"unable\""), "{s}");
1990 let back: EngineEvent = serde_json::from_str(&s).unwrap();
1991 match back {
1992 EngineEvent::TaskEnd { variant, .. } => {
1993 assert_eq!(variant, TaskEndVariant::Unable);
1994 }
1995 _ => panic!("expected TaskEnd"),
1996 }
1997 }
1998
1999 #[test]
2000 fn task_end_variant_unknown_discriminator_deserializes_as_unknown() {
2001 // Forward-compat: a newer engine adds (say) `partial` for #205 and
2002 // an older SDK must not crash. `#[serde(other)]` routes the unknown
2003 // tag to `TaskEndVariant::Unknown`.
2004 let json = r#"{
2005 "type": "TaskEnd",
2006 "payload": {
2007 "task": "t",
2008 "on_error_label": null,
2009 "value": null,
2010 "value_type": null,
2011 "duration": {"secs": 0, "nanos": 0},
2012 "attempt": 1,
2013 "usage": null,
2014 "variant": "partial"
2015 }
2016 }"#;
2017 let e: EngineEvent = serde_json::from_str(json).unwrap();
2018 match e {
2019 EngineEvent::TaskEnd { variant, .. } => {
2020 assert_eq!(variant, TaskEndVariant::Unknown);
2021 }
2022 _ => panic!("expected TaskEnd"),
2023 }
2024 }
2025
2026 #[test]
2027 fn suspended_event_with_validation_exhausted_trigger_serializes() {
2028 let e = EngineEvent::Suspended {
2029 checkpoint_name: "human_review".into(),
2030 token: "tok".into(),
2031 prompt: "Please review".into(),
2032 schema: serde_json::json!({"type": "integer"}),
2033 actor_hint: ActorHint::Human,
2034 timeout_secs: Some(3600),
2035 trigger: SuspendTrigger::ValidationExhausted {
2036 task_name: "decompose_claims".into(),
2037 retry_count: 3,
2038 last_attempt: "{\"bad\": true}".into(),
2039 validation_errors: vec![ValidationErrorWire {
2040 stage: "schema".into(),
2041 message: "required property \"number\" missing".into(),
2042 path: Some("/0".into()),
2043 }],
2044 },
2045 loop_context: None,
2046 };
2047 let s = serde_json::to_string(&e).unwrap();
2048 assert!(s.contains("\"kind\":\"ValidationExhausted\""), "{s}");
2049 assert!(s.contains("\"retry_count\":3"), "{s}");
2050 assert!(s.contains("\"task_name\":\"decompose_claims\""), "{s}");
2051 assert!(s.contains("\"stage\":\"schema\""), "{s}");
2052 // `loop_context` is None on a top-level checkpoint; the field
2053 // skips serialization so older SDKs see the same wire shape.
2054 assert!(!s.contains("\"loop_context\""), "{s}");
2055 }
2056
2057 #[test]
2058 fn suspended_event_with_loop_context_roundtrips() {
2059 // Mid-loop suspension carries a `loop_context` envelope that
2060 // SDK consumers use to render the suspension in the loop's UI
2061 // lane and that the spawn handler persists alongside the event.
2062 let e = EngineEvent::Suspended {
2063 checkpoint_name: "review".into(),
2064 token: "tok".into(),
2065 prompt: "Triage skill failure".into(),
2066 schema: serde_json::json!({}),
2067 actor_hint: ActorHint::Human,
2068 timeout_secs: None,
2069 trigger: SuspendTrigger::AgentUnable {
2070 task_name: "summarize".into(),
2071 unable: crate::value::UnableRecord {
2072 reason: "input ambiguous".into(),
2073 missing: vec![],
2074 category: crate::value::UnableCategory::InputAmbiguous,
2075 },
2076 },
2077 loop_context: Some(LoopSuspendContext {
2078 loop_id: "11111111-2222-3333-4444-555555555555".into(),
2079 loop_name: "research".into(),
2080 turn: 2,
2081 }),
2082 };
2083 let s = serde_json::to_string(&e).unwrap();
2084 assert!(s.contains("\"loop_context\""), "{s}");
2085 assert!(s.contains("\"loop_name\":\"research\""), "{s}");
2086 assert!(s.contains("\"turn\":2"), "{s}");
2087 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2088 match back {
2089 EngineEvent::Suspended {
2090 loop_context: Some(ctx),
2091 ..
2092 } => {
2093 assert_eq!(ctx.loop_name, "research");
2094 assert_eq!(ctx.turn, 2);
2095 assert_eq!(ctx.loop_id, "11111111-2222-3333-4444-555555555555");
2096 }
2097 _ => panic!("expected Suspended with loop_context"),
2098 }
2099 }
2100
2101 #[test]
2102 fn suspended_event_without_trigger_deserializes_as_dag_position() {
2103 // Wire-compat guard: old servers / old SDKs omit `trigger`.
2104 // `#[serde(default)]` must carry the field to `DagPosition`.
2105 let json = r#"{
2106 "type": "Suspended",
2107 "payload": {
2108 "checkpoint_name": "cp",
2109 "token": "t",
2110 "prompt": "p",
2111 "schema": {},
2112 "actor_hint": "Human",
2113 "timeout_secs": null
2114 }
2115 }"#;
2116 let e: EngineEvent = serde_json::from_str(json).unwrap();
2117 match e {
2118 EngineEvent::Suspended { trigger, .. } => {
2119 assert!(matches!(trigger, SuspendTrigger::DagPosition));
2120 }
2121 _ => panic!("expected Suspended"),
2122 }
2123 }
2124
2125 #[test]
2126 fn suspended_event_with_dag_position_trigger_roundtrips() {
2127 let e = EngineEvent::Suspended {
2128 checkpoint_name: "cp".into(),
2129 token: "t".into(),
2130 prompt: "p".into(),
2131 schema: serde_json::json!({}),
2132 actor_hint: ActorHint::Human,
2133 timeout_secs: None,
2134 trigger: SuspendTrigger::DagPosition,
2135 loop_context: None,
2136 };
2137 let s = serde_json::to_string(&e).unwrap();
2138 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2139 match back {
2140 EngineEvent::Suspended { trigger, .. } => {
2141 assert!(matches!(trigger, SuspendTrigger::DagPosition));
2142 }
2143 _ => panic!("expected Suspended"),
2144 }
2145 }
2146
2147 #[test]
2148 fn suspend_trigger_agent_variant_roundtrips_with_payload() {
2149 // #226 M5: non-Unable arm of a discriminated union routed to a
2150 // checkpoint emits AgentVariant, carrying the variant name and
2151 // the parsed record as JSON. Studio renders a generic
2152 // "agent returned variant <X>" badge on this trigger.
2153 let trigger = SuspendTrigger::AgentVariant {
2154 task_name: "decompose".into(),
2155 variant: "ClaimErr".into(),
2156 payload: serde_json::json!({
2157 "message": "claim unsupported by evidence",
2158 "claim_id": "c-7",
2159 }),
2160 };
2161 let s = serde_json::to_string(&trigger).unwrap();
2162 assert!(s.contains("\"kind\":\"AgentVariant\""), "{s}");
2163 assert!(s.contains("\"variant\":\"ClaimErr\""), "{s}");
2164 let back: SuspendTrigger = serde_json::from_str(&s).unwrap();
2165 match back {
2166 SuspendTrigger::AgentVariant {
2167 task_name,
2168 variant,
2169 payload,
2170 } => {
2171 assert_eq!(task_name, "decompose");
2172 assert_eq!(variant, "ClaimErr");
2173 assert_eq!(
2174 payload["message"].as_str(),
2175 Some("claim unsupported by evidence"),
2176 );
2177 }
2178 other => panic!("expected AgentVariant, got {other:?}"),
2179 }
2180 }
2181
2182 #[test]
2183 fn validation_failure_event_serializes_full_payload() {
2184 // #320: Studio + tooling consume the structured fields. Round-trip
2185 // ensures missing/extra/type breakdowns and the stop_reason carry
2186 // across the wire without losing shape.
2187 let e = EngineEvent::ValidationFailure {
2188 task_name: "classify_features".into(),
2189 attempt: 2,
2190 model_response: "{}".into(),
2191 truncated: false,
2192 total_length: 2,
2193 missing_fields: vec!["/classifications".into()],
2194 extra_fields: vec![],
2195 type_errors: vec!["expected string, got null at /summary".into()],
2196 stop_reason: Some("max_tokens".into()),
2197 };
2198 let s = serde_json::to_string(&e).unwrap();
2199 assert!(s.contains("\"type\":\"ValidationFailure\""), "{s}");
2200 assert!(s.contains("\"task_name\":\"classify_features\""), "{s}");
2201 assert!(s.contains("\"attempt\":2"), "{s}");
2202 assert!(s.contains("\"model_response\":\"{}\""), "{s}");
2203 assert!(s.contains("\"/classifications\""), "{s}");
2204 assert!(s.contains("\"stop_reason\":\"max_tokens\""), "{s}");
2205
2206 let back: EngineEvent = serde_json::from_str(&s).unwrap();
2207 match back {
2208 EngineEvent::ValidationFailure {
2209 task_name,
2210 attempt,
2211 model_response,
2212 missing_fields,
2213 extra_fields,
2214 type_errors,
2215 stop_reason,
2216 ..
2217 } => {
2218 assert_eq!(task_name, "classify_features");
2219 assert_eq!(attempt, 2);
2220 assert_eq!(model_response, "{}");
2221 assert_eq!(missing_fields, vec!["/classifications"]);
2222 assert!(extra_fields.is_empty());
2223 assert_eq!(type_errors.len(), 1);
2224 assert_eq!(stop_reason.as_deref(), Some("max_tokens"));
2225 }
2226 other => panic!("expected ValidationFailure, got {other:?}"),
2227 }
2228 }
2229
/// `SuspendTrigger::AgentUnable` must survive a JSON round trip with its
/// nested `UnableRecord` payload.
///
/// The serialized form is expected to carry `"kind":"AgentUnable"`, and the
/// decoded value must keep the task name and the unable category.
#[test]
fn suspend_trigger_agent_unable_roundtrips_with_payload() {
    use crate::value::{UnableCategory, UnableRecord};

    // This variant is reserved: Stream 4 starts emitting it in a follow-up.
    // Pinning the shape now means that follow-up only adds an emit site and
    // never has to change the wire format.
    let record = UnableRecord {
        reason: "image too blurry to OCR".into(),
        missing: vec!["claim_text".into()],
        category: UnableCategory::InputAmbiguous,
    };
    let original = SuspendTrigger::AgentUnable {
        task_name: "escalate".into(),
        unable: record,
    };

    let s = serde_json::to_string(&original).unwrap();
    assert!(s.contains("\"kind\":\"AgentUnable\""), "{s}");

    let decoded: SuspendTrigger = serde_json::from_str(&s).unwrap();
    match decoded {
        SuspendTrigger::AgentUnable { task_name, unable } => {
            assert_eq!(task_name, "escalate");
            assert_eq!(unable.category, UnableCategory::InputAmbiguous);
        }
        other => panic!("expected AgentUnable, got {other:?}"),
    }
}
2257
/// An error event built from an explicit `ErrorDetail` must expose that
/// detail's code as a `"code"` field in the serialized JSON.
#[test]
fn error_event_with_code_serializes_with_code_field() {
    use crate::error::{ErrorCode, ErrorDetail};

    let detail = ErrorDetail::new(ErrorCode::ScriptDepthExceeded, "boom");
    let s = serde_json::to_string(&EngineEvent::error(detail)).unwrap();

    let expected_fragment = "\"code\":\"ScriptDepthExceeded\"";
    assert!(s.contains(expected_fragment), "{s}");
}
2267
/// Building an error event from only an `ErrorKind` still produces a
/// `"code"` field on the wire.
///
/// NOTE(review): the test name mentions "other", but the assertion pins the
/// code to `ScriptError` for `ErrorKind::ScriptError`. The name looks stale;
/// confirm before renaming.
#[test]
fn error_event_default_code_is_other_for_kind_only_construction() {
    use crate::error::ErrorKind;

    // `error_kind` derives the code from the kind (via
    // `ErrorDetail::from_kind`), so a ScriptError kind maps to the
    // ScriptError code.
    let kind_only = EngineEvent::error_kind(ErrorKind::ScriptError, "plain");
    let s = serde_json::to_string(&kind_only).unwrap();

    let expected_fragment = "\"code\":\"ScriptError\"";
    assert!(s.contains(expected_fragment), "{s}");
}
2276
/// Round-trips a populated `EngineEvent::ContextCompacted` through JSON.
///
/// First checks the serialized text for the type tag and a set of field
/// fragments, then decodes it and verifies every field of the variant,
/// including the unset `cache_ttl`, against the values that were emitted.
#[test]
fn context_compacted_event_roundtrips_full_payload() {
    let e = EngineEvent::ContextCompacted {
        agent: "researcher".into(),
        loop_id: Some("11111111-2222-3333-4444-555555555555".into()),
        turn: Some(3),
        threshold_pct: Some(70),
        threshold_abs: Some(140_000),
        strategy: "drop_thinking_blocks".into(),
        before_tokens: 142_000,
        after_tokens: 96_000,
        provider_native: false,
        cache_ttl: None,
    };
    let s = serde_json::to_string(&e).unwrap();
    assert!(s.contains("\"type\":\"ContextCompacted\""), "{s}");
    assert!(s.contains("\"agent\":\"researcher\""), "{s}");
    assert!(s.contains("\"strategy\":\"drop_thinking_blocks\""), "{s}");
    assert!(s.contains("\"before_tokens\":142000"), "{s}");
    assert!(s.contains("\"after_tokens\":96000"), "{s}");
    assert!(s.contains("\"provider_native\":false"), "{s}");
    assert!(s.contains("\"turn\":3"), "{s}");

    let back: EngineEvent = serde_json::from_str(&s).unwrap();
    match back {
        EngineEvent::ContextCompacted {
            agent,
            loop_id,
            turn,
            threshold_pct,
            threshold_abs,
            strategy,
            before_tokens,
            after_tokens,
            provider_native,
            cache_ttl,
        } => {
            assert_eq!(agent, "researcher");
            assert_eq!(
                loop_id.as_deref(),
                Some("11111111-2222-3333-4444-555555555555")
            );
            assert_eq!(turn, Some(3));
            assert_eq!(threshold_pct, Some(70));
            assert_eq!(threshold_abs, Some(140_000));
            assert_eq!(strategy, "drop_thinking_blocks");
            assert_eq!(before_tokens, 142_000);
            assert_eq!(after_tokens, 96_000);
            assert!(!provider_native);
            // A TTL that was never set must still be unset after decoding;
            // binding the field (instead of `_`) keeps the check exhaustive.
            assert!(
                cache_ttl.is_none(),
                "cache_ttl should decode as None, got {cache_ttl:?}"
            );
        }
        other => panic!("expected ContextCompacted, got {other:?}"),
    }
}
2330
/// Round-trips a provider-native `EngineEvent::ContextCompacted`.
///
/// In this shape the loop/turn context and thresholds are absent while
/// `provider_native` is set and a `cache_ttl` is supplied. The decoded event
/// must keep the flag, keep the loop/turn context empty, and keep the TTL.
#[test]
fn context_compacted_event_provider_native_roundtrips() {
    // Anthropic / OpenAI server-side compaction surfaces with
    // provider_native = true and the loop/turn context is absent
    // (compaction happens inside a provider call, not at a loop
    // boundary).
    let e = EngineEvent::ContextCompacted {
        agent: "summarizer".into(),
        loop_id: None,
        turn: None,
        threshold_pct: None,
        threshold_abs: None,
        strategy: "provider_native".into(),
        before_tokens: 180_000,
        after_tokens: 42_000,
        provider_native: true,
        cache_ttl: Some("1h".to_string()),
    };
    let s = serde_json::to_string(&e).unwrap();
    assert!(s.contains("\"provider_native\":true"), "{s}");
    assert!(s.contains("\"strategy\":\"provider_native\""), "{s}");
    let back: EngineEvent = serde_json::from_str(&s).unwrap();
    match back {
        EngineEvent::ContextCompacted {
            provider_native,
            loop_id,
            turn,
            cache_ttl,
            ..
        } => {
            assert!(provider_native);
            assert!(loop_id.is_none());
            assert!(turn.is_none());
            // The TTL set above must come back unchanged; otherwise the
            // round trip silently loses data.
            assert_eq!(cache_ttl.as_deref(), Some("1h"));
        }
        other => panic!("expected ContextCompacted, got {other:?}"),
    }
}
2367
/// Round-trips a populated `EngineEvent::ContextOverflow` through JSON.
///
/// The serialized text must contain the type tag and the listed field
/// fragments; the decoded event must match the emitted values.
#[test]
fn context_overflow_event_roundtrips_full_payload() {
    let tried = vec![
        "drop_thinking_blocks".into(),
        "drop_oldest_tool_results".into(),
        "summarize_to_state".into(),
    ];
    let overflow = EngineEvent::ContextOverflow {
        agent: "researcher".into(),
        attempted_strategies: tried,
        configured_cap_tokens: 200_000,
        model_context_window: 200_000,
        terminated_by_hard_error: false,
    };

    let s = serde_json::to_string(&overflow).unwrap();
    for fragment in [
        "\"type\":\"ContextOverflow\"",
        "\"agent\":\"researcher\"",
        "\"configured_cap_tokens\":200000",
        "\"model_context_window\":200000",
        "\"drop_thinking_blocks\"",
    ] {
        assert!(s.contains(fragment), "{s}");
    }

    let decoded: EngineEvent = serde_json::from_str(&s).unwrap();
    match decoded {
        EngineEvent::ContextOverflow {
            agent,
            attempted_strategies,
            configured_cap_tokens,
            model_context_window,
            terminated_by_hard_error,
        } => {
            assert_eq!(agent, "researcher");
            assert_eq!(attempted_strategies.len(), 3);
            assert_eq!(attempted_strategies[0], "drop_thinking_blocks");
            assert_eq!(configured_cap_tokens, 200_000);
            assert_eq!(model_context_window, 200_000);
            assert!(!terminated_by_hard_error);
        }
        other => panic!("expected ContextOverflow, got {other:?}"),
    }
}
2407
/// `EngineEvent::RuntimeStart` serializes as a `type`/`payload` envelope
/// carrying `task_name`, `runtime_name` and `language`, and decodes back to
/// the same values.
#[test]
fn runtime_start_serializes_with_expected_fields() {
    let start = EngineEvent::RuntimeStart {
        task_name: "analyze".into(),
        runtime_name: "run_python".into(),
        language: "python".into(),
    };

    // Structural check on the JSON value.
    let j = serde_json::to_value(&start).unwrap();
    assert_eq!(j["type"], "RuntimeStart");
    for (field, expected) in [
        ("task_name", "analyze"),
        ("runtime_name", "run_python"),
        ("language", "python"),
    ] {
        assert_eq!(j["payload"][field], expected);
    }

    // Text round trip.
    let wire = serde_json::to_string(&start).unwrap();
    let decoded: EngineEvent = serde_json::from_str(&wire).unwrap();
    match decoded {
        EngineEvent::RuntimeStart {
            task_name,
            runtime_name,
            language,
        } => {
            assert_eq!(task_name, "analyze");
            assert_eq!(runtime_name, "run_python");
            assert_eq!(language, "python");
        }
        other => panic!("expected RuntimeStart, got {other:?}"),
    }
}
2436
/// `RuntimeStdout` / `RuntimeStderr` serialize with distinct type tags and
/// their chunk text, and decode back to the same variant.
///
/// The decode half re-encodes the decoded event and compares it with the
/// first encoding, so the check covers every field without depending on the
/// concrete field types.
#[test]
fn runtime_stdout_stderr_roundtrip() {
    let stdout = EngineEvent::RuntimeStdout {
        task_name: "t".into(),
        chunk: "hello\n".into(),
    };
    let stderr = EngineEvent::RuntimeStderr {
        task_name: "t".into(),
        chunk: "warn\n".into(),
    };
    let j_out = serde_json::to_value(&stdout).unwrap();
    let j_err = serde_json::to_value(&stderr).unwrap();
    assert_eq!(j_out["type"], "RuntimeStdout");
    assert_eq!(j_err["type"], "RuntimeStderr");
    assert_eq!(j_out["payload"]["chunk"], "hello\n");
    assert_eq!(j_err["payload"]["chunk"], "warn\n");

    // Decode each value and make sure it lands on the matching variant and
    // re-encodes to the identical JSON.
    let back_out: EngineEvent = serde_json::from_value(j_out.clone()).unwrap();
    assert!(
        matches!(back_out, EngineEvent::RuntimeStdout { .. }),
        "expected RuntimeStdout, got {back_out:?}"
    );
    assert_eq!(serde_json::to_value(&back_out).unwrap(), j_out);

    let back_err: EngineEvent = serde_json::from_value(j_err.clone()).unwrap();
    assert!(
        matches!(back_err, EngineEvent::RuntimeStderr { .. }),
        "expected RuntimeStderr, got {back_err:?}"
    );
    assert_eq!(serde_json::to_value(&back_err).unwrap(), j_err);
}
2454
/// `EngineEvent::RuntimeEnd` exposes `exit_code` and `duration_ms` in its
/// payload and decodes back from the JSON value with both intact.
#[test]
fn runtime_end_serializes_with_exit_code_and_duration() {
    let finished = EngineEvent::RuntimeEnd {
        task_name: "analyze".into(),
        exit_code: 0,
        duration_ms: 1234,
    };

    let j = serde_json::to_value(&finished).unwrap();
    assert_eq!(j["type"], "RuntimeEnd");
    let payload = &j["payload"];
    assert_eq!(payload["exit_code"], 0);
    assert_eq!(payload["duration_ms"], 1234);

    // Decode straight from the value rather than going through text.
    let decoded: EngineEvent = serde_json::from_value(j).unwrap();
    match decoded {
        EngineEvent::RuntimeEnd {
            exit_code,
            duration_ms,
            ..
        } => {
            assert_eq!(exit_code, 0);
            assert_eq!(duration_ms, 1234);
        }
        other => panic!("expected RuntimeEnd, got {other:?}"),
    }
}
2480
/// `EngineEvent::RuntimeError` serializes with its type tag and with `kind`
/// and `message` inside the payload.
#[test]
fn runtime_error_serializes_with_kind_and_message() {
    let failure = EngineEvent::RuntimeError {
        task_name: "t".into(),
        kind: "Timeout".into(),
        message: "exceeded 30s".into(),
    };

    let j = serde_json::to_value(&failure).unwrap();
    assert_eq!(j["type"], "RuntimeError");

    let payload = &j["payload"];
    assert_eq!(payload["kind"], "Timeout");
    assert_eq!(payload["message"], "exceeded 30s");
}
2493
/// `EngineEvent::TaskCacheHit` carries `agent` and `key_prefix` in its
/// payload and round-trips through JSON text.
#[test]
fn task_cache_hit_serializes_with_agent_and_key_prefix() {
    // P3: the engine emits this event when `PersistentTaskCache::get`
    // returns a hit, so trace inspectors / Studio / MCP can report
    // "stage N was cached" without digging into prompt-segment internals.
    let hit = EngineEvent::TaskCacheHit {
        agent: "Researcher".into(),
        key_prefix: "f7d3a9".into(),
    };

    let j = serde_json::to_value(&hit).unwrap();
    assert_eq!(j["type"], "TaskCacheHit");
    for (field, expected) in [("agent", "Researcher"), ("key_prefix", "f7d3a9")] {
        assert_eq!(j["payload"][field], expected);
    }

    // Text round trip back into the enum.
    let wire = serde_json::to_string(&hit).unwrap();
    let decoded: EngineEvent = serde_json::from_str(&wire).unwrap();
    match decoded {
        EngineEvent::TaskCacheHit { agent, key_prefix } => {
            assert_eq!(agent, "Researcher");
            assert_eq!(key_prefix, "f7d3a9");
        }
        other => panic!("expected TaskCacheHit, got {other:?}"),
    }
}
2519
/// A model response larger than `VALIDATION_FAILURE_RESPONSE_CAP_BYTES` is
/// cut down by `EngineEvent::validation_failure`: the event is flagged as
/// truncated, `total_length` reports the original byte count, and the stored
/// response fits within the cap.
#[test]
fn validation_failure_caps_oversized_response() {
    // Issue #1139: capping at emit time keeps megabyte-sized tool outputs
    // from bloating execution_events.
    let oversized = "x".repeat(VALIDATION_FAILURE_RESPONSE_CAP_BYTES + 1024);
    let original_len = oversized.len() as u64;

    let capped = EngineEvent::validation_failure(
        String::from("t"),
        1,
        oversized,
        Vec::new(),
        Vec::new(),
        Vec::new(),
        None,
    );

    match capped {
        EngineEvent::ValidationFailure {
            model_response,
            truncated,
            total_length,
            ..
        } => {
            assert!(truncated, "should be truncated");
            assert_eq!(total_length, original_len);
            assert!(model_response.len() <= VALIDATION_FAILURE_RESPONSE_CAP_BYTES);
        }
        other => panic!("expected ValidationFailure, got {other:?}"),
    }
}
2550
/// A short model response passes through `EngineEvent::validation_failure`
/// untouched: not flagged as truncated, `total_length` equals its byte
/// length, and the stored text is identical to the input.
#[test]
fn validation_failure_under_cap_is_unchanged() {
    let body = String::from("{\"k\": \"v\"}");
    let expected_len = body.len() as u64;

    let small = EngineEvent::validation_failure(
        String::from("t"),
        1,
        body.clone(),
        Vec::new(),
        Vec::new(),
        Vec::new(),
        None,
    );

    match small {
        EngineEvent::ValidationFailure {
            model_response,
            truncated,
            total_length,
            ..
        } => {
            assert!(!truncated);
            assert_eq!(total_length, expected_len);
            assert_eq!(model_response, body);
        }
        other => panic!("expected ValidationFailure, got {other:?}"),
    }
}
2577
/// Round-trips `EngineEvent::ToolApprovalResolved` through JSON text and
/// verifies all four fields, including the `args_override` payload, against
/// the values that were emitted.
#[test]
fn tool_approval_resolved_round_trips() {
    // Issue #857: audit trail companion to ToolApprovalPending.
    let override_args = serde_json::json!({"safe": true});
    let ev = EngineEvent::ToolApprovalResolved {
        token: "tok_abc".into(),
        approved: true,
        args_override: Some(override_args.clone()),
        reason: Some("operator approved".into()),
    };
    let s = serde_json::to_string(&ev).unwrap();
    let back: EngineEvent = serde_json::from_str(&s).unwrap();
    match back {
        EngineEvent::ToolApprovalResolved {
            token,
            approved,
            args_override,
            reason,
        } => {
            assert_eq!(token, "tok_abc");
            assert!(approved);
            // The override arguments are set above, so a round trip that
            // dropped or altered them must fail here.
            assert_eq!(args_override, Some(override_args));
            assert_eq!(reason.as_deref(), Some("operator approved"));
        }
        other => panic!("expected ToolApprovalResolved, got {other:?}"),
    }
}
2603
/// Round-trips `EngineEvent::ToolApprovalSkipped` through JSON text and
/// checks that `tool_ref` and `reason` come back unchanged.
#[test]
fn tool_approval_skipped_round_trips() {
    // Issue #1110: closes the audit gap for auto-approved tool calls.
    let skipped = EngineEvent::ToolApprovalSkipped {
        execution_id: Some("exec_1".into()),
        node_id: Some(7),
        tool_ref: "gh.list_issues".into(),
        reason: "policy:read_only".into(),
    };

    let wire = serde_json::to_string(&skipped).unwrap();
    let decoded: EngineEvent = serde_json::from_str(&wire).unwrap();

    match decoded {
        EngineEvent::ToolApprovalSkipped {
            tool_ref, reason, ..
        } => {
            assert_eq!(tool_ref, "gh.list_issues");
            assert_eq!(reason, "policy:read_only");
        }
        other => panic!("expected ToolApprovalSkipped, got {other:?}"),
    }
}
2625
/// Round-trips `EngineEvent::ToolReplayUncertain` through JSON text: the
/// type tag must appear in the serialized form and every field, including
/// the JSON `args`, must decode to the emitted value.
#[test]
fn tool_replay_uncertain_round_trips() {
    // Issue #872: structured event for durable-replay tool ambiguity.
    let expected_args = serde_json::json!({"channel": "general", "text": "hi"});
    let uncertain = EngineEvent::ToolReplayUncertain {
        execution_id: Some("exec_42".into()),
        tool_use_id: "tu_abc".into(),
        tool_name: "send_message".into(),
        args: expected_args.clone(),
    };

    let s = serde_json::to_string(&uncertain).unwrap();
    assert!(s.contains("\"type\":\"ToolReplayUncertain\""), "{s}");

    let decoded: EngineEvent = serde_json::from_str(&s).unwrap();
    match decoded {
        EngineEvent::ToolReplayUncertain {
            execution_id,
            tool_use_id,
            tool_name,
            args,
        } => {
            assert_eq!(execution_id.as_deref(), Some("exec_42"));
            assert_eq!(tool_use_id, "tu_abc");
            assert_eq!(tool_name, "send_message");
            assert_eq!(args, expected_args);
        }
        other => panic!("expected ToolReplayUncertain, got {other:?}"),
    }
}
2654
/// Round-trips `EngineEvent::LLMReplayCacheHit` through JSON text and checks
/// that `node_id` and `call_index` come back unchanged.
#[test]
fn llm_replay_cache_hit_round_trips() {
    // Issue #815: explicit replay-cache-hit signal for SDK consumers.
    let hit = EngineEvent::LLMReplayCacheHit {
        node_id: "n42".into(),
        call_index: 3,
    };

    let wire = serde_json::to_string(&hit).unwrap();
    let decoded: EngineEvent = serde_json::from_str(&wire).unwrap();

    match decoded {
        EngineEvent::LLMReplayCacheHit {
            node_id,
            call_index,
        } => {
            assert_eq!(node_id, "n42");
            assert_eq!(call_index, 3);
        }
        other => panic!("expected LLMReplayCacheHit, got {other:?}"),
    }
}
2675
/// `EngineEvent::LoopTurn` keeps its per-turn `usage` across a JSON round
/// trip when one is supplied, and leaves the `usage` key out of the
/// serialized form entirely when it is `None`.
#[test]
fn loop_turn_carries_usage_when_present_and_omits_when_none() {
    // Issue #829: per-turn token usage on LoopTurn lets consumers get
    // per-turn cost without walking the LLMResponse sub-tree.
    let per_turn = TokenUsage {
        input_tokens: 100,
        output_tokens: 25,
        model: "claude".into(),
        provider: "anthropic".into(),
        ..Default::default()
    };
    let with_usage = EngineEvent::LoopTurn {
        name: "review".into(),
        turn: 4,
        tool_calls: vec!["fetch".into()],
        usage: Some(per_turn),
    };

    let wire = serde_json::to_string(&with_usage).unwrap();
    let decoded: EngineEvent = serde_json::from_str(&wire).unwrap();
    match decoded {
        EngineEvent::LoopTurn { usage, .. } => {
            let u = usage.expect("usage present");
            assert_eq!(u.input_tokens, 100);
            assert_eq!(u.output_tokens, 25);
        }
        other => panic!("expected LoopTurn, got {other:?}"),
    }

    // With no usage attached, the key must not appear on the wire at all.
    let without_usage = EngineEvent::LoopTurn {
        name: "review".into(),
        turn: 1,
        tool_calls: Vec::new(),
        usage: None,
    };
    let s2 = serde_json::to_string(&without_usage).unwrap();
    let mentions_usage = s2.contains("\"usage\"");
    assert!(
        !mentions_usage,
        "usage should be skipped when None: {s2}"
    );
}
2715
/// `EngineEvent::SubScript` keeps `parent_node_id` and `attempt` across a
/// JSON round trip when both are set.
#[test]
fn sub_script_carries_parent_node_id_and_attempt() {
    // Issue #845: parent retry attribution on SubScript envelopes.
    let envelope = EngineEvent::SubScript {
        script_name: "child".into(),
        parent_task: "result".into(),
        parent_node_id: Some(13),
        attempt: Some(2),
        parent_path: Vec::new(),
        child: Box::new(EngineEvent::Log("hi".into())),
    };

    let wire = serde_json::to_string(&envelope).unwrap();
    let decoded: EngineEvent = serde_json::from_str(&wire).unwrap();

    match decoded {
        EngineEvent::SubScript {
            parent_node_id,
            attempt,
            ..
        } => {
            assert_eq!(parent_node_id, Some(13));
            assert_eq!(attempt, Some(2));
        }
        other => panic!("expected SubScript, got {other:?}"),
    }
}
2742
/// When `parent_node_id` and `attempt` are `None`, the serialized
/// `EngineEvent::SubScript` must not mention either key.
#[test]
fn sub_script_back_compat_omits_new_fields_when_none() {
    // Both fields are skipped on the wire when unset, which keeps the
    // serialized shape the same as pre-#845 payloads.
    let envelope = EngineEvent::SubScript {
        script_name: "child".into(),
        parent_task: "result".into(),
        parent_node_id: None,
        attempt: None,
        parent_path: Vec::new(),
        child: Box::new(EngineEvent::Log("hi".into())),
    };

    let s = serde_json::to_string(&envelope).unwrap();
    for absent_key in ["\"parent_node_id\"", "\"attempt\""] {
        assert!(
            !s.contains(absent_key),
            "should be omitted when None: {s}"
        );
    }
}
2767
/// `TokenUsage::raw_stop_reason` is written to the wire next to
/// `stop_reason`, and both decode back to the emitted values.
#[test]
fn token_usage_raw_stop_reason_round_trips() {
    // Issue #1077: `raw_stop_reason` is a newer field that mirrors
    // `stop_reason` when the parse path produced the usage. It has to be
    // serialized alongside the canonical field and survive a round trip.
    let usage = TokenUsage {
        input_tokens: 10,
        output_tokens: 5,
        model: "claude-sonnet-4-6".into(),
        provider: "anthropic".into(),
        stop_reason: Some("end_turn".into()),
        raw_stop_reason: Some("end_turn".into()),
        ..Default::default()
    };

    let s = serde_json::to_string(&usage).unwrap();
    assert!(s.contains("\"raw_stop_reason\":\"end_turn\""), "{s}");

    let decoded: TokenUsage = serde_json::from_str(&s).unwrap();
    assert_eq!(decoded.raw_stop_reason.as_deref(), Some("end_turn"));
    assert_eq!(decoded.stop_reason.as_deref(), Some("end_turn"));
}
2789
/// A `TokenUsage` payload that has no `raw_stop_reason` key still decodes:
/// the missing field becomes `None` and `stop_reason` is read as usual.
#[test]
fn token_usage_raw_stop_reason_back_compat_pre_field() {
    // Wire payloads from before #1077 omit `raw_stop_reason` entirely;
    // decoding must yield `None` rather than fail.
    let legacy_payload = r#"{
        "input_tokens": 10,
        "output_tokens": 5,
        "model": "m",
        "provider": "p",
        "cached_input_tokens": 0,
        "stop_reason": "stop"
    }"#;

    let decoded = serde_json::from_str::<TokenUsage>(legacy_payload).unwrap();
    assert_eq!(decoded.raw_stop_reason, None);
    assert_eq!(decoded.stop_reason.as_deref(), Some("stop"));
}
2806}