1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use std::time::Duration;
8
9use bytes::Bytes;
10use dashmap::DashMap;
11
12use crate::keyspace::{format_float, EvictionPolicy, TtlResult};
13use crate::memory;
14use crate::time;
15
16#[derive(Debug, Clone, PartialEq, Eq)]
18pub enum ConcurrentOpError {
19 NotAnInteger,
21 Overflow,
23}
24
25impl std::fmt::Display for ConcurrentOpError {
26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27 match self {
28 Self::NotAnInteger => write!(f, "ERR value is not an integer or out of range"),
29 Self::Overflow => write!(f, "ERR increment or decrement would overflow"),
30 }
31 }
32}
33
34impl std::error::Error for ConcurrentOpError {}
35
36#[derive(Debug, Clone, PartialEq)]
38pub enum ConcurrentFloatError {
39 NotAFloat,
41 NanOrInfinity,
43}
44
45impl std::fmt::Display for ConcurrentFloatError {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 match self {
48 Self::NotAFloat => write!(f, "ERR value is not a valid float"),
49 Self::NanOrInfinity => write!(f, "ERR increment would produce NaN or Infinity"),
50 }
51 }
52}
53
54impl std::error::Error for ConcurrentFloatError {}
55
56#[derive(Debug, Clone)]
59struct Entry {
60 value: Bytes,
61 expires_at_ms: u64,
63}
64
65impl Entry {
66 #[inline]
67 fn is_expired(&self) -> bool {
68 time::is_expired(self.expires_at_ms)
69 }
70
71 #[inline]
73 fn size(&self, key_len: usize) -> usize {
74 key_len + self.value.len() + 48
76 }
77}
78
79#[derive(Debug)]
84pub struct ConcurrentKeyspace {
85 data: DashMap<Box<str>, Entry>,
87 memory_used: AtomicUsize,
88 max_memory: Option<usize>,
89 eviction_policy: EvictionPolicy,
90 ops_count: AtomicU64,
91}
92
93impl ConcurrentKeyspace {
94 pub fn new(max_memory: Option<usize>, eviction_policy: EvictionPolicy) -> Self {
96 Self {
97 data: DashMap::new(),
98 memory_used: AtomicUsize::new(0),
99 max_memory,
100 eviction_policy,
101 ops_count: AtomicU64::new(0),
102 }
103 }
104
105 pub fn get(&self, key: &str) -> Option<Bytes> {
107 self.ops_count.fetch_add(1, Ordering::Relaxed);
108
109 let entry = self.data.get(key)?;
110
111 if entry.is_expired() {
112 let key_len = entry.key().len();
113 let size = entry.size(key_len);
114 drop(entry);
115 if self.data.remove(key).is_some() {
117 self.memory_used.fetch_sub(size, Ordering::Relaxed);
118 }
119 return None;
120 }
121
122 Some(entry.value.clone())
123 }
124
125 pub fn set(&self, key: String, value: Bytes, ttl: Option<Duration>) -> bool {
127 self.ops_count.fetch_add(1, Ordering::Relaxed);
128
129 let key: Box<str> = key.into_boxed_str();
130 let entry_size = key.len() + value.len() + 48;
131 let expires_at_ms = time::expiry_from_duration(ttl);
132
133 if let Some(max) = self.max_memory {
135 let limit = memory::effective_limit(max);
136 let current = self.memory_used.load(Ordering::Relaxed);
137 if current + entry_size > limit {
138 if self.eviction_policy == EvictionPolicy::NoEviction {
139 return false;
140 }
141 self.evict_entries(entry_size);
143 }
144 }
145
146 let entry = Entry {
147 value,
148 expires_at_ms,
149 };
150
151 if let Some(old) = self.data.insert(key.clone(), entry) {
153 let old_size = old.size(key.len());
155 let diff = entry_size as isize - old_size as isize;
156 if diff > 0 {
157 self.memory_used.fetch_add(diff as usize, Ordering::Relaxed);
158 } else {
159 self.memory_used
160 .fetch_sub((-diff) as usize, Ordering::Relaxed);
161 }
162 } else {
163 self.memory_used.fetch_add(entry_size, Ordering::Relaxed);
164 }
165
166 true
167 }
168
169 pub fn del(&self, key: &str) -> bool {
171 self.ops_count.fetch_add(1, Ordering::Relaxed);
172
173 if let Some((k, removed)) = self.data.remove(key) {
174 self.memory_used
175 .fetch_sub(removed.size(k.len()), Ordering::Relaxed);
176 true
177 } else {
178 false
179 }
180 }
181
182 pub fn exists(&self, key: &str) -> bool {
184 self.get(key).is_some()
185 }
186
187 pub fn ttl(&self, key: &str) -> TtlResult {
189 match self.data.get(key) {
190 None => TtlResult::NotFound,
191 Some(entry) => {
192 if entry.is_expired() {
193 TtlResult::NotFound
194 } else {
195 match time::remaining_secs(entry.expires_at_ms) {
196 None => TtlResult::NoExpiry,
197 Some(secs) => TtlResult::Seconds(secs),
198 }
199 }
200 }
201 }
202 }
203
204 pub fn expire(&self, key: &str, seconds: u64) -> bool {
206 self.ops_count.fetch_add(1, Ordering::Relaxed);
207
208 if let Some(mut entry) = self.data.get_mut(key) {
209 if entry.is_expired() {
210 return false;
211 }
212 entry.expires_at_ms = time::now_ms().saturating_add(seconds.saturating_mul(1000));
213 true
214 } else {
215 false
216 }
217 }
218
219 pub fn incr(&self, key: &str) -> Result<i64, ConcurrentOpError> {
222 self.incr_by(key, 1)
223 }
224
225 pub fn decr(&self, key: &str) -> Result<i64, ConcurrentOpError> {
228 self.incr_by(key, -1)
229 }
230
231 pub fn incr_by(&self, key: &str, delta: i64) -> Result<i64, ConcurrentOpError> {
234 self.ops_count.fetch_add(1, Ordering::Relaxed);
235
236 if let Some(mut entry) = self.data.get_mut(key) {
238 if entry.is_expired() {
239 let key_len = entry.key().len();
240 let old_size = entry.size(key_len);
241 drop(entry);
242 if self.data.remove(key).is_some() {
243 self.memory_used.fetch_sub(old_size, Ordering::Relaxed);
244 }
245 } else {
247 let s = std::str::from_utf8(&entry.value)
248 .map_err(|_| ConcurrentOpError::NotAnInteger)?;
249 let current: i64 = s.parse().map_err(|_| ConcurrentOpError::NotAnInteger)?;
250 let new_val = current
251 .checked_add(delta)
252 .ok_or(ConcurrentOpError::Overflow)?;
253 let new_bytes = Bytes::from(new_val.to_string());
254
255 let key_len = entry.key().len();
256 let old_size = entry.size(key_len);
257 entry.value = new_bytes;
258 let new_size = entry.size(key_len);
259 let diff = new_size as isize - old_size as isize;
260 if diff > 0 {
261 self.memory_used.fetch_add(diff as usize, Ordering::Relaxed);
262 } else if diff < 0 {
263 self.memory_used
264 .fetch_sub((-diff) as usize, Ordering::Relaxed);
265 }
266 return Ok(new_val);
267 }
268 }
269
270 let new_val = (0i64)
272 .checked_add(delta)
273 .ok_or(ConcurrentOpError::Overflow)?;
274 self.set(key.to_owned(), Bytes::from(new_val.to_string()), None);
275 Ok(new_val)
276 }
277
278 pub fn incr_by_float(&self, key: &str, delta: f64) -> Result<f64, ConcurrentFloatError> {
281 self.ops_count.fetch_add(1, Ordering::Relaxed);
282
283 if let Some(mut entry) = self.data.get_mut(key) {
284 if entry.is_expired() {
285 let key_len = entry.key().len();
286 let old_size = entry.size(key_len);
287 drop(entry);
288 if self.data.remove(key).is_some() {
289 self.memory_used.fetch_sub(old_size, Ordering::Relaxed);
290 }
291 } else {
292 let s = std::str::from_utf8(&entry.value)
293 .map_err(|_| ConcurrentFloatError::NotAFloat)?;
294 let current: f64 = s.parse().map_err(|_| ConcurrentFloatError::NotAFloat)?;
295 let new_val = current + delta;
296 if new_val.is_nan() || new_val.is_infinite() {
297 return Err(ConcurrentFloatError::NanOrInfinity);
298 }
299 let new_bytes = Bytes::from(format_float(new_val));
300
301 let key_len = entry.key().len();
302 let old_size = entry.size(key_len);
303 entry.value = new_bytes;
304 let new_size = entry.size(key_len);
305 let diff = new_size as isize - old_size as isize;
306 if diff > 0 {
307 self.memory_used.fetch_add(diff as usize, Ordering::Relaxed);
308 } else if diff < 0 {
309 self.memory_used
310 .fetch_sub((-diff) as usize, Ordering::Relaxed);
311 }
312 return Ok(new_val);
313 }
314 }
315
316 let new_val = delta;
318 if new_val.is_nan() || new_val.is_infinite() {
319 return Err(ConcurrentFloatError::NanOrInfinity);
320 }
321 self.set(key.to_owned(), Bytes::from(format_float(new_val)), None);
322 Ok(new_val)
323 }
324
325 pub fn append(&self, key: &str, suffix: &[u8]) -> usize {
328 self.ops_count.fetch_add(1, Ordering::Relaxed);
329
330 if let Some(mut entry) = self.data.get_mut(key) {
331 if !entry.is_expired() {
332 let mut new_data = Vec::with_capacity(entry.value.len() + suffix.len());
333 new_data.extend_from_slice(&entry.value);
334 new_data.extend_from_slice(suffix);
335 let new_len = new_data.len();
336
337 let key_len = entry.key().len();
338 let old_size = entry.size(key_len);
339 entry.value = Bytes::from(new_data);
340 let new_size = entry.size(key_len);
341 let diff = new_size as isize - old_size as isize;
342 if diff > 0 {
343 self.memory_used.fetch_add(diff as usize, Ordering::Relaxed);
344 } else if diff < 0 {
345 self.memory_used
346 .fetch_sub((-diff) as usize, Ordering::Relaxed);
347 }
348 return new_len;
349 }
350 let key_len = entry.key().len();
352 let old_size = entry.size(key_len);
353 drop(entry);
354 if self.data.remove(key).is_some() {
355 self.memory_used.fetch_sub(old_size, Ordering::Relaxed);
356 }
357 }
358
359 let new_len = suffix.len();
361 self.set(key.to_owned(), Bytes::copy_from_slice(suffix), None);
362 new_len
363 }
364
365 pub fn strlen(&self, key: &str) -> usize {
368 match self.get(key) {
369 Some(data) => data.len(),
370 None => 0,
371 }
372 }
373
374 pub fn persist(&self, key: &str) -> bool {
377 self.ops_count.fetch_add(1, Ordering::Relaxed);
378
379 if let Some(mut entry) = self.data.get_mut(key) {
380 if entry.is_expired() {
381 return false;
382 }
383 if entry.expires_at_ms != 0 {
384 entry.expires_at_ms = 0;
385 true
386 } else {
387 false
388 }
389 } else {
390 false
391 }
392 }
393
394 pub fn pexpire(&self, key: &str, millis: u64) -> bool {
396 self.ops_count.fetch_add(1, Ordering::Relaxed);
397
398 if let Some(mut entry) = self.data.get_mut(key) {
399 if entry.is_expired() {
400 return false;
401 }
402 entry.expires_at_ms = time::now_ms().saturating_add(millis);
403 true
404 } else {
405 false
406 }
407 }
408
409 pub fn pttl(&self, key: &str) -> TtlResult {
411 match self.data.get(key) {
412 None => TtlResult::NotFound,
413 Some(entry) => {
414 if entry.is_expired() {
415 TtlResult::NotFound
416 } else {
417 match time::remaining_ms(entry.expires_at_ms) {
418 None => TtlResult::NoExpiry,
419 Some(ms) => TtlResult::Milliseconds(ms),
420 }
421 }
422 }
423 }
424 }
425
426 pub fn keys(&self, pattern: &str) -> Vec<String> {
428 let len = self.data.len();
429 if len > 10_000 {
430 tracing::warn!(
431 key_count = len,
432 "KEYS on large keyspace, consider SCAN instead"
433 );
434 }
435 self.data
436 .iter()
437 .filter(|entry| !entry.value().is_expired())
438 .filter(|entry| crate::keyspace::glob_match(pattern, entry.key()))
439 .map(|entry| entry.key().to_string())
440 .collect()
441 }
442
443 pub fn scan_keys(
446 &self,
447 cursor: u64,
448 count: usize,
449 pattern: Option<&str>,
450 ) -> (u64, Vec<String>) {
451 let target_count = if count == 0 { 10 } else { count };
452 let mut keys = Vec::with_capacity(target_count);
453 let mut position = 0u64;
454
455 for entry in self.data.iter() {
456 if entry.value().is_expired() {
457 continue;
458 }
459 if position < cursor {
460 position += 1;
461 continue;
462 }
463 if let Some(pat) = pattern {
464 if !crate::keyspace::glob_match(pat, entry.key()) {
465 position += 1;
466 continue;
467 }
468 }
469 keys.push(entry.key().to_string());
470 position += 1;
471 if keys.len() >= target_count {
472 return (position, keys);
474 }
475 }
476
477 (0, keys)
479 }
480
481 pub fn rename(&self, key: &str, newkey: &str) -> Result<(), &'static str> {
483 self.ops_count.fetch_add(1, Ordering::Relaxed);
484
485 let (_, entry) = self.data.remove(key).ok_or("ERR no such key")?;
486
487 if entry.is_expired() {
488 let size = entry.size(key.len());
489 self.memory_used.fetch_sub(size, Ordering::Relaxed);
490 return Err("ERR no such key");
491 }
492
493 if let Some((k, old_dest)) = self.data.remove(newkey) {
495 self.memory_used
496 .fetch_sub(old_dest.size(k.len()), Ordering::Relaxed);
497 }
498
499 let old_key_len = key.len();
501 let new_key_len = newkey.len();
502 let old_mem = old_key_len + entry.value.len() + 48;
503 let new_mem = new_key_len + entry.value.len() + 48;
504
505 self.memory_used.fetch_sub(old_mem, Ordering::Relaxed);
506 self.memory_used.fetch_add(new_mem, Ordering::Relaxed);
507
508 self.data.insert(newkey.into(), entry);
509 Ok(())
510 }
511
512 pub fn len(&self) -> usize {
514 self.data.len()
515 }
516
517 pub fn is_empty(&self) -> bool {
519 self.data.is_empty()
520 }
521
522 pub fn memory_used(&self) -> usize {
524 self.memory_used.load(Ordering::Relaxed)
525 }
526
527 pub fn ops_count(&self) -> u64 {
529 self.ops_count.load(Ordering::Relaxed)
530 }
531
532 pub fn clear(&self) {
534 self.data.clear();
535 self.memory_used.store(0, Ordering::Relaxed);
536 }
537
538 fn evict_entries(&self, needed: usize) {
540 let mut freed = 0usize;
541 let mut keys_to_remove = Vec::new();
542
543 for entry in self.data.iter() {
545 if freed >= needed {
546 break;
547 }
548 let key_len = entry.key().len();
549 keys_to_remove.push(entry.key().clone());
550 freed += entry.value().size(key_len);
551 }
552
553 for key in keys_to_remove {
555 if let Some((k, removed)) = self.data.remove(&key) {
556 self.memory_used
557 .fetch_sub(removed.size(k.len()), Ordering::Relaxed);
558 }
559 }
560 }
561}
562
563impl Default for ConcurrentKeyspace {
564 fn default() -> Self {
565 Self::new(None, EvictionPolicy::NoEviction)
566 }
567}
568
569#[cfg(test)]
570mod tests {
571 use super::*;
572
573 #[test]
574 fn set_and_get() {
575 let ks = ConcurrentKeyspace::default();
576 assert!(ks.set("key".into(), Bytes::from("value"), None));
577 assert_eq!(ks.get("key"), Some(Bytes::from("value")));
578 }
579
580 #[test]
581 fn get_missing() {
582 let ks = ConcurrentKeyspace::default();
583 assert_eq!(ks.get("missing"), None);
584 }
585
586 #[test]
587 fn del_existing() {
588 let ks = ConcurrentKeyspace::default();
589 ks.set("key".into(), Bytes::from("value"), None);
590 assert!(ks.del("key"));
591 assert_eq!(ks.get("key"), None);
592 }
593
594 #[test]
595 fn del_missing() {
596 let ks = ConcurrentKeyspace::default();
597 assert!(!ks.del("missing"));
598 }
599
600 #[test]
601 fn exists_check() {
602 let ks = ConcurrentKeyspace::default();
603 ks.set("key".into(), Bytes::from("value"), None);
604 assert!(ks.exists("key"));
605 assert!(!ks.exists("missing"));
606 }
607
608 #[test]
609 fn ttl_expires() {
610 let ks = ConcurrentKeyspace::default();
611 ks.set(
612 "key".into(),
613 Bytes::from("value"),
614 Some(Duration::from_millis(10)),
615 );
616 assert!(matches!(ks.ttl("key"), TtlResult::Seconds(_)));
617 std::thread::sleep(Duration::from_millis(20));
618 assert_eq!(ks.get("key"), None);
619 }
620
621 #[test]
622 fn incr_new_key() {
623 let ks = ConcurrentKeyspace::default();
624 assert_eq!(ks.incr("counter").unwrap(), 1);
625 assert_eq!(ks.get("counter"), Some(Bytes::from("1")));
626 }
627
628 #[test]
629 fn incr_existing_key() {
630 let ks = ConcurrentKeyspace::default();
631 ks.set("counter".into(), Bytes::from("10"), None);
632 assert_eq!(ks.incr("counter").unwrap(), 11);
633 }
634
635 #[test]
636 fn decr_below_zero() {
637 let ks = ConcurrentKeyspace::default();
638 assert_eq!(ks.decr("counter").unwrap(), -1);
639 assert_eq!(ks.decr("counter").unwrap(), -2);
640 }
641
642 #[test]
643 fn incr_by_delta() {
644 let ks = ConcurrentKeyspace::default();
645 assert_eq!(ks.incr_by("counter", 5).unwrap(), 5);
646 assert_eq!(ks.incr_by("counter", -3).unwrap(), 2);
647 }
648
649 #[test]
650 fn incr_non_integer_value() {
651 let ks = ConcurrentKeyspace::default();
652 ks.set("key".into(), Bytes::from("not_a_number"), None);
653 assert_eq!(ks.incr("key"), Err(ConcurrentOpError::NotAnInteger));
654 }
655
656 #[test]
657 fn incr_overflow() {
658 let ks = ConcurrentKeyspace::default();
659 ks.set("key".into(), Bytes::from(i64::MAX.to_string()), None);
660 assert_eq!(ks.incr("key"), Err(ConcurrentOpError::Overflow));
661 }
662
663 #[test]
664 fn incr_by_float_new_key() {
665 let ks = ConcurrentKeyspace::default();
666 let val = ks.incr_by_float("key", 2.5).unwrap();
667 assert!((val - 2.5).abs() < f64::EPSILON);
668 }
669
670 #[test]
671 fn incr_by_float_existing() {
672 let ks = ConcurrentKeyspace::default();
673 ks.set("key".into(), Bytes::from("10.5"), None);
674 let val = ks.incr_by_float("key", 1.5).unwrap();
675 assert!((val - 12.0).abs() < f64::EPSILON);
676 }
677
678 #[test]
679 fn incr_by_float_not_a_float() {
680 let ks = ConcurrentKeyspace::default();
681 ks.set("key".into(), Bytes::from("hello"), None);
682 assert_eq!(
683 ks.incr_by_float("key", 1.0),
684 Err(ConcurrentFloatError::NotAFloat)
685 );
686 }
687
688 #[test]
689 fn incr_by_float_infinity() {
690 let ks = ConcurrentKeyspace::default();
691 ks.set("key".into(), Bytes::from(f64::MAX.to_string()), None);
692 assert_eq!(
693 ks.incr_by_float("key", f64::MAX),
694 Err(ConcurrentFloatError::NanOrInfinity)
695 );
696 }
697
698 #[test]
699 fn append_new_key() {
700 let ks = ConcurrentKeyspace::default();
701 assert_eq!(ks.append("key", b"hello"), 5);
702 assert_eq!(ks.get("key"), Some(Bytes::from("hello")));
703 }
704
705 #[test]
706 fn append_existing_key() {
707 let ks = ConcurrentKeyspace::default();
708 ks.set("key".into(), Bytes::from("hello"), None);
709 assert_eq!(ks.append("key", b" world"), 11);
710 assert_eq!(ks.get("key"), Some(Bytes::from("hello world")));
711 }
712
713 #[test]
714 fn strlen_existing() {
715 let ks = ConcurrentKeyspace::default();
716 ks.set("key".into(), Bytes::from("hello"), None);
717 assert_eq!(ks.strlen("key"), 5);
718 }
719
720 #[test]
721 fn strlen_missing() {
722 let ks = ConcurrentKeyspace::default();
723 assert_eq!(ks.strlen("missing"), 0);
724 }
725
726 #[test]
727 fn persist_removes_ttl() {
728 let ks = ConcurrentKeyspace::default();
729 ks.set(
730 "key".into(),
731 Bytes::from("val"),
732 Some(Duration::from_secs(60)),
733 );
734 assert!(ks.persist("key"));
735 assert!(matches!(ks.ttl("key"), TtlResult::NoExpiry));
736 }
737
738 #[test]
739 fn persist_no_ttl() {
740 let ks = ConcurrentKeyspace::default();
741 ks.set("key".into(), Bytes::from("val"), None);
742 assert!(!ks.persist("key")); }
744
745 #[test]
746 fn persist_missing_key() {
747 let ks = ConcurrentKeyspace::default();
748 assert!(!ks.persist("missing"));
749 }
750
751 #[test]
752 fn pexpire_and_pttl() {
753 let ks = ConcurrentKeyspace::default();
754 ks.set("key".into(), Bytes::from("val"), None);
755 assert!(ks.pexpire("key", 5000));
756 match ks.pttl("key") {
757 TtlResult::Milliseconds(ms) => assert!(ms > 0 && ms <= 5000),
758 other => panic!("expected Milliseconds, got {other:?}"),
759 }
760 }
761
762 #[test]
763 fn pttl_no_expiry() {
764 let ks = ConcurrentKeyspace::default();
765 ks.set("key".into(), Bytes::from("val"), None);
766 assert!(matches!(ks.pttl("key"), TtlResult::NoExpiry));
767 }
768
769 #[test]
770 fn pttl_missing() {
771 let ks = ConcurrentKeyspace::default();
772 assert!(matches!(ks.pttl("missing"), TtlResult::NotFound));
773 }
774
775 #[test]
776 fn keys_match_pattern() {
777 let ks = ConcurrentKeyspace::default();
778 ks.set("user:1".into(), Bytes::from("a"), None);
779 ks.set("user:2".into(), Bytes::from("b"), None);
780 ks.set("item:1".into(), Bytes::from("c"), None);
781 let mut result = ks.keys("user:*");
782 result.sort();
783 assert_eq!(result, vec!["user:1", "user:2"]);
784 }
785
786 #[test]
787 fn keys_match_all() {
788 let ks = ConcurrentKeyspace::default();
789 ks.set("a".into(), Bytes::from("1"), None);
790 ks.set("b".into(), Bytes::from("2"), None);
791 let result = ks.keys("*");
792 assert_eq!(result.len(), 2);
793 }
794
795 #[test]
796 fn scan_basic() {
797 let ks = ConcurrentKeyspace::default();
798 ks.set("a".into(), Bytes::from("1"), None);
799 ks.set("b".into(), Bytes::from("2"), None);
800 ks.set("c".into(), Bytes::from("3"), None);
801 let (cursor, keys) = ks.scan_keys(0, 10, None);
802 assert_eq!(cursor, 0); assert_eq!(keys.len(), 3);
804 }
805
806 #[test]
807 fn scan_with_pattern() {
808 let ks = ConcurrentKeyspace::default();
809 ks.set("user:1".into(), Bytes::from("a"), None);
810 ks.set("user:2".into(), Bytes::from("b"), None);
811 ks.set("item:1".into(), Bytes::from("c"), None);
812 let (_, keys) = ks.scan_keys(0, 10, Some("user:*"));
813 assert_eq!(keys.len(), 2);
814 }
815
816 #[test]
817 fn scan_with_count() {
818 let ks = ConcurrentKeyspace::default();
819 for i in 0..10 {
820 ks.set(format!("k{i}"), Bytes::from("v"), None);
821 }
822 let (cursor, keys) = ks.scan_keys(0, 3, None);
823 assert!(keys.len() <= 3);
824 if cursor > 0 {
826 let (_, keys2) = ks.scan_keys(cursor, 3, None);
827 assert!(!keys2.is_empty());
828 }
829 }
830
831 #[test]
832 fn rename_basic() {
833 let ks = ConcurrentKeyspace::default();
834 ks.set("old".into(), Bytes::from("value"), None);
835 ks.rename("old", "new").unwrap();
836 assert_eq!(ks.get("old"), None);
837 assert_eq!(ks.get("new"), Some(Bytes::from("value")));
838 }
839
840 #[test]
841 fn rename_missing_key() {
842 let ks = ConcurrentKeyspace::default();
843 assert!(ks.rename("missing", "new").is_err());
844 }
845
846 #[test]
847 fn rename_overwrites_destination() {
848 let ks = ConcurrentKeyspace::default();
849 ks.set("src".into(), Bytes::from("new_val"), None);
850 ks.set("dst".into(), Bytes::from("old_val"), None);
851 ks.rename("src", "dst").unwrap();
852 assert_eq!(ks.get("src"), None);
853 assert_eq!(ks.get("dst"), Some(Bytes::from("new_val")));
854 }
855
856 #[test]
857 fn concurrent_access() {
858 use std::sync::Arc;
859 use std::thread;
860
861 let ks = Arc::new(ConcurrentKeyspace::default());
862 let mut handles = vec![];
863
864 for i in 0..8 {
866 let ks = Arc::clone(&ks);
867 handles.push(thread::spawn(move || {
868 for j in 0..1000 {
869 let key = format!("key-{}-{}", i, j);
870 ks.set(key, Bytes::from("value"), None);
871 }
872 }));
873 }
874
875 for h in handles {
876 h.join().unwrap();
877 }
878
879 assert_eq!(ks.len(), 8000);
880 }
881}