use std::collections::HashMap;
use std::sync::Mutex;
use crate::error::IndexerError;
#[allow(async_fn_in_trait)]
pub trait CursorStore: Send + Sync {
async fn load(&self, subscription_id: &str) -> Result<Option<u64>, IndexerError>;
async fn commit(&self, subscription_id: &str, seq: u64) -> Result<(), IndexerError>;
async fn list(&self) -> Result<Vec<(String, u64)>, IndexerError> {
Ok(Vec::new())
}
}
#[derive(Debug, Default)]
pub struct InMemoryCursorStore {
cursors: Mutex<HashMap<String, u64>>,
}
impl InMemoryCursorStore {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn insert(&self, subscription_id: &str, seq: u64) {
let mut cursors = self.cursors.lock().expect("cursors mutex poisoned");
cursors.insert(subscription_id.to_owned(), seq);
}
}
impl<T: CursorStore + ?Sized> CursorStore for std::sync::Arc<T> {
async fn load(&self, subscription_id: &str) -> Result<Option<u64>, IndexerError> {
(**self).load(subscription_id).await
}
async fn commit(&self, subscription_id: &str, seq: u64) -> Result<(), IndexerError> {
(**self).commit(subscription_id, seq).await
}
async fn list(&self) -> Result<Vec<(String, u64)>, IndexerError> {
(**self).list().await
}
}
impl CursorStore for InMemoryCursorStore {
async fn load(&self, subscription_id: &str) -> Result<Option<u64>, IndexerError> {
let seq = {
let cursors = self.cursors.lock().expect("cursors mutex poisoned");
cursors.get(subscription_id).copied()
};
Ok(seq)
}
async fn commit(&self, subscription_id: &str, seq: u64) -> Result<(), IndexerError> {
{
let mut cursors = self.cursors.lock().expect("cursors mutex poisoned");
cursors.insert(subscription_id.to_owned(), seq);
}
Ok(())
}
async fn list(&self) -> Result<Vec<(String, u64)>, IndexerError> {
let cursors = self.cursors.lock().expect("cursors mutex poisoned");
let mut out: Vec<(String, u64)> = cursors.iter().map(|(k, v)| (k.clone(), *v)).collect();
out.sort_by(|a, b| a.0.cmp(&b.0));
Ok(out)
}
}