Skip to main content

axon/wire_format/
openai_dialect.rs

1//! §Fase 33.z.k.e (v1.28.0) — `openai` Chat Completions streaming
2//! dialect adapter.
3//!
4//! Matches the OpenAI Chat Completions streaming wire verbatim per
5//! the published API reference:
6//!
7//!   https://platform.openai.com/docs/api-reference/chat/streaming
8//!
9//! # Wire shape (verbatim from OpenAI reference)
10//!
11//! Every frame is a `data:` line — no `event:` field. Frames are
12//! separated by `\n\n`. Each frame's payload is a JSON object with
13//! the closed shape:
14//!
15//! ```text
16//! data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1715648400,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}
17//!
18//! ```
19//!
20//! Required top-level fields per OpenAI spec:
21//!   - `id`: response identifier; constant across the chunk stream
22//!   - `object`: literal `"chat.completion.chunk"`
23//!   - `created`: Unix timestamp in seconds (constant across stream)
24//!   - `model`: model identifier (constant across stream)
25//!   - `choices`: array of choice objects
26//!
27//! Per-choice fields:
28//!   - `index`: 0 for single-completion streams
29//!   - `delta`: incremental content (`{}`, `{"role": "assistant"}`,
30//!     `{"content": "..."}`, or `{"tool_calls": [...]}`)
31//!   - `finish_reason`: `null` mid-stream; `"stop"` /
32//!     `"length"` / `"tool_calls"` / `"content_filter"` on last frame
33//!
34//! After the LAST data frame, OpenAI emits a final sentinel:
35//!
36//! ```text
37//! data: [DONE]
38//!
39//! ```
40//!
41//! # axon → openai event mapping
42//!
43//! | axon FlowExecutionEvent | openai wire frame                                    |
44//! |--------------------------|------------------------------------------------------|
45//! | FlowStart                | 1 frame with `delta: {"role": "assistant"}`          |
46//! | StepStart                | 0 frames (openai has no multi-step concept)          |
47//! | StepToken                | 1 frame with `delta: {"content": "<token>"}`         |
48//! | StepComplete             | 0 frames                                             |
49//! | ToolCall                 | 1 frame with `delta: {"tool_calls": [{...}]}`        |
50//! | FlowComplete             | 1 frame with `delta: {}`, `finish_reason: "stop"`    |
51//! |                          | + 1 axon_metadata frame (Q7 algebraic-policy)        |
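//! | FlowError                | 1 frame with `delta: {}`, `finish_reason: "stop"`    |
//! |                          | (OpenAI has no "error" finish_reason; the cause is   |
//! |                          | surfaced in `axon_metadata.error`)                   |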
53//! | flush_terminator         | 1 frame `data: [DONE]`                               |
54//!
55//! Step boundaries are NOT visible on the openai wire (the dialect
56//! doesn't model multi-step flows). Adopters consuming openai-compat
57//! clients see a single continuous content stream concatenated
58//! across step tokens. The `axon_metadata` frame emitted before
59//! `[DONE]` carries per-step audit + enforcement counters for
60//! adopters who need step-level visibility (Q7 ratification).
61//!
62//! # Tool-call mapping detail
63//!
64//! Our `FlowExecutionEvent::ToolCall` carries the FULL tool-call
65//! payload in one event (the upstream LLM emitted it as a complete
66//! `FinishReason::ToolUse`). OpenAI's wire streams tool calls as
67//! partial `function.arguments` deltas across frames; emitting the
68//! whole arguments string in a single frame is VALID — OpenAI does
69//! this when arguments fit in one chunk. The adapter emits one
70//! frame per ToolCall event with the complete arguments string.
71//!
72//! # Pillar trace (D10)
73//!
74//! - MATHEMATICS — every frame is a pure projection from one input
75//!   event; field set is closed-catalog.
76//! - LOGIC — `finish_reason` enum is closed per OpenAI spec.
77//! - PHILOSOPHY — adopters using openai-compat SDKs (litellm,
78//!   instructor, langchain, llama_index, vercel/ai-sdk, etc.) parse
79//!   the wire verbatim without any axon-specific awareness.
//! - COMPUTING — JSON serialization is `serde_json::to_string` over
//!   `serde_json::Value`. Key order depends on serde_json's
//!   `preserve_order` feature: with it off (the default),
//!   `serde_json::Map` is backed by `BTreeMap` and keys serialize in
//!   alphabetical order; with it on, insertion order is preserved.
//!   Adopter parsers MUST be order-agnostic per JSON spec; OpenAI's
//!   own wire is not key-ordered. Tests pin field PRESENCE + values,
//!   not byte order.
87
88use super::{CompleteEnvelope, WireFormatAdapter};
89use crate::flow_execution_event::FlowExecutionEvent;
90use axum::response::sse::Event;
91
92/// The OpenAI Chat Completions streaming dialect adapter.
93pub struct OpenAIDialectAdapter {
94    /// Stable response id across the chunk stream. Synthesized once
95    /// at adapter construction from the request's trace_id.
96    response_id: String,
97    /// Unix timestamp in seconds; constant across the stream per
98    /// OpenAI spec.
99    created: u64,
100    /// Model identifier; constant across the stream. Populated from
101    /// the first FlowStart event's `backend` field (defaults to
102    /// `"axon"` until that event arrives).
103    model: String,
104    /// Whether the role-marker frame has been emitted. Per OpenAI
105    /// spec, the first chunk's delta is `{"role": "assistant"}`.
106    role_marker_emitted: bool,
107    /// Tracks the last terminal reason so flush_terminator() can
108    /// emit the proper `finish_reason` if FlowComplete/FlowError
109    /// was not processed (defensive — should not happen in
110    /// well-formed flows).
111    terminal_emitted: bool,
112    /// Per-request running counter for synthesizing tool_call IDs
113    /// (OpenAI requires `id` on each tool_calls[] entry).
114    tool_call_counter: u64,
115    /// Tracks how the stream terminated for adopter visibility on
116    /// the axon_metadata frame.
117    saw_terminal_reason: TerminalReason,
118    /// §Fase 33.z.k.h — Algebraic-policy envelope stashed at
119    /// FlowComplete time so `flush_terminator()` can emit a
120    /// POPULATED `axon_metadata` frame. `None` pre-FlowComplete
121    /// (defensive — flush before FlowComplete is unreachable in
122    /// the production producer but legal per the trait contract).
123    /// Adopters consuming the openai wire receive per-step
124    /// provenance (PCI DSS Req 10 / FedRAMP AU-2 / FRE 502 /
125    /// 21 CFR Part 11 §11.10) via this extension frame even
126    /// though OpenAI's `chat.completion.chunk` shape doesn't
127    /// model multi-step flows.
128    stashed_envelope: Option<CompleteEnvelope>,
129    /// §Fase 37.e (D6) — the `FlowError.error` diagnostic, stashed when
130    /// a `FlowError` is translated so `build_axon_metadata_frame()` can
131    /// surface it as the metadata frame's `error` field. Without this
132    /// the openai (and kimi / glm) wire signalled `terminal_reason:
133    /// error` but never said WHY — a hollow terminator. `None` on a
134    /// flow that did not error.
135    error_detail: Option<String>,
136}
137
/// How the stream reached its end, as reported in the
/// `terminal_reason` field of the `axon_metadata` frame.
///
/// `Eq` is derived alongside `PartialEq`: the enum is fieldless, so
/// equality is total and the type can be used wherever `Eq` is
/// required.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TerminalReason {
    /// No terminal event has been observed yet (serialized as `"none"`).
    None,
    /// The flow completed normally (serialized as `"stop"`).
    Stop,
    /// The flow failed with a `FlowError` (serialized as `"error"`).
    Error,
}
144
145impl OpenAIDialectAdapter {
146    /// Construct a fresh adapter for a request. The trace_id is
147    /// embedded in the response_id; created uses the current Unix
148    /// timestamp.
149    pub fn new(trace_id: u64) -> Self {
150        let response_id = format!("chatcmpl-axon-{trace_id:x}");
151        let created = std::time::SystemTime::now()
152            .duration_since(std::time::UNIX_EPOCH)
153            .map(|d| d.as_secs())
154            .unwrap_or(0);
155        Self {
156            response_id,
157            created,
158            model: "axon".to_string(),
159            role_marker_emitted: false,
160            terminal_emitted: false,
161            tool_call_counter: 0,
162            saw_terminal_reason: TerminalReason::None,
163            stashed_envelope: None,
164            error_detail: None,
165        }
166    }
167
168    /// Build a chunk frame with the given delta + finish_reason.
169    /// All other top-level fields (id, object, created, model)
170    /// stay constant across the stream per OpenAI spec.
171    fn build_chunk_frame(
172        &self,
173        delta: serde_json::Value,
174        finish_reason: Option<&str>,
175    ) -> Event {
176        let choice = serde_json::json!({
177            "index": 0,
178            "delta": delta,
179            "finish_reason": finish_reason,
180        });
181        let payload = serde_json::json!({
182            "id": &self.response_id,
183            "object": "chat.completion.chunk",
184            "created": self.created,
185            "model": &self.model,
186            "choices": [choice],
187        });
188        Event::default().data(serde_json::to_string(&payload).unwrap_or_default())
189    }
190
191    /// Synthesize a stable tool_call ID. OpenAI requires per-call IDs
192    /// so adopters can correlate tool results back to their requests.
193    fn next_tool_call_id(&mut self) -> String {
194        self.tool_call_counter += 1;
195        format!(
196            "call_{}_{}",
197            self.response_id.strip_prefix("chatcmpl-axon-").unwrap_or("0"),
198            self.tool_call_counter,
199        )
200    }
201
202    /// §Q7 + §Fase 33.z.k.h — Build the `axon_metadata` extension
203    /// frame carrying the full algebraic-policy side-channel data
204    /// + flow envelope. Adopters on the openai wire (litellm,
205    /// langchain, vercel/ai, instructor, llama_index) ignore unknown
206    /// top-level keys per their permissive JSON parsing; SDK-free
207    /// clients + axon-aware tooling consume the `axon_metadata` key
208    /// directly to surface vertical-regulator audit data.
209    ///
210    /// Frame shape (every field optional / elided when empty for
211    /// minimal-overhead default behavior):
212    ///
213    /// ```text
214    /// data: {"axon_metadata": {
215    ///     "trace_id": N, "flow": "...", "backend": "...",
216    ///     "success": bool, "steps_executed": N,
217    ///     "tokens_input": N, "tokens_output": N, "latency_ms": N,
218    ///     "stream_policies": [{step, policy}, ...],
219    ///     "enforcement_summary": {step: {policy_slug, chunks_pushed, ...}},
220    ///     "runtime_warnings": [...],
221    ///     "step_audit": [{step_name, step_index, tokens_emitted,
222    ///                     output_hash_hex, effect_policy_applied, ...}, ...],
223    ///     "terminal_reason": "stop" | "error" | "none"
224    /// }}
225    /// ```
226    ///
227    /// When no envelope has been stashed (pre-FlowComplete flush —
228    /// defensive path), the frame carries only the terminal_reason
229    /// + empty algebraic-policy fields. The frame's existence is
230    /// invariant; the data populated is conditional on having
231    /// reached FlowComplete.
232    fn build_axon_metadata_frame(&self) -> Event {
233        let mut metadata = serde_json::Map::new();
234        if let Some(envelope) = self.stashed_envelope.as_ref() {
235            metadata.insert("trace_id".to_string(), serde_json::json!(envelope.trace_id));
236            metadata.insert("flow".to_string(), serde_json::json!(&envelope.flow_name));
237            metadata.insert(
238                "backend".to_string(),
239                serde_json::json!(&envelope.backend),
240            );
241            metadata.insert("success".to_string(), serde_json::json!(envelope.success));
242            metadata.insert(
243                "steps_executed".to_string(),
244                serde_json::json!(envelope.steps_executed),
245            );
246            metadata.insert(
247                "tokens_input".to_string(),
248                serde_json::json!(envelope.tokens_input),
249            );
250            metadata.insert(
251                "tokens_output".to_string(),
252                serde_json::json!(envelope.tokens_output),
253            );
254            metadata.insert(
255                "latency_ms".to_string(),
256                serde_json::json!(envelope.latency_ms),
257            );
258
259            // §Fase 33.e — stream_policies (elided when empty).
260            if !envelope.effect_policies.is_empty() {
261                let arr: Vec<serde_json::Value> = envelope
262                    .effect_policies
263                    .iter()
264                    .map(|(step, policy)| serde_json::json!({"step": step, "policy": policy}))
265                    .collect();
266                metadata.insert(
267                    "stream_policies".to_string(),
268                    serde_json::Value::Array(arr),
269                );
270            }
271            // §Fase 33.x.d — enforcement_summary (elided when empty).
272            if !envelope.enforcement_summaries.is_empty() {
273                let mut obj = serde_json::Map::new();
274                for (step, summary) in &envelope.enforcement_summaries {
275                    obj.insert(
276                        step.clone(),
277                        serde_json::to_value(summary).unwrap_or(serde_json::Value::Null),
278                    );
279                }
280                metadata.insert(
281                    "enforcement_summary".to_string(),
282                    serde_json::Value::Object(obj),
283                );
284            }
285            // §Fase 33.x.g — runtime_warnings (elided when empty).
286            if !envelope.runtime_warnings.is_empty() {
287                let arr: Vec<serde_json::Value> = envelope
288                    .runtime_warnings
289                    .iter()
290                    .map(|w| serde_json::to_value(w).unwrap_or(serde_json::Value::Null))
291                    .collect();
292                metadata.insert(
293                    "runtime_warnings".to_string(),
294                    serde_json::Value::Array(arr),
295                );
296            }
297            // §Fase 33.x.f — step_audit (elided when empty).
298            if !envelope.step_audit_records.is_empty() {
299                let arr: Vec<serde_json::Value> = envelope
300                    .step_audit_records
301                    .iter()
302                    .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null))
303                    .collect();
304                metadata.insert("step_audit".to_string(), serde_json::Value::Array(arr));
305            }
306        }
307        // Always include terminal_reason so adopters parsing the
308        // metadata frame can distinguish stop / error / none paths
309        // even when the rest of the envelope is missing.
310        let terminal_str = match self.saw_terminal_reason {
311            TerminalReason::None => "none",
312            TerminalReason::Stop => "stop",
313            TerminalReason::Error => "error",
314        };
315        metadata.insert(
316            "terminal_reason".to_string(),
317            serde_json::json!(terminal_str),
318        );
319        // §Fase 37.e (D6) — honest failure: when the flow errored,
320        // the metadata frame names WHY, not just THAT. The diagnostic
321        // string carries the failing node + cause (e.g. `flow node
322        // 'Lookup' failed: …`). Elided on a non-erroring flow.
323        if let Some(err) = self.error_detail.as_ref() {
324            metadata.insert("error".to_string(), serde_json::json!(err));
325        }
326
327        let payload = serde_json::json!({ "axon_metadata": metadata });
328        Event::default().data(serde_json::to_string(&payload).unwrap_or_default())
329    }
330}
331
332impl WireFormatAdapter for OpenAIDialectAdapter {
333    fn dialect(&self) -> &'static str {
334        "openai"
335    }
336
337    /// §Fase 33.z.k.h — Stash the full envelope so flush_terminator()
338    /// can populate the axon_metadata frame with real algebraic-policy
339    /// data, then emit the final chunk with `finish_reason: "stop"`
340    /// per OpenAI spec.
341    fn build_complete_envelope_event(&mut self, envelope: &CompleteEnvelope) -> Vec<Event> {
342        self.terminal_emitted = true;
343        self.saw_terminal_reason = TerminalReason::Stop;
344        // Stash the envelope BEFORE building the final chunk so that
345        // if the producer's tx send fails mid-burst the flush would
346        // still see the side-channel data on a defensive retry.
347        self.stashed_envelope = Some(envelope.clone());
348        vec![self.build_chunk_frame(serde_json::json!({}), Some("stop"))]
349    }
350
351    fn translate(&mut self, event: &FlowExecutionEvent) -> Vec<Event> {
352        match event {
353            FlowExecutionEvent::FlowStart { backend, .. } => {
354                // Capture the model identifier from the FlowStart's
355                // backend field. This is the only opportunity to
356                // pin the model name for the response (OpenAI spec
357                // requires it constant across the stream).
358                self.model = backend.clone();
359                // Emit the role-marker frame per OpenAI spec
360                // (first chunk has `delta: {"role": "assistant"}`).
361                self.role_marker_emitted = true;
362                vec![self.build_chunk_frame(
363                    serde_json::json!({"role": "assistant"}),
364                    None,
365                )]
366            }
367            FlowExecutionEvent::StepStart { .. } => {
368                // OpenAI has no multi-step concept; silently consume.
369                Vec::new()
370            }
371            FlowExecutionEvent::StepToken { content, .. } => {
372                vec![self.build_chunk_frame(
373                    serde_json::json!({"content": content}),
374                    None,
375                )]
376            }
377            FlowExecutionEvent::StepComplete { .. } => Vec::new(),
378            FlowExecutionEvent::ToolCall {
379                tool_name,
380                content,
381                ..
382            } => {
383                let call_id = self.next_tool_call_id();
384                let delta = serde_json::json!({
385                    "tool_calls": [{
386                        "index": 0,
387                        "id": call_id,
388                        "type": "function",
389                        "function": {
390                            "name": tool_name,
391                            "arguments": content,
392                        }
393                    }]
394                });
395                vec![self.build_chunk_frame(delta, None)]
396            }
397            FlowExecutionEvent::FlowComplete { .. } => {
398                self.terminal_emitted = true;
399                self.saw_terminal_reason = TerminalReason::Stop;
400                vec![self.build_chunk_frame(
401                    serde_json::json!({}),
402                    Some("stop"),
403                )]
404            }
405            FlowExecutionEvent::FlowError { error, .. } => {
406                self.terminal_emitted = true;
407                self.saw_terminal_reason = TerminalReason::Error;
408                // §Fase 37.e (D6) — stash the diagnostic so the
409                // axon_metadata frame can surface it (the openai wire
410                // has no native error event).
411                self.error_detail = Some(error.clone());
412                // OpenAI doesn't have a dedicated "error" finish_reason
413                // — adopter SDKs treat the absence of the `[DONE]`
414                // sentinel OR a non-standard finish_reason as the
415                // signal. We emit a final chunk with the closest
416                // canonical value `"stop"` and rely on the absence
417                // of any further content frames as the implicit
418                // error signal. Adopters who need explicit error
419                // info should also consume the axon_metadata frame
420                // (Q7).
421                vec![self.build_chunk_frame(
422                    serde_json::json!({}),
423                    Some("stop"),
424                )]
425            }
426        }
427    }
428
429    fn flush_terminator(&mut self) -> Vec<Event> {
430        // §Q7 — emit the axon_metadata frame BEFORE the [DONE]
431        // sentinel so adopters parsing the full stream see the
432        // algebraic-policy side-channels in order. The metadata
433        // frame is a non-OpenAI extension; openai-compat clients
434        // that strictly validate the chunk shape will ignore it
435        // (they don't recognize the top-level `axon_metadata` key
436        // and skip the frame).
437        //
438        // §Fase 33.z.k.e ships the metadata frame as a placeholder
439        // (empty fields). 33.z.k.h wires the actual data through.
440        //
441        // §D5 — emit the [DONE] sentinel per OpenAI spec. This is
442        // a non-JSON literal; the adapter emits it via
443        // `Event::default().data("[DONE]")` so the wire bytes
444        // come out as `data: [DONE]\n\n` per W3C SSE framing.
445        vec![
446            self.build_axon_metadata_frame(),
447            Event::default().data("[DONE]"),
448        ]
449    }
450}