Skip to main content

bob_adapters/
store_file.rs

1//! File-backed session store adapter.
2
3use std::{
4    path::{Path, PathBuf},
5    time::{SystemTime, UNIX_EPOCH},
6};
7
8use bob_core::{
9    error::StoreError,
10    ports::SessionStore,
11    types::{SessionId, SessionState},
12};
13
/// Durable session store backed by per-session JSON files.
///
/// Session snapshots are cached in-memory for fast reads and also persisted on
/// every save so process restarts can recover context.
#[derive(Debug)]
pub struct FileSessionStore {
    /// Directory holding one `<encoded-session-id>.json` snapshot per session.
    root: PathBuf,
    /// In-memory copies of snapshots keyed by session id; reads consult this
    /// map before touching the disk.
    cache: scc::HashMap<SessionId, SessionState>,
    /// Held for the whole duration of each save so that version checks and
    /// file replacement are applied one writer at a time.
    write_guard: tokio::sync::Mutex<()>,
}
24
25impl FileSessionStore {
26    /// Create a file-backed store rooted at `root`.
27    ///
28    /// # Errors
29    /// Returns a backend error when the root directory cannot be created.
30    pub fn new(root: PathBuf) -> Result<Self, StoreError> {
31        std::fs::create_dir_all(&root)
32            .map_err(|err| StoreError::Backend(format!("failed to create store dir: {err}")))?;
33        Ok(Self { root, cache: scc::HashMap::new(), write_guard: tokio::sync::Mutex::new(()) })
34    }
35
36    fn session_path(&self, session_id: &SessionId) -> PathBuf {
37        self.root.join(format!("{}.json", encode_session_id(session_id)))
38    }
39
40    fn temp_path_for(final_path: &Path) -> PathBuf {
41        let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos();
42        final_path.with_extension(format!("json.tmp.{}.{}", std::process::id(), nanos))
43    }
44
45    fn quarantine_path_for(path: &Path) -> PathBuf {
46        let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos();
47        let filename = path.file_name().and_then(std::ffi::OsStr::to_str).unwrap_or("snapshot");
48        path.with_file_name(format!("{filename}.corrupt.{}.{}", std::process::id(), nanos))
49    }
50
51    async fn quarantine_corrupt_file(path: &Path) -> Result<PathBuf, StoreError> {
52        let quarantine_path = Self::quarantine_path_for(path);
53        tokio::fs::rename(path, &quarantine_path).await.map_err(|err| {
54            StoreError::Backend(format!(
55                "failed to quarantine corrupted snapshot '{}': {err}",
56                path.display()
57            ))
58        })?;
59        Ok(quarantine_path)
60    }
61
62    async fn load_from_disk(
63        &self,
64        session_id: &SessionId,
65    ) -> Result<Option<SessionState>, StoreError> {
66        let path = self.session_path(session_id);
67        let raw = match tokio::fs::read(&path).await {
68            Ok(raw) => raw,
69            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
70            Err(err) => {
71                return Err(StoreError::Backend(format!(
72                    "failed to read session snapshot '{}': {err}",
73                    path.display()
74                )));
75            }
76        };
77
78        let state = if let Ok(value) = serde_json::from_slice::<SessionState>(&raw) {
79            value
80        } else {
81            let _ = Self::quarantine_corrupt_file(&path).await?;
82            return Ok(None);
83        };
84        Ok(Some(state))
85    }
86
87    async fn save_to_disk(
88        &self,
89        session_id: &SessionId,
90        state: &SessionState,
91    ) -> Result<(), StoreError> {
92        let final_path = self.session_path(session_id);
93        let temp_path = Self::temp_path_for(&final_path);
94        let bytes = serde_json::to_vec_pretty(state)
95            .map_err(|err| StoreError::Serialization(err.to_string()))?;
96
97        tokio::fs::write(&temp_path, bytes).await.map_err(|err| {
98            StoreError::Backend(format!(
99                "failed to write temp session snapshot '{}': {err}",
100                temp_path.display()
101            ))
102        })?;
103
104        // Prefer direct atomic replace. Fallback to remove+rename only when needed.
105        if let Err(rename_err) = tokio::fs::rename(&temp_path, &final_path).await {
106            if path_exists(&final_path).await {
107                tokio::fs::remove_file(&final_path).await.map_err(|remove_err| {
108                    StoreError::Backend(format!(
109                        "failed to replace existing session snapshot '{}' after rename error '{rename_err}': {remove_err}",
110                        final_path.display()
111                    ))
112                })?;
113                tokio::fs::rename(&temp_path, &final_path).await.map_err(|err| {
114                    StoreError::Backend(format!(
115                        "failed to replace session snapshot '{}' after fallback remove: {err}",
116                        final_path.display()
117                    ))
118                })?;
119            } else {
120                return Err(StoreError::Backend(format!(
121                    "failed to atomically replace session snapshot '{}': {rename_err}",
122                    final_path.display()
123                )));
124            }
125        }
126        Ok(())
127    }
128}
129
130#[async_trait::async_trait]
131impl SessionStore for FileSessionStore {
132    async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError> {
133        if let Some(state) = self.cache.read_async(id, |_key, value| value.clone()).await {
134            return Ok(Some(state));
135        }
136
137        let loaded = self.load_from_disk(id).await?;
138        if let Some(ref state) = loaded {
139            let entry = self.cache.entry_async(id.clone()).await;
140            match entry {
141                scc::hash_map::Entry::Occupied(mut occ) => occ.get_mut().clone_from(state),
142                scc::hash_map::Entry::Vacant(vac) => {
143                    let _ = vac.insert_entry(state.clone());
144                }
145            }
146        }
147        Ok(loaded)
148    }
149
150    async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
151        let _lock = self.write_guard.lock().await;
152        let mut updated = state.clone();
153        // Read current version from cache or disk.
154        let current_version = self.cache.read_async(id, |_k, v| v.version).await.unwrap_or(0);
155        updated.version = current_version.saturating_add(1);
156        self.save_to_disk(id, &updated).await?;
157
158        let entry = self.cache.entry_async(id.clone()).await;
159        match entry {
160            scc::hash_map::Entry::Occupied(mut occ) => {
161                occ.get_mut().clone_from(&updated);
162            }
163            scc::hash_map::Entry::Vacant(vac) => {
164                let _ = vac.insert_entry(updated);
165            }
166        }
167        Ok(())
168    }
169
170    async fn save_if_version(
171        &self,
172        id: &SessionId,
173        state: &SessionState,
174        expected_version: u64,
175    ) -> Result<u64, StoreError> {
176        let _lock = self.write_guard.lock().await;
177        // Read current version from cache or disk.
178        let current_version = if let Some(v) = self.cache.read_async(id, |_k, v| v.version).await {
179            v
180        } else {
181            let loaded = self.load_from_disk(id).await?;
182            loaded.map_or(0, |s| s.version)
183        };
184
185        if current_version != expected_version {
186            return Err(StoreError::VersionConflict {
187                expected: expected_version,
188                actual: current_version,
189            });
190        }
191
192        let new_version = expected_version.saturating_add(1);
193        let mut updated = state.clone();
194        updated.version = new_version;
195        self.save_to_disk(id, &updated).await?;
196
197        let entry = self.cache.entry_async(id.clone()).await;
198        match entry {
199            scc::hash_map::Entry::Occupied(mut occ) => {
200                occ.get_mut().clone_from(&updated);
201            }
202            scc::hash_map::Entry::Vacant(vac) => {
203                let _ = vac.insert_entry(updated);
204            }
205        }
206        Ok(new_version)
207    }
208}
209
/// Maps a session id to a string that is safe to use as a file name.
///
/// Every byte of the id is written as two lowercase hex digits, so distinct
/// ids always produce distinct names. The empty id maps to the literal
/// `session`.
fn encode_session_id(session_id: &str) -> String {
    use std::fmt::Write as _;

    if session_id.is_empty() {
        return String::from("session");
    }

    // Two hex digits per input byte; reserve the full length up front.
    let buffer = String::with_capacity(session_id.len().saturating_mul(2));
    session_id.bytes().fold(buffer, |mut hex, byte| {
        let _ = write!(hex, "{byte:02x}");
        hex
    })
}
223
224async fn path_exists(path: &Path) -> bool {
225    tokio::fs::metadata(path).await.is_ok()
226}
227
#[cfg(test)]
mod tests {
    use bob_core::types::{Message, Role};

