Skip to main content

idiolect_indexer/
cursor.rs

1//! The [`CursorStore`] boundary — where the firehose sequence number
2//! lives across indexer restarts.
3//!
4//! The indexer commits a cursor after each handled live event so that
5//! a restart can resume from the last processed commit instead of
6//! replaying the relay's retention floor. Cursor commits do not
7//! advance on backfill events: those are already replayed by tap
8//! whenever a repo is resynced, so committing them would produce
9//! misleading "we are caught up" telemetry.
10//!
11//! The shape is the same ephemeral-state adapter pattern used for
12//! OAuth tokens and identity resolution: one narrow async trait with
13//! `Send + Sync` bounds, an in-memory impl for tests, production
14//! backends plugged in behind the same signature.
15
16use std::collections::HashMap;
17use std::sync::Mutex;
18
19use crate::error::IndexerError;
20
21/// Durable store for firehose cursor positions.
22///
23/// One cursor per subscription id; the subscription id is whatever
24/// identifies the connection (tapped instance url, jetstream endpoint,
25/// self-hosted relay, etc.). The orchestrator treats
26/// `Ok(None)` from `load` as "start from the beginning", which
27/// implementations may interpret as "latest cursor the server offers"
28/// for live streams.
29#[allow(async_fn_in_trait)]
30pub trait CursorStore: Send + Sync {
31    /// Load the last committed cursor for `subscription_id`, if any.
32    ///
33    /// # Errors
34    ///
35    /// Returns [`IndexerError::Cursor`] when the backing store fails.
36    async fn load(&self, subscription_id: &str) -> Result<Option<u64>, IndexerError>;
37
38    /// Commit `seq` as the new cursor for `subscription_id`.
39    ///
40    /// # Errors
41    ///
42    /// Returns [`IndexerError::Cursor`] when the backing store fails.
43    async fn commit(&self, subscription_id: &str, seq: u64) -> Result<(), IndexerError>;
44
45    /// Enumerate every committed `(subscription_id, seq)` pair.
46    ///
47    /// Used by operator tooling (a `/v1/cursors` admin endpoint, a
48    /// health-check that sanity-checks cursor freshness) and by
49    /// multi-subscription indexers that want to display the cursor
50    /// fleet at a glance. The default implementation returns an
51    /// empty vec so custom backends can opt in without breaking
52    /// existing callers; every shipped store overrides it with its
53    /// native enumeration.
54    ///
55    /// # Errors
56    ///
57    /// Returns [`IndexerError::Cursor`] when the backing store fails.
58    async fn list(&self) -> Result<Vec<(String, u64)>, IndexerError> {
59        Ok(Vec::new())
60    }
61}
62
/// In-memory [`CursorStore`] keyed by subscription id, meant for test
/// fixtures and unit tests.
///
/// Nothing is persisted: every committed cursor disappears when the
/// process exits, so this store is not for production. Single-node
/// indexers typically graduate to a sqlite-backed store, and
/// highly-available deployments to Postgres or Redis.
#[derive(Debug, Default)]
pub struct InMemoryCursorStore {
    /// Subscription id → last committed sequence number.
    ///
    /// Guarded by a `std::sync::Mutex` rather than an async mutex so the
    /// default build does not need tokio. Each critical section is a
    /// plain synchronous map operation (a lookup, an insert, or a copy of
    /// the entries), and no guard is held across an `.await`, which is
    /// what makes a blocking mutex acceptable inside `async fn` bodies.
    cursors: Mutex<HashMap<String, u64>>,
}

impl InMemoryCursorStore {
    /// Create a store with no cursors recorded.
    #[must_use]
    pub fn new() -> Self {
        Self {
            cursors: Mutex::new(HashMap::new()),
        }
    }

    /// Record `seq` as the cursor for `subscription_id`, overwriting any
    /// previous value; handy for seeding a replay scenario in tests.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex was poisoned by a thread that
    /// panicked while holding it.
    pub fn insert(&self, subscription_id: &str, seq: u64) {
        let key = subscription_id.to_owned();
        self.cursors
            .lock()
            .expect("cursors mutex poisoned")
            .insert(key, seq);
    }
}
96
97/// Forward [`CursorStore`] through a shared `Arc<T>` so an `Arc<dyn
98/// CursorStore>` (or any concrete `Arc<SqliteCursorStore>`, etc.) can
99/// be passed anywhere a `CursorStore: Send + Sync` is expected. The
100/// `?Sized` bound lets the impl cover trait objects directly.
101impl<T: CursorStore + ?Sized> CursorStore for std::sync::Arc<T> {
102    async fn load(&self, subscription_id: &str) -> Result<Option<u64>, IndexerError> {
103        (**self).load(subscription_id).await
104    }
105
106    async fn commit(&self, subscription_id: &str, seq: u64) -> Result<(), IndexerError> {
107        (**self).commit(subscription_id, seq).await
108    }
109
110    async fn list(&self) -> Result<Vec<(String, u64)>, IndexerError> {
111        (**self).list().await
112    }
113}
114
115impl CursorStore for InMemoryCursorStore {
116    async fn load(&self, subscription_id: &str) -> Result<Option<u64>, IndexerError> {
117        // scope the lock guard so clippy's significant-drop-tightening
118        // sees the guard drop before the return point.
119        let seq = {
120            let cursors = self.cursors.lock().expect("cursors mutex poisoned");
121            cursors.get(subscription_id).copied()
122        };
123
124        Ok(seq)
125    }
126
127    async fn commit(&self, subscription_id: &str, seq: u64) -> Result<(), IndexerError> {
128        {
129            let mut cursors = self.cursors.lock().expect("cursors mutex poisoned");
130            cursors.insert(subscription_id.to_owned(), seq);
131        }
132
133        Ok(())
134    }
135
136    async fn list(&self) -> Result<Vec<(String, u64)>, IndexerError> {
137        let cursors = self.cursors.lock().expect("cursors mutex poisoned");
138        let mut out: Vec<(String, u64)> = cursors.iter().map(|(k, v)| (k.clone(), *v)).collect();
139        // Deterministic ordering so tests can compare directly.
140        out.sort_by(|a, b| a.0.cmp(&b.0));
141        Ok(out)
142    }
143}