use super::request::{Role, ToolCall};
use super::response::{ChatResponse, Usage};
use serde::{Deserialize, Serialize};
#[cfg(feature = "streaming")]
use futures_util::Stream;
#[cfg(feature = "streaming")]
use std::pin::Pin;
/// Provider protocol that an OpenAI-style stream chunk is converted to
/// (matched on by `convert_streaming_format`).
///
/// Serialized as the lowercase names `"openai"`, `"ollama"` and `"anthropic"`.
/// `OpenAI` is the default variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum StreamingFormat {
/// OpenAI chunk shape; `convert_streaming_format` serializes the chunk as-is.
#[serde(rename = "openai")]
#[default]
OpenAI,
/// Ollama chunk shape (`OllamaStreamChunk`).
#[serde(rename = "ollama")]
Ollama,
/// Anthropic `content_block_delta` event shape.
#[serde(rename = "anthropic")]
Anthropic,
}
/// Framing applied to a `StreamChunk` payload by `StreamChunk::to_format`.
///
/// Serialized as `"json"`, `"sse"` and `"ndjson"`. `Json` is the default variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum StreamFormat {
/// The bare JSON text of the payload, with no terminator.
#[serde(rename = "json")]
#[default]
Json,
/// A `data: <json>` line followed by a blank line.
#[serde(rename = "sse")]
SSE,
/// The JSON text followed by a single `\n`.
#[serde(rename = "ndjson")]
NDJSON,
}
/// A provider-agnostic stream chunk: an arbitrary JSON payload plus the
/// framing it should be rendered with.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamChunk {
/// The chunk payload. The accessor methods understand the Ollama shape
/// (`message.content`, `done`) and the OpenAI shape (`choices[0].delta.content`).
pub data: serde_json::Value,
/// Framing used by `to_format`.
pub format: StreamFormat,
/// Optional side-channel data; omitted from serialized output when `None`.
/// `to_anthropic_sse` reads an integer `block_index` from it when present.
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
}
impl StreamChunk {
    /// Builds a chunk carrying `data`, tagged with `format`, with no metadata.
    pub fn new(data: serde_json::Value, format: StreamFormat) -> Self {
        let metadata = None;
        Self {
            data,
            format,
            metadata,
        }
    }

    /// Builds a chunk like [`StreamChunk::new`] and attaches `metadata` to it.
    pub fn with_metadata(
        data: serde_json::Value,
        format: StreamFormat,
        metadata: serde_json::Value,
    ) -> Self {
        Self {
            metadata: Some(metadata),
            ..Self::new(data, format)
        }
    }

    /// Renders the payload as one SSE `data:` record terminated by a blank line.
    pub fn to_sse(&self) -> String {
        let payload = &self.data;
        format!("data: {payload}\n\n")
    }

    /// Renders the chunk's text as an Anthropic `content_block_delta` SSE event.
    ///
    /// Returns an empty string when the chunk carries no (or empty) text.
    /// The block index is taken from `metadata.block_index` when that is an
    /// unsigned integer, and is `0` otherwise.
    pub fn to_anthropic_sse(&self) -> String {
        let text = match self.extract_content() {
            Some(text) if !text.is_empty() => text,
            _ => return String::new(),
        };

        let block_index = match &self.metadata {
            Some(meta) => meta
                .get("block_index")
                .and_then(|raw| raw.as_u64())
                .unwrap_or(0),
            None => 0,
        };

        let payload = serde_json::json!({
            "type": "content_block_delta",
            "index": block_index,
            "delta": {
                "type": "text_delta",
                "text": text
            }
        });
        format!("event: content_block_delta\ndata: {}\n\n", payload)
    }

    /// Renders the payload as a single newline-terminated JSON line.
    pub fn to_ndjson(&self) -> String {
        let mut line = self.data.to_string();
        line.push('\n');
        line
    }

    /// Renders the payload as compact JSON text with no terminator.
    pub fn to_json(&self) -> String {
        self.data.to_string()
    }

    /// Renders the payload using the framing selected by `self.format`.
    pub fn to_format(&self) -> String {
        match self.format {
            StreamFormat::NDJSON => self.to_ndjson(),
            StreamFormat::SSE => self.to_sse(),
            StreamFormat::Json => self.to_json(),
        }
    }