    use super::*;

    /// Creates a scratch directory, failing the test when that is not possible.
    fn scratch_dir() -> Option<tempfile::TempDir> {
        let created = tempfile::tempdir();
        assert!(created.is_ok(), "tempdir should be created");
        created.ok()
    }

    /// Opens a store rooted at `root`, failing the test with `failure_message`
    /// when initialization does not succeed.
    fn open_store(root: &Path, failure_message: &str) -> Option<FileSessionStore> {
        let opened = FileSessionStore::new(root.to_path_buf());
        assert!(opened.is_ok(), "{failure_message}");
        opened.ok()
    }

    /// Loading an id that was never saved yields `Ok(None)`.
    #[tokio::test]
    async fn missing_session_returns_none() {
        let dir = match scratch_dir() {
            Some(dir) => dir,
            None => return,
        };
        let store = match open_store(dir.path(), "store should initialize") {
            Some(store) => store,
            None => return,
        };

        let outcome = store.load(&"missing".to_string()).await;
        assert!(outcome.is_ok(), "load should not fail for missing sessions");
        assert!(outcome.ok().flatten().is_none());
    }

    /// A snapshot saved by one store instance is readable by a fresh instance
    /// rooted at the same directory.
    #[tokio::test]
    async fn roundtrip_persists_across_store_recreation() {
        let dir = match scratch_dir() {
            Some(dir) => dir,
            None => return,
        };
        let session_id = "cli/session:1".to_string();
        let state = SessionState {
            messages: vec![Message::text(Role::User, "hello")],
            total_usage: bob_core::types::TokenUsage { prompt_tokens: 4, completion_tokens: 2 },
            ..Default::default()
        };

        let writer = match open_store(dir.path(), "first store should initialize") {
            Some(store) => store,
            None => return,
        };
        let saved = writer.save(&session_id, &state).await;
        assert!(saved.is_ok(), "save should succeed");

        let reader = match open_store(dir.path(), "second store should initialize") {
            Some(store) => store,
            None => return,
        };
        let outcome = reader.load(&session_id).await;
        assert!(outcome.is_ok(), "load should succeed after restart");
        let restored = outcome.ok().flatten();
        assert!(restored.is_some(), "session should exist");
        let restored = restored.unwrap_or_default();
        assert_eq!(restored.messages.len(), 1);
        assert_eq!(restored.total_usage.total(), 6);
    }

