// ember_core/shard/mod.rs
//! Shard: an independent partition of the keyspace.
//!
//! ## thread-per-core, shared-nothing model
//!
//! Each shard runs as a single tokio task that exclusively owns its [`Keyspace`]
//! partition. All mutations execute serially inside one task — no mutex, no
//! read-write lock, no cross-thread coordination on the hot path. This is the
//! core design choice that enables predictable latency: a shard thread can never
//! be stalled waiting for another thread to release a lock.
//!
//! ## backpressure via bounded channel
//!
//! The mpsc buffer (4,096 items) is the system's flow-control valve. When a shard
//! is fully loaded, `try_send` returns `Err(Full)` immediately, giving the caller a
//! clear signal to shed load or return an error to the client rather than quietly
//! growing an unbounded queue. This prevents memory blow-up under sustained overload.
//!
//! ## per-request oneshot channels
//!
//! Each [`ShardMessage`] carries a `oneshot::Sender<ShardResponse>`. The caller
//! blocks on the receiver while the shard processes the request. This gives O(1)
//! delivery with no shared response queue and no head-of-line blocking between
//! concurrent callers — each caller waits only on its own future.
//!
//! ## pipeline draining
//!
//! After waking from `select!`, the event loop drains the channel with `try_recv()`
//! before re-entering `select!`. This amortizes scheduler wake-up overhead across
//! burst traffic — essential for pipelined clients that send dozens of commands
//! back-to-back without waiting for individual responses.
//!
//! ## AOF linearizability
//!
//! Because all mutations execute serially in one task, the AOF record sequence is a
//! perfect linearized history of the keyspace. The per-shard monotonically increasing
//! offset is a consistent position marker that replicas use to detect gaps and
//! trigger re-sync when they fall behind.
//!
//! ## mechanical sympathy
//!
//! The hot path (`recv → dispatch → respond`) touches only shard-local memory. No
//! cross-thread cache-line contention. This is why sharded mode outperforms
//! mutex-per-command designs by 3–5× on write-heavy workloads — the CPU's cache
//! hierarchy can stay warm on data that belongs to this core.

mod aof;
mod blocking;
mod persistence;

use std::collections::{HashMap, VecDeque};
use std::path::PathBuf;
use std::time::Duration;

use bytes::Bytes;
use ember_persistence::aof::{AofRecord, AofWriter, FsyncPolicy};
use ember_persistence::recovery::{self, RecoveredValue};
use ember_persistence::snapshot::{self, SnapEntry, SnapValue, SnapshotWriter};
use ember_protocol::command::{BitOpKind, BitRange};
use smallvec::{smallvec, SmallVec};
use tokio::sync::{broadcast, mpsc, oneshot};
use tracing::{debug, error, info, warn};

