Skip to main content

idiolect_indexer/
lib.rs

1//! Firehose consumer for `dev.idiolect.*` records.
2//!
3//! `idiolect-indexer` sits between a tapped-backed firehose
4//! transport and an appview's per-record handlers. The crate is
5//! organized around three narrow traits that mirror the adapter
6//! pattern already used in `idiolect-lens`:
7//!
8//! - [`EventStream`]: source of raw firehose commits.
9//! - [`RecordHandler`]: appview-side consumer of decoded records.
10//! - [`CursorStore`]: durable seat for the firehose cursor so the
11//!   indexer resumes cleanly after a restart.
12//!
13//! The orchestrator [`drive_indexer`] (see also [`drive_idiolect_indexer`],
14//! used in the quickstart below) pulls from the stream, filters to the
15//! `dev.idiolect.*` nsid prefix, decodes bodies into [`idiolect_records::AnyRecord`],
16//! dispatches to the handler, and commits the cursor on live events.
17//!
18//! The default feature set is transport-agnostic: [`InMemoryEventStream`],
19//! [`InMemoryCursorStore`], and [`NoopRecordHandler`] can be wired up directly
20//! in appview tests. Enable `firehose-tapped` to plug the tapped-based adapter in.
21//!
22//! # Quickstart (in-memory)
23//!
24//! ```
25//! use idiolect_indexer::{
26//!     InMemoryCursorStore, InMemoryEventStream, IndexerAction, IndexerConfig,
27//!     NoopRecordHandler, RawEvent, drive_idiolect_indexer,
28//! };
29//!
30//! # #[tokio::main(flavor = "current_thread")]
31//! # async fn main() {
32//! let mut stream = InMemoryEventStream::new();
33//! stream.push(RawEvent {
34//!     seq: 1,
35//!     live: true,
36//!     did: "did:plc:alice".to_owned(),
37//!     rev: "3l5".to_owned(),
38//!     collection: idiolect_records::Nsid::parse("dev.idiolect.encounter").unwrap(),
39//!     rkey: "3l5".to_owned(),
40//!     action: IndexerAction::Delete,
41//!     cid: None,
42//!     body: None,
43//! });
44//!
45//! let handler = NoopRecordHandler::new();
46//! let cursors = InMemoryCursorStore::new();
47//! let cfg = IndexerConfig::default();
48//!
49//! drive_idiolect_indexer(&mut stream, &handler, &cursors, &cfg)
50//!     .await
51//!     .unwrap();
52//!
53//! assert_eq!(handler.observed(), 1);
54//! assert_eq!(handler.deletes(), 1);
55//! # }
56//! ```
57
58pub mod cursor;
59pub mod error;
60pub mod event;
61pub mod handler;
62pub mod indexer;
63#[cfg(feature = "reconnecting")]
64pub mod reconnect;
65#[cfg(feature = "resilience")]
66pub mod resilience;
67pub mod stream;
68
69#[cfg(feature = "firehose-tapped")]
70pub mod tapped;
71
72#[cfg(feature = "firehose-jetstream")]
73pub mod jetstream;
74
75#[cfg(feature = "cursor-filesystem")]
76pub mod cursor_fs;
77
78#[cfg(feature = "cursor-sqlite")]
79pub mod cursor_sqlite;
80
81pub use cursor::{CursorStore, InMemoryCursorStore};
82pub use error::IndexerError;
83pub use event::{IndexerAction, IndexerEvent};
84pub use handler::{NoopRecordHandler, RecordHandler};
85pub use indexer::{IndexerConfig, drive_idiolect_indexer, drive_indexer};
86pub use stream::{EventStream, InMemoryEventStream, RawEvent};
87
88#[cfg(feature = "firehose-tapped")]
89pub use tapped::TappedEventStream;
90
91#[cfg(feature = "firehose-jetstream")]
92pub use jetstream::{JetstreamEventStream, parse_frame as parse_jetstream_frame};
93
94#[cfg(feature = "cursor-filesystem")]
95pub use cursor_fs::FilesystemCursorStore;
96
97#[cfg(feature = "cursor-sqlite")]
98pub use cursor_sqlite::SqliteCursorStore;
99
100#[cfg(feature = "reconnecting")]
101pub use reconnect::{BackoffPolicy, ReconnectingEventStream};
102
103#[cfg(feature = "resilience")]
104pub use resilience::{CircuitBreakerHandler, CircuitPolicy, RetryPolicy, RetryingHandler};