Skip to main content

ember_core/keyspace/
mod.rs

1//! The keyspace: Ember's core key-value store.
2//!
3//! A `Keyspace` owns a flat `AHashMap<CompactString, Entry>` and handles
4//! get, set, delete, existence checks, and TTL management. Expired
5//! keys are removed lazily on access. Memory usage is tracked on
6//! every mutation for eviction and stats reporting.
7
8use std::collections::VecDeque;
9use std::time::Duration;
10
11use ahash::AHashMap;
12use bytes::Bytes;
13use compact_str::CompactString;
14use rand::seq::IteratorRandom;
15
16use tracing::warn;
17
18use crate::dropper::DropHandle;
19use crate::memory::{self, MemoryTracker};
20use crate::time;
21use crate::types::sorted_set::{ScoreBound, SortedSet, ZAddFlags};
22use crate::types::{self, normalize_range, Value};
23
24mod bitmap;
25mod hash;
26mod list;
27#[cfg(feature = "protobuf")]
28mod proto;
29mod set;
30mod string;
31#[cfg(feature = "vector")]
32mod vector;
33mod zset;
34
35const WRONGTYPE_MSG: &str = "WRONGTYPE Operation against a key holding the wrong kind of value";
36const OOM_MSG: &str = "OOM command not allowed when used memory > 'maxmemory'";
37
38/// Error returned when a command is used against a key holding the wrong type.
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct WrongType;
41
42impl std::fmt::Display for WrongType {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        write!(f, "{WRONGTYPE_MSG}")
45    }
46}
47
48impl std::error::Error for WrongType {}
49
50/// Error returned by write operations that may fail due to type mismatch
51/// or memory limits.
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub enum WriteError {
54    /// The key holds a different type than expected.
55    WrongType,
56    /// Memory limit reached and eviction couldn't free enough space.
57    OutOfMemory,
58}
59
60impl From<WrongType> for WriteError {
61    fn from(_: WrongType) -> Self {
62        WriteError::WrongType
63    }
64}
65
66/// Errors that can occur during INCR/DECR operations.
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub enum IncrError {
69    /// Key holds a non-string type.
70    WrongType,
71    /// Value is not a valid integer.
72    NotAnInteger,
73    /// Increment or decrement would overflow i64.
74    Overflow,
75    /// Memory limit reached.
76    OutOfMemory,
77}
78
79impl std::fmt::Display for IncrError {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        match self {
82            IncrError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
83            IncrError::NotAnInteger => write!(f, "ERR value is not an integer or out of range"),
84            IncrError::Overflow => write!(f, "ERR increment or decrement would overflow"),
85            IncrError::OutOfMemory => write!(f, "{OOM_MSG}"),
86        }
87    }
88}
89
90impl std::error::Error for IncrError {}
91
92/// Errors that can occur during INCRBYFLOAT operations.
93#[derive(Debug, Clone, PartialEq)]
94pub enum IncrFloatError {
95    /// Key holds a non-string type.
96    WrongType,
97    /// Value is not a valid float.
98    NotAFloat,
99    /// Result would be NaN or Infinity.
100    NanOrInfinity,
101    /// Memory limit reached.
102    OutOfMemory,
103}
104
105impl std::fmt::Display for IncrFloatError {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        match self {
108            IncrFloatError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
109            IncrFloatError::NotAFloat => write!(f, "ERR value is not a valid float"),
110            IncrFloatError::NanOrInfinity => {
111                write!(f, "ERR increment would produce NaN or Infinity")
112            }
113            IncrFloatError::OutOfMemory => write!(f, "{OOM_MSG}"),
114        }
115    }
116}
117
118impl std::error::Error for IncrFloatError {}
119
120/// Error returned when RENAME fails because the source key doesn't exist.
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub enum RenameError {
123    /// The source key does not exist.
124    NoSuchKey,
125}
126
127/// Error returned by COPY.
128#[derive(Debug, Clone, PartialEq, Eq)]
129pub enum CopyError {
130    /// The source key does not exist.
131    NoSuchKey,
132    /// Memory limit reached and eviction couldn't free enough space.
133    OutOfMemory,
134}
135
136/// Error returned by LSET.
137#[derive(Debug, Clone, PartialEq, Eq)]
138pub enum LsetError {
139    /// Key holds a different type.
140    WrongType,
141    /// Key does not exist.
142    NoSuchKey,
143    /// Index is beyond list bounds.
144    IndexOutOfRange,
145}
146
147impl std::fmt::Display for LsetError {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        match self {
150            LsetError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
151            LsetError::NoSuchKey => write!(f, "ERR no such key"),
152            LsetError::IndexOutOfRange => write!(f, "ERR index out of range"),
153        }
154    }
155}
156
157impl std::error::Error for LsetError {}
158
159impl std::fmt::Display for RenameError {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        match self {
162            RenameError::NoSuchKey => write!(f, "ERR no such key"),
163        }
164    }
165}
166
167impl std::error::Error for RenameError {}
168
169/// Result of a ZADD operation, containing both the client-facing count
170/// and the list of members that were actually applied (for AOF correctness).
171#[derive(Debug, Clone)]
172pub struct ZAddResult {
173    /// Number of members added (or added+updated if CH flag was set).
174    pub count: usize,
175    /// Members that were actually inserted or had their score updated.
176    /// Only these should be persisted to the AOF.
177    pub applied: Vec<(f64, String)>,
178}
179
180/// Result of a VADD operation, carrying the applied element for AOF persistence.
181#[cfg(feature = "vector")]
182#[derive(Debug, Clone)]
183pub struct VAddResult {
184    /// The element name that was added or updated.
185    pub element: String,
186    /// The vector that was stored.
187    pub vector: Vec<f32>,
188    /// Whether a new element was added (false = updated existing).
189    pub added: bool,
190}
191
192/// Result of a VADD_BATCH operation.
193#[cfg(feature = "vector")]
194#[derive(Debug, Clone)]
195pub struct VAddBatchResult {
196    /// Number of newly added elements (not updates).
197    pub added_count: usize,
198    /// Elements that were actually inserted or updated, with their vectors.
199    /// Only these should be persisted to the AOF.
200    pub applied: Vec<(String, Vec<f32>)>,
201}
202
203/// Errors from vector write operations.
204#[cfg(feature = "vector")]
205#[derive(Debug, Clone)]
206pub enum VectorWriteError {
207    /// The key holds a different type than expected.
208    WrongType,
209    /// Memory limit reached.
210    OutOfMemory,
211    /// usearch index error (dimension mismatch, capacity, etc).
212    IndexError(String),
213    /// A batch insert partially succeeded before encountering an error.
214    /// The applied vectors should still be persisted to the AOF.
215    PartialBatch {
216        message: String,
217        applied: Vec<(String, Vec<f32>)>,
218    },
219}
220
221/// How the keyspace should handle writes when the memory limit is reached.
222#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
223pub enum EvictionPolicy {
224    /// Return an error on writes when memory is full.
225    #[default]
226    NoEviction,
227    /// Evict the least-recently-used key (approximated via random sampling).
228    AllKeysLru,
229}
230
231/// Configuration for a single keyspace / shard.
232#[derive(Debug, Clone)]
233pub struct ShardConfig {
234    /// Maximum memory in bytes. `None` means unlimited.
235    pub max_memory: Option<usize>,
236    /// What to do when memory is full.
237    pub eviction_policy: EvictionPolicy,
238    /// Numeric identifier for this shard (used for persistence file naming).
239    pub shard_id: u16,
240}
241
242impl Default for ShardConfig {
243    fn default() -> Self {
244        Self {
245            max_memory: None,
246            eviction_policy: EvictionPolicy::NoEviction,
247            shard_id: 0,
248        }
249    }
250}
251
252/// Result of a set operation that may fail under memory pressure.
253#[derive(Debug, PartialEq, Eq)]
254pub enum SetResult {
255    /// The key was stored successfully.
256    Ok,
257    /// Memory limit reached and eviction policy is NoEviction.
258    OutOfMemory,
259    /// NX/XX condition was not met (key existed for NX, or didn't for XX).
260    Blocked,
261}
262
263/// A single entry in the keyspace: a value plus optional expiration
264/// and last access time for LRU approximation.
265///
266/// Field order is chosen for cache-line packing: `value` and
267/// `expires_at_ms` (the hot-path read fields) sit at the front so
268/// they share the first L1 cache line with the HashMap key pointer.
269/// `cached_value_size` is warm (used on writes). `last_access_secs`
270/// is cold (only used during eviction sampling).
271///
272/// Version tracking for WATCH/EXEC lives in a separate side table on
273/// Keyspace (`versions` map), not on every entry. This saves 8 bytes
274/// per entry since <1% of keys are ever WATCHed.
275#[derive(Debug, Clone)]
276pub(crate) struct Entry {
277    pub(crate) value: Value,
278    /// Monotonic expiry timestamp in ms. 0 = no expiry.
279    pub(crate) expires_at_ms: u64,
280    /// Cached result of `memory::value_size(&self.value)`. Updated on
281    /// every mutation so that memory accounting is O(1) instead of
282    /// walking entire collections. Using u32 saves 4 bytes per entry;
283    /// max value size is 512 MB which fits comfortably in u32 (~4 GB).
284    pub(crate) cached_value_size: u32,
285    /// Monotonic last access time in seconds since process start (for LRU).
286    /// Using u32 saves 4 bytes per entry; wraps at ~136 years.
287    pub(crate) last_access_secs: u32,
288}
289
290impl Entry {
291    fn new(value: Value, ttl: Option<Duration>) -> Self {
292        let cached_value_size = memory::value_size(&value) as u32;
293        Self {
294            value,
295            expires_at_ms: time::expiry_from_duration(ttl),
296            cached_value_size,
297            last_access_secs: time::now_secs(),
298        }
299    }
300
301    /// Returns `true` if this entry has passed its expiration time.
302    fn is_expired(&self) -> bool {
303        time::is_expired(self.expires_at_ms)
304    }
305
306    /// Marks this entry as accessed right now. When `track` is false
307    /// (NoEviction policy), this is a no-op — skipping the `now_secs()`
308    /// call on every access.
309    #[inline(always)]
310    fn touch(&mut self, track: bool) {
311        if track {
312            self.last_access_secs = time::now_secs();
313        }
314    }
315
316    /// Returns the full estimated memory footprint of this entry
317    /// (key + value + overhead) using the cached value size.
318    fn entry_size(&self, key: &str) -> usize {
319        key.len() + self.cached_value_size as usize + memory::ENTRY_OVERHEAD
320    }
321}
322
323/// Result of a TTL query, matching Redis semantics.
324#[derive(Debug, Clone, PartialEq, Eq)]
325pub enum TtlResult {
326    /// Key exists and has a TTL. Returns remaining seconds.
327    Seconds(u64),
328    /// Key exists and has a TTL. Returns remaining milliseconds.
329    Milliseconds(u64),
330    /// Key exists but has no expiration set.
331    NoExpiry,
332    /// Key does not exist.
333    NotFound,
334}
335
336/// Aggregated statistics for a keyspace.
337#[derive(Debug, Clone, PartialEq, Eq)]
338pub struct KeyspaceStats {
339    /// Number of live keys.
340    pub key_count: usize,
341    /// Estimated memory usage in bytes.
342    pub used_bytes: usize,
343    /// Number of keys with an expiration set.
344    pub keys_with_expiry: usize,
345    /// Cumulative count of keys removed by expiration (lazy + active).
346    pub keys_expired: u64,
347    /// Cumulative count of keys removed by eviction.
348    pub keys_evicted: u64,
349    /// Cumulative count of write commands rejected due to memory limits.
350    pub oom_rejections: u64,
351    /// Cumulative count of successful key lookups.
352    pub keyspace_hits: u64,
353    /// Cumulative count of key lookups that found no key (expired or absent).
354    pub keyspace_misses: u64,
355}
356
357/// Number of random keys to sample when looking for an eviction candidate.
358///
359/// Eviction uses sampling-based approximate LRU — we randomly select this many
360/// keys and evict the least-recently-accessed among them. This trades perfect
361/// LRU accuracy for O(1) eviction (no sorted structure to maintain).
362///
363/// Larger sample sizes give better LRU approximation but cost more per eviction.
364/// 16 is a reasonable balance — similar to Redis's default sample size. With
365/// 16 samples, we statistically find a good eviction candidate while keeping
366/// eviction overhead low even at millions of keys.
367const EVICTION_SAMPLE_SIZE: usize = 16;
368
369/// The core key-value store.
370///
371/// All operations are single-threaded per shard — no internal locking.
372/// Memory usage is tracked incrementally on every mutation.
373pub struct Keyspace {
374    entries: AHashMap<CompactString, Entry>,
375    memory: MemoryTracker,
376    config: ShardConfig,
377    /// Number of entries that currently have an expiration set.
378    expiry_count: usize,
379    /// Cumulative count of keys removed by expiration (lazy + active).
380    expired_total: u64,
381    /// Cumulative count of keys removed by eviction.
382    evicted_total: u64,
383    /// Cumulative count of write rejections due to memory limits.
384    oom_rejections: u64,
385    /// Cumulative count of successful key lookups.
386    keyspace_hits: u64,
387    /// Cumulative count of key lookups that found no key (expired or absent).
388    keyspace_misses: u64,
389    /// When set, large values are dropped on a background thread instead
390    /// of inline on the shard thread. See [`crate::dropper`].
391    drop_handle: Option<DropHandle>,
392    /// Monotonic counter for entry versions. Each mutation gets the next
393    /// value, giving WATCH a cheap way to detect concurrent writes.
394    next_version: u64,
395    /// Side table for WATCH/EXEC version tracking. Only populated for
396    /// keys that have been WATCHed. On mutation, we bump the version
397    /// only if the key exists in this map — which is almost never,
398    /// so the hot path pays only a fast hash-miss lookup.
399    versions: AHashMap<CompactString, u64>,
400    /// Whether to update `last_access_secs` on every access. Only useful
401    /// when the eviction policy needs LRU timestamps (AllKeysLru).
402    /// When false (NoEviction), we skip the `now_secs()` call on every
403    /// read/write — saving a syscall per operation on the hot path.
404    track_access: bool,
405}
406
407impl Keyspace {
408    /// Creates a new, empty keyspace with default config (no memory limit).
409    pub fn new() -> Self {
410        Self::with_config(ShardConfig::default())
411    }
412
413    /// Creates a new, empty keyspace with the given config.
414    pub fn with_config(config: ShardConfig) -> Self {
415        let track_access = config.eviction_policy == EvictionPolicy::AllKeysLru;
416        Self {
417            entries: AHashMap::new(),
418            memory: MemoryTracker::new(),
419            config,
420            expiry_count: 0,
421            expired_total: 0,
422            evicted_total: 0,
423            oom_rejections: 0,
424            keyspace_hits: 0,
425            keyspace_misses: 0,
426            drop_handle: None,
427            next_version: 0,
428            versions: AHashMap::new(),
429            track_access,
430        }
431    }
432
433    /// Attaches a background drop handle for lazy free. When set, large
434    /// values removed by del/eviction/expiration are dropped on a
435    /// background thread instead of blocking the shard.
436    pub fn set_drop_handle(&mut self, handle: DropHandle) {
437        self.drop_handle = Some(handle);
438    }
439
440    /// Bumps the version for a key in the side table, but only if the
441    /// key is being tracked (i.e. someone WATCHed it). When no keys
442    /// are watched, the versions map is empty and this is a fast miss.
443    fn bump_version(&mut self, key: &str) {
444        if let Some(ver) = self.versions.get_mut(key) {
445            self.next_version += 1;
446            *ver = self.next_version;
447        }
448    }
449
450    /// Returns the current version of a key, or `None` if the key is
451    /// missing or expired. Used by WATCH/EXEC — cold path only.
452    ///
453    /// If the key hasn't been WATCHed yet, inserts the current
454    /// `next_version` so that future mutations will be detected.
455    pub fn key_version(&mut self, key: &str) -> Option<u64> {
456        let entry = self.entries.get(key)?;
457        if entry.is_expired() {
458            return None;
459        }
460        let ver = *self
461            .versions
462            .entry(CompactString::from(key))
463            .or_insert(self.next_version);
464        Some(ver)
465    }
466
467    /// Removes version tracking for a key. Called on key deletion,
468    /// expiration, and eviction so the side table doesn't leak.
469    fn remove_version(&mut self, key: &str) {
470        self.versions.remove(key);
471    }
472
473    /// Removes all version tracking entries. Called on UNWATCH/DISCARD
474    /// or FLUSHDB.
475    pub fn clear_versions(&mut self) {
476        self.versions.clear();
477    }
478
479    /// Decrements the expiry count if the entry had a TTL set.
480    fn decrement_expiry_if_set(&mut self, entry: &Entry) {
481        if entry.expires_at_ms != 0 {
482            self.expiry_count = self.expiry_count.saturating_sub(1);
483        }
484    }
485
486    /// Cleans up after removing an element from a collection (list, sorted
487    /// set, hash, or set). If the collection is now empty, removes the key
488    /// entirely and subtracts `old_size` from the memory tracker. Otherwise
489    /// subtracts `removed_bytes` (the byte cost of the removed element(s))
490    /// without rescanning the remaining collection, and updates the cached
491    /// value size.
492    fn cleanup_after_remove(
493        &mut self,
494        key: &str,
495        old_size: usize,
496        is_empty: bool,
497        removed_bytes: usize,
498    ) {
499        if is_empty {
500            if let Some(removed) = self.entries.remove(key) {
501                self.decrement_expiry_if_set(&removed);
502            }
503            self.memory.remove_with_size(old_size);
504        } else {
505            self.memory.shrink_by(removed_bytes);
506            if let Some(entry) = self.entries.get_mut(key) {
507                entry.cached_value_size =
508                    (entry.cached_value_size as usize).saturating_sub(removed_bytes) as u32;
509            }
510        }
511    }
512
513    /// Checks whether a key either doesn't exist or holds the expected
514    /// collection type. Returns `Ok(true)` if the key is new,
515    /// `Ok(false)` if it exists with the right type, or `Err(WrongType)`
516    /// if the key exists with a different type.
517    fn ensure_collection_type(
518        &self,
519        key: &str,
520        type_check: fn(&Value) -> bool,
521    ) -> Result<bool, WriteError> {
522        match self.entries.get(key) {
523            None => Ok(true),
524            Some(e) if type_check(&e.value) => Ok(false),
525            Some(_) => Err(WriteError::WrongType),
526        }
527    }
528
529    /// Estimates the memory cost of a collection write and enforces the
530    /// limit. `base_overhead` is the fixed cost of a new collection
531    /// (e.g. VECDEQUE_BASE_OVERHEAD). Returns `Ok(())` on success.
532    fn reserve_memory(
533        &mut self,
534        is_new: bool,
535        key: &str,
536        base_overhead: usize,
537        element_increase: usize,
538    ) -> Result<(), WriteError> {
539        let estimated_increase = if is_new {
540            memory::ENTRY_OVERHEAD + key.len() + base_overhead + element_increase
541        } else {
542            element_increase
543        };
544        if self.enforce_memory_limit(estimated_increase) {
545            Ok(())
546        } else {
547            Err(WriteError::OutOfMemory)
548        }
549    }
550
551    /// Inserts a new key with an empty collection value. Used by
552    /// collection-write methods after type-checking and memory reservation.
553    fn insert_empty(&mut self, key: &str, value: Value) {
554        self.memory.add(key, &value);
555        let entry = Entry::new(value, None);
556        self.entries.insert(CompactString::from(key), entry);
557        self.bump_version(key);
558    }
559
560    /// Measures entry size before and after a mutation, adjusting the
561    /// memory tracker for the difference.
562    ///
563    /// Uses `cached_value_size` for the pre-mutation size (O(1)) and
564    /// recomputes after the mutation to update the cache. This halves
565    /// the cost of memory tracking for large collections.
566    fn track_size<T>(&mut self, key: &str, f: impl FnOnce(&mut Entry) -> T) -> Option<T> {
567        let entry = self.entries.get_mut(key)?;
568        let old_size = entry.entry_size(key);
569        let result = f(entry);
570        // re-lookup after mutation (f consumed the borrow)
571        let entry = self.entries.get_mut(key)?;
572        let new_value_size = memory::value_size(&entry.value);
573        entry.cached_value_size = new_value_size as u32;
574        let new_size = key.len() + new_value_size + memory::ENTRY_OVERHEAD;
575        self.memory.adjust(old_size, new_size);
576        self.bump_version(key);
577        Some(result)
578    }
579
580    /// Adjusts the expiry count when replacing an entry whose TTL status
581    /// may have changed (e.g. SET overwriting an existing key).
582    fn adjust_expiry_count(&mut self, had_expiry: bool, has_expiry: bool) {
583        match (had_expiry, has_expiry) {
584            (false, true) => self.expiry_count += 1,
585            (true, false) => self.expiry_count = self.expiry_count.saturating_sub(1),
586            _ => {}
587        }
588    }
589
590    /// Tries to evict one key using LRU approximation.
591    ///
592    /// Randomly samples `EVICTION_SAMPLE_SIZE` keys and removes the one
593    /// with the oldest `last_access` time. Returns `true` if a key was
594    /// evicted, `false` if the keyspace is empty.
595    ///
596    /// Uses reservoir sampling with k=1 to avoid allocating a Vec on
597    /// every eviction attempt. The victim key index is remembered
598    /// rather than cloned, eliminating a heap allocation on the hot path.
599    fn try_evict(&mut self) -> bool {
600        if self.entries.is_empty() {
601            return false;
602        }
603
604        let mut rng = rand::rng();
605
606        // reservoir sample k=1 from the iterator, tracking the oldest
607        // entry by last_access_secs. this replaces choose_multiple() which
608        // allocates a Vec internally.
609        let mut best_key: Option<&str> = None;
610        let mut best_access = u32::MAX;
611        let mut seen = 0usize;
612
613        for (key, entry) in &self.entries {
614            // reservoir sampling: include this item with probability
615            // EVICTION_SAMPLE_SIZE / (seen + 1), capped once we have
616            // enough candidates
617            seen += 1;
618            if seen <= EVICTION_SAMPLE_SIZE {
619                if entry.last_access_secs < best_access {
620                    best_access = entry.last_access_secs;
621                    best_key = Some(&**key);
622                }
623            } else {
624                use rand::Rng;
625                let j = rng.random_range(0..seen);
626                if j < EVICTION_SAMPLE_SIZE && entry.last_access_secs < best_access {
627                    best_access = entry.last_access_secs;
628                    best_key = Some(&**key);
629                }
630            }
631        }
632
633        if let Some(victim) = best_key {
634            // own the key to break the immutable borrow on self.entries
635            let victim = victim.to_owned();
636            if let Some(entry) = self.entries.remove(victim.as_str()) {
637                self.memory.remove(&victim, &entry.value);
638                self.decrement_expiry_if_set(&entry);
639                self.evicted_total += 1;
640                self.remove_version(&victim);
641                self.defer_drop(entry.value);
642                return true;
643            }
644        }
645        false
646    }
647
648    /// Checks whether the memory limit allows a write that would increase
649    /// usage by `estimated_increase` bytes. Attempts eviction if the
650    /// policy allows it. Returns `true` if the write can proceed.
651    ///
652    /// The comparison uses [`memory::effective_limit`] rather than the raw
653    /// configured maximum. This reserves headroom for allocator overhead
654    /// and fragmentation that our per-entry estimates can't account for,
655    /// preventing the OS from OOM-killing us before eviction triggers.
656    fn enforce_memory_limit(&mut self, estimated_increase: usize) -> bool {
657        if let Some(max) = self.config.max_memory {
658            let limit = memory::effective_limit(max);
659            while self.memory.used_bytes() + estimated_increase > limit {
660                match self.config.eviction_policy {
661                    EvictionPolicy::NoEviction => {
662                        self.oom_rejections += 1;
663                        // log first rejection, then every 1000th to avoid flooding
664                        if self.oom_rejections == 1 || self.oom_rejections.is_multiple_of(1000) {
665                            warn!(
666                                used_bytes = self.memory.used_bytes(),
667                                limit,
668                                requested = estimated_increase,
669                                total_rejections = self.oom_rejections,
670                                "OOM: write rejected (policy: noeviction)"
671                            );
672                        }
673                        return false;
674                    }
675                    EvictionPolicy::AllKeysLru => {
676                        if !self.try_evict() {
677                            self.oom_rejections += 1;
678                            if self.oom_rejections == 1 || self.oom_rejections.is_multiple_of(1000)
679                            {
680                                warn!(
681                                    used_bytes = self.memory.used_bytes(),
682                                    limit,
683                                    requested = estimated_increase,
684                                    total_rejections = self.oom_rejections,
685                                    "OOM: write rejected (eviction exhausted)"
686                                );
687                            }
688                            return false;
689                        }
690                    }
691                }
692            }
693        }
694        true
695    }
696
697    /// Removes a key. Returns `true` if the key existed (and wasn't expired).
698    ///
699    /// When a drop handle is set, large values are dropped on the
700    /// background thread instead of inline.
701    pub fn del(&mut self, key: &str) -> bool {
702        if self.remove_if_expired(key) {
703            return false;
704        }
705        if let Some(entry) = self.entries.remove(key) {
706            self.memory.remove(key, &entry.value);
707            self.decrement_expiry_if_set(&entry);
708            self.remove_version(key);
709            self.defer_drop(entry.value);
710            true
711        } else {
712            false
713        }
714    }
715
716    /// Removes a key like `del`, but always defers the value's destructor
717    /// to the background drop thread (when available). Semantically
718    /// identical to DEL — the key is gone immediately, memory is
719    /// accounted for immediately, but the actual deallocation happens
720    /// off the hot path.
721    pub fn unlink(&mut self, key: &str) -> bool {
722        if self.remove_if_expired(key) {
723            return false;
724        }
725        if let Some(entry) = self.entries.remove(key) {
726            self.memory.remove(key, &entry.value);
727            self.decrement_expiry_if_set(&entry);
728            self.remove_version(key);
729            // always defer for UNLINK, regardless of value size
730            if let Some(ref handle) = self.drop_handle {
731                handle.defer_value(entry.value);
732            }
733            true
734        } else {
735            false
736        }
737    }
738
739    /// Replaces the entries map with an empty one and resets memory
740    /// tracking. Returns the old entries so the caller can send them
741    /// to the background drop thread.
742    pub(crate) fn flush_async(&mut self) -> AHashMap<CompactString, Entry> {
743        let old = std::mem::take(&mut self.entries);
744        self.memory.reset();
745        self.expiry_count = 0;
746        self.versions.clear();
747        old
748    }
749
750    /// Returns `true` if the key exists and hasn't expired.
751    pub fn exists(&mut self, key: &str) -> bool {
752        if self.remove_if_expired(key) {
753            return false;
754        }
755        self.entries.contains_key(key)
756    }
757
758    /// Returns a random key from the keyspace, or `None` if empty.
759    ///
760    /// Uses reservoir sampling from the hash map iterator. Expired keys
761    /// are skipped (and lazily cleaned up with bounded retries).
762    pub fn random_key(&mut self) -> Option<String> {
763        // bounded retries in case we keep hitting expired keys
764        for _ in 0..5 {
765            let mut rng = rand::rng();
766            let key = self.entries.keys().choose(&mut rng)?.clone();
767
768            if self.remove_if_expired(&key) {
769                continue;
770            }
771            return Some(key.to_string());
772        }
773        None
774    }
775
776    /// Updates the last access time for a key. Returns `true` if the key exists.
777    pub fn touch(&mut self, key: &str) -> bool {
778        if self.remove_if_expired(key) {
779            return false;
780        }
781        match self.entries.get_mut(key) {
782            Some(entry) => {
783                entry.touch(self.track_access);
784                true
785            }
786            None => false,
787        }
788    }
789
790    /// Sorts elements from a list, set, or sorted set.
791    ///
792    /// Returns the sorted elements as byte strings, or an error if the
793    /// key holds the wrong type or numeric parsing fails.
794    pub fn sort(
795        &mut self,
796        key: &str,
797        desc: bool,
798        alpha: bool,
799        limit: Option<(i64, i64)>,
800    ) -> Result<Vec<Bytes>, &'static str> {
801        if self.remove_if_expired(key) {
802            return Ok(Vec::new());
803        }
804        let entry = match self.entries.get_mut(key) {
805            Some(e) => {
806                e.touch(self.track_access);
807                e
808            }
809            None => return Ok(Vec::new()),
810        };
811
812        // collect elements from the appropriate type
813        let mut items: Vec<Bytes> = match &entry.value {
814            Value::List(deq) => deq.iter().cloned().collect(),
815            Value::Set(set) => set.iter().map(|s| Bytes::from(s.clone())).collect(),
816            Value::SortedSet(zset) => zset
817                .iter()
818                .map(|(m, _)| Bytes::from(m.to_owned()))
819                .collect(),
820            _ => return Err(WRONGTYPE_MSG),
821        };
822
823        // sort
824        if alpha {
825            items.sort();
826            if desc {
827                items.reverse();
828            }
829        } else {
830            // numeric sort — parse all elements as f64
831            let mut parse_err = false;
832            items.sort_by(|a, b| {
833                let a_str = std::str::from_utf8(a).unwrap_or("");
834                let b_str = std::str::from_utf8(b).unwrap_or("");
835                let a_val = a_str.parse::<f64>().unwrap_or_else(|_| {
836                    parse_err = true;
837                    0.0
838                });
839                let b_val = b_str.parse::<f64>().unwrap_or_else(|_| {
840                    parse_err = true;
841                    0.0
842                });
843                if desc {
844                    b_val
845                        .partial_cmp(&a_val)
846                        .unwrap_or(std::cmp::Ordering::Equal)
847                } else {
848                    a_val
849                        .partial_cmp(&b_val)
850                        .unwrap_or(std::cmp::Ordering::Equal)
851                }
852            });
853            if parse_err {
854                return Err("ERR One or more scores can't be converted into double");
855            }
856        }
857
858        // apply limit
859        if let Some((offset, count)) = limit {
860            let offset = offset.max(0) as usize;
861            let count = count.max(0) as usize;
862            let end = offset.saturating_add(count).min(items.len());
863            if offset < items.len() {
864                items = items[offset..end].to_vec();
865            } else {
866                items.clear();
867            }
868        }
869
870        Ok(items)
871    }
872
873    /// Sets an expiration on an existing key. Returns `true` if the key
874    /// exists (and the TTL was set), `false` if the key doesn't exist.
875    pub fn expire(&mut self, key: &str, seconds: u64) -> bool {
876        if self.remove_if_expired(key) {
877            return false;
878        }
879        match self.entries.get_mut(key) {
880            Some(entry) => {
881                if entry.expires_at_ms == 0 {
882                    self.expiry_count += 1;
883                }
884                entry.expires_at_ms = time::now_ms().saturating_add(seconds.saturating_mul(1000));
885                self.bump_version(key);
886                true
887            }
888            None => false,
889        }
890    }
891
892    /// Returns the TTL status for a key, following Redis semantics:
893    /// - `Seconds(n)` if the key has a TTL
894    /// - `NoExpiry` if the key exists without a TTL
895    /// - `NotFound` if the key doesn't exist
896    pub fn ttl(&mut self, key: &str) -> TtlResult {
897        if self.remove_if_expired(key) {
898            return TtlResult::NotFound;
899        }
900        match self.entries.get(key) {
901            Some(entry) => match time::remaining_secs(entry.expires_at_ms) {
902                Some(secs) => TtlResult::Seconds(secs),
903                None => TtlResult::NoExpiry,
904            },
905            None => TtlResult::NotFound,
906        }
907    }
908
909    /// Returns the estimated memory usage in bytes for the given key,
910    /// or `None` if the key does not exist or is expired.
911    ///
912    /// Uses the cached value size for O(1) cost. The estimate covers the
913    /// key string, the serialized value, and the per-entry overhead.
914    pub fn memory_usage(&mut self, key: &str) -> Option<usize> {
915        if self.remove_if_expired(key) {
916            return None;
917        }
918        let entry = self.entries.get(key)?;
919        Some(entry.entry_size(key))
920    }
921
922    /// Removes the expiration from a key.
923    ///
924    /// Returns `true` if the key existed and had a timeout that was removed.
925    /// Returns `false` if the key doesn't exist or has no expiration.
926    pub fn persist(&mut self, key: &str) -> bool {
927        if self.remove_if_expired(key) {
928            return false;
929        }
930        match self.entries.get_mut(key) {
931            Some(entry) => {
932                if entry.expires_at_ms != 0 {
933                    entry.expires_at_ms = 0;
934                    self.expiry_count = self.expiry_count.saturating_sub(1);
935                    self.bump_version(key);
936                    true
937                } else {
938                    false
939                }
940            }
941            None => false,
942        }
943    }
944
945    /// Returns the TTL status for a key in milliseconds, following Redis semantics:
946    /// - `Milliseconds(n)` if the key has a TTL
947    /// - `NoExpiry` if the key exists without a TTL
948    /// - `NotFound` if the key doesn't exist
949    pub fn pttl(&mut self, key: &str) -> TtlResult {
950        if self.remove_if_expired(key) {
951            return TtlResult::NotFound;
952        }
953        match self.entries.get(key) {
954            Some(entry) => match time::remaining_ms(entry.expires_at_ms) {
955                Some(ms) => TtlResult::Milliseconds(ms),
956                None => TtlResult::NoExpiry,
957            },
958            None => TtlResult::NotFound,
959        }
960    }
961
962    /// Sets an expiration on an existing key in milliseconds.
963    ///
964    /// Returns `true` if the key exists (and the TTL was set),
965    /// `false` if the key doesn't exist.
966    pub fn pexpire(&mut self, key: &str, millis: u64) -> bool {
967        if self.remove_if_expired(key) {
968            return false;
969        }
970        match self.entries.get_mut(key) {
971            Some(entry) => {
972                if entry.expires_at_ms == 0 {
973                    self.expiry_count += 1;
974                }
975                entry.expires_at_ms = time::now_ms().saturating_add(millis);
976                self.bump_version(key);
977                true
978            }
979            None => false,
980        }
981    }
982
983    /// Sets an expiration at an absolute Unix timestamp (seconds).
984    ///
985    /// Returns `true` if the key exists and the expiry was set,
986    /// `false` if the key doesn't exist.
987    pub fn expireat(&mut self, key: &str, unix_secs: u64) -> bool {
988        if self.remove_if_expired(key) {
989            return false;
990        }
991        match self.entries.get_mut(key) {
992            Some(entry) => {
993                if entry.expires_at_ms == 0 {
994                    self.expiry_count += 1;
995                }
996                entry.expires_at_ms = time::unix_ms_to_monotonic_ms(unix_secs.saturating_mul(1000));
997                self.bump_version(key);
998                true
999            }
1000            None => false,
1001        }
1002    }
1003
1004    /// Sets an expiration at an absolute Unix timestamp (milliseconds).
1005    ///
1006    /// Returns `true` if the key exists and the expiry was set,
1007    /// `false` if the key doesn't exist.
1008    pub fn pexpireat(&mut self, key: &str, unix_ms: u64) -> bool {
1009        if self.remove_if_expired(key) {
1010            return false;
1011        }
1012        match self.entries.get_mut(key) {
1013            Some(entry) => {
1014                if entry.expires_at_ms == 0 {
1015                    self.expiry_count += 1;
1016                }
1017                entry.expires_at_ms = time::unix_ms_to_monotonic_ms(unix_ms);
1018                self.bump_version(key);
1019                true
1020            }
1021            None => false,
1022        }
1023    }
1024
1025    /// Returns the absolute Unix timestamp (seconds) when the key expires.
1026    ///
1027    /// Returns `-2` if the key doesn't exist, `-1` if it has no expiry.
1028    pub fn expiretime(&mut self, key: &str) -> i64 {
1029        if self.remove_if_expired(key) {
1030            return -2;
1031        }
1032        match self.entries.get(key) {
1033            None => -2,
1034            Some(entry) => match time::monotonic_to_unix_ms(entry.expires_at_ms) {
1035                None => -1,
1036                Some(unix_ms) => (unix_ms / 1000) as i64,
1037            },
1038        }
1039    }
1040
1041    /// Returns the absolute Unix timestamp (milliseconds) when the key expires.
1042    ///
1043    /// Returns `-2` if the key doesn't exist, `-1` if it has no expiry.
1044    pub fn pexpiretime(&mut self, key: &str) -> i64 {
1045        if self.remove_if_expired(key) {
1046            return -2;
1047        }
1048        match self.entries.get(key) {
1049            None => -2,
1050            Some(entry) => match time::monotonic_to_unix_ms(entry.expires_at_ms) {
1051                None => -1,
1052                Some(unix_ms) => unix_ms as i64,
1053            },
1054        }
1055    }
1056
1057    /// Returns all keys matching a glob pattern.
1058    ///
1059    /// Warning: O(n) scan of the entire keyspace. Use SCAN for production
1060    /// workloads with large key counts.
1061    pub fn keys(&self, pattern: &str) -> Vec<String> {
1062        let len = self.entries.len();
1063        if len > 10_000 {
1064            warn!(
1065                key_count = len,
1066                "KEYS on large keyspace, consider SCAN instead"
1067            );
1068        }
1069        let compiled = GlobPattern::new(pattern);
1070        self.entries
1071            .iter()
1072            .filter(|(_, entry)| !entry.is_expired())
1073            .filter(|(key, _)| compiled.matches(key))
1074            .map(|(key, _)| String::from(&**key))
1075            .collect()
1076    }
1077
1078    /// Counts live keys in this keyspace that hash to the given cluster slot.
1079    ///
1080    /// O(n) scan over all entries — same cost as KEYS.
1081    pub fn count_keys_in_slot(&self, slot: u16) -> usize {
1082        self.entries
1083            .iter()
1084            .filter(|(_, entry)| !entry.is_expired())
1085            .filter(|(key, _)| ember_cluster::key_slot(key.as_bytes()) == slot)
1086            .count()
1087    }
1088
1089    /// Returns up to `count` live keys that hash to the given cluster slot.
1090    ///
1091    /// O(n) scan over all entries — same cost as KEYS.
1092    pub fn get_keys_in_slot(&self, slot: u16, count: usize) -> Vec<String> {
1093        self.entries
1094            .iter()
1095            .filter(|(_, entry)| !entry.is_expired())
1096            .filter(|(key, _)| ember_cluster::key_slot(key.as_bytes()) == slot)
1097            .take(count)
1098            .map(|(key, _)| String::from(&**key))
1099            .collect()
1100    }
1101
1102    /// Renames a key to a new name. Returns an error if the source key
1103    /// doesn't exist. If the destination key already exists, it is overwritten.
1104    pub fn rename(&mut self, key: &str, newkey: &str) -> Result<(), RenameError> {
1105        self.remove_if_expired(key);
1106        self.remove_if_expired(newkey);
1107
1108        let entry = match self.entries.remove(key) {
1109            Some(entry) => entry,
1110            None => return Err(RenameError::NoSuchKey),
1111        };
1112
1113        // update memory tracking for old key removal
1114        self.memory.remove(key, &entry.value);
1115        self.decrement_expiry_if_set(&entry);
1116
1117        // remove destination if it exists
1118        if let Some(old_dest) = self.entries.remove(newkey) {
1119            self.memory.remove(newkey, &old_dest.value);
1120            self.decrement_expiry_if_set(&old_dest);
1121        }
1122
1123        // re-insert with the new key name, preserving value and expiry
1124        self.memory.add(newkey, &entry.value);
1125        if entry.expires_at_ms != 0 {
1126            self.expiry_count += 1;
1127        }
1128        self.remove_version(key);
1129        self.entries.insert(CompactString::from(newkey), entry);
1130        self.bump_version(newkey);
1131        Ok(())
1132    }
1133
1134    /// Copies the value at `source` to `destination`. If `replace` is false and
1135    /// the destination already exists, returns `Ok(false)`. Returns `Ok(true)` on
1136    /// success.
1137    pub fn copy(&mut self, source: &str, dest: &str, replace: bool) -> Result<bool, CopyError> {
1138        self.remove_if_expired(source);
1139        self.remove_if_expired(dest);
1140
1141        let src_entry = match self.entries.get(source) {
1142            Some(e) => e,
1143            None => return Err(CopyError::NoSuchKey),
1144        };
1145
1146        // if destination exists and replace not set, return 0
1147        if !replace && self.entries.contains_key(dest) {
1148            return Ok(false);
1149        }
1150
1151        // clone value and expiry from source
1152        let cloned_value = src_entry.value.clone();
1153        let cloned_expire = if src_entry.expires_at_ms != 0 {
1154            Some(src_entry.expires_at_ms)
1155        } else {
1156            None
1157        };
1158
1159        // estimate memory for the new entry
1160        let new_size = memory::entry_size(dest, &cloned_value);
1161
1162        // if replacing, account for the old destination's size
1163        let old_dest_size = self
1164            .entries
1165            .get(dest)
1166            .map(|e| e.entry_size(dest))
1167            .unwrap_or(0);
1168        let net_increase = new_size.saturating_sub(old_dest_size);
1169        if !self.enforce_memory_limit(net_increase) {
1170            return Err(CopyError::OutOfMemory);
1171        }
1172
1173        // remove old destination if replacing
1174        if let Some(old_dest) = self.entries.remove(dest) {
1175            self.memory.remove(dest, &old_dest.value);
1176            self.decrement_expiry_if_set(&old_dest);
1177            self.defer_drop(old_dest.value);
1178        }
1179
1180        // insert the clone
1181        self.memory.add(dest, &cloned_value);
1182        let has_expiry = cloned_expire.is_some();
1183        if has_expiry {
1184            self.expiry_count += 1;
1185        }
1186        let mut entry = Entry::new(cloned_value, None);
1187        // preserve the source's absolute expiry timestamp
1188        if let Some(ts) = cloned_expire {
1189            entry.expires_at_ms = ts;
1190        }
1191        self.entries.insert(CompactString::from(dest), entry);
1192        self.bump_version(dest);
1193        Ok(true)
1194    }
1195
1196    /// Updates the memory limit and eviction policy in-place.
1197    ///
1198    /// Takes effect immediately for all subsequent write commands.
1199    /// `track_access` is synchronized with the new policy so LRU
1200    /// sampling stays consistent.
1201    pub fn update_memory_config(
1202        &mut self,
1203        max_memory: Option<usize>,
1204        eviction_policy: EvictionPolicy,
1205    ) {
1206        self.config.max_memory = max_memory;
1207        self.config.eviction_policy = eviction_policy;
1208        self.track_access = matches!(eviction_policy, EvictionPolicy::AllKeysLru);
1209    }
1210
1211    /// Returns aggregated stats for this keyspace.
1212    ///
1213    /// All fields are tracked incrementally — this is O(1).
1214    pub fn stats(&self) -> KeyspaceStats {
1215        KeyspaceStats {
1216            key_count: self.memory.key_count(),
1217            used_bytes: self.memory.used_bytes(),
1218            keys_with_expiry: self.expiry_count,
1219            keys_expired: self.expired_total,
1220            keys_evicted: self.evicted_total,
1221            oom_rejections: self.oom_rejections,
1222            keyspace_hits: self.keyspace_hits,
1223            keyspace_misses: self.keyspace_misses,
1224        }
1225    }
1226
1227    /// Returns the number of live keys.
1228    pub fn len(&self) -> usize {
1229        self.entries.len()
1230    }
1231
1232    /// Removes all keys from the keyspace.
1233    pub fn clear(&mut self) {
1234        self.entries.clear();
1235        self.memory.reset();
1236        self.expiry_count = 0;
1237        self.versions.clear();
1238    }
1239
1240    /// Returns `true` if the keyspace has no entries.
1241    pub fn is_empty(&self) -> bool {
1242        self.entries.is_empty()
1243    }
1244
1245    /// Scans keys starting from a cursor position.
1246    ///
1247    /// Returns the next cursor (0 if scan complete) and a batch of keys.
1248    /// The `pattern` argument supports glob-style matching (`*`, `?`, `[abc]`).
1249    pub fn scan_keys(
1250        &self,
1251        cursor: u64,
1252        count: usize,
1253        pattern: Option<&str>,
1254    ) -> (u64, Vec<String>) {
1255        let mut keys = Vec::with_capacity(count);
1256        let mut position = 0u64;
1257        let target_count = if count == 0 { 10 } else { count };
1258
1259        let compiled = pattern.map(GlobPattern::new);
1260
1261        for (key, entry) in self.entries.iter() {
1262            // skip expired entries
1263            if entry.is_expired() {
1264                continue;
1265            }
1266
1267            // skip entries before cursor
1268            if position < cursor {
1269                position += 1;
1270                continue;
1271            }
1272
1273            // pattern matching
1274            if let Some(ref pat) = compiled {
1275                if !pat.matches(key) {
1276                    position += 1;
1277                    continue;
1278                }
1279            }
1280
1281            keys.push(String::from(&**key));
1282            position += 1;
1283
1284            if keys.len() >= target_count {
1285                // return position as next cursor
1286                return (position, keys);
1287            }
1288        }
1289
1290        // scan complete
1291        (0, keys)
1292    }
1293
1294    /// Returns the value and remaining TTL in milliseconds for a single key.
1295    ///
1296    /// Returns `None` if the key doesn't exist or is expired. TTL is -1 for
1297    /// entries with no expiration. Used by MIGRATE/DUMP to serialize a key
1298    /// for transfer to another node.
1299    pub fn dump(&mut self, key: &str) -> Option<(&Value, i64)> {
1300        if self.remove_if_expired(key) {
1301            return None;
1302        }
1303        let entry = self.entries.get(key)?;
1304        let ttl_ms = match time::remaining_ms(entry.expires_at_ms) {
1305            Some(ms) => ms.min(i64::MAX as u64) as i64,
1306            None => -1,
1307        };
1308        Some((&entry.value, ttl_ms))
1309    }
1310
1311    /// Iterates over all live (non-expired) entries, yielding the key, a
1312    /// clone of the value, and the remaining TTL in milliseconds (-1 for
1313    /// entries with no expiration). Used by snapshot and AOF rewrite.
1314    pub fn iter_entries(&self) -> impl Iterator<Item = (&str, &Value, i64)> {
1315        self.entries.iter().filter_map(move |(key, entry)| {
1316            if entry.is_expired() {
1317                return None;
1318            }
1319            let ttl_ms = match time::remaining_ms(entry.expires_at_ms) {
1320                Some(ms) => ms.min(i64::MAX as u64) as i64,
1321                None => -1,
1322            };
1323            Some((&**key, &entry.value, ttl_ms))
1324        })
1325    }
1326
1327    /// Restores an entry during recovery, bypassing memory limits.
1328    ///
1329    /// `ttl` is the remaining time-to-live. If `None`, the key has no expiry.
1330    /// This is used only during shard startup when loading from
1331    /// snapshot/AOF — normal writes should go through `set()`.
1332    pub fn restore(&mut self, key: String, value: Value, ttl: Option<Duration>) {
1333        let has_expiry = ttl.is_some();
1334
1335        // if replacing an existing entry, adjust memory tracking
1336        if let Some(old) = self.entries.get(key.as_str()) {
1337            self.memory.replace(&key, &old.value, &value);
1338            self.adjust_expiry_count(old.expires_at_ms != 0, has_expiry);
1339        } else {
1340            self.memory.add(&key, &value);
1341            if has_expiry {
1342                self.expiry_count += 1;
1343            }
1344        }
1345
1346        let entry = Entry::new(value, ttl);
1347        self.entries.insert(CompactString::from(key.clone()), entry);
1348        self.bump_version(&key);
1349    }
1350
1351    /// Randomly samples up to `count` keys and removes any that have expired.
1352    ///
1353    /// Samples up to `count` random keys and removes any that have expired.
1354    ///
1355    /// Expired key names are appended to `out` so the caller can emit
1356    /// keyspace notifications. Returns the number of keys removed.
1357    pub(crate) fn expire_sample(&mut self, count: usize, out: &mut Vec<String>) -> usize {
1358        if self.entries.is_empty() {
1359            return 0;
1360        }
1361
1362        let mut rng = rand::rng();
1363
1364        let keys_to_check: Vec<String> = self
1365            .entries
1366            .keys()
1367            .choose_multiple(&mut rng, count)
1368            .into_iter()
1369            .map(|k| String::from(&**k))
1370            .collect();
1371
1372        let mut removed = 0;
1373        for key in keys_to_check {
1374            if self.remove_if_expired(&key) {
1375                out.push(key);
1376                removed += 1;
1377            }
1378        }
1379        removed
1380    }
1381
1382    /// Removes a key that is already known to be expired. Used by
1383    /// fused lookup paths that check expiry inline via `get_mut()` and
1384    /// need a second probe only on the rare expired path.
1385    fn remove_expired_entry(&mut self, key: &str) {
1386        if let Some(entry) = self.entries.remove(key) {
1387            self.memory.remove(key, &entry.value);
1388            self.decrement_expiry_if_set(&entry);
1389            self.expired_total += 1;
1390            self.remove_version(key);
1391            self.defer_drop(entry.value);
1392        }
1393    }
1394
1395    /// Checks if a key is expired and removes it if so. Returns `true`
1396    /// if the key was removed (or didn't exist).
1397    fn remove_if_expired(&mut self, key: &str) -> bool {
1398        let expired = self
1399            .entries
1400            .get(key)
1401            .map(|e| e.is_expired())
1402            .unwrap_or(false);
1403
1404        if expired {
1405            if let Some(entry) = self.entries.remove(key) {
1406                self.memory.remove(key, &entry.value);
1407                self.decrement_expiry_if_set(&entry);
1408                self.expired_total += 1;
1409                self.remove_version(key);
1410                self.defer_drop(entry.value);
1411            }
1412        }
1413        expired
1414    }
1415
1416    /// Returns a mutable reference to the entry for `key`, or `None` if the
1417    /// key doesn't exist or has expired.
1418    ///
1419    /// Combines the three steps that almost every read operation repeats:
1420    /// 1. Remove the key if it has expired (lazy expiration).
1421    /// 2. Look up the entry in the map.
1422    /// 3. Touch the entry to update its last-access timestamp for LRU.
1423    ///
1424    /// Callers still need to match on the entry's value type and handle
1425    /// `WrongType` themselves — this helper just eliminates the common
1426    /// expiry + lookup + touch boilerplate.
1427    fn get_live_entry(&mut self, key: &str) -> Option<&mut Entry> {
1428        self.remove_if_expired(key);
1429        let entry = self.entries.get_mut(key)?;
1430        entry.touch(self.track_access);
1431        Some(entry)
1432    }
1433
1434    /// Sends a value to the background drop thread if one is configured
1435    /// and the value is large enough to justify the overhead.
1436    fn defer_drop(&self, value: Value) {
1437        if let Some(ref handle) = self.drop_handle {
1438            handle.defer_value(value);
1439        }
1440    }
1441}
1442
1443impl Default for Keyspace {
1444    fn default() -> Self {
1445        Self::new()
1446    }
1447}
1448
1449/// Formats a float value matching Redis behavior.
1450///
1451/// Uses up to 17 significant digits and strips unnecessary trailing zeros,
1452/// but always keeps at least one decimal place for non-integer results.
1453pub(crate) fn format_float(val: f64) -> String {
1454    if val == 0.0 {
1455        return "0".into();
1456    }
1457    // Use enough precision to round-trip
1458    let s = format!("{:.17e}", val);
1459    // Parse back to get the clean representation
1460    let reparsed: f64 = s.parse().unwrap_or(val);
1461    // If it's a whole number that fits in i64, format without decimals
1462    if reparsed == reparsed.trunc() && reparsed >= i64::MIN as f64 && reparsed <= i64::MAX as f64 {
1463        format!("{}", reparsed as i64)
1464    } else {
1465        // Use ryu-like formatting via Display which strips trailing zeros
1466        let formatted = format!("{}", reparsed);
1467        formatted
1468    }
1469}
1470
1471/// Glob-style pattern matching for SCAN's MATCH option.
1472///
1473/// Supports:
1474/// - `*` matches any sequence of characters (including empty)
1475/// - `?` matches exactly one character
1476/// - `[abc]` matches one character from the set
1477/// - `[^abc]` or `[!abc]` matches one character NOT in the set
1478///
1479/// Uses an iterative two-pointer algorithm with backtracking for O(n*m)
1480/// worst-case performance, where n is pattern length and m is text length.
1481///
1482/// Prefer [`GlobPattern::new`] + [`GlobPattern::matches`] when matching
1483/// the same pattern against many strings (KEYS, SCAN) to avoid
1484/// re-collecting the pattern chars on every call.
1485pub(crate) fn glob_match(pattern: &str, text: &str) -> bool {
1486    let pat: Vec<char> = pattern.chars().collect();
1487    glob_match_compiled(&pat, text)
1488}
1489
1490/// Pre-compiled glob pattern that avoids re-allocating pattern chars
1491/// on every match call. Use for KEYS/SCAN where the same pattern is
1492/// tested against every key in the keyspace.
1493pub(crate) struct GlobPattern {
1494    chars: Vec<char>,
1495}
1496
1497impl GlobPattern {
1498    pub(crate) fn new(pattern: &str) -> Self {
1499        Self {
1500            chars: pattern.chars().collect(),
1501        }
1502    }
1503
1504    pub(crate) fn matches(&self, text: &str) -> bool {
1505        glob_match_compiled(&self.chars, text)
1506    }
1507}
1508
1509/// Core glob matching against a pre-compiled pattern char slice.
1510fn glob_match_compiled(pat: &[char], text: &str) -> bool {
1511    let txt: Vec<char> = text.chars().collect();
1512
1513    let mut pi = 0; // pattern index
1514    let mut ti = 0; // text index
1515
1516    // backtracking state for the most recent '*'
1517    let mut star_pi: Option<usize> = None;
1518    let mut star_ti: usize = 0;
1519
1520    while ti < txt.len() || pi < pat.len() {
1521        if pi < pat.len() {
1522            match pat[pi] {
1523                '*' => {
1524                    // record star position and try matching zero chars first
1525                    star_pi = Some(pi);
1526                    star_ti = ti;
1527                    pi += 1;
1528                    continue;
1529                }
1530                '?' if ti < txt.len() => {
1531                    pi += 1;
1532                    ti += 1;
1533                    continue;
1534                }
1535                '[' if ti < txt.len() => {
1536                    // parse character class
1537                    let tc = txt[ti];
1538                    let mut j = pi + 1;
1539                    let mut negated = false;
1540                    let mut matched = false;
1541
1542                    if j < pat.len() && (pat[j] == '^' || pat[j] == '!') {
1543                        negated = true;
1544                        j += 1;
1545                    }
1546
1547                    while j < pat.len() && pat[j] != ']' {
1548                        if pat[j] == tc {
1549                            matched = true;
1550                        }
1551                        j += 1;
1552                    }
1553
1554                    if negated {
1555                        matched = !matched;
1556                    }
1557
1558                    if matched && j < pat.len() {
1559                        pi = j + 1; // skip past ']'
1560                        ti += 1;
1561                        continue;
1562                    }
1563                    // fall through to backtrack
1564                }
1565                c if ti < txt.len() && c == txt[ti] => {
1566                    pi += 1;
1567                    ti += 1;
1568                    continue;
1569                }
1570                _ => {}
1571            }
1572        }
1573
1574        // mismatch or end of pattern — try backtracking to last '*'
1575        if let Some(sp) = star_pi {
1576            pi = sp + 1;
1577            star_ti += 1;
1578            ti = star_ti;
1579            if ti > txt.len() {
1580                return false;
1581            }
1582        } else {
1583            return false;
1584        }
1585    }
1586
1587    // skip trailing '*' in pattern
1588    while pi < pat.len() && pat[pi] == '*' {
1589        pi += 1;
1590    }
1591
1592    pi == pat.len()
1593}
1594
1595#[cfg(test)]
1596mod tests {
1597    use super::*;
1598    use std::thread;
1599
1600    #[test]
1601    fn del_existing() {
1602        let mut ks = Keyspace::new();
1603        ks.set("key".into(), Bytes::from("val"), None, false, false);
1604        assert!(ks.del("key"));
1605        assert_eq!(ks.get("key").unwrap(), None);
1606    }
1607
1608    #[test]
1609    fn del_missing() {
1610        let mut ks = Keyspace::new();
1611        assert!(!ks.del("nope"));
1612    }
1613
1614    #[test]
1615    fn exists_present_and_absent() {
1616        let mut ks = Keyspace::new();
1617        ks.set("yes".into(), Bytes::from("here"), None, false, false);
1618        assert!(ks.exists("yes"));
1619        assert!(!ks.exists("no"));
1620    }
1621
1622    #[test]
1623    fn ttl_no_expiry() {
1624        let mut ks = Keyspace::new();
1625        ks.set("key".into(), Bytes::from("val"), None, false, false);
1626        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
1627    }
1628
1629    #[test]
1630    fn ttl_not_found() {
1631        let mut ks = Keyspace::new();
1632        assert_eq!(ks.ttl("missing"), TtlResult::NotFound);
1633    }
1634
1635    #[test]
1636    fn ttl_with_expiry() {
1637        let mut ks = Keyspace::new();
1638        ks.set(
1639            "key".into(),
1640            Bytes::from("val"),
1641            Some(Duration::from_secs(100)),
1642            false,
1643            false,
1644        );
1645        match ks.ttl("key") {
1646            TtlResult::Seconds(s) => assert!((98..=100).contains(&s)),
1647            other => panic!("expected Seconds, got {other:?}"),
1648        }
1649    }
1650
1651    #[test]
1652    fn ttl_expired_key() {
1653        let mut ks = Keyspace::new();
1654        ks.set(
1655            "temp".into(),
1656            Bytes::from("val"),
1657            Some(Duration::from_millis(10)),
1658            false,
1659            false,
1660        );
1661        thread::sleep(Duration::from_millis(30));
1662        assert_eq!(ks.ttl("temp"), TtlResult::NotFound);
1663    }
1664
1665    #[test]
1666    fn expire_existing_key() {
1667        let mut ks = Keyspace::new();
1668        ks.set("key".into(), Bytes::from("val"), None, false, false);
1669        assert!(ks.expire("key", 60));
1670        match ks.ttl("key") {
1671            TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
1672            other => panic!("expected Seconds, got {other:?}"),
1673        }
1674    }
1675
1676    #[test]
1677    fn expire_missing_key() {
1678        let mut ks = Keyspace::new();
1679        assert!(!ks.expire("nope", 60));
1680    }
1681
1682    #[test]
1683    fn del_expired_key_returns_false() {
1684        let mut ks = Keyspace::new();
1685        ks.set(
1686            "temp".into(),
1687            Bytes::from("val"),
1688            Some(Duration::from_millis(10)),
1689            false,
1690            false,
1691        );
1692        thread::sleep(Duration::from_millis(30));
1693        // key is expired, del should return false (not found)
1694        assert!(!ks.del("temp"));
1695    }
1696
1697    // -- memory tracking tests --
1698
1699    #[test]
1700    fn memory_increases_on_set() {
1701        let mut ks = Keyspace::new();
1702        assert_eq!(ks.stats().used_bytes, 0);
1703        ks.set("key".into(), Bytes::from("value"), None, false, false);
1704        assert!(ks.stats().used_bytes > 0);
1705        assert_eq!(ks.stats().key_count, 1);
1706    }
1707
1708    #[test]
1709    fn memory_decreases_on_del() {
1710        let mut ks = Keyspace::new();
1711        ks.set("key".into(), Bytes::from("value"), None, false, false);
1712        let after_set = ks.stats().used_bytes;
1713        ks.del("key");
1714        assert_eq!(ks.stats().used_bytes, 0);
1715        assert!(after_set > 0);
1716    }
1717
1718    #[test]
1719    fn memory_adjusts_on_overwrite() {
1720        let mut ks = Keyspace::new();
1721        ks.set("key".into(), Bytes::from("short"), None, false, false);
1722        let small = ks.stats().used_bytes;
1723
1724        ks.set(
1725            "key".into(),
1726            Bytes::from("a much longer value"),
1727            None,
1728            false,
1729            false,
1730        );
1731        let large = ks.stats().used_bytes;
1732
1733        assert!(large > small);
1734        assert_eq!(ks.stats().key_count, 1);
1735    }
1736
1737    #[test]
1738    fn memory_decreases_on_expired_removal() {
1739        let mut ks = Keyspace::new();
1740        ks.set(
1741            "temp".into(),
1742            Bytes::from("data"),
1743            Some(Duration::from_millis(10)),
1744            false,
1745            false,
1746        );
1747        assert!(ks.stats().used_bytes > 0);
1748        thread::sleep(Duration::from_millis(30));
1749        // trigger lazy expiration
1750        let _ = ks.get("temp");
1751        assert_eq!(ks.stats().used_bytes, 0);
1752        assert_eq!(ks.stats().key_count, 0);
1753    }
1754
1755    #[test]
1756    fn stats_tracks_expiry_count() {
1757        let mut ks = Keyspace::new();
1758        ks.set("a".into(), Bytes::from("1"), None, false, false);
1759        ks.set(
1760            "b".into(),
1761            Bytes::from("2"),
1762            Some(Duration::from_secs(100)),
1763            false,
1764            false,
1765        );
1766        ks.set(
1767            "c".into(),
1768            Bytes::from("3"),
1769            Some(Duration::from_secs(200)),
1770            false,
1771            false,
1772        );
1773
1774        let stats = ks.stats();
1775        assert_eq!(stats.key_count, 3);
1776        assert_eq!(stats.keys_with_expiry, 2);
1777    }
1778
1779    // -- eviction tests --
1780
1781    #[test]
1782    fn noeviction_returns_oom_when_full() {
1783        // one entry with key "a" + value "val" = 1 + 3 + 104 = 108 bytes
1784        // set limit so one entry fits but two don't
1785        let config = ShardConfig {
1786            max_memory: Some(130),
1787            eviction_policy: EvictionPolicy::NoEviction,
1788            ..ShardConfig::default()
1789        };
1790        let mut ks = Keyspace::with_config(config);
1791
1792        // first key should fit
1793        assert_eq!(
1794            ks.set("a".into(), Bytes::from("val"), None, false, false),
1795            SetResult::Ok
1796        );
1797
1798        // second key should push us over the limit
1799        let result = ks.set("b".into(), Bytes::from("val"), None, false, false);
1800        assert_eq!(result, SetResult::OutOfMemory);
1801
1802        // original key should still be there
1803        assert!(ks.exists("a"));
1804    }
1805
1806    #[test]
1807    fn lru_eviction_makes_room() {
1808        let config = ShardConfig {
1809            max_memory: Some(130),
1810            eviction_policy: EvictionPolicy::AllKeysLru,
1811            ..ShardConfig::default()
1812        };
1813        let mut ks = Keyspace::with_config(config);
1814
1815        assert_eq!(
1816            ks.set("a".into(), Bytes::from("val"), None, false, false),
1817            SetResult::Ok
1818        );
1819
1820        // this should trigger eviction of "a" to make room
1821        assert_eq!(
1822            ks.set("b".into(), Bytes::from("val"), None, false, false),
1823            SetResult::Ok
1824        );
1825
1826        // "a" should have been evicted
1827        assert!(!ks.exists("a"));
1828        assert!(ks.exists("b"));
1829    }
1830
1831    #[test]
1832    fn safety_margin_rejects_near_raw_limit() {
1833        // one entry = 1 (key) + 3 (val) + ENTRY_OVERHEAD (104) = 108 bytes.
1834        // configure max_memory = 120. effective limit = 120 * 90 / 100 = 108.
1835        // the entry fills exactly the effective limit, so a second entry should
1836        // be rejected even though the raw limit has 12 bytes of headroom.
1837        let config = ShardConfig {
1838            max_memory: Some(120),
1839            eviction_policy: EvictionPolicy::NoEviction,
1840            ..ShardConfig::default()
1841        };
1842        let mut ks = Keyspace::with_config(config);
1843
1844        assert_eq!(
1845            ks.set("a".into(), Bytes::from("val"), None, false, false),
1846            SetResult::Ok
1847        );
1848
1849        let result = ks.set("b".into(), Bytes::from("val"), None, false, false);
1850        assert_eq!(result, SetResult::OutOfMemory);
1851    }
1852
1853    #[test]
1854    fn overwrite_same_size_succeeds_at_limit() {
1855        let config = ShardConfig {
1856            max_memory: Some(130),
1857            eviction_policy: EvictionPolicy::NoEviction,
1858            ..ShardConfig::default()
1859        };
1860        let mut ks = Keyspace::with_config(config);
1861
1862        assert_eq!(
1863            ks.set("a".into(), Bytes::from("val"), None, false, false),
1864            SetResult::Ok
1865        );
1866
1867        // overwriting with same-size value should succeed — no net increase
1868        assert_eq!(
1869            ks.set("a".into(), Bytes::from("new"), None, false, false),
1870            SetResult::Ok
1871        );
1872        assert_eq!(
1873            ks.get("a").unwrap(),
1874            Some(Value::String(Bytes::from("new")))
1875        );
1876    }
1877
1878    #[test]
1879    fn overwrite_larger_value_respects_limit() {
1880        let config = ShardConfig {
1881            max_memory: Some(130),
1882            eviction_policy: EvictionPolicy::NoEviction,
1883            ..ShardConfig::default()
1884        };
1885        let mut ks = Keyspace::with_config(config);
1886
1887        assert_eq!(
1888            ks.set("a".into(), Bytes::from("val"), None, false, false),
1889            SetResult::Ok
1890        );
1891
1892        // overwriting with a much larger value should fail if it exceeds limit
1893        let big_value = "x".repeat(200);
1894        let result = ks.set("a".into(), Bytes::from(big_value), None, false, false);
1895        assert_eq!(result, SetResult::OutOfMemory);
1896
1897        // original value should still be intact
1898        assert_eq!(
1899            ks.get("a").unwrap(),
1900            Some(Value::String(Bytes::from("val")))
1901        );
1902    }
1903
1904    // -- iter_entries tests --
1905
1906    #[test]
1907    fn iter_entries_returns_live_entries() {
1908        let mut ks = Keyspace::new();
1909        ks.set("a".into(), Bytes::from("1"), None, false, false);
1910        ks.set(
1911            "b".into(),
1912            Bytes::from("2"),
1913            Some(Duration::from_secs(100)),
1914            false,
1915            false,
1916        );
1917
1918        let entries: Vec<_> = ks.iter_entries().collect();
1919        assert_eq!(entries.len(), 2);
1920    }
1921
1922    #[test]
1923    fn iter_entries_skips_expired() {
1924        let mut ks = Keyspace::new();
1925        ks.set(
1926            "dead".into(),
1927            Bytes::from("gone"),
1928            Some(Duration::from_millis(1)),
1929            false,
1930            false,
1931        );
1932        ks.set("alive".into(), Bytes::from("here"), None, false, false);
1933        thread::sleep(Duration::from_millis(10));
1934
1935        let entries: Vec<_> = ks.iter_entries().collect();
1936        assert_eq!(entries.len(), 1);
1937        assert_eq!(entries[0].0, "alive");
1938    }
1939
1940    #[test]
1941    fn iter_entries_ttl_for_no_expiry() {
1942        let mut ks = Keyspace::new();
1943        ks.set("permanent".into(), Bytes::from("val"), None, false, false);
1944
1945        let entries: Vec<_> = ks.iter_entries().collect();
1946        assert_eq!(entries[0].2, -1);
1947    }
1948
1949    // -- restore tests --
1950
1951    #[test]
1952    fn restore_adds_entry() {
1953        let mut ks = Keyspace::new();
1954        ks.restore("restored".into(), Value::String(Bytes::from("data")), None);
1955        assert_eq!(
1956            ks.get("restored").unwrap(),
1957            Some(Value::String(Bytes::from("data")))
1958        );
1959        assert_eq!(ks.stats().key_count, 1);
1960    }
1961
1962    #[test]
1963    fn restore_with_zero_ttl_expires_immediately() {
1964        let mut ks = Keyspace::new();
1965        // TTL of 0 should create entry that expires immediately
1966        ks.restore(
1967            "short-lived".into(),
1968            Value::String(Bytes::from("data")),
1969            Some(Duration::from_millis(1)),
1970        );
1971        // Entry exists but will be expired on access
1972        std::thread::sleep(Duration::from_millis(5));
1973        assert!(ks.get("short-lived").is_err() || ks.get("short-lived").unwrap().is_none());
1974    }
1975
1976    #[test]
1977    fn restore_overwrites_existing() {
1978        let mut ks = Keyspace::new();
1979        ks.set("key".into(), Bytes::from("old"), None, false, false);
1980        ks.restore("key".into(), Value::String(Bytes::from("new")), None);
1981        assert_eq!(
1982            ks.get("key").unwrap(),
1983            Some(Value::String(Bytes::from("new")))
1984        );
1985        assert_eq!(ks.stats().key_count, 1);
1986    }
1987
1988    #[test]
1989    fn restore_bypasses_memory_limit() {
1990        let config = ShardConfig {
1991            max_memory: Some(50), // very small
1992            eviction_policy: EvictionPolicy::NoEviction,
1993            ..ShardConfig::default()
1994        };
1995        let mut ks = Keyspace::with_config(config);
1996
1997        // normal set would fail due to memory limit
1998        ks.restore(
1999            "big".into(),
2000            Value::String(Bytes::from("x".repeat(200))),
2001            None,
2002        );
2003        assert_eq!(ks.stats().key_count, 1);
2004    }
2005
2006    #[test]
2007    fn no_limit_never_rejects() {
2008        // default config has no memory limit
2009        let mut ks = Keyspace::new();
2010        for i in 0..100 {
2011            assert_eq!(
2012                ks.set(format!("key:{i}"), Bytes::from("value"), None, false, false),
2013                SetResult::Ok
2014            );
2015        }
2016        assert_eq!(ks.len(), 100);
2017    }
2018
2019    #[test]
2020    fn clear_removes_all_keys() {
2021        let mut ks = Keyspace::new();
2022        ks.set("a".into(), Bytes::from("1"), None, false, false);
2023        ks.set(
2024            "b".into(),
2025            Bytes::from("2"),
2026            Some(Duration::from_secs(60)),
2027            false,
2028            false,
2029        );
2030        ks.lpush("list", &[Bytes::from("x")]).unwrap();
2031
2032        assert_eq!(ks.len(), 3);
2033        assert!(ks.stats().used_bytes > 0);
2034        assert_eq!(ks.stats().keys_with_expiry, 1);
2035
2036        ks.clear();
2037
2038        assert_eq!(ks.len(), 0);
2039        assert!(ks.is_empty());
2040        assert_eq!(ks.stats().used_bytes, 0);
2041        assert_eq!(ks.stats().keys_with_expiry, 0);
2042    }
2043
2044    // --- scan ---
2045
2046    #[test]
2047    fn scan_returns_keys() {
2048        let mut ks = Keyspace::new();
2049        ks.set("key1".into(), Bytes::from("a"), None, false, false);
2050        ks.set("key2".into(), Bytes::from("b"), None, false, false);
2051        ks.set("key3".into(), Bytes::from("c"), None, false, false);
2052
2053        let (cursor, keys) = ks.scan_keys(0, 10, None);
2054        assert_eq!(cursor, 0); // complete in one pass
2055        assert_eq!(keys.len(), 3);
2056    }
2057
2058    #[test]
2059    fn scan_empty_keyspace() {
2060        let ks = Keyspace::new();
2061        let (cursor, keys) = ks.scan_keys(0, 10, None);
2062        assert_eq!(cursor, 0);
2063        assert!(keys.is_empty());
2064    }
2065
2066    #[test]
2067    fn scan_with_pattern() {
2068        let mut ks = Keyspace::new();
2069        ks.set("user:1".into(), Bytes::from("a"), None, false, false);
2070        ks.set("user:2".into(), Bytes::from("b"), None, false, false);
2071        ks.set("item:1".into(), Bytes::from("c"), None, false, false);
2072
2073        let (cursor, keys) = ks.scan_keys(0, 10, Some("user:*"));
2074        assert_eq!(cursor, 0);
2075        assert_eq!(keys.len(), 2);
2076        for k in &keys {
2077            assert!(k.starts_with("user:"));
2078        }
2079    }
2080
2081    #[test]
2082    fn scan_with_count_limit() {
2083        let mut ks = Keyspace::new();
2084        for i in 0..10 {
2085            ks.set(format!("k{i}"), Bytes::from("v"), None, false, false);
2086        }
2087
2088        // first batch
2089        let (cursor, keys) = ks.scan_keys(0, 3, None);
2090        assert!(!keys.is_empty());
2091        assert!(keys.len() <= 3);
2092
2093        // if there are more keys, cursor should be non-zero
2094        if cursor != 0 {
2095            let (cursor2, keys2) = ks.scan_keys(cursor, 3, None);
2096            assert!(!keys2.is_empty());
2097            // continue until complete
2098            let _ = (cursor2, keys2);
2099        }
2100    }
2101
2102    #[test]
2103    fn scan_skips_expired_keys() {
2104        let mut ks = Keyspace::new();
2105        ks.set("live".into(), Bytes::from("a"), None, false, false);
2106        ks.set(
2107            "expired".into(),
2108            Bytes::from("b"),
2109            Some(Duration::from_millis(1)),
2110            false,
2111            false,
2112        );
2113
2114        std::thread::sleep(Duration::from_millis(5));
2115
2116        let (_, keys) = ks.scan_keys(0, 10, None);
2117        assert_eq!(keys.len(), 1);
2118        assert_eq!(keys[0], "live");
2119    }
2120
2121    #[test]
2122    fn glob_match_star() {
2123        assert!(super::glob_match("user:*", "user:123"));
2124        assert!(super::glob_match("user:*", "user:"));
2125        assert!(super::glob_match("*:data", "foo:data"));
2126        assert!(!super::glob_match("user:*", "item:123"));
2127    }
2128
2129    #[test]
2130    fn glob_match_question() {
2131        assert!(super::glob_match("key?", "key1"));
2132        assert!(super::glob_match("key?", "keya"));
2133        assert!(!super::glob_match("key?", "key"));
2134        assert!(!super::glob_match("key?", "key12"));
2135    }
2136
2137    #[test]
2138    fn glob_match_brackets() {
2139        assert!(super::glob_match("key[abc]", "keya"));
2140        assert!(super::glob_match("key[abc]", "keyb"));
2141        assert!(!super::glob_match("key[abc]", "keyd"));
2142    }
2143
2144    #[test]
2145    fn glob_match_literal() {
2146        assert!(super::glob_match("exact", "exact"));
2147        assert!(!super::glob_match("exact", "exactnot"));
2148        assert!(!super::glob_match("exact", "notexact"));
2149    }
2150
2151    // --- persist/pttl/pexpire ---
2152
2153    #[test]
2154    fn persist_removes_expiry() {
2155        let mut ks = Keyspace::new();
2156        ks.set(
2157            "key".into(),
2158            Bytes::from("val"),
2159            Some(Duration::from_secs(60)),
2160            false,
2161            false,
2162        );
2163        assert!(matches!(ks.ttl("key"), TtlResult::Seconds(_)));
2164
2165        assert!(ks.persist("key"));
2166        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
2167        assert_eq!(ks.stats().keys_with_expiry, 0);
2168    }
2169
2170    #[test]
2171    fn persist_returns_false_without_expiry() {
2172        let mut ks = Keyspace::new();
2173        ks.set("key".into(), Bytes::from("val"), None, false, false);
2174        assert!(!ks.persist("key"));
2175    }
2176
2177    #[test]
2178    fn persist_returns_false_for_missing_key() {
2179        let mut ks = Keyspace::new();
2180        assert!(!ks.persist("missing"));
2181    }
2182
2183    #[test]
2184    fn pttl_returns_milliseconds() {
2185        let mut ks = Keyspace::new();
2186        ks.set(
2187            "key".into(),
2188            Bytes::from("val"),
2189            Some(Duration::from_secs(60)),
2190            false,
2191            false,
2192        );
2193        match ks.pttl("key") {
2194            TtlResult::Milliseconds(ms) => assert!(ms > 59_000 && ms <= 60_000),
2195            other => panic!("expected Milliseconds, got {other:?}"),
2196        }
2197    }
2198
2199    #[test]
2200    fn pttl_no_expiry() {
2201        let mut ks = Keyspace::new();
2202        ks.set("key".into(), Bytes::from("val"), None, false, false);
2203        assert_eq!(ks.pttl("key"), TtlResult::NoExpiry);
2204    }
2205
2206    #[test]
2207    fn pttl_not_found() {
2208        let mut ks = Keyspace::new();
2209        assert_eq!(ks.pttl("missing"), TtlResult::NotFound);
2210    }
2211
2212    #[test]
2213    fn pexpire_sets_ttl_in_millis() {
2214        let mut ks = Keyspace::new();
2215        ks.set("key".into(), Bytes::from("val"), None, false, false);
2216        assert!(ks.pexpire("key", 5000));
2217        match ks.pttl("key") {
2218            TtlResult::Milliseconds(ms) => assert!(ms > 4000 && ms <= 5000),
2219            other => panic!("expected Milliseconds, got {other:?}"),
2220        }
2221        assert_eq!(ks.stats().keys_with_expiry, 1);
2222    }
2223
2224    #[test]
2225    fn pexpire_missing_key_returns_false() {
2226        let mut ks = Keyspace::new();
2227        assert!(!ks.pexpire("missing", 5000));
2228    }
2229
2230    #[test]
2231    fn pexpire_overwrites_existing_ttl() {
2232        let mut ks = Keyspace::new();
2233        ks.set(
2234            "key".into(),
2235            Bytes::from("val"),
2236            Some(Duration::from_secs(60)),
2237            false,
2238            false,
2239        );
2240        assert!(ks.pexpire("key", 500));
2241        match ks.pttl("key") {
2242            TtlResult::Milliseconds(ms) => assert!(ms <= 500),
2243            other => panic!("expected Milliseconds, got {other:?}"),
2244        }
2245        // expiry count shouldn't double-count
2246        assert_eq!(ks.stats().keys_with_expiry, 1);
2247    }
2248
2249    // --- expireat / pexpireat ---
2250
2251    #[test]
2252    fn expireat_sets_expiry_on_existing_key() {
2253        use std::time::{SystemTime, UNIX_EPOCH};
2254        let mut ks = Keyspace::new();
2255        ks.set("k".into(), Bytes::from("v"), None, false, false);
2256        let future_secs = SystemTime::now()
2257            .duration_since(UNIX_EPOCH)
2258            .unwrap()
2259            .as_secs()
2260            + 60;
2261        assert!(ks.expireat("k", future_secs));
2262        assert!(matches!(ks.ttl("k"), TtlResult::Seconds(_)));
2263        assert_eq!(ks.stats().keys_with_expiry, 1);
2264    }
2265
2266    #[test]
2267    fn expireat_missing_key_returns_false() {
2268        let mut ks = Keyspace::new();
2269        assert!(!ks.expireat("missing", 9_999_999_999));
2270    }
2271
2272    #[test]
2273    fn expireat_does_not_double_count_expiry() {
2274        use std::time::{SystemTime, UNIX_EPOCH};
2275        let mut ks = Keyspace::new();
2276        let base = SystemTime::now()
2277            .duration_since(UNIX_EPOCH)
2278            .unwrap()
2279            .as_secs();
2280        ks.set(
2281            "k".into(),
2282            Bytes::from("v"),
2283            Some(Duration::from_secs(30)),
2284            false,
2285            false,
2286        );
2287        assert_eq!(ks.stats().keys_with_expiry, 1);
2288        assert!(ks.expireat("k", base + 120));
2289        assert_eq!(ks.stats().keys_with_expiry, 1);
2290    }
2291
2292    #[test]
2293    fn pexpireat_sets_expiry_in_ms() {
2294        use std::time::{SystemTime, UNIX_EPOCH};
2295        let mut ks = Keyspace::new();
2296        ks.set("k".into(), Bytes::from("v"), None, false, false);
2297        let future_ms = SystemTime::now()
2298            .duration_since(UNIX_EPOCH)
2299            .unwrap()
2300            .as_millis() as u64
2301            + 60_000;
2302        assert!(ks.pexpireat("k", future_ms));
2303        assert!(matches!(ks.pttl("k"), TtlResult::Milliseconds(_)));
2304        assert_eq!(ks.stats().keys_with_expiry, 1);
2305    }
2306
2307    #[test]
2308    fn pexpireat_missing_key_returns_false() {
2309        let mut ks = Keyspace::new();
2310        assert!(!ks.pexpireat("missing", 9_999_999_999_000));
2311    }
2312
2313    // --- keys tests ---
2314
2315    #[test]
2316    fn keys_match_all() {
2317        let mut ks = Keyspace::new();
2318        ks.set("a".into(), Bytes::from("1"), None, false, false);
2319        ks.set("b".into(), Bytes::from("2"), None, false, false);
2320        ks.set("c".into(), Bytes::from("3"), None, false, false);
2321        let mut result = ks.keys("*");
2322        result.sort();
2323        assert_eq!(result, vec!["a", "b", "c"]);
2324    }
2325
2326    #[test]
2327    fn keys_with_pattern() {
2328        let mut ks = Keyspace::new();
2329        ks.set("user:1".into(), Bytes::from("a"), None, false, false);
2330        ks.set("user:2".into(), Bytes::from("b"), None, false, false);
2331        ks.set("item:1".into(), Bytes::from("c"), None, false, false);
2332        let mut result = ks.keys("user:*");
2333        result.sort();
2334        assert_eq!(result, vec!["user:1", "user:2"]);
2335    }
2336
2337    #[test]
2338    fn keys_skips_expired() {
2339        let mut ks = Keyspace::new();
2340        ks.set("live".into(), Bytes::from("a"), None, false, false);
2341        ks.set(
2342            "dead".into(),
2343            Bytes::from("b"),
2344            Some(Duration::from_millis(1)),
2345            false,
2346            false,
2347        );
2348        thread::sleep(Duration::from_millis(5));
2349        let result = ks.keys("*");
2350        assert_eq!(result, vec!["live"]);
2351    }
2352
2353    #[test]
2354    fn keys_empty_keyspace() {
2355        let ks = Keyspace::new();
2356        assert!(ks.keys("*").is_empty());
2357    }
2358
2359    // --- rename tests ---
2360
2361    #[test]
2362    fn rename_basic() {
2363        let mut ks = Keyspace::new();
2364        ks.set("old".into(), Bytes::from("value"), None, false, false);
2365        ks.rename("old", "new").unwrap();
2366        assert!(!ks.exists("old"));
2367        assert_eq!(
2368            ks.get("new").unwrap(),
2369            Some(Value::String(Bytes::from("value")))
2370        );
2371    }
2372
2373    #[test]
2374    fn rename_preserves_expiry() {
2375        let mut ks = Keyspace::new();
2376        ks.set(
2377            "old".into(),
2378            Bytes::from("val"),
2379            Some(Duration::from_secs(60)),
2380            false,
2381            false,
2382        );
2383        ks.rename("old", "new").unwrap();
2384        match ks.ttl("new") {
2385            TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
2386            other => panic!("expected TTL preserved, got {other:?}"),
2387        }
2388    }
2389
2390    #[test]
2391    fn rename_overwrites_destination() {
2392        let mut ks = Keyspace::new();
2393        ks.set("src".into(), Bytes::from("new_val"), None, false, false);
2394        ks.set("dst".into(), Bytes::from("old_val"), None, false, false);
2395        ks.rename("src", "dst").unwrap();
2396        assert!(!ks.exists("src"));
2397        assert_eq!(
2398            ks.get("dst").unwrap(),
2399            Some(Value::String(Bytes::from("new_val")))
2400        );
2401        assert_eq!(ks.len(), 1);
2402    }
2403
2404    #[test]
2405    fn rename_missing_key_returns_error() {
2406        let mut ks = Keyspace::new();
2407        let err = ks.rename("missing", "new").unwrap_err();
2408        assert_eq!(err, RenameError::NoSuchKey);
2409    }
2410
2411    #[test]
2412    fn rename_same_key() {
2413        let mut ks = Keyspace::new();
2414        ks.set("key".into(), Bytes::from("val"), None, false, false);
2415        // renaming to itself should succeed (Redis behavior)
2416        ks.rename("key", "key").unwrap();
2417        assert_eq!(
2418            ks.get("key").unwrap(),
2419            Some(Value::String(Bytes::from("val")))
2420        );
2421    }
2422
2423    #[test]
2424    fn rename_tracks_memory() {
2425        let mut ks = Keyspace::new();
2426        ks.set("old".into(), Bytes::from("value"), None, false, false);
2427        let before = ks.stats().used_bytes;
2428        ks.rename("old", "new").unwrap();
2429        let after = ks.stats().used_bytes;
2430        // same key length, so memory should be the same
2431        assert_eq!(before, after);
2432        assert_eq!(ks.stats().key_count, 1);
2433    }
2434
2435    #[test]
2436    fn zero_ttl_expires_immediately() {
2437        let mut ks = Keyspace::new();
2438        ks.set(
2439            "key".into(),
2440            Bytes::from("val"),
2441            Some(Duration::ZERO),
2442            false,
2443            false,
2444        );
2445
2446        // key should be expired immediately
2447        std::thread::sleep(Duration::from_millis(1));
2448        assert!(ks.get("key").unwrap().is_none());
2449    }
2450
2451    #[test]
2452    fn very_small_ttl_expires_quickly() {
2453        let mut ks = Keyspace::new();
2454        ks.set(
2455            "key".into(),
2456            Bytes::from("val"),
2457            Some(Duration::from_millis(1)),
2458            false,
2459            false,
2460        );
2461
2462        std::thread::sleep(Duration::from_millis(5));
2463        assert!(ks.get("key").unwrap().is_none());
2464    }
2465
2466    #[test]
2467    fn count_keys_in_slot_empty() {
2468        let ks = Keyspace::new();
2469        assert_eq!(ks.count_keys_in_slot(0), 0);
2470    }
2471
2472    #[test]
2473    fn count_keys_in_slot_matches() {
2474        let mut ks = Keyspace::new();
2475        // insert a few keys and count those in a specific slot
2476        ks.set("a".into(), Bytes::from("1"), None, false, false);
2477        ks.set("b".into(), Bytes::from("2"), None, false, false);
2478        ks.set("c".into(), Bytes::from("3"), None, false, false);
2479
2480        let slot_a = ember_cluster::key_slot(b"a");
2481        let count = ks.count_keys_in_slot(slot_a);
2482        // at minimum, "a" should be in its own slot
2483        assert!(count >= 1);
2484    }
2485
2486    #[test]
2487    fn count_keys_in_slot_skips_expired() {
2488        let mut ks = Keyspace::new();
2489        let slot = ember_cluster::key_slot(b"temp");
2490        ks.set(
2491            "temp".into(),
2492            Bytes::from("gone"),
2493            Some(Duration::from_millis(0)),
2494            false,
2495            false,
2496        );
2497        // key is expired — should not be counted
2498        thread::sleep(Duration::from_millis(5));
2499        assert_eq!(ks.count_keys_in_slot(slot), 0);
2500    }
2501
2502    #[test]
2503    fn get_keys_in_slot_returns_matching() {
2504        let mut ks = Keyspace::new();
2505        ks.set("x".into(), Bytes::from("1"), None, false, false);
2506        ks.set("y".into(), Bytes::from("2"), None, false, false);
2507
2508        let slot_x = ember_cluster::key_slot(b"x");
2509        let keys = ks.get_keys_in_slot(slot_x, 100);
2510        assert!(keys.contains(&"x".to_string()));
2511    }
2512
2513    #[test]
2514    fn get_keys_in_slot_respects_count_limit() {
2515        let mut ks = Keyspace::new();
2516        // insert several keys — some might share a slot
2517        for i in 0..100 {
2518            ks.set(format!("key:{i}"), Bytes::from("v"), None, false, false);
2519        }
2520        // ask for at most 3 keys from slot 0
2521        let keys = ks.get_keys_in_slot(0, 3);
2522        assert!(keys.len() <= 3);
2523    }
2524
2525    // --- key_version (WATCH support) ---
2526    //
2527    // Version tracking uses a lazily-populated side table. `key_version()`
2528    // inserts the key into the table on first call (simulating WATCH).
2529    // Subsequent mutations only bump the version if the key is tracked.
2530
2531    #[test]
2532    fn key_version_returns_none_for_missing() {
2533        let mut ks = Keyspace::new();
2534        assert_eq!(ks.key_version("nope"), None);
2535    }
2536
2537    #[test]
2538    fn key_version_changes_on_set() {
2539        let mut ks = Keyspace::new();
2540        ks.set("k".into(), Bytes::from("v1"), None, false, false);
2541        // first call registers the key in the version table (like WATCH)
2542        let v1 = ks.key_version("k").expect("key should exist");
2543        ks.set("k".into(), Bytes::from("v2"), None, false, false);
2544        let v2 = ks.key_version("k").expect("key should exist");
2545        assert!(v2 > v1, "version should increase on overwrite");
2546    }
2547
2548    #[test]
2549    fn key_version_none_after_del() {
2550        let mut ks = Keyspace::new();
2551        ks.set("k".into(), Bytes::from("v"), None, false, false);
2552        assert!(ks.key_version("k").is_some());
2553        ks.del("k");
2554        assert_eq!(ks.key_version("k"), None);
2555    }
2556
2557    #[test]
2558    fn key_version_changes_on_list_push() {
2559        let mut ks = Keyspace::new();
2560        ks.lpush("list", &[Bytes::from("a")]).unwrap();
2561        let v1 = ks.key_version("list").expect("list should exist");
2562        ks.rpush("list", &[Bytes::from("b")]).unwrap();
2563        let v2 = ks.key_version("list").expect("list should exist");
2564        assert!(v2 > v1, "version should increase on rpush");
2565    }
2566
2567    #[test]
2568    fn key_version_changes_on_hash_set() {
2569        let mut ks = Keyspace::new();
2570        ks.hset("h", &[("f1".into(), Bytes::from("v1"))]).unwrap();
2571        let v1 = ks.key_version("h").expect("hash should exist");
2572        ks.hset("h", &[("f2".into(), Bytes::from("v2"))]).unwrap();
2573        let v2 = ks.key_version("h").expect("hash should exist");
2574        assert!(v2 > v1, "version should increase on hset");
2575    }
2576
2577    #[test]
2578    fn key_version_changes_on_expire() {
2579        let mut ks = Keyspace::new();
2580        ks.set("k".into(), Bytes::from("v"), None, false, false);
2581        let v1 = ks.key_version("k").expect("key should exist");
2582        ks.expire("k", 100);
2583        let v2 = ks.key_version("k").expect("key should exist");
2584        assert!(v2 > v1, "version should increase on expire");
2585    }
2586
2587    #[test]
2588    fn key_version_stable_without_watch() {
2589        // if key_version is never called, mutations don't create
2590        // version entries — the side table stays empty
2591        let mut ks = Keyspace::new();
2592        ks.set("a".into(), Bytes::from("1"), None, false, false);
2593        ks.set("a".into(), Bytes::from("2"), None, false, false);
2594        // first call to key_version returns a snapshot
2595        let v1 = ks.key_version("a").unwrap();
2596        // no mutation between calls — version is stable
2597        let v2 = ks.key_version("a").unwrap();
2598        assert_eq!(v1, v2, "version should be stable without mutations");
2599    }
2600
2601    // --- copy tests ---
2602
2603    #[test]
2604    fn copy_basic() {
2605        let mut ks = Keyspace::new();
2606        ks.set("src".into(), Bytes::from("hello"), None, false, false);
2607        assert_eq!(ks.copy("src", "dst", false), Ok(true));
2608        assert_eq!(
2609            ks.get("dst").unwrap(),
2610            Some(Value::String(Bytes::from("hello")))
2611        );
2612        // source should still exist
2613        assert!(ks.exists("src"));
2614    }
2615
2616    #[test]
2617    fn copy_preserves_expiry() {
2618        let mut ks = Keyspace::new();
2619        ks.set(
2620            "src".into(),
2621            Bytes::from("val"),
2622            Some(Duration::from_secs(60)),
2623            false,
2624            false,
2625        );
2626        assert_eq!(ks.copy("src", "dst", false), Ok(true));
2627        match ks.ttl("dst") {
2628            TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
2629            other => panic!("expected TTL preserved, got {other:?}"),
2630        }
2631    }
2632
2633    #[test]
2634    fn copy_no_replace_returns_false() {
2635        let mut ks = Keyspace::new();
2636        ks.set("src".into(), Bytes::from("a"), None, false, false);
2637        ks.set("dst".into(), Bytes::from("b"), None, false, false);
2638        assert_eq!(ks.copy("src", "dst", false), Ok(false));
2639        // destination should be unchanged
2640        assert_eq!(
2641            ks.get("dst").unwrap(),
2642            Some(Value::String(Bytes::from("b")))
2643        );
2644    }
2645
2646    #[test]
2647    fn copy_replace_overwrites() {
2648        let mut ks = Keyspace::new();
2649        ks.set("src".into(), Bytes::from("new"), None, false, false);
2650        ks.set("dst".into(), Bytes::from("old"), None, false, false);
2651        assert_eq!(ks.copy("src", "dst", true), Ok(true));
2652        assert_eq!(
2653            ks.get("dst").unwrap(),
2654            Some(Value::String(Bytes::from("new")))
2655        );
2656    }
2657
2658    #[test]
2659    fn copy_missing_source() {
2660        let mut ks = Keyspace::new();
2661        assert_eq!(ks.copy("missing", "dst", false), Err(CopyError::NoSuchKey));
2662    }
2663
2664    #[test]
2665    fn copy_tracks_memory() {
2666        let mut ks = Keyspace::new();
2667        ks.set("src".into(), Bytes::from("value"), None, false, false);
2668        let before = ks.stats().used_bytes;
2669        ks.copy("src", "dst", false).unwrap();
2670        let after = ks.stats().used_bytes;
2671        // memory should roughly double (two entries with same value)
2672        assert!(after > before);
2673        assert_eq!(ks.stats().key_count, 2);
2674    }
2675
2676    // --- random_key ---
2677
2678    #[test]
2679    fn random_key_empty() {
2680        let mut ks = Keyspace::new();
2681        assert_eq!(ks.random_key(), None);
2682    }
2683
2684    #[test]
2685    fn random_key_returns_existing() {
2686        let mut ks = Keyspace::new();
2687        ks.set("only".into(), Bytes::from("val"), None, false, false);
2688        assert_eq!(ks.random_key(), Some("only".into()));
2689    }
2690
2691    // --- touch ---
2692
2693    #[test]
2694    fn touch_existing_key() {
2695        let mut ks = Keyspace::new();
2696        ks.set("k".into(), Bytes::from("v"), None, false, false);
2697        assert!(ks.touch("k"));
2698    }
2699
2700    #[test]
2701    fn touch_missing_key() {
2702        let mut ks = Keyspace::new();
2703        assert!(!ks.touch("missing"));
2704    }
2705
2706    // --- sort ---
2707
2708    #[test]
2709    fn sort_list_numeric() {
2710        let mut ks = Keyspace::new();
2711        let _ = ks.lpush(
2712            "nums",
2713            &[Bytes::from("3"), Bytes::from("1"), Bytes::from("2")],
2714        );
2715        let result = ks.sort("nums", false, false, None).unwrap();
2716        assert_eq!(
2717            result,
2718            vec![Bytes::from("1"), Bytes::from("2"), Bytes::from("3")]
2719        );
2720    }
2721
2722    #[test]
2723    fn sort_list_alpha_desc() {
2724        let mut ks = Keyspace::new();
2725        let _ = ks.lpush(
2726            "words",
2727            &[
2728                Bytes::from("banana"),
2729                Bytes::from("apple"),
2730                Bytes::from("cherry"),
2731            ],
2732        );
2733        let result = ks.sort("words", true, true, None).unwrap();
2734        assert_eq!(
2735            result,
2736            vec![
2737                Bytes::from("cherry"),
2738                Bytes::from("banana"),
2739                Bytes::from("apple")
2740            ]
2741        );
2742    }
2743
2744    #[test]
2745    fn sort_with_limit() {
2746        let mut ks = Keyspace::new();
2747        let _ = ks.lpush(
2748            "nums",
2749            &[
2750                Bytes::from("4"),
2751                Bytes::from("3"),
2752                Bytes::from("2"),
2753                Bytes::from("1"),
2754            ],
2755        );
2756        let result = ks.sort("nums", false, false, Some((1, 2))).unwrap();
2757        assert_eq!(result, vec![Bytes::from("2"), Bytes::from("3")]);
2758    }
2759
2760    #[test]
2761    fn sort_set_alpha() {
2762        let mut ks = Keyspace::new();
2763        let members: Vec<String> = vec!["c".into(), "a".into(), "b".into()];
2764        let _ = ks.sadd("myset", &members);
2765        let result = ks.sort("myset", false, true, None).unwrap();
2766        assert_eq!(
2767            result,
2768            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]
2769        );
2770    }
2771
2772    #[test]
2773    fn sort_missing_key() {
2774        let mut ks = Keyspace::new();
2775        let result = ks.sort("nope", false, false, None).unwrap();
2776        assert!(result.is_empty());
2777    }
2778
2779    #[test]
2780    fn sort_wrong_type() {
2781        let mut ks = Keyspace::new();
2782        ks.set("str".into(), Bytes::from("hello"), None, false, false);
2783        let result = ks.sort("str", false, false, None);
2784        assert!(result.is_err());
2785    }
2786}