kevy_store/lib.rs
1//! kevy-store — the keyspace.
2//!
3//! A single-threaded, multi-type keyspace with lazy expiration. Each Redis data
4//! type is backed by a modern `std` structure — behaviour-compatible, but **not**
5//! Redis's legacy encodings:
6//!
7//! | Type | Backing structure |
8//! |------|-------------------|
9//! | String | `Vec<u8>` |
10//! | Hash / Set | `HashMap` / `HashSet` (hashbrown Swiss table) |
11//! | List | `VecDeque` (ring buffer, O(1) ends) |
12//! | Sorted set | `HashMap` + `BTreeSet<(score, member)>` (a B-tree, not a skiplist) |
13//!
14//! Wrong-type access returns [`StoreError::WrongType`]. The API is `&mut self`
15//! and lock-free, so a thread-per-core runtime ([kevy-rt]) can own one shard per
16//! core with no locking. Part of the [kevy] key–value server.
17//!
18//! `maxmemory` enforcement + 8 eviction policies live in [`evict`]; toggle via
19//! [`Store::set_max_memory`]. With `maxmemory == 0` (the default) the hot-path
20//! cost collapses to a single predicted-not-taken branch, matching the
21//! "unlimited" mode in Redis byte-for-byte.
22//!
23//! [kevy]: https://crates.io/crates/kevy
24//! [kevy-rt]: https://crates.io/crates/kevy-rt
25//!
26//! # Example
27//!
28//! ```
29//! use kevy_store::Store;
30//!
31//! let mut s = Store::new();
32//! s.set(b"greeting", b"hello".to_vec(), None, false, false);
33//! assert_eq!(s.get(b"greeting").unwrap(), Some(&b"hello"[..]));
34//!
35//! s.hset(b"user:1", &[(b"name".to_vec(), b"alice".to_vec())]).unwrap();
36//! assert_eq!(s.hget(b"user:1", b"name").unwrap(), Some(&b"alice"[..]));
37//!
38//! // A string command on a hash key is a type error, as in Redis.
39//! assert_eq!(s.get(b"user:1"), Err(kevy_store::StoreError::WrongType));
40//! ```
41#![forbid(unsafe_code)]
42
43mod accounting;
44mod clock;
45mod entry;
46pub mod evict;
47pub mod expire;
48pub use expire::ExpireStats;
49pub(crate) use entry::Entry;
50mod hash;
51mod keyspace;
52mod list;
53mod set;
54mod snapshot;
55pub use snapshot::SnapshotView;
56mod stream;
57mod string;
58mod util;
59mod value;
60mod zset;
61pub use stream::{
62 AutoclaimResult, ConsumerGroup, ConsumerState, EntryBatch, GroupCreateMode,
63 LoadedGroup, LoadedPelEntry, LoadedStreamEntry, PelEntry, PendingExtended,
64 PendingExtendedRow, PendingSummary, ReadGroupId, StreamData, StreamId, StreamIdError,
65 XAddIdSpec, XClaimOpts, now_unix_ms, parse_explicit_id, parse_range_end,
66 parse_range_start, parse_xadd_id,
67};
68pub use util::glob_match;
69pub use value::*;
70
71pub(crate) use clock::{deadline_at, now_ns, pack_deadline, remaining_ms};
72use kevy_map::KevyMap;
73
/// Feed kevy's monotonic clock on `wasm32-unknown-unknown`, which has no
/// `Instant`. The embedding host must advance the time (ns since an arbitrary
/// fixed epoch, e.g. `Date.now() * 1e6`) before TTL-sensitive operations and
/// once per reaper tick. Native targets have no equivalent: there the OS clock
/// is the source, so this re-export is wasm-only.
79#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
80pub use clock::set_clock_ns;
81/// Feed kevy's wall clock (Unix-epoch millis, e.g. `Date.now()`) on
82/// `wasm32-unknown-unknown`, where `SystemTime::now()` traps. Used by `XADD`
83/// auto-IDs and `EXPIREAT`/`PEXPIREAT`.
84#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
85pub use clock::set_wall_clock_ms;
86
87
/// Result of [`Store::rename`].
///
/// Three distinct cases are reported so the dispatch layer can choose the
/// matching RESP frame: `+OK`, `-ERR no such key`, or `:0` when `RENAMENX`
/// finds the destination already present.
#[derive(Debug, PartialEq, Eq)]
pub enum RenameOutcome {
    /// The source was removed and the destination created; any value that
    /// previously lived at the destination was overwritten.
    Renamed,
    /// There is no key at the source name.
    NoSuchSrc,
    /// `RENAMENX` only: the destination is already occupied, so nothing was
    /// renamed.
    DstExists,
}
100
/// Failures a keyspace operation can report back to the command layer.
#[derive(Debug, PartialEq, Eq)]
pub enum StoreError {
    /// The key exists but holds a type other than the one the command
    /// operates on.
    WrongType,
    /// The stored value does not parse as a base-10 integer (INCR family).
    NotInteger,
    /// The arithmetic result does not fit in an `i64`.
    Overflow,
    /// The requested index lies outside the collection (LSET).
    OutOfRange,
    /// The command needs an existing key and there is none (LSET).
    NoSuchKey,
    /// The stored value does not parse as a float (INCRBYFLOAT).
    NotFloat,
    /// The write would exceed `maxmemory` while the active policy is
    /// [`EvictionPolicy::NoEviction`]. The RESP layer reports this as
    /// Redis's classic OOM error.
    OutOfMemory,
}
121
/// How keys are chosen for removal once `maxmemory` is exceeded.
///
/// This mirrors `kevy_config::EvictionPolicy`; the duplicate keeps
/// `kevy-store` a leaf crate with no `kevy-config` dependency.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EvictionPolicy {
    /// Never evict: writes are refused once `maxmemory` is hit. The default.
    #[default]
    NoEviction,
    /// Approximated LRU over the whole keyspace.
    AllKeysLru,
    /// Approximated LFU over the whole keyspace.
    AllKeysLfu,
    /// A random key from the whole keyspace.
    AllKeysRandom,
    /// Approximated LRU over keys that have a TTL.
    VolatileLru,
    /// Approximated LFU over keys that have a TTL.
    VolatileLfu,
    /// A random key among those that have a TTL.
    VolatileRandom,
    /// The key whose TTL has the least time remaining.
    VolatileTtl,
}

