Skip to main content

ember_core/
keyspace.rs

1//! The keyspace: Ember's core key-value store.
2//!
3//! A `Keyspace` owns a flat `HashMap<String, Entry>` and handles
4//! get, set, delete, existence checks, and TTL management. Expired
5//! keys are removed lazily on access. Memory usage is tracked on
6//! every mutation for eviction and stats reporting.
7
8use std::collections::{HashMap, VecDeque};
9use std::time::{Duration, Instant};
10
11use bytes::Bytes;
12use rand::seq::IteratorRandom;
13
14use crate::memory::{self, MemoryTracker};
15use crate::types::sorted_set::{SortedSet, ZAddFlags};
16use crate::types::{self, normalize_range, Value};
17
/// Marker error: a command was issued against a key whose stored value has
/// a different type than the command operates on.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct WrongType;

impl std::error::Error for WrongType {}

impl std::fmt::Display for WrongType {
    /// Writes the fixed `WRONGTYPE` protocol message.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_str("WRONGTYPE Operation against a key holding the wrong kind of value")
    }
}
32
/// Failure modes of write operations: either the key already holds an
/// incompatible type, or the write could not fit within the memory limit.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum WriteError {
    /// The value stored at the key is not of the type the command expects.
    WrongType,
    /// The memory limit was hit and eviction could not free enough room.
    OutOfMemory,
}
42
43impl From<WrongType> for WriteError {
44    fn from(_: WrongType) -> Self {
45        WriteError::WrongType
46    }
47}
48
/// Everything that can go wrong while executing INCR/DECR on a key.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum IncrError {
    /// The key stores something other than a string.
    WrongType,
    /// The stored string does not parse as an integer.
    NotAnInteger,
    /// Applying the delta would leave the `i64` range.
    Overflow,
    /// The result could not be stored because the memory limit was reached.
    OutOfMemory,
}

impl std::fmt::Display for IncrError {
    /// Writes the client-facing message for each variant.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::WrongType => {
                "WRONGTYPE Operation against a key holding the wrong kind of value"
            }
            Self::NotAnInteger => "ERR value is not an integer or out of range",
            Self::Overflow => "ERR increment or decrement would overflow",
            Self::OutOfMemory => "OOM command not allowed when used memory > 'maxmemory'",
        };
        out.write_str(message)
    }
}

