chainindex_core/
checkpoint.rs1use async_trait::async_trait;
8use serde::{Deserialize, Serialize};
9
10use crate::error::IndexerError;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct Checkpoint {
15 pub chain_id: String,
17 pub indexer_id: String,
19 pub block_number: u64,
21 pub block_hash: String,
23 pub updated_at: i64,
25}
26
27#[async_trait]
32pub trait CheckpointStore: Send + Sync {
33 async fn load(
35 &self,
36 chain_id: &str,
37 indexer_id: &str,
38 ) -> Result<Option<Checkpoint>, IndexerError>;
39
40 async fn save(&self, checkpoint: Checkpoint) -> Result<(), IndexerError>;
42
43 async fn delete(&self, chain_id: &str, indexer_id: &str) -> Result<(), IndexerError>;
45}
46
47pub struct CheckpointManager {
49 store: Box<dyn CheckpointStore>,
50 chain_id: String,
51 indexer_id: String,
52 save_interval: u64,
54 counter: u64,
56}
57
58impl CheckpointManager {
59 pub fn new(
60 store: Box<dyn CheckpointStore>,
61 chain_id: impl Into<String>,
62 indexer_id: impl Into<String>,
63 save_interval: u64,
64 ) -> Self {
65 Self {
66 store,
67 chain_id: chain_id.into(),
68 indexer_id: indexer_id.into(),
69 save_interval,
70 counter: 0,
71 }
72 }
73
74 pub async fn load(&self) -> Result<Option<Checkpoint>, IndexerError> {
76 self.store.load(&self.chain_id, &self.indexer_id).await
77 }
78
79 pub async fn maybe_save(
83 &mut self,
84 block_number: u64,
85 block_hash: &str,
86 ) -> Result<(), IndexerError> {
87 self.counter += 1;
88 if self.counter >= self.save_interval {
89 self.force_save(block_number, block_hash).await?;
90 self.counter = 0;
91 }
92 Ok(())
93 }
94
95 pub async fn force_save(
97 &self,
98 block_number: u64,
99 block_hash: &str,
100 ) -> Result<(), IndexerError> {
101 let cp = Checkpoint {
102 chain_id: self.chain_id.clone(),
103 indexer_id: self.indexer_id.clone(),
104 block_number,
105 block_hash: block_hash.to_string(),
106 updated_at: chrono::Utc::now().timestamp(),
107 };
108 self.store.save(cp).await
109 }
110}
111
112use std::collections::HashMap;
115use std::sync::Mutex;
116
117#[derive(Default)]
119pub struct MemoryCheckpointStore {
120 data: Mutex<HashMap<String, Checkpoint>>,
121}
122
123impl MemoryCheckpointStore {
124 pub fn new() -> Self {
125 Self::default()
126 }
127
128 fn key(chain_id: &str, indexer_id: &str) -> String {
129 format!("{chain_id}:{indexer_id}")
130 }
131}
132
133#[async_trait]
134impl CheckpointStore for MemoryCheckpointStore {
135 async fn load(
136 &self,
137 chain_id: &str,
138 indexer_id: &str,
139 ) -> Result<Option<Checkpoint>, IndexerError> {
140 Ok(self
141 .data
142 .lock()
143 .unwrap()
144 .get(&Self::key(chain_id, indexer_id))
145 .cloned())
146 }
147
148 async fn save(&self, checkpoint: Checkpoint) -> Result<(), IndexerError> {
149 let key = Self::key(&checkpoint.chain_id, &checkpoint.indexer_id);
150 self.data.lock().unwrap().insert(key, checkpoint);
151 Ok(())
152 }
153
154 async fn delete(&self, chain_id: &str, indexer_id: &str) -> Result<(), IndexerError> {
155 self.data
156 .lock()
157 .unwrap()
158 .remove(&Self::key(chain_id, indexer_id));
159 Ok(())
160 }
161}
162
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager for the `"ethereum"` chain over a fresh in-memory store.
    fn in_memory_manager(indexer_id: &str, save_interval: u64) -> CheckpointManager {
        let backend = Box::new(MemoryCheckpointStore::new());
        CheckpointManager::new(backend, "ethereum", indexer_id, save_interval)
    }

    #[tokio::test]
    async fn memory_store_roundtrip() {
        let manager = in_memory_manager("my-indexer", 10);

        // Nothing has been written yet.
        let before = manager.load().await.unwrap();
        assert!(before.is_none());

        // An explicit save persists regardless of the interval.
        manager.force_save(1000, "0xabc").await.unwrap();

        // The stored checkpoint carries exactly what was saved.
        let saved = manager.load().await.unwrap().unwrap();
        assert_eq!(saved.block_number, 1000);
        assert_eq!(saved.block_hash, "0xabc");
        assert_eq!(saved.chain_id, "ethereum");
    }

    #[tokio::test]
    async fn checkpoint_save_interval() {
        let mut manager = in_memory_manager("idx", 5);

        // The first four calls stay below the interval and must not persist.
        for block in 1..=4 {
            manager.maybe_save(block, "0xhash").await.unwrap();
        }
        let before = manager.load().await.unwrap();
        assert!(before.is_none());

        // The fifth call reaches the interval and triggers the save.
        manager.maybe_save(5, "0xhash5").await.unwrap();
        let saved = manager.load().await.unwrap().unwrap();
        assert_eq!(saved.block_number, 5);
    }
}