1use hashbrown::HashTable;
2use std::collections::{BinaryHeap, VecDeque};
3use std::mem;
4use std::sync::atomic::{AtomicUsize, Ordering};
5#[cfg(feature = "telemetry")]
6use std::time::Instant;
7
8use crate::config::EvictionPolicy;
9#[cfg(feature = "telemetry")]
10use crate::storage::CacheTelemetryHandle;
11use crate::storage::stats::TierStatsSnapshot;
12use crate::storage::{Bytes, StoredEntry, hash_key, hash_key_tag_from_hash};
13use bytes::Bytes as SharedBytes;
14
15#[derive(Debug)]
16struct FlatEntry {
17 hash: u64,
18 key_tag: u64,
19 key_len: usize,
20 key: Box<[u8]>,
21 value: SharedBytes,
24 expire_at_ms: Option<u64>,
25 access: EntryAccessMeta,
26}
27
28impl FlatEntry {
29 #[inline(always)]
30 fn matches(&self, hash: u64, key: &[u8]) -> bool {
31 self.matches_hashed_key(hash, key)
32 }
33
34 #[inline(always)]
35 fn matches_hashed_key(&self, hash: u64, key: &[u8]) -> bool {
36 self.hash == hash && self.key_len == key.len() && bytes_equal_hot(self.key.as_ref(), key)
37 }
38
39 #[inline(always)]
40 fn matches_prepared(&self, hash: u64, key: &[u8], _key_tag: u64) -> bool {
41 self.matches_hashed_key(hash, key)
42 }
43
44 #[inline(always)]
45 fn matches_tagged(&self, hash: u64, key_tag: u64, key_len: usize) -> bool {
46 self.hash == hash && self.key_tag == key_tag && self.key_len == key_len
47 }
48
49 #[inline(always)]
50 fn is_expired(&self, now_ms: u64) -> bool {
51 self.expire_at_ms.is_some_and(|deadline| deadline <= now_ms)
52 }
53}
54
/// Copies `len` bytes from `src` into `dst` (a plain `memcpy`).
///
/// The parameter order is `(dst, src, len)`, which is the reverse of
/// [`std::ptr::copy_nonoverlapping`]'s `(src, dst, count)`.
///
/// # Safety
///
/// The caller must guarantee all of the following:
/// - `src` is valid for reads of `len` bytes;
/// - `dst` is valid for writes of `len` bytes;
/// - the source and destination regions do not overlap.
///
/// These are the requirements of [`std::ptr::copy_nonoverlapping`] for `u8`;
/// see its documentation for the complete contract.
#[cfg(feature = "unsafe")]
#[inline(always)]
unsafe fn copy_hot_value_bytes(dst: *mut u8, src: *const u8, len: usize) {
    // SAFETY: the caller upholds the `# Safety` contract above, which matches
    // the preconditions of `copy_nonoverlapping`.
    unsafe { std::ptr::copy_nonoverlapping(src, dst, len) };
}
61
/// Byte-wise equality used when comparing keys on the lookup path.
///
/// Currently just standard slice equality; presumably kept as a separate
/// function so the comparison can be specialised in one place.
#[inline(always)]
fn bytes_equal_hot(left: &[u8], right: &[u8]) -> bool {
    <[u8] as PartialEq>::eq(left, right)
}
66
67#[inline(always)]
68fn shared_bytes_from_slice(value: &[u8]) -> SharedBytes {
69 if should_reuse_value_buffer(value.len()) {
70 SharedBytes::from(value.to_vec())
71 } else {
72 SharedBytes::copy_from_slice(value)
73 }
74}
75
76#[inline(always)]
77fn should_reuse_value_buffer(value_len: usize) -> bool {
78 value_len >= REUSABLE_VALUE_MIN_BYTES
79}
80
81fn shared_bytes_from_reusable_pool(
82 value: &[u8],
83 reusable_values: &mut Vec<SharedBytes>,
84 reusable_value_bytes: &mut usize,
85) -> SharedBytes {
86 let Some(position) = reusable_values
87 .iter()
88 .position(|candidate| candidate.len() == value.len())
89 else {
90 return shared_bytes_from_slice(value);
91 };
92
93 let reusable = reusable_values.swap_remove(position);
94 *reusable_value_bytes = reusable_value_bytes.saturating_sub(reusable.len());
95 match reusable.try_into_mut() {
96 Ok(mut writable) => {
97 writable[..].copy_from_slice(value);
98 writable.freeze()
99 }
100 Err(_reusable) => shared_bytes_from_slice(value),
101 }
102}
103
104#[inline(always)]
105fn recycle_value_into_pool(
106 value: SharedBytes,
107 reusable_values: &mut Vec<SharedBytes>,
108 reusable_value_bytes: &mut usize,
109) {
110 let value_len = value.len();
111 if !should_reuse_value_buffer(value_len) {
112 return;
113 }
114 if reusable_values.len() >= MAX_REUSABLE_VALUE_BUFFERS {
115 return;
116 }
117 if reusable_value_bytes.saturating_add(value_len) > MAX_REUSABLE_VALUE_BYTES {
118 return;
119 }
120 *reusable_value_bytes = reusable_value_bytes.saturating_add(value_len);
121 reusable_values.push(value);
122}
123
/// Per-entry access bookkeeping consulted when ranking eviction candidates.
#[derive(Clone, Copy, Debug, Default)]
struct EntryAccessMeta {
    /// Tick passed to the most recent `record_access` call (presumably the
    /// map's access clock; confirm at call sites).
    last_touch: u64,
    /// Number of recorded accesses, saturating at `u32::MAX`.
    frequency: u32,
}
129
130impl EntryAccessMeta {
131 #[inline(always)]
132 fn record_access(&mut self, tick: u64) {
133 self.last_touch = tick;
134 self.frequency = self.frequency.saturating_add(1).max(1);
135 }
136
137 #[inline(always)]
138 fn rank(&self, policy: EvictionPolicy) -> EvictionRank {
139 match policy {
140 EvictionPolicy::None => EvictionRank {
141 primary: u64::MAX,
142 secondary: u64::MAX,
143 },
144 EvictionPolicy::Lru => EvictionRank {
145 primary: self.last_touch,
146 secondary: 0,
147 },
148 EvictionPolicy::Lfu => EvictionRank {
149 primary: self.frequency as u64,
150 secondary: self.last_touch,
151 },
152 }
153 }
154}
155
/// Sort key used to order eviction candidates.
///
/// The derived ordering is lexicographic in field declaration order:
/// `primary` first, then `secondary`. Keep the fields in this order.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub(crate) struct EvictionRank {
    /// Dominant component (for example the last-touch tick under LRU, or the
    /// access count under LFU).
    pub(crate) primary: u64,
    /// Tie-breaker, compared only when `primary` values are equal.
    pub(crate) secondary: u64,
}
161
162#[derive(Debug)]
163struct EvictionCandidate {
164 rank: EvictionRank,
165 hash: u64,
166 key: Bytes,
167}
168
169impl PartialEq for EvictionCandidate {
170 fn eq(&self, other: &Self) -> bool {
171 self.rank == other.rank && self.hash == other.hash
172 }
173}
174
175impl Eq for EvictionCandidate {}
176
177impl PartialOrd for EvictionCandidate {
178 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
179 Some(self.cmp(other))
180 }
181}
182
183impl Ord for EvictionCandidate {
184 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
185 self.rank
186 .cmp(&other.rank)
187 .then_with(|| self.hash.cmp(&other.hash))
188 }
189}
190
/// One record in the map's LRU touch log: the entry identified by `hash` was
/// accessed at logical time `tick`.
#[derive(Debug)]
struct LruTouch {
    /// Access tick at which the touch happened.
    tick: u64,
    /// Hash of the touched entry's key.
    hash: u64,
}
196
/// Smallest value length, in bytes, that is eligible for buffer recycling.
const REUSABLE_VALUE_MIN_BYTES: usize = 4 * 1024;
/// Maximum number of buffers the reusable-value pool may hold at once.
const MAX_REUSABLE_VALUE_BUFFERS: usize = 128;
/// Maximum combined length of all pooled buffers (8 MiB).
const MAX_REUSABLE_VALUE_BYTES: usize = 8 << 20;
200
201#[derive(Debug, Default)]
202pub struct FlatMap {
203 entries: HashTable<FlatEntry>,
204 #[cfg(feature = "fast-point-map")]
205 fast_points: FastPointMap,
206 ttl_entries: usize,
207 active_readers: AtomicUsize,
208 retired_values: Vec<SharedBytes>,
209 reusable_values: Vec<SharedBytes>,
210 reusable_value_bytes: usize,
211 stored_bytes: usize,
212 memory_limit_bytes: Option<usize>,
213 eviction_policy: EvictionPolicy,
214 access_clock: u64,
217 read_sample_counter: u64,
219 lru_touch_log: VecDeque<LruTouch>,
220 evictions: u64,
221 #[cfg(feature = "telemetry")]
222 telemetry: Option<FlatMapTelemetry>,
223}
224
/// Telemetry context attached to a [`FlatMap`] when the `telemetry` feature
/// is enabled.
#[cfg(feature = "telemetry")]
#[derive(Clone, Debug)]
struct FlatMapTelemetry {
    /// Handle used to report cache metrics.
    metrics: CacheTelemetryHandle,
    /// Identifier of the shard this map reports as.
    shard_id: usize,
}
231
/// Why an entry is being removed from the map.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DeleteReason {
    /// Removed by an explicit delete request.
    Explicit,
    /// Removed because its expiry deadline has passed.
    Expired,
    /// Removed to satisfy the memory limit under the eviction policy.
    Evicted,
}
238#[cfg(feature = "fast-point-map")]
239mod fast_point;
240
241mod core;
242mod lifecycle;
243mod read;
244mod write;
245mod write_hot;
246mod write_local;
247
248#[cfg(feature = "fast-point-map")]
249use fast_point::FastPointMap;
250
#[cfg(test)]
mod tests {
    use super::FlatMap;
    #[cfg(feature = "embedded")]
    use super::hash_key_tag_from_hash;
    use super::{
        MAX_REUSABLE_VALUE_BUFFERS, MAX_REUSABLE_VALUE_BYTES, REUSABLE_VALUE_MIN_BYTES,
        SharedBytes, hash_key, recycle_value_into_pool, shared_bytes_from_reusable_pool,
    };
    use crate::config::EvictionPolicy;

