1use std::collections::VecDeque;
9use std::time::Duration;
10
11use ahash::AHashMap;
12use bytes::Bytes;
13use compact_str::CompactString;
14use rand::seq::IteratorRandom;
15
16use tracing::warn;
17
18use crate::dropper::DropHandle;
19use crate::memory::{self, MemoryTracker};
20use crate::time;
21use crate::types::sorted_set::{ScoreBound, SortedSet, ZAddFlags};
22use crate::types::{self, normalize_range, Value};
23
24mod bitmap;
25mod hash;
26mod list;
27#[cfg(feature = "protobuf")]
28mod proto;
29mod set;
30mod string;
31#[cfg(feature = "vector")]
32mod vector;
33mod zset;
34
/// Error text used when a command targets a key holding a different value type.
const WRONGTYPE_MSG: &str = "WRONGTYPE Operation against a key holding the wrong kind of value";
/// Error text used when a write is rejected because the memory limit is exceeded.
const OOM_MSG: &str = "OOM command not allowed when used memory > 'maxmemory'";

/// Unit error signalling that a key holds a value of the wrong type.
///
/// Its `Display` output is the `WRONGTYPE` message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WrongType;

impl std::fmt::Display for WrongType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(WRONGTYPE_MSG)
    }
}

impl std::error::Error for WrongType {}
49
/// Error returned by write operations that may create or grow a value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WriteError {
    /// The key exists but holds a value of a different type.
    WrongType,
    /// The write was rejected because the memory limit could not be satisfied.
    OutOfMemory,
}

// Allows `?` to lift a bare `WrongType` into a `WriteError`.
impl From<WrongType> for WriteError {
    fn from(_: WrongType) -> Self {
        WriteError::WrongType
    }
}
65
66#[derive(Debug, Clone, PartialEq, Eq)]
68pub enum IncrError {
69 WrongType,
71 NotAnInteger,
73 Overflow,
75 OutOfMemory,
77}
78
79impl std::fmt::Display for IncrError {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 match self {
82 IncrError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
83 IncrError::NotAnInteger => write!(f, "ERR value is not an integer or out of range"),
84 IncrError::Overflow => write!(f, "ERR increment or decrement would overflow"),
85 IncrError::OutOfMemory => write!(f, "{OOM_MSG}"),
86 }
87 }
88}
89
90impl std::error::Error for IncrError {}
91
92#[derive(Debug, Clone, PartialEq)]
94pub enum IncrFloatError {
95 WrongType,
97 NotAFloat,
99 NanOrInfinity,
101 OutOfMemory,
103}
104
105impl std::fmt::Display for IncrFloatError {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 match self {
108 IncrFloatError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
109 IncrFloatError::NotAFloat => write!(f, "ERR value is not a valid float"),
110 IncrFloatError::NanOrInfinity => {
111 write!(f, "ERR increment would produce NaN or Infinity")
112 }
113 IncrFloatError::OutOfMemory => write!(f, "{OOM_MSG}"),
114 }
115 }
116}
117
118impl std::error::Error for IncrFloatError {}
119
/// Error returned by [`Keyspace::rename`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RenameError {
    /// The source key does not exist (or had already expired).
    NoSuchKey,
}
126
/// Error returned by [`Keyspace::copy`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CopyError {
    /// The source key does not exist (or had already expired).
    NoSuchKey,
    /// The copy was rejected because the memory limit could not be satisfied.
    OutOfMemory,
}
135
136#[derive(Debug, Clone, PartialEq, Eq)]
138pub enum LsetError {
139 WrongType,
141 NoSuchKey,
143 IndexOutOfRange,
145}
146
147impl std::fmt::Display for LsetError {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 match self {
150 LsetError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
151 LsetError::NoSuchKey => write!(f, "ERR no such key"),
152 LsetError::IndexOutOfRange => write!(f, "ERR index out of range"),
153 }
154 }
155}
156
157impl std::error::Error for LsetError {}
158
159impl std::fmt::Display for RenameError {
160 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161 match self {
162 RenameError::NoSuchKey => write!(f, "ERR no such key"),
163 }
164 }
165}
166
167impl std::error::Error for RenameError {}
168
/// Outcome of a sorted-set add operation.
// NOTE(review): field semantics are inferred from names only; the producing
// code lives in the `zset` submodule — confirm there.
#[derive(Debug, Clone)]
pub struct ZAddResult {
    /// Count reported back to the caller (presumably members added/changed — confirm).
    pub count: usize,
    /// `(score, member)` pairs that were actually applied (presumably for
    /// persistence/replication — confirm).
    pub applied: Vec<(f64, String)>,
}
179
/// Outcome of adding a single vector element (only with the `vector` feature).
// NOTE(review): field semantics are inferred from names; confirm in the `vector` submodule.
#[cfg(feature = "vector")]
#[derive(Debug, Clone)]
pub struct VAddResult {
    /// Name of the element.
    pub element: String,
    /// Vector data associated with the element.
    pub vector: Vec<f32>,
    /// Presumably `true` when the element was newly added rather than updated — confirm.
    pub added: bool,
}
191
/// Outcome of a batched vector add (only with the `vector` feature).
// NOTE(review): field semantics are inferred from names; confirm in the `vector` submodule.
#[cfg(feature = "vector")]
#[derive(Debug, Clone)]
pub struct VAddBatchResult {
    /// Number of elements counted as added.
    pub added_count: usize,
    /// `(element, vector)` pairs that were applied.
    pub applied: Vec<(String, Vec<f32>)>,
}
202
/// Errors from vector write operations (only with the `vector` feature).
#[cfg(feature = "vector")]
#[derive(Debug, Clone)]
pub enum VectorWriteError {
    /// The key holds a value of the wrong type.
    WrongType,
    /// The write was rejected because of the memory limit.
    OutOfMemory,
    /// An error reported by the vector index, carried as text.
    IndexError(String),
    /// A batch failed part-way through.
    // NOTE(review): presumably `applied` holds the pairs written before the
    // failure — confirm in the `vector` submodule.
    PartialBatch {
        /// Description of the failure.
        message: String,
        /// `(element, vector)` pairs recorded as applied.
        applied: Vec<(String, Vec<f32>)>,
    },
}
220
/// What to do when a write would exceed the configured memory limit.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum EvictionPolicy {
    /// Reject the write; never remove existing keys.
    #[default]
    NoEviction,
    /// Evict approximately-least-recently-used keys until the write fits.
    AllKeysLru,
}

