wesichain_graph/
file_checkpointer.rs1use ahash::RandomState;
2use std::fs::{self, File, OpenOptions};
3
4use std::io::{BufRead, BufReader, Write};
5use std::path::{Path, PathBuf};
6
7use serde::{Deserialize, Serialize};
8
9use crate::{Checkpoint, CheckpointMetadata, Checkpointer, HistoryCheckpointer, StateSchema};
10use wesichain_core::WesichainError;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
13#[serde(bound = "S: StateSchema")]
14pub struct CheckpointRecord<S: StateSchema> {
15 pub seq: u64,
16 pub created_at: String,
17 pub checkpoint: Checkpoint<S>,
18}
19
/// [`Checkpointer`] that keeps one append-only `.jsonl` file per thread
/// inside `base_dir`.
#[derive(Debug, Clone)]
pub struct FileCheckpointer {
    /// Directory holding the per-thread files; it is created on save, not on construction.
    base_dir: PathBuf,
}
24
25impl FileCheckpointer {
26 pub fn new<P: AsRef<Path>>(base_dir: P) -> Self {
27 Self {
28 base_dir: base_dir.as_ref().to_path_buf(),
29 }
30 }
31
32 fn sanitize_thread_id(thread_id: &str) -> String {
33 let mut out = String::with_capacity(thread_id.len());
34 for ch in thread_id.chars() {
35 match ch {
36 '/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' => out.push('_'),
37 c if c.is_control() => {}
38 c => out.push(c),
39 }
40 }
41 let trimmed = out.trim_matches(|c: char| c == '.' || c.is_whitespace() || c == '_');
42 if trimmed.is_empty() {
43 let hash = RandomState::with_seeds(0x517cc1b727220a95, 0x6ed9eba1999cd92d, 0, 0)
44 .hash_one(thread_id);
45 return format!("thread-{:08x}", hash);
46 }
47 trimmed.to_string()
48 }
49
50 fn thread_path(&self, thread_id: &str) -> PathBuf {
51 let filename = format!("{}.jsonl", Self::sanitize_thread_id(thread_id));
52 self.base_dir.join(filename)
53 }
54
55 fn next_seq<S: StateSchema>(&self, thread_id: &str) -> Result<u64, WesichainError> {
56 let path = self.thread_path(thread_id);
57 if !path.exists() {
58 return Ok(1);
59 }
60 let file =
61 File::open(&path).map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
62 let reader = BufReader::new(file);
63 let mut last: Option<CheckpointRecord<S>> = None;
64 for line in reader.lines() {
65 let line = line.map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
66 if line.trim().is_empty() {
67 continue;
68 }
69 last = Some(
70 serde_json::from_str(&line)
71 .map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?,
72 );
73 }
74 Ok(last.map(|record| record.seq + 1).unwrap_or(1))
75 }
76}
77
78#[async_trait::async_trait]
79impl<S: StateSchema> Checkpointer<S> for FileCheckpointer {
80 async fn save(&self, checkpoint: &Checkpoint<S>) -> Result<(), WesichainError> {
81 fs::create_dir_all(&self.base_dir)
82 .map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
83
84 let path = self.thread_path(&checkpoint.thread_id);
85 let seq = self.next_seq::<S>(&checkpoint.thread_id)?;
86 let record = CheckpointRecord {
87 seq,
88 created_at: checkpoint.created_at.clone(),
89 checkpoint: checkpoint.clone(),
90 };
91 let mut file = OpenOptions::new()
92 .create(true)
93 .append(true)
94 .open(&path)
95 .map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
96 let line = serde_json::to_string(&record)
97 .map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
98 file.write_all(format!("{line}\n").as_bytes())
99 .map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
100 Ok(())
101 }
102
103 async fn load(&self, thread_id: &str) -> Result<Option<Checkpoint<S>>, WesichainError> {
104 let path = self.thread_path(thread_id);
105 if !path.exists() {
106 return Ok(None);
107 }
108 let file =
109 File::open(&path).map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
110 let reader = BufReader::new(file);
111 let mut last: Option<CheckpointRecord<S>> = None;
112 for line in reader.lines() {
113 let line = line.map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
114 if line.trim().is_empty() {
115 continue;
116 }
117 last = Some(
118 serde_json::from_str(&line)
119 .map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?,
120 );
121 }
122 Ok(last.map(|record| record.checkpoint))
123 }
124}
125
126#[async_trait::async_trait]
127impl<S: StateSchema> HistoryCheckpointer<S> for FileCheckpointer {
128 async fn list_checkpoints(
129 &self,
130 thread_id: &str,
131 ) -> Result<Vec<CheckpointMetadata>, WesichainError> {
132 let path = self.thread_path(thread_id);
133 if !path.exists() {
134 return Ok(Vec::new());
135 }
136 let file =
137 File::open(&path).map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
138 let reader = BufReader::new(file);
139 let mut history = Vec::new();
140 for line in reader.lines() {
141 let line = line.map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
142 if line.trim().is_empty() {
143 continue;
144 }
145 let record: CheckpointRecord<S> = serde_json::from_str(&line)
146 .map_err(|err| WesichainError::CheckpointFailed(err.to_string()))?;
147 history.push(CheckpointMetadata {
148 seq: record.seq,
149 created_at: record.created_at,
150 });
151 }
152 Ok(history)
153 }
154}