use super::{CompleteEnvelope, WireFormatAdapter};
use crate::flow_execution_event::FlowExecutionEvent;
use axum::response::sse::Event;
pub struct AxonDialectAdapter {
event_id_counter: u64,
trace_id: u64,
terminal_emitted: bool,
}
impl AxonDialectAdapter {
pub fn new(trace_id: u64) -> Self {
Self {
event_id_counter: 1,
trace_id,
terminal_emitted: false,
}
}
fn next_id(&mut self) -> u64 {
let id = self.event_id_counter;
self.event_id_counter += 1;
id
}
fn build_token_event(&mut self, step_name: &str, token: &str, timestamp_ms: i64) -> Event {
let data = serde_json::json!({
"step": step_name,
"trace_id": self.trace_id,
"token": token,
"timestamp_ms": timestamp_ms,
});
let event_id = self.next_id();
Event::default()
.event("axon.token")
.id(event_id.to_string())
.data(serde_json::to_string(&data).unwrap_or_default())
}
fn build_tool_call_event(
&mut self,
step_name: &str,
tool_name: &str,
content: &str,
timestamp_ms: u64,
) -> Event {
let data = serde_json::json!({
"step": step_name,
"trace_id": self.trace_id,
"tool_name": tool_name,
"content": content,
"timestamp_ms": timestamp_ms,
});
let event_id = self.next_id();
Event::default()
.event("axon.tool_call")
.id(event_id.to_string())
.data(serde_json::to_string(&data).unwrap_or_default())
}
fn build_error_event(&mut self, error_msg: &str) -> Event {
let data = serde_json::json!({
"trace_id": self.trace_id,
"error": error_msg,
"recoverable": false,
});
let event_id = self.next_id();
Event::default()
.event("axon.error")
.id(event_id.to_string())
.data(serde_json::to_string(&data).unwrap_or_default())
}
fn build_complete_event(
&mut self,
flow_name: &str,
backend: &str,
success: bool,
steps_executed: u64,
tokens_input: u64,
tokens_output: u64,
latency_ms: u64,
) -> Event {
let data = serde_json::json!({
"trace_id": self.trace_id,
"flow": flow_name,
"backend": backend,
"steps_executed": steps_executed,
"tokens_input": tokens_input,
"tokens_output": tokens_output,
"latency_ms": latency_ms,
"success": success,
});
let event_id = self.next_id();
Event::default()
.event("axon.complete")
.id(event_id.to_string())
.data(serde_json::to_string(&data).unwrap_or_default())
}
}
impl AxonDialectAdapter {
fn build_full_complete_event(&mut self, envelope: &CompleteEnvelope) -> Event {
let mut data = serde_json::json!({
"trace_id": envelope.trace_id,
"flow": envelope.flow_name,
"backend": envelope.backend,
"steps_executed": envelope.steps_executed,
"tokens_input": envelope.tokens_input,
"tokens_output": envelope.tokens_output,
"latency_ms": envelope.latency_ms,
"success": envelope.success,
});
if !envelope.effect_policies.is_empty() {
let arr = envelope
.effect_policies
.iter()
.map(|(step, policy)| serde_json::json!({"step": step, "policy": policy}))
.collect::<Vec<_>>();
data.as_object_mut()
.expect("json object")
.insert("stream_policies".to_string(), serde_json::Value::Array(arr));
}
if !envelope.enforcement_summaries.is_empty() {
let mut obj = serde_json::Map::new();
for (step, summary) in &envelope.enforcement_summaries {
obj.insert(
step.clone(),
serde_json::to_value(summary).unwrap_or(serde_json::Value::Null),
);
}
data.as_object_mut().expect("json object").insert(
"enforcement_summary".to_string(),
serde_json::Value::Object(obj),
);
}
if !envelope.runtime_warnings.is_empty() {
let arr = envelope
.runtime_warnings
.iter()
.map(|w| serde_json::to_value(w).unwrap_or(serde_json::Value::Null))
.collect::<Vec<_>>();
data.as_object_mut()
.expect("json object")
.insert("warnings".to_string(), serde_json::Value::Array(arr));
}
if !envelope.epistemic_envelopes.is_empty() {
let arr = envelope
.epistemic_envelopes
.iter()
.map(|e| serde_json::to_value(e).unwrap_or(serde_json::Value::Null))
.collect::<Vec<_>>();
data.as_object_mut().expect("json object").insert(
"epistemic_envelopes".to_string(),
serde_json::Value::Array(arr),
);
}
let event_id = self.next_id();
Event::default()
.event("axon.complete")
.id(event_id.to_string())
.data(serde_json::to_string(&data).unwrap_or_default())
}
}
impl WireFormatAdapter for AxonDialectAdapter {
fn dialect(&self) -> &'static str {
"axon"
}
fn build_complete_envelope_event(&mut self, envelope: &CompleteEnvelope) -> Vec<Event> {
self.terminal_emitted = true;
vec![self.build_full_complete_event(envelope)]
}
fn translate(&mut self, event: &FlowExecutionEvent) -> Vec<Event> {
match event {
FlowExecutionEvent::FlowStart { .. } => Vec::new(),
FlowExecutionEvent::StepStart { .. } => Vec::new(),
FlowExecutionEvent::StepToken {
step_name,
content,
timestamp_ms,
..
} => vec![self.build_token_event(step_name, content, *timestamp_ms as i64)],
FlowExecutionEvent::StepComplete { .. } => Vec::new(),
FlowExecutionEvent::ToolCall {
step_name,
tool_name,
content,
timestamp_ms,
} => vec![self.build_tool_call_event(
step_name,
tool_name,
content,
*timestamp_ms,
)],
FlowExecutionEvent::FlowComplete {
flow_name,
backend,
success,
steps_executed,
tokens_input,
tokens_output,
latency_ms,
..
} => {
self.terminal_emitted = true;
vec![self.build_complete_event(
flow_name,
backend,
*success,
*steps_executed as u64,
*tokens_input as u64,
*tokens_output as u64,
*latency_ms,
)]
}
FlowExecutionEvent::FlowError { error, .. } => {
self.terminal_emitted = true;
vec![self.build_error_event(error)]
}
}
}
fn flush_terminator(&mut self) -> Vec<Event> {
Vec::new()
}
}