1use std::path::PathBuf;
9use std::time::Duration;
10
11use bytes::Bytes;
12use ember_persistence::aof::{AofRecord, AofWriter, FsyncPolicy};
13use ember_persistence::recovery::{self, RecoveredValue};
14use ember_persistence::snapshot::{self, SnapEntry, SnapValue, SnapshotWriter};
15use tokio::sync::{mpsc, oneshot};
16use tracing::{info, warn};
17
18use crate::dropper::DropHandle;
19use crate::error::ShardError;
20use crate::expiry;
21use crate::keyspace::{
22 IncrError, IncrFloatError, Keyspace, KeyspaceStats, SetResult, ShardConfig, TtlResult,
23 WriteError,
24};
25use crate::types::sorted_set::ZAddFlags;
26use crate::types::Value;
27
28const EXPIRY_TICK: Duration = Duration::from_millis(100);
31
32const FSYNC_INTERVAL: Duration = Duration::from_secs(1);
34
35#[derive(Debug, Clone)]
37pub struct ShardPersistenceConfig {
38 pub data_dir: PathBuf,
40 pub append_only: bool,
42 pub fsync_policy: FsyncPolicy,
44 #[cfg(feature = "encryption")]
47 pub encryption_key: Option<ember_persistence::encryption::EncryptionKey>,
48}
49
50#[derive(Debug)]
52pub enum ShardRequest {
53 Get {
54 key: String,
55 },
56 Set {
57 key: String,
58 value: Bytes,
59 expire: Option<Duration>,
60 nx: bool,
62 xx: bool,
64 },
65 Incr {
66 key: String,
67 },
68 Decr {
69 key: String,
70 },
71 IncrBy {
72 key: String,
73 delta: i64,
74 },
75 DecrBy {
76 key: String,
77 delta: i64,
78 },
79 IncrByFloat {
80 key: String,
81 delta: f64,
82 },
83 Append {
84 key: String,
85 value: Bytes,
86 },
87 Strlen {
88 key: String,
89 },
90 Keys {
92 pattern: String,
93 },
94 Rename {
96 key: String,
97 newkey: String,
98 },
99 Del {
100 key: String,
101 },
102 Unlink {
104 key: String,
105 },
106 Exists {
107 key: String,
108 },
109 Expire {
110 key: String,
111 seconds: u64,
112 },
113 Ttl {
114 key: String,
115 },
116 Persist {
117 key: String,
118 },
119 Pttl {
120 key: String,
121 },
122 Pexpire {
123 key: String,
124 milliseconds: u64,
125 },
126 LPush {
127 key: String,
128 values: Vec<Bytes>,
129 },
130 RPush {
131 key: String,
132 values: Vec<Bytes>,
133 },
134 LPop {
135 key: String,
136 },
137 RPop {
138 key: String,
139 },
140 LRange {
141 key: String,
142 start: i64,
143 stop: i64,
144 },
145 LLen {
146 key: String,
147 },
148 Type {
149 key: String,
150 },
151 ZAdd {
152 key: String,
153 members: Vec<(f64, String)>,
154 nx: bool,
155 xx: bool,
156 gt: bool,
157 lt: bool,
158 ch: bool,
159 },
160 ZRem {
161 key: String,
162 members: Vec<String>,
163 },
164 ZScore {
165 key: String,
166 member: String,
167 },
168 ZRank {
169 key: String,
170 member: String,
171 },
172 ZCard {
173 key: String,
174 },
175 ZRange {
176 key: String,
177 start: i64,
178 stop: i64,
179 with_scores: bool,
180 },
181 HSet {
182 key: String,
183 fields: Vec<(String, Bytes)>,
184 },
185 HGet {
186 key: String,
187 field: String,
188 },
189 HGetAll {
190 key: String,
191 },
192 HDel {
193 key: String,
194 fields: Vec<String>,
195 },
196 HExists {
197 key: String,
198 field: String,
199 },
200 HLen {
201 key: String,
202 },
203 HIncrBy {
204 key: String,
205 field: String,
206 delta: i64,
207 },
208 HKeys {
209 key: String,
210 },
211 HVals {
212 key: String,
213 },
214 HMGet {
215 key: String,
216 fields: Vec<String>,
217 },
218 SAdd {
219 key: String,
220 members: Vec<String>,
221 },
222 SRem {
223 key: String,
224 members: Vec<String>,
225 },
226 SMembers {
227 key: String,
228 },
229 SIsMember {
230 key: String,
231 member: String,
232 },
233 SCard {
234 key: String,
235 },
236 DbSize,
238 Stats,
240 Snapshot,
242 RewriteAof,
244 FlushDb,
246 FlushDbAsync,
248 Scan {
250 cursor: u64,
251 count: usize,
252 pattern: Option<String>,
253 },
254 CountKeysInSlot {
256 slot: u16,
257 },
258 GetKeysInSlot {
260 slot: u16,
261 count: usize,
262 },
263}
264
265#[derive(Debug)]
267pub enum ShardResponse {
268 Value(Option<Value>),
270 Ok,
272 Integer(i64),
274 Bool(bool),
276 Ttl(TtlResult),
278 OutOfMemory,
280 KeyCount(usize),
282 Stats(KeyspaceStats),
284 Len(usize),
286 Array(Vec<Bytes>),
288 TypeName(&'static str),
290 ZAddLen {
292 count: usize,
293 applied: Vec<(f64, String)>,
294 },
295 ZRemLen { count: usize, removed: Vec<String> },
297 Score(Option<f64>),
299 Rank(Option<usize>),
301 ScoredArray(Vec<(String, f64)>),
303 BulkString(String),
305 WrongType,
307 Err(String),
309 Scan { cursor: u64, keys: Vec<String> },
311 HashFields(Vec<(String, Bytes)>),
313 HDelLen { count: usize, removed: Vec<String> },
315 StringArray(Vec<String>),
317 OptionalArray(Vec<Option<Bytes>>),
319}
320
321#[derive(Debug)]
323pub struct ShardMessage {
324 pub request: ShardRequest,
325 pub reply: oneshot::Sender<ShardResponse>,
326}
327
328#[derive(Debug, Clone)]
333pub struct ShardHandle {
334 tx: mpsc::Sender<ShardMessage>,
335}
336
337impl ShardHandle {
338 pub async fn send(&self, request: ShardRequest) -> Result<ShardResponse, ShardError> {
342 let rx = self.dispatch(request).await?;
343 rx.await.map_err(|_| ShardError::Unavailable)
344 }
345
346 pub(crate) async fn dispatch(
350 &self,
351 request: ShardRequest,
352 ) -> Result<oneshot::Receiver<ShardResponse>, ShardError> {
353 let (reply_tx, reply_rx) = oneshot::channel();
354 let msg = ShardMessage {
355 request,
356 reply: reply_tx,
357 };
358 self.tx
359 .send(msg)
360 .await
361 .map_err(|_| ShardError::Unavailable)?;
362 Ok(reply_rx)
363 }
364}
365
366pub fn spawn_shard(
372 buffer: usize,
373 config: ShardConfig,
374 persistence: Option<ShardPersistenceConfig>,
375 drop_handle: Option<DropHandle>,
376) -> ShardHandle {
377 let (tx, rx) = mpsc::channel(buffer);
378 tokio::spawn(run_shard(rx, config, persistence, drop_handle));
379 ShardHandle { tx }
380}
381
382async fn run_shard(
385 mut rx: mpsc::Receiver<ShardMessage>,
386 config: ShardConfig,
387 persistence: Option<ShardPersistenceConfig>,
388 drop_handle: Option<DropHandle>,
389) {
390 let shard_id = config.shard_id;
391 let mut keyspace = Keyspace::with_config(config);
392
393 if let Some(handle) = drop_handle.clone() {
394 keyspace.set_drop_handle(handle);
395 }
396
397 if let Some(ref pcfg) = persistence {
399 #[cfg(feature = "encryption")]
400 let result = if let Some(ref key) = pcfg.encryption_key {
401 recovery::recover_shard_encrypted(&pcfg.data_dir, shard_id, key.clone())
402 } else {
403 recovery::recover_shard(&pcfg.data_dir, shard_id)
404 };
405 #[cfg(not(feature = "encryption"))]
406 let result = recovery::recover_shard(&pcfg.data_dir, shard_id);
407 let count = result.entries.len();
408 for entry in result.entries {
409 let value = match entry.value {
410 RecoveredValue::String(data) => Value::String(data),
411 RecoveredValue::List(deque) => Value::List(deque),
412 RecoveredValue::SortedSet(members) => {
413 let mut ss = crate::types::sorted_set::SortedSet::new();
414 for (score, member) in members {
415 ss.add(member, score);
416 }
417 Value::SortedSet(ss)
418 }
419 RecoveredValue::Hash(map) => Value::Hash(map),
420 RecoveredValue::Set(set) => Value::Set(set),
421 };
422 keyspace.restore(entry.key, value, entry.ttl);
423 }
424 if count > 0 {
425 info!(
426 shard_id,
427 recovered_keys = count,
428 snapshot = result.loaded_snapshot,
429 aof = result.replayed_aof,
430 "recovered shard state"
431 );
432 }
433 }
434
435 let mut aof_writer: Option<AofWriter> = match &persistence {
437 Some(pcfg) if pcfg.append_only => {
438 let path = ember_persistence::aof::aof_path(&pcfg.data_dir, shard_id);
439 #[cfg(feature = "encryption")]
440 let result = if let Some(ref key) = pcfg.encryption_key {
441 AofWriter::open_encrypted(path, key.clone())
442 } else {
443 AofWriter::open(path)
444 };
445 #[cfg(not(feature = "encryption"))]
446 let result = AofWriter::open(path);
447 match result {
448 Ok(w) => Some(w),
449 Err(e) => {
450 warn!(shard_id, "failed to open AOF writer: {e}");
451 None
452 }
453 }
454 }
455 _ => None,
456 };
457
458 let fsync_policy = persistence
459 .as_ref()
460 .map(|p| p.fsync_policy)
461 .unwrap_or(FsyncPolicy::No);
462
463 let mut expiry_tick = tokio::time::interval(EXPIRY_TICK);
465 expiry_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
466
467 let mut fsync_tick = tokio::time::interval(FSYNC_INTERVAL);
468 fsync_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
469
470 loop {
471 tokio::select! {
472 msg = rx.recv() => {
473 match msg {
474 Some(msg) => {
475 let request_kind = describe_request(&msg.request);
476 let response = dispatch(&mut keyspace, &msg.request);
477
478 if let Some(ref mut writer) = aof_writer {
480 if let Some(record) = to_aof_record(&msg.request, &response) {
481 if let Err(e) = writer.write_record(&record) {
482 warn!(shard_id, "aof write failed: {e}");
483 }
484 if fsync_policy == FsyncPolicy::Always {
485 if let Err(e) = writer.sync() {
486 warn!(shard_id, "aof sync failed: {e}");
487 }
488 }
489 }
490 }
491
492 match request_kind {
495 RequestKind::Snapshot => {
496 let resp = handle_snapshot(
497 &keyspace, &persistence, shard_id,
498 );
499 let _ = msg.reply.send(resp);
500 continue;
501 }
502 RequestKind::RewriteAof => {
503 let resp = handle_rewrite(
504 &keyspace,
505 &persistence,
506 &mut aof_writer,
507 shard_id,
508 );
509 let _ = msg.reply.send(resp);
510 continue;
511 }
512 RequestKind::FlushDbAsync => {
513 let old_entries = keyspace.flush_async();
514 if let Some(ref handle) = drop_handle {
515 handle.defer_entries(old_entries);
516 }
517 let _ = msg.reply.send(ShardResponse::Ok);
519 continue;
520 }
521 RequestKind::Other => {}
522 }
523
524 let _ = msg.reply.send(response);
525 }
526 None => break, }
528 }
529 _ = expiry_tick.tick() => {
530 expiry::run_expiration_cycle(&mut keyspace);
531 }
532 _ = fsync_tick.tick(), if fsync_policy == FsyncPolicy::EverySec => {
533 if let Some(ref mut writer) = aof_writer {
534 if let Err(e) = writer.sync() {
535 warn!(shard_id, "periodic aof sync failed: {e}");
536 }
537 }
538 }
539 }
540 }
541
542 if let Some(ref mut writer) = aof_writer {
544 let _ = writer.sync();
545 }
546}
547
548enum RequestKind {
551 Snapshot,
552 RewriteAof,
553 FlushDbAsync,
554 Other,
555}
556
557fn describe_request(req: &ShardRequest) -> RequestKind {
558 match req {
559 ShardRequest::Snapshot => RequestKind::Snapshot,
560 ShardRequest::RewriteAof => RequestKind::RewriteAof,
561 ShardRequest::FlushDbAsync => RequestKind::FlushDbAsync,
562 _ => RequestKind::Other,
563 }
564}
565
566fn dispatch(ks: &mut Keyspace, req: &ShardRequest) -> ShardResponse {
568 match req {
569 ShardRequest::Get { key } => match ks.get(key) {
570 Ok(val) => ShardResponse::Value(val),
571 Err(_) => ShardResponse::WrongType,
572 },
573 ShardRequest::Set {
574 key,
575 value,
576 expire,
577 nx,
578 xx,
579 } => {
580 if *nx && ks.exists(key) {
582 return ShardResponse::Value(None);
583 }
584 if *xx && !ks.exists(key) {
586 return ShardResponse::Value(None);
587 }
588 match ks.set(key.clone(), value.clone(), *expire) {
589 SetResult::Ok => ShardResponse::Ok,
590 SetResult::OutOfMemory => ShardResponse::OutOfMemory,
591 }
592 }
593 ShardRequest::Incr { key } => match ks.incr(key) {
594 Ok(val) => ShardResponse::Integer(val),
595 Err(IncrError::WrongType) => ShardResponse::WrongType,
596 Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
597 Err(e) => ShardResponse::Err(e.to_string()),
598 },
599 ShardRequest::Decr { key } => match ks.decr(key) {
600 Ok(val) => ShardResponse::Integer(val),
601 Err(IncrError::WrongType) => ShardResponse::WrongType,
602 Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
603 Err(e) => ShardResponse::Err(e.to_string()),
604 },
605 ShardRequest::IncrBy { key, delta } => match ks.incr_by(key, *delta) {
606 Ok(val) => ShardResponse::Integer(val),
607 Err(IncrError::WrongType) => ShardResponse::WrongType,
608 Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
609 Err(e) => ShardResponse::Err(e.to_string()),
610 },
611 ShardRequest::DecrBy { key, delta } => match ks.incr_by(key, -delta) {
612 Ok(val) => ShardResponse::Integer(val),
613 Err(IncrError::WrongType) => ShardResponse::WrongType,
614 Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
615 Err(e) => ShardResponse::Err(e.to_string()),
616 },
617 ShardRequest::IncrByFloat { key, delta } => match ks.incr_by_float(key, *delta) {
618 Ok(val) => ShardResponse::BulkString(val),
619 Err(IncrFloatError::WrongType) => ShardResponse::WrongType,
620 Err(IncrFloatError::OutOfMemory) => ShardResponse::OutOfMemory,
621 Err(e) => ShardResponse::Err(e.to_string()),
622 },
623 ShardRequest::Append { key, value } => match ks.append(key, value) {
624 Ok(len) => ShardResponse::Len(len),
625 Err(WriteError::WrongType) => ShardResponse::WrongType,
626 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
627 },
628 ShardRequest::Strlen { key } => match ks.strlen(key) {
629 Ok(len) => ShardResponse::Len(len),
630 Err(_) => ShardResponse::WrongType,
631 },
632 ShardRequest::Keys { pattern } => {
633 let keys = ks.keys(pattern);
634 ShardResponse::StringArray(keys)
635 }
636 ShardRequest::Rename { key, newkey } => {
637 use crate::keyspace::RenameError;
638 match ks.rename(key, newkey) {
639 Ok(()) => ShardResponse::Ok,
640 Err(RenameError::NoSuchKey) => ShardResponse::Err("ERR no such key".into()),
641 }
642 }
643 ShardRequest::Del { key } => ShardResponse::Bool(ks.del(key)),
644 ShardRequest::Unlink { key } => ShardResponse::Bool(ks.unlink(key)),
645 ShardRequest::Exists { key } => ShardResponse::Bool(ks.exists(key)),
646 ShardRequest::Expire { key, seconds } => ShardResponse::Bool(ks.expire(key, *seconds)),
647 ShardRequest::Ttl { key } => ShardResponse::Ttl(ks.ttl(key)),
648 ShardRequest::Persist { key } => ShardResponse::Bool(ks.persist(key)),
649 ShardRequest::Pttl { key } => ShardResponse::Ttl(ks.pttl(key)),
650 ShardRequest::Pexpire { key, milliseconds } => {
651 ShardResponse::Bool(ks.pexpire(key, *milliseconds))
652 }
653 ShardRequest::LPush { key, values } => match ks.lpush(key, values) {
654 Ok(len) => ShardResponse::Len(len),
655 Err(WriteError::WrongType) => ShardResponse::WrongType,
656 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
657 },
658 ShardRequest::RPush { key, values } => match ks.rpush(key, values) {
659 Ok(len) => ShardResponse::Len(len),
660 Err(WriteError::WrongType) => ShardResponse::WrongType,
661 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
662 },
663 ShardRequest::LPop { key } => match ks.lpop(key) {
664 Ok(val) => ShardResponse::Value(val.map(Value::String)),
665 Err(_) => ShardResponse::WrongType,
666 },
667 ShardRequest::RPop { key } => match ks.rpop(key) {
668 Ok(val) => ShardResponse::Value(val.map(Value::String)),
669 Err(_) => ShardResponse::WrongType,
670 },
671 ShardRequest::LRange { key, start, stop } => match ks.lrange(key, *start, *stop) {
672 Ok(items) => ShardResponse::Array(items),
673 Err(_) => ShardResponse::WrongType,
674 },
675 ShardRequest::LLen { key } => match ks.llen(key) {
676 Ok(len) => ShardResponse::Len(len),
677 Err(_) => ShardResponse::WrongType,
678 },
679 ShardRequest::Type { key } => ShardResponse::TypeName(ks.value_type(key)),
680 ShardRequest::ZAdd {
681 key,
682 members,
683 nx,
684 xx,
685 gt,
686 lt,
687 ch,
688 } => {
689 let flags = ZAddFlags {
690 nx: *nx,
691 xx: *xx,
692 gt: *gt,
693 lt: *lt,
694 ch: *ch,
695 };
696 match ks.zadd(key, members, &flags) {
697 Ok(result) => ShardResponse::ZAddLen {
698 count: result.count,
699 applied: result.applied,
700 },
701 Err(WriteError::WrongType) => ShardResponse::WrongType,
702 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
703 }
704 }
705 ShardRequest::ZRem { key, members } => match ks.zrem(key, members) {
706 Ok(removed) => ShardResponse::ZRemLen {
707 count: removed.len(),
708 removed,
709 },
710 Err(_) => ShardResponse::WrongType,
711 },
712 ShardRequest::ZScore { key, member } => match ks.zscore(key, member) {
713 Ok(score) => ShardResponse::Score(score),
714 Err(_) => ShardResponse::WrongType,
715 },
716 ShardRequest::ZRank { key, member } => match ks.zrank(key, member) {
717 Ok(rank) => ShardResponse::Rank(rank),
718 Err(_) => ShardResponse::WrongType,
719 },
720 ShardRequest::ZCard { key } => match ks.zcard(key) {
721 Ok(len) => ShardResponse::Len(len),
722 Err(_) => ShardResponse::WrongType,
723 },
724 ShardRequest::ZRange {
725 key, start, stop, ..
726 } => match ks.zrange(key, *start, *stop) {
727 Ok(items) => ShardResponse::ScoredArray(items),
728 Err(_) => ShardResponse::WrongType,
729 },
730 ShardRequest::DbSize => ShardResponse::KeyCount(ks.len()),
731 ShardRequest::Stats => ShardResponse::Stats(ks.stats()),
732 ShardRequest::FlushDb => {
733 ks.clear();
734 ShardResponse::Ok
735 }
736 ShardRequest::Scan {
737 cursor,
738 count,
739 pattern,
740 } => {
741 let (next_cursor, keys) = ks.scan_keys(*cursor, *count, pattern.as_deref());
742 ShardResponse::Scan {
743 cursor: next_cursor,
744 keys,
745 }
746 }
747 ShardRequest::HSet { key, fields } => match ks.hset(key, fields) {
748 Ok(count) => ShardResponse::Len(count),
749 Err(WriteError::WrongType) => ShardResponse::WrongType,
750 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
751 },
752 ShardRequest::HGet { key, field } => match ks.hget(key, field) {
753 Ok(val) => ShardResponse::Value(val.map(Value::String)),
754 Err(_) => ShardResponse::WrongType,
755 },
756 ShardRequest::HGetAll { key } => match ks.hgetall(key) {
757 Ok(fields) => ShardResponse::HashFields(fields),
758 Err(_) => ShardResponse::WrongType,
759 },
760 ShardRequest::HDel { key, fields } => match ks.hdel(key, fields) {
761 Ok(removed) => ShardResponse::HDelLen {
762 count: removed.len(),
763 removed,
764 },
765 Err(_) => ShardResponse::WrongType,
766 },
767 ShardRequest::HExists { key, field } => match ks.hexists(key, field) {
768 Ok(exists) => ShardResponse::Bool(exists),
769 Err(_) => ShardResponse::WrongType,
770 },
771 ShardRequest::HLen { key } => match ks.hlen(key) {
772 Ok(len) => ShardResponse::Len(len),
773 Err(_) => ShardResponse::WrongType,
774 },
775 ShardRequest::HIncrBy { key, field, delta } => match ks.hincrby(key, field, *delta) {
776 Ok(val) => ShardResponse::Integer(val),
777 Err(IncrError::WrongType) => ShardResponse::WrongType,
778 Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
779 Err(e) => ShardResponse::Err(e.to_string()),
780 },
781 ShardRequest::HKeys { key } => match ks.hkeys(key) {
782 Ok(keys) => ShardResponse::StringArray(keys),
783 Err(_) => ShardResponse::WrongType,
784 },
785 ShardRequest::HVals { key } => match ks.hvals(key) {
786 Ok(vals) => ShardResponse::Array(vals),
787 Err(_) => ShardResponse::WrongType,
788 },
789 ShardRequest::HMGet { key, fields } => match ks.hmget(key, fields) {
790 Ok(vals) => ShardResponse::OptionalArray(vals),
791 Err(_) => ShardResponse::WrongType,
792 },
793 ShardRequest::SAdd { key, members } => match ks.sadd(key, members) {
794 Ok(count) => ShardResponse::Len(count),
795 Err(WriteError::WrongType) => ShardResponse::WrongType,
796 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
797 },
798 ShardRequest::SRem { key, members } => match ks.srem(key, members) {
799 Ok(count) => ShardResponse::Len(count),
800 Err(_) => ShardResponse::WrongType,
801 },
802 ShardRequest::SMembers { key } => match ks.smembers(key) {
803 Ok(members) => ShardResponse::StringArray(members),
804 Err(_) => ShardResponse::WrongType,
805 },
806 ShardRequest::SIsMember { key, member } => match ks.sismember(key, member) {
807 Ok(exists) => ShardResponse::Bool(exists),
808 Err(_) => ShardResponse::WrongType,
809 },
810 ShardRequest::SCard { key } => match ks.scard(key) {
811 Ok(count) => ShardResponse::Len(count),
812 Err(_) => ShardResponse::WrongType,
813 },
814 ShardRequest::CountKeysInSlot { slot } => {
815 ShardResponse::KeyCount(ks.count_keys_in_slot(*slot))
816 }
817 ShardRequest::GetKeysInSlot { slot, count } => {
818 ShardResponse::StringArray(ks.get_keys_in_slot(*slot, *count))
819 }
820 ShardRequest::Snapshot | ShardRequest::RewriteAof | ShardRequest::FlushDbAsync => {
822 ShardResponse::Ok
823 }
824 }
825}
826
827fn to_aof_record(req: &ShardRequest, resp: &ShardResponse) -> Option<AofRecord> {
830 match (req, resp) {
831 (
832 ShardRequest::Set {
833 key, value, expire, ..
834 },
835 ShardResponse::Ok,
836 ) => {
837 let expire_ms = expire.map(|d| d.as_millis() as i64).unwrap_or(-1);
838 Some(AofRecord::Set {
839 key: key.clone(),
840 value: value.clone(),
841 expire_ms,
842 })
843 }
844 (ShardRequest::Del { key }, ShardResponse::Bool(true))
845 | (ShardRequest::Unlink { key }, ShardResponse::Bool(true)) => {
846 Some(AofRecord::Del { key: key.clone() })
847 }
848 (ShardRequest::Expire { key, seconds }, ShardResponse::Bool(true)) => {
849 Some(AofRecord::Expire {
850 key: key.clone(),
851 seconds: *seconds,
852 })
853 }
854 (ShardRequest::LPush { key, values }, ShardResponse::Len(_)) => Some(AofRecord::LPush {
855 key: key.clone(),
856 values: values.clone(),
857 }),
858 (ShardRequest::RPush { key, values }, ShardResponse::Len(_)) => Some(AofRecord::RPush {
859 key: key.clone(),
860 values: values.clone(),
861 }),
862 (ShardRequest::LPop { key }, ShardResponse::Value(Some(_))) => {
863 Some(AofRecord::LPop { key: key.clone() })
864 }
865 (ShardRequest::RPop { key }, ShardResponse::Value(Some(_))) => {
866 Some(AofRecord::RPop { key: key.clone() })
867 }
868 (ShardRequest::ZAdd { key, .. }, ShardResponse::ZAddLen { applied, .. })
869 if !applied.is_empty() =>
870 {
871 Some(AofRecord::ZAdd {
872 key: key.clone(),
873 members: applied.clone(),
874 })
875 }
876 (ShardRequest::ZRem { key, .. }, ShardResponse::ZRemLen { removed, .. })
877 if !removed.is_empty() =>
878 {
879 Some(AofRecord::ZRem {
880 key: key.clone(),
881 members: removed.clone(),
882 })
883 }
884 (ShardRequest::Incr { key }, ShardResponse::Integer(_)) => {
885 Some(AofRecord::Incr { key: key.clone() })
886 }
887 (ShardRequest::Decr { key }, ShardResponse::Integer(_)) => {
888 Some(AofRecord::Decr { key: key.clone() })
889 }
890 (ShardRequest::IncrBy { key, delta }, ShardResponse::Integer(_)) => {
891 Some(AofRecord::IncrBy {
892 key: key.clone(),
893 delta: *delta,
894 })
895 }
896 (ShardRequest::DecrBy { key, delta }, ShardResponse::Integer(_)) => {
897 Some(AofRecord::DecrBy {
898 key: key.clone(),
899 delta: *delta,
900 })
901 }
902 (ShardRequest::IncrByFloat { key, .. }, ShardResponse::BulkString(val)) => {
905 Some(AofRecord::Set {
906 key: key.clone(),
907 value: Bytes::from(val.clone()),
908 expire_ms: -1,
909 })
910 }
911 (ShardRequest::Append { key, value }, ShardResponse::Len(_)) => Some(AofRecord::Append {
913 key: key.clone(),
914 value: value.clone(),
915 }),
916 (ShardRequest::Rename { key, newkey }, ShardResponse::Ok) => Some(AofRecord::Rename {
917 key: key.clone(),
918 newkey: newkey.clone(),
919 }),
920 (ShardRequest::Persist { key }, ShardResponse::Bool(true)) => {
921 Some(AofRecord::Persist { key: key.clone() })
922 }
923 (ShardRequest::Pexpire { key, milliseconds }, ShardResponse::Bool(true)) => {
924 Some(AofRecord::Pexpire {
925 key: key.clone(),
926 milliseconds: *milliseconds,
927 })
928 }
929 (ShardRequest::HSet { key, fields }, ShardResponse::Len(_)) => Some(AofRecord::HSet {
931 key: key.clone(),
932 fields: fields.clone(),
933 }),
934 (ShardRequest::HDel { key, .. }, ShardResponse::HDelLen { removed, .. })
935 if !removed.is_empty() =>
936 {
937 Some(AofRecord::HDel {
938 key: key.clone(),
939 fields: removed.clone(),
940 })
941 }
942 (ShardRequest::HIncrBy { key, field, delta }, ShardResponse::Integer(_)) => {
943 Some(AofRecord::HIncrBy {
944 key: key.clone(),
945 field: field.clone(),
946 delta: *delta,
947 })
948 }
949 (ShardRequest::SAdd { key, members }, ShardResponse::Len(count)) if *count > 0 => {
951 Some(AofRecord::SAdd {
952 key: key.clone(),
953 members: members.clone(),
954 })
955 }
956 (ShardRequest::SRem { key, members }, ShardResponse::Len(count)) if *count > 0 => {
957 Some(AofRecord::SRem {
958 key: key.clone(),
959 members: members.clone(),
960 })
961 }
962 _ => None,
963 }
964}
965
966fn handle_snapshot(
968 keyspace: &Keyspace,
969 persistence: &Option<ShardPersistenceConfig>,
970 shard_id: u16,
971) -> ShardResponse {
972 let pcfg = match persistence {
973 Some(p) => p,
974 None => return ShardResponse::Err("persistence not configured".into()),
975 };
976
977 let path = snapshot::snapshot_path(&pcfg.data_dir, shard_id);
978 let result = write_snapshot(
979 keyspace,
980 &path,
981 shard_id,
982 #[cfg(feature = "encryption")]
983 pcfg.encryption_key.as_ref(),
984 );
985 match result {
986 Ok(count) => {
987 info!(shard_id, entries = count, "snapshot written");
988 ShardResponse::Ok
989 }
990 Err(e) => {
991 warn!(shard_id, "snapshot failed: {e}");
992 ShardResponse::Err(format!("snapshot failed: {e}"))
993 }
994 }
995}
996
997fn handle_rewrite(
999 keyspace: &Keyspace,
1000 persistence: &Option<ShardPersistenceConfig>,
1001 aof_writer: &mut Option<AofWriter>,
1002 shard_id: u16,
1003) -> ShardResponse {
1004 let pcfg = match persistence {
1005 Some(p) => p,
1006 None => return ShardResponse::Err("persistence not configured".into()),
1007 };
1008
1009 let path = snapshot::snapshot_path(&pcfg.data_dir, shard_id);
1010 let result = write_snapshot(
1011 keyspace,
1012 &path,
1013 shard_id,
1014 #[cfg(feature = "encryption")]
1015 pcfg.encryption_key.as_ref(),
1016 );
1017 match result {
1018 Ok(count) => {
1019 if let Some(ref mut writer) = aof_writer {
1021 if let Err(e) = writer.truncate() {
1022 warn!(shard_id, "aof truncate after rewrite failed: {e}");
1023 }
1024 }
1025 info!(shard_id, entries = count, "aof rewrite complete");
1026 ShardResponse::Ok
1027 }
1028 Err(e) => {
1029 warn!(shard_id, "aof rewrite failed: {e}");
1030 ShardResponse::Err(format!("rewrite failed: {e}"))
1031 }
1032 }
1033}
1034
1035fn write_snapshot(
1037 keyspace: &Keyspace,
1038 path: &std::path::Path,
1039 shard_id: u16,
1040 #[cfg(feature = "encryption")] encryption_key: Option<
1041 &ember_persistence::encryption::EncryptionKey,
1042 >,
1043) -> Result<u32, ember_persistence::format::FormatError> {
1044 #[cfg(feature = "encryption")]
1045 let mut writer = if let Some(key) = encryption_key {
1046 SnapshotWriter::create_encrypted(path, shard_id, key.clone())?
1047 } else {
1048 SnapshotWriter::create(path, shard_id)?
1049 };
1050 #[cfg(not(feature = "encryption"))]
1051 let mut writer = SnapshotWriter::create(path, shard_id)?;
1052 let mut count = 0u32;
1053
1054 for (key, value, ttl_ms) in keyspace.iter_entries() {
1055 let snap_value = match value {
1056 Value::String(data) => SnapValue::String(data.clone()),
1057 Value::List(deque) => SnapValue::List(deque.clone()),
1058 Value::SortedSet(ss) => {
1059 let members: Vec<(f64, String)> = ss
1060 .iter()
1061 .map(|(member, score)| (score, member.to_owned()))
1062 .collect();
1063 SnapValue::SortedSet(members)
1064 }
1065 Value::Hash(map) => SnapValue::Hash(map.clone()),
1066 Value::Set(set) => SnapValue::Set(set.clone()),
1067 };
1068 writer.write_entry(&SnapEntry {
1069 key: key.to_owned(),
1070 value: snap_value,
1071 expire_ms: ttl_ms,
1072 })?;
1073 count += 1;
1074 }
1075
1076 writer.finish()?;
1077 Ok(count)
1078}
1079
1080#[cfg(test)]
1081mod tests {
1082 use super::*;
1083
1084 #[test]
1085 fn dispatch_set_and_get() {
1086 let mut ks = Keyspace::new();
1087
1088 let resp = dispatch(
1089 &mut ks,
1090 &ShardRequest::Set {
1091 key: "k".into(),
1092 value: Bytes::from("v"),
1093 expire: None,
1094 nx: false,
1095 xx: false,
1096 },
1097 );
1098 assert!(matches!(resp, ShardResponse::Ok));
1099
1100 let resp = dispatch(&mut ks, &ShardRequest::Get { key: "k".into() });
1101 match resp {
1102 ShardResponse::Value(Some(Value::String(data))) => {
1103 assert_eq!(data, Bytes::from("v"));
1104 }
1105 other => panic!("expected Value(Some(String)), got {other:?}"),
1106 }
1107 }
1108
1109 #[test]
1110 fn dispatch_get_missing() {
1111 let mut ks = Keyspace::new();
1112 let resp = dispatch(&mut ks, &ShardRequest::Get { key: "nope".into() });
1113 assert!(matches!(resp, ShardResponse::Value(None)));
1114 }
1115
1116 #[test]
1117 fn dispatch_del() {
1118 let mut ks = Keyspace::new();
1119 ks.set("key".into(), Bytes::from("val"), None);
1120
1121 let resp = dispatch(&mut ks, &ShardRequest::Del { key: "key".into() });
1122 assert!(matches!(resp, ShardResponse::Bool(true)));
1123
1124 let resp = dispatch(&mut ks, &ShardRequest::Del { key: "key".into() });
1125 assert!(matches!(resp, ShardResponse::Bool(false)));
1126 }
1127
1128 #[test]
1129 fn dispatch_exists() {
1130 let mut ks = Keyspace::new();
1131 ks.set("yes".into(), Bytes::from("here"), None);
1132
1133 let resp = dispatch(&mut ks, &ShardRequest::Exists { key: "yes".into() });
1134 assert!(matches!(resp, ShardResponse::Bool(true)));
1135
1136 let resp = dispatch(&mut ks, &ShardRequest::Exists { key: "no".into() });
1137 assert!(matches!(resp, ShardResponse::Bool(false)));
1138 }
1139
1140 #[test]
1141 fn dispatch_expire_and_ttl() {
1142 let mut ks = Keyspace::new();
1143 ks.set("key".into(), Bytes::from("val"), None);
1144
1145 let resp = dispatch(
1146 &mut ks,
1147 &ShardRequest::Expire {
1148 key: "key".into(),
1149 seconds: 60,
1150 },
1151 );
1152 assert!(matches!(resp, ShardResponse::Bool(true)));
1153
1154 let resp = dispatch(&mut ks, &ShardRequest::Ttl { key: "key".into() });
1155 match resp {
1156 ShardResponse::Ttl(TtlResult::Seconds(s)) => assert!((58..=60).contains(&s)),
1157 other => panic!("expected Ttl(Seconds), got {other:?}"),
1158 }
1159 }
1160
1161 #[test]
1162 fn dispatch_ttl_missing() {
1163 let mut ks = Keyspace::new();
1164 let resp = dispatch(&mut ks, &ShardRequest::Ttl { key: "gone".into() });
1165 assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
1166 }
1167
1168 #[tokio::test]
1169 async fn shard_round_trip() {
1170 let handle = spawn_shard(16, ShardConfig::default(), None, None);
1171
1172 let resp = handle
1173 .send(ShardRequest::Set {
1174 key: "hello".into(),
1175 value: Bytes::from("world"),
1176 expire: None,
1177 nx: false,
1178 xx: false,
1179 })
1180 .await
1181 .unwrap();
1182 assert!(matches!(resp, ShardResponse::Ok));
1183
1184 let resp = handle
1185 .send(ShardRequest::Get {
1186 key: "hello".into(),
1187 })
1188 .await
1189 .unwrap();
1190 match resp {
1191 ShardResponse::Value(Some(Value::String(data))) => {
1192 assert_eq!(data, Bytes::from("world"));
1193 }
1194 other => panic!("expected Value(Some(String)), got {other:?}"),
1195 }
1196 }
1197
1198 #[tokio::test]
1199 async fn expired_key_through_shard() {
1200 let handle = spawn_shard(16, ShardConfig::default(), None, None);
1201
1202 handle
1203 .send(ShardRequest::Set {
1204 key: "temp".into(),
1205 value: Bytes::from("gone"),
1206 expire: Some(Duration::from_millis(10)),
1207 nx: false,
1208 xx: false,
1209 })
1210 .await
1211 .unwrap();
1212
1213 tokio::time::sleep(Duration::from_millis(30)).await;
1214
1215 let resp = handle
1216 .send(ShardRequest::Get { key: "temp".into() })
1217 .await
1218 .unwrap();
1219 assert!(matches!(resp, ShardResponse::Value(None)));
1220 }
1221
1222 #[tokio::test]
1223 async fn active_expiration_cleans_up_without_access() {
1224 let handle = spawn_shard(16, ShardConfig::default(), None, None);
1225
1226 handle
1228 .send(ShardRequest::Set {
1229 key: "ephemeral".into(),
1230 value: Bytes::from("temp"),
1231 expire: Some(Duration::from_millis(10)),
1232 nx: false,
1233 xx: false,
1234 })
1235 .await
1236 .unwrap();
1237
1238 handle
1240 .send(ShardRequest::Set {
1241 key: "persistent".into(),
1242 value: Bytes::from("stays"),
1243 expire: None,
1244 nx: false,
1245 xx: false,
1246 })
1247 .await
1248 .unwrap();
1249
1250 tokio::time::sleep(Duration::from_millis(250)).await;
1253
1254 let resp = handle
1256 .send(ShardRequest::Exists {
1257 key: "ephemeral".into(),
1258 })
1259 .await
1260 .unwrap();
1261 assert!(matches!(resp, ShardResponse::Bool(false)));
1262
1263 let resp = handle
1265 .send(ShardRequest::Exists {
1266 key: "persistent".into(),
1267 })
1268 .await
1269 .unwrap();
1270 assert!(matches!(resp, ShardResponse::Bool(true)));
1271 }
1272
1273 #[tokio::test]
1274 async fn shard_with_persistence_snapshot_and_recovery() {
1275 let dir = tempfile::tempdir().unwrap();
1276 let pcfg = ShardPersistenceConfig {
1277 data_dir: dir.path().to_owned(),
1278 append_only: true,
1279 fsync_policy: FsyncPolicy::Always,
1280 #[cfg(feature = "encryption")]
1281 encryption_key: None,
1282 };
1283 let config = ShardConfig {
1284 shard_id: 0,
1285 ..ShardConfig::default()
1286 };
1287
1288 {
1290 let handle = spawn_shard(16, config.clone(), Some(pcfg.clone()), None);
1291 handle
1292 .send(ShardRequest::Set {
1293 key: "a".into(),
1294 value: Bytes::from("1"),
1295 expire: None,
1296 nx: false,
1297 xx: false,
1298 })
1299 .await
1300 .unwrap();
1301 handle
1302 .send(ShardRequest::Set {
1303 key: "b".into(),
1304 value: Bytes::from("2"),
1305 expire: Some(Duration::from_secs(300)),
1306 nx: false,
1307 xx: false,
1308 })
1309 .await
1310 .unwrap();
1311 handle.send(ShardRequest::Snapshot).await.unwrap();
1312 handle
1314 .send(ShardRequest::Set {
1315 key: "c".into(),
1316 value: Bytes::from("3"),
1317 expire: None,
1318 nx: false,
1319 xx: false,
1320 })
1321 .await
1322 .unwrap();
1323 }
1325
1326 tokio::time::sleep(Duration::from_millis(50)).await;
1328
1329 {
1331 let handle = spawn_shard(16, config, Some(pcfg), None);
1332 tokio::time::sleep(Duration::from_millis(50)).await;
1334
1335 let resp = handle
1336 .send(ShardRequest::Get { key: "a".into() })
1337 .await
1338 .unwrap();
1339 match resp {
1340 ShardResponse::Value(Some(Value::String(data))) => {
1341 assert_eq!(data, Bytes::from("1"));
1342 }
1343 other => panic!("expected a=1, got {other:?}"),
1344 }
1345
1346 let resp = handle
1347 .send(ShardRequest::Get { key: "b".into() })
1348 .await
1349 .unwrap();
1350 assert!(matches!(resp, ShardResponse::Value(Some(_))));
1351
1352 let resp = handle
1353 .send(ShardRequest::Get { key: "c".into() })
1354 .await
1355 .unwrap();
1356 match resp {
1357 ShardResponse::Value(Some(Value::String(data))) => {
1358 assert_eq!(data, Bytes::from("3"));
1359 }
1360 other => panic!("expected c=3, got {other:?}"),
1361 }
1362 }
1363 }
1364
1365 #[test]
1366 fn to_aof_record_for_set() {
1367 let req = ShardRequest::Set {
1368 key: "k".into(),
1369 value: Bytes::from("v"),
1370 expire: Some(Duration::from_secs(60)),
1371 nx: false,
1372 xx: false,
1373 };
1374 let resp = ShardResponse::Ok;
1375 let record = to_aof_record(&req, &resp).unwrap();
1376 match record {
1377 AofRecord::Set { key, expire_ms, .. } => {
1378 assert_eq!(key, "k");
1379 assert_eq!(expire_ms, 60_000);
1380 }
1381 other => panic!("expected Set, got {other:?}"),
1382 }
1383 }
1384
1385 #[test]
1386 fn to_aof_record_skips_failed_set() {
1387 let req = ShardRequest::Set {
1388 key: "k".into(),
1389 value: Bytes::from("v"),
1390 expire: None,
1391 nx: false,
1392 xx: false,
1393 };
1394 let resp = ShardResponse::OutOfMemory;
1395 assert!(to_aof_record(&req, &resp).is_none());
1396 }
1397
1398 #[test]
1399 fn to_aof_record_for_del() {
1400 let req = ShardRequest::Del { key: "k".into() };
1401 let resp = ShardResponse::Bool(true);
1402 let record = to_aof_record(&req, &resp).unwrap();
1403 assert!(matches!(record, AofRecord::Del { .. }));
1404 }
1405
1406 #[test]
1407 fn to_aof_record_skips_failed_del() {
1408 let req = ShardRequest::Del { key: "k".into() };
1409 let resp = ShardResponse::Bool(false);
1410 assert!(to_aof_record(&req, &resp).is_none());
1411 }
1412
1413 #[test]
1414 fn dispatch_incr_new_key() {
1415 let mut ks = Keyspace::new();
1416 let resp = dispatch(&mut ks, &ShardRequest::Incr { key: "c".into() });
1417 assert!(matches!(resp, ShardResponse::Integer(1)));
1418 }
1419
1420 #[test]
1421 fn dispatch_decr_existing() {
1422 let mut ks = Keyspace::new();
1423 ks.set("n".into(), Bytes::from("10"), None);
1424 let resp = dispatch(&mut ks, &ShardRequest::Decr { key: "n".into() });
1425 assert!(matches!(resp, ShardResponse::Integer(9)));
1426 }
1427
1428 #[test]
1429 fn dispatch_incr_non_integer() {
1430 let mut ks = Keyspace::new();
1431 ks.set("s".into(), Bytes::from("hello"), None);
1432 let resp = dispatch(&mut ks, &ShardRequest::Incr { key: "s".into() });
1433 assert!(matches!(resp, ShardResponse::Err(_)));
1434 }
1435
1436 #[test]
1437 fn dispatch_incrby() {
1438 let mut ks = Keyspace::new();
1439 ks.set("n".into(), Bytes::from("10"), None);
1440 let resp = dispatch(
1441 &mut ks,
1442 &ShardRequest::IncrBy {
1443 key: "n".into(),
1444 delta: 5,
1445 },
1446 );
1447 assert!(matches!(resp, ShardResponse::Integer(15)));
1448 }
1449
1450 #[test]
1451 fn dispatch_decrby() {
1452 let mut ks = Keyspace::new();
1453 ks.set("n".into(), Bytes::from("10"), None);
1454 let resp = dispatch(
1455 &mut ks,
1456 &ShardRequest::DecrBy {
1457 key: "n".into(),
1458 delta: 3,
1459 },
1460 );
1461 assert!(matches!(resp, ShardResponse::Integer(7)));
1462 }
1463
1464 #[test]
1465 fn dispatch_incrby_new_key() {
1466 let mut ks = Keyspace::new();
1467 let resp = dispatch(
1468 &mut ks,
1469 &ShardRequest::IncrBy {
1470 key: "new".into(),
1471 delta: 42,
1472 },
1473 );
1474 assert!(matches!(resp, ShardResponse::Integer(42)));
1475 }
1476
1477 #[test]
1478 fn dispatch_incrbyfloat() {
1479 let mut ks = Keyspace::new();
1480 ks.set("n".into(), Bytes::from("10.5"), None);
1481 let resp = dispatch(
1482 &mut ks,
1483 &ShardRequest::IncrByFloat {
1484 key: "n".into(),
1485 delta: 2.3,
1486 },
1487 );
1488 match resp {
1489 ShardResponse::BulkString(val) => {
1490 let f: f64 = val.parse().unwrap();
1491 assert!((f - 12.8).abs() < 0.001);
1492 }
1493 other => panic!("expected BulkString, got {other:?}"),
1494 }
1495 }
1496
1497 #[test]
1498 fn dispatch_append() {
1499 let mut ks = Keyspace::new();
1500 ks.set("k".into(), Bytes::from("hello"), None);
1501 let resp = dispatch(
1502 &mut ks,
1503 &ShardRequest::Append {
1504 key: "k".into(),
1505 value: Bytes::from(" world"),
1506 },
1507 );
1508 assert!(matches!(resp, ShardResponse::Len(11)));
1509 }
1510
1511 #[test]
1512 fn dispatch_strlen() {
1513 let mut ks = Keyspace::new();
1514 ks.set("k".into(), Bytes::from("hello"), None);
1515 let resp = dispatch(&mut ks, &ShardRequest::Strlen { key: "k".into() });
1516 assert!(matches!(resp, ShardResponse::Len(5)));
1517 }
1518
1519 #[test]
1520 fn dispatch_strlen_missing() {
1521 let mut ks = Keyspace::new();
1522 let resp = dispatch(&mut ks, &ShardRequest::Strlen { key: "nope".into() });
1523 assert!(matches!(resp, ShardResponse::Len(0)));
1524 }
1525
1526 #[test]
1527 fn to_aof_record_for_append() {
1528 let req = ShardRequest::Append {
1529 key: "k".into(),
1530 value: Bytes::from("data"),
1531 };
1532 let resp = ShardResponse::Len(10);
1533 let record = to_aof_record(&req, &resp).unwrap();
1534 match record {
1535 AofRecord::Append { key, value } => {
1536 assert_eq!(key, "k");
1537 assert_eq!(value, Bytes::from("data"));
1538 }
1539 other => panic!("expected Append, got {other:?}"),
1540 }
1541 }
1542
1543 #[test]
1544 fn dispatch_incrbyfloat_new_key() {
1545 let mut ks = Keyspace::new();
1546 let resp = dispatch(
1547 &mut ks,
1548 &ShardRequest::IncrByFloat {
1549 key: "new".into(),
1550 delta: 2.72,
1551 },
1552 );
1553 match resp {
1554 ShardResponse::BulkString(val) => {
1555 let f: f64 = val.parse().unwrap();
1556 assert!((f - 2.72).abs() < 0.001);
1557 }
1558 other => panic!("expected BulkString, got {other:?}"),
1559 }
1560 }
1561
1562 #[test]
1563 fn to_aof_record_for_incr() {
1564 let req = ShardRequest::Incr { key: "c".into() };
1565 let resp = ShardResponse::Integer(1);
1566 let record = to_aof_record(&req, &resp).unwrap();
1567 assert!(matches!(record, AofRecord::Incr { .. }));
1568 }
1569
1570 #[test]
1571 fn to_aof_record_for_decr() {
1572 let req = ShardRequest::Decr { key: "c".into() };
1573 let resp = ShardResponse::Integer(-1);
1574 let record = to_aof_record(&req, &resp).unwrap();
1575 assert!(matches!(record, AofRecord::Decr { .. }));
1576 }
1577
1578 #[test]
1579 fn to_aof_record_for_incrby() {
1580 let req = ShardRequest::IncrBy {
1581 key: "c".into(),
1582 delta: 5,
1583 };
1584 let resp = ShardResponse::Integer(15);
1585 let record = to_aof_record(&req, &resp).unwrap();
1586 match record {
1587 AofRecord::IncrBy { key, delta } => {
1588 assert_eq!(key, "c");
1589 assert_eq!(delta, 5);
1590 }
1591 other => panic!("expected IncrBy, got {other:?}"),
1592 }
1593 }
1594
1595 #[test]
1596 fn to_aof_record_for_decrby() {
1597 let req = ShardRequest::DecrBy {
1598 key: "c".into(),
1599 delta: 3,
1600 };
1601 let resp = ShardResponse::Integer(7);
1602 let record = to_aof_record(&req, &resp).unwrap();
1603 match record {
1604 AofRecord::DecrBy { key, delta } => {
1605 assert_eq!(key, "c");
1606 assert_eq!(delta, 3);
1607 }
1608 other => panic!("expected DecrBy, got {other:?}"),
1609 }
1610 }
1611
1612 #[test]
1613 fn dispatch_persist_removes_ttl() {
1614 let mut ks = Keyspace::new();
1615 ks.set(
1616 "key".into(),
1617 Bytes::from("val"),
1618 Some(Duration::from_secs(60)),
1619 );
1620
1621 let resp = dispatch(&mut ks, &ShardRequest::Persist { key: "key".into() });
1622 assert!(matches!(resp, ShardResponse::Bool(true)));
1623
1624 let resp = dispatch(&mut ks, &ShardRequest::Ttl { key: "key".into() });
1625 assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NoExpiry)));
1626 }
1627
1628 #[test]
1629 fn dispatch_persist_missing_key() {
1630 let mut ks = Keyspace::new();
1631 let resp = dispatch(&mut ks, &ShardRequest::Persist { key: "nope".into() });
1632 assert!(matches!(resp, ShardResponse::Bool(false)));
1633 }
1634
1635 #[test]
1636 fn dispatch_pttl() {
1637 let mut ks = Keyspace::new();
1638 ks.set(
1639 "key".into(),
1640 Bytes::from("val"),
1641 Some(Duration::from_secs(60)),
1642 );
1643
1644 let resp = dispatch(&mut ks, &ShardRequest::Pttl { key: "key".into() });
1645 match resp {
1646 ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
1647 assert!(ms > 59_000 && ms <= 60_000);
1648 }
1649 other => panic!("expected Ttl(Milliseconds), got {other:?}"),
1650 }
1651 }
1652
1653 #[test]
1654 fn dispatch_pttl_missing() {
1655 let mut ks = Keyspace::new();
1656 let resp = dispatch(&mut ks, &ShardRequest::Pttl { key: "nope".into() });
1657 assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
1658 }
1659
1660 #[test]
1661 fn dispatch_pexpire() {
1662 let mut ks = Keyspace::new();
1663 ks.set("key".into(), Bytes::from("val"), None);
1664
1665 let resp = dispatch(
1666 &mut ks,
1667 &ShardRequest::Pexpire {
1668 key: "key".into(),
1669 milliseconds: 5000,
1670 },
1671 );
1672 assert!(matches!(resp, ShardResponse::Bool(true)));
1673
1674 let resp = dispatch(&mut ks, &ShardRequest::Pttl { key: "key".into() });
1675 match resp {
1676 ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
1677 assert!(ms > 4000 && ms <= 5000);
1678 }
1679 other => panic!("expected Ttl(Milliseconds), got {other:?}"),
1680 }
1681 }
1682
1683 #[test]
1684 fn to_aof_record_for_persist() {
1685 let req = ShardRequest::Persist { key: "k".into() };
1686 let resp = ShardResponse::Bool(true);
1687 let record = to_aof_record(&req, &resp).unwrap();
1688 assert!(matches!(record, AofRecord::Persist { .. }));
1689 }
1690
1691 #[test]
1692 fn to_aof_record_skips_failed_persist() {
1693 let req = ShardRequest::Persist { key: "k".into() };
1694 let resp = ShardResponse::Bool(false);
1695 assert!(to_aof_record(&req, &resp).is_none());
1696 }
1697
1698 #[test]
1699 fn to_aof_record_for_pexpire() {
1700 let req = ShardRequest::Pexpire {
1701 key: "k".into(),
1702 milliseconds: 5000,
1703 };
1704 let resp = ShardResponse::Bool(true);
1705 let record = to_aof_record(&req, &resp).unwrap();
1706 match record {
1707 AofRecord::Pexpire { key, milliseconds } => {
1708 assert_eq!(key, "k");
1709 assert_eq!(milliseconds, 5000);
1710 }
1711 other => panic!("expected Pexpire, got {other:?}"),
1712 }
1713 }
1714
1715 #[test]
1716 fn to_aof_record_skips_failed_pexpire() {
1717 let req = ShardRequest::Pexpire {
1718 key: "k".into(),
1719 milliseconds: 5000,
1720 };
1721 let resp = ShardResponse::Bool(false);
1722 assert!(to_aof_record(&req, &resp).is_none());
1723 }
1724
1725 #[test]
1726 fn dispatch_set_nx_when_key_missing() {
1727 let mut ks = Keyspace::new();
1728 let resp = dispatch(
1729 &mut ks,
1730 &ShardRequest::Set {
1731 key: "k".into(),
1732 value: Bytes::from("v"),
1733 expire: None,
1734 nx: true,
1735 xx: false,
1736 },
1737 );
1738 assert!(matches!(resp, ShardResponse::Ok));
1739 assert!(ks.exists("k"));
1740 }
1741
1742 #[test]
1743 fn dispatch_set_nx_when_key_exists() {
1744 let mut ks = Keyspace::new();
1745 ks.set("k".into(), Bytes::from("old"), None);
1746
1747 let resp = dispatch(
1748 &mut ks,
1749 &ShardRequest::Set {
1750 key: "k".into(),
1751 value: Bytes::from("new"),
1752 expire: None,
1753 nx: true,
1754 xx: false,
1755 },
1756 );
1757 assert!(matches!(resp, ShardResponse::Value(None)));
1759 match ks.get("k").unwrap() {
1761 Some(Value::String(data)) => assert_eq!(data, Bytes::from("old")),
1762 other => panic!("expected old value, got {other:?}"),
1763 }
1764 }
1765
1766 #[test]
1767 fn dispatch_set_xx_when_key_exists() {
1768 let mut ks = Keyspace::new();
1769 ks.set("k".into(), Bytes::from("old"), None);
1770
1771 let resp = dispatch(
1772 &mut ks,
1773 &ShardRequest::Set {
1774 key: "k".into(),
1775 value: Bytes::from("new"),
1776 expire: None,
1777 nx: false,
1778 xx: true,
1779 },
1780 );
1781 assert!(matches!(resp, ShardResponse::Ok));
1782 match ks.get("k").unwrap() {
1783 Some(Value::String(data)) => assert_eq!(data, Bytes::from("new")),
1784 other => panic!("expected new value, got {other:?}"),
1785 }
1786 }
1787
1788 #[test]
1789 fn dispatch_set_xx_when_key_missing() {
1790 let mut ks = Keyspace::new();
1791 let resp = dispatch(
1792 &mut ks,
1793 &ShardRequest::Set {
1794 key: "k".into(),
1795 value: Bytes::from("v"),
1796 expire: None,
1797 nx: false,
1798 xx: true,
1799 },
1800 );
1801 assert!(matches!(resp, ShardResponse::Value(None)));
1803 assert!(!ks.exists("k"));
1804 }
1805
1806 #[test]
1807 fn to_aof_record_skips_nx_blocked_set() {
1808 let req = ShardRequest::Set {
1809 key: "k".into(),
1810 value: Bytes::from("v"),
1811 expire: None,
1812 nx: true,
1813 xx: false,
1814 };
1815 let resp = ShardResponse::Value(None);
1817 assert!(to_aof_record(&req, &resp).is_none());
1818 }
1819
1820 #[test]
1821 fn dispatch_flushdb_clears_all_keys() {
1822 let mut ks = Keyspace::new();
1823 ks.set("a".into(), Bytes::from("1"), None);
1824 ks.set("b".into(), Bytes::from("2"), None);
1825
1826 assert_eq!(ks.len(), 2);
1827
1828 let resp = dispatch(&mut ks, &ShardRequest::FlushDb);
1829 assert!(matches!(resp, ShardResponse::Ok));
1830 assert_eq!(ks.len(), 0);
1831 }
1832
1833 #[test]
1834 fn dispatch_scan_returns_keys() {
1835 let mut ks = Keyspace::new();
1836 ks.set("user:1".into(), Bytes::from("a"), None);
1837 ks.set("user:2".into(), Bytes::from("b"), None);
1838 ks.set("item:1".into(), Bytes::from("c"), None);
1839
1840 let resp = dispatch(
1841 &mut ks,
1842 &ShardRequest::Scan {
1843 cursor: 0,
1844 count: 10,
1845 pattern: None,
1846 },
1847 );
1848
1849 match resp {
1850 ShardResponse::Scan { cursor, keys } => {
1851 assert_eq!(cursor, 0); assert_eq!(keys.len(), 3);
1853 }
1854 _ => panic!("expected Scan response"),
1855 }
1856 }
1857
1858 #[test]
1859 fn dispatch_scan_with_pattern() {
1860 let mut ks = Keyspace::new();
1861 ks.set("user:1".into(), Bytes::from("a"), None);
1862 ks.set("user:2".into(), Bytes::from("b"), None);
1863 ks.set("item:1".into(), Bytes::from("c"), None);
1864
1865 let resp = dispatch(
1866 &mut ks,
1867 &ShardRequest::Scan {
1868 cursor: 0,
1869 count: 10,
1870 pattern: Some("user:*".into()),
1871 },
1872 );
1873
1874 match resp {
1875 ShardResponse::Scan { cursor, keys } => {
1876 assert_eq!(cursor, 0);
1877 assert_eq!(keys.len(), 2);
1878 for k in &keys {
1879 assert!(k.starts_with("user:"));
1880 }
1881 }
1882 _ => panic!("expected Scan response"),
1883 }
1884 }
1885
1886 #[test]
1887 fn to_aof_record_for_hset() {
1888 let req = ShardRequest::HSet {
1889 key: "h".into(),
1890 fields: vec![("f1".into(), Bytes::from("v1"))],
1891 };
1892 let resp = ShardResponse::Len(1);
1893 let record = to_aof_record(&req, &resp).unwrap();
1894 match record {
1895 AofRecord::HSet { key, fields } => {
1896 assert_eq!(key, "h");
1897 assert_eq!(fields.len(), 1);
1898 }
1899 _ => panic!("expected HSet record"),
1900 }
1901 }
1902
1903 #[test]
1904 fn to_aof_record_for_hdel() {
1905 let req = ShardRequest::HDel {
1906 key: "h".into(),
1907 fields: vec!["f1".into(), "f2".into()],
1908 };
1909 let resp = ShardResponse::HDelLen {
1910 count: 2,
1911 removed: vec!["f1".into(), "f2".into()],
1912 };
1913 let record = to_aof_record(&req, &resp).unwrap();
1914 match record {
1915 AofRecord::HDel { key, fields } => {
1916 assert_eq!(key, "h");
1917 assert_eq!(fields.len(), 2);
1918 }
1919 _ => panic!("expected HDel record"),
1920 }
1921 }
1922
1923 #[test]
1924 fn to_aof_record_skips_hdel_when_none_removed() {
1925 let req = ShardRequest::HDel {
1926 key: "h".into(),
1927 fields: vec!["f1".into()],
1928 };
1929 let resp = ShardResponse::HDelLen {
1930 count: 0,
1931 removed: vec![],
1932 };
1933 assert!(to_aof_record(&req, &resp).is_none());
1934 }
1935
1936 #[test]
1937 fn to_aof_record_for_hincrby() {
1938 let req = ShardRequest::HIncrBy {
1939 key: "h".into(),
1940 field: "counter".into(),
1941 delta: 5,
1942 };
1943 let resp = ShardResponse::Integer(10);
1944 let record = to_aof_record(&req, &resp).unwrap();
1945 match record {
1946 AofRecord::HIncrBy { key, field, delta } => {
1947 assert_eq!(key, "h");
1948 assert_eq!(field, "counter");
1949 assert_eq!(delta, 5);
1950 }
1951 _ => panic!("expected HIncrBy record"),
1952 }
1953 }
1954
1955 #[test]
1956 fn to_aof_record_for_sadd() {
1957 let req = ShardRequest::SAdd {
1958 key: "s".into(),
1959 members: vec!["m1".into(), "m2".into()],
1960 };
1961 let resp = ShardResponse::Len(2);
1962 let record = to_aof_record(&req, &resp).unwrap();
1963 match record {
1964 AofRecord::SAdd { key, members } => {
1965 assert_eq!(key, "s");
1966 assert_eq!(members.len(), 2);
1967 }
1968 _ => panic!("expected SAdd record"),
1969 }
1970 }
1971
1972 #[test]
1973 fn to_aof_record_skips_sadd_when_none_added() {
1974 let req = ShardRequest::SAdd {
1975 key: "s".into(),
1976 members: vec!["m1".into()],
1977 };
1978 let resp = ShardResponse::Len(0);
1979 assert!(to_aof_record(&req, &resp).is_none());
1980 }
1981
1982 #[test]
1983 fn to_aof_record_for_srem() {
1984 let req = ShardRequest::SRem {
1985 key: "s".into(),
1986 members: vec!["m1".into()],
1987 };
1988 let resp = ShardResponse::Len(1);
1989 let record = to_aof_record(&req, &resp).unwrap();
1990 match record {
1991 AofRecord::SRem { key, members } => {
1992 assert_eq!(key, "s");
1993 assert_eq!(members.len(), 1);
1994 }
1995 _ => panic!("expected SRem record"),
1996 }
1997 }
1998
1999 #[test]
2000 fn to_aof_record_skips_srem_when_none_removed() {
2001 let req = ShardRequest::SRem {
2002 key: "s".into(),
2003 members: vec!["m1".into()],
2004 };
2005 let resp = ShardResponse::Len(0);
2006 assert!(to_aof_record(&req, &resp).is_none());
2007 }
2008
2009 #[test]
2010 fn dispatch_keys() {
2011 let mut ks = Keyspace::new();
2012 ks.set("user:1".into(), Bytes::from("a"), None);
2013 ks.set("user:2".into(), Bytes::from("b"), None);
2014 ks.set("item:1".into(), Bytes::from("c"), None);
2015 let resp = dispatch(
2016 &mut ks,
2017 &ShardRequest::Keys {
2018 pattern: "user:*".into(),
2019 },
2020 );
2021 match resp {
2022 ShardResponse::StringArray(mut keys) => {
2023 keys.sort();
2024 assert_eq!(keys, vec!["user:1", "user:2"]);
2025 }
2026 other => panic!("expected StringArray, got {other:?}"),
2027 }
2028 }
2029
2030 #[test]
2031 fn dispatch_rename() {
2032 let mut ks = Keyspace::new();
2033 ks.set("old".into(), Bytes::from("value"), None);
2034 let resp = dispatch(
2035 &mut ks,
2036 &ShardRequest::Rename {
2037 key: "old".into(),
2038 newkey: "new".into(),
2039 },
2040 );
2041 assert!(matches!(resp, ShardResponse::Ok));
2042 assert!(!ks.exists("old"));
2043 assert!(ks.exists("new"));
2044 }
2045
2046 #[test]
2047 fn dispatch_rename_missing_key() {
2048 let mut ks = Keyspace::new();
2049 let resp = dispatch(
2050 &mut ks,
2051 &ShardRequest::Rename {
2052 key: "missing".into(),
2053 newkey: "new".into(),
2054 },
2055 );
2056 assert!(matches!(resp, ShardResponse::Err(_)));
2057 }
2058
2059 #[test]
2060 fn to_aof_record_for_rename() {
2061 let req = ShardRequest::Rename {
2062 key: "old".into(),
2063 newkey: "new".into(),
2064 };
2065 let resp = ShardResponse::Ok;
2066 let record = to_aof_record(&req, &resp).unwrap();
2067 match record {
2068 AofRecord::Rename { key, newkey } => {
2069 assert_eq!(key, "old");
2070 assert_eq!(newkey, "new");
2071 }
2072 other => panic!("expected Rename, got {other:?}"),
2073 }
2074 }
2075}