Skip to main content

ember_core/
concurrent.rs

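//! Concurrent keyspace using DashMap for low-contention multi-threaded access.
//!
//! This is an alternative to the sharded architecture that eliminates channel
//! overhead by allowing direct access from multiple connection handlers.
//! DashMap guards its internal shards with locks, so access is not strictly
//! lock-free; handlers contend only when their keys share an internal shard.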
5
6use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use std::time::Duration;
8
9use bytes::Bytes;
10use compact_str::CompactString;
11use dashmap::DashMap;
12
13use crate::keyspace::{format_float, EvictionPolicy, TtlResult};
14use crate::memory;
15use crate::time;
16
/// Errors from integer operations (`incr`, `decr`, `incr_by`) on the concurrent keyspace.
///
/// Float operations report failures through `ConcurrentFloatError` instead.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConcurrentOpError {
    /// The stored value is not valid UTF-8 or does not parse as an `i64`.
    NotAnInteger,
    /// Increment or decrement would overflow `i64`.
    Overflow,
}
25
26impl std::fmt::Display for ConcurrentOpError {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        match self {
29            Self::NotAnInteger => write!(f, "ERR value is not an integer or out of range"),
30            Self::Overflow => write!(f, "ERR increment or decrement would overflow"),
31        }
32    }
33}
34
35impl std::error::Error for ConcurrentOpError {}
36
/// Errors from float operations (`incr_by_float`) on the concurrent keyspace.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConcurrentFloatError {
    /// The stored value is not valid UTF-8 or does not parse as an `f64`.
    NotAFloat,
    /// Result would be NaN or Infinity.
    NanOrInfinity,
}
45
46impl std::fmt::Display for ConcurrentFloatError {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        match self {
49            Self::NotAFloat => write!(f, "ERR value is not a valid float"),
50            Self::NanOrInfinity => write!(f, "ERR increment would produce NaN or Infinity"),
51        }
52    }
53}
54
55impl std::error::Error for ConcurrentFloatError {}
56
57/// An entry in the concurrent keyspace.
58/// Optimized for memory: 40 bytes (down from 56).
59#[derive(Debug, Clone)]
60struct Entry {
61    value: Bytes,
62    /// Monotonic expiry timestamp in ms. 0 = no expiry.
63    expires_at_ms: u64,
64}
65
66impl Entry {
67    #[inline]
68    fn is_expired(&self) -> bool {
69        time::is_expired(self.expires_at_ms)
70    }
71
72    /// Compute entry size on demand (key_len passed in).
73    #[inline]
74    fn size(&self, key_len: usize) -> usize {
75        // key heap + value heap + entry struct overhead
76        key_len + self.value.len() + 48
77    }
78}
79
80/// A concurrent keyspace backed by DashMap.
81///
82/// Provides thread-safe access to key-value data without channel overhead.
83/// All operations are lock-free for non-conflicting keys.
84#[derive(Debug)]
85pub struct ConcurrentKeyspace {
86    /// CompactString inlines keys ≤24 bytes, avoiding heap allocation for short keys.
87    data: DashMap<CompactString, Entry>,
88    memory_used: AtomicUsize,
89    max_memory: Option<usize>,
90    eviction_policy: EvictionPolicy,
91    ops_count: AtomicU64,
92}
93
94impl ConcurrentKeyspace {
95    /// Creates a new concurrent keyspace with optional memory limit.
96    pub fn new(max_memory: Option<usize>, eviction_policy: EvictionPolicy) -> Self {
97        Self {
98            data: DashMap::new(),
99            memory_used: AtomicUsize::new(0),
100            max_memory,
101            eviction_policy,
102            ops_count: AtomicU64::new(0),
103        }
104    }
105
106    /// Gets a value by key, returning None if not found or expired.
107    pub fn get(&self, key: &str) -> Option<Bytes> {
108        self.ops_count.fetch_add(1, Ordering::Relaxed);
109
110        let entry = self.data.get(key)?;
111
112        if entry.is_expired() {
113            let key_len = entry.key().len();
114            let size = entry.size(key_len);
115            drop(entry);
116            // Remove expired entry
117            if self.data.remove(key).is_some() {
118                self.memory_used.fetch_sub(size, Ordering::Relaxed);
119            }
120            return None;
121        }
122
123        Some(entry.value.clone())
124    }
125
126    /// Sets a key-value pair with optional TTL.
127    pub fn set(&self, key: String, value: Bytes, ttl: Option<Duration>) -> bool {
128        self.ops_count.fetch_add(1, Ordering::Relaxed);
129
130        let key = CompactString::from(key);
131        let entry_size = key.len() + value.len() + 48;
132        let expires_at_ms = time::expiry_from_duration(ttl);
133
134        // Check memory limit (with safety margin for allocator overhead)
135        if let Some(max) = self.max_memory {
136            let limit = memory::effective_limit(max);
137            let current = self.memory_used.load(Ordering::Relaxed);
138            if current + entry_size > limit {
139                if self.eviction_policy == EvictionPolicy::NoEviction {
140                    return false;
141                }
142                // Simple eviction: remove some entries
143                self.evict_entries(entry_size);
144            }
145        }
146
147        let entry = Entry {
148            value,
149            expires_at_ms,
150        };
151
152        // Update memory tracking
153        if let Some(old) = self.data.insert(key.clone(), entry) {
154            // Replace: adjust memory for the size difference
155            self.adjust_memory(old.size(key.len()), entry_size);
156        } else {
157            self.memory_used.fetch_add(entry_size, Ordering::Relaxed);
158        }
159
160        true
161    }
162
163    /// Deletes a key, returning true if it existed.
164    pub fn del(&self, key: &str) -> bool {
165        self.ops_count.fetch_add(1, Ordering::Relaxed);
166
167        if let Some((k, removed)) = self.data.remove(key) {
168            self.memory_used
169                .fetch_sub(removed.size(k.len()), Ordering::Relaxed);
170            true
171        } else {
172            false
173        }
174    }
175
176    /// Checks if a key exists (and is not expired).
177    pub fn exists(&self, key: &str) -> bool {
178        self.get(key).is_some()
179    }
180
181    /// Returns a random non-expired key, or `None` if the keyspace is empty.
182    pub fn random_key(&self) -> Option<String> {
183        use rand::seq::IteratorRandom;
184        let mut rng = rand::rng();
185        // sample up to 5 times to skip expired entries
186        for _ in 0..5 {
187            let entry = self.data.iter().choose(&mut rng)?;
188            if !entry.value().is_expired() {
189                return Some(entry.key().to_string());
190            }
191        }
192        None
193    }
194
195    /// Returns the TTL of a key.
196    pub fn ttl(&self, key: &str) -> TtlResult {
197        match self.data.get(key) {
198            None => TtlResult::NotFound,
199            Some(entry) => {
200                if entry.is_expired() {
201                    TtlResult::NotFound
202                } else {
203                    match time::remaining_secs(entry.expires_at_ms) {
204                        None => TtlResult::NoExpiry,
205                        Some(secs) => TtlResult::Seconds(secs),
206                    }
207                }
208            }
209        }
210    }
211
212    /// Sets expiration on a key.
213    pub fn expire(&self, key: &str, seconds: u64) -> bool {
214        self.ops_count.fetch_add(1, Ordering::Relaxed);
215
216        if let Some(mut entry) = self.data.get_mut(key) {
217            if entry.is_expired() {
218                return false;
219            }
220            entry.expires_at_ms = time::now_ms().saturating_add(seconds.saturating_mul(1000));
221            true
222        } else {
223            false
224        }
225    }
226
227    /// Increments the integer value of a key by 1.
228    /// If the key doesn't exist, it's initialized to 0 before incrementing.
229    pub fn incr(&self, key: &str) -> Result<i64, ConcurrentOpError> {
230        self.incr_by(key, 1)
231    }
232
233    /// Decrements the integer value of a key by 1.
234    /// If the key doesn't exist, it's initialized to 0 before decrementing.
235    pub fn decr(&self, key: &str) -> Result<i64, ConcurrentOpError> {
236        self.incr_by(key, -1)
237    }
238
239    /// Adds `delta` to the integer value of a key, creating it if missing.
240    /// Preserves existing TTL when updating.
241    pub fn incr_by(&self, key: &str, delta: i64) -> Result<i64, ConcurrentOpError> {
242        self.ops_count.fetch_add(1, Ordering::Relaxed);
243
244        // try to update in-place via get_mut
245        if let Some(mut entry) = self.data.get_mut(key) {
246            if entry.is_expired() {
247                let key_len = entry.key().len();
248                let old_size = entry.size(key_len);
249                drop(entry);
250                if self.data.remove(key).is_some() {
251                    self.memory_used.fetch_sub(old_size, Ordering::Relaxed);
252                }
253                // treat as missing — fall through to insert below
254            } else {
255                let s = std::str::from_utf8(&entry.value)
256                    .map_err(|_| ConcurrentOpError::NotAnInteger)?;
257                let current: i64 = s.parse().map_err(|_| ConcurrentOpError::NotAnInteger)?;
258                let new_val = current
259                    .checked_add(delta)
260                    .ok_or(ConcurrentOpError::Overflow)?;
261                let new_bytes = Bytes::from(new_val.to_string());
262
263                let key_len = entry.key().len();
264                let old_size = entry.size(key_len);
265                entry.value = new_bytes;
266                self.adjust_memory(old_size, entry.size(key_len));
267                return Ok(new_val);
268            }
269        }
270
271        // key doesn't exist — treat as 0
272        let new_val = (0i64)
273            .checked_add(delta)
274            .ok_or(ConcurrentOpError::Overflow)?;
275        self.set(key.to_owned(), Bytes::from(new_val.to_string()), None);
276        Ok(new_val)
277    }
278
279    /// Adds `delta` to the float value of a key, creating it if missing.
280    /// Preserves existing TTL when updating.
281    pub fn incr_by_float(&self, key: &str, delta: f64) -> Result<f64, ConcurrentFloatError> {
282        self.ops_count.fetch_add(1, Ordering::Relaxed);
283
284        if let Some(mut entry) = self.data.get_mut(key) {
285            if entry.is_expired() {
286                let key_len = entry.key().len();
287                let old_size = entry.size(key_len);
288                drop(entry);
289                if self.data.remove(key).is_some() {
290                    self.memory_used.fetch_sub(old_size, Ordering::Relaxed);
291                }
292            } else {
293                let s = std::str::from_utf8(&entry.value)
294                    .map_err(|_| ConcurrentFloatError::NotAFloat)?;
295                let current: f64 = s.parse().map_err(|_| ConcurrentFloatError::NotAFloat)?;
296                let new_val = current + delta;
297                if new_val.is_nan() || new_val.is_infinite() {
298                    return Err(ConcurrentFloatError::NanOrInfinity);
299                }
300                let new_bytes = Bytes::from(format_float(new_val));
301
302                let key_len = entry.key().len();
303                let old_size = entry.size(key_len);
304                entry.value = new_bytes;
305                self.adjust_memory(old_size, entry.size(key_len));
306                return Ok(new_val);
307            }
308        }
309
310        // key doesn't exist — treat as 0.0
311        let new_val = delta;
312        if new_val.is_nan() || new_val.is_infinite() {
313            return Err(ConcurrentFloatError::NanOrInfinity);
314        }
315        self.set(key.to_owned(), Bytes::from(format_float(new_val)), None);
316        Ok(new_val)
317    }
318
319    /// Appends a value to an existing string key, or creates a new key.
320    /// Returns the new string length.
321    pub fn append(&self, key: &str, suffix: &[u8]) -> usize {
322        self.ops_count.fetch_add(1, Ordering::Relaxed);
323
324        if let Some(mut entry) = self.data.get_mut(key) {
325            if !entry.is_expired() {
326                let mut new_data = Vec::with_capacity(entry.value.len() + suffix.len());
327                new_data.extend_from_slice(&entry.value);
328                new_data.extend_from_slice(suffix);
329                let new_len = new_data.len();
330
331                let key_len = entry.key().len();
332                let old_size = entry.size(key_len);
333                entry.value = Bytes::from(new_data);
334                self.adjust_memory(old_size, entry.size(key_len));
335                return new_len;
336            }
337            // expired — remove and fall through to create
338            let key_len = entry.key().len();
339            let old_size = entry.size(key_len);
340            drop(entry);
341            if self.data.remove(key).is_some() {
342                self.memory_used.fetch_sub(old_size, Ordering::Relaxed);
343            }
344        }
345
346        // key doesn't exist — create with just the suffix
347        let new_len = suffix.len();
348        self.set(key.to_owned(), Bytes::copy_from_slice(suffix), None);
349        new_len
350    }
351
352    /// Returns the length of the string value stored at key.
353    /// Returns 0 if the key doesn't exist.
354    pub fn strlen(&self, key: &str) -> usize {
355        match self.get(key) {
356            Some(data) => data.len(),
357            None => 0,
358        }
359    }
360
361    /// Removes the expiration from a key.
362    /// Returns true if the timeout was successfully removed.
363    pub fn persist(&self, key: &str) -> bool {
364        self.ops_count.fetch_add(1, Ordering::Relaxed);
365
366        if let Some(mut entry) = self.data.get_mut(key) {
367            if entry.is_expired() {
368                return false;
369            }
370            if entry.expires_at_ms != 0 {
371                entry.expires_at_ms = 0;
372                true
373            } else {
374                false
375            }
376        } else {
377            false
378        }
379    }
380
381    /// Sets expiration in milliseconds on an existing key.
382    pub fn pexpire(&self, key: &str, millis: u64) -> bool {
383        self.ops_count.fetch_add(1, Ordering::Relaxed);
384
385        if let Some(mut entry) = self.data.get_mut(key) {
386            if entry.is_expired() {
387                return false;
388            }
389            entry.expires_at_ms = time::now_ms().saturating_add(millis);
390            true
391        } else {
392            false
393        }
394    }
395
396    /// Returns the remaining TTL in milliseconds.
397    pub fn pttl(&self, key: &str) -> TtlResult {
398        match self.data.get(key) {
399            None => TtlResult::NotFound,
400            Some(entry) => {
401                if entry.is_expired() {
402                    TtlResult::NotFound
403                } else {
404                    match time::remaining_ms(entry.expires_at_ms) {
405                        None => TtlResult::NoExpiry,
406                        Some(ms) => TtlResult::Milliseconds(ms),
407                    }
408                }
409            }
410        }
411    }
412
413    /// Returns all keys matching a glob pattern.
414    pub fn keys(&self, pattern: &str) -> Vec<String> {
415        let len = self.data.len();
416        if len > 10_000 {
417            tracing::warn!(
418                key_count = len,
419                "KEYS on large keyspace, consider SCAN instead"
420            );
421        }
422        self.data
423            .iter()
424            .filter(|entry| !entry.value().is_expired())
425            .filter(|entry| crate::keyspace::glob_match(pattern, entry.key()))
426            .map(|entry| entry.key().to_string())
427            .collect()
428    }
429
430    /// Iterates keys using a cursor. Returns (next_cursor, keys).
431    /// A next_cursor of 0 means the iteration is complete.
432    pub fn scan_keys(
433        &self,
434        cursor: u64,
435        count: usize,
436        pattern: Option<&str>,
437    ) -> (u64, Vec<String>) {
438        let target_count = if count == 0 { 10 } else { count };
439        let mut keys = Vec::with_capacity(target_count);
440        let mut position = 0u64;
441
442        for entry in self.data.iter() {
443            if entry.value().is_expired() {
444                continue;
445            }
446            if position < cursor {
447                position += 1;
448                continue;
449            }
450            if let Some(pat) = pattern {
451                if !crate::keyspace::glob_match(pat, entry.key()) {
452                    position += 1;
453                    continue;
454                }
455            }
456            keys.push(entry.key().to_string());
457            position += 1;
458            if keys.len() >= target_count {
459                // there may be more keys — return position as next cursor
460                return (position, keys);
461            }
462        }
463
464        // iteration complete
465        (0, keys)
466    }
467
468    /// Renames a key. Returns true if the source key existed.
469    pub fn rename(&self, key: &str, newkey: &str) -> Result<(), &'static str> {
470        self.ops_count.fetch_add(1, Ordering::Relaxed);
471
472        let (_, entry) = self.data.remove(key).ok_or("ERR no such key")?;
473
474        if entry.is_expired() {
475            let size = entry.size(key.len());
476            self.memory_used.fetch_sub(size, Ordering::Relaxed);
477            return Err("ERR no such key");
478        }
479
480        // remove destination if it exists
481        if let Some((k, old_dest)) = self.data.remove(newkey) {
482            self.memory_used
483                .fetch_sub(old_dest.size(k.len()), Ordering::Relaxed);
484        }
485
486        // adjust memory: old key removed, new key added
487        let old_key_len = key.len();
488        let new_key_len = newkey.len();
489        let old_mem = old_key_len + entry.value.len() + 48;
490        let new_mem = new_key_len + entry.value.len() + 48;
491
492        self.memory_used.fetch_sub(old_mem, Ordering::Relaxed);
493        self.memory_used.fetch_add(new_mem, Ordering::Relaxed);
494
495        self.data.insert(newkey.into(), entry);
496        Ok(())
497    }
498
499    /// Returns the number of keys.
500    pub fn len(&self) -> usize {
501        self.data.len()
502    }
503
504    /// Returns true if the keyspace is empty.
505    pub fn is_empty(&self) -> bool {
506        self.data.is_empty()
507    }
508
509    /// Returns memory usage in bytes.
510    pub fn memory_used(&self) -> usize {
511        self.memory_used.load(Ordering::Relaxed)
512    }
513
514    /// Returns the operation count.
515    pub fn ops_count(&self) -> u64 {
516        self.ops_count.load(Ordering::Relaxed)
517    }
518
519    /// Clears all keys.
520    pub fn clear(&self) {
521        self.data.clear();
522        self.memory_used.store(0, Ordering::Relaxed);
523    }
524
525    /// Adjusts `memory_used` after an in-place value replacement.
526    ///
527    /// Computes the signed difference between old and new sizes and applies it
528    /// atomically. Called wherever a key's value changes without removing the key.
529    #[inline]
530    fn adjust_memory(&self, old_size: usize, new_size: usize) {
531        let diff = new_size as isize - old_size as isize;
532        if diff > 0 {
533            self.memory_used.fetch_add(diff as usize, Ordering::Relaxed);
534        } else if diff < 0 {
535            self.memory_used
536                .fetch_sub((-diff) as usize, Ordering::Relaxed);
537        }
538    }
539
540    /// Simple eviction: remove approximately `needed` bytes worth of entries.
541    fn evict_entries(&self, needed: usize) {
542        // Use retain to remove entries in a single pass without collecting keys.
543        // The closure returns false for entries we want to evict, updating the
544        // memory counter as we go. Once we've freed enough bytes we stop evicting.
545        let freed = std::sync::atomic::AtomicUsize::new(0);
546        self.data.retain(|k, v| {
547            if freed.load(Ordering::Relaxed) >= needed {
548                return true; // enough freed — keep the rest
549            }
550            let entry_bytes = v.size(k.len());
551            freed.fetch_add(entry_bytes, Ordering::Relaxed);
552            self.memory_used.fetch_sub(entry_bytes, Ordering::Relaxed);
553            false // evict this entry
554        });
555    }
556}
557
558impl Default for ConcurrentKeyspace {
559    fn default() -> Self {
560        Self::new(None, EvictionPolicy::NoEviction)
561    }
562}
563
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a `Bytes` value from a string slice.
    fn bytes(text: &str) -> Bytes {
        Bytes::copy_from_slice(text.as_bytes())
    }

    /// Builds an unbounded keyspace pre-populated with `pairs` (no TTL).
    fn keyspace_with(pairs: &[(&str, &str)]) -> ConcurrentKeyspace {
        let ks = ConcurrentKeyspace::default();
        for &(key, value) in pairs {
            assert!(ks.set(key.to_owned(), bytes(value), None));
        }
        ks
    }

    #[test]
    fn set_and_get() {
        let ks = ConcurrentKeyspace::default();
        assert!(ks.set("key".into(), bytes("value"), None));
        assert_eq!(ks.get("key"), Some(bytes("value")));
    }

    #[test]
    fn get_missing() {
        let ks = ConcurrentKeyspace::default();
        assert_eq!(ks.get("missing"), None);
    }

    #[test]
    fn del_existing() {
        let ks = keyspace_with(&[("key", "value")]);
        assert!(ks.del("key"));
        assert_eq!(ks.get("key"), None);
    }

    #[test]
    fn del_missing() {
        let ks = ConcurrentKeyspace::default();
        assert!(!ks.del("missing"));
    }

    #[test]
    fn del_releases_memory() {
        let ks = ConcurrentKeyspace::default();
        assert_eq!(ks.memory_used(), 0);
        ks.set("key".into(), bytes("value"), None);
        assert!(ks.memory_used() > 0);
        assert!(ks.del("key"));
        assert_eq!(ks.memory_used(), 0);
    }

    #[test]
    fn exists_check() {
        let ks = keyspace_with(&[("key", "value")]);
        assert!(ks.exists("key"));
        assert!(!ks.exists("missing"));
    }

    #[test]
    fn ttl_expires() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), bytes("value"), Some(Duration::from_millis(10)));
        assert!(matches!(ks.ttl("key"), TtlResult::Seconds(_)));
        std::thread::sleep(Duration::from_millis(20));
        assert_eq!(ks.get("key"), None);
    }

    #[test]
    fn incr_new_key() {
        let ks = ConcurrentKeyspace::default();
        assert_eq!(ks.incr("counter").unwrap(), 1);
        assert_eq!(ks.get("counter"), Some(bytes("1")));
    }

    #[test]
    fn incr_existing_key() {
        let ks = keyspace_with(&[("counter", "10")]);
        assert_eq!(ks.incr("counter").unwrap(), 11);
    }

    #[test]
    fn decr_below_zero() {
        let ks = ConcurrentKeyspace::default();
        assert_eq!(ks.decr("counter").unwrap(), -1);
        assert_eq!(ks.decr("counter").unwrap(), -2);
    }

    #[test]
    fn incr_by_delta() {
        let ks = ConcurrentKeyspace::default();
        assert_eq!(ks.incr_by("counter", 5).unwrap(), 5);
        assert_eq!(ks.incr_by("counter", -3).unwrap(), 2);
    }

    #[test]
    fn incr_non_integer_value() {
        let ks = keyspace_with(&[("key", "not_a_number")]);
        assert_eq!(ks.incr("key"), Err(ConcurrentOpError::NotAnInteger));
    }

    #[test]
    fn incr_overflow() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), Bytes::from(i64::MAX.to_string()), None);
        assert_eq!(ks.incr("key"), Err(ConcurrentOpError::Overflow));
    }

    #[test]
    fn incr_by_float_new_key() {
        let ks = ConcurrentKeyspace::default();
        let val = ks.incr_by_float("key", 2.5).unwrap();
        assert!((val - 2.5).abs() < f64::EPSILON);
    }

    #[test]
    fn incr_by_float_existing() {
        let ks = keyspace_with(&[("key", "10.5")]);
        let val = ks.incr_by_float("key", 1.5).unwrap();
        assert!((val - 12.0).abs() < f64::EPSILON);
    }

    #[test]
    fn incr_by_float_not_a_float() {
        let ks = keyspace_with(&[("key", "hello")]);
        assert_eq!(
            ks.incr_by_float("key", 1.0),
            Err(ConcurrentFloatError::NotAFloat)
        );
    }

    #[test]
    fn incr_by_float_infinity() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), Bytes::from(f64::MAX.to_string()), None);
        assert_eq!(
            ks.incr_by_float("key", f64::MAX),
            Err(ConcurrentFloatError::NanOrInfinity)
        );
    }

    #[test]
    fn append_new_key() {
        let ks = ConcurrentKeyspace::default();
        assert_eq!(ks.append("key", b"hello"), 5);
        assert_eq!(ks.get("key"), Some(bytes("hello")));
    }

    #[test]
    fn append_existing_key() {
        let ks = keyspace_with(&[("key", "hello")]);
        assert_eq!(ks.append("key", b" world"), 11);
        assert_eq!(ks.get("key"), Some(bytes("hello world")));
    }

    #[test]
    fn strlen_existing() {
        let ks = keyspace_with(&[("key", "hello")]);
        assert_eq!(ks.strlen("key"), 5);
    }

    #[test]
    fn strlen_missing() {
        let ks = ConcurrentKeyspace::default();
        assert_eq!(ks.strlen("missing"), 0);
    }

    #[test]
    fn persist_removes_ttl() {
        let ks = ConcurrentKeyspace::default();
        ks.set("key".into(), bytes("val"), Some(Duration::from_secs(60)));
        assert!(ks.persist("key"));
        assert!(matches!(ks.ttl("key"), TtlResult::NoExpiry));
    }

    #[test]
    fn persist_no_ttl() {
        let ks = keyspace_with(&[("key", "val")]);
        assert!(!ks.persist("key")); // no TTL to remove
    }

    #[test]
    fn persist_missing_key() {
        let ks = ConcurrentKeyspace::default();
        assert!(!ks.persist("missing"));
    }

    #[test]
    fn pexpire_and_pttl() {
        let ks = keyspace_with(&[("key", "val")]);
        assert!(ks.pexpire("key", 5000));
        match ks.pttl("key") {
            TtlResult::Milliseconds(ms) => assert!(ms > 0 && ms <= 5000),
            other => panic!("expected Milliseconds, got {other:?}"),
        }
    }

    #[test]
    fn pttl_no_expiry() {
        let ks = keyspace_with(&[("key", "val")]);
        assert!(matches!(ks.pttl("key"), TtlResult::NoExpiry));
    }

    #[test]
    fn pttl_missing() {
        let ks = ConcurrentKeyspace::default();
        assert!(matches!(ks.pttl("missing"), TtlResult::NotFound));
    }

    #[test]
    fn keys_match_pattern() {
        let ks = keyspace_with(&[("user:1", "a"), ("user:2", "b"), ("item:1", "c")]);
        let mut result = ks.keys("user:*");
        result.sort();
        assert_eq!(result, vec!["user:1", "user:2"]);
    }

    #[test]
    fn keys_match_all() {
        let ks = keyspace_with(&[("a", "1"), ("b", "2")]);
        assert_eq!(ks.keys("*").len(), 2);
    }

    #[test]
    fn scan_basic() {
        let ks = keyspace_with(&[("a", "1"), ("b", "2"), ("c", "3")]);
        let (cursor, keys) = ks.scan_keys(0, 10, None);
        assert_eq!(cursor, 0); // complete in one pass
        assert_eq!(keys.len(), 3);
    }

    #[test]
    fn scan_with_pattern() {
        let ks = keyspace_with(&[("user:1", "a"), ("user:2", "b"), ("item:1", "c")]);
        let (_, keys) = ks.scan_keys(0, 10, Some("user:*"));
        assert_eq!(keys.len(), 2);
    }

    #[test]
    fn scan_with_count() {
        let ks = ConcurrentKeyspace::default();
        for i in 0..10 {
            ks.set(format!("k{i}"), bytes("v"), None);
        }

        // Page through the whole keyspace. 10 keys in pages of 3 need 4 calls;
        // the loop bound only guards against a cursor that never reaches 0.
        let mut seen = Vec::new();
        let mut cursor = 0;
        for _ in 0..10 {
            let (next, page) = ks.scan_keys(cursor, 3, None);
            assert!(page.len() <= 3);
            seen.extend(page);
            cursor = next;
            if cursor == 0 {
                break;
            }
        }

        assert_eq!(cursor, 0);
        assert_eq!(seen.len(), 10);
        seen.sort();
        seen.dedup();
        assert_eq!(seen.len(), 10); // every key returned exactly once
    }

    #[test]
    fn rename_basic() {
        let ks = keyspace_with(&[("old", "value")]);
        ks.rename("old", "new").unwrap();
        assert_eq!(ks.get("old"), None);
        assert_eq!(ks.get("new"), Some(bytes("value")));
    }

    #[test]
    fn rename_missing_key() {
        let ks = ConcurrentKeyspace::default();
        assert!(ks.rename("missing", "new").is_err());
    }

    #[test]
    fn rename_overwrites_destination() {
        let ks = keyspace_with(&[("src", "new_val"), ("dst", "old_val")]);
        ks.rename("src", "dst").unwrap();
        assert_eq!(ks.get("src"), None);
        assert_eq!(ks.get("dst"), Some(bytes("new_val")));
    }

    #[test]
    fn rename_keeps_memory_accounting() {
        // After the rename, usage must equal that of a keyspace that only ever
        // held the destination key with the moved value.
        let expected = keyspace_with(&[("destination", "new_val")]);

        let ks = keyspace_with(&[("src", "new_val"), ("destination", "old_value")]);
        ks.rename("src", "destination").unwrap();

        assert_eq!(ks.len(), 1);
        assert_eq!(ks.memory_used(), expected.memory_used());
    }

    #[test]
    fn concurrent_access() {
        use std::sync::Arc;
        use std::thread;

        let ks = Arc::new(ConcurrentKeyspace::default());

        // Spawn multiple threads doing concurrent sets on disjoint keys
        let workers: Vec<_> = (0..8)
            .map(|i| {
                let ks = Arc::clone(&ks);
                thread::spawn(move || {
                    for j in 0..1000 {
                        ks.set(format!("key-{}-{}", i, j), bytes("value"), None);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(ks.len(), 8000);
    }
}