use super::request::{Role, ToolCall};
use super::response::{Usage, ChatResponse};
use serde::{Deserialize, Serialize};
#[cfg(feature = "streaming")]
use futures_util::Stream;
#[cfg(feature = "streaming")]
use std::pin::Pin;
/// Chunk dialect a stream is emitted in.
///
/// Each variant is (de)serialized as its lowercase name: `"openai"` or `"ollama"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum StreamingFormat {
    /// OpenAI-style chunks.
    OpenAI,
    /// Ollama-style chunks.
    Ollama,
}
/// Textual framing applied to a chunk's JSON payload when it is rendered.
///
/// Each variant is (de)serialized as its lowercase name: `"json"`, `"sse"` or `"ndjson"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum StreamFormat {
    /// Bare JSON text with no framing.
    Json,
    /// Server-sent-event framing (`data: …` followed by a blank line).
    SSE,
    /// Newline-delimited JSON (payload followed by a single `\n`).
    NDJSON,
}
impl Default for StreamingFormat {
fn default() -> Self {
Self::OpenAI
}
}
impl Default for StreamFormat {
fn default() -> Self {
Self::Json
}
}
/// A single streamed chunk: a JSON payload together with the framing used to render it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamChunk {
/// JSON payload of the chunk; this is the value the `to_*` renderers write out.
pub data: serde_json::Value,
/// Framing that `to_format` applies when rendering `data`.
pub format: StreamFormat,
/// Optional extra JSON attached to the chunk; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
}
impl StreamChunk {
    /// Builds a chunk around `data` with the given framing and no metadata.
    pub fn new(data: serde_json::Value, format: StreamFormat) -> Self {
        Self { data, format, metadata: None }
    }

    /// Builds a chunk around `data` with the given framing and attaches `metadata`.
    pub fn with_metadata(data: serde_json::Value, format: StreamFormat, metadata: serde_json::Value) -> Self {
        let mut chunk = Self::new(data, format);
        chunk.metadata = Some(metadata);
        chunk
    }

    /// Renders the payload as a server-sent event: `data: <json>` followed by a blank line.
    pub fn to_sse(&self) -> String {
        format!("data: {}\n\n", self.to_json())
    }

    /// Renders the payload as one newline-terminated JSON line.
    pub fn to_ndjson(&self) -> String {
        format!("{}\n", self.to_json())
    }

    /// Renders the payload as compact JSON text without any framing.
    pub fn to_json(&self) -> String {
        self.data.to_string()
    }

    /// Renders the payload using the framing stored in `self.format`.
    pub fn to_format(&self) -> String {
        match self.format {
            StreamFormat::SSE => self.to_sse(),
            StreamFormat::NDJSON => self.to_ndjson(),
            StreamFormat::Json => self.to_json(),
        }
    }

    /// Wraps an OpenAI-style response by converting it to a JSON value.
    ///
    /// # Errors
    /// Returns the `serde_json` error if `response` cannot be converted to a JSON value.
    pub fn from_openai(response: &StreamingResponse, format: StreamFormat) -> Result<Self, serde_json::Error> {
        serde_json::to_value(response).map(|data| Self::new(data, format))
    }

    /// Wraps an Ollama-style chunk by converting it to a JSON value.
    ///
    /// # Errors
    /// Returns the `serde_json` error if `chunk` cannot be converted to a JSON value.
    pub fn from_ollama(chunk: &OllamaStreamChunk, format: StreamFormat) -> Result<Self, serde_json::Error> {
        serde_json::to_value(chunk).map(|data| Self::new(data, format))
    }

    /// Reports whether the payload has a top-level `done` field equal to the boolean `true`.
    ///
    /// Only that flag is inspected; a payload without it (for example one built by
    /// [`StreamChunk::from_openai`], whose source struct has no `done` field) is never final.
    pub fn is_final(&self) -> bool {
        matches!(self.data.get("done"), Some(serde_json::Value::Bool(true)))
    }

