1use async_trait::async_trait;
4use std::collections::HashMap;
5use std::sync::{Arc, RwLock};
6
7use crate::types::{to_hex, Hash};
8
9#[derive(Debug, Clone, Default)]
11pub struct StoreStats {
12 pub count: u64,
14 pub bytes: u64,
16 pub pinned_count: u64,
18 pub pinned_bytes: u64,
20}
21
22#[async_trait]
24pub trait Store: Send + Sync {
25 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError>;
28
29 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError>;
32
33 async fn has(&self, hash: &Hash) -> Result<bool, StoreError>;
35
36 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError>;
39
40 fn set_max_bytes(&self, _max: u64) {}
46
47 fn max_bytes(&self) -> Option<u64> {
49 None
50 }
51
52 async fn stats(&self) -> StoreStats {
54 StoreStats::default()
55 }
56
57 async fn evict_if_needed(&self) -> Result<u64, StoreError> {
60 Ok(0)
61 }
62
63 async fn pin(&self, _hash: &Hash) -> Result<(), StoreError> {
69 Ok(())
70 }
71
72 async fn unpin(&self, _hash: &Hash) -> Result<(), StoreError> {
74 Ok(())
75 }
76
77 fn pin_count(&self, _hash: &Hash) -> u32 {
79 0
80 }
81
82 fn is_pinned(&self, hash: &Hash) -> bool {
84 self.pin_count(hash) > 0
85 }
86}
87
88#[derive(Debug, thiserror::Error)]
90pub enum StoreError {
91 #[error("IO error: {0}")]
92 Io(#[from] std::io::Error),
93 #[error("Store error: {0}")]
94 Other(String),
95}
96
97#[derive(Debug, Clone)]
99struct MemoryEntry {
100 data: Vec<u8>,
101 order: u64,
103}
104
105#[derive(Debug, Default)]
107struct MemoryStoreInner {
108 data: HashMap<String, MemoryEntry>,
109 pins: HashMap<String, u32>,
110 next_order: u64,
111 max_bytes: Option<u64>,
112}
113
114#[derive(Debug, Clone, Default)]
116pub struct MemoryStore {
117 inner: Arc<RwLock<MemoryStoreInner>>,
118}
119
120impl MemoryStore {
121 pub fn new() -> Self {
122 Self {
123 inner: Arc::new(RwLock::new(MemoryStoreInner::default())),
124 }
125 }
126
127 pub fn with_max_bytes(max_bytes: u64) -> Self {
129 Self {
130 inner: Arc::new(RwLock::new(MemoryStoreInner {
131 max_bytes: if max_bytes > 0 { Some(max_bytes) } else { None },
132 ..Default::default()
133 })),
134 }
135 }
136
137 pub fn size(&self) -> usize {
139 self.inner.read().unwrap().data.len()
140 }
141
142 pub fn total_bytes(&self) -> usize {
144 self.inner
145 .read()
146 .unwrap()
147 .data
148 .values()
149 .map(|e| e.data.len())
150 .sum()
151 }
152
153 pub fn clear(&self) {
155 self.inner.write().unwrap().data.clear();
156 }
157
158 pub fn keys(&self) -> Vec<Hash> {
160 self.inner
161 .read()
162 .unwrap()
163 .data
164 .keys()
165 .filter_map(|hex| {
166 let bytes = hex::decode(hex).ok()?;
167 if bytes.len() != 32 {
168 return None;
169 }
170 let mut hash = [0u8; 32];
171 hash.copy_from_slice(&bytes);
172 Some(hash)
173 })
174 .collect()
175 }
176
177 fn evict_to_target(&self, target_bytes: u64) -> u64 {
179 let mut inner = self.inner.write().unwrap();
180
181 let current_bytes: u64 = inner.data.values().map(|e| e.data.len() as u64).sum();
182 if current_bytes <= target_bytes {
183 return 0;
184 }
185
186 let mut unpinned: Vec<(String, u64, u64)> = inner
188 .data
189 .iter()
190 .filter(|(key, _)| inner.pins.get(*key).copied().unwrap_or(0) == 0)
191 .map(|(key, entry)| (key.clone(), entry.order, entry.data.len() as u64))
192 .collect();
193
194 unpinned.sort_by_key(|(_, order, _)| *order);
195
196 let mut freed = 0u64;
197 let to_free = current_bytes - target_bytes;
198
199 for (key, _, size) in unpinned {
200 if freed >= to_free {
201 break;
202 }
203 inner.data.remove(&key);
204 freed += size;
205 }
206
207 freed
208 }
209}
210
211#[async_trait]
212impl Store for MemoryStore {
213 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
214 let key = to_hex(&hash);
215 let mut inner = self.inner.write().unwrap();
216 if inner.data.contains_key(&key) {
217 return Ok(false);
218 }
219 let order = inner.next_order;
220 inner.next_order += 1;
221 inner.data.insert(key, MemoryEntry { data, order });
222 Ok(true)
223 }
224
225 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
226 let key = to_hex(hash);
227 let inner = self.inner.read().unwrap();
228 Ok(inner.data.get(&key).map(|e| e.data.clone()))
229 }
230
231 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
232 let key = to_hex(hash);
233 Ok(self.inner.read().unwrap().data.contains_key(&key))
234 }
235
236 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
237 let key = to_hex(hash);
238 let mut inner = self.inner.write().unwrap();
239 inner.pins.remove(&key);
241 Ok(inner.data.remove(&key).is_some())
242 }
243
244 fn set_max_bytes(&self, max: u64) {
245 self.inner.write().unwrap().max_bytes = if max > 0 { Some(max) } else { None };
246 }
247
248 fn max_bytes(&self) -> Option<u64> {
249 self.inner.read().unwrap().max_bytes
250 }
251
252 async fn stats(&self) -> StoreStats {
253 let inner = self.inner.read().unwrap();
254 let mut count = 0u64;
255 let mut bytes = 0u64;
256 let mut pinned_count = 0u64;
257 let mut pinned_bytes = 0u64;
258
259 for (key, entry) in &inner.data {
260 count += 1;
261 bytes += entry.data.len() as u64;
262 if inner.pins.get(key).copied().unwrap_or(0) > 0 {
263 pinned_count += 1;
264 pinned_bytes += entry.data.len() as u64;
265 }
266 }
267
268 StoreStats {
269 count,
270 bytes,
271 pinned_count,
272 pinned_bytes,
273 }
274 }
275
276 async fn evict_if_needed(&self) -> Result<u64, StoreError> {
277 let max = match self.inner.read().unwrap().max_bytes {
278 Some(m) => m,
279 None => return Ok(0), };
281
282 let current: u64 = self
283 .inner
284 .read()
285 .unwrap()
286 .data
287 .values()
288 .map(|e| e.data.len() as u64)
289 .sum();
290
291 if current <= max {
292 return Ok(0);
293 }
294
295 let target = max * 9 / 10;
297 Ok(self.evict_to_target(target))
298 }
299
300 async fn pin(&self, hash: &Hash) -> Result<(), StoreError> {
301 let key = to_hex(hash);
302 let mut inner = self.inner.write().unwrap();
303 *inner.pins.entry(key).or_insert(0) += 1;
304 Ok(())
305 }
306
307 async fn unpin(&self, hash: &Hash) -> Result<(), StoreError> {
308 let key = to_hex(hash);
309 let mut inner = self.inner.write().unwrap();
310 if let Some(count) = inner.pins.get_mut(&key) {
311 if *count > 0 {
312 *count -= 1;
313 }
314 if *count == 0 {
315 inner.pins.remove(&key);
316 }
317 }
318 Ok(())
319 }
320
321 fn pin_count(&self, hash: &Hash) -> u32 {
322 let key = to_hex(hash);
323 self.inner.read().unwrap().pins.get(&key).copied().unwrap_or(0)
324 }
325}
326
327#[cfg(test)]
328mod tests {
329 use super::*;
330 use crate::hash::sha256;
331
332 #[tokio::test]
333 async fn test_put_returns_true_for_new() {
334 let store = MemoryStore::new();
335 let data = vec![1u8, 2, 3];
336 let hash = sha256(&data);
337
338 let result = store.put(hash, data).await.unwrap();
339 assert!(result);
340 }
341
342 #[tokio::test]
343 async fn test_put_returns_false_for_duplicate() {
344 let store = MemoryStore::new();
345 let data = vec![1u8, 2, 3];
346 let hash = sha256(&data);
347
348 store.put(hash, data.clone()).await.unwrap();
349 let result = store.put(hash, data).await.unwrap();
350 assert!(!result);
351 }
352
353 #[tokio::test]
354 async fn test_get_returns_data() {
355 let store = MemoryStore::new();
356 let data = vec![1u8, 2, 3];
357 let hash = sha256(&data);
358
359 store.put(hash, data.clone()).await.unwrap();
360 let result = store.get(&hash).await.unwrap();
361
362 assert_eq!(result, Some(data));
363 }
364
365 #[tokio::test]
366 async fn test_get_returns_none_for_missing() {
367 let store = MemoryStore::new();
368 let hash = [0u8; 32];
369
370 let result = store.get(&hash).await.unwrap();
371 assert!(result.is_none());
372 }
373
374 #[tokio::test]
375 async fn test_has_returns_true() {
376 let store = MemoryStore::new();
377 let data = vec![1u8, 2, 3];
378 let hash = sha256(&data);
379
380 store.put(hash, data).await.unwrap();
381 assert!(store.has(&hash).await.unwrap());
382 }
383
384 #[tokio::test]
385 async fn test_has_returns_false() {
386 let store = MemoryStore::new();
387 let hash = [0u8; 32];
388
389 assert!(!store.has(&hash).await.unwrap());
390 }
391
392 #[tokio::test]
393 async fn test_delete_returns_true() {
394 let store = MemoryStore::new();
395 let data = vec![1u8, 2, 3];
396 let hash = sha256(&data);
397
398 store.put(hash, data).await.unwrap();
399 let result = store.delete(&hash).await.unwrap();
400
401 assert!(result);
402 assert!(!store.has(&hash).await.unwrap());
403 }
404
405 #[tokio::test]
406 async fn test_delete_returns_false() {
407 let store = MemoryStore::new();
408 let hash = [0u8; 32];
409
410 let result = store.delete(&hash).await.unwrap();
411 assert!(!result);
412 }
413
414 #[tokio::test]
415 async fn test_size() {
416 let store = MemoryStore::new();
417 assert_eq!(store.size(), 0);
418
419 let data1 = vec![1u8];
420 let data2 = vec![2u8];
421 let hash1 = sha256(&data1);
422 let hash2 = sha256(&data2);
423
424 store.put(hash1, data1).await.unwrap();
425 store.put(hash2, data2).await.unwrap();
426
427 assert_eq!(store.size(), 2);
428 }
429
430 #[tokio::test]
431 async fn test_total_bytes() {
432 let store = MemoryStore::new();
433 assert_eq!(store.total_bytes(), 0);
434
435 let data1 = vec![1u8, 2, 3];
436 let data2 = vec![4u8, 5];
437 let hash1 = sha256(&data1);
438 let hash2 = sha256(&data2);
439
440 store.put(hash1, data1).await.unwrap();
441 store.put(hash2, data2).await.unwrap();
442
443 assert_eq!(store.total_bytes(), 5);
444 }
445
446 #[tokio::test]
447 async fn test_clear() {
448 let store = MemoryStore::new();
449 let data = vec![1u8, 2, 3];
450 let hash = sha256(&data);
451
452 store.put(hash, data).await.unwrap();
453 store.clear();
454
455 assert_eq!(store.size(), 0);
456 assert!(!store.has(&hash).await.unwrap());
457 }
458
459 #[tokio::test]
460 async fn test_keys() {
461 let store = MemoryStore::new();
462 assert!(store.keys().is_empty());
463
464 let data1 = vec![1u8];
465 let data2 = vec![2u8];
466 let hash1 = sha256(&data1);
467 let hash2 = sha256(&data2);
468
469 store.put(hash1, data1).await.unwrap();
470 store.put(hash2, data2).await.unwrap();
471
472 let keys = store.keys();
473 assert_eq!(keys.len(), 2);
474
475 let mut hex_keys: Vec<_> = keys.iter().map(to_hex).collect();
476 hex_keys.sort();
477 let mut expected: Vec<_> = vec![to_hex(&hash1), to_hex(&hash2)];
478 expected.sort();
479 assert_eq!(hex_keys, expected);
480 }
481
482 #[tokio::test]
483 async fn test_pin_and_unpin() {
484 let store = MemoryStore::new();
485 let data = vec![1u8, 2, 3];
486 let hash = sha256(&data);
487
488 store.put(hash, data).await.unwrap();
489
490 assert!(!store.is_pinned(&hash));
492 assert_eq!(store.pin_count(&hash), 0);
493
494 store.pin(&hash).await.unwrap();
496 assert!(store.is_pinned(&hash));
497 assert_eq!(store.pin_count(&hash), 1);
498
499 store.unpin(&hash).await.unwrap();
501 assert!(!store.is_pinned(&hash));
502 assert_eq!(store.pin_count(&hash), 0);
503 }
504
505 #[tokio::test]
506 async fn test_pin_count_ref_counting() {
507 let store = MemoryStore::new();
508 let data = vec![1u8, 2, 3];
509 let hash = sha256(&data);
510
511 store.put(hash, data).await.unwrap();
512
513 store.pin(&hash).await.unwrap();
515 store.pin(&hash).await.unwrap();
516 store.pin(&hash).await.unwrap();
517 assert_eq!(store.pin_count(&hash), 3);
518
519 store.unpin(&hash).await.unwrap();
521 assert_eq!(store.pin_count(&hash), 2);
522 assert!(store.is_pinned(&hash));
523
524 store.unpin(&hash).await.unwrap();
526 store.unpin(&hash).await.unwrap();
527 assert_eq!(store.pin_count(&hash), 0);
528 assert!(!store.is_pinned(&hash));
529
530 store.unpin(&hash).await.unwrap();
532 assert_eq!(store.pin_count(&hash), 0);
533 }
534
535 #[tokio::test]
536 async fn test_stats() {
537 let store = MemoryStore::new();
538
539 let data1 = vec![1u8, 2, 3]; let data2 = vec![4u8, 5]; let hash1 = sha256(&data1);
542 let hash2 = sha256(&data2);
543
544 store.put(hash1, data1).await.unwrap();
545 store.put(hash2, data2).await.unwrap();
546
547 store.pin(&hash1).await.unwrap();
549
550 let stats = store.stats().await;
551 assert_eq!(stats.count, 2);
552 assert_eq!(stats.bytes, 5);
553 assert_eq!(stats.pinned_count, 1);
554 assert_eq!(stats.pinned_bytes, 3);
555 }
556
557 #[tokio::test]
558 async fn test_max_bytes() {
559 let store = MemoryStore::new();
560 assert!(store.max_bytes().is_none());
561
562 store.set_max_bytes(1000);
563 assert_eq!(store.max_bytes(), Some(1000));
564
565 store.set_max_bytes(0);
567 assert!(store.max_bytes().is_none());
568 }
569
570 #[tokio::test]
571 async fn test_with_max_bytes() {
572 let store = MemoryStore::with_max_bytes(500);
573 assert_eq!(store.max_bytes(), Some(500));
574
575 let store_unlimited = MemoryStore::with_max_bytes(0);
576 assert!(store_unlimited.max_bytes().is_none());
577 }
578
579 #[tokio::test]
580 async fn test_eviction_respects_pins() {
581 let store = MemoryStore::with_max_bytes(10);
583
584 let data1 = vec![1u8, 1, 1]; let data2 = vec![2u8, 2, 2];
587 let data3 = vec![3u8, 3, 3]; let hash1 = sha256(&data1);
589 let hash2 = sha256(&data2);
590 let hash3 = sha256(&data3);
591
592 store.put(hash1, data1).await.unwrap();
593 store.put(hash2, data2).await.unwrap();
594 store.put(hash3, data3).await.unwrap();
595
596 store.pin(&hash1).await.unwrap();
598
599 let data4 = vec![4u8, 4, 4];
601 let hash4 = sha256(&data4);
602 store.put(hash4, data4).await.unwrap();
603
604 let freed = store.evict_if_needed().await.unwrap();
606 assert!(freed > 0);
607
608 assert!(store.has(&hash1).await.unwrap());
610 assert!(!store.has(&hash2).await.unwrap());
612 assert!(store.has(&hash3).await.unwrap());
614 assert!(store.has(&hash4).await.unwrap());
615 }
616
617 #[tokio::test]
618 async fn test_eviction_lru_order() {
619 let store = MemoryStore::with_max_bytes(15);
621
622 let data1 = vec![1u8; 5]; let data2 = vec![2u8; 5];
625 let data3 = vec![3u8; 5];
626 let data4 = vec![4u8; 5]; let hash1 = sha256(&data1);
628 let hash2 = sha256(&data2);
629 let hash3 = sha256(&data3);
630 let hash4 = sha256(&data4);
631
632 store.put(hash1, data1).await.unwrap();
633 store.put(hash2, data2).await.unwrap();
634 store.put(hash3, data3).await.unwrap();
635 store.put(hash4, data4).await.unwrap();
636
637 assert_eq!(store.total_bytes(), 20);
639
640 let freed = store.evict_if_needed().await.unwrap();
642 assert!(freed >= 5); assert!(!store.has(&hash1).await.unwrap());
646 assert!(store.has(&hash4).await.unwrap());
648 }
649
650 #[tokio::test]
651 async fn test_no_eviction_when_under_limit() {
652 let store = MemoryStore::with_max_bytes(100);
653
654 let data = vec![1u8, 2, 3];
655 let hash = sha256(&data);
656 store.put(hash, data).await.unwrap();
657
658 let freed = store.evict_if_needed().await.unwrap();
659 assert_eq!(freed, 0);
660 assert!(store.has(&hash).await.unwrap());
661 }
662
663 #[tokio::test]
664 async fn test_no_eviction_without_limit() {
665 let store = MemoryStore::new();
666
667 for i in 0..100u8 {
669 let data = vec![i; 100];
670 let hash = sha256(&data);
671 store.put(hash, data).await.unwrap();
672 }
673
674 let freed = store.evict_if_needed().await.unwrap();
675 assert_eq!(freed, 0);
676 assert_eq!(store.size(), 100);
677 }
678
679 #[tokio::test]
680 async fn test_delete_removes_pin() {
681 let store = MemoryStore::new();
682 let data = vec![1u8, 2, 3];
683 let hash = sha256(&data);
684
685 store.put(hash, data).await.unwrap();
686 store.pin(&hash).await.unwrap();
687 assert!(store.is_pinned(&hash));
688
689 store.delete(&hash).await.unwrap();
690 assert_eq!(store.pin_count(&hash), 0);
692 }
693}