Skip to main content

akribes_sdk/
events.rs

1//! Normalized, client-friendly workflow events.
2//!
3//! The raw wire type is [`akribes_types::event::EngineEvent`] (re-exported as
4//! [`crate::EngineEvent`]) — use that for layer-1 consumers who want every
5//! variant exactly as the engine emits it. For most workflow-driving
6//! consumers, prefer [`WorkflowEvent`]: it types the high-traffic variants
7//! with real fields and collapses the long tail into a single [`Other`]
8//! variant so the SDK stays forward-compatible when the engine adds new
9//! events.
10//!
11//! [`Other`]: WorkflowEvent::Other
12
13use std::collections::HashMap;
14use std::time::Duration;
15
16use akribes_types::event::{EngineEvent, TokenUsage};
17
18use crate::models::engine_event_type_name;
19use crate::runtime::{
20    RuntimeEndPayload, RuntimeErrorPayload, RuntimeEvent, RuntimeStartPayload,
21    RuntimeStderrPayload, RuntimeStdoutPayload,
22};
23use crate::suspend::SuspendTrigger;
24use crate::task_end::TaskEndVariant;
25
/// A normalized, client-friendly engine event.
///
/// High-traffic variants (`Start`, `End`, `TaskStart`, `TaskEnd`,
/// `AgentChunk`, `ToolCallStart`, `ToolCallEnd`, the three suspensions,
/// `Error`, `ValidationFailure` and the five `Runtime*` events) are typed
/// with real fields. Everything else is collapsed into
/// [`Other`](Self::Other), which preserves the wire type name and the raw
/// JSON payload so consumers can still inspect it.
#[derive(Debug, Clone)]
pub enum WorkflowEvent {
    /// The workflow began. Projected from `EngineEvent::WorkflowStart`.
    Start {
        /// The count carried by `EngineEvent::WorkflowStart`.
        total_tasks: usize,
    },
    /// The workflow finished. Projected from `EngineEvent::WorkflowEnd`.
    End {
        /// The workflow's final value, converted to JSON.
        output: serde_json::Value,
        /// The `From<EngineEvent>` projection always sets this to
        /// `Duration::ZERO`: `WorkflowEnd` carries no duration on the
        /// wire. Use `ExecutionsClient::get` for wall-clock timing.
        duration: Duration,
        /// Aggregate token + cost rollup across every `TaskEnd` in the
        /// workflow scope (issue #1173). Always present in the typed
        /// projection — when the upstream `EngineEvent::WorkflowEnd`
        /// carries the default `WorkflowTotals` (legacy bare-value
        /// wire shape, or a workflow that ran no `TaskEnd`s), every
        /// field is zero. Consumers can pull total tokens off this
        /// without re-walking the per-task stream.
        totals: akribes_types::event::WorkflowTotals,
    },
    /// A task began. Projected from `EngineEvent::TaskStart`.
    TaskStart {
        /// First tuple field of `EngineEvent::TaskStart`.
        task: String,
        /// Second tuple field of `EngineEvent::TaskStart`, passed through
        /// unchanged.
        on_error: Option<String>,
    },
    /// A task finished. Projected from `EngineEvent::TaskEnd`; the engine's
    /// `on_error_label`, `value_type` and `attempt` fields are not surfaced.
    TaskEnd {
        task: String,
        /// The task's value, converted to JSON.
        output: serde_json::Value,
        duration: Duration,
        /// Token usage as carried by the engine event, when present.
        usage: Option<TokenUsage>,
        /// How the task finished. `Success` for the ordinary path; `Unable`
        /// when the agent emitted a canonical `{"unable": ...}` envelope and
        /// the flow's `on unable <target>` trailer (or default `fail`) took
        /// over. `Unknown` is the forward-compat catch-all for variants a
        /// newer akribes-core might add — see [`TaskEndVariant`].
        variant: TaskEndVariant,
    },
    /// One chunk of agent output. Projected from `EngineEvent::AgentOutput`;
    /// the engine's `schema_type` field is not surfaced.
    AgentChunk {
        /// The engine's `task_name`.
        task: String,
        /// The engine's `agent_name`.
        agent: Option<String>,
        task_id: String,
        chunk: String,
    },
    /// A tool call began. Projected from `EngineEvent::ToolCallStart`; any
    /// engine fields not listed here are dropped.
    ToolCallStart {
        /// The engine's `task_name`.
        task: String,
        /// The engine's `tool_name`.
        tool: String,
        /// The engine's `server_name`.
        server: String,
        input: serde_json::Value,
    },
    /// A tool call finished. Projected from `EngineEvent::ToolCallEnd`; any
    /// engine fields not listed here are dropped.
    ToolCallEnd {
        /// The engine's `task_name`.
        task: String,
        /// The engine's `tool_name`.
        tool: String,
        output: serde_json::Value,
        duration: Duration,
    },
    /// The engine suspended at a checkpoint. Projected from
    /// `EngineEvent::Suspended`; the engine's `actor_hint` and
    /// `loop_context` fields are not surfaced.
    Checkpoint {
        /// The engine's `checkpoint_name`.
        name: String,
        token: String,
        prompt: String,
        schema: serde_json::Value,
        timeout_secs: Option<u64>,
        /// Why the engine suspended. `DagPosition` for a plain
        /// `checkpoint cp(...)` call site; `ValidationExhausted` /
        /// `AgentUnable` when a task-level gate routed here; `Unknown` for
        /// discriminants added in a newer akribes-core the SDK doesn't yet
        /// know about (forward-compat; see [`crate::suspend`]).
        trigger: SuspendTrigger,
    },
    /// A tool invocation is awaiting approval. Projected field-for-field
    /// from `EngineEvent::ToolApprovalPending`.
    ToolApproval {
        token: String,
        tool_ref: String,
        args: serde_json::Value,
        execution_id: Option<String>,
        node_id: Option<u64>,
    },
    /// The engine stopped at a breakpoint. Projected from
    /// `EngineEvent::Breakpoint`; the engine's `span` is not surfaced.
    Breakpoint {
        token: String,
        /// The engine's `node_id`, widened to `u64`.
        node_id: u64,
        /// The engine's `env_snapshot` with every value converted to JSON.
        env: HashMap<String, serde_json::Value>,
    },
    /// The workflow failed. Projected from `EngineEvent::Error`; engine
    /// fields beyond `message`, `kind` and `code` are dropped.
    Error {
        message: String,
        kind: akribes_types::error::ErrorKind,
        /// Stable diagnostic code (e.g. `"AKRIBES-E-SCRIPT-DEPTH"`). Mirrored
        /// from `akribes_types::event::EngineEvent::Error.code`. `None` on
        /// legacy errors without a registered code (#429).
        ///
        /// NOTE(review): the `From<EngineEvent>` projection in this file
        /// always fills this with `Some(..)` — confirm whether `None` is
        /// still reachable through another construction path.
        code: Option<String>,
    },
    /// A structured-output task's response failed validation. Mirrors
    /// `akribes_types::event::EngineEvent::ValidationFailure`. Emitted in
    /// addition to the existing `Log` line so consumers without this
    /// variant still render the human-readable summary, but tooling that
    /// knows about the variant can render the model's actual response,
    /// the schema-validator's structured error breakdown, and the
    /// provider's `stop_reason` (so e.g. a `max_tokens` truncation isn't
    /// misdiagnosed as "schema overflow" — see issue #320).
    ///
    /// The engine's `truncated` and `total_length` fields are not surfaced.
    ValidationFailure {
        task_name: String,
        /// 1-indexed attempt number.
        attempt: u32,
        /// Raw text / JSON the validator saw, exactly as the model emitted.
        model_response: String,
        /// JSON-pointer paths to required fields the schema validator
        /// flagged as absent.
        missing_fields: Vec<String>,
        /// Paths to fields rejected by `additionalProperties: false`.
        extra_fields: Vec<String>,
        /// Human-readable type / value mismatches (e.g. `"expected string,
        /// got null at /name"`).
        type_errors: Vec<String>,
        /// Provider's stop_reason when known. `None` for streaming paths
        /// that don't surface usage.
        stop_reason: Option<String>,
    },
    /// A `runtime` block began dispatching to the sandbox executor.
    /// Mirrors `EngineEvent::RuntimeStart`. `task_name` matches the
    /// wrapping task's name so reducers that group by task continue to
    /// work; `runtime_name` is the source-declared block name.
    RuntimeStart {
        task_name: String,
        runtime_name: String,
        /// `"python" | "bash" | "node" | "rust" | "java"`. Free-form
        /// string on the wire so a future language doesn't require an
        /// SDK release.
        language: String,
    },
    /// One chunk of stdout from a running `runtime` block. Many may fire
    /// per invocation; consumers should accumulate.
    RuntimeStdout {
        task_name: String,
        chunk: String,
    },
    /// One chunk of stderr from a running `runtime` block.
    RuntimeStderr {
        task_name: String,
        chunk: String,
    },
    /// A `runtime` block completed (the executor returned an
    /// `ExecResult`). A non-zero `exit_code` is still a `RuntimeEnd` —
    /// infrastructure failures (timeout / OOM / unreachable sandbox)
    /// emit [`Self::RuntimeError`] instead.
    RuntimeEnd {
        task_name: String,
        exit_code: i32,
        duration_ms: u64,
    },
    /// A `runtime` block failed to complete. `kind` is a stable wire
    /// string mirroring the engine's `RuntimeError` enum
    /// (`NotConfigured` / `Timeout` / `SandboxUnavailable` / `OomKilled`
    /// / `Internal`); use [`crate::runtime::RuntimeErrorKind::from_wire`]
    /// to pattern-match without re-parsing the string.
    RuntimeError {
        task_name: String,
        kind: String,
        message: String,
    },
    /// Catch-all for variants that don't need dedicated fields in the SDK:
    /// `StateUpdate`, `Log`, `NodeStart`, `NodeEnd`, `Resumed`,
    /// `BreakpointResumed`, `McpServerDegraded`, `McpServerRecovered`,
    /// `TaskPrompt`, `VerificationStart`, `VerificationResult`.
    ///
    /// Preserves the original wire type name and JSON payload for consumers
    /// who want to reach in and pick them apart.
    Other {
        /// Wire type name, as reported by `engine_event_type_name`.
        type_name: String,
        /// The event serialised with `serde_json::to_value`;
        /// `serde_json::Value::Null` if serialisation failed.
        payload: serde_json::Value,
    },
}
197
/// Coarse category tag for a [`WorkflowEvent`], useful for routing (e.g.
/// "show only progress in the status bar", "write tool events to a log").
///
/// The mapping from event to category lives in
/// [`WorkflowEvent::category`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventCategory {
    /// Lifecycle bookends: `Start`, `End`, `TaskStart`, `TaskEnd`,
    /// `RuntimeStart`, `RuntimeEnd`.
    Progress,
    /// Streamed or user-facing content: `AgentChunk`, `RuntimeStdout`,
    /// `RuntimeStderr`, `ValidationFailure`.
    Output,
    /// `ToolCallStart` and `ToolCallEnd`.
    Tool,
    /// Suspensions: `Checkpoint`, `ToolApproval`, `Breakpoint`.
    Suspend,
    /// `Error` and `RuntimeError`.
    Error,
    /// The long-tail `WorkflowEvent::Other` variant.
    Other,
}
209
210impl WorkflowEvent {
211    /// Coarse routing category for this event.
212    pub fn category(&self) -> EventCategory {
213        match self {
214            Self::Start { .. }
215            | Self::End { .. }
216            | Self::TaskStart { .. }
217            | Self::TaskEnd { .. } => EventCategory::Progress,
218            // `RuntimeStart` and `RuntimeEnd` are the bookends for a
219            // single sandbox dispatch — they sit alongside `TaskStart` /
220            // `TaskEnd` rather than producing user-visible output, so
221            // they group with Progress for status-bar consumers.
222            Self::RuntimeStart { .. } | Self::RuntimeEnd { .. } => EventCategory::Progress,
223            Self::AgentChunk { .. } => EventCategory::Output,
224            // Runtime stdout/stderr are the equivalent of `AgentChunk`
225            // for the executor path — typed text the consumer is meant
226            // to surface to the user as it streams in.
227            Self::RuntimeStdout { .. } | Self::RuntimeStderr { .. } => EventCategory::Output,
228            Self::ValidationFailure { .. } => EventCategory::Output,
229            Self::ToolCallStart { .. } | Self::ToolCallEnd { .. } => EventCategory::Tool,
230            Self::Checkpoint { .. } | Self::ToolApproval { .. } | Self::Breakpoint { .. } => {
231                EventCategory::Suspend
232            }
233            Self::Error { .. } => EventCategory::Error,
234            // Sandbox infrastructure failure (timeout / OOM / unreachable)
235            // is an in-task error — the wrapping `TaskEnd` is what marks
236            // the workflow's outcome. Categorise as Error so consumers
237            // that filter by category still see it surface.
238            Self::RuntimeError { .. } => EventCategory::Error,
239            Self::Other { .. } => EventCategory::Other,
240        }
241    }
242
243    /// Whether this event is terminal for a workflow run. Used by
244    /// [`crate::RunStream`] to decide when to stop yielding.
245    ///
246    /// `Runtime*` events are NOT terminal — they live inside a task that
247    /// is itself bookended by `TaskStart` / `TaskEnd`, and the workflow
248    /// only ends on the outer `End` / `Error` events.
249    pub(crate) fn is_terminal(&self) -> bool {
250        matches!(self, Self::End { .. } | Self::Error { .. })
251    }
252}
253
254impl From<EngineEvent> for WorkflowEvent {
255    fn from(evt: EngineEvent) -> Self {
256        // #993 back-compat: legacy emissions wrapped each call-stack
257        // level in its own SubScript envelope. The post-#993 emit path
258        // already flattens before sending, but old persisted event logs
259        // may still nest. Normalize once at the SDK boundary so
260        // downstream consumers always see the flat `parent_path` shape.
261        let evt = evt.flatten_subscript_chain();
262        match evt {
263            EngineEvent::WorkflowStart(total_tasks) => Self::Start { total_tasks },
264
265            EngineEvent::WorkflowEnd(payload) => Self::End {
266                output: payload.value.to_json(),
267                // `WorkflowEnd` on the wire has no duration field — the
268                // server-side timing lives in `ExecutionStatus`. Expose
269                // `Duration::ZERO` so the struct still has a non-optional
270                // field; consumers who need wall-clock duration should use
271                // `ExecutionsClient::get`.
272                duration: Duration::ZERO,
273                totals: payload.totals,
274            },
275
276            EngineEvent::TaskStart(task, on_error) => Self::TaskStart { task, on_error },
277
278            EngineEvent::TaskEnd {
279                task,
280                on_error_label: _,
281                value,
282                value_type: _,
283                duration,
284                attempt: _,
285                usage,
286                variant,
287            } => Self::TaskEnd {
288                task,
289                output: value.to_json(),
290                duration,
291                usage,
292                variant: variant.into(),
293            },
294
295            EngineEvent::AgentOutput {
296                task_name,
297                agent_name,
298                task_id,
299                schema_type: _,
300                chunk,
301            } => Self::AgentChunk {
302                task: task_name,
303                agent: agent_name,
304                task_id,
305                chunk,
306            },
307
308            EngineEvent::ToolCallStart {
309                task_name,
310                tool_name,
311                server_name,
312                input,
313                ..
314            } => Self::ToolCallStart {
315                task: task_name,
316                tool: tool_name,
317                server: server_name,
318                input,
319            },
320
321            EngineEvent::ToolCallEnd {
322                task_name,
323                tool_name,
324                output,
325                duration,
326                ..
327            } => Self::ToolCallEnd {
328                task: task_name,
329                tool: tool_name,
330                output,
331                duration,
332            },
333
334            EngineEvent::Suspended {
335                checkpoint_name,
336                token,
337                prompt,
338                schema,
339                actor_hint: _,
340                timeout_secs,
341                trigger,
342                // The loop context lives on the engine event for the
343                // server's persistence path; it is not surfaced through
344                // the public WorkflowEvent today (consumers reading the
345                // SDK's typed event stream are happy with the existing
346                // `Checkpoint` shape — Studio reads the raw EngineEvent
347                // when it needs the loop_id). Drop here.
348                loop_context: _,
349            } => Self::Checkpoint {
350                name: checkpoint_name,
351                token,
352                prompt,
353                schema,
354                timeout_secs,
355                trigger: trigger.into(),
356            },
357
358            EngineEvent::ToolApprovalPending {
359                execution_id,
360                node_id,
361                token,
362                tool_ref,
363                args,
364            } => Self::ToolApproval {
365                token,
366                tool_ref,
367                args,
368                execution_id,
369                node_id,
370            },
371
372            EngineEvent::Breakpoint {
373                node_id,
374                span: _,
375                token,
376                env_snapshot,
377            } => Self::Breakpoint {
378                token,
379                node_id: node_id as u64,
380                env: env_snapshot
381                    .into_iter()
382                    .map(|(k, v)| (k, v.to_json()))
383                    .collect(),
384            },
385
386            // Project the engine's richer envelope down to the SDK's
387            // legacy two-fields-plus-string-code shape. The full detail
388            // (`user_message`, `retry_after_ms`, `source`) is available on
389            // the underlying `EngineEvent` for SDK consumers that read the
390            // raw event stream.
391            EngineEvent::Error {
392                message,
393                kind,
394                code,
395                ..
396            } => Self::Error {
397                message,
398                kind,
399                code: Some(code.as_wire().to_string()),
400            },
401
402            EngineEvent::ValidationFailure {
403                task_name,
404                attempt,
405                model_response,
406                missing_fields,
407                extra_fields,
408                type_errors,
409                stop_reason,
410                truncated: _,
411                total_length: _,
412            } => Self::ValidationFailure {
413                task_name,
414                attempt,
415                model_response,
416                missing_fields,
417                extra_fields,
418                type_errors,
419                stop_reason,
420            },
421
422            // Long tail — anything we don't type explicitly becomes `Other`
423            // with the wire name and raw JSON preserved. This keeps the SDK
424            // forward-compatible: when akribes-core adds a new EngineEvent
425            // variant, existing consumers of WorkflowEvent keep compiling
426            // and see it as `Other` instead of breaking.
427            other => {
428                let type_name = engine_event_type_name(&other).to_string();
429                let payload = serde_json::to_value(&other).unwrap_or(serde_json::Value::Null);
430                Self::Other { type_name, payload }
431            }
432        }
433    }
434}
435
436// ── RuntimeEvent → WorkflowEvent ────────────────────────────────────────────
437//
438// Lets [`WorkflowEvent::from_envelope_json`] (and any future helper) materialise
439// typed runtime arms without touching `EngineEvent`. When `EngineEvent` gains
440// the corresponding `Runtime*` variants in akribes-core (unit 3), the
441// `From<EngineEvent>` impl above can match them directly and call into these
442// same arms.
443
444impl From<RuntimeEvent> for WorkflowEvent {
445    fn from(evt: RuntimeEvent) -> Self {
446        match evt {
447            RuntimeEvent::RuntimeStart(RuntimeStartPayload {
448                task_name,
449                runtime_name,
450                language,
451            }) => Self::RuntimeStart {
452                task_name,
453                runtime_name,
454                language,
455            },
456            RuntimeEvent::RuntimeStdout(RuntimeStdoutPayload { task_name, chunk }) => {
457                Self::RuntimeStdout { task_name, chunk }
458            }
459            RuntimeEvent::RuntimeStderr(RuntimeStderrPayload { task_name, chunk }) => {
460                Self::RuntimeStderr { task_name, chunk }
461            }
462            RuntimeEvent::RuntimeEnd(RuntimeEndPayload {
463                task_name,
464                exit_code,
465                duration_ms,
466            }) => Self::RuntimeEnd {
467                task_name,
468                exit_code,
469                duration_ms,
470            },
471            RuntimeEvent::RuntimeError(RuntimeErrorPayload {
472                task_name,
473                kind,
474                message,
475            }) => Self::RuntimeError {
476                task_name,
477                kind,
478                message,
479            },
480        }
481    }
482}
483
484// ── Wire-envelope decoder ──────────────────────────────────────────────────
485//
486// Today's main decoder is `From<EngineEvent>`: every JSON envelope is parsed
487// to `EngineEvent` first and then projected to `WorkflowEvent`. That works for
488// every variant `akribes-core` already knows about, but the engine's
489// `Runtime*` variants ship in a sibling unit (#3) — until those land, JSON
490// envelopes carrying `"type": "RuntimeStart"` and friends would fail to
491// deserialise as `EngineEvent` and surface to the consumer as a parse error.
492//
493// `from_envelope_json` runs the runtime decoder first: it accepts any
494// `{type, payload}` JSON and returns a typed `WorkflowEvent` arm if the
495// envelope matches one of the five canonical `Runtime*` types. Anything else
496// falls back through `EngineEvent` deserialisation (preserving the existing
497// `Other` long-tail behaviour). The result is that the SDK can emit typed
498// runtime arms today without waiting on the engine merge, and once unit 3
499// lands the `EngineEvent` decoder catches the same envelopes at the same
500// place — both paths produce equivalent typed `WorkflowEvent` values.
501
/// Decode error returned by [`WorkflowEvent::from_envelope_json`].
///
/// Both variants wrap the underlying `serde_json::Error`, which is shown
/// in the `Display` message and also exposed as the error source. The
/// variant records which decoder rejected the envelope.
#[derive(Debug, thiserror::Error)]
pub enum EnvelopeDecodeError {
    /// The envelope's `"type"` was a recognised `Runtime*` tag but the
    /// payload failed to deserialise — wire shape mismatch.
    #[error("invalid Runtime envelope: {0}")]
    Runtime(#[source] serde_json::Error),
    /// The envelope did not match the runtime decoder *or* the engine
    /// decoder. The wrapped error is from the `EngineEvent` parser.
    #[error("failed to decode engine event: {0}")]
    Engine(#[source] serde_json::Error),
}
514
515impl WorkflowEvent {
516    /// Decode a raw `{type, payload}` JSON envelope into a typed
517    /// [`WorkflowEvent`].
518    ///
519    /// Tries the `Runtime*` decoder first (5 canonical types from
520    /// `crates/akribes-core/src/event.rs`). If the envelope's `"type"` is
521    /// not one of those, it falls back to deserialising the JSON as
522    /// [`EngineEvent`] and routing through the existing
523    /// [`From<EngineEvent>`] projection.
524    ///
525    /// Returns [`EnvelopeDecodeError::Engine`] if both paths fail. The
526    /// runtime decoder only errors when the `"type"` *was* a runtime tag
527    /// but the payload shape was wrong — that surfaces as
528    /// [`EnvelopeDecodeError::Runtime`] and is not retried via the engine
529    /// path (a payload-shape mismatch on a known runtime tag is the only
530    /// way the runtime arm could be lossy, so we surface it explicitly).
531    pub fn from_envelope_json(value: serde_json::Value) -> Result<Self, EnvelopeDecodeError> {
532        // Peek at the wire `"type"` to decide which decoder to invoke.
533        // We deliberately don't pre-validate the shape — let the typed
534        // decoders error normally.
535        let type_tag = value.get("type").and_then(|t| t.as_str()).unwrap_or("");
536        if matches!(
537            type_tag,
538            "RuntimeStart" | "RuntimeStdout" | "RuntimeStderr" | "RuntimeEnd" | "RuntimeError"
539        ) {
540            let runtime: RuntimeEvent =
541                serde_json::from_value(value).map_err(EnvelopeDecodeError::Runtime)?;
542            return Ok(runtime.into());
543        }
544        let engine: EngineEvent =
545            serde_json::from_value(value).map_err(EnvelopeDecodeError::Engine)?;
546        Ok(engine.into())
547    }
548}
549
550#[cfg(test)]
551mod tests {
552    use super::*;
553    use akribes_types::ast::Span;
554    use akribes_types::error::ErrorKind;
555    use akribes_types::value::Value;
556
    /// Builds a placeholder [`Span`] with every coordinate set to 1, for
    /// engine events that require a span whose value the test ignores.
    fn span() -> Span {
        Span {
            line: 1,
            col: 1,
            end_line: 1,
            end_col: 1,
        }
    }
565
566    #[test]
567    fn start_and_end_map_to_progress() {
568        let start: WorkflowEvent = EngineEvent::WorkflowStart(5).into();
569        assert!(matches!(start, WorkflowEvent::Start { total_tasks: 5 }));
570        assert_eq!(start.category(), EventCategory::Progress);
571
572        let end: WorkflowEvent = EngineEvent::WorkflowEnd(
573            akribes_types::event::WorkflowEndPayload::new(Value::String("done".into())),
574        )
575        .into();
576        match end {
577            WorkflowEvent::End { output, .. } => {
578                assert_eq!(output, serde_json::Value::String("done".into()));
579            }
580            _ => panic!("expected End"),
581        }
582    }
583
584    #[test]
585    fn agent_output_maps_to_chunk() {
586        let evt: WorkflowEvent = EngineEvent::AgentOutput {
587            task_name: "summarise".into(),
588            agent_name: Some("gpt".into()),
589            task_id: "t1".into(),
590            schema_type: None,
591            chunk: "hi".into(),
592        }
593        .into();
594        match evt {
595            WorkflowEvent::AgentChunk {
596                task,
597                agent,
598                task_id,
599                chunk,
600            } => {
601                assert_eq!(task, "summarise");
602                assert_eq!(agent.as_deref(), Some("gpt"));
603                assert_eq!(task_id, "t1");
604                assert_eq!(chunk, "hi");
605            }
606            _ => panic!("expected AgentChunk"),
607        }
608    }
609
610    #[test]
611    fn tool_calls_map_to_tool_category() {
612        let start: WorkflowEvent = EngineEvent::ToolCallStart {
613            task_name: "t".into(),
614            tool_name: "web".into(),
615            server_name: "s".into(),
616            input: serde_json::json!({"q": "hi"}),
617            tool_use_id: String::new(),
618        }
619        .into();
620        assert_eq!(start.category(), EventCategory::Tool);
621
622        let end: WorkflowEvent = EngineEvent::ToolCallEnd {
623            task_name: "t".into(),
624            tool_name: "web".into(),
625            tool_use_id: String::new(),
626            output: serde_json::json!({"r": "ok"}),
627            duration: Duration::from_millis(10),
628        }
629        .into();
630        assert_eq!(end.category(), EventCategory::Tool);
631    }
632
633    #[test]
634    fn suspended_maps_to_checkpoint() {
635        let evt: WorkflowEvent = EngineEvent::Suspended {
636            checkpoint_name: "approve".into(),
637            token: "tok".into(),
638            prompt: "please".into(),
639            schema: serde_json::json!({}),
640            actor_hint: akribes_types::ast::ActorHint::Any,
641            timeout_secs: Some(30),
642            trigger: akribes_types::event::SuspendTrigger::DagPosition,
643            loop_context: None,
644        }
645        .into();
646        assert_eq!(evt.category(), EventCategory::Suspend);
647        match evt {
648            WorkflowEvent::Checkpoint {
649                name,
650                token,
651                timeout_secs,
652                trigger,
653                ..
654            } => {
655                assert_eq!(name, "approve");
656                assert_eq!(token, "tok");
657                assert_eq!(timeout_secs, Some(30));
658                assert!(matches!(trigger, SuspendTrigger::DagPosition));
659            }
660            _ => panic!("expected Checkpoint"),
661        }
662    }
663
664    #[test]
665    fn suspended_with_validation_exhausted_trigger_survives_translation() {
666        let evt: WorkflowEvent = EngineEvent::Suspended {
667            checkpoint_name: "review".into(),
668            token: "tok".into(),
669            prompt: "please review".into(),
670            schema: serde_json::json!({}),
671            actor_hint: akribes_types::ast::ActorHint::Human,
672            timeout_secs: None,
673            trigger: akribes_types::event::SuspendTrigger::ValidationExhausted {
674                task_name: "decompose".into(),
675                retry_count: 3,
676                last_attempt: "{\"bad\":true}".into(),
677                validation_errors: vec![akribes_types::event::ValidationErrorWire {
678                    stage: "schema".into(),
679                    message: "missing number".into(),
680                    path: Some("/0".into()),
681                }],
682            },
683            loop_context: None,
684        }
685        .into();
686        match evt {
687            WorkflowEvent::Checkpoint { trigger, .. } => match trigger {
688                SuspendTrigger::ValidationExhausted {
689                    task_name,
690                    retry_count,
691                    validation_errors,
692                    ..
693                } => {
694                    assert_eq!(task_name, "decompose");
695                    assert_eq!(retry_count, 3);
696                    assert_eq!(validation_errors.len(), 1);
697                    assert_eq!(validation_errors[0].stage, "schema");
698                }
699                other => panic!("expected ValidationExhausted, got {other:?}"),
700            },
701            _ => panic!("expected Checkpoint"),
702        }
703    }
704
705    #[test]
706    fn suspended_with_agent_unable_trigger_survives_translation() {
707        let evt: WorkflowEvent = EngineEvent::Suspended {
708            checkpoint_name: "escalate".into(),
709            token: "tok".into(),
710            prompt: "take over".into(),
711            schema: serde_json::json!({}),
712            actor_hint: akribes_types::ast::ActorHint::Human,
713            timeout_secs: None,
714            trigger: akribes_types::event::SuspendTrigger::AgentUnable {
715                task_name: "decompose".into(),
716                unable: akribes_types::value::UnableRecord {
717                    reason: "image too blurry".into(),
718                    missing: vec!["claim_text".into()],
719                    category: akribes_types::value::UnableCategory::InputAmbiguous,
720                },
721            },
722            loop_context: None,
723        }
724        .into();
725        match evt {
726            WorkflowEvent::Checkpoint { trigger, .. } => match trigger {
727                SuspendTrigger::AgentUnable { task_name, unable } => {
728                    assert_eq!(task_name, "decompose");
729                    assert_eq!(unable.reason, "image too blurry");
730                    assert_eq!(unable.category, "input_ambiguous");
731                    assert_eq!(unable.missing, vec!["claim_text".to_string()]);
732                }
733                other => panic!("expected AgentUnable, got {other:?}"),
734            },
735            _ => panic!("expected Checkpoint"),
736        }
737    }
738
739    #[test]
740    fn tool_approval_has_suspend_category() {
741        let evt: WorkflowEvent = EngineEvent::ToolApprovalPending {
742            execution_id: Some("exec".into()),
743            node_id: Some(1),
744            token: "tk".into(),
745            tool_ref: "web".into(),
746            args: serde_json::json!({}),
747        }
748        .into();
749        assert_eq!(evt.category(), EventCategory::Suspend);
750    }
751
752    #[test]
753    fn log_has_other_category() {
754        let evt: WorkflowEvent = EngineEvent::Log("hello".into()).into();
755        assert_eq!(evt.category(), EventCategory::Other);
756    }
757
758    #[test]
759    fn tool_approval_maps() {
760        let evt: WorkflowEvent = EngineEvent::ToolApprovalPending {
761            execution_id: Some("e1".into()),
762            node_id: Some(42),
763            token: "tok".into(),
764            tool_ref: "web.search".into(),
765            args: serde_json::json!({"q": "hi"}),
766        }
767        .into();
768        match evt {
769            WorkflowEvent::ToolApproval {
770                token,
771                tool_ref,
772                node_id,
773                ..
774            } => {
775                assert_eq!(token, "tok");
776                assert_eq!(tool_ref, "web.search");
777                assert_eq!(node_id, Some(42));
778            }
779            _ => panic!("expected ToolApproval"),
780        }
781    }
782
783    #[test]
784    fn breakpoint_casts_node_id() {
785        let mut env = std::collections::HashMap::new();
786        env.insert("x".to_string(), Value::Int(7));
787        let evt: WorkflowEvent = EngineEvent::Breakpoint {
788            node_id: 3usize,
789            span: span(),
790            token: "tok".into(),
791            env_snapshot: env,
792        }
793        .into();
794        match evt {
795            WorkflowEvent::Breakpoint {
796                token,
797                node_id,
798                env,
799            } => {
800                assert_eq!(token, "tok");
801                assert_eq!(node_id, 3u64);
802                assert_eq!(env.get("x"), Some(&serde_json::json!(7)));
803            }
804            _ => panic!("expected Breakpoint"),
805        }
806    }
807
808    #[test]
809    fn error_maps_to_error_category() {
810        let evt: WorkflowEvent = EngineEvent::error_kind(ErrorKind::ScriptError, "boom").into();
811        assert_eq!(evt.category(), EventCategory::Error);
812        match evt {
813            WorkflowEvent::Error { message, kind, .. } => {
814                assert_eq!(message, "boom");
815                assert_eq!(kind, ErrorKind::ScriptError);
816            }
817            _ => panic!("expected Error"),
818        }
819    }
820
821    #[test]
822    fn task_end_preserves_usage() {
823        let usage = TokenUsage {
824            input_tokens: 10,
825            output_tokens: 20,
826            model: "m".into(),
827            provider: "p".into(),
828            cached_input_tokens: 0,
829            cache_write_input_tokens: 0,
830            cache_write_5m_input_tokens: 0,
831            cache_write_1h_input_tokens: 0,
832            stop_reason: None,
833            raw_stop_reason: None,
834            reasoning_tokens: 0,
835        };
836        let evt: WorkflowEvent = EngineEvent::TaskEnd {
837            task: "t".into(),
838            on_error_label: None,
839            value: Value::String("ok".into()),
840            value_type: None,
841            duration: Duration::from_millis(100),
842            attempt: 1,
843            usage: Some(usage),
844            variant: akribes_types::event::TaskEndVariant::Success,
845        }
846        .into();
847        match evt {
848            WorkflowEvent::TaskEnd {
849                task,
850                usage,
851                duration,
852                variant,
853                ..
854            } => {
855                assert_eq!(task, "t");
856                assert_eq!(duration, Duration::from_millis(100));
857                assert_eq!(usage.unwrap().input_tokens, 10);
858                assert_eq!(variant, TaskEndVariant::Success);
859            }
860            _ => panic!("expected TaskEnd"),
861        }
862    }
863
864    #[test]
865    fn task_end_propagates_unable_variant() {
866        // Wave-1 #206: `variant` on TaskEnd distinguishes Unable from
867        // Success without consumers having to re-parse the value payload.
868        let evt: WorkflowEvent = EngineEvent::TaskEnd {
869            task: "decompose".into(),
870            on_error_label: None,
871            value: Value::Unable(akribes_types::value::UnableRecord {
872                reason: "image too blurry".into(),
873                missing: vec![],
874                category: akribes_types::value::UnableCategory::InputAmbiguous,
875            }),
876            value_type: None,
877            duration: Duration::from_millis(10),
878            attempt: 1,
879            usage: None,
880            variant: akribes_types::event::TaskEndVariant::Unable,
881        }
882        .into();
883        match evt {
884            WorkflowEvent::TaskEnd { variant, .. } => {
885                assert_eq!(variant, TaskEndVariant::Unable);
886            }
887            _ => panic!("expected TaskEnd"),
888        }
889    }
890
891    // ── Long-tail variants fall into Other ──────────────────────────────────
892
893    fn assert_other_named(evt: EngineEvent, expected: &str) {
894        let wf: WorkflowEvent = evt.into();
895        match wf {
896            WorkflowEvent::Other { type_name, .. } => {
897                assert_eq!(type_name, expected);
898            }
899            _ => panic!("expected Other({}), got {:?}", expected, wf),
900        }
901    }
902
903    #[test]
904    fn log_is_other() {
905        assert_other_named(EngineEvent::Log("hi".into()), "Log");
906    }
907
908    #[test]
909    fn state_update_is_other() {
910        assert_other_named(
911            EngineEvent::StateUpdate("x".into(), Value::Int(1)),
912            "StateUpdate",
913        );
914    }
915
916    #[test]
917    fn node_start_end_are_other() {
918        assert_other_named(EngineEvent::NodeStart(0, span()), "NodeStart");
919        assert_other_named(
920            EngineEvent::NodeEnd {
921                node_id: 0,
922                span: span(),
923                target_var: None,
924                value: None,
925                duration: Duration::ZERO,
926            },
927            "NodeEnd",
928        );
929    }
930
931    #[test]
932    fn resumed_is_other() {
933        assert_other_named(
934            EngineEvent::Resumed {
935                checkpoint_name: "c".into(),
936                token: "t".into(),
937            },
938            "Resumed",
939        );
940    }
941
942    #[test]
943    fn breakpoint_resumed_is_other() {
944        assert_other_named(
945            EngineEvent::BreakpointResumed {
946                node_id: 1,
947                token: "t".into(),
948            },
949            "BreakpointResumed",
950        );
951    }
952
953    #[test]
954    fn mcp_degraded_recovered_are_other() {
955        assert_other_named(
956            EngineEvent::McpServerDegraded {
957                alias: "a".into(),
958                reason: "r".into(),
959            },
960            "McpServerDegraded",
961        );
962        assert_other_named(
963            EngineEvent::McpServerRecovered { alias: "a".into() },
964            "McpServerRecovered",
965        );
966    }
967
968    #[test]
969    fn task_prompt_is_other() {
970        assert_other_named(
971            EngineEvent::TaskPrompt("t".into(), "p".into()),
972            "TaskPrompt",
973        );
974    }
975
976    #[test]
977    fn verification_events_are_other() {
978        assert_other_named(
979            EngineEvent::VerificationStart {
980                workflow_name: "w".into(),
981            },
982            "VerificationStart",
983        );
984        assert_other_named(
985            EngineEvent::VerificationResult {
986                workflow_name: "w".into(),
987                results: serde_json::json!({}),
988                duration: Duration::ZERO,
989            },
990            "VerificationResult",
991        );
992    }
993
994    #[test]
995    fn other_payload_preserves_type_tag() {
996        let evt: WorkflowEvent = EngineEvent::Log("hello".into()).into();
997        match evt {
998            WorkflowEvent::Other { type_name, payload } => {
999                assert_eq!(type_name, "Log");
1000                assert_eq!(payload["type"], "Log");
1001                assert_eq!(payload["payload"], "hello");
1002            }
1003            _ => panic!("expected Other"),
1004        }
1005    }
1006
1007    #[test]
1008    fn validation_failure_maps_to_typed_variant() {
1009        let evt: WorkflowEvent = EngineEvent::ValidationFailure {
1010            task_name: "decompose".into(),
1011            attempt: 2,
1012            model_response: "{}".into(),
1013            missing_fields: vec!["/claim_text".into()],
1014            extra_fields: vec![],
1015            type_errors: vec![],
1016            stop_reason: Some("max_tokens".into()),
1017            truncated: false,
1018            total_length: 2,
1019        }
1020        .into();
1021        match evt {
1022            WorkflowEvent::ValidationFailure {
1023                task_name,
1024                attempt,
1025                model_response,
1026                missing_fields,
1027                extra_fields,
1028                type_errors,
1029                stop_reason,
1030            } => {
1031                assert_eq!(task_name, "decompose");
1032                assert_eq!(attempt, 2);
1033                assert_eq!(model_response, "{}");
1034                assert_eq!(missing_fields, vec!["/claim_text".to_string()]);
1035                assert!(extra_fields.is_empty());
1036                assert!(type_errors.is_empty());
1037                assert_eq!(stop_reason.as_deref(), Some("max_tokens"));
1038            }
1039            other => panic!("expected ValidationFailure, got {:?}", other),
1040        }
1041    }
1042
1043    #[test]
1044    fn validation_failure_has_output_category() {
1045        let evt: WorkflowEvent = EngineEvent::ValidationFailure {
1046            task_name: "t".into(),
1047            attempt: 1,
1048            model_response: "".into(),
1049            missing_fields: vec![],
1050            extra_fields: vec![],
1051            type_errors: vec![],
1052            stop_reason: None,
1053            truncated: false,
1054            total_length: 0,
1055        }
1056        .into();
1057        assert_eq!(evt.category(), EventCategory::Output);
1058    }
1059
1060    // ── Runtime* variants ───────────────────────────────────────────────────
1061    //
1062    // Field-level decode roundtrips live in `tests/runtime_events.rs` (via the
1063    // JSON envelope decoder). These lib tests cover what that crate can't:
1064    // the direct `RuntimeEvent → WorkflowEvent` projection (`From` impl) and
1065    // the `pub(crate)` `is_terminal()` invariant.
1066
1067    use crate::runtime::{
1068        RuntimeEndPayload, RuntimeErrorPayload, RuntimeEvent, RuntimeStartPayload,
1069        RuntimeStderrPayload, RuntimeStdoutPayload,
1070    };
1071
1072    #[test]
1073    fn runtime_event_projects_every_variant() {
1074        let cases: [(RuntimeEvent, fn(&WorkflowEvent) -> bool); 5] = [
1075            (
1076                RuntimeEvent::RuntimeStart(RuntimeStartPayload {
1077                    task_name: "t".into(),
1078                    runtime_name: "r".into(),
1079                    language: "python".into(),
1080                }),
1081                |e| matches!(e, WorkflowEvent::RuntimeStart { language, .. } if language == "python"),
1082            ),
1083            (
1084                RuntimeEvent::RuntimeStdout(RuntimeStdoutPayload {
1085                    task_name: "t".into(),
1086                    chunk: "x".into(),
1087                }),
1088                |e| matches!(e, WorkflowEvent::RuntimeStdout { chunk, .. } if chunk == "x"),
1089            ),
1090            (
1091                RuntimeEvent::RuntimeStderr(RuntimeStderrPayload {
1092                    task_name: "t".into(),
1093                    chunk: "x".into(),
1094                }),
1095                |e| matches!(e, WorkflowEvent::RuntimeStderr { .. }),
1096            ),
1097            (
1098                RuntimeEvent::RuntimeEnd(RuntimeEndPayload {
1099                    task_name: "t".into(),
1100                    exit_code: 0,
1101                    duration_ms: 4242,
1102                }),
1103                |e| {
1104                    matches!(
1105                        e,
1106                        WorkflowEvent::RuntimeEnd {
1107                            duration_ms: 4242,
1108                            ..
1109                        }
1110                    )
1111                },
1112            ),
1113            (
1114                RuntimeEvent::RuntimeError(RuntimeErrorPayload {
1115                    task_name: "t".into(),
1116                    kind: "Timeout".into(),
1117                    message: "x".into(),
1118                }),
1119                |e| matches!(e, WorkflowEvent::RuntimeError { kind, .. } if kind == "Timeout"),
1120            ),
1121        ];
1122        for (input, check) in cases {
1123            let evt: WorkflowEvent = input.into();
1124            assert!(check(&evt), "projection failed: {evt:?}");
1125        }
1126    }
1127
1128    #[test]
1129    fn runtime_events_are_not_terminal() {
1130        // Runtime* events sit inside a task; the workflow terminates only
1131        // on the outer End/Error. RunStream must not stop on them.
1132        let events = [
1133            WorkflowEvent::RuntimeStart {
1134                task_name: "t".into(),
1135                runtime_name: "r".into(),
1136                language: "python".into(),
1137            },
1138            WorkflowEvent::RuntimeStdout {
1139                task_name: "t".into(),
1140                chunk: "x".into(),
1141            },
1142            WorkflowEvent::RuntimeStderr {
1143                task_name: "t".into(),
1144                chunk: "x".into(),
1145            },
1146            WorkflowEvent::RuntimeEnd {
1147                task_name: "t".into(),
1148                exit_code: 0,
1149                duration_ms: 0,
1150            },
1151            WorkflowEvent::RuntimeError {
1152                task_name: "t".into(),
1153                kind: "Timeout".into(),
1154                message: "x".into(),
1155            },
1156        ];
1157        for evt in events {
1158            assert!(!evt.is_terminal(), "{evt:?} should not be terminal");
1159        }
1160    }
1161}