use super::{CompleteEnvelope, WireFormatAdapter};
use crate::flow_execution_event::FlowExecutionEvent;
use axum::response::sse::Event;
pub struct OpenAIDialectAdapter {
response_id: String,
created: u64,
model: String,
role_marker_emitted: bool,
terminal_emitted: bool,
tool_call_counter: u64,
saw_terminal_reason: TerminalReason,
stashed_envelope: Option<CompleteEnvelope>,
error_detail: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq)]
enum TerminalReason {
None,
Stop,
Error,
}
impl OpenAIDialectAdapter {
pub fn new(trace_id: u64) -> Self {
let response_id = format!("chatcmpl-axon-{trace_id:x}");
let created = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
Self {
response_id,
created,
model: "axon".to_string(),
role_marker_emitted: false,
terminal_emitted: false,
tool_call_counter: 0,
saw_terminal_reason: TerminalReason::None,
stashed_envelope: None,
error_detail: None,
}
}
fn build_chunk_frame(
&self,
delta: serde_json::Value,
finish_reason: Option<&str>,
) -> Event {
let choice = serde_json::json!({
"index": 0,
"delta": delta,
"finish_reason": finish_reason,
});
let payload = serde_json::json!({
"id": &self.response_id,
"object": "chat.completion.chunk",
"created": self.created,
"model": &self.model,
"choices": [choice],
});
Event::default().data(serde_json::to_string(&payload).unwrap_or_default())
}
fn next_tool_call_id(&mut self) -> String {
self.tool_call_counter += 1;
format!(
"call_{}_{}",
self.response_id.strip_prefix("chatcmpl-axon-").unwrap_or("0"),
self.tool_call_counter,
)
}
fn build_axon_metadata_frame(&self) -> Event {
let mut metadata = serde_json::Map::new();
if let Some(envelope) = self.stashed_envelope.as_ref() {
metadata.insert("trace_id".to_string(), serde_json::json!(envelope.trace_id));
metadata.insert("flow".to_string(), serde_json::json!(&envelope.flow_name));
metadata.insert(
"backend".to_string(),
serde_json::json!(&envelope.backend),
);
metadata.insert("success".to_string(), serde_json::json!(envelope.success));
metadata.insert(
"steps_executed".to_string(),
serde_json::json!(envelope.steps_executed),
);
metadata.insert(
"tokens_input".to_string(),
serde_json::json!(envelope.tokens_input),
);
metadata.insert(
"tokens_output".to_string(),
serde_json::json!(envelope.tokens_output),
);
metadata.insert(
"latency_ms".to_string(),
serde_json::json!(envelope.latency_ms),
);
if !envelope.effect_policies.is_empty() {
let arr: Vec<serde_json::Value> = envelope
.effect_policies
.iter()
.map(|(step, policy)| serde_json::json!({"step": step, "policy": policy}))
.collect();
metadata.insert(
"stream_policies".to_string(),
serde_json::Value::Array(arr),
);
}
if !envelope.enforcement_summaries.is_empty() {
let mut obj = serde_json::Map::new();
for (step, summary) in &envelope.enforcement_summaries {
obj.insert(
step.clone(),
serde_json::to_value(summary).unwrap_or(serde_json::Value::Null),
);
}
metadata.insert(
"enforcement_summary".to_string(),
serde_json::Value::Object(obj),
);
}
if !envelope.runtime_warnings.is_empty() {
let arr: Vec<serde_json::Value> = envelope
.runtime_warnings
.iter()
.map(|w| serde_json::to_value(w).unwrap_or(serde_json::Value::Null))
.collect();
metadata.insert(
"runtime_warnings".to_string(),
serde_json::Value::Array(arr),
);
}
if !envelope.step_audit_records.is_empty() {
let arr: Vec<serde_json::Value> = envelope
.step_audit_records
.iter()
.map(|r| serde_json::to_value(r).unwrap_or(serde_json::Value::Null))
.collect();
metadata.insert("step_audit".to_string(), serde_json::Value::Array(arr));
}
}
let terminal_str = match self.saw_terminal_reason {
TerminalReason::None => "none",
TerminalReason::Stop => "stop",
TerminalReason::Error => "error",
};
metadata.insert(
"terminal_reason".to_string(),
serde_json::json!(terminal_str),
);
if let Some(err) = self.error_detail.as_ref() {
metadata.insert("error".to_string(), serde_json::json!(err));
}
let payload = serde_json::json!({ "axon_metadata": metadata });
Event::default().data(serde_json::to_string(&payload).unwrap_or_default())
}
}
impl WireFormatAdapter for OpenAIDialectAdapter {
fn dialect(&self) -> &'static str {
"openai"
}
fn build_complete_envelope_event(&mut self, envelope: &CompleteEnvelope) -> Vec<Event> {
self.terminal_emitted = true;
self.saw_terminal_reason = TerminalReason::Stop;
self.stashed_envelope = Some(envelope.clone());
vec![self.build_chunk_frame(serde_json::json!({}), Some("stop"))]
}
fn translate(&mut self, event: &FlowExecutionEvent) -> Vec<Event> {
match event {
FlowExecutionEvent::FlowStart { backend, .. } => {
self.model = backend.clone();
self.role_marker_emitted = true;
vec![self.build_chunk_frame(
serde_json::json!({"role": "assistant"}),
None,
)]
}
FlowExecutionEvent::StepStart { .. } => {
Vec::new()
}
FlowExecutionEvent::StepToken { content, .. } => {
vec![self.build_chunk_frame(
serde_json::json!({"content": content}),
None,
)]
}
FlowExecutionEvent::StepComplete { .. } => Vec::new(),
FlowExecutionEvent::ToolCall {
tool_name,
content,
..
} => {
let call_id = self.next_tool_call_id();
let delta = serde_json::json!({
"tool_calls": [{
"index": 0,
"id": call_id,
"type": "function",
"function": {
"name": tool_name,
"arguments": content,
}
}]
});
vec![self.build_chunk_frame(delta, None)]
}
FlowExecutionEvent::FlowComplete { .. } => {
self.terminal_emitted = true;
self.saw_terminal_reason = TerminalReason::Stop;
vec![self.build_chunk_frame(
serde_json::json!({}),
Some("stop"),
)]
}
FlowExecutionEvent::FlowError { error, .. } => {
self.terminal_emitted = true;
self.saw_terminal_reason = TerminalReason::Error;
self.error_detail = Some(error.clone());
vec![self.build_chunk_frame(
serde_json::json!({}),
Some("stop"),
)]
}
}
}
fn flush_terminator(&mut self) -> Vec<Event> {
vec![
self.build_axon_metadata_frame(),
Event::default().data("[DONE]"),
]
}
}