// fast_cache/storage/flat_map.rs

use hashbrown::HashTable;
use std::collections::{BinaryHeap, VecDeque};
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(feature = "telemetry")]
use std::time::Instant;

use crate::config::EvictionPolicy;
#[cfg(feature = "telemetry")]
use crate::storage::CacheTelemetryHandle;
use crate::storage::stats::TierStatsSnapshot;
use crate::storage::{Bytes, StoredEntry, hash_key, hash_key_tag_from_hash};
use bytes::Bytes as SharedBytes;
14
15#[derive(Debug)]
16struct FlatEntry {
17    hash: u64,
18    key_tag: u64,
19    key_len: usize,
20    key: Box<[u8]>,
21    /// Value held as `bytes::Bytes` for zero-copy `SET` from the read buffer.
22    /// `as_ref()` gives `&[u8]`; storage size = `value.len()`.
23    value: SharedBytes,
24    expire_at_ms: Option<u64>,
25    access: EntryAccessMeta,
26}
27
28impl FlatEntry {
29    #[inline(always)]
30    fn matches(&self, hash: u64, key: &[u8]) -> bool {
31        self.matches_hashed_key(hash, key)
32    }
33
34    #[inline(always)]
35    fn matches_hashed_key(&self, hash: u64, key: &[u8]) -> bool {
36        self.hash == hash && self.key_len == key.len() && bytes_equal_hot(self.key.as_ref(), key)
37    }
38
39    #[inline(always)]
40    fn matches_prepared(&self, hash: u64, key: &[u8], _key_tag: u64) -> bool {
41        self.matches_hashed_key(hash, key)
42    }
43
44    #[inline(always)]
45    fn matches_tagged(&self, hash: u64, key_tag: u64, key_len: usize) -> bool {
46        self.hash == hash && self.key_tag == key_tag && self.key_len == key_len
47    }
48
49    #[inline(always)]
50    fn is_expired(&self, now_ms: u64) -> bool {
51        self.expire_at_ms.is_some_and(|deadline| deadline <= now_ms)
52    }
53}
54
/// Copies `len` bytes from `src` to `dst` using `ptr::copy_nonoverlapping`.
///
/// Only compiled when the `unsafe` feature is enabled.
///
/// # Safety
///
/// The caller must uphold the contract of [`std::ptr::copy_nonoverlapping`]:
/// `src` must be valid for reads of `len` bytes, `dst` must be valid for
/// writes of `len` bytes, and the two regions must not overlap. `u8` has
/// alignment 1, so there is no additional alignment requirement.
#[cfg(feature = "unsafe")]
#[inline(always)]
unsafe fn copy_hot_value_bytes(dst: *mut u8, src: *const u8, len: usize) {
    // SAFETY: forwarded from this function's caller (see `# Safety` above).
    unsafe { std::ptr::copy_nonoverlapping(src, dst, len) };
}
61
/// Byte-wise equality of two slices (same length and same contents).
///
/// Kept as a named helper so hot-path key comparisons share one call site.
#[inline(always)]
fn bytes_equal_hot(left: &[u8], right: &[u8]) -> bool {
    left == right
}
66
67#[inline(always)]
68fn shared_bytes_from_slice(value: &[u8]) -> SharedBytes {
69    if should_reuse_value_buffer(value.len()) {
70        SharedBytes::from(value.to_vec())
71    } else {
72        SharedBytes::copy_from_slice(value)
73    }
74}
75
76#[inline(always)]
77fn should_reuse_value_buffer(value_len: usize) -> bool {
78    value_len >= REUSABLE_VALUE_MIN_BYTES
79}
80
81fn shared_bytes_from_reusable_pool(
82    value: &[u8],
83    reusable_values: &mut Vec<SharedBytes>,
84    reusable_value_bytes: &mut usize,
85) -> SharedBytes {
86    let Some(position) = reusable_values
87        .iter()
88        .position(|candidate| candidate.len() == value.len())
89    else {
90        return shared_bytes_from_slice(value);
91    };
92
93    let reusable = reusable_values.swap_remove(position);
94    *reusable_value_bytes = reusable_value_bytes.saturating_sub(reusable.len());
95    match reusable.try_into_mut() {
96        Ok(mut writable) => {
97            writable[..].copy_from_slice(value);
98            writable.freeze()
99        }
100        Err(_reusable) => shared_bytes_from_slice(value),
101    }
102}
103
104#[inline(always)]
105fn recycle_value_into_pool(
106    value: SharedBytes,
107    reusable_values: &mut Vec<SharedBytes>,
108    reusable_value_bytes: &mut usize,
109) {
110    let value_len = value.len();
111    if !should_reuse_value_buffer(value_len) {
112        return;
113    }
114    if reusable_values.len() >= MAX_REUSABLE_VALUE_BUFFERS {
115        return;
116    }
117    if reusable_value_bytes.saturating_add(value_len) > MAX_REUSABLE_VALUE_BYTES {
118        return;
119    }
120    *reusable_value_bytes = reusable_value_bytes.saturating_add(value_len);
121    reusable_values.push(value);
122}
123
/// Per-entry access bookkeeping used to rank entries for eviction.
///
/// `Default` yields an entry that has never been touched (both fields zero).
#[derive(Debug, Clone, Copy, Default)]
struct EntryAccessMeta {
    /// Tick passed to the most recent `record_access` call.
    last_touch: u64,
    /// Number of recorded accesses, saturating at `u32::MAX`.
    frequency: u32,
}
129
130impl EntryAccessMeta {
131    #[inline(always)]
132    fn record_access(&mut self, tick: u64) {
133        self.last_touch = tick;
134        self.frequency = self.frequency.saturating_add(1).max(1);
135    }
136
137    #[inline(always)]
138    fn rank(&self, policy: EvictionPolicy) -> EvictionRank {
139        match policy {
140            EvictionPolicy::None => EvictionRank {
141                primary: u64::MAX,
142                secondary: u64::MAX,
143            },
144            EvictionPolicy::Lru => EvictionRank {
145                primary: self.last_touch,
146                secondary: 0,
147            },
148            EvictionPolicy::Lfu => EvictionRank {
149                primary: self.frequency as u64,
150                secondary: self.last_touch,
151            },
152        }
153    }
154}
155
/// Two-component eviction ordering key.
///
/// The derived `Ord` is lexicographic in field order: `primary` is compared
/// first and `secondary` only breaks ties. Do not reorder the fields without
/// revisiting every place that relies on this ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct EvictionRank {
    /// Most significant component of the ordering.
    pub(crate) primary: u64,
    /// Tie-breaker used when `primary` values are equal.
    pub(crate) secondary: u64,
}
161
162#[derive(Debug)]
163struct EvictionCandidate {
164    rank: EvictionRank,
165    hash: u64,
166    key: Bytes,
167}
168
169impl PartialEq for EvictionCandidate {
170    fn eq(&self, other: &Self) -> bool {
171        self.rank == other.rank && self.hash == other.hash
172    }
173}
174
175impl Eq for EvictionCandidate {}
176
177impl PartialOrd for EvictionCandidate {
178    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
179        Some(self.cmp(other))
180    }
181}
182
183impl Ord for EvictionCandidate {
184    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
185        self.rank
186            .cmp(&other.rank)
187            .then_with(|| self.hash.cmp(&other.hash))
188    }
189}
190
/// One record of `FlatMap::lru_touch_log`: an access tick paired with the
/// hash of the entry it refers to.
///
/// NOTE(review): presumably appended on LRU touches and consumed during
/// eviction — the producing/consuming code is not in this file section.
#[derive(Debug)]
struct LruTouch {
    /// Access-clock value associated with the touch.
    tick: u64,
    /// Hash of the touched entry's key.
    hash: u64,
}
196
/// Smallest value length (4 KiB) that is eligible for buffer reuse; the
/// comparison in `should_reuse_value_buffer` is inclusive.
const REUSABLE_VALUE_MIN_BYTES: usize = 4096;
/// Maximum number of buffers kept in the reuse pool at once.
const MAX_REUSABLE_VALUE_BUFFERS: usize = 128;
/// Maximum total size (8 MiB) of all buffers kept in the reuse pool.
const MAX_REUSABLE_VALUE_BYTES: usize = 8 * 1024 * 1024;
200
201#[derive(Debug, Default)]
202pub struct FlatMap {
203    entries: HashTable<FlatEntry>,
204    #[cfg(feature = "fast-point-map")]
205    fast_points: FastPointMap,
206    ttl_entries: usize,
207    active_readers: AtomicUsize,
208    retired_values: Vec<SharedBytes>,
209    reusable_values: Vec<SharedBytes>,
210    reusable_value_bytes: usize,
211    stored_bytes: usize,
212    memory_limit_bytes: Option<usize>,
213    eviction_policy: EvictionPolicy,
214    /// Monotonic counter for entry access timestamps. Metadata touches require
215    /// `&mut FlatMap`, so this stays shard-local and non-atomic.
216    access_clock: u64,
217    /// Sampling counter for the lazy LFU/LRU read-touch decision.
218    read_sample_counter: u64,
219    lru_touch_log: VecDeque<LruTouch>,
220    evictions: u64,
221    #[cfg(feature = "telemetry")]
222    telemetry: Option<FlatMapTelemetry>,
223}
224
/// Telemetry context attached to a `FlatMap`; only compiled with the
/// `telemetry` feature.
#[cfg(feature = "telemetry")]
#[derive(Debug, Clone)]
struct FlatMapTelemetry {
    /// Handle through which metrics are reported.
    metrics: CacheTelemetryHandle,
    /// Identifier of the shard this map belongs to.
    /// NOTE(review): presumably used to label reported metrics — confirm.
    shard_id: usize,
}
231
/// Why an entry was removed from the map.
///
/// NOTE(review): presumably used to pick which counters/telemetry a removal
/// updates — the consumers are not in this file section.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeleteReason {
    /// Removed by an explicit delete request.
    Explicit,
    /// Removed because its TTL deadline had passed.
    Expired,
    /// Removed to satisfy the memory limit.
    Evicted,
}
238#[cfg(feature = "fast-point-map")]
239mod fast_point;
240
241mod core;
242mod lifecycle;
243mod read;
244mod write;
245mod write_hot;
246mod write_local;
247
248#[cfg(feature = "fast-point-map")]
249use fast_point::FastPointMap;
250
#[cfg(test)]
mod tests {
    use super::FlatMap;
    #[cfg(feature = "embedded")]
    use super::hash_key_tag_from_hash;
    use super::{REUSABLE_VALUE_MIN_BYTES, hash_key};
    use crate::config::EvictionPolicy;

