idiolect_indexer/cursor.rs
//! The [`CursorStore`] boundary — where the firehose sequence number
//! lives across indexer restarts.
//!
//! The indexer commits a cursor after each handled live event so that
//! a restart can resume from the last processed commit instead of
//! replaying from the relay's retention floor. Cursor commits do not
//! advance on backfill events: those are already replayed by tap
//! whenever a repo is resynced, so committing them would produce
//! misleading "we are caught up" telemetry.
//!
//! The shape is the same ephemeral-state adapter pattern used for
//! OAuth tokens and identity resolution: one narrow async trait with
//! `Send + Sync` bounds, an in-memory impl for tests, and production
//! backends plugged in behind the same signature.
15
16use std::collections::HashMap;
17use std::sync::Mutex;
18
19use crate::error::IndexerError;
20
21/// Durable store for firehose cursor positions.
22///
23/// One cursor per subscription id; the subscription id is whatever
24/// identifies the connection (tapped instance url, jetstream endpoint,
25/// self-hosted relay, etc.). The orchestrator treats
26/// `Ok(None)` from `load` as "start from the beginning", which
27/// implementations may interpret as "latest cursor the server offers"
28/// for live streams.
29#[allow(async_fn_in_trait)]
30pub trait CursorStore: Send + Sync {
31 /// Load the last committed cursor for `subscription_id`, if any.
32 ///
33 /// # Errors
34 ///
35 /// Returns [`IndexerError::Cursor`] when the backing store fails.
36 async fn load(&self, subscription_id: &str) -> Result<Option<u64>, IndexerError>;
37
38 /// Commit `seq` as the new cursor for `subscription_id`.
39 ///
40 /// # Errors
41 ///
42 /// Returns [`IndexerError::Cursor`] when the backing store fails.
43 async fn commit(&self, subscription_id: &str, seq: u64) -> Result<(), IndexerError>;
44
45 /// Enumerate every committed `(subscription_id, seq)` pair.
46 ///
47 /// Used by operator tooling (a `/v1/cursors` admin endpoint, a
48 /// health-check that sanity-checks cursor freshness) and by
49 /// multi-subscription indexers that want to display the cursor
50 /// fleet at a glance. The default implementation returns an
51 /// empty vec so custom backends can opt in without breaking
52 /// existing callers; every shipped store overrides it with its
53 /// native enumeration.
54 ///
55 /// # Errors
56 ///
57 /// Returns [`IndexerError::Cursor`] when the backing store fails.
58 async fn list(&self) -> Result<Vec<(String, u64)>, IndexerError> {
59 Ok(Vec::new())
60 }
61}
62
/// In-memory [`CursorStore`] built on a `HashMap`, meant for fixtures
/// and unit tests.
///
/// Nothing is persisted: committed cursors vanish when the process
/// exits, so this type is not suitable for production. Single-node
/// indexers usually move to a sqlite-backed impl next; HA deployments
/// to Postgres or Redis.
#[derive(Default, Debug)]
pub struct InMemoryCursorStore {
    /// Latest cursor for each subscription id, guarded by a standard
    /// (synchronous) mutex.
    ///
    /// A sync mutex inside async trait methods is a deliberate trade:
    /// the guard only covers brief `HashMap` operations, and using
    /// `std::sync::Mutex` keeps tokio out of the default build.
    cursors: Mutex<HashMap<String, u64>>,
}
77
78impl InMemoryCursorStore {
79 /// Construct an empty store.
80 #[must_use]
81 pub fn new() -> Self {
82 Self::default()
83 }
84
85 /// Seed the store with an initial cursor, e.g. for test replay.
86 ///
87 /// # Panics
88 ///
89 /// Panics if the internal mutex has been poisoned by a previous
90 /// panic in another thread.
91 pub fn insert(&self, subscription_id: &str, seq: u64) {
92 let mut cursors = self.cursors.lock().expect("cursors mutex poisoned");
93 cursors.insert(subscription_id.to_owned(), seq);
94 }
95}
96
97/// Forward [`CursorStore`] through a shared `Arc<T>` so an `Arc<dyn
98/// CursorStore>` (or any concrete `Arc<SqliteCursorStore>`, etc.) can
99/// be passed anywhere a `CursorStore: Send + Sync` is expected. The
100/// `?Sized` bound lets the impl cover trait objects directly.
101impl<T: CursorStore + ?Sized> CursorStore for std::sync::Arc<T> {
102 async fn load(&self, subscription_id: &str) -> Result<Option<u64>, IndexerError> {
103 (**self).load(subscription_id).await
104 }
105
106 async fn commit(&self, subscription_id: &str, seq: u64) -> Result<(), IndexerError> {
107 (**self).commit(subscription_id, seq).await
108 }
109
110 async fn list(&self) -> Result<Vec<(String, u64)>, IndexerError> {
111 (**self).list().await
112 }
113}
114
115impl CursorStore for InMemoryCursorStore {
116 async fn load(&self, subscription_id: &str) -> Result<Option<u64>, IndexerError> {
117 // scope the lock guard so clippy's significant-drop-tightening
118 // sees the guard drop before the return point.
119 let seq = {
120 let cursors = self.cursors.lock().expect("cursors mutex poisoned");
121 cursors.get(subscription_id).copied()
122 };
123
124 Ok(seq)
125 }
126
127 async fn commit(&self, subscription_id: &str, seq: u64) -> Result<(), IndexerError> {
128 {
129 let mut cursors = self.cursors.lock().expect("cursors mutex poisoned");
130 cursors.insert(subscription_id.to_owned(), seq);
131 }
132
133 Ok(())
134 }
135
136 async fn list(&self) -> Result<Vec<(String, u64)>, IndexerError> {
137 let cursors = self.cursors.lock().expect("cursors mutex poisoned");
138 let mut out: Vec<(String, u64)> = cursors.iter().map(|(k, v)| (k.clone(), *v)).collect();
139 // Deterministic ordering so tests can compare directly.
140 out.sort_by(|a, b| a.0.cmp(&b.0));
141 Ok(out)
142 }
143}