Skip to main content

lash_local_store/
lib.rs

1use std::collections::HashMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::Mutex;
5
6use lash_core::{
7    AttachmentCreateMeta, AttachmentId, AttachmentMeta, AttachmentRef, AttachmentStore,
8    AttachmentStoreError, AttachmentStorePersistence, StoredAttachment,
9};
10
11pub struct FileAttachmentStore {
12    root: PathBuf,
13    meta: Mutex<HashMap<AttachmentId, AttachmentMeta>>,
14}
15
16impl FileAttachmentStore {
17    pub fn new(root: impl Into<PathBuf>) -> Self {
18        Self {
19            root: root.into(),
20            meta: Mutex::new(HashMap::new()),
21        }
22    }
23
24    pub fn root(&self) -> &Path {
25        &self.root
26    }
27
28    /// Lock the in-memory metadata cache, recovering from a poisoned lock
29    /// rather than panicking. The cache is a best-effort fast path backed by
30    /// the on-disk `.json` sidecars, so a prior panic while it was held must
31    /// not permanently brick the store — `get`/`put` simply fall back to disk.
32    fn meta_cache(&self) -> std::sync::MutexGuard<'_, HashMap<AttachmentId, AttachmentMeta>> {
33        self.meta
34            .lock()
35            .unwrap_or_else(|poisoned| poisoned.into_inner())
36    }
37
38    fn path_for_id(&self, id: &AttachmentId) -> PathBuf {
39        let id = id.as_str();
40        let prefix = id.get(..2).unwrap_or(id);
41        self.root.join("sha256").join(prefix).join(id)
42    }
43
44    fn meta_path_for_id(&self, id: &AttachmentId) -> PathBuf {
45        self.path_for_id(id).with_extension("json")
46    }
47}
48
49/// Write `bytes` to `final_path` crash-atomically: stage into a sibling
50/// `<final>.tmp`, flush it, then `rename` into place. A `rename` within the
51/// same directory is atomic on POSIX, so a reader (or a crash) ever sees either
52/// the old contents or the complete new contents — never a half-written file.
53/// The temp file is removed on any failure so a crashed write leaves no
54/// `.tmp` litter behind.
55fn write_atomic(final_path: &Path, bytes: &[u8]) -> Result<(), AttachmentStoreError> {
56    let mut tmp_os = final_path.as_os_str().to_os_string();
57    tmp_os.push(".tmp");
58    let tmp_path = PathBuf::from(tmp_os);
59
60    let io_err = |path: &Path, source: std::io::Error| AttachmentStoreError::Io {
61        path: path.to_path_buf(),
62        source,
63    };
64
65    let write_result = (|| {
66        let mut file = fs::File::create(&tmp_path).map_err(|source| io_err(&tmp_path, source))?;
67        std::io::Write::write_all(&mut file, bytes).map_err(|source| io_err(&tmp_path, source))?;
68        // Best-effort durability for the staged bytes before the rename.
69        file.sync_all()
70            .map_err(|source| io_err(&tmp_path, source))?;
71        fs::rename(&tmp_path, final_path).map_err(|source| io_err(final_path, source))
72    })();
73
74    if write_result.is_err() {
75        // Never leave a partial temp file behind.
76        let _ = fs::remove_file(&tmp_path);
77    }
78    write_result
79}
80
81#[async_trait::async_trait]
82impl AttachmentStore for FileAttachmentStore {
83    fn persistence(&self) -> AttachmentStorePersistence {
84        AttachmentStorePersistence::Durable
85    }
86
87    async fn put(
88        &self,
89        bytes: Vec<u8>,
90        meta: AttachmentCreateMeta,
91    ) -> Result<AttachmentRef, AttachmentStoreError> {
92        let meta = AttachmentMeta::new(
93            lash_core::attachments::content_id(&bytes),
94            meta.media_type,
95            bytes.len() as u64,
96            meta.width,
97            meta.height,
98            meta.label,
99        );
100        let path = self.path_for_id(&meta.id);
101        if let Some(parent) = path.parent() {
102            fs::create_dir_all(parent).map_err(|source| AttachmentStoreError::Io {
103                path: parent.to_path_buf(),
104                source,
105            })?;
106        }
107        if !path.exists() {
108            write_atomic(&path, &bytes)?;
109        }
110        let meta_path = self.meta_path_for_id(&meta.id);
111        let meta_bytes = serde_json::to_vec_pretty(&meta).expect("attachment metadata serializes");
112        write_atomic(&meta_path, &meta_bytes)?;
113        let reference = meta.as_ref();
114        self.meta_cache().insert(reference.id.clone(), meta);
115        Ok(reference)
116    }
117
118    async fn get(&self, id: &AttachmentId) -> Result<StoredAttachment, AttachmentStoreError> {
119        let path = self.path_for_id(id);
120        let bytes = fs::read(&path).map_err(|source| {
121            if source.kind() == std::io::ErrorKind::NotFound {
122                AttachmentStoreError::NotFound(id.clone())
123            } else {
124                AttachmentStoreError::Io {
125                    path: path.clone(),
126                    source,
127                }
128            }
129        })?;
130        let meta = if let Some(meta) = self.meta_cache().get(id).cloned() {
131            meta
132        } else {
133            let meta_path = self.meta_path_for_id(id);
134            let meta_bytes = fs::read(&meta_path).map_err(|source| {
135                if source.kind() == std::io::ErrorKind::NotFound {
136                    AttachmentStoreError::MissingMeta(id.clone())
137                } else {
138                    AttachmentStoreError::Io {
139                        path: meta_path.clone(),
140                        source,
141                    }
142                }
143            })?;
144            serde_json::from_slice(&meta_bytes).map_err(|source| {
145                AttachmentStoreError::MetadataDecode {
146                    id: id.clone(),
147                    source,
148                }
149            })?
150        };
151        Ok(StoredAttachment { meta, bytes })
152    }
153}
154
155#[cfg(test)]
156mod tests {
157    use super::*;
158    use lash_core::{ImageMediaType, MediaType};
159
160    fn meta() -> AttachmentCreateMeta {
161        AttachmentCreateMeta::new(
162            MediaType::Image(ImageMediaType::Png),
163            Some(1),
164            Some(1),
165            Some("pixel".to_string()),
166        )
167    }
168
169    #[tokio::test]
170    async fn file_store_round_trips_bytes_and_metadata() {
171        let temp = tempfile::tempdir().expect("tempdir");
172        let store = FileAttachmentStore::new(temp.path());
173        let reference = store.put(vec![1, 2, 3], meta()).await.expect("put");
174        let stored = store.get(&reference.id).await.expect("get");
175
176        assert_eq!(stored.bytes, vec![1, 2, 3]);
177        assert_eq!(stored.meta.id, reference.id);
178        assert_eq!(stored.meta.byte_len, 3);
179    }
180
181    // Finding 4: `put` must write crash-atomically (stage into `<final>.tmp`,
182    // then rename). After a successful put there must be no leftover `.tmp`
183    // files in the content directory — proof that the temp file was renamed
184    // into place rather than written in situ.
185    #[tokio::test]
186    async fn file_store_writes_atomically_without_temp_litter() {
187        let temp = tempfile::tempdir().expect("tempdir");
188        let store = FileAttachmentStore::new(temp.path());
189        let reference = store.put(vec![9, 8, 7, 6], meta()).await.expect("put");
190
191        let final_path = store.path_for_id(&reference.id);
192        let meta_path = store.meta_path_for_id(&reference.id);
193        assert!(final_path.exists(), "content file must be in place");
194        assert!(meta_path.exists(), "metadata file must be in place");
195
196        let mut tmp_files = Vec::new();
197        let dir = final_path.parent().expect("content dir");
198        for entry in fs::read_dir(dir).expect("read content dir") {
199            let path = entry.expect("dir entry").path();
200            if path.extension().and_then(|ext| ext.to_str()) == Some("tmp") {
201                tmp_files.push(path);
202            }
203        }
204        assert!(
205            tmp_files.is_empty(),
206            "atomic write must not leave .tmp files behind: {tmp_files:?}"
207        );
208
209        // The bytes round-trip in full (no truncation from a partial write).
210        let stored = store.get(&reference.id).await.expect("get");
211        assert_eq!(stored.bytes, vec![9, 8, 7, 6]);
212    }
213
214    // A stale `<final>.tmp` left by a crashed prior write must not block a
215    // subsequent successful put — the temp file is recreated/truncated.
216    #[tokio::test]
217    async fn file_store_overwrites_stale_temp_file() {
218        let temp = tempfile::tempdir().expect("tempdir");
219        let store = FileAttachmentStore::new(temp.path());
220        let content_id = lash_core::attachments::content_id(&[1, 1, 1]);
221        let id = AttachmentId::new(content_id.to_string());
222        let final_path = store.path_for_id(&id);
223        let parent = final_path.parent().expect("parent");
224        fs::create_dir_all(parent).expect("mkdir");
225        let mut tmp_os = final_path.as_os_str().to_os_string();
226        tmp_os.push(".tmp");
227        fs::write(PathBuf::from(tmp_os), b"stale partial write").expect("seed stale tmp");
228
229        let reference = store
230            .put(vec![1, 1, 1], meta())
231            .await
232            .expect("put over stale tmp");
233        let stored = store.get(&reference.id).await.expect("get");
234        assert_eq!(stored.bytes, vec![1, 1, 1]);
235    }
236}