Skip to main content

aiclient_api/util/
stream.rs

1use axum::response::sse::{Event, KeepAlive, Sse};
2use bytes::Bytes;
3use futures::Stream;
4use futures::StreamExt;
5use std::convert::Infallible;
6use std::pin::Pin;
7use tracing::error;
8
9use crate::convert::stream::{chunk_to_anthropic, chunk_to_openai};
10use crate::providers::OutputFormat;
11
12/// Convert a provider byte stream into an SSE response.
13/// Applies chunk conversion based on the target output format.
14pub fn into_sse_response(
15    stream: Pin<Box<dyn Stream<Item = anyhow::Result<Bytes>> + Send>>,
16    format: OutputFormat,
17    model: String,
18) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
19    let converted = stream.filter_map(move |result| {
20        let model = model.clone();
21        async move {
22            match result {
23                Ok(bytes) => {
24                    let converted_bytes = match format {
25                        OutputFormat::OpenAI => chunk_to_openai(&bytes, &model),
26                        OutputFormat::Anthropic => chunk_to_anthropic(&bytes, &model),
27                    };
28
29                    if converted_bytes.is_empty() {
30                        return None;
31                    }
32
33                    // Parse SSE lines and emit events
34                    let text = match std::str::from_utf8(&converted_bytes) {
35                        Ok(s) => s.to_string(),
36                        Err(_) => return None,
37                    };
38
39                    // Extract data from SSE formatted content
40                    let data_lines: Vec<&str> = text
41                        .lines()
42                        .filter_map(|line| line.strip_prefix("data: "))
43                        .collect();
44
45                    if data_lines.is_empty() {
46                        return None;
47                    }
48
49                    // For simplicity, emit the first data chunk
50                    // In production, this could be split into multiple events
51                    let data = data_lines.join("\n");
52                    Some(Ok::<Event, Infallible>(Event::default().data(data)))
53                }
54                Err(e) => {
55                    error!("Stream error: {}", e);
56                    None
57                }
58            }
59        }
60    });
61
62    Sse::new(converted).keep_alive(KeepAlive::default())
63}