fast_cache/storage/flat_map/
core.rs1use super::*;
2
3impl FlatMap {
4 pub fn new() -> Self {
5 Self {
6 entries: HashTable::new(),
7 #[cfg(feature = "fast-point-map")]
8 fast_points: FastPointMap::default(),
9 ttl_entries: 0,
10 active_readers: AtomicUsize::new(0),
11 retired_values: Vec::new(),
12 reusable_values: Vec::new(),
13 reusable_value_bytes: 0,
14 stored_bytes: 0,
15 memory_limit_bytes: None,
16 eviction_policy: EvictionPolicy::None,
17 access_clock: 0,
18 read_sample_counter: 0,
19 lru_touch_log: VecDeque::new(),
20 evictions: 0,
21 #[cfg(feature = "telemetry")]
22 telemetry: None,
23 }
24 }
25
26 pub(crate) fn with_capacity(capacity: usize) -> Self {
27 let mut map = Self::new();
28 if capacity > 0 {
29 map.entries = HashTable::with_capacity(capacity);
30 }
31 map
32 }
33
34 pub fn from_entries(entries: impl IntoIterator<Item = StoredEntry>, now_ms: u64) -> Self {
35 let mut map = Self::new();
36 for entry in entries {
37 if entry
38 .expire_at_ms
39 .is_some_and(|deadline| deadline <= now_ms)
40 {
41 continue;
42 }
43 map.set(entry.key, entry.value, entry.expire_at_ms, now_ms);
44 }
45 map
46 }
47
48 #[inline(always)]
49 pub fn len(&self) -> usize {
50 #[cfg(feature = "fast-point-map")]
51 if self.fast_points.is_active() {
52 return self.fast_points.len();
53 }
54 self.entries.len()
55 }
56
57 #[inline(always)]
58 pub fn stored_bytes(&self) -> usize {
59 self.stored_bytes
60 }
61
62 #[inline(always)]
63 pub fn memory_limit_bytes(&self) -> Option<usize> {
64 self.memory_limit_bytes
65 }
66
67 #[inline(always)]
68 pub fn eviction_policy(&self) -> EvictionPolicy {
69 self.eviction_policy
70 }
71
72 #[inline(always)]
73 pub fn evictions(&self) -> u64 {
74 self.evictions
75 }
76
77 #[inline(always)]
78 pub fn is_empty(&self) -> bool {
79 #[cfg(feature = "fast-point-map")]
80 if self.fast_points.is_active() {
81 return self.fast_points.is_empty();
82 }
83 self.entries.is_empty()
84 }
85
/// Attaches a telemetry handle for the given shard and immediately publishes
/// the map's current key count and stored byte total to it.
#[cfg(feature = "telemetry")]
pub fn attach_metrics(&mut self, metrics: CacheTelemetryHandle, shard_id: usize) {
    let telemetry = FlatMapTelemetry { metrics, shard_id };
    self.telemetry = Some(telemetry);
    self.sync_metrics_state();
}
91
92 pub fn configure_memory_policy(
93 &mut self,
94 memory_limit_bytes: Option<usize>,
95 eviction_policy: EvictionPolicy,
96 now_ms: u64,
97 ) {
98 self.memory_limit_bytes = memory_limit_bytes.filter(|limit| *limit > 0);
99 self.eviction_policy = eviction_policy;
100 if self.memory_limit_bytes.is_some() || self.eviction_policy != EvictionPolicy::None {
101 self.disable_fast_point_map();
102 } else {
103 self.reusable_values.clear();
104 self.reusable_value_bytes = 0;
105 }
106 self.enforce_memory_limit(now_ms);
107 }
108
109 #[inline(always)]
110 pub(super) fn entry_is_expired_hashed(&self, hash: u64, key: &[u8], now_ms: u64) -> bool {
111 self.entries
112 .find(hash, |entry| entry.matches(hash, key))
113 .is_some_and(|entry| entry.is_expired(now_ms))
114 }
115
116 #[inline(always)]
117 pub(super) fn lookup_ref_hashed_lazy(&mut self, hash: u64, key: &[u8]) -> Option<&[u8]> {
118 #[cfg(feature = "fast-point-map")]
119 if self.fast_points.is_active() {
120 return self.fast_points.get(hash, key).map(|value| value.as_ref());
121 }
122 if self.should_sample_read() {
123 let tick = self.next_access_tick();
124 self.entries
125 .find_mut(hash, |entry| entry.matches(hash, key))
126 .map(|entry| {
127 entry.access.record_access(tick);
128 entry.value.as_ref()
129 })
130 } else {
131 self.entries
132 .find(hash, |entry| entry.matches(hash, key))
133 .map(|entry| entry.value.as_ref())
134 }
135 }
136
137 #[inline(always)]
138 pub(super) fn lookup_ref_hashed_prepared_lazy(
139 &mut self,
140 hash: u64,
141 key: &[u8],
142 key_tag: u64,
143 ) -> Option<&[u8]> {
144 #[cfg(feature = "fast-point-map")]
145 if self.fast_points.is_active() {
146 return self.fast_points.get(hash, key).map(|value| value.as_ref());
147 }
148 if self.should_sample_read() {
149 let tick = self.next_access_tick();
150 self.entries
151 .find_mut(hash, |entry| entry.matches_prepared(hash, key, key_tag))
152 .map(|entry| {
153 entry.access.record_access(tick);
154 entry.value.as_ref()
155 })
156 } else {
157 self.entries
158 .find(hash, |entry| entry.matches_prepared(hash, key, key_tag))
159 .map(|entry| entry.value.as_ref())
160 }
161 }
162
163 #[inline(always)]
164 pub(super) fn adjust_ttl_count(&mut self, had_ttl: bool, has_ttl: bool) {
165 match (had_ttl, has_ttl) {
166 (false, true) => {
167 self.disable_fast_point_map();
168 self.ttl_entries = self.ttl_entries.saturating_add(1);
169 }
170 (true, false) => {
171 self.ttl_entries = self.ttl_entries.saturating_sub(1);
172 }
173 _ => {}
174 }
175 }
176
177 #[inline(always)]
178 pub(super) fn disable_fast_point_map(&mut self) {
179 #[cfg(feature = "fast-point-map")]
180 if self.fast_points.is_active() {
181 debug_assert!(self.entries.is_empty());
182 for fast_entry in self.fast_points.take_entries_and_disable() {
183 let entry = fast_entry.into_flat_entry();
184 self.entries
185 .insert_unique(entry.hash, entry, |entry| entry.hash);
186 }
187 }
188 }
189
190 #[inline(always)]
191 pub(super) fn retire_value(&mut self, value: SharedBytes) {
192 if self.has_active_readers() {
193 self.retired_values.push(value);
194 } else {
195 self.recycle_value(value);
196 }
197 }
198
199 #[inline(always)]
200 pub(super) fn recycle_value(&mut self, value: SharedBytes) {
201 if self.eviction_policy == EvictionPolicy::None || self.memory_limit_bytes.is_none() {
202 return;
203 }
204 recycle_value_into_pool(
205 value,
206 &mut self.reusable_values,
207 &mut self.reusable_value_bytes,
208 );
209 }
210
211 #[inline(always)]
212 pub(super) fn has_active_readers(&self) -> bool {
213 self.active_readers.load(Ordering::Acquire) > 0
214 }
215
216 #[inline(always)]
217 pub(super) fn reclaim_retired_if_quiescent(&mut self) {
218 if !self.retired_values.is_empty() && !self.has_active_readers() {
219 let retired_values = mem::take(&mut self.retired_values);
220 for value in retired_values {
221 self.recycle_value(value);
222 }
223 }
224 }
225
226 #[inline(always)]
227 pub(super) fn next_access_tick(&mut self) -> u64 {
228 self.access_clock = self.access_clock.wrapping_add(1);
229 self.access_clock
230 }
231
232 #[inline(always)]
233 pub(super) fn record_lru_touch(&mut self, hash: u64, tick: u64) {
234 if tick == 0 || self.eviction_policy != EvictionPolicy::Lru {
235 return;
236 }
237 self.lru_touch_log.push_back(LruTouch { tick, hash });
238 self.compact_lru_touch_log_if_needed();
239 }
240
241 #[inline(always)]
242 fn compact_lru_touch_log_if_needed(&mut self) {
243 let max_log_len = self.entries.len().saturating_mul(4).max(1024);
244 if self.lru_touch_log.len() <= max_log_len {
245 return;
246 }
247 self.rebuild_lru_touch_log();
248 }
249
250 fn rebuild_lru_touch_log(&mut self) {
251 let mut touches = self
252 .entries
253 .iter()
254 .filter_map(|entry| match entry.access.last_touch {
255 0 => None,
256 tick => Some(LruTouch {
257 tick,
258 hash: entry.hash,
259 }),
260 })
261 .collect::<Vec<_>>();
262 touches.sort_unstable_by_key(|touch| touch.tick);
263 self.lru_touch_log = touches.into();
264 }
265
266 #[inline(always)]
267 fn should_sample_read(&mut self) -> bool {
268 const READ_TOUCH_SAMPLE_MASK: u64 = 1023;
269 if self.eviction_policy == EvictionPolicy::None {
270 return false;
271 }
272 if self.memory_limit_bytes.is_none() {
273 return false;
274 }
275 let limit = self.memory_limit_bytes.unwrap();
276 let watermark = limit.saturating_mul(3) / 4;
277 if self.stored_bytes < watermark.max(1) {
278 return false;
279 }
280 self.read_sample_counter = self.read_sample_counter.wrapping_add(1);
281 (self.read_sample_counter & READ_TOUCH_SAMPLE_MASK) == 0
282 }
283
284 pub(super) fn enforce_memory_limit(&mut self, now_ms: u64) {
285 let Some(limit) = self.memory_limit_bytes else {
286 return;
287 };
288 if self.stored_bytes <= limit {
289 return;
290 }
291 if self.ttl_entries > 0 {
292 self.process_maintenance(now_ms);
293 }
294 self.evict_to_memory_target(self.eviction_policy, now_ms, eviction_target_bytes(limit));
295 }
296
297 pub(crate) fn eviction_candidate(
298 &self,
299 policy: EvictionPolicy,
300 ) -> Option<(EvictionRank, u64, Bytes)> {
301 if policy == EvictionPolicy::None || self.entries.is_empty() {
302 return None;
303 }
304
305 let mut selected: Option<(EvictionRank, u64, &[u8])> = None;
306 for entry in self.entries.iter() {
307 let candidate = (entry.access.rank(policy), entry.hash, entry.key.as_ref());
308 selected = match selected {
309 Some(current) if current.0 <= candidate.0 => Some(current),
310 _ => Some(candidate),
311 };
312 }
313 selected.map(|(rank, hash, key)| (rank, hash, key.to_vec()))
314 }
315
316 pub(crate) fn evict_with_policy(&mut self, policy: EvictionPolicy, now_ms: u64) -> bool {
317 let Some((_rank, hash, key)) = self.eviction_candidate(policy) else {
318 return false;
319 };
320 self.delete_hashed_internal(hash, &key, now_ms, DeleteReason::Evicted)
321 }
322
323 pub(crate) fn evict_to_memory_target(
324 &mut self,
325 policy: EvictionPolicy,
326 now_ms: u64,
327 target_bytes: usize,
328 ) -> bool {
329 if policy == EvictionPolicy::None || self.entries.is_empty() {
330 return false;
331 }
332
333 if policy == EvictionPolicy::Lru {
334 let evicted = self.evict_lru_from_touch_log(now_ms, target_bytes);
335 if self.stored_bytes <= target_bytes || self.entries.is_empty() {
336 return evicted;
337 }
338 }
339
340 let mut evicted = false;
341 while self.stored_bytes > target_bytes {
342 let mut candidates = self.eviction_candidates_for_target(policy, target_bytes);
343 if candidates.is_empty() {
344 break;
345 }
346 candidates.sort_unstable_by_key(|candidate| candidate.rank);
347 let mut evicted_batch = false;
348 for candidate in candidates {
349 if self.stored_bytes <= target_bytes {
350 break;
351 }
352 evicted_batch |= self.delete_hashed_internal(
353 candidate.hash,
354 &candidate.key,
355 now_ms,
356 DeleteReason::Evicted,
357 );
358 }
359 if !evicted_batch {
360 break;
361 }
362 evicted = true;
363 }
364 evicted
365 }
366
367 fn evict_lru_from_touch_log(&mut self, _now_ms: u64, target_bytes: usize) -> bool {
368 let mut evicted = false;
369 while self.stored_bytes > target_bytes {
370 let Some(touch) = self.lru_touch_log.pop_front() else {
371 break;
372 };
373 let Some(entry) = self
374 .entries
375 .find_entry(touch.hash, |entry| entry.access.last_touch == touch.tick)
376 .ok()
377 else {
378 continue;
379 };
380 let removed_key_len = entry.get().key.len();
381 let removed_value_len = entry.get().value.len();
382 let had_ttl = entry.get().expire_at_ms.is_some();
383 let (removed, _) = entry.remove();
384 if had_ttl {
385 self.ttl_entries = self.ttl_entries.saturating_sub(1);
386 }
387 self.stored_bytes = self
388 .stored_bytes
389 .saturating_sub(removed_key_len.saturating_add(removed_value_len));
390 self.retire_value(removed.value);
391 self.evictions = self.evictions.saturating_add(1);
392 #[cfg(feature = "telemetry")]
393 self.record_delete_metrics(
394 DeleteReason::Evicted,
395 -1,
396 -((removed_key_len + removed_value_len) as isize),
397 );
398 evicted = true;
399 }
400 evicted
401 }
402
403 fn eviction_candidates_for_target(
404 &self,
405 policy: EvictionPolicy,
406 target_bytes: usize,
407 ) -> Vec<EvictionCandidate> {
408 let target_count = self.eviction_candidate_count(target_bytes);
409 if target_count == 0 {
410 return Vec::new();
411 }
412
413 let mut candidates = BinaryHeap::with_capacity(target_count);
414 for entry in self.entries.iter() {
415 let rank = entry.access.rank(policy);
416 if candidates.len() < target_count {
417 candidates.push(EvictionCandidate {
418 rank,
419 hash: entry.hash,
420 key: entry.key.as_ref().to_vec(),
421 });
422 continue;
423 }
424
425 let Some(mut warmest_candidate) = candidates.peek_mut() else {
426 continue;
427 };
428 if rank < warmest_candidate.rank {
429 *warmest_candidate = EvictionCandidate {
430 rank,
431 hash: entry.hash,
432 key: entry.key.as_ref().to_vec(),
433 };
434 }
435 }
436 candidates.into_vec()
437 }
438
439 fn eviction_candidate_count(&self, target_bytes: usize) -> usize {
440 let bytes_to_free = self.stored_bytes.saturating_sub(target_bytes);
441 if bytes_to_free == 0 || self.entries.is_empty() {
442 return 0;
443 }
444
445 let average_entry_bytes = (self.stored_bytes / self.entries.len()).max(1);
446 let estimated_count = bytes_to_free.div_ceil(average_entry_bytes);
447 let safety_margin = (estimated_count / 8).saturating_add(8);
448 estimated_count
449 .saturating_add(safety_margin)
450 .min(self.entries.len())
451 }
452
/// Crate-visible entry point to the module-level `eviction_target_bytes`
/// helper: returns the byte total eviction aims for under `limit`.
pub(crate) fn eviction_target_bytes(limit: usize) -> usize {
    // Inside the impl, the unqualified name resolves to the free function.
    eviction_target_bytes(limit)
}
456
/// Reports a completed write to the attached telemetry.
///
/// Nothing is recorded unless telemetry is attached and `start` is
/// `Some`. Otherwise the write (with its elapsed nanoseconds), the key and
/// memory deltas, and the shard's current key count are published.
#[cfg(feature = "telemetry")]
#[inline(always)]
pub(super) fn record_set_metrics(
    &self,
    written_len: usize,
    key_delta: isize,
    memory_delta: isize,
    start: Option<Instant>,
) {
    let Some(telemetry) = &self.telemetry else {
        return;
    };
    let Some(start) = start else {
        return;
    };

    let shard_id = telemetry.shard_id;
    let metrics = &telemetry.metrics;
    let elapsed_nanos = start.elapsed().as_nanos() as u64;
    metrics.record_set(shard_id, written_len, elapsed_nanos);
    metrics.adjust_keys_total(shard_id, key_delta);
    metrics.adjust_memory_bytes(shard_id, memory_delta);
    metrics.set_shard_keys(shard_id, self.len());
}
483
/// Reports a removed entry to the attached telemetry, if any.
///
/// Explicit deletes and expirations each bump their own counter; evictions
/// bump none here. In every case the key and memory deltas and the shard's
/// current key count are published.
#[cfg(feature = "telemetry")]
#[inline(always)]
pub(super) fn record_delete_metrics(
    &self,
    reason: DeleteReason,
    key_delta: isize,
    memory_delta: isize,
) {
    let Some(telemetry) = &self.telemetry else {
        return;
    };

    let shard_id = telemetry.shard_id;
    let metrics = &telemetry.metrics;
    match reason {
        DeleteReason::Explicit => metrics.record_delete(shard_id),
        DeleteReason::Expired => metrics.record_expiration(1),
        DeleteReason::Evicted => {}
    }
    metrics.adjust_keys_total(shard_id, key_delta);
    metrics.adjust_memory_bytes(shard_id, memory_delta);
    metrics.set_shard_keys(shard_id, self.len());
}
509
/// Publishes the map's current key count and stored byte total to the
/// attached telemetry, if any.
///
/// The totals are applied as deltas through `adjust_keys_total` and
/// `adjust_memory_bytes`.
#[cfg(feature = "telemetry")]
#[inline(always)]
fn sync_metrics_state(&self) {
    let Some(telemetry) = &self.telemetry else {
        return;
    };

    let shard_id = telemetry.shard_id;
    let metrics = &telemetry.metrics;
    let key_count = self.len();
    metrics.set_shard_keys(shard_id, key_count);
    metrics.adjust_keys_total(shard_id, key_count as isize);
    metrics.adjust_memory_bytes(shard_id, self.stored_bytes as isize);
}
525}
526
/// Returns the byte total that eviction should bring the map down to for a given `limit`.
///
/// Limits of 4096 bytes or less are targeted exactly. Larger limits get a
/// target one twentieth below the limit.
fn eviction_target_bytes(limit: usize) -> usize {
    const EXACT_EVICTION_LIMIT_BYTES: usize = 4096;
    if limit > EXACT_EVICTION_LIMIT_BYTES {
        let headroom = (limit / 20).max(1);
        limit.saturating_sub(headroom)
    } else {
        limit
    }
}