Crate sseer

High-performance, no_std-compatible utilities for parsing and consuming Server-Sent Events (SSE) streams.

sseer provides a layered API for working with SSE:

  • EventStream - a generic Stream adapter that converts any Stream<Item = Result<impl AsRef<[u8]>, E>> into a stream of parsed Events.
  • EventSource (requires the reqwest feature) - a batteries-included HTTP client that wraps reqwest with automatic reconnection, retry policies, and the Last-Event-ID header.
  • json_stream::JsonStream (requires the json feature) - a stream adapter that deserialises each event’s data field into a typed value via serde_json.
  • Utf8Stream - validates and converts a raw byte stream into a stream of UTF-8 Strs, buffering incomplete multi-byte sequences across chunks.
  • Low-level parsing via parser::parse_line and parser::parse_line_from_buffer for custom integrations.
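
To give a feel for what line-level SSE parsing involves, here is a minimal sketch of the line classification rules from the WHATWG specification. It is not the crate’s parser: the Line type and the classify_line function are hypothetical names invented for this illustration, and the real parser::parse_line signature may look quite different. The sketch works on bytes, in keeping with the UTF-8 agnostic design described for the parser module.

```rust
/// One classified line of an SSE stream.
/// Hypothetical type for this sketch, not sseer's own representation.
#[derive(Debug, PartialEq, Eq)]
pub enum Line<'a> {
    /// An empty line, which dispatches the event built so far.
    Dispatch,
    /// A line starting with ':'; clients ignore its payload.
    Comment(&'a [u8]),
    /// A `name: value` pair such as `data: hello`.
    Field { name: &'a [u8], value: &'a [u8] },
}

/// Classify one line whose line terminator has already been removed.
pub fn classify_line(line: &[u8]) -> Line<'_> {
    if line.is_empty() {
        return Line::Dispatch;
    }
    match line.iter().position(|&b| b == b':') {
        // A leading colon marks a comment.
        Some(0) => Line::Comment(&line[1..]),
        // Split at the first colon and drop at most one leading space
        // from the value, as the specification requires.
        Some(i) => {
            let value = &line[i + 1..];
            let value = value.strip_prefix(b" ").unwrap_or(value);
            Line::Field { name: &line[..i], value }
        }
        // No colon: the whole line is the field name, with an empty value.
        None => Line::Field { name: line, value: &[] },
    }
}
```

Only a single space after the colon is stripped, so `data:  two` keeps one leading space in its value. A custom integration would feed each classified line into whatever event-building state it maintains.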

§Quick start with reqwest

use futures::StreamExt;
use sseer::{EventSource, reqwest::StreamEvent};

let request = reqwest::Client::new().get("https://example.com/events");
let mut source = EventSource::new(request)?;

while let Some(result) = source.next().await {
    match result {
        Ok(StreamEvent::Open) => println!("connected"),
        Ok(StreamEvent::Event(evt)) => {
            println!("{}: {}", evt.event, evt.data);
        }
        Err(e) if e.is_response_err() => break,
        Err(e) => eprintln!("error: {e}"),
    }
}
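
EventSource is described above as reconnecting according to a retry policy. The crate’s own policy types are not shown on this page, so the following is only a rough illustration of what such a policy typically computes: an exponential backoff with a cap and a retry limit. The Backoff struct, its fields, and the delay method are all hypothetical and are not part of sseer’s retry API.

```rust
use std::time::Duration;

/// Hypothetical exponential backoff policy.
/// Names and fields are assumptions for this sketch, not sseer's API.
pub struct Backoff {
    /// Delay before the first retry.
    pub base: Duration,
    /// Multiplier applied for each further attempt.
    pub factor: u32,
    /// Upper bound on any single delay.
    pub max_delay: Duration,
    /// Number of retries allowed before giving up.
    pub max_retries: u32,
}

impl Backoff {
    /// Delay before retry number `attempt` (0-based), or `None` to give up.
    pub fn delay(&self, attempt: u32) -> Option<Duration> {
        if attempt >= self.max_retries {
            return None;
        }
        // Saturating arithmetic keeps large attempt counts from overflowing.
        let multiplier = self.factor.saturating_pow(attempt);
        let delay = self.base.checked_mul(multiplier).unwrap_or(self.max_delay);
        Some(delay.min(self.max_delay))
    }
}
```

With a base of 500 ms, a factor of 2, and a 5 s cap, the delays grow as 500 ms, 1 s, 2 s, 4 s and then stay at 5 s until the retry limit is reached. A real client would also honour a retry field sent by the server, which the SSE specification defines as the reconnection time.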

§Without retry logic

If you don’t need automatic reconnection, for example with the OpenAI API, where you typically can’t pick up a dropped stream and don’t want to accidentally send the same request more than once, you can skip EventSource and use response_to_stream to convert a reqwest::Response directly into an EventStream. The ergonomics are better, so I recommend it:

use futures::StreamExt;

let response = reqwest::Client::new()
    .get("https://api.example.com/v1/chat/completions")
    .send()
    .await?;

let mut stream = sseer::response_to_stream(response);

while let Some(Ok(event)) = stream.next().await {
    println!("{}", event.data);
}

§Using EventStream directly

If you already have a byte stream (from any HTTP client, WebSocket, file, etc.) you can use EventStream without the reqwest feature:

use bytes::Bytes;
use futures::StreamExt;
use sseer::EventStream;

let chunks = vec![
    Ok::<_, std::io::Error>(Bytes::from("data: hello\n\ndata: world\n\n")),
];
let mut stream = EventStream::new(futures::stream::iter(chunks));

while let Some(Ok(event)) = stream.next().await {
    println!("{}", event.data);
}
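
The input above contains two events because a blank line terminates each one. As a sketch of that dispatch rule, assuming the behaviour described in the WHATWG specification, the following standalone function assembles events from a complete text buffer. SketchEvent and collect_events are hypothetical names for this illustration. Unlike EventStream, the sketch needs the whole input up front, handles only the event and data fields, and relies on str::lines, so it does not treat a lone carriage return as a line ending.

```rust
/// Hypothetical event type for this sketch (not sseer's Event).
#[derive(Debug, PartialEq, Eq)]
pub struct SketchEvent {
    pub event: String,
    pub data: String,
}

/// Assemble events from a complete SSE text buffer.
pub fn collect_events(input: &str) -> Vec<SketchEvent> {
    let mut events = Vec::new();
    let mut event = String::new();
    let mut data: Vec<&str> = Vec::new();

    for line in input.lines() {
        if line.is_empty() {
            // Blank line: dispatch only if at least one data line was seen,
            // then reset the buffers either way.
            if !data.is_empty() {
                events.push(SketchEvent {
                    event: std::mem::take(&mut event),
                    data: data.join("\n"),
                });
            }
            event.clear();
            data.clear();
            continue;
        }
        if line.starts_with(':') {
            continue; // comment line
        }
        // Split at the first colon; a line without one is a bare field name.
        let (name, value) = line.split_once(':').unwrap_or((line, ""));
        let value = value.strip_prefix(' ').unwrap_or(value);
        match name {
            "event" => event = value.to_string(),
            "data" => data.push(value),
            _ => {} // other fields are ignored in this sketch
        }
    }
    // Anything left without a terminating blank line is never dispatched.
    events
}
```

Called on "data: hello\n\ndata: world\n\n", this yields two events whose data fields are "hello" and "world". Consecutive data lines within one event are joined with a newline, and input that ends before a blank line produces no event.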

§Feature flags

| Feature | Default | Description | no_std? |
| --- | --- | --- | --- |
| serde | off | Derives serde::Serialize and serde::Deserialize on Event and enables serde support in bytes-utils. | false |
| std | off | Enables standard library support in core dependencies (bytes, memchr, futures-core, etc.). Notably enables runtime SIMD for memchr. Turned on automatically by reqwest and json. | false |
| reqwest | off | Provides EventSource for HTTP-based SSE with automatic reconnection and configurable retry policies. | false |
| json | off | Provides json_stream::JsonStream for deserialising event data into typed values via serde_json, and lets you choose between the default errors and serde_path_to_error for richer errors. | false |

Without any features enabled, the crate is fully no_std compatible and provides EventStream, Utf8Stream, the low-level parser, and retry policy types.
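
Utf8Stream is described above as buffering incomplete multi-byte sequences across chunks. The sketch below shows one way that idea can work, using only standard library UTF-8 validation. Utf8Buffer and its push method are hypothetical names for this illustration, not the crate’s implementation, and the sketch uses Vec and String for brevity rather than the types the crate itself works with.

```rust
/// Hypothetical helper for this sketch: accumulates bytes and hands back
/// the longest valid UTF-8 prefix, keeping an incomplete trailing sequence.
#[derive(Default)]
pub struct Utf8Buffer {
    pending: Vec<u8>,
}

impl Utf8Buffer {
    /// Feed one chunk. Returns the text that is now complete, or an error
    /// if the buffered bytes can never become valid UTF-8.
    pub fn push(&mut self, chunk: &[u8]) -> Result<String, std::str::Utf8Error> {
        self.pending.extend_from_slice(chunk);
        let valid_len = match std::str::from_utf8(&self.pending) {
            Ok(s) => s.len(),
            // `error_len() == None` means the input ended in the middle of
            // a character, so later bytes may still complete it.
            Err(e) if e.error_len().is_none() => e.valid_up_to(),
            // Otherwise the bytes are invalid; they stay in the buffer,
            // so later calls keep reporting the error.
            Err(e) => return Err(e),
        };
        // Keep the incomplete tail for the next call, return the rest.
        let rest = self.pending.split_off(valid_len);
        let done = std::mem::replace(&mut self.pending, rest);
        Ok(String::from_utf8(done).expect("prefix was validated above"))
    }
}
```

If the two bytes of "é" (0xC3 0xA9) arrive in separate chunks, the first push returns only the text before them and the second returns the completed character together with whatever follows it.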

§Re-exports

pub use event_stream::bytes::EventStreamBytes;
pub use event_stream::generic::EventStream;

§Modules

errors
Error implementations used across the crate
event
Representation of SSE events, based primarily on https://html.spec.whatwg.org/multipage/server-sent-events.html
event_stream
Stream that converts a stream of Bytes into Events
parser
UTF-8 agnostic parser implementation for SSE
retry
Helpers to handle connection delays when receiving errors
utf8_stream