Skip to main content

fast_cache/storage/flat_map/
core.rs

1use super::*;
2
3impl FlatMap {
4    pub fn new() -> Self {
5        Self {
6            entries: HashTable::new(),
7            #[cfg(feature = "fast-point-map")]
8            fast_points: FastPointMap::default(),
9            ttl_entries: 0,
10            active_readers: AtomicUsize::new(0),
11            retired_values: Vec::new(),
12            reusable_values: Vec::new(),
13            reusable_value_bytes: 0,
14            stored_bytes: 0,
15            memory_limit_bytes: None,
16            eviction_policy: EvictionPolicy::None,
17            access_clock: 0,
18            read_sample_counter: 0,
19            lru_touch_log: VecDeque::new(),
20            evictions: 0,
21            #[cfg(feature = "telemetry")]
22            telemetry: None,
23        }
24    }
25
26    pub(crate) fn with_capacity(capacity: usize) -> Self {
27        let mut map = Self::new();
28        if capacity > 0 {
29            map.entries = HashTable::with_capacity(capacity);
30        }
31        map
32    }
33
34    pub fn from_entries(entries: impl IntoIterator<Item = StoredEntry>, now_ms: u64) -> Self {
35        let mut map = Self::new();
36        for entry in entries {
37            if entry
38                .expire_at_ms
39                .is_some_and(|deadline| deadline <= now_ms)
40            {
41                continue;
42            }
43            map.set(entry.key, entry.value, entry.expire_at_ms, now_ms);
44        }
45        map
46    }
47
48    #[inline(always)]
49    pub fn len(&self) -> usize {
50        #[cfg(feature = "fast-point-map")]
51        if self.fast_points.is_active() {
52            return self.fast_points.len();
53        }
54        self.entries.len()
55    }
56
57    #[inline(always)]
58    pub fn stored_bytes(&self) -> usize {
59        self.stored_bytes
60    }
61
62    #[inline(always)]
63    pub fn memory_limit_bytes(&self) -> Option<usize> {
64        self.memory_limit_bytes
65    }
66
67    #[inline(always)]
68    pub fn eviction_policy(&self) -> EvictionPolicy {
69        self.eviction_policy
70    }
71
72    #[inline(always)]
73    pub fn evictions(&self) -> u64 {
74        self.evictions
75    }
76
77    #[inline(always)]
78    pub fn is_empty(&self) -> bool {
79        #[cfg(feature = "fast-point-map")]
80        if self.fast_points.is_active() {
81            return self.fast_points.is_empty();
82        }
83        self.entries.is_empty()
84    }
85
86    #[cfg(feature = "telemetry")]
87    pub fn attach_metrics(&mut self, metrics: CacheTelemetryHandle, shard_id: usize) {
88        self.telemetry = Some(FlatMapTelemetry { metrics, shard_id });
89        self.sync_metrics_state();
90    }
91
92    pub fn configure_memory_policy(
93        &mut self,
94        memory_limit_bytes: Option<usize>,
95        eviction_policy: EvictionPolicy,
96        now_ms: u64,
97    ) {
98        self.memory_limit_bytes = memory_limit_bytes.filter(|limit| *limit > 0);
99        self.eviction_policy = eviction_policy;
100        if self.memory_limit_bytes.is_some() || self.eviction_policy != EvictionPolicy::None {
101            self.disable_fast_point_map();
102        } else {
103            self.reusable_values.clear();
104            self.reusable_value_bytes = 0;
105        }
106        self.enforce_memory_limit(now_ms);
107    }
108
109    #[inline(always)]
110    pub(super) fn entry_is_expired_hashed(&self, hash: u64, key: &[u8], now_ms: u64) -> bool {
111        self.entries
112            .find(hash, |entry| entry.matches(hash, key))
113            .is_some_and(|entry| entry.is_expired(now_ms))
114    }
115
116    #[inline(always)]
117    pub(super) fn lookup_ref_hashed_lazy(&mut self, hash: u64, key: &[u8]) -> Option<&[u8]> {
118        #[cfg(feature = "fast-point-map")]
119        if self.fast_points.is_active() {
120            return self.fast_points.get(hash, key).map(|value| value.as_ref());
121        }
122        if self.should_sample_read() {
123            let tick = self.next_access_tick();
124            self.entries
125                .find_mut(hash, |entry| entry.matches(hash, key))
126                .map(|entry| {
127                    entry.access.record_access(tick);
128                    entry.value.as_ref()
129                })
130        } else {
131            self.entries
132                .find(hash, |entry| entry.matches(hash, key))
133                .map(|entry| entry.value.as_ref())
134        }
135    }
136
137    #[inline(always)]
138    pub(super) fn lookup_ref_hashed_prepared_lazy(
139        &mut self,
140        hash: u64,
141        key: &[u8],
142        key_tag: u64,
143    ) -> Option<&[u8]> {
144        #[cfg(feature = "fast-point-map")]
145        if self.fast_points.is_active() {
146            return self.fast_points.get(hash, key).map(|value| value.as_ref());
147        }
148        if self.should_sample_read() {
149            let tick = self.next_access_tick();
150            self.entries
151                .find_mut(hash, |entry| entry.matches_prepared(hash, key, key_tag))
152                .map(|entry| {
153                    entry.access.record_access(tick);
154                    entry.value.as_ref()
155                })
156        } else {
157            self.entries
158                .find(hash, |entry| entry.matches_prepared(hash, key, key_tag))
159                .map(|entry| entry.value.as_ref())
160        }
161    }
162
163    #[inline(always)]
164    pub(super) fn adjust_ttl_count(&mut self, had_ttl: bool, has_ttl: bool) {
165        match (had_ttl, has_ttl) {
166            (false, true) => {
167                self.disable_fast_point_map();
168                self.ttl_entries = self.ttl_entries.saturating_add(1);
169            }
170            (true, false) => {
171                self.ttl_entries = self.ttl_entries.saturating_sub(1);
172            }
173            _ => {}
174        }
175    }
176
177    #[inline(always)]
178    pub(super) fn disable_fast_point_map(&mut self) {
179        #[cfg(feature = "fast-point-map")]
180        if self.fast_points.is_active() {
181            debug_assert!(self.entries.is_empty());
182            for fast_entry in self.fast_points.take_entries_and_disable() {
183                let entry = fast_entry.into_flat_entry();
184                self.entries
185                    .insert_unique(entry.hash, entry, |entry| entry.hash);
186            }
187        }
188    }
189
190    #[inline(always)]
191    pub(super) fn retire_value(&mut self, value: SharedBytes) {
192        if self.has_active_readers() {
193            self.retired_values.push(value);
194        } else {
195            self.recycle_value(value);
196        }
197    }
198
199    #[inline(always)]
200    pub(super) fn recycle_value(&mut self, value: SharedBytes) {
201        if self.eviction_policy == EvictionPolicy::None || self.memory_limit_bytes.is_none() {
202            return;
203        }
204        recycle_value_into_pool(
205            value,
206            &mut self.reusable_values,
207            &mut self.reusable_value_bytes,
208        );
209    }
210
211    #[inline(always)]
212    pub(super) fn has_active_readers(&self) -> bool {
213        self.active_readers.load(Ordering::Acquire) > 0
214    }
215
216    #[inline(always)]
217    pub(super) fn reclaim_retired_if_quiescent(&mut self) {
218        if !self.retired_values.is_empty() && !self.has_active_readers() {
219            let retired_values = mem::take(&mut self.retired_values);
220            for value in retired_values {
221                self.recycle_value(value);
222            }
223        }
224    }
225
226    #[inline(always)]
227    pub(super) fn next_access_tick(&mut self) -> u64 {
228        self.access_clock = self.access_clock.wrapping_add(1);
229        self.access_clock
230    }
231
232    #[inline(always)]
233    pub(super) fn record_lru_touch(&mut self, hash: u64, tick: u64) {
234        if tick == 0 || self.eviction_policy != EvictionPolicy::Lru {
235            return;
236        }
237        self.lru_touch_log.push_back(LruTouch { tick, hash });
238        self.compact_lru_touch_log_if_needed();
239    }
240
241    #[inline(always)]
242    fn compact_lru_touch_log_if_needed(&mut self) {
243        let max_log_len = self.entries.len().saturating_mul(4).max(1024);
244        if self.lru_touch_log.len() <= max_log_len {
245            return;
246        }
247        self.rebuild_lru_touch_log();
248    }
249
250    fn rebuild_lru_touch_log(&mut self) {
251        let mut touches = self
252            .entries
253            .iter()
254            .filter_map(|entry| match entry.access.last_touch {
255                0 => None,
256                tick => Some(LruTouch {
257                    tick,
258                    hash: entry.hash,
259                }),
260            })
261            .collect::<Vec<_>>();
262        touches.sort_unstable_by_key(|touch| touch.tick);
263        self.lru_touch_log = touches.into();
264    }
265
266    #[inline(always)]
267    fn should_sample_read(&mut self) -> bool {
268        const READ_TOUCH_SAMPLE_MASK: u64 = 1023;
269        if self.eviction_policy == EvictionPolicy::None {
270            return false;
271        }
272        if self.memory_limit_bytes.is_none() {
273            return false;
274        }
275        let limit = self.memory_limit_bytes.unwrap();
276        let watermark = limit.saturating_mul(3) / 4;
277        if self.stored_bytes < watermark.max(1) {
278            return false;
279        }
280        self.read_sample_counter = self.read_sample_counter.wrapping_add(1);
281        (self.read_sample_counter & READ_TOUCH_SAMPLE_MASK) == 0
282    }
283
284    pub(super) fn enforce_memory_limit(&mut self, now_ms: u64) {
285        let Some(limit) = self.memory_limit_bytes else {
286            return;
287        };
288        if self.stored_bytes <= limit {
289            return;
290        }
291        if self.ttl_entries > 0 {
292            self.process_maintenance(now_ms);
293        }
294        self.evict_to_memory_target(self.eviction_policy, now_ms, eviction_target_bytes(limit));
295    }
296
297    pub(crate) fn eviction_candidate(
298        &self,
299        policy: EvictionPolicy,
300    ) -> Option<(EvictionRank, u64, Bytes)> {
301        if policy == EvictionPolicy::None || self.entries.is_empty() {
302            return None;
303        }
304
305        let mut selected: Option<(EvictionRank, u64, &[u8])> = None;
306        for entry in self.entries.iter() {
307            let candidate = (entry.access.rank(policy), entry.hash, entry.key.as_ref());
308            selected = match selected {
309                Some(current) if current.0 <= candidate.0 => Some(current),
310                _ => Some(candidate),
311            };
312        }
313        selected.map(|(rank, hash, key)| (rank, hash, key.to_vec()))
314    }
315
316    pub(crate) fn evict_with_policy(&mut self, policy: EvictionPolicy, now_ms: u64) -> bool {
317        let Some((_rank, hash, key)) = self.eviction_candidate(policy) else {
318            return false;
319        };
320        self.delete_hashed_internal(hash, &key, now_ms, DeleteReason::Evicted)
321    }
322
323    pub(crate) fn evict_to_memory_target(
324        &mut self,
325        policy: EvictionPolicy,
326        now_ms: u64,
327        target_bytes: usize,
328    ) -> bool {
329        if policy == EvictionPolicy::None || self.entries.is_empty() {
330            return false;
331        }
332
333        if policy == EvictionPolicy::Lru {
334            let evicted = self.evict_lru_from_touch_log(now_ms, target_bytes);
335            if self.stored_bytes <= target_bytes || self.entries.is_empty() {
336                return evicted;
337            }
338        }
339
340        let mut evicted = false;
341        while self.stored_bytes > target_bytes {
342            let mut candidates = self.eviction_candidates_for_target(policy, target_bytes);
343            if candidates.is_empty() {
344                break;
345            }
346            candidates.sort_unstable_by_key(|candidate| candidate.rank);
347            let mut evicted_batch = false;
348            for candidate in candidates {
349                if self.stored_bytes <= target_bytes {
350                    break;
351                }
352                evicted_batch |= self.delete_hashed_internal(
353                    candidate.hash,
354                    &candidate.key,
355                    now_ms,
356                    DeleteReason::Evicted,
357                );
358            }
359            if !evicted_batch {
360                break;
361            }
362            evicted = true;
363        }
364        evicted
365    }
366
367    fn evict_lru_from_touch_log(&mut self, _now_ms: u64, target_bytes: usize) -> bool {
368        let mut evicted = false;
369        while self.stored_bytes > target_bytes {
370            let Some(touch) = self.lru_touch_log.pop_front() else {
371                break;
372            };
373            let Some(entry) = self
374                .entries
375                .find_entry(touch.hash, |entry| entry.access.last_touch == touch.tick)
376                .ok()
377            else {
378                continue;
379            };
380            let removed_key_len = entry.get().key.len();
381            let removed_value_len = entry.get().value.len();
382            let had_ttl = entry.get().expire_at_ms.is_some();
383            let (removed, _) = entry.remove();
384            if had_ttl {
385                self.ttl_entries = self.ttl_entries.saturating_sub(1);
386            }
387            self.stored_bytes = self
388                .stored_bytes
389                .saturating_sub(removed_key_len.saturating_add(removed_value_len));
390            self.retire_value(removed.value);
391            self.evictions = self.evictions.saturating_add(1);
392            #[cfg(feature = "telemetry")]
393            self.record_delete_metrics(
394                DeleteReason::Evicted,
395                -1,
396                -((removed_key_len + removed_value_len) as isize),
397            );
398            evicted = true;
399        }
400        evicted
401    }
402
403    fn eviction_candidates_for_target(
404        &self,
405        policy: EvictionPolicy,
406        target_bytes: usize,
407    ) -> Vec<EvictionCandidate> {
408        let target_count = self.eviction_candidate_count(target_bytes);
409        if target_count == 0 {
410            return Vec::new();
411        }
412
413        let mut candidates = BinaryHeap::with_capacity(target_count);
414        for entry in self.entries.iter() {
415            let rank = entry.access.rank(policy);
416            if candidates.len() < target_count {
417                candidates.push(EvictionCandidate {
418                    rank,
419                    hash: entry.hash,
420                    key: entry.key.as_ref().to_vec(),
421                });
422                continue;
423            }
424
425            let Some(mut warmest_candidate) = candidates.peek_mut() else {
426                continue;
427            };
428            if rank < warmest_candidate.rank {
429                *warmest_candidate = EvictionCandidate {
430                    rank,
431                    hash: entry.hash,
432                    key: entry.key.as_ref().to_vec(),
433                };
434            }
435        }
436        candidates.into_vec()
437    }
438
439    fn eviction_candidate_count(&self, target_bytes: usize) -> usize {
440        let bytes_to_free = self.stored_bytes.saturating_sub(target_bytes);
441        if bytes_to_free == 0 || self.entries.is_empty() {
442            return 0;
443        }
444
445        let average_entry_bytes = (self.stored_bytes / self.entries.len()).max(1);
446        let estimated_count = bytes_to_free.div_ceil(average_entry_bytes);
447        let safety_margin = (estimated_count / 8).saturating_add(8);
448        estimated_count
449            .saturating_add(safety_margin)
450            .min(self.entries.len())
451    }
452
453    pub(crate) fn eviction_target_bytes(limit: usize) -> usize {
454        eviction_target_bytes(limit)
455    }
456
457    #[cfg(feature = "telemetry")]
458    #[inline(always)]
459    pub(super) fn record_set_metrics(
460        &self,
461        written_len: usize,
462        key_delta: isize,
463        memory_delta: isize,
464        start: Option<Instant>,
465    ) {
466        if let (Some(telemetry), Some(start)) = (&self.telemetry, start) {
467            telemetry.metrics.record_set(
468                telemetry.shard_id,
469                written_len,
470                start.elapsed().as_nanos() as u64,
471            );
472            telemetry
473                .metrics
474                .adjust_keys_total(telemetry.shard_id, key_delta);
475            telemetry
476                .metrics
477                .adjust_memory_bytes(telemetry.shard_id, memory_delta);
478            telemetry
479                .metrics
480                .set_shard_keys(telemetry.shard_id, self.len());
481        }
482    }
483
484    #[cfg(feature = "telemetry")]
485    #[inline(always)]
486    pub(super) fn record_delete_metrics(
487        &self,
488        reason: DeleteReason,
489        key_delta: isize,
490        memory_delta: isize,
491    ) {
492        if let Some(telemetry) = &self.telemetry {
493            match reason {
494                DeleteReason::Explicit => telemetry.metrics.record_delete(telemetry.shard_id),
495                DeleteReason::Expired => telemetry.metrics.record_expiration(1),
496                DeleteReason::Evicted => {}
497            }
498            telemetry
499                .metrics
500                .adjust_keys_total(telemetry.shard_id, key_delta);
501            telemetry
502                .metrics
503                .adjust_memory_bytes(telemetry.shard_id, memory_delta);
504            telemetry
505                .metrics
506                .set_shard_keys(telemetry.shard_id, self.len());
507        }
508    }
509
510    #[cfg(feature = "telemetry")]
511    #[inline(always)]
512    fn sync_metrics_state(&self) {
513        if let Some(telemetry) = &self.telemetry {
514            telemetry
515                .metrics
516                .set_shard_keys(telemetry.shard_id, self.len());
517            telemetry
518                .metrics
519                .adjust_keys_total(telemetry.shard_id, self.len() as isize);
520            telemetry
521                .metrics
522                .adjust_memory_bytes(telemetry.shard_id, self.stored_bytes as isize);
523        }
524    }
525}
526
527fn eviction_target_bytes(limit: usize) -> usize {
528    const EXACT_EVICTION_LIMIT_BYTES: usize = 4096;
529    if limit <= EXACT_EVICTION_LIMIT_BYTES {
530        return limit;
531    }
532    limit.saturating_sub((limit / 20).max(1))
533}