/// Per-shard keyspace configuration.
///
/// The derived `Default` yields no memory limit, [`EvictionPolicy::NoEviction`]
/// and shard id `0` — exactly what the former hand-written impl produced.
#[derive(Debug, Clone, Default)]
pub struct ShardConfig {
    /// Memory limit in bytes; `None` disables limit enforcement.
    pub max_memory: Option<usize>,
    /// Policy applied when the limit would be exceeded.
    pub eviction_policy: EvictionPolicy,
    /// Identifier of the shard this configuration belongs to.
    pub shard_id: u16,
}
251
/// Outcome of a string `set` operation.
// NOTE(review): the producing code is in the `string` submodule; variant
// meanings below are inferred from names — confirm there.
#[derive(Debug, PartialEq, Eq)]
pub enum SetResult {
    /// The value was stored.
    Ok,
    /// The write was rejected because of the memory limit.
    OutOfMemory,
    /// The write was not performed (presumably an NX/XX condition failed — confirm).
    Blocked,
}
262
/// A stored value together with its bookkeeping metadata.
#[derive(Debug, Clone)]
pub(crate) struct Entry {
    /// The stored value.
    pub(crate) value: Value,
    /// Expiry timestamp in milliseconds; `0` means the entry never expires.
    pub(crate) expires_at_ms: u64,
    /// Cached result of `memory::value_size` for `value`, kept in sync by
    /// the keyspace's size-tracking helpers.
    pub(crate) cached_value_size: u32,
    /// Timestamp (seconds, from `time::now_secs`) of the last tracked access;
    /// compared by the eviction sampler.
    pub(crate) last_access_secs: u32,
}
289
290impl Entry {
291 fn new(value: Value, ttl: Option<Duration>) -> Self {
292 let cached_value_size = memory::value_size(&value) as u32;
293 Self {
294 value,
295 expires_at_ms: time::expiry_from_duration(ttl),
296 cached_value_size,
297 last_access_secs: time::now_secs(),
298 }
299 }
300
301 fn is_expired(&self) -> bool {
303 time::is_expired(self.expires_at_ms)
304 }
305
306 #[inline(always)]
310 fn touch(&mut self, track: bool) {
311 if track {
312 self.last_access_secs = time::now_secs();
313 }
314 }
315
316 fn entry_size(&self, key: &str) -> usize {
319 key.len() + self.cached_value_size as usize + memory::ENTRY_OVERHEAD
320 }
321}
322
/// Result of a TTL query ([`Keyspace::ttl`] / [`Keyspace::pttl`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TtlResult {
    /// Remaining lifetime in seconds (returned by `ttl`).
    Seconds(u64),
    /// Remaining lifetime in milliseconds (returned by `pttl`).
    Milliseconds(u64),
    /// The key exists but has no expiry.
    NoExpiry,
    /// The key does not exist or has already expired.
    NotFound,
}
335
/// Point-in-time counters reported by [`Keyspace::stats`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyspaceStats {
    /// Number of keys, as reported by the memory tracker.
    pub key_count: usize,
    /// Bytes currently accounted for by the memory tracker.
    pub used_bytes: usize,
    /// Number of keys that currently carry an expiry.
    pub keys_with_expiry: usize,
    /// Total keys removed because they expired.
    pub keys_expired: u64,
    /// Total keys removed by eviction.
    pub keys_evicted: u64,
    /// Total writes rejected for lack of memory.
    pub oom_rejections: u64,
    /// Keyspace hit counter (maintained outside the code shown here).
    pub keyspace_hits: u64,
    /// Keyspace miss counter (maintained outside the code shown here).
    pub keyspace_misses: u64,
}
356
/// Size of the candidate sample used by `Keyspace::try_evict`: the first this
/// many entries always compete, later entries compete with decreasing probability.
const EVICTION_SAMPLE_SIZE: usize = 16;
368
/// The key-value store of a single shard, with memory accounting, lazy
/// expiry, optional LRU-style eviction and per-key version tracking.
pub struct Keyspace {
    /// All entries, keyed by name.
    entries: AHashMap<CompactString, Entry>,
    /// Running account of bytes and keys in use.
    memory: MemoryTracker,
    /// Memory limit and eviction policy for this shard.
    config: ShardConfig,
    /// Number of entries whose `expires_at_ms` is non-zero.
    expiry_count: usize,
    /// Total entries removed because they expired.
    expired_total: u64,
    /// Total entries removed by eviction.
    evicted_total: u64,
    /// Total writes rejected for lack of memory.
    oom_rejections: u64,
    /// Hit counter surfaced through `stats()`.
    keyspace_hits: u64,
    /// Miss counter surfaced through `stats()`.
    keyspace_misses: u64,
    /// Optional handle that removed values are handed to instead of being
    /// dropped inline.
    drop_handle: Option<DropHandle>,
    /// Counter from which per-key versions are assigned.
    next_version: u64,
    /// Versions of keys registered through `key_version`; other keys are absent.
    versions: AHashMap<CompactString, u64>,
    /// Whether reads refresh `last_access_secs`; enabled only under `AllKeysLru`.
    track_access: bool,
}
406
407impl Keyspace {
    /// Creates an empty keyspace with the default [`ShardConfig`].
    pub fn new() -> Self {
        Self::with_config(ShardConfig::default())
    }
412
413 pub fn with_config(config: ShardConfig) -> Self {
415 let track_access = config.eviction_policy == EvictionPolicy::AllKeysLru;
416 Self {
417 entries: AHashMap::new(),
418 memory: MemoryTracker::new(),
419 config,
420 expiry_count: 0,
421 expired_total: 0,
422 evicted_total: 0,
423 oom_rejections: 0,
424 keyspace_hits: 0,
425 keyspace_misses: 0,
426 drop_handle: None,
427 next_version: 0,
428 versions: AHashMap::new(),
429 track_access,
430 }
431 }
432
    /// Installs the handle that removed values are passed to (see `defer_drop`).
    pub fn set_drop_handle(&mut self, handle: DropHandle) {
        self.drop_handle = Some(handle);
    }
439
440 fn bump_version(&mut self, key: &str) {
444 if let Some(ver) = self.versions.get_mut(key) {
445 self.next_version += 1;
446 *ver = self.next_version;
447 }
448 }
449
450 pub fn key_version(&mut self, key: &str) -> Option<u64> {
456 let entry = self.entries.get(key)?;
457 if entry.is_expired() {
458 return None;
459 }
460 let ver = *self
461 .versions
462 .entry(CompactString::from(key))
463 .or_insert(self.next_version);
464 Some(ver)
465 }
466
467 fn remove_version(&mut self, key: &str) {
470 self.versions.remove(key);
471 }
472
    /// Forgets every tracked key version. The version counter itself is kept.
    pub fn clear_versions(&mut self) {
        self.versions.clear();
    }
478
    /// Decrements the count of expiring keys if `entry` carried an expiry
    /// (non-zero `expires_at_ms`). Saturates at zero.
    fn decrement_expiry_if_set(&mut self, entry: &Entry) {
        if entry.expires_at_ms != 0 {
            self.expiry_count = self.expiry_count.saturating_sub(1);
        }
    }
485
486 fn cleanup_after_remove(
493 &mut self,
494 key: &str,
495 old_size: usize,
496 is_empty: bool,
497 removed_bytes: usize,
498 ) {
499 if is_empty {
500 if let Some(removed) = self.entries.remove(key) {
501 self.decrement_expiry_if_set(&removed);
502 }
503 self.memory.remove_with_size(old_size);
504 } else {
505 self.memory.shrink_by(removed_bytes);
506 if let Some(entry) = self.entries.get_mut(key) {
507 entry.cached_value_size =
508 (entry.cached_value_size as usize).saturating_sub(removed_bytes) as u32;
509 }
510 }
511 }
512
513 fn ensure_collection_type(
518 &self,
519 key: &str,
520 type_check: fn(&Value) -> bool,
521 ) -> Result<bool, WriteError> {
522 match self.entries.get(key) {
523 None => Ok(true),
524 Some(e) if type_check(&e.value) => Ok(false),
525 Some(_) => Err(WriteError::WrongType),
526 }
527 }
528
529 fn reserve_memory(
533 &mut self,
534 is_new: bool,
535 key: &str,
536 base_overhead: usize,
537 element_increase: usize,
538 ) -> Result<(), WriteError> {
539 let estimated_increase = if is_new {
540 memory::ENTRY_OVERHEAD + key.len() + base_overhead + element_increase
541 } else {
542 element_increase
543 };
544 if self.enforce_memory_limit(estimated_increase) {
545 Ok(())
546 } else {
547 Err(WriteError::OutOfMemory)
548 }
549 }
550
    /// Inserts `value` under `key` with no expiry, accounting for its memory
    /// and bumping the key's version if it is tracked.
    ///
    /// Performs no limit check and assumes `key` is not already present
    /// (memory is added, not replaced) — callers are expected to have checked.
    fn insert_empty(&mut self, key: &str, value: Value) {
        // Account for the value before it is moved into the entry.
        self.memory.add(key, &value);
        let entry = Entry::new(value, None);
        self.entries.insert(CompactString::from(key), entry);
        self.bump_version(key);
    }
559
560 fn track_size<T>(&mut self, key: &str, f: impl FnOnce(&mut Entry) -> T) -> Option<T> {
567 let entry = self.entries.get_mut(key)?;
568 let old_size = entry.entry_size(key);
569 let result = f(entry);
570 let entry = self.entries.get_mut(key)?;
572 let new_value_size = memory::value_size(&entry.value);
573 entry.cached_value_size = new_value_size as u32;
574 let new_size = key.len() + new_value_size + memory::ENTRY_OVERHEAD;
575 self.memory.adjust(old_size, new_size);
576 self.bump_version(key);
577 Some(result)
578 }
579
580 fn adjust_expiry_count(&mut self, had_expiry: bool, has_expiry: bool) {
583 match (had_expiry, has_expiry) {
584 (false, true) => self.expiry_count += 1,
585 (true, false) => self.expiry_count = self.expiry_count.saturating_sub(1),
586 _ => {}
587 }
588 }
589
    /// Evicts one key chosen by approximate least-recent access.
    ///
    /// Walks every entry once. The first `EVICTION_SAMPLE_SIZE` entries always
    /// compete for "oldest access"; each later entry competes only when a
    /// random index drawn from `0..seen` falls below the sample size.
    ///
    /// Returns `true` when a key was removed. Returns `false` when the keyspace
    /// is empty or no candidate had an access time below `u32::MAX`.
    fn try_evict(&mut self) -> bool {
        if self.entries.is_empty() {
            return false;
        }

        let mut rng = rand::rng();

        // Borrowed key of the best candidate so far; cloned only once chosen.
        let mut best_key: Option<&str> = None;
        let mut best_access = u32::MAX;
        let mut seen = 0usize;

        for (key, entry) in &self.entries {
            seen += 1;
            // The first EVICTION_SAMPLE_SIZE entries are always considered.
            if seen <= EVICTION_SAMPLE_SIZE {
                if entry.last_access_secs < best_access {
                    best_access = entry.last_access_secs;
                    best_key = Some(&**key);
                }
            } else {
                use rand::Rng;
                // Later entries are considered with probability SAMPLE_SIZE / seen.
                let j = rng.random_range(0..seen);
                if j < EVICTION_SAMPLE_SIZE && entry.last_access_secs < best_access {
                    best_access = entry.last_access_secs;
                    best_key = Some(&**key);
                }
            }
        }

        if let Some(victim) = best_key {
            // Own the key so the borrow of `self.entries` ends before mutation.
            let victim = victim.to_owned();
            if let Some(entry) = self.entries.remove(victim.as_str()) {
                self.memory.remove(&victim, &entry.value);
                self.decrement_expiry_if_set(&entry);
                self.evicted_total += 1;
                self.remove_version(&victim);
                self.defer_drop(entry.value);
                return true;
            }
        }
        false
    }
647
648 fn enforce_memory_limit(&mut self, estimated_increase: usize) -> bool {
657 if let Some(max) = self.config.max_memory {
658 let limit = memory::effective_limit(max);
659 while self.memory.used_bytes() + estimated_increase > limit {
660 match self.config.eviction_policy {
661 EvictionPolicy::NoEviction => {
662 self.oom_rejections += 1;
663 if self.oom_rejections == 1 || self.oom_rejections.is_multiple_of(1000) {
665 warn!(
666 used_bytes = self.memory.used_bytes(),
667 limit,
668 requested = estimated_increase,
669 total_rejections = self.oom_rejections,
670 "OOM: write rejected (policy: noeviction)"
671 );
672 }
673 return false;
674 }
675 EvictionPolicy::AllKeysLru => {
676 if !self.try_evict() {
677 self.oom_rejections += 1;
678 if self.oom_rejections == 1 || self.oom_rejections.is_multiple_of(1000)
679 {
680 warn!(
681 used_bytes = self.memory.used_bytes(),
682 limit,
683 requested = estimated_increase,
684 total_rejections = self.oom_rejections,
685 "OOM: write rejected (eviction exhausted)"
686 );
687 }
688 return false;
689 }
690 }
691 }
692 }
693 }
694 true
695 }
696
697 pub fn del(&mut self, key: &str) -> bool {
702 if self.remove_if_expired(key) {
703 return false;
704 }
705 if let Some(entry) = self.entries.remove(key) {
706 self.memory.remove(key, &entry.value);
707 self.decrement_expiry_if_set(&entry);
708 self.remove_version(key);
709 self.defer_drop(entry.value);
710 true
711 } else {
712 false
713 }
714 }
715
716 pub fn unlink(&mut self, key: &str) -> bool {
722 if self.remove_if_expired(key) {
723 return false;
724 }
725 if let Some(entry) = self.entries.remove(key) {
726 self.memory.remove(key, &entry.value);
727 self.decrement_expiry_if_set(&entry);
728 self.remove_version(key);
729 if let Some(ref handle) = self.drop_handle {
731 handle.defer_value(entry.value);
732 }
733 true
734 } else {
735 false
736 }
737 }
738
739 pub(crate) fn flush_async(&mut self) -> AHashMap<CompactString, Entry> {
743 let old = std::mem::take(&mut self.entries);
744 self.memory.reset();
745 self.expiry_count = 0;
746 self.versions.clear();
747 old
748 }
749
750 pub fn exists(&mut self, key: &str) -> bool {
752 if self.remove_if_expired(key) {
753 return false;
754 }
755 self.entries.contains_key(key)
756 }
757
758 pub fn random_key(&mut self) -> Option<String> {
763 for _ in 0..5 {
765 let mut rng = rand::rng();
766 let key = self.entries.keys().choose(&mut rng)?.clone();
767
768 if self.remove_if_expired(&key) {
769 continue;
770 }
771 return Some(key.to_string());
772 }
773 None
774 }
775
776 pub fn touch(&mut self, key: &str) -> bool {
778 if self.remove_if_expired(key) {
779 return false;
780 }
781 match self.entries.get_mut(key) {
782 Some(entry) => {
783 entry.touch(self.track_access);
784 true
785 }
786 None => false,
787 }
788 }
789
790 pub fn sort(
795 &mut self,
796 key: &str,
797 desc: bool,
798 alpha: bool,
799 limit: Option<(i64, i64)>,
800 ) -> Result<Vec<Bytes>, &'static str> {
801 if self.remove_if_expired(key) {
802 return Ok(Vec::new());
803 }
804 let entry = match self.entries.get_mut(key) {
805 Some(e) => {
806 e.touch(self.track_access);
807 e
808 }
809 None => return Ok(Vec::new()),
810 };
811
812 let mut items: Vec<Bytes> = match &entry.value {
814 Value::List(deq) => deq.iter().cloned().collect(),
815 Value::Set(set) => set.iter().map(|s| Bytes::from(s.clone())).collect(),
816 Value::SortedSet(zset) => zset
817 .iter()
818 .map(|(m, _)| Bytes::from(m.to_owned()))
819 .collect(),
820 _ => return Err(WRONGTYPE_MSG),
821 };
822
823 if alpha {
825 items.sort();
826 if desc {
827 items.reverse();
828 }
829 } else {
830 let mut parse_err = false;
832 items.sort_by(|a, b| {
833 let a_str = std::str::from_utf8(a).unwrap_or("");
834 let b_str = std::str::from_utf8(b).unwrap_or("");
835 let a_val = a_str.parse::<f64>().unwrap_or_else(|_| {
836 parse_err = true;
837 0.0
838 });
839 let b_val = b_str.parse::<f64>().unwrap_or_else(|_| {
840 parse_err = true;
841 0.0
842 });
843 if desc {
844 b_val
845 .partial_cmp(&a_val)
846 .unwrap_or(std::cmp::Ordering::Equal)
847 } else {
848 a_val
849 .partial_cmp(&b_val)
850 .unwrap_or(std::cmp::Ordering::Equal)
851 }
852 });
853 if parse_err {
854 return Err("ERR One or more scores can't be converted into double");
855 }
856 }
857
858 if let Some((offset, count)) = limit {
860 let offset = offset.max(0) as usize;
861 let count = count.max(0) as usize;
862 let end = offset.saturating_add(count).min(items.len());
863 if offset < items.len() {
864 items = items[offset..end].to_vec();
865 } else {
866 items.clear();
867 }
868 }
869
870 Ok(items)
871 }
872
873 pub fn expire(&mut self, key: &str, seconds: u64) -> bool {
876 if self.remove_if_expired(key) {
877 return false;
878 }
879 match self.entries.get_mut(key) {
880 Some(entry) => {
881 if entry.expires_at_ms == 0 {
882 self.expiry_count += 1;
883 }
884 entry.expires_at_ms = time::now_ms().saturating_add(seconds.saturating_mul(1000));
885 self.bump_version(key);
886 true
887 }
888 None => false,
889 }
890 }
891
892 pub fn ttl(&mut self, key: &str) -> TtlResult {
897 if self.remove_if_expired(key) {
898 return TtlResult::NotFound;
899 }
900 match self.entries.get(key) {
901 Some(entry) => match time::remaining_secs(entry.expires_at_ms) {
902 Some(secs) => TtlResult::Seconds(secs),
903 None => TtlResult::NoExpiry,
904 },
905 None => TtlResult::NotFound,
906 }
907 }
908
909 pub fn memory_usage(&mut self, key: &str) -> Option<usize> {
915 if self.remove_if_expired(key) {
916 return None;
917 }
918 let entry = self.entries.get(key)?;
919 Some(entry.entry_size(key))
920 }
921
922 pub fn persist(&mut self, key: &str) -> bool {
927 if self.remove_if_expired(key) {
928 return false;
929 }
930 match self.entries.get_mut(key) {
931 Some(entry) => {
932 if entry.expires_at_ms != 0 {
933 entry.expires_at_ms = 0;
934 self.expiry_count = self.expiry_count.saturating_sub(1);
935 self.bump_version(key);
936 true
937 } else {
938 false
939 }
940 }
941 None => false,
942 }
943 }
944
945 pub fn pttl(&mut self, key: &str) -> TtlResult {
950 if self.remove_if_expired(key) {
951 return TtlResult::NotFound;
952 }
953 match self.entries.get(key) {
954 Some(entry) => match time::remaining_ms(entry.expires_at_ms) {
955 Some(ms) => TtlResult::Milliseconds(ms),
956 None => TtlResult::NoExpiry,
957 },
958 None => TtlResult::NotFound,
959 }
960 }
961
962 pub fn pexpire(&mut self, key: &str, millis: u64) -> bool {
967 if self.remove_if_expired(key) {
968 return false;
969 }
970 match self.entries.get_mut(key) {
971 Some(entry) => {
972 if entry.expires_at_ms == 0 {
973 self.expiry_count += 1;
974 }
975 entry.expires_at_ms = time::now_ms().saturating_add(millis);
976 self.bump_version(key);
977 true
978 }
979 None => false,
980 }
981 }
982
983 pub fn expireat(&mut self, key: &str, unix_secs: u64) -> bool {
988 if self.remove_if_expired(key) {
989 return false;
990 }
991 match self.entries.get_mut(key) {
992 Some(entry) => {
993 if entry.expires_at_ms == 0 {
994 self.expiry_count += 1;
995 }
996 entry.expires_at_ms = time::unix_ms_to_monotonic_ms(unix_secs.saturating_mul(1000));
997 self.bump_version(key);
998 true
999 }
1000 None => false,
1001 }
1002 }
1003
1004 pub fn pexpireat(&mut self, key: &str, unix_ms: u64) -> bool {
1009 if self.remove_if_expired(key) {
1010 return false;
1011 }
1012 match self.entries.get_mut(key) {
1013 Some(entry) => {
1014 if entry.expires_at_ms == 0 {
1015 self.expiry_count += 1;
1016 }
1017 entry.expires_at_ms = time::unix_ms_to_monotonic_ms(unix_ms);
1018 self.bump_version(key);
1019 true
1020 }
1021 None => false,
1022 }
1023 }
1024
1025 pub fn expiretime(&mut self, key: &str) -> i64 {
1029 if self.remove_if_expired(key) {
1030 return -2;
1031 }
1032 match self.entries.get(key) {
1033 None => -2,
1034 Some(entry) => match time::monotonic_to_unix_ms(entry.expires_at_ms) {
1035 None => -1,
1036 Some(unix_ms) => (unix_ms / 1000) as i64,
1037 },
1038 }
1039 }
1040
1041 pub fn pexpiretime(&mut self, key: &str) -> i64 {
1045 if self.remove_if_expired(key) {
1046 return -2;
1047 }
1048 match self.entries.get(key) {
1049 None => -2,
1050 Some(entry) => match time::monotonic_to_unix_ms(entry.expires_at_ms) {
1051 None => -1,
1052 Some(unix_ms) => unix_ms as i64,
1053 },
1054 }
1055 }
1056
1057 pub fn keys(&self, pattern: &str) -> Vec<String> {
1062 let len = self.entries.len();
1063 if len > 10_000 {
1064 warn!(
1065 key_count = len,
1066 "KEYS on large keyspace, consider SCAN instead"
1067 );
1068 }
1069 let compiled = GlobPattern::new(pattern);
1070 self.entries
1071 .iter()
1072 .filter(|(_, entry)| !entry.is_expired())
1073 .filter(|(key, _)| compiled.matches(key))
1074 .map(|(key, _)| String::from(&**key))
1075 .collect()
1076 }
1077
1078 pub fn count_keys_in_slot(&self, slot: u16) -> usize {
1082 self.entries
1083 .iter()
1084 .filter(|(_, entry)| !entry.is_expired())
1085 .filter(|(key, _)| ember_cluster::key_slot(key.as_bytes()) == slot)
1086 .count()
1087 }
1088
1089 pub fn get_keys_in_slot(&self, slot: u16, count: usize) -> Vec<String> {
1093 self.entries
1094 .iter()
1095 .filter(|(_, entry)| !entry.is_expired())
1096 .filter(|(key, _)| ember_cluster::key_slot(key.as_bytes()) == slot)
1097 .take(count)
1098 .map(|(key, _)| String::from(&**key))
1099 .collect()
1100 }
1101
1102 pub fn rename(&mut self, key: &str, newkey: &str) -> Result<(), RenameError> {
1105 self.remove_if_expired(key);
1106 self.remove_if_expired(newkey);
1107
1108 let entry = match self.entries.remove(key) {
1109 Some(entry) => entry,
1110 None => return Err(RenameError::NoSuchKey),
1111 };
1112
1113 self.memory.remove(key, &entry.value);
1115 self.decrement_expiry_if_set(&entry);
1116
1117 if let Some(old_dest) = self.entries.remove(newkey) {
1119 self.memory.remove(newkey, &old_dest.value);
1120 self.decrement_expiry_if_set(&old_dest);
1121 }
1122
1123 self.memory.add(newkey, &entry.value);
1125 if entry.expires_at_ms != 0 {
1126 self.expiry_count += 1;
1127 }
1128 self.remove_version(key);
1129 self.entries.insert(CompactString::from(newkey), entry);
1130 self.bump_version(newkey);
1131 Ok(())
1132 }
1133
1134 pub fn copy(&mut self, source: &str, dest: &str, replace: bool) -> Result<bool, CopyError> {
1138 self.remove_if_expired(source);
1139 self.remove_if_expired(dest);
1140
1141 let src_entry = match self.entries.get(source) {
1142 Some(e) => e,
1143 None => return Err(CopyError::NoSuchKey),
1144 };
1145
1146 if !replace && self.entries.contains_key(dest) {
1148 return Ok(false);
1149 }
1150
1151 let cloned_value = src_entry.value.clone();
1153 let cloned_expire = if src_entry.expires_at_ms != 0 {
1154 Some(src_entry.expires_at_ms)
1155 } else {
1156 None
1157 };
1158
1159 let new_size = memory::entry_size(dest, &cloned_value);
1161
1162 let old_dest_size = self
1164 .entries
1165 .get(dest)
1166 .map(|e| e.entry_size(dest))
1167 .unwrap_or(0);
1168 let net_increase = new_size.saturating_sub(old_dest_size);
1169 if !self.enforce_memory_limit(net_increase) {
1170 return Err(CopyError::OutOfMemory);
1171 }
1172
1173 if let Some(old_dest) = self.entries.remove(dest) {
1175 self.memory.remove(dest, &old_dest.value);
1176 self.decrement_expiry_if_set(&old_dest);
1177 self.defer_drop(old_dest.value);
1178 }
1179
1180 self.memory.add(dest, &cloned_value);
1182 let has_expiry = cloned_expire.is_some();
1183 if has_expiry {
1184 self.expiry_count += 1;
1185 }
1186 let mut entry = Entry::new(cloned_value, None);
1187 if let Some(ts) = cloned_expire {
1189 entry.expires_at_ms = ts;
1190 }
1191 self.entries.insert(CompactString::from(dest), entry);
1192 self.bump_version(dest);
1193 Ok(true)
1194 }
1195
1196 pub fn update_memory_config(
1202 &mut self,
1203 max_memory: Option<usize>,
1204 eviction_policy: EvictionPolicy,
1205 ) {
1206 self.config.max_memory = max_memory;
1207 self.config.eviction_policy = eviction_policy;
1208 self.track_access = matches!(eviction_policy, EvictionPolicy::AllKeysLru);
1209 }
1210
    /// Snapshot of the keyspace counters.
    ///
    /// `key_count` and `used_bytes` come from the memory tracker rather than
    /// from the entry map.
    pub fn stats(&self) -> KeyspaceStats {
        KeyspaceStats {
            key_count: self.memory.key_count(),
            used_bytes: self.memory.used_bytes(),
            keys_with_expiry: self.expiry_count,
            keys_expired: self.expired_total,
            keys_evicted: self.evicted_total,
            oom_rejections: self.oom_rejections,
            keyspace_hits: self.keyspace_hits,
            keyspace_misses: self.keyspace_misses,
        }
    }