    /// Serializes an OpenAI-style `response` into a chunk tagged with `format`.
    ///
    /// # Errors
    /// Returns the `serde_json` error if `response` cannot be converted to a value.
    pub fn from_openai(
        response: &StreamingResponse,
        format: StreamFormat,
    ) -> Result<Self, serde_json::Error> {
        serde_json::to_value(response).map(|data| Self::new(data, format))
    }

    /// Serializes an Ollama-style `chunk` into a chunk tagged with `format`.
    ///
    /// # Errors
    /// Returns the `serde_json` error if `chunk` cannot be converted to a value.
    pub fn from_ollama(
        chunk: &OllamaStreamChunk,
        format: StreamFormat,
    ) -> Result<Self, serde_json::Error> {
        serde_json::to_value(chunk).map(|data| Self::new(data, format))
    }

    /// Returns `true` only when the payload has a boolean `done` field set to
    /// `true` (the Ollama end-of-stream marker). Any other payload is not final.
    pub fn is_final(&self) -> bool {
        matches!(self.data.get("done"), Some(serde_json::Value::Bool(true)))
    }

    /// Extracts the text carried by the payload, if any.
    ///
    /// The Ollama location (`message.content`) is tried first, then the OpenAI
    /// location (`choices[0].delta.content`). Only string values count.
    pub fn extract_content(&self) -> Option<String> {
        let ollama_text = self
            .data
            .get("message")
            .and_then(|message| message.get("content"))
            .and_then(|value| value.as_str());

        ollama_text
            .or_else(|| {
                self.data
                    .get("choices")
                    .and_then(|value| value.as_array())
                    .and_then(|choices| choices.first())
                    .and_then(|choice| choice.get("delta"))
                    .and_then(|delta| delta.get("content"))
                    .and_then(|value| value.as_str())
            })
            .map(|text| text.to_string())
    }
}
/// Options describing how a stream should be emitted.
///
/// NOTE(review): none of these fields are read in this part of the file; the
/// per-field notes below are inferred from names and types — confirm against callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingConfig {
/// Target provider protocol.
pub format: StreamingFormat,
/// Framing of each emitted chunk.
pub stream_format: StreamFormat,
/// Presumably whether token usage is forwarded — TODO confirm.
pub include_usage: bool,
/// Presumably whether reasoning/thinking text is forwarded — TODO confirm.
pub include_reasoning: bool,
}
impl Default for StreamingConfig {
    /// OpenAI protocol, plain JSON framing, with usage and reasoning both enabled.
    fn default() -> Self {
        let (format, stream_format) = (StreamingFormat::OpenAI, StreamFormat::Json);
        Self {
            format,
            stream_format,
            include_usage: true,
            include_reasoning: true,
        }
    }
}
/// Boxed, pinned, `Send` stream of OpenAI-style chunks (or connector errors).
/// Only compiled when the `streaming` feature is enabled.
#[cfg(feature = "streaming")]
pub type ChatStream =
Pin<Box<dyn Stream<Item = Result<StreamingResponse, crate::error::LlmConnectorError>> + Send>>;
/// Boxed, pinned, `Send` stream of Ollama-style chunks (or connector errors).
/// Only compiled when the `streaming` feature is enabled.
#[cfg(feature = "streaming")]
pub type OllamaChatStream =
Pin<Box<dyn Stream<Item = Result<OllamaStreamChunk, crate::error::LlmConnectorError>> + Send>>;
/// Boxed, pinned, `Send` stream of format-agnostic `StreamChunk`s (or connector errors).
/// Only compiled when the `streaming` feature is enabled.
#[cfg(feature = "streaming")]
pub type UniversalChatStream =
Pin<Box<dyn Stream<Item = Result<StreamChunk, crate::error::LlmConnectorError>> + Send>>;
/// One OpenAI-style streaming chunk, plus convenience fields (`content`,
/// `reasoning_content`) at the top level.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingResponse {
/// Response identifier. `AnthropicSseAdapter` generates its own id when this is empty.
pub id: String,
/// Object type tag. Deserializes to an empty string when missing; the
/// `Default` impl and the `From<ChatResponse>` conversion use `"chat.completion.chunk"`.
#[serde(default)]
pub object: String,
/// Creation timestamp as sent by the provider (unit not established in this file).
pub created: u64,
/// Model name.
pub model: String,
/// Per-choice deltas. Most helpers in this file only look at the first entry.
pub choices: Vec<StreamingChoice>,
/// Top-level text for this chunk; empty when missing. `get_content` treats
/// an empty string as "no content".
#[serde(default)]
pub content: String,
/// Top-level reasoning text; omitted from output when `None`.
/// `populate_reasoning_synonyms` back-fills it from the choice deltas.
#[serde(skip_serializing_if = "Option::is_none")]
pub reasoning_content: Option<String>,
/// Token usage, when the provider supplied it; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub usage: Option<Usage>,
/// Provider system fingerprint; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub system_fingerprint: Option<String>,
}
/// A single choice inside a `StreamingResponse`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingChoice {
/// Position of this choice in the response.
pub index: u32,
/// Incremental payload for this choice.
pub delta: Delta,
/// Why generation stopped, when it has; omitted from output when `None`.
/// `AnthropicSseAdapter` maps `"stop"`, `"length"` and `"tool_calls"` to
/// Anthropic stop reasons.
#[serde(skip_serializing_if = "Option::is_none")]
pub finish_reason: Option<String>,
/// Opaque log-probability data passed through unchanged; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub logprobs: Option<serde_json::Value>,
}
/// Incremental content of one streamed choice.
///
/// Every field is optional and omitted from serialized output when `None`.
/// The last four fields are alternative names for reasoning text;
/// `reasoning_any` returns the first one that is set, in declaration order.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct Delta {
/// Role of the message author, when present on this chunk.
#[serde(skip_serializing_if = "Option::is_none")]
pub role: Option<Role>,
/// Text fragment carried by this chunk.
#[serde(skip_serializing_if = "Option::is_none")]
pub content: Option<String>,
/// Tool-call fragments carried by this chunk.
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_calls: Option<Vec<ToolCall>>,
/// Reasoning text under the key `reasoning_content`.
#[serde(skip_serializing_if = "Option::is_none")]
pub reasoning_content: Option<String>,
/// Reasoning text under the key `reasoning`.
#[serde(skip_serializing_if = "Option::is_none")]
pub reasoning: Option<String>,
/// Reasoning text under the key `thought`.
#[serde(skip_serializing_if = "Option::is_none")]
pub thought: Option<String>,
/// Reasoning text under the key `thinking`.
#[serde(skip_serializing_if = "Option::is_none")]
pub thinking: Option<String>,
}
impl Delta {
    /// Returns the first reasoning text that is set, checking
    /// `reasoning_content`, `reasoning`, `thought` and `thinking` in that order.
    pub fn reasoning_any(&self) -> Option<&str> {
        [
            &self.reasoning_content,
            &self.reasoning,
            &self.thought,
            &self.thinking,
        ]
        .into_iter()
        .find_map(|slot| slot.as_deref())
    }

