Skip to main content

atrg_stream/
lib.rs

1#![deny(unsafe_code)]
2#![warn(missing_docs)]
//! Jetstream consumer wiring for at-rust-go.
//!
//! Provides a bounded, backpressure-aware Jetstream consumer that is
//! spawned as a background task and delivers events to a user-supplied handler.
//!
//! This crate is deliberately independent of `atrg-core` to avoid cyclic
//! dependencies. It defines its own [`StreamConfig`], into which `atrg-core`
//! maps its `JetstreamConfig` before calling [`spawn_consumer`].
11
12pub mod backoff;
13pub mod consumer;
14pub mod cursor;
15pub mod event;
16pub mod metrics;
17pub mod router;
18pub mod zstd_dict;
19
20pub use consumer::{spawn_consumer, spawn_consumer_with_cursor};
21pub use event::JetstreamEvent;
22pub use metrics::JetstreamMetrics;
23pub use router::{CommitEvent, EventRouterBuilder, Operation};
24
25use std::sync::Arc;
26
27use futures::future::BoxFuture;
28
/// Configuration for the Jetstream consumer.
///
/// Mirrors the fields of `atrg-core`'s `JetstreamConfig`, but is declared in
/// this crate so that `atrg-stream` has zero dependency on `atrg-core`.
///
/// Every field is public. `Default` is not derived on this type; the default
/// values quoted on individual fields are the documented conventions, not
/// something this declaration enforces.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Jetstream relay host, e.g. `"jetstream1.us-east.bsky.network"`.
    pub host: String,
    /// NSID collections to subscribe to, e.g. `["app.bsky.feed.post"]`.
    pub collections: Vec<String>,
    /// Optional path or URL of a ZSTD dictionary used for decompression.
    pub zstd_dict: Option<String>,
    /// Capacity of the bounded back-pressure channel (default: 1024).
    pub channel_capacity: usize,
    /// Event lag threshold before shedding/warning (default: 10_000).
    pub max_lag_events: usize,
    /// Optional cursor mode for resumption.
    ///
    /// - `None` or `Some("live")` — always start from now (no cursor).
    /// - `Some("auto")` — resume from the last stored cursor in the database.
    /// - `Some("<numeric>")` — use the given value as a microsecond timestamp cursor.
    pub cursor: Option<String>,
}
52
53/// Type alias for event handler functions.
54///
55/// The handler receives a [`JetstreamEvent`] and a clone of whatever state
56/// object the caller supplied to [`spawn_consumer`]. The state type must be
57/// `Clone + Send + Sync + 'static`.
58pub type EventHandler<S> =
59    Arc<dyn Fn(JetstreamEvent, S) -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync>;