1use async_trait::async_trait;
4use std::collections::HashMap;
5use std::sync::{Arc, RwLock};
6
7use crate::types::{to_hex, Hash};
8
/// Point-in-time usage summary returned by `Store::stats`.
///
/// `PartialEq`/`Eq` are derived so that two snapshots can be compared
/// directly (for example with `assert_eq!`). `Default` yields an all-zero
/// snapshot, which is what the trait's default `stats` implementation returns.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreStats {
    /// Number of entries held by the store.
    pub count: u64,
    /// Combined payload size of all entries, in bytes.
    pub bytes: u64,
    /// Number of entries whose pin count is greater than zero.
    pub pinned_count: u64,
    /// Combined payload size of the pinned entries, in bytes.
    pub pinned_bytes: u64,
}
21
22#[async_trait]
24pub trait Store: Send + Sync {
25 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError>;
28
29 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError>;
32
33 async fn has(&self, hash: &Hash) -> Result<bool, StoreError>;
35
36 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError>;
39
40 fn set_max_bytes(&self, _max: u64) {}
46
47 fn max_bytes(&self) -> Option<u64> {
49 None
50 }
51
52 async fn stats(&self) -> StoreStats {
54 StoreStats::default()
55 }
56
57 async fn evict_if_needed(&self) -> Result<u64, StoreError> {
60 Ok(0)
61 }
62
63 async fn pin(&self, _hash: &Hash) -> Result<(), StoreError> {
69 Ok(())
70 }
71
72 async fn unpin(&self, _hash: &Hash) -> Result<(), StoreError> {
74 Ok(())
75 }
76
77 fn pin_count(&self, _hash: &Hash) -> u32 {
79 0
80 }
81
82 fn is_pinned(&self, hash: &Hash) -> bool {
84 self.pin_count(hash) > 0
85 }
86}
87
88#[derive(Debug, thiserror::Error)]
90pub enum StoreError {
91 #[error("IO error: {0}")]
92 Io(#[from] std::io::Error),
93 #[error("Store error: {0}")]
94 Other(String),
95}
96
/// A single stored blob together with its insertion sequence number.
#[derive(Clone, Debug)]
struct MemoryEntry {
    /// The stored payload.
    data: Vec<u8>,
    /// Sequence number assigned when the entry was inserted; eviction removes
    /// entries with the lowest `order` first.
    order: u64,
}
104
105#[derive(Debug, Default)]
107struct MemoryStoreInner {
108 data: HashMap<String, MemoryEntry>,
109 pins: HashMap<String, u32>,
110 next_order: u64,
111 max_bytes: Option<u64>,
112}
113
114#[derive(Debug, Clone, Default)]
116pub struct MemoryStore {
117 inner: Arc<RwLock<MemoryStoreInner>>,
118}
119
120impl MemoryStore {
121 pub fn new() -> Self {
122 Self {
123 inner: Arc::new(RwLock::new(MemoryStoreInner::default())),
124 }
125 }
126
127 pub fn with_max_bytes(max_bytes: u64) -> Self {
129 Self {
130 inner: Arc::new(RwLock::new(MemoryStoreInner {
131 max_bytes: if max_bytes > 0 { Some(max_bytes) } else { None },
132 ..Default::default()
133 })),
134 }
135 }
136
137 pub fn size(&self) -> usize {
139 self.inner.read().unwrap().data.len()
140 }
141
142 pub fn total_bytes(&self) -> usize {
144 self.inner
145 .read()
146 .unwrap()
147 .data
148 .values()
149 .map(|e| e.data.len())
150 .sum()
151 }
152
153 pub fn clear(&self) {
155 self.inner.write().unwrap().data.clear();
156 }
157
158 pub fn keys(&self) -> Vec<Hash> {
160 self.inner
161 .read()
162 .unwrap()
163 .data
164 .keys()
165 .filter_map(|hex| {
166 let bytes = hex::decode(hex).ok()?;
167 if bytes.len() != 32 {
168 return None;
169 }
170 let mut hash = [0u8; 32];
171 hash.copy_from_slice(&bytes);
172 Some(hash)
173 })
174 .collect()
175 }
176
177 fn evict_to_target(&self, target_bytes: u64) -> u64 {
179 let mut inner = self.inner.write().unwrap();
180
181 let current_bytes: u64 = inner.data.values().map(|e| e.data.len() as u64).sum();
182 if current_bytes <= target_bytes {
183 return 0;
184 }
185
186 let mut unpinned: Vec<(String, u64, u64)> = inner
188 .data
189 .iter()
190 .filter(|(key, _)| inner.pins.get(*key).copied().unwrap_or(0) == 0)
191 .map(|(key, entry)| (key.clone(), entry.order, entry.data.len() as u64))
192 .collect();
193
194 unpinned.sort_by_key(|(_, order, _)| *order);
195
196 let mut freed = 0u64;
197 let to_free = current_bytes - target_bytes;
198
199 for (key, _, size) in unpinned {
200 if freed >= to_free {
201 break;
202 }
203 inner.data.remove(&key);
204 freed += size;
205 }
206
207 freed
208 }
209}
210
211#[async_trait]
212impl Store for MemoryStore {
213 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
214 let key = to_hex(&hash);
215 let mut inner = self.inner.write().unwrap();
216 if inner.data.contains_key(&key) {
217 return Ok(false);
218 }
219 let order = inner.next_order;
220 inner.next_order += 1;
221 inner.data.insert(key, MemoryEntry { data, order });
222 Ok(true)
223 }
224
225 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
226 let key = to_hex(hash);
227 let inner = self.inner.read().unwrap();
228 Ok(inner.data.get(&key).map(|e| e.data.clone()))
229 }
230
231 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
232 let key = to_hex(hash);
233 Ok(self.inner.read().unwrap().data.contains_key(&key))
234 }
235
236 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
237 let key = to_hex(hash);
238 let mut inner = self.inner.write().unwrap();
239 inner.pins.remove(&key);
241 Ok(inner.data.remove(&key).is_some())
242 }
243
244 fn set_max_bytes(&self, max: u64) {
245 self.inner.write().unwrap().max_bytes = if max > 0 { Some(max) } else { None };
246 }
247
248 fn max_bytes(&self) -> Option<u64> {
249 self.inner.read().unwrap().max_bytes
250 }
251
252 async fn stats(&self) -> StoreStats {
253 let inner = self.inner.read().unwrap();
254 let mut count = 0u64;
255 let mut bytes = 0u64;
256 let mut pinned_count = 0u64;
257 let mut pinned_bytes = 0u64;
258
259 for (key, entry) in &inner.data {
260 count += 1;
261 bytes += entry.data.len() as u64;
262 if inner.pins.get(key).copied().unwrap_or(0) > 0 {
263 pinned_count += 1;
264 pinned_bytes += entry.data.len() as u64;
265 }
266 }
267
268 StoreStats {
269 count,
270 bytes,
271 pinned_count,
272 pinned_bytes,
273 }
274 }
275
276 async fn evict_if_needed(&self) -> Result<u64, StoreError> {
277 let max = match self.inner.read().unwrap().max_bytes {
278 Some(m) => m,
279 None => return Ok(0), };
281
282 let current: u64 = self
283 .inner
284 .read()
285 .unwrap()
286 .data
287 .values()
288 .map(|e| e.data.len() as u64)
289 .sum();
290
291 if current <= max {
292 return Ok(0);
293 }
294
295 let target = max * 9 / 10;
297 Ok(self.evict_to_target(target))
298 }
299
300 async fn pin(&self, hash: &Hash) -> Result<(), StoreError> {
301 let key = to_hex(hash);
302 let mut inner = self.inner.write().unwrap();
303 *inner.pins.entry(key).or_insert(0) += 1;
304 Ok(())
305 }
306
307 async fn unpin(&self, hash: &Hash) -> Result<(), StoreError> {
308 let key = to_hex(hash);
309 let mut inner = self.inner.write().unwrap();
310 if let Some(count) = inner.pins.get_mut(&key) {
311 if *count > 0 {
312 *count -= 1;
313 }
314 if *count == 0 {
315 inner.pins.remove(&key);
316 }
317 }
318 Ok(())
319 }
320
321 fn pin_count(&self, hash: &Hash) -> u32 {
322 let key = to_hex(hash);
323 self.inner
324 .read()
325 .unwrap()
326 .pins
327 .get(&key)
328 .copied()
329 .unwrap_or(0)
330 }
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hash::sha256;

    /// Pairs a payload with its SHA-256 digest so both can be handed to `put`.
    fn keyed(bytes: Vec<u8>) -> (Hash, Vec<u8>) {
        (sha256(&bytes), bytes)
    }

    #[tokio::test]
    async fn test_put_returns_true_for_new() {
        let (hash, data) = keyed(vec![1, 2, 3]);
        let store = MemoryStore::new();

        let inserted = store.put(hash, data).await.unwrap();
        assert!(inserted);
    }

    #[tokio::test]
    async fn test_put_returns_false_for_duplicate() {
        let (hash, data) = keyed(vec![1, 2, 3]);
        let store = MemoryStore::new();

        store.put(hash, data.clone()).await.unwrap();
        let inserted_again = store.put(hash, data).await.unwrap();
        assert!(!inserted_again);
    }

    #[tokio::test]
    async fn test_get_returns_data() {
        let (hash, data) = keyed(vec![1, 2, 3]);
        let store = MemoryStore::new();
        store.put(hash, data.clone()).await.unwrap();

        let fetched = store.get(&hash).await.unwrap();
        assert_eq!(fetched, Some(data));
    }

    #[tokio::test]
    async fn test_get_returns_none_for_missing() {
        let store = MemoryStore::new();
        let absent = [0u8; 32];

        let fetched = store.get(&absent).await.unwrap();
        assert!(fetched.is_none());
    }

    #[tokio::test]
    async fn test_has_returns_true() {
        let (hash, data) = keyed(vec![1, 2, 3]);
        let store = MemoryStore::new();
        store.put(hash, data).await.unwrap();

        assert!(store.has(&hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_has_returns_false() {
        let store = MemoryStore::new();
        let absent = [0u8; 32];

        assert!(!store.has(&absent).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_returns_true() {
        let (hash, data) = keyed(vec![1, 2, 3]);
        let store = MemoryStore::new();
        store.put(hash, data).await.unwrap();

        let removed = store.delete(&hash).await.unwrap();

        assert!(removed);
        assert!(!store.has(&hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_returns_false() {
        let store = MemoryStore::new();
        let absent = [0u8; 32];

        let removed = store.delete(&absent).await.unwrap();
        assert!(!removed);
    }

    #[tokio::test]
    async fn test_size() {
        let store = MemoryStore::new();
        assert_eq!(store.size(), 0);

        // Two distinct one-byte payloads.
        for byte in 1u8..=2 {
            let (hash, data) = keyed(vec![byte]);
            store.put(hash, data).await.unwrap();
        }

        assert_eq!(store.size(), 2);
    }

    #[tokio::test]
    async fn test_total_bytes() {
        let store = MemoryStore::new();
        assert_eq!(store.total_bytes(), 0);

        let (hash1, data1) = keyed(vec![1, 2, 3]);
        let (hash2, data2) = keyed(vec![4, 5]);
        store.put(hash1, data1).await.unwrap();
        store.put(hash2, data2).await.unwrap();

        // 3 bytes + 2 bytes.
        assert_eq!(store.total_bytes(), 5);
    }

    #[tokio::test]
    async fn test_clear() {
        let (hash, data) = keyed(vec![1, 2, 3]);
        let store = MemoryStore::new();
        store.put(hash, data).await.unwrap();

        store.clear();

        assert_eq!(store.size(), 0);
        assert!(!store.has(&hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_keys() {
        let store = MemoryStore::new();
        assert!(store.keys().is_empty());

        let (hash1, data1) = keyed(vec![1]);
        let (hash2, data2) = keyed(vec![2]);
        store.put(hash1, data1).await.unwrap();
        store.put(hash2, data2).await.unwrap();

        let keys = store.keys();
        assert_eq!(keys.len(), 2);

        // `keys` has no defined order, so compare sorted hex strings.
        let mut actual: Vec<String> = keys.iter().map(to_hex).collect();
        actual.sort();
        let mut expected = vec![to_hex(&hash1), to_hex(&hash2)];
        expected.sort();
        assert_eq!(actual, expected);
    }

    #[tokio::test]
    async fn test_pin_and_unpin() {
        let (hash, data) = keyed(vec![1, 2, 3]);
        let store = MemoryStore::new();
        store.put(hash, data).await.unwrap();

        // Freshly stored data starts unpinned.
        assert_eq!(store.pin_count(&hash), 0);
        assert!(!store.is_pinned(&hash));

        store.pin(&hash).await.unwrap();
        assert_eq!(store.pin_count(&hash), 1);
        assert!(store.is_pinned(&hash));

        store.unpin(&hash).await.unwrap();
        assert_eq!(store.pin_count(&hash), 0);
        assert!(!store.is_pinned(&hash));
    }

    #[tokio::test]
    async fn test_pin_count_ref_counting() {
        let (hash, data) = keyed(vec![1, 2, 3]);
        let store = MemoryStore::new();
        store.put(hash, data).await.unwrap();

        for _ in 0..3 {
            store.pin(&hash).await.unwrap();
        }
        assert_eq!(store.pin_count(&hash), 3);

        store.unpin(&hash).await.unwrap();
        assert_eq!(store.pin_count(&hash), 2);
        assert!(store.is_pinned(&hash));

        for _ in 0..2 {
            store.unpin(&hash).await.unwrap();
        }
        assert_eq!(store.pin_count(&hash), 0);
        assert!(!store.is_pinned(&hash));

        // One unpin too many must leave the count at zero.
        store.unpin(&hash).await.unwrap();
        assert_eq!(store.pin_count(&hash), 0);
    }

    #[tokio::test]
    async fn test_stats() {
        let store = MemoryStore::new();

        let (hash1, data1) = keyed(vec![1, 2, 3]);
        let (hash2, data2) = keyed(vec![4, 5]);
        store.put(hash1, data1).await.unwrap();
        store.put(hash2, data2).await.unwrap();

        // Only the 3-byte entry is pinned.
        store.pin(&hash1).await.unwrap();

        let stats = store.stats().await;
        assert_eq!(stats.count, 2);
        assert_eq!(stats.bytes, 5);
        assert_eq!(stats.pinned_count, 1);
        assert_eq!(stats.pinned_bytes, 3);
    }

    #[tokio::test]
    async fn test_max_bytes() {
        let store = MemoryStore::new();
        assert!(store.max_bytes().is_none());

        store.set_max_bytes(1000);
        assert_eq!(store.max_bytes(), Some(1000));

        // Zero switches the limit off again.
        store.set_max_bytes(0);
        assert!(store.max_bytes().is_none());
    }

    #[tokio::test]
    async fn test_with_max_bytes() {
        let limited = MemoryStore::with_max_bytes(500);
        assert_eq!(limited.max_bytes(), Some(500));

        let unlimited = MemoryStore::with_max_bytes(0);
        assert!(unlimited.max_bytes().is_none());
    }

    #[tokio::test]
    async fn test_eviction_respects_pins() {
        let store = MemoryStore::with_max_bytes(10);

        // Three 3-byte entries: 9 bytes, still within the 10-byte limit.
        let (hash1, data1) = keyed(vec![1; 3]);
        let (hash2, data2) = keyed(vec![2; 3]);
        let (hash3, data3) = keyed(vec![3; 3]);
        store.put(hash1, data1).await.unwrap();
        store.put(hash2, data2).await.unwrap();
        store.put(hash3, data3).await.unwrap();

        // Pin the oldest entry so eviction has to skip it.
        store.pin(&hash1).await.unwrap();

        // A fourth entry pushes usage to 12 bytes, over the limit.
        let (hash4, data4) = keyed(vec![4; 3]);
        store.put(hash4, data4).await.unwrap();

        let freed = store.evict_if_needed().await.unwrap();
        assert!(freed > 0);

        // The pinned entry survives; the oldest unpinned entry is evicted.
        assert!(store.has(&hash1).await.unwrap());
        assert!(!store.has(&hash2).await.unwrap());
        assert!(store.has(&hash3).await.unwrap());
        assert!(store.has(&hash4).await.unwrap());
    }

    #[tokio::test]
    async fn test_eviction_lru_order() {
        let store = MemoryStore::with_max_bytes(15);

        // Four 5-byte entries inserted in order: 20 bytes against a limit of 15.
        let (hash1, data1) = keyed(vec![1; 5]);
        let (hash2, data2) = keyed(vec![2; 5]);
        let (hash3, data3) = keyed(vec![3; 5]);
        let (hash4, data4) = keyed(vec![4; 5]);
        store.put(hash1, data1).await.unwrap();
        store.put(hash2, data2).await.unwrap();
        store.put(hash3, data3).await.unwrap();
        store.put(hash4, data4).await.unwrap();
        assert_eq!(store.total_bytes(), 20);

        let freed = store.evict_if_needed().await.unwrap();

        // At least the first-inserted entry goes; the newest one stays.
        assert!(freed >= 5);
        assert!(!store.has(&hash1).await.unwrap());
        assert!(store.has(&hash4).await.unwrap());
    }

    #[tokio::test]
    async fn test_no_eviction_when_under_limit() {
        let store = MemoryStore::with_max_bytes(100);
        let (hash, data) = keyed(vec![1, 2, 3]);
        store.put(hash, data).await.unwrap();

        let freed = store.evict_if_needed().await.unwrap();

        assert_eq!(freed, 0);
        assert!(store.has(&hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_no_eviction_without_limit() {
        let store = MemoryStore::new();

        // 100 distinct 100-byte payloads, no limit configured.
        for fill in 0..100u8 {
            let (hash, data) = keyed(vec![fill; 100]);
            store.put(hash, data).await.unwrap();
        }

        let freed = store.evict_if_needed().await.unwrap();

        assert_eq!(freed, 0);
        assert_eq!(store.size(), 100);
    }

    #[tokio::test]
    async fn test_delete_removes_pin() {
        let (hash, data) = keyed(vec![1, 2, 3]);
        let store = MemoryStore::new();
        store.put(hash, data).await.unwrap();
        store.pin(&hash).await.unwrap();
        assert!(store.is_pinned(&hash));

        store.delete(&hash).await.unwrap();

        // Deleting the entry also discards its pin count.
        assert_eq!(store.pin_count(&hash), 0);
    }
}