    /// Extracts the text carried by the payload, if any.
    ///
    /// `message.content` is tried first; when it is absent or not a string, the first
    /// entry of `choices` is consulted for `delta.content`.
    pub fn extract_content(&self) -> Option<String> {
        let message_text = self
            .data
            .get("message")
            .and_then(|message| message.get("content"))
            .and_then(serde_json::Value::as_str);

        // Evaluated lazily so the `choices` lookup only runs when `message.content` is missing.
        let delta_text = || {
            self.data
                .get("choices")
                .and_then(serde_json::Value::as_array)
                .and_then(|choices| choices.first())
                .and_then(|choice| choice.get("delta"))
                .and_then(|delta| delta.get("content"))
                .and_then(serde_json::Value::as_str)
        };

        message_text.or_else(delta_text).map(str::to_owned)
    }
}
/// Options describing how a chat stream should be emitted.
///
/// NOTE(review): nothing in this file reads `include_usage` or `include_reasoning`;
/// their exact effect is decided by the code that consumes this config — confirm there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingConfig {
/// Chunk dialect (OpenAI or Ollama).
pub format: StreamingFormat,
/// Textual framing (JSON, SSE or NDJSON).
pub stream_format: StreamFormat,
/// Presumably toggles whether usage statistics are included in the stream — verify against callers.
pub include_usage: bool,
/// Presumably toggles whether reasoning text is included in the stream — verify against callers.
pub include_reasoning: bool,
}
// Default configuration: the default dialect and framing of the two format enums
// (OpenAI chunks, plain JSON), with both usage and reasoning enabled.
impl Default for StreamingConfig {
    fn default() -> Self {
        let format = StreamingFormat::default();
        let stream_format = StreamFormat::default();
        Self {
            format,
            stream_format,
            include_usage: true,
            include_reasoning: true,
        }
    }
}
/// Pinned, boxed, `Send` stream whose items are `StreamingResponse` values or `LlmConnectorError`s.
/// Only compiled when the `streaming` feature is enabled.
#[cfg(feature = "streaming")]
pub type ChatStream =
Pin<Box<dyn Stream<Item = Result<StreamingResponse, crate::error::LlmConnectorError>> + Send>>;
/// Pinned, boxed, `Send` stream whose items are `OllamaStreamChunk` values or `LlmConnectorError`s.
/// Only compiled when the `streaming` feature is enabled.
#[cfg(feature = "streaming")]
pub type OllamaChatStream =
Pin<Box<dyn Stream<Item = Result<OllamaStreamChunk, crate::error::LlmConnectorError>> + Send>>;
/// Pinned, boxed, `Send` stream whose items are format-agnostic `StreamChunk` values or `LlmConnectorError`s.
/// Only compiled when the `streaming` feature is enabled.
#[cfg(feature = "streaming")]
pub type UniversalChatStream =
Pin<Box<dyn Stream<Item = Result<StreamChunk, crate::error::LlmConnectorError>> + Send>>;
/// One OpenAI-style streaming chunk.
///
/// Optional fields are left out of the serialized form when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingResponse {
/// Identifier of the response this chunk belongs to.
pub id: String,
/// Object type tag. When the field is missing during deserialization it becomes an
/// empty string (not the `"chat.completion.chunk"` value used by this type's `Default`).
#[serde(default)]
pub object: String,
/// Creation time as reported by the provider — presumably a Unix timestamp in seconds; confirm.
pub created: u64,
/// Name of the model that produced the chunk.
pub model: String,
/// Per-choice deltas carried by this chunk.
pub choices: Vec<StreamingChoice>,
/// Top-level text for the chunk; empty when missing during deserialization.
/// `get_content` treats an empty string as "no content".
#[serde(default)]
pub content: String,
/// Top-level reasoning text; `populate_reasoning_synonyms` fills it from the choices when unset.
#[serde(skip_serializing_if = "Option::is_none")]
pub reasoning_content: Option<String>,
/// Token usage statistics, when present.
#[serde(skip_serializing_if = "Option::is_none")]
pub usage: Option<Usage>,
/// Provider-supplied fingerprint string, when present.
#[serde(skip_serializing_if = "Option::is_none")]
pub system_fingerprint: Option<String>,
}
/// One entry of `StreamingResponse::choices`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingChoice {
/// Position of this choice as reported by the provider.
pub index: u32,
/// Incremental message data carried by this chunk for the choice.
pub delta: Delta,
/// Reason generation stopped, when reported; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub finish_reason: Option<String>,
/// Log-probability data kept as raw JSON; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub logprobs: Option<serde_json::Value>,
}
/// Incremental message data for one choice.
///
/// Every field is optional and is omitted from the serialized form when `None`.
/// The four reasoning fields are alternative keys for the same kind of text;
/// `reasoning_any` returns the first one that is set.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct Delta {
/// Role of the message author, when present in this chunk.
#[serde(skip_serializing_if = "Option::is_none")]
pub role: Option<Role>,
/// Text fragment carried by this chunk.
#[serde(skip_serializing_if = "Option::is_none")]
pub content: Option<String>,
/// Tool calls carried by this chunk.
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_calls: Option<Vec<ToolCall>>,
/// Reasoning text under the `reasoning_content` key (checked first by `reasoning_any`).
#[serde(skip_serializing_if = "Option::is_none")]
pub reasoning_content: Option<String>,
/// Reasoning text under the `reasoning` key (checked second).
#[serde(skip_serializing_if = "Option::is_none")]
pub reasoning: Option<String>,
/// Reasoning text under the `thought` key (checked third).
#[serde(skip_serializing_if = "Option::is_none")]
pub thought: Option<String>,
/// Reasoning text under the `thinking` key (checked last).
#[serde(skip_serializing_if = "Option::is_none")]
pub thinking: Option<String>,
}
impl Delta {
    /// Returns the first reasoning text that is set, checking `reasoning_content`,
    /// `reasoning`, `thought` and `thinking` in that order.
    pub fn reasoning_any(&self) -> Option<&str> {
        [
            &self.reasoning_content,
            &self.reasoning,
            &self.thought,
            &self.thinking,
        ]
        .iter()
        .find_map(|field| field.as_deref())
    }

