Skip to main content

pacha/storage/
object_store.rs

1//! Object store for content-addressed artifact storage.
2
3use crate::error::{PachaError, Result};
4use crate::storage::ContentAddress;
5use std::fs::{self, File};
6use std::io::{BufReader, BufWriter, Read, Write};
7use std::path::{Path, PathBuf};
8
/// Directory-backed store that files artifacts under their content address.
///
/// Objects are sharded into sub-directories named after a BLAKE3 hash prefix.
/// The layout below is illustrative; the actual directory and file names are
/// whatever `ContentAddress::storage_prefix` and `ContentAddress::hash_hex`
/// return:
/// ```text
/// objects/
/// ├── ab/
/// │   └── cdef1234...
/// ├── cd/
/// │   └── ef5678...
/// └── ...
/// ```
#[derive(Debug)]
pub struct ObjectStore {
    /// Root directory that contains every shard directory.
    base_path: PathBuf,
}
25
26impl ObjectStore {
27    /// Create a new object store at the given path.
28    ///
29    /// # Errors
30    ///
31    /// Returns an error if the directory cannot be created.
32    pub fn new<P: AsRef<Path>>(base_path: P) -> Result<Self> {
33        let base_path = base_path.as_ref().to_path_buf();
34        fs::create_dir_all(&base_path)?;
35        Ok(Self { base_path })
36    }
37
38    /// Open an existing object store.
39    ///
40    /// # Errors
41    ///
42    /// Returns an error if the path doesn't exist.
43    pub fn open<P: AsRef<Path>>(base_path: P) -> Result<Self> {
44        let base_path = base_path.as_ref().to_path_buf();
45        if !base_path.exists() {
46            return Err(PachaError::NotInitialized(base_path));
47        }
48        Ok(Self { base_path })
49    }
50
51    /// Get the base path.
52    #[must_use]
53    pub fn base_path(&self) -> &Path {
54        &self.base_path
55    }
56
57    /// Store bytes and return their content address.
58    ///
59    /// # Errors
60    ///
61    /// Returns an error if writing fails.
62    pub fn put(&self, data: &[u8]) -> Result<ContentAddress> {
63        let addr = ContentAddress::from_bytes(data);
64        self.put_with_address(data, &addr)?;
65        Ok(addr)
66    }
67
68    /// Store bytes at a specific content address.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if writing fails or hash doesn't match.
73    pub fn put_with_address(&self, data: &[u8], addr: &ContentAddress) -> Result<()> {
74        // Verify hash matches
75        if !addr.verify(data) {
76            return Err(PachaError::HashMismatch {
77                expected: addr.hash_hex(),
78                actual: ContentAddress::from_bytes(data).hash_hex(),
79            });
80        }
81
82        let path = self.object_path(addr);
83
84        // Skip if already exists (content-addressed = idempotent)
85        if path.exists() {
86            return Ok(());
87        }
88
89        // Create parent directory
90        if let Some(parent) = path.parent() {
91            fs::create_dir_all(parent)?;
92        }
93
94        let temp_path = path.with_extension("tmp");
95        {
96            let file = File::create(&temp_path)?;
97            let mut writer = BufWriter::new(file);
98            writer.write_all(data)?;
99            writer.flush()?;
100        }
101
102        // Atomic rename
103        fs::rename(&temp_path, &path)?;
104
105        Ok(())
106    }
107
108    /// Store from a reader and return content address.
109    ///
110    /// # Errors
111    ///
112    /// Returns an error if reading or writing fails.
113    pub fn put_reader<R: Read>(&self, mut reader: R) -> Result<ContentAddress> {
114        // Read all data first to compute hash
115        let mut data = Vec::new();
116        reader.read_to_end(&mut data)?;
117        self.put(&data)
118    }
119
120    /// Get data by content address.
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if the object doesn't exist or reading fails.
125    pub fn get(&self, addr: &ContentAddress) -> Result<Vec<u8>> {
126        let path = self.object_path(addr);
127
128        if !path.exists() {
129            return Err(PachaError::NotFound {
130                kind: "object".to_string(),
131                name: addr.hash_hex(),
132                version: "n/a".to_string(),
133            });
134        }
135
136        let file = File::open(&path)?;
137        let mut reader = BufReader::new(file);
138        let capacity = usize::try_from(addr.size()).unwrap_or(0);
139        let mut data = Vec::with_capacity(capacity);
140        reader.read_to_end(&mut data)?;
141
142        // Verify integrity
143        if !addr.verify(&data) {
144            return Err(PachaError::HashMismatch {
145                expected: addr.hash_hex(),
146                actual: ContentAddress::from_bytes(&data).hash_hex(),
147            });
148        }
149
150        Ok(data)
151    }
152
153    /// Check if an object exists.
154    #[must_use]
155    pub fn exists(&self, addr: &ContentAddress) -> bool {
156        self.object_path(addr).exists()
157    }
158
159    /// Delete an object by content address.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if deletion fails.
164    pub fn delete(&self, addr: &ContentAddress) -> Result<bool> {
165        let path = self.object_path(addr);
166
167        if !path.exists() {
168            return Ok(false);
169        }
170
171        fs::remove_file(&path)?;
172
173        // Try to remove empty parent directory
174        if let Some(parent) = path.parent() {
175            let _ = fs::remove_dir(parent); // Ignore if not empty
176        }
177
178        Ok(true)
179    }
180
181    /// List all content addresses in the store.
182    ///
183    /// # Errors
184    ///
185    /// Returns an error if reading the directory fails.
186    pub fn list(&self) -> Result<Vec<String>> {
187        let mut addresses = Vec::new();
188
189        if !self.base_path.exists() {
190            return Ok(addresses);
191        }
192
193        for prefix_entry in fs::read_dir(&self.base_path)? {
194            let prefix_entry = prefix_entry?;
195            if !prefix_entry.file_type()?.is_dir() {
196                continue;
197            }
198
199            for entry in fs::read_dir(prefix_entry.path())? {
200                let entry = entry?;
201                if entry.file_type()?.is_file() {
202                    if let Some(name) = entry.file_name().to_str() {
203                        // Exclude .tmp working files from listing
204                        #[allow(clippy::case_sensitive_file_extension_comparisons)]
205                        if !name.ends_with(".tmp") {
206                            addresses.push(name.to_string());
207                        }
208                    }
209                }
210            }
211        }
212
213        Ok(addresses)
214    }
215
216    /// Get total size of all stored objects in bytes.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if reading fails.
221    pub fn total_size(&self) -> Result<u64> {
222        let mut total = 0u64;
223
224        if !self.base_path.exists() {
225            return Ok(0);
226        }
227
228        for prefix_entry in fs::read_dir(&self.base_path)? {
229            let prefix_entry = prefix_entry?;
230            if !prefix_entry.file_type()?.is_dir() {
231                continue;
232            }
233
234            for entry in fs::read_dir(prefix_entry.path())? {
235                let entry = entry?;
236                if entry.file_type()?.is_file() {
237                    total += entry.metadata()?.len();
238                }
239            }
240        }
241
242        Ok(total)
243    }
244
245    /// Get the file path for a content address.
246    fn object_path(&self, addr: &ContentAddress) -> PathBuf {
247        self.base_path.join(addr.storage_prefix()).join(addr.hash_hex())
248    }
249}
250
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;
    use tempfile::TempDir;

    /// Build a fresh store inside a new temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the store so the directory
    /// stays alive for as long as the caller holds it.
    fn setup() -> (TempDir, ObjectStore) {
        let guard = TempDir::new().unwrap();
        let objects_dir = guard.path().join("objects");
        let store = ObjectStore::new(objects_dir).unwrap();
        (guard, store)
    }

    /// Stored bytes come back unchanged and the address records their length.
    #[test]
    fn test_put_and_get() {
        let (_guard, store) = setup();
        let payload = b"hello world";

        let address = store.put(payload).unwrap();
        assert_eq!(address.size(), 11);

        let fetched = store.get(&address).unwrap();
        assert_eq!(fetched, payload);
    }

    /// Storing identical bytes twice yields the same address both times.
    #[test]
    fn test_put_idempotent() {
        let (_guard, store) = setup();
        let payload = b"test data";

        let first = store.put(payload).unwrap();
        let second = store.put(payload).unwrap();

        assert_eq!(first, second);
    }

    /// `exists` flips from false to true once the object has been stored.
    #[test]
    fn test_exists() {
        let (_guard, store) = setup();
        let payload = b"test";

        let address = ContentAddress::from_bytes(payload);
        assert!(!store.exists(&address));

        store.put(payload).unwrap();
        assert!(store.exists(&address));
    }

    /// Deleting removes the object; deleting again reports nothing to do.
    #[test]
    fn test_delete() {
        let (_guard, store) = setup();
        let payload = b"delete me";

        let address = store.put(payload).unwrap();
        assert!(store.exists(&address));

        let removed = store.delete(&address).unwrap();
        assert!(removed);
        assert!(!store.exists(&address));

        // Delete non-existent returns false
        let removed_again = store.delete(&address).unwrap();
        assert!(!removed_again);
    }

    /// Fetching an address that was never stored is a `NotFound` error.
    #[test]
    fn test_get_not_found() {
        let (_guard, store) = setup();
        let missing = ContentAddress::from_bytes(b"nonexistent");

        let outcome = store.get(&missing);
        assert!(matches!(outcome, Err(PachaError::NotFound { .. })));
    }

    /// Data may not be filed under an address computed from other bytes.
    #[test]
    fn test_put_with_wrong_address() {
        let (_guard, store) = setup();
        let payload = b"actual data";
        let mismatched = ContentAddress::from_bytes(b"different data");

        let outcome = store.put_with_address(payload, &mismatched);
        assert!(matches!(outcome, Err(PachaError::HashMismatch { .. })));
    }

    /// Three distinct payloads produce three listed objects.
    #[test]
    fn test_list() {
        let (_guard, store) = setup();

        store.put(b"one").unwrap();
        store.put(b"two").unwrap();
        store.put(b"three").unwrap();

        let listed = store.list().unwrap();
        assert_eq!(listed.len(), 3);
    }

    /// Two five-byte objects add up to ten bytes.
    #[test]
    fn test_total_size() {
        let (_guard, store) = setup();

        store.put(b"12345").unwrap();
        store.put(b"67890").unwrap();

        let bytes = store.total_size().unwrap();
        assert_eq!(bytes, 10);
    }

    /// Opening a path that does not exist reports `NotInitialized`.
    #[test]
    fn test_open_nonexistent() {
        let guard = TempDir::new().unwrap();
        let outcome = ObjectStore::open(guard.path().join("nonexistent"));
        assert!(matches!(outcome, Err(PachaError::NotInitialized(_))));
    }

    // Property-based tests
    proptest! {
        /// Any byte string survives a put/get round trip.
        #[test]
        fn prop_roundtrip(data: Vec<u8>) {
            let (_guard, store) = setup();

            let address = store.put(&data).unwrap();
            let fetched = store.get(&address).unwrap();

            prop_assert_eq!(data, fetched);
        }

        /// Any byte string maps to the same address on repeated puts.
        #[test]
        fn prop_idempotent(data: Vec<u8>) {
            let (_guard, store) = setup();

            let first = store.put(&data).unwrap();
            let second = store.put(&data).unwrap();

            prop_assert_eq!(first, second);
        }

        /// Storing the same bytes twice leaves exactly one object behind.
        #[test]
        fn prop_deduplication(data: Vec<u8>) {
            let (_guard, store) = setup();

            // Store same data twice
            store.put(&data).unwrap();
            store.put(&data).unwrap();

            // Should only have one object
            let listed = store.list().unwrap();
            prop_assert_eq!(listed.len(), 1);
        }
    }
}