aiclient_api/util/
stream.rs1use axum::response::sse::{Event, KeepAlive, Sse};
2use bytes::Bytes;
3use futures::Stream;
4use futures::StreamExt;
5use std::convert::Infallible;
6use std::pin::Pin;
7use tracing::error;
8
9use crate::convert::stream::{chunk_to_anthropic, chunk_to_openai};
10use crate::providers::OutputFormat;
11
12pub fn into_sse_response(
15 stream: Pin<Box<dyn Stream<Item = anyhow::Result<Bytes>> + Send>>,
16 format: OutputFormat,
17 model: String,
18) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
19 let converted = stream.filter_map(move |result| {
20 let model = model.clone();
21 async move {
22 match result {
23 Ok(bytes) => {
24 let converted_bytes = match format {
25 OutputFormat::OpenAI => chunk_to_openai(&bytes, &model),
26 OutputFormat::Anthropic => chunk_to_anthropic(&bytes, &model),
27 };
28
29 if converted_bytes.is_empty() {
30 return None;
31 }
32
33 let text = match std::str::from_utf8(&converted_bytes) {
35 Ok(s) => s.to_string(),
36 Err(_) => return None,
37 };
38
39 let data_lines: Vec<&str> = text
41 .lines()
42 .filter_map(|line| line.strip_prefix("data: "))
43 .collect();
44
45 if data_lines.is_empty() {
46 return None;
47 }
48
49 let data = data_lines.join("\n");
52 Some(Ok::<Event, Infallible>(Event::default().data(data)))
53 }
54 Err(e) => {
55 error!("Stream error: {}", e);
56 None
57 }
58 }
59 }
60 });
61
62 Sse::new(converted).keep_alive(KeepAlive::default())
63}