use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use serde_json::{Value, json};
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
use crate::Result;
/// Handle to a set of completed keys that is persisted to a file.
///
/// The handle only holds an `Arc` to the shared state, so cloning it yields
/// another handle to the same in-memory set and the same file path.
#[derive(Clone)]
pub struct Checkpoint {
/// Shared state (file path and in-memory key set) behind a reference count.
inner: Arc<Inner>,
}
/// State shared by every clone of a `Checkpoint`.
struct Inner {
/// File that `load` reads from and `mark_done` appends to.
path: PathBuf,
/// Keys recorded as done; guarded by an async mutex that `mark_done`
/// holds while it writes to the file.
done: Mutex<HashSet<String>>,
}
impl Checkpoint {
pub async fn load(path: impl Into<PathBuf>) -> Result<Self> {
let path = path.into();
let mut done = HashSet::new();
if let Ok(content) = tokio::fs::read_to_string(&path).await {
for line in content.lines() {
let line = line.trim();
if line.is_empty() {
continue;
}
if let Ok(v) = serde_json::from_str::<Value>(line)
&& let Some(k) = v.get("key").and_then(|x| x.as_str())
{
done.insert(k.to_string());
}
}
}
Ok(Self {
inner: Arc::new(Inner {
path,
done: Mutex::new(done),
}),
})
}
pub async fn is_done(&self, key: &str) -> bool {
self.inner.done.lock().await.contains(key)
}
pub async fn done_count(&self) -> usize {
self.inner.done.lock().await.len()
}
pub async fn mark_done(&self, key: &str, value: Option<Value>) -> Result<()> {
let mut set = self.inner.done.lock().await;
if set.contains(key) {
return Ok(());
}
let mut line = json!({ "key": key });
if let Some(v) = value {
line["value"] = v;
}
let mut text = line.to_string();
text.push('\n');
if let Some(parent) = self.inner.path.parent()
&& !parent.as_os_str().is_empty()
{
tokio::fs::create_dir_all(parent).await?;
}
let mut f = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.inner.path)
.await?;
f.write_all(text.as_bytes()).await?;
f.flush().await?;
set.insert(key.to_string());
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a scratch file path under `target/test-tmp` in the crate directory,
    /// made unique by the test `name` and the current process id.
    fn tmp_path(name: &str) -> PathBuf {
        let file_name = format!("ckpt-{}-{}.jsonl", name, std::process::id());
        PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("target")
            .join("test-tmp")
            .join(file_name)
    }

    /// Keys marked done are visible immediately and after reloading from the file.
    #[tokio::test]
    async fn mark_and_reload_roundtrip() {
        let file = tmp_path("roundtrip");
        // Remove any leftover from an earlier run; a missing file is fine.
        let _ = tokio::fs::remove_file(&file).await;

        let first = Checkpoint::load(&file).await.unwrap();
        assert_eq!(first.done_count().await, 0);

        first.mark_done("a", None).await.unwrap();
        first.mark_done("b", Some(json!({"n": 1}))).await.unwrap();
        // Marking "a" a second time must not change the count.
        first.mark_done("a", None).await.unwrap();
        assert_eq!(first.done_count().await, 2);
        assert!(first.is_done("a").await);
        assert!(!first.is_done("zzz").await);

        // A fresh handle loaded from the same file sees the same keys.
        let reloaded = Checkpoint::load(&file).await.unwrap();
        assert_eq!(reloaded.done_count().await, 2);
        assert!(reloaded.is_done("a").await && reloaded.is_done("b").await);

        let _ = tokio::fs::remove_file(&file).await;
    }

    /// Loading a path with no file behind it yields an empty checkpoint.
    #[tokio::test]
    async fn missing_file_is_empty() {
        let file = tmp_path("missing");
        let _ = tokio::fs::remove_file(&file).await;

        let checkpoint = Checkpoint::load(&file).await.unwrap();
        assert_eq!(checkpoint.done_count().await, 0);
    }
}