use crate::anthropic;
use crate::mapping::reverse_message_map::{
anthropic_stop_reason_to_openai, AnthropicTranslationContext,
};
use crate::openai;
use crate::openai::streaming::{
ChatCompletionChunk, ChunkChoice, ChunkDelta, ChunkFunctionCall, ChunkToolCall,
};
pub const DONE_SENTINEL: &str = "[DONE]";
pub struct ReverseStreamingTranslator {
message_id: String,
model: String,
tool_call_index: i32,
input_tokens: Option<u32>,
output_tokens: Option<u32>,
created: u64,
done: bool,
context: AnthropicTranslationContext,
}
impl ReverseStreamingTranslator {
pub fn new(id: String, model: String) -> Self {
Self::with_context(id, model, AnthropicTranslationContext::default())
}
pub fn with_context(id: String, model: String, context: AnthropicTranslationContext) -> Self {
Self {
message_id: id,
model,
tool_call_index: -1,
input_tokens: None,
output_tokens: None,
created: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
done: false,
context,
}
}
pub fn is_done(&self) -> bool {
self.done
}
pub fn process_event(&mut self, event: &anthropic::StreamEvent) -> Vec<ChatCompletionChunk> {
match event {
anthropic::StreamEvent::MessageStart { message } => {
self.input_tokens = Some(message.usage.input_tokens);
if let Some(created) = message.created {
self.created = created;
}
vec![self.make_chunk(
ChunkDelta {
role: Some(openai::ChatRole::Assistant),
..Default::default()
},
None,
)]
}
anthropic::StreamEvent::ContentBlockStart {
content_block:
anthropic::ContentBlock::ToolUse { id, name, .. }
| anthropic::ContentBlock::ServerToolUse { id, name, .. },
..
} => {
self.tool_call_index += 1;
let tc = ChunkToolCall {
index: self.tool_call_index as u32,
id: Some(id.clone()),
call_type: Some("function".to_string()),
function: Some(ChunkFunctionCall {
name: Some(self.context.original_tool_name(name)),
arguments: Some(String::new()),
}),
};
vec![self.make_chunk(
ChunkDelta {
tool_calls: Some(vec![tc]),
..Default::default()
},
None,
)]
}
anthropic::StreamEvent::ContentBlockStart { .. } => vec![],
anthropic::StreamEvent::ContentBlockDelta { delta, .. } => match delta {
anthropic::streaming::Delta::TextDelta { text } => {
vec![self.make_chunk(
ChunkDelta {
content: Some(text.clone()),
..Default::default()
},
None,
)]
}
anthropic::streaming::Delta::InputJsonDelta { partial_json } => {
if self.tool_call_index < 0 {
return vec![];
}
let tc = ChunkToolCall {
index: self.tool_call_index as u32,
id: None,
call_type: None,
function: Some(ChunkFunctionCall {
name: None,
arguments: Some(partial_json.clone()),
}),
};
vec![self.make_chunk(
ChunkDelta {
tool_calls: Some(vec![tc]),
..Default::default()
},
None,
)]
}
anthropic::streaming::Delta::ThinkingDelta { thinking } => {
vec![self.make_chunk(
ChunkDelta {
reasoning_content: Some(thinking.clone()),
..Default::default()
},
None,
)]
}
anthropic::streaming::Delta::SignatureDelta { .. } => vec![],
_ => vec![],
},
anthropic::StreamEvent::ContentBlockStop { .. } => vec![],
anthropic::StreamEvent::MessageDelta { delta, usage } => {
if let Some(u) = usage {
self.output_tokens = Some(u.output_tokens);
}
let finish_reason = delta
.stop_reason
.as_ref()
.map(anthropic_stop_reason_to_openai);
let mut chunks = vec![self.make_chunk(ChunkDelta::default(), finish_reason)];
if let (Some(input), Some(output)) = (self.input_tokens, self.output_tokens) {
chunks.push(ChatCompletionChunk {
id: self.message_id.clone(),
object: "chat.completion.chunk".to_string(),
model: self.model.clone(),
choices: vec![],
usage: Some(openai::ChatUsage {
prompt_tokens: input,
completion_tokens: output,
total_tokens: input + output,
completion_tokens_details: None,
prompt_tokens_details: None,
}),
created: Some(self.created),
system_fingerprint: None,
});
}
chunks
}
anthropic::StreamEvent::MessageStop {} => {
self.done = true;
vec![]
}
anthropic::StreamEvent::Ping {} => vec![],
anthropic::StreamEvent::Error { .. } => {
self.done = true;
vec![]
}
_ => vec![],
}
}
fn make_chunk(
&self,
delta: ChunkDelta,
finish_reason: Option<openai::FinishReason>,
) -> ChatCompletionChunk {
ChatCompletionChunk {
id: self.message_id.clone(),
object: "chat.completion.chunk".to_string(),
model: self.model.clone(),
choices: vec![ChunkChoice {
index: 0,
delta,
finish_reason,
logprobs: None,
}],
usage: None,
created: Some(self.created),
system_fingerprint: None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::anthropic::messages::{ContentBlock, StopReason, Usage};
use crate::anthropic::streaming::*;
fn make_translator() -> ReverseStreamingTranslator {
ReverseStreamingTranslator::new("chatcmpl-test".to_string(), "gpt-4o".to_string())
}
#[test]
fn message_start_emits_role_chunk() {
let mut t = make_translator();
let event = StreamEvent::MessageStart {
message: MessageStartData {
id: "msg_123".to_string(),
msg_type: "message".to_string(),
role: "assistant".to_string(),
content: vec![],
model: "claude-sonnet".to_string(),
stop_reason: None,
stop_sequence: None,
usage: Usage {
input_tokens: 10,
output_tokens: 0,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
..Default::default()
},
created: Some(1700000000),
},
};
let chunks = t.process_event(&event);
assert_eq!(chunks.len(), 1);
assert_eq!(
chunks[0].choices[0].delta.role,
Some(openai::ChatRole::Assistant)
);
assert!(chunks[0].choices[0].finish_reason.is_none());
}
#[test]
fn text_delta_emits_content_chunk() {
let mut t = make_translator();
let event = StreamEvent::ContentBlockDelta {
index: 0,
delta: Delta::TextDelta {
text: "Hello".to_string(),
},
};
let chunks = t.process_event(&event);
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].choices[0].delta.content.as_deref(), Some("Hello"));
}
#[test]
fn tool_use_streaming() {
let mut t = make_translator();
let start = StreamEvent::ContentBlockStart {
index: 0,
content_block: ContentBlock::ToolUse {
id: "call_123".to_string(),
name: "get_weather".to_string(),
input: serde_json::Value::Object(serde_json::Map::new()),
},
};
let chunks = t.process_event(&start);
assert_eq!(chunks.len(), 1);
let tc = &chunks[0].choices[0].delta.tool_calls.as_ref().unwrap()[0];
assert_eq!(tc.id.as_deref(), Some("call_123"));
assert_eq!(
tc.function.as_ref().unwrap().name.as_deref(),
Some("get_weather")
);
let delta = StreamEvent::ContentBlockDelta {
index: 0,
delta: Delta::InputJsonDelta {
partial_json: "{\"loc".to_string(),
},
};
let chunks = t.process_event(&delta);
assert_eq!(chunks.len(), 1);
let tc = &chunks[0].choices[0].delta.tool_calls.as_ref().unwrap()[0];
assert_eq!(tc.index, 0);
assert!(tc.id.is_none()); assert_eq!(
tc.function.as_ref().unwrap().arguments.as_deref(),
Some("{\"loc")
);
}
#[test]
fn server_tool_use_streaming_restores_original_name() {
let req: openai::ChatCompletionRequest = serde_json::from_value(serde_json::json!({
"model": "claude",
"messages": [{"role": "user", "content": "hi"}],
"tools": [{"type": "function", "function": {"name": "bad.name", "parameters": {"type": "object"}}}],
"max_tokens": 100
}))
.unwrap();
let context =
crate::mapping::reverse_message_map::AnthropicTranslationContext::from_openai_request(
&req,
);
let mut t = ReverseStreamingTranslator::with_context(
"chatcmpl-test".to_string(),
"gpt-4o".to_string(),
context,
);
let event = StreamEvent::ContentBlockStart {
index: 0,
content_block: ContentBlock::ServerToolUse {
id: "srv_1".to_string(),
name: "bad_name".to_string(),
input: serde_json::json!({}),
},
};
let chunks = t.process_event(&event);
let tc = &chunks[0].choices[0].delta.tool_calls.as_ref().unwrap()[0];
assert_eq!(tc.id.as_deref(), Some("srv_1"));
assert_eq!(
tc.function.as_ref().unwrap().name.as_deref(),
Some("bad.name")
);
}
#[test]
fn thinking_delta_emits_reasoning_content() {
let mut t = make_translator();
let event = StreamEvent::ContentBlockDelta {
index: 0,
delta: Delta::ThinkingDelta {
thinking: "Let me think...".to_string(),
},
};
let chunks = t.process_event(&event);
assert_eq!(chunks.len(), 1);
assert_eq!(
chunks[0].choices[0].delta.reasoning_content.as_deref(),
Some("Let me think...")
);
}
#[test]
fn message_delta_emits_finish_reason_and_usage() {
let mut t = make_translator();
let start = StreamEvent::MessageStart {
message: MessageStartData {
id: "msg_1".to_string(),
msg_type: "message".to_string(),
role: "assistant".to_string(),
content: vec![],
model: "claude".to_string(),
stop_reason: None,
stop_sequence: None,
usage: Usage {
input_tokens: 10,
output_tokens: 0,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
..Default::default()
},
created: None,
},
};
t.process_event(&start);
let event = StreamEvent::MessageDelta {
delta: MessageDeltaData {
stop_reason: Some(StopReason::EndTurn),
stop_sequence: None,
..Default::default()
},
usage: Some(DeltaUsage { output_tokens: 5 }),
};
let chunks = t.process_event(&event);
assert_eq!(chunks.len(), 2); assert_eq!(
chunks[0].choices[0].finish_reason,
Some(openai::FinishReason::Stop)
);
let usage = chunks[1].usage.as_ref().unwrap();
assert_eq!(usage.prompt_tokens, 10);
assert_eq!(usage.completion_tokens, 5);
assert_eq!(usage.total_tokens, 15);
}
#[test]
fn message_stop_sets_done() {
let mut t = make_translator();
assert!(!t.is_done());
t.process_event(&StreamEvent::MessageStop {});
assert!(t.is_done());
}
#[test]
fn ping_produces_no_chunks() {
let mut t = make_translator();
let chunks = t.process_event(&StreamEvent::Ping {});
assert!(chunks.is_empty());
}
#[test]
fn multiple_tool_calls_track_index() {
let mut t = make_translator();
let start1 = StreamEvent::ContentBlockStart {
index: 0,
content_block: ContentBlock::ToolUse {
id: "call_1".to_string(),
name: "fn_a".to_string(),
input: serde_json::Value::Object(serde_json::Map::new()),
},
};
let chunks = t.process_event(&start1);
assert_eq!(
chunks[0].choices[0].delta.tool_calls.as_ref().unwrap()[0].index,
0
);
let start2 = StreamEvent::ContentBlockStart {
index: 1,
content_block: ContentBlock::ToolUse {
id: "call_2".to_string(),
name: "fn_b".to_string(),
input: serde_json::Value::Object(serde_json::Map::new()),
},
};
let chunks = t.process_event(&start2);
assert_eq!(
chunks[0].choices[0].delta.tool_calls.as_ref().unwrap()[0].index,
1
);
}
}