use async_openai::types::chat::{
ChatChoiceStream, ChatCompletionStreamResponseDelta, CreateChatCompletionRequest,
CreateChatCompletionStreamResponse,
};
use axum::response::sse::{Event, Sse};
use futures_util::stream::{Stream, StreamExt};
use genai::Client;
use genai::chat::ChatStreamEvent;
use std::convert::Infallible;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::error;
use crate::gateway::adapter::adapt_openai_to_genai;
use crate::gateway::error::GatewayError;
use reflex::cache::BqSearchBackend;
pub async fn handle_streaming_request<B>(
client: Client,
model: &str,
request: CreateChatCompletionRequest,
_tenant_id_hash: u64,
_context_hash: u64,
_semantic_text: String,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>> + Send + 'static>, GatewayError>
where
B: BqSearchBackend + Clone + Send + Sync + 'static,
{
let genai_req = adapt_openai_to_genai(request.clone());
let model_owned = model.to_string();
let chat_stream_resp = client
.exec_chat_stream(&model_owned, genai_req, None)
.await
.map_err(|e| {
error!("Provider stream init error: {}", e);
GatewayError::ProviderError("Upstream service stream init failed".to_string())
})?;
let stream = chat_stream_resp.stream;
let _accumulated_content = Arc::new(Mutex::new(String::new()));
let event_stream = stream.map(move |result| match result {
Ok(ChatStreamEvent::Start) => Ok(Event::default().comment("start")),
Ok(ChatStreamEvent::Chunk(chunk)) => {
let text = chunk.content;
if !text.is_empty() {
let delta: ChatCompletionStreamResponseDelta = match serde_json::from_value(
serde_json::json!({ "role": "assistant", "content": text }),
) {
Ok(d) => d,
Err(e) => {
error!("Failed to construct delta: {}", e);
return Ok(Event::default().comment("delta-error"));
}
};
let response: CreateChatCompletionStreamResponse =
match serde_json::from_value(serde_json::json!({
"id": format!("chatcmpl-{}", uuid::Uuid::new_v4()),
"object": "chat.completion.chunk",
"created": chrono::Utc::now().timestamp() as u32,
"model": model_owned.clone(),
"choices": vec![ChatChoiceStream {
index: 0,
delta,
finish_reason: None,
logprobs: None,
}],
"usage": serde_json::Value::Null,
})) {
Ok(r) => r,
Err(e) => {
error!("Failed to construct streaming response: {}", e);
return Ok(Event::default().comment("delta-error"));
}
};
match serde_json::to_string(&response) {
Ok(json) => Ok(Event::default().data(json)),
Err(e) => {
error!("Failed to serialize response: {}", e);
Ok(Event::default().comment("serialization-error"))
}
}
} else {
Ok(Event::default().comment("keep-alive"))
}
}
Ok(ChatStreamEvent::End(_)) => Ok(Event::default().data("[DONE]")),
Ok(_) => Ok(Event::default().comment("ignored-event")),
Err(e) => {
error!("Stream error: {}", e);
Ok(Event::default()
.event("error")
.data("Stream interrupted by upstream error"))
}
});
Ok(Sse::new(event_stream))
}