use crate::api::openai::OpenAIClient;
use crate::api::openai::client::OPENAI_API_BASE;
use crate::api::openai::wire::{OpenAIResponse, build_response, build_responses_body};
use crate::api::types::*;
use crate::api::utils;
use crate::error::{Result, SofosError};
use futures::stream::{Stream, StreamExt};
use serde_json::json;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
impl OpenAIClient {
    /// Issues a streaming request against the `/responses` endpoint and
    /// returns the message assembled from the resulting event stream.
    ///
    /// The request body comes from `build_responses_body` with its `"stream"`
    /// field set to `true`. The HTTP call is dispatched through
    /// `utils::send_once`, and the response's byte stream is handed to
    /// [`parse_stream`], which drives the two callbacks.
    ///
    /// * `on_text_delta` / `on_thinking_delta` - forwarded unchanged to
    ///   [`parse_stream`].
    /// * `interrupt_flag` - forwarded unchanged to [`parse_stream`].
    ///
    /// # Errors
    ///
    /// Propagates any error from `utils::send_once` or [`parse_stream`]. A
    /// failure while reading a chunk of the body is reported as
    /// `SofosError::NetworkError`.
    pub async fn create_message_streaming<FText, FThink>(
        &self,
        request: CreateMessageRequest,
        on_text_delta: FText,
        on_thinking_delta: FThink,
        interrupt_flag: Arc<AtomicBool>,
    ) -> Result<CreateMessageResponse>
    where
        FText: Fn(&str) + Send + Sync,
        FThink: Fn(&str) + Send + Sync,
    {
        let endpoint = format!("{}/responses", OPENAI_API_BASE);

        // Same payload as a regular request, with streaming switched on.
        let mut payload = build_responses_body(&request);
        payload["stream"] = json!(true);

        let outgoing = self.client.post(&endpoint).json(&payload);
        let http_response = utils::send_once("OpenAI", outgoing).await?;

        // Translate transport-level read failures into the crate's error type
        // so the parser only ever sees `Result<_, SofosError>` items.
        let chunks = http_response.bytes_stream().map(|item| match item {
            Ok(bytes) => Ok(bytes),
            Err(e) => Err(SofosError::NetworkError(format!(
                "Stream read error: {}",
                e
            ))),
        });

        parse_stream(chunks, on_text_delta, on_thinking_delta, interrupt_flag).await
    }
}
pub(crate) async fn parse_stream<S, B, FText, FThink>(
byte_stream: S,
on_text_delta: FText,
on_thinking_delta: FThink,
interrupt_flag: Arc<AtomicBool>,
) -> Result<CreateMessageResponse>
where
S: Stream<Item = Result<B>> + Unpin,
B: AsRef<[u8]>,
FText: Fn(&str) + Send + Sync,
FThink: Fn(&str) + Send + Sync,
{
let mut byte_stream = byte_stream;
let mut buffer: Vec<u8> = Vec::new();
let mut final_response: Option<OpenAIResponse> = None;
while let Some(chunk_result) = byte_stream.next().await {
if interrupt_flag.load(Ordering::SeqCst) {
return Err(SofosError::Interrupted);
}
let chunk = chunk_result?;
buffer.extend_from_slice(chunk.as_ref());
while let Some(pos) = buffer.iter().position(|b| *b == b'\n') {
if interrupt_flag.load(Ordering::SeqCst) {
return Err(SofosError::Interrupted);
}
let line = String::from_utf8_lossy(&buffer[..pos]).into_owned();
buffer.drain(..=pos);
let line = line.trim_end();
let json_str = match line.strip_prefix("data: ") {
Some("[DONE]") => continue,
Some(s) => s,
None => continue,
};
let event: serde_json::Value = match serde_json::from_str(json_str) {
Ok(v) => v,
Err(e) => {
tracing::debug!(
error = %e,
preview = %json_str.chars().take(200).collect::<String>(),
"failed to parse OpenAI streaming event"
);
continue;
}
};
let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or("");
match event_type {
"response.output_text.delta" => {
if let Some(delta) = event.get("delta").and_then(|v| v.as_str()) {
on_text_delta(delta);
}
}
"response.reasoning_summary_text.delta" => {
if let Some(delta) = event.get("delta").and_then(|v| v.as_str()) {
on_thinking_delta(delta);
}
}
"response.refusal.delta" => {
if let Some(delta) = event.get("delta").and_then(|v| v.as_str()) {
on_text_delta(delta);
}
}
"response.completed" | "response.incomplete" => {
if let Some(resp) = event.get("response") {
match serde_json::from_value::<OpenAIResponse>(resp.clone()) {
Ok(parsed) => final_response = Some(parsed),
Err(e) => {
return Err(SofosError::Api(format!(
"Failed to parse OpenAI streaming final response: {}",
e
)));
}
}
}
}
"response.failed" => {
let error_msg = event
.get("response")
.and_then(|r| r.get("error"))
.and_then(|e| e.get("message"))
.and_then(|m| m.as_str())
.unwrap_or("Unknown streaming error");
return Err(SofosError::Api(format!("Streaming error: {}", error_msg)));
}
"error" => {
let error_msg = event
.get("error")
.and_then(|e| e.get("message"))
.and_then(|m| m.as_str())
.or_else(|| event.get("message").and_then(|m| m.as_str()))
.unwrap_or("Unknown streaming error");
return Err(SofosError::Api(format!("Streaming error: {}", error_msg)));
}
_ => {}
}
}
}
let parsed = final_response.ok_or_else(|| {
SofosError::Api(
"OpenAI stream ended without a response.completed/incomplete event".to_string(),
)
})?;
build_response(parsed)
}