    /// Back-fills unset reasoning fields from anywhere inside `raw`.
    ///
    /// The JSON tree is walked depth-first. For each of the four reasoning
    /// key names (matched ASCII case-insensitively) the first string value
    /// encountered is remembered. Fields that are already `Some` are never
    /// overwritten.
    pub fn populate_reasoning_from_json(&mut self, raw: &serde_json::Value) {
        // Slot order matches the field order used when applying the results below.
        const SYNONYMS: [&str; 4] = ["reasoning_content", "reasoning", "thought", "thinking"];

        fn walk<'a>(node: &'a serde_json::Value, hits: &mut [Option<&'a str>; 4]) {
            match node {
                serde_json::Value::Object(fields) => {
                    for (name, child) in fields {
                        match child {
                            serde_json::Value::String(text) => {
                                let slot = SYNONYMS
                                    .iter()
                                    .position(|key| name.eq_ignore_ascii_case(key));
                                if let Some(slot) = slot {
                                    // First occurrence wins; later ones are ignored.
                                    if hits[slot].is_none() {
                                        hits[slot] = Some(text.as_str());
                                    }
                                }
                            }
                            other => walk(other, hits),
                        }
                    }
                }
                serde_json::Value::Array(items) => {
                    for item in items {
                        walk(item, hits);
                    }
                }
                _ => {}
            }
        }

        let mut hits: [Option<&str>; 4] = [None; 4];
        walk(raw, &mut hits);

        let targets = [
            &mut self.reasoning_content,
            &mut self.reasoning,
            &mut self.thought,
            &mut self.thinking,
        ];
        for (target, hit) in targets.into_iter().zip(hits) {
            if target.is_none() {
                if let Some(text) = hit {
                    *target = Some(text.to_string());
                }
            }
        }
    }
}
impl StreamingResponse {
    /// Back-fills reasoning fields on every choice delta from `raw`, then, if
    /// the top-level `reasoning_content` is still unset, copies the first
    /// reasoning text found on any choice into it.
    ///
    /// Every delta is populated from the whole of `raw`, not from its own
    /// choice's subtree.
    pub fn populate_reasoning_synonyms(&mut self, raw: &serde_json::Value) {
        self.choices
            .iter_mut()
            .for_each(|choice| choice.delta.populate_reasoning_from_json(raw));

        if self.reasoning_content.is_some() {
            return;
        }
        self.reasoning_content = self
            .choices
            .iter()
            .find_map(|choice| choice.delta.reasoning_any())
            .map(|text| text.to_string());
    }