    /// Fills any reasoning field that is still `None` from string values found in `raw`.
    ///
    /// The whole JSON tree is walked depth-first, descending through arrays and objects.
    /// Keys are compared ASCII-case-insensitively against `reasoning_content`,
    /// `reasoning`, `thought` and `thinking`; only string values count. For each field
    /// the first match in traversal order wins, and fields that already hold a value
    /// are never overwritten.
    pub fn populate_reasoning_from_json(&mut self, raw: &serde_json::Value) {
        fn fill_missing(target: &mut Delta, node: &serde_json::Value) {
            match node {
                serde_json::Value::Object(fields) => {
                    for (name, value) in fields {
                        if let serde_json::Value::String(text) = value {
                            let slot = if name.eq_ignore_ascii_case("reasoning_content") {
                                Some(&mut target.reasoning_content)
                            } else if name.eq_ignore_ascii_case("reasoning") {
                                Some(&mut target.reasoning)
                            } else if name.eq_ignore_ascii_case("thought") {
                                Some(&mut target.thought)
                            } else if name.eq_ignore_ascii_case("thinking") {
                                Some(&mut target.thinking)
                            } else {
                                None
                            };
                            // Writing only into empty slots keeps both pre-existing values
                            // and earlier matches from the same traversal.
                            if let Some(slot) = slot {
                                if slot.is_none() {
                                    *slot = Some(text.clone());
                                }
                            }
                        }
                        fill_missing(target, value);
                    }
                }
                serde_json::Value::Array(items) => {
                    for item in items {
                        fill_missing(target, item);
                    }
                }
                _ => {}
            }
        }

        fill_missing(self, raw);
    }
}
impl StreamingResponse {
    /// Back-fills reasoning text from the raw provider JSON.
    ///
    /// Every choice's delta is populated from the *whole* of `raw` (not from the
    /// matching entry of `raw`'s `choices`), so all choices receive the same first
    /// match per key. Afterwards, if the top-level `reasoning_content` is still
    /// unset, it is copied from the first choice that has any reasoning text.
    pub fn populate_reasoning_synonyms(&mut self, raw: &serde_json::Value) {
        self.choices
            .iter_mut()
            .for_each(|choice| choice.delta.populate_reasoning_from_json(raw));

        if self.reasoning_content.is_some() {
            return;
        }
        self.reasoning_content = self
            .choices
            .iter()
            .find_map(|choice| choice.delta.reasoning_any())
            .map(str::to_owned);
    }

