use crate::error::LlmConnectorError;
use futures_util::{Stream, StreamExt};
use std::pin::Pin;
/// Framing used by a streamed HTTP response body.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamFormat {
    /// Server-sent events: events are separated by a blank line and their
    /// `data:` lines carry the payload.
    Sse,
    /// Newline-delimited JSON: every non-empty line is one payload (an optional
    /// `data:` prefix is tolerated).
    NdJson,
    /// Pick `Sse` or `NdJson` by inspecting the first bytes that arrive.
    Auto,
}
#[cfg(feature = "streaming")]
/// Controls how each streamed JSON payload is interpreted by
/// `sse_to_streaming_response_with_mode`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamingParseMode {
    /// Try the OpenAI-compatible chunk schema first; otherwise accept native Ollama
    /// chat objects that pass the strict Ollama shape check.
    Auto,
    /// Accept only OpenAI-compatible chunks; anything else is a parse error.
    OpenAIOnly,
    /// Native Ollama objects must pass the strict shape check.
    /// NOTE(review): in this module it is currently handled exactly like `Auto`
    /// (OpenAI-compatible chunks are still tried first).
    OllamaStrict,
}
/// Turns a streaming HTTP response body into a stream of textual payloads.
///
/// * [`StreamFormat::Sse`]: every blank-line-terminated event yields its `data:`
///   lines (each trimmed, joined with `\n`). Empty data and `[DONE]` are skipped,
///   so an event without other data yields nothing.
/// * [`StreamFormat::NdJson`]: every non-empty line yields one payload. An optional
///   `data:` prefix is stripped and `[DONE]` lines are skipped.
/// * [`StreamFormat::Auto`]: the format is detected from the first bytes received.
///
/// Chunk boundaries are handled incrementally:
/// * multi-byte UTF-8 characters and `\r\n` pairs split across network chunks are
///   reassembled;
/// * an event or line still unterminated when the body ends is flushed as the
///   final item, unless a transport error was seen (the tail is then assumed truncated).
///
/// # Errors
///
/// Transport errors from the body stream are yielded as
/// [`LlmConnectorError::NetworkError`] items. The stream keeps polling the body
/// afterwards.
pub fn create_text_stream(
    response: reqwest::Response,
    format: StreamFormat,
) -> Pin<Box<dyn Stream<Item = Result<String, LlmConnectorError>> + Send>> {
    // Append an explicit end-of-body marker (`None`) so the decoder gets a chance
    // to flush whatever is still buffered when the server closes the stream.
    let chunks = response
        .bytes_stream()
        .map(Some)
        .chain(futures_util::stream::once(std::future::ready(None)));
    let events = chunks
        .scan(TextStreamDecoder::new(format), |decoder, item| {
            let out = match item {
                Some(Ok(chunk)) => decoder.push_bytes(&chunk),
                Some(Err(e)) => decoder.push_error(e.to_string()),
                None => decoder.finish(),
            };
            std::future::ready(Some(out))
        })
        .flat_map(futures_util::stream::iter);
    Box::pin(events)
}

/// Incremental bytes-to-payload decoder backing [`create_text_stream`].
struct TextStreamDecoder {
    /// Trailing bytes of a UTF-8 sequence that was cut off at a chunk boundary.
    pending_bytes: Vec<u8>,
    /// Set when the last decoded text ended in `\r`, which may be the first half
    /// of a `\r\n` pair whose `\n` arrives in the next chunk.
    pending_cr: bool,
    /// Decoded, `\n`-normalized text not yet consumed as a complete event/line.
    buffer: String,
    /// Resolved wire format; `None` while auto-detection is still undecided.
    detected_format: Option<StreamFormat>,
    /// Set once a transport error was reported; suppresses the end-of-body flush.
    saw_error: bool,
}

impl TextStreamDecoder {
    /// Creates a decoder. `Auto` starts undetected; any other format is fixed.
    fn new(format: StreamFormat) -> Self {
        Self {
            pending_bytes: Vec::new(),
            pending_cr: false,
            buffer: String::new(),
            detected_format: (format != StreamFormat::Auto).then_some(format),
            saw_error: false,
        }
    }

    /// Feeds one network chunk and returns every payload it completed.
    fn push_bytes(&mut self, chunk: &[u8]) -> Vec<Result<String, LlmConnectorError>> {
        self.append_bytes(chunk);
        if self.detected_format.is_none() {
            self.detected_format = detect_format(&self.buffer, false);
        }
        let mut out = Vec::new();
        self.drain_complete(&mut out);
        out
    }

