Skip to main content

kevy_store/
lib.rs

1//! kevy-store — the keyspace.
2//!
3//! A single-threaded, multi-type keyspace with lazy expiration. Each Redis data
4//! type is backed by a modern `std` structure — behaviour-compatible, but **not**
5//! Redis's legacy encodings:
6//!
7//! | Type | Backing structure |
8//! |------|-------------------|
9//! | String | `Vec<u8>` |
10//! | Hash / Set | `HashMap` / `HashSet` (hashbrown Swiss table) |
11//! | List | `VecDeque` (ring buffer, O(1) ends) |
12//! | Sorted set | `HashMap` + `BTreeSet<(score, member)>` (a B-tree, not a skiplist) |
13//!
14//! Wrong-type access returns [`StoreError::WrongType`]. The API is `&mut self`
15//! and lock-free, so a thread-per-core runtime ([kevy-rt]) can own one shard per
16//! core with no locking. Part of the [kevy] key–value server.
17//!
18//! `maxmemory` enforcement + 8 eviction policies live in [`evict`]; toggle via
19//! [`Store::set_max_memory`]. With `maxmemory == 0` (the default) the hot-path
20//! cost collapses to a single predicted-not-taken branch, matching the
21//! "unlimited" mode in Redis byte-for-byte.
22//!
23//! [kevy]: https://crates.io/crates/kevy
24//! [kevy-rt]: https://crates.io/crates/kevy-rt
25//!
26//! # Example
27//!
28//! ```
29//! use kevy_store::Store;
30//!
31//! let mut s = Store::new();
32//! s.set(b"greeting", b"hello".to_vec(), None, false, false);
33//! assert_eq!(s.get(b"greeting").unwrap(), Some(&b"hello"[..]));
34//!
35//! s.hset(b"user:1", &[(b"name".to_vec(), b"alice".to_vec())]).unwrap();
36//! assert_eq!(s.hget(b"user:1", b"name").unwrap(), Some(&b"alice"[..]));
37//!
38//! // A string command on a hash key is a type error, as in Redis.
39//! assert_eq!(s.get(b"user:1"), Err(kevy_store::StoreError::WrongType));
40//! ```
41#![forbid(unsafe_code)]
42
43mod accounting;
44pub mod evict;
45pub mod expire;
46pub use expire::ExpireStats;
47mod hash;
48mod keyspace;
49mod list;
50mod set;
51mod stream;
52mod string;
53mod util;
54mod value;
55mod zset;
56pub use stream::{
57    AutoclaimResult, ConsumerGroup, ConsumerState, EntryBatch, GroupCreateMode,
58    LoadedStreamEntry, PelEntry, PendingExtended, PendingExtendedRow, PendingSummary,
59    ReadGroupId, StreamData, StreamId, StreamIdError, XAddIdSpec, XClaimOpts,
60    now_unix_ms, parse_explicit_id, parse_range_end, parse_range_start, parse_xadd_id,
61};
62pub use util::glob_match;
63pub use value::*;
64
65use kevy_map::KevyMap;
66use std::num::NonZeroU64;
67use std::sync::OnceLock;
68use std::time::{Duration, Instant};
69
/// Process-start anchor shared by every packed deadline.
///
/// Each `Entry::expire_at_ns` is a nanosecond offset from this `Instant`,
/// stored as `Option<NonZeroU64>` so the niche optimisation keeps the field
/// at 8 bytes. A `u64` of nanoseconds spans roughly 584 years from the
/// anchor.
///
/// The anchor is captured lazily on the first call and is stable afterwards.
fn epoch() -> Instant {
    static EPOCH: OnceLock<Instant> = OnceLock::new();
    *EPOCH.get_or_init(Instant::now)
}

/// Encode an absolute `Instant` as nanoseconds since [`epoch`].
///
/// The offset is clamped into `1..=u64::MAX`:
///
/// - A deadline at or before the anchor packs to `1` rather than `None`.
///   `None` means "no expiry" to the rest of the store, so collapsing an
///   already-due deadline onto it would silently make the key persistent.
/// - A deadline further out than a `u64` of nanoseconds can express
///   saturates at `u64::MAX` instead of wrapping to an arbitrary earlier
///   instant.
///
/// The return type stays `Option<NonZeroU64>` so the function still composes
/// with `Option::and_then`; with the clamp in place it always yields `Some`.
#[inline]
fn pack_deadline(t: Instant) -> Option<NonZeroU64> {
    let nanos = t.saturating_duration_since(epoch()).as_nanos();
    // The `min` bounds the value to `u64::MAX`, so the narrowing cast below
    // cannot truncate.
    let clamped = nanos.min(u128::from(u64::MAX)).max(1) as u64;
    NonZeroU64::new(clamped)
}