use crate::dropper::DropHandle;
use crate::error::ShardError;
use crate::expiry;
use crate::keyspace::{
    EvictionPolicy, IncrError, IncrFloatError, Keyspace, KeyspaceStats, LsetError, SetResult,
    ShardConfig, TtlResult, WriteError,
};
use crate::types::sorted_set::{ScoreBound, ZAddFlags};
use crate::types::Value;
/// How often the shard runs active expiration. 100ms matches
/// Redis's hz=10 default and keeps CPU overhead negligible.
const EXPIRY_TICK: Duration = Duration::from_millis(100);
76
/// How often to fsync when using the `EverySec` policy.
/// Matches Redis's `appendfsync everysec`: at most one second of
/// acknowledged writes can be lost on a crash.
const FSYNC_INTERVAL: Duration = Duration::from_secs(1);
79
/// A mutation event broadcast to replication subscribers.
///
/// Published after every successful mutation on the hot path. The
/// `offset` is per-shard and monotonically increasing — replicas use it
/// to detect gaps and trigger re-sync when they fall behind.
#[derive(Debug, Clone)]
pub struct ReplicationEvent {
    /// The shard that produced this event.
    pub shard_id: u16,
    /// Monotonically increasing per-shard offset.
    pub offset: u64,
    /// The mutation record, ready to replay on a replica.
    pub record: AofRecord,
}
94
/// Optional persistence configuration for a shard.
///
/// When absent, the shard runs purely in memory (no AOF, no snapshots).
#[derive(Debug, Clone)]
pub struct ShardPersistenceConfig {
    /// Directory where AOF and snapshot files live.
    pub data_dir: PathBuf,
    /// Whether to write an AOF log of mutations.
    pub append_only: bool,
    /// When to fsync the AOF file.
    pub fsync_policy: FsyncPolicy,
    /// Optional encryption key for encrypting data at rest.
    /// When set, AOF and snapshot files use the v3 encrypted format.
    #[cfg(feature = "encryption")]
    pub encryption_key: Option<ember_persistence::encryption::EncryptionKey>,
}
109
110/// A protocol-agnostic command sent to a shard.
111#[derive(Debug)]
112pub enum ShardRequest {
113    Get {
114        key: String,
115    },
116    Set {
117        key: String,
118        value: Bytes,
119        expire: Option<Duration>,
120        /// Only set the key if it does not already exist.
121        nx: bool,
122        /// Only set the key if it already exists.
123        xx: bool,
124    },
125    Incr {
126        key: String,
127    },
128    Decr {
129        key: String,
130    },
131    IncrBy {
132        key: String,
133        delta: i64,
134    },
135    DecrBy {
136        key: String,
137        delta: i64,
138    },
139    IncrByFloat {
140        key: String,
141        delta: f64,
142    },
143    Append {
144        key: String,
145        value: Bytes,
146    },
147    Strlen {
148        key: String,
149    },
150    GetRange {
151        key: String,
152        start: i64,
153        end: i64,
154    },
155    SetRange {
156        key: String,
157        offset: usize,
158        value: Bytes,
159    },
160    /// GETBIT key offset. Returns the bit at `offset` (0 or 1). Big-endian ordering.
161    GetBit {
162        key: String,
163        offset: u64,
164    },
165    /// SETBIT key offset value. Sets the bit at `offset` to 0 or 1. Returns old bit.
166    SetBit {
167        key: String,
168        offset: u64,
169        value: u8,
170    },
171    /// BITCOUNT key [range]. Counts set bits, optionally restricted to a range.
172    BitCount {
173        key: String,
174        range: Option<BitRange>,
175    },
176    /// BITPOS key bit [range]. Finds first set or clear bit position.
177    BitPos {
178        key: String,
179        bit: u8,
180        range: Option<BitRange>,
181    },
182    /// BITOP op destkey key [key ...]. Bitwise operation across strings.
183    BitOp {
184        op: BitOpKind,
185        dest: String,
186        keys: Vec<String>,
187    },
188    /// Returns all keys matching a glob pattern in this shard.
189    Keys {
190        pattern: String,
191    },
192    /// Renames a key within this shard.
193    Rename {
194        key: String,
195        newkey: String,
196    },
197    /// Copies the value at source to destination within this shard.
198    Copy {
199        source: String,
200        destination: String,
201        replace: bool,
202    },
203    /// Returns the internal encoding name for the value at key.
204    ObjectEncoding {
205        key: String,
206    },
207    Del {
208        key: String,
209    },
210    /// Like DEL but defers value deallocation to the background drop thread.
211    Unlink {
212        key: String,
213    },
214    Exists {
215        key: String,
216    },
217    /// Returns a random key from the shard's keyspace.
218    RandomKey,
219    /// Updates last access time for a key. Returns bool (existed).
220    Touch {
221        key: String,
222    },
223    /// Sorts elements from a list, set, or sorted set in this shard.
224    Sort {
225        key: String,
226        desc: bool,
227        alpha: bool,
228        limit: Option<(i64, i64)>,
229    },
230    Expire {
231        key: String,
232        seconds: u64,
233    },
234    Ttl {
235        key: String,
236    },
237    /// MEMORY USAGE. Returns the estimated memory footprint of a key in bytes.
238    MemoryUsage {
239        key: String,
240    },
241    Persist {
242        key: String,
243    },
244    Pttl {
245        key: String,
246    },
247    Pexpire {
248        key: String,
249        milliseconds: u64,
250    },
251    /// EXPIREAT: set expiry at an absolute Unix timestamp (seconds).
252    Expireat {
253        key: String,
254        timestamp: u64,
255    },
256    /// PEXPIREAT: set expiry at an absolute Unix timestamp (milliseconds).
257    Pexpireat {
258        key: String,
259        timestamp_ms: u64,
260    },
261    LPush {
262        key: String,
263        values: Vec<Bytes>,
264    },
265    RPush {
266        key: String,
267        values: Vec<Bytes>,
268    },
269    LPop {
270        key: String,
271    },
272    RPop {
273        key: String,
274    },
275    /// LPOP key count — pop up to `count` elements from the list head, returning an array.
276    LPopCount {
277        key: String,
278        count: usize,
279    },
280    /// RPOP key count — pop up to `count` elements from the list tail, returning an array.
281    RPopCount {
282        key: String,
283        count: usize,
284    },
285    /// Blocking left-pop. If the list has elements, pops immediately and sends
286    /// the result on `waiter`. If empty, the shard registers the waiter to be
287    /// woken when an element is pushed. Uses an mpsc sender so multiple shards
288    /// can race to deliver the first result to a single receiver.
289    BLPop {
290        key: String,
291        waiter: mpsc::Sender<(String, Bytes)>,
292    },
293    /// Blocking right-pop. Same semantics as BLPop but pops from the tail.
294    BRPop {
295        key: String,
296        waiter: mpsc::Sender<(String, Bytes)>,
297    },
298    LRange {
299        key: String,
300        start: i64,
301        stop: i64,
302    },
303    LLen {
304        key: String,
305    },
306    LIndex {
307        key: String,
308        index: i64,
309    },
310    LSet {
311        key: String,
312        index: i64,
313        value: Bytes,
314    },
315    LTrim {
316        key: String,
317        start: i64,
318        stop: i64,
319    },
320    LInsert {
321        key: String,
322        before: bool,
323        pivot: Bytes,
324        value: Bytes,
325    },
326    LRem {
327        key: String,
328        count: i64,
329        value: Bytes,
330    },
331    LPos {
332        key: String,
333        element: Bytes,
334        rank: i64,
335        count: usize,
336        maxlen: usize,
337    },
338    Type {
339        key: String,
340    },
341    ZAdd {
342        key: String,
343        members: Vec<(f64, String)>,
344        nx: bool,
345        xx: bool,
346        gt: bool,
347        lt: bool,
348        ch: bool,
349    },
350    ZRem {
351        key: String,
352        members: Vec<String>,
353    },
354    ZScore {
355        key: String,
356        member: String,
357    },
358    ZRank {
359        key: String,
360        member: String,
361    },
362    ZRevRank {
363        key: String,
364        member: String,
365    },
366    ZCard {
367        key: String,
368    },
369    ZRange {
370        key: String,
371        start: i64,
372        stop: i64,
373        with_scores: bool,
374    },
375    ZRevRange {
376        key: String,
377        start: i64,
378        stop: i64,
379        with_scores: bool,
380    },
381    ZCount {
382        key: String,
383        min: ScoreBound,
384        max: ScoreBound,
385    },
386    ZIncrBy {
387        key: String,
388        increment: f64,
389        member: String,
390    },
391    ZRangeByScore {
392        key: String,
393        min: ScoreBound,
394        max: ScoreBound,
395        offset: usize,
396        count: Option<usize>,
397    },
398    ZRevRangeByScore {
399        key: String,
400        min: ScoreBound,
401        max: ScoreBound,
402        offset: usize,
403        count: Option<usize>,
404    },
405    ZPopMin {
406        key: String,
407        count: usize,
408    },
409    ZPopMax {
410        key: String,
411        count: usize,
412    },
413    /// LMPOP single-key sub-request: pop up to `count` items from one list.
414    LmpopSingle {
415        key: String,
416        left: bool,
417        count: usize,
418    },
419    /// ZMPOP single-key sub-request: pop up to `count` items from one sorted set.
420    ZmpopSingle {
421        key: String,
422        min: bool,
423        count: usize,
424    },
425    HSet {
426        key: String,
427        fields: Vec<(String, Bytes)>,
428    },
429    HGet {
430        key: String,
431        field: String,
432    },
433    HGetAll {
434        key: String,
435    },
436    HDel {
437        key: String,
438        fields: Vec<String>,
439    },
440    HExists {
441        key: String,
442        field: String,
443    },
444    HLen {
445        key: String,
446    },
447    HIncrBy {
448        key: String,
449        field: String,
450        delta: i64,
451    },
452    /// HINCRBYFLOAT key field increment — increments a hash field by a float.
453    HIncrByFloat {
454        key: String,
455        field: String,
456        delta: f64,
457    },
458    HKeys {
459        key: String,
460    },
461    HVals {
462        key: String,
463    },
464    HMGet {
465        key: String,
466        fields: Vec<String>,
467    },
468    /// HRANDFIELD — returns random field(s) from a hash; read-only, no AOF.
469    HRandField {
470        key: String,
471        count: Option<i64>,
472        with_values: bool,
473    },
474    SAdd {
475        key: String,
476        members: Vec<String>,
477    },
478    SRem {
479        key: String,
480        members: Vec<String>,
481    },
482    SMembers {
483        key: String,
484    },
485    SIsMember {
486        key: String,
487        member: String,
488    },
489    SCard {
490        key: String,
491    },
492    SUnion {
493        keys: Vec<String>,
494    },
495    SInter {
496        keys: Vec<String>,
497    },
498    SDiff {
499        keys: Vec<String>,
500    },
501    SUnionStore {
502        dest: String,
503        keys: Vec<String>,
504    },
505    SInterStore {
506        dest: String,
507        keys: Vec<String>,
508    },
509    SDiffStore {
510        dest: String,
511        keys: Vec<String>,
512    },
513    SRandMember {
514        key: String,
515        count: i64,
516    },
517    SPop {
518        key: String,
519        count: usize,
520    },
521    SMisMember {
522        key: String,
523        members: Vec<String>,
524    },
525    /// SMOVE — atomically moves a member between two sets on the same shard.
526    SMove {
527        source: String,
528        destination: String,
529        member: String,
530    },
531    /// SINTERCARD — returns cardinality of set intersection, capped at limit (0 = no limit).
532    SInterCard {
533        keys: Vec<String>,
534        limit: usize,
535    },
536    /// EXPIRETIME — returns the absolute expiry timestamp in seconds (-1 or -2 for missing/no-expiry).
537    Expiretime {
538        key: String,
539    },
540    /// PEXPIRETIME — returns the absolute expiry timestamp in milliseconds (-1 or -2 for missing/no-expiry).
541    Pexpiretime {
542        key: String,
543    },
544    /// LMOVE: atomically pops from source and pushes to destination.
545    LMove {
546        source: String,
547        destination: String,
548        src_left: bool,
549        dst_left: bool,
550    },
551    /// GETDEL: returns the value at key and deletes it.
552    GetDel {
553        key: String,
554    },
555    /// GETSET: atomically sets key to a new value and returns the old value.
556    GetSet {
557        key: String,
558        value: Bytes,
559    },
560    /// MSETNX: sets multiple keys only if none already exist (atomic all-or-nothing).
561    MSetNx {
562        pairs: Vec<(String, Bytes)>,
563    },
564    /// GETEX: returns the value at key and optionally updates its TTL.
565    ///
566    /// `expire`: `None` = no change, `Some(None)` = persist, `Some(Some(ms))` = new TTL in ms.
567    GetEx {
568        key: String,
569        expire: Option<Option<u64>>,
570    },
571    /// ZDIFF: returns members in the first sorted set not in the others.
572    ZDiff {
573        keys: Vec<String>,
574    },
575    /// ZINTER: returns members present in all sorted sets, scores summed.
576    ZInter {
577        keys: Vec<String>,
578    },
579    /// ZUNION: returns the union of all sorted sets, scores summed.
580    ZUnion {
581        keys: Vec<String>,
582    },
583    /// ZDIFFSTORE destkey numkeys key [key ...] — stores diff result in dest.
584    ZDiffStore {
585        dest: String,
586        keys: Vec<String>,
587    },
588    /// ZINTERSTORE destkey numkeys key [key ...] — stores intersection in dest.
589    ZInterStore {
590        dest: String,
591        keys: Vec<String>,
592    },
593    /// ZUNIONSTORE destkey numkeys key [key ...] — stores union in dest.
594    ZUnionStore {
595        dest: String,
596        keys: Vec<String>,
597    },
598    /// ZRANDMEMBER — returns random member(s) from a sorted set; read-only, no AOF.
599    ZRandMember {
600        key: String,
601        count: Option<i64>,
602        with_scores: bool,
603    },
604    /// Returns the key count for this shard.
605    DbSize,
606    /// Returns keyspace stats for this shard.
607    Stats,
608    /// Returns the current version of a key for WATCH optimistic locking.
609    /// Read-only, no AOF, no replication — cold path only.
610    KeyVersion {
611        key: String,
612    },
613    /// Applies a live memory configuration update to this shard.
614    ///
615    /// Sent by the server when CONFIG SET maxmemory or maxmemory-policy
616    /// is changed at runtime. Takes effect on the next write check.
617    UpdateMemoryConfig {
618        max_memory: Option<usize>,
619        eviction_policy: EvictionPolicy,
620    },
621    /// Triggers a snapshot write.
622    Snapshot,
623    /// Serializes the current shard state to bytes (in-memory snapshot).
624    ///
625    /// Used by the replication server to capture a consistent shard
626    /// snapshot for transmission to a new replica without filesystem I/O.
627    SerializeSnapshot,
628    /// Triggers an AOF rewrite (snapshot + truncate AOF).
629    RewriteAof,
630    /// Clears all keys from the keyspace.
631    FlushDb,
632    /// Clears all keys, deferring deallocation to the background drop thread.
633    FlushDbAsync,
634    /// Scans keys in the keyspace.
635    Scan {
636        cursor: u64,
637        count: usize,
638        pattern: Option<String>,
639    },
640    /// Incrementally iterates set members.
641    SScan {
642        key: String,
643        cursor: u64,
644        count: usize,
645        pattern: Option<String>,
646    },
647    /// Incrementally iterates hash fields.
648    HScan {
649        key: String,
650        cursor: u64,
651        count: usize,
652        pattern: Option<String>,
653    },
654    /// Incrementally iterates sorted set members.
655    ZScan {
656        key: String,
657        cursor: u64,
658        count: usize,
659        pattern: Option<String>,
660    },
661    /// Counts keys in this shard that hash to the given cluster slot.
662    CountKeysInSlot {
663        slot: u16,
664    },
665    /// Returns up to `count` keys that hash to the given cluster slot.
666    GetKeysInSlot {
667        slot: u16,
668        count: usize,
669    },
670    /// Dumps a key's value as serialized bytes for MIGRATE.
671    DumpKey {
672        key: String,
673    },
674    /// Restores a key from serialized bytes (received via MIGRATE).
675    RestoreKey {
676        key: String,
677        ttl_ms: u64,
678        data: bytes::Bytes,
679        replace: bool,
680    },
681    /// Adds a vector to a vector set.
682    #[cfg(feature = "vector")]
683    VAdd {
684        key: String,
685        element: String,
686        vector: Vec<f32>,
687        metric: u8,
688        quantization: u8,
689        connectivity: u32,
690        expansion_add: u32,
691    },
692    /// Adds multiple vectors to a vector set in a single command.
693    #[cfg(feature = "vector")]
694    VAddBatch {
695        key: String,
696        entries: Vec<(String, Vec<f32>)>,
697        dim: usize,
698        metric: u8,
699        quantization: u8,
700        connectivity: u32,
701        expansion_add: u32,
702    },
703    /// Searches for nearest neighbors in a vector set.
704    #[cfg(feature = "vector")]
705    VSim {
706        key: String,
707        query: Vec<f32>,
708        count: usize,
709        ef_search: usize,
710    },
711    /// Removes an element from a vector set.
712    #[cfg(feature = "vector")]
713    VRem {
714        key: String,
715        element: String,
716    },
717    /// Gets the stored vector for an element.
718    #[cfg(feature = "vector")]
719    VGet {
720        key: String,
721        element: String,
722    },
723    /// Returns the number of elements in a vector set.
724    #[cfg(feature = "vector")]
725    VCard {
726        key: String,
727    },
728    /// Returns the dimensionality of a vector set.
729    #[cfg(feature = "vector")]
730    VDim {
731        key: String,
732    },
733    /// Returns metadata about a vector set.
734    #[cfg(feature = "vector")]
735    VInfo {
736        key: String,
737    },
738    /// Stores a validated protobuf value.
739    #[cfg(feature = "protobuf")]
740    ProtoSet {
741        key: String,
742        type_name: String,
743        data: Bytes,
744        expire: Option<Duration>,
745        nx: bool,
746        xx: bool,
747    },
748    /// Retrieves a protobuf value.
749    #[cfg(feature = "protobuf")]
750    ProtoGet {
751        key: String,
752    },
753    /// Returns the protobuf message type name for a key.
754    #[cfg(feature = "protobuf")]
755    ProtoType {
756        key: String,
757    },
758    /// Writes a ProtoRegister AOF record (no keyspace mutation).
759    /// Broadcast to all shards after a schema registration so the
760    /// schema is recovered from any shard's AOF on restart.
761    #[cfg(feature = "protobuf")]
762    ProtoRegisterAof {
763        name: String,
764        descriptor: Bytes,
765    },
766    /// Atomically reads a proto value, sets a field, and writes it back.
767    /// Runs entirely within the shard's single-threaded dispatch.
768    #[cfg(feature = "protobuf")]
769    ProtoSetField {
770        key: String,
771        field_path: String,
772        value: String,
773    },
774    /// Atomically reads a proto value, clears a field, and writes it back.
775    /// Runs entirely within the shard's single-threaded dispatch.
776    #[cfg(feature = "protobuf")]
777    ProtoDelField {
778        key: String,
779        field_path: String,
780    },
781}
782
783impl ShardRequest {
784    /// Returns `true` if this request mutates the keyspace and should be
785    /// rejected when the AOF disk is full. Read-only operations, admin
786    /// commands, and scan operations always proceed.
787    fn is_write(&self) -> bool {
788        #[allow(unreachable_patterns)]
789        match self {
790            ShardRequest::Set { .. }
791            | ShardRequest::Incr { .. }
792            | ShardRequest::Decr { .. }
793            | ShardRequest::IncrBy { .. }
794            | ShardRequest::DecrBy { .. }
795            | ShardRequest::IncrByFloat { .. }
796            | ShardRequest::Append { .. }
797            | ShardRequest::SetBit { .. }
798            | ShardRequest::BitOp { .. }
799            | ShardRequest::Del { .. }
800            | ShardRequest::Unlink { .. }
801            | ShardRequest::Rename { .. }
802            | ShardRequest::Copy { .. }
803            | ShardRequest::Expire { .. }
804            | ShardRequest::Expireat { .. }
805            | ShardRequest::Persist { .. }
806            | ShardRequest::Pexpire { .. }
807            | ShardRequest::Pexpireat { .. }
808            | ShardRequest::LPush { .. }
809            | ShardRequest::RPush { .. }
810            | ShardRequest::LPop { .. }
811            | ShardRequest::RPop { .. }
812            | ShardRequest::LPopCount { .. }
813            | ShardRequest::RPopCount { .. }
814            | ShardRequest::LSet { .. }
815            | ShardRequest::LTrim { .. }
816            | ShardRequest::LInsert { .. }
817            | ShardRequest::LRem { .. }
818            | ShardRequest::BLPop { .. }
819            | ShardRequest::BRPop { .. }
820            | ShardRequest::ZAdd { .. }
821            | ShardRequest::ZRem { .. }
822            | ShardRequest::ZIncrBy { .. }
823            | ShardRequest::ZPopMin { .. }
824            | ShardRequest::ZPopMax { .. }
825            | ShardRequest::LmpopSingle { .. }
826            | ShardRequest::ZmpopSingle { .. }
827            | ShardRequest::HSet { .. }
828            | ShardRequest::HDel { .. }
829            | ShardRequest::HIncrBy { .. }
830            | ShardRequest::HIncrByFloat { .. }
831            | ShardRequest::SAdd { .. }
832            | ShardRequest::SRem { .. }
833            | ShardRequest::SPop { .. }
834            | ShardRequest::SUnionStore { .. }
835            | ShardRequest::SInterStore { .. }
836            | ShardRequest::SDiffStore { .. }
837            | ShardRequest::SMove { .. }
838            | ShardRequest::ZDiffStore { .. }
839            | ShardRequest::ZInterStore { .. }
840            | ShardRequest::ZUnionStore { .. }
841            | ShardRequest::LMove { .. }
842            | ShardRequest::GetDel { .. }
843            | ShardRequest::GetEx { .. }
844            | ShardRequest::GetSet { .. }
845            | ShardRequest::MSetNx { .. }
846            | ShardRequest::FlushDb
847            | ShardRequest::FlushDbAsync
848            | ShardRequest::RestoreKey { .. } => true,
849            #[cfg(feature = "protobuf")]
850            ShardRequest::ProtoSet { .. }
851            | ShardRequest::ProtoRegisterAof { .. }
852            | ShardRequest::ProtoSetField { .. }
853            | ShardRequest::ProtoDelField { .. } => true,
854            #[cfg(feature = "vector")]
855            ShardRequest::VAdd { .. }
856            | ShardRequest::VAddBatch { .. }
857            | ShardRequest::VRem { .. } => true,
858            _ => false,
859        }
860    }
861}
862
/// The shard's response to a request.
///
/// One variant per result shape. Mutating responses that feed the AOF
/// (e.g. `ZAddLen`, `HDelLen`) carry both the client-facing count and
/// the applied items so the caller can persist exactly what happened.
#[derive(Debug)]
pub enum ShardResponse {
    /// A value (or None for a cache miss).
    Value(Option<Value>),
    /// Simple acknowledgement (e.g. SET).
    Ok,
    /// Integer result (e.g. INCR, DECR).
    Integer(i64),
    /// Boolean result (e.g. DEL, EXISTS, EXPIRE).
    Bool(bool),
    /// TTL query result.
    Ttl(TtlResult),
    /// Memory limit reached and eviction policy is NoEviction.
    OutOfMemory,
    /// Key count for a shard (DBSIZE).
    KeyCount(usize),
    /// Full stats for a shard (INFO).
    Stats(KeyspaceStats),
    /// Integer length result (e.g. LPUSH, RPUSH, LLEN).
    Len(usize),
    /// Array of bulk values (e.g. LRANGE).
    Array(Vec<Bytes>),
    /// The type name of a stored value.
    TypeName(&'static str),
    /// The encoding name of a stored value, or None if the key doesn't exist.
    EncodingName(Option<&'static str>),
    /// ZADD result: count for the client + actually applied members for AOF.
    ZAddLen {
        count: usize,
        applied: Vec<(f64, String)>,
    },
    /// ZREM result: count for the client + actually removed members for AOF.
    ZRemLen { count: usize, removed: Vec<String> },
    /// Float score result (e.g. ZSCORE).
    Score(Option<f64>),
    /// Rank result (e.g. ZRANK).
    Rank(Option<usize>),
    /// Scored array of (member, score) pairs (e.g. ZRANGE).
    ScoredArray(Vec<(String, f64)>),
    /// ZINCRBY result: new score + the member/score for AOF persistence.
    ZIncrByResult { new_score: f64, member: String },
    /// ZPOPMIN/ZPOPMAX result: popped members for both response and AOF.
    ZPopResult(Vec<(String, f64)>),
    /// A bulk string result (e.g. INCRBYFLOAT).
    BulkString(String),
    /// Command used against a key holding the wrong kind of value.
    WrongType,
    /// An error message.
    Err(String),
    /// Scan result: next cursor and list of keys.
    Scan { cursor: u64, keys: Vec<String> },
    /// SSCAN/HSCAN/ZSCAN result: next cursor and pre-flattened items.
    CollectionScan { cursor: u64, items: Vec<Bytes> },
    /// HGETALL result: all field-value pairs.
    HashFields(Vec<(String, Bytes)>),
    /// HRANDFIELD result: field names with optional values.
    HRandFieldResult(Vec<(String, Option<Bytes>)>),
    /// ZRANDMEMBER result: member names with optional scores.
    ZRandMemberResult(Vec<(String, Option<f64>)>),
    /// HDEL result: removed count + field names for AOF.
    HDelLen { count: usize, removed: Vec<String> },
    /// Array of strings (e.g. HKEYS).
    StringArray(Vec<String>),
    /// Array of integer positions (e.g. LPOS).
    IntegerArray(Vec<i64>),
    /// Array of booleans (e.g. SMISMEMBER).
    BoolArray(Vec<bool>),
    /// SUNIONSTORE/SINTERSTORE/SDIFFSTORE result: count + stored members for AOF.
    SetStoreResult { count: usize, members: Vec<String> },
    /// ZUNIONSTORE/ZINTERSTORE/ZDIFFSTORE result: count + scored members for AOF.
    ZStoreResult {
        count: usize,
        /// (score, member) pairs stored in dest.
        members: Vec<(f64, String)>,
    },
    /// Serialized key dump with remaining TTL (for MIGRATE/DUMP).
    KeyDump { data: Vec<u8>, ttl_ms: i64 },
    /// In-memory snapshot of the full shard state (for replication).
    SnapshotData { shard_id: u16, data: Vec<u8> },
    /// HMGET result: array of optional values.
    OptionalArray(Vec<Option<Bytes>>),
    /// VADD result: element, vector, and whether it was newly added.
    #[cfg(feature = "vector")]
    VAddResult {
        element: String,
        vector: Vec<f32>,
        added: bool,
    },
    /// VADD_BATCH result: count of newly added elements + applied entries for AOF.
    #[cfg(feature = "vector")]
    VAddBatchResult {
        added_count: usize,
        applied: Vec<(String, Vec<f32>)>,
    },
    /// VSIM result: nearest neighbors with distances.
    #[cfg(feature = "vector")]
    VSimResult(Vec<(String, f32)>),
    /// VGET result: stored vector or None.
    #[cfg(feature = "vector")]
    VectorData(Option<Vec<f32>>),
    /// VINFO result: vector set metadata.
    #[cfg(feature = "vector")]
    VectorInfo(Option<Vec<(String, String)>>),
    /// PROTO.GET result: (type_name, data, remaining_ttl) or None.
    #[cfg(feature = "protobuf")]
    ProtoValue(Option<(String, Bytes, Option<Duration>)>),
    /// PROTO.TYPE result: message type name or None.
    #[cfg(feature = "protobuf")]
    ProtoTypeName(Option<String>),
    /// Result of an atomic SETFIELD/DELFIELD: carries the updated value
    /// for AOF persistence.
    #[cfg(feature = "protobuf")]
    ProtoFieldUpdated {
        type_name: String,
        data: Bytes,
        expire: Option<Duration>,
    },
    /// Key version for WATCH optimistic locking. `None` means missing/expired.
    Version(Option<u64>),
}
984
/// A request (or batch of requests) bundled with reply channels.
///
/// The `Batch` variant reduces channel traffic during pipelining: instead
/// of N individual sends (one per pipelined command), the connection handler
/// groups commands by target shard and sends one `Batch` message per shard.
/// This cuts channel contention from O(pipeline_depth) to O(shard_count).
///
/// The `SingleReusable` variant avoids per-command `oneshot::channel()`
/// allocation on the P=1 (no pipeline) path. The connection handler keeps
/// a long-lived `mpsc::channel(1)` and reuses it across commands.
#[derive(Debug)]
pub enum ShardMessage {
    /// A single request with its reply channel.
    Single {
        request: ShardRequest,
        reply: oneshot::Sender<ShardResponse>,
    },
    /// A single request using a reusable mpsc reply channel.
    ///
    /// Avoids the heap allocation of `oneshot::channel()` on every command.
    /// Used for the P=1 fast path where the connection handler sends one
    /// command at a time and waits for the response before sending the next.
    SingleReusable {
        request: ShardRequest,
        reply: mpsc::Sender<ShardResponse>,
    },
    /// Multiple requests batched for a single channel send.
    /// Each entry pairs a request with its own reply channel, so responses
    /// still flow back per-command.
    Batch(Vec<(ShardRequest, oneshot::Sender<ShardResponse>)>),
}
1014
/// A cloneable handle for sending commands to a shard task.
///
/// Wraps the mpsc sender so callers don't need to manage oneshot
/// channels directly.
#[derive(Debug, Clone)]
pub struct ShardHandle {
    // bounded command channel created by `prepare_shard`; its capacity is
    // the shard's backpressure valve.
    tx: mpsc::Sender<ShardMessage>,
}
1023
1024impl ShardHandle {
1025    /// Sends a request and waits for the response.
1026    ///
1027    /// Returns `ShardError::Unavailable` if the shard task has stopped.
1028    pub async fn send(&self, request: ShardRequest) -> Result<ShardResponse, ShardError> {
1029        let rx = self.dispatch(request).await?;
1030        rx.await.map_err(|_| ShardError::Unavailable)
1031    }
1032
1033    /// Sends a request and returns the reply channel without waiting
1034    /// for the response. Used by `Engine::broadcast` to fan out to
1035    /// all shards before collecting results, and by
1036    /// `Engine::dispatch_to_shard` for the dispatch-collect pipeline.
1037    pub async fn dispatch(
1038        &self,
1039        request: ShardRequest,
1040    ) -> Result<oneshot::Receiver<ShardResponse>, ShardError> {
1041        let (reply_tx, reply_rx) = oneshot::channel();
1042        let msg = ShardMessage::Single {
1043            request,
1044            reply: reply_tx,
1045        };
1046        self.tx
1047            .send(msg)
1048            .await
1049            .map_err(|_| ShardError::Unavailable)?;
1050        Ok(reply_rx)
1051    }
1052
1053    /// Sends a request using a caller-owned reply channel.
1054    ///
1055    /// Unlike `dispatch()`, this doesn't allocate a oneshot per call.
1056    /// The caller reuses the same `mpsc::Sender` across commands, saving
1057    /// a heap allocation per P=1 round-trip.
1058    pub async fn dispatch_reusable(
1059        &self,
1060        request: ShardRequest,
1061        reply: mpsc::Sender<ShardResponse>,
1062    ) -> Result<(), ShardError> {
1063        self.tx
1064            .send(ShardMessage::SingleReusable { request, reply })
1065            .await
1066            .map_err(|_| ShardError::Unavailable)
1067    }
1068
1069    /// Sends a batch of requests as a single channel message.
1070    ///
1071    /// Returns one receiver per request, in the same order. For a single
1072    /// request, falls through to `dispatch()` to avoid the batch overhead.
1073    ///
1074    /// This is the key optimization for pipelining: N commands targeting
1075    /// the same shard consume 1 channel slot instead of N.
1076    pub async fn dispatch_batch(
1077        &self,
1078        requests: Vec<ShardRequest>,
1079    ) -> Result<Vec<oneshot::Receiver<ShardResponse>>, ShardError> {
1080        if requests.len() == 1 {
1081            let rx = self
1082                .dispatch(requests.into_iter().next().expect("len == 1"))
1083                .await?;
1084            return Ok(vec![rx]);
1085        }
1086        let mut receivers = Vec::with_capacity(requests.len());
1087        let mut entries = Vec::with_capacity(requests.len());
1088        for request in requests {
1089            let (tx, rx) = oneshot::channel();
1090            entries.push((request, tx));
1091            receivers.push(rx);
1092        }
1093        self.tx
1094            .send(ShardMessage::Batch(entries))
1095            .await
1096            .map_err(|_| ShardError::Unavailable)?;
1097        Ok(receivers)
1098    }
1099}
1100
/// Everything needed to run a shard on a specific runtime.
///
/// Created by [`prepare_shard`] without spawning any tasks. The caller
/// is responsible for passing this to [`run_prepared`] on the desired
/// tokio runtime — this is the hook that enables thread-per-core
/// deployment where each worker thread runs its own shard.
pub struct PreparedShard {
    /// Receiving end of the bounded command channel; the matching
    /// `ShardHandle` holds the sender.
    rx: mpsc::Receiver<ShardMessage>,
    /// Keyspace configuration (includes the shard id).
    config: ShardConfig,
    /// Persistence settings; `None` disables recovery and the AOF writer.
    persistence: Option<ShardPersistenceConfig>,
    /// When set, large value deallocations are deferred to the background
    /// drop thread instead of running on the shard task.
    drop_handle: Option<DropHandle>,
    /// Broadcast channel for replication events, if replication is enabled.
    replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
    /// Optional channel to broadcast expired key names for keyspace notifications.
    expired_tx: Option<broadcast::Sender<String>>,
    #[cfg(feature = "protobuf")]
    schema_registry: Option<crate::schema::SharedSchemaRegistry>,
}
1118
1119/// Creates the channel and prepared shard without spawning any tasks.
1120///
1121/// Returns the `ShardHandle` for sending commands and the `PreparedShard`
1122/// that must be driven on the target runtime via [`run_prepared`].
1123pub fn prepare_shard(
1124    buffer: usize,
1125    config: ShardConfig,
1126    persistence: Option<ShardPersistenceConfig>,
1127    drop_handle: Option<DropHandle>,
1128    replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
1129    expired_tx: Option<broadcast::Sender<String>>,
1130    #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
1131) -> (ShardHandle, PreparedShard) {
1132    let (tx, rx) = mpsc::channel(buffer);
1133    let prepared = PreparedShard {
1134        rx,
1135        config,
1136        persistence,
1137        drop_handle,
1138        replication_tx,
1139        expired_tx,
1140        #[cfg(feature = "protobuf")]
1141        schema_registry,
1142    };
1143    (ShardHandle { tx }, prepared)
1144}
1145
1146/// Runs the shard's main loop. Call this on the target runtime.
1147///
1148/// Consumes the `PreparedShard` and enters the infinite recv/expiry/fsync
1149/// select loop. Returns when the channel is closed (all senders dropped).
1150pub async fn run_prepared(prepared: PreparedShard) {
1151    run_shard(
1152        prepared.rx,
1153        prepared.config,
1154        prepared.persistence,
1155        prepared.drop_handle,
1156        prepared.replication_tx,
1157        prepared.expired_tx,
1158        #[cfg(feature = "protobuf")]
1159        prepared.schema_registry,
1160    )
1161    .await
1162}
1163
/// Spawns a shard task and returns the handle for communicating with it.
///
/// `buffer` controls the mpsc channel capacity — higher values absorb
/// burst traffic at the cost of memory. When `drop_handle` is provided,
/// large value deallocations are deferred to the background drop thread.
///
/// This is a convenience wrapper around [`prepare_shard`] + [`run_prepared`]
/// for the common case where you want to spawn on the current runtime.
pub fn spawn_shard(
    buffer: usize,
    config: ShardConfig,
    persistence: Option<ShardPersistenceConfig>,
    drop_handle: Option<DropHandle>,
    replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
    expired_tx: Option<broadcast::Sender<String>>,
    #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
) -> ShardHandle {
    let (handle, prepared) = prepare_shard(
        buffer,
        config,
        persistence,
        drop_handle,
        replication_tx,
        expired_tx,
        #[cfg(feature = "protobuf")]
        schema_registry,
    );
    // detached task: run_shard exits on its own once all handles are
    // dropped and the channel closes, so the JoinHandle is not needed.
    tokio::spawn(run_prepared(prepared));
    handle
}
1194
/// The shard's main loop. Processes messages and runs periodic
/// active expiration until the channel closes.
///
/// Phases:
/// 1. recovery — replay persisted entries (and, with `protobuf`, schemas)
///    into the fresh keyspace;
/// 2. open the AOF writer when append-only persistence is enabled;
/// 3. select loop — serve requests (draining bursts via `try_recv`), run
///    the expiry tick, and run the fsync tick for `FsyncPolicy::EverySec`;
/// 4. final AOF sync on clean shutdown.
async fn run_shard(
    mut rx: mpsc::Receiver<ShardMessage>,
    config: ShardConfig,
    persistence: Option<ShardPersistenceConfig>,
    drop_handle: Option<DropHandle>,
    replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
    expired_tx: Option<broadcast::Sender<String>>,
    #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
) {
    let shard_id = config.shard_id;
    let mut keyspace = Keyspace::with_config(config);

    if let Some(handle) = drop_handle.clone() {
        keyspace.set_drop_handle(handle);
    }

    // -- recovery --
    if let Some(ref pcfg) = persistence {
        // pick encrypted vs plain recovery depending on build features/config
        #[cfg(feature = "encryption")]
        let result = if let Some(ref key) = pcfg.encryption_key {
            recovery::recover_shard_encrypted(&pcfg.data_dir, shard_id, key.clone())
        } else {
            recovery::recover_shard(&pcfg.data_dir, shard_id)
        };
        #[cfg(not(feature = "encryption"))]
        let result = recovery::recover_shard(&pcfg.data_dir, shard_id);
        let count = result.entries.len();
        for entry in result.entries {
            // rebuild the in-memory Value from its persisted representation
            let value = match entry.value {
                RecoveredValue::String(data) => Value::String(data),
                RecoveredValue::List(deque) => Value::List(deque),
                RecoveredValue::SortedSet(members) => {
                    let mut ss = crate::types::sorted_set::SortedSet::new();
                    for (score, member) in members {
                        ss.add(&member, score);
                    }
                    Value::SortedSet(Box::new(ss))
                }
                RecoveredValue::Hash(map) => {
                    Value::Hash(Box::new(crate::types::hash::HashValue::from(map)))
                }
                RecoveredValue::Set(set) => Value::Set(Box::new(set)),
                #[cfg(feature = "vector")]
                RecoveredValue::Vector {
                    metric,
                    quantization,
                    connectivity,
                    expansion_add,
                    elements,
                } => {
                    use crate::types::vector::{DistanceMetric, QuantizationType, VectorSet};
                    // dimensionality is inferred from the first element;
                    // 0 when the persisted set was empty
                    let dim = elements.first().map(|(_, v)| v.len()).unwrap_or(0);
                    match VectorSet::new(
                        dim,
                        DistanceMetric::from_u8(metric),
                        QuantizationType::from_u8(quantization),
                        connectivity as usize,
                        expansion_add as usize,
                    ) {
                        Ok(mut vs) => {
                            for (element, vector) in elements {
                                if let Err(e) = vs.add(element, &vector) {
                                    warn!("vector recovery: failed to add element: {e}");
                                }
                            }
                            Value::Vector(vs)
                        }
                        Err(e) => {
                            // index creation failed — skip this key entirely
                            warn!("vector recovery: failed to create index: {e}");
                            continue;
                        }
                    }
                }
                #[cfg(feature = "protobuf")]
                RecoveredValue::Proto { type_name, data } => Value::Proto { type_name, data },
            };
            keyspace.restore(entry.key, value, entry.ttl);
        }
        if count > 0 {
            info!(
                shard_id,
                recovered_keys = count,
                snapshot = result.loaded_snapshot,
                aof = result.replayed_aof,
                "recovered shard state"
            );
        }

        // restore schemas found in the AOF into the shared registry
        #[cfg(feature = "protobuf")]
        if let Some(ref registry) = schema_registry {
            if !result.schemas.is_empty() {
                if let Ok(mut reg) = registry.write() {
                    let schema_count = result.schemas.len();
                    for (name, descriptor) in result.schemas {
                        reg.restore(name, descriptor);
                    }
                    info!(
                        shard_id,
                        schemas = schema_count,
                        "restored schemas from AOF"
                    );
                }
            }
        }
    }

    // -- AOF writer --
    // a failed open degrades to running without persistence (logged) rather
    // than aborting the shard.
    let mut aof_writer: Option<AofWriter> = match &persistence {
        Some(pcfg) if pcfg.append_only => {
            let path = ember_persistence::aof::aof_path(&pcfg.data_dir, shard_id);
            #[cfg(feature = "encryption")]
            let result = if let Some(ref key) = pcfg.encryption_key {
                AofWriter::open_encrypted(path, key.clone())
            } else {
                AofWriter::open(path)
            };
            #[cfg(not(feature = "encryption"))]
            let result = AofWriter::open(path);
            match result {
                Ok(w) => Some(w),
                Err(e) => {
                    warn!(shard_id, "failed to open AOF writer: {e}");
                    None
                }
            }
        }
        _ => None,
    };

    let fsync_policy = persistence
        .as_ref()
        .map(|p| p.fsync_policy)
        .unwrap_or(FsyncPolicy::No);

    // monotonically increasing per-shard replication offset
    let mut replication_offset: u64 = 0;

    // waiter registries for blocking list operations (BLPOP/BRPOP)
    let mut lpop_waiters: HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>> = HashMap::new();
    let mut rpop_waiters: HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>> = HashMap::new();

    // consecutive AOF write/sync failure counter for rate-limited logging
    let mut aof_errors: u32 = 0;

    // when true, write commands are rejected with an error until disk
    // space is available again (detected on the periodic fsync tick).
    let mut disk_full: bool = false;

    // -- tickers --
    let mut expiry_tick = tokio::time::interval(EXPIRY_TICK);
    expiry_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    let mut fsync_tick = tokio::time::interval(FSYNC_INTERVAL);
    fsync_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    // -- event loop --
    loop {
        tokio::select! {
            msg = rx.recv() => {
                match msg {
                    Some(msg) => {
                        // rebuilt per wake-up: cheap, all fields are borrows
                        let mut ctx = ProcessCtx {
                            keyspace: &mut keyspace,
                            aof_writer: &mut aof_writer,
                            fsync_policy,
                            persistence: &persistence,
                            drop_handle: &drop_handle,
                            shard_id,
                            replication_tx: &replication_tx,
                            replication_offset: &mut replication_offset,
                            lpop_waiters: &mut lpop_waiters,
                            rpop_waiters: &mut rpop_waiters,
                            aof_errors: &mut aof_errors,
                            disk_full: &mut disk_full,
                            #[cfg(feature = "protobuf")]
                            schema_registry: &schema_registry,
                        };
                        process_message(msg, &mut ctx);

                        // drain any pending messages without re-entering select!.
                        // this amortizes the select! overhead across bursts of
                        // pipelined commands that arrived while we processed the
                        // first message.
                        while let Ok(msg) = rx.try_recv() {
                            process_message(msg, &mut ctx);
                        }
                    }
                    None => break, // channel closed, shard shutting down
                }
            }
            _ = expiry_tick.tick() => {
                let expired_keys = expiry::run_expiration_cycle(&mut keyspace);
                if let Some(ref tx) = expired_tx {
                    // only pay the per-key send cost when someone is listening
                    if !expired_keys.is_empty() && tx.receiver_count() > 0 {
                        for key in &expired_keys {
                            let _ = tx.send(key.clone());
                        }
                    }
                }
            }
            _ = fsync_tick.tick(), if fsync_policy == FsyncPolicy::EverySec => {
                if let Some(ref mut writer) = aof_writer {
                    if let Err(e) = writer.sync() {
                        if aof::log_aof_error(shard_id, &mut aof_errors, "sync", &e) {
                            disk_full = true;
                        }
                    } else if aof_errors > 0 {
                        // first successful sync after failures: reset the
                        // counter and, if writes were being rejected, re-open
                        // the write path.
                        let missed = aof_errors;
                        aof_errors = 0;
                        if disk_full {
                            disk_full = false;
                            info!(shard_id, missed_errors = missed, "aof sync recovered, accepting writes again");
                        } else {
                            info!(shard_id, missed_errors = missed, "aof sync recovered");
                        }
                    }
                }
            }
        }
    }

    // flush AOF on clean shutdown
    if let Some(ref mut writer) = aof_writer {
        let _ = writer.sync();
    }
}
1423
/// Per-shard processing context passed into `process_message`.
///
/// Groups the mutable and configuration fields so the call site stays
/// readable and the parameter count stays reasonable.
struct ProcessCtx<'a> {
    /// Exclusively owned keyspace partition; all mutations are serial.
    keyspace: &'a mut Keyspace,
    /// AOF writer when append-only persistence is enabled.
    aof_writer: &'a mut Option<AofWriter>,
    /// Controls whether `sync()` runs after every mutation (`Always`),
    /// on the periodic tick (`EverySec`), or never (`No`).
    fsync_policy: FsyncPolicy,
    /// Persistence configuration, needed by snapshot/rewrite handlers.
    persistence: &'a Option<ShardPersistenceConfig>,
    /// Background drop thread handle for deferring large deallocations.
    drop_handle: &'a Option<DropHandle>,
    /// This shard's id, used for logging and replication events.
    shard_id: u16,
    /// Broadcast channel for replication subscribers, if enabled.
    replication_tx: &'a Option<broadcast::Sender<ReplicationEvent>>,
    /// Monotonically increasing offset, bumped once per replicated record.
    replication_offset: &'a mut u64,
    /// Waiters for BLPOP — keyed by list name, FIFO order.
    lpop_waiters: &'a mut HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>>,
    /// Waiters for BRPOP — keyed by list name, FIFO order.
    rpop_waiters: &'a mut HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>>,
    /// Consecutive AOF write/sync failures. Used to rate-limit error logging
    /// so a sustained disk-full condition doesn't flood logs.
    aof_errors: &'a mut u32,
    /// When true, write commands are rejected until disk space recovers.
    disk_full: &'a mut bool,
    #[cfg(feature = "protobuf")]
    schema_registry: &'a Option<crate::schema::SharedSchemaRegistry>,
}
1449
1450/// Dispatches a single or batched message to the shard's keyspace.
1451///
1452/// Called both in the main `recv()` path and in the `try_recv()` drain loop
1453/// to amortize tokio select overhead across pipelined commands.
1454fn process_message(msg: ShardMessage, ctx: &mut ProcessCtx<'_>) {
1455    match msg {
1456        ShardMessage::Single { request, reply } => {
1457            process_single(request, ReplySender::Oneshot(reply), ctx);
1458        }
1459        ShardMessage::SingleReusable { request, reply } => {
1460            process_single(request, ReplySender::Reusable(reply), ctx);
1461        }
1462        ShardMessage::Batch(entries) => {
1463            for (request, reply) in entries {
1464                process_single(request, ReplySender::Oneshot(reply), ctx);
1465            }
1466        }
1467    }
1468}
1469
/// Wraps either a oneshot or reusable mpsc sender for reply delivery.
///
/// The oneshot path is used for pipelined and batch commands. The reusable
/// path avoids per-command allocation for the P=1 fast path.
enum ReplySender {
    /// One-off channel: consumed by `send`, one response per request.
    Oneshot(oneshot::Sender<ShardResponse>),
    /// Long-lived capacity-1 channel owned by the connection handler.
    Reusable(mpsc::Sender<ShardResponse>),
}
1478
1479impl ReplySender {
1480    fn send(self, response: ShardResponse) {
1481        match self {
1482            ReplySender::Oneshot(tx) => {
1483                let _ = tx.send(response);
1484            }
1485            ReplySender::Reusable(tx) => {
1486                // capacity is 1 and the receiver always drains before
1487                // sending the next command, so try_send won't fail
1488                // under normal operation.
1489                if let Err(e) = tx.try_send(response) {
1490                    debug!("reusable reply channel full or closed: {e}");
1491                }
1492            }
1493        }
1494    }
1495}
1496
/// Processes a single request: dispatch, write AOF, broadcast replication,
/// and send the response on the reply channel.
///
/// Order matters here: the keyspace is mutated first, then AOF records
/// derived from the (request, response) pair are persisted, then
/// replication events are broadcast with per-record offsets. Requests that
/// need persistence state (snapshot/rewrite/flush/config) are answered at
/// the end via the `RequestKind` tag captured before the request is consumed.
fn process_single(mut request: ShardRequest, reply: ReplySender, ctx: &mut ProcessCtx<'_>) {
    // copy cheap fields upfront to avoid field-borrow conflicts below
    let fsync_policy = ctx.fsync_policy;
    let shard_id = ctx.shard_id;

    // reject writes when AOF is enabled and disk is full. reads and admin
    // commands still go through so operators can inspect and recover.
    if *ctx.disk_full && ctx.aof_writer.is_some() && request.is_write() {
        reply.send(ShardResponse::Err(
            "ERR disk full, write rejected — free disk space to resume writes".into(),
        ));
        return;
    }

    // handle blocking pop requests before dispatch — they carry a waiter
    // oneshot that must be consumed here rather than going through the
    // normal dispatch → response path.
    match request {
        ShardRequest::BLPop { key, waiter } => {
            blocking::handle_blocking_pop(&key, waiter, true, reply, ctx);
            return;
        }
        ShardRequest::BRPop { key, waiter } => {
            blocking::handle_blocking_pop(&key, waiter, false, reply, ctx);
            return;
        }
        _ => {}
    }

    // tag the request kind now — `request` is moved into to_aof_records below
    let request_kind = describe_request(&request);
    let mut response = dispatch(
        ctx.keyspace,
        &mut request,
        #[cfg(feature = "protobuf")]
        ctx.schema_registry,
    );

    // after LPush/RPush, check if any blocked clients are waiting.
    // done before consuming the request so we can borrow the key.
    if let ShardRequest::LPush { ref key, .. } | ShardRequest::RPush { ref key, .. } = request {
        if matches!(response, ShardResponse::Len(_)) {
            blocking::wake_blocked_waiters(key, ctx);
        }
    }

    // consume the request to move owned data into AOF records (avoids cloning).
    // response is &mut so VAddBatch can steal applied entries instead of cloning
    // vectors — the connection handler only uses added_count.
    let records = aof::to_aof_records(request, &mut response);

    // write AOF records for successful mutations
    if let Some(ref mut writer) = *ctx.aof_writer {
        let mut batch_ok = true;
        for record in &records {
            if let Err(e) = writer.write_record(record) {
                // log_aof_error returns true when the failure indicates a
                // full disk, which flips the shard into write-reject mode
                if aof::log_aof_error(shard_id, ctx.aof_errors, "write", &e) {
                    *ctx.disk_full = true;
                }
                batch_ok = false;
            }
        }
        if !records.is_empty() && fsync_policy == FsyncPolicy::Always {
            if let Err(e) = writer.sync() {
                if aof::log_aof_error(shard_id, ctx.aof_errors, "sync", &e) {
                    *ctx.disk_full = true;
                }
                batch_ok = false;
            }
        }
        // a fully successful batch after previous failures clears the
        // error counter and re-opens the write path
        if batch_ok && *ctx.aof_errors > 0 {
            let missed = *ctx.aof_errors;
            *ctx.aof_errors = 0;
            *ctx.disk_full = false;
            info!(shard_id, missed_errors = missed, "aof writes recovered");
        }
    }

    // broadcast mutation events to replication subscribers
    if let Some(ref tx) = *ctx.replication_tx {
        for record in records {
            *ctx.replication_offset += 1;
            // ignore send errors — no subscribers or lagged consumers
            let _ = tx.send(ReplicationEvent {
                shard_id,
                offset: *ctx.replication_offset,
                record,
            });
        }
    }

    // handle special requests that need access to persistence state
    match request_kind {
        RequestKind::Snapshot => {
            let resp = persistence::handle_snapshot(ctx.keyspace, ctx.persistence, shard_id);
            reply.send(resp);
            return;
        }
        RequestKind::SerializeSnapshot => {
            let resp = persistence::handle_serialize_snapshot(ctx.keyspace, shard_id);
            reply.send(resp);
            return;
        }
        RequestKind::RewriteAof => {
            let resp = persistence::handle_rewrite(
                ctx.keyspace,
                ctx.persistence,
                ctx.aof_writer,
                shard_id,
                #[cfg(feature = "protobuf")]
                ctx.schema_registry,
            );
            reply.send(resp);
            return;
        }
        RequestKind::FlushDbAsync => {
            // swap out the entries and hand them to the background drop
            // thread so the flush returns without paying deallocation cost
            let old_entries = ctx.keyspace.flush_async();
            if let Some(ref handle) = *ctx.drop_handle {
                handle.defer_entries(old_entries);
            }
            reply.send(ShardResponse::Ok);
            return;
        }
        RequestKind::UpdateMemoryConfig {
            max_memory,
            eviction_policy,
        } => {
            ctx.keyspace
                .update_memory_config(max_memory, eviction_policy);
            reply.send(ShardResponse::Ok);
            return;
        }
        RequestKind::Other => {}
    }

    reply.send(response);
}
1635
/// Lightweight tag so we can identify requests that need special
/// handling after dispatch without borrowing the request again.
enum RequestKind {
    /// Answered by `persistence::handle_snapshot`.
    Snapshot,
    /// Answered by `persistence::handle_serialize_snapshot`.
    SerializeSnapshot,
    /// Answered by `persistence::handle_rewrite`.
    RewriteAof,
    /// Flush handled inline with deferred deallocation.
    FlushDbAsync,
    /// Carries the copied config values so the keyspace can be
    /// reconfigured after the request itself has been consumed.
    UpdateMemoryConfig {
        max_memory: Option<usize>,
        eviction_policy: EvictionPolicy,
    },
    /// Everything else: the normal dispatch response is returned as-is.
    Other,
}
1649
1650fn describe_request(req: &ShardRequest) -> RequestKind {
1651    match req {
1652        ShardRequest::Snapshot => RequestKind::Snapshot,
1653        ShardRequest::SerializeSnapshot => RequestKind::SerializeSnapshot,
1654        ShardRequest::RewriteAof => RequestKind::RewriteAof,
1655        ShardRequest::FlushDbAsync => RequestKind::FlushDbAsync,
1656        ShardRequest::UpdateMemoryConfig {
1657            max_memory,
1658            eviction_policy,
1659        } => RequestKind::UpdateMemoryConfig {
1660            max_memory: *max_memory,
1661            eviction_policy: *eviction_policy,
1662        },
1663        _ => RequestKind::Other,
1664    }
1665}
1666
1667/// Converts an `IncrError` result into a `ShardResponse::Integer`.
1668fn incr_result(result: Result<i64, IncrError>) -> ShardResponse {
1669    match result {
1670        Ok(val) => ShardResponse::Integer(val),
1671        Err(IncrError::WrongType) => ShardResponse::WrongType,
1672        Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
1673        Err(e) => ShardResponse::Err(e.to_string()),
1674    }
1675}
1676
1677/// Converts a `WriteError` result into a `ShardResponse::Len`.
1678fn write_result_len(result: Result<usize, WriteError>) -> ShardResponse {
1679    match result {
1680        Ok(len) => ShardResponse::Len(len),
1681        Err(WriteError::WrongType) => ShardResponse::WrongType,
1682        Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1683    }
1684}
1685
1686fn store_set_response(result: Result<(usize, Vec<String>), WriteError>) -> ShardResponse {
1687    match result {
1688        Ok((count, members)) => ShardResponse::SetStoreResult { count, members },
1689        Err(WriteError::WrongType) => ShardResponse::WrongType,
1690        Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1691    }
1692}
1693
1694/// Routes a request to the appropriate keyspace operation and returns a response.
1695///
1696/// This is the hot path — every read and write goes through here.
1697fn dispatch(
1698    ks: &mut Keyspace,
1699    req: &mut ShardRequest,
1700    #[cfg(feature = "protobuf")] schema_registry: &Option<crate::schema::SharedSchemaRegistry>,
1701) -> ShardResponse {
1702    match req {
1703        ShardRequest::Get { key } => match ks.get_string(key) {
1704            Ok(val) => ShardResponse::Value(val.map(Value::String)),
1705            Err(_) => ShardResponse::WrongType,
1706        },
1707        ShardRequest::Set {
1708            key,
1709            value,
1710            expire,
1711            nx,
1712            xx,
1713        } => match ks.set(key.clone(), value.clone(), *expire, *nx, *xx) {
1714            SetResult::Ok => ShardResponse::Ok,
1715            SetResult::Blocked => ShardResponse::Value(None),
1716            SetResult::OutOfMemory => ShardResponse::OutOfMemory,
1717        },
1718        ShardRequest::Incr { key } => incr_result(ks.incr(key)),
1719        ShardRequest::Decr { key } => incr_result(ks.decr(key)),
1720        ShardRequest::IncrBy { key, delta } => incr_result(ks.incr_by(key, *delta)),
1721        ShardRequest::DecrBy { key, delta } => match delta.checked_neg() {
1722            Some(neg) => incr_result(ks.incr_by(key, neg)),
1723            None => ShardResponse::Err("ERR increment or decrement would overflow".into()),
1724        },
1725        ShardRequest::IncrByFloat { key, delta } => match ks.incr_by_float(key, *delta) {
1726            Ok(val) => ShardResponse::BulkString(val),
1727            Err(IncrFloatError::WrongType) => ShardResponse::WrongType,
1728            Err(IncrFloatError::OutOfMemory) => ShardResponse::OutOfMemory,
1729            Err(e) => ShardResponse::Err(e.to_string()),
1730        },
        // ── string length / range / bit operations ───────────────────────
        // APPEND and SETRANGE are writes that reply with the resulting
        // string length; `write_result_len` folds WriteError into the
        // WrongType / OutOfMemory responses.
        ShardRequest::Append { key, value } => write_result_len(ks.append(key, value)),
        ShardRequest::Strlen { key } => match ks.strlen(key) {
            Ok(len) => ShardResponse::Len(len),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::GetRange { key, start, end } => match ks.getrange(key, *start, *end) {
            Ok(data) => ShardResponse::Value(Some(Value::String(data))),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::SetRange { key, offset, value } => {
            write_result_len(ks.setrange(key, *offset, value))
        }
        // GETBIT/SETBIT reply with the (previous) bit as a 0/1 integer.
        ShardRequest::GetBit { key, offset } => match ks.getbit(key, *offset) {
            Ok(bit) => ShardResponse::Integer(bit as i64),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::SetBit { key, offset, value } => match ks.setbit(key, *offset, *value) {
            Ok(old_bit) => ShardResponse::Integer(old_bit as i64),
            Err(WriteError::WrongType) => ShardResponse::WrongType,
            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
        },
        // BITCOUNT/BITPOS take an optional byte/bit range copied from the request.
        ShardRequest::BitCount { key, range } => match ks.bitcount(key, *range) {
            Ok(count) => ShardResponse::Integer(count as i64),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::BitPos { key, bit, range } => match ks.bitpos(key, *bit, *range) {
            Ok(pos) => ShardResponse::Integer(pos),
            Err(_) => ShardResponse::WrongType,
        },
        // BITOP writes the combined result into `dest` and replies with the
        // destination string's length.
        ShardRequest::BitOp { op, dest, keys } => match ks.bitop(*op, dest.clone(), keys) {
            Ok(len) => ShardResponse::Integer(len as i64),
            Err(WriteError::WrongType) => ShardResponse::WrongType,
            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
        },
        // ── generic keyspace commands ────────────────────────────────────
        // KEYS performs a full glob-pattern scan of this shard's partition.
        ShardRequest::Keys { pattern } => {
            let keys = ks.keys(pattern);
            ShardResponse::StringArray(keys)
        }
        ShardRequest::Rename { key, newkey } => {
            use crate::keyspace::RenameError;
            match ks.rename(key, newkey) {
                Ok(()) => ShardResponse::Ok,
                Err(RenameError::NoSuchKey) => ShardResponse::Err("ERR no such key".into()),
            }
        }
        // COPY replies true/false for copied / not copied (destination
        // existed without `replace`).
        ShardRequest::Copy {
            source,
            destination,
            replace,
        } => {
            use crate::keyspace::CopyError;
            match ks.copy(source, destination, *replace) {
                Ok(copied) => ShardResponse::Bool(copied),
                Err(CopyError::NoSuchKey) => ShardResponse::Err("ERR no such key".into()),
                Err(CopyError::OutOfMemory) => ShardResponse::OutOfMemory,
            }
        }
        ShardRequest::ObjectEncoding { key } => ShardResponse::EncodingName(ks.encoding(key)),
        // DEL/UNLINK/EXISTS/TOUCH all reply Bool (key was present or not).
        ShardRequest::Del { key } => ShardResponse::Bool(ks.del(key)),
        ShardRequest::Unlink { key } => ShardResponse::Bool(ks.unlink(key)),
        ShardRequest::Exists { key } => ShardResponse::Bool(ks.exists(key)),
        // RANDOMKEY reuses StringArray: one element = a key, empty = shard empty.
        ShardRequest::RandomKey => match ks.random_key() {
            Some(k) => ShardResponse::StringArray(vec![k]),
            None => ShardResponse::StringArray(vec![]),
        },
        ShardRequest::Touch { key } => ShardResponse::Bool(ks.touch(key)),
        ShardRequest::Sort {
            key,
            desc,
            alpha,
            limit,
        } => match ks.sort(key, *desc, *alpha, *limit) {
            Ok(items) => ShardResponse::Array(items),
            Err(_) => ShardResponse::WrongType,
        },
        // ── expiry commands ──────────────────────────────────────────────
        // Bool = whether a TTL was actually set/changed on an existing key.
        ShardRequest::Expire { key, seconds } => ShardResponse::Bool(ks.expire(key, *seconds)),
        ShardRequest::Expireat { key, timestamp } => {
            ShardResponse::Bool(ks.expireat(key, *timestamp))
        }
        ShardRequest::Ttl { key } => ShardResponse::Ttl(ks.ttl(key)),
        // MEMORY USAGE: -1 is the sentinel for a missing key.
        ShardRequest::MemoryUsage { key } => {
            ShardResponse::Integer(ks.memory_usage(key).map(|n| n as i64).unwrap_or(-1))
        }
        ShardRequest::Persist { key } => ShardResponse::Bool(ks.persist(key)),
        ShardRequest::Pttl { key } => ShardResponse::Ttl(ks.pttl(key)),
        ShardRequest::Pexpire { key, milliseconds } => {
            ShardResponse::Bool(ks.pexpire(key, *milliseconds))
        }
        ShardRequest::Pexpireat { key, timestamp_ms } => {
            ShardResponse::Bool(ks.pexpireat(key, *timestamp_ms))
        }
        // ── list commands ────────────────────────────────────────────────
        // Pushes reply with the list's new length via write_result_len.
        ShardRequest::LPush { key, values } => write_result_len(ks.lpush(key, values)),
        ShardRequest::RPush { key, values } => write_result_len(ks.rpush(key, values)),
        // Single-element pops: Ok(None) = empty/missing list → nil reply.
        ShardRequest::LPop { key } => match ks.lpop(key) {
            Ok(val) => ShardResponse::Value(val.map(Value::String)),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::RPop { key } => match ks.rpop(key) {
            Ok(val) => ShardResponse::Value(val.map(Value::String)),
            Err(_) => ShardResponse::WrongType,
        },
        // Counted pops distinguish "missing key" (Ok(None) → nil) from a
        // possibly-empty batch of popped items (Ok(Some) → array).
        ShardRequest::LPopCount { key, count } => match ks.lpop_count(key, *count) {
            Ok(Some(items)) => ShardResponse::Array(items),
            Ok(None) => ShardResponse::Value(None),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::RPopCount { key, count } => match ks.rpop_count(key, *count) {
            Ok(Some(items)) => ShardResponse::Array(items),
            Ok(None) => ShardResponse::Value(None),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::LRange { key, start, stop } => match ks.lrange(key, *start, *stop) {
            Ok(items) => ShardResponse::Array(items),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::LLen { key } => match ks.llen(key) {
            Ok(len) => ShardResponse::Len(len),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::LIndex { key, index } => match ks.lindex(key, *index) {
            Ok(val) => ShardResponse::Value(val.map(Value::String)),
            Err(_) => ShardResponse::WrongType,
        },
        // LSET has a richer error surface: missing key and out-of-range index
        // get dedicated client-facing error strings.
        ShardRequest::LSet { key, index, value } => match ks.lset(key, *index, value.clone()) {
            Ok(()) => ShardResponse::Ok,
            Err(e) => match e {
                LsetError::WrongType => ShardResponse::WrongType,
                LsetError::NoSuchKey => ShardResponse::Err("ERR no such key".into()),
                LsetError::IndexOutOfRange => ShardResponse::Err("ERR index out of range".into()),
            },
        },
        ShardRequest::LTrim { key, start, stop } => match ks.ltrim(key, *start, *stop) {
            Ok(()) => ShardResponse::Ok,
            Err(_) => ShardResponse::WrongType,
        },
        // LINSERT replies the new length, or -1 / 0 sentinels per Redis
        // semantics (encoded in the Integer returned by ks.linsert).
        ShardRequest::LInsert {
            key,
            before,
            pivot,
            value,
        } => match ks.linsert(key, *before, pivot, value.clone()) {
            Ok(n) => ShardResponse::Integer(n),
            Err(WriteError::WrongType) => ShardResponse::WrongType,
            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
        },
        ShardRequest::LRem { key, count, value } => match ks.lrem(key, *count, value) {
            Ok(n) => ShardResponse::Len(n),
            Err(_) => ShardResponse::WrongType,
        },
        // LPOS returns zero or more matching positions; rank/count/maxlen are
        // forwarded as-is.
        ShardRequest::LPos {
            key,
            element,
            rank,
            count,
            maxlen,
        } => match ks.lpos(key, element, *rank, *count, *maxlen) {
            Ok(positions) => ShardResponse::IntegerArray(positions),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::Type { key } => ShardResponse::TypeName(ks.value_type(key)),
        // ── sorted-set commands ──────────────────────────────────────────
        // ZADD: the five condition flags are packed into ZAddFlags; the
        // response carries both the reply count and the members actually
        // applied (presumably so the AOF path can persist exactly what
        // changed — same pattern as VAddBatchResult below; confirm).
        ShardRequest::ZAdd {
            key,
            members,
            nx,
            xx,
            gt,
            lt,
            ch,
        } => {
            let flags = ZAddFlags {
                nx: *nx,
                xx: *xx,
                gt: *gt,
                lt: *lt,
                ch: *ch,
            };
            match ks.zadd(key, members, &flags) {
                Ok(result) => ShardResponse::ZAddLen {
                    count: result.count,
                    applied: result.applied,
                },
                Err(WriteError::WrongType) => ShardResponse::WrongType,
                Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
            }
        }
        // ZREM likewise carries the concrete removed members alongside the
        // count.
        ShardRequest::ZRem { key, members } => match ks.zrem(key, members) {
            Ok(removed) => ShardResponse::ZRemLen {
                count: removed.len(),
                removed,
            },
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::ZScore { key, member } => match ks.zscore(key, member) {
            Ok(score) => ShardResponse::Score(score),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::ZRank { key, member } => match ks.zrank(key, member) {
            Ok(rank) => ShardResponse::Rank(rank),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::ZCard { key } => match ks.zcard(key) {
            Ok(len) => ShardResponse::Len(len),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::ZRevRank { key, member } => match ks.zrevrank(key, member) {
            Ok(rank) => ShardResponse::Rank(rank),
            Err(_) => ShardResponse::WrongType,
        },
        // ZRANGE/ZREVRANGE ignore any extra request fields (`..`) here; only
        // the index range is used by this dispatch path.
        ShardRequest::ZRange {
            key, start, stop, ..
        } => match ks.zrange(key, *start, *stop) {
            Ok(items) => ShardResponse::ScoredArray(items),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::ZRevRange {
            key, start, stop, ..
        } => match ks.zrevrange(key, *start, *stop) {
            Ok(items) => ShardResponse::ScoredArray(items),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::ZCount { key, min, max } => match ks.zcount(key, *min, *max) {
            Ok(count) => ShardResponse::Len(count),
            Err(_) => ShardResponse::WrongType,
        },
        // ZINCRBY echoes the member back with its new score so callers (and
        // the AOF path) don't need the original request.
        ShardRequest::ZIncrBy {
            key,
            increment,
            member,
        } => match ks.zincrby(key, *increment, member) {
            Ok(new_score) => ShardResponse::ZIncrByResult {
                new_score,
                member: member.clone(),
            },
            Err(WriteError::WrongType) => ShardResponse::WrongType,
            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
        },
        // Score-range queries with LIMIT offset/count support.
        ShardRequest::ZRangeByScore {
            key,
            min,
            max,
            offset,
            count,
        } => match ks.zrangebyscore(key, *min, *max, *offset, *count) {
            Ok(items) => ShardResponse::ScoredArray(items),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::ZRevRangeByScore {
            key,
            min,
            max,
            offset,
            count,
        } => match ks.zrevrangebyscore(key, *min, *max, *offset, *count) {
            Ok(items) => ShardResponse::ScoredArray(items),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::ZPopMin { key, count } => match ks.zpopmin(key, *count) {
            Ok(items) => ShardResponse::ZPopResult(items),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::ZPopMax { key, count } => match ks.zpopmax(key, *count) {
            Ok(items) => ShardResponse::ZPopResult(items),
            Err(_) => ShardResponse::WrongType,
        },
        // ── single-key LMPOP/ZMPOP + shard admin ─────────────────────────
        // LmpopSingle handles the one-key-per-shard slice of LMPOP by
        // delegating to the counted pop on the chosen end.
        ShardRequest::LmpopSingle { key, left, count } => {
            let result = if *left {
                ks.lpop_count(key, *count)
            } else {
                ks.rpop_count(key, *count)
            };
            match result {
                Ok(Some(items)) => ShardResponse::Array(items),
                Ok(None) => ShardResponse::Value(None),
                Err(_) => ShardResponse::WrongType,
            }
        }
        // ZmpopSingle: zpopmin/zpopmax return an (possibly empty) vec rather
        // than Option, so "nothing popped" is detected via is_empty and
        // mapped to the nil reply.
        ShardRequest::ZmpopSingle { key, min, count } => {
            let result = if *min {
                ks.zpopmin(key, *count)
            } else {
                ks.zpopmax(key, *count)
            };
            match result {
                Ok(items) if !items.is_empty() => ShardResponse::ZPopResult(items),
                Ok(_) => ShardResponse::Value(None),
                Err(_) => ShardResponse::WrongType,
            }
        }
        // Per-shard key count; the caller aggregates across shards for DBSIZE.
        ShardRequest::DbSize => ShardResponse::KeyCount(ks.len()),
        ShardRequest::Stats => ShardResponse::Stats(ks.stats()),
        // `ref key` avoids moving the key out of the (borrowed) request.
        ShardRequest::KeyVersion { ref key } => ShardResponse::Version(ks.key_version(key)),
        // FLUSHDB for this shard's partition only.
        ShardRequest::FlushDb => {
            ks.clear();
            ShardResponse::Ok
        }
        // ── keyspace SCAN + hash commands ────────────────────────────────
        // Cursor-based incremental scan over this shard's keys; the next
        // cursor travels back to the caller for the follow-up SCAN call.
        ShardRequest::Scan {
            cursor,
            count,
            pattern,
        } => {
            let (next_cursor, keys) = ks.scan_keys(*cursor, *count, pattern.as_deref());
            ShardResponse::Scan {
                cursor: next_cursor,
                keys,
            }
        }
        // HSET replies with the number of newly created fields.
        ShardRequest::HSet { key, fields } => write_result_len(ks.hset(key, fields)),
        ShardRequest::HGet { key, field } => match ks.hget(key, field) {
            Ok(val) => ShardResponse::Value(val.map(Value::String)),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::HGetAll { key } => match ks.hgetall(key) {
            Ok(fields) => ShardResponse::HashFields(fields),
            Err(_) => ShardResponse::WrongType,
        },
        // HDEL carries the concrete removed field names alongside the count
        // (same pattern as ZRemLen).
        ShardRequest::HDel { key, fields } => match ks.hdel(key, fields) {
            Ok(removed) => ShardResponse::HDelLen {
                count: removed.len(),
                removed,
            },
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::HExists { key, field } => match ks.hexists(key, field) {
            Ok(exists) => ShardResponse::Bool(exists),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::HLen { key } => match ks.hlen(key) {
            Ok(len) => ShardResponse::Len(len),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::HIncrBy { key, field, delta } => incr_result(ks.hincrby(key, field, *delta)),
        // HINCRBYFLOAT mirrors IncrByFloat above: formatted value as bulk
        // string, non-numeric/other failures surfaced verbatim.
        ShardRequest::HIncrByFloat { key, field, delta } => {
            match ks.hincrbyfloat(key, field, *delta) {
                Ok(val) => ShardResponse::BulkString(val),
                Err(IncrFloatError::WrongType) => ShardResponse::WrongType,
                Err(IncrFloatError::OutOfMemory) => ShardResponse::OutOfMemory,
                Err(e) => ShardResponse::Err(e.to_string()),
            }
        }
        ShardRequest::HKeys { key } => match ks.hkeys(key) {
            Ok(keys) => ShardResponse::StringArray(keys),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::HVals { key } => match ks.hvals(key) {
            Ok(vals) => ShardResponse::Array(vals),
            Err(_) => ShardResponse::WrongType,
        },
        // HMGET preserves per-field None for missing fields.
        ShardRequest::HMGet { key, fields } => match ks.hmget(key, fields) {
            Ok(vals) => ShardResponse::OptionalArray(vals),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::HRandField {
            key,
            count,
            with_values,
        } => match ks.hrandfield(key, *count, *with_values) {
            Ok(pairs) => ShardResponse::HRandFieldResult(pairs),
            Err(_) => ShardResponse::WrongType,
        },
        // ── set commands ─────────────────────────────────────────────────
        ShardRequest::SAdd { key, members } => write_result_len(ks.sadd(key, members)),
        ShardRequest::SRem { key, members } => match ks.srem(key, members) {
            Ok(count) => ShardResponse::Len(count),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::SMembers { key } => match ks.smembers(key) {
            Ok(members) => ShardResponse::StringArray(members),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::SIsMember { key, member } => match ks.sismember(key, member) {
            Ok(exists) => ShardResponse::Bool(exists),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::SCard { key } => match ks.scard(key) {
            Ok(count) => ShardResponse::Len(count),
            Err(_) => ShardResponse::WrongType,
        },
        // Multi-key set algebra. All keys live on this shard by the time the
        // request arrives (routing happens upstream).
        ShardRequest::SUnion { keys } => match ks.sunion(keys) {
            Ok(members) => ShardResponse::StringArray(members),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::SInter { keys } => match ks.sinter(keys) {
            Ok(members) => ShardResponse::StringArray(members),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::SDiff { keys } => match ks.sdiff(keys) {
            Ok(members) => ShardResponse::StringArray(members),
            Err(_) => ShardResponse::WrongType,
        },
        // *STORE variants share a response builder for the stored-count reply.
        ShardRequest::SUnionStore { dest, keys } => store_set_response(ks.sunionstore(dest, keys)),
        ShardRequest::SInterStore { dest, keys } => store_set_response(ks.sinterstore(dest, keys)),
        ShardRequest::SDiffStore { dest, keys } => store_set_response(ks.sdiffstore(dest, keys)),
        ShardRequest::SRandMember { key, count } => match ks.srandmember(key, *count) {
            Ok(members) => ShardResponse::StringArray(members),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::SPop { key, count } => match ks.spop(key, *count) {
            Ok(members) => ShardResponse::StringArray(members),
            Err(_) => ShardResponse::WrongType,
        },
        // SMISMEMBER: one bool per queried member, in request order.
        ShardRequest::SMisMember { key, members } => match ks.smismember(key, members) {
            Ok(results) => ShardResponse::BoolArray(results),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::SMove {
            source,
            destination,
            member,
        } => match ks.smove(source, destination, member) {
            Ok(moved) => ShardResponse::Bool(moved),
            Err(WriteError::WrongType) => ShardResponse::WrongType,
            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
        },
        ShardRequest::SInterCard { keys, limit } => match ks.sintercard(keys, *limit) {
            Ok(n) => ShardResponse::Integer(n as i64),
            Err(_) => ShardResponse::WrongType,
        },
        // EXPIRETIME/PEXPIRETIME return the raw integer (including the
        // -1/-2 sentinels encoded by the keyspace helpers).
        ShardRequest::Expiretime { key } => ShardResponse::Integer(ks.expiretime(key)),
        ShardRequest::Pexpiretime { key } => ShardResponse::Integer(ks.pexpiretime(key)),
        // ── LMOVE / GET variants / multi-key sorted-set ops ──────────────
        // LMOVE pops from one end of `source` and pushes to one end of
        // `destination`; nil when the source is empty/missing.
        ShardRequest::LMove {
            source,
            destination,
            src_left,
            dst_left,
        } => match ks.lmove(source, destination, *src_left, *dst_left) {
            Ok(Some(v)) => ShardResponse::Value(Some(Value::String(v))),
            Ok(None) => ShardResponse::Value(None),
            Err(e) => match e {
                WriteError::WrongType => ShardResponse::WrongType,
                WriteError::OutOfMemory => ShardResponse::OutOfMemory,
            },
        },
        ShardRequest::GetDel { key } => match ks.getdel(key) {
            Ok(Some(v)) => ShardResponse::Value(Some(Value::String(v))),
            Ok(None) => ShardResponse::Value(None),
            Err(_) => ShardResponse::WrongType,
        },
        // GETEX: `expire` is Option<Option<ms>> — None leaves the TTL
        // untouched; Some(None) presumably clears it (PERSIST) and Some(ms)
        // sets it. TODO(review): confirm the Some(None) semantics against
        // keyspace::getex.
        ShardRequest::GetEx { key, expire } => {
            let dur = expire.map(|opt| opt.map(Duration::from_millis));
            match ks.getex(key, dur) {
                Ok(Some(v)) => ShardResponse::Value(Some(Value::String(v))),
                Ok(None) => ShardResponse::Value(None),
                Err(_) => ShardResponse::WrongType,
            }
        }
        // GETSET replies with the previous value (nil if the key was new).
        ShardRequest::GetSet { key, value } => match ks.getset(key, value.clone()) {
            Ok(old) => ShardResponse::Value(old.map(Value::String)),
            Err(_) => ShardResponse::WrongType,
        },
        // MSETNX is all-or-nothing: true iff none of the keys existed.
        ShardRequest::MSetNx { pairs } => {
            let result = ks.msetnx(pairs);
            ShardResponse::Bool(result)
        }
        ShardRequest::ZDiff { keys } => match ks.zdiff(keys) {
            Ok(pairs) => ShardResponse::ScoredArray(pairs),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::ZInter { keys } => match ks.zinter(keys) {
            Ok(pairs) => ShardResponse::ScoredArray(pairs),
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::ZUnion { keys } => match ks.zunion(keys) {
            Ok(pairs) => ShardResponse::ScoredArray(pairs),
            Err(_) => ShardResponse::WrongType,
        },
        // *STORE variants return the stored member set alongside the count
        // (same record-what-changed pattern as ZAddLen/HDelLen).
        ShardRequest::ZDiffStore { dest, keys } => match ks.zdiffstore(dest, keys) {
            Ok((count, members)) => ShardResponse::ZStoreResult { count, members },
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::ZInterStore { dest, keys } => match ks.zinterstore(dest, keys) {
            Ok((count, members)) => ShardResponse::ZStoreResult { count, members },
            Err(_) => ShardResponse::WrongType,
        },
        ShardRequest::ZUnionStore { dest, keys } => match ks.zunionstore(dest, keys) {
            Ok((count, members)) => ShardResponse::ZStoreResult { count, members },
            Err(_) => ShardResponse::WrongType,
        },
        // ── ZRANDMEMBER + per-collection SCAN cursors ────────────────────
        ShardRequest::ZRandMember {
            key,
            count,
            with_scores,
        } => match ks.zrandmember(key, *count, *with_scores) {
            Ok(pairs) => ShardResponse::ZRandMemberResult(pairs),
            Err(_) => ShardResponse::WrongType,
        },
        // SSCAN: members are converted to Bytes for the shared
        // CollectionScan response shape.
        ShardRequest::SScan {
            key,
            cursor,
            count,
            pattern,
        } => match ks.scan_set(key, *cursor, *count, pattern.as_deref()) {
            Ok((next, members)) => {
                let items = members.into_iter().map(Bytes::from).collect();
                ShardResponse::CollectionScan {
                    cursor: next,
                    items,
                }
            }
            Err(_) => ShardResponse::WrongType,
        },
        // HSCAN: field/value pairs are flattened into an alternating
        // [field, value, field, value, …] item list, as RESP expects.
        ShardRequest::HScan {
            key,
            cursor,
            count,
            pattern,
        } => match ks.scan_hash(key, *cursor, *count, pattern.as_deref()) {
            Ok((next, fields)) => {
                let mut items = Vec::with_capacity(fields.len() * 2);
                for (field, value) in fields {
                    items.push(Bytes::from(field));
                    items.push(value);
                }
                ShardResponse::CollectionScan {
                    cursor: next,
                    items,
                }
            }
            Err(_) => ShardResponse::WrongType,
        },
        // ZSCAN: same flattening, with the f64 score formatted via
        // to_string into the [member, score, …] positions.
        ShardRequest::ZScan {
            key,
            cursor,
            count,
            pattern,
        } => match ks.scan_sorted_set(key, *cursor, *count, pattern.as_deref()) {
            Ok((next, members)) => {
                let mut items = Vec::with_capacity(members.len() * 2);
                for (member, score) in members {
                    items.push(Bytes::from(member));
                    items.push(Bytes::from(score.to_string()));
                }
                ShardResponse::CollectionScan {
                    cursor: next,
                    items,
                }
            }
            Err(_) => ShardResponse::WrongType,
        },
        // ── cluster-slot queries + DUMP/RESTORE ──────────────────────────
        ShardRequest::CountKeysInSlot { slot } => {
            ShardResponse::KeyCount(ks.count_keys_in_slot(*slot))
        }
        ShardRequest::GetKeysInSlot { slot, count } => {
            ShardResponse::StringArray(ks.get_keys_in_slot(*slot, *count))
        }
        // DUMP serializes the value through the snapshot format and carries
        // the remaining TTL in ms; a missing key replies nil.
        ShardRequest::DumpKey { key } => match ks.dump(key) {
            Some((value, ttl_ms)) => {
                let snap = persistence::value_to_snap(value);
                match snapshot::serialize_snap_value(&snap) {
                    Ok(data) => ShardResponse::KeyDump { data, ttl_ms },
                    Err(e) => ShardResponse::Err(format!("ERR snapshot serialization failed: {e}")),
                }
            }
            None => ShardResponse::Value(None),
        },
        // RESTORE deserializes a DUMP payload; without REPLACE, an existing
        // target key is an error. ttl_ms == 0 means "no expiry".
        // NOTE(review): real Redis replies with the BUSYKEY error prefix for
        // an existing target, not ERR — confirm clients don't depend on it.
        ShardRequest::RestoreKey {
            key,
            ttl_ms,
            data,
            replace,
        } => match snapshot::deserialize_snap_value(data) {
            Ok(snap) => {
                let exists = ks.exists(key);
                if exists && !*replace {
                    ShardResponse::Err("ERR Target key name already exists".into())
                } else {
                    let value = persistence::snap_to_value(snap);
                    let ttl = if *ttl_ms == 0 {
                        None
                    } else {
                        Some(Duration::from_millis(*ttl_ms))
                    };
                    ks.restore(key.clone(), value, ttl);
                    ShardResponse::Ok
                }
            }
            Err(e) => ShardResponse::Err(format!("ERR DUMP payload corrupted: {e}")),
        },
        // ── vector commands (feature = "vector") ─────────────────────────
        // VADD: metric/quantization travel as u8 across the channel and are
        // decoded here; connectivity/expansion are widened to usize for the
        // index API. The response echoes element + vector so the AOF path
        // can persist the applied form.
        #[cfg(feature = "vector")]
        ShardRequest::VAdd {
            key,
            element,
            vector,
            metric,
            quantization,
            connectivity,
            expansion_add,
        } => {
            use crate::types::vector::{DistanceMetric, QuantizationType};
            match ks.vadd(
                key,
                element.clone(),
                vector.clone(),
                DistanceMetric::from_u8(*metric),
                QuantizationType::from_u8(*quantization),
                *connectivity as usize,
                *expansion_add as usize,
            ) {
                Ok(result) => ShardResponse::VAddResult {
                    element: result.element,
                    vector: result.vector,
                    added: result.added,
                },
                Err(crate::keyspace::VectorWriteError::WrongType) => ShardResponse::WrongType,
                Err(crate::keyspace::VectorWriteError::OutOfMemory) => ShardResponse::OutOfMemory,
                // for a single add, PartialBatch degenerates to a plain index
                // error; both report the message as an ERR reply.
                Err(crate::keyspace::VectorWriteError::IndexError(e))
                | Err(crate::keyspace::VectorWriteError::PartialBatch { message: e, .. }) => {
                    ShardResponse::Err(format!("ERR vector index: {e}"))
                }
            }
        }
        #[cfg(feature = "vector")]
        ShardRequest::VAddBatch {
            key,
            entries,
            metric,
            quantization,
            connectivity,
            expansion_add,
            ..
        } => {
            use crate::types::vector::{DistanceMetric, QuantizationType};
            // take ownership of entries to avoid cloning vectors during
            // batch insertion. the entries vec in the request becomes empty,
            // which is fine because to_aof_records uses response.applied
            // instead of request.entries.
            let owned_entries = std::mem::take(entries);
            match ks.vadd_batch(
                key,
                owned_entries,
                DistanceMetric::from_u8(*metric),
                QuantizationType::from_u8(*quantization),
                *connectivity as usize,
                *expansion_add as usize,
            ) {
                Ok(result) => ShardResponse::VAddBatchResult {
                    added_count: result.added_count,
                    applied: result.applied,
                },
                Err(crate::keyspace::VectorWriteError::WrongType) => ShardResponse::WrongType,
                Err(crate::keyspace::VectorWriteError::OutOfMemory) => ShardResponse::OutOfMemory,
                Err(crate::keyspace::VectorWriteError::IndexError(e)) => {
                    ShardResponse::Err(format!("ERR vector index: {e}"))
                }
                Err(crate::keyspace::VectorWriteError::PartialBatch { applied, .. }) => {
                    // partial success: return applied vectors for AOF persistence
                    ShardResponse::VAddBatchResult {
                        added_count: applied.len(),
                        applied,
                    }
                }
            }
        }
        // VSIM: similarity search; results are reduced to (element, distance)
        // pairs for the response.
        #[cfg(feature = "vector")]
        ShardRequest::VSim {
            key,
            query,
            count,
            ef_search,
        } => match ks.vsim(key, query, *count, *ef_search) {
            Ok(results) => ShardResponse::VSimResult(
                results
                    .into_iter()
                    .map(|r| (r.element, r.distance))
                    .collect(),
            ),
            Err(_) => ShardResponse::WrongType,
        },
        #[cfg(feature = "vector")]
        ShardRequest::VRem { key, element } => match ks.vrem(key, element) {
            Ok(removed) => ShardResponse::Bool(removed),
            Err(_) => ShardResponse::WrongType,
        },
        #[cfg(feature = "vector")]
        ShardRequest::VGet { key, element } => match ks.vget(key, element) {
            Ok(data) => ShardResponse::VectorData(data),
            Err(_) => ShardResponse::WrongType,
        },
        #[cfg(feature = "vector")]
        ShardRequest::VCard { key } => match ks.vcard(key) {
            Ok(count) => ShardResponse::Integer(count as i64),
            Err(_) => ShardResponse::WrongType,
        },
        #[cfg(feature = "vector")]
        ShardRequest::VDim { key } => match ks.vdim(key) {
            Ok(dim) => ShardResponse::Integer(dim as i64),
            Err(_) => ShardResponse::WrongType,
        },
        // VINFO: index metadata rendered as ordered (name, value) string
        // pairs; Ok(None) = key absent.
        #[cfg(feature = "vector")]
        ShardRequest::VInfo { key } => match ks.vinfo(key) {
            Ok(Some(info)) => {
                let fields = vec![
                    ("dim".to_owned(), info.dim.to_string()),
                    ("count".to_owned(), info.count.to_string()),
                    ("metric".to_owned(), info.metric.to_string()),
                    ("quantization".to_owned(), info.quantization.to_string()),
                    ("connectivity".to_owned(), info.connectivity.to_string()),
                    ("expansion_add".to_owned(), info.expansion_add.to_string()),
                ];
                ShardResponse::VectorInfo(Some(fields))
            }
            Ok(None) => ShardResponse::VectorInfo(None),
            Err(_) => ShardResponse::WrongType,
        },
2433        #[cfg(feature = "protobuf")]
2434        ShardRequest::ProtoSet {
2435            key,
2436            type_name,
2437            data,
2438            expire,
2439            nx,
2440            xx,
2441        } => {
2442            if *nx && ks.exists(key) {
2443                return ShardResponse::Value(None);
2444            }
2445            if *xx && !ks.exists(key) {
2446                return ShardResponse::Value(None);
2447            }
2448            match ks.proto_set(key.clone(), type_name.clone(), data.clone(), *expire) {
2449                SetResult::Ok | SetResult::Blocked => ShardResponse::Ok,
2450                SetResult::OutOfMemory => ShardResponse::OutOfMemory,
2451            }
2452        }
2453        #[cfg(feature = "protobuf")]
2454        ShardRequest::ProtoGet { key } => match ks.proto_get(key) {
2455            Ok(val) => ShardResponse::ProtoValue(val),
2456            Err(_) => ShardResponse::WrongType,
2457        },
2458        #[cfg(feature = "protobuf")]
2459        ShardRequest::ProtoType { key } => match ks.proto_type(key) {
2460            Ok(name) => ShardResponse::ProtoTypeName(name),
2461            Err(_) => ShardResponse::WrongType,
2462        },
2463        // ProtoRegisterAof is a no-op for the keyspace — the AOF record
2464        // is written by the to_aof_record path after dispatch returns Ok.
2465        #[cfg(feature = "protobuf")]
2466        ShardRequest::ProtoRegisterAof { .. } => ShardResponse::Ok,
2467        #[cfg(feature = "protobuf")]
2468        ShardRequest::ProtoSetField {
2469            key,
2470            field_path,
2471            value,
2472        } => dispatch_proto_field_op(ks, schema_registry, key, |reg, type_name, data, ttl| {
2473            let new_data = reg.set_field(type_name, data, field_path, value)?;
2474            Ok(ShardResponse::ProtoFieldUpdated {
2475                type_name: type_name.to_owned(),
2476                data: new_data,
2477                expire: ttl,
2478            })
2479        }),
2480        #[cfg(feature = "protobuf")]
2481        ShardRequest::ProtoDelField { key, field_path } => {
2482            dispatch_proto_field_op(ks, schema_registry, key, |reg, type_name, data, ttl| {
2483                let new_data = reg.clear_field(type_name, data, field_path)?;
2484                Ok(ShardResponse::ProtoFieldUpdated {
2485                    type_name: type_name.to_owned(),
2486                    data: new_data,
2487                    expire: ttl,
2488                })
2489            })
2490        }
2491        // these requests are intercepted in process_message, not handled here
2492        ShardRequest::Snapshot
2493        | ShardRequest::SerializeSnapshot
2494        | ShardRequest::RewriteAof
2495        | ShardRequest::FlushDbAsync
2496        | ShardRequest::UpdateMemoryConfig { .. }
2497        | ShardRequest::BLPop { .. }
2498        | ShardRequest::BRPop { .. } => ShardResponse::Ok,
2499    }
2500}
2501
/// Shared logic for atomic proto field operations (SETFIELD/DELFIELD).
///
/// Reads the proto value, acquires the schema registry, calls the
/// provided mutation closure, then writes the result back to the keyspace
/// — all within the single-threaded shard dispatch, so the read-modify-write
/// is atomic with respect to other commands on this shard.
///
/// Returns:
/// - `Value(None)` when the key does not exist (nil to the client),
/// - `WrongType` when the key holds a non-proto value,
/// - `Err(..)` when no registry is configured, the registry lock is
///   poisoned, or the schema mutation itself fails,
/// - `OutOfMemory` when the write-back is rejected by the memory limiter,
/// - otherwise the `ShardResponse` produced by `mutate` (normally
///   `ProtoFieldUpdated`, which the caller persists to the AOF).
#[cfg(feature = "protobuf")]
fn dispatch_proto_field_op<F>(
    ks: &mut Keyspace,
    schema_registry: &Option<crate::schema::SharedSchemaRegistry>,
    key: &str,
    mutate: F,
) -> ShardResponse
where
    F: FnOnce(
        &crate::schema::SchemaRegistry,
        &str,
        &[u8],
        Option<Duration>,
    ) -> Result<ShardResponse, crate::schema::SchemaError>,
{
    let registry = match schema_registry {
        Some(r) => r,
        None => return ShardResponse::Err("protobuf support is not enabled".into()),
    };

    // read the current value; the key must exist and must hold a proto value
    let (type_name, data, remaining_ttl) = match ks.proto_get(key) {
        Ok(Some(tuple)) => tuple,
        Ok(None) => return ShardResponse::Value(None),
        Err(_) => return ShardResponse::WrongType,
    };

    let reg = match registry.read() {
        Ok(r) => r,
        Err(_) => return ShardResponse::Err("schema registry lock poisoned".into()),
    };

    let resp = match mutate(&reg, &type_name, &data, remaining_ttl) {
        Ok(r) => r,
        Err(e) => return ShardResponse::Err(e.to_string()),
    };

    // write the updated value back, preserving the original TTL.
    //
    // Bug fix: the `proto_set` result was previously discarded, so an
    // out-of-memory rejection still returned `ProtoFieldUpdated` — and the
    // AOF path would persist a mutation that never took effect. Mirror the
    // ProtoSet dispatch arm: Ok/Blocked succeed, OutOfMemory is surfaced.
    if let ShardResponse::ProtoFieldUpdated {
        ref type_name,
        ref data,
        expire,
    } = resp
    {
        match ks.proto_set(key.to_owned(), type_name.clone(), data.clone(), expire) {
            SetResult::Ok | SetResult::Blocked => {}
            SetResult::OutOfMemory => return ShardResponse::OutOfMemory,
        }
    }

    resp
}
2555
// Unit tests drive `dispatch` directly against an in-memory `Keyspace`,
// bypassing the shard event loop entirely — no channels, no tokio runtime,
// no oneshot plumbing. This keeps the tests synchronous and fast while
// still exercising the exact request/response mapping the shard uses.
#[cfg(test)]
mod tests {
    use super::*;

    /// Test helper: dispatch without a schema registry.
    fn test_dispatch(ks: &mut Keyspace, mut req: ShardRequest) -> ShardResponse {
        dispatch(
            ks,
            &mut req,
            // the registry parameter only exists when protobuf is compiled in
            #[cfg(feature = "protobuf")]
            &None,
        )
    }

    #[test]
    fn dispatch_set_and_get() {
        let mut ks = Keyspace::new();

        let resp = test_dispatch(
            &mut ks,
            ShardRequest::Set {
                key: "k".into(),
                value: Bytes::from("v"),
                expire: None,
                nx: false,
                xx: false,
            },
        );
        assert!(matches!(resp, ShardResponse::Ok));

        let resp = test_dispatch(&mut ks, ShardRequest::Get { key: "k".into() });
        match resp {
            ShardResponse::Value(Some(Value::String(data))) => {
                assert_eq!(data, Bytes::from("v"));
            }
            other => panic!("expected Value(Some(String)), got {other:?}"),
        }
    }

    #[test]
    fn dispatch_get_missing() {
        let mut ks = Keyspace::new();
        let resp = test_dispatch(&mut ks, ShardRequest::Get { key: "nope".into() });
        assert!(matches!(resp, ShardResponse::Value(None)));
    }

    // DEL reports whether a key was actually removed: true once, then false.
    #[test]
    fn dispatch_del() {
        let mut ks = Keyspace::new();
        ks.set("key".into(), Bytes::from("val"), None, false, false);

        let resp = test_dispatch(&mut ks, ShardRequest::Del { key: "key".into() });
        assert!(matches!(resp, ShardResponse::Bool(true)));

        let resp = test_dispatch(&mut ks, ShardRequest::Del { key: "key".into() });
        assert!(matches!(resp, ShardResponse::Bool(false)));
    }

    #[test]
    fn dispatch_exists() {
        let mut ks = Keyspace::new();
        ks.set("yes".into(), Bytes::from("here"), None, false, false);

        let resp = test_dispatch(&mut ks, ShardRequest::Exists { key: "yes".into() });
        assert!(matches!(resp, ShardResponse::Bool(true)));

        let resp = test_dispatch(&mut ks, ShardRequest::Exists { key: "no".into() });
        assert!(matches!(resp, ShardResponse::Bool(false)));
    }

    // TTL is read back as a range rather than an exact value because wall
    // time elapses between EXPIRE and TTL.
    #[test]
    fn dispatch_expire_and_ttl() {
        let mut ks = Keyspace::new();
        ks.set("key".into(), Bytes::from("val"), None, false, false);

        let resp = test_dispatch(
            &mut ks,
            ShardRequest::Expire {
                key: "key".into(),
                seconds: 60,
            },
        );
        assert!(matches!(resp, ShardResponse::Bool(true)));

        let resp = test_dispatch(&mut ks, ShardRequest::Ttl { key: "key".into() });
        match resp {
            ShardResponse::Ttl(TtlResult::Seconds(s)) => assert!((58..=60).contains(&s)),
            other => panic!("expected Ttl(Seconds), got {other:?}"),
        }
    }

    #[test]
    fn dispatch_ttl_missing() {
        let mut ks = Keyspace::new();
        let resp = test_dispatch(&mut ks, ShardRequest::Ttl { key: "gone".into() });
        assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
    }

    // INCR on a missing key treats it as 0, so the first increment yields 1.
    #[test]
    fn dispatch_incr_new_key() {
        let mut ks = Keyspace::new();
        let resp = test_dispatch(&mut ks, ShardRequest::Incr { key: "c".into() });
        assert!(matches!(resp, ShardResponse::Integer(1)));
    }

    #[test]
    fn dispatch_decr_existing() {
        let mut ks = Keyspace::new();
        ks.set("n".into(), Bytes::from("10"), None, false, false);
        let resp = test_dispatch(&mut ks, ShardRequest::Decr { key: "n".into() });
        assert!(matches!(resp, ShardResponse::Integer(9)));
    }

    // INCR on a non-numeric string value must error, not clobber the value.
    #[test]
    fn dispatch_incr_non_integer() {
        let mut ks = Keyspace::new();
        ks.set("s".into(), Bytes::from("hello"), None, false, false);
        let resp = test_dispatch(&mut ks, ShardRequest::Incr { key: "s".into() });
        assert!(matches!(resp, ShardResponse::Err(_)));
    }

    #[test]
    fn dispatch_incrby() {
        let mut ks = Keyspace::new();
        ks.set("n".into(), Bytes::from("10"), None, false, false);
        let resp = test_dispatch(
            &mut ks,
            ShardRequest::IncrBy {
                key: "n".into(),
                delta: 5,
            },
        );
        assert!(matches!(resp, ShardResponse::Integer(15)));
    }

    #[test]
    fn dispatch_decrby() {
        let mut ks = Keyspace::new();
        ks.set("n".into(), Bytes::from("10"), None, false, false);
        let resp = test_dispatch(
            &mut ks,
            ShardRequest::DecrBy {
                key: "n".into(),
                delta: 3,
            },
        );
        assert!(matches!(resp, ShardResponse::Integer(7)));
    }

    #[test]
    fn dispatch_incrby_new_key() {
        let mut ks = Keyspace::new();
        let resp = test_dispatch(
            &mut ks,
            ShardRequest::IncrBy {
                key: "new".into(),
                delta: 42,
            },
        );
        assert!(matches!(resp, ShardResponse::Integer(42)));
    }

    // Float increments come back as a formatted string (RESP bulk string),
    // so the result is parsed and compared with a tolerance.
    #[test]
    fn dispatch_incrbyfloat() {
        let mut ks = Keyspace::new();
        ks.set("n".into(), Bytes::from("10.5"), None, false, false);
        let resp = test_dispatch(
            &mut ks,
            ShardRequest::IncrByFloat {
                key: "n".into(),
                delta: 2.3,
            },
        );
        match resp {
            ShardResponse::BulkString(val) => {
                let f: f64 = val.parse().unwrap();
                assert!((f - 12.8).abs() < 0.001);
            }
            other => panic!("expected BulkString, got {other:?}"),
        }
    }

    // APPEND returns the length of the string after the append.
    #[test]
    fn dispatch_append() {
        let mut ks = Keyspace::new();
        ks.set("k".into(), Bytes::from("hello"), None, false, false);
        let resp = test_dispatch(
            &mut ks,
            ShardRequest::Append {
                key: "k".into(),
                value: Bytes::from(" world"),
            },
        );
        assert!(matches!(resp, ShardResponse::Len(11)));
    }

    #[test]
    fn dispatch_strlen() {
        let mut ks = Keyspace::new();
        ks.set("k".into(), Bytes::from("hello"), None, false, false);
        let resp = test_dispatch(&mut ks, ShardRequest::Strlen { key: "k".into() });
        assert!(matches!(resp, ShardResponse::Len(5)));
    }

    // STRLEN on a missing key is 0, not an error.
    #[test]
    fn dispatch_strlen_missing() {
        let mut ks = Keyspace::new();
        let resp = test_dispatch(&mut ks, ShardRequest::Strlen { key: "nope".into() });
        assert!(matches!(resp, ShardResponse::Len(0)));
    }

    #[test]
    fn dispatch_incrbyfloat_new_key() {
        let mut ks = Keyspace::new();
        let resp = test_dispatch(
            &mut ks,
            ShardRequest::IncrByFloat {
                key: "new".into(),
                delta: 2.72,
            },
        );
        match resp {
            ShardResponse::BulkString(val) => {
                let f: f64 = val.parse().unwrap();
                assert!((f - 2.72).abs() < 0.001);
            }
            other => panic!("expected BulkString, got {other:?}"),
        }
    }

    // PERSIST drops the TTL and leaves the value; a follow-up TTL must
    // report "no expiry" rather than a remaining duration.
    #[test]
    fn dispatch_persist_removes_ttl() {
        let mut ks = Keyspace::new();
        ks.set(
            "key".into(),
            Bytes::from("val"),
            Some(Duration::from_secs(60)),
            false,
            false,
        );

        let resp = test_dispatch(&mut ks, ShardRequest::Persist { key: "key".into() });
        assert!(matches!(resp, ShardResponse::Bool(true)));

        let resp = test_dispatch(&mut ks, ShardRequest::Ttl { key: "key".into() });
        assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NoExpiry)));
    }

    #[test]
    fn dispatch_persist_missing_key() {
        let mut ks = Keyspace::new();
        let resp = test_dispatch(&mut ks, ShardRequest::Persist { key: "nope".into() });
        assert!(matches!(resp, ShardResponse::Bool(false)));
    }

    #[test]
    fn dispatch_pttl() {
        let mut ks = Keyspace::new();
        ks.set(
            "key".into(),
            Bytes::from("val"),
            Some(Duration::from_secs(60)),
            false,
            false,
        );

        let resp = test_dispatch(&mut ks, ShardRequest::Pttl { key: "key".into() });
        match resp {
            ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
                assert!(ms > 59_000 && ms <= 60_000);
            }
            other => panic!("expected Ttl(Milliseconds), got {other:?}"),
        }
    }

    #[test]
    fn dispatch_pttl_missing() {
        let mut ks = Keyspace::new();
        let resp = test_dispatch(&mut ks, ShardRequest::Pttl { key: "nope".into() });
        assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
    }

    #[test]
    fn dispatch_pexpire() {
        let mut ks = Keyspace::new();
        ks.set("key".into(), Bytes::from("val"), None, false, false);

        let resp = test_dispatch(
            &mut ks,
            ShardRequest::Pexpire {
                key: "key".into(),
                milliseconds: 5000,
            },
        );
        assert!(matches!(resp, ShardResponse::Bool(true)));

        let resp = test_dispatch(&mut ks, ShardRequest::Pttl { key: "key".into() });
        match resp {
            ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
                assert!(ms > 4000 && ms <= 5000);
            }
            other => panic!("expected Ttl(Milliseconds), got {other:?}"),
        }
    }

    // SET NX: succeeds only when the key does not yet exist.
    #[test]
    fn dispatch_set_nx_when_key_missing() {
        let mut ks = Keyspace::new();
        let resp = test_dispatch(
            &mut ks,
            ShardRequest::Set {
                key: "k".into(),
                value: Bytes::from("v"),
                expire: None,
                nx: true,
                xx: false,
            },
        );
        assert!(matches!(resp, ShardResponse::Ok));
        assert!(ks.exists("k"));
    }

    #[test]
    fn dispatch_set_nx_when_key_exists() {
        let mut ks = Keyspace::new();
        ks.set("k".into(), Bytes::from("old"), None, false, false);

        let resp = test_dispatch(
            &mut ks,
            ShardRequest::Set {
                key: "k".into(),
                value: Bytes::from("new"),
                expire: None,
                nx: true,
                xx: false,
            },
        );
        // NX should block — returns nil
        assert!(matches!(resp, ShardResponse::Value(None)));
        // original value should remain
        match ks.get("k").unwrap() {
            Some(Value::String(data)) => assert_eq!(data, Bytes::from("old")),
            other => panic!("expected old value, got {other:?}"),
        }
    }

    // SET XX: succeeds only when the key already exists.
    #[test]
    fn dispatch_set_xx_when_key_exists() {
        let mut ks = Keyspace::new();
        ks.set("k".into(), Bytes::from("old"), None, false, false);

        let resp = test_dispatch(
            &mut ks,
            ShardRequest::Set {
                key: "k".into(),
                value: Bytes::from("new"),
                expire: None,
                nx: false,
                xx: true,
            },
        );
        assert!(matches!(resp, ShardResponse::Ok));
        match ks.get("k").unwrap() {
            Some(Value::String(data)) => assert_eq!(data, Bytes::from("new")),
            other => panic!("expected new value, got {other:?}"),
        }
    }

    #[test]
    fn dispatch_set_xx_when_key_missing() {
        let mut ks = Keyspace::new();
        let resp = test_dispatch(
            &mut ks,
            ShardRequest::Set {
                key: "k".into(),
                value: Bytes::from("v"),
                expire: None,
                nx: false,
                xx: true,
            },
        );
        // XX should block — returns nil
        assert!(matches!(resp, ShardResponse::Value(None)));
        assert!(!ks.exists("k"));
    }

    #[test]
    fn dispatch_flushdb_clears_all_keys() {
        let mut ks = Keyspace::new();
        ks.set("a".into(), Bytes::from("1"), None, false, false);
        ks.set("b".into(), Bytes::from("2"), None, false, false);

        assert_eq!(ks.len(), 2);

        let resp = test_dispatch(&mut ks, ShardRequest::FlushDb);
        assert!(matches!(resp, ShardResponse::Ok));
        assert_eq!(ks.len(), 0);
    }

    // A SCAN over a small keyspace finishes in one pass: the returned
    // cursor of 0 signals iteration is complete.
    #[test]
    fn dispatch_scan_returns_keys() {
        let mut ks = Keyspace::new();
        ks.set("user:1".into(), Bytes::from("a"), None, false, false);
        ks.set("user:2".into(), Bytes::from("b"), None, false, false);
        ks.set("item:1".into(), Bytes::from("c"), None, false, false);

        let resp = test_dispatch(
            &mut ks,
            ShardRequest::Scan {
                cursor: 0,
                count: 10,
                pattern: None,
            },
        );

        match resp {
            ShardResponse::Scan { cursor, keys } => {
                assert_eq!(cursor, 0); // complete in one pass
                assert_eq!(keys.len(), 3);
            }
            _ => panic!("expected Scan response"),
        }
    }

    #[test]
    fn dispatch_scan_with_pattern() {
        let mut ks = Keyspace::new();
        ks.set("user:1".into(), Bytes::from("a"), None, false, false);
        ks.set("user:2".into(), Bytes::from("b"), None, false, false);
        ks.set("item:1".into(), Bytes::from("c"), None, false, false);

        let resp = test_dispatch(
            &mut ks,
            ShardRequest::Scan {
                cursor: 0,
                count: 10,
                pattern: Some("user:*".into()),
            },
        );

        match resp {
            ShardResponse::Scan { cursor, keys } => {
                assert_eq!(cursor, 0);
                assert_eq!(keys.len(), 2);
                for k in &keys {
                    assert!(k.starts_with("user:"));
                }
            }
            _ => panic!("expected Scan response"),
        }
    }

    // KEYS result order is unspecified — sort before comparing.
    #[test]
    fn dispatch_keys() {
        let mut ks = Keyspace::new();
        ks.set("user:1".into(), Bytes::from("a"), None, false, false);
        ks.set("user:2".into(), Bytes::from("b"), None, false, false);
        ks.set("item:1".into(), Bytes::from("c"), None, false, false);
        let resp = test_dispatch(
            &mut ks,
            ShardRequest::Keys {
                pattern: "user:*".into(),
            },
        );
        match resp {
            ShardResponse::StringArray(mut keys) => {
                keys.sort();
                assert_eq!(keys, vec!["user:1", "user:2"]);
            }
            other => panic!("expected StringArray, got {other:?}"),
        }
    }

    #[test]
    fn dispatch_rename() {
        let mut ks = Keyspace::new();
        ks.set("old".into(), Bytes::from("value"), None, false, false);
        let resp = test_dispatch(
            &mut ks,
            ShardRequest::Rename {
                key: "old".into(),
                newkey: "new".into(),
            },
        );
        assert!(matches!(resp, ShardResponse::Ok));
        assert!(!ks.exists("old"));
        assert!(ks.exists("new"));
    }

    #[test]
    fn dispatch_rename_missing_key() {
        let mut ks = Keyspace::new();
        let resp = test_dispatch(
            &mut ks,
            ShardRequest::Rename {
                key: "missing".into(),
                newkey: "new".into(),
            },
        );
        assert!(matches!(resp, ShardResponse::Err(_)));
    }

    // DUMP emits the snapshot serialization of the value plus the key's
    // remaining TTL in milliseconds; the payload must round-trip through
    // the snapshot deserializer.
    #[test]
    fn dump_key_returns_serialized_value() {
        let mut ks = Keyspace::new();
        ks.set(
            "greeting".into(),
            Bytes::from("hello"),
            Some(Duration::from_secs(60)),
            false,
            false,
        );

        let resp = test_dispatch(
            &mut ks,
            ShardRequest::DumpKey {
                key: "greeting".into(),
            },
        );
        match resp {
            ShardResponse::KeyDump { data, ttl_ms } => {
                assert!(!data.is_empty());
                assert!(ttl_ms > 0);
                // verify the data round-trips
                let snap = snapshot::deserialize_snap_value(&data).unwrap();
                assert!(matches!(snap, SnapValue::String(ref b) if b == &Bytes::from("hello")));
            }
            other => panic!("expected KeyDump, got {other:?}"),
        }
    }

    #[test]
    fn dump_key_missing_returns_none() {
        let mut ks = Keyspace::new();
        let resp = test_dispatch(&mut ks, ShardRequest::DumpKey { key: "nope".into() });
        assert!(matches!(resp, ShardResponse::Value(None)));
    }

    // RESTORE with ttl_ms = 0 inserts the key without an expiry.
    #[test]
    fn restore_key_inserts_value() {
        let mut ks = Keyspace::new();
        let snap = SnapValue::String(Bytes::from("restored"));
        let data = snapshot::serialize_snap_value(&snap).unwrap();

        let resp = test_dispatch(
            &mut ks,
            ShardRequest::RestoreKey {
                key: "mykey".into(),
                ttl_ms: 0,
                data: Bytes::from(data),
                replace: false,
            },
        );
        assert!(matches!(resp, ShardResponse::Ok));
        assert_eq!(
            ks.get("mykey").unwrap(),
            Some(Value::String(Bytes::from("restored")))
        );
    }

    #[test]
    fn restore_key_with_ttl() {
        let mut ks = Keyspace::new();
        let snap = SnapValue::String(Bytes::from("temp"));
        let data = snapshot::serialize_snap_value(&snap).unwrap();

        let resp = test_dispatch(
            &mut ks,
            ShardRequest::RestoreKey {
                key: "ttlkey".into(),
                ttl_ms: 30_000,
                data: Bytes::from(data),
                replace: false,
            },
        );
        assert!(matches!(resp, ShardResponse::Ok));
        match ks.pttl("ttlkey") {
            TtlResult::Milliseconds(ms) => assert!(ms > 29_000 && ms <= 30_000),
            other => panic!("expected Milliseconds, got {other:?}"),
        }
    }

    // Without REPLACE, restoring over an existing key must fail and leave
    // the original value untouched.
    #[test]
    fn restore_key_rejects_duplicate_without_replace() {
        let mut ks = Keyspace::new();
        ks.set("existing".into(), Bytes::from("old"), None, false, false);

        let snap = SnapValue::String(Bytes::from("new"));
        let data = snapshot::serialize_snap_value(&snap).unwrap();

        let resp = test_dispatch(
            &mut ks,
            ShardRequest::RestoreKey {
                key: "existing".into(),
                ttl_ms: 0,
                data: Bytes::from(data),
                replace: false,
            },
        );
        assert!(matches!(resp, ShardResponse::Err(_)));
        // original value unchanged
        assert_eq!(
            ks.get("existing").unwrap(),
            Some(Value::String(Bytes::from("old")))
        );
    }

    #[test]
    fn restore_key_replace_overwrites() {
        let mut ks = Keyspace::new();
        ks.set("existing".into(), Bytes::from("old"), None, false, false);

        let snap = SnapValue::String(Bytes::from("new"));
        let data = snapshot::serialize_snap_value(&snap).unwrap();

        let resp = test_dispatch(
            &mut ks,
            ShardRequest::RestoreKey {
                key: "existing".into(),
                ttl_ms: 0,
                data: Bytes::from(data),
                replace: true,
            },
        );
        assert!(matches!(resp, ShardResponse::Ok));
        assert_eq!(
            ks.get("existing").unwrap(),
            Some(Value::String(Bytes::from("new")))
        );
    }

    // End-to-end DUMP → RESTORE for a non-string type (hash), proving the
    // snapshot format preserves structured values, not just strings.
    #[test]
    fn dump_and_restore_hash_roundtrip() {
        let mut ks = Keyspace::new();
        ks.hset(
            "myhash",
            &[
                ("f1".into(), Bytes::from("v1")),
                ("f2".into(), Bytes::from("v2")),
            ],
        )
        .unwrap();

        // dump
        let resp = test_dispatch(
            &mut ks,
            ShardRequest::DumpKey {
                key: "myhash".into(),
            },
        );
        let (data, _ttl) = match resp {
            ShardResponse::KeyDump { data, ttl_ms } => (data, ttl_ms),
            other => panic!("expected KeyDump, got {other:?}"),
        };

        // restore to a new key
        let resp = test_dispatch(
            &mut ks,
            ShardRequest::RestoreKey {
                key: "myhash2".into(),
                ttl_ms: 0,
                data: Bytes::from(data),
                replace: false,
            },
        );
        assert!(matches!(resp, ShardResponse::Ok));

        // verify fields
        assert_eq!(ks.hget("myhash2", "f1").unwrap(), Some(Bytes::from("v1")));
        assert_eq!(ks.hget("myhash2", "f2").unwrap(), Some(Bytes::from("v2")));
    }

    // `is_write()` gates AOF persistence and replication — a read command
    // misclassified as a write bloats the AOF; a write misclassified as a
    // read is silently lost on restart. Spot-check both directions.
    #[test]
    fn is_write_classifies_correctly() {
        // write commands
        assert!(ShardRequest::Set {
            key: "k".into(),
            value: Bytes::from("v"),
            expire: None,
            nx: false,
            xx: false,
        }
        .is_write());
        assert!(ShardRequest::Del { key: "k".into() }.is_write());
        assert!(ShardRequest::Incr { key: "k".into() }.is_write());
        assert!(ShardRequest::LPush {
            key: "k".into(),
            values: vec![],
        }
        .is_write());
        assert!(ShardRequest::HSet {
            key: "k".into(),
            fields: vec![],
        }
        .is_write());
        assert!(ShardRequest::SAdd {
            key: "k".into(),
            members: vec![],
        }
        .is_write());
        assert!(ShardRequest::FlushDb.is_write());

        // read commands
        assert!(!ShardRequest::Get { key: "k".into() }.is_write());
        assert!(!ShardRequest::Exists { key: "k".into() }.is_write());
        assert!(!ShardRequest::Ttl { key: "k".into() }.is_write());
        assert!(!ShardRequest::DbSize.is_write());
        assert!(!ShardRequest::Stats.is_write());
        assert!(!ShardRequest::LLen { key: "k".into() }.is_write());
        assert!(!ShardRequest::HGet {
            key: "k".into(),
            field: "f".into(),
        }
        .is_write());
        assert!(!ShardRequest::SMembers { key: "k".into() }.is_write());
    }
}