impl EvictionPolicy {
    /// `true` for the policies that rank candidates by LRU clock, i.e. the
    /// ones for which read-touches matter.
    #[inline]
    pub fn uses_lru(self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::AllKeysLru | Self::VolatileLru => true,
            Self::NoEviction
            | Self::AllKeysLfu
            | Self::AllKeysRandom
            | Self::VolatileLfu
            | Self::VolatileRandom
            | Self::VolatileTtl => false,
        }
    }

    /// `true` for the policies that rank candidates by LFU counter, i.e. the
    /// ones for which read-touches and log-counter increments matter.
    #[inline]
    pub fn uses_lfu(self) -> bool {
        match self {
            Self::AllKeysLfu | Self::VolatileLfu => true,
            Self::NoEviction
            | Self::AllKeysLru
            | Self::AllKeysRandom
            | Self::VolatileLru
            | Self::VolatileRandom
            | Self::VolatileTtl => false,
        }
    }

    /// `true` for the policies that only ever evict keys carrying a TTL.
    #[inline]
    pub fn is_volatile(self) -> bool {
        match self {
            Self::VolatileLru
            | Self::VolatileLfu
            | Self::VolatileRandom
            | Self::VolatileTtl => true,
            Self::NoEviction
            | Self::AllKeysLru
            | Self::AllKeysLfu
            | Self::AllKeysRandom => false,
        }
    }
}
168
169/// A single-database keyspace.
170///
171/// The keyspace map is a [`KevyMap`] — a pure-Rust open-addressing Swiss
172/// table tuned for kevy's per-shard, single-trust-domain keyspace. The
173/// hasher is [`kevy_hash::KevyHash`] (one-call inlinable; no DoS hardening
174/// since the shard is single-threaded with no cross-trust keys). Owning the
175/// table also exposes bucket addresses for software prefetch on the batch
176/// driver.
177#[derive(Default)]
178pub struct Store {
179 pub(crate) map: KevyMap<SmallBytes, Entry>,
180 /// Coarse cached monotonic clock (ns since [`epoch`]), refreshed by the
181 /// reactor loop / reaper tick via [`Self::refresh_clock`]. Lazy expiry on
182 /// the read path (`live_entry`) compares deadlines against this instead of
183 /// calling `Instant::now()` per access — the Redis cached-`mstime` model.
184 /// `0` (the `Default`) reads as "epoch" → keys look live until the first
185 /// refresh, the safe direction (expires at most one refresh-interval late,
186 /// never early — writes stamp deadlines from a *fresh* clock).
187 pub(crate) cached_ns: u64,
188 /// Whether lazy expiry trusts `Self::cached_ns` (set by a reactor/reaper
189 /// that calls [`Self::refresh_clock`]) instead of reading a fresh clock per
190 /// access. Enabled by the server reactor and the embedded background
191 /// reaper; left `false` (the `Default`) for manual-reaper / bare-`Store`
192 /// use, where nothing refreshes the cache so each access reads fresh —
193 /// preserving "lazy expiry works without an explicit tick".
194 pub(crate) cached_clock: bool,
195 /// Live byte estimate (dynamic per-entry weights + [`ENTRY_OVERHEAD`] per
196 /// key). Compared against [`Self::maxmemory`] to drive eviction.
197 pub(crate) used_memory: u64,
198 /// Soft byte ceiling. `0` = unlimited; the entire accounting + eviction
199 /// machinery short-circuits to a single not-taken branch in that case.
200 pub(crate) maxmemory: u64,
201 /// Active eviction policy. Only consulted when `used_memory > maxmemory`.
202 pub(crate) eviction_policy: EvictionPolicy,
203 /// Total keys evicted by [`Self::try_evict_after_write`] — surfaced via
204 /// `INFO memory` / `MEMORY STATS`.
205 pub(crate) evictions_total: u64,
206 /// Monotonic access counter; the upper 32 bits are unused, the lower 32
207 /// stamp `Entry::lru_clock` on each access while eviction is enabled.
208 pub(crate) clock_counter: u64,
209 /// `used_memory` peak across the shard's lifetime; surfaced as
210 /// `used_memory_peak` in `INFO memory`.
211 pub(crate) used_memory_peak: u64,
212 /// Keys expired since startup (lazy reap path AND
213 /// [`Self::tick_expire`]). Surfaced via `INFO keyspace` / `MEMORY STATS`
214 /// once those fields land.
215 pub(crate) expired_keys_total: u64,
216 /// Count of live keys carrying a TTL — the size of Redis's "expire set"
217 /// (`INFO keyspace`'s `expires=`). Maintained in O(1) at every TTL
218 /// transition (`insert_entry` / `remove_entry` deltas + the in-place
219 /// EXPIRE / PERSIST / SET sites) so the gauge never pays an O(n) keyspace
220 /// scan; [`Self::ttl_pending_count`] is the O(n) ground truth used to
221 /// assert this counter never drifts.
222 pub(crate) expires: u64,
223 /// `WATCH` version counters — present only for keys that have been
224 /// `WATCH`-ed at least once. [`Self::record_watch`] inserts the entry
225 /// (version 0 = "never written since first watch"); every subsequent
226 /// write on this shard calls [`Self::bump_if_watched`] which increments
227 /// only if the key is present in the map. Keys never `WATCH`-ed pay
228 /// one empty-map hashmap lookup per write (~10 ns).
229 ///
230 /// The map grows monotonically — entries are never evicted, even
231 /// when no conn is currently watching the key. For high-key-churn
232 /// workloads this can become a memory item; v1.x acceptable since
233 /// the entry is `Vec<u8>` + `u64` (~ 30 B + key length) and only
234 /// touched on writes / WATCH calls.
235 pub(crate) watch_versions: std::collections::HashMap<Vec<u8>, u64>,
236}
237
238impl Store {
239 pub fn new() -> Self {
240 Store::default()
241 }
242
243 /// Refresh the coarse cached clock (`Self::cached_ns`) from a single
244 /// `Instant::now()`. Call once per reactor-loop batch / reaper tick; the
245 /// per-access read path then skips its own clock read. Lazy expiry is
246 /// coarse to this cadence (a key expires ≤ one refresh-interval late,
247 /// never early — writes stamp deadlines from a fresh clock).
248 #[inline]
249 pub fn refresh_clock(&mut self) {
250 self.cached_ns = now_ns();
251 }
252
253 /// Enable/disable trusting the cached clock for lazy expiry (see
254 /// `Self::cached_ns`). Call with `true` only when something refreshes the
255 /// clock regularly (the server reactor per batch, the embedded background
256 /// reaper per tick); leave `false` for manual-reaper mode. Seeds the cache
257 /// when enabling so the first access is accurate.
258 #[inline]
259 pub fn set_cached_clock(&mut self, on: bool) {
260 self.cached_clock = on;
261 if on {
262 self.refresh_clock();
263 }
264 }
265
266 /// Install (or clear, with `maxmemory == 0`) the eviction limit and
267 /// policy. Cheap; safe to call repeatedly (e.g. on `CONFIG SET`).
268 #[inline]
269 pub fn set_max_memory(&mut self, maxmemory: u64, policy: EvictionPolicy) {
270 self.maxmemory = maxmemory;
271 self.eviction_policy = policy;
272 }
273
274 /// Live byte estimate (see field doc).
275 #[inline]
276 pub fn used_memory(&self) -> u64 {
277 self.used_memory
278 }
279
280 /// `used_memory` high-water mark since startup.
281 #[inline]
282 pub fn used_memory_peak(&self) -> u64 {
283 self.used_memory_peak
284 }
285
286 /// Configured `maxmemory` (0 = unlimited).
287 #[inline]
288 pub fn maxmemory(&self) -> u64 {
289 self.maxmemory
290 }
291
292 /// Configured eviction policy.
293 #[inline]
294 pub fn eviction_policy(&self) -> EvictionPolicy {
295 self.eviction_policy
296 }
297
298 /// Total keys evicted since startup.
299 #[inline]
300 pub fn evictions_total(&self) -> u64 {
301 self.evictions_total
302 }
303
304 /// Live keys carrying a TTL (`INFO keyspace`'s `expires=`). O(1) — reads
305 /// the maintained counter, not an O(n) scan (cf. [`Self::ttl_pending_count`]).
306 #[inline]
307 pub fn expires_count(&self) -> usize {
308 self.expires as usize
309 }
310
311 /// Apply a signed delta to the [`Self::expires`] counter, clamped at 0.
312 /// Centralises the saturating arithmetic for every TTL-transition site.
313 #[inline]
314 pub(crate) fn adjust_expires(&mut self, delta: i64) {
315 if delta != 0 {
316 self.expires = (self.expires as i64 + delta).max(0) as u64;
317 }
318 }
319
320 /// `WATCH` — record this key in the version tracker and return its
321 /// current version. Subsequent writes on this shard bump the version
322 /// via [`Self::bump_if_watched`]. Caller (the conn's origin shard)
323 /// stores the returned version; `EXEC` later asks every owning shard
324 /// "is the version still N?" via [`Self::key_version`].
325 ///
326 /// Keys that have never been written stay at version 0 — the first
327 /// write after a `WATCH` bumps to 1, which is what makes the "dirty"
328 /// comparison work (stored 0 ≠ current 1 ⇒ abort EXEC).
329 pub fn record_watch(&mut self, key: &[u8]) -> u64 {
330 *self
331 .watch_versions
332 .entry(key.to_vec())
333 .or_insert(0)
334 }
335
336 /// Read-only version lookup used by `EXEC`'s pre-execution check.
337 /// Returns `0` for keys never `WATCH`-ed (matches the initial value
338 /// `record_watch` would have inserted, so a `WATCH` → no-write →
339 /// `EXEC` sequence sees the stored 0 == current 0 and proceeds).
340 #[inline]
341 pub fn key_version(&self, key: &[u8]) -> u64 {
342 self.watch_versions.get(key).copied().unwrap_or(0)
343 }
344
345 /// Bump the version of `key` if (and only if) it has been `WATCH`-ed at
346 /// least once. Write-side call after every mutation. The empty check
347 /// runs BEFORE the key is hashed — the common nothing-watched case
348 /// pays one branch, not a guaranteed-miss probe.
349 #[inline]
350 pub fn bump_if_watched(&mut self, key: &[u8]) {
351 if self.watch_versions.is_empty() {
352 return;
353 }
354 if let Some(v) = self.watch_versions.get_mut(key) {
355 *v = v.wrapping_add(1);
356 }
357 }
358
359 /// Invalidate every watched key in one shot. Called from `FLUSHDB`
360 /// / `FLUSHALL` execution paths — every WATCH against this shard
361 /// must invalidate so a pending `EXEC` aborts.
362 pub fn bump_all_watched(&mut self) {
363 for v in self.watch_versions.values_mut() {
364 *v = v.wrapping_add(1);
365 }
366 }
367
368 /// Cached weight of `key` (dynamic part + [`ENTRY_OVERHEAD`]). Returns
369 /// `None` when the key is absent or expired (no implicit reap).
370 pub fn estimate_key_bytes(&self, key: &[u8]) -> Option<u64> {
371 self.map.get(key).map(|e| e.weight() + ENTRY_OVERHEAD)
372 }
373
374 /// O(1) precondition check the dispatch layer calls before every write
375 /// command. Returns `Err(OutOfMemory)` only when `maxmemory > 0`, the
376 /// budget is already over, AND the policy is `NoEviction` (Redis
377 /// behaviour). All other policies let the write proceed and recover via
378 /// [`Self::try_evict_after_write`].
379 #[inline]
380 pub fn precheck_for_write(&self) -> Result<(), StoreError> {
381 if self.maxmemory == 0 || self.used_memory <= self.maxmemory {
382 return Ok(());
383 }
384 if self.eviction_policy == EvictionPolicy::NoEviction {
385 return Err(StoreError::OutOfMemory);
386 }
387 Ok(())
388 }
389
390 /// Run after every write command. No-op when disabled or under budget;
391 /// otherwise samples per [`Self::eviction_policy`] and removes keys until
392 /// back under `maxmemory` or no eligible candidate remains. Returns the
393 /// number of keys evicted (0 on the common fast path).
394 #[inline]
395 pub fn try_evict_after_write(&mut self) -> usize {
396 if self.maxmemory == 0 || self.used_memory <= self.maxmemory {
397 return 0;
398 }
399 evict::evict_until_under_limit(self)
400 }
401
402}
403
/// Apply a signed delta to a `u64`, saturating in both directions (at
/// `u64::MAX` when adding, at `0` when subtracting). Used by
/// `Store::account_delta` / `reweigh_entry` so the in-place mutators don't
/// have to repeat the same overflow-guarded match.
///
/// Every `i64` is a valid delta, including `i64::MIN`: the magnitude comes
/// from `unsigned_abs`, which cannot overflow, whereas negating `i64::MIN`
/// would.
#[inline]
pub(crate) fn apply_delta(v: &mut u64, delta: i64) {
    let magnitude = delta.unsigned_abs();
    *v = if delta >= 0 {
        v.saturating_add(magnitude)
    } else {
        v.saturating_sub(magnitude)
    };
}
415
416/// Heap bytes a `SmallBytes`-encoded key would own (`&[u8]` mirror of
417/// `SmallBytes::heap_bytes`; 22-byte inline boundary per `kevy-bytes`).
418#[inline]
419pub(crate) fn key_heap_bytes_for(key: &[u8]) -> u64 {
420 if key.len() <= 22 { 0 } else { key.len() as u64 }
421}
422
423#[cfg(test)]
424mod tests;
425#[cfg(test)]
426mod tests_memory;
427#[cfg(test)]
428mod tests_snapshot;