use crate::cas::compressor::{self, is_zstd_compressed};
use crate::cas::hasher;
use crate::cas::pack::{PackCache, PackError, PackFile, PackIndex};
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use suture_common::Hash;
use thiserror::Error;
23
/// Errors produced by the content-addressed blob store.
#[derive(Error, Debug)]
pub enum CasError {
    /// The blob exists neither as a loose object nor inside any pack file.
    #[error("blob not found: {0}")]
    BlobNotFound(String),

    /// Stored data hashed to a different value than expected (corruption).
    #[error("hash mismatch: expected {expected}, got {actual}")]
    HashMismatch { expected: String, actual: String },

    /// Underlying filesystem error.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),

    /// zstd compression failed.
    #[error("compression error: {0}")]
    CompressionError(String),

    /// zstd decompression failed.
    #[error("decompression error: {0}")]
    DecompressionError(String),

    /// Decompressed payload exceeded the configured size limit.
    #[error("decompressed data too large: {max} bytes max")]
    DecompressionTooLarge { max: usize },

    /// `put_blob_new` was called for content already in the store.
    #[error("blob already exists: {0}")]
    AlreadyExists(String),

    /// A supplied path was malformed or unusable.
    #[error("invalid path: {0}")]
    InvalidPath(String),

    /// Error from the pack-file layer.
    #[error("pack error: {0}")]
    Pack(#[from] PackError),
}
54
/// Content-addressed blob store backed by a `objects/<xx>/<rest>` fan-out
/// directory of loose objects plus optional pack files under `objects/pack`.
pub struct BlobStore {
    /// Store root; all blobs live under `root/objects`.
    root: PathBuf,
    /// Whether new blobs are zstd-compressed on write.
    compress: bool,
    /// zstd level used when `compress` is true.
    compression_level: i32,
    /// When true, blob contents are re-hashed and checked on read.
    verify_on_read: bool,
    /// Lazily-loaded pack index cache: `None` until the first packed lookup,
    /// reset to `None` by `invalidate_pack_cache`.
    pack_cache: Mutex<Option<PackCache>>,
}
80
81impl BlobStore {
82 pub fn new(root: impl Into<PathBuf>) -> Result<Self, CasError> {
86 let root = root.into();
87 let objects_dir = root.join("objects");
88 fs::create_dir_all(&objects_dir)?;
89 Ok(Self {
90 root,
91 compress: true,
92 compression_level: compressor::DEFAULT_COMPRESSION_LEVEL,
93 verify_on_read: true,
94 pack_cache: Mutex::new(None),
95 })
96 }
97
98 pub fn open_in_memory() -> Result<Self, CasError> {
103 let root = tempfile::tempdir().map_err(CasError::Io)?.keep();
104 let objects_dir = root.join("objects");
105 fs::create_dir_all(&objects_dir)?;
106 Ok(Self {
107 root,
108 compress: true,
109 compression_level: compressor::DEFAULT_COMPRESSION_LEVEL,
110 verify_on_read: true,
111 pack_cache: Mutex::new(None),
112 })
113 }
114
115 pub fn new_uncompressed(root: impl Into<PathBuf>) -> Result<Self, CasError> {
117 let mut store = Self::new(root)?;
118 store.compress = false;
119 Ok(store)
120 }
121
122 pub fn set_verify_on_read(&mut self, verify: bool) {
130 self.verify_on_read = verify;
131 }
132
133 pub fn verify_on_read(&self) -> bool {
135 self.verify_on_read
136 }
137
138 pub fn put_blob(&self, data: &[u8]) -> Result<Hash, CasError> {
143 let hash = hasher::hash_bytes(data);
144 let blob_path = self.blob_path(&hash);
145
146 if blob_path.exists() {
148 return Ok(hash);
149 }
150
151 if let Some(parent) = blob_path.parent() {
153 fs::create_dir_all(parent)?;
154 }
155
156 if self.compress {
158 let compressed = compressor::compress(data, self.compression_level)?;
159 fs::write(&blob_path, &compressed)?;
160 } else {
161 fs::write(&blob_path, data)?;
162 }
163
164 Ok(hash)
165 }
166
167 pub fn put_blob_new(&self, data: &[u8]) -> Result<Hash, CasError> {
169 let hash = hasher::hash_bytes(data);
170 let blob_path = self.blob_path(&hash);
171
172 if blob_path.exists() {
173 return Err(CasError::AlreadyExists(hash.to_hex()));
174 }
175
176 if let Some(parent) = blob_path.parent() {
177 fs::create_dir_all(parent)?;
178 }
179
180 if self.compress {
181 let compressed = compressor::compress(data, self.compression_level)?;
182 fs::write(&blob_path, &compressed)?;
183 } else {
184 fs::write(&blob_path, data)?;
185 }
186
187 Ok(hash)
188 }
189
190 pub fn put_blob_with_hash(&self, data: &[u8], expected_hash: &Hash) -> Result<(), CasError> {
194 let blob_path = self.blob_path(expected_hash);
195
196 if blob_path.exists() {
197 return Ok(());
198 }
199
200 hasher::verify_hash(data, expected_hash)?;
201
202 if let Some(parent) = blob_path.parent() {
203 fs::create_dir_all(parent)?;
204 }
205
206 if self.compress {
207 let compressed = compressor::compress(data, self.compression_level)?;
208 fs::write(&blob_path, &compressed)?;
209 } else {
210 fs::write(&blob_path, data)?;
211 }
212
213 Ok(())
214 }
215
216 pub fn get_blob(&self, hash: &Hash) -> Result<Vec<u8>, CasError> {
222 let blob_path = self.blob_path(hash);
224 if blob_path.exists() {
225 let raw = fs::read(&blob_path)?;
226 let data = if is_zstd_compressed(&raw) {
227 compressor::decompress(&raw)?
228 } else {
229 raw
230 };
231 if self.verify_on_read {
232 hasher::verify_hash(&data, hash)?;
233 }
234 return Ok(data);
235 }
236
237 if let Ok(data) = self.get_blob_packed(hash) {
239 return Ok(data);
240 }
241
242 Err(CasError::BlobNotFound(hash.to_hex()))
243 }
244
245 pub fn has_blob(&self, hash: &Hash) -> bool {
250 self.blob_path(hash).exists() || self.has_blob_packed(hash)
251 }
252
253 pub fn delete_blob(&self, hash: &Hash) -> Result<(), CasError> {
257 let blob_path = self.blob_path(hash);
258 fs::remove_file(&blob_path).map_err(|e| {
259 if e.kind() == io::ErrorKind::NotFound {
260 CasError::BlobNotFound(hash.to_hex())
261 } else {
262 CasError::Io(e)
263 }
264 })
265 }
266
267 pub fn blob_count(&self) -> Result<u64, CasError> {
269 let objects_dir = self.root.join("objects");
270 let mut count = 0u64;
271 if objects_dir.exists() {
272 for entry in fs::read_dir(&objects_dir)? {
273 let entry = entry?;
274 if entry.file_type()?.is_dir() {
275 let dir_name = entry.file_name();
276 if dir_name == "pack" {
277 continue;
278 }
279 for sub_entry in fs::read_dir(entry.path())? {
280 let sub_entry = sub_entry?;
281 if sub_entry.file_type()?.is_file() {
282 count += 1;
283 }
284 }
285 }
286 }
287 }
288 Ok(count)
289 }
290
291 pub fn total_size(&self) -> Result<u64, CasError> {
293 let objects_dir = self.root.join("objects");
294 let mut total = 0u64;
295 if objects_dir.exists() {
296 for entry in fs::read_dir(&objects_dir)? {
297 let entry = entry?;
298 if entry.file_type()?.is_dir() {
299 let dir_name = entry.file_name();
300 if dir_name == "pack" {
301 continue;
302 }
303 for sub_entry in fs::read_dir(entry.path())? {
304 let sub_entry = sub_entry?;
305 if sub_entry.file_type()?.is_file() {
306 total += sub_entry.metadata()?.len();
307 }
308 }
309 }
310 }
311 }
312 Ok(total)
313 }
314
315 pub fn list_blobs(&self) -> Result<Vec<Hash>, CasError> {
317 let objects_dir = self.root.join("objects");
318 let mut hashes = Vec::new();
319 if !objects_dir.exists() {
320 return Ok(hashes);
321 }
322 for entry in fs::read_dir(&objects_dir)? {
323 let entry = entry?;
324 if entry.file_type()?.is_dir() {
325 let dir_name = entry.file_name();
326 if dir_name == "pack" {
327 continue;
328 }
329 let prefix = dir_name.to_string_lossy().to_string();
330 for sub_entry in fs::read_dir(entry.path())? {
331 let sub_entry = sub_entry?;
332 if sub_entry.file_type()?.is_file() {
333 let suffix = sub_entry.file_name().to_string_lossy().to_string();
334 let hex = format!("{prefix}{suffix}");
335 if let Ok(hash) = Hash::from_hex(&hex) {
336 hashes.push(hash);
337 }
338 }
339 }
340 }
341 }
342 Ok(hashes)
343 }
344
345 pub fn objects_dir(&self) -> PathBuf {
347 self.root.join("objects")
348 }
349
350 pub fn pack_dir(&self) -> PathBuf {
352 self.root.join("objects").join("pack")
353 }
354
355 fn with_pack_cache<F, R>(&self, f: F) -> Result<R, CasError>
361 where
362 F: FnOnce(&PackCache) -> R,
363 {
364 let mut guard = self
365 .pack_cache
366 .lock()
367 .map_err(|e| CasError::CompressionError(format!("pack cache lock poisoned: {e}")))?;
368 if guard.is_none() {
369 *guard = Some(PackCache::load_all(&self.pack_dir()).map_err(CasError::Pack)?);
370 }
371 let cache = guard.as_ref().ok_or_else(|| {
373 CasError::Pack(PackError::BlobNotFound("pack cache not loaded".into()))
374 })?;
375 Ok(f(cache))
376 }
377
378 pub fn invalidate_pack_cache(&self) {
380 if let Ok(mut guard) = self.pack_cache.lock() {
381 *guard = None;
382 }
383 }
384
385 pub fn get_blob_packed(&self, hash: &Hash) -> Result<Vec<u8>, CasError> {
387 let pack_path = self.with_pack_cache(|cache| cache.find(hash).map(|(p, _)| p.clone()))?;
389 let pack_path = pack_path.ok_or_else(|| CasError::BlobNotFound(hash.to_hex()))?;
390
391 let idx_path = pack_path.with_extension("idx");
392 let index = PackIndex::load(&idx_path).map_err(CasError::Pack)?;
393 let data = PackFile::read_blob(&pack_path, &index, hash).map_err(CasError::Pack)?;
394 Ok(data)
395 }
396
397 pub fn has_blob_packed(&self, hash: &Hash) -> bool {
399 self.with_pack_cache(|cache| cache.find(hash).is_some())
400 .unwrap_or(false)
401 }
402
403 pub fn list_blobs_packed(&self) -> Result<Vec<Hash>, CasError> {
405 self.with_pack_cache(|cache| cache.all_hashes())
406 }
407
408 pub fn repack(&self, threshold: usize) -> Result<usize, CasError> {
414 let loose_hashes = self.list_blobs()?;
415 if loose_hashes.len() <= threshold {
416 return Ok(0);
417 }
418
419 let mut objects = Vec::with_capacity(loose_hashes.len());
420 for hash in &loose_hashes {
421 let data = self.get_blob(hash)?;
422 objects.push((*hash, data));
423 }
424
425 let (pack_path, _idx_path) = PackFile::create(&self.pack_dir(), &objects)?;
426 let _ = pack_path;
427
428 for hash in &loose_hashes {
429 let _ = self.delete_blob(hash);
430 }
431
432 self.invalidate_pack_cache();
434
435 Ok(loose_hashes.len())
436 }
437
438 fn blob_path(&self, hash: &Hash) -> PathBuf {
440 let hex = hash.to_hex();
441 let prefix = &hex[..2];
442 let suffix = &hex[2..];
443 self.root.join("objects").join(prefix).join(suffix)
444 }
445}
446
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds an uncompressed store in a fresh tempdir; the `TempDir` is
    /// returned so the directory lives as long as the test.
    fn make_store() -> (TempDir, BlobStore) {
        let dir = tempfile::tempdir().unwrap();
        let store = BlobStore::new_uncompressed(dir.path()).unwrap();
        (dir, store)
    }

