atrg-stream 0.2.2

Jetstream consumer wiring for at-rust-go
Documentation

atrg-stream

Bounded, backpressure-aware Jetstream consumer for AT Protocol event streaming.

Part of at-rust-go (atrg) — a batteries-included AT Protocol backend framework for Rust.

What this crate provides

  • spawn_consumer — spawns a background tokio task that connects to an AT Protocol Jetstream relay and delivers events through a bounded channel
  • JetstreamEvent — re-exported event type from the Jetstream protocol
  • JetstreamMetrics — runtime metrics: events received, events dropped, reconnects, current backoff
  • StreamConfig — standalone configuration (no dependency on atrg-core) for relay host, collections, channel capacity, lag thresholds, and optional ZSTD dictionary
  • EventHandler<S> — type alias for async event handler closures that receive an event and a clone of your app state
  • Built-in backpressure — events flow through a tokio::sync::mpsc channel; when the handler falls behind, the consumer pauses WebSocket reads rather than buffering unbounded
  • Lag detection & shedding — warns via tracing when the channel reaches max_lag_events and drops the oldest events to keep memory bounded
  • ZSTD dictionary support — auto-fetches and caches dictionaries from a URL, or loads from a local path
  • Automatic reconnection with exponential backoff

Usage

[dependencies]
atrg-stream = "0.1"
tokio = { version = "1", features = ["full"] }
anyhow = "1"
use atrg_stream::{spawn_consumer, JetstreamEvent, StreamConfig};
use std::sync::Arc;

#[derive(Clone)]
struct MyState { /* your app state */ }

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let config = StreamConfig {
        host: "jetstream1.us-east.bsky.network".into(),
        collections: vec!["app.bsky.feed.post".into()],
        zstd_dict: None,
        channel_capacity: 1024,
        max_lag_events: 10_000,
    };

    let state = MyState {};

    let handler = Arc::new(|event: JetstreamEvent, _state: MyState| {
        Box::pin(async move {
            println!("got event from {}", event.did);
            Ok(())
        }) as futures::future::BoxFuture<'static, anyhow::Result<()>>
    });

    let _handle = spawn_consumer(&config, state, handler).await?;

    // Consumer runs in the background until the task is aborted
    tokio::signal::ctrl_c().await?;
    Ok(())
}

Design notes

This crate is deliberately independent of atrg-core to avoid cyclic dependencies. It defines its own StreamConfig that atrg-core maps from its JetstreamConfig before calling spawn_consumer. Events are delivered in arrival order per consumer connection.

When used via the atrg-core AtrgApp builder, all of this wiring is automatic — just add [jetstream] to your atrg.toml and register a handler with .on_event().

License

LGPL-3.0-only — see LICENSE.