1226
    /// Number of stored entries, including expired ones not yet reaped.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
1231
1232 pub fn clear(&mut self) {
1234 self.entries.clear();
1235 self.memory.reset();
1236 self.expiry_count = 0;
1237 self.versions.clear();
1238 }
1239
    /// Whether the keyspace holds no entries (expired-but-unreaped entries count).
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
1244
1245 pub fn scan_keys(
1250 &self,
1251 cursor: u64,
1252 count: usize,
1253 pattern: Option<&str>,
1254 ) -> (u64, Vec<String>) {
1255 let mut keys = Vec::with_capacity(count);
1256 let mut position = 0u64;
1257 let target_count = if count == 0 { 10 } else { count };
1258
1259 let compiled = pattern.map(GlobPattern::new);
1260
1261 for (key, entry) in self.entries.iter() {
1262 if entry.is_expired() {
1264 continue;
1265 }
1266
1267 if position < cursor {
1269 position += 1;
1270 continue;
1271 }
1272
1273 if let Some(ref pat) = compiled {
1275 if !pat.matches(key) {
1276 position += 1;
1277 continue;
1278 }
1279 }
1280
1281 keys.push(String::from(&**key));
1282 position += 1;
1283
1284 if keys.len() >= target_count {
1285 return (position, keys);
1287 }
1288 }
1289
1290 (0, keys)
1292 }
1293
1294 pub fn dump(&mut self, key: &str) -> Option<(&Value, i64)> {
1300 if self.remove_if_expired(key) {
1301 return None;
1302 }
1303 let entry = self.entries.get(key)?;
1304 let ttl_ms = match time::remaining_ms(entry.expires_at_ms) {
1305 Some(ms) => ms.min(i64::MAX as u64) as i64,
1306 None => -1,
1307 };
1308 Some((&entry.value, ttl_ms))
1309 }
1310
1311 pub fn iter_entries(&self) -> impl Iterator<Item = (&str, &Value, i64)> {
1315 self.entries.iter().filter_map(move |(key, entry)| {
1316 if entry.is_expired() {
1317 return None;
1318 }
1319 let ttl_ms = match time::remaining_ms(entry.expires_at_ms) {
1320 Some(ms) => ms.min(i64::MAX as u64) as i64,
1321 None => -1,
1322 };
1323 Some((&**key, &entry.value, ttl_ms))
1324 })
1325 }
1326
1327 pub fn restore(&mut self, key: String, value: Value, ttl: Option<Duration>) {
1333 let has_expiry = ttl.is_some();
1334
1335 if let Some(old) = self.entries.get(key.as_str()) {
1337 self.memory.replace(&key, &old.value, &value);
1338 self.adjust_expiry_count(old.expires_at_ms != 0, has_expiry);
1339 } else {
1340 self.memory.add(&key, &value);
1341 if has_expiry {
1342 self.expiry_count += 1;
1343 }
1344 }
1345
1346 let entry = Entry::new(value, ttl);
1347 self.entries.insert(CompactString::from(key.clone()), entry);
1348 self.bump_version(&key);
1349 }
1350
1351 pub(crate) fn expire_sample(&mut self, count: usize, out: &mut Vec<String>) -> usize {
1358 if self.entries.is_empty() {
1359 return 0;
1360 }
1361
1362 let mut rng = rand::rng();
1363
1364 let keys_to_check: Vec<String> = self
1365 .entries
1366 .keys()
1367 .choose_multiple(&mut rng, count)
1368 .into_iter()
1369 .map(|k| String::from(&**k))
1370 .collect();
1371
1372 let mut removed = 0;
1373 for key in keys_to_check {
1374 if self.remove_if_expired(&key) {
1375 out.push(key);
1376 removed += 1;
1377 }
1378 }
1379 removed
1380 }
1381
    /// Removes `key` and records the removal as an expiry.
    ///
    /// Does not itself check whether the entry has expired — callers are
    /// expected to have established that. No-op when the key is absent.
    fn remove_expired_entry(&mut self, key: &str) {
        if let Some(entry) = self.entries.remove(key) {
            self.memory.remove(key, &entry.value);
            self.decrement_expiry_if_set(&entry);
            self.expired_total += 1;
            self.remove_version(key);
            self.defer_drop(entry.value);
        }
    }
