Skip to main content

ember_core/
shard.rs

1//! Shard: an independent partition of the keyspace.
2//!
3//! Each shard runs as its own tokio task, owning a `Keyspace` with no
4//! internal locking. Commands arrive over an mpsc channel and responses
5//! go back on a per-request oneshot. A background tick drives active
6//! expiration of TTL'd keys.
7
8use std::path::PathBuf;
9use std::time::Duration;
10
11use bytes::Bytes;
12use ember_persistence::aof::{AofRecord, AofWriter, FsyncPolicy};
13use ember_persistence::recovery::{self, RecoveredValue};
14use ember_persistence::snapshot::{self, SnapEntry, SnapValue, SnapshotWriter};
15use tokio::sync::{mpsc, oneshot};
16use tracing::{info, warn};
17
18use crate::dropper::DropHandle;
19use crate::error::ShardError;
20use crate::expiry;
21use crate::keyspace::{
22    IncrError, IncrFloatError, Keyspace, KeyspaceStats, SetResult, ShardConfig, TtlResult,
23    WriteError,
24};
25use crate::types::sorted_set::ZAddFlags;
26use crate::types::Value;
27
28/// How often the shard runs active expiration. 100ms matches
29/// Redis's hz=10 default and keeps CPU overhead negligible.
30const EXPIRY_TICK: Duration = Duration::from_millis(100);
31
32/// How often to fsync when using the `EverySec` policy.
33const FSYNC_INTERVAL: Duration = Duration::from_secs(1);
34
35/// Optional persistence configuration for a shard.
36#[derive(Debug, Clone)]
37pub struct ShardPersistenceConfig {
38    /// Directory where AOF and snapshot files live.
39    pub data_dir: PathBuf,
40    /// Whether to write an AOF log of mutations.
41    pub append_only: bool,
42    /// When to fsync the AOF file.
43    pub fsync_policy: FsyncPolicy,
44    /// Optional encryption key for encrypting data at rest.
45    /// When set, AOF and snapshot files use the v3 encrypted format.
46    #[cfg(feature = "encryption")]
47    pub encryption_key: Option<ember_persistence::encryption::EncryptionKey>,
48}
49
50/// A protocol-agnostic command sent to a shard.
51#[derive(Debug)]
52pub enum ShardRequest {
53    Get {
54        key: String,
55    },
56    Set {
57        key: String,
58        value: Bytes,
59        expire: Option<Duration>,
60        /// Only set the key if it does not already exist.
61        nx: bool,
62        /// Only set the key if it already exists.
63        xx: bool,
64    },
65    Incr {
66        key: String,
67    },
68    Decr {
69        key: String,
70    },
71    IncrBy {
72        key: String,
73        delta: i64,
74    },
75    DecrBy {
76        key: String,
77        delta: i64,
78    },
79    IncrByFloat {
80        key: String,
81        delta: f64,
82    },
83    Append {
84        key: String,
85        value: Bytes,
86    },
87    Strlen {
88        key: String,
89    },
90    /// Returns all keys matching a glob pattern in this shard.
91    Keys {
92        pattern: String,
93    },
94    /// Renames a key within this shard.
95    Rename {
96        key: String,
97        newkey: String,
98    },
99    Del {
100        key: String,
101    },
102    /// Like DEL but defers value deallocation to the background drop thread.
103    Unlink {
104        key: String,
105    },
106    Exists {
107        key: String,
108    },
109    Expire {
110        key: String,
111        seconds: u64,
112    },
113    Ttl {
114        key: String,
115    },
116    Persist {
117        key: String,
118    },
119    Pttl {
120        key: String,
121    },
122    Pexpire {
123        key: String,
124        milliseconds: u64,
125    },
126    LPush {
127        key: String,
128        values: Vec<Bytes>,
129    },
130    RPush {
131        key: String,
132        values: Vec<Bytes>,
133    },
134    LPop {
135        key: String,
136    },
137    RPop {
138        key: String,
139    },
140    LRange {
141        key: String,
142        start: i64,
143        stop: i64,
144    },
145    LLen {
146        key: String,
147    },
148    Type {
149        key: String,
150    },
151    ZAdd {
152        key: String,
153        members: Vec<(f64, String)>,
154        nx: bool,
155        xx: bool,
156        gt: bool,
157        lt: bool,
158        ch: bool,
159    },
160    ZRem {
161        key: String,
162        members: Vec<String>,
163    },
164    ZScore {
165        key: String,
166        member: String,
167    },
168    ZRank {
169        key: String,
170        member: String,
171    },
172    ZCard {
173        key: String,
174    },
175    ZRange {
176        key: String,
177        start: i64,
178        stop: i64,
179        with_scores: bool,
180    },
181    HSet {
182        key: String,
183        fields: Vec<(String, Bytes)>,
184    },
185    HGet {
186        key: String,
187        field: String,
188    },
189    HGetAll {
190        key: String,
191    },
192    HDel {
193        key: String,
194        fields: Vec<String>,
195    },
196    HExists {
197        key: String,
198        field: String,
199    },
200    HLen {
201        key: String,
202    },
203    HIncrBy {
204        key: String,
205        field: String,
206        delta: i64,
207    },
208    HKeys {
209        key: String,
210    },
211    HVals {
212        key: String,
213    },
214    HMGet {
215        key: String,
216        fields: Vec<String>,
217    },
218    SAdd {
219        key: String,
220        members: Vec<String>,
221    },
222    SRem {
223        key: String,
224        members: Vec<String>,
225    },
226    SMembers {
227        key: String,
228    },
229    SIsMember {
230        key: String,
231        member: String,
232    },
233    SCard {
234        key: String,
235    },
236    /// Returns the key count for this shard.
237    DbSize,
238    /// Returns keyspace stats for this shard.
239    Stats,
240    /// Triggers a snapshot write.
241    Snapshot,
242    /// Triggers an AOF rewrite (snapshot + truncate AOF).
243    RewriteAof,
244    /// Clears all keys from the keyspace.
245    FlushDb,
246    /// Clears all keys, deferring deallocation to the background drop thread.
247    FlushDbAsync,
248    /// Scans keys in the keyspace.
249    Scan {
250        cursor: u64,
251        count: usize,
252        pattern: Option<String>,
253    },
254    /// Counts keys in this shard that hash to the given cluster slot.
255    CountKeysInSlot {
256        slot: u16,
257    },
258    /// Returns up to `count` keys that hash to the given cluster slot.
259    GetKeysInSlot {
260        slot: u16,
261        count: usize,
262    },
263    /// Stores a validated protobuf value.
264    #[cfg(feature = "protobuf")]
265    ProtoSet {
266        key: String,
267        type_name: String,
268        data: Bytes,
269        expire: Option<Duration>,
270        nx: bool,
271        xx: bool,
272    },
273    /// Retrieves a protobuf value.
274    #[cfg(feature = "protobuf")]
275    ProtoGet {
276        key: String,
277    },
278    /// Returns the protobuf message type name for a key.
279    #[cfg(feature = "protobuf")]
280    ProtoType {
281        key: String,
282    },
283    /// Writes a ProtoRegister AOF record (no keyspace mutation).
284    /// Broadcast to all shards after a schema registration so the
285    /// schema is recovered from any shard's AOF on restart.
286    #[cfg(feature = "protobuf")]
287    ProtoRegisterAof {
288        name: String,
289        descriptor: Bytes,
290    },
291    /// Atomically reads a proto value, sets a field, and writes it back.
292    /// Runs entirely within the shard's single-threaded dispatch.
293    #[cfg(feature = "protobuf")]
294    ProtoSetField {
295        key: String,
296        field_path: String,
297        value: String,
298    },
299    /// Atomically reads a proto value, clears a field, and writes it back.
300    /// Runs entirely within the shard's single-threaded dispatch.
301    #[cfg(feature = "protobuf")]
302    ProtoDelField {
303        key: String,
304        field_path: String,
305    },
306}
307
308/// The shard's response to a request.
309#[derive(Debug)]
310pub enum ShardResponse {
311    /// A value (or None for a cache miss).
312    Value(Option<Value>),
313    /// Simple acknowledgement (e.g. SET).
314    Ok,
315    /// Integer result (e.g. INCR, DECR).
316    Integer(i64),
317    /// Boolean result (e.g. DEL, EXISTS, EXPIRE).
318    Bool(bool),
319    /// TTL query result.
320    Ttl(TtlResult),
321    /// Memory limit reached and eviction policy is NoEviction.
322    OutOfMemory,
323    /// Key count for a shard (DBSIZE).
324    KeyCount(usize),
325    /// Full stats for a shard (INFO).
326    Stats(KeyspaceStats),
327    /// Integer length result (e.g. LPUSH, RPUSH, LLEN).
328    Len(usize),
329    /// Array of bulk values (e.g. LRANGE).
330    Array(Vec<Bytes>),
331    /// The type name of a stored value.
332    TypeName(&'static str),
333    /// ZADD result: count for the client + actually applied members for AOF.
334    ZAddLen {
335        count: usize,
336        applied: Vec<(f64, String)>,
337    },
338    /// ZREM result: count for the client + actually removed members for AOF.
339    ZRemLen { count: usize, removed: Vec<String> },
340    /// Float score result (e.g. ZSCORE).
341    Score(Option<f64>),
342    /// Rank result (e.g. ZRANK).
343    Rank(Option<usize>),
344    /// Scored array of (member, score) pairs (e.g. ZRANGE).
345    ScoredArray(Vec<(String, f64)>),
346    /// A bulk string result (e.g. INCRBYFLOAT).
347    BulkString(String),
348    /// Command used against a key holding the wrong kind of value.
349    WrongType,
350    /// An error message.
351    Err(String),
352    /// Scan result: next cursor and list of keys.
353    Scan { cursor: u64, keys: Vec<String> },
354    /// HGETALL result: all field-value pairs.
355    HashFields(Vec<(String, Bytes)>),
356    /// HDEL result: removed count + field names for AOF.
357    HDelLen { count: usize, removed: Vec<String> },
358    /// Array of strings (e.g. HKEYS).
359    StringArray(Vec<String>),
360    /// HMGET result: array of optional values.
361    OptionalArray(Vec<Option<Bytes>>),
362    /// PROTO.GET result: (type_name, data, remaining_ttl) or None.
363    #[cfg(feature = "protobuf")]
364    ProtoValue(Option<(String, Bytes, Option<Duration>)>),
365    /// PROTO.TYPE result: message type name or None.
366    #[cfg(feature = "protobuf")]
367    ProtoTypeName(Option<String>),
368    /// Result of an atomic SETFIELD/DELFIELD: carries the updated value
369    /// for AOF persistence.
370    #[cfg(feature = "protobuf")]
371    ProtoFieldUpdated {
372        type_name: String,
373        data: Bytes,
374        expire: Option<Duration>,
375    },
376}
377
378/// A request bundled with its reply channel.
379#[derive(Debug)]
380pub struct ShardMessage {
381    pub request: ShardRequest,
382    pub reply: oneshot::Sender<ShardResponse>,
383}
384
385/// A cloneable handle for sending commands to a shard task.
386///
387/// Wraps the mpsc sender so callers don't need to manage oneshot
388/// channels directly.
389#[derive(Debug, Clone)]
390pub struct ShardHandle {
391    tx: mpsc::Sender<ShardMessage>,
392}
393
394impl ShardHandle {
395    /// Sends a request and waits for the response.
396    ///
397    /// Returns `ShardError::Unavailable` if the shard task has stopped.
398    pub async fn send(&self, request: ShardRequest) -> Result<ShardResponse, ShardError> {
399        let rx = self.dispatch(request).await?;
400        rx.await.map_err(|_| ShardError::Unavailable)
401    }
402
403    /// Sends a request and returns the reply channel without waiting
404    /// for the response. Used by `Engine::broadcast` to fan out to
405    /// all shards before collecting results.
406    pub(crate) async fn dispatch(
407        &self,
408        request: ShardRequest,
409    ) -> Result<oneshot::Receiver<ShardResponse>, ShardError> {
410        let (reply_tx, reply_rx) = oneshot::channel();
411        let msg = ShardMessage {
412            request,
413            reply: reply_tx,
414        };
415        self.tx
416            .send(msg)
417            .await
418            .map_err(|_| ShardError::Unavailable)?;
419        Ok(reply_rx)
420    }
421}
422
423/// Spawns a shard task and returns the handle for communicating with it.
424///
425/// `buffer` controls the mpsc channel capacity — higher values absorb
426/// burst traffic at the cost of memory. When `drop_handle` is provided,
427/// large value deallocations are deferred to the background drop thread.
428pub fn spawn_shard(
429    buffer: usize,
430    config: ShardConfig,
431    persistence: Option<ShardPersistenceConfig>,
432    drop_handle: Option<DropHandle>,
433    #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
434) -> ShardHandle {
435    let (tx, rx) = mpsc::channel(buffer);
436    tokio::spawn(run_shard(
437        rx,
438        config,
439        persistence,
440        drop_handle,
441        #[cfg(feature = "protobuf")]
442        schema_registry,
443    ));
444    ShardHandle { tx }
445}
446
447/// The shard's main loop. Processes messages and runs periodic
448/// active expiration until the channel closes.
449async fn run_shard(
450    mut rx: mpsc::Receiver<ShardMessage>,
451    config: ShardConfig,
452    persistence: Option<ShardPersistenceConfig>,
453    drop_handle: Option<DropHandle>,
454    #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
455) {
456    let shard_id = config.shard_id;
457    let mut keyspace = Keyspace::with_config(config);
458
459    if let Some(handle) = drop_handle.clone() {
460        keyspace.set_drop_handle(handle);
461    }
462
463    // -- recovery --
464    if let Some(ref pcfg) = persistence {
465        #[cfg(feature = "encryption")]
466        let result = if let Some(ref key) = pcfg.encryption_key {
467            recovery::recover_shard_encrypted(&pcfg.data_dir, shard_id, key.clone())
468        } else {
469            recovery::recover_shard(&pcfg.data_dir, shard_id)
470        };
471        #[cfg(not(feature = "encryption"))]
472        let result = recovery::recover_shard(&pcfg.data_dir, shard_id);
473        let count = result.entries.len();
474        for entry in result.entries {
475            let value = match entry.value {
476                RecoveredValue::String(data) => Value::String(data),
477                RecoveredValue::List(deque) => Value::List(deque),
478                RecoveredValue::SortedSet(members) => {
479                    let mut ss = crate::types::sorted_set::SortedSet::new();
480                    for (score, member) in members {
481                        ss.add(member, score);
482                    }
483                    Value::SortedSet(ss)
484                }
485                RecoveredValue::Hash(map) => Value::Hash(map),
486                RecoveredValue::Set(set) => Value::Set(set),
487                #[cfg(feature = "protobuf")]
488                RecoveredValue::Proto { type_name, data } => Value::Proto { type_name, data },
489            };
490            keyspace.restore(entry.key, value, entry.ttl);
491        }
492        if count > 0 {
493            info!(
494                shard_id,
495                recovered_keys = count,
496                snapshot = result.loaded_snapshot,
497                aof = result.replayed_aof,
498                "recovered shard state"
499            );
500        }
501
502        // restore schemas found in the AOF into the shared registry
503        #[cfg(feature = "protobuf")]
504        if let Some(ref registry) = schema_registry {
505            if !result.schemas.is_empty() {
506                if let Ok(mut reg) = registry.write() {
507                    let schema_count = result.schemas.len();
508                    for (name, descriptor) in result.schemas {
509                        reg.restore(name, descriptor);
510                    }
511                    info!(
512                        shard_id,
513                        schemas = schema_count,
514                        "restored schemas from AOF"
515                    );
516                }
517            }
518        }
519    }
520
521    // -- AOF writer --
522    let mut aof_writer: Option<AofWriter> = match &persistence {
523        Some(pcfg) if pcfg.append_only => {
524            let path = ember_persistence::aof::aof_path(&pcfg.data_dir, shard_id);
525            #[cfg(feature = "encryption")]
526            let result = if let Some(ref key) = pcfg.encryption_key {
527                AofWriter::open_encrypted(path, key.clone())
528            } else {
529                AofWriter::open(path)
530            };
531            #[cfg(not(feature = "encryption"))]
532            let result = AofWriter::open(path);
533            match result {
534                Ok(w) => Some(w),
535                Err(e) => {
536                    warn!(shard_id, "failed to open AOF writer: {e}");
537                    None
538                }
539            }
540        }
541        _ => None,
542    };
543
544    let fsync_policy = persistence
545        .as_ref()
546        .map(|p| p.fsync_policy)
547        .unwrap_or(FsyncPolicy::No);
548
549    // -- tickers --
550    let mut expiry_tick = tokio::time::interval(EXPIRY_TICK);
551    expiry_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
552
553    let mut fsync_tick = tokio::time::interval(FSYNC_INTERVAL);
554    fsync_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
555
556    loop {
557        tokio::select! {
558            msg = rx.recv() => {
559                match msg {
560                    Some(msg) => {
561                        let request_kind = describe_request(&msg.request);
562                        let response = dispatch(
563                            &mut keyspace,
564                            &msg.request,
565                            #[cfg(feature = "protobuf")]
566                            &schema_registry,
567                        );
568
569                        // write AOF record for successful mutations
570                        if let Some(ref mut writer) = aof_writer {
571                            if let Some(record) = to_aof_record(&msg.request, &response) {
572                                if let Err(e) = writer.write_record(&record) {
573                                    warn!(shard_id, "aof write failed: {e}");
574                                }
575                                if fsync_policy == FsyncPolicy::Always {
576                                    if let Err(e) = writer.sync() {
577                                        warn!(shard_id, "aof sync failed: {e}");
578                                    }
579                                }
580                            }
581                        }
582
583                        // handle snapshot/rewrite (these need mutable access
584                        // to both keyspace and aof_writer)
585                        match request_kind {
586                            RequestKind::Snapshot => {
587                                let resp = handle_snapshot(
588                                    &keyspace, &persistence, shard_id,
589                                );
590                                let _ = msg.reply.send(resp);
591                                continue;
592                            }
593                            RequestKind::RewriteAof => {
594                                let resp = handle_rewrite(
595                                    &keyspace,
596                                    &persistence,
597                                    &mut aof_writer,
598                                    shard_id,
599                                    #[cfg(feature = "protobuf")]
600                                    &schema_registry,
601                                );
602                                let _ = msg.reply.send(resp);
603                                continue;
604                            }
605                            RequestKind::FlushDbAsync => {
606                                let old_entries = keyspace.flush_async();
607                                if let Some(ref handle) = drop_handle {
608                                    handle.defer_entries(old_entries);
609                                }
610                                // else: old_entries drops inline here
611                                let _ = msg.reply.send(ShardResponse::Ok);
612                                continue;
613                            }
614                            RequestKind::Other => {}
615                        }
616
617                        let _ = msg.reply.send(response);
618                    }
619                    None => break, // channel closed, shard shutting down
620                }
621            }
622            _ = expiry_tick.tick() => {
623                expiry::run_expiration_cycle(&mut keyspace);
624            }
625            _ = fsync_tick.tick(), if fsync_policy == FsyncPolicy::EverySec => {
626                if let Some(ref mut writer) = aof_writer {
627                    if let Err(e) = writer.sync() {
628                        warn!(shard_id, "periodic aof sync failed: {e}");
629                    }
630                }
631            }
632        }
633    }
634
635    // flush AOF on clean shutdown
636    if let Some(ref mut writer) = aof_writer {
637        let _ = writer.sync();
638    }
639}
640
641/// Lightweight tag so we can identify requests that need special
642/// handling after dispatch without borrowing the request again.
643enum RequestKind {
644    Snapshot,
645    RewriteAof,
646    FlushDbAsync,
647    Other,
648}
649
650fn describe_request(req: &ShardRequest) -> RequestKind {
651    match req {
652        ShardRequest::Snapshot => RequestKind::Snapshot,
653        ShardRequest::RewriteAof => RequestKind::RewriteAof,
654        ShardRequest::FlushDbAsync => RequestKind::FlushDbAsync,
655        _ => RequestKind::Other,
656    }
657}
658
659/// Executes a single request against the keyspace.
660fn dispatch(
661    ks: &mut Keyspace,
662    req: &ShardRequest,
663    #[cfg(feature = "protobuf")] schema_registry: &Option<crate::schema::SharedSchemaRegistry>,
664) -> ShardResponse {
665    match req {
666        ShardRequest::Get { key } => match ks.get(key) {
667            Ok(val) => ShardResponse::Value(val),
668            Err(_) => ShardResponse::WrongType,
669        },
670        ShardRequest::Set {
671            key,
672            value,
673            expire,
674            nx,
675            xx,
676        } => {
677            // NX: only set if key does NOT already exist
678            if *nx && ks.exists(key) {
679                return ShardResponse::Value(None);
680            }
681            // XX: only set if key DOES already exist
682            if *xx && !ks.exists(key) {
683                return ShardResponse::Value(None);
684            }
685            match ks.set(key.clone(), value.clone(), *expire) {
686                SetResult::Ok => ShardResponse::Ok,
687                SetResult::OutOfMemory => ShardResponse::OutOfMemory,
688            }
689        }
690        ShardRequest::Incr { key } => match ks.incr(key) {
691            Ok(val) => ShardResponse::Integer(val),
692            Err(IncrError::WrongType) => ShardResponse::WrongType,
693            Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
694            Err(e) => ShardResponse::Err(e.to_string()),
695        },
696        ShardRequest::Decr { key } => match ks.decr(key) {
697            Ok(val) => ShardResponse::Integer(val),
698            Err(IncrError::WrongType) => ShardResponse::WrongType,
699            Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
700            Err(e) => ShardResponse::Err(e.to_string()),
701        },
702        ShardRequest::IncrBy { key, delta } => match ks.incr_by(key, *delta) {
703            Ok(val) => ShardResponse::Integer(val),
704            Err(IncrError::WrongType) => ShardResponse::WrongType,
705            Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
706            Err(e) => ShardResponse::Err(e.to_string()),
707        },
708        ShardRequest::DecrBy { key, delta } => match ks.incr_by(key, -delta) {
709            Ok(val) => ShardResponse::Integer(val),
710            Err(IncrError::WrongType) => ShardResponse::WrongType,
711            Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
712            Err(e) => ShardResponse::Err(e.to_string()),
713        },
714        ShardRequest::IncrByFloat { key, delta } => match ks.incr_by_float(key, *delta) {
715            Ok(val) => ShardResponse::BulkString(val),
716            Err(IncrFloatError::WrongType) => ShardResponse::WrongType,
717            Err(IncrFloatError::OutOfMemory) => ShardResponse::OutOfMemory,
718            Err(e) => ShardResponse::Err(e.to_string()),
719        },
720        ShardRequest::Append { key, value } => match ks.append(key, value) {
721            Ok(len) => ShardResponse::Len(len),
722            Err(WriteError::WrongType) => ShardResponse::WrongType,
723            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
724        },
725        ShardRequest::Strlen { key } => match ks.strlen(key) {
726            Ok(len) => ShardResponse::Len(len),
727            Err(_) => ShardResponse::WrongType,
728        },
729        ShardRequest::Keys { pattern } => {
730            let keys = ks.keys(pattern);
731            ShardResponse::StringArray(keys)
732        }
733        ShardRequest::Rename { key, newkey } => {
734            use crate::keyspace::RenameError;
735            match ks.rename(key, newkey) {
736                Ok(()) => ShardResponse::Ok,
737                Err(RenameError::NoSuchKey) => ShardResponse::Err("ERR no such key".into()),
738            }
739        }
740        ShardRequest::Del { key } => ShardResponse::Bool(ks.del(key)),
741        ShardRequest::Unlink { key } => ShardResponse::Bool(ks.unlink(key)),
742        ShardRequest::Exists { key } => ShardResponse::Bool(ks.exists(key)),
743        ShardRequest::Expire { key, seconds } => ShardResponse::Bool(ks.expire(key, *seconds)),
744        ShardRequest::Ttl { key } => ShardResponse::Ttl(ks.ttl(key)),
745        ShardRequest::Persist { key } => ShardResponse::Bool(ks.persist(key)),
746        ShardRequest::Pttl { key } => ShardResponse::Ttl(ks.pttl(key)),
747        ShardRequest::Pexpire { key, milliseconds } => {
748            ShardResponse::Bool(ks.pexpire(key, *milliseconds))
749        }
750        ShardRequest::LPush { key, values } => match ks.lpush(key, values) {
751            Ok(len) => ShardResponse::Len(len),
752            Err(WriteError::WrongType) => ShardResponse::WrongType,
753            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
754        },
755        ShardRequest::RPush { key, values } => match ks.rpush(key, values) {
756            Ok(len) => ShardResponse::Len(len),
757            Err(WriteError::WrongType) => ShardResponse::WrongType,
758            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
759        },
760        ShardRequest::LPop { key } => match ks.lpop(key) {
761            Ok(val) => ShardResponse::Value(val.map(Value::String)),
762            Err(_) => ShardResponse::WrongType,
763        },
764        ShardRequest::RPop { key } => match ks.rpop(key) {
765            Ok(val) => ShardResponse::Value(val.map(Value::String)),
766            Err(_) => ShardResponse::WrongType,
767        },
768        ShardRequest::LRange { key, start, stop } => match ks.lrange(key, *start, *stop) {
769            Ok(items) => ShardResponse::Array(items),
770            Err(_) => ShardResponse::WrongType,
771        },
772        ShardRequest::LLen { key } => match ks.llen(key) {
773            Ok(len) => ShardResponse::Len(len),
774            Err(_) => ShardResponse::WrongType,
775        },
776        ShardRequest::Type { key } => ShardResponse::TypeName(ks.value_type(key)),
777        ShardRequest::ZAdd {
778            key,
779            members,
780            nx,
781            xx,
782            gt,
783            lt,
784            ch,
785        } => {
786            let flags = ZAddFlags {
787                nx: *nx,
788                xx: *xx,
789                gt: *gt,
790                lt: *lt,
791                ch: *ch,
792            };
793            match ks.zadd(key, members, &flags) {
794                Ok(result) => ShardResponse::ZAddLen {
795                    count: result.count,
796                    applied: result.applied,
797                },
798                Err(WriteError::WrongType) => ShardResponse::WrongType,
799                Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
800            }
801        }
802        ShardRequest::ZRem { key, members } => match ks.zrem(key, members) {
803            Ok(removed) => ShardResponse::ZRemLen {
804                count: removed.len(),
805                removed,
806            },
807            Err(_) => ShardResponse::WrongType,
808        },
809        ShardRequest::ZScore { key, member } => match ks.zscore(key, member) {
810            Ok(score) => ShardResponse::Score(score),
811            Err(_) => ShardResponse::WrongType,
812        },
813        ShardRequest::ZRank { key, member } => match ks.zrank(key, member) {
814            Ok(rank) => ShardResponse::Rank(rank),
815            Err(_) => ShardResponse::WrongType,
816        },
817        ShardRequest::ZCard { key } => match ks.zcard(key) {
818            Ok(len) => ShardResponse::Len(len),
819            Err(_) => ShardResponse::WrongType,
820        },
821        ShardRequest::ZRange {
822            key, start, stop, ..
823        } => match ks.zrange(key, *start, *stop) {
824            Ok(items) => ShardResponse::ScoredArray(items),
825            Err(_) => ShardResponse::WrongType,
826        },
827        ShardRequest::DbSize => ShardResponse::KeyCount(ks.len()),
828        ShardRequest::Stats => ShardResponse::Stats(ks.stats()),
829        ShardRequest::FlushDb => {
830            ks.clear();
831            ShardResponse::Ok
832        }
833        ShardRequest::Scan {
834            cursor,
835            count,
836            pattern,
837        } => {
838            let (next_cursor, keys) = ks.scan_keys(*cursor, *count, pattern.as_deref());
839            ShardResponse::Scan {
840                cursor: next_cursor,
841                keys,
842            }
843        }
844        ShardRequest::HSet { key, fields } => match ks.hset(key, fields) {
845            Ok(count) => ShardResponse::Len(count),
846            Err(WriteError::WrongType) => ShardResponse::WrongType,
847            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
848        },
849        ShardRequest::HGet { key, field } => match ks.hget(key, field) {
850            Ok(val) => ShardResponse::Value(val.map(Value::String)),
851            Err(_) => ShardResponse::WrongType,
852        },
853        ShardRequest::HGetAll { key } => match ks.hgetall(key) {
854            Ok(fields) => ShardResponse::HashFields(fields),
855            Err(_) => ShardResponse::WrongType,
856        },
857        ShardRequest::HDel { key, fields } => match ks.hdel(key, fields) {
858            Ok(removed) => ShardResponse::HDelLen {
859                count: removed.len(),
860                removed,
861            },
862            Err(_) => ShardResponse::WrongType,
863        },
864        ShardRequest::HExists { key, field } => match ks.hexists(key, field) {
865            Ok(exists) => ShardResponse::Bool(exists),
866            Err(_) => ShardResponse::WrongType,
867        },
868        ShardRequest::HLen { key } => match ks.hlen(key) {
869            Ok(len) => ShardResponse::Len(len),
870            Err(_) => ShardResponse::WrongType,
871        },
872        ShardRequest::HIncrBy { key, field, delta } => match ks.hincrby(key, field, *delta) {
873            Ok(val) => ShardResponse::Integer(val),
874            Err(IncrError::WrongType) => ShardResponse::WrongType,
875            Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
876            Err(e) => ShardResponse::Err(e.to_string()),
877        },
878        ShardRequest::HKeys { key } => match ks.hkeys(key) {
879            Ok(keys) => ShardResponse::StringArray(keys),
880            Err(_) => ShardResponse::WrongType,
881        },
882        ShardRequest::HVals { key } => match ks.hvals(key) {
883            Ok(vals) => ShardResponse::Array(vals),
884            Err(_) => ShardResponse::WrongType,
885        },
886        ShardRequest::HMGet { key, fields } => match ks.hmget(key, fields) {
887            Ok(vals) => ShardResponse::OptionalArray(vals),
888            Err(_) => ShardResponse::WrongType,
889        },
890        ShardRequest::SAdd { key, members } => match ks.sadd(key, members) {
891            Ok(count) => ShardResponse::Len(count),
892            Err(WriteError::WrongType) => ShardResponse::WrongType,
893            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
894        },
895        ShardRequest::SRem { key, members } => match ks.srem(key, members) {
896            Ok(count) => ShardResponse::Len(count),
897            Err(_) => ShardResponse::WrongType,
898        },
899        ShardRequest::SMembers { key } => match ks.smembers(key) {
900            Ok(members) => ShardResponse::StringArray(members),
901            Err(_) => ShardResponse::WrongType,
902        },
903        ShardRequest::SIsMember { key, member } => match ks.sismember(key, member) {
904            Ok(exists) => ShardResponse::Bool(exists),
905            Err(_) => ShardResponse::WrongType,
906        },
907        ShardRequest::SCard { key } => match ks.scard(key) {
908            Ok(count) => ShardResponse::Len(count),
909            Err(_) => ShardResponse::WrongType,
910        },
911        ShardRequest::CountKeysInSlot { slot } => {
912            ShardResponse::KeyCount(ks.count_keys_in_slot(*slot))
913        }
914        ShardRequest::GetKeysInSlot { slot, count } => {
915            ShardResponse::StringArray(ks.get_keys_in_slot(*slot, *count))
916        }
917        #[cfg(feature = "protobuf")]
918        ShardRequest::ProtoSet {
919            key,
920            type_name,
921            data,
922            expire,
923            nx,
924            xx,
925        } => {
926            if *nx && ks.exists(key) {
927                return ShardResponse::Value(None);
928            }
929            if *xx && !ks.exists(key) {
930                return ShardResponse::Value(None);
931            }
932            match ks.proto_set(key.clone(), type_name.clone(), data.clone(), *expire) {
933                SetResult::Ok => ShardResponse::Ok,
934                SetResult::OutOfMemory => ShardResponse::OutOfMemory,
935            }
936        }
937        #[cfg(feature = "protobuf")]
938        ShardRequest::ProtoGet { key } => match ks.proto_get(key) {
939            Ok(val) => ShardResponse::ProtoValue(val),
940            Err(_) => ShardResponse::WrongType,
941        },
942        #[cfg(feature = "protobuf")]
943        ShardRequest::ProtoType { key } => match ks.proto_type(key) {
944            Ok(name) => ShardResponse::ProtoTypeName(name),
945            Err(_) => ShardResponse::WrongType,
946        },
947        // ProtoRegisterAof is a no-op for the keyspace — the AOF record
948        // is written by the to_aof_record path after dispatch returns Ok.
949        #[cfg(feature = "protobuf")]
950        ShardRequest::ProtoRegisterAof { .. } => ShardResponse::Ok,
951        #[cfg(feature = "protobuf")]
952        ShardRequest::ProtoSetField {
953            key,
954            field_path,
955            value,
956        } => dispatch_proto_field_op(ks, schema_registry, key, |reg, type_name, data, ttl| {
957            let new_data = reg.set_field(type_name, data, field_path, value)?;
958            Ok(ShardResponse::ProtoFieldUpdated {
959                type_name: type_name.to_owned(),
960                data: new_data,
961                expire: ttl,
962            })
963        }),
964        #[cfg(feature = "protobuf")]
965        ShardRequest::ProtoDelField { key, field_path } => {
966            dispatch_proto_field_op(ks, schema_registry, key, |reg, type_name, data, ttl| {
967                let new_data = reg.clear_field(type_name, data, field_path)?;
968                Ok(ShardResponse::ProtoFieldUpdated {
969                    type_name: type_name.to_owned(),
970                    data: new_data,
971                    expire: ttl,
972                })
973            })
974        }
975        // snapshot/rewrite/flush_async are handled in the main loop, not here
976        ShardRequest::Snapshot | ShardRequest::RewriteAof | ShardRequest::FlushDbAsync => {
977            ShardResponse::Ok
978        }
979    }
980}
981
982/// Shared logic for atomic proto field operations (SETFIELD/DELFIELD).
983///
984/// Reads the proto value, acquires the schema registry, calls the
985/// provided mutation closure, then writes the result back to the keyspace
986/// — all within the single-threaded shard dispatch.
987#[cfg(feature = "protobuf")]
988fn dispatch_proto_field_op<F>(
989    ks: &mut Keyspace,
990    schema_registry: &Option<crate::schema::SharedSchemaRegistry>,
991    key: &str,
992    mutate: F,
993) -> ShardResponse
994where
995    F: FnOnce(
996        &crate::schema::SchemaRegistry,
997        &str,
998        &[u8],
999        Option<Duration>,
1000    ) -> Result<ShardResponse, crate::schema::SchemaError>,
1001{
1002    let registry = match schema_registry {
1003        Some(r) => r,
1004        None => return ShardResponse::Err("protobuf support is not enabled".into()),
1005    };
1006
1007    let (type_name, data, remaining_ttl) = match ks.proto_get(key) {
1008        Ok(Some(tuple)) => tuple,
1009        Ok(None) => return ShardResponse::Value(None),
1010        Err(_) => return ShardResponse::WrongType,
1011    };
1012
1013    let reg = match registry.read() {
1014        Ok(r) => r,
1015        Err(_) => return ShardResponse::Err("schema registry lock poisoned".into()),
1016    };
1017
1018    let resp = match mutate(&reg, &type_name, &data, remaining_ttl) {
1019        Ok(r) => r,
1020        Err(e) => return ShardResponse::Err(e.to_string()),
1021    };
1022
1023    // write the updated value back, preserving the original TTL
1024    if let ShardResponse::ProtoFieldUpdated {
1025        ref type_name,
1026        ref data,
1027        expire,
1028    } = resp
1029    {
1030        ks.proto_set(key.to_owned(), type_name.clone(), data.clone(), expire);
1031    }
1032
1033    resp
1034}
1035
1036/// Converts a successful mutation request+response pair into an AOF record.
1037/// Returns None for non-mutation requests or failed mutations.
1038fn to_aof_record(req: &ShardRequest, resp: &ShardResponse) -> Option<AofRecord> {
1039    match (req, resp) {
1040        (
1041            ShardRequest::Set {
1042                key, value, expire, ..
1043            },
1044            ShardResponse::Ok,
1045        ) => {
1046            let expire_ms = expire.map(|d| d.as_millis() as i64).unwrap_or(-1);
1047            Some(AofRecord::Set {
1048                key: key.clone(),
1049                value: value.clone(),
1050                expire_ms,
1051            })
1052        }
1053        (ShardRequest::Del { key }, ShardResponse::Bool(true))
1054        | (ShardRequest::Unlink { key }, ShardResponse::Bool(true)) => {
1055            Some(AofRecord::Del { key: key.clone() })
1056        }
1057        (ShardRequest::Expire { key, seconds }, ShardResponse::Bool(true)) => {
1058            Some(AofRecord::Expire {
1059                key: key.clone(),
1060                seconds: *seconds,
1061            })
1062        }
1063        (ShardRequest::LPush { key, values }, ShardResponse::Len(_)) => Some(AofRecord::LPush {
1064            key: key.clone(),
1065            values: values.clone(),
1066        }),
1067        (ShardRequest::RPush { key, values }, ShardResponse::Len(_)) => Some(AofRecord::RPush {
1068            key: key.clone(),
1069            values: values.clone(),
1070        }),
1071        (ShardRequest::LPop { key }, ShardResponse::Value(Some(_))) => {
1072            Some(AofRecord::LPop { key: key.clone() })
1073        }
1074        (ShardRequest::RPop { key }, ShardResponse::Value(Some(_))) => {
1075            Some(AofRecord::RPop { key: key.clone() })
1076        }
1077        (ShardRequest::ZAdd { key, .. }, ShardResponse::ZAddLen { applied, .. })
1078            if !applied.is_empty() =>
1079        {
1080            Some(AofRecord::ZAdd {
1081                key: key.clone(),
1082                members: applied.clone(),
1083            })
1084        }
1085        (ShardRequest::ZRem { key, .. }, ShardResponse::ZRemLen { removed, .. })
1086            if !removed.is_empty() =>
1087        {
1088            Some(AofRecord::ZRem {
1089                key: key.clone(),
1090                members: removed.clone(),
1091            })
1092        }
1093        (ShardRequest::Incr { key }, ShardResponse::Integer(_)) => {
1094            Some(AofRecord::Incr { key: key.clone() })
1095        }
1096        (ShardRequest::Decr { key }, ShardResponse::Integer(_)) => {
1097            Some(AofRecord::Decr { key: key.clone() })
1098        }
1099        (ShardRequest::IncrBy { key, delta }, ShardResponse::Integer(_)) => {
1100            Some(AofRecord::IncrBy {
1101                key: key.clone(),
1102                delta: *delta,
1103            })
1104        }
1105        (ShardRequest::DecrBy { key, delta }, ShardResponse::Integer(_)) => {
1106            Some(AofRecord::DecrBy {
1107                key: key.clone(),
1108                delta: *delta,
1109            })
1110        }
1111        // INCRBYFLOAT: record as a SET with the resulting value to avoid
1112        // float rounding drift during replay.
1113        (ShardRequest::IncrByFloat { key, .. }, ShardResponse::BulkString(val)) => {
1114            Some(AofRecord::Set {
1115                key: key.clone(),
1116                value: Bytes::from(val.clone()),
1117                expire_ms: -1,
1118            })
1119        }
1120        // APPEND: record the appended value for replay
1121        (ShardRequest::Append { key, value }, ShardResponse::Len(_)) => Some(AofRecord::Append {
1122            key: key.clone(),
1123            value: value.clone(),
1124        }),
1125        (ShardRequest::Rename { key, newkey }, ShardResponse::Ok) => Some(AofRecord::Rename {
1126            key: key.clone(),
1127            newkey: newkey.clone(),
1128        }),
1129        (ShardRequest::Persist { key }, ShardResponse::Bool(true)) => {
1130            Some(AofRecord::Persist { key: key.clone() })
1131        }
1132        (ShardRequest::Pexpire { key, milliseconds }, ShardResponse::Bool(true)) => {
1133            Some(AofRecord::Pexpire {
1134                key: key.clone(),
1135                milliseconds: *milliseconds,
1136            })
1137        }
1138        // Hash commands
1139        (ShardRequest::HSet { key, fields }, ShardResponse::Len(_)) => Some(AofRecord::HSet {
1140            key: key.clone(),
1141            fields: fields.clone(),
1142        }),
1143        (ShardRequest::HDel { key, .. }, ShardResponse::HDelLen { removed, .. })
1144            if !removed.is_empty() =>
1145        {
1146            Some(AofRecord::HDel {
1147                key: key.clone(),
1148                fields: removed.clone(),
1149            })
1150        }
1151        (ShardRequest::HIncrBy { key, field, delta }, ShardResponse::Integer(_)) => {
1152            Some(AofRecord::HIncrBy {
1153                key: key.clone(),
1154                field: field.clone(),
1155                delta: *delta,
1156            })
1157        }
1158        // Set commands
1159        (ShardRequest::SAdd { key, members }, ShardResponse::Len(count)) if *count > 0 => {
1160            Some(AofRecord::SAdd {
1161                key: key.clone(),
1162                members: members.clone(),
1163            })
1164        }
1165        (ShardRequest::SRem { key, members }, ShardResponse::Len(count)) if *count > 0 => {
1166            Some(AofRecord::SRem {
1167                key: key.clone(),
1168                members: members.clone(),
1169            })
1170        }
1171        // Proto commands
1172        #[cfg(feature = "protobuf")]
1173        (
1174            ShardRequest::ProtoSet {
1175                key,
1176                type_name,
1177                data,
1178                expire,
1179                ..
1180            },
1181            ShardResponse::Ok,
1182        ) => {
1183            let expire_ms = expire.map(|d| d.as_millis() as i64).unwrap_or(-1);
1184            Some(AofRecord::ProtoSet {
1185                key: key.clone(),
1186                type_name: type_name.clone(),
1187                data: data.clone(),
1188                expire_ms,
1189            })
1190        }
1191        #[cfg(feature = "protobuf")]
1192        (ShardRequest::ProtoRegisterAof { name, descriptor }, ShardResponse::Ok) => {
1193            Some(AofRecord::ProtoRegister {
1194                name: name.clone(),
1195                descriptor: descriptor.clone(),
1196            })
1197        }
1198        // atomic field ops persist as a full ProtoSet (the whole re-encoded value)
1199        #[cfg(feature = "protobuf")]
1200        (
1201            ShardRequest::ProtoSetField { key, .. } | ShardRequest::ProtoDelField { key, .. },
1202            ShardResponse::ProtoFieldUpdated {
1203                type_name,
1204                data,
1205                expire,
1206            },
1207        ) => {
1208            let expire_ms = expire.map(|d| d.as_millis() as i64).unwrap_or(-1);
1209            Some(AofRecord::ProtoSet {
1210                key: key.clone(),
1211                type_name: type_name.clone(),
1212                data: data.clone(),
1213                expire_ms,
1214            })
1215        }
1216        _ => None,
1217    }
1218}
1219
1220/// Writes a snapshot of the current keyspace.
1221fn handle_snapshot(
1222    keyspace: &Keyspace,
1223    persistence: &Option<ShardPersistenceConfig>,
1224    shard_id: u16,
1225) -> ShardResponse {
1226    let pcfg = match persistence {
1227        Some(p) => p,
1228        None => return ShardResponse::Err("persistence not configured".into()),
1229    };
1230
1231    let path = snapshot::snapshot_path(&pcfg.data_dir, shard_id);
1232    let result = write_snapshot(
1233        keyspace,
1234        &path,
1235        shard_id,
1236        #[cfg(feature = "encryption")]
1237        pcfg.encryption_key.as_ref(),
1238    );
1239    match result {
1240        Ok(count) => {
1241            info!(shard_id, entries = count, "snapshot written");
1242            ShardResponse::Ok
1243        }
1244        Err(e) => {
1245            warn!(shard_id, "snapshot failed: {e}");
1246            ShardResponse::Err(format!("snapshot failed: {e}"))
1247        }
1248    }
1249}
1250
1251/// Writes a snapshot and then truncates the AOF.
1252///
1253/// When protobuf is enabled, re-persists all registered schemas to the
1254/// AOF after truncation so they survive the next restart.
1255fn handle_rewrite(
1256    keyspace: &Keyspace,
1257    persistence: &Option<ShardPersistenceConfig>,
1258    aof_writer: &mut Option<AofWriter>,
1259    shard_id: u16,
1260    #[cfg(feature = "protobuf")] schema_registry: &Option<crate::schema::SharedSchemaRegistry>,
1261) -> ShardResponse {
1262    let pcfg = match persistence {
1263        Some(p) => p,
1264        None => return ShardResponse::Err("persistence not configured".into()),
1265    };
1266
1267    let path = snapshot::snapshot_path(&pcfg.data_dir, shard_id);
1268    let result = write_snapshot(
1269        keyspace,
1270        &path,
1271        shard_id,
1272        #[cfg(feature = "encryption")]
1273        pcfg.encryption_key.as_ref(),
1274    );
1275    match result {
1276        Ok(count) => {
1277            // truncate AOF after successful snapshot
1278            if let Some(ref mut writer) = aof_writer {
1279                if let Err(e) = writer.truncate() {
1280                    warn!(shard_id, "aof truncate after rewrite failed: {e}");
1281                }
1282
1283                // re-persist schemas so they survive the next recovery
1284                #[cfg(feature = "protobuf")]
1285                if let Some(ref registry) = schema_registry {
1286                    if let Ok(reg) = registry.read() {
1287                        for (name, descriptor) in reg.iter_schemas() {
1288                            let record = AofRecord::ProtoRegister {
1289                                name: name.to_owned(),
1290                                descriptor: descriptor.clone(),
1291                            };
1292                            if let Err(e) = writer.write_record(&record) {
1293                                warn!(shard_id, "failed to re-persist schema after rewrite: {e}");
1294                            }
1295                        }
1296                    }
1297                }
1298
1299                // flush so schemas are durable before we report success
1300                if let Err(e) = writer.sync() {
1301                    warn!(shard_id, "aof sync after rewrite failed: {e}");
1302                }
1303            }
1304            info!(shard_id, entries = count, "aof rewrite complete");
1305            ShardResponse::Ok
1306        }
1307        Err(e) => {
1308            warn!(shard_id, "aof rewrite failed: {e}");
1309            ShardResponse::Err(format!("rewrite failed: {e}"))
1310        }
1311    }
1312}
1313
1314/// Iterates the keyspace and writes all live entries to a snapshot file.
1315fn write_snapshot(
1316    keyspace: &Keyspace,
1317    path: &std::path::Path,
1318    shard_id: u16,
1319    #[cfg(feature = "encryption")] encryption_key: Option<
1320        &ember_persistence::encryption::EncryptionKey,
1321    >,
1322) -> Result<u32, ember_persistence::format::FormatError> {
1323    #[cfg(feature = "encryption")]
1324    let mut writer = if let Some(key) = encryption_key {
1325        SnapshotWriter::create_encrypted(path, shard_id, key.clone())?
1326    } else {
1327        SnapshotWriter::create(path, shard_id)?
1328    };
1329    #[cfg(not(feature = "encryption"))]
1330    let mut writer = SnapshotWriter::create(path, shard_id)?;
1331    let mut count = 0u32;
1332
1333    for (key, value, ttl_ms) in keyspace.iter_entries() {
1334        let snap_value = match value {
1335            Value::String(data) => SnapValue::String(data.clone()),
1336            Value::List(deque) => SnapValue::List(deque.clone()),
1337            Value::SortedSet(ss) => {
1338                let members: Vec<(f64, String)> = ss
1339                    .iter()
1340                    .map(|(member, score)| (score, member.to_owned()))
1341                    .collect();
1342                SnapValue::SortedSet(members)
1343            }
1344            Value::Hash(map) => SnapValue::Hash(map.clone()),
1345            Value::Set(set) => SnapValue::Set(set.clone()),
1346            #[cfg(feature = "protobuf")]
1347            Value::Proto { type_name, data } => SnapValue::Proto {
1348                type_name: type_name.clone(),
1349                data: data.clone(),
1350            },
1351        };
1352        writer.write_entry(&SnapEntry {
1353            key: key.to_owned(),
1354            value: snap_value,
1355            expire_ms: ttl_ms,
1356        })?;
1357        count += 1;
1358    }
1359
1360    writer.finish()?;
1361    Ok(count)
1362}
1363
1364#[cfg(test)]
1365mod tests {
1366    use super::*;
1367
1368    /// Test helper: dispatch without a schema registry.
1369    fn test_dispatch(ks: &mut Keyspace, req: &ShardRequest) -> ShardResponse {
1370        dispatch(
1371            ks,
1372            req,
1373            #[cfg(feature = "protobuf")]
1374            &None,
1375        )
1376    }
1377
1378    #[test]
1379    fn dispatch_set_and_get() {
1380        let mut ks = Keyspace::new();
1381
1382        let resp = test_dispatch(
1383            &mut ks,
1384            &ShardRequest::Set {
1385                key: "k".into(),
1386                value: Bytes::from("v"),
1387                expire: None,
1388                nx: false,
1389                xx: false,
1390            },
1391        );
1392        assert!(matches!(resp, ShardResponse::Ok));
1393
1394        let resp = test_dispatch(&mut ks, &ShardRequest::Get { key: "k".into() });
1395        match resp {
1396            ShardResponse::Value(Some(Value::String(data))) => {
1397                assert_eq!(data, Bytes::from("v"));
1398            }
1399            other => panic!("expected Value(Some(String)), got {other:?}"),
1400        }
1401    }
1402
1403    #[test]
1404    fn dispatch_get_missing() {
1405        let mut ks = Keyspace::new();
1406        let resp = test_dispatch(&mut ks, &ShardRequest::Get { key: "nope".into() });
1407        assert!(matches!(resp, ShardResponse::Value(None)));
1408    }
1409
1410    #[test]
1411    fn dispatch_del() {
1412        let mut ks = Keyspace::new();
1413        ks.set("key".into(), Bytes::from("val"), None);
1414
1415        let resp = test_dispatch(&mut ks, &ShardRequest::Del { key: "key".into() });
1416        assert!(matches!(resp, ShardResponse::Bool(true)));
1417
1418        let resp = test_dispatch(&mut ks, &ShardRequest::Del { key: "key".into() });
1419        assert!(matches!(resp, ShardResponse::Bool(false)));
1420    }
1421
1422    #[test]
1423    fn dispatch_exists() {
1424        let mut ks = Keyspace::new();
1425        ks.set("yes".into(), Bytes::from("here"), None);
1426
1427        let resp = test_dispatch(&mut ks, &ShardRequest::Exists { key: "yes".into() });
1428        assert!(matches!(resp, ShardResponse::Bool(true)));
1429
1430        let resp = test_dispatch(&mut ks, &ShardRequest::Exists { key: "no".into() });
1431        assert!(matches!(resp, ShardResponse::Bool(false)));
1432    }
1433
1434    #[test]
1435    fn dispatch_expire_and_ttl() {
1436        let mut ks = Keyspace::new();
1437        ks.set("key".into(), Bytes::from("val"), None);
1438
1439        let resp = test_dispatch(
1440            &mut ks,
1441            &ShardRequest::Expire {
1442                key: "key".into(),
1443                seconds: 60,
1444            },
1445        );
1446        assert!(matches!(resp, ShardResponse::Bool(true)));
1447
1448        let resp = test_dispatch(&mut ks, &ShardRequest::Ttl { key: "key".into() });
1449        match resp {
1450            ShardResponse::Ttl(TtlResult::Seconds(s)) => assert!((58..=60).contains(&s)),
1451            other => panic!("expected Ttl(Seconds), got {other:?}"),
1452        }
1453    }
1454
1455    #[test]
1456    fn dispatch_ttl_missing() {
1457        let mut ks = Keyspace::new();
1458        let resp = test_dispatch(&mut ks, &ShardRequest::Ttl { key: "gone".into() });
1459        assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
1460    }
1461
1462    #[tokio::test]
1463    async fn shard_round_trip() {
1464        let handle = spawn_shard(
1465            16,
1466            ShardConfig::default(),
1467            None,
1468            None,
1469            #[cfg(feature = "protobuf")]
1470            None,
1471        );
1472
1473        let resp = handle
1474            .send(ShardRequest::Set {
1475                key: "hello".into(),
1476                value: Bytes::from("world"),
1477                expire: None,
1478                nx: false,
1479                xx: false,
1480            })
1481            .await
1482            .unwrap();
1483        assert!(matches!(resp, ShardResponse::Ok));
1484
1485        let resp = handle
1486            .send(ShardRequest::Get {
1487                key: "hello".into(),
1488            })
1489            .await
1490            .unwrap();
1491        match resp {
1492            ShardResponse::Value(Some(Value::String(data))) => {
1493                assert_eq!(data, Bytes::from("world"));
1494            }
1495            other => panic!("expected Value(Some(String)), got {other:?}"),
1496        }
1497    }
1498
1499    #[tokio::test]
1500    async fn expired_key_through_shard() {
1501        let handle = spawn_shard(
1502            16,
1503            ShardConfig::default(),
1504            None,
1505            None,
1506            #[cfg(feature = "protobuf")]
1507            None,
1508        );
1509
1510        handle
1511            .send(ShardRequest::Set {
1512                key: "temp".into(),
1513                value: Bytes::from("gone"),
1514                expire: Some(Duration::from_millis(10)),
1515                nx: false,
1516                xx: false,
1517            })
1518            .await
1519            .unwrap();
1520
1521        tokio::time::sleep(Duration::from_millis(30)).await;
1522
1523        let resp = handle
1524            .send(ShardRequest::Get { key: "temp".into() })
1525            .await
1526            .unwrap();
1527        assert!(matches!(resp, ShardResponse::Value(None)));
1528    }
1529
1530    #[tokio::test]
1531    async fn active_expiration_cleans_up_without_access() {
1532        let handle = spawn_shard(
1533            16,
1534            ShardConfig::default(),
1535            None,
1536            None,
1537            #[cfg(feature = "protobuf")]
1538            None,
1539        );
1540
1541        // set a key with a short TTL
1542        handle
1543            .send(ShardRequest::Set {
1544                key: "ephemeral".into(),
1545                value: Bytes::from("temp"),
1546                expire: Some(Duration::from_millis(10)),
1547                nx: false,
1548                xx: false,
1549            })
1550            .await
1551            .unwrap();
1552
1553        // also set a persistent key
1554        handle
1555            .send(ShardRequest::Set {
1556                key: "persistent".into(),
1557                value: Bytes::from("stays"),
1558                expire: None,
1559                nx: false,
1560                xx: false,
1561            })
1562            .await
1563            .unwrap();
1564
1565        // wait long enough for the TTL to expire AND for the background
1566        // tick to fire (100ms interval + some slack)
1567        tokio::time::sleep(Duration::from_millis(250)).await;
1568
1569        // the ephemeral key should be gone even though we never accessed it
1570        let resp = handle
1571            .send(ShardRequest::Exists {
1572                key: "ephemeral".into(),
1573            })
1574            .await
1575            .unwrap();
1576        assert!(matches!(resp, ShardResponse::Bool(false)));
1577
1578        // the persistent key should still be there
1579        let resp = handle
1580            .send(ShardRequest::Exists {
1581                key: "persistent".into(),
1582            })
1583            .await
1584            .unwrap();
1585        assert!(matches!(resp, ShardResponse::Bool(true)));
1586    }
1587
1588    #[tokio::test]
1589    async fn shard_with_persistence_snapshot_and_recovery() {
1590        let dir = tempfile::tempdir().unwrap();
1591        let pcfg = ShardPersistenceConfig {
1592            data_dir: dir.path().to_owned(),
1593            append_only: true,
1594            fsync_policy: FsyncPolicy::Always,
1595            #[cfg(feature = "encryption")]
1596            encryption_key: None,
1597        };
1598        let config = ShardConfig {
1599            shard_id: 0,
1600            ..ShardConfig::default()
1601        };
1602
1603        // write some keys then trigger a snapshot
1604        {
1605            let handle = spawn_shard(
1606                16,
1607                config.clone(),
1608                Some(pcfg.clone()),
1609                None,
1610                #[cfg(feature = "protobuf")]
1611                None,
1612            );
1613            handle
1614                .send(ShardRequest::Set {
1615                    key: "a".into(),
1616                    value: Bytes::from("1"),
1617                    expire: None,
1618                    nx: false,
1619                    xx: false,
1620                })
1621                .await
1622                .unwrap();
1623            handle
1624                .send(ShardRequest::Set {
1625                    key: "b".into(),
1626                    value: Bytes::from("2"),
1627                    expire: Some(Duration::from_secs(300)),
1628                    nx: false,
1629                    xx: false,
1630                })
1631                .await
1632                .unwrap();
1633            handle.send(ShardRequest::Snapshot).await.unwrap();
1634            // write one more key that goes only to AOF
1635            handle
1636                .send(ShardRequest::Set {
1637                    key: "c".into(),
1638                    value: Bytes::from("3"),
1639                    expire: None,
1640                    nx: false,
1641                    xx: false,
1642                })
1643                .await
1644                .unwrap();
1645            // drop handle to shut down shard
1646        }
1647
1648        // give it a moment to flush
1649        tokio::time::sleep(Duration::from_millis(50)).await;
1650
1651        // start a new shard with the same config — should recover
1652        {
1653            let handle = spawn_shard(
1654                16,
1655                config,
1656                Some(pcfg),
1657                None,
1658                #[cfg(feature = "protobuf")]
1659                None,
1660            );
1661            // give it a moment to recover
1662            tokio::time::sleep(Duration::from_millis(50)).await;
1663
1664            let resp = handle
1665                .send(ShardRequest::Get { key: "a".into() })
1666                .await
1667                .unwrap();
1668            match resp {
1669                ShardResponse::Value(Some(Value::String(data))) => {
1670                    assert_eq!(data, Bytes::from("1"));
1671                }
1672                other => panic!("expected a=1, got {other:?}"),
1673            }
1674
1675            let resp = handle
1676                .send(ShardRequest::Get { key: "b".into() })
1677                .await
1678                .unwrap();
1679            assert!(matches!(resp, ShardResponse::Value(Some(_))));
1680
1681            let resp = handle
1682                .send(ShardRequest::Get { key: "c".into() })
1683                .await
1684                .unwrap();
1685            match resp {
1686                ShardResponse::Value(Some(Value::String(data))) => {
1687                    assert_eq!(data, Bytes::from("3"));
1688                }
1689                other => panic!("expected c=3, got {other:?}"),
1690            }
1691        }
1692    }
1693
1694    #[test]
1695    fn to_aof_record_for_set() {
1696        let req = ShardRequest::Set {
1697            key: "k".into(),
1698            value: Bytes::from("v"),
1699            expire: Some(Duration::from_secs(60)),
1700            nx: false,
1701            xx: false,
1702        };
1703        let resp = ShardResponse::Ok;
1704        let record = to_aof_record(&req, &resp).unwrap();
1705        match record {
1706            AofRecord::Set { key, expire_ms, .. } => {
1707                assert_eq!(key, "k");
1708                assert_eq!(expire_ms, 60_000);
1709            }
1710            other => panic!("expected Set, got {other:?}"),
1711        }
1712    }
1713
1714    #[test]
1715    fn to_aof_record_skips_failed_set() {
1716        let req = ShardRequest::Set {
1717            key: "k".into(),
1718            value: Bytes::from("v"),
1719            expire: None,
1720            nx: false,
1721            xx: false,
1722        };
1723        let resp = ShardResponse::OutOfMemory;
1724        assert!(to_aof_record(&req, &resp).is_none());
1725    }
1726
1727    #[test]
1728    fn to_aof_record_for_del() {
1729        let req = ShardRequest::Del { key: "k".into() };
1730        let resp = ShardResponse::Bool(true);
1731        let record = to_aof_record(&req, &resp).unwrap();
1732        assert!(matches!(record, AofRecord::Del { .. }));
1733    }
1734
1735    #[test]
1736    fn to_aof_record_skips_failed_del() {
1737        let req = ShardRequest::Del { key: "k".into() };
1738        let resp = ShardResponse::Bool(false);
1739        assert!(to_aof_record(&req, &resp).is_none());
1740    }
1741
1742    #[test]
1743    fn dispatch_incr_new_key() {
1744        let mut ks = Keyspace::new();
1745        let resp = test_dispatch(&mut ks, &ShardRequest::Incr { key: "c".into() });
1746        assert!(matches!(resp, ShardResponse::Integer(1)));
1747    }
1748
1749    #[test]
1750    fn dispatch_decr_existing() {
1751        let mut ks = Keyspace::new();
1752        ks.set("n".into(), Bytes::from("10"), None);
1753        let resp = test_dispatch(&mut ks, &ShardRequest::Decr { key: "n".into() });
1754        assert!(matches!(resp, ShardResponse::Integer(9)));
1755    }
1756
1757    #[test]
1758    fn dispatch_incr_non_integer() {
1759        let mut ks = Keyspace::new();
1760        ks.set("s".into(), Bytes::from("hello"), None);
1761        let resp = test_dispatch(&mut ks, &ShardRequest::Incr { key: "s".into() });
1762        assert!(matches!(resp, ShardResponse::Err(_)));
1763    }
1764
1765    #[test]
1766    fn dispatch_incrby() {
1767        let mut ks = Keyspace::new();
1768        ks.set("n".into(), Bytes::from("10"), None);
1769        let resp = test_dispatch(
1770            &mut ks,
1771            &ShardRequest::IncrBy {
1772                key: "n".into(),
1773                delta: 5,
1774            },
1775        );
1776        assert!(matches!(resp, ShardResponse::Integer(15)));
1777    }
1778
1779    #[test]
1780    fn dispatch_decrby() {
1781        let mut ks = Keyspace::new();
1782        ks.set("n".into(), Bytes::from("10"), None);
1783        let resp = test_dispatch(
1784            &mut ks,
1785            &ShardRequest::DecrBy {
1786                key: "n".into(),
1787                delta: 3,
1788            },
1789        );
1790        assert!(matches!(resp, ShardResponse::Integer(7)));
1791    }
1792
1793    #[test]
1794    fn dispatch_incrby_new_key() {
1795        let mut ks = Keyspace::new();
1796        let resp = test_dispatch(
1797            &mut ks,
1798            &ShardRequest::IncrBy {
1799                key: "new".into(),
1800                delta: 42,
1801            },
1802        );
1803        assert!(matches!(resp, ShardResponse::Integer(42)));
1804    }
1805
1806    #[test]
1807    fn dispatch_incrbyfloat() {
1808        let mut ks = Keyspace::new();
1809        ks.set("n".into(), Bytes::from("10.5"), None);
1810        let resp = test_dispatch(
1811            &mut ks,
1812            &ShardRequest::IncrByFloat {
1813                key: "n".into(),
1814                delta: 2.3,
1815            },
1816        );
1817        match resp {
1818            ShardResponse::BulkString(val) => {
1819                let f: f64 = val.parse().unwrap();
1820                assert!((f - 12.8).abs() < 0.001);
1821            }
1822            other => panic!("expected BulkString, got {other:?}"),
1823        }
1824    }
1825
1826    #[test]
1827    fn dispatch_append() {
1828        let mut ks = Keyspace::new();
1829        ks.set("k".into(), Bytes::from("hello"), None);
1830        let resp = test_dispatch(
1831            &mut ks,
1832            &ShardRequest::Append {
1833                key: "k".into(),
1834                value: Bytes::from(" world"),
1835            },
1836        );
1837        assert!(matches!(resp, ShardResponse::Len(11)));
1838    }
1839
1840    #[test]
1841    fn dispatch_strlen() {
1842        let mut ks = Keyspace::new();
1843        ks.set("k".into(), Bytes::from("hello"), None);
1844        let resp = test_dispatch(&mut ks, &ShardRequest::Strlen { key: "k".into() });
1845        assert!(matches!(resp, ShardResponse::Len(5)));
1846    }
1847
1848    #[test]
1849    fn dispatch_strlen_missing() {
1850        let mut ks = Keyspace::new();
1851        let resp = test_dispatch(&mut ks, &ShardRequest::Strlen { key: "nope".into() });
1852        assert!(matches!(resp, ShardResponse::Len(0)));
1853    }
1854
1855    #[test]
1856    fn to_aof_record_for_append() {
1857        let req = ShardRequest::Append {
1858            key: "k".into(),
1859            value: Bytes::from("data"),
1860        };
1861        let resp = ShardResponse::Len(10);
1862        let record = to_aof_record(&req, &resp).unwrap();
1863        match record {
1864            AofRecord::Append { key, value } => {
1865                assert_eq!(key, "k");
1866                assert_eq!(value, Bytes::from("data"));
1867            }
1868            other => panic!("expected Append, got {other:?}"),
1869        }
1870    }
1871
1872    #[test]
1873    fn dispatch_incrbyfloat_new_key() {
1874        let mut ks = Keyspace::new();
1875        let resp = test_dispatch(
1876            &mut ks,
1877            &ShardRequest::IncrByFloat {
1878                key: "new".into(),
1879                delta: 2.72,
1880            },
1881        );
1882        match resp {
1883            ShardResponse::BulkString(val) => {
1884                let f: f64 = val.parse().unwrap();
1885                assert!((f - 2.72).abs() < 0.001);
1886            }
1887            other => panic!("expected BulkString, got {other:?}"),
1888        }
1889    }
1890
1891    #[test]
1892    fn to_aof_record_for_incr() {
1893        let req = ShardRequest::Incr { key: "c".into() };
1894        let resp = ShardResponse::Integer(1);
1895        let record = to_aof_record(&req, &resp).unwrap();
1896        assert!(matches!(record, AofRecord::Incr { .. }));
1897    }
1898
1899    #[test]
1900    fn to_aof_record_for_decr() {
1901        let req = ShardRequest::Decr { key: "c".into() };
1902        let resp = ShardResponse::Integer(-1);
1903        let record = to_aof_record(&req, &resp).unwrap();
1904        assert!(matches!(record, AofRecord::Decr { .. }));
1905    }
1906
1907    #[test]
1908    fn to_aof_record_for_incrby() {
1909        let req = ShardRequest::IncrBy {
1910            key: "c".into(),
1911            delta: 5,
1912        };
1913        let resp = ShardResponse::Integer(15);
1914        let record = to_aof_record(&req, &resp).unwrap();
1915        match record {
1916            AofRecord::IncrBy { key, delta } => {
1917                assert_eq!(key, "c");
1918                assert_eq!(delta, 5);
1919            }
1920            other => panic!("expected IncrBy, got {other:?}"),
1921        }
1922    }
1923
1924    #[test]
1925    fn to_aof_record_for_decrby() {
1926        let req = ShardRequest::DecrBy {
1927            key: "c".into(),
1928            delta: 3,
1929        };
1930        let resp = ShardResponse::Integer(7);
1931        let record = to_aof_record(&req, &resp).unwrap();
1932        match record {
1933            AofRecord::DecrBy { key, delta } => {
1934                assert_eq!(key, "c");
1935                assert_eq!(delta, 3);
1936            }
1937            other => panic!("expected DecrBy, got {other:?}"),
1938        }
1939    }
1940
1941    #[test]
1942    fn dispatch_persist_removes_ttl() {
1943        let mut ks = Keyspace::new();
1944        ks.set(
1945            "key".into(),
1946            Bytes::from("val"),
1947            Some(Duration::from_secs(60)),
1948        );
1949
1950        let resp = test_dispatch(&mut ks, &ShardRequest::Persist { key: "key".into() });
1951        assert!(matches!(resp, ShardResponse::Bool(true)));
1952
1953        let resp = test_dispatch(&mut ks, &ShardRequest::Ttl { key: "key".into() });
1954        assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NoExpiry)));
1955    }
1956
1957    #[test]
1958    fn dispatch_persist_missing_key() {
1959        let mut ks = Keyspace::new();
1960        let resp = test_dispatch(&mut ks, &ShardRequest::Persist { key: "nope".into() });
1961        assert!(matches!(resp, ShardResponse::Bool(false)));
1962    }
1963
1964    #[test]
1965    fn dispatch_pttl() {
1966        let mut ks = Keyspace::new();
1967        ks.set(
1968            "key".into(),
1969            Bytes::from("val"),
1970            Some(Duration::from_secs(60)),
1971        );
1972
1973        let resp = test_dispatch(&mut ks, &ShardRequest::Pttl { key: "key".into() });
1974        match resp {
1975            ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
1976                assert!(ms > 59_000 && ms <= 60_000);
1977            }
1978            other => panic!("expected Ttl(Milliseconds), got {other:?}"),
1979        }
1980    }
1981
1982    #[test]
1983    fn dispatch_pttl_missing() {
1984        let mut ks = Keyspace::new();
1985        let resp = test_dispatch(&mut ks, &ShardRequest::Pttl { key: "nope".into() });
1986        assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
1987    }
1988
1989    #[test]
1990    fn dispatch_pexpire() {
1991        let mut ks = Keyspace::new();
1992        ks.set("key".into(), Bytes::from("val"), None);
1993
1994        let resp = test_dispatch(
1995            &mut ks,
1996            &ShardRequest::Pexpire {
1997                key: "key".into(),
1998                milliseconds: 5000,
1999            },
2000        );
2001        assert!(matches!(resp, ShardResponse::Bool(true)));
2002
2003        let resp = test_dispatch(&mut ks, &ShardRequest::Pttl { key: "key".into() });
2004        match resp {
2005            ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
2006                assert!(ms > 4000 && ms <= 5000);
2007            }
2008            other => panic!("expected Ttl(Milliseconds), got {other:?}"),
2009        }
2010    }
2011
2012    #[test]
2013    fn to_aof_record_for_persist() {
2014        let req = ShardRequest::Persist { key: "k".into() };
2015        let resp = ShardResponse::Bool(true);
2016        let record = to_aof_record(&req, &resp).unwrap();
2017        assert!(matches!(record, AofRecord::Persist { .. }));
2018    }
2019
2020    #[test]
2021    fn to_aof_record_skips_failed_persist() {
2022        let req = ShardRequest::Persist { key: "k".into() };
2023        let resp = ShardResponse::Bool(false);
2024        assert!(to_aof_record(&req, &resp).is_none());
2025    }
2026
2027    #[test]
2028    fn to_aof_record_for_pexpire() {
2029        let req = ShardRequest::Pexpire {
2030            key: "k".into(),
2031            milliseconds: 5000,
2032        };
2033        let resp = ShardResponse::Bool(true);
2034        let record = to_aof_record(&req, &resp).unwrap();
2035        match record {
2036            AofRecord::Pexpire { key, milliseconds } => {
2037                assert_eq!(key, "k");
2038                assert_eq!(milliseconds, 5000);
2039            }
2040            other => panic!("expected Pexpire, got {other:?}"),
2041        }
2042    }
2043
2044    #[test]
2045    fn to_aof_record_skips_failed_pexpire() {
2046        let req = ShardRequest::Pexpire {
2047            key: "k".into(),
2048            milliseconds: 5000,
2049        };
2050        let resp = ShardResponse::Bool(false);
2051        assert!(to_aof_record(&req, &resp).is_none());
2052    }
2053
2054    #[test]
2055    fn dispatch_set_nx_when_key_missing() {
2056        let mut ks = Keyspace::new();
2057        let resp = test_dispatch(
2058            &mut ks,
2059            &ShardRequest::Set {
2060                key: "k".into(),
2061                value: Bytes::from("v"),
2062                expire: None,
2063                nx: true,
2064                xx: false,
2065            },
2066        );
2067        assert!(matches!(resp, ShardResponse::Ok));
2068        assert!(ks.exists("k"));
2069    }
2070
2071    #[test]
2072    fn dispatch_set_nx_when_key_exists() {
2073        let mut ks = Keyspace::new();
2074        ks.set("k".into(), Bytes::from("old"), None);
2075
2076        let resp = test_dispatch(
2077            &mut ks,
2078            &ShardRequest::Set {
2079                key: "k".into(),
2080                value: Bytes::from("new"),
2081                expire: None,
2082                nx: true,
2083                xx: false,
2084            },
2085        );
2086        // NX should block — returns nil
2087        assert!(matches!(resp, ShardResponse::Value(None)));
2088        // original value should remain
2089        match ks.get("k").unwrap() {
2090            Some(Value::String(data)) => assert_eq!(data, Bytes::from("old")),
2091            other => panic!("expected old value, got {other:?}"),
2092        }
2093    }
2094
2095    #[test]
2096    fn dispatch_set_xx_when_key_exists() {
2097        let mut ks = Keyspace::new();
2098        ks.set("k".into(), Bytes::from("old"), None);
2099
2100        let resp = test_dispatch(
2101            &mut ks,
2102            &ShardRequest::Set {
2103                key: "k".into(),
2104                value: Bytes::from("new"),
2105                expire: None,
2106                nx: false,
2107                xx: true,
2108            },
2109        );
2110        assert!(matches!(resp, ShardResponse::Ok));
2111        match ks.get("k").unwrap() {
2112            Some(Value::String(data)) => assert_eq!(data, Bytes::from("new")),
2113            other => panic!("expected new value, got {other:?}"),
2114        }
2115    }
2116
2117    #[test]
2118    fn dispatch_set_xx_when_key_missing() {
2119        let mut ks = Keyspace::new();
2120        let resp = test_dispatch(
2121            &mut ks,
2122            &ShardRequest::Set {
2123                key: "k".into(),
2124                value: Bytes::from("v"),
2125                expire: None,
2126                nx: false,
2127                xx: true,
2128            },
2129        );
2130        // XX should block — returns nil
2131        assert!(matches!(resp, ShardResponse::Value(None)));
2132        assert!(!ks.exists("k"));
2133    }
2134
2135    #[test]
2136    fn to_aof_record_skips_nx_blocked_set() {
2137        let req = ShardRequest::Set {
2138            key: "k".into(),
2139            value: Bytes::from("v"),
2140            expire: None,
2141            nx: true,
2142            xx: false,
2143        };
2144        // when NX blocks, the shard returns Value(None), not Ok
2145        let resp = ShardResponse::Value(None);
2146        assert!(to_aof_record(&req, &resp).is_none());
2147    }
2148
2149    #[test]
2150    fn dispatch_flushdb_clears_all_keys() {
2151        let mut ks = Keyspace::new();
2152        ks.set("a".into(), Bytes::from("1"), None);
2153        ks.set("b".into(), Bytes::from("2"), None);
2154
2155        assert_eq!(ks.len(), 2);
2156
2157        let resp = test_dispatch(&mut ks, &ShardRequest::FlushDb);
2158        assert!(matches!(resp, ShardResponse::Ok));
2159        assert_eq!(ks.len(), 0);
2160    }
2161
2162    #[test]
2163    fn dispatch_scan_returns_keys() {
2164        let mut ks = Keyspace::new();
2165        ks.set("user:1".into(), Bytes::from("a"), None);
2166        ks.set("user:2".into(), Bytes::from("b"), None);
2167        ks.set("item:1".into(), Bytes::from("c"), None);
2168
2169        let resp = test_dispatch(
2170            &mut ks,
2171            &ShardRequest::Scan {
2172                cursor: 0,
2173                count: 10,
2174                pattern: None,
2175            },
2176        );
2177
2178        match resp {
2179            ShardResponse::Scan { cursor, keys } => {
2180                assert_eq!(cursor, 0); // complete in one pass
2181                assert_eq!(keys.len(), 3);
2182            }
2183            _ => panic!("expected Scan response"),
2184        }
2185    }
2186
2187    #[test]
2188    fn dispatch_scan_with_pattern() {
2189        let mut ks = Keyspace::new();
2190        ks.set("user:1".into(), Bytes::from("a"), None);
2191        ks.set("user:2".into(), Bytes::from("b"), None);
2192        ks.set("item:1".into(), Bytes::from("c"), None);
2193
2194        let resp = test_dispatch(
2195            &mut ks,
2196            &ShardRequest::Scan {
2197                cursor: 0,
2198                count: 10,
2199                pattern: Some("user:*".into()),
2200            },
2201        );
2202
2203        match resp {
2204            ShardResponse::Scan { cursor, keys } => {
2205                assert_eq!(cursor, 0);
2206                assert_eq!(keys.len(), 2);
2207                for k in &keys {
2208                    assert!(k.starts_with("user:"));
2209                }
2210            }
2211            _ => panic!("expected Scan response"),
2212        }
2213    }
2214
2215    #[test]
2216    fn to_aof_record_for_hset() {
2217        let req = ShardRequest::HSet {
2218            key: "h".into(),
2219            fields: vec![("f1".into(), Bytes::from("v1"))],
2220        };
2221        let resp = ShardResponse::Len(1);
2222        let record = to_aof_record(&req, &resp).unwrap();
2223        match record {
2224            AofRecord::HSet { key, fields } => {
2225                assert_eq!(key, "h");
2226                assert_eq!(fields.len(), 1);
2227            }
2228            _ => panic!("expected HSet record"),
2229        }
2230    }
2231
2232    #[test]
2233    fn to_aof_record_for_hdel() {
2234        let req = ShardRequest::HDel {
2235            key: "h".into(),
2236            fields: vec!["f1".into(), "f2".into()],
2237        };
2238        let resp = ShardResponse::HDelLen {
2239            count: 2,
2240            removed: vec!["f1".into(), "f2".into()],
2241        };
2242        let record = to_aof_record(&req, &resp).unwrap();
2243        match record {
2244            AofRecord::HDel { key, fields } => {
2245                assert_eq!(key, "h");
2246                assert_eq!(fields.len(), 2);
2247            }
2248            _ => panic!("expected HDel record"),
2249        }
2250    }
2251
2252    #[test]
2253    fn to_aof_record_skips_hdel_when_none_removed() {
2254        let req = ShardRequest::HDel {
2255            key: "h".into(),
2256            fields: vec!["f1".into()],
2257        };
2258        let resp = ShardResponse::HDelLen {
2259            count: 0,
2260            removed: vec![],
2261        };
2262        assert!(to_aof_record(&req, &resp).is_none());
2263    }
2264
2265    #[test]
2266    fn to_aof_record_for_hincrby() {
2267        let req = ShardRequest::HIncrBy {
2268            key: "h".into(),
2269            field: "counter".into(),
2270            delta: 5,
2271        };
2272        let resp = ShardResponse::Integer(10);
2273        let record = to_aof_record(&req, &resp).unwrap();
2274        match record {
2275            AofRecord::HIncrBy { key, field, delta } => {
2276                assert_eq!(key, "h");
2277                assert_eq!(field, "counter");
2278                assert_eq!(delta, 5);
2279            }
2280            _ => panic!("expected HIncrBy record"),
2281        }
2282    }
2283
2284    #[test]
2285    fn to_aof_record_for_sadd() {
2286        let req = ShardRequest::SAdd {
2287            key: "s".into(),
2288            members: vec!["m1".into(), "m2".into()],
2289        };
2290        let resp = ShardResponse::Len(2);
2291        let record = to_aof_record(&req, &resp).unwrap();
2292        match record {
2293            AofRecord::SAdd { key, members } => {
2294                assert_eq!(key, "s");
2295                assert_eq!(members.len(), 2);
2296            }
2297            _ => panic!("expected SAdd record"),
2298        }
2299    }
2300
2301    #[test]
2302    fn to_aof_record_skips_sadd_when_none_added() {
2303        let req = ShardRequest::SAdd {
2304            key: "s".into(),
2305            members: vec!["m1".into()],
2306        };
2307        let resp = ShardResponse::Len(0);
2308        assert!(to_aof_record(&req, &resp).is_none());
2309    }
2310
2311    #[test]
2312    fn to_aof_record_for_srem() {
2313        let req = ShardRequest::SRem {
2314            key: "s".into(),
2315            members: vec!["m1".into()],
2316        };
2317        let resp = ShardResponse::Len(1);
2318        let record = to_aof_record(&req, &resp).unwrap();
2319        match record {
2320            AofRecord::SRem { key, members } => {
2321                assert_eq!(key, "s");
2322                assert_eq!(members.len(), 1);
2323            }
2324            _ => panic!("expected SRem record"),
2325        }
2326    }
2327
2328    #[test]
2329    fn to_aof_record_skips_srem_when_none_removed() {
2330        let req = ShardRequest::SRem {
2331            key: "s".into(),
2332            members: vec!["m1".into()],
2333        };
2334        let resp = ShardResponse::Len(0);
2335        assert!(to_aof_record(&req, &resp).is_none());
2336    }
2337
2338    #[test]
2339    fn dispatch_keys() {
2340        let mut ks = Keyspace::new();
2341        ks.set("user:1".into(), Bytes::from("a"), None);
2342        ks.set("user:2".into(), Bytes::from("b"), None);
2343        ks.set("item:1".into(), Bytes::from("c"), None);
2344        let resp = test_dispatch(
2345            &mut ks,
2346            &ShardRequest::Keys {
2347                pattern: "user:*".into(),
2348            },
2349        );
2350        match resp {
2351            ShardResponse::StringArray(mut keys) => {
2352                keys.sort();
2353                assert_eq!(keys, vec!["user:1", "user:2"]);
2354            }
2355            other => panic!("expected StringArray, got {other:?}"),
2356        }
2357    }
2358
2359    #[test]
2360    fn dispatch_rename() {
2361        let mut ks = Keyspace::new();
2362        ks.set("old".into(), Bytes::from("value"), None);
2363        let resp = test_dispatch(
2364            &mut ks,
2365            &ShardRequest::Rename {
2366                key: "old".into(),
2367                newkey: "new".into(),
2368            },
2369        );
2370        assert!(matches!(resp, ShardResponse::Ok));
2371        assert!(!ks.exists("old"));
2372        assert!(ks.exists("new"));
2373    }
2374
2375    #[test]
2376    fn dispatch_rename_missing_key() {
2377        let mut ks = Keyspace::new();
2378        let resp = test_dispatch(
2379            &mut ks,
2380            &ShardRequest::Rename {
2381                key: "missing".into(),
2382                newkey: "new".into(),
2383            },
2384        );
2385        assert!(matches!(resp, ShardResponse::Err(_)));
2386    }
2387
2388    #[test]
2389    fn to_aof_record_for_rename() {
2390        let req = ShardRequest::Rename {
2391            key: "old".into(),
2392            newkey: "new".into(),
2393        };
2394        let resp = ShardResponse::Ok;
2395        let record = to_aof_record(&req, &resp).unwrap();
2396        match record {
2397            AofRecord::Rename { key, newkey } => {
2398                assert_eq!(key, "old");
2399                assert_eq!(newkey, "new");
2400            }
2401            other => panic!("expected Rename, got {other:?}"),
2402        }
2403    }
2404}