use crate::{
chat::{ChatCompletionRequest, ChatCompletionResponse, Choice},
client::OpenRouterClient,
error::{OpenRouterError, Result},
types::{Message, Usage},
};
use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
use std::pin::Pin;
/// Boxed, pinned, `Send` stream whose items are either a parsed
/// [`ChatCompletionChunk`] or an error. This is the type returned by
/// `OpenRouterClient::chat_completion_stream` and consumed by [`collect_stream`].
pub type ChatCompletionStream = Pin<Box<dyn Stream<Item = Result<ChatCompletionChunk>> + Send>>;
/// One streamed chunk, deserialized from the JSON payload of an SSE `data:` line
/// by `parse_sse_chunk`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatCompletionChunk {
/// Completion identifier; `collect_stream` copies it into the assembled response.
pub id: String,
/// Object-type tag carried by the payload.
pub object: String,
/// Creation timestamp as sent by the server (presumably Unix seconds — confirm against the API).
pub created: i64,
/// Model name carried by the payload.
pub model: String,
/// Per-choice incremental updates contained in this chunk.
pub choices: Vec<StreamingChoice>,
/// Usage statistics, when the chunk carries them; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub usage: Option<Usage>,
/// Chunk-level error. When present, `parse_sse_chunk` reports the chunk as
/// `OpenRouterError::StreamError` instead of yielding it.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<ChunkError>,
}
/// Incremental update for a single choice inside a [`ChatCompletionChunk`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingChoice {
/// Index of the choice this update belongs to.
pub index: u32,
/// The role/content fragment carried by this update.
pub delta: DeltaMessage,
/// Finish reason, when the chunk carries one; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub finish_reason: Option<String>,
/// Second finish-reason field carried alongside `finish_reason`
/// (presumably the upstream provider's own value — confirm against the API).
#[serde(skip_serializing_if = "Option::is_none")]
pub native_finish_reason: Option<String>,
/// Choice-level error, when the chunk carries one. Nothing in this file inspects it.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<ChoiceError>,
}
/// Message fragment carried by a [`StreamingChoice`]. Both fields are optional;
/// `collect_stream` keeps the most recent `role` and concatenates every `content`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaMessage {
/// Role of the message author, when the chunk carries it.
#[serde(skip_serializing_if = "Option::is_none")]
pub role: Option<String>,
/// Text fragment to append to the message content, when the chunk carries one.
#[serde(skip_serializing_if = "Option::is_none")]
pub content: Option<String>,
}
/// Error attached to a single [`StreamingChoice`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChoiceError {
/// Numeric error code as sent by the server.
pub code: u16,
/// Human-readable error description.
pub message: String,
}
/// Error attached to a whole [`ChatCompletionChunk`]. `parse_sse_chunk` formats
/// `code` and `message` into an `OpenRouterError::StreamError`.
///
/// NOTE(review): `code` is a `u16`, so a payload with a non-numeric code fails to
/// deserialize and `parse_sse_chunk` then skips the line silently — confirm the
/// API only ever sends numeric codes here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkError {
/// Numeric error code as sent by the server.
pub code: u16,
/// Human-readable error description.
pub message: String,
/// Free-form extra error details, kept as raw JSON; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
}
impl OpenRouterClient {
pub async fn chat_completion_stream(
&self,
mut request: ChatCompletionRequest,
) -> Result<ChatCompletionStream> {
request.stream = Some(true);
let url = format!("{}/chat/completions", self.base_url);
let headers = self.build_headers()?;
let response = self
.client
.post(&url)
.headers(headers)
.json(&request)
.send()
.await
.map_err(OpenRouterError::HttpError)?;
let status = response.status();
if !status.is_success() {
let error_text = response.text().await.unwrap_or_default();
return Err(OpenRouterError::ApiError {
code: status.as_u16(),
message: error_text,
});
}
let stream = response
.bytes_stream()
.map(|result| {
result.map_err(OpenRouterError::HttpError)
})
.filter_map(|result| async move {
match result {
Ok(bytes) => {
let text = String::from_utf8_lossy(&bytes);
parse_sse_chunk(&text)
}
Err(e) => Some(Err(e)),
}
});
Ok(Box::pin(stream))
}
}
/// Parses server-sent-event text and returns the last chunk found in it.
///
/// Each line is trimmed. Only lines carrying a `data:` field are considered;
/// the single space after the colon is optional. Blank lines, `:` comments and
/// any other field are ignored.
///
/// - The `[DONE]` sentinel stops parsing. A chunk parsed from an earlier line
///   of the same text is still returned rather than discarded.
/// - A payload that deserializes into a chunk whose `error` is set makes the
///   function return `OpenRouterError::StreamError` immediately.
/// - A payload that is not valid chunk JSON is skipped on purpose: the caller
///   may hand over text that ends in the middle of a payload.
///
/// Returns `None` when the text contains no parsable chunk.
fn parse_sse_chunk(text: &str) -> Option<Result<ChatCompletionChunk>> {
    let mut latest = None;
    for line in text.lines() {
        let data = match line.trim().strip_prefix("data:") {
            Some(rest) => rest.trim_start(),
            None => continue,
        };
        if data == "[DONE]" {
            // End-of-stream marker: keep whatever was parsed before it.
            break;
        }
        match serde_json::from_str::<ChatCompletionChunk>(data) {
            Ok(chunk) => {
                if let Some(error) = &chunk.error {
                    return Some(Err(OpenRouterError::StreamError(format!(
                        "Stream error: {} - {}",
                        error.code, error.message
                    ))));
                }
                latest = Some(Ok(chunk));
            }
            Err(_) => continue,
        }
    }
    latest
}
/// Drains `stream` and assembles its chunks into a single
/// [`ChatCompletionResponse`].
///
/// - `id`, `object`, `created` and `model` are taken from the first chunk with
///   a non-empty `id`.
/// - The `content` deltas of all choices are concatenated, in arrival order,
///   into one message reported as choice index `0`.
/// - The last seen `usage`, `finish_reason` and `native_finish_reason` win.
/// - The role is `Role::Assistant` when the stream reports `"assistant"` or
///   reports no role at all (a streamed completion is model output). Any other
///   reported role maps to `Role::User`.
///
/// Chunks are folded into the result as they arrive and are not retained.
///
/// # Errors
///
/// Returns the first error yielded by `stream`; chunks received before it are
/// discarded.
pub async fn collect_stream(stream: ChatCompletionStream) -> Result<ChatCompletionResponse> {
    let mut stream = stream;

    let mut id = String::new();
    let mut object = String::new();
    let mut created: i64 = 0;
    let mut model = String::new();

    let mut role: Option<String> = None;
    let mut full_content = String::new();
    let mut last_usage: Option<Usage> = None;
    let mut finish_reason: Option<String> = None;
    let mut native_finish_reason: Option<String> = None;

    while let Some(item) = stream.next().await {
        let chunk = item?;

        // The chunk is owned, so its fields are moved out instead of cloned.
        if id.is_empty() {
            id = chunk.id;
            object = chunk.object;
            created = chunk.created;
            model = chunk.model;
        }
        if let Some(usage) = chunk.usage {
            last_usage = Some(usage);
        }
        for choice in chunk.choices {
            if let Some(reported_role) = choice.delta.role {
                role = Some(reported_role);
            }
            if let Some(fragment) = choice.delta.content {
                full_content.push_str(&fragment);
            }
            if let Some(reason) = choice.finish_reason {
                finish_reason = Some(reason);
            }
            if let Some(reason) = choice.native_finish_reason {
                native_finish_reason = Some(reason);
            }
        }
    }

    let role = match role.as_deref() {
        None | Some("") | Some("assistant") => crate::types::Role::Assistant,
        Some(_) => crate::types::Role::User,
    };

    Ok(ChatCompletionResponse {
        id,
        object,
        created,
        model,
        choices: vec![Choice {
            index: 0,
            message: Message {
                role,
                content: Some(full_content),
                name: None,
                tool_calls: None,
            },
            finish_reason,
            native_finish_reason,
            error: None,
        }],
        usage: last_usage,
        system_fingerprint: None,
    })
}