1394
1395 fn remove_if_expired(&mut self, key: &str) -> bool {
1398 let expired = self
1399 .entries
1400 .get(key)
1401 .map(|e| e.is_expired())
1402 .unwrap_or(false);
1403
1404 if expired {
1405 if let Some(entry) = self.entries.remove(key) {
1406 self.memory.remove(key, &entry.value);
1407 self.decrement_expiry_if_set(&entry);
1408 self.expired_total += 1;
1409 self.remove_version(key);
1410 self.defer_drop(entry.value);
1411 }
1412 }
1413 expired
1414 }
1415
1416 fn get_live_entry(&mut self, key: &str) -> Option<&mut Entry> {
1428 self.remove_if_expired(key);
1429 let entry = self.entries.get_mut(key)?;
1430 entry.touch(self.track_access);
1431 Some(entry)
1432 }
1433
1434 fn defer_drop(&self, value: Value) {
1437 if let Some(ref handle) = self.drop_handle {
1438 handle.defer_value(value);
1439 }
1440 }
1441}
1442
// `Keyspace::default()` is the same as `Keyspace::new()`.
impl Default for Keyspace {
    fn default() -> Self {
        Self::new()
    }
}
1448
1449pub(crate) fn format_float(val: f64) -> String {
1454 if val == 0.0 {
1455 return "0".into();
1456 }
1457 let s = format!("{:.17e}", val);
1459 let reparsed: f64 = s.parse().unwrap_or(val);
1461 if reparsed == reparsed.trunc() && reparsed >= i64::MIN as f64 && reparsed <= i64::MAX as f64 {
1463 format!("{}", reparsed as i64)
1464 } else {
1465 let formatted = format!("{}", reparsed);
1467 formatted
1468 }
1469}
1470
1471pub(crate) fn glob_match(pattern: &str, text: &str) -> bool {
1486 let pat: Vec<char> = pattern.chars().collect();
1487 glob_match_compiled(&pat, text)
1488}
1489
/// A glob pattern pre-split into characters so it can be matched against
/// many texts without re-decoding the pattern each time.
pub(crate) struct GlobPattern {
    /// The pattern's characters, in order.
    chars: Vec<char>,
}

impl GlobPattern {
    /// Prepares `pattern` for repeated matching.
    pub(crate) fn new(pattern: &str) -> Self {
        Self {
            chars: pattern.chars().collect(),
        }
    }

    /// Whether `text` matches this pattern in full.
    pub(crate) fn matches(&self, text: &str) -> bool {
        glob_match_compiled(&self.chars, text)
    }
}
1508
/// Matches `text` in full against a pattern already split into characters.
///
/// Pattern syntax:
/// * `*` — any run of characters, including none;
/// * `?` — exactly one character;
/// * `[...]` — one character from the listed set; a leading `^` or `!`
///   negates it. Ranges and escapes are not interpreted, and a set with no
///   closing `]` never matches.
///
/// Iterative matcher that, on a mismatch, backtracks to the most recent `*`
/// and lets it absorb one more character of text.
fn glob_match_compiled(pat: &[char], text: &str) -> bool {
    let txt: Vec<char> = text.chars().collect();
    let (plen, tlen) = (pat.len(), txt.len());

    let mut p = 0usize;
    let mut t = 0usize;
    // (pattern index of the latest `*`, text index it currently extends to).
    let mut resume: Option<(usize, usize)> = None;

    while t < tlen || p < plen {
        // `Some((next_p, next_t))` when the current pattern char matched here.
        let step = match (pat.get(p).copied(), txt.get(t).copied()) {
            (Some('*'), _) => {
                resume = Some((p, t));
                Some((p + 1, t))
            }
            (Some('?'), Some(_)) => Some((p + 1, t + 1)),
            (Some('['), Some(tc)) => {
                let mut start = p + 1;
                let negated = matches!(pat.get(start), Some('^') | Some('!'));
                if negated {
                    start += 1;
                }
                let body_len = pat[start..].iter().take_while(|&&c| c != ']').count();
                let close = start + body_len;
                let listed = pat[start..close].contains(&tc);
                // Match only with the right membership AND a closing bracket.
                if listed != negated && close < plen {
                    Some((close + 1, t + 1))
                } else {
                    None
                }
            }
            (Some(c), Some(tc)) if c == tc => Some((p + 1, t + 1)),
            _ => None,
        };

        match (step, resume) {
            (Some((next_p, next_t)), _) => {
                p = next_p;
                t = next_t;
            }
            (None, Some((star_p, star_t))) => {
                // Let the last `*` swallow one more character and retry after it.
                let retry_t = star_t + 1;
                if retry_t > tlen {
                    return false;
                }
                resume = Some((star_p, retry_t));
                p = star_p + 1;
                t = retry_t;
            }
            (None, None) => return false,
        }
    }

    // Anything left in the pattern may only be trailing `*`s.
    pat[p..].iter().all(|&c| c == '*')
}
1594
#[cfg(test)]
mod tests {
    // Unit tests for the keyspace's generic key operations: deletion,
    // existence, TTL handling, memory accounting, eviction, snapshot
    // iteration/restore, SCAN/KEYS, RENAME/COPY, slot helpers, key versions
    // and SORT. Several tests rely on short real-time sleeps to let a TTL
    // elapse; the sleep is always longer than the TTL being tested.

