use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex, RwLock};
use crate::state::atomic_write::write_atomically;
use crate::state::resume_key::KEY_FORMAT_VERSION;
use crate::state::{Checkpoint, ResumeKey, StateStore, StoreError};
mod load;
mod lock;
mod merge;
use load::load_from_disk;
use lock::lock_path_for;
use merge::merge_monotonic;
const FILE_FORMAT_VERSION: u32 = 1;
const DIGEST_BYTE_LEN: usize = 32;
#[derive(Debug, Clone)]
pub struct JsonFileStore {
path: Arc<PathBuf>,
lock_path: Arc<PathBuf>,
inner: Arc<RwLock<HashMap<ResumeKey, Checkpoint>>>,
disk_write_mutex: Arc<Mutex<()>>,
}
#[derive(Debug, Serialize, Deserialize)]
struct FileFormat {
version: u32,
key_format_version: u32,
checkpoints: HashMap<String, Checkpoint>,
}
impl JsonFileStore {
pub async fn open(path: impl Into<PathBuf>) -> Result<Self, StoreError> {
let path: PathBuf = path.into();
let lock_path = lock_path_for(&path);
let read_path = path.clone();
let initial = tokio::task::spawn_blocking(move || load_from_disk(&read_path))
.await
.map_err(StoreError::BackgroundTaskFailed)??;
Ok(Self {
path: Arc::new(path),
lock_path: Arc::new(lock_path),
inner: Arc::new(RwLock::new(initial)),
disk_write_mutex: Arc::new(Mutex::new(())),
})
}
async fn write_through<F>(&self, mutate: F) -> Result<(), StoreError>
where
F: FnOnce(&mut HashMap<ResumeKey, Checkpoint>) + Send,
{
let _disk_guard = self.disk_write_mutex.lock().await;
let (pre_state, mut candidate) = {
let map = self.inner.read().await;
let pre = map.clone();
let mut c = map.clone();
mutate(&mut c);
(pre, c)
};
for (k, pre_cp) in &pre_state {
if let Some(cand_cp) = candidate.get(k) {
if cand_cp.last_committed_sequence <= pre_cp.last_committed_sequence {
candidate.insert(k.clone(), pre_cp.clone());
}
}
}
let deletes: HashMap<ResumeKey, u64> = pre_state
.iter()
.filter(|(k, _)| !candidate.contains_key(*k))
.map(|(k, cp)| (k.clone(), cp.last_committed_sequence))
.collect();
let path = (*self.path).clone();
let lock_path = (*self.lock_path).clone();
let merged = tokio::task::spawn_blocking(move || -> Result<_, StoreError> {
let lock_file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&lock_path)
.map_err(StoreError::Io)?;
let mut rw_lock = fd_lock::RwLock::new(lock_file);
let _guard = loop {
match rw_lock.write() {
Ok(g) => break g,
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(StoreError::Io(e)),
}
};
let disk_state = load_from_disk(&path)?;
let disk_snapshot = disk_state.clone();
let merged = merge_monotonic(candidate, disk_state, &deletes);
if merged != disk_snapshot {
let file = FileFormat {
version: FILE_FORMAT_VERSION,
key_format_version: KEY_FORMAT_VERSION,
checkpoints: merged
.iter()
.map(|(k, v)| (k.as_hex(), v.clone()))
.collect(),
};
let bytes = serde_json::to_vec_pretty(&file).map_err(StoreError::Encode)?;
write_atomically(&path, &bytes)?;
}
Ok(merged)
})
.await
.map_err(StoreError::BackgroundTaskFailed)??;
let mut map = self.inner.write().await;
*map = merged;
Ok(())
}
}
#[async_trait]
impl StateStore for JsonFileStore {
async fn get(&self, key: &ResumeKey) -> Result<Option<Checkpoint>, StoreError> {
let map = self.inner.read().await;
Ok(map.get(key).cloned())
}
async fn put(&self, key: &ResumeKey, checkpoint: Checkpoint) -> Result<(), StoreError> {
let key = key.clone();
self.write_through(move |map| {
map.insert(key, checkpoint);
})
.await
}
async fn delete(&self, key: &ResumeKey) -> Result<(), StoreError> {
let key = key.clone();
self.write_through(move |map| {
map.remove(&key);
})
.await
}
}
#[cfg(test)]
mod tests;