1use std::path::PathBuf;
9use std::time::Duration;
10
11use bytes::Bytes;
12use ember_persistence::aof::{AofRecord, AofWriter, FsyncPolicy};
13use ember_persistence::recovery::{self, RecoveredValue};
14use ember_persistence::snapshot::{self, SnapEntry, SnapValue, SnapshotWriter};
15use tokio::sync::{mpsc, oneshot};
16use tracing::{info, warn};
17
18use crate::dropper::DropHandle;
19use crate::error::ShardError;
20use crate::expiry;
21use crate::keyspace::{
22 IncrError, IncrFloatError, Keyspace, KeyspaceStats, SetResult, ShardConfig, TtlResult,
23 WriteError,
24};
25use crate::types::sorted_set::ZAddFlags;
26use crate::types::Value;
27
28const EXPIRY_TICK: Duration = Duration::from_millis(100);
31
32const FSYNC_INTERVAL: Duration = Duration::from_secs(1);
34
35#[derive(Debug, Clone)]
37pub struct ShardPersistenceConfig {
38 pub data_dir: PathBuf,
40 pub append_only: bool,
42 pub fsync_policy: FsyncPolicy,
44 #[cfg(feature = "encryption")]
47 pub encryption_key: Option<ember_persistence::encryption::EncryptionKey>,
48}
49
50#[derive(Debug)]
52pub enum ShardRequest {
53 Get {
54 key: String,
55 },
56 Set {
57 key: String,
58 value: Bytes,
59 expire: Option<Duration>,
60 nx: bool,
62 xx: bool,
64 },
65 Incr {
66 key: String,
67 },
68 Decr {
69 key: String,
70 },
71 IncrBy {
72 key: String,
73 delta: i64,
74 },
75 DecrBy {
76 key: String,
77 delta: i64,
78 },
79 IncrByFloat {
80 key: String,
81 delta: f64,
82 },
83 Append {
84 key: String,
85 value: Bytes,
86 },
87 Strlen {
88 key: String,
89 },
90 Keys {
92 pattern: String,
93 },
94 Rename {
96 key: String,
97 newkey: String,
98 },
99 Del {
100 key: String,
101 },
102 Unlink {
104 key: String,
105 },
106 Exists {
107 key: String,
108 },
109 Expire {
110 key: String,
111 seconds: u64,
112 },
113 Ttl {
114 key: String,
115 },
116 Persist {
117 key: String,
118 },
119 Pttl {
120 key: String,
121 },
122 Pexpire {
123 key: String,
124 milliseconds: u64,
125 },
126 LPush {
127 key: String,
128 values: Vec<Bytes>,
129 },
130 RPush {
131 key: String,
132 values: Vec<Bytes>,
133 },
134 LPop {
135 key: String,
136 },
137 RPop {
138 key: String,
139 },
140 LRange {
141 key: String,
142 start: i64,
143 stop: i64,
144 },
145 LLen {
146 key: String,
147 },
148 Type {
149 key: String,
150 },
151 ZAdd {
152 key: String,
153 members: Vec<(f64, String)>,
154 nx: bool,
155 xx: bool,
156 gt: bool,
157 lt: bool,
158 ch: bool,
159 },
160 ZRem {
161 key: String,
162 members: Vec<String>,
163 },
164 ZScore {
165 key: String,
166 member: String,
167 },
168 ZRank {
169 key: String,
170 member: String,
171 },
172 ZCard {
173 key: String,
174 },
175 ZRange {
176 key: String,
177 start: i64,
178 stop: i64,
179 with_scores: bool,
180 },
181 HSet {
182 key: String,
183 fields: Vec<(String, Bytes)>,
184 },
185 HGet {
186 key: String,
187 field: String,
188 },
189 HGetAll {
190 key: String,
191 },
192 HDel {
193 key: String,
194 fields: Vec<String>,
195 },
196 HExists {
197 key: String,
198 field: String,
199 },
200 HLen {
201 key: String,
202 },
203 HIncrBy {
204 key: String,
205 field: String,
206 delta: i64,
207 },
208 HKeys {
209 key: String,
210 },
211 HVals {
212 key: String,
213 },
214 HMGet {
215 key: String,
216 fields: Vec<String>,
217 },
218 SAdd {
219 key: String,
220 members: Vec<String>,
221 },
222 SRem {
223 key: String,
224 members: Vec<String>,
225 },
226 SMembers {
227 key: String,
228 },
229 SIsMember {
230 key: String,
231 member: String,
232 },
233 SCard {
234 key: String,
235 },
236 DbSize,
238 Stats,
240 Snapshot,
242 RewriteAof,
244 FlushDb,
246 FlushDbAsync,
248 Scan {
250 cursor: u64,
251 count: usize,
252 pattern: Option<String>,
253 },
254 CountKeysInSlot {
256 slot: u16,
257 },
258 GetKeysInSlot {
260 slot: u16,
261 count: usize,
262 },
263 #[cfg(feature = "protobuf")]
265 ProtoSet {
266 key: String,
267 type_name: String,
268 data: Bytes,
269 expire: Option<Duration>,
270 nx: bool,
271 xx: bool,
272 },
273 #[cfg(feature = "protobuf")]
275 ProtoGet {
276 key: String,
277 },
278 #[cfg(feature = "protobuf")]
280 ProtoType {
281 key: String,
282 },
283 #[cfg(feature = "protobuf")]
287 ProtoRegisterAof {
288 name: String,
289 descriptor: Bytes,
290 },
291 #[cfg(feature = "protobuf")]
294 ProtoSetField {
295 key: String,
296 field_path: String,
297 value: String,
298 },
299 #[cfg(feature = "protobuf")]
302 ProtoDelField {
303 key: String,
304 field_path: String,
305 },
306}
307
308#[derive(Debug)]
310pub enum ShardResponse {
311 Value(Option<Value>),
313 Ok,
315 Integer(i64),
317 Bool(bool),
319 Ttl(TtlResult),
321 OutOfMemory,
323 KeyCount(usize),
325 Stats(KeyspaceStats),
327 Len(usize),
329 Array(Vec<Bytes>),
331 TypeName(&'static str),
333 ZAddLen {
335 count: usize,
336 applied: Vec<(f64, String)>,
337 },
338 ZRemLen { count: usize, removed: Vec<String> },
340 Score(Option<f64>),
342 Rank(Option<usize>),
344 ScoredArray(Vec<(String, f64)>),
346 BulkString(String),
348 WrongType,
350 Err(String),
352 Scan { cursor: u64, keys: Vec<String> },
354 HashFields(Vec<(String, Bytes)>),
356 HDelLen { count: usize, removed: Vec<String> },
358 StringArray(Vec<String>),
360 OptionalArray(Vec<Option<Bytes>>),
362 #[cfg(feature = "protobuf")]
364 ProtoValue(Option<(String, Bytes, Option<Duration>)>),
365 #[cfg(feature = "protobuf")]
367 ProtoTypeName(Option<String>),
368 #[cfg(feature = "protobuf")]
371 ProtoFieldUpdated {
372 type_name: String,
373 data: Bytes,
374 expire: Option<Duration>,
375 },
376}
377
378#[derive(Debug)]
380pub struct ShardMessage {
381 pub request: ShardRequest,
382 pub reply: oneshot::Sender<ShardResponse>,
383}
384
385#[derive(Debug, Clone)]
390pub struct ShardHandle {
391 tx: mpsc::Sender<ShardMessage>,
392}
393
394impl ShardHandle {
395 pub async fn send(&self, request: ShardRequest) -> Result<ShardResponse, ShardError> {
399 let rx = self.dispatch(request).await?;
400 rx.await.map_err(|_| ShardError::Unavailable)
401 }
402
403 pub(crate) async fn dispatch(
407 &self,
408 request: ShardRequest,
409 ) -> Result<oneshot::Receiver<ShardResponse>, ShardError> {
410 let (reply_tx, reply_rx) = oneshot::channel();
411 let msg = ShardMessage {
412 request,
413 reply: reply_tx,
414 };
415 self.tx
416 .send(msg)
417 .await
418 .map_err(|_| ShardError::Unavailable)?;
419 Ok(reply_rx)
420 }
421}
422
423pub fn spawn_shard(
429 buffer: usize,
430 config: ShardConfig,
431 persistence: Option<ShardPersistenceConfig>,
432 drop_handle: Option<DropHandle>,
433 #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
434) -> ShardHandle {
435 let (tx, rx) = mpsc::channel(buffer);
436 tokio::spawn(run_shard(
437 rx,
438 config,
439 persistence,
440 drop_handle,
441 #[cfg(feature = "protobuf")]
442 schema_registry,
443 ));
444 ShardHandle { tx }
445}
446
447async fn run_shard(
450 mut rx: mpsc::Receiver<ShardMessage>,
451 config: ShardConfig,
452 persistence: Option<ShardPersistenceConfig>,
453 drop_handle: Option<DropHandle>,
454 #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
455) {
456 let shard_id = config.shard_id;
457 let mut keyspace = Keyspace::with_config(config);
458
459 if let Some(handle) = drop_handle.clone() {
460 keyspace.set_drop_handle(handle);
461 }
462
463 if let Some(ref pcfg) = persistence {
465 #[cfg(feature = "encryption")]
466 let result = if let Some(ref key) = pcfg.encryption_key {
467 recovery::recover_shard_encrypted(&pcfg.data_dir, shard_id, key.clone())
468 } else {
469 recovery::recover_shard(&pcfg.data_dir, shard_id)
470 };
471 #[cfg(not(feature = "encryption"))]
472 let result = recovery::recover_shard(&pcfg.data_dir, shard_id);
473 let count = result.entries.len();
474 for entry in result.entries {
475 let value = match entry.value {
476 RecoveredValue::String(data) => Value::String(data),
477 RecoveredValue::List(deque) => Value::List(deque),
478 RecoveredValue::SortedSet(members) => {
479 let mut ss = crate::types::sorted_set::SortedSet::new();
480 for (score, member) in members {
481 ss.add(member, score);
482 }
483 Value::SortedSet(ss)
484 }
485 RecoveredValue::Hash(map) => Value::Hash(map),
486 RecoveredValue::Set(set) => Value::Set(set),
487 #[cfg(feature = "protobuf")]
488 RecoveredValue::Proto { type_name, data } => Value::Proto { type_name, data },
489 };
490 keyspace.restore(entry.key, value, entry.ttl);
491 }
492 if count > 0 {
493 info!(
494 shard_id,
495 recovered_keys = count,
496 snapshot = result.loaded_snapshot,
497 aof = result.replayed_aof,
498 "recovered shard state"
499 );
500 }
501
502 #[cfg(feature = "protobuf")]
504 if let Some(ref registry) = schema_registry {
505 if !result.schemas.is_empty() {
506 if let Ok(mut reg) = registry.write() {
507 let schema_count = result.schemas.len();
508 for (name, descriptor) in result.schemas {
509 reg.restore(name, descriptor);
510 }
511 info!(
512 shard_id,
513 schemas = schema_count,
514 "restored schemas from AOF"
515 );
516 }
517 }
518 }
519 }
520
521 let mut aof_writer: Option<AofWriter> = match &persistence {
523 Some(pcfg) if pcfg.append_only => {
524 let path = ember_persistence::aof::aof_path(&pcfg.data_dir, shard_id);
525 #[cfg(feature = "encryption")]
526 let result = if let Some(ref key) = pcfg.encryption_key {
527 AofWriter::open_encrypted(path, key.clone())
528 } else {
529 AofWriter::open(path)
530 };
531 #[cfg(not(feature = "encryption"))]
532 let result = AofWriter::open(path);
533 match result {
534 Ok(w) => Some(w),
535 Err(e) => {
536 warn!(shard_id, "failed to open AOF writer: {e}");
537 None
538 }
539 }
540 }
541 _ => None,
542 };
543
544 let fsync_policy = persistence
545 .as_ref()
546 .map(|p| p.fsync_policy)
547 .unwrap_or(FsyncPolicy::No);
548
549 let mut expiry_tick = tokio::time::interval(EXPIRY_TICK);
551 expiry_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
552
553 let mut fsync_tick = tokio::time::interval(FSYNC_INTERVAL);
554 fsync_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
555
556 loop {
557 tokio::select! {
558 msg = rx.recv() => {
559 match msg {
560 Some(msg) => {
561 let request_kind = describe_request(&msg.request);
562 let response = dispatch(
563 &mut keyspace,
564 &msg.request,
565 #[cfg(feature = "protobuf")]
566 &schema_registry,
567 );
568
569 if let Some(ref mut writer) = aof_writer {
571 if let Some(record) = to_aof_record(&msg.request, &response) {
572 if let Err(e) = writer.write_record(&record) {
573 warn!(shard_id, "aof write failed: {e}");
574 }
575 if fsync_policy == FsyncPolicy::Always {
576 if let Err(e) = writer.sync() {
577 warn!(shard_id, "aof sync failed: {e}");
578 }
579 }
580 }
581 }
582
583 match request_kind {
586 RequestKind::Snapshot => {
587 let resp = handle_snapshot(
588 &keyspace, &persistence, shard_id,
589 );
590 let _ = msg.reply.send(resp);
591 continue;
592 }
593 RequestKind::RewriteAof => {
594 let resp = handle_rewrite(
595 &keyspace,
596 &persistence,
597 &mut aof_writer,
598 shard_id,
599 #[cfg(feature = "protobuf")]
600 &schema_registry,
601 );
602 let _ = msg.reply.send(resp);
603 continue;
604 }
605 RequestKind::FlushDbAsync => {
606 let old_entries = keyspace.flush_async();
607 if let Some(ref handle) = drop_handle {
608 handle.defer_entries(old_entries);
609 }
610 let _ = msg.reply.send(ShardResponse::Ok);
612 continue;
613 }
614 RequestKind::Other => {}
615 }
616
617 let _ = msg.reply.send(response);
618 }
619 None => break, }
621 }
622 _ = expiry_tick.tick() => {
623 expiry::run_expiration_cycle(&mut keyspace);
624 }
625 _ = fsync_tick.tick(), if fsync_policy == FsyncPolicy::EverySec => {
626 if let Some(ref mut writer) = aof_writer {
627 if let Err(e) = writer.sync() {
628 warn!(shard_id, "periodic aof sync failed: {e}");
629 }
630 }
631 }
632 }
633 }
634
635 if let Some(ref mut writer) = aof_writer {
637 let _ = writer.sync();
638 }
639}
640
641enum RequestKind {
644 Snapshot,
645 RewriteAof,
646 FlushDbAsync,
647 Other,
648}
649
650fn describe_request(req: &ShardRequest) -> RequestKind {
651 match req {
652 ShardRequest::Snapshot => RequestKind::Snapshot,
653 ShardRequest::RewriteAof => RequestKind::RewriteAof,
654 ShardRequest::FlushDbAsync => RequestKind::FlushDbAsync,
655 _ => RequestKind::Other,
656 }
657}
658
659fn dispatch(
661 ks: &mut Keyspace,
662 req: &ShardRequest,
663 #[cfg(feature = "protobuf")] schema_registry: &Option<crate::schema::SharedSchemaRegistry>,
664) -> ShardResponse {
665 match req {
666 ShardRequest::Get { key } => match ks.get(key) {
667 Ok(val) => ShardResponse::Value(val),
668 Err(_) => ShardResponse::WrongType,
669 },
670 ShardRequest::Set {
671 key,
672 value,
673 expire,
674 nx,
675 xx,
676 } => {
677 if *nx && ks.exists(key) {
679 return ShardResponse::Value(None);
680 }
681 if *xx && !ks.exists(key) {
683 return ShardResponse::Value(None);
684 }
685 match ks.set(key.clone(), value.clone(), *expire) {
686 SetResult::Ok => ShardResponse::Ok,
687 SetResult::OutOfMemory => ShardResponse::OutOfMemory,
688 }
689 }
690 ShardRequest::Incr { key } => match ks.incr(key) {
691 Ok(val) => ShardResponse::Integer(val),
692 Err(IncrError::WrongType) => ShardResponse::WrongType,
693 Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
694 Err(e) => ShardResponse::Err(e.to_string()),
695 },
696 ShardRequest::Decr { key } => match ks.decr(key) {
697 Ok(val) => ShardResponse::Integer(val),
698 Err(IncrError::WrongType) => ShardResponse::WrongType,
699 Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
700 Err(e) => ShardResponse::Err(e.to_string()),
701 },
702 ShardRequest::IncrBy { key, delta } => match ks.incr_by(key, *delta) {
703 Ok(val) => ShardResponse::Integer(val),
704 Err(IncrError::WrongType) => ShardResponse::WrongType,
705 Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
706 Err(e) => ShardResponse::Err(e.to_string()),
707 },
708 ShardRequest::DecrBy { key, delta } => match ks.incr_by(key, -delta) {
709 Ok(val) => ShardResponse::Integer(val),
710 Err(IncrError::WrongType) => ShardResponse::WrongType,
711 Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
712 Err(e) => ShardResponse::Err(e.to_string()),
713 },
714 ShardRequest::IncrByFloat { key, delta } => match ks.incr_by_float(key, *delta) {
715 Ok(val) => ShardResponse::BulkString(val),
716 Err(IncrFloatError::WrongType) => ShardResponse::WrongType,
717 Err(IncrFloatError::OutOfMemory) => ShardResponse::OutOfMemory,
718 Err(e) => ShardResponse::Err(e.to_string()),
719 },
720 ShardRequest::Append { key, value } => match ks.append(key, value) {
721 Ok(len) => ShardResponse::Len(len),
722 Err(WriteError::WrongType) => ShardResponse::WrongType,
723 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
724 },
725 ShardRequest::Strlen { key } => match ks.strlen(key) {
726 Ok(len) => ShardResponse::Len(len),
727 Err(_) => ShardResponse::WrongType,
728 },
729 ShardRequest::Keys { pattern } => {
730 let keys = ks.keys(pattern);
731 ShardResponse::StringArray(keys)
732 }
733 ShardRequest::Rename { key, newkey } => {
734 use crate::keyspace::RenameError;
735 match ks.rename(key, newkey) {
736 Ok(()) => ShardResponse::Ok,
737 Err(RenameError::NoSuchKey) => ShardResponse::Err("ERR no such key".into()),
738 }
739 }
740 ShardRequest::Del { key } => ShardResponse::Bool(ks.del(key)),
741 ShardRequest::Unlink { key } => ShardResponse::Bool(ks.unlink(key)),
742 ShardRequest::Exists { key } => ShardResponse::Bool(ks.exists(key)),
743 ShardRequest::Expire { key, seconds } => ShardResponse::Bool(ks.expire(key, *seconds)),
744 ShardRequest::Ttl { key } => ShardResponse::Ttl(ks.ttl(key)),
745 ShardRequest::Persist { key } => ShardResponse::Bool(ks.persist(key)),
746 ShardRequest::Pttl { key } => ShardResponse::Ttl(ks.pttl(key)),
747 ShardRequest::Pexpire { key, milliseconds } => {
748 ShardResponse::Bool(ks.pexpire(key, *milliseconds))
749 }
750 ShardRequest::LPush { key, values } => match ks.lpush(key, values) {
751 Ok(len) => ShardResponse::Len(len),
752 Err(WriteError::WrongType) => ShardResponse::WrongType,
753 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
754 },
755 ShardRequest::RPush { key, values } => match ks.rpush(key, values) {
756 Ok(len) => ShardResponse::Len(len),
757 Err(WriteError::WrongType) => ShardResponse::WrongType,
758 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
759 },
760 ShardRequest::LPop { key } => match ks.lpop(key) {
761 Ok(val) => ShardResponse::Value(val.map(Value::String)),
762 Err(_) => ShardResponse::WrongType,
763 },
764 ShardRequest::RPop { key } => match ks.rpop(key) {
765 Ok(val) => ShardResponse::Value(val.map(Value::String)),
766 Err(_) => ShardResponse::WrongType,
767 },
768 ShardRequest::LRange { key, start, stop } => match ks.lrange(key, *start, *stop) {
769 Ok(items) => ShardResponse::Array(items),
770 Err(_) => ShardResponse::WrongType,
771 },
772 ShardRequest::LLen { key } => match ks.llen(key) {
773 Ok(len) => ShardResponse::Len(len),
774 Err(_) => ShardResponse::WrongType,
775 },
776 ShardRequest::Type { key } => ShardResponse::TypeName(ks.value_type(key)),
777 ShardRequest::ZAdd {
778 key,
779 members,
780 nx,
781 xx,
782 gt,
783 lt,
784 ch,
785 } => {
786 let flags = ZAddFlags {
787 nx: *nx,
788 xx: *xx,
789 gt: *gt,
790 lt: *lt,
791 ch: *ch,
792 };
793 match ks.zadd(key, members, &flags) {
794 Ok(result) => ShardResponse::ZAddLen {
795 count: result.count,
796 applied: result.applied,
797 },
798 Err(WriteError::WrongType) => ShardResponse::WrongType,
799 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
800 }
801 }
802 ShardRequest::ZRem { key, members } => match ks.zrem(key, members) {
803 Ok(removed) => ShardResponse::ZRemLen {
804 count: removed.len(),
805 removed,
806 },
807 Err(_) => ShardResponse::WrongType,
808 },
809 ShardRequest::ZScore { key, member } => match ks.zscore(key, member) {
810 Ok(score) => ShardResponse::Score(score),
811 Err(_) => ShardResponse::WrongType,
812 },
813 ShardRequest::ZRank { key, member } => match ks.zrank(key, member) {
814 Ok(rank) => ShardResponse::Rank(rank),
815 Err(_) => ShardResponse::WrongType,
816 },
817 ShardRequest::ZCard { key } => match ks.zcard(key) {
818 Ok(len) => ShardResponse::Len(len),
819 Err(_) => ShardResponse::WrongType,
820 },
821 ShardRequest::ZRange {
822 key, start, stop, ..
823 } => match ks.zrange(key, *start, *stop) {
824 Ok(items) => ShardResponse::ScoredArray(items),
825 Err(_) => ShardResponse::WrongType,
826 },
827 ShardRequest::DbSize => ShardResponse::KeyCount(ks.len()),
828 ShardRequest::Stats => ShardResponse::Stats(ks.stats()),
829 ShardRequest::FlushDb => {
830 ks.clear();
831 ShardResponse::Ok
832 }
833 ShardRequest::Scan {
834 cursor,
835 count,
836 pattern,
837 } => {
838 let (next_cursor, keys) = ks.scan_keys(*cursor, *count, pattern.as_deref());
839 ShardResponse::Scan {
840 cursor: next_cursor,
841 keys,
842 }
843 }
844 ShardRequest::HSet { key, fields } => match ks.hset(key, fields) {
845 Ok(count) => ShardResponse::Len(count),
846 Err(WriteError::WrongType) => ShardResponse::WrongType,
847 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
848 },
849 ShardRequest::HGet { key, field } => match ks.hget(key, field) {
850 Ok(val) => ShardResponse::Value(val.map(Value::String)),
851 Err(_) => ShardResponse::WrongType,
852 },
853 ShardRequest::HGetAll { key } => match ks.hgetall(key) {
854 Ok(fields) => ShardResponse::HashFields(fields),
855 Err(_) => ShardResponse::WrongType,
856 },
857 ShardRequest::HDel { key, fields } => match ks.hdel(key, fields) {
858 Ok(removed) => ShardResponse::HDelLen {
859 count: removed.len(),
860 removed,
861 },
862 Err(_) => ShardResponse::WrongType,
863 },
864 ShardRequest::HExists { key, field } => match ks.hexists(key, field) {
865 Ok(exists) => ShardResponse::Bool(exists),
866 Err(_) => ShardResponse::WrongType,
867 },
868 ShardRequest::HLen { key } => match ks.hlen(key) {
869 Ok(len) => ShardResponse::Len(len),
870 Err(_) => ShardResponse::WrongType,
871 },
872 ShardRequest::HIncrBy { key, field, delta } => match ks.hincrby(key, field, *delta) {
873 Ok(val) => ShardResponse::Integer(val),
874 Err(IncrError::WrongType) => ShardResponse::WrongType,
875 Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
876 Err(e) => ShardResponse::Err(e.to_string()),
877 },
878 ShardRequest::HKeys { key } => match ks.hkeys(key) {
879 Ok(keys) => ShardResponse::StringArray(keys),
880 Err(_) => ShardResponse::WrongType,
881 },
882 ShardRequest::HVals { key } => match ks.hvals(key) {
883 Ok(vals) => ShardResponse::Array(vals),
884 Err(_) => ShardResponse::WrongType,
885 },
886 ShardRequest::HMGet { key, fields } => match ks.hmget(key, fields) {
887 Ok(vals) => ShardResponse::OptionalArray(vals),
888 Err(_) => ShardResponse::WrongType,
889 },
890 ShardRequest::SAdd { key, members } => match ks.sadd(key, members) {
891 Ok(count) => ShardResponse::Len(count),
892 Err(WriteError::WrongType) => ShardResponse::WrongType,
893 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
894 },
895 ShardRequest::SRem { key, members } => match ks.srem(key, members) {
896 Ok(count) => ShardResponse::Len(count),
897 Err(_) => ShardResponse::WrongType,
898 },
899 ShardRequest::SMembers { key } => match ks.smembers(key) {
900 Ok(members) => ShardResponse::StringArray(members),
901 Err(_) => ShardResponse::WrongType,
902 },
903 ShardRequest::SIsMember { key, member } => match ks.sismember(key, member) {
904 Ok(exists) => ShardResponse::Bool(exists),
905 Err(_) => ShardResponse::WrongType,
906 },
907 ShardRequest::SCard { key } => match ks.scard(key) {
908 Ok(count) => ShardResponse::Len(count),
909 Err(_) => ShardResponse::WrongType,
910 },
911 ShardRequest::CountKeysInSlot { slot } => {
912 ShardResponse::KeyCount(ks.count_keys_in_slot(*slot))
913 }
914 ShardRequest::GetKeysInSlot { slot, count } => {
915 ShardResponse::StringArray(ks.get_keys_in_slot(*slot, *count))
916 }
917 #[cfg(feature = "protobuf")]
918 ShardRequest::ProtoSet {
919 key,
920 type_name,
921 data,
922 expire,
923 nx,
924 xx,
925 } => {
926 if *nx && ks.exists(key) {
927 return ShardResponse::Value(None);
928 }
929 if *xx && !ks.exists(key) {
930 return ShardResponse::Value(None);
931 }
932 match ks.proto_set(key.clone(), type_name.clone(), data.clone(), *expire) {
933 SetResult::Ok => ShardResponse::Ok,
934 SetResult::OutOfMemory => ShardResponse::OutOfMemory,
935 }
936 }
937 #[cfg(feature = "protobuf")]
938 ShardRequest::ProtoGet { key } => match ks.proto_get(key) {
939 Ok(val) => ShardResponse::ProtoValue(val),
940 Err(_) => ShardResponse::WrongType,
941 },
942 #[cfg(feature = "protobuf")]
943 ShardRequest::ProtoType { key } => match ks.proto_type(key) {
944 Ok(name) => ShardResponse::ProtoTypeName(name),
945 Err(_) => ShardResponse::WrongType,
946 },
947 #[cfg(feature = "protobuf")]
950 ShardRequest::ProtoRegisterAof { .. } => ShardResponse::Ok,
951 #[cfg(feature = "protobuf")]
952 ShardRequest::ProtoSetField {
953 key,
954 field_path,
955 value,
956 } => dispatch_proto_field_op(ks, schema_registry, key, |reg, type_name, data, ttl| {
957 let new_data = reg.set_field(type_name, data, field_path, value)?;
958 Ok(ShardResponse::ProtoFieldUpdated {
959 type_name: type_name.to_owned(),
960 data: new_data,
961 expire: ttl,
962 })
963 }),
964 #[cfg(feature = "protobuf")]
965 ShardRequest::ProtoDelField { key, field_path } => {
966 dispatch_proto_field_op(ks, schema_registry, key, |reg, type_name, data, ttl| {
967 let new_data = reg.clear_field(type_name, data, field_path)?;
968 Ok(ShardResponse::ProtoFieldUpdated {
969 type_name: type_name.to_owned(),
970 data: new_data,
971 expire: ttl,
972 })
973 })
974 }
975 ShardRequest::Snapshot | ShardRequest::RewriteAof | ShardRequest::FlushDbAsync => {
977 ShardResponse::Ok
978 }
979 }
980}
981
982#[cfg(feature = "protobuf")]
988fn dispatch_proto_field_op<F>(
989 ks: &mut Keyspace,
990 schema_registry: &Option<crate::schema::SharedSchemaRegistry>,
991 key: &str,
992 mutate: F,
993) -> ShardResponse
994where
995 F: FnOnce(
996 &crate::schema::SchemaRegistry,
997 &str,
998 &[u8],
999 Option<Duration>,
1000 ) -> Result<ShardResponse, crate::schema::SchemaError>,
1001{
1002 let registry = match schema_registry {
1003 Some(r) => r,
1004 None => return ShardResponse::Err("protobuf support is not enabled".into()),
1005 };
1006
1007 let (type_name, data, remaining_ttl) = match ks.proto_get(key) {
1008 Ok(Some(tuple)) => tuple,
1009 Ok(None) => return ShardResponse::Value(None),
1010 Err(_) => return ShardResponse::WrongType,
1011 };
1012
1013 let reg = match registry.read() {
1014 Ok(r) => r,
1015 Err(_) => return ShardResponse::Err("schema registry lock poisoned".into()),
1016 };
1017
1018 let resp = match mutate(®, &type_name, &data, remaining_ttl) {
1019 Ok(r) => r,
1020 Err(e) => return ShardResponse::Err(e.to_string()),
1021 };
1022
1023 if let ShardResponse::ProtoFieldUpdated {
1025 ref type_name,
1026 ref data,
1027 expire,
1028 } = resp
1029 {
1030 ks.proto_set(key.to_owned(), type_name.clone(), data.clone(), expire);
1031 }
1032
1033 resp
1034}
1035
1036fn to_aof_record(req: &ShardRequest, resp: &ShardResponse) -> Option<AofRecord> {
1039 match (req, resp) {
1040 (
1041 ShardRequest::Set {
1042 key, value, expire, ..
1043 },
1044 ShardResponse::Ok,
1045 ) => {
1046 let expire_ms = expire.map(|d| d.as_millis() as i64).unwrap_or(-1);
1047 Some(AofRecord::Set {
1048 key: key.clone(),
1049 value: value.clone(),
1050 expire_ms,
1051 })
1052 }
1053 (ShardRequest::Del { key }, ShardResponse::Bool(true))
1054 | (ShardRequest::Unlink { key }, ShardResponse::Bool(true)) => {
1055 Some(AofRecord::Del { key: key.clone() })
1056 }
1057 (ShardRequest::Expire { key, seconds }, ShardResponse::Bool(true)) => {
1058 Some(AofRecord::Expire {
1059 key: key.clone(),
1060 seconds: *seconds,
1061 })
1062 }
1063 (ShardRequest::LPush { key, values }, ShardResponse::Len(_)) => Some(AofRecord::LPush {
1064 key: key.clone(),
1065 values: values.clone(),
1066 }),
1067 (ShardRequest::RPush { key, values }, ShardResponse::Len(_)) => Some(AofRecord::RPush {
1068 key: key.clone(),
1069 values: values.clone(),
1070 }),
1071 (ShardRequest::LPop { key }, ShardResponse::Value(Some(_))) => {
1072 Some(AofRecord::LPop { key: key.clone() })
1073 }
1074 (ShardRequest::RPop { key }, ShardResponse::Value(Some(_))) => {
1075 Some(AofRecord::RPop { key: key.clone() })
1076 }
1077 (ShardRequest::ZAdd { key, .. }, ShardResponse::ZAddLen { applied, .. })
1078 if !applied.is_empty() =>
1079 {
1080 Some(AofRecord::ZAdd {
1081 key: key.clone(),
1082 members: applied.clone(),
1083 })
1084 }
1085 (ShardRequest::ZRem { key, .. }, ShardResponse::ZRemLen { removed, .. })
1086 if !removed.is_empty() =>
1087 {
1088 Some(AofRecord::ZRem {
1089 key: key.clone(),
1090 members: removed.clone(),
1091 })
1092 }
1093 (ShardRequest::Incr { key }, ShardResponse::Integer(_)) => {
1094 Some(AofRecord::Incr { key: key.clone() })
1095 }
1096 (ShardRequest::Decr { key }, ShardResponse::Integer(_)) => {
1097 Some(AofRecord::Decr { key: key.clone() })
1098 }
1099 (ShardRequest::IncrBy { key, delta }, ShardResponse::Integer(_)) => {
1100 Some(AofRecord::IncrBy {
1101 key: key.clone(),
1102 delta: *delta,
1103 })
1104 }
1105 (ShardRequest::DecrBy { key, delta }, ShardResponse::Integer(_)) => {
1106 Some(AofRecord::DecrBy {
1107 key: key.clone(),
1108 delta: *delta,
1109 })
1110 }
1111 (ShardRequest::IncrByFloat { key, .. }, ShardResponse::BulkString(val)) => {
1114 Some(AofRecord::Set {
1115 key: key.clone(),
1116 value: Bytes::from(val.clone()),
1117 expire_ms: -1,
1118 })
1119 }
1120 (ShardRequest::Append { key, value }, ShardResponse::Len(_)) => Some(AofRecord::Append {
1122 key: key.clone(),
1123 value: value.clone(),
1124 }),
1125 (ShardRequest::Rename { key, newkey }, ShardResponse::Ok) => Some(AofRecord::Rename {
1126 key: key.clone(),
1127 newkey: newkey.clone(),
1128 }),
1129 (ShardRequest::Persist { key }, ShardResponse::Bool(true)) => {
1130 Some(AofRecord::Persist { key: key.clone() })
1131 }
1132 (ShardRequest::Pexpire { key, milliseconds }, ShardResponse::Bool(true)) => {
1133 Some(AofRecord::Pexpire {
1134 key: key.clone(),
1135 milliseconds: *milliseconds,
1136 })
1137 }
1138 (ShardRequest::HSet { key, fields }, ShardResponse::Len(_)) => Some(AofRecord::HSet {
1140 key: key.clone(),
1141 fields: fields.clone(),
1142 }),
1143 (ShardRequest::HDel { key, .. }, ShardResponse::HDelLen { removed, .. })
1144 if !removed.is_empty() =>
1145 {
1146 Some(AofRecord::HDel {
1147 key: key.clone(),
1148 fields: removed.clone(),
1149 })
1150 }
1151 (ShardRequest::HIncrBy { key, field, delta }, ShardResponse::Integer(_)) => {
1152 Some(AofRecord::HIncrBy {
1153 key: key.clone(),
1154 field: field.clone(),
1155 delta: *delta,
1156 })
1157 }
1158 (ShardRequest::SAdd { key, members }, ShardResponse::Len(count)) if *count > 0 => {
1160 Some(AofRecord::SAdd {
1161 key: key.clone(),
1162 members: members.clone(),
1163 })
1164 }
1165 (ShardRequest::SRem { key, members }, ShardResponse::Len(count)) if *count > 0 => {
1166 Some(AofRecord::SRem {
1167 key: key.clone(),
1168 members: members.clone(),
1169 })
1170 }
1171 #[cfg(feature = "protobuf")]
1173 (
1174 ShardRequest::ProtoSet {
1175 key,
1176 type_name,
1177 data,
1178 expire,
1179 ..
1180 },
1181 ShardResponse::Ok,
1182 ) => {
1183 let expire_ms = expire.map(|d| d.as_millis() as i64).unwrap_or(-1);
1184 Some(AofRecord::ProtoSet {
1185 key: key.clone(),
1186 type_name: type_name.clone(),
1187 data: data.clone(),
1188 expire_ms,
1189 })
1190 }
1191 #[cfg(feature = "protobuf")]
1192 (ShardRequest::ProtoRegisterAof { name, descriptor }, ShardResponse::Ok) => {
1193 Some(AofRecord::ProtoRegister {
1194 name: name.clone(),
1195 descriptor: descriptor.clone(),
1196 })
1197 }
1198 #[cfg(feature = "protobuf")]
1200 (
1201 ShardRequest::ProtoSetField { key, .. } | ShardRequest::ProtoDelField { key, .. },
1202 ShardResponse::ProtoFieldUpdated {
1203 type_name,
1204 data,
1205 expire,
1206 },
1207 ) => {
1208 let expire_ms = expire.map(|d| d.as_millis() as i64).unwrap_or(-1);
1209 Some(AofRecord::ProtoSet {
1210 key: key.clone(),
1211 type_name: type_name.clone(),
1212 data: data.clone(),
1213 expire_ms,
1214 })
1215 }
1216 _ => None,
1217 }
1218}
1219
1220fn handle_snapshot(
1222 keyspace: &Keyspace,
1223 persistence: &Option<ShardPersistenceConfig>,
1224 shard_id: u16,
1225) -> ShardResponse {
1226 let pcfg = match persistence {
1227 Some(p) => p,
1228 None => return ShardResponse::Err("persistence not configured".into()),
1229 };
1230
1231 let path = snapshot::snapshot_path(&pcfg.data_dir, shard_id);
1232 let result = write_snapshot(
1233 keyspace,
1234 &path,
1235 shard_id,
1236 #[cfg(feature = "encryption")]
1237 pcfg.encryption_key.as_ref(),
1238 );
1239 match result {
1240 Ok(count) => {
1241 info!(shard_id, entries = count, "snapshot written");
1242 ShardResponse::Ok
1243 }
1244 Err(e) => {
1245 warn!(shard_id, "snapshot failed: {e}");
1246 ShardResponse::Err(format!("snapshot failed: {e}"))
1247 }
1248 }
1249}
1250
1251fn handle_rewrite(
1256 keyspace: &Keyspace,
1257 persistence: &Option<ShardPersistenceConfig>,
1258 aof_writer: &mut Option<AofWriter>,
1259 shard_id: u16,
1260 #[cfg(feature = "protobuf")] schema_registry: &Option<crate::schema::SharedSchemaRegistry>,
1261) -> ShardResponse {
1262 let pcfg = match persistence {
1263 Some(p) => p,
1264 None => return ShardResponse::Err("persistence not configured".into()),
1265 };
1266
1267 let path = snapshot::snapshot_path(&pcfg.data_dir, shard_id);
1268 let result = write_snapshot(
1269 keyspace,
1270 &path,
1271 shard_id,
1272 #[cfg(feature = "encryption")]
1273 pcfg.encryption_key.as_ref(),
1274 );
1275 match result {
1276 Ok(count) => {
1277 if let Some(ref mut writer) = aof_writer {
1279 if let Err(e) = writer.truncate() {
1280 warn!(shard_id, "aof truncate after rewrite failed: {e}");
1281 }
1282
1283 #[cfg(feature = "protobuf")]
1285 if let Some(ref registry) = schema_registry {
1286 if let Ok(reg) = registry.read() {
1287 for (name, descriptor) in reg.iter_schemas() {
1288 let record = AofRecord::ProtoRegister {
1289 name: name.to_owned(),
1290 descriptor: descriptor.clone(),
1291 };
1292 if let Err(e) = writer.write_record(&record) {
1293 warn!(shard_id, "failed to re-persist schema after rewrite: {e}");
1294 }
1295 }
1296 }
1297 }
1298
1299 if let Err(e) = writer.sync() {
1301 warn!(shard_id, "aof sync after rewrite failed: {e}");
1302 }
1303 }
1304 info!(shard_id, entries = count, "aof rewrite complete");
1305 ShardResponse::Ok
1306 }
1307 Err(e) => {
1308 warn!(shard_id, "aof rewrite failed: {e}");
1309 ShardResponse::Err(format!("rewrite failed: {e}"))
1310 }
1311 }
1312}
1313
1314fn write_snapshot(
1316 keyspace: &Keyspace,
1317 path: &std::path::Path,
1318 shard_id: u16,
1319 #[cfg(feature = "encryption")] encryption_key: Option<
1320 &ember_persistence::encryption::EncryptionKey,
1321 >,
1322) -> Result<u32, ember_persistence::format::FormatError> {
1323 #[cfg(feature = "encryption")]
1324 let mut writer = if let Some(key) = encryption_key {
1325 SnapshotWriter::create_encrypted(path, shard_id, key.clone())?
1326 } else {
1327 SnapshotWriter::create(path, shard_id)?
1328 };
1329 #[cfg(not(feature = "encryption"))]
1330 let mut writer = SnapshotWriter::create(path, shard_id)?;
1331 let mut count = 0u32;
1332
1333 for (key, value, ttl_ms) in keyspace.iter_entries() {
1334 let snap_value = match value {
1335 Value::String(data) => SnapValue::String(data.clone()),
1336 Value::List(deque) => SnapValue::List(deque.clone()),
1337 Value::SortedSet(ss) => {
1338 let members: Vec<(f64, String)> = ss
1339 .iter()
1340 .map(|(member, score)| (score, member.to_owned()))
1341 .collect();
1342 SnapValue::SortedSet(members)
1343 }
1344 Value::Hash(map) => SnapValue::Hash(map.clone()),
1345 Value::Set(set) => SnapValue::Set(set.clone()),
1346 #[cfg(feature = "protobuf")]
1347 Value::Proto { type_name, data } => SnapValue::Proto {
1348 type_name: type_name.clone(),
1349 data: data.clone(),
1350 },
1351 };
1352 writer.write_entry(&SnapEntry {
1353 key: key.to_owned(),
1354 value: snap_value,
1355 expire_ms: ttl_ms,
1356 })?;
1357 count += 1;
1358 }
1359
1360 writer.finish()?;
1361 Ok(count)
1362}
1363
1364#[cfg(test)]
1365mod tests {
1366 use super::*;
1367
1368 fn test_dispatch(ks: &mut Keyspace, req: &ShardRequest) -> ShardResponse {
1370 dispatch(
1371 ks,
1372 req,
1373 #[cfg(feature = "protobuf")]
1374 &None,
1375 )
1376 }
1377
1378 #[test]
1379 fn dispatch_set_and_get() {
1380 let mut ks = Keyspace::new();
1381
1382 let resp = test_dispatch(
1383 &mut ks,
1384 &ShardRequest::Set {
1385 key: "k".into(),
1386 value: Bytes::from("v"),
1387 expire: None,
1388 nx: false,
1389 xx: false,
1390 },
1391 );
1392 assert!(matches!(resp, ShardResponse::Ok));
1393
1394 let resp = test_dispatch(&mut ks, &ShardRequest::Get { key: "k".into() });
1395 match resp {
1396 ShardResponse::Value(Some(Value::String(data))) => {
1397 assert_eq!(data, Bytes::from("v"));
1398 }
1399 other => panic!("expected Value(Some(String)), got {other:?}"),
1400 }
1401 }
1402
1403 #[test]
1404 fn dispatch_get_missing() {
1405 let mut ks = Keyspace::new();
1406 let resp = test_dispatch(&mut ks, &ShardRequest::Get { key: "nope".into() });
1407 assert!(matches!(resp, ShardResponse::Value(None)));
1408 }
1409
1410 #[test]
1411 fn dispatch_del() {
1412 let mut ks = Keyspace::new();
1413 ks.set("key".into(), Bytes::from("val"), None);
1414
1415 let resp = test_dispatch(&mut ks, &ShardRequest::Del { key: "key".into() });
1416 assert!(matches!(resp, ShardResponse::Bool(true)));
1417
1418 let resp = test_dispatch(&mut ks, &ShardRequest::Del { key: "key".into() });
1419 assert!(matches!(resp, ShardResponse::Bool(false)));
1420 }
1421
1422 #[test]
1423 fn dispatch_exists() {
1424 let mut ks = Keyspace::new();
1425 ks.set("yes".into(), Bytes::from("here"), None);
1426
1427 let resp = test_dispatch(&mut ks, &ShardRequest::Exists { key: "yes".into() });
1428 assert!(matches!(resp, ShardResponse::Bool(true)));
1429
1430 let resp = test_dispatch(&mut ks, &ShardRequest::Exists { key: "no".into() });
1431 assert!(matches!(resp, ShardResponse::Bool(false)));
1432 }
1433
1434 #[test]
1435 fn dispatch_expire_and_ttl() {
1436 let mut ks = Keyspace::new();
1437 ks.set("key".into(), Bytes::from("val"), None);
1438
1439 let resp = test_dispatch(
1440 &mut ks,
1441 &ShardRequest::Expire {
1442 key: "key".into(),
1443 seconds: 60,
1444 },
1445 );
1446 assert!(matches!(resp, ShardResponse::Bool(true)));
1447
1448 let resp = test_dispatch(&mut ks, &ShardRequest::Ttl { key: "key".into() });
1449 match resp {
1450 ShardResponse::Ttl(TtlResult::Seconds(s)) => assert!((58..=60).contains(&s)),
1451 other => panic!("expected Ttl(Seconds), got {other:?}"),
1452 }
1453 }
1454
1455 #[test]
1456 fn dispatch_ttl_missing() {
1457 let mut ks = Keyspace::new();
1458 let resp = test_dispatch(&mut ks, &ShardRequest::Ttl { key: "gone".into() });
1459 assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
1460 }
1461
1462 #[tokio::test]
1463 async fn shard_round_trip() {
1464 let handle = spawn_shard(
1465 16,
1466 ShardConfig::default(),
1467 None,
1468 None,
1469 #[cfg(feature = "protobuf")]
1470 None,
1471 );
1472
1473 let resp = handle
1474 .send(ShardRequest::Set {
1475 key: "hello".into(),
1476 value: Bytes::from("world"),
1477 expire: None,
1478 nx: false,
1479 xx: false,
1480 })
1481 .await
1482 .unwrap();
1483 assert!(matches!(resp, ShardResponse::Ok));
1484
1485 let resp = handle
1486 .send(ShardRequest::Get {
1487 key: "hello".into(),
1488 })
1489 .await
1490 .unwrap();
1491 match resp {
1492 ShardResponse::Value(Some(Value::String(data))) => {
1493 assert_eq!(data, Bytes::from("world"));
1494 }
1495 other => panic!("expected Value(Some(String)), got {other:?}"),
1496 }
1497 }
1498
1499 #[tokio::test]
1500 async fn expired_key_through_shard() {
1501 let handle = spawn_shard(
1502 16,
1503 ShardConfig::default(),
1504 None,
1505 None,
1506 #[cfg(feature = "protobuf")]
1507 None,
1508 );
1509
1510 handle
1511 .send(ShardRequest::Set {
1512 key: "temp".into(),
1513 value: Bytes::from("gone"),
1514 expire: Some(Duration::from_millis(10)),
1515 nx: false,
1516 xx: false,
1517 })
1518 .await
1519 .unwrap();
1520
1521 tokio::time::sleep(Duration::from_millis(30)).await;
1522
1523 let resp = handle
1524 .send(ShardRequest::Get { key: "temp".into() })
1525 .await
1526 .unwrap();
1527 assert!(matches!(resp, ShardResponse::Value(None)));
1528 }
1529
1530 #[tokio::test]
1531 async fn active_expiration_cleans_up_without_access() {
1532 let handle = spawn_shard(
1533 16,
1534 ShardConfig::default(),
1535 None,
1536 None,
1537 #[cfg(feature = "protobuf")]
1538 None,
1539 );
1540
1541 handle
1543 .send(ShardRequest::Set {
1544 key: "ephemeral".into(),
1545 value: Bytes::from("temp"),
1546 expire: Some(Duration::from_millis(10)),
1547 nx: false,
1548 xx: false,
1549 })
1550 .await
1551 .unwrap();
1552
1553 handle
1555 .send(ShardRequest::Set {
1556 key: "persistent".into(),
1557 value: Bytes::from("stays"),
1558 expire: None,
1559 nx: false,
1560 xx: false,
1561 })
1562 .await
1563 .unwrap();
1564
1565 tokio::time::sleep(Duration::from_millis(250)).await;
1568
1569 let resp = handle
1571 .send(ShardRequest::Exists {
1572 key: "ephemeral".into(),
1573 })
1574 .await
1575 .unwrap();
1576 assert!(matches!(resp, ShardResponse::Bool(false)));
1577
1578 let resp = handle
1580 .send(ShardRequest::Exists {
1581 key: "persistent".into(),
1582 })
1583 .await
1584 .unwrap();
1585 assert!(matches!(resp, ShardResponse::Bool(true)));
1586 }
1587
1588 #[tokio::test]
1589 async fn shard_with_persistence_snapshot_and_recovery() {
1590 let dir = tempfile::tempdir().unwrap();
1591 let pcfg = ShardPersistenceConfig {
1592 data_dir: dir.path().to_owned(),
1593 append_only: true,
1594 fsync_policy: FsyncPolicy::Always,
1595 #[cfg(feature = "encryption")]
1596 encryption_key: None,
1597 };
1598 let config = ShardConfig {
1599 shard_id: 0,
1600 ..ShardConfig::default()
1601 };
1602
1603 {
1605 let handle = spawn_shard(
1606 16,
1607 config.clone(),
1608 Some(pcfg.clone()),
1609 None,
1610 #[cfg(feature = "protobuf")]
1611 None,
1612 );
1613 handle
1614 .send(ShardRequest::Set {
1615 key: "a".into(),
1616 value: Bytes::from("1"),
1617 expire: None,
1618 nx: false,
1619 xx: false,
1620 })
1621 .await
1622 .unwrap();
1623 handle
1624 .send(ShardRequest::Set {
1625 key: "b".into(),
1626 value: Bytes::from("2"),
1627 expire: Some(Duration::from_secs(300)),
1628 nx: false,
1629 xx: false,
1630 })
1631 .await
1632 .unwrap();
1633 handle.send(ShardRequest::Snapshot).await.unwrap();
1634 handle
1636 .send(ShardRequest::Set {
1637 key: "c".into(),
1638 value: Bytes::from("3"),
1639 expire: None,
1640 nx: false,
1641 xx: false,
1642 })
1643 .await
1644 .unwrap();
1645 }
1647
1648 tokio::time::sleep(Duration::from_millis(50)).await;
1650
1651 {
1653 let handle = spawn_shard(
1654 16,
1655 config,
1656 Some(pcfg),
1657 None,
1658 #[cfg(feature = "protobuf")]
1659 None,
1660 );
1661 tokio::time::sleep(Duration::from_millis(50)).await;
1663
1664 let resp = handle
1665 .send(ShardRequest::Get { key: "a".into() })
1666 .await
1667 .unwrap();
1668 match resp {
1669 ShardResponse::Value(Some(Value::String(data))) => {
1670 assert_eq!(data, Bytes::from("1"));
1671 }
1672 other => panic!("expected a=1, got {other:?}"),
1673 }
1674
1675 let resp = handle
1676 .send(ShardRequest::Get { key: "b".into() })
1677 .await
1678 .unwrap();
1679 assert!(matches!(resp, ShardResponse::Value(Some(_))));
1680
1681 let resp = handle
1682 .send(ShardRequest::Get { key: "c".into() })
1683 .await
1684 .unwrap();
1685 match resp {
1686 ShardResponse::Value(Some(Value::String(data))) => {
1687 assert_eq!(data, Bytes::from("3"));
1688 }
1689 other => panic!("expected c=3, got {other:?}"),
1690 }
1691 }
1692 }
1693
1694 #[test]
1695 fn to_aof_record_for_set() {
1696 let req = ShardRequest::Set {
1697 key: "k".into(),
1698 value: Bytes::from("v"),
1699 expire: Some(Duration::from_secs(60)),
1700 nx: false,
1701 xx: false,
1702 };
1703 let resp = ShardResponse::Ok;
1704 let record = to_aof_record(&req, &resp).unwrap();
1705 match record {
1706 AofRecord::Set { key, expire_ms, .. } => {
1707 assert_eq!(key, "k");
1708 assert_eq!(expire_ms, 60_000);
1709 }
1710 other => panic!("expected Set, got {other:?}"),
1711 }
1712 }
1713
1714 #[test]
1715 fn to_aof_record_skips_failed_set() {
1716 let req = ShardRequest::Set {
1717 key: "k".into(),
1718 value: Bytes::from("v"),
1719 expire: None,
1720 nx: false,
1721 xx: false,
1722 };
1723 let resp = ShardResponse::OutOfMemory;
1724 assert!(to_aof_record(&req, &resp).is_none());
1725 }
1726
1727 #[test]
1728 fn to_aof_record_for_del() {
1729 let req = ShardRequest::Del { key: "k".into() };
1730 let resp = ShardResponse::Bool(true);
1731 let record = to_aof_record(&req, &resp).unwrap();
1732 assert!(matches!(record, AofRecord::Del { .. }));
1733 }
1734
1735 #[test]
1736 fn to_aof_record_skips_failed_del() {
1737 let req = ShardRequest::Del { key: "k".into() };
1738 let resp = ShardResponse::Bool(false);
1739 assert!(to_aof_record(&req, &resp).is_none());
1740 }
1741
1742 #[test]
1743 fn dispatch_incr_new_key() {
1744 let mut ks = Keyspace::new();
1745 let resp = test_dispatch(&mut ks, &ShardRequest::Incr { key: "c".into() });
1746 assert!(matches!(resp, ShardResponse::Integer(1)));
1747 }
1748
1749 #[test]
1750 fn dispatch_decr_existing() {
1751 let mut ks = Keyspace::new();
1752 ks.set("n".into(), Bytes::from("10"), None);
1753 let resp = test_dispatch(&mut ks, &ShardRequest::Decr { key: "n".into() });
1754 assert!(matches!(resp, ShardResponse::Integer(9)));
1755 }
1756
1757 #[test]
1758 fn dispatch_incr_non_integer() {
1759 let mut ks = Keyspace::new();
1760 ks.set("s".into(), Bytes::from("hello"), None);
1761 let resp = test_dispatch(&mut ks, &ShardRequest::Incr { key: "s".into() });
1762 assert!(matches!(resp, ShardResponse::Err(_)));
1763 }
1764
1765 #[test]
1766 fn dispatch_incrby() {
1767 let mut ks = Keyspace::new();
1768 ks.set("n".into(), Bytes::from("10"), None);
1769 let resp = test_dispatch(
1770 &mut ks,
1771 &ShardRequest::IncrBy {
1772 key: "n".into(),
1773 delta: 5,
1774 },
1775 );
1776 assert!(matches!(resp, ShardResponse::Integer(15)));
1777 }
1778
1779 #[test]
1780 fn dispatch_decrby() {
1781 let mut ks = Keyspace::new();
1782 ks.set("n".into(), Bytes::from("10"), None);
1783 let resp = test_dispatch(
1784 &mut ks,
1785 &ShardRequest::DecrBy {
1786 key: "n".into(),
1787 delta: 3,
1788 },
1789 );
1790 assert!(matches!(resp, ShardResponse::Integer(7)));
1791 }
1792
1793 #[test]
1794 fn dispatch_incrby_new_key() {
1795 let mut ks = Keyspace::new();
1796 let resp = test_dispatch(
1797 &mut ks,
1798 &ShardRequest::IncrBy {
1799 key: "new".into(),
1800 delta: 42,
1801 },
1802 );
1803 assert!(matches!(resp, ShardResponse::Integer(42)));
1804 }
1805
1806 #[test]
1807 fn dispatch_incrbyfloat() {
1808 let mut ks = Keyspace::new();
1809 ks.set("n".into(), Bytes::from("10.5"), None);
1810 let resp = test_dispatch(
1811 &mut ks,
1812 &ShardRequest::IncrByFloat {
1813 key: "n".into(),
1814 delta: 2.3,
1815 },
1816 );
1817 match resp {
1818 ShardResponse::BulkString(val) => {
1819 let f: f64 = val.parse().unwrap();
1820 assert!((f - 12.8).abs() < 0.001);
1821 }
1822 other => panic!("expected BulkString, got {other:?}"),
1823 }
1824 }
1825
1826 #[test]
1827 fn dispatch_append() {
1828 let mut ks = Keyspace::new();
1829 ks.set("k".into(), Bytes::from("hello"), None);
1830 let resp = test_dispatch(
1831 &mut ks,
1832 &ShardRequest::Append {
1833 key: "k".into(),
1834 value: Bytes::from(" world"),
1835 },
1836 );
1837 assert!(matches!(resp, ShardResponse::Len(11)));
1838 }
1839
1840 #[test]
1841 fn dispatch_strlen() {
1842 let mut ks = Keyspace::new();
1843 ks.set("k".into(), Bytes::from("hello"), None);
1844 let resp = test_dispatch(&mut ks, &ShardRequest::Strlen { key: "k".into() });
1845 assert!(matches!(resp, ShardResponse::Len(5)));
1846 }
1847
1848 #[test]
1849 fn dispatch_strlen_missing() {
1850 let mut ks = Keyspace::new();
1851 let resp = test_dispatch(&mut ks, &ShardRequest::Strlen { key: "nope".into() });
1852 assert!(matches!(resp, ShardResponse::Len(0)));
1853 }
1854
1855 #[test]
1856 fn to_aof_record_for_append() {
1857 let req = ShardRequest::Append {
1858 key: "k".into(),
1859 value: Bytes::from("data"),
1860 };
1861 let resp = ShardResponse::Len(10);
1862 let record = to_aof_record(&req, &resp).unwrap();
1863 match record {
1864 AofRecord::Append { key, value } => {
1865 assert_eq!(key, "k");
1866 assert_eq!(value, Bytes::from("data"));
1867 }
1868 other => panic!("expected Append, got {other:?}"),
1869 }
1870 }
1871
1872 #[test]
1873 fn dispatch_incrbyfloat_new_key() {
1874 let mut ks = Keyspace::new();
1875 let resp = test_dispatch(
1876 &mut ks,
1877 &ShardRequest::IncrByFloat {
1878 key: "new".into(),
1879 delta: 2.72,
1880 },
1881 );
1882 match resp {
1883 ShardResponse::BulkString(val) => {
1884 let f: f64 = val.parse().unwrap();
1885 assert!((f - 2.72).abs() < 0.001);
1886 }
1887 other => panic!("expected BulkString, got {other:?}"),
1888 }
1889 }
1890
1891 #[test]
1892 fn to_aof_record_for_incr() {
1893 let req = ShardRequest::Incr { key: "c".into() };
1894 let resp = ShardResponse::Integer(1);
1895 let record = to_aof_record(&req, &resp).unwrap();
1896 assert!(matches!(record, AofRecord::Incr { .. }));
1897 }
1898
1899 #[test]
1900 fn to_aof_record_for_decr() {
1901 let req = ShardRequest::Decr { key: "c".into() };
1902 let resp = ShardResponse::Integer(-1);
1903 let record = to_aof_record(&req, &resp).unwrap();
1904 assert!(matches!(record, AofRecord::Decr { .. }));
1905 }
1906
1907 #[test]
1908 fn to_aof_record_for_incrby() {
1909 let req = ShardRequest::IncrBy {
1910 key: "c".into(),
1911 delta: 5,
1912 };
1913 let resp = ShardResponse::Integer(15);
1914 let record = to_aof_record(&req, &resp).unwrap();
1915 match record {
1916 AofRecord::IncrBy { key, delta } => {
1917 assert_eq!(key, "c");
1918 assert_eq!(delta, 5);
1919 }
1920 other => panic!("expected IncrBy, got {other:?}"),
1921 }
1922 }
1923
1924 #[test]
1925 fn to_aof_record_for_decrby() {
1926 let req = ShardRequest::DecrBy {
1927 key: "c".into(),
1928 delta: 3,
1929 };
1930 let resp = ShardResponse::Integer(7);
1931 let record = to_aof_record(&req, &resp).unwrap();
1932 match record {
1933 AofRecord::DecrBy { key, delta } => {
1934 assert_eq!(key, "c");
1935 assert_eq!(delta, 3);
1936 }
1937 other => panic!("expected DecrBy, got {other:?}"),
1938 }
1939 }
1940
1941 #[test]
1942 fn dispatch_persist_removes_ttl() {
1943 let mut ks = Keyspace::new();
1944 ks.set(
1945 "key".into(),
1946 Bytes::from("val"),
1947 Some(Duration::from_secs(60)),
1948 );
1949
1950 let resp = test_dispatch(&mut ks, &ShardRequest::Persist { key: "key".into() });
1951 assert!(matches!(resp, ShardResponse::Bool(true)));
1952
1953 let resp = test_dispatch(&mut ks, &ShardRequest::Ttl { key: "key".into() });
1954 assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NoExpiry)));
1955 }
1956
1957 #[test]
1958 fn dispatch_persist_missing_key() {
1959 let mut ks = Keyspace::new();
1960 let resp = test_dispatch(&mut ks, &ShardRequest::Persist { key: "nope".into() });
1961 assert!(matches!(resp, ShardResponse::Bool(false)));
1962 }
1963
1964 #[test]
1965 fn dispatch_pttl() {
1966 let mut ks = Keyspace::new();
1967 ks.set(
1968 "key".into(),
1969 Bytes::from("val"),
1970 Some(Duration::from_secs(60)),
1971 );
1972
1973 let resp = test_dispatch(&mut ks, &ShardRequest::Pttl { key: "key".into() });
1974 match resp {
1975 ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
1976 assert!(ms > 59_000 && ms <= 60_000);
1977 }
1978 other => panic!("expected Ttl(Milliseconds), got {other:?}"),
1979 }
1980 }
1981
1982 #[test]
1983 fn dispatch_pttl_missing() {
1984 let mut ks = Keyspace::new();
1985 let resp = test_dispatch(&mut ks, &ShardRequest::Pttl { key: "nope".into() });
1986 assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
1987 }
1988
1989 #[test]
1990 fn dispatch_pexpire() {
1991 let mut ks = Keyspace::new();
1992 ks.set("key".into(), Bytes::from("val"), None);
1993
1994 let resp = test_dispatch(
1995 &mut ks,
1996 &ShardRequest::Pexpire {
1997 key: "key".into(),
1998 milliseconds: 5000,
1999 },
2000 );
2001 assert!(matches!(resp, ShardResponse::Bool(true)));
2002
2003 let resp = test_dispatch(&mut ks, &ShardRequest::Pttl { key: "key".into() });
2004 match resp {
2005 ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
2006 assert!(ms > 4000 && ms <= 5000);
2007 }
2008 other => panic!("expected Ttl(Milliseconds), got {other:?}"),
2009 }
2010 }
2011
2012 #[test]
2013 fn to_aof_record_for_persist() {
2014 let req = ShardRequest::Persist { key: "k".into() };
2015 let resp = ShardResponse::Bool(true);
2016 let record = to_aof_record(&req, &resp).unwrap();
2017 assert!(matches!(record, AofRecord::Persist { .. }));
2018 }
2019
2020 #[test]
2021 fn to_aof_record_skips_failed_persist() {
2022 let req = ShardRequest::Persist { key: "k".into() };
2023 let resp = ShardResponse::Bool(false);
2024 assert!(to_aof_record(&req, &resp).is_none());
2025 }
2026
2027 #[test]
2028 fn to_aof_record_for_pexpire() {
2029 let req = ShardRequest::Pexpire {
2030 key: "k".into(),
2031 milliseconds: 5000,
2032 };
2033 let resp = ShardResponse::Bool(true);
2034 let record = to_aof_record(&req, &resp).unwrap();
2035 match record {
2036 AofRecord::Pexpire { key, milliseconds } => {
2037 assert_eq!(key, "k");
2038 assert_eq!(milliseconds, 5000);
2039 }
2040 other => panic!("expected Pexpire, got {other:?}"),
2041 }
2042 }
2043
2044 #[test]
2045 fn to_aof_record_skips_failed_pexpire() {
2046 let req = ShardRequest::Pexpire {
2047 key: "k".into(),
2048 milliseconds: 5000,
2049 };
2050 let resp = ShardResponse::Bool(false);
2051 assert!(to_aof_record(&req, &resp).is_none());
2052 }
2053
2054 #[test]
2055 fn dispatch_set_nx_when_key_missing() {
2056 let mut ks = Keyspace::new();
2057 let resp = test_dispatch(
2058 &mut ks,
2059 &ShardRequest::Set {
2060 key: "k".into(),
2061 value: Bytes::from("v"),
2062 expire: None,
2063 nx: true,
2064 xx: false,
2065 },
2066 );
2067 assert!(matches!(resp, ShardResponse::Ok));
2068 assert!(ks.exists("k"));
2069 }
2070
2071 #[test]
2072 fn dispatch_set_nx_when_key_exists() {
2073 let mut ks = Keyspace::new();
2074 ks.set("k".into(), Bytes::from("old"), None);
2075
2076 let resp = test_dispatch(
2077 &mut ks,
2078 &ShardRequest::Set {
2079 key: "k".into(),
2080 value: Bytes::from("new"),
2081 expire: None,
2082 nx: true,
2083 xx: false,
2084 },
2085 );
2086 assert!(matches!(resp, ShardResponse::Value(None)));
2088 match ks.get("k").unwrap() {
2090 Some(Value::String(data)) => assert_eq!(data, Bytes::from("old")),
2091 other => panic!("expected old value, got {other:?}"),
2092 }
2093 }
2094
2095 #[test]
2096 fn dispatch_set_xx_when_key_exists() {
2097 let mut ks = Keyspace::new();
2098 ks.set("k".into(), Bytes::from("old"), None);
2099
2100 let resp = test_dispatch(
2101 &mut ks,
2102 &ShardRequest::Set {
2103 key: "k".into(),
2104 value: Bytes::from("new"),
2105 expire: None,
2106 nx: false,
2107 xx: true,
2108 },
2109 );
2110 assert!(matches!(resp, ShardResponse::Ok));
2111 match ks.get("k").unwrap() {
2112 Some(Value::String(data)) => assert_eq!(data, Bytes::from("new")),
2113 other => panic!("expected new value, got {other:?}"),
2114 }
2115 }
2116
2117 #[test]
2118 fn dispatch_set_xx_when_key_missing() {
2119 let mut ks = Keyspace::new();
2120 let resp = test_dispatch(
2121 &mut ks,
2122 &ShardRequest::Set {
2123 key: "k".into(),
2124 value: Bytes::from("v"),
2125 expire: None,
2126 nx: false,
2127 xx: true,
2128 },
2129 );
2130 assert!(matches!(resp, ShardResponse::Value(None)));
2132 assert!(!ks.exists("k"));
2133 }
2134
2135 #[test]
2136 fn to_aof_record_skips_nx_blocked_set() {
2137 let req = ShardRequest::Set {
2138 key: "k".into(),
2139 value: Bytes::from("v"),
2140 expire: None,
2141 nx: true,
2142 xx: false,
2143 };
2144 let resp = ShardResponse::Value(None);
2146 assert!(to_aof_record(&req, &resp).is_none());
2147 }
2148
2149 #[test]
2150 fn dispatch_flushdb_clears_all_keys() {
2151 let mut ks = Keyspace::new();
2152 ks.set("a".into(), Bytes::from("1"), None);
2153 ks.set("b".into(), Bytes::from("2"), None);
2154
2155 assert_eq!(ks.len(), 2);
2156
2157 let resp = test_dispatch(&mut ks, &ShardRequest::FlushDb);
2158 assert!(matches!(resp, ShardResponse::Ok));
2159 assert_eq!(ks.len(), 0);
2160 }
2161
2162 #[test]
2163 fn dispatch_scan_returns_keys() {
2164 let mut ks = Keyspace::new();
2165 ks.set("user:1".into(), Bytes::from("a"), None);
2166 ks.set("user:2".into(), Bytes::from("b"), None);
2167 ks.set("item:1".into(), Bytes::from("c"), None);
2168
2169 let resp = test_dispatch(
2170 &mut ks,
2171 &ShardRequest::Scan {
2172 cursor: 0,
2173 count: 10,
2174 pattern: None,
2175 },
2176 );
2177
2178 match resp {
2179 ShardResponse::Scan { cursor, keys } => {
2180 assert_eq!(cursor, 0); assert_eq!(keys.len(), 3);
2182 }
2183 _ => panic!("expected Scan response"),
2184 }
2185 }
2186
2187 #[test]
2188 fn dispatch_scan_with_pattern() {
2189 let mut ks = Keyspace::new();
2190 ks.set("user:1".into(), Bytes::from("a"), None);
2191 ks.set("user:2".into(), Bytes::from("b"), None);
2192 ks.set("item:1".into(), Bytes::from("c"), None);
2193
2194 let resp = test_dispatch(
2195 &mut ks,
2196 &ShardRequest::Scan {
2197 cursor: 0,
2198 count: 10,
2199 pattern: Some("user:*".into()),
2200 },
2201 );
2202
2203 match resp {
2204 ShardResponse::Scan { cursor, keys } => {
2205 assert_eq!(cursor, 0);
2206 assert_eq!(keys.len(), 2);
2207 for k in &keys {
2208 assert!(k.starts_with("user:"));
2209 }
2210 }
2211 _ => panic!("expected Scan response"),
2212 }
2213 }
2214
2215 #[test]
2216 fn to_aof_record_for_hset() {
2217 let req = ShardRequest::HSet {
2218 key: "h".into(),
2219 fields: vec![("f1".into(), Bytes::from("v1"))],
2220 };
2221 let resp = ShardResponse::Len(1);
2222 let record = to_aof_record(&req, &resp).unwrap();
2223 match record {
2224 AofRecord::HSet { key, fields } => {
2225 assert_eq!(key, "h");
2226 assert_eq!(fields.len(), 1);
2227 }
2228 _ => panic!("expected HSet record"),
2229 }
2230 }
2231
2232 #[test]
2233 fn to_aof_record_for_hdel() {
2234 let req = ShardRequest::HDel {
2235 key: "h".into(),
2236 fields: vec!["f1".into(), "f2".into()],
2237 };
2238 let resp = ShardResponse::HDelLen {
2239 count: 2,
2240 removed: vec!["f1".into(), "f2".into()],
2241 };
2242 let record = to_aof_record(&req, &resp).unwrap();
2243 match record {
2244 AofRecord::HDel { key, fields } => {
2245 assert_eq!(key, "h");
2246 assert_eq!(fields.len(), 2);
2247 }
2248 _ => panic!("expected HDel record"),
2249 }
2250 }
2251
2252 #[test]
2253 fn to_aof_record_skips_hdel_when_none_removed() {
2254 let req = ShardRequest::HDel {
2255 key: "h".into(),
2256 fields: vec!["f1".into()],
2257 };
2258 let resp = ShardResponse::HDelLen {
2259 count: 0,
2260 removed: vec![],
2261 };
2262 assert!(to_aof_record(&req, &resp).is_none());
2263 }
2264
2265 #[test]
2266 fn to_aof_record_for_hincrby() {
2267 let req = ShardRequest::HIncrBy {
2268 key: "h".into(),
2269 field: "counter".into(),
2270 delta: 5,
2271 };
2272 let resp = ShardResponse::Integer(10);
2273 let record = to_aof_record(&req, &resp).unwrap();
2274 match record {
2275 AofRecord::HIncrBy { key, field, delta } => {
2276 assert_eq!(key, "h");
2277 assert_eq!(field, "counter");
2278 assert_eq!(delta, 5);
2279 }
2280 _ => panic!("expected HIncrBy record"),
2281 }
2282 }
2283
2284 #[test]
2285 fn to_aof_record_for_sadd() {
2286 let req = ShardRequest::SAdd {
2287 key: "s".into(),
2288 members: vec!["m1".into(), "m2".into()],
2289 };
2290 let resp = ShardResponse::Len(2);
2291 let record = to_aof_record(&req, &resp).unwrap();
2292 match record {
2293 AofRecord::SAdd { key, members } => {
2294 assert_eq!(key, "s");
2295 assert_eq!(members.len(), 2);
2296 }
2297 _ => panic!("expected SAdd record"),
2298 }
2299 }
2300
2301 #[test]
2302 fn to_aof_record_skips_sadd_when_none_added() {
2303 let req = ShardRequest::SAdd {
2304 key: "s".into(),
2305 members: vec!["m1".into()],
2306 };
2307 let resp = ShardResponse::Len(0);
2308 assert!(to_aof_record(&req, &resp).is_none());
2309 }
2310
2311 #[test]
2312 fn to_aof_record_for_srem() {
2313 let req = ShardRequest::SRem {
2314 key: "s".into(),
2315 members: vec!["m1".into()],
2316 };
2317 let resp = ShardResponse::Len(1);
2318 let record = to_aof_record(&req, &resp).unwrap();
2319 match record {
2320 AofRecord::SRem { key, members } => {
2321 assert_eq!(key, "s");
2322 assert_eq!(members.len(), 1);
2323 }
2324 _ => panic!("expected SRem record"),
2325 }
2326 }
2327
2328 #[test]
2329 fn to_aof_record_skips_srem_when_none_removed() {
2330 let req = ShardRequest::SRem {
2331 key: "s".into(),
2332 members: vec!["m1".into()],
2333 };
2334 let resp = ShardResponse::Len(0);
2335 assert!(to_aof_record(&req, &resp).is_none());
2336 }
2337
2338 #[test]
2339 fn dispatch_keys() {
2340 let mut ks = Keyspace::new();
2341 ks.set("user:1".into(), Bytes::from("a"), None);
2342 ks.set("user:2".into(), Bytes::from("b"), None);
2343 ks.set("item:1".into(), Bytes::from("c"), None);
2344 let resp = test_dispatch(
2345 &mut ks,
2346 &ShardRequest::Keys {
2347 pattern: "user:*".into(),
2348 },
2349 );
2350 match resp {
2351 ShardResponse::StringArray(mut keys) => {
2352 keys.sort();
2353 assert_eq!(keys, vec!["user:1", "user:2"]);
2354 }
2355 other => panic!("expected StringArray, got {other:?}"),
2356 }
2357 }
2358
2359 #[test]
2360 fn dispatch_rename() {
2361 let mut ks = Keyspace::new();
2362 ks.set("old".into(), Bytes::from("value"), None);
2363 let resp = test_dispatch(
2364 &mut ks,
2365 &ShardRequest::Rename {
2366 key: "old".into(),
2367 newkey: "new".into(),
2368 },
2369 );
2370 assert!(matches!(resp, ShardResponse::Ok));
2371 assert!(!ks.exists("old"));
2372 assert!(ks.exists("new"));
2373 }
2374
2375 #[test]
2376 fn dispatch_rename_missing_key() {
2377 let mut ks = Keyspace::new();
2378 let resp = test_dispatch(
2379 &mut ks,
2380 &ShardRequest::Rename {
2381 key: "missing".into(),
2382 newkey: "new".into(),
2383 },
2384 );
2385 assert!(matches!(resp, ShardResponse::Err(_)));
2386 }
2387
2388 #[test]
2389 fn to_aof_record_for_rename() {
2390 let req = ShardRequest::Rename {
2391 key: "old".into(),
2392 newkey: "new".into(),
2393 };
2394 let resp = ShardResponse::Ok;
2395 let record = to_aof_record(&req, &resp).unwrap();
2396 match record {
2397 AofRecord::Rename { key, newkey } => {
2398 assert_eq!(key, "old");
2399 assert_eq!(newkey, "new");
2400 }
2401 other => panic!("expected Rename, got {other:?}"),
2402 }
2403 }
2404}