1use eventsource_stream::Eventsource;
2use futures_util::{Stream, StreamExt};
3use reqwest::header::HeaderMap;
4use serde::{Deserialize, Serialize};
5use std::pin::Pin;
6
7use crate::types::{Result, SynthError};
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct SseEvent {
11 pub event: String,
12 pub data: String,
13 pub id: String,
14 pub retry: Option<std::time::Duration>,
15}
16
17pub type SseStream = Pin<Box<dyn Stream<Item = Result<SseEvent>> + Send>>;
18
19pub async fn stream_sse(
20 client: &reqwest::Client,
21 url: String,
22 headers: HeaderMap,
23) -> Result<SseStream> {
24 let resp = client.get(url).headers(headers).send().await?;
25 let status = resp.status();
26 if !status.is_success() {
27 let body = resp.text().await.unwrap_or_default();
28 return Err(SynthError::Api {
29 status: status.as_u16(),
30 body,
31 });
32 }
33
34 let stream = resp.bytes_stream().eventsource().map(|item| match item {
35 Ok(evt) => Ok(SseEvent {
36 event: evt.event,
37 data: evt.data,
38 id: evt.id,
39 retry: evt.retry,
40 }),
41 Err(err) => Err(SynthError::Sse(err.to_string())),
42 });
43
44 Ok(Box::pin(stream))
45}