    /// Returns the top-level `content`, or `None` when it is empty.
    pub fn get_content(&self) -> Option<&str> {
        let has_text = !self.content.is_empty();
        has_text.then_some(self.content.as_str())
    }
}
impl Default for StreamingResponse {
    /// An empty chunk whose `object` tag is `"chat.completion.chunk"`.
    fn default() -> Self {
        let object = String::from("chat.completion.chunk");
        Self {
            object,
            id: String::default(),
            model: String::default(),
            content: String::default(),
            created: 0,
            choices: Vec::default(),
            reasoning_content: None,
            usage: None,
            system_fingerprint: None,
        }
    }
}
impl From<ChatResponse> for StreamingResponse {
fn from(response: ChatResponse) -> Self {
let first_choice = response.choices.first();
Self {
id: response.id,
object: "chat.completion.chunk".to_string(),
created: response.created,
model: response.model,
choices: first_choice
.map(|choice| {
vec![StreamingChoice {
index: choice.index,
delta: Delta {
role: Some(choice.message.role),
content: if choice.message.content.is_empty() {
None
} else {
Some(choice.message.content_as_text())
},
tool_calls: choice.message.tool_calls.clone(),
reasoning_content: None,
reasoning: None,
thought: None,
thinking: None,
},
finish_reason: choice.finish_reason.clone(),
logprobs: choice.logprobs.clone(),
}]
})
.unwrap_or_default(),
content: response.content,
reasoning_content: None,
usage: response.usage,
system_fingerprint: response.system_fingerprint,
}
}
}
/// One chunk of an Ollama-style chat stream.
///
/// The optional statistics fields are omitted from serialized output when
/// `None`; the constructors in this file only fill them on a final chunk
/// that has usage data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OllamaStreamChunk {
/// Model name.
pub model: String,
/// Timestamp string; `OllamaStreamChunk::new` writes the current UTC time as RFC 3339.
pub created_at: String,
/// The message fragment carried by this chunk.
pub message: OllamaMessage,
/// `true` on the last chunk of the stream.
pub done: bool,
/// Total duration (presumably nanoseconds, as in Ollama's API — TODO confirm).
#[serde(skip_serializing_if = "Option::is_none")]
pub total_duration: Option<u64>,
/// Model load duration; never set by the constructors in this file.
#[serde(skip_serializing_if = "Option::is_none")]
pub load_duration: Option<u64>,
/// Number of prompt tokens.
#[serde(skip_serializing_if = "Option::is_none")]
pub prompt_eval_count: Option<u32>,
/// Prompt evaluation duration (same unit as `total_duration`).
#[serde(skip_serializing_if = "Option::is_none")]
pub prompt_eval_duration: Option<u64>,
/// Number of generated tokens.
#[serde(skip_serializing_if = "Option::is_none")]
pub eval_count: Option<u32>,
/// Generation duration (same unit as `total_duration`).
#[serde(skip_serializing_if = "Option::is_none")]
pub eval_duration: Option<u64>,
}
/// The message fragment inside an `OllamaStreamChunk`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OllamaMessage {
/// Author role; `OllamaStreamChunk::new` always writes `"assistant"`.
pub role: String,
/// Text carried by this chunk (may be empty, e.g. on the final chunk).
pub content: String,
}
impl OllamaStreamChunk {
    /// Synthetic prompt-evaluation duration charged per prompt token.
    /// The upstream provider reports no timings, so these are placeholders.
    const SYNTHETIC_PROMPT_DURATION_PER_TOKEN: u64 = 1_000_000;
    /// Synthetic generation duration charged per completion token.
    const SYNTHETIC_EVAL_DURATION_PER_TOKEN: u64 = 10_000_000;

