Skip to main content

ember_core/keyspace/
mod.rs

1//! The keyspace: Ember's core key-value store.
2//!
3//! A `Keyspace` owns a flat `AHashMap<CompactString, Entry>` and handles
4//! get, set, delete, existence checks, and TTL management. Expired
5//! keys are removed lazily on access. Memory usage is tracked on
6//! every mutation for eviction and stats reporting.
7
8use std::collections::VecDeque;
9use std::time::Duration;
10
11use ahash::AHashMap;
12use bytes::Bytes;
13use compact_str::CompactString;
14use rand::seq::IteratorRandom;
15
16use tracing::warn;
17
18use crate::dropper::DropHandle;
19use crate::memory::{self, MemoryTracker};
20use crate::time;
21use crate::types::sorted_set::{ScoreBound, SortedSet, ZAddFlags};
22use crate::types::{self, normalize_range, Value};
23
24mod hash;
25mod list;
26#[cfg(feature = "protobuf")]
27mod proto;
28mod set;
29mod string;
30#[cfg(feature = "vector")]
31mod vector;
32mod zset;
33
34const WRONGTYPE_MSG: &str = "WRONGTYPE Operation against a key holding the wrong kind of value";
35const OOM_MSG: &str = "OOM command not allowed when used memory > 'maxmemory'";
36
37/// Error returned when a command is used against a key holding the wrong type.
38#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct WrongType;
40
41impl std::fmt::Display for WrongType {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        write!(f, "{WRONGTYPE_MSG}")
44    }
45}
46
47impl std::error::Error for WrongType {}
48
49/// Error returned by write operations that may fail due to type mismatch
50/// or memory limits.
51#[derive(Debug, Clone, PartialEq, Eq)]
52pub enum WriteError {
53    /// The key holds a different type than expected.
54    WrongType,
55    /// Memory limit reached and eviction couldn't free enough space.
56    OutOfMemory,
57}
58
59impl From<WrongType> for WriteError {
60    fn from(_: WrongType) -> Self {
61        WriteError::WrongType
62    }
63}
64
65/// Errors that can occur during INCR/DECR operations.
66#[derive(Debug, Clone, PartialEq, Eq)]
67pub enum IncrError {
68    /// Key holds a non-string type.
69    WrongType,
70    /// Value is not a valid integer.
71    NotAnInteger,
72    /// Increment or decrement would overflow i64.
73    Overflow,
74    /// Memory limit reached.
75    OutOfMemory,
76}
77
78impl std::fmt::Display for IncrError {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        match self {
81            IncrError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
82            IncrError::NotAnInteger => write!(f, "ERR value is not an integer or out of range"),
83            IncrError::Overflow => write!(f, "ERR increment or decrement would overflow"),
84            IncrError::OutOfMemory => write!(f, "{OOM_MSG}"),
85        }
86    }
87}
88
89impl std::error::Error for IncrError {}
90
91/// Errors that can occur during INCRBYFLOAT operations.
92#[derive(Debug, Clone, PartialEq)]
93pub enum IncrFloatError {
94    /// Key holds a non-string type.
95    WrongType,
96    /// Value is not a valid float.
97    NotAFloat,
98    /// Result would be NaN or Infinity.
99    NanOrInfinity,
100    /// Memory limit reached.
101    OutOfMemory,
102}
103
104impl std::fmt::Display for IncrFloatError {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        match self {
107            IncrFloatError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
108            IncrFloatError::NotAFloat => write!(f, "ERR value is not a valid float"),
109            IncrFloatError::NanOrInfinity => {
110                write!(f, "ERR increment would produce NaN or Infinity")
111            }
112            IncrFloatError::OutOfMemory => write!(f, "{OOM_MSG}"),
113        }
114    }
115}
116
117impl std::error::Error for IncrFloatError {}
118
119/// Error returned when RENAME fails because the source key doesn't exist.
120#[derive(Debug, Clone, PartialEq, Eq)]
121pub enum RenameError {
122    /// The source key does not exist.
123    NoSuchKey,
124}
125
126/// Error returned by COPY.
127#[derive(Debug, Clone, PartialEq, Eq)]
128pub enum CopyError {
129    /// The source key does not exist.
130    NoSuchKey,
131    /// Memory limit reached and eviction couldn't free enough space.
132    OutOfMemory,
133}
134
135/// Error returned by LSET.
136#[derive(Debug, Clone, PartialEq, Eq)]
137pub enum LsetError {
138    /// Key holds a different type.
139    WrongType,
140    /// Key does not exist.
141    NoSuchKey,
142    /// Index is beyond list bounds.
143    IndexOutOfRange,
144}
145
146impl std::fmt::Display for LsetError {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        match self {
149            LsetError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
150            LsetError::NoSuchKey => write!(f, "ERR no such key"),
151            LsetError::IndexOutOfRange => write!(f, "ERR index out of range"),
152        }
153    }
154}
155
156impl std::error::Error for LsetError {}
157
158impl std::fmt::Display for RenameError {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        match self {
161            RenameError::NoSuchKey => write!(f, "ERR no such key"),
162        }
163    }
164}
165
166impl std::error::Error for RenameError {}
167
168/// Result of a ZADD operation, containing both the client-facing count
169/// and the list of members that were actually applied (for AOF correctness).
170#[derive(Debug, Clone)]
171pub struct ZAddResult {
172    /// Number of members added (or added+updated if CH flag was set).
173    pub count: usize,
174    /// Members that were actually inserted or had their score updated.
175    /// Only these should be persisted to the AOF.
176    pub applied: Vec<(f64, String)>,
177}
178
179/// Result of a VADD operation, carrying the applied element for AOF persistence.
180#[cfg(feature = "vector")]
181#[derive(Debug, Clone)]
182pub struct VAddResult {
183    /// The element name that was added or updated.
184    pub element: String,
185    /// The vector that was stored.
186    pub vector: Vec<f32>,
187    /// Whether a new element was added (false = updated existing).
188    pub added: bool,
189}
190
191/// Result of a VADD_BATCH operation.
192#[cfg(feature = "vector")]
193#[derive(Debug, Clone)]
194pub struct VAddBatchResult {
195    /// Number of newly added elements (not updates).
196    pub added_count: usize,
197    /// Elements that were actually inserted or updated, with their vectors.
198    /// Only these should be persisted to the AOF.
199    pub applied: Vec<(String, Vec<f32>)>,
200}
201
202/// Errors from vector write operations.
203#[cfg(feature = "vector")]
204#[derive(Debug, Clone)]
205pub enum VectorWriteError {
206    /// The key holds a different type than expected.
207    WrongType,
208    /// Memory limit reached.
209    OutOfMemory,
210    /// usearch index error (dimension mismatch, capacity, etc).
211    IndexError(String),
212    /// A batch insert partially succeeded before encountering an error.
213    /// The applied vectors should still be persisted to the AOF.
214    PartialBatch {
215        message: String,
216        applied: Vec<(String, Vec<f32>)>,
217    },
218}
219
220/// How the keyspace should handle writes when the memory limit is reached.
221#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
222pub enum EvictionPolicy {
223    /// Return an error on writes when memory is full.
224    #[default]
225    NoEviction,
226    /// Evict the least-recently-used key (approximated via random sampling).
227    AllKeysLru,
228}
229
230/// Configuration for a single keyspace / shard.
231#[derive(Debug, Clone)]
232pub struct ShardConfig {
233    /// Maximum memory in bytes. `None` means unlimited.
234    pub max_memory: Option<usize>,
235    /// What to do when memory is full.
236    pub eviction_policy: EvictionPolicy,
237    /// Numeric identifier for this shard (used for persistence file naming).
238    pub shard_id: u16,
239}
240
241impl Default for ShardConfig {
242    fn default() -> Self {
243        Self {
244            max_memory: None,
245            eviction_policy: EvictionPolicy::NoEviction,
246            shard_id: 0,
247        }
248    }
249}
250
251/// Result of a set operation that may fail under memory pressure.
252#[derive(Debug, PartialEq, Eq)]
253pub enum SetResult {
254    /// The key was stored successfully.
255    Ok,
256    /// Memory limit reached and eviction policy is NoEviction.
257    OutOfMemory,
258    /// NX/XX condition was not met (key existed for NX, or didn't for XX).
259    Blocked,
260}
261
262/// A single entry in the keyspace: a value plus optional expiration
263/// and last access time for LRU approximation.
264///
265/// Field order is chosen for cache-line packing: `value` and
266/// `expires_at_ms` (the hot-path read fields) sit at the front so
267/// they share the first L1 cache line with the HashMap key pointer.
268/// `cached_value_size` is warm (used on writes). `last_access_secs`
269/// is cold (only used during eviction sampling).
270///
271/// Version tracking for WATCH/EXEC lives in a separate side table on
272/// Keyspace (`versions` map), not on every entry. This saves 8 bytes
273/// per entry since <1% of keys are ever WATCHed.
274#[derive(Debug, Clone)]
275pub(crate) struct Entry {
276    pub(crate) value: Value,
277    /// Monotonic expiry timestamp in ms. 0 = no expiry.
278    pub(crate) expires_at_ms: u64,
279    /// Cached result of `memory::value_size(&self.value)`. Updated on
280    /// every mutation so that memory accounting is O(1) instead of
281    /// walking entire collections. Using u32 saves 4 bytes per entry;
282    /// max value size is 512 MB which fits comfortably in u32 (~4 GB).
283    pub(crate) cached_value_size: u32,
284    /// Monotonic last access time in seconds since process start (for LRU).
285    /// Using u32 saves 4 bytes per entry; wraps at ~136 years.
286    pub(crate) last_access_secs: u32,
287}
288
289impl Entry {
290    fn new(value: Value, ttl: Option<Duration>) -> Self {
291        let cached_value_size = memory::value_size(&value) as u32;
292        Self {
293            value,
294            expires_at_ms: time::expiry_from_duration(ttl),
295            cached_value_size,
296            last_access_secs: time::now_secs(),
297        }
298    }
299
300    /// Returns `true` if this entry has passed its expiration time.
301    fn is_expired(&self) -> bool {
302        time::is_expired(self.expires_at_ms)
303    }
304
305    /// Marks this entry as accessed right now. When `track` is false
306    /// (NoEviction policy), this is a no-op — skipping the `now_secs()`
307    /// call on every access.
308    #[inline(always)]
309    fn touch(&mut self, track: bool) {
310        if track {
311            self.last_access_secs = time::now_secs();
312        }
313    }
314
315    /// Returns the full estimated memory footprint of this entry
316    /// (key + value + overhead) using the cached value size.
317    fn entry_size(&self, key: &str) -> usize {
318        key.len() + self.cached_value_size as usize + memory::ENTRY_OVERHEAD
319    }
320}
321
322/// Result of a TTL query, matching Redis semantics.
323#[derive(Debug, Clone, PartialEq, Eq)]
324pub enum TtlResult {
325    /// Key exists and has a TTL. Returns remaining seconds.
326    Seconds(u64),
327    /// Key exists and has a TTL. Returns remaining milliseconds.
328    Milliseconds(u64),
329    /// Key exists but has no expiration set.
330    NoExpiry,
331    /// Key does not exist.
332    NotFound,
333}
334
335/// Aggregated statistics for a keyspace.
336#[derive(Debug, Clone, PartialEq, Eq)]
337pub struct KeyspaceStats {
338    /// Number of live keys.
339    pub key_count: usize,
340    /// Estimated memory usage in bytes.
341    pub used_bytes: usize,
342    /// Number of keys with an expiration set.
343    pub keys_with_expiry: usize,
344    /// Cumulative count of keys removed by expiration (lazy + active).
345    pub keys_expired: u64,
346    /// Cumulative count of keys removed by eviction.
347    pub keys_evicted: u64,
348    /// Cumulative count of write commands rejected due to memory limits.
349    pub oom_rejections: u64,
350}
351
352/// Number of random keys to sample when looking for an eviction candidate.
353///
354/// Eviction uses sampling-based approximate LRU — we randomly select this many
355/// keys and evict the least-recently-accessed among them. This trades perfect
356/// LRU accuracy for O(1) eviction (no sorted structure to maintain).
357///
358/// Larger sample sizes give better LRU approximation but cost more per eviction.
359/// 16 is a reasonable balance — similar to Redis's default sample size. With
360/// 16 samples, we statistically find a good eviction candidate while keeping
361/// eviction overhead low even at millions of keys.
362const EVICTION_SAMPLE_SIZE: usize = 16;
363
364/// The core key-value store.
365///
366/// All operations are single-threaded per shard — no internal locking.
367/// Memory usage is tracked incrementally on every mutation.
368pub struct Keyspace {
369    entries: AHashMap<CompactString, Entry>,
370    memory: MemoryTracker,
371    config: ShardConfig,
372    /// Number of entries that currently have an expiration set.
373    expiry_count: usize,
374    /// Cumulative count of keys removed by expiration (lazy + active).
375    expired_total: u64,
376    /// Cumulative count of keys removed by eviction.
377    evicted_total: u64,
378    /// Cumulative count of write rejections due to memory limits.
379    oom_rejections: u64,
380    /// When set, large values are dropped on a background thread instead
381    /// of inline on the shard thread. See [`crate::dropper`].
382    drop_handle: Option<DropHandle>,
383    /// Monotonic counter for entry versions. Each mutation gets the next
384    /// value, giving WATCH a cheap way to detect concurrent writes.
385    next_version: u64,
386    /// Side table for WATCH/EXEC version tracking. Only populated for
387    /// keys that have been WATCHed. On mutation, we bump the version
388    /// only if the key exists in this map — which is almost never,
389    /// so the hot path pays only a fast hash-miss lookup.
390    versions: AHashMap<CompactString, u64>,
391    /// Whether to update `last_access_secs` on every access. Only useful
392    /// when the eviction policy needs LRU timestamps (AllKeysLru).
393    /// When false (NoEviction), we skip the `now_secs()` call on every
394    /// read/write — saving a syscall per operation on the hot path.
395    track_access: bool,
396}
397
398impl Keyspace {
399    /// Creates a new, empty keyspace with default config (no memory limit).
400    pub fn new() -> Self {
401        Self::with_config(ShardConfig::default())
402    }
403
404    /// Creates a new, empty keyspace with the given config.
405    pub fn with_config(config: ShardConfig) -> Self {
406        let track_access = config.eviction_policy == EvictionPolicy::AllKeysLru;
407        Self {
408            entries: AHashMap::new(),
409            memory: MemoryTracker::new(),
410            config,
411            expiry_count: 0,
412            expired_total: 0,
413            evicted_total: 0,
414            oom_rejections: 0,
415            drop_handle: None,
416            next_version: 0,
417            versions: AHashMap::new(),
418            track_access,
419        }
420    }
421
422    /// Attaches a background drop handle for lazy free. When set, large
423    /// values removed by del/eviction/expiration are dropped on a
424    /// background thread instead of blocking the shard.
425    pub fn set_drop_handle(&mut self, handle: DropHandle) {
426        self.drop_handle = Some(handle);
427    }
428
429    /// Bumps the version for a key in the side table, but only if the
430    /// key is being tracked (i.e. someone WATCHed it). When no keys
431    /// are watched, the versions map is empty and this is a fast miss.
432    fn bump_version(&mut self, key: &str) {
433        if let Some(ver) = self.versions.get_mut(key) {
434            self.next_version += 1;
435            *ver = self.next_version;
436        }
437    }
438
439    /// Returns the current version of a key, or `None` if the key is
440    /// missing or expired. Used by WATCH/EXEC — cold path only.
441    ///
442    /// If the key hasn't been WATCHed yet, inserts the current
443    /// `next_version` so that future mutations will be detected.
444    pub fn key_version(&mut self, key: &str) -> Option<u64> {
445        let entry = self.entries.get(key)?;
446        if entry.is_expired() {
447            return None;
448        }
449        let ver = *self
450            .versions
451            .entry(CompactString::from(key))
452            .or_insert(self.next_version);
453        Some(ver)
454    }
455
456    /// Removes version tracking for a key. Called on key deletion,
457    /// expiration, and eviction so the side table doesn't leak.
458    fn remove_version(&mut self, key: &str) {
459        self.versions.remove(key);
460    }
461
462    /// Removes all version tracking entries. Called on UNWATCH/DISCARD
463    /// or FLUSHDB.
464    pub fn clear_versions(&mut self) {
465        self.versions.clear();
466    }
467
468    /// Decrements the expiry count if the entry had a TTL set.
469    fn decrement_expiry_if_set(&mut self, entry: &Entry) {
470        if entry.expires_at_ms != 0 {
471            self.expiry_count = self.expiry_count.saturating_sub(1);
472        }
473    }
474
475    /// Cleans up after removing an element from a collection (list, sorted
476    /// set, hash, or set). If the collection is now empty, removes the key
477    /// entirely and subtracts `old_size` from the memory tracker. Otherwise
478    /// subtracts `removed_bytes` (the byte cost of the removed element(s))
479    /// without rescanning the remaining collection, and updates the cached
480    /// value size.
481    fn cleanup_after_remove(
482        &mut self,
483        key: &str,
484        old_size: usize,
485        is_empty: bool,
486        removed_bytes: usize,
487    ) {
488        if is_empty {
489            if let Some(removed) = self.entries.remove(key) {
490                self.decrement_expiry_if_set(&removed);
491            }
492            self.memory.remove_with_size(old_size);
493        } else {
494            self.memory.shrink_by(removed_bytes);
495            if let Some(entry) = self.entries.get_mut(key) {
496                entry.cached_value_size =
497                    (entry.cached_value_size as usize).saturating_sub(removed_bytes) as u32;
498            }
499        }
500    }
501
502    /// Checks whether a key either doesn't exist or holds the expected
503    /// collection type. Returns `Ok(true)` if the key is new,
504    /// `Ok(false)` if it exists with the right type, or `Err(WrongType)`
505    /// if the key exists with a different type.
506    fn ensure_collection_type(
507        &self,
508        key: &str,
509        type_check: fn(&Value) -> bool,
510    ) -> Result<bool, WriteError> {
511        match self.entries.get(key) {
512            None => Ok(true),
513            Some(e) if type_check(&e.value) => Ok(false),
514            Some(_) => Err(WriteError::WrongType),
515        }
516    }
517
518    /// Estimates the memory cost of a collection write and enforces the
519    /// limit. `base_overhead` is the fixed cost of a new collection
520    /// (e.g. VECDEQUE_BASE_OVERHEAD). Returns `Ok(())` on success.
521    fn reserve_memory(
522        &mut self,
523        is_new: bool,
524        key: &str,
525        base_overhead: usize,
526        element_increase: usize,
527    ) -> Result<(), WriteError> {
528        let estimated_increase = if is_new {
529            memory::ENTRY_OVERHEAD + key.len() + base_overhead + element_increase
530        } else {
531            element_increase
532        };
533        if self.enforce_memory_limit(estimated_increase) {
534            Ok(())
535        } else {
536            Err(WriteError::OutOfMemory)
537        }
538    }
539
540    /// Inserts a new key with an empty collection value. Used by
541    /// collection-write methods after type-checking and memory reservation.
542    fn insert_empty(&mut self, key: &str, value: Value) {
543        self.memory.add(key, &value);
544        let entry = Entry::new(value, None);
545        self.entries.insert(CompactString::from(key), entry);
546        self.bump_version(key);
547    }
548
549    /// Measures entry size before and after a mutation, adjusting the
550    /// memory tracker for the difference.
551    ///
552    /// Uses `cached_value_size` for the pre-mutation size (O(1)) and
553    /// recomputes after the mutation to update the cache. This halves
554    /// the cost of memory tracking for large collections.
555    fn track_size<T>(&mut self, key: &str, f: impl FnOnce(&mut Entry) -> T) -> Option<T> {
556        let entry = self.entries.get_mut(key)?;
557        let old_size = entry.entry_size(key);
558        let result = f(entry);
559        // re-lookup after mutation (f consumed the borrow)
560        let entry = self.entries.get_mut(key)?;
561        let new_value_size = memory::value_size(&entry.value);
562        entry.cached_value_size = new_value_size as u32;
563        let new_size = key.len() + new_value_size + memory::ENTRY_OVERHEAD;
564        self.memory.adjust(old_size, new_size);
565        self.bump_version(key);
566        Some(result)
567    }
568
569    /// Adjusts the expiry count when replacing an entry whose TTL status
570    /// may have changed (e.g. SET overwriting an existing key).
571    fn adjust_expiry_count(&mut self, had_expiry: bool, has_expiry: bool) {
572        match (had_expiry, has_expiry) {
573            (false, true) => self.expiry_count += 1,
574            (true, false) => self.expiry_count = self.expiry_count.saturating_sub(1),
575            _ => {}
576        }
577    }
578
579    /// Tries to evict one key using LRU approximation.
580    ///
581    /// Randomly samples `EVICTION_SAMPLE_SIZE` keys and removes the one
582    /// with the oldest `last_access` time. Returns `true` if a key was
583    /// evicted, `false` if the keyspace is empty.
584    ///
585    /// Uses reservoir sampling with k=1 to avoid allocating a Vec on
586    /// every eviction attempt. The victim key index is remembered
587    /// rather than cloned, eliminating a heap allocation on the hot path.
588    fn try_evict(&mut self) -> bool {
589        if self.entries.is_empty() {
590            return false;
591        }
592
593        let mut rng = rand::rng();
594
595        // reservoir sample k=1 from the iterator, tracking the oldest
596        // entry by last_access_secs. this replaces choose_multiple() which
597        // allocates a Vec internally.
598        let mut best_key: Option<&str> = None;
599        let mut best_access = u32::MAX;
600        let mut seen = 0usize;
601
602        for (key, entry) in &self.entries {
603            // reservoir sampling: include this item with probability
604            // EVICTION_SAMPLE_SIZE / (seen + 1), capped once we have
605            // enough candidates
606            seen += 1;
607            if seen <= EVICTION_SAMPLE_SIZE {
608                if entry.last_access_secs < best_access {
609                    best_access = entry.last_access_secs;
610                    best_key = Some(&**key);
611                }
612            } else {
613                use rand::Rng;
614                let j = rng.random_range(0..seen);
615                if j < EVICTION_SAMPLE_SIZE && entry.last_access_secs < best_access {
616                    best_access = entry.last_access_secs;
617                    best_key = Some(&**key);
618                }
619            }
620        }
621
622        if let Some(victim) = best_key {
623            // own the key to break the immutable borrow on self.entries
624            let victim = victim.to_owned();
625            if let Some(entry) = self.entries.remove(victim.as_str()) {
626                self.memory.remove(&victim, &entry.value);
627                self.decrement_expiry_if_set(&entry);
628                self.evicted_total += 1;
629                self.remove_version(&victim);
630                self.defer_drop(entry.value);
631                return true;
632            }
633        }
634        false
635    }
636
637    /// Checks whether the memory limit allows a write that would increase
638    /// usage by `estimated_increase` bytes. Attempts eviction if the
639    /// policy allows it. Returns `true` if the write can proceed.
640    ///
641    /// The comparison uses [`memory::effective_limit`] rather than the raw
642    /// configured maximum. This reserves headroom for allocator overhead
643    /// and fragmentation that our per-entry estimates can't account for,
644    /// preventing the OS from OOM-killing us before eviction triggers.
645    fn enforce_memory_limit(&mut self, estimated_increase: usize) -> bool {
646        if let Some(max) = self.config.max_memory {
647            let limit = memory::effective_limit(max);
648            while self.memory.used_bytes() + estimated_increase > limit {
649                match self.config.eviction_policy {
650                    EvictionPolicy::NoEviction => {
651                        self.oom_rejections += 1;
652                        // log first rejection, then every 1000th to avoid flooding
653                        if self.oom_rejections == 1 || self.oom_rejections.is_multiple_of(1000) {
654                            warn!(
655                                used_bytes = self.memory.used_bytes(),
656                                limit,
657                                requested = estimated_increase,
658                                total_rejections = self.oom_rejections,
659                                "OOM: write rejected (policy: noeviction)"
660                            );
661                        }
662                        return false;
663                    }
664                    EvictionPolicy::AllKeysLru => {
665                        if !self.try_evict() {
666                            self.oom_rejections += 1;
667                            if self.oom_rejections == 1 || self.oom_rejections.is_multiple_of(1000)
668                            {
669                                warn!(
670                                    used_bytes = self.memory.used_bytes(),
671                                    limit,
672                                    requested = estimated_increase,
673                                    total_rejections = self.oom_rejections,
674                                    "OOM: write rejected (eviction exhausted)"
675                                );
676                            }
677                            return false;
678                        }
679                    }
680                }
681            }
682        }
683        true
684    }
685
686    /// Removes a key. Returns `true` if the key existed (and wasn't expired).
687    ///
688    /// When a drop handle is set, large values are dropped on the
689    /// background thread instead of inline.
690    pub fn del(&mut self, key: &str) -> bool {
691        if self.remove_if_expired(key) {
692            return false;
693        }
694        if let Some(entry) = self.entries.remove(key) {
695            self.memory.remove(key, &entry.value);
696            self.decrement_expiry_if_set(&entry);
697            self.remove_version(key);
698            self.defer_drop(entry.value);
699            true
700        } else {
701            false
702        }
703    }
704
705    /// Removes a key like `del`, but always defers the value's destructor
706    /// to the background drop thread (when available). Semantically
707    /// identical to DEL — the key is gone immediately, memory is
708    /// accounted for immediately, but the actual deallocation happens
709    /// off the hot path.
710    pub fn unlink(&mut self, key: &str) -> bool {
711        if self.remove_if_expired(key) {
712            return false;
713        }
714        if let Some(entry) = self.entries.remove(key) {
715            self.memory.remove(key, &entry.value);
716            self.decrement_expiry_if_set(&entry);
717            self.remove_version(key);
718            // always defer for UNLINK, regardless of value size
719            if let Some(ref handle) = self.drop_handle {
720                handle.defer_value(entry.value);
721            }
722            true
723        } else {
724            false
725        }
726    }
727
728    /// Replaces the entries map with an empty one and resets memory
729    /// tracking. Returns the old entries so the caller can send them
730    /// to the background drop thread.
731    pub(crate) fn flush_async(&mut self) -> AHashMap<CompactString, Entry> {
732        let old = std::mem::take(&mut self.entries);
733        self.memory.reset();
734        self.expiry_count = 0;
735        self.versions.clear();
736        old
737    }
738
739    /// Returns `true` if the key exists and hasn't expired.
740    pub fn exists(&mut self, key: &str) -> bool {
741        if self.remove_if_expired(key) {
742            return false;
743        }
744        self.entries.contains_key(key)
745    }
746
747    /// Returns a random key from the keyspace, or `None` if empty.
748    ///
749    /// Uses reservoir sampling from the hash map iterator. Expired keys
750    /// are skipped (and lazily cleaned up with bounded retries).
751    pub fn random_key(&mut self) -> Option<String> {
752        // bounded retries in case we keep hitting expired keys
753        for _ in 0..5 {
754            let mut rng = rand::rng();
755            let key = self.entries.keys().choose(&mut rng)?.clone();
756
757            if self.remove_if_expired(&key) {
758                continue;
759            }
760            return Some(key.to_string());
761        }
762        None
763    }
764
765    /// Updates the last access time for a key. Returns `true` if the key exists.
766    pub fn touch(&mut self, key: &str) -> bool {
767        if self.remove_if_expired(key) {
768            return false;
769        }
770        match self.entries.get_mut(key) {
771            Some(entry) => {
772                entry.touch(self.track_access);
773                true
774            }
775            None => false,
776        }
777    }
778
779    /// Sorts elements from a list, set, or sorted set.
780    ///
781    /// Returns the sorted elements as byte strings, or an error if the
782    /// key holds the wrong type or numeric parsing fails.
783    pub fn sort(
784        &mut self,
785        key: &str,
786        desc: bool,
787        alpha: bool,
788        limit: Option<(i64, i64)>,
789    ) -> Result<Vec<Bytes>, &'static str> {
790        if self.remove_if_expired(key) {
791            return Ok(Vec::new());
792        }
793        let entry = match self.entries.get_mut(key) {
794            Some(e) => {
795                e.touch(self.track_access);
796                e
797            }
798            None => return Ok(Vec::new()),
799        };
800
801        // collect elements from the appropriate type
802        let mut items: Vec<Bytes> = match &entry.value {
803            Value::List(deq) => deq.iter().cloned().collect(),
804            Value::Set(set) => set.iter().map(|s| Bytes::from(s.clone())).collect(),
805            Value::SortedSet(zset) => zset
806                .iter()
807                .map(|(m, _)| Bytes::from(m.to_owned()))
808                .collect(),
809            _ => return Err(WRONGTYPE_MSG),
810        };
811
812        // sort
813        if alpha {
814            items.sort();
815            if desc {
816                items.reverse();
817            }
818        } else {
819            // numeric sort — parse all elements as f64
820            let mut parse_err = false;
821            items.sort_by(|a, b| {
822                let a_str = std::str::from_utf8(a).unwrap_or("");
823                let b_str = std::str::from_utf8(b).unwrap_or("");
824                let a_val = a_str.parse::<f64>().unwrap_or_else(|_| {
825                    parse_err = true;
826                    0.0
827                });
828                let b_val = b_str.parse::<f64>().unwrap_or_else(|_| {
829                    parse_err = true;
830                    0.0
831                });
832                if desc {
833                    b_val
834                        .partial_cmp(&a_val)
835                        .unwrap_or(std::cmp::Ordering::Equal)
836                } else {
837                    a_val
838                        .partial_cmp(&b_val)
839                        .unwrap_or(std::cmp::Ordering::Equal)
840                }
841            });
842            if parse_err {
843                return Err("ERR One or more scores can't be converted into double");
844            }
845        }
846
847        // apply limit
848        if let Some((offset, count)) = limit {
849            let offset = offset.max(0) as usize;
850            let count = count.max(0) as usize;
851            let end = offset.saturating_add(count).min(items.len());
852            if offset < items.len() {
853                items = items[offset..end].to_vec();
854            } else {
855                items.clear();
856            }
857        }
858
859        Ok(items)
860    }
861
862    /// Sets an expiration on an existing key. Returns `true` if the key
863    /// exists (and the TTL was set), `false` if the key doesn't exist.
864    pub fn expire(&mut self, key: &str, seconds: u64) -> bool {
865        if self.remove_if_expired(key) {
866            return false;
867        }
868        match self.entries.get_mut(key) {
869            Some(entry) => {
870                if entry.expires_at_ms == 0 {
871                    self.expiry_count += 1;
872                }
873                entry.expires_at_ms = time::now_ms().saturating_add(seconds.saturating_mul(1000));
874                self.bump_version(key);
875                true
876            }
877            None => false,
878        }
879    }
880
881    /// Returns the TTL status for a key, following Redis semantics:
882    /// - `Seconds(n)` if the key has a TTL
883    /// - `NoExpiry` if the key exists without a TTL
884    /// - `NotFound` if the key doesn't exist
885    pub fn ttl(&mut self, key: &str) -> TtlResult {
886        if self.remove_if_expired(key) {
887            return TtlResult::NotFound;
888        }
889        match self.entries.get(key) {
890            Some(entry) => match time::remaining_secs(entry.expires_at_ms) {
891                Some(secs) => TtlResult::Seconds(secs),
892                None => TtlResult::NoExpiry,
893            },
894            None => TtlResult::NotFound,
895        }
896    }
897
898    /// Removes the expiration from a key.
899    ///
900    /// Returns `true` if the key existed and had a timeout that was removed.
901    /// Returns `false` if the key doesn't exist or has no expiration.
902    pub fn persist(&mut self, key: &str) -> bool {
903        if self.remove_if_expired(key) {
904            return false;
905        }
906        match self.entries.get_mut(key) {
907            Some(entry) => {
908                if entry.expires_at_ms != 0 {
909                    entry.expires_at_ms = 0;
910                    self.expiry_count = self.expiry_count.saturating_sub(1);
911                    self.bump_version(key);
912                    true
913                } else {
914                    false
915                }
916            }
917            None => false,
918        }
919    }
920
921    /// Returns the TTL status for a key in milliseconds, following Redis semantics:
922    /// - `Milliseconds(n)` if the key has a TTL
923    /// - `NoExpiry` if the key exists without a TTL
924    /// - `NotFound` if the key doesn't exist
925    pub fn pttl(&mut self, key: &str) -> TtlResult {
926        if self.remove_if_expired(key) {
927            return TtlResult::NotFound;
928        }
929        match self.entries.get(key) {
930            Some(entry) => match time::remaining_ms(entry.expires_at_ms) {
931                Some(ms) => TtlResult::Milliseconds(ms),
932                None => TtlResult::NoExpiry,
933            },
934            None => TtlResult::NotFound,
935        }
936    }
937
938    /// Sets an expiration on an existing key in milliseconds.
939    ///
940    /// Returns `true` if the key exists (and the TTL was set),
941    /// `false` if the key doesn't exist.
942    pub fn pexpire(&mut self, key: &str, millis: u64) -> bool {
943        if self.remove_if_expired(key) {
944            return false;
945        }
946        match self.entries.get_mut(key) {
947            Some(entry) => {
948                if entry.expires_at_ms == 0 {
949                    self.expiry_count += 1;
950                }
951                entry.expires_at_ms = time::now_ms().saturating_add(millis);
952                self.bump_version(key);
953                true
954            }
955            None => false,
956        }
957    }
958
959    /// Returns all keys matching a glob pattern.
960    ///
961    /// Warning: O(n) scan of the entire keyspace. Use SCAN for production
962    /// workloads with large key counts.
963    pub fn keys(&self, pattern: &str) -> Vec<String> {
964        let len = self.entries.len();
965        if len > 10_000 {
966            warn!(
967                key_count = len,
968                "KEYS on large keyspace, consider SCAN instead"
969            );
970        }
971        let compiled = GlobPattern::new(pattern);
972        self.entries
973            .iter()
974            .filter(|(_, entry)| !entry.is_expired())
975            .filter(|(key, _)| compiled.matches(key))
976            .map(|(key, _)| String::from(&**key))
977            .collect()
978    }
979
980    /// Counts live keys in this keyspace that hash to the given cluster slot.
981    ///
982    /// O(n) scan over all entries — same cost as KEYS.
983    pub fn count_keys_in_slot(&self, slot: u16) -> usize {
984        self.entries
985            .iter()
986            .filter(|(_, entry)| !entry.is_expired())
987            .filter(|(key, _)| ember_cluster::key_slot(key.as_bytes()) == slot)
988            .count()
989    }
990
991    /// Returns up to `count` live keys that hash to the given cluster slot.
992    ///
993    /// O(n) scan over all entries — same cost as KEYS.
994    pub fn get_keys_in_slot(&self, slot: u16, count: usize) -> Vec<String> {
995        self.entries
996            .iter()
997            .filter(|(_, entry)| !entry.is_expired())
998            .filter(|(key, _)| ember_cluster::key_slot(key.as_bytes()) == slot)
999            .take(count)
1000            .map(|(key, _)| String::from(&**key))
1001            .collect()
1002    }
1003
1004    /// Renames a key to a new name. Returns an error if the source key
1005    /// doesn't exist. If the destination key already exists, it is overwritten.
1006    pub fn rename(&mut self, key: &str, newkey: &str) -> Result<(), RenameError> {
1007        self.remove_if_expired(key);
1008        self.remove_if_expired(newkey);
1009
1010        let entry = match self.entries.remove(key) {
1011            Some(entry) => entry,
1012            None => return Err(RenameError::NoSuchKey),
1013        };
1014
1015        // update memory tracking for old key removal
1016        self.memory.remove(key, &entry.value);
1017        self.decrement_expiry_if_set(&entry);
1018
1019        // remove destination if it exists
1020        if let Some(old_dest) = self.entries.remove(newkey) {
1021            self.memory.remove(newkey, &old_dest.value);
1022            self.decrement_expiry_if_set(&old_dest);
1023        }
1024
1025        // re-insert with the new key name, preserving value and expiry
1026        self.memory.add(newkey, &entry.value);
1027        if entry.expires_at_ms != 0 {
1028            self.expiry_count += 1;
1029        }
1030        self.remove_version(key);
1031        self.entries.insert(CompactString::from(newkey), entry);
1032        self.bump_version(newkey);
1033        Ok(())
1034    }
1035
1036    /// Copies the value at `source` to `destination`. If `replace` is false and
1037    /// the destination already exists, returns `Ok(false)`. Returns `Ok(true)` on
1038    /// success.
1039    pub fn copy(&mut self, source: &str, dest: &str, replace: bool) -> Result<bool, CopyError> {
1040        self.remove_if_expired(source);
1041        self.remove_if_expired(dest);
1042
1043        let src_entry = match self.entries.get(source) {
1044            Some(e) => e,
1045            None => return Err(CopyError::NoSuchKey),
1046        };
1047
1048        // if destination exists and replace not set, return 0
1049        if !replace && self.entries.contains_key(dest) {
1050            return Ok(false);
1051        }
1052
1053        // clone value and expiry from source
1054        let cloned_value = src_entry.value.clone();
1055        let cloned_expire = if src_entry.expires_at_ms != 0 {
1056            Some(src_entry.expires_at_ms)
1057        } else {
1058            None
1059        };
1060
1061        // estimate memory for the new entry
1062        let new_size = memory::entry_size(dest, &cloned_value);
1063
1064        // if replacing, account for the old destination's size
1065        let old_dest_size = self
1066            .entries
1067            .get(dest)
1068            .map(|e| e.entry_size(dest))
1069            .unwrap_or(0);
1070        let net_increase = new_size.saturating_sub(old_dest_size);
1071        if !self.enforce_memory_limit(net_increase) {
1072            return Err(CopyError::OutOfMemory);
1073        }
1074
1075        // remove old destination if replacing
1076        if let Some(old_dest) = self.entries.remove(dest) {
1077            self.memory.remove(dest, &old_dest.value);
1078            self.decrement_expiry_if_set(&old_dest);
1079            self.defer_drop(old_dest.value);
1080        }
1081
1082        // insert the clone
1083        self.memory.add(dest, &cloned_value);
1084        let has_expiry = cloned_expire.is_some();
1085        if has_expiry {
1086            self.expiry_count += 1;
1087        }
1088        let mut entry = Entry::new(cloned_value, None);
1089        // preserve the source's absolute expiry timestamp
1090        if let Some(ts) = cloned_expire {
1091            entry.expires_at_ms = ts;
1092        }
1093        self.entries.insert(CompactString::from(dest), entry);
1094        self.bump_version(dest);
1095        Ok(true)
1096    }
1097
1098    /// Returns aggregated stats for this keyspace.
1099    ///
1100    /// All fields are tracked incrementally — this is O(1).
1101    pub fn stats(&self) -> KeyspaceStats {
1102        KeyspaceStats {
1103            key_count: self.memory.key_count(),
1104            used_bytes: self.memory.used_bytes(),
1105            keys_with_expiry: self.expiry_count,
1106            keys_expired: self.expired_total,
1107            keys_evicted: self.evicted_total,
1108            oom_rejections: self.oom_rejections,
1109        }
1110    }
1111
1112    /// Returns the number of live keys.
1113    pub fn len(&self) -> usize {
1114        self.entries.len()
1115    }
1116
1117    /// Removes all keys from the keyspace.
1118    pub fn clear(&mut self) {
1119        self.entries.clear();
1120        self.memory.reset();
1121        self.expiry_count = 0;
1122        self.versions.clear();
1123    }
1124
1125    /// Returns `true` if the keyspace has no entries.
1126    pub fn is_empty(&self) -> bool {
1127        self.entries.is_empty()
1128    }
1129
1130    /// Scans keys starting from a cursor position.
1131    ///
1132    /// Returns the next cursor (0 if scan complete) and a batch of keys.
1133    /// The `pattern` argument supports glob-style matching (`*`, `?`, `[abc]`).
1134    pub fn scan_keys(
1135        &self,
1136        cursor: u64,
1137        count: usize,
1138        pattern: Option<&str>,
1139    ) -> (u64, Vec<String>) {
1140        let mut keys = Vec::with_capacity(count);
1141        let mut position = 0u64;
1142        let target_count = if count == 0 { 10 } else { count };
1143
1144        let compiled = pattern.map(GlobPattern::new);
1145
1146        for (key, entry) in self.entries.iter() {
1147            // skip expired entries
1148            if entry.is_expired() {
1149                continue;
1150            }
1151
1152            // skip entries before cursor
1153            if position < cursor {
1154                position += 1;
1155                continue;
1156            }
1157
1158            // pattern matching
1159            if let Some(ref pat) = compiled {
1160                if !pat.matches(key) {
1161                    position += 1;
1162                    continue;
1163                }
1164            }
1165
1166            keys.push(String::from(&**key));
1167            position += 1;
1168
1169            if keys.len() >= target_count {
1170                // return position as next cursor
1171                return (position, keys);
1172            }
1173        }
1174
1175        // scan complete
1176        (0, keys)
1177    }
1178
1179    /// Returns the value and remaining TTL in milliseconds for a single key.
1180    ///
1181    /// Returns `None` if the key doesn't exist or is expired. TTL is -1 for
1182    /// entries with no expiration. Used by MIGRATE/DUMP to serialize a key
1183    /// for transfer to another node.
1184    pub fn dump(&mut self, key: &str) -> Option<(&Value, i64)> {
1185        if self.remove_if_expired(key) {
1186            return None;
1187        }
1188        let entry = self.entries.get(key)?;
1189        let ttl_ms = match time::remaining_ms(entry.expires_at_ms) {
1190            Some(ms) => ms.min(i64::MAX as u64) as i64,
1191            None => -1,
1192        };
1193        Some((&entry.value, ttl_ms))
1194    }
1195
1196    /// Iterates over all live (non-expired) entries, yielding the key, a
1197    /// clone of the value, and the remaining TTL in milliseconds (-1 for
1198    /// entries with no expiration). Used by snapshot and AOF rewrite.
1199    pub fn iter_entries(&self) -> impl Iterator<Item = (&str, &Value, i64)> {
1200        self.entries.iter().filter_map(move |(key, entry)| {
1201            if entry.is_expired() {
1202                return None;
1203            }
1204            let ttl_ms = match time::remaining_ms(entry.expires_at_ms) {
1205                Some(ms) => ms.min(i64::MAX as u64) as i64,
1206                None => -1,
1207            };
1208            Some((&**key, &entry.value, ttl_ms))
1209        })
1210    }
1211
1212    /// Restores an entry during recovery, bypassing memory limits.
1213    ///
1214    /// `ttl` is the remaining time-to-live. If `None`, the key has no expiry.
1215    /// This is used only during shard startup when loading from
1216    /// snapshot/AOF — normal writes should go through `set()`.
1217    pub fn restore(&mut self, key: String, value: Value, ttl: Option<Duration>) {
1218        let has_expiry = ttl.is_some();
1219
1220        // if replacing an existing entry, adjust memory tracking
1221        if let Some(old) = self.entries.get(key.as_str()) {
1222            self.memory.replace(&key, &old.value, &value);
1223            self.adjust_expiry_count(old.expires_at_ms != 0, has_expiry);
1224        } else {
1225            self.memory.add(&key, &value);
1226            if has_expiry {
1227                self.expiry_count += 1;
1228            }
1229        }
1230
1231        let entry = Entry::new(value, ttl);
1232        self.entries.insert(CompactString::from(key.clone()), entry);
1233        self.bump_version(&key);
1234    }
1235
1236    /// Randomly samples up to `count` keys and removes any that have expired.
1237    ///
1238    /// Returns the number of keys actually removed. Used by the active
1239    /// expiration cycle to clean up keys that no one is reading.
1240    pub fn expire_sample(&mut self, count: usize) -> usize {
1241        if self.entries.is_empty() {
1242            return 0;
1243        }
1244
1245        let mut rng = rand::rng();
1246
1247        let keys_to_check: Vec<String> = self
1248            .entries
1249            .keys()
1250            .choose_multiple(&mut rng, count)
1251            .into_iter()
1252            .map(|k| String::from(&**k))
1253            .collect();
1254
1255        let mut removed = 0;
1256        for key in &keys_to_check {
1257            if self.remove_if_expired(key) {
1258                removed += 1;
1259            }
1260        }
1261        removed
1262    }
1263
1264    /// Removes a key that is already known to be expired. Used by
1265    /// fused lookup paths that check expiry inline via `get_mut()` and
1266    /// need a second probe only on the rare expired path.
1267    fn remove_expired_entry(&mut self, key: &str) {
1268        if let Some(entry) = self.entries.remove(key) {
1269            self.memory.remove(key, &entry.value);
1270            self.decrement_expiry_if_set(&entry);
1271            self.expired_total += 1;
1272            self.remove_version(key);
1273            self.defer_drop(entry.value);
1274        }
1275    }
1276
1277    /// Checks if a key is expired and removes it if so. Returns `true`
1278    /// if the key was removed (or didn't exist).
1279    fn remove_if_expired(&mut self, key: &str) -> bool {
1280        let expired = self
1281            .entries
1282            .get(key)
1283            .map(|e| e.is_expired())
1284            .unwrap_or(false);
1285
1286        if expired {
1287            if let Some(entry) = self.entries.remove(key) {
1288                self.memory.remove(key, &entry.value);
1289                self.decrement_expiry_if_set(&entry);
1290                self.expired_total += 1;
1291                self.remove_version(key);
1292                self.defer_drop(entry.value);
1293            }
1294        }
1295        expired
1296    }
1297
1298    /// Returns a mutable reference to the entry for `key`, or `None` if the
1299    /// key doesn't exist or has expired.
1300    ///
1301    /// Combines the three steps that almost every read operation repeats:
1302    /// 1. Remove the key if it has expired (lazy expiration).
1303    /// 2. Look up the entry in the map.
1304    /// 3. Touch the entry to update its last-access timestamp for LRU.
1305    ///
1306    /// Callers still need to match on the entry's value type and handle
1307    /// `WrongType` themselves — this helper just eliminates the common
1308    /// expiry + lookup + touch boilerplate.
1309    fn get_live_entry(&mut self, key: &str) -> Option<&mut Entry> {
1310        self.remove_if_expired(key);
1311        let entry = self.entries.get_mut(key)?;
1312        entry.touch(self.track_access);
1313        Some(entry)
1314    }
1315
1316    /// Sends a value to the background drop thread if one is configured
1317    /// and the value is large enough to justify the overhead.
1318    fn defer_drop(&self, value: Value) {
1319        if let Some(ref handle) = self.drop_handle {
1320            handle.defer_value(value);
1321        }
1322    }
1323}
1324
1325impl Default for Keyspace {
1326    fn default() -> Self {
1327        Self::new()
1328    }
1329}
1330
1331/// Formats a float value matching Redis behavior.
1332///
1333/// Uses up to 17 significant digits and strips unnecessary trailing zeros,
1334/// but always keeps at least one decimal place for non-integer results.
1335pub(crate) fn format_float(val: f64) -> String {
1336    if val == 0.0 {
1337        return "0".into();
1338    }
1339    // Use enough precision to round-trip
1340    let s = format!("{:.17e}", val);
1341    // Parse back to get the clean representation
1342    let reparsed: f64 = s.parse().unwrap_or(val);
1343    // If it's a whole number that fits in i64, format without decimals
1344    if reparsed == reparsed.trunc() && reparsed >= i64::MIN as f64 && reparsed <= i64::MAX as f64 {
1345        format!("{}", reparsed as i64)
1346    } else {
1347        // Use ryu-like formatting via Display which strips trailing zeros
1348        let formatted = format!("{}", reparsed);
1349        formatted
1350    }
1351}
1352
1353/// Glob-style pattern matching for SCAN's MATCH option.
1354///
1355/// Supports:
1356/// - `*` matches any sequence of characters (including empty)
1357/// - `?` matches exactly one character
1358/// - `[abc]` matches one character from the set
1359/// - `[^abc]` or `[!abc]` matches one character NOT in the set
1360///
1361/// Uses an iterative two-pointer algorithm with backtracking for O(n*m)
1362/// worst-case performance, where n is pattern length and m is text length.
1363///
1364/// Prefer [`GlobPattern::new`] + [`GlobPattern::matches`] when matching
1365/// the same pattern against many strings (KEYS, SCAN) to avoid
1366/// re-collecting the pattern chars on every call.
1367pub(crate) fn glob_match(pattern: &str, text: &str) -> bool {
1368    let pat: Vec<char> = pattern.chars().collect();
1369    glob_match_compiled(&pat, text)
1370}
1371
1372/// Pre-compiled glob pattern that avoids re-allocating pattern chars
1373/// on every match call. Use for KEYS/SCAN where the same pattern is
1374/// tested against every key in the keyspace.
1375pub(crate) struct GlobPattern {
1376    chars: Vec<char>,
1377}
1378
1379impl GlobPattern {
1380    pub(crate) fn new(pattern: &str) -> Self {
1381        Self {
1382            chars: pattern.chars().collect(),
1383        }
1384    }
1385
1386    pub(crate) fn matches(&self, text: &str) -> bool {
1387        glob_match_compiled(&self.chars, text)
1388    }
1389}
1390
1391/// Core glob matching against a pre-compiled pattern char slice.
1392fn glob_match_compiled(pat: &[char], text: &str) -> bool {
1393    let txt: Vec<char> = text.chars().collect();
1394
1395    let mut pi = 0; // pattern index
1396    let mut ti = 0; // text index
1397
1398    // backtracking state for the most recent '*'
1399    let mut star_pi: Option<usize> = None;
1400    let mut star_ti: usize = 0;
1401
1402    while ti < txt.len() || pi < pat.len() {
1403        if pi < pat.len() {
1404            match pat[pi] {
1405                '*' => {
1406                    // record star position and try matching zero chars first
1407                    star_pi = Some(pi);
1408                    star_ti = ti;
1409                    pi += 1;
1410                    continue;
1411                }
1412                '?' if ti < txt.len() => {
1413                    pi += 1;
1414                    ti += 1;
1415                    continue;
1416                }
1417                '[' if ti < txt.len() => {
1418                    // parse character class
1419                    let tc = txt[ti];
1420                    let mut j = pi + 1;
1421                    let mut negated = false;
1422                    let mut matched = false;
1423
1424                    if j < pat.len() && (pat[j] == '^' || pat[j] == '!') {
1425                        negated = true;
1426                        j += 1;
1427                    }
1428
1429                    while j < pat.len() && pat[j] != ']' {
1430                        if pat[j] == tc {
1431                            matched = true;
1432                        }
1433                        j += 1;
1434                    }
1435
1436                    if negated {
1437                        matched = !matched;
1438                    }
1439
1440                    if matched && j < pat.len() {
1441                        pi = j + 1; // skip past ']'
1442                        ti += 1;
1443                        continue;
1444                    }
1445                    // fall through to backtrack
1446                }
1447                c if ti < txt.len() && c == txt[ti] => {
1448                    pi += 1;
1449                    ti += 1;
1450                    continue;
1451                }
1452                _ => {}
1453            }
1454        }
1455
1456        // mismatch or end of pattern — try backtracking to last '*'
1457        if let Some(sp) = star_pi {
1458            pi = sp + 1;
1459            star_ti += 1;
1460            ti = star_ti;
1461            if ti > txt.len() {
1462                return false;
1463            }
1464        } else {
1465            return false;
1466        }
1467    }
1468
1469    // skip trailing '*' in pattern
1470    while pi < pat.len() && pat[pi] == '*' {
1471        pi += 1;
1472    }
1473
1474    pi == pat.len()
1475}
1476
1477#[cfg(test)]
1478mod tests {
1479    use super::*;
1480    use std::thread;
1481
1482    #[test]
1483    fn del_existing() {
1484        let mut ks = Keyspace::new();
1485        ks.set("key".into(), Bytes::from("val"), None, false, false);
1486        assert!(ks.del("key"));
1487        assert_eq!(ks.get("key").unwrap(), None);
1488    }
1489
1490    #[test]
1491    fn del_missing() {
1492        let mut ks = Keyspace::new();
1493        assert!(!ks.del("nope"));
1494    }
1495
1496    #[test]
1497    fn exists_present_and_absent() {
1498        let mut ks = Keyspace::new();
1499        ks.set("yes".into(), Bytes::from("here"), None, false, false);
1500        assert!(ks.exists("yes"));
1501        assert!(!ks.exists("no"));
1502    }
1503
1504    #[test]
1505    fn ttl_no_expiry() {
1506        let mut ks = Keyspace::new();
1507        ks.set("key".into(), Bytes::from("val"), None, false, false);
1508        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
1509    }
1510
1511    #[test]
1512    fn ttl_not_found() {
1513        let mut ks = Keyspace::new();
1514        assert_eq!(ks.ttl("missing"), TtlResult::NotFound);
1515    }
1516
1517    #[test]
1518    fn ttl_with_expiry() {
1519        let mut ks = Keyspace::new();
1520        ks.set(
1521            "key".into(),
1522            Bytes::from("val"),
1523            Some(Duration::from_secs(100)),
1524            false,
1525            false,
1526        );
1527        match ks.ttl("key") {
1528            TtlResult::Seconds(s) => assert!((98..=100).contains(&s)),
1529            other => panic!("expected Seconds, got {other:?}"),
1530        }
1531    }
1532
1533    #[test]
1534    fn ttl_expired_key() {
1535        let mut ks = Keyspace::new();
1536        ks.set(
1537            "temp".into(),
1538            Bytes::from("val"),
1539            Some(Duration::from_millis(10)),
1540            false,
1541            false,
1542        );
1543        thread::sleep(Duration::from_millis(30));
1544        assert_eq!(ks.ttl("temp"), TtlResult::NotFound);
1545    }
1546
1547    #[test]
1548    fn expire_existing_key() {
1549        let mut ks = Keyspace::new();
1550        ks.set("key".into(), Bytes::from("val"), None, false, false);
1551        assert!(ks.expire("key", 60));
1552        match ks.ttl("key") {
1553            TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
1554            other => panic!("expected Seconds, got {other:?}"),
1555        }
1556    }
1557
1558    #[test]
1559    fn expire_missing_key() {
1560        let mut ks = Keyspace::new();
1561        assert!(!ks.expire("nope", 60));
1562    }
1563
1564    #[test]
1565    fn del_expired_key_returns_false() {
1566        let mut ks = Keyspace::new();
1567        ks.set(
1568            "temp".into(),
1569            Bytes::from("val"),
1570            Some(Duration::from_millis(10)),
1571            false,
1572            false,
1573        );
1574        thread::sleep(Duration::from_millis(30));
1575        // key is expired, del should return false (not found)
1576        assert!(!ks.del("temp"));
1577    }
1578
1579    // -- memory tracking tests --
1580
1581    #[test]
1582    fn memory_increases_on_set() {
1583        let mut ks = Keyspace::new();
1584        assert_eq!(ks.stats().used_bytes, 0);
1585        ks.set("key".into(), Bytes::from("value"), None, false, false);
1586        assert!(ks.stats().used_bytes > 0);
1587        assert_eq!(ks.stats().key_count, 1);
1588    }
1589
1590    #[test]
1591    fn memory_decreases_on_del() {
1592        let mut ks = Keyspace::new();
1593        ks.set("key".into(), Bytes::from("value"), None, false, false);
1594        let after_set = ks.stats().used_bytes;
1595        ks.del("key");
1596        assert_eq!(ks.stats().used_bytes, 0);
1597        assert!(after_set > 0);
1598    }
1599
1600    #[test]
1601    fn memory_adjusts_on_overwrite() {
1602        let mut ks = Keyspace::new();
1603        ks.set("key".into(), Bytes::from("short"), None, false, false);
1604        let small = ks.stats().used_bytes;
1605
1606        ks.set(
1607            "key".into(),
1608            Bytes::from("a much longer value"),
1609            None,
1610            false,
1611            false,
1612        );
1613        let large = ks.stats().used_bytes;
1614
1615        assert!(large > small);
1616        assert_eq!(ks.stats().key_count, 1);
1617    }
1618
1619    #[test]
1620    fn memory_decreases_on_expired_removal() {
1621        let mut ks = Keyspace::new();
1622        ks.set(
1623            "temp".into(),
1624            Bytes::from("data"),
1625            Some(Duration::from_millis(10)),
1626            false,
1627            false,
1628        );
1629        assert!(ks.stats().used_bytes > 0);
1630        thread::sleep(Duration::from_millis(30));
1631        // trigger lazy expiration
1632        let _ = ks.get("temp");
1633        assert_eq!(ks.stats().used_bytes, 0);
1634        assert_eq!(ks.stats().key_count, 0);
1635    }
1636
1637    #[test]
1638    fn stats_tracks_expiry_count() {
1639        let mut ks = Keyspace::new();
1640        ks.set("a".into(), Bytes::from("1"), None, false, false);
1641        ks.set(
1642            "b".into(),
1643            Bytes::from("2"),
1644            Some(Duration::from_secs(100)),
1645            false,
1646            false,
1647        );
1648        ks.set(
1649            "c".into(),
1650            Bytes::from("3"),
1651            Some(Duration::from_secs(200)),
1652            false,
1653            false,
1654        );
1655
1656        let stats = ks.stats();
1657        assert_eq!(stats.key_count, 3);
1658        assert_eq!(stats.keys_with_expiry, 2);
1659    }
1660
1661    // -- eviction tests --
1662
1663    #[test]
1664    fn noeviction_returns_oom_when_full() {
1665        // one entry with key "a" + value "val" = 1 + 3 + 104 = 108 bytes
1666        // set limit so one entry fits but two don't
1667        let config = ShardConfig {
1668            max_memory: Some(130),
1669            eviction_policy: EvictionPolicy::NoEviction,
1670            ..ShardConfig::default()
1671        };
1672        let mut ks = Keyspace::with_config(config);
1673
1674        // first key should fit
1675        assert_eq!(
1676            ks.set("a".into(), Bytes::from("val"), None, false, false),
1677            SetResult::Ok
1678        );
1679
1680        // second key should push us over the limit
1681        let result = ks.set("b".into(), Bytes::from("val"), None, false, false);
1682        assert_eq!(result, SetResult::OutOfMemory);
1683
1684        // original key should still be there
1685        assert!(ks.exists("a"));
1686    }
1687
1688    #[test]
1689    fn lru_eviction_makes_room() {
1690        let config = ShardConfig {
1691            max_memory: Some(130),
1692            eviction_policy: EvictionPolicy::AllKeysLru,
1693            ..ShardConfig::default()
1694        };
1695        let mut ks = Keyspace::with_config(config);
1696
1697        assert_eq!(
1698            ks.set("a".into(), Bytes::from("val"), None, false, false),
1699            SetResult::Ok
1700        );
1701
1702        // this should trigger eviction of "a" to make room
1703        assert_eq!(
1704            ks.set("b".into(), Bytes::from("val"), None, false, false),
1705            SetResult::Ok
1706        );
1707
1708        // "a" should have been evicted
1709        assert!(!ks.exists("a"));
1710        assert!(ks.exists("b"));
1711    }
1712
1713    #[test]
1714    fn safety_margin_rejects_near_raw_limit() {
1715        // one entry = 1 (key) + 3 (val) + ENTRY_OVERHEAD (104) = 108 bytes.
1716        // configure max_memory = 120. effective limit = 120 * 90 / 100 = 108.
1717        // the entry fills exactly the effective limit, so a second entry should
1718        // be rejected even though the raw limit has 12 bytes of headroom.
1719        let config = ShardConfig {
1720            max_memory: Some(120),
1721            eviction_policy: EvictionPolicy::NoEviction,
1722            ..ShardConfig::default()
1723        };
1724        let mut ks = Keyspace::with_config(config);
1725
1726        assert_eq!(
1727            ks.set("a".into(), Bytes::from("val"), None, false, false),
1728            SetResult::Ok
1729        );
1730
1731        let result = ks.set("b".into(), Bytes::from("val"), None, false, false);
1732        assert_eq!(result, SetResult::OutOfMemory);
1733    }
1734
1735    #[test]
1736    fn overwrite_same_size_succeeds_at_limit() {
1737        let config = ShardConfig {
1738            max_memory: Some(130),
1739            eviction_policy: EvictionPolicy::NoEviction,
1740            ..ShardConfig::default()
1741        };
1742        let mut ks = Keyspace::with_config(config);
1743
1744        assert_eq!(
1745            ks.set("a".into(), Bytes::from("val"), None, false, false),
1746            SetResult::Ok
1747        );
1748
1749        // overwriting with same-size value should succeed — no net increase
1750        assert_eq!(
1751            ks.set("a".into(), Bytes::from("new"), None, false, false),
1752            SetResult::Ok
1753        );
1754        assert_eq!(
1755            ks.get("a").unwrap(),
1756            Some(Value::String(Bytes::from("new")))
1757        );
1758    }
1759
1760    #[test]
1761    fn overwrite_larger_value_respects_limit() {
1762        let config = ShardConfig {
1763            max_memory: Some(130),
1764            eviction_policy: EvictionPolicy::NoEviction,
1765            ..ShardConfig::default()
1766        };
1767        let mut ks = Keyspace::with_config(config);
1768
1769        assert_eq!(
1770            ks.set("a".into(), Bytes::from("val"), None, false, false),
1771            SetResult::Ok
1772        );
1773
1774        // overwriting with a much larger value should fail if it exceeds limit
1775        let big_value = "x".repeat(200);
1776        let result = ks.set("a".into(), Bytes::from(big_value), None, false, false);
1777        assert_eq!(result, SetResult::OutOfMemory);
1778
1779        // original value should still be intact
1780        assert_eq!(
1781            ks.get("a").unwrap(),
1782            Some(Value::String(Bytes::from("val")))
1783        );
1784    }
1785
1786    // -- iter_entries tests --
1787
1788    #[test]
1789    fn iter_entries_returns_live_entries() {
1790        let mut ks = Keyspace::new();
1791        ks.set("a".into(), Bytes::from("1"), None, false, false);
1792        ks.set(
1793            "b".into(),
1794            Bytes::from("2"),
1795            Some(Duration::from_secs(100)),
1796            false,
1797            false,
1798        );
1799
1800        let entries: Vec<_> = ks.iter_entries().collect();
1801        assert_eq!(entries.len(), 2);
1802    }
1803
1804    #[test]
1805    fn iter_entries_skips_expired() {
1806        let mut ks = Keyspace::new();
1807        ks.set(
1808            "dead".into(),
1809            Bytes::from("gone"),
1810            Some(Duration::from_millis(1)),
1811            false,
1812            false,
1813        );
1814        ks.set("alive".into(), Bytes::from("here"), None, false, false);
1815        thread::sleep(Duration::from_millis(10));
1816
1817        let entries: Vec<_> = ks.iter_entries().collect();
1818        assert_eq!(entries.len(), 1);
1819        assert_eq!(entries[0].0, "alive");
1820    }
1821
1822    #[test]
1823    fn iter_entries_ttl_for_no_expiry() {
1824        let mut ks = Keyspace::new();
1825        ks.set("permanent".into(), Bytes::from("val"), None, false, false);
1826
1827        let entries: Vec<_> = ks.iter_entries().collect();
1828        assert_eq!(entries[0].2, -1);
1829    }
1830
1831    // -- restore tests --
1832
1833    #[test]
1834    fn restore_adds_entry() {
1835        let mut ks = Keyspace::new();
1836        ks.restore("restored".into(), Value::String(Bytes::from("data")), None);
1837        assert_eq!(
1838            ks.get("restored").unwrap(),
1839            Some(Value::String(Bytes::from("data")))
1840        );
1841        assert_eq!(ks.stats().key_count, 1);
1842    }
1843
1844    #[test]
1845    fn restore_with_zero_ttl_expires_immediately() {
1846        let mut ks = Keyspace::new();
1847        // TTL of 0 should create entry that expires immediately
1848        ks.restore(
1849            "short-lived".into(),
1850            Value::String(Bytes::from("data")),
1851            Some(Duration::from_millis(1)),
1852        );
1853        // Entry exists but will be expired on access
1854        std::thread::sleep(Duration::from_millis(5));
1855        assert!(ks.get("short-lived").is_err() || ks.get("short-lived").unwrap().is_none());
1856    }
1857
1858    #[test]
1859    fn restore_overwrites_existing() {
1860        let mut ks = Keyspace::new();
1861        ks.set("key".into(), Bytes::from("old"), None, false, false);
1862        ks.restore("key".into(), Value::String(Bytes::from("new")), None);
1863        assert_eq!(
1864            ks.get("key").unwrap(),
1865            Some(Value::String(Bytes::from("new")))
1866        );
1867        assert_eq!(ks.stats().key_count, 1);
1868    }
1869
1870    #[test]
1871    fn restore_bypasses_memory_limit() {
1872        let config = ShardConfig {
1873            max_memory: Some(50), // very small
1874            eviction_policy: EvictionPolicy::NoEviction,
1875            ..ShardConfig::default()
1876        };
1877        let mut ks = Keyspace::with_config(config);
1878
1879        // normal set would fail due to memory limit
1880        ks.restore(
1881            "big".into(),
1882            Value::String(Bytes::from("x".repeat(200))),
1883            None,
1884        );
1885        assert_eq!(ks.stats().key_count, 1);
1886    }
1887
1888    #[test]
1889    fn no_limit_never_rejects() {
1890        // default config has no memory limit
1891        let mut ks = Keyspace::new();
1892        for i in 0..100 {
1893            assert_eq!(
1894                ks.set(format!("key:{i}"), Bytes::from("value"), None, false, false),
1895                SetResult::Ok
1896            );
1897        }
1898        assert_eq!(ks.len(), 100);
1899    }
1900
1901    #[test]
1902    fn clear_removes_all_keys() {
1903        let mut ks = Keyspace::new();
1904        ks.set("a".into(), Bytes::from("1"), None, false, false);
1905        ks.set(
1906            "b".into(),
1907            Bytes::from("2"),
1908            Some(Duration::from_secs(60)),
1909            false,
1910            false,
1911        );
1912        ks.lpush("list", &[Bytes::from("x")]).unwrap();
1913
1914        assert_eq!(ks.len(), 3);
1915        assert!(ks.stats().used_bytes > 0);
1916        assert_eq!(ks.stats().keys_with_expiry, 1);
1917
1918        ks.clear();
1919
1920        assert_eq!(ks.len(), 0);
1921        assert!(ks.is_empty());
1922        assert_eq!(ks.stats().used_bytes, 0);
1923        assert_eq!(ks.stats().keys_with_expiry, 0);
1924    }
1925
1926    // --- scan ---
1927
1928    #[test]
1929    fn scan_returns_keys() {
1930        let mut ks = Keyspace::new();
1931        ks.set("key1".into(), Bytes::from("a"), None, false, false);
1932        ks.set("key2".into(), Bytes::from("b"), None, false, false);
1933        ks.set("key3".into(), Bytes::from("c"), None, false, false);
1934
1935        let (cursor, keys) = ks.scan_keys(0, 10, None);
1936        assert_eq!(cursor, 0); // complete in one pass
1937        assert_eq!(keys.len(), 3);
1938    }
1939
1940    #[test]
1941    fn scan_empty_keyspace() {
1942        let ks = Keyspace::new();
1943        let (cursor, keys) = ks.scan_keys(0, 10, None);
1944        assert_eq!(cursor, 0);
1945        assert!(keys.is_empty());
1946    }
1947
1948    #[test]
1949    fn scan_with_pattern() {
1950        let mut ks = Keyspace::new();
1951        ks.set("user:1".into(), Bytes::from("a"), None, false, false);
1952        ks.set("user:2".into(), Bytes::from("b"), None, false, false);
1953        ks.set("item:1".into(), Bytes::from("c"), None, false, false);
1954
1955        let (cursor, keys) = ks.scan_keys(0, 10, Some("user:*"));
1956        assert_eq!(cursor, 0);
1957        assert_eq!(keys.len(), 2);
1958        for k in &keys {
1959            assert!(k.starts_with("user:"));
1960        }
1961    }
1962
1963    #[test]
1964    fn scan_with_count_limit() {
1965        let mut ks = Keyspace::new();
1966        for i in 0..10 {
1967            ks.set(format!("k{i}"), Bytes::from("v"), None, false, false);
1968        }
1969
1970        // first batch
1971        let (cursor, keys) = ks.scan_keys(0, 3, None);
1972        assert!(!keys.is_empty());
1973        assert!(keys.len() <= 3);
1974
1975        // if there are more keys, cursor should be non-zero
1976        if cursor != 0 {
1977            let (cursor2, keys2) = ks.scan_keys(cursor, 3, None);
1978            assert!(!keys2.is_empty());
1979            // continue until complete
1980            let _ = (cursor2, keys2);
1981        }
1982    }
1983
1984    #[test]
1985    fn scan_skips_expired_keys() {
1986        let mut ks = Keyspace::new();
1987        ks.set("live".into(), Bytes::from("a"), None, false, false);
1988        ks.set(
1989            "expired".into(),
1990            Bytes::from("b"),
1991            Some(Duration::from_millis(1)),
1992            false,
1993            false,
1994        );
1995
1996        std::thread::sleep(Duration::from_millis(5));
1997
1998        let (_, keys) = ks.scan_keys(0, 10, None);
1999        assert_eq!(keys.len(), 1);
2000        assert_eq!(keys[0], "live");
2001    }
2002
2003    #[test]
2004    fn glob_match_star() {
2005        assert!(super::glob_match("user:*", "user:123"));
2006        assert!(super::glob_match("user:*", "user:"));
2007        assert!(super::glob_match("*:data", "foo:data"));
2008        assert!(!super::glob_match("user:*", "item:123"));
2009    }
2010
2011    #[test]
2012    fn glob_match_question() {
2013        assert!(super::glob_match("key?", "key1"));
2014        assert!(super::glob_match("key?", "keya"));
2015        assert!(!super::glob_match("key?", "key"));
2016        assert!(!super::glob_match("key?", "key12"));
2017    }
2018
2019    #[test]
2020    fn glob_match_brackets() {
2021        assert!(super::glob_match("key[abc]", "keya"));
2022        assert!(super::glob_match("key[abc]", "keyb"));
2023        assert!(!super::glob_match("key[abc]", "keyd"));
2024    }
2025
2026    #[test]
2027    fn glob_match_literal() {
2028        assert!(super::glob_match("exact", "exact"));
2029        assert!(!super::glob_match("exact", "exactnot"));
2030        assert!(!super::glob_match("exact", "notexact"));
2031    }
2032
2033    // --- persist/pttl/pexpire ---
2034
2035    #[test]
2036    fn persist_removes_expiry() {
2037        let mut ks = Keyspace::new();
2038        ks.set(
2039            "key".into(),
2040            Bytes::from("val"),
2041            Some(Duration::from_secs(60)),
2042            false,
2043            false,
2044        );
2045        assert!(matches!(ks.ttl("key"), TtlResult::Seconds(_)));
2046
2047        assert!(ks.persist("key"));
2048        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
2049        assert_eq!(ks.stats().keys_with_expiry, 0);
2050    }
2051
2052    #[test]
2053    fn persist_returns_false_without_expiry() {
2054        let mut ks = Keyspace::new();
2055        ks.set("key".into(), Bytes::from("val"), None, false, false);
2056        assert!(!ks.persist("key"));
2057    }
2058
2059    #[test]
2060    fn persist_returns_false_for_missing_key() {
2061        let mut ks = Keyspace::new();
2062        assert!(!ks.persist("missing"));
2063    }
2064
2065    #[test]
2066    fn pttl_returns_milliseconds() {
2067        let mut ks = Keyspace::new();
2068        ks.set(
2069            "key".into(),
2070            Bytes::from("val"),
2071            Some(Duration::from_secs(60)),
2072            false,
2073            false,
2074        );
2075        match ks.pttl("key") {
2076            TtlResult::Milliseconds(ms) => assert!(ms > 59_000 && ms <= 60_000),
2077            other => panic!("expected Milliseconds, got {other:?}"),
2078        }
2079    }
2080
2081    #[test]
2082    fn pttl_no_expiry() {
2083        let mut ks = Keyspace::new();
2084        ks.set("key".into(), Bytes::from("val"), None, false, false);
2085        assert_eq!(ks.pttl("key"), TtlResult::NoExpiry);
2086    }
2087
2088    #[test]
2089    fn pttl_not_found() {
2090        let mut ks = Keyspace::new();
2091        assert_eq!(ks.pttl("missing"), TtlResult::NotFound);
2092    }
2093
2094    #[test]
2095    fn pexpire_sets_ttl_in_millis() {
2096        let mut ks = Keyspace::new();
2097        ks.set("key".into(), Bytes::from("val"), None, false, false);
2098        assert!(ks.pexpire("key", 5000));
2099        match ks.pttl("key") {
2100            TtlResult::Milliseconds(ms) => assert!(ms > 4000 && ms <= 5000),
2101            other => panic!("expected Milliseconds, got {other:?}"),
2102        }
2103        assert_eq!(ks.stats().keys_with_expiry, 1);
2104    }
2105
2106    #[test]
2107    fn pexpire_missing_key_returns_false() {
2108        let mut ks = Keyspace::new();
2109        assert!(!ks.pexpire("missing", 5000));
2110    }
2111
2112    #[test]
2113    fn pexpire_overwrites_existing_ttl() {
2114        let mut ks = Keyspace::new();
2115        ks.set(
2116            "key".into(),
2117            Bytes::from("val"),
2118            Some(Duration::from_secs(60)),
2119            false,
2120            false,
2121        );
2122        assert!(ks.pexpire("key", 500));
2123        match ks.pttl("key") {
2124            TtlResult::Milliseconds(ms) => assert!(ms <= 500),
2125            other => panic!("expected Milliseconds, got {other:?}"),
2126        }
2127        // expiry count shouldn't double-count
2128        assert_eq!(ks.stats().keys_with_expiry, 1);
2129    }
2130
2131    // --- keys tests ---
2132
2133    #[test]
2134    fn keys_match_all() {
2135        let mut ks = Keyspace::new();
2136        ks.set("a".into(), Bytes::from("1"), None, false, false);
2137        ks.set("b".into(), Bytes::from("2"), None, false, false);
2138        ks.set("c".into(), Bytes::from("3"), None, false, false);
2139        let mut result = ks.keys("*");
2140        result.sort();
2141        assert_eq!(result, vec!["a", "b", "c"]);
2142    }
2143
2144    #[test]
2145    fn keys_with_pattern() {
2146        let mut ks = Keyspace::new();
2147        ks.set("user:1".into(), Bytes::from("a"), None, false, false);
2148        ks.set("user:2".into(), Bytes::from("b"), None, false, false);
2149        ks.set("item:1".into(), Bytes::from("c"), None, false, false);
2150        let mut result = ks.keys("user:*");
2151        result.sort();
2152        assert_eq!(result, vec!["user:1", "user:2"]);
2153    }
2154
2155    #[test]
2156    fn keys_skips_expired() {
2157        let mut ks = Keyspace::new();
2158        ks.set("live".into(), Bytes::from("a"), None, false, false);
2159        ks.set(
2160            "dead".into(),
2161            Bytes::from("b"),
2162            Some(Duration::from_millis(1)),
2163            false,
2164            false,
2165        );
2166        thread::sleep(Duration::from_millis(5));
2167        let result = ks.keys("*");
2168        assert_eq!(result, vec!["live"]);
2169    }
2170
2171    #[test]
2172    fn keys_empty_keyspace() {
2173        let ks = Keyspace::new();
2174        assert!(ks.keys("*").is_empty());
2175    }
2176
2177    // --- rename tests ---
2178
2179    #[test]
2180    fn rename_basic() {
2181        let mut ks = Keyspace::new();
2182        ks.set("old".into(), Bytes::from("value"), None, false, false);
2183        ks.rename("old", "new").unwrap();
2184        assert!(!ks.exists("old"));
2185        assert_eq!(
2186            ks.get("new").unwrap(),
2187            Some(Value::String(Bytes::from("value")))
2188        );
2189    }
2190
2191    #[test]
2192    fn rename_preserves_expiry() {
2193        let mut ks = Keyspace::new();
2194        ks.set(
2195            "old".into(),
2196            Bytes::from("val"),
2197            Some(Duration::from_secs(60)),
2198            false,
2199            false,
2200        );
2201        ks.rename("old", "new").unwrap();
2202        match ks.ttl("new") {
2203            TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
2204            other => panic!("expected TTL preserved, got {other:?}"),
2205        }
2206    }
2207
2208    #[test]
2209    fn rename_overwrites_destination() {
2210        let mut ks = Keyspace::new();
2211        ks.set("src".into(), Bytes::from("new_val"), None, false, false);
2212        ks.set("dst".into(), Bytes::from("old_val"), None, false, false);
2213        ks.rename("src", "dst").unwrap();
2214        assert!(!ks.exists("src"));
2215        assert_eq!(
2216            ks.get("dst").unwrap(),
2217            Some(Value::String(Bytes::from("new_val")))
2218        );
2219        assert_eq!(ks.len(), 1);
2220    }
2221
2222    #[test]
2223    fn rename_missing_key_returns_error() {
2224        let mut ks = Keyspace::new();
2225        let err = ks.rename("missing", "new").unwrap_err();
2226        assert_eq!(err, RenameError::NoSuchKey);
2227    }
2228
2229    #[test]
2230    fn rename_same_key() {
2231        let mut ks = Keyspace::new();
2232        ks.set("key".into(), Bytes::from("val"), None, false, false);
2233        // renaming to itself should succeed (Redis behavior)
2234        ks.rename("key", "key").unwrap();
2235        assert_eq!(
2236            ks.get("key").unwrap(),
2237            Some(Value::String(Bytes::from("val")))
2238        );
2239    }
2240
2241    #[test]
2242    fn rename_tracks_memory() {
2243        let mut ks = Keyspace::new();
2244        ks.set("old".into(), Bytes::from("value"), None, false, false);
2245        let before = ks.stats().used_bytes;
2246        ks.rename("old", "new").unwrap();
2247        let after = ks.stats().used_bytes;
2248        // same key length, so memory should be the same
2249        assert_eq!(before, after);
2250        assert_eq!(ks.stats().key_count, 1);
2251    }
2252
2253    #[test]
2254    fn zero_ttl_expires_immediately() {
2255        let mut ks = Keyspace::new();
2256        ks.set(
2257            "key".into(),
2258            Bytes::from("val"),
2259            Some(Duration::ZERO),
2260            false,
2261            false,
2262        );
2263
2264        // key should be expired immediately
2265        std::thread::sleep(Duration::from_millis(1));
2266        assert!(ks.get("key").unwrap().is_none());
2267    }
2268
2269    #[test]
2270    fn very_small_ttl_expires_quickly() {
2271        let mut ks = Keyspace::new();
2272        ks.set(
2273            "key".into(),
2274            Bytes::from("val"),
2275            Some(Duration::from_millis(1)),
2276            false,
2277            false,
2278        );
2279
2280        std::thread::sleep(Duration::from_millis(5));
2281        assert!(ks.get("key").unwrap().is_none());
2282    }
2283
2284    #[test]
2285    fn count_keys_in_slot_empty() {
2286        let ks = Keyspace::new();
2287        assert_eq!(ks.count_keys_in_slot(0), 0);
2288    }
2289
2290    #[test]
2291    fn count_keys_in_slot_matches() {
2292        let mut ks = Keyspace::new();
2293        // insert a few keys and count those in a specific slot
2294        ks.set("a".into(), Bytes::from("1"), None, false, false);
2295        ks.set("b".into(), Bytes::from("2"), None, false, false);
2296        ks.set("c".into(), Bytes::from("3"), None, false, false);
2297
2298        let slot_a = ember_cluster::key_slot(b"a");
2299        let count = ks.count_keys_in_slot(slot_a);
2300        // at minimum, "a" should be in its own slot
2301        assert!(count >= 1);
2302    }
2303
2304    #[test]
2305    fn count_keys_in_slot_skips_expired() {
2306        let mut ks = Keyspace::new();
2307        let slot = ember_cluster::key_slot(b"temp");
2308        ks.set(
2309            "temp".into(),
2310            Bytes::from("gone"),
2311            Some(Duration::from_millis(0)),
2312            false,
2313            false,
2314        );
2315        // key is expired — should not be counted
2316        thread::sleep(Duration::from_millis(5));
2317        assert_eq!(ks.count_keys_in_slot(slot), 0);
2318    }
2319
2320    #[test]
2321    fn get_keys_in_slot_returns_matching() {
2322        let mut ks = Keyspace::new();
2323        ks.set("x".into(), Bytes::from("1"), None, false, false);
2324        ks.set("y".into(), Bytes::from("2"), None, false, false);
2325
2326        let slot_x = ember_cluster::key_slot(b"x");
2327        let keys = ks.get_keys_in_slot(slot_x, 100);
2328        assert!(keys.contains(&"x".to_string()));
2329    }
2330
2331    #[test]
2332    fn get_keys_in_slot_respects_count_limit() {
2333        let mut ks = Keyspace::new();
2334        // insert several keys — some might share a slot
2335        for i in 0..100 {
2336            ks.set(format!("key:{i}"), Bytes::from("v"), None, false, false);
2337        }
2338        // ask for at most 3 keys from slot 0
2339        let keys = ks.get_keys_in_slot(0, 3);
2340        assert!(keys.len() <= 3);
2341    }
2342
2343    // --- key_version (WATCH support) ---
2344    //
2345    // Version tracking uses a lazily-populated side table. `key_version()`
2346    // inserts the key into the table on first call (simulating WATCH).
2347    // Subsequent mutations only bump the version if the key is tracked.
2348
2349    #[test]
2350    fn key_version_returns_none_for_missing() {
2351        let mut ks = Keyspace::new();
2352        assert_eq!(ks.key_version("nope"), None);
2353    }
2354
2355    #[test]
2356    fn key_version_changes_on_set() {
2357        let mut ks = Keyspace::new();
2358        ks.set("k".into(), Bytes::from("v1"), None, false, false);
2359        // first call registers the key in the version table (like WATCH)
2360        let v1 = ks.key_version("k").expect("key should exist");
2361        ks.set("k".into(), Bytes::from("v2"), None, false, false);
2362        let v2 = ks.key_version("k").expect("key should exist");
2363        assert!(v2 > v1, "version should increase on overwrite");
2364    }
2365
2366    #[test]
2367    fn key_version_none_after_del() {
2368        let mut ks = Keyspace::new();
2369        ks.set("k".into(), Bytes::from("v"), None, false, false);
2370        assert!(ks.key_version("k").is_some());
2371        ks.del("k");
2372        assert_eq!(ks.key_version("k"), None);
2373    }
2374
2375    #[test]
2376    fn key_version_changes_on_list_push() {
2377        let mut ks = Keyspace::new();
2378        ks.lpush("list", &[Bytes::from("a")]).unwrap();
2379        let v1 = ks.key_version("list").expect("list should exist");
2380        ks.rpush("list", &[Bytes::from("b")]).unwrap();
2381        let v2 = ks.key_version("list").expect("list should exist");
2382        assert!(v2 > v1, "version should increase on rpush");
2383    }
2384
2385    #[test]
2386    fn key_version_changes_on_hash_set() {
2387        let mut ks = Keyspace::new();
2388        ks.hset("h", &[("f1".into(), Bytes::from("v1"))]).unwrap();
2389        let v1 = ks.key_version("h").expect("hash should exist");
2390        ks.hset("h", &[("f2".into(), Bytes::from("v2"))]).unwrap();
2391        let v2 = ks.key_version("h").expect("hash should exist");
2392        assert!(v2 > v1, "version should increase on hset");
2393    }
2394
2395    #[test]
2396    fn key_version_changes_on_expire() {
2397        let mut ks = Keyspace::new();
2398        ks.set("k".into(), Bytes::from("v"), None, false, false);
2399        let v1 = ks.key_version("k").expect("key should exist");
2400        ks.expire("k", 100);
2401        let v2 = ks.key_version("k").expect("key should exist");
2402        assert!(v2 > v1, "version should increase on expire");
2403    }
2404
2405    #[test]
2406    fn key_version_stable_without_watch() {
2407        // if key_version is never called, mutations don't create
2408        // version entries — the side table stays empty
2409        let mut ks = Keyspace::new();
2410        ks.set("a".into(), Bytes::from("1"), None, false, false);
2411        ks.set("a".into(), Bytes::from("2"), None, false, false);
2412        // first call to key_version returns a snapshot
2413        let v1 = ks.key_version("a").unwrap();
2414        // no mutation between calls — version is stable
2415        let v2 = ks.key_version("a").unwrap();
2416        assert_eq!(v1, v2, "version should be stable without mutations");
2417    }
2418
2419    // --- copy tests ---
2420
2421    #[test]
2422    fn copy_basic() {
2423        let mut ks = Keyspace::new();
2424        ks.set("src".into(), Bytes::from("hello"), None, false, false);
2425        assert_eq!(ks.copy("src", "dst", false), Ok(true));
2426        assert_eq!(
2427            ks.get("dst").unwrap(),
2428            Some(Value::String(Bytes::from("hello")))
2429        );
2430        // source should still exist
2431        assert!(ks.exists("src"));
2432    }
2433
2434    #[test]
2435    fn copy_preserves_expiry() {
2436        let mut ks = Keyspace::new();
2437        ks.set(
2438            "src".into(),
2439            Bytes::from("val"),
2440            Some(Duration::from_secs(60)),
2441            false,
2442            false,
2443        );
2444        assert_eq!(ks.copy("src", "dst", false), Ok(true));
2445        match ks.ttl("dst") {
2446            TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
2447            other => panic!("expected TTL preserved, got {other:?}"),
2448        }
2449    }
2450
2451    #[test]
2452    fn copy_no_replace_returns_false() {
2453        let mut ks = Keyspace::new();
2454        ks.set("src".into(), Bytes::from("a"), None, false, false);
2455        ks.set("dst".into(), Bytes::from("b"), None, false, false);
2456        assert_eq!(ks.copy("src", "dst", false), Ok(false));
2457        // destination should be unchanged
2458        assert_eq!(
2459            ks.get("dst").unwrap(),
2460            Some(Value::String(Bytes::from("b")))
2461        );
2462    }
2463
2464    #[test]
2465    fn copy_replace_overwrites() {
2466        let mut ks = Keyspace::new();
2467        ks.set("src".into(), Bytes::from("new"), None, false, false);
2468        ks.set("dst".into(), Bytes::from("old"), None, false, false);
2469        assert_eq!(ks.copy("src", "dst", true), Ok(true));
2470        assert_eq!(
2471            ks.get("dst").unwrap(),
2472            Some(Value::String(Bytes::from("new")))
2473        );
2474    }
2475
2476    #[test]
2477    fn copy_missing_source() {
2478        let mut ks = Keyspace::new();
2479        assert_eq!(ks.copy("missing", "dst", false), Err(CopyError::NoSuchKey));
2480    }
2481
2482    #[test]
2483    fn copy_tracks_memory() {
2484        let mut ks = Keyspace::new();
2485        ks.set("src".into(), Bytes::from("value"), None, false, false);
2486        let before = ks.stats().used_bytes;
2487        ks.copy("src", "dst", false).unwrap();
2488        let after = ks.stats().used_bytes;
2489        // memory should roughly double (two entries with same value)
2490        assert!(after > before);
2491        assert_eq!(ks.stats().key_count, 2);
2492    }
2493
2494    // --- random_key ---
2495
2496    #[test]
2497    fn random_key_empty() {
2498        let mut ks = Keyspace::new();
2499        assert_eq!(ks.random_key(), None);
2500    }
2501
2502    #[test]
2503    fn random_key_returns_existing() {
2504        let mut ks = Keyspace::new();
2505        ks.set("only".into(), Bytes::from("val"), None, false, false);
2506        assert_eq!(ks.random_key(), Some("only".into()));
2507    }
2508
2509    // --- touch ---
2510
2511    #[test]
2512    fn touch_existing_key() {
2513        let mut ks = Keyspace::new();
2514        ks.set("k".into(), Bytes::from("v"), None, false, false);
2515        assert!(ks.touch("k"));
2516    }
2517
2518    #[test]
2519    fn touch_missing_key() {
2520        let mut ks = Keyspace::new();
2521        assert!(!ks.touch("missing"));
2522    }
2523
2524    // --- sort ---
2525
2526    #[test]
2527    fn sort_list_numeric() {
2528        let mut ks = Keyspace::new();
2529        let _ = ks.lpush(
2530            "nums",
2531            &[Bytes::from("3"), Bytes::from("1"), Bytes::from("2")],
2532        );
2533        let result = ks.sort("nums", false, false, None).unwrap();
2534        assert_eq!(
2535            result,
2536            vec![Bytes::from("1"), Bytes::from("2"), Bytes::from("3")]
2537        );
2538    }
2539
2540    #[test]
2541    fn sort_list_alpha_desc() {
2542        let mut ks = Keyspace::new();
2543        let _ = ks.lpush(
2544            "words",
2545            &[
2546                Bytes::from("banana"),
2547                Bytes::from("apple"),
2548                Bytes::from("cherry"),
2549            ],
2550        );
2551        let result = ks.sort("words", true, true, None).unwrap();
2552        assert_eq!(
2553            result,
2554            vec![
2555                Bytes::from("cherry"),
2556                Bytes::from("banana"),
2557                Bytes::from("apple")
2558            ]
2559        );
2560    }
2561
2562    #[test]
2563    fn sort_with_limit() {
2564        let mut ks = Keyspace::new();
2565        let _ = ks.lpush(
2566            "nums",
2567            &[
2568                Bytes::from("4"),
2569                Bytes::from("3"),
2570                Bytes::from("2"),
2571                Bytes::from("1"),
2572            ],
2573        );
2574        let result = ks.sort("nums", false, false, Some((1, 2))).unwrap();
2575        assert_eq!(result, vec![Bytes::from("2"), Bytes::from("3")]);
2576    }
2577
2578    #[test]
2579    fn sort_set_alpha() {
2580        let mut ks = Keyspace::new();
2581        let members: Vec<String> = vec!["c".into(), "a".into(), "b".into()];
2582        let _ = ks.sadd("myset", &members);
2583        let result = ks.sort("myset", false, true, None).unwrap();
2584        assert_eq!(
2585            result,
2586            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]
2587        );
2588    }
2589
2590    #[test]
2591    fn sort_missing_key() {
2592        let mut ks = Keyspace::new();
2593        let result = ks.sort("nope", false, false, None).unwrap();
2594        assert!(result.is_empty());
2595    }
2596
2597    #[test]
2598    fn sort_wrong_type() {
2599        let mut ks = Keyspace::new();
2600        ks.set("str".into(), Bytes::from("hello"), None, false, false);
2601        let result = ks.sort("str", false, false, None);
2602        assert!(result.is_err());
2603    }
2604}