use crate::error::LlmError;
use crate::stream::{ChatStream, ChatStreamEvent};
use crate::utils::sse_stream::SseStreamExt;
use eventsource_stream::Event;
use futures_util::StreamExt;
use std::future::Future;
use std::pin::Pin;
/// Boxed, `Send + Sync` future returned by [`SseEventConverter::convert_event`].
///
/// It resolves to the list of results produced for a single SSE event; the
/// list may be empty, and each entry is either a chat stream event or an error.
type SseEventFuture<'a> =
Pin<Box<dyn Future<Output = Vec<Result<ChatStreamEvent, LlmError>>> + Send + Sync + 'a>>;
/// Boxed, `Send + Sync` future returned by [`JsonEventConverter::convert_json`].
///
/// It resolves to the list of results produced for a single JSON payload; the
/// list may be empty, and each entry is either a chat stream event or an error.
type JsonEventFuture<'a> =
Pin<Box<dyn Future<Output = Vec<Result<ChatStreamEvent, LlmError>>> + Send + Sync + 'a>>;
/// Converts parsed SSE events into chat stream events.
///
/// Implementors must be `Send + Sync`. `StreamFactory::create_eventsource_stream`
/// clones the converter for every incoming event, so it additionally requires
/// `Clone` there.
pub trait SseEventConverter: Send + Sync {
/// Converts one SSE `event` into zero or more chat stream events or errors.
///
/// The returned future may borrow from `self`.
fn convert_event(&self, event: Event) -> SseEventFuture<'_>;
/// Hook invoked by `StreamFactory::create_eventsource_stream` when an event
/// whose trimmed data equals `[DONE]` is received.
///
/// Returning `Some(item)` emits `item` on the stream in place of that event;
/// returning `None` emits nothing. The default implementation returns `None`.
fn handle_stream_end(&self) -> Option<Result<ChatStreamEvent, LlmError>> {
None
}
}
/// Converts raw JSON payloads into chat stream events.
///
/// Implementors must be `Send + Sync`. `StreamFactory::create_json_stream`
/// clones the converter for every incoming payload, so it additionally requires
/// `Clone` there.
pub trait JsonEventConverter: Send + Sync {
/// Converts one JSON payload (`json_data`) into zero or more chat stream
/// events or errors.
///
/// The returned future may borrow from both `self` and `json_data`.
fn convert_json<'a>(&'a self, json_data: &'a str) -> JsonEventFuture<'a>;
}
/// Stateless namespace for the associated functions that build a [`ChatStream`]
/// from an HTTP response or request.
pub struct StreamFactory;
impl StreamFactory {
pub async fn create_json_stream<C>(
response: reqwest::Response,
converter: C,
) -> Result<ChatStream, LlmError>
where
C: JsonEventConverter + Clone + 'static,
{
let byte_stream = response
.bytes_stream()
.map(|chunk| chunk.map_err(|e| LlmError::HttpError(format!("Stream error: {e}"))));
let sse_stream = byte_stream.into_sse_stream();
let chat_stream = sse_stream
.then(move |event_result| {
let converter = converter.clone();
async move {
match event_result {
Ok(event) => {
if event.data.trim().is_empty() {
return vec![];
}
converter.convert_json(&event.data).await
}
Err(e) => {
let error =
Err(LlmError::ParseError(format!("JSON parsing error: {e}")));
vec![error]
}
}
}
})
.flat_map(futures::stream::iter);
let boxed_stream: ChatStream = Box::pin(chat_stream);
Ok(boxed_stream)
}
pub async fn create_eventsource_stream<C>(
request_builder: reqwest::RequestBuilder,
converter: C,
) -> Result<ChatStream, LlmError>
where
C: SseEventConverter + Clone + Send + 'static,
{
use crate::utils::sse_stream::SseStreamExt;
let response = request_builder
.send()
.await
.map_err(|e| LlmError::HttpError(format!("Failed to send request: {e}")))?;
if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
return Err(LlmError::HttpError(format!(
"HTTP error {}: {}",
status.as_u16(),
error_text
)));
}
let byte_stream = response
.bytes_stream()
.map(|chunk| chunk.map_err(|e| LlmError::HttpError(format!("Stream error: {e}"))));
let sse_stream = byte_stream.into_sse_stream();
let chat_stream = sse_stream
.then(move |event_result| {
let converter = converter.clone();
async move {
match event_result {
Ok(event) => {
if event.data.trim() == "[DONE]" {
if let Some(end_event) = converter.handle_stream_end() {
return vec![end_event];
} else {
return vec![];
}
}
if event.data.trim().is_empty() {
return vec![];
}
converter.convert_event(event).await
}
Err(e) => {
let error =
Err(LlmError::StreamError(format!("SSE parsing error: {e}")));
vec![error]
}
}
}
})
.flat_map(futures::stream::iter);
Ok(Box::pin(chat_stream))
}
}
/// Accumulates [`ChatStreamEvent`]s through chained, by-value `add_*` calls.
pub struct EventBuilder {
// Events in the order they were added.
events: Vec<ChatStreamEvent>,
}
impl EventBuilder {
    /// Creates an empty builder with space for two events reserved up front.
    pub fn new() -> Self {
        Self::with_capacity(2)
    }

    /// Creates an empty builder with space for `capacity` events reserved.
    pub fn with_capacity(capacity: usize) -> Self {
        let events = Vec::with_capacity(capacity);
        Self { events }
    }

    /// Appends a `StreamStart` event carrying `metadata`.
    pub fn add_stream_start(self, metadata: crate::types::ResponseMetadata) -> Self {
        self.push_event(ChatStreamEvent::StreamStart { metadata })
    }

