axon/wire_format/openai_dialect.rs
1//! §Fase 33.z.k.e (v1.28.0) — `openai` Chat Completions streaming
2//! dialect adapter.
3//!
4//! Matches the OpenAI Chat Completions streaming wire verbatim per
5//! the published API reference:
6//!
7//! https://platform.openai.com/docs/api-reference/chat/streaming
8//!
9//! # Wire shape (verbatim from OpenAI reference)
10//!
11//! Every frame is a `data:` line — no `event:` field. Frames are
12//! separated by `\n\n`. Each frame's payload is a JSON object with
13//! the closed shape:
14//!
15//! ```text
16//! data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1715648400,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}
17//!
18//! ```
19//!
20//! Required top-level fields per OpenAI spec:
21//! - `id`: response identifier; constant across the chunk stream
22//! - `object`: literal `"chat.completion.chunk"`
23//! - `created`: Unix timestamp in seconds (constant across stream)
24//! - `model`: model identifier (constant across stream)
25//! - `choices`: array of choice objects
26//!
27//! Per-choice fields:
28//! - `index`: 0 for single-completion streams
29//! - `delta`: incremental content (`{}`, `{"role": "assistant"}`,
30//! `{"content": "..."}`, or `{"tool_calls": [...]}`)
31//! - `finish_reason`: `null` mid-stream; `"stop"` /
32//! `"length"` / `"tool_calls"` / `"content_filter"` on last frame
33//!
34//! After the LAST data frame, OpenAI emits a final sentinel:
35//!
36//! ```text
37//! data: [DONE]
38//!
39//! ```
40//!
41//! # axon → openai event mapping
42//!
43//! | axon FlowExecutionEvent | openai wire frame |
44//! |--------------------------|------------------------------------------------------|
45//! | FlowStart | 1 frame with `delta: {"role": "assistant"}` |
46//! | StepStart | 0 frames (openai has no multi-step concept) |
47//! | StepToken | 1 frame with `delta: {"content": "<token>"}` |
48//! | StepComplete | 0 frames |
49//! | ToolCall | 1 frame with `delta: {"tool_calls": [{...}]}` |
//! | FlowComplete             | 1 frame with `delta: {}`, `finish_reason: "stop"`     |
//! | FlowError                | 1 frame with `delta: {}`, `finish_reason: "stop"`     |
//! |                          | (OpenAI has no `"error"` finish_reason; the failure   |
//! |                          | surfaces in the axon_metadata frame)                 |
//! | flush_terminator         | 1 axon_metadata frame (Q7 algebraic-policy), then    |
//! |                          | 1 frame `data: [DONE]`                               |
54//!
55//! Step boundaries are NOT visible on the openai wire (the dialect
56//! doesn't model multi-step flows). Adopters consuming openai-compat
57//! clients see a single continuous content stream concatenated
58//! across step tokens. The `axon_metadata` frame emitted before
59//! `[DONE]` carries per-step audit + enforcement counters for
60//! adopters who need step-level visibility (Q7 ratification).
61//!
62//! # Tool-call mapping detail
63//!
64//! Our `FlowExecutionEvent::ToolCall` carries the FULL tool-call
65//! payload in one event (the upstream LLM emitted it as a complete
66//! `FinishReason::ToolUse`). OpenAI's wire streams tool calls as
67//! partial `function.arguments` deltas across frames; emitting the
68//! whole arguments string in a single frame is VALID — OpenAI does
69//! this when arguments fit in one chunk. The adapter emits one
70//! frame per ToolCall event with the complete arguments string.
71//!
72//! # Pillar trace (D10)
73//!
74//! - MATHEMATICS — every frame is a pure projection from one input
75//! event; field set is closed-catalog.
76//! - LOGIC — `finish_reason` enum is closed per OpenAI spec.
77//! - PHILOSOPHY — adopters using openai-compat SDKs (litellm,
78//! instructor, langchain, llama_index, vercel/ai-sdk, etc.) parse
79//! the wire verbatim without any axon-specific awareness.
//! - COMPUTING — JSON serialization uses `serde_json::to_string`.
//!   Output key order follows `serde_json::Map`: a `BTreeMap`
//!   (alphabetical key order) when serde_json's `preserve_order`
//!   feature is off, an `IndexMap` (insertion order) when it is on.
//!   Adopter parsers MUST be order-agnostic per the JSON spec; OpenAI's
//!   own wire is not key-ordered. Tests pin field PRESENCE + values,
//!   not byte order.
86//! key-ordered. Tests pin field PRESENCE + values, not byte order.
87
88use super::{CompleteEnvelope, WireFormatAdapter};
89use crate::flow_execution_event::FlowExecutionEvent;
90use axum::response::sse::Event;
91
92/// The OpenAI Chat Completions streaming dialect adapter.
93pub struct OpenAIDialectAdapter {
94 /// Stable response id across the chunk stream. Synthesized once
95 /// at adapter construction from the request's trace_id.
96 response_id: String,
97 /// Unix timestamp in seconds; constant across the stream per
98 /// OpenAI spec.
99 created: u64,
100 /// Model identifier; constant across the stream. Populated from
101 /// the first FlowStart event's `backend` field (defaults to
102 /// `"axon"` until that event arrives).
103 model: String,
104 /// Whether the role-marker frame has been emitted. Per OpenAI
105 /// spec, the first chunk's delta is `{"role": "assistant"}`.
106 role_marker_emitted: bool,
107 /// Tracks the last terminal reason so flush_terminator() can
108 /// emit the proper `finish_reason` if FlowComplete/FlowError
109 /// was not processed (defensive — should not happen in
110 /// well-formed flows).
111 terminal_emitted: bool,
112 /// Per-request running counter for synthesizing tool_call IDs
113 /// (OpenAI requires `id` on each tool_calls[] entry).
114 tool_call_counter: u64,
115 /// Tracks how the stream terminated for adopter visibility on
116 /// the axon_metadata frame.
117 saw_terminal_reason: TerminalReason,
118 /// §Fase 33.z.k.h — Algebraic-policy envelope stashed at
119 /// FlowComplete time so `flush_terminator()` can emit a
120 /// POPULATED `axon_metadata` frame. `None` pre-FlowComplete
121 /// (defensive — flush before FlowComplete is unreachable in
122 /// the production producer but legal per the trait contract).
123 /// Adopters consuming the openai wire receive per-step
124 /// provenance (PCI DSS Req 10 / FedRAMP AU-2 / FRE 502 /
125 /// 21 CFR Part 11 §11.10) via this extension frame even
126 /// though OpenAI's `chat.completion.chunk` shape doesn't
127 /// model multi-step flows.
128 stashed_envelope: Option<CompleteEnvelope>,
129 /// §Fase 37.e (D6) — the `FlowError.error` diagnostic, stashed when
130 /// a `FlowError` is translated so `build_axon_metadata_frame()` can
131 /// surface it as the metadata frame's `error` field. Without this
132 /// the openai (and kimi / glm) wire signalled `terminal_reason:
133 /// error` but never said WHY — a hollow terminator. `None` on a
134 /// flow that did not error.
135 error_detail: Option<String>,
136}
137
138#[derive(Debug, Clone, Copy, PartialEq)]
139enum TerminalReason {
140 None,
141 Stop,
142 Error,
143}
144
145impl OpenAIDialectAdapter {
146 /// Construct a fresh adapter for a request. The trace_id is
147 /// embedded in the response_id; created uses the current Unix
148 /// timestamp.
149 pub fn new(trace_id: u64) -> Self {
150 let response_id = format!("chatcmpl-axon-{trace_id:x}");
151 let created = std::time::SystemTime::now()
152 .duration_since(std::time::UNIX_EPOCH)
153 .map(|d| d.as_secs())
154 .unwrap_or(0);
155 Self {
156 response_id,
157 created,
158 model: "axon".to_string(),
159 role_marker_emitted: false,
160 terminal_emitted: false,
161 tool_call_counter: 0,
162 saw_terminal_reason: TerminalReason::None,
163 stashed_envelope: None,
164 error_detail: None,
165 }
166 }
167
168 /// Build a chunk frame with the given delta + finish_reason.
169 /// All other top-level fields (id, object, created, model)
170 /// stay constant across the stream per OpenAI spec.
171 fn build_chunk_frame(
172 &self,
173 delta: serde_json::Value,
174 finish_reason: Option<&str>,
175 ) -> Event {
176 let choice = serde_json::json!({
177 "index": 0,
178 "delta": delta,
179 "finish_reason": finish_reason,
180 });
181 let payload = serde_json::json!({
182 "id": &self.response_id,
183 "object": "chat.completion.chunk",
184 "created": self.created,
185 "model": &self.model,
186 "choices": [choice],
187 });
188 Event::default().data(serde_json::to_string(&payload).unwrap_or_default())
189 }
190
191 /// Synthesize a stable tool_call ID. OpenAI requires per-call IDs
192 /// so adopters can correlate tool results back to their requests.
193 fn next_tool_call_id(&mut self) -> String {
194 self.tool_call_counter += 1;
195 format!(
196 "call_{}_{}",
197 self.response_id.strip_prefix("chatcmpl-axon-").unwrap_or("0"),
198 self.tool_call_counter,
199 )
200 }
201
202 /// §Q7 + §Fase 33.z.k.h — Build the `axon_metadata` extension
203 /// frame carrying the full algebraic-policy side-channel data
204 /// + flow envelope. Adopters on the openai wire (litellm,
205 /// langchain, vercel/ai, instructor, llama_index) ignore unknown
206 /// top-level keys per their permissive JSON parsing; SDK-free
207 /// clients + axon-aware tooling consume the `axon_metadata` key
208 /// directly to surface vertical-regulator audit data.
209 ///
210 /// Frame shape (every field optional / elided when empty for
211 /// minimal-overhead default behavior):
212 ///
213 /// ```text
214 /// data: {"axon_metadata": {
215 /// "trace_id": N, "flow": "...", "backend": "...",
216 /// "success": bool, "steps_executed": N,
217 /// "tokens_input": N, "tokens_output": N, "latency_ms": N,
218 /// "stream_policies": [{step, policy}, ...],
219 /// "enforcement_summary": {step: {policy_slug, chunks_pushed, ...}},
220 /// "runtime_warnings": [...],
221 /// "step_audit": [{step_name, step_index, tokens_emitted,
222 /// output_hash_hex, effect_policy_applied, ...}, ...],
223 /// "terminal_reason": "stop" | "error" | "none"
224 /// }}
225 /// ```
226 ///
227 /// When no envelope has been stashed (pre-FlowComplete flush —
228 /// defensive path), the frame carries only the terminal_reason
229 /// + empty algebraic-policy fields. The frame's existence is
230 /// invariant; the data populated is conditional on having
231 /// reached FlowComplete.
232 fn build_axon_metadata_frame(&self) -> Event {
233 let mut metadata = serde_json::Map::new();
234 if let Some(envelope) = self.stashed_envelope.as_ref() {
235 metadata.insert("trace_id".to_string(), serde_json::json!(envelope.trace_id));
236 metadata.insert("flow".to_string(), serde_json::json!(&envelope.flow_name));
237 metadata.insert(
238 "backend".to_string(),
239 serde_json::json!(&envelope.backend),
240 );
241 metadata.insert("success".to_string(), serde_json::json!(envelope.success));
242 metadata.insert(
243 "steps_executed".to_string(),
244 serde_json::json!(envelope.steps_executed),
245 );
246 metadata.insert(
247 "tokens_input".to_string(),
248 serde_json::json!(envelope.tokens_input),
249 );
250 metadata.insert(
251 "tokens_output".to_string(),
252 serde_json::json!(envelope.tokens_output),
253 );
254 metadata.insert(
255 "latency_ms".to_string(),
256 serde_json::json!(envelope.latency_ms),
257 );
258
259 // §Fase 33.e — stream_policies (elided when empty).
260 if !envelope.effect_policies.is_empty() {
261 let arr: Vec<serde_json::Value> = envelope
262 .effect_policies
263 .iter()
264 .map(|(step, policy)| serde_json::json!({"step": step, "policy": policy}))
265 .collect();
266 metadata.insert(
267 "stream_policies".to_string(),
268 serde_json::Value::Array(arr),
269 );
270 }
271 // §Fase 33.x.d — enforcement_summary (elided when empty).
272 if !envelope.enforcement_summaries.is_empty() {
273 let mut obj = serde_json::Map::new();
274 for (step, summary) in &envelope.enforcement_summaries {
275 obj.insert(
276 step.clone(),
277 serde_json::to_value(summary).unwrap_or(serde_json::Value::Null),
278 );
279 }
280 metadata.insert(
281 "enforcement_summary".to_string(),
282 serde_json::Value::Object(obj),
283 );
284 }
285 // §Fase 33.x.g — runtime_warnings (elided when empty).
286 if !envelope.runtime_warnings.is_empty() {
287 let arr: Vec<serde_json::Value> = envelope
288 .runtime_warnings
289 .iter()
290 .map(|w| serde_json::to_value(w).unwrap_or(serde_json::Value::Null))
291 .collect();
292 metadata.insert(
293 "runtime_warnings".to_string(),
294 serde_json::Value::Array(arr),
295 );
296 }
297 // §Fase 33.x.f — step_audit (elided when empty).
298 if !envelope.step_audit_records.is_empty() {
299 let arr: Vec<serde_json::Value> = envelope
300 .step_audit_records
301 .iter()
302 .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null))
303 .collect();
304 metadata.insert("step_audit".to_string(), serde_json::Value::Array(arr));
305 }
306 }
307 // Always include terminal_reason so adopters parsing the
308 // metadata frame can distinguish stop / error / none paths
309 // even when the rest of the envelope is missing.
310 let terminal_str = match self.saw_terminal_reason {
311 TerminalReason::None => "none",
312 TerminalReason::Stop => "stop",
313 TerminalReason::Error => "error",
314 };
315 metadata.insert(
316 "terminal_reason".to_string(),
317 serde_json::json!(terminal_str),
318 );
319 // §Fase 37.e (D6) — honest failure: when the flow errored,
320 // the metadata frame names WHY, not just THAT. The diagnostic
321 // string carries the failing node + cause (e.g. `flow node
322 // 'Lookup' failed: …`). Elided on a non-erroring flow.
323 if let Some(err) = self.error_detail.as_ref() {
324 metadata.insert("error".to_string(), serde_json::json!(err));
325 }
326
327 let payload = serde_json::json!({ "axon_metadata": metadata });
328 Event::default().data(serde_json::to_string(&payload).unwrap_or_default())
329 }
330}
331
332impl WireFormatAdapter for OpenAIDialectAdapter {
333 fn dialect(&self) -> &'static str {
334 "openai"
335 }
336
337 /// §Fase 33.z.k.h — Stash the full envelope so flush_terminator()
338 /// can populate the axon_metadata frame with real algebraic-policy
339 /// data, then emit the final chunk with `finish_reason: "stop"`
340 /// per OpenAI spec.
341 fn build_complete_envelope_event(&mut self, envelope: &CompleteEnvelope) -> Vec<Event> {
342 self.terminal_emitted = true;
343 self.saw_terminal_reason = TerminalReason::Stop;
344 // Stash the envelope BEFORE building the final chunk so that
345 // if the producer's tx send fails mid-burst the flush would
346 // still see the side-channel data on a defensive retry.
347 self.stashed_envelope = Some(envelope.clone());
348 vec![self.build_chunk_frame(serde_json::json!({}), Some("stop"))]
349 }
350
351 fn translate(&mut self, event: &FlowExecutionEvent) -> Vec<Event> {
352 match event {
353 FlowExecutionEvent::FlowStart { backend, .. } => {
354 // Capture the model identifier from the FlowStart's
355 // backend field. This is the only opportunity to
356 // pin the model name for the response (OpenAI spec
357 // requires it constant across the stream).
358 self.model = backend.clone();
359 // Emit the role-marker frame per OpenAI spec
360 // (first chunk has `delta: {"role": "assistant"}`).
361 self.role_marker_emitted = true;
362 vec![self.build_chunk_frame(
363 serde_json::json!({"role": "assistant"}),
364 None,
365 )]
366 }
367 FlowExecutionEvent::StepStart { .. } => {
368 // OpenAI has no multi-step concept; silently consume.
369 Vec::new()
370 }
371 FlowExecutionEvent::StepToken { content, .. } => {
372 vec![self.build_chunk_frame(
373 serde_json::json!({"content": content}),
374 None,
375 )]
376 }
377 FlowExecutionEvent::StepComplete { .. } => Vec::new(),
378 FlowExecutionEvent::ToolCall {
379 tool_name,
380 content,
381 ..
382 } => {
383 let call_id = self.next_tool_call_id();
384 let delta = serde_json::json!({
385 "tool_calls": [{
386 "index": 0,
387 "id": call_id,
388 "type": "function",
389 "function": {
390 "name": tool_name,
391 "arguments": content,
392 }
393 }]
394 });
395 vec![self.build_chunk_frame(delta, None)]
396 }
397 FlowExecutionEvent::FlowComplete { .. } => {
398 self.terminal_emitted = true;
399 self.saw_terminal_reason = TerminalReason::Stop;
400 vec![self.build_chunk_frame(
401 serde_json::json!({}),
402 Some("stop"),
403 )]
404 }
405 FlowExecutionEvent::FlowError { error, .. } => {
406 self.terminal_emitted = true;
407 self.saw_terminal_reason = TerminalReason::Error;
408 // §Fase 37.e (D6) — stash the diagnostic so the
409 // axon_metadata frame can surface it (the openai wire
410 // has no native error event).
411 self.error_detail = Some(error.clone());
412 // OpenAI doesn't have a dedicated "error" finish_reason
413 // — adopter SDKs treat the absence of the `[DONE]`
414 // sentinel OR a non-standard finish_reason as the
415 // signal. We emit a final chunk with the closest
416 // canonical value `"stop"` and rely on the absence
417 // of any further content frames as the implicit
418 // error signal. Adopters who need explicit error
419 // info should also consume the axon_metadata frame
420 // (Q7).
421 vec![self.build_chunk_frame(
422 serde_json::json!({}),
423 Some("stop"),
424 )]
425 }
426 }
427 }
428
429 fn flush_terminator(&mut self) -> Vec<Event> {
430 // §Q7 — emit the axon_metadata frame BEFORE the [DONE]
431 // sentinel so adopters parsing the full stream see the
432 // algebraic-policy side-channels in order. The metadata
433 // frame is a non-OpenAI extension; openai-compat clients
434 // that strictly validate the chunk shape will ignore it
435 // (they don't recognize the top-level `axon_metadata` key
436 // and skip the frame).
437 //
438 // §Fase 33.z.k.e ships the metadata frame as a placeholder
439 // (empty fields). 33.z.k.h wires the actual data through.
440 //
441 // §D5 — emit the [DONE] sentinel per OpenAI spec. This is
442 // a non-JSON literal; the adapter emits it via
443 // `Event::default().data("[DONE]")` so the wire bytes
444 // come out as `data: [DONE]\n\n` per W3C SSE framing.
445 vec![
446 self.build_axon_metadata_frame(),
447 Event::default().data("[DONE]"),
448 ]
449 }
450}