    #[test]
    fn stores_reads_and_updates_values() {
        let mut map = FlatMap::new();
        map.set(b"alpha".to_vec(), b"one".to_vec(), None, 0);
        assert_eq!(map.get(b"alpha", 0), Some(b"one".to_vec()));

        map.set(b"alpha".to_vec(), b"two".to_vec(), None, 0);
        assert_eq!(map.get(b"alpha", 0), Some(b"two".to_vec()));
    }

    #[test]
    fn expires_values() {
        let mut map = FlatMap::new();
        map.set(b"alpha".to_vec(), b"one".to_vec(), Some(10), 0);

        assert_eq!(map.get(b"alpha", 9), Some(b"one".to_vec()));
        assert_eq!(map.ttl_seconds(b"alpha", 11), -2);
        assert_eq!(map.get(b"alpha", 11), None);
    }

    #[test]
    fn maintenance_removes_expired_entries() {
        let mut map = FlatMap::new();
        map.set(b"alpha".to_vec(), b"one".to_vec(), Some(10), 0);
        map.set(b"beta".to_vec(), b"two".to_vec(), Some(10), 0);

        assert_eq!(map.process_maintenance(11), 2);
        assert!(map.is_empty());
    }

    #[test]
    fn read_epoch_keeps_old_value_alive_across_update() {
        let mut map = FlatMap::new();
        map.set(b"alpha".to_vec(), b"one".to_vec(), None, 0);

        map.begin_read_epoch();
        let read = map.get_ref(b"alpha", 0).unwrap();
        let ptr = read.as_ptr();
        let len = read.len();

        map.set(b"alpha".to_vec(), b"two".to_vec(), None, 0);

        // SAFETY: `ptr`/`len` describe the buffer returned by `get_ref` while the
        // read epoch opened above is still active. The map must keep that buffer
        // alive until `end_read_epoch`; that guarantee is what this test checks.
        let stale = unsafe { std::slice::from_raw_parts(ptr, len) };
        assert_eq!(stale, b"one");

        map.end_read_epoch();
        assert_eq!(map.get(b"alpha", 0), Some(b"two".to_vec()));
    }

