Skip to main content

lash_core/attachments/
file_store.rs

1use std::collections::HashMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::Mutex;
5
6use lash_sansio::{AttachmentCreateMeta, AttachmentId, AttachmentMeta, AttachmentRef};
7
8use super::{
9    AttachmentStore, AttachmentStoreError, AttachmentStorePersistence, StoredAttachment, content_id,
10};
11
12pub struct FileAttachmentStore {
13    root: PathBuf,
14    meta: Mutex<HashMap<AttachmentId, AttachmentMeta>>,
15}
16
17impl FileAttachmentStore {
18    pub fn new(root: impl Into<PathBuf>) -> Self {
19        Self {
20            root: root.into(),
21            meta: Mutex::new(HashMap::new()),
22        }
23    }
24
25    pub fn root(&self) -> &Path {
26        &self.root
27    }
28
29    /// Lock the in-memory metadata cache, recovering from a poisoned lock
30    /// rather than panicking. The cache is a best-effort fast path backed by
31    /// the on-disk `.json` sidecars, so a prior panic while it was held must
32    /// not permanently brick the store — `get`/`put` simply fall back to disk.
33    fn meta_cache(&self) -> std::sync::MutexGuard<'_, HashMap<AttachmentId, AttachmentMeta>> {
34        self.meta
35            .lock()
36            .unwrap_or_else(|poisoned| poisoned.into_inner())
37    }
38
39    fn path_for_id(&self, id: &AttachmentId) -> PathBuf {
40        let id = id.as_str();
41        let prefix = id.get(..2).unwrap_or(id);
42        self.root.join("sha256").join(prefix).join(id)
43    }
44
45    fn meta_path_for_id(&self, id: &AttachmentId) -> PathBuf {
46        self.path_for_id(id).with_extension("json")
47    }
48}
49
50/// Write `bytes` to `final_path` crash-atomically: stage into a sibling
51/// `<final>.tmp`, flush it, then `rename` into place. A `rename` within the
52/// same directory is atomic on POSIX, so a reader (or a crash) ever sees either
53/// the old contents or the complete new contents — never a half-written file.
54/// The temp file is removed on any failure so a crashed write leaves no
55/// `.tmp` litter behind.
56fn write_atomic(final_path: &Path, bytes: &[u8]) -> Result<(), AttachmentStoreError> {
57    let mut tmp_os = final_path.as_os_str().to_os_string();
58    tmp_os.push(".tmp");
59    let tmp_path = PathBuf::from(tmp_os);
60
61    let io_err = |path: &Path, source: std::io::Error| AttachmentStoreError::Io {
62        path: path.to_path_buf(),
63        source,
64    };
65
66    let write_result = (|| {
67        let mut file = fs::File::create(&tmp_path).map_err(|source| io_err(&tmp_path, source))?;
68        std::io::Write::write_all(&mut file, bytes).map_err(|source| io_err(&tmp_path, source))?;
69        // Best-effort durability for the staged bytes before the rename.
70        file.sync_all()
71            .map_err(|source| io_err(&tmp_path, source))?;
72        fs::rename(&tmp_path, final_path).map_err(|source| io_err(final_path, source))
73    })();
74
75    if write_result.is_err() {
76        // Never leave a partial temp file behind.
77        let _ = fs::remove_file(&tmp_path);
78    }
79    write_result
80}
81
82#[async_trait::async_trait]
83impl AttachmentStore for FileAttachmentStore {
84    fn persistence(&self) -> AttachmentStorePersistence {
85        AttachmentStorePersistence::Durable
86    }
87
88    async fn put(
89        &self,
90        bytes: Vec<u8>,
91        meta: AttachmentCreateMeta,
92    ) -> Result<AttachmentRef, AttachmentStoreError> {
93        let meta = AttachmentMeta::new(
94            content_id(&bytes),
95            meta.media_type,
96            bytes.len() as u64,
97            meta.width,
98            meta.height,
99            meta.label,
100        );
101        let path = self.path_for_id(&meta.id);
102        if let Some(parent) = path.parent() {
103            fs::create_dir_all(parent).map_err(|source| AttachmentStoreError::Io {
104                path: parent.to_path_buf(),
105                source,
106            })?;
107        }
108        if !path.exists() {
109            write_atomic(&path, &bytes)?;
110        }
111        let meta_path = self.meta_path_for_id(&meta.id);
112        let meta_bytes = serde_json::to_vec_pretty(&meta).expect("attachment metadata serializes");
113        write_atomic(&meta_path, &meta_bytes)?;
114        let reference = meta.as_ref();
115        self.meta_cache().insert(reference.id.clone(), meta);
116        Ok(reference)
117    }
118
119    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
120        let path = self.path_for_id(id);
121        let bytes = fs::read(&path).map_err(|source| {
122            if source.kind() == std::io::ErrorKind::NotFound {
123                AttachmentStoreError::NotFound(id.clone())
124            } else {
125                AttachmentStoreError::Io {
126                    path: path.clone(),
127                    source,
128                }
129            }
130        })?;
131        let meta = if let Some(meta) = self.meta_cache().get(id).cloned() {
132            meta
133        } else {
134            let meta_path = self.meta_path_for_id(id);
135            let meta_bytes = fs::read(&meta_path).map_err(|source| {
136                if source.kind() == std::io::ErrorKind::NotFound {
137                    AttachmentStoreError::MissingMeta(id.clone())
138                } else {
139                    AttachmentStoreError::Io {
140                        path: meta_path.clone(),
141                        source,
142                    }
143                }
144            })?;
145            serde_json::from_slice(&meta_bytes).map_err(|source| {
146                AttachmentStoreError::MetadataDecode {
147                    id: id.clone(),
148                    source,
149                }
150            })?
151        };
152        Ok(StoredAttachment { meta, bytes })
153    }
154
155    async fn delete(&self, id: &AttachmentId) -> Result<(), AttachmentStoreError> {
156        // Remove the content file and its metadata sidecar. A missing file is
157        // not an error (idempotent delete); any other I/O failure surfaces.
158        let remove = |path: PathBuf| -> Result<(), AttachmentStoreError> {
159            match fs::remove_file(&path) {
160                Ok(()) => Ok(()),
161                Err(source) if source.kind() == std::io::ErrorKind::NotFound => Ok(()),
162                Err(source) => Err(AttachmentStoreError::Io { path, source }),
163            }
164        };
165        remove(self.path_for_id(id))?;
166        remove(self.meta_path_for_id(id))?;
167        self.meta_cache().remove(id);
168        Ok(())
169    }
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ImageMediaType, MediaType};

    /// Creation metadata for a 1x1 PNG labelled `pixel`.
    fn meta() -> AttachmentCreateMeta {
        let png = MediaType::Image(ImageMediaType::Png);
        AttachmentCreateMeta::new(png, Some(1), Some(1), Some("pixel".to_string()))
    }

    /// Every entry of `dir` whose file extension is `tmp`.
    fn tmp_files_in(dir: &Path) -> Vec<PathBuf> {
        fs::read_dir(dir)
            .expect("read content dir")
            .map(|entry| entry.expect("dir entry").path())
            .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("tmp"))
            .collect()
    }

    #[tokio::test]
    async fn file_store_round_trips_bytes_and_metadata() {
        let temp = tempfile::tempdir().expect("tempdir");
        let store = FileAttachmentStore::new(temp.path());

        let reference = store.put(vec![1, 2, 3], meta()).await.expect("put");
        let stored = store.get(&reference.id).await.expect("get");

        assert_eq!(stored.bytes, vec![1, 2, 3]);
        assert_eq!(stored.meta.id, reference.id);
        assert_eq!(stored.meta.byte_len, 3);
    }

    // `put` must write crash-atomically: stage into a sibling `.tmp` file,
    // then rename it into place. After a successful put no `.tmp` file may
    // remain in the content directory — proof that the staged file was
    // renamed into place rather than written in situ.
    #[tokio::test]
    async fn file_store_writes_atomically_without_temp_litter() {
        let temp = tempfile::tempdir().expect("tempdir");
        let store = FileAttachmentStore::new(temp.path());
        let reference = store.put(vec![9, 8, 7, 6], meta()).await.expect("put");

        let final_path = store.path_for_id(&reference.id);
        assert!(final_path.exists(), "content file must be in place");
        assert!(
            store.meta_path_for_id(&reference.id).exists(),
            "metadata file must be in place"
        );

        let tmp_files = tmp_files_in(final_path.parent().expect("content dir"));
        assert!(
            tmp_files.is_empty(),
            "atomic write must not leave .tmp files behind: {tmp_files:?}"
        );

        // The bytes round-trip in full (no truncation from a partial write).
        let stored = store.get(&reference.id).await.expect("get");
        assert_eq!(stored.bytes, vec![9, 8, 7, 6]);
    }

    // A stale `<final>.tmp` left behind by a crashed earlier write must not
    // prevent a later put of the same content from succeeding.
    #[tokio::test]
    async fn file_store_overwrites_stale_temp_file() {
        let temp = tempfile::tempdir().expect("tempdir");
        let store = FileAttachmentStore::new(temp.path());

        let id = AttachmentId::new(content_id(&[1, 1, 1]).to_string());
        let final_path = store.path_for_id(&id);
        fs::create_dir_all(final_path.parent().expect("parent")).expect("mkdir");
        let mut stale = final_path.into_os_string();
        stale.push(".tmp");
        fs::write(PathBuf::from(stale), b"stale partial write").expect("seed stale tmp");

        let reference = store
            .put(vec![1, 1, 1], meta())
            .await
            .expect("put over stale tmp");
        let stored = store.get(&reference.id).await.expect("get");
        assert_eq!(stored.bytes, vec![1, 1, 1]);
    }

    // Runs the backend-agnostic `AttachmentStore` conformance suite against
    // the file-backed implementation. The same suite runs against the
    // in-memory store, so both backends are held to one contract.
    #[tokio::test]
    async fn file_attachment_store_satisfies_conformance() {
        use std::sync::Arc;

        use crate::testing::conformance::ReopenableAttachmentStore;

        // Every `make()` call gets a fresh root shared by two independent
        // store instances (`open` and `reopen`). The tempdirs are parked in
        // `dirs` so each root outlives the stores built on it for the whole
        // suite.
        let dirs: Arc<Mutex<Vec<tempfile::TempDir>>> = Arc::default();
        crate::testing::conformance::attachment_store_reopenable(
            || {
                let dir = tempfile::tempdir().expect("tempdir");
                let open =
                    Arc::new(FileAttachmentStore::new(dir.path())) as Arc<dyn AttachmentStore>;
                let reopen =
                    Arc::new(FileAttachmentStore::new(dir.path())) as Arc<dyn AttachmentStore>;
                dirs.lock().expect("dirs lock").push(dir);
                ReopenableAttachmentStore { open, reopen }
            },
            AttachmentStorePersistence::Durable,
        )
        .await;
    }
}