Skip to main content

hashtree_core/
store.rs

1//! Content-addressed key-value store interfaces and implementations
2
3use async_trait::async_trait;
4use std::collections::HashMap;
5use std::sync::{Arc, RwLock};
6
7use crate::types::{to_hex, Hash};
8
/// Aggregate usage figures reported by a store.
///
/// Every counter starts at zero via [`Default`]. The type implements
/// `PartialEq`/`Eq` so whole snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreStats {
    /// Number of items held in the store.
    pub count: u64,
    /// Total size, in bytes, of all stored items.
    pub bytes: u64,
    /// Number of stored items that are pinned.
    pub pinned_count: u64,
    /// Total size, in bytes, of the pinned items.
    pub pinned_bytes: u64,
}
21
22/// Content-addressed key-value store interface
23#[async_trait]
24pub trait Store: Send + Sync {
25    /// Store data by its hash
26    /// Returns true if newly stored, false if already existed
27    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError>;
28
29    /// Retrieve data by hash
30    /// Returns data or None if not found
31    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError>;
32
33    /// Check if hash exists
34    async fn has(&self, hash: &Hash) -> Result<bool, StoreError>;
35
36    /// Delete by hash
37    /// Returns true if deleted, false if didn't exist
38    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError>;
39
40    // ========================================================================
41    // Optional: Storage limits and eviction (default no-op implementations)
42    // ========================================================================
43
44    /// Set maximum storage size in bytes. 0 = unlimited.
45    fn set_max_bytes(&self, _max: u64) {}
46
47    /// Get maximum storage size. None = unlimited.
48    fn max_bytes(&self) -> Option<u64> {
49        None
50    }
51
52    /// Get storage statistics
53    async fn stats(&self) -> StoreStats {
54        StoreStats::default()
55    }
56
57    /// Evict unpinned items if over storage limit.
58    /// Returns number of bytes freed.
59    async fn evict_if_needed(&self) -> Result<u64, StoreError> {
60        Ok(0)
61    }
62
63    // ========================================================================
64    // Optional: Pinning (default no-op implementations)
65    // ========================================================================
66
67    /// Pin a hash (increment ref count). Pinned items are not evicted.
68    async fn pin(&self, _hash: &Hash) -> Result<(), StoreError> {
69        Ok(())
70    }
71
72    /// Unpin a hash (decrement ref count). Item can be evicted when count reaches 0.
73    async fn unpin(&self, _hash: &Hash) -> Result<(), StoreError> {
74        Ok(())
75    }
76
77    /// Get pin count for a hash. 0 = not pinned.
78    fn pin_count(&self, _hash: &Hash) -> u32 {
79        0
80    }
81
82    /// Check if hash is pinned (pin count > 0)
83    fn is_pinned(&self, hash: &Hash) -> bool {
84        self.pin_count(hash) > 0
85    }
86}
87
88/// Store error type
89#[derive(Debug, thiserror::Error)]
90pub enum StoreError {
91    #[error("IO error: {0}")]
92    Io(#[from] std::io::Error),
93    #[error("Store error: {0}")]
94    Other(String),
95}
96
/// A stored payload together with the bookkeeping used to choose eviction
/// victims.
#[derive(Debug, Clone)]
struct MemoryEntry {
    /// The stored bytes.
    data: Vec<u8>,
    /// Insertion sequence number used to order eviction: a smaller value
    /// means the entry was inserted earlier.
    order: u64,
}
104
105/// Internal state for MemoryStore
106#[derive(Debug, Default)]
107struct MemoryStoreInner {
108    data: HashMap<String, MemoryEntry>,
109    pins: HashMap<String, u32>,
110    next_order: u64,
111    max_bytes: Option<u64>,
112}
113
114/// In-memory content-addressed store with LRU eviction and pinning
115#[derive(Debug, Clone, Default)]
116pub struct MemoryStore {
117    inner: Arc<RwLock<MemoryStoreInner>>,
118}
119
120impl MemoryStore {
121    pub fn new() -> Self {
122        Self {
123            inner: Arc::new(RwLock::new(MemoryStoreInner::default())),
124        }
125    }
126
127    /// Create a new store with a maximum size limit
128    pub fn with_max_bytes(max_bytes: u64) -> Self {
129        Self {
130            inner: Arc::new(RwLock::new(MemoryStoreInner {
131                max_bytes: if max_bytes > 0 { Some(max_bytes) } else { None },
132                ..Default::default()
133            })),
134        }
135    }
136
137    /// Get number of stored items
138    pub fn size(&self) -> usize {
139        self.inner.read().unwrap().data.len()
140    }
141
142    /// Get total bytes stored
143    pub fn total_bytes(&self) -> usize {
144        self.inner
145            .read()
146            .unwrap()
147            .data
148            .values()
149            .map(|e| e.data.len())
150            .sum()
151    }
152
153    /// Clear all data (but not pins)
154    pub fn clear(&self) {
155        self.inner.write().unwrap().data.clear();
156    }
157
158    /// List all hashes
159    pub fn keys(&self) -> Vec<Hash> {
160        self.inner
161            .read()
162            .unwrap()
163            .data
164            .keys()
165            .filter_map(|hex| {
166                let bytes = hex::decode(hex).ok()?;
167                if bytes.len() != 32 {
168                    return None;
169                }
170                let mut hash = [0u8; 32];
171                hash.copy_from_slice(&bytes);
172                Some(hash)
173            })
174            .collect()
175    }
176
177    /// Evict oldest unpinned entries until under target bytes
178    fn evict_to_target(&self, target_bytes: u64) -> u64 {
179        let mut inner = self.inner.write().unwrap();
180
181        let current_bytes: u64 = inner.data.values().map(|e| e.data.len() as u64).sum();
182        if current_bytes <= target_bytes {
183            return 0;
184        }
185
186        // Collect unpinned entries sorted by order (oldest first)
187        let mut unpinned: Vec<(String, u64, u64)> = inner
188            .data
189            .iter()
190            .filter(|(key, _)| inner.pins.get(*key).copied().unwrap_or(0) == 0)
191            .map(|(key, entry)| (key.clone(), entry.order, entry.data.len() as u64))
192            .collect();
193
194        unpinned.sort_by_key(|(_, order, _)| *order);
195
196        let mut freed = 0u64;
197        let to_free = current_bytes - target_bytes;
198
199        for (key, _, size) in unpinned {
200            if freed >= to_free {
201                break;
202            }
203            inner.data.remove(&key);
204            freed += size;
205        }
206
207        freed
208    }
209}
210
211#[async_trait]
212impl Store for MemoryStore {
213    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
214        let key = to_hex(&hash);
215        let mut inner = self.inner.write().unwrap();
216        if inner.data.contains_key(&key) {
217            return Ok(false);
218        }
219        let order = inner.next_order;
220        inner.next_order += 1;
221        inner.data.insert(key, MemoryEntry { data, order });
222        Ok(true)
223    }
224
225    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
226        let key = to_hex(hash);
227        let inner = self.inner.read().unwrap();
228        Ok(inner.data.get(&key).map(|e| e.data.clone()))
229    }
230
231    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
232        let key = to_hex(hash);
233        Ok(self.inner.read().unwrap().data.contains_key(&key))
234    }
235
236    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
237        let key = to_hex(hash);
238        let mut inner = self.inner.write().unwrap();
239        // Also remove pin entry if exists
240        inner.pins.remove(&key);
241        Ok(inner.data.remove(&key).is_some())
242    }
243
244    fn set_max_bytes(&self, max: u64) {
245        self.inner.write().unwrap().max_bytes = if max > 0 { Some(max) } else { None };
246    }
247
248    fn max_bytes(&self) -> Option<u64> {
249        self.inner.read().unwrap().max_bytes
250    }
251
252    async fn stats(&self) -> StoreStats {
253        let inner = self.inner.read().unwrap();
254        let mut count = 0u64;
255        let mut bytes = 0u64;
256        let mut pinned_count = 0u64;
257        let mut pinned_bytes = 0u64;
258
259        for (key, entry) in &inner.data {
260            count += 1;
261            bytes += entry.data.len() as u64;
262            if inner.pins.get(key).copied().unwrap_or(0) > 0 {
263                pinned_count += 1;
264                pinned_bytes += entry.data.len() as u64;
265            }
266        }
267
268        StoreStats {
269            count,
270            bytes,
271            pinned_count,
272            pinned_bytes,
273        }
274    }
275
276    async fn evict_if_needed(&self) -> Result<u64, StoreError> {
277        let max = match self.inner.read().unwrap().max_bytes {
278            Some(m) => m,
279            None => return Ok(0), // No limit set
280        };
281
282        let current: u64 = self
283            .inner
284            .read()
285            .unwrap()
286            .data
287            .values()
288            .map(|e| e.data.len() as u64)
289            .sum();
290
291        if current <= max {
292            return Ok(0);
293        }
294
295        // Evict to 90% of max to avoid frequent evictions
296        let target = max * 9 / 10;
297        Ok(self.evict_to_target(target))
298    }
299
300    async fn pin(&self, hash: &Hash) -> Result<(), StoreError> {
301        let key = to_hex(hash);
302        let mut inner = self.inner.write().unwrap();
303        *inner.pins.entry(key).or_insert(0) += 1;
304        Ok(())
305    }
306
307    async fn unpin(&self, hash: &Hash) -> Result<(), StoreError> {
308        let key = to_hex(hash);
309        let mut inner = self.inner.write().unwrap();
310        if let Some(count) = inner.pins.get_mut(&key) {
311            if *count > 0 {
312                *count -= 1;
313            }
314            if *count == 0 {
315                inner.pins.remove(&key);
316            }
317        }
318        Ok(())
319    }
320
321    fn pin_count(&self, hash: &Hash) -> u32 {
322        let key = to_hex(hash);
323        self.inner
324            .read()
325            .unwrap()
326            .pins
327            .get(&key)
328            .copied()
329            .unwrap_or(0)
330    }
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hash::sha256;

    /// Pairs a payload with the digest `sha256` computes for it, so a test
    /// can store the payload under its own hash.
    fn hashed(payload: Vec<u8>) -> (Hash, Vec<u8>) {
        (sha256(&payload), payload)
    }

    #[tokio::test]
    async fn test_put_returns_true_for_new() {
        let store = MemoryStore::new();
        let (hash, data) = hashed(vec![1u8, 2, 3]);

        let newly_stored = store.put(hash, data).await.unwrap();
        assert!(newly_stored);
    }

    #[tokio::test]
    async fn test_put_returns_false_for_duplicate() {
        let store = MemoryStore::new();
        let (hash, data) = hashed(vec![1u8, 2, 3]);

        store.put(hash, data.clone()).await.unwrap();
        let newly_stored = store.put(hash, data).await.unwrap();
        assert!(!newly_stored);
    }

    #[tokio::test]
    async fn test_get_returns_data() {
        let store = MemoryStore::new();
        let (hash, data) = hashed(vec![1u8, 2, 3]);

        store.put(hash, data.clone()).await.unwrap();
        let fetched = store.get(&hash).await.unwrap();

        assert_eq!(fetched, Some(data));
    }

    #[tokio::test]
    async fn test_get_returns_none_for_missing() {
        let store = MemoryStore::new();
        let missing = [0u8; 32];

        let fetched = store.get(&missing).await.unwrap();
        assert!(fetched.is_none());
    }

    #[tokio::test]
    async fn test_has_returns_true() {
        let store = MemoryStore::new();
        let (hash, data) = hashed(vec![1u8, 2, 3]);

        store.put(hash, data).await.unwrap();
        assert!(store.has(&hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_has_returns_false() {
        let store = MemoryStore::new();
        let missing = [0u8; 32];

        assert!(!store.has(&missing).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_returns_true() {
        let store = MemoryStore::new();
        let (hash, data) = hashed(vec![1u8, 2, 3]);

        store.put(hash, data).await.unwrap();
        let removed = store.delete(&hash).await.unwrap();

        assert!(removed);
        assert!(!store.has(&hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_returns_false() {
        let store = MemoryStore::new();
        let missing = [0u8; 32];

        let removed = store.delete(&missing).await.unwrap();
        assert!(!removed);
    }

    #[tokio::test]
    async fn test_size() {
        let store = MemoryStore::new();
        assert_eq!(store.size(), 0);

        let (hash1, data1) = hashed(vec![1u8]);
        let (hash2, data2) = hashed(vec![2u8]);

        store.put(hash1, data1).await.unwrap();
        store.put(hash2, data2).await.unwrap();

        assert_eq!(store.size(), 2);
    }

    #[tokio::test]
    async fn test_total_bytes() {
        let store = MemoryStore::new();
        assert_eq!(store.total_bytes(), 0);

        let (hash1, data1) = hashed(vec![1u8, 2, 3]);
        let (hash2, data2) = hashed(vec![4u8, 5]);

        store.put(hash1, data1).await.unwrap();
        store.put(hash2, data2).await.unwrap();

        assert_eq!(store.total_bytes(), 5);
    }

    #[tokio::test]
    async fn test_clear() {
        let store = MemoryStore::new();
        let (hash, data) = hashed(vec![1u8, 2, 3]);

        store.put(hash, data).await.unwrap();
        store.clear();

        assert_eq!(store.size(), 0);
        assert!(!store.has(&hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_keys() {
        let store = MemoryStore::new();
        assert!(store.keys().is_empty());

        let (hash1, data1) = hashed(vec![1u8]);
        let (hash2, data2) = hashed(vec![2u8]);

        store.put(hash1, data1).await.unwrap();
        store.put(hash2, data2).await.unwrap();

        let keys = store.keys();
        assert_eq!(keys.len(), 2);

        // Map iteration order is unspecified, so compare sorted hex forms.
        let mut actual: Vec<_> = keys.iter().map(to_hex).collect();
        actual.sort();
        let mut expected: Vec<_> = vec![to_hex(&hash1), to_hex(&hash2)];
        expected.sort();
        assert_eq!(actual, expected);
    }

    #[tokio::test]
    async fn test_pin_and_unpin() {
        let store = MemoryStore::new();
        let (hash, data) = hashed(vec![1u8, 2, 3]);

        store.put(hash, data).await.unwrap();

        // A freshly stored item starts unpinned.
        assert!(!store.is_pinned(&hash));
        assert_eq!(store.pin_count(&hash), 0);

        // One pin makes it pinned with a count of one.
        store.pin(&hash).await.unwrap();
        assert!(store.is_pinned(&hash));
        assert_eq!(store.pin_count(&hash), 1);

        // Releasing that pin returns it to the unpinned state.
        store.unpin(&hash).await.unwrap();
        assert!(!store.is_pinned(&hash));
        assert_eq!(store.pin_count(&hash), 0);
    }

    #[tokio::test]
    async fn test_pin_count_ref_counting() {
        let store = MemoryStore::new();
        let (hash, data) = hashed(vec![1u8, 2, 3]);

        store.put(hash, data).await.unwrap();

        // Three pins accumulate.
        store.pin(&hash).await.unwrap();
        store.pin(&hash).await.unwrap();
        store.pin(&hash).await.unwrap();
        assert_eq!(store.pin_count(&hash), 3);

        // A single unpin only drops the count by one.
        store.unpin(&hash).await.unwrap();
        assert_eq!(store.pin_count(&hash), 2);
        assert!(store.is_pinned(&hash));

        // Releasing the remaining two pins unpins the item.
        store.unpin(&hash).await.unwrap();
        store.unpin(&hash).await.unwrap();
        assert_eq!(store.pin_count(&hash), 0);
        assert!(!store.is_pinned(&hash));

        // One unpin too many must leave the count at zero.
        store.unpin(&hash).await.unwrap();
        assert_eq!(store.pin_count(&hash), 0);
    }

    #[tokio::test]
    async fn test_stats() {
        let store = MemoryStore::new();

        let (hash1, data1) = hashed(vec![1u8, 2, 3]); // 3 bytes
        let (hash2, data2) = hashed(vec![4u8, 5]); // 2 bytes

        store.put(hash1, data1).await.unwrap();
        store.put(hash2, data2).await.unwrap();

        // Only the 3-byte item is pinned.
        store.pin(&hash1).await.unwrap();

        let stats = store.stats().await;
        assert_eq!(stats.count, 2);
        assert_eq!(stats.bytes, 5);
        assert_eq!(stats.pinned_count, 1);
        assert_eq!(stats.pinned_bytes, 3);
    }

    #[tokio::test]
    async fn test_max_bytes() {
        let store = MemoryStore::new();
        assert!(store.max_bytes().is_none());

        store.set_max_bytes(1000);
        assert_eq!(store.max_bytes(), Some(1000));

        // Zero switches the limit off again.
        store.set_max_bytes(0);
        assert!(store.max_bytes().is_none());
    }

    #[tokio::test]
    async fn test_with_max_bytes() {
        let limited = MemoryStore::with_max_bytes(500);
        assert_eq!(limited.max_bytes(), Some(500));

        let unlimited = MemoryStore::with_max_bytes(0);
        assert!(unlimited.max_bytes().is_none());
    }

    #[tokio::test]
    async fn test_eviction_respects_pins() {
        // 10-byte ceiling.
        let store = MemoryStore::with_max_bytes(10);

        // Three 3-byte items: 9 bytes in total, inserted oldest to newest.
        let (hash1, data1) = hashed(vec![1u8, 1, 1]); // oldest
        let (hash2, data2) = hashed(vec![2u8, 2, 2]);
        let (hash3, data3) = hashed(vec![3u8, 3, 3]); // newest

        store.put(hash1, data1).await.unwrap();
        store.put(hash2, data2).await.unwrap();
        store.put(hash3, data3).await.unwrap();

        // Protect the oldest item from eviction.
        store.pin(&hash1).await.unwrap();

        // A fourth item pushes usage to 12 bytes, over the 10-byte ceiling.
        let (hash4, data4) = hashed(vec![4u8, 4, 4]);
        store.put(hash4, data4).await.unwrap();

        // Eviction must pick hash2, the oldest item that is not pinned.
        let freed = store.evict_if_needed().await.unwrap();
        assert!(freed > 0);

        // Pinned item survives.
        assert!(store.has(&hash1).await.unwrap());
        // Oldest unpinned item is gone.
        assert!(!store.has(&hash2).await.unwrap());
        // Newer items are untouched.
        assert!(store.has(&hash3).await.unwrap());
        assert!(store.has(&hash4).await.unwrap());
    }

    #[tokio::test]
    async fn test_eviction_lru_order() {
        // 15-byte ceiling.
        let store = MemoryStore::with_max_bytes(15);

        // Four 5-byte items, inserted oldest to newest.
        let (hash1, data1) = hashed(vec![1u8; 5]); // oldest
        let (hash2, data2) = hashed(vec![2u8; 5]);
        let (hash3, data3) = hashed(vec![3u8; 5]);
        let (hash4, data4) = hashed(vec![4u8; 5]); // newest

        store.put(hash1, data1).await.unwrap();
        store.put(hash2, data2).await.unwrap();
        store.put(hash3, data3).await.unwrap();
        store.put(hash4, data4).await.unwrap();

        // 20 bytes stored against a 15-byte ceiling.
        assert_eq!(store.total_bytes(), 20);

        // Eviction starts with the oldest items.
        let freed = store.evict_if_needed().await.unwrap();
        assert!(freed >= 5); // At least one item evicted

        // The oldest item is evicted...
        assert!(!store.has(&hash1).await.unwrap());
        // ...while the newest one is kept.
        assert!(store.has(&hash4).await.unwrap());
    }

    #[tokio::test]
    async fn test_no_eviction_when_under_limit() {
        let store = MemoryStore::with_max_bytes(100);

        let (hash, data) = hashed(vec![1u8, 2, 3]);
        store.put(hash, data).await.unwrap();

        let freed = store.evict_if_needed().await.unwrap();
        assert_eq!(freed, 0);
        assert!(store.has(&hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_no_eviction_without_limit() {
        let store = MemoryStore::new();

        // Fill the store with 100 distinct 100-byte payloads.
        for i in 0..100u8 {
            let (hash, data) = hashed(vec![i; 100]);
            store.put(hash, data).await.unwrap();
        }

        let freed = store.evict_if_needed().await.unwrap();
        assert_eq!(freed, 0);
        assert_eq!(store.size(), 100);
    }

    #[tokio::test]
    async fn test_delete_removes_pin() {
        let store = MemoryStore::new();
        let (hash, data) = hashed(vec![1u8, 2, 3]);

        store.put(hash, data).await.unwrap();
        store.pin(&hash).await.unwrap();
        assert!(store.is_pinned(&hash));

        store.delete(&hash).await.unwrap();
        // Deleting the item must also discard its pin count.
        assert_eq!(store.pin_count(&hash), 0);
    }
}
699}