    /// A second `set` on the same key replaces the value returned by `get`.
    #[test]
    fn stores_reads_and_updates_values() {
        let mut map = FlatMap::new();
        map.set(b"alpha".to_vec(), b"one".to_vec(), None, 0);
        assert_eq!(map.get(b"alpha", 0), Some(b"one".to_vec()));

        map.set(b"alpha".to_vec(), b"two".to_vec(), None, 0);
        assert_eq!(map.get(b"alpha", 0), Some(b"two".to_vec()));
    }

    /// An entry is readable before its deadline and gone after it;
    /// `ttl_seconds` reports `-2` for the expired key.
    #[test]
    fn expires_values() {
        let mut map = FlatMap::new();
        map.set(b"alpha".to_vec(), b"one".to_vec(), Some(10), 0);

        assert_eq!(map.get(b"alpha", 9), Some(b"one".to_vec()));
        assert_eq!(map.ttl_seconds(b"alpha", 11), -2);
        assert_eq!(map.get(b"alpha", 11), None);
    }

    /// `process_maintenance` removes every expired entry and reports how many.
    #[test]
    fn maintenance_removes_expired_entries() {
        let mut map = FlatMap::new();
        map.set(b"alpha".to_vec(), b"one".to_vec(), Some(10), 0);
        map.set(b"beta".to_vec(), b"two".to_vec(), Some(10), 0);

        assert_eq!(map.process_maintenance(11), 2);
        assert!(map.is_empty());
    }

