Skip to main content

ember_core/shard/
mod.rs

1//! Shard: an independent partition of the keyspace.
2//!
3//! ## thread-per-core, shared-nothing model
4//!
5//! Each shard runs as a single tokio task that exclusively owns its [`Keyspace`]
6//! partition. All mutations execute serially inside one task — no mutex, no
7//! read-write lock, no cross-thread coordination on the hot path. This is the
8//! core design choice that enables predictable latency: a shard thread can never
9//! be stalled waiting for another thread to release a lock.
10//!
11//! ## backpressure via bounded channel
12//!
//! The bounded mpsc buffer (capacity chosen per shard at creation time via the
//! `buffer` argument) is the system's flow-control valve. When a shard is fully
//! loaded, `try_send` returns `Err(Full)` immediately, giving the caller a clear
//! signal to shed load or return an error to the client rather than quietly
//! growing an unbounded queue. This prevents memory blow-up under sustained overload.
17//!
18//! ## per-request oneshot channels
19//!
20//! Each [`ShardMessage`] carries a `oneshot::Sender<ShardResponse>`. The caller
21//! blocks on the receiver while the shard processes the request. This gives O(1)
22//! delivery with no shared response queue and no head-of-line blocking between
23//! concurrent callers — each caller waits only on its own future.
24//!
25//! ## pipeline draining
26//!
27//! After waking from `select!`, the event loop drains the channel with `try_recv()`
28//! before re-entering `select!`. This amortizes scheduler wake-up overhead across
29//! burst traffic — essential for pipelined clients that send dozens of commands
30//! back-to-back without waiting for individual responses.
31//!
32//! ## AOF linearizability
33//!
34//! Because all mutations execute serially in one task, the AOF record sequence is a
35//! perfect linearized history of the keyspace. The per-shard monotonically increasing
36//! offset is a consistent position marker that replicas use to detect gaps and
37//! trigger re-sync when they fall behind.
38//!
39//! ## mechanical sympathy
40//!
41//! The hot path (`recv → dispatch → respond`) touches only shard-local memory. No
42//! cross-thread cache-line contention. This is why sharded mode outperforms
43//! mutex-per-command designs by 3–5× on write-heavy workloads — the CPU's cache
44//! hierarchy can stay warm on data that belongs to this core.
45
46mod aof;
47mod blocking;
48mod persistence;
49
50use std::collections::{HashMap, VecDeque};
51use std::path::PathBuf;
52use std::time::Duration;
53
54use bytes::Bytes;
55use ember_persistence::aof::{AofRecord, AofWriter, FsyncPolicy};
56use ember_persistence::recovery::{self, RecoveredValue};
57use ember_persistence::snapshot::{self, SnapEntry, SnapValue, SnapshotWriter};
58use smallvec::{smallvec, SmallVec};
59use tokio::sync::{broadcast, mpsc, oneshot};
60use tracing::{debug, error, info, warn};
61
62use crate::dropper::DropHandle;
63use crate::error::ShardError;
64use crate::expiry;
65use crate::keyspace::{
66    IncrError, IncrFloatError, Keyspace, KeyspaceStats, LsetError, SetResult, ShardConfig,
67    TtlResult, WriteError,
68};
69use crate::types::sorted_set::{ScoreBound, ZAddFlags};
70use crate::types::Value;
71
/// How often the shard runs active expiration. 100ms matches
/// Redis's hz=10 default and keeps CPU overhead negligible.
const EXPIRY_TICK: Duration = Duration::from_millis(100);

