1use std::collections::VecDeque;
9use std::time::Duration;
10
11use ahash::AHashMap;
12use bytes::Bytes;
13use compact_str::CompactString;
14use rand::seq::IteratorRandom;
15
16use tracing::warn;
17
18use crate::dropper::DropHandle;
19use crate::memory::{self, MemoryTracker};
20use crate::time;
21use crate::types::sorted_set::{ScoreBound, SortedSet, ZAddFlags};
22use crate::types::{self, normalize_range, Value};
23
24mod hash;
25mod list;
26#[cfg(feature = "protobuf")]
27mod proto;
28mod set;
29mod string;
30#[cfg(feature = "vector")]
31mod vector;
32mod zset;
33
/// Error text used when a command targets a key whose value has a different type.
const WRONGTYPE_MSG: &str = "WRONGTYPE Operation against a key holding the wrong kind of value";
/// Error text used when a write is refused because of the memory limit.
const OOM_MSG: &str = "OOM command not allowed when used memory > 'maxmemory'";
36
37#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct WrongType;
40
41impl std::fmt::Display for WrongType {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 write!(f, "{WRONGTYPE_MSG}")
44 }
45}
46
47impl std::error::Error for WrongType {}
48
49#[derive(Debug, Clone, PartialEq, Eq)]
52pub enum WriteError {
53 WrongType,
55 OutOfMemory,
57}
58
59impl From<WrongType> for WriteError {
60 fn from(_: WrongType) -> Self {
61 WriteError::WrongType
62 }
63}
64
65#[derive(Debug, Clone, PartialEq, Eq)]
67pub enum IncrError {
68 WrongType,
70 NotAnInteger,
72 Overflow,
74 OutOfMemory,
76}
77
78impl std::fmt::Display for IncrError {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 match self {
81 IncrError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
82 IncrError::NotAnInteger => write!(f, "ERR value is not an integer or out of range"),
83 IncrError::Overflow => write!(f, "ERR increment or decrement would overflow"),
84 IncrError::OutOfMemory => write!(f, "{OOM_MSG}"),
85 }
86 }
87}
88
89impl std::error::Error for IncrError {}
90
91#[derive(Debug, Clone, PartialEq)]
93pub enum IncrFloatError {
94 WrongType,
96 NotAFloat,
98 NanOrInfinity,
100 OutOfMemory,
102}
103
104impl std::fmt::Display for IncrFloatError {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 match self {
107 IncrFloatError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
108 IncrFloatError::NotAFloat => write!(f, "ERR value is not a valid float"),
109 IncrFloatError::NanOrInfinity => {
110 write!(f, "ERR increment would produce NaN or Infinity")
111 }
112 IncrFloatError::OutOfMemory => write!(f, "{OOM_MSG}"),
113 }
114 }
115}
116
117impl std::error::Error for IncrFloatError {}
118
/// Failure modes of [`Keyspace::rename`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RenameError {
    /// The source key does not exist (or had already expired).
    NoSuchKey,
}
125
/// Failure modes of [`Keyspace::copy`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CopyError {
    /// The source key does not exist (or had already expired).
    NoSuchKey,
    /// The memory limit check refused the copy.
    OutOfMemory,
}
134
135#[derive(Debug, Clone, PartialEq, Eq)]
137pub enum LsetError {
138 WrongType,
140 NoSuchKey,
142 IndexOutOfRange,
144}
145
146impl std::fmt::Display for LsetError {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 match self {
149 LsetError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
150 LsetError::NoSuchKey => write!(f, "ERR no such key"),
151 LsetError::IndexOutOfRange => write!(f, "ERR index out of range"),
152 }
153 }
154}
155
156impl std::error::Error for LsetError {}
157
158impl std::fmt::Display for RenameError {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 match self {
161 RenameError::NoSuchKey => write!(f, "ERR no such key"),
162 }
163 }
164}
165
166impl std::error::Error for RenameError {}
167
/// Outcome of a sorted-set add operation.
///
/// NOTE(review): the producer lives in the `zset` submodule, which is not visible
/// here; the field descriptions below are inferred from names — confirm there.
#[derive(Debug, Clone)]
pub struct ZAddResult {
    /// Count reported for the operation (presumably members added/updated).
    pub count: usize,
    /// `(score, member)` pairs that were applied (presumably for replication/persistence).
    pub applied: Vec<(f64, String)>,
}
178
/// Outcome of adding a single vector element (only with the `vector` feature).
///
/// NOTE(review): produced by the `vector` submodule, not visible here; confirm
/// field semantics there.
#[cfg(feature = "vector")]
#[derive(Debug, Clone)]
pub struct VAddResult {
    /// Name of the element.
    pub element: String,
    /// Vector components associated with the element.
    pub vector: Vec<f32>,
    /// Presumably `true` when the element was newly added rather than updated.
    pub added: bool,
}
190
/// Outcome of a batched vector add (only with the `vector` feature).
///
/// NOTE(review): produced by the `vector` submodule, not visible here; confirm
/// field semantics there.
#[cfg(feature = "vector")]
#[derive(Debug, Clone)]
pub struct VAddBatchResult {
    /// Presumably the number of elements that were newly added.
    pub added_count: usize,
    /// `(element, vector)` pairs that were applied.
    pub applied: Vec<(String, Vec<f32>)>,
}
201
/// Failure modes of a vector write (only with the `vector` feature).
#[cfg(feature = "vector")]
#[derive(Debug, Clone)]
pub enum VectorWriteError {
    /// The key holds a value of a different type.
    WrongType,
    /// The memory limit check refused the write.
    OutOfMemory,
    /// An error described by the carried message (presumably from the vector index).
    IndexError(String),
    /// A batch failed part-way through.
    ///
    /// NOTE(review): `applied` presumably lists the pairs written before the
    /// failure — confirm in the `vector` submodule.
    PartialBatch {
        message: String,
        applied: Vec<(String, Vec<f32>)>,
    },
}
219
/// What the keyspace does when a write would exceed the memory limit.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum EvictionPolicy {
    /// Reject the write; nothing is evicted. This is the default.
    #[default]
    NoEviction,
    /// Evict entries (chosen by last access time among sampled keys) until the
    /// write fits. Selecting this policy also turns on access-time tracking.
    AllKeysLru,
}
229
230#[derive(Debug, Clone)]
232pub struct ShardConfig {
233 pub max_memory: Option<usize>,
235 pub eviction_policy: EvictionPolicy,
237 pub shard_id: u16,
239}
240
241impl Default for ShardConfig {
242 fn default() -> Self {
243 Self {
244 max_memory: None,
245 eviction_policy: EvictionPolicy::NoEviction,
246 shard_id: 0,
247 }
248 }
249}
250
/// Outcome of a string `set` operation.
#[derive(Debug, PartialEq, Eq)]
pub enum SetResult {
    /// The value was stored.
    Ok,
    /// The memory limit check refused the write.
    OutOfMemory,
    /// The write was not performed.
    /// NOTE(review): presumably an NX/XX condition failed — confirm in the `string` submodule.
    Blocked,
}
261
/// A stored value together with its bookkeeping metadata.
#[derive(Debug, Clone)]
pub(crate) struct Entry {
    /// The stored value.
    pub(crate) value: Value,
    /// Expiry timestamp in milliseconds; `0` means the entry never expires.
    pub(crate) expires_at_ms: u64,
    /// Cached result of `memory::value_size` for `value`, kept in sync by writers.
    pub(crate) cached_value_size: u32,
    /// Timestamp (seconds, from `time::now_secs`) of the last tracked access.
    pub(crate) last_access_secs: u32,
}
288
289impl Entry {
290 fn new(value: Value, ttl: Option<Duration>) -> Self {
291 let cached_value_size = memory::value_size(&value) as u32;
292 Self {
293 value,
294 expires_at_ms: time::expiry_from_duration(ttl),
295 cached_value_size,
296 last_access_secs: time::now_secs(),
297 }
298 }
299
300 fn is_expired(&self) -> bool {
302 time::is_expired(self.expires_at_ms)
303 }
304
305 #[inline(always)]
309 fn touch(&mut self, track: bool) {
310 if track {
311 self.last_access_secs = time::now_secs();
312 }
313 }
314
315 fn entry_size(&self, key: &str) -> usize {
318 key.len() + self.cached_value_size as usize + memory::ENTRY_OVERHEAD
319 }
320}
321
/// Result of a time-to-live query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TtlResult {
    /// Remaining lifetime in seconds (returned by [`Keyspace::ttl`]).
    Seconds(u64),
    /// Remaining lifetime in milliseconds (returned by [`Keyspace::pttl`]).
    Milliseconds(u64),
    /// The key exists but has no expiry.
    NoExpiry,
    /// The key does not exist (or had already expired).
    NotFound,
}
334
/// Snapshot of keyspace counters, as produced by [`Keyspace::stats`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyspaceStats {
    /// Number of keys according to the memory tracker.
    pub key_count: usize,
    /// Bytes in use according to the memory tracker.
    pub used_bytes: usize,
    /// Number of keys that currently carry an expiry.
    pub keys_with_expiry: usize,
    /// Running total of keys removed because they expired.
    pub keys_expired: u64,
    /// Running total of keys removed by eviction.
    pub keys_evicted: u64,
    /// Running total of writes rejected by the memory limit.
    pub oom_rejections: u64,
}
351
/// Sample-size parameter used by `Keyspace::try_evict` when choosing a victim.
const EVICTION_SAMPLE_SIZE: usize = 16;
363
/// In-memory key/value store for one shard, with expiry, memory accounting,
/// optional eviction and per-key version tracking.
pub struct Keyspace {
    /// All stored entries, keyed by name.
    entries: AHashMap<CompactString, Entry>,
    /// Tracks accounted bytes and key count.
    memory: MemoryTracker,
    /// Memory limit / eviction configuration.
    config: ShardConfig,
    /// Number of entries whose `expires_at_ms` is non-zero.
    expiry_count: usize,
    /// Running total of entries removed due to expiry.
    expired_total: u64,
    /// Running total of entries removed by eviction.
    evicted_total: u64,
    /// Running total of writes rejected by the memory limit.
    oom_rejections: u64,
    /// Optional handle that removed values are passed to instead of being dropped inline.
    drop_handle: Option<DropHandle>,
    /// Counter from which per-key versions are assigned.
    next_version: u64,
    /// Versions of keys that have been queried via `key_version` (populated lazily).
    versions: AHashMap<CompactString, u64>,
    /// Whether reads refresh `last_access_secs`; enabled only for `AllKeysLru`.
    track_access: bool,
}
397
398impl Keyspace {
    /// Creates an empty keyspace using [`ShardConfig::default`].
    pub fn new() -> Self {
        Self::with_config(ShardConfig::default())
    }
403
404 pub fn with_config(config: ShardConfig) -> Self {
406 let track_access = config.eviction_policy == EvictionPolicy::AllKeysLru;
407 Self {
408 entries: AHashMap::new(),
409 memory: MemoryTracker::new(),
410 config,
411 expiry_count: 0,
412 expired_total: 0,
413 evicted_total: 0,
414 oom_rejections: 0,
415 drop_handle: None,
416 next_version: 0,
417 versions: AHashMap::new(),
418 track_access,
419 }
420 }
421
    /// Installs the handle that removed values are passed to (see `defer_drop`).
    pub fn set_drop_handle(&mut self, handle: DropHandle) {
        self.drop_handle = Some(handle);
    }
428
429 fn bump_version(&mut self, key: &str) {
433 if let Some(ver) = self.versions.get_mut(key) {
434 self.next_version += 1;
435 *ver = self.next_version;
436 }
437 }
438
439 pub fn key_version(&mut self, key: &str) -> Option<u64> {
445 let entry = self.entries.get(key)?;
446 if entry.is_expired() {
447 return None;
448 }
449 let ver = *self
450 .versions
451 .entry(CompactString::from(key))
452 .or_insert(self.next_version);
453 Some(ver)
454 }
455
456 fn remove_version(&mut self, key: &str) {
459 self.versions.remove(key);
460 }
461
    /// Drops all tracked key versions; the global version counter is left untouched.
    pub fn clear_versions(&mut self) {
        self.versions.clear();
    }
467
468 fn decrement_expiry_if_set(&mut self, entry: &Entry) {
470 if entry.expires_at_ms != 0 {
471 self.expiry_count = self.expiry_count.saturating_sub(1);
472 }
473 }
474
475 fn cleanup_after_remove(
482 &mut self,
483 key: &str,
484 old_size: usize,
485 is_empty: bool,
486 removed_bytes: usize,
487 ) {
488 if is_empty {
489 if let Some(removed) = self.entries.remove(key) {
490 self.decrement_expiry_if_set(&removed);
491 }
492 self.memory.remove_with_size(old_size);
493 } else {
494 self.memory.shrink_by(removed_bytes);
495 if let Some(entry) = self.entries.get_mut(key) {
496 entry.cached_value_size =
497 (entry.cached_value_size as usize).saturating_sub(removed_bytes) as u32;
498 }
499 }
500 }
501
502 fn ensure_collection_type(
507 &self,
508 key: &str,
509 type_check: fn(&Value) -> bool,
510 ) -> Result<bool, WriteError> {
511 match self.entries.get(key) {
512 None => Ok(true),
513 Some(e) if type_check(&e.value) => Ok(false),
514 Some(_) => Err(WriteError::WrongType),
515 }
516 }
517
518 fn reserve_memory(
522 &mut self,
523 is_new: bool,
524 key: &str,
525 base_overhead: usize,
526 element_increase: usize,
527 ) -> Result<(), WriteError> {
528 let estimated_increase = if is_new {
529 memory::ENTRY_OVERHEAD + key.len() + base_overhead + element_increase
530 } else {
531 element_increase
532 };
533 if self.enforce_memory_limit(estimated_increase) {
534 Ok(())
535 } else {
536 Err(WriteError::OutOfMemory)
537 }
538 }
539
540 fn insert_empty(&mut self, key: &str, value: Value) {
543 self.memory.add(key, &value);
544 let entry = Entry::new(value, None);
545 self.entries.insert(CompactString::from(key), entry);
546 self.bump_version(key);
547 }
548
549 fn track_size<T>(&mut self, key: &str, f: impl FnOnce(&mut Entry) -> T) -> Option<T> {
556 let entry = self.entries.get_mut(key)?;
557 let old_size = entry.entry_size(key);
558 let result = f(entry);
559 let entry = self.entries.get_mut(key)?;
561 let new_value_size = memory::value_size(&entry.value);
562 entry.cached_value_size = new_value_size as u32;
563 let new_size = key.len() + new_value_size + memory::ENTRY_OVERHEAD;
564 self.memory.adjust(old_size, new_size);
565 self.bump_version(key);
566 Some(result)
567 }
568
569 fn adjust_expiry_count(&mut self, had_expiry: bool, has_expiry: bool) {
572 match (had_expiry, has_expiry) {
573 (false, true) => self.expiry_count += 1,
574 (true, false) => self.expiry_count = self.expiry_count.saturating_sub(1),
575 _ => {}
576 }
577 }
578
    /// Evicts one entry, preferring the smallest `last_access_secs` among the
    /// entries it considers.
    ///
    /// The first `EVICTION_SAMPLE_SIZE` entries visited are always considered; each
    /// later entry is considered only when a random draw from `0..seen` falls below
    /// `EVICTION_SAMPLE_SIZE`. Returns `true` when an entry was removed, `false`
    /// when the keyspace is empty or no candidate was selected.
    fn try_evict(&mut self) -> bool {
        if self.entries.is_empty() {
            return false;
        }

        let mut rng = rand::rng();

        // Best candidate so far: the key with the smallest access timestamp seen.
        let mut best_key: Option<&str> = None;
        let mut best_access = u32::MAX;
        let mut seen = 0usize;

        for (key, entry) in &self.entries {
            seen += 1;
            if seen <= EVICTION_SAMPLE_SIZE {
                // Early entries are always compared against the current best.
                if entry.last_access_secs < best_access {
                    best_access = entry.last_access_secs;
                    best_key = Some(&**key);
                }
            } else {
                use rand::Rng;
                // Later entries are compared only when the random draw admits them.
                let j = rng.random_range(0..seen);
                if j < EVICTION_SAMPLE_SIZE && entry.last_access_secs < best_access {
                    best_access = entry.last_access_secs;
                    best_key = Some(&**key);
                }
            }
        }

        if let Some(victim) = best_key {
            // Copy the key so the borrow of `self.entries` ends before mutation.
            let victim = victim.to_owned();
            if let Some(entry) = self.entries.remove(victim.as_str()) {
                self.memory.remove(&victim, &entry.value);
                self.decrement_expiry_if_set(&entry);
                self.evicted_total += 1;
                self.remove_version(&victim);
                self.defer_drop(entry.value);
                return true;
            }
        }
        false
    }
636
637 fn enforce_memory_limit(&mut self, estimated_increase: usize) -> bool {
646 if let Some(max) = self.config.max_memory {
647 let limit = memory::effective_limit(max);
648 while self.memory.used_bytes() + estimated_increase > limit {
649 match self.config.eviction_policy {
650 EvictionPolicy::NoEviction => {
651 self.oom_rejections += 1;
652 if self.oom_rejections == 1 || self.oom_rejections.is_multiple_of(1000) {
654 warn!(
655 used_bytes = self.memory.used_bytes(),
656 limit,
657 requested = estimated_increase,
658 total_rejections = self.oom_rejections,
659 "OOM: write rejected (policy: noeviction)"
660 );
661 }
662 return false;
663 }
664 EvictionPolicy::AllKeysLru => {
665 if !self.try_evict() {
666 self.oom_rejections += 1;
667 if self.oom_rejections == 1 || self.oom_rejections.is_multiple_of(1000)
668 {
669 warn!(
670 used_bytes = self.memory.used_bytes(),
671 limit,
672 requested = estimated_increase,
673 total_rejections = self.oom_rejections,
674 "OOM: write rejected (eviction exhausted)"
675 );
676 }
677 return false;
678 }
679 }
680 }
681 }
682 }
683 true
684 }
685
686 pub fn del(&mut self, key: &str) -> bool {
691 if self.remove_if_expired(key) {
692 return false;
693 }
694 if let Some(entry) = self.entries.remove(key) {
695 self.memory.remove(key, &entry.value);
696 self.decrement_expiry_if_set(&entry);
697 self.remove_version(key);
698 self.defer_drop(entry.value);
699 true
700 } else {
701 false
702 }
703 }
704
705 pub fn unlink(&mut self, key: &str) -> bool {
711 if self.remove_if_expired(key) {
712 return false;
713 }
714 if let Some(entry) = self.entries.remove(key) {
715 self.memory.remove(key, &entry.value);
716 self.decrement_expiry_if_set(&entry);
717 self.remove_version(key);
718 if let Some(ref handle) = self.drop_handle {
720 handle.defer_value(entry.value);
721 }
722 true
723 } else {
724 false
725 }
726 }
727
728 pub(crate) fn flush_async(&mut self) -> AHashMap<CompactString, Entry> {
732 let old = std::mem::take(&mut self.entries);
733 self.memory.reset();
734 self.expiry_count = 0;
735 self.versions.clear();
736 old
737 }
738
739 pub fn exists(&mut self, key: &str) -> bool {
741 if self.remove_if_expired(key) {
742 return false;
743 }
744 self.entries.contains_key(key)
745 }
746
747 pub fn random_key(&mut self) -> Option<String> {
752 for _ in 0..5 {
754 let mut rng = rand::rng();
755 let key = self.entries.keys().choose(&mut rng)?.clone();
756
757 if self.remove_if_expired(&key) {
758 continue;
759 }
760 return Some(key.to_string());
761 }
762 None
763 }
764
765 pub fn touch(&mut self, key: &str) -> bool {
767 if self.remove_if_expired(key) {
768 return false;
769 }
770 match self.entries.get_mut(key) {
771 Some(entry) => {
772 entry.touch(self.track_access);
773 true
774 }
775 None => false,
776 }
777 }
778
779 pub fn sort(
784 &mut self,
785 key: &str,
786 desc: bool,
787 alpha: bool,
788 limit: Option<(i64, i64)>,
789 ) -> Result<Vec<Bytes>, &'static str> {
790 if self.remove_if_expired(key) {
791 return Ok(Vec::new());
792 }
793 let entry = match self.entries.get_mut(key) {
794 Some(e) => {
795 e.touch(self.track_access);
796 e
797 }
798 None => return Ok(Vec::new()),
799 };
800
801 let mut items: Vec<Bytes> = match &entry.value {
803 Value::List(deq) => deq.iter().cloned().collect(),
804 Value::Set(set) => set.iter().map(|s| Bytes::from(s.clone())).collect(),
805 Value::SortedSet(zset) => zset
806 .iter()
807 .map(|(m, _)| Bytes::from(m.to_owned()))
808 .collect(),
809 _ => return Err(WRONGTYPE_MSG),
810 };
811
812 if alpha {
814 items.sort();
815 if desc {
816 items.reverse();
817 }
818 } else {
819 let mut parse_err = false;
821 items.sort_by(|a, b| {
822 let a_str = std::str::from_utf8(a).unwrap_or("");
823 let b_str = std::str::from_utf8(b).unwrap_or("");
824 let a_val = a_str.parse::<f64>().unwrap_or_else(|_| {
825 parse_err = true;
826 0.0
827 });
828 let b_val = b_str.parse::<f64>().unwrap_or_else(|_| {
829 parse_err = true;
830 0.0
831 });
832 if desc {
833 b_val
834 .partial_cmp(&a_val)
835 .unwrap_or(std::cmp::Ordering::Equal)
836 } else {
837 a_val
838 .partial_cmp(&b_val)
839 .unwrap_or(std::cmp::Ordering::Equal)
840 }
841 });
842 if parse_err {
843 return Err("ERR One or more scores can't be converted into double");
844 }
845 }
846
847 if let Some((offset, count)) = limit {
849 let offset = offset.max(0) as usize;
850 let count = count.max(0) as usize;
851 let end = offset.saturating_add(count).min(items.len());
852 if offset < items.len() {
853 items = items[offset..end].to_vec();
854 } else {
855 items.clear();
856 }
857 }
858
859 Ok(items)
860 }
861
862 pub fn expire(&mut self, key: &str, seconds: u64) -> bool {
865 if self.remove_if_expired(key) {
866 return false;
867 }
868 match self.entries.get_mut(key) {
869 Some(entry) => {
870 if entry.expires_at_ms == 0 {
871 self.expiry_count += 1;
872 }
873 entry.expires_at_ms = time::now_ms().saturating_add(seconds.saturating_mul(1000));
874 self.bump_version(key);
875 true
876 }
877 None => false,
878 }
879 }
880
881 pub fn ttl(&mut self, key: &str) -> TtlResult {
886 if self.remove_if_expired(key) {
887 return TtlResult::NotFound;
888 }
889 match self.entries.get(key) {
890 Some(entry) => match time::remaining_secs(entry.expires_at_ms) {
891 Some(secs) => TtlResult::Seconds(secs),
892 None => TtlResult::NoExpiry,
893 },
894 None => TtlResult::NotFound,
895 }
896 }
897
898 pub fn persist(&mut self, key: &str) -> bool {
903 if self.remove_if_expired(key) {
904 return false;
905 }
906 match self.entries.get_mut(key) {
907 Some(entry) => {
908 if entry.expires_at_ms != 0 {
909 entry.expires_at_ms = 0;
910 self.expiry_count = self.expiry_count.saturating_sub(1);
911 self.bump_version(key);
912 true
913 } else {
914 false
915 }
916 }
917 None => false,
918 }
919 }
920
921 pub fn pttl(&mut self, key: &str) -> TtlResult {
926 if self.remove_if_expired(key) {
927 return TtlResult::NotFound;
928 }
929 match self.entries.get(key) {
930 Some(entry) => match time::remaining_ms(entry.expires_at_ms) {
931 Some(ms) => TtlResult::Milliseconds(ms),
932 None => TtlResult::NoExpiry,
933 },
934 None => TtlResult::NotFound,
935 }
936 }
937
938 pub fn pexpire(&mut self, key: &str, millis: u64) -> bool {
943 if self.remove_if_expired(key) {
944 return false;
945 }
946 match self.entries.get_mut(key) {
947 Some(entry) => {
948 if entry.expires_at_ms == 0 {
949 self.expiry_count += 1;
950 }
951 entry.expires_at_ms = time::now_ms().saturating_add(millis);
952 self.bump_version(key);
953 true
954 }
955 None => false,
956 }
957 }
958
959 pub fn keys(&self, pattern: &str) -> Vec<String> {
964 let len = self.entries.len();
965 if len > 10_000 {
966 warn!(
967 key_count = len,
968 "KEYS on large keyspace, consider SCAN instead"
969 );
970 }
971 let compiled = GlobPattern::new(pattern);
972 self.entries
973 .iter()
974 .filter(|(_, entry)| !entry.is_expired())
975 .filter(|(key, _)| compiled.matches(key))
976 .map(|(key, _)| String::from(&**key))
977 .collect()
978 }
979
980 pub fn count_keys_in_slot(&self, slot: u16) -> usize {
984 self.entries
985 .iter()
986 .filter(|(_, entry)| !entry.is_expired())
987 .filter(|(key, _)| ember_cluster::key_slot(key.as_bytes()) == slot)
988 .count()
989 }
990
991 pub fn get_keys_in_slot(&self, slot: u16, count: usize) -> Vec<String> {
995 self.entries
996 .iter()
997 .filter(|(_, entry)| !entry.is_expired())
998 .filter(|(key, _)| ember_cluster::key_slot(key.as_bytes()) == slot)
999 .take(count)
1000 .map(|(key, _)| String::from(&**key))
1001 .collect()
1002 }
1003
1004 pub fn rename(&mut self, key: &str, newkey: &str) -> Result<(), RenameError> {
1007 self.remove_if_expired(key);
1008 self.remove_if_expired(newkey);
1009
1010 let entry = match self.entries.remove(key) {
1011 Some(entry) => entry,
1012 None => return Err(RenameError::NoSuchKey),
1013 };
1014
1015 self.memory.remove(key, &entry.value);
1017 self.decrement_expiry_if_set(&entry);
1018
1019 if let Some(old_dest) = self.entries.remove(newkey) {
1021 self.memory.remove(newkey, &old_dest.value);
1022 self.decrement_expiry_if_set(&old_dest);
1023 }
1024
1025 self.memory.add(newkey, &entry.value);
1027 if entry.expires_at_ms != 0 {
1028 self.expiry_count += 1;
1029 }
1030 self.remove_version(key);
1031 self.entries.insert(CompactString::from(newkey), entry);
1032 self.bump_version(newkey);
1033 Ok(())
1034 }
1035
1036 pub fn copy(&mut self, source: &str, dest: &str, replace: bool) -> Result<bool, CopyError> {
1040 self.remove_if_expired(source);
1041 self.remove_if_expired(dest);
1042
1043 let src_entry = match self.entries.get(source) {
1044 Some(e) => e,
1045 None => return Err(CopyError::NoSuchKey),
1046 };
1047
1048 if !replace && self.entries.contains_key(dest) {
1050 return Ok(false);
1051 }
1052
1053 let cloned_value = src_entry.value.clone();
1055 let cloned_expire = if src_entry.expires_at_ms != 0 {
1056 Some(src_entry.expires_at_ms)
1057 } else {
1058 None
1059 };
1060
1061 let new_size = memory::entry_size(dest, &cloned_value);
1063
1064 let old_dest_size = self
1066 .entries
1067 .get(dest)
1068 .map(|e| e.entry_size(dest))
1069 .unwrap_or(0);
1070 let net_increase = new_size.saturating_sub(old_dest_size);
1071 if !self.enforce_memory_limit(net_increase) {
1072 return Err(CopyError::OutOfMemory);
1073 }
1074
1075 if let Some(old_dest) = self.entries.remove(dest) {
1077 self.memory.remove(dest, &old_dest.value);
1078 self.decrement_expiry_if_set(&old_dest);
1079 self.defer_drop(old_dest.value);
1080 }
1081
1082 self.memory.add(dest, &cloned_value);
1084 let has_expiry = cloned_expire.is_some();
1085 if has_expiry {
1086 self.expiry_count += 1;
1087 }
1088 let mut entry = Entry::new(cloned_value, None);
1089 if let Some(ts) = cloned_expire {
1091 entry.expires_at_ms = ts;
1092 }
1093 self.entries.insert(CompactString::from(dest), entry);
1094 self.bump_version(dest);
1095 Ok(true)
1096 }
1097
    /// Returns a snapshot of the keyspace counters.
    ///
    /// Key count and used bytes come from the memory tracker; the remaining
    /// fields are the keyspace's own running counters.
    pub fn stats(&self) -> KeyspaceStats {
        KeyspaceStats {
            key_count: self.memory.key_count(),
            used_bytes: self.memory.used_bytes(),
            keys_with_expiry: self.expiry_count,
            keys_expired: self.expired_total,
            keys_evicted: self.evicted_total,
            oom_rejections: self.oom_rejections,
        }
    }
