axon/wire_format/anthropic_dialect.rs
1//! §Fase 33.z.k.f (v1.28.0) — `anthropic` Messages streaming dialect adapter.
2//!
3//! Matches the Anthropic Messages API streaming wire verbatim per
4//! the published API reference:
5//!
6//! https://docs.anthropic.com/en/api/messages-streaming
7//!
8//! # Wire shape (verbatim from Anthropic reference)
9//!
10//! Anthropic streams W3C SSE with NAMED events forming a structured
11//! lifecycle:
12//!
13//! ```text
14//! event: message_start
15//! data: {"type": "message_start", "message": {"id": "...", "type": "message",
16//! "role": "assistant", "content": [], "model": "...",
17//! "stop_reason": null, "stop_sequence": null,
18//! "usage": {"input_tokens": N, "output_tokens": N}}}
19//!
20//! event: content_block_start
21//! data: {"type": "content_block_start", "index": 0,
22//! "content_block": {"type": "text", "text": ""}}
23//!
24//! event: content_block_delta
25//! data: {"type": "content_block_delta", "index": 0,
26//! "delta": {"type": "text_delta", "text": "<token>"}}
27//!
28//! event: content_block_stop
29//! data: {"type": "content_block_stop", "index": 0}
30//!
31//! event: message_delta
32//! data: {"type": "message_delta",
33//! "delta": {"stop_reason": "end_turn", "stop_sequence": null},
34//! "usage": {"output_tokens": N}}
35//!
36//! event: message_stop
37//! data: {"type": "message_stop"}
38//! ```
39//!
40//! Event taxonomy:
41//! - `message_start`: announces the message; carries id, role,
42//! model, initial usage
43//! - `content_block_start`: announces a content block (text,
44//! tool_use, etc.); index 0+, monotonically increasing
45//! - `content_block_delta`: incremental content within a block;
46//! for text blocks: `delta: {"type": "text_delta", "text": "..."}`;
47//! for tool_use blocks: `delta: {"type": "input_json_delta",
48//! "partial_json": "..."}`
49//! - `content_block_stop`: closes the block at the given index
50//! - `message_delta`: final usage + stop_reason envelope
51//! - `message_stop`: stream terminator
52//!
53//! Tool-use blocks (per Anthropic spec):
54//!
55//! ```text
56//! event: content_block_start
57//! data: {"type": "content_block_start", "index": N,
58//! "content_block": {"type": "tool_use", "id": "toolu_xxx",
59//! "name": "<tool>", "input": {}}}
60//!
61//! event: content_block_delta
62//! data: {"type": "content_block_delta", "index": N,
63//! "delta": {"type": "input_json_delta",
64//! "partial_json": "<json string>"}}
65//!
66//! event: content_block_stop
67//! data: {"type": "content_block_stop", "index": N}
68//! ```
69//!
70//! # axon → anthropic event mapping
71//!
//! Adopts on-demand text-block management: a text content block is
//! lazily opened on the first StepToken arrival and stopped on
//! StepComplete, when a ToolCall arrives (interleaving the tool_use
//! block), or when the stream terminates (FlowComplete / FlowError /
//! flush_terminator). The block index advances monotonically.
76//!
77//! | axon FlowExecutionEvent | anthropic wire frames |
78//! |--------------------------|-------------------------------------------------------|
79//! | FlowStart | 1 frame `event: message_start` |
80//! | StepStart | 0 frames (lazy text-block opens on first StepToken) |
81//! | StepToken | (0 or 1) content_block_start(text) + 1 content_block_delta(text_delta) |
82//! | StepComplete | (0 or 1) content_block_stop (closes open text block) |
83//! | ToolCall | (0 or 1) content_block_stop(text) + 3 frames for tool_use block |
84//! | FlowComplete | (0 or 1) content_block_stop + 1 message_delta |
//! | FlowError                | (0 or 1) content_block_stop + 1 message_delta (stop_reason=end_turn; the failure is reported via axon.metadata `terminal_reason: error` + `error`) |
//! | flush_terminator         | (0 or 1) content_block_stop (orphan-block guard) + 1 axon.metadata frame (Q7) + 1 message_stop |
87//!
88//! # Pillar trace (D10)
89//!
90//! - MATHEMATICS — content_block indices are monotonic integers;
91//! block lifecycle (start→delta*→stop) is a finite state machine.
92//! - LOGIC — every translate() call respects the
93//! open-block-must-close-before-new-block invariant.
94//! - PHILOSOPHY — adopters using Anthropic SDKs (python-anthropic,
95//! anthropic-sdk-typescript, vercel/ai-sdk anthropic provider)
96//! parse the wire verbatim without any axon-specific awareness.
97//! - COMPUTING — state machine is bounded (one current text block
98//! index + one rolling block counter); per-event O(1).
99
100use super::{CompleteEnvelope, WireFormatAdapter};
101use crate::flow_execution_event::FlowExecutionEvent;
102use axum::response::sse::Event;
103
104/// The Anthropic Messages streaming dialect adapter.
105pub struct AnthropicDialectAdapter {
106 /// Stable message id across the stream. Synthesized from trace_id.
107 message_id: String,
108 /// Model identifier; constant across the stream. Captured from
109 /// the FlowStart event's backend.
110 model: String,
111 /// Whether `message_start` has been emitted (defensive — should
112 /// be true after the first FlowStart translate).
113 message_started: bool,
114 /// Next content_block index to assign. Monotonically increases.
115 next_block_index: usize,
116 /// Index of the currently-open text block, if any. Lazy-opened on
117 /// the first StepToken; closed on StepComplete / ToolCall /
118 /// FlowComplete.
119 open_text_block: Option<usize>,
120 /// Per-request running counter for synthesizing tool_use IDs.
121 tool_use_counter: u64,
122 /// Output token count accumulator (best-effort; populated from
123 /// StepToken arrivals). Surfaced on the terminal message_delta.
124 output_tokens_accumulated: u64,
125 /// Whether FlowComplete / FlowError has been translated.
126 terminal_emitted: bool,
127 /// Tracks how the stream terminated for adopter visibility on
128 /// the axon.metadata frame.
129 saw_terminal_reason: TerminalReason,
130 /// §Fase 33.z.k.h — Algebraic-policy envelope stashed at
131 /// FlowComplete time so `flush_terminator()` can emit a
132 /// POPULATED `axon.metadata` frame. `None` pre-FlowComplete.
133 /// Adopters consuming the anthropic wire receive per-step
134 /// provenance (banking PCI DSS Req 10 / government FedRAMP
135 /// AU-2 / legal FRE 502 / medicine 21 CFR Part 11 §11.10) via
136 /// this extension frame; anthropic-compat SDKs ignore unknown
137 /// event names verbatim per Anthropic spec §Streaming.
138 stashed_envelope: Option<CompleteEnvelope>,
139 /// §Fase 37.e (D6) — the `FlowError.error` diagnostic, stashed when
140 /// a `FlowError` is translated so `build_axon_metadata_frame()`
141 /// surfaces it as the metadata frame's `error` field. Anthropic
142 /// has no native error stop_reason; without this the wire signalled
143 /// `terminal_reason: error` but never said WHY. `None` on a flow
144 /// that did not error.
145 error_detail: Option<String>,
146}
147
/// How the stream terminated, as reported in the `axon.metadata` frame's
/// `terminal_reason` field. Closed catalog, parallel to the OpenAI dialect.
///
/// Fieldless, so equality is total: `Eq` is derived alongside `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TerminalReason {
    /// No terminal event has been translated yet.
    None,
    /// The flow completed normally (FlowComplete or the complete-envelope path).
    Stop,
    /// The flow reported a `FlowError`.
    Error,
}
155
156impl AnthropicDialectAdapter {
157 /// Construct a fresh adapter for a request.
158 pub fn new(trace_id: u64) -> Self {
159 let message_id = format!("msg_axon_{trace_id:x}");
160 Self {
161 message_id,
162 model: "axon".to_string(),
163 message_started: false,
164 next_block_index: 0,
165 open_text_block: None,
166 tool_use_counter: 0,
167 output_tokens_accumulated: 0,
168 terminal_emitted: false,
169 saw_terminal_reason: TerminalReason::None,
170 stashed_envelope: None,
171 error_detail: None,
172 }
173 }
174
175 fn build_event(event_name: &'static str, payload: serde_json::Value) -> Event {
176 Event::default()
177 .event(event_name)
178 .data(serde_json::to_string(&payload).unwrap_or_default())
179 }
180
181 fn build_message_start(&self) -> Event {
182 let payload = serde_json::json!({
183 "type": "message_start",
184 "message": {
185 "id": &self.message_id,
186 "type": "message",
187 "role": "assistant",
188 "content": [],
189 "model": &self.model,
190 "stop_reason": null,
191 "stop_sequence": null,
192 "usage": {
193 "input_tokens": 0,
194 "output_tokens": 0,
195 }
196 }
197 });
198 Self::build_event("message_start", payload)
199 }
200
201 fn build_text_block_start(&self, index: usize) -> Event {
202 let payload = serde_json::json!({
203 "type": "content_block_start",
204 "index": index,
205 "content_block": {
206 "type": "text",
207 "text": "",
208 }
209 });
210 Self::build_event("content_block_start", payload)
211 }
212
213 fn build_text_delta(&self, index: usize, text: &str) -> Event {
214 let payload = serde_json::json!({
215 "type": "content_block_delta",
216 "index": index,
217 "delta": {
218 "type": "text_delta",
219 "text": text,
220 }
221 });
222 Self::build_event("content_block_delta", payload)
223 }
224
225 fn build_block_stop(&self, index: usize) -> Event {
226 let payload = serde_json::json!({
227 "type": "content_block_stop",
228 "index": index,
229 });
230 Self::build_event("content_block_stop", payload)
231 }
232
233 fn build_tool_use_start(&mut self, index: usize, tool_name: &str) -> Event {
234 self.tool_use_counter += 1;
235 let tool_id = format!(
236 "toolu_axon_{}_{}",
237 self.message_id.strip_prefix("msg_axon_").unwrap_or("0"),
238 self.tool_use_counter
239 );
240 let payload = serde_json::json!({
241 "type": "content_block_start",
242 "index": index,
243 "content_block": {
244 "type": "tool_use",
245 "id": tool_id,
246 "name": tool_name,
247 "input": {},
248 }
249 });
250 Self::build_event("content_block_start", payload)
251 }
252
253 fn build_tool_input_delta(&self, index: usize, partial_json: &str) -> Event {
254 let payload = serde_json::json!({
255 "type": "content_block_delta",
256 "index": index,
257 "delta": {
258 "type": "input_json_delta",
259 "partial_json": partial_json,
260 }
261 });
262 Self::build_event("content_block_delta", payload)
263 }
264
265 fn build_message_delta(&self, stop_reason: &str) -> Event {
266 let payload = serde_json::json!({
267 "type": "message_delta",
268 "delta": {
269 "stop_reason": stop_reason,
270 "stop_sequence": null,
271 },
272 "usage": {
273 "output_tokens": self.output_tokens_accumulated,
274 }
275 });
276 Self::build_event("message_delta", payload)
277 }
278
279 fn build_message_stop() -> Event {
280 let payload = serde_json::json!({
281 "type": "message_stop",
282 });
283 Self::build_event("message_stop", payload)
284 }
285
286 /// §Q7 + §Fase 33.z.k.h — Build the `axon.metadata` extension
287 /// frame carrying the full algebraic-policy side-channel data
288 /// + flow envelope. Anthropic-compat clients ignore unknown
289 /// `event:` names per Anthropic spec §Streaming / "Other events"
290 /// ("any event-type your client doesn't recognize should be
291 /// silently dropped"); SDK-free clients + axon-aware tooling
292 /// consume `event: axon.metadata` directly to surface vertical-
293 /// regulator audit data.
294 ///
295 /// Frame shape mirrors the openai dialect's `axon_metadata` JSON
296 /// payload — adopters using both dialects (multi-provider
297 /// orchestration) get a uniform algebraic-policy surface.
298 fn build_axon_metadata_frame(&self) -> Event {
299 let mut metadata = serde_json::Map::new();
300 if let Some(envelope) = self.stashed_envelope.as_ref() {
301 metadata.insert("trace_id".to_string(), serde_json::json!(envelope.trace_id));
302 metadata.insert("flow".to_string(), serde_json::json!(&envelope.flow_name));
303 metadata.insert(
304 "backend".to_string(),
305 serde_json::json!(&envelope.backend),
306 );
307 metadata.insert("success".to_string(), serde_json::json!(envelope.success));
308 metadata.insert(
309 "steps_executed".to_string(),
310 serde_json::json!(envelope.steps_executed),
311 );
312 metadata.insert(
313 "tokens_input".to_string(),
314 serde_json::json!(envelope.tokens_input),
315 );
316 metadata.insert(
317 "tokens_output".to_string(),
318 serde_json::json!(envelope.tokens_output),
319 );
320 metadata.insert(
321 "latency_ms".to_string(),
322 serde_json::json!(envelope.latency_ms),
323 );
324
325 // §Fase 33.e — stream_policies (elided when empty).
326 if !envelope.effect_policies.is_empty() {
327 let arr: Vec<serde_json::Value> = envelope
328 .effect_policies
329 .iter()
330 .map(|(step, policy)| serde_json::json!({"step": step, "policy": policy}))
331 .collect();
332 metadata.insert(
333 "stream_policies".to_string(),
334 serde_json::Value::Array(arr),
335 );
336 }
337 // §Fase 33.x.d — enforcement_summary (elided when empty).
338 if !envelope.enforcement_summaries.is_empty() {
339 let mut obj = serde_json::Map::new();
340 for (step, summary) in &envelope.enforcement_summaries {
341 obj.insert(
342 step.clone(),
343 serde_json::to_value(summary).unwrap_or(serde_json::Value::Null),
344 );
345 }
346 metadata.insert(
347 "enforcement_summary".to_string(),
348 serde_json::Value::Object(obj),
349 );
350 }
351 // §Fase 33.x.g — runtime_warnings (elided when empty).
352 if !envelope.runtime_warnings.is_empty() {
353 let arr: Vec<serde_json::Value> = envelope
354 .runtime_warnings
355 .iter()
356 .map(|w| serde_json::to_value(w).unwrap_or(serde_json::Value::Null))
357 .collect();
358 metadata.insert(
359 "runtime_warnings".to_string(),
360 serde_json::Value::Array(arr),
361 );
362 }
363 // §Fase 33.x.f — step_audit (elided when empty).
364 if !envelope.step_audit_records.is_empty() {
365 let arr: Vec<serde_json::Value> = envelope
366 .step_audit_records
367 .iter()
368 .map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null))
369 .collect();
370 metadata.insert("step_audit".to_string(), serde_json::Value::Array(arr));
371 }
372 }
373 let terminal_str = match self.saw_terminal_reason {
374 TerminalReason::None => "none",
375 TerminalReason::Stop => "stop",
376 TerminalReason::Error => "error",
377 };
378 metadata.insert(
379 "terminal_reason".to_string(),
380 serde_json::json!(terminal_str),
381 );
382 // §Fase 37.e (D6) — honest failure: when the flow errored, the
383 // metadata frame names WHY. Elided on a non-erroring flow.
384 if let Some(err) = self.error_detail.as_ref() {
385 metadata.insert("error".to_string(), serde_json::json!(err));
386 }
387
388 let payload = serde_json::json!({
389 "type": "axon.metadata",
390 "axon_metadata": metadata,
391 });
392 Self::build_event("axon.metadata", payload)
393 }
394
395 /// Close the currently-open text block (if any). Returns the
396 /// content_block_stop frame OR an empty vec if no block is open.
397 fn close_text_block_if_open(&mut self) -> Vec<Event> {
398 if let Some(idx) = self.open_text_block.take() {
399 vec![self.build_block_stop(idx)]
400 } else {
401 Vec::new()
402 }
403 }
404}
405
406impl WireFormatAdapter for AnthropicDialectAdapter {
407 fn dialect(&self) -> &'static str {
408 "anthropic"
409 }
410
411 fn translate(&mut self, event: &FlowExecutionEvent) -> Vec<Event> {
412 match event {
413 FlowExecutionEvent::FlowStart { backend, .. } => {
414 // Capture the model identifier; emit message_start.
415 self.model = backend.clone();
416 self.message_started = true;
417 vec![self.build_message_start()]
418 }
419 FlowExecutionEvent::StepStart { .. } => {
420 // Lazy text-block open: defer until first StepToken
421 // (StepStart-without-tokens shouldn't open an empty
422 // block per Anthropic's content semantics).
423 Vec::new()
424 }
425 FlowExecutionEvent::StepToken { content, .. } => {
426 // Account toward the usage counter (best-effort).
427 self.output_tokens_accumulated += 1;
428 let mut events = Vec::new();
429 let block_idx = match self.open_text_block {
430 Some(idx) => idx,
431 None => {
432 let idx = self.next_block_index;
433 self.next_block_index += 1;
434 events.push(self.build_text_block_start(idx));
435 self.open_text_block = Some(idx);
436 idx
437 }
438 };
439 events.push(self.build_text_delta(block_idx, content));
440 events
441 }
442 FlowExecutionEvent::StepComplete { .. } => {
443 // Close any open text block; the next step's tokens
444 // will lazy-open a fresh block at a new index.
445 self.close_text_block_if_open()
446 }
447 FlowExecutionEvent::ToolCall {
448 tool_name, content, ..
449 } => {
450 // Interleave a tool_use block: close any open text
451 // block first, emit start + delta + stop for the
452 // tool_use block, advance block index past it.
453 let mut events = self.close_text_block_if_open();
454 let tool_block_idx = self.next_block_index;
455 self.next_block_index += 1;
456 events.push(self.build_tool_use_start(tool_block_idx, tool_name));
457 events.push(self.build_tool_input_delta(tool_block_idx, content));
458 events.push(self.build_block_stop(tool_block_idx));
459 events
460 }
461 FlowExecutionEvent::FlowComplete { .. } => {
462 // Close any lingering text block then emit message_delta.
463 // message_stop emits from flush_terminator() — separating
464 // the terminator allows Q7 axon.metadata to interpose.
465 self.terminal_emitted = true;
466 self.saw_terminal_reason = TerminalReason::Stop;
467 let mut events = self.close_text_block_if_open();
468 events.push(self.build_message_delta("end_turn"));
469 events
470 }
471 FlowExecutionEvent::FlowError { error, .. } => {
472 // Anthropic has no native error stop_reason; the closest
473 // canonical value for an interrupted/failed stream is
474 // "stop_sequence" or simply omitting. We emit
475 // stop_reason="end_turn" defensively + rely on the
476 // axon.metadata frame for adopters who need explicit
477 // error signaling (terminal_reason: "error").
478 self.terminal_emitted = true;
479 self.saw_terminal_reason = TerminalReason::Error;
480 // §Fase 37.e (D6) — stash the diagnostic for the
481 // axon.metadata frame's `error` field.
482 self.error_detail = Some(error.clone());
483 let mut events = self.close_text_block_if_open();
484 events.push(self.build_message_delta("end_turn"));
485 events
486 }
487 }
488 }
489
490 /// §Fase 33.z.k.h — Stash the full envelope so flush_terminator()
491 /// can populate the `axon.metadata` frame with real algebraic-
492 /// policy data, then emit the standard message_delta sequence
493 /// (close-text-block-if-open + message_delta with stop_reason).
494 fn build_complete_envelope_event(&mut self, envelope: &CompleteEnvelope) -> Vec<Event> {
495 self.terminal_emitted = true;
496 self.saw_terminal_reason = TerminalReason::Stop;
497 // Stash before emitting frames so a partial-burst send failure
498 // can't strand the side-channel data.
499 self.stashed_envelope = Some(envelope.clone());
500 let mut events = self.close_text_block_if_open();
501 events.push(self.build_message_delta("end_turn"));
502 events
503 }
504
505 fn flush_terminator(&mut self) -> Vec<Event> {
506 // §Fase 33.z.k.j defense-in-depth — close any orphan text
507 // block before emitting the terminator. In production this
508 // is unreachable (build_complete_envelope_event +
509 // translate(FlowComplete/FlowError) both call
510 // close_text_block_if_open before returning), but a malformed
511 // input stream (producer crashed mid-flight, channel dropped
512 // before FlowComplete, fuzz harness driving incomplete
513 // sequences) could leave a block open. The Anthropic spec
514 // (§Streaming / "ill-formed streams") requires every
515 // content_block_start to be balanced by content_block_stop;
516 // emitting message_stop with an orphan open block produces
517 // wire that some strict-validating clients reject.
518 let mut frames = self.close_text_block_if_open();
519 // Q7 axon.metadata extension frame BEFORE the terminator
520 // (anthropic-compat clients ignore unknown events; adopters
521 // who want the algebraic-policy data subscribe to the
522 // `axon.metadata` event name explicitly).
523 frames.push(self.build_axon_metadata_frame());
524 // Then D5 message_stop per Anthropic spec.
525 frames.push(Self::build_message_stop());
526 frames
527 }
528}