use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::error::IndexerError;
/// Snapshot of a block position (number and hash) recorded for one indexer
/// on one chain.
///
/// Derives serde's `Serialize` and `Deserialize` in addition to `Debug` and
/// `Clone`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Checkpoint {
/// Identifier of the chain the checkpoint belongs to.
pub chain_id: String,
/// Identifier of the indexer that recorded the checkpoint.
pub indexer_id: String,
/// Block number recorded by the checkpoint.
pub block_number: u64,
/// Block hash recorded by the checkpoint, stored as a string.
pub block_hash: String,
/// Time of the last update. `CheckpointManager::force_save` fills this with
/// `chrono::Utc::now().timestamp()`, i.e. a Unix timestamp in seconds.
pub updated_at: i64,
}
/// Asynchronous storage backend for [`Checkpoint`] records, addressed by a
/// `(chain_id, indexer_id)` pair.
///
/// Implementors must be `Send + Sync`.
///
/// NOTE(review): the exact contract is only demonstrated by the in-memory
/// implementation in this file (missing entries load as `Ok(None)`, saving
/// overwrites, deleting a missing entry succeeds). Other backends are
/// presumably expected to match; confirm.
#[async_trait]
pub trait CheckpointStore: Send + Sync {
/// Looks up the checkpoint stored for `chain_id` / `indexer_id`.
///
/// Returns `Ok(Some(_))` with the stored checkpoint, or `Ok(None)`
/// (presumably meaning nothing is stored for that pair).
async fn load(
&self,
chain_id: &str,
indexer_id: &str,
) -> Result<Option<Checkpoint>, IndexerError>;
/// Persists `checkpoint`, taking ownership of it.
async fn save(&self, checkpoint: Checkpoint) -> Result<(), IndexerError>;
/// Removes the checkpoint stored for `chain_id` / `indexer_id`.
async fn delete(&self, chain_id: &str, indexer_id: &str) -> Result<(), IndexerError>;
}
/// Wraps a [`CheckpointStore`] for a single `(chain_id, indexer_id)` pair and
/// throttles how often checkpoints are written.
pub struct CheckpointManager {
// Backend that checkpoints are loaded from and saved to.
store: Box<dyn CheckpointStore>,
// Chain identifier stamped onto every checkpoint this manager writes.
chain_id: String,
// Indexer identifier stamped onto every checkpoint this manager writes.
indexer_id: String,
// `maybe_save` persists once `counter` reaches this value.
save_interval: u64,
// Number of `maybe_save` calls since the last save it triggered.
counter: u64,
}
impl CheckpointManager {
    /// Builds a manager bound to one chain/indexer pair.
    ///
    /// `save_interval` is the number of [`maybe_save`](Self::maybe_save) calls
    /// needed to trigger a write; the internal call counter starts at zero.
    pub fn new(
        store: Box<dyn CheckpointStore>,
        chain_id: impl Into<String>,
        indexer_id: impl Into<String>,
        save_interval: u64,
    ) -> Self {
        let chain_id = chain_id.into();
        let indexer_id = indexer_id.into();
        Self {
            store,
            chain_id,
            indexer_id,
            save_interval,
            counter: 0,
        }
    }

    /// Fetches this manager's checkpoint from the underlying store.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the store's `load` returns.
    pub async fn load(&self) -> Result<Option<Checkpoint>, IndexerError> {
        let chain = self.chain_id.as_str();
        let indexer = self.indexer_id.as_str();
        self.store.load(chain, indexer).await
    }

    /// Registers one call and writes a checkpoint once `save_interval` calls
    /// have accumulated.
    ///
    /// The counter is reset only after a successful write, so a failed save
    /// is attempted again on the next call. With a `save_interval` of 0 or 1
    /// every call writes.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`force_save`](Self::force_save).
    pub async fn maybe_save(
        &mut self,
        block_number: u64,
        block_hash: &str,
    ) -> Result<(), IndexerError> {
        self.counter += 1;

        // Not enough calls since the last write: nothing to do yet.
        if self.counter < self.save_interval {
            return Ok(());
        }

        self.force_save(block_number, block_hash).await?;
        self.counter = 0;
        Ok(())
    }

