Skip to main content

axon/wire_format/
mod.rs

1//! §Fase 33.z.k (v1.28.0) — Wire-format adapter framework.
2//!
3//! Closed-catalog dispatcher for SSE wire-format dialects. Three
4//! dialects ship as first-class adapters per Q3 ratification:
5//!
6//! - **axon**: W3C named events (`event: axon.token` /
7//!   `event: axon.complete` / `event: axon.tool_call` /
8//!   `event: axon.error`). D6 backwards-compat baseline.
9//! - **openai**: `data: {"choices": [{"delta": {"content": "..."}}]}`
10//!   frames terminated by `data: [DONE]`. Matches OpenAI Chat
11//!   Completions API streaming wire (OpenAI Reference Docs §Chat /
12//!   Create chat completion / Streaming).
13//! - **anthropic**: `event: content_block_delta` /
14//!   `event: message_stop`. Matches Anthropic Messages API
15//!   streaming wire (Anthropic API Reference §Messages / Streaming).
16//!
17//! # Architecture
18//!
19//! The producer drives a translation loop:
20//!
21//! ```text
22//! for event in flow_execution_event_stream {
23//!     for wire_event in adapter.translate(&event) {
24//!         tx.send(Ok(wire_event)).await?;
25//!     }
26//! }
27//! for terminator in adapter.flush_terminator() {
28//!     tx.send(Ok(terminator)).await?;
29//! }
30//! ```
31//!
32//! Each adapter is **stateful** — it tracks per-request state (event
33//! ID counter, current step index for content_block boundaries,
34//! per-dialect tool-call indices, etc.). A fresh adapter is
35//! constructed per request via `select_adapter(dialect, trace_id)`.
36//!
37//! # D-letter coverage
38//!
39//! - **D3 semantic equivalence**: every dialect emits the SAME
40//!   per-token content + step-name + arrival ordering. Only framing
41//!   differs.
42//! - **D4 algebraic-policy preservation**: each adapter exposes a
43//!   dedicated path for `enforcement_summary` / `runtime_warnings` /
44//!   `step_audit` surfacing — axon embeds them on `axon.complete`,
45//!   openai emits a `data: {"axon_metadata": {...}}` frame BEFORE
46//!   `data: [DONE]`, anthropic emits `event: axon.metadata` BEFORE
47//!   `event: message_stop`.
48//! - **D5 closed-dialect terminators**: each adapter's
49//!   `flush_terminator()` emits the dialect's native terminator
50//!   (axon: `axon.complete`; openai: `data: [DONE]`; anthropic:
51//!   `event: message_stop`).
52//! - **D9 wire byte-compat for canonical Step**: the axon-dialect
53//!   adapter MUST produce byte-identical output to the v1.27.1
54//!   inline emission. Test pack
55//!   `tests/fase33z_k_d_axon_adapter_byte_compat.rs` pins this.
56//!
57//! # Pillar trace (D10)
58//!
59//! - MATHEMATICS — each `translate` is a pure projection from one
60//!   `FlowExecutionEvent` to a `Vec<Event>`. Trait method invariant.
61//! - LOGIC — closed catalog of 3 dialects; `select_adapter`
62//!   dispatch is exhaustive; unknown dialect defaults to axon.
63//! - PHILOSOPHY — adopters' SDKs consume the wire format their
64//!   ecosystem documents; the language adapts to the adopter
65//!   layer, not the other way around (`Axon for Axon` discipline
66//!   per the founder principle).
67//! - COMPUTING — adapter selection is O(1) at request time;
68//!   per-request adapter state is unbounded but small (~3-5
69//!   counters); no allocation per token translation.
70
71pub mod anthropic_dialect;
72pub mod axon_dialect;
73pub mod openai_dialect;
74
75use crate::axon_server::EnforcementSummaryWire;
76use crate::axonendpoint_replay::StepAuditRecord;
77use crate::flow_execution_event::FlowExecutionEvent;
78use crate::runtime_warnings::RuntimeWarning;
79use axum::response::sse::Event;
80
81/// §Fase 33.z.k.g (v1.28.0) — Envelope carrying all the side-channel
82/// + summary data the SSE producer accumulates over the lifetime of
83/// a flow. Passed to `adapter.build_complete_envelope_event()` at
84/// FlowComplete time so dialect adapters can surface the data per
85/// their wire-format conventions:
86///
87/// - **axon**: embeds the fields directly on `axon.complete` (D4
88///   wire byte-compat with v1.27.1 inline `build_complete_event`).
89/// - **openai**: emits a separate `data: {"axon_metadata": {...}}`
90///   frame BEFORE `data: [DONE]` (Q7 ratification).
91/// - **anthropic**: emits a separate `event: axon.metadata` frame
92///   BEFORE `event: message_stop` (Q7).
93///
94/// Built by the producer from the same side-channels v1.27.1
95/// `build_complete_event` consumed:
96/// - `enforcement_summaries`: per-step enforcement counters from
97///   the dispatcher's `StreamPolicyEnforcer` (Fase 33.x.d).
98/// - `runtime_warnings`: closed-catalog axon-W002 etc. warnings
99///   (Fase 33.x.g).
100/// - `effect_policies`: per-step `<stream:<policy>>` declarations
101///   (Fase 33.e).
102/// - `step_audit_records`: per-step audit rows (step_name +
103///   tokens_emitted + output_hash + effect_policy_applied +
104///   counters), populated by the dispatcher (Fase 33.x.f). Surfaces
105///   on the `axon.metadata` extension frame for openai + anthropic
106///   dialects (Q7 algebraic-policy preservation channel — adopters
107///   on those wires use this to satisfy banking PCI DSS Req 10 /
108///   government FedRAMP AU-2 / legal FRE 502 / medicine 21 CFR Part
109///   11 §11.10 per-step provenance requirements).
110/// - Flow envelope: trace_id, flow_name, backend, success, counters,
111///   latency.
112#[derive(Debug, Clone)]
113pub struct CompleteEnvelope {
114    pub trace_id: u64,
115    pub flow_name: String,
116    pub backend: String,
117    pub success: bool,
118    pub steps_executed: usize,
119    pub tokens_input: u64,
120    pub tokens_output: u64,
121    pub latency_ms: u64,
122    pub effect_policies: Vec<(String, String)>,
123    pub enforcement_summaries: Vec<(String, EnforcementSummaryWire)>,
124    pub runtime_warnings: Vec<RuntimeWarning>,
125    pub step_audit_records: Vec<StepAuditRecord>,
126    /// §Fase 55.b — the Theorem 5.1 `(base, scope, confidence)` triple of
127    /// every `use <Tool>` dispatch whose tool declares an
128    /// `epistemic:<level>` effect. Emitted as the `epistemic` array on the
129    /// streaming `axon.complete` envelope (elided when empty), byte-aligned
130    /// with the sync `FlowEnvelope.epistemic_envelopes` (both derived by
131    /// `epistemic_capture::collect_for_flow` — §55.c parity).
132    pub epistemic_envelopes: Vec<crate::epistemic_capture::EpistemicEnvelope>,
133}
134
135/// Wire-format adapter trait — translates internal flow-execution
136/// events into per-dialect SSE wire events.
137///
138/// Adapters are **stateful** — a fresh adapter is constructed per
139/// request via [`select_adapter`]. The adapter tracks per-request
140/// state (event ID counter, step index, per-tool-call index, etc.)
141/// internally.
142///
143/// All methods are `&mut self` to permit internal counter updates.
144/// Adapters MUST be `Send` so the SSE producer can move them across
145/// the spawn boundary.
146pub trait WireFormatAdapter: Send {
147    /// Closed-catalog dialect identifier this adapter implements.
148    /// One of `"axon"`, `"openai"`, `"anthropic"`.
149    fn dialect(&self) -> &'static str;
150
151    /// Translate a single internal flow-execution event into zero
152    /// or more wire SSE events.
153    ///
154    /// Returning an empty Vec is valid — some dialects swallow
155    /// certain internal events (e.g., openai doesn't have a per-step
156    /// `step_start` concept; AxonDialectAdapter emits one).
157    ///
158    /// NOTE: the producer calls `build_complete_envelope_event()`
159    /// instead of `translate()` for `FlowComplete` so the adapter
160    /// receives the full algebraic-policy side-channel data. The
161    /// `translate(FlowComplete)` path is retained for adapters /
162    /// callers that don't have the full envelope available; it
163    /// produces a minimal envelope without enforcement_summaries /
164    /// runtime_warnings / effect_policies.
165    fn translate(&mut self, event: &FlowExecutionEvent) -> Vec<Event>;
166
167    /// §Fase 33.z.k.g (v1.28.0) — Build the dialect-specific
168    /// "complete" event with the full algebraic-policy side-channel
169    /// data accumulated over the flow's lifetime. The producer calls
170    /// this in place of `translate(FlowComplete)` so adapters can
171    /// surface enforcement_summaries / runtime_warnings /
172    /// effect_policies per their wire-format conventions.
173    ///
174    /// Default impl: produces the same output as
175    /// `translate(FlowComplete{...})` reconstructed from the envelope
176    /// — adapters that don't override see the envelope's flow-level
177    /// fields but no side-channel data.
178    fn build_complete_envelope_event(&mut self, envelope: &CompleteEnvelope) -> Vec<Event> {
179        // Default fallback: build a FlowComplete event from the
180        // envelope's flow-level fields + translate it normally.
181        let event = FlowExecutionEvent::FlowComplete {
182            flow_name: envelope.flow_name.clone(),
183            backend: envelope.backend.clone(),
184            success: envelope.success,
185            steps_executed: envelope.steps_executed,
186            tokens_input: envelope.tokens_input,
187            tokens_output: envelope.tokens_output,
188            latency_ms: envelope.latency_ms,
189            timestamp_ms: 0,
190        };
191        self.translate(&event)
192    }
193
194    /// Emit any final terminator frames after the internal
195    /// `FlowComplete` / `FlowError` event has been translated.
196    ///
197    /// - axon: emits no extra terminator (the `axon.complete` event
198    ///   from `translate()` is itself the terminator).
199    /// - openai: emits `data: [DONE]`.
200    /// - anthropic: emits `event: message_stop` if not already
201    ///   emitted as part of FlowComplete translation.
202    fn flush_terminator(&mut self) -> Vec<Event>;
203}
204
205/// Closed-catalog factory for wire-format adapters.
206///
207/// `dialect` is one of `"axon"` / `"openai"` / `"anthropic"` (Q3
208/// ratified catalog). Unknown dialect strings default to
209/// `axon` (defensive — caller already validated via
210/// `AXONENDPOINT_TRANSPORT_DIALECTS` at parse time, but we never
211/// panic on a stale input).
212///
213/// `trace_id` is reserved per-request (Fase 33.x.c contract); the
214/// adapter embeds it in every emitted event for correlation.
215pub fn select_adapter(
216    dialect: &str,
217    trace_id: u64,
218) -> Box<dyn WireFormatAdapter> {
219    match dialect {
220        "axon" => Box::new(axon_dialect::AxonDialectAdapter::new(trace_id)),
221        // §Fase 33.z.k.e — OpenAI Chat Completions streaming wire.
222        "openai" => Box::new(openai_dialect::OpenAIDialectAdapter::new(trace_id)),
223        // §Q3 revision 2026-05-14 — Kimi (Moonshot) + GLM (Zhipu).
224        // Both providers use the OpenAI-compatible Chat Completions
225        // streaming wire verbatim (same chunk shape, same `data: [DONE]`
226        // sentinel). First-class catalog entries for adopter intent
227        // declaration; the wire is dispatched to the same
228        // OpenAIDialectAdapter so the bytes are canonical-OpenAI.
229        "kimi" => Box::new(openai_dialect::OpenAIDialectAdapter::new(trace_id)),
230        "glm" => Box::new(openai_dialect::OpenAIDialectAdapter::new(trace_id)),
231        // §Fase 33.z.k.f — Anthropic Messages streaming wire.
232        "anthropic" => Box::new(anthropic_dialect::AnthropicDialectAdapter::new(trace_id)),
233        _ => Box::new(axon_dialect::AxonDialectAdapter::new(trace_id)),
234    }
235}