Skip to main content

reddb_server/storage/cache/
ring.rs

1//! Fixed-size circular page ring used by [`super::strategy::BufferAccessStrategy`].
2//!
3//! A `BufferRing` is a small, isolated cache that recycles slots in a
4//! clock-style hand sweep. It exists specifically so that sequential
5//! scans, bulk reads, and bulk writes do **not** populate the main
6//! SIEVE pool — see `src/storage/cache/README.md` § Invariant 4.
7//!
8//! Unlike [`super::sieve::PageCache`], the ring has no pin counts, no
9//! visited bits, and no eviction policy beyond "the slot at the hand
10//! is the next victim." That is the entire point — sequential scans
11//! visit each page exactly once, so any policy beyond FIFO is wasted
12//! work.
13//!
14//! The ring is read-and-written *separately* from the main pool. A
15//! page in the ring does not appear in the main pool, and vice versa.
16//! Callers that want main-pool semantics simply do not pass a strategy.
17
18use std::collections::HashMap;
19use std::hash::Hash;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::RwLock;
22
/// Acquire a shared guard on `lock`, tolerating poisoning.
///
/// A poisoned lock only records that some earlier holder panicked; the
/// guard is pulled out of the `PoisonError` and returned instead of
/// propagating that panic. This follows the same convention as
/// [`super::sieve`].
fn read_lock<T>(lock: &RwLock<T>) -> std::sync::RwLockReadGuard<'_, T> {
    match lock.read() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
29
/// Acquire an exclusive guard on `lock`, tolerating poisoning.
///
/// Write-side counterpart of `read_lock`: when the lock is poisoned the
/// guard is recovered from the `PoisonError` rather than panicking.
fn write_lock<T>(lock: &RwLock<T>) -> std::sync::RwLockWriteGuard<'_, T> {
    match lock.write() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
