1use std::collections::{HashMap, VecDeque};
9use std::time::{Duration, Instant};
10
11use bytes::Bytes;
12use rand::seq::IteratorRandom;
13
14use crate::memory::{self, MemoryTracker};
15use crate::types::sorted_set::{SortedSet, ZAddFlags};
16use crate::types::{self, normalize_range, Value};
17
/// Error returned when a command is run against a key whose stored value
/// has a different type than the command operates on.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct WrongType;

impl std::fmt::Display for WrongType {
    /// Writes the fixed `WRONGTYPE` error line.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("WRONGTYPE Operation against a key holding the wrong kind of value")
    }
}

impl std::error::Error for WrongType {}
32
/// Reasons a list or sorted-set write can be rejected.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum WriteError {
    /// The key already holds a value of a different type.
    WrongType,
    /// The configured memory limit could not be satisfied for this write.
    OutOfMemory,
}
42
43impl From<WrongType> for WriteError {
44 fn from(_: WrongType) -> Self {
45 WriteError::WrongType
46 }
47}
48
/// Reasons an increment / decrement can fail.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum IncrError {
    /// The key holds a non-string value.
    WrongType,
    /// The stored bytes are not valid UTF-8 or do not parse as an `i64`.
    NotAnInteger,
    /// Applying the delta would overflow `i64`.
    Overflow,
    /// Storing the new value was rejected by the memory limit.
    OutOfMemory,
}

impl std::fmt::Display for IncrError {
    /// Writes the protocol-style error line for each variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::WrongType => "WRONGTYPE Operation against a key holding the wrong kind of value",
            Self::NotAnInteger => "ERR value is not an integer or out of range",
            Self::Overflow => "ERR increment or decrement would overflow",
            Self::OutOfMemory => "OOM command not allowed when used memory > 'maxmemory'",
        };
        f.write_str(message)
    }
}