1111
    /// Number of stored entries, including any that have expired but not yet been purged.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
1116
1117 pub fn clear(&mut self) {
1119 self.entries.clear();
1120 self.memory.reset();
1121 self.expiry_count = 0;
1122 self.versions.clear();
1123 }
1124
    /// Reports whether no entries are stored (expired-but-unpurged entries still count).
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
1129
1130 pub fn scan_keys(
1135 &self,
1136 cursor: u64,
1137 count: usize,
1138 pattern: Option<&str>,
1139 ) -> (u64, Vec<String>) {
1140 let mut keys = Vec::with_capacity(count);
1141 let mut position = 0u64;
1142 let target_count = if count == 0 { 10 } else { count };
1143
1144 let compiled = pattern.map(GlobPattern::new);
1145
1146 for (key, entry) in self.entries.iter() {
1147 if entry.is_expired() {
1149 continue;
1150 }
1151
1152 if position < cursor {
1154 position += 1;
1155 continue;
1156 }
1157
1158 if let Some(ref pat) = compiled {
1160 if !pat.matches(key) {
1161 position += 1;
1162 continue;
1163 }
1164 }
1165
1166 keys.push(String::from(&**key));
1167 position += 1;
1168
1169 if keys.len() >= target_count {
1170 return (position, keys);
1172 }
1173 }
1174
1175 (0, keys)
1177 }
1178
1179 pub fn dump(&mut self, key: &str) -> Option<(&Value, i64)> {
1185 if self.remove_if_expired(key) {
1186 return None;
1187 }
1188 let entry = self.entries.get(key)?;
1189 let ttl_ms = match time::remaining_ms(entry.expires_at_ms) {
1190 Some(ms) => ms.min(i64::MAX as u64) as i64,
1191 None => -1,
1192 };
1193 Some((&entry.value, ttl_ms))
1194 }
1195
1196 pub fn iter_entries(&self) -> impl Iterator<Item = (&str, &Value, i64)> {
1200 self.entries.iter().filter_map(move |(key, entry)| {
1201 if entry.is_expired() {
1202 return None;
1203 }
1204 let ttl_ms = match time::remaining_ms(entry.expires_at_ms) {
1205 Some(ms) => ms.min(i64::MAX as u64) as i64,
1206 None => -1,
1207 };
1208 Some((&**key, &entry.value, ttl_ms))
1209 })
1210 }
1211
1212 pub fn restore(&mut self, key: String, value: Value, ttl: Option<Duration>) {
1218 let has_expiry = ttl.is_some();
1219
1220 if let Some(old) = self.entries.get(key.as_str()) {
1222 self.memory.replace(&key, &old.value, &value);
1223 self.adjust_expiry_count(old.expires_at_ms != 0, has_expiry);
1224 } else {
1225 self.memory.add(&key, &value);
1226 if has_expiry {
1227 self.expiry_count += 1;
1228 }
1229 }
1230
1231 let entry = Entry::new(value, ttl);
1232 self.entries.insert(CompactString::from(key.clone()), entry);
1233 self.bump_version(&key);
1234 }
1235
1236 pub fn expire_sample(&mut self, count: usize) -> usize {
1241 if self.entries.is_empty() {
1242 return 0;
1243 }
1244
1245 let mut rng = rand::rng();
1246
1247 let keys_to_check: Vec<String> = self
1248 .entries
1249 .keys()
1250 .choose_multiple(&mut rng, count)
1251 .into_iter()
1252 .map(|k| String::from(&**k))
1253 .collect();
1254
1255 let mut removed = 0;
1256 for key in &keys_to_check {
1257 if self.remove_if_expired(key) {
1258 removed += 1;
1259 }
1260 }
1261 removed
1262 }
1263
1264 fn remove_expired_entry(&mut self, key: &str) {
1268 if let Some(entry) = self.entries.remove(key) {
1269 self.memory.remove(key, &entry.value);
1270 self.decrement_expiry_if_set(&entry);
1271 self.expired_total += 1;
1272 self.remove_version(key);
1273 self.defer_drop(entry.value);
1274 }
1275 }
1276
1277 fn remove_if_expired(&mut self, key: &str) -> bool {
1280 let expired = self
1281 .entries
1282 .get(key)
1283 .map(|e| e.is_expired())
1284 .unwrap_or(false);
1285
1286 if expired {
1287 if let Some(entry) = self.entries.remove(key) {
1288 self.memory.remove(key, &entry.value);
1289 self.decrement_expiry_if_set(&entry);
1290 self.expired_total += 1;
1291 self.remove_version(key);
1292 self.defer_drop(entry.value);
1293 }
1294 }
1295 expired
1296 }
1297
1298 fn get_live_entry(&mut self, key: &str) -> Option<&mut Entry> {
1310 self.remove_if_expired(key);
1311 let entry = self.entries.get_mut(key)?;
1312 entry.touch(self.track_access);
1313 Some(entry)
1314 }
1315
1316 fn defer_drop(&self, value: Value) {
1319 if let Some(ref handle) = self.drop_handle {
1320 handle.defer_value(value);
1321 }
1322 }
1323}
1324
/// The default keyspace is the same as [`Keyspace::new`].
impl Default for Keyspace {
    fn default() -> Self {
        Self::new()
    }
}
1330
1331pub(crate) fn format_float(val: f64) -> String {
1336 if val == 0.0 {
1337 return "0".into();
1338 }
1339 let s = format!("{:.17e}", val);
1341 let reparsed: f64 = s.parse().unwrap_or(val);
1343 if reparsed == reparsed.trunc() && reparsed >= i64::MIN as f64 && reparsed <= i64::MAX as f64 {
1345 format!("{}", reparsed as i64)
1346 } else {
1347 let formatted = format!("{}", reparsed);
1349 formatted
1350 }
1351}
1352
1353pub(crate) fn glob_match(pattern: &str, text: &str) -> bool {
1368 let pat: Vec<char> = pattern.chars().collect();
1369 glob_match_compiled(&pat, text)
1370}
1371
1372pub(crate) struct GlobPattern {
1376 chars: Vec<char>,
1377}
1378
1379impl GlobPattern {
1380 pub(crate) fn new(pattern: &str) -> Self {
1381 Self {
1382 chars: pattern.chars().collect(),
1383 }
1384 }
1385
1386 pub(crate) fn matches(&self, text: &str) -> bool {
1387 glob_match_compiled(&self.chars, text)
1388 }
1389}
1390
/// Matches `text` against an already-decoded glob pattern.
///
/// Supported syntax:
/// * `*` — any run of characters, including none;
/// * `?` — exactly one character;
/// * `[abc]` — one character from the listed set; a leading `^` or `!` negates it;
/// * any other character matches itself.
///
/// A `[` without a closing `]` never matches. Matching is iterative: on a
/// mismatch the most recent `*` is extended by one character and the pattern
/// resumes right after it.
fn glob_match_compiled(pat: &[char], text: &str) -> bool {
    /// Tests `tc` against the bracket class opening at `pat[open]`.
    /// Returns the pattern index just past the closing `]` on a match, and
    /// `None` on a mismatch or when the class is unterminated.
    fn class_match(pat: &[char], open: usize, tc: char) -> Option<usize> {
        let mut idx = open + 1;
        let negated = matches!(pat.get(idx).copied(), Some('^' | '!'));
        if negated {
            idx += 1;
        }

        let mut listed = false;
        while let Some(&member) = pat.get(idx) {
            if member == ']' {
                break;
            }
            listed |= member == tc;
            idx += 1;
        }

        // `idx == pat.len()` means the closing bracket was never found.
        if listed != negated && idx < pat.len() {
            Some(idx + 1)
        } else {
            None
        }
    }

    let txt: Vec<char> = text.chars().collect();

    let mut pi = 0usize;
    let mut ti = 0usize;
    // (pattern index of the latest `*`, text index that `*` currently extends to)
    let mut star: Option<(usize, usize)> = None;

    while ti < txt.len() || pi < pat.len() {
        let here = txt.get(ti).copied();
        let stepped = match (pat.get(pi).copied(), here) {
            (Some('*'), _) => {
                star = Some((pi, ti));
                pi += 1;
                true
            }
            (Some('?'), Some(_)) => {
                pi += 1;
                ti += 1;
                true
            }
            (Some('['), Some(tc)) => match class_match(pat, pi, tc) {
                Some(next_pi) => {
                    pi = next_pi;
                    ti += 1;
                    true
                }
                None => false,
            },
            (Some(c), Some(tc)) if c == tc => {
                pi += 1;
                ti += 1;
                true
            }
            _ => false,
        };
        if stepped {
            continue;
        }

        // Mismatch: let the latest `*` swallow one more character, if there is one.
        let Some((star_pi, star_ti)) = star.as_mut() else {
            return false;
        };
        *star_ti += 1;
        pi = *star_pi + 1;
        ti = *star_ti;
        if ti > txt.len() {
            return false;
        }
    }

    // Whatever is left of the pattern may only be `*`.
    pat[pi..].iter().all(|&c| c == '*')
}
1476
1477#[cfg(test)]
1478mod tests {
1479 use super::*;
1480 use std::thread;
1481
1482 #[test]
1483 fn del_existing() {
1484 let mut ks = Keyspace::new();
1485 ks.set("key".into(), Bytes::from("val"), None, false, false);
1486 assert!(ks.del("key"));
1487 assert_eq!(ks.get("key").unwrap(), None);
1488 }
1489
1490 #[test]
1491 fn del_missing() {
1492 let mut ks = Keyspace::new();
1493 assert!(!ks.del("nope"));
1494 }
1495
1496 #[test]
1497 fn exists_present_and_absent() {
1498 let mut ks = Keyspace::new();
1499 ks.set("yes".into(), Bytes::from("here"), None, false, false);
1500 assert!(ks.exists("yes"));
1501 assert!(!ks.exists("no"));
1502 }
1503
1504 #[test]
1505 fn ttl_no_expiry() {
1506 let mut ks = Keyspace::new();
1507 ks.set("key".into(), Bytes::from("val"), None, false, false);
1508 assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
1509 }
1510
1511 #[test]
1512 fn ttl_not_found() {
1513 let mut ks = Keyspace::new();
1514 assert_eq!(ks.ttl("missing"), TtlResult::NotFound);
1515 }
1516
1517 #[test]
1518 fn ttl_with_expiry() {
1519 let mut ks = Keyspace::new();
1520 ks.set(
1521 "key".into(),
1522 Bytes::from("val"),
1523 Some(Duration::from_secs(100)),
1524 false,
1525 false,
1526 );
1527 match ks.ttl("key") {
1528 TtlResult::Seconds(s) => assert!((98..=100).contains(&s)),
1529 other => panic!("expected Seconds, got {other:?}"),
1530 }
1531 }
1532
1533 #[test]
1534 fn ttl_expired_key() {
1535 let mut ks = Keyspace::new();
1536 ks.set(
1537 "temp".into(),
1538 Bytes::from("val"),
1539 Some(Duration::from_millis(10)),
1540 false,
1541 false,
1542 );
1543 thread::sleep(Duration::from_millis(30));
1544 assert_eq!(ks.ttl("temp"), TtlResult::NotFound);
1545 }
1546
1547 #[test]
1548 fn expire_existing_key() {
1549 let mut ks = Keyspace::new();
1550 ks.set("key".into(), Bytes::from("val"), None, false, false);
1551 assert!(ks.expire("key", 60));
1552 match ks.ttl("key") {
1553 TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
1554 other => panic!("expected Seconds, got {other:?}"),
1555 }
1556 }
1557
1558 #[test]
1559 fn expire_missing_key() {
1560 let mut ks = Keyspace::new();
1561 assert!(!ks.expire("nope", 60));
1562 }
1563
1564 #[test]
1565 fn del_expired_key_returns_false() {
1566 let mut ks = Keyspace::new();
1567 ks.set(
1568 "temp".into(),
1569 Bytes::from("val"),
1570 Some(Duration::from_millis(10)),
1571 false,
1572 false,
1573 );
1574 thread::sleep(Duration::from_millis(30));
1575 assert!(!ks.del("temp"));
1577 }
1578
1579 #[test]
1582 fn memory_increases_on_set() {
1583 let mut ks = Keyspace::new();
1584 assert_eq!(ks.stats().used_bytes, 0);
1585 ks.set("key".into(), Bytes::from("value"), None, false, false);
1586 assert!(ks.stats().used_bytes > 0);
1587 assert_eq!(ks.stats().key_count, 1);
1588 }
1589
1590 #[test]
1591 fn memory_decreases_on_del() {
1592 let mut ks = Keyspace::new();
1593 ks.set("key".into(), Bytes::from("value"), None, false, false);
1594 let after_set = ks.stats().used_bytes;
1595 ks.del("key");
1596 assert_eq!(ks.stats().used_bytes, 0);
1597 assert!(after_set > 0);
1598 }
1599
1600 #[test]
1601 fn memory_adjusts_on_overwrite() {
1602 let mut ks = Keyspace::new();
1603 ks.set("key".into(), Bytes::from("short"), None, false, false);
1604 let small = ks.stats().used_bytes;
1605
1606 ks.set(
1607 "key".into(),
1608 Bytes::from("a much longer value"),
1609 None,
1610 false,
1611 false,
1612 );
1613 let large = ks.stats().used_bytes;
1614
1615 assert!(large > small);
1616 assert_eq!(ks.stats().key_count, 1);
1617 }
1618
1619 #[test]
1620 fn memory_decreases_on_expired_removal() {
1621 let mut ks = Keyspace::new();
1622 ks.set(
1623 "temp".into(),
1624 Bytes::from("data"),
1625 Some(Duration::from_millis(10)),
1626 false,
1627 false,
1628 );
1629 assert!(ks.stats().used_bytes > 0);
1630 thread::sleep(Duration::from_millis(30));
1631 let _ = ks.get("temp");
1633 assert_eq!(ks.stats().used_bytes, 0);
1634 assert_eq!(ks.stats().key_count, 0);
1635 }
1636
1637 #[test]
1638 fn stats_tracks_expiry_count() {
1639 let mut ks = Keyspace::new();
1640 ks.set("a".into(), Bytes::from("1"), None, false, false);
1641 ks.set(
1642 "b".into(),
1643 Bytes::from("2"),
1644 Some(Duration::from_secs(100)),
1645 false,
1646 false,
1647 );
1648 ks.set(
1649 "c".into(),
1650 Bytes::from("3"),
1651 Some(Duration::from_secs(200)),
1652 false,
1653 false,
1654 );
1655
1656 let stats = ks.stats();
1657 assert_eq!(stats.key_count, 3);
1658 assert_eq!(stats.keys_with_expiry, 2);
1659 }
1660
1661 #[test]
1664 fn noeviction_returns_oom_when_full() {
1665 let config = ShardConfig {
1668 max_memory: Some(130),
1669 eviction_policy: EvictionPolicy::NoEviction,
1670 ..ShardConfig::default()
1671 };
1672 let mut ks = Keyspace::with_config(config);
1673
1674 assert_eq!(
1676 ks.set("a".into(), Bytes::from("val"), None, false, false),
1677 SetResult::Ok
1678 );
1679
1680 let result = ks.set("b".into(), Bytes::from("val"), None, false, false);
1682 assert_eq!(result, SetResult::OutOfMemory);
1683
1684 assert!(ks.exists("a"));
1686 }
1687
1688 #[test]
1689 fn lru_eviction_makes_room() {
1690 let config = ShardConfig {
1691 max_memory: Some(130),
1692 eviction_policy: EvictionPolicy::AllKeysLru,
1693 ..ShardConfig::default()
1694 };
1695 let mut ks = Keyspace::with_config(config);
1696
1697 assert_eq!(
1698 ks.set("a".into(), Bytes::from("val"), None, false, false),
1699 SetResult::Ok
1700 );
1701
1702 assert_eq!(
1704 ks.set("b".into(), Bytes::from("val"), None, false, false),
1705 SetResult::Ok
1706 );
1707
1708 assert!(!ks.exists("a"));
1710 assert!(ks.exists("b"));
1711 }
1712
1713 #[test]
1714 fn safety_margin_rejects_near_raw_limit() {
1715 let config = ShardConfig {
1720 max_memory: Some(120),
1721 eviction_policy: EvictionPolicy::NoEviction,
1722 ..ShardConfig::default()
1723 };
1724 let mut ks = Keyspace::with_config(config);
1725
1726 assert_eq!(
1727 ks.set("a".into(), Bytes::from("val"), None, false, false),
1728 SetResult::Ok
1729 );
1730
1731 let result = ks.set("b".into(), Bytes::from("val"), None, false, false);
1732 assert_eq!(result, SetResult::OutOfMemory);
1733 }
1734
1735 #[test]
1736 fn overwrite_same_size_succeeds_at_limit() {
1737 let config = ShardConfig {
1738 max_memory: Some(130),
1739 eviction_policy: EvictionPolicy::NoEviction,
1740 ..ShardConfig::default()
1741 };
1742 let mut ks = Keyspace::with_config(config);
1743
1744 assert_eq!(
1745 ks.set("a".into(), Bytes::from("val"), None, false, false),
1746 SetResult::Ok
1747 );
1748
1749 assert_eq!(
1751 ks.set("a".into(), Bytes::from("new"), None, false, false),
1752 SetResult::Ok
1753 );
1754 assert_eq!(
1755 ks.get("a").unwrap(),
1756 Some(Value::String(Bytes::from("new")))
1757 );
1758 }
1759
1760 #[test]
1761 fn overwrite_larger_value_respects_limit() {
1762 let config = ShardConfig {
1763 max_memory: Some(130),
1764 eviction_policy: EvictionPolicy::NoEviction,
1765 ..ShardConfig::default()
1766 };
1767 let mut ks = Keyspace::with_config(config);
1768
1769 assert_eq!(
1770 ks.set("a".into(), Bytes::from("val"), None, false, false),
1771 SetResult::Ok
1772 );
1773
1774 let big_value = "x".repeat(200);
1776 let result = ks.set("a".into(), Bytes::from(big_value), None, false, false);
1777 assert_eq!(result, SetResult::OutOfMemory);
1778
1779 assert_eq!(
1781 ks.get("a").unwrap(),
1782 Some(Value::String(Bytes::from("val")))
1783 );
1784 }
1785
1786 #[test]
1789 fn iter_entries_returns_live_entries() {
1790 let mut ks = Keyspace::new();
1791 ks.set("a".into(), Bytes::from("1"), None, false, false);
1792 ks.set(
1793 "b".into(),
1794 Bytes::from("2"),
1795 Some(Duration::from_secs(100)),
1796 false,
1797 false,
1798 );
1799
1800 let entries: Vec<_> = ks.iter_entries().collect();
1801 assert_eq!(entries.len(), 2);
1802 }
1803
1804 #[test]
1805 fn iter_entries_skips_expired() {
1806 let mut ks = Keyspace::new();
1807 ks.set(
1808 "dead".into(),
1809 Bytes::from("gone"),
1810 Some(Duration::from_millis(1)),
1811 false,
1812 false,
1813 );
1814 ks.set("alive".into(), Bytes::from("here"), None, false, false);
1815 thread::sleep(Duration::from_millis(10));
1816
1817 let entries: Vec<_> = ks.iter_entries().collect();
1818 assert_eq!(entries.len(), 1);
1819 assert_eq!(entries[0].0, "alive");
1820 }
1821
1822 #[test]
1823 fn iter_entries_ttl_for_no_expiry() {
1824 let mut ks = Keyspace::new();
1825 ks.set("permanent".into(), Bytes::from("val"), None, false, false);
1826
1827 let entries: Vec<_> = ks.iter_entries().collect();
1828 assert_eq!(entries[0].2, -1);
1829 }
1830
1831 #[test]
1834 fn restore_adds_entry() {
1835 let mut ks = Keyspace::new();
1836 ks.restore("restored".into(), Value::String(Bytes::from("data")), None);
1837 assert_eq!(
1838 ks.get("restored").unwrap(),
1839 Some(Value::String(Bytes::from("data")))
1840 );
1841 assert_eq!(ks.stats().key_count, 1);
1842 }
1843
1844 #[test]
1845 fn restore_with_zero_ttl_expires_immediately() {
1846 let mut ks = Keyspace::new();
1847 ks.restore(
1849 "short-lived".into(),
1850 Value::String(Bytes::from("data")),
1851 Some(Duration::from_millis(1)),
1852 );
1853 std::thread::sleep(Duration::from_millis(5));
1855 assert!(ks.get("short-lived").is_err() || ks.get("short-lived").unwrap().is_none());
1856 }
1857
1858 #[test]
1859 fn restore_overwrites_existing() {
1860 let mut ks = Keyspace::new();
1861 ks.set("key".into(), Bytes::from("old"), None, false, false);
1862 ks.restore("key".into(), Value::String(Bytes::from("new")), None);
1863 assert_eq!(
1864 ks.get("key").unwrap(),
1865 Some(Value::String(Bytes::from("new")))
1866 );
1867 assert_eq!(ks.stats().key_count, 1);
1868 }
1869
1870 #[test]
1871 fn restore_bypasses_memory_limit() {
1872 let config = ShardConfig {
1873 max_memory: Some(50), eviction_policy: EvictionPolicy::NoEviction,
1875 ..ShardConfig::default()
1876 };
1877 let mut ks = Keyspace::with_config(config);
1878
1879 ks.restore(
1881 "big".into(),
1882 Value::String(Bytes::from("x".repeat(200))),
1883 None,
1884 );
1885 assert_eq!(ks.stats().key_count, 1);
1886 }
1887
1888 #[test]
1889 fn no_limit_never_rejects() {
1890 let mut ks = Keyspace::new();
1892 for i in 0..100 {
1893 assert_eq!(
1894 ks.set(format!("key:{i}"), Bytes::from("value"), None, false, false),
1895 SetResult::Ok
1896 );
1897 }
1898 assert_eq!(ks.len(), 100);
1899 }
1900
1901 #[test]
1902 fn clear_removes_all_keys() {
1903 let mut ks = Keyspace::new();
1904 ks.set("a".into(), Bytes::from("1"), None, false, false);
1905 ks.set(
1906 "b".into(),
1907 Bytes::from("2"),
1908 Some(Duration::from_secs(60)),
1909 false,
1910 false,
1911 );
1912 ks.lpush("list", &[Bytes::from("x")]).unwrap();
1913
1914 assert_eq!(ks.len(), 3);
1915 assert!(ks.stats().used_bytes > 0);
1916 assert_eq!(ks.stats().keys_with_expiry, 1);
1917
1918 ks.clear();
1919
1920 assert_eq!(ks.len(), 0);
1921 assert!(ks.is_empty());
1922 assert_eq!(ks.stats().used_bytes, 0);
1923 assert_eq!(ks.stats().keys_with_expiry, 0);
1924 }
1925
1926 #[test]
1929 fn scan_returns_keys() {
1930 let mut ks = Keyspace::new();
1931 ks.set("key1".into(), Bytes::from("a"), None, false, false);
1932 ks.set("key2".into(), Bytes::from("b"), None, false, false);
1933 ks.set("key3".into(), Bytes::from("c"), None, false, false);
1934
1935 let (cursor, keys) = ks.scan_keys(0, 10, None);
1936 assert_eq!(cursor, 0); assert_eq!(keys.len(), 3);
1938 }
1939
1940 #[test]
1941 fn scan_empty_keyspace() {
1942 let ks = Keyspace::new();
1943 let (cursor, keys) = ks.scan_keys(0, 10, None);
1944 assert_eq!(cursor, 0);
1945 assert!(keys.is_empty());
1946 }
1947
1948 #[test]
1949 fn scan_with_pattern() {
1950 let mut ks = Keyspace::new();
1951 ks.set("user:1".into(), Bytes::from("a"), None, false, false);
1952 ks.set("user:2".into(), Bytes::from("b"), None, false, false);
1953 ks.set("item:1".into(), Bytes::from("c"), None, false, false);
1954
1955 let (cursor, keys) = ks.scan_keys(0, 10, Some("user:*"));
1956 assert_eq!(cursor, 0);
1957 assert_eq!(keys.len(), 2);
1958 for k in &keys {
1959 assert!(k.starts_with("user:"));
1960 }
1961 }
1962
1963 #[test]
1964 fn scan_with_count_limit() {
1965 let mut ks = Keyspace::new();
1966 for i in 0..10 {
1967 ks.set(format!("k{i}"), Bytes::from("v"), None, false, false);
1968 }
1969
1970 let (cursor, keys) = ks.scan_keys(0, 3, None);
1972 assert!(!keys.is_empty());
1973 assert!(keys.len() <= 3);
1974
1975 if cursor != 0 {
1977 let (cursor2, keys2) = ks.scan_keys(cursor, 3, None);
1978 assert!(!keys2.is_empty());
1979 let _ = (cursor2, keys2);
1981 }
1982 }
1983
1984 #[test]
1985 fn scan_skips_expired_keys() {
1986 let mut ks = Keyspace::new();
1987 ks.set("live".into(), Bytes::from("a"), None, false, false);
1988 ks.set(
1989 "expired".into(),
1990 Bytes::from("b"),
1991 Some(Duration::from_millis(1)),
1992 false,
1993 false,
1994 );
1995
1996 std::thread::sleep(Duration::from_millis(5));
1997
1998 let (_, keys) = ks.scan_keys(0, 10, None);
1999 assert_eq!(keys.len(), 1);
2000 assert_eq!(keys[0], "live");
2001 }
2002
2003 #[test]
2004 fn glob_match_star() {
2005 assert!(super::glob_match("user:*", "user:123"));
2006 assert!(super::glob_match("user:*", "user:"));
2007 assert!(super::glob_match("*:data", "foo:data"));
2008 assert!(!super::glob_match("user:*", "item:123"));
2009 }
2010
2011 #[test]
2012 fn glob_match_question() {
2013 assert!(super::glob_match("key?", "key1"));
2014 assert!(super::glob_match("key?", "keya"));
2015 assert!(!super::glob_match("key?", "key"));
2016 assert!(!super::glob_match("key?", "key12"));
2017 }
2018
2019 #[test]
2020 fn glob_match_brackets() {
2021 assert!(super::glob_match("key[abc]", "keya"));
2022 assert!(super::glob_match("key[abc]", "keyb"));
2023 assert!(!super::glob_match("key[abc]", "keyd"));
2024 }
2025
2026 #[test]
2027 fn glob_match_literal() {
2028 assert!(super::glob_match("exact", "exact"));
2029 assert!(!super::glob_match("exact", "exactnot"));
2030 assert!(!super::glob_match("exact", "notexact"));
2031 }
2032
2033 #[test]
2036 fn persist_removes_expiry() {
2037 let mut ks = Keyspace::new();
2038 ks.set(
2039 "key".into(),
2040 Bytes::from("val"),
2041 Some(Duration::from_secs(60)),
2042 false,
2043 false,
2044 );
2045 assert!(matches!(ks.ttl("key"), TtlResult::Seconds(_)));
2046
2047 assert!(ks.persist("key"));
2048 assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
2049 assert_eq!(ks.stats().keys_with_expiry, 0);
2050 }
2051
2052 #[test]
2053 fn persist_returns_false_without_expiry() {
2054 let mut ks = Keyspace::new();
2055 ks.set("key".into(), Bytes::from("val"), None, false, false);
2056 assert!(!ks.persist("key"));
2057 }
2058
2059 #[test]
2060 fn persist_returns_false_for_missing_key() {
2061 let mut ks = Keyspace::new();
2062 assert!(!ks.persist("missing"));
2063 }
2064
2065 #[test]
2066 fn pttl_returns_milliseconds() {
2067 let mut ks = Keyspace::new();
2068 ks.set(
2069 "key".into(),
2070 Bytes::from("val"),
2071 Some(Duration::from_secs(60)),
2072 false,
2073 false,
2074 );
2075 match ks.pttl("key") {
2076 TtlResult::Milliseconds(ms) => assert!(ms > 59_000 && ms <= 60_000),
2077 other => panic!("expected Milliseconds, got {other:?}"),
2078 }
2079 }
2080
2081 #[test]
2082 fn pttl_no_expiry() {
2083 let mut ks = Keyspace::new();
2084 ks.set("key".into(), Bytes::from("val"), None, false, false);
2085 assert_eq!(ks.pttl("key"), TtlResult::NoExpiry);
2086 }
2087
2088 #[test]
2089 fn pttl_not_found() {
2090 let mut ks = Keyspace::new();
2091 assert_eq!(ks.pttl("missing"), TtlResult::NotFound);
2092 }
2093
2094 #[test]
2095 fn pexpire_sets_ttl_in_millis() {
2096 let mut ks = Keyspace::new();
2097 ks.set("key".into(), Bytes::from("val"), None, false, false);
2098 assert!(ks.pexpire("key", 5000));
2099 match ks.pttl("key") {
2100 TtlResult::Milliseconds(ms) => assert!(ms > 4000 && ms <= 5000),
2101 other => panic!("expected Milliseconds, got {other:?}"),
2102 }
2103 assert_eq!(ks.stats().keys_with_expiry, 1);
2104 }
2105
2106 #[test]
2107 fn pexpire_missing_key_returns_false() {
2108 let mut ks = Keyspace::new();
2109 assert!(!ks.pexpire("missing", 5000));
2110 }
2111
2112 #[test]
2113 fn pexpire_overwrites_existing_ttl() {
2114 let mut ks = Keyspace::new();
2115 ks.set(
2116 "key".into(),
2117 Bytes::from("val"),
2118 Some(Duration::from_secs(60)),
2119 false,
2120 false,
2121 );
2122 assert!(ks.pexpire("key", 500));
2123 match ks.pttl("key") {
2124 TtlResult::Milliseconds(ms) => assert!(ms <= 500),
2125 other => panic!("expected Milliseconds, got {other:?}"),
2126 }
2127 assert_eq!(ks.stats().keys_with_expiry, 1);
2129 }
2130
2131 #[test]
2134 fn keys_match_all() {
2135 let mut ks = Keyspace::new();
2136 ks.set("a".into(), Bytes::from("1"), None, false, false);
2137 ks.set("b".into(), Bytes::from("2"), None, false, false);
2138 ks.set("c".into(), Bytes::from("3"), None, false, false);
2139 let mut result = ks.keys("*");
2140 result.sort();
2141 assert_eq!(result, vec!["a", "b", "c"]);
2142 }
2143
2144 #[test]
2145 fn keys_with_pattern() {
2146 let mut ks = Keyspace::new();
2147 ks.set("user:1".into(), Bytes::from("a"), None, false, false);
2148 ks.set("user:2".into(), Bytes::from("b"), None, false, false);
2149 ks.set("item:1".into(), Bytes::from("c"), None, false, false);
2150 let mut result = ks.keys("user:*");
2151 result.sort();
2152 assert_eq!(result, vec!["user:1", "user:2"]);
2153 }
2154
2155 #[test]
2156 fn keys_skips_expired() {
2157 let mut ks = Keyspace::new();
2158 ks.set("live".into(), Bytes::from("a"), None, false, false);
2159 ks.set(
2160 "dead".into(),
2161 Bytes::from("b"),
2162 Some(Duration::from_millis(1)),
2163 false,
2164 false,
2165 );
2166 thread::sleep(Duration::from_millis(5));
2167 let result = ks.keys("*");
2168 assert_eq!(result, vec!["live"]);
2169 }
2170
2171 #[test]
2172 fn keys_empty_keyspace() {
2173 let ks = Keyspace::new();
2174 assert!(ks.keys("*").is_empty());
2175 }
2176
2177 #[test]
2180 fn rename_basic() {
2181 let mut ks = Keyspace::new();
2182 ks.set("old".into(), Bytes::from("value"), None, false, false);
2183 ks.rename("old", "new").unwrap();
2184 assert!(!ks.exists("old"));
2185 assert_eq!(
2186 ks.get("new").unwrap(),
2187 Some(Value::String(Bytes::from("value")))
2188 );
2189 }
2190
2191 #[test]
2192 fn rename_preserves_expiry() {
2193 let mut ks = Keyspace::new();
2194 ks.set(
2195 "old".into(),
2196 Bytes::from("val"),
2197 Some(Duration::from_secs(60)),
2198 false,
2199 false,
2200 );
2201 ks.rename("old", "new").unwrap();
2202 match ks.ttl("new") {
2203 TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
2204 other => panic!("expected TTL preserved, got {other:?}"),
2205 }
2206 }
2207
2208 #[test]
2209 fn rename_overwrites_destination() {
2210 let mut ks = Keyspace::new();
2211 ks.set("src".into(), Bytes::from("new_val"), None, false, false);
2212 ks.set("dst".into(), Bytes::from("old_val"), None, false, false);
2213 ks.rename("src", "dst").unwrap();
2214 assert!(!ks.exists("src"));
2215 assert_eq!(
2216 ks.get("dst").unwrap(),
2217 Some(Value::String(Bytes::from("new_val")))
2218 );
2219 assert_eq!(ks.len(), 1);
2220 }
2221
2222 #[test]
2223 fn rename_missing_key_returns_error() {
2224 let mut ks = Keyspace::new();
2225 let err = ks.rename("missing", "new").unwrap_err();
2226 assert_eq!(err, RenameError::NoSuchKey);
2227 }
2228
2229 #[test]
2230 fn rename_same_key() {
2231 let mut ks = Keyspace::new();
2232 ks.set("key".into(), Bytes::from("val"), None, false, false);
2233 ks.rename("key", "key").unwrap();
2235 assert_eq!(
2236 ks.get("key").unwrap(),
2237 Some(Value::String(Bytes::from("val")))
2238 );
2239 }
2240
2241 #[test]
2242 fn rename_tracks_memory() {
2243 let mut ks = Keyspace::new();
2244 ks.set("old".into(), Bytes::from("value"), None, false, false);
2245 let before = ks.stats().used_bytes;
2246 ks.rename("old", "new").unwrap();
2247 let after = ks.stats().used_bytes;
2248 assert_eq!(before, after);
2250 assert_eq!(ks.stats().key_count, 1);
2251 }
2252
2253 #[test]
2254 fn zero_ttl_expires_immediately() {
2255 let mut ks = Keyspace::new();
2256 ks.set(
2257 "key".into(),
2258 Bytes::from("val"),
2259 Some(Duration::ZERO),
2260 false,
2261 false,
2262 );
2263
2264 std::thread::sleep(Duration::from_millis(1));
2266 assert!(ks.get("key").unwrap().is_none());
2267 }
2268
2269 #[test]
2270 fn very_small_ttl_expires_quickly() {
2271 let mut ks = Keyspace::new();
2272 ks.set(
2273 "key".into(),
2274 Bytes::from("val"),
2275 Some(Duration::from_millis(1)),
2276 false,
2277 false,
2278 );
2279
2280 std::thread::sleep(Duration::from_millis(5));
2281 assert!(ks.get("key").unwrap().is_none());
2282 }
2283
2284 #[test]
2285 fn count_keys_in_slot_empty() {
2286 let ks = Keyspace::new();
2287 assert_eq!(ks.count_keys_in_slot(0), 0);
2288 }
2289
2290 #[test]
2291 fn count_keys_in_slot_matches() {
2292 let mut ks = Keyspace::new();
2293 ks.set("a".into(), Bytes::from("1"), None, false, false);
2295 ks.set("b".into(), Bytes::from("2"), None, false, false);
2296 ks.set("c".into(), Bytes::from("3"), None, false, false);
2297
2298 let slot_a = ember_cluster::key_slot(b"a");
2299 let count = ks.count_keys_in_slot(slot_a);
2300 assert!(count >= 1);
2302 }
2303
2304 #[test]
2305 fn count_keys_in_slot_skips_expired() {
2306 let mut ks = Keyspace::new();
2307 let slot = ember_cluster::key_slot(b"temp");
2308 ks.set(
2309 "temp".into(),
2310 Bytes::from("gone"),
2311 Some(Duration::from_millis(0)),
2312 false,
2313 false,
2314 );
2315 thread::sleep(Duration::from_millis(5));
2317 assert_eq!(ks.count_keys_in_slot(slot), 0);
2318 }
2319
2320 #[test]
2321 fn get_keys_in_slot_returns_matching() {
2322 let mut ks = Keyspace::new();
2323 ks.set("x".into(), Bytes::from("1"), None, false, false);
2324 ks.set("y".into(), Bytes::from("2"), None, false, false);
2325
2326 let slot_x = ember_cluster::key_slot(b"x");
2327 let keys = ks.get_keys_in_slot(slot_x, 100);
2328 assert!(keys.contains(&"x".to_string()));
2329 }
2330
2331 #[test]
2332 fn get_keys_in_slot_respects_count_limit() {
2333 let mut ks = Keyspace::new();
2334 for i in 0..100 {
2336 ks.set(format!("key:{i}"), Bytes::from("v"), None, false, false);
2337 }
2338 let keys = ks.get_keys_in_slot(0, 3);
2340 assert!(keys.len() <= 3);
2341 }
2342
2343 #[test]
2350 fn key_version_returns_none_for_missing() {
2351 let mut ks = Keyspace::new();
2352 assert_eq!(ks.key_version("nope"), None);
2353 }
2354
2355 #[test]
2356 fn key_version_changes_on_set() {
2357 let mut ks = Keyspace::new();
2358 ks.set("k".into(), Bytes::from("v1"), None, false, false);
2359 let v1 = ks.key_version("k").expect("key should exist");
2361 ks.set("k".into(), Bytes::from("v2"), None, false, false);
2362 let v2 = ks.key_version("k").expect("key should exist");
2363 assert!(v2 > v1, "version should increase on overwrite");
2364 }
2365
2366 #[test]
2367 fn key_version_none_after_del() {
2368 let mut ks = Keyspace::new();
2369 ks.set("k".into(), Bytes::from("v"), None, false, false);
2370 assert!(ks.key_version("k").is_some());
2371 ks.del("k");
2372 assert_eq!(ks.key_version("k"), None);
2373 }
2374
2375 #[test]
2376 fn key_version_changes_on_list_push() {
2377 let mut ks = Keyspace::new();
2378 ks.lpush("list", &[Bytes::from("a")]).unwrap();
2379 let v1 = ks.key_version("list").expect("list should exist");
2380 ks.rpush("list", &[Bytes::from("b")]).unwrap();
2381 let v2 = ks.key_version("list").expect("list should exist");
2382 assert!(v2 > v1, "version should increase on rpush");
2383 }
2384
2385 #[test]
2386 fn key_version_changes_on_hash_set() {
2387 let mut ks = Keyspace::new();
2388 ks.hset("h", &[("f1".into(), Bytes::from("v1"))]).unwrap();
2389 let v1 = ks.key_version("h").expect("hash should exist");
2390 ks.hset("h", &[("f2".into(), Bytes::from("v2"))]).unwrap();
2391 let v2 = ks.key_version("h").expect("hash should exist");
2392 assert!(v2 > v1, "version should increase on hset");
2393 }
2394
2395 #[test]
2396 fn key_version_changes_on_expire() {
2397 let mut ks = Keyspace::new();
2398 ks.set("k".into(), Bytes::from("v"), None, false, false);
2399 let v1 = ks.key_version("k").expect("key should exist");
2400 ks.expire("k", 100);
2401 let v2 = ks.key_version("k").expect("key should exist");
2402 assert!(v2 > v1, "version should increase on expire");
2403 }
2404
2405 #[test]
2406 fn key_version_stable_without_watch() {
2407 let mut ks = Keyspace::new();
2410 ks.set("a".into(), Bytes::from("1"), None, false, false);
2411 ks.set("a".into(), Bytes::from("2"), None, false, false);
2412 let v1 = ks.key_version("a").unwrap();
2414 let v2 = ks.key_version("a").unwrap();
2416 assert_eq!(v1, v2, "version should be stable without mutations");
2417 }
2418
2419 #[test]
2422 fn copy_basic() {
2423 let mut ks = Keyspace::new();
2424 ks.set("src".into(), Bytes::from("hello"), None, false, false);
2425 assert_eq!(ks.copy("src", "dst", false), Ok(true));
2426 assert_eq!(
2427 ks.get("dst").unwrap(),
2428 Some(Value::String(Bytes::from("hello")))
2429 );
2430 assert!(ks.exists("src"));
2432 }
2433
2434 #[test]
2435 fn copy_preserves_expiry() {
2436 let mut ks = Keyspace::new();
2437 ks.set(
2438 "src".into(),
2439 Bytes::from("val"),
2440 Some(Duration::from_secs(60)),
2441 false,
2442 false,
2443 );
2444 assert_eq!(ks.copy("src", "dst", false), Ok(true));
2445 match ks.ttl("dst") {
2446 TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
2447 other => panic!("expected TTL preserved, got {other:?}"),
2448 }
2449 }
2450
2451 #[test]
2452 fn copy_no_replace_returns_false() {
2453 let mut ks = Keyspace::new();
2454 ks.set("src".into(), Bytes::from("a"), None, false, false);
2455 ks.set("dst".into(), Bytes::from("b"), None, false, false);
2456 assert_eq!(ks.copy("src", "dst", false), Ok(false));
2457 assert_eq!(
2459 ks.get("dst").unwrap(),
2460 Some(Value::String(Bytes::from("b")))
2461 );
2462 }
2463
2464 #[test]
2465 fn copy_replace_overwrites() {
2466 let mut ks = Keyspace::new();
2467 ks.set("src".into(), Bytes::from("new"), None, false, false);
2468 ks.set("dst".into(), Bytes::from("old"), None, false, false);
2469 assert_eq!(ks.copy("src", "dst", true), Ok(true));
2470 assert_eq!(
2471 ks.get("dst").unwrap(),
2472 Some(Value::String(Bytes::from("new")))
2473 );
2474 }
2475
2476 #[test]
2477 fn copy_missing_source() {
2478 let mut ks = Keyspace::new();
2479 assert_eq!(ks.copy("missing", "dst", false), Err(CopyError::NoSuchKey));
2480 }
2481
2482 #[test]
2483 fn copy_tracks_memory() {
2484 let mut ks = Keyspace::new();
2485 ks.set("src".into(), Bytes::from("value"), None, false, false);
2486 let before = ks.stats().used_bytes;
2487 ks.copy("src", "dst", false).unwrap();
2488 let after = ks.stats().used_bytes;
2489 assert!(after > before);
2491 assert_eq!(ks.stats().key_count, 2);
2492 }
2493
2494 #[test]
2497 fn random_key_empty() {
2498 let mut ks = Keyspace::new();
2499 assert_eq!(ks.random_key(), None);
2500 }
2501
2502 #[test]
2503 fn random_key_returns_existing() {
2504 let mut ks = Keyspace::new();
2505 ks.set("only".into(), Bytes::from("val"), None, false, false);
2506 assert_eq!(ks.random_key(), Some("only".into()));
2507 }
2508
2509 #[test]
2512 fn touch_existing_key() {
2513 let mut ks = Keyspace::new();
2514 ks.set("k".into(), Bytes::from("v"), None, false, false);
2515 assert!(ks.touch("k"));
2516 }
2517
2518 #[test]
2519 fn touch_missing_key() {
2520 let mut ks = Keyspace::new();
2521 assert!(!ks.touch("missing"));
2522 }
2523
2524 #[test]
2527 fn sort_list_numeric() {
2528 let mut ks = Keyspace::new();
2529 let _ = ks.lpush(
2530 "nums",
2531 &[Bytes::from("3"), Bytes::from("1"), Bytes::from("2")],
2532 );
2533 let result = ks.sort("nums", false, false, None).unwrap();
2534 assert_eq!(
2535 result,
2536 vec![Bytes::from("1"), Bytes::from("2"), Bytes::from("3")]
2537 );
2538 }
2539
2540 #[test]
2541 fn sort_list_alpha_desc() {
2542 let mut ks = Keyspace::new();
2543 let _ = ks.lpush(
2544 "words",
2545 &[
2546 Bytes::from("banana"),
2547 Bytes::from("apple"),
2548 Bytes::from("cherry"),
2549 ],
2550 );
2551 let result = ks.sort("words", true, true, None).unwrap();
2552 assert_eq!(
2553 result,
2554 vec![
2555 Bytes::from("cherry"),
2556 Bytes::from("banana"),
2557 Bytes::from("apple")
2558 ]
2559 );
2560 }
2561
2562 #[test]
2563 fn sort_with_limit() {
2564 let mut ks = Keyspace::new();
2565 let _ = ks.lpush(
2566 "nums",
2567 &[
2568 Bytes::from("4"),
2569 Bytes::from("3"),
2570 Bytes::from("2"),
2571 Bytes::from("1"),
2572 ],
2573 );
2574 let result = ks.sort("nums", false, false, Some((1, 2))).unwrap();
2575 assert_eq!(result, vec![Bytes::from("2"), Bytes::from("3")]);
2576 }
2577
2578 #[test]
2579 fn sort_set_alpha() {
2580 let mut ks = Keyspace::new();
2581 let members: Vec<String> = vec!["c".into(), "a".into(), "b".into()];
2582 let _ = ks.sadd("myset", &members);
2583 let result = ks.sort("myset", false, true, None).unwrap();
2584 assert_eq!(
2585 result,
2586 vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]
2587 );
2588 }
2589
2590 #[test]
2591 fn sort_missing_key() {
2592 let mut ks = Keyspace::new();
2593 let result = ks.sort("nope", false, false, None).unwrap();
2594 assert!(result.is_empty());
2595 }
2596
2597 #[test]
2598 fn sort_wrong_type() {
2599 let mut ks = Keyspace::new();
2600 ks.set("str".into(), Bytes::from("hello"), None, false, false);
2601 let result = ks.sort("str", false, false, None);
2602 assert!(result.is_err());
2603 }
2604}