use std::collections::HashMap;
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use crate::cursor::CursorStore;
use crate::error::IndexerError;
#[derive(Debug)]
pub struct FilesystemCursorStore {
path: PathBuf,
cache: Mutex<HashMap<String, u64>>,
}
impl FilesystemCursorStore {
pub fn open<P: Into<PathBuf>>(path: P) -> Result<Self, IndexerError> {
let path = path.into();
let cache = if path.exists() {
let bytes = fs::read(&path)
.map_err(|e| IndexerError::Cursor(format!("read {}: {e}", path.display())))?;
if bytes.is_empty() {
HashMap::new()
} else {
serde_json::from_slice::<HashMap<String, u64>>(&bytes)
.map_err(|e| IndexerError::Cursor(format!("parse {}: {e}", path.display())))?
}
} else {
HashMap::new()
};
Ok(Self {
path,
cache: Mutex::new(cache),
})
}
#[must_use]
pub fn path(&self) -> &Path {
&self.path
}
fn flush(&self, cache: &HashMap<String, u64>) -> Result<(), IndexerError> {
let parent = self.path.parent().unwrap_or_else(|| Path::new("."));
let tmp = parent.join(format!(
".{}.tmp.{}",
self.path
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("cursors"),
std::process::id(),
));
let bytes = serde_json::to_vec(cache)
.map_err(|e| IndexerError::Cursor(format!("serialize cursors: {e}")))?;
{
let mut f = fs::File::create(&tmp)
.map_err(|e| IndexerError::Cursor(format!("open temp {}: {e}", tmp.display())))?;
f.write_all(&bytes)
.map_err(|e| IndexerError::Cursor(format!("write temp {}: {e}", tmp.display())))?;
f.sync_all()
.map_err(|e| IndexerError::Cursor(format!("fsync temp {}: {e}", tmp.display())))?;
}
fs::rename(&tmp, &self.path).map_err(|e| {
IndexerError::Cursor(format!(
"rename {} -> {}: {e}",
tmp.display(),
self.path.display()
))
})?;
Ok(())
}
}
impl CursorStore for FilesystemCursorStore {
async fn load(&self, subscription_id: &str) -> Result<Option<u64>, IndexerError> {
let cache = self
.cache
.lock()
.map_err(|e| IndexerError::Cursor(format!("cache mutex poisoned: {e}")))?;
Ok(cache.get(subscription_id).copied())
}
async fn commit(&self, subscription_id: &str, seq: u64) -> Result<(), IndexerError> {
let mut cache = self
.cache
.lock()
.map_err(|e| IndexerError::Cursor(format!("cache mutex poisoned: {e}")))?;
cache.insert(subscription_id.to_owned(), seq);
self.flush(&cache)?;
Ok(())
}
async fn list(&self) -> Result<Vec<(String, u64)>, IndexerError> {
let cache = self
.cache
.lock()
.map_err(|e| IndexerError::Cursor(format!("cache mutex poisoned: {e}")))?;
let mut out: Vec<(String, u64)> = cache.iter().map(|(k, v)| (k.clone(), *v)).collect();
out.sort_by(|a, b| a.0.cmp(&b.0));
Ok(out)
}
}