Skip to main content

ember_core/
shard.rs

1//! Shard: an independent partition of the keyspace.
2//!
3//! Each shard runs as its own tokio task, owning a `Keyspace` with no
4//! internal locking. Commands arrive over an mpsc channel and responses
5//! go back on a per-request oneshot. A background tick drives active
6//! expiration of TTL'd keys.
7
8use std::path::PathBuf;
9use std::time::Duration;
10
11use bytes::Bytes;
12use ember_persistence::aof::{AofRecord, AofWriter, FsyncPolicy};
13use ember_persistence::recovery::{self, RecoveredValue};
14use ember_persistence::snapshot::{self, SnapEntry, SnapValue, SnapshotWriter};
15use tokio::sync::{mpsc, oneshot};
16use tracing::{info, warn};
17
18use crate::dropper::DropHandle;
19use crate::error::ShardError;
20use crate::expiry;
21use crate::keyspace::{
22    IncrError, IncrFloatError, Keyspace, KeyspaceStats, SetResult, ShardConfig, TtlResult,
23    WriteError,
24};
25use crate::types::sorted_set::ZAddFlags;
26use crate::types::Value;
27
/// Period of the shard's active-expiration tick.
///
/// 100ms corresponds to Redis's default `hz=10` and keeps the CPU cost
/// of the background cycle negligible.
const EXPIRY_TICK: Duration = Duration::from_millis(100);
31
/// Period of the AOF fsync tick; only acted on under the `EverySec` policy.
const FSYNC_INTERVAL: Duration = Duration::from_secs(1);
34
35/// Optional persistence configuration for a shard.
36#[derive(Debug, Clone)]
37pub struct ShardPersistenceConfig {
38    /// Directory where AOF and snapshot files live.
39    pub data_dir: PathBuf,
40    /// Whether to write an AOF log of mutations.
41    pub append_only: bool,
42    /// When to fsync the AOF file.
43    pub fsync_policy: FsyncPolicy,
44    /// Optional encryption key for encrypting data at rest.
45    /// When set, AOF and snapshot files use the v3 encrypted format.
46    #[cfg(feature = "encryption")]
47    pub encryption_key: Option<ember_persistence::encryption::EncryptionKey>,
48}
49
50/// A protocol-agnostic command sent to a shard.
51#[derive(Debug)]
52pub enum ShardRequest {
53    Get {
54        key: String,
55    },
56    Set {
57        key: String,
58        value: Bytes,
59        expire: Option<Duration>,
60        /// Only set the key if it does not already exist.
61        nx: bool,
62        /// Only set the key if it already exists.
63        xx: bool,
64    },
65    Incr {
66        key: String,
67    },
68    Decr {
69        key: String,
70    },
71    IncrBy {
72        key: String,
73        delta: i64,
74    },
75    DecrBy {
76        key: String,
77        delta: i64,
78    },
79    IncrByFloat {
80        key: String,
81        delta: f64,
82    },
83    Append {
84        key: String,
85        value: Bytes,
86    },
87    Strlen {
88        key: String,
89    },
90    /// Returns all keys matching a glob pattern in this shard.
91    Keys {
92        pattern: String,
93    },
94    /// Renames a key within this shard.
95    Rename {
96        key: String,
97        newkey: String,
98    },
99    Del {
100        key: String,
101    },
102    /// Like DEL but defers value deallocation to the background drop thread.
103    Unlink {
104        key: String,
105    },
106    Exists {
107        key: String,
108    },
109    Expire {
110        key: String,
111        seconds: u64,
112    },
113    Ttl {
114        key: String,
115    },
116    Persist {
117        key: String,
118    },
119    Pttl {
120        key: String,
121    },
122    Pexpire {
123        key: String,
124        milliseconds: u64,
125    },
126    LPush {
127        key: String,
128        values: Vec<Bytes>,
129    },
130    RPush {
131        key: String,
132        values: Vec<Bytes>,
133    },
134    LPop {
135        key: String,
136    },
137    RPop {
138        key: String,
139    },
140    LRange {
141        key: String,
142        start: i64,
143        stop: i64,
144    },
145    LLen {
146        key: String,
147    },
148    Type {
149        key: String,
150    },
151    ZAdd {
152        key: String,
153        members: Vec<(f64, String)>,
154        nx: bool,
155        xx: bool,
156        gt: bool,
157        lt: bool,
158        ch: bool,
159    },
160    ZRem {
161        key: String,
162        members: Vec<String>,
163    },
164    ZScore {
165        key: String,
166        member: String,
167    },
168    ZRank {
169        key: String,
170        member: String,
171    },
172    ZCard {
173        key: String,
174    },
175    ZRange {
176        key: String,
177        start: i64,
178        stop: i64,
179        with_scores: bool,
180    },
181    HSet {
182        key: String,
183        fields: Vec<(String, Bytes)>,
184    },
185    HGet {
186        key: String,
187        field: String,
188    },
189    HGetAll {
190        key: String,
191    },
192    HDel {
193        key: String,
194        fields: Vec<String>,
195    },
196    HExists {
197        key: String,
198        field: String,
199    },
200    HLen {
201        key: String,
202    },
203    HIncrBy {
204        key: String,
205        field: String,
206        delta: i64,
207    },
208    HKeys {
209        key: String,
210    },
211    HVals {
212        key: String,
213    },
214    HMGet {
215        key: String,
216        fields: Vec<String>,
217    },
218    SAdd {
219        key: String,
220        members: Vec<String>,
221    },
222    SRem {
223        key: String,
224        members: Vec<String>,
225    },
226    SMembers {
227        key: String,
228    },
229    SIsMember {
230        key: String,
231        member: String,
232    },
233    SCard {
234        key: String,
235    },
236    /// Returns the key count for this shard.
237    DbSize,
238    /// Returns keyspace stats for this shard.
239    Stats,
240    /// Triggers a snapshot write.
241    Snapshot,
242    /// Triggers an AOF rewrite (snapshot + truncate AOF).
243    RewriteAof,
244    /// Clears all keys from the keyspace.
245    FlushDb,
246    /// Clears all keys, deferring deallocation to the background drop thread.
247    FlushDbAsync,
248    /// Scans keys in the keyspace.
249    Scan {
250        cursor: u64,
251        count: usize,
252        pattern: Option<String>,
253    },
254    /// Counts keys in this shard that hash to the given cluster slot.
255    CountKeysInSlot {
256        slot: u16,
257    },
258    /// Returns up to `count` keys that hash to the given cluster slot.
259    GetKeysInSlot {
260        slot: u16,
261        count: usize,
262    },
263}
264
265/// The shard's response to a request.
266#[derive(Debug)]
267pub enum ShardResponse {
268    /// A value (or None for a cache miss).
269    Value(Option<Value>),
270    /// Simple acknowledgement (e.g. SET).
271    Ok,
272    /// Integer result (e.g. INCR, DECR).
273    Integer(i64),
274    /// Boolean result (e.g. DEL, EXISTS, EXPIRE).
275    Bool(bool),
276    /// TTL query result.
277    Ttl(TtlResult),
278    /// Memory limit reached and eviction policy is NoEviction.
279    OutOfMemory,
280    /// Key count for a shard (DBSIZE).
281    KeyCount(usize),
282    /// Full stats for a shard (INFO).
283    Stats(KeyspaceStats),
284    /// Integer length result (e.g. LPUSH, RPUSH, LLEN).
285    Len(usize),
286    /// Array of bulk values (e.g. LRANGE).
287    Array(Vec<Bytes>),
288    /// The type name of a stored value.
289    TypeName(&'static str),
290    /// ZADD result: count for the client + actually applied members for AOF.
291    ZAddLen {
292        count: usize,
293        applied: Vec<(f64, String)>,
294    },
295    /// ZREM result: count for the client + actually removed members for AOF.
296    ZRemLen { count: usize, removed: Vec<String> },
297    /// Float score result (e.g. ZSCORE).
298    Score(Option<f64>),
299    /// Rank result (e.g. ZRANK).
300    Rank(Option<usize>),
301    /// Scored array of (member, score) pairs (e.g. ZRANGE).
302    ScoredArray(Vec<(String, f64)>),
303    /// A bulk string result (e.g. INCRBYFLOAT).
304    BulkString(String),
305    /// Command used against a key holding the wrong kind of value.
306    WrongType,
307    /// An error message.
308    Err(String),
309    /// Scan result: next cursor and list of keys.
310    Scan { cursor: u64, keys: Vec<String> },
311    /// HGETALL result: all field-value pairs.
312    HashFields(Vec<(String, Bytes)>),
313    /// HDEL result: removed count + field names for AOF.
314    HDelLen { count: usize, removed: Vec<String> },
315    /// Array of strings (e.g. HKEYS).
316    StringArray(Vec<String>),
317    /// HMGET result: array of optional values.
318    OptionalArray(Vec<Option<Bytes>>),
319}
320
321/// A request bundled with its reply channel.
322#[derive(Debug)]
323pub struct ShardMessage {
324    pub request: ShardRequest,
325    pub reply: oneshot::Sender<ShardResponse>,
326}
327
328/// A cloneable handle for sending commands to a shard task.
329///
330/// Wraps the mpsc sender so callers don't need to manage oneshot
331/// channels directly.
332#[derive(Debug, Clone)]
333pub struct ShardHandle {
334    tx: mpsc::Sender<ShardMessage>,
335}
336
337impl ShardHandle {
338    /// Sends a request and waits for the response.
339    ///
340    /// Returns `ShardError::Unavailable` if the shard task has stopped.
341    pub async fn send(&self, request: ShardRequest) -> Result<ShardResponse, ShardError> {
342        let rx = self.dispatch(request).await?;
343        rx.await.map_err(|_| ShardError::Unavailable)
344    }
345
346    /// Sends a request and returns the reply channel without waiting
347    /// for the response. Used by `Engine::broadcast` to fan out to
348    /// all shards before collecting results.
349    pub(crate) async fn dispatch(
350        &self,
351        request: ShardRequest,
352    ) -> Result<oneshot::Receiver<ShardResponse>, ShardError> {
353        let (reply_tx, reply_rx) = oneshot::channel();
354        let msg = ShardMessage {
355            request,
356            reply: reply_tx,
357        };
358        self.tx
359            .send(msg)
360            .await
361            .map_err(|_| ShardError::Unavailable)?;
362        Ok(reply_rx)
363    }
364}
365
366/// Spawns a shard task and returns the handle for communicating with it.
367///
368/// `buffer` controls the mpsc channel capacity — higher values absorb
369/// burst traffic at the cost of memory. When `drop_handle` is provided,
370/// large value deallocations are deferred to the background drop thread.
371pub fn spawn_shard(
372    buffer: usize,
373    config: ShardConfig,
374    persistence: Option<ShardPersistenceConfig>,
375    drop_handle: Option<DropHandle>,
376) -> ShardHandle {
377    let (tx, rx) = mpsc::channel(buffer);
378    tokio::spawn(run_shard(rx, config, persistence, drop_handle));
379    ShardHandle { tx }
380}
381
382/// The shard's main loop. Processes messages and runs periodic
383/// active expiration until the channel closes.
384async fn run_shard(
385    mut rx: mpsc::Receiver<ShardMessage>,
386    config: ShardConfig,
387    persistence: Option<ShardPersistenceConfig>,
388    drop_handle: Option<DropHandle>,
389) {
390    let shard_id = config.shard_id;
391    let mut keyspace = Keyspace::with_config(config);
392
393    if let Some(handle) = drop_handle.clone() {
394        keyspace.set_drop_handle(handle);
395    }
396
397    // -- recovery --
398    if let Some(ref pcfg) = persistence {
399        #[cfg(feature = "encryption")]
400        let result = if let Some(ref key) = pcfg.encryption_key {
401            recovery::recover_shard_encrypted(&pcfg.data_dir, shard_id, key.clone())
402        } else {
403            recovery::recover_shard(&pcfg.data_dir, shard_id)
404        };
405        #[cfg(not(feature = "encryption"))]
406        let result = recovery::recover_shard(&pcfg.data_dir, shard_id);
407        let count = result.entries.len();
408        for entry in result.entries {
409            let value = match entry.value {
410                RecoveredValue::String(data) => Value::String(data),
411                RecoveredValue::List(deque) => Value::List(deque),
412                RecoveredValue::SortedSet(members) => {
413                    let mut ss = crate::types::sorted_set::SortedSet::new();
414                    for (score, member) in members {
415                        ss.add(member, score);
416                    }
417                    Value::SortedSet(ss)
418                }
419                RecoveredValue::Hash(map) => Value::Hash(map),
420                RecoveredValue::Set(set) => Value::Set(set),
421            };
422            keyspace.restore(entry.key, value, entry.ttl);
423        }
424        if count > 0 {
425            info!(
426                shard_id,
427                recovered_keys = count,
428                snapshot = result.loaded_snapshot,
429                aof = result.replayed_aof,
430                "recovered shard state"
431            );
432        }
433    }
434
435    // -- AOF writer --
436    let mut aof_writer: Option<AofWriter> = match &persistence {
437        Some(pcfg) if pcfg.append_only => {
438            let path = ember_persistence::aof::aof_path(&pcfg.data_dir, shard_id);
439            #[cfg(feature = "encryption")]
440            let result = if let Some(ref key) = pcfg.encryption_key {
441                AofWriter::open_encrypted(path, key.clone())
442            } else {
443                AofWriter::open(path)
444            };
445            #[cfg(not(feature = "encryption"))]
446            let result = AofWriter::open(path);
447            match result {
448                Ok(w) => Some(w),
449                Err(e) => {
450                    warn!(shard_id, "failed to open AOF writer: {e}");
451                    None
452                }
453            }
454        }
455        _ => None,
456    };
457
458    let fsync_policy = persistence
459        .as_ref()
460        .map(|p| p.fsync_policy)
461        .unwrap_or(FsyncPolicy::No);
462
463    // -- tickers --
464    let mut expiry_tick = tokio::time::interval(EXPIRY_TICK);
465    expiry_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
466
467    let mut fsync_tick = tokio::time::interval(FSYNC_INTERVAL);
468    fsync_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
469
470    loop {
471        tokio::select! {
472            msg = rx.recv() => {
473                match msg {
474                    Some(msg) => {
475                        let request_kind = describe_request(&msg.request);
476                        let response = dispatch(&mut keyspace, &msg.request);
477
478                        // write AOF record for successful mutations
479                        if let Some(ref mut writer) = aof_writer {
480                            if let Some(record) = to_aof_record(&msg.request, &response) {
481                                if let Err(e) = writer.write_record(&record) {
482                                    warn!(shard_id, "aof write failed: {e}");
483                                }
484                                if fsync_policy == FsyncPolicy::Always {
485                                    if let Err(e) = writer.sync() {
486                                        warn!(shard_id, "aof sync failed: {e}");
487                                    }
488                                }
489                            }
490                        }
491
492                        // handle snapshot/rewrite (these need mutable access
493                        // to both keyspace and aof_writer)
494                        match request_kind {
495                            RequestKind::Snapshot => {
496                                let resp = handle_snapshot(
497                                    &keyspace, &persistence, shard_id,
498                                );
499                                let _ = msg.reply.send(resp);
500                                continue;
501                            }
502                            RequestKind::RewriteAof => {
503                                let resp = handle_rewrite(
504                                    &keyspace,
505                                    &persistence,
506                                    &mut aof_writer,
507                                    shard_id,
508                                );
509                                let _ = msg.reply.send(resp);
510                                continue;
511                            }
512                            RequestKind::FlushDbAsync => {
513                                let old_entries = keyspace.flush_async();
514                                if let Some(ref handle) = drop_handle {
515                                    handle.defer_entries(old_entries);
516                                }
517                                // else: old_entries drops inline here
518                                let _ = msg.reply.send(ShardResponse::Ok);
519                                continue;
520                            }
521                            RequestKind::Other => {}
522                        }
523
524                        let _ = msg.reply.send(response);
525                    }
526                    None => break, // channel closed, shard shutting down
527                }
528            }
529            _ = expiry_tick.tick() => {
530                expiry::run_expiration_cycle(&mut keyspace);
531            }
532            _ = fsync_tick.tick(), if fsync_policy == FsyncPolicy::EverySec => {
533                if let Some(ref mut writer) = aof_writer {
534                    if let Err(e) = writer.sync() {
535                        warn!(shard_id, "periodic aof sync failed: {e}");
536                    }
537                }
538            }
539        }
540    }
541
542    // flush AOF on clean shutdown
543    if let Some(ref mut writer) = aof_writer {
544        let _ = writer.sync();
545    }
546}
547
/// Coarse classification of a request, computed before dispatch so the
/// main loop can pick out the commands it handles itself without
/// borrowing the request a second time.
enum RequestKind {
    /// `ShardRequest::Snapshot`.
    Snapshot,
    /// `ShardRequest::RewriteAof`.
    RewriteAof,
    /// `ShardRequest::FlushDbAsync`.
    FlushDbAsync,
    /// Every other request; fully handled by `dispatch`.
    Other,
}
556
557fn describe_request(req: &ShardRequest) -> RequestKind {
558    match req {
559        ShardRequest::Snapshot => RequestKind::Snapshot,
560        ShardRequest::RewriteAof => RequestKind::RewriteAof,
561        ShardRequest::FlushDbAsync => RequestKind::FlushDbAsync,
562        _ => RequestKind::Other,
563    }
564}
565
566/// Executes a single request against the keyspace.
567fn dispatch(ks: &mut Keyspace, req: &ShardRequest) -> ShardResponse {
568    match req {
569        ShardRequest::Get { key } => match ks.get(key) {
570            Ok(val) => ShardResponse::Value(val),
571            Err(_) => ShardResponse::WrongType,
572        },
573        ShardRequest::Set {
574            key,
575            value,
576            expire,
577            nx,
578            xx,
579        } => {
580            // NX: only set if key does NOT already exist
581            if *nx && ks.exists(key) {
582                return ShardResponse::Value(None);
583            }
584            // XX: only set if key DOES already exist
585            if *xx && !ks.exists(key) {
586                return ShardResponse::Value(None);
587            }
588            match ks.set(key.clone(), value.clone(), *expire) {
589                SetResult::Ok => ShardResponse::Ok,
590                SetResult::OutOfMemory => ShardResponse::OutOfMemory,
591            }
592        }
593        ShardRequest::Incr { key } => match ks.incr(key) {
594            Ok(val) => ShardResponse::Integer(val),
595            Err(IncrError::WrongType) => ShardResponse::WrongType,
596            Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
597            Err(e) => ShardResponse::Err(e.to_string()),
598        },
599        ShardRequest::Decr { key } => match ks.decr(key) {
600            Ok(val) => ShardResponse::Integer(val),
601            Err(IncrError::WrongType) => ShardResponse::WrongType,
602            Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
603            Err(e) => ShardResponse::Err(e.to_string()),
604        },
605        ShardRequest::IncrBy { key, delta } => match ks.incr_by(key, *delta) {
606            Ok(val) => ShardResponse::Integer(val),
607            Err(IncrError::WrongType) => ShardResponse::WrongType,
608            Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
609            Err(e) => ShardResponse::Err(e.to_string()),
610        },
611        ShardRequest::DecrBy { key, delta } => match ks.incr_by(key, -delta) {
612            Ok(val) => ShardResponse::Integer(val),
613            Err(IncrError::WrongType) => ShardResponse::WrongType,
614            Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
615            Err(e) => ShardResponse::Err(e.to_string()),
616        },
617        ShardRequest::IncrByFloat { key, delta } => match ks.incr_by_float(key, *delta) {
618            Ok(val) => ShardResponse::BulkString(val),
619            Err(IncrFloatError::WrongType) => ShardResponse::WrongType,
620            Err(IncrFloatError::OutOfMemory) => ShardResponse::OutOfMemory,
621            Err(e) => ShardResponse::Err(e.to_string()),
622        },
623        ShardRequest::Append { key, value } => match ks.append(key, value) {
624            Ok(len) => ShardResponse::Len(len),
625            Err(WriteError::WrongType) => ShardResponse::WrongType,
626            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
627        },
628        ShardRequest::Strlen { key } => match ks.strlen(key) {
629            Ok(len) => ShardResponse::Len(len),
630            Err(_) => ShardResponse::WrongType,
631        },
632        ShardRequest::Keys { pattern } => {
633            let keys = ks.keys(pattern);
634            ShardResponse::StringArray(keys)
635        }
636        ShardRequest::Rename { key, newkey } => {
637            use crate::keyspace::RenameError;
638            match ks.rename(key, newkey) {
639                Ok(()) => ShardResponse::Ok,
640                Err(RenameError::NoSuchKey) => ShardResponse::Err("ERR no such key".into()),
641            }
642        }
643        ShardRequest::Del { key } => ShardResponse::Bool(ks.del(key)),
644        ShardRequest::Unlink { key } => ShardResponse::Bool(ks.unlink(key)),
645        ShardRequest::Exists { key } => ShardResponse::Bool(ks.exists(key)),
646        ShardRequest::Expire { key, seconds } => ShardResponse::Bool(ks.expire(key, *seconds)),
647        ShardRequest::Ttl { key } => ShardResponse::Ttl(ks.ttl(key)),
648        ShardRequest::Persist { key } => ShardResponse::Bool(ks.persist(key)),
649        ShardRequest::Pttl { key } => ShardResponse::Ttl(ks.pttl(key)),
650        ShardRequest::Pexpire { key, milliseconds } => {
651            ShardResponse::Bool(ks.pexpire(key, *milliseconds))
652        }
653        ShardRequest::LPush { key, values } => match ks.lpush(key, values) {
654            Ok(len) => ShardResponse::Len(len),
655            Err(WriteError::WrongType) => ShardResponse::WrongType,
656            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
657        },
658        ShardRequest::RPush { key, values } => match ks.rpush(key, values) {
659            Ok(len) => ShardResponse::Len(len),
660            Err(WriteError::WrongType) => ShardResponse::WrongType,
661            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
662        },
663        ShardRequest::LPop { key } => match ks.lpop(key) {
664            Ok(val) => ShardResponse::Value(val.map(Value::String)),
665            Err(_) => ShardResponse::WrongType,
666        },
667        ShardRequest::RPop { key } => match ks.rpop(key) {
668            Ok(val) => ShardResponse::Value(val.map(Value::String)),
669            Err(_) => ShardResponse::WrongType,
670        },
671        ShardRequest::LRange { key, start, stop } => match ks.lrange(key, *start, *stop) {
672            Ok(items) => ShardResponse::Array(items),
673            Err(_) => ShardResponse::WrongType,
674        },
675        ShardRequest::LLen { key } => match ks.llen(key) {
676            Ok(len) => ShardResponse::Len(len),
677            Err(_) => ShardResponse::WrongType,
678        },
679        ShardRequest::Type { key } => ShardResponse::TypeName(ks.value_type(key)),
680        ShardRequest::ZAdd {
681            key,
682            members,
683            nx,
684            xx,
685            gt,
686            lt,
687            ch,
688        } => {
689            let flags = ZAddFlags {
690                nx: *nx,
691                xx: *xx,
692                gt: *gt,
693                lt: *lt,
694                ch: *ch,
695            };
696            match ks.zadd(key, members, &flags) {
697                Ok(result) => ShardResponse::ZAddLen {
698                    count: result.count,
699                    applied: result.applied,
700                },
701                Err(WriteError::WrongType) => ShardResponse::WrongType,
702                Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
703            }
704        }
705        ShardRequest::ZRem { key, members } => match ks.zrem(key, members) {
706            Ok(removed) => ShardResponse::ZRemLen {
707                count: removed.len(),
708                removed,
709            },
710            Err(_) => ShardResponse::WrongType,
711        },
712        ShardRequest::ZScore { key, member } => match ks.zscore(key, member) {
713            Ok(score) => ShardResponse::Score(score),
714            Err(_) => ShardResponse::WrongType,
715        },
716        ShardRequest::ZRank { key, member } => match ks.zrank(key, member) {
717            Ok(rank) => ShardResponse::Rank(rank),
718            Err(_) => ShardResponse::WrongType,
719        },
720        ShardRequest::ZCard { key } => match ks.zcard(key) {
721            Ok(len) => ShardResponse::Len(len),
722            Err(_) => ShardResponse::WrongType,
723        },
724        ShardRequest::ZRange {
725            key, start, stop, ..
726        } => match ks.zrange(key, *start, *stop) {
727            Ok(items) => ShardResponse::ScoredArray(items),
728            Err(_) => ShardResponse::WrongType,
729        },
730        ShardRequest::DbSize => ShardResponse::KeyCount(ks.len()),
731        ShardRequest::Stats => ShardResponse::Stats(ks.stats()),
732        ShardRequest::FlushDb => {
733            ks.clear();
734            ShardResponse::Ok
735        }
736        ShardRequest::Scan {
737            cursor,
738            count,
739            pattern,
740        } => {
741            let (next_cursor, keys) = ks.scan_keys(*cursor, *count, pattern.as_deref());
742            ShardResponse::Scan {
743                cursor: next_cursor,
744                keys,
745            }
746        }
747        ShardRequest::HSet { key, fields } => match ks.hset(key, fields) {
748            Ok(count) => ShardResponse::Len(count),
749            Err(WriteError::WrongType) => ShardResponse::WrongType,
750            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
751        },
752        ShardRequest::HGet { key, field } => match ks.hget(key, field) {
753            Ok(val) => ShardResponse::Value(val.map(Value::String)),
754            Err(_) => ShardResponse::WrongType,
755        },
756        ShardRequest::HGetAll { key } => match ks.hgetall(key) {
757            Ok(fields) => ShardResponse::HashFields(fields),
758            Err(_) => ShardResponse::WrongType,
759        },
760        ShardRequest::HDel { key, fields } => match ks.hdel(key, fields) {
761            Ok(removed) => ShardResponse::HDelLen {
762                count: removed.len(),
763                removed,
764            },
765            Err(_) => ShardResponse::WrongType,
766        },
767        ShardRequest::HExists { key, field } => match ks.hexists(key, field) {
768            Ok(exists) => ShardResponse::Bool(exists),
769            Err(_) => ShardResponse::WrongType,
770        },
771        ShardRequest::HLen { key } => match ks.hlen(key) {
772            Ok(len) => ShardResponse::Len(len),
773            Err(_) => ShardResponse::WrongType,
774        },
775        ShardRequest::HIncrBy { key, field, delta } => match ks.hincrby(key, field, *delta) {
776            Ok(val) => ShardResponse::Integer(val),
777            Err(IncrError::WrongType) => ShardResponse::WrongType,
778            Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
779            Err(e) => ShardResponse::Err(e.to_string()),
780        },
781        ShardRequest::HKeys { key } => match ks.hkeys(key) {
782            Ok(keys) => ShardResponse::StringArray(keys),
783            Err(_) => ShardResponse::WrongType,
784        },
785        ShardRequest::HVals { key } => match ks.hvals(key) {
786            Ok(vals) => ShardResponse::Array(vals),
787            Err(_) => ShardResponse::WrongType,
788        },
789        ShardRequest::HMGet { key, fields } => match ks.hmget(key, fields) {
790            Ok(vals) => ShardResponse::OptionalArray(vals),
791            Err(_) => ShardResponse::WrongType,
792        },
793        ShardRequest::SAdd { key, members } => match ks.sadd(key, members) {
794            Ok(count) => ShardResponse::Len(count),
795            Err(WriteError::WrongType) => ShardResponse::WrongType,
796            Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
797        },
798        ShardRequest::SRem { key, members } => match ks.srem(key, members) {
799            Ok(count) => ShardResponse::Len(count),
800            Err(_) => ShardResponse::WrongType,
801        },
802        ShardRequest::SMembers { key } => match ks.smembers(key) {
803            Ok(members) => ShardResponse::StringArray(members),
804            Err(_) => ShardResponse::WrongType,
805        },
806        ShardRequest::SIsMember { key, member } => match ks.sismember(key, member) {
807            Ok(exists) => ShardResponse::Bool(exists),
808            Err(_) => ShardResponse::WrongType,
809        },
810        ShardRequest::SCard { key } => match ks.scard(key) {
811            Ok(count) => ShardResponse::Len(count),
812            Err(_) => ShardResponse::WrongType,
813        },
814        ShardRequest::CountKeysInSlot { slot } => {
815            ShardResponse::KeyCount(ks.count_keys_in_slot(*slot))
816        }
817        ShardRequest::GetKeysInSlot { slot, count } => {
818            ShardResponse::StringArray(ks.get_keys_in_slot(*slot, *count))
819        }
820        // snapshot/rewrite/flush_async are handled in the main loop, not here
821        ShardRequest::Snapshot | ShardRequest::RewriteAof | ShardRequest::FlushDbAsync => {
822            ShardResponse::Ok
823        }
824    }
825}
826
827/// Converts a successful mutation request+response pair into an AOF record.
828/// Returns None for non-mutation requests or failed mutations.
829fn to_aof_record(req: &ShardRequest, resp: &ShardResponse) -> Option<AofRecord> {
830    match (req, resp) {
831        (
832            ShardRequest::Set {
833                key, value, expire, ..
834            },
835            ShardResponse::Ok,
836        ) => {
837            let expire_ms = expire.map(|d| d.as_millis() as i64).unwrap_or(-1);
838            Some(AofRecord::Set {
839                key: key.clone(),
840                value: value.clone(),
841                expire_ms,
842            })
843        }
844        (ShardRequest::Del { key }, ShardResponse::Bool(true))
845        | (ShardRequest::Unlink { key }, ShardResponse::Bool(true)) => {
846            Some(AofRecord::Del { key: key.clone() })
847        }
848        (ShardRequest::Expire { key, seconds }, ShardResponse::Bool(true)) => {
849            Some(AofRecord::Expire {
850                key: key.clone(),
851                seconds: *seconds,
852            })
853        }
854        (ShardRequest::LPush { key, values }, ShardResponse::Len(_)) => Some(AofRecord::LPush {
855            key: key.clone(),
856            values: values.clone(),
857        }),
858        (ShardRequest::RPush { key, values }, ShardResponse::Len(_)) => Some(AofRecord::RPush {
859            key: key.clone(),
860            values: values.clone(),
861        }),
862        (ShardRequest::LPop { key }, ShardResponse::Value(Some(_))) => {
863            Some(AofRecord::LPop { key: key.clone() })
864        }
865        (ShardRequest::RPop { key }, ShardResponse::Value(Some(_))) => {
866            Some(AofRecord::RPop { key: key.clone() })
867        }
868        (ShardRequest::ZAdd { key, .. }, ShardResponse::ZAddLen { applied, .. })
869            if !applied.is_empty() =>
870        {
871            Some(AofRecord::ZAdd {
872                key: key.clone(),
873                members: applied.clone(),
874            })
875        }
876        (ShardRequest::ZRem { key, .. }, ShardResponse::ZRemLen { removed, .. })
877            if !removed.is_empty() =>
878        {
879            Some(AofRecord::ZRem {
880                key: key.clone(),
881                members: removed.clone(),
882            })
883        }
884        (ShardRequest::Incr { key }, ShardResponse::Integer(_)) => {
885            Some(AofRecord::Incr { key: key.clone() })
886        }
887        (ShardRequest::Decr { key }, ShardResponse::Integer(_)) => {
888            Some(AofRecord::Decr { key: key.clone() })
889        }
890        (ShardRequest::IncrBy { key, delta }, ShardResponse::Integer(_)) => {
891            Some(AofRecord::IncrBy {
892                key: key.clone(),
893                delta: *delta,
894            })
895        }
896        (ShardRequest::DecrBy { key, delta }, ShardResponse::Integer(_)) => {
897            Some(AofRecord::DecrBy {
898                key: key.clone(),
899                delta: *delta,
900            })
901        }
902        // INCRBYFLOAT: record as a SET with the resulting value to avoid
903        // float rounding drift during replay.
904        (ShardRequest::IncrByFloat { key, .. }, ShardResponse::BulkString(val)) => {
905            Some(AofRecord::Set {
906                key: key.clone(),
907                value: Bytes::from(val.clone()),
908                expire_ms: -1,
909            })
910        }
911        // APPEND: record the appended value for replay
912        (ShardRequest::Append { key, value }, ShardResponse::Len(_)) => Some(AofRecord::Append {
913            key: key.clone(),
914            value: value.clone(),
915        }),
916        (ShardRequest::Rename { key, newkey }, ShardResponse::Ok) => Some(AofRecord::Rename {
917            key: key.clone(),
918            newkey: newkey.clone(),
919        }),
920        (ShardRequest::Persist { key }, ShardResponse::Bool(true)) => {
921            Some(AofRecord::Persist { key: key.clone() })
922        }
923        (ShardRequest::Pexpire { key, milliseconds }, ShardResponse::Bool(true)) => {
924            Some(AofRecord::Pexpire {
925                key: key.clone(),
926                milliseconds: *milliseconds,
927            })
928        }
929        // Hash commands
930        (ShardRequest::HSet { key, fields }, ShardResponse::Len(_)) => Some(AofRecord::HSet {
931            key: key.clone(),
932            fields: fields.clone(),
933        }),
934        (ShardRequest::HDel { key, .. }, ShardResponse::HDelLen { removed, .. })
935            if !removed.is_empty() =>
936        {
937            Some(AofRecord::HDel {
938                key: key.clone(),
939                fields: removed.clone(),
940            })
941        }
942        (ShardRequest::HIncrBy { key, field, delta }, ShardResponse::Integer(_)) => {
943            Some(AofRecord::HIncrBy {
944                key: key.clone(),
945                field: field.clone(),
946                delta: *delta,
947            })
948        }
949        // Set commands
950        (ShardRequest::SAdd { key, members }, ShardResponse::Len(count)) if *count > 0 => {
951            Some(AofRecord::SAdd {
952                key: key.clone(),
953                members: members.clone(),
954            })
955        }
956        (ShardRequest::SRem { key, members }, ShardResponse::Len(count)) if *count > 0 => {
957            Some(AofRecord::SRem {
958                key: key.clone(),
959                members: members.clone(),
960            })
961        }
962        _ => None,
963    }
964}
965
966/// Writes a snapshot of the current keyspace.
967fn handle_snapshot(
968    keyspace: &Keyspace,
969    persistence: &Option<ShardPersistenceConfig>,
970    shard_id: u16,
971) -> ShardResponse {
972    let pcfg = match persistence {
973        Some(p) => p,
974        None => return ShardResponse::Err("persistence not configured".into()),
975    };
976
977    let path = snapshot::snapshot_path(&pcfg.data_dir, shard_id);
978    let result = write_snapshot(
979        keyspace,
980        &path,
981        shard_id,
982        #[cfg(feature = "encryption")]
983        pcfg.encryption_key.as_ref(),
984    );
985    match result {
986        Ok(count) => {
987            info!(shard_id, entries = count, "snapshot written");
988            ShardResponse::Ok
989        }
990        Err(e) => {
991            warn!(shard_id, "snapshot failed: {e}");
992            ShardResponse::Err(format!("snapshot failed: {e}"))
993        }
994    }
995}
996
997/// Writes a snapshot and then truncates the AOF.
998fn handle_rewrite(
999    keyspace: &Keyspace,
1000    persistence: &Option<ShardPersistenceConfig>,
1001    aof_writer: &mut Option<AofWriter>,
1002    shard_id: u16,
1003) -> ShardResponse {
1004    let pcfg = match persistence {
1005        Some(p) => p,
1006        None => return ShardResponse::Err("persistence not configured".into()),
1007    };
1008
1009    let path = snapshot::snapshot_path(&pcfg.data_dir, shard_id);
1010    let result = write_snapshot(
1011        keyspace,
1012        &path,
1013        shard_id,
1014        #[cfg(feature = "encryption")]
1015        pcfg.encryption_key.as_ref(),
1016    );
1017    match result {
1018        Ok(count) => {
1019            // truncate AOF after successful snapshot
1020            if let Some(ref mut writer) = aof_writer {
1021                if let Err(e) = writer.truncate() {
1022                    warn!(shard_id, "aof truncate after rewrite failed: {e}");
1023                }
1024            }
1025            info!(shard_id, entries = count, "aof rewrite complete");
1026            ShardResponse::Ok
1027        }
1028        Err(e) => {
1029            warn!(shard_id, "aof rewrite failed: {e}");
1030            ShardResponse::Err(format!("rewrite failed: {e}"))
1031        }
1032    }
1033}
1034
1035/// Iterates the keyspace and writes all live entries to a snapshot file.
1036fn write_snapshot(
1037    keyspace: &Keyspace,
1038    path: &std::path::Path,
1039    shard_id: u16,
1040    #[cfg(feature = "encryption")] encryption_key: Option<
1041        &ember_persistence::encryption::EncryptionKey,
1042    >,
1043) -> Result<u32, ember_persistence::format::FormatError> {
1044    #[cfg(feature = "encryption")]
1045    let mut writer = if let Some(key) = encryption_key {
1046        SnapshotWriter::create_encrypted(path, shard_id, key.clone())?
1047    } else {
1048        SnapshotWriter::create(path, shard_id)?
1049    };
1050    #[cfg(not(feature = "encryption"))]
1051    let mut writer = SnapshotWriter::create(path, shard_id)?;
1052    let mut count = 0u32;
1053
1054    for (key, value, ttl_ms) in keyspace.iter_entries() {
1055        let snap_value = match value {
1056            Value::String(data) => SnapValue::String(data.clone()),
1057            Value::List(deque) => SnapValue::List(deque.clone()),
1058            Value::SortedSet(ss) => {
1059                let members: Vec<(f64, String)> = ss
1060                    .iter()
1061                    .map(|(member, score)| (score, member.to_owned()))
1062                    .collect();
1063                SnapValue::SortedSet(members)
1064            }
1065            Value::Hash(map) => SnapValue::Hash(map.clone()),
1066            Value::Set(set) => SnapValue::Set(set.clone()),
1067        };
1068        writer.write_entry(&SnapEntry {
1069            key: key.to_owned(),
1070            value: snap_value,
1071            expire_ms: ttl_ms,
1072        })?;
1073        count += 1;
1074    }
1075
1076    writer.finish()?;
1077    Ok(count)
1078}
1079
1080#[cfg(test)]
1081mod tests {
1082    use super::*;
1083
1084    #[test]
1085    fn dispatch_set_and_get() {
1086        let mut ks = Keyspace::new();
1087
1088        let resp = dispatch(
1089            &mut ks,
1090            &ShardRequest::Set {
1091                key: "k".into(),
1092                value: Bytes::from("v"),
1093                expire: None,
1094                nx: false,
1095                xx: false,
1096            },
1097        );
1098        assert!(matches!(resp, ShardResponse::Ok));
1099
1100        let resp = dispatch(&mut ks, &ShardRequest::Get { key: "k".into() });
1101        match resp {
1102            ShardResponse::Value(Some(Value::String(data))) => {
1103                assert_eq!(data, Bytes::from("v"));
1104            }
1105            other => panic!("expected Value(Some(String)), got {other:?}"),
1106        }
1107    }
1108
1109    #[test]
1110    fn dispatch_get_missing() {
1111        let mut ks = Keyspace::new();
1112        let resp = dispatch(&mut ks, &ShardRequest::Get { key: "nope".into() });
1113        assert!(matches!(resp, ShardResponse::Value(None)));
1114    }
1115
1116    #[test]
1117    fn dispatch_del() {
1118        let mut ks = Keyspace::new();
1119        ks.set("key".into(), Bytes::from("val"), None);
1120
1121        let resp = dispatch(&mut ks, &ShardRequest::Del { key: "key".into() });
1122        assert!(matches!(resp, ShardResponse::Bool(true)));
1123
1124        let resp = dispatch(&mut ks, &ShardRequest::Del { key: "key".into() });
1125        assert!(matches!(resp, ShardResponse::Bool(false)));
1126    }
1127
1128    #[test]
1129    fn dispatch_exists() {
1130        let mut ks = Keyspace::new();
1131        ks.set("yes".into(), Bytes::from("here"), None);
1132
1133        let resp = dispatch(&mut ks, &ShardRequest::Exists { key: "yes".into() });
1134        assert!(matches!(resp, ShardResponse::Bool(true)));
1135
1136        let resp = dispatch(&mut ks, &ShardRequest::Exists { key: "no".into() });
1137        assert!(matches!(resp, ShardResponse::Bool(false)));
1138    }
1139
1140    #[test]
1141    fn dispatch_expire_and_ttl() {
1142        let mut ks = Keyspace::new();
1143        ks.set("key".into(), Bytes::from("val"), None);
1144
1145        let resp = dispatch(
1146            &mut ks,
1147            &ShardRequest::Expire {
1148                key: "key".into(),
1149                seconds: 60,
1150            },
1151        );
1152        assert!(matches!(resp, ShardResponse::Bool(true)));
1153
1154        let resp = dispatch(&mut ks, &ShardRequest::Ttl { key: "key".into() });
1155        match resp {
1156            ShardResponse::Ttl(TtlResult::Seconds(s)) => assert!((58..=60).contains(&s)),
1157            other => panic!("expected Ttl(Seconds), got {other:?}"),
1158        }
1159    }
1160
1161    #[test]
1162    fn dispatch_ttl_missing() {
1163        let mut ks = Keyspace::new();
1164        let resp = dispatch(&mut ks, &ShardRequest::Ttl { key: "gone".into() });
1165        assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
1166    }
1167
1168    #[tokio::test]
1169    async fn shard_round_trip() {
1170        let handle = spawn_shard(16, ShardConfig::default(), None, None);
1171
1172        let resp = handle
1173            .send(ShardRequest::Set {
1174                key: "hello".into(),
1175                value: Bytes::from("world"),
1176                expire: None,
1177                nx: false,
1178                xx: false,
1179            })
1180            .await
1181            .unwrap();
1182        assert!(matches!(resp, ShardResponse::Ok));
1183
1184        let resp = handle
1185            .send(ShardRequest::Get {
1186                key: "hello".into(),
1187            })
1188            .await
1189            .unwrap();
1190        match resp {
1191            ShardResponse::Value(Some(Value::String(data))) => {
1192                assert_eq!(data, Bytes::from("world"));
1193            }
1194            other => panic!("expected Value(Some(String)), got {other:?}"),
1195        }
1196    }
1197
1198    #[tokio::test]
1199    async fn expired_key_through_shard() {
1200        let handle = spawn_shard(16, ShardConfig::default(), None, None);
1201
1202        handle
1203            .send(ShardRequest::Set {
1204                key: "temp".into(),
1205                value: Bytes::from("gone"),
1206                expire: Some(Duration::from_millis(10)),
1207                nx: false,
1208                xx: false,
1209            })
1210            .await
1211            .unwrap();
1212
1213        tokio::time::sleep(Duration::from_millis(30)).await;
1214
1215        let resp = handle
1216            .send(ShardRequest::Get { key: "temp".into() })
1217            .await
1218            .unwrap();
1219        assert!(matches!(resp, ShardResponse::Value(None)));
1220    }
1221
1222    #[tokio::test]
1223    async fn active_expiration_cleans_up_without_access() {
1224        let handle = spawn_shard(16, ShardConfig::default(), None, None);
1225
1226        // set a key with a short TTL
1227        handle
1228            .send(ShardRequest::Set {
1229                key: "ephemeral".into(),
1230                value: Bytes::from("temp"),
1231                expire: Some(Duration::from_millis(10)),
1232                nx: false,
1233                xx: false,
1234            })
1235            .await
1236            .unwrap();
1237
1238        // also set a persistent key
1239        handle
1240            .send(ShardRequest::Set {
1241                key: "persistent".into(),
1242                value: Bytes::from("stays"),
1243                expire: None,
1244                nx: false,
1245                xx: false,
1246            })
1247            .await
1248            .unwrap();
1249
1250        // wait long enough for the TTL to expire AND for the background
1251        // tick to fire (100ms interval + some slack)
1252        tokio::time::sleep(Duration::from_millis(250)).await;
1253
1254        // the ephemeral key should be gone even though we never accessed it
1255        let resp = handle
1256            .send(ShardRequest::Exists {
1257                key: "ephemeral".into(),
1258            })
1259            .await
1260            .unwrap();
1261        assert!(matches!(resp, ShardResponse::Bool(false)));
1262
1263        // the persistent key should still be there
1264        let resp = handle
1265            .send(ShardRequest::Exists {
1266                key: "persistent".into(),
1267            })
1268            .await
1269            .unwrap();
1270        assert!(matches!(resp, ShardResponse::Bool(true)));
1271    }
1272
1273    #[tokio::test]
1274    async fn shard_with_persistence_snapshot_and_recovery() {
1275        let dir = tempfile::tempdir().unwrap();
1276        let pcfg = ShardPersistenceConfig {
1277            data_dir: dir.path().to_owned(),
1278            append_only: true,
1279            fsync_policy: FsyncPolicy::Always,
1280            #[cfg(feature = "encryption")]
1281            encryption_key: None,
1282        };
1283        let config = ShardConfig {
1284            shard_id: 0,
1285            ..ShardConfig::default()
1286        };
1287
1288        // write some keys then trigger a snapshot
1289        {
1290            let handle = spawn_shard(16, config.clone(), Some(pcfg.clone()), None);
1291            handle
1292                .send(ShardRequest::Set {
1293                    key: "a".into(),
1294                    value: Bytes::from("1"),
1295                    expire: None,
1296                    nx: false,
1297                    xx: false,
1298                })
1299                .await
1300                .unwrap();
1301            handle
1302                .send(ShardRequest::Set {
1303                    key: "b".into(),
1304                    value: Bytes::from("2"),
1305                    expire: Some(Duration::from_secs(300)),
1306                    nx: false,
1307                    xx: false,
1308                })
1309                .await
1310                .unwrap();
1311            handle.send(ShardRequest::Snapshot).await.unwrap();
1312            // write one more key that goes only to AOF
1313            handle
1314                .send(ShardRequest::Set {
1315                    key: "c".into(),
1316                    value: Bytes::from("3"),
1317                    expire: None,
1318                    nx: false,
1319                    xx: false,
1320                })
1321                .await
1322                .unwrap();
1323            // drop handle to shut down shard
1324        }
1325
1326        // give it a moment to flush
1327        tokio::time::sleep(Duration::from_millis(50)).await;
1328
1329        // start a new shard with the same config — should recover
1330        {
1331            let handle = spawn_shard(16, config, Some(pcfg), None);
1332            // give it a moment to recover
1333            tokio::time::sleep(Duration::from_millis(50)).await;
1334
1335            let resp = handle
1336                .send(ShardRequest::Get { key: "a".into() })
1337                .await
1338                .unwrap();
1339            match resp {
1340                ShardResponse::Value(Some(Value::String(data))) => {
1341                    assert_eq!(data, Bytes::from("1"));
1342                }
1343                other => panic!("expected a=1, got {other:?}"),
1344            }
1345
1346            let resp = handle
1347                .send(ShardRequest::Get { key: "b".into() })
1348                .await
1349                .unwrap();
1350            assert!(matches!(resp, ShardResponse::Value(Some(_))));
1351
1352            let resp = handle
1353                .send(ShardRequest::Get { key: "c".into() })
1354                .await
1355                .unwrap();
1356            match resp {
1357                ShardResponse::Value(Some(Value::String(data))) => {
1358                    assert_eq!(data, Bytes::from("3"));
1359                }
1360                other => panic!("expected c=3, got {other:?}"),
1361            }
1362        }
1363    }
1364
1365    #[test]
1366    fn to_aof_record_for_set() {
1367        let req = ShardRequest::Set {
1368            key: "k".into(),
1369            value: Bytes::from("v"),
1370            expire: Some(Duration::from_secs(60)),
1371            nx: false,
1372            xx: false,
1373        };
1374        let resp = ShardResponse::Ok;
1375        let record = to_aof_record(&req, &resp).unwrap();
1376        match record {
1377            AofRecord::Set { key, expire_ms, .. } => {
1378                assert_eq!(key, "k");
1379                assert_eq!(expire_ms, 60_000);
1380            }
1381            other => panic!("expected Set, got {other:?}"),
1382        }
1383    }
1384
1385    #[test]
1386    fn to_aof_record_skips_failed_set() {
1387        let req = ShardRequest::Set {
1388            key: "k".into(),
1389            value: Bytes::from("v"),
1390            expire: None,
1391            nx: false,
1392            xx: false,
1393        };
1394        let resp = ShardResponse::OutOfMemory;
1395        assert!(to_aof_record(&req, &resp).is_none());
1396    }
1397
1398    #[test]
1399    fn to_aof_record_for_del() {
1400        let req = ShardRequest::Del { key: "k".into() };
1401        let resp = ShardResponse::Bool(true);
1402        let record = to_aof_record(&req, &resp).unwrap();
1403        assert!(matches!(record, AofRecord::Del { .. }));
1404    }
1405
1406    #[test]
1407    fn to_aof_record_skips_failed_del() {
1408        let req = ShardRequest::Del { key: "k".into() };
1409        let resp = ShardResponse::Bool(false);
1410        assert!(to_aof_record(&req, &resp).is_none());
1411    }
1412
1413    #[test]
1414    fn dispatch_incr_new_key() {
1415        let mut ks = Keyspace::new();
1416        let resp = dispatch(&mut ks, &ShardRequest::Incr { key: "c".into() });
1417        assert!(matches!(resp, ShardResponse::Integer(1)));
1418    }
1419
1420    #[test]
1421    fn dispatch_decr_existing() {
1422        let mut ks = Keyspace::new();
1423        ks.set("n".into(), Bytes::from("10"), None);
1424        let resp = dispatch(&mut ks, &ShardRequest::Decr { key: "n".into() });
1425        assert!(matches!(resp, ShardResponse::Integer(9)));
1426    }
1427
1428    #[test]
1429    fn dispatch_incr_non_integer() {
1430        let mut ks = Keyspace::new();
1431        ks.set("s".into(), Bytes::from("hello"), None);
1432        let resp = dispatch(&mut ks, &ShardRequest::Incr { key: "s".into() });
1433        assert!(matches!(resp, ShardResponse::Err(_)));
1434    }
1435
1436    #[test]
1437    fn dispatch_incrby() {
1438        let mut ks = Keyspace::new();
1439        ks.set("n".into(), Bytes::from("10"), None);
1440        let resp = dispatch(
1441            &mut ks,
1442            &ShardRequest::IncrBy {
1443                key: "n".into(),
1444                delta: 5,
1445            },
1446        );
1447        assert!(matches!(resp, ShardResponse::Integer(15)));
1448    }
1449
1450    #[test]
1451    fn dispatch_decrby() {
1452        let mut ks = Keyspace::new();
1453        ks.set("n".into(), Bytes::from("10"), None);
1454        let resp = dispatch(
1455            &mut ks,
1456            &ShardRequest::DecrBy {
1457                key: "n".into(),
1458                delta: 3,
1459            },
1460        );
1461        assert!(matches!(resp, ShardResponse::Integer(7)));
1462    }
1463
1464    #[test]
1465    fn dispatch_incrby_new_key() {
1466        let mut ks = Keyspace::new();
1467        let resp = dispatch(
1468            &mut ks,
1469            &ShardRequest::IncrBy {
1470                key: "new".into(),
1471                delta: 42,
1472            },
1473        );
1474        assert!(matches!(resp, ShardResponse::Integer(42)));
1475    }
1476
1477    #[test]
1478    fn dispatch_incrbyfloat() {
1479        let mut ks = Keyspace::new();
1480        ks.set("n".into(), Bytes::from("10.5"), None);
1481        let resp = dispatch(
1482            &mut ks,
1483            &ShardRequest::IncrByFloat {
1484                key: "n".into(),
1485                delta: 2.3,
1486            },
1487        );
1488        match resp {
1489            ShardResponse::BulkString(val) => {
1490                let f: f64 = val.parse().unwrap();
1491                assert!((f - 12.8).abs() < 0.001);
1492            }
1493            other => panic!("expected BulkString, got {other:?}"),
1494        }
1495    }
1496
1497    #[test]
1498    fn dispatch_append() {
1499        let mut ks = Keyspace::new();
1500        ks.set("k".into(), Bytes::from("hello"), None);
1501        let resp = dispatch(
1502            &mut ks,
1503            &ShardRequest::Append {
1504                key: "k".into(),
1505                value: Bytes::from(" world"),
1506            },
1507        );
1508        assert!(matches!(resp, ShardResponse::Len(11)));
1509    }
1510
1511    #[test]
1512    fn dispatch_strlen() {
1513        let mut ks = Keyspace::new();
1514        ks.set("k".into(), Bytes::from("hello"), None);
1515        let resp = dispatch(&mut ks, &ShardRequest::Strlen { key: "k".into() });
1516        assert!(matches!(resp, ShardResponse::Len(5)));
1517    }
1518
1519    #[test]
1520    fn dispatch_strlen_missing() {
1521        let mut ks = Keyspace::new();
1522        let resp = dispatch(&mut ks, &ShardRequest::Strlen { key: "nope".into() });
1523        assert!(matches!(resp, ShardResponse::Len(0)));
1524    }
1525
1526    #[test]
1527    fn to_aof_record_for_append() {
1528        let req = ShardRequest::Append {
1529            key: "k".into(),
1530            value: Bytes::from("data"),
1531        };
1532        let resp = ShardResponse::Len(10);
1533        let record = to_aof_record(&req, &resp).unwrap();
1534        match record {
1535            AofRecord::Append { key, value } => {
1536                assert_eq!(key, "k");
1537                assert_eq!(value, Bytes::from("data"));
1538            }
1539            other => panic!("expected Append, got {other:?}"),
1540        }
1541    }
1542
1543    #[test]
1544    fn dispatch_incrbyfloat_new_key() {
1545        let mut ks = Keyspace::new();
1546        let resp = dispatch(
1547            &mut ks,
1548            &ShardRequest::IncrByFloat {
1549                key: "new".into(),
1550                delta: 2.72,
1551            },
1552        );
1553        match resp {
1554            ShardResponse::BulkString(val) => {
1555                let f: f64 = val.parse().unwrap();
1556                assert!((f - 2.72).abs() < 0.001);
1557            }
1558            other => panic!("expected BulkString, got {other:?}"),
1559        }
1560    }
1561
1562    #[test]
1563    fn to_aof_record_for_incr() {
1564        let req = ShardRequest::Incr { key: "c".into() };
1565        let resp = ShardResponse::Integer(1);
1566        let record = to_aof_record(&req, &resp).unwrap();
1567        assert!(matches!(record, AofRecord::Incr { .. }));
1568    }
1569
1570    #[test]
1571    fn to_aof_record_for_decr() {
1572        let req = ShardRequest::Decr { key: "c".into() };
1573        let resp = ShardResponse::Integer(-1);
1574        let record = to_aof_record(&req, &resp).unwrap();
1575        assert!(matches!(record, AofRecord::Decr { .. }));
1576    }
1577
1578    #[test]
1579    fn to_aof_record_for_incrby() {
1580        let req = ShardRequest::IncrBy {
1581            key: "c".into(),
1582            delta: 5,
1583        };
1584        let resp = ShardResponse::Integer(15);
1585        let record = to_aof_record(&req, &resp).unwrap();
1586        match record {
1587            AofRecord::IncrBy { key, delta } => {
1588                assert_eq!(key, "c");
1589                assert_eq!(delta, 5);
1590            }
1591            other => panic!("expected IncrBy, got {other:?}"),
1592        }
1593    }
1594
1595    #[test]
1596    fn to_aof_record_for_decrby() {
1597        let req = ShardRequest::DecrBy {
1598            key: "c".into(),
1599            delta: 3,
1600        };
1601        let resp = ShardResponse::Integer(7);
1602        let record = to_aof_record(&req, &resp).unwrap();
1603        match record {
1604            AofRecord::DecrBy { key, delta } => {
1605                assert_eq!(key, "c");
1606                assert_eq!(delta, 3);
1607            }
1608            other => panic!("expected DecrBy, got {other:?}"),
1609        }
1610    }
1611
1612    #[test]
1613    fn dispatch_persist_removes_ttl() {
1614        let mut ks = Keyspace::new();
1615        ks.set(
1616            "key".into(),
1617            Bytes::from("val"),
1618            Some(Duration::from_secs(60)),
1619        );
1620
1621        let resp = dispatch(&mut ks, &ShardRequest::Persist { key: "key".into() });
1622        assert!(matches!(resp, ShardResponse::Bool(true)));
1623
1624        let resp = dispatch(&mut ks, &ShardRequest::Ttl { key: "key".into() });
1625        assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NoExpiry)));
1626    }
1627
1628    #[test]
1629    fn dispatch_persist_missing_key() {
1630        let mut ks = Keyspace::new();
1631        let resp = dispatch(&mut ks, &ShardRequest::Persist { key: "nope".into() });
1632        assert!(matches!(resp, ShardResponse::Bool(false)));
1633    }
1634
1635    #[test]
1636    fn dispatch_pttl() {
1637        let mut ks = Keyspace::new();
1638        ks.set(
1639            "key".into(),
1640            Bytes::from("val"),
1641            Some(Duration::from_secs(60)),
1642        );
1643
1644        let resp = dispatch(&mut ks, &ShardRequest::Pttl { key: "key".into() });
1645        match resp {
1646            ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
1647                assert!(ms > 59_000 && ms <= 60_000);
1648            }
1649            other => panic!("expected Ttl(Milliseconds), got {other:?}"),
1650        }
1651    }
1652
1653    #[test]
1654    fn dispatch_pttl_missing() {
1655        let mut ks = Keyspace::new();
1656        let resp = dispatch(&mut ks, &ShardRequest::Pttl { key: "nope".into() });
1657        assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
1658    }
1659
1660    #[test]
1661    fn dispatch_pexpire() {
1662        let mut ks = Keyspace::new();
1663        ks.set("key".into(), Bytes::from("val"), None);
1664
1665        let resp = dispatch(
1666            &mut ks,
1667            &ShardRequest::Pexpire {
1668                key: "key".into(),
1669                milliseconds: 5000,
1670            },
1671        );
1672        assert!(matches!(resp, ShardResponse::Bool(true)));
1673
1674        let resp = dispatch(&mut ks, &ShardRequest::Pttl { key: "key".into() });
1675        match resp {
1676            ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
1677                assert!(ms > 4000 && ms <= 5000);
1678            }
1679            other => panic!("expected Ttl(Milliseconds), got {other:?}"),
1680        }
1681    }
1682
1683    #[test]
1684    fn to_aof_record_for_persist() {
1685        let req = ShardRequest::Persist { key: "k".into() };
1686        let resp = ShardResponse::Bool(true);
1687        let record = to_aof_record(&req, &resp).unwrap();
1688        assert!(matches!(record, AofRecord::Persist { .. }));
1689    }
1690
1691    #[test]
1692    fn to_aof_record_skips_failed_persist() {
1693        let req = ShardRequest::Persist { key: "k".into() };
1694        let resp = ShardResponse::Bool(false);
1695        assert!(to_aof_record(&req, &resp).is_none());
1696    }
1697
1698    #[test]
1699    fn to_aof_record_for_pexpire() {
1700        let req = ShardRequest::Pexpire {
1701            key: "k".into(),
1702            milliseconds: 5000,
1703        };
1704        let resp = ShardResponse::Bool(true);
1705        let record = to_aof_record(&req, &resp).unwrap();
1706        match record {
1707            AofRecord::Pexpire { key, milliseconds } => {
1708                assert_eq!(key, "k");
1709                assert_eq!(milliseconds, 5000);
1710            }
1711            other => panic!("expected Pexpire, got {other:?}"),
1712        }
1713    }
1714
1715    #[test]
1716    fn to_aof_record_skips_failed_pexpire() {
1717        let req = ShardRequest::Pexpire {
1718            key: "k".into(),
1719            milliseconds: 5000,
1720        };
1721        let resp = ShardResponse::Bool(false);
1722        assert!(to_aof_record(&req, &resp).is_none());
1723    }
1724
1725    #[test]
1726    fn dispatch_set_nx_when_key_missing() {
1727        let mut ks = Keyspace::new();
1728        let resp = dispatch(
1729            &mut ks,
1730            &ShardRequest::Set {
1731                key: "k".into(),
1732                value: Bytes::from("v"),
1733                expire: None,
1734                nx: true,
1735                xx: false,
1736            },
1737        );
1738        assert!(matches!(resp, ShardResponse::Ok));
1739        assert!(ks.exists("k"));
1740    }
1741
1742    #[test]
1743    fn dispatch_set_nx_when_key_exists() {
1744        let mut ks = Keyspace::new();
1745        ks.set("k".into(), Bytes::from("old"), None);
1746
1747        let resp = dispatch(
1748            &mut ks,
1749            &ShardRequest::Set {
1750                key: "k".into(),
1751                value: Bytes::from("new"),
1752                expire: None,
1753                nx: true,
1754                xx: false,
1755            },
1756        );
1757        // NX should block — returns nil
1758        assert!(matches!(resp, ShardResponse::Value(None)));
1759        // original value should remain
1760        match ks.get("k").unwrap() {
1761            Some(Value::String(data)) => assert_eq!(data, Bytes::from("old")),
1762            other => panic!("expected old value, got {other:?}"),
1763        }
1764    }
1765
1766    #[test]
1767    fn dispatch_set_xx_when_key_exists() {
1768        let mut ks = Keyspace::new();
1769        ks.set("k".into(), Bytes::from("old"), None);
1770
1771        let resp = dispatch(
1772            &mut ks,
1773            &ShardRequest::Set {
1774                key: "k".into(),
1775                value: Bytes::from("new"),
1776                expire: None,
1777                nx: false,
1778                xx: true,
1779            },
1780        );
1781        assert!(matches!(resp, ShardResponse::Ok));
1782        match ks.get("k").unwrap() {
1783            Some(Value::String(data)) => assert_eq!(data, Bytes::from("new")),
1784            other => panic!("expected new value, got {other:?}"),
1785        }
1786    }
1787
1788    #[test]
1789    fn dispatch_set_xx_when_key_missing() {
1790        let mut ks = Keyspace::new();
1791        let resp = dispatch(
1792            &mut ks,
1793            &ShardRequest::Set {
1794                key: "k".into(),
1795                value: Bytes::from("v"),
1796                expire: None,
1797                nx: false,
1798                xx: true,
1799            },
1800        );
1801        // XX should block — returns nil
1802        assert!(matches!(resp, ShardResponse::Value(None)));
1803        assert!(!ks.exists("k"));
1804    }
1805
1806    #[test]
1807    fn to_aof_record_skips_nx_blocked_set() {
1808        let req = ShardRequest::Set {
1809            key: "k".into(),
1810            value: Bytes::from("v"),
1811            expire: None,
1812            nx: true,
1813            xx: false,
1814        };
1815        // when NX blocks, the shard returns Value(None), not Ok
1816        let resp = ShardResponse::Value(None);
1817        assert!(to_aof_record(&req, &resp).is_none());
1818    }
1819
1820    #[test]
1821    fn dispatch_flushdb_clears_all_keys() {
1822        let mut ks = Keyspace::new();
1823        ks.set("a".into(), Bytes::from("1"), None);
1824        ks.set("b".into(), Bytes::from("2"), None);
1825
1826        assert_eq!(ks.len(), 2);
1827
1828        let resp = dispatch(&mut ks, &ShardRequest::FlushDb);
1829        assert!(matches!(resp, ShardResponse::Ok));
1830        assert_eq!(ks.len(), 0);
1831    }
1832
1833    #[test]
1834    fn dispatch_scan_returns_keys() {
1835        let mut ks = Keyspace::new();
1836        ks.set("user:1".into(), Bytes::from("a"), None);
1837        ks.set("user:2".into(), Bytes::from("b"), None);
1838        ks.set("item:1".into(), Bytes::from("c"), None);
1839
1840        let resp = dispatch(
1841            &mut ks,
1842            &ShardRequest::Scan {
1843                cursor: 0,
1844                count: 10,
1845                pattern: None,
1846            },
1847        );
1848
1849        match resp {
1850            ShardResponse::Scan { cursor, keys } => {
1851                assert_eq!(cursor, 0); // complete in one pass
1852                assert_eq!(keys.len(), 3);
1853            }
1854            _ => panic!("expected Scan response"),
1855        }
1856    }
1857
1858    #[test]
1859    fn dispatch_scan_with_pattern() {
1860        let mut ks = Keyspace::new();
1861        ks.set("user:1".into(), Bytes::from("a"), None);
1862        ks.set("user:2".into(), Bytes::from("b"), None);
1863        ks.set("item:1".into(), Bytes::from("c"), None);
1864
1865        let resp = dispatch(
1866            &mut ks,
1867            &ShardRequest::Scan {
1868                cursor: 0,
1869                count: 10,
1870                pattern: Some("user:*".into()),
1871            },
1872        );
1873
1874        match resp {
1875            ShardResponse::Scan { cursor, keys } => {
1876                assert_eq!(cursor, 0);
1877                assert_eq!(keys.len(), 2);
1878                for k in &keys {
1879                    assert!(k.starts_with("user:"));
1880                }
1881            }
1882            _ => panic!("expected Scan response"),
1883        }
1884    }
1885
1886    #[test]
1887    fn to_aof_record_for_hset() {
1888        let req = ShardRequest::HSet {
1889            key: "h".into(),
1890            fields: vec![("f1".into(), Bytes::from("v1"))],
1891        };
1892        let resp = ShardResponse::Len(1);
1893        let record = to_aof_record(&req, &resp).unwrap();
1894        match record {
1895            AofRecord::HSet { key, fields } => {
1896                assert_eq!(key, "h");
1897                assert_eq!(fields.len(), 1);
1898            }
1899            _ => panic!("expected HSet record"),
1900        }
1901    }
1902
1903    #[test]
1904    fn to_aof_record_for_hdel() {
1905        let req = ShardRequest::HDel {
1906            key: "h".into(),
1907            fields: vec!["f1".into(), "f2".into()],
1908        };
1909        let resp = ShardResponse::HDelLen {
1910            count: 2,
1911            removed: vec!["f1".into(), "f2".into()],
1912        };
1913        let record = to_aof_record(&req, &resp).unwrap();
1914        match record {
1915            AofRecord::HDel { key, fields } => {
1916                assert_eq!(key, "h");
1917                assert_eq!(fields.len(), 2);
1918            }
1919            _ => panic!("expected HDel record"),
1920        }
1921    }
1922
1923    #[test]
1924    fn to_aof_record_skips_hdel_when_none_removed() {
1925        let req = ShardRequest::HDel {
1926            key: "h".into(),
1927            fields: vec!["f1".into()],
1928        };
1929        let resp = ShardResponse::HDelLen {
1930            count: 0,
1931            removed: vec![],
1932        };
1933        assert!(to_aof_record(&req, &resp).is_none());
1934    }
1935
1936    #[test]
1937    fn to_aof_record_for_hincrby() {
1938        let req = ShardRequest::HIncrBy {
1939            key: "h".into(),
1940            field: "counter".into(),
1941            delta: 5,
1942        };
1943        let resp = ShardResponse::Integer(10);
1944        let record = to_aof_record(&req, &resp).unwrap();
1945        match record {
1946            AofRecord::HIncrBy { key, field, delta } => {
1947                assert_eq!(key, "h");
1948                assert_eq!(field, "counter");
1949                assert_eq!(delta, 5);
1950            }
1951            _ => panic!("expected HIncrBy record"),
1952        }
1953    }
1954
1955    #[test]
1956    fn to_aof_record_for_sadd() {
1957        let req = ShardRequest::SAdd {
1958            key: "s".into(),
1959            members: vec!["m1".into(), "m2".into()],
1960        };
1961        let resp = ShardResponse::Len(2);
1962        let record = to_aof_record(&req, &resp).unwrap();
1963        match record {
1964            AofRecord::SAdd { key, members } => {
1965                assert_eq!(key, "s");
1966                assert_eq!(members.len(), 2);
1967            }
1968            _ => panic!("expected SAdd record"),
1969        }
1970    }
1971
1972    #[test]
1973    fn to_aof_record_skips_sadd_when_none_added() {
1974        let req = ShardRequest::SAdd {
1975            key: "s".into(),
1976            members: vec!["m1".into()],
1977        };
1978        let resp = ShardResponse::Len(0);
1979        assert!(to_aof_record(&req, &resp).is_none());
1980    }
1981
1982    #[test]
1983    fn to_aof_record_for_srem() {
1984        let req = ShardRequest::SRem {
1985            key: "s".into(),
1986            members: vec!["m1".into()],
1987        };
1988        let resp = ShardResponse::Len(1);
1989        let record = to_aof_record(&req, &resp).unwrap();
1990        match record {
1991            AofRecord::SRem { key, members } => {
1992                assert_eq!(key, "s");
1993                assert_eq!(members.len(), 1);
1994            }
1995            _ => panic!("expected SRem record"),
1996        }
1997    }
1998
1999    #[test]
2000    fn to_aof_record_skips_srem_when_none_removed() {
2001        let req = ShardRequest::SRem {
2002            key: "s".into(),
2003            members: vec!["m1".into()],
2004        };
2005        let resp = ShardResponse::Len(0);
2006        assert!(to_aof_record(&req, &resp).is_none());
2007    }
2008
2009    #[test]
2010    fn dispatch_keys() {
2011        let mut ks = Keyspace::new();
2012        ks.set("user:1".into(), Bytes::from("a"), None);
2013        ks.set("user:2".into(), Bytes::from("b"), None);
2014        ks.set("item:1".into(), Bytes::from("c"), None);
2015        let resp = dispatch(
2016            &mut ks,
2017            &ShardRequest::Keys {
2018                pattern: "user:*".into(),
2019            },
2020        );
2021        match resp {
2022            ShardResponse::StringArray(mut keys) => {
2023                keys.sort();
2024                assert_eq!(keys, vec!["user:1", "user:2"]);
2025            }
2026            other => panic!("expected StringArray, got {other:?}"),
2027        }
2028    }
2029
2030    #[test]
2031    fn dispatch_rename() {
2032        let mut ks = Keyspace::new();
2033        ks.set("old".into(), Bytes::from("value"), None);
2034        let resp = dispatch(
2035            &mut ks,
2036            &ShardRequest::Rename {
2037                key: "old".into(),
2038                newkey: "new".into(),
2039            },
2040        );
2041        assert!(matches!(resp, ShardResponse::Ok));
2042        assert!(!ks.exists("old"));
2043        assert!(ks.exists("new"));
2044    }
2045
2046    #[test]
2047    fn dispatch_rename_missing_key() {
2048        let mut ks = Keyspace::new();
2049        let resp = dispatch(
2050            &mut ks,
2051            &ShardRequest::Rename {
2052                key: "missing".into(),
2053                newkey: "new".into(),
2054            },
2055        );
2056        assert!(matches!(resp, ShardResponse::Err(_)));
2057    }
2058
2059    #[test]
2060    fn to_aof_record_for_rename() {
2061        let req = ShardRequest::Rename {
2062            key: "old".into(),
2063            newkey: "new".into(),
2064        };
2065        let resp = ShardResponse::Ok;
2066        let record = to_aof_record(&req, &resp).unwrap();
2067        match record {
2068            AofRecord::Rename { key, newkey } => {
2069                assert_eq!(key, "old");
2070                assert_eq!(newkey, "new");
2071            }
2072            other => panic!("expected Rename, got {other:?}"),
2073        }
2074    }
2075}