Skip to main content

kora_core/shard/
store.rs

1//! Per-shard key-value store with full command execution.
2//!
3//! `ShardStore` is the workhorse of the Kōra engine. Each shard worker thread
4//! owns exactly one instance, so every operation runs single-threaded with no
5//! locking. The store handles:
6//!
7//! - String, list, hash, set, sorted set, stream, HyperLogLog, bitmap, and
8//!   geo commands.
9//! - Lazy TTL expiration on access plus periodic bounded-sample sweeps.
10//! - LFU-based eviction when a memory limit is configured.
11//! - Optional per-command statistics recording (behind the `observability`
12//!   feature gate).
13//! - Optional HNSW vector index operations (behind the `vector` feature gate).
14
15use std::collections::{hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque};
16use std::time::Duration;
17
18use ahash::AHashMap;
19
20use crate::command::{
21    command_docs_response, command_help_response, command_info_response, command_list_response,
22    supported_command_count, BitFieldEncoding, BitFieldOffset, BitFieldOperation, BitFieldOverflow,
23    BitOperation, Command, CommandResponse, GeoUnit,
24};
25use crate::types::{
26    CompactKey, KeyEntry, PendingEntry, StreamConsumerGroup, StreamEntry, StreamId, StreamLog,
27    Value,
28};
29
30mod hash_commands;
31mod list_commands;
32
33#[cfg(feature = "observability")]
34use kora_observability::stats::ShardStats;
35
36#[cfg(feature = "vector")]
37use kora_vector::distance::DistanceMetric;
38#[cfg(feature = "vector")]
39use kora_vector::hnsw::HnswIndex;
40
41/// A single shard's key-value store.
42///
43/// Each worker thread owns exactly one `ShardStore`. All operations on it
44/// are single-threaded — no locking required.
45pub struct ShardStore {
46    entries: AHashMap<CompactKey, KeyEntry>,
47    shard_id: u16,
48    max_memory: usize,
49    memory_used: usize,
50    eviction_counter: u64,
51    expire_scan_cursor: usize,
52    #[cfg(feature = "observability")]
53    stats: ShardStats,
54    #[cfg(feature = "observability")]
55    stats_enabled: bool,
56    #[cfg(feature = "vector")]
57    vector_indexes: AHashMap<CompactKey, HnswIndex>,
58    stream_groups: AHashMap<CompactKey, AHashMap<String, StreamConsumerGroup>>,
59}
60
61impl ShardStore {
62    /// Create a new empty shard store.
63    pub fn new(shard_id: u16) -> Self {
64        Self {
65            entries: AHashMap::new(),
66            shard_id,
67            max_memory: 0,
68            memory_used: 0,
69            eviction_counter: 0,
70            expire_scan_cursor: 0,
71            #[cfg(feature = "observability")]
72            stats: ShardStats::new(),
73            #[cfg(feature = "observability")]
74            stats_enabled: false,
75            #[cfg(feature = "vector")]
76            vector_indexes: AHashMap::new(),
77            stream_groups: AHashMap::new(),
78        }
79    }
80
81    /// Get a reference to the shard stats (observability feature).
82    #[cfg(feature = "observability")]
83    pub fn stats(&self) -> &ShardStats {
84        &self.stats
85    }
86
87    /// Enable or disable per-command statistics recording.
88    #[cfg(feature = "observability")]
89    pub fn set_stats_enabled(&mut self, enabled: bool) {
90        self.stats_enabled = enabled;
91    }
92
93    /// Get the shard ID.
94    pub fn shard_id(&self) -> u16 {
95        self.shard_id
96    }
97
98    /// Get the number of keys in this shard.
99    pub fn len(&self) -> usize {
100        self.entries.len()
101    }
102
103    /// Check if the shard is empty.
104    pub fn is_empty(&self) -> bool {
105        self.entries.is_empty()
106    }
107
108    /// Set the maximum memory limit for this shard (0 = unlimited).
109    pub fn set_max_memory(&mut self, bytes: usize) {
110        self.max_memory = bytes;
111    }
112
113    /// Get the maximum memory limit for this shard.
114    pub fn max_memory(&self) -> usize {
115        self.max_memory
116    }
117
118    /// Iterate over all entries in this shard.
119    pub fn entries_iter(&self) -> impl Iterator<Item = (&CompactKey, &KeyEntry)> {
120        self.entries.iter()
121    }
122
123    /// Get a reference to a key entry.
124    pub fn get_entry(&self, key: &CompactKey) -> Option<&KeyEntry> {
125        self.entries.get(key)
126    }
127
128    /// Execute a GET operation against a borrowed key.
129    pub fn get_bytes(&mut self, key: &[u8]) -> CommandResponse {
130        self.cmd_get(key)
131    }
132
133    /// Execute a SET-like operation against borrowed key and value bytes.
134    pub fn set_bytes(
135        &mut self,
136        key: &[u8],
137        value: &[u8],
138        ex: Option<u64>,
139        px: Option<u64>,
140        nx: bool,
141        xx: bool,
142    ) -> CommandResponse {
143        self.cmd_set(key, value, ex, px, nx, xx)
144    }
145
146    /// Execute INCRBY/DECRBY style mutation against a borrowed key.
147    pub fn incr_by_bytes(&mut self, key: &[u8], delta: i64) -> CommandResponse {
148        self.cmd_incrby(key, delta)
149    }
150
151    /// Get a mutable reference to a key entry.
152    pub fn get_entry_mut(&mut self, key: &CompactKey) -> Option<&mut KeyEntry> {
153        self.entries.get_mut(key)
154    }
155
156    /// Insert a key entry directly (used by migration and restore paths).
157    pub fn insert_entry(&mut self, key: CompactKey, entry: KeyEntry) {
158        let size = Self::estimate_key_entry_size(key.as_bytes(), &entry.value);
159        self.memory_used += size;
160        self.entries.insert(key, entry);
161    }
162
163    /// Replace a key's value with a tier reference after demotion.
164    pub fn mark_demoted(&mut self, key: &CompactKey, tier: u8, ref_hash: u64) {
165        if let Some(entry) = self.entries.get_mut(key) {
166            let old_size = entry.value.estimated_size();
167            entry.value = match tier {
168                crate::types::TIER_WARM => Value::WarmRef(ref_hash),
169                _ => Value::ColdRef(ref_hash),
170            };
171            let new_size = entry.value.estimated_size();
172            if old_size > new_size {
173                self.memory_used = self.memory_used.saturating_sub(old_size - new_size);
174            }
175            entry.set_tier(tier);
176        }
177    }
178
179    /// Promote a key back to the hot tier with the given value.
180    pub fn promote(&mut self, key: &CompactKey, value: Value) {
181        if let Some(entry) = self.entries.get_mut(key) {
182            let old_size = entry.value.estimated_size();
183            let new_size = value.estimated_size();
184            entry.value = value;
185            entry.set_tier(crate::types::TIER_HOT);
186            entry.lfu_counter = 5;
187            if new_size > old_size {
188                let delta = new_size - old_size;
189                self.memory_used += delta;
190            } else {
191                let delta = old_size - new_size;
192                self.memory_used = self.memory_used.saturating_sub(delta);
193            }
194        }
195    }
196
197    /// Remove all expired keys, returning the count removed.
198    pub fn evict_expired(&mut self) -> usize {
199        let expired_keys: Vec<CompactKey> = self
200            .entries
201            .iter()
202            .filter(|(_, entry)| entry.is_expired())
203            .map(|(key, _)| key.clone())
204            .collect();
205        for key in &expired_keys {
206            let _ = self.remove_compact_entry(key);
207        }
208        expired_keys.len()
209    }
210
211    /// Remove expired keys by scanning a bounded sample of entries.
212    ///
213    /// This avoids latency spikes from full-map sweeps on the hot path.
214    pub fn evict_expired_sample(&mut self, sample_size: usize) -> usize {
215        if sample_size == 0 || self.entries.is_empty() {
216            return 0;
217        }
218
219        let len = self.entries.len();
220        let start = self.expire_scan_cursor % len;
221        let mut examined = 0usize;
222        let mut expired_keys = Vec::new();
223
224        for (key, entry) in self.entries.iter().skip(start) {
225            if examined >= sample_size {
226                break;
227            }
228            if entry.is_expired() {
229                expired_keys.push(key.clone());
230            }
231            examined += 1;
232        }
233
234        if examined < sample_size {
235            for (key, entry) in self.entries.iter().take(start) {
236                if examined >= sample_size {
237                    break;
238                }
239                if entry.is_expired() {
240                    expired_keys.push(key.clone());
241                }
242                examined += 1;
243            }
244        }
245
246        for key in &expired_keys {
247            let _ = self.remove_compact_entry(key);
248        }
249        self.expire_scan_cursor = self.expire_scan_cursor.wrapping_add(examined);
250
251        expired_keys.len()
252    }
253
254    /// Remove all keys from this shard.
255    pub fn flush(&mut self) {
256        self.entries.clear();
257        self.memory_used = 0;
258        self.expire_scan_cursor = 0;
259        self.stream_groups.clear();
260    }
261
262    /// Execute a command on this shard and return the response.
263    pub fn execute(&mut self, cmd: Command) -> CommandResponse {
264        #[cfg(feature = "observability")]
265        let track_stats = self.stats_enabled;
266        #[cfg(feature = "observability")]
267        let start = if track_stats {
268            Some(std::time::Instant::now())
269        } else {
270            None
271        };
272        #[cfg(feature = "observability")]
273        let cmd_type = if track_stats {
274            Some(cmd.cmd_type() as usize)
275        } else {
276            None
277        };
278
279        if cmd.is_mutation() {
280            self.maybe_evict();
281        }
282
283        // LFU touches are only needed when eviction is active (max_memory > 0).
284        let should_touch_lfu = !cmd.is_mutation() && self.max_memory > 0;
285        let cmd_key = if should_touch_lfu {
286            cmd.key().map(CompactKey::new)
287        } else {
288            None
289        };
290
291        let response = self.execute_inner(cmd);
292
293        if let Some(key) = cmd_key {
294            self.touch_key(&key);
295        }
296
297        #[cfg(feature = "observability")]
298        {
299            if let (Some(start), Some(cmd_type)) = (start, cmd_type) {
300                let duration_ns = start.elapsed().as_nanos() as u64;
301                self.stats.record_command(cmd_type, duration_ns);
302                self.stats.set_key_count(self.entries.len() as u64);
303            }
304        }
305
306        response
307    }
308
309    fn execute_inner(&mut self, cmd: Command) -> CommandResponse {
310        match cmd {
311            Command::Get { key } => self.cmd_get(&key),
312            Command::Set {
313                key,
314                value,
315                ex,
316                px,
317                nx,
318                xx,
319            } => self.cmd_set(&key, &value, ex, px, nx, xx),
320            Command::GetSet { key, value } => self.cmd_getset(&key, &value),
321            Command::Append { key, value } => self.cmd_append(&key, &value),
322            Command::Strlen { key } => self.cmd_strlen(&key),
323            Command::Incr { key } => self.cmd_incrby(&key, 1),
324            Command::Decr { key } => self.cmd_incrby(&key, -1),
325            Command::IncrBy { key, delta } => self.cmd_incrby(&key, delta),
326            Command::DecrBy { key, delta } => self.cmd_incrby(&key, -delta),
327            Command::SetNx { key, value } => {
328                match self.cmd_set(&key, &value, None, None, true, false) {
329                    CommandResponse::Ok => CommandResponse::Integer(1),
330                    CommandResponse::Nil => CommandResponse::Integer(0),
331                    other => other,
332                }
333            }
334            Command::IncrByFloat { key, delta } => self.cmd_incrbyfloat(&key, delta),
335            Command::GetRange { key, start, end } => self.cmd_getrange(&key, start, end),
336            Command::SetRange { key, offset, value } => self.cmd_setrange(&key, offset, &value),
337            Command::GetDel { key } => self.cmd_getdel(&key),
338            Command::GetEx {
339                key,
340                ex,
341                px,
342                exat,
343                pxat,
344                persist,
345            } => self.cmd_getex(&key, ex, px, exat, pxat, persist),
346            Command::MSetNx { entries } => self.cmd_msetnx(&entries),
347
348            Command::Del { keys } => {
349                let count = keys.iter().filter(|k| self.del(k)).count();
350                CommandResponse::Integer(count as i64)
351            }
352            Command::Exists { keys } => {
353                let count = keys.iter().filter(|k| self.exists(k)).count();
354                CommandResponse::Integer(count as i64)
355            }
356            Command::Expire { key, seconds } => self.cmd_expire(&key, Duration::from_secs(seconds)),
357            Command::PExpire { key, millis } => {
358                self.cmd_expire(&key, Duration::from_millis(millis))
359            }
360            Command::Persist { key } => self.cmd_persist(&key),
361            Command::Ttl { key } => self.cmd_ttl(&key, false),
362            Command::PTtl { key } => self.cmd_ttl(&key, true),
363            Command::Type { key } => self.cmd_type(&key),
364            Command::Keys { pattern } => self.cmd_keys(&pattern),
365            Command::Scan {
366                cursor,
367                pattern,
368                count,
369            } => self.cmd_scan(cursor, pattern.as_deref(), count.unwrap_or(10)),
370            Command::ExpireAt { key, timestamp } => self.cmd_expireat(&key, timestamp, false),
371            Command::PExpireAt { key, timestamp_ms } => self.cmd_expireat(&key, timestamp_ms, true),
372            Command::Rename { key, newkey } => self.cmd_rename(&key, &newkey, false),
373            Command::RenameNx { key, newkey } => self.cmd_rename(&key, &newkey, true),
374            Command::Unlink { keys } => {
375                let count = keys.iter().filter(|k| self.del(k)).count();
376                CommandResponse::Integer(count as i64)
377            }
378            Command::Copy {
379                source,
380                destination,
381                replace,
382            } => self.cmd_copy(&source, &destination, replace),
383            Command::RandomKey => self.cmd_randomkey(),
384            Command::Touch { keys } => {
385                let count = keys.iter().filter(|k| self.exists(k)).count();
386                CommandResponse::Integer(count as i64)
387            }
388            Command::ObjectRefCount { ref key } => {
389                let compact = CompactKey::new(key);
390                match self.entries.get(&compact) {
391                    Some(entry) if !entry.is_expired() => CommandResponse::Integer(1),
392                    _ => CommandResponse::Nil,
393                }
394            }
395            Command::ObjectIdleTime { ref key } => {
396                let compact = CompactKey::new(key);
397                match self.entries.get(&compact) {
398                    Some(entry) if !entry.is_expired() => CommandResponse::Integer(0),
399                    _ => CommandResponse::Nil,
400                }
401            }
402            Command::ObjectHelp => CommandResponse::Array(vec![
403                CommandResponse::BulkString(b"OBJECT subcommand [arguments]".to_vec()),
404                CommandResponse::BulkString(
405                    b"ENCODING <key> - Return the encoding of a key.".to_vec(),
406                ),
407                CommandResponse::BulkString(
408                    b"FREQ <key> - Return the access frequency of a key.".to_vec(),
409                ),
410                CommandResponse::BulkString(b"HELP - Return this help message.".to_vec()),
411                CommandResponse::BulkString(
412                    b"IDLETIME <key> - Return the idle time of a key.".to_vec(),
413                ),
414                CommandResponse::BulkString(
415                    b"REFCOUNT <key> - Return the reference count of a key.".to_vec(),
416                ),
417            ]),
418            Command::DbSize => CommandResponse::Integer(self.entries.len() as i64),
419            Command::FlushDb | Command::FlushAll => {
420                self.flush();
421                CommandResponse::Ok
422            }
423
424            Command::LPush { key, values } => self.cmd_lpush(&key, &values),
425            Command::RPush { key, values } => self.cmd_rpush(&key, &values),
426            Command::LPop { key } => self.cmd_lpop(&key),
427            Command::RPop { key } => self.cmd_rpop(&key),
428            Command::LLen { key } => self.cmd_llen(&key),
429            Command::LRange { key, start, stop } => self.cmd_lrange(&key, start, stop),
430            Command::LIndex { key, index } => self.cmd_lindex(&key, index),
431            Command::LSet { key, index, value } => self.cmd_lset(&key, index, &value),
432            Command::LInsert {
433                key,
434                before,
435                pivot,
436                value,
437            } => self.cmd_linsert(&key, before, &pivot, &value),
438            Command::LRem { key, count, value } => self.cmd_lrem(&key, count, &value),
439            Command::LTrim { key, start, stop } => self.cmd_ltrim(&key, start, stop),
440            Command::LPos {
441                key,
442                value,
443                rank,
444                count,
445                maxlen,
446            } => self.cmd_lpos(&key, &value, rank, count, maxlen),
447            Command::RPopLPush {
448                source,
449                destination,
450            } => self.cmd_rpoplpush(&source, &destination),
451            Command::LMove {
452                source,
453                destination,
454                from_left,
455                to_left,
456            } => self.cmd_lmove(&source, &destination, from_left, to_left),
457
458            Command::HSet { key, fields } => self.cmd_hset(&key, &fields),
459            Command::HGet { key, field } => self.cmd_hget(&key, &field),
460            Command::HDel { key, fields } => self.cmd_hdel(&key, &fields),
461            Command::HGetAll { key } => self.cmd_hgetall(&key),
462            Command::HLen { key } => self.cmd_hlen(&key),
463            Command::HExists { key, field } => self.cmd_hexists(&key, &field),
464            Command::HIncrBy { key, field, delta } => self.cmd_hincrby(&key, &field, delta),
465            Command::HMGet { key, fields } => self.cmd_hmget(&key, &fields),
466            Command::HKeys { key } => self.cmd_hkeys(&key),
467            Command::HVals { key } => self.cmd_hvals(&key),
468            Command::HSetNx { key, field, value } => self.cmd_hsetnx(&key, &field, &value),
469            Command::HIncrByFloat { key, field, delta } => {
470                self.cmd_hincrbyfloat(&key, &field, delta)
471            }
472            Command::HRandField {
473                key,
474                count,
475                withvalues,
476            } => self.cmd_hrandfield(&key, count, withvalues),
477            Command::HScan {
478                key,
479                cursor,
480                pattern,
481                count,
482            } => self.cmd_hscan(&key, cursor, pattern.as_deref(), count.unwrap_or(10)),
483
484            Command::SAdd { key, members } => self.cmd_sadd(&key, &members),
485            Command::SRem { key, members } => self.cmd_srem(&key, &members),
486            Command::SMembers { key } => self.cmd_smembers(&key),
487            Command::SIsMember { key, member } => self.cmd_sismember(&key, &member),
488            Command::SCard { key } => self.cmd_scard(&key),
489            Command::SPop { key, count } => self.cmd_spop(&key, count),
490            Command::SRandMember { key, count } => self.cmd_srandmember(&key, count),
491            Command::SUnion { keys } => self.cmd_sunion(&keys),
492            Command::SUnionStore { destination, keys } => self.cmd_sunionstore(&destination, &keys),
493            Command::SInter { keys } => self.cmd_sinter(&keys),
494            Command::SInterStore { destination, keys } => self.cmd_sinterstore(&destination, &keys),
495            Command::SDiff { keys } => self.cmd_sdiff(&keys),
496            Command::SDiffStore { destination, keys } => self.cmd_sdiffstore(&destination, &keys),
497            Command::SInterCard {
498                numkeys: _,
499                keys,
500                limit,
501            } => self.cmd_sintercard(&keys, limit),
502            Command::SMove {
503                source,
504                destination,
505                member,
506            } => self.cmd_smove(&source, &destination, &member),
507            Command::SMisMember { key, members } => self.cmd_smismember(&key, &members),
508            Command::SScan {
509                key,
510                cursor,
511                pattern,
512                count,
513            } => self.cmd_sscan(&key, cursor, pattern.as_deref(), count.unwrap_or(10)),
514
515            Command::ZAdd { key, members } => self.cmd_zadd(&key, &members),
516            Command::ZRem { key, members } => self.cmd_zrem(&key, &members),
517            Command::ZScore { key, member } => self.cmd_zscore(&key, &member),
518            Command::ZRank { key, member } => self.cmd_zrank(&key, &member, false),
519            Command::ZRevRank { key, member } => self.cmd_zrank(&key, &member, true),
520            Command::ZCard { key } => self.cmd_zcard(&key),
521            Command::ZRange {
522                key,
523                start,
524                stop,
525                withscores,
526            } => self.cmd_zrange(&key, start, stop, withscores, false),
527            Command::ZRevRange {
528                key,
529                start,
530                stop,
531                withscores,
532            } => self.cmd_zrange(&key, start, stop, withscores, true),
533            Command::ZRangeByScore {
534                key,
535                min,
536                max,
537                withscores,
538                offset,
539                count,
540            } => self.cmd_zrangebyscore(&key, min, max, withscores, offset, count),
541            Command::ZIncrBy { key, delta, member } => self.cmd_zincrby(&key, delta, &member),
542            Command::ZCount { key, min, max } => self.cmd_zcount(&key, min, max),
543            Command::ZRevRangeByScore {
544                key,
545                max,
546                min,
547                withscores,
548                offset,
549                count,
550            } => self.cmd_zrevrangebyscore(&key, max, min, withscores, offset, count),
551            Command::ZPopMin { key, count } => self.cmd_zpopmin(&key, count),
552            Command::ZPopMax { key, count } => self.cmd_zpopmax(&key, count),
553            Command::ZRangeByLex {
554                key,
555                min,
556                max,
557                offset,
558                count,
559            } => self.cmd_zrangebylex(&key, &min, &max, offset, count, false),
560            Command::ZRevRangeByLex {
561                key,
562                max,
563                min,
564                offset,
565                count,
566            } => self.cmd_zrangebylex(&key, &min, &max, offset, count, true),
567            Command::ZLexCount { key, min, max } => self.cmd_zlexcount(&key, &min, &max),
568            Command::ZMScore { key, members } => self.cmd_zmscore(&key, &members),
569            Command::ZRandMember {
570                key,
571                count,
572                withscores,
573            } => self.cmd_zrandmember(&key, count, withscores),
574            Command::ZScan {
575                key,
576                cursor,
577                pattern,
578                count,
579            } => self.cmd_zscan(&key, cursor, pattern.as_deref(), count.unwrap_or(10)),
580
581            Command::XAdd {
582                key,
583                id,
584                fields,
585                maxlen,
586            } => self.cmd_xadd(&key, &id, &fields, maxlen),
587            Command::XLen { key } => self.cmd_xlen(&key),
588            Command::XRange {
589                key,
590                start,
591                end,
592                count,
593            } => self.cmd_xrange(&key, &start, &end, count),
594            Command::XRevRange {
595                key,
596                start,
597                end,
598                count,
599            } => self.cmd_xrevrange(&key, &start, &end, count),
600            Command::XRead { keys, ids, count } => self.cmd_xread(&keys, &ids, count),
601            Command::XTrim { key, maxlen } => self.cmd_xtrim(&key, maxlen),
602            Command::XDel { key, ids } => self.cmd_xdel(&key, &ids),
603            Command::XGroupCreate {
604                key,
605                group,
606                id,
607                mkstream,
608            } => self.cmd_xgroup_create(&key, &group, &id, mkstream),
609            Command::XGroupDestroy { key, group } => self.cmd_xgroup_destroy(&key, &group),
610            Command::XGroupDelConsumer {
611                key,
612                group,
613                consumer,
614            } => self.cmd_xgroup_delconsumer(&key, &group, &consumer),
615            Command::XReadGroup {
616                group,
617                consumer,
618                count,
619                keys,
620                ids,
621            } => self.cmd_xreadgroup(&group, &consumer, count, &keys, &ids),
622            Command::XAck { key, group, ids } => self.cmd_xack(&key, &group, &ids),
623            Command::XPending {
624                key,
625                group,
626                start,
627                end,
628                count,
629            } => self.cmd_xpending(&key, &group, start.as_deref(), end.as_deref(), count),
630            Command::XClaim {
631                key,
632                group,
633                consumer,
634                min_idle_time,
635                ids,
636            } => self.cmd_xclaim(&key, &group, &consumer, min_idle_time, &ids),
637            Command::XAutoClaim {
638                key,
639                group,
640                consumer,
641                min_idle_time,
642                start,
643                count,
644            } => self.cmd_xautoclaim(&key, &group, &consumer, min_idle_time, &start, count),
645            Command::XInfoStream { key } => self.cmd_xinfo_stream(&key),
646            Command::XInfoGroups { key } => self.cmd_xinfo_groups(&key),
647
648            Command::BLPop { keys, .. } => {
649                for key in &keys {
650                    let resp = self.cmd_lpop(key);
651                    if !matches!(resp, CommandResponse::Nil) {
652                        return CommandResponse::Array(vec![
653                            CommandResponse::BulkString(key.clone()),
654                            resp,
655                        ]);
656                    }
657                }
658                CommandResponse::Nil
659            }
660            Command::BRPop { keys, .. } => {
661                for key in &keys {
662                    let resp = self.cmd_rpop(key);
663                    if !matches!(resp, CommandResponse::Nil) {
664                        return CommandResponse::Array(vec![
665                            CommandResponse::BulkString(key.clone()),
666                            resp,
667                        ]);
668                    }
669                }
670                CommandResponse::Nil
671            }
672            Command::BLMove {
673                source,
674                destination,
675                from_left,
676                to_left,
677                ..
678            } => self.cmd_lmove(&source, &destination, from_left, to_left),
679            Command::BZPopMin { keys, .. } => {
680                for key in &keys {
681                    let resp = self.cmd_zpopmin(key, Some(1));
682                    if let CommandResponse::Array(ref items) = resp {
683                        if !items.is_empty() {
684                            return CommandResponse::Array(vec![
685                                CommandResponse::BulkString(key.clone()),
686                                resp,
687                            ]);
688                        }
689                    }
690                }
691                CommandResponse::Nil
692            }
693            Command::BZPopMax { keys, .. } => {
694                for key in &keys {
695                    let resp = self.cmd_zpopmax(key, Some(1));
696                    if let CommandResponse::Array(ref items) = resp {
697                        if !items.is_empty() {
698                            return CommandResponse::Array(vec![
699                                CommandResponse::BulkString(key.clone()),
700                                resp,
701                            ]);
702                        }
703                    }
704                }
705                CommandResponse::Nil
706            }
707
708            Command::Ping { message } => match message {
709                Some(msg) => CommandResponse::BulkString(msg),
710                None => CommandResponse::SimpleString("PONG".to_string()),
711            },
712            Command::Echo { message } => CommandResponse::BulkString(message),
713            Command::Info { .. } => CommandResponse::BulkString(
714                format!(
715                    "# Server\r\nkora_version:0.1.0\r\n# Keyspace\r\ndb0:keys={}\r\n",
716                    self.entries.len()
717                )
718                .into_bytes(),
719            ),
720            Command::CommandInfo { names } => command_info_response(&names),
721
722            Command::Dump => {
723                let entries: Vec<CommandResponse> = self
724                    .entries
725                    .iter()
726                    .filter(|(_, entry)| !entry.is_expired())
727                    .flat_map(|(key, entry)| {
728                        vec![
729                            CommandResponse::BulkString(key.as_bytes().to_vec()),
730                            CommandResponse::BulkString(entry.value.to_bytes()),
731                        ]
732                    })
733                    .collect();
734                CommandResponse::Array(entries)
735            }
736
737            Command::MGet { keys } => {
738                let results: Vec<CommandResponse> = keys.iter().map(|k| self.cmd_get(k)).collect();
739                CommandResponse::Array(results)
740            }
741            Command::MSet { entries } => {
742                for (k, v) in &entries {
743                    self.cmd_set(k, v, None, None, false, false);
744                }
745                CommandResponse::Ok
746            }
747
748            #[cfg(feature = "vector")]
749            Command::VecSet {
750                key,
751                dimensions,
752                vector,
753            } => self.cmd_vec_set(&key, dimensions, &vector),
754            #[cfg(feature = "vector")]
755            Command::VecQuery { key, k, vector } => self.cmd_vec_query(&key, k, &vector),
756            #[cfg(feature = "vector")]
757            Command::VecDel { key } => self.cmd_vec_del(&key),
758
759            Command::ObjectFreq { ref key } => {
760                let compact = CompactKey::new(key);
761                match self.entries.get(&compact) {
762                    Some(entry) if !entry.is_expired() => {
763                        CommandResponse::Integer(entry.lfu_counter as i64)
764                    }
765                    _ => CommandResponse::Nil,
766                }
767            }
768            Command::ObjectEncoding { ref key } => {
769                let compact = CompactKey::new(key);
770                match self.entries.get(&compact) {
771                    Some(entry) if !entry.is_expired() => {
772                        let encoding = match &entry.value {
773                            Value::InlineStr { .. } => "embstr",
774                            Value::HeapStr(_) => "raw",
775                            Value::Int(_) => "int",
776                            Value::List(_) => "linkedlist",
777                            Value::Hash(_) => "hashtable",
778                            Value::Set(_) => "hashtable",
779                            _ => "unknown",
780                        };
781                        CommandResponse::BulkString(encoding.as_bytes().to_vec())
782                    }
783                    _ => CommandResponse::Nil,
784                }
785            }
786            Command::StatsHotkeys { count } => self.cmd_stats_hotkeys(count),
787            Command::StatsMemory { prefixes } => self.cmd_stats_memory(&prefixes),
788
789            Command::PfAdd { key, elements } => self.cmd_pfadd(&key, &elements),
790            Command::PfCount { keys } => {
791                if keys.len() == 1 {
792                    self.cmd_pfcount_single(&keys[0])
793                } else {
794                    self.cmd_pfcount_multi(&keys)
795                }
796            }
797            Command::PfMerge {
798                destkey,
799                sourcekeys,
800            } => self.cmd_pfmerge(&destkey, &sourcekeys),
801
802            Command::SetBit { key, offset, value } => self.cmd_setbit(&key, offset, value),
803            Command::GetBit { key, offset } => self.cmd_getbit(&key, offset),
804            Command::BitCount {
805                key,
806                start,
807                end,
808                use_bit,
809            } => self.cmd_bitcount(&key, start, end, use_bit),
810            Command::BitOp {
811                operation,
812                destkey,
813                keys,
814            } => self.cmd_bitop(operation, &destkey, &keys),
815            Command::BitPos {
816                key,
817                bit,
818                start,
819                end,
820                use_bit,
821            } => self.cmd_bitpos(&key, bit, start, end, use_bit),
822            Command::BitField { key, operations } => self.cmd_bitfield(&key, &operations),
823
824            Command::GeoAdd {
825                key,
826                nx,
827                xx,
828                ch,
829                members,
830            } => self.cmd_geoadd(&key, nx, xx, ch, &members),
831            Command::GeoDist {
832                key,
833                member1,
834                member2,
835                unit,
836            } => self.cmd_geodist(&key, &member1, &member2, unit),
837            Command::GeoHash { key, members } => self.cmd_geohash(&key, &members),
838            Command::GeoPos { key, members } => self.cmd_geopos(&key, &members),
839            Command::GeoSearch {
840                key,
841                from_member,
842                from_lonlat,
843                radius,
844                unit,
845                asc,
846                count,
847                withcoord,
848                withdist,
849                withhash,
850            } => self.cmd_geosearch(
851                &key,
852                from_member.as_deref(),
853                from_lonlat,
854                radius,
855                unit,
856                asc,
857                count,
858                withcoord,
859                withdist,
860                withhash,
861            ),
862
863            Command::Multi
864            | Command::Exec
865            | Command::Discard
866            | Command::Watch { .. }
867            | Command::Unwatch => {
868                CommandResponse::Error("ERR command handled at connection level".into())
869            }
870
871            Command::ClientId
872            | Command::ClientGetName
873            | Command::ClientSetName { .. }
874            | Command::ClientList
875            | Command::ClientInfo => {
876                CommandResponse::Error("ERR command handled at connection level".into())
877            }
878
879            Command::ConfigGet { .. } => CommandResponse::Array(vec![]),
880            Command::ConfigSet { .. } => {
881                CommandResponse::Error("ERR unsupported CONFIG SET parameter".into())
882            }
883            Command::ConfigResetStat => CommandResponse::Ok,
884
885            Command::Time => {
886                use std::time::SystemTime;
887                let now = SystemTime::now()
888                    .duration_since(SystemTime::UNIX_EPOCH)
889                    .unwrap_or_default();
890                let secs = now.as_secs();
891                let micros = now.subsec_micros() as u64;
892                CommandResponse::Array(vec![
893                    CommandResponse::BulkString(secs.to_string().into_bytes()),
894                    CommandResponse::BulkString(micros.to_string().into_bytes()),
895                ])
896            }
897            Command::Select { db } => {
898                if db < 0 {
899                    CommandResponse::Error("ERR DB index is out of range".into())
900                } else if db == 0 {
901                    CommandResponse::Ok
902                } else {
903                    CommandResponse::Error("ERR SELECT is not allowed in cluster mode".into())
904                }
905            }
906            Command::Quit => CommandResponse::Ok,
907            Command::Wait { timeout, .. } => {
908                if timeout < 0 {
909                    CommandResponse::Error("ERR timeout is negative".into())
910                } else {
911                    CommandResponse::Integer(0)
912                }
913            }
914            Command::CommandCount => CommandResponse::Integer(supported_command_count()),
915            Command::CommandList => command_list_response(),
916            Command::CommandHelp => command_help_response(),
917            Command::CommandDocs { names } => command_docs_response(&names),
918
919            Command::BgSave
920            | Command::BgRewriteAof
921            | Command::Hello { .. }
922            | Command::Auth { .. }
923            | Command::CdcPoll { .. }
924            | Command::CdcGroupCreate { .. }
925            | Command::CdcGroupRead { .. }
926            | Command::CdcAck { .. }
927            | Command::CdcPending { .. }
928            | Command::StatsLatency { .. }
929            | Command::DocCreate { .. }
930            | Command::DocDrop { .. }
931            | Command::DocInfo { .. }
932            | Command::DocDictInfo { .. }
933            | Command::DocStorage { .. }
934            | Command::DocSet { .. }
935            | Command::DocInsert { .. }
936            | Command::DocMSet { .. }
937            | Command::DocGet { .. }
938            | Command::DocMGet { .. }
939            | Command::DocUpdate { .. }
940            | Command::DocDel { .. }
941            | Command::DocExists { .. }
942            | Command::DocCreateIndex { .. }
943            | Command::DocDropIndex { .. }
944            | Command::DocIndexes { .. }
945            | Command::DocFind { .. }
946            | Command::DocCount { .. }
947            | Command::Subscribe { .. }
948            | Command::Unsubscribe { .. }
949            | Command::PSubscribe { .. }
950            | Command::PUnsubscribe { .. }
951            | Command::Publish { .. } => {
952                CommandResponse::Error("ERR command handled at server level".into())
953            }
954
955            #[cfg(not(feature = "vector"))]
956            Command::VecSet { .. } | Command::VecQuery { .. } | Command::VecDel { .. } => {
957                CommandResponse::Error("ERR vector feature not enabled".into())
958            }
959        }
960    }
961
962    // ─── String operations ───────────────────────────────────────────
963
964    fn cmd_get(&mut self, key: &[u8]) -> CommandResponse {
965        self.lazy_expire(key);
966        match self.entries.get(key) {
967            Some(entry) => match entry.value.bulk_response() {
968                Some(resp) => resp,
969                None => CommandResponse::Error(
970                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
971                ),
972            },
973            None => CommandResponse::Nil,
974        }
975    }
976
977    fn cmd_set(
978        &mut self,
979        key: &[u8],
980        value: &[u8],
981        ex: Option<u64>,
982        px: Option<u64>,
983        nx: bool,
984        xx: bool,
985    ) -> CommandResponse {
986        let ttl = Command::ttl_duration(ex, px);
987        let compact = CompactKey::new(key);
988        match self.entries.entry(compact) {
989            Entry::Occupied(mut occupied) => {
990                if occupied.get().is_expired() {
991                    let (map_key, old_entry) = occupied.remove_entry();
992                    let old_size =
993                        Self::estimate_key_entry_size(map_key.as_bytes(), &old_entry.value);
994                    self.memory_used = self.memory_used.saturating_sub(old_size);
995
996                    if xx {
997                        return CommandResponse::Nil;
998                    }
999
1000                    let (entry, entry_size) = Self::build_string_entry(&map_key, value, ttl);
1001                    self.memory_used += entry_size;
1002                    self.entries.insert(map_key, entry);
1003                    CommandResponse::Ok
1004                } else {
1005                    if nx {
1006                        return CommandResponse::Nil;
1007                    }
1008
1009                    let new_value = Value::from_raw_bytes(value);
1010                    let old_value_size = occupied.get().value.estimated_size();
1011                    let new_value_size = new_value.estimated_size();
1012                    let size_delta = new_value_size as isize - old_value_size as isize;
1013
1014                    if size_delta >= 0 {
1015                        self.memory_used += size_delta as usize;
1016                    } else {
1017                        self.memory_used = self.memory_used.saturating_sub((-size_delta) as usize);
1018                    }
1019
1020                    let entry = occupied.get_mut();
1021                    entry.value = new_value;
1022                    entry.client_flags = 0;
1023                    match ttl {
1024                        Some(dur) => entry.set_ttl(dur),
1025                        None => entry.ttl = None,
1026                    }
1027                    CommandResponse::Ok
1028                }
1029            }
1030            Entry::Vacant(vacant) => {
1031                if xx {
1032                    return CommandResponse::Nil;
1033                }
1034
1035                let new_value = Value::from_raw_bytes(value);
1036                let entry_size = Self::estimate_key_entry_size(vacant.key().as_bytes(), &new_value);
1037                let mut entry = KeyEntry::new(vacant.key().clone(), new_value);
1038                if let Some(dur) = ttl {
1039                    entry.set_ttl(dur);
1040                }
1041                self.memory_used += entry_size;
1042                vacant.insert(entry);
1043                CommandResponse::Ok
1044            }
1045        }
1046    }
1047
1048    fn cmd_getset(&mut self, key: &[u8], value: &[u8]) -> CommandResponse {
1049        let old = self.cmd_get(key);
1050        self.cmd_set(key, value, None, None, false, false);
1051        old
1052    }
1053
1054    fn cmd_append(&mut self, key: &[u8], value: &[u8]) -> CommandResponse {
1055        self.lazy_expire(key);
1056        let compact = CompactKey::new(key);
1057        match self.entries.get_mut(&compact) {
1058            Some(entry) => match &entry.value {
1059                Value::InlineStr { data, len } => {
1060                    let mut existing = data[..*len as usize].to_vec();
1061                    existing.extend_from_slice(value);
1062                    let new_len = existing.len();
1063                    entry.value = Value::from_raw_bytes(&existing);
1064                    CommandResponse::Integer(new_len as i64)
1065                }
1066                Value::HeapStr(arc) => {
1067                    let mut existing = arc.to_vec();
1068                    existing.extend_from_slice(value);
1069                    let new_len = existing.len();
1070                    entry.value = Value::from_raw_bytes(&existing);
1071                    CommandResponse::Integer(new_len as i64)
1072                }
1073                Value::Int(i) => {
1074                    let mut existing = i.to_string().into_bytes();
1075                    existing.extend_from_slice(value);
1076                    let new_len = existing.len();
1077                    entry.value = Value::from_raw_bytes(&existing);
1078                    CommandResponse::Integer(new_len as i64)
1079                }
1080                _ => CommandResponse::Error(
1081                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1082                ),
1083            },
1084            None => {
1085                let new_entry = KeyEntry::new(compact.clone(), Value::from_raw_bytes(value));
1086                self.entries.insert(compact, new_entry);
1087                CommandResponse::Integer(value.len() as i64)
1088            }
1089        }
1090    }
1091
1092    fn cmd_strlen(&mut self, key: &[u8]) -> CommandResponse {
1093        self.lazy_expire(key);
1094        match self.entries.get(key) {
1095            Some(entry) => match entry.value.string_len() {
1096                Some(len) => CommandResponse::Integer(len as i64),
1097                None => CommandResponse::Error(
1098                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1099                ),
1100            },
1101            None => CommandResponse::Integer(0),
1102        }
1103    }
1104
1105    fn cmd_incrby(&mut self, key: &[u8], delta: i64) -> CommandResponse {
1106        self.lazy_expire(key);
1107        match self.entries.get_mut(key) {
1108            Some(entry) => {
1109                let current = match &entry.value {
1110                    Value::Int(i) => *i,
1111                    Value::InlineStr { data, len } => {
1112                        match std::str::from_utf8(&data[..*len as usize])
1113                            .ok()
1114                            .and_then(|s| s.parse::<i64>().ok())
1115                        {
1116                            Some(i) => i,
1117                            None => {
1118                                return CommandResponse::Error(
1119                                    "ERR value is not an integer or out of range".into(),
1120                                )
1121                            }
1122                        }
1123                    }
1124                    Value::HeapStr(arc) => {
1125                        match std::str::from_utf8(arc)
1126                            .ok()
1127                            .and_then(|s| s.parse::<i64>().ok())
1128                        {
1129                            Some(i) => i,
1130                            None => {
1131                                return CommandResponse::Error(
1132                                    "ERR value is not an integer or out of range".into(),
1133                                )
1134                            }
1135                        }
1136                    }
1137                    _ => {
1138                        return CommandResponse::Error(
1139                            "WRONGTYPE Operation against a key holding the wrong kind of value"
1140                                .into(),
1141                        )
1142                    }
1143                };
1144                match current.checked_add(delta) {
1145                    Some(result) => {
1146                        entry.value = Value::Int(result);
1147                        CommandResponse::Integer(result)
1148                    }
1149                    None => {
1150                        CommandResponse::Error("ERR increment or decrement would overflow".into())
1151                    }
1152                }
1153            }
1154            None => {
1155                let compact = CompactKey::new(key);
1156                let entry = KeyEntry::new(compact.clone(), Value::Int(delta));
1157                self.entries.insert(compact, entry);
1158                CommandResponse::Integer(delta)
1159            }
1160        }
1161    }
1162
1163    fn cmd_incrbyfloat(&mut self, key: &[u8], delta: f64) -> CommandResponse {
1164        self.lazy_expire(key);
1165        match self.entries.get_mut(key) {
1166            Some(entry) => {
1167                let current = match &entry.value {
1168                    Value::Int(i) => *i as f64,
1169                    Value::InlineStr { data, len } => {
1170                        match std::str::from_utf8(&data[..*len as usize])
1171                            .ok()
1172                            .and_then(|s| s.parse::<f64>().ok())
1173                        {
1174                            Some(f) => f,
1175                            None => {
1176                                return CommandResponse::Error(
1177                                    "ERR value is not a valid float".into(),
1178                                )
1179                            }
1180                        }
1181                    }
1182                    Value::HeapStr(arc) => {
1183                        match std::str::from_utf8(arc)
1184                            .ok()
1185                            .and_then(|s| s.parse::<f64>().ok())
1186                        {
1187                            Some(f) => f,
1188                            None => {
1189                                return CommandResponse::Error(
1190                                    "ERR value is not a valid float".into(),
1191                                )
1192                            }
1193                        }
1194                    }
1195                    _ => {
1196                        return CommandResponse::Error(
1197                            "WRONGTYPE Operation against a key holding the wrong kind of value"
1198                                .into(),
1199                        )
1200                    }
1201                };
1202                let result = current + delta;
1203                if result.is_infinite() || result.is_nan() {
1204                    return CommandResponse::Error(
1205                        "ERR increment would produce NaN or Infinity".into(),
1206                    );
1207                }
1208                let result_str = format_float(result);
1209                entry.value = Value::from_raw_bytes(result_str.as_bytes());
1210                CommandResponse::BulkString(result_str.into_bytes())
1211            }
1212            None => {
1213                if delta.is_infinite() || delta.is_nan() {
1214                    return CommandResponse::Error(
1215                        "ERR increment would produce NaN or Infinity".into(),
1216                    );
1217                }
1218                let result_str = format_float(delta);
1219                let compact = CompactKey::new(key);
1220                let entry = KeyEntry::new(
1221                    compact.clone(),
1222                    Value::from_raw_bytes(result_str.as_bytes()),
1223                );
1224                self.entries.insert(compact, entry);
1225                CommandResponse::BulkString(result_str.into_bytes())
1226            }
1227        }
1228    }
1229
1230    fn cmd_getrange(&mut self, key: &[u8], start: i64, end: i64) -> CommandResponse {
1231        self.lazy_expire(key);
1232        match self.entries.get(key) {
1233            Some(entry) => {
1234                let bytes = match entry.value.as_bytes() {
1235                    Some(b) => b,
1236                    None => {
1237                        return CommandResponse::Error(
1238                            "WRONGTYPE Operation against a key holding the wrong kind of value"
1239                                .into(),
1240                        )
1241                    }
1242                };
1243                let len = bytes.len() as i64;
1244                if len == 0 {
1245                    return CommandResponse::BulkString(vec![]);
1246                }
1247                let s = if start < 0 {
1248                    (len + start).max(0) as usize
1249                } else {
1250                    start.min(len) as usize
1251                };
1252                let e = if end < 0 {
1253                    (len + end).max(0) as usize
1254                } else {
1255                    end.min(len - 1) as usize
1256                };
1257                if s > e || s >= bytes.len() {
1258                    CommandResponse::BulkString(vec![])
1259                } else {
1260                    CommandResponse::BulkString(bytes[s..=e].to_vec())
1261                }
1262            }
1263            None => CommandResponse::BulkString(vec![]),
1264        }
1265    }
1266
1267    fn cmd_setrange(&mut self, key: &[u8], offset: usize, value: &[u8]) -> CommandResponse {
1268        self.lazy_expire(key);
1269        let compact = CompactKey::new(key);
1270        let mut current = match self.entries.get(key) {
1271            Some(entry) => match entry.value.as_bytes() {
1272                Some(b) => b,
1273                None => {
1274                    return CommandResponse::Error(
1275                        "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1276                    )
1277                }
1278            },
1279            None => vec![],
1280        };
1281        let needed = offset + value.len();
1282        if current.len() < needed {
1283            current.resize(needed, 0u8);
1284        }
1285        current[offset..offset + value.len()].copy_from_slice(value);
1286        let new_len = current.len() as i64;
1287        let old_size = self
1288            .entries
1289            .get(key)
1290            .map(|e| Self::estimate_key_entry_size(key, &e.value))
1291            .unwrap_or(0);
1292        let new_value = Value::from_raw_bytes(&current);
1293        let new_size = Self::estimate_key_entry_size(key, &new_value);
1294        match self.entries.get_mut(key) {
1295            Some(entry) => {
1296                entry.value = new_value;
1297                if new_size > old_size {
1298                    self.memory_used += new_size - old_size;
1299                } else {
1300                    self.memory_used = self.memory_used.saturating_sub(old_size - new_size);
1301                }
1302            }
1303            None => {
1304                let entry = KeyEntry::new(compact.clone(), new_value);
1305                self.memory_used += new_size;
1306                self.entries.insert(compact, entry);
1307            }
1308        }
1309        CommandResponse::Integer(new_len)
1310    }
1311
1312    fn cmd_getdel(&mut self, key: &[u8]) -> CommandResponse {
1313        self.lazy_expire(key);
1314        let compact = CompactKey::new(key);
1315        match self.entries.get(&compact) {
1316            Some(entry) if !entry.is_expired() => {
1317                let resp = match entry.value.bulk_response() {
1318                    Some(r) => r,
1319                    None => {
1320                        return CommandResponse::Error(
1321                            "WRONGTYPE Operation against a key holding the wrong kind of value"
1322                                .into(),
1323                        )
1324                    }
1325                };
1326                let _ = self.remove_compact_entry(&compact);
1327                resp
1328            }
1329            _ => CommandResponse::Nil,
1330        }
1331    }
1332
1333    fn cmd_getex(
1334        &mut self,
1335        key: &[u8],
1336        ex: Option<u64>,
1337        px: Option<u64>,
1338        exat: Option<u64>,
1339        pxat: Option<u64>,
1340        persist: bool,
1341    ) -> CommandResponse {
1342        self.lazy_expire(key);
1343        match self.entries.get_mut(key) {
1344            Some(entry) if !entry.is_expired() => {
1345                let resp = match entry.value.bulk_response() {
1346                    Some(r) => r,
1347                    None => {
1348                        return CommandResponse::Error(
1349                            "WRONGTYPE Operation against a key holding the wrong kind of value"
1350                                .into(),
1351                        )
1352                    }
1353                };
1354                if let Some(s) = ex {
1355                    entry.set_ttl(Duration::from_secs(s));
1356                } else if let Some(ms) = px {
1357                    entry.set_ttl(Duration::from_millis(ms));
1358                } else if let Some(ts) = exat {
1359                    let now = std::time::SystemTime::now()
1360                        .duration_since(std::time::SystemTime::UNIX_EPOCH)
1361                        .unwrap_or_default();
1362                    let target = Duration::from_secs(ts);
1363                    if target > now {
1364                        entry.set_ttl(target - now);
1365                    } else {
1366                        let compact = CompactKey::new(key);
1367                        let _ = self.remove_compact_entry(&compact);
1368                        return resp;
1369                    }
1370                } else if let Some(ts_ms) = pxat {
1371                    let now = std::time::SystemTime::now()
1372                        .duration_since(std::time::SystemTime::UNIX_EPOCH)
1373                        .unwrap_or_default();
1374                    let target = Duration::from_millis(ts_ms);
1375                    if target > now {
1376                        entry.set_ttl(target - now);
1377                    } else {
1378                        let compact = CompactKey::new(key);
1379                        let _ = self.remove_compact_entry(&compact);
1380                        return resp;
1381                    }
1382                } else if persist {
1383                    entry.clear_ttl();
1384                }
1385                resp
1386            }
1387            _ => CommandResponse::Nil,
1388        }
1389    }
1390
1391    fn cmd_msetnx(&mut self, entries: &[(Vec<u8>, Vec<u8>)]) -> CommandResponse {
1392        for (k, _) in entries {
1393            self.lazy_expire(k);
1394            if self.entries.contains_key(k.as_slice()) {
1395                return CommandResponse::Integer(0);
1396            }
1397        }
1398        for (k, v) in entries {
1399            self.cmd_set(k, v, None, None, false, false);
1400        }
1401        CommandResponse::Integer(1)
1402    }
1403
1404    // ─── Key operations ──────────────────────────────────────────────
1405
1406    fn del(&mut self, key: &[u8]) -> bool {
1407        let compact = CompactKey::new(key);
1408        self.remove_compact_entry(&compact).is_some()
1409    }
1410
1411    fn exists(&mut self, key: &[u8]) -> bool {
1412        self.lazy_expire(key);
1413        self.entries.contains_key(key)
1414    }
1415
1416    fn cmd_expire(&mut self, key: &[u8], duration: Duration) -> CommandResponse {
1417        match self.entries.get_mut(key) {
1418            Some(entry) if !entry.is_expired() => {
1419                entry.set_ttl(duration);
1420                CommandResponse::Integer(1)
1421            }
1422            _ => CommandResponse::Integer(0),
1423        }
1424    }
1425
1426    fn cmd_persist(&mut self, key: &[u8]) -> CommandResponse {
1427        match self.entries.get_mut(key) {
1428            Some(entry) if !entry.is_expired() && entry.ttl.is_some() => {
1429                entry.clear_ttl();
1430                CommandResponse::Integer(1)
1431            }
1432            _ => CommandResponse::Integer(0),
1433        }
1434    }
1435
1436    fn cmd_ttl(&mut self, key: &[u8], millis: bool) -> CommandResponse {
1437        self.lazy_expire(key);
1438        match self.entries.get(key) {
1439            None => CommandResponse::Integer(-2),
1440            Some(entry) => match entry.remaining_ttl() {
1441                None => CommandResponse::Integer(-1),
1442                Some(dur) => {
1443                    if millis {
1444                        CommandResponse::Integer(dur.as_millis() as i64)
1445                    } else {
1446                        CommandResponse::Integer(dur.as_secs() as i64)
1447                    }
1448                }
1449            },
1450        }
1451    }
1452
1453    fn cmd_type(&mut self, key: &[u8]) -> CommandResponse {
1454        self.lazy_expire(key);
1455        match self.entries.get(key) {
1456            Some(entry) => CommandResponse::SimpleString(entry.value.type_name().to_string()),
1457            None => CommandResponse::SimpleString("none".to_string()),
1458        }
1459    }
1460
1461    fn cmd_expireat(&mut self, key: &[u8], timestamp: u64, millis: bool) -> CommandResponse {
1462        match self.entries.get_mut(key) {
1463            Some(entry) if !entry.is_expired() => {
1464                let now = std::time::SystemTime::now()
1465                    .duration_since(std::time::SystemTime::UNIX_EPOCH)
1466                    .unwrap_or_default();
1467                let target = if millis {
1468                    Duration::from_millis(timestamp)
1469                } else {
1470                    Duration::from_secs(timestamp)
1471                };
1472                if target > now {
1473                    entry.set_ttl(target - now);
1474                    CommandResponse::Integer(1)
1475                } else {
1476                    let compact = CompactKey::new(key);
1477                    let _ = self.remove_compact_entry(&compact);
1478                    CommandResponse::Integer(1)
1479                }
1480            }
1481            _ => CommandResponse::Integer(0),
1482        }
1483    }
1484
1485    fn cmd_rename(&mut self, key: &[u8], newkey: &[u8], nx: bool) -> CommandResponse {
1486        self.lazy_expire(key);
1487        let source_compact = CompactKey::new(key);
1488        let source_entry = match self.remove_compact_entry(&source_compact) {
1489            Some(e) => e,
1490            None => {
1491                return CommandResponse::Error("ERR no such key".into());
1492            }
1493        };
1494        self.lazy_expire(newkey);
1495        let dest_compact = CompactKey::new(newkey);
1496        if nx && self.entries.contains_key(&dest_compact) {
1497            let size = Self::estimate_key_entry_size(key, &source_entry.value);
1498            self.memory_used += size;
1499            self.entries.insert(source_compact, source_entry);
1500            return CommandResponse::Integer(0);
1501        }
1502        if let Some(_old) = self.remove_compact_entry(&dest_compact) {}
1503        let mut new_entry = KeyEntry::new(dest_compact.clone(), source_entry.value.clone());
1504        new_entry.ttl = source_entry.ttl;
1505        new_entry.lfu_counter = source_entry.lfu_counter;
1506        let size = Self::estimate_key_entry_size(newkey, &new_entry.value);
1507        self.memory_used += size;
1508        self.entries.insert(dest_compact, new_entry);
1509        if nx {
1510            CommandResponse::Integer(1)
1511        } else {
1512            CommandResponse::Ok
1513        }
1514    }
1515
1516    fn cmd_copy(&mut self, source: &[u8], destination: &[u8], replace: bool) -> CommandResponse {
1517        self.lazy_expire(source);
1518        let src_compact = CompactKey::new(source);
1519        let (value, ttl) = match self.entries.get(&src_compact) {
1520            Some(entry) if !entry.is_expired() => (entry.value.clone(), entry.ttl),
1521            _ => return CommandResponse::Integer(0),
1522        };
1523        self.lazy_expire(destination);
1524        let dest_compact = CompactKey::new(destination);
1525        if self.entries.contains_key(&dest_compact) && !replace {
1526            return CommandResponse::Integer(0);
1527        }
1528        if replace {
1529            let _ = self.remove_compact_entry(&dest_compact);
1530        }
1531        let mut new_entry = KeyEntry::new(dest_compact.clone(), value);
1532        new_entry.ttl = ttl;
1533        let size = Self::estimate_key_entry_size(destination, &new_entry.value);
1534        self.memory_used += size;
1535        self.entries.insert(dest_compact, new_entry);
1536        CommandResponse::Integer(1)
1537    }
1538
1539    fn cmd_randomkey(&mut self) -> CommandResponse {
1540        for (key, entry) in &self.entries {
1541            if !entry.is_expired() {
1542                return CommandResponse::BulkString(key.as_bytes().to_vec());
1543            }
1544        }
1545        CommandResponse::Nil
1546    }
1547
1548    fn cmd_keys(&self, pattern: &str) -> CommandResponse {
1549        let results: Vec<CommandResponse> = self
1550            .entries
1551            .iter()
1552            .filter(|(_, entry)| !entry.is_expired())
1553            .filter(|(key, _)| glob_match(pattern, key.as_bytes()))
1554            .map(|(key, _)| CommandResponse::BulkString(key.as_bytes().to_vec()))
1555            .collect();
1556        CommandResponse::Array(results)
1557    }
1558
1559    fn cmd_scan(&self, cursor: u64, pattern: Option<&str>, count: usize) -> CommandResponse {
1560        // Collect and sort keys for deterministic cursor iteration.
1561        // Sorting ensures the same key set always yields the same order,
1562        // so cursor-based pagination works correctly across calls.
1563        let mut keys: Vec<&CompactKey> = self
1564            .entries
1565            .iter()
1566            .filter(|(_, entry)| !entry.is_expired())
1567            .filter(|(key, _)| pattern.is_none_or(|p| glob_match(p, key.as_bytes())))
1568            .map(|(key, _)| key)
1569            .collect();
1570        keys.sort_by(|a, b| a.as_bytes().cmp(b.as_bytes()));
1571
1572        let start = cursor as usize;
1573        let end = (start + count).min(keys.len());
1574
1575        if start >= keys.len() {
1576            return CommandResponse::Array(vec![
1577                CommandResponse::BulkString(b"0".to_vec()),
1578                CommandResponse::Array(vec![]),
1579            ]);
1580        }
1581
1582        let result_keys: Vec<CommandResponse> = keys[start..end]
1583            .iter()
1584            .map(|k| CommandResponse::BulkString(k.as_bytes().to_vec()))
1585            .collect();
1586
1587        let next_cursor = if end >= keys.len() { 0 } else { end as u64 };
1588
1589        CommandResponse::Array(vec![
1590            CommandResponse::BulkString(next_cursor.to_string().into_bytes()),
1591            CommandResponse::Array(result_keys),
1592        ])
1593    }
1594
1595    fn cmd_stats_hotkeys(&self, count: usize) -> CommandResponse {
1596        let mut hot: Vec<(&CompactKey, u8)> = self
1597            .entries
1598            .iter()
1599            .filter(|(_, entry)| !entry.is_expired())
1600            .map(|(key, entry)| (key, entry.lfu_counter))
1601            .collect();
1602        hot.sort_by(|a, b| {
1603            b.1.cmp(&a.1)
1604                .then_with(|| a.0.as_bytes().cmp(b.0.as_bytes()))
1605        });
1606
1607        let items = hot
1608            .into_iter()
1609            .take(count)
1610            .map(|(key, freq)| {
1611                CommandResponse::Array(vec![
1612                    CommandResponse::BulkString(key.as_bytes().to_vec()),
1613                    CommandResponse::Integer(i64::from(freq)),
1614                ])
1615            })
1616            .collect();
1617        CommandResponse::Array(items)
1618    }
1619
1620    fn cmd_stats_memory(&self, prefixes: &[Vec<u8>]) -> CommandResponse {
1621        let items: Vec<CommandResponse> = prefixes
1622            .iter()
1623            .map(|prefix| {
1624                let total: usize = self
1625                    .entries
1626                    .iter()
1627                    .filter(|(key, entry)| {
1628                        !entry.is_expired() && key.as_bytes().starts_with(prefix.as_slice())
1629                    })
1630                    .map(|(key, entry)| Self::estimate_key_entry_size(key.as_bytes(), &entry.value))
1631                    .sum();
1632                CommandResponse::Integer(total as i64)
1633            })
1634            .collect();
1635        CommandResponse::Array(items)
1636    }
1637
1638    // List and hash command implementations live in dedicated modules.
1639
1640    fn cmd_sadd(&mut self, key: &[u8], members: &[Vec<u8>]) -> CommandResponse {
1641        self.lazy_expire(key);
1642        let compact = CompactKey::new(key);
1643        let is_new = !self.entries.contains_key(&compact);
1644        let set = self.get_or_create_set(&compact);
1645        match set {
1646            Ok(set) => {
1647                let mut memory_delta: usize = 0;
1648                if is_new {
1649                    memory_delta += key.len()
1650                        + std::mem::size_of::<CompactKey>()
1651                        + std::mem::size_of::<KeyEntry>();
1652                }
1653                let mut count = 0usize;
1654                for m in members {
1655                    let val = Value::from_raw_bytes(m);
1656                    let size = val.estimated_size();
1657                    if set.insert(val) {
1658                        memory_delta += size;
1659                        count += 1;
1660                    }
1661                }
1662                self.memory_used += memory_delta;
1663                CommandResponse::Integer(count as i64)
1664            }
1665            Err(e) => e,
1666        }
1667    }
1668
1669    fn cmd_srem(&mut self, key: &[u8], members: &[Vec<u8>]) -> CommandResponse {
1670        self.lazy_expire(key);
1671        let compact = CompactKey::new(key);
1672        let (count, is_empty) = match self.entries.get_mut(&compact) {
1673            Some(entry) => match &mut entry.value {
1674                Value::Set(set) => {
1675                    let c = members
1676                        .iter()
1677                        .filter(|m| set.remove(&Value::from_raw_bytes(m)))
1678                        .count();
1679                    (c as i64, set.is_empty())
1680                }
1681                _ => {
1682                    return CommandResponse::Error(
1683                        "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1684                    )
1685                }
1686            },
1687            None => return CommandResponse::Integer(0),
1688        };
1689        if is_empty {
1690            self.entries.remove(&compact);
1691        }
1692        CommandResponse::Integer(count)
1693    }
1694
1695    fn cmd_smembers(&mut self, key: &[u8]) -> CommandResponse {
1696        self.lazy_expire(key);
1697        let compact = CompactKey::new(key);
1698        match self.entries.get(&compact) {
1699            Some(entry) => match &entry.value {
1700                Value::Set(set) => {
1701                    let results: Vec<CommandResponse> = set
1702                        .iter()
1703                        .map(|v| match v.bulk_response() {
1704                            Some(resp) => resp,
1705                            None => CommandResponse::Nil,
1706                        })
1707                        .collect();
1708                    CommandResponse::Array(results)
1709                }
1710                _ => CommandResponse::Error(
1711                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1712                ),
1713            },
1714            None => CommandResponse::Array(vec![]),
1715        }
1716    }
1717
1718    fn cmd_sismember(&mut self, key: &[u8], member: &[u8]) -> CommandResponse {
1719        self.lazy_expire(key);
1720        let compact = CompactKey::new(key);
1721        match self.entries.get(&compact) {
1722            Some(entry) => match &entry.value {
1723                Value::Set(set) => {
1724                    let val = Value::from_raw_bytes(member);
1725                    CommandResponse::Integer(if set.contains(&val) { 1 } else { 0 })
1726                }
1727                _ => CommandResponse::Error(
1728                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1729                ),
1730            },
1731            None => CommandResponse::Integer(0),
1732        }
1733    }
1734
1735    fn cmd_scard(&mut self, key: &[u8]) -> CommandResponse {
1736        self.lazy_expire(key);
1737        let compact = CompactKey::new(key);
1738        match self.entries.get(&compact) {
1739            Some(entry) => match &entry.value {
1740                Value::Set(set) => CommandResponse::Integer(set.len() as i64),
1741                _ => CommandResponse::Error(
1742                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1743                ),
1744            },
1745            None => CommandResponse::Integer(0),
1746        }
1747    }
1748
1749    fn cmd_spop(&mut self, key: &[u8], count: Option<usize>) -> CommandResponse {
1750        self.lazy_expire(key);
1751        let compact = CompactKey::new(key);
1752        let result = match self.entries.get_mut(&compact) {
1753            Some(entry) => match &mut entry.value {
1754                Value::Set(set) => {
1755                    if set.is_empty() {
1756                        return if count.is_some() {
1757                            CommandResponse::Array(vec![])
1758                        } else {
1759                            CommandResponse::Nil
1760                        };
1761                    }
1762                    let n = count.unwrap_or(1);
1763                    let mut popped = Vec::with_capacity(n.min(set.len()));
1764                    for _ in 0..n {
1765                        if set.is_empty() {
1766                            break;
1767                        }
1768                        let idx = (self.eviction_counter as usize) % set.len();
1769                        self.eviction_counter = self.eviction_counter.wrapping_add(1);
1770                        let val = set.iter().nth(idx).cloned();
1771                        if let Some(v) = val {
1772                            set.remove(&v);
1773                            popped.push(v);
1774                        }
1775                    }
1776                    let is_empty = set.is_empty();
1777                    let resp = if count.is_some() {
1778                        CommandResponse::Array(
1779                            popped
1780                                .iter()
1781                                .map(|v| v.bulk_response().unwrap_or(CommandResponse::Nil))
1782                                .collect(),
1783                        )
1784                    } else {
1785                        popped
1786                            .first()
1787                            .and_then(|v| v.bulk_response())
1788                            .unwrap_or(CommandResponse::Nil)
1789                    };
1790                    (resp, is_empty)
1791                }
1792                _ => {
1793                    return CommandResponse::Error(
1794                        "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1795                    )
1796                }
1797            },
1798            None => {
1799                return if count.is_some() {
1800                    CommandResponse::Array(vec![])
1801                } else {
1802                    CommandResponse::Nil
1803                }
1804            }
1805        };
1806        if result.1 {
1807            self.entries.remove(&compact);
1808        }
1809        result.0
1810    }
1811
1812    fn cmd_srandmember(&mut self, key: &[u8], count: Option<i64>) -> CommandResponse {
1813        self.lazy_expire(key);
1814        let compact = CompactKey::new(key);
1815        match self.entries.get(&compact) {
1816            Some(entry) => match &entry.value {
1817                Value::Set(set) => {
1818                    if set.is_empty() {
1819                        return if count.is_some() {
1820                            CommandResponse::Array(vec![])
1821                        } else {
1822                            CommandResponse::Nil
1823                        };
1824                    }
1825                    let members: Vec<&Value> = set.iter().collect();
1826                    match count {
1827                        None => {
1828                            let idx = (self.eviction_counter as usize) % members.len();
1829                            self.eviction_counter = self.eviction_counter.wrapping_add(1);
1830                            members[idx].bulk_response().unwrap_or(CommandResponse::Nil)
1831                        }
1832                        Some(c) if c >= 0 => {
1833                            let take = (c as usize).min(members.len());
1834                            let start = (self.eviction_counter as usize) % members.len();
1835                            self.eviction_counter = self.eviction_counter.wrapping_add(take as u64);
1836                            let mut result = Vec::with_capacity(take);
1837                            for i in 0..take {
1838                                let idx = (start + i) % members.len();
1839                                result.push(
1840                                    members[idx].bulk_response().unwrap_or(CommandResponse::Nil),
1841                                );
1842                            }
1843                            CommandResponse::Array(result)
1844                        }
1845                        Some(c) => {
1846                            let abs_count = c.unsigned_abs() as usize;
1847                            let mut result = Vec::with_capacity(abs_count);
1848                            for i in 0..abs_count {
1849                                let idx = ((self.eviction_counter as usize) + i) % members.len();
1850                                result.push(
1851                                    members[idx].bulk_response().unwrap_or(CommandResponse::Nil),
1852                                );
1853                            }
1854                            self.eviction_counter =
1855                                self.eviction_counter.wrapping_add(abs_count as u64);
1856                            CommandResponse::Array(result)
1857                        }
1858                    }
1859                }
1860                _ => CommandResponse::Error(
1861                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1862                ),
1863            },
1864            None => {
1865                if count.is_some() {
1866                    CommandResponse::Array(vec![])
1867                } else {
1868                    CommandResponse::Nil
1869                }
1870            }
1871        }
1872    }
1873
1874    fn collect_set_members(&mut self, key: &[u8]) -> Result<HashSet<Value>, CommandResponse> {
1875        self.lazy_expire(key);
1876        let compact = CompactKey::new(key);
1877        match self.entries.get(&compact) {
1878            Some(entry) => match &entry.value {
1879                Value::Set(set) => Ok(set.clone()),
1880                _ => Err(CommandResponse::Error(
1881                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
1882                )),
1883            },
1884            None => Ok(HashSet::new()),
1885        }
1886    }
1887
1888    fn cmd_sunion(&mut self, keys: &[Vec<u8>]) -> CommandResponse {
1889        let mut result: HashSet<Value> = HashSet::new();
1890        for key in keys {
1891            match self.collect_set_members(key) {
1892                Ok(set) => {
1893                    for v in set {
1894                        result.insert(v);
1895                    }
1896                }
1897                Err(e) => return e,
1898            }
1899        }
1900        CommandResponse::Array(
1901            result
1902                .iter()
1903                .map(|v| v.bulk_response().unwrap_or(CommandResponse::Nil))
1904                .collect(),
1905        )
1906    }
1907
1908    fn cmd_sunionstore(&mut self, destination: &[u8], keys: &[Vec<u8>]) -> CommandResponse {
1909        let mut result: HashSet<Value> = HashSet::new();
1910        for key in keys {
1911            match self.collect_set_members(key) {
1912                Ok(set) => {
1913                    for v in set {
1914                        result.insert(v);
1915                    }
1916                }
1917                Err(e) => return e,
1918            }
1919        }
1920        let count = result.len() as i64;
1921        self.del(destination);
1922        if !result.is_empty() {
1923            let compact = CompactKey::new(destination);
1924            let mut memory_delta = destination.len()
1925                + std::mem::size_of::<CompactKey>()
1926                + std::mem::size_of::<KeyEntry>();
1927            for v in &result {
1928                memory_delta += v.estimated_size();
1929            }
1930            let entry = KeyEntry::new(compact.clone(), Value::Set(result));
1931            self.entries.insert(compact, entry);
1932            self.memory_used += memory_delta;
1933        }
1934        CommandResponse::Integer(count)
1935    }
1936
1937    fn cmd_sinter(&mut self, keys: &[Vec<u8>]) -> CommandResponse {
1938        if keys.is_empty() {
1939            return CommandResponse::Array(vec![]);
1940        }
1941        let first = match self.collect_set_members(&keys[0]) {
1942            Ok(s) => s,
1943            Err(e) => return e,
1944        };
1945        let mut result = first;
1946        for key in &keys[1..] {
1947            match self.collect_set_members(key) {
1948                Ok(set) => {
1949                    result = result.intersection(&set).cloned().collect();
1950                }
1951                Err(e) => return e,
1952            }
1953        }
1954        CommandResponse::Array(
1955            result
1956                .iter()
1957                .map(|v| v.bulk_response().unwrap_or(CommandResponse::Nil))
1958                .collect(),
1959        )
1960    }
1961
1962    fn cmd_sinterstore(&mut self, destination: &[u8], keys: &[Vec<u8>]) -> CommandResponse {
1963        if keys.is_empty() {
1964            self.del(destination);
1965            return CommandResponse::Integer(0);
1966        }
1967        let first = match self.collect_set_members(&keys[0]) {
1968            Ok(s) => s,
1969            Err(e) => return e,
1970        };
1971        let mut result = first;
1972        for key in &keys[1..] {
1973            match self.collect_set_members(key) {
1974                Ok(set) => {
1975                    result = result.intersection(&set).cloned().collect();
1976                }
1977                Err(e) => return e,
1978            }
1979        }
1980        let count = result.len() as i64;
1981        self.del(destination);
1982        if !result.is_empty() {
1983            let compact = CompactKey::new(destination);
1984            let mut memory_delta = destination.len()
1985                + std::mem::size_of::<CompactKey>()
1986                + std::mem::size_of::<KeyEntry>();
1987            for v in &result {
1988                memory_delta += v.estimated_size();
1989            }
1990            let entry = KeyEntry::new(compact.clone(), Value::Set(result));
1991            self.entries.insert(compact, entry);
1992            self.memory_used += memory_delta;
1993        }
1994        CommandResponse::Integer(count)
1995    }
1996
1997    fn cmd_sdiff(&mut self, keys: &[Vec<u8>]) -> CommandResponse {
1998        if keys.is_empty() {
1999            return CommandResponse::Array(vec![]);
2000        }
2001        let first = match self.collect_set_members(&keys[0]) {
2002            Ok(s) => s,
2003            Err(e) => return e,
2004        };
2005        let mut result = first;
2006        for key in &keys[1..] {
2007            match self.collect_set_members(key) {
2008                Ok(set) => {
2009                    result = result.difference(&set).cloned().collect();
2010                }
2011                Err(e) => return e,
2012            }
2013        }
2014        CommandResponse::Array(
2015            result
2016                .iter()
2017                .map(|v| v.bulk_response().unwrap_or(CommandResponse::Nil))
2018                .collect(),
2019        )
2020    }
2021
2022    fn cmd_sdiffstore(&mut self, destination: &[u8], keys: &[Vec<u8>]) -> CommandResponse {
2023        if keys.is_empty() {
2024            self.del(destination);
2025            return CommandResponse::Integer(0);
2026        }
2027        let first = match self.collect_set_members(&keys[0]) {
2028            Ok(s) => s,
2029            Err(e) => return e,
2030        };
2031        let mut result = first;
2032        for key in &keys[1..] {
2033            match self.collect_set_members(key) {
2034                Ok(set) => {
2035                    result = result.difference(&set).cloned().collect();
2036                }
2037                Err(e) => return e,
2038            }
2039        }
2040        let count = result.len() as i64;
2041        self.del(destination);
2042        if !result.is_empty() {
2043            let compact = CompactKey::new(destination);
2044            let mut memory_delta = destination.len()
2045                + std::mem::size_of::<CompactKey>()
2046                + std::mem::size_of::<KeyEntry>();
2047            for v in &result {
2048                memory_delta += v.estimated_size();
2049            }
2050            let entry = KeyEntry::new(compact.clone(), Value::Set(result));
2051            self.entries.insert(compact, entry);
2052            self.memory_used += memory_delta;
2053        }
2054        CommandResponse::Integer(count)
2055    }
2056
2057    fn cmd_sintercard(&mut self, keys: &[Vec<u8>], limit: Option<usize>) -> CommandResponse {
2058        if keys.is_empty() {
2059            return CommandResponse::Integer(0);
2060        }
2061        let first = match self.collect_set_members(&keys[0]) {
2062            Ok(s) => s,
2063            Err(e) => return e,
2064        };
2065        let mut result = first;
2066        for key in &keys[1..] {
2067            match self.collect_set_members(key) {
2068                Ok(set) => {
2069                    result = result.intersection(&set).cloned().collect();
2070                }
2071                Err(e) => return e,
2072            }
2073        }
2074        let count = match limit {
2075            Some(lim) if lim > 0 => result.len().min(lim),
2076            _ => result.len(),
2077        };
2078        CommandResponse::Integer(count as i64)
2079    }
2080
2081    fn cmd_smove(&mut self, source: &[u8], destination: &[u8], member: &[u8]) -> CommandResponse {
2082        self.lazy_expire(source);
2083        self.lazy_expire(destination);
2084        let val = Value::from_raw_bytes(member);
2085        let src_compact = CompactKey::new(source);
2086        let removed = match self.entries.get_mut(&src_compact) {
2087            Some(entry) => match &mut entry.value {
2088                Value::Set(set) => set.remove(&val),
2089                _ => {
2090                    return CommandResponse::Error(
2091                        "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2092                    )
2093                }
2094            },
2095            None => return CommandResponse::Integer(0),
2096        };
2097        if !removed {
2098            return CommandResponse::Integer(0);
2099        }
2100        let src_empty = self
2101            .entries
2102            .get(&src_compact)
2103            .map(|e| matches!(&e.value, Value::Set(s) if s.is_empty()))
2104            .unwrap_or(false);
2105        if src_empty {
2106            self.entries.remove(&src_compact);
2107        }
2108        let dst_compact = CompactKey::new(destination);
2109        let dst_set = self.get_or_create_set(&dst_compact);
2110        match dst_set {
2111            Ok(set) => {
2112                set.insert(val);
2113            }
2114            Err(e) => return e,
2115        }
2116        CommandResponse::Integer(1)
2117    }
2118
2119    fn cmd_smismember(&mut self, key: &[u8], members: &[Vec<u8>]) -> CommandResponse {
2120        self.lazy_expire(key);
2121        let compact = CompactKey::new(key);
2122        match self.entries.get(&compact) {
2123            Some(entry) => match &entry.value {
2124                Value::Set(set) => {
2125                    let results: Vec<CommandResponse> = members
2126                        .iter()
2127                        .map(|m| {
2128                            let val = Value::from_raw_bytes(m);
2129                            CommandResponse::Integer(if set.contains(&val) { 1 } else { 0 })
2130                        })
2131                        .collect();
2132                    CommandResponse::Array(results)
2133                }
2134                _ => CommandResponse::Error(
2135                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2136                ),
2137            },
2138            None => CommandResponse::Array(vec![CommandResponse::Integer(0); members.len()]),
2139        }
2140    }
2141
2142    fn cmd_sscan(
2143        &mut self,
2144        key: &[u8],
2145        cursor: u64,
2146        pattern: Option<&str>,
2147        count: usize,
2148    ) -> CommandResponse {
2149        self.lazy_expire(key);
2150        let compact = CompactKey::new(key);
2151        match self.entries.get(&compact) {
2152            Some(entry) => match &entry.value {
2153                Value::Set(set) => {
2154                    let mut members: Vec<&Value> = set
2155                        .iter()
2156                        .filter(|v| {
2157                            pattern.is_none_or(|p| {
2158                                if let Some(CommandResponse::BulkString(b)) = v.bulk_response() {
2159                                    glob_match(p, &b)
2160                                } else {
2161                                    false
2162                                }
2163                            })
2164                        })
2165                        .collect();
2166                    members.sort_by(|a, b| {
2167                        let ab = a.to_bytes();
2168                        let bb = b.to_bytes();
2169                        ab.cmp(&bb)
2170                    });
2171
2172                    let start = cursor as usize;
2173                    let end = (start + count).min(members.len());
2174
2175                    if start >= members.len() {
2176                        return CommandResponse::Array(vec![
2177                            CommandResponse::BulkString(b"0".to_vec()),
2178                            CommandResponse::Array(vec![]),
2179                        ]);
2180                    }
2181
2182                    let mut results = Vec::with_capacity(end - start);
2183                    for v in &members[start..end] {
2184                        results.push(v.bulk_response().unwrap_or(CommandResponse::Nil));
2185                    }
2186
2187                    let next_cursor = if end >= members.len() { 0 } else { end as u64 };
2188
2189                    CommandResponse::Array(vec![
2190                        CommandResponse::BulkString(next_cursor.to_string().into_bytes()),
2191                        CommandResponse::Array(results),
2192                    ])
2193                }
2194                _ => CommandResponse::Error(
2195                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2196                ),
2197            },
2198            None => CommandResponse::Array(vec![
2199                CommandResponse::BulkString(b"0".to_vec()),
2200                CommandResponse::Array(vec![]),
2201            ]),
2202        }
2203    }
2204
2205    // ─── Sorted Set operations ────────────────────────────────────────
2206
2207    fn cmd_zadd(&mut self, key: &[u8], members: &[(f64, Vec<u8>)]) -> CommandResponse {
2208        self.lazy_expire(key);
2209        let compact = CompactKey::new(key);
2210        let is_new = !self.entries.contains_key(&compact);
2211        let zset = self.get_or_create_sorted_set(&compact);
2212        match zset {
2213            Ok(map) => {
2214                let mut memory_delta: usize = 0;
2215                if is_new {
2216                    memory_delta += key.len()
2217                        + std::mem::size_of::<CompactKey>()
2218                        + std::mem::size_of::<KeyEntry>();
2219                }
2220                let mut added = 0i64;
2221                let member_overhead = std::mem::size_of::<f64>();
2222                for (score, member) in members {
2223                    if map.insert(member.clone(), *score).is_none() {
2224                        memory_delta += member.len() + member_overhead;
2225                        added += 1;
2226                    }
2227                }
2228                self.memory_used += memory_delta;
2229                CommandResponse::Integer(added)
2230            }
2231            Err(e) => e,
2232        }
2233    }
2234
2235    fn cmd_zrem(&mut self, key: &[u8], members: &[Vec<u8>]) -> CommandResponse {
2236        self.lazy_expire(key);
2237        let compact = CompactKey::new(key);
2238        let (count, is_empty) = match self.entries.get_mut(&compact) {
2239            Some(entry) => match &mut entry.value {
2240                Value::SortedSet(map) => {
2241                    let c = members.iter().filter(|m| map.remove(*m).is_some()).count();
2242                    (c as i64, map.is_empty())
2243                }
2244                _ => {
2245                    return CommandResponse::Error(
2246                        "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2247                    )
2248                }
2249            },
2250            None => return CommandResponse::Integer(0),
2251        };
2252        if is_empty {
2253            self.entries.remove(&compact);
2254        }
2255        CommandResponse::Integer(count)
2256    }
2257
2258    fn cmd_zscore(&mut self, key: &[u8], member: &[u8]) -> CommandResponse {
2259        self.lazy_expire(key);
2260        let compact = CompactKey::new(key);
2261        match self.entries.get(&compact) {
2262            Some(entry) => match &entry.value {
2263                Value::SortedSet(map) => match map.get(member) {
2264                    Some(score) => CommandResponse::BulkString(format!("{}", score).into_bytes()),
2265                    None => CommandResponse::Nil,
2266                },
2267                _ => CommandResponse::Error(
2268                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2269                ),
2270            },
2271            None => CommandResponse::Nil,
2272        }
2273    }
2274
2275    fn cmd_zrank(&mut self, key: &[u8], member: &[u8], reverse: bool) -> CommandResponse {
2276        self.lazy_expire(key);
2277        let compact = CompactKey::new(key);
2278        match self.entries.get(&compact) {
2279            Some(entry) => match &entry.value {
2280                Value::SortedSet(map) => {
2281                    if !map.contains_key(member) {
2282                        return CommandResponse::Nil;
2283                    }
2284                    let mut sorted: Vec<(&Vec<u8>, &f64)> = map.iter().collect();
2285                    sorted.sort_by(|a, b| {
2286                        a.1.partial_cmp(b.1)
2287                            .unwrap_or(std::cmp::Ordering::Equal)
2288                            .then_with(|| a.0.cmp(b.0))
2289                    });
2290                    if reverse {
2291                        sorted.reverse();
2292                    }
2293                    match sorted.iter().position(|(m, _)| m.as_slice() == member) {
2294                        Some(pos) => CommandResponse::Integer(pos as i64),
2295                        None => CommandResponse::Nil,
2296                    }
2297                }
2298                _ => CommandResponse::Error(
2299                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2300                ),
2301            },
2302            None => CommandResponse::Nil,
2303        }
2304    }
2305
2306    fn cmd_zcard(&mut self, key: &[u8]) -> CommandResponse {
2307        self.lazy_expire(key);
2308        let compact = CompactKey::new(key);
2309        match self.entries.get(&compact) {
2310            Some(entry) => match &entry.value {
2311                Value::SortedSet(map) => CommandResponse::Integer(map.len() as i64),
2312                _ => CommandResponse::Error(
2313                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2314                ),
2315            },
2316            None => CommandResponse::Integer(0),
2317        }
2318    }
2319
2320    fn cmd_zrange(
2321        &mut self,
2322        key: &[u8],
2323        start: i64,
2324        stop: i64,
2325        withscores: bool,
2326        reverse: bool,
2327    ) -> CommandResponse {
2328        self.lazy_expire(key);
2329        let compact = CompactKey::new(key);
2330        match self.entries.get(&compact) {
2331            Some(entry) => match &entry.value {
2332                Value::SortedSet(map) => {
2333                    let mut sorted: Vec<(&Vec<u8>, &f64)> = map.iter().collect();
2334                    sorted.sort_by(|a, b| {
2335                        a.1.partial_cmp(b.1)
2336                            .unwrap_or(std::cmp::Ordering::Equal)
2337                            .then_with(|| a.0.cmp(b.0))
2338                    });
2339                    if reverse {
2340                        sorted.reverse();
2341                    }
2342                    let len = sorted.len() as i64;
2343                    let s = normalize_index(start, len);
2344                    let e = normalize_index(stop, len);
2345                    if s > e || s >= len as usize {
2346                        return CommandResponse::Array(vec![]);
2347                    }
2348                    let e = e.min(len as usize - 1);
2349                    let mut results = Vec::new();
2350                    for (member, score) in &sorted[s..=e] {
2351                        results.push(CommandResponse::BulkString(member.to_vec()));
2352                        if withscores {
2353                            results.push(CommandResponse::BulkString(
2354                                format!("{}", score).into_bytes(),
2355                            ));
2356                        }
2357                    }
2358                    CommandResponse::Array(results)
2359                }
2360                _ => CommandResponse::Error(
2361                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2362                ),
2363            },
2364            None => CommandResponse::Array(vec![]),
2365        }
2366    }
2367
2368    fn cmd_zrangebyscore(
2369        &mut self,
2370        key: &[u8],
2371        min: f64,
2372        max: f64,
2373        withscores: bool,
2374        offset: Option<usize>,
2375        count: Option<usize>,
2376    ) -> CommandResponse {
2377        self.lazy_expire(key);
2378        let compact = CompactKey::new(key);
2379        match self.entries.get(&compact) {
2380            Some(entry) => match &entry.value {
2381                Value::SortedSet(map) => {
2382                    let mut sorted: Vec<(&Vec<u8>, &f64)> = map
2383                        .iter()
2384                        .filter(|(_, s)| **s >= min && **s <= max)
2385                        .collect();
2386                    sorted.sort_by(|a, b| {
2387                        a.1.partial_cmp(b.1)
2388                            .unwrap_or(std::cmp::Ordering::Equal)
2389                            .then_with(|| a.0.cmp(b.0))
2390                    });
2391                    let off = offset.unwrap_or(0);
2392                    let iter: Box<dyn Iterator<Item = &(&Vec<u8>, &f64)>> = if let Some(c) = count {
2393                        Box::new(sorted.iter().skip(off).take(c))
2394                    } else {
2395                        Box::new(sorted.iter().skip(off))
2396                    };
2397                    let mut results = Vec::new();
2398                    for (member, score) in iter {
2399                        results.push(CommandResponse::BulkString(member.to_vec()));
2400                        if withscores {
2401                            results.push(CommandResponse::BulkString(
2402                                format!("{}", score).into_bytes(),
2403                            ));
2404                        }
2405                    }
2406                    CommandResponse::Array(results)
2407                }
2408                _ => CommandResponse::Error(
2409                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2410                ),
2411            },
2412            None => CommandResponse::Array(vec![]),
2413        }
2414    }
2415
2416    fn cmd_zincrby(&mut self, key: &[u8], delta: f64, member: &[u8]) -> CommandResponse {
2417        self.lazy_expire(key);
2418        let compact = CompactKey::new(key);
2419        let zset = self.get_or_create_sorted_set(&compact);
2420        match zset {
2421            Ok(map) => {
2422                let new_score = map.get(member).copied().unwrap_or(0.0) + delta;
2423                map.insert(member.to_vec(), new_score);
2424                CommandResponse::BulkString(format!("{}", new_score).into_bytes())
2425            }
2426            Err(e) => e,
2427        }
2428    }
2429
2430    fn cmd_zcount(&mut self, key: &[u8], min: f64, max: f64) -> CommandResponse {
2431        self.lazy_expire(key);
2432        let compact = CompactKey::new(key);
2433        match self.entries.get(&compact) {
2434            Some(entry) => match &entry.value {
2435                Value::SortedSet(map) => {
2436                    let count = map.values().filter(|s| **s >= min && **s <= max).count();
2437                    CommandResponse::Integer(count as i64)
2438                }
2439                _ => CommandResponse::Error(
2440                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2441                ),
2442            },
2443            None => CommandResponse::Integer(0),
2444        }
2445    }
2446
2447    fn cmd_zrevrangebyscore(
2448        &mut self,
2449        key: &[u8],
2450        max: f64,
2451        min: f64,
2452        withscores: bool,
2453        offset: Option<usize>,
2454        count: Option<usize>,
2455    ) -> CommandResponse {
2456        self.lazy_expire(key);
2457        let compact = CompactKey::new(key);
2458        match self.entries.get(&compact) {
2459            Some(entry) => match &entry.value {
2460                Value::SortedSet(map) => {
2461                    let mut sorted: Vec<(&Vec<u8>, &f64)> = map
2462                        .iter()
2463                        .filter(|(_, s)| **s >= min && **s <= max)
2464                        .collect();
2465                    sorted.sort_by(|a, b| {
2466                        b.1.partial_cmp(a.1)
2467                            .unwrap_or(std::cmp::Ordering::Equal)
2468                            .then_with(|| b.0.cmp(a.0))
2469                    });
2470                    let off = offset.unwrap_or(0);
2471                    let iter: Box<dyn Iterator<Item = &(&Vec<u8>, &f64)>> = if let Some(c) = count {
2472                        Box::new(sorted.iter().skip(off).take(c))
2473                    } else {
2474                        Box::new(sorted.iter().skip(off))
2475                    };
2476                    let mut results = Vec::new();
2477                    for (member, score) in iter {
2478                        results.push(CommandResponse::BulkString(member.to_vec()));
2479                        if withscores {
2480                            results.push(CommandResponse::BulkString(
2481                                format!("{}", score).into_bytes(),
2482                            ));
2483                        }
2484                    }
2485                    CommandResponse::Array(results)
2486                }
2487                _ => CommandResponse::Error(
2488                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2489                ),
2490            },
2491            None => CommandResponse::Array(vec![]),
2492        }
2493    }
2494
2495    fn cmd_zpopmin(&mut self, key: &[u8], count: Option<usize>) -> CommandResponse {
2496        self.lazy_expire(key);
2497        let compact = CompactKey::new(key);
2498        let result = match self.entries.get_mut(&compact) {
2499            Some(entry) => match &mut entry.value {
2500                Value::SortedSet(map) => {
2501                    if map.is_empty() {
2502                        return CommandResponse::Array(vec![]);
2503                    }
2504                    let n = count.unwrap_or(1);
2505                    let mut sorted: Vec<(Vec<u8>, f64)> =
2506                        map.iter().map(|(m, s)| (m.clone(), *s)).collect();
2507                    sorted.sort_by(|a, b| {
2508                        a.1.partial_cmp(&b.1)
2509                            .unwrap_or(std::cmp::Ordering::Equal)
2510                            .then_with(|| a.0.cmp(&b.0))
2511                    });
2512                    let take = n.min(sorted.len());
2513                    let mut results = Vec::with_capacity(take * 2);
2514                    for (member, score) in sorted.iter().take(take) {
2515                        map.remove(member);
2516                        results.push(CommandResponse::BulkString(member.clone()));
2517                        results.push(CommandResponse::BulkString(
2518                            format!("{}", score).into_bytes(),
2519                        ));
2520                    }
2521                    let is_empty = map.is_empty();
2522                    (CommandResponse::Array(results), is_empty)
2523                }
2524                _ => {
2525                    return CommandResponse::Error(
2526                        "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2527                    )
2528                }
2529            },
2530            None => return CommandResponse::Array(vec![]),
2531        };
2532        if result.1 {
2533            self.entries.remove(&compact);
2534        }
2535        result.0
2536    }
2537
2538    fn cmd_zpopmax(&mut self, key: &[u8], count: Option<usize>) -> CommandResponse {
2539        self.lazy_expire(key);
2540        let compact = CompactKey::new(key);
2541        let result = match self.entries.get_mut(&compact) {
2542            Some(entry) => match &mut entry.value {
2543                Value::SortedSet(map) => {
2544                    if map.is_empty() {
2545                        return CommandResponse::Array(vec![]);
2546                    }
2547                    let n = count.unwrap_or(1);
2548                    let mut sorted: Vec<(Vec<u8>, f64)> =
2549                        map.iter().map(|(m, s)| (m.clone(), *s)).collect();
2550                    sorted.sort_by(|a, b| {
2551                        b.1.partial_cmp(&a.1)
2552                            .unwrap_or(std::cmp::Ordering::Equal)
2553                            .then_with(|| b.0.cmp(&a.0))
2554                    });
2555                    let take = n.min(sorted.len());
2556                    let mut results = Vec::with_capacity(take * 2);
2557                    for (member, score) in sorted.iter().take(take) {
2558                        map.remove(member);
2559                        results.push(CommandResponse::BulkString(member.clone()));
2560                        results.push(CommandResponse::BulkString(
2561                            format!("{}", score).into_bytes(),
2562                        ));
2563                    }
2564                    let is_empty = map.is_empty();
2565                    (CommandResponse::Array(results), is_empty)
2566                }
2567                _ => {
2568                    return CommandResponse::Error(
2569                        "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2570                    )
2571                }
2572            },
2573            None => return CommandResponse::Array(vec![]),
2574        };
2575        if result.1 {
2576            self.entries.remove(&compact);
2577        }
2578        result.0
2579    }
2580
2581    fn cmd_zrangebylex(
2582        &mut self,
2583        key: &[u8],
2584        min: &[u8],
2585        max: &[u8],
2586        offset: Option<usize>,
2587        count: Option<usize>,
2588        reverse: bool,
2589    ) -> CommandResponse {
2590        self.lazy_expire(key);
2591        let compact = CompactKey::new(key);
2592        match self.entries.get(&compact) {
2593            Some(entry) => match &entry.value {
2594                Value::SortedSet(map) => {
2595                    let mut sorted: Vec<(&Vec<u8>, &f64)> = map.iter().collect();
2596                    sorted.sort_by(|a, b| {
2597                        a.1.partial_cmp(b.1)
2598                            .unwrap_or(std::cmp::Ordering::Equal)
2599                            .then_with(|| a.0.cmp(b.0))
2600                    });
2601                    if reverse {
2602                        sorted.reverse();
2603                    }
2604                    let filtered: Vec<&Vec<u8>> = sorted
2605                        .iter()
2606                        .map(|(m, _)| *m)
2607                        .filter(|m| lex_in_range(m, min, max))
2608                        .collect();
2609                    let off = offset.unwrap_or(0);
2610                    let iter: Box<dyn Iterator<Item = &&Vec<u8>>> = if let Some(c) = count {
2611                        Box::new(filtered.iter().skip(off).take(c))
2612                    } else {
2613                        Box::new(filtered.iter().skip(off))
2614                    };
2615                    let results: Vec<CommandResponse> = iter
2616                        .map(|m| CommandResponse::BulkString((*m).clone()))
2617                        .collect();
2618                    CommandResponse::Array(results)
2619                }
2620                _ => CommandResponse::Error(
2621                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2622                ),
2623            },
2624            None => CommandResponse::Array(vec![]),
2625        }
2626    }
2627
2628    fn cmd_zlexcount(&mut self, key: &[u8], min: &[u8], max: &[u8]) -> CommandResponse {
2629        self.lazy_expire(key);
2630        let compact = CompactKey::new(key);
2631        match self.entries.get(&compact) {
2632            Some(entry) => match &entry.value {
2633                Value::SortedSet(map) => {
2634                    let count = map.keys().filter(|m| lex_in_range(m, min, max)).count();
2635                    CommandResponse::Integer(count as i64)
2636                }
2637                _ => CommandResponse::Error(
2638                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2639                ),
2640            },
2641            None => CommandResponse::Integer(0),
2642        }
2643    }
2644
2645    fn cmd_zmscore(&mut self, key: &[u8], members: &[Vec<u8>]) -> CommandResponse {
2646        self.lazy_expire(key);
2647        let compact = CompactKey::new(key);
2648        match self.entries.get(&compact) {
2649            Some(entry) => match &entry.value {
2650                Value::SortedSet(map) => {
2651                    let results: Vec<CommandResponse> = members
2652                        .iter()
2653                        .map(|m| match map.get(m.as_slice()) {
2654                            Some(score) => {
2655                                CommandResponse::BulkString(format!("{}", score).into_bytes())
2656                            }
2657                            None => CommandResponse::Nil,
2658                        })
2659                        .collect();
2660                    CommandResponse::Array(results)
2661                }
2662                _ => CommandResponse::Error(
2663                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2664                ),
2665            },
2666            None => CommandResponse::Array(vec![CommandResponse::Nil; members.len()]),
2667        }
2668    }
2669
2670    fn cmd_zrandmember(
2671        &mut self,
2672        key: &[u8],
2673        count: Option<i64>,
2674        withscores: bool,
2675    ) -> CommandResponse {
2676        self.lazy_expire(key);
2677        let compact = CompactKey::new(key);
2678        match self.entries.get(&compact) {
2679            Some(entry) => match &entry.value {
2680                Value::SortedSet(map) => {
2681                    if map.is_empty() {
2682                        return if count.is_some() {
2683                            CommandResponse::Array(vec![])
2684                        } else {
2685                            CommandResponse::Nil
2686                        };
2687                    }
2688                    let members: Vec<(&Vec<u8>, &f64)> = map.iter().collect();
2689                    match count {
2690                        None => {
2691                            let idx = (self.eviction_counter as usize) % members.len();
2692                            self.eviction_counter = self.eviction_counter.wrapping_add(1);
2693                            CommandResponse::BulkString(members[idx].0.clone())
2694                        }
2695                        Some(c) if c >= 0 => {
2696                            let take = (c as usize).min(members.len());
2697                            let start = (self.eviction_counter as usize) % members.len();
2698                            self.eviction_counter = self.eviction_counter.wrapping_add(take as u64);
2699                            let mut result =
2700                                Vec::with_capacity(if withscores { take * 2 } else { take });
2701                            for i in 0..take {
2702                                let idx = (start + i) % members.len();
2703                                result.push(CommandResponse::BulkString(members[idx].0.clone()));
2704                                if withscores {
2705                                    result.push(CommandResponse::BulkString(
2706                                        format!("{}", members[idx].1).into_bytes(),
2707                                    ));
2708                                }
2709                            }
2710                            CommandResponse::Array(result)
2711                        }
2712                        Some(c) => {
2713                            let abs_count = c.unsigned_abs() as usize;
2714                            let mut result = Vec::with_capacity(if withscores {
2715                                abs_count * 2
2716                            } else {
2717                                abs_count
2718                            });
2719                            for i in 0..abs_count {
2720                                let idx = ((self.eviction_counter as usize) + i) % members.len();
2721                                result.push(CommandResponse::BulkString(members[idx].0.clone()));
2722                                if withscores {
2723                                    result.push(CommandResponse::BulkString(
2724                                        format!("{}", members[idx].1).into_bytes(),
2725                                    ));
2726                                }
2727                            }
2728                            self.eviction_counter =
2729                                self.eviction_counter.wrapping_add(abs_count as u64);
2730                            CommandResponse::Array(result)
2731                        }
2732                    }
2733                }
2734                _ => CommandResponse::Error(
2735                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2736                ),
2737            },
2738            None => {
2739                if count.is_some() {
2740                    CommandResponse::Array(vec![])
2741                } else {
2742                    CommandResponse::Nil
2743                }
2744            }
2745        }
2746    }
2747
2748    fn cmd_zscan(
2749        &mut self,
2750        key: &[u8],
2751        cursor: u64,
2752        pattern: Option<&str>,
2753        count: usize,
2754    ) -> CommandResponse {
2755        self.lazy_expire(key);
2756        let compact = CompactKey::new(key);
2757        match self.entries.get(&compact) {
2758            Some(entry) => match &entry.value {
2759                Value::SortedSet(map) => {
2760                    let mut members: Vec<(&Vec<u8>, &f64)> = map
2761                        .iter()
2762                        .filter(|(m, _)| pattern.is_none_or(|p| glob_match(p, m)))
2763                        .collect();
2764                    members.sort_by(|a, b| a.0.cmp(b.0));
2765
2766                    let start = cursor as usize;
2767                    let end = (start + count).min(members.len());
2768
2769                    if start >= members.len() {
2770                        return CommandResponse::Array(vec![
2771                            CommandResponse::BulkString(b"0".to_vec()),
2772                            CommandResponse::Array(vec![]),
2773                        ]);
2774                    }
2775
2776                    let mut results = Vec::with_capacity((end - start) * 2);
2777                    for (m, s) in &members[start..end] {
2778                        results.push(CommandResponse::BulkString((*m).clone()));
2779                        results.push(CommandResponse::BulkString(format!("{}", s).into_bytes()));
2780                    }
2781
2782                    let next_cursor = if end >= members.len() { 0 } else { end as u64 };
2783
2784                    CommandResponse::Array(vec![
2785                        CommandResponse::BulkString(next_cursor.to_string().into_bytes()),
2786                        CommandResponse::Array(results),
2787                    ])
2788                }
2789                _ => CommandResponse::Error(
2790                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2791                ),
2792            },
2793            None => CommandResponse::Array(vec![
2794                CommandResponse::BulkString(b"0".to_vec()),
2795                CommandResponse::Array(vec![]),
2796            ]),
2797        }
2798    }
2799
2800    // ─── HyperLogLog operations ─────────────────────────────────────────
2801
2802    fn cmd_pfadd(&mut self, key: &[u8], elements: &[Vec<u8>]) -> CommandResponse {
2803        self.lazy_expire(key);
2804        let compact = CompactKey::new(key);
2805        let is_new = !self.entries.contains_key(&compact);
2806        let mut registers = if is_new {
2807            vec![0u8; HLL_REGISTER_BYTES]
2808        } else {
2809            match self.entries.get(&compact) {
2810                Some(entry) => match entry.value.as_bytes() {
2811                    Some(bytes) if bytes.len() == HLL_REGISTER_BYTES => bytes,
2812                    Some(_) | None => {
2813                        return CommandResponse::Error(
2814                            "WRONGTYPE Operation against a key holding the wrong kind of value"
2815                                .into(),
2816                        )
2817                    }
2818                },
2819                None => vec![0u8; HLL_REGISTER_BYTES],
2820            }
2821        };
2822
2823        let mut changed = false;
2824        for elem in elements {
2825            if hll_add(&mut registers, elem) {
2826                changed = true;
2827            }
2828        }
2829
2830        if changed || is_new {
2831            let mut memory_delta = 0usize;
2832            if is_new {
2833                memory_delta += key.len()
2834                    + std::mem::size_of::<CompactKey>()
2835                    + std::mem::size_of::<KeyEntry>()
2836                    + HLL_REGISTER_BYTES;
2837            }
2838            let entry = KeyEntry::new(compact.clone(), Value::from_raw_bytes(&registers));
2839            self.entries.insert(compact, entry);
2840            self.memory_used += memory_delta;
2841        }
2842
2843        CommandResponse::Integer(if changed { 1 } else { 0 })
2844    }
2845
2846    fn cmd_pfcount_single(&mut self, key: &[u8]) -> CommandResponse {
2847        self.lazy_expire(key);
2848        let compact = CompactKey::new(key);
2849        match self.entries.get(&compact) {
2850            Some(entry) => match entry.value.as_bytes() {
2851                Some(bytes) if bytes.len() == HLL_REGISTER_BYTES => {
2852                    CommandResponse::Integer(hll_count(&bytes) as i64)
2853                }
2854                Some(_) => CommandResponse::Error(
2855                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2856                ),
2857                None => CommandResponse::Error(
2858                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2859                ),
2860            },
2861            None => CommandResponse::Integer(0),
2862        }
2863    }
2864
2865    fn cmd_pfcount_multi(&mut self, keys: &[Vec<u8>]) -> CommandResponse {
2866        let mut merged = vec![0u8; HLL_REGISTER_BYTES];
2867        for key in keys {
2868            self.lazy_expire(key);
2869            let compact = CompactKey::new(key);
2870            if let Some(entry) = self.entries.get(&compact) {
2871                match entry.value.as_bytes() {
2872                    Some(bytes) if bytes.len() == HLL_REGISTER_BYTES => {
2873                        hll_merge(&mut merged, &bytes);
2874                    }
2875                    Some(_) => {
2876                        return CommandResponse::Error(
2877                            "WRONGTYPE Operation against a key holding the wrong kind of value"
2878                                .into(),
2879                        )
2880                    }
2881                    None => {
2882                        return CommandResponse::Error(
2883                            "WRONGTYPE Operation against a key holding the wrong kind of value"
2884                                .into(),
2885                        )
2886                    }
2887                }
2888            }
2889        }
2890        CommandResponse::Integer(hll_count(&merged) as i64)
2891    }
2892
2893    fn cmd_pfmerge(&mut self, destkey: &[u8], sourcekeys: &[Vec<u8>]) -> CommandResponse {
2894        let mut merged = vec![0u8; HLL_REGISTER_BYTES];
2895
2896        let dest_compact = CompactKey::new(destkey);
2897        self.lazy_expire(destkey);
2898        if let Some(entry) = self.entries.get(&dest_compact) {
2899            match entry.value.as_bytes() {
2900                Some(bytes) if bytes.len() == HLL_REGISTER_BYTES => {
2901                    hll_merge(&mut merged, &bytes);
2902                }
2903                Some(_) => {
2904                    return CommandResponse::Error(
2905                        "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2906                    )
2907                }
2908                None => {
2909                    return CommandResponse::Error(
2910                        "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
2911                    )
2912                }
2913            }
2914        }
2915
2916        for key in sourcekeys {
2917            self.lazy_expire(key);
2918            let compact = CompactKey::new(key);
2919            if let Some(entry) = self.entries.get(&compact) {
2920                match entry.value.as_bytes() {
2921                    Some(bytes) if bytes.len() == HLL_REGISTER_BYTES => {
2922                        hll_merge(&mut merged, &bytes);
2923                    }
2924                    Some(_) => {
2925                        return CommandResponse::Error(
2926                            "WRONGTYPE Operation against a key holding the wrong kind of value"
2927                                .into(),
2928                        )
2929                    }
2930                    None => {
2931                        return CommandResponse::Error(
2932                            "WRONGTYPE Operation against a key holding the wrong kind of value"
2933                                .into(),
2934                        )
2935                    }
2936                }
2937            }
2938        }
2939
2940        let is_new = !self.entries.contains_key(&dest_compact);
2941        let mut memory_delta = 0usize;
2942        if is_new {
2943            memory_delta += destkey.len()
2944                + std::mem::size_of::<CompactKey>()
2945                + std::mem::size_of::<KeyEntry>()
2946                + HLL_REGISTER_BYTES;
2947        }
2948        let entry = KeyEntry::new(dest_compact.clone(), Value::from_raw_bytes(&merged));
2949        self.entries.insert(dest_compact, entry);
2950        self.memory_used += memory_delta;
2951
2952        CommandResponse::Ok
2953    }
2954
2955    // ─── Bitmap operations ──────────────────────────────────────────
2956
2957    fn get_string_bytes(&mut self, key: &[u8]) -> Option<Vec<u8>> {
2958        let compact = CompactKey::new(key);
2959        self.entries.get(&compact).and_then(|e| e.value.as_bytes())
2960    }
2961
2962    fn set_string_value(&mut self, key: &[u8], data: Vec<u8>) {
2963        let compact = CompactKey::new(key);
2964        let is_new = !self.entries.contains_key(&compact);
2965        let new_value = Value::from_raw_bytes(&data);
2966        if is_new {
2967            let size = Self::estimate_key_entry_size(key, &new_value);
2968            let entry = KeyEntry::new(compact.clone(), new_value);
2969            self.entries.insert(compact, entry);
2970            self.memory_used += size;
2971        } else {
2972            let entry = self
2973                .entries
2974                .get_mut(&compact)
2975                .expect("checked contains_key");
2976            let old_size = entry.value.estimated_size();
2977            entry.value = new_value;
2978            let new_size = entry.value.estimated_size();
2979            if new_size > old_size {
2980                self.memory_used += new_size - old_size;
2981            } else {
2982                self.memory_used = self.memory_used.saturating_sub(old_size - new_size);
2983            }
2984        }
2985    }
2986
2987    fn cmd_setbit(&mut self, key: &[u8], offset: u64, value: u8) -> CommandResponse {
2988        self.lazy_expire(key);
2989        let byte_idx = (offset / 8) as usize;
2990        let bit_idx = 7 - (offset % 8) as usize;
2991
2992        let mut data = self.get_string_bytes(key).unwrap_or_default();
2993
2994        if byte_idx >= data.len() {
2995            data.resize(byte_idx + 1, 0);
2996        }
2997
2998        let old_bit = (data[byte_idx] >> bit_idx) & 1;
2999
3000        if value == 1 {
3001            data[byte_idx] |= 1 << bit_idx;
3002        } else {
3003            data[byte_idx] &= !(1 << bit_idx);
3004        }
3005
3006        self.set_string_value(key, data);
3007        CommandResponse::Integer(old_bit as i64)
3008    }
3009
3010    fn cmd_getbit(&mut self, key: &[u8], offset: u64) -> CommandResponse {
3011        self.lazy_expire(key);
3012        let byte_idx = (offset / 8) as usize;
3013        let bit_idx = 7 - (offset % 8) as usize;
3014
3015        match self.get_string_bytes(key) {
3016            Some(data) => {
3017                if byte_idx >= data.len() {
3018                    CommandResponse::Integer(0)
3019                } else {
3020                    CommandResponse::Integer(((data[byte_idx] >> bit_idx) & 1) as i64)
3021                }
3022            }
3023            None => CommandResponse::Integer(0),
3024        }
3025    }
3026
3027    fn cmd_bitcount(
3028        &mut self,
3029        key: &[u8],
3030        start: Option<i64>,
3031        end: Option<i64>,
3032        use_bit: bool,
3033    ) -> CommandResponse {
3034        self.lazy_expire(key);
3035        let data = match self.get_string_bytes(key) {
3036            Some(d) => d,
3037            None => return CommandResponse::Integer(0),
3038        };
3039
3040        if data.is_empty() {
3041            return CommandResponse::Integer(0);
3042        }
3043
3044        if use_bit {
3045            let total_bits = (data.len() * 8) as i64;
3046            let s = normalize_index(start.unwrap_or(0), total_bits);
3047            let e = normalize_index(end.unwrap_or(total_bits - 1), total_bits);
3048            if s > e {
3049                return CommandResponse::Integer(0);
3050            }
3051            let mut count = 0i64;
3052            for bit_pos in s..=e {
3053                let byte_idx = bit_pos / 8;
3054                let bit_idx = 7 - (bit_pos % 8);
3055                if byte_idx < data.len() && (data[byte_idx] >> bit_idx) & 1 == 1 {
3056                    count += 1;
3057                }
3058            }
3059            CommandResponse::Integer(count)
3060        } else {
3061            let len = data.len() as i64;
3062            let s = normalize_index(start.unwrap_or(0), len);
3063            let e = normalize_index(end.unwrap_or(len - 1), len);
3064            if s > e || s >= data.len() {
3065                return CommandResponse::Integer(0);
3066            }
3067            let e = e.min(data.len() - 1);
3068            let count: u32 = data[s..=e].iter().map(|b| b.count_ones()).sum();
3069            CommandResponse::Integer(count as i64)
3070        }
3071    }
3072
3073    fn cmd_bitop(
3074        &mut self,
3075        operation: BitOperation,
3076        destkey: &[u8],
3077        keys: &[Vec<u8>],
3078    ) -> CommandResponse {
3079        let mut buffers: Vec<Vec<u8>> = Vec::with_capacity(keys.len());
3080        let mut max_len = 0usize;
3081
3082        for key in keys {
3083            self.lazy_expire(key);
3084            let data = self.get_string_bytes(key).unwrap_or_default();
3085            max_len = max_len.max(data.len());
3086            buffers.push(data);
3087        }
3088
3089        if buffers.is_empty() {
3090            self.set_string_value(destkey, Vec::new());
3091            return CommandResponse::Integer(0);
3092        }
3093
3094        let mut result = vec![0u8; max_len];
3095
3096        match operation {
3097            BitOperation::And => {
3098                result = vec![0xFF; max_len];
3099                for buf in &buffers {
3100                    for (i, byte) in result.iter_mut().enumerate() {
3101                        let src = if i < buf.len() { buf[i] } else { 0 };
3102                        *byte &= src;
3103                    }
3104                }
3105            }
3106            BitOperation::Or => {
3107                for buf in &buffers {
3108                    for (i, &src) in buf.iter().enumerate() {
3109                        result[i] |= src;
3110                    }
3111                }
3112            }
3113            BitOperation::Xor => {
3114                for buf in &buffers {
3115                    for (i, &src) in buf.iter().enumerate() {
3116                        result[i] ^= src;
3117                    }
3118                }
3119            }
3120            BitOperation::Not => {
3121                let buf = &buffers[0];
3122                result = vec![0u8; buf.len()];
3123                for (i, &src) in buf.iter().enumerate() {
3124                    result[i] = !src;
3125                }
3126            }
3127        }
3128
3129        let len = result.len() as i64;
3130        self.set_string_value(destkey, result);
3131        CommandResponse::Integer(len)
3132    }
3133
3134    fn cmd_bitpos(
3135        &mut self,
3136        key: &[u8],
3137        bit: u8,
3138        start: Option<i64>,
3139        end: Option<i64>,
3140        use_bit: bool,
3141    ) -> CommandResponse {
3142        self.lazy_expire(key);
3143        let data = match self.get_string_bytes(key) {
3144            Some(d) if !d.is_empty() => d,
3145            _ => {
3146                return if bit == 0 {
3147                    CommandResponse::Integer(0)
3148                } else {
3149                    CommandResponse::Integer(-1)
3150                };
3151            }
3152        };
3153
3154        let has_explicit_end = end.is_some();
3155
3156        if use_bit {
3157            let total_bits = (data.len() * 8) as i64;
3158            let s = normalize_index(start.unwrap_or(0), total_bits);
3159            let e = normalize_index(end.unwrap_or(total_bits - 1), total_bits);
3160            if s > e {
3161                return CommandResponse::Integer(-1);
3162            }
3163            for bit_pos in s..=e.min(data.len() * 8 - 1) {
3164                let byte_idx = bit_pos / 8;
3165                let bit_idx = 7 - (bit_pos % 8);
3166                let current = (data[byte_idx] >> bit_idx) & 1;
3167                if current == bit {
3168                    return CommandResponse::Integer(bit_pos as i64);
3169                }
3170            }
3171            CommandResponse::Integer(-1)
3172        } else {
3173            let len = data.len() as i64;
3174            let s = normalize_index(start.unwrap_or(0), len);
3175            let e = normalize_index(end.unwrap_or(len - 1), len);
3176            if s > e || s >= data.len() {
3177                return CommandResponse::Integer(-1);
3178            }
3179            let e = e.min(data.len() - 1);
3180            for (byte_idx, &byte_val) in data.iter().enumerate().take(e + 1).skip(s) {
3181                for bit_idx in (0..8).rev() {
3182                    let current = (byte_val >> bit_idx) & 1;
3183                    if current == bit {
3184                        return CommandResponse::Integer((byte_idx * 8 + (7 - bit_idx)) as i64);
3185                    }
3186                }
3187            }
3188            if bit == 0 && !has_explicit_end {
3189                CommandResponse::Integer((data.len() * 8) as i64)
3190            } else {
3191                CommandResponse::Integer(-1)
3192            }
3193        }
3194    }
3195
3196    fn cmd_bitfield(&mut self, key: &[u8], operations: &[BitFieldOperation]) -> CommandResponse {
3197        self.lazy_expire(key);
3198        let compact = CompactKey::new(key);
3199
3200        let mut data = match self.entries.get(&compact) {
3201            Some(entry) => match &entry.value {
3202                Value::InlineStr { .. } | Value::HeapStr(_) | Value::Int(_) => {
3203                    entry.value.as_bytes().unwrap_or_default()
3204                }
3205                _ => {
3206                    return CommandResponse::Error(
3207                        "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3208                    )
3209                }
3210            },
3211            None => Vec::new(),
3212        };
3213
3214        let mut overflow = BitFieldOverflow::Wrap;
3215        let mut responses = Vec::new();
3216        let mut dirty = false;
3217
3218        for op in operations {
3219            match *op {
3220                BitFieldOperation::Overflow(mode) => overflow = mode,
3221                BitFieldOperation::Get { encoding, offset } => {
3222                    let Some(bit_offset) = Self::resolve_bitfield_offset(encoding, offset) else {
3223                        return CommandResponse::Error(
3224                            "ERR bit offset is not an integer or out of range".into(),
3225                        );
3226                    };
3227                    let value = Self::bitfield_read(&data, bit_offset, encoding);
3228                    responses.push(CommandResponse::Integer(value));
3229                }
3230                BitFieldOperation::Set {
3231                    encoding,
3232                    offset,
3233                    value,
3234                } => {
3235                    let Some(bit_offset) = Self::resolve_bitfield_offset(encoding, offset) else {
3236                        return CommandResponse::Error(
3237                            "ERR bit offset is not an integer or out of range".into(),
3238                        );
3239                    };
3240                    let old_value = Self::bitfield_read(&data, bit_offset, encoding);
3241                    let Some(applied) =
3242                        Self::apply_bitfield_overflow(value as i128, encoding, overflow)
3243                    else {
3244                        responses.push(CommandResponse::Nil);
3245                        continue;
3246                    };
3247                    Self::bitfield_write(&mut data, bit_offset, encoding, applied);
3248                    responses.push(CommandResponse::Integer(old_value));
3249                    dirty = true;
3250                }
3251                BitFieldOperation::IncrBy {
3252                    encoding,
3253                    offset,
3254                    increment,
3255                } => {
3256                    let Some(bit_offset) = Self::resolve_bitfield_offset(encoding, offset) else {
3257                        return CommandResponse::Error(
3258                            "ERR bit offset is not an integer or out of range".into(),
3259                        );
3260                    };
3261                    let current = Self::bitfield_read(&data, bit_offset, encoding) as i128;
3262                    let target = current + increment as i128;
3263                    let Some(applied) = Self::apply_bitfield_overflow(target, encoding, overflow)
3264                    else {
3265                        responses.push(CommandResponse::Nil);
3266                        continue;
3267                    };
3268                    Self::bitfield_write(&mut data, bit_offset, encoding, applied);
3269                    responses.push(CommandResponse::Integer(applied as i64));
3270                    dirty = true;
3271                }
3272            }
3273        }
3274
3275        if dirty {
3276            self.set_string_value(key, data);
3277        }
3278
3279        CommandResponse::Array(responses)
3280    }
3281
3282    fn resolve_bitfield_offset(encoding: BitFieldEncoding, offset: BitFieldOffset) -> Option<u64> {
3283        let base = match offset {
3284            BitFieldOffset::Absolute(v) => v,
3285            BitFieldOffset::Multiplied(v) => v.checked_mul(encoding.bits as i64)?,
3286        };
3287        if base < 0 {
3288            return None;
3289        }
3290        let start = base as u64;
3291        start.checked_add(encoding.bits as u64)?;
3292        Some(start)
3293    }
3294
3295    fn bitfield_read(data: &[u8], bit_offset: u64, encoding: BitFieldEncoding) -> i64 {
3296        let width = encoding.bits as u32;
3297        let mut raw: u128 = 0;
3298        for i in 0..width {
3299            let pos = bit_offset + i as u64;
3300            let byte_idx = (pos / 8) as usize;
3301            let bit_idx = 7 - (pos % 8) as usize;
3302            let bit = if byte_idx < data.len() {
3303                (data[byte_idx] >> bit_idx) & 1
3304            } else {
3305                0
3306            };
3307            raw = (raw << 1) | bit as u128;
3308        }
3309
3310        if encoding.signed {
3311            let shift = 128 - width;
3312            (((raw << shift) as i128) >> shift) as i64
3313        } else {
3314            raw as i64
3315        }
3316    }
3317
3318    fn bitfield_write(
3319        data: &mut Vec<u8>,
3320        bit_offset: u64,
3321        encoding: BitFieldEncoding,
3322        value: i128,
3323    ) {
3324        let width = encoding.bits as u32;
3325        let encoded = Self::encode_bitfield_value(value, encoding);
3326        let end_bit = bit_offset + width as u64;
3327        let needed_bytes = end_bit.div_ceil(8) as usize;
3328        if needed_bytes > data.len() {
3329            data.resize(needed_bytes, 0);
3330        }
3331
3332        for i in 0..width {
3333            let src_shift = width - 1 - i;
3334            let bit = ((encoded >> src_shift) & 1) as u8;
3335            let pos = bit_offset + i as u64;
3336            let byte_idx = (pos / 8) as usize;
3337            let bit_idx = 7 - (pos % 8) as usize;
3338            if bit == 1 {
3339                data[byte_idx] |= 1 << bit_idx;
3340            } else {
3341                data[byte_idx] &= !(1 << bit_idx);
3342            }
3343        }
3344    }
3345
3346    fn bitfield_bounds(encoding: BitFieldEncoding) -> (i128, i128) {
3347        if encoding.signed {
3348            let min = -(1i128 << (encoding.bits as u32 - 1));
3349            let max = (1i128 << (encoding.bits as u32 - 1)) - 1;
3350            (min, max)
3351        } else {
3352            (0, (1i128 << encoding.bits as u32) - 1)
3353        }
3354    }
3355
3356    fn apply_bitfield_overflow(
3357        value: i128,
3358        encoding: BitFieldEncoding,
3359        mode: BitFieldOverflow,
3360    ) -> Option<i128> {
3361        let (min, max) = Self::bitfield_bounds(encoding);
3362        if (min..=max).contains(&value) {
3363            return Some(value);
3364        }
3365
3366        match mode {
3367            BitFieldOverflow::Fail => None,
3368            BitFieldOverflow::Sat => Some(value.clamp(min, max)),
3369            BitFieldOverflow::Wrap => {
3370                let modulo = 1i128 << encoding.bits as u32;
3371                let wrapped = ((value % modulo) + modulo) % modulo;
3372                if encoding.signed {
3373                    let sign_boundary = 1i128 << (encoding.bits as u32 - 1);
3374                    if wrapped >= sign_boundary {
3375                        Some(wrapped - modulo)
3376                    } else {
3377                        Some(wrapped)
3378                    }
3379                } else {
3380                    Some(wrapped)
3381                }
3382            }
3383        }
3384    }
3385
3386    fn encode_bitfield_value(value: i128, encoding: BitFieldEncoding) -> u128 {
3387        if encoding.signed && value < 0 {
3388            (value + (1i128 << encoding.bits as u32)) as u128
3389        } else {
3390            value as u128
3391        }
3392    }
3393
3394    // ─── Geo operations ─────────────────────────────────────────────
3395
3396    fn cmd_geoadd(
3397        &mut self,
3398        key: &[u8],
3399        nx: bool,
3400        xx: bool,
3401        ch: bool,
3402        members: &[(f64, f64, Vec<u8>)],
3403    ) -> CommandResponse {
3404        self.lazy_expire(key);
3405        let compact = CompactKey::new(key);
3406        let is_new = !self.entries.contains_key(&compact);
3407        let zset = self.get_or_create_sorted_set(&compact);
3408        match zset {
3409            Ok(map) => {
3410                let mut memory_delta: usize = 0;
3411                if is_new {
3412                    memory_delta += key.len()
3413                        + std::mem::size_of::<CompactKey>()
3414                        + std::mem::size_of::<KeyEntry>();
3415                }
3416                let mut count = 0i64;
3417                let member_overhead = std::mem::size_of::<f64>();
3418                for (lon, lat, member) in members {
3419                    let score = geo_encode(*lon, *lat);
3420                    let existing = map.get(member).copied();
3421                    match existing {
3422                        Some(old_score) => {
3423                            if nx {
3424                                continue;
3425                            }
3426                            if (old_score - score).abs() > f64::EPSILON {
3427                                map.insert(member.clone(), score);
3428                                if ch {
3429                                    count += 1;
3430                                }
3431                            }
3432                        }
3433                        None => {
3434                            if xx {
3435                                continue;
3436                            }
3437                            map.insert(member.clone(), score);
3438                            memory_delta += member.len() + member_overhead;
3439                            count += 1;
3440                        }
3441                    }
3442                }
3443                self.memory_used += memory_delta;
3444                CommandResponse::Integer(count)
3445            }
3446            Err(e) => e,
3447        }
3448    }
3449
3450    fn cmd_geodist(
3451        &mut self,
3452        key: &[u8],
3453        member1: &[u8],
3454        member2: &[u8],
3455        unit: GeoUnit,
3456    ) -> CommandResponse {
3457        self.lazy_expire(key);
3458        let compact = CompactKey::new(key);
3459        match self.entries.get(&compact) {
3460            Some(entry) => match &entry.value {
3461                Value::SortedSet(map) => {
3462                    let s1 = match map.get(member1) {
3463                        Some(s) => *s,
3464                        None => return CommandResponse::Nil,
3465                    };
3466                    let s2 = match map.get(member2) {
3467                        Some(s) => *s,
3468                        None => return CommandResponse::Nil,
3469                    };
3470                    let (lon1, lat1) = geo_decode(s1);
3471                    let (lon2, lat2) = geo_decode(s2);
3472                    let dist = haversine_distance(lat1, lon1, lat2, lon2);
3473                    let converted = geo_convert_distance(dist, unit);
3474                    CommandResponse::BulkString(format!("{:.4}", converted).into_bytes())
3475                }
3476                _ => CommandResponse::Error(
3477                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3478                ),
3479            },
3480            None => CommandResponse::Nil,
3481        }
3482    }
3483
3484    fn cmd_geohash(&mut self, key: &[u8], members: &[Vec<u8>]) -> CommandResponse {
3485        self.lazy_expire(key);
3486        let compact = CompactKey::new(key);
3487        match self.entries.get(&compact) {
3488            Some(entry) => match &entry.value {
3489                Value::SortedSet(map) => {
3490                    let results: Vec<CommandResponse> = members
3491                        .iter()
3492                        .map(|member| match map.get(member.as_slice()) {
3493                            Some(&score) => {
3494                                let (lon, lat) = geo_decode(score);
3495                                let hash = geo_encode_base32(lon, lat);
3496                                CommandResponse::BulkString(hash.into_bytes())
3497                            }
3498                            None => CommandResponse::Nil,
3499                        })
3500                        .collect();
3501                    CommandResponse::Array(results)
3502                }
3503                _ => CommandResponse::Error(
3504                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3505                ),
3506            },
3507            None => {
3508                let results = vec![CommandResponse::Nil; members.len()];
3509                CommandResponse::Array(results)
3510            }
3511        }
3512    }
3513
3514    fn cmd_geopos(&mut self, key: &[u8], members: &[Vec<u8>]) -> CommandResponse {
3515        self.lazy_expire(key);
3516        let compact = CompactKey::new(key);
3517        match self.entries.get(&compact) {
3518            Some(entry) => match &entry.value {
3519                Value::SortedSet(map) => {
3520                    let results: Vec<CommandResponse> = members
3521                        .iter()
3522                        .map(|member| match map.get(member.as_slice()) {
3523                            Some(&score) => {
3524                                let (lon, lat) = geo_decode(score);
3525                                CommandResponse::Array(vec![
3526                                    CommandResponse::BulkString(format!("{:.6}", lon).into_bytes()),
3527                                    CommandResponse::BulkString(format!("{:.6}", lat).into_bytes()),
3528                                ])
3529                            }
3530                            None => CommandResponse::Nil,
3531                        })
3532                        .collect();
3533                    CommandResponse::Array(results)
3534                }
3535                _ => CommandResponse::Error(
3536                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3537                ),
3538            },
3539            None => {
3540                let results = vec![CommandResponse::Nil; members.len()];
3541                CommandResponse::Array(results)
3542            }
3543        }
3544    }
3545
3546    #[allow(clippy::too_many_arguments)]
3547    fn cmd_geosearch(
3548        &mut self,
3549        key: &[u8],
3550        from_member: Option<&[u8]>,
3551        from_lonlat: Option<(f64, f64)>,
3552        radius: f64,
3553        unit: GeoUnit,
3554        asc: Option<bool>,
3555        count: Option<usize>,
3556        withcoord: bool,
3557        withdist: bool,
3558        withhash: bool,
3559    ) -> CommandResponse {
3560        self.lazy_expire(key);
3561        let compact = CompactKey::new(key);
3562        let map = match self.entries.get(&compact) {
3563            Some(entry) => match &entry.value {
3564                Value::SortedSet(map) => map,
3565                _ => {
3566                    return CommandResponse::Error(
3567                        "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3568                    )
3569                }
3570            },
3571            None => return CommandResponse::Array(vec![]),
3572        };
3573
3574        let (center_lon, center_lat) = if let Some(member) = from_member {
3575            match map.get(member) {
3576                Some(&score) => geo_decode(score),
3577                None => return CommandResponse::Array(vec![]),
3578            }
3579        } else if let Some((lon, lat)) = from_lonlat {
3580            (lon, lat)
3581        } else {
3582            return CommandResponse::Error("ERR FROMMEMBER or FROMLONLAT required".into());
3583        };
3584
3585        let radius_meters = geo_to_meters(radius, unit);
3586
3587        let mut results: Vec<(Vec<u8>, f64, f64, f64, f64)> = Vec::new();
3588        for (member, &score) in map.iter() {
3589            let (lon, lat) = geo_decode(score);
3590            let dist = haversine_distance(center_lat, center_lon, lat, lon);
3591            if dist <= radius_meters {
3592                results.push((member.clone(), dist, lon, lat, score));
3593            }
3594        }
3595
3596        if let Some(true) = asc {
3597            results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
3598        } else if let Some(false) = asc {
3599            results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
3600        }
3601
3602        if let Some(c) = count {
3603            results.truncate(c);
3604        }
3605
3606        let items: Vec<CommandResponse> = results
3607            .into_iter()
3608            .map(|(member, dist, lon, lat, score)| {
3609                if !withcoord && !withdist && !withhash {
3610                    return CommandResponse::BulkString(member);
3611                }
3612                let mut arr = vec![CommandResponse::BulkString(member)];
3613                if withdist {
3614                    let converted = geo_convert_distance(dist, unit);
3615                    arr.push(CommandResponse::BulkString(
3616                        format!("{:.4}", converted).into_bytes(),
3617                    ));
3618                }
3619                if withhash {
3620                    arr.push(CommandResponse::Integer(score.to_bits() as i64));
3621                }
3622                if withcoord {
3623                    arr.push(CommandResponse::Array(vec![
3624                        CommandResponse::BulkString(format!("{:.6}", lon).into_bytes()),
3625                        CommandResponse::BulkString(format!("{:.6}", lat).into_bytes()),
3626                    ]));
3627                }
3628                CommandResponse::Array(arr)
3629            })
3630            .collect();
3631
3632        CommandResponse::Array(items)
3633    }
3634
3635    // ─── Vector operations ────────────────────────────────────────────
3636
3637    #[cfg(feature = "vector")]
3638    fn cmd_vec_set(&mut self, key: &[u8], dimensions: usize, vector: &[f32]) -> CommandResponse {
3639        let compact = CompactKey::new(key);
3640        let index = self
3641            .vector_indexes
3642            .entry(compact)
3643            .or_insert_with(|| HnswIndex::new(dimensions, DistanceMetric::L2, 16, 200));
3644
3645        if index.dim() != dimensions {
3646            return CommandResponse::Error(format!(
3647                "ERR dimension mismatch: index has {}, got {}",
3648                index.dim(),
3649                dimensions
3650            ));
3651        }
3652
3653        let id = {
3654            let mut hasher = std::collections::hash_map::DefaultHasher::new();
3655            for &v in vector {
3656                std::hash::Hasher::write(&mut hasher, &v.to_le_bytes());
3657            }
3658            std::hash::Hasher::finish(&hasher)
3659        };
3660
3661        index.insert(id, vector);
3662        CommandResponse::Integer(id as i64)
3663    }
3664
3665    #[cfg(feature = "vector")]
3666    fn cmd_vec_query(&self, key: &[u8], k: usize, vector: &[f32]) -> CommandResponse {
3667        let compact = CompactKey::new(key);
3668        let index = match self.vector_indexes.get(&compact) {
3669            Some(idx) => idx,
3670            None => return CommandResponse::Array(vec![]),
3671        };
3672
3673        let results = index.search(vector, k, k.max(50));
3674        let items: Vec<CommandResponse> = results
3675            .into_iter()
3676            .map(|r| {
3677                CommandResponse::Array(vec![
3678                    CommandResponse::Integer(r.id as i64),
3679                    CommandResponse::BulkString(format!("{}", r.distance).into_bytes()),
3680                ])
3681            })
3682            .collect();
3683        CommandResponse::Array(items)
3684    }
3685
3686    #[cfg(feature = "vector")]
3687    fn cmd_vec_del(&mut self, key: &[u8]) -> CommandResponse {
3688        let compact = CompactKey::new(key);
3689        if self.vector_indexes.remove(&compact).is_some() {
3690            CommandResponse::Integer(1)
3691        } else {
3692            CommandResponse::Integer(0)
3693        }
3694    }
3695
3696    // ─── Helpers ─────────────────────────────────────────────────────
3697
3698    fn build_string_entry(
3699        key: &CompactKey,
3700        value: &[u8],
3701        ttl: Option<Duration>,
3702    ) -> (KeyEntry, usize) {
3703        let new_value = Value::from_raw_bytes(value);
3704        let entry_size = Self::estimate_key_entry_size(key.as_bytes(), &new_value);
3705        let mut entry = KeyEntry::new(key.clone(), new_value);
3706        if let Some(dur) = ttl {
3707            entry.set_ttl(dur);
3708        }
3709        (entry, entry_size)
3710    }
3711
3712    fn lazy_expire(&mut self, key: &[u8]) {
3713        if self.is_expired(key) {
3714            let compact = CompactKey::new(key);
3715            let _ = self.remove_compact_entry(&compact);
3716        }
3717    }
3718
3719    fn is_expired(&self, key: &[u8]) -> bool {
3720        self.entries
3721            .get(key)
3722            .is_some_and(|entry| entry.is_expired())
3723    }
3724
3725    /// Update the LFU counter for a key on read access.
3726    fn touch_key(&mut self, key: &CompactKey) {
3727        if let Some(entry) = self.entries.get_mut(key) {
3728            entry.touch_lfu();
3729        }
3730    }
3731
3732    /// Evict keys using LFU sampling when memory pressure is detected.
3733    fn maybe_evict(&mut self) {
3734        if self.max_memory == 0 || self.memory_used < self.max_memory {
3735            return;
3736        }
3737
3738        let sample_size = 5usize;
3739        let entry_count = self.entries.len();
3740        if entry_count == 0 {
3741            return;
3742        }
3743
3744        self.eviction_counter = self.eviction_counter.wrapping_add(1);
3745        let start = (self.eviction_counter as usize).wrapping_mul(self.shard_id as usize + 1)
3746            % entry_count.max(1);
3747
3748        let mut lowest_counter = u8::MAX;
3749        let mut lowest_key: Option<CompactKey> = None;
3750        let mut sampled = 0usize;
3751
3752        for (i, (key, entry)) in self.entries.iter().enumerate() {
3753            if i < start {
3754                continue;
3755            }
3756            if sampled >= sample_size {
3757                break;
3758            }
3759            sampled += 1;
3760            if entry.lfu_counter < lowest_counter {
3761                lowest_counter = entry.lfu_counter;
3762                lowest_key = Some(key.clone());
3763            }
3764        }
3765
3766        if lowest_key.is_none() {
3767            for (key, entry) in self.entries.iter().take(sample_size * 3) {
3768                if entry.lfu_counter < lowest_counter {
3769                    lowest_counter = entry.lfu_counter;
3770                    lowest_key = Some(key.clone());
3771                }
3772            }
3773        }
3774
3775        if let Some(key) = lowest_key {
3776            let _ = self.remove_compact_entry(&key);
3777        }
3778    }
3779
3780    fn remove_compact_entry(&mut self, key: &CompactKey) -> Option<KeyEntry> {
3781        let removed = self.entries.remove(key)?;
3782        let removed_size = Self::estimate_key_entry_size(key.as_bytes(), &removed.value);
3783        self.memory_used = self.memory_used.saturating_sub(removed_size);
3784        Some(removed)
3785    }
3786
3787    fn estimate_key_entry_size(key: &[u8], value: &Value) -> usize {
3788        key.len()
3789            + std::mem::size_of::<CompactKey>()
3790            + std::mem::size_of::<KeyEntry>()
3791            + value.estimated_size()
3792    }
3793
3794    fn get_or_create_list(
3795        &mut self,
3796        key: &CompactKey,
3797    ) -> Result<&mut VecDeque<Value>, CommandResponse> {
3798        if !self.entries.contains_key(key) {
3799            let entry = KeyEntry::new(key.clone(), Value::List(VecDeque::new()));
3800            self.entries.insert(key.clone(), entry);
3801        }
3802        let entry = self.entries.get_mut(key).ok_or_else(|| {
3803            CommandResponse::Error("ERR internal: key not found after insert".into())
3804        })?;
3805        match &mut entry.value {
3806            Value::List(deque) => Ok(deque),
3807            _ => Err(CommandResponse::Error(
3808                "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3809            )),
3810        }
3811    }
3812
3813    fn get_or_create_hash(
3814        &mut self,
3815        key: &CompactKey,
3816    ) -> Result<&mut HashMap<CompactKey, Value>, CommandResponse> {
3817        if !self.entries.contains_key(key) {
3818            let entry = KeyEntry::new(key.clone(), Value::Hash(HashMap::new()));
3819            self.entries.insert(key.clone(), entry);
3820        }
3821        let entry = self.entries.get_mut(key).ok_or_else(|| {
3822            CommandResponse::Error("ERR internal: key not found after insert".into())
3823        })?;
3824        match &mut entry.value {
3825            Value::Hash(map) => Ok(map),
3826            _ => Err(CommandResponse::Error(
3827                "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3828            )),
3829        }
3830    }
3831
3832    fn get_or_create_sorted_set(
3833        &mut self,
3834        key: &CompactKey,
3835    ) -> Result<&mut BTreeMap<Vec<u8>, f64>, CommandResponse> {
3836        if !self.entries.contains_key(key) {
3837            let entry = KeyEntry::new(key.clone(), Value::SortedSet(BTreeMap::new()));
3838            self.entries.insert(key.clone(), entry);
3839        }
3840        let entry = self.entries.get_mut(key).ok_or_else(|| {
3841            CommandResponse::Error("ERR internal: key not found after insert".into())
3842        })?;
3843        match &mut entry.value {
3844            Value::SortedSet(map) => Ok(map),
3845            _ => Err(CommandResponse::Error(
3846                "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3847            )),
3848        }
3849    }
3850
3851    fn get_or_create_set(
3852        &mut self,
3853        key: &CompactKey,
3854    ) -> Result<&mut HashSet<Value>, CommandResponse> {
3855        if !self.entries.contains_key(key) {
3856            let entry = KeyEntry::new(key.clone(), Value::Set(HashSet::new()));
3857            self.entries.insert(key.clone(), entry);
3858        }
3859        let entry = self.entries.get_mut(key).ok_or_else(|| {
3860            CommandResponse::Error("ERR internal: key not found after insert".into())
3861        })?;
3862        match &mut entry.value {
3863            Value::Set(set) => Ok(set),
3864            _ => Err(CommandResponse::Error(
3865                "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3866            )),
3867        }
3868    }
3869
3870    fn get_or_create_stream(
3871        &mut self,
3872        key: &CompactKey,
3873    ) -> Result<&mut StreamLog, CommandResponse> {
3874        if !self.entries.contains_key(key) {
3875            let log = StreamLog {
3876                entries: VecDeque::new(),
3877                last_id: StreamId { ms: 0, seq: 0 },
3878            };
3879            let entry = KeyEntry::new(key.clone(), Value::Stream(Box::new(log)));
3880            self.entries.insert(key.clone(), entry);
3881        }
3882        let entry = self.entries.get_mut(key).ok_or_else(|| {
3883            CommandResponse::Error("ERR internal: key not found after insert".into())
3884        })?;
3885        match &mut entry.value {
3886            Value::Stream(log) => Ok(log),
3887            _ => Err(CommandResponse::Error(
3888                "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3889            )),
3890        }
3891    }
3892
3893    fn cmd_xadd(
3894        &mut self,
3895        key: &[u8],
3896        id_arg: &[u8],
3897        fields: &[(Vec<u8>, Vec<u8>)],
3898        maxlen: Option<usize>,
3899    ) -> CommandResponse {
3900        self.lazy_expire(key);
3901        let compact = CompactKey::new(key);
3902        let is_new = !self.entries.contains_key(&compact);
3903
3904        let log = match self.get_or_create_stream(&compact) {
3905            Ok(log) => log,
3906            Err(e) => return e,
3907        };
3908
3909        let new_id = if id_arg == b"*" {
3910            let ms = std::time::SystemTime::now()
3911                .duration_since(std::time::UNIX_EPOCH)
3912                .map(|d| d.as_millis() as u64)
3913                .unwrap_or(0);
3914            if ms > log.last_id.ms {
3915                StreamId { ms, seq: 0 }
3916            } else {
3917                StreamId {
3918                    ms: log.last_id.ms,
3919                    seq: log.last_id.seq + 1,
3920                }
3921            }
3922        } else {
3923            match StreamId::parse(id_arg) {
3924                Some(parsed) => {
3925                    if parsed <= log.last_id {
3926                        return CommandResponse::Error(
3927                            "ERR The ID specified in XADD is equal or smaller than the target stream top item".into(),
3928                        );
3929                    }
3930                    parsed
3931                }
3932                None => {
3933                    return CommandResponse::Error(
3934                        "ERR Invalid stream ID specified as stream command argument".into(),
3935                    );
3936                }
3937            }
3938        };
3939
3940        let id_str = format!("{}", new_id).into_bytes();
3941        log.last_id = new_id.clone();
3942
3943        let mut memory_delta: usize = 0;
3944        if is_new {
3945            memory_delta +=
3946                key.len() + std::mem::size_of::<CompactKey>() + std::mem::size_of::<KeyEntry>();
3947        }
3948        memory_delta += std::mem::size_of::<StreamEntry>();
3949        for (k, v) in fields {
3950            memory_delta += k.len() + v.len();
3951        }
3952
3953        log.entries.push_back(StreamEntry {
3954            id: new_id,
3955            fields: fields.to_vec(),
3956        });
3957
3958        if let Some(max) = maxlen {
3959            while log.entries.len() > max {
3960                log.entries.pop_front();
3961            }
3962        }
3963
3964        self.memory_used += memory_delta;
3965        CommandResponse::BulkString(id_str)
3966    }
3967
3968    fn cmd_xlen(&mut self, key: &[u8]) -> CommandResponse {
3969        self.lazy_expire(key);
3970        let compact = CompactKey::new(key);
3971        match self.entries.get(&compact) {
3972            Some(entry) => match &entry.value {
3973                Value::Stream(log) => CommandResponse::Integer(log.entries.len() as i64),
3974                _ => CommandResponse::Error(
3975                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
3976                ),
3977            },
3978            None => CommandResponse::Integer(0),
3979        }
3980    }
3981
3982    fn parse_range_id(id_bytes: &[u8], is_start: bool) -> StreamId {
3983        if id_bytes == b"-" {
3984            StreamId::min_id()
3985        } else if id_bytes == b"+" {
3986            StreamId::max_id()
3987        } else {
3988            StreamId::parse(id_bytes).unwrap_or(if is_start {
3989                StreamId::min_id()
3990            } else {
3991                StreamId::max_id()
3992            })
3993        }
3994    }
3995
3996    fn format_stream_entry(entry: &StreamEntry) -> CommandResponse {
3997        let mut fields_resp: Vec<CommandResponse> = Vec::with_capacity(entry.fields.len() * 2);
3998        for (k, v) in &entry.fields {
3999            fields_resp.push(CommandResponse::BulkString(k.clone()));
4000            fields_resp.push(CommandResponse::BulkString(v.clone()));
4001        }
4002        CommandResponse::Array(vec![
4003            CommandResponse::BulkString(format!("{}", entry.id).into_bytes()),
4004            CommandResponse::Array(fields_resp),
4005        ])
4006    }
4007
4008    fn cmd_xrange(
4009        &mut self,
4010        key: &[u8],
4011        start: &[u8],
4012        end: &[u8],
4013        count: Option<usize>,
4014    ) -> CommandResponse {
4015        self.lazy_expire(key);
4016        let compact = CompactKey::new(key);
4017        match self.entries.get(&compact) {
4018            Some(entry) => match &entry.value {
4019                Value::Stream(log) => {
4020                    let start_id = Self::parse_range_id(start, true);
4021                    let end_id = Self::parse_range_id(end, false);
4022                    let limit = count.unwrap_or(usize::MAX);
4023
4024                    let results: Vec<CommandResponse> = log
4025                        .entries
4026                        .iter()
4027                        .filter(|e| e.id >= start_id && e.id <= end_id)
4028                        .take(limit)
4029                        .map(Self::format_stream_entry)
4030                        .collect();
4031
4032                    CommandResponse::Array(results)
4033                }
4034                _ => CommandResponse::Error(
4035                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
4036                ),
4037            },
4038            None => CommandResponse::Array(vec![]),
4039        }
4040    }
4041
4042    fn cmd_xrevrange(
4043        &mut self,
4044        key: &[u8],
4045        start: &[u8],
4046        end: &[u8],
4047        count: Option<usize>,
4048    ) -> CommandResponse {
4049        self.lazy_expire(key);
4050        let compact = CompactKey::new(key);
4051        match self.entries.get(&compact) {
4052            Some(entry) => match &entry.value {
4053                Value::Stream(log) => {
4054                    let start_id = Self::parse_range_id(start, false);
4055                    let end_id = Self::parse_range_id(end, true);
4056                    let limit = count.unwrap_or(usize::MAX);
4057
4058                    let results: Vec<CommandResponse> = log
4059                        .entries
4060                        .iter()
4061                        .rev()
4062                        .filter(|e| e.id >= end_id && e.id <= start_id)
4063                        .take(limit)
4064                        .map(Self::format_stream_entry)
4065                        .collect();
4066
4067                    CommandResponse::Array(results)
4068                }
4069                _ => CommandResponse::Error(
4070                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
4071                ),
4072            },
4073            None => CommandResponse::Array(vec![]),
4074        }
4075    }
4076
4077    fn cmd_xread(
4078        &mut self,
4079        keys: &[Vec<u8>],
4080        ids: &[Vec<u8>],
4081        count: Option<usize>,
4082    ) -> CommandResponse {
4083        if keys.len() != ids.len() {
4084            return CommandResponse::Error(
4085                "ERR Unbalanced XREAD list of streams: for each stream key an ID must be specified"
4086                    .into(),
4087            );
4088        }
4089
4090        let limit = count.unwrap_or(usize::MAX);
4091        let mut results: Vec<CommandResponse> = Vec::new();
4092
4093        for (key, id_bytes) in keys.iter().zip(ids.iter()) {
4094            self.lazy_expire(key);
4095            let compact = CompactKey::new(key);
4096            if let Some(entry) = self.entries.get(&compact) {
4097                if let Value::Stream(log) = &entry.value {
4098                    let after_id = if id_bytes == b"$" {
4099                        log.last_id.clone()
4100                    } else {
4101                        match StreamId::parse(id_bytes) {
4102                            Some(id) => id,
4103                            None => continue,
4104                        }
4105                    };
4106
4107                    let entries: Vec<CommandResponse> = log
4108                        .entries
4109                        .iter()
4110                        .filter(|e| e.id > after_id)
4111                        .take(limit)
4112                        .map(Self::format_stream_entry)
4113                        .collect();
4114
4115                    if !entries.is_empty() {
4116                        results.push(CommandResponse::Array(vec![
4117                            CommandResponse::BulkString(key.clone()),
4118                            CommandResponse::Array(entries),
4119                        ]));
4120                    }
4121                }
4122            }
4123        }
4124
4125        if results.is_empty() {
4126            CommandResponse::Nil
4127        } else {
4128            CommandResponse::Array(results)
4129        }
4130    }
4131
4132    fn cmd_xtrim(&mut self, key: &[u8], maxlen: usize) -> CommandResponse {
4133        self.lazy_expire(key);
4134        let compact = CompactKey::new(key);
4135        match self.entries.get_mut(&compact) {
4136            Some(entry) => match &mut entry.value {
4137                Value::Stream(log) => {
4138                    let mut trimmed = 0i64;
4139                    while log.entries.len() > maxlen {
4140                        log.entries.pop_front();
4141                        trimmed += 1;
4142                    }
4143                    CommandResponse::Integer(trimmed)
4144                }
4145                _ => CommandResponse::Error(
4146                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
4147                ),
4148            },
4149            None => CommandResponse::Integer(0),
4150        }
4151    }
4152
4153    fn cmd_xdel(&mut self, key: &[u8], ids: &[Vec<u8>]) -> CommandResponse {
4154        self.lazy_expire(key);
4155        let compact = CompactKey::new(key);
4156        match self.entries.get_mut(&compact) {
4157            Some(entry) => match &mut entry.value {
4158                Value::Stream(log) => {
4159                    let mut deleted = 0i64;
4160                    for id_bytes in ids {
4161                        if let Some(sid) = StreamId::parse(id_bytes) {
4162                            let before = log.entries.len();
4163                            log.entries.retain(|e| e.id != sid);
4164                            if log.entries.len() < before {
4165                                deleted += 1;
4166                            }
4167                        }
4168                    }
4169                    CommandResponse::Integer(deleted)
4170                }
4171                _ => CommandResponse::Error(
4172                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
4173                ),
4174            },
4175            None => CommandResponse::Integer(0),
4176        }
4177    }
4178
4179    fn cmd_xgroup_create(
4180        &mut self,
4181        key: &[u8],
4182        group: &str,
4183        id: &str,
4184        mkstream: bool,
4185    ) -> CommandResponse {
4186        self.lazy_expire(key);
4187        let compact = CompactKey::new(key);
4188
4189        if !self.entries.contains_key(&compact) {
4190            if !mkstream {
4191                return CommandResponse::Error(
4192                    "ERR The XGROUP subcommand requires the key to exist".into(),
4193                );
4194            }
4195            let log = StreamLog {
4196                entries: VecDeque::new(),
4197                last_id: StreamId { ms: 0, seq: 0 },
4198            };
4199            let entry = KeyEntry::new(compact.clone(), Value::Stream(Box::new(log)));
4200            self.entries.insert(compact.clone(), entry);
4201        }
4202
4203        match self.entries.get(&compact) {
4204            Some(entry) => match &entry.value {
4205                Value::Stream(log) => {
4206                    let last_delivered_id = if id == "$" {
4207                        log.last_id.clone()
4208                    } else if id == "0" || id == "0-0" {
4209                        StreamId::min_id()
4210                    } else {
4211                        StreamId::parse(id.as_bytes()).unwrap_or(StreamId::min_id())
4212                    };
4213                    let groups = self.stream_groups.entry(compact).or_default();
4214                    if groups.contains_key(group) {
4215                        return CommandResponse::Error(
4216                            "BUSYGROUP Consumer Group name already exists".into(),
4217                        );
4218                    }
4219                    groups.insert(
4220                        group.to_string(),
4221                        StreamConsumerGroup {
4222                            last_delivered_id,
4223                            pel: HashMap::new(),
4224                            consumers: HashMap::new(),
4225                        },
4226                    );
4227                    CommandResponse::Ok
4228                }
4229                _ => CommandResponse::Error(
4230                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
4231                ),
4232            },
4233            None => CommandResponse::Error("ERR internal error".into()),
4234        }
4235    }
4236
4237    fn cmd_xgroup_destroy(&mut self, key: &[u8], group: &str) -> CommandResponse {
4238        let compact = CompactKey::new(key);
4239        if let Some(groups) = self.stream_groups.get_mut(&compact) {
4240            if groups.remove(group).is_some() {
4241                return CommandResponse::Integer(1);
4242            }
4243        }
4244        CommandResponse::Integer(0)
4245    }
4246
4247    fn cmd_xgroup_delconsumer(
4248        &mut self,
4249        key: &[u8],
4250        group: &str,
4251        consumer: &str,
4252    ) -> CommandResponse {
4253        let compact = CompactKey::new(key);
4254        if let Some(groups) = self.stream_groups.get_mut(&compact) {
4255            if let Some(grp) = groups.get_mut(group) {
4256                if let Some(state) = grp.consumers.remove(consumer) {
4257                    let pending_count = state.pending.len() as i64;
4258                    for sid in &state.pending {
4259                        grp.pel.remove(sid);
4260                    }
4261                    return CommandResponse::Integer(pending_count);
4262                }
4263            }
4264        }
4265        CommandResponse::Integer(0)
4266    }
4267
4268    fn cmd_xreadgroup(
4269        &mut self,
4270        group: &str,
4271        consumer: &str,
4272        count: Option<usize>,
4273        keys: &[Vec<u8>],
4274        ids: &[Vec<u8>],
4275    ) -> CommandResponse {
4276        if keys.len() != ids.len() {
4277            return CommandResponse::Error(
4278                "ERR Unbalanced XREADGROUP list of streams: for each stream key an ID must be specified"
4279                    .into(),
4280            );
4281        }
4282
4283        let limit = count.unwrap_or(usize::MAX);
4284        let mut results: Vec<CommandResponse> = Vec::new();
4285        let now_ms = std::time::SystemTime::now()
4286            .duration_since(std::time::UNIX_EPOCH)
4287            .map(|d| d.as_millis() as u64)
4288            .unwrap_or(0);
4289
4290        for (key, id_bytes) in keys.iter().zip(ids.iter()) {
4291            self.lazy_expire(key);
4292            let compact = CompactKey::new(key);
4293
4294            let is_new_msgs = id_bytes == b">";
4295
4296            let stream_entries: Vec<StreamEntry> = if let Some(entry) = self.entries.get(&compact) {
4297                if let Value::Stream(log) = &entry.value {
4298                    if is_new_msgs {
4299                        let last_delivered = self
4300                            .stream_groups
4301                            .get(&compact)
4302                            .and_then(|gs| gs.get(group))
4303                            .map(|g| g.last_delivered_id.clone())
4304                            .unwrap_or(StreamId::min_id());
4305                        log.entries
4306                            .iter()
4307                            .filter(|e| e.id > last_delivered)
4308                            .take(limit)
4309                            .cloned()
4310                            .collect()
4311                    } else {
4312                        let groups = self.stream_groups.get(&compact);
4313                        let grp = groups.and_then(|gs| gs.get(group));
4314                        if let Some(grp) = grp {
4315                            let consumer_state = grp.consumers.get(consumer);
4316                            let pending_ids: Vec<StreamId> = consumer_state
4317                                .map(|cs| {
4318                                    let mut ids: Vec<StreamId> =
4319                                        cs.pending.iter().cloned().collect();
4320                                    ids.sort();
4321                                    ids
4322                                })
4323                                .unwrap_or_default();
4324                            pending_ids
4325                                .into_iter()
4326                                .take(limit)
4327                                .filter_map(|sid| log.entries.iter().find(|e| e.id == sid).cloned())
4328                                .collect()
4329                        } else {
4330                            Vec::new()
4331                        }
4332                    }
4333                } else {
4334                    continue;
4335                }
4336            } else {
4337                continue;
4338            };
4339
4340            if is_new_msgs && !stream_entries.is_empty() {
4341                let groups = self.stream_groups.entry(compact.clone()).or_default();
4342                let grp = groups
4343                    .entry(group.to_string())
4344                    .or_insert_with(|| StreamConsumerGroup {
4345                        last_delivered_id: StreamId::min_id(),
4346                        pel: HashMap::new(),
4347                        consumers: HashMap::new(),
4348                    });
4349                let cs = grp.consumers.entry(consumer.to_string()).or_default();
4350                for se in &stream_entries {
4351                    if se.id > grp.last_delivered_id {
4352                        grp.last_delivered_id = se.id.clone();
4353                    }
4354                    grp.pel.insert(
4355                        se.id.clone(),
4356                        PendingEntry {
4357                            consumer: consumer.to_string(),
4358                            delivery_time: now_ms,
4359                            delivery_count: 1,
4360                        },
4361                    );
4362                    cs.pending.insert(se.id.clone());
4363                }
4364            }
4365
4366            if !stream_entries.is_empty() {
4367                let entries_resp: Vec<CommandResponse> = stream_entries
4368                    .iter()
4369                    .map(Self::format_stream_entry)
4370                    .collect();
4371                results.push(CommandResponse::Array(vec![
4372                    CommandResponse::BulkString(key.clone()),
4373                    CommandResponse::Array(entries_resp),
4374                ]));
4375            }
4376        }
4377
4378        if results.is_empty() {
4379            CommandResponse::Nil
4380        } else {
4381            CommandResponse::Array(results)
4382        }
4383    }
4384
4385    fn cmd_xack(&mut self, key: &[u8], group: &str, ids: &[Vec<u8>]) -> CommandResponse {
4386        let compact = CompactKey::new(key);
4387        let mut acked = 0i64;
4388        if let Some(groups) = self.stream_groups.get_mut(&compact) {
4389            if let Some(grp) = groups.get_mut(group) {
4390                for id_bytes in ids {
4391                    if let Some(sid) = StreamId::parse(id_bytes) {
4392                        if let Some(pe) = grp.pel.remove(&sid) {
4393                            if let Some(cs) = grp.consumers.get_mut(&pe.consumer) {
4394                                cs.pending.remove(&sid);
4395                            }
4396                            acked += 1;
4397                        }
4398                    }
4399                }
4400            }
4401        }
4402        CommandResponse::Integer(acked)
4403    }
4404
4405    fn cmd_xpending(
4406        &mut self,
4407        key: &[u8],
4408        group: &str,
4409        start: Option<&[u8]>,
4410        end: Option<&[u8]>,
4411        count: Option<usize>,
4412    ) -> CommandResponse {
4413        let compact = CompactKey::new(key);
4414        let groups = self.stream_groups.get(&compact);
4415        let grp = match groups.and_then(|gs| gs.get(group)) {
4416            Some(g) => g,
4417            None => {
4418                return CommandResponse::Error("NOGROUP No such consumer group for key".into());
4419            }
4420        };
4421
4422        if let (Some(s), Some(e), Some(c)) = (start, end, count) {
4423            let start_id = Self::parse_range_id(s, true);
4424            let end_id = Self::parse_range_id(e, false);
4425            let limit = c;
4426
4427            let mut entries: Vec<(&StreamId, &PendingEntry)> = grp
4428                .pel
4429                .iter()
4430                .filter(|(sid, _)| **sid >= start_id && **sid <= end_id)
4431                .collect();
4432            entries.sort_by_key(|(sid, _)| (*sid).clone());
4433            entries.truncate(limit);
4434
4435            let results: Vec<CommandResponse> = entries
4436                .into_iter()
4437                .map(|(sid, pe)| {
4438                    CommandResponse::Array(vec![
4439                        CommandResponse::BulkString(format!("{}", sid).into_bytes()),
4440                        CommandResponse::BulkString(pe.consumer.as_bytes().to_vec()),
4441                        CommandResponse::Integer(
4442                            std::time::SystemTime::now()
4443                                .duration_since(std::time::UNIX_EPOCH)
4444                                .map(|d| d.as_millis() as u64)
4445                                .unwrap_or(0)
4446                                .saturating_sub(pe.delivery_time)
4447                                as i64,
4448                        ),
4449                        CommandResponse::Integer(pe.delivery_count as i64),
4450                    ])
4451                })
4452                .collect();
4453
4454            return CommandResponse::Array(results);
4455        }
4456
4457        let total_pending = grp.pel.len() as i64;
4458        let min_id = grp
4459            .pel
4460            .keys()
4461            .min()
4462            .map(|id| format!("{}", id))
4463            .unwrap_or_default();
4464        let max_id = grp
4465            .pel
4466            .keys()
4467            .max()
4468            .map(|id| format!("{}", id))
4469            .unwrap_or_default();
4470
4471        let mut consumer_counts: HashMap<&str, i64> = HashMap::new();
4472        for pe in grp.pel.values() {
4473            *consumer_counts.entry(&pe.consumer).or_insert(0) += 1;
4474        }
4475        let consumers_resp: Vec<CommandResponse> = consumer_counts
4476            .into_iter()
4477            .map(|(name, count)| {
4478                CommandResponse::Array(vec![
4479                    CommandResponse::BulkString(name.as_bytes().to_vec()),
4480                    CommandResponse::BulkString(count.to_string().into_bytes()),
4481                ])
4482            })
4483            .collect();
4484
4485        CommandResponse::Array(vec![
4486            CommandResponse::Integer(total_pending),
4487            CommandResponse::BulkString(min_id.into_bytes()),
4488            CommandResponse::BulkString(max_id.into_bytes()),
4489            CommandResponse::Array(consumers_resp),
4490        ])
4491    }
4492
4493    fn cmd_xclaim(
4494        &mut self,
4495        key: &[u8],
4496        group: &str,
4497        consumer: &str,
4498        min_idle_time: u64,
4499        ids: &[Vec<u8>],
4500    ) -> CommandResponse {
4501        let compact = CompactKey::new(key);
4502        let now_ms = std::time::SystemTime::now()
4503            .duration_since(std::time::UNIX_EPOCH)
4504            .map(|d| d.as_millis() as u64)
4505            .unwrap_or(0);
4506
4507        let stream_ok = self
4508            .entries
4509            .get(&compact)
4510            .map(|e| matches!(&e.value, Value::Stream(_)))
4511            .unwrap_or(false);
4512
4513        if !stream_ok {
4514            return CommandResponse::Array(vec![]);
4515        }
4516
4517        let groups = match self.stream_groups.get_mut(&compact) {
4518            Some(g) => g,
4519            None => return CommandResponse::Array(vec![]),
4520        };
4521        let grp = match groups.get_mut(group) {
4522            Some(g) => g,
4523            None => return CommandResponse::Array(vec![]),
4524        };
4525
4526        let mut claimed_ids: Vec<StreamId> = Vec::new();
4527
4528        for id_bytes in ids {
4529            if let Some(sid) = StreamId::parse(id_bytes) {
4530                if let Some(pe) = grp.pel.get_mut(&sid) {
4531                    let idle = now_ms.saturating_sub(pe.delivery_time);
4532                    if idle >= min_idle_time {
4533                        let old_consumer = pe.consumer.clone();
4534                        pe.consumer = consumer.to_string();
4535                        pe.delivery_time = now_ms;
4536                        pe.delivery_count += 1;
4537
4538                        if old_consumer != consumer {
4539                            if let Some(cs) = grp.consumers.get_mut(&old_consumer) {
4540                                cs.pending.remove(&sid);
4541                            }
4542                        }
4543                        grp.consumers
4544                            .entry(consumer.to_string())
4545                            .or_default()
4546                            .pending
4547                            .insert(sid.clone());
4548                        claimed_ids.push(sid);
4549                    }
4550                }
4551            }
4552        }
4553
4554        let entry_map = self.entries.get(&compact);
4555        let results: Vec<CommandResponse> = if let Some(entry) = entry_map {
4556            if let Value::Stream(log) = &entry.value {
4557                claimed_ids
4558                    .iter()
4559                    .filter_map(|sid| {
4560                        log.entries
4561                            .iter()
4562                            .find(|e| e.id == *sid)
4563                            .map(Self::format_stream_entry)
4564                    })
4565                    .collect()
4566            } else {
4567                vec![]
4568            }
4569        } else {
4570            vec![]
4571        };
4572
4573        CommandResponse::Array(results)
4574    }
4575
4576    fn cmd_xautoclaim(
4577        &mut self,
4578        key: &[u8],
4579        group: &str,
4580        consumer: &str,
4581        min_idle_time: u64,
4582        start: &[u8],
4583        count: Option<usize>,
4584    ) -> CommandResponse {
4585        let compact = CompactKey::new(key);
4586        let now_ms = std::time::SystemTime::now()
4587            .duration_since(std::time::UNIX_EPOCH)
4588            .map(|d| d.as_millis() as u64)
4589            .unwrap_or(0);
4590        let start_id = Self::parse_range_id(start, true);
4591        let limit = count.unwrap_or(100);
4592
4593        let stream_ok = self
4594            .entries
4595            .get(&compact)
4596            .map(|e| matches!(&e.value, Value::Stream(_)))
4597            .unwrap_or(false);
4598
4599        if !stream_ok {
4600            return CommandResponse::Array(vec![
4601                CommandResponse::BulkString(b"0-0".to_vec()),
4602                CommandResponse::Array(vec![]),
4603                CommandResponse::Array(vec![]),
4604            ]);
4605        }
4606
4607        let groups = match self.stream_groups.get_mut(&compact) {
4608            Some(g) => g,
4609            None => {
4610                return CommandResponse::Array(vec![
4611                    CommandResponse::BulkString(b"0-0".to_vec()),
4612                    CommandResponse::Array(vec![]),
4613                    CommandResponse::Array(vec![]),
4614                ])
4615            }
4616        };
4617        let grp = match groups.get_mut(group) {
4618            Some(g) => g,
4619            None => {
4620                return CommandResponse::Array(vec![
4621                    CommandResponse::BulkString(b"0-0".to_vec()),
4622                    CommandResponse::Array(vec![]),
4623                    CommandResponse::Array(vec![]),
4624                ])
4625            }
4626        };
4627
4628        let mut eligible: Vec<StreamId> = grp
4629            .pel
4630            .iter()
4631            .filter(|(sid, pe)| {
4632                **sid >= start_id && now_ms.saturating_sub(pe.delivery_time) >= min_idle_time
4633            })
4634            .map(|(sid, _)| sid.clone())
4635            .collect();
4636        eligible.sort();
4637        eligible.truncate(limit);
4638
4639        let next_start = eligible
4640            .last()
4641            .map(|sid| StreamId {
4642                ms: sid.ms,
4643                seq: sid.seq + 1,
4644            })
4645            .unwrap_or(StreamId::min_id());
4646
4647        let mut claimed_ids: Vec<StreamId> = Vec::new();
4648        for sid in &eligible {
4649            if let Some(pe) = grp.pel.get_mut(sid) {
4650                let old_consumer = pe.consumer.clone();
4651                pe.consumer = consumer.to_string();
4652                pe.delivery_time = now_ms;
4653                pe.delivery_count += 1;
4654                if old_consumer != consumer {
4655                    if let Some(cs) = grp.consumers.get_mut(&old_consumer) {
4656                        cs.pending.remove(sid);
4657                    }
4658                }
4659                grp.consumers
4660                    .entry(consumer.to_string())
4661                    .or_default()
4662                    .pending
4663                    .insert(sid.clone());
4664                claimed_ids.push(sid.clone());
4665            }
4666        }
4667
4668        let entry_map = self.entries.get(&compact);
4669        let entries_resp: Vec<CommandResponse> = if let Some(entry) = entry_map {
4670            if let Value::Stream(log) = &entry.value {
4671                claimed_ids
4672                    .iter()
4673                    .filter_map(|sid| {
4674                        log.entries
4675                            .iter()
4676                            .find(|e| e.id == *sid)
4677                            .map(Self::format_stream_entry)
4678                    })
4679                    .collect()
4680            } else {
4681                vec![]
4682            }
4683        } else {
4684            vec![]
4685        };
4686
4687        CommandResponse::Array(vec![
4688            CommandResponse::BulkString(format!("{}", next_start).into_bytes()),
4689            CommandResponse::Array(entries_resp),
4690            CommandResponse::Array(vec![]),
4691        ])
4692    }
4693
4694    fn cmd_xinfo_stream(&mut self, key: &[u8]) -> CommandResponse {
4695        self.lazy_expire(key);
4696        let compact = CompactKey::new(key);
4697        match self.entries.get(&compact) {
4698            Some(entry) => match &entry.value {
4699                Value::Stream(log) => {
4700                    let length = log.entries.len() as i64;
4701                    let groups_count = self
4702                        .stream_groups
4703                        .get(&compact)
4704                        .map(|gs| gs.len() as i64)
4705                        .unwrap_or(0);
4706
4707                    CommandResponse::Array(vec![
4708                        CommandResponse::BulkString(b"length".to_vec()),
4709                        CommandResponse::Integer(length),
4710                        CommandResponse::BulkString(b"first-entry".to_vec()),
4711                        if let Some(first) = log.entries.front() {
4712                            Self::format_stream_entry(first)
4713                        } else {
4714                            CommandResponse::Nil
4715                        },
4716                        CommandResponse::BulkString(b"last-entry".to_vec()),
4717                        if let Some(last) = log.entries.back() {
4718                            Self::format_stream_entry(last)
4719                        } else {
4720                            CommandResponse::Nil
4721                        },
4722                        CommandResponse::BulkString(b"last-generated-id".to_vec()),
4723                        CommandResponse::BulkString(format!("{}", log.last_id).into_bytes()),
4724                        CommandResponse::BulkString(b"groups".to_vec()),
4725                        CommandResponse::Integer(groups_count),
4726                    ])
4727                }
4728                _ => CommandResponse::Error(
4729                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
4730                ),
4731            },
4732            None => CommandResponse::Error("ERR no such key".into()),
4733        }
4734    }
4735
4736    fn cmd_xinfo_groups(&mut self, key: &[u8]) -> CommandResponse {
4737        self.lazy_expire(key);
4738        let compact = CompactKey::new(key);
4739        match self.entries.get(&compact) {
4740            Some(entry) => match &entry.value {
4741                Value::Stream(_) => {
4742                    let groups = self.stream_groups.get(&compact);
4743                    let results: Vec<CommandResponse> = groups
4744                        .map(|gs| {
4745                            gs.iter()
4746                                .map(|(name, grp)| {
4747                                    CommandResponse::Array(vec![
4748                                        CommandResponse::BulkString(b"name".to_vec()),
4749                                        CommandResponse::BulkString(name.as_bytes().to_vec()),
4750                                        CommandResponse::BulkString(b"consumers".to_vec()),
4751                                        CommandResponse::Integer(grp.consumers.len() as i64),
4752                                        CommandResponse::BulkString(b"pending".to_vec()),
4753                                        CommandResponse::Integer(grp.pel.len() as i64),
4754                                        CommandResponse::BulkString(b"last-delivered-id".to_vec()),
4755                                        CommandResponse::BulkString(
4756                                            format!("{}", grp.last_delivered_id).into_bytes(),
4757                                        ),
4758                                    ])
4759                                })
4760                                .collect()
4761                        })
4762                        .unwrap_or_default();
4763                    CommandResponse::Array(results)
4764                }
4765                _ => CommandResponse::Error(
4766                    "WRONGTYPE Operation against a key holding the wrong kind of value".into(),
4767                ),
4768            },
4769            None => CommandResponse::Error("ERR no such key".into()),
4770        }
4771    }
4772}
4773
/// Format a float with the minimum precision needed for round-trip fidelity.
///
/// Integral values below 1e17 in magnitude are printed without a fractional
/// part (`3.0` becomes `"3"`). Other values are printed with the smallest
/// number of decimal places (0 through 17) whose output parses back to exactly
/// `f`. Values that need more than 17 decimal places (for example `1e-20`),
/// and NaN, fall back to the standard `Display` formatting, which also
/// round-trips, instead of being truncated to 17 places.
fn format_float(f: f64) -> String {
    if f.fract() == 0.0 && f.abs() < 1e17 {
        let i = f as i64;
        if (i as f64) == f {
            return i.to_string();
        }
    }
    for precision in 0..=17 {
        let s = format!("{f:.precision$}");
        if s.parse::<f64>().ok() == Some(f) {
            return s;
        }
    }
    // No fixed precision up to 17 places reproduces the value; a truncated
    // rendering would silently change it (tiny values would collapse to 0).
    format!("{f}")
}
4790
/// Resolves a possibly negative index against a collection of length `len`.
///
/// Negative indices count back from the end (`-1` is the last element) and
/// are clamped at 0 when they reach before the start. Non-negative indices
/// are returned unchanged, even when they are past `len`.
fn normalize_index(index: i64, len: i64) -> usize {
    let resolved = if index < 0 {
        (len + index).max(0)
    } else {
        index
    };
    resolved as usize
}
4803
/// Simple glob pattern matching (supports * and ?).
///
/// `*` matches any run of bytes (including none) and `?` matches exactly one
/// byte; every other pattern byte must match literally. The key is compared
/// byte-wise, so keys that are not valid UTF-8 can be matched as well.
fn glob_match(pattern: &str, key: &[u8]) -> bool {
    // Fast path: a lone `*` matches every key.
    if pattern == "*" {
        return true;
    }
    glob_match_iterative(pattern.as_bytes(), key)
}

/// Iterative glob matching — O(n*m) worst case, no exponential blowup.
///
/// Walks `text` once, remembering the most recent `*` so that a mismatch can
/// be retried with that `*` absorbing one more byte.
fn glob_match_iterative(pattern: &[u8], text: &[u8]) -> bool {
    let mut pi = 0; // pattern index
    let mut ti = 0; // text index
    // (pattern index of the last `*`, text index that `*` currently extends to)
    let mut star: Option<(usize, usize)> = None;

    while ti < text.len() {
        match pattern.get(pi) {
            // The wildcard must be tested before literal equality: otherwise a
            // `*` in the pattern facing a `*` byte in the text is consumed as
            // a literal and can never backtrack.
            Some(b'*') => {
                star = Some((pi, ti));
                pi += 1; // try matching * with the empty string first
            }
            Some(&p) if p == b'?' || p == text[ti] => {
                pi += 1;
                ti += 1;
            }
            _ => match star {
                // Backtrack: let the last * match one more byte.
                Some((star_pi, star_ti)) => {
                    pi = star_pi + 1;
                    ti = star_ti + 1;
                    star = Some((star_pi, ti));
                }
                None => return false,
            },
        }
    }

    // Only trailing *'s may remain in the pattern.
    pattern[pi..].iter().all(|&b| b == b'*')
}
4848
/// Tests `member` against the lower bound `min` of a lexicographic range.
///
/// The first byte of `min` selects the rule: `-` accepts everything, `+`
/// rejects everything, `[` is an inclusive bound and `(` an exclusive bound
/// on the remaining bytes. An empty bound or any other prefix accepts the
/// member.
fn lex_gte_min(member: &[u8], min: &[u8]) -> bool {
    match min.split_first() {
        Some((&b'+', _)) => false,
        Some((&b'[', bound)) => member >= bound,
        Some((&b'(', bound)) => member > bound,
        // Empty bound, `-`, or an unrecognised prefix: no lower limit.
        _ => true,
    }
}
4861
/// Tests `member` against the upper bound `max` of a lexicographic range.
///
/// The first byte of `max` selects the rule: `+` accepts everything, `-`
/// rejects everything, `[` is an inclusive bound and `(` an exclusive bound
/// on the remaining bytes. An empty bound or any other prefix accepts the
/// member.
fn lex_lte_max(member: &[u8], max: &[u8]) -> bool {
    match max.split_first() {
        Some((&b'-', _)) => false,
        Some((&b'[', bound)) => member <= bound,
        Some((&b'(', bound)) => member < bound,
        // Empty bound, `+`, or an unrecognised prefix: no upper limit.
        _ => true,
    }
}
4874
4875fn lex_in_range(member: &[u8], min: &[u8], max: &[u8]) -> bool {
4876    lex_gte_min(member, min) && lex_lte_max(member, max)
4877}
4878
4879// ─── HyperLogLog algorithm ──────────────────────────────────────────
4880
/// Number of low hash bits used to select a register (2^14 = 16384 registers).
const HLL_P: usize = 14;
/// Total number of HyperLogLog registers.
const HLL_REGISTERS: usize = 1 << HLL_P;
/// Size in bytes of the packed register array, at 6 bits per register.
const HLL_REGISTER_BYTES: usize = HLL_REGISTERS * 6 / 8;
4884
4885fn hll_hash(element: &[u8]) -> u64 {
4886    use std::hash::{Hash, Hasher};
4887    let mut hasher = ahash::AHasher::default();
4888    element.hash(&mut hasher);
4889    hasher.finish()
4890}
4891
/// Reads the 6-bit register at `index` from the packed register array.
///
/// Registers are stored back to back, least-significant bits first, so a
/// register either sits inside one byte or straddles two adjacent bytes. A
/// missing second byte is treated as zero.
///
/// # Panics
/// Panics if the byte holding the register's first bit is out of bounds.
fn hll_register_get(registers: &[u8], index: usize) -> u8 {
    let first_bit = index * 6;
    let (byte_idx, shift) = (first_bit / 8, first_bit % 8);

    // Assemble a 16-bit little-endian window starting at the register's first
    // byte, then slide the register down to bit 0.
    let low = u16::from(registers[byte_idx]);
    let high = u16::from(registers.get(byte_idx + 1).copied().unwrap_or(0));
    let window = low | (high << 8);

    ((window >> shift) & 0x3F) as u8
}
4909
/// Writes the low 6 bits of `value` into the register at `index`.
///
/// Registers are packed back to back, least-significant bits first. When a
/// register straddles two bytes and the second byte does not exist, only the
/// part that fits in the first byte is written.
///
/// # Panics
/// Panics if the byte holding the register's first bit is out of bounds.
fn hll_register_set(registers: &mut [u8], index: usize, value: u8) {
    let first_bit = index * 6;
    let (byte_idx, shift) = (first_bit / 8, first_bit % 8);

    // Work on a 16-bit little-endian window covering both candidate bytes.
    let low = registers[byte_idx];
    let high = registers.get(byte_idx + 1).copied();
    let window = u16::from(low) | (u16::from(high.unwrap_or(0)) << 8);

    let cleared = window & !(0x3F_u16 << shift);
    let updated = cleared | (u16::from(value & 0x3F) << shift);
    let [new_low, new_high] = updated.to_le_bytes();

    registers[byte_idx] = new_low;
    // The second byte is only touched when the register actually spills into
    // it (shift of 4 or 6) and that byte exists.
    if shift > 2 && high.is_some() {
        registers[byte_idx + 1] = new_high;
    }
}
4925
4926fn hll_add(registers: &mut [u8], element: &[u8]) -> bool {
4927    let hash = hll_hash(element);
4928    let index = (hash & ((1 << HLL_P) - 1)) as usize;
4929    let remaining = hash >> HLL_P;
4930    let rank = if remaining == 0 {
4931        (64 - HLL_P) as u8 + 1
4932    } else {
4933        (remaining.trailing_zeros() + 1) as u8
4934    };
4935
4936    let current = hll_register_get(registers, index);
4937    if rank > current {
4938        hll_register_set(registers, index, rank);
4939        true
4940    } else {
4941        false
4942    }
4943}
4944
4945fn hll_count(registers: &[u8]) -> u64 {
4946    let m = HLL_REGISTERS as f64;
4947    let alpha = match HLL_REGISTERS {
4948        16 => 0.673,
4949        32 => 0.697,
4950        64 => 0.709,
4951        _ => 0.7213 / (1.0 + 1.079 / m),
4952    };
4953
4954    let mut sum = 0.0f64;
4955    let mut zeros = 0u32;
4956
4957    for i in 0..HLL_REGISTERS {
4958        let val = hll_register_get(registers, i);
4959        sum += 1.0 / (1u64 << val) as f64;
4960        if val == 0 {
4961            zeros += 1;
4962        }
4963    }
4964
4965    let mut estimate = alpha * m * m / sum;
4966
4967    if estimate <= 2.5 * m && zeros > 0 {
4968        estimate = m * (m / zeros as f64).ln();
4969    }
4970
4971    estimate.round() as u64
4972}
4973
4974fn hll_merge(dest: &mut [u8], src: &[u8]) {
4975    for i in 0..HLL_REGISTERS {
4976        let src_val = hll_register_get(src, i);
4977        let dst_val = hll_register_get(dest, i);
4978        if src_val > dst_val {
4979            hll_register_set(dest, i, src_val);
4980        }
4981    }
4982}
4983
4984// ─── Geospatial helpers ──────────────────────────────────────────────
4985
/// Number of interleaved longitude/latitude bits in the hash built by
/// `geo_encode` and consumed by `geo_decode`.
const GEO_HASH_BITS: u32 = 52;
4987
4988fn geo_encode(lon: f64, lat: f64) -> f64 {
4989    let lon_norm = (lon + 180.0) / 360.0;
4990    let lat_norm = (lat + 90.0) / 180.0;
4991
4992    let mut hash: u64 = 0;
4993    let mut lon_min = 0.0f64;
4994    let mut lon_max = 1.0f64;
4995    let mut lat_min = 0.0f64;
4996    let mut lat_max = 1.0f64;
4997
4998    for i in 0..GEO_HASH_BITS {
4999        if i % 2 == 0 {
5000            let mid = (lon_min + lon_max) / 2.0;
5001            if lon_norm >= mid {
5002                hash |= 1 << (GEO_HASH_BITS - 1 - i);
5003                lon_min = mid;
5004            } else {
5005                lon_max = mid;
5006            }
5007        } else {
5008            let mid = (lat_min + lat_max) / 2.0;
5009            if lat_norm >= mid {
5010                hash |= 1 << (GEO_HASH_BITS - 1 - i);
5011                lat_min = mid;
5012            } else {
5013                lat_max = mid;
5014            }
5015        }
5016    }
5017
5018    f64::from_bits(hash)
5019}
5020
5021fn geo_decode(score: f64) -> (f64, f64) {
5022    let hash = score.to_bits();
5023
5024    let mut lon_min = 0.0f64;
5025    let mut lon_max = 1.0f64;
5026    let mut lat_min = 0.0f64;
5027    let mut lat_max = 1.0f64;
5028
5029    for i in 0..GEO_HASH_BITS {
5030        let bit = (hash >> (GEO_HASH_BITS - 1 - i)) & 1;
5031        if i % 2 == 0 {
5032            let mid = (lon_min + lon_max) / 2.0;
5033            if bit == 1 {
5034                lon_min = mid;
5035            } else {
5036                lon_max = mid;
5037            }
5038        } else {
5039            let mid = (lat_min + lat_max) / 2.0;
5040            if bit == 1 {
5041                lat_min = mid;
5042            } else {
5043                lat_max = mid;
5044            }
5045        }
5046    }
5047
5048    let lon = (lon_min + lon_max) / 2.0 * 360.0 - 180.0;
5049    let lat = (lat_min + lat_max) / 2.0 * 180.0 - 90.0;
5050    (lon, lat)
5051}
5052
/// Geohash base-32 alphabet; the index of each byte is its 5-bit value.
const BASE32_CHARS: &[u8] = b"0123456789bcdefghjkmnpqrstuvwxyz";

/// Encodes a longitude/latitude pair as an 11-character base-32 geohash.
///
/// Both coordinates are normalised to `[0, 1]` and bisected alternately,
/// longitude first, for 55 steps. Every five consecutive bits (most
/// significant first) select one character from `BASE32_CHARS`.
fn geo_encode_base32(lon: f64, lat: f64) -> String {
    // Index 0 is longitude, index 1 is latitude.
    let targets = [(lon + 180.0) / 360.0, (lat + 90.0) / 180.0];
    let mut ranges = [(0.0f64, 1.0f64); 2];

    let mut result = String::with_capacity(11);
    let mut symbol = 0usize;

    for i in 0..55usize {
        let axis = i % 2;
        let (low, high) = ranges[axis];
        let mid = (low + high) / 2.0;

        symbol <<= 1;
        if targets[axis] >= mid {
            symbol |= 1;
            ranges[axis].0 = mid;
        } else {
            ranges[axis].1 = mid;
        }

        // Emit one character for every completed group of five bits.
        if i % 5 == 4 {
            result.push(BASE32_CHARS[symbol] as char);
            symbol = 0;
        }
    }
    result
}
5099
/// Great-circle distance in metres between two points given in degrees.
///
/// Uses the haversine formula on a sphere of radius 6372797.560856 m.
/// Arguments are latitude then longitude for each point.
fn haversine_distance(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
    const EARTH_RADIUS: f64 = 6372797.560856;

    let lat1_rad = lat1.to_radians();
    let lat2_rad = lat2.to_radians();
    let dlat = (lat2 - lat1).to_radians();
    let dlon = (lon2 - lon1).to_radians();

    let a =
        (dlat / 2.0).sin().powi(2) + lat1_rad.cos() * lat2_rad.cos() * (dlon / 2.0).sin().powi(2);
    // Rounding can push `a` marginally outside [0, 1] for (near-)antipodal
    // points, which would make `asin` return NaN. `clamp` keeps the value in
    // the valid domain while still propagating a NaN input.
    let c = 2.0 * a.clamp(0.0, 1.0).sqrt().asin();

    EARTH_RADIUS * c
}
5114
5115fn geo_convert_distance(meters: f64, unit: GeoUnit) -> f64 {
5116    match unit {
5117        GeoUnit::Meters => meters,
5118        GeoUnit::Kilometers => meters / 1000.0,
5119        GeoUnit::Feet => meters * 3.28084,
5120        GeoUnit::Miles => meters / 1609.344,
5121    }
5122}
5123
5124fn geo_to_meters(value: f64, unit: GeoUnit) -> f64 {
5125    match unit {
5126        GeoUnit::Meters => value,
5127        GeoUnit::Kilometers => value * 1000.0,
5128        GeoUnit::Feet => value / 3.28084,
5129        GeoUnit::Miles => value * 1609.344,
5130    }
5131}
5132
5133#[cfg(test)]
5134mod tests {
5135    use super::*;
5136
5137    #[test]
5138    fn test_set_and_get() {
5139        let mut store = ShardStore::new(0);
5140        store.execute(Command::Set {
5141            key: b"key1".to_vec(),
5142            value: b"value1".to_vec(),
5143            ex: None,
5144            px: None,
5145            nx: false,
5146            xx: false,
5147        });
5148        match store.execute(Command::Get {
5149            key: b"key1".to_vec(),
5150        }) {
5151            CommandResponse::BulkString(v) => assert_eq!(v, b"value1"),
5152            other => panic!("Expected BulkString, got {:?}", other),
5153        }
5154    }
5155
5156    #[test]
5157    fn test_get_large_value_uses_shared_bulk_string() {
5158        let mut store = ShardStore::new(0);
5159        let value = vec![b'x'; 256];
5160        store.execute(Command::Set {
5161            key: b"large".to_vec(),
5162            value: value.clone(),
5163            ex: None,
5164            px: None,
5165            nx: false,
5166            xx: false,
5167        });
5168
5169        match store.execute(Command::Get {
5170            key: b"large".to_vec(),
5171        }) {
5172            CommandResponse::BulkStringShared(bytes) => assert_eq!(bytes.as_ref(), value),
5173            other => panic!("Expected BulkStringShared, got {:?}", other),
5174        }
5175    }
5176
5177    #[test]
5178    fn test_get_nonexistent() {
5179        let mut store = ShardStore::new(0);
5180        assert!(matches!(
5181            store.execute(Command::Get {
5182                key: b"nope".to_vec()
5183            }),
5184            CommandResponse::Nil
5185        ));
5186    }
5187
5188    #[test]
5189    fn test_set_nx() {
5190        let mut store = ShardStore::new(0);
5191        store.execute(Command::Set {
5192            key: b"k".to_vec(),
5193            value: b"v1".to_vec(),
5194            ex: None,
5195            px: None,
5196            nx: false,
5197            xx: false,
5198        });
5199        // NX should fail if key exists
5200        let resp = store.execute(Command::SetNx {
5201            key: b"k".to_vec(),
5202            value: b"v2".to_vec(),
5203        });
5204        assert!(matches!(resp, CommandResponse::Integer(0)));
5205        // Original value unchanged
5206        match store.execute(Command::Get { key: b"k".to_vec() }) {
5207            CommandResponse::BulkString(v) => assert_eq!(v, b"v1"),
5208            other => panic!("Expected v1, got {:?}", other),
5209        }
5210    }
5211
5212    #[test]
5213    fn test_set_xx_treats_expired_key_as_missing() {
5214        let mut store = ShardStore::new(0);
5215        store.execute(Command::Set {
5216            key: b"k".to_vec(),
5217            value: b"stale".to_vec(),
5218            ex: None,
5219            px: Some(1),
5220            nx: false,
5221            xx: false,
5222        });
5223        std::thread::sleep(Duration::from_millis(5));
5224
5225        let resp = store.execute(Command::Set {
5226            key: b"k".to_vec(),
5227            value: b"fresh".to_vec(),
5228            ex: None,
5229            px: None,
5230            nx: false,
5231            xx: true,
5232        });
5233        assert!(matches!(resp, CommandResponse::Nil));
5234        assert!(matches!(
5235            store.execute(Command::Get { key: b"k".to_vec() }),
5236            CommandResponse::Nil
5237        ));
5238    }
5239
5240    #[test]
5241    fn test_incr_decr() {
5242        let mut store = ShardStore::new(0);
5243        // INCR on nonexistent key starts at 0
5244        match store.execute(Command::Incr {
5245            key: b"counter".to_vec(),
5246        }) {
5247            CommandResponse::Integer(1) => {}
5248            other => panic!("Expected 1, got {:?}", other),
5249        }
5250        store.execute(Command::IncrBy {
5251            key: b"counter".to_vec(),
5252            delta: 10,
5253        });
5254        match store.execute(Command::Get {
5255            key: b"counter".to_vec(),
5256        }) {
5257            CommandResponse::BulkString(v) => assert_eq!(v, b"11"),
5258            other => panic!("Expected 11, got {:?}", other),
5259        }
5260        store.execute(Command::Decr {
5261            key: b"counter".to_vec(),
5262        });
5263        match store.execute(Command::Get {
5264            key: b"counter".to_vec(),
5265        }) {
5266            CommandResponse::BulkString(v) => assert_eq!(v, b"10"),
5267            other => panic!("Expected 10, got {:?}", other),
5268        }
5269    }
5270
5271    #[test]
5272    fn test_del_multiple() {
5273        let mut store = ShardStore::new(0);
5274        store.execute(Command::Set {
5275            key: b"a".to_vec(),
5276            value: b"1".to_vec(),
5277            ex: None,
5278            px: None,
5279            nx: false,
5280            xx: false,
5281        });
5282        store.execute(Command::Set {
5283            key: b"b".to_vec(),
5284            value: b"2".to_vec(),
5285            ex: None,
5286            px: None,
5287            nx: false,
5288            xx: false,
5289        });
5290        match store.execute(Command::Del {
5291            keys: vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()],
5292        }) {
5293            CommandResponse::Integer(2) => {}
5294            other => panic!("Expected 2, got {:?}", other),
5295        }
5296    }
5297
5298    #[test]
5299    fn test_expire_and_ttl() {
5300        let mut store = ShardStore::new(0);
5301        store.execute(Command::Set {
5302            key: b"k".to_vec(),
5303            value: b"v".to_vec(),
5304            ex: Some(100),
5305            px: None,
5306            nx: false,
5307            xx: false,
5308        });
5309        match store.execute(Command::Ttl { key: b"k".to_vec() }) {
5310            CommandResponse::Integer(n) => assert!(n > 0 && n <= 100),
5311            other => panic!("Expected positive TTL, got {:?}", other),
5312        }
5313    }
5314
5315    #[test]
5316    fn test_type_command() {
5317        let mut store = ShardStore::new(0);
5318        store.execute(Command::Set {
5319            key: b"s".to_vec(),
5320            value: b"v".to_vec(),
5321            ex: None,
5322            px: None,
5323            nx: false,
5324            xx: false,
5325        });
5326        store.execute(Command::LPush {
5327            key: b"l".to_vec(),
5328            values: vec![b"a".to_vec()],
5329        });
5330        store.execute(Command::HSet {
5331            key: b"h".to_vec(),
5332            fields: vec![(b"f".to_vec(), b"v".to_vec())],
5333        });
5334        store.execute(Command::SAdd {
5335            key: b"set".to_vec(),
5336            members: vec![b"m".to_vec()],
5337        });
5338
5339        assert!(
5340            matches!(store.execute(Command::Type { key: b"s".to_vec() }), CommandResponse::SimpleString(s) if s == "string")
5341        );
5342        assert!(
5343            matches!(store.execute(Command::Type { key: b"l".to_vec() }), CommandResponse::SimpleString(s) if s == "list")
5344        );
5345        assert!(
5346            matches!(store.execute(Command::Type { key: b"h".to_vec() }), CommandResponse::SimpleString(s) if s == "hash")
5347        );
5348        assert!(
5349            matches!(store.execute(Command::Type { key: b"set".to_vec() }), CommandResponse::SimpleString(s) if s == "set")
5350        );
5351        assert!(
5352            matches!(store.execute(Command::Type { key: b"none".to_vec() }), CommandResponse::SimpleString(s) if s == "none")
5353        );
5354    }
5355
5356    #[test]
5357    fn test_list_operations() {
5358        let mut store = ShardStore::new(0);
5359        store.execute(Command::RPush {
5360            key: b"list".to_vec(),
5361            values: vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()],
5362        });
5363        assert!(matches!(
5364            store.execute(Command::LLen {
5365                key: b"list".to_vec()
5366            }),
5367            CommandResponse::Integer(3)
5368        ));
5369        match store.execute(Command::LRange {
5370            key: b"list".to_vec(),
5371            start: 0,
5372            stop: -1,
5373        }) {
5374            CommandResponse::Array(items) => assert_eq!(items.len(), 3),
5375            other => panic!("Expected array of 3, got {:?}", other),
5376        }
5377        match store.execute(Command::LPop {
5378            key: b"list".to_vec(),
5379        }) {
5380            CommandResponse::BulkString(v) => assert_eq!(v, b"a"),
5381            other => panic!("Expected 'a', got {:?}", other),
5382        }
5383        match store.execute(Command::RPop {
5384            key: b"list".to_vec(),
5385        }) {
5386            CommandResponse::BulkString(v) => assert_eq!(v, b"c"),
5387            other => panic!("Expected 'c', got {:?}", other),
5388        }
5389    }
5390
5391    #[test]
5392    fn test_hash_operations() {
5393        let mut store = ShardStore::new(0);
5394        store.execute(Command::HSet {
5395            key: b"hash".to_vec(),
5396            fields: vec![
5397                (b"name".to_vec(), b"Alice".to_vec()),
5398                (b"age".to_vec(), b"30".to_vec()),
5399            ],
5400        });
5401        match store.execute(Command::HGet {
5402            key: b"hash".to_vec(),
5403            field: b"name".to_vec(),
5404        }) {
5405            CommandResponse::BulkString(v) => assert_eq!(v, b"Alice"),
5406            other => panic!("Expected Alice, got {:?}", other),
5407        }
5408        assert!(matches!(
5409            store.execute(Command::HLen {
5410                key: b"hash".to_vec()
5411            }),
5412            CommandResponse::Integer(2)
5413        ));
5414        assert!(matches!(
5415            store.execute(Command::HExists {
5416                key: b"hash".to_vec(),
5417                field: b"name".to_vec()
5418            }),
5419            CommandResponse::Integer(1)
5420        ));
5421        store.execute(Command::HIncrBy {
5422            key: b"hash".to_vec(),
5423            field: b"age".to_vec(),
5424            delta: 5,
5425        });
5426        match store.execute(Command::HGet {
5427            key: b"hash".to_vec(),
5428            field: b"age".to_vec(),
5429        }) {
5430            CommandResponse::BulkString(v) => assert_eq!(v, b"35"),
5431            other => panic!("Expected 35, got {:?}", other),
5432        }
5433    }
5434
5435    #[test]
5436    fn test_set_operations() {
5437        let mut store = ShardStore::new(0);
5438        store.execute(Command::SAdd {
5439            key: b"myset".to_vec(),
5440            members: vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()],
5441        });
5442        assert!(matches!(
5443            store.execute(Command::SCard {
5444                key: b"myset".to_vec()
5445            }),
5446            CommandResponse::Integer(3)
5447        ));
5448        assert!(matches!(
5449            store.execute(Command::SIsMember {
5450                key: b"myset".to_vec(),
5451                member: b"a".to_vec()
5452            }),
5453            CommandResponse::Integer(1)
5454        ));
5455        assert!(matches!(
5456            store.execute(Command::SIsMember {
5457                key: b"myset".to_vec(),
5458                member: b"z".to_vec()
5459            }),
5460            CommandResponse::Integer(0)
5461        ));
5462        store.execute(Command::SRem {
5463            key: b"myset".to_vec(),
5464            members: vec![b"b".to_vec()],
5465        });
5466        assert!(matches!(
5467            store.execute(Command::SCard {
5468                key: b"myset".to_vec()
5469            }),
5470            CommandResponse::Integer(2)
5471        ));
5472    }
5473
5474    #[test]
5475    fn test_set_numeric_payload_keeps_string_encoding() {
5476        let mut store = ShardStore::new(0);
5477        store.execute(Command::Set {
5478            key: b"num".to_vec(),
5479            value: b"42".to_vec(),
5480            ex: None,
5481            px: None,
5482            nx: false,
5483            xx: false,
5484        });
5485
5486        let get_resp = store.execute(Command::Get {
5487            key: b"num".to_vec(),
5488        });
5489        assert_eq!(get_resp.bulk_string_bytes(), Some(b"42".as_slice()));
5490
5491        match store.execute(Command::ObjectEncoding {
5492            key: b"num".to_vec(),
5493        }) {
5494            CommandResponse::BulkString(encoding) => assert_eq!(encoding, b"embstr"),
5495            other => panic!("Expected embstr encoding, got {:?}", other),
5496        }
5497    }
5498
5499    #[test]
5500    fn test_set_members_are_binary_safe() {
5501        let mut store = ShardStore::new(0);
5502        store.execute(Command::SAdd {
5503            key: b"codes".to_vec(),
5504            members: vec![b"042".to_vec()],
5505        });
5506
5507        assert_eq!(
5508            store.execute(Command::SIsMember {
5509                key: b"codes".to_vec(),
5510                member: b"042".to_vec(),
5511            }),
5512            CommandResponse::Integer(1)
5513        );
5514        assert_eq!(
5515            store.execute(Command::SIsMember {
5516                key: b"codes".to_vec(),
5517                member: b"42".to_vec(),
5518            }),
5519            CommandResponse::Integer(0)
5520        );
5521    }
5522
5523    #[test]
5524    fn test_wrongtype_error() {
5525        let mut store = ShardStore::new(0);
5526        store.execute(Command::Set {
5527            key: b"str".to_vec(),
5528            value: b"v".to_vec(),
5529            ex: None,
5530            px: None,
5531            nx: false,
5532            xx: false,
5533        });
5534        let resp = store.execute(Command::LPush {
5535            key: b"str".to_vec(),
5536            values: vec![b"x".to_vec()],
5537        });
5538        assert!(matches!(resp, CommandResponse::Error(s) if s.contains("WRONGTYPE")));
5539    }
5540
5541    #[test]
5542    fn test_keys_pattern() {
5543        let mut store = ShardStore::new(0);
5544        store.execute(Command::Set {
5545            key: b"user:1".to_vec(),
5546            value: b"a".to_vec(),
5547            ex: None,
5548            px: None,
5549            nx: false,
5550            xx: false,
5551        });
5552        store.execute(Command::Set {
5553            key: b"user:2".to_vec(),
5554            value: b"b".to_vec(),
5555            ex: None,
5556            px: None,
5557            nx: false,
5558            xx: false,
5559        });
5560        store.execute(Command::Set {
5561            key: b"session:1".to_vec(),
5562            value: b"c".to_vec(),
5563            ex: None,
5564            px: None,
5565            nx: false,
5566            xx: false,
5567        });
5568        match store.execute(Command::Keys {
5569            pattern: "user:*".into(),
5570        }) {
5571            CommandResponse::Array(items) => assert_eq!(items.len(), 2),
5572            other => panic!("Expected 2 keys, got {:?}", other),
5573        }
5574    }
5575
5576    #[test]
5577    fn test_ping_echo() {
5578        let mut store = ShardStore::new(0);
5579        assert!(
5580            matches!(store.execute(Command::Ping { message: None }), CommandResponse::SimpleString(s) if s == "PONG")
5581        );
5582        match store.execute(Command::Echo {
5583            message: b"hello".to_vec(),
5584        }) {
5585            CommandResponse::BulkString(v) => assert_eq!(v, b"hello"),
5586            other => panic!("Expected hello, got {:?}", other),
5587        }
5588    }
5589
5590    #[test]
5591    fn test_glob_matching() {
5592        assert!(glob_match("*", b"anything"));
5593        assert!(glob_match("user:*", b"user:123"));
5594        assert!(!glob_match("user:*", b"session:123"));
5595        assert!(glob_match("h?llo", b"hello"));
5596        assert!(glob_match("h?llo", b"hallo"));
5597        assert!(!glob_match("h?llo", b"hlo"));
5598    }
5599
5600    #[test]
5601    fn test_append() {
5602        let mut store = ShardStore::new(0);
5603        store.execute(Command::Append {
5604            key: b"k".to_vec(),
5605            value: b"Hello".to_vec(),
5606        });
5607        store.execute(Command::Append {
5608            key: b"k".to_vec(),
5609            value: b" World".to_vec(),
5610        });
5611        match store.execute(Command::Get { key: b"k".to_vec() }) {
5612            CommandResponse::BulkString(v) => assert_eq!(v, b"Hello World"),
5613            other => panic!("Expected 'Hello World', got {:?}", other),
5614        }
5615    }
5616
5617    #[test]
5618    fn test_dbsize_and_flush() {
5619        let mut store = ShardStore::new(0);
5620        store.execute(Command::Set {
5621            key: b"a".to_vec(),
5622            value: b"1".to_vec(),
5623            ex: None,
5624            px: None,
5625            nx: false,
5626            xx: false,
5627        });
5628        store.execute(Command::Set {
5629            key: b"b".to_vec(),
5630            value: b"2".to_vec(),
5631            ex: None,
5632            px: None,
5633            nx: false,
5634            xx: false,
5635        });
5636        assert!(matches!(
5637            store.execute(Command::DbSize),
5638            CommandResponse::Integer(2)
5639        ));
5640        store.execute(Command::FlushDb);
5641        assert!(matches!(
5642            store.execute(Command::DbSize),
5643            CommandResponse::Integer(0)
5644        ));
5645    }
5646
5647    #[test]
5648    fn test_lfu_counter_starts_at_5() {
5649        let mut store = ShardStore::new(0);
5650        store.execute(Command::Set {
5651            key: b"k".to_vec(),
5652            value: b"v".to_vec(),
5653            ex: None,
5654            px: None,
5655            nx: false,
5656            xx: false,
5657        });
5658        match store.execute(Command::ObjectFreq { key: b"k".to_vec() }) {
5659            CommandResponse::Integer(n) => assert_eq!(n, 5),
5660            other => panic!("Expected Integer(5), got {:?}", other),
5661        }
5662    }
5663
5664    #[test]
5665    fn test_lfu_counter_increments_on_access() {
5666        let mut store = ShardStore::new(0);
5667        store.execute(Command::Set {
5668            key: b"k".to_vec(),
5669            value: b"v".to_vec(),
5670            ex: None,
5671            px: None,
5672            nx: false,
5673            xx: false,
5674        });
5675        for _ in 0..100 {
5676            store.execute(Command::Get { key: b"k".to_vec() });
5677        }
5678        match store.execute(Command::ObjectFreq { key: b"k".to_vec() }) {
5679            CommandResponse::Integer(n) => assert!(n >= 5, "LFU counter should be >= 5, got {}", n),
5680            other => panic!("Expected Integer, got {:?}", other),
5681        }
5682    }
5683
5684    #[test]
5685    fn test_eviction_removes_least_frequent_key() {
5686        let mut store = ShardStore::new(0);
5687        store.set_max_memory(1);
5688
5689        store.execute(Command::Set {
5690            key: b"cold".to_vec(),
5691            value: b"val".to_vec(),
5692            ex: None,
5693            px: None,
5694            nx: false,
5695            xx: false,
5696        });
5697
5698        for _ in 0..50 {
5699            store.execute(Command::Get {
5700                key: b"cold".to_vec(),
5701            });
5702        }
5703
5704        store.execute(Command::Set {
5705            key: b"hot".to_vec(),
5706            value: b"val".to_vec(),
5707            ex: None,
5708            px: None,
5709            nx: false,
5710            xx: false,
5711        });
5712
5713        for _ in 0..200 {
5714            store.execute(Command::Get {
5715                key: b"hot".to_vec(),
5716            });
5717        }
5718
5719        store.execute(Command::Set {
5720            key: b"trigger".to_vec(),
5721            value: b"val".to_vec(),
5722            ex: None,
5723            px: None,
5724            nx: false,
5725            xx: false,
5726        });
5727
5728        assert!(
5729            store.len() < 4,
5730            "Eviction should have removed at least one key"
5731        );
5732    }
5733
5734    #[test]
5735    fn test_object_freq_command() {
5736        let mut store = ShardStore::new(0);
5737        assert!(matches!(
5738            store.execute(Command::ObjectFreq {
5739                key: b"missing".to_vec()
5740            }),
5741            CommandResponse::Nil
5742        ));
5743        store.execute(Command::Set {
5744            key: b"k".to_vec(),
5745            value: b"v".to_vec(),
5746            ex: None,
5747            px: None,
5748            nx: false,
5749            xx: false,
5750        });
5751        match store.execute(Command::ObjectFreq { key: b"k".to_vec() }) {
5752            CommandResponse::Integer(n) => assert!(n >= 0),
5753            other => panic!("Expected Integer, got {:?}", other),
5754        }
5755    }
5756
5757    #[test]
5758    fn test_object_encoding_command() {
5759        let mut store = ShardStore::new(0);
5760        store.execute(Command::Set {
5761            key: b"k".to_vec(),
5762            value: b"v".to_vec(),
5763            ex: None,
5764            px: None,
5765            nx: false,
5766            xx: false,
5767        });
5768        match store.execute(Command::ObjectEncoding { key: b"k".to_vec() }) {
5769            CommandResponse::BulkString(v) => {
5770                let encoding = String::from_utf8(v).unwrap_or_default();
5771                assert!(
5772                    encoding == "embstr" || encoding == "raw" || encoding == "int",
5773                    "Unexpected encoding: {}",
5774                    encoding
5775                );
5776            }
5777            other => panic!("Expected BulkString, got {:?}", other),
5778        }
5779    }
5780
5781    #[test]
5782    fn test_zadd_and_zscore() {
5783        let mut store = ShardStore::new(0);
5784        let resp = store.execute(Command::ZAdd {
5785            key: b"zs".to_vec(),
5786            members: vec![
5787                (1.0, b"alice".to_vec()),
5788                (2.0, b"bob".to_vec()),
5789                (3.0, b"charlie".to_vec()),
5790            ],
5791        });
5792        assert_eq!(resp, CommandResponse::Integer(3));
5793        match store.execute(Command::ZScore {
5794            key: b"zs".to_vec(),
5795            member: b"bob".to_vec(),
5796        }) {
5797            CommandResponse::BulkString(v) => assert_eq!(v, b"2"),
5798            other => panic!("Expected BulkString, got {:?}", other),
5799        }
5800        assert!(matches!(
5801            store.execute(Command::ZScore {
5802                key: b"zs".to_vec(),
5803                member: b"unknown".to_vec(),
5804            }),
5805            CommandResponse::Nil
5806        ));
5807    }
5808
5809    #[test]
5810    fn test_zadd_update_score() {
5811        let mut store = ShardStore::new(0);
5812        store.execute(Command::ZAdd {
5813            key: b"zs".to_vec(),
5814            members: vec![(1.0, b"alice".to_vec())],
5815        });
5816        let resp = store.execute(Command::ZAdd {
5817            key: b"zs".to_vec(),
5818            members: vec![(5.0, b"alice".to_vec())],
5819        });
5820        assert_eq!(resp, CommandResponse::Integer(0));
5821        match store.execute(Command::ZScore {
5822            key: b"zs".to_vec(),
5823            member: b"alice".to_vec(),
5824        }) {
5825            CommandResponse::BulkString(v) => assert_eq!(v, b"5"),
5826            other => panic!("Expected BulkString(5), got {:?}", other),
5827        }
5828    }
5829
5830    #[test]
5831    fn test_zrem() {
5832        let mut store = ShardStore::new(0);
5833        store.execute(Command::ZAdd {
5834            key: b"zs".to_vec(),
5835            members: vec![
5836                (1.0, b"a".to_vec()),
5837                (2.0, b"b".to_vec()),
5838                (3.0, b"c".to_vec()),
5839            ],
5840        });
5841        let resp = store.execute(Command::ZRem {
5842            key: b"zs".to_vec(),
5843            members: vec![b"a".to_vec(), b"c".to_vec(), b"nonexistent".to_vec()],
5844        });
5845        assert_eq!(resp, CommandResponse::Integer(2));
5846        assert_eq!(
5847            store.execute(Command::ZCard {
5848                key: b"zs".to_vec()
5849            }),
5850            CommandResponse::Integer(1)
5851        );
5852    }
5853
5854    #[test]
5855    fn test_zrank_and_zrevrank() {
5856        let mut store = ShardStore::new(0);
5857        store.execute(Command::ZAdd {
5858            key: b"zs".to_vec(),
5859            members: vec![
5860                (10.0, b"a".to_vec()),
5861                (20.0, b"b".to_vec()),
5862                (30.0, b"c".to_vec()),
5863            ],
5864        });
5865        assert_eq!(
5866            store.execute(Command::ZRank {
5867                key: b"zs".to_vec(),
5868                member: b"a".to_vec(),
5869            }),
5870            CommandResponse::Integer(0)
5871        );
5872        assert_eq!(
5873            store.execute(Command::ZRank {
5874                key: b"zs".to_vec(),
5875                member: b"c".to_vec(),
5876            }),
5877            CommandResponse::Integer(2)
5878        );
5879        assert_eq!(
5880            store.execute(Command::ZRevRank {
5881                key: b"zs".to_vec(),
5882                member: b"c".to_vec(),
5883            }),
5884            CommandResponse::Integer(0)
5885        );
5886        assert!(matches!(
5887            store.execute(Command::ZRank {
5888                key: b"zs".to_vec(),
5889                member: b"missing".to_vec(),
5890            }),
5891            CommandResponse::Nil
5892        ));
5893    }
5894
5895    #[test]
5896    fn test_zrange_and_zrevrange() {
5897        let mut store = ShardStore::new(0);
5898        store.execute(Command::ZAdd {
5899            key: b"zs".to_vec(),
5900            members: vec![
5901                (1.0, b"a".to_vec()),
5902                (2.0, b"b".to_vec()),
5903                (3.0, b"c".to_vec()),
5904            ],
5905        });
5906        match store.execute(Command::ZRange {
5907            key: b"zs".to_vec(),
5908            start: 0,
5909            stop: -1,
5910            withscores: false,
5911        }) {
5912            CommandResponse::Array(items) => {
5913                assert_eq!(items.len(), 3);
5914                assert_eq!(items[0], CommandResponse::BulkString(b"a".to_vec()));
5915                assert_eq!(items[2], CommandResponse::BulkString(b"c".to_vec()));
5916            }
5917            other => panic!("Expected Array, got {:?}", other),
5918        }
5919        match store.execute(Command::ZRevRange {
5920            key: b"zs".to_vec(),
5921            start: 0,
5922            stop: 1,
5923            withscores: true,
5924        }) {
5925            CommandResponse::Array(items) => {
5926                assert_eq!(items.len(), 4);
5927                assert_eq!(items[0], CommandResponse::BulkString(b"c".to_vec()));
5928                assert_eq!(items[1], CommandResponse::BulkString(b"3".to_vec()));
5929                assert_eq!(items[2], CommandResponse::BulkString(b"b".to_vec()));
5930            }
5931            other => panic!("Expected Array, got {:?}", other),
5932        }
5933    }
5934
5935    #[test]
5936    fn test_zrangebyscore() {
5937        let mut store = ShardStore::new(0);
5938        store.execute(Command::ZAdd {
5939            key: b"zs".to_vec(),
5940            members: vec![
5941                (1.0, b"a".to_vec()),
5942                (2.0, b"b".to_vec()),
5943                (3.0, b"c".to_vec()),
5944                (4.0, b"d".to_vec()),
5945            ],
5946        });
5947        match store.execute(Command::ZRangeByScore {
5948            key: b"zs".to_vec(),
5949            min: 2.0,
5950            max: 3.0,
5951            withscores: false,
5952            offset: None,
5953            count: None,
5954        }) {
5955            CommandResponse::Array(items) => {
5956                assert_eq!(items.len(), 2);
5957                assert_eq!(items[0], CommandResponse::BulkString(b"b".to_vec()));
5958                assert_eq!(items[1], CommandResponse::BulkString(b"c".to_vec()));
5959            }
5960            other => panic!("Expected Array, got {:?}", other),
5961        }
5962        match store.execute(Command::ZRangeByScore {
5963            key: b"zs".to_vec(),
5964            min: 1.0,
5965            max: 4.0,
5966            withscores: false,
5967            offset: Some(1),
5968            count: Some(2),
5969        }) {
5970            CommandResponse::Array(items) => {
5971                assert_eq!(items.len(), 2);
5972                assert_eq!(items[0], CommandResponse::BulkString(b"b".to_vec()));
5973                assert_eq!(items[1], CommandResponse::BulkString(b"c".to_vec()));
5974            }
5975            other => panic!("Expected Array, got {:?}", other),
5976        }
5977    }
5978
5979    #[test]
5980    fn test_zincrby() {
5981        let mut store = ShardStore::new(0);
5982        store.execute(Command::ZAdd {
5983            key: b"zs".to_vec(),
5984            members: vec![(10.0, b"a".to_vec())],
5985        });
5986        match store.execute(Command::ZIncrBy {
5987            key: b"zs".to_vec(),
5988            delta: 5.0,
5989            member: b"a".to_vec(),
5990        }) {
5991            CommandResponse::BulkString(v) => assert_eq!(v, b"15"),
5992            other => panic!("Expected BulkString, got {:?}", other),
5993        }
5994        match store.execute(Command::ZIncrBy {
5995            key: b"zs".to_vec(),
5996            delta: 3.0,
5997            member: b"newmember".to_vec(),
5998        }) {
5999            CommandResponse::BulkString(v) => assert_eq!(v, b"3"),
6000            other => panic!("Expected BulkString, got {:?}", other),
6001        }
6002    }
6003
6004    #[test]
6005    fn test_zcount() {
6006        let mut store = ShardStore::new(0);
6007        store.execute(Command::ZAdd {
6008            key: b"zs".to_vec(),
6009            members: vec![
6010                (1.0, b"a".to_vec()),
6011                (2.0, b"b".to_vec()),
6012                (3.0, b"c".to_vec()),
6013                (4.0, b"d".to_vec()),
6014            ],
6015        });
6016        assert_eq!(
6017            store.execute(Command::ZCount {
6018                key: b"zs".to_vec(),
6019                min: 2.0,
6020                max: 3.0,
6021            }),
6022            CommandResponse::Integer(2)
6023        );
6024        assert_eq!(
6025            store.execute(Command::ZCount {
6026                key: b"zs".to_vec(),
6027                min: f64::NEG_INFINITY,
6028                max: f64::INFINITY,
6029            }),
6030            CommandResponse::Integer(4)
6031        );
6032    }
6033
6034    #[test]
6035    fn test_zcard() {
6036        let mut store = ShardStore::new(0);
6037        assert_eq!(
6038            store.execute(Command::ZCard {
6039                key: b"zs".to_vec()
6040            }),
6041            CommandResponse::Integer(0)
6042        );
6043        store.execute(Command::ZAdd {
6044            key: b"zs".to_vec(),
6045            members: vec![(1.0, b"a".to_vec()), (2.0, b"b".to_vec())],
6046        });
6047        assert_eq!(
6048            store.execute(Command::ZCard {
6049                key: b"zs".to_vec()
6050            }),
6051            CommandResponse::Integer(2)
6052        );
6053    }
6054}