use std::collections::{BTreeMap, VecDeque};
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::stream::BoxStream;
use futures_util::{Stream, StreamExt};
use serde_json::Value;
use crate::error::OpenRouterError;
use crate::types::completion::{
CompletionsResponse, FunctionCall, PartialToolCall, ReasoningDetail, ResponseUsage, ToolCall,
};
use crate::{
api::{
messages::{AnthropicContentPart, AnthropicMessagesSseEvent, AnthropicMessagesStreamEvent},
responses::ResponsesStreamEvent,
},
types::completion::FinishReason,
};
/// Events yielded by [`ToolAwareStream`].
#[derive(Debug)]
pub enum StreamEvent {
/// Non-empty content text taken from a chunk choice.
ContentDelta(String),
/// Non-empty reasoning text taken from a chunk choice.
ReasoningDelta(String),
/// Non-empty list of reasoning details taken from a chunk choice.
ReasoningDetailsDelta(Vec<ReasoningDetail>),
/// Terminal event, queued once the inner stream has ended.
Done {
/// Tool calls assembled from the partial tool calls seen during the stream.
tool_calls: Vec<ToolCall>,
/// Most recent finish reason reported by any choice, if one was seen.
finish_reason: Option<FinishReason>,
/// Most recent usage value carried by a chunk, if one was seen.
usage: Option<ResponseUsage>,
/// `id` of the last chunk received (empty if no chunk arrived).
id: String,
/// `model` of the last chunk received (empty if no chunk arrived).
model: String,
},
/// An error item produced by the inner stream.
Error(OpenRouterError),
}
/// Collects the streamed fragments of a single tool call until it can be
/// turned into a complete `ToolCall`.
#[derive(Debug, Clone, Default)]
struct ToolCallAccumulator {
// Latest call id seen in a fragment, if any.
id: Option<String>,
// Latest call type seen in a fragment, if any.
type_: Option<String>,
// Latest function name seen in a fragment, if any.
name: Option<String>,
// Concatenation of every argument fragment, in arrival order.
arguments: String,
}
impl ToolCallAccumulator {
    /// Folds one streamed fragment into the accumulated state.
    ///
    /// `id`, `type_` and the function name are overwritten whenever the
    /// fragment carries them; argument text is appended.
    fn merge(&mut self, partial: &PartialToolCall) {
        if let Some(call_id) = partial.id.as_deref() {
            self.id = Some(call_id.to_owned());
        }
        if let Some(call_type) = partial.type_.as_deref() {
            self.type_ = Some(call_type.to_owned());
        }
        if let Some(function) = partial.function.as_ref() {
            if let Some(fn_name) = function.name.as_deref() {
                self.name = Some(fn_name.to_owned());
            }
            if let Some(fragment) = function.arguments.as_deref() {
                self.arguments.push_str(fragment);
            }
        }
    }

