Skip to main content

ember_core/
keyspace.rs

1//! The keyspace: Ember's core key-value store.
2//!
3//! A `Keyspace` owns a flat `HashMap<String, Entry>` and handles
4//! get, set, delete, existence checks, and TTL management. Expired
5//! keys are removed lazily on access. Memory usage is tracked on
6//! every mutation for eviction and stats reporting.
7
8use std::collections::{HashMap, VecDeque};
9use std::time::Duration;
10
11use bytes::Bytes;
12use rand::seq::IteratorRandom;
13
14use tracing::warn;
15
16use crate::dropper::DropHandle;
17use crate::memory::{self, MemoryTracker};
18use crate::time;
19use crate::types::sorted_set::{SortedSet, ZAddFlags};
20use crate::types::{self, normalize_range, Value};
21
22const WRONGTYPE_MSG: &str = "WRONGTYPE Operation against a key holding the wrong kind of value";
23const OOM_MSG: &str = "OOM command not allowed when used memory > 'maxmemory'";
24
25/// Error returned when a command is used against a key holding the wrong type.
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct WrongType;
28
29impl std::fmt::Display for WrongType {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        write!(f, "{WRONGTYPE_MSG}")
32    }
33}
34
35impl std::error::Error for WrongType {}
36
37/// Error returned by write operations that may fail due to type mismatch
38/// or memory limits.
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum WriteError {
41    /// The key holds a different type than expected.
42    WrongType,
43    /// Memory limit reached and eviction couldn't free enough space.
44    OutOfMemory,
45}
46
47impl From<WrongType> for WriteError {
48    fn from(_: WrongType) -> Self {
49        WriteError::WrongType
50    }
51}
52
53/// Errors that can occur during INCR/DECR operations.
54#[derive(Debug, Clone, PartialEq, Eq)]
55pub enum IncrError {
56    /// Key holds a non-string type.
57    WrongType,
58    /// Value is not a valid integer.
59    NotAnInteger,
60    /// Increment or decrement would overflow i64.
61    Overflow,
62    /// Memory limit reached.
63    OutOfMemory,
64}
65
66impl std::fmt::Display for IncrError {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        match self {
69            IncrError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
70            IncrError::NotAnInteger => write!(f, "ERR value is not an integer or out of range"),
71            IncrError::Overflow => write!(f, "ERR increment or decrement would overflow"),
72            IncrError::OutOfMemory => write!(f, "{OOM_MSG}"),
73        }
74    }
75}
76
77impl std::error::Error for IncrError {}
78
79/// Errors that can occur during INCRBYFLOAT operations.
80#[derive(Debug, Clone, PartialEq)]
81pub enum IncrFloatError {
82    /// Key holds a non-string type.
83    WrongType,
84    /// Value is not a valid float.
85    NotAFloat,
86    /// Result would be NaN or Infinity.
87    NanOrInfinity,
88    /// Memory limit reached.
89    OutOfMemory,
90}
91
92impl std::fmt::Display for IncrFloatError {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        match self {
95            IncrFloatError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
96            IncrFloatError::NotAFloat => write!(f, "ERR value is not a valid float"),
97            IncrFloatError::NanOrInfinity => {
98                write!(f, "ERR increment would produce NaN or Infinity")
99            }
100            IncrFloatError::OutOfMemory => write!(f, "{OOM_MSG}"),
101        }
102    }
103}
104
105impl std::error::Error for IncrFloatError {}
106
107/// Error returned when RENAME fails because the source key doesn't exist.
108#[derive(Debug, Clone, PartialEq, Eq)]
109pub enum RenameError {
110    /// The source key does not exist.
111    NoSuchKey,
112}
113
114impl std::fmt::Display for RenameError {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        match self {
117            RenameError::NoSuchKey => write!(f, "ERR no such key"),
118        }
119    }
120}
121
122impl std::error::Error for RenameError {}
123
124/// Result of a ZADD operation, containing both the client-facing count
125/// and the list of members that were actually applied (for AOF correctness).
126#[derive(Debug, Clone)]
127pub struct ZAddResult {
128    /// Number of members added (or added+updated if CH flag was set).
129    pub count: usize,
130    /// Members that were actually inserted or had their score updated.
131    /// Only these should be persisted to the AOF.
132    pub applied: Vec<(f64, String)>,
133}
134
135/// Result of a VADD operation, carrying the applied element for AOF persistence.
136#[cfg(feature = "vector")]
137#[derive(Debug, Clone)]
138pub struct VAddResult {
139    /// The element name that was added or updated.
140    pub element: String,
141    /// The vector that was stored.
142    pub vector: Vec<f32>,
143    /// Whether a new element was added (false = updated existing).
144    pub added: bool,
145}
146
147/// Result of a VADD_BATCH operation.
148#[cfg(feature = "vector")]
149#[derive(Debug, Clone)]
150pub struct VAddBatchResult {
151    /// Number of newly added elements (not updates).
152    pub added_count: usize,
153    /// Elements that were actually inserted or updated, with their vectors.
154    /// Only these should be persisted to the AOF.
155    pub applied: Vec<(String, Vec<f32>)>,
156}
157
158/// Errors from vector write operations.
159#[cfg(feature = "vector")]
160#[derive(Debug, Clone)]
161pub enum VectorWriteError {
162    /// The key holds a different type than expected.
163    WrongType,
164    /// Memory limit reached.
165    OutOfMemory,
166    /// usearch index error (dimension mismatch, capacity, etc).
167    IndexError(String),
168    /// A batch insert partially succeeded before encountering an error.
169    /// The applied vectors should still be persisted to the AOF.
170    PartialBatch {
171        message: String,
172        applied: Vec<(String, Vec<f32>)>,
173    },
174}
175
176/// How the keyspace should handle writes when the memory limit is reached.
177#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
178pub enum EvictionPolicy {
179    /// Return an error on writes when memory is full.
180    #[default]
181    NoEviction,
182    /// Evict the least-recently-used key (approximated via random sampling).
183    AllKeysLru,
184}
185
186/// Configuration for a single keyspace / shard.
187#[derive(Debug, Clone)]
188pub struct ShardConfig {
189    /// Maximum memory in bytes. `None` means unlimited.
190    pub max_memory: Option<usize>,
191    /// What to do when memory is full.
192    pub eviction_policy: EvictionPolicy,
193    /// Numeric identifier for this shard (used for persistence file naming).
194    pub shard_id: u16,
195}
196
197impl Default for ShardConfig {
198    fn default() -> Self {
199        Self {
200            max_memory: None,
201            eviction_policy: EvictionPolicy::NoEviction,
202            shard_id: 0,
203        }
204    }
205}
206
207/// Result of a set operation that may fail under memory pressure.
208#[derive(Debug, PartialEq, Eq)]
209pub enum SetResult {
210    /// The key was stored successfully.
211    Ok,
212    /// Memory limit reached and eviction policy is NoEviction.
213    OutOfMemory,
214}
215
216/// A single entry in the keyspace: a value plus optional expiration
217/// and last access time for LRU approximation.
218///
219/// Memory optimized: uses u64 timestamps instead of Option<Instant>.
220/// Saves 8 bytes per entry (24 bytes down from 32 for metadata).
221#[derive(Debug, Clone)]
222pub(crate) struct Entry {
223    pub(crate) value: Value,
224    /// Monotonic expiry timestamp in ms. 0 = no expiry.
225    pub(crate) expires_at_ms: u64,
226    /// Monotonic last access timestamp in ms (for LRU).
227    pub(crate) last_access_ms: u64,
228}
229
230impl Entry {
231    fn new(value: Value, ttl: Option<Duration>) -> Self {
232        Self {
233            value,
234            expires_at_ms: time::expiry_from_duration(ttl),
235            last_access_ms: time::now_ms(),
236        }
237    }
238
239    /// Returns `true` if this entry has passed its expiration time.
240    fn is_expired(&self) -> bool {
241        time::is_expired(self.expires_at_ms)
242    }
243
244    /// Marks this entry as accessed right now.
245    fn touch(&mut self) {
246        self.last_access_ms = time::now_ms();
247    }
248}
249
250/// Result of a TTL query, matching Redis semantics.
251#[derive(Debug, Clone, PartialEq, Eq)]
252pub enum TtlResult {
253    /// Key exists and has a TTL. Returns remaining seconds.
254    Seconds(u64),
255    /// Key exists and has a TTL. Returns remaining milliseconds.
256    Milliseconds(u64),
257    /// Key exists but has no expiration set.
258    NoExpiry,
259    /// Key does not exist.
260    NotFound,
261}
262
263/// Aggregated statistics for a keyspace.
264#[derive(Debug, Clone, PartialEq, Eq)]
265pub struct KeyspaceStats {
266    /// Number of live keys.
267    pub key_count: usize,
268    /// Estimated memory usage in bytes.
269    pub used_bytes: usize,
270    /// Number of keys with an expiration set.
271    pub keys_with_expiry: usize,
272    /// Cumulative count of keys removed by expiration (lazy + active).
273    pub keys_expired: u64,
274    /// Cumulative count of keys removed by eviction.
275    pub keys_evicted: u64,
276}
277
278/// Number of random keys to sample when looking for an eviction candidate.
279///
280/// Eviction uses sampling-based approximate LRU — we randomly select this many
281/// keys and evict the least-recently-accessed among them. This trades perfect
282/// LRU accuracy for O(1) eviction (no sorted structure to maintain).
283///
284/// Larger sample sizes give better LRU approximation but cost more per eviction.
285/// 16 is a reasonable balance — similar to Redis's default sample size. With
286/// 16 samples, we statistically find a good eviction candidate while keeping
287/// eviction overhead low even at millions of keys.
288const EVICTION_SAMPLE_SIZE: usize = 16;
289
290/// The core key-value store.
291///
292/// All operations are single-threaded per shard — no internal locking.
293/// Memory usage is tracked incrementally on every mutation.
294pub struct Keyspace {
295    entries: HashMap<String, Entry>,
296    memory: MemoryTracker,
297    config: ShardConfig,
298    /// Number of entries that currently have an expiration set.
299    expiry_count: usize,
300    /// Cumulative count of keys removed by expiration (lazy + active).
301    expired_total: u64,
302    /// Cumulative count of keys removed by eviction.
303    evicted_total: u64,
304    /// When set, large values are dropped on a background thread instead
305    /// of inline on the shard thread. See [`crate::dropper`].
306    drop_handle: Option<DropHandle>,
307}
308
309impl Keyspace {
310    /// Creates a new, empty keyspace with default config (no memory limit).
311    pub fn new() -> Self {
312        Self::with_config(ShardConfig::default())
313    }
314
315    /// Creates a new, empty keyspace with the given config.
316    pub fn with_config(config: ShardConfig) -> Self {
317        Self {
318            entries: HashMap::new(),
319            memory: MemoryTracker::new(),
320            config,
321            expiry_count: 0,
322            expired_total: 0,
323            evicted_total: 0,
324            drop_handle: None,
325        }
326    }
327
328    /// Attaches a background drop handle for lazy free. When set, large
329    /// values removed by del/eviction/expiration are dropped on a
330    /// background thread instead of blocking the shard.
331    pub fn set_drop_handle(&mut self, handle: DropHandle) {
332        self.drop_handle = Some(handle);
333    }
334
335    /// Decrements the expiry count if the entry had a TTL set.
336    fn decrement_expiry_if_set(&mut self, entry: &Entry) {
337        if entry.expires_at_ms != 0 {
338            self.expiry_count = self.expiry_count.saturating_sub(1);
339        }
340    }
341
342    /// Cleans up after removing an element from a collection (list, sorted
343    /// set, hash, or set). If the collection is now empty, removes the key
344    /// entirely and adjusts memory/expiry tracking. Otherwise just adjusts
345    /// the memory delta.
346    fn cleanup_after_remove(&mut self, key: &str, old_size: usize, is_empty: bool) {
347        if is_empty {
348            if let Some(removed) = self.entries.remove(key) {
349                self.decrement_expiry_if_set(&removed);
350            }
351            self.memory.remove_with_size(old_size);
352        } else if let Some(entry) = self.entries.get(key) {
353            let new_size = memory::entry_size(key, &entry.value);
354            self.memory.adjust(old_size, new_size);
355        }
356    }
357
358    /// Checks whether a key either doesn't exist or holds the expected
359    /// collection type. Returns `Ok(true)` if the key is new,
360    /// `Ok(false)` if it exists with the right type, or `Err(WrongType)`
361    /// if the key exists with a different type.
362    fn ensure_collection_type(
363        &self,
364        key: &str,
365        type_check: fn(&Value) -> bool,
366    ) -> Result<bool, WriteError> {
367        match self.entries.get(key) {
368            None => Ok(true),
369            Some(e) if type_check(&e.value) => Ok(false),
370            Some(_) => Err(WriteError::WrongType),
371        }
372    }
373
374    /// Estimates the memory cost of a collection write and enforces the
375    /// limit. `base_overhead` is the fixed cost of a new collection
376    /// (e.g. VECDEQUE_BASE_OVERHEAD). Returns `Ok(())` on success.
377    fn reserve_memory(
378        &mut self,
379        is_new: bool,
380        key: &str,
381        base_overhead: usize,
382        element_increase: usize,
383    ) -> Result<(), WriteError> {
384        let estimated_increase = if is_new {
385            memory::ENTRY_OVERHEAD + key.len() + base_overhead + element_increase
386        } else {
387            element_increase
388        };
389        if self.enforce_memory_limit(estimated_increase) {
390            Ok(())
391        } else {
392            Err(WriteError::OutOfMemory)
393        }
394    }
395
396    /// Inserts a new key with an empty collection value. Used by
397    /// collection-write methods after type-checking and memory reservation.
398    fn insert_empty(&mut self, key: &str, value: Value) {
399        self.memory.add(key, &value);
400        self.entries.insert(key.to_owned(), Entry::new(value, None));
401    }
402
403    /// Measures entry size before and after a mutation, adjusting the
404    /// memory tracker for the difference. Touches the entry afterwards.
405    fn track_size<T>(&mut self, key: &str, f: impl FnOnce(&mut Entry) -> T) -> Option<T> {
406        let entry = self.entries.get_mut(key)?;
407        let old_size = memory::entry_size(key, &entry.value);
408        let result = f(entry);
409        let entry = self.entries.get(key)?;
410        let new_size = memory::entry_size(key, &entry.value);
411        self.memory.adjust(old_size, new_size);
412        Some(result)
413    }
414
415    /// Adjusts the expiry count when replacing an entry whose TTL status
416    /// may have changed (e.g. SET overwriting an existing key).
417    fn adjust_expiry_count(&mut self, had_expiry: bool, has_expiry: bool) {
418        match (had_expiry, has_expiry) {
419            (false, true) => self.expiry_count += 1,
420            (true, false) => self.expiry_count = self.expiry_count.saturating_sub(1),
421            _ => {}
422        }
423    }
424
425    /// Retrieves the string value for `key`, or `None` if missing/expired.
426    ///
427    /// Returns `Err(WrongType)` if the key holds a non-string value.
428    /// Expired keys are removed lazily on access. Successful reads update
429    /// the entry's last access time for LRU tracking.
430    pub fn get(&mut self, key: &str) -> Result<Option<Value>, WrongType> {
431        if self.remove_if_expired(key) {
432            return Ok(None);
433        }
434        match self.entries.get_mut(key) {
435            Some(e) => match &e.value {
436                Value::String(_) => {
437                    e.touch();
438                    Ok(Some(e.value.clone()))
439                }
440                _ => Err(WrongType),
441            },
442            None => Ok(None),
443        }
444    }
445
446    /// Retrieves the raw `Bytes` for a string key, avoiding the `Value`
447    /// enum wrapper. `Bytes::clone()` is a cheap refcount increment.
448    ///
449    /// Returns `Err(WrongType)` if the key holds a non-string value.
450    pub fn get_string(&mut self, key: &str) -> Result<Option<Bytes>, WrongType> {
451        if self.remove_if_expired(key) {
452            return Ok(None);
453        }
454        match self.entries.get_mut(key) {
455            Some(e) => match &e.value {
456                Value::String(b) => {
457                    let data = b.clone(); // Bytes::clone is a refcount bump
458                    e.touch();
459                    Ok(Some(data))
460                }
461                _ => Err(WrongType),
462            },
463            None => Ok(None),
464        }
465    }
466
467    /// Returns the type name of the value at `key`, or "none" if missing.
468    pub fn value_type(&mut self, key: &str) -> &'static str {
469        if self.remove_if_expired(key) {
470            return "none";
471        }
472        match self.entries.get(key) {
473            Some(e) => types::type_name(&e.value),
474            None => "none",
475        }
476    }
477
478    /// Stores a key-value pair. If the key already existed, the old entry
479    /// (including any TTL) is replaced entirely.
480    ///
481    /// `expire` sets an optional TTL as a duration from now.
482    ///
483    /// Returns `SetResult::OutOfMemory` if the memory limit is reached
484    /// and the eviction policy is `NoEviction`. With `AllKeysLru`, this
485    /// will evict keys to make room before inserting.
486    pub fn set(&mut self, key: String, value: Bytes, expire: Option<Duration>) -> SetResult {
487        let has_expiry = expire.is_some();
488        let new_value = Value::String(value);
489
490        // check memory limit — for overwrites, only the net increase matters
491        let new_size = memory::entry_size(&key, &new_value);
492        let old_size = self
493            .entries
494            .get(&key)
495            .map(|e| memory::entry_size(&key, &e.value))
496            .unwrap_or(0);
497        let net_increase = new_size.saturating_sub(old_size);
498
499        if !self.enforce_memory_limit(net_increase) {
500            return SetResult::OutOfMemory;
501        }
502
503        if let Some(old_entry) = self.entries.get(&key) {
504            self.memory.replace(&key, &old_entry.value, &new_value);
505            self.adjust_expiry_count(old_entry.expires_at_ms != 0, has_expiry);
506        } else {
507            self.memory.add(&key, &new_value);
508            if has_expiry {
509                self.expiry_count += 1;
510            }
511        }
512
513        self.entries.insert(key, Entry::new(new_value, expire));
514        SetResult::Ok
515    }
516
517    /// Tries to evict one key using LRU approximation.
518    ///
519    /// Randomly samples `EVICTION_SAMPLE_SIZE` keys and removes the one
520    /// with the oldest `last_access` time. Returns `true` if a key was
521    /// evicted, `false` if the keyspace is empty.
522    ///
523    /// Uses reservoir sampling with k=1 to avoid allocating a Vec on
524    /// every eviction attempt. The victim key index is remembered
525    /// rather than cloned, eliminating a heap allocation on the hot path.
526    fn try_evict(&mut self) -> bool {
527        if self.entries.is_empty() {
528            return false;
529        }
530
531        let mut rng = rand::rng();
532
533        // reservoir sample k=1 from the iterator, tracking the oldest
534        // entry by last_access_ms. this replaces choose_multiple() which
535        // allocates a Vec internally.
536        let mut best_key: Option<&str> = None;
537        let mut best_access = u64::MAX;
538        let mut seen = 0usize;
539
540        for (key, entry) in &self.entries {
541            // reservoir sampling: include this item with probability
542            // EVICTION_SAMPLE_SIZE / (seen + 1), capped once we have
543            // enough candidates
544            seen += 1;
545            if seen <= EVICTION_SAMPLE_SIZE {
546                if entry.last_access_ms < best_access {
547                    best_access = entry.last_access_ms;
548                    best_key = Some(key.as_str());
549                }
550            } else {
551                use rand::Rng;
552                let j = rng.random_range(0..seen);
553                if j < EVICTION_SAMPLE_SIZE && entry.last_access_ms < best_access {
554                    best_access = entry.last_access_ms;
555                    best_key = Some(key.as_str());
556                }
557            }
558        }
559
560        if let Some(victim) = best_key {
561            // we need an owned key to remove from the map
562            let victim = victim.to_owned();
563            if let Some(entry) = self.entries.remove(&victim) {
564                self.memory.remove(&victim, &entry.value);
565                self.decrement_expiry_if_set(&entry);
566                self.evicted_total += 1;
567                self.defer_drop(entry.value);
568                return true;
569            }
570        }
571        false
572    }
573
574    /// Checks whether the memory limit allows a write that would increase
575    /// usage by `estimated_increase` bytes. Attempts eviction if the
576    /// policy allows it. Returns `true` if the write can proceed.
577    ///
578    /// The comparison uses [`memory::effective_limit`] rather than the raw
579    /// configured maximum. This reserves headroom for allocator overhead
580    /// and fragmentation that our per-entry estimates can't account for,
581    /// preventing the OS from OOM-killing us before eviction triggers.
582    fn enforce_memory_limit(&mut self, estimated_increase: usize) -> bool {
583        if let Some(max) = self.config.max_memory {
584            let limit = memory::effective_limit(max);
585            while self.memory.used_bytes() + estimated_increase > limit {
586                match self.config.eviction_policy {
587                    EvictionPolicy::NoEviction => return false,
588                    EvictionPolicy::AllKeysLru => {
589                        if !self.try_evict() {
590                            return false;
591                        }
592                    }
593                }
594            }
595        }
596        true
597    }
598
599    /// Removes a key. Returns `true` if the key existed (and wasn't expired).
600    ///
601    /// When a drop handle is set, large values are dropped on the
602    /// background thread instead of inline.
603    pub fn del(&mut self, key: &str) -> bool {
604        if self.remove_if_expired(key) {
605            return false;
606        }
607        if let Some(entry) = self.entries.remove(key) {
608            self.memory.remove(key, &entry.value);
609            self.decrement_expiry_if_set(&entry);
610            self.defer_drop(entry.value);
611            true
612        } else {
613            false
614        }
615    }
616
617    /// Removes a key like `del`, but always defers the value's destructor
618    /// to the background drop thread (when available). Semantically
619    /// identical to DEL — the key is gone immediately, memory is
620    /// accounted for immediately, but the actual deallocation happens
621    /// off the hot path.
622    pub fn unlink(&mut self, key: &str) -> bool {
623        if self.remove_if_expired(key) {
624            return false;
625        }
626        if let Some(entry) = self.entries.remove(key) {
627            self.memory.remove(key, &entry.value);
628            self.decrement_expiry_if_set(&entry);
629            // always defer for UNLINK, regardless of value size
630            if let Some(ref handle) = self.drop_handle {
631                handle.defer_value(entry.value);
632            }
633            true
634        } else {
635            false
636        }
637    }
638
639    /// Replaces the entries map with an empty one and resets memory
640    /// tracking. Returns the old entries so the caller can send them
641    /// to the background drop thread.
642    pub(crate) fn flush_async(&mut self) -> HashMap<String, Entry> {
643        let old = std::mem::take(&mut self.entries);
644        self.memory.reset();
645        self.expiry_count = 0;
646        old
647    }
648
649    /// Returns `true` if the key exists and hasn't expired.
650    pub fn exists(&mut self, key: &str) -> bool {
651        if self.remove_if_expired(key) {
652            return false;
653        }
654        self.entries.contains_key(key)
655    }
656
657    /// Sets an expiration on an existing key. Returns `true` if the key
658    /// exists (and the TTL was set), `false` if the key doesn't exist.
659    pub fn expire(&mut self, key: &str, seconds: u64) -> bool {
660        if self.remove_if_expired(key) {
661            return false;
662        }
663        match self.entries.get_mut(key) {
664            Some(entry) => {
665                if entry.expires_at_ms == 0 {
666                    self.expiry_count += 1;
667                }
668                entry.expires_at_ms = time::now_ms().saturating_add(seconds.saturating_mul(1000));
669                true
670            }
671            None => false,
672        }
673    }
674
675    /// Returns the TTL status for a key, following Redis semantics:
676    /// - `Seconds(n)` if the key has a TTL
677    /// - `NoExpiry` if the key exists without a TTL
678    /// - `NotFound` if the key doesn't exist
679    pub fn ttl(&mut self, key: &str) -> TtlResult {
680        if self.remove_if_expired(key) {
681            return TtlResult::NotFound;
682        }
683        match self.entries.get(key) {
684            Some(entry) => match time::remaining_secs(entry.expires_at_ms) {
685                Some(secs) => TtlResult::Seconds(secs),
686                None => TtlResult::NoExpiry,
687            },
688            None => TtlResult::NotFound,
689        }
690    }
691
692    /// Removes the expiration from a key.
693    ///
694    /// Returns `true` if the key existed and had a timeout that was removed.
695    /// Returns `false` if the key doesn't exist or has no expiration.
696    pub fn persist(&mut self, key: &str) -> bool {
697        if self.remove_if_expired(key) {
698            return false;
699        }
700        match self.entries.get_mut(key) {
701            Some(entry) => {
702                if entry.expires_at_ms != 0 {
703                    entry.expires_at_ms = 0;
704                    self.expiry_count = self.expiry_count.saturating_sub(1);
705                    true
706                } else {
707                    false
708                }
709            }
710            None => false,
711        }
712    }
713
714    /// Returns the TTL status for a key in milliseconds, following Redis semantics:
715    /// - `Milliseconds(n)` if the key has a TTL
716    /// - `NoExpiry` if the key exists without a TTL
717    /// - `NotFound` if the key doesn't exist
718    pub fn pttl(&mut self, key: &str) -> TtlResult {
719        if self.remove_if_expired(key) {
720            return TtlResult::NotFound;
721        }
722        match self.entries.get(key) {
723            Some(entry) => match time::remaining_ms(entry.expires_at_ms) {
724                Some(ms) => TtlResult::Milliseconds(ms),
725                None => TtlResult::NoExpiry,
726            },
727            None => TtlResult::NotFound,
728        }
729    }
730
731    /// Sets an expiration on an existing key in milliseconds.
732    ///
733    /// Returns `true` if the key exists (and the TTL was set),
734    /// `false` if the key doesn't exist.
735    pub fn pexpire(&mut self, key: &str, millis: u64) -> bool {
736        if self.remove_if_expired(key) {
737            return false;
738        }
739        match self.entries.get_mut(key) {
740            Some(entry) => {
741                if entry.expires_at_ms == 0 {
742                    self.expiry_count += 1;
743                }
744                entry.expires_at_ms = time::now_ms().saturating_add(millis);
745                true
746            }
747            None => false,
748        }
749    }
750
751    /// Increments the integer value of a key by 1.
752    ///
753    /// If the key doesn't exist, it's initialized to 0 before incrementing.
754    /// Returns the new value after the operation.
755    pub fn incr(&mut self, key: &str) -> Result<i64, IncrError> {
756        self.incr_by(key, 1)
757    }
758
759    /// Decrements the integer value of a key by 1.
760    ///
761    /// If the key doesn't exist, it's initialized to 0 before decrementing.
762    /// Returns the new value after the operation.
763    pub fn decr(&mut self, key: &str) -> Result<i64, IncrError> {
764        self.incr_by(key, -1)
765    }
766
767    /// Adds `delta` to the current integer value of the key, creating it
768    /// if necessary. Used by INCR, DECR, INCRBY, and DECRBY.
769    ///
770    /// Preserves the existing TTL when updating an existing key.
771    pub fn incr_by(&mut self, key: &str, delta: i64) -> Result<i64, IncrError> {
772        self.remove_if_expired(key);
773
774        // read current value and TTL
775        let (current, existing_expire) = match self.entries.get(key) {
776            Some(entry) => {
777                let val = match &entry.value {
778                    Value::String(data) => {
779                        let s = std::str::from_utf8(data).map_err(|_| IncrError::NotAnInteger)?;
780                        s.parse::<i64>().map_err(|_| IncrError::NotAnInteger)?
781                    }
782                    _ => return Err(IncrError::WrongType),
783                };
784                let expire = time::remaining_ms(entry.expires_at_ms).map(Duration::from_millis);
785                (val, expire)
786            }
787            None => (0, None),
788        };
789
790        let new_val = current.checked_add(delta).ok_or(IncrError::Overflow)?;
791        let new_bytes = Bytes::from(new_val.to_string());
792
793        match self.set(key.to_owned(), new_bytes, existing_expire) {
794            SetResult::Ok => Ok(new_val),
795            SetResult::OutOfMemory => Err(IncrError::OutOfMemory),
796        }
797    }
798
799    /// Adds a float `delta` to the current value of the key, creating it
800    /// if necessary. Used by INCRBYFLOAT.
801    ///
802    /// Preserves the existing TTL when updating an existing key.
803    /// Returns the new value as a string (matching Redis behavior).
804    pub fn incr_by_float(&mut self, key: &str, delta: f64) -> Result<String, IncrFloatError> {
805        self.remove_if_expired(key);
806
807        let (current, existing_expire) = match self.entries.get(key) {
808            Some(entry) => {
809                let val = match &entry.value {
810                    Value::String(data) => {
811                        let s = std::str::from_utf8(data).map_err(|_| IncrFloatError::NotAFloat)?;
812                        s.parse::<f64>().map_err(|_| IncrFloatError::NotAFloat)?
813                    }
814                    _ => return Err(IncrFloatError::WrongType),
815                };
816                let expire = time::remaining_ms(entry.expires_at_ms).map(Duration::from_millis);
817                (val, expire)
818            }
819            None => (0.0, None),
820        };
821
822        let new_val = current + delta;
823        if new_val.is_nan() || new_val.is_infinite() {
824            return Err(IncrFloatError::NanOrInfinity);
825        }
826
827        // Redis strips trailing zeros: "10.5" not "10.50000..."
828        // but keeps at least one decimal if the result is a whole number
829        let formatted = format_float(new_val);
830        let new_bytes = Bytes::copy_from_slice(formatted.as_bytes());
831
832        match self.set(key.to_owned(), new_bytes, existing_expire) {
833            SetResult::Ok => Ok(formatted),
834            SetResult::OutOfMemory => Err(IncrFloatError::OutOfMemory),
835        }
836    }
837
838    /// Appends a value to an existing string key, or creates a new key if
839    /// it doesn't exist. Returns the new string length.
840    pub fn append(&mut self, key: &str, value: &[u8]) -> Result<usize, WriteError> {
841        self.remove_if_expired(key);
842
843        match self.entries.get(key) {
844            Some(entry) => match &entry.value {
845                Value::String(existing) => {
846                    let mut new_data = Vec::with_capacity(existing.len() + value.len());
847                    new_data.extend_from_slice(existing);
848                    new_data.extend_from_slice(value);
849                    let new_len = new_data.len();
850                    let expire = time::remaining_ms(entry.expires_at_ms).map(Duration::from_millis);
851                    match self.set(key.to_owned(), Bytes::from(new_data), expire) {
852                        SetResult::Ok => Ok(new_len),
853                        SetResult::OutOfMemory => Err(WriteError::OutOfMemory),
854                    }
855                }
856                _ => Err(WriteError::WrongType),
857            },
858            None => {
859                let new_len = value.len();
860                match self.set(key.to_owned(), Bytes::copy_from_slice(value), None) {
861                    SetResult::Ok => Ok(new_len),
862                    SetResult::OutOfMemory => Err(WriteError::OutOfMemory),
863                }
864            }
865        }
866    }
867
868    /// Returns the length of the string value stored at key.
869    /// Returns 0 if the key does not exist.
870    pub fn strlen(&mut self, key: &str) -> Result<usize, WrongType> {
871        self.remove_if_expired(key);
872
873        match self.entries.get(key) {
874            Some(entry) => match &entry.value {
875                Value::String(data) => Ok(data.len()),
876                _ => Err(WrongType),
877            },
878            None => Ok(0),
879        }
880    }
881
882    /// Returns all keys matching a glob pattern.
883    ///
884    /// Warning: O(n) scan of the entire keyspace. Use SCAN for production
885    /// workloads with large key counts.
886    pub fn keys(&self, pattern: &str) -> Vec<String> {
887        let len = self.entries.len();
888        if len > 10_000 {
889            warn!(
890                key_count = len,
891                "KEYS on large keyspace, consider SCAN instead"
892            );
893        }
894        let compiled = GlobPattern::new(pattern);
895        self.entries
896            .iter()
897            .filter(|(_, entry)| !entry.is_expired())
898            .filter(|(key, _)| compiled.matches(key))
899            .map(|(key, _)| key.clone())
900            .collect()
901    }
902
903    /// Counts live keys in this keyspace that hash to the given cluster slot.
904    ///
905    /// O(n) scan over all entries — same cost as KEYS.
906    pub fn count_keys_in_slot(&self, slot: u16) -> usize {
907        self.entries
908            .iter()
909            .filter(|(_, entry)| !entry.is_expired())
910            .filter(|(key, _)| ember_cluster::key_slot(key.as_bytes()) == slot)
911            .count()
912    }
913
914    /// Returns up to `count` live keys that hash to the given cluster slot.
915    ///
916    /// O(n) scan over all entries — same cost as KEYS.
917    pub fn get_keys_in_slot(&self, slot: u16, count: usize) -> Vec<String> {
918        self.entries
919            .iter()
920            .filter(|(_, entry)| !entry.is_expired())
921            .filter(|(key, _)| ember_cluster::key_slot(key.as_bytes()) == slot)
922            .take(count)
923            .map(|(key, _)| key.clone())
924            .collect()
925    }
926
927    /// Renames a key to a new name. Returns an error if the source key
928    /// doesn't exist. If the destination key already exists, it is overwritten.
929    pub fn rename(&mut self, key: &str, newkey: &str) -> Result<(), RenameError> {
930        self.remove_if_expired(key);
931        self.remove_if_expired(newkey);
932
933        let entry = match self.entries.remove(key) {
934            Some(entry) => entry,
935            None => return Err(RenameError::NoSuchKey),
936        };
937
938        // update memory tracking for old key removal
939        self.memory.remove(key, &entry.value);
940        self.decrement_expiry_if_set(&entry);
941
942        // remove destination if it exists
943        if let Some(old_dest) = self.entries.remove(newkey) {
944            self.memory.remove(newkey, &old_dest.value);
945            self.decrement_expiry_if_set(&old_dest);
946        }
947
948        // re-insert with the new key name, preserving value and expiry
949        self.memory.add(newkey, &entry.value);
950        if entry.expires_at_ms != 0 {
951            self.expiry_count += 1;
952        }
953        self.entries.insert(newkey.to_owned(), entry);
954        Ok(())
955    }
956
957    /// Returns aggregated stats for this keyspace.
958    ///
959    /// All fields are tracked incrementally — this is O(1).
960    pub fn stats(&self) -> KeyspaceStats {
961        KeyspaceStats {
962            key_count: self.memory.key_count(),
963            used_bytes: self.memory.used_bytes(),
964            keys_with_expiry: self.expiry_count,
965            keys_expired: self.expired_total,
966            keys_evicted: self.evicted_total,
967        }
968    }
969
970    /// Returns the number of live keys.
971    pub fn len(&self) -> usize {
972        self.entries.len()
973    }
974
975    /// Removes all keys from the keyspace.
976    pub fn clear(&mut self) {
977        self.entries.clear();
978        self.memory.reset();
979        self.expiry_count = 0;
980    }
981
982    /// Returns `true` if the keyspace has no entries.
983    pub fn is_empty(&self) -> bool {
984        self.entries.is_empty()
985    }
986
987    /// Scans keys starting from a cursor position.
988    ///
989    /// Returns the next cursor (0 if scan complete) and a batch of keys.
990    /// The `pattern` argument supports glob-style matching (`*`, `?`, `[abc]`).
991    pub fn scan_keys(
992        &self,
993        cursor: u64,
994        count: usize,
995        pattern: Option<&str>,
996    ) -> (u64, Vec<String>) {
997        let mut keys = Vec::with_capacity(count);
998        let mut position = 0u64;
999        let target_count = if count == 0 { 10 } else { count };
1000
1001        let compiled = pattern.map(GlobPattern::new);
1002
1003        for (key, entry) in self.entries.iter() {
1004            // skip expired entries
1005            if entry.is_expired() {
1006                continue;
1007            }
1008
1009            // skip entries before cursor
1010            if position < cursor {
1011                position += 1;
1012                continue;
1013            }
1014
1015            // pattern matching
1016            if let Some(ref pat) = compiled {
1017                if !pat.matches(key) {
1018                    position += 1;
1019                    continue;
1020                }
1021            }
1022
1023            keys.push(key.clone());
1024            position += 1;
1025
1026            if keys.len() >= target_count {
1027                // return position as next cursor
1028                return (position, keys);
1029            }
1030        }
1031
1032        // scan complete
1033        (0, keys)
1034    }
1035
1036    /// Iterates over all live (non-expired) entries, yielding the key, a
1037    /// clone of the value, and the remaining TTL in milliseconds (-1 for
1038    /// entries with no expiration). Used by snapshot and AOF rewrite.
1039    pub fn iter_entries(&self) -> impl Iterator<Item = (&str, &Value, i64)> {
1040        self.entries.iter().filter_map(move |(key, entry)| {
1041            if entry.is_expired() {
1042                return None;
1043            }
1044            let ttl_ms = match time::remaining_ms(entry.expires_at_ms) {
1045                Some(ms) => ms.min(i64::MAX as u64) as i64,
1046                None => -1,
1047            };
1048            Some((key.as_str(), &entry.value, ttl_ms))
1049        })
1050    }
1051
1052    /// Restores an entry during recovery, bypassing memory limits.
1053    ///
1054    /// `ttl` is the remaining time-to-live. If `None`, the key has no expiry.
1055    /// This is used only during shard startup when loading from
1056    /// snapshot/AOF — normal writes should go through `set()`.
1057    pub fn restore(&mut self, key: String, value: Value, ttl: Option<Duration>) {
1058        let has_expiry = ttl.is_some();
1059
1060        // if replacing an existing entry, adjust memory tracking
1061        if let Some(old) = self.entries.get(&key) {
1062            self.memory.replace(&key, &old.value, &value);
1063            self.adjust_expiry_count(old.expires_at_ms != 0, has_expiry);
1064        } else {
1065            self.memory.add(&key, &value);
1066            if has_expiry {
1067                self.expiry_count += 1;
1068            }
1069        }
1070
1071        self.entries.insert(key, Entry::new(value, ttl));
1072    }
1073
1074    // -- list operations --
1075
1076    /// Pushes one or more values to the head (left) of a list.
1077    ///
1078    /// Creates the list if the key doesn't exist. Returns `Err(WriteError::WrongType)`
1079    /// if the key exists but holds a non-list value, or
1080    /// `Err(WriteError::OutOfMemory)` if the memory limit is reached.
1081    /// Returns the new length on success.
1082    pub fn lpush(&mut self, key: &str, values: &[Bytes]) -> Result<usize, WriteError> {
1083        self.list_push(key, values, true)
1084    }
1085
1086    /// Pushes one or more values to the tail (right) of a list.
1087    ///
1088    /// Creates the list if the key doesn't exist. Returns `Err(WriteError::WrongType)`
1089    /// if the key exists but holds a non-list value, or
1090    /// `Err(WriteError::OutOfMemory)` if the memory limit is reached.
1091    /// Returns the new length on success.
1092    pub fn rpush(&mut self, key: &str, values: &[Bytes]) -> Result<usize, WriteError> {
1093        self.list_push(key, values, false)
1094    }
1095
1096    /// Pops a value from the head (left) of a list.
1097    ///
1098    /// Returns `Ok(None)` if the key doesn't exist. Removes the key if
1099    /// the list becomes empty. Returns `Err(WrongType)` on type mismatch.
1100    pub fn lpop(&mut self, key: &str) -> Result<Option<Bytes>, WrongType> {
1101        self.list_pop(key, true)
1102    }
1103
1104    /// Pops a value from the tail (right) of a list.
1105    ///
1106    /// Returns `Ok(None)` if the key doesn't exist. Removes the key if
1107    /// the list becomes empty. Returns `Err(WrongType)` on type mismatch.
1108    pub fn rpop(&mut self, key: &str) -> Result<Option<Bytes>, WrongType> {
1109        self.list_pop(key, false)
1110    }
1111
1112    /// Returns a range of elements from a list by index.
1113    ///
1114    /// Supports negative indices (e.g. -1 = last element). Out-of-bounds
1115    /// indices are clamped to the list boundaries. Returns `Err(WrongType)`
1116    /// on type mismatch. Missing keys return an empty vec.
1117    pub fn lrange(&mut self, key: &str, start: i64, stop: i64) -> Result<Vec<Bytes>, WrongType> {
1118        if self.remove_if_expired(key) {
1119            return Ok(vec![]);
1120        }
1121        match self.entries.get_mut(key) {
1122            None => Ok(vec![]),
1123            Some(entry) => {
1124                let result = match &entry.value {
1125                    Value::List(deque) => {
1126                        let len = deque.len() as i64;
1127                        let (s, e) = normalize_range(start, stop, len);
1128                        // empty range: inverted indices or both out of bounds
1129                        if s > e {
1130                            return Ok(vec![]);
1131                        }
1132                        Ok(deque
1133                            .iter()
1134                            .skip(s as usize)
1135                            .take((e - s + 1) as usize)
1136                            .cloned()
1137                            .collect())
1138                    }
1139                    _ => Err(WrongType),
1140                };
1141                if result.is_ok() {
1142                    entry.touch();
1143                }
1144                result
1145            }
1146        }
1147    }
1148
1149    /// Returns the length of a list, or 0 if the key doesn't exist.
1150    ///
1151    /// Returns `Err(WrongType)` on type mismatch.
1152    pub fn llen(&mut self, key: &str) -> Result<usize, WrongType> {
1153        if self.remove_if_expired(key) {
1154            return Ok(0);
1155        }
1156        match self.entries.get(key) {
1157            None => Ok(0),
1158            Some(entry) => match &entry.value {
1159                Value::List(deque) => Ok(deque.len()),
1160                _ => Err(WrongType),
1161            },
1162        }
1163    }
1164
1165    /// Internal push implementation shared by lpush/rpush.
1166    fn list_push(&mut self, key: &str, values: &[Bytes], left: bool) -> Result<usize, WriteError> {
1167        self.remove_if_expired(key);
1168
1169        let is_new = self.ensure_collection_type(key, |v| matches!(v, Value::List(_)))?;
1170
1171        let element_increase: usize = values
1172            .iter()
1173            .map(|v| memory::VECDEQUE_ELEMENT_OVERHEAD + v.len())
1174            .sum();
1175        self.reserve_memory(
1176            is_new,
1177            key,
1178            memory::VECDEQUE_BASE_OVERHEAD,
1179            element_increase,
1180        )?;
1181
1182        if is_new {
1183            self.insert_empty(key, Value::List(VecDeque::new()));
1184        }
1185
1186        let len = self
1187            .track_size(key, |entry| {
1188                let Value::List(ref mut deque) = entry.value else {
1189                    unreachable!("type verified by ensure_collection_type");
1190                };
1191                for val in values {
1192                    if left {
1193                        deque.push_front(val.clone());
1194                    } else {
1195                        deque.push_back(val.clone());
1196                    }
1197                }
1198                let len = deque.len();
1199                entry.touch();
1200                len
1201            })
1202            .unwrap_or(0);
1203
1204        Ok(len)
1205    }
1206
1207    /// Internal pop implementation shared by lpop/rpop.
1208    fn list_pop(&mut self, key: &str, left: bool) -> Result<Option<Bytes>, WrongType> {
1209        if self.remove_if_expired(key) {
1210            return Ok(None);
1211        }
1212
1213        let Some(entry) = self.entries.get_mut(key) else {
1214            return Ok(None);
1215        };
1216        if !matches!(entry.value, Value::List(_)) {
1217            return Err(WrongType);
1218        }
1219
1220        let old_entry_size = memory::entry_size(key, &entry.value);
1221        let Value::List(ref mut deque) = entry.value else {
1222            // checked above
1223            return Err(WrongType);
1224        };
1225        let popped = if left {
1226            deque.pop_front()
1227        } else {
1228            deque.pop_back()
1229        };
1230        entry.touch();
1231
1232        let is_empty = matches!(&entry.value, Value::List(d) if d.is_empty());
1233        self.cleanup_after_remove(key, old_entry_size, is_empty);
1234
1235        Ok(popped)
1236    }
1237
1238    // -- sorted set operations --
1239
1240    /// Adds members with scores to a sorted set, with optional ZADD flags.
1241    ///
1242    /// Creates the sorted set if the key doesn't exist. Returns a
1243    /// `ZAddResult` containing the count for the client response and the
1244    /// list of members that were actually applied (for AOF correctness).
1245    /// Returns `Err(WriteError::WrongType)` on type mismatch, or
1246    /// `Err(WriteError::OutOfMemory)` if the memory limit is reached.
1247    pub fn zadd(
1248        &mut self,
1249        key: &str,
1250        members: &[(f64, String)],
1251        flags: &ZAddFlags,
1252    ) -> Result<ZAddResult, WriteError> {
1253        self.remove_if_expired(key);
1254
1255        let is_new = self.ensure_collection_type(key, |v| matches!(v, Value::SortedSet(_)))?;
1256
1257        // worst-case estimate: assume all members are new
1258        let member_increase: usize = members
1259            .iter()
1260            .map(|(_, m)| SortedSet::estimated_member_cost(m))
1261            .sum();
1262        self.reserve_memory(is_new, key, SortedSet::BASE_OVERHEAD, member_increase)?;
1263
1264        if is_new {
1265            self.insert_empty(key, Value::SortedSet(SortedSet::new()));
1266        }
1267
1268        let (count, applied) = self
1269            .track_size(key, |entry| {
1270                let Value::SortedSet(ref mut ss) = entry.value else {
1271                    unreachable!("type verified by ensure_collection_type");
1272                };
1273                let mut count = 0;
1274                let mut applied = Vec::new();
1275                for (score, member) in members {
1276                    let result = ss.add_with_flags(member.clone(), *score, flags);
1277                    if result.added || result.updated {
1278                        applied.push((*score, member.clone()));
1279                    }
1280                    if flags.ch {
1281                        if result.added || result.updated {
1282                            count += 1;
1283                        }
1284                    } else if result.added {
1285                        count += 1;
1286                    }
1287                }
1288                entry.touch();
1289                (count, applied)
1290            })
1291            .unwrap_or_default();
1292
1293        // clean up if the set is still empty (e.g. XX flag on a new key)
1294        if let Some(entry) = self.entries.get(key) {
1295            if matches!(&entry.value, Value::SortedSet(ss) if ss.is_empty()) {
1296                self.memory
1297                    .remove_with_size(memory::entry_size(key, &entry.value));
1298                self.entries.remove(key);
1299            }
1300        }
1301
1302        Ok(ZAddResult { count, applied })
1303    }
1304
1305    /// Removes members from a sorted set. Returns the names of members
1306    /// that were actually removed (for AOF correctness). Deletes the key
1307    /// if the set becomes empty.
1308    ///
1309    /// Returns `Err(WrongType)` if the key holds a non-sorted-set value.
1310    pub fn zrem(&mut self, key: &str, members: &[String]) -> Result<Vec<String>, WrongType> {
1311        if self.remove_if_expired(key) {
1312            return Ok(vec![]);
1313        }
1314
1315        let Some(entry) = self.entries.get(key) else {
1316            return Ok(vec![]);
1317        };
1318        if !matches!(entry.value, Value::SortedSet(_)) {
1319            return Err(WrongType);
1320        }
1321
1322        let Some(entry) = self.entries.get_mut(key) else {
1323            return Ok(vec![]);
1324        };
1325        let old_entry_size = memory::entry_size(key, &entry.value);
1326        let mut removed = Vec::new();
1327        if let Value::SortedSet(ref mut ss) = entry.value {
1328            for member in members {
1329                if ss.remove(member) {
1330                    removed.push(member.clone());
1331                }
1332            }
1333        }
1334        entry.touch();
1335
1336        let is_empty = matches!(&entry.value, Value::SortedSet(ss) if ss.is_empty());
1337        self.cleanup_after_remove(key, old_entry_size, is_empty);
1338
1339        Ok(removed)
1340    }
1341
1342    /// Returns the score for a member in a sorted set.
1343    ///
1344    /// Returns `Ok(None)` if the key or member doesn't exist.
1345    /// Returns `Err(WrongType)` on type mismatch.
1346    pub fn zscore(&mut self, key: &str, member: &str) -> Result<Option<f64>, WrongType> {
1347        if self.remove_if_expired(key) {
1348            return Ok(None);
1349        }
1350        match self.entries.get_mut(key) {
1351            None => Ok(None),
1352            Some(entry) => match &entry.value {
1353                Value::SortedSet(ss) => {
1354                    let score = ss.score(member);
1355                    entry.touch();
1356                    Ok(score)
1357                }
1358                _ => Err(WrongType),
1359            },
1360        }
1361    }
1362
1363    /// Returns the 0-based rank of a member in a sorted set (lowest score = 0).
1364    ///
1365    /// Returns `Ok(None)` if the key or member doesn't exist.
1366    /// Returns `Err(WrongType)` on type mismatch.
1367    pub fn zrank(&mut self, key: &str, member: &str) -> Result<Option<usize>, WrongType> {
1368        if self.remove_if_expired(key) {
1369            return Ok(None);
1370        }
1371        match self.entries.get_mut(key) {
1372            None => Ok(None),
1373            Some(entry) => match &entry.value {
1374                Value::SortedSet(ss) => {
1375                    let rank = ss.rank(member);
1376                    entry.touch();
1377                    Ok(rank)
1378                }
1379                _ => Err(WrongType),
1380            },
1381        }
1382    }
1383
1384    /// Returns a range of members from a sorted set by rank.
1385    ///
1386    /// Supports negative indices. If `with_scores` is true, the result
1387    /// includes `(member, score)` pairs; otherwise just members.
1388    /// Returns `Err(WrongType)` on type mismatch.
1389    pub fn zrange(
1390        &mut self,
1391        key: &str,
1392        start: i64,
1393        stop: i64,
1394    ) -> Result<Vec<(String, f64)>, WrongType> {
1395        if self.remove_if_expired(key) {
1396            return Ok(vec![]);
1397        }
1398        match self.entries.get_mut(key) {
1399            None => Ok(vec![]),
1400            Some(entry) => {
1401                let result = match &entry.value {
1402                    Value::SortedSet(ss) => {
1403                        let items = ss.range_by_rank(start, stop);
1404                        Ok(items.into_iter().map(|(m, s)| (m.to_owned(), s)).collect())
1405                    }
1406                    _ => Err(WrongType),
1407                };
1408                if result.is_ok() {
1409                    entry.touch();
1410                }
1411                result
1412            }
1413        }
1414    }
1415
1416    /// Returns the number of members in a sorted set, or 0 if the key doesn't exist.
1417    ///
1418    /// Returns `Err(WrongType)` on type mismatch.
1419    pub fn zcard(&mut self, key: &str) -> Result<usize, WrongType> {
1420        if self.remove_if_expired(key) {
1421            return Ok(0);
1422        }
1423        match self.entries.get(key) {
1424            None => Ok(0),
1425            Some(entry) => match &entry.value {
1426                Value::SortedSet(ss) => Ok(ss.len()),
1427                _ => Err(WrongType),
1428            },
1429        }
1430    }
1431
1432    // -------------------------------------------------------------------------
1433    // Hash operations
1434    // -------------------------------------------------------------------------
1435
1436    /// Sets one or more field-value pairs in a hash.
1437    ///
1438    /// Creates the hash if the key doesn't exist. Returns the number of
1439    /// new fields added (fields that were updated don't count).
1440    pub fn hset(&mut self, key: &str, fields: &[(String, Bytes)]) -> Result<usize, WriteError> {
1441        if fields.is_empty() {
1442            return Ok(0);
1443        }
1444
1445        self.remove_if_expired(key);
1446
1447        let is_new = self.ensure_collection_type(key, |v| matches!(v, Value::Hash(_)))?;
1448
1449        let field_increase: usize = fields
1450            .iter()
1451            .map(|(f, v)| f.len() + v.len() + memory::HASHMAP_ENTRY_OVERHEAD)
1452            .sum();
1453        self.reserve_memory(is_new, key, memory::HASHMAP_BASE_OVERHEAD, field_increase)?;
1454
1455        if is_new {
1456            self.insert_empty(key, Value::Hash(HashMap::new()));
1457        }
1458
1459        let added = self
1460            .track_size(key, |entry| {
1461                let Value::Hash(ref mut map) = entry.value else {
1462                    unreachable!("type verified by ensure_collection_type");
1463                };
1464                let mut added = 0;
1465                for (field, value) in fields {
1466                    if map.insert(field.clone(), value.clone()).is_none() {
1467                        added += 1;
1468                    }
1469                }
1470                entry.touch();
1471                added
1472            })
1473            .unwrap_or(0);
1474
1475        Ok(added)
1476    }
1477
1478    /// Gets the value of a field in a hash.
1479    ///
1480    /// Returns `None` if the key or field doesn't exist.
1481    pub fn hget(&mut self, key: &str, field: &str) -> Result<Option<Bytes>, WrongType> {
1482        if self.remove_if_expired(key) {
1483            return Ok(None);
1484        }
1485        match self.entries.get_mut(key) {
1486            None => Ok(None),
1487            Some(entry) => match &entry.value {
1488                Value::Hash(map) => {
1489                    let result = map.get(field).cloned();
1490                    entry.touch();
1491                    Ok(result)
1492                }
1493                _ => Err(WrongType),
1494            },
1495        }
1496    }
1497
1498    /// Gets all field-value pairs from a hash.
1499    ///
1500    /// Returns an empty vec if the key doesn't exist.
1501    pub fn hgetall(&mut self, key: &str) -> Result<Vec<(String, Bytes)>, WrongType> {
1502        if self.remove_if_expired(key) {
1503            return Ok(vec![]);
1504        }
1505        match self.entries.get_mut(key) {
1506            None => Ok(vec![]),
1507            Some(entry) => match &entry.value {
1508                Value::Hash(map) => {
1509                    let result: Vec<_> = map.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1510                    entry.touch();
1511                    Ok(result)
1512                }
1513                _ => Err(WrongType),
1514            },
1515        }
1516    }
1517
1518    /// Deletes one or more fields from a hash.
1519    ///
1520    /// Returns the fields that were actually removed.
1521    pub fn hdel(&mut self, key: &str, fields: &[String]) -> Result<Vec<String>, WrongType> {
1522        if self.remove_if_expired(key) {
1523            return Ok(vec![]);
1524        }
1525
1526        let Some(entry) = self.entries.get_mut(key) else {
1527            return Ok(vec![]);
1528        };
1529        if !matches!(entry.value, Value::Hash(_)) {
1530            return Err(WrongType);
1531        }
1532
1533        let old_entry_size = memory::entry_size(key, &entry.value);
1534        let mut removed = Vec::new();
1535        let is_empty = if let Value::Hash(ref mut map) = entry.value {
1536            for field in fields {
1537                if map.remove(field).is_some() {
1538                    removed.push(field.clone());
1539                }
1540            }
1541            map.is_empty()
1542        } else {
1543            false
1544        };
1545
1546        self.cleanup_after_remove(key, old_entry_size, is_empty);
1547
1548        Ok(removed)
1549    }
1550
1551    /// Checks if a field exists in a hash.
1552    pub fn hexists(&mut self, key: &str, field: &str) -> Result<bool, WrongType> {
1553        if self.remove_if_expired(key) {
1554            return Ok(false);
1555        }
1556        match self.entries.get_mut(key) {
1557            None => Ok(false),
1558            Some(entry) => match &entry.value {
1559                Value::Hash(map) => {
1560                    let result = map.contains_key(field);
1561                    entry.touch();
1562                    Ok(result)
1563                }
1564                _ => Err(WrongType),
1565            },
1566        }
1567    }
1568
1569    /// Returns the number of fields in a hash.
1570    pub fn hlen(&mut self, key: &str) -> Result<usize, WrongType> {
1571        if self.remove_if_expired(key) {
1572            return Ok(0);
1573        }
1574        match self.entries.get(key) {
1575            None => Ok(0),
1576            Some(entry) => match &entry.value {
1577                Value::Hash(map) => Ok(map.len()),
1578                _ => Err(WrongType),
1579            },
1580        }
1581    }
1582
1583    /// Increments a field's integer value by the given amount.
1584    ///
1585    /// Creates the hash and field if they don't exist, starting from 0.
1586    pub fn hincrby(&mut self, key: &str, field: &str, delta: i64) -> Result<i64, IncrError> {
1587        self.remove_if_expired(key);
1588
1589        let is_new = match self.entries.get(key) {
1590            None => true,
1591            Some(e) if matches!(e.value, Value::Hash(_)) => false,
1592            Some(_) => return Err(IncrError::WrongType),
1593        };
1594
1595        // estimate memory for new field (worst case: new hash + new field)
1596        let val_str_len = 20; // max i64 string length
1597        let estimated_increase = if is_new {
1598            memory::ENTRY_OVERHEAD
1599                + key.len()
1600                + memory::HASHMAP_BASE_OVERHEAD
1601                + field.len()
1602                + val_str_len
1603                + memory::HASHMAP_ENTRY_OVERHEAD
1604        } else {
1605            field.len() + val_str_len + memory::HASHMAP_ENTRY_OVERHEAD
1606        };
1607
1608        if !self.enforce_memory_limit(estimated_increase) {
1609            return Err(IncrError::OutOfMemory);
1610        }
1611
1612        if is_new {
1613            let value = Value::Hash(HashMap::new());
1614            self.memory.add(key, &value);
1615            self.entries.insert(key.to_owned(), Entry::new(value, None));
1616        }
1617
1618        // safe: key was either just inserted above or verified to exist
1619        let Some(entry) = self.entries.get_mut(key) else {
1620            return Err(IncrError::WrongType);
1621        };
1622        let old_entry_size = memory::entry_size(key, &entry.value);
1623
1624        let Value::Hash(ref mut map) = entry.value else {
1625            return Err(IncrError::WrongType);
1626        };
1627        let current_val = match map.get(field) {
1628            Some(data) => {
1629                let s = std::str::from_utf8(data).map_err(|_| IncrError::NotAnInteger)?;
1630                s.parse::<i64>().map_err(|_| IncrError::NotAnInteger)?
1631            }
1632            None => 0,
1633        };
1634        let new_val = current_val.checked_add(delta).ok_or(IncrError::Overflow)?;
1635        map.insert(field.to_owned(), Bytes::from(new_val.to_string()));
1636        entry.touch();
1637
1638        let new_entry_size = memory::entry_size(key, &entry.value);
1639        self.memory.adjust(old_entry_size, new_entry_size);
1640
1641        Ok(new_val)
1642    }
1643
1644    /// Returns all field names in a hash.
1645    pub fn hkeys(&mut self, key: &str) -> Result<Vec<String>, WrongType> {
1646        if self.remove_if_expired(key) {
1647            return Ok(vec![]);
1648        }
1649        match self.entries.get_mut(key) {
1650            None => Ok(vec![]),
1651            Some(entry) => match &entry.value {
1652                Value::Hash(map) => {
1653                    let result = map.keys().cloned().collect();
1654                    entry.touch();
1655                    Ok(result)
1656                }
1657                _ => Err(WrongType),
1658            },
1659        }
1660    }
1661
1662    /// Returns all values in a hash.
1663    pub fn hvals(&mut self, key: &str) -> Result<Vec<Bytes>, WrongType> {
1664        if self.remove_if_expired(key) {
1665            return Ok(vec![]);
1666        }
1667        match self.entries.get_mut(key) {
1668            None => Ok(vec![]),
1669            Some(entry) => match &entry.value {
1670                Value::Hash(map) => {
1671                    let result = map.values().cloned().collect();
1672                    entry.touch();
1673                    Ok(result)
1674                }
1675                _ => Err(WrongType),
1676            },
1677        }
1678    }
1679
1680    /// Gets multiple field values from a hash.
1681    ///
1682    /// Returns `None` for fields that don't exist.
1683    pub fn hmget(&mut self, key: &str, fields: &[String]) -> Result<Vec<Option<Bytes>>, WrongType> {
1684        if self.remove_if_expired(key) {
1685            return Ok(fields.iter().map(|_| None).collect());
1686        }
1687        match self.entries.get_mut(key) {
1688            None => Ok(fields.iter().map(|_| None).collect()),
1689            Some(entry) => match &entry.value {
1690                Value::Hash(map) => {
1691                    let result = fields.iter().map(|f| map.get(f).cloned()).collect();
1692                    entry.touch();
1693                    Ok(result)
1694                }
1695                _ => Err(WrongType),
1696            },
1697        }
1698    }
1699
1700    // -------------------------------------------------------------------------
1701    // Set operations
1702    // -------------------------------------------------------------------------
1703
1704    /// Adds one or more members to a set.
1705    ///
1706    /// Creates the set if the key doesn't exist. Returns the number of
1707    /// new members added (existing members don't count).
1708    pub fn sadd(&mut self, key: &str, members: &[String]) -> Result<usize, WriteError> {
1709        if members.is_empty() {
1710            return Ok(0);
1711        }
1712
1713        self.remove_if_expired(key);
1714
1715        let is_new = self.ensure_collection_type(key, |v| matches!(v, Value::Set(_)))?;
1716
1717        let member_increase: usize = members
1718            .iter()
1719            .map(|m| m.len() + memory::HASHSET_MEMBER_OVERHEAD)
1720            .sum();
1721        self.reserve_memory(is_new, key, memory::HASHSET_BASE_OVERHEAD, member_increase)?;
1722
1723        if is_new {
1724            self.insert_empty(key, Value::Set(std::collections::HashSet::new()));
1725        }
1726
1727        let added = self
1728            .track_size(key, |entry| {
1729                let Value::Set(ref mut set) = entry.value else {
1730                    unreachable!("type verified by ensure_collection_type");
1731                };
1732                let mut added = 0;
1733                for member in members {
1734                    if set.insert(member.clone()) {
1735                        added += 1;
1736                    }
1737                }
1738                entry.touch();
1739                added
1740            })
1741            .unwrap_or(0);
1742
1743        Ok(added)
1744    }
1745
1746    /// Removes one or more members from a set.
1747    ///
1748    /// Returns the number of members that were actually removed.
1749    pub fn srem(&mut self, key: &str, members: &[String]) -> Result<usize, WrongType> {
1750        if self.remove_if_expired(key) {
1751            return Ok(0);
1752        }
1753
1754        let Some(entry) = self.entries.get_mut(key) else {
1755            return Ok(0);
1756        };
1757        if !matches!(entry.value, Value::Set(_)) {
1758            return Err(WrongType);
1759        }
1760
1761        let old_entry_size = memory::entry_size(key, &entry.value);
1762
1763        let mut removed = 0;
1764        let is_empty = if let Value::Set(ref mut set) = entry.value {
1765            for member in members {
1766                if set.remove(member) {
1767                    removed += 1;
1768                }
1769            }
1770            set.is_empty()
1771        } else {
1772            false
1773        };
1774
1775        self.cleanup_after_remove(key, old_entry_size, is_empty);
1776
1777        Ok(removed)
1778    }
1779
1780    /// Returns all members of a set.
1781    pub fn smembers(&mut self, key: &str) -> Result<Vec<String>, WrongType> {
1782        if self.remove_if_expired(key) {
1783            return Ok(vec![]);
1784        }
1785        match self.entries.get_mut(key) {
1786            None => Ok(vec![]),
1787            Some(entry) => match &entry.value {
1788                Value::Set(set) => {
1789                    let result = set.iter().cloned().collect();
1790                    entry.touch();
1791                    Ok(result)
1792                }
1793                _ => Err(WrongType),
1794            },
1795        }
1796    }
1797
1798    /// Checks if a member exists in a set.
1799    pub fn sismember(&mut self, key: &str, member: &str) -> Result<bool, WrongType> {
1800        if self.remove_if_expired(key) {
1801            return Ok(false);
1802        }
1803        match self.entries.get_mut(key) {
1804            None => Ok(false),
1805            Some(entry) => match &entry.value {
1806                Value::Set(set) => {
1807                    let result = set.contains(member);
1808                    entry.touch();
1809                    Ok(result)
1810                }
1811                _ => Err(WrongType),
1812            },
1813        }
1814    }
1815
1816    /// Returns the cardinality (number of elements) of a set.
1817    pub fn scard(&mut self, key: &str) -> Result<usize, WrongType> {
1818        if self.remove_if_expired(key) {
1819            return Ok(0);
1820        }
1821        match self.entries.get(key) {
1822            None => Ok(0),
1823            Some(entry) => match &entry.value {
1824                Value::Set(set) => Ok(set.len()),
1825                _ => Err(WrongType),
1826            },
1827        }
1828    }
1829
1830    /// Randomly samples up to `count` keys and removes any that have expired.
1831    ///
1832    /// Returns the number of keys actually removed. Used by the active
1833    /// expiration cycle to clean up keys that no one is reading.
1834    pub fn expire_sample(&mut self, count: usize) -> usize {
1835        if self.entries.is_empty() {
1836            return 0;
1837        }
1838
1839        let mut rng = rand::rng();
1840
1841        let keys_to_check: Vec<String> = self
1842            .entries
1843            .keys()
1844            .choose_multiple(&mut rng, count)
1845            .into_iter()
1846            .cloned()
1847            .collect();
1848
1849        let mut removed = 0;
1850        for key in &keys_to_check {
1851            if self.remove_if_expired(key) {
1852                removed += 1;
1853            }
1854        }
1855        removed
1856    }
1857
1858    /// Checks if a key is expired and removes it if so. Returns `true`
1859    /// if the key was removed (or didn't exist).
1860    fn remove_if_expired(&mut self, key: &str) -> bool {
1861        let expired = self
1862            .entries
1863            .get(key)
1864            .map(|e| e.is_expired())
1865            .unwrap_or(false);
1866
1867        if expired {
1868            if let Some(entry) = self.entries.remove(key) {
1869                self.memory.remove(key, &entry.value);
1870                self.decrement_expiry_if_set(&entry);
1871                self.expired_total += 1;
1872                self.defer_drop(entry.value);
1873            }
1874        }
1875        expired
1876    }
1877
1878    // -- vector operations --
1879
1880    /// Adds a vector to a vector set, creating the set if it doesn't exist.
1881    ///
1882    /// On first insert, the set's configuration (dim, metric, quantization,
1883    /// connectivity, expansion_add) is locked. Subsequent inserts must match
1884    /// the established dimensionality.
1885    ///
1886    /// Returns a `VAddResult` with the element name, vector, and whether it
1887    /// was newly added (for AOF recording).
1888    #[cfg(feature = "vector")]
1889    #[allow(clippy::too_many_arguments)]
1890    pub fn vadd(
1891        &mut self,
1892        key: &str,
1893        element: String,
1894        vector: Vec<f32>,
1895        metric: crate::types::vector::DistanceMetric,
1896        quantization: crate::types::vector::QuantizationType,
1897        connectivity: usize,
1898        expansion_add: usize,
1899    ) -> Result<VAddResult, VectorWriteError> {
1900        use crate::types::vector::VectorSet;
1901
1902        self.remove_if_expired(key);
1903
1904        let is_new = match self.entries.get(key) {
1905            None => true,
1906            Some(e) if matches!(e.value, Value::Vector(_)) => false,
1907            Some(_) => return Err(VectorWriteError::WrongType),
1908        };
1909
1910        // estimate memory for the new vector (saturating to avoid overflow)
1911        let dim = vector.len();
1912        let per_vector = dim
1913            .saturating_mul(quantization.bytes_per_element())
1914            .saturating_add(connectivity.saturating_mul(16))
1915            .saturating_add(element.len())
1916            .saturating_add(80);
1917        let estimated_increase = if is_new {
1918            memory::ENTRY_OVERHEAD + key.len() + VectorSet::BASE_OVERHEAD + per_vector
1919        } else {
1920            per_vector
1921        };
1922        if !self.enforce_memory_limit(estimated_increase) {
1923            return Err(VectorWriteError::OutOfMemory);
1924        }
1925
1926        if is_new {
1927            let vs = VectorSet::new(dim, metric, quantization, connectivity, expansion_add)
1928                .map_err(|e| VectorWriteError::IndexError(e.to_string()))?;
1929            let value = Value::Vector(vs);
1930            self.memory.add(key, &value);
1931            self.entries.insert(key.to_owned(), Entry::new(value, None));
1932        }
1933
1934        let entry = match self.entries.get_mut(key) {
1935            Some(e) => e,
1936            None => return Err(VectorWriteError::IndexError("entry missing".into())),
1937        };
1938        let old_entry_size = memory::entry_size(key, &entry.value);
1939
1940        let added = match entry.value {
1941            Value::Vector(ref mut vs) => vs
1942                .add(element.clone(), &vector)
1943                .map_err(|e| VectorWriteError::IndexError(e.to_string()))?,
1944            _ => return Err(VectorWriteError::WrongType),
1945        };
1946        entry.touch();
1947
1948        let new_entry_size = memory::entry_size(key, &entry.value);
1949        self.memory.adjust(old_entry_size, new_entry_size);
1950
1951        Ok(VAddResult {
1952            element,
1953            vector,
1954            added,
1955        })
1956    }
1957
1958    /// Adds multiple vectors to a vector set in a single operation.
1959    ///
1960    /// All vectors are validated upfront (NaN/inf check) before any are inserted.
1961    /// Memory is estimated for the entire batch with one `enforce_memory_limit` call.
1962    /// On usearch error mid-batch, returns the error but already-applied vectors
1963    /// are included in the result for AOF persistence.
1964    #[cfg(feature = "vector")]
1965    #[allow(clippy::too_many_arguments)]
1966    pub fn vadd_batch(
1967        &mut self,
1968        key: &str,
1969        entries: &[(String, Vec<f32>)],
1970        metric: crate::types::vector::DistanceMetric,
1971        quantization: crate::types::vector::QuantizationType,
1972        connectivity: usize,
1973        expansion_add: usize,
1974    ) -> Result<VAddBatchResult, VectorWriteError> {
1975        use crate::types::vector::VectorSet;
1976
1977        if entries.is_empty() {
1978            return Ok(VAddBatchResult {
1979                added_count: 0,
1980                applied: Vec::new(),
1981            });
1982        }
1983
1984        self.remove_if_expired(key);
1985
1986        // type check
1987        let is_new = match self.entries.get(key) {
1988            None => true,
1989            Some(e) if matches!(e.value, Value::Vector(_)) => false,
1990            Some(_) => return Err(VectorWriteError::WrongType),
1991        };
1992
1993        // validate all vectors upfront — reject entire batch on NaN/inf
1994        let dim = entries[0].1.len();
1995        for (elem, vec) in entries {
1996            if vec.len() != dim {
1997                return Err(VectorWriteError::IndexError(format!(
1998                    "dimension mismatch: expected {dim}, element '{elem}' has {}",
1999                    vec.len()
2000                )));
2001            }
2002            for &v in vec {
2003                if v.is_nan() || v.is_infinite() {
2004                    return Err(VectorWriteError::IndexError(format!(
2005                        "element '{elem}' contains NaN or infinity"
2006                    )));
2007                }
2008            }
2009        }
2010
2011        // estimate total memory for all vectors
2012        let per_vector = dim
2013            .saturating_mul(quantization.bytes_per_element())
2014            .saturating_add(connectivity.saturating_mul(16))
2015            .saturating_add(80);
2016        let total_elem_names: usize = entries.iter().map(|(e, _)| e.len()).sum();
2017        let vectors_cost = entries
2018            .len()
2019            .saturating_mul(per_vector)
2020            .saturating_add(total_elem_names);
2021        let estimated_increase = if is_new {
2022            memory::ENTRY_OVERHEAD + key.len() + VectorSet::BASE_OVERHEAD + vectors_cost
2023        } else {
2024            vectors_cost
2025        };
2026        if !self.enforce_memory_limit(estimated_increase) {
2027            return Err(VectorWriteError::OutOfMemory);
2028        }
2029
2030        // create vector set if new
2031        if is_new {
2032            let vs = VectorSet::new(dim, metric, quantization, connectivity, expansion_add)
2033                .map_err(|e| VectorWriteError::IndexError(e.to_string()))?;
2034            let value = Value::Vector(vs);
2035            self.memory.add(key, &value);
2036            self.entries.insert(key.to_owned(), Entry::new(value, None));
2037        }
2038
2039        let entry = match self.entries.get_mut(key) {
2040            Some(e) => e,
2041            None => return Err(VectorWriteError::IndexError("entry missing".into())),
2042        };
2043        let old_entry_size = memory::entry_size(key, &entry.value);
2044
2045        let mut added_count = 0;
2046        let mut applied = Vec::with_capacity(entries.len());
2047
2048        match entry.value {
2049            Value::Vector(ref mut vs) => {
2050                for (element, vector) in entries {
2051                    match vs.add(element.clone(), vector) {
2052                        Ok(added) => {
2053                            if added {
2054                                added_count += 1;
2055                            }
2056                            applied.push((element.clone(), vector.clone()));
2057                        }
2058                        Err(e) => {
2059                            // partial insert: return applied vectors so they can
2060                            // be persisted to AOF despite the error
2061                            entry.touch();
2062                            let new_entry_size = memory::entry_size(key, &entry.value);
2063                            self.memory.adjust(old_entry_size, new_entry_size);
2064                            return Err(VectorWriteError::PartialBatch {
2065                                message: format!(
2066                                    "error at element '{}': {e} ({} vectors applied before failure)",
2067                                    element,
2068                                    applied.len()
2069                                ),
2070                                applied,
2071                            });
2072                        }
2073                    }
2074                }
2075            }
2076            _ => return Err(VectorWriteError::WrongType),
2077        }
2078
2079        entry.touch();
2080        let new_entry_size = memory::entry_size(key, &entry.value);
2081        self.memory.adjust(old_entry_size, new_entry_size);
2082
2083        Ok(VAddBatchResult {
2084            added_count,
2085            applied,
2086        })
2087    }
2088
2089    /// Searches for the k nearest neighbors in a vector set.
2090    #[cfg(feature = "vector")]
2091    pub fn vsim(
2092        &mut self,
2093        key: &str,
2094        query: &[f32],
2095        count: usize,
2096        ef_search: usize,
2097    ) -> Result<Vec<crate::types::vector::SearchResult>, WrongType> {
2098        if self.remove_if_expired(key) {
2099            return Ok(Vec::new());
2100        }
2101
2102        let entry = match self.entries.get_mut(key) {
2103            Some(e) => e,
2104            None => return Ok(Vec::new()),
2105        };
2106
2107        entry.touch();
2108
2109        match entry.value {
2110            Value::Vector(ref vs) => vs.search(query, count, ef_search).map_err(|_| WrongType),
2111            _ => Err(WrongType),
2112        }
2113    }
2114
2115    /// Removes an element from a vector set. Returns `true` if the element
2116    /// existed. Deletes the key if the set becomes empty.
2117    #[cfg(feature = "vector")]
2118    pub fn vrem(&mut self, key: &str, element: &str) -> Result<bool, WrongType> {
2119        if self.remove_if_expired(key) {
2120            return Ok(false);
2121        }
2122
2123        let entry = match self.entries.get_mut(key) {
2124            Some(e) => e,
2125            None => return Ok(false),
2126        };
2127
2128        if !matches!(entry.value, Value::Vector(_)) {
2129            return Err(WrongType);
2130        }
2131
2132        let old_size = memory::entry_size(key, &entry.value);
2133
2134        let removed = match entry.value {
2135            Value::Vector(ref mut vs) => vs.remove(element),
2136            _ => return Err(WrongType),
2137        };
2138
2139        if removed {
2140            entry.touch();
2141            let is_empty = matches!(entry.value, Value::Vector(ref vs) if vs.is_empty());
2142            let new_size = memory::entry_size(key, &entry.value);
2143            self.memory.adjust(old_size, new_size);
2144
2145            if is_empty {
2146                self.memory.remove_with_size(new_size);
2147                self.entries.remove(key);
2148            }
2149        }
2150
2151        Ok(removed)
2152    }
2153
2154    /// Retrieves the stored vector for an element.
2155    #[cfg(feature = "vector")]
2156    pub fn vget(&mut self, key: &str, element: &str) -> Result<Option<Vec<f32>>, WrongType> {
2157        if self.remove_if_expired(key) {
2158            return Ok(None);
2159        }
2160
2161        let entry = match self.entries.get_mut(key) {
2162            Some(e) => e,
2163            None => return Ok(None),
2164        };
2165
2166        entry.touch();
2167
2168        match entry.value {
2169            Value::Vector(ref vs) => Ok(vs.get(element)),
2170            _ => Err(WrongType),
2171        }
2172    }
2173
2174    /// Returns the number of elements in a vector set.
2175    #[cfg(feature = "vector")]
2176    pub fn vcard(&mut self, key: &str) -> Result<usize, WrongType> {
2177        if self.remove_if_expired(key) {
2178            return Ok(0);
2179        }
2180
2181        match self.entries.get(key) {
2182            None => Ok(0),
2183            Some(e) => match e.value {
2184                Value::Vector(ref vs) => Ok(vs.len()),
2185                _ => Err(WrongType),
2186            },
2187        }
2188    }
2189
2190    /// Returns the dimensionality of a vector set, or 0 if the key doesn't exist.
2191    #[cfg(feature = "vector")]
2192    pub fn vdim(&mut self, key: &str) -> Result<usize, WrongType> {
2193        if self.remove_if_expired(key) {
2194            return Ok(0);
2195        }
2196
2197        match self.entries.get(key) {
2198            None => Ok(0),
2199            Some(e) => match e.value {
2200                Value::Vector(ref vs) => Ok(vs.dim()),
2201                _ => Err(WrongType),
2202            },
2203        }
2204    }
2205
2206    /// Returns metadata about a vector set.
2207    #[cfg(feature = "vector")]
2208    pub fn vinfo(
2209        &mut self,
2210        key: &str,
2211    ) -> Result<Option<crate::types::vector::VectorSetInfo>, WrongType> {
2212        if self.remove_if_expired(key) {
2213            return Ok(None);
2214        }
2215
2216        match self.entries.get(key) {
2217            None => Ok(None),
2218            Some(e) => match e.value {
2219                Value::Vector(ref vs) => Ok(Some(vs.info())),
2220                _ => Err(WrongType),
2221            },
2222        }
2223    }
2224
2225    // -- protobuf operations --
2226
2227    /// Stores a protobuf value. No schema validation here — that's the
2228    /// server's responsibility. Follows the same pattern as `set()`.
2229    #[cfg(feature = "protobuf")]
2230    pub fn proto_set(
2231        &mut self,
2232        key: String,
2233        type_name: String,
2234        data: Bytes,
2235        expire: Option<Duration>,
2236    ) -> SetResult {
2237        let has_expiry = expire.is_some();
2238        let new_value = Value::Proto { type_name, data };
2239
2240        let new_size = memory::entry_size(&key, &new_value);
2241        let old_size = self
2242            .entries
2243            .get(&key)
2244            .map(|e| memory::entry_size(&key, &e.value))
2245            .unwrap_or(0);
2246        let net_increase = new_size.saturating_sub(old_size);
2247
2248        if !self.enforce_memory_limit(net_increase) {
2249            return SetResult::OutOfMemory;
2250        }
2251
2252        if let Some(old_entry) = self.entries.get(&key) {
2253            self.memory.replace(&key, &old_entry.value, &new_value);
2254            let had_expiry = old_entry.expires_at_ms != 0;
2255            match (had_expiry, has_expiry) {
2256                (false, true) => self.expiry_count += 1,
2257                (true, false) => self.expiry_count = self.expiry_count.saturating_sub(1),
2258                _ => {}
2259            }
2260        } else {
2261            self.memory.add(&key, &new_value);
2262            if has_expiry {
2263                self.expiry_count += 1;
2264            }
2265        }
2266
2267        self.entries.insert(key, Entry::new(new_value, expire));
2268        SetResult::Ok
2269    }
2270
2271    /// Retrieves a proto value, returning `(type_name, data, remaining_ttl)`
2272    /// or `None`.
2273    ///
2274    /// The remaining TTL is `Some(duration)` if the key has an expiry set,
2275    /// or `None` for keys that never expire. This allows callers to preserve
2276    /// the TTL across read-modify-write cycles (e.g. SETFIELD/DELFIELD).
2277    ///
2278    /// Returns `Err(WrongType)` if the key holds a different value type.
2279    #[cfg(feature = "protobuf")]
2280    pub fn proto_get(
2281        &mut self,
2282        key: &str,
2283    ) -> Result<Option<(String, Bytes, Option<Duration>)>, WrongType> {
2284        if self.remove_if_expired(key) {
2285            return Ok(None);
2286        }
2287        match self.entries.get_mut(key) {
2288            Some(e) => {
2289                if let Value::Proto { type_name, data } = &e.value {
2290                    let remaining = if e.expires_at_ms == 0 {
2291                        None
2292                    } else {
2293                        let now = time::now_ms();
2294                        Some(Duration::from_millis(e.expires_at_ms.saturating_sub(now)))
2295                    };
2296                    let result = (type_name.clone(), data.clone(), remaining);
2297                    e.touch();
2298                    Ok(Some(result))
2299                } else {
2300                    Err(WrongType)
2301                }
2302            }
2303            None => Ok(None),
2304        }
2305    }
2306
2307    /// Returns the protobuf message type name for a key, or `None` if
2308    /// the key doesn't exist.
2309    ///
2310    /// Returns `Err(WrongType)` if the key holds a non-proto value.
2311    #[cfg(feature = "protobuf")]
2312    pub fn proto_type(&mut self, key: &str) -> Result<Option<String>, WrongType> {
2313        if self.remove_if_expired(key) {
2314            return Ok(None);
2315        }
2316        match self.entries.get(key) {
2317            Some(e) => match &e.value {
2318                Value::Proto { type_name, .. } => Ok(Some(type_name.clone())),
2319                _ => Err(WrongType),
2320            },
2321            None => Ok(None),
2322        }
2323    }
2324
2325    /// Sends a value to the background drop thread if one is configured
2326    /// and the value is large enough to justify the overhead.
2327    fn defer_drop(&self, value: Value) {
2328        if let Some(ref handle) = self.drop_handle {
2329            handle.defer_value(value);
2330        }
2331    }
2332}
2333
2334impl Default for Keyspace {
2335    fn default() -> Self {
2336        Self::new()
2337    }
2338}
2339
2340/// Formats a float value matching Redis behavior.
2341///
2342/// Uses up to 17 significant digits and strips unnecessary trailing zeros,
2343/// but always keeps at least one decimal place for non-integer results.
2344pub(crate) fn format_float(val: f64) -> String {
2345    if val == 0.0 {
2346        return "0".into();
2347    }
2348    // Use enough precision to round-trip
2349    let s = format!("{:.17e}", val);
2350    // Parse back to get the clean representation
2351    let reparsed: f64 = s.parse().unwrap_or(val);
2352    // If it's a whole number that fits in i64, format without decimals
2353    if reparsed == reparsed.trunc() && reparsed >= i64::MIN as f64 && reparsed <= i64::MAX as f64 {
2354        format!("{}", reparsed as i64)
2355    } else {
2356        // Use ryu-like formatting via Display which strips trailing zeros
2357        let formatted = format!("{}", reparsed);
2358        formatted
2359    }
2360}
2361
2362/// Glob-style pattern matching for SCAN's MATCH option.
2363///
2364/// Supports:
2365/// - `*` matches any sequence of characters (including empty)
2366/// - `?` matches exactly one character
2367/// - `[abc]` matches one character from the set
2368/// - `[^abc]` or `[!abc]` matches one character NOT in the set
2369///
2370/// Uses an iterative two-pointer algorithm with backtracking for O(n*m)
2371/// worst-case performance, where n is pattern length and m is text length.
2372///
2373/// Prefer [`GlobPattern::new`] + [`GlobPattern::matches`] when matching
2374/// the same pattern against many strings (KEYS, SCAN) to avoid
2375/// re-collecting the pattern chars on every call.
2376pub(crate) fn glob_match(pattern: &str, text: &str) -> bool {
2377    let pat: Vec<char> = pattern.chars().collect();
2378    glob_match_compiled(&pat, text)
2379}
2380
2381/// Pre-compiled glob pattern that avoids re-allocating pattern chars
2382/// on every match call. Use for KEYS/SCAN where the same pattern is
2383/// tested against every key in the keyspace.
2384pub(crate) struct GlobPattern {
2385    chars: Vec<char>,
2386}
2387
2388impl GlobPattern {
2389    pub(crate) fn new(pattern: &str) -> Self {
2390        Self {
2391            chars: pattern.chars().collect(),
2392        }
2393    }
2394
2395    pub(crate) fn matches(&self, text: &str) -> bool {
2396        glob_match_compiled(&self.chars, text)
2397    }
2398}
2399
2400/// Core glob matching against a pre-compiled pattern char slice.
2401fn glob_match_compiled(pat: &[char], text: &str) -> bool {
2402    let txt: Vec<char> = text.chars().collect();
2403
2404    let mut pi = 0; // pattern index
2405    let mut ti = 0; // text index
2406
2407    // backtracking state for the most recent '*'
2408    let mut star_pi: Option<usize> = None;
2409    let mut star_ti: usize = 0;
2410
2411    while ti < txt.len() || pi < pat.len() {
2412        if pi < pat.len() {
2413            match pat[pi] {
2414                '*' => {
2415                    // record star position and try matching zero chars first
2416                    star_pi = Some(pi);
2417                    star_ti = ti;
2418                    pi += 1;
2419                    continue;
2420                }
2421                '?' if ti < txt.len() => {
2422                    pi += 1;
2423                    ti += 1;
2424                    continue;
2425                }
2426                '[' if ti < txt.len() => {
2427                    // parse character class
2428                    let tc = txt[ti];
2429                    let mut j = pi + 1;
2430                    let mut negated = false;
2431                    let mut matched = false;
2432
2433                    if j < pat.len() && (pat[j] == '^' || pat[j] == '!') {
2434                        negated = true;
2435                        j += 1;
2436                    }
2437
2438                    while j < pat.len() && pat[j] != ']' {
2439                        if pat[j] == tc {
2440                            matched = true;
2441                        }
2442                        j += 1;
2443                    }
2444
2445                    if negated {
2446                        matched = !matched;
2447                    }
2448
2449                    if matched && j < pat.len() {
2450                        pi = j + 1; // skip past ']'
2451                        ti += 1;
2452                        continue;
2453                    }
2454                    // fall through to backtrack
2455                }
2456                c if ti < txt.len() && c == txt[ti] => {
2457                    pi += 1;
2458                    ti += 1;
2459                    continue;
2460                }
2461                _ => {}
2462            }
2463        }
2464
2465        // mismatch or end of pattern — try backtracking to last '*'
2466        if let Some(sp) = star_pi {
2467            pi = sp + 1;
2468            star_ti += 1;
2469            ti = star_ti;
2470            if ti > txt.len() {
2471                return false;
2472            }
2473        } else {
2474            return false;
2475        }
2476    }
2477
2478    // skip trailing '*' in pattern
2479    while pi < pat.len() && pat[pi] == '*' {
2480        pi += 1;
2481    }
2482
2483    pi == pat.len()
2484}
2485
2486#[cfg(test)]
2487mod tests {
2488    use super::*;
2489    use std::thread;
2490
2491    #[test]
2492    fn set_and_get() {
2493        let mut ks = Keyspace::new();
2494        ks.set("hello".into(), Bytes::from("world"), None);
2495        assert_eq!(
2496            ks.get("hello").unwrap(),
2497            Some(Value::String(Bytes::from("world")))
2498        );
2499    }
2500
2501    #[test]
2502    fn get_missing_key() {
2503        let mut ks = Keyspace::new();
2504        assert_eq!(ks.get("nope").unwrap(), None);
2505    }
2506
2507    #[test]
2508    fn overwrite_replaces_value() {
2509        let mut ks = Keyspace::new();
2510        ks.set("key".into(), Bytes::from("first"), None);
2511        ks.set("key".into(), Bytes::from("second"), None);
2512        assert_eq!(
2513            ks.get("key").unwrap(),
2514            Some(Value::String(Bytes::from("second")))
2515        );
2516    }
2517
2518    #[test]
2519    fn overwrite_clears_old_ttl() {
2520        let mut ks = Keyspace::new();
2521        ks.set(
2522            "key".into(),
2523            Bytes::from("v1"),
2524            Some(Duration::from_secs(100)),
2525        );
2526        // overwrite without TTL — should clear the old one
2527        ks.set("key".into(), Bytes::from("v2"), None);
2528        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
2529    }
2530
2531    #[test]
2532    fn del_existing() {
2533        let mut ks = Keyspace::new();
2534        ks.set("key".into(), Bytes::from("val"), None);
2535        assert!(ks.del("key"));
2536        assert_eq!(ks.get("key").unwrap(), None);
2537    }
2538
2539    #[test]
2540    fn del_missing() {
2541        let mut ks = Keyspace::new();
2542        assert!(!ks.del("nope"));
2543    }
2544
2545    #[test]
2546    fn exists_present_and_absent() {
2547        let mut ks = Keyspace::new();
2548        ks.set("yes".into(), Bytes::from("here"), None);
2549        assert!(ks.exists("yes"));
2550        assert!(!ks.exists("no"));
2551    }
2552
2553    #[test]
2554    fn expired_key_returns_none() {
2555        let mut ks = Keyspace::new();
2556        ks.set(
2557            "temp".into(),
2558            Bytes::from("gone"),
2559            Some(Duration::from_millis(10)),
2560        );
2561        // wait for expiration
2562        thread::sleep(Duration::from_millis(30));
2563        assert_eq!(ks.get("temp").unwrap(), None);
2564        // should also be gone from exists
2565        assert!(!ks.exists("temp"));
2566    }
2567
2568    #[test]
2569    fn ttl_no_expiry() {
2570        let mut ks = Keyspace::new();
2571        ks.set("key".into(), Bytes::from("val"), None);
2572        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
2573    }
2574
2575    #[test]
2576    fn ttl_not_found() {
2577        let mut ks = Keyspace::new();
2578        assert_eq!(ks.ttl("missing"), TtlResult::NotFound);
2579    }
2580
2581    #[test]
2582    fn ttl_with_expiry() {
2583        let mut ks = Keyspace::new();
2584        ks.set(
2585            "key".into(),
2586            Bytes::from("val"),
2587            Some(Duration::from_secs(100)),
2588        );
2589        match ks.ttl("key") {
2590            TtlResult::Seconds(s) => assert!((98..=100).contains(&s)),
2591            other => panic!("expected Seconds, got {other:?}"),
2592        }
2593    }
2594
2595    #[test]
2596    fn ttl_expired_key() {
2597        let mut ks = Keyspace::new();
2598        ks.set(
2599            "temp".into(),
2600            Bytes::from("val"),
2601            Some(Duration::from_millis(10)),
2602        );
2603        thread::sleep(Duration::from_millis(30));
2604        assert_eq!(ks.ttl("temp"), TtlResult::NotFound);
2605    }
2606
2607    #[test]
2608    fn expire_existing_key() {
2609        let mut ks = Keyspace::new();
2610        ks.set("key".into(), Bytes::from("val"), None);
2611        assert!(ks.expire("key", 60));
2612        match ks.ttl("key") {
2613            TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
2614            other => panic!("expected Seconds, got {other:?}"),
2615        }
2616    }
2617
2618    #[test]
2619    fn expire_missing_key() {
2620        let mut ks = Keyspace::new();
2621        assert!(!ks.expire("nope", 60));
2622    }
2623
2624    #[test]
2625    fn del_expired_key_returns_false() {
2626        let mut ks = Keyspace::new();
2627        ks.set(
2628            "temp".into(),
2629            Bytes::from("val"),
2630            Some(Duration::from_millis(10)),
2631        );
2632        thread::sleep(Duration::from_millis(30));
2633        // key is expired, del should return false (not found)
2634        assert!(!ks.del("temp"));
2635    }
2636
2637    // -- memory tracking tests --
2638
2639    #[test]
2640    fn memory_increases_on_set() {
2641        let mut ks = Keyspace::new();
2642        assert_eq!(ks.stats().used_bytes, 0);
2643        ks.set("key".into(), Bytes::from("value"), None);
2644        assert!(ks.stats().used_bytes > 0);
2645        assert_eq!(ks.stats().key_count, 1);
2646    }
2647
2648    #[test]
2649    fn memory_decreases_on_del() {
2650        let mut ks = Keyspace::new();
2651        ks.set("key".into(), Bytes::from("value"), None);
2652        let after_set = ks.stats().used_bytes;
2653        ks.del("key");
2654        assert_eq!(ks.stats().used_bytes, 0);
2655        assert!(after_set > 0);
2656    }
2657
2658    #[test]
2659    fn memory_adjusts_on_overwrite() {
2660        let mut ks = Keyspace::new();
2661        ks.set("key".into(), Bytes::from("short"), None);
2662        let small = ks.stats().used_bytes;
2663
2664        ks.set("key".into(), Bytes::from("a much longer value"), None);
2665        let large = ks.stats().used_bytes;
2666
2667        assert!(large > small);
2668        assert_eq!(ks.stats().key_count, 1);
2669    }
2670
2671    #[test]
2672    fn memory_decreases_on_expired_removal() {
2673        let mut ks = Keyspace::new();
2674        ks.set(
2675            "temp".into(),
2676            Bytes::from("data"),
2677            Some(Duration::from_millis(10)),
2678        );
2679        assert!(ks.stats().used_bytes > 0);
2680        thread::sleep(Duration::from_millis(30));
2681        // trigger lazy expiration
2682        let _ = ks.get("temp");
2683        assert_eq!(ks.stats().used_bytes, 0);
2684        assert_eq!(ks.stats().key_count, 0);
2685    }
2686
2687    #[test]
2688    fn stats_tracks_expiry_count() {
2689        let mut ks = Keyspace::new();
2690        ks.set("a".into(), Bytes::from("1"), None);
2691        ks.set("b".into(), Bytes::from("2"), Some(Duration::from_secs(100)));
2692        ks.set("c".into(), Bytes::from("3"), Some(Duration::from_secs(200)));
2693
2694        let stats = ks.stats();
2695        assert_eq!(stats.key_count, 3);
2696        assert_eq!(stats.keys_with_expiry, 2);
2697    }
2698
2699    // -- eviction tests --
2700
2701    #[test]
2702    fn noeviction_returns_oom_when_full() {
2703        // one entry with key "a" + value "val" = 1 + 3 + 96 = 100 bytes
2704        // set limit so one entry fits but two don't
2705        let config = ShardConfig {
2706            max_memory: Some(150),
2707            eviction_policy: EvictionPolicy::NoEviction,
2708            ..ShardConfig::default()
2709        };
2710        let mut ks = Keyspace::with_config(config);
2711
2712        // first key should fit
2713        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
2714
2715        // second key should push us over the limit
2716        let result = ks.set("b".into(), Bytes::from("val"), None);
2717        assert_eq!(result, SetResult::OutOfMemory);
2718
2719        // original key should still be there
2720        assert!(ks.exists("a"));
2721    }
2722
2723    #[test]
2724    fn lru_eviction_makes_room() {
2725        let config = ShardConfig {
2726            max_memory: Some(150),
2727            eviction_policy: EvictionPolicy::AllKeysLru,
2728            ..ShardConfig::default()
2729        };
2730        let mut ks = Keyspace::with_config(config);
2731
2732        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
2733
2734        // this should trigger eviction of "a" to make room
2735        assert_eq!(ks.set("b".into(), Bytes::from("val"), None), SetResult::Ok);
2736
2737        // "a" should have been evicted
2738        assert!(!ks.exists("a"));
2739        assert!(ks.exists("b"));
2740    }
2741
2742    #[test]
2743    fn safety_margin_rejects_near_raw_limit() {
2744        // one entry = 1 (key) + 3 (val) + 128 (overhead) = 132 bytes.
2745        // configure max_memory = 147. effective limit = 147 * 90 / 100 = 132.
2746        // the entry fills exactly the effective limit, so a second entry should
2747        // be rejected even though the raw limit has 15 bytes of headroom.
2748        let config = ShardConfig {
2749            max_memory: Some(147),
2750            eviction_policy: EvictionPolicy::NoEviction,
2751            ..ShardConfig::default()
2752        };
2753        let mut ks = Keyspace::with_config(config);
2754
2755        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
2756
2757        let result = ks.set("b".into(), Bytes::from("val"), None);
2758        assert_eq!(result, SetResult::OutOfMemory);
2759    }
2760
2761    #[test]
2762    fn overwrite_same_size_succeeds_at_limit() {
2763        let config = ShardConfig {
2764            max_memory: Some(150),
2765            eviction_policy: EvictionPolicy::NoEviction,
2766            ..ShardConfig::default()
2767        };
2768        let mut ks = Keyspace::with_config(config);
2769
2770        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
2771
2772        // overwriting with same-size value should succeed — no net increase
2773        assert_eq!(ks.set("a".into(), Bytes::from("new"), None), SetResult::Ok);
2774        assert_eq!(
2775            ks.get("a").unwrap(),
2776            Some(Value::String(Bytes::from("new")))
2777        );
2778    }
2779
2780    #[test]
2781    fn overwrite_larger_value_respects_limit() {
2782        let config = ShardConfig {
2783            max_memory: Some(150),
2784            eviction_policy: EvictionPolicy::NoEviction,
2785            ..ShardConfig::default()
2786        };
2787        let mut ks = Keyspace::with_config(config);
2788
2789        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
2790
2791        // overwriting with a much larger value should fail if it exceeds limit
2792        let big_value = "x".repeat(200);
2793        let result = ks.set("a".into(), Bytes::from(big_value), None);
2794        assert_eq!(result, SetResult::OutOfMemory);
2795
2796        // original value should still be intact
2797        assert_eq!(
2798            ks.get("a").unwrap(),
2799            Some(Value::String(Bytes::from("val")))
2800        );
2801    }
2802
2803    // -- iter_entries tests --
2804
2805    #[test]
2806    fn iter_entries_returns_live_entries() {
2807        let mut ks = Keyspace::new();
2808        ks.set("a".into(), Bytes::from("1"), None);
2809        ks.set("b".into(), Bytes::from("2"), Some(Duration::from_secs(100)));
2810
2811        let entries: Vec<_> = ks.iter_entries().collect();
2812        assert_eq!(entries.len(), 2);
2813    }
2814
2815    #[test]
2816    fn iter_entries_skips_expired() {
2817        let mut ks = Keyspace::new();
2818        ks.set(
2819            "dead".into(),
2820            Bytes::from("gone"),
2821            Some(Duration::from_millis(1)),
2822        );
2823        ks.set("alive".into(), Bytes::from("here"), None);
2824        thread::sleep(Duration::from_millis(10));
2825
2826        let entries: Vec<_> = ks.iter_entries().collect();
2827        assert_eq!(entries.len(), 1);
2828        assert_eq!(entries[0].0, "alive");
2829    }
2830
2831    #[test]
2832    fn iter_entries_ttl_for_no_expiry() {
2833        let mut ks = Keyspace::new();
2834        ks.set("permanent".into(), Bytes::from("val"), None);
2835
2836        let entries: Vec<_> = ks.iter_entries().collect();
2837        assert_eq!(entries[0].2, -1);
2838    }
2839
2840    // -- restore tests --
2841
2842    #[test]
2843    fn restore_adds_entry() {
2844        let mut ks = Keyspace::new();
2845        ks.restore("restored".into(), Value::String(Bytes::from("data")), None);
2846        assert_eq!(
2847            ks.get("restored").unwrap(),
2848            Some(Value::String(Bytes::from("data")))
2849        );
2850        assert_eq!(ks.stats().key_count, 1);
2851    }
2852
2853    #[test]
2854    fn restore_with_zero_ttl_expires_immediately() {
2855        let mut ks = Keyspace::new();
2856        // TTL of 0 should create entry that expires immediately
2857        ks.restore(
2858            "short-lived".into(),
2859            Value::String(Bytes::from("data")),
2860            Some(Duration::from_millis(1)),
2861        );
2862        // Entry exists but will be expired on access
2863        std::thread::sleep(Duration::from_millis(5));
2864        assert!(ks.get("short-lived").is_err() || ks.get("short-lived").unwrap().is_none());
2865    }
2866
2867    #[test]
2868    fn restore_overwrites_existing() {
2869        let mut ks = Keyspace::new();
2870        ks.set("key".into(), Bytes::from("old"), None);
2871        ks.restore("key".into(), Value::String(Bytes::from("new")), None);
2872        assert_eq!(
2873            ks.get("key").unwrap(),
2874            Some(Value::String(Bytes::from("new")))
2875        );
2876        assert_eq!(ks.stats().key_count, 1);
2877    }
2878
2879    #[test]
2880    fn restore_bypasses_memory_limit() {
2881        let config = ShardConfig {
2882            max_memory: Some(50), // very small
2883            eviction_policy: EvictionPolicy::NoEviction,
2884            ..ShardConfig::default()
2885        };
2886        let mut ks = Keyspace::with_config(config);
2887
2888        // normal set would fail due to memory limit
2889        ks.restore(
2890            "big".into(),
2891            Value::String(Bytes::from("x".repeat(200))),
2892            None,
2893        );
2894        assert_eq!(ks.stats().key_count, 1);
2895    }
2896
2897    #[test]
2898    fn no_limit_never_rejects() {
2899        // default config has no memory limit
2900        let mut ks = Keyspace::new();
2901        for i in 0..100 {
2902            assert_eq!(
2903                ks.set(format!("key:{i}"), Bytes::from("value"), None),
2904                SetResult::Ok
2905            );
2906        }
2907        assert_eq!(ks.len(), 100);
2908    }
2909
2910    // -- list tests --
2911
2912    #[test]
2913    fn lpush_creates_list() {
2914        let mut ks = Keyspace::new();
2915        let len = ks
2916            .lpush("list", &[Bytes::from("a"), Bytes::from("b")])
2917            .unwrap();
2918        assert_eq!(len, 2);
2919        // lpush pushes each to front, so order is b, a
2920        let items = ks.lrange("list", 0, -1).unwrap();
2921        assert_eq!(items, vec![Bytes::from("b"), Bytes::from("a")]);
2922    }
2923
2924    #[test]
2925    fn rpush_creates_list() {
2926        let mut ks = Keyspace::new();
2927        let len = ks
2928            .rpush("list", &[Bytes::from("a"), Bytes::from("b")])
2929            .unwrap();
2930        assert_eq!(len, 2);
2931        let items = ks.lrange("list", 0, -1).unwrap();
2932        assert_eq!(items, vec![Bytes::from("a"), Bytes::from("b")]);
2933    }
2934
2935    #[test]
2936    fn push_to_existing_list() {
2937        let mut ks = Keyspace::new();
2938        ks.rpush("list", &[Bytes::from("a")]).unwrap();
2939        let len = ks.rpush("list", &[Bytes::from("b")]).unwrap();
2940        assert_eq!(len, 2);
2941    }
2942
2943    #[test]
2944    fn lpop_returns_front() {
2945        let mut ks = Keyspace::new();
2946        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
2947            .unwrap();
2948        assert_eq!(ks.lpop("list").unwrap(), Some(Bytes::from("a")));
2949        assert_eq!(ks.lpop("list").unwrap(), Some(Bytes::from("b")));
2950        assert_eq!(ks.lpop("list").unwrap(), None); // empty, key deleted
2951    }
2952
2953    #[test]
2954    fn rpop_returns_back() {
2955        let mut ks = Keyspace::new();
2956        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
2957            .unwrap();
2958        assert_eq!(ks.rpop("list").unwrap(), Some(Bytes::from("b")));
2959    }
2960
2961    #[test]
2962    fn pop_from_missing_key() {
2963        let mut ks = Keyspace::new();
2964        assert_eq!(ks.lpop("nope").unwrap(), None);
2965        assert_eq!(ks.rpop("nope").unwrap(), None);
2966    }
2967
2968    #[test]
2969    fn empty_list_auto_deletes_key() {
2970        let mut ks = Keyspace::new();
2971        ks.rpush("list", &[Bytes::from("only")]).unwrap();
2972        ks.lpop("list").unwrap();
2973        assert!(!ks.exists("list"));
2974        assert_eq!(ks.stats().key_count, 0);
2975        assert_eq!(ks.stats().used_bytes, 0);
2976    }
2977
2978    #[test]
2979    fn lrange_negative_indices() {
2980        let mut ks = Keyspace::new();
2981        ks.rpush(
2982            "list",
2983            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("c")],
2984        )
2985        .unwrap();
2986        // -2 to -1 => last two elements
2987        let items = ks.lrange("list", -2, -1).unwrap();
2988        assert_eq!(items, vec![Bytes::from("b"), Bytes::from("c")]);
2989    }
2990
2991    #[test]
2992    fn lrange_out_of_bounds_clamps() {
2993        let mut ks = Keyspace::new();
2994        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
2995            .unwrap();
2996        let items = ks.lrange("list", -100, 100).unwrap();
2997        assert_eq!(items, vec![Bytes::from("a"), Bytes::from("b")]);
2998    }
2999
3000    #[test]
3001    fn lrange_missing_key_returns_empty() {
3002        let mut ks = Keyspace::new();
3003        assert_eq!(ks.lrange("nope", 0, -1).unwrap(), Vec::<Bytes>::new());
3004    }
3005
3006    #[test]
3007    fn llen_returns_length() {
3008        let mut ks = Keyspace::new();
3009        assert_eq!(ks.llen("nope").unwrap(), 0);
3010        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
3011            .unwrap();
3012        assert_eq!(ks.llen("list").unwrap(), 2);
3013    }
3014
3015    #[test]
3016    fn list_memory_tracked_on_push_pop() {
3017        let mut ks = Keyspace::new();
3018        ks.rpush("list", &[Bytes::from("hello")]).unwrap();
3019        let after_push = ks.stats().used_bytes;
3020        assert!(after_push > 0);
3021
3022        ks.rpush("list", &[Bytes::from("world")]).unwrap();
3023        let after_second = ks.stats().used_bytes;
3024        assert!(after_second > after_push);
3025
3026        ks.lpop("list").unwrap();
3027        let after_pop = ks.stats().used_bytes;
3028        assert!(after_pop < after_second);
3029    }
3030
3031    #[test]
3032    fn lpush_on_string_key_returns_wrongtype() {
3033        let mut ks = Keyspace::new();
3034        ks.set("s".into(), Bytes::from("val"), None);
3035        assert!(ks.lpush("s", &[Bytes::from("nope")]).is_err());
3036    }
3037
3038    #[test]
3039    fn lrange_on_string_key_returns_wrongtype() {
3040        let mut ks = Keyspace::new();
3041        ks.set("s".into(), Bytes::from("val"), None);
3042        assert!(ks.lrange("s", 0, -1).is_err());
3043    }
3044
3045    #[test]
3046    fn llen_on_string_key_returns_wrongtype() {
3047        let mut ks = Keyspace::new();
3048        ks.set("s".into(), Bytes::from("val"), None);
3049        assert!(ks.llen("s").is_err());
3050    }
3051
3052    // -- wrongtype tests --
3053
3054    #[test]
3055    fn get_on_list_key_returns_wrongtype() {
3056        let mut ks = Keyspace::new();
3057        let mut list = std::collections::VecDeque::new();
3058        list.push_back(Bytes::from("item"));
3059        ks.restore("mylist".into(), Value::List(list), None);
3060
3061        assert!(ks.get("mylist").is_err());
3062    }
3063
3064    #[test]
3065    fn value_type_returns_correct_types() {
3066        let mut ks = Keyspace::new();
3067        assert_eq!(ks.value_type("missing"), "none");
3068
3069        ks.set("s".into(), Bytes::from("val"), None);
3070        assert_eq!(ks.value_type("s"), "string");
3071
3072        let mut list = std::collections::VecDeque::new();
3073        list.push_back(Bytes::from("item"));
3074        ks.restore("l".into(), Value::List(list), None);
3075        assert_eq!(ks.value_type("l"), "list");
3076
3077        ks.zadd("z", &[(1.0, "a".into())], &ZAddFlags::default())
3078            .unwrap();
3079        assert_eq!(ks.value_type("z"), "zset");
3080    }
3081
3082    // -- sorted set tests --
3083
3084    #[test]
3085    fn zadd_creates_sorted_set() {
3086        let mut ks = Keyspace::new();
3087        let result = ks
3088            .zadd(
3089                "board",
3090                &[(100.0, "alice".into()), (200.0, "bob".into())],
3091                &ZAddFlags::default(),
3092            )
3093            .unwrap();
3094        assert_eq!(result.count, 2);
3095        assert_eq!(result.applied.len(), 2);
3096        assert_eq!(ks.value_type("board"), "zset");
3097    }
3098
3099    #[test]
3100    fn zadd_updates_existing_score() {
3101        let mut ks = Keyspace::new();
3102        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
3103            .unwrap();
3104        // update score — default flags don't count updates
3105        let result = ks
3106            .zadd("z", &[(200.0, "alice".into())], &ZAddFlags::default())
3107            .unwrap();
3108        assert_eq!(result.count, 0);
3109        // score was updated, so applied should have the member
3110        assert_eq!(result.applied.len(), 1);
3111        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(200.0));
3112    }
3113
3114    #[test]
3115    fn zadd_ch_flag_counts_changes() {
3116        let mut ks = Keyspace::new();
3117        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
3118            .unwrap();
3119        let flags = ZAddFlags {
3120            ch: true,
3121            ..Default::default()
3122        };
3123        let result = ks
3124            .zadd(
3125                "z",
3126                &[(200.0, "alice".into()), (50.0, "bob".into())],
3127                &flags,
3128            )
3129            .unwrap();
3130        // 1 updated + 1 added = 2
3131        assert_eq!(result.count, 2);
3132        assert_eq!(result.applied.len(), 2);
3133    }
3134
3135    #[test]
3136    fn zadd_nx_skips_existing() {
3137        let mut ks = Keyspace::new();
3138        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
3139            .unwrap();
3140        let flags = ZAddFlags {
3141            nx: true,
3142            ..Default::default()
3143        };
3144        let result = ks.zadd("z", &[(999.0, "alice".into())], &flags).unwrap();
3145        assert_eq!(result.count, 0);
3146        assert!(result.applied.is_empty());
3147        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(100.0));
3148    }
3149
3150    #[test]
3151    fn zadd_xx_skips_new() {
3152        let mut ks = Keyspace::new();
3153        let flags = ZAddFlags {
3154            xx: true,
3155            ..Default::default()
3156        };
3157        let result = ks.zadd("z", &[(100.0, "alice".into())], &flags).unwrap();
3158        assert_eq!(result.count, 0);
3159        assert!(result.applied.is_empty());
3160        // key should be cleaned up since nothing was added
3161        assert_eq!(ks.value_type("z"), "none");
3162    }
3163
3164    #[test]
3165    fn zadd_gt_only_increases() {
3166        let mut ks = Keyspace::new();
3167        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
3168            .unwrap();
3169        let flags = ZAddFlags {
3170            gt: true,
3171            ..Default::default()
3172        };
3173        ks.zadd("z", &[(50.0, "alice".into())], &flags).unwrap();
3174        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(100.0));
3175        ks.zadd("z", &[(200.0, "alice".into())], &flags).unwrap();
3176        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(200.0));
3177    }
3178
3179    #[test]
3180    fn zadd_lt_only_decreases() {
3181        let mut ks = Keyspace::new();
3182        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
3183            .unwrap();
3184        let flags = ZAddFlags {
3185            lt: true,
3186            ..Default::default()
3187        };
3188        ks.zadd("z", &[(200.0, "alice".into())], &flags).unwrap();
3189        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(100.0));
3190        ks.zadd("z", &[(50.0, "alice".into())], &flags).unwrap();
3191        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(50.0));
3192    }
3193
3194    #[test]
3195    fn zrem_removes_members() {
3196        let mut ks = Keyspace::new();
3197        ks.zadd(
3198            "z",
3199            &[(1.0, "a".into()), (2.0, "b".into()), (3.0, "c".into())],
3200            &ZAddFlags::default(),
3201        )
3202        .unwrap();
3203        let removed = ks
3204            .zrem("z", &["a".into(), "c".into(), "nonexistent".into()])
3205            .unwrap();
3206        assert_eq!(removed.len(), 2);
3207        assert!(removed.contains(&"a".to_owned()));
3208        assert!(removed.contains(&"c".to_owned()));
3209        assert_eq!(ks.zscore("z", "a").unwrap(), None);
3210        assert_eq!(ks.zscore("z", "b").unwrap(), Some(2.0));
3211    }
3212
3213    #[test]
3214    fn zrem_auto_deletes_empty() {
3215        let mut ks = Keyspace::new();
3216        ks.zadd("z", &[(1.0, "only".into())], &ZAddFlags::default())
3217            .unwrap();
3218        ks.zrem("z", &["only".into()]).unwrap();
3219        assert!(!ks.exists("z"));
3220        assert_eq!(ks.stats().key_count, 0);
3221    }
3222
3223    #[test]
3224    fn zrem_missing_key() {
3225        let mut ks = Keyspace::new();
3226        assert!(ks.zrem("nope", &["a".into()]).unwrap().is_empty());
3227    }
3228
3229    #[test]
3230    fn zscore_returns_score() {
3231        let mut ks = Keyspace::new();
3232        ks.zadd("z", &[(42.5, "member".into())], &ZAddFlags::default())
3233            .unwrap();
3234        assert_eq!(ks.zscore("z", "member").unwrap(), Some(42.5));
3235        assert_eq!(ks.zscore("z", "missing").unwrap(), None);
3236    }
3237
3238    #[test]
3239    fn zscore_missing_key() {
3240        let mut ks = Keyspace::new();
3241        assert_eq!(ks.zscore("nope", "m").unwrap(), None);
3242    }
3243
3244    #[test]
3245    fn zrank_returns_rank() {
3246        let mut ks = Keyspace::new();
3247        ks.zadd(
3248            "z",
3249            &[
3250                (300.0, "c".into()),
3251                (100.0, "a".into()),
3252                (200.0, "b".into()),
3253            ],
3254            &ZAddFlags::default(),
3255        )
3256        .unwrap();
3257        assert_eq!(ks.zrank("z", "a").unwrap(), Some(0));
3258        assert_eq!(ks.zrank("z", "b").unwrap(), Some(1));
3259        assert_eq!(ks.zrank("z", "c").unwrap(), Some(2));
3260        assert_eq!(ks.zrank("z", "d").unwrap(), None);
3261    }
3262
3263    #[test]
3264    fn zrange_returns_range() {
3265        let mut ks = Keyspace::new();
3266        ks.zadd(
3267            "z",
3268            &[(1.0, "a".into()), (2.0, "b".into()), (3.0, "c".into())],
3269            &ZAddFlags::default(),
3270        )
3271        .unwrap();
3272
3273        let all = ks.zrange("z", 0, -1).unwrap();
3274        assert_eq!(
3275            all,
3276            vec![
3277                ("a".to_owned(), 1.0),
3278                ("b".to_owned(), 2.0),
3279                ("c".to_owned(), 3.0),
3280            ]
3281        );
3282
3283        let middle = ks.zrange("z", 1, 1).unwrap();
3284        assert_eq!(middle, vec![("b".to_owned(), 2.0)]);
3285
3286        let last_two = ks.zrange("z", -2, -1).unwrap();
3287        assert_eq!(last_two, vec![("b".to_owned(), 2.0), ("c".to_owned(), 3.0)]);
3288    }
3289
3290    #[test]
3291    fn zrange_missing_key() {
3292        let mut ks = Keyspace::new();
3293        assert!(ks.zrange("nope", 0, -1).unwrap().is_empty());
3294    }
3295
3296    #[test]
3297    fn zadd_on_string_key_returns_wrongtype() {
3298        let mut ks = Keyspace::new();
3299        ks.set("s".into(), Bytes::from("val"), None);
3300        assert!(ks
3301            .zadd("s", &[(1.0, "m".into())], &ZAddFlags::default())
3302            .is_err());
3303    }
3304
3305    #[test]
3306    fn zrem_on_string_key_returns_wrongtype() {
3307        let mut ks = Keyspace::new();
3308        ks.set("s".into(), Bytes::from("val"), None);
3309        assert!(ks.zrem("s", &["m".into()]).is_err());
3310    }
3311
3312    #[test]
3313    fn zscore_on_list_key_returns_wrongtype() {
3314        let mut ks = Keyspace::new();
3315        ks.rpush("l", &[Bytes::from("item")]).unwrap();
3316        assert!(ks.zscore("l", "m").is_err());
3317    }
3318
3319    #[test]
3320    fn zrank_on_string_key_returns_wrongtype() {
3321        let mut ks = Keyspace::new();
3322        ks.set("s".into(), Bytes::from("val"), None);
3323        assert!(ks.zrank("s", "m").is_err());
3324    }
3325
3326    #[test]
3327    fn zrange_on_string_key_returns_wrongtype() {
3328        let mut ks = Keyspace::new();
3329        ks.set("s".into(), Bytes::from("val"), None);
3330        assert!(ks.zrange("s", 0, -1).is_err());
3331    }
3332
3333    #[test]
3334    fn sorted_set_memory_tracked() {
3335        let mut ks = Keyspace::new();
3336        let before = ks.stats().used_bytes;
3337        ks.zadd("z", &[(1.0, "alice".into())], &ZAddFlags::default())
3338            .unwrap();
3339        let after_add = ks.stats().used_bytes;
3340        assert!(after_add > before);
3341
3342        ks.zadd("z", &[(2.0, "bob".into())], &ZAddFlags::default())
3343            .unwrap();
3344        let after_second = ks.stats().used_bytes;
3345        assert!(after_second > after_add);
3346
3347        ks.zrem("z", &["alice".into()]).unwrap();
3348        let after_remove = ks.stats().used_bytes;
3349        assert!(after_remove < after_second);
3350    }
3351
3352    #[test]
3353    fn incr_new_key_defaults_to_zero() {
3354        let mut ks = Keyspace::new();
3355        assert_eq!(ks.incr("counter").unwrap(), 1);
3356        // verify the stored value
3357        match ks.get("counter").unwrap() {
3358            Some(Value::String(data)) => assert_eq!(data, Bytes::from("1")),
3359            other => panic!("expected String(\"1\"), got {other:?}"),
3360        }
3361    }
3362
3363    #[test]
3364    fn incr_existing_value() {
3365        let mut ks = Keyspace::new();
3366        ks.set("n".into(), Bytes::from("10"), None);
3367        assert_eq!(ks.incr("n").unwrap(), 11);
3368    }
3369
3370    #[test]
3371    fn decr_new_key_defaults_to_zero() {
3372        let mut ks = Keyspace::new();
3373        assert_eq!(ks.decr("counter").unwrap(), -1);
3374    }
3375
3376    #[test]
3377    fn decr_existing_value() {
3378        let mut ks = Keyspace::new();
3379        ks.set("n".into(), Bytes::from("10"), None);
3380        assert_eq!(ks.decr("n").unwrap(), 9);
3381    }
3382
3383    #[test]
3384    fn incr_non_integer_returns_error() {
3385        let mut ks = Keyspace::new();
3386        ks.set("s".into(), Bytes::from("notanum"), None);
3387        assert_eq!(ks.incr("s").unwrap_err(), IncrError::NotAnInteger);
3388    }
3389
3390    #[test]
3391    fn incr_on_list_returns_wrongtype() {
3392        let mut ks = Keyspace::new();
3393        ks.lpush("list", &[Bytes::from("a")]).unwrap();
3394        assert_eq!(ks.incr("list").unwrap_err(), IncrError::WrongType);
3395    }
3396
3397    #[test]
3398    fn incr_overflow_returns_error() {
3399        let mut ks = Keyspace::new();
3400        ks.set("max".into(), Bytes::from(i64::MAX.to_string()), None);
3401        assert_eq!(ks.incr("max").unwrap_err(), IncrError::Overflow);
3402    }
3403
3404    #[test]
3405    fn decr_overflow_returns_error() {
3406        let mut ks = Keyspace::new();
3407        ks.set("min".into(), Bytes::from(i64::MIN.to_string()), None);
3408        assert_eq!(ks.decr("min").unwrap_err(), IncrError::Overflow);
3409    }
3410
3411    #[test]
3412    fn incr_preserves_ttl() {
3413        let mut ks = Keyspace::new();
3414        ks.set("n".into(), Bytes::from("5"), Some(Duration::from_secs(60)));
3415        ks.incr("n").unwrap();
3416        match ks.ttl("n") {
3417            TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
3418            other => panic!("expected TTL preserved, got {other:?}"),
3419        }
3420    }
3421
3422    #[test]
3423    fn zrem_returns_actually_removed_members() {
3424        let mut ks = Keyspace::new();
3425        ks.zadd(
3426            "z",
3427            &[(1.0, "a".into()), (2.0, "b".into())],
3428            &ZAddFlags::default(),
3429        )
3430        .unwrap();
3431        // "a" exists, "ghost" doesn't — only "a" should be in the result
3432        let removed = ks.zrem("z", &["a".into(), "ghost".into()]).unwrap();
3433        assert_eq!(removed, vec!["a".to_owned()]);
3434    }
3435
3436    #[test]
3437    fn zcard_returns_count() {
3438        let mut ks = Keyspace::new();
3439        ks.zadd(
3440            "z",
3441            &[(1.0, "a".into()), (2.0, "b".into())],
3442            &ZAddFlags::default(),
3443        )
3444        .unwrap();
3445        assert_eq!(ks.zcard("z").unwrap(), 2);
3446    }
3447
3448    #[test]
3449    fn zcard_missing_key_returns_zero() {
3450        let mut ks = Keyspace::new();
3451        assert_eq!(ks.zcard("missing").unwrap(), 0);
3452    }
3453
3454    #[test]
3455    fn zcard_on_string_key_returns_wrongtype() {
3456        let mut ks = Keyspace::new();
3457        ks.set("s".into(), Bytes::from("val"), None);
3458        assert!(ks.zcard("s").is_err());
3459    }
3460
3461    #[test]
3462    fn persist_removes_expiry() {
3463        let mut ks = Keyspace::new();
3464        ks.set(
3465            "key".into(),
3466            Bytes::from("val"),
3467            Some(Duration::from_secs(60)),
3468        );
3469        assert!(matches!(ks.ttl("key"), TtlResult::Seconds(_)));
3470
3471        assert!(ks.persist("key"));
3472        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
3473        assert_eq!(ks.stats().keys_with_expiry, 0);
3474    }
3475
3476    #[test]
3477    fn persist_returns_false_without_expiry() {
3478        let mut ks = Keyspace::new();
3479        ks.set("key".into(), Bytes::from("val"), None);
3480        assert!(!ks.persist("key"));
3481    }
3482
3483    #[test]
3484    fn persist_returns_false_for_missing_key() {
3485        let mut ks = Keyspace::new();
3486        assert!(!ks.persist("missing"));
3487    }
3488
3489    #[test]
3490    fn pttl_returns_milliseconds() {
3491        let mut ks = Keyspace::new();
3492        ks.set(
3493            "key".into(),
3494            Bytes::from("val"),
3495            Some(Duration::from_secs(60)),
3496        );
3497        match ks.pttl("key") {
3498            TtlResult::Milliseconds(ms) => assert!(ms > 59_000 && ms <= 60_000),
3499            other => panic!("expected Milliseconds, got {other:?}"),
3500        }
3501    }
3502
3503    #[test]
3504    fn pttl_no_expiry() {
3505        let mut ks = Keyspace::new();
3506        ks.set("key".into(), Bytes::from("val"), None);
3507        assert_eq!(ks.pttl("key"), TtlResult::NoExpiry);
3508    }
3509
3510    #[test]
3511    fn pttl_not_found() {
3512        let mut ks = Keyspace::new();
3513        assert_eq!(ks.pttl("missing"), TtlResult::NotFound);
3514    }
3515
3516    #[test]
3517    fn pexpire_sets_ttl_in_millis() {
3518        let mut ks = Keyspace::new();
3519        ks.set("key".into(), Bytes::from("val"), None);
3520        assert!(ks.pexpire("key", 5000));
3521        match ks.pttl("key") {
3522            TtlResult::Milliseconds(ms) => assert!(ms > 4000 && ms <= 5000),
3523            other => panic!("expected Milliseconds, got {other:?}"),
3524        }
3525        assert_eq!(ks.stats().keys_with_expiry, 1);
3526    }
3527
3528    #[test]
3529    fn pexpire_missing_key_returns_false() {
3530        let mut ks = Keyspace::new();
3531        assert!(!ks.pexpire("missing", 5000));
3532    }
3533
3534    #[test]
3535    fn pexpire_overwrites_existing_ttl() {
3536        let mut ks = Keyspace::new();
3537        ks.set(
3538            "key".into(),
3539            Bytes::from("val"),
3540            Some(Duration::from_secs(60)),
3541        );
3542        assert!(ks.pexpire("key", 500));
3543        match ks.pttl("key") {
3544            TtlResult::Milliseconds(ms) => assert!(ms <= 500),
3545            other => panic!("expected Milliseconds, got {other:?}"),
3546        }
3547        // expiry count shouldn't double-count
3548        assert_eq!(ks.stats().keys_with_expiry, 1);
3549    }
3550
3551    // -- memory limit enforcement for list/zset --
3552
3553    #[test]
3554    fn lpush_rejects_when_memory_full() {
3555        let config = ShardConfig {
3556            max_memory: Some(150),
3557            eviction_policy: EvictionPolicy::NoEviction,
3558            ..ShardConfig::default()
3559        };
3560        let mut ks = Keyspace::with_config(config);
3561
3562        // first key eats up most of the budget
3563        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
3564
3565        // lpush should be rejected — not enough room
3566        let result = ks.lpush("list", &[Bytes::from("big-value-here")]);
3567        assert_eq!(result, Err(WriteError::OutOfMemory));
3568
3569        // original key should be untouched
3570        assert!(ks.exists("a"));
3571    }
3572
3573    #[test]
3574    fn rpush_rejects_when_memory_full() {
3575        let config = ShardConfig {
3576            max_memory: Some(150),
3577            eviction_policy: EvictionPolicy::NoEviction,
3578            ..ShardConfig::default()
3579        };
3580        let mut ks = Keyspace::with_config(config);
3581
3582        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
3583
3584        let result = ks.rpush("list", &[Bytes::from("big-value-here")]);
3585        assert_eq!(result, Err(WriteError::OutOfMemory));
3586    }
3587
3588    #[test]
3589    fn zadd_rejects_when_memory_full() {
3590        let config = ShardConfig {
3591            max_memory: Some(150),
3592            eviction_policy: EvictionPolicy::NoEviction,
3593            ..ShardConfig::default()
3594        };
3595        let mut ks = Keyspace::with_config(config);
3596
3597        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
3598
3599        let result = ks.zadd("z", &[(1.0, "member".into())], &ZAddFlags::default());
3600        assert!(matches!(result, Err(WriteError::OutOfMemory)));
3601
3602        // original key should be untouched
3603        assert!(ks.exists("a"));
3604    }
3605
3606    #[test]
3607    fn lpush_evicts_under_lru_policy() {
3608        // "a" entry = 1 + 3 + 128 = 132 bytes.
3609        // list entry = 4 + 24 + (4 + 32) + 128 = 192 bytes.
3610        // effective limit = 250 * 90 / 100 = 225. fits one entry but not both,
3611        // so lpush should evict "a" to make room for the list.
3612        let config = ShardConfig {
3613            max_memory: Some(250),
3614            eviction_policy: EvictionPolicy::AllKeysLru,
3615            ..ShardConfig::default()
3616        };
3617        let mut ks = Keyspace::with_config(config);
3618
3619        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
3620
3621        // should evict "a" to make room for the list
3622        assert!(ks.lpush("list", &[Bytes::from("item")]).is_ok());
3623        assert!(!ks.exists("a"));
3624    }
3625
3626    #[test]
3627    fn clear_removes_all_keys() {
3628        let mut ks = Keyspace::new();
3629        ks.set("a".into(), Bytes::from("1"), None);
3630        ks.set("b".into(), Bytes::from("2"), Some(Duration::from_secs(60)));
3631        ks.lpush("list", &[Bytes::from("x")]).unwrap();
3632
3633        assert_eq!(ks.len(), 3);
3634        assert!(ks.stats().used_bytes > 0);
3635        assert_eq!(ks.stats().keys_with_expiry, 1);
3636
3637        ks.clear();
3638
3639        assert_eq!(ks.len(), 0);
3640        assert!(ks.is_empty());
3641        assert_eq!(ks.stats().used_bytes, 0);
3642        assert_eq!(ks.stats().keys_with_expiry, 0);
3643    }
3644
3645    // --- scan ---
3646
3647    #[test]
3648    fn scan_returns_keys() {
3649        let mut ks = Keyspace::new();
3650        ks.set("key1".into(), Bytes::from("a"), None);
3651        ks.set("key2".into(), Bytes::from("b"), None);
3652        ks.set("key3".into(), Bytes::from("c"), None);
3653
3654        let (cursor, keys) = ks.scan_keys(0, 10, None);
3655        assert_eq!(cursor, 0); // complete in one pass
3656        assert_eq!(keys.len(), 3);
3657    }
3658
3659    #[test]
3660    fn scan_empty_keyspace() {
3661        let ks = Keyspace::new();
3662        let (cursor, keys) = ks.scan_keys(0, 10, None);
3663        assert_eq!(cursor, 0);
3664        assert!(keys.is_empty());
3665    }
3666
3667    #[test]
3668    fn scan_with_pattern() {
3669        let mut ks = Keyspace::new();
3670        ks.set("user:1".into(), Bytes::from("a"), None);
3671        ks.set("user:2".into(), Bytes::from("b"), None);
3672        ks.set("item:1".into(), Bytes::from("c"), None);
3673
3674        let (cursor, keys) = ks.scan_keys(0, 10, Some("user:*"));
3675        assert_eq!(cursor, 0);
3676        assert_eq!(keys.len(), 2);
3677        for k in &keys {
3678            assert!(k.starts_with("user:"));
3679        }
3680    }
3681
3682    #[test]
3683    fn scan_with_count_limit() {
3684        let mut ks = Keyspace::new();
3685        for i in 0..10 {
3686            ks.set(format!("k{i}"), Bytes::from("v"), None);
3687        }
3688
3689        // first batch
3690        let (cursor, keys) = ks.scan_keys(0, 3, None);
3691        assert!(!keys.is_empty());
3692        assert!(keys.len() <= 3);
3693
3694        // if there are more keys, cursor should be non-zero
3695        if cursor != 0 {
3696            let (cursor2, keys2) = ks.scan_keys(cursor, 3, None);
3697            assert!(!keys2.is_empty());
3698            // continue until complete
3699            let _ = (cursor2, keys2);
3700        }
3701    }
3702
3703    #[test]
3704    fn scan_skips_expired_keys() {
3705        let mut ks = Keyspace::new();
3706        ks.set("live".into(), Bytes::from("a"), None);
3707        ks.set(
3708            "expired".into(),
3709            Bytes::from("b"),
3710            Some(Duration::from_millis(1)),
3711        );
3712
3713        std::thread::sleep(Duration::from_millis(5));
3714
3715        let (_, keys) = ks.scan_keys(0, 10, None);
3716        assert_eq!(keys.len(), 1);
3717        assert_eq!(keys[0], "live");
3718    }
3719
3720    #[test]
3721    fn glob_match_star() {
3722        assert!(super::glob_match("user:*", "user:123"));
3723        assert!(super::glob_match("user:*", "user:"));
3724        assert!(super::glob_match("*:data", "foo:data"));
3725        assert!(!super::glob_match("user:*", "item:123"));
3726    }
3727
3728    #[test]
3729    fn glob_match_question() {
3730        assert!(super::glob_match("key?", "key1"));
3731        assert!(super::glob_match("key?", "keya"));
3732        assert!(!super::glob_match("key?", "key"));
3733        assert!(!super::glob_match("key?", "key12"));
3734    }
3735
3736    #[test]
3737    fn glob_match_brackets() {
3738        assert!(super::glob_match("key[abc]", "keya"));
3739        assert!(super::glob_match("key[abc]", "keyb"));
3740        assert!(!super::glob_match("key[abc]", "keyd"));
3741    }
3742
3743    #[test]
3744    fn glob_match_literal() {
3745        assert!(super::glob_match("exact", "exact"));
3746        assert!(!super::glob_match("exact", "exactnot"));
3747        assert!(!super::glob_match("exact", "notexact"));
3748    }
3749
3750    // --- hash tests ---
3751
3752    #[test]
3753    fn hset_creates_hash() {
3754        let mut ks = Keyspace::new();
3755        let count = ks
3756            .hset("h", &[("field1".into(), Bytes::from("value1"))])
3757            .unwrap();
3758        assert_eq!(count, 1);
3759        assert_eq!(ks.value_type("h"), "hash");
3760    }
3761
3762    #[test]
3763    fn hset_returns_new_field_count() {
3764        let mut ks = Keyspace::new();
3765        // add two new fields
3766        let count = ks
3767            .hset(
3768                "h",
3769                &[
3770                    ("f1".into(), Bytes::from("v1")),
3771                    ("f2".into(), Bytes::from("v2")),
3772                ],
3773            )
3774            .unwrap();
3775        assert_eq!(count, 2);
3776
3777        // update one, add one new
3778        let count = ks
3779            .hset(
3780                "h",
3781                &[
3782                    ("f1".into(), Bytes::from("updated")),
3783                    ("f3".into(), Bytes::from("v3")),
3784                ],
3785            )
3786            .unwrap();
3787        assert_eq!(count, 1); // only f3 is new
3788    }
3789
3790    #[test]
3791    fn hget_returns_value() {
3792        let mut ks = Keyspace::new();
3793        ks.hset("h", &[("name".into(), Bytes::from("alice"))])
3794            .unwrap();
3795        let val = ks.hget("h", "name").unwrap();
3796        assert_eq!(val, Some(Bytes::from("alice")));
3797    }
3798
3799    #[test]
3800    fn hget_missing_field_returns_none() {
3801        let mut ks = Keyspace::new();
3802        ks.hset("h", &[("a".into(), Bytes::from("1"))]).unwrap();
3803        assert_eq!(ks.hget("h", "b").unwrap(), None);
3804    }
3805
3806    #[test]
3807    fn hget_missing_key_returns_none() {
3808        let mut ks = Keyspace::new();
3809        assert_eq!(ks.hget("missing", "field").unwrap(), None);
3810    }
3811
3812    #[test]
3813    fn hgetall_returns_all_fields() {
3814        let mut ks = Keyspace::new();
3815        ks.hset(
3816            "h",
3817            &[
3818                ("a".into(), Bytes::from("1")),
3819                ("b".into(), Bytes::from("2")),
3820            ],
3821        )
3822        .unwrap();
3823        let mut fields = ks.hgetall("h").unwrap();
3824        fields.sort_by(|a, b| a.0.cmp(&b.0));
3825        assert_eq!(fields.len(), 2);
3826        assert_eq!(fields[0], ("a".into(), Bytes::from("1")));
3827        assert_eq!(fields[1], ("b".into(), Bytes::from("2")));
3828    }
3829
3830    #[test]
3831    fn hdel_removes_fields() {
3832        let mut ks = Keyspace::new();
3833        ks.hset(
3834            "h",
3835            &[
3836                ("a".into(), Bytes::from("1")),
3837                ("b".into(), Bytes::from("2")),
3838                ("c".into(), Bytes::from("3")),
3839            ],
3840        )
3841        .unwrap();
3842        let removed = ks.hdel("h", &["a".into(), "c".into()]).unwrap();
3843        assert_eq!(removed.len(), 2);
3844        assert!(removed.contains(&"a".into()));
3845        assert!(removed.contains(&"c".into()));
3846        assert_eq!(ks.hlen("h").unwrap(), 1);
3847    }
3848
3849    #[test]
3850    fn hdel_auto_deletes_empty_hash() {
3851        let mut ks = Keyspace::new();
3852        ks.hset("h", &[("only".into(), Bytes::from("field"))])
3853            .unwrap();
3854        ks.hdel("h", &["only".into()]).unwrap();
3855        assert_eq!(ks.value_type("h"), "none");
3856    }
3857
3858    #[test]
3859    fn hexists_returns_true_for_existing_field() {
3860        let mut ks = Keyspace::new();
3861        ks.hset("h", &[("field".into(), Bytes::from("val"))])
3862            .unwrap();
3863        assert!(ks.hexists("h", "field").unwrap());
3864    }
3865
3866    #[test]
3867    fn hexists_returns_false_for_missing_field() {
3868        let mut ks = Keyspace::new();
3869        ks.hset("h", &[("a".into(), Bytes::from("1"))]).unwrap();
3870        assert!(!ks.hexists("h", "missing").unwrap());
3871    }
3872
3873    #[test]
3874    fn hlen_returns_field_count() {
3875        let mut ks = Keyspace::new();
3876        ks.hset(
3877            "h",
3878            &[
3879                ("a".into(), Bytes::from("1")),
3880                ("b".into(), Bytes::from("2")),
3881            ],
3882        )
3883        .unwrap();
3884        assert_eq!(ks.hlen("h").unwrap(), 2);
3885    }
3886
3887    #[test]
3888    fn hlen_missing_key_returns_zero() {
3889        let mut ks = Keyspace::new();
3890        assert_eq!(ks.hlen("missing").unwrap(), 0);
3891    }
3892
3893    #[test]
3894    fn hincrby_new_field() {
3895        let mut ks = Keyspace::new();
3896        ks.hset("h", &[("x".into(), Bytes::from("ignored"))])
3897            .unwrap();
3898        let val = ks.hincrby("h", "counter", 5).unwrap();
3899        assert_eq!(val, 5);
3900    }
3901
3902    #[test]
3903    fn hincrby_existing_field() {
3904        let mut ks = Keyspace::new();
3905        ks.hset("h", &[("n".into(), Bytes::from("10"))]).unwrap();
3906        let val = ks.hincrby("h", "n", 3).unwrap();
3907        assert_eq!(val, 13);
3908    }
3909
3910    #[test]
3911    fn hincrby_negative_delta() {
3912        let mut ks = Keyspace::new();
3913        ks.hset("h", &[("n".into(), Bytes::from("10"))]).unwrap();
3914        let val = ks.hincrby("h", "n", -7).unwrap();
3915        assert_eq!(val, 3);
3916    }
3917
3918    #[test]
3919    fn hincrby_non_integer_returns_error() {
3920        let mut ks = Keyspace::new();
3921        ks.hset("h", &[("s".into(), Bytes::from("notanumber"))])
3922            .unwrap();
3923        assert_eq!(
3924            ks.hincrby("h", "s", 1).unwrap_err(),
3925            IncrError::NotAnInteger
3926        );
3927    }
3928
3929    #[test]
3930    fn hkeys_returns_field_names() {
3931        let mut ks = Keyspace::new();
3932        ks.hset(
3933            "h",
3934            &[
3935                ("alpha".into(), Bytes::from("1")),
3936                ("beta".into(), Bytes::from("2")),
3937            ],
3938        )
3939        .unwrap();
3940        let mut keys = ks.hkeys("h").unwrap();
3941        keys.sort();
3942        assert_eq!(keys, vec!["alpha", "beta"]);
3943    }
3944
3945    #[test]
3946    fn hvals_returns_values() {
3947        let mut ks = Keyspace::new();
3948        ks.hset(
3949            "h",
3950            &[
3951                ("a".into(), Bytes::from("x")),
3952                ("b".into(), Bytes::from("y")),
3953            ],
3954        )
3955        .unwrap();
3956        let mut vals = ks.hvals("h").unwrap();
3957        vals.sort();
3958        assert_eq!(vals, vec![Bytes::from("x"), Bytes::from("y")]);
3959    }
3960
3961    #[test]
3962    fn hmget_returns_values_for_existing_fields() {
3963        let mut ks = Keyspace::new();
3964        ks.hset(
3965            "h",
3966            &[
3967                ("a".into(), Bytes::from("1")),
3968                ("b".into(), Bytes::from("2")),
3969            ],
3970        )
3971        .unwrap();
3972        let vals = ks
3973            .hmget("h", &["a".into(), "missing".into(), "b".into()])
3974            .unwrap();
3975        assert_eq!(vals.len(), 3);
3976        assert_eq!(vals[0], Some(Bytes::from("1")));
3977        assert_eq!(vals[1], None);
3978        assert_eq!(vals[2], Some(Bytes::from("2")));
3979    }
3980
3981    #[test]
3982    fn hash_on_string_key_returns_wrongtype() {
3983        let mut ks = Keyspace::new();
3984        ks.set("s".into(), Bytes::from("string"), None);
3985        assert!(ks.hset("s", &[("f".into(), Bytes::from("v"))]).is_err());
3986        assert!(ks.hget("s", "f").is_err());
3987        assert!(ks.hgetall("s").is_err());
3988        assert!(ks.hdel("s", &["f".into()]).is_err());
3989        assert!(ks.hexists("s", "f").is_err());
3990        assert!(ks.hlen("s").is_err());
3991        assert!(ks.hincrby("s", "f", 1).is_err());
3992        assert!(ks.hkeys("s").is_err());
3993        assert!(ks.hvals("s").is_err());
3994        assert!(ks.hmget("s", &["f".into()]).is_err());
3995    }
3996
3997    // --- set tests ---
3998
3999    #[test]
4000    fn sadd_creates_set() {
4001        let mut ks = Keyspace::new();
4002        let added = ks.sadd("s", &["a".into(), "b".into()]).unwrap();
4003        assert_eq!(added, 2);
4004        assert_eq!(ks.value_type("s"), "set");
4005    }
4006
4007    #[test]
4008    fn sadd_returns_new_member_count() {
4009        let mut ks = Keyspace::new();
4010        ks.sadd("s", &["a".into(), "b".into()]).unwrap();
4011        // add one existing, one new
4012        let added = ks.sadd("s", &["b".into(), "c".into()]).unwrap();
4013        assert_eq!(added, 1); // only "c" is new
4014    }
4015
4016    #[test]
4017    fn srem_removes_members() {
4018        let mut ks = Keyspace::new();
4019        ks.sadd("s", &["a".into(), "b".into(), "c".into()]).unwrap();
4020        let removed = ks.srem("s", &["a".into(), "c".into()]).unwrap();
4021        assert_eq!(removed, 2);
4022        assert_eq!(ks.scard("s").unwrap(), 1);
4023    }
4024
4025    #[test]
4026    fn srem_auto_deletes_empty_set() {
4027        let mut ks = Keyspace::new();
4028        ks.sadd("s", &["only".into()]).unwrap();
4029        ks.srem("s", &["only".into()]).unwrap();
4030        assert_eq!(ks.value_type("s"), "none");
4031    }
4032
4033    #[test]
4034    fn smembers_returns_all_members() {
4035        let mut ks = Keyspace::new();
4036        ks.sadd("s", &["a".into(), "b".into(), "c".into()]).unwrap();
4037        let mut members = ks.smembers("s").unwrap();
4038        members.sort();
4039        assert_eq!(members, vec!["a", "b", "c"]);
4040    }
4041
4042    #[test]
4043    fn smembers_missing_key_returns_empty() {
4044        let mut ks = Keyspace::new();
4045        assert_eq!(ks.smembers("missing").unwrap(), Vec::<String>::new());
4046    }
4047
4048    #[test]
4049    fn sismember_returns_true_for_existing() {
4050        let mut ks = Keyspace::new();
4051        ks.sadd("s", &["member".into()]).unwrap();
4052        assert!(ks.sismember("s", "member").unwrap());
4053    }
4054
4055    #[test]
4056    fn sismember_returns_false_for_missing() {
4057        let mut ks = Keyspace::new();
4058        ks.sadd("s", &["a".into()]).unwrap();
4059        assert!(!ks.sismember("s", "missing").unwrap());
4060    }
4061
4062    #[test]
4063    fn scard_returns_count() {
4064        let mut ks = Keyspace::new();
4065        ks.sadd("s", &["a".into(), "b".into(), "c".into()]).unwrap();
4066        assert_eq!(ks.scard("s").unwrap(), 3);
4067    }
4068
4069    #[test]
4070    fn scard_missing_key_returns_zero() {
4071        let mut ks = Keyspace::new();
4072        assert_eq!(ks.scard("missing").unwrap(), 0);
4073    }
4074
4075    #[test]
4076    fn set_on_string_key_returns_wrongtype() {
4077        let mut ks = Keyspace::new();
4078        ks.set("s".into(), Bytes::from("string"), None);
4079        assert!(ks.sadd("s", &["m".into()]).is_err());
4080        assert!(ks.srem("s", &["m".into()]).is_err());
4081        assert!(ks.smembers("s").is_err());
4082        assert!(ks.sismember("s", "m").is_err());
4083        assert!(ks.scard("s").is_err());
4084    }
4085
4086    // --- edge case tests ---
4087
4088    #[test]
4089    fn zero_ttl_expires_immediately() {
4090        let mut ks = Keyspace::new();
4091        ks.set("key".into(), Bytes::from("val"), Some(Duration::ZERO));
4092
4093        // key should be expired immediately
4094        std::thread::sleep(Duration::from_millis(1));
4095        assert!(ks.get("key").unwrap().is_none());
4096    }
4097
4098    #[test]
4099    fn very_small_ttl_expires_quickly() {
4100        let mut ks = Keyspace::new();
4101        ks.set(
4102            "key".into(),
4103            Bytes::from("val"),
4104            Some(Duration::from_millis(1)),
4105        );
4106
4107        std::thread::sleep(Duration::from_millis(5));
4108        assert!(ks.get("key").unwrap().is_none());
4109    }
4110
4111    #[test]
4112    fn list_auto_deleted_when_empty() {
4113        let mut ks = Keyspace::new();
4114        ks.lpush("list", &[Bytes::from("a"), Bytes::from("b")])
4115            .unwrap();
4116        assert_eq!(ks.len(), 1);
4117
4118        // pop all elements
4119        let _ = ks.lpop("list");
4120        let _ = ks.lpop("list");
4121
4122        // list should be auto-deleted
4123        assert_eq!(ks.len(), 0);
4124        assert!(!ks.exists("list"));
4125    }
4126
4127    #[test]
4128    fn set_auto_deleted_when_empty() {
4129        let mut ks = Keyspace::new();
4130        ks.sadd("s", &["a".into(), "b".into()]).unwrap();
4131        assert_eq!(ks.len(), 1);
4132
4133        // remove all members
4134        ks.srem("s", &["a".into(), "b".into()]).unwrap();
4135
4136        // set should be auto-deleted
4137        assert_eq!(ks.len(), 0);
4138        assert!(!ks.exists("s"));
4139    }
4140
4141    #[test]
4142    fn hash_auto_deleted_when_empty() {
4143        let mut ks = Keyspace::new();
4144        ks.hset(
4145            "h",
4146            &[
4147                ("f1".into(), Bytes::from("v1")),
4148                ("f2".into(), Bytes::from("v2")),
4149            ],
4150        )
4151        .unwrap();
4152        assert_eq!(ks.len(), 1);
4153
4154        // delete all fields
4155        ks.hdel("h", &["f1".into(), "f2".into()]).unwrap();
4156
4157        // hash should be auto-deleted
4158        assert_eq!(ks.len(), 0);
4159        assert!(!ks.exists("h"));
4160    }
4161
4162    #[test]
4163    fn sadd_duplicate_members_counted_once() {
4164        let mut ks = Keyspace::new();
4165        // add same member twice in one call
4166        let count = ks.sadd("s", &["a".into(), "a".into()]).unwrap();
4167        // should only count as 1 new member
4168        assert_eq!(count, 1);
4169        assert_eq!(ks.scard("s").unwrap(), 1);
4170    }
4171
4172    #[test]
4173    fn srem_non_existent_member_returns_zero() {
4174        let mut ks = Keyspace::new();
4175        ks.sadd("s", &["a".into()]).unwrap();
4176        let removed = ks.srem("s", &["nonexistent".into()]).unwrap();
4177        assert_eq!(removed, 0);
4178    }
4179
4180    #[test]
4181    fn hincrby_overflow_returns_error() {
4182        let mut ks = Keyspace::new();
4183        // set field to near max
4184        ks.hset("h", &[("count".into(), Bytes::from(i64::MAX.to_string()))])
4185            .unwrap();
4186
4187        // try to increment by 1 - should overflow
4188        let result = ks.hincrby("h", "count", 1);
4189        assert!(result.is_err());
4190    }
4191
4192    #[test]
4193    fn hincrby_on_non_integer_returns_error() {
4194        let mut ks = Keyspace::new();
4195        ks.hset("h", &[("field".into(), Bytes::from("not_a_number"))])
4196            .unwrap();
4197
4198        let result = ks.hincrby("h", "field", 1);
4199        assert!(result.is_err());
4200    }
4201
4202    #[test]
4203    fn incr_at_max_value_overflows() {
4204        let mut ks = Keyspace::new();
4205        ks.set("counter".into(), Bytes::from(i64::MAX.to_string()), None);
4206
4207        let result = ks.incr("counter");
4208        assert!(matches!(result, Err(IncrError::Overflow)));
4209    }
4210
4211    #[test]
4212    fn decr_at_min_value_underflows() {
4213        let mut ks = Keyspace::new();
4214        ks.set("counter".into(), Bytes::from(i64::MIN.to_string()), None);
4215
4216        let result = ks.decr("counter");
4217        assert!(matches!(result, Err(IncrError::Overflow)));
4218    }
4219
4220    #[test]
4221    fn lrange_inverted_start_stop_returns_empty() {
4222        let mut ks = Keyspace::new();
4223        ks.lpush(
4224            "list",
4225            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("c")],
4226        )
4227        .unwrap();
4228
4229        // start > stop with positive indices
4230        let result = ks.lrange("list", 2, 0).unwrap();
4231        assert!(result.is_empty());
4232    }
4233
4234    #[test]
4235    fn lrange_large_stop_clamps_to_len() {
4236        let mut ks = Keyspace::new();
4237        ks.lpush("list", &[Bytes::from("a"), Bytes::from("b")])
4238            .unwrap();
4239
4240        // large indices should clamp to list bounds
4241        let result = ks.lrange("list", 0, 1000).unwrap();
4242        assert_eq!(result.len(), 2);
4243    }
4244
4245    #[test]
4246    fn empty_string_key_works() {
4247        let mut ks = Keyspace::new();
4248        ks.set("".into(), Bytes::from("value"), None);
4249        assert_eq!(
4250            ks.get("").unwrap(),
4251            Some(Value::String(Bytes::from("value")))
4252        );
4253        assert!(ks.exists(""));
4254    }
4255
4256    #[test]
4257    fn empty_value_works() {
4258        let mut ks = Keyspace::new();
4259        ks.set("key".into(), Bytes::from(""), None);
4260        assert_eq!(ks.get("key").unwrap(), Some(Value::String(Bytes::from(""))));
4261    }
4262
4263    #[test]
4264    fn binary_data_in_value() {
4265        let mut ks = Keyspace::new();
4266        // value with null bytes and other binary data
4267        let binary = Bytes::from(vec![0u8, 1, 2, 255, 0, 128]);
4268        ks.set("binary".into(), binary.clone(), None);
4269        assert_eq!(ks.get("binary").unwrap(), Some(Value::String(binary)));
4270    }
4271
4272    #[test]
4273    fn incr_by_float_basic() {
4274        let mut ks = Keyspace::new();
4275        ks.set("n".into(), Bytes::from("10.5"), None);
4276        let result = ks.incr_by_float("n", 2.3).unwrap();
4277        let f: f64 = result.parse().unwrap();
4278        assert!((f - 12.8).abs() < 0.001);
4279    }
4280
4281    #[test]
4282    fn incr_by_float_new_key() {
4283        let mut ks = Keyspace::new();
4284        let result = ks.incr_by_float("new", 2.72).unwrap();
4285        let f: f64 = result.parse().unwrap();
4286        assert!((f - 2.72).abs() < 0.001);
4287    }
4288
4289    #[test]
4290    fn incr_by_float_negative() {
4291        let mut ks = Keyspace::new();
4292        ks.set("n".into(), Bytes::from("10"), None);
4293        let result = ks.incr_by_float("n", -3.5).unwrap();
4294        let f: f64 = result.parse().unwrap();
4295        assert!((f - 6.5).abs() < 0.001);
4296    }
4297
4298    #[test]
4299    fn incr_by_float_wrong_type() {
4300        let mut ks = Keyspace::new();
4301        ks.lpush("mylist", &[Bytes::from("a")]).unwrap();
4302        let err = ks.incr_by_float("mylist", 1.0).unwrap_err();
4303        assert_eq!(err, IncrFloatError::WrongType);
4304    }
4305
4306    #[test]
4307    fn incr_by_float_not_a_float() {
4308        let mut ks = Keyspace::new();
4309        ks.set("s".into(), Bytes::from("hello"), None);
4310        let err = ks.incr_by_float("s", 1.0).unwrap_err();
4311        assert_eq!(err, IncrFloatError::NotAFloat);
4312    }
4313
4314    #[test]
4315    fn append_to_existing_key() {
4316        let mut ks = Keyspace::new();
4317        ks.set("key".into(), Bytes::from("hello"), None);
4318        let len = ks.append("key", b" world").unwrap();
4319        assert_eq!(len, 11);
4320        assert_eq!(
4321            ks.get("key").unwrap(),
4322            Some(Value::String(Bytes::from("hello world")))
4323        );
4324    }
4325
4326    #[test]
4327    fn append_to_new_key() {
4328        let mut ks = Keyspace::new();
4329        let len = ks.append("new", b"value").unwrap();
4330        assert_eq!(len, 5);
4331        assert_eq!(
4332            ks.get("new").unwrap(),
4333            Some(Value::String(Bytes::from("value")))
4334        );
4335    }
4336
4337    #[test]
4338    fn append_wrong_type() {
4339        let mut ks = Keyspace::new();
4340        ks.lpush("mylist", &[Bytes::from("a")]).unwrap();
4341        let err = ks.append("mylist", b"value").unwrap_err();
4342        assert_eq!(err, WriteError::WrongType);
4343    }
4344
4345    #[test]
4346    fn strlen_existing_key() {
4347        let mut ks = Keyspace::new();
4348        ks.set("key".into(), Bytes::from("hello"), None);
4349        assert_eq!(ks.strlen("key").unwrap(), 5);
4350    }
4351
4352    #[test]
4353    fn strlen_missing_key() {
4354        let mut ks = Keyspace::new();
4355        assert_eq!(ks.strlen("missing").unwrap(), 0);
4356    }
4357
4358    #[test]
4359    fn strlen_wrong_type() {
4360        let mut ks = Keyspace::new();
4361        ks.lpush("mylist", &[Bytes::from("a")]).unwrap();
4362        let err = ks.strlen("mylist").unwrap_err();
4363        assert_eq!(err, WrongType);
4364    }
4365
4366    #[test]
4367    fn format_float_integers() {
4368        assert_eq!(super::format_float(10.0), "10");
4369        assert_eq!(super::format_float(0.0), "0");
4370        assert_eq!(super::format_float(-5.0), "-5");
4371    }
4372
4373    #[test]
4374    fn format_float_decimals() {
4375        assert_eq!(super::format_float(2.72), "2.72");
4376        assert_eq!(super::format_float(10.5), "10.5");
4377    }
4378
4379    // --- keys tests ---
4380
4381    #[test]
4382    fn keys_match_all() {
4383        let mut ks = Keyspace::new();
4384        ks.set("a".into(), Bytes::from("1"), None);
4385        ks.set("b".into(), Bytes::from("2"), None);
4386        ks.set("c".into(), Bytes::from("3"), None);
4387        let mut result = ks.keys("*");
4388        result.sort();
4389        assert_eq!(result, vec!["a", "b", "c"]);
4390    }
4391
4392    #[test]
4393    fn keys_with_pattern() {
4394        let mut ks = Keyspace::new();
4395        ks.set("user:1".into(), Bytes::from("a"), None);
4396        ks.set("user:2".into(), Bytes::from("b"), None);
4397        ks.set("item:1".into(), Bytes::from("c"), None);
4398        let mut result = ks.keys("user:*");
4399        result.sort();
4400        assert_eq!(result, vec!["user:1", "user:2"]);
4401    }
4402
4403    #[test]
4404    fn keys_skips_expired() {
4405        let mut ks = Keyspace::new();
4406        ks.set("live".into(), Bytes::from("a"), None);
4407        ks.set(
4408            "dead".into(),
4409            Bytes::from("b"),
4410            Some(Duration::from_millis(1)),
4411        );
4412        thread::sleep(Duration::from_millis(5));
4413        let result = ks.keys("*");
4414        assert_eq!(result, vec!["live"]);
4415    }
4416
4417    #[test]
4418    fn keys_empty_keyspace() {
4419        let ks = Keyspace::new();
4420        assert!(ks.keys("*").is_empty());
4421    }
4422
4423    // --- rename tests ---
4424
4425    #[test]
4426    fn rename_basic() {
4427        let mut ks = Keyspace::new();
4428        ks.set("old".into(), Bytes::from("value"), None);
4429        ks.rename("old", "new").unwrap();
4430        assert!(!ks.exists("old"));
4431        assert_eq!(
4432            ks.get("new").unwrap(),
4433            Some(Value::String(Bytes::from("value")))
4434        );
4435    }
4436
4437    #[test]
4438    fn rename_preserves_expiry() {
4439        let mut ks = Keyspace::new();
4440        ks.set(
4441            "old".into(),
4442            Bytes::from("val"),
4443            Some(Duration::from_secs(60)),
4444        );
4445        ks.rename("old", "new").unwrap();
4446        match ks.ttl("new") {
4447            TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
4448            other => panic!("expected TTL preserved, got {other:?}"),
4449        }
4450    }
4451
4452    #[test]
4453    fn rename_overwrites_destination() {
4454        let mut ks = Keyspace::new();
4455        ks.set("src".into(), Bytes::from("new_val"), None);
4456        ks.set("dst".into(), Bytes::from("old_val"), None);
4457        ks.rename("src", "dst").unwrap();
4458        assert!(!ks.exists("src"));
4459        assert_eq!(
4460            ks.get("dst").unwrap(),
4461            Some(Value::String(Bytes::from("new_val")))
4462        );
4463        assert_eq!(ks.len(), 1);
4464    }
4465
4466    #[test]
4467    fn rename_missing_key_returns_error() {
4468        let mut ks = Keyspace::new();
4469        let err = ks.rename("missing", "new").unwrap_err();
4470        assert_eq!(err, RenameError::NoSuchKey);
4471    }
4472
4473    #[test]
4474    fn rename_same_key() {
4475        let mut ks = Keyspace::new();
4476        ks.set("key".into(), Bytes::from("val"), None);
4477        // renaming to itself should succeed (Redis behavior)
4478        ks.rename("key", "key").unwrap();
4479        assert_eq!(
4480            ks.get("key").unwrap(),
4481            Some(Value::String(Bytes::from("val")))
4482        );
4483    }
4484
4485    #[test]
4486    fn rename_tracks_memory() {
4487        let mut ks = Keyspace::new();
4488        ks.set("old".into(), Bytes::from("value"), None);
4489        let before = ks.stats().used_bytes;
4490        ks.rename("old", "new").unwrap();
4491        let after = ks.stats().used_bytes;
4492        // same key length, so memory should be the same
4493        assert_eq!(before, after);
4494        assert_eq!(ks.stats().key_count, 1);
4495    }
4496
4497    #[test]
4498    fn count_keys_in_slot_empty() {
4499        let ks = Keyspace::new();
4500        assert_eq!(ks.count_keys_in_slot(0), 0);
4501    }
4502
4503    #[test]
4504    fn count_keys_in_slot_matches() {
4505        let mut ks = Keyspace::new();
4506        // insert a few keys and count those in a specific slot
4507        ks.set("a".into(), Bytes::from("1"), None);
4508        ks.set("b".into(), Bytes::from("2"), None);
4509        ks.set("c".into(), Bytes::from("3"), None);
4510
4511        let slot_a = ember_cluster::key_slot(b"a");
4512        let count = ks.count_keys_in_slot(slot_a);
4513        // at minimum, "a" should be in its own slot
4514        assert!(count >= 1);
4515    }
4516
4517    #[test]
4518    fn count_keys_in_slot_skips_expired() {
4519        let mut ks = Keyspace::new();
4520        let slot = ember_cluster::key_slot(b"temp");
4521        ks.set(
4522            "temp".into(),
4523            Bytes::from("gone"),
4524            Some(Duration::from_millis(0)),
4525        );
4526        // key is expired — should not be counted
4527        thread::sleep(Duration::from_millis(5));
4528        assert_eq!(ks.count_keys_in_slot(slot), 0);
4529    }
4530
4531    #[test]
4532    fn get_keys_in_slot_returns_matching() {
4533        let mut ks = Keyspace::new();
4534        ks.set("x".into(), Bytes::from("1"), None);
4535        ks.set("y".into(), Bytes::from("2"), None);
4536
4537        let slot_x = ember_cluster::key_slot(b"x");
4538        let keys = ks.get_keys_in_slot(slot_x, 100);
4539        assert!(keys.contains(&"x".to_string()));
4540    }
4541
4542    #[test]
4543    fn get_keys_in_slot_respects_count_limit() {
4544        let mut ks = Keyspace::new();
4545        // insert several keys — some might share a slot
4546        for i in 0..100 {
4547            ks.set(format!("key:{i}"), Bytes::from("v"), None);
4548        }
4549        // ask for at most 3 keys from slot 0
4550        let keys = ks.get_keys_in_slot(0, 3);
4551        assert!(keys.len() <= 3);
4552    }
4553}