bob_adapters/
artifact_file.rs1use std::{
4 path::{Path, PathBuf},
5 time::{SystemTime, UNIX_EPOCH},
6};
7
8use bob_core::{
9 error::StoreError,
10 ports::ArtifactStorePort,
11 types::{ArtifactRecord, SessionId},
12};
13
14#[derive(Debug)]
16pub struct FileArtifactStore {
17 root: PathBuf,
18 cache: scc::HashMap<SessionId, Vec<ArtifactRecord>>,
19 write_guard: tokio::sync::Mutex<()>,
20}
21
22impl FileArtifactStore {
23 pub fn new(root: PathBuf) -> Result<Self, StoreError> {
28 std::fs::create_dir_all(&root)
29 .map_err(|err| StoreError::Backend(format!("failed to create artifact dir: {err}")))?;
30 Ok(Self { root, cache: scc::HashMap::new(), write_guard: tokio::sync::Mutex::new(()) })
31 }
32
33 fn artifact_path(&self, session_id: &SessionId) -> PathBuf {
34 self.root.join(format!("{}.json", encode_session_id(session_id)))
35 }
36
37 fn temp_path_for(final_path: &Path) -> PathBuf {
38 let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos();
39 final_path.with_extension(format!("json.tmp.{}.{}", std::process::id(), nanos))
40 }
41
42 fn quarantine_path_for(path: &Path) -> PathBuf {
43 let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos();
44 let filename = path.file_name().and_then(std::ffi::OsStr::to_str).unwrap_or("artifacts");
45 path.with_file_name(format!("{filename}.corrupt.{}.{}", std::process::id(), nanos))
46 }
47
48 async fn quarantine_corrupt_file(path: &Path) -> Result<PathBuf, StoreError> {
49 let quarantine_path = Self::quarantine_path_for(path);
50 tokio::fs::rename(path, &quarantine_path).await.map_err(|err| {
51 StoreError::Backend(format!(
52 "failed to quarantine corrupted artifacts '{}': {err}",
53 path.display()
54 ))
55 })?;
56 Ok(quarantine_path)
57 }
58
59 async fn load_from_disk(
60 &self,
61 session_id: &SessionId,
62 ) -> Result<Vec<ArtifactRecord>, StoreError> {
63 let path = self.artifact_path(session_id);
64 let raw = match tokio::fs::read(&path).await {
65 Ok(raw) => raw,
66 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
67 Err(err) => {
68 return Err(StoreError::Backend(format!(
69 "failed to read artifacts '{}': {err}",
70 path.display()
71 )));
72 }
73 };
74
75 let artifacts = if let Ok(value) = serde_json::from_slice::<Vec<ArtifactRecord>>(&raw) {
76 value
77 } else {
78 let _ = Self::quarantine_corrupt_file(&path).await?;
79 return Ok(Vec::new());
80 };
81 Ok(artifacts)
82 }
83
84 async fn save_to_disk(
85 &self,
86 session_id: &SessionId,
87 records: &[ArtifactRecord],
88 ) -> Result<(), StoreError> {
89 let final_path = self.artifact_path(session_id);
90 let temp_path = Self::temp_path_for(&final_path);
91 let bytes = serde_json::to_vec_pretty(records)
92 .map_err(|err| StoreError::Serialization(err.to_string()))?;
93
94 tokio::fs::write(&temp_path, bytes).await.map_err(|err| {
95 StoreError::Backend(format!(
96 "failed to write temp artifacts '{}': {err}",
97 temp_path.display()
98 ))
99 })?;
100
101 if let Err(rename_err) = tokio::fs::rename(&temp_path, &final_path).await {
102 if path_exists(&final_path).await {
103 tokio::fs::remove_file(&final_path).await.map_err(|remove_err| {
104 StoreError::Backend(format!(
105 "failed to replace existing artifacts '{}' after rename error '{rename_err}': {remove_err}",
106 final_path.display()
107 ))
108 })?;
109 tokio::fs::rename(&temp_path, &final_path).await.map_err(|err| {
110 StoreError::Backend(format!(
111 "failed to replace artifacts '{}' after fallback remove: {err}",
112 final_path.display()
113 ))
114 })?;
115 } else {
116 return Err(StoreError::Backend(format!(
117 "failed to persist artifacts '{}': {rename_err}",
118 final_path.display()
119 )));
120 }
121 }
122 Ok(())
123 }
124}
125
126#[async_trait::async_trait]
127impl ArtifactStorePort for FileArtifactStore {
128 async fn put(&self, artifact: ArtifactRecord) -> Result<(), StoreError> {
129 let session_id = artifact.session_id.clone();
130 let _lock = self.write_guard.lock().await;
131
132 let mut records = if let Some(cached) =
133 self.cache.read_async(&session_id, |_k, value| value.clone()).await
134 {
135 cached
136 } else {
137 self.load_from_disk(&session_id).await?
138 };
139 records.push(artifact);
140 self.save_to_disk(&session_id, &records).await?;
141
142 let entry = self.cache.entry_async(session_id).await;
143 match entry {
144 scc::hash_map::Entry::Occupied(mut occ) => occ.get_mut().clone_from(&records),
145 scc::hash_map::Entry::Vacant(vac) => {
146 let _ = vac.insert_entry(records);
147 }
148 }
149 Ok(())
150 }
151
152 async fn list_by_session(
153 &self,
154 session_id: &SessionId,
155 ) -> Result<Vec<ArtifactRecord>, StoreError> {
156 if let Some(cached) = self.cache.read_async(session_id, |_k, value| value.clone()).await {
157 return Ok(cached);
158 }
159
160 let records = self.load_from_disk(session_id).await?;
161 let entry = self.cache.entry_async(session_id.clone()).await;
162 match entry {
163 scc::hash_map::Entry::Occupied(mut occ) => occ.get_mut().clone_from(&records),
164 scc::hash_map::Entry::Vacant(vac) => {
165 let _ = vac.insert_entry(records.clone());
166 }
167 }
168 Ok(records)
169 }
170}
171
/// Turns a session id into a file-name-safe stem of lowercase hex digits, two per byte.
///
/// The empty id maps to the literal `"session"`. That value cannot collide with
/// any real encoding because it contains letters outside `0-9a-f`.
fn encode_session_id(session_id: &str) -> String {
    use std::fmt::Write as _;

    if session_id.is_empty() {
        return "session".to_string();
    }

    let capacity = session_id.len().saturating_mul(2);
    session_id.bytes().fold(String::with_capacity(capacity), |mut hex, byte| {
        // Formatting into a String cannot fail.
        let _ = write!(&mut hex, "{byte:02x}");
        hex
    })
}
184
185async fn path_exists(path: &Path) -> bool {
186 tokio::fs::metadata(path).await.is_ok()
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a fresh temporary directory, asserting that creation succeeded.
    fn temp_dir() -> Option<tempfile::TempDir> {
        let created = tempfile::tempdir();
        assert!(created.is_ok());
        created.ok()
    }

    /// Opens a store rooted at `root`, asserting that construction succeeded.
    fn open_store(root: &Path) -> Option<FileArtifactStore> {
        let opened = FileArtifactStore::new(root.to_path_buf());
        assert!(opened.is_ok());
        opened.ok()
    }

    #[tokio::test]
    async fn list_missing_session_returns_empty() {
        let Some(dir) = temp_dir() else { return };
        let Some(store) = open_store(dir.path()) else { return };

        let listed = store.list_by_session(&"missing".to_string()).await;
        assert!(listed.is_ok());
        assert!(listed.ok().is_some_and(|items| items.is_empty()));
    }

    #[tokio::test]
    async fn roundtrip_persists_across_store_recreation() {
        let Some(dir) = temp_dir() else { return };

        let Some(writer) = open_store(dir.path()) else { return };
        let record = ArtifactRecord {
            session_id: "s/1".to_string(),
            kind: "tool_result".to_string(),
            name: "search".to_string(),
            content: serde_json::json!({"hits": 3}),
        };
        let inserted = writer.put(record).await;
        assert!(inserted.is_ok());

        let Some(reader) = open_store(dir.path()) else { return };
        let listed = reader.list_by_session(&"s/1".to_string()).await;
        assert!(listed.is_ok());
        assert_eq!(listed.ok().map(|items| items.len()), Some(1));
    }

    #[tokio::test]
    async fn corrupted_artifact_snapshot_is_quarantined_and_treated_as_empty() {
        let Some(dir) = temp_dir() else { return };

        let session_id = "broken-artifacts".to_string();
        let artifact_path = dir.path().join(format!("{}.json", encode_session_id(&session_id)));
        let seeded = tokio::fs::write(&artifact_path, b"{not-json").await;
        assert!(seeded.is_ok());

        let Some(store) = open_store(dir.path()) else { return };
        let listed = store.list_by_session(&session_id).await;
        assert!(listed.is_ok());
        assert!(listed.ok().is_some_and(|records| records.is_empty()));
        assert!(!artifact_path.exists());

        let read_dir = std::fs::read_dir(dir.path());
        assert!(read_dir.is_ok());
        let has_quarantine = read_dir.is_ok_and(|entries| {
            entries
                .flatten()
                .any(|entry| entry.file_name().to_string_lossy().contains(".corrupt."))
        });
        assert!(has_quarantine);
    }
}
289}