    /// Creates an assistant chunk for `model` carrying `content`, stamped
    /// with the current UTC time and with all statistics unset.
    pub fn new(model: String, content: String, done: bool) -> Self {
        Self {
            model,
            created_at: chrono::Utc::now().to_rfc3339(),
            message: OllamaMessage {
                role: "assistant".to_string(),
                content,
            },
            done,
            total_duration: None,
            load_duration: None,
            prompt_eval_count: None,
            prompt_eval_duration: None,
            eval_count: None,
            eval_duration: None,
        }
    }

    /// Creates the terminating chunk (`done == true`, empty content).
    ///
    /// When `usage` is supplied, token counts are copied and synthetic
    /// durations are derived from them.
    pub fn final_chunk(model: String, usage: Option<&Usage>) -> Self {
        let mut chunk = Self::new(model, String::new(), true);
        if let Some(usage) = usage {
            chunk.apply_usage(usage);
        }
        chunk
    }

    /// Converts an OpenAI-style chunk into an Ollama-style one.
    ///
    /// The text is the top-level `content` when non-empty, otherwise the
    /// first choice's delta content, otherwise empty. Usage statistics are
    /// attached only when `done` is `true` and the source chunk has usage.
    pub fn from_openai_chunk(openai_chunk: &StreamingResponse, done: bool) -> Self {
        let content = if openai_chunk.content.is_empty() {
            openai_chunk
                .choices
                .first()
                .and_then(|choice| choice.delta.content.clone())
                .unwrap_or_default()
        } else {
            openai_chunk.content.clone()
        };
        let mut chunk = Self::new(openai_chunk.model.clone(), content, done);
        if done && let Some(usage) = &openai_chunk.usage {
            chunk.apply_usage(usage);
        }
        chunk
    }

    /// Copies token counts from `usage` and fills the duration fields with
    /// synthetic values proportional to those counts.
    fn apply_usage(&mut self, usage: &Usage) {
        let prompt_duration =
            u64::from(usage.prompt_tokens) * Self::SYNTHETIC_PROMPT_DURATION_PER_TOKEN;
        let eval_duration =
            u64::from(usage.completion_tokens) * Self::SYNTHETIC_EVAL_DURATION_PER_TOKEN;

        self.prompt_eval_count = Some(usage.prompt_tokens);
        self.eval_count = Some(usage.completion_tokens);
        self.prompt_eval_duration = Some(prompt_duration);
        self.eval_duration = Some(eval_duration);
        self.total_duration = Some(prompt_duration + eval_duration);
    }
}
/// Frames `data` as one SSE record: an `event:` line naming `event_type`,
/// a `data:` line with the compact JSON, and a terminating blank line.
fn format_anthropic_event(event_type: &str, data: &serde_json::Value) -> String {
    format!("event: {event_type}\ndata: {data}\n\n")
}
/// Stateful converter from a sequence of OpenAI-style chunks to Anthropic
/// SSE events (`message_start`, `content_block_*`, `message_delta`,
/// `message_stop`). Use one adapter per streamed message.
#[derive(Debug, Clone)]
pub struct AnthropicSseAdapter {
// Set once `message_start` has been emitted.
started: bool,
// Message id reported in `message_start` (taken from the first chunk, or generated).
message_id: String,
// Model name reported in `message_start` (taken from the first chunk).
model: String,
// Index of the most recently opened content block; -1 before any block is opened.
current_block_index: i32,
// Type of the currently open block ("thinking" or "text"); `None` when no block is open.
current_block_type: Option<String>,
// Prompt token count from the first chunk's usage, if it had any.
input_tokens: u32,
// Set once the closing events have been emitted; further chunks are ignored.
finished: bool,
}
impl Default for AnthropicSseAdapter {
fn default() -> Self {
Self::new()
}
}
impl AnthropicSseAdapter {
    /// Creates an adapter that has not yet emitted anything.
    pub fn new() -> Self {
        Self {
            started: false,
            message_id: String::new(),
            model: String::new(),
            current_block_index: -1,
            current_block_type: None,
            input_tokens: 0,
            finished: false,
        }
    }