    #[test]
    fn lru_eviction_removes_least_recent_entry_under_cap() {
        let mut map = FlatMap::new();
        map.configure_memory_policy(Some(4), EvictionPolicy::Lru, 0);

        map.set(b"a".to_vec(), b"1".to_vec(), None, 0);
        map.set(b"b".to_vec(), b"2".to_vec(), None, 0);
        map.set(b"a".to_vec(), b"1".to_vec(), None, 0);

        map.set(b"c".to_vec(), b"3".to_vec(), None, 0);

        assert_eq!(map.get(b"a", 0), Some(b"1".to_vec()));
        assert_eq!(map.get(b"b", 0), None);
        assert_eq!(map.get(b"c", 0), Some(b"3".to_vec()));
        assert!(map.stored_bytes() <= 4);
        assert_eq!(map.evictions(), 1);
    }

    #[cfg(feature = "embedded")]
    #[test]
    fn local_lru_reuses_evicted_large_value_buffer() {
        let mut map = FlatMap::new();
        map.configure_memory_policy(Some(5000), EvictionPolicy::Lru, 0);
        let value = vec![7u8; REUSABLE_VALUE_MIN_BYTES];

        let hash_a = hash_key(b"a");
        map.set_slice_hashed_tagged_no_ttl_local(
            hash_a,
            hash_key_tag_from_hash(hash_a),
            b"a",
            &value,
        );
        map.enforce_memory_limit(0);

        let hash_b = hash_key(b"b");
        map.set_slice_hashed_tagged_no_ttl_local(
            hash_b,
            hash_key_tag_from_hash(hash_b),
            b"b",
            &value,
        );
        map.enforce_memory_limit(0);

        assert_eq!(map.reusable_values.len(), 1);
        let reusable_ptr = map.reusable_values[0].as_ptr();

        let hash_c = hash_key(b"c");
        map.set_slice_hashed_tagged_no_ttl_local(
            hash_c,
            hash_key_tag_from_hash(hash_c),
            b"c",
            &value,
        );

        let stored_ptr = map
            .get_shared_value_bytes_hashed_no_ttl(hash_c, b"c")
            .expect("new value is stored")
            .as_ptr();
        assert_eq!(stored_ptr, reusable_ptr);
    }