    /// While a read epoch is open, overwriting a key must not free the bytes
    /// a reader obtained earlier.
    #[test]
    fn read_epoch_keeps_old_value_alive_across_update() {
        let mut map = FlatMap::new();
        map.set(b"alpha".to_vec(), b"one".to_vec(), None, 0);

        map.begin_read_epoch();
        let read = map
            .get_ref(b"alpha", 0)
            .expect("alpha was inserted just above");
        // Keep only the raw address/length so the borrow of `map` ends here
        // and the overwrite below is allowed.
        let ptr = read.as_ptr();
        let len = read.len();

        map.set(b"alpha".to_vec(), b"two".to_vec(), None, 0);

        // SAFETY: `ptr`/`len` describe the value returned by `get_ref` above.
        // The read epoch opened before the overwrite is what this test expects
        // to keep that allocation alive until `end_read_epoch`; if that
        // guarantee regresses, this read is a use-after-free rather than a
        // clean assertion failure.
        let stale = unsafe { std::slice::from_raw_parts(ptr, len) };
        assert_eq!(stale, b"one");

        map.end_read_epoch();
        assert_eq!(map.get(b"alpha", 0), Some(b"two".to_vec()));
    }

    /// Under a 4-byte cap with LRU, inserting a third entry evicts the one
    /// touched least recently (`b`, because `a` was re-set after it).
    #[test]
    fn lru_eviction_removes_least_recent_entry_under_cap() {
        let mut map = FlatMap::new();
        map.configure_memory_policy(Some(4), EvictionPolicy::Lru, 0);

        map.set(b"a".to_vec(), b"1".to_vec(), None, 0);
        map.set(b"b".to_vec(), b"2".to_vec(), None, 0);
        map.set(b"a".to_vec(), b"1".to_vec(), None, 0);

        map.set(b"c".to_vec(), b"3".to_vec(), None, 0);

        assert_eq!(map.get(b"a", 0), Some(b"1".to_vec()));
        assert_eq!(map.get(b"b", 0), None);
        assert_eq!(map.get(b"c", 0), Some(b"3".to_vec()));
        assert!(map.stored_bytes() <= 4);
        assert_eq!(map.evictions(), 1);
    }

