Skip to main content

claude_api/
sse.rs

1//! Low-level Server-Sent Events parsing.
2//!
3//! Wraps the `eventsource-stream` crate and maps each SSE event's `data` payload
4//! into a typed value via [`serde_json`]. The high-level
5//! [`EventStream`](crate::messages::stream::EventStream) sits on top of this.
6//!
7//! Gated on the `streaming` feature.
8
9use eventsource_stream::Eventsource;
10use futures_util::StreamExt;
11
12use crate::error::{Error, Result, StreamError};
13
14/// Convert a streaming HTTP response body into a stream of typed values.
15///
16/// Each SSE event's `data` field is JSON-parsed into `T`. Wire-level parse
17/// errors map to [`StreamError::Connection`]; JSON-decode errors map to
18/// [`StreamError::Parse`].
19pub(crate) fn into_typed_stream<T>(
20    response: reqwest::Response,
21) -> impl futures_util::Stream<Item = Result<T>> + Send + 'static
22where
23    T: serde::de::DeserializeOwned + Send + 'static,
24{
25    response
26        .bytes_stream()
27        .eventsource()
28        .map(|item| match item {
29            Ok(event) => serde_json::from_str::<T>(&event.data)
30                .map_err(|e| Error::Stream(StreamError::Parse(e.to_string()))),
31            Err(e) => Err(Error::Stream(StreamError::Connection(e.to_string()))),
32        })
33}