Skip to main content

axon/wire_format/
anthropic_dialect.rs

1//! §Fase 33.z.k.f (v1.28.0) — `anthropic` Messages streaming dialect adapter.
2//!
3//! Matches the Anthropic Messages API streaming wire verbatim per
4//! the published API reference:
5//!
//!   <https://docs.anthropic.com/en/api/messages-streaming>
7//!
8//! # Wire shape (verbatim from Anthropic reference)
9//!
10//! Anthropic streams W3C SSE with NAMED events forming a structured
11//! lifecycle:
12//!
13//! ```text
14//! event: message_start
15//! data: {"type": "message_start", "message": {"id": "...", "type": "message",
16//!         "role": "assistant", "content": [], "model": "...",
17//!         "stop_reason": null, "stop_sequence": null,
18//!         "usage": {"input_tokens": N, "output_tokens": N}}}
19//!
20//! event: content_block_start
21//! data: {"type": "content_block_start", "index": 0,
22//!         "content_block": {"type": "text", "text": ""}}
23//!
24//! event: content_block_delta
25//! data: {"type": "content_block_delta", "index": 0,
26//!         "delta": {"type": "text_delta", "text": "<token>"}}
27//!
28//! event: content_block_stop
29//! data: {"type": "content_block_stop", "index": 0}
30//!
31//! event: message_delta
32//! data: {"type": "message_delta",
33//!         "delta": {"stop_reason": "end_turn", "stop_sequence": null},
34//!         "usage": {"output_tokens": N}}
35//!
36//! event: message_stop
37//! data: {"type": "message_stop"}
38//! ```
39//!
40//! Event taxonomy:
41//!   - `message_start`: announces the message; carries id, role,
42//!     model, initial usage
43//!   - `content_block_start`: announces a content block (text,
44//!     tool_use, etc.); index 0+, monotonically increasing
45//!   - `content_block_delta`: incremental content within a block;
46//!     for text blocks: `delta: {"type": "text_delta", "text": "..."}`;
47//!     for tool_use blocks: `delta: {"type": "input_json_delta",
48//!     "partial_json": "..."}`
49//!   - `content_block_stop`: closes the block at the given index
50//!   - `message_delta`: final usage + stop_reason envelope
51//!   - `message_stop`: stream terminator
52//!
53//! Tool-use blocks (per Anthropic spec):
54//!
55//! ```text
56//! event: content_block_start
57//! data: {"type": "content_block_start", "index": N,
58//!         "content_block": {"type": "tool_use", "id": "toolu_xxx",
59//!                          "name": "<tool>", "input": {}}}
60//!
61//! event: content_block_delta
62//! data: {"type": "content_block_delta", "index": N,
63//!         "delta": {"type": "input_json_delta",
64//!                  "partial_json": "<json string>"}}
65//!
66//! event: content_block_stop
67//! data: {"type": "content_block_stop", "index": N}
68//! ```
69//!
70//! # axon → anthropic event mapping
71//!
//! Adopts on-demand text-block management: a text content block is
//! lazily opened on the first StepToken and closed on StepComplete,
//! when a ToolCall arrives (so the tool_use block can be interleaved),
//! or when the flow terminates. The block index advances monotonically.
76//!
77//! | axon FlowExecutionEvent | anthropic wire frames                                 |
78//! |--------------------------|-------------------------------------------------------|
79//! | FlowStart                | 1 frame `event: message_start`                        |
80//! | StepStart                | 0 frames (lazy text-block opens on first StepToken)   |
81//! | StepToken                | (0 or 1) content_block_start(text) + 1 content_block_delta(text_delta) |
82//! | StepComplete             | (0 or 1) content_block_stop (closes open text block)  |
83//! | ToolCall                 | (0 or 1) content_block_stop(text) + 3 frames for tool_use block |
84//! | FlowComplete             | (0 or 1) content_block_stop + 1 message_delta         |
//! | FlowError                | (0 or 1) content_block_stop + 1 message_delta (stop_reason=end_turn; the error is surfaced via axon.metadata) |
//! | flush_terminator         | (0 or 1) content_block_stop (defensive) + 1 axon.metadata frame (Q7) + 1 message_stop |
87//!
88//! # Pillar trace (D10)
89//!
90//! - MATHEMATICS — content_block indices are monotonic integers;
91//!   block lifecycle (start→delta*→stop) is a finite state machine.
92//! - LOGIC — every translate() call respects the
93//!   open-block-must-close-before-new-block invariant.
94//! - PHILOSOPHY — adopters using Anthropic SDKs (python-anthropic,
95//!   anthropic-sdk-typescript, vercel/ai-sdk anthropic provider)
96//!   parse the wire verbatim without any axon-specific awareness.
97//! - COMPUTING — state machine is bounded (one current text block
98//!   index + one rolling block counter); per-event O(1).
99
100use super::{CompleteEnvelope, WireFormatAdapter};
101use crate::flow_execution_event::FlowExecutionEvent;
102use axum::response::sse::Event;
103
104/// The Anthropic Messages streaming dialect adapter.
105pub struct AnthropicDialectAdapter {
106    /// Stable message id across the stream. Synthesized from trace_id.
107    message_id: String,
108    /// Model identifier; constant across the stream. Captured from
109    /// the FlowStart event's backend.
110    model: String,
111    /// Whether `message_start` has been emitted (defensive — should
112    /// be true after the first FlowStart translate).
113    message_started: bool,
114    /// Next content_block index to assign. Monotonically increases.
115    next_block_index: usize,
116    /// Index of the currently-open text block, if any. Lazy-opened on
117    /// the first StepToken; closed on StepComplete / ToolCall /
118    /// FlowComplete.
119    open_text_block: Option<usize>,
120    /// Per-request running counter for synthesizing tool_use IDs.
121    tool_use_counter: u64,
122    /// Output token count accumulator (best-effort; populated from
123    /// StepToken arrivals). Surfaced on the terminal message_delta.
124    output_tokens_accumulated: u64,
125    /// Whether FlowComplete / FlowError has been translated.
126    terminal_emitted: bool,
127    /// Tracks how the stream terminated for adopter visibility on
128    /// the axon.metadata frame.
129    saw_terminal_reason: TerminalReason,
130    /// §Fase 33.z.k.h — Algebraic-policy envelope stashed at
131    /// FlowComplete time so `flush_terminator()` can emit a
132    /// POPULATED `axon.metadata` frame. `None` pre-FlowComplete.
133    /// Adopters consuming the anthropic wire receive per-step
134    /// provenance (banking PCI DSS Req 10 / government FedRAMP
135    /// AU-2 / legal FRE 502 / medicine 21 CFR Part 11 §11.10) via
136    /// this extension frame; anthropic-compat SDKs ignore unknown
137    /// event names verbatim per Anthropic spec §Streaming.
138    stashed_envelope: Option<CompleteEnvelope>,
139    /// §Fase 37.e (D6) — the `FlowError.error` diagnostic, stashed when
140    /// a `FlowError` is translated so `build_axon_metadata_frame()`
141    /// surfaces it as the metadata frame's `error` field. Anthropic
142    /// has no native error stop_reason; without this the wire signalled
143    /// `terminal_reason: error` but never said WHY. `None` on a flow
144    /// that did not error.
145    error_detail: Option<String>,
146}
147
/// How the stream terminated, as reported in the `axon.metadata` frame's
/// `terminal_reason` field. This is a closed catalog, kept parallel to the
/// OpenAI dialect's equivalent.
#[derive(Clone, Copy, Debug, PartialEq)]
enum TerminalReason {
    /// No terminal event has been translated yet.
    None,
    /// The flow completed normally.
    Stop,
    /// The flow ended with a FlowError.
    Error,
}
155
156impl AnthropicDialectAdapter {
157    /// Construct a fresh adapter for a request.
158    pub fn new(trace_id: u64) -> Self {
159        let message_id = format!("msg_axon_{trace_id:x}");
160        Self {
161            message_id,
162            model: "axon".to_string(),
163            message_started: false,
164            next_block_index: 0,
165            open_text_block: None,
166            tool_use_counter: 0,
167            output_tokens_accumulated: 0,
168            terminal_emitted: false,
169            saw_terminal_reason: TerminalReason::None,
170            stashed_envelope: None,
171            error_detail: None,
172        }
173    }
174
175    fn build_event(event_name: &'static str, payload: serde_json::Value) -> Event {
176        Event::default()
177            .event(event_name)
178            .data(serde_json::to_string(&payload).unwrap_or_default())
179    }
180
181    fn build_message_start(&self) -> Event {
182        let payload = serde_json::json!({
183            "type": "message_start",
184            "message": {
185                "id": &self.message_id,
186                "type": "message",
187                "role": "assistant",
188                "content": [],
189                "model": &self.model,
190                "stop_reason": null,
191                "stop_sequence": null,
192                "usage": {
193                    "input_tokens": 0,
194                    "output_tokens": 0,
195                }
196            }
197        });
198        Self::build_event("message_start", payload)
199    }
200
201    fn build_text_block_start(&self, index: usize) -> Event {
202        let payload = serde_json::json!({
203            "type": "content_block_start",
204            "index": index,
205            "content_block": {
206                "type": "text",
207                "text": "",
208            }
209        });
210        Self::build_event("content_block_start", payload)
211    }
212
213    fn build_text_delta(&self, index: usize, text: &str) -> Event {
214        let payload = serde_json::json!({
215            "type": "content_block_delta",
216            "index": index,
217            "delta": {
218                "type": "text_delta",
219                "text": text,
220            }
221        });
222        Self::build_event("content_block_delta", payload)
223    }
224
225    fn build_block_stop(&self, index: usize) -> Event {
226        let payload = serde_json::json!({
227            "type": "content_block_stop",
228            "index": index,
229        });
230        Self::build_event("content_block_stop", payload)
231    }
232
233    fn build_tool_use_start(&mut self, index: usize, tool_name: &str) -> Event {
234        self.tool_use_counter += 1;
235        let tool_id = format!(
236            "toolu_axon_{}_{}",
237            self.message_id.strip_prefix("msg_axon_").unwrap_or("0"),
238            self.tool_use_counter
239        );
240        let payload = serde_json::json!({
241            "type": "content_block_start",
242            "index": index,
243            "content_block": {
244                "type": "tool_use",
245                "id": tool_id,
246                "name": tool_name,
247                "input": {},
248            }
249        });
250        Self::build_event("content_block_start", payload)
251    }
252
253    fn build_tool_input_delta(&self, index: usize, partial_json: &str) -> Event {
254        let payload = serde_json::json!({
255            "type": "content_block_delta",
256            "index": index,
257            "delta": {
258                "type": "input_json_delta",
259                "partial_json": partial_json,
260            }
261        });
262        Self::build_event("content_block_delta", payload)
263    }
264
265    fn build_message_delta(&self, stop_reason: &str) -> Event {
266        let payload = serde_json::json!({
267            "type": "message_delta",
268            "delta": {
269                "stop_reason": stop_reason,
270                "stop_sequence": null,
271            },
272            "usage": {
273                "output_tokens": self.output_tokens_accumulated,
274            }
275        });
276        Self::build_event("message_delta", payload)
277    }
278
279    fn build_message_stop() -> Event {
280        let payload = serde_json::json!({
281            "type": "message_stop",
282        });
283        Self::build_event("message_stop", payload)
284    }
285
286    /// §Q7 + §Fase 33.z.k.h — Build the `axon.metadata` extension
287    /// frame carrying the full algebraic-policy side-channel data
288    /// + flow envelope. Anthropic-compat clients ignore unknown
289    /// `event:` names per Anthropic spec §Streaming / "Other events"
290    /// ("any event-type your client doesn't recognize should be
291    /// silently dropped"); SDK-free clients + axon-aware tooling
292    /// consume `event: axon.metadata` directly to surface vertical-
293    /// regulator audit data.
294    ///
295    /// Frame shape mirrors the openai dialect's `axon_metadata` JSON
296    /// payload — adopters using both dialects (multi-provider
297    /// orchestration) get a uniform algebraic-policy surface.
298    fn build_axon_metadata_frame(&self) -> Event {
299        let mut metadata = serde_json::Map::new();
300        if let Some(envelope) = self.stashed_envelope.as_ref() {
301            metadata.insert("trace_id".to_string(), serde_json::json!(envelope.trace_id));
302            metadata.insert("flow".to_string(), serde_json::json!(&envelope.flow_name));
303            metadata.insert(
304                "backend".to_string(),
305                serde_json::json!(&envelope.backend),
306            );
307            metadata.insert("success".to_string(), serde_json::json!(envelope.success));
308            metadata.insert(
309                "steps_executed".to_string(),
310                serde_json::json!(envelope.steps_executed),
311            );
312            metadata.insert(
313                "tokens_input".to_string(),
314                serde_json::json!(envelope.tokens_input),
315            );
316            metadata.insert(
317                "tokens_output".to_string(),
318                serde_json::json!(envelope.tokens_output),
319            );
320            metadata.insert(
321                "latency_ms".to_string(),
322                serde_json::json!(envelope.latency_ms),
323            );
324
325            // §Fase 33.e — stream_policies (elided when empty).
326            if !envelope.effect_policies.is_empty() {
327                let arr: Vec<serde_json::Value> = envelope
328                    .effect_policies
329                    .iter()
330                    .map(|(step, policy)| serde_json::json!({"step": step, "policy": policy}))
331                    .collect();
332                metadata.insert(
333                    "stream_policies".to_string(),
334                    serde_json::Value::Array(arr),
335                );
336            }
337            // §Fase 33.x.d — enforcement_summary (elided when empty).
338            if !envelope.enforcement_summaries.is_empty() {
339                let mut obj = serde_json::Map::new();
340                for (step, summary) in &envelope.enforcement_summaries {
341                    obj.insert(
342                        step.clone(),
343                        serde_json::to_value(summary).unwrap_or(serde_json::Value::Null),
344                    );
345                }
346                metadata.insert(
347                    "enforcement_summary".to_string(),
348                    serde_json::Value::Object(obj),
349                );
350            }
351            // §Fase 33.x.g — runtime_warnings (elided when empty).
352            if !envelope.runtime_warnings.is_empty() {
353                let arr: Vec<serde_json::Value> = envelope
354                    .runtime_warnings
355                    .iter()
356                    .map(|w| serde_json::to_value(w).unwrap_or(serde_json::Value::Null))
357                    .collect();
358                metadata.insert(
359                    "runtime_warnings".to_string(),
360                    serde_json::Value::Array(arr),
361                );
362            }
363            // §Fase 33.x.f — step_audit (elided when empty).
364            if !envelope.step_audit_records.is_empty() {
365                let arr: Vec<serde_json::Value> = envelope
366                    .step_audit_records
367                    .iter()
368                    .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null))
369                    .collect();
370                metadata.insert("step_audit".to_string(), serde_json::Value::Array(arr));
371            }
372        }
373        let terminal_str = match self.saw_terminal_reason {
374            TerminalReason::None => "none",
375            TerminalReason::Stop => "stop",
376            TerminalReason::Error => "error",
377        };
378        metadata.insert(
379            "terminal_reason".to_string(),
380            serde_json::json!(terminal_str),
381        );
382        // §Fase 37.e (D6) — honest failure: when the flow errored, the
383        // metadata frame names WHY. Elided on a non-erroring flow.
384        if let Some(err) = self.error_detail.as_ref() {
385            metadata.insert("error".to_string(), serde_json::json!(err));
386        }
387
388        let payload = serde_json::json!({
389            "type": "axon.metadata",
390            "axon_metadata": metadata,
391        });
392        Self::build_event("axon.metadata", payload)
393    }
394
395    /// Close the currently-open text block (if any). Returns the
396    /// content_block_stop frame OR an empty vec if no block is open.
397    fn close_text_block_if_open(&mut self) -> Vec<Event> {
398        if let Some(idx) = self.open_text_block.take() {
399            vec![self.build_block_stop(idx)]
400        } else {
401            Vec::new()
402        }
403    }
404}
405
406impl WireFormatAdapter for AnthropicDialectAdapter {
407    fn dialect(&self) -> &'static str {
408        "anthropic"
409    }
410
411    fn translate(&mut self, event: &FlowExecutionEvent) -> Vec<Event> {
412        match event {
413            FlowExecutionEvent::FlowStart { backend, .. } => {
414                // Capture the model identifier; emit message_start.
415                self.model = backend.clone();
416                self.message_started = true;
417                vec![self.build_message_start()]
418            }
419            FlowExecutionEvent::StepStart { .. } => {
420                // Lazy text-block open: defer until first StepToken
421                // (StepStart-without-tokens shouldn't open an empty
422                // block per Anthropic's content semantics).
423                Vec::new()
424            }
425            FlowExecutionEvent::StepToken { content, .. } => {
426                // Account toward the usage counter (best-effort).
427                self.output_tokens_accumulated += 1;
428                let mut events = Vec::new();
429                let block_idx = match self.open_text_block {
430                    Some(idx) => idx,
431                    None => {
432                        let idx = self.next_block_index;
433                        self.next_block_index += 1;
434                        events.push(self.build_text_block_start(idx));
435                        self.open_text_block = Some(idx);
436                        idx
437                    }
438                };
439                events.push(self.build_text_delta(block_idx, content));
440                events
441            }
442            FlowExecutionEvent::StepComplete { .. } => {
443                // Close any open text block; the next step's tokens
444                // will lazy-open a fresh block at a new index.
445                self.close_text_block_if_open()
446            }
447            FlowExecutionEvent::ToolCall {
448                tool_name, content, ..
449            } => {
450                // Interleave a tool_use block: close any open text
451                // block first, emit start + delta + stop for the
452                // tool_use block, advance block index past it.
453                let mut events = self.close_text_block_if_open();
454                let tool_block_idx = self.next_block_index;
455                self.next_block_index += 1;
456                events.push(self.build_tool_use_start(tool_block_idx, tool_name));
457                events.push(self.build_tool_input_delta(tool_block_idx, content));
458                events.push(self.build_block_stop(tool_block_idx));
459                events
460            }
461            FlowExecutionEvent::FlowComplete { .. } => {
462                // Close any lingering text block then emit message_delta.
463                // message_stop emits from flush_terminator() — separating
464                // the terminator allows Q7 axon.metadata to interpose.
465                self.terminal_emitted = true;
466                self.saw_terminal_reason = TerminalReason::Stop;
467                let mut events = self.close_text_block_if_open();
468                events.push(self.build_message_delta("end_turn"));
469                events
470            }
471            FlowExecutionEvent::FlowError { error, .. } => {
472                // Anthropic has no native error stop_reason; the closest
473                // canonical value for an interrupted/failed stream is
474                // "stop_sequence" or simply omitting. We emit
475                // stop_reason="end_turn" defensively + rely on the
476                // axon.metadata frame for adopters who need explicit
477                // error signaling (terminal_reason: "error").
478                self.terminal_emitted = true;
479                self.saw_terminal_reason = TerminalReason::Error;
480                // §Fase 37.e (D6) — stash the diagnostic for the
481                // axon.metadata frame's `error` field.
482                self.error_detail = Some(error.clone());
483                let mut events = self.close_text_block_if_open();
484                events.push(self.build_message_delta("end_turn"));
485                events
486            }
487        }
488    }
489
490    /// §Fase 33.z.k.h — Stash the full envelope so flush_terminator()
491    /// can populate the `axon.metadata` frame with real algebraic-
492    /// policy data, then emit the standard message_delta sequence
493    /// (close-text-block-if-open + message_delta with stop_reason).
494    fn build_complete_envelope_event(&mut self, envelope: &CompleteEnvelope) -> Vec<Event> {
495        self.terminal_emitted = true;
496        self.saw_terminal_reason = TerminalReason::Stop;
497        // Stash before emitting frames so a partial-burst send failure
498        // can't strand the side-channel data.
499        self.stashed_envelope = Some(envelope.clone());
500        let mut events = self.close_text_block_if_open();
501        events.push(self.build_message_delta("end_turn"));
502        events
503    }
504
505    fn flush_terminator(&mut self) -> Vec<Event> {
506        // §Fase 33.z.k.j defense-in-depth — close any orphan text
507        // block before emitting the terminator. In production this
508        // is unreachable (build_complete_envelope_event +
509        // translate(FlowComplete/FlowError) both call
510        // close_text_block_if_open before returning), but a malformed
511        // input stream (producer crashed mid-flight, channel dropped
512        // before FlowComplete, fuzz harness driving incomplete
513        // sequences) could leave a block open. The Anthropic spec
514        // (§Streaming / "ill-formed streams") requires every
515        // content_block_start to be balanced by content_block_stop;
516        // emitting message_stop with an orphan open block produces
517        // wire that some strict-validating clients reject.
518        let mut frames = self.close_text_block_if_open();
519        // Q7 axon.metadata extension frame BEFORE the terminator
520        // (anthropic-compat clients ignore unknown events; adopters
521        // who want the algebraic-policy data subscribe to the
522        // `axon.metadata` event name explicitly).
523        frames.push(self.build_axon_metadata_frame());
524        // Then D5 message_stop per Anthropic spec.
525        frames.push(Self::build_message_stop());
526        frames
527    }
528}