    /// Converts a transport error into a stream item and remembers that it happened.
    fn push_error(&mut self, message: String) -> Vec<Result<String, LlmConnectorError>> {
        self.saw_error = true;
        vec![Err(LlmConnectorError::NetworkError(message))]
    }

    /// Flushes all buffered state at end of body and returns the final payloads.
    fn finish(&mut self) -> Vec<Result<String, LlmConnectorError>> {
        if self.saw_error {
            // After a transport error the unterminated tail is most likely a
            // truncated payload; surfacing it would only produce a bogus item.
            return Vec::new();
        }
        if std::mem::take(&mut self.pending_cr) {
            self.buffer.push('\r');
        }
        if !self.pending_bytes.is_empty() {
            // An incomplete UTF-8 sequence at end of body can never be completed.
            self.buffer
                .push_str(&String::from_utf8_lossy(&self.pending_bytes));
            self.pending_bytes.clear();
        }
        if self.detected_format.is_none() {
            self.detected_format = detect_format(&self.buffer, true);
        }
        let mut out = Vec::new();
        self.drain_complete(&mut out);
        // Whatever is left has no terminator; treat it as the last event/line.
        let tail = std::mem::take(&mut self.buffer);
        match self.detected_format {
            Some(StreamFormat::Sse) => push_sse_event(&tail, &mut out),
            Some(StreamFormat::NdJson) => push_ndjson_line(&tail, &mut out),
            Some(StreamFormat::Auto) | None => {}
        }
        out
    }

    /// Decodes `chunk` (after any held-back partial sequence) and appends the
    /// CRLF-normalized text to `self.buffer`.
    fn append_bytes(&mut self, chunk: &[u8]) {
        self.pending_bytes.extend_from_slice(chunk);
        let mut text = String::with_capacity(self.pending_bytes.len() + 1);
        if std::mem::take(&mut self.pending_cr) {
            text.push('\r');
        }
        decode_utf8_prefix(&mut self.pending_bytes, &mut text);
        // Hold back a trailing `\r` until we know whether a `\n` follows it.
        if text.ends_with('\r') {
            text.pop();
            self.pending_cr = true;
        }
        self.buffer.push_str(&text.replace("\r\n", "\n"));
    }

    /// Emits every complete event/line currently in `self.buffer`.
    fn drain_complete(&mut self, out: &mut Vec<Result<String, LlmConnectorError>>) {
        match self.detected_format {
            Some(StreamFormat::Sse) => {
                while let Some(boundary_idx) = self.buffer.find("\n\n") {
                    let event: String = self.buffer.drain(..boundary_idx + 2).collect();
                    push_sse_event(&event, out);
                }
            }
            Some(StreamFormat::NdJson) => {
                while let Some(boundary_idx) = self.buffer.find('\n') {
                    let line: String = self.buffer.drain(..boundary_idx + 1).collect();
                    push_ndjson_line(&line, out);
                }
            }
            // Format not decided yet: keep buffering.
            Some(StreamFormat::Auto) | None => {}
        }
    }
}

/// Moves the longest decodable prefix of `pending` into `out`.
///
/// Invalid byte sequences become U+FFFD. Only a trailing *incomplete* sequence
/// (one that more bytes could still complete) is left in `pending`.
fn decode_utf8_prefix(pending: &mut Vec<u8>, out: &mut String) {
    loop {
        match std::str::from_utf8(pending.as_slice()) {
            Ok(valid) => {
                out.push_str(valid);
                pending.clear();
                return;
            }
            Err(err) => {
                let valid_len = err.valid_up_to();
                // `pending[..valid_len]` is valid UTF-8, so this borrows without replacement.
                out.push_str(&String::from_utf8_lossy(&pending[..valid_len]));
                match err.error_len() {
                    Some(invalid_len) => {
                        out.push(char::REPLACEMENT_CHARACTER);
                        pending.drain(..valid_len + invalid_len);
                    }
                    None => {
                        pending.drain(..valid_len);
                        return;
                    }
                }
            }
        }
    }
}