    /// Appends a `ContentDelta` event; an empty `delta` is ignored.
    pub fn add_content_delta(self, delta: String, index: Option<usize>) -> Self {
        if delta.is_empty() {
            return self;
        }
        self.push_event(ChatStreamEvent::ContentDelta { delta, index })
    }

    /// Appends a `ToolCallDelta` event built from the given fields.
    ///
    /// Unlike the text deltas, this is always recorded, whatever the values.
    pub fn add_tool_call_delta(
        self,
        id: String,
        function_name: Option<String>,
        arguments_delta: Option<String>,
        index: Option<usize>,
    ) -> Self {
        let event = ChatStreamEvent::ToolCallDelta {
            id,
            function_name,
            arguments_delta,
            index,
        };
        self.push_event(event)
    }

    /// Appends a `ThinkingDelta` event; an empty `delta` is ignored.
    pub fn add_thinking_delta(self, delta: String) -> Self {
        if delta.is_empty() {
            return self;
        }
        self.push_event(ChatStreamEvent::ThinkingDelta { delta })
    }

    /// Appends a `UsageUpdate` event carrying `usage`.
    pub fn add_usage_update(self, usage: crate::types::Usage) -> Self {
        self.push_event(ChatStreamEvent::UsageUpdate { usage })
    }

    /// Appends a `StreamEnd` event carrying the final `response`.
    pub fn add_stream_end(self, response: crate::types::ChatResponse) -> Self {
        self.push_event(ChatStreamEvent::StreamEnd { response })
    }

    /// Consumes the builder and returns the events in insertion order.
    pub fn build(self) -> Vec<ChatStreamEvent> {
        let Self { events } = self;
        events
    }

    /// Consumes the builder and returns every event wrapped in `Ok`, in
    /// insertion order.
    pub fn build_results(self) -> Vec<Result<ChatStreamEvent, LlmError>> {
        self.build().into_iter().map(Ok).collect()
    }

    /// Records `event` and hands the builder back for further chaining.
    fn push_event(mut self, event: ChatStreamEvent) -> Self {
        self.events.push(event);
        self
    }
}
impl Default for EventBuilder {
fn default() -> Self {
Self::new()
}
}
/// Implements `SseEventConverter` for a converter type by deserializing each
/// SSE event's data and delegating to a method on the converter.
///
/// # Arguments
///
/// * `$converter_type` - the type that receives the trait implementation.
/// * `$event_type` - the type the event data is deserialized into with
///   `serde_json::from_str`.
/// * `$convert_fn` - name of a method on `$converter_type` that takes the
///   parsed `$event_type` and returns a `ChatStreamEvent`.
///
/// The generated `convert_event` resolves to a single-element `Vec`: the
/// converted event on success, or `LlmError::ParseError` when the data does not
/// deserialize. The output is a `Vec` (not an `Option`) so that the generated
/// signature matches the one declared by the `SseEventConverter` trait.
#[macro_export]
macro_rules! impl_sse_converter {
    ($converter_type:ty, $event_type:ty, $convert_fn:ident) => {
        impl $crate::utils::streaming::SseEventConverter for $converter_type {
            fn convert_event(
                &self,
                event: eventsource_stream::Event,
            ) -> std::pin::Pin<
                Box<
                    dyn std::future::Future<
                            Output = Vec<
                                Result<$crate::stream::ChatStreamEvent, $crate::error::LlmError>,
                            >,
                        > + Send
                        + Sync
                        + '_,
                >,
            > {
                Box::pin(async move {
                    match serde_json::from_str::<$event_type>(&event.data) {
                        Ok(parsed_event) => vec![Ok(self.$convert_fn(parsed_event))],
                        Err(e) => vec![Err($crate::error::LlmError::ParseError(format!(
                            "Failed to parse event: {e}"
                        )))],
                    }
                })
            }
        }
    };
}
/// Implements `JsonEventConverter` for a converter type by deserializing each
/// JSON payload and delegating to a method on the converter.
///
/// # Arguments
///
/// * `$converter_type` - the type that receives the trait implementation.
/// * `$event_type` - the type the payload is deserialized into with
///   `serde_json::from_str`.
/// * `$convert_fn` - name of a method on `$converter_type` that takes the
///   parsed `$event_type` and returns a `ChatStreamEvent`.
///
/// The generated `convert_json` resolves to a single-element `Vec`: the
/// converted event on success, or `LlmError::ParseError` when the payload does
/// not deserialize. The output is a `Vec` (not an `Option`) so that the
/// generated signature matches the one declared by the `JsonEventConverter`
/// trait.
#[macro_export]
macro_rules! impl_json_converter {
    ($converter_type:ty, $event_type:ty, $convert_fn:ident) => {
        impl $crate::utils::streaming::JsonEventConverter for $converter_type {
            fn convert_json<'a>(
                &'a self,
                json_data: &'a str,
            ) -> std::pin::Pin<
                Box<
                    dyn std::future::Future<
                            Output = Vec<
                                Result<$crate::stream::ChatStreamEvent, $crate::error::LlmError>,
                            >,
                        > + Send
                        + Sync
                        + 'a,
                >,
            > {
                Box::pin(async move {
                    match serde_json::from_str::<$event_type>(json_data) {
                        Ok(parsed_event) => vec![Ok(self.$convert_fn(parsed_event))],
                        Err(e) => vec![Err($crate::error::LlmError::ParseError(format!(
                            "Failed to parse JSON: {e}"
                        )))],
                    }
                })
            }
        }
    };
}