    #[test]
    fn ttl_lru_reuses_evicted_large_value_buffer() {
        let mut map = FlatMap::new();
        let value = vec![7u8; REUSABLE_VALUE_MIN_BYTES];
        map.configure_memory_policy(Some(value.len() + 2048), EvictionPolicy::Lru, 0);

        let hash_a = hash_key(b"a");
        map.set_slice_hashed(hash_a, b"a", &value, Some(60_000), 0);

        let hash_b = hash_key(b"b");
        map.set_slice_hashed(hash_b, b"b", &value, Some(60_000), 1);

        assert_eq!(map.reusable_values.len(), 1);
        let reusable_ptr = map.reusable_values[0].as_ptr();

        let hash_c = hash_key(b"c");
        map.set_slice_hashed(hash_c, b"c", &value, Some(60_000), 2);

        let stored_ptr = map
            .get_shared_value_bytes_hashed(hash_c, b"c", 2)
            .expect("new value is stored")
            .as_ptr();
        assert_eq!(stored_ptr, reusable_ptr);
    }

    #[test]
    fn ttl_lru_does_not_pool_small_value_buffers() {
        let mut map = FlatMap::new();
        let small_value_len = 512;
        assert!(small_value_len < REUSABLE_VALUE_MIN_BYTES);
        map.configure_memory_policy(Some(small_value_len + 88), EvictionPolicy::Lru, 0);
        let value = vec![7u8; small_value_len];

        map.set_slice_hashed(hash_key(b"a"), b"a", &value, Some(60_000), 0);
        map.set_slice_hashed(hash_key(b"b"), b"b", &value, Some(60_000), 1);

        assert_eq!(map.evictions(), 1);
        assert!(map.reusable_values.is_empty());
        assert_eq!(map.reusable_value_bytes, 0);
    }

    #[test]
    fn lfu_eviction_removes_least_frequent_entry_under_cap() {
        let mut map = FlatMap::new();
        map.configure_memory_policy(Some(4), EvictionPolicy::Lfu, 0);

        map.set(b"a".to_vec(), b"1".to_vec(), None, 0);
        map.set(b"b".to_vec(), b"2".to_vec(), None, 0);
        map.set(b"a".to_vec(), b"1".to_vec(), None, 0);
        map.set(b"a".to_vec(), b"1".to_vec(), None, 0);

        map.set(b"c".to_vec(), b"3".to_vec(), None, 0);

        assert_eq!(map.get(b"a", 0), Some(b"1".to_vec()));
        assert_eq!(map.get(b"b", 0), None);
        assert_eq!(map.get(b"c", 0), Some(b"3".to_vec()));
        assert!(map.stored_bytes() <= 4);
        assert_eq!(map.evictions(), 1);
    }

