idiolect-indexer
Firehose consumer for dev.idiolect.* records.
Overview
Sits between a firehose transport (tapped, jetstream, a custom adapter)
and an appview's per-record handlers. The crate owns the event loop,
cursor management, and reconnect/retry policy; consumers provide handler
logic and pick their transport via feature flag.
Architecture
flowchart LR
subgraph transport["EventStream"]
T1["TappedEventStream"]
T2["JetstreamEventStream"]
T3["InMemoryEventStream"]
RC["ReconnectingEventStream<br/>(backoff + replay)"]
end
subgraph loop["drive_indexer"]
DEC["decode (AnyRecord)"]
DISP["dispatch"]
ACK["commit cursor"]
end
subgraph handler["RecordHandler"]
H0["user impl"]
H1["RetryingHandler"]
H2["CircuitBreakerHandler"]
end
subgraph cursor["CursorStore"]
C1["InMemoryCursorStore"]
C2["FilesystemCursorStore"]
C3["SqliteCursorStore"]
end
FH[("firehose")]
FH --> T1
FH --> T2
T1 --> RC
T2 --> RC
RC --> DEC
T3 --> DEC
DEC --> DISP
DISP --> H0
H1 -.wraps.-> H0
H2 -.wraps.-> H0
DISP --> ACK
ACK --> C1
ACK --> C2
ACK --> C3
Three traits carry every boundary:
- EventStream: yields commits, one at a time. Impls for in-memory fixtures, tapped, and jetstream.
- CursorStore: persists the ack cursor so the indexer resumes after restart. Impls for in-memory, filesystem JSON, and sqlite.
- RecordHandler: user code. Receives each decoded commit as an IndexerEvent with the body already materialized into AnyRecord.
drive_indexer wires the three together and owns the event loop: decode,
dispatch, commit the cursor, handle backpressure errors, exit cleanly on
stream close. ReconnectingEventStream layers exponential-backoff
reconnect plus cursor replay for production deployments where transport
flaps are routine. RetryingHandler and CircuitBreakerHandler wrap any
RecordHandler with the matching resilience policy.
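The loop described above can be sketched roughly as follows. This is not the crate's code: the traits are simplified to synchronous methods so the example runs with the standard library alone, and `Event`, `drive`, `looks_like_nsid`, `VecStream`, `MemCursor`, `Collect`, and the collection name `dev.idiolect.example` are all hypothetical. Only the overall shape (decode, dispatch, commit the cursor, exit on stream close) and the two behaviours from the design notes (skip malformed NSIDs, advance the cursor on live events only) come from this README.

```rust
use std::collections::VecDeque;

/// Hypothetical event shape; the crate's IndexerEvent carries a decoded AnyRecord.
#[derive(Clone, Debug)]
pub struct Event {
    pub seq: u64,
    pub live: bool,
    pub collection: String,
}

pub trait EventStream {
    /// Next event, or None once the stream has closed.
    fn next_event(&mut self) -> Option<Event>;
}

pub trait RecordHandler {
    fn handle(&mut self, event: &Event) -> Result<(), String>;
}

pub trait CursorStore {
    fn commit(&mut self, seq: u64);
    fn load(&self) -> Option<u64>;
}

/// Crude stand-in for NSID parsing: three or more non-empty dot-separated segments.
fn looks_like_nsid(s: &str) -> bool {
    let segments: Vec<&str> = s.split('.').collect();
    segments.len() >= 3 && segments.iter().all(|seg| !seg.is_empty())
}

/// Decode, dispatch, commit. Returns how many events reached the handler.
pub fn drive<S, H, C>(stream: &mut S, handler: &mut H, cursors: &mut C) -> Result<usize, String>
where
    S: EventStream,
    H: RecordHandler,
    C: CursorStore,
{
    let mut dispatched = 0;
    while let Some(event) = stream.next_event() {
        if !looks_like_nsid(&event.collection) {
            // Skip the frame instead of aborting the whole loop.
            eprintln!("skipping frame with malformed NSID {:?}", event.collection);
            continue;
        }
        handler.handle(&event)?;
        dispatched += 1;
        if event.live {
            // Backfill events never move the cursor.
            cursors.commit(event.seq);
        }
    }
    // The stream returned None: clean exit.
    Ok(dispatched)
}

// In-memory fixtures for the demo below.
pub struct VecStream(pub VecDeque<Event>);
impl EventStream for VecStream {
    fn next_event(&mut self) -> Option<Event> {
        self.0.pop_front()
    }
}

#[derive(Default)]
pub struct MemCursor(pub Option<u64>);
impl CursorStore for MemCursor {
    fn commit(&mut self, seq: u64) {
        self.0 = Some(seq);
    }
    fn load(&self) -> Option<u64> {
        self.0
    }
}

#[derive(Default)]
pub struct Collect(pub Vec<u64>);
impl RecordHandler for Collect {
    fn handle(&mut self, event: &Event) -> Result<(), String> {
        self.0.push(event.seq);
        Ok(())
    }
}

/// One backfill event, one live event, one frame with a malformed collection.
pub fn demo() -> (usize, Option<u64>) {
    let ev = |seq, live, c: &str| Event { seq, live, collection: c.to_string() };
    let mut stream = VecStream(VecDeque::from(vec![
        ev(1, false, "dev.idiolect.example"),
        ev(2, true, "dev.idiolect.example"),
        ev(3, true, "not-an-nsid"),
    ]));
    let mut handler = Collect::default();
    let mut cursors = MemCursor::default();
    let dispatched = drive(&mut stream, &mut handler, &mut cursors).unwrap();
    (dispatched, cursors.load())
}
```

In this sketch `demo()` dispatches the first two events, drops the third, and leaves the cursor at the only live sequence number it saw.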
Usage
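use idiolect_indexer::{drive_indexer, InMemoryCursorStore, InMemoryEventStream};

// MyHandler is a placeholder for your own RecordHandler impl.
let mut stream = InMemoryEventStream::new();
let handler = MyHandler::new();
let cursors = InMemoryCursorStore::new();
// Argument list is assumed; check the crate docs for the exact signature.
drive_indexer(&mut stream, &handler, &cursors).await?;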
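The handler in the snippet above is user code, and it can be wrapped before it is handed to the loop. The following is a minimal sketch of a retrying wrapper in the spirit of RetryingHandler, under stated assumptions: the trait is reduced to a synchronous `handle(&mut self, &str)` method, and `Retrying`, `max_attempts`, and `Flaky` are hypothetical names, not the crate's API. The crate's own wrapper may differ, for example by adding delays between attempts.

```rust
/// Simplified, synchronous stand-in for the crate's RecordHandler trait.
pub trait RecordHandler {
    fn handle(&mut self, record: &str) -> Result<(), String>;
}

/// Hypothetical wrapper: re-invokes the inner handler until it succeeds
/// or `max_attempts` calls have failed.
pub struct Retrying<H> {
    pub inner: H,
    pub max_attempts: u32,
}

impl<H: RecordHandler> RecordHandler for Retrying<H> {
    fn handle(&mut self, record: &str) -> Result<(), String> {
        let mut last_err = String::from("max_attempts was zero");
        for _ in 0..self.max_attempts {
            match self.inner.handle(record) {
                Ok(()) => return Ok(()),
                // Remember the most recent error and try again.
                Err(e) => last_err = e,
            }
        }
        Err(last_err)
    }
}

/// Test double: fails the first `failures_left` calls, then succeeds.
pub struct Flaky {
    pub failures_left: u32,
    pub calls: u32,
}

impl RecordHandler for Flaky {
    fn handle(&mut self, _record: &str) -> Result<(), String> {
        self.calls += 1;
        if self.failures_left > 0 {
            self.failures_left -= 1;
            Err(format!("transient failure on call {}", self.calls))
        } else {
            Ok(())
        }
    }
}
```

Because the wrapper implements the same trait as the handler it wraps, the loop does not need to know whether a resilience layer is present.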
Feature flags
| Flag | Default | Effect |
|---|---|---|
| firehose-tapped | off | TappedEventStream backed by tapped. Live firehose + repo backfill. |
| firehose-jetstream | off | JetstreamEventStream for jetstream's JSON-over-websocket. Includes keepalive pings. |
| cursor-filesystem | off | FilesystemCursorStore: one JSON file per subscription id. |
| cursor-sqlite | off | SqliteCursorStore: WAL-journaled sqlite table. |
| reconnecting | off | ReconnectingEventStream + BackoffPolicy. |
| resilience | off | RetryingHandler + CircuitBreakerHandler. |
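To make the reconnecting flag more concrete, here is a minimal sketch of how an exponential-backoff delay can be computed. It is an illustration only: `Backoff` and its `base`, `factor`, and `max` fields are hypothetical and are not taken from the crate's BackoffPolicy, which may use different parameters or add jitter.

```rust
use std::time::Duration;

/// Hypothetical policy: delay = base * factor^attempt, capped at `max`.
#[derive(Clone, Copy, Debug)]
pub struct Backoff {
    pub base: Duration,
    pub factor: u32,
    pub max: Duration,
}

impl Backoff {
    /// Delay to wait before reconnect attempt `attempt` (0-based).
    pub fn delay(&self, attempt: u32) -> Duration {
        // Saturate instead of overflowing when the exponent gets large.
        let multiplier = self.factor.checked_pow(attempt).unwrap_or(u32::MAX);
        self.base
            .checked_mul(multiplier)
            .unwrap_or(self.max)
            .min(self.max)
    }
}
```

With a base of 500 ms, a factor of 2, and a 30 s cap, the first attempts wait 500 ms, 1 s, 2 s, and 4 s, and every attempt from the sixth doubling onward waits the full 30 s.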
Design notes
- Every event carries a live: bool. Live and backfill events dispatch identically at the handler, but the cursor store only advances on live events, so replaying backfill on reconnect is safe and expected.
- IndexerEvent.collection is a typed Nsid (parsed at the stream-decode boundary). A frame with a malformed NSID is skipped with a tracing::warn! rather than aborting the loop, so a single buggy publisher does not drop the firehose for everyone else.
- The traits are not dyn-compatible because they use native async fn; the crate ships Arc blanket impls so consumers share state via Arc<ConcreteImpl> instead.
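The Arc blanket-impl pattern from the last note can be sketched as follows. This is an assumption-laden illustration rather than the crate's code: the `CursorStore` trait here has only two made-up methods, `MemCursor` and `run` are hypothetical, and the tiny `block_on` exists only so the example runs without an async runtime (it busy-polls, which is fine only because these futures complete immediately).

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Simplified trait using native async fn, so `dyn CursorStore` is not available.
#[allow(async_fn_in_trait)]
pub trait CursorStore {
    async fn commit(&self, seq: u64);
    async fn load(&self) -> Option<u64>;
}

/// Blanket impl: an Arc of any store is itself a store, delegating to the inner value.
impl<T: CursorStore> CursorStore for Arc<T> {
    async fn commit(&self, seq: u64) {
        (**self).commit(seq).await
    }
    async fn load(&self) -> Option<u64> {
        (**self).load().await
    }
}

/// Hypothetical in-memory store.
pub struct MemCursor {
    seq: Mutex<Option<u64>>,
}

impl CursorStore for MemCursor {
    async fn commit(&self, seq: u64) {
        *self.seq.lock().unwrap() = Some(seq);
    }
    async fn load(&self) -> Option<u64> {
        *self.seq.lock().unwrap()
    }
}

/// Generic consumer that takes the store by value, as a loop might.
async fn run<C: CursorStore>(cursors: C, seq: u64) {
    cursors.commit(seq).await;
}

// Minimal executor for futures that never actually suspend.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Hands one Arc clone to `run` and reads the result back through the other.
pub fn demo() -> Option<u64> {
    let shared = Arc::new(MemCursor { seq: Mutex::new(None) });
    block_on(run(Arc::clone(&shared), 42));
    block_on(shared.load())
}
```

The caller keeps one `Arc<MemCursor>` and passes a clone to the generic function, so both sides observe the same state without any `dyn` indirection.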
Stability
idiolect is pre-1.0. Releases in the 0.x series may include
arbitrary breaking changes between minor versions — Rust APIs,
lexicon shapes, wire formats, and CLI surfaces are all in scope.
Pin to an exact version if you depend on this crate, and read
CHANGELOG.md before bumping.
Related
- idiolect-records: AnyRecord and decode_record materialize bodies inside the indexer.
- idiolect-orchestrator and idiolect-observer: both consume this crate's firehose stream.