kevy_store/accounting.rs
1//! Internal accounting helpers on [`Store`]: per-entry weight bookkeeping,
2//! LRU/LFU clock advance, prefetch, and the lazy-expire `live_entry` /
3//! `live_entry_mut` lookups used by every typed accessor.
4//!
5//! Split out of [`crate`] for file-size hygiene. Nothing here is part of
6//! the public surface — all methods are `pub(crate)` and called by sibling
7//! modules (string/hash/list/set/zset/evict/expire/keyspace).
8
9use kevy_hash::KevyHash;
10
11use crate::value::ENTRY_OVERHEAD;
12use crate::{Entry, SmallBytes, Store, apply_delta, evict, key_heap_bytes_for};
13
14impl Store {
15 /// Insert a fresh entry, replacing any prior. Stamps `entry.weight` from
16 /// the live value and key, then updates `used_memory` for either the
17 /// new-key (charges [`ENTRY_OVERHEAD`]) or overwrite (weight swap) case.
18 pub(crate) fn insert_entry(&mut self, key: SmallBytes, mut entry: Entry) -> Option<Entry> {
19 entry.set_weight(key.heap_bytes() as u64 + entry.value.weight());
20 if self.maxmemory > 0 {
21 self.tick_clock();
22 entry.set_lru_clock(self.clock_counter as u32);
23 }
24 let new_w = entry.weight();
25 let new_has_ttl = entry.expire_at_ns.is_some();
26 let prev = self.map.insert(key, entry);
27 match &prev {
28 Some(old) => {
29 self.used_memory = self
30 .used_memory
31 .saturating_sub(old.weight())
32 .saturating_add(new_w);
33 }
34 None => {
35 self.used_memory = self.used_memory.saturating_add(new_w + ENTRY_OVERHEAD);
36 }
37 }
38 let old_has_ttl = prev.as_ref().is_some_and(|o| o.expire_at_ns.is_some());
39 self.adjust_expires(i64::from(new_has_ttl) - i64::from(old_has_ttl));
40 self.update_peak();
41 prev
42 }
43
44 /// Remove a key, returning the displaced entry (`None` if absent).
45 /// Frees the entry's cached weight + [`ENTRY_OVERHEAD`].
46 pub(crate) fn remove_entry(&mut self, key: &[u8]) -> Option<Entry> {
47 let old = self.map.remove(key)?;
48 self.used_memory = self
49 .used_memory
50 .saturating_sub(old.weight() + ENTRY_OVERHEAD);
51 if old.expire_at_ns.is_some() {
52 self.adjust_expires(-1);
53 }
54 Some(old)
55 }
56
57 /// Apply a signed weight delta to `key`'s cached `Entry::weight` AND to
58 /// the shard-wide `used_memory`. Used by in-place collection mutators
59 /// (HSET adding a field, LPUSH adding an item, …) so we account in O(1)
60 /// without re-walking the container.
61 pub(crate) fn account_delta(&mut self, key: &[u8], delta: i64) {
62 if delta == 0 {
63 return;
64 }
65 if let Some(e) = self.map.get_mut(key) {
66 e.add_to_weight(delta);
67 }
68 apply_delta(&mut self.used_memory, delta);
69 if delta > 0 {
70 self.update_peak();
71 }
72 }
73
74 /// Recompute `weight` for the entry at `key` from its current value +
75 /// key, then propagate the delta to `used_memory`. Use after a wholesale
76 /// in-place value swap (SET / APPEND / INCRBYFLOAT) where the prior
77 /// `Value`'s weight was already cached on the entry.
78 pub(crate) fn reweigh_entry(&mut self, key: &[u8]) {
79 let key_heap = key_heap_bytes_for(key);
80 let Some(e) = self.map.get_mut(key) else {
81 return;
82 };
83 let new_w = key_heap + e.value.weight();
84 let delta = new_w as i64 - e.weight() as i64;
85 e.set_weight(new_w);
86 apply_delta(&mut self.used_memory, delta);
87 if delta > 0 {
88 self.update_peak();
89 }
90 }
91
92 /// Advance the global access ordinal by one tick. Only invoked under
93 /// `maxmemory > 0` so the wrapping_add cost stays out of the unlimited
94 /// fast path.
95 #[inline]
96 pub(crate) fn tick_clock(&mut self) {
97 self.clock_counter = self.clock_counter.wrapping_add(1);
98 }
99
100 #[inline]
101 fn update_peak(&mut self) {
102 if self.used_memory > self.used_memory_peak {
103 self.used_memory_peak = self.used_memory;
104 }
105 }
106
107 /// Apply a weight delta computed in-place by a caller that already held
108 /// `&mut Entry` (overwrite-SET fast path) — same arithmetic as
109 /// [`Self::reweigh_entry`] but WITHOUT the second hash + map probe that
110 /// `reweigh_entry(key)` pays to re-find the entry it just mutated.
111 #[inline]
112 pub(crate) fn apply_weight_delta(&mut self, delta: i64) {
113 apply_delta(&mut self.used_memory, delta);
114 if delta > 0 {
115 self.update_peak();
116 }
117 }
118
119 /// Hint the CPU to fetch the bucket cache line for `key` into L1. Called
120 /// by the reactor's parse loop on command N+1 while command N is still
121 /// being dispatched — by the time N+1 actually probes the table, the
122 /// metadata line is hot. No-op when the table is empty. Cheap when not.
123 #[inline]
124 pub fn prefetch_for_key(&self, key: &[u8]) {
125 let hash = key.kevy_hash();
126 self.map.prefetch_for_hash(hash);
127 }
128
129 pub(crate) fn expired(&self, key: &[u8], now: u64) -> bool {
130 match self.map.get(key) {
131 Some(e) => e.is_expired_at(now),
132 None => false,
133 }
134 }
135
136 /// Drop `key` if expired; returns whether it is live afterwards. `now` is
137 /// monotonic ns since epoch (from [`crate::now_ns`]).
138 pub(crate) fn reap(&mut self, key: &[u8], now: u64) -> bool {
139 if self.expired(key, now) {
140 self.remove_entry(key);
141 self.expired_keys_total = self.expired_keys_total.saturating_add(1);
142 false
143 } else {
144 self.map.contains_key(key)
145 }
146 }
147
148 /// Single-lookup lazy-expiring read: the live `Entry` for `key`, or `None` if
149 /// absent or expired (expired keys are dropped here, as `reap` would).
150 ///
151 /// Two wins over the old `reap(now)`-then-`get` read path: (1) the clock is
152 /// read **only when the entry actually carries a TTL** — most keys don't, so
153 /// the common hit skips `Instant::now()` (~20–40 ns); (2) one fewer keyspace
154 /// lookup on hits (was peek-expiry + `contains_key` + `get` = 3; now peek +
155 /// `get` = 2). The two-phase shape (decide, then mutate/fetch) keeps the
156 /// borrow checker happy without an owning key clone.
157 pub(crate) fn live_entry(&mut self, key: &[u8]) -> Option<&Entry> {
158 let (uc, cn) = (self.cached_clock, self.cached_ns);
159 let expired = match self.map.get(key) {
160 None => return None,
161 Some(e) => e.is_expired(uc, cn),
162 };
163 if expired {
164 self.remove_entry(key);
165 self.expired_keys_total = self.expired_keys_total.saturating_add(1);
166 return None;
167 }
168 if self.maxmemory > 0 {
169 self.tick_clock();
170 let c = self.clock_counter as u32;
171 let e = self.map.get_mut(key)?;
172 evict::touch_on_access(e, self.eviction_policy, c);
173 return Some(&*e);
174 }
175 self.map.get(key)
176 }
177
178 /// Mutable [`live_entry`](Self::live_entry): the live `Entry` for `key` by
179 /// `&mut`, or `None` if absent/expired (expired dropped). Same wins — clock
180 /// read only on TTL'd keys, one fewer lookup than `reap`-then-`get_mut`.
181 /// Read-modify commands (INCR/APPEND/…) get the entry once and mutate in
182 /// place, preserving any TTL on it.
183 pub(crate) fn live_entry_mut(&mut self, key: &[u8]) -> Option<&mut Entry> {
184 let (uc, cn) = (self.cached_clock, self.cached_ns);
185 let expired = match self.map.get(key) {
186 None => return None,
187 Some(e) => e.is_expired(uc, cn),
188 };
189 if expired {
190 self.remove_entry(key);
191 self.expired_keys_total = self.expired_keys_total.saturating_add(1);
192 return None;
193 }
194 if self.maxmemory > 0 {
195 self.tick_clock();
196 let c = self.clock_counter as u32;
197 let e = self.map.get_mut(key)?;
198 evict::touch_on_access(e, self.eviction_policy, c);
199 return Some(e);
200 }
201 self.map.get_mut(key)
202 }
203}