    /// The pool ignores buffers below the size threshold and buffers that
    /// would exceed the total byte budget.
    #[test]
    fn value_pool_rejects_small_and_over_budget_buffers() {
        let mut pool: Vec<SharedBytes> = Vec::new();
        let mut pooled_bytes = 0usize;

        recycle_value_into_pool(
            SharedBytes::from(vec![0u8; REUSABLE_VALUE_MIN_BYTES - 1]),
            &mut pool,
            &mut pooled_bytes,
        );
        assert!(pool.is_empty());
        assert_eq!(pooled_bytes, 0);

        recycle_value_into_pool(
            SharedBytes::from(vec![0u8; REUSABLE_VALUE_MIN_BYTES]),
            &mut pool,
            &mut pooled_bytes,
        );
        assert_eq!(pool.len(), 1);
        assert_eq!(pooled_bytes, REUSABLE_VALUE_MIN_BYTES);

        recycle_value_into_pool(
            SharedBytes::from(vec![0u8; MAX_REUSABLE_VALUE_BYTES]),
            &mut pool,
            &mut pooled_bytes,
        );
        assert_eq!(pool.len(), 1);
        assert_eq!(pooled_bytes, REUSABLE_VALUE_MIN_BYTES);
    }

    /// The pool never holds more than `MAX_REUSABLE_VALUE_BUFFERS` buffers.
    #[test]
    fn value_pool_caps_buffer_count() {
        // Precondition: the count cap is reached before the byte budget.
        assert!(MAX_REUSABLE_VALUE_BUFFERS * REUSABLE_VALUE_MIN_BYTES <= MAX_REUSABLE_VALUE_BYTES);

        let mut pool = Vec::new();
        let mut pooled_bytes = 0usize;
        let buffer = SharedBytes::from(vec![0u8; REUSABLE_VALUE_MIN_BYTES]);
        for _ in 0..MAX_REUSABLE_VALUE_BUFFERS {
            recycle_value_into_pool(buffer.clone(), &mut pool, &mut pooled_bytes);
        }
        assert_eq!(pool.len(), MAX_REUSABLE_VALUE_BUFFERS);

        recycle_value_into_pool(buffer, &mut pool, &mut pooled_bytes);
        assert_eq!(pool.len(), MAX_REUSABLE_VALUE_BUFFERS);
        assert_eq!(pooled_bytes, MAX_REUSABLE_VALUE_BUFFERS * REUSABLE_VALUE_MIN_BYTES);
    }

    /// A uniquely owned pooled buffer is reused in place. A buffer that is still
    /// referenced elsewhere is discarded instead of being overwritten.
    #[test]
    fn value_pool_reuses_unique_buffer_and_skips_shared_one() {
        let mut pool = Vec::new();
        let mut pooled_bytes = 0usize;

        let pooled = SharedBytes::from(vec![1u8; REUSABLE_VALUE_MIN_BYTES]);
        let pooled_ptr = pooled.as_ptr();
        recycle_value_into_pool(pooled, &mut pool, &mut pooled_bytes);

        let fresh = vec![9u8; REUSABLE_VALUE_MIN_BYTES];
        let reused = shared_bytes_from_reusable_pool(&fresh, &mut pool, &mut pooled_bytes);
        assert_eq!(reused.as_ptr(), pooled_ptr);
        assert_eq!(&reused[..], &fresh[..]);
        assert!(pool.is_empty());
        assert_eq!(pooled_bytes, 0);

        let reader_handle = SharedBytes::from(vec![2u8; REUSABLE_VALUE_MIN_BYTES]);
        recycle_value_into_pool(reader_handle.clone(), &mut pool, &mut pooled_bytes);
        let copied = shared_bytes_from_reusable_pool(&fresh, &mut pool, &mut pooled_bytes);
        assert_ne!(copied.as_ptr(), reader_handle.as_ptr());
        assert_eq!(&copied[..], &fresh[..]);
        assert!(reader_handle.iter().all(|&byte| byte == 2));
        assert!(pool.is_empty());
        assert_eq!(pooled_bytes, 0);
    }
}