    /// Converts one OpenAI-style chunk into zero or more Anthropic SSE events.
    ///
    /// * The first chunk produces `message_start`.
    /// * Non-empty reasoning text and non-empty content from the first choice
    ///   are emitted as `thinking` and `text` content blocks. Switching block
    ///   type closes the open block and opens a new one at the next index.
    /// * A non-empty `finish_reason` on the first choice closes the message;
    ///   afterwards every call returns an empty list.
    ///
    /// Only the first choice is inspected, and tool-call deltas are not
    /// translated.
    pub fn convert(&mut self, chunk: &StreamingResponse) -> Vec<String> {
        if self.finished {
            return Vec::new();
        }
        let mut events = Vec::new();
        if !self.started {
            self.start_message(&mut events, chunk);
        }

        let first_choice = chunk.choices.first();
        if let Some(delta) = first_choice.map(|choice| &choice.delta) {
            // Reasoning is handled before text so that a chunk carrying both
            // yields the thinking block first.
            if let Some(thinking) = delta.reasoning_any().filter(|text| !text.is_empty()) {
                if self.current_block_type.as_deref() != Some("thinking") {
                    self.open_block(
                        &mut events,
                        "thinking",
                        serde_json::json!({ "type": "thinking", "thinking": "" }),
                    );
                }
                events.push(self.block_delta(
                    serde_json::json!({ "type": "thinking_delta", "thinking": thinking }),
                ));
            }
            if let Some(text) = delta.content.as_deref().filter(|text| !text.is_empty()) {
                if self.current_block_type.as_deref() != Some("text") {
                    self.open_block(
                        &mut events,
                        "text",
                        serde_json::json!({ "type": "text", "text": "" }),
                    );
                }
                events.push(
                    self.block_delta(serde_json::json!({ "type": "text_delta", "text": text })),
                );
            }
        }

        // An empty `finish_reason` string is treated as "not finished":
        // otherwise the message would be closed on that chunk with
        // `stop_reason: ""`. `finish` still closes the message at stream end.
        let finish_reason = first_choice
            .and_then(|choice| choice.finish_reason.as_deref())
            .filter(|reason| !reason.is_empty());
        if let Some(stop_reason) = finish_reason {
            self.emit_finish_events(&mut events, stop_reason, chunk.usage.as_ref());
        }
        events
    }

    /// Closes the message with stop reason `end_turn` if it was started and
    /// not already closed; otherwise returns an empty list.
    ///
    /// `usage`, when given, supplies the reported `output_tokens`.
    pub fn finish(&mut self, usage: Option<&Usage>) -> Vec<String> {
        if self.finished || !self.started {
            return Vec::new();
        }
        let mut events = Vec::new();
        self.emit_finish_events(&mut events, "end_turn", usage);
        events
    }

    /// Records message identity from the first chunk and emits `message_start`.
    fn start_message(&mut self, events: &mut Vec<String>, chunk: &StreamingResponse) {
        self.started = true;
        self.message_id = if chunk.id.is_empty() {
            // No upstream id: synthesize one from the clock plus a random suffix.
            format!(
                "msg_{}{}",
                std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis(),
                rand::random::<u16>()
            )
        } else {
            chunk.id.clone()
        };
        self.model = chunk.model.clone();
        if let Some(usage) = &chunk.usage {
            self.input_tokens = usage.prompt_tokens;
        }
        events.push(format_anthropic_event(
            "message_start",
            &serde_json::json!({
                "type": "message_start",
                "message": {
                    "id": self.message_id,
                    "type": "message",
                    "role": "assistant",
                    "content": [],
                    "model": self.model,
                    "stop_reason": null,
                    "stop_sequence": null,
                    "usage": {
                        "input_tokens": self.input_tokens,
                        "output_tokens": 0
                    }
                }
            }),
        ));
    }

    /// Closes any open block, then opens a new block of `block_type` at the
    /// next index and emits its `content_block_start` event.
    fn open_block(
        &mut self,
        events: &mut Vec<String>,
        block_type: &str,
        content_block: serde_json::Value,
    ) {
        self.close_current_block(events);
        self.current_block_index += 1;
        self.current_block_type = Some(block_type.to_string());
        events.push(format_anthropic_event(
            "content_block_start",
            &serde_json::json!({
                "type": "content_block_start",
                "index": self.current_block_index,
                "content_block": content_block
            }),
        ));
    }

    /// Formats a `content_block_delta` event for the currently open block.
    fn block_delta(&self, delta: serde_json::Value) -> String {
        format_anthropic_event(
            "content_block_delta",
            &serde_json::json!({
                "type": "content_block_delta",
                "index": self.current_block_index,
                "delta": delta
            }),
        )
    }

