Skip to main content

chainindex_core/
checkpoint.rs

//! Checkpoint manager — persists the indexer's position for crash recovery.
//!
//! A checkpoint stores the last successfully processed block number and hash.
//! On restart, the indexer resumes from the last checkpoint rather than
//! re-indexing from scratch.
6
7use async_trait::async_trait;
8use serde::{Deserialize, Serialize};
9
10use crate::error::IndexerError;
11
12/// A persisted checkpoint for an indexer.
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct Checkpoint {
15    /// Chain slug (e.g. `"ethereum"`).
16    pub chain_id: String,
17    /// Unique indexer identifier.
18    pub indexer_id: String,
19    /// Last successfully processed block number.
20    pub block_number: u64,
21    /// Last successfully processed block hash.
22    pub block_hash: String,
23    /// Unix timestamp of when this checkpoint was saved.
24    pub updated_at: i64,
25}
26
27/// Trait for storing and loading checkpoints.
28///
29/// Implementations include `MemoryCheckpointStore`, `SqliteCheckpointStore`,
30/// and `PostgresCheckpointStore`.
31#[async_trait]
32pub trait CheckpointStore: Send + Sync {
33    /// Load the latest checkpoint for a given chain + indexer pair.
34    async fn load(
35        &self,
36        chain_id: &str,
37        indexer_id: &str,
38    ) -> Result<Option<Checkpoint>, IndexerError>;
39
40    /// Save (upsert) a checkpoint.
41    async fn save(&self, checkpoint: Checkpoint) -> Result<(), IndexerError>;
42
43    /// Delete a checkpoint (e.g. when resetting an indexer).
44    async fn delete(&self, chain_id: &str, indexer_id: &str) -> Result<(), IndexerError>;
45}
46
47/// Manages checkpoint reads/writes for an indexer.
48pub struct CheckpointManager {
49    store: Box<dyn CheckpointStore>,
50    chain_id: String,
51    indexer_id: String,
52    /// How often to save (every N blocks).
53    save_interval: u64,
54    /// Block counter since last save.
55    counter: u64,
56}
57
58impl CheckpointManager {
59    pub fn new(
60        store: Box<dyn CheckpointStore>,
61        chain_id: impl Into<String>,
62        indexer_id: impl Into<String>,
63        save_interval: u64,
64    ) -> Self {
65        Self {
66            store,
67            chain_id: chain_id.into(),
68            indexer_id: indexer_id.into(),
69            save_interval,
70            counter: 0,
71        }
72    }
73
74    /// Load the saved checkpoint (returns `None` if none exists).
75    pub async fn load(&self) -> Result<Option<Checkpoint>, IndexerError> {
76        self.store.load(&self.chain_id, &self.indexer_id).await
77    }
78
79    /// Conditionally save a checkpoint every `save_interval` blocks.
80    ///
81    /// Call this after each block is successfully processed.
82    pub async fn maybe_save(
83        &mut self,
84        block_number: u64,
85        block_hash: &str,
86    ) -> Result<(), IndexerError> {
87        self.counter += 1;
88        if self.counter >= self.save_interval {
89            self.force_save(block_number, block_hash).await?;
90            self.counter = 0;
91        }
92        Ok(())
93    }
94
95    /// Immediately save a checkpoint (used on shutdown / reorg recovery).
96    pub async fn force_save(
97        &self,
98        block_number: u64,
99        block_hash: &str,
100    ) -> Result<(), IndexerError> {
101        let cp = Checkpoint {
102            chain_id: self.chain_id.clone(),
103            indexer_id: self.indexer_id.clone(),
104            block_number,
105            block_hash: block_hash.to_string(),
106            updated_at: chrono::Utc::now().timestamp(),
107        };
108        self.store.save(cp).await
109    }
110}
111
// ─── In-memory store (for testing) ────────────────────────────────────────────
113
114use std::collections::HashMap;
115use std::sync::Mutex;
116
117/// In-memory checkpoint store for tests and ephemeral indexers.
118#[derive(Default)]
119pub struct MemoryCheckpointStore {
120    data: Mutex<HashMap<String, Checkpoint>>,
121}
122
123impl MemoryCheckpointStore {
124    pub fn new() -> Self {
125        Self::default()
126    }
127
128    fn key(chain_id: &str, indexer_id: &str) -> String {
129        format!("{chain_id}:{indexer_id}")
130    }
131}
132
133#[async_trait]
134impl CheckpointStore for MemoryCheckpointStore {
135    async fn load(
136        &self,
137        chain_id: &str,
138        indexer_id: &str,
139    ) -> Result<Option<Checkpoint>, IndexerError> {
140        Ok(self
141            .data
142            .lock()
143            .unwrap()
144            .get(&Self::key(chain_id, indexer_id))
145            .cloned())
146    }
147
148    async fn save(&self, checkpoint: Checkpoint) -> Result<(), IndexerError> {
149        let key = Self::key(&checkpoint.chain_id, &checkpoint.indexer_id);
150        self.data.lock().unwrap().insert(key, checkpoint);
151        Ok(())
152    }
153
154    async fn delete(&self, chain_id: &str, indexer_id: &str) -> Result<(), IndexerError> {
155        self.data
156            .lock()
157            .unwrap()
158            .remove(&Self::key(chain_id, indexer_id));
159        Ok(())
160    }
161}
162
#[cfg(test)]
mod tests {
    use super::*;

    /// A force-saved checkpoint can be loaded back with the same fields.
    #[tokio::test]
    async fn memory_store_roundtrip() {
        let mgr = CheckpointManager::new(
            Box::new(MemoryCheckpointStore::new()),
            "ethereum",
            "my-indexer",
            10,
        );

        // A fresh store holds nothing for this indexer.
        let initial = mgr.load().await.unwrap();
        assert!(initial.is_none());

        // Write unconditionally, bypassing the save interval.
        mgr.force_save(1000, "0xabc").await.unwrap();

        // The stored checkpoint carries what was just written.
        let cp = mgr.load().await.unwrap().unwrap();
        assert_eq!(cp.block_number, 1000);
        assert_eq!(cp.block_hash, "0xabc");
        assert_eq!(cp.chain_id, "ethereum");
    }

    /// `maybe_save` persists only once the configured interval is reached.
    #[tokio::test]
    async fn checkpoint_save_interval() {
        let mut mgr = CheckpointManager::new(
            Box::new(MemoryCheckpointStore::new()),
            "ethereum",
            "idx",
            5,
        );

        // Blocks 1 through 4 stay below the interval: nothing is stored.
        for block in 1..=4 {
            mgr.maybe_save(block, "0xhash").await.unwrap();
        }
        let before = mgr.load().await.unwrap();
        assert!(before.is_none());

        // The fifth block reaches the interval and triggers the save.
        mgr.maybe_save(5, "0xhash5").await.unwrap();
        let cp = mgr.load().await.unwrap().unwrap();
        assert_eq!(cp.block_number, 5);
    }
}