    // Round-trip: a stored blob reads back byte-identical.
    #[test]
    fn test_put_and_get_blob() {
        let (_dir, store) = make_store();
        let data = b"hello, suture!";
        let hash = store.put_blob(data).unwrap();

        let retrieved = store.get_blob(&hash).unwrap();
        assert_eq!(data.as_slice(), retrieved.as_slice());
    }

    // Identical content yields the same hash and a single stored copy.
    #[test]
    fn test_deduplication() {
        let (_dir, store) = make_store();
        let data = b"deduplicate me";

        let h1 = store.put_blob(data).unwrap();
        let h2 = store.put_blob(data).unwrap();
        assert_eq!(h1, h2);

        assert_eq!(store.blob_count().unwrap(), 1, "Only one copy should exist");
    }

    // has_blob answers true for stored content, false for an absent hash.
    #[test]
    fn test_has_blob() {
        let (_dir, store) = make_store();
        let hash = store.put_blob(b"exists").unwrap();

        assert!(store.has_blob(&hash));
        let missing = Hash::from_hex(&"f".repeat(64)).unwrap();
        assert!(!store.has_blob(&missing));
    }

    // Reading an unknown hash surfaces BlobNotFound.
    #[test]
    fn test_get_nonexistent_blob() {
        let (_dir, store) = make_store();
        let missing = Hash::from_hex(&"a".repeat(64)).unwrap();
        let result = store.get_blob(&missing);
        assert!(matches!(result, Err(CasError::BlobNotFound(_))));
    }

