1use std::collections::{hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque};
16use std::time::Duration;
17
18use ahash::AHashMap;
19
20use crate::command::{
21 command_docs_response, command_help_response, command_info_response, command_list_response,
22 supported_command_count, BitFieldEncoding, BitFieldOffset, BitFieldOperation, BitFieldOverflow,
23 BitOperation, Command, CommandResponse, GeoUnit,
24};
25use crate::types::{
26 CompactKey, KeyEntry, PendingEntry, StreamConsumerGroup, StreamEntry, StreamId, StreamLog,
27 Value,
28};
29
30mod hash_commands;
31mod list_commands;
32
33#[cfg(feature = "observability")]
34use kora_observability::stats::ShardStats;
35
36#[cfg(feature = "vector")]
37use kora_vector::distance::DistanceMetric;
38#[cfg(feature = "vector")]
39use kora_vector::hnsw::HnswIndex;
40
41pub struct ShardStore {
46 entries: AHashMap<CompactKey, KeyEntry>,
47 shard_id: u16,
48 max_memory: usize,
49 memory_used: usize,
50 eviction_counter: u64,
51 expire_scan_cursor: usize,
52 #[cfg(feature = "observability")]
53 stats: ShardStats,
54 #[cfg(feature = "observability")]
55 stats_enabled: bool,
56 #[cfg(feature = "vector")]
57 vector_indexes: AHashMap<CompactKey, HnswIndex>,
58 stream_groups: AHashMap<CompactKey, AHashMap<String, StreamConsumerGroup>>,
59}
60
61impl ShardStore {
62 pub fn new(shard_id: u16) -> Self {
64 Self {
65 entries: AHashMap::new(),
66 shard_id,
67 max_memory: 0,
68 memory_used: 0,
69 eviction_counter: 0,
70 expire_scan_cursor: 0,
71 #[cfg(feature = "observability")]
72 stats: ShardStats::new(),
73 #[cfg(feature = "observability")]
74 stats_enabled: false,
75 #[cfg(feature = "vector")]
76 vector_indexes: AHashMap::new(),
77 stream_groups: AHashMap::new(),
78 }
79 }
80
81 #[cfg(feature = "observability")]
83 pub fn stats(&self) -> &ShardStats {
84 &self.stats
85 }
86
87 #[cfg(feature = "observability")]
89 pub fn set_stats_enabled(&mut self, enabled: bool) {
90 self.stats_enabled = enabled;
91 }
92
93 pub fn shard_id(&self) -> u16 {
95 self.shard_id
96 }
97
98 pub fn len(&self) -> usize {
100 self.entries.len()
101 }
102
103 pub fn is_empty(&self) -> bool {
105 self.entries.is_empty()
106 }
107
108 pub fn set_max_memory(&mut self, bytes: usize) {
110 self.max_memory = bytes;
111 }
112
113 pub fn max_memory(&self) -> usize {
115 self.max_memory
116 }
117
118 pub fn entries_iter(&self) -> impl Iterator<Item = (&CompactKey, &KeyEntry)> {
120 self.entries.iter()
121 }
122
123 pub fn get_entry(&self, key: &CompactKey) -> Option<&KeyEntry> {
125 self.entries.get(key)
126 }
127
128 pub fn get_bytes(&mut self, key: &[u8]) -> CommandResponse {
130 self.cmd_get(key)
131 }
132
133 pub fn set_bytes(
135 &mut self,
136 key: &[u8],
137 value: &[u8],
138 ex: Option<u64>,
139 px: Option<u64>,
140 nx: bool,
141 xx: bool,
142 ) -> CommandResponse {
143 self.cmd_set(key, value, ex, px, nx, xx)
144 }
145
146 pub fn incr_by_bytes(&mut self, key: &[u8], delta: i64) -> CommandResponse {
148 self.cmd_incrby(key, delta)
149 }
150
151 pub fn get_entry_mut(&mut self, key: &CompactKey) -> Option<&mut KeyEntry> {
153 self.entries.get_mut(key)
154 }
155
156 pub fn insert_entry(&mut self, key: CompactKey, entry: KeyEntry) {
158 let size = Self::estimate_key_entry_size(key.as_bytes(), &entry.value);
159 self.memory_used += size;
160 self.entries.insert(key, entry);
161 }
162
163 pub fn mark_demoted(&mut self, key: &CompactKey, tier: u8, ref_hash: u64) {
165 if let Some(entry) = self.entries.get_mut(key) {
166 let old_size = entry.value.estimated_size();
167 entry.value = match tier {
168 crate::types::TIER_WARM => Value::WarmRef(ref_hash),
169 _ => Value::ColdRef(ref_hash),
170 };
171 let new_size = entry.value.estimated_size();
172 if old_size > new_size {
173 self.memory_used = self.memory_used.saturating_sub(old_size - new_size);
174 }
175 entry.set_tier(tier);
176 }
177 }
178
179 pub fn promote(&mut self, key: &CompactKey, value: Value) {
181 if let Some(entry) = self.entries.get_mut(key) {
182 let old_size = entry.value.estimated_size();
183 let new_size = value.estimated_size();
184 entry.value = value;
185 entry.set_tier(crate::types::TIER_HOT);
186 entry.lfu_counter = 5;
187 if new_size > old_size {
188 let delta = new_size - old_size;
189 self.memory_used += delta;
190 } else {
191 let delta = old_size - new_size;
192 self.memory_used = self.memory_used.saturating_sub(delta);
193 }
194 }
195 }
196
197 pub fn evict_expired(&mut self) -> usize {
199 let expired_keys: Vec<CompactKey> = self
200 .entries
201 .iter()
202 .filter(|(_, entry)| entry.is_expired())
203 .map(|(key, _)| key.clone())
204 .collect();
205 for key in &expired_keys {
206 let _ = self.remove_compact_entry(key);
207 }
208 expired_keys.len()
209 }
210
211 pub fn evict_expired_sample(&mut self, sample_size: usize) -> usize {
215 if sample_size == 0 || self.entries.is_empty() {
216 return 0;
217 }
218
219 let len = self.entries.len();
220 let start = self.expire_scan_cursor % len;
221 let mut examined = 0usize;
222 let mut expired_keys = Vec::new();
223
224 for (key, entry) in self.entries.iter().skip(start) {
225 if examined >= sample_size {
226 break;
227 }
228 if entry.is_expired() {
229 expired_keys.push(key.clone());
230 }
231 examined += 1;
232 }
233
234 if examined < sample_size {
235 for (key, entry) in self.entries.iter().take(start) {
236 if examined >= sample_size {
237 break;
238 }
239 if entry.is_expired() {
240 expired_keys.push(key.clone());
241 }
242 examined += 1;
243 }
244 }
245
246 for key in &expired_keys {
247 let _ = self.remove_compact_entry(key);
248 }
249 self.expire_scan_cursor = self.expire_scan_cursor.wrapping_add(examined);
250
251 expired_keys.len()
252 }
253
254 pub fn flush(&mut self) {
256 self.entries.clear();
257 self.memory_used = 0;
258 self.expire_scan_cursor = 0;
259 self.stream_groups.clear();
260 }
261
262 pub fn execute(&mut self, cmd: Command) -> CommandResponse {
264 #[cfg(feature = "observability")]
265 let track_stats = self.stats_enabled;
266 #[cfg(feature = "observability")]
267 let start = if track_stats {
268 Some(std::time::Instant::now())
269 } else {
270 None
271 };
272 #[cfg(feature = "observability")]
273 let cmd_type = if track_stats {
274 Some(cmd.cmd_type() as usize)
275 } else {
276 None
277 };
278
279 if cmd.is_mutation() {
280 self.maybe_evict();
281 }
282
283 let should_touch_lfu = !cmd.is_mutation() && self.max_memory > 0;
285 let cmd_key = if should_touch_lfu {
286 cmd.key().map(CompactKey::new)
287 } else {
288 None
289 };
290
291 let response = self.execute_inner(cmd);
292
293 if let Some(key) = cmd_key {
294 self.touch_key(&key);
295 }
296
297 #[cfg(feature = "observability")]
298 {
299 if let (Some(start), Some(cmd_type)) = (start, cmd_type) {
300 let duration_ns = start.elapsed().as_nanos() as u64;
301 self.stats.record_command(cmd_type, duration_ns);
302 self.stats.set_key_count(self.entries.len() as u64);
303 }
304 }
305
306 response
307 }
308
309 fn execute_inner(&mut self, cmd: Command) -> CommandResponse {
310 match cmd {
311 Command::Get { key } => self.cmd_get(&key),
312 Command::Set {
313 key,
314 value,
315 ex,
316 px,
317 nx,
318 xx,
319 } => self.cmd_set(&key, &value, ex, px, nx, xx),
320 Command::GetSet { key, value } => self.cmd_getset(&key, &value),
321 Command::Append { key, value } => self.cmd_append(&key, &value),
322 Command::Strlen { key } => self.cmd_strlen(&key),
323 Command::Incr { key } => self.cmd_incrby(&key, 1),
324 Command::Decr { key } => self.cmd_incrby(&key, -1),
325 Command::IncrBy { key, delta } => self.cmd_incrby(&key, delta),
326 Command::DecrBy { key, delta } => self.cmd_incrby(&key, -delta),
327 Command::SetNx { key, value } => {
328 match self.cmd_set(&key, &value, None, None, true, false) {
329 CommandResponse::Ok => CommandResponse::Integer(1),
330 CommandResponse::Nil => CommandResponse::Integer(0),
331 other => other,
332 }
333 }
334 Command::IncrByFloat { key, delta } => self.cmd_incrbyfloat(&key, delta),
335 Command::GetRange { key, start, end } => self.cmd_getrange(&key, start, end),
336 Command::SetRange { key, offset, value } => self.cmd_setrange(&key, offset, &value),
337 Command::GetDel { key } => self.cmd_getdel(&key),
338 Command::GetEx {
339 key,
340 ex,
341 px,
342 exat,
343 pxat,
344 persist,
345 } => self.cmd_getex(&key, ex, px, exat, pxat, persist),
346 Command::MSetNx { entries } => self.cmd_msetnx(&entries),
347
348 Command::Del { keys } => {
349 let count = keys.iter().filter(|k| self.del(k)).count();
350 CommandResponse::Integer(count as i64)
351 }
352 Command::Exists { keys } => {
353 let count = keys.iter().filter(|k| self.exists(k)).count();
354 CommandResponse::Integer(count as i64)
355 }
356 Command::Expire { key, seconds } => self.cmd_expire(&key, Duration::from_secs(seconds)),
357 Command::PExpire { key, millis } => {
358 self.cmd_expire(&key, Duration::from_millis(millis))
359 }
360 Command::Persist { key } => self.cmd_persist(&key),
361 Command::Ttl { key } => self.cmd_ttl(&key, false),
362 Command::PTtl { key } => self.cmd_ttl(&key, true),
363 Command::Type { key } => self.cmd_type(&key),
364 Command::Keys { pattern } => self.cmd_keys(&pattern),
365 Command::Scan {
366 cursor,
367 pattern,
368 count,
369 } => self.cmd_scan(cursor, pattern.as_deref(), count.unwrap_or(10)),
370 Command::ExpireAt { key, timestamp } => self.cmd_expireat(&key, timestamp, false),
371 Command::PExpireAt { key, timestamp_ms } => self.cmd_expireat(&key, timestamp_ms, true),
372 Command::Rename { key, newkey } => self.cmd_rename(&key, &newkey, false),
373 Command::RenameNx { key, newkey } => self.cmd_rename(&key, &newkey, true),
374 Command::Unlink { keys } => {
375 let count = keys.iter().filter(|k| self.del(k)).count();
376 CommandResponse::Integer(count as i64)
377 }
378 Command::Copy {
379 source,
380 destination,
381 replace,
382 } => self.cmd_copy(&source, &destination, replace),
383 Command::RandomKey => self.cmd_randomkey(),
384 Command::Touch { keys } => {
385 let count = keys.iter().filter(|k| self.exists(k)).count();
386 CommandResponse::Integer(count as i64)
387 }
388 Command::ObjectRefCount { ref key } => {
389 let compact = CompactKey::new(key);
390 match self.entries.get(&compact) {
391 Some(entry) if !entry.is_expired() => CommandResponse::Integer(1),
392 _ => CommandResponse::Nil,
393 }
394 }
395 Command::ObjectIdleTime { ref key } => {
396 let compact = CompactKey::new(key);
397 match self.entries.get(&compact) {
398 Some(entry) if !entry.is_expired() => CommandResponse::Integer(0),
399 _ => CommandResponse::Nil,
400 }
401 }
402 Command::ObjectHelp => CommandResponse::Array(vec![
403 CommandResponse::BulkString(b"OBJECT subcommand [arguments]".to_vec()),
404 CommandResponse::BulkString(
405 b"ENCODING <key> - Return the encoding of a key.".to_vec(),
406 ),
407 CommandResponse::BulkString(
408 b"FREQ <key> - Return the access frequency of a key.".to_vec(),
409 ),
410 CommandResponse::BulkString(b"HELP - Return this help message.".to_vec()),
411 CommandResponse::BulkString(
412 b"IDLETIME <key> - Return the idle time of a key.".to_vec(),
413 ),
414 CommandResponse::BulkString(
415 b"REFCOUNT <key> - Return the reference count of a key.".to_vec(),
416 ),
417 ]),
418 Command::DbSize => CommandResponse::Integer(self.entries.len() as i64),
419 Command::FlushDb | Command::FlushAll => {
420 self.flush();
421 CommandResponse::Ok
422 }
423
424 Command::LPush { key, values } => self.cmd_lpush(&key, &values),
425 Command::RPush { key, values } => self.cmd_rpush(&key, &values),
426 Command::LPop { key } => self.cmd_lpop(&key),
427 Command::RPop { key } => self.cmd_rpop(&key),
428 Command::LLen { key } => self.cmd_llen(&key),
429 Command::LRange { key, start, stop } => self.cmd_lrange(&key, start, stop),
430 Command::LIndex { key, index } => self.cmd_lindex(&key, index),
431 Command::LSet { key, index, value } => self.cmd_lset(&key, index, &value),
432 Command::LInsert {
433 key,
434 before,
435 pivot,
436 value,
437 } => self.cmd_linsert(&key, before, &pivot, &value),
438 Command::LRem { key, count, value } => self.cmd_lrem(&key, count, &value),
439 Command::LTrim { key, start, stop } => self.cmd_ltrim(&key, start, stop),
440 Command::LPos {
441 key,
442 value,
443 rank,
444 count,
445 maxlen,
446 } => self.cmd_lpos(&key, &value, rank, count, maxlen),
447 Command::RPopLPush {
448 source,
449 destination,
450 } => self.cmd_rpoplpush(&source, &destination),
451 Command::LMove {
452 source,
453 destination,
454 from_left,
455 to_left,
456 } => self.cmd_lmove(&source, &destination, from_left, to_left),
457
458 Command::HSet { key, fields } => self.cmd_hset(&key, &fields),
459 Command::HGet { key, field } => self.cmd_hget(&key, &field),
460 Command::HDel { key, fields } => self.cmd_hdel(&key, &fields),
461 Command::HGetAll { key } => self.cmd_hgetall(&key),
462 Command::HLen { key } => self.cmd_hlen(&key),
463 Command::HExists { key, field } => self.cmd_hexists(&key, &field),
464 Command::HIncrBy { key, field, delta } => self.cmd_hincrby(&key, &field, delta),
465 Command::HMGet { key, fields } => self.cmd_hmget(&key, &fields),
466 Command::HKeys { key } => self.cmd_hkeys(&key),
467 Command::HVals { key } => self.cmd_hvals(&key),
468 Command::HSetNx { key, field, value } => self.cmd_hsetnx(&key, &field, &value),
469 Command::HIncrByFloat { key, field, delta } => {
470 self.cmd_hincrbyfloat(&key, &field, delta)
471 }
472 Command::HRandField {
473 key,
474 count,
475 withvalues,
476 } => self.cmd_hrandfield(&key, count, withvalues),
477 Command::HScan {
478 key,
479 cursor,
480 pattern,
481 count,
482 } => self.cmd_hscan(&key, cursor, pattern.as_deref(), count.unwrap_or(10)),
483
484 Command::SAdd { key, members } => self.cmd_sadd(&key, &members),
485 Command::SRem { key, members } => self.cmd_srem(&key, &members),
486 Command::SMembers { key } => self.cmd_smembers(&key),
487 Command::SIsMember { key, member } => self.cmd_sismember(&key, &member),
488 Command::SCard { key } => self.cmd_scard(&key),
489 Command::SPop { key, count } => self.cmd_spop(&key, count),
490 Command::SRandMember { key, count } => self.cmd_srandmember(&key, count),
491 Command::SUnion { keys } => self.cmd_sunion(&keys),
492 Command::SUnionStore { destination, keys } => self.cmd_sunionstore(&destination, &keys),
493 Command::SInter { keys } => self.cmd_sinter(&keys),
494 Command::SInterStore { destination, keys } => self.cmd_sinterstore(&destination, &keys),
495 Command::SDiff { keys } => self.cmd_sdiff(&keys),
496 Command::SDiffStore { destination, keys } => self.cmd_sdiffstore(&destination, &keys),
497 Command::SInterCard {
498 numkeys: _,
499 keys,
500 limit,
501 } => self.cmd_sintercard(&keys, limit),
502 Command::SMove {
503 source,
504 destination,
505 member,
506 } => self.cmd_smove(&source, &destination, &member),
507 Command::SMisMember { key, members } => self.cmd_smismember(&key, &members),
508 Command::SScan {
509 key,
510 cursor,
511 pattern,
512 count,
513 } => self.cmd_sscan(&key, cursor, pattern.as_deref(), count.unwrap_or(10)),
514
515 Command::ZAdd { key, members } => self.cmd_zadd(&key, &members),
516 Command::ZRem { key, members } => self.cmd_zrem(&key, &members),
517 Command::ZScore { key, member } => self.cmd_zscore(&key, &member),
518 Command::ZRank { key, member } => self.cmd_zrank(&key, &member, false),
519 Command::ZRevRank { key, member } => self.cmd_zrank(&key, &member, true),
520 Command::ZCard { key } => self.cmd_zcard(&key),
521 Command::ZRange {
522 key,
523 start,
524 stop,
525 withscores,
526 } => self.cmd_zrange(&key, start, stop, withscores, false),
527 Command::ZRevRange {
528 key,
529 start,
530 stop,
531 withscores,
532 } => self.cmd_zrange(&key, start, stop, withscores, true),
533 Command::ZRangeByScore {
534 key,
535 min,
536 max,
537 withscores,
538 offset,
539 count,
540 } => self.cmd_zrangebyscore(&key, min, max, withscores, offset, count),
541 Command::ZIncrBy { key, delta, member } => self.cmd_zincrby(&key, delta, &member),
542 Command::ZCount { key, min, max } => self.cmd_zcount(&key, min, max),
543 Command::ZRevRangeByScore {
544 key,
545 max,
546 min,
547 withscores,
548 offset,
549 count,
550 } => self.cmd_zrevrangebyscore(&key, max, min, withscores, offset, count),
551 Command::ZPopMin { key, count } => self.cmd_zpopmin(&key, count),
552 Command::ZPopMax { key, count } => self.cmd_zpopmax(&key, count),
553 Command::ZRangeByLex {
554 key,
555 min,
556 max,
557 offset,
558 count,
559 } => self.cmd_zrangebylex(&key, &min, &max, offset, count, false),
560 Command::ZRevRangeByLex {
561 key,
562 max,
563 min,
564 offset,
565 count,
566 } => self.cmd_zrangebylex(&key, &min, &max, offset, count, true),
567 Command::ZLexCount { key, min, max } => self.cmd_zlexcount(&key, &min, &max),
568 Command::ZMScore { key, members } => self.cmd_zmscore(&key, &members),
569 Command::ZRandMember {
570 key,
571 count,
572 withscores,
573 } => self.cmd_zrandmember(&key, count, withscores),
574 Command::ZScan {
575 key,
576 cursor,
577 pattern,
578 count,
579 } => self.cmd_zscan(&key, cursor, pattern.as_deref(), count.unwrap_or(10)),
580
581 Command::XAdd {
582 key,
583 id,
584 fields,
585 maxlen,
586 } => self.cmd_xadd(&key, &id, &fields, maxlen),
587 Command::XLen { key } => self.cmd_xlen(&key),
588 Command::XRange {
589 key,
590 start,
591 end,
592 count,
593 } => self.cmd_xrange(&key, &start, &end, count),
594 Command::XRevRange {
595 key,
596 start,
597 end,
598 count,
599 } => self.cmd_xrevrange(&key, &start, &end, count),
600 Command::XRead { keys, ids, count } => self.cmd_xread(&keys, &ids, count),
601 Command::XTrim { key, maxlen } => self.cmd_xtrim(&key, maxlen),
602 Command::XDel { key, ids } => self.cmd_xdel(&key, &ids),
603 Command::XGroupCreate {
604 key,
605 group,
606 id,
607 mkstream,
608 } => self.cmd_xgroup_create(&key, &group, &id, mkstream),
609 Command::XGroupDestroy { key, group } => self.cmd_xgroup_destroy(&key, &group),
610 Command::XGroupDelConsumer {
611 key,
612 group,
613 consumer,
614 } => self.cmd_xgroup_delconsumer(&key, &group, &consumer),
615 Command::XReadGroup {
616 group,
617 consumer,
618 count,
619 keys,
620 ids,
621 } => self.cmd_xreadgroup(&group, &consumer, count, &keys, &ids),
622 Command::XAck { key, group, ids } => self.cmd_xack(&key, &group, &ids),
623 Command::XPending {
624 key,
625 group,
626 start,
627 end,
628 count,
629 } => self.cmd_xpending(&key, &group, start.as_deref(), end.as_deref(), count),
630 Command::XClaim {
631 key,
632 group,
633 consumer,
634 min_idle_time,
635 ids,
636 } => self.cmd_xclaim(&key, &group, &consumer, min_idle_time, &ids),
637 Command::XAutoClaim {
638 key,
639 group,
640 consumer,
641 min_idle_time,
642 start,
643 count,
644 } => self.cmd_xautoclaim(&key, &group, &consumer, min_idle_time, &start, count),
645 Command::XInfoStream { key } => self.cmd_xinfo_stream(&key),
646 Command::XInfoGroups { key } => self.cmd_xinfo_groups(&key),
647
648 Command::BLPop { keys, .. } => {
649 for key in &keys {
650 let resp = self.cmd_lpop(key);
651 if !matches!(resp, CommandResponse::Nil) {
652 return CommandResponse::Array(vec![
653 CommandResponse::BulkString(key.clone()),
654 resp,
655 ]);
656 }
657 }
658 CommandResponse::Nil
659 }
660 Command::BRPop { keys, .. } => {
661 for key in &keys {
662 let resp = self.cmd_rpop(key);
663 if !matches!(resp, CommandResponse::Nil) {
664 return CommandResponse::Array(vec![
665 CommandResponse::BulkString(key.clone()),
666 resp,
667 ]);
668 }
669 }
670 CommandResponse::Nil
671 }
672 Command::BLMove {
673 source,
674 destination,
675 from_left,
676 to_left,
677 ..
678 } => self.cmd_lmove(&source, &destination, from_left, to_left),
679 Command::BZPopMin { keys, .. } => {
680 for key in &keys {
681 let resp = self.cmd_zpopmin(key, Some(1));
682 if let CommandResponse::Array(ref items) = resp {
683 if !items.is_empty() {
684 return CommandResponse::Array(vec![
685 CommandResponse::BulkString(key.clone()),
686 resp,
687 ]);
688 }
689 }
690 }
691 CommandResponse::Nil
692 }
693 Command::BZPopMax { keys, .. } => {
694 for key in &keys {
695 let resp = self.cmd_zpopmax(key, Some(1));
696 if let CommandResponse::Array(ref items) = resp {
697 if !items.is_empty() {
698 return CommandResponse::Array(vec![
699 CommandResponse::BulkString(key.clone()),
700 resp,
701 ]);
702 }
703 }
704 }
705 CommandResponse::Nil
706 }
707
708 Command::Ping { message } => match message {
709 Some(msg) => CommandResponse::BulkString(msg),
710 None => CommandResponse::SimpleString("PONG".to_string()),
711 },
712 Command::Echo { message } => CommandResponse::BulkString(message),
713 Command::Info { .. } => CommandResponse::BulkString(
714 format!(
715 "# Server\r\nkora_version:0.1.0\r\n# Keyspace\r\ndb0:keys={}\r\n",
716 self.entries.len()
717 )
718 .into_bytes(),
719 ),
720 Command::CommandInfo { names } => command_info_response(&names),
721
722 Command::Dump => {
723 let entries: Vec<CommandResponse> = self
724 .entries
725 .iter()
726 .filter(|(_, entry)| !entry.is_expired())
727 .flat_map(|(key, entry)| {
728 vec![
729 CommandResponse::BulkString(key.as_bytes().to_vec()),
730 CommandResponse::BulkString(entry.value.to_bytes()),
731 ]
732 })
733 .collect();
734 CommandResponse::Array(entries)
735 }
736
737 Command::MGet { keys } => {
738 let results: Vec<CommandResponse> = keys.iter().map(|k| self.cmd_get(k)).collect();
739 CommandResponse::Array(results)
740 }
741 Command::MSet { entries } => {
742 for (k, v) in &entries {
743 self.cmd_set(k, v, None, None, false, false);
744 }
745 CommandResponse::Ok
746 }
747
748 #[cfg(feature = "vector")]
749 Command::VecSet {
750 key,
751 dimensions,
752 vector,
753 } => self.cmd_vec_set(&key, dimensions, &vector),
754 #[cfg(feature = "vector")]
755 Command::VecQuery { key, k, vector } => self.cmd_vec_query(&key, k, &vector),
756 #[cfg(feature = "vector")]
757 Command::VecDel { key } => self.cmd_vec_del(&key),
758
759 Command::ObjectFreq { ref key } => {
760 let compact = CompactKey::new(key);
761 match self.entries.get(&compact) {
762 Some(entry) if !entry.is_expired() => {
763 CommandResponse::Integer(entry.lfu_counter as i64)
764 }
765 _ => CommandResponse::Nil,
766 }
767 }
768 Command::ObjectEncoding { ref key } => {
769 let compact = CompactKey::new(key);
770 match self.entries.get(&compact) {
771 Some(entry) if !entry.is_expired() => {
772 let encoding = match &entry.value {
773 Value::InlineStr { .. } => "embstr",
774 Value::HeapStr(_) => "raw",
775 Value::Int(_) => "int",
776 Value::List(_) => "linkedlist",
777 Value::Hash(_) => "hashtable",
778 Value::Set(_) => "hashtable",
779 _ => "unknown",
780 };
781 CommandResponse::BulkString(encoding.as_bytes().to_vec())
782 }
783 _ => CommandResponse::Nil,
784 }
785 }
786 Command::StatsHotkeys { count } => self.cmd_stats_hotkeys(count),
787 Command::StatsMemory { prefixes } => self.cmd_stats_memory(&prefixes),
788
789 Command::PfAdd { key, elements } => self.cmd_pfadd(&key, &elements),
790 Command::PfCount { keys } => {
791 if keys.len() == 1 {
792 self.cmd_pfcount_single(&keys[0])
793 } else {
794 self.cmd_pfcount_multi(&keys)
795 }
796 }
797 Command::PfMerge {
798 destkey,
799 sourcekeys,
800 } => self.cmd_pfmerge(&destkey, &sourcekeys),
801
802 Command::SetBit { key, offset, value } => self.cmd_setbit(&key, offset, value),
803 Command::GetBit { key, offset } => self.cmd_getbit(&key, offset),
804 Command::BitCount {
805 key,
806 start,
807 end,
808 use_bit,
809 } => self.cmd_bitcount(&key, start, end, use_bit),
810 Command::BitOp {
811 operation,
812 destkey,
813 keys,
814 } => self.cmd_bitop(operation, &destkey, &keys),
815 Command::BitPos {
816 key,
817 bit,
818 start,
819 end,
820 use_bit,
821 } => self.cmd_bitpos(&key, bit, start, end, use_bit),
822 Command::BitField { key, operations } => self.cmd_bitfield(&key, &operations),
823
824 Command::GeoAdd {
825 key,
826 nx,
827 xx,
828 ch,
829 members,
830 } => self.cmd_geoadd(&key, nx, xx, ch, &members),
831 Command::GeoDist {
832 key,
833 member1,
834 member2,
835 unit,
836 } => self.cmd_geodist(&key, &member1, &member2, unit),
837 Command::GeoHash { key, members } => self.cmd_geohash(&key, &members),
838 Command::GeoPos { key, members } => self.cmd_geopos(&key, &members),
839 Command::GeoSearch {
840 key,
841 from_member,
842 from_lonlat,
843 radius,
844 unit,
845 asc,
846 count,
847 withcoord,
848 withdist,
849 withhash,
850 } => self.cmd_geosearch(
851 &key,
852 from_member.as_deref(),
853 from_lonlat,
854 radius,
855 unit,
856 asc,
857 count,
858 withcoord,
859 withdist,
860 withhash,
861 ),
862
863 Command::Multi
864 | Command::Exec
865 | Command::Discard
866 | Command::Watch { .. }
867 | Command::Unwatch => {
868 CommandResponse::Error("ERR command handled at connection level".into())
869 }
870
871 Command::ClientId
872 | Command::ClientGetName
873 | Command::ClientSetName { .. }
874 | Command::ClientList
875 | Command::ClientInfo => {
876 CommandResponse::Error("ERR command handled at connection level".into())
877 }
878
879 Command::ConfigGet { .. } => CommandResponse::Array(vec![]),
880 Command::ConfigSet { .. } => {
881 CommandResponse::Error("ERR unsupported CONFIG SET parameter".into())
882 }
883 Command::ConfigResetStat => CommandResponse::Ok,
884
885 Command::Time => {
886 use std::time::SystemTime;
887 let now = SystemTime::now()
888 .duration_since(SystemTime::UNIX_EPOCH)
889 .unwrap_or_default();
890 let secs = now.as_secs();
891 let micros = now.subsec_micros() as u64;
892 CommandResponse::Array(vec![
893 CommandResponse::BulkString(secs.to_string().into_bytes()),
894 CommandResponse::BulkString(micros.to_string().into_bytes()),
895 ])
896 }
897 Command::Select { db } => {
898 if db < 0 {
899 CommandResponse::Error("ERR DB index is out of range".into())
900 } else if db == 0 {
901 CommandResponse::Ok
902 } else {
903 CommandResponse::Error("ERR SELECT is not allowed in cluster mode".into())
904 }
905 }
906 Command::Quit => CommandResponse::Ok,
907 Command::Wait { timeout, .. } => {
908 if timeout < 0 {
909 CommandResponse::Error("ERR timeout is negative".into())
910 } else {
911 CommandResponse::Integer(0)
912 }
913 }
914 Command::CommandCount => CommandResponse::Integer(supported_command_count()),
915 Command::CommandList => command_list_response(),
916 Command::CommandHelp => command_help_response(),
917 Command::CommandDocs { names } => command_docs_response(&names),
918
919 Command::BgSave
920 | Command::BgRewriteAof
921 | Command::Hello { .. }
922 | Command::Auth { .. }
923 | Command::CdcPoll { .. }
924 | Command::CdcGroupCreate { .. }
925 | Command::CdcGroupRead { .. }
926 | Command::CdcAck { .. }
927 | Command::CdcPending { .. }
928 | Command::StatsLatency { .. }
929 | Command::DocCreate { .. }
930 | Command::DocDrop { .. }
931 | Command::DocInfo { .. }
932 | Command::DocDictInfo { .. }
933 | Command::DocStorage { .. }
934 | Command::DocSet { .. }
935 | Command::DocInsert { .. }
936 | Command::DocMSet { .. }
937 | Command::DocGet { .. }
938 | Command::DocMGet { .. }
939 | Command::DocUpdate { .. }
940 | Command::DocDel { .. }
941 | Command::DocExists { .. }
942 | Command::DocCreateIndex { .. }
943 | Command::DocDropIndex { .. }
944 | Command::DocIndexes { .. }
945 | Command::DocFind { .. }
946 | Command::DocCount { .. }
947 | Command::Subscribe { .. }
948 | Command::Unsubscribe { .. }
949 | Command::PSubscribe { .. }
950 | Command::PUnsubscribe { .. }
951 | Command::Publish { .. } => {
952 CommandResponse::Error("ERR command handled at server level".into())
953 }
954
955 #[cfg(not(feature = "vector"))]
956 Command::VecSet { .. } | Command::VecQuery { .. } | Command::VecDel { .. } => {
957 CommandResponse::Error("ERR vector feature not enabled".into())
958 }
959 }
960 }
961
962 fn cmd_get(&mut self, key: &[u8]) -> CommandResponse {
965 self.lazy_expire(key);
966 match self.entries.get(key) {
967 Some(entry) => match entry.value.bulk_response() {
968 Some(resp) => resp,
969 None => CommandResponse::Error(
970 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
971 ),
972 },
973 None => CommandResponse::Nil,
974 }
975 }
976
977 fn cmd_set(
978 &mut self,
979 key: &[u8],
980 value: &[u8],
981 ex: Option<u64>,
982 px: Option<u64>,
983 nx: bool,
984 xx: bool,
985 ) -> CommandResponse {
986 let ttl = Command::ttl_duration(ex, px);
987 let compact = CompactKey::new(key);
988 match self.entries.entry(compact) {
989 Entry::Occupied(mut occupied) => {
990 if occupied.get().is_expired() {
991 let (map_key, old_entry) = occupied.remove_entry();
992 let old_size =
993 Self::estimate_key_entry_size(map_key.as_bytes(), &old_entry.value);
994 self.memory_used = self.memory_used.saturating_sub(old_size);
995
996 if xx {
997 return CommandResponse::Nil;
998 }
999
1000 let (entry, entry_size) = Self::build_string_entry(&map_key, value, ttl);
1001 self.memory_used += entry_size;
1002 self.entries.insert(map_key, entry);
1003 CommandResponse::Ok
1004 } else {
1005 if nx {
1006 return CommandResponse::Nil;
1007 }
1008
1009 let new_value = Value::from_raw_bytes(value);
1010 let old_value_size = occupied.get().value.estimated_size();
1011 let new_value_size = new_value.estimated_size();
1012 let size_delta = new_value_size as isize - old_value_size as isize;
1013
1014 if size_delta >= 0 {
1015 self.memory_used += size_delta as usize;
1016 } else {
1017 self.memory_used = self.memory_used.saturating_sub((-size_delta) as usize);
1018 }
1019
1020 let entry = occupied.get_mut();
1021 entry.value = new_value;
1022 entry.client_flags = 0;
1023 match ttl {
1024 Some(dur) => entry.set_ttl(dur),
1025 None => entry.ttl = None,
1026 }
1027 CommandResponse::Ok
1028 }
1029 }
1030 Entry::Vacant(vacant) => {
1031 if xx {
1032 return CommandResponse::Nil;
1033 }
1034
1035 let new_value = Value::from_raw_bytes(value);
1036 let entry_size = Self::estimate_key_entry_size(vacant.key().as_bytes(), &new_value);
1037 let mut entry = KeyEntry::new(vacant.key().clone(), new_value);
1038 if let Some(dur) = ttl {
1039 entry.set_ttl(dur);
1040 }
1041 self.memory_used += entry_size;
1042 vacant.insert(entry);
1043 CommandResponse::Ok
1044 }
1045 }
1046 }
1047
1048 fn cmd_getset(&mut self, key: &[u8], value: &[u8]) -> CommandResponse {
1049 let old = self.cmd_get(key);
1050 self.cmd_set(key, value, None, None, false, false);
1051 old
1052 }
1053
1054 fn cmd_append(&mut self, key: &[u8], value: &[u8]) -> CommandResponse {
1055 self.lazy_expire(key);
1056 let compact = CompactKey::new(key);
1057 match self.entries.get_mut(&compact) {
1058 Some(entry) => match &entry.value {
1059 Value::InlineStr { data, len } => {
1060 let mut existing = data[..*len as usize].to_vec();
1061 existing.extend_from_slice(value);
1062 let new_len = existing.len();
1063 entry.value = Value::from_raw_bytes(&existing);
1064 CommandResponse::Integer(new_len as i64)
1065 }
1066 Value::HeapStr(arc) => {
1067 let mut existing = arc.to_vec();
1068 existing.extend_from_slice(value);
1069 let new_len = existing.len();
1070 entry.value = Value::from_raw_bytes(&existing);
1071 CommandResponse::Integer(new_len as i64)
1072 }
1073 Value::Int(i) => {
1074 let mut existing = i.to_string().into_bytes();
1075 existing.extend_from_slice(value);
1076 let new_len = existing.len();
1077 entry.value = Value::from_raw_bytes(&existing);
1078 CommandResponse::Integer(new_len as i64)
1079 }
1080 _ => CommandResponse::Error(
1081 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1082 ),
1083 },
1084 None => {
1085 let new_entry = KeyEntry::new(compact.clone(), Value::from_raw_bytes(value));
1086 self.entries.insert(compact, new_entry);
1087 CommandResponse::Integer(value.len() as i64)
1088 }
1089 }
1090 }
1091
1092 fn cmd_strlen(&mut self, key: &[u8]) -> CommandResponse {
1093 self.lazy_expire(key);
1094 match self.entries.get(key) {
1095 Some(entry) => match entry.value.string_len() {
1096 Some(len) => CommandResponse::Integer(len as i64),
1097 None => CommandResponse::Error(
1098 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1099 ),
1100 },
1101 None => CommandResponse::Integer(0),
1102 }
1103 }
1104
1105 fn cmd_incrby(&mut self, key: &[u8], delta: i64) -> CommandResponse {
1106 self.lazy_expire(key);
1107 match self.entries.get_mut(key) {
1108 Some(entry) => {
1109 let current = match &entry.value {
1110 Value::Int(i) => *i,
1111 Value::InlineStr { data, len } => {
1112 match std::str::from_utf8(&data[..*len as usize])
1113 .ok()
1114 .and_then(|s| s.parse::<i64>().ok())
1115 {
1116 Some(i) => i,
1117 None => {
1118 return CommandResponse::Error(
1119 "ERR value is not an integer or out of range".into(),
1120 )
1121 }
1122 }
1123 }
1124 Value::HeapStr(arc) => {
1125 match std::str::from_utf8(arc)
1126 .ok()
1127 .and_then(|s| s.parse::<i64>().ok())
1128 {
1129 Some(i) => i,
1130 None => {
1131 return CommandResponse::Error(
1132 "ERR value is not an integer or out of range".into(),
1133 )
1134 }
1135 }
1136 }
1137 _ => {
1138 return CommandResponse::Error(
1139 "WRONGTYPE Operation against a key holding the wrong kind of value"
1140 .into(),
1141 )
1142 }
1143 };
1144 match current.checked_add(delta) {
1145 Some(result) => {
1146 entry.value = Value::Int(result);
1147 CommandResponse::Integer(result)
1148 }
1149 None => {
1150 CommandResponse::Error("ERR increment or decrement would overflow".into())
1151 }
1152 }
1153 }
1154 None => {
1155 let compact = CompactKey::new(key);
1156 let entry = KeyEntry::new(compact.clone(), Value::Int(delta));
1157 self.entries.insert(compact, entry);
1158 CommandResponse::Integer(delta)
1159 }
1160 }
1161 }
1162
1163 fn cmd_incrbyfloat(&mut self, key: &[u8], delta: f64) -> CommandResponse {
1164 self.lazy_expire(key);
1165 match self.entries.get_mut(key) {
1166 Some(entry) => {
1167 let current = match &entry.value {
1168 Value::Int(i) => *i as f64,
1169 Value::InlineStr { data, len } => {
1170 match std::str::from_utf8(&data[..*len as usize])
1171 .ok()
1172 .and_then(|s| s.parse::<f64>().ok())
1173 {
1174 Some(f) => f,
1175 None => {
1176 return CommandResponse::Error(
1177 "ERR value is not a valid float".into(),
1178 )
1179 }
1180 }
1181 }
1182 Value::HeapStr(arc) => {
1183 match std::str::from_utf8(arc)
1184 .ok()
1185 .and_then(|s| s.parse::<f64>().ok())
1186 {
1187 Some(f) => f,
1188 None => {
1189 return CommandResponse::Error(
1190 "ERR value is not a valid float".into(),
1191 )
1192 }
1193 }
1194 }
1195 _ => {
1196 return CommandResponse::Error(
1197 "WRONGTYPE Operation against a key holding the wrong kind of value"
1198 .into(),
1199 )
1200 }
1201 };
1202 let result = current + delta;
1203 if result.is_infinite() || result.is_nan() {
1204 return CommandResponse::Error(
1205 "ERR increment would produce NaN or Infinity".into(),
1206 );
1207 }
1208 let result_str = format_float(result);
1209 entry.value = Value::from_raw_bytes(result_str.as_bytes());
1210 CommandResponse::BulkString(result_str.into_bytes())
1211 }
1212 None => {
1213 if delta.is_infinite() || delta.is_nan() {
1214 return CommandResponse::Error(
1215 "ERR increment would produce NaN or Infinity".into(),
1216 );
1217 }
1218 let result_str = format_float(delta);
1219 let compact = CompactKey::new(key);
1220 let entry = KeyEntry::new(
1221 compact.clone(),
1222 Value::from_raw_bytes(result_str.as_bytes()),
1223 );
1224 self.entries.insert(compact, entry);
1225 CommandResponse::BulkString(result_str.into_bytes())
1226 }
1227 }
1228 }
1229
1230 fn cmd_getrange(&mut self, key: &[u8], start: i64, end: i64) -> CommandResponse {
1231 self.lazy_expire(key);
1232 match self.entries.get(key) {
1233 Some(entry) => {
1234 let bytes = match entry.value.as_bytes() {
1235 Some(b) => b,
1236 None => {
1237 return CommandResponse::Error(
1238 "WRONGTYPE Operation against a key holding the wrong kind of value"
1239 .into(),
1240 )
1241 }
1242 };
1243 let len = bytes.len() as i64;
1244 if len == 0 {
1245 return CommandResponse::BulkString(vec![]);
1246 }
1247 let s = if start < 0 {
1248 (len + start).max(0) as usize
1249 } else {
1250 start.min(len) as usize
1251 };
1252 let e = if end < 0 {
1253 (len + end).max(0) as usize
1254 } else {
1255 end.min(len - 1) as usize
1256 };
1257 if s > e || s >= bytes.len() {
1258 CommandResponse::BulkString(vec![])
1259 } else {
1260 CommandResponse::BulkString(bytes[s..=e].to_vec())
1261 }
1262 }
1263 None => CommandResponse::BulkString(vec![]),
1264 }
1265 }
1266
1267 fn cmd_setrange(&mut self, key: &[u8], offset: usize, value: &[u8]) -> CommandResponse {
1268 self.lazy_expire(key);
1269 let compact = CompactKey::new(key);
1270 let mut current = match self.entries.get(key) {
1271 Some(entry) => match entry.value.as_bytes() {
1272 Some(b) => b,
1273 None => {
1274 return CommandResponse::Error(
1275 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1276 )
1277 }
1278 },
1279 None => vec![],
1280 };
1281 let needed = offset + value.len();
1282 if current.len() < needed {
1283 current.resize(needed, 0u8);
1284 }
1285 current[offset..offset + value.len()].copy_from_slice(value);
1286 let new_len = current.len() as i64;
1287 let old_size = self
1288 .entries
1289 .get(key)
1290 .map(|e| Self::estimate_key_entry_size(key, &e.value))
1291 .unwrap_or(0);
1292 let new_value = Value::from_raw_bytes(¤t);
1293 let new_size = Self::estimate_key_entry_size(key, &new_value);
1294 match self.entries.get_mut(key) {
1295 Some(entry) => {
1296 entry.value = new_value;
1297 if new_size > old_size {
1298 self.memory_used += new_size - old_size;
1299 } else {
1300 self.memory_used = self.memory_used.saturating_sub(old_size - new_size);
1301 }
1302 }
1303 None => {
1304 let entry = KeyEntry::new(compact.clone(), new_value);
1305 self.memory_used += new_size;
1306 self.entries.insert(compact, entry);
1307 }
1308 }
1309 CommandResponse::Integer(new_len)
1310 }
1311
1312 fn cmd_getdel(&mut self, key: &[u8]) -> CommandResponse {
1313 self.lazy_expire(key);
1314 let compact = CompactKey::new(key);
1315 match self.entries.get(&compact) {
1316 Some(entry) if !entry.is_expired() => {
1317 let resp = match entry.value.bulk_response() {
1318 Some(r) => r,
1319 None => {
1320 return CommandResponse::Error(
1321 "WRONGTYPE Operation against a key holding the wrong kind of value"
1322 .into(),
1323 )
1324 }
1325 };
1326 let _ = self.remove_compact_entry(&compact);
1327 resp
1328 }
1329 _ => CommandResponse::Nil,
1330 }
1331 }
1332
1333 fn cmd_getex(
1334 &mut self,
1335 key: &[u8],
1336 ex: Option<u64>,
1337 px: Option<u64>,
1338 exat: Option<u64>,
1339 pxat: Option<u64>,
1340 persist: bool,
1341 ) -> CommandResponse {
1342 self.lazy_expire(key);
1343 match self.entries.get_mut(key) {
1344 Some(entry) if !entry.is_expired() => {
1345 let resp = match entry.value.bulk_response() {
1346 Some(r) => r,
1347 None => {
1348 return CommandResponse::Error(
1349 "WRONGTYPE Operation against a key holding the wrong kind of value"
1350 .into(),
1351 )
1352 }
1353 };
1354 if let Some(s) = ex {
1355 entry.set_ttl(Duration::from_secs(s));
1356 } else if let Some(ms) = px {
1357 entry.set_ttl(Duration::from_millis(ms));
1358 } else if let Some(ts) = exat {
1359 let now = std::time::SystemTime::now()
1360 .duration_since(std::time::SystemTime::UNIX_EPOCH)
1361 .unwrap_or_default();
1362 let target = Duration::from_secs(ts);
1363 if target > now {
1364 entry.set_ttl(target - now);
1365 } else {
1366 let compact = CompactKey::new(key);
1367 let _ = self.remove_compact_entry(&compact);
1368 return resp;
1369 }
1370 } else if let Some(ts_ms) = pxat {
1371 let now = std::time::SystemTime::now()
1372 .duration_since(std::time::SystemTime::UNIX_EPOCH)
1373 .unwrap_or_default();
1374 let target = Duration::from_millis(ts_ms);
1375 if target > now {
1376 entry.set_ttl(target - now);
1377 } else {
1378 let compact = CompactKey::new(key);
1379 let _ = self.remove_compact_entry(&compact);
1380 return resp;
1381 }
1382 } else if persist {
1383 entry.clear_ttl();
1384 }
1385 resp
1386 }
1387 _ => CommandResponse::Nil,
1388 }
1389 }
1390
1391 fn cmd_msetnx(&mut self, entries: &[(Vec<u8>, Vec<u8>)]) -> CommandResponse {
1392 for (k, _) in entries {
1393 self.lazy_expire(k);
1394 if self.entries.contains_key(k.as_slice()) {
1395 return CommandResponse::Integer(0);
1396 }
1397 }
1398 for (k, v) in entries {
1399 self.cmd_set(k, v, None, None, false, false);
1400 }
1401 CommandResponse::Integer(1)
1402 }
1403
1404 fn del(&mut self, key: &[u8]) -> bool {
1407 let compact = CompactKey::new(key);
1408 self.remove_compact_entry(&compact).is_some()
1409 }
1410
1411 fn exists(&mut self, key: &[u8]) -> bool {
1412 self.lazy_expire(key);
1413 self.entries.contains_key(key)
1414 }
1415
1416 fn cmd_expire(&mut self, key: &[u8], duration: Duration) -> CommandResponse {
1417 match self.entries.get_mut(key) {
1418 Some(entry) if !entry.is_expired() => {
1419 entry.set_ttl(duration);
1420 CommandResponse::Integer(1)
1421 }
1422 _ => CommandResponse::Integer(0),
1423 }
1424 }
1425
1426 fn cmd_persist(&mut self, key: &[u8]) -> CommandResponse {
1427 match self.entries.get_mut(key) {
1428 Some(entry) if !entry.is_expired() && entry.ttl.is_some() => {
1429 entry.clear_ttl();
1430 CommandResponse::Integer(1)
1431 }
1432 _ => CommandResponse::Integer(0),
1433 }
1434 }
1435
1436 fn cmd_ttl(&mut self, key: &[u8], millis: bool) -> CommandResponse {
1437 self.lazy_expire(key);
1438 match self.entries.get(key) {
1439 None => CommandResponse::Integer(-2),
1440 Some(entry) => match entry.remaining_ttl() {
1441 None => CommandResponse::Integer(-1),
1442 Some(dur) => {
1443 if millis {
1444 CommandResponse::Integer(dur.as_millis() as i64)
1445 } else {
1446 CommandResponse::Integer(dur.as_secs() as i64)
1447 }
1448 }
1449 },
1450 }
1451 }
1452
1453 fn cmd_type(&mut self, key: &[u8]) -> CommandResponse {
1454 self.lazy_expire(key);
1455 match self.entries.get(key) {
1456 Some(entry) => CommandResponse::SimpleString(entry.value.type_name().to_string()),
1457 None => CommandResponse::SimpleString("none".to_string()),
1458 }
1459 }
1460
1461 fn cmd_expireat(&mut self, key: &[u8], timestamp: u64, millis: bool) -> CommandResponse {
1462 match self.entries.get_mut(key) {
1463 Some(entry) if !entry.is_expired() => {
1464 let now = std::time::SystemTime::now()
1465 .duration_since(std::time::SystemTime::UNIX_EPOCH)
1466 .unwrap_or_default();
1467 let target = if millis {
1468 Duration::from_millis(timestamp)
1469 } else {
1470 Duration::from_secs(timestamp)
1471 };
1472 if target > now {
1473 entry.set_ttl(target - now);
1474 CommandResponse::Integer(1)
1475 } else {
1476 let compact = CompactKey::new(key);
1477 let _ = self.remove_compact_entry(&compact);
1478 CommandResponse::Integer(1)
1479 }
1480 }
1481 _ => CommandResponse::Integer(0),
1482 }
1483 }
1484
1485 fn cmd_rename(&mut self, key: &[u8], newkey: &[u8], nx: bool) -> CommandResponse {
1486 self.lazy_expire(key);
1487 let source_compact = CompactKey::new(key);
1488 let source_entry = match self.remove_compact_entry(&source_compact) {
1489 Some(e) => e,
1490 None => {
1491 return CommandResponse::Error("ERR no such key".into());
1492 }
1493 };
1494 self.lazy_expire(newkey);
1495 let dest_compact = CompactKey::new(newkey);
1496 if nx && self.entries.contains_key(&dest_compact) {
1497 let size = Self::estimate_key_entry_size(key, &source_entry.value);
1498 self.memory_used += size;
1499 self.entries.insert(source_compact, source_entry);
1500 return CommandResponse::Integer(0);
1501 }
1502 if let Some(_old) = self.remove_compact_entry(&dest_compact) {}
1503 let mut new_entry = KeyEntry::new(dest_compact.clone(), source_entry.value.clone());
1504 new_entry.ttl = source_entry.ttl;
1505 new_entry.lfu_counter = source_entry.lfu_counter;
1506 let size = Self::estimate_key_entry_size(newkey, &new_entry.value);
1507 self.memory_used += size;
1508 self.entries.insert(dest_compact, new_entry);
1509 if nx {
1510 CommandResponse::Integer(1)
1511 } else {
1512 CommandResponse::Ok
1513 }
1514 }
1515
1516 fn cmd_copy(&mut self, source: &[u8], destination: &[u8], replace: bool) -> CommandResponse {
1517 self.lazy_expire(source);
1518 let src_compact = CompactKey::new(source);
1519 let (value, ttl) = match self.entries.get(&src_compact) {
1520 Some(entry) if !entry.is_expired() => (entry.value.clone(), entry.ttl),
1521 _ => return CommandResponse::Integer(0),
1522 };
1523 self.lazy_expire(destination);
1524 let dest_compact = CompactKey::new(destination);
1525 if self.entries.contains_key(&dest_compact) && !replace {
1526 return CommandResponse::Integer(0);
1527 }
1528 if replace {
1529 let _ = self.remove_compact_entry(&dest_compact);
1530 }
1531 let mut new_entry = KeyEntry::new(dest_compact.clone(), value);
1532 new_entry.ttl = ttl;
1533 let size = Self::estimate_key_entry_size(destination, &new_entry.value);
1534 self.memory_used += size;
1535 self.entries.insert(dest_compact, new_entry);
1536 CommandResponse::Integer(1)
1537 }
1538
1539 fn cmd_randomkey(&mut self) -> CommandResponse {
1540 for (key, entry) in &self.entries {
1541 if !entry.is_expired() {
1542 return CommandResponse::BulkString(key.as_bytes().to_vec());
1543 }
1544 }
1545 CommandResponse::Nil
1546 }
1547
1548 fn cmd_keys(&self, pattern: &str) -> CommandResponse {
1549 let results: Vec<CommandResponse> = self
1550 .entries
1551 .iter()
1552 .filter(|(_, entry)| !entry.is_expired())
1553 .filter(|(key, _)| glob_match(pattern, key.as_bytes()))
1554 .map(|(key, _)| CommandResponse::BulkString(key.as_bytes().to_vec()))
1555 .collect();
1556 CommandResponse::Array(results)
1557 }
1558
1559 fn cmd_scan(&self, cursor: u64, pattern: Option<&str>, count: usize) -> CommandResponse {
1560 let mut keys: Vec<&CompactKey> = self
1564 .entries
1565 .iter()
1566 .filter(|(_, entry)| !entry.is_expired())
1567 .filter(|(key, _)| pattern.is_none_or(|p| glob_match(p, key.as_bytes())))
1568 .map(|(key, _)| key)
1569 .collect();
1570 keys.sort_by(|a, b| a.as_bytes().cmp(b.as_bytes()));
1571
1572 let start = cursor as usize;
1573 let end = (start + count).min(keys.len());
1574
1575 if start >= keys.len() {
1576 return CommandResponse::Array(vec![
1577 CommandResponse::BulkString(b"0".to_vec()),
1578 CommandResponse::Array(vec![]),
1579 ]);
1580 }
1581
1582 let result_keys: Vec<CommandResponse> = keys[start..end]
1583 .iter()
1584 .map(|k| CommandResponse::BulkString(k.as_bytes().to_vec()))
1585 .collect();
1586
1587 let next_cursor = if end >= keys.len() { 0 } else { end as u64 };
1588
1589 CommandResponse::Array(vec![
1590 CommandResponse::BulkString(next_cursor.to_string().into_bytes()),
1591 CommandResponse::Array(result_keys),
1592 ])
1593 }
1594
1595 fn cmd_stats_hotkeys(&self, count: usize) -> CommandResponse {
1596 let mut hot: Vec<(&CompactKey, u8)> = self
1597 .entries
1598 .iter()
1599 .filter(|(_, entry)| !entry.is_expired())
1600 .map(|(key, entry)| (key, entry.lfu_counter))
1601 .collect();
1602 hot.sort_by(|a, b| {
1603 b.1.cmp(&a.1)
1604 .then_with(|| a.0.as_bytes().cmp(b.0.as_bytes()))
1605 });
1606
1607 let items = hot
1608 .into_iter()
1609 .take(count)
1610 .map(|(key, freq)| {
1611 CommandResponse::Array(vec![
1612 CommandResponse::BulkString(key.as_bytes().to_vec()),
1613 CommandResponse::Integer(i64::from(freq)),
1614 ])
1615 })
1616 .collect();
1617 CommandResponse::Array(items)
1618 }
1619
1620 fn cmd_stats_memory(&self, prefixes: &[Vec<u8>]) -> CommandResponse {
1621 let items: Vec<CommandResponse> = prefixes
1622 .iter()
1623 .map(|prefix| {
1624 let total: usize = self
1625 .entries
1626 .iter()
1627 .filter(|(key, entry)| {
1628 !entry.is_expired() && key.as_bytes().starts_with(prefix.as_slice())
1629 })
1630 .map(|(key, entry)| Self::estimate_key_entry_size(key.as_bytes(), &entry.value))
1631 .sum();
1632 CommandResponse::Integer(total as i64)
1633 })
1634 .collect();
1635 CommandResponse::Array(items)
1636 }
1637
1638 fn cmd_sadd(&mut self, key: &[u8], members: &[Vec<u8>]) -> CommandResponse {
1641 self.lazy_expire(key);
1642 let compact = CompactKey::new(key);
1643 let is_new = !self.entries.contains_key(&compact);
1644 let set = self.get_or_create_set(&compact);
1645 match set {
1646 Ok(set) => {
1647 let mut memory_delta: usize = 0;
1648 if is_new {
1649 memory_delta += key.len()
1650 + std::mem::size_of::<CompactKey>()
1651 + std::mem::size_of::<KeyEntry>();
1652 }
1653 let mut count = 0usize;
1654 for m in members {
1655 let val = Value::from_raw_bytes(m);
1656 let size = val.estimated_size();
1657 if set.insert(val) {
1658 memory_delta += size;
1659 count += 1;
1660 }
1661 }
1662 self.memory_used += memory_delta;
1663 CommandResponse::Integer(count as i64)
1664 }
1665 Err(e) => e,
1666 }
1667 }
1668
1669 fn cmd_srem(&mut self, key: &[u8], members: &[Vec<u8>]) -> CommandResponse {
1670 self.lazy_expire(key);
1671 let compact = CompactKey::new(key);
1672 let (count, is_empty) = match self.entries.get_mut(&compact) {
1673 Some(entry) => match &mut entry.value {
1674 Value::Set(set) => {
1675 let c = members
1676 .iter()
1677 .filter(|m| set.remove(&Value::from_raw_bytes(m)))
1678 .count();
1679 (c as i64, set.is_empty())
1680 }
1681 _ => {
1682 return CommandResponse::Error(
1683 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1684 )
1685 }
1686 },
1687 None => return CommandResponse::Integer(0),
1688 };
1689 if is_empty {
1690 self.entries.remove(&compact);
1691 }
1692 CommandResponse::Integer(count)
1693 }
1694
1695 fn cmd_smembers(&mut self, key: &[u8]) -> CommandResponse {
1696 self.lazy_expire(key);
1697 let compact = CompactKey::new(key);
1698 match self.entries.get(&compact) {
1699 Some(entry) => match &entry.value {
1700 Value::Set(set) => {
1701 let results: Vec<CommandResponse> = set
1702 .iter()
1703 .map(|v| match v.bulk_response() {
1704 Some(resp) => resp,
1705 None => CommandResponse::Nil,
1706 })
1707 .collect();
1708 CommandResponse::Array(results)
1709 }
1710 _ => CommandResponse::Error(
1711 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1712 ),
1713 },
1714 None => CommandResponse::Array(vec![]),
1715 }
1716 }
1717
1718 fn cmd_sismember(&mut self, key: &[u8], member: &[u8]) -> CommandResponse {
1719 self.lazy_expire(key);
1720 let compact = CompactKey::new(key);
1721 match self.entries.get(&compact) {
1722 Some(entry) => match &entry.value {
1723 Value::Set(set) => {
1724 let val = Value::from_raw_bytes(member);
1725 CommandResponse::Integer(if set.contains(&val) { 1 } else { 0 })
1726 }
1727 _ => CommandResponse::Error(
1728 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1729 ),
1730 },
1731 None => CommandResponse::Integer(0),
1732 }
1733 }
1734
1735 fn cmd_scard(&mut self, key: &[u8]) -> CommandResponse {
1736 self.lazy_expire(key);
1737 let compact = CompactKey::new(key);
1738 match self.entries.get(&compact) {
1739 Some(entry) => match &entry.value {
1740 Value::Set(set) => CommandResponse::Integer(set.len() as i64),
1741 _ => CommandResponse::Error(
1742 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1743 ),
1744 },
1745 None => CommandResponse::Integer(0),
1746 }
1747 }
1748
1749 fn cmd_spop(&mut self, key: &[u8], count: Option<usize>) -> CommandResponse {
1750 self.lazy_expire(key);
1751 let compact = CompactKey::new(key);
1752 let result = match self.entries.get_mut(&compact) {
1753 Some(entry) => match &mut entry.value {
1754 Value::Set(set) => {
1755 if set.is_empty() {
1756 return if count.is_some() {
1757 CommandResponse::Array(vec![])
1758 } else {
1759 CommandResponse::Nil
1760 };
1761 }
1762 let n = count.unwrap_or(1);
1763 let mut popped = Vec::with_capacity(n.min(set.len()));
1764 for _ in 0..n {
1765 if set.is_empty() {
1766 break;
1767 }
1768 let idx = (self.eviction_counter as usize) % set.len();
1769 self.eviction_counter = self.eviction_counter.wrapping_add(1);
1770 let val = set.iter().nth(idx).cloned();
1771 if let Some(v) = val {
1772 set.remove(&v);
1773 popped.push(v);
1774 }
1775 }
1776 let is_empty = set.is_empty();
1777 let resp = if count.is_some() {
1778 CommandResponse::Array(
1779 popped
1780 .iter()
1781 .map(|v| v.bulk_response().unwrap_or(CommandResponse::Nil))
1782 .collect(),
1783 )
1784 } else {
1785 popped
1786 .first()
1787 .and_then(|v| v.bulk_response())
1788 .unwrap_or(CommandResponse::Nil)
1789 };
1790 (resp, is_empty)
1791 }
1792 _ => {
1793 return CommandResponse::Error(
1794 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1795 )
1796 }
1797 },
1798 None => {
1799 return if count.is_some() {
1800 CommandResponse::Array(vec![])
1801 } else {
1802 CommandResponse::Nil
1803 }
1804 }
1805 };
1806 if result.1 {
1807 self.entries.remove(&compact);
1808 }
1809 result.0
1810 }
1811
1812 fn cmd_srandmember(&mut self, key: &[u8], count: Option<i64>) -> CommandResponse {
1813 self.lazy_expire(key);
1814 let compact = CompactKey::new(key);
1815 match self.entries.get(&compact) {
1816 Some(entry) => match &entry.value {
1817 Value::Set(set) => {
1818 if set.is_empty() {
1819 return if count.is_some() {
1820 CommandResponse::Array(vec![])
1821 } else {
1822 CommandResponse::Nil
1823 };
1824 }
1825 let members: Vec<&Value> = set.iter().collect();
1826 match count {
1827 None => {
1828 let idx = (self.eviction_counter as usize) % members.len();
1829 self.eviction_counter = self.eviction_counter.wrapping_add(1);
1830 members[idx].bulk_response().unwrap_or(CommandResponse::Nil)
1831 }
1832 Some(c) if c >= 0 => {
1833 let take = (c as usize).min(members.len());
1834 let start = (self.eviction_counter as usize) % members.len();
1835 self.eviction_counter = self.eviction_counter.wrapping_add(take as u64);
1836 let mut result = Vec::with_capacity(take);
1837 for i in 0..take {
1838 let idx = (start + i) % members.len();
1839 result.push(
1840 members[idx].bulk_response().unwrap_or(CommandResponse::Nil),
1841 );
1842 }
1843 CommandResponse::Array(result)
1844 }
1845 Some(c) => {
1846 let abs_count = c.unsigned_abs() as usize;
1847 let mut result = Vec::with_capacity(abs_count);
1848 for i in 0..abs_count {
1849 let idx = ((self.eviction_counter as usize) + i) % members.len();
1850 result.push(
1851 members[idx].bulk_response().unwrap_or(CommandResponse::Nil),
1852 );
1853 }
1854 self.eviction_counter =
1855 self.eviction_counter.wrapping_add(abs_count as u64);
1856 CommandResponse::Array(result)
1857 }
1858 }
1859 }
1860 _ => CommandResponse::Error(
1861 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1862 ),
1863 },
1864 None => {
1865 if count.is_some() {
1866 CommandResponse::Array(vec![])
1867 } else {
1868 CommandResponse::Nil
1869 }
1870 }
1871 }
1872 }
1873
1874 fn collect_set_members(&mut self, key: &[u8]) -> Result<HashSet<Value>, CommandResponse> {
1875 self.lazy_expire(key);
1876 let compact = CompactKey::new(key);
1877 match self.entries.get(&compact) {
1878 Some(entry) => match &entry.value {
1879 Value::Set(set) => Ok(set.clone()),
1880 _ => Err(CommandResponse::Error(
1881 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1882 )),
1883 },
1884 None => Ok(HashSet::new()),
1885 }
1886 }
1887
1888 fn cmd_sunion(&mut self, keys: &[Vec<u8>]) -> CommandResponse {
1889 let mut result: HashSet<Value> = HashSet::new();
1890 for key in keys {
1891 match self.collect_set_members(key) {
1892 Ok(set) => {
1893 for v in set {
1894 result.insert(v);
1895 }
1896 }
1897 Err(e) => return e,
1898 }
1899 }
1900 CommandResponse::Array(
1901 result
1902 .iter()
1903 .map(|v| v.bulk_response().unwrap_or(CommandResponse::Nil))
1904 .collect(),
1905 )
1906 }
1907
1908 fn cmd_sunionstore(&mut self, destination: &[u8], keys: &[Vec<u8>]) -> CommandResponse {
1909 let mut result: HashSet<Value> = HashSet::new();
1910 for key in keys {
1911 match self.collect_set_members(key) {
1912 Ok(set) => {
1913 for v in set {
1914 result.insert(v);
1915 }
1916 }
1917 Err(e) => return e,
1918 }
1919 }
1920 let count = result.len() as i64;
1921 self.del(destination);
1922 if !result.is_empty() {
1923 let compact = CompactKey::new(destination);
1924 let mut memory_delta = destination.len()
1925 + std::mem::size_of::<CompactKey>()
1926 + std::mem::size_of::<KeyEntry>();
1927 for v in &result {
1928 memory_delta += v.estimated_size();
1929 }
1930 let entry = KeyEntry::new(compact.clone(), Value::Set(result));
1931 self.entries.insert(compact, entry);
1932 self.memory_used += memory_delta;
1933 }
1934 CommandResponse::Integer(count)
1935 }
1936
1937 fn cmd_sinter(&mut self, keys: &[Vec<u8>]) -> CommandResponse {
1938 if keys.is_empty() {
1939 return CommandResponse::Array(vec![]);
1940 }
1941 let first = match self.collect_set_members(&keys[0]) {
1942 Ok(s) => s,
1943 Err(e) => return e,
1944 };
1945 let mut result = first;
1946 for key in &keys[1..] {
1947 match self.collect_set_members(key) {
1948 Ok(set) => {
1949 result = result.intersection(&set).cloned().collect();
1950 }
1951 Err(e) => return e,
1952 }
1953 }
1954 CommandResponse::Array(
1955 result
1956 .iter()
1957 .map(|v| v.bulk_response().unwrap_or(CommandResponse::Nil))
1958 .collect(),
1959 )
1960 }
1961
1962 fn cmd_sinterstore(&mut self, destination: &[u8], keys: &[Vec<u8>]) -> CommandResponse {
1963 if keys.is_empty() {
1964 self.del(destination);
1965 return CommandResponse::Integer(0);
1966 }
1967 let first = match self.collect_set_members(&keys[0]) {
1968 Ok(s) => s,
1969 Err(e) => return e,
1970 };
1971 let mut result = first;
1972 for key in &keys[1..] {
1973 match self.collect_set_members(key) {
1974 Ok(set) => {
1975 result = result.intersection(&set).cloned().collect();
1976 }
1977 Err(e) => return e,
1978 }
1979 }
1980 let count = result.len() as i64;
1981 self.del(destination);
1982 if !result.is_empty() {
1983 let compact = CompactKey::new(destination);
1984 let mut memory_delta = destination.len()
1985 + std::mem::size_of::<CompactKey>()
1986 + std::mem::size_of::<KeyEntry>();
1987 for v in &result {
1988 memory_delta += v.estimated_size();
1989 }
1990 let entry = KeyEntry::new(compact.clone(), Value::Set(result));
1991 self.entries.insert(compact, entry);
1992 self.memory_used += memory_delta;
1993 }
1994 CommandResponse::Integer(count)
1995 }
1996
1997 fn cmd_sdiff(&mut self, keys: &[Vec<u8>]) -> CommandResponse {
1998 if keys.is_empty() {
1999 return CommandResponse::Array(vec![]);
2000 }
2001 let first = match self.collect_set_members(&keys[0]) {
2002 Ok(s) => s,
2003 Err(e) => return e,
2004 };
2005 let mut result = first;
2006 for key in &keys[1..] {
2007 match self.collect_set_members(key) {
2008 Ok(set) => {
2009 result = result.difference(&set).cloned().collect();
2010 }
2011 Err(e) => return e,
2012 }
2013 }
2014 CommandResponse::Array(
2015 result
2016 .iter()
2017 .map(|v| v.bulk_response().unwrap_or(CommandResponse::Nil))
2018 .collect(),
2019 )
2020 }
2021
2022 fn cmd_sdiffstore(&mut self, destination: &[u8], keys: &[Vec<u8>]) -> CommandResponse {
2023 if keys.is_empty() {
2024 self.del(destination);
2025 return CommandResponse::Integer(0);
2026 }
2027 let first = match self.collect_set_members(&keys[0]) {
2028 Ok(s) => s,
2029 Err(e) => return e,
2030 };
2031 let mut result = first;
2032 for key in &keys[1..] {
2033 match self.collect_set_members(key) {
2034 Ok(set) => {
2035 result = result.difference(&set).cloned().collect();
2036 }
2037 Err(e) => return e,
2038 }
2039 }
2040 let count = result.len() as i64;
2041 self.del(destination);
2042 if !result.is_empty() {
2043 let compact = CompactKey::new(destination);
2044 let mut memory_delta = destination.len()
2045 + std::mem::size_of::<CompactKey>()
2046 + std::mem::size_of::<KeyEntry>();
2047 for v in &result {
2048 memory_delta += v.estimated_size();
2049 }
2050 let entry = KeyEntry::new(compact.clone(), Value::Set(result));
2051 self.entries.insert(compact, entry);
2052 self.memory_used += memory_delta;
2053 }
2054 CommandResponse::Integer(count)
2055 }
2056
2057 fn cmd_sintercard(&mut self, keys: &[Vec<u8>], limit: Option<usize>) -> CommandResponse {
2058 if keys.is_empty() {
2059 return CommandResponse::Integer(0);
2060 }
2061 let first = match self.collect_set_members(&keys[0]) {
2062 Ok(s) => s,
2063 Err(e) => return e,
2064 };
2065 let mut result = first;
2066 for key in &keys[1..] {
2067 match self.collect_set_members(key) {
2068 Ok(set) => {
2069 result = result.intersection(&set).cloned().collect();
2070 }
2071 Err(e) => return e,
2072 }
2073 }
2074 let count = match limit {
2075 Some(lim) if lim > 0 => result.len().min(lim),
2076 _ => result.len(),
2077 };
2078 CommandResponse::Integer(count as i64)
2079 }
2080
2081 fn cmd_smove(&mut self, source: &[u8], destination: &[u8], member: &[u8]) -> CommandResponse {
2082 self.lazy_expire(source);
2083 self.lazy_expire(destination);
2084 let val = Value::from_raw_bytes(member);
2085 let src_compact = CompactKey::new(source);
2086 let removed = match self.entries.get_mut(&src_compact) {
2087 Some(entry) => match &mut entry.value {
2088 Value::Set(set) => set.remove(&val),
2089 _ => {
2090 return CommandResponse::Error(
2091 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2092 )
2093 }
2094 },
2095 None => return CommandResponse::Integer(0),
2096 };
2097 if !removed {
2098 return CommandResponse::Integer(0);
2099 }
2100 let src_empty = self
2101 .entries
2102 .get(&src_compact)
2103 .map(|e| matches!(&e.value, Value::Set(s) if s.is_empty()))
2104 .unwrap_or(false);
2105 if src_empty {
2106 self.entries.remove(&src_compact);
2107 }
2108 let dst_compact = CompactKey::new(destination);
2109 let dst_set = self.get_or_create_set(&dst_compact);
2110 match dst_set {
2111 Ok(set) => {
2112 set.insert(val);
2113 }
2114 Err(e) => return e,
2115 }
2116 CommandResponse::Integer(1)
2117 }
2118
2119 fn cmd_smismember(&mut self, key: &[u8], members: &[Vec<u8>]) -> CommandResponse {
2120 self.lazy_expire(key);
2121 let compact = CompactKey::new(key);
2122 match self.entries.get(&compact) {
2123 Some(entry) => match &entry.value {
2124 Value::Set(set) => {
2125 let results: Vec<CommandResponse> = members
2126 .iter()
2127 .map(|m| {
2128 let val = Value::from_raw_bytes(m);
2129 CommandResponse::Integer(if set.contains(&val) { 1 } else { 0 })
2130 })
2131 .collect();
2132 CommandResponse::Array(results)
2133 }
2134 _ => CommandResponse::Error(
2135 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2136 ),
2137 },
2138 None => CommandResponse::Array(vec![CommandResponse::Integer(0); members.len()]),
2139 }
2140 }
2141
2142 fn cmd_sscan(
2143 &mut self,
2144 key: &[u8],
2145 cursor: u64,
2146 pattern: Option<&str>,
2147 count: usize,
2148 ) -> CommandResponse {
2149 self.lazy_expire(key);
2150 let compact = CompactKey::new(key);
2151 match self.entries.get(&compact) {
2152 Some(entry) => match &entry.value {
2153 Value::Set(set) => {
2154 let mut members: Vec<&Value> = set
2155 .iter()
2156 .filter(|v| {
2157 pattern.is_none_or(|p| {
2158 if let Some(CommandResponse::BulkString(b)) = v.bulk_response() {
2159 glob_match(p, &b)
2160 } else {
2161 false
2162 }
2163 })
2164 })
2165 .collect();
2166 members.sort_by(|a, b| {
2167 let ab = a.to_bytes();
2168 let bb = b.to_bytes();
2169 ab.cmp(&bb)
2170 });
2171
2172 let start = cursor as usize;
2173 let end = (start + count).min(members.len());
2174
2175 if start >= members.len() {
2176 return CommandResponse::Array(vec![
2177 CommandResponse::BulkString(b"0".to_vec()),
2178 CommandResponse::Array(vec![]),
2179 ]);
2180 }
2181
2182 let mut results = Vec::with_capacity(end - start);
2183 for v in &members[start..end] {
2184 results.push(v.bulk_response().unwrap_or(CommandResponse::Nil));
2185 }
2186
2187 let next_cursor = if end >= members.len() { 0 } else { end as u64 };
2188
2189 CommandResponse::Array(vec![
2190 CommandResponse::BulkString(next_cursor.to_string().into_bytes()),
2191 CommandResponse::Array(results),
2192 ])
2193 }
2194 _ => CommandResponse::Error(
2195 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2196 ),
2197 },
2198 None => CommandResponse::Array(vec![
2199 CommandResponse::BulkString(b"0".to_vec()),
2200 CommandResponse::Array(vec![]),
2201 ]),
2202 }
2203 }
2204
2205 fn cmd_zadd(&mut self, key: &[u8], members: &[(f64, Vec<u8>)]) -> CommandResponse {
2208 self.lazy_expire(key);
2209 let compact = CompactKey::new(key);
2210 let is_new = !self.entries.contains_key(&compact);
2211 let zset = self.get_or_create_sorted_set(&compact);
2212 match zset {
2213 Ok(map) => {
2214 let mut memory_delta: usize = 0;
2215 if is_new {
2216 memory_delta += key.len()
2217 + std::mem::size_of::<CompactKey>()
2218 + std::mem::size_of::<KeyEntry>();
2219 }
2220 let mut added = 0i64;
2221 let member_overhead = std::mem::size_of::<f64>();
2222 for (score, member) in members {
2223 if map.insert(member.clone(), *score).is_none() {
2224 memory_delta += member.len() + member_overhead;
2225 added += 1;
2226 }
2227 }
2228 self.memory_used += memory_delta;
2229 CommandResponse::Integer(added)
2230 }
2231 Err(e) => e,
2232 }
2233 }
2234
2235 fn cmd_zrem(&mut self, key: &[u8], members: &[Vec<u8>]) -> CommandResponse {
2236 self.lazy_expire(key);
2237 let compact = CompactKey::new(key);
2238 let (count, is_empty) = match self.entries.get_mut(&compact) {
2239 Some(entry) => match &mut entry.value {
2240 Value::SortedSet(map) => {
2241 let c = members.iter().filter(|m| map.remove(*m).is_some()).count();
2242 (c as i64, map.is_empty())
2243 }
2244 _ => {
2245 return CommandResponse::Error(
2246 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2247 )
2248 }
2249 },
2250 None => return CommandResponse::Integer(0),
2251 };
2252 if is_empty {
2253 self.entries.remove(&compact);
2254 }
2255 CommandResponse::Integer(count)
2256 }
2257
2258 fn cmd_zscore(&mut self, key: &[u8], member: &[u8]) -> CommandResponse {
2259 self.lazy_expire(key);
2260 let compact = CompactKey::new(key);
2261 match self.entries.get(&compact) {
2262 Some(entry) => match &entry.value {
2263 Value::SortedSet(map) => match map.get(member) {
2264 Some(score) => CommandResponse::BulkString(format!("{}", score).into_bytes()),
2265 None => CommandResponse::Nil,
2266 },
2267 _ => CommandResponse::Error(
2268 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2269 ),
2270 },
2271 None => CommandResponse::Nil,
2272 }
2273 }
2274
2275 fn cmd_zrank(&mut self, key: &[u8], member: &[u8], reverse: bool) -> CommandResponse {
2276 self.lazy_expire(key);
2277 let compact = CompactKey::new(key);
2278 match self.entries.get(&compact) {
2279 Some(entry) => match &entry.value {
2280 Value::SortedSet(map) => {
2281 if !map.contains_key(member) {
2282 return CommandResponse::Nil;
2283 }
2284 let mut sorted: Vec<(&Vec<u8>, &f64)> = map.iter().collect();
2285 sorted.sort_by(|a, b| {
2286 a.1.partial_cmp(b.1)
2287 .unwrap_or(std::cmp::Ordering::Equal)
2288 .then_with(|| a.0.cmp(b.0))
2289 });
2290 if reverse {
2291 sorted.reverse();
2292 }
2293 match sorted.iter().position(|(m, _)| m.as_slice() == member) {
2294 Some(pos) => CommandResponse::Integer(pos as i64),
2295 None => CommandResponse::Nil,
2296 }
2297 }
2298 _ => CommandResponse::Error(
2299 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2300 ),
2301 },
2302 None => CommandResponse::Nil,
2303 }
2304 }
2305
2306 fn cmd_zcard(&mut self, key: &[u8]) -> CommandResponse {
2307 self.lazy_expire(key);
2308 let compact = CompactKey::new(key);
2309 match self.entries.get(&compact) {
2310 Some(entry) => match &entry.value {
2311 Value::SortedSet(map) => CommandResponse::Integer(map.len() as i64),
2312 _ => CommandResponse::Error(
2313 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2314 ),
2315 },
2316 None => CommandResponse::Integer(0),
2317 }
2318 }
2319
2320 fn cmd_zrange(
2321 &mut self,
2322 key: &[u8],
2323 start: i64,
2324 stop: i64,
2325 withscores: bool,
2326 reverse: bool,
2327 ) -> CommandResponse {
2328 self.lazy_expire(key);
2329 let compact = CompactKey::new(key);
2330 match self.entries.get(&compact) {
2331 Some(entry) => match &entry.value {
2332 Value::SortedSet(map) => {
2333 let mut sorted: Vec<(&Vec<u8>, &f64)> = map.iter().collect();
2334 sorted.sort_by(|a, b| {
2335 a.1.partial_cmp(b.1)
2336 .unwrap_or(std::cmp::Ordering::Equal)
2337 .then_with(|| a.0.cmp(b.0))
2338 });
2339 if reverse {
2340 sorted.reverse();
2341 }
2342 let len = sorted.len() as i64;
2343 let s = normalize_index(start, len);
2344 let e = normalize_index(stop, len);
2345 if s > e || s >= len as usize {
2346 return CommandResponse::Array(vec![]);
2347 }
2348 let e = e.min(len as usize - 1);
2349 let mut results = Vec::new();
2350 for (member, score) in &sorted[s..=e] {
2351 results.push(CommandResponse::BulkString(member.to_vec()));
2352 if withscores {
2353 results.push(CommandResponse::BulkString(
2354 format!("{}", score).into_bytes(),
2355 ));
2356 }
2357 }
2358 CommandResponse::Array(results)
2359 }
2360 _ => CommandResponse::Error(
2361 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2362 ),
2363 },
2364 None => CommandResponse::Array(vec![]),
2365 }
2366 }
2367
2368 fn cmd_zrangebyscore(
2369 &mut self,
2370 key: &[u8],
2371 min: f64,
2372 max: f64,
2373 withscores: bool,
2374 offset: Option<usize>,
2375 count: Option<usize>,
2376 ) -> CommandResponse {
2377 self.lazy_expire(key);
2378 let compact = CompactKey::new(key);
2379 match self.entries.get(&compact) {
2380 Some(entry) => match &entry.value {
2381 Value::SortedSet(map) => {
2382 let mut sorted: Vec<(&Vec<u8>, &f64)> = map
2383 .iter()
2384 .filter(|(_, s)| **s >= min && **s <= max)
2385 .collect();
2386 sorted.sort_by(|a, b| {
2387 a.1.partial_cmp(b.1)
2388 .unwrap_or(std::cmp::Ordering::Equal)
2389 .then_with(|| a.0.cmp(b.0))
2390 });
2391 let off = offset.unwrap_or(0);
2392 let iter: Box<dyn Iterator<Item = &(&Vec<u8>, &f64)>> = if let Some(c) = count {
2393 Box::new(sorted.iter().skip(off).take(c))
2394 } else {
2395 Box::new(sorted.iter().skip(off))
2396 };
2397 let mut results = Vec::new();
2398 for (member, score) in iter {
2399 results.push(CommandResponse::BulkString(member.to_vec()));
2400 if withscores {
2401 results.push(CommandResponse::BulkString(
2402 format!("{}", score).into_bytes(),
2403 ));
2404 }
2405 }
2406 CommandResponse::Array(results)
2407 }
2408 _ => CommandResponse::Error(
2409 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2410 ),
2411 },
2412 None => CommandResponse::Array(vec![]),
2413 }
2414 }
2415
2416 fn cmd_zincrby(&mut self, key: &[u8], delta: f64, member: &[u8]) -> CommandResponse {
2417 self.lazy_expire(key);
2418 let compact = CompactKey::new(key);
2419 let zset = self.get_or_create_sorted_set(&compact);
2420 match zset {
2421 Ok(map) => {
2422 let new_score = map.get(member).copied().unwrap_or(0.0) + delta;
2423 map.insert(member.to_vec(), new_score);
2424 CommandResponse::BulkString(format!("{}", new_score).into_bytes())
2425 }
2426 Err(e) => e,
2427 }
2428 }
2429
2430 fn cmd_zcount(&mut self, key: &[u8], min: f64, max: f64) -> CommandResponse {
2431 self.lazy_expire(key);
2432 let compact = CompactKey::new(key);
2433 match self.entries.get(&compact) {
2434 Some(entry) => match &entry.value {
2435 Value::SortedSet(map) => {
2436 let count = map.values().filter(|s| **s >= min && **s <= max).count();
2437 CommandResponse::Integer(count as i64)
2438 }
2439 _ => CommandResponse::Error(
2440 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2441 ),
2442 },
2443 None => CommandResponse::Integer(0),
2444 }
2445 }
2446
2447 fn cmd_zrevrangebyscore(
2448 &mut self,
2449 key: &[u8],
2450 max: f64,
2451 min: f64,
2452 withscores: bool,
2453 offset: Option<usize>,
2454 count: Option<usize>,
2455 ) -> CommandResponse {
2456 self.lazy_expire(key);
2457 let compact = CompactKey::new(key);
2458 match self.entries.get(&compact) {
2459 Some(entry) => match &entry.value {
2460 Value::SortedSet(map) => {
2461 let mut sorted: Vec<(&Vec<u8>, &f64)> = map
2462 .iter()
2463 .filter(|(_, s)| **s >= min && **s <= max)
2464 .collect();
2465 sorted.sort_by(|a, b| {
2466 b.1.partial_cmp(a.1)
2467 .unwrap_or(std::cmp::Ordering::Equal)
2468 .then_with(|| b.0.cmp(a.0))
2469 });
2470 let off = offset.unwrap_or(0);
2471 let iter: Box<dyn Iterator<Item = &(&Vec<u8>, &f64)>> = if let Some(c) = count {
2472 Box::new(sorted.iter().skip(off).take(c))
2473 } else {
2474 Box::new(sorted.iter().skip(off))
2475 };
2476 let mut results = Vec::new();
2477 for (member, score) in iter {
2478 results.push(CommandResponse::BulkString(member.to_vec()));
2479 if withscores {
2480 results.push(CommandResponse::BulkString(
2481 format!("{}", score).into_bytes(),
2482 ));
2483 }
2484 }
2485 CommandResponse::Array(results)
2486 }
2487 _ => CommandResponse::Error(
2488 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2489 ),
2490 },
2491 None => CommandResponse::Array(vec![]),
2492 }
2493 }
2494
2495 fn cmd_zpopmin(&mut self, key: &[u8], count: Option<usize>) -> CommandResponse {
2496 self.lazy_expire(key);
2497 let compact = CompactKey::new(key);
2498 let result = match self.entries.get_mut(&compact) {
2499 Some(entry) => match &mut entry.value {
2500 Value::SortedSet(map) => {
2501 if map.is_empty() {
2502 return CommandResponse::Array(vec![]);
2503 }
2504 let n = count.unwrap_or(1);
2505 let mut sorted: Vec<(Vec<u8>, f64)> =
2506 map.iter().map(|(m, s)| (m.clone(), *s)).collect();
2507 sorted.sort_by(|a, b| {
2508 a.1.partial_cmp(&b.1)
2509 .unwrap_or(std::cmp::Ordering::Equal)
2510 .then_with(|| a.0.cmp(&b.0))
2511 });
2512 let take = n.min(sorted.len());
2513 let mut results = Vec::with_capacity(take * 2);
2514 for (member, score) in sorted.iter().take(take) {
2515 map.remove(member);
2516 results.push(CommandResponse::BulkString(member.clone()));
2517 results.push(CommandResponse::BulkString(
2518 format!("{}", score).into_bytes(),
2519 ));
2520 }
2521 let is_empty = map.is_empty();
2522 (CommandResponse::Array(results), is_empty)
2523 }
2524 _ => {
2525 return CommandResponse::Error(
2526 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2527 )
2528 }
2529 },
2530 None => return CommandResponse::Array(vec![]),
2531 };
2532 if result.1 {
2533 self.entries.remove(&compact);
2534 }
2535 result.0
2536 }
2537
2538 fn cmd_zpopmax(&mut self, key: &[u8], count: Option<usize>) -> CommandResponse {
2539 self.lazy_expire(key);
2540 let compact = CompactKey::new(key);
2541 let result = match self.entries.get_mut(&compact) {
2542 Some(entry) => match &mut entry.value {
2543 Value::SortedSet(map) => {
2544 if map.is_empty() {
2545 return CommandResponse::Array(vec![]);
2546 }
2547 let n = count.unwrap_or(1);
2548 let mut sorted: Vec<(Vec<u8>, f64)> =
2549 map.iter().map(|(m, s)| (m.clone(), *s)).collect();
2550 sorted.sort_by(|a, b| {
2551 b.1.partial_cmp(&a.1)
2552 .unwrap_or(std::cmp::Ordering::Equal)
2553 .then_with(|| b.0.cmp(&a.0))
2554 });
2555 let take = n.min(sorted.len());
2556 let mut results = Vec::with_capacity(take * 2);
2557 for (member, score) in sorted.iter().take(take) {
2558 map.remove(member);
2559 results.push(CommandResponse::BulkString(member.clone()));
2560 results.push(CommandResponse::BulkString(
2561 format!("{}", score).into_bytes(),
2562 ));
2563 }
2564 let is_empty = map.is_empty();
2565 (CommandResponse::Array(results), is_empty)
2566 }
2567 _ => {
2568 return CommandResponse::Error(
2569 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2570 )
2571 }
2572 },
2573 None => return CommandResponse::Array(vec![]),
2574 };
2575 if result.1 {
2576 self.entries.remove(&compact);
2577 }
2578 result.0
2579 }
2580
2581 fn cmd_zrangebylex(
2582 &mut self,
2583 key: &[u8],
2584 min: &[u8],
2585 max: &[u8],
2586 offset: Option<usize>,
2587 count: Option<usize>,
2588 reverse: bool,
2589 ) -> CommandResponse {
2590 self.lazy_expire(key);
2591 let compact = CompactKey::new(key);
2592 match self.entries.get(&compact) {
2593 Some(entry) => match &entry.value {
2594 Value::SortedSet(map) => {
2595 let mut sorted: Vec<(&Vec<u8>, &f64)> = map.iter().collect();
2596 sorted.sort_by(|a, b| {
2597 a.1.partial_cmp(b.1)
2598 .unwrap_or(std::cmp::Ordering::Equal)
2599 .then_with(|| a.0.cmp(b.0))
2600 });
2601 if reverse {
2602 sorted.reverse();
2603 }
2604 let filtered: Vec<&Vec<u8>> = sorted
2605 .iter()
2606 .map(|(m, _)| *m)
2607 .filter(|m| lex_in_range(m, min, max))
2608 .collect();
2609 let off = offset.unwrap_or(0);
2610 let iter: Box<dyn Iterator<Item = &&Vec<u8>>> = if let Some(c) = count {
2611 Box::new(filtered.iter().skip(off).take(c))
2612 } else {
2613 Box::new(filtered.iter().skip(off))
2614 };
2615 let results: Vec<CommandResponse> = iter
2616 .map(|m| CommandResponse::BulkString((*m).clone()))
2617 .collect();
2618 CommandResponse::Array(results)
2619 }
2620 _ => CommandResponse::Error(
2621 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2622 ),
2623 },
2624 None => CommandResponse::Array(vec![]),
2625 }
2626 }
2627
2628 fn cmd_zlexcount(&mut self, key: &[u8], min: &[u8], max: &[u8]) -> CommandResponse {
2629 self.lazy_expire(key);
2630 let compact = CompactKey::new(key);
2631 match self.entries.get(&compact) {
2632 Some(entry) => match &entry.value {
2633 Value::SortedSet(map) => {
2634 let count = map.keys().filter(|m| lex_in_range(m, min, max)).count();
2635 CommandResponse::Integer(count as i64)
2636 }
2637 _ => CommandResponse::Error(
2638 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2639 ),
2640 },
2641 None => CommandResponse::Integer(0),
2642 }
2643 }
2644
2645 fn cmd_zmscore(&mut self, key: &[u8], members: &[Vec<u8>]) -> CommandResponse {
2646 self.lazy_expire(key);
2647 let compact = CompactKey::new(key);
2648 match self.entries.get(&compact) {
2649 Some(entry) => match &entry.value {
2650 Value::SortedSet(map) => {
2651 let results: Vec<CommandResponse> = members
2652 .iter()
2653 .map(|m| match map.get(m.as_slice()) {
2654 Some(score) => {
2655 CommandResponse::BulkString(format!("{}", score).into_bytes())
2656 }
2657 None => CommandResponse::Nil,
2658 })
2659 .collect();
2660 CommandResponse::Array(results)
2661 }
2662 _ => CommandResponse::Error(
2663 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2664 ),
2665 },
2666 None => CommandResponse::Array(vec![CommandResponse::Nil; members.len()]),
2667 }
2668 }
2669
2670 fn cmd_zrandmember(
2671 &mut self,
2672 key: &[u8],
2673 count: Option<i64>,
2674 withscores: bool,
2675 ) -> CommandResponse {
2676 self.lazy_expire(key);
2677 let compact = CompactKey::new(key);
2678 match self.entries.get(&compact) {
2679 Some(entry) => match &entry.value {
2680 Value::SortedSet(map) => {
2681 if map.is_empty() {
2682 return if count.is_some() {
2683 CommandResponse::Array(vec![])
2684 } else {
2685 CommandResponse::Nil
2686 };
2687 }
2688 let members: Vec<(&Vec<u8>, &f64)> = map.iter().collect();
2689 match count {
2690 None => {
2691 let idx = (self.eviction_counter as usize) % members.len();
2692 self.eviction_counter = self.eviction_counter.wrapping_add(1);
2693 CommandResponse::BulkString(members[idx].0.clone())
2694 }
2695 Some(c) if c >= 0 => {
2696 let take = (c as usize).min(members.len());
2697 let start = (self.eviction_counter as usize) % members.len();
2698 self.eviction_counter = self.eviction_counter.wrapping_add(take as u64);
2699 let mut result =
2700 Vec::with_capacity(if withscores { take * 2 } else { take });
2701 for i in 0..take {
2702 let idx = (start + i) % members.len();
2703 result.push(CommandResponse::BulkString(members[idx].0.clone()));
2704 if withscores {
2705 result.push(CommandResponse::BulkString(
2706 format!("{}", members[idx].1).into_bytes(),
2707 ));
2708 }
2709 }
2710 CommandResponse::Array(result)
2711 }
2712 Some(c) => {
2713 let abs_count = c.unsigned_abs() as usize;
2714 let mut result = Vec::with_capacity(if withscores {
2715 abs_count * 2
2716 } else {
2717 abs_count
2718 });
2719 for i in 0..abs_count {
2720 let idx = ((self.eviction_counter as usize) + i) % members.len();
2721 result.push(CommandResponse::BulkString(members[idx].0.clone()));
2722 if withscores {
2723 result.push(CommandResponse::BulkString(
2724 format!("{}", members[idx].1).into_bytes(),
2725 ));
2726 }
2727 }
2728 self.eviction_counter =
2729 self.eviction_counter.wrapping_add(abs_count as u64);
2730 CommandResponse::Array(result)
2731 }
2732 }
2733 }
2734 _ => CommandResponse::Error(
2735 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2736 ),
2737 },
2738 None => {
2739 if count.is_some() {
2740 CommandResponse::Array(vec![])
2741 } else {
2742 CommandResponse::Nil
2743 }
2744 }
2745 }
2746 }
2747
2748 fn cmd_zscan(
2749 &mut self,
2750 key: &[u8],
2751 cursor: u64,
2752 pattern: Option<&str>,
2753 count: usize,
2754 ) -> CommandResponse {
2755 self.lazy_expire(key);
2756 let compact = CompactKey::new(key);
2757 match self.entries.get(&compact) {
2758 Some(entry) => match &entry.value {
2759 Value::SortedSet(map) => {
2760 let mut members: Vec<(&Vec<u8>, &f64)> = map
2761 .iter()
2762 .filter(|(m, _)| pattern.is_none_or(|p| glob_match(p, m)))
2763 .collect();
2764 members.sort_by(|a, b| a.0.cmp(b.0));
2765
2766 let start = cursor as usize;
2767 let end = (start + count).min(members.len());
2768
2769 if start >= members.len() {
2770 return CommandResponse::Array(vec![
2771 CommandResponse::BulkString(b"0".to_vec()),
2772 CommandResponse::Array(vec![]),
2773 ]);
2774 }
2775
2776 let mut results = Vec::with_capacity((end - start) * 2);
2777 for (m, s) in &members[start..end] {
2778 results.push(CommandResponse::BulkString((*m).clone()));
2779 results.push(CommandResponse::BulkString(format!("{}", s).into_bytes()));
2780 }
2781
2782 let next_cursor = if end >= members.len() { 0 } else { end as u64 };
2783
2784 CommandResponse::Array(vec![
2785 CommandResponse::BulkString(next_cursor.to_string().into_bytes()),
2786 CommandResponse::Array(results),
2787 ])
2788 }
2789 _ => CommandResponse::Error(
2790 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2791 ),
2792 },
2793 None => CommandResponse::Array(vec![
2794 CommandResponse::BulkString(b"0".to_vec()),
2795 CommandResponse::Array(vec![]),
2796 ]),
2797 }
2798 }
2799
2800 fn cmd_pfadd(&mut self, key: &[u8], elements: &[Vec<u8>]) -> CommandResponse {
2803 self.lazy_expire(key);
2804 let compact = CompactKey::new(key);
2805 let is_new = !self.entries.contains_key(&compact);
2806 let mut registers = if is_new {
2807 vec![0u8; HLL_REGISTER_BYTES]
2808 } else {
2809 match self.entries.get(&compact) {
2810 Some(entry) => match entry.value.as_bytes() {
2811 Some(bytes) if bytes.len() == HLL_REGISTER_BYTES => bytes,
2812 Some(_) | None => {
2813 return CommandResponse::Error(
2814 "WRONGTYPE Operation against a key holding the wrong kind of value"
2815 .into(),
2816 )
2817 }
2818 },
2819 None => vec![0u8; HLL_REGISTER_BYTES],
2820 }
2821 };
2822
2823 let mut changed = false;
2824 for elem in elements {
2825 if hll_add(&mut registers, elem) {
2826 changed = true;
2827 }
2828 }
2829
2830 if changed || is_new {
2831 let mut memory_delta = 0usize;
2832 if is_new {
2833 memory_delta += key.len()
2834 + std::mem::size_of::<CompactKey>()
2835 + std::mem::size_of::<KeyEntry>()
2836 + HLL_REGISTER_BYTES;
2837 }
2838 let entry = KeyEntry::new(compact.clone(), Value::from_raw_bytes(®isters));
2839 self.entries.insert(compact, entry);
2840 self.memory_used += memory_delta;
2841 }
2842
2843 CommandResponse::Integer(if changed { 1 } else { 0 })
2844 }
2845
2846 fn cmd_pfcount_single(&mut self, key: &[u8]) -> CommandResponse {
2847 self.lazy_expire(key);
2848 let compact = CompactKey::new(key);
2849 match self.entries.get(&compact) {
2850 Some(entry) => match entry.value.as_bytes() {
2851 Some(bytes) if bytes.len() == HLL_REGISTER_BYTES => {
2852 CommandResponse::Integer(hll_count(&bytes) as i64)
2853 }
2854 Some(_) => CommandResponse::Error(
2855 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2856 ),
2857 None => CommandResponse::Error(
2858 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2859 ),
2860 },
2861 None => CommandResponse::Integer(0),
2862 }
2863 }
2864
2865 fn cmd_pfcount_multi(&mut self, keys: &[Vec<u8>]) -> CommandResponse {
2866 let mut merged = vec![0u8; HLL_REGISTER_BYTES];
2867 for key in keys {
2868 self.lazy_expire(key);
2869 let compact = CompactKey::new(key);
2870 if let Some(entry) = self.entries.get(&compact) {
2871 match entry.value.as_bytes() {
2872 Some(bytes) if bytes.len() == HLL_REGISTER_BYTES => {
2873 hll_merge(&mut merged, &bytes);
2874 }
2875 Some(_) => {
2876 return CommandResponse::Error(
2877 "WRONGTYPE Operation against a key holding the wrong kind of value"
2878 .into(),
2879 )
2880 }
2881 None => {
2882 return CommandResponse::Error(
2883 "WRONGTYPE Operation against a key holding the wrong kind of value"
2884 .into(),
2885 )
2886 }
2887 }
2888 }
2889 }
2890 CommandResponse::Integer(hll_count(&merged) as i64)
2891 }
2892
2893 fn cmd_pfmerge(&mut self, destkey: &[u8], sourcekeys: &[Vec<u8>]) -> CommandResponse {
2894 let mut merged = vec![0u8; HLL_REGISTER_BYTES];
2895
2896 let dest_compact = CompactKey::new(destkey);
2897 self.lazy_expire(destkey);
2898 if let Some(entry) = self.entries.get(&dest_compact) {
2899 match entry.value.as_bytes() {
2900 Some(bytes) if bytes.len() == HLL_REGISTER_BYTES => {
2901 hll_merge(&mut merged, &bytes);
2902 }
2903 Some(_) => {
2904 return CommandResponse::Error(
2905 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2906 )
2907 }
2908 None => {
2909 return CommandResponse::Error(
2910 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2911 )
2912 }
2913 }
2914 }
2915
2916 for key in sourcekeys {
2917 self.lazy_expire(key);
2918 let compact = CompactKey::new(key);
2919 if let Some(entry) = self.entries.get(&compact) {
2920 match entry.value.as_bytes() {
2921 Some(bytes) if bytes.len() == HLL_REGISTER_BYTES => {
2922 hll_merge(&mut merged, &bytes);
2923 }
2924 Some(_) => {
2925 return CommandResponse::Error(
2926 "WRONGTYPE Operation against a key holding the wrong kind of value"
2927 .into(),
2928 )
2929 }
2930 None => {
2931 return CommandResponse::Error(
2932 "WRONGTYPE Operation against a key holding the wrong kind of value"
2933 .into(),
2934 )
2935 }
2936 }
2937 }
2938 }
2939
2940 let is_new = !self.entries.contains_key(&dest_compact);
2941 let mut memory_delta = 0usize;
2942 if is_new {
2943 memory_delta += destkey.len()
2944 + std::mem::size_of::<CompactKey>()
2945 + std::mem::size_of::<KeyEntry>()
2946 + HLL_REGISTER_BYTES;
2947 }
2948 let entry = KeyEntry::new(dest_compact.clone(), Value::from_raw_bytes(&merged));
2949 self.entries.insert(dest_compact, entry);
2950 self.memory_used += memory_delta;
2951
2952 CommandResponse::Ok
2953 }
2954
2955 fn get_string_bytes(&mut self, key: &[u8]) -> Option<Vec<u8>> {
2958 let compact = CompactKey::new(key);
2959 self.entries.get(&compact).and_then(|e| e.value.as_bytes())
2960 }
2961
2962 fn set_string_value(&mut self, key: &[u8], data: Vec<u8>) {
2963 let compact = CompactKey::new(key);
2964 let is_new = !self.entries.contains_key(&compact);
2965 let new_value = Value::from_raw_bytes(&data);
2966 if is_new {
2967 let size = Self::estimate_key_entry_size(key, &new_value);
2968 let entry = KeyEntry::new(compact.clone(), new_value);
2969 self.entries.insert(compact, entry);
2970 self.memory_used += size;
2971 } else {
2972 let entry = self
2973 .entries
2974 .get_mut(&compact)
2975 .expect("checked contains_key");
2976 let old_size = entry.value.estimated_size();
2977 entry.value = new_value;
2978 let new_size = entry.value.estimated_size();
2979 if new_size > old_size {
2980 self.memory_used += new_size - old_size;
2981 } else {
2982 self.memory_used = self.memory_used.saturating_sub(old_size - new_size);
2983 }
2984 }
2985 }
2986
2987 fn cmd_setbit(&mut self, key: &[u8], offset: u64, value: u8) -> CommandResponse {
2988 self.lazy_expire(key);
2989 let byte_idx = (offset / 8) as usize;
2990 let bit_idx = 7 - (offset % 8) as usize;
2991
2992 let mut data = self.get_string_bytes(key).unwrap_or_default();
2993
2994 if byte_idx >= data.len() {
2995 data.resize(byte_idx + 1, 0);
2996 }
2997
2998 let old_bit = (data[byte_idx] >> bit_idx) & 1;
2999
3000 if value == 1 {
3001 data[byte_idx] |= 1 << bit_idx;
3002 } else {
3003 data[byte_idx] &= !(1 << bit_idx);
3004 }
3005
3006 self.set_string_value(key, data);
3007 CommandResponse::Integer(old_bit as i64)
3008 }
3009
3010 fn cmd_getbit(&mut self, key: &[u8], offset: u64) -> CommandResponse {
3011 self.lazy_expire(key);
3012 let byte_idx = (offset / 8) as usize;
3013 let bit_idx = 7 - (offset % 8) as usize;
3014
3015 match self.get_string_bytes(key) {
3016 Some(data) => {
3017 if byte_idx >= data.len() {
3018 CommandResponse::Integer(0)
3019 } else {
3020 CommandResponse::Integer(((data[byte_idx] >> bit_idx) & 1) as i64)
3021 }
3022 }
3023 None => CommandResponse::Integer(0),
3024 }
3025 }
3026
3027 fn cmd_bitcount(
3028 &mut self,
3029 key: &[u8],
3030 start: Option<i64>,
3031 end: Option<i64>,
3032 use_bit: bool,
3033 ) -> CommandResponse {
3034 self.lazy_expire(key);
3035 let data = match self.get_string_bytes(key) {
3036 Some(d) => d,
3037 None => return CommandResponse::Integer(0),
3038 };
3039
3040 if data.is_empty() {
3041 return CommandResponse::Integer(0);
3042 }
3043
3044 if use_bit {
3045 let total_bits = (data.len() * 8) as i64;
3046 let s = normalize_index(start.unwrap_or(0), total_bits);
3047 let e = normalize_index(end.unwrap_or(total_bits - 1), total_bits);
3048 if s > e {
3049 return CommandResponse::Integer(0);
3050 }
3051 let mut count = 0i64;
3052 for bit_pos in s..=e {
3053 let byte_idx = bit_pos / 8;
3054 let bit_idx = 7 - (bit_pos % 8);
3055 if byte_idx < data.len() && (data[byte_idx] >> bit_idx) & 1 == 1 {
3056 count += 1;
3057 }
3058 }
3059 CommandResponse::Integer(count)
3060 } else {
3061 let len = data.len() as i64;
3062 let s = normalize_index(start.unwrap_or(0), len);
3063 let e = normalize_index(end.unwrap_or(len - 1), len);
3064 if s > e || s >= data.len() {
3065 return CommandResponse::Integer(0);
3066 }
3067 let e = e.min(data.len() - 1);
3068 let count: u32 = data[s..=e].iter().map(|b| b.count_ones()).sum();
3069 CommandResponse::Integer(count as i64)
3070 }
3071 }
3072
3073 fn cmd_bitop(
3074 &mut self,
3075 operation: BitOperation,
3076 destkey: &[u8],
3077 keys: &[Vec<u8>],
3078 ) -> CommandResponse {
3079 let mut buffers: Vec<Vec<u8>> = Vec::with_capacity(keys.len());
3080 let mut max_len = 0usize;
3081
3082 for key in keys {
3083 self.lazy_expire(key);
3084 let data = self.get_string_bytes(key).unwrap_or_default();
3085 max_len = max_len.max(data.len());
3086 buffers.push(data);
3087 }
3088
3089 if buffers.is_empty() {
3090 self.set_string_value(destkey, Vec::new());
3091 return CommandResponse::Integer(0);
3092 }
3093
3094 let mut result = vec![0u8; max_len];
3095
3096 match operation {
3097 BitOperation::And => {
3098 result = vec![0xFF; max_len];
3099 for buf in &buffers {
3100 for (i, byte) in result.iter_mut().enumerate() {
3101 let src = if i < buf.len() { buf[i] } else { 0 };
3102 *byte &= src;
3103 }
3104 }
3105 }
3106 BitOperation::Or => {
3107 for buf in &buffers {
3108 for (i, &src) in buf.iter().enumerate() {
3109 result[i] |= src;
3110 }
3111 }
3112 }
3113 BitOperation::Xor => {
3114 for buf in &buffers {
3115 for (i, &src) in buf.iter().enumerate() {
3116 result[i] ^= src;
3117 }
3118 }
3119 }
3120 BitOperation::Not => {
3121 let buf = &buffers[0];
3122 result = vec![0u8; buf.len()];
3123 for (i, &src) in buf.iter().enumerate() {
3124 result[i] = !src;
3125 }
3126 }
3127 }
3128
3129 let len = result.len() as i64;
3130 self.set_string_value(destkey, result);
3131 CommandResponse::Integer(len)
3132 }
3133
3134 fn cmd_bitpos(
3135 &mut self,
3136 key: &[u8],
3137 bit: u8,
3138 start: Option<i64>,
3139 end: Option<i64>,
3140 use_bit: bool,
3141 ) -> CommandResponse {
3142 self.lazy_expire(key);
3143 let data = match self.get_string_bytes(key) {
3144 Some(d) if !d.is_empty() => d,
3145 _ => {
3146 return if bit == 0 {
3147 CommandResponse::Integer(0)
3148 } else {
3149 CommandResponse::Integer(-1)
3150 };
3151 }
3152 };
3153
3154 let has_explicit_end = end.is_some();
3155
3156 if use_bit {
3157 let total_bits = (data.len() * 8) as i64;
3158 let s = normalize_index(start.unwrap_or(0), total_bits);
3159 let e = normalize_index(end.unwrap_or(total_bits - 1), total_bits);
3160 if s > e {
3161 return CommandResponse::Integer(-1);
3162 }
3163 for bit_pos in s..=e.min(data.len() * 8 - 1) {
3164 let byte_idx = bit_pos / 8;
3165 let bit_idx = 7 - (bit_pos % 8);
3166 let current = (data[byte_idx] >> bit_idx) & 1;
3167 if current == bit {
3168 return CommandResponse::Integer(bit_pos as i64);
3169 }
3170 }
3171 CommandResponse::Integer(-1)
3172 } else {
3173 let len = data.len() as i64;
3174 let s = normalize_index(start.unwrap_or(0), len);
3175 let e = normalize_index(end.unwrap_or(len - 1), len);
3176 if s > e || s >= data.len() {
3177 return CommandResponse::Integer(-1);
3178 }
3179 let e = e.min(data.len() - 1);
3180 for (byte_idx, &byte_val) in data.iter().enumerate().take(e + 1).skip(s) {
3181 for bit_idx in (0..8).rev() {
3182 let current = (byte_val >> bit_idx) & 1;
3183 if current == bit {
3184 return CommandResponse::Integer((byte_idx * 8 + (7 - bit_idx)) as i64);
3185 }
3186 }
3187 }
3188 if bit == 0 && !has_explicit_end {
3189 CommandResponse::Integer((data.len() * 8) as i64)
3190 } else {
3191 CommandResponse::Integer(-1)
3192 }
3193 }
3194 }
3195
3196 fn cmd_bitfield(&mut self, key: &[u8], operations: &[BitFieldOperation]) -> CommandResponse {
3197 self.lazy_expire(key);
3198 let compact = CompactKey::new(key);
3199
3200 let mut data = match self.entries.get(&compact) {
3201 Some(entry) => match &entry.value {
3202 Value::InlineStr { .. } | Value::HeapStr(_) | Value::Int(_) => {
3203 entry.value.as_bytes().unwrap_or_default()
3204 }
3205 _ => {
3206 return CommandResponse::Error(
3207 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3208 )
3209 }
3210 },
3211 None => Vec::new(),
3212 };
3213
3214 let mut overflow = BitFieldOverflow::Wrap;
3215 let mut responses = Vec::new();
3216 let mut dirty = false;
3217
3218 for op in operations {
3219 match *op {
3220 BitFieldOperation::Overflow(mode) => overflow = mode,
3221 BitFieldOperation::Get { encoding, offset } => {
3222 let Some(bit_offset) = Self::resolve_bitfield_offset(encoding, offset) else {
3223 return CommandResponse::Error(
3224 "ERR bit offset is not an integer or out of range".into(),
3225 );
3226 };
3227 let value = Self::bitfield_read(&data, bit_offset, encoding);
3228 responses.push(CommandResponse::Integer(value));
3229 }
3230 BitFieldOperation::Set {
3231 encoding,
3232 offset,
3233 value,
3234 } => {
3235 let Some(bit_offset) = Self::resolve_bitfield_offset(encoding, offset) else {
3236 return CommandResponse::Error(
3237 "ERR bit offset is not an integer or out of range".into(),
3238 );
3239 };
3240 let old_value = Self::bitfield_read(&data, bit_offset, encoding);
3241 let Some(applied) =
3242 Self::apply_bitfield_overflow(value as i128, encoding, overflow)
3243 else {
3244 responses.push(CommandResponse::Nil);
3245 continue;
3246 };
3247 Self::bitfield_write(&mut data, bit_offset, encoding, applied);
3248 responses.push(CommandResponse::Integer(old_value));
3249 dirty = true;
3250 }
3251 BitFieldOperation::IncrBy {
3252 encoding,
3253 offset,
3254 increment,
3255 } => {
3256 let Some(bit_offset) = Self::resolve_bitfield_offset(encoding, offset) else {
3257 return CommandResponse::Error(
3258 "ERR bit offset is not an integer or out of range".into(),
3259 );
3260 };
3261 let current = Self::bitfield_read(&data, bit_offset, encoding) as i128;
3262 let target = current + increment as i128;
3263 let Some(applied) = Self::apply_bitfield_overflow(target, encoding, overflow)
3264 else {
3265 responses.push(CommandResponse::Nil);
3266 continue;
3267 };
3268 Self::bitfield_write(&mut data, bit_offset, encoding, applied);
3269 responses.push(CommandResponse::Integer(applied as i64));
3270 dirty = true;
3271 }
3272 }
3273 }
3274
3275 if dirty {
3276 self.set_string_value(key, data);
3277 }
3278
3279 CommandResponse::Array(responses)
3280 }
3281
3282 fn resolve_bitfield_offset(encoding: BitFieldEncoding, offset: BitFieldOffset) -> Option<u64> {
3283 let base = match offset {
3284 BitFieldOffset::Absolute(v) => v,
3285 BitFieldOffset::Multiplied(v) => v.checked_mul(encoding.bits as i64)?,
3286 };
3287 if base < 0 {
3288 return None;
3289 }
3290 let start = base as u64;
3291 start.checked_add(encoding.bits as u64)?;
3292 Some(start)
3293 }
3294
3295 fn bitfield_read(data: &[u8], bit_offset: u64, encoding: BitFieldEncoding) -> i64 {
3296 let width = encoding.bits as u32;
3297 let mut raw: u128 = 0;
3298 for i in 0..width {
3299 let pos = bit_offset + i as u64;
3300 let byte_idx = (pos / 8) as usize;
3301 let bit_idx = 7 - (pos % 8) as usize;
3302 let bit = if byte_idx < data.len() {
3303 (data[byte_idx] >> bit_idx) & 1
3304 } else {
3305 0
3306 };
3307 raw = (raw << 1) | bit as u128;
3308 }
3309
3310 if encoding.signed {
3311 let shift = 128 - width;
3312 (((raw << shift) as i128) >> shift) as i64
3313 } else {
3314 raw as i64
3315 }
3316 }
3317
3318 fn bitfield_write(
3319 data: &mut Vec<u8>,
3320 bit_offset: u64,
3321 encoding: BitFieldEncoding,
3322 value: i128,
3323 ) {
3324 let width = encoding.bits as u32;
3325 let encoded = Self::encode_bitfield_value(value, encoding);
3326 let end_bit = bit_offset + width as u64;
3327 let needed_bytes = end_bit.div_ceil(8) as usize;
3328 if needed_bytes > data.len() {
3329 data.resize(needed_bytes, 0);
3330 }
3331
3332 for i in 0..width {
3333 let src_shift = width - 1 - i;
3334 let bit = ((encoded >> src_shift) & 1) as u8;
3335 let pos = bit_offset + i as u64;
3336 let byte_idx = (pos / 8) as usize;
3337 let bit_idx = 7 - (pos % 8) as usize;
3338 if bit == 1 {
3339 data[byte_idx] |= 1 << bit_idx;
3340 } else {
3341 data[byte_idx] &= !(1 << bit_idx);
3342 }
3343 }
3344 }
3345
3346 fn bitfield_bounds(encoding: BitFieldEncoding) -> (i128, i128) {
3347 if encoding.signed {
3348 let min = -(1i128 << (encoding.bits as u32 - 1));
3349 let max = (1i128 << (encoding.bits as u32 - 1)) - 1;
3350 (min, max)
3351 } else {
3352 (0, (1i128 << encoding.bits as u32) - 1)
3353 }
3354 }
3355
3356 fn apply_bitfield_overflow(
3357 value: i128,
3358 encoding: BitFieldEncoding,
3359 mode: BitFieldOverflow,
3360 ) -> Option<i128> {
3361 let (min, max) = Self::bitfield_bounds(encoding);
3362 if (min..=max).contains(&value) {
3363 return Some(value);
3364 }
3365
3366 match mode {
3367 BitFieldOverflow::Fail => None,
3368 BitFieldOverflow::Sat => Some(value.clamp(min, max)),
3369 BitFieldOverflow::Wrap => {
3370 let modulo = 1i128 << encoding.bits as u32;
3371 let wrapped = ((value % modulo) + modulo) % modulo;
3372 if encoding.signed {
3373 let sign_boundary = 1i128 << (encoding.bits as u32 - 1);
3374 if wrapped >= sign_boundary {
3375 Some(wrapped - modulo)
3376 } else {
3377 Some(wrapped)
3378 }
3379 } else {
3380 Some(wrapped)
3381 }
3382 }
3383 }
3384 }
3385
3386 fn encode_bitfield_value(value: i128, encoding: BitFieldEncoding) -> u128 {
3387 if encoding.signed && value < 0 {
3388 (value + (1i128 << encoding.bits as u32)) as u128
3389 } else {
3390 value as u128
3391 }
3392 }
3393
3394 fn cmd_geoadd(
3397 &mut self,
3398 key: &[u8],
3399 nx: bool,
3400 xx: bool,
3401 ch: bool,
3402 members: &[(f64, f64, Vec<u8>)],
3403 ) -> CommandResponse {
3404 self.lazy_expire(key);
3405 let compact = CompactKey::new(key);
3406 let is_new = !self.entries.contains_key(&compact);
3407 let zset = self.get_or_create_sorted_set(&compact);
3408 match zset {
3409 Ok(map) => {
3410 let mut memory_delta: usize = 0;
3411 if is_new {
3412 memory_delta += key.len()
3413 + std::mem::size_of::<CompactKey>()
3414 + std::mem::size_of::<KeyEntry>();
3415 }
3416 let mut count = 0i64;
3417 let member_overhead = std::mem::size_of::<f64>();
3418 for (lon, lat, member) in members {
3419 let score = geo_encode(*lon, *lat);
3420 let existing = map.get(member).copied();
3421 match existing {
3422 Some(old_score) => {
3423 if nx {
3424 continue;
3425 }
3426 if (old_score - score).abs() > f64::EPSILON {
3427 map.insert(member.clone(), score);
3428 if ch {
3429 count += 1;
3430 }
3431 }
3432 }
3433 None => {
3434 if xx {
3435 continue;
3436 }
3437 map.insert(member.clone(), score);
3438 memory_delta += member.len() + member_overhead;
3439 count += 1;
3440 }
3441 }
3442 }
3443 self.memory_used += memory_delta;
3444 CommandResponse::Integer(count)
3445 }
3446 Err(e) => e,
3447 }
3448 }
3449
3450 fn cmd_geodist(
3451 &mut self,
3452 key: &[u8],
3453 member1: &[u8],
3454 member2: &[u8],
3455 unit: GeoUnit,
3456 ) -> CommandResponse {
3457 self.lazy_expire(key);
3458 let compact = CompactKey::new(key);
3459 match self.entries.get(&compact) {
3460 Some(entry) => match &entry.value {
3461 Value::SortedSet(map) => {
3462 let s1 = match map.get(member1) {
3463 Some(s) => *s,
3464 None => return CommandResponse::Nil,
3465 };
3466 let s2 = match map.get(member2) {
3467 Some(s) => *s,
3468 None => return CommandResponse::Nil,
3469 };
3470 let (lon1, lat1) = geo_decode(s1);
3471 let (lon2, lat2) = geo_decode(s2);
3472 let dist = haversine_distance(lat1, lon1, lat2, lon2);
3473 let converted = geo_convert_distance(dist, unit);
3474 CommandResponse::BulkString(format!("{:.4}", converted).into_bytes())
3475 }
3476 _ => CommandResponse::Error(
3477 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3478 ),
3479 },
3480 None => CommandResponse::Nil,
3481 }
3482 }
3483
3484 fn cmd_geohash(&mut self, key: &[u8], members: &[Vec<u8>]) -> CommandResponse {
3485 self.lazy_expire(key);
3486 let compact = CompactKey::new(key);
3487 match self.entries.get(&compact) {
3488 Some(entry) => match &entry.value {
3489 Value::SortedSet(map) => {
3490 let results: Vec<CommandResponse> = members
3491 .iter()
3492 .map(|member| match map.get(member.as_slice()) {
3493 Some(&score) => {
3494 let (lon, lat) = geo_decode(score);
3495 let hash = geo_encode_base32(lon, lat);
3496 CommandResponse::BulkString(hash.into_bytes())
3497 }
3498 None => CommandResponse::Nil,
3499 })
3500 .collect();
3501 CommandResponse::Array(results)
3502 }
3503 _ => CommandResponse::Error(
3504 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3505 ),
3506 },
3507 None => {
3508 let results = vec![CommandResponse::Nil; members.len()];
3509 CommandResponse::Array(results)
3510 }
3511 }
3512 }
3513
3514 fn cmd_geopos(&mut self, key: &[u8], members: &[Vec<u8>]) -> CommandResponse {
3515 self.lazy_expire(key);
3516 let compact = CompactKey::new(key);
3517 match self.entries.get(&compact) {
3518 Some(entry) => match &entry.value {
3519 Value::SortedSet(map) => {
3520 let results: Vec<CommandResponse> = members
3521 .iter()
3522 .map(|member| match map.get(member.as_slice()) {
3523 Some(&score) => {
3524 let (lon, lat) = geo_decode(score);
3525 CommandResponse::Array(vec![
3526 CommandResponse::BulkString(format!("{:.6}", lon).into_bytes()),
3527 CommandResponse::BulkString(format!("{:.6}", lat).into_bytes()),
3528 ])
3529 }
3530 None => CommandResponse::Nil,
3531 })
3532 .collect();
3533 CommandResponse::Array(results)
3534 }
3535 _ => CommandResponse::Error(
3536 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3537 ),
3538 },
3539 None => {
3540 let results = vec![CommandResponse::Nil; members.len()];
3541 CommandResponse::Array(results)
3542 }
3543 }
3544 }
3545
3546 #[allow(clippy::too_many_arguments)]
3547 fn cmd_geosearch(
3548 &mut self,
3549 key: &[u8],
3550 from_member: Option<&[u8]>,
3551 from_lonlat: Option<(f64, f64)>,
3552 radius: f64,
3553 unit: GeoUnit,
3554 asc: Option<bool>,
3555 count: Option<usize>,
3556 withcoord: bool,
3557 withdist: bool,
3558 withhash: bool,
3559 ) -> CommandResponse {
3560 self.lazy_expire(key);
3561 let compact = CompactKey::new(key);
3562 let map = match self.entries.get(&compact) {
3563 Some(entry) => match &entry.value {
3564 Value::SortedSet(map) => map,
3565 _ => {
3566 return CommandResponse::Error(
3567 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3568 )
3569 }
3570 },
3571 None => return CommandResponse::Array(vec![]),
3572 };
3573
3574 let (center_lon, center_lat) = if let Some(member) = from_member {
3575 match map.get(member) {
3576 Some(&score) => geo_decode(score),
3577 None => return CommandResponse::Array(vec![]),
3578 }
3579 } else if let Some((lon, lat)) = from_lonlat {
3580 (lon, lat)
3581 } else {
3582 return CommandResponse::Error("ERR FROMMEMBER or FROMLONLAT required".into());
3583 };
3584
3585 let radius_meters = geo_to_meters(radius, unit);
3586
3587 let mut results: Vec<(Vec<u8>, f64, f64, f64, f64)> = Vec::new();
3588 for (member, &score) in map.iter() {
3589 let (lon, lat) = geo_decode(score);
3590 let dist = haversine_distance(center_lat, center_lon, lat, lon);
3591 if dist <= radius_meters {
3592 results.push((member.clone(), dist, lon, lat, score));
3593 }
3594 }
3595
3596 if let Some(true) = asc {
3597 results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
3598 } else if let Some(false) = asc {
3599 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
3600 }
3601
3602 if let Some(c) = count {
3603 results.truncate(c);
3604 }
3605
3606 let items: Vec<CommandResponse> = results
3607 .into_iter()
3608 .map(|(member, dist, lon, lat, score)| {
3609 if !withcoord && !withdist && !withhash {
3610 return CommandResponse::BulkString(member);
3611 }
3612 let mut arr = vec![CommandResponse::BulkString(member)];
3613 if withdist {
3614 let converted = geo_convert_distance(dist, unit);
3615 arr.push(CommandResponse::BulkString(
3616 format!("{:.4}", converted).into_bytes(),
3617 ));
3618 }
3619 if withhash {
3620 arr.push(CommandResponse::Integer(score.to_bits() as i64));
3621 }
3622 if withcoord {
3623 arr.push(CommandResponse::Array(vec![
3624 CommandResponse::BulkString(format!("{:.6}", lon).into_bytes()),
3625 CommandResponse::BulkString(format!("{:.6}", lat).into_bytes()),
3626 ]));
3627 }
3628 CommandResponse::Array(arr)
3629 })
3630 .collect();
3631
3632 CommandResponse::Array(items)
3633 }
3634
3635 #[cfg(feature = "vector")]
3638 fn cmd_vec_set(&mut self, key: &[u8], dimensions: usize, vector: &[f32]) -> CommandResponse {
3639 let compact = CompactKey::new(key);
3640 let index = self
3641 .vector_indexes
3642 .entry(compact)
3643 .or_insert_with(|| HnswIndex::new(dimensions, DistanceMetric::L2, 16, 200));
3644
3645 if index.dim() != dimensions {
3646 return CommandResponse::Error(format!(
3647 "ERR dimension mismatch: index has {}, got {}",
3648 index.dim(),
3649 dimensions
3650 ));
3651 }
3652
3653 let id = {
3654 let mut hasher = std::collections::hash_map::DefaultHasher::new();
3655 for &v in vector {
3656 std::hash::Hasher::write(&mut hasher, &v.to_le_bytes());
3657 }
3658 std::hash::Hasher::finish(&hasher)
3659 };
3660
3661 index.insert(id, vector);
3662 CommandResponse::Integer(id as i64)
3663 }
3664
3665 #[cfg(feature = "vector")]
3666 fn cmd_vec_query(&self, key: &[u8], k: usize, vector: &[f32]) -> CommandResponse {
3667 let compact = CompactKey::new(key);
3668 let index = match self.vector_indexes.get(&compact) {
3669 Some(idx) => idx,
3670 None => return CommandResponse::Array(vec![]),
3671 };
3672
3673 let results = index.search(vector, k, k.max(50));
3674 let items: Vec<CommandResponse> = results
3675 .into_iter()
3676 .map(|r| {
3677 CommandResponse::Array(vec![
3678 CommandResponse::Integer(r.id as i64),
3679 CommandResponse::BulkString(format!("{}", r.distance).into_bytes()),
3680 ])
3681 })
3682 .collect();
3683 CommandResponse::Array(items)
3684 }
3685
3686 #[cfg(feature = "vector")]
3687 fn cmd_vec_del(&mut self, key: &[u8]) -> CommandResponse {
3688 let compact = CompactKey::new(key);
3689 if self.vector_indexes.remove(&compact).is_some() {
3690 CommandResponse::Integer(1)
3691 } else {
3692 CommandResponse::Integer(0)
3693 }
3694 }
3695
3696 fn build_string_entry(
3699 key: &CompactKey,
3700 value: &[u8],
3701 ttl: Option<Duration>,
3702 ) -> (KeyEntry, usize) {
3703 let new_value = Value::from_raw_bytes(value);
3704 let entry_size = Self::estimate_key_entry_size(key.as_bytes(), &new_value);
3705 let mut entry = KeyEntry::new(key.clone(), new_value);
3706 if let Some(dur) = ttl {
3707 entry.set_ttl(dur);
3708 }
3709 (entry, entry_size)
3710 }
3711
3712 fn lazy_expire(&mut self, key: &[u8]) {
3713 if self.is_expired(key) {
3714 let compact = CompactKey::new(key);
3715 let _ = self.remove_compact_entry(&compact);
3716 }
3717 }
3718
3719 fn is_expired(&self, key: &[u8]) -> bool {
3720 self.entries
3721 .get(key)
3722 .is_some_and(|entry| entry.is_expired())
3723 }
3724
3725 fn touch_key(&mut self, key: &CompactKey) {
3727 if let Some(entry) = self.entries.get_mut(key) {
3728 entry.touch_lfu();
3729 }
3730 }
3731
3732 fn maybe_evict(&mut self) {
3734 if self.max_memory == 0 || self.memory_used < self.max_memory {
3735 return;
3736 }
3737
3738 let sample_size = 5usize;
3739 let entry_count = self.entries.len();
3740 if entry_count == 0 {
3741 return;
3742 }
3743
3744 self.eviction_counter = self.eviction_counter.wrapping_add(1);
3745 let start = (self.eviction_counter as usize).wrapping_mul(self.shard_id as usize + 1)
3746 % entry_count.max(1);
3747
3748 let mut lowest_counter = u8::MAX;
3749 let mut lowest_key: Option<CompactKey> = None;
3750 let mut sampled = 0usize;
3751
3752 for (i, (key, entry)) in self.entries.iter().enumerate() {
3753 if i < start {
3754 continue;
3755 }
3756 if sampled >= sample_size {
3757 break;
3758 }
3759 sampled += 1;
3760 if entry.lfu_counter < lowest_counter {
3761 lowest_counter = entry.lfu_counter;
3762 lowest_key = Some(key.clone());
3763 }
3764 }
3765
3766 if lowest_key.is_none() {
3767 for (key, entry) in self.entries.iter().take(sample_size * 3) {
3768 if entry.lfu_counter < lowest_counter {
3769 lowest_counter = entry.lfu_counter;
3770 lowest_key = Some(key.clone());
3771 }
3772 }
3773 }
3774
3775 if let Some(key) = lowest_key {
3776 let _ = self.remove_compact_entry(&key);
3777 }
3778 }
3779
3780 fn remove_compact_entry(&mut self, key: &CompactKey) -> Option<KeyEntry> {
3781 let removed = self.entries.remove(key)?;
3782 let removed_size = Self::estimate_key_entry_size(key.as_bytes(), &removed.value);
3783 self.memory_used = self.memory_used.saturating_sub(removed_size);
3784 Some(removed)
3785 }
3786
3787 fn estimate_key_entry_size(key: &[u8], value: &Value) -> usize {
3788 key.len()
3789 + std::mem::size_of::<CompactKey>()
3790 + std::mem::size_of::<KeyEntry>()
3791 + value.estimated_size()
3792 }
3793
3794 fn get_or_create_list(
3795 &mut self,
3796 key: &CompactKey,
3797 ) -> Result<&mut VecDeque<Value>, CommandResponse> {
3798 if !self.entries.contains_key(key) {
3799 let entry = KeyEntry::new(key.clone(), Value::List(VecDeque::new()));
3800 self.entries.insert(key.clone(), entry);
3801 }
3802 let entry = self.entries.get_mut(key).ok_or_else(|| {
3803 CommandResponse::Error("ERR internal: key not found after insert".into())
3804 })?;
3805 match &mut entry.value {
3806 Value::List(deque) => Ok(deque),
3807 _ => Err(CommandResponse::Error(
3808 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3809 )),
3810 }
3811 }
3812
3813 fn get_or_create_hash(
3814 &mut self,
3815 key: &CompactKey,
3816 ) -> Result<&mut HashMap<CompactKey, Value>, CommandResponse> {
3817 if !self.entries.contains_key(key) {
3818 let entry = KeyEntry::new(key.clone(), Value::Hash(HashMap::new()));
3819 self.entries.insert(key.clone(), entry);
3820 }
3821 let entry = self.entries.get_mut(key).ok_or_else(|| {
3822 CommandResponse::Error("ERR internal: key not found after insert".into())
3823 })?;
3824 match &mut entry.value {
3825 Value::Hash(map) => Ok(map),
3826 _ => Err(CommandResponse::Error(
3827 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3828 )),
3829 }
3830 }
3831
3832 fn get_or_create_sorted_set(
3833 &mut self,
3834 key: &CompactKey,
3835 ) -> Result<&mut BTreeMap<Vec<u8>, f64>, CommandResponse> {
3836 if !self.entries.contains_key(key) {
3837 let entry = KeyEntry::new(key.clone(), Value::SortedSet(BTreeMap::new()));
3838 self.entries.insert(key.clone(), entry);
3839 }
3840 let entry = self.entries.get_mut(key).ok_or_else(|| {
3841 CommandResponse::Error("ERR internal: key not found after insert".into())
3842 })?;
3843 match &mut entry.value {
3844 Value::SortedSet(map) => Ok(map),
3845 _ => Err(CommandResponse::Error(
3846 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3847 )),
3848 }
3849 }
3850
3851 fn get_or_create_set(
3852 &mut self,
3853 key: &CompactKey,
3854 ) -> Result<&mut HashSet<Value>, CommandResponse> {
3855 if !self.entries.contains_key(key) {
3856 let entry = KeyEntry::new(key.clone(), Value::Set(HashSet::new()));
3857 self.entries.insert(key.clone(), entry);
3858 }
3859 let entry = self.entries.get_mut(key).ok_or_else(|| {
3860 CommandResponse::Error("ERR internal: key not found after insert".into())
3861 })?;
3862 match &mut entry.value {
3863 Value::Set(set) => Ok(set),
3864 _ => Err(CommandResponse::Error(
3865 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3866 )),
3867 }
3868 }
3869
3870 fn get_or_create_stream(
3871 &mut self,
3872 key: &CompactKey,
3873 ) -> Result<&mut StreamLog, CommandResponse> {
3874 if !self.entries.contains_key(key) {
3875 let log = StreamLog {
3876 entries: VecDeque::new(),
3877 last_id: StreamId { ms: 0, seq: 0 },
3878 };
3879 let entry = KeyEntry::new(key.clone(), Value::Stream(Box::new(log)));
3880 self.entries.insert(key.clone(), entry);
3881 }
3882 let entry = self.entries.get_mut(key).ok_or_else(|| {
3883 CommandResponse::Error("ERR internal: key not found after insert".into())
3884 })?;
3885 match &mut entry.value {
3886 Value::Stream(log) => Ok(log),
3887 _ => Err(CommandResponse::Error(
3888 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3889 )),
3890 }
3891 }
3892
3893 fn cmd_xadd(
3894 &mut self,
3895 key: &[u8],
3896 id_arg: &[u8],
3897 fields: &[(Vec<u8>, Vec<u8>)],
3898 maxlen: Option<usize>,
3899 ) -> CommandResponse {
3900 self.lazy_expire(key);
3901 let compact = CompactKey::new(key);
3902 let is_new = !self.entries.contains_key(&compact);
3903
3904 let log = match self.get_or_create_stream(&compact) {
3905 Ok(log) => log,
3906 Err(e) => return e,
3907 };
3908
3909 let new_id = if id_arg == b"*" {
3910 let ms = std::time::SystemTime::now()
3911 .duration_since(std::time::UNIX_EPOCH)
3912 .map(|d| d.as_millis() as u64)
3913 .unwrap_or(0);
3914 if ms > log.last_id.ms {
3915 StreamId { ms, seq: 0 }
3916 } else {
3917 StreamId {
3918 ms: log.last_id.ms,
3919 seq: log.last_id.seq + 1,
3920 }
3921 }
3922 } else {
3923 match StreamId::parse(id_arg) {
3924 Some(parsed) => {
3925 if parsed <= log.last_id {
3926 return CommandResponse::Error(
3927 "ERR The ID specified in XADD is equal or smaller than the target stream top item".into(),
3928 );
3929 }
3930 parsed
3931 }
3932 None => {
3933 return CommandResponse::Error(
3934 "ERR Invalid stream ID specified as stream command argument".into(),
3935 );
3936 }
3937 }
3938 };
3939
3940 let id_str = format!("{}", new_id).into_bytes();
3941 log.last_id = new_id.clone();
3942
3943 let mut memory_delta: usize = 0;
3944 if is_new {
3945 memory_delta +=
3946 key.len() + std::mem::size_of::<CompactKey>() + std::mem::size_of::<KeyEntry>();
3947 }
3948 memory_delta += std::mem::size_of::<StreamEntry>();
3949 for (k, v) in fields {
3950 memory_delta += k.len() + v.len();
3951 }
3952
3953 log.entries.push_back(StreamEntry {
3954 id: new_id,
3955 fields: fields.to_vec(),
3956 });
3957
3958 if let Some(max) = maxlen {
3959 while log.entries.len() > max {
3960 log.entries.pop_front();
3961 }
3962 }
3963
3964 self.memory_used += memory_delta;
3965 CommandResponse::BulkString(id_str)
3966 }
3967
3968 fn cmd_xlen(&mut self, key: &[u8]) -> CommandResponse {
3969 self.lazy_expire(key);
3970 let compact = CompactKey::new(key);
3971 match self.entries.get(&compact) {
3972 Some(entry) => match &entry.value {
3973 Value::Stream(log) => CommandResponse::Integer(log.entries.len() as i64),
3974 _ => CommandResponse::Error(
3975 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3976 ),
3977 },
3978 None => CommandResponse::Integer(0),
3979 }
3980 }
3981
3982 fn parse_range_id(id_bytes: &[u8], is_start: bool) -> StreamId {
3983 if id_bytes == b"-" {
3984 StreamId::min_id()
3985 } else if id_bytes == b"+" {
3986 StreamId::max_id()
3987 } else {
3988 StreamId::parse(id_bytes).unwrap_or(if is_start {
3989 StreamId::min_id()
3990 } else {
3991 StreamId::max_id()
3992 })
3993 }
3994 }
3995
3996 fn format_stream_entry(entry: &StreamEntry) -> CommandResponse {
3997 let mut fields_resp: Vec<CommandResponse> = Vec::with_capacity(entry.fields.len() * 2);
3998 for (k, v) in &entry.fields {
3999 fields_resp.push(CommandResponse::BulkString(k.clone()));
4000 fields_resp.push(CommandResponse::BulkString(v.clone()));
4001 }
4002 CommandResponse::Array(vec![
4003 CommandResponse::BulkString(format!("{}", entry.id).into_bytes()),
4004 CommandResponse::Array(fields_resp),
4005 ])
4006 }
4007
4008 fn cmd_xrange(
4009 &mut self,
4010 key: &[u8],
4011 start: &[u8],
4012 end: &[u8],
4013 count: Option<usize>,
4014 ) -> CommandResponse {
4015 self.lazy_expire(key);
4016 let compact = CompactKey::new(key);
4017 match self.entries.get(&compact) {
4018 Some(entry) => match &entry.value {
4019 Value::Stream(log) => {
4020 let start_id = Self::parse_range_id(start, true);
4021 let end_id = Self::parse_range_id(end, false);
4022 let limit = count.unwrap_or(usize::MAX);
4023
4024 let results: Vec<CommandResponse> = log
4025 .entries
4026 .iter()
4027 .filter(|e| e.id >= start_id && e.id <= end_id)
4028 .take(limit)
4029 .map(Self::format_stream_entry)
4030 .collect();
4031
4032 CommandResponse::Array(results)
4033 }
4034 _ => CommandResponse::Error(
4035 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
4036 ),
4037 },
4038 None => CommandResponse::Array(vec![]),
4039 }
4040 }
4041
4042 fn cmd_xrevrange(
4043 &mut self,
4044 key: &[u8],
4045 start: &[u8],
4046 end: &[u8],
4047 count: Option<usize>,
4048 ) -> CommandResponse {
4049 self.lazy_expire(key);
4050 let compact = CompactKey::new(key);
4051 match self.entries.get(&compact) {
4052 Some(entry) => match &entry.value {
4053 Value::Stream(log) => {
4054 let start_id = Self::parse_range_id(start, false);
4055 let end_id = Self::parse_range_id(end, true);
4056 let limit = count.unwrap_or(usize::MAX);
4057
4058 let results: Vec<CommandResponse> = log
4059 .entries
4060 .iter()
4061 .rev()
4062 .filter(|e| e.id >= end_id && e.id <= start_id)
4063 .take(limit)
4064 .map(Self::format_stream_entry)
4065 .collect();
4066
4067 CommandResponse::Array(results)
4068 }
4069 _ => CommandResponse::Error(
4070 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
4071 ),
4072 },
4073 None => CommandResponse::Array(vec![]),
4074 }
4075 }
4076
4077 fn cmd_xread(
4078 &mut self,
4079 keys: &[Vec<u8>],
4080 ids: &[Vec<u8>],
4081 count: Option<usize>,
4082 ) -> CommandResponse {
4083 if keys.len() != ids.len() {
4084 return CommandResponse::Error(
4085 "ERR Unbalanced XREAD list of streams: for each stream key an ID must be specified"
4086 .into(),
4087 );
4088 }
4089
4090 let limit = count.unwrap_or(usize::MAX);
4091 let mut results: Vec<CommandResponse> = Vec::new();
4092
4093 for (key, id_bytes) in keys.iter().zip(ids.iter()) {
4094 self.lazy_expire(key);
4095 let compact = CompactKey::new(key);
4096 if let Some(entry) = self.entries.get(&compact) {
4097 if let Value::Stream(log) = &entry.value {
4098 let after_id = if id_bytes == b"$" {
4099 log.last_id.clone()
4100 } else {
4101 match StreamId::parse(id_bytes) {
4102 Some(id) => id,
4103 None => continue,
4104 }
4105 };
4106
4107 let entries: Vec<CommandResponse> = log
4108 .entries
4109 .iter()
4110 .filter(|e| e.id > after_id)
4111 .take(limit)
4112 .map(Self::format_stream_entry)
4113 .collect();
4114
4115 if !entries.is_empty() {
4116 results.push(CommandResponse::Array(vec![
4117 CommandResponse::BulkString(key.clone()),
4118 CommandResponse::Array(entries),
4119 ]));
4120 }
4121 }
4122 }
4123 }
4124
4125 if results.is_empty() {
4126 CommandResponse::Nil
4127 } else {
4128 CommandResponse::Array(results)
4129 }
4130 }
4131
4132 fn cmd_xtrim(&mut self, key: &[u8], maxlen: usize) -> CommandResponse {
4133 self.lazy_expire(key);
4134 let compact = CompactKey::new(key);
4135 match self.entries.get_mut(&compact) {
4136 Some(entry) => match &mut entry.value {
4137 Value::Stream(log) => {
4138 let mut trimmed = 0i64;
4139 while log.entries.len() > maxlen {
4140 log.entries.pop_front();
4141 trimmed += 1;
4142 }
4143 CommandResponse::Integer(trimmed)
4144 }
4145 _ => CommandResponse::Error(
4146 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
4147 ),
4148 },
4149 None => CommandResponse::Integer(0),
4150 }
4151 }
4152
4153 fn cmd_xdel(&mut self, key: &[u8], ids: &[Vec<u8>]) -> CommandResponse {
4154 self.lazy_expire(key);
4155 let compact = CompactKey::new(key);
4156 match self.entries.get_mut(&compact) {
4157 Some(entry) => match &mut entry.value {
4158 Value::Stream(log) => {
4159 let mut deleted = 0i64;
4160 for id_bytes in ids {
4161 if let Some(sid) = StreamId::parse(id_bytes) {
4162 let before = log.entries.len();
4163 log.entries.retain(|e| e.id != sid);
4164 if log.entries.len() < before {
4165 deleted += 1;
4166 }
4167 }
4168 }
4169 CommandResponse::Integer(deleted)
4170 }
4171 _ => CommandResponse::Error(
4172 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
4173 ),
4174 },
4175 None => CommandResponse::Integer(0),
4176 }
4177 }
4178
4179 fn cmd_xgroup_create(
4180 &mut self,
4181 key: &[u8],
4182 group: &str,
4183 id: &str,
4184 mkstream: bool,
4185 ) -> CommandResponse {
4186 self.lazy_expire(key);
4187 let compact = CompactKey::new(key);
4188
4189 if !self.entries.contains_key(&compact) {
4190 if !mkstream {
4191 return CommandResponse::Error(
4192 "ERR The XGROUP subcommand requires the key to exist".into(),
4193 );
4194 }
4195 let log = StreamLog {
4196 entries: VecDeque::new(),
4197 last_id: StreamId { ms: 0, seq: 0 },
4198 };
4199 let entry = KeyEntry::new(compact.clone(), Value::Stream(Box::new(log)));
4200 self.entries.insert(compact.clone(), entry);
4201 }
4202
4203 match self.entries.get(&compact) {
4204 Some(entry) => match &entry.value {
4205 Value::Stream(log) => {
4206 let last_delivered_id = if id == "$" {
4207 log.last_id.clone()
4208 } else if id == "0" || id == "0-0" {
4209 StreamId::min_id()
4210 } else {
4211 StreamId::parse(id.as_bytes()).unwrap_or(StreamId::min_id())
4212 };
4213 let groups = self.stream_groups.entry(compact).or_default();
4214 if groups.contains_key(group) {
4215 return CommandResponse::Error(
4216 "BUSYGROUP Consumer Group name already exists".into(),
4217 );
4218 }
4219 groups.insert(
4220 group.to_string(),
4221 StreamConsumerGroup {
4222 last_delivered_id,
4223 pel: HashMap::new(),
4224 consumers: HashMap::new(),
4225 },
4226 );
4227 CommandResponse::Ok
4228 }
4229 _ => CommandResponse::Error(
4230 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
4231 ),
4232 },
4233 None => CommandResponse::Error("ERR internal error".into()),
4234 }
4235 }
4236
4237 fn cmd_xgroup_destroy(&mut self, key: &[u8], group: &str) -> CommandResponse {
4238 let compact = CompactKey::new(key);
4239 if let Some(groups) = self.stream_groups.get_mut(&compact) {
4240 if groups.remove(group).is_some() {
4241 return CommandResponse::Integer(1);
4242 }
4243 }
4244 CommandResponse::Integer(0)
4245 }
4246
4247 fn cmd_xgroup_delconsumer(
4248 &mut self,
4249 key: &[u8],
4250 group: &str,
4251 consumer: &str,
4252 ) -> CommandResponse {
4253 let compact = CompactKey::new(key);
4254 if let Some(groups) = self.stream_groups.get_mut(&compact) {
4255 if let Some(grp) = groups.get_mut(group) {
4256 if let Some(state) = grp.consumers.remove(consumer) {
4257 let pending_count = state.pending.len() as i64;
4258 for sid in &state.pending {
4259 grp.pel.remove(sid);
4260 }
4261 return CommandResponse::Integer(pending_count);
4262 }
4263 }
4264 }
4265 CommandResponse::Integer(0)
4266 }
4267
    /// Reads stream entries on behalf of `consumer` in consumer group `group`.
    ///
    /// `keys` and `ids` are paired positionally and must have equal length. For each
    /// pair:
    ///
    /// * id `>` — returns up to `count` entries newer than the group's last delivered
    ///   id, records each of them as pending for `consumer` (delivery count `1`,
    ///   delivery time = now) and advances the group's last delivered id.
    /// * any other id — returns up to `count` of the entries already pending for
    ///   `consumer`, ordered by id. The id value itself is not parsed.
    ///
    /// Keys that are missing or not streams are skipped. The reply lists
    /// `[key, entries]` for every stream that produced at least one entry, or `Nil`
    /// when none did.
    fn cmd_xreadgroup(
        &mut self,
        group: &str,
        consumer: &str,
        count: Option<usize>,
        keys: &[Vec<u8>],
        ids: &[Vec<u8>],
    ) -> CommandResponse {
        if keys.len() != ids.len() {
            return CommandResponse::Error(
                "ERR Unbalanced XREADGROUP list of streams: for each stream key an ID must be specified"
                    .into(),
            );
        }

        let limit = count.unwrap_or(usize::MAX);
        let mut results: Vec<CommandResponse> = Vec::new();
        // One timestamp for the whole call; falls back to 0 if the clock is before the epoch.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

        for (key, id_bytes) in keys.iter().zip(ids.iter()) {
            self.lazy_expire(key);
            let compact = CompactKey::new(key);

            let is_new_msgs = id_bytes == b">";

            // Collect owned copies first so the borrows of `entries` / `stream_groups`
            // end before the group state is mutated below.
            let stream_entries: Vec<StreamEntry> = if let Some(entry) = self.entries.get(&compact) {
                if let Value::Stream(log) = &entry.value {
                    if is_new_msgs {
                        // An unknown group reads from the very beginning of the stream.
                        let last_delivered = self
                            .stream_groups
                            .get(&compact)
                            .and_then(|gs| gs.get(group))
                            .map(|g| g.last_delivered_id.clone())
                            .unwrap_or(StreamId::min_id());
                        log.entries
                            .iter()
                            .filter(|e| e.id > last_delivered)
                            .take(limit)
                            .cloned()
                            .collect()
                    } else {
                        let groups = self.stream_groups.get(&compact);
                        let grp = groups.and_then(|gs| gs.get(group));
                        if let Some(grp) = grp {
                            let consumer_state = grp.consumers.get(consumer);
                            // Pending ids of this consumer in ascending order; empty when
                            // the consumer is unknown.
                            let pending_ids: Vec<StreamId> = consumer_state
                                .map(|cs| {
                                    let mut ids: Vec<StreamId> =
                                        cs.pending.iter().cloned().collect();
                                    ids.sort();
                                    ids
                                })
                                .unwrap_or_default();
                            // `take` runs before the lookup, so pending ids whose entry is
                            // no longer in the log still count against the limit.
                            pending_ids
                                .into_iter()
                                .take(limit)
                                .filter_map(|sid| log.entries.iter().find(|e| e.id == sid).cloned())
                                .collect()
                        } else {
                            Vec::new()
                        }
                    }
                } else {
                    // Key exists but is not a stream: skipped without an error.
                    continue;
                }
            } else {
                continue;
            };

            if is_new_msgs && !stream_entries.is_empty() {
                // Group and consumer records are created on demand here if missing.
                let groups = self.stream_groups.entry(compact.clone()).or_default();
                let grp = groups
                    .entry(group.to_string())
                    .or_insert_with(|| StreamConsumerGroup {
                        last_delivered_id: StreamId::min_id(),
                        pel: HashMap::new(),
                        consumers: HashMap::new(),
                    });
                let cs = grp.consumers.entry(consumer.to_string()).or_default();
                for se in &stream_entries {
                    if se.id > grp.last_delivered_id {
                        grp.last_delivered_id = se.id.clone();
                    }
                    grp.pel.insert(
                        se.id.clone(),
                        PendingEntry {
                            consumer: consumer.to_string(),
                            delivery_time: now_ms,
                            delivery_count: 1,
                        },
                    );
                    cs.pending.insert(se.id.clone());
                }
            }

            if !stream_entries.is_empty() {
                let entries_resp: Vec<CommandResponse> = stream_entries
                    .iter()
                    .map(Self::format_stream_entry)
                    .collect();
                results.push(CommandResponse::Array(vec![
                    CommandResponse::BulkString(key.clone()),
                    CommandResponse::Array(entries_resp),
                ]));
            }
        }

        if results.is_empty() {
            CommandResponse::Nil
        } else {
            CommandResponse::Array(results)
        }
    }
4384
4385 fn cmd_xack(&mut self, key: &[u8], group: &str, ids: &[Vec<u8>]) -> CommandResponse {
4386 let compact = CompactKey::new(key);
4387 let mut acked = 0i64;
4388 if let Some(groups) = self.stream_groups.get_mut(&compact) {
4389 if let Some(grp) = groups.get_mut(group) {
4390 for id_bytes in ids {
4391 if let Some(sid) = StreamId::parse(id_bytes) {
4392 if let Some(pe) = grp.pel.remove(&sid) {
4393 if let Some(cs) = grp.consumers.get_mut(&pe.consumer) {
4394 cs.pending.remove(&sid);
4395 }
4396 acked += 1;
4397 }
4398 }
4399 }
4400 }
4401 }
4402 CommandResponse::Integer(acked)
4403 }
4404
4405 fn cmd_xpending(
4406 &mut self,
4407 key: &[u8],
4408 group: &str,
4409 start: Option<&[u8]>,
4410 end: Option<&[u8]>,
4411 count: Option<usize>,
4412 ) -> CommandResponse {
4413 let compact = CompactKey::new(key);
4414 let groups = self.stream_groups.get(&compact);
4415 let grp = match groups.and_then(|gs| gs.get(group)) {
4416 Some(g) => g,
4417 None => {
4418 return CommandResponse::Error("NOGROUP No such consumer group for key".into());
4419 }
4420 };
4421
4422 if let (Some(s), Some(e), Some(c)) = (start, end, count) {
4423 let start_id = Self::parse_range_id(s, true);
4424 let end_id = Self::parse_range_id(e, false);
4425 let limit = c;
4426
4427 let mut entries: Vec<(&StreamId, &PendingEntry)> = grp
4428 .pel
4429 .iter()
4430 .filter(|(sid, _)| **sid >= start_id && **sid <= end_id)
4431 .collect();
4432 entries.sort_by_key(|(sid, _)| (*sid).clone());
4433 entries.truncate(limit);
4434
4435 let results: Vec<CommandResponse> = entries
4436 .into_iter()
4437 .map(|(sid, pe)| {
4438 CommandResponse::Array(vec![
4439 CommandResponse::BulkString(format!("{}", sid).into_bytes()),
4440 CommandResponse::BulkString(pe.consumer.as_bytes().to_vec()),
4441 CommandResponse::Integer(
4442 std::time::SystemTime::now()
4443 .duration_since(std::time::UNIX_EPOCH)
4444 .map(|d| d.as_millis() as u64)
4445 .unwrap_or(0)
4446 .saturating_sub(pe.delivery_time)
4447 as i64,
4448 ),
4449 CommandResponse::Integer(pe.delivery_count as i64),
4450 ])
4451 })
4452 .collect();
4453
4454 return CommandResponse::Array(results);
4455 }
4456
4457 let total_pending = grp.pel.len() as i64;
4458 let min_id = grp
4459 .pel
4460 .keys()
4461 .min()
4462 .map(|id| format!("{}", id))
4463 .unwrap_or_default();
4464 let max_id = grp
4465 .pel
4466 .keys()
4467 .max()
4468 .map(|id| format!("{}", id))
4469 .unwrap_or_default();
4470
4471 let mut consumer_counts: HashMap<&str, i64> = HashMap::new();
4472 for pe in grp.pel.values() {
4473 *consumer_counts.entry(&pe.consumer).or_insert(0) += 1;
4474 }
4475 let consumers_resp: Vec<CommandResponse> = consumer_counts
4476 .into_iter()
4477 .map(|(name, count)| {
4478 CommandResponse::Array(vec![
4479 CommandResponse::BulkString(name.as_bytes().to_vec()),
4480 CommandResponse::BulkString(count.to_string().into_bytes()),
4481 ])
4482 })
4483 .collect();
4484
4485 CommandResponse::Array(vec![
4486 CommandResponse::Integer(total_pending),
4487 CommandResponse::BulkString(min_id.into_bytes()),
4488 CommandResponse::BulkString(max_id.into_bytes()),
4489 CommandResponse::Array(consumers_resp),
4490 ])
4491 }
4492
4493 fn cmd_xclaim(
4494 &mut self,
4495 key: &[u8],
4496 group: &str,
4497 consumer: &str,
4498 min_idle_time: u64,
4499 ids: &[Vec<u8>],
4500 ) -> CommandResponse {
4501 let compact = CompactKey::new(key);
4502 let now_ms = std::time::SystemTime::now()
4503 .duration_since(std::time::UNIX_EPOCH)
4504 .map(|d| d.as_millis() as u64)
4505 .unwrap_or(0);
4506
4507 let stream_ok = self
4508 .entries
4509 .get(&compact)
4510 .map(|e| matches!(&e.value, Value::Stream(_)))
4511 .unwrap_or(false);
4512
4513 if !stream_ok {
4514 return CommandResponse::Array(vec![]);
4515 }
4516
4517 let groups = match self.stream_groups.get_mut(&compact) {
4518 Some(g) => g,
4519 None => return CommandResponse::Array(vec![]),
4520 };
4521 let grp = match groups.get_mut(group) {
4522 Some(g) => g,
4523 None => return CommandResponse::Array(vec![]),
4524 };
4525
4526 let mut claimed_ids: Vec<StreamId> = Vec::new();
4527
4528 for id_bytes in ids {
4529 if let Some(sid) = StreamId::parse(id_bytes) {
4530 if let Some(pe) = grp.pel.get_mut(&sid) {
4531 let idle = now_ms.saturating_sub(pe.delivery_time);
4532 if idle >= min_idle_time {
4533 let old_consumer = pe.consumer.clone();
4534 pe.consumer = consumer.to_string();
4535 pe.delivery_time = now_ms;
4536 pe.delivery_count += 1;
4537
4538 if old_consumer != consumer {
4539 if let Some(cs) = grp.consumers.get_mut(&old_consumer) {
4540 cs.pending.remove(&sid);
4541 }
4542 }
4543 grp.consumers
4544 .entry(consumer.to_string())
4545 .or_default()
4546 .pending
4547 .insert(sid.clone());
4548 claimed_ids.push(sid);
4549 }
4550 }
4551 }
4552 }
4553
4554 let entry_map = self.entries.get(&compact);
4555 let results: Vec<CommandResponse> = if let Some(entry) = entry_map {
4556 if let Value::Stream(log) = &entry.value {
4557 claimed_ids
4558 .iter()
4559 .filter_map(|sid| {
4560 log.entries
4561 .iter()
4562 .find(|e| e.id == *sid)
4563 .map(Self::format_stream_entry)
4564 })
4565 .collect()
4566 } else {
4567 vec![]
4568 }
4569 } else {
4570 vec![]
4571 };
4572
4573 CommandResponse::Array(results)
4574 }
4575
4576 fn cmd_xautoclaim(
4577 &mut self,
4578 key: &[u8],
4579 group: &str,
4580 consumer: &str,
4581 min_idle_time: u64,
4582 start: &[u8],
4583 count: Option<usize>,
4584 ) -> CommandResponse {
4585 let compact = CompactKey::new(key);
4586 let now_ms = std::time::SystemTime::now()
4587 .duration_since(std::time::UNIX_EPOCH)
4588 .map(|d| d.as_millis() as u64)
4589 .unwrap_or(0);
4590 let start_id = Self::parse_range_id(start, true);
4591 let limit = count.unwrap_or(100);
4592
4593 let stream_ok = self
4594 .entries
4595 .get(&compact)
4596 .map(|e| matches!(&e.value, Value::Stream(_)))
4597 .unwrap_or(false);
4598
4599 if !stream_ok {
4600 return CommandResponse::Array(vec![
4601 CommandResponse::BulkString(b"0-0".to_vec()),
4602 CommandResponse::Array(vec![]),
4603 CommandResponse::Array(vec![]),
4604 ]);
4605 }
4606
4607 let groups = match self.stream_groups.get_mut(&compact) {
4608 Some(g) => g,
4609 None => {
4610 return CommandResponse::Array(vec![
4611 CommandResponse::BulkString(b"0-0".to_vec()),
4612 CommandResponse::Array(vec![]),
4613 CommandResponse::Array(vec![]),
4614 ])
4615 }
4616 };
4617 let grp = match groups.get_mut(group) {
4618 Some(g) => g,
4619 None => {
4620 return CommandResponse::Array(vec![
4621 CommandResponse::BulkString(b"0-0".to_vec()),
4622 CommandResponse::Array(vec![]),
4623 CommandResponse::Array(vec![]),
4624 ])
4625 }
4626 };
4627
4628 let mut eligible: Vec<StreamId> = grp
4629 .pel
4630 .iter()
4631 .filter(|(sid, pe)| {
4632 **sid >= start_id && now_ms.saturating_sub(pe.delivery_time) >= min_idle_time
4633 })
4634 .map(|(sid, _)| sid.clone())
4635 .collect();
4636 eligible.sort();
4637 eligible.truncate(limit);
4638
4639 let next_start = eligible
4640 .last()
4641 .map(|sid| StreamId {
4642 ms: sid.ms,
4643 seq: sid.seq + 1,
4644 })
4645 .unwrap_or(StreamId::min_id());
4646
4647 let mut claimed_ids: Vec<StreamId> = Vec::new();
4648 for sid in &eligible {
4649 if let Some(pe) = grp.pel.get_mut(sid) {
4650 let old_consumer = pe.consumer.clone();
4651 pe.consumer = consumer.to_string();
4652 pe.delivery_time = now_ms;
4653 pe.delivery_count += 1;
4654 if old_consumer != consumer {
4655 if let Some(cs) = grp.consumers.get_mut(&old_consumer) {
4656 cs.pending.remove(sid);
4657 }
4658 }
4659 grp.consumers
4660 .entry(consumer.to_string())
4661 .or_default()
4662 .pending
4663 .insert(sid.clone());
4664 claimed_ids.push(sid.clone());
4665 }
4666 }
4667
4668 let entry_map = self.entries.get(&compact);
4669 let entries_resp: Vec<CommandResponse> = if let Some(entry) = entry_map {
4670 if let Value::Stream(log) = &entry.value {
4671 claimed_ids
4672 .iter()
4673 .filter_map(|sid| {
4674 log.entries
4675 .iter()
4676 .find(|e| e.id == *sid)
4677 .map(Self::format_stream_entry)
4678 })
4679 .collect()
4680 } else {
4681 vec![]
4682 }
4683 } else {
4684 vec![]
4685 };
4686
4687 CommandResponse::Array(vec![
4688 CommandResponse::BulkString(format!("{}", next_start).into_bytes()),
4689 CommandResponse::Array(entries_resp),
4690 CommandResponse::Array(vec![]),
4691 ])
4692 }
4693
4694 fn cmd_xinfo_stream(&mut self, key: &[u8]) -> CommandResponse {
4695 self.lazy_expire(key);
4696 let compact = CompactKey::new(key);
4697 match self.entries.get(&compact) {
4698 Some(entry) => match &entry.value {
4699 Value::Stream(log) => {
4700 let length = log.entries.len() as i64;
4701 let groups_count = self
4702 .stream_groups
4703 .get(&compact)
4704 .map(|gs| gs.len() as i64)
4705 .unwrap_or(0);
4706
4707 CommandResponse::Array(vec![
4708 CommandResponse::BulkString(b"length".to_vec()),
4709 CommandResponse::Integer(length),
4710 CommandResponse::BulkString(b"first-entry".to_vec()),
4711 if let Some(first) = log.entries.front() {
4712 Self::format_stream_entry(first)
4713 } else {
4714 CommandResponse::Nil
4715 },
4716 CommandResponse::BulkString(b"last-entry".to_vec()),
4717 if let Some(last) = log.entries.back() {
4718 Self::format_stream_entry(last)
4719 } else {
4720 CommandResponse::Nil
4721 },
4722 CommandResponse::BulkString(b"last-generated-id".to_vec()),
4723 CommandResponse::BulkString(format!("{}", log.last_id).into_bytes()),
4724 CommandResponse::BulkString(b"groups".to_vec()),
4725 CommandResponse::Integer(groups_count),
4726 ])
4727 }
4728 _ => CommandResponse::Error(
4729 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
4730 ),
4731 },
4732 None => CommandResponse::Error("ERR no such key".into()),
4733 }
4734 }
4735
4736 fn cmd_xinfo_groups(&mut self, key: &[u8]) -> CommandResponse {
4737 self.lazy_expire(key);
4738 let compact = CompactKey::new(key);
4739 match self.entries.get(&compact) {
4740 Some(entry) => match &entry.value {
4741 Value::Stream(_) => {
4742 let groups = self.stream_groups.get(&compact);
4743 let results: Vec<CommandResponse> = groups
4744 .map(|gs| {
4745 gs.iter()
4746 .map(|(name, grp)| {
4747 CommandResponse::Array(vec![
4748 CommandResponse::BulkString(b"name".to_vec()),
4749 CommandResponse::BulkString(name.as_bytes().to_vec()),
4750 CommandResponse::BulkString(b"consumers".to_vec()),
4751 CommandResponse::Integer(grp.consumers.len() as i64),
4752 CommandResponse::BulkString(b"pending".to_vec()),
4753 CommandResponse::Integer(grp.pel.len() as i64),
4754 CommandResponse::BulkString(b"last-delivered-id".to_vec()),
4755 CommandResponse::BulkString(
4756 format!("{}", grp.last_delivered_id).into_bytes(),
4757 ),
4758 ])
4759 })
4760 .collect()
4761 })
4762 .unwrap_or_default();
4763 CommandResponse::Array(results)
4764 }
4765 _ => CommandResponse::Error(
4766 "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
4767 ),
4768 },
4769 None => CommandResponse::Error("ERR no such key".into()),
4770 }
4771 }
4772}
4773
/// Renders `f` as a plain decimal string that parses back to exactly `f`.
///
/// * Whole numbers below `1e17` in magnitude that survive an `i64` round trip
///   are printed without a fractional part (`3.0` -> `"3"`).
/// * Otherwise the fewest fractional digits (0 through 17) that still
///   round-trip are used (`0.5` -> `"0.5"`).
/// * Values that need more than 17 fractional digits fall back to the
///   standard `Display` form, which always round-trips. The fixed 17-digit
///   form would silently turn e.g. `1e-20` into zero.
///
/// NaN is rendered as `"NaN"` and infinities as `"inf"` / `"-inf"`.
fn format_float(f: f64) -> String {
    if f.fract() == 0.0 && f.abs() < 1e17 {
        let i = f as i64;
        if (i as f64) == f {
            return i.to_string();
        }
    }

    let shortest_fixed = (0..=17usize)
        .map(|precision| format!("{f:.precision$}"))
        .find(|candidate| candidate.parse::<f64>().ok() == Some(f));

    match shortest_fixed {
        Some(text) => text,
        // No fixed-precision rendering up to 17 digits reproduces the value.
        None => f.to_string(),
    }
}
4790
/// Converts a possibly negative index into a non-negative offset.
///
/// Non-negative indexes are returned unchanged (they are not capped at `len`).
/// Negative indexes count back from `len` and are floored at 0.
fn normalize_index(index: i64, len: i64) -> usize {
    if index >= 0 {
        return index as usize;
    }
    (len + index).max(0) as usize
}
4803
4804fn glob_match(pattern: &str, key: &[u8]) -> bool {
4806 let key_str = match std::str::from_utf8(key) {
4807 Ok(s) => s,
4808 Err(_) => return false,
4809 };
4810 if pattern == "*" {
4811 return true;
4812 }
4813 glob_match_iterative(pattern.as_bytes(), key_str.as_bytes())
4814}
4815
/// Byte-wise glob matcher supporting `*` (any run of bytes, possibly empty)
/// and `?` (exactly one byte); every other pattern byte matches itself.
///
/// Runs without recursion: on a mismatch it backtracks to the most recent `*`
/// and lets that wildcard absorb one more byte of `text`.
fn glob_match_iterative(pattern: &[u8], text: &[u8]) -> bool {
    let mut pi = 0;
    let mut ti = 0;
    // For the most recent `*`: (pattern index just past it, text index it
    // currently extends up to).
    let mut backtrack: Option<(usize, usize)> = None;

    while ti < text.len() {
        match pattern.get(pi) {
            // The wildcard is tested before literal equality: otherwise a `*`
            // in the pattern facing a literal `*` byte in the text would be
            // consumed as an ordinary character and lose its wildcard meaning.
            Some(&b'*') => {
                backtrack = Some((pi + 1, ti));
                pi += 1;
            }
            Some(&c) if c == b'?' || c == text[ti] => {
                pi += 1;
                ti += 1;
            }
            _ => match backtrack {
                Some((resume_pi, covered_to)) => {
                    pi = resume_pi;
                    ti = covered_to + 1;
                    backtrack = Some((resume_pi, ti));
                }
                None => return false,
            },
        }
    }

    // The text is exhausted; whatever is left of the pattern must be able to
    // match the empty string, i.e. consist solely of `*`.
    pattern[pi..].iter().all(|&c| c == b'*')
}
4848
/// Tests `member` against the lower bound `min` of a lexicographic range.
///
/// The first byte of `min` selects the rule: `+` admits nothing, `[` is an
/// inclusive bound on the remaining bytes and `(` an exclusive one. `-`, an
/// empty bound, or any other leading byte admits every member.
fn lex_gte_min(member: &[u8], min: &[u8]) -> bool {
    match min.split_first() {
        Some((&b'+', _)) => false,
        Some((&b'[', bound)) => member >= bound,
        Some((&b'(', bound)) => member > bound,
        _ => true,
    }
}
4861
/// Tests `member` against the upper bound `max` of a lexicographic range.
///
/// The first byte of `max` selects the rule: `-` admits nothing, `[` is an
/// inclusive bound on the remaining bytes and `(` an exclusive one. `+`, an
/// empty bound, or any other leading byte admits every member.
fn lex_lte_max(member: &[u8], max: &[u8]) -> bool {
    match max.split_first() {
        Some((&b'-', _)) => false,
        Some((&b'[', bound)) => member <= bound,
        Some((&b'(', bound)) => member < bound,
        _ => true,
    }
}
4874
4875fn lex_in_range(member: &[u8], min: &[u8], max: &[u8]) -> bool {
4876 lex_gte_min(member, min) && lex_lte_max(member, max)
4877}
4878
/// Number of low hash bits used to select a HyperLogLog register.
const HLL_P: usize = 14;
/// Number of HyperLogLog registers: `2^HLL_P` = 16384.
const HLL_REGISTERS: usize = 1 << HLL_P;
/// Size in bytes of the packed register array: 6 bits per register (12288 bytes).
const HLL_REGISTER_BYTES: usize = HLL_REGISTERS * 6 / 8;
4884
4885fn hll_hash(element: &[u8]) -> u64 {
4886 use std::hash::{Hash, Hasher};
4887 let mut hasher = ahash::AHasher::default();
4888 element.hash(&mut hasher);
4889 hasher.finish()
4890}
4891
/// Reads the 6-bit register at `index` from the packed array `registers`.
///
/// Register `index` occupies bits `6 * index .. 6 * index + 6` of the array,
/// least-significant bit first, so it may straddle two bytes. A missing
/// second byte is read as zero.
///
/// # Panics
/// Panics if the register's first byte lies outside `registers`.
fn hll_register_get(registers: &[u8], index: usize) -> u8 {
    let bit_offset = index * 6;
    let byte_offset = bit_offset / 8;
    let bit_shift = bit_offset % 8;

    // View the first byte and its successor as one little-endian 16-bit
    // window; the register is the 6 bits starting at `bit_shift`.
    let low = u16::from(registers[byte_offset]);
    let high = registers
        .get(byte_offset + 1)
        .map_or(0, |&byte| u16::from(byte));
    let window = (high << 8) | low;

    ((window >> bit_shift) & 0x3F) as u8
}
4909
/// Writes the low 6 bits of `value` into the register at `index` of the
/// packed array `registers`, leaving neighbouring registers untouched.
///
/// Register `index` occupies bits `6 * index .. 6 * index + 6` of the array,
/// least-significant bit first. When it straddles two bytes and the second
/// byte does not exist, only the first byte is updated.
///
/// # Panics
/// Panics if the register's first byte lies outside `registers`.
fn hll_register_set(registers: &mut [u8], index: usize, value: u8) {
    let bit_offset = index * 6;
    let byte_offset = bit_offset / 8;
    let bit_shift = bit_offset % 8;

    // Place the register and its keep-mask in a 16-bit window spanning the
    // first byte (low half) and its successor (high half).
    let keep_mask = !(0x3F_u16 << bit_shift);
    let placed = u16::from(value & 0x3F) << bit_shift;

    registers[byte_offset] = (registers[byte_offset] & (keep_mask as u8)) | (placed as u8);

    if bit_shift > 2 && byte_offset + 1 < registers.len() {
        let next = &mut registers[byte_offset + 1];
        *next = (*next & ((keep_mask >> 8) as u8)) | ((placed >> 8) as u8);
    }
}
4925
4926fn hll_add(registers: &mut [u8], element: &[u8]) -> bool {
4927 let hash = hll_hash(element);
4928 let index = (hash & ((1 << HLL_P) - 1)) as usize;
4929 let remaining = hash >> HLL_P;
4930 let rank = if remaining == 0 {
4931 (64 - HLL_P) as u8 + 1
4932 } else {
4933 (remaining.trailing_zeros() + 1) as u8
4934 };
4935
4936 let current = hll_register_get(registers, index);
4937 if rank > current {
4938 hll_register_set(registers, index, rank);
4939 true
4940 } else {
4941 false
4942 }
4943}
4944
4945fn hll_count(registers: &[u8]) -> u64 {
4946 let m = HLL_REGISTERS as f64;
4947 let alpha = match HLL_REGISTERS {
4948 16 => 0.673,
4949 32 => 0.697,
4950 64 => 0.709,
4951 _ => 0.7213 / (1.0 + 1.079 / m),
4952 };
4953
4954 let mut sum = 0.0f64;
4955 let mut zeros = 0u32;
4956
4957 for i in 0..HLL_REGISTERS {
4958 let val = hll_register_get(registers, i);
4959 sum += 1.0 / (1u64 << val) as f64;
4960 if val == 0 {
4961 zeros += 1;
4962 }
4963 }
4964
4965 let mut estimate = alpha * m * m / sum;
4966
4967 if estimate <= 2.5 * m && zeros > 0 {
4968 estimate = m * (m / zeros as f64).ln();
4969 }
4970
4971 estimate.round() as u64
4972}
4973
4974fn hll_merge(dest: &mut [u8], src: &[u8]) {
4975 for i in 0..HLL_REGISTERS {
4976 let src_val = hll_register_get(src, i);
4977 let dst_val = hll_register_get(dest, i);
4978 if src_val > dst_val {
4979 hll_register_set(dest, i, src_val);
4980 }
4981 }
4982}
4983
/// Total number of interleaved longitude/latitude bits in an encoded geo
/// position (even bit positions refine longitude, odd ones latitude).
const GEO_HASH_BITS: u32 = 52;
4987
4988fn geo_encode(lon: f64, lat: f64) -> f64 {
4989 let lon_norm = (lon + 180.0) / 360.0;
4990 let lat_norm = (lat + 90.0) / 180.0;
4991
4992 let mut hash: u64 = 0;
4993 let mut lon_min = 0.0f64;
4994 let mut lon_max = 1.0f64;
4995 let mut lat_min = 0.0f64;
4996 let mut lat_max = 1.0f64;
4997
4998 for i in 0..GEO_HASH_BITS {
4999 if i % 2 == 0 {
5000 let mid = (lon_min + lon_max) / 2.0;
5001 if lon_norm >= mid {
5002 hash |= 1 << (GEO_HASH_BITS - 1 - i);
5003 lon_min = mid;
5004 } else {
5005 lon_max = mid;
5006 }
5007 } else {
5008 let mid = (lat_min + lat_max) / 2.0;
5009 if lat_norm >= mid {
5010 hash |= 1 << (GEO_HASH_BITS - 1 - i);
5011 lat_min = mid;
5012 } else {
5013 lat_max = mid;
5014 }
5015 }
5016 }
5017
5018 f64::from_bits(hash)
5019}
5020
5021fn geo_decode(score: f64) -> (f64, f64) {
5022 let hash = score.to_bits();
5023
5024 let mut lon_min = 0.0f64;
5025 let mut lon_max = 1.0f64;
5026 let mut lat_min = 0.0f64;
5027 let mut lat_max = 1.0f64;
5028
5029 for i in 0..GEO_HASH_BITS {
5030 let bit = (hash >> (GEO_HASH_BITS - 1 - i)) & 1;
5031 if i % 2 == 0 {
5032 let mid = (lon_min + lon_max) / 2.0;
5033 if bit == 1 {
5034 lon_min = mid;
5035 } else {
5036 lon_max = mid;
5037 }
5038 } else {
5039 let mid = (lat_min + lat_max) / 2.0;
5040 if bit == 1 {
5041 lat_min = mid;
5042 } else {
5043 lat_max = mid;
5044 }
5045 }
5046 }
5047
5048 let lon = (lon_min + lon_max) / 2.0 * 360.0 - 180.0;
5049 let lat = (lat_min + lat_max) / 2.0 * 180.0 - 90.0;
5050 (lon, lat)
5051}
5052
/// 32-character alphabet (digits plus lowercase letters without `a`, `i`, `l`
/// and `o`) used to render each 5-bit group of a textual geohash.
const BASE32_CHARS: &[u8] = b"0123456789bcdefghjkmnpqrstuvwxyz";
5054
5055fn geo_encode_base32(lon: f64, lat: f64) -> String {
5056 let lon_norm = (lon + 180.0) / 360.0;
5057 let lat_norm = (lat + 90.0) / 180.0;
5058
5059 let mut bits: Vec<bool> = Vec::with_capacity(55);
5060 let mut lon_min = 0.0f64;
5061 let mut lon_max = 1.0f64;
5062 let mut lat_min = 0.0f64;
5063 let mut lat_max = 1.0f64;
5064
5065 for i in 0..55 {
5066 if i % 2 == 0 {
5067 let mid = (lon_min + lon_max) / 2.0;
5068 if lon_norm >= mid {
5069 bits.push(true);
5070 lon_min = mid;
5071 } else {
5072 bits.push(false);
5073 lon_max = mid;
5074 }
5075 } else {
5076 let mid = (lat_min + lat_max) / 2.0;
5077 if lat_norm >= mid {
5078 bits.push(true);
5079 lat_min = mid;
5080 } else {
5081 bits.push(false);
5082 lat_max = mid;
5083 }
5084 }
5085 }
5086
5087 let mut result = String::with_capacity(11);
5088 for chunk in bits.chunks(5) {
5089 let mut idx = 0u8;
5090 for (j, &bit) in chunk.iter().enumerate() {
5091 if bit {
5092 idx |= 1 << (4 - j);
5093 }
5094 }
5095 result.push(BASE32_CHARS[idx as usize] as char);
5096 }
5097 result
5098}
5099
/// Great-circle distance in metres between two points given in degrees,
/// computed with the haversine formula on a sphere of radius
/// 6372797.560856 m.
///
/// NaN inputs yield NaN.
fn haversine_distance(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
    const EARTH_RADIUS: f64 = 6372797.560856;

    let lat1_rad = lat1.to_radians();
    let lat2_rad = lat2.to_radians();
    let half_dlat = (lat2 - lat1).to_radians() / 2.0;
    let half_dlon = (lon2 - lon1).to_radians() / 2.0;

    let a = half_dlat.sin().powi(2) + lat1_rad.cos() * lat2_rad.cos() * half_dlon.sin().powi(2);
    // Mathematically `a` lies in [0, 1], but rounding can push it a hair above
    // 1 for (near-)antipodal points, which would make `asin` return NaN.
    // `clamp` keeps NaN inputs as NaN.
    let c = 2.0 * a.clamp(0.0, 1.0).sqrt().asin();

    EARTH_RADIUS * c
}
5114
5115fn geo_convert_distance(meters: f64, unit: GeoUnit) -> f64 {
5116 match unit {
5117 GeoUnit::Meters => meters,
5118 GeoUnit::Kilometers => meters / 1000.0,
5119 GeoUnit::Feet => meters * 3.28084,
5120 GeoUnit::Miles => meters / 1609.344,
5121 }
5122}
5123
5124fn geo_to_meters(value: f64, unit: GeoUnit) -> f64 {
5125 match unit {
5126 GeoUnit::Meters => value,
5127 GeoUnit::Kilometers => value * 1000.0,
5128 GeoUnit::Feet => value / 3.28084,
5129 GeoUnit::Miles => value * 1609.344,
5130 }
5131}
5132
5133#[cfg(test)]
5134mod tests {
5135 use super::*;
5136
5137 #[test]
5138 fn test_set_and_get() {
5139 let mut store = ShardStore::new(0);
5140 store.execute(Command::Set {
5141 key: b"key1".to_vec(),
5142 value: b"value1".to_vec(),
5143 ex: None,
5144 px: None,
5145 nx: false,
5146 xx: false,
5147 });
5148 match store.execute(Command::Get {
5149 key: b"key1".to_vec(),
5150 }) {
5151 CommandResponse::BulkString(v) => assert_eq!(v, b"value1"),
5152 other => panic!("Expected BulkString, got {:?}", other),
5153 }
5154 }
5155
5156 #[test]
5157 fn test_get_large_value_uses_shared_bulk_string() {
5158 let mut store = ShardStore::new(0);
5159 let value = vec![b'x'; 256];
5160 store.execute(Command::Set {
5161 key: b"large".to_vec(),
5162 value: value.clone(),
5163 ex: None,
5164 px: None,
5165 nx: false,
5166 xx: false,
5167 });
5168
5169 match store.execute(Command::Get {
5170 key: b"large".to_vec(),
5171 }) {
5172 CommandResponse::BulkStringShared(bytes) => assert_eq!(bytes.as_ref(), value),
5173 other => panic!("Expected BulkStringShared, got {:?}", other),
5174 }
5175 }
5176
5177 #[test]
5178 fn test_get_nonexistent() {
5179 let mut store = ShardStore::new(0);
5180 assert!(matches!(
5181 store.execute(Command::Get {
5182 key: b"nope".to_vec()
5183 }),
5184 CommandResponse::Nil
5185 ));
5186 }
5187
5188 #[test]
5189 fn test_set_nx() {
5190 let mut store = ShardStore::new(0);
5191 store.execute(Command::Set {
5192 key: b"k".to_vec(),
5193 value: b"v1".to_vec(),
5194 ex: None,
5195 px: None,
5196 nx: false,
5197 xx: false,
5198 });
5199 let resp = store.execute(Command::SetNx {
5201 key: b"k".to_vec(),
5202 value: b"v2".to_vec(),
5203 });
5204 assert!(matches!(resp, CommandResponse::Integer(0)));
5205 match store.execute(Command::Get { key: b"k".to_vec() }) {
5207 CommandResponse::BulkString(v) => assert_eq!(v, b"v1"),
5208 other => panic!("Expected v1, got {:?}", other),
5209 }
5210 }
5211
5212 #[test]
5213 fn test_set_xx_treats_expired_key_as_missing() {
5214 let mut store = ShardStore::new(0);
5215 store.execute(Command::Set {
5216 key: b"k".to_vec(),
5217 value: b"stale".to_vec(),
5218 ex: None,
5219 px: Some(1),
5220 nx: false,
5221 xx: false,
5222 });
5223 std::thread::sleep(Duration::from_millis(5));
5224
5225 let resp = store.execute(Command::Set {
5226 key: b"k".to_vec(),
5227 value: b"fresh".to_vec(),
5228 ex: None,
5229 px: None,
5230 nx: false,
5231 xx: true,
5232 });
5233 assert!(matches!(resp, CommandResponse::Nil));
5234 assert!(matches!(
5235 store.execute(Command::Get { key: b"k".to_vec() }),
5236 CommandResponse::Nil
5237 ));
5238 }
5239
5240 #[test]
5241 fn test_incr_decr() {
5242 let mut store = ShardStore::new(0);
5243 match store.execute(Command::Incr {
5245 key: b"counter".to_vec(),
5246 }) {
5247 CommandResponse::Integer(1) => {}
5248 other => panic!("Expected 1, got {:?}", other),
5249 }
5250 store.execute(Command::IncrBy {
5251 key: b"counter".to_vec(),
5252 delta: 10,
5253 });
5254 match store.execute(Command::Get {
5255 key: b"counter".to_vec(),
5256 }) {
5257 CommandResponse::BulkString(v) => assert_eq!(v, b"11"),
5258 other => panic!("Expected 11, got {:?}", other),
5259 }
5260 store.execute(Command::Decr {
5261 key: b"counter".to_vec(),
5262 });
5263 match store.execute(Command::Get {
5264 key: b"counter".to_vec(),
5265 }) {
5266 CommandResponse::BulkString(v) => assert_eq!(v, b"10"),
5267 other => panic!("Expected 10, got {:?}", other),
5268 }
5269 }
5270
5271 #[test]
5272 fn test_del_multiple() {
5273 let mut store = ShardStore::new(0);
5274 store.execute(Command::Set {
5275 key: b"a".to_vec(),
5276 value: b"1".to_vec(),
5277 ex: None,
5278 px: None,
5279 nx: false,
5280 xx: false,
5281 });
5282 store.execute(Command::Set {
5283 key: b"b".to_vec(),
5284 value: b"2".to_vec(),
5285 ex: None,
5286 px: None,
5287 nx: false,
5288 xx: false,
5289 });
5290 match store.execute(Command::Del {
5291 keys: vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()],
5292 }) {
5293 CommandResponse::Integer(2) => {}
5294 other => panic!("Expected 2, got {:?}", other),
5295 }
5296 }
5297
5298 #[test]
5299 fn test_expire_and_ttl() {
5300 let mut store = ShardStore::new(0);
5301 store.execute(Command::Set {
5302 key: b"k".to_vec(),
5303 value: b"v".to_vec(),
5304 ex: Some(100),
5305 px: None,
5306 nx: false,
5307 xx: false,
5308 });
5309 match store.execute(Command::Ttl { key: b"k".to_vec() }) {
5310 CommandResponse::Integer(n) => assert!(n > 0 && n <= 100),
5311 other => panic!("Expected positive TTL, got {:?}", other),
5312 }
5313 }
5314
5315 #[test]
5316 fn test_type_command() {
5317 let mut store = ShardStore::new(0);
5318 store.execute(Command::Set {
5319 key: b"s".to_vec(),
5320 value: b"v".to_vec(),
5321 ex: None,
5322 px: None,
5323 nx: false,
5324 xx: false,
5325 });
5326 store.execute(Command::LPush {
5327 key: b"l".to_vec(),
5328 values: vec![b"a".to_vec()],
5329 });
5330 store.execute(Command::HSet {
5331 key: b"h".to_vec(),
5332 fields: vec![(b"f".to_vec(), b"v".to_vec())],
5333 });
5334 store.execute(Command::SAdd {
5335 key: b"set".to_vec(),
5336 members: vec![b"m".to_vec()],
5337 });
5338
5339 assert!(
5340 matches!(store.execute(Command::Type { key: b"s".to_vec() }), CommandResponse::SimpleString(s) if s == "string")
5341 );
5342 assert!(
5343 matches!(store.execute(Command::Type { key: b"l".to_vec() }), CommandResponse::SimpleString(s) if s == "list")
5344 );
5345 assert!(
5346 matches!(store.execute(Command::Type { key: b"h".to_vec() }), CommandResponse::SimpleString(s) if s == "hash")
5347 );
5348 assert!(
5349 matches!(store.execute(Command::Type { key: b"set".to_vec() }), CommandResponse::SimpleString(s) if s == "set")
5350 );
5351 assert!(
5352 matches!(store.execute(Command::Type { key: b"none".to_vec() }), CommandResponse::SimpleString(s) if s == "none")
5353 );
5354 }
5355
5356 #[test]
5357 fn test_list_operations() {
5358 let mut store = ShardStore::new(0);
5359 store.execute(Command::RPush {
5360 key: b"list".to_vec(),
5361 values: vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()],
5362 });
5363 assert!(matches!(
5364 store.execute(Command::LLen {
5365 key: b"list".to_vec()
5366 }),
5367 CommandResponse::Integer(3)
5368 ));
5369 match store.execute(Command::LRange {
5370 key: b"list".to_vec(),
5371 start: 0,
5372 stop: -1,
5373 }) {
5374 CommandResponse::Array(items) => assert_eq!(items.len(), 3),
5375 other => panic!("Expected array of 3, got {:?}", other),
5376 }
5377 match store.execute(Command::LPop {
5378 key: b"list".to_vec(),
5379 }) {
5380 CommandResponse::BulkString(v) => assert_eq!(v, b"a"),
5381 other => panic!("Expected 'a', got {:?}", other),
5382 }
5383 match store.execute(Command::RPop {
5384 key: b"list".to_vec(),
5385 }) {
5386 CommandResponse::BulkString(v) => assert_eq!(v, b"c"),
5387 other => panic!("Expected 'c', got {:?}", other),
5388 }
5389 }
5390
5391 #[test]
5392 fn test_hash_operations() {
5393 let mut store = ShardStore::new(0);
5394 store.execute(Command::HSet {
5395 key: b"hash".to_vec(),
5396 fields: vec![
5397 (b"name".to_vec(), b"Alice".to_vec()),
5398 (b"age".to_vec(), b"30".to_vec()),
5399 ],
5400 });
5401 match store.execute(Command::HGet {
5402 key: b"hash".to_vec(),
5403 field: b"name".to_vec(),
5404 }) {
5405 CommandResponse::BulkString(v) => assert_eq!(v, b"Alice"),
5406 other => panic!("Expected Alice, got {:?}", other),
5407 }
5408 assert!(matches!(
5409 store.execute(Command::HLen {
5410 key: b"hash".to_vec()
5411 }),
5412 CommandResponse::Integer(2)
5413 ));
5414 assert!(matches!(
5415 store.execute(Command::HExists {
5416 key: b"hash".to_vec(),
5417 field: b"name".to_vec()
5418 }),
5419 CommandResponse::Integer(1)
5420 ));
5421 store.execute(Command::HIncrBy {
5422 key: b"hash".to_vec(),
5423 field: b"age".to_vec(),
5424 delta: 5,
5425 });
5426 match store.execute(Command::HGet {
5427 key: b"hash".to_vec(),
5428 field: b"age".to_vec(),
5429 }) {
5430 CommandResponse::BulkString(v) => assert_eq!(v, b"35"),
5431 other => panic!("Expected 35, got {:?}", other),
5432 }
5433 }
5434
5435 #[test]
5436 fn test_set_operations() {
5437 let mut store = ShardStore::new(0);
5438 store.execute(Command::SAdd {
5439 key: b"myset".to_vec(),
5440 members: vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()],
5441 });
5442 assert!(matches!(
5443 store.execute(Command::SCard {
5444 key: b"myset".to_vec()
5445 }),
5446 CommandResponse::Integer(3)
5447 ));
5448 assert!(matches!(
5449 store.execute(Command::SIsMember {
5450 key: b"myset".to_vec(),
5451 member: b"a".to_vec()
5452 }),
5453 CommandResponse::Integer(1)
5454 ));
5455 assert!(matches!(
5456 store.execute(Command::SIsMember {
5457 key: b"myset".to_vec(),
5458 member: b"z".to_vec()
5459 }),
5460 CommandResponse::Integer(0)
5461 ));
5462 store.execute(Command::SRem {
5463 key: b"myset".to_vec(),
5464 members: vec![b"b".to_vec()],
5465 });
5466 assert!(matches!(
5467 store.execute(Command::SCard {
5468 key: b"myset".to_vec()
5469 }),
5470 CommandResponse::Integer(2)
5471 ));
5472 }
5473
5474 #[test]
5475 fn test_set_numeric_payload_keeps_string_encoding() {
5476 let mut store = ShardStore::new(0);
5477 store.execute(Command::Set {
5478 key: b"num".to_vec(),
5479 value: b"42".to_vec(),
5480 ex: None,
5481 px: None,
5482 nx: false,
5483 xx: false,
5484 });
5485
5486 let get_resp = store.execute(Command::Get {
5487 key: b"num".to_vec(),
5488 });
5489 assert_eq!(get_resp.bulk_string_bytes(), Some(b"42".as_slice()));
5490
5491 match store.execute(Command::ObjectEncoding {
5492 key: b"num".to_vec(),
5493 }) {
5494 CommandResponse::BulkString(encoding) => assert_eq!(encoding, b"embstr"),
5495 other => panic!("Expected embstr encoding, got {:?}", other),
5496 }
5497 }
5498
5499 #[test]
5500 fn test_set_members_are_binary_safe() {
5501 let mut store = ShardStore::new(0);
5502 store.execute(Command::SAdd {
5503 key: b"codes".to_vec(),
5504 members: vec![b"042".to_vec()],
5505 });
5506
5507 assert_eq!(
5508 store.execute(Command::SIsMember {
5509 key: b"codes".to_vec(),
5510 member: b"042".to_vec(),
5511 }),
5512 CommandResponse::Integer(1)
5513 );
5514 assert_eq!(
5515 store.execute(Command::SIsMember {
5516 key: b"codes".to_vec(),
5517 member: b"42".to_vec(),
5518 }),
5519 CommandResponse::Integer(0)
5520 );
5521 }
5522
5523 #[test]
5524 fn test_wrongtype_error() {
5525 let mut store = ShardStore::new(0);
5526 store.execute(Command::Set {
5527 key: b"str".to_vec(),
5528 value: b"v".to_vec(),
5529 ex: None,
5530 px: None,
5531 nx: false,
5532 xx: false,
5533 });
5534 let resp = store.execute(Command::LPush {
5535 key: b"str".to_vec(),
5536 values: vec![b"x".to_vec()],
5537 });
5538 assert!(matches!(resp, CommandResponse::Error(s) if s.contains("WRONGTYPE")));
5539 }
5540
5541 #[test]
5542 fn test_keys_pattern() {
5543 let mut store = ShardStore::new(0);
5544 store.execute(Command::Set {
5545 key: b"user:1".to_vec(),
5546 value: b"a".to_vec(),
5547 ex: None,
5548 px: None,
5549 nx: false,
5550 xx: false,
5551 });
5552 store.execute(Command::Set {
5553 key: b"user:2".to_vec(),
5554 value: b"b".to_vec(),
5555 ex: None,
5556 px: None,
5557 nx: false,
5558 xx: false,
5559 });
5560 store.execute(Command::Set {
5561 key: b"session:1".to_vec(),
5562 value: b"c".to_vec(),
5563 ex: None,
5564 px: None,
5565 nx: false,
5566 xx: false,
5567 });
5568 match store.execute(Command::Keys {
5569 pattern: "user:*".into(),
5570 }) {
5571 CommandResponse::Array(items) => assert_eq!(items.len(), 2),
5572 other => panic!("Expected 2 keys, got {:?}", other),
5573 }
5574 }
5575
5576 #[test]
5577 fn test_ping_echo() {
5578 let mut store = ShardStore::new(0);
5579 assert!(
5580 matches!(store.execute(Command::Ping { message: None }), CommandResponse::SimpleString(s) if s == "PONG")
5581 );
5582 match store.execute(Command::Echo {
5583 message: b"hello".to_vec(),
5584 }) {
5585 CommandResponse::BulkString(v) => assert_eq!(v, b"hello"),
5586 other => panic!("Expected hello, got {:?}", other),
5587 }
5588 }
5589
5590 #[test]
5591 fn test_glob_matching() {
5592 assert!(glob_match("*", b"anything"));
5593 assert!(glob_match("user:*", b"user:123"));
5594 assert!(!glob_match("user:*", b"session:123"));
5595 assert!(glob_match("h?llo", b"hello"));
5596 assert!(glob_match("h?llo", b"hallo"));
5597 assert!(!glob_match("h?llo", b"hlo"));
5598 }
5599
5600 #[test]
5601 fn test_append() {
5602 let mut store = ShardStore::new(0);
5603 store.execute(Command::Append {
5604 key: b"k".to_vec(),
5605 value: b"Hello".to_vec(),
5606 });
5607 store.execute(Command::Append {
5608 key: b"k".to_vec(),
5609 value: b" World".to_vec(),
5610 });
5611 match store.execute(Command::Get { key: b"k".to_vec() }) {
5612 CommandResponse::BulkString(v) => assert_eq!(v, b"Hello World"),
5613 other => panic!("Expected 'Hello World', got {:?}", other),
5614 }
5615 }
5616
5617 #[test]
5618 fn test_dbsize_and_flush() {
5619 let mut store = ShardStore::new(0);
5620 store.execute(Command::Set {
5621 key: b"a".to_vec(),
5622 value: b"1".to_vec(),
5623 ex: None,
5624 px: None,
5625 nx: false,
5626 xx: false,
5627 });
5628 store.execute(Command::Set {
5629 key: b"b".to_vec(),
5630 value: b"2".to_vec(),
5631 ex: None,
5632 px: None,
5633 nx: false,
5634 xx: false,
5635 });
5636 assert!(matches!(
5637 store.execute(Command::DbSize),
5638 CommandResponse::Integer(2)
5639 ));
5640 store.execute(Command::FlushDb);
5641 assert!(matches!(
5642 store.execute(Command::DbSize),
5643 CommandResponse::Integer(0)
5644 ));
5645 }
5646
5647 #[test]
5648 fn test_lfu_counter_starts_at_5() {
5649 let mut store = ShardStore::new(0);
5650 store.execute(Command::Set {
5651 key: b"k".to_vec(),
5652 value: b"v".to_vec(),
5653 ex: None,
5654 px: None,
5655 nx: false,
5656 xx: false,
5657 });
5658 match store.execute(Command::ObjectFreq { key: b"k".to_vec() }) {
5659 CommandResponse::Integer(n) => assert_eq!(n, 5),
5660 other => panic!("Expected Integer(5), got {:?}", other),
5661 }
5662 }
5663
5664 #[test]
5665 fn test_lfu_counter_increments_on_access() {
5666 let mut store = ShardStore::new(0);
5667 store.execute(Command::Set {
5668 key: b"k".to_vec(),
5669 value: b"v".to_vec(),
5670 ex: None,
5671 px: None,
5672 nx: false,
5673 xx: false,
5674 });
5675 for _ in 0..100 {
5676 store.execute(Command::Get { key: b"k".to_vec() });
5677 }
5678 match store.execute(Command::ObjectFreq { key: b"k".to_vec() }) {
5679 CommandResponse::Integer(n) => assert!(n >= 5, "LFU counter should be >= 5, got {}", n),
5680 other => panic!("Expected Integer, got {:?}", other),
5681 }
5682 }
5683
5684 #[test]
5685 fn test_eviction_removes_least_frequent_key() {
5686 let mut store = ShardStore::new(0);
5687 store.set_max_memory(1);
5688
5689 store.execute(Command::Set {
5690 key: b"cold".to_vec(),
5691 value: b"val".to_vec(),
5692 ex: None,
5693 px: None,
5694 nx: false,
5695 xx: false,
5696 });
5697
5698 for _ in 0..50 {
5699 store.execute(Command::Get {
5700 key: b"cold".to_vec(),
5701 });
5702 }
5703
5704 store.execute(Command::Set {
5705 key: b"hot".to_vec(),
5706 value: b"val".to_vec(),
5707 ex: None,
5708 px: None,
5709 nx: false,
5710 xx: false,
5711 });
5712
5713 for _ in 0..200 {
5714 store.execute(Command::Get {
5715 key: b"hot".to_vec(),
5716 });
5717 }
5718
5719 store.execute(Command::Set {
5720 key: b"trigger".to_vec(),
5721 value: b"val".to_vec(),
5722 ex: None,
5723 px: None,
5724 nx: false,
5725 xx: false,
5726 });
5727
5728 assert!(
5729 store.len() < 4,
5730 "Eviction should have removed at least one key"
5731 );
5732 }
5733
5734 #[test]
5735 fn test_object_freq_command() {
5736 let mut store = ShardStore::new(0);
5737 assert!(matches!(
5738 store.execute(Command::ObjectFreq {
5739 key: b"missing".to_vec()
5740 }),
5741 CommandResponse::Nil
5742 ));
5743 store.execute(Command::Set {
5744 key: b"k".to_vec(),
5745 value: b"v".to_vec(),
5746 ex: None,
5747 px: None,
5748 nx: false,
5749 xx: false,
5750 });
5751 match store.execute(Command::ObjectFreq { key: b"k".to_vec() }) {
5752 CommandResponse::Integer(n) => assert!(n >= 0),
5753 other => panic!("Expected Integer, got {:?}", other),
5754 }
5755 }
5756
5757 #[test]
5758 fn test_object_encoding_command() {
5759 let mut store = ShardStore::new(0);
5760 store.execute(Command::Set {
5761 key: b"k".to_vec(),
5762 value: b"v".to_vec(),
5763 ex: None,
5764 px: None,
5765 nx: false,
5766 xx: false,
5767 });
5768 match store.execute(Command::ObjectEncoding { key: b"k".to_vec() }) {
5769 CommandResponse::BulkString(v) => {
5770 let encoding = String::from_utf8(v).unwrap_or_default();
5771 assert!(
5772 encoding == "embstr" || encoding == "raw" || encoding == "int",
5773 "Unexpected encoding: {}",
5774 encoding
5775 );
5776 }
5777 other => panic!("Expected BulkString, got {:?}", other),
5778 }
5779 }
5780
5781 #[test]
5782 fn test_zadd_and_zscore() {
5783 let mut store = ShardStore::new(0);
5784 let resp = store.execute(Command::ZAdd {
5785 key: b"zs".to_vec(),
5786 members: vec![
5787 (1.0, b"alice".to_vec()),
5788 (2.0, b"bob".to_vec()),
5789 (3.0, b"charlie".to_vec()),
5790 ],
5791 });
5792 assert_eq!(resp, CommandResponse::Integer(3));
5793 match store.execute(Command::ZScore {
5794 key: b"zs".to_vec(),
5795 member: b"bob".to_vec(),
5796 }) {
5797 CommandResponse::BulkString(v) => assert_eq!(v, b"2"),
5798 other => panic!("Expected BulkString, got {:?}", other),
5799 }
5800 assert!(matches!(
5801 store.execute(Command::ZScore {
5802 key: b"zs".to_vec(),
5803 member: b"unknown".to_vec(),
5804 }),
5805 CommandResponse::Nil
5806 ));
5807 }
5808
5809 #[test]
5810 fn test_zadd_update_score() {
5811 let mut store = ShardStore::new(0);
5812 store.execute(Command::ZAdd {
5813 key: b"zs".to_vec(),
5814 members: vec![(1.0, b"alice".to_vec())],
5815 });
5816 let resp = store.execute(Command::ZAdd {
5817 key: b"zs".to_vec(),
5818 members: vec![(5.0, b"alice".to_vec())],
5819 });
5820 assert_eq!(resp, CommandResponse::Integer(0));
5821 match store.execute(Command::ZScore {
5822 key: b"zs".to_vec(),
5823 member: b"alice".to_vec(),
5824 }) {
5825 CommandResponse::BulkString(v) => assert_eq!(v, b"5"),
5826 other => panic!("Expected BulkString(5), got {:?}", other),
5827 }
5828 }
5829
5830 #[test]
5831 fn test_zrem() {
5832 let mut store = ShardStore::new(0);
5833 store.execute(Command::ZAdd {
5834 key: b"zs".to_vec(),
5835 members: vec![
5836 (1.0, b"a".to_vec()),
5837 (2.0, b"b".to_vec()),
5838 (3.0, b"c".to_vec()),
5839 ],
5840 });
5841 let resp = store.execute(Command::ZRem {
5842 key: b"zs".to_vec(),
5843 members: vec![b"a".to_vec(), b"c".to_vec(), b"nonexistent".to_vec()],
5844 });
5845 assert_eq!(resp, CommandResponse::Integer(2));
5846 assert_eq!(
5847 store.execute(Command::ZCard {
5848 key: b"zs".to_vec()
5849 }),
5850 CommandResponse::Integer(1)
5851 );
5852 }
5853
5854 #[test]
5855 fn test_zrank_and_zrevrank() {
5856 let mut store = ShardStore::new(0);
5857 store.execute(Command::ZAdd {
5858 key: b"zs".to_vec(),
5859 members: vec![
5860 (10.0, b"a".to_vec()),
5861 (20.0, b"b".to_vec()),
5862 (30.0, b"c".to_vec()),
5863 ],
5864 });
5865 assert_eq!(
5866 store.execute(Command::ZRank {
5867 key: b"zs".to_vec(),
5868 member: b"a".to_vec(),
5869 }),
5870 CommandResponse::Integer(0)
5871 );
5872 assert_eq!(
5873 store.execute(Command::ZRank {
5874 key: b"zs".to_vec(),
5875 member: b"c".to_vec(),
5876 }),
5877 CommandResponse::Integer(2)
5878 );
5879 assert_eq!(
5880 store.execute(Command::ZRevRank {
5881 key: b"zs".to_vec(),
5882 member: b"c".to_vec(),
5883 }),
5884 CommandResponse::Integer(0)
5885 );
5886 assert!(matches!(
5887 store.execute(Command::ZRank {
5888 key: b"zs".to_vec(),
5889 member: b"missing".to_vec(),
5890 }),
5891 CommandResponse::Nil
5892 ));
5893 }
5894
5895 #[test]
5896 fn test_zrange_and_zrevrange() {
5897 let mut store = ShardStore::new(0);
5898 store.execute(Command::ZAdd {
5899 key: b"zs".to_vec(),
5900 members: vec![
5901 (1.0, b"a".to_vec()),
5902 (2.0, b"b".to_vec()),
5903 (3.0, b"c".to_vec()),
5904 ],
5905 });
5906 match store.execute(Command::ZRange {
5907 key: b"zs".to_vec(),
5908 start: 0,
5909 stop: -1,
5910 withscores: false,
5911 }) {
5912 CommandResponse::Array(items) => {
5913 assert_eq!(items.len(), 3);
5914 assert_eq!(items[0], CommandResponse::BulkString(b"a".to_vec()));
5915 assert_eq!(items[2], CommandResponse::BulkString(b"c".to_vec()));
5916 }
5917 other => panic!("Expected Array, got {:?}", other),
5918 }
5919 match store.execute(Command::ZRevRange {
5920 key: b"zs".to_vec(),
5921 start: 0,
5922 stop: 1,
5923 withscores: true,
5924 }) {
5925 CommandResponse::Array(items) => {
5926 assert_eq!(items.len(), 4);
5927 assert_eq!(items[0], CommandResponse::BulkString(b"c".to_vec()));
5928 assert_eq!(items[1], CommandResponse::BulkString(b"3".to_vec()));
5929 assert_eq!(items[2], CommandResponse::BulkString(b"b".to_vec()));
5930 }
5931 other => panic!("Expected Array, got {:?}", other),
5932 }
5933 }
5934
5935 #[test]
5936 fn test_zrangebyscore() {
5937 let mut store = ShardStore::new(0);
5938 store.execute(Command::ZAdd {
5939 key: b"zs".to_vec(),
5940 members: vec![
5941 (1.0, b"a".to_vec()),
5942 (2.0, b"b".to_vec()),
5943 (3.0, b"c".to_vec()),
5944 (4.0, b"d".to_vec()),
5945 ],
5946 });
5947 match store.execute(Command::ZRangeByScore {
5948 key: b"zs".to_vec(),
5949 min: 2.0,
5950 max: 3.0,
5951 withscores: false,
5952 offset: None,
5953 count: None,
5954 }) {
5955 CommandResponse::Array(items) => {
5956 assert_eq!(items.len(), 2);
5957 assert_eq!(items[0], CommandResponse::BulkString(b"b".to_vec()));
5958 assert_eq!(items[1], CommandResponse::BulkString(b"c".to_vec()));
5959 }
5960 other => panic!("Expected Array, got {:?}", other),
5961 }
5962 match store.execute(Command::ZRangeByScore {
5963 key: b"zs".to_vec(),
5964 min: 1.0,
5965 max: 4.0,
5966 withscores: false,
5967 offset: Some(1),
5968 count: Some(2),
5969 }) {
5970 CommandResponse::Array(items) => {
5971 assert_eq!(items.len(), 2);
5972 assert_eq!(items[0], CommandResponse::BulkString(b"b".to_vec()));
5973 assert_eq!(items[1], CommandResponse::BulkString(b"c".to_vec()));
5974 }
5975 other => panic!("Expected Array, got {:?}", other),
5976 }
5977 }
5978
5979 #[test]
5980 fn test_zincrby() {
5981 let mut store = ShardStore::new(0);
5982 store.execute(Command::ZAdd {
5983 key: b"zs".to_vec(),
5984 members: vec![(10.0, b"a".to_vec())],
5985 });
5986 match store.execute(Command::ZIncrBy {
5987 key: b"zs".to_vec(),
5988 delta: 5.0,
5989 member: b"a".to_vec(),
5990 }) {
5991 CommandResponse::BulkString(v) => assert_eq!(v, b"15"),
5992 other => panic!("Expected BulkString, got {:?}", other),
5993 }
5994 match store.execute(Command::ZIncrBy {
5995 key: b"zs".to_vec(),
5996 delta: 3.0,
5997 member: b"newmember".to_vec(),
5998 }) {
5999 CommandResponse::BulkString(v) => assert_eq!(v, b"3"),
6000 other => panic!("Expected BulkString, got {:?}", other),
6001 }
6002 }
6003
6004 #[test]
6005 fn test_zcount() {
6006 let mut store = ShardStore::new(0);
6007 store.execute(Command::ZAdd {
6008 key: b"zs".to_vec(),
6009 members: vec![
6010 (1.0, b"a".to_vec()),
6011 (2.0, b"b".to_vec()),
6012 (3.0, b"c".to_vec()),
6013 (4.0, b"d".to_vec()),
6014 ],
6015 });
6016 assert_eq!(
6017 store.execute(Command::ZCount {
6018 key: b"zs".to_vec(),
6019 min: 2.0,
6020 max: 3.0,
6021 }),
6022 CommandResponse::Integer(2)
6023 );
6024 assert_eq!(
6025 store.execute(Command::ZCount {
6026 key: b"zs".to_vec(),
6027 min: f64::NEG_INFINITY,
6028 max: f64::INFINITY,
6029 }),
6030 CommandResponse::Integer(4)
6031 );
6032 }
6033
6034 #[test]
6035 fn test_zcard() {
6036 let mut store = ShardStore::new(0);
6037 assert_eq!(
6038 store.execute(Command::ZCard {
6039 key: b"zs".to_vec()
6040 }),
6041 CommandResponse::Integer(0)
6042 );
6043 store.execute(Command::ZAdd {
6044 key: b"zs".to_vec(),
6045 members: vec![(1.0, b"a".to_vec()), (2.0, b"b".to_vec())],
6046 });
6047 assert_eq!(
6048 store.execute(Command::ZCard {
6049 key: b"zs".to_vec()
6050 }),
6051 CommandResponse::Integer(2)
6052 );
6053 }
6054}