use super::{CompleteEnvelope, WireFormatAdapter};
use crate::flow_execution_event::FlowExecutionEvent;
use axum::response::sse::Event;
pub struct AnthropicDialectAdapter {
message_id: String,
model: String,
message_started: bool,
next_block_index: usize,
open_text_block: Option<usize>,
tool_use_counter: u64,
output_tokens_accumulated: u64,
terminal_emitted: bool,
saw_terminal_reason: TerminalReason,
stashed_envelope: Option<CompleteEnvelope>,
error_detail: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq)]
enum TerminalReason {
None,
Stop,
Error,
}
impl AnthropicDialectAdapter {
pub fn new(trace_id: u64) -> Self {
let message_id = format!("msg_axon_{trace_id:x}");
Self {
message_id,
model: "axon".to_string(),
message_started: false,
next_block_index: 0,
open_text_block: None,
tool_use_counter: 0,
output_tokens_accumulated: 0,
terminal_emitted: false,
saw_terminal_reason: TerminalReason::None,
stashed_envelope: None,
error_detail: None,
}
}
fn build_event(event_name: &'static str, payload: serde_json::Value) -> Event {
Event::default()
.event(event_name)
.data(serde_json::to_string(&payload).unwrap_or_default())
}
fn build_message_start(&self) -> Event {
let payload = serde_json::json!({
"type": "message_start",
"message": {
"id": &self.message_id,
"type": "message",
"role": "assistant",
"content": [],
"model": &self.model,
"stop_reason": null,
"stop_sequence": null,
"usage": {
"input_tokens": 0,
"output_tokens": 0,
}
}
});
Self::build_event("message_start", payload)
}
fn build_text_block_start(&self, index: usize) -> Event {
let payload = serde_json::json!({
"type": "content_block_start",
"index": index,
"content_block": {
"type": "text",
"text": "",
}
});
Self::build_event("content_block_start", payload)
}
fn build_text_delta(&self, index: usize, text: &str) -> Event {
let payload = serde_json::json!({
"type": "content_block_delta",
"index": index,
"delta": {
"type": "text_delta",
"text": text,
}
});
Self::build_event("content_block_delta", payload)
}
fn build_block_stop(&self, index: usize) -> Event {
let payload = serde_json::json!({
"type": "content_block_stop",
"index": index,
});
Self::build_event("content_block_stop", payload)
}
fn build_tool_use_start(&mut self, index: usize, tool_name: &str) -> Event {
self.tool_use_counter += 1;
let tool_id = format!(
"toolu_axon_{}_{}",
self.message_id.strip_prefix("msg_axon_").unwrap_or("0"),
self.tool_use_counter
);
let payload = serde_json::json!({
"type": "content_block_start",
"index": index,
"content_block": {
"type": "tool_use",
"id": tool_id,
"name": tool_name,
"input": {},
}
});
Self::build_event("content_block_start", payload)
}
fn build_tool_input_delta(&self, index: usize, partial_json: &str) -> Event {
let payload = serde_json::json!({
"type": "content_block_delta",
"index": index,
"delta": {
"type": "input_json_delta",
"partial_json": partial_json,
}
});
Self::build_event("content_block_delta", payload)
}
fn build_message_delta(&self, stop_reason: &str) -> Event {
let payload = serde_json::json!({
"type": "message_delta",
"delta": {
"stop_reason": stop_reason,
"stop_sequence": null,
},
"usage": {
"output_tokens": self.output_tokens_accumulated,
}
});
Self::build_event("message_delta", payload)
}
fn build_message_stop() -> Event {
let payload = serde_json::json!({
"type": "message_stop",
});
Self::build_event("message_stop", payload)
}
fn build_axon_metadata_frame(&self) -> Event {
let mut metadata = serde_json::Map::new();
if let Some(envelope) = self.stashed_envelope.as_ref() {
metadata.insert("trace_id".to_string(), serde_json::json!(envelope.trace_id));
metadata.insert("flow".to_string(), serde_json::json!(&envelope.flow_name));
metadata.insert(
"backend".to_string(),
serde_json::json!(&envelope.backend),
);
metadata.insert("success".to_string(), serde_json::json!(envelope.success));
metadata.insert(
"steps_executed".to_string(),
serde_json::json!(envelope.steps_executed),
);
metadata.insert(
"tokens_input".to_string(),
serde_json::json!(envelope.tokens_input),
);
metadata.insert(
"tokens_output".to_string(),
serde_json::json!(envelope.tokens_output),
);
metadata.insert(
"latency_ms".to_string(),
serde_json::json!(envelope.latency_ms),
);
if !envelope.effect_policies.is_empty() {
let arr: Vec<serde_json::Value> = envelope
.effect_policies
.iter()
.map(|(step, policy)| serde_json::json!({"step": step, "policy": policy}))
.collect();
metadata.insert(
"stream_policies".to_string(),
serde_json::Value::Array(arr),
);
}
if !envelope.enforcement_summaries.is_empty() {
let mut obj = serde_json::Map::new();
for (step, summary) in &envelope.enforcement_summaries {
obj.insert(
step.clone(),
serde_json::to_value(summary).unwrap_or(serde_json::Value::Null),
);
}
metadata.insert(
"enforcement_summary".to_string(),
serde_json::Value::Object(obj),
);
}
if !envelope.runtime_warnings.is_empty() {
let arr: Vec<serde_json::Value> = envelope
.runtime_warnings
.iter()
.map(|w| serde_json::to_value(w).unwrap_or(serde_json::Value::Null))
.collect();
metadata.insert(
"runtime_warnings".to_string(),
serde_json::Value::Array(arr),
);
}
if !envelope.step_audit_records.is_empty() {
let arr: Vec<serde_json::Value> = envelope
.step_audit_records
.iter()
.map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null))
.collect();
metadata.insert("step_audit".to_string(), serde_json::Value::Array(arr));
}
}
let terminal_str = match self.saw_terminal_reason {
TerminalReason::None => "none",
TerminalReason::Stop => "stop",
TerminalReason::Error => "error",
};
metadata.insert(
"terminal_reason".to_string(),
serde_json::json!(terminal_str),
);
if let Some(err) = self.error_detail.as_ref() {
metadata.insert("error".to_string(), serde_json::json!(err));
}
let payload = serde_json::json!({
"type": "axon.metadata",
"axon_metadata": metadata,
});
Self::build_event("axon.metadata", payload)
}
fn close_text_block_if_open(&mut self) -> Vec<Event> {
if let Some(idx) = self.open_text_block.take() {
vec![self.build_block_stop(idx)]
} else {
Vec::new()
}
}
}
impl WireFormatAdapter for AnthropicDialectAdapter {
fn dialect(&self) -> &'static str {
"anthropic"
}
fn translate(&mut self, event: &FlowExecutionEvent) -> Vec<Event> {
match event {
FlowExecutionEvent::FlowStart { backend, .. } => {
self.model = backend.clone();
self.message_started = true;
vec![self.build_message_start()]
}
FlowExecutionEvent::StepStart { .. } => {
Vec::new()
}
FlowExecutionEvent::StepToken { content, .. } => {
self.output_tokens_accumulated += 1;
let mut events = Vec::new();
let block_idx = match self.open_text_block {
Some(idx) => idx,
None => {
let idx = self.next_block_index;
self.next_block_index += 1;
events.push(self.build_text_block_start(idx));
self.open_text_block = Some(idx);
idx
}
};
events.push(self.build_text_delta(block_idx, content));
events
}
FlowExecutionEvent::StepComplete { .. } => {
self.close_text_block_if_open()
}
FlowExecutionEvent::ToolCall {
tool_name, content, ..
} => {
let mut events = self.close_text_block_if_open();
let tool_block_idx = self.next_block_index;
self.next_block_index += 1;
events.push(self.build_tool_use_start(tool_block_idx, tool_name));
events.push(self.build_tool_input_delta(tool_block_idx, content));
events.push(self.build_block_stop(tool_block_idx));
events
}
FlowExecutionEvent::FlowComplete { .. } => {
self.terminal_emitted = true;
self.saw_terminal_reason = TerminalReason::Stop;
let mut events = self.close_text_block_if_open();
events.push(self.build_message_delta("end_turn"));
events
}
FlowExecutionEvent::FlowError { error, .. } => {
self.terminal_emitted = true;
self.saw_terminal_reason = TerminalReason::Error;
self.error_detail = Some(error.clone());
let mut events = self.close_text_block_if_open();
events.push(self.build_message_delta("end_turn"));
events
}
}
}
fn build_complete_envelope_event(&mut self, envelope: &CompleteEnvelope) -> Vec<Event> {
self.terminal_emitted = true;
self.saw_terminal_reason = TerminalReason::Stop;
self.stashed_envelope = Some(envelope.clone());
let mut events = self.close_text_block_if_open();
events.push(self.build_message_delta("end_turn"));
events
}
fn flush_terminator(&mut self) -> Vec<Event> {
let mut frames = self.close_text_block_if_open();
frames.push(self.build_axon_metadata_frame());
frames.push(Self::build_message_stop());
frames
}
}