1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
//! Internal accounting helpers on [`Store`]: per-entry weight bookkeeping,
//! LRU/LFU clock advance, prefetch, and the lazy-expire `live_entry` /
//! `live_entry_mut` lookups used by every typed accessor.
//!
//! Split out of [`crate`] for file-size hygiene. Nothing here is part of
//! the public surface — all methods are `pub(crate)` and called by sibling
//! modules (string/hash/list/set/zset/evict/expire/keyspace).
use std::time::Instant;
use kevy_hash::KevyHash;
use crate::value::ENTRY_OVERHEAD;
use crate::{Entry, SmallBytes, Store, apply_delta, evict, key_heap_bytes_for};
impl Store {
/// Insert a fresh entry, replacing any prior. Stamps `entry.weight` from
/// the live value and key, then updates `used_memory` for either the
/// new-key (charges [`ENTRY_OVERHEAD`]) or overwrite (weight swap) case.
pub(crate) fn insert_entry(&mut self, key: SmallBytes, mut entry: Entry) -> Option<Entry> {
entry.set_weight(key.heap_bytes() as u64 + entry.value.weight());
if self.maxmemory > 0 {
self.tick_clock();
entry.set_lru_clock(self.clock_counter as u32);
}
let new_w = entry.weight();
let prev = self.map.insert(key, entry);
match &prev {
Some(old) => {
self.used_memory = self
.used_memory
.saturating_sub(old.weight())
.saturating_add(new_w);
}
None => {
self.used_memory = self.used_memory.saturating_add(new_w + ENTRY_OVERHEAD);
}
}
self.update_peak();
prev
}
/// Remove a key, returning the displaced entry (`None` if absent).
/// Frees the entry's cached weight + [`ENTRY_OVERHEAD`].
pub(crate) fn remove_entry(&mut self, key: &[u8]) -> Option<Entry> {
let old = self.map.remove(key)?;
self.used_memory = self
.used_memory
.saturating_sub(old.weight() + ENTRY_OVERHEAD);
Some(old)
}
/// Apply a signed weight delta to `key`'s cached `Entry::weight` AND to
/// the shard-wide `used_memory`. Used by in-place collection mutators
/// (HSET adding a field, LPUSH adding an item, …) so we account in O(1)
/// without re-walking the container.
pub(crate) fn account_delta(&mut self, key: &[u8], delta: i64) {
if delta == 0 {
return;
}
if let Some(e) = self.map.get_mut(key) {
e.add_to_weight(delta);
}
apply_delta(&mut self.used_memory, delta);
if delta > 0 {
self.update_peak();
}
}
/// Recompute `weight` for the entry at `key` from its current value +
/// key, then propagate the delta to `used_memory`. Use after a wholesale
/// in-place value swap (SET / APPEND / INCRBYFLOAT) where the prior
/// `Value`'s weight was already cached on the entry.
pub(crate) fn reweigh_entry(&mut self, key: &[u8]) {
let key_heap = key_heap_bytes_for(key);
let Some(e) = self.map.get_mut(key) else {
return;
};
let new_w = key_heap + e.value.weight();
let delta = new_w as i64 - e.weight() as i64;
e.set_weight(new_w);
apply_delta(&mut self.used_memory, delta);
if delta > 0 {
self.update_peak();
}
}
/// Advance the global access ordinal by one tick. Only invoked under
/// `maxmemory > 0` so the wrapping_add cost stays out of the unlimited
/// fast path.
#[inline]
pub(crate) fn tick_clock(&mut self) {
self.clock_counter = self.clock_counter.wrapping_add(1);
}
#[inline]
fn update_peak(&mut self) {
if self.used_memory > self.used_memory_peak {
self.used_memory_peak = self.used_memory;
}
}
/// Hint the CPU to fetch the bucket cache line for `key` into L1. Called
/// by the reactor's parse loop on command N+1 while command N is still
/// being dispatched — by the time N+1 actually probes the table, the
/// metadata line is hot. No-op when the table is empty. Cheap when not.
#[inline]
pub fn prefetch_for_key(&self, key: &[u8]) {
let hash = key.kevy_hash();
self.map.prefetch_for_hash(hash);
}
pub(crate) fn expired(&self, key: &[u8], now: Instant) -> bool {
match self.map.get(key) {
Some(e) => e.is_expired_at(now),
None => false,
}
}
/// Drop `key` if expired; returns whether it is live afterwards.
pub(crate) fn reap(&mut self, key: &[u8], now: Instant) -> bool {
if self.expired(key, now) {
self.remove_entry(key);
self.expired_keys_total = self.expired_keys_total.saturating_add(1);
false
} else {
self.map.contains_key(key)
}
}
/// Single-lookup lazy-expiring read: the live `Entry` for `key`, or `None` if
/// absent or expired (expired keys are dropped here, as `reap` would).
///
/// Two wins over the old `reap(now)`-then-`get` read path: (1) the clock is
/// read **only when the entry actually carries a TTL** — most keys don't, so
/// the common hit skips `Instant::now()` (~20–40 ns); (2) one fewer keyspace
/// lookup on hits (was peek-expiry + `contains_key` + `get` = 3; now peek +
/// `get` = 2). The two-phase shape (decide, then mutate/fetch) keeps the
/// borrow checker happy without an owning key clone.
pub(crate) fn live_entry(&mut self, key: &[u8]) -> Option<&Entry> {
let expired = match self.map.get(key) {
None => return None,
Some(e) => e.is_expired_at(Instant::now()),
};
if expired {
self.remove_entry(key);
self.expired_keys_total = self.expired_keys_total.saturating_add(1);
return None;
}
if self.maxmemory > 0 {
self.tick_clock();
let c = self.clock_counter as u32;
let e = self.map.get_mut(key)?;
evict::touch_on_access(e, self.eviction_policy, c);
return Some(&*e);
}
self.map.get(key)
}
/// Mutable [`live_entry`](Self::live_entry): the live `Entry` for `key` by
/// `&mut`, or `None` if absent/expired (expired dropped). Same wins — clock
/// read only on TTL'd keys, one fewer lookup than `reap`-then-`get_mut`.
/// Read-modify commands (INCR/APPEND/…) get the entry once and mutate in
/// place, preserving any TTL on it.
pub(crate) fn live_entry_mut(&mut self, key: &[u8]) -> Option<&mut Entry> {
let expired = match self.map.get(key) {
None => return None,
Some(e) => e.is_expired_at(Instant::now()),
};
if expired {
self.remove_entry(key);
self.expired_keys_total = self.expired_keys_total.saturating_add(1);
return None;
}
if self.maxmemory > 0 {
self.tick_clock();
let c = self.clock_counter as u32;
let e = self.map.get_mut(key)?;
evict::touch_on_access(e, self.eviction_policy, c);
return Some(e);
}
self.map.get_mut(key)
}
}