Skip to main content

axon/
flow_execution_event.rs

1//! §Fase 33.b — Flow execution event stream (Layer 1: data-flow integrity).
2//!
3//! D2 ratificada: `ServerExecutionResult` is replaced by an event-stream
4//! return type — the runner emits each event AS IT OCCURS, the SSE
5//! handler consumes the stream and forwards directly to the wire.
6//!
7//! ## The closed event-shape catalog
8//!
9//! Every observable moment in a flow's execution is one of five
10//! events. The catalog is **closed** — adding a new variant requires a
11//! D-letter amendment, not a runtime-only patch. Cross-stack drift gate
12//! locks the JSON shape so Python + Rust agree on every field byte-for-
13//! byte (`tests/fixtures/fase33_flow_execution_event/corpus.json`).
14//!
15//! - **FlowStart** — emitted once before any step. Establishes the
16//!   trace identity + chosen backend.
17//! - **StepStart** — emitted once per step at its boundary. Carries the
18//!   step's source-declared `step_type` so adopters can correlate the
19//!   wire event back to the AST.
20//! - **StepToken** — emitted per chunk produced by the step's
21//!   underlying backend. For streaming backends (Anthropic SSE,
22//!   OpenAI SSE, …) this fires per chunk AS THE BYTE ARRIVES.
23//!   For non-streaming backends (stub, future deterministic-only),
24//!   this fires once with the full step output (post-completion).
25//! - **StepComplete** — emitted once per step at its end boundary.
26//!   Carries the full output text + token-input/output counters.
27//! - **FlowComplete** — terminator (success path). Receiver MUST
28//!   treat this as the stream's end.
29//! - **FlowError** — terminator (failure path). Receiver MUST treat
30//!   this as the stream's end.
31//!
32//! ## Pillar trace per D2 + D10
33//!
34//! - **MATHEMATICS** — the catalog is a closed sum type; pattern matching
35//!   is exhaustive. Adding a sixth variant breaks the build cross-stack.
36//! - **LOGIC** — the receiver invariant is precise: exactly one
37//!   `FlowStart`, followed by per-step (`StepStart` → 0..N
38//!   `StepToken` → `StepComplete`), followed by exactly one
39//!   `FlowComplete` OR `FlowError`. Any sequence violating this
40//!   invariant is a producer bug, not a consumer concern.
41//! - **PHILOSOPHY** — the source declaration IS the runtime contract:
42//!   every `step S { ... }` declaration produces a `StepStart` /
43//!   `StepComplete` pair at runtime, named identically.
44//! - **COMPUTING** — events are JSON-serializable + clonable + the
45//!   stream is a `tokio::sync::mpsc::UnboundedReceiver`; the
46//!   producer never blocks the executor on a slow consumer (33.b
47//!   layer; backpressure policy from `<stream:<policy>>` is honored
48//!   in 33.e).
49
50use serde::{Deserialize, Serialize};
51
52/// One observable moment in a flow's execution. Closed catalog per D2.
53///
54/// Field naming + JSON serde-rename match the Python mirror
55/// (`axon/runtime/flow_execution_event.py`) byte-for-byte. The drift
56/// gate at `tests/fixtures/fase33_flow_execution_event/corpus.json`
57/// asserts both stacks produce byte-identical JSON for each variant.
58#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
59#[serde(tag = "kind", rename_all = "snake_case")]
60pub enum FlowExecutionEvent {
61    /// Emitted exactly once at the very start of execution.
62    FlowStart {
63        flow_name: String,
64        backend: String,
65        timestamp_ms: u64,
66    },
67    /// Emitted exactly once per step at its start boundary.
68    StepStart {
69        step_name: String,
70        step_index: usize,
71        step_type: String,
72        timestamp_ms: u64,
73    },
74    /// Emitted per token / chunk produced by the step's underlying
75    /// backend. The granularity matches the backend's chunk size
76    /// (Anthropic SSE delta, OpenAI streaming chunk, etc.). For
77    /// non-streaming backends, fires once per step with the full
78    /// output (post-StepComplete in practice; the catalog allows it
79    /// to fire either before or after but the convention is during
80    /// execution).
81    StepToken {
82        step_name: String,
83        content: String,
84        /// Monotonic counter, per-flow. Restarts at 1 for each new
85        /// FlowStart. Adopter clients use this to correlate
86        /// `Last-Event-ID` resumes (W3C SSE spec).
87        token_index: u64,
88        timestamp_ms: u64,
89    },
90    /// Emitted exactly once per step at its end boundary.
91    StepComplete {
92        step_name: String,
93        step_index: usize,
94        success: bool,
95        full_output: String,
96        tokens_input: u64,
97        tokens_output: u64,
98        timestamp_ms: u64,
99    },
100    /// §Fase 33.y.k — Tool invocation chunk. Emitted by per-step
101    /// handlers when the upstream backend's chunk stream signals
102    /// `FinishReason::ToolUse` (provider invoked a tool mid-stream).
103    /// Closed-catalog event variant; D4 byte-compat preserves
104    /// adopter parsers — flows without declared `apply: <tool>`
105    /// never emit this event.
106    ///
107    /// `content` carries the tool-call's structured payload as a
108    /// canonical wire-stable string (provider-specific shape today;
109    /// future Fase 33.y.k.2 standardizes per-provider extraction
110    /// into a unified `tool_call_id + arguments` schema).
111    ToolCall {
112        step_name: String,
113        tool_name: String,
114        content: String,
115        timestamp_ms: u64,
116    },
117    /// Terminator — success path. Receiver MUST close the stream.
118    FlowComplete {
119        flow_name: String,
120        backend: String,
121        success: bool,
122        steps_executed: usize,
123        tokens_input: u64,
124        tokens_output: u64,
125        latency_ms: u64,
126        timestamp_ms: u64,
127    },
128    /// Terminator — failure path. Receiver MUST close the stream.
129    FlowError {
130        flow_name: String,
131        error: String,
132        timestamp_ms: u64,
133    },
134}
135
136impl FlowExecutionEvent {
137    /// Closed predicate: is this the terminator of the stream? After
138    /// emitting a terminator, the producer MUST drop the sender so
139    /// the receiver's `recv()` returns `None`.
140    pub fn is_terminator(&self) -> bool {
141        matches!(
142            self,
143            FlowExecutionEvent::FlowComplete { .. } | FlowExecutionEvent::FlowError { .. }
144        )
145    }
146
147    /// Closed predicate: is this event step-scoped (carries
148    /// `step_name`)?
149    pub fn is_step_scoped(&self) -> bool {
150        matches!(
151            self,
152            FlowExecutionEvent::StepStart { .. }
153                | FlowExecutionEvent::StepToken { .. }
154                | FlowExecutionEvent::StepComplete { .. }
155                | FlowExecutionEvent::ToolCall { .. }
156        )
157    }
158
159    /// String kind for diagnostic / log lines. Matches the JSON
160    /// `kind` discriminator field (`snake_case` per serde-rename).
161    pub fn kind(&self) -> &'static str {
162        match self {
163            FlowExecutionEvent::FlowStart { .. } => "flow_start",
164            FlowExecutionEvent::StepStart { .. } => "step_start",
165            FlowExecutionEvent::StepToken { .. } => "step_token",
166            FlowExecutionEvent::StepComplete { .. } => "step_complete",
167            FlowExecutionEvent::ToolCall { .. } => "tool_call",
168            FlowExecutionEvent::FlowComplete { .. } => "flow_complete",
169            FlowExecutionEvent::FlowError { .. } => "flow_error",
170        }
171    }
172}
173
174/// Current Unix-milliseconds timestamp. Helper used by producers
175/// emitting events.
176pub fn now_ms() -> u64 {
177    std::time::SystemTime::now()
178        .duration_since(std::time::UNIX_EPOCH)
179        .map(|d| d.as_millis() as u64)
180        .unwrap_or(0)
181}
182
183#[cfg(test)]
184mod tests {
185    use super::*;
186
187    fn ev_flow_start() -> FlowExecutionEvent {
188        FlowExecutionEvent::FlowStart {
189            flow_name: "F".to_string(),
190            backend: "stub".to_string(),
191            timestamp_ms: 1_000_000,
192        }
193    }
194
195    fn ev_step_token() -> FlowExecutionEvent {
196        FlowExecutionEvent::StepToken {
197            step_name: "S".to_string(),
198            content: "hello".to_string(),
199            token_index: 1,
200            timestamp_ms: 1_000_001,
201        }
202    }
203
204    fn ev_flow_complete() -> FlowExecutionEvent {
205        FlowExecutionEvent::FlowComplete {
206            flow_name: "F".to_string(),
207            backend: "stub".to_string(),
208            success: true,
209            steps_executed: 1,
210            tokens_input: 0,
211            tokens_output: 1,
212            latency_ms: 50,
213            timestamp_ms: 1_000_010,
214        }
215    }
216
217    #[test]
218    fn flow_start_serializes_with_kind_discriminator() {
219        let s = serde_json::to_string(&ev_flow_start()).unwrap();
220        // Externally tagged enum with kind = "flow_start"; field order
221        // matches Python mirror.
222        assert!(s.contains(r#""kind":"flow_start""#));
223        assert!(s.contains(r#""flow_name":"F""#));
224        assert!(s.contains(r#""backend":"stub""#));
225        assert!(s.contains(r#""timestamp_ms":1000000"#));
226    }
227
228    #[test]
229    fn step_token_serializes_with_token_index() {
230        let s = serde_json::to_string(&ev_step_token()).unwrap();
231        assert!(s.contains(r#""kind":"step_token""#));
232        assert!(s.contains(r#""token_index":1"#));
233        assert!(s.contains(r#""content":"hello""#));
234    }
235
236    #[test]
237    fn flow_complete_serializes_with_latency_ms() {
238        let s = serde_json::to_string(&ev_flow_complete()).unwrap();
239        assert!(s.contains(r#""kind":"flow_complete""#));
240        assert!(s.contains(r#""steps_executed":1"#));
241        assert!(s.contains(r#""latency_ms":50"#));
242        assert!(s.contains(r#""success":true"#));
243    }
244
245    #[test]
246    fn round_trip_through_json_preserves_every_variant() {
247        let cases = vec![
248            ev_flow_start(),
249            FlowExecutionEvent::StepStart {
250                step_name: "S".to_string(),
251                step_index: 0,
252                step_type: "step".to_string(),
253                timestamp_ms: 1,
254            },
255            ev_step_token(),
256            FlowExecutionEvent::StepComplete {
257                step_name: "S".to_string(),
258                step_index: 0,
259                success: true,
260                full_output: "hello world".to_string(),
261                tokens_input: 0,
262                tokens_output: 2,
263                timestamp_ms: 2,
264            },
265            ev_flow_complete(),
266            FlowExecutionEvent::FlowError {
267                flow_name: "F".to_string(),
268                error: "boom".to_string(),
269                timestamp_ms: 3,
270            },
271        ];
272        for e in cases {
273            let s = serde_json::to_string(&e).unwrap();
274            let back: FlowExecutionEvent = serde_json::from_str(&s).unwrap();
275            assert_eq!(back, e, "round-trip MUST preserve every variant");
276        }
277    }
278
279    #[test]
280    fn is_terminator_predicate_is_total() {
281        assert!(!ev_flow_start().is_terminator());
282        assert!(!ev_step_token().is_terminator());
283        assert!(!FlowExecutionEvent::StepStart {
284            step_name: "S".to_string(),
285            step_index: 0,
286            step_type: "step".to_string(),
287            timestamp_ms: 0,
288        }
289        .is_terminator());
290        assert!(!FlowExecutionEvent::StepComplete {
291            step_name: "S".to_string(),
292            step_index: 0,
293            success: true,
294            full_output: "".to_string(),
295            tokens_input: 0,
296            tokens_output: 0,
297            timestamp_ms: 0,
298        }
299        .is_terminator());
300        assert!(ev_flow_complete().is_terminator());
301        assert!(FlowExecutionEvent::FlowError {
302            flow_name: "F".to_string(),
303            error: "x".to_string(),
304            timestamp_ms: 0,
305        }
306        .is_terminator());
307    }
308
309    #[test]
310    fn is_step_scoped_predicate_is_total() {
311        assert!(!ev_flow_start().is_step_scoped());
312        assert!(ev_step_token().is_step_scoped());
313        assert!(FlowExecutionEvent::StepStart {
314            step_name: "S".to_string(),
315            step_index: 0,
316            step_type: "step".to_string(),
317            timestamp_ms: 0,
318        }
319        .is_step_scoped());
320        assert!(FlowExecutionEvent::StepComplete {
321            step_name: "S".to_string(),
322            step_index: 0,
323            success: true,
324            full_output: "".to_string(),
325            tokens_input: 0,
326            tokens_output: 0,
327            timestamp_ms: 0,
328        }
329        .is_step_scoped());
330        assert!(!ev_flow_complete().is_step_scoped());
331        assert!(!FlowExecutionEvent::FlowError {
332            flow_name: "F".to_string(),
333            error: "x".to_string(),
334            timestamp_ms: 0,
335        }
336        .is_step_scoped());
337    }
338
339    #[test]
340    fn kind_strings_match_serde_rename() {
341        assert_eq!(ev_flow_start().kind(), "flow_start");
342        assert_eq!(ev_step_token().kind(), "step_token");
343        assert_eq!(ev_flow_complete().kind(), "flow_complete");
344        assert_eq!(
345            FlowExecutionEvent::StepStart {
346                step_name: "S".to_string(),
347                step_index: 0,
348                step_type: "".to_string(),
349                timestamp_ms: 0,
350            }
351            .kind(),
352            "step_start"
353        );
354        assert_eq!(
355            FlowExecutionEvent::StepComplete {
356                step_name: "S".to_string(),
357                step_index: 0,
358                success: true,
359                full_output: "".to_string(),
360                tokens_input: 0,
361                tokens_output: 0,
362                timestamp_ms: 0,
363            }
364            .kind(),
365            "step_complete"
366        );
367        assert_eq!(
368            FlowExecutionEvent::FlowError {
369                flow_name: "F".to_string(),
370                error: "x".to_string(),
371                timestamp_ms: 0,
372            }
373            .kind(),
374            "flow_error"
375        );
376    }
377}