/// Decode a packed deadline back into an `Instant` for the paths that need
/// real-clock math (the original notes name `pttl` and snapshot dump).
#[inline]
fn unpack_deadline(ns: NonZeroU64) -> Instant {
    epoch() + Duration::from_nanos(ns.get())
}
95
/// Per-entry weight ceiling.
///
/// The cached weight field is a `u32`, so per-entry accounting saturates at
/// `u32::MAX` bytes (4 GiB). Once a single hash / list / zset grows past
/// that, `MEMORY USAGE` and the maxmemory accounting under-report that one
/// entry by the overflow amount. This is an accepted v1.0 tradeoff: it keeps
/// `Entry` at 48 bytes (vs 56 with a `u64` weight).
const WEIGHT_MAX: u32 = u32::MAX;
103
/// Per-key entry, packed to 48 bytes (vs 64 in the original
/// `Value + Option<Instant> + u64 weight + u32 clock + 4 pad` layout):
///
/// - `value`: 32 bytes (boxed-collection enum).
/// - `expire_at_ns`: `Option<NonZeroU64>` holding nanoseconds since process
///   start. The niche optimisation makes this 8 bytes, not the 16 a bare
///   `Option<Instant>` would cost. `None` means the key never expires.
/// - `weight`: `u32`. Cached `key.heap_bytes() + value.weight()` for
///   O(1) eviction & `MEMORY USAGE`. Saturates at 4 GiB per entry.
/// - `lru_clock`: `u32`. LRU = monotonic op counter; LFU = packed
///   `[16-bit decay-tick | 8-bit log-counter]`. Only updated when
///   `Store::maxmemory > 0`.
///
/// Storage saving over the original layout: 16 bytes per entry = 25 %.
/// For a 1 M-key shard that's ~16 MB of RSS back.
pub(crate) struct Entry {
    /// The stored value.
    pub(crate) value: Value,
    /// Packed deadline (ns since process start); `None` = no expiry.
    pub(crate) expire_at_ns: Option<NonZeroU64>,
    /// Cached byte weight, saturating at `WEIGHT_MAX`.
    pub(crate) weight: u32,
    /// LRU / LFU bookkeeping word used by eviction.
    pub(crate) lru_clock: u32,
}
125
126impl Entry {
127    /// Build a fresh entry with weight + lru_clock uninitialised (the
128    /// caller — usually [`Store::insert_entry`] — will compute and stamp them).
129    #[inline]
130    pub(crate) fn new(value: Value, expire_at: Option<Instant>) -> Self {
131        Self {
132            value,
133            expire_at_ns: expire_at.and_then(pack_deadline),
134            weight: 0,
135            lru_clock: 0,
136        }
137    }
138
139    /// Cached entry weight as a `u64` for arithmetic uniformity with the
140    /// `Store::used_memory: u64` accumulator. Zero-cost cast.
141    #[inline]
142    pub(crate) fn weight(&self) -> u64 {
143        self.weight as u64
144    }
145
146    /// LRU / LFU clock value (eviction-only).
147    #[inline]
148    pub(crate) fn lru_clock(&self) -> u32 {
149        self.lru_clock
150    }
151
152    /// Overwrite the cached weight, saturating at the 4 GiB ceiling.
153    #[inline]
154    pub(crate) fn set_weight(&mut self, w: u64) {
155        self.weight = w.min(WEIGHT_MAX as u64) as u32;
156    }
157
158    /// Overwrite the LRU/LFU clock field.
159    #[inline]
160    pub(crate) fn set_lru_clock(&mut self, c: u32) {
161        self.lru_clock = c;
162    }
163
164    /// Apply a signed delta to the cached weight (saturating both directions).
165    #[inline]
166    pub(crate) fn add_to_weight(&mut self, delta: i64) {
167        if delta == 0 {
168            return;
169        }
170        let cur = self.weight as u64;
171        let new = if delta >= 0 {
172            cur.saturating_add(delta as u64)
173        } else {
174            cur.saturating_sub((-delta) as u64)
175        };
176        self.weight = new.min(WEIGHT_MAX as u64) as u32;
177    }
178
179    /// Is the entry past its deadline as of `now`? `None` deadline =
180    /// never. Combines the two-step compare into one branch on the
181    /// niche-optimised `Option`.
182    #[inline]
183    pub(crate) fn is_expired_at(&self, now: Instant) -> bool {
184        match self.expire_at_ns {
185            None => false,
186            Some(ns) => unpack_deadline(ns) <= now,
187        }
188    }
189}
190
191// Pin the Entry layout: 32 (Value) + 8 (expire_at_ns, niche-opt) + 8 (packed)
192// = 48 bytes. Any padding regression (e.g. someone re-adding a 4-byte field
193// without packing) is caught at compile time.
194const _: () = {
195    assert!(std::mem::size_of::<Entry>() == 48);
196};
197
/// Outcome of [`Store::rename`].
///
/// A three-way result so the dispatch layer can pick the right RESP frame:
/// `+OK`, `-ERR no such key`, or `:0` for `RENAMENX` with an existing
/// destination.
#[derive(Debug, PartialEq, Eq)]
pub enum RenameOutcome {
    /// Source removed, destination created (overwriting any prior dst).
    Renamed,
    /// Source key doesn't exist.
    NoSuchSrc,
    /// `RENAMENX` only — destination already exists, no rename done.
    DstExists,
}
210
/// Operation errors surfaced to the command layer.
///
/// Variants carry no payload, so values can be compared directly with `==`
/// (the type derives `PartialEq` / `Eq`).
#[derive(Debug, PartialEq, Eq)]
pub enum StoreError {
    /// Key holds a different type than the command expects.
    WrongType,
    /// Value is not a base-10 integer (INCR family).
    NotInteger,
    /// Result would overflow `i64`.
    Overflow,
    /// Index outside the collection (LSET).
    OutOfRange,
    /// Key does not exist where the command requires one (LSET).
    NoSuchKey,
    /// Value is not a valid float (INCRBYFLOAT).
    NotFloat,
    /// `maxmemory` would be exceeded and the active eviction policy is
    /// [`EvictionPolicy::NoEviction`]. Surfaces as Redis's classic OOM error
    /// at the RESP layer.
    OutOfMemory,
}
231
/// Maxmemory eviction policy.
///
/// Mirrors `kevy_config::EvictionPolicy`; the enum is duplicated here so
/// `kevy-store` stays a leaf crate with no `kevy-config` dependency.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EvictionPolicy {
    /// Refuse writes once `maxmemory` is hit. Default.
    #[default]
    NoEviction,
    /// Approximated LRU across all keys.
    AllKeysLru,
    /// Approximated LFU across all keys.
    AllKeysLfu,
    /// Random key across all keys.
    AllKeysRandom,
    /// Approximated LRU across keys with a TTL.
    VolatileLru,
    /// Approximated LFU across keys with a TTL.
    VolatileLfu,
    /// Random key from those with a TTL.
    VolatileRandom,
    /// Key with the shortest remaining TTL.
    VolatileTtl,
}

