Skip to main content

oxistore_cache/
write_adapter.rs

1//! Write-through and write-back cache adapters.
2//!
3//! Both adapters combine a [`Cache<Vec<u8>, Vec<u8>>`] with an
4//! [`oxistore_core::KvStore`] to provide transparent persistence.
5//!
6//! ## Write-Through
7//!
8//! [`WriteThroughCache`] ensures every `put` is immediately written to the
9//! backing store.  Cache misses on `get` are populated from the store.
10//!
11//! ## Write-Back
12//!
13//! [`WriteBackCache`] writes to the cache immediately but defers flushing to
14//! the store until an explicit [`WriteBackCache::flush`] call.  If the inner
15//! cache evicts a dirty entry, the entry is flushed to the store synchronously
16//! to avoid silent data loss.
17
18use std::collections::HashSet;
19
20use oxistore_core::{KvStore, StoreError};
21
22use crate::Cache;
23
24// ---------------------------------------------------------------------------
25// WriteThroughCache
26// ---------------------------------------------------------------------------
27
28/// A cache adapter that propagates writes to a backing [`KvStore`] immediately.
29///
30/// - `put(k, v)` → writes to both the cache **and** the store.
31/// - `get(k)` → returns from cache if present; on miss, fetches from the store,
32///   populates the cache, and returns the value.
33/// - `remove(k)` → removes from both cache and store.
34///
35/// # Type parameters
36///
37/// - `S`: a [`KvStore`] implementor.
38/// - `C`: a `Cache<Vec<u8>, Vec<u8>>` implementor.
39pub struct WriteThroughCache<S, C> {
40    store: S,
41    cache: C,
42}
43
44impl<S, C> WriteThroughCache<S, C>
45where
46    S: KvStore,
47    C: Cache<Vec<u8>, Vec<u8>>,
48{
49    /// Create a new write-through cache adapter.
50    pub fn new(store: S, cache: C) -> Self {
51        WriteThroughCache { store, cache }
52    }
53
54    /// Borrow the inner store.
55    pub fn store(&self) -> &S {
56        &self.store
57    }
58
59    /// Borrow the inner cache.
60    pub fn cache(&self) -> &C {
61        &self.cache
62    }
63
64    /// Look up `key`: cache first, then store on miss.
65    ///
66    /// On a store hit the value is populated back into the cache.
67    pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
68        // Cache hit (returns a reference — clone to avoid borrow issues).
69        if let Some(v) = self.cache.get(&key.to_vec()) {
70            return Ok(Some(v.clone()));
71        }
72        // Cache miss → try store.
73        match self.store.get(key)? {
74            Some(v) => {
75                // Populate cache.
76                self.cache.put(key.to_vec(), v.clone());
77                Ok(Some(v))
78            }
79            None => Ok(None),
80        }
81    }
82
83    /// Insert `key` → `value` into both cache and store.
84    pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<(), StoreError> {
85        self.store.put(&key, &value)?;
86        self.cache.put(key, value);
87        Ok(())
88    }
89
90    /// Remove `key` from both cache and store.
91    pub fn remove(&mut self, key: &[u8]) -> Result<(), StoreError> {
92        self.cache.remove(&key.to_vec());
93        self.store.delete(key)?;
94        Ok(())
95    }
96
97    /// Return the number of live cache entries.
98    pub fn cache_len(&self) -> usize {
99        self.cache.len()
100    }
101}
102
103// ---------------------------------------------------------------------------
104// WriteBackCache
105// ---------------------------------------------------------------------------
106
107/// A cache adapter that defers writes to the backing [`KvStore`].
108///
109/// Writes are buffered in the cache and marked as dirty.  The store is not
110/// updated until:
111///
112/// 1. [`WriteBackCache::flush`] is called explicitly, or
113/// 2. The inner cache evicts a dirty entry (to avoid silent data loss).
114///
115/// # Eviction of dirty entries
116///
117/// The `Cache` trait has no eviction callback.  This adapter uses a
118/// cooperative model: before calling `put` on the inner cache, it checks
119/// whether the cache is full and peeks at the current LRU entry.  If the
120/// entry to be evicted is dirty it is flushed first.  This is a best-effort
121/// approach — the adapter introspects the cache state before insertion; it
122/// does **not** hook into the cache's internal eviction path.
123///
124/// # Type parameters
125///
126/// - `S`: a [`KvStore`] implementor.
127/// - `C`: a `Cache<Vec<u8>, Vec<u8>>` implementor.
128pub struct WriteBackCache<S, C> {
129    store: S,
130    cache: C,
131    dirty: HashSet<Vec<u8>>,
132}
133
134impl<S, C> WriteBackCache<S, C>
135where
136    S: KvStore,
137    C: Cache<Vec<u8>, Vec<u8>>,
138{
139    /// Create a new write-back cache adapter with an empty dirty set.
140    pub fn new(store: S, cache: C) -> Self {
141        WriteBackCache {
142            store,
143            cache,
144            dirty: HashSet::new(),
145        }
146    }
147
148    /// Borrow the inner store.
149    pub fn store(&self) -> &S {
150        &self.store
151    }
152
153    /// Borrow the inner cache.
154    pub fn cache(&self) -> &C {
155        &self.cache
156    }
157
158    /// Return the number of keys that have unflushed writes.
159    pub fn dirty_count(&self) -> usize {
160        self.dirty.len()
161    }
162
163    /// Look up `key`: cache first, then store on miss.
164    ///
165    /// Store hits are **not** inserted back into the cache (read-around policy)
166    /// to avoid polluting the cache with cold data that has already been
167    /// committed.  Callers that want read-population can call `put` after `get`.
168    pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
169        if let Some(v) = self.cache.get(&key.to_vec()) {
170            return Ok(Some(v.clone()));
171        }
172        // Cache miss → fetch from store (read-around, do not populate cache).
173        self.store.get(key)
174    }
175
176    /// Insert `key` → `value` into the cache and mark dirty.
177    ///
178    /// The store is **not** updated immediately.
179    pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<(), StoreError> {
180        // Pre-eviction check: if the cache is at capacity and the to-be-evicted
181        // entry is dirty, flush it before it disappears.
182        self.flush_if_eviction_imminent(&key)?;
183
184        self.dirty.insert(key.clone());
185        self.cache.put(key, value);
186        Ok(())
187    }
188
189    /// Remove `key` from the cache and dirty set, and delete from the store.
190    pub fn remove(&mut self, key: &[u8]) -> Result<(), StoreError> {
191        let key_vec = key.to_vec();
192        self.cache.remove(&key_vec);
193        self.dirty.remove(&key_vec);
194        self.store.delete(key)?;
195        Ok(())
196    }
197
198    /// Flush all dirty keys to the store and clear the dirty set.
199    ///
200    /// For each dirty key the value is read from the cache.  If the key is no
201    /// longer in the cache (it was evicted and presumably already flushed via
202    /// the pre-eviction hook), it is skipped.
203    pub fn flush(&mut self) -> Result<(), StoreError> {
204        let dirty_keys: Vec<Vec<u8>> = self.dirty.iter().cloned().collect();
205        for key in dirty_keys {
206            if let Some(val) = self.cache.peek(&key) {
207                self.store.put(&key, val)?;
208            }
209            // If peek returns None the key was evicted; it was flushed at eviction time.
210        }
211        self.dirty.clear();
212        Ok(())
213    }
214
215    /// Check whether inserting a new key will trigger eviction of a dirty entry.
216    ///
217    /// This is called *before* every `put`.  If the cache is full and the front
218    /// (LRU) entry is dirty, it is flushed to the store now so that the value
219    /// is not lost when the inner cache evicts it.
220    fn flush_if_eviction_imminent(&mut self, incoming_key: &[u8]) -> Result<(), StoreError> {
221        // Only relevant if the cache is at or beyond capacity and the new key
222        // is not already present (updates don't cause eviction).
223        if self.cache.contains_key(&incoming_key.to_vec()) {
224            return Ok(());
225        }
226        if self.cache.len() < self.cache.cap() {
227            return Ok(());
228        }
229        // The cache is full and the new key is novel — eviction will happen.
230        // Find the LRU candidate via peek on the order-tracking structure.
231        // Because we don't have direct access to the cache's internal order,
232        // we iterate the dirty set and check each dirty key: any dirty key
233        // currently at the LRU position might be evicted.  The simplest safe
234        // approach: flush all dirty entries now (conservative but correct).
235        // This avoids requiring the inner Cache trait to expose its eviction order.
236        let dirty_keys: Vec<Vec<u8>> = self.dirty.iter().cloned().collect();
237        for key in dirty_keys {
238            if let Some(val) = self.cache.peek(&key) {
239                self.store.put(&key, val)?;
240            }
241        }
242        // We do NOT clear dirty here because the actual eviction hasn't happened yet;
243        // after flush the entries are still in the cache.  They'll be cleared on
244        // the next explicit flush() call.  This is safe because store.put is idempotent.
245        Ok(())
246    }
247}
248
249// ---------------------------------------------------------------------------
250// CacheableKvStore
251// ---------------------------------------------------------------------------
252
253/// A read-through cache adapter that wraps a [`KvStore`] with a [`Cache`].
254///
255/// - `get(k)` — checks the cache first; on a miss, fetches from the store,
256///   populates the cache, and returns the value.  The mutex lock is **not**
257///   held while calling the underlying store, so concurrent readers are never
258///   blocked by store I/O.
259/// - `put(k, v)` — writes to the store immediately and then invalidates the
260///   cached entry to prevent stale reads.
261/// - `delete(k)` — deletes from the store and invalidates the cached entry.
262/// - All other [`KvStore`] methods (transactions, snapshots, iteration, …)
263///   delegate directly to the inner store without cache involvement.
264///
265/// # Type parameters
266///
267/// - `S`: a [`KvStore`] implementor.
268/// - `C`: a `Cache<Vec<u8>, Vec<u8>>` implementor that is also `Send`.
269pub struct CacheableKvStore<S, C> {
270    store: S,
271    cache: std::sync::Mutex<C>,
272}
273
274impl<S, C> CacheableKvStore<S, C> {
275    /// Create a new `CacheableKvStore` wrapping `store` and `cache`.
276    pub fn new(store: S, cache: C) -> Self {
277        CacheableKvStore {
278            store,
279            cache: std::sync::Mutex::new(cache),
280        }
281    }
282}
283
284impl<S, C> KvStore for CacheableKvStore<S, C>
285where
286    S: KvStore,
287    C: Cache<Vec<u8>, Vec<u8>> + Send,
288{
289    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
290        // CRITICAL: lock → check cache → if hit return clone → drop lock.
291        // Do NOT hold the lock while calling store.get.
292        let cached = {
293            let mut guard = self
294                .cache
295                .lock()
296                .map_err(|e| StoreError::Other(format!("cache lock poisoned: {e}")))?;
297            guard.get(&key.to_vec()).cloned()
298        };
299        if let Some(v) = cached {
300            return Ok(Some(v));
301        }
302        // Cache miss — fetch from store without holding the lock.
303        let from_store = self.store.get(key)?;
304        if let Some(ref v) = from_store {
305            // Re-acquire lock to insert the fetched value.
306            let mut guard = self
307                .cache
308                .lock()
309                .map_err(|e| StoreError::Other(format!("cache lock poisoned: {e}")))?;
310            guard.put(key.to_vec(), v.clone());
311        }
312        Ok(from_store)
313    }
314
315    fn put(&self, key: &[u8], value: &[u8]) -> Result<(), StoreError> {
316        // Write to store first (durability), then invalidate cache entry.
317        self.store.put(key, value)?;
318        let mut guard = self
319            .cache
320            .lock()
321            .map_err(|e| StoreError::Other(format!("cache lock poisoned: {e}")))?;
322        guard.remove(&key.to_vec());
323        Ok(())
324    }
325
326    fn delete(&self, key: &[u8]) -> Result<(), StoreError> {
327        self.store.delete(key)?;
328        let mut guard = self
329            .cache
330            .lock()
331            .map_err(|e| StoreError::Other(format!("cache lock poisoned: {e}")))?;
332        guard.remove(&key.to_vec());
333        Ok(())
334    }
335
336    fn range<'a>(
337        &'a self,
338        lo: &[u8],
339        hi: &[u8],
340    ) -> Result<oxistore_core::RangeIter<'a>, StoreError> {
341        self.store.range(lo, hi)
342    }
343
344    fn iter<'a>(&'a self) -> Result<oxistore_core::RangeIter<'a>, StoreError> {
345        self.store.iter()
346    }
347
348    fn transaction(&self) -> Result<Box<dyn oxistore_core::KvTxn + '_>, StoreError> {
349        self.store.transaction()
350    }
351
352    fn snapshot(&self) -> Result<Box<dyn oxistore_core::KvSnapshot + '_>, StoreError> {
353        self.store.snapshot()
354    }
355
356    fn flush(&self) -> Result<(), StoreError> {
357        self.store.flush()
358    }
359}
360
361// ---------------------------------------------------------------------------
362// Tests
363// ---------------------------------------------------------------------------
364
#[cfg(test)]
mod tests {
    use super::*;
    use crate::LruCache;
    use oxistore_core::{KvSnapshot, KvStore, KvTxn, RangeIter, StoreError};
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// Minimal in-memory `KvStore` for tests.
    ///
    /// `range` and `iter` both yield entries in ascending key order.
    #[derive(Default, Debug)]
    struct MemStore(Mutex<HashMap<Vec<u8>, Vec<u8>>>);

