axon/wire_format/axon_dialect.rs
1//! §Fase 33.z.k.d (v1.28.0) — `axon` W3C named-events dialect adapter.
2//!
3//! This adapter is the **D6 backwards-compat baseline** — its wire
4//! output is byte-identical to the v1.27.1 inline emission helpers
5//! `build_token_event` / `build_complete_event` / `build_tool_call_event`
6//! / `build_error_event` in `axon_server.rs`.
7//!
8//! # Wire shape (event grammar)
9//!
10//! ```text
11//! event: axon.start
12//! id: 0
13//! data: {"trace_id": N, "flow": "...", "backend": "...", "timestamp_ms": N}
14//!
15//! event: axon.token
16//! id: 1
17//! data: {"step": "...", "trace_id": N, "token": "...", "timestamp_ms": N}
18//!
19//! event: axon.tool_call
20//! id: 2
21//! data: {"step": "...", "trace_id": N, "tool_name": "...", "content": "...", "timestamp_ms": N}
22//!
//! event: axon.complete
//! id: 3
//! data: {"trace_id": N, "flow": "...", "backend": "...", "steps_executed": N,
//!        "tokens_input": N, "tokens_output": N, "latency_ms": N, "success": bool,
//!        "stream_policies": [...], "enforcement_summary": {...}, "warnings": [...]}
//!
//! event: axon.error
//! id: N
//! data: {"trace_id": N, "error": "...", "recoverable": false}
//! ```
29//!
30//! The `axon.start` event is the structural marker the v1.27.1 wire
31//! does NOT emit (it relies on FlowStart being silently consumed at
32//! the producer). For 33.z.k.d we PRESERVE that behavior — FlowStart
33//! returns an empty `Vec<Event>` from `translate()` so the wire body
34//! stays byte-identical with v1.27.1 stub-mode output (1 axon.token +
35//! 1 axon.complete).
36//!
37//! # Stateful tracking
38//!
39//! - `event_id_counter` — monotonic event ID starting at 1; matches
40//! the v1.27.1 producer's counter.
41//! - `trace_id` — request-scoped UUID (reserved upstream).
//! - `terminal_emitted` — flag set when a terminal frame (FlowComplete /
//!   FlowError, or the full complete envelope) has been built.
//!   `flush_terminator()` never emits a frame, regardless of this flag
//!   (axon dialect emits the terminator IN-LINE with FlowComplete / FlowError).
45
46use super::{CompleteEnvelope, WireFormatAdapter};
47use crate::flow_execution_event::FlowExecutionEvent;
48use axum::response::sse::Event;
49
/// The axon W3C named-events adapter.
///
/// Holds the per-request state used while framing SSE events: the next
/// event ID to hand out, the trace identifier stamped into payloads, and a
/// marker recording that a terminal frame has been produced.
#[derive(Debug)]
pub struct AxonDialectAdapter {
    /// Next value for the SSE `id:` field; bumped by one per emitted frame.
    event_id_counter: u64,
    /// Request-scoped trace identifier written into event payloads.
    trace_id: u64,
    /// Set to `true` once an `axon.complete` / `axon.error` frame was built.
    terminal_emitted: bool,
}
56
57impl AxonDialectAdapter {
58 /// Construct a fresh adapter for a request. `trace_id` is the
59 /// request-scoped UUID reserved by the producer (Fase 33.x.c).
60 /// Event ID counter starts at 1 to match v1.27.1's producer
61 /// semantics (id=0 reserved for the silently-elided start frame).
62 pub fn new(trace_id: u64) -> Self {
63 Self {
64 event_id_counter: 1,
65 trace_id,
66 terminal_emitted: false,
67 }
68 }
69
70 fn next_id(&mut self) -> u64 {
71 let id = self.event_id_counter;
72 self.event_id_counter += 1;
73 id
74 }
75
76 /// Build an `axon.token` SSE event byte-identical to v1.27.1's
77 /// inline `build_token_event` helper.
78 fn build_token_event(&mut self, step_name: &str, token: &str, timestamp_ms: i64) -> Event {
79 let data = serde_json::json!({
80 "step": step_name,
81 "trace_id": self.trace_id,
82 "token": token,
83 "timestamp_ms": timestamp_ms,
84 });
85 let event_id = self.next_id();
86 Event::default()
87 .event("axon.token")
88 .id(event_id.to_string())
89 .data(serde_json::to_string(&data).unwrap_or_default())
90 }
91
92 /// Build an `axon.tool_call` SSE event byte-identical to v1.27.1's
93 /// inline `build_tool_call_event` helper.
94 fn build_tool_call_event(
95 &mut self,
96 step_name: &str,
97 tool_name: &str,
98 content: &str,
99 timestamp_ms: u64,
100 ) -> Event {
101 let data = serde_json::json!({
102 "step": step_name,
103 "trace_id": self.trace_id,
104 "tool_name": tool_name,
105 "content": content,
106 "timestamp_ms": timestamp_ms,
107 });
108 let event_id = self.next_id();
109 Event::default()
110 .event("axon.tool_call")
111 .id(event_id.to_string())
112 .data(serde_json::to_string(&data).unwrap_or_default())
113 }
114
115 /// Build an `axon.error` SSE event byte-identical to v1.27.1's
116 /// inline `build_error_event` helper.
117 fn build_error_event(&mut self, error_msg: &str) -> Event {
118 let data = serde_json::json!({
119 "trace_id": self.trace_id,
120 "error": error_msg,
121 "recoverable": false,
122 });
123 let event_id = self.next_id();
124 Event::default()
125 .event("axon.error")
126 .id(event_id.to_string())
127 .data(serde_json::to_string(&data).unwrap_or_default())
128 }
129
130 /// Build an `axon.complete` SSE event. Carries the
131 /// success/steps/tokens/latency envelope plus the optional
132 /// algebraic-policy side-channels (stream_policies / enforcement_summary
133 /// / warnings) when non-empty (D4 byte-compat: empty fields elided).
134 ///
135 /// NOTE: 33.z.k.d ships this stub that emits a minimal
136 /// envelope from the FlowExecutionEvent's data. The full
137 /// algebraic-policy population (33.z.k.h) is handled by the
138 /// producer-side wiring; this method receives the data via
139 /// the FlowComplete event's fields verbatim.
140 fn build_complete_event(
141 &mut self,
142 flow_name: &str,
143 backend: &str,
144 success: bool,
145 steps_executed: u64,
146 tokens_input: u64,
147 tokens_output: u64,
148 latency_ms: u64,
149 ) -> Event {
150 let data = serde_json::json!({
151 "trace_id": self.trace_id,
152 "flow": flow_name,
153 "backend": backend,
154 "steps_executed": steps_executed,
155 "tokens_input": tokens_input,
156 "tokens_output": tokens_output,
157 "latency_ms": latency_ms,
158 "success": success,
159 });
160 let event_id = self.next_id();
161 Event::default()
162 .event("axon.complete")
163 .id(event_id.to_string())
164 .data(serde_json::to_string(&data).unwrap_or_default())
165 }
166}
167
168impl AxonDialectAdapter {
169 /// §Fase 33.z.k.g — Build the full `axon.complete` event with
170 /// all the v1.27.1 envelope fields including the optional
171 /// algebraic-policy side-channels (stream_policies,
172 /// enforcement_summary, warnings). Each side-channel is elided
173 /// from the JSON when empty (D4 byte-compat with v1.27.1 inline
174 /// `build_complete_event` helper).
175 fn build_full_complete_event(&mut self, envelope: &CompleteEnvelope) -> Event {
176 let mut data = serde_json::json!({
177 "trace_id": envelope.trace_id,
178 "flow": envelope.flow_name,
179 "backend": envelope.backend,
180 "steps_executed": envelope.steps_executed,
181 "tokens_input": envelope.tokens_input,
182 "tokens_output": envelope.tokens_output,
183 "latency_ms": envelope.latency_ms,
184 "success": envelope.success,
185 });
186 // §Fase 33.e — stream_policies array, elided when empty.
187 if !envelope.effect_policies.is_empty() {
188 let arr = envelope
189 .effect_policies
190 .iter()
191 .map(|(step, policy)| serde_json::json!({"step": step, "policy": policy}))
192 .collect::<Vec<_>>();
193 data.as_object_mut()
194 .expect("json object")
195 .insert("stream_policies".to_string(), serde_json::Value::Array(arr));
196 }
197 // §Fase 33.x.d — enforcement_summary, elided when empty.
198 if !envelope.enforcement_summaries.is_empty() {
199 let mut obj = serde_json::Map::new();
200 for (step, summary) in &envelope.enforcement_summaries {
201 obj.insert(
202 step.clone(),
203 serde_json::to_value(summary).unwrap_or(serde_json::Value::Null),
204 );
205 }
206 data.as_object_mut().expect("json object").insert(
207 "enforcement_summary".to_string(),
208 serde_json::Value::Object(obj),
209 );
210 }
211 // §Fase 33.x.g — runtime_warnings, elided when empty.
212 if !envelope.runtime_warnings.is_empty() {
213 let arr = envelope
214 .runtime_warnings
215 .iter()
216 .map(|w| serde_json::to_value(w).unwrap_or(serde_json::Value::Null))
217 .collect::<Vec<_>>();
218 data.as_object_mut()
219 .expect("json object")
220 .insert("warnings".to_string(), serde_json::Value::Array(arr));
221 }
222 // §Fase 55.b — epistemic_envelopes array, elided when empty. The
223 // key + element shape ({base, scope, confidence}) match the sync
224 // `FlowEnvelope.epistemic_envelopes` byte-for-byte (§55.c parity).
225 if !envelope.epistemic_envelopes.is_empty() {
226 let arr = envelope
227 .epistemic_envelopes
228 .iter()
229 .map(|e| serde_json::to_value(e).unwrap_or(serde_json::Value::Null))
230 .collect::<Vec<_>>();
231 data.as_object_mut().expect("json object").insert(
232 "epistemic_envelopes".to_string(),
233 serde_json::Value::Array(arr),
234 );
235 }
236 let event_id = self.next_id();
237 Event::default()
238 .event("axon.complete")
239 .id(event_id.to_string())
240 .data(serde_json::to_string(&data).unwrap_or_default())
241 }
242}
243
244impl WireFormatAdapter for AxonDialectAdapter {
245 fn dialect(&self) -> &'static str {
246 "axon"
247 }
248
249 fn build_complete_envelope_event(&mut self, envelope: &CompleteEnvelope) -> Vec<Event> {
250 self.terminal_emitted = true;
251 vec![self.build_full_complete_event(envelope)]
252 }
253
254 fn translate(&mut self, event: &FlowExecutionEvent) -> Vec<Event> {
255 match event {
256 // FlowStart is silently consumed in v1.27.1 — preserve
257 // for D6 byte-compat.
258 FlowExecutionEvent::FlowStart { .. } => Vec::new(),
259 // StepStart is silently consumed in v1.27.1 — preserve
260 // for D6 byte-compat.
261 FlowExecutionEvent::StepStart { .. } => Vec::new(),
262 FlowExecutionEvent::StepToken {
263 step_name,
264 content,
265 timestamp_ms,
266 ..
267 } => vec![self.build_token_event(step_name, content, *timestamp_ms as i64)],
268 // StepComplete is silently consumed in v1.27.1 — preserve.
269 FlowExecutionEvent::StepComplete { .. } => Vec::new(),
270 FlowExecutionEvent::ToolCall {
271 step_name,
272 tool_name,
273 content,
274 timestamp_ms,
275 } => vec![self.build_tool_call_event(
276 step_name,
277 tool_name,
278 content,
279 *timestamp_ms,
280 )],
281 FlowExecutionEvent::FlowComplete {
282 flow_name,
283 backend,
284 success,
285 steps_executed,
286 tokens_input,
287 tokens_output,
288 latency_ms,
289 ..
290 } => {
291 self.terminal_emitted = true;
292 vec![self.build_complete_event(
293 flow_name,
294 backend,
295 *success,
296 *steps_executed as u64,
297 *tokens_input as u64,
298 *tokens_output as u64,
299 *latency_ms,
300 )]
301 }
302 FlowExecutionEvent::FlowError { error, .. } => {
303 self.terminal_emitted = true;
304 vec![self.build_error_event(error)]
305 }
306 }
307 }
308
309 fn flush_terminator(&mut self) -> Vec<Event> {
310 // axon dialect emits its terminator IN-LINE with the
311 // FlowComplete / FlowError translation. No additional
312 // terminator frame.
313 Vec::new()
314 }
315}