Skip to main content

axon/wire_format/
axon_dialect.rs

1//! §Fase 33.z.k.d (v1.28.0) — `axon` W3C named-events dialect adapter.
2//!
3//! This adapter is the **D6 backwards-compat baseline** — its wire
4//! output is byte-identical to the v1.27.1 inline emission helpers
5//! `build_token_event` / `build_complete_event` / `build_tool_call_event`
6//! / `build_error_event` in `axon_server.rs`.
7//!
8//! # Wire shape (event grammar)
9//!
10//! ```text
11//! event: axon.start
12//! id: 0
13//! data: {"trace_id": N, "flow": "...", "backend": "...", "timestamp_ms": N}
14//!
15//! event: axon.token
16//! id: 1
17//! data: {"step": "...", "trace_id": N, "token": "...", "timestamp_ms": N}
18//!
19//! event: axon.tool_call
20//! id: 2
21//! data: {"step": "...", "trace_id": N, "tool_name": "...", "content": "...", "timestamp_ms": N}
22//!
23//! event: axon.complete
24//! id: 3
25//! data: {"trace_id": N, "flow": "...", "backend": "...", "steps_executed": N,
26//!        "tokens_input": N, "tokens_output": N, "latency_ms": N, "success": bool,
27//!        "stream_policies": [...], "enforcement_summary": {...}, "warnings": [...]}
28//! ```
29//!
30//! The `axon.start` event is the structural marker the v1.27.1 wire
31//! does NOT emit (it relies on FlowStart being silently consumed at
32//! the producer). For 33.z.k.d we PRESERVE that behavior — FlowStart
33//! returns an empty `Vec<Event>` from `translate()` so the wire body
34//! stays byte-identical with v1.27.1 stub-mode output (1 axon.token +
35//! 1 axon.complete).
36//!
37//! # Stateful tracking
38//!
39//! - `event_id_counter` — monotonic event ID starting at 1; matches
40//!   the v1.27.1 producer's counter.
41//! - `trace_id` — request-scoped UUID (reserved upstream).
42//! - `terminal_emitted` — flag set when FlowComplete / FlowError
43//!   has been translated; `flush_terminator()` is always a no-op
44//!   (axon dialect emits the terminator IN-LINE with FlowComplete / FlowError).
45
46use super::{CompleteEnvelope, WireFormatAdapter};
47use crate::flow_execution_event::FlowExecutionEvent;
48use axum::response::sse::Event;
49
/// The axon W3C named-events adapter.
///
/// Holds the per-request state used while translating flow execution
/// events into `axon.*` SSE frames. `Debug` is derived so the adapter can
/// be logged or embedded in other `Debug` types.
#[derive(Debug)]
pub struct AxonDialectAdapter {
    /// Next value to use for an SSE `id:` field; advanced by one for every
    /// frame the adapter builds.
    event_id_counter: u64,
    /// Trace identifier supplied at construction and written into the
    /// `trace_id` field of the payloads this adapter builds from its own state.
    trace_id: u64,
    /// Set to `true` once a terminal frame (`axon.complete` / `axon.error`)
    /// has been built.
    terminal_emitted: bool,
}
56
57impl AxonDialectAdapter {
58    /// Construct a fresh adapter for a request. `trace_id` is the
59    /// request-scoped UUID reserved by the producer (Fase 33.x.c).
60    /// Event ID counter starts at 1 to match v1.27.1's producer
61    /// semantics (id=0 reserved for the silently-elided start frame).
62    pub fn new(trace_id: u64) -> Self {
63        Self {
64            event_id_counter: 1,
65            trace_id,
66            terminal_emitted: false,
67        }
68    }
69
70    fn next_id(&mut self) -> u64 {
71        let id = self.event_id_counter;
72        self.event_id_counter += 1;
73        id
74    }
75
76    /// Build an `axon.token` SSE event byte-identical to v1.27.1's
77    /// inline `build_token_event` helper.
78    fn build_token_event(&mut self, step_name: &str, token: &str, timestamp_ms: i64) -> Event {
79        let data = serde_json::json!({
80            "step": step_name,
81            "trace_id": self.trace_id,
82            "token": token,
83            "timestamp_ms": timestamp_ms,
84        });
85        let event_id = self.next_id();
86        Event::default()
87            .event("axon.token")
88            .id(event_id.to_string())
89            .data(serde_json::to_string(&data).unwrap_or_default())
90    }
91
92    /// Build an `axon.tool_call` SSE event byte-identical to v1.27.1's
93    /// inline `build_tool_call_event` helper.
94    fn build_tool_call_event(
95        &mut self,
96        step_name: &str,
97        tool_name: &str,
98        content: &str,
99        timestamp_ms: u64,
100    ) -> Event {
101        let data = serde_json::json!({
102            "step": step_name,
103            "trace_id": self.trace_id,
104            "tool_name": tool_name,
105            "content": content,
106            "timestamp_ms": timestamp_ms,
107        });
108        let event_id = self.next_id();
109        Event::default()
110            .event("axon.tool_call")
111            .id(event_id.to_string())
112            .data(serde_json::to_string(&data).unwrap_or_default())
113    }
114
115    /// Build an `axon.error` SSE event byte-identical to v1.27.1's
116    /// inline `build_error_event` helper.
117    fn build_error_event(&mut self, error_msg: &str) -> Event {
118        let data = serde_json::json!({
119            "trace_id": self.trace_id,
120            "error": error_msg,
121            "recoverable": false,
122        });
123        let event_id = self.next_id();
124        Event::default()
125            .event("axon.error")
126            .id(event_id.to_string())
127            .data(serde_json::to_string(&data).unwrap_or_default())
128    }
129
130    /// Build an `axon.complete` SSE event. Carries the
131    /// success/steps/tokens/latency envelope plus the optional
132    /// algebraic-policy side-channels (stream_policies / enforcement_summary
133    /// / warnings) when non-empty (D4 byte-compat: empty fields elided).
134    ///
135    /// NOTE: 33.z.k.d ships this stub that emits a minimal
136    /// envelope from the FlowExecutionEvent's data. The full
137    /// algebraic-policy population (33.z.k.h) is handled by the
138    /// producer-side wiring; this method receives the data via
139    /// the FlowComplete event's fields verbatim.
140    fn build_complete_event(
141        &mut self,
142        flow_name: &str,
143        backend: &str,
144        success: bool,
145        steps_executed: u64,
146        tokens_input: u64,
147        tokens_output: u64,
148        latency_ms: u64,
149    ) -> Event {
150        let data = serde_json::json!({
151            "trace_id": self.trace_id,
152            "flow": flow_name,
153            "backend": backend,
154            "steps_executed": steps_executed,
155            "tokens_input": tokens_input,
156            "tokens_output": tokens_output,
157            "latency_ms": latency_ms,
158            "success": success,
159        });
160        let event_id = self.next_id();
161        Event::default()
162            .event("axon.complete")
163            .id(event_id.to_string())
164            .data(serde_json::to_string(&data).unwrap_or_default())
165    }
166}
167
168impl AxonDialectAdapter {
169    /// §Fase 33.z.k.g — Build the full `axon.complete` event with
170    /// all the v1.27.1 envelope fields including the optional
171    /// algebraic-policy side-channels (stream_policies,
172    /// enforcement_summary, warnings). Each side-channel is elided
173    /// from the JSON when empty (D4 byte-compat with v1.27.1 inline
174    /// `build_complete_event` helper).
175    fn build_full_complete_event(&mut self, envelope: &CompleteEnvelope) -> Event {
176        let mut data = serde_json::json!({
177            "trace_id": envelope.trace_id,
178            "flow": envelope.flow_name,
179            "backend": envelope.backend,
180            "steps_executed": envelope.steps_executed,
181            "tokens_input": envelope.tokens_input,
182            "tokens_output": envelope.tokens_output,
183            "latency_ms": envelope.latency_ms,
184            "success": envelope.success,
185        });
186        // §Fase 33.e — stream_policies array, elided when empty.
187        if !envelope.effect_policies.is_empty() {
188            let arr = envelope
189                .effect_policies
190                .iter()
191                .map(|(step, policy)| serde_json::json!({"step": step, "policy": policy}))
192                .collect::<Vec<_>>();
193            data.as_object_mut()
194                .expect("json object")
195                .insert("stream_policies".to_string(), serde_json::Value::Array(arr));
196        }
197        // §Fase 33.x.d — enforcement_summary, elided when empty.
198        if !envelope.enforcement_summaries.is_empty() {
199            let mut obj = serde_json::Map::new();
200            for (step, summary) in &envelope.enforcement_summaries {
201                obj.insert(
202                    step.clone(),
203                    serde_json::to_value(summary).unwrap_or(serde_json::Value::Null),
204                );
205            }
206            data.as_object_mut().expect("json object").insert(
207                "enforcement_summary".to_string(),
208                serde_json::Value::Object(obj),
209            );
210        }
211        // §Fase 33.x.g — runtime_warnings, elided when empty.
212        if !envelope.runtime_warnings.is_empty() {
213            let arr = envelope
214                .runtime_warnings
215                .iter()
216                .map(|w| serde_json::to_value(w).unwrap_or(serde_json::Value::Null))
217                .collect::<Vec<_>>();
218            data.as_object_mut()
219                .expect("json object")
220                .insert("warnings".to_string(), serde_json::Value::Array(arr));
221        }
222        // §Fase 55.b — epistemic_envelopes array, elided when empty. The
223        // key + element shape ({base, scope, confidence}) match the sync
224        // `FlowEnvelope.epistemic_envelopes` byte-for-byte (§55.c parity).
225        if !envelope.epistemic_envelopes.is_empty() {
226            let arr = envelope
227                .epistemic_envelopes
228                .iter()
229                .map(|e| serde_json::to_value(e).unwrap_or(serde_json::Value::Null))
230                .collect::<Vec<_>>();
231            data.as_object_mut().expect("json object").insert(
232                "epistemic_envelopes".to_string(),
233                serde_json::Value::Array(arr),
234            );
235        }
236        let event_id = self.next_id();
237        Event::default()
238            .event("axon.complete")
239            .id(event_id.to_string())
240            .data(serde_json::to_string(&data).unwrap_or_default())
241    }
242}
243
244impl WireFormatAdapter for AxonDialectAdapter {
245    fn dialect(&self) -> &'static str {
246        "axon"
247    }
248
249    fn build_complete_envelope_event(&mut self, envelope: &CompleteEnvelope) -> Vec<Event> {
250        self.terminal_emitted = true;
251        vec![self.build_full_complete_event(envelope)]
252    }
253
254    fn translate(&mut self, event: &FlowExecutionEvent) -> Vec<Event> {
255        match event {
256            // FlowStart is silently consumed in v1.27.1 — preserve
257            // for D6 byte-compat.
258            FlowExecutionEvent::FlowStart { .. } => Vec::new(),
259            // StepStart is silently consumed in v1.27.1 — preserve
260            // for D6 byte-compat.
261            FlowExecutionEvent::StepStart { .. } => Vec::new(),
262            FlowExecutionEvent::StepToken {
263                step_name,
264                content,
265                timestamp_ms,
266                ..
267            } => vec![self.build_token_event(step_name, content, *timestamp_ms as i64)],
268            // StepComplete is silently consumed in v1.27.1 — preserve.
269            FlowExecutionEvent::StepComplete { .. } => Vec::new(),
270            FlowExecutionEvent::ToolCall {
271                step_name,
272                tool_name,
273                content,
274                timestamp_ms,
275            } => vec![self.build_tool_call_event(
276                step_name,
277                tool_name,
278                content,
279                *timestamp_ms,
280            )],
281            FlowExecutionEvent::FlowComplete {
282                flow_name,
283                backend,
284                success,
285                steps_executed,
286                tokens_input,
287                tokens_output,
288                latency_ms,
289                ..
290            } => {
291                self.terminal_emitted = true;
292                vec![self.build_complete_event(
293                    flow_name,
294                    backend,
295                    *success,
296                    *steps_executed as u64,
297                    *tokens_input as u64,
298                    *tokens_output as u64,
299                    *latency_ms,
300                )]
301            }
302            FlowExecutionEvent::FlowError { error, .. } => {
303                self.terminal_emitted = true;
304                vec![self.build_error_event(error)]
305            }
306        }
307    }
308
309    fn flush_terminator(&mut self) -> Vec<Event> {
310        // axon dialect emits its terminator IN-LINE with the
311        // FlowComplete / FlowError translation. No additional
312        // terminator frame.
313        Vec::new()
314    }
315}