Skip to main content

idiolect_indexer/
stream.rs

//! The [`EventStream`] boundary — where firehose events come from.
//!
//! The indexer's orchestrator pulls from this trait; any implementation
//! that can hand back `(action, did, rev, collection, rkey, body)`
//! tuples in firehose order satisfies it. Ships with
//! [`InMemoryEventStream`] for fixtures and tests; the tapped-backed
//! adapter lives behind the `firehose-tapped` feature in
//! [`crate::tapped`].
9
10use std::collections::VecDeque;
11
12use idiolect_records::Nsid;
13
14use crate::error::IndexerError;
15use crate::event::IndexerAction;
16
17/// A single firehose commit as seen by the stream layer, before the
18/// indexer decodes the record body.
19///
20/// Equivalent to `tapped::RecordEvent` but without the lifetime on
21/// the record body: the body is materialized as an owned
22/// `serde_json::Value` so the indexer does not hold a reference into
23/// the stream's buffer across handler awaits.
24#[derive(Debug, Clone)]
25pub struct RawEvent {
26    /// Sequence id assigned by the stream implementation. Must be
27    /// monotonically non-decreasing per connection.
28    pub seq: u64,
29    /// True if from the live firehose; false for backfill / resync.
30    pub live: bool,
31    /// DID of the repo that produced this commit.
32    pub did: String,
33    /// Repository revision the commit advanced to (TID).
34    pub rev: String,
35    /// Collection NSID, e.g. `dev.idiolect.encounter`.
36    pub collection: Nsid,
37    /// Record key within the collection.
38    pub rkey: String,
39    /// Commit action.
40    pub action: IndexerAction,
41    /// CID of the record body. `None` for delete events.
42    pub cid: Option<String>,
43    /// The record body json. `None` for delete events or when the
44    /// stream could not parse the embedded body.
45    pub body: Option<serde_json::Value>,
46}
47
48/// Async source of firehose commits.
49///
50/// Implementations return `Ok(Some(event))` on success, `Ok(None)`
51/// when the stream is cleanly exhausted (useful for bounded mocks),
52/// and [`IndexerError::Stream`] for transport-level failures.
53///
54/// Implementations are `Send + Sync` so the orchestrator can share
55/// them across tokio tasks.
56#[allow(async_fn_in_trait)]
57pub trait EventStream: Send + Sync {
58    /// Await the next firehose event.
59    ///
60    /// # Errors
61    ///
62    /// Returns [`IndexerError::Stream`] on transport errors.
63    async fn next_event(&mut self) -> Result<Option<RawEvent>, IndexerError>;
64}
65
66/// `VecDeque`-backed [`EventStream`] for tests and offline fixtures.
67///
68/// Pre-seed events with [`InMemoryEventStream::push`] or from an
69/// iterator; [`EventStream::next_event`] drains the queue in push
70/// order and returns `Ok(None)` once it is empty.
71#[derive(Debug, Default)]
72pub struct InMemoryEventStream {
73    /// The internal queue; events are drained in push order.
74    queue: VecDeque<RawEvent>,
75}
76
77impl InMemoryEventStream {
78    /// Construct an empty stream.
79    #[must_use]
80    pub fn new() -> Self {
81        Self::default()
82    }
83
84    /// Push one event onto the tail of the queue.
85    pub fn push(&mut self, event: RawEvent) {
86        self.queue.push_back(event);
87    }
88
89    /// Extend the queue with an iterator of events.
90    pub fn extend<I>(&mut self, events: I)
91    where
92        I: IntoIterator<Item = RawEvent>,
93    {
94        self.queue.extend(events);
95    }
96
97    /// Number of events waiting to be consumed.
98    #[must_use]
99    pub fn len(&self) -> usize {
100        self.queue.len()
101    }
102
103    /// Whether the queue is empty.
104    #[must_use]
105    pub fn is_empty(&self) -> bool {
106        self.queue.is_empty()
107    }
108}
109
110impl EventStream for InMemoryEventStream {
111    async fn next_event(&mut self) -> Result<Option<RawEvent>, IndexerError> {
112        Ok(self.queue.pop_front())
113    }
114}