claude_api/sse.rs
1//! Low-level Server-Sent Events parsing.
2//!
3//! Wraps the `eventsource-stream` crate and maps each SSE event's `data` payload
4//! into a typed value via [`serde_json`]. The high-level
5//! [`EventStream`](crate::messages::stream::EventStream) sits on top of this.
6//!
7//! Gated on the `streaming` feature.
8
9use eventsource_stream::Eventsource;
10use futures_util::StreamExt;
11
12use crate::error::{Error, Result, StreamError};
13
14/// Convert a streaming HTTP response body into a stream of typed values.
15///
16/// Each SSE event's `data` field is JSON-parsed into `T`. Wire-level parse
17/// errors map to [`StreamError::Connection`]; JSON-decode errors map to
18/// [`StreamError::Parse`].
19pub(crate) fn into_typed_stream<T>(
20 response: reqwest::Response,
21) -> impl futures_util::Stream<Item = Result<T>> + Send + 'static
22where
23 T: serde::de::DeserializeOwned + Send + 'static,
24{
25 response
26 .bytes_stream()
27 .eventsource()
28 .map(|item| match item {
29 Ok(event) => serde_json::from_str::<T>(&event.data)
30 .map_err(|e| Error::Stream(StreamError::Parse(e.to_string()))),
31 Err(e) => Err(Error::Stream(StreamError::Connection(e.to_string()))),
32 })
33}