use super::error::ProviderError;
use super::types::*;
#[derive(Debug, Deserialize)]
struct NonStreamResponse {
id: String,
#[serde(default)]
model: Option<String>,
choices: Vec<NonStreamChoice>,
#[serde(default)]
usage: Option<NonStreamUsage>,
}
#[derive(Debug, Deserialize)]
struct NonStreamChoice {
#[allow(dead_code)]
index: u32,
message: Option<NonStreamMessage>,
delta: Option<NonStreamMessage>,
finish_reason: Option<String>,
}
#[derive(Debug, Deserialize)]
struct NonStreamMessage {
#[allow(dead_code)]
role: Option<String>,
#[serde(default)]
content: Option<String>,
#[serde(default, alias = "reasoning")]
reasoning_content: Option<String>,
#[serde(default)]
tool_calls: Option<Vec<NonStreamToolCall>>,
}
#[derive(Debug, Deserialize)]
struct NonStreamToolCall {
#[serde(default)]
index: usize,
#[serde(default)]
id: Option<String>,
#[serde(default)]
function: Option<NonStreamFunction>,
}
#[derive(Debug, Deserialize)]
struct NonStreamFunction {
#[serde(default)]
name: Option<String>,
#[serde(default)]
arguments: Option<String>,
}
#[derive(Debug, Deserialize)]
struct NonStreamUsage {
#[serde(default)]
prompt_tokens: Option<u32>,
#[serde(default)]
completion_tokens: Option<u32>,
#[serde(default)]
cache_creation_input_tokens: Option<u32>,
#[serde(default, alias = "prompt_tokens_details")]
prompt_details: Option<NonStreamPromptDetails>,
}
#[derive(Debug, Deserialize)]
struct NonStreamPromptDetails {
#[serde(default)]
cached_tokens: Option<u32>,
}
use serde::Deserialize;
pub(crate) fn is_nonstream_response(buf: &str) -> bool {
let trimmed = buf.trim();
trimmed.starts_with('{') && trimmed.contains("\"chat.completion\"")
}
pub(crate) fn synthesize_stream_events(
buf: &str,
) -> Option<Vec<std::result::Result<StreamEvent, ProviderError>>> {
let resp: NonStreamResponse = serde_json::from_str(buf.trim()).ok()?;
let mut events = Vec::new();
tracing::info!(
"[OR_NONSTREAM] Synthesizing stream events from non-streaming response (id={})",
resp.id,
);
if !resp.id.is_empty() {
events.push(Ok(StreamEvent::MessageStart {
message: StreamMessage {
id: resp.id,
model: resp.model.unwrap_or_default(),
role: Role::Assistant,
usage: TokenUsage::default(),
},
}));
}
let choice = resp.choices.first()?;
let msg = choice.message.as_ref().or(choice.delta.as_ref())?;
if let Some(ref reasoning) = msg.reasoning_content
&& !reasoning.is_empty()
{
events.push(Ok(StreamEvent::ContentBlockDelta {
index: 0,
delta: ContentDelta::ReasoningDelta {
text: reasoning.clone(),
},
}));
}
let content = msg.content.as_deref().unwrap_or("");
let content = content.strip_prefix('\n').unwrap_or(content);
if !content.is_empty() {
events.push(Ok(StreamEvent::ContentBlockStart {
index: 0,
content_block: ContentBlock::Text {
text: content.to_string(),
},
}));
events.push(Ok(StreamEvent::ContentBlockStop { index: 0 }));
}
if let Some(ref tc_list) = msg.tool_calls {
for tc in tc_list {
let id = tc.id.clone().unwrap_or_default();
let name = tc
.function
.as_ref()
.and_then(|f| f.name.clone())
.unwrap_or_default();
let args = tc
.function
.as_ref()
.and_then(|f| f.arguments.clone())
.unwrap_or_default();
let input = serde_json::from_str(&args).unwrap_or_else(|_| serde_json::json!({}));
let tool_index = tc.index + 1;
tracing::info!(
"[OR_NONSTREAM] tool_call: id={}, name={}, args_len={}",
id,
name,
args.len(),
);
events.push(Ok(StreamEvent::ContentBlockStart {
index: tool_index,
content_block: ContentBlock::ToolUse { id, name, input },
}));
events.push(Ok(StreamEvent::ContentBlockStop { index: tool_index }));
}
}
let stop_reason = choice.finish_reason.as_deref().map(|fr| match fr {
"tool_calls" | "function_call" => StopReason::ToolUse,
"length" => StopReason::MaxTokens,
_ => StopReason::EndTurn,
});
let mut token_usage = TokenUsage::default();
if let Some(ref usage) = resp.usage {
token_usage.input_tokens = usage.prompt_tokens.unwrap_or(0);
token_usage.output_tokens = usage.completion_tokens.unwrap_or(0);
if let Some(cache_create) = usage.cache_creation_input_tokens {
token_usage.cache_creation_tokens = cache_create;
}
if let Some(ref details) = usage.prompt_details
&& let Some(cached) = details.cached_tokens
{
token_usage.cache_read_tokens = cached;
}
}
events.push(Ok(StreamEvent::MessageDelta {
delta: MessageDelta {
stop_reason,
stop_sequence: None,
},
usage: token_usage,
}));
events.push(Ok(StreamEvent::MessageStop));
Some(events)
}