Skip to main content

ember_core/
concurrent.rs

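//! Concurrent keyspace using DashMap for direct multi-threaded access.
//!
//! This is an alternative to the sharded architecture that eliminates channel
//! overhead by allowing direct access from multiple connection handlers.
//! DashMap synchronizes access with internal per-shard locks, so operations on
//! keys that live in different shards do not contend with each other.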
5
6use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use std::time::Duration;
8
9use bytes::Bytes;
10use dashmap::DashMap;
11
12use crate::keyspace::{format_float, EvictionPolicy, TtlResult};
13use crate::memory;
14use crate::time;
15
/// Errors produced by integer arithmetic commands on the concurrent keyspace.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConcurrentOpError {
    /// The stored value is not valid UTF-8 or does not parse as an `i64`.
    NotAnInteger,
    /// Applying the delta would overflow `i64`.
    Overflow,
}

impl std::fmt::Display for ConcurrentOpError {
    /// Writes the wire-level error text for this variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::NotAnInteger => "ERR value is not an integer or out of range",
            Self::Overflow => "ERR increment or decrement would overflow",
        };
        f.write_str(message)
    }
}

impl std::error::Error for ConcurrentOpError {}
35
/// Errors produced by floating-point arithmetic commands on the concurrent keyspace.
#[derive(Debug, Clone, PartialEq)]
pub enum ConcurrentFloatError {
    /// The stored value is not valid UTF-8 or does not parse as an `f64`.
    NotAFloat,
    /// The computed result is NaN or infinite.
    NanOrInfinity,
}

impl std::fmt::Display for ConcurrentFloatError {
    /// Writes the wire-level error text for this variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::NotAFloat => "ERR value is not a valid float",
            Self::NanOrInfinity => "ERR increment would produce NaN or Infinity",
        };
        f.write_str(message)
    }
}

impl std::error::Error for ConcurrentFloatError {}
55
56/// An entry in the concurrent keyspace.
57/// Optimized for memory: 40 bytes (down from 56).
58#[derive(Debug, Clone)]
59struct Entry {
60    value: Bytes,
61    /// Monotonic expiry timestamp in ms. 0 = no expiry.
62    expires_at_ms: u64,
63}
64
65impl Entry {
66    #[inline]
67    fn is_expired(&self) -> bool {
68        time::is_expired(self.expires_at_ms)
69    }
70
71    /// Compute entry size on demand (key_len passed in).
72    #[inline]
73    fn size(&self, key_len: usize) -> usize {
74        // key heap + value heap + entry struct overhead
75        key_len + self.value.len() + 48
76    }
77}
78
/// A concurrent keyspace backed by DashMap.
///
/// Provides thread-safe access to key-value data without channel overhead:
/// every method takes `&self`, so one instance can be shared between threads
/// (the tests share it through an `Arc`).
///
/// NOTE(review): the original comment described all operations as "lock-free
/// for non-conflicting keys". DashMap's documentation describes sharded
/// locking, so read this as "low contention", not strictly lock-free — confirm.
#[derive(Debug)]
pub struct ConcurrentKeyspace {
    /// Key → entry storage.
    /// Using Box<str> instead of String saves 8 bytes per key (no capacity field).
    data: DashMap<Box<str>, Entry>,
    /// Running estimate of bytes held, adjusted by the mutating methods using
    /// `Entry::size`.
    memory_used: AtomicUsize,
    /// Optional memory budget in bytes consulted by `set`; `None` disables the check.
    max_memory: Option<usize>,
    /// What `set` does when the budget would be exceeded.
    eviction_policy: EvictionPolicy,
    /// Counter bumped by most (not all) operations; read via `ops_count()`.
    ops_count: AtomicU64,
}
92
93impl ConcurrentKeyspace {
94    /// Creates a new concurrent keyspace with optional memory limit.
95    pub fn new(max_memory: Option<usize>, eviction_policy: EvictionPolicy) -> Self {
96        Self {
97            data: DashMap::new(),
98            memory_used: AtomicUsize::new(0),
99            max_memory,
100            eviction_policy,
101            ops_count: AtomicU64::new(0),
102        }
103    }
104
105    /// Gets a value by key, returning None if not found or expired.
106    pub fn get(&self, key: &str) -> Option<Bytes> {
107        self.ops_count.fetch_add(1, Ordering::Relaxed);
108
109        let entry = self.data.get(key)?;
110
111        if entry.is_expired() {
112            let key_len = entry.key().len();
113            let size = entry.size(key_len);
114            drop(entry);
115            // Remove expired entry
116            if self.data.remove(key).is_some() {
117                self.memory_used.fetch_sub(size, Ordering::Relaxed);
118            }
119            return None;
120        }
121
122        Some(entry.value.clone())
123    }
124
125    /// Sets a key-value pair with optional TTL.
126    pub fn set(&self, key: String, value: Bytes, ttl: Option<Duration>) -> bool {
127        self.ops_count.fetch_add(1, Ordering::Relaxed);
128
129        let key: Box<str> = key.into_boxed_str();
130        let entry_size = key.len() + value.len() + 48;
131        let expires_at_ms = time::expiry_from_duration(ttl);
132
133        // Check memory limit (with safety margin for allocator overhead)
134        if let Some(max) = self.max_memory {
135            let limit = memory::effective_limit(max);
136            let current = self.memory_used.load(Ordering::Relaxed);
137            if current + entry_size > limit {
138                if self.eviction_policy == EvictionPolicy::NoEviction {
139                    return false;
140                }
141                // Simple eviction: remove some entries
142                self.evict_entries(entry_size);
143            }
144        }
145
146        let entry = Entry {
147            value,
148            expires_at_ms,
149        };
150
151        // Update memory tracking
152        if let Some(old) = self.data.insert(key.clone(), entry) {
153            // Replace: adjust memory
154            let old_size = old.size(key.len());
155            let diff = entry_size as isize - old_size as isize;
156            if diff > 0 {
157                self.memory_used.fetch_add(diff as usize, Ordering::Relaxed);
158            } else {
159                self.memory_used
160                    .fetch_sub((-diff) as usize, Ordering::Relaxed);
161            }
162        } else {
163            self.memory_used.fetch_add(entry_size, Ordering::Relaxed);
164        }
165
166        true
167    }
168
169    /// Deletes a key, returning true if it existed.
170    pub fn del(&self, key: &str) -> bool {
171        self.ops_count.fetch_add(1, Ordering::Relaxed);
172
173        if let Some((k, removed)) = self.data.remove(key) {
174            self.memory_used
175                .fetch_sub(removed.size(k.len()), Ordering::Relaxed);
176            true
177        } else {
178            false
179        }
180    }
181
182    /// Checks if a key exists (and is not expired).
183    pub fn exists(&self, key: &str) -> bool {
184        self.get(key).is_some()
185    }
186
187    /// Returns the TTL of a key.
188    pub fn ttl(&self, key: &str) -> TtlResult {
189        match self.data.get(key) {
190            None => TtlResult::NotFound,
191            Some(entry) => {
192                if entry.is_expired() {
193                    TtlResult::NotFound
194                } else {
195                    match time::remaining_secs(entry.expires_at_ms) {
196                        None => TtlResult::NoExpiry,
197                        Some(secs) => TtlResult::Seconds(secs),
198                    }
199                }
200            }
201        }
202    }
203
204    /// Sets expiration on a key.
205    pub fn expire(&self, key: &str, seconds: u64) -> bool {
206        self.ops_count.fetch_add(1, Ordering::Relaxed);
207
208        if let Some(mut entry) = self.data.get_mut(key) {
209            if entry.is_expired() {
210                return false;
211            }
212            entry.expires_at_ms = time::now_ms().saturating_add(seconds.saturating_mul(1000));
213            true
214        } else {
215            false
216        }
217    }
218
    /// Increments the integer value of a key by 1.
    /// If the key doesn't exist, it's initialized to 0 before incrementing.
    ///
    /// Delegates to `incr_by(key, 1)` and returns its result unchanged,
    /// including its `NotAnInteger` / `Overflow` errors.
    pub fn incr(&self, key: &str) -> Result<i64, ConcurrentOpError> {
        self.incr_by(key, 1)
    }
224
    /// Decrements the integer value of a key by 1.
    /// If the key doesn't exist, it's initialized to 0 before decrementing.
    ///
    /// Delegates to `incr_by(key, -1)` and returns its result unchanged,
    /// including its `NotAnInteger` / `Overflow` errors.
    pub fn decr(&self, key: &str) -> Result<i64, ConcurrentOpError> {
        self.incr_by(key, -1)
    }
230
231    /// Adds `delta` to the integer value of a key, creating it if missing.
232    /// Preserves existing TTL when updating.
233    pub fn incr_by(&self, key: &str, delta: i64) -> Result<i64, ConcurrentOpError> {
234        self.ops_count.fetch_add(1, Ordering::Relaxed);
235
236        // try to update in-place via get_mut
237        if let Some(mut entry) = self.data.get_mut(key) {
238            if entry.is_expired() {
239                let key_len = entry.key().len();
240                let old_size = entry.size(key_len);
241                drop(entry);
242                if self.data.remove(key).is_some() {
243                    self.memory_used.fetch_sub(old_size, Ordering::Relaxed);
244                }
245                // treat as missing — fall through to insert below
246            } else {
247                let s = std::str::from_utf8(&entry.value)
248                    .map_err(|_| ConcurrentOpError::NotAnInteger)?;
249                let current: i64 = s.parse().map_err(|_| ConcurrentOpError::NotAnInteger)?;
250                let new_val = current
251                    .checked_add(delta)
252                    .ok_or(ConcurrentOpError::Overflow)?;
253                let new_bytes = Bytes::from(new_val.to_string());
254
255                let key_len = entry.key().len();
256                let old_size = entry.size(key_len);
257                entry.value = new_bytes;
258                let new_size = entry.size(key_len);
259                let diff = new_size as isize - old_size as isize;
260                if diff > 0 {
261                    self.memory_used.fetch_add(diff as usize, Ordering::Relaxed);
262                } else if diff < 0 {
263                    self.memory_used
264                        .fetch_sub((-diff) as usize, Ordering::Relaxed);
265                }
266                return Ok(new_val);
267            }
268        }
269
270        // key doesn't exist — treat as 0
271        let new_val = (0i64)
272            .checked_add(delta)
273            .ok_or(ConcurrentOpError::Overflow)?;
274        self.set(key.to_owned(), Bytes::from(new_val.to_string()), None);
275        Ok(new_val)
276    }
277
278    /// Adds `delta` to the float value of a key, creating it if missing.
279    /// Preserves existing TTL when updating.
280    pub fn incr_by_float(&self, key: &str, delta: f64) -> Result<f64, ConcurrentFloatError> {
281        self.ops_count.fetch_add(1, Ordering::Relaxed);
282
283        if let Some(mut entry) = self.data.get_mut(key) {
284            if entry.is_expired() {
285                let key_len = entry.key().len();
286                let old_size = entry.size(key_len);
287                drop(entry);
288                if self.data.remove(key).is_some() {
289                    self.memory_used.fetch_sub(old_size, Ordering::Relaxed);
290                }
291            } else {
292                let s = std::str::from_utf8(&entry.value)
293                    .map_err(|_| ConcurrentFloatError::NotAFloat)?;
294                let current: f64 = s.parse().map_err(|_| ConcurrentFloatError::NotAFloat)?;
295                let new_val = current + delta;
296                if new_val.is_nan() || new_val.is_infinite() {
297                    return Err(ConcurrentFloatError::NanOrInfinity);
298                }
299                let new_bytes = Bytes::from(format_float(new_val));
300
301                let key_len = entry.key().len();
302                let old_size = entry.size(key_len);
303                entry.value = new_bytes;
304                let new_size = entry.size(key_len);
305                let diff = new_size as isize - old_size as isize;
306                if diff > 0 {
307                    self.memory_used.fetch_add(diff as usize, Ordering::Relaxed);
308                } else if diff < 0 {
309                    self.memory_used
310                        .fetch_sub((-diff) as usize, Ordering::Relaxed);
311                }
312                return Ok(new_val);
313            }
314        }
315
316        // key doesn't exist — treat as 0.0
317        let new_val = delta;
318        if new_val.is_nan() || new_val.is_infinite() {
319            return Err(ConcurrentFloatError::NanOrInfinity);
320        }
321        self.set(key.to_owned(), Bytes::from(format_float(new_val)), None);
322        Ok(new_val)
323    }
324
325    /// Appends a value to an existing string key, or creates a new key.
326    /// Returns the new string length.
327    pub fn append(&self, key: &str, suffix: &[u8]) -> usize {
328        self.ops_count.fetch_add(1, Ordering::Relaxed);
329
330        if let Some(mut entry) = self.data.get_mut(key) {
331            if !entry.is_expired() {
332                let mut new_data = Vec::with_capacity(entry.value.len() + suffix.len());
333                new_data.extend_from_slice(&entry.value);
334                new_data.extend_from_slice(suffix);
335                let new_len = new_data.len();
336
337                let key_len = entry.key().len();
338                let old_size = entry.size(key_len);
339                entry.value = Bytes::from(new_data);
340                let new_size = entry.size(key_len);
341                let diff = new_size as isize - old_size as isize;
342                if diff > 0 {
343                    self.memory_used.fetch_add(diff as usize, Ordering::Relaxed);
344                } else if diff < 0 {
345                    self.memory_used
346                        .fetch_sub((-diff) as usize, Ordering::Relaxed);
347                }
348                return new_len;
349            }
350            // expired — remove and fall through to create
351            let key_len = entry.key().len();
352            let old_size = entry.size(key_len);
353            drop(entry);
354            if self.data.remove(key).is_some() {
355                self.memory_used.fetch_sub(old_size, Ordering::Relaxed);
356            }
357        }
358
359        // key doesn't exist — create with just the suffix
360        let new_len = suffix.len();
361        self.set(key.to_owned(), Bytes::copy_from_slice(suffix), None);
362        new_len
363    }
364
365    /// Returns the length of the string value stored at key.
366    /// Returns 0 if the key doesn't exist.
367    pub fn strlen(&self, key: &str) -> usize {
368        match self.get(key) {
369            Some(data) => data.len(),
370            None => 0,
371        }
372    }
373
374    /// Removes the expiration from a key.
375    /// Returns true if the timeout was successfully removed.
376    pub fn persist(&self, key: &str) -> bool {
377        self.ops_count.fetch_add(1, Ordering::Relaxed);
378
379        if let Some(mut entry) = self.data.get_mut(key) {
380            if entry.is_expired() {
381                return false;
382            }
383            if entry.expires_at_ms != 0 {
384                entry.expires_at_ms = 0;
385                true
386            } else {
387                false
388            }
389        } else {
390            false
391        }
392    }
393
394    /// Sets expiration in milliseconds on an existing key.
395    pub fn pexpire(&self, key: &str, millis: u64) -> bool {
396        self.ops_count.fetch_add(1, Ordering::Relaxed);
397
398        if let Some(mut entry) = self.data.get_mut(key) {
399            if entry.is_expired() {
400                return false;
401            }
402            entry.expires_at_ms = time::now_ms().saturating_add(millis);
403            true
404        } else {
405            false
406        }
407    }
408
409    /// Returns the remaining TTL in milliseconds.
410    pub fn pttl(&self, key: &str) -> TtlResult {
411        match self.data.get(key) {
412            None => TtlResult::NotFound,
413            Some(entry) => {
414                if entry.is_expired() {
415                    TtlResult::NotFound
416                } else {
417                    match time::remaining_ms(entry.expires_at_ms) {
418                        None => TtlResult::NoExpiry,
419                        Some(ms) => TtlResult::Milliseconds(ms),
420                    }
421                }
422            }
423        }
424    }
425
426    /// Returns all keys matching a glob pattern.
427    pub fn keys(&self, pattern: &str) -> Vec<String> {
428        let len = self.data.len();
429        if len > 10_000 {
430            tracing::warn!(
431                key_count = len,
432                "KEYS on large keyspace, consider SCAN instead"
433            );
434        }
435        self.data
436            .iter()
437            .filter(|entry| !entry.value().is_expired())
438            .filter(|entry| crate::keyspace::glob_match(pattern, entry.key()))
439            .map(|entry| entry.key().to_string())
440            .collect()
441    }
442
443    /// Iterates keys using a cursor. Returns (next_cursor, keys).
444    /// A next_cursor of 0 means the iteration is complete.
445    pub fn scan_keys(
446        &self,
447        cursor: u64,
448        count: usize,
449        pattern: Option<&str>,
450    ) -> (u64, Vec<String>) {
451        let target_count = if count == 0 { 10 } else { count };
452        let mut keys = Vec::with_capacity(target_count);
453        let mut position = 0u64;
454
455        for entry in self.data.iter() {
456            if entry.value().is_expired() {
457                continue;
458            }
459            if position < cursor {
460                position += 1;
461                continue;
462            }
463            if let Some(pat) = pattern {
464                if !crate::keyspace::glob_match(pat, entry.key()) {
465                    position += 1;
466                    continue;
467                }
468            }
469            keys.push(entry.key().to_string());
470            position += 1;
471            if keys.len() >= target_count {
472                // there may be more keys — return position as next cursor
473                return (position, keys);
474            }
475        }
476
477        // iteration complete
478        (0, keys)
479    }
480
481    /// Renames a key. Returns true if the source key existed.
482    pub fn rename(&self, key: &str, newkey: &str) -> Result<(), &'static str> {
483        self.ops_count.fetch_add(1, Ordering::Relaxed);
484
485        let (_, entry) = self.data.remove(key).ok_or("ERR no such key")?;
486
487        if entry.is_expired() {
488            let size = entry.size(key.len());
489            self.memory_used.fetch_sub(size, Ordering::Relaxed);
490            return Err("ERR no such key");
491        }
492
493        // remove destination if it exists
494        if let Some((k, old_dest)) = self.data.remove(newkey) {
495            self.memory_used
496                .fetch_sub(old_dest.size(k.len()), Ordering::Relaxed);
497        }
498
499        // adjust memory: old key removed, new key added
500        let old_key_len = key.len();
501        let new_key_len = newkey.len();
502        let old_mem = old_key_len + entry.value.len() + 48;
503        let new_mem = new_key_len + entry.value.len() + 48;
504
505        self.memory_used.fetch_sub(old_mem, Ordering::Relaxed);
506        self.memory_used.fetch_add(new_mem, Ordering::Relaxed);
507
508        self.data.insert(newkey.into(), entry);
509        Ok(())
510    }
511
    /// Returns the number of entries in the map.
    ///
    /// This is the raw map size: entries that have expired but have not yet
    /// been removed are included.
    pub fn len(&self) -> usize {
        self.data.len()
    }
516
    /// Returns true if the map holds no entries at all (expired-but-unremoved
    /// entries count as present).
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
521
    /// Returns the tracked memory estimate in bytes.
    ///
    /// This is the running counter maintained by the mutating methods, read
    /// with relaxed ordering; it is an estimate, not a measurement.
    pub fn memory_used(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }
526
    /// Returns the operation count accumulated so far (relaxed read).
    ///
    /// Not every method bumps the counter — `ttl`, `pttl`, `keys` and
    /// `scan_keys` do not.
    pub fn ops_count(&self) -> u64 {
        self.ops_count.load(Ordering::Relaxed)
    }
531
    /// Clears all keys and resets the memory counter to zero.
    ///
    /// The operation counter is left untouched.
    ///
    /// NOTE(review): the map is cleared first and the counter reset afterwards
    /// as two separate steps; a write landing between them would presumably
    /// have its size wiped from the counter — confirm whether callers
    /// serialize `clear` against writers.
    pub fn clear(&self) {
        self.data.clear();
        self.memory_used.store(0, Ordering::Relaxed);
    }
537
538    /// Simple eviction: remove approximately `needed` bytes worth of entries.
539    fn evict_entries(&self, needed: usize) {
540        let mut freed = 0usize;
541        let mut keys_to_remove = Vec::new();
542
543        // Collect keys to remove (can't remove while iterating)
544        for entry in self.data.iter() {
545            if freed >= needed {
546                break;
547            }
548            let key_len = entry.key().len();
549            keys_to_remove.push(entry.key().clone());
550            freed += entry.value().size(key_len);
551        }
552
553        // Remove collected keys
554        for key in keys_to_remove {
555            if let Some((k, removed)) = self.data.remove(&key) {
556                self.memory_used
557                    .fetch_sub(removed.size(k.len()), Ordering::Relaxed);
558            }
559        }
560    }
561}
562
/// The default keyspace has no memory limit and never evicts.
impl Default for ConcurrentKeyspace {
    /// Equivalent to `ConcurrentKeyspace::new(None, EvictionPolicy::NoEviction)`.
    fn default() -> Self {
        Self::new(None, EvictionPolicy::NoEviction)
    }
}
568
// Unit tests for `ConcurrentKeyspace`. Every test builds its own keyspace via
// `Default`, i.e. with no memory limit and no eviction.
#[cfg(test)]
mod tests {
    use super::*;

    // --- basic get / set / del / exists ---

    #[test]
    fn set_and_get() {
        let ks = ConcurrentKeyspace::default();
        assert!(ks.set("key".into(), Bytes::from("value"), None));
        assert_eq!(ks.get("key"), Some(Bytes::from("value")));
    }

    #[test]
    fn get_missing() {
        let ks = ConcurrentKeyspace::default();
        assert_eq!(ks.get("missing"), None);
    }

    #[test]
    fn del_existing() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), Bytes::from("value"), None);
        assert!(ks.del("key"));
        assert_eq!(ks.get("key"), None);
    }

    #[test]
    fn del_missing() {
        let ks = ConcurrentKeyspace::default();
        assert!(!ks.del("missing"));
    }

    #[test]
    fn exists_check() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), Bytes::from("value"), None);
        assert!(ks.exists("key"));
        assert!(!ks.exists("missing"));
    }

    // --- expiry ---

    // Relies on wall-clock sleeping: the key gets a 10 ms TTL and is read
    // again after 20 ms.
    #[test]
    fn ttl_expires() {
        let ks = ConcurrentKeyspace::default();
        ks.set(
            "key".into(),
            Bytes::from("value"),
            Some(Duration::from_millis(10)),
        );
        assert!(matches!(ks.ttl("key"), TtlResult::Seconds(_)));
        std::thread::sleep(Duration::from_millis(20));
        assert_eq!(ks.get("key"), None);
    }

    // --- integer arithmetic ---

    #[test]
    fn incr_new_key() {
        let ks = ConcurrentKeyspace::default();
        assert_eq!(ks.incr("counter").unwrap(), 1);
        assert_eq!(ks.get("counter"), Some(Bytes::from("1")));
    }

    #[test]
    fn incr_existing_key() {
        let ks = ConcurrentKeyspace::default();
        ks.set("counter".into(), Bytes::from("10"), None);
        assert_eq!(ks.incr("counter").unwrap(), 11);
    }

    #[test]
    fn decr_below_zero() {
        let ks = ConcurrentKeyspace::default();
        assert_eq!(ks.decr("counter").unwrap(), -1);
        assert_eq!(ks.decr("counter").unwrap(), -2);
    }

    #[test]
    fn incr_by_delta() {
        let ks = ConcurrentKeyspace::default();
        assert_eq!(ks.incr_by("counter", 5).unwrap(), 5);
        assert_eq!(ks.incr_by("counter", -3).unwrap(), 2);
    }

    #[test]
    fn incr_non_integer_value() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), Bytes::from("not_a_number"), None);
        assert_eq!(ks.incr("key"), Err(ConcurrentOpError::NotAnInteger));
    }

    #[test]
    fn incr_overflow() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), Bytes::from(i64::MAX.to_string()), None);
        assert_eq!(ks.incr("key"), Err(ConcurrentOpError::Overflow));
    }

    // --- float arithmetic ---

    #[test]
    fn incr_by_float_new_key() {
        let ks = ConcurrentKeyspace::default();
        let val = ks.incr_by_float("key", 2.5).unwrap();
        assert!((val - 2.5).abs() < f64::EPSILON);
    }

    #[test]
    fn incr_by_float_existing() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), Bytes::from("10.5"), None);
        let val = ks.incr_by_float("key", 1.5).unwrap();
        assert!((val - 12.0).abs() < f64::EPSILON);
    }

    #[test]
    fn incr_by_float_not_a_float() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), Bytes::from("hello"), None);
        assert_eq!(
            ks.incr_by_float("key", 1.0),
            Err(ConcurrentFloatError::NotAFloat)
        );
    }

    // f64::MAX + f64::MAX overflows to infinity, which must be rejected.
    #[test]
    fn incr_by_float_infinity() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), Bytes::from(f64::MAX.to_string()), None);
        assert_eq!(
            ks.incr_by_float("key", f64::MAX),
            Err(ConcurrentFloatError::NanOrInfinity)
        );
    }

    // --- append / strlen ---

    #[test]
    fn append_new_key() {
        let ks = ConcurrentKeyspace::default();
        assert_eq!(ks.append("key", b"hello"), 5);
        assert_eq!(ks.get("key"), Some(Bytes::from("hello")));
    }

    #[test]
    fn append_existing_key() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), Bytes::from("hello"), None);
        assert_eq!(ks.append("key", b" world"), 11);
        assert_eq!(ks.get("key"), Some(Bytes::from("hello world")));
    }

    #[test]
    fn strlen_existing() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), Bytes::from("hello"), None);
        assert_eq!(ks.strlen("key"), 5);
    }

    #[test]
    fn strlen_missing() {
        let ks = ConcurrentKeyspace::default();
        assert_eq!(ks.strlen("missing"), 0);
    }

    // --- persist / pexpire / pttl ---

    #[test]
    fn persist_removes_ttl() {
        let ks = ConcurrentKeyspace::default();
        ks.set(
            "key".into(),
            Bytes::from("val"),
            Some(Duration::from_secs(60)),
        );
        assert!(ks.persist("key"));
        assert!(matches!(ks.ttl("key"), TtlResult::NoExpiry));
    }

    #[test]
    fn persist_no_ttl() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), Bytes::from("val"), None);
        assert!(!ks.persist("key")); // no TTL to remove
    }

    #[test]
    fn persist_missing_key() {
        let ks = ConcurrentKeyspace::default();
        assert!(!ks.persist("missing"));
    }

    #[test]
    fn pexpire_and_pttl() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), Bytes::from("val"), None);
        assert!(ks.pexpire("key", 5000));
        match ks.pttl("key") {
            TtlResult::Milliseconds(ms) => assert!(ms > 0 && ms <= 5000),
            other => panic!("expected Milliseconds, got {other:?}"),
        }
    }

    #[test]
    fn pttl_no_expiry() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), Bytes::from("val"), None);
        assert!(matches!(ks.pttl("key"), TtlResult::NoExpiry));
    }

    #[test]
    fn pttl_missing() {
        let ks = ConcurrentKeyspace::default();
        assert!(matches!(ks.pttl("missing"), TtlResult::NotFound));
    }

    // --- keys / scan ---

    // Result order follows map iteration order, hence the sort before comparing.
    #[test]
    fn keys_match_pattern() {
        let ks = ConcurrentKeyspace::default();
        ks.set("user:1".into(), Bytes::from("a"), None);
        ks.set("user:2".into(), Bytes::from("b"), None);
        ks.set("item:1".into(), Bytes::from("c"), None);
        let mut result = ks.keys("user:*");
        result.sort();
        assert_eq!(result, vec!["user:1", "user:2"]);
    }

    #[test]
    fn keys_match_all() {
        let ks = ConcurrentKeyspace::default();
        ks.set("a".into(), Bytes::from("1"), None);
        ks.set("b".into(), Bytes::from("2"), None);
        let result = ks.keys("*");
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn scan_basic() {
        let ks = ConcurrentKeyspace::default();
        ks.set("a".into(), Bytes::from("1"), None);
        ks.set("b".into(), Bytes::from("2"), None);
        ks.set("c".into(), Bytes::from("3"), None);
        let (cursor, keys) = ks.scan_keys(0, 10, None);
        assert_eq!(cursor, 0); // complete in one pass
        assert_eq!(keys.len(), 3);
    }

    #[test]
    fn scan_with_pattern() {
        let ks = ConcurrentKeyspace::default();
        ks.set("user:1".into(), Bytes::from("a"), None);
        ks.set("user:2".into(), Bytes::from("b"), None);
        ks.set("item:1".into(), Bytes::from("c"), None);
        let (_, keys) = ks.scan_keys(0, 10, Some("user:*"));
        assert_eq!(keys.len(), 2);
    }

    // Only checks that a page respects `count` and that a non-zero cursor
    // yields a non-empty follow-up page; it does not verify full coverage.
    #[test]
    fn scan_with_count() {
        let ks = ConcurrentKeyspace::default();
        for i in 0..10 {
            ks.set(format!("k{i}"), Bytes::from("v"), None);
        }
        let (cursor, keys) = ks.scan_keys(0, 3, None);
        assert!(keys.len() <= 3);
        // if cursor > 0, there are more keys
        if cursor > 0 {
            let (_, keys2) = ks.scan_keys(cursor, 3, None);
            assert!(!keys2.is_empty());
        }
    }

    // --- rename ---

    #[test]
    fn rename_basic() {
        let ks = ConcurrentKeyspace::default();
        ks.set("old".into(), Bytes::from("value"), None);
        ks.rename("old", "new").unwrap();
        assert_eq!(ks.get("old"), None);
        assert_eq!(ks.get("new"), Some(Bytes::from("value")));
    }

    #[test]
    fn rename_missing_key() {
        let ks = ConcurrentKeyspace::default();
        assert!(ks.rename("missing", "new").is_err());
    }

    #[test]
    fn rename_overwrites_destination() {
        let ks = ConcurrentKeyspace::default();
        ks.set("src".into(), Bytes::from("new_val"), None);
        ks.set("dst".into(), Bytes::from("old_val"), None);
        ks.rename("src", "dst").unwrap();
        assert_eq!(ks.get("src"), None);
        assert_eq!(ks.get("dst"), Some(Bytes::from("new_val")));
    }

    // --- multi-threaded smoke test ---

    // Eight threads each insert 1000 distinct keys; all 8000 must be present.
    #[test]
    fn concurrent_access() {
        use std::sync::Arc;
        use std::thread;

        let ks = Arc::new(ConcurrentKeyspace::default());
        let mut handles = vec![];

        // Spawn multiple threads doing concurrent sets
        for i in 0..8 {
            let ks = Arc::clone(&ks);
            handles.push(thread::spawn(move || {
                for j in 0..1000 {
                    let key = format!("key-{}-{}", i, j);
                    ks.set(key, Bytes::from("value"), None);
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(ks.len(), 8000);
    }
}