    /// Returns the top-level `content`, or `None` when it is empty.
    pub fn get_content(&self) -> Option<&str> {
        match self.content.as_str() {
            "" => None,
            text => Some(text),
        }
    }
}
impl Default for StreamingResponse {
fn default() -> Self {
Self {
id: String::new(),
object: "chat.completion.chunk".to_string(),
created: 0,
model: String::new(),
choices: Vec::new(),
content: String::new(),
reasoning_content: None,
usage: None,
system_fingerprint: None,
}
}
}
/// Converts a complete chat response into a single streaming chunk.
///
/// Only the first choice of the response (if any) is carried over; it becomes the
/// sole entry of `choices`. Its message text is placed in the delta's `content`
/// unless the message content is empty, and all reasoning fields start out as `None`.
impl From<ChatResponse> for StreamingResponse {
    fn from(response: ChatResponse) -> Self {
        // `response` is owned, so the first choice is consumed rather than borrowed:
        // tool calls, finish reason and logprobs are moved instead of cloned.
        let choices = response
            .choices
            .into_iter()
            .take(1)
            .map(|choice| {
                let message = choice.message;
                // Compute the text while `message` is still intact, before
                // `tool_calls` is moved out of it below.
                let content = if message.content.is_empty() {
                    None
                } else {
                    Some(message.content_as_text())
                };
                StreamingChoice {
                    index: choice.index,
                    delta: Delta {
                        role: Some(message.role),
                        content,
                        tool_calls: message.tool_calls,
                        reasoning_content: None,
                        reasoning: None,
                        thought: None,
                        thinking: None,
                    },
                    finish_reason: choice.finish_reason,
                    logprobs: choice.logprobs,
                }
            })
            .collect();

        Self {
            id: response.id,
            object: "chat.completion.chunk".to_string(),
            created: response.created,
            model: response.model,
            choices,
            content: response.content,
            reasoning_content: None,
            usage: response.usage,
            system_fingerprint: response.system_fingerprint,
        }
    }
}
/// One Ollama-style streaming chunk.
///
/// The duration and count fields are optional and omitted from the serialized form
/// when `None`. NOTE(review): this file does not state the unit of the `*_duration`
/// fields; Ollama's API documents them in nanoseconds — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OllamaStreamChunk {
/// Name of the model that produced the chunk.
pub model: String,
/// Creation timestamp as a string; `OllamaStreamChunk::new` fills it with the current UTC time in RFC 3339 form.
pub created_at: String,
/// Message fragment carried by this chunk.
pub message: OllamaMessage,
/// Whether this chunk is marked as the last one of the stream.
pub done: bool,
/// Total duration; in this file it is set to the sum of the prompt and eval durations.
#[serde(skip_serializing_if = "Option::is_none")]
pub total_duration: Option<u64>,
/// Model load duration; never set by the constructors in this file.
#[serde(skip_serializing_if = "Option::is_none")]
pub load_duration: Option<u64>,
/// Number of prompt tokens.
#[serde(skip_serializing_if = "Option::is_none")]
pub prompt_eval_count: Option<u32>,
/// Prompt evaluation duration; in this file it is synthesized from the prompt token count.
#[serde(skip_serializing_if = "Option::is_none")]
pub prompt_eval_duration: Option<u64>,
/// Number of completion tokens.
#[serde(skip_serializing_if = "Option::is_none")]
pub eval_count: Option<u32>,
/// Generation duration; in this file it is synthesized from the completion token count.
#[serde(skip_serializing_if = "Option::is_none")]
pub eval_duration: Option<u64>,
}
/// Message payload of an `OllamaStreamChunk`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OllamaMessage {
/// Author role as a plain string; `OllamaStreamChunk::new` always uses `"assistant"`.
pub role: String,
/// Text fragment carried by the chunk (may be empty).
pub content: String,
}
impl OllamaStreamChunk {
    /// Synthetic duration charged per prompt token when fabricating timing fields.
    /// NOTE(review): if the durations are nanoseconds (Ollama's documented unit),
    /// this is 1 ms per token — confirm.
    const ESTIMATED_DURATION_PER_PROMPT_TOKEN: u64 = 1_000_000;
    /// Synthetic duration charged per completion token when fabricating timing fields.
    /// NOTE(review): 10 ms per token if the unit is nanoseconds — confirm.
    const ESTIMATED_DURATION_PER_COMPLETION_TOKEN: u64 = 10_000_000;

    /// Builds an assistant chunk for `model` carrying `content`.
    ///
    /// `created_at` is set to the current UTC time in RFC 3339 form and every
    /// count/duration field starts out as `None`.
    pub fn new(model: String, content: String, done: bool) -> Self {
        Self {
            model,
            created_at: chrono::Utc::now().to_rfc3339(),
            message: OllamaMessage {
                role: "assistant".to_string(),
                content,
            },
            done,
            total_duration: None,
            load_duration: None,
            prompt_eval_count: None,
            prompt_eval_duration: None,
            eval_count: None,
            eval_duration: None,
        }
    }