    // delete_blob removes the loose object.
    #[test]
    fn test_delete_blob() {
        let (_dir, store) = make_store();
        let hash = store.put_blob(b"delete me").unwrap();
        assert!(store.has_blob(&hash));

        store.delete_blob(&hash).unwrap();
        assert!(!store.has_blob(&hash));
    }

    // Deleting an absent blob maps NotFound I/O error to BlobNotFound.
    #[test]
    fn test_delete_nonexistent_blob() {
        let (_dir, store) = make_store();
        let missing = Hash::from_hex(&"b".repeat(64)).unwrap();
        let result = store.delete_blob(&missing);
        assert!(matches!(result, Err(CasError::BlobNotFound(_))));
    }

    // put_blob_new refuses to overwrite existing content.
    #[test]
    fn test_put_blob_new_rejects_duplicate() {
        let (_dir, store) = make_store();
        let data = b"duplicate";
        store.put_blob(data).unwrap();
        let result = store.put_blob_new(data);
        assert!(matches!(result, Err(CasError::AlreadyExists(_))));
    }

    // blob_count and list_blobs agree on the number of loose blobs.
    #[test]
    fn test_blob_count_and_list() {
        let (_dir, store) = make_store();
        store.put_blob(b"one").unwrap();
        store.put_blob(b"two").unwrap();
        store.put_blob(b"three").unwrap();

        assert_eq!(store.blob_count().unwrap(), 3);
        assert_eq!(store.list_blobs().unwrap().len(), 3);
    }