    /// Emits `content_block_stop` for the open block, if there is one.
    fn close_current_block(&mut self, events: &mut Vec<String>) {
        if self.current_block_type.is_some() {
            events.push(format_anthropic_event(
                "content_block_stop",
                &serde_json::json!({
                    "type": "content_block_stop",
                    "index": self.current_block_index
                }),
            ));
            self.current_block_type = None;
        }
    }

    /// Marks the adapter finished and emits the closing sequence: an optional
    /// `content_block_stop`, then `message_delta`, then `message_stop`.
    ///
    /// OpenAI finish reasons `stop`, `length` and `tool_calls` are mapped to
    /// `end_turn`, `max_tokens` and `tool_use`; any other value is passed
    /// through unchanged.
    fn emit_finish_events(
        &mut self,
        events: &mut Vec<String>,
        stop_reason: &str,
        usage: Option<&Usage>,
    ) {
        self.finished = true;
        self.close_current_block(events);
        let anthropic_stop_reason = match stop_reason {
            "stop" => "end_turn",
            "length" => "max_tokens",
            "tool_calls" => "tool_use",
            other => other,
        };
        let output_tokens = usage.map(|u| u.completion_tokens).unwrap_or(0);
        events.push(format_anthropic_event(
            "message_delta",
            &serde_json::json!({
                "type": "message_delta",
                "delta": {
                    "stop_reason": anthropic_stop_reason,
                    "stop_sequence": null
                },
                "usage": { "output_tokens": output_tokens }
            }),
        ));
        events.push(format_anthropic_event(
            "message_stop",
            &serde_json::json!({ "type": "message_stop" }),
        ));
    }
}
/// Serializes an OpenAI-style chunk as JSON text in the requested provider format.
///
/// * `OpenAI` — the chunk serialized as-is.
/// * `Ollama` — converted through `OllamaStreamChunk::from_openai_chunk`;
///   `is_final` becomes the chunk's `done` flag.
/// * `Anthropic` — a bare `content_block_delta` JSON object (no SSE framing,
///   block index fixed at 0), or an empty string when the chunk has no text.
///
/// For the Anthropic format the text is the top-level `content` when
/// non-empty, otherwise the first choice's delta content. This is the same
/// lookup the Ollama conversion uses, so a chunk whose text lives only in
/// the delta is not silently dropped.
///
/// # Errors
/// Returns the `serde_json` error if serialization fails.
pub fn convert_streaming_format(
    openai_chunk: &StreamingResponse,
    format: StreamingFormat,
    is_final: bool,
) -> Result<String, serde_json::Error> {
    match format {
        StreamingFormat::OpenAI => serde_json::to_string(openai_chunk),
        StreamingFormat::Ollama => {
            let ollama_chunk = OllamaStreamChunk::from_openai_chunk(openai_chunk, is_final);
            serde_json::to_string(&ollama_chunk)
        }
        StreamingFormat::Anthropic => {
            let content = openai_chunk
                .get_content()
                .or_else(|| {
                    openai_chunk
                        .choices
                        .first()
                        .and_then(|choice| choice.delta.content.as_deref())
                })
                .unwrap_or("");
            if content.is_empty() {
                return Ok(String::new());
            }
            let event = serde_json::json!({
                "type": "content_block_delta",
                "index": 0,
                "delta": { "type": "text_delta", "text": content }
            });
            serde_json::to_string(&event)
        }
    }
}
/// Builds the JSON text of the terminating Ollama chunk for `model`.
///
/// When `usage` is supplied its token counts (and the derived synthetic
/// durations) are included. If serialization of the chunk fails, a minimal
/// hand-built `done: true` object is returned instead, so this function
/// never fails.
pub fn create_final_ollama_chunk(model: &str, usage: Option<&Usage>) -> String {
    let final_chunk = OllamaStreamChunk::final_chunk(model.to_string(), usage);
    serde_json::to_string(&final_chunk).unwrap_or_else(|_| {
        // `Value`'s `Display` emits a quoted, escaped JSON string, so a model
        // name containing quotes or backslashes cannot break the fallback JSON.
        format!(
            r#"{{"model":{},"created_at":"{}","message":{{"role":"assistant","content":""}},"done":true}}"#,
            serde_json::Value::from(model),
            chrono::Utc::now().to_rfc3339()
        )
    })
}