    /// Builds the terminating chunk (`done = true`, empty content) for `model`.
    ///
    /// When `usage` is provided, the token counts are copied over and the duration
    /// fields are filled with estimates derived from those counts.
    pub fn final_chunk(model: String, usage: Option<&Usage>) -> Self {
        let mut chunk = Self::new(model, String::new(), true);
        if let Some(usage) = usage {
            chunk.apply_usage_estimates(usage);
        }
        chunk
    }

    /// Converts an OpenAI-style chunk into an Ollama-style one.
    ///
    /// The text is taken from the chunk's top-level `content` when that is non-empty,
    /// otherwise from the first choice's `delta.content` (empty string if neither
    /// exists). Usage-derived counts and durations are attached only when `done` is
    /// `true` and the source chunk carries usage data.
    pub fn from_openai_chunk(openai_chunk: &StreamingResponse, done: bool) -> Self {
        let content = if openai_chunk.content.is_empty() {
            openai_chunk
                .choices
                .first()
                .and_then(|choice| choice.delta.content.clone())
                .unwrap_or_default()
        } else {
            openai_chunk.content.clone()
        };

        let mut chunk = Self::new(openai_chunk.model.clone(), content, done);
        if done {
            if let Some(usage) = &openai_chunk.usage {
                chunk.apply_usage_estimates(usage);
            }
        }
        chunk
    }

    /// Copies token counts from `usage` and fills the duration fields with estimates
    /// (token count times a fixed per-token duration); `load_duration` is left untouched.
    fn apply_usage_estimates(&mut self, usage: &Usage) {
        // Widening u32 -> u64 before multiplying; the products stay far below u64::MAX.
        let prompt_duration =
            u64::from(usage.prompt_tokens) * Self::ESTIMATED_DURATION_PER_PROMPT_TOKEN;
        let eval_duration =
            u64::from(usage.completion_tokens) * Self::ESTIMATED_DURATION_PER_COMPLETION_TOKEN;

        self.prompt_eval_count = Some(usage.prompt_tokens);
        self.eval_count = Some(usage.completion_tokens);
        self.prompt_eval_duration = Some(prompt_duration);
        self.eval_duration = Some(eval_duration);
        self.total_duration = Some(prompt_duration + eval_duration);
    }
}
/// Serializes `openai_chunk` to a JSON string in the requested dialect.
///
/// For [`StreamingFormat::OpenAI`] the chunk is serialized as-is. For
/// [`StreamingFormat::Ollama`] it is first converted with
/// `OllamaStreamChunk::from_openai_chunk`, passing `is_final` as the `done` flag.
///
/// # Errors
/// Returns the `serde_json` error if serialization fails.
pub fn convert_streaming_format(
    openai_chunk: &StreamingResponse,
    format: StreamingFormat,
    is_final: bool,
) -> Result<String, serde_json::Error> {
    match format {
        StreamingFormat::Ollama => {
            let converted = OllamaStreamChunk::from_openai_chunk(openai_chunk, is_final);
            serde_json::to_string(&converted)
        }
        StreamingFormat::OpenAI => serde_json::to_string(openai_chunk),
    }
}
/// Produces the JSON text of the terminating Ollama chunk for `model`.
///
/// The chunk is built with `OllamaStreamChunk::final_chunk` (so `usage`, when given,
/// contributes token counts and estimated durations) and serialized. If serialization
/// fails, a minimal hand-assembled chunk without usage data is returned instead, so
/// the function never fails.
pub fn create_final_ollama_chunk(model: &str, usage: Option<&Usage>) -> String {
    let final_chunk = OllamaStreamChunk::final_chunk(model.to_string(), usage);
    serde_json::to_string(&final_chunk).unwrap_or_else(|_| {
        // Render the model name as a JSON string literal (quotes included) so that
        // quotes, backslashes or control characters in it cannot break the output.
        let model_json = serde_json::Value::from(model);
        format!(
            r#"{{"model":{},"created_at":"{}","message":{{"role":"assistant","content":""}},"done":true}}"#,
            model_json,
            chrono::Utc::now().to_rfc3339()
        )
    })
}