impl EvictionPolicy {
    /// `true` for the two LRU-ranked policies, where read-touches matter.
    #[inline]
    pub fn uses_lru(self) -> bool {
        self == Self::AllKeysLru || self == Self::VolatileLru
    }

    /// `true` for the two LFU-ranked policies, where read-touches and
    /// log-counter increments matter.
    #[inline]
    pub fn uses_lfu(self) -> bool {
        self == Self::AllKeysLfu || self == Self::VolatileLfu
    }

    /// `true` when eviction is restricted to keys that carry a TTL.
    #[inline]
    pub fn is_volatile(self) -> bool {
        match self {
            Self::VolatileLru
            | Self::VolatileLfu
            | Self::VolatileRandom
            | Self::VolatileTtl => true,
            Self::NoEviction
            | Self::AllKeysLru
            | Self::AllKeysLfu
            | Self::AllKeysRandom => false,
        }
    }
}
278
/// A single-database keyspace.
///
/// The keyspace map is a [`KevyMap`] — a pure-Rust open-addressing Swiss
/// table tuned for kevy's per-shard, single-trust-domain keyspace. The
/// hasher is [`kevy_hash::KevyHash`] (one-call inlinable; no DoS hardening
/// since the shard is single-threaded with no cross-trust keys). Owning the
/// table also exposes bucket addresses for software prefetch on the batch
/// driver.
///
/// `Default` yields an empty store with `maxmemory == 0` (unlimited) and the
/// default eviction policy.
#[derive(Default)]
pub struct Store {
    /// Key → entry table for this shard.
    pub(crate) map: KevyMap<SmallBytes, Entry>,
    /// Live byte estimate (dynamic per-entry weights + [`ENTRY_OVERHEAD`] per
    /// key). Compared against [`Self::maxmemory`] to drive eviction.
    pub(crate) used_memory: u64,
    /// Soft byte ceiling. `0` = unlimited; the entire accounting + eviction
    /// machinery short-circuits to a single not-taken branch in that case.
    pub(crate) maxmemory: u64,
    /// Active eviction policy. Only consulted when `used_memory > maxmemory`.
    pub(crate) eviction_policy: EvictionPolicy,
    /// Total keys evicted by [`Self::try_evict_after_write`] — surfaced via
    /// `INFO memory` / `MEMORY STATS`.
    pub(crate) evictions_total: u64,
    /// Monotonic access counter; the upper 32 bits are unused, the lower 32
    /// stamp `Entry::lru_clock` on each access while eviction is enabled.
    pub(crate) clock_counter: u64,
    /// `used_memory` peak across the shard's lifetime; surfaced as
    /// `used_memory_peak` in `INFO memory`.
    pub(crate) used_memory_peak: u64,
    /// Keys expired since startup (lazy reap path AND
    /// [`Self::tick_expire`]). Surfaced via `INFO keyspace` / `MEMORY STATS`
    /// once those fields land.
    pub(crate) expired_keys_total: u64,
    /// `WATCH` version counters — present only for keys that have been
    /// `WATCH`-ed at least once. [`Self::record_watch`] inserts the entry
    /// (version 0 = "never written since first watch"); every subsequent
    /// write on this shard calls [`Self::bump_if_watched`], which increments
    /// only if the key is present in the map. When nothing has ever been
    /// `WATCH`-ed, a write pays one lookup in an empty map (~10 ns).
    ///
    /// The map grows monotonically — entries are never removed, even
    /// when no conn is currently watching the key. For high-key-churn
    /// workloads this can become a memory item; accepted for v1.x since
    /// each entry is a `Vec<u8>` + `u64` (~ 30 B + key length) and is only
    /// touched on writes / WATCH calls.
    pub(crate) watch_versions: std::collections::HashMap<Vec<u8>, u64>,
}
325
326impl Store {
327    pub fn new() -> Self {
328        Store::default()
329    }
330
331    /// Install (or clear, with `maxmemory == 0`) the eviction limit and
332    /// policy. Cheap; safe to call repeatedly (e.g. on `CONFIG SET`).
333    #[inline]
334    pub fn set_max_memory(&mut self, maxmemory: u64, policy: EvictionPolicy) {
335        self.maxmemory = maxmemory;
336        self.eviction_policy = policy;
337    }
338
339    /// Live byte estimate (see field doc).
340    #[inline]
341    pub fn used_memory(&self) -> u64 {
342        self.used_memory
343    }
344
345    /// `used_memory` high-water mark since startup.
346    #[inline]
347    pub fn used_memory_peak(&self) -> u64 {
348        self.used_memory_peak
349    }
350
351    /// Configured `maxmemory` (0 = unlimited).
352    #[inline]
353    pub fn maxmemory(&self) -> u64 {
354        self.maxmemory
355    }
356
357    /// Configured eviction policy.
358    #[inline]
359    pub fn eviction_policy(&self) -> EvictionPolicy {
360        self.eviction_policy
361    }
362
363    /// Total keys evicted since startup.
364    #[inline]
365    pub fn evictions_total(&self) -> u64 {
366        self.evictions_total
367    }
368
369    /// `WATCH` — record this key in the version tracker and return its
370    /// current version. Subsequent writes on this shard bump the version
371    /// via [`Self::bump_if_watched`]. Caller (the conn's origin shard)
372    /// stores the returned version; `EXEC` later asks every owning shard
373    /// "is the version still N?" via [`Self::key_version`].
374    ///
375    /// Keys that have never been written stay at version 0 — the first
376    /// write after a `WATCH` bumps to 1, which is what makes the "dirty"
377    /// comparison work (stored 0 ≠ current 1 ⇒ abort EXEC).
378    pub fn record_watch(&mut self, key: &[u8]) -> u64 {
379        *self
380            .watch_versions
381            .entry(key.to_vec())
382            .or_insert(0)
383    }
384
385    /// Read-only version lookup used by `EXEC`'s pre-execution check.
386    /// Returns `0` for keys never `WATCH`-ed (matches the initial value
387    /// `record_watch` would have inserted, so a `WATCH` → no-write →
388    /// `EXEC` sequence sees the stored 0 == current 0 and proceeds).
389    #[inline]
390    pub fn key_version(&self, key: &[u8]) -> u64 {
391        self.watch_versions.get(key).copied().unwrap_or(0)
392    }
393
394    /// Bump the version of `key` if (and only if) it has been
395    /// `WATCH`-ed at least once. Called from the write side of
396    /// `exec_op` after every successful mutation. Cost when no key is
397    /// watched: one empty-map lookup (~10 ns); when watched: lookup +
398    /// in-place u64 increment.
399    #[inline]
400    pub fn bump_if_watched(&mut self, key: &[u8]) {
401        if let Some(v) = self.watch_versions.get_mut(key) {
402            *v = v.wrapping_add(1);
403        }
404    }
405
406    /// Invalidate every watched key in one shot. Called from `FLUSHDB`
407    /// / `FLUSHALL` execution paths — every WATCH against this shard
408    /// must invalidate so a pending `EXEC` aborts.
409    pub fn bump_all_watched(&mut self) {
410        for v in self.watch_versions.values_mut() {
411            *v = v.wrapping_add(1);
412        }
413    }
414
415    /// Cached weight of `key` (dynamic part + [`ENTRY_OVERHEAD`]). Returns
416    /// `None` when the key is absent or expired (no implicit reap).
417    pub fn estimate_key_bytes(&self, key: &[u8]) -> Option<u64> {
418        self.map.get(key).map(|e| e.weight() + ENTRY_OVERHEAD)
419    }
420
421    /// O(1) precondition check the dispatch layer calls before every write
422    /// command. Returns `Err(OutOfMemory)` only when `maxmemory > 0`, the
423    /// budget is already over, AND the policy is `NoEviction` (Redis
424    /// behaviour). All other policies let the write proceed and recover via
425    /// [`Self::try_evict_after_write`].
426    #[inline]
427    pub fn precheck_for_write(&self) -> Result<(), StoreError> {
428        if self.maxmemory == 0 || self.used_memory <= self.maxmemory {
429            return Ok(());
430        }
431        if self.eviction_policy == EvictionPolicy::NoEviction {
432            return Err(StoreError::OutOfMemory);
433        }
434        Ok(())
435    }
436
437    /// Run after every write command. No-op when disabled or under budget;
438    /// otherwise samples per [`Self::eviction_policy`] and removes keys until
439    /// back under `maxmemory` or no eligible candidate remains. Returns the
440    /// number of keys evicted (0 on the common fast path).
441    #[inline]
442    pub fn try_evict_after_write(&mut self) -> usize {
443        if self.maxmemory == 0 || self.used_memory <= self.maxmemory {
444            return 0;
445        }
446        evict::evict_until_under_limit(self)
447    }
448
449}
450
/// Apply a signed delta to a `u64`, saturating at `0` and `u64::MAX`.
///
/// Used by `Store::account_delta` / `reweigh_entry` so the in-place mutators
/// don't have to repeat the same overflow-guarded branch.
///
/// The magnitude is taken with `unsigned_abs`, so `delta == i64::MIN` is
/// handled without the overflow that negating it as an `i64` would trigger.
#[inline]
pub(crate) fn apply_delta(v: &mut u64, delta: i64) {
    let magnitude = delta.unsigned_abs();
    *v = if delta >= 0 {
        v.saturating_add(magnitude)
    } else {
        v.saturating_sub(magnitude)
    };
}
462
/// Heap bytes a `SmallBytes`-encoded key would own.
///
/// Mirrors `SmallBytes::heap_bytes` but works on a plain `&[u8]`, so it is
/// reachable from code that doesn't yet hold the typed `SmallBytes` (e.g.
/// `reweigh_entry`). Keys of up to 22 bytes count as zero heap bytes; longer
/// keys count their full length. The 22-byte inline boundary is shared with
/// the `kevy-bytes` crate.
#[inline]
pub(crate) fn key_heap_bytes_for(key: &[u8]) -> u64 {
    const INLINE_CAPACITY: usize = 22;
    match key.len() {
        len if len > INLINE_CAPACITY => len as u64,
        _ => 0,
    }
}
471
472#[cfg(test)]
473mod tests;