impl std::error::Error for IncrError {}
/// Outcome of a `zadd` call.
#[derive(Clone, Debug)]
pub struct ZAddResult {
    /// Number of members counted for the reply: newly added members, plus
    /// updated ones when the `ch` flag is set.
    pub count: usize,
    /// `(score, member)` pairs that were actually added or updated.
    pub applied: Vec<(f64, String)>,
}
89
/// What to do when a write would exceed the configured memory limit.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum EvictionPolicy {
    /// Reject the write; nothing is evicted. This is the default.
    #[default]
    NoEviction,
    /// Evict the least recently accessed key out of a random sample until
    /// the write fits (or nothing is left to evict).
    AllKeysLru,
}
99
100#[derive(Debug, Clone)]
102pub struct ShardConfig {
103 pub max_memory: Option<usize>,
105 pub eviction_policy: EvictionPolicy,
107 pub shard_id: u16,
109}
110
111impl Default for ShardConfig {
112 fn default() -> Self {
113 Self {
114 max_memory: None,
115 eviction_policy: EvictionPolicy::NoEviction,
116 shard_id: 0,
117 }
118 }
119}
120
/// Outcome of a `set` call.
#[derive(Debug, Eq, PartialEq)]
pub enum SetResult {
    /// The value was stored.
    Ok,
    /// The write was rejected because the memory limit could not be met.
    OutOfMemory,
}
129
130#[derive(Debug, Clone)]
133pub(crate) struct Entry {
134 pub(crate) value: Value,
135 pub(crate) expires_at: Option<Instant>,
136 pub(crate) last_access: Instant,
137}
138
139impl Entry {
140 fn new(value: Value, expires_at: Option<Instant>) -> Self {
141 Self {
142 value,
143 expires_at,
144 last_access: Instant::now(),
145 }
146 }
147
148 fn is_expired(&self) -> bool {
150 match self.expires_at {
151 Some(deadline) => Instant::now() >= deadline,
152 None => false,
153 }
154 }
155
156 fn touch(&mut self) {
158 self.last_access = Instant::now();
159 }
160}
161
/// Result of a TTL query.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TtlResult {
    /// Remaining lifetime in whole seconds (as reported by `ttl`).
    Seconds(u64),
    /// Remaining lifetime in milliseconds (as reported by `pttl`).
    Milliseconds(u64),
    /// The key exists but has no expiry.
    NoExpiry,
    /// The key does not exist (or had already expired).
    NotFound,
}
174
/// Point-in-time counters describing a keyspace.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct KeyspaceStats {
    /// Number of keys as reported by the memory tracker.
    pub key_count: usize,
    /// Bytes in use as reported by the memory tracker.
    pub used_bytes: usize,
    /// Number of keys that currently carry an expiry deadline.
    pub keys_with_expiry: usize,
}
185
/// Number of entries sampled at random per eviction attempt; the sampled
/// entry with the oldest last-access time is the one evicted.
const EVICTION_SAMPLE_SIZE: usize = 16;
197
/// In-memory key/value store with per-key expiry, memory accounting and
/// optional LRU-style eviction.
pub struct Keyspace {
    /// All stored entries by key. May still contain expired entries that
    /// have not been reaped yet; accessors remove them lazily.
    entries: HashMap<String, Entry>,
    /// Running byte / key-count accounting for `entries`.
    memory: MemoryTracker,
    /// Memory limit and eviction policy applied to writes.
    config: ShardConfig,
    /// Number of entries whose `expires_at` is currently set.
    expiry_count: usize,
}
209
210impl Keyspace {
211 pub fn new() -> Self {
213 Self::with_config(ShardConfig::default())
214 }
215
216 pub fn with_config(config: ShardConfig) -> Self {
218 Self {
219 entries: HashMap::new(),
220 memory: MemoryTracker::new(),
221 config,
222 expiry_count: 0,
223 }
224 }
225
226 pub fn get(&mut self, key: &str) -> Result<Option<Value>, WrongType> {
232 if self.remove_if_expired(key) {
233 return Ok(None);
234 }
235 match self.entries.get_mut(key) {
236 Some(e) => match &e.value {
237 Value::String(_) => {
238 e.touch();
239 Ok(Some(e.value.clone()))
240 }
241 _ => Err(WrongType),
242 },
243 None => Ok(None),
244 }
245 }
246
247 pub fn value_type(&mut self, key: &str) -> &'static str {
249 if self.remove_if_expired(key) {
250 return "none";
251 }
252 match self.entries.get(key) {
253 Some(e) => types::type_name(&e.value),
254 None => "none",
255 }
256 }
257
258 pub fn set(&mut self, key: String, value: Bytes, expire: Option<Duration>) -> SetResult {
267 let expires_at = expire.map(|d| Instant::now() + d);
268 let new_value = Value::String(value);
269
270 let new_size = memory::entry_size(&key, &new_value);
272 let old_size = self
273 .entries
274 .get(&key)
275 .map(|e| memory::entry_size(&key, &e.value))
276 .unwrap_or(0);
277 let net_increase = new_size.saturating_sub(old_size);
278
279 if !self.enforce_memory_limit(net_increase) {
280 return SetResult::OutOfMemory;
281 }
282
283 if let Some(old_entry) = self.entries.get(&key) {
284 self.memory.replace(&key, &old_entry.value, &new_value);
285 let had_expiry = old_entry.expires_at.is_some();
287 let has_expiry = expires_at.is_some();
288 match (had_expiry, has_expiry) {
289 (false, true) => self.expiry_count += 1,
290 (true, false) => self.expiry_count = self.expiry_count.saturating_sub(1),
291 _ => {}
292 }
293 } else {
294 self.memory.add(&key, &new_value);
295 if expires_at.is_some() {
296 self.expiry_count += 1;
297 }
298 }
299
300 self.entries.insert(key, Entry::new(new_value, expires_at));
301 SetResult::Ok
302 }
303
304 fn try_evict(&mut self) -> bool {
310 if self.entries.is_empty() {
311 return false;
312 }
313
314 let mut rng = rand::rng();
315
316 let victim = self
318 .entries
319 .iter()
320 .choose_multiple(&mut rng, EVICTION_SAMPLE_SIZE)
321 .into_iter()
322 .min_by_key(|(_, entry)| entry.last_access)
323 .map(|(k, _)| k.clone());
324
325 if let Some(key) = victim {
326 if let Some(entry) = self.entries.remove(&key) {
327 self.memory.remove(&key, &entry.value);
328 if entry.expires_at.is_some() {
329 self.expiry_count = self.expiry_count.saturating_sub(1);
330 }
331 return true;
332 }
333 }
334 false
335 }
336
337 fn enforce_memory_limit(&mut self, estimated_increase: usize) -> bool {
341 if let Some(max) = self.config.max_memory {
342 while self.memory.used_bytes() + estimated_increase > max {
343 match self.config.eviction_policy {
344 EvictionPolicy::NoEviction => return false,
345 EvictionPolicy::AllKeysLru => {
346 if !self.try_evict() {
347 return false;
348 }
349 }
350 }
351 }
352 }
353 true
354 }
355
356 pub fn del(&mut self, key: &str) -> bool {
358 if self.remove_if_expired(key) {
359 return false;
360 }
361 if let Some(entry) = self.entries.remove(key) {
362 self.memory.remove(key, &entry.value);
363 if entry.expires_at.is_some() {
364 self.expiry_count = self.expiry_count.saturating_sub(1);
365 }
366 true
367 } else {
368 false
369 }
370 }
371
372 pub fn exists(&mut self, key: &str) -> bool {
374 if self.remove_if_expired(key) {
375 return false;
376 }
377 self.entries.contains_key(key)
378 }
379
380 pub fn expire(&mut self, key: &str, seconds: u64) -> bool {
383 if self.remove_if_expired(key) {
384 return false;
385 }
386 match self.entries.get_mut(key) {
387 Some(entry) => {
388 if entry.expires_at.is_none() {
389 self.expiry_count += 1;
390 }
391 entry.expires_at = Some(Instant::now() + Duration::from_secs(seconds));
392 true
393 }
394 None => false,
395 }
396 }
397
398 pub fn ttl(&mut self, key: &str) -> TtlResult {
403 if self.remove_if_expired(key) {
404 return TtlResult::NotFound;
405 }
406 match self.entries.get(key) {
407 Some(entry) => match entry.expires_at {
408 Some(deadline) => {
409 let remaining = deadline.saturating_duration_since(Instant::now());
410 TtlResult::Seconds(remaining.as_secs())
411 }
412 None => TtlResult::NoExpiry,
413 },
414 None => TtlResult::NotFound,
415 }
416 }
417
418 pub fn persist(&mut self, key: &str) -> bool {
423 if self.remove_if_expired(key) {
424 return false;
425 }
426 match self.entries.get_mut(key) {
427 Some(entry) => {
428 if entry.expires_at.is_some() {
429 entry.expires_at = None;
430 self.expiry_count = self.expiry_count.saturating_sub(1);
431 true
432 } else {
433 false
434 }
435 }
436 None => false,
437 }
438 }
439
440 pub fn pttl(&mut self, key: &str) -> TtlResult {
445 if self.remove_if_expired(key) {
446 return TtlResult::NotFound;
447 }
448 match self.entries.get(key) {
449 Some(entry) => match entry.expires_at {
450 Some(deadline) => {
451 let remaining = deadline.saturating_duration_since(Instant::now());
452 TtlResult::Milliseconds(remaining.as_millis() as u64)
453 }
454 None => TtlResult::NoExpiry,
455 },
456 None => TtlResult::NotFound,
457 }
458 }
459
460 pub fn pexpire(&mut self, key: &str, millis: u64) -> bool {
465 if self.remove_if_expired(key) {
466 return false;
467 }
468 match self.entries.get_mut(key) {
469 Some(entry) => {
470 if entry.expires_at.is_none() {
471 self.expiry_count += 1;
472 }
473 entry.expires_at = Some(Instant::now() + Duration::from_millis(millis));
474 true
475 }
476 None => false,
477 }
478 }
479
480 pub fn incr(&mut self, key: &str) -> Result<i64, IncrError> {
485 self.incr_by(key, 1)
486 }
487
488 pub fn decr(&mut self, key: &str) -> Result<i64, IncrError> {
493 self.incr_by(key, -1)
494 }
495
496 fn incr_by(&mut self, key: &str, delta: i64) -> Result<i64, IncrError> {
501 self.remove_if_expired(key);
502
503 let (current, existing_expire) = match self.entries.get(key) {
505 Some(entry) => {
506 let val = match &entry.value {
507 Value::String(data) => {
508 let s = std::str::from_utf8(data).map_err(|_| IncrError::NotAnInteger)?;
509 s.parse::<i64>().map_err(|_| IncrError::NotAnInteger)?
510 }
511 _ => return Err(IncrError::WrongType),
512 };
513 let expire = entry
514 .expires_at
515 .map(|deadline| deadline.saturating_duration_since(Instant::now()));
516 (val, expire)
517 }
518 None => (0, None),
519 };
520
521 let new_val = current.checked_add(delta).ok_or(IncrError::Overflow)?;
522 let new_bytes = Bytes::from(new_val.to_string());
523
524 match self.set(key.to_owned(), new_bytes, existing_expire) {
525 SetResult::Ok => Ok(new_val),
526 SetResult::OutOfMemory => Err(IncrError::OutOfMemory),
527 }
528 }
529
530 pub fn stats(&self) -> KeyspaceStats {
534 KeyspaceStats {
535 key_count: self.memory.key_count(),
536 used_bytes: self.memory.used_bytes(),
537 keys_with_expiry: self.expiry_count,
538 }
539 }
540
541 pub fn len(&self) -> usize {
543 self.entries.len()
544 }
545
546 pub fn clear(&mut self) {
548 self.entries.clear();
549 self.memory.reset();
550 self.expiry_count = 0;
551 }
552
553 pub fn is_empty(&self) -> bool {
555 self.entries.is_empty()
556 }
557
558 pub fn scan_keys(
563 &self,
564 cursor: u64,
565 count: usize,
566 pattern: Option<&str>,
567 ) -> (u64, Vec<String>) {
568 let mut keys = Vec::with_capacity(count);
569 let mut position = 0u64;
570 let target_count = if count == 0 { 10 } else { count };
571
572 for (key, entry) in self.entries.iter() {
573 if entry.is_expired() {
575 continue;
576 }
577
578 if position < cursor {
580 position += 1;
581 continue;
582 }
583
584 if let Some(pat) = pattern {
586 if !glob_match(pat, key) {
587 position += 1;
588 continue;
589 }
590 }
591
592 keys.push(key.clone());
593 position += 1;
594
595 if keys.len() >= target_count {
596 return (position, keys);
598 }
599 }
600
601 (0, keys)
603 }
604
605 pub fn iter_entries(&self) -> impl Iterator<Item = (&str, &Value, i64)> {
609 let now = Instant::now();
610 self.entries.iter().filter_map(move |(key, entry)| {
611 if entry.is_expired() {
612 return None;
613 }
614 let ttl_ms = match entry.expires_at {
615 Some(deadline) => {
616 let remaining = deadline.saturating_duration_since(now);
617 remaining.as_millis() as i64
618 }
619 None => -1,
620 };
621 Some((key.as_str(), &entry.value, ttl_ms))
622 })
623 }
624
625 pub fn restore(&mut self, key: String, value: Value, expires_at: Option<Instant>) {
631 if let Some(deadline) = expires_at {
633 if Instant::now() >= deadline {
634 return;
635 }
636 }
637
638 if let Some(old) = self.entries.get(&key) {
640 self.memory.replace(&key, &old.value, &value);
641 let had_expiry = old.expires_at.is_some();
642 let has_expiry = expires_at.is_some();
643 match (had_expiry, has_expiry) {
644 (false, true) => self.expiry_count += 1,
645 (true, false) => self.expiry_count = self.expiry_count.saturating_sub(1),
646 _ => {}
647 }
648 } else {
649 self.memory.add(&key, &value);
650 if expires_at.is_some() {
651 self.expiry_count += 1;
652 }
653 }
654
655 self.entries.insert(key, Entry::new(value, expires_at));
656 }
657
658 pub fn lpush(&mut self, key: &str, values: &[Bytes]) -> Result<usize, WriteError> {
667 self.list_push(key, values, true)
668 }
669
670 pub fn rpush(&mut self, key: &str, values: &[Bytes]) -> Result<usize, WriteError> {
677 self.list_push(key, values, false)
678 }
679
680 pub fn lpop(&mut self, key: &str) -> Result<Option<Bytes>, WrongType> {
685 self.list_pop(key, true)
686 }
687
688 pub fn rpop(&mut self, key: &str) -> Result<Option<Bytes>, WrongType> {
693 self.list_pop(key, false)
694 }
695
696 pub fn lrange(&mut self, key: &str, start: i64, stop: i64) -> Result<Vec<Bytes>, WrongType> {
702 if self.remove_if_expired(key) {
703 return Ok(vec![]);
704 }
705 match self.entries.get_mut(key) {
706 None => Ok(vec![]),
707 Some(entry) => {
708 let result = match &entry.value {
709 Value::List(deque) => {
710 let len = deque.len() as i64;
711 let (s, e) = normalize_range(start, stop, len);
712 if s > e {
713 return Ok(vec![]);
714 }
715 Ok(deque
716 .iter()
717 .skip(s as usize)
718 .take((e - s + 1) as usize)
719 .cloned()
720 .collect())
721 }
722 _ => Err(WrongType),
723 };
724 if result.is_ok() {
725 entry.touch();
726 }
727 result
728 }
729 }
730 }
731
732 pub fn llen(&mut self, key: &str) -> Result<usize, WrongType> {
736 if self.remove_if_expired(key) {
737 return Ok(0);
738 }
739 match self.entries.get(key) {
740 None => Ok(0),
741 Some(entry) => match &entry.value {
742 Value::List(deque) => Ok(deque.len()),
743 _ => Err(WrongType),
744 },
745 }
746 }
747
748 fn list_push(&mut self, key: &str, values: &[Bytes], left: bool) -> Result<usize, WriteError> {
750 self.remove_if_expired(key);
751
752 let is_new = !self.entries.contains_key(key);
753
754 if !is_new && !matches!(self.entries[key].value, Value::List(_)) {
755 return Err(WriteError::WrongType);
756 }
757
758 let element_increase: usize = values
760 .iter()
761 .map(|v| memory::VECDEQUE_ELEMENT_OVERHEAD + v.len())
762 .sum();
763 let estimated_increase = if is_new {
764 memory::ENTRY_OVERHEAD + key.len() + memory::VECDEQUE_BASE_OVERHEAD + element_increase
765 } else {
766 element_increase
767 };
768 if !self.enforce_memory_limit(estimated_increase) {
769 return Err(WriteError::OutOfMemory);
770 }
771
772 if is_new {
773 let value = Value::List(VecDeque::new());
774 self.memory.add(key, &value);
775 self.entries.insert(key.to_owned(), Entry::new(value, None));
776 }
777
778 let entry = self
779 .entries
780 .get_mut(key)
781 .expect("just inserted or verified");
782 let old_entry_size = memory::entry_size(key, &entry.value);
783
784 if let Value::List(ref mut deque) = entry.value {
785 for val in values {
786 if left {
787 deque.push_front(val.clone());
788 } else {
789 deque.push_back(val.clone());
790 }
791 }
792 }
793 entry.touch();
794
795 let new_entry_size = memory::entry_size(key, &entry.value);
796 self.memory.adjust(old_entry_size, new_entry_size);
797
798 let len = match &entry.value {
799 Value::List(d) => d.len(),
800 _ => unreachable!(),
801 };
802 Ok(len)
803 }
804
805 fn list_pop(&mut self, key: &str, left: bool) -> Result<Option<Bytes>, WrongType> {
807 if self.remove_if_expired(key) {
808 return Ok(None);
809 }
810
811 match self.entries.get(key) {
812 None => return Ok(None),
813 Some(e) => {
814 if !matches!(e.value, Value::List(_)) {
815 return Err(WrongType);
816 }
817 }
818 };
819
820 let old_entry_size = memory::entry_size(key, &self.entries[key].value);
821 let entry = self.entries.get_mut(key).expect("verified above");
822 let popped = if let Value::List(ref mut deque) = entry.value {
823 if left {
824 deque.pop_front()
825 } else {
826 deque.pop_back()
827 }
828 } else {
829 unreachable!()
830 };
831 entry.touch();
832
833 let is_empty = matches!(&entry.value, Value::List(d) if d.is_empty());
835 if is_empty {
836 let removed = self.entries.remove(key).expect("verified above");
837 self.memory.remove_with_size(old_entry_size);
839 if removed.expires_at.is_some() {
840 self.expiry_count = self.expiry_count.saturating_sub(1);
841 }
842 } else {
843 let new_entry_size = memory::entry_size(key, &self.entries[key].value);
844 self.memory.adjust(old_entry_size, new_entry_size);
845 }
846
847 Ok(popped)
848 }
849
850 pub fn zadd(
860 &mut self,
861 key: &str,
862 members: &[(f64, String)],
863 flags: &ZAddFlags,
864 ) -> Result<ZAddResult, WriteError> {
865 self.remove_if_expired(key);
866
867 let is_new = !self.entries.contains_key(key);
868 if !is_new && !matches!(self.entries[key].value, Value::SortedSet(_)) {
869 return Err(WriteError::WrongType);
870 }
871
872 let member_increase: usize = members
874 .iter()
875 .map(|(_, m)| SortedSet::estimated_member_cost(m))
876 .sum();
877 let estimated_increase = if is_new {
878 memory::ENTRY_OVERHEAD + key.len() + SortedSet::BASE_OVERHEAD + member_increase
879 } else {
880 member_increase
881 };
882 if !self.enforce_memory_limit(estimated_increase) {
883 return Err(WriteError::OutOfMemory);
884 }
885
886 if is_new {
887 let value = Value::SortedSet(SortedSet::new());
888 self.memory.add(key, &value);
889 self.entries.insert(key.to_owned(), Entry::new(value, None));
890 }
891
892 let entry = self
893 .entries
894 .get_mut(key)
895 .expect("just inserted or verified");
896 let old_entry_size = memory::entry_size(key, &entry.value);
897
898 let mut count = 0;
899 let mut applied = Vec::new();
900 if let Value::SortedSet(ref mut ss) = entry.value {
901 for (score, member) in members {
902 let result = ss.add_with_flags(member.clone(), *score, flags);
903 if result.added || result.updated {
904 applied.push((*score, member.clone()));
905 }
906 if flags.ch {
907 if result.added || result.updated {
908 count += 1;
909 }
910 } else if result.added {
911 count += 1;
912 }
913 }
914 }
915 entry.touch();
916
917 let new_entry_size = memory::entry_size(key, &entry.value);
918 self.memory.adjust(old_entry_size, new_entry_size);
919
920 if let Value::SortedSet(ref ss) = entry.value {
922 if ss.is_empty() {
923 self.memory
924 .remove_with_size(memory::entry_size(key, &entry.value));
925 self.entries.remove(key);
926 }
927 }
928
929 Ok(ZAddResult { count, applied })
930 }
931
932 pub fn zrem(&mut self, key: &str, members: &[String]) -> Result<Vec<String>, WrongType> {
938 if self.remove_if_expired(key) {
939 return Ok(vec![]);
940 }
941
942 match self.entries.get(key) {
943 None => return Ok(vec![]),
944 Some(e) => {
945 if !matches!(e.value, Value::SortedSet(_)) {
946 return Err(WrongType);
947 }
948 }
949 }
950
951 let old_entry_size = memory::entry_size(key, &self.entries[key].value);
952 let entry = self.entries.get_mut(key).expect("verified above");
953 let mut removed = Vec::new();
954 if let Value::SortedSet(ref mut ss) = entry.value {
955 for member in members {
956 if ss.remove(member) {
957 removed.push(member.clone());
958 }
959 }
960 }
961 entry.touch();
962
963 let is_empty = matches!(&entry.value, Value::SortedSet(ss) if ss.is_empty());
964 if is_empty {
965 let removed_entry = self.entries.remove(key).expect("verified above");
966 self.memory.remove_with_size(old_entry_size);
967 if removed_entry.expires_at.is_some() {
968 self.expiry_count = self.expiry_count.saturating_sub(1);
969 }
970 } else {
971 let new_entry_size = memory::entry_size(key, &self.entries[key].value);
972 self.memory.adjust(old_entry_size, new_entry_size);
973 }
974
975 Ok(removed)
976 }
977
978 pub fn zscore(&mut self, key: &str, member: &str) -> Result<Option<f64>, WrongType> {
983 if self.remove_if_expired(key) {
984 return Ok(None);
985 }
986 match self.entries.get_mut(key) {
987 None => Ok(None),
988 Some(entry) => match &entry.value {
989 Value::SortedSet(ss) => {
990 let score = ss.score(member);
991 entry.touch();
992 Ok(score)
993 }
994 _ => Err(WrongType),
995 },
996 }
997 }
998
999 pub fn zrank(&mut self, key: &str, member: &str) -> Result<Option<usize>, WrongType> {
1004 if self.remove_if_expired(key) {
1005 return Ok(None);
1006 }
1007 match self.entries.get_mut(key) {
1008 None => Ok(None),
1009 Some(entry) => match &entry.value {
1010 Value::SortedSet(ss) => {
1011 let rank = ss.rank(member);
1012 entry.touch();
1013 Ok(rank)
1014 }
1015 _ => Err(WrongType),
1016 },
1017 }
1018 }
1019
1020 pub fn zrange(
1026 &mut self,
1027 key: &str,
1028 start: i64,
1029 stop: i64,
1030 ) -> Result<Vec<(String, f64)>, WrongType> {
1031 if self.remove_if_expired(key) {
1032 return Ok(vec![]);
1033 }
1034 match self.entries.get_mut(key) {
1035 None => Ok(vec![]),
1036 Some(entry) => {
1037 let result = match &entry.value {
1038 Value::SortedSet(ss) => {
1039 let items = ss.range_by_rank(start, stop);
1040 Ok(items.into_iter().map(|(m, s)| (m.to_owned(), s)).collect())
1041 }
1042 _ => Err(WrongType),
1043 };
1044 if result.is_ok() {
1045 entry.touch();
1046 }
1047 result
1048 }
1049 }
1050 }
1051
1052 pub fn zcard(&mut self, key: &str) -> Result<usize, WrongType> {
1056 if self.remove_if_expired(key) {
1057 return Ok(0);
1058 }
1059 match self.entries.get(key) {
1060 None => Ok(0),
1061 Some(entry) => match &entry.value {
1062 Value::SortedSet(ss) => Ok(ss.len()),
1063 _ => Err(WrongType),
1064 },
1065 }
1066 }
1067
1068 pub fn expire_sample(&mut self, count: usize) -> usize {
1073 if self.entries.is_empty() {
1074 return 0;
1075 }
1076
1077 let mut rng = rand::rng();
1078
1079 let keys_to_check: Vec<String> = self
1080 .entries
1081 .keys()
1082 .choose_multiple(&mut rng, count)
1083 .into_iter()
1084 .cloned()
1085 .collect();
1086
1087 let mut removed = 0;
1088 for key in &keys_to_check {
1089 if self.remove_if_expired(key) {
1090 removed += 1;
1091 }
1092 }
1093 removed
1094 }
1095
1096 fn remove_if_expired(&mut self, key: &str) -> bool {
1099 let expired = self
1100 .entries
1101 .get(key)
1102 .map(|e| e.is_expired())
1103 .unwrap_or(false);
1104
1105 if expired {
1106 if let Some(entry) = self.entries.remove(key) {
1107 self.memory.remove(key, &entry.value);
1108 if entry.expires_at.is_some() {
1109 self.expiry_count = self.expiry_count.saturating_sub(1);
1110 }
1111 }
1112 }
1113 expired
1114 }
1115}
1116
1117impl Default for Keyspace {
1118 fn default() -> Self {
1119 Self::new()
1120 }
1121}
1122
/// Matches `text` against a glob `pattern`.
///
/// Supported syntax:
/// * `*` – any run of characters, including none
/// * `?` – exactly one character
/// * `[abc]` – one character from the listed set; a leading `^` or `!`
///   negates the set
///
/// Character ranges (`a-z`) and escapes are not interpreted. A `[` without
/// a closing `]` never matches anything. Matching is done per `char` and
/// backtracks only to the most recent `*`.
fn glob_match(pattern: &str, text: &str) -> bool {
    /// Tests `ch` against the bracket class opening at `pat[open]`.
    /// Returns the pattern index just past the closing `]` on a match.
    fn class_match(pat: &[char], open: usize, ch: char) -> Option<usize> {
        let mut idx = open + 1;
        let negated = matches!(pat.get(idx).copied(), Some('^') | Some('!'));
        if negated {
            idx += 1;
        }

        let mut listed = false;
        while idx < pat.len() && pat[idx] != ']' {
            listed |= pat[idx] == ch;
            idx += 1;
        }

        // `idx == pat.len()` means the class was never closed.
        if listed != negated && idx < pat.len() {
            Some(idx + 1)
        } else {
            None
        }
    }

    let pat: Vec<char> = pattern.chars().collect();
    let txt: Vec<char> = text.chars().collect();

    let mut p = 0usize;
    let mut t = 0usize;
    // (pattern index right after the latest `*`, text index that `*` is
    // currently assumed to extend to)
    let mut last_star: Option<(usize, usize)> = None;

    while p < pat.len() || t < txt.len() {
        let advanced = match (pat.get(p).copied(), txt.get(t).copied()) {
            (Some('*'), _) => {
                last_star = Some((p + 1, t));
                p += 1;
                true
            }
            (Some('?'), Some(_)) => {
                p += 1;
                t += 1;
                true
            }
            (Some('['), Some(ch)) => match class_match(&pat, p, ch) {
                Some(next) => {
                    p = next;
                    t += 1;
                    true
                }
                None => false,
            },
            (Some(literal), Some(ch)) if literal == ch => {
                p += 1;
                t += 1;
                true
            }
            _ => false,
        };
        if advanced {
            continue;
        }

        // Mismatch: let the latest `*` swallow one more character and retry.
        match last_star.as_mut() {
            Some((resume, anchor)) => {
                *anchor += 1;
                if *anchor > txt.len() {
                    return false;
                }
                p = *resume;
                t = *anchor;
            }
            None => return false,
        }
    }

    // Both pattern and text are fully consumed.
    true
}
1218
1219#[cfg(test)]
1220mod tests {
1221 use super::*;
1222 use std::thread;
1223
1224 #[test]
1225 fn set_and_get() {
1226 let mut ks = Keyspace::new();
1227 ks.set("hello".into(), Bytes::from("world"), None);
1228 assert_eq!(
1229 ks.get("hello").unwrap(),
1230 Some(Value::String(Bytes::from("world")))
1231 );
1232 }
1233
1234 #[test]
1235 fn get_missing_key() {
1236 let mut ks = Keyspace::new();
1237 assert_eq!(ks.get("nope").unwrap(), None);
1238 }
1239
1240 #[test]
1241 fn overwrite_replaces_value() {
1242 let mut ks = Keyspace::new();
1243 ks.set("key".into(), Bytes::from("first"), None);
1244 ks.set("key".into(), Bytes::from("second"), None);
1245 assert_eq!(
1246 ks.get("key").unwrap(),
1247 Some(Value::String(Bytes::from("second")))
1248 );
1249 }
1250
1251 #[test]
1252 fn overwrite_clears_old_ttl() {
1253 let mut ks = Keyspace::new();
1254 ks.set(
1255 "key".into(),
1256 Bytes::from("v1"),
1257 Some(Duration::from_secs(100)),
1258 );
1259 ks.set("key".into(), Bytes::from("v2"), None);
1261 assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
1262 }
1263
1264 #[test]
1265 fn del_existing() {
1266 let mut ks = Keyspace::new();
1267 ks.set("key".into(), Bytes::from("val"), None);
1268 assert!(ks.del("key"));
1269 assert_eq!(ks.get("key").unwrap(), None);
1270 }
1271
1272 #[test]
1273 fn del_missing() {
1274 let mut ks = Keyspace::new();
1275 assert!(!ks.del("nope"));
1276 }
1277
1278 #[test]
1279 fn exists_present_and_absent() {
1280 let mut ks = Keyspace::new();
1281 ks.set("yes".into(), Bytes::from("here"), None);
1282 assert!(ks.exists("yes"));
1283 assert!(!ks.exists("no"));
1284 }
1285
1286 #[test]
1287 fn expired_key_returns_none() {
1288 let mut ks = Keyspace::new();
1289 ks.set(
1290 "temp".into(),
1291 Bytes::from("gone"),
1292 Some(Duration::from_millis(10)),
1293 );
1294 thread::sleep(Duration::from_millis(30));
1296 assert_eq!(ks.get("temp").unwrap(), None);
1297 assert!(!ks.exists("temp"));
1299 }
1300
1301 #[test]
1302 fn ttl_no_expiry() {
1303 let mut ks = Keyspace::new();
1304 ks.set("key".into(), Bytes::from("val"), None);
1305 assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
1306 }
1307
1308 #[test]
1309 fn ttl_not_found() {
1310 let mut ks = Keyspace::new();
1311 assert_eq!(ks.ttl("missing"), TtlResult::NotFound);
1312 }
1313
1314 #[test]
1315 fn ttl_with_expiry() {
1316 let mut ks = Keyspace::new();
1317 ks.set(
1318 "key".into(),
1319 Bytes::from("val"),
1320 Some(Duration::from_secs(100)),
1321 );
1322 match ks.ttl("key") {
1323 TtlResult::Seconds(s) => assert!(s >= 98 && s <= 100),
1324 other => panic!("expected Seconds, got {other:?}"),
1325 }
1326 }
1327
1328 #[test]
1329 fn ttl_expired_key() {
1330 let mut ks = Keyspace::new();
1331 ks.set(
1332 "temp".into(),
1333 Bytes::from("val"),
1334 Some(Duration::from_millis(10)),
1335 );
1336 thread::sleep(Duration::from_millis(30));
1337 assert_eq!(ks.ttl("temp"), TtlResult::NotFound);
1338 }
1339
1340 #[test]
1341 fn expire_existing_key() {
1342 let mut ks = Keyspace::new();
1343 ks.set("key".into(), Bytes::from("val"), None);
1344 assert!(ks.expire("key", 60));
1345 match ks.ttl("key") {
1346 TtlResult::Seconds(s) => assert!(s >= 58 && s <= 60),
1347 other => panic!("expected Seconds, got {other:?}"),
1348 }
1349 }
1350
1351 #[test]
1352 fn expire_missing_key() {
1353 let mut ks = Keyspace::new();
1354 assert!(!ks.expire("nope", 60));
1355 }
1356
1357 #[test]
1358 fn del_expired_key_returns_false() {
1359 let mut ks = Keyspace::new();
1360 ks.set(
1361 "temp".into(),
1362 Bytes::from("val"),
1363 Some(Duration::from_millis(10)),
1364 );
1365 thread::sleep(Duration::from_millis(30));
1366 assert!(!ks.del("temp"));
1368 }
1369
1370 #[test]
1373 fn memory_increases_on_set() {
1374 let mut ks = Keyspace::new();
1375 assert_eq!(ks.stats().used_bytes, 0);
1376 ks.set("key".into(), Bytes::from("value"), None);
1377 assert!(ks.stats().used_bytes > 0);
1378 assert_eq!(ks.stats().key_count, 1);
1379 }
1380
1381 #[test]
1382 fn memory_decreases_on_del() {
1383 let mut ks = Keyspace::new();
1384 ks.set("key".into(), Bytes::from("value"), None);
1385 let after_set = ks.stats().used_bytes;
1386 ks.del("key");
1387 assert_eq!(ks.stats().used_bytes, 0);
1388 assert!(after_set > 0);
1389 }
1390
1391 #[test]
1392 fn memory_adjusts_on_overwrite() {
1393 let mut ks = Keyspace::new();
1394 ks.set("key".into(), Bytes::from("short"), None);
1395 let small = ks.stats().used_bytes;
1396
1397 ks.set("key".into(), Bytes::from("a much longer value"), None);
1398 let large = ks.stats().used_bytes;
1399
1400 assert!(large > small);
1401 assert_eq!(ks.stats().key_count, 1);
1402 }
1403
1404 #[test]
1405 fn memory_decreases_on_expired_removal() {
1406 let mut ks = Keyspace::new();
1407 ks.set(
1408 "temp".into(),
1409 Bytes::from("data"),
1410 Some(Duration::from_millis(10)),
1411 );
1412 assert!(ks.stats().used_bytes > 0);
1413 thread::sleep(Duration::from_millis(30));
1414 let _ = ks.get("temp");
1416 assert_eq!(ks.stats().used_bytes, 0);
1417 assert_eq!(ks.stats().key_count, 0);
1418 }
1419
1420 #[test]
1421 fn stats_tracks_expiry_count() {
1422 let mut ks = Keyspace::new();
1423 ks.set("a".into(), Bytes::from("1"), None);
1424 ks.set("b".into(), Bytes::from("2"), Some(Duration::from_secs(100)));
1425 ks.set("c".into(), Bytes::from("3"), Some(Duration::from_secs(200)));
1426
1427 let stats = ks.stats();
1428 assert_eq!(stats.key_count, 3);
1429 assert_eq!(stats.keys_with_expiry, 2);
1430 }
1431
1432 #[test]
1435 fn noeviction_returns_oom_when_full() {
1436 let config = ShardConfig {
1439 max_memory: Some(150),
1440 eviction_policy: EvictionPolicy::NoEviction,
1441 ..ShardConfig::default()
1442 };
1443 let mut ks = Keyspace::with_config(config);
1444
1445 assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
1447
1448 let result = ks.set("b".into(), Bytes::from("val"), None);
1450 assert_eq!(result, SetResult::OutOfMemory);
1451
1452 assert!(ks.exists("a"));
1454 }
1455
1456 #[test]
1457 fn lru_eviction_makes_room() {
1458 let config = ShardConfig {
1459 max_memory: Some(150),
1460 eviction_policy: EvictionPolicy::AllKeysLru,
1461 ..ShardConfig::default()
1462 };
1463 let mut ks = Keyspace::with_config(config);
1464
1465 assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
1466
1467 assert_eq!(ks.set("b".into(), Bytes::from("val"), None), SetResult::Ok);
1469
1470 assert!(!ks.exists("a"));
1472 assert!(ks.exists("b"));
1473 }
1474
1475 #[test]
1476 fn overwrite_same_size_succeeds_at_limit() {
1477 let config = ShardConfig {
1478 max_memory: Some(150),
1479 eviction_policy: EvictionPolicy::NoEviction,
1480 ..ShardConfig::default()
1481 };
1482 let mut ks = Keyspace::with_config(config);
1483
1484 assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
1485
1486 assert_eq!(ks.set("a".into(), Bytes::from("new"), None), SetResult::Ok);
1488 assert_eq!(
1489 ks.get("a").unwrap(),
1490 Some(Value::String(Bytes::from("new")))
1491 );
1492 }
1493
1494 #[test]
1495 fn overwrite_larger_value_respects_limit() {
1496 let config = ShardConfig {
1497 max_memory: Some(150),
1498 eviction_policy: EvictionPolicy::NoEviction,
1499 ..ShardConfig::default()
1500 };
1501 let mut ks = Keyspace::with_config(config);
1502
1503 assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);
1504
1505 let big_value = "x".repeat(200);
1507 let result = ks.set("a".into(), Bytes::from(big_value), None);
1508 assert_eq!(result, SetResult::OutOfMemory);
1509
1510 assert_eq!(
1512 ks.get("a").unwrap(),
1513 Some(Value::String(Bytes::from("val")))
1514 );
1515 }
1516
1517 #[test]
1520 fn iter_entries_returns_live_entries() {
1521 let mut ks = Keyspace::new();
1522 ks.set("a".into(), Bytes::from("1"), None);
1523 ks.set("b".into(), Bytes::from("2"), Some(Duration::from_secs(100)));
1524
1525 let entries: Vec<_> = ks.iter_entries().collect();
1526 assert_eq!(entries.len(), 2);
1527 }
1528
/// After the 1 ms TTL of "dead" has elapsed, `iter_entries` only yields "alive".
#[test]
fn iter_entries_skips_expired() {
    let mut store = Keyspace::new();
    let short_ttl = Duration::from_millis(1);
    store.set("dead".into(), Bytes::from("gone"), Some(short_ttl));
    store.set("alive".into(), Bytes::from("here"), None);

    // Sleep longer than the TTL so the first key is expired by now.
    thread::sleep(Duration::from_millis(10));

    let survivors = store.iter_entries().collect::<Vec<_>>();
    assert_eq!(survivors.len(), 1);
    assert_eq!(survivors[0].0, "alive");
}
1544
/// For a key stored without a TTL, the third tuple field yielded by
/// `iter_entries` is `-1`.
#[test]
fn iter_entries_ttl_for_no_expiry() {
    let mut store = Keyspace::new();
    store.set("permanent".into(), Bytes::from("val"), None);

    let snapshot = store.iter_entries().collect::<Vec<_>>();
    let first = &snapshot[0];
    assert_eq!(first.2, -1);
}
1553
/// `restore` inserts a readable entry and bumps `key_count` to 1.
#[test]
fn restore_adds_entry() {
    let mut store = Keyspace::new();
    let payload = Value::String(Bytes::from("data"));
    store.restore("restored".into(), payload, None);

    let fetched = store.get("restored").unwrap();
    assert_eq!(fetched, Some(Value::String(Bytes::from("data"))));
    assert_eq!(store.stats().key_count, 1);
}
1566
/// `restore` with a deadline one second in the past leaves the keyspace empty.
#[test]
fn restore_skips_past_deadline() {
    let mut store = Keyspace::new();
    let one_second_ago = Instant::now() - Duration::from_secs(1);
    let stale = Value::String(Bytes::from("old"));

    store.restore("expired".into(), stale, Some(one_second_ago));

    assert!(store.is_empty());
}
1579
/// `restore` onto an existing key replaces its value without adding a key.
#[test]
fn restore_overwrites_existing() {
    let mut store = Keyspace::new();
    store.set("key".into(), Bytes::from("old"), None);

    let replacement = Value::String(Bytes::from("new"));
    store.restore("key".into(), replacement, None);

    let fetched = store.get("key").unwrap();
    assert_eq!(fetched, Some(Value::String(Bytes::from("new"))));
    assert_eq!(store.stats().key_count, 1);
}
1591
/// `restore` stores a 200-character value even though `max_memory` is 50
/// and eviction is disabled.
#[test]
fn restore_bypasses_memory_limit() {
    let tight_limits = ShardConfig {
        max_memory: Some(50),
        eviction_policy: EvictionPolicy::NoEviction,
        ..ShardConfig::default()
    };
    let mut store = Keyspace::with_config(tight_limits);

    let oversized = Value::String(Bytes::from("x".repeat(200)));
    store.restore("big".into(), oversized, None);

    assert_eq!(store.stats().key_count, 1);
}
1609
/// A default keyspace accepts 100 consecutive `set` calls on distinct keys.
#[test]
fn no_limit_never_rejects() {
    let mut store = Keyspace::new();

    (0..100).for_each(|i| {
        let outcome = store.set(format!("key:{i}"), Bytes::from("value"), None);
        assert_eq!(outcome, SetResult::Ok);
    });

    assert_eq!(store.len(), 100);
}
1622
/// `lpush` of "a" then "b" on a missing key reports length 2 and the list
/// reads back as ["b", "a"].
#[test]
fn lpush_creates_list() {
    let mut store = Keyspace::new();
    let pushed = [Bytes::from("a"), Bytes::from("b")];

    let new_len = store.lpush("list", &pushed).unwrap();
    assert_eq!(new_len, 2);

    let contents = store.lrange("list", 0, -1).unwrap();
    assert_eq!(contents, vec![Bytes::from("b"), Bytes::from("a")]);
}
1636
/// `rpush` of "a" then "b" on a missing key reports length 2 and the list
/// reads back as ["a", "b"].
#[test]
fn rpush_creates_list() {
    let mut store = Keyspace::new();
    let pushed = [Bytes::from("a"), Bytes::from("b")];

    let new_len = store.rpush("list", &pushed).unwrap();
    assert_eq!(new_len, 2);

    let contents = store.lrange("list", 0, -1).unwrap();
    assert_eq!(contents, vec![Bytes::from("a"), Bytes::from("b")]);
}
1647
/// A second `rpush` to the same key returns the combined length.
#[test]
fn push_to_existing_list() {
    let mut store = Keyspace::new();
    store.rpush("list", &[Bytes::from("a")]).unwrap();

    let total = store.rpush("list", &[Bytes::from("b")]).unwrap();

    assert_eq!(total, 2);
}
1655
/// `lpop` returns "a", then "b", then `None` once the list is drained.
#[test]
fn lpop_returns_front() {
    let mut store = Keyspace::new();
    let seed = [Bytes::from("a"), Bytes::from("b")];
    store.rpush("list", &seed).unwrap();

    for expected in [Some(Bytes::from("a")), Some(Bytes::from("b")), None] {
        assert_eq!(store.lpop("list").unwrap(), expected);
    }
}
1665
/// `rpop` returns the most recently right-pushed element.
#[test]
fn rpop_returns_back() {
    let mut store = Keyspace::new();
    let seed = [Bytes::from("a"), Bytes::from("b")];
    store.rpush("list", &seed).unwrap();

    let popped = store.rpop("list").unwrap();
    assert_eq!(popped, Some(Bytes::from("b")));
}
1673
/// Both pop directions return `Ok(None)` for a key that does not exist.
#[test]
fn pop_from_missing_key() {
    let mut store = Keyspace::new();

    let from_front = store.lpop("nope").unwrap();
    assert_eq!(from_front, None);

    let from_back = store.rpop("nope").unwrap();
    assert_eq!(from_back, None);
}
1680
/// Popping the only element removes the key and resets the key and byte
/// counters to zero.
#[test]
fn empty_list_auto_deletes_key() {
    let mut store = Keyspace::new();
    store.rpush("list", &[Bytes::from("only")]).unwrap();

    store.lpop("list").unwrap();

    let still_there = store.exists("list");
    assert!(!still_there);
    assert_eq!(store.stats().key_count, 0);
    assert_eq!(store.stats().used_bytes, 0);
}
1690
/// `lrange(-2, -1)` on ["a", "b", "c"] returns the last two elements.
#[test]
fn lrange_negative_indices() {
    let mut store = Keyspace::new();
    let seed = [Bytes::from("a"), Bytes::from("b"), Bytes::from("c")];
    store.rpush("list", &seed).unwrap();

    let tail = store.lrange("list", -2, -1).unwrap();

    assert_eq!(tail, vec![Bytes::from("b"), Bytes::from("c")]);
}
1703
/// `lrange(-100, 100)` on a two-element list returns the whole list.
#[test]
fn lrange_out_of_bounds_clamps() {
    let mut store = Keyspace::new();
    let seed = [Bytes::from("a"), Bytes::from("b")];
    store.rpush("list", &seed).unwrap();

    let clamped = store.lrange("list", -100, 100).unwrap();

    assert_eq!(clamped, vec![Bytes::from("a"), Bytes::from("b")]);
}
1712
/// `lrange` on a missing key yields an empty vector rather than an error.
#[test]
fn lrange_missing_key_returns_empty() {
    let mut store = Keyspace::new();
    let nothing: Vec<Bytes> = Vec::new();

    let items = store.lrange("nope", 0, -1).unwrap();

    assert_eq!(items, nothing);
}
1718
/// `llen` is 0 for a missing key and 2 after two elements are pushed.
#[test]
fn llen_returns_length() {
    let mut store = Keyspace::new();

    let before = store.llen("nope").unwrap();
    assert_eq!(before, 0);

    let seed = [Bytes::from("a"), Bytes::from("b")];
    store.rpush("list", &seed).unwrap();

    let after = store.llen("list").unwrap();
    assert_eq!(after, 2);
}
1727
/// `used_bytes` grows with each push and shrinks after a pop.
#[test]
fn list_memory_tracked_on_push_pop() {
    let mut store = Keyspace::new();

    store.rpush("list", &[Bytes::from("hello")]).unwrap();
    let one_item = store.stats().used_bytes;
    assert!(one_item > 0);

    store.rpush("list", &[Bytes::from("world")]).unwrap();
    let two_items = store.stats().used_bytes;
    assert!(two_items > one_item);

    store.lpop("list").unwrap();
    let after_pop = store.stats().used_bytes;
    assert!(after_pop < two_items);
}
1743
/// `lpush` against a key that already holds a string value must fail with
/// `WriteError::WrongType`, not merely with some error.
#[test]
fn lpush_on_string_key_returns_wrongtype() {
    let mut ks = Keyspace::new();
    ks.set("s".into(), Bytes::from("val"), None);

    // Pin the exact variant: a bare `is_err()` would also accept
    // `WriteError::OutOfMemory`, which is not what this test is about.
    assert_eq!(
        ks.lpush("s", &[Bytes::from("nope")]),
        Err(WriteError::WrongType)
    );
}
1750
/// `lrange` on a key holding a string value returns an error.
#[test]
fn lrange_on_string_key_returns_wrongtype() {
    let mut store = Keyspace::new();
    store.set("s".into(), Bytes::from("val"), None);

    let outcome = store.lrange("s", 0, -1);

    assert!(outcome.is_err());
}
1757
/// `llen` on a key holding a string value returns an error.
#[test]
fn llen_on_string_key_returns_wrongtype() {
    let mut store = Keyspace::new();
    store.set("s".into(), Bytes::from("val"), None);

    let outcome = store.llen("s");

    assert!(outcome.is_err());
}
1764
/// `get` on a key that was restored as a list returns an error.
#[test]
fn get_on_list_key_returns_wrongtype() {
    let mut store = Keyspace::new();
    let single_item = std::collections::VecDeque::from(vec![Bytes::from("item")]);
    store.restore("mylist".into(), Value::List(single_item), None);

    let outcome = store.get("mylist");

    assert!(outcome.is_err());
}
1776
/// `value_type` reports "none", "string", "list" and "zset" for a missing
/// key, a string, a restored list and a sorted set respectively.
#[test]
fn value_type_returns_correct_types() {
    let mut store = Keyspace::new();

    // Nothing stored yet.
    assert_eq!(store.value_type("missing"), "none");

    // Plain string.
    store.set("s".into(), Bytes::from("val"), None);
    assert_eq!(store.value_type("s"), "string");

    // List injected through `restore`.
    let single_item = std::collections::VecDeque::from(vec![Bytes::from("item")]);
    store.restore("l".into(), Value::List(single_item), None);
    assert_eq!(store.value_type("l"), "list");

    // Sorted set created by `zadd`.
    let flags = ZAddFlags::default();
    store.zadd("z", &[(1.0, "a".into())], &flags).unwrap();
    assert_eq!(store.value_type("z"), "zset");
}
1794
/// `zadd` with two new members on a missing key counts and applies both,
/// and the key's type becomes "zset".
#[test]
fn zadd_creates_sorted_set() {
    let mut store = Keyspace::new();
    let flags = ZAddFlags::default();

    let outcome = store
        .zadd(
            "board",
            &[(100.0, "alice".into()), (200.0, "bob".into())],
            &flags,
        )
        .unwrap();

    assert_eq!(outcome.count, 2);
    assert_eq!(outcome.applied.len(), 2);
    assert_eq!(store.value_type("board"), "zset");
}
1811
/// Re-adding an existing member with default flags changes its score,
/// appears in `applied`, but does not increase `count`.
#[test]
fn zadd_updates_existing_score() {
    let mut store = Keyspace::new();
    let flags = ZAddFlags::default();
    store.zadd("z", &[(100.0, "alice".into())], &flags).unwrap();

    let update = store.zadd("z", &[(200.0, "alice".into())], &flags).unwrap();

    assert_eq!(update.count, 0);
    assert_eq!(update.applied.len(), 1);
    let score = store.zscore("z", "alice").unwrap();
    assert_eq!(score, Some(200.0));
}
1826
/// With `ch` set, `count` covers the updated member as well as the new one.
#[test]
fn zadd_ch_flag_counts_changes() {
    let mut store = Keyspace::new();
    store
        .zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
        .unwrap();

    let count_changes = ZAddFlags {
        ch: true,
        ..ZAddFlags::default()
    };
    let outcome = store
        .zadd(
            "z",
            &[(200.0, "alice".into()), (50.0, "bob".into())],
            &count_changes,
        )
        .unwrap();

    assert_eq!(outcome.count, 2);
    assert_eq!(outcome.applied.len(), 2);
}
1847
/// With `nx` set, an existing member is left untouched and nothing is applied.
#[test]
fn zadd_nx_skips_existing() {
    let mut store = Keyspace::new();
    store
        .zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
        .unwrap();

    let only_new = ZAddFlags {
        nx: true,
        ..ZAddFlags::default()
    };
    let outcome = store
        .zadd("z", &[(999.0, "alice".into())], &only_new)
        .unwrap();

    assert_eq!(outcome.count, 0);
    assert!(outcome.applied.is_empty());
    let score = store.zscore("z", "alice").unwrap();
    assert_eq!(score, Some(100.0));
}
1862
/// With `xx` set, adding to a missing key applies nothing and creates no key.
#[test]
fn zadd_xx_skips_new() {
    let mut store = Keyspace::new();
    let only_existing = ZAddFlags {
        xx: true,
        ..ZAddFlags::default()
    };

    let outcome = store
        .zadd("z", &[(100.0, "alice".into())], &only_existing)
        .unwrap();

    assert_eq!(outcome.count, 0);
    assert!(outcome.applied.is_empty());
    assert_eq!(store.value_type("z"), "none");
}
1876
/// With `gt` set, a lower score is ignored and a higher score is stored.
#[test]
fn zadd_gt_only_increases() {
    let mut store = Keyspace::new();
    store
        .zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
        .unwrap();

    let greater_only = ZAddFlags {
        gt: true,
        ..ZAddFlags::default()
    };

    // (attempted score, score expected afterwards)
    for (attempt, expected) in [(50.0, 100.0), (200.0, 200.0)] {
        store
            .zadd("z", &[(attempt, "alice".into())], &greater_only)
            .unwrap();
        assert_eq!(store.zscore("z", "alice").unwrap(), Some(expected));
    }
}
1891
/// With `lt` set, a higher score is ignored and a lower score is stored.
#[test]
fn zadd_lt_only_decreases() {
    let mut store = Keyspace::new();
    store
        .zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
        .unwrap();

    let lesser_only = ZAddFlags {
        lt: true,
        ..ZAddFlags::default()
    };

    // (attempted score, score expected afterwards)
    for (attempt, expected) in [(200.0, 100.0), (50.0, 50.0)] {
        store
            .zadd("z", &[(attempt, "alice".into())], &lesser_only)
            .unwrap();
        assert_eq!(store.zscore("z", "alice").unwrap(), Some(expected));
    }
}
1906
/// `zrem` removes the listed members that exist, ignores unknown ones, and
/// leaves the remaining member's score intact.
#[test]
fn zrem_removes_members() {
    let mut store = Keyspace::new();
    store
        .zadd(
            "z",
            &[(1.0, "a".into()), (2.0, "b".into()), (3.0, "c".into())],
            &ZAddFlags::default(),
        )
        .unwrap();

    let removed = store
        .zrem("z", &["a".into(), "c".into(), "nonexistent".into()])
        .unwrap();

    assert_eq!(removed.len(), 2);
    for gone in ["a", "c"] {
        assert!(removed.contains(&gone.to_owned()));
    }
    assert_eq!(store.zscore("z", "a").unwrap(), None);
    assert_eq!(store.zscore("z", "b").unwrap(), Some(2.0));
}
1925
/// Removing the last member of a sorted set deletes the key itself.
#[test]
fn zrem_auto_deletes_empty() {
    let mut store = Keyspace::new();
    let flags = ZAddFlags::default();
    store.zadd("z", &[(1.0, "only".into())], &flags).unwrap();

    store.zrem("z", &["only".into()]).unwrap();

    let still_there = store.exists("z");
    assert!(!still_there);
    assert_eq!(store.stats().key_count, 0);
}
1935
/// `zrem` on a missing key succeeds and reports nothing removed.
#[test]
fn zrem_missing_key() {
    let mut store = Keyspace::new();

    let removed = store.zrem("nope", &["a".into()]).unwrap();

    assert!(removed.is_empty());
}
1941
/// `zscore` returns the stored score for a member and `None` for an
/// unknown member of the same set.
#[test]
fn zscore_returns_score() {
    let mut store = Keyspace::new();
    let flags = ZAddFlags::default();
    store.zadd("z", &[(42.5, "member".into())], &flags).unwrap();

    let present = store.zscore("z", "member").unwrap();
    assert_eq!(present, Some(42.5));

    let absent = store.zscore("z", "missing").unwrap();
    assert_eq!(absent, None);
}
1950
/// `zscore` on a missing key returns `Ok(None)`.
#[test]
fn zscore_missing_key() {
    let mut store = Keyspace::new();

    let score = store.zscore("nope", "m").unwrap();

    assert_eq!(score, None);
}
1956
/// `zrank` orders members by ascending score regardless of insertion order
/// and returns `None` for an unknown member.
#[test]
fn zrank_returns_rank() {
    let mut store = Keyspace::new();
    store
        .zadd(
            "z",
            &[
                (300.0, "c".into()),
                (100.0, "a".into()),
                (200.0, "b".into()),
            ],
            &ZAddFlags::default(),
        )
        .unwrap();

    let expectations = [
        ("a", Some(0)),
        ("b", Some(1)),
        ("c", Some(2)),
        ("d", None),
    ];
    for (member, rank) in expectations {
        assert_eq!(store.zrank("z", member).unwrap(), rank);
    }
}
1975
/// `zrange` returns (member, score) pairs for a full range, a single index
/// and a negative-index range.
#[test]
fn zrange_returns_range() {
    let mut store = Keyspace::new();
    store
        .zadd(
            "z",
            &[(1.0, "a".into()), (2.0, "b".into()), (3.0, "c".into())],
            &ZAddFlags::default(),
        )
        .unwrap();

    // Whole set.
    let expected_all = vec![
        ("a".to_owned(), 1.0),
        ("b".to_owned(), 2.0),
        ("c".to_owned(), 3.0),
    ];
    let everything = store.zrange("z", 0, -1).unwrap();
    assert_eq!(everything, expected_all);

    // Single element in the middle.
    let expected_middle = vec![("b".to_owned(), 2.0)];
    let middle = store.zrange("z", 1, 1).unwrap();
    assert_eq!(middle, expected_middle);

    // Last two via negative indices.
    let expected_tail = vec![("b".to_owned(), 2.0), ("c".to_owned(), 3.0)];
    let tail = store.zrange("z", -2, -1).unwrap();
    assert_eq!(tail, expected_tail);
}
2002
/// `zrange` on a missing key succeeds with an empty result.
#[test]
fn zrange_missing_key() {
    let mut store = Keyspace::new();

    let members = store.zrange("nope", 0, -1).unwrap();

    assert!(members.is_empty());
}
2008
/// `zadd` against a key that already holds a string value must fail with
/// `WriteError::WrongType`, not merely with some error.
#[test]
fn zadd_on_string_key_returns_wrongtype() {
    let mut ks = Keyspace::new();
    ks.set("s".into(), Bytes::from("val"), None);

    let result = ks.zadd("s", &[(1.0, "m".into())], &ZAddFlags::default());

    // `ZAddResult` does not derive `PartialEq`, so match on the variant
    // instead of using `assert_eq!`. A bare `is_err()` would also accept
    // `WriteError::OutOfMemory`.
    assert!(matches!(result, Err(WriteError::WrongType)));
}
2017
/// `zrem` on a key holding a string value returns an error.
#[test]
fn zrem_on_string_key_returns_wrongtype() {
    let mut store = Keyspace::new();
    store.set("s".into(), Bytes::from("val"), None);

    let outcome = store.zrem("s", &["m".into()]);

    assert!(outcome.is_err());
}
2024
/// `zscore` on a key holding a list returns an error.
#[test]
fn zscore_on_list_key_returns_wrongtype() {
    let mut store = Keyspace::new();
    store.rpush("l", &[Bytes::from("item")]).unwrap();

    let outcome = store.zscore("l", "m");

    assert!(outcome.is_err());
}
2031
/// `zrank` on a key holding a string value returns an error.
#[test]
fn zrank_on_string_key_returns_wrongtype() {
    let mut store = Keyspace::new();
    store.set("s".into(), Bytes::from("val"), None);

    let outcome = store.zrank("s", "m");

    assert!(outcome.is_err());
}
2038
/// `zrange` on a key holding a string value returns an error.
#[test]
fn zrange_on_string_key_returns_wrongtype() {
    let mut store = Keyspace::new();
    store.set("s".into(), Bytes::from("val"), None);

    let outcome = store.zrange("s", 0, -1);

    assert!(outcome.is_err());
}
2045
/// `used_bytes` grows with each added member and shrinks after a removal.
#[test]
fn sorted_set_memory_tracked() {
    let mut store = Keyspace::new();
    let flags = ZAddFlags::default();
    let baseline = store.stats().used_bytes;

    store.zadd("z", &[(1.0, "alice".into())], &flags).unwrap();
    let one_member = store.stats().used_bytes;
    assert!(one_member > baseline);

    store.zadd("z", &[(2.0, "bob".into())], &flags).unwrap();
    let two_members = store.stats().used_bytes;
    assert!(two_members > one_member);

    store.zrem("z", &["alice".into()]).unwrap();
    let after_removal = store.stats().used_bytes;
    assert!(after_removal < two_members);
}
2064
/// `incr` on a missing key returns 1 and stores the string "1".
#[test]
fn incr_new_key_defaults_to_zero() {
    let mut store = Keyspace::new();

    let bumped = store.incr("counter").unwrap();
    assert_eq!(bumped, 1);

    let stored = store.get("counter").unwrap();
    match &stored {
        Some(Value::String(data)) => assert_eq!(*data, Bytes::from("1")),
        other => panic!("expected String(\"1\"), got {other:?}"),
    }
}
2075
/// `incr` on a key holding "10" returns 11.
#[test]
fn incr_existing_value() {
    let mut store = Keyspace::new();
    store.set("n".into(), Bytes::from("10"), None);

    let bumped = store.incr("n").unwrap();

    assert_eq!(bumped, 11);
}
2082
/// `decr` on a missing key returns -1.
#[test]
fn decr_new_key_defaults_to_zero() {
    let mut store = Keyspace::new();

    let lowered = store.decr("counter").unwrap();

    assert_eq!(lowered, -1);
}
2088
/// `decr` on a key holding "10" returns 9.
#[test]
fn decr_existing_value() {
    let mut store = Keyspace::new();
    store.set("n".into(), Bytes::from("10"), None);

    let lowered = store.decr("n").unwrap();

    assert_eq!(lowered, 9);
}
2095
/// `incr` on a non-numeric string fails with `IncrError::NotAnInteger`.
#[test]
fn incr_non_integer_returns_error() {
    let mut store = Keyspace::new();
    store.set("s".into(), Bytes::from("notanum"), None);

    let failure = store.incr("s").unwrap_err();

    assert_eq!(failure, IncrError::NotAnInteger);
}
2102
/// `incr` on a key holding a list fails with `IncrError::WrongType`.
#[test]
fn incr_on_list_returns_wrongtype() {
    let mut store = Keyspace::new();
    store.lpush("list", &[Bytes::from("a")]).unwrap();

    let failure = store.incr("list").unwrap_err();

    assert_eq!(failure, IncrError::WrongType);
}
2109
/// `incr` on a key holding `i64::MAX` fails with `IncrError::Overflow`.
#[test]
fn incr_overflow_returns_error() {
    let mut store = Keyspace::new();
    let ceiling = i64::MAX.to_string();
    store.set("max".into(), Bytes::from(ceiling), None);

    let failure = store.incr("max").unwrap_err();

    assert_eq!(failure, IncrError::Overflow);
}
2116
/// `decr` on a key holding `i64::MIN` fails with `IncrError::Overflow`.
#[test]
fn decr_overflow_returns_error() {
    let mut store = Keyspace::new();
    let floor = i64::MIN.to_string();
    store.set("min".into(), Bytes::from(floor), None);

    let failure = store.decr("min").unwrap_err();

    assert_eq!(failure, IncrError::Overflow);
}
2123
/// After `incr` on a key set with a 60 s TTL, `ttl` still reports between
/// 58 and 60 seconds.
#[test]
fn incr_preserves_ttl() {
    let mut store = Keyspace::new();
    let one_minute = Duration::from_secs(60);
    store.set("n".into(), Bytes::from("5"), Some(one_minute));

    store.incr("n").unwrap();

    match store.ttl("n") {
        TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
        other => panic!("expected TTL preserved, got {other:?}"),
    }
}
2134
/// `zrem` returns only the members that were really present.
#[test]
fn zrem_returns_actually_removed_members() {
    let mut store = Keyspace::new();
    store
        .zadd(
            "z",
            &[(1.0, "a".into()), (2.0, "b".into())],
            &ZAddFlags::default(),
        )
        .unwrap();

    let removed = store.zrem("z", &["a".into(), "ghost".into()]).unwrap();

    let expected = vec!["a".to_owned()];
    assert_eq!(removed, expected);
}
2148
/// `zcard` reports the number of members in a sorted set.
#[test]
fn zcard_returns_count() {
    let mut store = Keyspace::new();
    store
        .zadd(
            "z",
            &[(1.0, "a".into()), (2.0, "b".into())],
            &ZAddFlags::default(),
        )
        .unwrap();

    let cardinality = store.zcard("z").unwrap();

    assert_eq!(cardinality, 2);
}
2160
/// `zcard` on a missing key returns 0.
#[test]
fn zcard_missing_key_returns_zero() {
    let mut store = Keyspace::new();

    let cardinality = store.zcard("missing").unwrap();

    assert_eq!(cardinality, 0);
}
2166
/// `zcard` on a key holding a string value returns an error.
#[test]
fn zcard_on_string_key_returns_wrongtype() {
    let mut store = Keyspace::new();
    store.set("s".into(), Bytes::from("val"), None);

    let outcome = store.zcard("s");

    assert!(outcome.is_err());
}
2173
/// `persist` on a key with a TTL returns true, clears the TTL and drops the
/// `keys_with_expiry` counter to zero.
#[test]
fn persist_removes_expiry() {
    let mut store = Keyspace::new();
    let one_minute = Duration::from_secs(60);
    store.set("key".into(), Bytes::from("val"), Some(one_minute));

    let before = store.ttl("key");
    assert!(matches!(before, TtlResult::Seconds(_)));

    let changed = store.persist("key");
    assert!(changed);

    assert_eq!(store.ttl("key"), TtlResult::NoExpiry);
    assert_eq!(store.stats().keys_with_expiry, 0);
}
2188
/// `persist` returns false for a key that has no TTL.
#[test]
fn persist_returns_false_without_expiry() {
    let mut store = Keyspace::new();
    store.set("key".into(), Bytes::from("val"), None);

    let changed = store.persist("key");

    assert!(!changed);
}
2195
/// `persist` returns false for a key that does not exist.
#[test]
fn persist_returns_false_for_missing_key() {
    let mut store = Keyspace::new();

    let changed = store.persist("missing");

    assert!(!changed);
}
2201
/// `pttl` on a key set with a 60 s TTL reports a millisecond value in
/// (59000, 60000].
#[test]
fn pttl_returns_milliseconds() {
    let mut store = Keyspace::new();
    let one_minute = Duration::from_secs(60);
    store.set("key".into(), Bytes::from("val"), Some(one_minute));

    let remaining = store.pttl("key");

    match remaining {
        TtlResult::Milliseconds(ms) => assert!(59_000 < ms && ms <= 60_000),
        other => panic!("expected Milliseconds, got {other:?}"),
    }
}
2215
/// `pttl` on a key without a TTL reports `TtlResult::NoExpiry`.
#[test]
fn pttl_no_expiry() {
    let mut store = Keyspace::new();
    store.set("key".into(), Bytes::from("val"), None);

    let remaining = store.pttl("key");

    assert_eq!(remaining, TtlResult::NoExpiry);
}
2222
/// `pttl` on a missing key reports `TtlResult::NotFound`.
#[test]
fn pttl_not_found() {
    let mut store = Keyspace::new();

    let remaining = store.pttl("missing");

    assert_eq!(remaining, TtlResult::NotFound);
}
2228
/// `pexpire(5000)` on a key without TTL returns true, yields a `pttl` in
/// (4000, 5000] and registers one key with expiry.
#[test]
fn pexpire_sets_ttl_in_millis() {
    let mut store = Keyspace::new();
    store.set("key".into(), Bytes::from("val"), None);

    let applied = store.pexpire("key", 5000);
    assert!(applied);

    let remaining = store.pttl("key");
    match remaining {
        TtlResult::Milliseconds(ms) => assert!(4000 < ms && ms <= 5000),
        other => panic!("expected Milliseconds, got {other:?}"),
    }
    assert_eq!(store.stats().keys_with_expiry, 1);
}
2240
/// `pexpire` returns false for a key that does not exist.
#[test]
fn pexpire_missing_key_returns_false() {
    let mut store = Keyspace::new();

    let applied = store.pexpire("missing", 5000);

    assert!(!applied);
}
2246
/// `pexpire(500)` replaces an existing 60 s TTL; the expiry counter stays at 1.
#[test]
fn pexpire_overwrites_existing_ttl() {
    let mut store = Keyspace::new();
    let one_minute = Duration::from_secs(60);
    store.set("key".into(), Bytes::from("val"), Some(one_minute));

    let applied = store.pexpire("key", 500);
    assert!(applied);

    let remaining = store.pttl("key");
    match remaining {
        TtlResult::Milliseconds(ms) => assert!(ms <= 500),
        other => panic!("expected Milliseconds, got {other:?}"),
    }
    // Still exactly one key carrying an expiry, not two.
    assert_eq!(store.stats().keys_with_expiry, 1);
}
2263
/// With `max_memory` 150 and no eviction, an `lpush` after one `set` is
/// rejected with `OutOfMemory` and the earlier key is still present.
#[test]
fn lpush_rejects_when_memory_full() {
    let mut store = Keyspace::with_config(ShardConfig {
        max_memory: Some(150),
        eviction_policy: EvictionPolicy::NoEviction,
        ..ShardConfig::default()
    });

    let seeded = store.set("a".into(), Bytes::from("val"), None);
    assert_eq!(seeded, SetResult::Ok);

    let pushed = store.lpush("list", &[Bytes::from("big-value-here")]);
    assert_eq!(pushed, Err(WriteError::OutOfMemory));

    // The rejected write must not have displaced the existing key.
    let survivor_present = store.exists("a");
    assert!(survivor_present);
}
2285
/// With `max_memory` 150 and no eviction, an `rpush` after one `set` is
/// rejected with `OutOfMemory` and the earlier key is still present.
#[test]
fn rpush_rejects_when_memory_full() {
    let config = ShardConfig {
        max_memory: Some(150),
        eviction_policy: EvictionPolicy::NoEviction,
        ..ShardConfig::default()
    };
    let mut ks = Keyspace::with_config(config);

    assert_eq!(ks.set("a".into(), Bytes::from("val"), None), SetResult::Ok);

    let result = ks.rpush("list", &[Bytes::from("big-value-here")]);
    assert_eq!(result, Err(WriteError::OutOfMemory));

    // Same post-condition the `lpush` and `zadd` rejection tests check:
    // under `NoEviction` a rejected write must leave existing keys alone.
    assert!(ks.exists("a"));
}
2300
/// With `max_memory` 150 and no eviction, a `zadd` after one `set` is
/// rejected with `OutOfMemory` and the earlier key is still present.
#[test]
fn zadd_rejects_when_memory_full() {
    let mut store = Keyspace::with_config(ShardConfig {
        max_memory: Some(150),
        eviction_policy: EvictionPolicy::NoEviction,
        ..ShardConfig::default()
    });

    let seeded = store.set("a".into(), Bytes::from("val"), None);
    assert_eq!(seeded, SetResult::Ok);

    let flags = ZAddFlags::default();
    let added = store.zadd("z", &[(1.0, "member".into())], &flags);
    assert!(matches!(added, Err(WriteError::OutOfMemory)));

    // The rejected write must not have displaced the existing key.
    let survivor_present = store.exists("a");
    assert!(survivor_present);
}
2318
/// With `max_memory` 200 and `AllKeysLru`, an `lpush` after one `set`
/// succeeds and the earlier key no longer exists.
#[test]
fn lpush_evicts_under_lru_policy() {
    let mut store = Keyspace::with_config(ShardConfig {
        max_memory: Some(200),
        eviction_policy: EvictionPolicy::AllKeysLru,
        ..ShardConfig::default()
    });

    let seeded = store.set("a".into(), Bytes::from("val"), None);
    assert_eq!(seeded, SetResult::Ok);

    let pushed = store.lpush("list", &[Bytes::from("item")]);
    assert!(pushed.is_ok());

    let evicted_key_present = store.exists("a");
    assert!(!evicted_key_present);
}
2334
/// `clear` empties the keyspace and resets the byte and expiry counters.
#[test]
fn clear_removes_all_keys() {
    let mut store = Keyspace::new();
    let one_minute = Duration::from_secs(60);
    store.set("a".into(), Bytes::from("1"), None);
    store.set("b".into(), Bytes::from("2"), Some(one_minute));
    store.lpush("list", &[Bytes::from("x")]).unwrap();

    // Sanity-check the populated state first.
    assert_eq!(store.len(), 3);
    assert!(store.stats().used_bytes > 0);
    assert_eq!(store.stats().keys_with_expiry, 1);

    store.clear();

    // Everything is gone and every counter is back to zero.
    assert_eq!(store.len(), 0);
    assert!(store.is_empty());
    assert_eq!(store.stats().used_bytes, 0);
    assert_eq!(store.stats().keys_with_expiry, 0);
}
2353
/// `scan_keys(0, 10, None)` over three keys returns all three with cursor 0.
#[test]
fn scan_returns_keys() {
    let mut store = Keyspace::new();
    for (key, value) in [("key1", "a"), ("key2", "b"), ("key3", "c")] {
        store.set(key.into(), Bytes::from(value), None);
    }

    let (next_cursor, found) = store.scan_keys(0, 10, None);

    assert_eq!(next_cursor, 0);
    assert_eq!(found.len(), 3);
}
2367
/// `scan_keys` on an empty keyspace returns cursor 0 and no keys.
#[test]
fn scan_empty_keyspace() {
    let store = Keyspace::new();

    let (next_cursor, found) = store.scan_keys(0, 10, None);

    assert_eq!(next_cursor, 0);
    assert!(found.is_empty());
}
2375
/// `scan_keys` with the pattern "user:*" returns only the two "user:" keys.
#[test]
fn scan_with_pattern() {
    let mut store = Keyspace::new();
    store.set("user:1".into(), Bytes::from("a"), None);
    store.set("user:2".into(), Bytes::from("b"), None);
    store.set("item:1".into(), Bytes::from("c"), None);

    let (next_cursor, matched) = store.scan_keys(0, 10, Some("user:*"));

    assert_eq!(next_cursor, 0);
    assert_eq!(matched.len(), 2);
    assert!(matched.iter().all(|key| key.starts_with("user:")));
}
2390
/// `scan_keys` with a count of 3 over ten keys returns between one and
/// three keys; if the cursor is non-zero, the follow-up page is non-empty.
#[test]
fn scan_with_count_limit() {
    let mut store = Keyspace::new();
    (0..10).for_each(|i| {
        store.set(format!("k{i}"), Bytes::from("v"), None);
    });

    let (next_cursor, first_page) = store.scan_keys(0, 3, None);
    assert!(!first_page.is_empty());
    assert!(first_page.len() <= 3);

    // A non-zero cursor is fed back in to fetch the next page.
    if next_cursor != 0 {
        let (_following_cursor, second_page) = store.scan_keys(next_cursor, 3, None);
        assert!(!second_page.is_empty());
    }
}
2411
/// After the 1 ms TTL of "expired" has elapsed, `scan_keys` returns only "live".
#[test]
fn scan_skips_expired_keys() {
    let mut store = Keyspace::new();
    let short_ttl = Duration::from_millis(1);
    store.set("live".into(), Bytes::from("a"), None);
    store.set("expired".into(), Bytes::from("b"), Some(short_ttl));

    // Sleep longer than the TTL so the second key is expired by now.
    std::thread::sleep(Duration::from_millis(5));

    let (_cursor, found) = store.scan_keys(0, 10, None);
    assert_eq!(found.len(), 1);
    assert_eq!(found[0], "live");
}
2428
/// `*` matches any run of characters, including an empty one, but the
/// literal parts of the pattern must still match.
#[test]
fn glob_match_star() {
    // (pattern, input, expected result)
    let cases = [
        ("user:*", "user:123", true),
        ("user:*", "user:", true),
        ("*:data", "foo:data", true),
        ("user:*", "item:123", false),
    ];
    for (pattern, input, expected) in cases {
        assert_eq!(
            super::glob_match(pattern, input),
            expected,
            "pattern {pattern:?} against {input:?}"
        );
    }
}
2436
/// `?` matches exactly one character: not zero, not two.
#[test]
fn glob_match_question() {
    // (pattern, input, expected result)
    let cases = [
        ("key?", "key1", true),
        ("key?", "keya", true),
        ("key?", "key", false),
        ("key?", "key12", false),
    ];
    for (pattern, input, expected) in cases {
        assert_eq!(
            super::glob_match(pattern, input),
            expected,
            "pattern {pattern:?} against {input:?}"
        );
    }
}
2444
/// A bracket class matches any one of the listed characters and nothing else.
#[test]
fn glob_match_brackets() {
    // (pattern, input, expected result)
    let cases = [
        ("key[abc]", "keya", true),
        ("key[abc]", "keyb", true),
        ("key[abc]", "keyd", false),
    ];
    for (pattern, input, expected) in cases {
        assert_eq!(
            super::glob_match(pattern, input),
            expected,
            "pattern {pattern:?} against {input:?}"
        );
    }
}
2451
/// A pattern without wildcards matches only the identical string, not a
/// longer string that merely starts or ends with it.
#[test]
fn glob_match_literal() {
    // (pattern, input, expected result)
    let cases = [
        ("exact", "exact", true),
        ("exact", "exactnot", false),
        ("exact", "notexact", false),
    ];
    for (pattern, input, expected) in cases {
        assert_eq!(
            super::glob_match(pattern, input),
            expected,
            "pattern {pattern:?} against {input:?}"
        );
    }
}
2458}