    /// Writes a checkpoint for the given block immediately, stamped with the
    /// current UTC time.
    ///
    /// Takes `&self`, so the call counter used by
    /// [`maybe_save`](Self::maybe_save) is left untouched.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the store's `save` returns.
    pub async fn force_save(
        &self,
        block_number: u64,
        block_hash: &str,
    ) -> Result<(), IndexerError> {
        let updated_at = chrono::Utc::now().timestamp();
        let checkpoint = Checkpoint {
            chain_id: self.chain_id.clone(),
            indexer_id: self.indexer_id.clone(),
            block_number,
            block_hash: block_hash.to_owned(),
            updated_at,
        };
        self.store.save(checkpoint).await
    }
}
use std::collections::HashMap;
use std::sync::Mutex;
/// [`CheckpointStore`] implementation that keeps checkpoints in a
/// `Mutex`-guarded `HashMap` in process memory.
///
/// `Default` yields an empty store.
#[derive(Default)]
pub struct MemoryCheckpointStore {
// Stored checkpoints, keyed by the string built by `MemoryCheckpointStore::key`.
data: Mutex<HashMap<String, Checkpoint>>,
}
impl MemoryCheckpointStore {
    /// Creates an empty in-memory store.
    pub fn new() -> Self {
        Self::default()
    }

    /// Builds the map key for a `(chain_id, indexer_id)` pair.
    ///
    /// The key starts with the byte length of `chain_id`, so the boundary
    /// between the two identifiers can always be recovered. Joining them with
    /// a bare `:` would make distinct pairs collide whenever an identifier
    /// contains `:` (for example `("eip155:1", "idx")` and
    /// `("eip155", "1:idx")`), letting one indexer's checkpoint overwrite
    /// another's.
    fn key(chain_id: &str, indexer_id: &str) -> String {
        format!("{}:{chain_id}:{indexer_id}", chain_id.len())
    }
}
/// In-memory backend: every operation is a single map access under the mutex.
///
/// # Panics
///
/// Each method panics if the internal mutex has been poisoned.
#[async_trait]
impl CheckpointStore for MemoryCheckpointStore {
    /// Returns a clone of the stored checkpoint, or `Ok(None)` if the pair
    /// has no entry. Never returns `Err`.
    async fn load(
        &self,
        chain_id: &str,
        indexer_id: &str,
    ) -> Result<Option<Checkpoint>, IndexerError> {
        let lookup = Self::key(chain_id, indexer_id);
        let found = self.data.lock().unwrap().get(&lookup).cloned();
        Ok(found)
    }

    /// Inserts `checkpoint` under the key derived from its own `chain_id`
    /// and `indexer_id`, replacing any previous entry. Never returns `Err`.
    async fn save(&self, checkpoint: Checkpoint) -> Result<(), IndexerError> {
        let slot = Self::key(&checkpoint.chain_id, &checkpoint.indexer_id);
        let mut entries = self.data.lock().unwrap();
        entries.insert(slot, checkpoint);
        Ok(())
    }

    /// Removes the entry for the pair if present; removing a missing entry
    /// is not an error. Never returns `Err`.
    async fn delete(&self, chain_id: &str, indexer_id: &str) -> Result<(), IndexerError> {
        let target = Self::key(chain_id, indexer_id);
        let mut entries = self.data.lock().unwrap();
        entries.remove(&target);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A forced save on an empty store must be readable back unchanged.
    #[tokio::test]
    async fn memory_store_roundtrip() {
        let manager = CheckpointManager::new(
            Box::new(MemoryCheckpointStore::new()),
            "ethereum",
            "my-indexer",
            10,
        );

        // Nothing has been written yet.
        let before = mgr_load(&manager).await;
        assert!(before.is_none());

        manager.force_save(1000, "0xabc").await.unwrap();

        let stored = mgr_load(&manager).await.unwrap();
        assert_eq!(stored.block_number, 1000);
        assert_eq!(stored.block_hash, "0xabc");
        assert_eq!(stored.chain_id, "ethereum");
    }

    /// With an interval of 5, the first four calls write nothing and the
    /// fifth persists the block it was given.
    #[tokio::test]
    async fn checkpoint_save_interval() {
        let mut manager = CheckpointManager::new(
            Box::new(MemoryCheckpointStore::new()),
            "ethereum",
            "idx",
            5,
        );

        for block in 1..5u64 {
            manager.maybe_save(block, "0xhash").await.unwrap();
        }
        assert!(mgr_load(&manager).await.is_none());

        manager.maybe_save(5, "0xhash5").await.unwrap();
        let stored = mgr_load(&manager).await.unwrap();
        assert_eq!(stored.block_number, 5);
    }

    /// Loads through the manager and unwraps the store result.
    async fn mgr_load(manager: &CheckpointManager) -> Option<Checkpoint> {
        manager.load().await.unwrap()
    }
}