    // A ~10 MB blob round-trips intact.
    #[test]
    fn test_large_blob() {
        let (_dir, store) = make_store();
        let data: Vec<u8> = (0..10_000_000).map(|i| (i % 256) as u8).collect();
        let hash = store.put_blob(&data).unwrap();

        let retrieved = store.get_blob(&hash).unwrap();
        assert_eq!(data.len(), retrieved.len());
        assert_eq!(data, retrieved);
    }

    // Corrupting the on-disk bytes is detected on read (verify_on_read).
    #[test]
    fn test_hash_integrity() {
        let (_dir, store) = make_store();
        let data = b"integrity check";
        let hash = store.put_blob(data).unwrap();

        // Flip the first byte of the stored (uncompressed) blob.
        let blob_path = store.blob_path(&hash);
        let mut corrupted = fs::read(&blob_path).unwrap();
        corrupted[0] = corrupted[0].wrapping_add(1);
        fs::write(&blob_path, &corrupted).unwrap();

        let result = store.get_blob(&hash);
        assert!(matches!(result, Err(CasError::HashMismatch { .. })));
    }

    // The default store writes zstd frames and still reads back plaintext.
    #[test]
    fn test_compressed_store() {
        let dir = tempfile::tempdir().unwrap();
        let store = BlobStore::new(dir.path()).unwrap();

        let data = b"this will be compressed";
        let hash = store.put_blob(data).unwrap();

        // Raw on-disk bytes must carry the zstd magic.
        let blob_path = store.blob_path(&hash);
        let raw = fs::read(&blob_path).unwrap();
        assert!(is_zstd_compressed(&raw), "Blob should be Zstd-compressed");

        let retrieved = store.get_blob(&hash).unwrap();
        assert_eq!(data.as_slice(), retrieved.as_slice());
    }

    // Property-based tests over arbitrary byte vectors.
    mod proptests {
        use super::*;
        use proptest::prelude::*;