    impl MemStore {
        /// Snapshot the entries whose key satisfies `keep`, sorted by key.
        fn sorted_pairs(&self, keep: impl Fn(&[u8]) -> bool) -> Vec<(Vec<u8>, Vec<u8>)> {
            let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = self
                .0
                .lock()
                .expect("lock")
                .iter()
                .filter(|(k, _)| keep(k.as_slice()))
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect();
            pairs.sort_by(|(a, _), (b, _)| a.cmp(b));
            pairs
        }
    }

    impl KvStore for MemStore {
        fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
            Ok(self.0.lock().expect("lock").get(key).cloned())
        }

        fn put(&self, key: &[u8], value: &[u8]) -> Result<(), StoreError> {
            self.0
                .lock()
                .expect("lock")
                .insert(key.to_vec(), value.to_vec());
            Ok(())
        }

        fn delete(&self, key: &[u8]) -> Result<(), StoreError> {
            self.0.lock().expect("lock").remove(key);
            Ok(())
        }

        fn range<'a>(&'a self, lo: &[u8], hi: &[u8]) -> Result<RangeIter<'a>, StoreError> {
            // Half-open [lo, hi), ordered like `iter`.
            let pairs = self.sorted_pairs(|k| k >= lo && k < hi);
            Ok(Box::new(pairs.into_iter().map(Ok)))
        }

        fn transaction(&self) -> Result<Box<dyn KvTxn + '_>, StoreError> {
            Err(StoreError::Other("MemStore: no txn".to_string()))
        }

        fn snapshot(&self) -> Result<Box<dyn KvSnapshot + '_>, StoreError> {
            Err(StoreError::Other("MemStore: no snapshot".to_string()))
        }

        fn iter<'a>(&'a self) -> Result<RangeIter<'a>, StoreError> {
            let pairs = self.sorted_pairs(|_| true);
            Ok(Box::new(pairs.into_iter().map(Ok)))
        }

        fn flush(&self) -> Result<(), StoreError> {
            Ok(())
        }
    }

    // ── WriteThroughCache tests ───────────────────────────────────────────────

    #[test]
    fn write_through_put_flushes_to_store() {
        let store = MemStore::default();
        let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
        let mut wt = WriteThroughCache::new(store, cache);

        wt.put(b"key".to_vec(), b"value".to_vec())
            .expect("put failed");

        // Verify the store was written immediately.
        let from_store = wt.store().get(b"key").expect("get failed");
        assert_eq!(from_store, Some(b"value".to_vec()));
    }

    #[test]
    fn write_through_get_hits_cache() {
        let store = MemStore::default();
        let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
        let mut wt = WriteThroughCache::new(store, cache);

        wt.put(b"k".to_vec(), b"v".to_vec()).expect("put");
        let v = wt.get(b"k").expect("get");
        assert_eq!(v, Some(b"v".to_vec()));
    }

    #[test]
    fn write_through_get_miss_populates_from_store() {
        let store = MemStore::default();
        // Pre-populate store directly.
        store.put(b"existing", b"from_store").expect("store put");

        let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
        let mut wt = WriteThroughCache::new(store, cache);

        // Cache is empty; should fetch from store and return value.
        let v = wt.get(b"existing").expect("get");
        assert_eq!(v, Some(b"from_store".to_vec()));
        assert_eq!(wt.cache_len(), 1);

        // Delete the key behind the adapter's back: the second get can only
        // succeed if it is served from the cache.
        wt.store().delete(b"existing").expect("store delete");
        let cached = wt.get(b"existing").expect("cached get");
        assert_eq!(cached, Some(b"from_store".to_vec()));
    }

    #[test]
    fn write_through_remove_clears_store() {
        let store = MemStore::default();
        let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
        let mut wt = WriteThroughCache::new(store, cache);

        wt.put(b"rm_key".to_vec(), b"rm_val".to_vec()).expect("put");
        wt.remove(b"rm_key").expect("remove");

        let from_store = wt.store().get(b"rm_key").expect("store get");
        assert!(from_store.is_none());
        // The cached copy must be gone too.
        assert!(wt.get(b"rm_key").expect("get").is_none());
    }

    #[test]
    fn write_through_get_miss_absent_in_store() {
        let store = MemStore::default();
        let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
        let mut wt = WriteThroughCache::new(store, cache);

        let v = wt.get(b"no_such_key").expect("get");
        assert!(v.is_none());
    }

    // ── WriteBackCache tests ──────────────────────────────────────────────────

    #[test]
    fn write_back_put_deferred() {
        let store = MemStore::default();
        let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
        let mut wb = WriteBackCache::new(store, cache);

        wb.put(b"lazy".to_vec(), b"write".to_vec()).expect("put");

        // Store should NOT have the value yet.
        let from_store = wb.store().get(b"lazy").expect("store get");
        assert!(from_store.is_none());
        assert_eq!(wb.dirty_count(), 1);
    }

    #[test]
    fn write_back_flush_persists() {
        let store = MemStore::default();
        let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
        let mut wb = WriteBackCache::new(store, cache);

        wb.put(b"a".to_vec(), b"1".to_vec()).expect("put");
        wb.put(b"b".to_vec(), b"2".to_vec()).expect("put");

        wb.flush().expect("flush");

        assert_eq!(wb.dirty_count(), 0);
        assert_eq!(
            wb.store().get(b"a").expect("store get a"),
            Some(b"1".to_vec())
        );
        assert_eq!(
            wb.store().get(b"b").expect("store get b"),
            Some(b"2".to_vec())
        );
    }

    #[test]
    fn write_back_get_hits_cache() {
        let store = MemStore::default();
        let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
        let mut wb = WriteBackCache::new(store, cache);

        wb.put(b"key".to_vec(), b"val".to_vec()).expect("put");
        let v = wb.get(b"key").expect("get");
        assert_eq!(v, Some(b"val".to_vec()));
    }

    #[test]
    fn write_back_get_misses_to_store() {
        let store = MemStore::default();
        store.put(b"persistent", b"data").expect("store put");

        let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
        let mut wb = WriteBackCache::new(store, cache);

        let v = wb.get(b"persistent").expect("get");
        assert_eq!(v, Some(b"data".to_vec()));
    }

    #[test]
    fn write_back_remove_deletes_from_store() {
        let store = MemStore::default();
        let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
        let mut wb = WriteBackCache::new(store, cache);

        wb.put(b"del".to_vec(), b"gone".to_vec()).expect("put");
        wb.flush().expect("flush");
        wb.remove(b"del").expect("remove");

        assert!(wb.store().get(b"del").expect("store get").is_none());
        assert_eq!(wb.dirty_count(), 0);
    }

    #[test]
    fn write_back_remove_discards_unflushed_write() {
        let store = MemStore::default();
        let cache = LruCache::<Vec<u8>, Vec<u8>>::new(8);
        let mut wb = WriteBackCache::new(store, cache);

        wb.put(b"tmp".to_vec(), b"x".to_vec()).expect("put");
        wb.remove(b"tmp").expect("remove");
        assert_eq!(wb.dirty_count(), 0);

        // A later flush must not resurrect the removed value.
        wb.flush().expect("flush");
        assert!(wb.store().get(b"tmp").expect("store get").is_none());
        assert!(wb.get(b"tmp").expect("get").is_none());
    }

    #[test]
    fn write_back_dirty_eviction_flushes() {
        // Cache capacity = 2; insert 3 keys → eviction of 1st dirty key.
        let store = MemStore::default();
        let cache = LruCache::<Vec<u8>, Vec<u8>>::new(2);
        let mut wb = WriteBackCache::new(store, cache);

        wb.put(b"first".to_vec(), b"v1".to_vec()).expect("put 1");
        wb.put(b"second".to_vec(), b"v2".to_vec()).expect("put 2");
        // Inserting "third" evicts one entry; the pre-eviction hook must have
        // persisted it beforehand.
        wb.put(b"third".to_vec(), b"v3".to_vec()).expect("put 3");

        // The evicted "first" entry reached the store without an explicit flush…
        let v1 = wb.store().get(b"first").expect("store get first");
        assert_eq!(v1, Some(b"v1".to_vec()));
        // …and reads still see it.
        assert_eq!(wb.get(b"first").expect("get first"), Some(b"v1".to_vec()));

        // After explicit flush everything is in the store and nothing is dirty.
        wb.flush().expect("flush");
        assert_eq!(wb.dirty_count(), 0);

        let v2 = wb.store().get(b"second").expect("store get second");
        let v3 = wb.store().get(b"third").expect("store get third");
        assert_eq!(v2, Some(b"v2".to_vec()));
        assert_eq!(v3, Some(b"v3".to_vec()));
    }

    // ── CacheableKvStore tests ────────────────────────────────────────────────

    #[test]
    fn cacheable_put_and_delete_invalidate_cached_value() {
        let kv = CacheableKvStore::new(
            MemStore::default(),
            LruCache::<Vec<u8>, Vec<u8>>::new(4),
        );

        kv.put(b"k", b"v1").expect("put v1");
        // First get fills the cache from the store.
        assert_eq!(kv.get(b"k").expect("get v1"), Some(b"v1".to_vec()));

        // Overwriting must invalidate the cached v1.
        kv.put(b"k", b"v2").expect("put v2");
        assert_eq!(kv.get(b"k").expect("get v2"), Some(b"v2".to_vec()));

        // Deleting must invalidate the cached v2.
        kv.delete(b"k").expect("delete");
        assert!(kv.get(b"k").expect("get deleted").is_none());
    }
}