Expand description
High-performance, no_std-compatible utilities for parsing and consuming
Server-Sent Events (SSE) streams.
sseer provides a layered API for working with SSE:
EventStream- a genericStreamadapter that converts anyStream<Item = Result<impl AsRef<[u8]>, E>>into a stream of parsedEvents.- [
EventSource] (requiresreqwestfeature) - a batteries-included HTTP client that wraps [reqwest] with automatic reconnection, retry policies, and theLast-Event-IDheader. - [
JsonStream][json_stream::JsonStream] (requiresjsonfeature) - a stream adapter that deserialises each event’sdatafield into a typed value via [serde_json]. Utf8Stream- validates and converts a raw byte stream into a stream of UTF-8Strs, buffering incomplete multi-byte sequences across chunks.- Low-level parsing via
parser::parse_lineandparser::parse_line_from_bufferfor custom integrations.
§Quick start with reqwest
use futures::StreamExt;
use sseer::{EventSource, reqwest::StreamEvent};
let request = reqwest::Client::new().get("https://example.com/events");
let mut source = EventSource::new(request)?;
while let Some(result) = source.next().await {
match result {
Ok(StreamEvent::Open) => println!("connected"),
Ok(StreamEvent::Event(evt)) => {
println!("{}: {}", evt.event, evt.data);
}
Err(e) if e.is_response_err() => break,
Err(e) => eprintln!("error: {e}"),
}
}§Without retry logic
If you don’t need automatic reconnection, such as with the OpenAI API where you typically
can’t pick up a dropped stream (or don’t want to accidentally send the same request more times), you can skip [EventSource]
and use [response_to_stream] to convert a [::reqwest::Response] directly
into an EventStream (the ergonomics are better I recommend it):
use futures::StreamExt;
let response = reqwest::Client::new()
.get("https://api.example.com/v1/chat/completions")
.send()
.await?;
let mut stream = sseer::response_to_stream(response);
while let Some(Ok(event)) = stream.next().await {
println!("{}", event.data);
}§Using EventStream directly
If you already have a byte stream (from any HTTP client, WebSocket, file, etc.)
you can use EventStream without the reqwest feature:
use bytes::Bytes;
use futures::StreamExt;
use sseer::EventStream;
let chunks = vec![
Ok::<_, std::io::Error>(Bytes::from("data: hello\n\ndata: world\n\n")),
];
let mut stream = EventStream::new(futures::stream::iter(chunks));
while let Some(Ok(event)) = stream.next().await {
println!("{}", event.data);
}§Feature flags
| Feature | Default | Description | no std? |
|---|---|---|---|
serde | off | Derives [Serialize][::serde::Serialize] and [Deserialize][::serde::Deserialize] on Event and enables serde support in bytes-utils. | false |
std | off | Enables standard library support in core dependencies (bytes, memchr, futures-core, etc.). Notably enables runtime SIMD for memchr. Turned on automatically by reqwest and json. | false |
reqwest | off | Provides [EventSource] for HTTP-based SSE with automatic reconnection and configurable retry policies. | false |
json | off | Provides [JsonStream][json_stream::JsonStream] for deserialising event data into typed values via [serde_json] and lets you choose between the default errors or [serde_path_to_error] for richer errors. | false |
Without any features enabled, the crate is fully no_std compatible and provides
EventStream, Utf8Stream, the low-level parser,
and retry policy types.
Re-exports§
pub use event_stream::bytes::EventStreamBytes;pub use event_stream::generic::EventStream;
Modules§
- errors
Errorimplementations used across the crate- event
- Representation of SSE events based primarily off https://html.spec.whatwg.org/multipage/server-sent-events.html
- event_
stream Streamthat converts a stream ofBytesintoEvents- parser
- UTF-8 agnostic parser implementation for SSE
- retry
- Helpers to handle connection delays when receiving errors
- utf8_
stream