Skip to main content

sseer/
lib.rs

1//! High-performance, `no_std`-compatible utilities for parsing and consuming
2//! [Server-Sent Events](https://html.spec.whatwg.org/multipage/server-sent-events.html) (SSE) streams.
3//!
4//! `sseer` provides a layered API for working with SSE:
5//!
6//! - [`EventStream`] - a generic [`Stream`][futures_core::Stream] adapter that converts any
7//!   `Stream<Item = Result<impl AsRef<[u8]>, E>>` into a stream of parsed [`Event`][event::Event]s.
8//! - [`EventSource`] (requires `reqwest` feature) - a batteries-included HTTP client that
9//!   wraps [`reqwest`] with automatic reconnection, retry policies, and the `Last-Event-ID` header.
10//! - [`JsonStream`][json_stream::JsonStream] (requires `json` feature) - a stream adapter
11//!   that deserialises each event's `data` field into a typed value via [`serde_json`].
12//! - [`Utf8Stream`][utf8_stream::Utf8Stream] - validates and converts a raw byte stream into
13//!   a stream of UTF-8 [`Str`][bytes_utils::Str]s, buffering incomplete multi-byte sequences across
14//!   chunks.
15//! - Low-level parsing via [`parser::parse_line`] and [`parser::parse_line_from_buffer`] for
16//!   custom integrations.
17//!
18//! # Quick start with `reqwest`
19//!
20//! ```ignore
21//! use futures::StreamExt;
22//! use sseer::{EventSource, reqwest::StreamEvent};
23//!
24//! # #[tokio::main]
25//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
26//! let request = reqwest::Client::new().get("https://example.com/events");
27//! let mut source = EventSource::new(request)?;
28//!
29//! while let Some(result) = source.next().await {
30//!     match result {
31//!         Ok(StreamEvent::Open) => println!("connected"),
32//!         Ok(StreamEvent::Event(evt)) => {
33//!             println!("{}: {}", evt.event, evt.data);
34//!         }
35//!         Err(e) if e.is_response_err() => break,
36//!         Err(e) => eprintln!("error: {e}"),
37//!     }
38//! }
39//! # Ok(())
40//! # }
41//! ```
42//!
43//! # Without retry logic
44//!
45//! If you don't need automatic reconnection, such as with the OpenAI API where you typically
46//! can't pick up a dropped stream (or don't want to accidentally send the same request more than once), you can skip [`EventSource`]
47//! and use [`response_to_stream`] to convert a [`::reqwest::Response`] directly
48//! into an [`EventStreamBytes`] (the ergonomics are better, so I recommend it):
49//!
50//! ```ignore
51//! use futures::StreamExt;
52//!
53//! # #[tokio::main]
54//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
55//! let response = reqwest::Client::new()
56//!     .get("https://api.example.com/v1/chat/completions")
57//!     .send()
58//!     .await?;
59//!
60//! let mut stream = sseer::response_to_stream(response);
61//!
62//! while let Some(Ok(event)) = stream.next().await {
63//!     println!("{}", event.data);
64//! }
65//! # Ok(())
66//! # }
67//! ```
68//!
69//! # Using `EventStream` directly
70//!
71//! If you already have a byte stream (from any HTTP client, WebSocket, file, etc.)
72//! you can use [`EventStream`] without the `reqwest` feature:
73//!
74//! ```rust
75//! use bytes::Bytes;
76//! use futures::StreamExt;
77//! use sseer::EventStream;
78//!
79//! # #[tokio::main]
80//! # async fn main() {
81//! let chunks = vec![
82//!     Ok::<_, std::io::Error>(Bytes::from("data: hello\n\ndata: world\n\n")),
83//! ];
84//! let mut stream = EventStream::new(futures::stream::iter(chunks));
85//!
86//! while let Some(Ok(event)) = stream.next().await {
87//!     println!("{}", event.data);
88//! }
89//! # }
90//! ```
91//!
92//! # Feature flags
93//!
94//! | Feature | Default | Description | no std? |
95//! | --- | --- | --- | --- |
96//! | `serde` | off | Derives [`Serialize`][::serde::Serialize] and [`Deserialize`][::serde::Deserialize] on [`Event`][event::Event] and enables `serde` support in [`bytes-utils`][bytes_utils]. | false |
97//! | `std` | off | Enables standard library support in core dependencies (`bytes`, `memchr`, `futures-core`, etc.). Notably enables runtime SIMD for memchr. Turned on automatically by `reqwest` and `json`. | false |
98//! | `reqwest` | off | Provides [`EventSource`] for HTTP-based SSE with automatic reconnection and configurable retry policies. | false |
99//! | `json` | off | Provides [`JsonStream`][json_stream::JsonStream] for deserialising event data into typed values via [`serde_json`] and lets you choose between the default errors or [`serde_path_to_error`] for richer errors. | false |
100//!
101//! Without any features enabled, the crate is fully `no_std` compatible and provides
102//! [`EventStream`], [`Utf8Stream`][utf8_stream::Utf8Stream], the low-level parser,
103//! and retry policy types.
104
105#![cfg_attr(not(feature = "std"), no_std)]
106
107pub(crate) mod constants;
108pub mod errors;
109pub mod event;
110pub mod event_stream;
111pub mod parser;
112#[cfg(feature = "reqwest")]
113pub mod reqwest;
114pub mod retry;
115pub mod utf8_stream;
116
117#[cfg(feature = "json")]
118pub mod json_stream;
119// Re-exported at the crate root: when the `reqwest` feature is enabled, `EventSource` is what most users want.
120#[cfg(feature = "reqwest")]
121pub use reqwest::EventSource;
122
123pub use event_stream::{bytes::EventStreamBytes, generic::EventStream};
124
/// Turn a [`Response`][::reqwest::Response] into a [`Stream`][futures_core::Stream], using a
/// mechanism similar to [`Response::bytes_stream`][::reqwest::Response::bytes_stream].
///
/// The response is first converted into a [`Body`][::reqwest::Body]; that body is wrapped in an
/// [`http_body_util::BodyDataStream`], which in turn is handed to
/// [`EventStreamBytes::new`][event_stream::bytes::EventStreamBytes::new].
///
/// Only available when the `reqwest` feature is enabled.
#[cfg(feature = "reqwest")]
pub fn response_to_stream(
    response: ::reqwest::Response,
) -> event_stream::bytes::EventStreamBytes<http_body_util::BodyDataStream<::reqwest::Body>> {
    // Take ownership of the response's body so it can be driven as a stream.
    let body = ::reqwest::Body::from(response);
    let body_stream = http_body_util::BodyDataStream::new(body);

    event_stream::bytes::EventStreamBytes::new(body_stream)
}