        proptest! {
            // Any blob up to 1 KiB round-trips exactly.
            #[test]
            fn put_get_roundtrip(data in proptest::collection::vec(proptest::num::u8::ANY, 0..1024)) {
                let dir = tempfile::tempdir().unwrap();
                let store = BlobStore::new_uncompressed(dir.path()).unwrap();
                let hash = store.put_blob(&data).unwrap();
                let retrieved = store.get_blob(&hash).unwrap();
                prop_assert_eq!(data, retrieved);
            }

            // Hash equality tracks content equality (collision-free in practice).
            #[test]
            fn content_addressing(
                data1 in proptest::collection::vec(proptest::num::u8::ANY, 0..512),
                data2 in proptest::collection::vec(proptest::num::u8::ANY, 0..512)
            ) {
                let dir = tempfile::tempdir().unwrap();
                let store = BlobStore::new_uncompressed(dir.path()).unwrap();

                let hash1 = store.put_blob(&data1).unwrap();
                let hash2 = store.put_blob(&data2).unwrap();

                if data1 == data2 {
                    prop_assert_eq!(hash1, hash2, "same data must produce same hash");
                } else {
                    prop_assert_ne!(hash1, hash2, "different data must produce different hashes");
                }
            }

            // Re-putting is idempotent: same hash, one stored copy.
            #[test]
            fn put_twice_idempotent(data in proptest::collection::vec(proptest::num::u8::ANY, 0..1024)) {
                let dir = tempfile::tempdir().unwrap();
                let store = BlobStore::new_uncompressed(dir.path()).unwrap();

                let hash1 = store.put_blob(&data).unwrap();
                let hash2 = store.put_blob(&data).unwrap();
                prop_assert_eq!(hash1, hash2);
                prop_assert_eq!(store.blob_count().unwrap(), 1);
            }
        }
    }

    // Tests covering pack files and the pack cache.
    mod pack_tests {
        use super::*;

        // After repack, blobs leave the loose store but remain readable.
        #[test]
        fn test_get_blob_from_pack() {
            let dir = tempfile::tempdir().unwrap();
            let store = BlobStore::new_uncompressed(dir.path()).unwrap();

            let hash1 = store.put_blob(b"packed blob one").unwrap();
            let hash2 = store.put_blob(b"packed blob two").unwrap();

            let packed = store.repack(0).unwrap();
            assert_eq!(packed, 2);

            // Loose store drained by the repack.
            assert_eq!(store.blob_count().unwrap(), 0);

            let data1 = store.get_blob(&hash1).unwrap();
            assert_eq!(data1, b"packed blob one".to_vec());

            let data2 = store.get_blob(&hash2).unwrap();
            assert_eq!(data2, b"packed blob two".to_vec());
        }

        // has_blob consults packs, not just loose objects.
        #[test]
        fn test_has_blob_checks_packs() {
            let dir = tempfile::tempdir().unwrap();
            let store = BlobStore::new_uncompressed(dir.path()).unwrap();

            let hash = store.put_blob(b"check me in packs").unwrap();
            store.repack(0).unwrap();

            assert!(store.has_blob(&hash));
            assert!(!store.has_blob(&Hash::from_hex(&"c".repeat(64)).unwrap()));
        }

        // Packed lookup of an unknown hash reports BlobNotFound.
        #[test]
        fn test_get_blob_packed_not_found() {
            let dir = tempfile::tempdir().unwrap();
            let store = BlobStore::new_uncompressed(dir.path()).unwrap();

            let missing = Hash::from_hex(&"d".repeat(64)).unwrap();
            let result = store.get_blob_packed(&missing);
            assert!(matches!(result, Err(CasError::BlobNotFound(_))));
        }

        // list_blobs_packed enumerates everything moved into packs.
        #[test]
        fn test_list_blobs_packed() {
            let dir = tempfile::tempdir().unwrap();
            let store = BlobStore::new_uncompressed(dir.path()).unwrap();

            store.put_blob(b"alpha").unwrap();
            store.put_blob(b"beta").unwrap();
            store.repack(0).unwrap();

            let packed = store.list_blobs_packed().unwrap();
            assert_eq!(packed.len(), 2);
        }

