1use std::{
4 path::{Path, PathBuf},
5 time::{SystemTime, UNIX_EPOCH},
6};
7
8use bob_core::{
9 error::StoreError,
10 ports::SessionStore,
11 types::{SessionId, SessionState},
12};
13
14#[derive(Debug)]
19pub struct FileSessionStore {
20 root: PathBuf,
21 cache: scc::HashMap<SessionId, SessionState>,
22 write_guard: tokio::sync::Mutex<()>,
23}
24
25impl FileSessionStore {
26 pub fn new(root: PathBuf) -> Result<Self, StoreError> {
31 std::fs::create_dir_all(&root)
32 .map_err(|err| StoreError::Backend(format!("failed to create store dir: {err}")))?;
33 Ok(Self { root, cache: scc::HashMap::new(), write_guard: tokio::sync::Mutex::new(()) })
34 }
35
36 fn session_path(&self, session_id: &SessionId) -> PathBuf {
37 self.root.join(format!("{}.json", encode_session_id(session_id)))
38 }
39
40 fn temp_path_for(final_path: &Path) -> PathBuf {
41 let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos();
42 final_path.with_extension(format!("json.tmp.{}.{}", std::process::id(), nanos))
43 }
44
45 fn quarantine_path_for(path: &Path) -> PathBuf {
46 let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos();
47 let filename = path.file_name().and_then(std::ffi::OsStr::to_str).unwrap_or("snapshot");
48 path.with_file_name(format!("{filename}.corrupt.{}.{}", std::process::id(), nanos))
49 }
50
51 async fn quarantine_corrupt_file(path: &Path) -> Result<PathBuf, StoreError> {
52 let quarantine_path = Self::quarantine_path_for(path);
53 tokio::fs::rename(path, &quarantine_path).await.map_err(|err| {
54 StoreError::Backend(format!(
55 "failed to quarantine corrupted snapshot '{}': {err}",
56 path.display()
57 ))
58 })?;
59 Ok(quarantine_path)
60 }
61
62 async fn load_from_disk(
63 &self,
64 session_id: &SessionId,
65 ) -> Result<Option<SessionState>, StoreError> {
66 let path = self.session_path(session_id);
67 let raw = match tokio::fs::read(&path).await {
68 Ok(raw) => raw,
69 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
70 Err(err) => {
71 return Err(StoreError::Backend(format!(
72 "failed to read session snapshot '{}': {err}",
73 path.display()
74 )));
75 }
76 };
77
78 let state = if let Ok(value) = serde_json::from_slice::<SessionState>(&raw) {
79 value
80 } else {
81 let _ = Self::quarantine_corrupt_file(&path).await?;
82 return Ok(None);
83 };
84 Ok(Some(state))
85 }
86
87 async fn save_to_disk(
88 &self,
89 session_id: &SessionId,
90 state: &SessionState,
91 ) -> Result<(), StoreError> {
92 let final_path = self.session_path(session_id);
93 let temp_path = Self::temp_path_for(&final_path);
94 let bytes = serde_json::to_vec_pretty(state)
95 .map_err(|err| StoreError::Serialization(err.to_string()))?;
96
97 tokio::fs::write(&temp_path, bytes).await.map_err(|err| {
98 StoreError::Backend(format!(
99 "failed to write temp session snapshot '{}': {err}",
100 temp_path.display()
101 ))
102 })?;
103
104 if let Err(rename_err) = tokio::fs::rename(&temp_path, &final_path).await {
106 if path_exists(&final_path).await {
107 tokio::fs::remove_file(&final_path).await.map_err(|remove_err| {
108 StoreError::Backend(format!(
109 "failed to replace existing session snapshot '{}' after rename error '{rename_err}': {remove_err}",
110 final_path.display()
111 ))
112 })?;
113 tokio::fs::rename(&temp_path, &final_path).await.map_err(|err| {
114 StoreError::Backend(format!(
115 "failed to replace session snapshot '{}' after fallback remove: {err}",
116 final_path.display()
117 ))
118 })?;
119 } else {
120 return Err(StoreError::Backend(format!(
121 "failed to atomically replace session snapshot '{}': {rename_err}",
122 final_path.display()
123 )));
124 }
125 }
126 Ok(())
127 }
128}
129
130#[async_trait::async_trait]
131impl SessionStore for FileSessionStore {
132 async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError> {
133 if let Some(state) = self.cache.read_async(id, |_key, value| value.clone()).await {
134 return Ok(Some(state));
135 }
136
137 let loaded = self.load_from_disk(id).await?;
138 if let Some(ref state) = loaded {
139 let entry = self.cache.entry_async(id.clone()).await;
140 match entry {
141 scc::hash_map::Entry::Occupied(mut occ) => occ.get_mut().clone_from(state),
142 scc::hash_map::Entry::Vacant(vac) => {
143 let _ = vac.insert_entry(state.clone());
144 }
145 }
146 }
147 Ok(loaded)
148 }
149
150 async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
151 let _lock = self.write_guard.lock().await;
152 let mut updated = state.clone();
153 let current_version = self.cache.read_async(id, |_k, v| v.version).await.unwrap_or(0);
155 updated.version = current_version.saturating_add(1);
156 self.save_to_disk(id, &updated).await?;
157
158 let entry = self.cache.entry_async(id.clone()).await;
159 match entry {
160 scc::hash_map::Entry::Occupied(mut occ) => {
161 occ.get_mut().clone_from(&updated);
162 }
163 scc::hash_map::Entry::Vacant(vac) => {
164 let _ = vac.insert_entry(updated);
165 }
166 }
167 Ok(())
168 }
169
170 async fn save_if_version(
171 &self,
172 id: &SessionId,
173 state: &SessionState,
174 expected_version: u64,
175 ) -> Result<u64, StoreError> {
176 let _lock = self.write_guard.lock().await;
177 let current_version = if let Some(v) = self.cache.read_async(id, |_k, v| v.version).await {
179 v
180 } else {
181 let loaded = self.load_from_disk(id).await?;
182 loaded.map_or(0, |s| s.version)
183 };
184
185 if current_version != expected_version {
186 return Err(StoreError::VersionConflict {
187 expected: expected_version,
188 actual: current_version,
189 });
190 }
191
192 let new_version = expected_version.saturating_add(1);
193 let mut updated = state.clone();
194 updated.version = new_version;
195 self.save_to_disk(id, &updated).await?;
196
197 let entry = self.cache.entry_async(id.clone()).await;
198 match entry {
199 scc::hash_map::Entry::Occupied(mut occ) => {
200 occ.get_mut().clone_from(&updated);
201 }
202 scc::hash_map::Entry::Vacant(vac) => {
203 let _ = vac.insert_entry(updated);
204 }
205 }
206 Ok(new_version)
207 }
208}
209
210fn encode_session_id(session_id: &str) -> String {
211 if session_id.is_empty() {
212 return "session".to_string();
213 }
214
215 let mut encoded = String::with_capacity(session_id.len().saturating_mul(2));
217 for byte in session_id.as_bytes() {
218 use std::fmt::Write as _;
219 let _ = write!(&mut encoded, "{byte:02x}");
220 }
221 encoded
222}
223
224async fn path_exists(path: &Path) -> bool {
225 tokio::fs::metadata(path).await.is_ok()
226}
227
228#[cfg(test)]
229mod tests {
230 use bob_core::types::{Message, Role};
231
232 use super::*;
233
234 #[tokio::test]
235 async fn missing_session_returns_none() {
236 let temp_dir = tempfile::tempdir();
237 assert!(temp_dir.is_ok(), "tempdir should be created");
238 let temp_dir = match temp_dir {
239 Ok(value) => value,
240 Err(_) => return,
241 };
242 let store = FileSessionStore::new(temp_dir.path().to_path_buf());
243 assert!(store.is_ok(), "store should initialize");
244 let store = match store {
245 Ok(value) => value,
246 Err(_) => return,
247 };
248
249 let loaded = store.load(&"missing".to_string()).await;
250 assert!(loaded.is_ok(), "load should not fail for missing sessions");
251 assert!(loaded.ok().flatten().is_none());
252 }
253
254 #[tokio::test]
255 async fn roundtrip_persists_across_store_recreation() {
256 let temp_dir = tempfile::tempdir();
257 assert!(temp_dir.is_ok(), "tempdir should be created");
258 let temp_dir = match temp_dir {
259 Ok(value) => value,
260 Err(_) => return,
261 };
262 let session_id = "cli/session:1".to_string();
263 let state = SessionState {
264 messages: vec![Message::text(Role::User, "hello")],
265 total_usage: bob_core::types::TokenUsage { prompt_tokens: 4, completion_tokens: 2 },
266 ..Default::default()
267 };
268
269 let first = FileSessionStore::new(temp_dir.path().to_path_buf());
270 assert!(first.is_ok(), "first store should initialize");
271 let first = match first {
272 Ok(value) => value,
273 Err(_) => return,
274 };
275 let saved = first.save(&session_id, &state).await;
276 assert!(saved.is_ok(), "save should succeed");
277
278 let second = FileSessionStore::new(temp_dir.path().to_path_buf());
279 assert!(second.is_ok(), "second store should initialize");
280 let second = match second {
281 Ok(value) => value,
282 Err(_) => return,
283 };
284 let loaded = second.load(&session_id).await;
285 assert!(loaded.is_ok(), "load should succeed after restart");
286 let loaded = loaded.ok().flatten();
287 assert!(loaded.is_some(), "session should exist");
288 let loaded = loaded.unwrap_or_default();
289 assert_eq!(loaded.messages.len(), 1);
290 assert_eq!(loaded.total_usage.total(), 6);
291 }
292
293 #[tokio::test]
294 async fn corrupted_snapshot_is_quarantined_and_treated_as_missing() {
295 let temp_dir = tempfile::tempdir();
296 assert!(temp_dir.is_ok(), "tempdir should be created");
297 let temp_dir = match temp_dir {
298 Ok(value) => value,
299 Err(_) => return,
300 };
301
302 let session_id = "broken-session".to_string();
303 let encoded = encode_session_id(&session_id);
304 let snapshot_path = temp_dir.path().join(format!("{encoded}.json"));
305 let write = tokio::fs::write(&snapshot_path, b"{not-json").await;
306 assert!(write.is_ok(), "fixture should be written");
307
308 let store = FileSessionStore::new(temp_dir.path().to_path_buf());
309 assert!(store.is_ok(), "store should initialize");
310 let store = match store {
311 Ok(value) => value,
312 Err(_) => return,
313 };
314 let loaded = store.load(&session_id).await;
315 assert!(loaded.is_ok(), "load should recover from corruption");
316 assert!(loaded.ok().flatten().is_none(), "corrupted session should be treated as missing");
317 assert!(
318 !snapshot_path.exists(),
319 "corrupted snapshot should be moved out of the primary location"
320 );
321
322 let mut has_quarantine = false;
323 let read_dir = std::fs::read_dir(temp_dir.path());
324 assert!(read_dir.is_ok());
325 if let Ok(entries) = read_dir {
326 for entry in entries.flatten() {
327 let name = entry.file_name().to_string_lossy().to_string();
328 if name.contains(".corrupt.") {
329 has_quarantine = true;
330 break;
331 }
332 }
333 }
334 assert!(has_quarantine, "corrupted snapshot should be quarantined");
335 }
336
337 #[tokio::test]
338 async fn session_id_path_is_sanitized() {
339 let temp_dir = tempfile::tempdir();
340 assert!(temp_dir.is_ok(), "tempdir should be created");
341 let temp_dir = match temp_dir {
342 Ok(value) => value,
343 Err(_) => return,
344 };
345 let store = FileSessionStore::new(temp_dir.path().to_path_buf());
346 assert!(store.is_ok(), "store should initialize");
347 let store = match store {
348 Ok(value) => value,
349 Err(_) => return,
350 };
351 let session_id = "../escape?*id".to_string();
352 let state = SessionState::default();
353
354 let saved = store.save(&session_id, &state).await;
355 assert!(saved.is_ok(), "save should succeed");
356 let path = store.session_path(&session_id);
357 assert!(path.starts_with(temp_dir.path()), "path must stay in store root");
358 }
359
360 #[test]
361 fn session_id_encoding_is_collision_resistant_for_common_cases() {
362 let a = encode_session_id("a/b");
363 let b = encode_session_id("a?b");
364 assert_ne!(a, b, "different session ids should map to different filenames");
365 }
366}