/// How often to fsync the AOF file when using the `EverySec` policy.
const FSYNC_INTERVAL: Duration = Duration::from_secs(1);
78
/// A mutation event broadcast to replication subscribers.
///
/// Published after every successful mutation on the hot path. The
/// `offset` is per-shard and monotonically increasing — replicas use it
/// to detect gaps and trigger re-sync when they fall behind.
#[derive(Debug, Clone)]
pub struct ReplicationEvent {
    /// The shard that produced this event.
    pub shard_id: u16,
    /// Monotonically increasing per-shard offset.
    pub offset: u64,
    /// The mutation record, ready to replay on a replica.
    pub record: AofRecord,
}
93
/// Optional persistence configuration for a shard.
///
/// Passed to [`prepare_shard`]/[`spawn_shard`]; when absent the shard
/// runs purely in memory with no recovery on restart.
#[derive(Debug, Clone)]
pub struct ShardPersistenceConfig {
    /// Directory where AOF and snapshot files live.
    pub data_dir: PathBuf,
    /// Whether to write an AOF log of mutations.
    pub append_only: bool,
    /// When to fsync the AOF file.
    pub fsync_policy: FsyncPolicy,
    /// Optional encryption key for encrypting data at rest.
    /// When set, AOF and snapshot files use the v3 encrypted format.
    #[cfg(feature = "encryption")]
    pub encryption_key: Option<ember_persistence::encryption::EncryptionKey>,
}
108
/// A protocol-agnostic command sent to a shard.
#[derive(Debug)]
pub enum ShardRequest {
    /// Reads the value stored at `key`.
    Get {
        key: String,
    },
    /// Stores a string value, optionally with an expiry and NX/XX conditions.
    Set {
        key: String,
        value: Bytes,
        expire: Option<Duration>,
        /// Only set the key if it does not already exist.
        nx: bool,
        /// Only set the key if it already exists.
        xx: bool,
    },
    /// Increments the integer value at `key` by one.
    Incr {
        key: String,
    },
    /// Decrements the integer value at `key` by one.
    Decr {
        key: String,
    },
    /// Increments the integer value at `key` by `delta`.
    IncrBy {
        key: String,
        delta: i64,
    },
    /// Decrements the integer value at `key` by `delta`.
    DecrBy {
        key: String,
        delta: i64,
    },
    /// Increments the float value at `key` by `delta`.
    IncrByFloat {
        key: String,
        delta: f64,
    },
    /// Appends bytes to the string value at `key`.
    Append {
        key: String,
        value: Bytes,
    },
    /// Returns the byte length of the string value at `key`.
    Strlen {
        key: String,
    },
    /// Returns a substring of the value at `key` by `start`/`end` offsets.
    GetRange {
        key: String,
        start: i64,
        end: i64,
    },
    /// Overwrites part of the string at `key`, starting at `offset`.
    SetRange {
        key: String,
        offset: usize,
        value: Bytes,
    },
    /// Returns all keys matching a glob pattern in this shard.
    Keys {
        pattern: String,
    },
    /// Renames a key within this shard.
    Rename {
        key: String,
        newkey: String,
    },
    /// Copies the value at source to destination within this shard.
    Copy {
        source: String,
        destination: String,
        replace: bool,
    },
    /// Returns the internal encoding name for the value at key.
    ObjectEncoding {
        key: String,
    },
    /// Removes `key` (value is dropped inline; see `Unlink` for deferred drop).
    Del {
        key: String,
    },
    /// Like DEL but defers value deallocation to the background drop thread.
    Unlink {
        key: String,
    },
    /// Returns whether `key` exists.
    Exists {
        key: String,
    },
    /// Returns a random key from the shard's keyspace.
    RandomKey,
    /// Updates last access time for a key. Returns bool (existed).
    Touch {
        key: String,
    },
    /// Sorts elements from a list, set, or sorted set in this shard.
    Sort {
        key: String,
        desc: bool,
        alpha: bool,
        limit: Option<(i64, i64)>,
    },
    /// Sets a time-to-live in seconds on `key`.
    Expire {
        key: String,
        seconds: u64,
    },
    /// Queries the remaining time-to-live of `key` (second resolution).
    Ttl {
        key: String,
    },
    /// Removes any time-to-live from `key`.
    Persist {
        key: String,
    },
    /// Queries the remaining time-to-live of `key` (millisecond resolution).
    Pttl {
        key: String,
    },
    /// Sets a time-to-live in milliseconds on `key`.
    Pexpire {
        key: String,
        milliseconds: u64,
    },
    /// Pushes values onto the head of the list at `key`.
    LPush {
        key: String,
        values: Vec<Bytes>,
    },
    /// Pushes values onto the tail of the list at `key`.
    RPush {
        key: String,
        values: Vec<Bytes>,
    },
    /// Pops one element from the head of the list at `key`.
    LPop {
        key: String,
    },
    /// Pops one element from the tail of the list at `key`.
    RPop {
        key: String,
    },
    /// Blocking left-pop. If the list has elements, pops immediately and sends
    /// the result on `waiter`. If empty, the shard registers the waiter to be
    /// woken when an element is pushed. Uses an mpsc sender so multiple shards
    /// can race to deliver the first result to a single receiver.
    BLPop {
        key: String,
        waiter: mpsc::Sender<(String, Bytes)>,
    },
    /// Blocking right-pop. Same semantics as BLPop but pops from the tail.
    BRPop {
        key: String,
        waiter: mpsc::Sender<(String, Bytes)>,
    },
    /// Returns list elements between `start` and `stop`.
    LRange {
        key: String,
        start: i64,
        stop: i64,
    },
    /// Returns the length of the list at `key`.
    LLen {
        key: String,
    },
    /// Returns the list element at `index`.
    LIndex {
        key: String,
        index: i64,
    },
    /// Replaces the list element at `index` with `value`.
    LSet {
        key: String,
        index: i64,
        value: Bytes,
    },
    /// Trims the list at `key` to the range `start..=stop`.
    LTrim {
        key: String,
        start: i64,
        stop: i64,
    },
    /// Inserts `value` before or after the pivot element.
    LInsert {
        key: String,
        before: bool,
        pivot: Bytes,
        value: Bytes,
    },
    /// Removes occurrences of `value` from the list (LREM `count` semantics).
    LRem {
        key: String,
        count: i64,
        value: Bytes,
    },
    /// Finds positions of `element` in the list at `key`.
    LPos {
        key: String,
        element: Bytes,
        rank: i64,
        count: usize,
        maxlen: usize,
    },
    /// Returns the type name of the value at `key`.
    Type {
        key: String,
    },
    /// Adds scored members to the sorted set at `key`, honoring the
    /// NX/XX/GT/LT/CH flags.
    ZAdd {
        key: String,
        members: Vec<(f64, String)>,
        nx: bool,
        xx: bool,
        gt: bool,
        lt: bool,
        ch: bool,
    },
    /// Removes members from the sorted set at `key`.
    ZRem {
        key: String,
        members: Vec<String>,
    },
    /// Returns the score of `member` in the sorted set at `key`.
    ZScore {
        key: String,
        member: String,
    },
    /// Returns the ascending rank of `member`.
    ZRank {
        key: String,
        member: String,
    },
    /// Returns the descending rank of `member`.
    ZRevRank {
        key: String,
        member: String,
    },
    /// Returns the cardinality of the sorted set at `key`.
    ZCard {
        key: String,
    },
    /// Returns members by rank range, ascending.
    ZRange {
        key: String,
        start: i64,
        stop: i64,
        with_scores: bool,
    },
    /// Returns members by rank range, descending.
    ZRevRange {
        key: String,
        start: i64,
        stop: i64,
        with_scores: bool,
    },
    /// Counts members whose scores fall within the `min`/`max` bounds.
    ZCount {
        key: String,
        min: ScoreBound,
        max: ScoreBound,
    },
    /// Increments the score of `member` by `increment`.
    ZIncrBy {
        key: String,
        increment: f64,
        member: String,
    },
    /// Returns members by score range, ascending, with offset/count paging.
    ZRangeByScore {
        key: String,
        min: ScoreBound,
        max: ScoreBound,
        offset: usize,
        count: Option<usize>,
    },
    /// Returns members by score range, descending, with offset/count paging.
    ZRevRangeByScore {
        key: String,
        min: ScoreBound,
        max: ScoreBound,
        offset: usize,
        count: Option<usize>,
    },
    /// Pops up to `count` lowest-scored members.
    ZPopMin {
        key: String,
        count: usize,
    },
    /// Pops up to `count` highest-scored members.
    ZPopMax {
        key: String,
        count: usize,
    },
    /// Sets field-value pairs on the hash at `key`.
    HSet {
        key: String,
        fields: Vec<(String, Bytes)>,
    },
    /// Returns the value of one hash field.
    HGet {
        key: String,
        field: String,
    },
    /// Returns all field-value pairs of the hash at `key`.
    HGetAll {
        key: String,
    },
    /// Deletes fields from the hash at `key`.
    HDel {
        key: String,
        fields: Vec<String>,
    },
    /// Returns whether `field` exists in the hash at `key`.
    HExists {
        key: String,
        field: String,
    },
    /// Returns the number of fields in the hash at `key`.
    HLen {
        key: String,
    },
    /// Increments the integer value of a hash field by `delta`.
    HIncrBy {
        key: String,
        field: String,
        delta: i64,
    },
    /// Returns all field names of the hash at `key`.
    HKeys {
        key: String,
    },
    /// Returns all values of the hash at `key`.
    HVals {
        key: String,
    },
    /// Returns the values of multiple hash fields.
    HMGet {
        key: String,
        fields: Vec<String>,
    },
    /// Adds members to the set at `key`.
    SAdd {
        key: String,
        members: Vec<String>,
    },
    /// Removes members from the set at `key`.
    SRem {
        key: String,
        members: Vec<String>,
    },
    /// Returns all members of the set at `key`.
    SMembers {
        key: String,
    },
    /// Returns whether `member` is in the set at `key`.
    SIsMember {
        key: String,
        member: String,
    },
    /// Returns the cardinality of the set at `key`.
    SCard {
        key: String,
    },
    /// Returns the union of the sets at `keys`.
    SUnion {
        keys: Vec<String>,
    },
    /// Returns the intersection of the sets at `keys`.
    SInter {
        keys: Vec<String>,
    },
    /// Returns the difference of the sets at `keys`.
    SDiff {
        keys: Vec<String>,
    },
    /// Stores the union of the sets at `keys` into `dest`.
    SUnionStore {
        dest: String,
        keys: Vec<String>,
    },
    /// Stores the intersection of the sets at `keys` into `dest`.
    SInterStore {
        dest: String,
        keys: Vec<String>,
    },
    /// Stores the difference of the sets at `keys` into `dest`.
    SDiffStore {
        dest: String,
        keys: Vec<String>,
    },
    /// Returns random members from the set at `key` (SRANDMEMBER `count` semantics).
    SRandMember {
        key: String,
        count: i64,
    },
    /// Removes and returns up to `count` random members from the set at `key`.
    SPop {
        key: String,
        count: usize,
    },
    /// Returns a membership flag for each of `members`.
    SMisMember {
        key: String,
        members: Vec<String>,
    },
    /// Returns the key count for this shard.
    DbSize,
    /// Returns keyspace stats for this shard.
    Stats,
    /// Returns the current version of a key for WATCH optimistic locking.
    /// Read-only, no AOF, no replication — cold path only.
    KeyVersion {
        key: String,
    },
    /// Triggers a snapshot write.
    Snapshot,
    /// Serializes the current shard state to bytes (in-memory snapshot).
    ///
    /// Used by the replication server to capture a consistent shard
    /// snapshot for transmission to a new replica without filesystem I/O.
    SerializeSnapshot,
    /// Triggers an AOF rewrite (snapshot + truncate AOF).
    RewriteAof,
    /// Clears all keys from the keyspace.
    FlushDb,
    /// Clears all keys, deferring deallocation to the background drop thread.
    FlushDbAsync,
    /// Scans keys in the keyspace.
    Scan {
        cursor: u64,
        count: usize,
        pattern: Option<String>,
    },
    /// Incrementally iterates set members.
    SScan {
        key: String,
        cursor: u64,
        count: usize,
        pattern: Option<String>,
    },
    /// Incrementally iterates hash fields.
    HScan {
        key: String,
        cursor: u64,
        count: usize,
        pattern: Option<String>,
    },
    /// Incrementally iterates sorted set members.
    ZScan {
        key: String,
        cursor: u64,
        count: usize,
        pattern: Option<String>,
    },
    /// Counts keys in this shard that hash to the given cluster slot.
    CountKeysInSlot {
        slot: u16,
    },
    /// Returns up to `count` keys that hash to the given cluster slot.
    GetKeysInSlot {
        slot: u16,
        count: usize,
    },
    /// Dumps a key's value as serialized bytes for MIGRATE.
    DumpKey {
        key: String,
    },
    /// Restores a key from serialized bytes (received via MIGRATE).
    RestoreKey {
        key: String,
        ttl_ms: u64,
        data: bytes::Bytes,
        replace: bool,
    },
    /// Adds a vector to a vector set.
    #[cfg(feature = "vector")]
    VAdd {
        key: String,
        element: String,
        vector: Vec<f32>,
        metric: u8,
        quantization: u8,
        connectivity: u32,
        expansion_add: u32,
    },
    /// Adds multiple vectors to a vector set in a single command.
    #[cfg(feature = "vector")]
    VAddBatch {
        key: String,
        entries: Vec<(String, Vec<f32>)>,
        dim: usize,
        metric: u8,
        quantization: u8,
        connectivity: u32,
        expansion_add: u32,
    },
    /// Searches for nearest neighbors in a vector set.
    #[cfg(feature = "vector")]
    VSim {
        key: String,
        query: Vec<f32>,
        count: usize,
        ef_search: usize,
    },
    /// Removes an element from a vector set.
    #[cfg(feature = "vector")]
    VRem {
        key: String,
        element: String,
    },
    /// Gets the stored vector for an element.
    #[cfg(feature = "vector")]
    VGet {
        key: String,
        element: String,
    },
    /// Returns the number of elements in a vector set.
    #[cfg(feature = "vector")]
    VCard {
        key: String,
    },
    /// Returns the dimensionality of a vector set.
    #[cfg(feature = "vector")]
    VDim {
        key: String,
    },
    /// Returns metadata about a vector set.
    #[cfg(feature = "vector")]
    VInfo {
        key: String,
    },
    /// Stores a validated protobuf value.
    #[cfg(feature = "protobuf")]
    ProtoSet {
        key: String,
        type_name: String,
        data: Bytes,
        expire: Option<Duration>,
        nx: bool,
        xx: bool,
    },
    /// Retrieves a protobuf value.
    #[cfg(feature = "protobuf")]
    ProtoGet {
        key: String,
    },
    /// Returns the protobuf message type name for a key.
    #[cfg(feature = "protobuf")]
    ProtoType {
        key: String,
    },
    /// Writes a ProtoRegister AOF record (no keyspace mutation).
    /// Broadcast to all shards after a schema registration so the
    /// schema is recovered from any shard's AOF on restart.
    #[cfg(feature = "protobuf")]
    ProtoRegisterAof {
        name: String,
        descriptor: Bytes,
    },
    /// Atomically reads a proto value, sets a field, and writes it back.
    /// Runs entirely within the shard's single-threaded dispatch.
    #[cfg(feature = "protobuf")]
    ProtoSetField {
        key: String,
        field_path: String,
        value: String,
    },
    /// Atomically reads a proto value, clears a field, and writes it back.
    /// Runs entirely within the shard's single-threaded dispatch.
    #[cfg(feature = "protobuf")]
    ProtoDelField {
        key: String,
        field_path: String,
    },
}
618
619impl ShardRequest {
620    /// Returns `true` if this request mutates the keyspace and should be
621    /// rejected when the AOF disk is full. Read-only operations, admin
622    /// commands, and scan operations always proceed.
623    fn is_write(&self) -> bool {
624        #[allow(unreachable_patterns)]
625        match self {
626            ShardRequest::Set { .. }
627            | ShardRequest::Incr { .. }
628            | ShardRequest::Decr { .. }
629            | ShardRequest::IncrBy { .. }
630            | ShardRequest::DecrBy { .. }
631            | ShardRequest::IncrByFloat { .. }
632            | ShardRequest::Append { .. }
633            | ShardRequest::Del { .. }
634            | ShardRequest::Unlink { .. }
635            | ShardRequest::Rename { .. }
636            | ShardRequest::Copy { .. }
637            | ShardRequest::Expire { .. }
638            | ShardRequest::Persist { .. }
639            | ShardRequest::Pexpire { .. }
640            | ShardRequest::LPush { .. }
641            | ShardRequest::RPush { .. }
642            | ShardRequest::LPop { .. }
643            | ShardRequest::RPop { .. }
644            | ShardRequest::LSet { .. }
645            | ShardRequest::LTrim { .. }
646            | ShardRequest::LInsert { .. }
647            | ShardRequest::LRem { .. }
648            | ShardRequest::BLPop { .. }
649            | ShardRequest::BRPop { .. }
650            | ShardRequest::ZAdd { .. }
651            | ShardRequest::ZRem { .. }
652            | ShardRequest::ZIncrBy { .. }
653            | ShardRequest::ZPopMin { .. }
654            | ShardRequest::ZPopMax { .. }
655            | ShardRequest::HSet { .. }
656            | ShardRequest::HDel { .. }
657            | ShardRequest::HIncrBy { .. }
658            | ShardRequest::SAdd { .. }
659            | ShardRequest::SRem { .. }
660            | ShardRequest::SPop { .. }
661            | ShardRequest::SUnionStore { .. }
662            | ShardRequest::SInterStore { .. }
663            | ShardRequest::SDiffStore { .. }
664            | ShardRequest::FlushDb
665            | ShardRequest::FlushDbAsync
666            | ShardRequest::RestoreKey { .. } => true,
667            #[cfg(feature = "protobuf")]
668            ShardRequest::ProtoSet { .. }
669            | ShardRequest::ProtoRegisterAof { .. }
670            | ShardRequest::ProtoSetField { .. }
671            | ShardRequest::ProtoDelField { .. } => true,
672            #[cfg(feature = "vector")]
673            ShardRequest::VAdd { .. }
674            | ShardRequest::VAddBatch { .. }
675            | ShardRequest::VRem { .. } => true,
676            _ => false,
677        }
678    }
679}
680
/// The shard's response to a request.
///
/// Variants that carry "applied"/"removed" payloads exist so the caller
/// can persist to the AOF exactly what the shard actually changed, not
/// what the client asked for.
#[derive(Debug)]
pub enum ShardResponse {
    /// A value (or None for a cache miss).
    Value(Option<Value>),
    /// Simple acknowledgement (e.g. SET).
    Ok,
    /// Integer result (e.g. INCR, DECR).
    Integer(i64),
    /// Boolean result (e.g. DEL, EXISTS, EXPIRE).
    Bool(bool),
    /// TTL query result.
    Ttl(TtlResult),
    /// Memory limit reached and eviction policy is NoEviction.
    OutOfMemory,
    /// Key count for a shard (DBSIZE).
    KeyCount(usize),
    /// Full stats for a shard (INFO).
    Stats(KeyspaceStats),
    /// Integer length result (e.g. LPUSH, RPUSH, LLEN).
    Len(usize),
    /// Array of bulk values (e.g. LRANGE).
    Array(Vec<Bytes>),
    /// The type name of a stored value.
    TypeName(&'static str),
    /// The encoding name of a stored value, or None if the key doesn't exist.
    EncodingName(Option<&'static str>),
    /// ZADD result: count for the client + actually applied members for AOF.
    ZAddLen {
        count: usize,
        applied: Vec<(f64, String)>,
    },
    /// ZREM result: count for the client + actually removed members for AOF.
    ZRemLen { count: usize, removed: Vec<String> },
    /// Float score result (e.g. ZSCORE).
    Score(Option<f64>),
    /// Rank result (e.g. ZRANK).
    Rank(Option<usize>),
    /// Scored array of (member, score) pairs (e.g. ZRANGE).
    ScoredArray(Vec<(String, f64)>),
    /// ZINCRBY result: new score + the member/score for AOF persistence.
    ZIncrByResult { new_score: f64, member: String },
    /// ZPOPMIN/ZPOPMAX result: popped members for both response and AOF.
    ZPopResult(Vec<(String, f64)>),
    /// A bulk string result (e.g. INCRBYFLOAT).
    BulkString(String),
    /// Command used against a key holding the wrong kind of value.
    WrongType,
    /// An error message.
    Err(String),
    /// Scan result: next cursor and list of keys.
    Scan { cursor: u64, keys: Vec<String> },
    /// SSCAN/HSCAN/ZSCAN result: next cursor and pre-flattened items.
    CollectionScan { cursor: u64, items: Vec<Bytes> },
    /// HGETALL result: all field-value pairs.
    HashFields(Vec<(String, Bytes)>),
    /// HDEL result: removed count + field names for AOF.
    HDelLen { count: usize, removed: Vec<String> },
    /// Array of strings (e.g. HKEYS).
    StringArray(Vec<String>),
    /// Array of integer positions (e.g. LPOS).
    IntegerArray(Vec<i64>),
    /// Array of booleans (e.g. SMISMEMBER).
    BoolArray(Vec<bool>),
    /// SUNIONSTORE/SINTERSTORE/SDIFFSTORE result: count + stored members for AOF.
    SetStoreResult { count: usize, members: Vec<String> },
    /// Serialized key dump with remaining TTL (for MIGRATE/DUMP).
    KeyDump { data: Vec<u8>, ttl_ms: i64 },
    /// In-memory snapshot of the full shard state (for replication).
    SnapshotData { shard_id: u16, data: Vec<u8> },
    /// HMGET result: array of optional values.
    OptionalArray(Vec<Option<Bytes>>),
    /// VADD result: element, vector, and whether it was newly added.
    #[cfg(feature = "vector")]
    VAddResult {
        element: String,
        vector: Vec<f32>,
        added: bool,
    },
    /// VADD_BATCH result: count of newly added elements + applied entries for AOF.
    #[cfg(feature = "vector")]
    VAddBatchResult {
        added_count: usize,
        applied: Vec<(String, Vec<f32>)>,
    },
    /// VSIM result: nearest neighbors with distances.
    #[cfg(feature = "vector")]
    VSimResult(Vec<(String, f32)>),
    /// VGET result: stored vector or None.
    #[cfg(feature = "vector")]
    VectorData(Option<Vec<f32>>),
    /// VINFO result: vector set metadata.
    #[cfg(feature = "vector")]
    VectorInfo(Option<Vec<(String, String)>>),
    /// PROTO.GET result: (type_name, data, remaining_ttl) or None.
    #[cfg(feature = "protobuf")]
    ProtoValue(Option<(String, Bytes, Option<Duration>)>),
    /// PROTO.TYPE result: message type name or None.
    #[cfg(feature = "protobuf")]
    ProtoTypeName(Option<String>),
    /// Result of an atomic SETFIELD/DELFIELD: carries the updated value
    /// for AOF persistence.
    #[cfg(feature = "protobuf")]
    ProtoFieldUpdated {
        type_name: String,
        data: Bytes,
        expire: Option<Duration>,
    },
    /// Key version for WATCH optimistic locking. `None` means missing/expired.
    Version(Option<u64>),
}
792
/// A request (or batch of requests) bundled with reply channels.
///
/// The `Batch` variant reduces channel traffic during pipelining: instead
/// of N individual sends (one per pipelined command), the connection handler
/// groups commands by target shard and sends one `Batch` message per shard.
/// This cuts channel contention from O(pipeline_depth) to O(shard_count).
///
/// The `SingleReusable` variant avoids per-command `oneshot::channel()`
/// allocation on the P=1 (no pipeline) path. The connection handler keeps
/// a long-lived `mpsc::channel(1)` and reuses it across commands.
#[derive(Debug)]
pub enum ShardMessage {
    /// A single request with its reply channel.
    Single {
        request: ShardRequest,
        reply: oneshot::Sender<ShardResponse>,
    },
    /// A single request using a reusable mpsc reply channel.
    ///
    /// Avoids the heap allocation of `oneshot::channel()` on every command.
    /// Used for the P=1 fast path where the connection handler sends one
    /// command at a time and waits for the response before sending the next.
    SingleReusable {
        request: ShardRequest,
        reply: mpsc::Sender<ShardResponse>,
    },
    /// Multiple requests batched for a single channel send.
    Batch(Vec<(ShardRequest, oneshot::Sender<ShardResponse>)>),
}
822
/// A cloneable handle for sending commands to a shard task.
///
/// Wraps the mpsc sender so callers don't need to manage oneshot
/// channels directly.
#[derive(Debug, Clone)]
pub struct ShardHandle {
    /// Sending half of the shard's bounded command channel.
    tx: mpsc::Sender<ShardMessage>,
}
831
832impl ShardHandle {
833    /// Sends a request and waits for the response.
834    ///
835    /// Returns `ShardError::Unavailable` if the shard task has stopped.
836    pub async fn send(&self, request: ShardRequest) -> Result<ShardResponse, ShardError> {
837        let rx = self.dispatch(request).await?;
838        rx.await.map_err(|_| ShardError::Unavailable)
839    }
840
841    /// Sends a request and returns the reply channel without waiting
842    /// for the response. Used by `Engine::broadcast` to fan out to
843    /// all shards before collecting results, and by
844    /// `Engine::dispatch_to_shard` for the dispatch-collect pipeline.
845    pub async fn dispatch(
846        &self,
847        request: ShardRequest,
848    ) -> Result<oneshot::Receiver<ShardResponse>, ShardError> {
849        let (reply_tx, reply_rx) = oneshot::channel();
850        let msg = ShardMessage::Single {
851            request,
852            reply: reply_tx,
853        };
854        self.tx
855            .send(msg)
856            .await
857            .map_err(|_| ShardError::Unavailable)?;
858        Ok(reply_rx)
859    }
860
861    /// Sends a request using a caller-owned reply channel.
862    ///
863    /// Unlike `dispatch()`, this doesn't allocate a oneshot per call.
864    /// The caller reuses the same `mpsc::Sender` across commands, saving
865    /// a heap allocation per P=1 round-trip.
866    pub async fn dispatch_reusable(
867        &self,
868        request: ShardRequest,
869        reply: mpsc::Sender<ShardResponse>,
870    ) -> Result<(), ShardError> {
871        self.tx
872            .send(ShardMessage::SingleReusable { request, reply })
873            .await
874            .map_err(|_| ShardError::Unavailable)
875    }
876
877    /// Sends a batch of requests as a single channel message.
878    ///
879    /// Returns one receiver per request, in the same order. For a single
880    /// request, falls through to `dispatch()` to avoid the batch overhead.
881    ///
882    /// This is the key optimization for pipelining: N commands targeting
883    /// the same shard consume 1 channel slot instead of N.
884    pub async fn dispatch_batch(
885        &self,
886        requests: Vec<ShardRequest>,
887    ) -> Result<Vec<oneshot::Receiver<ShardResponse>>, ShardError> {
888        if requests.len() == 1 {
889            let rx = self
890                .dispatch(requests.into_iter().next().expect("len == 1"))
891                .await?;
892            return Ok(vec![rx]);
893        }
894        let mut receivers = Vec::with_capacity(requests.len());
895        let mut entries = Vec::with_capacity(requests.len());
896        for request in requests {
897            let (tx, rx) = oneshot::channel();
898            entries.push((request, tx));
899            receivers.push(rx);
900        }
901        self.tx
902            .send(ShardMessage::Batch(entries))
903            .await
904            .map_err(|_| ShardError::Unavailable)?;
905        Ok(receivers)
906    }
907}
908
/// Everything needed to run a shard on a specific runtime.
///
/// Created by [`prepare_shard`] without spawning any tasks. The caller
/// is responsible for passing this to [`run_prepared`] on the desired
/// tokio runtime — this is the hook that enables thread-per-core
/// deployment where each worker thread runs its own shard.
pub struct PreparedShard {
    // Receiving half of the command channel; senders live in `ShardHandle`.
    rx: mpsc::Receiver<ShardMessage>,
    // Keyspace configuration for this shard (carries `shard_id`).
    config: ShardConfig,
    // Optional AOF/snapshot settings; `None` means memory-only.
    persistence: Option<ShardPersistenceConfig>,
    // Optional handle for deferring large deallocations to the drop thread.
    drop_handle: Option<DropHandle>,
    // Optional broadcast channel for publishing `ReplicationEvent`s.
    replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
    // Shared protobuf schema registry (protobuf feature only).
    #[cfg(feature = "protobuf")]
    schema_registry: Option<crate::schema::SharedSchemaRegistry>,
}
924
925/// Creates the channel and prepared shard without spawning any tasks.
926///
927/// Returns the `ShardHandle` for sending commands and the `PreparedShard`
928/// that must be driven on the target runtime via [`run_prepared`].
929pub fn prepare_shard(
930    buffer: usize,
931    config: ShardConfig,
932    persistence: Option<ShardPersistenceConfig>,
933    drop_handle: Option<DropHandle>,
934    replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
935    #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
936) -> (ShardHandle, PreparedShard) {
937    let (tx, rx) = mpsc::channel(buffer);
938    let prepared = PreparedShard {
939        rx,
940        config,
941        persistence,
942        drop_handle,
943        replication_tx,
944        #[cfg(feature = "protobuf")]
945        schema_registry,
946    };
947    (ShardHandle { tx }, prepared)
948}
949
/// Runs the shard's main loop. Call this on the target runtime.
///
/// Consumes the `PreparedShard` and enters the infinite recv/expiry/fsync
/// select loop. Returns when the channel is closed (all senders dropped).
pub async fn run_prepared(prepared: PreparedShard) {
    // Forward every prepared field into the loop; the cfg-gated field is
    // forwarded only when the protobuf feature is compiled in.
    run_shard(
        prepared.rx,
        prepared.config,
        prepared.persistence,
        prepared.drop_handle,
        prepared.replication_tx,
        #[cfg(feature = "protobuf")]
        prepared.schema_registry,
    )
    .await
}
966
/// Spawns a shard task and returns the handle for communicating with it.
///
/// `buffer` controls the mpsc channel capacity — higher values absorb
/// burst traffic at the cost of memory. When `drop_handle` is provided,
/// large value deallocations are deferred to the background drop thread.
///
/// This is a convenience wrapper around [`prepare_shard`] + [`run_prepared`]
/// for the common case where you want to spawn on the current runtime.
pub fn spawn_shard(
    buffer: usize,
    config: ShardConfig,
    persistence: Option<ShardPersistenceConfig>,
    drop_handle: Option<DropHandle>,
    replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
    #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
) -> ShardHandle {
    let (handle, prepared) = prepare_shard(
        buffer,
        config,
        persistence,
        drop_handle,
        replication_tx,
        #[cfg(feature = "protobuf")]
        schema_registry,
    );
    // The spawned task is detached: it runs until every `ShardHandle`
    // clone (and thus every channel sender) is dropped.
    tokio::spawn(run_prepared(prepared));
    handle
}
995
/// The shard's main loop. Processes messages and runs periodic
/// active expiration until the channel closes.
///
/// Phases, in order:
/// 1. build the keyspace from `config` and attach the background drop handle;
/// 2. recover persisted state (snapshot + AOF replay) when `persistence` is set;
/// 3. open the AOF writer when `append_only` is enabled;
/// 4. `select!` over message recv, the expiry tick, and — for
///    `FsyncPolicy::EverySec` — the fsync tick, until every sender drops;
/// 5. flush the AOF once on clean shutdown.
async fn run_shard(
    mut rx: mpsc::Receiver<ShardMessage>,
    config: ShardConfig,
    persistence: Option<ShardPersistenceConfig>,
    drop_handle: Option<DropHandle>,
    replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
    #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
) {
    let shard_id = config.shard_id;
    let mut keyspace = Keyspace::with_config(config);

    // route large value deallocations to the background drop thread when available
    if let Some(handle) = drop_handle.clone() {
        keyspace.set_drop_handle(handle);
    }

    // -- recovery --
    // replay snapshot + AOF into the keyspace before serving any traffic.
    if let Some(ref pcfg) = persistence {
        #[cfg(feature = "encryption")]
        let result = if let Some(ref key) = pcfg.encryption_key {
            recovery::recover_shard_encrypted(&pcfg.data_dir, shard_id, key.clone())
        } else {
            recovery::recover_shard(&pcfg.data_dir, shard_id)
        };
        #[cfg(not(feature = "encryption"))]
        let result = recovery::recover_shard(&pcfg.data_dir, shard_id);
        let count = result.entries.len();
        // rebuild each recovered entry into its live in-memory Value form
        for entry in result.entries {
            let value = match entry.value {
                RecoveredValue::String(data) => Value::String(data),
                RecoveredValue::List(deque) => Value::List(deque),
                RecoveredValue::SortedSet(members) => {
                    let mut ss = crate::types::sorted_set::SortedSet::new();
                    for (score, member) in members {
                        ss.add(&member, score);
                    }
                    Value::SortedSet(Box::new(ss))
                }
                RecoveredValue::Hash(map) => {
                    Value::Hash(Box::new(crate::types::hash::HashValue::from(map)))
                }
                RecoveredValue::Set(set) => Value::Set(Box::new(set)),
                #[cfg(feature = "vector")]
                RecoveredValue::Vector {
                    metric,
                    quantization,
                    connectivity,
                    expansion_add,
                    elements,
                } => {
                    use crate::types::vector::{DistanceMetric, QuantizationType, VectorSet};
                    // dimensionality is inferred from the first recovered element
                    // (0 when the set is empty)
                    let dim = elements.first().map(|(_, v)| v.len()).unwrap_or(0);
                    match VectorSet::new(
                        dim,
                        DistanceMetric::from_u8(metric),
                        QuantizationType::from_u8(quantization),
                        connectivity as usize,
                        expansion_add as usize,
                    ) {
                        Ok(mut vs) => {
                            // recovery is best-effort: log and keep going on bad elements
                            for (element, vector) in elements {
                                if let Err(e) = vs.add(element, &vector) {
                                    warn!("vector recovery: failed to add element: {e}");
                                }
                            }
                            Value::Vector(vs)
                        }
                        Err(e) => {
                            // skip this key entirely if the index can't be rebuilt
                            warn!("vector recovery: failed to create index: {e}");
                            continue;
                        }
                    }
                }
                #[cfg(feature = "protobuf")]
                RecoveredValue::Proto { type_name, data } => Value::Proto { type_name, data },
            };
            keyspace.restore(entry.key, value, entry.ttl);
        }
        if count > 0 {
            info!(
                shard_id,
                recovered_keys = count,
                snapshot = result.loaded_snapshot,
                aof = result.replayed_aof,
                "recovered shard state"
            );
        }

        // restore schemas found in the AOF into the shared registry
        #[cfg(feature = "protobuf")]
        if let Some(ref registry) = schema_registry {
            if !result.schemas.is_empty() {
                if let Ok(mut reg) = registry.write() {
                    let schema_count = result.schemas.len();
                    for (name, descriptor) in result.schemas {
                        reg.restore(name, descriptor);
                    }
                    info!(
                        shard_id,
                        schemas = schema_count,
                        "restored schemas from AOF"
                    );
                }
            }
        }
    }

    // -- AOF writer --
    // opened only when append-only persistence is enabled; a failed open is
    // logged and the shard keeps running without an AOF.
    let mut aof_writer: Option<AofWriter> = match &persistence {
        Some(pcfg) if pcfg.append_only => {
            let path = ember_persistence::aof::aof_path(&pcfg.data_dir, shard_id);
            #[cfg(feature = "encryption")]
            let result = if let Some(ref key) = pcfg.encryption_key {
                AofWriter::open_encrypted(path, key.clone())
            } else {
                AofWriter::open(path)
            };
            #[cfg(not(feature = "encryption"))]
            let result = AofWriter::open(path);
            match result {
                Ok(w) => Some(w),
                Err(e) => {
                    warn!(shard_id, "failed to open AOF writer: {e}");
                    None
                }
            }
        }
        _ => None,
    };

    // default to no fsync when persistence is disabled entirely
    let fsync_policy = persistence
        .as_ref()
        .map(|p| p.fsync_policy)
        .unwrap_or(FsyncPolicy::No);

    // monotonically increasing per-shard replication offset
    let mut replication_offset: u64 = 0;

    // waiter registries for blocking list operations (BLPOP/BRPOP)
    let mut lpop_waiters: HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>> = HashMap::new();
    let mut rpop_waiters: HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>> = HashMap::new();

    // consecutive AOF write/sync failure counter for rate-limited logging
    let mut aof_errors: u32 = 0;

    // when true, write commands are rejected with an error until disk
    // space is available again (detected on the periodic fsync tick).
    let mut disk_full: bool = false;

    // -- tickers --
    // Skip (rather than burst) missed ticks so a long pause under load
    // doesn't trigger a back-to-back run of expiry/fsync cycles.
    let mut expiry_tick = tokio::time::interval(EXPIRY_TICK);
    expiry_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    let mut fsync_tick = tokio::time::interval(FSYNC_INTERVAL);
    fsync_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            msg = rx.recv() => {
                match msg {
                    Some(msg) => {
                        // rebuilt per wake-up; borrows the loop-local state above
                        let mut ctx = ProcessCtx {
                            keyspace: &mut keyspace,
                            aof_writer: &mut aof_writer,
                            fsync_policy,
                            persistence: &persistence,
                            drop_handle: &drop_handle,
                            shard_id,
                            replication_tx: &replication_tx,
                            replication_offset: &mut replication_offset,
                            lpop_waiters: &mut lpop_waiters,
                            rpop_waiters: &mut rpop_waiters,
                            aof_errors: &mut aof_errors,
                            disk_full: &mut disk_full,
                            #[cfg(feature = "protobuf")]
                            schema_registry: &schema_registry,
                        };
                        process_message(msg, &mut ctx);

                        // drain any pending messages without re-entering select!.
                        // this amortizes the select! overhead across bursts of
                        // pipelined commands that arrived while we processed the
                        // first message.
                        while let Ok(msg) = rx.try_recv() {
                            process_message(msg, &mut ctx);
                        }
                    }
                    None => break, // channel closed, shard shutting down
                }
            }
            _ = expiry_tick.tick() => {
                expiry::run_expiration_cycle(&mut keyspace);
            }
            // periodic fsync applies only to the every-second policy; Always
            // syncs inline per write batch in process_single.
            _ = fsync_tick.tick(), if fsync_policy == FsyncPolicy::EverySec => {
                if let Some(ref mut writer) = aof_writer {
                    if let Err(e) = writer.sync() {
                        if aof::log_aof_error(shard_id, &mut aof_errors, "sync", &e) {
                            disk_full = true;
                        }
                    } else if aof_errors > 0 {
                        // a successful sync after failures clears the error
                        // streak and lifts the disk-full write rejection.
                        let missed = aof_errors;
                        aof_errors = 0;
                        if disk_full {
                            disk_full = false;
                            info!(shard_id, missed_errors = missed, "aof sync recovered, accepting writes again");
                        } else {
                            info!(shard_id, missed_errors = missed, "aof sync recovered");
                        }
                    }
                }
            }
        }
    }

