1use std::collections::HashSet;
19
20use oxistore_core::{KvStore, StoreError};
21
22use crate::Cache;
23
24pub struct WriteThroughCache<S, C> {
40 store: S,
41 cache: C,
42}
43
44impl<S, C> WriteThroughCache<S, C>
45where
46 S: KvStore,
47 C: Cache<Vec<u8>, Vec<u8>>,
48{
49 pub fn new(store: S, cache: C) -> Self {
51 WriteThroughCache { store, cache }
52 }
53
54 pub fn store(&self) -> &S {
56 &self.store
57 }
58
59 pub fn cache(&self) -> &C {
61 &self.cache
62 }
63
64 pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
68 if let Some(v) = self.cache.get(&key.to_vec()) {
70 return Ok(Some(v.clone()));
71 }
72 match self.store.get(key)? {
74 Some(v) => {
75 self.cache.put(key.to_vec(), v.clone());
77 Ok(Some(v))
78 }
79 None => Ok(None),
80 }
81 }
82
83 pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<(), StoreError> {
85 self.store.put(&key, &value)?;
86 self.cache.put(key, value);
87 Ok(())
88 }
89
90 pub fn remove(&mut self, key: &[u8]) -> Result<(), StoreError> {
92 self.cache.remove(&key.to_vec());
93 self.store.delete(key)?;
94 Ok(())
95 }
96
97 pub fn cache_len(&self) -> usize {
99 self.cache.len()
100 }
101}
102
103pub struct WriteBackCache<S, C> {
129 store: S,
130 cache: C,
131 dirty: HashSet<Vec<u8>>,
132}
133
134impl<S, C> WriteBackCache<S, C>
135where
136 S: KvStore,
137 C: Cache<Vec<u8>, Vec<u8>>,
138{
139 pub fn new(store: S, cache: C) -> Self {
141 WriteBackCache {
142 store,
143 cache,
144 dirty: HashSet::new(),
145 }
146 }
147
148 pub fn store(&self) -> &S {
150 &self.store
151 }
152
153 pub fn cache(&self) -> &C {
155 &self.cache
156 }
157
158 pub fn dirty_count(&self) -> usize {
160 self.dirty.len()
161 }
162
163 pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
169 if let Some(v) = self.cache.get(&key.to_vec()) {
170 return Ok(Some(v.clone()));
171 }
172 self.store.get(key)
174 }
175
176 pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<(), StoreError> {
180 self.flush_if_eviction_imminent(&key)?;
183
184 self.dirty.insert(key.clone());
185 self.cache.put(key, value);
186 Ok(())
187 }
188
189 pub fn remove(&mut self, key: &[u8]) -> Result<(), StoreError> {
191 let key_vec = key.to_vec();
192 self.cache.remove(&key_vec);
193 self.dirty.remove(&key_vec);
194 self.store.delete(key)?;
195 Ok(())
196 }
197
198 pub fn flush(&mut self) -> Result<(), StoreError> {
204 let dirty_keys: Vec<Vec<u8>> = self.dirty.iter().cloned().collect();
205 for key in dirty_keys {
206 if let Some(val) = self.cache.peek(&key) {
207 self.store.put(&key, val)?;
208 }
209 }
211 self.dirty.clear();
212 Ok(())
213 }
214
215 fn flush_if_eviction_imminent(&mut self, incoming_key: &[u8]) -> Result<(), StoreError> {
221 if self.cache.contains_key(&incoming_key.to_vec()) {
224 return Ok(());
225 }
226 if self.cache.len() < self.cache.cap() {
227 return Ok(());
228 }
229 let dirty_keys: Vec<Vec<u8>> = self.dirty.iter().cloned().collect();
237 for key in dirty_keys {
238 if let Some(val) = self.cache.peek(&key) {
239 self.store.put(&key, val)?;
240 }
241 }
242 Ok(())
246 }
247}
248
249pub struct CacheableKvStore<S, C> {
270 store: S,
271 cache: std::sync::Mutex<C>,
272}
273
274impl<S, C> CacheableKvStore<S, C> {
275 pub fn new(store: S, cache: C) -> Self {
277 CacheableKvStore {
278 store,
279 cache: std::sync::Mutex::new(cache),
280 }
281 }
282}
283
284impl<S, C> KvStore for CacheableKvStore<S, C>
285where
286 S: KvStore,
287 C: Cache<Vec<u8>, Vec<u8>> + Send,
288{
289 fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
290 let cached = {
293 let mut guard = self
294 .cache
295 .lock()
296 .map_err(|e| StoreError::Other(format!("cache lock poisoned: {e}")))?;
297 guard.get(&key.to_vec()).cloned()
298 };
299 if let Some(v) = cached {
300 return Ok(Some(v));
301 }
302 let from_store = self.store.get(key)?;
304 if let Some(ref v) = from_store {
305 let mut guard = self
307 .cache
308 .lock()
309 .map_err(|e| StoreError::Other(format!("cache lock poisoned: {e}")))?;
310 guard.put(key.to_vec(), v.clone());
311 }
312 Ok(from_store)
313 }
314
315 fn put(&self, key: &[u8], value: &[u8]) -> Result<(), StoreError> {
316 self.store.put(key, value)?;
318 let mut guard = self
319 .cache
320 .lock()
321 .map_err(|e| StoreError::Other(format!("cache lock poisoned: {e}")))?;
322 guard.remove(&key.to_vec());
323 Ok(())
324 }
325
326 fn delete(&self, key: &[u8]) -> Result<(), StoreError> {
327 self.store.delete(key)?;
328 let mut guard = self
329 .cache
330 .lock()
331 .map_err(|e| StoreError::Other(format!("cache lock poisoned: {e}")))?;
332 guard.remove(&key.to_vec());
333 Ok(())
334 }
335
336 fn range<'a>(
337 &'a self,
338 lo: &[u8],
339 hi: &[u8],
340 ) -> Result<oxistore_core::RangeIter<'a>, StoreError> {
341 self.store.range(lo, hi)
342 }
343
344 fn iter<'a>(&'a self) -> Result<oxistore_core::RangeIter<'a>, StoreError> {
345 self.store.iter()
346 }
347
348 fn transaction(&self) -> Result<Box<dyn oxistore_core::KvTxn + '_>, StoreError> {
349 self.store.transaction()
350 }
351
352 fn snapshot(&self) -> Result<Box<dyn oxistore_core::KvSnapshot + '_>, StoreError> {
353 self.store.snapshot()
354 }
355
356 fn flush(&self) -> Result<(), StoreError> {
357 self.store.flush()
358 }
359}
360
361#[cfg(test)]
366mod tests {
367 use super::*;
368 use crate::LruCache;
369 use oxistore_core::{KvSnapshot, KvStore, KvTxn, RangeIter, StoreError};
370 use std::collections::HashMap;
371 use std::sync::Mutex;
372
373 #[derive(Default, Debug)]
375 struct MemStore(Mutex<HashMap<Vec<u8>, Vec<u8>>>);
376
377 impl KvStore for MemStore {
378 fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
379 Ok(self.0.lock().expect("lock").get(key).cloned())
380 }
381
382 fn put(&self, key: &[u8], value: &[u8]) -> Result<(), StoreError> {
383 self.0
384 .lock()
385 .expect("lock")
386 .insert(key.to_vec(), value.to_vec());
387 Ok(())
388 }
389
390 fn delete(&self, key: &[u8]) -> Result<(), StoreError> {
391 self.0.lock().expect("lock").remove(key);
392 Ok(())
393 }
394
395 fn range<'a>(&'a self, lo: &[u8], hi: &[u8]) -> Result<RangeIter<'a>, StoreError> {
396 let guard = self.0.lock().expect("lock");
397 let lo = lo.to_vec();
398 let hi = hi.to_vec();
399 let pairs: Vec<_> = guard
400 .iter()
401 .filter(|(k, _)| **k >= lo && **k < hi)
402 .map(|(k, v)| Ok((k.clone(), v.clone())))
403 .collect();
404 drop(guard);
405 Ok(Box::new(pairs.into_iter()))
406 }
407
408 fn transaction(&self) -> Result<Box<dyn KvTxn + '_>, StoreError> {
409 Err(StoreError::Other("MemStore: no txn".to_string()))
410 }
411
412 fn snapshot(&self) -> Result<Box<dyn KvSnapshot + '_>, StoreError> {
413 Err(StoreError::Other("MemStore: no snapshot".to_string()))
414 }
415
416 fn iter<'a>(&'a self) -> Result<RangeIter<'a>, StoreError> {
417 let guard = self.0.lock().expect("lock");
418 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> =
419 guard.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
420 drop(guard);
421 pairs.sort_by(|(a, _), (b, _)| a.cmp(b));
422 Ok(Box::new(pairs.into_iter().map(Ok)))
423 }
424
425 fn flush(&self) -> Result<(), StoreError> {
426 Ok(())
427 }
428 }
429
430 #[test]
433 fn write_through_put_flushes_to_store() {
434 let store = MemStore::default();
435 let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
436 let mut wt = WriteThroughCache::new(store, cache);
437
438 wt.put(b"key".to_vec(), b"value".to_vec())
439 .expect("put failed");
440
441 let from_store = wt.store().get(b"key").expect("get failed");
443 assert_eq!(from_store, Some(b"value".to_vec()));
444 }
445
446 #[test]
447 fn write_through_get_hits_cache() {
448 let store = MemStore::default();
449 let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
450 let mut wt = WriteThroughCache::new(store, cache);
451
452 wt.put(b"k".to_vec(), b"v".to_vec()).expect("put");
453 let v = wt.get(b"k").expect("get");
454 assert_eq!(v, Some(b"v".to_vec()));
455 }
456
457 #[test]
458 fn write_through_get_miss_populates_from_store() {
459 let store = MemStore::default();
460 store.put(b"existing", b"from_store").expect("store put");
462
463 let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
464 let mut wt = WriteThroughCache::new(store, cache);
465
466 let v = wt.get(b"existing").expect("get");
468 assert_eq!(v, Some(b"from_store".to_vec()));
469
470 assert_eq!(wt.cache_len(), 1);
472 }
473
474 #[test]
475 fn write_through_remove_clears_store() {
476 let store = MemStore::default();
477 let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
478 let mut wt = WriteThroughCache::new(store, cache);
479
480 wt.put(b"rm_key".to_vec(), b"rm_val".to_vec()).expect("put");
481 wt.remove(b"rm_key").expect("remove");
482
483 let from_store = wt.store().get(b"rm_key").expect("store get");
484 assert!(from_store.is_none());
485 }
486
487 #[test]
488 fn write_through_get_miss_absent_in_store() {
489 let store = MemStore::default();
490 let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
491 let mut wt = WriteThroughCache::new(store, cache);
492
493 let v = wt.get(b"no_such_key").expect("get");
494 assert!(v.is_none());
495 }
496
497 #[test]
500 fn write_back_put_deferred() {
501 let store = MemStore::default();
502 let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
503 let mut wb = WriteBackCache::new(store, cache);
504
505 wb.put(b"lazy".to_vec(), b"write".to_vec()).expect("put");
506
507 let from_store = wb.store().get(b"lazy").expect("store get");
509 assert!(from_store.is_none());
510 assert_eq!(wb.dirty_count(), 1);
511 }
512
513 #[test]
514 fn write_back_flush_persists() {
515 let store = MemStore::default();
516 let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
517 let mut wb = WriteBackCache::new(store, cache);
518
519 wb.put(b"a".to_vec(), b"1".to_vec()).expect("put");
520 wb.put(b"b".to_vec(), b"2".to_vec()).expect("put");
521
522 wb.flush().expect("flush");
523
524 assert_eq!(wb.dirty_count(), 0);
525 assert_eq!(
526 wb.store().get(b"a").expect("store get a"),
527 Some(b"1".to_vec())
528 );
529 assert_eq!(
530 wb.store().get(b"b").expect("store get b"),
531 Some(b"2".to_vec())
532 );
533 }
534
535 #[test]
536 fn write_back_get_hits_cache() {
537 let store = MemStore::default();
538 let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
539 let mut wb = WriteBackCache::new(store, cache);
540
541 wb.put(b"key".to_vec(), b"val".to_vec()).expect("put");
542 let v = wb.get(b"key").expect("get");
543 assert_eq!(v, Some(b"val".to_vec()));
544 }
545
546 #[test]
547 fn write_back_get_misses_to_store() {
548 let store = MemStore::default();
549 store.put(b"persistent", b"data").expect("store put");
550
551 let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
552 let mut wb = WriteBackCache::new(store, cache);
553
554 let v = wb.get(b"persistent").expect("get");
555 assert_eq!(v, Some(b"data".to_vec()));
556 }
557
558 #[test]
559 fn write_back_remove_deletes_from_store() {
560 let store = MemStore::default();
561 let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
562 let mut wb = WriteBackCache::new(store, cache);
563
564 wb.put(b"del".to_vec(), b"gone".to_vec()).expect("put");
565 wb.flush().expect("flush");
566 wb.remove(b"del").expect("remove");
567
568 assert!(wb.store().get(b"del").expect("store get").is_none());
569 assert_eq!(wb.dirty_count(), 0);
570 }
571
572 #[test]
573 fn write_back_dirty_eviction_flushes() {
574 let store = MemStore::default();
576 let cache = LruCache::<Vec<u8>, Vec<u8>>::new(2);
577 let mut wb = WriteBackCache::new(store, cache);
578
579 wb.put(b"first".to_vec(), b"v1".to_vec()).expect("put 1");
580 wb.put(b"second".to_vec(), b"v2".to_vec()).expect("put 2");
581 wb.put(b"third".to_vec(), b"v3".to_vec()).expect("put 3");
584
585 wb.flush().expect("flush");
587
588 let v2 = wb.store().get(b"second").expect("store get second");
590 let v3 = wb.store().get(b"third").expect("store get third");
591 assert_eq!(v2, Some(b"v2".to_vec()));
592 assert_eq!(v3, Some(b"v3".to_vec()));
593 }
594}