    use super::*;
    use std::thread;

    // --- del / exists ---

    /// Deleting a present key reports success and the key is gone afterwards.
    #[test]
    fn del_existing() {
        let mut ks = Keyspace::new();
        ks.set("key".into(), Bytes::from("val"), None, false, false);
        assert!(ks.del("key"));
        assert_eq!(ks.get("key").unwrap(), None);
    }

    /// Deleting an absent key reports `false`.
    #[test]
    fn del_missing() {
        let mut ks = Keyspace::new();
        assert!(!ks.del("nope"));
    }

    /// `exists` distinguishes a stored key from one that was never written.
    #[test]
    fn exists_present_and_absent() {
        let mut ks = Keyspace::new();
        ks.set("yes".into(), Bytes::from("here"), None, false, false);
        assert!(ks.exists("yes"));
        assert!(!ks.exists("no"));
    }

    // --- ttl / expire ---

    /// A key stored without a TTL reports `NoExpiry`.
    #[test]
    fn ttl_no_expiry() {
        let mut ks = Keyspace::new();
        ks.set("key".into(), Bytes::from("val"), None, false, false);
        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
    }

    /// Asking for the TTL of an unknown key reports `NotFound`.
    #[test]
    fn ttl_not_found() {
        let mut ks = Keyspace::new();
        assert_eq!(ks.ttl("missing"), TtlResult::NotFound);
    }

    /// A key stored with a 100s TTL reports a remaining time in seconds.
    #[test]
    fn ttl_with_expiry() {
        let mut ks = Keyspace::new();
        ks.set(
            "key".into(),
            Bytes::from("val"),
            Some(Duration::from_secs(100)),
            false,
            false,
        );
        match ks.ttl("key") {
            // Small tolerance window for time elapsed since the set.
            TtlResult::Seconds(s) => assert!((98..=100).contains(&s)),
            other => panic!("expected Seconds, got {other:?}"),
        }
    }

    /// Once the TTL has elapsed, `ttl` treats the key as missing.
    #[test]
    fn ttl_expired_key() {
        let mut ks = Keyspace::new();
        ks.set(
            "temp".into(),
            Bytes::from("val"),
            Some(Duration::from_millis(10)),
            false,
            false,
        );
        thread::sleep(Duration::from_millis(30));
        assert_eq!(ks.ttl("temp"), TtlResult::NotFound);
    }

    /// `expire` attaches a TTL to an existing key.
    #[test]
    fn expire_existing_key() {
        let mut ks = Keyspace::new();
        ks.set("key".into(), Bytes::from("val"), None, false, false);
        assert!(ks.expire("key", 60));
        match ks.ttl("key") {
            TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
            other => panic!("expected Seconds, got {other:?}"),
        }
    }

    /// `expire` on an unknown key reports `false`.
    #[test]
    fn expire_missing_key() {
        let mut ks = Keyspace::new();
        assert!(!ks.expire("nope", 60));
    }

    /// Deleting a key whose TTL already elapsed reports `false`.
    #[test]
    fn del_expired_key_returns_false() {
        let mut ks = Keyspace::new();
        ks.set(
            "temp".into(),
            Bytes::from("val"),
            Some(Duration::from_millis(10)),
            false,
            false,
        );
        thread::sleep(Duration::from_millis(30));
        assert!(!ks.del("temp"));
    }

    // --- memory accounting ---

    /// Storing a key raises `used_bytes` above zero and counts the key.
    #[test]
    fn memory_increases_on_set() {
        let mut ks = Keyspace::new();
        assert_eq!(ks.stats().used_bytes, 0);
        ks.set("key".into(), Bytes::from("value"), None, false, false);
        assert!(ks.stats().used_bytes > 0);
        assert_eq!(ks.stats().key_count, 1);
    }

    /// Deleting the only key brings `used_bytes` back to zero.
    #[test]
    fn memory_decreases_on_del() {
        let mut ks = Keyspace::new();
        ks.set("key".into(), Bytes::from("value"), None, false, false);
        let after_set = ks.stats().used_bytes;
        ks.del("key");
        assert_eq!(ks.stats().used_bytes, 0);
        assert!(after_set > 0);
    }

    /// Overwriting with a longer value grows usage without adding a key.
    #[test]
    fn memory_adjusts_on_overwrite() {
        let mut ks = Keyspace::new();
        ks.set("key".into(), Bytes::from("short"), None, false, false);
        let small = ks.stats().used_bytes;

        ks.set(
            "key".into(),
            Bytes::from("a much longer value"),
            None,
            false,
            false,
        );
        let large = ks.stats().used_bytes;

        assert!(large > small);
        assert_eq!(ks.stats().key_count, 1);
    }

    /// After an expired key is read, usage and key count are both zero.
    #[test]
    fn memory_decreases_on_expired_removal() {
        let mut ks = Keyspace::new();
        ks.set(
            "temp".into(),
            Bytes::from("data"),
            Some(Duration::from_millis(10)),
            false,
            false,
        );
        assert!(ks.stats().used_bytes > 0);
        thread::sleep(Duration::from_millis(30));
        // The result is irrelevant; the read happens between the sleep and
        // the zero-usage assertions below.
        let _ = ks.get("temp");
        assert_eq!(ks.stats().used_bytes, 0);
        assert_eq!(ks.stats().key_count, 0);
    }

    /// `keys_with_expiry` counts only the keys that were stored with a TTL.
    #[test]
    fn stats_tracks_expiry_count() {
        let mut ks = Keyspace::new();
        ks.set("a".into(), Bytes::from("1"), None, false, false);
        ks.set(
            "b".into(),
            Bytes::from("2"),
            Some(Duration::from_secs(100)),
            false,
            false,
        );
        ks.set(
            "c".into(),
            Bytes::from("3"),
            Some(Duration::from_secs(200)),
            false,
            false,
        );

        let stats = ks.stats();
        assert_eq!(stats.key_count, 3);
        assert_eq!(stats.keys_with_expiry, 2);
    }

    // --- memory limits / eviction ---

    /// With `NoEviction`, a second entry that does not fit is rejected and
    /// the first entry is left in place.
    #[test]
    fn noeviction_returns_oom_when_full() {
        let config = ShardConfig {
            max_memory: Some(130),
            eviction_policy: EvictionPolicy::NoEviction,
            ..ShardConfig::default()
        };
        let mut ks = Keyspace::with_config(config);

        assert_eq!(
            ks.set("a".into(), Bytes::from("val"), None, false, false),
            SetResult::Ok
        );

        let result = ks.set("b".into(), Bytes::from("val"), None, false, false);
        assert_eq!(result, SetResult::OutOfMemory);

        assert!(ks.exists("a"));
    }

    /// With `AllKeysLru`, the second set succeeds and the first key is gone.
    #[test]
    fn lru_eviction_makes_room() {
        let config = ShardConfig {
            max_memory: Some(130),
            eviction_policy: EvictionPolicy::AllKeysLru,
            ..ShardConfig::default()
        };
        let mut ks = Keyspace::with_config(config);

        assert_eq!(
            ks.set("a".into(), Bytes::from("val"), None, false, false),
            SetResult::Ok
        );

        assert_eq!(
            ks.set("b".into(), Bytes::from("val"), None, false, false),
            SetResult::Ok
        );

        assert!(!ks.exists("a"));
        assert!(ks.exists("b"));
    }

    /// With a 120-byte limit, one small entry fits and a second is rejected.
    // NOTE(review): the test name implies the effective limit sits below the
    // raw `max_memory`; the margin itself is defined outside this module.
    #[test]
    fn safety_margin_rejects_near_raw_limit() {
        let config = ShardConfig {
            max_memory: Some(120),
            eviction_policy: EvictionPolicy::NoEviction,
            ..ShardConfig::default()
        };
        let mut ks = Keyspace::with_config(config);

        assert_eq!(
            ks.set("a".into(), Bytes::from("val"), None, false, false),
            SetResult::Ok
        );

        let result = ks.set("b".into(), Bytes::from("val"), None, false, false);
        assert_eq!(result, SetResult::OutOfMemory);
    }

    /// Replacing a value with one of the same length succeeds even when a
    /// second key would not fit.
    #[test]
    fn overwrite_same_size_succeeds_at_limit() {
        let config = ShardConfig {
            max_memory: Some(130),
            eviction_policy: EvictionPolicy::NoEviction,
            ..ShardConfig::default()
        };
        let mut ks = Keyspace::with_config(config);

        assert_eq!(
            ks.set("a".into(), Bytes::from("val"), None, false, false),
            SetResult::Ok
        );

        assert_eq!(
            ks.set("a".into(), Bytes::from("new"), None, false, false),
            SetResult::Ok
        );
        assert_eq!(
            ks.get("a").unwrap(),
            Some(Value::String(Bytes::from("new")))
        );
    }

    /// An overwrite that would exceed the limit is rejected and the old
    /// value stays readable.
    #[test]
    fn overwrite_larger_value_respects_limit() {
        let config = ShardConfig {
            max_memory: Some(130),
            eviction_policy: EvictionPolicy::NoEviction,
            ..ShardConfig::default()
        };
        let mut ks = Keyspace::with_config(config);

        assert_eq!(
            ks.set("a".into(), Bytes::from("val"), None, false, false),
            SetResult::Ok
        );

        let big_value = "x".repeat(200);
        let result = ks.set("a".into(), Bytes::from(big_value), None, false, false);
        assert_eq!(result, SetResult::OutOfMemory);

        assert_eq!(
            ks.get("a").unwrap(),
            Some(Value::String(Bytes::from("val")))
        );
    }