    /// On the local no-TTL write path, the buffer of an evicted large value
    /// is pooled and then reused (same address) by the next same-sized write.
    #[cfg(feature = "embedded")]
    #[test]
    fn local_lru_reuses_evicted_large_value_buffer() {
        let mut map = FlatMap::new();
        map.configure_memory_policy(Some(5000), EvictionPolicy::Lru, 0);
        let value = vec![7u8; REUSABLE_VALUE_MIN_BYTES];

        let hash_a = hash_key(b"a");
        map.set_slice_hashed_tagged_no_ttl_local(
            hash_a,
            hash_key_tag_from_hash(hash_a),
            b"a",
            &value,
        );
        map.enforce_memory_limit(0);

        let hash_b = hash_key(b"b");
        map.set_slice_hashed_tagged_no_ttl_local(
            hash_b,
            hash_key_tag_from_hash(hash_b),
            b"b",
            &value,
        );
        map.enforce_memory_limit(0);

        // The cap fits one value only, so `a` was evicted and its buffer pooled.
        assert_eq!(map.reusable_values.len(), 1);
        let reusable_ptr = map.reusable_values[0].as_ptr();

        let hash_c = hash_key(b"c");
        map.set_slice_hashed_tagged_no_ttl_local(
            hash_c,
            hash_key_tag_from_hash(hash_c),
            b"c",
            &value,
        );

        let stored_ptr = map
            .get_shared_value_bytes_hashed_no_ttl(hash_c, b"c")
            .expect("new value is stored")
            .as_ptr();
        assert_eq!(stored_ptr, reusable_ptr);
    }

    /// Same buffer-reuse property as above, on the TTL write path.
    #[test]
    fn ttl_lru_reuses_evicted_large_value_buffer() {
        let mut map = FlatMap::new();
        let value = vec![7u8; REUSABLE_VALUE_MIN_BYTES];
        // Room for one value plus slack, but not for two.
        map.configure_memory_policy(Some(value.len() + 2048), EvictionPolicy::Lru, 0);

        let hash_a = hash_key(b"a");
        map.set_slice_hashed(hash_a, b"a", &value, Some(60_000), 0);

        let hash_b = hash_key(b"b");
        map.set_slice_hashed(hash_b, b"b", &value, Some(60_000), 1);

        assert_eq!(map.reusable_values.len(), 1);
        let reusable_ptr = map.reusable_values[0].as_ptr();

        let hash_c = hash_key(b"c");
        map.set_slice_hashed(hash_c, b"c", &value, Some(60_000), 2);

        let stored_ptr = map
            .get_shared_value_bytes_hashed(hash_c, b"c", 2)
            .expect("new value is stored")
            .as_ptr();
        assert_eq!(stored_ptr, reusable_ptr);
    }

    /// Values below `REUSABLE_VALUE_MIN_BYTES` are evicted without being pooled.
    #[test]
    fn ttl_lru_does_not_pool_small_value_buffers() {
        let mut map = FlatMap::new();
        let small_value_len = 512;
        assert!(small_value_len < REUSABLE_VALUE_MIN_BYTES);
        // Room for one small value plus slack, but not for two.
        map.configure_memory_policy(Some(small_value_len + 88), EvictionPolicy::Lru, 0);
        let value = vec![7u8; small_value_len];

        map.set_slice_hashed(hash_key(b"a"), b"a", &value, Some(60_000), 0);
        map.set_slice_hashed(hash_key(b"b"), b"b", &value, Some(60_000), 1);

        assert_eq!(map.evictions(), 1);
        assert!(map.reusable_values.is_empty());
        assert_eq!(map.reusable_value_bytes, 0);
    }

    /// Under a 4-byte cap with LFU, inserting a third entry evicts the one
    /// accessed least often (`b`, written once while `a` was written thrice).
    #[test]
    fn lfu_eviction_removes_least_frequent_entry_under_cap() {
        let mut map = FlatMap::new();
        map.configure_memory_policy(Some(4), EvictionPolicy::Lfu, 0);

        map.set(b"a".to_vec(), b"1".to_vec(), None, 0);
        map.set(b"b".to_vec(), b"2".to_vec(), None, 0);
        map.set(b"a".to_vec(), b"1".to_vec(), None, 0);
        map.set(b"a".to_vec(), b"1".to_vec(), None, 0);

        map.set(b"c".to_vec(), b"3".to_vec(), None, 0);

        assert_eq!(map.get(b"a", 0), Some(b"1".to_vec()));
        assert_eq!(map.get(b"b", 0), None);
        assert_eq!(map.get(b"c", 0), Some(b"3".to_vec()));
        assert!(map.stored_bytes() <= 4);
        assert_eq!(map.evictions(), 1);
    }
}