/// Guesses the wire format from the text buffered so far; `None` means "need more".
fn detect_format(buffer: &str, at_eof: bool) -> Option<StreamFormat> {
    if buffer.trim_start().starts_with('{') {
        // A body that opens with a JSON object is NDJSON, even when the JSON text
        // happens to contain `data:`. Commit once a full line (or EOF) is available.
        (at_eof || buffer.contains('\n')).then_some(StreamFormat::NdJson)
    } else if buffer.contains("data:") {
        Some(StreamFormat::Sse)
    } else {
        None
    }
}

/// Pushes the joined `data:` payloads of one SSE event, if it carries any.
fn push_sse_event(event: &str, out: &mut Vec<Result<String, LlmConnectorError>>) {
    let data_lines: Vec<&str> = event
        .split('\n')
        .filter_map(|line| line.trim().strip_prefix("data:"))
        .map(str::trim)
        .filter(|payload| !payload.is_empty() && *payload != "[DONE]")
        .collect();
    if !data_lines.is_empty() {
        out.push(Ok(data_lines.join("\n")));
    }
}

/// Pushes one NDJSON line (minus an optional `data:` prefix) unless it is empty or `[DONE]`.
fn push_ndjson_line(line: &str, out: &mut Vec<Result<String, LlmConnectorError>>) {
    let trimmed = line.trim();
    let payload = trimmed.strip_prefix("data:").map_or(trimmed, str::trim);
    if !payload.is_empty() && payload != "[DONE]" {
        out.push(Ok(payload.to_string()));
    }
}
/// Streams the `data:` payloads of a server-sent-events response.
///
/// Shorthand for [`create_text_stream`] with [`StreamFormat::Sse`].
#[inline]
pub fn sse_events(
    response: reqwest::Response,
) -> Pin<Box<dyn Stream<Item = Result<String, LlmConnectorError>> + Send>> {
    let framing = StreamFormat::Sse;
    create_text_stream(response, framing)
}
/// Streams the lines of a newline-delimited JSON response.
///
/// Shorthand for [`create_text_stream`] with [`StreamFormat::NdJson`].
#[inline]
pub fn json_lines_events(
    response: reqwest::Response,
) -> Pin<Box<dyn Stream<Item = Result<String, LlmConnectorError>> + Send>> {
    let framing = StreamFormat::NdJson;
    create_text_stream(response, framing)
}
/// Parses a single SSE line into its JSON `data:` payload.
///
/// Returns `Ok(None)` for blank lines, `:` comment lines, non-`data` fields,
/// empty data, and the `[DONE]` sentinel.
///
/// # Errors
///
/// [`LlmConnectorError::ParseError`] when the `data:` payload is not valid JSON.
pub fn parse_sse_line(line: &str) -> Result<Option<serde_json::Value>, LlmConnectorError> {
    // Blank lines and `:` comments can never start with `data:`, so one prefix
    // check covers every "no payload" shape.
    let Some(raw_payload) = line.trim().strip_prefix("data:") else {
        return Ok(None);
    };
    let payload = raw_payload.trim();
    if payload.is_empty() || payload == "[DONE]" {
        return Ok(None);
    }
    serde_json::from_str(payload)
        .map(Some)
        .map_err(|e| LlmConnectorError::ParseError(format!("Failed to parse SSE JSON: {}", e)))
}
#[cfg(feature = "streaming")]
/// Converts a streaming response into parsed chat chunks using
/// [`StreamingParseMode::Auto`].
pub fn sse_to_streaming_response(response: reqwest::Response) -> crate::types::ChatStream {
    let default_mode = StreamingParseMode::Auto;
    sse_to_streaming_response_with_mode(response, default_mode)
}
#[cfg(feature = "streaming")]
/// Converts a streaming response into parsed chat chunks.
///
/// The body is split with `create_text_stream(.., StreamFormat::Auto)`. Each payload
/// is parsed according to `parse_mode`, and the convenience `content` field is filled.
/// Tool-call deltas are merged across chunks through per-stream state. Errors are
/// forwarded as items and do not end the stream.
pub fn sse_to_streaming_response_with_mode(
    response: reqwest::Response,
    parse_mode: StreamingParseMode,
) -> crate::types::ChatStream {
    use crate::types::ToolCall;
    use std::collections::HashMap;

    // Tool-call fragments seen so far, keyed by their tool-call index.
    let initial_tool_calls = HashMap::<usize, ToolCall>::new();
    let chunks = create_text_stream(response, StreamFormat::Auto).scan(
        initial_tool_calls,
        move |tool_calls, item| {
            let next = item
                .and_then(|json_str| parse_streaming_payload(&json_str, parse_mode))
                .map(|mut chunk| {
                    populate_convenience_fields(&mut chunk);
                    accumulate_tool_calls(&mut chunk, tool_calls);
                    chunk
                });
            std::future::ready(Some(next))
        },
    );
    Box::pin(chunks)
}
#[cfg(feature = "streaming")]
/// Parses one streamed JSON payload into a `StreamingResponse`.
///
/// The OpenAI-compatible schema is tried first. Unless `parse_mode` is
/// `OpenAIOnly`, a native Ollama object is accepted as a fallback via
/// `parse_ollama_chunk`.
///
/// # Errors
///
/// `ParseError` when:
/// * the payload is not an OpenAI chunk and the mode is `OpenAIOnly`;
/// * the payload is not JSON;
/// * the payload matches neither supported shape.
fn parse_streaming_payload(
    json_str: &str,
    parse_mode: StreamingParseMode,
) -> Result<crate::types::StreamingResponse, crate::error::LlmConnectorError> {
    use crate::error::LlmConnectorError as ConnectorError;
    use crate::types::StreamingResponse;

    match serde_json::from_str::<StreamingResponse>(json_str) {
        Ok(mut openai_chunk) => {
            // Re-read the payload untyped so provider-specific reasoning keys can be mapped.
            if let Ok(untyped) = serde_json::from_str::<serde_json::Value>(json_str) {
                openai_chunk.populate_reasoning_synonyms(&untyped);
            }
            return Ok(openai_chunk);
        }
        Err(_) if parse_mode == StreamingParseMode::OpenAIOnly => {
            return Err(ConnectorError::ParseError(format!(
                "Failed to parse streaming response as OpenAI-compatible chunk. Content: {}",
                json_str
            )));
        }
        Err(_) => {}
    }

    let untyped: serde_json::Value = serde_json::from_str(json_str).map_err(|e| {
        ConnectorError::ParseError(format!(
            "Failed to parse streaming response: {}. Content: {}",
            e, json_str
        ))
    })?;

    parse_ollama_chunk(&untyped, parse_mode).ok_or_else(|| {
        ConnectorError::ParseError(format!(
            "Failed to parse streaming response: unsupported chunk format. Content: {}",
            json_str
        ))
    })
}
#[cfg(feature = "streaming")]
/// Converts a native Ollama chat stream object into an OpenAI-style chunk.
///
/// Returns `None` in any of these cases:
/// * `parse_mode` is `OpenAIOnly`;
/// * `raw` fails `is_strict_ollama_chunk`;
/// * `model` is not a string, or `message` is not an object.
///
/// Mapping:
/// * `message.role` → `delta.role` (unrecognized roles become `None`);
/// * `message.content` → `delta.content` (`None` when empty) and `content`;
/// * `message.{reasoning_content, reasoning, thought, thinking}` → matching delta fields;
/// * `done: true` → `finish_reason` from `done_reason` (default `"stop"`). When either
///   count is non-zero, `usage` is built from `prompt_eval_count` / `eval_count`;
/// * `created_at` (RFC 3339) → `created` in Unix seconds, falling back to the
///   current time when missing, unparseable, or before the epoch.
fn parse_ollama_chunk(
    raw: &serde_json::Value,
    parse_mode: StreamingParseMode,
) -> Option<crate::types::StreamingResponse> {
    use crate::types::{Delta, Role, StreamingChoice, StreamingResponse, Usage};
    if parse_mode == StreamingParseMode::OpenAIOnly || !is_strict_ollama_chunk(raw) {
        return None;
    }
    let model = raw.get("model")?.as_str()?.to_string();
    let message = raw.get("message")?.as_object()?;
    // Copies an optional string field out of `message`.
    let message_text = |key: &str| {
        message
            .get(key)
            .and_then(|v| v.as_str())
            .map(ToString::to_string)
    };
    let role = message
        .get("role")
        .and_then(|v| v.as_str())
        .and_then(|r| match r {
            "system" => Some(Role::System),
            "user" => Some(Role::User),
            "assistant" => Some(Role::Assistant),
            "tool" => Some(Role::Tool),
            _ => None,
        });
    let content = message_text("content").unwrap_or_default();
    let delta = Delta {
        role,
        content: (!content.is_empty()).then(|| content.clone()),
        tool_calls: None,
        reasoning_content: message_text("reasoning_content"),
        reasoning: message_text("reasoning"),
        thought: message_text("thought"),
        thinking: message_text("thinking"),
    };
    let done = raw.get("done").and_then(|v| v.as_bool()).unwrap_or(false);
    let finish_reason = done.then(|| {
        raw.get("done_reason")
            .and_then(|v| v.as_str())
            .unwrap_or("stop")
            .to_string()
    });
    // Counts are read as u64; clamp to u32::MAX instead of silently truncating with `as`.
    let token_count = |key: &str| -> u32 {
        raw.get(key)
            .and_then(|v| v.as_u64())
            .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX))
    };
    let usage = if done {
        let prompt_tokens = token_count("prompt_eval_count");
        let completion_tokens = token_count("eval_count");
        (prompt_tokens > 0 || completion_tokens > 0).then(|| Usage {
            prompt_tokens,
            completion_tokens,
            // Saturate so huge counts cannot overflow (a panic in debug builds).
            total_tokens: prompt_tokens.saturating_add(completion_tokens),
            ..Default::default()
        })
    } else {
        None
    };
    let created = raw
        .get("created_at")
        .and_then(|v| v.as_str())
        .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
        // A pre-epoch timestamp would wrap to a huge value with `as u64`.
        .and_then(|dt| u64::try_from(dt.timestamp()).ok())
        .unwrap_or_else(|| u64::try_from(chrono::Utc::now().timestamp()).unwrap_or_default());
    let mut response = StreamingResponse {
        id: format!("ollama-{}", created),
        object: "chat.completion.chunk".to_string(),
        created,
        model,
        choices: vec![StreamingChoice {
            index: 0,
            delta,
            finish_reason,
            logprobs: None,
        }],
        content,
        reasoning_content: None,
        usage,
        system_fingerprint: None,
    };
    response.populate_reasoning_synonyms(raw);
    Some(response)
}
#[cfg(feature = "streaming")]
/// Reports whether `raw` has the shape of a native Ollama chat stream object.
///
/// Required fields:
/// * an object `message` with string `role` and string `content`;
/// * a string `model`;
/// * a boolean `done`.
///
/// At least one Ollama-specific metadata field must also be present:
/// * string `created_at`, `done_reason`, `remote_model`, or `remote_host`; or
/// * unsigned-integer `prompt_eval_count`, `eval_count`, or `total_duration`.
fn is_strict_ollama_chunk(raw: &serde_json::Value) -> bool {
    let Some(message) = raw.get("message").and_then(|v| v.as_object()) else {
        return false;
    };
    let has_string = |key: &str| raw.get(key).and_then(|v| v.as_str()).is_some();
    let has_unsigned = |key: &str| raw.get(key).and_then(|v| v.as_u64()).is_some();

    let has_core_fields = has_string("model")
        && raw.get("done").and_then(|v| v.as_bool()).is_some()
        && message.get("role").and_then(|v| v.as_str()).is_some()
        && message.get("content").is_some_and(|v| v.is_string());

    has_core_fields
        && (["created_at", "done_reason", "remote_model", "remote_host"]
            .into_iter()
            .any(has_string)
            || ["prompt_eval_count", "eval_count", "total_duration"]
                .into_iter()
                .any(has_unsigned))
}
#[cfg(feature = "streaming")]
/// Fills `response.content` when it is empty.
///
/// The value is taken from the first *non-empty* text among the first choice's
/// delta fields, in this order: `content`, `reasoning_content`, `reasoning`,
/// `thought`, `thinking`. Every candidate is checked for emptiness. Previously only
/// `content` was, so an empty `reasoning_content` could hide a later non-empty field.
fn populate_convenience_fields(response: &mut crate::types::StreamingResponse) {
    if !response.content.is_empty() {
        return;
    }
    let Some(choice) = response.choices.first() else {
        return;
    };
    let delta = &choice.delta;
    let fallback = [
        &delta.content,
        &delta.reasoning_content,
        &delta.reasoning,
        &delta.thought,
        &delta.thinking,
    ]
    .into_iter()
    .find_map(|field| field.as_deref().filter(|text| !text.is_empty()))
    .map(str::to_owned);
    if let Some(text) = fallback {
        response.content = text;
    }
}
#[cfg(feature = "streaming")]
/// Merges this chunk's tool-call deltas into `accumulated`, keyed by tool-call index.
///
/// A delta without an index is treated as index 0. Afterwards the chunk's
/// `delta.tool_calls` is replaced by every accumulated call reporting
/// `is_complete()`, or `None` if there are none. The list is sorted by tool-call
/// index, because `HashMap` iteration order is arbitrary and would otherwise
/// reorder calls between chunks and runs.
fn accumulate_tool_calls(
    response: &mut crate::types::StreamingResponse,
    accumulated: &mut std::collections::HashMap<usize, crate::types::ToolCall>,
) {
    let Some(choice) = response.choices.first_mut() else {
        return;
    };
    let Some(delta_tool_calls) = &choice.delta.tool_calls else {
        return;
    };

    for delta_call in delta_tool_calls {
        let index = delta_call.index.unwrap_or(0);
        accumulated
            .entry(index)
            .and_modify(|existing| existing.merge_delta(delta_call))
            .or_insert_with(|| delta_call.clone());
    }

    let mut complete: Vec<(usize, &crate::types::ToolCall)> = accumulated
        .iter()
        .filter(|(_, call)| call.is_complete())
        .map(|(&index, call)| (index, call))
        .collect();
    complete.sort_unstable_by_key(|&(index, _)| index);

    choice.delta.tool_calls = if complete.is_empty() {
        None
    } else {
        Some(complete.into_iter().map(|(_, call)| call.clone()).collect())
    };
}
#[cfg(test)]
mod tests {
    #[cfg(feature = "streaming")]
    #[test]
    fn test_parse_ollama_chunk_with_thinking() {
        let chunk = r#"{"model":"kimi-k2.5:cloud","created_at":"2026-03-05T08:32:36.674615034Z","message":{"role":"assistant","content":"","thinking":"step-by-step"},"done":false}"#;
        let parsed = super::parse_streaming_payload(chunk, super::StreamingParseMode::OllamaStrict)
            .expect("should parse ollama chunk");
        assert_eq!(parsed.model, "kimi-k2.5:cloud");
        assert_eq!(parsed.choices.len(), 1);
        assert_eq!(
            parsed.choices[0].delta.thinking.as_deref(),
            Some("step-by-step")
        );
    }
    #[cfg(feature = "streaming")]
    #[test]
    fn test_parse_ollama_done_chunk_with_usage() {
        let chunk = r#"{"model":"kimi-k2.5:cloud","created_at":"2026-03-05T08:32:36.674615034Z","message":{"role":"assistant","content":"done"},"done":true,"done_reason":"stop","prompt_eval_count":10,"eval_count":20}"#;
        let parsed = super::parse_streaming_payload(chunk, super::StreamingParseMode::OllamaStrict)
            .expect("should parse ollama done chunk");
        assert_eq!(parsed.choices[0].finish_reason.as_deref(), Some("stop"));
        assert_eq!(parsed.usage.as_ref().map(|u| u.total_tokens), Some(30));
    }
    #[cfg(feature = "streaming")]
    #[test]
    fn test_openai_only_mode_rejects_ollama_chunk() {
        let chunk = r#"{"model":"kimi-k2.5:cloud","created_at":"2026-03-05T08:32:36.674615034Z","message":{"role":"assistant","content":""},"done":false}"#;
        let result = super::parse_streaming_payload(chunk, super::StreamingParseMode::OpenAIOnly);
        assert!(result.is_err());
    }
    /// Every `data:` line of a well-formed SSE body yields its JSON value, in order.
    #[test]
    fn test_parse_sse_line_on_mock_sse_payload() {
        let mock_response = "data: {\"test\":1}\n\ndata: {\"test\":2}\n\n";
        let values: Vec<serde_json::Value> = mock_response
            .lines()
            .filter_map(|line| {
                super::parse_sse_line(line).expect("mock SSE lines carry valid JSON")
            })
            .collect();
        assert_eq!(
            values,
            vec![serde_json::json!({"test": 1}), serde_json::json!({"test": 2})]
        );
    }
    /// Lines without a JSON payload are skipped; malformed JSON is an error.
    #[test]
    fn test_parse_sse_line_skips_non_data_lines() {
        assert!(matches!(super::parse_sse_line(""), Ok(None)));
        assert!(matches!(super::parse_sse_line(": keep-alive"), Ok(None)));
        assert!(matches!(super::parse_sse_line("event: message"), Ok(None)));
        assert!(matches!(super::parse_sse_line("data: [DONE]"), Ok(None)));
        assert!(matches!(super::parse_sse_line("data:   "), Ok(None)));
        assert!(super::parse_sse_line("data: {not json").is_err());
    }
}