// bob_adapters/artifact_file.rs

1//! File-backed artifact store adapter.
2
3use std::{
4    path::{Path, PathBuf},
5    time::{SystemTime, UNIX_EPOCH},
6};
7
8use bob_core::{
9    error::StoreError,
10    ports::ArtifactStorePort,
11    types::{ArtifactRecord, SessionId},
12};
13
14/// Durable artifact store backed by per-session JSON snapshots.
15#[derive(Debug)]
16pub struct FileArtifactStore {
17    root: PathBuf,
18    cache: scc::HashMap<SessionId, Vec<ArtifactRecord>>,
19    write_guard: tokio::sync::Mutex<()>,
20}
21
22impl FileArtifactStore {
23    /// Create a file-backed artifact store rooted at `root`.
24    ///
25    /// # Errors
26    /// Returns a backend error when the root directory cannot be created.
27    pub fn new(root: PathBuf) -> Result<Self, StoreError> {
28        std::fs::create_dir_all(&root)
29            .map_err(|err| StoreError::Backend(format!("failed to create artifact dir: {err}")))?;
30        Ok(Self { root, cache: scc::HashMap::new(), write_guard: tokio::sync::Mutex::new(()) })
31    }
32
33    fn artifact_path(&self, session_id: &SessionId) -> PathBuf {
34        self.root.join(format!("{}.json", encode_session_id(session_id)))
35    }
36
37    fn temp_path_for(final_path: &Path) -> PathBuf {
38        let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos();
39        final_path.with_extension(format!("json.tmp.{}.{}", std::process::id(), nanos))
40    }
41
42    fn quarantine_path_for(path: &Path) -> PathBuf {
43        let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos();
44        let filename = path.file_name().and_then(std::ffi::OsStr::to_str).unwrap_or("artifacts");
45        path.with_file_name(format!("{filename}.corrupt.{}.{}", std::process::id(), nanos))
46    }
47
48    async fn quarantine_corrupt_file(path: &Path) -> Result<PathBuf, StoreError> {
49        let quarantine_path = Self::quarantine_path_for(path);
50        tokio::fs::rename(path, &quarantine_path).await.map_err(|err| {
51            StoreError::Backend(format!(
52                "failed to quarantine corrupted artifacts '{}': {err}",
53                path.display()
54            ))
55        })?;
56        Ok(quarantine_path)
57    }
58
59    async fn load_from_disk(
60        &self,
61        session_id: &SessionId,
62    ) -> Result<Vec<ArtifactRecord>, StoreError> {
63        let path = self.artifact_path(session_id);
64        let raw = match tokio::fs::read(&path).await {
65            Ok(raw) => raw,
66            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
67            Err(err) => {
68                return Err(StoreError::Backend(format!(
69                    "failed to read artifacts '{}': {err}",
70                    path.display()
71                )));
72            }
73        };
74
75        let artifacts = if let Ok(value) = serde_json::from_slice::<Vec<ArtifactRecord>>(&raw) {
76            value
77        } else {
78            let _ = Self::quarantine_corrupt_file(&path).await?;
79            return Ok(Vec::new());
80        };
81        Ok(artifacts)
82    }
83
84    async fn save_to_disk(
85        &self,
86        session_id: &SessionId,
87        records: &[ArtifactRecord],
88    ) -> Result<(), StoreError> {
89        let final_path = self.artifact_path(session_id);
90        let temp_path = Self::temp_path_for(&final_path);
91        let bytes = serde_json::to_vec_pretty(records)
92            .map_err(|err| StoreError::Serialization(err.to_string()))?;
93
94        tokio::fs::write(&temp_path, bytes).await.map_err(|err| {
95            StoreError::Backend(format!(
96                "failed to write temp artifacts '{}': {err}",
97                temp_path.display()
98            ))
99        })?;
100
101        if let Err(rename_err) = tokio::fs::rename(&temp_path, &final_path).await {
102            if path_exists(&final_path).await {
103                tokio::fs::remove_file(&final_path).await.map_err(|remove_err| {
104                    StoreError::Backend(format!(
105                        "failed to replace existing artifacts '{}' after rename error '{rename_err}': {remove_err}",
106                        final_path.display()
107                    ))
108                })?;
109                tokio::fs::rename(&temp_path, &final_path).await.map_err(|err| {
110                    StoreError::Backend(format!(
111                        "failed to replace artifacts '{}' after fallback remove: {err}",
112                        final_path.display()
113                    ))
114                })?;
115            } else {
116                return Err(StoreError::Backend(format!(
117                    "failed to persist artifacts '{}': {rename_err}",
118                    final_path.display()
119                )));
120            }
121        }
122        Ok(())
123    }
124}
125
126#[async_trait::async_trait]
127impl ArtifactStorePort for FileArtifactStore {
128    async fn put(&self, artifact: ArtifactRecord) -> Result<(), StoreError> {
129        let session_id = artifact.session_id.clone();
130        let _lock = self.write_guard.lock().await;
131
132        let mut records = if let Some(cached) =
133            self.cache.read_async(&session_id, |_k, value| value.clone()).await
134        {
135            cached
136        } else {
137            self.load_from_disk(&session_id).await?
138        };
139        records.push(artifact);
140        self.save_to_disk(&session_id, &records).await?;
141
142        let entry = self.cache.entry_async(session_id).await;
143        match entry {
144            scc::hash_map::Entry::Occupied(mut occ) => occ.get_mut().clone_from(&records),
145            scc::hash_map::Entry::Vacant(vac) => {
146                let _ = vac.insert_entry(records);
147            }
148        }
149        Ok(())
150    }
151
152    async fn list_by_session(
153        &self,
154        session_id: &SessionId,
155    ) -> Result<Vec<ArtifactRecord>, StoreError> {
156        if let Some(cached) = self.cache.read_async(session_id, |_k, value| value.clone()).await {
157            return Ok(cached);
158        }
159
160        let records = self.load_from_disk(session_id).await?;
161        let entry = self.cache.entry_async(session_id.clone()).await;
162        match entry {
163            scc::hash_map::Entry::Occupied(mut occ) => occ.get_mut().clone_from(&records),
164            scc::hash_map::Entry::Vacant(vac) => {
165                let _ = vac.insert_entry(records.clone());
166            }
167        }
168        Ok(records)
169    }
170}
171
/// Turn a session id into a file-name-safe stem.
///
/// Every byte of the id is rendered as two lowercase hex digits, so the
/// result contains only `[0-9a-f]`. The empty id maps to the fixed stem
/// `session`.
fn encode_session_id(session_id: &str) -> String {
    use std::fmt::Write as _;

    if session_id.is_empty() {
        return String::from("session");
    }

    let capacity = session_id.len().saturating_mul(2);
    session_id.bytes().fold(String::with_capacity(capacity), |mut hex, byte| {
        // Formatting into a `String` does not fail, so the result is ignored.
        let _ = write!(hex, "{byte:02x}");
        hex
    })
}
184
185async fn path_exists(path: &Path) -> bool {
186    tokio::fs::metadata(path).await.is_ok()
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Listing a session that has no snapshot on disk succeeds with no records.
    #[tokio::test]
    async fn list_missing_session_returns_empty() {
        let dir = tempfile::tempdir();
        assert!(dir.is_ok());
        let Ok(dir) = dir else { return };

        let store = FileArtifactStore::new(dir.path().to_path_buf());
        assert!(store.is_ok());
        let Ok(store) = store else { return };

        let listed = store.list_by_session(&"missing".to_string()).await;
        assert!(listed.is_ok_and(|items| items.is_empty()));
    }

    /// A record written by one store instance is visible to a fresh instance
    /// opened on the same directory.
    #[tokio::test]
    async fn roundtrip_persists_across_store_recreation() {
        let dir = tempfile::tempdir();
        assert!(dir.is_ok());
        let Ok(dir) = dir else { return };

        let first = FileArtifactStore::new(dir.path().to_path_buf());
        assert!(first.is_ok());
        let Ok(first) = first else { return };

        let record = ArtifactRecord {
            session_id: "s/1".to_string(),
            kind: "tool_result".to_string(),
            name: "search".to_string(),
            content: serde_json::json!({"hits": 3}),
        };
        let inserted = first.put(record).await;
        assert!(inserted.is_ok());

        let second = FileArtifactStore::new(dir.path().to_path_buf());
        assert!(second.is_ok());
        let Ok(second) = second else { return };

        let listed = second.list_by_session(&"s/1".to_string()).await;
        assert!(listed.is_ok_and(|items| items.len() == 1));
    }

    /// An unparseable snapshot is renamed to a `.corrupt.` sibling and the
    /// session then lists as empty.
    #[tokio::test]
    async fn corrupted_artifact_snapshot_is_quarantined_and_treated_as_empty() {
        let dir = tempfile::tempdir();
        assert!(dir.is_ok());
        let Ok(dir) = dir else { return };

        let session_id = "broken-artifacts".to_string();
        let file_name = format!("{}.json", encode_session_id(&session_id));
        let artifact_path = dir.path().join(file_name);
        let write = tokio::fs::write(&artifact_path, b"{not-json").await;
        assert!(write.is_ok());

        let store = FileArtifactStore::new(dir.path().to_path_buf());
        assert!(store.is_ok());
        let Ok(store) = store else { return };

        let listed = store.list_by_session(&session_id).await;
        assert!(listed.is_ok_and(|records| records.is_empty()));
        assert!(!artifact_path.exists());

        let read_dir = std::fs::read_dir(dir.path());
        assert!(read_dir.is_ok());
        // Unreadable directory entries are skipped, as before.
        let has_quarantine = read_dir.is_ok_and(|listing| {
            listing.flatten().any(|entry| entry.file_name().to_string_lossy().contains(".corrupt."))
        });
        assert!(has_quarantine);
    }
}