    // flush AOF on clean shutdown
    if let Some(ref mut writer) = aof_writer {
        let _ = writer.sync();
    }
}
1216
/// Per-shard processing context passed into `process_message`.
///
/// Groups the mutable and configuration fields so the call site stays
/// readable and the parameter count stays reasonable.
struct ProcessCtx<'a> {
    /// The shard's exclusively-owned keyspace partition.
    keyspace: &'a mut Keyspace,
    /// AOF writer; `None` when append-only persistence is disabled or the
    /// writer failed to open.
    aof_writer: &'a mut Option<AofWriter>,
    /// Fsync policy; `Always` triggers an inline sync after each write batch.
    fsync_policy: FsyncPolicy,
    /// Persistence configuration, needed by snapshot/rewrite handling.
    persistence: &'a Option<ShardPersistenceConfig>,
    /// Background drop thread handle for deferred deallocation (e.g. FlushDbAsync).
    drop_handle: &'a Option<DropHandle>,
    /// This shard's numeric id, used in logs and replication events.
    shard_id: u16,
    /// Broadcast channel to replication subscribers, if replication is enabled.
    replication_tx: &'a Option<broadcast::Sender<ReplicationEvent>>,
    /// Monotonically increasing per-shard replication offset.
    replication_offset: &'a mut u64,
    /// Waiters for BLPOP — keyed by list name, FIFO order.
    lpop_waiters: &'a mut HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>>,
    /// Waiters for BRPOP — keyed by list name, FIFO order.
    rpop_waiters: &'a mut HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>>,
    /// Consecutive AOF write/sync failures. Used to rate-limit error logging
    /// so a sustained disk-full condition doesn't flood logs.
    aof_errors: &'a mut u32,
    /// When true, write commands are rejected until disk space recovers.
    disk_full: &'a mut bool,
    /// Shared protobuf schema registry consulted during dispatch.
    #[cfg(feature = "protobuf")]
    schema_registry: &'a Option<crate::schema::SharedSchemaRegistry>,
}
1242
1243/// Dispatches a single or batched message to the shard's keyspace.
1244///
1245/// Called both in the main `recv()` path and in the `try_recv()` drain loop
1246/// to amortize tokio select overhead across pipelined commands.
1247fn process_message(msg: ShardMessage, ctx: &mut ProcessCtx<'_>) {
1248    match msg {
1249        ShardMessage::Single { request, reply } => {
1250            process_single(request, ReplySender::Oneshot(reply), ctx);
1251        }
1252        ShardMessage::SingleReusable { request, reply } => {
1253            process_single(request, ReplySender::Reusable(reply), ctx);
1254        }
1255        ShardMessage::Batch(entries) => {
1256            for (request, reply) in entries {
1257                process_single(request, ReplySender::Oneshot(reply), ctx);
1258            }
1259        }
1260    }
1261}
1262
/// Wraps either a oneshot or reusable mpsc sender for reply delivery.
///
/// The oneshot path is used for pipelined and batch commands. The reusable
/// path avoids per-command allocation for the P=1 fast path.
enum ReplySender {
    /// Single-use reply channel; consumed by exactly one response.
    Oneshot(oneshot::Sender<ShardResponse>),
    /// Reusable bounded channel shared across a connection's commands.
    Reusable(mpsc::Sender<ShardResponse>),
}
1271
1272impl ReplySender {
1273    fn send(self, response: ShardResponse) {
1274        match self {
1275            ReplySender::Oneshot(tx) => {
1276                let _ = tx.send(response);
1277            }
1278            ReplySender::Reusable(tx) => {
1279                // capacity is 1 and the receiver always drains before
1280                // sending the next command, so try_send won't fail
1281                // under normal operation.
1282                if let Err(e) = tx.try_send(response) {
1283                    debug!("reusable reply channel full or closed: {e}");
1284                }
1285            }
1286        }
1287    }
1288}
1289
/// Processes a single request: dispatch, write AOF, broadcast replication,
/// and send the response on the reply channel.
///
/// Ordering matters throughout: blocking pops are intercepted before
/// dispatch, push wake-ups run while the request can still be borrowed,
/// AOF/replication consume the request, and snapshot/rewrite-style requests
/// are answered last because they need persistence state from `ctx`.
fn process_single(mut request: ShardRequest, reply: ReplySender, ctx: &mut ProcessCtx<'_>) {
    // copy cheap fields upfront to avoid field-borrow conflicts below
    let fsync_policy = ctx.fsync_policy;
    let shard_id = ctx.shard_id;

    // reject writes when AOF is enabled and disk is full. reads and admin
    // commands still go through so operators can inspect and recover.
    if *ctx.disk_full && ctx.aof_writer.is_some() && request.is_write() {
        reply.send(ShardResponse::Err(
            "ERR disk full, write rejected — free disk space to resume writes".into(),
        ));
        return;
    }

    // handle blocking pop requests before dispatch — they carry a waiter
    // oneshot that must be consumed here rather than going through the
    // normal dispatch → response path. (the moving arms return, so the
    // fall-through `_` arm leaves `request` intact for later use.)
    match request {
        ShardRequest::BLPop { key, waiter } => {
            blocking::handle_blocking_pop(&key, waiter, true, reply, ctx);
            return;
        }
        ShardRequest::BRPop { key, waiter } => {
            blocking::handle_blocking_pop(&key, waiter, false, reply, ctx);
            return;
        }
        _ => {}
    }

    // classify now so the post-dispatch special handling below doesn't need
    // to borrow the (by then consumed) request again.
    let request_kind = describe_request(&request);
    let mut response = dispatch(
        ctx.keyspace,
        &mut request,
        #[cfg(feature = "protobuf")]
        ctx.schema_registry,
    );

    // after LPush/RPush, check if any blocked clients are waiting.
    // done before consuming the request so we can borrow the key.
    if let ShardRequest::LPush { ref key, .. } | ShardRequest::RPush { ref key, .. } = request {
        if matches!(response, ShardResponse::Len(_)) {
            blocking::wake_blocked_waiters(key, ctx);
        }
    }

    // consume the request to move owned data into AOF records (avoids cloning).
    // response is &mut so VAddBatch can steal applied entries instead of cloning
    // vectors — the connection handler only uses added_count.
    let records = aof::to_aof_records(request, &mut response);

    // write AOF records for successful mutations
    if let Some(ref mut writer) = *ctx.aof_writer {
        let mut batch_ok = true;
        for record in &records {
            if let Err(e) = writer.write_record(record) {
                // log_aof_error rate-limits; a `true` return flags the
                // failure as a disk-full condition.
                if aof::log_aof_error(shard_id, ctx.aof_errors, "write", &e) {
                    *ctx.disk_full = true;
                }
                batch_ok = false;
            }
        }
        // under FsyncPolicy::Always, durability is confirmed inline per batch
        if !records.is_empty() && fsync_policy == FsyncPolicy::Always {
            if let Err(e) = writer.sync() {
                if aof::log_aof_error(shard_id, ctx.aof_errors, "sync", &e) {
                    *ctx.disk_full = true;
                }
                batch_ok = false;
            }
        }
        // a fully clean batch after a failure streak clears the error
        // counter and lifts the disk-full write rejection.
        if batch_ok && *ctx.aof_errors > 0 {
            let missed = *ctx.aof_errors;
            *ctx.aof_errors = 0;
            *ctx.disk_full = false;
            info!(shard_id, missed_errors = missed, "aof writes recovered");
        }
    }

    // broadcast mutation events to replication subscribers
    if let Some(ref tx) = *ctx.replication_tx {
        for record in records {
            *ctx.replication_offset += 1;
            // ignore send errors — no subscribers or lagged consumers
            let _ = tx.send(ReplicationEvent {
                shard_id,
                offset: *ctx.replication_offset,
                record,
            });
        }
    }

    // handle special requests that need access to persistence state
    match request_kind {
        RequestKind::Snapshot => {
            let resp = persistence::handle_snapshot(ctx.keyspace, ctx.persistence, shard_id);
            reply.send(resp);
            return;
        }
        RequestKind::SerializeSnapshot => {
            let resp = persistence::handle_serialize_snapshot(ctx.keyspace, shard_id);
            reply.send(resp);
            return;
        }
        RequestKind::RewriteAof => {
            let resp = persistence::handle_rewrite(
                ctx.keyspace,
                ctx.persistence,
                ctx.aof_writer,
                shard_id,
                #[cfg(feature = "protobuf")]
                ctx.schema_registry,
            );
            reply.send(resp);
            return;
        }
        RequestKind::FlushDbAsync => {
            // swap out the keyspace contents and hand the old entries to the
            // background drop thread when one is attached.
            let old_entries = ctx.keyspace.flush_async();
            if let Some(ref handle) = *ctx.drop_handle {
                handle.defer_entries(old_entries);
            }
            reply.send(ShardResponse::Ok);
            return;
        }
        RequestKind::Other => {}
    }

    reply.send(response);
}
1419
/// Lightweight tag so we can identify requests that need special
/// handling after dispatch without borrowing the request again.
enum RequestKind {
    /// Needs persistence config to write a snapshot to disk.
    Snapshot,
    /// Serializes the keyspace in memory; no persistence config needed.
    SerializeSnapshot,
    /// Rewrites the AOF; needs persistence config and the open writer.
    RewriteAof,
    /// Swaps out the keyspace and defers dropping the old entries.
    FlushDbAsync,
    /// Everything else — fully handled by `dispatch`.
    Other,
}
1429
1430fn describe_request(req: &ShardRequest) -> RequestKind {
1431    match req {
1432        ShardRequest::Snapshot => RequestKind::Snapshot,
1433        ShardRequest::SerializeSnapshot => RequestKind::SerializeSnapshot,
1434        ShardRequest::RewriteAof => RequestKind::RewriteAof,
1435        ShardRequest::FlushDbAsync => RequestKind::FlushDbAsync,
1436        _ => RequestKind::Other,
1437    }
1438}
1439
1440/// Converts an `IncrError` result into a `ShardResponse::Integer`.
1441fn incr_result(result: Result<i64, IncrError>) -> ShardResponse {
1442    match result {
1443        Ok(val) => ShardResponse::Integer(val),
1444        Err(IncrError::WrongType) => ShardResponse::WrongType,
1445        Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
1446        Err(e) => ShardResponse::Err(e.to_string()),
1447    }
1448}
1449
1450/// Converts a `WriteError` result into a `ShardResponse::Len`.
1451fn write_result_len(result: Result<usize, WriteError>) -> ShardResponse {
1452    match result {
1453        Ok(len) => ShardResponse::Len(len),
1454        Err(WriteError::WrongType) => ShardResponse::WrongType,
1455        Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1456    }
1457}
1458
1459fn store_set_response(result: Result<(usize, Vec<String>), WriteError>) -> ShardResponse {
1460    match result {
1461        Ok((count, members)) => ShardResponse::SetStoreResult { count, members },
1462        Err(WriteError::WrongType) => ShardResponse::WrongType,
1463        Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1464    }
1465}
1466
1467/// Routes a request to the appropriate keyspace operation and returns a response.
1468///
1469/// This is the hot path — every read and write goes through here.
1470fn dispatch(
1471    ks: &mut Keyspace,
1472    req: &mut ShardRequest,
1473    #[cfg(feature = "protobuf")] schema_registry: &Option<crate::schema::SharedSchemaRegistry>,
1474) -> ShardResponse {
1475    match req {
1476        ShardRequest::Get { key } => match ks.get_string(key) {
1477            Ok(val) => ShardResponse::Value(val.map(Value::String)),
1478            Err(_) => ShardResponse::WrongType,
1479        },
1480        ShardRequest::Set {
1481            key,
1482            value,
1483            expire,
1484            nx,
1485            xx,
1486        } => match ks.set(key.clone(), value.clone(), *expire, *nx, *xx) {
1487            SetResult::Ok => ShardResponse::Ok,
1488            SetResult::Blocked => ShardResponse::Value(None),
1489            SetResult::OutOfMemory => ShardResponse::OutOfMemory,
1490        },
1491        ShardRequest::Incr { key } => incr_result(ks.incr(key)),
1492        ShardRequest::Decr { key } => incr_result(ks.decr(key)),
1493        ShardRequest::IncrBy { key, delta } => incr_result(ks.incr_by(key, *delta)),
1494        ShardRequest::DecrBy { key, delta } => match delta.checked_neg() {
1495            Some(neg) => incr_result(ks.incr_by(key, neg)),
1496            None => ShardResponse::Err("ERR increment or decrement would overflow".into()),
1497        },
1498        ShardRequest::IncrByFloat { key, delta } => match ks.incr_by_float(key, *delta) {
1499            Ok(val) => ShardResponse::BulkString(val),
1500            Err(IncrFloatError::WrongType) => ShardResponse::WrongType,
1501            Err(IncrFloatError::OutOfMemory) => ShardResponse::OutOfMemory,
1502            Err(e) => ShardResponse::Err(e.to_string()),
1503        },
1504        ShardRequest::Append { key, value } => write_result_len(ks.append(key, value)),
1505        ShardRequest::Strlen { key } => match ks.strlen(key) {
1506            Ok(len) => ShardResponse::Len(len),
1507            Err(_) => ShardResponse::WrongType,
1508        },
1509        ShardRequest::GetRange { key, start, end } => match ks.getrange(key, *start, *end) {
1510            Ok(data) => ShardResponse::Value(Some(Value::String(data))),
1511            Err(_) => ShardResponse::WrongType,
1512        },
1513        ShardRequest::SetRange { key, offset, value } => {
1514            write_result_len(ks.setrange(key, *offset, value))
1515        }
1516        ShardRequest::Keys { pattern } => {
1517            let keys = ks.keys(pattern);
1518            ShardResponse::StringArray(keys)
1519        }
1520        ShardRequest::Rename { key, newkey } => {
1521            use crate::keyspace::RenameError;
1522            match ks.rename(key, newkey) {
1523                Ok(()) => ShardResponse::Ok,
1524                Err(RenameError::NoSuchKey) => ShardResponse::Err("ERR no such key".into()),
1525            }
1526        }
1527        ShardRequest::Copy {
1528            source,
1529            destination,
1530            replace,
1531        } => {
1532            use crate::keyspace::CopyError;
1533            match ks.copy(source, destination, *replace) {
1534                Ok(copied) => ShardResponse::Bool(copied),
1535                Err(CopyError::NoSuchKey) => ShardResponse::Err("ERR no such key".into()),
1536                Err(CopyError::OutOfMemory) => ShardResponse::OutOfMemory,
1537            }
1538        }
1539        ShardRequest::ObjectEncoding { key } => ShardResponse::EncodingName(ks.encoding(key)),
1540        ShardRequest::Del { key } => ShardResponse::Bool(ks.del(key)),
1541        ShardRequest::Unlink { key } => ShardResponse::Bool(ks.unlink(key)),
1542        ShardRequest::Exists { key } => ShardResponse::Bool(ks.exists(key)),
1543        ShardRequest::RandomKey => match ks.random_key() {
1544            Some(k) => ShardResponse::StringArray(vec![k]),
1545            None => ShardResponse::StringArray(vec![]),
1546        },
1547        ShardRequest::Touch { key } => ShardResponse::Bool(ks.touch(key)),
1548        ShardRequest::Sort {
1549            key,
1550            desc,
1551            alpha,
1552            limit,
1553        } => match ks.sort(key, *desc, *alpha, *limit) {
1554            Ok(items) => ShardResponse::Array(items),
1555            Err(_) => ShardResponse::WrongType,
1556        },
1557        ShardRequest::Expire { key, seconds } => ShardResponse::Bool(ks.expire(key, *seconds)),
1558        ShardRequest::Ttl { key } => ShardResponse::Ttl(ks.ttl(key)),
1559        ShardRequest::Persist { key } => ShardResponse::Bool(ks.persist(key)),
1560        ShardRequest::Pttl { key } => ShardResponse::Ttl(ks.pttl(key)),
1561        ShardRequest::Pexpire { key, milliseconds } => {
1562            ShardResponse::Bool(ks.pexpire(key, *milliseconds))
1563        }
1564        ShardRequest::LPush { key, values } => write_result_len(ks.lpush(key, values)),
1565        ShardRequest::RPush { key, values } => write_result_len(ks.rpush(key, values)),
1566        ShardRequest::LPop { key } => match ks.lpop(key) {
1567            Ok(val) => ShardResponse::Value(val.map(Value::String)),
1568            Err(_) => ShardResponse::WrongType,
1569        },
1570        ShardRequest::RPop { key } => match ks.rpop(key) {
1571            Ok(val) => ShardResponse::Value(val.map(Value::String)),
1572            Err(_) => ShardResponse::WrongType,
1573        },
1574        ShardRequest::LRange { key, start, stop } => match ks.lrange(key, *start, *stop) {
1575            Ok(items) => ShardResponse::Array(items),
1576            Err(_) => ShardResponse::WrongType,
1577        },
1578        ShardRequest::LLen { key } => match ks.llen(key) {
1579            Ok(len) => ShardResponse::Len(len),
1580            Err(_) => ShardResponse::WrongType,
1581        },
1582        ShardRequest::LIndex { key, index } => match ks.lindex(key, *index) {
1583            Ok(val) => ShardResponse::Value(val.map(Value::String)),
1584            Err(_) => ShardResponse::WrongType,
1585        },
1586        ShardRequest::LSet { key, index, value } => match ks.lset(key, *index, value.clone()) {
1587            Ok(()) => ShardResponse::Ok,
1588            Err(e) => match e {
1589                LsetError::WrongType => ShardResponse::WrongType,
1590                LsetError::NoSuchKey => ShardResponse::Err("ERR no such key".into()),
1591                LsetError::IndexOutOfRange => ShardResponse::Err("ERR index out of range".into()),
1592            },
1593        },
1594        ShardRequest::LTrim { key, start, stop } => match ks.ltrim(key, *start, *stop) {
1595            Ok(()) => ShardResponse::Ok,
1596            Err(_) => ShardResponse::WrongType,
1597        },
1598        ShardRequest::LInsert {
1599            key,
1600            before,
1601            pivot,
1602            value,
1603        } => match ks.linsert(key, *before, pivot, value.clone()) {
1604            Ok(n) => ShardResponse::Integer(n),
1605            Err(WriteError::WrongType) => ShardResponse::WrongType,
1606            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1607        },
1608        ShardRequest::LRem { key, count, value } => match ks.lrem(key, *count, value) {
1609            Ok(n) => ShardResponse::Len(n),
1610            Err(_) => ShardResponse::WrongType,
1611        },
1612        ShardRequest::LPos {
1613            key,
1614            element,
1615            rank,
1616            count,
1617            maxlen,
1618        } => match ks.lpos(key, element, *rank, *count, *maxlen) {
1619            Ok(positions) => ShardResponse::IntegerArray(positions),
1620            Err(_) => ShardResponse::WrongType,
1621        },
1622        ShardRequest::Type { key } => ShardResponse::TypeName(ks.value_type(key)),
1623        ShardRequest::ZAdd {
1624            key,
1625            members,
1626            nx,
1627            xx,
1628            gt,
1629            lt,
1630            ch,
1631        } => {
1632            let flags = ZAddFlags {
1633                nx: *nx,
1634                xx: *xx,
1635                gt: *gt,
1636                lt: *lt,
1637                ch: *ch,
1638            };
1639            match ks.zadd(key, members, &flags) {
1640                Ok(result) => ShardResponse::ZAddLen {
1641                    count: result.count,
1642                    applied: result.applied,
1643                },
1644                Err(WriteError::WrongType) => ShardResponse::WrongType,
1645                Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1646            }
1647        }
1648        ShardRequest::ZRem { key, members } => match ks.zrem(key, members) {
1649            Ok(removed) => ShardResponse::ZRemLen {
1650                count: removed.len(),
1651                removed,
1652            },
1653            Err(_) => ShardResponse::WrongType,
1654        },
1655        ShardRequest::ZScore { key, member } => match ks.zscore(key, member) {
1656            Ok(score) => ShardResponse::Score(score),
1657            Err(_) => ShardResponse::WrongType,
1658        },
1659        ShardRequest::ZRank { key, member } => match ks.zrank(key, member) {
1660            Ok(rank) => ShardResponse::Rank(rank),
1661            Err(_) => ShardResponse::WrongType,
1662        },
1663        ShardRequest::ZCard { key } => match ks.zcard(key) {
1664            Ok(len) => ShardResponse::Len(len),
1665            Err(_) => ShardResponse::WrongType,
1666        },
1667        ShardRequest::ZRevRank { key, member } => match ks.zrevrank(key, member) {
1668            Ok(rank) => ShardResponse::Rank(rank),
1669            Err(_) => ShardResponse::WrongType,
1670        },
1671        ShardRequest::ZRange {
1672            key, start, stop, ..
1673        } => match ks.zrange(key, *start, *stop) {
1674            Ok(items) => ShardResponse::ScoredArray(items),
1675            Err(_) => ShardResponse::WrongType,
1676        },
1677        ShardRequest::ZRevRange {
1678            key, start, stop, ..
1679        } => match ks.zrevrange(key, *start, *stop) {
1680            Ok(items) => ShardResponse::ScoredArray(items),
1681            Err(_) => ShardResponse::WrongType,
1682        },
1683        ShardRequest::ZCount { key, min, max } => match ks.zcount(key, *min, *max) {
1684            Ok(count) => ShardResponse::Len(count),
1685            Err(_) => ShardResponse::WrongType,
1686        },
1687        ShardRequest::ZIncrBy {
1688            key,
1689            increment,
1690            member,
1691        } => match ks.zincrby(key, *increment, member) {
1692            Ok(new_score) => ShardResponse::ZIncrByResult {
1693                new_score,
1694                member: member.clone(),
1695            },
1696            Err(WriteError::WrongType) => ShardResponse::WrongType,
1697            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1698        },
1699        ShardRequest::ZRangeByScore {
1700            key,
1701            min,
1702            max,
1703            offset,
1704            count,
1705        } => match ks.zrangebyscore(key, *min, *max, *offset, *count) {
1706            Ok(items) => ShardResponse::ScoredArray(items),
1707            Err(_) => ShardResponse::WrongType,
1708        },
1709        ShardRequest::ZRevRangeByScore {
1710            key,
1711            min,
1712            max,
1713            offset,
1714            count,
1715        } => match ks.zrevrangebyscore(key, *min, *max, *offset, *count) {
1716            Ok(items) => ShardResponse::ScoredArray(items),
1717            Err(_) => ShardResponse::WrongType,
1718        },
1719        ShardRequest::ZPopMin { key, count } => match ks.zpopmin(key, *count) {
1720            Ok(items) => ShardResponse::ZPopResult(items),
1721            Err(_) => ShardResponse::WrongType,
1722        },
1723        ShardRequest::ZPopMax { key, count } => match ks.zpopmax(key, *count) {
1724            Ok(items) => ShardResponse::ZPopResult(items),
1725            Err(_) => ShardResponse::WrongType,
1726        },
1727        ShardRequest::DbSize => ShardResponse::KeyCount(ks.len()),
1728        ShardRequest::Stats => ShardResponse::Stats(ks.stats()),
1729        ShardRequest::KeyVersion { ref key } => ShardResponse::Version(ks.key_version(key)),
1730        ShardRequest::FlushDb => {
1731            ks.clear();
1732            ShardResponse::Ok
1733        }
1734        ShardRequest::Scan {
1735            cursor,
1736            count,
1737            pattern,
1738        } => {
1739            let (next_cursor, keys) = ks.scan_keys(*cursor, *count, pattern.as_deref());
1740            ShardResponse::Scan {
1741                cursor: next_cursor,
1742                keys,
1743            }
1744        }
1745        ShardRequest::HSet { key, fields } => write_result_len(ks.hset(key, fields)),
1746        ShardRequest::HGet { key, field } => match ks.hget(key, field) {
1747            Ok(val) => ShardResponse::Value(val.map(Value::String)),
1748            Err(_) => ShardResponse::WrongType,
1749        },
1750        ShardRequest::HGetAll { key } => match ks.hgetall(key) {
1751            Ok(fields) => ShardResponse::HashFields(fields),
1752            Err(_) => ShardResponse::WrongType,
1753        },
1754        ShardRequest::HDel { key, fields } => match ks.hdel(key, fields) {
1755            Ok(removed) => ShardResponse::HDelLen {
1756                count: removed.len(),
1757                removed,
1758            },
1759            Err(_) => ShardResponse::WrongType,
1760        },
1761        ShardRequest::HExists { key, field } => match ks.hexists(key, field) {
1762            Ok(exists) => ShardResponse::Bool(exists),
1763            Err(_) => ShardResponse::WrongType,
1764        },
1765        ShardRequest::HLen { key } => match ks.hlen(key) {
1766            Ok(len) => ShardResponse::Len(len),
1767            Err(_) => ShardResponse::WrongType,
1768        },
1769        ShardRequest::HIncrBy { key, field, delta } => incr_result(ks.hincrby(key, field, *delta)),
1770        ShardRequest::HKeys { key } => match ks.hkeys(key) {
1771            Ok(keys) => ShardResponse::StringArray(keys),
1772            Err(_) => ShardResponse::WrongType,
1773        },
1774        ShardRequest::HVals { key } => match ks.hvals(key) {
1775            Ok(vals) => ShardResponse::Array(vals),
1776            Err(_) => ShardResponse::WrongType,
1777        },
1778        ShardRequest::HMGet { key, fields } => match ks.hmget(key, fields) {
1779            Ok(vals) => ShardResponse::OptionalArray(vals),
1780            Err(_) => ShardResponse::WrongType,
1781        },
1782        ShardRequest::SAdd { key, members } => write_result_len(ks.sadd(key, members)),
1783        ShardRequest::SRem { key, members } => match ks.srem(key, members) {
1784            Ok(count) => ShardResponse::Len(count),
1785            Err(_) => ShardResponse::WrongType,
1786        },
1787        ShardRequest::SMembers { key } => match ks.smembers(key) {
1788            Ok(members) => ShardResponse::StringArray(members),
1789            Err(_) => ShardResponse::WrongType,
1790        },
1791        ShardRequest::SIsMember { key, member } => match ks.sismember(key, member) {
1792            Ok(exists) => ShardResponse::Bool(exists),
1793            Err(_) => ShardResponse::WrongType,
1794        },
1795        ShardRequest::SCard { key } => match ks.scard(key) {
1796            Ok(count) => ShardResponse::Len(count),
1797            Err(_) => ShardResponse::WrongType,
1798        },
1799        ShardRequest::SUnion { keys } => match ks.sunion(keys) {
1800            Ok(members) => ShardResponse::StringArray(members),
1801            Err(_) => ShardResponse::WrongType,
1802        },
1803        ShardRequest::SInter { keys } => match ks.sinter(keys) {
1804            Ok(members) => ShardResponse::StringArray(members),
1805            Err(_) => ShardResponse::WrongType,
1806        },
1807        ShardRequest::SDiff { keys } => match ks.sdiff(keys) {
1808            Ok(members) => ShardResponse::StringArray(members),
1809            Err(_) => ShardResponse::WrongType,
1810        },
1811        ShardRequest::SUnionStore { dest, keys } => store_set_response(ks.sunionstore(dest, keys)),
1812        ShardRequest::SInterStore { dest, keys } => store_set_response(ks.sinterstore(dest, keys)),
1813        ShardRequest::SDiffStore { dest, keys } => store_set_response(ks.sdiffstore(dest, keys)),
1814        ShardRequest::SRandMember { key, count } => match ks.srandmember(key, *count) {
1815            Ok(members) => ShardResponse::StringArray(members),
1816            Err(_) => ShardResponse::WrongType,
1817        },
1818        ShardRequest::SPop { key, count } => match ks.spop(key, *count) {
1819            Ok(members) => ShardResponse::StringArray(members),
1820            Err(_) => ShardResponse::WrongType,
1821        },
1822        ShardRequest::SMisMember { key, members } => match ks.smismember(key, members) {
1823            Ok(results) => ShardResponse::BoolArray(results),
1824            Err(_) => ShardResponse::WrongType,
1825        },
1826        ShardRequest::SScan {
1827            key,
1828            cursor,
1829            count,
1830            pattern,
1831        } => match ks.scan_set(key, *cursor, *count, pattern.as_deref()) {
1832            Ok((next, members)) => {
1833                let items = members.into_iter().map(Bytes::from).collect();
1834                ShardResponse::CollectionScan {
1835                    cursor: next,
1836                    items,
1837                }
1838            }
1839            Err(_) => ShardResponse::WrongType,
1840        },
1841        ShardRequest::HScan {
1842            key,
1843            cursor,
1844            count,
1845            pattern,
1846        } => match ks.scan_hash(key, *cursor, *count, pattern.as_deref()) {
1847            Ok((next, fields)) => {
1848                let mut items = Vec::with_capacity(fields.len() * 2);
1849                for (field, value) in fields {
1850                    items.push(Bytes::from(field));
1851                    items.push(value);
1852                }
1853                ShardResponse::CollectionScan {
1854                    cursor: next,
1855                    items,
1856                }
1857            }
1858            Err(_) => ShardResponse::WrongType,
1859        },
1860        ShardRequest::ZScan {
1861            key,
1862            cursor,
1863            count,
1864            pattern,
1865        } => match ks.scan_sorted_set(key, *cursor, *count, pattern.as_deref()) {
1866            Ok((next, members)) => {
1867                let mut items = Vec::with_capacity(members.len() * 2);
1868                for (member, score) in members {
1869                    items.push(Bytes::from(member));
1870                    items.push(Bytes::from(score.to_string()));
1871                }
1872                ShardResponse::CollectionScan {
1873                    cursor: next,
1874                    items,
1875                }
1876            }
1877            Err(_) => ShardResponse::WrongType,
1878        },
1879        ShardRequest::CountKeysInSlot { slot } => {
1880            ShardResponse::KeyCount(ks.count_keys_in_slot(*slot))
1881        }
1882        ShardRequest::GetKeysInSlot { slot, count } => {
1883            ShardResponse::StringArray(ks.get_keys_in_slot(*slot, *count))
1884        }
1885        ShardRequest::DumpKey { key } => match ks.dump(key) {
1886            Some((value, ttl_ms)) => {
1887                let snap = persistence::value_to_snap(value);
1888                match snapshot::serialize_snap_value(&snap) {
1889                    Ok(data) => ShardResponse::KeyDump { data, ttl_ms },
1890                    Err(e) => ShardResponse::Err(format!("ERR snapshot serialization failed: {e}")),
1891                }
1892            }
1893            None => ShardResponse::Value(None),
1894        },
1895        ShardRequest::RestoreKey {
1896            key,
1897            ttl_ms,
1898            data,
1899            replace,
1900        } => match snapshot::deserialize_snap_value(data) {
1901            Ok(snap) => {
1902                let exists = ks.exists(key);
1903                if exists && !*replace {
1904                    ShardResponse::Err("ERR Target key name already exists".into())
1905                } else {
1906                    let value = persistence::snap_to_value(snap);
1907                    let ttl = if *ttl_ms == 0 {
1908                        None
1909                    } else {
1910                        Some(Duration::from_millis(*ttl_ms))
1911                    };
1912                    ks.restore(key.clone(), value, ttl);
1913                    ShardResponse::Ok
1914                }
1915            }
1916            Err(e) => ShardResponse::Err(format!("ERR DUMP payload corrupted: {e}")),
1917        },
1918        #[cfg(feature = "vector")]
1919        ShardRequest::VAdd {
1920            key,
1921            element,
1922            vector,
1923            metric,
1924            quantization,
1925            connectivity,
1926            expansion_add,
1927        } => {
1928            use crate::types::vector::{DistanceMetric, QuantizationType};
1929            match ks.vadd(
1930                key,
1931                element.clone(),
1932                vector.clone(),
1933                DistanceMetric::from_u8(*metric),
1934                QuantizationType::from_u8(*quantization),
1935                *connectivity as usize,
1936                *expansion_add as usize,
1937            ) {
1938                Ok(result) => ShardResponse::VAddResult {
1939                    element: result.element,
1940                    vector: result.vector,
1941                    added: result.added,
1942                },
1943                Err(crate::keyspace::VectorWriteError::WrongType) => ShardResponse::WrongType,
1944                Err(crate::keyspace::VectorWriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1945                Err(crate::keyspace::VectorWriteError::IndexError(e))
1946                | Err(crate::keyspace::VectorWriteError::PartialBatch { message: e, .. }) => {
1947                    ShardResponse::Err(format!("ERR vector index: {e}"))
1948                }
1949            }
1950        }
1951        #[cfg(feature = "vector")]
1952        ShardRequest::VAddBatch {
1953            key,
1954            entries,
1955            metric,
1956            quantization,
1957            connectivity,
1958            expansion_add,
1959            ..
1960        } => {
1961            use crate::types::vector::{DistanceMetric, QuantizationType};
1962            // take ownership of entries to avoid cloning vectors during
1963            // batch insertion. the entries vec in the request becomes empty,
1964            // which is fine because to_aof_records uses response.applied
1965            // instead of request.entries.
1966            let owned_entries = std::mem::take(entries);
1967            match ks.vadd_batch(
1968                key,
1969                owned_entries,
1970                DistanceMetric::from_u8(*metric),
1971                QuantizationType::from_u8(*quantization),
1972                *connectivity as usize,
1973                *expansion_add as usize,
1974            ) {
1975                Ok(result) => ShardResponse::VAddBatchResult {
1976                    added_count: result.added_count,
1977                    applied: result.applied,
1978                },
1979                Err(crate::keyspace::VectorWriteError::WrongType) => ShardResponse::WrongType,
1980                Err(crate::keyspace::VectorWriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1981                Err(crate::keyspace::VectorWriteError::IndexError(e)) => {
1982                    ShardResponse::Err(format!("ERR vector index: {e}"))
1983                }
1984                Err(crate::keyspace::VectorWriteError::PartialBatch { applied, .. }) => {
1985                    // partial success: return applied vectors for AOF persistence
1986                    ShardResponse::VAddBatchResult {
1987                        added_count: applied.len(),
1988                        applied,
1989                    }
1990                }
1991            }
1992        }
1993        #[cfg(feature = "vector")]
1994        ShardRequest::VSim {
1995            key,
1996            query,
1997            count,
1998            ef_search,
1999        } => match ks.vsim(key, query, *count, *ef_search) {
2000            Ok(results) => ShardResponse::VSimResult(
2001                results
2002                    .into_iter()
2003                    .map(|r| (r.element, r.distance))
2004                    .collect(),
2005            ),
2006            Err(_) => ShardResponse::WrongType,
2007        },
2008        #[cfg(feature = "vector")]
2009        ShardRequest::VRem { key, element } => match ks.vrem(key, element) {
2010            Ok(removed) => ShardResponse::Bool(removed),
2011            Err(_) => ShardResponse::WrongType,
2012        },
2013        #[cfg(feature = "vector")]
2014        ShardRequest::VGet { key, element } => match ks.vget(key, element) {
2015            Ok(data) => ShardResponse::VectorData(data),
2016            Err(_) => ShardResponse::WrongType,
2017        },
2018        #[cfg(feature = "vector")]
2019        ShardRequest::VCard { key } => match ks.vcard(key) {
2020            Ok(count) => ShardResponse::Integer(count as i64),
2021            Err(_) => ShardResponse::WrongType,
2022        },
2023        #[cfg(feature = "vector")]
2024        ShardRequest::VDim { key } => match ks.vdim(key) {
2025            Ok(dim) => ShardResponse::Integer(dim as i64),
2026            Err(_) => ShardResponse::WrongType,
2027        },
2028        #[cfg(feature = "vector")]
2029        ShardRequest::VInfo { key } => match ks.vinfo(key) {
2030            Ok(Some(info)) => {
2031                let fields = vec![
2032                    ("dim".to_owned(), info.dim.to_string()),
2033                    ("count".to_owned(), info.count.to_string()),
2034                    ("metric".to_owned(), info.metric.to_string()),
2035                    ("quantization".to_owned(), info.quantization.to_string()),
2036                    ("connectivity".to_owned(), info.connectivity.to_string()),
2037                    ("expansion_add".to_owned(), info.expansion_add.to_string()),
2038                ];
2039                ShardResponse::VectorInfo(Some(fields))
2040            }
2041            Ok(None) => ShardResponse::VectorInfo(None),
2042            Err(_) => ShardResponse::WrongType,
2043        },
2044        #[cfg(feature = "protobuf")]
2045        ShardRequest::ProtoSet {
2046            key,
2047            type_name,
2048            data,
2049            expire,
2050            nx,
2051            xx,
2052        } => {
2053            if *nx && ks.exists(key) {
2054                return ShardResponse::Value(None);
2055            }
2056            if *xx && !ks.exists(key) {
2057                return ShardResponse::Value(None);
2058            }
2059            match ks.proto_set(key.clone(), type_name.clone(), data.clone(), *expire) {
2060                SetResult::Ok | SetResult::Blocked => ShardResponse::Ok,
2061                SetResult::OutOfMemory => ShardResponse::OutOfMemory,
2062            }
2063        }
2064        #[cfg(feature = "protobuf")]
2065        ShardRequest::ProtoGet { key } => match ks.proto_get(key) {
2066            Ok(val) => ShardResponse::ProtoValue(val),
2067            Err(_) => ShardResponse::WrongType,
2068        },
2069        #[cfg(feature = "protobuf")]
2070        ShardRequest::ProtoType { key } => match ks.proto_type(key) {
2071            Ok(name) => ShardResponse::ProtoTypeName(name),
2072            Err(_) => ShardResponse::WrongType,
2073        },
2074        // ProtoRegisterAof is a no-op for the keyspace — the AOF record
2075        // is written by the to_aof_record path after dispatch returns Ok.
2076        #[cfg(feature = "protobuf")]
2077        ShardRequest::ProtoRegisterAof { .. } => ShardResponse::Ok,
2078        #[cfg(feature = "protobuf")]
2079        ShardRequest::ProtoSetField {
2080            key,
2081            field_path,
2082            value,
2083        } => dispatch_proto_field_op(ks, schema_registry, key, |reg, type_name, data, ttl| {
2084            let new_data = reg.set_field(type_name, data, field_path, value)?;
2085            Ok(ShardResponse::ProtoFieldUpdated {
2086                type_name: type_name.to_owned(),
2087                data: new_data,
2088                expire: ttl,
2089            })
2090        }),
2091        #[cfg(feature = "protobuf")]
2092        ShardRequest::ProtoDelField { key, field_path } => {
2093            dispatch_proto_field_op(ks, schema_registry, key, |reg, type_name, data, ttl| {
2094                let new_data = reg.clear_field(type_name, data, field_path)?;
2095                Ok(ShardResponse::ProtoFieldUpdated {
2096                    type_name: type_name.to_owned(),
2097                    data: new_data,
2098                    expire: ttl,
2099                })
2100            })
2101        }
2102        // these requests are intercepted in process_message, not handled here
2103        ShardRequest::Snapshot
2104        | ShardRequest::SerializeSnapshot
2105        | ShardRequest::RewriteAof
2106        | ShardRequest::FlushDbAsync
2107        | ShardRequest::BLPop { .. }
2108        | ShardRequest::BRPop { .. } => ShardResponse::Ok,
2109    }
2110}
2111
/// Shared logic for atomic proto field operations (SETFIELD/DELFIELD).
///
/// Reads the proto value, acquires the schema registry, calls the
/// provided mutation closure, then writes the result back to the keyspace
/// — all within the single-threaded shard dispatch.
///
/// Returns `Value(None)` when the key is absent, `WrongType` when it
/// holds a non-proto value, and `OutOfMemory` when the write-back of
/// the mutated payload is rejected by the memory limiter.
#[cfg(feature = "protobuf")]
fn dispatch_proto_field_op<F>(
    ks: &mut Keyspace,
    schema_registry: &Option<crate::schema::SharedSchemaRegistry>,
    key: &str,
    mutate: F,
) -> ShardResponse
where
    F: FnOnce(
        &crate::schema::SchemaRegistry,
        &str,
        &[u8],
        Option<Duration>,
    ) -> Result<ShardResponse, crate::schema::SchemaError>,
{
    let registry = match schema_registry {
        Some(r) => r,
        None => return ShardResponse::Err("protobuf support is not enabled".into()),
    };

    let (type_name, data, remaining_ttl) = match ks.proto_get(key) {
        Ok(Some(tuple)) => tuple,
        Ok(None) => return ShardResponse::Value(None),
        Err(_) => return ShardResponse::WrongType,
    };

    // scope the registry read guard to the mutation only — it is not
    // needed for the keyspace write below, so release it early
    let resp = {
        let reg = match registry.read() {
            Ok(r) => r,
            Err(_) => return ShardResponse::Err("schema registry lock poisoned".into()),
        };
        match mutate(&reg, &type_name, &data, remaining_ttl) {
            Ok(r) => r,
            Err(e) => return ShardResponse::Err(e.to_string()),
        }
    };

    // write the updated value back, preserving the original TTL
    if let ShardResponse::ProtoFieldUpdated {
        ref type_name,
        ref data,
        expire,
    } = resp
    {
        // bug fix: the write-back can fail under memory pressure; report
        // that instead of claiming success for a write that never applied
        // (which would also let a stale ProtoFieldUpdated reach the AOF).
        match ks.proto_set(key.to_owned(), type_name.clone(), data.clone(), expire) {
            SetResult::Ok | SetResult::Blocked => {}
            SetResult::OutOfMemory => return ShardResponse::OutOfMemory,
        }
    }

    resp
}
2165
2166#[cfg(test)]
2167mod tests {
2168    use super::*;
2169
2170    /// Test helper: dispatch without a schema registry.
2171    fn test_dispatch(ks: &mut Keyspace, mut req: ShardRequest) -> ShardResponse {
2172        dispatch(
2173            ks,
2174            &mut req,
2175            #[cfg(feature = "protobuf")]
2176            &None,
2177        )
2178    }
2179
2180    #[test]
2181    fn dispatch_set_and_get() {
2182        let mut ks = Keyspace::new();
2183
2184        let resp = test_dispatch(
2185            &mut ks,
2186            ShardRequest::Set {
2187                key: "k".into(),
2188                value: Bytes::from("v"),
2189                expire: None,
2190                nx: false,
2191                xx: false,
2192            },
2193        );
2194        assert!(matches!(resp, ShardResponse::Ok));
2195
2196        let resp = test_dispatch(&mut ks, ShardRequest::Get { key: "k".into() });
2197        match resp {
2198            ShardResponse::Value(Some(Value::String(data))) => {
2199                assert_eq!(data, Bytes::from("v"));
2200            }
2201            other => panic!("expected Value(Some(String)), got {other:?}"),
2202        }
2203    }
2204
2205    #[test]
2206    fn dispatch_get_missing() {
2207        let mut ks = Keyspace::new();
2208        let resp = test_dispatch(&mut ks, ShardRequest::Get { key: "nope".into() });
2209        assert!(matches!(resp, ShardResponse::Value(None)));
2210    }
2211
2212    #[test]
2213    fn dispatch_del() {
2214        let mut ks = Keyspace::new();
2215        ks.set("key".into(), Bytes::from("val"), None, false, false);
2216
2217        let resp = test_dispatch(&mut ks, ShardRequest::Del { key: "key".into() });
2218        assert!(matches!(resp, ShardResponse::Bool(true)));
2219
2220        let resp = test_dispatch(&mut ks, ShardRequest::Del { key: "key".into() });
2221        assert!(matches!(resp, ShardResponse::Bool(false)));
2222    }
2223
2224    #[test]
2225    fn dispatch_exists() {
2226        let mut ks = Keyspace::new();
2227        ks.set("yes".into(), Bytes::from("here"), None, false, false);
2228
2229        let resp = test_dispatch(&mut ks, ShardRequest::Exists { key: "yes".into() });
2230        assert!(matches!(resp, ShardResponse::Bool(true)));
2231
2232        let resp = test_dispatch(&mut ks, ShardRequest::Exists { key: "no".into() });
2233        assert!(matches!(resp, ShardResponse::Bool(false)));
2234    }
2235
2236    #[test]
2237    fn dispatch_expire_and_ttl() {
2238        let mut ks = Keyspace::new();
2239        ks.set("key".into(), Bytes::from("val"), None, false, false);
2240
2241        let resp = test_dispatch(
2242            &mut ks,
2243            ShardRequest::Expire {
2244                key: "key".into(),
2245                seconds: 60,
2246            },
2247        );
2248        assert!(matches!(resp, ShardResponse::Bool(true)));
2249
2250        let resp = test_dispatch(&mut ks, ShardRequest::Ttl { key: "key".into() });
2251        match resp {
2252            ShardResponse::Ttl(TtlResult::Seconds(s)) => assert!((58..=60).contains(&s)),
2253            other => panic!("expected Ttl(Seconds), got {other:?}"),
2254        }
2255    }
2256
2257    #[test]
2258    fn dispatch_ttl_missing() {
2259        let mut ks = Keyspace::new();
2260        let resp = test_dispatch(&mut ks, ShardRequest::Ttl { key: "gone".into() });
2261        assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
2262    }
2263
2264    #[test]
2265    fn dispatch_incr_new_key() {
2266        let mut ks = Keyspace::new();
2267        let resp = test_dispatch(&mut ks, ShardRequest::Incr { key: "c".into() });
2268        assert!(matches!(resp, ShardResponse::Integer(1)));
2269    }
2270
2271    #[test]
2272    fn dispatch_decr_existing() {
2273        let mut ks = Keyspace::new();
2274        ks.set("n".into(), Bytes::from("10"), None, false, false);
2275        let resp = test_dispatch(&mut ks, ShardRequest::Decr { key: "n".into() });
2276        assert!(matches!(resp, ShardResponse::Integer(9)));
2277    }
2278
2279    #[test]
2280    fn dispatch_incr_non_integer() {
2281        let mut ks = Keyspace::new();
2282        ks.set("s".into(), Bytes::from("hello"), None, false, false);
2283        let resp = test_dispatch(&mut ks, ShardRequest::Incr { key: "s".into() });
2284        assert!(matches!(resp, ShardResponse::Err(_)));
2285    }
2286
2287    #[test]
2288    fn dispatch_incrby() {
2289        let mut ks = Keyspace::new();
2290        ks.set("n".into(), Bytes::from("10"), None, false, false);
2291        let resp = test_dispatch(
2292            &mut ks,
2293            ShardRequest::IncrBy {
2294                key: "n".into(),
2295                delta: 5,
2296            },
2297        );
2298        assert!(matches!(resp, ShardResponse::Integer(15)));
2299    }
2300
2301    #[test]
2302    fn dispatch_decrby() {
2303        let mut ks = Keyspace::new();
2304        ks.set("n".into(), Bytes::from("10"), None, false, false);
2305        let resp = test_dispatch(
2306            &mut ks,
2307            ShardRequest::DecrBy {
2308                key: "n".into(),
2309                delta: 3,
2310            },
2311        );
2312        assert!(matches!(resp, ShardResponse::Integer(7)));
2313    }
2314
2315    #[test]
2316    fn dispatch_incrby_new_key() {
2317        let mut ks = Keyspace::new();
2318        let resp = test_dispatch(
2319            &mut ks,
2320            ShardRequest::IncrBy {
2321                key: "new".into(),
2322                delta: 42,
2323            },
2324        );
2325        assert!(matches!(resp, ShardResponse::Integer(42)));
2326    }
2327
2328    #[test]
2329    fn dispatch_incrbyfloat() {
2330        let mut ks = Keyspace::new();
2331        ks.set("n".into(), Bytes::from("10.5"), None, false, false);
2332        let resp = test_dispatch(
2333            &mut ks,
2334            ShardRequest::IncrByFloat {
2335                key: "n".into(),
2336                delta: 2.3,
2337            },
2338        );
2339        match resp {
2340            ShardResponse::BulkString(val) => {
2341                let f: f64 = val.parse().unwrap();
2342                assert!((f - 12.8).abs() < 0.001);
2343            }
2344            other => panic!("expected BulkString, got {other:?}"),
2345        }
2346    }
2347
2348    #[test]
2349    fn dispatch_append() {
2350        let mut ks = Keyspace::new();
2351        ks.set("k".into(), Bytes::from("hello"), None, false, false);
2352        let resp = test_dispatch(
2353            &mut ks,
2354            ShardRequest::Append {
2355                key: "k".into(),
2356                value: Bytes::from(" world"),
2357            },
2358        );
2359        assert!(matches!(resp, ShardResponse::Len(11)));
2360    }
2361
2362    #[test]
2363    fn dispatch_strlen() {
2364        let mut ks = Keyspace::new();
2365        ks.set("k".into(), Bytes::from("hello"), None, false, false);
2366        let resp = test_dispatch(&mut ks, ShardRequest::Strlen { key: "k".into() });
2367        assert!(matches!(resp, ShardResponse::Len(5)));
2368    }
2369
2370    #[test]
2371    fn dispatch_strlen_missing() {
2372        let mut ks = Keyspace::new();
2373        let resp = test_dispatch(&mut ks, ShardRequest::Strlen { key: "nope".into() });
2374        assert!(matches!(resp, ShardResponse::Len(0)));
2375    }
2376
2377    #[test]
2378    fn dispatch_incrbyfloat_new_key() {
2379        let mut ks = Keyspace::new();
2380        let resp = test_dispatch(
2381            &mut ks,
2382            ShardRequest::IncrByFloat {
2383                key: "new".into(),
2384                delta: 2.72,
2385            },
2386        );
2387        match resp {
2388            ShardResponse::BulkString(val) => {
2389                let f: f64 = val.parse().unwrap();
2390                assert!((f - 2.72).abs() < 0.001);
2391            }
2392            other => panic!("expected BulkString, got {other:?}"),
2393        }
2394    }
2395
2396    #[test]
2397    fn dispatch_persist_removes_ttl() {
2398        let mut ks = Keyspace::new();
2399        ks.set(
2400            "key".into(),
2401            Bytes::from("val"),
2402            Some(Duration::from_secs(60)),
2403            false,
2404            false,
2405        );
2406
2407        let resp = test_dispatch(&mut ks, ShardRequest::Persist { key: "key".into() });
2408        assert!(matches!(resp, ShardResponse::Bool(true)));
2409
2410        let resp = test_dispatch(&mut ks, ShardRequest::Ttl { key: "key".into() });
2411        assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NoExpiry)));
2412    }
2413
2414    #[test]
2415    fn dispatch_persist_missing_key() {
2416        let mut ks = Keyspace::new();
2417        let resp = test_dispatch(&mut ks, ShardRequest::Persist { key: "nope".into() });
2418        assert!(matches!(resp, ShardResponse::Bool(false)));
2419    }
2420
2421    #[test]
2422    fn dispatch_pttl() {
2423        let mut ks = Keyspace::new();
2424        ks.set(
2425            "key".into(),
2426            Bytes::from("val"),
2427            Some(Duration::from_secs(60)),
2428            false,
2429            false,
2430        );
2431
2432        let resp = test_dispatch(&mut ks, ShardRequest::Pttl { key: "key".into() });
2433        match resp {
2434            ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
2435                assert!(ms > 59_000 && ms <= 60_000);
2436            }
2437            other => panic!("expected Ttl(Milliseconds), got {other:?}"),
2438        }
2439    }
2440
2441    #[test]
2442    fn dispatch_pttl_missing() {
2443        let mut ks = Keyspace::new();
2444        let resp = test_dispatch(&mut ks, ShardRequest::Pttl { key: "nope".into() });
2445        assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
2446    }
2447
2448    #[test]
2449    fn dispatch_pexpire() {
2450        let mut ks = Keyspace::new();
2451        ks.set("key".into(), Bytes::from("val"), None, false, false);
2452
2453        let resp = test_dispatch(
2454            &mut ks,
2455            ShardRequest::Pexpire {
2456                key: "key".into(),
2457                milliseconds: 5000,
2458            },
2459        );
2460        assert!(matches!(resp, ShardResponse::Bool(true)));
2461
2462        let resp = test_dispatch(&mut ks, ShardRequest::Pttl { key: "key".into() });
2463        match resp {
2464            ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
2465                assert!(ms > 4000 && ms <= 5000);
2466            }
2467            other => panic!("expected Ttl(Milliseconds), got {other:?}"),
2468        }
2469    }
2470
2471    #[test]
2472    fn dispatch_set_nx_when_key_missing() {
2473        let mut ks = Keyspace::new();
2474        let resp = test_dispatch(
2475            &mut ks,
2476            ShardRequest::Set {
2477                key: "k".into(),
2478                value: Bytes::from("v"),
2479                expire: None,
2480                nx: true,
2481                xx: false,
2482            },
2483        );
2484        assert!(matches!(resp, ShardResponse::Ok));
2485        assert!(ks.exists("k"));
2486    }
2487
2488    #[test]
2489    fn dispatch_set_nx_when_key_exists() {
2490        let mut ks = Keyspace::new();
2491        ks.set("k".into(), Bytes::from("old"), None, false, false);
2492
2493        let resp = test_dispatch(
2494            &mut ks,
2495            ShardRequest::Set {
2496                key: "k".into(),
2497                value: Bytes::from("new"),
2498                expire: None,
2499                nx: true,
2500                xx: false,
2501            },
2502        );
2503        // NX should block — returns nil
2504        assert!(matches!(resp, ShardResponse::Value(None)));
2505        // original value should remain
2506        match ks.get("k").unwrap() {
2507            Some(Value::String(data)) => assert_eq!(data, Bytes::from("old")),
2508            other => panic!("expected old value, got {other:?}"),
2509        }
2510    }
2511
2512    #[test]
2513    fn dispatch_set_xx_when_key_exists() {
2514        let mut ks = Keyspace::new();
2515        ks.set("k".into(), Bytes::from("old"), None, false, false);
2516
2517        let resp = test_dispatch(
2518            &mut ks,
2519            ShardRequest::Set {
2520                key: "k".into(),
2521                value: Bytes::from("new"),
2522                expire: None,
2523                nx: false,
2524                xx: true,
2525            },
2526        );
2527        assert!(matches!(resp, ShardResponse::Ok));
2528        match ks.get("k").unwrap() {
2529            Some(Value::String(data)) => assert_eq!(data, Bytes::from("new")),
2530            other => panic!("expected new value, got {other:?}"),
2531        }
2532    }
2533
2534    #[test]
2535    fn dispatch_set_xx_when_key_missing() {
2536        let mut ks = Keyspace::new();
2537        let resp = test_dispatch(
2538            &mut ks,
2539            ShardRequest::Set {
2540                key: "k".into(),
2541                value: Bytes::from("v"),
2542                expire: None,
2543                nx: false,
2544                xx: true,
2545            },
2546        );
2547        // XX should block — returns nil
2548        assert!(matches!(resp, ShardResponse::Value(None)));
2549        assert!(!ks.exists("k"));
2550    }
2551
2552    #[test]
2553    fn dispatch_flushdb_clears_all_keys() {
2554        let mut ks = Keyspace::new();
2555        ks.set("a".into(), Bytes::from("1"), None, false, false);
2556        ks.set("b".into(), Bytes::from("2"), None, false, false);
2557
2558        assert_eq!(ks.len(), 2);
2559
2560        let resp = test_dispatch(&mut ks, ShardRequest::FlushDb);
2561        assert!(matches!(resp, ShardResponse::Ok));
2562        assert_eq!(ks.len(), 0);
2563    }
2564
2565    #[test]
2566    fn dispatch_scan_returns_keys() {
2567        let mut ks = Keyspace::new();
2568        ks.set("user:1".into(), Bytes::from("a"), None, false, false);
2569        ks.set("user:2".into(), Bytes::from("b"), None, false, false);
2570        ks.set("item:1".into(), Bytes::from("c"), None, false, false);
2571
2572        let resp = test_dispatch(
2573            &mut ks,
2574            ShardRequest::Scan {
2575                cursor: 0,
2576                count: 10,
2577                pattern: None,
2578            },
2579        );
2580
2581        match resp {
2582            ShardResponse::Scan { cursor, keys } => {
2583                assert_eq!(cursor, 0); // complete in one pass
2584                assert_eq!(keys.len(), 3);
2585            }
2586            _ => panic!("expected Scan response"),
2587        }
2588    }
2589
2590    #[test]
2591    fn dispatch_scan_with_pattern() {
2592        let mut ks = Keyspace::new();
2593        ks.set("user:1".into(), Bytes::from("a"), None, false, false);
2594        ks.set("user:2".into(), Bytes::from("b"), None, false, false);
2595        ks.set("item:1".into(), Bytes::from("c"), None, false, false);
2596
2597        let resp = test_dispatch(
2598            &mut ks,
2599            ShardRequest::Scan {
2600                cursor: 0,
2601                count: 10,
2602                pattern: Some("user:*".into()),
2603            },
2604        );
2605
2606        match resp {
2607            ShardResponse::Scan { cursor, keys } => {
2608                assert_eq!(cursor, 0);
2609                assert_eq!(keys.len(), 2);
2610                for k in &keys {
2611                    assert!(k.starts_with("user:"));
2612                }
2613            }
2614            _ => panic!("expected Scan response"),
2615        }
2616    }
2617
2618    #[test]
2619    fn dispatch_keys() {
2620        let mut ks = Keyspace::new();
2621        ks.set("user:1".into(), Bytes::from("a"), None, false, false);
2622        ks.set("user:2".into(), Bytes::from("b"), None, false, false);
2623        ks.set("item:1".into(), Bytes::from("c"), None, false, false);
2624        let resp = test_dispatch(
2625            &mut ks,
2626            ShardRequest::Keys {
2627                pattern: "user:*".into(),
2628            },
2629        );
2630        match resp {
2631            ShardResponse::StringArray(mut keys) => {
2632                keys.sort();
2633                assert_eq!(keys, vec!["user:1", "user:2"]);
2634            }
2635            other => panic!("expected StringArray, got {other:?}"),
2636        }
2637    }
2638
2639    #[test]
2640    fn dispatch_rename() {
2641        let mut ks = Keyspace::new();
2642        ks.set("old".into(), Bytes::from("value"), None, false, false);
2643        let resp = test_dispatch(
2644            &mut ks,
2645            ShardRequest::Rename {
2646                key: "old".into(),
2647                newkey: "new".into(),
2648            },
2649        );
2650        assert!(matches!(resp, ShardResponse::Ok));
2651        assert!(!ks.exists("old"));
2652        assert!(ks.exists("new"));
2653    }
2654
2655    #[test]
2656    fn dispatch_rename_missing_key() {
2657        let mut ks = Keyspace::new();
2658        let resp = test_dispatch(
2659            &mut ks,
2660            ShardRequest::Rename {
2661                key: "missing".into(),
2662                newkey: "new".into(),
2663            },
2664        );
2665        assert!(matches!(resp, ShardResponse::Err(_)));
2666    }
2667
2668    #[test]
2669    fn dump_key_returns_serialized_value() {
2670        let mut ks = Keyspace::new();
2671        ks.set(
2672            "greeting".into(),
2673            Bytes::from("hello"),
2674            Some(Duration::from_secs(60)),
2675            false,
2676            false,
2677        );
2678
2679        let resp = test_dispatch(
2680            &mut ks,
2681            ShardRequest::DumpKey {
2682                key: "greeting".into(),
2683            },
2684        );
2685        match resp {
2686            ShardResponse::KeyDump { data, ttl_ms } => {
2687                assert!(!data.is_empty());
2688                assert!(ttl_ms > 0);
2689                // verify the data round-trips
2690                let snap = snapshot::deserialize_snap_value(&data).unwrap();
2691                assert!(matches!(snap, SnapValue::String(ref b) if b == &Bytes::from("hello")));
2692            }
2693            other => panic!("expected KeyDump, got {other:?}"),
2694        }
2695    }
2696
2697    #[test]
2698    fn dump_key_missing_returns_none() {
2699        let mut ks = Keyspace::new();
2700        let resp = test_dispatch(&mut ks, ShardRequest::DumpKey { key: "nope".into() });
2701        assert!(matches!(resp, ShardResponse::Value(None)));
2702    }
2703
2704    #[test]
2705    fn restore_key_inserts_value() {
2706        let mut ks = Keyspace::new();
2707        let snap = SnapValue::String(Bytes::from("restored"));
2708        let data = snapshot::serialize_snap_value(&snap).unwrap();
2709
2710        let resp = test_dispatch(
2711            &mut ks,
2712            ShardRequest::RestoreKey {
2713                key: "mykey".into(),
2714                ttl_ms: 0,
2715                data: Bytes::from(data),
2716                replace: false,
2717            },
2718        );
2719        assert!(matches!(resp, ShardResponse::Ok));
2720        assert_eq!(
2721            ks.get("mykey").unwrap(),
2722            Some(Value::String(Bytes::from("restored")))
2723        );
2724    }
2725
2726    #[test]
2727    fn restore_key_with_ttl() {
2728        let mut ks = Keyspace::new();
2729        let snap = SnapValue::String(Bytes::from("temp"));
2730        let data = snapshot::serialize_snap_value(&snap).unwrap();
2731
2732        let resp = test_dispatch(
2733            &mut ks,
2734            ShardRequest::RestoreKey {
2735                key: "ttlkey".into(),
2736                ttl_ms: 30_000,
2737                data: Bytes::from(data),
2738                replace: false,
2739            },
2740        );
2741        assert!(matches!(resp, ShardResponse::Ok));
2742        match ks.pttl("ttlkey") {
2743            TtlResult::Milliseconds(ms) => assert!(ms > 29_000 && ms <= 30_000),
2744            other => panic!("expected Milliseconds, got {other:?}"),
2745        }
2746    }
2747
2748    #[test]
2749    fn restore_key_rejects_duplicate_without_replace() {
2750        let mut ks = Keyspace::new();
2751        ks.set("existing".into(), Bytes::from("old"), None, false, false);
2752
2753        let snap = SnapValue::String(Bytes::from("new"));
2754        let data = snapshot::serialize_snap_value(&snap).unwrap();
2755
2756        let resp = test_dispatch(
2757            &mut ks,
2758            ShardRequest::RestoreKey {
2759                key: "existing".into(),
2760                ttl_ms: 0,
2761                data: Bytes::from(data),
2762                replace: false,
2763            },
2764        );
2765        assert!(matches!(resp, ShardResponse::Err(_)));
2766        // original value unchanged
2767        assert_eq!(
2768            ks.get("existing").unwrap(),
2769            Some(Value::String(Bytes::from("old")))
2770        );
2771    }
2772
2773    #[test]
2774    fn restore_key_replace_overwrites() {
2775        let mut ks = Keyspace::new();
2776        ks.set("existing".into(), Bytes::from("old"), None, false, false);
2777
2778        let snap = SnapValue::String(Bytes::from("new"));
2779        let data = snapshot::serialize_snap_value(&snap).unwrap();
2780
2781        let resp = test_dispatch(
2782            &mut ks,
2783            ShardRequest::RestoreKey {
2784                key: "existing".into(),
2785                ttl_ms: 0,
2786                data: Bytes::from(data),
2787                replace: true,
2788            },
2789        );
2790        assert!(matches!(resp, ShardResponse::Ok));
2791        assert_eq!(
2792            ks.get("existing").unwrap(),
2793            Some(Value::String(Bytes::from("new")))
2794        );
2795    }
2796
2797    #[test]
2798    fn dump_and_restore_hash_roundtrip() {
2799        let mut ks = Keyspace::new();
2800        ks.hset(
2801            "myhash",
2802            &[
2803                ("f1".into(), Bytes::from("v1")),
2804                ("f2".into(), Bytes::from("v2")),
2805            ],
2806        )
2807        .unwrap();
2808
2809        // dump
2810        let resp = test_dispatch(
2811            &mut ks,
2812            ShardRequest::DumpKey {
2813                key: "myhash".into(),
2814            },
2815        );
2816        let (data, _ttl) = match resp {
2817            ShardResponse::KeyDump { data, ttl_ms } => (data, ttl_ms),
2818            other => panic!("expected KeyDump, got {other:?}"),
2819        };
2820
2821        // restore to a new key
2822        let resp = test_dispatch(
2823            &mut ks,
2824            ShardRequest::RestoreKey {
2825                key: "myhash2".into(),
2826                ttl_ms: 0,
2827                data: Bytes::from(data),
2828                replace: false,
2829            },
2830        );
2831        assert!(matches!(resp, ShardResponse::Ok));
2832
2833        // verify fields
2834        assert_eq!(ks.hget("myhash2", "f1").unwrap(), Some(Bytes::from("v1")));
2835        assert_eq!(ks.hget("myhash2", "f2").unwrap(), Some(Bytes::from("v2")));
2836    }
2837
2838    #[test]
2839    fn is_write_classifies_correctly() {
2840        // write commands
2841        assert!(ShardRequest::Set {
2842            key: "k".into(),
2843            value: Bytes::from("v"),
2844            expire: None,
2845            nx: false,
2846            xx: false,
2847        }
2848        .is_write());
2849        assert!(ShardRequest::Del { key: "k".into() }.is_write());
2850        assert!(ShardRequest::Incr { key: "k".into() }.is_write());
2851        assert!(ShardRequest::LPush {
2852            key: "k".into(),
2853            values: vec![],
2854        }
2855        .is_write());
2856        assert!(ShardRequest::HSet {
2857            key: "k".into(),
2858            fields: vec![],
2859        }
2860        .is_write());
2861        assert!(ShardRequest::SAdd {
2862            key: "k".into(),
2863            members: vec![],
2864        }
2865        .is_write());
2866        assert!(ShardRequest::FlushDb.is_write());
2867
2868        // read commands
2869        assert!(!ShardRequest::Get { key: "k".into() }.is_write());
2870        assert!(!ShardRequest::Exists { key: "k".into() }.is_write());
2871        assert!(!ShardRequest::Ttl { key: "k".into() }.is_write());
2872        assert!(!ShardRequest::DbSize.is_write());
2873        assert!(!ShardRequest::Stats.is_write());
2874        assert!(!ShardRequest::LLen { key: "k".into() }.is_write());
2875        assert!(!ShardRequest::HGet {
2876            key: "k".into(),
2877            field: "f".into(),
2878        }
2879        .is_write());
2880        assert!(!ShardRequest::SMembers { key: "k".into() }.is_write());
2881    }
2882}