Skip to main content

ember_core/
keyspace.rs

1//! The keyspace: Ember's core key-value store.
2//!
3//! A `Keyspace` owns a flat `HashMap<String, Entry>` and handles
4//! get, set, delete, existence checks, and TTL management. Expired
5//! keys are removed lazily on access. Memory usage is tracked on
6//! every mutation for eviction and stats reporting.
7
8use std::collections::{HashMap, VecDeque};
9use std::time::{Duration, Instant};
10
11use bytes::Bytes;
12use rand::seq::IteratorRandom;
13
14use crate::memory::{self, MemoryTracker};
15use crate::types::sorted_set::{SortedSet, ZAddFlags};
16use crate::types::{self, normalize_range, Value};
17
18/// Error returned when a command is used against a key holding the wrong type.
19#[derive(Debug, Clone, PartialEq, Eq)]
20pub struct WrongType;
21
22impl std::fmt::Display for WrongType {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        write!(
25            f,
26            "WRONGTYPE Operation against a key holding the wrong kind of value"
27        )
28    }
29}
30
31impl std::error::Error for WrongType {}
32
33/// Error returned by write operations that may fail due to type mismatch
34/// or memory limits.
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum WriteError {
37    /// The key holds a different type than expected.
38    WrongType,
39    /// Memory limit reached and eviction couldn't free enough space.
40    OutOfMemory,
41}
42
43impl From<WrongType> for WriteError {
44    fn from(_: WrongType) -> Self {
45        WriteError::WrongType
46    }
47}
48
49/// Errors that can occur during INCR/DECR operations.
50#[derive(Debug, Clone, PartialEq, Eq)]
51pub enum IncrError {
52    /// Key holds a non-string type.
53    WrongType,
54    /// Value is not a valid integer.
55    NotAnInteger,
56    /// Increment or decrement would overflow i64.
57    Overflow,
58    /// Memory limit reached.
59    OutOfMemory,
60}
61
62impl std::fmt::Display for IncrError {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        match self {
65            IncrError::WrongType => write!(
66                f,
67                "WRONGTYPE Operation against a key holding the wrong kind of value"
68            ),
69            IncrError::NotAnInteger => write!(f, "ERR value is not an integer or out of range"),
70            IncrError::Overflow => write!(f, "ERR increment or decrement would overflow"),
71            IncrError::OutOfMemory => {
72                write!(f, "OOM command not allowed when used memory > 'maxmemory'")
73            }
74        }
75    }
76}
77
78impl std::error::Error for IncrError {}
79/// Result of a ZADD operation, containing both the client-facing count
80/// and the list of members that were actually applied (for AOF correctness).
81#[derive(Debug, Clone)]
82pub struct ZAddResult {
83    /// Number of members added (or added+updated if CH flag was set).
84    pub count: usize,
85    /// Members that were actually inserted or had their score updated.
86    /// Only these should be persisted to the AOF.
87    pub applied: Vec<(f64, String)>,
88}
89
90/// How the keyspace should handle writes when the memory limit is reached.
91#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
92pub enum EvictionPolicy {
93    /// Return an error on writes when memory is full.
94    #[default]
95    NoEviction,
96    /// Evict the least-recently-used key (approximated via random sampling).
97    AllKeysLru,
98}
99
100/// Configuration for a single keyspace / shard.
101#[derive(Debug, Clone)]
102pub struct ShardConfig {
103    /// Maximum memory in bytes. `None` means unlimited.
104    pub max_memory: Option<usize>,
105    /// What to do when memory is full.
106    pub eviction_policy: EvictionPolicy,
107    /// Numeric identifier for this shard (used for persistence file naming).
108    pub shard_id: u16,
109}
110
111impl Default for ShardConfig {
112    fn default() -> Self {
113        Self {
114            max_memory: None,
115            eviction_policy: EvictionPolicy::NoEviction,
116            shard_id: 0,
117        }
118    }
119}
120
121/// Result of a set operation that may fail under memory pressure.
122#[derive(Debug, PartialEq, Eq)]
123pub enum SetResult {
124    /// The key was stored successfully.
125    Ok,
126    /// Memory limit reached and eviction policy is NoEviction.
127    OutOfMemory,
128}
129
130/// A single entry in the keyspace: a value plus optional expiration
131/// and last access time for LRU approximation.
132#[derive(Debug, Clone)]
133pub(crate) struct Entry {
134    pub(crate) value: Value,
135    pub(crate) expires_at: Option<Instant>,
136    pub(crate) last_access: Instant,
137}
138
139impl Entry {
140    fn new(value: Value, expires_at: Option<Instant>) -> Self {
141        Self {
142            value,
143            expires_at,
144            last_access: Instant::now(),
145        }
146    }
147
148    /// Returns `true` if this entry has passed its expiration time.
149    fn is_expired(&self) -> bool {
150        match self.expires_at {
151            Some(deadline) => Instant::now() >= deadline,
152            None => false,
153        }
154    }
155
156    /// Marks this entry as accessed right now.
157    fn touch(&mut self) {
158        self.last_access = Instant::now();
159    }
160}
161
162/// Result of a TTL query, matching Redis semantics.
163#[derive(Debug, Clone, PartialEq, Eq)]
164pub enum TtlResult {
165    /// Key exists and has a TTL. Returns remaining seconds.
166    Seconds(u64),
167    /// Key exists and has a TTL. Returns remaining milliseconds.
168    Milliseconds(u64),
169    /// Key exists but has no expiration set.
170    NoExpiry,
171    /// Key does not exist.
172    NotFound,
173}
174
175/// Aggregated statistics for a keyspace.
176#[derive(Debug, Clone, PartialEq, Eq)]
177pub struct KeyspaceStats {
178    /// Number of live keys.
179    pub key_count: usize,
180    /// Estimated memory usage in bytes.
181    pub used_bytes: usize,
182    /// Number of keys with an expiration set.
183    pub keys_with_expiry: usize,
184}
185
186/// Number of random keys to sample when looking for an eviction candidate.
187///
188/// Eviction uses sampling-based approximate LRU — we randomly select this many
189/// keys and evict the least-recently-accessed among them. This trades perfect
190/// LRU accuracy for O(1) eviction (no sorted structure to maintain).
191///
192/// Larger sample sizes give better LRU approximation but cost more per eviction.
193/// 16 is a reasonable balance — similar to Redis's default sample size. With
194/// 16 samples, we statistically find a good eviction candidate while keeping
195/// eviction overhead low even at millions of keys.
196const EVICTION_SAMPLE_SIZE: usize = 16;
197
198/// The core key-value store.
199///
200/// All operations are single-threaded per shard — no internal locking.
201/// Memory usage is tracked incrementally on every mutation.
202pub struct Keyspace {
203    entries: HashMap<String, Entry>,
204    memory: MemoryTracker,
205    config: ShardConfig,
206    /// Number of entries that currently have an expiration set.
207    expiry_count: usize,
208}
209
210impl Keyspace {
211    /// Creates a new, empty keyspace with default config (no memory limit).
212    pub fn new() -> Self {
213        Self::with_config(ShardConfig::default())
214    }
215
216    /// Creates a new, empty keyspace with the given config.
217    pub fn with_config(config: ShardConfig) -> Self {
218        Self {
219            entries: HashMap::new(),
220            memory: MemoryTracker::new(),
221            config,
222            expiry_count: 0,
223        }
224    }
225
226    /// Retrieves the string value for `key`, or `None` if missing/expired.
227    ///
228    /// Returns `Err(WrongType)` if the key holds a non-string value.
229    /// Expired keys are removed lazily on access. Successful reads update
230    /// the entry's last access time for LRU tracking.
231    pub fn get(&mut self, key: &str) -> Result<Option<Value>, WrongType> {
232        if self.remove_if_expired(key) {
233            return Ok(None);
234        }
235        match self.entries.get_mut(key) {
236            Some(e) => match &e.value {
237                Value::String(_) => {
238                    e.touch();
239                    Ok(Some(e.value.clone()))
240                }
241                _ => Err(WrongType),
242            },
243            None => Ok(None),
244        }
245    }
246
247    /// Returns the type name of the value at `key`, or "none" if missing.
248    pub fn value_type(&mut self, key: &str) -> &'static str {
249        if self.remove_if_expired(key) {
250            return "none";
251        }
252        match self.entries.get(key) {
253            Some(e) => types::type_name(&e.value),
254            None => "none",
255        }
256    }
257
258    /// Stores a key-value pair. If the key already existed, the old entry
259    /// (including any TTL) is replaced entirely.
260    ///
261    /// `expire` sets an optional TTL as a duration from now.
262    ///
263    /// Returns `SetResult::OutOfMemory` if the memory limit is reached
264    /// and the eviction policy is `NoEviction`. With `AllKeysLru`, this
265    /// will evict keys to make room before inserting.
266    pub fn set(&mut self, key: String, value: Bytes, expire: Option<Duration>) -> SetResult {
267        let expires_at = expire.map(|d| Instant::now() + d);
268        let new_value = Value::String(value);
269
270        // check memory limit — for overwrites, only the net increase matters
271        let new_size = memory::entry_size(&key, &new_value);
272        let old_size = self
273            .entries
274            .get(&key)
275            .map(|e| memory::entry_size(&key, &e.value))
276            .unwrap_or(0);
277        let net_increase = new_size.saturating_sub(old_size);
278
279        if !self.enforce_memory_limit(net_increase) {
280            return SetResult::OutOfMemory;
281        }
282
283        if let Some(old_entry) = self.entries.get(&key) {
284            self.memory.replace(&key, &old_entry.value, &new_value);
285            // adjust expiry count if the TTL status changed
286            let had_expiry = old_entry.expires_at.is_some();
287            let has_expiry = expires_at.is_some();
288            match (had_expiry, has_expiry) {
289                (false, true) => self.expiry_count += 1,
290                (true, false) => self.expiry_count = self.expiry_count.saturating_sub(1),
291                _ => {}
292            }
293        } else {
294            self.memory.add(&key, &new_value);
295            if expires_at.is_some() {
296                self.expiry_count += 1;
297            }
298        }
299
300        self.entries.insert(key, Entry::new(new_value, expires_at));
301        SetResult::Ok
302    }
303
304    /// Tries to evict one key using LRU approximation.
305    ///
306    /// Randomly samples `EVICTION_SAMPLE_SIZE` keys and removes the one
307    /// with the oldest `last_access` time. Returns `true` if a key was
308    /// evicted, `false` if the keyspace is empty.
309    fn try_evict(&mut self) -> bool {
310        if self.entries.is_empty() {
311            return false;
312        }
313
314        let mut rng = rand::rng();
315
316        // randomly sample keys and find the least recently accessed one
317        let victim = self
318            .entries
319            .iter()
320            .choose_multiple(&mut rng, EVICTION_SAMPLE_SIZE)
321            .into_iter()
322            .min_by_key(|(_, entry)| entry.last_access)
323            .map(|(k, _)| k.clone());
324
325        if let Some(key) = victim {
326            if let Some(entry) = self.entries.remove(&key) {
327                self.memory.remove(&key, &entry.value);
328                if entry.expires_at.is_some() {
329                    self.expiry_count = self.expiry_count.saturating_sub(1);
330                }
331                return true;
332            }
333        }
334        false
335    }
336
337    /// Checks whether the memory limit allows a write that would increase
338    /// usage by `estimated_increase` bytes. Attempts eviction if the
339    /// policy allows it. Returns `true` if the write can proceed.
340    fn enforce_memory_limit(&mut self, estimated_increase: usize) -> bool {
341        if let Some(max) = self.config.max_memory {
342            while self.memory.used_bytes() + estimated_increase > max {
343                match self.config.eviction_policy {
344                    EvictionPolicy::NoEviction => return false,
345                    EvictionPolicy::AllKeysLru => {
346                        if !self.try_evict() {
347                            return false;
348                        }
349                    }
350                }
351            }
352        }
353        true
354    }
355
356    /// Removes a key. Returns `true` if the key existed (and wasn't expired).
357    pub fn del(&mut self, key: &str) -> bool {
358        if self.remove_if_expired(key) {
359            return false;
360        }
361        if let Some(entry) = self.entries.remove(key) {
362            self.memory.remove(key, &entry.value);
363            if entry.expires_at.is_some() {
364                self.expiry_count = self.expiry_count.saturating_sub(1);
365            }
366            true
367        } else {
368            false
369        }
370    }
371
372    /// Returns `true` if the key exists and hasn't expired.
373    pub fn exists(&mut self, key: &str) -> bool {
374        if self.remove_if_expired(key) {
375            return false;
376        }
377        self.entries.contains_key(key)
378    }
379
380    /// Sets an expiration on an existing key. Returns `true` if the key
381    /// exists (and the TTL was set), `false` if the key doesn't exist.
382    pub fn expire(&mut self, key: &str, seconds: u64) -> bool {
383        if self.remove_if_expired(key) {
384            return false;
385        }
386        match self.entries.get_mut(key) {
387            Some(entry) => {
388                if entry.expires_at.is_none() {
389                    self.expiry_count += 1;
390                }
391                entry.expires_at = Some(Instant::now() + Duration::from_secs(seconds));
392                true
393            }
394            None => false,
395        }
396    }
397
398    /// Returns the TTL status for a key, following Redis semantics:
399    /// - `Seconds(n)` if the key has a TTL
400    /// - `NoExpiry` if the key exists without a TTL
401    /// - `NotFound` if the key doesn't exist
402    pub fn ttl(&mut self, key: &str) -> TtlResult {
403        if self.remove_if_expired(key) {
404            return TtlResult::NotFound;
405        }
406        match self.entries.get(key) {
407            Some(entry) => match entry.expires_at {
408                Some(deadline) => {
409                    let remaining = deadline.saturating_duration_since(Instant::now());
410                    TtlResult::Seconds(remaining.as_secs())
411                }
412                None => TtlResult::NoExpiry,
413            },
414            None => TtlResult::NotFound,
415        }
416    }
417
418    /// Removes the expiration from a key.
419    ///
420    /// Returns `true` if the key existed and had a timeout that was removed.
421    /// Returns `false` if the key doesn't exist or has no expiration.
422    pub fn persist(&mut self, key: &str) -> bool {
423        if self.remove_if_expired(key) {
424            return false;
425        }
426        match self.entries.get_mut(key) {
427            Some(entry) => {
428                if entry.expires_at.is_some() {
429                    entry.expires_at = None;
430                    self.expiry_count = self.expiry_count.saturating_sub(1);
431                    true
432                } else {
433                    false
434                }
435            }
436            None => false,
437        }
438    }
439
440    /// Returns the TTL status for a key in milliseconds, following Redis semantics:
441    /// - `Milliseconds(n)` if the key has a TTL
442    /// - `NoExpiry` if the key exists without a TTL
443    /// - `NotFound` if the key doesn't exist
444    pub fn pttl(&mut self, key: &str) -> TtlResult {
445        if self.remove_if_expired(key) {
446            return TtlResult::NotFound;
447        }
448        match self.entries.get(key) {
449            Some(entry) => match entry.expires_at {
450                Some(deadline) => {
451                    let remaining = deadline.saturating_duration_since(Instant::now());
452                    TtlResult::Milliseconds(remaining.as_millis() as u64)
453                }
454                None => TtlResult::NoExpiry,
455            },
456            None => TtlResult::NotFound,
457        }
458    }
459
460    /// Sets an expiration on an existing key in milliseconds.
461    ///
462    /// Returns `true` if the key exists (and the TTL was set),
463    /// `false` if the key doesn't exist.
464    pub fn pexpire(&mut self, key: &str, millis: u64) -> bool {
465        if self.remove_if_expired(key) {
466            return false;
467        }
468        match self.entries.get_mut(key) {
469            Some(entry) => {
470                if entry.expires_at.is_none() {
471                    self.expiry_count += 1;
472                }
473                entry.expires_at = Some(Instant::now() + Duration::from_millis(millis));
474                true
475            }
476            None => false,
477        }
478    }
479
480    /// Increments the integer value of a key by 1.
481    ///
482    /// If the key doesn't exist, it's initialized to 0 before incrementing.
483    /// Returns the new value after the operation.
484    pub fn incr(&mut self, key: &str) -> Result<i64, IncrError> {
485        self.incr_by(key, 1)
486    }
487
488    /// Decrements the integer value of a key by 1.
489    ///
490    /// If the key doesn't exist, it's initialized to 0 before decrementing.
491    /// Returns the new value after the operation.
492    pub fn decr(&mut self, key: &str) -> Result<i64, IncrError> {
493        self.incr_by(key, -1)
494    }
495
496    /// Shared implementation for INCR/DECR. Adds `delta` to the current
497    /// integer value of the key, creating it if necessary.
498    ///
499    /// Preserves the existing TTL when updating an existing key.
500    fn incr_by(&mut self, key: &str, delta: i64) -> Result<i64, IncrError> {
501        self.remove_if_expired(key);
502
503        // read current value and TTL
504        let (current, existing_expire) = match self.entries.get(key) {
505            Some(entry) => {
506                let val = match &entry.value {
507                    Value::String(data) => {
508                        let s = std::str::from_utf8(data).map_err(|_| IncrError::NotAnInteger)?;
509                        s.parse::<i64>().map_err(|_| IncrError::NotAnInteger)?
510                    }
511                    _ => return Err(IncrError::WrongType),
512                };
513                let expire = entry
514                    .expires_at
515                    .map(|deadline| deadline.saturating_duration_since(Instant::now()));
516                (val, expire)
517            }
518            None => (0, None),
519        };
520
521        let new_val = current.checked_add(delta).ok_or(IncrError::Overflow)?;
522        let new_bytes = Bytes::from(new_val.to_string());
523
524        match self.set(key.to_owned(), new_bytes, existing_expire) {
525            SetResult::Ok => Ok(new_val),
526            SetResult::OutOfMemory => Err(IncrError::OutOfMemory),
527        }
528    }
529
530    /// Returns aggregated stats for this keyspace.
531    ///
532    /// All fields are tracked incrementally — this is O(1).
533    pub fn stats(&self) -> KeyspaceStats {
534        KeyspaceStats {
535            key_count: self.memory.key_count(),
536            used_bytes: self.memory.used_bytes(),
537            keys_with_expiry: self.expiry_count,
538        }
539    }
540
541    /// Returns the number of live keys.
542    pub fn len(&self) -> usize {
543        self.entries.len()
544    }
545
546    /// Removes all keys from the keyspace.
547    pub fn clear(&mut self) {
548        self.entries.clear();
549        self.memory.reset();
550        self.expiry_count = 0;
551    }
552
553    /// Returns `true` if the keyspace has no entries.
554    pub fn is_empty(&self) -> bool {
555        self.entries.is_empty()
556    }
557
558    /// Scans keys starting from a cursor position.
559    ///
560    /// Returns the next cursor (0 if scan complete) and a batch of keys.
561    /// The `pattern` argument supports glob-style matching (*, ?, [abc]).
562    pub fn scan_keys(
563        &self,
564        cursor: u64,
565        count: usize,
566        pattern: Option<&str>,
567    ) -> (u64, Vec<String>) {
568        let mut keys = Vec::with_capacity(count);
569        let mut position = 0u64;
570        let target_count = if count == 0 { 10 } else { count };
571
572        for (key, entry) in self.entries.iter() {
573            // skip expired entries
574            if entry.is_expired() {
575                continue;
576            }
577
578            // skip entries before cursor
579            if position < cursor {
580                position += 1;
581                continue;
582            }
583
584            // pattern matching
585            if let Some(pat) = pattern {
586                if !glob_match(pat, key) {
587                    position += 1;
588                    continue;
589                }
590            }
591
592            keys.push(key.clone());
593            position += 1;
594
595            if keys.len() >= target_count {
596                // return position as next cursor
597                return (position, keys);
598            }
599        }
600
601        // scan complete
602        (0, keys)
603    }
604
605    /// Iterates over all live (non-expired) entries, yielding the key, a
606    /// clone of the value, and the remaining TTL in milliseconds (-1 for
607    /// entries with no expiration). Used by snapshot and AOF rewrite.
608    pub fn iter_entries(&self) -> impl Iterator<Item = (&str, &Value, i64)> {
609        let now = Instant::now();
610        self.entries.iter().filter_map(move |(key, entry)| {
611            if entry.is_expired() {
612                return None;
613            }
614            let ttl_ms = match entry.expires_at {
615                Some(deadline) => {
616                    let remaining = deadline.saturating_duration_since(now);
617                    remaining.as_millis() as i64
618                }
619                None => -1,
620            };
621            Some((key.as_str(), &entry.value, ttl_ms))
622        })
623    }
624
625    /// Restores an entry during recovery, bypassing memory limits.
626    ///
627    /// If `expires_at` is in the past, the entry is silently skipped.
628    /// This is used only during shard startup when loading from
629    /// snapshot/AOF — normal writes should go through `set()`.
630    pub fn restore(&mut self, key: String, value: Value, expires_at: Option<Instant>) {
631        // skip entries that already expired
632        if let Some(deadline) = expires_at {
633            if Instant::now() >= deadline {
634                return;
635            }
636        }
637
638        // if replacing an existing entry, adjust memory tracking
639        if let Some(old) = self.entries.get(&key) {
640            self.memory.replace(&key, &old.value, &value);
641            let had_expiry = old.expires_at.is_some();
642            let has_expiry = expires_at.is_some();
643            match (had_expiry, has_expiry) {
644                (false, true) => self.expiry_count += 1,
645                (true, false) => self.expiry_count = self.expiry_count.saturating_sub(1),
646                _ => {}
647            }
648        } else {
649            self.memory.add(&key, &value);
650            if expires_at.is_some() {
651                self.expiry_count += 1;
652            }
653        }
654
655        self.entries.insert(key, Entry::new(value, expires_at));
656    }
657
658    // -- list operations --
659
660    /// Pushes one or more values to the head (left) of a list.
661    ///
662    /// Creates the list if the key doesn't exist. Returns `Err(WriteError::WrongType)`
663    /// if the key exists but holds a non-list value, or
664    /// `Err(WriteError::OutOfMemory)` if the memory limit is reached.
665    /// Returns the new length on success.
666    pub fn lpush(&mut self, key: &str, values: &[Bytes]) -> Result<usize, WriteError> {
667        self.list_push(key, values, true)
668    }
669
670    /// Pushes one or more values to the tail (right) of a list.
671    ///
672    /// Creates the list if the key doesn't exist. Returns `Err(WriteError::WrongType)`
673    /// if the key exists but holds a non-list value, or
674    /// `Err(WriteError::OutOfMemory)` if the memory limit is reached.
675    /// Returns the new length on success.
676    pub fn rpush(&mut self, key: &str, values: &[Bytes]) -> Result<usize, WriteError> {
677        self.list_push(key, values, false)
678    }
679
680    /// Pops a value from the head (left) of a list.
681    ///
682    /// Returns `Ok(None)` if the key doesn't exist. Removes the key if
683    /// the list becomes empty. Returns `Err(WrongType)` on type mismatch.
684    pub fn lpop(&mut self, key: &str) -> Result<Option<Bytes>, WrongType> {
685        self.list_pop(key, true)
686    }
687
688    /// Pops a value from the tail (right) of a list.
689    ///
690    /// Returns `Ok(None)` if the key doesn't exist. Removes the key if
691    /// the list becomes empty. Returns `Err(WrongType)` on type mismatch.
692    pub fn rpop(&mut self, key: &str) -> Result<Option<Bytes>, WrongType> {
693        self.list_pop(key, false)
694    }
695
696    /// Returns a range of elements from a list by index.
697    ///
698    /// Supports negative indices (e.g. -1 = last element). Out-of-bounds
699    /// indices are clamped to the list boundaries. Returns `Err(WrongType)`
700    /// on type mismatch. Missing keys return an empty vec.
701    pub fn lrange(&mut self, key: &str, start: i64, stop: i64) -> Result<Vec<Bytes>, WrongType> {
702        if self.remove_if_expired(key) {
703            return Ok(vec![]);
704        }
705        match self.entries.get_mut(key) {
706            None => Ok(vec![]),
707            Some(entry) => {
708                let result = match &entry.value {
709                    Value::List(deque) => {
710                        let len = deque.len() as i64;
711                        let (s, e) = normalize_range(start, stop, len);
712                        if s > e {
713                            return Ok(vec![]);
714                        }
715                        Ok(deque
716                            .iter()
717                            .skip(s as usize)
718                            .take((e - s + 1) as usize)
719                            .cloned()
720                            .collect())
721                    }
722                    _ => Err(WrongType),
723                };
724                if result.is_ok() {
725                    entry.touch();
726                }
727                result
728            }
729        }
730    }
731
732    /// Returns the length of a list, or 0 if the key doesn't exist.
733    ///
734    /// Returns `Err(WrongType)` on type mismatch.
735    pub fn llen(&mut self, key: &str) -> Result<usize, WrongType> {
736        if self.remove_if_expired(key) {
737            return Ok(0);
738        }
739        match self.entries.get(key) {
740            None => Ok(0),
741            Some(entry) => match &entry.value {
742                Value::List(deque) => Ok(deque.len()),
743                _ => Err(WrongType),
744            },
745        }
746    }
747
748    /// Internal push implementation shared by lpush/rpush.
749    fn list_push(&mut self, key: &str, values: &[Bytes], left: bool) -> Result<usize, WriteError> {
750        self.remove_if_expired(key);
751
752        let is_new = !self.entries.contains_key(key);
753
754        if !is_new && !matches!(self.entries[key].value, Value::List(_)) {
755            return Err(WriteError::WrongType);
756        }
757
758        // estimate the memory increase and enforce the limit before mutating
759        let element_increase: usize = values
760            .iter()
761            .map(|v| memory::VECDEQUE_ELEMENT_OVERHEAD + v.len())
762            .sum();
763        let estimated_increase = if is_new {
764            memory::ENTRY_OVERHEAD + key.len() + memory::VECDEQUE_BASE_OVERHEAD + element_increase
765        } else {
766            element_increase
767        };
768        if !self.enforce_memory_limit(estimated_increase) {
769            return Err(WriteError::OutOfMemory);
770        }
771
772        if is_new {
773            let value = Value::List(VecDeque::new());
774            self.memory.add(key, &value);
775            self.entries.insert(key.to_owned(), Entry::new(value, None));
776        }
777
778        let entry = self
779            .entries
780            .get_mut(key)
781            .expect("just inserted or verified");
782        let old_entry_size = memory::entry_size(key, &entry.value);
783
784        if let Value::List(ref mut deque) = entry.value {
785            for val in values {
786                if left {
787                    deque.push_front(val.clone());
788                } else {
789                    deque.push_back(val.clone());
790                }
791            }
792        }
793        entry.touch();
794
795        let new_entry_size = memory::entry_size(key, &entry.value);
796        self.memory.adjust(old_entry_size, new_entry_size);
797
798        let len = match &entry.value {
799            Value::List(d) => d.len(),
800            _ => unreachable!(),
801        };
802        Ok(len)
803    }
804
805    /// Internal pop implementation shared by lpop/rpop.
806    fn list_pop(&mut self, key: &str, left: bool) -> Result<Option<Bytes>, WrongType> {
807        if self.remove_if_expired(key) {
808            return Ok(None);
809        }
810
811        match self.entries.get(key) {
812            None => return Ok(None),
813            Some(e) => {
814                if !matches!(e.value, Value::List(_)) {
815                    return Err(WrongType);
816                }
817            }
818        };
819
820        let old_entry_size = memory::entry_size(key, &self.entries[key].value);
821        let entry = self.entries.get_mut(key).expect("verified above");
822        let popped = if let Value::List(ref mut deque) = entry.value {
823            if left {
824                deque.pop_front()
825            } else {
826                deque.pop_back()
827            }
828        } else {
829            unreachable!()
830        };
831        entry.touch();
832
833        // check if list is now empty — if so, delete the key entirely
834        let is_empty = matches!(&entry.value, Value::List(d) if d.is_empty());
835        if is_empty {
836            let removed = self.entries.remove(key).expect("verified above");
837            // use remove_with_size since the value was already mutated
838            self.memory.remove_with_size(old_entry_size);
839            if removed.expires_at.is_some() {
840                self.expiry_count = self.expiry_count.saturating_sub(1);
841            }
842        } else {
843            let new_entry_size = memory::entry_size(key, &self.entries[key].value);
844            self.memory.adjust(old_entry_size, new_entry_size);
845        }
846
847        Ok(popped)
848    }
849
850    // -- sorted set operations --
851
852    /// Adds members with scores to a sorted set, with optional ZADD flags.
853    ///
854    /// Creates the sorted set if the key doesn't exist. Returns a
855    /// `ZAddResult` containing the count for the client response and the
856    /// list of members that were actually applied (for AOF correctness).
857    /// Returns `Err(WriteError::WrongType)` on type mismatch, or
858    /// `Err(WriteError::OutOfMemory)` if the memory limit is reached.
859    pub fn zadd(
860        &mut self,
861        key: &str,
862        members: &[(f64, String)],
863        flags: &ZAddFlags,
864    ) -> Result<ZAddResult, WriteError> {
865        self.remove_if_expired(key);
866
867        let is_new = !self.entries.contains_key(key);
868        if !is_new && !matches!(self.entries[key].value, Value::SortedSet(_)) {
869            return Err(WriteError::WrongType);
870        }
871
872        // worst-case estimate: assume all members are new
873        let member_increase: usize = members
874            .iter()
875            .map(|(_, m)| SortedSet::estimated_member_cost(m))
876            .sum();
877        let estimated_increase = if is_new {
878            memory::ENTRY_OVERHEAD + key.len() + SortedSet::BASE_OVERHEAD + member_increase
879        } else {
880            member_increase
881        };
882        if !self.enforce_memory_limit(estimated_increase) {
883            return Err(WriteError::OutOfMemory);
884        }
885
886        if is_new {
887            let value = Value::SortedSet(SortedSet::new());
888            self.memory.add(key, &value);
889            self.entries.insert(key.to_owned(), Entry::new(value, None));
890        }
891
892        let entry = self
893            .entries
894            .get_mut(key)
895            .expect("just inserted or verified");
896        let old_entry_size = memory::entry_size(key, &entry.value);
897
898        let mut count = 0;
899        let mut applied = Vec::new();
900        if let Value::SortedSet(ref mut ss) = entry.value {
901            for (score, member) in members {
902                let result = ss.add_with_flags(member.clone(), *score, flags);
903                if result.added || result.updated {
904                    applied.push((*score, member.clone()));
905                }
906                if flags.ch {
907                    if result.added || result.updated {
908                        count += 1;
909                    }
910                } else if result.added {
911                    count += 1;
912                }
913            }
914        }
915        entry.touch();
916
917        let new_entry_size = memory::entry_size(key, &entry.value);
918        self.memory.adjust(old_entry_size, new_entry_size);
919
920        // clean up if the set is still empty (e.g. XX on a new key)
921        if let Value::SortedSet(ref ss) = entry.value {
922            if ss.is_empty() {
923                self.memory
924                    .remove_with_size(memory::entry_size(key, &entry.value));
925                self.entries.remove(key);
926            }
927        }
928
929        Ok(ZAddResult { count, applied })
930    }
931
932    /// Removes members from a sorted set. Returns the names of members
933    /// that were actually removed (for AOF correctness). Deletes the key
934    /// if the set becomes empty.
935    ///
936    /// Returns `Err(WrongType)` if the key holds a non-sorted-set value.
937    pub fn zrem(&mut self, key: &str, members: &[String]) -> Result<Vec<String>, WrongType> {
938        if self.remove_if_expired(key) {
939            return Ok(vec![]);
940        }
941
942        match self.entries.get(key) {
943            None => return Ok(vec![]),
944            Some(e) => {
945                if !matches!(e.value, Value::SortedSet(_)) {
946                    return Err(WrongType);
947                }
948            }
949        }
950
951        let old_entry_size = memory::entry_size(key, &self.entries[key].value);
952        let entry = self.entries.get_mut(key).expect("verified above");
953        let mut removed = Vec::new();
954        if let Value::SortedSet(ref mut ss) = entry.value {
955            for member in members {
956                if ss.remove(member) {
957                    removed.push(member.clone());
958                }
959            }
960        }
961        entry.touch();
962
963        let is_empty = matches!(&entry.value, Value::SortedSet(ss) if ss.is_empty());
964        if is_empty {
965            let removed_entry = self.entries.remove(key).expect("verified above");
966            self.memory.remove_with_size(old_entry_size);
967            if removed_entry.expires_at.is_some() {
968                self.expiry_count = self.expiry_count.saturating_sub(1);
969            }
970        } else {
971            let new_entry_size = memory::entry_size(key, &self.entries[key].value);
972            self.memory.adjust(old_entry_size, new_entry_size);
973        }
974
975        Ok(removed)
976    }
977
978    /// Returns the score for a member in a sorted set.
979    ///
980    /// Returns `Ok(None)` if the key or member doesn't exist.
981    /// Returns `Err(WrongType)` on type mismatch.
982    pub fn zscore(&mut self, key: &str, member: &str) -> Result<Option<f64>, WrongType> {
983        if self.remove_if_expired(key) {
984            return Ok(None);
985        }
986        match self.entries.get_mut(key) {
987            None => Ok(None),
988            Some(entry) => match &entry.value {
989                Value::SortedSet(ss) => {
990                    let score = ss.score(member);
991                    entry.touch();
992                    Ok(score)
993                }
994                _ => Err(WrongType),
995            },
996        }
997    }
998
999    /// Returns the 0-based rank of a member in a sorted set (lowest score = 0).
1000    ///
1001    /// Returns `Ok(None)` if the key or member doesn't exist.
1002    /// Returns `Err(WrongType)` on type mismatch.
1003    pub fn zrank(&mut self, key: &str, member: &str) -> Result<Option<usize>, WrongType> {
1004        if self.remove_if_expired(key) {
1005            return Ok(None);
1006        }
1007        match self.entries.get_mut(key) {
1008            None => Ok(None),
1009            Some(entry) => match &entry.value {
1010                Value::SortedSet(ss) => {
1011                    let rank = ss.rank(member);
1012                    entry.touch();
1013                    Ok(rank)
1014                }
1015                _ => Err(WrongType),
1016            },
1017        }
1018    }
1019
1020    /// Returns a range of members from a sorted set by rank.
1021    ///
1022    /// Supports negative indices. If `with_scores` is true, the result
1023    /// includes `(member, score)` pairs; otherwise just members.
1024    /// Returns `Err(WrongType)` on type mismatch.
1025    pub fn zrange(
1026        &mut self,
1027        key: &str,
1028        start: i64,
1029        stop: i64,
1030    ) -> Result<Vec<(String, f64)>, WrongType> {
1031        if self.remove_if_expired(key) {
1032            return Ok(vec![]);
1033        }
1034        match self.entries.get_mut(key) {
1035            None => Ok(vec![]),
1036            Some(entry) => {
1037                let result = match &entry.value {
1038                    Value::SortedSet(ss) => {
1039                        let items = ss.range_by_rank(start, stop);
1040                        Ok(items.into_iter().map(|(m, s)| (m.to_owned(), s)).collect())
1041                    }
1042                    _ => Err(WrongType),
1043                };
1044                if result.is_ok() {
1045                    entry.touch();
1046                }
1047                result
1048            }
1049        }
1050    }
1051
1052    /// Returns the number of members in a sorted set, or 0 if the key doesn't exist.
1053    ///
1054    /// Returns `Err(WrongType)` on type mismatch.
1055    pub fn zcard(&mut self, key: &str) -> Result<usize, WrongType> {
1056        if self.remove_if_expired(key) {
1057            return Ok(0);
1058        }
1059        match self.entries.get(key) {
1060            None => Ok(0),
1061            Some(entry) => match &entry.value {
1062                Value::SortedSet(ss) => Ok(ss.len()),
1063                _ => Err(WrongType),
1064            },
1065        }
1066    }
1067
1068    /// Randomly samples up to `count` keys and removes any that have expired.
1069    ///
1070    /// Returns the number of keys actually removed. Used by the active
1071    /// expiration cycle to clean up keys that no one is reading.
1072    pub fn expire_sample(&mut self, count: usize) -> usize {
1073        if self.entries.is_empty() {
1074            return 0;
1075        }
1076
1077        let mut rng = rand::rng();
1078
1079        let keys_to_check: Vec<String> = self
1080            .entries
1081            .keys()
1082            .choose_multiple(&mut rng, count)
1083            .into_iter()
1084            .cloned()
1085            .collect();
1086
1087        let mut removed = 0;
1088        for key in &keys_to_check {
1089            if self.remove_if_expired(key) {
1090                removed += 1;
1091            }
1092        }
1093        removed
1094    }
1095
1096    /// Checks if a key is expired and removes it if so. Returns `true`
1097    /// if the key was removed (or didn't exist).
1098    fn remove_if_expired(&mut self, key: &str) -> bool {
1099        let expired = self
1100            .entries
1101            .get(key)
1102            .map(|e| e.is_expired())
1103            .unwrap_or(false);
1104
1105        if expired {
1106            if let Some(entry) = self.entries.remove(key) {
1107                self.memory.remove(key, &entry.value);
1108                if entry.expires_at.is_some() {
1109                    self.expiry_count = self.expiry_count.saturating_sub(1);
1110                }
1111            }
1112        }
1113        expired
1114    }
1115}
1116
1117impl Default for Keyspace {
1118    fn default() -> Self {
1119        Self::new()
1120    }
1121}
1122
1123/// Glob-style pattern matching for SCAN's MATCH option.
1124///
1125/// Supports:
1126/// - `*` matches any sequence of characters (including empty)
1127/// - `?` matches exactly one character
1128/// - `[abc]` matches one character from the set
1129/// - `[^abc]` or `[!abc]` matches one character NOT in the set
1130///
1131/// Uses an iterative two-pointer algorithm with backtracking for O(n*m)
1132/// worst-case performance, where n is pattern length and m is text length.
1133fn glob_match(pattern: &str, text: &str) -> bool {
1134    let pat: Vec<char> = pattern.chars().collect();
1135    let txt: Vec<char> = text.chars().collect();
1136
1137    let mut pi = 0; // pattern index
1138    let mut ti = 0; // text index
1139
1140    // backtracking state for the most recent '*'
1141    let mut star_pi: Option<usize> = None;
1142    let mut star_ti: usize = 0;
1143
1144    while ti < txt.len() || pi < pat.len() {
1145        if pi < pat.len() {
1146            match pat[pi] {
1147                '*' => {
1148                    // record star position and try matching zero chars first
1149                    star_pi = Some(pi);
1150                    star_ti = ti;
1151                    pi += 1;
1152                    continue;
1153                }
1154                '?' if ti < txt.len() => {
1155                    pi += 1;
1156                    ti += 1;
1157                    continue;
1158                }
1159                '[' if ti < txt.len() => {
1160                    // parse character class
1161                    let tc = txt[ti];
1162                    let mut j = pi + 1;
1163                    let mut negated = false;
1164                    let mut matched = false;
1165
1166                    if j < pat.len() && (pat[j] == '^' || pat[j] == '!') {
1167                        negated = true;
1168                        j += 1;
1169                    }
1170
1171                    while j < pat.len() && pat[j] != ']' {
1172                        if pat[j] == tc {
1173                            matched = true;
1174                        }
1175                        j += 1;
1176                    }
1177
1178                    if negated {
1179                        matched = !matched;
1180                    }
1181
1182                    if matched && j < pat.len() {
1183                        pi = j + 1; // skip past ']'
1184                        ti += 1;
1185                        continue;
1186                    }
1187                    // fall through to backtrack
1188                }
1189                c if ti < txt.len() && c == txt[ti] => {
1190                    pi += 1;
1191                    ti += 1;
1192                    continue;
1193                }
1194                _ => {}
1195            }
1196        }
1197
1198        // mismatch or end of pattern — try backtracking to last '*'
1199        if let Some(sp) = star_pi {
1200            pi = sp + 1;
1201            star_ti += 1;
1202            ti = star_ti;
1203            if ti > txt.len() {
1204                return false;
1205            }
1206        } else {
1207            return false;
1208        }
1209    }
1210
1211    // skip trailing '*' in pattern
1212    while pi < pat.len() && pat[pi] == '*' {
1213        pi += 1;
1214    }
1215
1216    pi == pat.len()
1217}
1218
1219#[cfg(test)]
1220mod tests {
1221    use super::*;
1222    use std::thread;
1223
1224    #[test]
1225    fn set_and_get() {
1226        let mut ks = Keyspace::new();
1227        ks.set("hello".into(), Bytes::from("world"), None);
1228        assert_eq!(
1229            ks.get("hello").unwrap(),
1230            Some(Value::String(Bytes::from("world")))
1231        );
1232    }
1233
1234    #[test]
1235    fn get_missing_key() {
1236        let mut ks = Keyspace::new();
1237        assert_eq!(ks.get("nope").unwrap(), None);
1238    }
1239
1240    #[test]
1241    fn overwrite_replaces_value() {
1242        let mut ks = Keyspace::new();
1243        ks.set("key".into(), Bytes::from("first"), None);
1244        ks.set("key".into(), Bytes::from("second"), None);
1245        assert_eq!(
1246            ks.get("key").unwrap(),
1247            Some(Value::String(Bytes::from("second")))
1248        );
1249    }
1250
1251    #[test]
1252    fn overwrite_clears_old_ttl() {
1253        let mut ks = Keyspace::new();
1254        ks.set(
1255            "key".into(),
1256            Bytes::from("v1"),
1257            Some(Duration::from_secs(100)),
1258        );
1259        // overwrite without TTL — should clear the old one
1260        ks.set("key".into(), Bytes::from("v2"), None);
1261        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
1262    }
1263
1264    #[test]
1265    fn del_existing() {
1266        let mut ks = Keyspace::new();
1267        ks.set("key".into(), Bytes::from("val"), None);
1268        assert!(ks.del("key"));
1269        assert_eq!(ks.get("key").unwrap(), None);
1270    }
1271
1272    #[test]
1273    fn del_missing() {
1274        let mut ks = Keyspace::new();
1275        assert!(!ks.del("nope"));
1276    }
1277
1278    #[test]
1279    fn exists_present_and_absent() {
1280        let mut ks = Keyspace::new();
1281        ks.set("yes".into(), Bytes::from("here"), None);
1282        assert!(ks.exists("yes"));
1283        assert!(!ks.exists("no"));
1284    }
1285
1286    #[test]
1287    fn expired_key_returns_none() {
1288        let mut ks = Keyspace::new();
1289        ks.set(
1290            "temp".into(),
1291            Bytes::from("gone"),
1292            Some(Duration::from_millis(10)),
1293        );
1294        // wait for expiration
1295        thread::sleep(Duration::from_millis(30));
1296        assert_eq!(ks.get("temp").unwrap(), None);
1297        // should also be gone from exists
1298        assert!(!ks.exists("temp"));
1299    }
1300
1301    #[test]
1302    fn ttl_no_expiry() {
1303        let mut ks = Keyspace::new();
1304        ks.set("key".into(), Bytes::from("val"), None);
1305        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
1306    }
1307
1308    #[test]
1309    fn ttl_not_found() {
1310        let mut ks = Keyspace::new();
1311        assert_eq!(ks.ttl("missing"), TtlResult::NotFound);
1312    }
1313
1314    #[test]
1315    fn ttl_with_expiry() {
1316        let mut ks = Keyspace::new();
1317        ks.set(
1318            "key".into(),
1319            Bytes::from("val"),
1320            Some(Duration::from_secs(100)),
1321        );
1322        match ks.ttl("key") {
1323            TtlResult::Seconds(s) => assert!(s >= 98 && s <= 100),
1324            other => panic!("expected Seconds, got {other:?}"),
1325        }
1326    }
1327
1328    #[test]
1329    fn ttl_expired_key() {
1330        let mut ks = Keyspace::new();
1331        ks.set(
1332            "temp".into(),
1333            Bytes::from("val"),
1334            Some(Duration::from_millis(10)),
1335        );
1336        thread::sleep(Duration::from_millis(30));
1337        assert_eq!(ks.ttl("temp"), TtlResult::NotFound);
1338    }
1339
1340    #[test]
1341    fn expire_existing_key() {
1342        let mut ks = Keyspace::new();
1343        ks.set("key".into(), Bytes::from("val"), None);
1344        assert!(ks.expire("key", 60));
1345        match ks.ttl("key") {
1346            TtlResult::Seconds(s) => assert!(s >= 58 && s <= 60),
1347            other => panic!("expected Seconds, got {other:?}"),
1348        }
1349    }
1350
1351    #[test]
1352    fn expire_missing_key() {
1353        let mut ks = Keyspace::new();
1354        assert!(!ks.expire("nope", 60));
1355    }
1356
1357    #[test]
1358    fn del_expired_key_returns_false() {
1359        let mut ks = Keyspace::new();
1360        ks.set(
1361            "temp".into(),
1362            Bytes::from("val"),
1363            Some(Duration::from_millis(10)),
1364        );
1365        thread::sleep(Duration::from_millis(30));
1366        // key is expired, del should return false (not found)
1367        assert!(!ks.del("temp"));
1368    }
1369
1370    // -- memory tracking tests --
1371
1372    #[test]
1373    fn memory_increases_on_set() {
1374        let mut ks = Keyspace::new();
1375        assert_eq!(ks.stats().used_bytes, 0);
1376        ks.set("key".into(), Bytes::from("value"), None);
1377        assert!(ks.stats().used_bytes > 0);
1378        assert_eq!(ks.stats().key_count, 1);
1379    }
1380
1381    #[test]
1382    fn memory_decreases_on_del() {
1383        let mut ks = Keyspace::new();
1384        ks.set("key".into(), Bytes::from("value"), None);
1385        let after_set = ks.stats().used_bytes;
1386        ks.del("key");
1387        assert_eq!(ks.stats().used_bytes, 0);
1388        assert!(after_set > 0);
1389    }
1390
1391    #[test]
1392    fn memory_adjusts_on_overwrite() {
1393        let mut ks = Keyspace::new();
1394        ks.set("key".into(), Bytes::from("short"), None);
1395        let small = ks.stats().used_bytes;
1396
1397        ks.set("key".into(), Bytes::from("a much longer value"), None);
1398        let large = ks.stats().used_bytes;
1399
1400        assert!(large > small);
1401        assert_eq!(ks.stats().key_count, 1);
1402    }
1403
1404    #[test]
1405    fn memory_decreases_on_expired_removal() {
1406        let mut ks = Keyspace::new();
1407        ks.set(
1408            "temp".into(),
1409            Bytes::from("data"),
1410            Some(Duration::from_millis(10)),
1411        );
1412        assert!(ks.stats().used_bytes > 0);
1413        thread::sleep(Duration::from_millis(30));
1414        // trigger lazy expiration
1415        let _ = ks.get("temp");
1416        assert_eq!(ks.stats().used_bytes, 0);
1417        assert_eq!(ks.stats().key_count, 0);
1418    }
1419
1420    #[test]
1421    fn stats_tracks_expiry_count() {
1422        let mut ks = Keyspace::new();
1423        ks.set("a".into(), Bytes::from("1"), None);
1424        ks.set("b".into(), Bytes::from("2"), Some(Duration::from_secs(100)));
1425        ks.set("c".into(), Bytes::from("3"), Some(Duration::from_secs(200)));
1426
1427        let stats = ks.stats();
1428        assert_eq!(stats.key_count, 3);
1429        assert_eq!(stats.keys_with_expiry, 2);
1430    }
1431
1432    // -- eviction tests --
1433
1434    #[test]
1435    fn noeviction_returns_oom_when_full() {
1436        // one entry with key "a" + value "val" = 1 + 3 + 96 = 100 bytes
1437        // set limit so one entry fits but two don't
1438        let config = ShardConfig {
1439            max_memory: Some(150),
1440            eviction_policy: EvictionPolicy::NoEviction,
1441            ..ShardConfig::default()
1442        };
1443        let mut ks = Keyspace::with_config(config);
1444
1445        // first key should fit
1446        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
1447
1448        // second key should push us over the limit
1449        let result = ks.set("b".into(), Bytes::from("val"), None);
1450        assert_eq!(result, SetResult::OutOfMemory);
1451
1452        // original key should still be there
1453        assert!(ks.exists("a"));
1454    }
1455
1456    #[test]
1457    fn lru_eviction_makes_room() {
1458        let config = ShardConfig {
1459            max_memory: Some(150),
1460            eviction_policy: EvictionPolicy::AllKeysLru,
1461            ..ShardConfig::default()
1462        };
1463        let mut ks = Keyspace::with_config(config);
1464
1465        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
1466
1467        // this should trigger eviction of "a" to make room
1468        assert_eq!(ks.set("b".into(), Bytes::from("val"), None), SetResult::Ok);
1469
1470        // "a" should have been evicted
1471        assert!(!ks.exists("a"));
1472        assert!(ks.exists("b"));
1473    }
1474
1475    #[test]
1476    fn overwrite_same_size_succeeds_at_limit() {
1477        let config = ShardConfig {
1478            max_memory: Some(150),
1479            eviction_policy: EvictionPolicy::NoEviction,
1480            ..ShardConfig::default()
1481        };
1482        let mut ks = Keyspace::with_config(config);
1483
1484        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
1485
1486        // overwriting with same-size value should succeed — no net increase
1487        assert_eq!(ks.set("a".into(), Bytes::from("new"), None), SetResult::Ok);
1488        assert_eq!(
1489            ks.get("a").unwrap(),
1490            Some(Value::String(Bytes::from("new")))
1491        );
1492    }
1493
1494    #[test]
1495    fn overwrite_larger_value_respects_limit() {
1496        let config = ShardConfig {
1497            max_memory: Some(150),
1498            eviction_policy: EvictionPolicy::NoEviction,
1499            ..ShardConfig::default()
1500        };
1501        let mut ks = Keyspace::with_config(config);
1502
1503        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
1504
1505        // overwriting with a much larger value should fail if it exceeds limit
1506        let big_value = "x".repeat(200);
1507        let result = ks.set("a".into(), Bytes::from(big_value), None);
1508        assert_eq!(result, SetResult::OutOfMemory);
1509
1510        // original value should still be intact
1511        assert_eq!(
1512            ks.get("a").unwrap(),
1513            Some(Value::String(Bytes::from("val")))
1514        );
1515    }
1516
1517    // -- iter_entries tests --
1518
1519    #[test]
1520    fn iter_entries_returns_live_entries() {
1521        let mut ks = Keyspace::new();
1522        ks.set("a".into(), Bytes::from("1"), None);
1523        ks.set("b".into(), Bytes::from("2"), Some(Duration::from_secs(100)));
1524
1525        let entries: Vec<_> = ks.iter_entries().collect();
1526        assert_eq!(entries.len(), 2);
1527    }
1528
1529    #[test]
1530    fn iter_entries_skips_expired() {
1531        let mut ks = Keyspace::new();
1532        ks.set(
1533            "dead".into(),
1534            Bytes::from("gone"),
1535            Some(Duration::from_millis(1)),
1536        );
1537        ks.set("alive".into(), Bytes::from("here"), None);
1538        thread::sleep(Duration::from_millis(10));
1539
1540        let entries: Vec<_> = ks.iter_entries().collect();
1541        assert_eq!(entries.len(), 1);
1542        assert_eq!(entries[0].0, "alive");
1543    }
1544
1545    #[test]
1546    fn iter_entries_ttl_for_no_expiry() {
1547        let mut ks = Keyspace::new();
1548        ks.set("permanent".into(), Bytes::from("val"), None);
1549
1550        let entries: Vec<_> = ks.iter_entries().collect();
1551        assert_eq!(entries[0].2, -1);
1552    }
1553
1554    // -- restore tests --
1555
1556    #[test]
1557    fn restore_adds_entry() {
1558        let mut ks = Keyspace::new();
1559        ks.restore("restored".into(), Value::String(Bytes::from("data")), None);
1560        assert_eq!(
1561            ks.get("restored").unwrap(),
1562            Some(Value::String(Bytes::from("data")))
1563        );
1564        assert_eq!(ks.stats().key_count, 1);
1565    }
1566
1567    #[test]
1568    fn restore_skips_past_deadline() {
1569        let mut ks = Keyspace::new();
1570        // deadline already passed
1571        let past = Instant::now() - Duration::from_secs(1);
1572        ks.restore(
1573            "expired".into(),
1574            Value::String(Bytes::from("old")),
1575            Some(past),
1576        );
1577        assert!(ks.is_empty());
1578    }
1579
1580    #[test]
1581    fn restore_overwrites_existing() {
1582        let mut ks = Keyspace::new();
1583        ks.set("key".into(), Bytes::from("old"), None);
1584        ks.restore("key".into(), Value::String(Bytes::from("new")), None);
1585        assert_eq!(
1586            ks.get("key").unwrap(),
1587            Some(Value::String(Bytes::from("new")))
1588        );
1589        assert_eq!(ks.stats().key_count, 1);
1590    }
1591
1592    #[test]
1593    fn restore_bypasses_memory_limit() {
1594        let config = ShardConfig {
1595            max_memory: Some(50), // very small
1596            eviction_policy: EvictionPolicy::NoEviction,
1597            ..ShardConfig::default()
1598        };
1599        let mut ks = Keyspace::with_config(config);
1600
1601        // normal set would fail due to memory limit
1602        ks.restore(
1603            "big".into(),
1604            Value::String(Bytes::from("x".repeat(200))),
1605            None,
1606        );
1607        assert_eq!(ks.stats().key_count, 1);
1608    }
1609
1610    #[test]
1611    fn no_limit_never_rejects() {
1612        // default config has no memory limit
1613        let mut ks = Keyspace::new();
1614        for i in 0..100 {
1615            assert_eq!(
1616                ks.set(format!("key:{i}"), Bytes::from("value"), None),
1617                SetResult::Ok
1618            );
1619        }
1620        assert_eq!(ks.len(), 100);
1621    }
1622
1623    // -- list tests --
1624
1625    #[test]
1626    fn lpush_creates_list() {
1627        let mut ks = Keyspace::new();
1628        let len = ks
1629            .lpush("list", &[Bytes::from("a"), Bytes::from("b")])
1630            .unwrap();
1631        assert_eq!(len, 2);
1632        // lpush pushes each to front, so order is b, a
1633        let items = ks.lrange("list", 0, -1).unwrap();
1634        assert_eq!(items, vec![Bytes::from("b"), Bytes::from("a")]);
1635    }
1636
1637    #[test]
1638    fn rpush_creates_list() {
1639        let mut ks = Keyspace::new();
1640        let len = ks
1641            .rpush("list", &[Bytes::from("a"), Bytes::from("b")])
1642            .unwrap();
1643        assert_eq!(len, 2);
1644        let items = ks.lrange("list", 0, -1).unwrap();
1645        assert_eq!(items, vec![Bytes::from("a"), Bytes::from("b")]);
1646    }
1647
1648    #[test]
1649    fn push_to_existing_list() {
1650        let mut ks = Keyspace::new();
1651        ks.rpush("list", &[Bytes::from("a")]).unwrap();
1652        let len = ks.rpush("list", &[Bytes::from("b")]).unwrap();
1653        assert_eq!(len, 2);
1654    }
1655
1656    #[test]
1657    fn lpop_returns_front() {
1658        let mut ks = Keyspace::new();
1659        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
1660            .unwrap();
1661        assert_eq!(ks.lpop("list").unwrap(), Some(Bytes::from("a")));
1662        assert_eq!(ks.lpop("list").unwrap(), Some(Bytes::from("b")));
1663        assert_eq!(ks.lpop("list").unwrap(), None); // empty, key deleted
1664    }
1665
1666    #[test]
1667    fn rpop_returns_back() {
1668        let mut ks = Keyspace::new();
1669        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
1670            .unwrap();
1671        assert_eq!(ks.rpop("list").unwrap(), Some(Bytes::from("b")));
1672    }
1673
1674    #[test]
1675    fn pop_from_missing_key() {
1676        let mut ks = Keyspace::new();
1677        assert_eq!(ks.lpop("nope").unwrap(), None);
1678        assert_eq!(ks.rpop("nope").unwrap(), None);
1679    }
1680
1681    #[test]
1682    fn empty_list_auto_deletes_key() {
1683        let mut ks = Keyspace::new();
1684        ks.rpush("list", &[Bytes::from("only")]).unwrap();
1685        ks.lpop("list").unwrap();
1686        assert!(!ks.exists("list"));
1687        assert_eq!(ks.stats().key_count, 0);
1688        assert_eq!(ks.stats().used_bytes, 0);
1689    }
1690
1691    #[test]
1692    fn lrange_negative_indices() {
1693        let mut ks = Keyspace::new();
1694        ks.rpush(
1695            "list",
1696            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("c")],
1697        )
1698        .unwrap();
1699        // -2 to -1 => last two elements
1700        let items = ks.lrange("list", -2, -1).unwrap();
1701        assert_eq!(items, vec![Bytes::from("b"), Bytes::from("c")]);
1702    }
1703
1704    #[test]
1705    fn lrange_out_of_bounds_clamps() {
1706        let mut ks = Keyspace::new();
1707        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
1708            .unwrap();
1709        let items = ks.lrange("list", -100, 100).unwrap();
1710        assert_eq!(items, vec![Bytes::from("a"), Bytes::from("b")]);
1711    }
1712
1713    #[test]
1714    fn lrange_missing_key_returns_empty() {
1715        let mut ks = Keyspace::new();
1716        assert_eq!(ks.lrange("nope", 0, -1).unwrap(), Vec::<Bytes>::new());
1717    }
1718
1719    #[test]
1720    fn llen_returns_length() {
1721        let mut ks = Keyspace::new();
1722        assert_eq!(ks.llen("nope").unwrap(), 0);
1723        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
1724            .unwrap();
1725        assert_eq!(ks.llen("list").unwrap(), 2);
1726    }
1727
1728    #[test]
1729    fn list_memory_tracked_on_push_pop() {
1730        let mut ks = Keyspace::new();
1731        ks.rpush("list", &[Bytes::from("hello")]).unwrap();
1732        let after_push = ks.stats().used_bytes;
1733        assert!(after_push > 0);
1734
1735        ks.rpush("list", &[Bytes::from("world")]).unwrap();
1736        let after_second = ks.stats().used_bytes;
1737        assert!(after_second > after_push);
1738
1739        ks.lpop("list").unwrap();
1740        let after_pop = ks.stats().used_bytes;
1741        assert!(after_pop < after_second);
1742    }
1743
1744    #[test]
1745    fn lpush_on_string_key_returns_wrongtype() {
1746        let mut ks = Keyspace::new();
1747        ks.set("s".into(), Bytes::from("val"), None);
1748        assert!(ks.lpush("s", &[Bytes::from("nope")]).is_err());
1749    }
1750
1751    #[test]
1752    fn lrange_on_string_key_returns_wrongtype() {
1753        let mut ks = Keyspace::new();
1754        ks.set("s".into(), Bytes::from("val"), None);
1755        assert!(ks.lrange("s", 0, -1).is_err());
1756    }
1757
1758    #[test]
1759    fn llen_on_string_key_returns_wrongtype() {
1760        let mut ks = Keyspace::new();
1761        ks.set("s".into(), Bytes::from("val"), None);
1762        assert!(ks.llen("s").is_err());
1763    }
1764
1765    // -- wrongtype tests --
1766
1767    #[test]
1768    fn get_on_list_key_returns_wrongtype() {
1769        let mut ks = Keyspace::new();
1770        let mut list = std::collections::VecDeque::new();
1771        list.push_back(Bytes::from("item"));
1772        ks.restore("mylist".into(), Value::List(list), None);
1773
1774        assert!(ks.get("mylist").is_err());
1775    }
1776
1777    #[test]
1778    fn value_type_returns_correct_types() {
1779        let mut ks = Keyspace::new();
1780        assert_eq!(ks.value_type("missing"), "none");
1781
1782        ks.set("s".into(), Bytes::from("val"), None);
1783        assert_eq!(ks.value_type("s"), "string");
1784
1785        let mut list = std::collections::VecDeque::new();
1786        list.push_back(Bytes::from("item"));
1787        ks.restore("l".into(), Value::List(list), None);
1788        assert_eq!(ks.value_type("l"), "list");
1789
1790        ks.zadd("z", &[(1.0, "a".into())], &ZAddFlags::default())
1791            .unwrap();
1792        assert_eq!(ks.value_type("z"), "zset");
1793    }
1794
1795    // -- sorted set tests --
1796
1797    #[test]
1798    fn zadd_creates_sorted_set() {
1799        let mut ks = Keyspace::new();
1800        let result = ks
1801            .zadd(
1802                "board",
1803                &[(100.0, "alice".into()), (200.0, "bob".into())],
1804                &ZAddFlags::default(),
1805            )
1806            .unwrap();
1807        assert_eq!(result.count, 2);
1808        assert_eq!(result.applied.len(), 2);
1809        assert_eq!(ks.value_type("board"), "zset");
1810    }
1811
1812    #[test]
1813    fn zadd_updates_existing_score() {
1814        let mut ks = Keyspace::new();
1815        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
1816            .unwrap();
1817        // update score — default flags don't count updates
1818        let result = ks
1819            .zadd("z", &[(200.0, "alice".into())], &ZAddFlags::default())
1820            .unwrap();
1821        assert_eq!(result.count, 0);
1822        // score was updated, so applied should have the member
1823        assert_eq!(result.applied.len(), 1);
1824        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(200.0));
1825    }
1826
1827    #[test]
1828    fn zadd_ch_flag_counts_changes() {
1829        let mut ks = Keyspace::new();
1830        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
1831            .unwrap();
1832        let flags = ZAddFlags {
1833            ch: true,
1834            ..Default::default()
1835        };
1836        let result = ks
1837            .zadd(
1838                "z",
1839                &[(200.0, "alice".into()), (50.0, "bob".into())],
1840                &flags,
1841            )
1842            .unwrap();
1843        // 1 updated + 1 added = 2
1844        assert_eq!(result.count, 2);
1845        assert_eq!(result.applied.len(), 2);
1846    }
1847
1848    #[test]
1849    fn zadd_nx_skips_existing() {
1850        let mut ks = Keyspace::new();
1851        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
1852            .unwrap();
1853        let flags = ZAddFlags {
1854            nx: true,
1855            ..Default::default()
1856        };
1857        let result = ks.zadd("z", &[(999.0, "alice".into())], &flags).unwrap();
1858        assert_eq!(result.count, 0);
1859        assert!(result.applied.is_empty());
1860        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(100.0));
1861    }
1862
1863    #[test]
1864    fn zadd_xx_skips_new() {
1865        let mut ks = Keyspace::new();
1866        let flags = ZAddFlags {
1867            xx: true,
1868            ..Default::default()
1869        };
1870        let result = ks.zadd("z", &[(100.0, "alice".into())], &flags).unwrap();
1871        assert_eq!(result.count, 0);
1872        assert!(result.applied.is_empty());
1873        // key should be cleaned up since nothing was added
1874        assert_eq!(ks.value_type("z"), "none");
1875    }
1876
1877    #[test]
1878    fn zadd_gt_only_increases() {
1879        let mut ks = Keyspace::new();
1880        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
1881            .unwrap();
1882        let flags = ZAddFlags {
1883            gt: true,
1884            ..Default::default()
1885        };
1886        ks.zadd("z", &[(50.0, "alice".into())], &flags).unwrap();
1887        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(100.0));
1888        ks.zadd("z", &[(200.0, "alice".into())], &flags).unwrap();
1889        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(200.0));
1890    }
1891
1892    #[test]
1893    fn zadd_lt_only_decreases() {
1894        let mut ks = Keyspace::new();
1895        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
1896            .unwrap();
1897        let flags = ZAddFlags {
1898            lt: true,
1899            ..Default::default()
1900        };
1901        ks.zadd("z", &[(200.0, "alice".into())], &flags).unwrap();
1902        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(100.0));
1903        ks.zadd("z", &[(50.0, "alice".into())], &flags).unwrap();
1904        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(50.0));
1905    }
1906
1907    #[test]
1908    fn zrem_removes_members() {
1909        let mut ks = Keyspace::new();
1910        ks.zadd(
1911            "z",
1912            &[(1.0, "a".into()), (2.0, "b".into()), (3.0, "c".into())],
1913            &ZAddFlags::default(),
1914        )
1915        .unwrap();
1916        let removed = ks
1917            .zrem("z", &["a".into(), "c".into(), "nonexistent".into()])
1918            .unwrap();
1919        assert_eq!(removed.len(), 2);
1920        assert!(removed.contains(&"a".to_owned()));
1921        assert!(removed.contains(&"c".to_owned()));
1922        assert_eq!(ks.zscore("z", "a").unwrap(), None);
1923        assert_eq!(ks.zscore("z", "b").unwrap(), Some(2.0));
1924    }
1925
1926    #[test]
1927    fn zrem_auto_deletes_empty() {
1928        let mut ks = Keyspace::new();
1929        ks.zadd("z", &[(1.0, "only".into())], &ZAddFlags::default())
1930            .unwrap();
1931        ks.zrem("z", &["only".into()]).unwrap();
1932        assert!(!ks.exists("z"));
1933        assert_eq!(ks.stats().key_count, 0);
1934    }
1935
1936    #[test]
1937    fn zrem_missing_key() {
1938        let mut ks = Keyspace::new();
1939        assert!(ks.zrem("nope", &["a".into()]).unwrap().is_empty());
1940    }
1941
1942    #[test]
1943    fn zscore_returns_score() {
1944        let mut ks = Keyspace::new();
1945        ks.zadd("z", &[(42.5, "member".into())], &ZAddFlags::default())
1946            .unwrap();
1947        assert_eq!(ks.zscore("z", "member").unwrap(), Some(42.5));
1948        assert_eq!(ks.zscore("z", "missing").unwrap(), None);
1949    }
1950
1951    #[test]
1952    fn zscore_missing_key() {
1953        let mut ks = Keyspace::new();
1954        assert_eq!(ks.zscore("nope", "m").unwrap(), None);
1955    }
1956
1957    #[test]
1958    fn zrank_returns_rank() {
1959        let mut ks = Keyspace::new();
1960        ks.zadd(
1961            "z",
1962            &[
1963                (300.0, "c".into()),
1964                (100.0, "a".into()),
1965                (200.0, "b".into()),
1966            ],
1967            &ZAddFlags::default(),
1968        )
1969        .unwrap();
1970        assert_eq!(ks.zrank("z", "a").unwrap(), Some(0));
1971        assert_eq!(ks.zrank("z", "b").unwrap(), Some(1));
1972        assert_eq!(ks.zrank("z", "c").unwrap(), Some(2));
1973        assert_eq!(ks.zrank("z", "d").unwrap(), None);
1974    }
1975
1976    #[test]
1977    fn zrange_returns_range() {
1978        let mut ks = Keyspace::new();
1979        ks.zadd(
1980            "z",
1981            &[(1.0, "a".into()), (2.0, "b".into()), (3.0, "c".into())],
1982            &ZAddFlags::default(),
1983        )
1984        .unwrap();
1985
1986        let all = ks.zrange("z", 0, -1).unwrap();
1987        assert_eq!(
1988            all,
1989            vec![
1990                ("a".to_owned(), 1.0),
1991                ("b".to_owned(), 2.0),
1992                ("c".to_owned(), 3.0),
1993            ]
1994        );
1995
1996        let middle = ks.zrange("z", 1, 1).unwrap();
1997        assert_eq!(middle, vec![("b".to_owned(), 2.0)]);
1998
1999        let last_two = ks.zrange("z", -2, -1).unwrap();
2000        assert_eq!(last_two, vec![("b".to_owned(), 2.0), ("c".to_owned(), 3.0)]);
2001    }
2002
2003    #[test]
2004    fn zrange_missing_key() {
2005        let mut ks = Keyspace::new();
2006        assert!(ks.zrange("nope", 0, -1).unwrap().is_empty());
2007    }
2008
2009    #[test]
2010    fn zadd_on_string_key_returns_wrongtype() {
2011        let mut ks = Keyspace::new();
2012        ks.set("s".into(), Bytes::from("val"), None);
2013        assert!(ks
2014            .zadd("s", &[(1.0, "m".into())], &ZAddFlags::default())
2015            .is_err());
2016    }
2017
2018    #[test]
2019    fn zrem_on_string_key_returns_wrongtype() {
2020        let mut ks = Keyspace::new();
2021        ks.set("s".into(), Bytes::from("val"), None);
2022        assert!(ks.zrem("s", &["m".into()]).is_err());
2023    }
2024
2025    #[test]
2026    fn zscore_on_list_key_returns_wrongtype() {
2027        let mut ks = Keyspace::new();
2028        ks.rpush("l", &[Bytes::from("item")]).unwrap();
2029        assert!(ks.zscore("l", "m").is_err());
2030    }
2031
2032    #[test]
2033    fn zrank_on_string_key_returns_wrongtype() {
2034        let mut ks = Keyspace::new();
2035        ks.set("s".into(), Bytes::from("val"), None);
2036        assert!(ks.zrank("s", "m").is_err());
2037    }
2038
2039    #[test]
2040    fn zrange_on_string_key_returns_wrongtype() {
2041        let mut ks = Keyspace::new();
2042        ks.set("s".into(), Bytes::from("val"), None);
2043        assert!(ks.zrange("s", 0, -1).is_err());
2044    }
2045
2046    #[test]
2047    fn sorted_set_memory_tracked() {
2048        let mut ks = Keyspace::new();
2049        let before = ks.stats().used_bytes;
2050        ks.zadd("z", &[(1.0, "alice".into())], &ZAddFlags::default())
2051            .unwrap();
2052        let after_add = ks.stats().used_bytes;
2053        assert!(after_add > before);
2054
2055        ks.zadd("z", &[(2.0, "bob".into())], &ZAddFlags::default())
2056            .unwrap();
2057        let after_second = ks.stats().used_bytes;
2058        assert!(after_second > after_add);
2059
2060        ks.zrem("z", &["alice".into()]).unwrap();
2061        let after_remove = ks.stats().used_bytes;
2062        assert!(after_remove < after_second);
2063    }
2064
2065    #[test]
2066    fn incr_new_key_defaults_to_zero() {
2067        let mut ks = Keyspace::new();
2068        assert_eq!(ks.incr("counter").unwrap(), 1);
2069        // verify the stored value
2070        match ks.get("counter").unwrap() {
2071            Some(Value::String(data)) => assert_eq!(data, Bytes::from("1")),
2072            other => panic!("expected String(\"1\"), got {other:?}"),
2073        }
2074    }
2075
2076    #[test]
2077    fn incr_existing_value() {
2078        let mut ks = Keyspace::new();
2079        ks.set("n".into(), Bytes::from("10"), None);
2080        assert_eq!(ks.incr("n").unwrap(), 11);
2081    }
2082
2083    #[test]
2084    fn decr_new_key_defaults_to_zero() {
2085        let mut ks = Keyspace::new();
2086        assert_eq!(ks.decr("counter").unwrap(), -1);
2087    }
2088
2089    #[test]
2090    fn decr_existing_value() {
2091        let mut ks = Keyspace::new();
2092        ks.set("n".into(), Bytes::from("10"), None);
2093        assert_eq!(ks.decr("n").unwrap(), 9);
2094    }
2095
2096    #[test]
2097    fn incr_non_integer_returns_error() {
2098        let mut ks = Keyspace::new();
2099        ks.set("s".into(), Bytes::from("notanum"), None);
2100        assert_eq!(ks.incr("s").unwrap_err(), IncrError::NotAnInteger);
2101    }
2102
2103    #[test]
2104    fn incr_on_list_returns_wrongtype() {
2105        let mut ks = Keyspace::new();
2106        ks.lpush("list", &[Bytes::from("a")]).unwrap();
2107        assert_eq!(ks.incr("list").unwrap_err(), IncrError::WrongType);
2108    }
2109
2110    #[test]
2111    fn incr_overflow_returns_error() {
2112        let mut ks = Keyspace::new();
2113        ks.set("max".into(), Bytes::from(i64::MAX.to_string()), None);
2114        assert_eq!(ks.incr("max").unwrap_err(), IncrError::Overflow);
2115    }
2116
2117    #[test]
2118    fn decr_overflow_returns_error() {
2119        let mut ks = Keyspace::new();
2120        ks.set("min".into(), Bytes::from(i64::MIN.to_string()), None);
2121        assert_eq!(ks.decr("min").unwrap_err(), IncrError::Overflow);
2122    }
2123
2124    #[test]
2125    fn incr_preserves_ttl() {
2126        let mut ks = Keyspace::new();
2127        ks.set("n".into(), Bytes::from("5"), Some(Duration::from_secs(60)));
2128        ks.incr("n").unwrap();
2129        match ks.ttl("n") {
2130            TtlResult::Seconds(s) => assert!(s >= 58 && s <= 60),
2131            other => panic!("expected TTL preserved, got {other:?}"),
2132        }
2133    }
2134
2135    #[test]
2136    fn zrem_returns_actually_removed_members() {
2137        let mut ks = Keyspace::new();
2138        ks.zadd(
2139            "z",
2140            &[(1.0, "a".into()), (2.0, "b".into())],
2141            &ZAddFlags::default(),
2142        )
2143        .unwrap();
2144        // "a" exists, "ghost" doesn't — only "a" should be in the result
2145        let removed = ks.zrem("z", &["a".into(), "ghost".into()]).unwrap();
2146        assert_eq!(removed, vec!["a".to_owned()]);
2147    }
2148
2149    #[test]
2150    fn zcard_returns_count() {
2151        let mut ks = Keyspace::new();
2152        ks.zadd(
2153            "z",
2154            &[(1.0, "a".into()), (2.0, "b".into())],
2155            &ZAddFlags::default(),
2156        )
2157        .unwrap();
2158        assert_eq!(ks.zcard("z").unwrap(), 2);
2159    }
2160
2161    #[test]
2162    fn zcard_missing_key_returns_zero() {
2163        let mut ks = Keyspace::new();
2164        assert_eq!(ks.zcard("missing").unwrap(), 0);
2165    }
2166
2167    #[test]
2168    fn zcard_on_string_key_returns_wrongtype() {
2169        let mut ks = Keyspace::new();
2170        ks.set("s".into(), Bytes::from("val"), None);
2171        assert!(ks.zcard("s").is_err());
2172    }
2173
2174    #[test]
2175    fn persist_removes_expiry() {
2176        let mut ks = Keyspace::new();
2177        ks.set(
2178            "key".into(),
2179            Bytes::from("val"),
2180            Some(Duration::from_secs(60)),
2181        );
2182        assert!(matches!(ks.ttl("key"), TtlResult::Seconds(_)));
2183
2184        assert!(ks.persist("key"));
2185        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
2186        assert_eq!(ks.stats().keys_with_expiry, 0);
2187    }
2188
2189    #[test]
2190    fn persist_returns_false_without_expiry() {
2191        let mut ks = Keyspace::new();
2192        ks.set("key".into(), Bytes::from("val"), None);
2193        assert!(!ks.persist("key"));
2194    }
2195
2196    #[test]
2197    fn persist_returns_false_for_missing_key() {
2198        let mut ks = Keyspace::new();
2199        assert!(!ks.persist("missing"));
2200    }
2201
2202    #[test]
2203    fn pttl_returns_milliseconds() {
2204        let mut ks = Keyspace::new();
2205        ks.set(
2206            "key".into(),
2207            Bytes::from("val"),
2208            Some(Duration::from_secs(60)),
2209        );
2210        match ks.pttl("key") {
2211            TtlResult::Milliseconds(ms) => assert!(ms > 59_000 && ms <= 60_000),
2212            other => panic!("expected Milliseconds, got {other:?}"),
2213        }
2214    }
2215
2216    #[test]
2217    fn pttl_no_expiry() {
2218        let mut ks = Keyspace::new();
2219        ks.set("key".into(), Bytes::from("val"), None);
2220        assert_eq!(ks.pttl("key"), TtlResult::NoExpiry);
2221    }
2222
2223    #[test]
2224    fn pttl_not_found() {
2225        let mut ks = Keyspace::new();
2226        assert_eq!(ks.pttl("missing"), TtlResult::NotFound);
2227    }
2228
2229    #[test]
2230    fn pexpire_sets_ttl_in_millis() {
2231        let mut ks = Keyspace::new();
2232        ks.set("key".into(), Bytes::from("val"), None);
2233        assert!(ks.pexpire("key", 5000));
2234        match ks.pttl("key") {
2235            TtlResult::Milliseconds(ms) => assert!(ms > 4000 && ms <= 5000),
2236            other => panic!("expected Milliseconds, got {other:?}"),
2237        }
2238        assert_eq!(ks.stats().keys_with_expiry, 1);
2239    }
2240
2241    #[test]
2242    fn pexpire_missing_key_returns_false() {
2243        let mut ks = Keyspace::new();
2244        assert!(!ks.pexpire("missing", 5000));
2245    }
2246
2247    #[test]
2248    fn pexpire_overwrites_existing_ttl() {
2249        let mut ks = Keyspace::new();
2250        ks.set(
2251            "key".into(),
2252            Bytes::from("val"),
2253            Some(Duration::from_secs(60)),
2254        );
2255        assert!(ks.pexpire("key", 500));
2256        match ks.pttl("key") {
2257            TtlResult::Milliseconds(ms) => assert!(ms <= 500),
2258            other => panic!("expected Milliseconds, got {other:?}"),
2259        }
2260        // expiry count shouldn't double-count
2261        assert_eq!(ks.stats().keys_with_expiry, 1);
2262    }
2263
2264    // -- memory limit enforcement for list/zset --
2265
2266    #[test]
2267    fn lpush_rejects_when_memory_full() {
2268        let config = ShardConfig {
2269            max_memory: Some(150),
2270            eviction_policy: EvictionPolicy::NoEviction,
2271            ..ShardConfig::default()
2272        };
2273        let mut ks = Keyspace::with_config(config);
2274
2275        // first key eats up most of the budget
2276        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
2277
2278        // lpush should be rejected — not enough room
2279        let result = ks.lpush("list", &[Bytes::from("big-value-here")]);
2280        assert_eq!(result, Err(WriteError::OutOfMemory));
2281
2282        // original key should be untouched
2283        assert!(ks.exists("a"));
2284    }
2285
2286    #[test]
2287    fn rpush_rejects_when_memory_full() {
2288        let config = ShardConfig {
2289            max_memory: Some(150),
2290            eviction_policy: EvictionPolicy::NoEviction,
2291            ..ShardConfig::default()
2292        };
2293        let mut ks = Keyspace::with_config(config);
2294
2295        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
2296
2297        let result = ks.rpush("list", &[Bytes::from("big-value-here")]);
2298        assert_eq!(result, Err(WriteError::OutOfMemory));
2299    }
2300
2301    #[test]
2302    fn zadd_rejects_when_memory_full() {
2303        let config = ShardConfig {
2304            max_memory: Some(150),
2305            eviction_policy: EvictionPolicy::NoEviction,
2306            ..ShardConfig::default()
2307        };
2308        let mut ks = Keyspace::with_config(config);
2309
2310        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
2311
2312        let result = ks.zadd("z", &[(1.0, "member".into())], &ZAddFlags::default());
2313        assert!(matches!(result, Err(WriteError::OutOfMemory)));
2314
2315        // original key should be untouched
2316        assert!(ks.exists("a"));
2317    }
2318
2319    #[test]
2320    fn lpush_evicts_under_lru_policy() {
2321        let config = ShardConfig {
2322            max_memory: Some(200),
2323            eviction_policy: EvictionPolicy::AllKeysLru,
2324            ..ShardConfig::default()
2325        };
2326        let mut ks = Keyspace::with_config(config);
2327
2328        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
2329
2330        // should evict "a" to make room for the list
2331        assert!(ks.lpush("list", &[Bytes::from("item")]).is_ok());
2332        assert!(!ks.exists("a"));
2333    }
2334
2335    #[test]
2336    fn clear_removes_all_keys() {
2337        let mut ks = Keyspace::new();
2338        ks.set("a".into(), Bytes::from("1"), None);
2339        ks.set("b".into(), Bytes::from("2"), Some(Duration::from_secs(60)));
2340        ks.lpush("list", &[Bytes::from("x")]).unwrap();
2341
2342        assert_eq!(ks.len(), 3);
2343        assert!(ks.stats().used_bytes > 0);
2344        assert_eq!(ks.stats().keys_with_expiry, 1);
2345
2346        ks.clear();
2347
2348        assert_eq!(ks.len(), 0);
2349        assert!(ks.is_empty());
2350        assert_eq!(ks.stats().used_bytes, 0);
2351        assert_eq!(ks.stats().keys_with_expiry, 0);
2352    }
2353
2354    // --- scan ---
2355
2356    #[test]
2357    fn scan_returns_keys() {
2358        let mut ks = Keyspace::new();
2359        ks.set("key1".into(), Bytes::from("a"), None);
2360        ks.set("key2".into(), Bytes::from("b"), None);
2361        ks.set("key3".into(), Bytes::from("c"), None);
2362
2363        let (cursor, keys) = ks.scan_keys(0, 10, None);
2364        assert_eq!(cursor, 0); // complete in one pass
2365        assert_eq!(keys.len(), 3);
2366    }
2367
2368    #[test]
2369    fn scan_empty_keyspace() {
2370        let ks = Keyspace::new();
2371        let (cursor, keys) = ks.scan_keys(0, 10, None);
2372        assert_eq!(cursor, 0);
2373        assert!(keys.is_empty());
2374    }
2375
2376    #[test]
2377    fn scan_with_pattern() {
2378        let mut ks = Keyspace::new();
2379        ks.set("user:1".into(), Bytes::from("a"), None);
2380        ks.set("user:2".into(), Bytes::from("b"), None);
2381        ks.set("item:1".into(), Bytes::from("c"), None);
2382
2383        let (cursor, keys) = ks.scan_keys(0, 10, Some("user:*"));
2384        assert_eq!(cursor, 0);
2385        assert_eq!(keys.len(), 2);
2386        for k in &keys {
2387            assert!(k.starts_with("user:"));
2388        }
2389    }
2390
2391    #[test]
2392    fn scan_with_count_limit() {
2393        let mut ks = Keyspace::new();
2394        for i in 0..10 {
2395            ks.set(format!("k{i}"), Bytes::from("v"), None);
2396        }
2397
2398        // first batch
2399        let (cursor, keys) = ks.scan_keys(0, 3, None);
2400        assert!(!keys.is_empty());
2401        assert!(keys.len() <= 3);
2402
2403        // if there are more keys, cursor should be non-zero
2404        if cursor != 0 {
2405            let (cursor2, keys2) = ks.scan_keys(cursor, 3, None);
2406            assert!(!keys2.is_empty());
2407            // continue until complete
2408            let _ = (cursor2, keys2);
2409        }
2410    }
2411
2412    #[test]
2413    fn scan_skips_expired_keys() {
2414        let mut ks = Keyspace::new();
2415        ks.set("live".into(), Bytes::from("a"), None);
2416        ks.set(
2417            "expired".into(),
2418            Bytes::from("b"),
2419            Some(Duration::from_millis(1)),
2420        );
2421
2422        std::thread::sleep(Duration::from_millis(5));
2423
2424        let (_, keys) = ks.scan_keys(0, 10, None);
2425        assert_eq!(keys.len(), 1);
2426        assert_eq!(keys[0], "live");
2427    }
2428
2429    #[test]
2430    fn glob_match_star() {
2431        assert!(super::glob_match("user:*", "user:123"));
2432        assert!(super::glob_match("user:*", "user:"));
2433        assert!(super::glob_match("*:data", "foo:data"));
2434        assert!(!super::glob_match("user:*", "item:123"));
2435    }
2436
2437    #[test]
2438    fn glob_match_question() {
2439        assert!(super::glob_match("key?", "key1"));
2440        assert!(super::glob_match("key?", "keya"));
2441        assert!(!super::glob_match("key?", "key"));
2442        assert!(!super::glob_match("key?", "key12"));
2443    }
2444
2445    #[test]
2446    fn glob_match_brackets() {
2447        assert!(super::glob_match("key[abc]", "keya"));
2448        assert!(super::glob_match("key[abc]", "keyb"));
2449        assert!(!super::glob_match("key[abc]", "keyd"));
2450    }
2451
2452    #[test]
2453    fn glob_match_literal() {
2454        assert!(super::glob_match("exact", "exact"));
2455        assert!(!super::glob_match("exact", "exactnot"));
2456        assert!(!super::glob_match("exact", "notexact"));
2457    }
2458}