Skip to main content

wesichain_graph/
file_checkpointer.rs

1use ahash::RandomState;
2use std::fs::{self, File, OpenOptions};
3
4use std::io::{BufRead, BufReader, Write};
5use std::path::{Path, PathBuf};
6
7use serde::{Deserialize, Serialize};
8
9use crate::{Checkpoint, CheckpointMetadata, Checkpointer, HistoryCheckpointer, StateSchema};
10use wesichain_core::WesichainError;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
13#[serde(bound = "S: StateSchema")]
14pub struct CheckpointRecord<S: StateSchema> {
15    pub seq: u64,
16    pub created_at: String,
17    pub checkpoint: Checkpoint<S>,
18}
19
/// Checkpointer that persists each thread as an append-only `.jsonl` file.
///
/// All thread files live directly inside `base_dir`.
#[derive(Debug, Clone)]
pub struct FileCheckpointer {
    /// Directory holding one `<sanitized thread id>.jsonl` file per thread.
    base_dir: PathBuf,
}
24
25impl FileCheckpointer {
26    pub fn new<P: AsRef<Path>>(base_dir: P) -> Self {
27        Self {
28            base_dir: base_dir.as_ref().to_path_buf(),
29        }
30    }
31
32    fn sanitize_thread_id(thread_id: &str) -> String {
33        let mut out = String::with_capacity(thread_id.len());
34        for ch in thread_id.chars() {
35            match ch {
36                '/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' => out.push('_'),
37                c if c.is_control() => {}
38                c => out.push(c),
39            }
40        }
41        let trimmed = out.trim_matches(|c: char| c == '.' || c.is_whitespace() || c == '_');
42        if trimmed.is_empty() {
43            let hash = RandomState::with_seeds(0x517cc1b727220a95, 0x6ed9eba1999cd92d, 0, 0)
44                .hash_one(thread_id);
45            return format!("thread-{:08x}", hash);
46        }
47        trimmed.to_string()
48    }
49
50    fn thread_path(&self, thread_id: &str) -> PathBuf {
51        let filename = format!("{}.jsonl", Self::sanitize_thread_id(thread_id));
52        self.base_dir.join(filename)
53    }
54
55    fn next_seq<S: StateSchema>(&self, thread_id: &str) -> Result<u64, WesichainError> {
56        let path = self.thread_path(thread_id);
57        if !path.exists() {
58            return Ok(1);
59        }
60        let file =
61            File::open(&path).map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
62        let reader = BufReader::new(file);
63        let mut last: Option<CheckpointRecord<S>> = None;
64        for line in reader.lines() {
65            let line = line.map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
66            if line.trim().is_empty() {
67                continue;
68            }
69            last = Some(
70                serde_json::from_str(&line)
71                    .map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?,
72            );
73        }
74        Ok(last.map(|record| record.seq + 1).unwrap_or(1))
75    }
76}
77
78#[async_trait::async_trait]
79impl<S: StateSchema> Checkpointer<S> for FileCheckpointer {
80    async fn save(&self, checkpoint: &Checkpoint<S>) -> Result<(), WesichainError> {
81        fs::create_dir_all(&self.base_dir)
82            .map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
83
84        let path = self.thread_path(&checkpoint.thread_id);
85        let seq = self.next_seq::<S>(&checkpoint.thread_id)?;
86        let record = CheckpointRecord {
87            seq,
88            created_at: checkpoint.created_at.clone(),
89            checkpoint: checkpoint.clone(),
90        };
91        let mut file = OpenOptions::new()
92            .create(true)
93            .append(true)
94            .open(&path)
95            .map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
96        let line = serde_json::to_string(&record)
97            .map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
98        file.write_all(format!("{line}\n").as_bytes())
99            .map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
100        Ok(())
101    }
102
103    async fn load(&self, thread_id: &str) -> Result<Option<Checkpoint<S>>, WesichainError> {
104        let path = self.thread_path(thread_id);
105        if !path.exists() {
106            return Ok(None);
107        }
108        let file =
109            File::open(&path).map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
110        let reader = BufReader::new(file);
111        let mut last: Option<CheckpointRecord<S>> = None;
112        for line in reader.lines() {
113            let line = line.map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
114            if line.trim().is_empty() {
115                continue;
116            }
117            last = Some(
118                serde_json::from_str(&line)
119                    .map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?,
120            );
121        }
122        Ok(last.map(|record| record.checkpoint))
123    }
124}
125
126#[async_trait::async_trait]
127impl<S: StateSchema> HistoryCheckpointer<S> for FileCheckpointer {
128    async fn list_checkpoints(
129        &self,
130        thread_id: &str,
131    ) -> Result<Vec<CheckpointMetadata>, WesichainError> {
132        let path = self.thread_path(thread_id);
133        if !path.exists() {
134            return Ok(Vec::new());
135        }
136        let file =
137            File::open(&path).map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
138        let reader = BufReader::new(file);
139        let mut history = Vec::new();
140        for line in reader.lines() {
141            let line = line.map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
142            if line.trim().is_empty() {
143                continue;
144            }
145            let record: CheckpointRecord<S> = serde_json::from_str(&line)
146                .map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
147            history.push(CheckpointMetadata {
148                seq: record.seq,
149                created_at: record.created_at,
150            });
151        }
152        Ok(history)
153    }
154}