33
/// Fixed-capacity circular cache.
///
/// Inserting into a full ring evicts the slot at the current hand
/// position and hands the evicted `(key, value)` back to the caller so
/// it can be flushed (e.g. through the pager's double-write buffer for
/// `BulkWrite` strategies).
///
/// The `K: Clone + Eq + Hash` / `V: Clone` requirements are stated on
/// the `impl` block rather than on the type itself, so the struct can
/// be named in signatures and fields without restating those bounds.
pub struct BufferRing<K, V> {
    /// Number of slots in the ring.
    capacity: usize,
    /// Slot storage; `None` marks an unoccupied slot.
    slots: RwLock<Vec<Option<(K, V)>>>,
    /// Index of the slot the next insertion starts from (and, when the
    /// ring is full, the next eviction victim).
    hand: AtomicUsize,
    /// Key → slot-index lookup for the occupied slots.
    map: RwLock<HashMap<K, usize>>,
}
50
51impl<K, V> BufferRing<K, V>
52where
53    K: Clone + Eq + Hash,
54    V: Clone,
55{
56    /// Create a new ring with the given fixed capacity. `capacity` is
57    /// clamped to at least 1.
58    pub fn new(capacity: usize) -> Self {
59        let capacity = capacity.max(1);
60        Self {
61            capacity,
62            slots: RwLock::new(vec![None; capacity]),
63            hand: AtomicUsize::new(0),
64            map: RwLock::new(HashMap::with_capacity(capacity)),
65        }
66    }
67
68    /// Configured ring capacity.
69    pub fn capacity(&self) -> usize {
70        self.capacity
71    }
72
73    /// Number of occupied slots.
74    pub fn len(&self) -> usize {
75        read_lock(&self.map).len()
76    }
77
78    /// Whether the ring has zero occupied slots.
79    pub fn is_empty(&self) -> bool {
80        self.len() == 0
81    }
82
83    /// Look up a key. Does NOT update any recency / visited state.
84    pub fn get(&self, key: &K) -> Option<V> {
85        let map = read_lock(&self.map);
86        let idx = *map.get(key)?;
87        drop(map);
88        let slots = read_lock(&self.slots);
89        slots
90            .get(idx)
91            .and_then(|s| s.as_ref().map(|(_, v)| v.clone()))
92    }
93
94    /// Insert a `(key, value)` pair. If the ring is full, the slot at
95    /// the current hand position is evicted and the hand advances.
96    /// Returns the evicted `(key, value)` if any.
97    ///
98    /// If `key` is already present in the ring, its slot is updated in
99    /// place and `None` is returned.
100    pub fn insert(&self, key: K, value: V) -> Option<(K, V)> {
101        // Update-in-place fast path.
102        {
103            let map = read_lock(&self.map);
104            if let Some(&idx) = map.get(&key) {
105                drop(map);
106                let mut slots = write_lock(&self.slots);
107                if let Some(slot) = slots.get_mut(idx) {
108                    *slot = Some((key, value));
109                }
110                return None;
111            }
112        }
113
114        // Find an empty slot first (ring not yet full).
115        let mut map = write_lock(&self.map);
116        let mut slots = write_lock(&self.slots);
117        if map.len() < self.capacity {
118            // Find first empty slot starting from the hand.
119            let start = self.hand.load(Ordering::Relaxed) % self.capacity;
120            for offset in 0..self.capacity {
121                let idx = (start + offset) % self.capacity;
122                if slots[idx].is_none() {
123                    slots[idx] = Some((key.clone(), value));
124                    map.insert(key, idx);
125                    self.hand
126                        .store((idx + 1) % self.capacity, Ordering::Relaxed);
127                    return None;
128                }
129            }
130        }
131
132        // Ring full — evict the slot at the hand and replace.
133        let victim_idx = self.hand.load(Ordering::Relaxed) % self.capacity;
134        let evicted = slots[victim_idx].take();
135        if let Some((ref evicted_key, _)) = evicted {
136            map.remove(evicted_key);
137        }
138        slots[victim_idx] = Some((key.clone(), value));
139        map.insert(key, victim_idx);
140        self.hand
141            .store((victim_idx + 1) % self.capacity, Ordering::Relaxed);
142        evicted
143    }
144
145    /// Drop every slot. Used by `clear_cursors`-style cleanup paths.
146    pub fn clear(&self) {
147        let mut slots = write_lock(&self.slots);
148        let mut map = write_lock(&self.map);
149        for slot in slots.iter_mut() {
150            *slot = None;
151        }
152        map.clear();
153        self.hand.store(0, Ordering::Relaxed);
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built ring holds nothing and misses on every lookup.
    #[test]
    fn empty_ring_returns_none() {
        let fresh: BufferRing<u32, &str> = BufferRing::new(4);
        assert_eq!(fresh.len(), 0);
        assert!(fresh.is_empty());
        assert_eq!(fresh.get(&1), None);
    }

    /// Inserts below capacity land in empty slots and evict nothing.
    #[test]
    fn insert_into_empty_slot_no_eviction() {
        let ring: BufferRing<u32, String> = BufferRing::new(4);
        for (key, text) in [(1, "a"), (2, "b")] {
            assert!(ring.insert(key, text.to_string()).is_none());
        }
        assert_eq!(ring.len(), 2);
        assert_eq!(ring.get(&1), Some("a".to_string()));
        assert_eq!(ring.get(&2), Some("b".to_string()));
    }

    /// Overfilling a 16-slot ring with 20 pages recycles the 4 oldest.
    #[test]
    fn ring_recycles_at_capacity() {
        let ring: BufferRing<u32, u32> = BufferRing::new(16);
        (0..20).for_each(|page| {
            let _ = ring.insert(page, page * 10);
        });
        assert_eq!(ring.len(), 16);
        // Keys 0..4 sat in slots 0..4, which the hand sweep (starting
        // at slot 0) reclaimed first.
        for i in 0..4 {
            assert!(ring.get(&i).is_none(), "key {i} should be evicted");
        }
        // Everything inserted afterwards must still be resident.
        for i in 4..20 {
            assert_eq!(ring.get(&i), Some(i * 10), "key {i} should be present");
        }
    }

    /// Once the ring is full, `insert` hands back the pair it displaced.
    #[test]
    fn insert_returns_evicted_pair_when_full() {
        let ring: BufferRing<u32, &str> = BufferRing::new(2);
        assert!(ring.insert(1, "a").is_none());
        assert!(ring.insert(2, "b").is_none());
        // Both slots are occupied and the hand wrapped back to slot 0
        // after the second insert, so the first eviction takes slot 0,
        // i.e. key 1.
        let displaced = ring.insert(3, "c");
        assert_eq!(displaced, Some((1, "a")));
    }

    /// Re-inserting a resident key overwrites its value without evicting.
    #[test]
    fn update_in_place_does_not_evict() {
        let ring: BufferRing<u32, &str> = BufferRing::new(2);
        ring.insert(1, "a");
        ring.insert(2, "b");
        assert!(ring.insert(1, "A").is_none());
        assert_eq!(ring.len(), 2);
        assert_eq!(ring.get(&2), Some("b"));
        assert_eq!(ring.get(&1), Some("A"));
    }

    /// `clear` leaves the ring empty.
    #[test]
    fn clear_drops_all_entries() {
        let ring: BufferRing<u32, &str> = BufferRing::new(4);
        ring.insert(1, "a");
        ring.insert(2, "b");
        ring.clear();
        assert_eq!(ring.get(&1), None);
        assert!(ring.is_empty());
    }

    /// A requested capacity of 0 is bumped to a single slot.
    #[test]
    fn capacity_clamped_to_at_least_one() {
        let ring: BufferRing<u32, &str> = BufferRing::new(0);
        assert_eq!(ring.capacity(), 1);
        ring.insert(1, "a");
        // With one slot, the second key must displace the first.
        let displaced = ring.insert(2, "b");
        assert_eq!(displaced, Some((1, "a")));
    }
}