1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use std::time::Duration;
8
9use bytes::Bytes;
10use compact_str::CompactString;
11use dashmap::DashMap;
12
13use crate::keyspace::{format_float, EvictionPolicy, TtlResult};
14use crate::memory;
15use crate::time;
16
17#[derive(Debug, Clone, PartialEq, Eq)]
19pub enum ConcurrentOpError {
20 NotAnInteger,
22 Overflow,
24}
25
26impl std::fmt::Display for ConcurrentOpError {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 match self {
29 Self::NotAnInteger => write!(f, "ERR value is not an integer or out of range"),
30 Self::Overflow => write!(f, "ERR increment or decrement would overflow"),
31 }
32 }
33}
34
35impl std::error::Error for ConcurrentOpError {}
36
37#[derive(Debug, Clone, PartialEq)]
39pub enum ConcurrentFloatError {
40 NotAFloat,
42 NanOrInfinity,
44}
45
46impl std::fmt::Display for ConcurrentFloatError {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 match self {
49 Self::NotAFloat => write!(f, "ERR value is not a valid float"),
50 Self::NanOrInfinity => write!(f, "ERR increment would produce NaN or Infinity"),
51 }
52 }
53}
54
55impl std::error::Error for ConcurrentFloatError {}
56
57#[derive(Debug, Clone)]
60struct Entry {
61 value: Bytes,
62 expires_at_ms: u64,
64}
65
66impl Entry {
67 #[inline]
68 fn is_expired(&self) -> bool {
69 time::is_expired(self.expires_at_ms)
70 }
71
72 #[inline]
74 fn size(&self, key_len: usize) -> usize {
75 key_len + self.value.len() + 48
77 }
78}
79
80#[derive(Debug)]
85pub struct ConcurrentKeyspace {
86 data: DashMap<CompactString, Entry>,
88 memory_used: AtomicUsize,
89 max_memory: Option<usize>,
90 eviction_policy: EvictionPolicy,
91 ops_count: AtomicU64,
92}
93
94impl ConcurrentKeyspace {
95 pub fn new(max_memory: Option<usize>, eviction_policy: EvictionPolicy) -> Self {
97 Self {
98 data: DashMap::new(),
99 memory_used: AtomicUsize::new(0),
100 max_memory,
101 eviction_policy,
102 ops_count: AtomicU64::new(0),
103 }
104 }
105
106 pub fn get(&self, key: &str) -> Option<Bytes> {
108 self.ops_count.fetch_add(1, Ordering::Relaxed);
109
110 let entry = self.data.get(key)?;
111
112 if entry.is_expired() {
113 let key_len = entry.key().len();
114 let size = entry.size(key_len);
115 drop(entry);
116 if self.data.remove(key).is_some() {
118 self.memory_used.fetch_sub(size, Ordering::Relaxed);
119 }
120 return None;
121 }
122
123 Some(entry.value.clone())
124 }
125
126 pub fn set(&self, key: String, value: Bytes, ttl: Option<Duration>) -> bool {
128 self.ops_count.fetch_add(1, Ordering::Relaxed);
129
130 let key = CompactString::from(key);
131 let entry_size = key.len() + value.len() + 48;
132 let expires_at_ms = time::expiry_from_duration(ttl);
133
134 if let Some(max) = self.max_memory {
136 let limit = memory::effective_limit(max);
137 let current = self.memory_used.load(Ordering::Relaxed);
138 if current + entry_size > limit {
139 if self.eviction_policy == EvictionPolicy::NoEviction {
140 return false;
141 }
142 self.evict_entries(entry_size);
144 }
145 }
146
147 let entry = Entry {
148 value,
149 expires_at_ms,
150 };
151
152 if let Some(old) = self.data.insert(key.clone(), entry) {
154 self.adjust_memory(old.size(key.len()), entry_size);
156 } else {
157 self.memory_used.fetch_add(entry_size, Ordering::Relaxed);
158 }
159
160 true
161 }
162
163 pub fn del(&self, key: &str) -> bool {
165 self.ops_count.fetch_add(1, Ordering::Relaxed);
166
167 if let Some((k, removed)) = self.data.remove(key) {
168 self.memory_used
169 .fetch_sub(removed.size(k.len()), Ordering::Relaxed);
170 true
171 } else {
172 false
173 }
174 }
175
176 pub fn exists(&self, key: &str) -> bool {
178 self.get(key).is_some()
179 }
180
181 pub fn random_key(&self) -> Option<String> {
183 use rand::seq::IteratorRandom;
184 let mut rng = rand::rng();
185 for _ in 0..5 {
187 let entry = self.data.iter().choose(&mut rng)?;
188 if !entry.value().is_expired() {
189 return Some(entry.key().to_string());
190 }
191 }
192 None
193 }
194
195 pub fn ttl(&self, key: &str) -> TtlResult {
197 match self.data.get(key) {
198 None => TtlResult::NotFound,
199 Some(entry) => {
200 if entry.is_expired() {
201 TtlResult::NotFound
202 } else {
203 match time::remaining_secs(entry.expires_at_ms) {
204 None => TtlResult::NoExpiry,
205 Some(secs) => TtlResult::Seconds(secs),
206 }
207 }
208 }
209 }
210 }
211
212 pub fn expire(&self, key: &str, seconds: u64) -> bool {
214 self.ops_count.fetch_add(1, Ordering::Relaxed);
215
216 if let Some(mut entry) = self.data.get_mut(key) {
217 if entry.is_expired() {
218 return false;
219 }
220 entry.expires_at_ms = time::now_ms().saturating_add(seconds.saturating_mul(1000));
221 true
222 } else {
223 false
224 }
225 }
226
227 pub fn incr(&self, key: &str) -> Result<i64, ConcurrentOpError> {
230 self.incr_by(key, 1)
231 }
232
233 pub fn decr(&self, key: &str) -> Result<i64, ConcurrentOpError> {
236 self.incr_by(key, -1)
237 }
238
239 pub fn incr_by(&self, key: &str, delta: i64) -> Result<i64, ConcurrentOpError> {
242 self.ops_count.fetch_add(1, Ordering::Relaxed);
243
244 if let Some(mut entry) = self.data.get_mut(key) {
246 if entry.is_expired() {
247 let key_len = entry.key().len();
248 let old_size = entry.size(key_len);
249 drop(entry);
250 if self.data.remove(key).is_some() {
251 self.memory_used.fetch_sub(old_size, Ordering::Relaxed);
252 }
253 } else {
255 let s = std::str::from_utf8(&entry.value)
256 .map_err(|_| ConcurrentOpError::NotAnInteger)?;
257 let current: i64 = s.parse().map_err(|_| ConcurrentOpError::NotAnInteger)?;
258 let new_val = current
259 .checked_add(delta)
260 .ok_or(ConcurrentOpError::Overflow)?;
261 let new_bytes = Bytes::from(new_val.to_string());
262
263 let key_len = entry.key().len();
264 let old_size = entry.size(key_len);
265 entry.value = new_bytes;
266 self.adjust_memory(old_size, entry.size(key_len));
267 return Ok(new_val);
268 }
269 }
270
271 let new_val = (0i64)
273 .checked_add(delta)
274 .ok_or(ConcurrentOpError::Overflow)?;
275 self.set(key.to_owned(), Bytes::from(new_val.to_string()), None);
276 Ok(new_val)
277 }
278
279 pub fn incr_by_float(&self, key: &str, delta: f64) -> Result<f64, ConcurrentFloatError> {
282 self.ops_count.fetch_add(1, Ordering::Relaxed);
283
284 if let Some(mut entry) = self.data.get_mut(key) {
285 if entry.is_expired() {
286 let key_len = entry.key().len();
287 let old_size = entry.size(key_len);
288 drop(entry);
289 if self.data.remove(key).is_some() {
290 self.memory_used.fetch_sub(old_size, Ordering::Relaxed);
291 }
292 } else {
293 let s = std::str::from_utf8(&entry.value)
294 .map_err(|_| ConcurrentFloatError::NotAFloat)?;
295 let current: f64 = s.parse().map_err(|_| ConcurrentFloatError::NotAFloat)?;
296 let new_val = current + delta;
297 if new_val.is_nan() || new_val.is_infinite() {
298 return Err(ConcurrentFloatError::NanOrInfinity);
299 }
300 let new_bytes = Bytes::from(format_float(new_val));
301
302 let key_len = entry.key().len();
303 let old_size = entry.size(key_len);
304 entry.value = new_bytes;
305 self.adjust_memory(old_size, entry.size(key_len));
306 return Ok(new_val);
307 }
308 }
309
310 let new_val = delta;
312 if new_val.is_nan() || new_val.is_infinite() {
313 return Err(ConcurrentFloatError::NanOrInfinity);
314 }
315 self.set(key.to_owned(), Bytes::from(format_float(new_val)), None);
316 Ok(new_val)
317 }
318
319 pub fn append(&self, key: &str, suffix: &[u8]) -> usize {
322 self.ops_count.fetch_add(1, Ordering::Relaxed);
323
324 if let Some(mut entry) = self.data.get_mut(key) {
325 if !entry.is_expired() {
326 let mut new_data = Vec::with_capacity(entry.value.len() + suffix.len());
327 new_data.extend_from_slice(&entry.value);
328 new_data.extend_from_slice(suffix);
329 let new_len = new_data.len();
330
331 let key_len = entry.key().len();
332 let old_size = entry.size(key_len);
333 entry.value = Bytes::from(new_data);
334 self.adjust_memory(old_size, entry.size(key_len));
335 return new_len;
336 }
337 let key_len = entry.key().len();
339 let old_size = entry.size(key_len);
340 drop(entry);
341 if self.data.remove(key).is_some() {
342 self.memory_used.fetch_sub(old_size, Ordering::Relaxed);
343 }
344 }
345
346 let new_len = suffix.len();
348 self.set(key.to_owned(), Bytes::copy_from_slice(suffix), None);
349 new_len
350 }
351
352 pub fn strlen(&self, key: &str) -> usize {
355 match self.get(key) {
356 Some(data) => data.len(),
357 None => 0,
358 }
359 }
360
361 pub fn persist(&self, key: &str) -> bool {
364 self.ops_count.fetch_add(1, Ordering::Relaxed);
365
366 if let Some(mut entry) = self.data.get_mut(key) {
367 if entry.is_expired() {
368 return false;
369 }
370 if entry.expires_at_ms != 0 {
371 entry.expires_at_ms = 0;
372 true
373 } else {
374 false
375 }
376 } else {
377 false
378 }
379 }
380
381 pub fn pexpire(&self, key: &str, millis: u64) -> bool {
383 self.ops_count.fetch_add(1, Ordering::Relaxed);
384
385 if let Some(mut entry) = self.data.get_mut(key) {
386 if entry.is_expired() {
387 return false;
388 }
389 entry.expires_at_ms = time::now_ms().saturating_add(millis);
390 true
391 } else {
392 false
393 }
394 }
395
396 pub fn pttl(&self, key: &str) -> TtlResult {
398 match self.data.get(key) {
399 None => TtlResult::NotFound,
400 Some(entry) => {
401 if entry.is_expired() {
402 TtlResult::NotFound
403 } else {
404 match time::remaining_ms(entry.expires_at_ms) {
405 None => TtlResult::NoExpiry,
406 Some(ms) => TtlResult::Milliseconds(ms),
407 }
408 }
409 }
410 }
411 }
412
413 pub fn keys(&self, pattern: &str) -> Vec<String> {
415 let len = self.data.len();
416 if len > 10_000 {
417 tracing::warn!(
418 key_count = len,
419 "KEYS on large keyspace, consider SCAN instead"
420 );
421 }
422 self.data
423 .iter()
424 .filter(|entry| !entry.value().is_expired())
425 .filter(|entry| crate::keyspace::glob_match(pattern, entry.key()))
426 .map(|entry| entry.key().to_string())
427 .collect()
428 }
429
430 pub fn scan_keys(
433 &self,
434 cursor: u64,
435 count: usize,
436 pattern: Option<&str>,
437 ) -> (u64, Vec<String>) {
438 let target_count = if count == 0 { 10 } else { count };
439 let mut keys = Vec::with_capacity(target_count);
440 let mut position = 0u64;
441
442 for entry in self.data.iter() {
443 if entry.value().is_expired() {
444 continue;
445 }
446 if position < cursor {
447 position += 1;
448 continue;
449 }
450 if let Some(pat) = pattern {
451 if !crate::keyspace::glob_match(pat, entry.key()) {
452 position += 1;
453 continue;
454 }
455 }
456 keys.push(entry.key().to_string());
457 position += 1;
458 if keys.len() >= target_count {
459 return (position, keys);
461 }
462 }
463
464 (0, keys)
466 }
467
468 pub fn rename(&self, key: &str, newkey: &str) -> Result<(), &'static str> {
470 self.ops_count.fetch_add(1, Ordering::Relaxed);
471
472 let (_, entry) = self.data.remove(key).ok_or("ERR no such key")?;
473
474 if entry.is_expired() {
475 let size = entry.size(key.len());
476 self.memory_used.fetch_sub(size, Ordering::Relaxed);
477 return Err("ERR no such key");
478 }
479
480 if let Some((k, old_dest)) = self.data.remove(newkey) {
482 self.memory_used
483 .fetch_sub(old_dest.size(k.len()), Ordering::Relaxed);
484 }
485
486 let old_key_len = key.len();
488 let new_key_len = newkey.len();
489 let old_mem = old_key_len + entry.value.len() + 48;
490 let new_mem = new_key_len + entry.value.len() + 48;
491
492 self.memory_used.fetch_sub(old_mem, Ordering::Relaxed);
493 self.memory_used.fetch_add(new_mem, Ordering::Relaxed);
494
495 self.data.insert(newkey.into(), entry);
496 Ok(())
497 }
498
499 pub fn len(&self) -> usize {
501 self.data.len()
502 }
503
504 pub fn is_empty(&self) -> bool {
506 self.data.is_empty()
507 }
508
509 pub fn memory_used(&self) -> usize {
511 self.memory_used.load(Ordering::Relaxed)
512 }
513
514 pub fn ops_count(&self) -> u64 {
516 self.ops_count.load(Ordering::Relaxed)
517 }
518
519 pub fn clear(&self) {
521 self.data.clear();
522 self.memory_used.store(0, Ordering::Relaxed);
523 }
524
525 #[inline]
530 fn adjust_memory(&self, old_size: usize, new_size: usize) {
531 let diff = new_size as isize - old_size as isize;
532 if diff > 0 {
533 self.memory_used.fetch_add(diff as usize, Ordering::Relaxed);
534 } else if diff < 0 {
535 self.memory_used
536 .fetch_sub((-diff) as usize, Ordering::Relaxed);
537 }
538 }
539
540 fn evict_entries(&self, needed: usize) {
542 let freed = std::sync::atomic::AtomicUsize::new(0);
546 self.data.retain(|k, v| {
547 if freed.load(Ordering::Relaxed) >= needed {
548 return true; }
550 let entry_bytes = v.size(k.len());
551 freed.fetch_add(entry_bytes, Ordering::Relaxed);
552 self.memory_used.fetch_sub(entry_bytes, Ordering::Relaxed);
553 false });
555 }
556}
557
558impl Default for ConcurrentKeyspace {
559 fn default() -> Self {
560 Self::new(None, EvictionPolicy::NoEviction)
561 }
562}
563
564#[cfg(test)]
565mod tests {
566 use super::*;
567
568 #[test]
569 fn set_and_get() {
570 let ks = ConcurrentKeyspace::default();
571 assert!(ks.set("key".into(), Bytes::from("value"), None));
572 assert_eq!(ks.get("key"), Some(Bytes::from("value")));
573 }
574
575 #[test]
576 fn get_missing() {
577 let ks = ConcurrentKeyspace::default();
578 assert_eq!(ks.get("missing"), None);
579 }
580
581 #[test]
582 fn del_existing() {
583 let ks = ConcurrentKeyspace::default();
584 ks.set("key".into(), Bytes::from("value"), None);
585 assert!(ks.del("key"));
586 assert_eq!(ks.get("key"), None);
587 }
588
589 #[test]
590 fn del_missing() {
591 let ks = ConcurrentKeyspace::default();
592 assert!(!ks.del("missing"));
593 }
594
595 #[test]
596 fn exists_check() {
597 let ks = ConcurrentKeyspace::default();
598 ks.set("key".into(), Bytes::from("value"), None);
599 assert!(ks.exists("key"));
600 assert!(!ks.exists("missing"));
601 }
602
603 #[test]
604 fn ttl_expires() {
605 let ks = ConcurrentKeyspace::default();
606 ks.set(
607 "key".into(),
608 Bytes::from("value"),
609 Some(Duration::from_millis(10)),
610 );
611 assert!(matches!(ks.ttl("key"), TtlResult::Seconds(_)));
612 std::thread::sleep(Duration::from_millis(20));
613 assert_eq!(ks.get("key"), None);
614 }
615
616 #[test]
617 fn incr_new_key() {
618 let ks = ConcurrentKeyspace::default();
619 assert_eq!(ks.incr("counter").unwrap(), 1);
620 assert_eq!(ks.get("counter"), Some(Bytes::from("1")));
621 }
622
623 #[test]
624 fn incr_existing_key() {
625 let ks = ConcurrentKeyspace::default();
626 ks.set("counter".into(), Bytes::from("10"), None);
627 assert_eq!(ks.incr("counter").unwrap(), 11);
628 }
629
630 #[test]
631 fn decr_below_zero() {
632 let ks = ConcurrentKeyspace::default();
633 assert_eq!(ks.decr("counter").unwrap(), -1);
634 assert_eq!(ks.decr("counter").unwrap(), -2);
635 }
636
637 #[test]
638 fn incr_by_delta() {
639 let ks = ConcurrentKeyspace::default();
640 assert_eq!(ks.incr_by("counter", 5).unwrap(), 5);
641 assert_eq!(ks.incr_by("counter", -3).unwrap(), 2);
642 }
643
644 #[test]
645 fn incr_non_integer_value() {
646 let ks = ConcurrentKeyspace::default();
647 ks.set("key".into(), Bytes::from("not_a_number"), None);
648 assert_eq!(ks.incr("key"), Err(ConcurrentOpError::NotAnInteger));
649 }
650
651 #[test]
652 fn incr_overflow() {
653 let ks = ConcurrentKeyspace::default();
654 ks.set("key".into(), Bytes::from(i64::MAX.to_string()), None);
655 assert_eq!(ks.incr("key"), Err(ConcurrentOpError::Overflow));
656 }
657
658 #[test]
659 fn incr_by_float_new_key() {
660 let ks = ConcurrentKeyspace::default();
661 let val = ks.incr_by_float("key", 2.5).unwrap();
662 assert!((val - 2.5).abs() < f64::EPSILON);
663 }
664
665 #[test]
666 fn incr_by_float_existing() {
667 let ks = ConcurrentKeyspace::default();
668 ks.set("key".into(), Bytes::from("10.5"), None);
669 let val = ks.incr_by_float("key", 1.5).unwrap();
670 assert!((val - 12.0).abs() < f64::EPSILON);
671 }
672
673 #[test]
674 fn incr_by_float_not_a_float() {
675 let ks = ConcurrentKeyspace::default();
676 ks.set("key".into(), Bytes::from("hello"), None);
677 assert_eq!(
678 ks.incr_by_float("key", 1.0),
679 Err(ConcurrentFloatError::NotAFloat)
680 );
681 }
682
683 #[test]
684 fn incr_by_float_infinity() {
685 let ks = ConcurrentKeyspace::default();
686 ks.set("key".into(), Bytes::from(f64::MAX.to_string()), None);
687 assert_eq!(
688 ks.incr_by_float("key", f64::MAX),
689 Err(ConcurrentFloatError::NanOrInfinity)
690 );
691 }
692
693 #[test]
694 fn append_new_key() {
695 let ks = ConcurrentKeyspace::default();
696 assert_eq!(ks.append("key", b"hello"), 5);
697 assert_eq!(ks.get("key"), Some(Bytes::from("hello")));
698 }
699
700 #[test]
701 fn append_existing_key() {
702 let ks = ConcurrentKeyspace::default();
703 ks.set("key".into(), Bytes::from("hello"), None);
704 assert_eq!(ks.append("key", b" world"), 11);
705 assert_eq!(ks.get("key"), Some(Bytes::from("hello world")));
706 }
707
708 #[test]
709 fn strlen_existing() {
710 let ks = ConcurrentKeyspace::default();
711 ks.set("key".into(), Bytes::from("hello"), None);
712 assert_eq!(ks.strlen("key"), 5);
713 }
714
715 #[test]
716 fn strlen_missing() {
717 let ks = ConcurrentKeyspace::default();
718 assert_eq!(ks.strlen("missing"), 0);
719 }
720
721 #[test]
722 fn persist_removes_ttl() {
723 let ks = ConcurrentKeyspace::default();
724 ks.set(
725 "key".into(),
726 Bytes::from("val"),
727 Some(Duration::from_secs(60)),
728 );
729 assert!(ks.persist("key"));
730 assert!(matches!(ks.ttl("key"), TtlResult::NoExpiry));
731 }
732
733 #[test]
734 fn persist_no_ttl() {
735 let ks = ConcurrentKeyspace::default();
736 ks.set("key".into(), Bytes::from("val"), None);
737 assert!(!ks.persist("key")); }
739
740 #[test]
741 fn persist_missing_key() {
742 let ks = ConcurrentKeyspace::default();
743 assert!(!ks.persist("missing"));
744 }
745
746 #[test]
747 fn pexpire_and_pttl() {
748 let ks = ConcurrentKeyspace::default();
749 ks.set("key".into(), Bytes::from("val"), None);
750 assert!(ks.pexpire("key", 5000));
751 match ks.pttl("key") {
752 TtlResult::Milliseconds(ms) => assert!(ms > 0 && ms <= 5000),
753 other => panic!("expected Milliseconds, got {other:?}"),
754 }
755 }
756
757 #[test]
758 fn pttl_no_expiry() {
759 let ks = ConcurrentKeyspace::default();
760 ks.set("key".into(), Bytes::from("val"), None);
761 assert!(matches!(ks.pttl("key"), TtlResult::NoExpiry));
762 }
763
764 #[test]
765 fn pttl_missing() {
766 let ks = ConcurrentKeyspace::default();
767 assert!(matches!(ks.pttl("missing"), TtlResult::NotFound));
768 }
769
770 #[test]
771 fn keys_match_pattern() {
772 let ks = ConcurrentKeyspace::default();
773 ks.set("user:1".into(), Bytes::from("a"), None);
774 ks.set("user:2".into(), Bytes::from("b"), None);
775 ks.set("item:1".into(), Bytes::from("c"), None);
776 let mut result = ks.keys("user:*");
777 result.sort();
778 assert_eq!(result, vec!["user:1", "user:2"]);
779 }
780
781 #[test]
782 fn keys_match_all() {
783 let ks = ConcurrentKeyspace::default();
784 ks.set("a".into(), Bytes::from("1"), None);
785 ks.set("b".into(), Bytes::from("2"), None);
786 let result = ks.keys("*");
787 assert_eq!(result.len(), 2);
788 }
789
790 #[test]
791 fn scan_basic() {
792 let ks = ConcurrentKeyspace::default();
793 ks.set("a".into(), Bytes::from("1"), None);
794 ks.set("b".into(), Bytes::from("2"), None);
795 ks.set("c".into(), Bytes::from("3"), None);
796 let (cursor, keys) = ks.scan_keys(0, 10, None);
797 assert_eq!(cursor, 0); assert_eq!(keys.len(), 3);
799 }
800
801 #[test]
802 fn scan_with_pattern() {
803 let ks = ConcurrentKeyspace::default();
804 ks.set("user:1".into(), Bytes::from("a"), None);
805 ks.set("user:2".into(), Bytes::from("b"), None);
806 ks.set("item:1".into(), Bytes::from("c"), None);
807 let (_, keys) = ks.scan_keys(0, 10, Some("user:*"));
808 assert_eq!(keys.len(), 2);
809 }
810
811 #[test]
812 fn scan_with_count() {
813 let ks = ConcurrentKeyspace::default();
814 for i in 0..10 {
815 ks.set(format!("k{i}"), Bytes::from("v"), None);
816 }
817 let (cursor, keys) = ks.scan_keys(0, 3, None);
818 assert!(keys.len() <= 3);
819 if cursor > 0 {
821 let (_, keys2) = ks.scan_keys(cursor, 3, None);
822 assert!(!keys2.is_empty());
823 }
824 }
825
826 #[test]
827 fn rename_basic() {
828 let ks = ConcurrentKeyspace::default();
829 ks.set("old".into(), Bytes::from("value"), None);
830 ks.rename("old", "new").unwrap();
831 assert_eq!(ks.get("old"), None);
832 assert_eq!(ks.get("new"), Some(Bytes::from("value")));
833 }
834
835 #[test]
836 fn rename_missing_key() {
837 let ks = ConcurrentKeyspace::default();
838 assert!(ks.rename("missing", "new").is_err());
839 }
840
841 #[test]
842 fn rename_overwrites_destination() {
843 let ks = ConcurrentKeyspace::default();
844 ks.set("src".into(), Bytes::from("new_val"), None);
845 ks.set("dst".into(), Bytes::from("old_val"), None);
846 ks.rename("src", "dst").unwrap();
847 assert_eq!(ks.get("src"), None);
848 assert_eq!(ks.get("dst"), Some(Bytes::from("new_val")));
849 }
850
851 #[test]
852 fn concurrent_access() {
853 use std::sync::Arc;
854 use std::thread;
855
856 let ks = Arc::new(ConcurrentKeyspace::default());
857 let mut handles = vec![];
858
859 for i in 0..8 {
861 let ks = Arc::clone(&ks);
862 handles.push(thread::spawn(move || {
863 for j in 0..1000 {
864 let key = format!("key-{}-{}", i, j);
865 ks.set(key, Bytes::from("value"), None);
866 }
867 }));
868 }
869
870 for h in handles {
871 h.join().unwrap();
872 }
873
874 assert_eq!(ks.len(), 8000);
875 }
876}