    // --- iter_entries ---

    /// Both a persistent key and a key with a long TTL are yielded.
    #[test]
    fn iter_entries_returns_live_entries() {
        let mut ks = Keyspace::new();
        ks.set("a".into(), Bytes::from("1"), None, false, false);
        ks.set(
            "b".into(),
            Bytes::from("2"),
            Some(Duration::from_secs(100)),
            false,
            false,
        );

        let entries: Vec<_> = ks.iter_entries().collect();
        assert_eq!(entries.len(), 2);
    }

    /// A key whose TTL has elapsed is not yielded.
    #[test]
    fn iter_entries_skips_expired() {
        let mut ks = Keyspace::new();
        ks.set(
            "dead".into(),
            Bytes::from("gone"),
            Some(Duration::from_millis(1)),
            false,
            false,
        );
        ks.set("alive".into(), Bytes::from("here"), None, false, false);
        thread::sleep(Duration::from_millis(10));

        let entries: Vec<_> = ks.iter_entries().collect();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].0, "alive");
    }

    /// The third tuple field is `-1` for a key stored without a TTL.
    #[test]
    fn iter_entries_ttl_for_no_expiry() {
        let mut ks = Keyspace::new();
        ks.set("permanent".into(), Bytes::from("val"), None, false, false);

        let entries: Vec<_> = ks.iter_entries().collect();
        assert_eq!(entries[0].2, -1);
    }

    // --- restore ---

    /// A restored value is readable and counted.
    #[test]
    fn restore_adds_entry() {
        let mut ks = Keyspace::new();
        ks.restore("restored".into(), Value::String(Bytes::from("data")), None);
        assert_eq!(
            ks.get("restored").unwrap(),
            Some(Value::String(Bytes::from("data")))
        );
        assert_eq!(ks.stats().key_count, 1);
    }

    /// A restored entry with a tiny TTL is gone once that TTL has elapsed.
    ///
    /// Despite the name, the TTL used here is 1ms rather than zero; the test
    /// sleeps 5ms before reading.
    #[test]
    fn restore_with_zero_ttl_expires_immediately() {
        let mut ks = Keyspace::new();
        ks.restore(
            "short-lived".into(),
            Value::String(Bytes::from("data")),
            Some(Duration::from_millis(1)),
        );
        thread::sleep(Duration::from_millis(5));
        // The read must succeed and report the key as absent; an error here
        // would be a bug, not an acceptable outcome.
        assert!(ks.get("short-lived").unwrap().is_none());
    }

    /// Restoring onto an existing key replaces its value without adding a key.
    #[test]
    fn restore_overwrites_existing() {
        let mut ks = Keyspace::new();
        ks.set("key".into(), Bytes::from("old"), None, false, false);
        ks.restore("key".into(), Value::String(Bytes::from("new")), None);
        assert_eq!(
            ks.get("key").unwrap(),
            Some(Value::String(Bytes::from("new")))
        );
        assert_eq!(ks.stats().key_count, 1);
    }

    /// `restore` stores a 200-byte value even under a 50-byte `NoEviction`
    /// limit.
    #[test]
    fn restore_bypasses_memory_limit() {
        let config = ShardConfig {
            max_memory: Some(50),
            eviction_policy: EvictionPolicy::NoEviction,
            ..ShardConfig::default()
        };
        let mut ks = Keyspace::with_config(config);

        ks.restore(
            "big".into(),
            Value::String(Bytes::from("x".repeat(200))),
            None,
        );
        assert_eq!(ks.stats().key_count, 1);
    }

    /// A default keyspace (no configured limit) accepts every write.
    #[test]
    fn no_limit_never_rejects() {
        let mut ks = Keyspace::new();
        for i in 0..100 {
            assert_eq!(
                ks.set(format!("key:{i}"), Bytes::from("value"), None, false, false),
                SetResult::Ok
            );
        }
        assert_eq!(ks.len(), 100);
    }

    /// `clear` drops every key and resets usage and expiry counters.
    #[test]
    fn clear_removes_all_keys() {
        let mut ks = Keyspace::new();
        ks.set("a".into(), Bytes::from("1"), None, false, false);
        ks.set(
            "b".into(),
            Bytes::from("2"),
            Some(Duration::from_secs(60)),
            false,
            false,
        );
        ks.lpush("list", &[Bytes::from("x")]).unwrap();

        assert_eq!(ks.len(), 3);
        assert!(ks.stats().used_bytes > 0);
        assert_eq!(ks.stats().keys_with_expiry, 1);

        ks.clear();

        assert_eq!(ks.len(), 0);
        assert!(ks.is_empty());
        assert_eq!(ks.stats().used_bytes, 0);
        assert_eq!(ks.stats().keys_with_expiry, 0);
    }

    // --- scan ---

    /// A scan with a count larger than the keyspace returns every key and a
    /// zero cursor.
    #[test]
    fn scan_returns_keys() {
        let mut ks = Keyspace::new();
        ks.set("key1".into(), Bytes::from("a"), None, false, false);
        ks.set("key2".into(), Bytes::from("b"), None, false, false);
        ks.set("key3".into(), Bytes::from("c"), None, false, false);

        let (cursor, keys) = ks.scan_keys(0, 10, None);
        assert_eq!(cursor, 0);
        assert_eq!(keys.len(), 3);
    }

    /// Scanning an empty keyspace yields no keys and a zero cursor.
    #[test]
    fn scan_empty_keyspace() {
        let ks = Keyspace::new();
        let (cursor, keys) = ks.scan_keys(0, 10, None);
        assert_eq!(cursor, 0);
        assert!(keys.is_empty());
    }

    /// Only keys matching the glob pattern are returned.
    #[test]
    fn scan_with_pattern() {
        let mut ks = Keyspace::new();
        ks.set("user:1".into(), Bytes::from("a"), None, false, false);
        ks.set("user:2".into(), Bytes::from("b"), None, false, false);
        ks.set("item:1".into(), Bytes::from("c"), None, false, false);

        let (cursor, keys) = ks.scan_keys(0, 10, Some("user:*"));
        assert_eq!(cursor, 0);
        assert_eq!(keys.len(), 2);
        for k in &keys {
            assert!(k.starts_with("user:"));
        }
    }

    /// A scan with count 3 over 10 keys returns between 1 and 3 keys, and a
    /// non-zero cursor can be used to continue.
    #[test]
    fn scan_with_count_limit() {
        let mut ks = Keyspace::new();
        for i in 0..10 {
            ks.set(format!("k{i}"), Bytes::from("v"), None, false, false);
        }

        let (cursor, keys) = ks.scan_keys(0, 3, None);
        assert!(!keys.is_empty());
        assert!(keys.len() <= 3);

        if cursor != 0 {
            // Only the returned keys matter for the continuation call; the
            // follow-up cursor is not inspected.
            let (_, keys2) = ks.scan_keys(cursor, 3, None);
            assert!(!keys2.is_empty());
        }
    }

    /// Keys whose TTL has elapsed are not returned by a scan.
    #[test]
    fn scan_skips_expired_keys() {
        let mut ks = Keyspace::new();
        ks.set("live".into(), Bytes::from("a"), None, false, false);
        ks.set(
            "expired".into(),
            Bytes::from("b"),
            Some(Duration::from_millis(1)),
            false,
            false,
        );

        thread::sleep(Duration::from_millis(5));

        let (_, keys) = ks.scan_keys(0, 10, None);
        assert_eq!(keys.len(), 1);
        assert_eq!(keys[0], "live");
    }

    // --- glob_match ---

    /// `*` matches any run of characters, including an empty one.
    #[test]
    fn glob_match_star() {
        assert!(glob_match("user:*", "user:123"));
        assert!(glob_match("user:*", "user:"));
        assert!(glob_match("*:data", "foo:data"));
        assert!(!glob_match("user:*", "item:123"));
    }

    /// `?` matches exactly one character.
    #[test]
    fn glob_match_question() {
        assert!(glob_match("key?", "key1"));
        assert!(glob_match("key?", "keya"));
        assert!(!glob_match("key?", "key"));
        assert!(!glob_match("key?", "key12"));
    }

    /// A bracket class matches one character from the listed set.
    #[test]
    fn glob_match_brackets() {
        assert!(glob_match("key[abc]", "keya"));
        assert!(glob_match("key[abc]", "keyb"));
        assert!(!glob_match("key[abc]", "keyd"));
    }

    /// A pattern without wildcards must match the whole text.
    #[test]
    fn glob_match_literal() {
        assert!(glob_match("exact", "exact"));
        assert!(!glob_match("exact", "exactnot"));
        assert!(!glob_match("exact", "notexact"));
    }

    // --- persist / pttl / pexpire ---

    /// `persist` strips the TTL and the expiry counter drops to zero.
    #[test]
    fn persist_removes_expiry() {
        let mut ks = Keyspace::new();
        ks.set(
            "key".into(),
            Bytes::from("val"),
            Some(Duration::from_secs(60)),
            false,
            false,
        );
        assert!(matches!(ks.ttl("key"), TtlResult::Seconds(_)));

        assert!(ks.persist("key"));
        assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
        assert_eq!(ks.stats().keys_with_expiry, 0);
    }

    /// `persist` on a key that has no TTL reports `false`.
    #[test]
    fn persist_returns_false_without_expiry() {
        let mut ks = Keyspace::new();
        ks.set("key".into(), Bytes::from("val"), None, false, false);
        assert!(!ks.persist("key"));
    }

    /// `persist` on an unknown key reports `false`.
    #[test]
    fn persist_returns_false_for_missing_key() {
        let mut ks = Keyspace::new();
        assert!(!ks.persist("missing"));
    }

    /// `pttl` reports the remaining time in milliseconds.
    #[test]
    fn pttl_returns_milliseconds() {
        let mut ks = Keyspace::new();
        ks.set(
            "key".into(),
            Bytes::from("val"),
            Some(Duration::from_secs(60)),
            false,
            false,
        );
        match ks.pttl("key") {
            TtlResult::Milliseconds(ms) => assert!(ms > 59_000 && ms <= 60_000),
            other => panic!("expected Milliseconds, got {other:?}"),
        }
    }

    /// `pttl` on a key without a TTL reports `NoExpiry`.
    #[test]
    fn pttl_no_expiry() {
        let mut ks = Keyspace::new();
        ks.set("key".into(), Bytes::from("val"), None, false, false);
        assert_eq!(ks.pttl("key"), TtlResult::NoExpiry);
    }

    /// `pttl` on an unknown key reports `NotFound`.
    #[test]
    fn pttl_not_found() {
        let mut ks = Keyspace::new();
        assert_eq!(ks.pttl("missing"), TtlResult::NotFound);
    }

    /// `pexpire` attaches a millisecond TTL and the key is counted as expiring.
    #[test]
    fn pexpire_sets_ttl_in_millis() {
        let mut ks = Keyspace::new();
        ks.set("key".into(), Bytes::from("val"), None, false, false);
        assert!(ks.pexpire("key", 5000));
        match ks.pttl("key") {
            TtlResult::Milliseconds(ms) => assert!(ms > 4000 && ms <= 5000),
            other => panic!("expected Milliseconds, got {other:?}"),
        }
        assert_eq!(ks.stats().keys_with_expiry, 1);
    }

    /// `pexpire` on an unknown key reports `false`.
    #[test]
    fn pexpire_missing_key_returns_false() {
        let mut ks = Keyspace::new();
        assert!(!ks.pexpire("missing", 5000));
    }

    /// `pexpire` replaces an existing TTL and does not double-count the key.
    #[test]
    fn pexpire_overwrites_existing_ttl() {
        let mut ks = Keyspace::new();
        ks.set(
            "key".into(),
            Bytes::from("val"),
            Some(Duration::from_secs(60)),
            false,
            false,
        );
        assert!(ks.pexpire("key", 500));
        match ks.pttl("key") {
            TtlResult::Milliseconds(ms) => assert!(ms <= 500),
            other => panic!("expected Milliseconds, got {other:?}"),
        }
        assert_eq!(ks.stats().keys_with_expiry, 1);
    }

    // --- expireat / pexpireat ---

    /// `expireat` with a timestamp 60s in the future attaches a TTL.
    #[test]
    fn expireat_sets_expiry_on_existing_key() {
        use std::time::{SystemTime, UNIX_EPOCH};
        let mut ks = Keyspace::new();
        ks.set("k".into(), Bytes::from("v"), None, false, false);
        let future_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs()
            + 60;
        assert!(ks.expireat("k", future_secs));
        assert!(matches!(ks.ttl("k"), TtlResult::Seconds(_)));
        assert_eq!(ks.stats().keys_with_expiry, 1);
    }

    /// `expireat` on an unknown key reports `false`.
    #[test]
    fn expireat_missing_key_returns_false() {
        let mut ks = Keyspace::new();
        assert!(!ks.expireat("missing", 9_999_999_999));
    }

    /// Re-targeting the expiry of a key that already has a TTL keeps the
    /// expiry counter at one.
    #[test]
    fn expireat_does_not_double_count_expiry() {
        use std::time::{SystemTime, UNIX_EPOCH};
        let mut ks = Keyspace::new();
        let base = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
        ks.set(
            "k".into(),
            Bytes::from("v"),
            Some(Duration::from_secs(30)),
            false,
            false,
        );
        assert_eq!(ks.stats().keys_with_expiry, 1);
        assert!(ks.expireat("k", base + 120));
        assert_eq!(ks.stats().keys_with_expiry, 1);
    }

    /// `pexpireat` with a millisecond timestamp 60s in the future attaches a
    /// TTL.
    #[test]
    fn pexpireat_sets_expiry_in_ms() {
        use std::time::{SystemTime, UNIX_EPOCH};
        let mut ks = Keyspace::new();
        ks.set("k".into(), Bytes::from("v"), None, false, false);
        let future_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
            + 60_000;
        assert!(ks.pexpireat("k", future_ms));
        assert!(matches!(ks.pttl("k"), TtlResult::Milliseconds(_)));
        assert_eq!(ks.stats().keys_with_expiry, 1);
    }

    /// `pexpireat` on an unknown key reports `false`.
    #[test]
    fn pexpireat_missing_key_returns_false() {
        let mut ks = Keyspace::new();
        assert!(!ks.pexpireat("missing", 9_999_999_999_000));
    }

    // --- keys ---

    /// `keys("*")` returns every key (order is not assumed; the result is
    /// sorted before comparison).
    #[test]
    fn keys_match_all() {
        let mut ks = Keyspace::new();
        ks.set("a".into(), Bytes::from("1"), None, false, false);
        ks.set("b".into(), Bytes::from("2"), None, false, false);
        ks.set("c".into(), Bytes::from("3"), None, false, false);
        let mut result = ks.keys("*");
        result.sort();
        assert_eq!(result, vec!["a", "b", "c"]);
    }

    /// `keys` filters by glob pattern.
    #[test]
    fn keys_with_pattern() {
        let mut ks = Keyspace::new();
        ks.set("user:1".into(), Bytes::from("a"), None, false, false);
        ks.set("user:2".into(), Bytes::from("b"), None, false, false);
        ks.set("item:1".into(), Bytes::from("c"), None, false, false);
        let mut result = ks.keys("user:*");
        result.sort();
        assert_eq!(result, vec!["user:1", "user:2"]);
    }

    /// `keys` omits keys whose TTL has elapsed.
    #[test]
    fn keys_skips_expired() {
        let mut ks = Keyspace::new();
        ks.set("live".into(), Bytes::from("a"), None, false, false);
        ks.set(
            "dead".into(),
            Bytes::from("b"),
            Some(Duration::from_millis(1)),
            false,
            false,
        );
        thread::sleep(Duration::from_millis(5));
        let result = ks.keys("*");
        assert_eq!(result, vec!["live"]);
    }

    /// `keys` on an empty keyspace returns nothing.
    #[test]
    fn keys_empty_keyspace() {
        let ks = Keyspace::new();
        assert!(ks.keys("*").is_empty());
    }

    // --- rename ---

    /// The value moves to the new name and the old name disappears.
    #[test]
    fn rename_basic() {
        let mut ks = Keyspace::new();
        ks.set("old".into(), Bytes::from("value"), None, false, false);
        ks.rename("old", "new").unwrap();
        assert!(!ks.exists("old"));
        assert_eq!(
            ks.get("new").unwrap(),
            Some(Value::String(Bytes::from("value")))
        );
    }

    /// The TTL travels with the value to the new name.
    #[test]
    fn rename_preserves_expiry() {
        let mut ks = Keyspace::new();
        ks.set(
            "old".into(),
            Bytes::from("val"),
            Some(Duration::from_secs(60)),
            false,
            false,
        );
        ks.rename("old", "new").unwrap();
        match ks.ttl("new") {
            TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
            other => panic!("expected TTL preserved, got {other:?}"),
        }
    }

    /// Renaming onto an existing key replaces it, leaving a single key.
    #[test]
    fn rename_overwrites_destination() {
        let mut ks = Keyspace::new();
        ks.set("src".into(), Bytes::from("new_val"), None, false, false);
        ks.set("dst".into(), Bytes::from("old_val"), None, false, false);
        ks.rename("src", "dst").unwrap();
        assert!(!ks.exists("src"));
        assert_eq!(
            ks.get("dst").unwrap(),
            Some(Value::String(Bytes::from("new_val")))
        );
        assert_eq!(ks.len(), 1);
    }

    /// Renaming an unknown key fails with `NoSuchKey`.
    #[test]
    fn rename_missing_key_returns_error() {
        let mut ks = Keyspace::new();
        let err = ks.rename("missing", "new").unwrap_err();
        assert_eq!(err, RenameError::NoSuchKey);
    }

    /// Renaming a key to itself succeeds and keeps the value.
    #[test]
    fn rename_same_key() {
        let mut ks = Keyspace::new();
        ks.set("key".into(), Bytes::from("val"), None, false, false);
        ks.rename("key", "key").unwrap();
        assert_eq!(
            ks.get("key").unwrap(),
            Some(Value::String(Bytes::from("val")))
        );
    }

    /// Renaming between names of equal length leaves `used_bytes` unchanged.
    #[test]
    fn rename_tracks_memory() {
        let mut ks = Keyspace::new();
        ks.set("old".into(), Bytes::from("value"), None, false, false);
        let before = ks.stats().used_bytes;
        ks.rename("old", "new").unwrap();
        let after = ks.stats().used_bytes;
        assert_eq!(before, after);
        assert_eq!(ks.stats().key_count, 1);
    }

    // --- tiny TTLs ---

    /// A key stored with a zero TTL is not readable 1ms later.
    #[test]
    fn zero_ttl_expires_immediately() {
        let mut ks = Keyspace::new();
        ks.set(
            "key".into(),
            Bytes::from("val"),
            Some(Duration::ZERO),
            false,
            false,
        );

        thread::sleep(Duration::from_millis(1));
        assert!(ks.get("key").unwrap().is_none());
    }

    /// A key stored with a 1ms TTL is not readable 5ms later.
    #[test]
    fn very_small_ttl_expires_quickly() {
        let mut ks = Keyspace::new();
        ks.set(
            "key".into(),
            Bytes::from("val"),
            Some(Duration::from_millis(1)),
            false,
            false,
        );

        thread::sleep(Duration::from_millis(5));
        assert!(ks.get("key").unwrap().is_none());
    }

    // --- cluster slot helpers ---

    /// An empty keyspace has no keys in slot 0.
    #[test]
    fn count_keys_in_slot_empty() {
        let ks = Keyspace::new();
        assert_eq!(ks.count_keys_in_slot(0), 0);
    }

    /// The slot that "a" hashes to contains at least one key.
    #[test]
    fn count_keys_in_slot_matches() {
        let mut ks = Keyspace::new();
        ks.set("a".into(), Bytes::from("1"), None, false, false);
        ks.set("b".into(), Bytes::from("2"), None, false, false);
        ks.set("c".into(), Bytes::from("3"), None, false, false);

        let slot_a = ember_cluster::key_slot(b"a");
        let count = ks.count_keys_in_slot(slot_a);
        // Lower bound only: "b" or "c" may or may not share the slot.
        assert!(count >= 1);
    }

    /// An expired key is not counted in its slot.
    #[test]
    fn count_keys_in_slot_skips_expired() {
        let mut ks = Keyspace::new();
        let slot = ember_cluster::key_slot(b"temp");
        ks.set(
            "temp".into(),
            Bytes::from("gone"),
            Some(Duration::from_millis(0)),
            false,
            false,
        );
        thread::sleep(Duration::from_millis(5));
        assert_eq!(ks.count_keys_in_slot(slot), 0);
    }

    /// The key listing for the slot of "x" includes "x".
    #[test]
    fn get_keys_in_slot_returns_matching() {
        let mut ks = Keyspace::new();
        ks.set("x".into(), Bytes::from("1"), None, false, false);
        ks.set("y".into(), Bytes::from("2"), None, false, false);

        let slot_x = ember_cluster::key_slot(b"x");
        let keys = ks.get_keys_in_slot(slot_x, 100);
        assert!(keys.contains(&"x".to_string()));
    }

    /// The slot listing never returns more keys than the requested count.
    #[test]
    fn get_keys_in_slot_respects_count_limit() {
        let mut ks = Keyspace::new();
        for i in 0..100 {
            ks.set(format!("key:{i}"), Bytes::from("v"), None, false, false);
        }
        let keys = ks.get_keys_in_slot(0, 3);
        assert!(keys.len() <= 3);
    }

    // --- key versions ---

    /// An unknown key has no version.
    #[test]
    fn key_version_returns_none_for_missing() {
        let mut ks = Keyspace::new();
        assert_eq!(ks.key_version("nope"), None);
    }

    /// Overwriting a key increases its version.
    #[test]
    fn key_version_changes_on_set() {
        let mut ks = Keyspace::new();
        ks.set("k".into(), Bytes::from("v1"), None, false, false);
        let v1 = ks.key_version("k").expect("key should exist");
        ks.set("k".into(), Bytes::from("v2"), None, false, false);
        let v2 = ks.key_version("k").expect("key should exist");
        assert!(v2 > v1, "version should increase on overwrite");
    }

    /// A deleted key has no version.
    #[test]
    fn key_version_none_after_del() {
        let mut ks = Keyspace::new();
        ks.set("k".into(), Bytes::from("v"), None, false, false);
        assert!(ks.key_version("k").is_some());
        ks.del("k");
        assert_eq!(ks.key_version("k"), None);
    }

    /// Pushing onto a list increases its version.
    #[test]
    fn key_version_changes_on_list_push() {
        let mut ks = Keyspace::new();
        ks.lpush("list", &[Bytes::from("a")]).unwrap();
        let v1 = ks.key_version("list").expect("list should exist");
        ks.rpush("list", &[Bytes::from("b")]).unwrap();
        let v2 = ks.key_version("list").expect("list should exist");
        assert!(v2 > v1, "version should increase on rpush");
    }

    /// Adding a hash field increases the hash's version.
    #[test]
    fn key_version_changes_on_hash_set() {
        let mut ks = Keyspace::new();
        ks.hset("h", &[("f1".into(), Bytes::from("v1"))]).unwrap();
        let v1 = ks.key_version("h").expect("hash should exist");
        ks.hset("h", &[("f2".into(), Bytes::from("v2"))]).unwrap();
        let v2 = ks.key_version("h").expect("hash should exist");
        assert!(v2 > v1, "version should increase on hset");
    }

    /// Setting a TTL increases the key's version.
    #[test]
    fn key_version_changes_on_expire() {
        let mut ks = Keyspace::new();
        ks.set("k".into(), Bytes::from("v"), None, false, false);
        let v1 = ks.key_version("k").expect("key should exist");
        ks.expire("k", 100);
        let v2 = ks.key_version("k").expect("key should exist");
        assert!(v2 > v1, "version should increase on expire");
    }

    /// Reading the version twice with no write in between yields the same
    /// value.
    #[test]
    fn key_version_stable_without_watch() {
        let mut ks = Keyspace::new();
        ks.set("a".into(), Bytes::from("1"), None, false, false);
        ks.set("a".into(), Bytes::from("2"), None, false, false);
        let v1 = ks.key_version("a").unwrap();
        let v2 = ks.key_version("a").unwrap();
        assert_eq!(v1, v2, "version should be stable without mutations");
    }

    // --- copy ---

    /// Copying duplicates the value and leaves the source in place.
    #[test]
    fn copy_basic() {
        let mut ks = Keyspace::new();
        ks.set("src".into(), Bytes::from("hello"), None, false, false);
        assert_eq!(ks.copy("src", "dst", false), Ok(true));
        assert_eq!(
            ks.get("dst").unwrap(),
            Some(Value::String(Bytes::from("hello")))
        );
        assert!(ks.exists("src"));
    }

    /// The copy carries the source's TTL.
    #[test]
    fn copy_preserves_expiry() {
        let mut ks = Keyspace::new();
        ks.set(
            "src".into(),
            Bytes::from("val"),
            Some(Duration::from_secs(60)),
            false,
            false,
        );
        assert_eq!(ks.copy("src", "dst", false), Ok(true));
        match ks.ttl("dst") {
            TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
            other => panic!("expected TTL preserved, got {other:?}"),
        }
    }

    /// Without `replace`, copying onto an existing key returns `Ok(false)`
    /// and leaves the destination untouched.
    #[test]
    fn copy_no_replace_returns_false() {
        let mut ks = Keyspace::new();
        ks.set("src".into(), Bytes::from("a"), None, false, false);
        ks.set("dst".into(), Bytes::from("b"), None, false, false);
        assert_eq!(ks.copy("src", "dst", false), Ok(false));
        assert_eq!(
            ks.get("dst").unwrap(),
            Some(Value::String(Bytes::from("b")))
        );
    }

    /// With `replace`, the destination takes the source's value.
    #[test]
    fn copy_replace_overwrites() {
        let mut ks = Keyspace::new();
        ks.set("src".into(), Bytes::from("new"), None, false, false);
        ks.set("dst".into(), Bytes::from("old"), None, false, false);
        assert_eq!(ks.copy("src", "dst", true), Ok(true));
        assert_eq!(
            ks.get("dst").unwrap(),
            Some(Value::String(Bytes::from("new")))
        );
    }

    /// Copying an unknown source fails with `NoSuchKey`.
    #[test]
    fn copy_missing_source() {
        let mut ks = Keyspace::new();
        assert_eq!(ks.copy("missing", "dst", false), Err(CopyError::NoSuchKey));
    }

    /// A successful copy increases usage and the key count.
    #[test]
    fn copy_tracks_memory() {
        let mut ks = Keyspace::new();
        ks.set("src".into(), Bytes::from("value"), None, false, false);
        let before = ks.stats().used_bytes;
        ks.copy("src", "dst", false).unwrap();
        let after = ks.stats().used_bytes;
        assert!(after > before);
        assert_eq!(ks.stats().key_count, 2);
    }

    // --- random_key ---

    /// An empty keyspace has no random key to offer.
    #[test]
    fn random_key_empty() {
        let mut ks = Keyspace::new();
        assert_eq!(ks.random_key(), None);
    }

    /// With a single key stored, that key is the one returned.
    #[test]
    fn random_key_returns_existing() {
        let mut ks = Keyspace::new();
        ks.set("only".into(), Bytes::from("val"), None, false, false);
        assert_eq!(ks.random_key(), Some("only".into()));
    }

    // --- touch ---

    /// `touch` reports `true` for a stored key.
    #[test]
    fn touch_existing_key() {
        let mut ks = Keyspace::new();
        ks.set("k".into(), Bytes::from("v"), None, false, false);
        assert!(ks.touch("k"));
    }

    /// `touch` reports `false` for an unknown key.
    #[test]
    fn touch_missing_key() {
        let mut ks = Keyspace::new();
        assert!(!ks.touch("missing"));
    }

    // --- sort ---

    /// `sort(key, false, false, None)` on a list of numeric strings returns
    /// them in ascending numeric order.
    #[test]
    fn sort_list_numeric() {
        let mut ks = Keyspace::new();
        // Fail loudly if the fixture cannot be created rather than sorting an
        // unexpectedly missing key.
        ks.lpush(
            "nums",
            &[Bytes::from("3"), Bytes::from("1"), Bytes::from("2")],
        )
        .unwrap();
        let result = ks.sort("nums", false, false, None).unwrap();
        assert_eq!(
            result,
            vec![Bytes::from("1"), Bytes::from("2"), Bytes::from("3")]
        );
    }

    /// `sort(key, true, true, None)` on a list of words returns them in
    /// descending alphabetical order.
    #[test]
    fn sort_list_alpha_desc() {
        let mut ks = Keyspace::new();
        ks.lpush(
            "words",
            &[
                Bytes::from("banana"),
                Bytes::from("apple"),
                Bytes::from("cherry"),
            ],
        )
        .unwrap();
        let result = ks.sort("words", true, true, None).unwrap();
        assert_eq!(
            result,
            vec![
                Bytes::from("cherry"),
                Bytes::from("banana"),
                Bytes::from("apple")
            ]
        );
    }

    /// A limit of `(1, 2)` on the sorted values 1..=4 yields "2" and "3".
    #[test]
    fn sort_with_limit() {
        let mut ks = Keyspace::new();
        ks.lpush(
            "nums",
            &[
                Bytes::from("4"),
                Bytes::from("3"),
                Bytes::from("2"),
                Bytes::from("1"),
            ],
        )
        .unwrap();
        let result = ks.sort("nums", false, false, Some((1, 2))).unwrap();
        assert_eq!(result, vec![Bytes::from("2"), Bytes::from("3")]);
    }

    /// `sort(key, false, true, None)` on a set returns its members in
    /// ascending alphabetical order.
    #[test]
    fn sort_set_alpha() {
        let mut ks = Keyspace::new();
        let members: Vec<String> = vec!["c".into(), "a".into(), "b".into()];
        // NOTE(review): the return value of `sadd` is discarded; its type is
        // not visible from this module, so it is left as-is. The assertion
        // below still fails if the set was not populated.
        let _ = ks.sadd("myset", &members);
        let result = ks.sort("myset", false, true, None).unwrap();
        assert_eq!(
            result,
            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]
        );
    }

    /// Sorting an unknown key yields an empty result rather than an error.
    #[test]
    fn sort_missing_key() {
        let mut ks = Keyspace::new();
        let result = ks.sort("nope", false, false, None).unwrap();
        assert!(result.is_empty());
    }

    /// Sorting a string value is an error.
    #[test]
    fn sort_wrong_type() {
        let mut ks = Keyspace::new();
        ks.set("str".into(), Bytes::from("hello"), None, false, false);
        let result = ks.sort("str", false, false, None);
        assert!(result.is_err());
    }
}