axon/flow_execution_event.rs
1//! §Fase 33.b — Flow execution event stream (Layer 1: data-flow integrity).
2//!
3//! D2 ratificada: `ServerExecutionResult` is replaced by an event-stream
4//! return type — the runner emits each event AS IT OCCURS, the SSE
5//! handler consumes the stream and forwards directly to the wire.
6//!
7//! ## The closed event-shape catalog
8//!
9//! Every observable moment in a flow's execution is one of five
10//! events. The catalog is **closed** — adding a new variant requires a
11//! D-letter amendment, not a runtime-only patch. Cross-stack drift gate
12//! locks the JSON shape so Python + Rust agree on every field byte-for-
13//! byte (`tests/fixtures/fase33_flow_execution_event/corpus.json`).
14//!
15//! - **FlowStart** — emitted once before any step. Establishes the
16//! trace identity + chosen backend.
17//! - **StepStart** — emitted once per step at its boundary. Carries the
18//! step's source-declared `step_type` so adopters can correlate the
19//! wire event back to the AST.
20//! - **StepToken** — emitted per chunk produced by the step's
21//! underlying backend. For streaming backends (Anthropic SSE,
22//! OpenAI SSE, …) this fires per chunk AS THE BYTE ARRIVES.
23//! For non-streaming backends (stub, future deterministic-only),
24//! this fires once with the full step output (post-completion).
25//! - **StepComplete** — emitted once per step at its end boundary.
26//! Carries the full output text + token-input/output counters.
27//! - **FlowComplete** — terminator (success path). Receiver MUST
28//! treat this as the stream's end.
29//! - **FlowError** — terminator (failure path). Receiver MUST treat
30//! this as the stream's end.
31//!
32//! ## Pillar trace per D2 + D10
33//!
34//! - **MATHEMATICS** — the catalog is a closed sum type; pattern matching
35//! is exhaustive. Adding a sixth variant breaks the build cross-stack.
36//! - **LOGIC** — the receiver invariant is precise: exactly one
37//! `FlowStart`, followed by per-step (`StepStart` → 0..N
38//! `StepToken` → `StepComplete`), followed by exactly one
39//! `FlowComplete` OR `FlowError`. Any sequence violating this
40//! invariant is a producer bug, not a consumer concern.
41//! - **PHILOSOPHY** — the source declaration IS the runtime contract:
42//! every `step S { ... }` declaration produces a `StepStart` /
43//! `StepComplete` pair at runtime, named identically.
44//! - **COMPUTING** — events are JSON-serializable + clonable + the
45//! stream is a `tokio::sync::mpsc::UnboundedReceiver`; the
46//! producer never blocks the executor on a slow consumer (33.b
47//! layer; backpressure policy from `<stream:<policy>>` is honored
48//! in 33.e).
49
50use serde::{Deserialize, Serialize};
51
52/// One observable moment in a flow's execution. Closed catalog per D2.
53///
54/// Field naming + JSON serde-rename match the Python mirror
55/// (`axon/runtime/flow_execution_event.py`) byte-for-byte. The drift
56/// gate at `tests/fixtures/fase33_flow_execution_event/corpus.json`
57/// asserts both stacks produce byte-identical JSON for each variant.
58#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
59#[serde(tag = "kind", rename_all = "snake_case")]
60pub enum FlowExecutionEvent {
61 /// Emitted exactly once at the very start of execution.
62 FlowStart {
63 flow_name: String,
64 backend: String,
65 timestamp_ms: u64,
66 },
67 /// Emitted exactly once per step at its start boundary.
68 StepStart {
69 step_name: String,
70 step_index: usize,
71 step_type: String,
72 timestamp_ms: u64,
73 },
74 /// Emitted per token / chunk produced by the step's underlying
75 /// backend. The granularity matches the backend's chunk size
76 /// (Anthropic SSE delta, OpenAI streaming chunk, etc.). For
77 /// non-streaming backends, fires once per step with the full
78 /// output (post-StepComplete in practice; the catalog allows it
79 /// to fire either before or after but the convention is during
80 /// execution).
81 StepToken {
82 step_name: String,
83 content: String,
84 /// Monotonic counter, per-flow. Restarts at 1 for each new
85 /// FlowStart. Adopter clients use this to correlate
86 /// `Last-Event-ID` resumes (W3C SSE spec).
87 token_index: u64,
88 timestamp_ms: u64,
89 },
90 /// Emitted exactly once per step at its end boundary.
91 StepComplete {
92 step_name: String,
93 step_index: usize,
94 success: bool,
95 full_output: String,
96 tokens_input: u64,
97 tokens_output: u64,
98 timestamp_ms: u64,
99 },
100 /// §Fase 33.y.k — Tool invocation chunk. Emitted by per-step
101 /// handlers when the upstream backend's chunk stream signals
102 /// `FinishReason::ToolUse` (provider invoked a tool mid-stream).
103 /// Closed-catalog event variant; D4 byte-compat preserves
104 /// adopter parsers — flows without declared `apply: <tool>`
105 /// never emit this event.
106 ///
107 /// `content` carries the tool-call's structured payload as a
108 /// canonical wire-stable string (provider-specific shape today;
109 /// future Fase 33.y.k.2 standardizes per-provider extraction
110 /// into a unified `tool_call_id + arguments` schema).
111 ToolCall {
112 step_name: String,
113 tool_name: String,
114 content: String,
115 timestamp_ms: u64,
116 },
117 /// Terminator — success path. Receiver MUST close the stream.
118 FlowComplete {
119 flow_name: String,
120 backend: String,
121 success: bool,
122 steps_executed: usize,
123 tokens_input: u64,
124 tokens_output: u64,
125 latency_ms: u64,
126 timestamp_ms: u64,
127 },
128 /// Terminator — failure path. Receiver MUST close the stream.
129 FlowError {
130 flow_name: String,
131 error: String,
132 timestamp_ms: u64,
133 },
134}
135
136impl FlowExecutionEvent {
137 /// Closed predicate: is this the terminator of the stream? After
138 /// emitting a terminator, the producer MUST drop the sender so
139 /// the receiver's `recv()` returns `None`.
140 pub fn is_terminator(&self) -> bool {
141 matches!(
142 self,
143 FlowExecutionEvent::FlowComplete { .. } | FlowExecutionEvent::FlowError { .. }
144 )
145 }
146
147 /// Closed predicate: is this event step-scoped (carries
148 /// `step_name`)?
149 pub fn is_step_scoped(&self) -> bool {
150 matches!(
151 self,
152 FlowExecutionEvent::StepStart { .. }
153 | FlowExecutionEvent::StepToken { .. }
154 | FlowExecutionEvent::StepComplete { .. }
155 | FlowExecutionEvent::ToolCall { .. }
156 )
157 }
158
159 /// String kind for diagnostic / log lines. Matches the JSON
160 /// `kind` discriminator field (`snake_case` per serde-rename).
161 pub fn kind(&self) -> &'static str {
162 match self {
163 FlowExecutionEvent::FlowStart { .. } => "flow_start",
164 FlowExecutionEvent::StepStart { .. } => "step_start",
165 FlowExecutionEvent::StepToken { .. } => "step_token",
166 FlowExecutionEvent::StepComplete { .. } => "step_complete",
167 FlowExecutionEvent::ToolCall { .. } => "tool_call",
168 FlowExecutionEvent::FlowComplete { .. } => "flow_complete",
169 FlowExecutionEvent::FlowError { .. } => "flow_error",
170 }
171 }
172}
173
174/// Current Unix-milliseconds timestamp. Helper used by producers
175/// emitting events.
176pub fn now_ms() -> u64 {
177 std::time::SystemTime::now()
178 .duration_since(std::time::UNIX_EPOCH)
179 .map(|d| d.as_millis() as u64)
180 .unwrap_or(0)
181}
182
183#[cfg(test)]
184mod tests {
185 use super::*;
186
187 fn ev_flow_start() -> FlowExecutionEvent {
188 FlowExecutionEvent::FlowStart {
189 flow_name: "F".to_string(),
190 backend: "stub".to_string(),
191 timestamp_ms: 1_000_000,
192 }
193 }
194
195 fn ev_step_token() -> FlowExecutionEvent {
196 FlowExecutionEvent::StepToken {
197 step_name: "S".to_string(),
198 content: "hello".to_string(),
199 token_index: 1,
200 timestamp_ms: 1_000_001,
201 }
202 }
203
204 fn ev_flow_complete() -> FlowExecutionEvent {
205 FlowExecutionEvent::FlowComplete {
206 flow_name: "F".to_string(),
207 backend: "stub".to_string(),
208 success: true,
209 steps_executed: 1,
210 tokens_input: 0,
211 tokens_output: 1,
212 latency_ms: 50,
213 timestamp_ms: 1_000_010,
214 }
215 }
216
217 #[test]
218 fn flow_start_serializes_with_kind_discriminator() {
219 let s = serde_json::to_string(&ev_flow_start()).unwrap();
220 // Externally tagged enum with kind = "flow_start"; field order
221 // matches Python mirror.
222 assert!(s.contains(r#""kind":"flow_start""#));
223 assert!(s.contains(r#""flow_name":"F""#));
224 assert!(s.contains(r#""backend":"stub""#));
225 assert!(s.contains(r#""timestamp_ms":1000000"#));
226 }
227
228 #[test]
229 fn step_token_serializes_with_token_index() {
230 let s = serde_json::to_string(&ev_step_token()).unwrap();
231 assert!(s.contains(r#""kind":"step_token""#));
232 assert!(s.contains(r#""token_index":1"#));
233 assert!(s.contains(r#""content":"hello""#));
234 }
235
236 #[test]
237 fn flow_complete_serializes_with_latency_ms() {
238 let s = serde_json::to_string(&ev_flow_complete()).unwrap();
239 assert!(s.contains(r#""kind":"flow_complete""#));
240 assert!(s.contains(r#""steps_executed":1"#));
241 assert!(s.contains(r#""latency_ms":50"#));
242 assert!(s.contains(r#""success":true"#));
243 }
244
245 #[test]
246 fn round_trip_through_json_preserves_every_variant() {
247 let cases = vec![
248 ev_flow_start(),
249 FlowExecutionEvent::StepStart {
250 step_name: "S".to_string(),
251 step_index: 0,
252 step_type: "step".to_string(),
253 timestamp_ms: 1,
254 },
255 ev_step_token(),
256 FlowExecutionEvent::StepComplete {
257 step_name: "S".to_string(),
258 step_index: 0,
259 success: true,
260 full_output: "hello world".to_string(),
261 tokens_input: 0,
262 tokens_output: 2,
263 timestamp_ms: 2,
264 },
265 ev_flow_complete(),
266 FlowExecutionEvent::FlowError {
267 flow_name: "F".to_string(),
268 error: "boom".to_string(),
269 timestamp_ms: 3,
270 },
271 ];
272 for e in cases {
273 let s = serde_json::to_string(&e).unwrap();
274 let back: FlowExecutionEvent = serde_json::from_str(&s).unwrap();
275 assert_eq!(back, e, "round-trip MUST preserve every variant");
276 }
277 }
278
279 #[test]
280 fn is_terminator_predicate_is_total() {
281 assert!(!ev_flow_start().is_terminator());
282 assert!(!ev_step_token().is_terminator());
283 assert!(!FlowExecutionEvent::StepStart {
284 step_name: "S".to_string(),
285 step_index: 0,
286 step_type: "step".to_string(),
287 timestamp_ms: 0,
288 }
289 .is_terminator());
290 assert!(!FlowExecutionEvent::StepComplete {
291 step_name: "S".to_string(),
292 step_index: 0,
293 success: true,
294 full_output: "".to_string(),
295 tokens_input: 0,
296 tokens_output: 0,
297 timestamp_ms: 0,
298 }
299 .is_terminator());
300 assert!(ev_flow_complete().is_terminator());
301 assert!(FlowExecutionEvent::FlowError {
302 flow_name: "F".to_string(),
303 error: "x".to_string(),
304 timestamp_ms: 0,
305 }
306 .is_terminator());
307 }
308
309 #[test]
310 fn is_step_scoped_predicate_is_total() {
311 assert!(!ev_flow_start().is_step_scoped());
312 assert!(ev_step_token().is_step_scoped());
313 assert!(FlowExecutionEvent::StepStart {
314 step_name: "S".to_string(),
315 step_index: 0,
316 step_type: "step".to_string(),
317 timestamp_ms: 0,
318 }
319 .is_step_scoped());
320 assert!(FlowExecutionEvent::StepComplete {
321 step_name: "S".to_string(),
322 step_index: 0,
323 success: true,
324 full_output: "".to_string(),
325 tokens_input: 0,
326 tokens_output: 0,
327 timestamp_ms: 0,
328 }
329 .is_step_scoped());
330 assert!(!ev_flow_complete().is_step_scoped());
331 assert!(!FlowExecutionEvent::FlowError {
332 flow_name: "F".to_string(),
333 error: "x".to_string(),
334 timestamp_ms: 0,
335 }
336 .is_step_scoped());
337 }
338
339 #[test]
340 fn kind_strings_match_serde_rename() {
341 assert_eq!(ev_flow_start().kind(), "flow_start");
342 assert_eq!(ev_step_token().kind(), "step_token");
343 assert_eq!(ev_flow_complete().kind(), "flow_complete");
344 assert_eq!(
345 FlowExecutionEvent::StepStart {
346 step_name: "S".to_string(),
347 step_index: 0,
348 step_type: "".to_string(),
349 timestamp_ms: 0,
350 }
351 .kind(),
352 "step_start"
353 );
354 assert_eq!(
355 FlowExecutionEvent::StepComplete {
356 step_name: "S".to_string(),
357 step_index: 0,
358 success: true,
359 full_output: "".to_string(),
360 tokens_input: 0,
361 tokens_output: 0,
362 timestamp_ms: 0,
363 }
364 .kind(),
365 "step_complete"
366 );
367 assert_eq!(
368 FlowExecutionEvent::FlowError {
369 flow_name: "F".to_string(),
370 error: "x".to_string(),
371 timestamp_ms: 0,
372 }
373 .kind(),
374 "flow_error"
375 );
376 }
377}