impl std::error::Error for IncrError {}
/// Outcome of a ZADD call: the number reported back to the client plus the
/// members that really changed, so that only those are written to the AOF.
#[derive(Clone, Debug)]
pub struct ZAddResult {
    /// How many members were added (added + updated when the CH flag is set).
    pub count: usize,
    /// `(score, member)` pairs that were inserted or had their score changed.
    /// These are the only ones that should reach the AOF.
    pub applied: Vec<(f64, String)>,
}
89
/// Strategy applied when a write would push the keyspace past its memory
/// limit.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum EvictionPolicy {
    /// Never evict; the write is rejected instead. This is the default.
    #[default]
    NoEviction,
    /// Evict an approximately least-recently-used key, chosen from a random
    /// sample of entries.
    AllKeysLru,
}
99
100/// Configuration for a single keyspace / shard.
101#[derive(Debug, Clone)]
102pub struct ShardConfig {
103    /// Maximum memory in bytes. `None` means unlimited.
104    pub max_memory: Option<usize>,
105    /// What to do when memory is full.
106    pub eviction_policy: EvictionPolicy,
107    /// Numeric identifier for this shard (used for persistence file naming).
108    pub shard_id: u16,
109}
110
111impl Default for ShardConfig {
112    fn default() -> Self {
113        Self {
114            max_memory: None,
115            eviction_policy: EvictionPolicy::NoEviction,
116            shard_id: 0,
117        }
118    }
119}
120
/// Outcome of `Keyspace::set`, which can be refused under memory pressure.
#[derive(Debug, Eq, PartialEq)]
pub enum SetResult {
    /// The value was written.
    Ok,
    /// The write was refused because the memory limit could not be honoured.
    OutOfMemory,
}
129
130/// A single entry in the keyspace: a value plus optional expiration
131/// and last access time for LRU approximation.
132#[derive(Debug, Clone)]
133pub(crate) struct Entry {
134    pub(crate) value: Value,
135    pub(crate) expires_at: Option<Instant>,
136    pub(crate) last_access: Instant,
137}
138
139impl Entry {
140    fn new(value: Value, expires_at: Option<Instant>) -> Self {
141        Self {
142            value,
143            expires_at,
144            last_access: Instant::now(),
145        }
146    }
147
148    /// Returns `true` if this entry has passed its expiration time.
149    fn is_expired(&self) -> bool {
150        match self.expires_at {
151            Some(deadline) => Instant::now() >= deadline,
152            None => false,
153        }
154    }
155
156    /// Marks this entry as accessed right now.
157    fn touch(&mut self) {
158        self.last_access = Instant::now();
159    }
160}
161
/// Answer to a TTL / PTTL query, following Redis semantics.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TtlResult {
    /// The key exists with an expiration; payload is the remaining seconds.
    Seconds(u64),
    /// The key exists with an expiration; payload is the remaining
    /// milliseconds.
    Milliseconds(u64),
    /// The key exists and has no expiration.
    NoExpiry,
    /// There is no such key.
    NotFound,
}
174
/// Point-in-time counters describing a keyspace.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct KeyspaceStats {
    /// How many keys are tracked.
    pub key_count: usize,
    /// Estimated memory footprint, in bytes.
    pub used_bytes: usize,
    /// How many keys carry an expiration.
    pub keys_with_expiry: usize,
}
185
/// Upper bound on how many entries are sampled per eviction attempt.
///
/// Eviction is an approximate LRU: up to this many entries are picked at
/// random and the one with the oldest `last_access` is removed. No ordered
/// structure has to be maintained across writes.
///
/// A larger sample approximates true LRU more closely at the price of more
/// work per eviction; a smaller one is cheaper but less accurate.
const EVICTION_SAMPLE_SIZE: usize = 16;
197
198/// The core key-value store.
199///
200/// All operations are single-threaded per shard — no internal locking.
201/// Memory usage is tracked incrementally on every mutation.
202pub struct Keyspace {
203    entries: HashMap<String, Entry>,
204    memory: MemoryTracker,
205    config: ShardConfig,
206    /// Number of entries that currently have an expiration set.
207    expiry_count: usize,
208}
209
210impl Keyspace {
211    /// Creates a new, empty keyspace with default config (no memory limit).
212    pub fn new() -> Self {
213        Self::with_config(ShardConfig::default())
214    }
215
216    /// Creates a new, empty keyspace with the given config.
217    pub fn with_config(config: ShardConfig) -> Self {
218        Self {
219            entries: HashMap::new(),
220            memory: MemoryTracker::new(),
221            config,
222            expiry_count: 0,
223        }
224    }
225
226    /// Retrieves the string value for `key`, or `None` if missing/expired.
227    ///
228    /// Returns `Err(WrongType)` if the key holds a non-string value.
229    /// Expired keys are removed lazily on access. Successful reads update
230    /// the entry's last access time for LRU tracking.
231    pub fn get(&mut self, key: &str) -> Result<Option<Value>, WrongType> {
232        if self.remove_if_expired(key) {
233            return Ok(None);
234        }
235        match self.entries.get_mut(key) {
236            Some(e) => match &e.value {
237                Value::String(_) => {
238                    e.touch();
239                    Ok(Some(e.value.clone()))
240                }
241                _ => Err(WrongType),
242            },
243            None => Ok(None),
244        }
245    }
246
247    /// Returns the type name of the value at `key`, or "none" if missing.
248    pub fn value_type(&mut self, key: &str) -> &'static str {
249        if self.remove_if_expired(key) {
250            return "none";
251        }
252        match self.entries.get(key) {
253            Some(e) => types::type_name(&e.value),
254            None => "none",
255        }
256    }
257
258    /// Stores a key-value pair. If the key already existed, the old entry
259    /// (including any TTL) is replaced entirely.
260    ///
261    /// `expire` sets an optional TTL as a duration from now.
262    ///
263    /// Returns `SetResult::OutOfMemory` if the memory limit is reached
264    /// and the eviction policy is `NoEviction`. With `AllKeysLru`, this
265    /// will evict keys to make room before inserting.
266    pub fn set(&mut self, key: String, value: Bytes, expire: Option<Duration>) -> SetResult {
267        let expires_at = expire.map(|d| Instant::now() + d);
268        let new_value = Value::String(value);
269
270        // check memory limit — for overwrites, only the net increase matters
271        let new_size = memory::entry_size(&key, &new_value);
272        let old_size = self
273            .entries
274            .get(&key)
275            .map(|e| memory::entry_size(&key, &e.value))
276            .unwrap_or(0);
277        let net_increase = new_size.saturating_sub(old_size);
278
279        if !self.enforce_memory_limit(net_increase) {
280            return SetResult::OutOfMemory;
281        }
282
283        if let Some(old_entry) = self.entries.get(&key) {
284            self.memory.replace(&key, &old_entry.value, &new_value);
285            // adjust expiry count if the TTL status changed
286            let had_expiry = old_entry.expires_at.is_some();
287            let has_expiry = expires_at.is_some();
288            match (had_expiry, has_expiry) {
289                (false, true) => self.expiry_count += 1,
290                (true, false) => self.expiry_count = self.expiry_count.saturating_sub(1),
291                _ => {}
292            }
293        } else {
294            self.memory.add(&key, &new_value);
295            if expires_at.is_some() {
296                self.expiry_count += 1;
297            }
298        }
299
300        self.entries.insert(key, Entry::new(new_value, expires_at));
301        SetResult::Ok
302    }
303
304    /// Tries to evict one key using LRU approximation.
305    ///
306    /// Randomly samples `EVICTION_SAMPLE_SIZE` keys and removes the one
307    /// with the oldest `last_access` time. Returns `true` if a key was
308    /// evicted, `false` if the keyspace is empty.
309    fn try_evict(&mut self) -> bool {
310        if self.entries.is_empty() {
311            return false;
312        }
313
314        let mut rng = rand::rng();
315
316        // randomly sample keys and find the least recently accessed one
317        let victim = self
318            .entries
319            .iter()
320            .choose_multiple(&mut rng, EVICTION_SAMPLE_SIZE)
321            .into_iter()
322            .min_by_key(|(_, entry)| entry.last_access)
323            .map(|(k, _)| k.clone());
324
325        if let Some(key) = victim {
326            if let Some(entry) = self.entries.remove(&key) {
327                self.memory.remove(&key, &entry.value);
328                if entry.expires_at.is_some() {
329                    self.expiry_count = self.expiry_count.saturating_sub(1);
330                }
331                return true;
332            }
333        }
334        false
335    }
336
337    /// Checks whether the memory limit allows a write that would increase
338    /// usage by `estimated_increase` bytes. Attempts eviction if the
339    /// policy allows it. Returns `true` if the write can proceed.
340    fn enforce_memory_limit(&mut self, estimated_increase: usize) -> bool {
341        if let Some(max) = self.config.max_memory {
342            while self.memory.used_bytes() + estimated_increase > max {
343                match self.config.eviction_policy {
344                    EvictionPolicy::NoEviction => return false,
345                    EvictionPolicy::AllKeysLru => {
346                        if !self.try_evict() {
347                            return false;
348                        }
349                    }
350                }
351            }
352        }
353        true
354    }
355
356    /// Removes a key. Returns `true` if the key existed (and wasn't expired).
357    pub fn del(&mut self, key: &str) -> bool {
358        if self.remove_if_expired(key) {
359            return false;
360        }
361        if let Some(entry) = self.entries.remove(key) {
362            self.memory.remove(key, &entry.value);
363            if entry.expires_at.is_some() {
364                self.expiry_count = self.expiry_count.saturating_sub(1);
365            }
366            true
367        } else {
368            false
369        }
370    }
371
372    /// Returns `true` if the key exists and hasn't expired.
373    pub fn exists(&mut self, key: &str) -> bool {
374        if self.remove_if_expired(key) {
375            return false;
376        }
377        self.entries.contains_key(key)
378    }
379
380    /// Sets an expiration on an existing key. Returns `true` if the key
381    /// exists (and the TTL was set), `false` if the key doesn't exist.
382    pub fn expire(&mut self, key: &str, seconds: u64) -> bool {
383        if self.remove_if_expired(key) {
384            return false;
385        }
386        match self.entries.get_mut(key) {
387            Some(entry) => {
388                if entry.expires_at.is_none() {
389                    self.expiry_count += 1;
390                }
391                entry.expires_at = Some(Instant::now() + Duration::from_secs(seconds));
392                true
393            }
394            None => false,
395        }
396    }
397
398    /// Returns the TTL status for a key, following Redis semantics:
399    /// - `Seconds(n)` if the key has a TTL
400    /// - `NoExpiry` if the key exists without a TTL
401    /// - `NotFound` if the key doesn't exist
402    pub fn ttl(&mut self, key: &str) -> TtlResult {
403        if self.remove_if_expired(key) {
404            return TtlResult::NotFound;
405        }
406        match self.entries.get(key) {
407            Some(entry) => match entry.expires_at {
408                Some(deadline) => {
409                    let remaining = deadline.saturating_duration_since(Instant::now());
410                    TtlResult::Seconds(remaining.as_secs())
411                }
412                None => TtlResult::NoExpiry,
413            },
414            None => TtlResult::NotFound,
415        }
416    }
417
418    /// Removes the expiration from a key.
419    ///
420    /// Returns `true` if the key existed and had a timeout that was removed.
421    /// Returns `false` if the key doesn't exist or has no expiration.
422    pub fn persist(&mut self, key: &str) -> bool {
423        if self.remove_if_expired(key) {
424            return false;
425        }
426        match self.entries.get_mut(key) {
427            Some(entry) => {
428                if entry.expires_at.is_some() {
429                    entry.expires_at = None;
430                    self.expiry_count = self.expiry_count.saturating_sub(1);
431                    true
432                } else {
433                    false
434                }
435            }
436            None => false,
437        }
438    }
439
440    /// Returns the TTL status for a key in milliseconds, following Redis semantics:
441    /// - `Milliseconds(n)` if the key has a TTL
442    /// - `NoExpiry` if the key exists without a TTL
443    /// - `NotFound` if the key doesn't exist
444    pub fn pttl(&mut self, key: &str) -> TtlResult {
445        if self.remove_if_expired(key) {
446            return TtlResult::NotFound;
447        }
448        match self.entries.get(key) {
449            Some(entry) => match entry.expires_at {
450                Some(deadline) => {
451                    let remaining = deadline.saturating_duration_since(Instant::now());
452                    TtlResult::Milliseconds(remaining.as_millis() as u64)
453                }
454                None => TtlResult::NoExpiry,
455            },
456            None => TtlResult::NotFound,
457        }
458    }
459
460    /// Sets an expiration on an existing key in milliseconds.
461    ///
462    /// Returns `true` if the key exists (and the TTL was set),
463    /// `false` if the key doesn't exist.
464    pub fn pexpire(&mut self, key: &str, millis: u64) -> bool {
465        if self.remove_if_expired(key) {
466            return false;
467        }
468        match self.entries.get_mut(key) {
469            Some(entry) => {
470                if entry.expires_at.is_none() {
471                    self.expiry_count += 1;
472                }
473                entry.expires_at = Some(Instant::now() + Duration::from_millis(millis));
474                true
475            }
476            None => false,
477        }
478    }
479
480    /// Increments the integer value of a key by 1.
481    ///
482    /// If the key doesn't exist, it's initialized to 0 before incrementing.
483    /// Returns the new value after the operation.
484    pub fn incr(&mut self, key: &str) -> Result<i64, IncrError> {
485        self.incr_by(key, 1)
486    }
487
488    /// Decrements the integer value of a key by 1.
489    ///
490    /// If the key doesn't exist, it's initialized to 0 before decrementing.
491    /// Returns the new value after the operation.
492    pub fn decr(&mut self, key: &str) -> Result<i64, IncrError> {
493        self.incr_by(key, -1)
494    }
495
496    /// Shared implementation for INCR/DECR. Adds `delta` to the current
497    /// integer value of the key, creating it if necessary.
498    ///
499    /// Preserves the existing TTL when updating an existing key.
500    fn incr_by(&mut self, key: &str, delta: i64) -> Result<i64, IncrError> {
501        self.remove_if_expired(key);
502
503        // read current value and TTL
504        let (current, existing_expire) = match self.entries.get(key) {
505            Some(entry) => {
506                let val = match &entry.value {
507                    Value::String(data) => {
508                        let s = std::str::from_utf8(data).map_err(|_| IncrError::NotAnInteger)?;
509                        s.parse::<i64>().map_err(|_| IncrError::NotAnInteger)?
510                    }
511                    _ => return Err(IncrError::WrongType),
512                };
513                let expire = entry
514                    .expires_at
515                    .map(|deadline| deadline.saturating_duration_since(Instant::now()));
516                (val, expire)
517            }
518            None => (0, None),
519        };
520
521        let new_val = current.checked_add(delta).ok_or(IncrError::Overflow)?;
522        let new_bytes = Bytes::from(new_val.to_string());
523
524        match self.set(key.to_owned(), new_bytes, existing_expire) {
525            SetResult::Ok => Ok(new_val),
526            SetResult::OutOfMemory => Err(IncrError::OutOfMemory),
527        }
528    }
529
530    /// Returns aggregated stats for this keyspace.
531    ///
532    /// All fields are tracked incrementally — this is O(1).
533    pub fn stats(&self) -> KeyspaceStats {
534        KeyspaceStats {
535            key_count: self.memory.key_count(),
536            used_bytes: self.memory.used_bytes(),
537            keys_with_expiry: self.expiry_count,
538        }
539    }
540
541    /// Returns the number of live keys.
542    pub fn len(&self) -> usize {
543        self.entries.len()
544    }
545
546    /// Removes all keys from the keyspace.
547    pub fn clear(&mut self) {
548        self.entries.clear();
549        self.memory.reset();
550        self.expiry_count = 0;
551    }
552
553    /// Returns `true` if the keyspace has no entries.
554    pub fn is_empty(&self) -> bool {
555        self.entries.is_empty()
556    }
557
558    /// Scans keys starting from a cursor position.
559    ///
560    /// Returns the next cursor (0 if scan complete) and a batch of keys.
561    /// The `pattern` argument supports glob-style matching (*, ?, [abc]).
562    pub fn scan_keys(
563        &self,
564        cursor: u64,
565        count: usize,
566        pattern: Option<&str>,
567    ) -> (u64, Vec<String>) {
568        let mut keys = Vec::with_capacity(count);
569        let mut position = 0u64;
570        let target_count = if count == 0 { 10 } else { count };
571
572        for (key, entry) in self.entries.iter() {
573            // skip expired entries
574            if entry.is_expired() {
575                continue;
576            }
577
578            // skip entries before cursor
579            if position < cursor {
580                position += 1;
581                continue;
582            }
583
584            // pattern matching
585            if let Some(pat) = pattern {
586                if !glob_match(pat, key) {
587                    position += 1;
588                    continue;
589                }
590            }
591
592            keys.push(key.clone());
593            position += 1;
594
595            if keys.len() >= target_count {
596                // return position as next cursor
597                return (position, keys);
598            }
599        }
600
601        // scan complete
602        (0, keys)
603    }
604
605    /// Iterates over all live (non-expired) entries, yielding the key, a
606    /// clone of the value, and the remaining TTL in milliseconds (-1 for
607    /// entries with no expiration). Used by snapshot and AOF rewrite.
608    pub fn iter_entries(&self) -> impl Iterator<Item = (&str, &Value, i64)> {
609        let now = Instant::now();
610        self.entries.iter().filter_map(move |(key, entry)| {
611            if entry.is_expired() {
612                return None;
613            }
614            let ttl_ms = match entry.expires_at {
615                Some(deadline) => {
616                    let remaining = deadline.saturating_duration_since(now);
617                    remaining.as_millis() as i64
618                }
619                None => -1,
620            };
621            Some((key.as_str(), &entry.value, ttl_ms))
622        })
623    }
624
625    /// Restores an entry during recovery, bypassing memory limits.
626    ///
627    /// If `expires_at` is in the past, the entry is silently skipped.
628    /// This is used only during shard startup when loading from
629    /// snapshot/AOF — normal writes should go through `set()`.
630    pub fn restore(&mut self, key: String, value: Value, expires_at: Option<Instant>) {
631        // skip entries that already expired
632        if let Some(deadline) = expires_at {
633            if Instant::now() >= deadline {
634                return;
635            }
636        }
637
638        // if replacing an existing entry, adjust memory tracking
639        if let Some(old) = self.entries.get(&key) {
640            self.memory.replace(&key, &old.value, &value);
641            let had_expiry = old.expires_at.is_some();
642            let has_expiry = expires_at.is_some();
643            match (had_expiry, has_expiry) {
644                (false, true) => self.expiry_count += 1,
645                (true, false) => self.expiry_count = self.expiry_count.saturating_sub(1),
646                _ => {}
647            }
648        } else {
649            self.memory.add(&key, &value);
650            if expires_at.is_some() {
651                self.expiry_count += 1;
652            }
653        }
654
655        self.entries.insert(key, Entry::new(value, expires_at));
656    }
657
658    // -- list operations --
659
660    /// Pushes one or more values to the head (left) of a list.
661    ///
662    /// Creates the list if the key doesn't exist. Returns `Err(WriteError::WrongType)`
663    /// if the key exists but holds a non-list value, or
664    /// `Err(WriteError::OutOfMemory)` if the memory limit is reached.
665    /// Returns the new length on success.
666    pub fn lpush(&mut self, key: &str, values: &[Bytes]) -> Result<usize, WriteError> {
667        self.list_push(key, values, true)
668    }
669
670    /// Pushes one or more values to the tail (right) of a list.
671    ///
672    /// Creates the list if the key doesn't exist. Returns `Err(WriteError::WrongType)`
673    /// if the key exists but holds a non-list value, or
674    /// `Err(WriteError::OutOfMemory)` if the memory limit is reached.
675    /// Returns the new length on success.
676    pub fn rpush(&mut self, key: &str, values: &[Bytes]) -> Result<usize, WriteError> {
677        self.list_push(key, values, false)
678    }
679
680    /// Pops a value from the head (left) of a list.
681    ///
682    /// Returns `Ok(None)` if the key doesn't exist. Removes the key if
683    /// the list becomes empty. Returns `Err(WrongType)` on type mismatch.
684    pub fn lpop(&mut self, key: &str) -> Result<Option<Bytes>, WrongType> {
685        self.list_pop(key, true)
686    }
687
688    /// Pops a value from the tail (right) of a list.
689    ///
690    /// Returns `Ok(None)` if the key doesn't exist. Removes the key if
691    /// the list becomes empty. Returns `Err(WrongType)` on type mismatch.
692    pub fn rpop(&mut self, key: &str) -> Result<Option<Bytes>, WrongType> {
693        self.list_pop(key, false)
694    }
695
696    /// Returns a range of elements from a list by index.
697    ///
698    /// Supports negative indices (e.g. -1 = last element). Out-of-bounds
699    /// indices are clamped to the list boundaries. Returns `Err(WrongType)`
700    /// on type mismatch. Missing keys return an empty vec.
701    pub fn lrange(&mut self, key: &str, start: i64, stop: i64) -> Result<Vec<Bytes>, WrongType> {
702        if self.remove_if_expired(key) {
703            return Ok(vec![]);
704        }
705        match self.entries.get_mut(key) {
706            None => Ok(vec![]),
707            Some(entry) => {
708                let result = match &entry.value {
709                    Value::List(deque) => {
710                        let len = deque.len() as i64;
711                        let (s, e) = normalize_range(start, stop, len);
712                        if s > e {
713                            return Ok(vec![]);
714                        }
715                        Ok(deque
716                            .iter()
717                            .skip(s as usize)
718                            .take((e - s + 1) as usize)
719                            .cloned()
720                            .collect())
721                    }
722                    _ => Err(WrongType),
723                };
724                if result.is_ok() {
725                    entry.touch();
726                }
727                result
728            }
729        }
730    }
731
732    /// Returns the length of a list, or 0 if the key doesn't exist.
733    ///
734    /// Returns `Err(WrongType)` on type mismatch.
735    pub fn llen(&mut self, key: &str) -> Result<usize, WrongType> {
736        if self.remove_if_expired(key) {
737            return Ok(0);
738        }
739        match self.entries.get(key) {
740            None => Ok(0),
741            Some(entry) => match &entry.value {
742                Value::List(deque) => Ok(deque.len()),
743                _ => Err(WrongType),
744            },
745        }
746    }
747
748    /// Internal push implementation shared by lpush/rpush.
749    fn list_push(&mut self, key: &str, values: &[Bytes], left: bool) -> Result<usize, WriteError> {
750        self.remove_if_expired(key);
751
752        let is_new = !self.entries.contains_key(key);
753
754        if !is_new && !matches!(self.entries[key].value, Value::List(_)) {
755            return Err(WriteError::WrongType);
756        }
757
758        // estimate the memory increase and enforce the limit before mutating
759        let element_increase: usize = values
760            .iter()
761            .map(|v| memory::VECDEQUE_ELEMENT_OVERHEAD + v.len())
762            .sum();
763        let estimated_increase = if is_new {
764            memory::ENTRY_OVERHEAD + key.len() + memory::VECDEQUE_BASE_OVERHEAD + element_increase
765        } else {
766            element_increase
767        };
768        if !self.enforce_memory_limit(estimated_increase) {
769            return Err(WriteError::OutOfMemory);
770        }
771
772        if is_new {
773            let value = Value::List(VecDeque::new());
774            self.memory.add(key, &value);
775            self.entries.insert(key.to_owned(), Entry::new(value, None));
776        }
777
778        let entry = self
779            .entries
780            .get_mut(key)
781            .expect("just inserted or verified");
782        let old_entry_size = memory::entry_size(key, &entry.value);
783
784        if let Value::List(ref mut deque) = entry.value {
785            for val in values {
786                if left {
787                    deque.push_front(val.clone());
788                } else {
789                    deque.push_back(val.clone());
790                }
791            }
792        }
793        entry.touch();
794
795        let new_entry_size = memory::entry_size(key, &entry.value);
796        self.memory.adjust(old_entry_size, new_entry_size);
797
798        let len = match &entry.value {
799            Value::List(d) => d.len(),
800            _ => unreachable!(),
801        };
802        Ok(len)
803    }
804
805    /// Internal pop implementation shared by lpop/rpop.
806    fn list_pop(&mut self, key: &str, left: bool) -> Result<Option<Bytes>, WrongType> {
807        if self.remove_if_expired(key) {
808            return Ok(None);
809        }
810
811        match self.entries.get(key) {
812            None => return Ok(None),
813            Some(e) => {
814                if !matches!(e.value, Value::List(_)) {
815                    return Err(WrongType);
816                }
817            }
818        };
819
820        let old_entry_size = memory::entry_size(key, &self.entries[key].value);
821        let entry = self.entries.get_mut(key).expect("verified above");
822        let popped = if let Value::List(ref mut deque) = entry.value {
823            if left {
824                deque.pop_front()
825            } else {
826                deque.pop_back()
827            }
828        } else {
829            unreachable!()
830        };
831        entry.touch();
832
833        // check if list is now empty — if so, delete the key entirely
834        let is_empty = matches!(&entry.value, Value::List(d) if d.is_empty());
835        if is_empty {
836            let removed = self.entries.remove(key).expect("verified above");
837            // use remove_with_size since the value was already mutated
838            self.memory.remove_with_size(old_entry_size);
839            if removed.expires_at.is_some() {
840                self.expiry_count = self.expiry_count.saturating_sub(1);
841            }
842        } else {
843            let new_entry_size = memory::entry_size(key, &self.entries[key].value);
844            self.memory.adjust(old_entry_size, new_entry_size);
845        }
846
847        Ok(popped)
848    }
849
850    // -- sorted set operations --
851
852    /// Adds members with scores to a sorted set, with optional ZADD flags.
853    ///
854    /// Creates the sorted set if the key doesn't exist. Returns a
855    /// `ZAddResult` containing the count for the client response and the
856    /// list of members that were actually applied (for AOF correctness).
857    /// Returns `Err(WriteError::WrongType)` on type mismatch, or
858    /// `Err(WriteError::OutOfMemory)` if the memory limit is reached.
859    pub fn zadd(
860        &mut self,
861        key: &str,
862        members: &[(f64, String)],
863        flags: &ZAddFlags,
864    ) -> Result<ZAddResult, WriteError> {
865        self.remove_if_expired(key);
866
867        let is_new = !self.entries.contains_key(key);
868        if !is_new && !matches!(self.entries[key].value, Value::SortedSet(_)) {
869            return Err(WriteError::WrongType);
870        }
871
872        // worst-case estimate: assume all members are new
873        let member_increase: usize = members
874            .iter()
875            .map(|(_, m)| SortedSet::estimated_member_cost(m))
876            .sum();
877        let estimated_increase = if is_new {
878            memory::ENTRY_OVERHEAD + key.len() + SortedSet::BASE_OVERHEAD + member_increase
879        } else {
880            member_increase
881        };
882        if !self.enforce_memory_limit(estimated_increase) {
883            return Err(WriteError::OutOfMemory);
884        }
885
886        if is_new {
887            let value = Value::SortedSet(SortedSet::new());
888            self.memory.add(key, &value);
889            self.entries.insert(key.to_owned(), Entry::new(value, None));
890        }
891
892        let entry = self
893            .entries
894            .get_mut(key)
895            .expect("just inserted or verified");
896        let old_entry_size = memory::entry_size(key, &entry.value);
897
898        let mut count = 0;
899        let mut applied = Vec::new();
900        if let Value::SortedSet(ref mut ss) = entry.value {
901            for (score, member) in members {
902                let result = ss.add_with_flags(member.clone(), *score, flags);
903                if result.added || result.updated {
904                    applied.push((*score, member.clone()));
905                }
906                if flags.ch {
907                    if result.added || result.updated {
908                        count += 1;
909                    }
910                } else if result.added {
911                    count += 1;
912                }
913            }
914        }
915        entry.touch();
916
917        let new_entry_size = memory::entry_size(key, &entry.value);
918        self.memory.adjust(old_entry_size, new_entry_size);
919
920        // clean up if the set is still empty (e.g. XX on a new key)
921        if let Value::SortedSet(ref ss) = entry.value {
922            if ss.is_empty() {
923                self.memory
924                    .remove_with_size(memory::entry_size(key, &entry.value));
925                self.entries.remove(key);
926            }
927        }
928
929        Ok(ZAddResult { count, applied })
930    }
931
932    /// Removes members from a sorted set. Returns the names of members
933    /// that were actually removed (for AOF correctness). Deletes the key
934    /// if the set becomes empty.
935    ///
936    /// Returns `Err(WrongType)` if the key holds a non-sorted-set value.
937    pub fn zrem(&mut self, key: &str, members: &[String]) -> Result<Vec<String>, WrongType> {
938        if self.remove_if_expired(key) {
939            return Ok(vec![]);
940        }
941
942        match self.entries.get(key) {
943            None => return Ok(vec![]),
944            Some(e) => {
945                if !matches!(e.value, Value::SortedSet(_)) {
946                    return Err(WrongType);
947                }
948            }
949        }
950
951        let old_entry_size = memory::entry_size(key, &self.entries[key].value);
952        let entry = self.entries.get_mut(key).expect("verified above");
953        let mut removed = Vec::new();
954        if let Value::SortedSet(ref mut ss) = entry.value {
955            for member in members {
956                if ss.remove(member) {
957                    removed.push(member.clone());
958                }
959            }
960        }
961        entry.touch();
962
963        let is_empty = matches!(&entry.value, Value::SortedSet(ss) if ss.is_empty());
964        if is_empty {
965            let removed_entry = self.entries.remove(key).expect("verified above");
966            self.memory.remove_with_size(old_entry_size);
967            if removed_entry.expires_at.is_some() {
968                self.expiry_count = self.expiry_count.saturating_sub(1);
969            }
970        } else {
971            let new_entry_size = memory::entry_size(key, &self.entries[key].value);
972            self.memory.adjust(old_entry_size, new_entry_size);
973        }
974
975        Ok(removed)
976    }
977
978    /// Returns the score for a member in a sorted set.
979    ///
980    /// Returns `Ok(None)` if the key or member doesn't exist.
981    /// Returns `Err(WrongType)` on type mismatch.
982    pub fn zscore(&mut self, key: &str, member: &str) -> Result<Option<f64>, WrongType> {
983        if self.remove_if_expired(key) {
984            return Ok(None);
985        }
986        match self.entries.get_mut(key) {
987            None => Ok(None),
988            Some(entry) => match &entry.value {
989                Value::SortedSet(ss) => {
990                    let score = ss.score(member);
991                    entry.touch();
992                    Ok(score)
993                }
994                _ => Err(WrongType),
995            },
996        }
997    }
998
999    /// Returns the 0-based rank of a member in a sorted set (lowest score = 0).
1000    ///
1001    /// Returns `Ok(None)` if the key or member doesn't exist.
1002    /// Returns `Err(WrongType)` on type mismatch.
1003    pub fn zrank(&mut self, key: &str, member: &str) -> Result<Option<usize>, WrongType> {
1004        if self.remove_if_expired(key) {
1005            return Ok(None);
1006        }
1007        match self.entries.get_mut(key) {
1008            None => Ok(None),
1009            Some(entry) => match &entry.value {
1010                Value::SortedSet(ss) => {
1011                    let rank = ss.rank(member);
1012                    entry.touch();
1013                    Ok(rank)
1014                }
1015                _ => Err(WrongType),
1016            },
1017        }
1018    }
1019
1020    /// Returns a range of members from a sorted set by rank.
1021    ///
1022    /// Supports negative indices. If `with_scores` is true, the result
1023    /// includes `(member, score)` pairs; otherwise just members.
1024    /// Returns `Err(WrongType)` on type mismatch.
1025    pub fn zrange(
1026        &mut self,
1027        key: &str,
1028        start: i64,
1029        stop: i64,
1030    ) -> Result<Vec<(String, f64)>, WrongType> {
1031        if self.remove_if_expired(key) {
1032            return Ok(vec![]);
1033        }
1034        match self.entries.get_mut(key) {
1035            None => Ok(vec![]),
1036            Some(entry) => {
1037                let result = match &entry.value {
1038                    Value::SortedSet(ss) => {
1039                        let items = ss.range_by_rank(start, stop);
1040                        Ok(items.into_iter().map(|(m, s)| (m.to_owned(), s)).collect())
1041                    }
1042                    _ => Err(WrongType),
1043                };
1044                if result.is_ok() {
1045                    entry.touch();
1046                }
1047                result
1048            }
1049        }
1050    }
1051
1052    /// Returns the number of members in a sorted set, or 0 if the key doesn't exist.
1053    ///
1054    /// Returns `Err(WrongType)` on type mismatch.
1055    pub fn zcard(&mut self, key: &str) -> Result<usize, WrongType> {
1056        if self.remove_if_expired(key) {
1057            return Ok(0);
1058        }
1059        match self.entries.get(key) {
1060            None => Ok(0),
1061            Some(entry) => match &entry.value {
1062                Value::SortedSet(ss) => Ok(ss.len()),
1063                _ => Err(WrongType),
1064            },
1065        }
1066    }
1067
1068    // -------------------------------------------------------------------------
1069    // Hash operations
1070    // -------------------------------------------------------------------------
1071
1072    /// Sets one or more field-value pairs in a hash.
1073    ///
1074    /// Creates the hash if the key doesn't exist. Returns the number of
1075    /// new fields added (fields that were updated don't count).
1076    pub fn hset(&mut self, key: &str, fields: &[(String, Bytes)]) -> Result<usize, WriteError> {
1077        if fields.is_empty() {
1078            return Ok(0);
1079        }
1080
1081        self.remove_if_expired(key);
1082
1083        let is_new = !self.entries.contains_key(key);
1084
1085        if !is_new && !matches!(self.entries[key].value, Value::Hash(_)) {
1086            return Err(WriteError::WrongType);
1087        }
1088
1089        // estimate memory increase before mutating
1090        let field_increase: usize = fields
1091            .iter()
1092            .map(|(f, v)| f.len() + v.len() + memory::HASHMAP_ENTRY_OVERHEAD)
1093            .sum();
1094        let estimated_increase = if is_new {
1095            memory::ENTRY_OVERHEAD + key.len() + memory::HASHMAP_BASE_OVERHEAD + field_increase
1096        } else {
1097            field_increase
1098        };
1099        if !self.enforce_memory_limit(estimated_increase) {
1100            return Err(WriteError::OutOfMemory);
1101        }
1102
1103        if is_new {
1104            let value = Value::Hash(HashMap::new());
1105            self.memory.add(key, &value);
1106            self.entries.insert(key.to_owned(), Entry::new(value, None));
1107        }
1108
1109        let entry = self
1110            .entries
1111            .get_mut(key)
1112            .expect("just inserted or verified");
1113        let old_entry_size = memory::entry_size(key, &entry.value);
1114
1115        let mut added = 0;
1116        if let Value::Hash(ref mut map) = entry.value {
1117            for (field, value) in fields {
1118                if map.insert(field.clone(), value.clone()).is_none() {
1119                    added += 1;
1120                }
1121            }
1122        }
1123        entry.touch();
1124
1125        let new_entry_size = memory::entry_size(key, &entry.value);
1126        self.memory.adjust(old_entry_size, new_entry_size);
1127
1128        Ok(added)
1129    }
1130
1131    /// Gets the value of a field in a hash.
1132    ///
1133    /// Returns `None` if the key or field doesn't exist.
1134    pub fn hget(&mut self, key: &str, field: &str) -> Result<Option<Bytes>, WrongType> {
1135        if self.remove_if_expired(key) {
1136            return Ok(None);
1137        }
1138        match self.entries.get_mut(key) {
1139            None => Ok(None),
1140            Some(entry) => match &entry.value {
1141                Value::Hash(map) => {
1142                    let result = map.get(field).cloned();
1143                    entry.touch();
1144                    Ok(result)
1145                }
1146                _ => Err(WrongType),
1147            },
1148        }
1149    }
1150
1151    /// Gets all field-value pairs from a hash.
1152    ///
1153    /// Returns an empty vec if the key doesn't exist.
1154    pub fn hgetall(&mut self, key: &str) -> Result<Vec<(String, Bytes)>, WrongType> {
1155        if self.remove_if_expired(key) {
1156            return Ok(vec![]);
1157        }
1158        match self.entries.get_mut(key) {
1159            None => Ok(vec![]),
1160            Some(entry) => match &entry.value {
1161                Value::Hash(map) => {
1162                    let result: Vec<_> = map.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1163                    entry.touch();
1164                    Ok(result)
1165                }
1166                _ => Err(WrongType),
1167            },
1168        }
1169    }
1170
1171    /// Deletes one or more fields from a hash.
1172    ///
1173    /// Returns the fields that were actually removed.
1174    pub fn hdel(&mut self, key: &str, fields: &[String]) -> Result<Vec<String>, WrongType> {
1175        if self.remove_if_expired(key) {
1176            return Ok(vec![]);
1177        }
1178
1179        match self.entries.get(key) {
1180            None => return Ok(vec![]),
1181            Some(e) => {
1182                if !matches!(e.value, Value::Hash(_)) {
1183                    return Err(WrongType);
1184                }
1185            }
1186        }
1187
1188        let old_entry_size = memory::entry_size(key, &self.entries[key].value);
1189        let entry = self.entries.get_mut(key).expect("verified above");
1190
1191        let mut removed = Vec::new();
1192        let is_empty = if let Value::Hash(ref mut map) = entry.value {
1193            for field in fields {
1194                if map.remove(field).is_some() {
1195                    removed.push(field.clone());
1196                }
1197            }
1198            map.is_empty()
1199        } else {
1200            false
1201        };
1202
1203        if is_empty {
1204            self.memory.remove_with_size(old_entry_size);
1205            self.entries.remove(key);
1206        } else {
1207            let new_entry_size = memory::entry_size(key, &self.entries[key].value);
1208            self.memory.adjust(old_entry_size, new_entry_size);
1209        }
1210
1211        Ok(removed)
1212    }
1213
1214    /// Checks if a field exists in a hash.
1215    pub fn hexists(&mut self, key: &str, field: &str) -> Result<bool, WrongType> {
1216        if self.remove_if_expired(key) {
1217            return Ok(false);
1218        }
1219        match self.entries.get_mut(key) {
1220            None => Ok(false),
1221            Some(entry) => match &entry.value {
1222                Value::Hash(map) => {
1223                    let result = map.contains_key(field);
1224                    entry.touch();
1225                    Ok(result)
1226                }
1227                _ => Err(WrongType),
1228            },
1229        }
1230    }
1231
1232    /// Returns the number of fields in a hash.
1233    pub fn hlen(&mut self, key: &str) -> Result<usize, WrongType> {
1234        if self.remove_if_expired(key) {
1235            return Ok(0);
1236        }
1237        match self.entries.get(key) {
1238            None => Ok(0),
1239            Some(entry) => match &entry.value {
1240                Value::Hash(map) => Ok(map.len()),
1241                _ => Err(WrongType),
1242            },
1243        }
1244    }
1245
1246    /// Increments a field's integer value by the given amount.
1247    ///
1248    /// Creates the hash and field if they don't exist, starting from 0.
1249    pub fn hincrby(&mut self, key: &str, field: &str, delta: i64) -> Result<i64, IncrError> {
1250        self.remove_if_expired(key);
1251
1252        let is_new = !self.entries.contains_key(key);
1253
1254        if !is_new {
1255            match &self.entries[key].value {
1256                Value::Hash(_) => {}
1257                _ => return Err(IncrError::WrongType),
1258            }
1259        }
1260
1261        // estimate memory for new field (worst case: new hash + new field)
1262        let val_str_len = 20; // max i64 string length
1263        let estimated_increase = if is_new {
1264            memory::ENTRY_OVERHEAD
1265                + key.len()
1266                + memory::HASHMAP_BASE_OVERHEAD
1267                + field.len()
1268                + val_str_len
1269                + memory::HASHMAP_ENTRY_OVERHEAD
1270        } else {
1271            field.len() + val_str_len + memory::HASHMAP_ENTRY_OVERHEAD
1272        };
1273
1274        if !self.enforce_memory_limit(estimated_increase) {
1275            return Err(IncrError::OutOfMemory);
1276        }
1277
1278        if is_new {
1279            let value = Value::Hash(HashMap::new());
1280            self.memory.add(key, &value);
1281            self.entries.insert(key.to_owned(), Entry::new(value, None));
1282        }
1283
1284        let entry = self
1285            .entries
1286            .get_mut(key)
1287            .expect("just inserted or verified");
1288        let old_entry_size = memory::entry_size(key, &entry.value);
1289
1290        let new_val = if let Value::Hash(ref mut map) = entry.value {
1291            let current_val = match map.get(field) {
1292                Some(data) => {
1293                    let s = std::str::from_utf8(data).map_err(|_| IncrError::NotAnInteger)?;
1294                    s.parse::<i64>().map_err(|_| IncrError::NotAnInteger)?
1295                }
1296                None => 0,
1297            };
1298
1299            let new_val = current_val.checked_add(delta).ok_or(IncrError::Overflow)?;
1300            map.insert(field.to_owned(), Bytes::from(new_val.to_string()));
1301            new_val
1302        } else {
1303            unreachable!()
1304        };
1305        entry.touch();
1306
1307        let new_entry_size = memory::entry_size(key, &entry.value);
1308        self.memory.adjust(old_entry_size, new_entry_size);
1309
1310        Ok(new_val)
1311    }
1312
1313    /// Returns all field names in a hash.
1314    pub fn hkeys(&mut self, key: &str) -> Result<Vec<String>, WrongType> {
1315        if self.remove_if_expired(key) {
1316            return Ok(vec![]);
1317        }
1318        match self.entries.get_mut(key) {
1319            None => Ok(vec![]),
1320            Some(entry) => match &entry.value {
1321                Value::Hash(map) => {
1322                    let result = map.keys().cloned().collect();
1323                    entry.touch();
1324                    Ok(result)
1325                }
1326                _ => Err(WrongType),
1327            },
1328        }
1329    }
1330
1331    /// Returns all values in a hash.
1332    pub fn hvals(&mut self, key: &str) -> Result<Vec<Bytes>, WrongType> {
1333        if self.remove_if_expired(key) {
1334            return Ok(vec![]);
1335        }
1336        match self.entries.get_mut(key) {
1337            None => Ok(vec![]),
1338            Some(entry) => match &entry.value {
1339                Value::Hash(map) => {
1340                    let result = map.values().cloned().collect();
1341                    entry.touch();
1342                    Ok(result)
1343                }
1344                _ => Err(WrongType),
1345            },
1346        }
1347    }
1348
1349    /// Gets multiple field values from a hash.
1350    ///
1351    /// Returns `None` for fields that don't exist.
1352    pub fn hmget(&mut self, key: &str, fields: &[String]) -> Result<Vec<Option<Bytes>>, WrongType> {
1353        if self.remove_if_expired(key) {
1354            return Ok(fields.iter().map(|_| None).collect());
1355        }
1356        match self.entries.get_mut(key) {
1357            None => Ok(fields.iter().map(|_| None).collect()),
1358            Some(entry) => match &entry.value {
1359                Value::Hash(map) => {
1360                    let result = fields.iter().map(|f| map.get(f).cloned()).collect();
1361                    entry.touch();
1362                    Ok(result)
1363                }
1364                _ => Err(WrongType),
1365            },
1366        }
1367    }
1368
1369    // -------------------------------------------------------------------------
1370    // Set operations
1371    // -------------------------------------------------------------------------
1372
1373    /// Adds one or more members to a set.
1374    ///
1375    /// Creates the set if the key doesn't exist. Returns the number of
1376    /// new members added (existing members don't count).
1377    pub fn sadd(&mut self, key: &str, members: &[String]) -> Result<usize, WriteError> {
1378        if members.is_empty() {
1379            return Ok(0);
1380        }
1381
1382        self.remove_if_expired(key);
1383
1384        let is_new = !self.entries.contains_key(key);
1385
1386        if !is_new && !matches!(self.entries[key].value, Value::Set(_)) {
1387            return Err(WriteError::WrongType);
1388        }
1389
1390        // estimate memory increase before mutating
1391        let member_increase: usize = members
1392            .iter()
1393            .map(|m| m.len() + memory::HASHSET_MEMBER_OVERHEAD)
1394            .sum();
1395        let estimated_increase = if is_new {
1396            memory::ENTRY_OVERHEAD + key.len() + memory::HASHSET_BASE_OVERHEAD + member_increase
1397        } else {
1398            member_increase
1399        };
1400        if !self.enforce_memory_limit(estimated_increase) {
1401            return Err(WriteError::OutOfMemory);
1402        }
1403
1404        if is_new {
1405            let value = Value::Set(std::collections::HashSet::new());
1406            self.memory.add(key, &value);
1407            self.entries.insert(key.to_owned(), Entry::new(value, None));
1408        }
1409
1410        let entry = self
1411            .entries
1412            .get_mut(key)
1413            .expect("just inserted or verified");
1414        let old_entry_size = memory::entry_size(key, &entry.value);
1415
1416        let mut added = 0;
1417        if let Value::Set(ref mut set) = entry.value {
1418            for member in members {
1419                if set.insert(member.clone()) {
1420                    added += 1;
1421                }
1422            }
1423        }
1424        entry.touch();
1425
1426        let new_entry_size = memory::entry_size(key, &entry.value);
1427        self.memory.adjust(old_entry_size, new_entry_size);
1428
1429        Ok(added)
1430    }
1431
1432    /// Removes one or more members from a set.
1433    ///
1434    /// Returns the number of members that were actually removed.
1435    pub fn srem(&mut self, key: &str, members: &[String]) -> Result<usize, WrongType> {
1436        if self.remove_if_expired(key) {
1437            return Ok(0);
1438        }
1439
1440        match self.entries.get(key) {
1441            None => return Ok(0),
1442            Some(e) => {
1443                if !matches!(e.value, Value::Set(_)) {
1444                    return Err(WrongType);
1445                }
1446            }
1447        }
1448
1449        let old_entry_size = memory::entry_size(key, &self.entries[key].value);
1450        let entry = self.entries.get_mut(key).expect("verified above");
1451
1452        let mut removed = 0;
1453        let is_empty = if let Value::Set(ref mut set) = entry.value {
1454            for member in members {
1455                if set.remove(member) {
1456                    removed += 1;
1457                }
1458            }
1459            set.is_empty()
1460        } else {
1461            false
1462        };
1463
1464        if is_empty {
1465            self.memory.remove_with_size(old_entry_size);
1466            self.entries.remove(key);
1467        } else {
1468            let new_entry_size = memory::entry_size(key, &self.entries[key].value);
1469            self.memory.adjust(old_entry_size, new_entry_size);
1470        }
1471
1472        Ok(removed)
1473    }
1474
1475    /// Returns all members of a set.
1476    pub fn smembers(&mut self, key: &str) -> Result<Vec<String>, WrongType> {
1477        if self.remove_if_expired(key) {
1478            return Ok(vec![]);
1479        }
1480        match self.entries.get_mut(key) {
1481            None => Ok(vec![]),
1482            Some(entry) => match &entry.value {
1483                Value::Set(set) => {
1484                    let result = set.iter().cloned().collect();
1485                    entry.touch();
1486                    Ok(result)
1487                }
1488                _ => Err(WrongType),
1489            },
1490        }
1491    }
1492
1493    /// Checks if a member exists in a set.
1494    pub fn sismember(&mut self, key: &str, member: &str) -> Result<bool, WrongType> {
1495        if self.remove_if_expired(key) {
1496            return Ok(false);
1497        }
1498        match self.entries.get_mut(key) {
1499            None => Ok(false),
1500            Some(entry) => match &entry.value {
1501                Value::Set(set) => {
1502                    let result = set.contains(member);
1503                    entry.touch();
1504                    Ok(result)
1505                }
1506                _ => Err(WrongType),
1507            },
1508        }
1509    }
1510
1511    /// Returns the cardinality (number of elements) of a set.
1512    pub fn scard(&mut self, key: &str) -> Result<usize, WrongType> {
1513        if self.remove_if_expired(key) {
1514            return Ok(0);
1515        }
1516        match self.entries.get(key) {
1517            None => Ok(0),
1518            Some(entry) => match &entry.value {
1519                Value::Set(set) => Ok(set.len()),
1520                _ => Err(WrongType),
1521            },
1522        }
1523    }
1524
1525    /// Randomly samples up to `count` keys and removes any that have expired.
1526    ///
1527    /// Returns the number of keys actually removed. Used by the active
1528    /// expiration cycle to clean up keys that no one is reading.
1529    pub fn expire_sample(&mut self, count: usize) -> usize {
1530        if self.entries.is_empty() {
1531            return 0;
1532        }
1533
1534        let mut rng = rand::rng();
1535
1536        let keys_to_check: Vec<String> = self
1537            .entries
1538            .keys()
1539            .choose_multiple(&mut rng, count)
1540            .into_iter()
1541            .cloned()
1542            .collect();
1543
1544        let mut removed = 0;
1545        for key in &keys_to_check {
1546            if self.remove_if_expired(key) {
1547                removed += 1;
1548            }
1549        }
1550        removed
1551    }
1552
1553    /// Checks if a key is expired and removes it if so. Returns `true`
1554    /// if the key was removed (or didn't exist).
1555    fn remove_if_expired(&mut self, key: &str) -> bool {
1556        let expired = self
1557            .entries
1558            .get(key)
1559            .map(|e| e.is_expired())
1560            .unwrap_or(false);
1561
1562        if expired {
1563            if let Some(entry) = self.entries.remove(key) {
1564                self.memory.remove(key, &entry.value);
1565                if entry.expires_at.is_some() {
1566                    self.expiry_count = self.expiry_count.saturating_sub(1);
1567                }
1568            }
1569        }
1570        expired
1571    }
1572}
1573
1574impl Default for Keyspace {
1575    fn default() -> Self {
1576        Self::new()
1577    }
1578}
1579
/// Glob-style matcher used for SCAN's MATCH option.
///
/// Recognised pattern syntax:
/// - `*` matches any run of characters, including none
/// - `?` matches exactly one character
/// - `[abc]` matches one character that is listed in the brackets
/// - `[^abc]` or `[!abc]` matches one character that is NOT listed
///
/// Every other character matches itself. A `[` without a closing `]`
/// never matches. Matching works on `char`s, so a multi-byte character
/// counts as one position.
///
/// The matcher is iterative: it remembers the most recent `*` and, on a
/// mismatch, retries with that star absorbing one more character.
fn glob_match(pattern: &str, text: &str) -> bool {
    // Tests `ch` against the character class that opens at `pat[open]`.
    // On a match, returns the pattern index just past the closing ']'.
    fn class_match(pat: &[char], open: usize, ch: char) -> Option<usize> {
        let mut first = open + 1;
        let negated = matches!(pat.get(first).copied(), Some('^' | '!'));
        if negated {
            first += 1;
        }
        // an unterminated class can never match
        let close = first + pat[first..].iter().position(|&c| c == ']')?;
        let listed = pat[first..close].contains(&ch);
        if listed != negated {
            Some(close + 1)
        } else {
            None
        }
    }

    let pat: Vec<char> = pattern.chars().collect();
    let txt: Vec<char> = text.chars().collect();

    let mut p = 0; // position in the pattern
    let mut t = 0; // position in the text

    // (pattern index just past the latest '*', text index that star resumes from)
    let mut resume: Option<(usize, usize)> = None;

    while p < pat.len() || t < txt.len() {
        let advanced = match (pat.get(p).copied(), txt.get(t).copied()) {
            (Some('*'), _) => {
                // let the star match nothing for now; remember where to retry
                resume = Some((p + 1, t));
                p += 1;
                true
            }
            (Some('?'), Some(_)) => {
                p += 1;
                t += 1;
                true
            }
            (Some('['), Some(ch)) => match class_match(&pat, p, ch) {
                Some(after_class) => {
                    p = after_class;
                    t += 1;
                    true
                }
                None => false,
            },
            (Some(literal), Some(ch)) if literal == ch => {
                p += 1;
                t += 1;
                true
            }
            _ => false,
        };
        if advanced {
            continue;
        }

        // mismatch, or pattern exhausted with text left over:
        // let the latest star swallow one more character and retry
        match resume.as_mut() {
            Some((after_star, text_start)) => {
                *text_start += 1;
                if *text_start > txt.len() {
                    return false;
                }
                p = *after_star;
                t = *text_start;
            }
            None => return false,
        }
    }

    // the loop only ends once both the pattern and the text are used up
    true
}
1675
1676#[cfg(test)]
1677mod tests {
1678    use super::*;
1679    use std::thread;
1680
1681    #[test]
1682    fn set_and_get() {
1683        let mut ks = Keyspace::new();
1684        ks.set("hello".into(), Bytes::from("world"), None);
1685        assert_eq!(
1686            ks.get("hello").unwrap(),
1687            Some(Value::String(Bytes::from("world")))
1688        );
1689    }
1690
1691    #[test]
1692    fn get_missing_key() {
1693        let mut ks = Keyspace::new();
1694        assert_eq!(ks.get("nope").unwrap(), None);
1695    }
1696
1697    #[test]
1698    fn overwrite_replaces_value() {
1699        let mut ks = Keyspace::new();
1700        ks.set("key".into(), Bytes::from("first"), None);
1701        ks.set("key".into(), Bytes::from("second"), None);
1702        assert_eq!(
1703            ks.get("key").unwrap(),
1704            Some(Value::String(Bytes::from("second")))
1705        );
1706    }
1707
1708    #[test]
1709    fn overwrite_clears_old_ttl() {
1710        let mut ks = Keyspace::new();
1711        ks.set(
1712            "key".into(),
1713            Bytes::from("v1"),
1714            Some(Duration::from_secs(100)),
1715        );
1716        // overwrite without TTL — should clear the old one
1717        ks.set("key".into(), Bytes::from("v2"), None);
1718        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
1719    }
1720
1721    #[test]
1722    fn del_existing() {
1723        let mut ks = Keyspace::new();
1724        ks.set("key".into(), Bytes::from("val"), None);
1725        assert!(ks.del("key"));
1726        assert_eq!(ks.get("key").unwrap(), None);
1727    }
1728
1729    #[test]
1730    fn del_missing() {
1731        let mut ks = Keyspace::new();
1732        assert!(!ks.del("nope"));
1733    }
1734
1735    #[test]
1736    fn exists_present_and_absent() {
1737        let mut ks = Keyspace::new();
1738        ks.set("yes".into(), Bytes::from("here"), None);
1739        assert!(ks.exists("yes"));
1740        assert!(!ks.exists("no"));
1741    }
1742
1743    #[test]
1744    fn expired_key_returns_none() {
1745        let mut ks = Keyspace::new();
1746        ks.set(
1747            "temp".into(),
1748            Bytes::from("gone"),
1749            Some(Duration::from_millis(10)),
1750        );
1751        // wait for expiration
1752        thread::sleep(Duration::from_millis(30));
1753        assert_eq!(ks.get("temp").unwrap(), None);
1754        // should also be gone from exists
1755        assert!(!ks.exists("temp"));
1756    }
1757
1758    #[test]
1759    fn ttl_no_expiry() {
1760        let mut ks = Keyspace::new();
1761        ks.set("key".into(), Bytes::from("val"), None);
1762        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
1763    }
1764
1765    #[test]
1766    fn ttl_not_found() {
1767        let mut ks = Keyspace::new();
1768        assert_eq!(ks.ttl("missing"), TtlResult::NotFound);
1769    }
1770
1771    #[test]
1772    fn ttl_with_expiry() {
1773        let mut ks = Keyspace::new();
1774        ks.set(
1775            "key".into(),
1776            Bytes::from("val"),
1777            Some(Duration::from_secs(100)),
1778        );
1779        match ks.ttl("key") {
1780            TtlResult::Seconds(s) => assert!(s >= 98 && s <= 100),
1781            other => panic!("expected Seconds, got {other:?}"),
1782        }
1783    }
1784
1785    #[test]
1786    fn ttl_expired_key() {
1787        let mut ks = Keyspace::new();
1788        ks.set(
1789            "temp".into(),
1790            Bytes::from("val"),
1791            Some(Duration::from_millis(10)),
1792        );
1793        thread::sleep(Duration::from_millis(30));
1794        assert_eq!(ks.ttl("temp"), TtlResult::NotFound);
1795    }
1796
1797    #[test]
1798    fn expire_existing_key() {
1799        let mut ks = Keyspace::new();
1800        ks.set("key".into(), Bytes::from("val"), None);
1801        assert!(ks.expire("key", 60));
1802        match ks.ttl("key") {
1803            TtlResult::Seconds(s) => assert!(s >= 58 && s <= 60),
1804            other => panic!("expected Seconds, got {other:?}"),
1805        }
1806    }
1807
1808    #[test]
1809    fn expire_missing_key() {
1810        let mut ks = Keyspace::new();
1811        assert!(!ks.expire("nope", 60));
1812    }
1813
1814    #[test]
1815    fn del_expired_key_returns_false() {
1816        let mut ks = Keyspace::new();
1817        ks.set(
1818            "temp".into(),
1819            Bytes::from("val"),
1820            Some(Duration::from_millis(10)),
1821        );
1822        thread::sleep(Duration::from_millis(30));
1823        // key is expired, del should return false (not found)
1824        assert!(!ks.del("temp"));
1825    }
1826
1827    // -- memory tracking tests --
1828
1829    #[test]
1830    fn memory_increases_on_set() {
1831        let mut ks = Keyspace::new();
1832        assert_eq!(ks.stats().used_bytes, 0);
1833        ks.set("key".into(), Bytes::from("value"), None);
1834        assert!(ks.stats().used_bytes > 0);
1835        assert_eq!(ks.stats().key_count, 1);
1836    }
1837
1838    #[test]
1839    fn memory_decreases_on_del() {
1840        let mut ks = Keyspace::new();
1841        ks.set("key".into(), Bytes::from("value"), None);
1842        let after_set = ks.stats().used_bytes;
1843        ks.del("key");
1844        assert_eq!(ks.stats().used_bytes, 0);
1845        assert!(after_set > 0);
1846    }
1847
1848    #[test]
1849    fn memory_adjusts_on_overwrite() {
1850        let mut ks = Keyspace::new();
1851        ks.set("key".into(), Bytes::from("short"), None);
1852        let small = ks.stats().used_bytes;
1853
1854        ks.set("key".into(), Bytes::from("a much longer value"), None);
1855        let large = ks.stats().used_bytes;
1856
1857        assert!(large > small);
1858        assert_eq!(ks.stats().key_count, 1);
1859    }
1860
1861    #[test]
1862    fn memory_decreases_on_expired_removal() {
1863        let mut ks = Keyspace::new();
1864        ks.set(
1865            "temp".into(),
1866            Bytes::from("data"),
1867            Some(Duration::from_millis(10)),
1868        );
1869        assert!(ks.stats().used_bytes > 0);
1870        thread::sleep(Duration::from_millis(30));
1871        // trigger lazy expiration
1872        let _ = ks.get("temp");
1873        assert_eq!(ks.stats().used_bytes, 0);
1874        assert_eq!(ks.stats().key_count, 0);
1875    }
1876
1877    #[test]
1878    fn stats_tracks_expiry_count() {
1879        let mut ks = Keyspace::new();
1880        ks.set("a".into(), Bytes::from("1"), None);
1881        ks.set("b".into(), Bytes::from("2"), Some(Duration::from_secs(100)));
1882        ks.set("c".into(), Bytes::from("3"), Some(Duration::from_secs(200)));
1883
1884        let stats = ks.stats();
1885        assert_eq!(stats.key_count, 3);
1886        assert_eq!(stats.keys_with_expiry, 2);
1887    }
1888
1889    // -- eviction tests --
1890
1891    #[test]
1892    fn noeviction_returns_oom_when_full() {
1893        // one entry with key "a" + value "val" = 1 + 3 + 96 = 100 bytes
1894        // set limit so one entry fits but two don't
1895        let config = ShardConfig {
1896            max_memory: Some(150),
1897            eviction_policy: EvictionPolicy::NoEviction,
1898            ..ShardConfig::default()
1899        };
1900        let mut ks = Keyspace::with_config(config);
1901
1902        // first key should fit
1903        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
1904
1905        // second key should push us over the limit
1906        let result = ks.set("b".into(), Bytes::from("val"), None);
1907        assert_eq!(result, SetResult::OutOfMemory);
1908
1909        // original key should still be there
1910        assert!(ks.exists("a"));
1911    }
1912
1913    #[test]
1914    fn lru_eviction_makes_room() {
1915        let config = ShardConfig {
1916            max_memory: Some(150),
1917            eviction_policy: EvictionPolicy::AllKeysLru,
1918            ..ShardConfig::default()
1919        };
1920        let mut ks = Keyspace::with_config(config);
1921
1922        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
1923
1924        // this should trigger eviction of "a" to make room
1925        assert_eq!(ks.set("b".into(), Bytes::from("val"), None), SetResult::Ok);
1926
1927        // "a" should have been evicted
1928        assert!(!ks.exists("a"));
1929        assert!(ks.exists("b"));
1930    }
1931
1932    #[test]
1933    fn overwrite_same_size_succeeds_at_limit() {
1934        let config = ShardConfig {
1935            max_memory: Some(150),
1936            eviction_policy: EvictionPolicy::NoEviction,
1937            ..ShardConfig::default()
1938        };
1939        let mut ks = Keyspace::with_config(config);
1940
1941        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
1942
1943        // overwriting with same-size value should succeed — no net increase
1944        assert_eq!(ks.set("a".into(), Bytes::from("new"), None), SetResult::Ok);
1945        assert_eq!(
1946            ks.get("a").unwrap(),
1947            Some(Value::String(Bytes::from("new")))
1948        );
1949    }
1950
1951    #[test]
1952    fn overwrite_larger_value_respects_limit() {
1953        let config = ShardConfig {
1954            max_memory: Some(150),
1955            eviction_policy: EvictionPolicy::NoEviction,
1956            ..ShardConfig::default()
1957        };
1958        let mut ks = Keyspace::with_config(config);
1959
1960        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
1961
1962        // overwriting with a much larger value should fail if it exceeds limit
1963        let big_value = "x".repeat(200);
1964        let result = ks.set("a".into(), Bytes::from(big_value), None);
1965        assert_eq!(result, SetResult::OutOfMemory);
1966
1967        // original value should still be intact
1968        assert_eq!(
1969            ks.get("a").unwrap(),
1970            Some(Value::String(Bytes::from("val")))
1971        );
1972    }
1973
1974    // -- iter_entries tests --
1975
1976    #[test]
1977    fn iter_entries_returns_live_entries() {
1978        let mut ks = Keyspace::new();
1979        ks.set("a".into(), Bytes::from("1"), None);
1980        ks.set("b".into(), Bytes::from("2"), Some(Duration::from_secs(100)));
1981
1982        let entries: Vec<_> = ks.iter_entries().collect();
1983        assert_eq!(entries.len(), 2);
1984    }
1985
1986    #[test]
1987    fn iter_entries_skips_expired() {
1988        let mut ks = Keyspace::new();
1989        ks.set(
1990            "dead".into(),
1991            Bytes::from("gone"),
1992            Some(Duration::from_millis(1)),
1993        );
1994        ks.set("alive".into(), Bytes::from("here"), None);
1995        thread::sleep(Duration::from_millis(10));
1996
1997        let entries: Vec<_> = ks.iter_entries().collect();
1998        assert_eq!(entries.len(), 1);
1999        assert_eq!(entries[0].0, "alive");
2000    }
2001
2002    #[test]
2003    fn iter_entries_ttl_for_no_expiry() {
2004        let mut ks = Keyspace::new();
2005        ks.set("permanent".into(), Bytes::from("val"), None);
2006
2007        let entries: Vec<_> = ks.iter_entries().collect();
2008        assert_eq!(entries[0].2, -1);
2009    }
2010
2011    // -- restore tests --
2012
2013    #[test]
2014    fn restore_adds_entry() {
2015        let mut ks = Keyspace::new();
2016        ks.restore("restored".into(), Value::String(Bytes::from("data")), None);
2017        assert_eq!(
2018            ks.get("restored").unwrap(),
2019            Some(Value::String(Bytes::from("data")))
2020        );
2021        assert_eq!(ks.stats().key_count, 1);
2022    }
2023
2024    #[test]
2025    fn restore_skips_past_deadline() {
2026        let mut ks = Keyspace::new();
2027        // deadline already passed
2028        let past = Instant::now() - Duration::from_secs(1);
2029        ks.restore(
2030            "expired".into(),
2031            Value::String(Bytes::from("old")),
2032            Some(past),
2033        );
2034        assert!(ks.is_empty());
2035    }
2036
2037    #[test]
2038    fn restore_overwrites_existing() {
2039        let mut ks = Keyspace::new();
2040        ks.set("key".into(), Bytes::from("old"), None);
2041        ks.restore("key".into(), Value::String(Bytes::from("new")), None);
2042        assert_eq!(
2043            ks.get("key").unwrap(),
2044            Some(Value::String(Bytes::from("new")))
2045        );
2046        assert_eq!(ks.stats().key_count, 1);
2047    }
2048
2049    #[test]
2050    fn restore_bypasses_memory_limit() {
2051        let config = ShardConfig {
2052            max_memory: Some(50), // very small
2053            eviction_policy: EvictionPolicy::NoEviction,
2054            ..ShardConfig::default()
2055        };
2056        let mut ks = Keyspace::with_config(config);
2057
2058        // normal set would fail due to memory limit
2059        ks.restore(
2060            "big".into(),
2061            Value::String(Bytes::from("x".repeat(200))),
2062            None,
2063        );
2064        assert_eq!(ks.stats().key_count, 1);
2065    }
2066
2067    #[test]
2068    fn no_limit_never_rejects() {
2069        // default config has no memory limit
2070        let mut ks = Keyspace::new();
2071        for i in 0..100 {
2072            assert_eq!(
2073                ks.set(format!("key:{i}"), Bytes::from("value"), None),
2074                SetResult::Ok
2075            );
2076        }
2077        assert_eq!(ks.len(), 100);
2078    }
2079
2080    // -- list tests --
2081
2082    #[test]
2083    fn lpush_creates_list() {
2084        let mut ks = Keyspace::new();
2085        let len = ks
2086            .lpush("list", &[Bytes::from("a"), Bytes::from("b")])
2087            .unwrap();
2088        assert_eq!(len, 2);
2089        // lpush pushes each to front, so order is b, a
2090        let items = ks.lrange("list", 0, -1).unwrap();
2091        assert_eq!(items, vec![Bytes::from("b"), Bytes::from("a")]);
2092    }
2093
2094    #[test]
2095    fn rpush_creates_list() {
2096        let mut ks = Keyspace::new();
2097        let len = ks
2098            .rpush("list", &[Bytes::from("a"), Bytes::from("b")])
2099            .unwrap();
2100        assert_eq!(len, 2);
2101        let items = ks.lrange("list", 0, -1).unwrap();
2102        assert_eq!(items, vec![Bytes::from("a"), Bytes::from("b")]);
2103    }
2104
2105    #[test]
2106    fn push_to_existing_list() {
2107        let mut ks = Keyspace::new();
2108        ks.rpush("list", &[Bytes::from("a")]).unwrap();
2109        let len = ks.rpush("list", &[Bytes::from("b")]).unwrap();
2110        assert_eq!(len, 2);
2111    }
2112
2113    #[test]
2114    fn lpop_returns_front() {
2115        let mut ks = Keyspace::new();
2116        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
2117            .unwrap();
2118        assert_eq!(ks.lpop("list").unwrap(), Some(Bytes::from("a")));
2119        assert_eq!(ks.lpop("list").unwrap(), Some(Bytes::from("b")));
2120        assert_eq!(ks.lpop("list").unwrap(), None); // empty, key deleted
2121    }
2122
2123    #[test]
2124    fn rpop_returns_back() {
2125        let mut ks = Keyspace::new();
2126        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
2127            .unwrap();
2128        assert_eq!(ks.rpop("list").unwrap(), Some(Bytes::from("b")));
2129    }
2130
2131    #[test]
2132    fn pop_from_missing_key() {
2133        let mut ks = Keyspace::new();
2134        assert_eq!(ks.lpop("nope").unwrap(), None);
2135        assert_eq!(ks.rpop("nope").unwrap(), None);
2136    }
2137
2138    #[test]
2139    fn empty_list_auto_deletes_key() {
2140        let mut ks = Keyspace::new();
2141        ks.rpush("list", &[Bytes::from("only")]).unwrap();
2142        ks.lpop("list").unwrap();
2143        assert!(!ks.exists("list"));
2144        assert_eq!(ks.stats().key_count, 0);
2145        assert_eq!(ks.stats().used_bytes, 0);
2146    }
2147
2148    #[test]
2149    fn lrange_negative_indices() {
2150        let mut ks = Keyspace::new();
2151        ks.rpush(
2152            "list",
2153            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("c")],
2154        )
2155        .unwrap();
2156        // -2 to -1 => last two elements
2157        let items = ks.lrange("list", -2, -1).unwrap();
2158        assert_eq!(items, vec![Bytes::from("b"), Bytes::from("c")]);
2159    }
2160
2161    #[test]
2162    fn lrange_out_of_bounds_clamps() {
2163        let mut ks = Keyspace::new();
2164        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
2165            .unwrap();
2166        let items = ks.lrange("list", -100, 100).unwrap();
2167        assert_eq!(items, vec![Bytes::from("a"), Bytes::from("b")]);
2168    }
2169
2170    #[test]
2171    fn lrange_missing_key_returns_empty() {
2172        let mut ks = Keyspace::new();
2173        assert_eq!(ks.lrange("nope", 0, -1).unwrap(), Vec::<Bytes>::new());
2174    }
2175
2176    #[test]
2177    fn llen_returns_length() {
2178        let mut ks = Keyspace::new();
2179        assert_eq!(ks.llen("nope").unwrap(), 0);
2180        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
2181            .unwrap();
2182        assert_eq!(ks.llen("list").unwrap(), 2);
2183    }
2184
2185    #[test]
2186    fn list_memory_tracked_on_push_pop() {
2187        let mut ks = Keyspace::new();
2188        ks.rpush("list", &[Bytes::from("hello")]).unwrap();
2189        let after_push = ks.stats().used_bytes;
2190        assert!(after_push > 0);
2191
2192        ks.rpush("list", &[Bytes::from("world")]).unwrap();
2193        let after_second = ks.stats().used_bytes;
2194        assert!(after_second > after_push);
2195
2196        ks.lpop("list").unwrap();
2197        let after_pop = ks.stats().used_bytes;
2198        assert!(after_pop < after_second);
2199    }
2200
2201    #[test]
2202    fn lpush_on_string_key_returns_wrongtype() {
2203        let mut ks = Keyspace::new();
2204        ks.set("s".into(), Bytes::from("val"), None);
2205        assert!(ks.lpush("s", &[Bytes::from("nope")]).is_err());
2206    }
2207
2208    #[test]
2209    fn lrange_on_string_key_returns_wrongtype() {
2210        let mut ks = Keyspace::new();
2211        ks.set("s".into(), Bytes::from("val"), None);
2212        assert!(ks.lrange("s", 0, -1).is_err());
2213    }
2214
2215    #[test]
2216    fn llen_on_string_key_returns_wrongtype() {
2217        let mut ks = Keyspace::new();
2218        ks.set("s".into(), Bytes::from("val"), None);
2219        assert!(ks.llen("s").is_err());
2220    }
2221
2222    // -- wrongtype tests --
2223
2224    #[test]
2225    fn get_on_list_key_returns_wrongtype() {
2226        let mut ks = Keyspace::new();
2227        let mut list = std::collections::VecDeque::new();
2228        list.push_back(Bytes::from("item"));
2229        ks.restore("mylist".into(), Value::List(list), None);
2230
2231        assert!(ks.get("mylist").is_err());
2232    }
2233
2234    #[test]
2235    fn value_type_returns_correct_types() {
2236        let mut ks = Keyspace::new();
2237        assert_eq!(ks.value_type("missing"), "none");
2238
2239        ks.set("s".into(), Bytes::from("val"), None);
2240        assert_eq!(ks.value_type("s"), "string");
2241
2242        let mut list = std::collections::VecDeque::new();
2243        list.push_back(Bytes::from("item"));
2244        ks.restore("l".into(), Value::List(list), None);
2245        assert_eq!(ks.value_type("l"), "list");
2246
2247        ks.zadd("z", &[(1.0, "a".into())], &ZAddFlags::default())
2248            .unwrap();
2249        assert_eq!(ks.value_type("z"), "zset");
2250    }
2251
2252    // -- sorted set tests --
2253
2254    #[test]
2255    fn zadd_creates_sorted_set() {
2256        let mut ks = Keyspace::new();
2257        let result = ks
2258            .zadd(
2259                "board",
2260                &[(100.0, "alice".into()), (200.0, "bob".into())],
2261                &ZAddFlags::default(),
2262            )
2263            .unwrap();
2264        assert_eq!(result.count, 2);
2265        assert_eq!(result.applied.len(), 2);
2266        assert_eq!(ks.value_type("board"), "zset");
2267    }
2268
2269    #[test]
2270    fn zadd_updates_existing_score() {
2271        let mut ks = Keyspace::new();
2272        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
2273            .unwrap();
2274        // update score — default flags don't count updates
2275        let result = ks
2276            .zadd("z", &[(200.0, "alice".into())], &ZAddFlags::default())
2277            .unwrap();
2278        assert_eq!(result.count, 0);
2279        // score was updated, so applied should have the member
2280        assert_eq!(result.applied.len(), 1);
2281        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(200.0));
2282    }
2283
2284    #[test]
2285    fn zadd_ch_flag_counts_changes() {
2286        let mut ks = Keyspace::new();
2287        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
2288            .unwrap();
2289        let flags = ZAddFlags {
2290            ch: true,
2291            ..Default::default()
2292        };
2293        let result = ks
2294            .zadd(
2295                "z",
2296                &[(200.0, "alice".into()), (50.0, "bob".into())],
2297                &flags,
2298            )
2299            .unwrap();
2300        // 1 updated + 1 added = 2
2301        assert_eq!(result.count, 2);
2302        assert_eq!(result.applied.len(), 2);
2303    }
2304
2305    #[test]
2306    fn zadd_nx_skips_existing() {
2307        let mut ks = Keyspace::new();
2308        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
2309            .unwrap();
2310        let flags = ZAddFlags {
2311            nx: true,
2312            ..Default::default()
2313        };
2314        let result = ks.zadd("z", &[(999.0, "alice".into())], &flags).unwrap();
2315        assert_eq!(result.count, 0);
2316        assert!(result.applied.is_empty());
2317        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(100.0));
2318    }
2319
2320    #[test]
2321    fn zadd_xx_skips_new() {
2322        let mut ks = Keyspace::new();
2323        let flags = ZAddFlags {
2324            xx: true,
2325            ..Default::default()
2326        };
2327        let result = ks.zadd("z", &[(100.0, "alice".into())], &flags).unwrap();
2328        assert_eq!(result.count, 0);
2329        assert!(result.applied.is_empty());
2330        // key should be cleaned up since nothing was added
2331        assert_eq!(ks.value_type("z"), "none");
2332    }
2333
2334    #[test]
2335    fn zadd_gt_only_increases() {
2336        let mut ks = Keyspace::new();
2337        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
2338            .unwrap();
2339        let flags = ZAddFlags {
2340            gt: true,
2341            ..Default::default()
2342        };
2343        ks.zadd("z", &[(50.0, "alice".into())], &flags).unwrap();
2344        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(100.0));
2345        ks.zadd("z", &[(200.0, "alice".into())], &flags).unwrap();
2346        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(200.0));
2347    }
2348
2349    #[test]
2350    fn zadd_lt_only_decreases() {
2351        let mut ks = Keyspace::new();
2352        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
2353            .unwrap();
2354        let flags = ZAddFlags {
2355            lt: true,
2356            ..Default::default()
2357        };
2358        ks.zadd("z", &[(200.0, "alice".into())], &flags).unwrap();
2359        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(100.0));
2360        ks.zadd("z", &[(50.0, "alice".into())], &flags).unwrap();
2361        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(50.0));
2362    }
2363
2364    #[test]
2365    fn zrem_removes_members() {
2366        let mut ks = Keyspace::new();
2367        ks.zadd(
2368            "z",
2369            &[(1.0, "a".into()), (2.0, "b".into()), (3.0, "c".into())],
2370            &ZAddFlags::default(),
2371        )
2372        .unwrap();
2373        let removed = ks
2374            .zrem("z", &["a".into(), "c".into(), "nonexistent".into()])
2375            .unwrap();
2376        assert_eq!(removed.len(), 2);
2377        assert!(removed.contains(&"a".to_owned()));
2378        assert!(removed.contains(&"c".to_owned()));
2379        assert_eq!(ks.zscore("z", "a").unwrap(), None);
2380        assert_eq!(ks.zscore("z", "b").unwrap(), Some(2.0));
2381    }
2382
2383    #[test]
2384    fn zrem_auto_deletes_empty() {
2385        let mut ks = Keyspace::new();
2386        ks.zadd("z", &[(1.0, "only".into())], &ZAddFlags::default())
2387            .unwrap();
2388        ks.zrem("z", &["only".into()]).unwrap();
2389        assert!(!ks.exists("z"));
2390        assert_eq!(ks.stats().key_count, 0);
2391    }
2392
2393    #[test]
2394    fn zrem_missing_key() {
2395        let mut ks = Keyspace::new();
2396        assert!(ks.zrem("nope", &["a".into()]).unwrap().is_empty());
2397    }
2398
2399    #[test]
2400    fn zscore_returns_score() {
2401        let mut ks = Keyspace::new();
2402        ks.zadd("z", &[(42.5, "member".into())], &ZAddFlags::default())
2403            .unwrap();
2404        assert_eq!(ks.zscore("z", "member").unwrap(), Some(42.5));
2405        assert_eq!(ks.zscore("z", "missing").unwrap(), None);
2406    }
2407
2408    #[test]
2409    fn zscore_missing_key() {
2410        let mut ks = Keyspace::new();
2411        assert_eq!(ks.zscore("nope", "m").unwrap(), None);
2412    }
2413
2414    #[test]
2415    fn zrank_returns_rank() {
2416        let mut ks = Keyspace::new();
2417        ks.zadd(
2418            "z",
2419            &[
2420                (300.0, "c".into()),
2421                (100.0, "a".into()),
2422                (200.0, "b".into()),
2423            ],
2424            &ZAddFlags::default(),
2425        )
2426        .unwrap();
2427        assert_eq!(ks.zrank("z", "a").unwrap(), Some(0));
2428        assert_eq!(ks.zrank("z", "b").unwrap(), Some(1));
2429        assert_eq!(ks.zrank("z", "c").unwrap(), Some(2));
2430        assert_eq!(ks.zrank("z", "d").unwrap(), None);
2431    }
2432
2433    #[test]
2434    fn zrange_returns_range() {
2435        let mut ks = Keyspace::new();
2436        ks.zadd(
2437            "z",
2438            &[(1.0, "a".into()), (2.0, "b".into()), (3.0, "c".into())],
2439            &ZAddFlags::default(),
2440        )
2441        .unwrap();
2442
2443        let all = ks.zrange("z", 0, -1).unwrap();
2444        assert_eq!(
2445            all,
2446            vec![
2447                ("a".to_owned(), 1.0),
2448                ("b".to_owned(), 2.0),
2449                ("c".to_owned(), 3.0),
2450            ]
2451        );
2452
2453        let middle = ks.zrange("z", 1, 1).unwrap();
2454        assert_eq!(middle, vec![("b".to_owned(), 2.0)]);
2455
2456        let last_two = ks.zrange("z", -2, -1).unwrap();
2457        assert_eq!(last_two, vec![("b".to_owned(), 2.0), ("c".to_owned(), 3.0)]);
2458    }
2459
2460    #[test]
2461    fn zrange_missing_key() {
2462        let mut ks = Keyspace::new();
2463        assert!(ks.zrange("nope", 0, -1).unwrap().is_empty());
2464    }
2465
2466    #[test]
2467    fn zadd_on_string_key_returns_wrongtype() {
2468        let mut ks = Keyspace::new();
2469        ks.set("s".into(), Bytes::from("val"), None);
2470        assert!(ks
2471            .zadd("s", &[(1.0, "m".into())], &ZAddFlags::default())
2472            .is_err());
2473    }
2474
2475    #[test]
2476    fn zrem_on_string_key_returns_wrongtype() {
2477        let mut ks = Keyspace::new();
2478        ks.set("s".into(), Bytes::from("val"), None);
2479        assert!(ks.zrem("s", &["m".into()]).is_err());
2480    }
2481
2482    #[test]
2483    fn zscore_on_list_key_returns_wrongtype() {
2484        let mut ks = Keyspace::new();
2485        ks.rpush("l", &[Bytes::from("item")]).unwrap();
2486        assert!(ks.zscore("l", "m").is_err());
2487    }
2488
2489    #[test]
2490    fn zrank_on_string_key_returns_wrongtype() {
2491        let mut ks = Keyspace::new();
2492        ks.set("s".into(), Bytes::from("val"), None);
2493        assert!(ks.zrank("s", "m").is_err());
2494    }
2495
2496    #[test]
2497    fn zrange_on_string_key_returns_wrongtype() {
2498        let mut ks = Keyspace::new();
2499        ks.set("s".into(), Bytes::from("val"), None);
2500        assert!(ks.zrange("s", 0, -1).is_err());
2501    }
2502
2503    #[test]
2504    fn sorted_set_memory_tracked() {
2505        let mut ks = Keyspace::new();
2506        let before = ks.stats().used_bytes;
2507        ks.zadd("z", &[(1.0, "alice".into())], &ZAddFlags::default())
2508            .unwrap();
2509        let after_add = ks.stats().used_bytes;
2510        assert!(after_add > before);
2511
2512        ks.zadd("z", &[(2.0, "bob".into())], &ZAddFlags::default())
2513            .unwrap();
2514        let after_second = ks.stats().used_bytes;
2515        assert!(after_second > after_add);
2516
2517        ks.zrem("z", &["alice".into()]).unwrap();
2518        let after_remove = ks.stats().used_bytes;
2519        assert!(after_remove < after_second);
2520    }
2521
2522    #[test]
2523    fn incr_new_key_defaults_to_zero() {
2524        let mut ks = Keyspace::new();
2525        assert_eq!(ks.incr("counter").unwrap(), 1);
2526        // verify the stored value
2527        match ks.get("counter").unwrap() {
2528            Some(Value::String(data)) => assert_eq!(data, Bytes::from("1")),
2529            other => panic!("expected String(\"1\"), got {other:?}"),
2530        }
2531    }
2532
2533    #[test]
2534    fn incr_existing_value() {
2535        let mut ks = Keyspace::new();
2536        ks.set("n".into(), Bytes::from("10"), None);
2537        assert_eq!(ks.incr("n").unwrap(), 11);
2538    }
2539
2540    #[test]
2541    fn decr_new_key_defaults_to_zero() {
2542        let mut ks = Keyspace::new();
2543        assert_eq!(ks.decr("counter").unwrap(), -1);
2544    }
2545
2546    #[test]
2547    fn decr_existing_value() {
2548        let mut ks = Keyspace::new();
2549        ks.set("n".into(), Bytes::from("10"), None);
2550        assert_eq!(ks.decr("n").unwrap(), 9);
2551    }
2552
2553    #[test]
2554    fn incr_non_integer_returns_error() {
2555        let mut ks = Keyspace::new();
2556        ks.set("s".into(), Bytes::from("notanum"), None);
2557        assert_eq!(ks.incr("s").unwrap_err(), IncrError::NotAnInteger);
2558    }
2559
2560    #[test]
2561    fn incr_on_list_returns_wrongtype() {
2562        let mut ks = Keyspace::new();
2563        ks.lpush("list", &[Bytes::from("a")]).unwrap();
2564        assert_eq!(ks.incr("list").unwrap_err(), IncrError::WrongType);
2565    }
2566
2567    #[test]
2568    fn incr_overflow_returns_error() {
2569        let mut ks = Keyspace::new();
2570        ks.set("max".into(), Bytes::from(i64::MAX.to_string()), None);
2571        assert_eq!(ks.incr("max").unwrap_err(), IncrError::Overflow);
2572    }
2573
2574    #[test]
2575    fn decr_overflow_returns_error() {
2576        let mut ks = Keyspace::new();
2577        ks.set("min".into(), Bytes::from(i64::MIN.to_string()), None);
2578        assert_eq!(ks.decr("min").unwrap_err(), IncrError::Overflow);
2579    }
2580
2581    #[test]
2582    fn incr_preserves_ttl() {
2583        let mut ks = Keyspace::new();
2584        ks.set("n".into(), Bytes::from("5"), Some(Duration::from_secs(60)));
2585        ks.incr("n").unwrap();
2586        match ks.ttl("n") {
2587            TtlResult::Seconds(s) => assert!(s >= 58 && s <= 60),
2588            other => panic!("expected TTL preserved, got {other:?}"),
2589        }
2590    }
2591
2592    #[test]
2593    fn zrem_returns_actually_removed_members() {
2594        let mut ks = Keyspace::new();
2595        ks.zadd(
2596            "z",
2597            &[(1.0, "a".into()), (2.0, "b".into())],
2598            &ZAddFlags::default(),
2599        )
2600        .unwrap();
2601        // "a" exists, "ghost" doesn't — only "a" should be in the result
2602        let removed = ks.zrem("z", &["a".into(), "ghost".into()]).unwrap();
2603        assert_eq!(removed, vec!["a".to_owned()]);
2604    }
2605
2606    #[test]
2607    fn zcard_returns_count() {
2608        let mut ks = Keyspace::new();
2609        ks.zadd(
2610            "z",
2611            &[(1.0, "a".into()), (2.0, "b".into())],
2612            &ZAddFlags::default(),
2613        )
2614        .unwrap();
2615        assert_eq!(ks.zcard("z").unwrap(), 2);
2616    }
2617
2618    #[test]
2619    fn zcard_missing_key_returns_zero() {
2620        let mut ks = Keyspace::new();
2621        assert_eq!(ks.zcard("missing").unwrap(), 0);
2622    }
2623
2624    #[test]
2625    fn zcard_on_string_key_returns_wrongtype() {
2626        let mut ks = Keyspace::new();
2627        ks.set("s".into(), Bytes::from("val"), None);
2628        assert!(ks.zcard("s").is_err());
2629    }
2630
2631    #[test]
2632    fn persist_removes_expiry() {
2633        let mut ks = Keyspace::new();
2634        ks.set(
2635            "key".into(),
2636            Bytes::from("val"),
2637            Some(Duration::from_secs(60)),
2638        );
2639        assert!(matches!(ks.ttl("key"), TtlResult::Seconds(_)));
2640
2641        assert!(ks.persist("key"));
2642        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
2643        assert_eq!(ks.stats().keys_with_expiry, 0);
2644    }
2645
2646    #[test]
2647    fn persist_returns_false_without_expiry() {
2648        let mut ks = Keyspace::new();
2649        ks.set("key".into(), Bytes::from("val"), None);
2650        assert!(!ks.persist("key"));
2651    }
2652
2653    #[test]
2654    fn persist_returns_false_for_missing_key() {
2655        let mut ks = Keyspace::new();
2656        assert!(!ks.persist("missing"));
2657    }
2658
2659    #[test]
2660    fn pttl_returns_milliseconds() {
2661        let mut ks = Keyspace::new();
2662        ks.set(
2663            "key".into(),
2664            Bytes::from("val"),
2665            Some(Duration::from_secs(60)),
2666        );
2667        match ks.pttl("key") {
2668            TtlResult::Milliseconds(ms) => assert!(ms > 59_000 && ms <= 60_000),
2669            other => panic!("expected Milliseconds, got {other:?}"),
2670        }
2671    }
2672
2673    #[test]
2674    fn pttl_no_expiry() {
2675        let mut ks = Keyspace::new();
2676        ks.set("key".into(), Bytes::from("val"), None);
2677        assert_eq!(ks.pttl("key"), TtlResult::NoExpiry);
2678    }
2679
2680    #[test]
2681    fn pttl_not_found() {
2682        let mut ks = Keyspace::new();
2683        assert_eq!(ks.pttl("missing"), TtlResult::NotFound);
2684    }
2685
2686    #[test]
2687    fn pexpire_sets_ttl_in_millis() {
2688        let mut ks = Keyspace::new();
2689        ks.set("key".into(), Bytes::from("val"), None);
2690        assert!(ks.pexpire("key", 5000));
2691        match ks.pttl("key") {
2692            TtlResult::Milliseconds(ms) => assert!(ms > 4000 && ms <= 5000),
2693            other => panic!("expected Milliseconds, got {other:?}"),
2694        }
2695        assert_eq!(ks.stats().keys_with_expiry, 1);
2696    }
2697
2698    #[test]
2699    fn pexpire_missing_key_returns_false() {
2700        let mut ks = Keyspace::new();
2701        assert!(!ks.pexpire("missing", 5000));
2702    }
2703
2704    #[test]
2705    fn pexpire_overwrites_existing_ttl() {
2706        let mut ks = Keyspace::new();
2707        ks.set(
2708            "key".into(),
2709            Bytes::from("val"),
2710            Some(Duration::from_secs(60)),
2711        );
2712        assert!(ks.pexpire("key", 500));
2713        match ks.pttl("key") {
2714            TtlResult::Milliseconds(ms) => assert!(ms <= 500),
2715            other => panic!("expected Milliseconds, got {other:?}"),
2716        }
2717        // expiry count shouldn't double-count
2718        assert_eq!(ks.stats().keys_with_expiry, 1);
2719    }
2720
2721    // -- memory limit enforcement for list/zset --
2722
2723    #[test]
2724    fn lpush_rejects_when_memory_full() {
2725        let config = ShardConfig {
2726            max_memory: Some(150),
2727            eviction_policy: EvictionPolicy::NoEviction,
2728            ..ShardConfig::default()
2729        };
2730        let mut ks = Keyspace::with_config(config);
2731
2732        // first key eats up most of the budget
2733        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
2734
2735        // lpush should be rejected — not enough room
2736        let result = ks.lpush("list", &[Bytes::from("big-value-here")]);
2737        assert_eq!(result, Err(WriteError::OutOfMemory));
2738
2739        // original key should be untouched
2740        assert!(ks.exists("a"));
2741    }
2742
2743    #[test]
2744    fn rpush_rejects_when_memory_full() {
2745        let config = ShardConfig {
2746            max_memory: Some(150),
2747            eviction_policy: EvictionPolicy::NoEviction,
2748            ..ShardConfig::default()
2749        };
2750        let mut ks = Keyspace::with_config(config);
2751
2752        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
2753
2754        let result = ks.rpush("list", &[Bytes::from("big-value-here")]);
2755        assert_eq!(result, Err(WriteError::OutOfMemory));
2756    }
2757
2758    #[test]
2759    fn zadd_rejects_when_memory_full() {
2760        let config = ShardConfig {
2761            max_memory: Some(150),
2762            eviction_policy: EvictionPolicy::NoEviction,
2763            ..ShardConfig::default()
2764        };
2765        let mut ks = Keyspace::with_config(config);
2766
2767        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
2768
2769        let result = ks.zadd("z", &[(1.0, "member".into())], &ZAddFlags::default());
2770        assert!(matches!(result, Err(WriteError::OutOfMemory)));
2771
2772        // original key should be untouched
2773        assert!(ks.exists("a"));
2774    }
2775
2776    #[test]
2777    fn lpush_evicts_under_lru_policy() {
2778        let config = ShardConfig {
2779            max_memory: Some(200),
2780            eviction_policy: EvictionPolicy::AllKeysLru,
2781            ..ShardConfig::default()
2782        };
2783        let mut ks = Keyspace::with_config(config);
2784
2785        assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
2786
2787        // should evict "a" to make room for the list
2788        assert!(ks.lpush("list", &[Bytes::from("item")]).is_ok());
2789        assert!(!ks.exists("a"));
2790    }
2791
2792    #[test]
2793    fn clear_removes_all_keys() {
2794        let mut ks = Keyspace::new();
2795        ks.set("a".into(), Bytes::from("1"), None);
2796        ks.set("b".into(), Bytes::from("2"), Some(Duration::from_secs(60)));
2797        ks.lpush("list", &[Bytes::from("x")]).unwrap();
2798
2799        assert_eq!(ks.len(), 3);
2800        assert!(ks.stats().used_bytes > 0);
2801        assert_eq!(ks.stats().keys_with_expiry, 1);
2802
2803        ks.clear();
2804
2805        assert_eq!(ks.len(), 0);
2806        assert!(ks.is_empty());
2807        assert_eq!(ks.stats().used_bytes, 0);
2808        assert_eq!(ks.stats().keys_with_expiry, 0);
2809    }
2810
2811    // --- scan ---
2812
2813    #[test]
2814    fn scan_returns_keys() {
2815        let mut ks = Keyspace::new();
2816        ks.set("key1".into(), Bytes::from("a"), None);
2817        ks.set("key2".into(), Bytes::from("b"), None);
2818        ks.set("key3".into(), Bytes::from("c"), None);
2819
2820        let (cursor, keys) = ks.scan_keys(0, 10, None);
2821        assert_eq!(cursor, 0); // complete in one pass
2822        assert_eq!(keys.len(), 3);
2823    }
2824
2825    #[test]
2826    fn scan_empty_keyspace() {
2827        let ks = Keyspace::new();
2828        let (cursor, keys) = ks.scan_keys(0, 10, None);
2829        assert_eq!(cursor, 0);
2830        assert!(keys.is_empty());
2831    }
2832
2833    #[test]
2834    fn scan_with_pattern() {
2835        let mut ks = Keyspace::new();
2836        ks.set("user:1".into(), Bytes::from("a"), None);
2837        ks.set("user:2".into(), Bytes::from("b"), None);
2838        ks.set("item:1".into(), Bytes::from("c"), None);
2839
2840        let (cursor, keys) = ks.scan_keys(0, 10, Some("user:*"));
2841        assert_eq!(cursor, 0);
2842        assert_eq!(keys.len(), 2);
2843        for k in &keys {
2844            assert!(k.starts_with("user:"));
2845        }
2846    }
2847
2848    #[test]
2849    fn scan_with_count_limit() {
2850        let mut ks = Keyspace::new();
2851        for i in 0..10 {
2852            ks.set(format!("k{i}"), Bytes::from("v"), None);
2853        }
2854
2855        // first batch
2856        let (cursor, keys) = ks.scan_keys(0, 3, None);
2857        assert!(!keys.is_empty());
2858        assert!(keys.len() <= 3);
2859
2860        // if there are more keys, cursor should be non-zero
2861        if cursor != 0 {
2862            let (cursor2, keys2) = ks.scan_keys(cursor, 3, None);
2863            assert!(!keys2.is_empty());
2864            // continue until complete
2865            let _ = (cursor2, keys2);
2866        }
2867    }
2868
2869    #[test]
2870    fn scan_skips_expired_keys() {
2871        let mut ks = Keyspace::new();
2872        ks.set("live".into(), Bytes::from("a"), None);
2873        ks.set(
2874            "expired".into(),
2875            Bytes::from("b"),
2876            Some(Duration::from_millis(1)),
2877        );
2878
2879        std::thread::sleep(Duration::from_millis(5));
2880
2881        let (_, keys) = ks.scan_keys(0, 10, None);
2882        assert_eq!(keys.len(), 1);
2883        assert_eq!(keys[0], "live");
2884    }
2885
2886    #[test]
2887    fn glob_match_star() {
2888        assert!(super::glob_match("user:*", "user:123"));
2889        assert!(super::glob_match("user:*", "user:"));
2890        assert!(super::glob_match("*:data", "foo:data"));
2891        assert!(!super::glob_match("user:*", "item:123"));
2892    }
2893
2894    #[test]
2895    fn glob_match_question() {
2896        assert!(super::glob_match("key?", "key1"));
2897        assert!(super::glob_match("key?", "keya"));
2898        assert!(!super::glob_match("key?", "key"));
2899        assert!(!super::glob_match("key?", "key12"));
2900    }
2901
2902    #[test]
2903    fn glob_match_brackets() {
2904        assert!(super::glob_match("key[abc]", "keya"));
2905        assert!(super::glob_match("key[abc]", "keyb"));
2906        assert!(!super::glob_match("key[abc]", "keyd"));
2907    }
2908
2909    #[test]
2910    fn glob_match_literal() {
2911        assert!(super::glob_match("exact", "exact"));
2912        assert!(!super::glob_match("exact", "exactnot"));
2913        assert!(!super::glob_match("exact", "notexact"));
2914    }
2915
2916    // --- hash tests ---
2917
2918    #[test]
2919    fn hset_creates_hash() {
2920        let mut ks = Keyspace::new();
2921        let count = ks
2922            .hset("h", &[("field1".into(), Bytes::from("value1"))])
2923            .unwrap();
2924        assert_eq!(count, 1);
2925        assert_eq!(ks.value_type("h"), "hash");
2926    }
2927
2928    #[test]
2929    fn hset_returns_new_field_count() {
2930        let mut ks = Keyspace::new();
2931        // add two new fields
2932        let count = ks
2933            .hset(
2934                "h",
2935                &[
2936                    ("f1".into(), Bytes::from("v1")),
2937                    ("f2".into(), Bytes::from("v2")),
2938                ],
2939            )
2940            .unwrap();
2941        assert_eq!(count, 2);
2942
2943        // update one, add one new
2944        let count = ks
2945            .hset(
2946                "h",
2947                &[
2948                    ("f1".into(), Bytes::from("updated")),
2949                    ("f3".into(), Bytes::from("v3")),
2950                ],
2951            )
2952            .unwrap();
2953        assert_eq!(count, 1); // only f3 is new
2954    }
2955
2956    #[test]
2957    fn hget_returns_value() {
2958        let mut ks = Keyspace::new();
2959        ks.hset("h", &[("name".into(), Bytes::from("alice"))])
2960            .unwrap();
2961        let val = ks.hget("h", "name").unwrap();
2962        assert_eq!(val, Some(Bytes::from("alice")));
2963    }
2964
2965    #[test]
2966    fn hget_missing_field_returns_none() {
2967        let mut ks = Keyspace::new();
2968        ks.hset("h", &[("a".into(), Bytes::from("1"))]).unwrap();
2969        assert_eq!(ks.hget("h", "b").unwrap(), None);
2970    }
2971
2972    #[test]
2973    fn hget_missing_key_returns_none() {
2974        let mut ks = Keyspace::new();
2975        assert_eq!(ks.hget("missing", "field").unwrap(), None);
2976    }
2977
2978    #[test]
2979    fn hgetall_returns_all_fields() {
2980        let mut ks = Keyspace::new();
2981        ks.hset(
2982            "h",
2983            &[
2984                ("a".into(), Bytes::from("1")),
2985                ("b".into(), Bytes::from("2")),
2986            ],
2987        )
2988        .unwrap();
2989        let mut fields = ks.hgetall("h").unwrap();
2990        fields.sort_by(|a, b| a.0.cmp(&b.0));
2991        assert_eq!(fields.len(), 2);
2992        assert_eq!(fields[0], ("a".into(), Bytes::from("1")));
2993        assert_eq!(fields[1], ("b".into(), Bytes::from("2")));
2994    }
2995
2996    #[test]
2997    fn hdel_removes_fields() {
2998        let mut ks = Keyspace::new();
2999        ks.hset(
3000            "h",
3001            &[
3002                ("a".into(), Bytes::from("1")),
3003                ("b".into(), Bytes::from("2")),
3004                ("c".into(), Bytes::from("3")),
3005            ],
3006        )
3007        .unwrap();
3008        let removed = ks.hdel("h", &["a".into(), "c".into()]).unwrap();
3009        assert_eq!(removed.len(), 2);
3010        assert!(removed.contains(&"a".into()));
3011        assert!(removed.contains(&"c".into()));
3012        assert_eq!(ks.hlen("h").unwrap(), 1);
3013    }
3014
3015    #[test]
3016    fn hdel_auto_deletes_empty_hash() {
3017        let mut ks = Keyspace::new();
3018        ks.hset("h", &[("only".into(), Bytes::from("field"))])
3019            .unwrap();
3020        ks.hdel("h", &["only".into()]).unwrap();
3021        assert_eq!(ks.value_type("h"), "none");
3022    }
3023
3024    #[test]
3025    fn hexists_returns_true_for_existing_field() {
3026        let mut ks = Keyspace::new();
3027        ks.hset("h", &[("field".into(), Bytes::from("val"))])
3028            .unwrap();
3029        assert!(ks.hexists("h", "field").unwrap());
3030    }
3031
3032    #[test]
3033    fn hexists_returns_false_for_missing_field() {
3034        let mut ks = Keyspace::new();
3035        ks.hset("h", &[("a".into(), Bytes::from("1"))]).unwrap();
3036        assert!(!ks.hexists("h", "missing").unwrap());
3037    }
3038
3039    #[test]
3040    fn hlen_returns_field_count() {
3041        let mut ks = Keyspace::new();
3042        ks.hset(
3043            "h",
3044            &[
3045                ("a".into(), Bytes::from("1")),
3046                ("b".into(), Bytes::from("2")),
3047            ],
3048        )
3049        .unwrap();
3050        assert_eq!(ks.hlen("h").unwrap(), 2);
3051    }
3052
3053    #[test]
3054    fn hlen_missing_key_returns_zero() {
3055        let mut ks = Keyspace::new();
3056        assert_eq!(ks.hlen("missing").unwrap(), 0);
3057    }
3058
3059    #[test]
3060    fn hincrby_new_field() {
3061        let mut ks = Keyspace::new();
3062        ks.hset("h", &[("x".into(), Bytes::from("ignored"))])
3063            .unwrap();
3064        let val = ks.hincrby("h", "counter", 5).unwrap();
3065        assert_eq!(val, 5);
3066    }
3067
3068    #[test]
3069    fn hincrby_existing_field() {
3070        let mut ks = Keyspace::new();
3071        ks.hset("h", &[("n".into(), Bytes::from("10"))]).unwrap();
3072        let val = ks.hincrby("h", "n", 3).unwrap();
3073        assert_eq!(val, 13);
3074    }
3075
3076    #[test]
3077    fn hincrby_negative_delta() {
3078        let mut ks = Keyspace::new();
3079        ks.hset("h", &[("n".into(), Bytes::from("10"))]).unwrap();
3080        let val = ks.hincrby("h", "n", -7).unwrap();
3081        assert_eq!(val, 3);
3082    }
3083
3084    #[test]
3085    fn hincrby_non_integer_returns_error() {
3086        let mut ks = Keyspace::new();
3087        ks.hset("h", &[("s".into(), Bytes::from("notanumber"))])
3088            .unwrap();
3089        assert_eq!(
3090            ks.hincrby("h", "s", 1).unwrap_err(),
3091            IncrError::NotAnInteger
3092        );
3093    }
3094
3095    #[test]
3096    fn hkeys_returns_field_names() {
3097        let mut ks = Keyspace::new();
3098        ks.hset(
3099            "h",
3100            &[
3101                ("alpha".into(), Bytes::from("1")),
3102                ("beta".into(), Bytes::from("2")),
3103            ],
3104        )
3105        .unwrap();
3106        let mut keys = ks.hkeys("h").unwrap();
3107        keys.sort();
3108        assert_eq!(keys, vec!["alpha", "beta"]);
3109    }
3110
3111    #[test]
3112    fn hvals_returns_values() {
3113        let mut ks = Keyspace::new();
3114        ks.hset(
3115            "h",
3116            &[
3117                ("a".into(), Bytes::from("x")),
3118                ("b".into(), Bytes::from("y")),
3119            ],
3120        )
3121        .unwrap();
3122        let mut vals = ks.hvals("h").unwrap();
3123        vals.sort();
3124        assert_eq!(vals, vec![Bytes::from("x"), Bytes::from("y")]);
3125    }
3126
3127    #[test]
3128    fn hmget_returns_values_for_existing_fields() {
3129        let mut ks = Keyspace::new();
3130        ks.hset(
3131            "h",
3132            &[
3133                ("a".into(), Bytes::from("1")),
3134                ("b".into(), Bytes::from("2")),
3135            ],
3136        )
3137        .unwrap();
3138        let vals = ks
3139            .hmget("h", &["a".into(), "missing".into(), "b".into()])
3140            .unwrap();
3141        assert_eq!(vals.len(), 3);
3142        assert_eq!(vals[0], Some(Bytes::from("1")));
3143        assert_eq!(vals[1], None);
3144        assert_eq!(vals[2], Some(Bytes::from("2")));
3145    }
3146
3147    #[test]
3148    fn hash_on_string_key_returns_wrongtype() {
3149        let mut ks = Keyspace::new();
3150        ks.set("s".into(), Bytes::from("string"), None);
3151        assert!(ks.hset("s", &[("f".into(), Bytes::from("v"))]).is_err());
3152        assert!(ks.hget("s", "f").is_err());
3153        assert!(ks.hgetall("s").is_err());
3154        assert!(ks.hdel("s", &["f".into()]).is_err());
3155        assert!(ks.hexists("s", "f").is_err());
3156        assert!(ks.hlen("s").is_err());
3157        assert!(ks.hincrby("s", "f", 1).is_err());
3158        assert!(ks.hkeys("s").is_err());
3159        assert!(ks.hvals("s").is_err());
3160        assert!(ks.hmget("s", &["f".into()]).is_err());
3161    }
3162
3163    // --- set tests ---
3164
3165    #[test]
3166    fn sadd_creates_set() {
3167        let mut ks = Keyspace::new();
3168        let added = ks.sadd("s", &["a".into(), "b".into()]).unwrap();
3169        assert_eq!(added, 2);
3170        assert_eq!(ks.value_type("s"), "set");
3171    }
3172
3173    #[test]
3174    fn sadd_returns_new_member_count() {
3175        let mut ks = Keyspace::new();
3176        ks.sadd("s", &["a".into(), "b".into()]).unwrap();
3177        // add one existing, one new
3178        let added = ks.sadd("s", &["b".into(), "c".into()]).unwrap();
3179        assert_eq!(added, 1); // only "c" is new
3180    }
3181
3182    #[test]
3183    fn srem_removes_members() {
3184        let mut ks = Keyspace::new();
3185        ks.sadd("s", &["a".into(), "b".into(), "c".into()]).unwrap();
3186        let removed = ks.srem("s", &["a".into(), "c".into()]).unwrap();
3187        assert_eq!(removed, 2);
3188        assert_eq!(ks.scard("s").unwrap(), 1);
3189    }
3190
3191    #[test]
3192    fn srem_auto_deletes_empty_set() {
3193        let mut ks = Keyspace::new();
3194        ks.sadd("s", &["only".into()]).unwrap();
3195        ks.srem("s", &["only".into()]).unwrap();
3196        assert_eq!(ks.value_type("s"), "none");
3197    }
3198
3199    #[test]
3200    fn smembers_returns_all_members() {
3201        let mut ks = Keyspace::new();
3202        ks.sadd("s", &["a".into(), "b".into(), "c".into()]).unwrap();
3203        let mut members = ks.smembers("s").unwrap();
3204        members.sort();
3205        assert_eq!(members, vec!["a", "b", "c"]);
3206    }
3207
3208    #[test]
3209    fn smembers_missing_key_returns_empty() {
3210        let mut ks = Keyspace::new();
3211        assert_eq!(ks.smembers("missing").unwrap(), Vec::<String>::new());
3212    }
3213
3214    #[test]
3215    fn sismember_returns_true_for_existing() {
3216        let mut ks = Keyspace::new();
3217        ks.sadd("s", &["member".into()]).unwrap();
3218        assert!(ks.sismember("s", "member").unwrap());
3219    }
3220
3221    #[test]
3222    fn sismember_returns_false_for_missing() {
3223        let mut ks = Keyspace::new();
3224        ks.sadd("s", &["a".into()]).unwrap();
3225        assert!(!ks.sismember("s", "missing").unwrap());
3226    }
3227
3228    #[test]
3229    fn scard_returns_count() {
3230        let mut ks = Keyspace::new();
3231        ks.sadd("s", &["a".into(), "b".into(), "c".into()]).unwrap();
3232        assert_eq!(ks.scard("s").unwrap(), 3);
3233    }
3234
3235    #[test]
3236    fn scard_missing_key_returns_zero() {
3237        let mut ks = Keyspace::new();
3238        assert_eq!(ks.scard("missing").unwrap(), 0);
3239    }
3240
3241    #[test]
3242    fn set_on_string_key_returns_wrongtype() {
3243        let mut ks = Keyspace::new();
3244        ks.set("s".into(), Bytes::from("string"), None);
3245        assert!(ks.sadd("s", &["m".into()]).is_err());
3246        assert!(ks.srem("s", &["m".into()]).is_err());
3247        assert!(ks.smembers("s").is_err());
3248        assert!(ks.sismember("s", "m").is_err());
3249        assert!(ks.scard("s").is_err());
3250    }
3251}