    /// Converts the accumulated state into a `ToolCall`.
    ///
    /// Returns `None` when either the call id or the function name was never
    /// received. A missing type falls back to `"function"`.
    fn into_tool_call(self) -> Option<ToolCall> {
        match (self.id, self.name) {
            (Some(id), Some(name)) => Some(ToolCall {
                id,
                type_: self.type_.unwrap_or_else(|| "function".to_string()),
                function: FunctionCall {
                    name,
                    arguments: self.arguments,
                },
                index: None,
            }),
            _ => None,
        }
    }
}
pub struct ToolAwareStream {
inner: BoxStream<'static, Result<CompletionsResponse, OpenRouterError>>,
tool_accumulators: BTreeMap<u32, ToolCallAccumulator>,
pending_events: Vec<StreamEvent>,
last_id: String,
last_model: String,
last_usage: Option<ResponseUsage>,
last_finish_reason: Option<FinishReason>,
finished: bool,
}
impl ToolAwareStream {
pub fn new(inner: BoxStream<'static, Result<CompletionsResponse, OpenRouterError>>) -> Self {
Self {
inner,
tool_accumulators: BTreeMap::new(),
pending_events: Vec::new(),
last_id: String::new(),
last_model: String::new(),
last_usage: None,
last_finish_reason: None,
finished: false,
}
}
fn process_chunk(&mut self, response: CompletionsResponse) {
self.last_id.clone_from(&response.id);
self.last_model.clone_from(&response.model);
if response.usage.is_some() {
self.last_usage = response.usage;
}
for choice in &response.choices {
if let Some(reason) = choice.finish_reason() {
self.last_finish_reason = Some(reason.clone());
}
if let Some(content) = choice.content() {
if !content.is_empty() {
self.pending_events
.push(StreamEvent::ContentDelta(content.to_string()));
}
}
if let Some(reasoning) = choice.reasoning() {
if !reasoning.is_empty() {
self.pending_events
.push(StreamEvent::ReasoningDelta(reasoning.to_string()));
}
}
if let Some(details) = choice.reasoning_details() {
if !details.is_empty() {
self.pending_events
.push(StreamEvent::ReasoningDetailsDelta(details.to_vec()));
}
}
if let Some(partial_tool_calls) = choice.partial_tool_calls() {
for partial in partial_tool_calls {
let idx = partial.index.unwrap_or(0);
let acc = self.tool_accumulators.entry(idx).or_default();
acc.merge(partial);
}
}
}
}
fn finalize(&mut self) {
let tool_calls: Vec<ToolCall> = self
.tool_accumulators
.values()
.cloned()
.filter_map(|acc| acc.into_tool_call())
.collect();
self.pending_events.push(StreamEvent::Done {
tool_calls,
finish_reason: self.last_finish_reason.take(),
usage: self.last_usage.take(),
id: std::mem::take(&mut self.last_id),
model: std::mem::take(&mut self.last_model),
});
self.finished = true;
}
}
impl Stream for ToolAwareStream {
type Item = StreamEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if !self.pending_events.is_empty() {
return Poll::Ready(Some(self.pending_events.remove(0)));
}
if self.finished {
return Poll::Ready(None);
}
match self.inner.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(response))) => {
self.process_chunk(response);
if !self.pending_events.is_empty() {
Poll::Ready(Some(self.pending_events.remove(0)))
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(StreamEvent::Error(e))),
Poll::Ready(None) => {
if !self.finished {
self.finalize();
if !self.pending_events.is_empty() {
Poll::Ready(Some(self.pending_events.remove(0)))
} else {
Poll::Ready(None)
}
} else {
Poll::Ready(None)
}
}
Poll::Pending => Poll::Pending,
}
}
}
/// Identifies which adapter produced a [`UnifiedStreamEvent`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnifiedStreamSource {
/// Set by `adapt_chat_stream`.
Chat,
/// Set by `adapt_responses_stream`.
Responses,
/// Set by `adapt_messages_stream`.
Messages,
}
/// Common event type produced by the `adapt_*_stream` functions, regardless
/// of which upstream stream shape the events came from.
#[derive(Debug)]
pub enum UnifiedStreamEvent {
/// A piece of output text.
ContentDelta(String),
/// A piece of reasoning text.
ReasoningDelta(String),
/// Reasoning details forwarded from a chat chunk.
ReasoningDetailsDelta(Vec<ReasoningDetail>),
/// Tool-related payload as JSON; its layout depends on the adapter that emitted it.
ToolDelta(Value),
/// An upstream event that did not map onto any of the other variants.
Raw {
/// Adapter that produced this event.
source: UnifiedStreamSource,
/// Upstream event name.
event_type: String,
/// Upstream payload as JSON.
data: Value,
},
/// Terminal event carrying whatever metadata was collected during the stream.
Done {
/// Adapter that produced this event.
source: UnifiedStreamSource,
/// Response or message id, if one was seen.
id: Option<String>,
/// Model name, if one was seen.
model: Option<String>,
/// Finish or stop reason (or response status), if one was seen.
finish_reason: Option<String>,
/// Usage information as JSON, if any was seen.
usage: Option<Value>,
},
/// An error reported by, or while reading, the upstream stream.
Error(OpenRouterError),
}
/// Boxed `'static` stream of [`UnifiedStreamEvent`]s, as returned by the `adapt_*_stream` functions.
pub type UnifiedStream = BoxStream<'static, UnifiedStreamEvent>;
/// Metadata gathered while a stream is being adapted; it is drained into the
/// final `UnifiedStreamEvent::Done`.
#[derive(Debug, Default)]
struct StreamMeta {
// Most recently seen response/message id.
id: Option<String>,
// Most recently seen model name.
model: Option<String>,
// Most recently seen finish reason, stop reason or response status.
finish_reason: Option<String>,
// Most recently seen usage payload as JSON.
usage: Option<Value>,
}
/// Returns the snake_case label for a [`FinishReason`], as stored in the
/// `finish_reason` field of `UnifiedStreamEvent::Done` for chat streams.
fn finish_reason_to_string(reason: &FinishReason) -> &'static str {
    match *reason {
        FinishReason::Stop => "stop",
        FinishReason::Length => "length",
        FinishReason::ToolCalls => "tool_calls",
        FinishReason::ContentFilter => "content_filter",
        FinishReason::Error => "error",
    }
}
pub fn adapt_chat_stream(
inner: BoxStream<'static, Result<CompletionsResponse, OpenRouterError>>,
) -> UnifiedStream {
struct State {
inner: BoxStream<'static, Result<CompletionsResponse, OpenRouterError>>,
pending: VecDeque<UnifiedStreamEvent>,
done_emitted: bool,
meta: StreamMeta,
}
let state = State {
inner,
pending: VecDeque::new(),
done_emitted: false,
meta: StreamMeta::default(),
};
futures_util::stream::unfold(state, |mut state| async move {
loop {
if let Some(event) = state.pending.pop_front() {
return Some((event, state));
}
if state.done_emitted {
return None;
}
match state.inner.next().await {
Some(Ok(response)) => {
state.meta.id = Some(response.id.clone());
state.meta.model = Some(response.model.clone());
if let Some(usage) = response.usage {
state.meta.usage = serde_json::to_value(usage).ok();
}
for choice in &response.choices {
if let Some(content) = choice.content() {
if !content.is_empty() {
state.pending.push_back(UnifiedStreamEvent::ContentDelta(
content.to_string(),
));
}
}
if let Some(reasoning) = choice.reasoning() {
if !reasoning.is_empty() {
state.pending.push_back(UnifiedStreamEvent::ReasoningDelta(
reasoning.to_string(),
));
}
}
if let Some(reasoning_details) = choice.reasoning_details() {
if !reasoning_details.is_empty() {
state
.pending
.push_back(UnifiedStreamEvent::ReasoningDetailsDelta(
reasoning_details.to_vec(),
));
}
}
if let Some(partials) = choice.partial_tool_calls() {
for partial in partials {
state.pending.push_back(UnifiedStreamEvent::ToolDelta(
serde_json::to_value(partial).unwrap_or(Value::Null),
));
}
}
if let Some(reason) = choice.finish_reason() {
state.meta.finish_reason =
Some(finish_reason_to_string(reason).to_string());
}
}
}
Some(Err(error)) => {
state.pending.push_back(UnifiedStreamEvent::Error(error));
}
None => {
state.done_emitted = true;
state.pending.push_back(UnifiedStreamEvent::Done {
source: UnifiedStreamSource::Chat,
id: state.meta.id.take(),
model: state.meta.model.take(),
finish_reason: state.meta.finish_reason.take(),
usage: state.meta.usage.take(),
});
}
}
}
})
.boxed()
}
pub fn adapt_responses_stream(
inner: BoxStream<'static, Result<ResponsesStreamEvent, OpenRouterError>>,
) -> UnifiedStream {
struct State {
inner: BoxStream<'static, Result<ResponsesStreamEvent, OpenRouterError>>,
pending: VecDeque<UnifiedStreamEvent>,
done_emitted: bool,
meta: StreamMeta,
}
let state = State {
inner,
pending: VecDeque::new(),
done_emitted: false,
meta: StreamMeta::default(),
};
futures_util::stream::unfold(state, |mut state| async move {
loop {
if let Some(event) = state.pending.pop_front() {
return Some((event, state));
}
if state.done_emitted {
return None;
}
match state.inner.next().await {
Some(Ok(event)) => {
let event_type = event.event_type.clone();
let data_value = serde_json::to_value(&event.data).unwrap_or(Value::Null);
let mut emitted = false;
if let Some(response) = event.data.get("response") {
if let Some(id) = response.get("id").and_then(Value::as_str) {
state.meta.id = Some(id.to_string());
}
if let Some(model) = response.get("model").and_then(Value::as_str) {
state.meta.model = Some(model.to_string());
}
if let Some(status) = response.get("status").and_then(Value::as_str) {
state.meta.finish_reason = Some(status.to_string());
}
if let Some(usage) = response.get("usage") {
state.meta.usage = Some(usage.clone());
}
}
if event_type.contains("output_text.delta") {
if let Some(delta) = event.data.get("delta").and_then(Value::as_str) {
state
.pending
.push_back(UnifiedStreamEvent::ContentDelta(delta.to_string()));
emitted = true;
}
}
if !emitted && event_type.contains("reasoning") {
let reasoning = event
.data
.get("delta")
.and_then(Value::as_str)
.or_else(|| event.data.get("text").and_then(Value::as_str))
.or_else(|| event.data.get("reasoning").and_then(Value::as_str));
if let Some(reasoning) = reasoning {
state.pending.push_back(UnifiedStreamEvent::ReasoningDelta(
reasoning.to_string(),
));
emitted = true;
}
}
if !emitted && event_type.contains("tool") {
state
.pending
.push_back(UnifiedStreamEvent::ToolDelta(data_value.clone()));
emitted = true;
}
if event_type == "response.completed" {
state.done_emitted = true;
state.pending.push_back(UnifiedStreamEvent::Done {
source: UnifiedStreamSource::Responses,
id: state.meta.id.take(),
model: state.meta.model.take(),
finish_reason: state.meta.finish_reason.take(),
usage: state.meta.usage.take(),
});
continue;
}
if !emitted {
state.pending.push_back(UnifiedStreamEvent::Raw {
source: UnifiedStreamSource::Responses,
event_type,
data: data_value,
});
}
}
Some(Err(error)) => {
state.pending.push_back(UnifiedStreamEvent::Error(error));
}
None => {
state.done_emitted = true;
state.pending.push_back(UnifiedStreamEvent::Done {
source: UnifiedStreamSource::Responses,
id: state.meta.id.take(),
model: state.meta.model.take(),
finish_reason: state.meta.finish_reason.take(),
usage: state.meta.usage.take(),
});
}
}
}
})
.boxed()
}
pub fn adapt_messages_stream(
inner: BoxStream<'static, Result<AnthropicMessagesSseEvent, OpenRouterError>>,
) -> UnifiedStream {
struct State {
inner: BoxStream<'static, Result<AnthropicMessagesSseEvent, OpenRouterError>>,
pending: VecDeque<UnifiedStreamEvent>,
done_emitted: bool,
meta: StreamMeta,
}
let state = State {
inner,
pending: VecDeque::new(),
done_emitted: false,
meta: StreamMeta::default(),
};
futures_util::stream::unfold(state, |mut state| async move {
loop {
if let Some(event) = state.pending.pop_front() {
return Some((event, state));
}
if state.done_emitted {
return None;
}
match state.inner.next().await {
Some(Ok(event)) => {
let event_name = event.event.clone();
match event.data {
AnthropicMessagesStreamEvent::MessageStart { message } => {
state.meta.id = message.id.clone();
state.meta.model = message.model.clone();
if let Some(usage) = message.usage {
state.meta.usage = serde_json::to_value(usage).ok();
}
}
AnthropicMessagesStreamEvent::MessageDelta { delta, usage } => {
state.meta.usage = Some(usage);
if let Some(reason) = delta.get("stop_reason").and_then(Value::as_str) {
state.meta.finish_reason = Some(reason.to_string());
}
let text = delta
.get("text")
.and_then(Value::as_str)
.or_else(|| delta.get("output_text").and_then(Value::as_str));
if let Some(text) = text {
state
.pending
.push_back(UnifiedStreamEvent::ContentDelta(text.to_string()));
}
}
AnthropicMessagesStreamEvent::ContentBlockStart {
index,
content_block,
} => match *content_block {
AnthropicContentPart::Thinking { thinking, .. } => {
state
.pending
.push_back(UnifiedStreamEvent::ReasoningDelta(thinking));
}
AnthropicContentPart::ToolUse { .. }
| AnthropicContentPart::ServerToolUse { .. } => {
let content_block_value =
serde_json::to_value(content_block).unwrap_or(Value::Null);
state.pending.push_back(UnifiedStreamEvent::ToolDelta(
serde_json::json!({
"index": index,
"content_block": content_block_value,
}),
));
}
_ => {}
},
AnthropicMessagesStreamEvent::ContentBlockDelta { index, delta } => {
let delta_type = delta
.get("type")
.and_then(Value::as_str)
.unwrap_or_default();
if delta_type.contains("text_delta") {
if let Some(text) = delta.get("text").and_then(Value::as_str) {
state.pending.push_back(UnifiedStreamEvent::ContentDelta(
text.to_string(),
));
}
} else if delta_type.contains("thinking") {
let reasoning = delta
.get("thinking")
.and_then(Value::as_str)
.or_else(|| delta.get("text").and_then(Value::as_str));
if let Some(reasoning) = reasoning {
state.pending.push_back(UnifiedStreamEvent::ReasoningDelta(
reasoning.to_string(),
));
}
} else if delta_type.contains("tool")
|| delta_type.contains("json")
|| delta.get("partial_json").is_some()
{
state.pending.push_back(UnifiedStreamEvent::ToolDelta(
serde_json::json!({
"index": index,
"delta": delta
}),
));
} else {
state.pending.push_back(UnifiedStreamEvent::Raw {
source: UnifiedStreamSource::Messages,
event_type: event_name,
data: delta,
});
}
}
AnthropicMessagesStreamEvent::MessageStop => {
state.done_emitted = true;
state.pending.push_back(UnifiedStreamEvent::Done {
source: UnifiedStreamSource::Messages,
id: state.meta.id.take(),
model: state.meta.model.take(),
finish_reason: state.meta.finish_reason.take(),
usage: state.meta.usage.take(),
});
}
AnthropicMessagesStreamEvent::Error { error } => {
let message = error
.get("message")
.and_then(Value::as_str)
.map(ToOwned::to_owned)
.unwrap_or_else(|| error.to_string());
state.pending.push_back(UnifiedStreamEvent::Error(
OpenRouterError::Unknown(format!(
"messages stream error event: {message}"
)),
));
}
AnthropicMessagesStreamEvent::ContentBlockStop { .. }
| AnthropicMessagesStreamEvent::Ping => {}
}
}
Some(Err(error)) => {
state.pending.push_back(UnifiedStreamEvent::Error(error));
}
None => {
state.done_emitted = true;
state.pending.push_back(UnifiedStreamEvent::Done {
source: UnifiedStreamSource::Messages,
id: state.meta.id.take(),
model: state.meta.model.take(),
finish_reason: state.meta.finish_reason.take(),
usage: state.meta.usage.take(),
});
}
}
}
})
.boxed()
}