    /// An undecodable snapshot is moved aside and the session reads as missing.
    #[tokio::test]
    async fn corrupted_snapshot_is_quarantined_and_treated_as_missing() {
        let dir = match scratch_dir() {
            Some(dir) => dir,
            None => return,
        };

        let session_id = "broken-session".to_string();
        let file_name = format!("{}.json", encode_session_id(&session_id));
        let snapshot_path = dir.path().join(file_name);
        let fixture = tokio::fs::write(&snapshot_path, b"{not-json").await;
        assert!(fixture.is_ok(), "fixture should be written");

        let store = match open_store(dir.path(), "store should initialize") {
            Some(store) => store,
            None => return,
        };
        let outcome = store.load(&session_id).await;
        assert!(outcome.is_ok(), "load should recover from corruption");
        assert!(outcome.ok().flatten().is_none(), "corrupted session should be treated as missing");
        assert!(
            !snapshot_path.exists(),
            "corrupted snapshot should be moved out of the primary location"
        );

        // Look for any directory entry carrying the quarantine marker.
        let listing = std::fs::read_dir(dir.path());
        assert!(listing.is_ok());
        let has_quarantine = listing.map_or(false, |entries| {
            entries.flatten().any(|entry| entry.file_name().to_string_lossy().contains(".corrupt."))
        });
        assert!(has_quarantine, "corrupted snapshot should be quarantined");
    }

    /// Ids containing path syntax still resolve to a file inside the store root.
    #[tokio::test]
    async fn session_id_path_is_sanitized() {
        let dir = match scratch_dir() {
            Some(dir) => dir,
            None => return,
        };
        let store = match open_store(dir.path(), "store should initialize") {
            Some(store) => store,
            None => return,
        };
        let session_id = "../escape?*id".to_string();
        let state = SessionState::default();

        let saved = store.save(&session_id, &state).await;
        assert!(saved.is_ok(), "save should succeed");
        let resolved = store.session_path(&session_id);
        assert!(resolved.starts_with(dir.path()), "path must stay in store root");
    }

    /// Ids that differ only in a special character encode to different names.
    #[test]
    fn session_id_encoding_is_collision_resistant_for_common_cases() {
        let with_slash = encode_session_id("a/b");
        let with_question_mark = encode_session_id("a?b");
        assert_ne!(
            with_slash, with_question_mark,
            "different session ids should map to different filenames"
        );
    }
}