1use async_trait::async_trait;
10use hashtree_core::store::{Store, StoreError, StoreStats};
11use hashtree_core::types::Hash;
12use std::collections::HashMap;
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::RwLock;
17use std::time::SystemTime;
18
/// Content-addressed blob store kept in a directory on the local filesystem.
pub struct FsBlobStore {
    /// Root directory holding the blob files and `pins.json`.
    base_path: PathBuf,
    /// Size limit in bytes; `0` is treated as "no limit".
    max_bytes: AtomicU64,
    /// Pin reference counts, keyed by the hex-encoded hash.
    pins: RwLock<HashMap<String, u32>>,
}
30
31impl FsBlobStore {
32 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, StoreError> {
36 let base_path = path.as_ref().to_path_buf();
37 fs::create_dir_all(&base_path)?;
38
39 let pins = Self::load_pins(&base_path).unwrap_or_default();
41
42 Ok(Self {
43 base_path,
44 max_bytes: AtomicU64::new(0), pins: RwLock::new(pins),
46 })
47 }
48
49 pub fn with_max_bytes<P: AsRef<Path>>(path: P, max_bytes: u64) -> Result<Self, StoreError> {
51 let store = Self::new(path)?;
52 store.max_bytes.store(max_bytes, Ordering::Relaxed);
53 Ok(store)
54 }
55
56 fn pins_path(&self) -> PathBuf {
58 self.base_path.join("pins.json")
59 }
60
61 fn load_pins(base_path: &Path) -> Option<HashMap<String, u32>> {
63 let pins_path = base_path.join("pins.json");
64 let contents = fs::read_to_string(pins_path).ok()?;
65 serde_json::from_str(&contents).ok()
66 }
67
68 fn save_pins(&self) -> Result<(), StoreError> {
70 let pins = self.pins.read().unwrap();
71 let json = serde_json::to_string(&*pins)
72 .map_err(|e| StoreError::Other(format!("Failed to serialize pins: {}", e)))?;
73 fs::write(self.pins_path(), json)?;
74 Ok(())
75 }
76
77 fn blob_path(&self, hash: &Hash) -> PathBuf {
81 let hex = hex::encode(hash);
82 let (prefix, rest) = hex.split_at(2);
83 self.base_path.join(prefix).join(rest)
84 }
85
86 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
88 let path = self.blob_path(&hash);
89
90 if path.exists() {
92 return Ok(false);
93 }
94
95 if let Some(parent) = path.parent() {
97 fs::create_dir_all(parent)?;
98 }
99
100 let temp_path = path.with_extension("tmp");
102 fs::write(&temp_path, data)?;
103 fs::rename(&temp_path, &path)?;
104
105 Ok(true)
106 }
107
108 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
110 let path = self.blob_path(hash);
111 if path.exists() {
112 Ok(Some(fs::read(&path)?))
113 } else {
114 Ok(None)
115 }
116 }
117
118 pub fn exists(&self, hash: &Hash) -> bool {
120 self.blob_path(hash).exists()
121 }
122
123 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
125 let path = self.blob_path(hash);
126 if path.exists() {
127 fs::remove_file(&path)?;
128 Ok(true)
129 } else {
130 Ok(false)
131 }
132 }
133
134 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
136 let mut hashes = Vec::new();
137
138 let entries = match fs::read_dir(&self.base_path) {
140 Ok(e) => e,
141 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(hashes),
142 Err(e) => return Err(e.into()),
143 };
144
145 for prefix_entry in entries {
146 let prefix_entry = prefix_entry?;
147 let prefix_path = prefix_entry.path();
148
149 if !prefix_path.is_dir() {
150 continue;
151 }
152
153 let prefix = match prefix_path.file_name().and_then(|n| n.to_str()) {
154 Some(p) if p.len() == 2 => p.to_string(),
155 _ => continue,
156 };
157
158 for blob_entry in fs::read_dir(&prefix_path)? {
160 let blob_entry = blob_entry?;
161 let rest = match blob_entry.file_name().to_str() {
162 Some(r) if r.len() == 62 => r.to_string(),
163 _ => continue,
164 };
165
166 let full_hex = format!("{}{}", prefix, rest);
168 if let Ok(bytes) = hex::decode(&full_hex) {
169 if bytes.len() == 32 {
170 let mut hash = [0u8; 32];
171 hash.copy_from_slice(&bytes);
172 hashes.push(hash);
173 }
174 }
175 }
176 }
177
178 Ok(hashes)
179 }
180
181 pub fn stats(&self) -> Result<FsStats, StoreError> {
183 let pins = self.pins.read().unwrap();
184 let mut count = 0usize;
185 let mut total_bytes = 0u64;
186 let mut pinned_count = 0usize;
187 let mut pinned_bytes = 0u64;
188
189 let entries = match fs::read_dir(&self.base_path) {
190 Ok(e) => e,
191 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
192 return Ok(FsStats {
193 count,
194 total_bytes,
195 pinned_count,
196 pinned_bytes,
197 })
198 }
199 Err(e) => return Err(e.into()),
200 };
201
202 for prefix_entry in entries {
203 let prefix_entry = prefix_entry?;
204 let prefix_path = prefix_entry.path();
205
206 if !prefix_path.is_dir() {
207 continue;
208 }
209
210 let prefix = match prefix_path.file_name().and_then(|n| n.to_str()) {
211 Some(p) if p.len() == 2 => p,
212 _ => continue,
213 };
214
215 for blob_entry in fs::read_dir(&prefix_path)? {
216 let blob_entry = blob_entry?;
217 if blob_entry.path().is_file() {
218 let size = blob_entry.metadata()?.len();
219 count += 1;
220 total_bytes += size;
221
222 if let Some(rest) = blob_entry.file_name().to_str() {
224 let hex = format!("{}{}", prefix, rest);
225 if pins.get(&hex).copied().unwrap_or(0) > 0 {
226 pinned_count += 1;
227 pinned_bytes += size;
228 }
229 }
230 }
231 }
232 }
233
234 Ok(FsStats {
235 count,
236 total_bytes,
237 pinned_count,
238 pinned_bytes,
239 })
240 }
241
242 fn collect_blobs_for_eviction(&self) -> Vec<(PathBuf, String, SystemTime, u64)> {
244 let mut blobs = Vec::new();
245
246 let entries = match fs::read_dir(&self.base_path) {
247 Ok(e) => e,
248 Err(_) => return blobs,
249 };
250
251 for prefix_entry in entries.flatten() {
252 let prefix_path = prefix_entry.path();
253 if !prefix_path.is_dir() {
254 continue;
255 }
256
257 let prefix = match prefix_path.file_name().and_then(|n| n.to_str()) {
258 Some(p) if p.len() == 2 => p.to_string(),
259 _ => continue,
260 };
261
262 if let Ok(blob_entries) = fs::read_dir(&prefix_path) {
263 for blob_entry in blob_entries.flatten() {
264 let path = blob_entry.path();
265 if !path.is_file() {
266 continue;
267 }
268
269 if let Ok(metadata) = blob_entry.metadata() {
270 let mtime = metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH);
271 let size = metadata.len();
272
273 if let Some(rest) = blob_entry.file_name().to_str() {
274 let hex = format!("{}{}", prefix, rest);
275 blobs.push((path, hex, mtime, size));
276 }
277 }
278 }
279 }
280 }
281
282 blobs
283 }
284
285 fn evict_to_target(&self, target_bytes: u64) -> u64 {
287 let pins = self.pins.read().unwrap();
288
289 let mut blobs = self.collect_blobs_for_eviction();
291
292 blobs.retain(|(_, hex, _, _)| pins.get(hex).copied().unwrap_or(0) == 0);
294
295 blobs.sort_by_key(|(_, _, mtime, _)| *mtime);
297
298 drop(pins); let current_bytes: u64 = self
302 .collect_blobs_for_eviction()
303 .iter()
304 .map(|(_, _, _, size)| *size)
305 .sum();
306
307 if current_bytes <= target_bytes {
308 return 0;
309 }
310
311 let to_free = current_bytes - target_bytes;
312 let mut freed = 0u64;
313
314 for (path, _, _, size) in blobs {
315 if freed >= to_free {
316 break;
317 }
318 if fs::remove_file(&path).is_ok() {
319 freed += size;
320 }
321 }
322
323 freed
324 }
325}
326
/// Usage totals reported by [`FsBlobStore::stats`].
///
/// `Default` yields all-zero statistics (an empty store); `PartialEq`/`Eq`
/// allow snapshots to be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FsStats {
    /// Number of blob files counted.
    pub count: usize,
    /// Combined size of the counted files, in bytes.
    pub total_bytes: u64,
    /// Number of counted files whose pin count is above zero.
    pub pinned_count: usize,
    /// Combined size of the pinned files, in bytes.
    pub pinned_bytes: u64,
}
335
336#[async_trait]
337impl Store for FsBlobStore {
338 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
339 self.put_sync(hash, &data)
340 }
341
342 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
343 self.get_sync(hash)
344 }
345
346 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
347 Ok(self.exists(hash))
348 }
349
350 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
351 let hex = hex::encode(hash);
352 {
354 let mut pins = self.pins.write().unwrap();
355 pins.remove(&hex);
356 }
357 let _ = self.save_pins(); self.delete_sync(hash)
359 }
360
361 fn set_max_bytes(&self, max: u64) {
362 self.max_bytes.store(max, Ordering::Relaxed);
363 }
364
365 fn max_bytes(&self) -> Option<u64> {
366 let max = self.max_bytes.load(Ordering::Relaxed);
367 if max > 0 {
368 Some(max)
369 } else {
370 None
371 }
372 }
373
374 async fn stats(&self) -> StoreStats {
375 match self.stats() {
376 Ok(fs_stats) => StoreStats {
377 count: fs_stats.count as u64,
378 bytes: fs_stats.total_bytes,
379 pinned_count: fs_stats.pinned_count as u64,
380 pinned_bytes: fs_stats.pinned_bytes,
381 },
382 Err(_) => StoreStats::default(),
383 }
384 }
385
386 async fn evict_if_needed(&self) -> Result<u64, StoreError> {
387 let max = self.max_bytes.load(Ordering::Relaxed);
388 if max == 0 {
389 return Ok(0); }
391
392 let current = match self.stats() {
393 Ok(s) => s.total_bytes,
394 Err(_) => return Ok(0),
395 };
396
397 if current <= max {
398 return Ok(0);
399 }
400
401 let target = max * 9 / 10;
403 Ok(self.evict_to_target(target))
404 }
405
406 async fn pin(&self, hash: &Hash) -> Result<(), StoreError> {
407 let hex = hex::encode(hash);
408 {
409 let mut pins = self.pins.write().unwrap();
410 *pins.entry(hex).or_insert(0) += 1;
411 }
412 self.save_pins()
413 }
414
415 async fn unpin(&self, hash: &Hash) -> Result<(), StoreError> {
416 let hex = hex::encode(hash);
417 {
418 let mut pins = self.pins.write().unwrap();
419 if let Some(count) = pins.get_mut(&hex) {
420 if *count > 0 {
421 *count -= 1;
422 }
423 if *count == 0 {
424 pins.remove(&hex);
425 }
426 }
427 }
428 self.save_pins()
429 }
430
431 fn pin_count(&self, hash: &Hash) -> u32 {
432 let hex = hex::encode(hash);
433 self.pins.read().unwrap().get(&hex).copied().unwrap_or(0)
434 }
435}
436
437#[cfg(test)]
438mod tests {
439 use super::*;
440 use hashtree_core::sha256;
441 use tempfile::TempDir;
442
443 #[tokio::test]
444 async fn test_put_get() {
445 let temp = TempDir::new().unwrap();
446 let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
447
448 let data = b"hello filesystem";
449 let hash = sha256(data);
450 store.put(hash, data.to_vec()).await.unwrap();
451
452 assert!(store.has(&hash).await.unwrap());
453 assert_eq!(store.get(&hash).await.unwrap(), Some(data.to_vec()));
454 }
455
456 #[tokio::test]
457 async fn test_get_missing() {
458 let temp = TempDir::new().unwrap();
459 let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
460
461 let hash = [0u8; 32];
462 assert!(!store.has(&hash).await.unwrap());
463 assert_eq!(store.get(&hash).await.unwrap(), None);
464 }
465
466 #[tokio::test]
467 async fn test_delete() {
468 let temp = TempDir::new().unwrap();
469 let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
470
471 let data = b"delete me";
472 let hash = sha256(data);
473 store.put(hash, data.to_vec()).await.unwrap();
474 assert!(store.has(&hash).await.unwrap());
475
476 assert!(store.delete(&hash).await.unwrap());
477 assert!(!store.has(&hash).await.unwrap());
478 assert!(!store.delete(&hash).await.unwrap());
479 }
480
481 #[tokio::test]
482 async fn test_deduplication() {
483 let temp = TempDir::new().unwrap();
484 let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
485
486 let data = b"same content";
487 let hash = sha256(data);
488
489 assert!(store.put(hash, data.to_vec()).await.unwrap());
491 assert!(!store.put(hash, data.to_vec()).await.unwrap());
493
494 assert_eq!(store.list().unwrap().len(), 1);
495 }
496
497 #[tokio::test]
498 async fn test_list() {
499 let temp = TempDir::new().unwrap();
500 let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
501
502 let d1 = b"one";
503 let d2 = b"two";
504 let d3 = b"three";
505 let h1 = sha256(d1);
506 let h2 = sha256(d2);
507 let h3 = sha256(d3);
508
509 store.put(h1, d1.to_vec()).await.unwrap();
510 store.put(h2, d2.to_vec()).await.unwrap();
511 store.put(h3, d3.to_vec()).await.unwrap();
512
513 let hashes = store.list().unwrap();
514 assert_eq!(hashes.len(), 3);
515 assert!(hashes.contains(&h1));
516 assert!(hashes.contains(&h2));
517 assert!(hashes.contains(&h3));
518 }
519
520 #[tokio::test]
521 async fn test_stats() {
522 let temp = TempDir::new().unwrap();
523 let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
524
525 let d1 = b"hello";
526 let d2 = b"world";
527 let h1 = sha256(d1);
528 store.put(h1, d1.to_vec()).await.unwrap();
529 store.put(sha256(d2), d2.to_vec()).await.unwrap();
530
531 let stats = store.stats().unwrap();
532 assert_eq!(stats.count, 2);
533 assert_eq!(stats.total_bytes, 10);
534 assert_eq!(stats.pinned_count, 0);
535 assert_eq!(stats.pinned_bytes, 0);
536
537 store.pin(&h1).await.unwrap();
539 let stats = store.stats().unwrap();
540 assert_eq!(stats.pinned_count, 1);
541 assert_eq!(stats.pinned_bytes, 5);
542 }
543
544 #[tokio::test]
545 async fn test_directory_structure() {
546 let temp = TempDir::new().unwrap();
547 let blobs_path = temp.path().join("blobs");
548 let store = FsBlobStore::new(&blobs_path).unwrap();
549
550 let data = b"test data";
551 let hash = sha256(data);
552 let hex = hex::encode(hash);
553
554 store.put(hash, data.to_vec()).await.unwrap();
555
556 let prefix = &hex[..2];
558 let rest = &hex[2..];
559 let expected_path = blobs_path.join(prefix).join(rest);
560
561 assert!(
562 expected_path.exists(),
563 "Blob should be at {:?}",
564 expected_path
565 );
566 assert_eq!(fs::read(&expected_path).unwrap(), data);
567 }
568
569 #[test]
570 fn test_blob_path_format() {
571 let temp = TempDir::new().unwrap();
572 let store = FsBlobStore::new(temp.path()).unwrap();
573
574 let mut hash = [0u8; 32];
576 hash[0] = 0x00;
577 hash[1] = 0x11;
578 hash[2] = 0x22;
579
580 let path = store.blob_path(&hash);
581 let path_str = path.to_string_lossy();
582
583 assert!(
585 path_str.contains("/00/"),
586 "Path should contain /00/ directory: {}",
587 path_str
588 );
589 assert!(path.file_name().unwrap().len() == 62);
591 }
592
593 #[tokio::test]
594 async fn test_empty_store_stats() {
595 let temp = TempDir::new().unwrap();
596 let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
597
598 let stats = store.stats().unwrap();
599 assert_eq!(stats.count, 0);
600 assert_eq!(stats.total_bytes, 0);
601 }
602
603 #[tokio::test]
604 async fn test_empty_store_list() {
605 let temp = TempDir::new().unwrap();
606 let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
607
608 let hashes = store.list().unwrap();
609 assert!(hashes.is_empty());
610 }
611
612 #[tokio::test]
613 async fn test_pin_and_unpin() {
614 let temp = TempDir::new().unwrap();
615 let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
616
617 let data = b"pin me";
618 let hash = sha256(data);
619 store.put(hash, data.to_vec()).await.unwrap();
620
621 assert!(!store.is_pinned(&hash));
623 assert_eq!(store.pin_count(&hash), 0);
624
625 store.pin(&hash).await.unwrap();
627 assert!(store.is_pinned(&hash));
628 assert_eq!(store.pin_count(&hash), 1);
629
630 store.unpin(&hash).await.unwrap();
632 assert!(!store.is_pinned(&hash));
633 assert_eq!(store.pin_count(&hash), 0);
634 }
635
636 #[tokio::test]
637 async fn test_pin_ref_counting() {
638 let temp = TempDir::new().unwrap();
639 let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
640
641 let data = b"multi pin";
642 let hash = sha256(data);
643 store.put(hash, data.to_vec()).await.unwrap();
644
645 store.pin(&hash).await.unwrap();
647 store.pin(&hash).await.unwrap();
648 store.pin(&hash).await.unwrap();
649 assert_eq!(store.pin_count(&hash), 3);
650
651 store.unpin(&hash).await.unwrap();
653 assert_eq!(store.pin_count(&hash), 2);
654 assert!(store.is_pinned(&hash));
655
656 store.unpin(&hash).await.unwrap();
658 store.unpin(&hash).await.unwrap();
659 assert_eq!(store.pin_count(&hash), 0);
660 }
661
662 #[tokio::test]
663 async fn test_pins_persist_across_reload() {
664 let temp = TempDir::new().unwrap();
665 let blobs_path = temp.path().join("blobs");
666
667 let data = b"persist me";
668 let hash = sha256(data);
669
670 {
672 let store = FsBlobStore::new(&blobs_path).unwrap();
673 store.put(hash, data.to_vec()).await.unwrap();
674 store.pin(&hash).await.unwrap();
675 store.pin(&hash).await.unwrap();
676 assert_eq!(store.pin_count(&hash), 2);
677 }
678
679 {
681 let store = FsBlobStore::new(&blobs_path).unwrap();
682 assert_eq!(store.pin_count(&hash), 2);
683 assert!(store.is_pinned(&hash));
684 }
685 }
686
687 #[tokio::test]
688 async fn test_max_bytes() {
689 let temp = TempDir::new().unwrap();
690 let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
691
692 assert!(store.max_bytes().is_none());
693
694 store.set_max_bytes(1000);
695 assert_eq!(store.max_bytes(), Some(1000));
696
697 store.set_max_bytes(0);
698 assert!(store.max_bytes().is_none());
699 }
700
701 #[tokio::test]
702 async fn test_with_max_bytes() {
703 let temp = TempDir::new().unwrap();
704 let store = FsBlobStore::with_max_bytes(temp.path().join("blobs"), 500).unwrap();
705 assert_eq!(store.max_bytes(), Some(500));
706 }
707
708 #[tokio::test]
709 async fn test_eviction_respects_pins() {
710 let temp = TempDir::new().unwrap();
711 let store = FsBlobStore::with_max_bytes(temp.path().join("blobs"), 20).unwrap();
713
714 let d1 = b"aaaaa"; let d2 = b"bbbbb";
717 let d3 = b"ccccc";
718 let h1 = sha256(d1);
719 let h2 = sha256(d2);
720 let h3 = sha256(d3);
721
722 store.put(h1, d1.to_vec()).await.unwrap();
723 std::thread::sleep(std::time::Duration::from_millis(10)); store.put(h2, d2.to_vec()).await.unwrap();
725 std::thread::sleep(std::time::Duration::from_millis(10));
726 store.put(h3, d3.to_vec()).await.unwrap();
727
728 store.pin(&h1).await.unwrap();
730
731 let d4 = b"ddddd";
733 let h4 = sha256(d4);
734 std::thread::sleep(std::time::Duration::from_millis(10));
735 store.put(h4, d4.to_vec()).await.unwrap();
736
737 let d5 = b"eeeee";
739 let h5 = sha256(d5);
740 std::thread::sleep(std::time::Duration::from_millis(10));
741 store.put(h5, d5.to_vec()).await.unwrap();
742
743 let freed = store.evict_if_needed().await.unwrap();
745 assert!(freed > 0, "Should have freed some bytes");
746
747 assert!(store.has(&h1).await.unwrap(), "Pinned item should exist");
749 assert!(
751 !store.has(&h2).await.unwrap(),
752 "Oldest unpinned should be evicted"
753 );
754 assert!(store.has(&h5).await.unwrap(), "Newest should exist");
756 }
757
758 #[tokio::test]
759 async fn test_no_eviction_when_under_limit() {
760 let temp = TempDir::new().unwrap();
761 let store = FsBlobStore::with_max_bytes(temp.path().join("blobs"), 1000).unwrap();
762
763 let data = b"small";
764 let hash = sha256(data);
765 store.put(hash, data.to_vec()).await.unwrap();
766
767 let freed = store.evict_if_needed().await.unwrap();
768 assert_eq!(freed, 0);
769 assert!(store.has(&hash).await.unwrap());
770 }
771
772 #[tokio::test]
773 async fn test_no_eviction_without_limit() {
774 let temp = TempDir::new().unwrap();
775 let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
776
777 for i in 0..10u8 {
778 let data = vec![i; 100];
779 let hash = sha256(&data);
780 store.put(hash, data).await.unwrap();
781 }
782
783 let freed = store.evict_if_needed().await.unwrap();
784 assert_eq!(freed, 0);
785 assert_eq!(store.list().unwrap().len(), 10);
786 }
787
788 #[tokio::test]
789 async fn test_delete_removes_pin() {
790 let temp = TempDir::new().unwrap();
791 let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
792
793 let data = b"delete pinned";
794 let hash = sha256(data);
795 store.put(hash, data.to_vec()).await.unwrap();
796 store.pin(&hash).await.unwrap();
797 assert!(store.is_pinned(&hash));
798
799 store.delete(&hash).await.unwrap();
800 assert_eq!(store.pin_count(&hash), 0);
801 }
802}