Skip to main content

kevy_store/
accounting.rs

1//! Internal accounting helpers on [`Store`]: per-entry weight bookkeeping,
2//! LRU/LFU clock advance, prefetch, and the lazy-expire `live_entry` /
3//! `live_entry_mut` lookups used by every typed accessor.
4//!
5//! Split out of [`crate`] for file-size hygiene. Nothing here is part of
6//! the public surface — all methods are `pub(crate)` and called by sibling
7//! modules (string/hash/list/set/zset/evict/expire/keyspace).
8
9use kevy_hash::KevyHash;
10
11use crate::value::ENTRY_OVERHEAD;
12use crate::{Entry, SmallBytes, Store, apply_delta, evict, key_heap_bytes_for};
13
14impl Store {
15    /// Insert a fresh entry, replacing any prior. Stamps `entry.weight` from
16    /// the live value and key, then updates `used_memory` for either the
17    /// new-key (charges [`ENTRY_OVERHEAD`]) or overwrite (weight swap) case.
18    pub(crate) fn insert_entry(&mut self, key: SmallBytes, mut entry: Entry) -> Option<Entry> {
19        entry.set_weight(key.heap_bytes() as u64 + entry.value.weight());
20        if self.maxmemory > 0 {
21            self.tick_clock();
22            entry.set_lru_clock(self.clock_counter as u32);
23        }
24        let new_w = entry.weight();
25        let new_has_ttl = entry.expire_at_ns.is_some();
26        let prev = self.map.insert(key, entry);
27        match &prev {
28            Some(old) => {
29                self.used_memory = self
30                    .used_memory
31                    .saturating_sub(old.weight())
32                    .saturating_add(new_w);
33            }
34            None => {
35                self.used_memory = self.used_memory.saturating_add(new_w + ENTRY_OVERHEAD);
36            }
37        }
38        let old_has_ttl = prev.as_ref().is_some_and(|o| o.expire_at_ns.is_some());
39        self.adjust_expires(i64::from(new_has_ttl) - i64::from(old_has_ttl));
40        self.update_peak();
41        prev
42    }
43
44    /// Remove a key, returning the displaced entry (`None` if absent).
45    /// Frees the entry's cached weight + [`ENTRY_OVERHEAD`].
46    pub(crate) fn remove_entry(&mut self, key: &[u8]) -> Option<Entry> {
47        let old = self.map.remove(key)?;
48        self.used_memory = self
49            .used_memory
50            .saturating_sub(old.weight() + ENTRY_OVERHEAD);
51        if old.expire_at_ns.is_some() {
52            self.adjust_expires(-1);
53        }
54        Some(old)
55    }
56
57    /// Apply a signed weight delta to `key`'s cached `Entry::weight` AND to
58    /// the shard-wide `used_memory`. Used by in-place collection mutators
59    /// (HSET adding a field, LPUSH adding an item, …) so we account in O(1)
60    /// without re-walking the container.
61    pub(crate) fn account_delta(&mut self, key: &[u8], delta: i64) {
62        if delta == 0 {
63            return;
64        }
65        if let Some(e) = self.map.get_mut(key) {
66            e.add_to_weight(delta);
67        }
68        apply_delta(&mut self.used_memory, delta);
69        if delta > 0 {
70            self.update_peak();
71        }
72    }
73
74    /// Recompute `weight` for the entry at `key` from its current value +
75    /// key, then propagate the delta to `used_memory`. Use after a wholesale
76    /// in-place value swap (SET / APPEND / INCRBYFLOAT) where the prior
77    /// `Value`'s weight was already cached on the entry.
78    pub(crate) fn reweigh_entry(&mut self, key: &[u8]) {
79        let key_heap = key_heap_bytes_for(key);
80        let Some(e) = self.map.get_mut(key) else {
81            return;
82        };
83        let new_w = key_heap + e.value.weight();
84        let delta = new_w as i64 - e.weight() as i64;
85        e.set_weight(new_w);
86        apply_delta(&mut self.used_memory, delta);
87        if delta > 0 {
88            self.update_peak();
89        }
90    }
91
92    /// Advance the global access ordinal by one tick. Only invoked under
93    /// `maxmemory > 0` so the wrapping_add cost stays out of the unlimited
94    /// fast path.
95    #[inline]
96    pub(crate) fn tick_clock(&mut self) {
97        self.clock_counter = self.clock_counter.wrapping_add(1);
98    }
99
100    #[inline]
101    fn update_peak(&mut self) {
102        if self.used_memory > self.used_memory_peak {
103            self.used_memory_peak = self.used_memory;
104        }
105    }
106
107    /// Apply a weight delta computed in-place by a caller that already held
108    /// `&mut Entry` (overwrite-SET fast path) — same arithmetic as
109    /// [`Self::reweigh_entry`] but WITHOUT the second hash + map probe that
110    /// `reweigh_entry(key)` pays to re-find the entry it just mutated.
111    #[inline]
112    pub(crate) fn apply_weight_delta(&mut self, delta: i64) {
113        apply_delta(&mut self.used_memory, delta);
114        if delta > 0 {
115            self.update_peak();
116        }
117    }
118
119    /// Hint the CPU to fetch the bucket cache line for `key` into L1. Called
120    /// by the reactor's parse loop on command N+1 while command N is still
121    /// being dispatched — by the time N+1 actually probes the table, the
122    /// metadata line is hot. No-op when the table is empty. Cheap when not.
123    #[inline]
124    pub fn prefetch_for_key(&self, key: &[u8]) {
125        let hash = key.kevy_hash();
126        self.map.prefetch_for_hash(hash);
127    }
128
129    pub(crate) fn expired(&self, key: &[u8], now: u64) -> bool {
130        match self.map.get(key) {
131            Some(e) => e.is_expired_at(now),
132            None => false,
133        }
134    }
135
136    /// Drop `key` if expired; returns whether it is live afterwards. `now` is
137    /// monotonic ns since epoch (from [`crate::now_ns`]).
138    pub(crate) fn reap(&mut self, key: &[u8], now: u64) -> bool {
139        if self.expired(key, now) {
140            self.remove_entry(key);
141            self.expired_keys_total = self.expired_keys_total.saturating_add(1);
142            false
143        } else {
144            self.map.contains_key(key)
145        }
146    }
147
148    /// Single-lookup lazy-expiring read: the live `Entry` for `key`, or `None` if
149    /// absent or expired (expired keys are dropped here, as `reap` would).
150    ///
151    /// Two wins over the old `reap(now)`-then-`get` read path: (1) the clock is
152    /// read **only when the entry actually carries a TTL** — most keys don't, so
153    /// the common hit skips `Instant::now()` (~20–40 ns); (2) one fewer keyspace
154    /// lookup on hits (was peek-expiry + `contains_key` + `get` = 3; now peek +
155    /// `get` = 2). The two-phase shape (decide, then mutate/fetch) keeps the
156    /// borrow checker happy without an owning key clone.
157    pub(crate) fn live_entry(&mut self, key: &[u8]) -> Option<&Entry> {
158        let (uc, cn) = (self.cached_clock, self.cached_ns);
159        let expired = match self.map.get(key) {
160            None => return None,
161            Some(e) => e.is_expired(uc, cn),
162        };
163        if expired {
164            self.remove_entry(key);
165            self.expired_keys_total = self.expired_keys_total.saturating_add(1);
166            return None;
167        }
168        if self.maxmemory > 0 {
169            self.tick_clock();
170            let c = self.clock_counter as u32;
171            let e = self.map.get_mut(key)?;
172            evict::touch_on_access(e, self.eviction_policy, c);
173            return Some(&*e);
174        }
175        self.map.get(key)
176    }
177
178    /// Mutable [`live_entry`](Self::live_entry): the live `Entry` for `key` by
179    /// `&mut`, or `None` if absent/expired (expired dropped). Same wins — clock
180    /// read only on TTL'd keys, one fewer lookup than `reap`-then-`get_mut`.
181    /// Read-modify commands (INCR/APPEND/…) get the entry once and mutate in
182    /// place, preserving any TTL on it.
183    pub(crate) fn live_entry_mut(&mut self, key: &[u8]) -> Option<&mut Entry> {
184        let (uc, cn) = (self.cached_clock, self.cached_ns);
185        let expired = match self.map.get(key) {
186            None => return None,
187            Some(e) => e.is_expired(uc, cn),
188        };
189        if expired {
190            self.remove_entry(key);
191            self.expired_keys_total = self.expired_keys_total.saturating_add(1);
192            return None;
193        }
194        if self.maxmemory > 0 {
195            self.tick_clock();
196            let c = self.clock_counter as u32;
197            let e = self.map.get_mut(key)?;
198            evict::touch_on_access(e, self.eviction_policy, c);
199            return Some(e);
200        }
201        self.map.get_mut(key)
202    }
203}