Skip to main content

synth_ai/
sse.rs

1use eventsource_stream::Eventsource;
2use futures_util::{Stream, StreamExt};
3use reqwest::header::HeaderMap;
4use serde::{Deserialize, Serialize};
5use std::pin::Pin;
6
7use crate::types::{Result, SynthError};
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct SseEvent {
11    pub event: String,
12    pub data: String,
13    pub id: String,
14    pub retry: Option<std::time::Duration>,
15}
16
17pub type SseStream = Pin<Box<dyn Stream<Item = Result<SseEvent>> + Send>>;
18
19pub async fn stream_sse(
20    client: &reqwest::Client,
21    url: String,
22    headers: HeaderMap,
23) -> Result<SseStream> {
24    let resp = client.get(url).headers(headers).send().await?;
25    let status = resp.status();
26    if !status.is_success() {
27        let body = resp.text().await.unwrap_or_default();
28        return Err(SynthError::Api {
29            status: status.as_u16(),
30            body,
31        });
32    }
33
34    let stream = resp.bytes_stream().eventsource().map(|item| match item {
35        Ok(evt) => Ok(SseEvent {
36            event: evt.event,
37            data: evt.data,
38            id: evt.id,
39            retry: evt.retry,
40        }),
41        Err(err) => Err(SynthError::Sse(err.to_string())),
42    });
43
44    Ok(Box::pin(stream))
45}