        // repack is a no-op when the loose count is below the threshold.
        #[test]
        fn test_repack_below_threshold() {
            let dir = tempfile::tempdir().unwrap();
            let store = BlobStore::new_uncompressed(dir.path()).unwrap();

            store.put_blob(b"only one").unwrap();

            let packed = store.repack(10).unwrap();
            assert_eq!(packed, 0);
            assert_eq!(store.blob_count().unwrap(), 1);
        }

        // Threshold is strict: count must EXCEED it for packing to happen.
        #[test]
        fn test_repack_at_threshold() {
            let dir = tempfile::tempdir().unwrap();
            let store = BlobStore::new_uncompressed(dir.path()).unwrap();

            store.put_blob(b"one").unwrap();
            store.put_blob(b"two").unwrap();

            let packed = store.repack(2).unwrap();
            assert_eq!(packed, 0);
            assert_eq!(store.blob_count().unwrap(), 2);

            let packed = store.repack(1).unwrap();
            assert_eq!(packed, 2);
            assert_eq!(store.blob_count().unwrap(), 0);
        }

        // get_blob prefers the loose copy; the packed copy still serves
        // after the loose one is deleted.
        #[test]
        fn test_loose_priority_over_packed() {
            let dir = tempfile::tempdir().unwrap();
            let store = BlobStore::new_uncompressed(dir.path()).unwrap();

            let hash = store.put_blob(b"original data").unwrap();
            store.repack(0).unwrap();

            // Re-create the loose copy alongside the packed one.
            let blob_path = store.blob_path(&hash);
            if let Some(parent) = blob_path.parent() {
                fs::create_dir_all(parent).unwrap();
            }
            fs::write(&blob_path, b"original data").unwrap();

            let data = store.get_blob(&hash).unwrap();
            assert_eq!(data, b"original data".to_vec());

            // Loose copy gone -> served from the pack.
            store.delete_blob(&hash).unwrap();
            let data = store.get_blob(&hash).unwrap();
            assert_eq!(data, b"original data".to_vec());
        }

        // has_blob_packed flips from false to true across a repack.
        #[test]
        fn test_has_blob_packed() {
            let dir = tempfile::tempdir().unwrap();
            let store = BlobStore::new_uncompressed(dir.path()).unwrap();

            let hash = store.put_blob(b"packed check").unwrap();
            assert!(!store.has_blob_packed(&hash));

            store.repack(0).unwrap();
            assert!(store.has_blob_packed(&hash));
        }

        // Successive repacks accumulate blobs across multiple pack files.
        #[test]
        fn test_repack_multiple_times() {
            let dir = tempfile::tempdir().unwrap();
            let store = BlobStore::new_uncompressed(dir.path()).unwrap();

            store.put_blob(b"first batch one").unwrap();
            store.put_blob(b"first batch two").unwrap();
            store.repack(0).unwrap();

            store.put_blob(b"second batch").unwrap();
            store.repack(0).unwrap();

            let all = store.list_blobs_packed().unwrap();
            assert_eq!(all.len(), 3);
        }

        // The pack cache is populated after the first packed access and
        // then reused for subsequent lookups.
        #[test]
        fn test_pack_cache_avoids_repeated_disk_reads() {
            let dir = tempfile::tempdir().unwrap();
            let store = BlobStore::new_uncompressed(dir.path()).unwrap();

            let hash = store.put_blob(b"cache me").unwrap();
            store.repack(0).unwrap();

            // First access loads the cache...
            assert!(store.has_blob_packed(&hash));
            {
                let guard = store.pack_cache.lock().unwrap();
                assert!(
                    guard.is_some(),
                    "pack cache should be populated after first access"
                );
            }

            // ...later accesses hit the cached indexes.
            assert!(store.has_blob_packed(&hash));

            let data = store.get_blob_packed(&hash).unwrap();
            assert_eq!(data, b"cache me".to_vec());
        }

        // invalidate_pack_cache clears the cache; the next access reloads it.
        #[test]
        fn test_invalidate_pack_cache() {
            let dir = tempfile::tempdir().unwrap();
            let store = BlobStore::new_uncompressed(dir.path()).unwrap();

            let hash = store.put_blob(b"invalidate test").unwrap();
            store.repack(0).unwrap();

            assert!(store.has_blob_packed(&hash));
            assert!(store.pack_cache.lock().unwrap().is_some());

            store.invalidate_pack_cache();
            assert!(store.pack_cache.lock().unwrap().is_none());

            assert!(store.has_blob_packed(&hash));
            assert!(store.pack_cache.lock().unwrap().is_some());
        }
    }
}