1mod aof;
47mod blocking;
48mod persistence;
49
50use std::collections::{HashMap, VecDeque};
51use std::path::PathBuf;
52use std::time::Duration;
53
54use bytes::Bytes;
55use ember_persistence::aof::{AofRecord, AofWriter, FsyncPolicy};
56use ember_persistence::recovery::{self, RecoveredValue};
57use ember_persistence::snapshot::{self, SnapEntry, SnapValue, SnapshotWriter};
58use smallvec::{smallvec, SmallVec};
59use tokio::sync::{broadcast, mpsc, oneshot};
60use tracing::{debug, error, info, warn};
61
62use crate::dropper::DropHandle;
63use crate::error::ShardError;
64use crate::expiry;
65use crate::keyspace::{
66 EvictionPolicy, IncrError, IncrFloatError, Keyspace, KeyspaceStats, LsetError, SetResult,
67 ShardConfig, TtlResult, WriteError,
68};
69use crate::types::sorted_set::{ScoreBound, ZAddFlags};
70use crate::types::Value;
71use ember_protocol::command::{BitOpKind, BitRange};
72
73const EXPIRY_TICK: Duration = Duration::from_millis(100);
76
77const FSYNC_INTERVAL: Duration = Duration::from_secs(1);
79
80#[derive(Debug, Clone)]
86pub struct ReplicationEvent {
87 pub shard_id: u16,
89 pub offset: u64,
91 pub record: AofRecord,
93}
94
95#[derive(Debug, Clone)]
97pub struct ShardPersistenceConfig {
98 pub data_dir: PathBuf,
100 pub append_only: bool,
102 pub fsync_policy: FsyncPolicy,
104 #[cfg(feature = "encryption")]
107 pub encryption_key: Option<ember_persistence::encryption::EncryptionKey>,
108}
109
110#[derive(Debug)]
112pub enum ShardRequest {
113 Get {
114 key: String,
115 },
116 Set {
117 key: String,
118 value: Bytes,
119 expire: Option<Duration>,
120 nx: bool,
122 xx: bool,
124 },
125 Incr {
126 key: String,
127 },
128 Decr {
129 key: String,
130 },
131 IncrBy {
132 key: String,
133 delta: i64,
134 },
135 DecrBy {
136 key: String,
137 delta: i64,
138 },
139 IncrByFloat {
140 key: String,
141 delta: f64,
142 },
143 Append {
144 key: String,
145 value: Bytes,
146 },
147 Strlen {
148 key: String,
149 },
150 GetRange {
151 key: String,
152 start: i64,
153 end: i64,
154 },
155 SetRange {
156 key: String,
157 offset: usize,
158 value: Bytes,
159 },
160 GetBit {
162 key: String,
163 offset: u64,
164 },
165 SetBit {
167 key: String,
168 offset: u64,
169 value: u8,
170 },
171 BitCount {
173 key: String,
174 range: Option<BitRange>,
175 },
176 BitPos {
178 key: String,
179 bit: u8,
180 range: Option<BitRange>,
181 },
182 BitOp {
184 op: BitOpKind,
185 dest: String,
186 keys: Vec<String>,
187 },
188 Keys {
190 pattern: String,
191 },
192 Rename {
194 key: String,
195 newkey: String,
196 },
197 Copy {
199 source: String,
200 destination: String,
201 replace: bool,
202 },
203 ObjectEncoding {
205 key: String,
206 },
207 Del {
208 key: String,
209 },
210 Unlink {
212 key: String,
213 },
214 Exists {
215 key: String,
216 },
217 RandomKey,
219 Touch {
221 key: String,
222 },
223 Sort {
225 key: String,
226 desc: bool,
227 alpha: bool,
228 limit: Option<(i64, i64)>,
229 },
230 Expire {
231 key: String,
232 seconds: u64,
233 },
234 Ttl {
235 key: String,
236 },
237 MemoryUsage {
239 key: String,
240 },
241 Persist {
242 key: String,
243 },
244 Pttl {
245 key: String,
246 },
247 Pexpire {
248 key: String,
249 milliseconds: u64,
250 },
251 Expireat {
253 key: String,
254 timestamp: u64,
255 },
256 Pexpireat {
258 key: String,
259 timestamp_ms: u64,
260 },
261 LPush {
262 key: String,
263 values: Vec<Bytes>,
264 },
265 RPush {
266 key: String,
267 values: Vec<Bytes>,
268 },
269 LPop {
270 key: String,
271 },
272 RPop {
273 key: String,
274 },
275 LPopCount {
277 key: String,
278 count: usize,
279 },
280 RPopCount {
282 key: String,
283 count: usize,
284 },
285 BLPop {
290 key: String,
291 waiter: mpsc::Sender<(String, Bytes)>,
292 },
293 BRPop {
295 key: String,
296 waiter: mpsc::Sender<(String, Bytes)>,
297 },
298 LRange {
299 key: String,
300 start: i64,
301 stop: i64,
302 },
303 LLen {
304 key: String,
305 },
306 LIndex {
307 key: String,
308 index: i64,
309 },
310 LSet {
311 key: String,
312 index: i64,
313 value: Bytes,
314 },
315 LTrim {
316 key: String,
317 start: i64,
318 stop: i64,
319 },
320 LInsert {
321 key: String,
322 before: bool,
323 pivot: Bytes,
324 value: Bytes,
325 },
326 LRem {
327 key: String,
328 count: i64,
329 value: Bytes,
330 },
331 LPos {
332 key: String,
333 element: Bytes,
334 rank: i64,
335 count: usize,
336 maxlen: usize,
337 },
338 Type {
339 key: String,
340 },
341 ZAdd {
342 key: String,
343 members: Vec<(f64, String)>,
344 nx: bool,
345 xx: bool,
346 gt: bool,
347 lt: bool,
348 ch: bool,
349 },
350 ZRem {
351 key: String,
352 members: Vec<String>,
353 },
354 ZScore {
355 key: String,
356 member: String,
357 },
358 ZRank {
359 key: String,
360 member: String,
361 },
362 ZRevRank {
363 key: String,
364 member: String,
365 },
366 ZCard {
367 key: String,
368 },
369 ZRange {
370 key: String,
371 start: i64,
372 stop: i64,
373 with_scores: bool,
374 },
375 ZRevRange {
376 key: String,
377 start: i64,
378 stop: i64,
379 with_scores: bool,
380 },
381 ZCount {
382 key: String,
383 min: ScoreBound,
384 max: ScoreBound,
385 },
386 ZIncrBy {
387 key: String,
388 increment: f64,
389 member: String,
390 },
391 ZRangeByScore {
392 key: String,
393 min: ScoreBound,
394 max: ScoreBound,
395 offset: usize,
396 count: Option<usize>,
397 },
398 ZRevRangeByScore {
399 key: String,
400 min: ScoreBound,
401 max: ScoreBound,
402 offset: usize,
403 count: Option<usize>,
404 },
405 ZPopMin {
406 key: String,
407 count: usize,
408 },
409 ZPopMax {
410 key: String,
411 count: usize,
412 },
413 LmpopSingle {
415 key: String,
416 left: bool,
417 count: usize,
418 },
419 ZmpopSingle {
421 key: String,
422 min: bool,
423 count: usize,
424 },
425 HSet {
426 key: String,
427 fields: Vec<(String, Bytes)>,
428 },
429 HGet {
430 key: String,
431 field: String,
432 },
433 HGetAll {
434 key: String,
435 },
436 HDel {
437 key: String,
438 fields: Vec<String>,
439 },
440 HExists {
441 key: String,
442 field: String,
443 },
444 HLen {
445 key: String,
446 },
447 HIncrBy {
448 key: String,
449 field: String,
450 delta: i64,
451 },
452 HIncrByFloat {
454 key: String,
455 field: String,
456 delta: f64,
457 },
458 HKeys {
459 key: String,
460 },
461 HVals {
462 key: String,
463 },
464 HMGet {
465 key: String,
466 fields: Vec<String>,
467 },
468 HRandField {
470 key: String,
471 count: Option<i64>,
472 with_values: bool,
473 },
474 SAdd {
475 key: String,
476 members: Vec<String>,
477 },
478 SRem {
479 key: String,
480 members: Vec<String>,
481 },
482 SMembers {
483 key: String,
484 },
485 SIsMember {
486 key: String,
487 member: String,
488 },
489 SCard {
490 key: String,
491 },
492 SUnion {
493 keys: Vec<String>,
494 },
495 SInter {
496 keys: Vec<String>,
497 },
498 SDiff {
499 keys: Vec<String>,
500 },
501 SUnionStore {
502 dest: String,
503 keys: Vec<String>,
504 },
505 SInterStore {
506 dest: String,
507 keys: Vec<String>,
508 },
509 SDiffStore {
510 dest: String,
511 keys: Vec<String>,
512 },
513 SRandMember {
514 key: String,
515 count: i64,
516 },
517 SPop {
518 key: String,
519 count: usize,
520 },
521 SMisMember {
522 key: String,
523 members: Vec<String>,
524 },
525 SMove {
527 source: String,
528 destination: String,
529 member: String,
530 },
531 SInterCard {
533 keys: Vec<String>,
534 limit: usize,
535 },
536 Expiretime {
538 key: String,
539 },
540 Pexpiretime {
542 key: String,
543 },
544 LMove {
546 source: String,
547 destination: String,
548 src_left: bool,
549 dst_left: bool,
550 },
551 GetDel {
553 key: String,
554 },
555 GetSet {
557 key: String,
558 value: Bytes,
559 },
560 MSetNx {
562 pairs: Vec<(String, Bytes)>,
563 },
564 GetEx {
568 key: String,
569 expire: Option<Option<u64>>,
570 },
571 ZDiff {
573 keys: Vec<String>,
574 },
575 ZInter {
577 keys: Vec<String>,
578 },
579 ZUnion {
581 keys: Vec<String>,
582 },
583 ZDiffStore {
585 dest: String,
586 keys: Vec<String>,
587 },
588 ZInterStore {
590 dest: String,
591 keys: Vec<String>,
592 },
593 ZUnionStore {
595 dest: String,
596 keys: Vec<String>,
597 },
598 ZRandMember {
600 key: String,
601 count: Option<i64>,
602 with_scores: bool,
603 },
604 DbSize,
606 Stats,
608 KeyVersion {
611 key: String,
612 },
613 UpdateMemoryConfig {
618 max_memory: Option<usize>,
619 eviction_policy: EvictionPolicy,
620 },
621 Snapshot,
623 SerializeSnapshot,
628 RewriteAof,
630 FlushDb,
632 FlushDbAsync,
634 Scan {
636 cursor: u64,
637 count: usize,
638 pattern: Option<String>,
639 },
640 SScan {
642 key: String,
643 cursor: u64,
644 count: usize,
645 pattern: Option<String>,
646 },
647 HScan {
649 key: String,
650 cursor: u64,
651 count: usize,
652 pattern: Option<String>,
653 },
654 ZScan {
656 key: String,
657 cursor: u64,
658 count: usize,
659 pattern: Option<String>,
660 },
661 CountKeysInSlot {
663 slot: u16,
664 },
665 GetKeysInSlot {
667 slot: u16,
668 count: usize,
669 },
670 DumpKey {
672 key: String,
673 },
674 RestoreKey {
676 key: String,
677 ttl_ms: u64,
678 data: bytes::Bytes,
679 replace: bool,
680 },
681 #[cfg(feature = "vector")]
683 VAdd {
684 key: String,
685 element: String,
686 vector: Vec<f32>,
687 metric: u8,
688 quantization: u8,
689 connectivity: u32,
690 expansion_add: u32,
691 },
692 #[cfg(feature = "vector")]
694 VAddBatch {
695 key: String,
696 entries: Vec<(String, Vec<f32>)>,
697 dim: usize,
698 metric: u8,
699 quantization: u8,
700 connectivity: u32,
701 expansion_add: u32,
702 },
703 #[cfg(feature = "vector")]
705 VSim {
706 key: String,
707 query: Vec<f32>,
708 count: usize,
709 ef_search: usize,
710 },
711 #[cfg(feature = "vector")]
713 VRem {
714 key: String,
715 element: String,
716 },
717 #[cfg(feature = "vector")]
719 VGet {
720 key: String,
721 element: String,
722 },
723 #[cfg(feature = "vector")]
725 VCard {
726 key: String,
727 },
728 #[cfg(feature = "vector")]
730 VDim {
731 key: String,
732 },
733 #[cfg(feature = "vector")]
735 VInfo {
736 key: String,
737 },
738 #[cfg(feature = "protobuf")]
740 ProtoSet {
741 key: String,
742 type_name: String,
743 data: Bytes,
744 expire: Option<Duration>,
745 nx: bool,
746 xx: bool,
747 },
748 #[cfg(feature = "protobuf")]
750 ProtoGet {
751 key: String,
752 },
753 #[cfg(feature = "protobuf")]
755 ProtoType {
756 key: String,
757 },
758 #[cfg(feature = "protobuf")]
762 ProtoRegisterAof {
763 name: String,
764 descriptor: Bytes,
765 },
766 #[cfg(feature = "protobuf")]
769 ProtoSetField {
770 key: String,
771 field_path: String,
772 value: String,
773 },
774 #[cfg(feature = "protobuf")]
777 ProtoDelField {
778 key: String,
779 field_path: String,
780 },
781}
782
783impl ShardRequest {
784 fn is_write(&self) -> bool {
788 #[allow(unreachable_patterns)]
789 match self {
790 ShardRequest::Set { .. }
791 | ShardRequest::Incr { .. }
792 | ShardRequest::Decr { .. }
793 | ShardRequest::IncrBy { .. }
794 | ShardRequest::DecrBy { .. }
795 | ShardRequest::IncrByFloat { .. }
796 | ShardRequest::Append { .. }
797 | ShardRequest::SetBit { .. }
798 | ShardRequest::BitOp { .. }
799 | ShardRequest::Del { .. }
800 | ShardRequest::Unlink { .. }
801 | ShardRequest::Rename { .. }
802 | ShardRequest::Copy { .. }
803 | ShardRequest::Expire { .. }
804 | ShardRequest::Expireat { .. }
805 | ShardRequest::Persist { .. }
806 | ShardRequest::Pexpire { .. }
807 | ShardRequest::Pexpireat { .. }
808 | ShardRequest::LPush { .. }
809 | ShardRequest::RPush { .. }
810 | ShardRequest::LPop { .. }
811 | ShardRequest::RPop { .. }
812 | ShardRequest::LPopCount { .. }
813 | ShardRequest::RPopCount { .. }
814 | ShardRequest::LSet { .. }
815 | ShardRequest::LTrim { .. }
816 | ShardRequest::LInsert { .. }
817 | ShardRequest::LRem { .. }
818 | ShardRequest::BLPop { .. }
819 | ShardRequest::BRPop { .. }
820 | ShardRequest::ZAdd { .. }
821 | ShardRequest::ZRem { .. }
822 | ShardRequest::ZIncrBy { .. }
823 | ShardRequest::ZPopMin { .. }
824 | ShardRequest::ZPopMax { .. }
825 | ShardRequest::LmpopSingle { .. }
826 | ShardRequest::ZmpopSingle { .. }
827 | ShardRequest::HSet { .. }
828 | ShardRequest::HDel { .. }
829 | ShardRequest::HIncrBy { .. }
830 | ShardRequest::HIncrByFloat { .. }
831 | ShardRequest::SAdd { .. }
832 | ShardRequest::SRem { .. }
833 | ShardRequest::SPop { .. }
834 | ShardRequest::SUnionStore { .. }
835 | ShardRequest::SInterStore { .. }
836 | ShardRequest::SDiffStore { .. }
837 | ShardRequest::SMove { .. }
838 | ShardRequest::ZDiffStore { .. }
839 | ShardRequest::ZInterStore { .. }
840 | ShardRequest::ZUnionStore { .. }
841 | ShardRequest::LMove { .. }
842 | ShardRequest::GetDel { .. }
843 | ShardRequest::GetEx { .. }
844 | ShardRequest::GetSet { .. }
845 | ShardRequest::MSetNx { .. }
846 | ShardRequest::FlushDb
847 | ShardRequest::FlushDbAsync
848 | ShardRequest::RestoreKey { .. } => true,
849 #[cfg(feature = "protobuf")]
850 ShardRequest::ProtoSet { .. }
851 | ShardRequest::ProtoRegisterAof { .. }
852 | ShardRequest::ProtoSetField { .. }
853 | ShardRequest::ProtoDelField { .. } => true,
854 #[cfg(feature = "vector")]
855 ShardRequest::VAdd { .. }
856 | ShardRequest::VAddBatch { .. }
857 | ShardRequest::VRem { .. } => true,
858 _ => false,
859 }
860 }
861}
862
863#[derive(Debug)]
865pub enum ShardResponse {
866 Value(Option<Value>),
868 Ok,
870 Integer(i64),
872 Bool(bool),
874 Ttl(TtlResult),
876 OutOfMemory,
878 KeyCount(usize),
880 Stats(KeyspaceStats),
882 Len(usize),
884 Array(Vec<Bytes>),
886 TypeName(&'static str),
888 EncodingName(Option<&'static str>),
890 ZAddLen {
892 count: usize,
893 applied: Vec<(f64, String)>,
894 },
895 ZRemLen { count: usize, removed: Vec<String> },
897 Score(Option<f64>),
899 Rank(Option<usize>),
901 ScoredArray(Vec<(String, f64)>),
903 ZIncrByResult { new_score: f64, member: String },
905 ZPopResult(Vec<(String, f64)>),
907 BulkString(String),
909 WrongType,
911 Err(String),
913 Scan { cursor: u64, keys: Vec<String> },
915 CollectionScan { cursor: u64, items: Vec<Bytes> },
917 HashFields(Vec<(String, Bytes)>),
919 HRandFieldResult(Vec<(String, Option<Bytes>)>),
921 ZRandMemberResult(Vec<(String, Option<f64>)>),
923 HDelLen { count: usize, removed: Vec<String> },
925 StringArray(Vec<String>),
927 IntegerArray(Vec<i64>),
929 BoolArray(Vec<bool>),
931 SetStoreResult { count: usize, members: Vec<String> },
933 ZStoreResult {
935 count: usize,
936 members: Vec<(f64, String)>,
938 },
939 KeyDump { data: Vec<u8>, ttl_ms: i64 },
941 SnapshotData { shard_id: u16, data: Vec<u8> },
943 OptionalArray(Vec<Option<Bytes>>),
945 #[cfg(feature = "vector")]
947 VAddResult {
948 element: String,
949 vector: Vec<f32>,
950 added: bool,
951 },
952 #[cfg(feature = "vector")]
954 VAddBatchResult {
955 added_count: usize,
956 applied: Vec<(String, Vec<f32>)>,
957 },
958 #[cfg(feature = "vector")]
960 VSimResult(Vec<(String, f32)>),
961 #[cfg(feature = "vector")]
963 VectorData(Option<Vec<f32>>),
964 #[cfg(feature = "vector")]
966 VectorInfo(Option<Vec<(String, String)>>),
967 #[cfg(feature = "protobuf")]
969 ProtoValue(Option<(String, Bytes, Option<Duration>)>),
970 #[cfg(feature = "protobuf")]
972 ProtoTypeName(Option<String>),
973 #[cfg(feature = "protobuf")]
976 ProtoFieldUpdated {
977 type_name: String,
978 data: Bytes,
979 expire: Option<Duration>,
980 },
981 Version(Option<u64>),
983}
984
985#[derive(Debug)]
996pub enum ShardMessage {
997 Single {
999 request: ShardRequest,
1000 reply: oneshot::Sender<ShardResponse>,
1001 },
1002 SingleReusable {
1008 request: ShardRequest,
1009 reply: mpsc::Sender<ShardResponse>,
1010 },
1011 Batch(Vec<(ShardRequest, oneshot::Sender<ShardResponse>)>),
1013}
1014
1015#[derive(Debug, Clone)]
1020pub struct ShardHandle {
1021 tx: mpsc::Sender<ShardMessage>,
1022}
1023
1024impl ShardHandle {
1025 pub async fn send(&self, request: ShardRequest) -> Result<ShardResponse, ShardError> {
1029 let rx = self.dispatch(request).await?;
1030 rx.await.map_err(|_| ShardError::Unavailable)
1031 }
1032
1033 pub async fn dispatch(
1038 &self,
1039 request: ShardRequest,
1040 ) -> Result<oneshot::Receiver<ShardResponse>, ShardError> {
1041 let (reply_tx, reply_rx) = oneshot::channel();
1042 let msg = ShardMessage::Single {
1043 request,
1044 reply: reply_tx,
1045 };
1046 self.tx
1047 .send(msg)
1048 .await
1049 .map_err(|_| ShardError::Unavailable)?;
1050 Ok(reply_rx)
1051 }
1052
1053 pub async fn dispatch_reusable(
1059 &self,
1060 request: ShardRequest,
1061 reply: mpsc::Sender<ShardResponse>,
1062 ) -> Result<(), ShardError> {
1063 self.tx
1064 .send(ShardMessage::SingleReusable { request, reply })
1065 .await
1066 .map_err(|_| ShardError::Unavailable)
1067 }
1068
1069 pub async fn dispatch_batch(
1077 &self,
1078 requests: Vec<ShardRequest>,
1079 ) -> Result<Vec<oneshot::Receiver<ShardResponse>>, ShardError> {
1080 if requests.len() == 1 {
1081 let rx = self
1082 .dispatch(requests.into_iter().next().expect("len == 1"))
1083 .await?;
1084 return Ok(vec![rx]);
1085 }
1086 let mut receivers = Vec::with_capacity(requests.len());
1087 let mut entries = Vec::with_capacity(requests.len());
1088 for request in requests {
1089 let (tx, rx) = oneshot::channel();
1090 entries.push((request, tx));
1091 receivers.push(rx);
1092 }
1093 self.tx
1094 .send(ShardMessage::Batch(entries))
1095 .await
1096 .map_err(|_| ShardError::Unavailable)?;
1097 Ok(receivers)
1098 }
1099}
1100
1101pub struct PreparedShard {
1108 rx: mpsc::Receiver<ShardMessage>,
1109 config: ShardConfig,
1110 persistence: Option<ShardPersistenceConfig>,
1111 drop_handle: Option<DropHandle>,
1112 replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
1113 expired_tx: Option<broadcast::Sender<String>>,
1115 #[cfg(feature = "protobuf")]
1116 schema_registry: Option<crate::schema::SharedSchemaRegistry>,
1117}
1118
1119pub fn prepare_shard(
1124 buffer: usize,
1125 config: ShardConfig,
1126 persistence: Option<ShardPersistenceConfig>,
1127 drop_handle: Option<DropHandle>,
1128 replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
1129 expired_tx: Option<broadcast::Sender<String>>,
1130 #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
1131) -> (ShardHandle, PreparedShard) {
1132 let (tx, rx) = mpsc::channel(buffer);
1133 let prepared = PreparedShard {
1134 rx,
1135 config,
1136 persistence,
1137 drop_handle,
1138 replication_tx,
1139 expired_tx,
1140 #[cfg(feature = "protobuf")]
1141 schema_registry,
1142 };
1143 (ShardHandle { tx }, prepared)
1144}
1145
1146pub async fn run_prepared(prepared: PreparedShard) {
1151 run_shard(
1152 prepared.rx,
1153 prepared.config,
1154 prepared.persistence,
1155 prepared.drop_handle,
1156 prepared.replication_tx,
1157 prepared.expired_tx,
1158 #[cfg(feature = "protobuf")]
1159 prepared.schema_registry,
1160 )
1161 .await
1162}
1163
1164pub fn spawn_shard(
1173 buffer: usize,
1174 config: ShardConfig,
1175 persistence: Option<ShardPersistenceConfig>,
1176 drop_handle: Option<DropHandle>,
1177 replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
1178 expired_tx: Option<broadcast::Sender<String>>,
1179 #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
1180) -> ShardHandle {
1181 let (handle, prepared) = prepare_shard(
1182 buffer,
1183 config,
1184 persistence,
1185 drop_handle,
1186 replication_tx,
1187 expired_tx,
1188 #[cfg(feature = "protobuf")]
1189 schema_registry,
1190 );
1191 tokio::spawn(run_prepared(prepared));
1192 handle
1193}
1194
1195async fn run_shard(
1198 mut rx: mpsc::Receiver<ShardMessage>,
1199 config: ShardConfig,
1200 persistence: Option<ShardPersistenceConfig>,
1201 drop_handle: Option<DropHandle>,
1202 replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
1203 expired_tx: Option<broadcast::Sender<String>>,
1204 #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
1205) {
1206 let shard_id = config.shard_id;
1207 let mut keyspace = Keyspace::with_config(config);
1208
1209 if let Some(handle) = drop_handle.clone() {
1210 keyspace.set_drop_handle(handle);
1211 }
1212
1213 if let Some(ref pcfg) = persistence {
1215 #[cfg(feature = "encryption")]
1216 let result = if let Some(ref key) = pcfg.encryption_key {
1217 recovery::recover_shard_encrypted(&pcfg.data_dir, shard_id, key.clone())
1218 } else {
1219 recovery::recover_shard(&pcfg.data_dir, shard_id)
1220 };
1221 #[cfg(not(feature = "encryption"))]
1222 let result = recovery::recover_shard(&pcfg.data_dir, shard_id);
1223 let count = result.entries.len();
1224 for entry in result.entries {
1225 let value = match entry.value {
1226 RecoveredValue::String(data) => Value::String(data),
1227 RecoveredValue::List(deque) => Value::List(deque),
1228 RecoveredValue::SortedSet(members) => {
1229 let mut ss = crate::types::sorted_set::SortedSet::new();
1230 for (score, member) in members {
1231 ss.add(&member, score);
1232 }
1233 Value::SortedSet(Box::new(ss))
1234 }
1235 RecoveredValue::Hash(map) => {
1236 Value::Hash(Box::new(crate::types::hash::HashValue::from(map)))
1237 }
1238 RecoveredValue::Set(set) => Value::Set(Box::new(set)),
1239 #[cfg(feature = "vector")]
1240 RecoveredValue::Vector {
1241 metric,
1242 quantization,
1243 connectivity,
1244 expansion_add,
1245 elements,
1246 } => {
1247 use crate::types::vector::{DistanceMetric, QuantizationType, VectorSet};
1248 let dim = elements.first().map(|(_, v)| v.len()).unwrap_or(0);
1249 match VectorSet::new(
1250 dim,
1251 DistanceMetric::from_u8(metric),
1252 QuantizationType::from_u8(quantization),
1253 connectivity as usize,
1254 expansion_add as usize,
1255 ) {
1256 Ok(mut vs) => {
1257 for (element, vector) in elements {
1258 if let Err(e) = vs.add(element, &vector) {
1259 warn!("vector recovery: failed to add element: {e}");
1260 }
1261 }
1262 Value::Vector(vs)
1263 }
1264 Err(e) => {
1265 warn!("vector recovery: failed to create index: {e}");
1266 continue;
1267 }
1268 }
1269 }
1270 #[cfg(feature = "protobuf")]
1271 RecoveredValue::Proto { type_name, data } => Value::Proto { type_name, data },
1272 };
1273 keyspace.restore(entry.key, value, entry.ttl);
1274 }
1275 if count > 0 {
1276 info!(
1277 shard_id,
1278 recovered_keys = count,
1279 snapshot = result.loaded_snapshot,
1280 aof = result.replayed_aof,
1281 "recovered shard state"
1282 );
1283 }
1284
1285 #[cfg(feature = "protobuf")]
1287 if let Some(ref registry) = schema_registry {
1288 if !result.schemas.is_empty() {
1289 if let Ok(mut reg) = registry.write() {
1290 let schema_count = result.schemas.len();
1291 for (name, descriptor) in result.schemas {
1292 reg.restore(name, descriptor);
1293 }
1294 info!(
1295 shard_id,
1296 schemas = schema_count,
1297 "restored schemas from AOF"
1298 );
1299 }
1300 }
1301 }
1302 }
1303
1304 let mut aof_writer: Option<AofWriter> = match &persistence {
1306 Some(pcfg) if pcfg.append_only => {
1307 let path = ember_persistence::aof::aof_path(&pcfg.data_dir, shard_id);
1308 #[cfg(feature = "encryption")]
1309 let result = if let Some(ref key) = pcfg.encryption_key {
1310 AofWriter::open_encrypted(path, key.clone())
1311 } else {
1312 AofWriter::open(path)
1313 };
1314 #[cfg(not(feature = "encryption"))]
1315 let result = AofWriter::open(path);
1316 match result {
1317 Ok(w) => Some(w),
1318 Err(e) => {
1319 warn!(shard_id, "failed to open AOF writer: {e}");
1320 None
1321 }
1322 }
1323 }
1324 _ => None,
1325 };
1326
1327 let fsync_policy = persistence
1328 .as_ref()
1329 .map(|p| p.fsync_policy)
1330 .unwrap_or(FsyncPolicy::No);
1331
1332 let mut replication_offset: u64 = 0;
1334
1335 let mut lpop_waiters: HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>> = HashMap::new();
1337 let mut rpop_waiters: HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>> = HashMap::new();
1338
1339 let mut aof_errors: u32 = 0;
1341
1342 let mut disk_full: bool = false;
1345
1346 let mut expiry_tick = tokio::time::interval(EXPIRY_TICK);
1348 expiry_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1349
1350 let mut fsync_tick = tokio::time::interval(FSYNC_INTERVAL);
1351 fsync_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1352
1353 loop {
1354 tokio::select! {
1355 msg = rx.recv() => {
1356 match msg {
1357 Some(msg) => {
1358 let mut ctx = ProcessCtx {
1359 keyspace: &mut keyspace,
1360 aof_writer: &mut aof_writer,
1361 fsync_policy,
1362 persistence: &persistence,
1363 drop_handle: &drop_handle,
1364 shard_id,
1365 replication_tx: &replication_tx,
1366 replication_offset: &mut replication_offset,
1367 lpop_waiters: &mut lpop_waiters,
1368 rpop_waiters: &mut rpop_waiters,
1369 aof_errors: &mut aof_errors,
1370 disk_full: &mut disk_full,
1371 #[cfg(feature = "protobuf")]
1372 schema_registry: &schema_registry,
1373 };
1374 process_message(msg, &mut ctx);
1375
1376 while let Ok(msg) = rx.try_recv() {
1381 process_message(msg, &mut ctx);
1382 }
1383 }
1384 None => break, }
1386 }
1387 _ = expiry_tick.tick() => {
1388 let expired_keys = expiry::run_expiration_cycle(&mut keyspace);
1389 if let Some(ref tx) = expired_tx {
1390 if !expired_keys.is_empty() && tx.receiver_count() > 0 {
1391 for key in &expired_keys {
1392 let _ = tx.send(key.clone());
1393 }
1394 }
1395 }
1396 }
1397 _ = fsync_tick.tick(), if fsync_policy == FsyncPolicy::EverySec => {
1398 if let Some(ref mut writer) = aof_writer {
1399 if let Err(e) = writer.sync() {
1400 if aof::log_aof_error(shard_id, &mut aof_errors, "sync", &e) {
1401 disk_full = true;
1402 }
1403 } else if aof_errors > 0 {
1404 let missed = aof_errors;
1405 aof_errors = 0;
1406 if disk_full {
1407 disk_full = false;
1408 info!(shard_id, missed_errors = missed, "aof sync recovered, accepting writes again");
1409 } else {
1410 info!(shard_id, missed_errors = missed, "aof sync recovered");
1411 }
1412 }
1413 }
1414 }
1415 }
1416 }
1417
1418 if let Some(ref mut writer) = aof_writer {
1420 let _ = writer.sync();
1421 }
1422}
1423
1424struct ProcessCtx<'a> {
1429 keyspace: &'a mut Keyspace,
1430 aof_writer: &'a mut Option<AofWriter>,
1431 fsync_policy: FsyncPolicy,
1432 persistence: &'a Option<ShardPersistenceConfig>,
1433 drop_handle: &'a Option<DropHandle>,
1434 shard_id: u16,
1435 replication_tx: &'a Option<broadcast::Sender<ReplicationEvent>>,
1436 replication_offset: &'a mut u64,
1437 lpop_waiters: &'a mut HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>>,
1439 rpop_waiters: &'a mut HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>>,
1441 aof_errors: &'a mut u32,
1444 disk_full: &'a mut bool,
1446 #[cfg(feature = "protobuf")]
1447 schema_registry: &'a Option<crate::schema::SharedSchemaRegistry>,
1448}
1449
1450fn process_message(msg: ShardMessage, ctx: &mut ProcessCtx<'_>) {
1455 match msg {
1456 ShardMessage::Single { request, reply } => {
1457 process_single(request, ReplySender::Oneshot(reply), ctx);
1458 }
1459 ShardMessage::SingleReusable { request, reply } => {
1460 process_single(request, ReplySender::Reusable(reply), ctx);
1461 }
1462 ShardMessage::Batch(entries) => {
1463 for (request, reply) in entries {
1464 process_single(request, ReplySender::Oneshot(reply), ctx);
1465 }
1466 }
1467 }
1468}
1469
1470enum ReplySender {
1475 Oneshot(oneshot::Sender<ShardResponse>),
1476 Reusable(mpsc::Sender<ShardResponse>),
1477}
1478
1479impl ReplySender {
1480 fn send(self, response: ShardResponse) {
1481 match self {
1482 ReplySender::Oneshot(tx) => {
1483 let _ = tx.send(response);
1484 }
1485 ReplySender::Reusable(tx) => {
1486 if let Err(e) = tx.try_send(response) {
1490 debug!("reusable reply channel full or closed: {e}");
1491 }
1492 }
1493 }
1494 }
1495}
1496
1497fn process_single(mut request: ShardRequest, reply: ReplySender, ctx: &mut ProcessCtx<'_>) {
1500 let fsync_policy = ctx.fsync_policy;
1502 let shard_id = ctx.shard_id;
1503
1504 if *ctx.disk_full && ctx.aof_writer.is_some() && request.is_write() {
1507 reply.send(ShardResponse::Err(
1508 "ERR disk full, write rejected — free disk space to resume writes".into(),
1509 ));
1510 return;
1511 }
1512
1513 match request {
1517 ShardRequest::BLPop { key, waiter } => {
1518 blocking::handle_blocking_pop(&key, waiter, true, reply, ctx);
1519 return;
1520 }
1521 ShardRequest::BRPop { key, waiter } => {
1522 blocking::handle_blocking_pop(&key, waiter, false, reply, ctx);
1523 return;
1524 }
1525 _ => {}
1526 }
1527
1528 let request_kind = describe_request(&request);
1529 let mut response = dispatch(
1530 ctx.keyspace,
1531 &mut request,
1532 #[cfg(feature = "protobuf")]
1533 ctx.schema_registry,
1534 );
1535
1536 if let ShardRequest::LPush { ref key, .. } | ShardRequest::RPush { ref key, .. } = request {
1539 if matches!(response, ShardResponse::Len(_)) {
1540 blocking::wake_blocked_waiters(key, ctx);
1541 }
1542 }
1543
1544 let records = aof::to_aof_records(request, &mut response);
1548
1549 if let Some(ref mut writer) = *ctx.aof_writer {
1551 let mut batch_ok = true;
1552 for record in &records {
1553 if let Err(e) = writer.write_record(record) {
1554 if aof::log_aof_error(shard_id, ctx.aof_errors, "write", &e) {
1555 *ctx.disk_full = true;
1556 }
1557 batch_ok = false;
1558 }
1559 }
1560 if !records.is_empty() && fsync_policy == FsyncPolicy::Always {
1561 if let Err(e) = writer.sync() {
1562 if aof::log_aof_error(shard_id, ctx.aof_errors, "sync", &e) {
1563 *ctx.disk_full = true;
1564 }
1565 batch_ok = false;
1566 }
1567 }
1568 if batch_ok && *ctx.aof_errors > 0 {
1569 let missed = *ctx.aof_errors;
1570 *ctx.aof_errors = 0;
1571 *ctx.disk_full = false;
1572 info!(shard_id, missed_errors = missed, "aof writes recovered");
1573 }
1574 }
1575
1576 if let Some(ref tx) = *ctx.replication_tx {
1578 for record in records {
1579 *ctx.replication_offset += 1;
1580 let _ = tx.send(ReplicationEvent {
1582 shard_id,
1583 offset: *ctx.replication_offset,
1584 record,
1585 });
1586 }
1587 }
1588
1589 match request_kind {
1591 RequestKind::Snapshot => {
1592 let resp = persistence::handle_snapshot(ctx.keyspace, ctx.persistence, shard_id);
1593 reply.send(resp);
1594 return;
1595 }
1596 RequestKind::SerializeSnapshot => {
1597 let resp = persistence::handle_serialize_snapshot(ctx.keyspace, shard_id);
1598 reply.send(resp);
1599 return;
1600 }
1601 RequestKind::RewriteAof => {
1602 let resp = persistence::handle_rewrite(
1603 ctx.keyspace,
1604 ctx.persistence,
1605 ctx.aof_writer,
1606 shard_id,
1607 #[cfg(feature = "protobuf")]
1608 ctx.schema_registry,
1609 );
1610 reply.send(resp);
1611 return;
1612 }
1613 RequestKind::FlushDbAsync => {
1614 let old_entries = ctx.keyspace.flush_async();
1615 if let Some(ref handle) = *ctx.drop_handle {
1616 handle.defer_entries(old_entries);
1617 }
1618 reply.send(ShardResponse::Ok);
1619 return;
1620 }
1621 RequestKind::UpdateMemoryConfig {
1622 max_memory,
1623 eviction_policy,
1624 } => {
1625 ctx.keyspace
1626 .update_memory_config(max_memory, eviction_policy);
1627 reply.send(ShardResponse::Ok);
1628 return;
1629 }
1630 RequestKind::Other => {}
1631 }
1632
1633 reply.send(response);
1634}
1635
1636enum RequestKind {
1639 Snapshot,
1640 SerializeSnapshot,
1641 RewriteAof,
1642 FlushDbAsync,
1643 UpdateMemoryConfig {
1644 max_memory: Option<usize>,
1645 eviction_policy: EvictionPolicy,
1646 },
1647 Other,
1648}
1649
1650fn describe_request(req: &ShardRequest) -> RequestKind {
1651 match req {
1652 ShardRequest::Snapshot => RequestKind::Snapshot,
1653 ShardRequest::SerializeSnapshot => RequestKind::SerializeSnapshot,
1654 ShardRequest::RewriteAof => RequestKind::RewriteAof,
1655 ShardRequest::FlushDbAsync => RequestKind::FlushDbAsync,
1656 ShardRequest::UpdateMemoryConfig {
1657 max_memory,
1658 eviction_policy,
1659 } => RequestKind::UpdateMemoryConfig {
1660 max_memory: *max_memory,
1661 eviction_policy: *eviction_policy,
1662 },
1663 _ => RequestKind::Other,
1664 }
1665}
1666
1667fn incr_result(result: Result<i64, IncrError>) -> ShardResponse {
1669 match result {
1670 Ok(val) => ShardResponse::Integer(val),
1671 Err(IncrError::WrongType) => ShardResponse::WrongType,
1672 Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
1673 Err(e) => ShardResponse::Err(e.to_string()),
1674 }
1675}
1676
1677fn write_result_len(result: Result<usize, WriteError>) -> ShardResponse {
1679 match result {
1680 Ok(len) => ShardResponse::Len(len),
1681 Err(WriteError::WrongType) => ShardResponse::WrongType,
1682 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1683 }
1684}
1685
1686fn store_set_response(result: Result<(usize, Vec<String>), WriteError>) -> ShardResponse {
1687 match result {
1688 Ok((count, members)) => ShardResponse::SetStoreResult { count, members },
1689 Err(WriteError::WrongType) => ShardResponse::WrongType,
1690 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1691 }
1692}
1693
1694fn dispatch(
1698 ks: &mut Keyspace,
1699 req: &mut ShardRequest,
1700 #[cfg(feature = "protobuf")] schema_registry: &Option<crate::schema::SharedSchemaRegistry>,
1701) -> ShardResponse {
1702 match req {
1703 ShardRequest::Get { key } => match ks.get_string(key) {
1704 Ok(val) => ShardResponse::Value(val.map(Value::String)),
1705 Err(_) => ShardResponse::WrongType,
1706 },
1707 ShardRequest::Set {
1708 key,
1709 value,
1710 expire,
1711 nx,
1712 xx,
1713 } => match ks.set(key.clone(), value.clone(), *expire, *nx, *xx) {
1714 SetResult::Ok => ShardResponse::Ok,
1715 SetResult::Blocked => ShardResponse::Value(None),
1716 SetResult::OutOfMemory => ShardResponse::OutOfMemory,
1717 },
1718 ShardRequest::Incr { key } => incr_result(ks.incr(key)),
1719 ShardRequest::Decr { key } => incr_result(ks.decr(key)),
1720 ShardRequest::IncrBy { key, delta } => incr_result(ks.incr_by(key, *delta)),
1721 ShardRequest::DecrBy { key, delta } => match delta.checked_neg() {
1722 Some(neg) => incr_result(ks.incr_by(key, neg)),
1723 None => ShardResponse::Err("ERR increment or decrement would overflow".into()),
1724 },
1725 ShardRequest::IncrByFloat { key, delta } => match ks.incr_by_float(key, *delta) {
1726 Ok(val) => ShardResponse::BulkString(val),
1727 Err(IncrFloatError::WrongType) => ShardResponse::WrongType,
1728 Err(IncrFloatError::OutOfMemory) => ShardResponse::OutOfMemory,
1729 Err(e) => ShardResponse::Err(e.to_string()),
1730 },
1731 ShardRequest::Append { key, value } => write_result_len(ks.append(key, value)),
1732 ShardRequest::Strlen { key } => match ks.strlen(key) {
1733 Ok(len) => ShardResponse::Len(len),
1734 Err(_) => ShardResponse::WrongType,
1735 },
1736 ShardRequest::GetRange { key, start, end } => match ks.getrange(key, *start, *end) {
1737 Ok(data) => ShardResponse::Value(Some(Value::String(data))),
1738 Err(_) => ShardResponse::WrongType,
1739 },
1740 ShardRequest::SetRange { key, offset, value } => {
1741 write_result_len(ks.setrange(key, *offset, value))
1742 }
1743 ShardRequest::GetBit { key, offset } => match ks.getbit(key, *offset) {
1744 Ok(bit) => ShardResponse::Integer(bit as i64),
1745 Err(_) => ShardResponse::WrongType,
1746 },
1747 ShardRequest::SetBit { key, offset, value } => match ks.setbit(key, *offset, *value) {
1748 Ok(old_bit) => ShardResponse::Integer(old_bit as i64),
1749 Err(WriteError::WrongType) => ShardResponse::WrongType,
1750 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1751 },
1752 ShardRequest::BitCount { key, range } => match ks.bitcount(key, *range) {
1753 Ok(count) => ShardResponse::Integer(count as i64),
1754 Err(_) => ShardResponse::WrongType,
1755 },
1756 ShardRequest::BitPos { key, bit, range } => match ks.bitpos(key, *bit, *range) {
1757 Ok(pos) => ShardResponse::Integer(pos),
1758 Err(_) => ShardResponse::WrongType,
1759 },
1760 ShardRequest::BitOp { op, dest, keys } => match ks.bitop(*op, dest.clone(), keys) {
1761 Ok(len) => ShardResponse::Integer(len as i64),
1762 Err(WriteError::WrongType) => ShardResponse::WrongType,
1763 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1764 },
1765 ShardRequest::Keys { pattern } => {
1766 let keys = ks.keys(pattern);
1767 ShardResponse::StringArray(keys)
1768 }
1769 ShardRequest::Rename { key, newkey } => {
1770 use crate::keyspace::RenameError;
1771 match ks.rename(key, newkey) {
1772 Ok(()) => ShardResponse::Ok,
1773 Err(RenameError::NoSuchKey) => ShardResponse::Err("ERR no such key".into()),
1774 }
1775 }
1776 ShardRequest::Copy {
1777 source,
1778 destination,
1779 replace,
1780 } => {
1781 use crate::keyspace::CopyError;
1782 match ks.copy(source, destination, *replace) {
1783 Ok(copied) => ShardResponse::Bool(copied),
1784 Err(CopyError::NoSuchKey) => ShardResponse::Err("ERR no such key".into()),
1785 Err(CopyError::OutOfMemory) => ShardResponse::OutOfMemory,
1786 }
1787 }
1788 ShardRequest::ObjectEncoding { key } => ShardResponse::EncodingName(ks.encoding(key)),
1789 ShardRequest::Del { key } => ShardResponse::Bool(ks.del(key)),
1790 ShardRequest::Unlink { key } => ShardResponse::Bool(ks.unlink(key)),
1791 ShardRequest::Exists { key } => ShardResponse::Bool(ks.exists(key)),
1792 ShardRequest::RandomKey => match ks.random_key() {
1793 Some(k) => ShardResponse::StringArray(vec![k]),
1794 None => ShardResponse::StringArray(vec![]),
1795 },
1796 ShardRequest::Touch { key } => ShardResponse::Bool(ks.touch(key)),
1797 ShardRequest::Sort {
1798 key,
1799 desc,
1800 alpha,
1801 limit,
1802 } => match ks.sort(key, *desc, *alpha, *limit) {
1803 Ok(items) => ShardResponse::Array(items),
1804 Err(_) => ShardResponse::WrongType,
1805 },
1806 ShardRequest::Expire { key, seconds } => ShardResponse::Bool(ks.expire(key, *seconds)),
1807 ShardRequest::Expireat { key, timestamp } => {
1808 ShardResponse::Bool(ks.expireat(key, *timestamp))
1809 }
1810 ShardRequest::Ttl { key } => ShardResponse::Ttl(ks.ttl(key)),
1811 ShardRequest::MemoryUsage { key } => {
1812 ShardResponse::Integer(ks.memory_usage(key).map(|n| n as i64).unwrap_or(-1))
1813 }
1814 ShardRequest::Persist { key } => ShardResponse::Bool(ks.persist(key)),
1815 ShardRequest::Pttl { key } => ShardResponse::Ttl(ks.pttl(key)),
1816 ShardRequest::Pexpire { key, milliseconds } => {
1817 ShardResponse::Bool(ks.pexpire(key, *milliseconds))
1818 }
1819 ShardRequest::Pexpireat { key, timestamp_ms } => {
1820 ShardResponse::Bool(ks.pexpireat(key, *timestamp_ms))
1821 }
1822 ShardRequest::LPush { key, values } => write_result_len(ks.lpush(key, values)),
1823 ShardRequest::RPush { key, values } => write_result_len(ks.rpush(key, values)),
1824 ShardRequest::LPop { key } => match ks.lpop(key) {
1825 Ok(val) => ShardResponse::Value(val.map(Value::String)),
1826 Err(_) => ShardResponse::WrongType,
1827 },
1828 ShardRequest::RPop { key } => match ks.rpop(key) {
1829 Ok(val) => ShardResponse::Value(val.map(Value::String)),
1830 Err(_) => ShardResponse::WrongType,
1831 },
1832 ShardRequest::LPopCount { key, count } => match ks.lpop_count(key, *count) {
1833 Ok(Some(items)) => ShardResponse::Array(items),
1834 Ok(None) => ShardResponse::Value(None),
1835 Err(_) => ShardResponse::WrongType,
1836 },
1837 ShardRequest::RPopCount { key, count } => match ks.rpop_count(key, *count) {
1838 Ok(Some(items)) => ShardResponse::Array(items),
1839 Ok(None) => ShardResponse::Value(None),
1840 Err(_) => ShardResponse::WrongType,
1841 },
1842 ShardRequest::LRange { key, start, stop } => match ks.lrange(key, *start, *stop) {
1843 Ok(items) => ShardResponse::Array(items),
1844 Err(_) => ShardResponse::WrongType,
1845 },
1846 ShardRequest::LLen { key } => match ks.llen(key) {
1847 Ok(len) => ShardResponse::Len(len),
1848 Err(_) => ShardResponse::WrongType,
1849 },
1850 ShardRequest::LIndex { key, index } => match ks.lindex(key, *index) {
1851 Ok(val) => ShardResponse::Value(val.map(Value::String)),
1852 Err(_) => ShardResponse::WrongType,
1853 },
1854 ShardRequest::LSet { key, index, value } => match ks.lset(key, *index, value.clone()) {
1855 Ok(()) => ShardResponse::Ok,
1856 Err(e) => match e {
1857 LsetError::WrongType => ShardResponse::WrongType,
1858 LsetError::NoSuchKey => ShardResponse::Err("ERR no such key".into()),
1859 LsetError::IndexOutOfRange => ShardResponse::Err("ERR index out of range".into()),
1860 },
1861 },
1862 ShardRequest::LTrim { key, start, stop } => match ks.ltrim(key, *start, *stop) {
1863 Ok(()) => ShardResponse::Ok,
1864 Err(_) => ShardResponse::WrongType,
1865 },
1866 ShardRequest::LInsert {
1867 key,
1868 before,
1869 pivot,
1870 value,
1871 } => match ks.linsert(key, *before, pivot, value.clone()) {
1872 Ok(n) => ShardResponse::Integer(n),
1873 Err(WriteError::WrongType) => ShardResponse::WrongType,
1874 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1875 },
1876 ShardRequest::LRem { key, count, value } => match ks.lrem(key, *count, value) {
1877 Ok(n) => ShardResponse::Len(n),
1878 Err(_) => ShardResponse::WrongType,
1879 },
1880 ShardRequest::LPos {
1881 key,
1882 element,
1883 rank,
1884 count,
1885 maxlen,
1886 } => match ks.lpos(key, element, *rank, *count, *maxlen) {
1887 Ok(positions) => ShardResponse::IntegerArray(positions),
1888 Err(_) => ShardResponse::WrongType,
1889 },
1890 ShardRequest::Type { key } => ShardResponse::TypeName(ks.value_type(key)),
1891 ShardRequest::ZAdd {
1892 key,
1893 members,
1894 nx,
1895 xx,
1896 gt,
1897 lt,
1898 ch,
1899 } => {
1900 let flags = ZAddFlags {
1901 nx: *nx,
1902 xx: *xx,
1903 gt: *gt,
1904 lt: *lt,
1905 ch: *ch,
1906 };
1907 match ks.zadd(key, members, &flags) {
1908 Ok(result) => ShardResponse::ZAddLen {
1909 count: result.count,
1910 applied: result.applied,
1911 },
1912 Err(WriteError::WrongType) => ShardResponse::WrongType,
1913 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1914 }
1915 }
1916 ShardRequest::ZRem { key, members } => match ks.zrem(key, members) {
1917 Ok(removed) => ShardResponse::ZRemLen {
1918 count: removed.len(),
1919 removed,
1920 },
1921 Err(_) => ShardResponse::WrongType,
1922 },
1923 ShardRequest::ZScore { key, member } => match ks.zscore(key, member) {
1924 Ok(score) => ShardResponse::Score(score),
1925 Err(_) => ShardResponse::WrongType,
1926 },
1927 ShardRequest::ZRank { key, member } => match ks.zrank(key, member) {
1928 Ok(rank) => ShardResponse::Rank(rank),
1929 Err(_) => ShardResponse::WrongType,
1930 },
1931 ShardRequest::ZCard { key } => match ks.zcard(key) {
1932 Ok(len) => ShardResponse::Len(len),
1933 Err(_) => ShardResponse::WrongType,
1934 },
1935 ShardRequest::ZRevRank { key, member } => match ks.zrevrank(key, member) {
1936 Ok(rank) => ShardResponse::Rank(rank),
1937 Err(_) => ShardResponse::WrongType,
1938 },
1939 ShardRequest::ZRange {
1940 key, start, stop, ..
1941 } => match ks.zrange(key, *start, *stop) {
1942 Ok(items) => ShardResponse::ScoredArray(items),
1943 Err(_) => ShardResponse::WrongType,
1944 },
1945 ShardRequest::ZRevRange {
1946 key, start, stop, ..
1947 } => match ks.zrevrange(key, *start, *stop) {
1948 Ok(items) => ShardResponse::ScoredArray(items),
1949 Err(_) => ShardResponse::WrongType,
1950 },
1951 ShardRequest::ZCount { key, min, max } => match ks.zcount(key, *min, *max) {
1952 Ok(count) => ShardResponse::Len(count),
1953 Err(_) => ShardResponse::WrongType,
1954 },
1955 ShardRequest::ZIncrBy {
1956 key,
1957 increment,
1958 member,
1959 } => match ks.zincrby(key, *increment, member) {
1960 Ok(new_score) => ShardResponse::ZIncrByResult {
1961 new_score,
1962 member: member.clone(),
1963 },
1964 Err(WriteError::WrongType) => ShardResponse::WrongType,
1965 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1966 },
1967 ShardRequest::ZRangeByScore {
1968 key,
1969 min,
1970 max,
1971 offset,
1972 count,
1973 } => match ks.zrangebyscore(key, *min, *max, *offset, *count) {
1974 Ok(items) => ShardResponse::ScoredArray(items),
1975 Err(_) => ShardResponse::WrongType,
1976 },
1977 ShardRequest::ZRevRangeByScore {
1978 key,
1979 min,
1980 max,
1981 offset,
1982 count,
1983 } => match ks.zrevrangebyscore(key, *min, *max, *offset, *count) {
1984 Ok(items) => ShardResponse::ScoredArray(items),
1985 Err(_) => ShardResponse::WrongType,
1986 },
1987 ShardRequest::ZPopMin { key, count } => match ks.zpopmin(key, *count) {
1988 Ok(items) => ShardResponse::ZPopResult(items),
1989 Err(_) => ShardResponse::WrongType,
1990 },
1991 ShardRequest::ZPopMax { key, count } => match ks.zpopmax(key, *count) {
1992 Ok(items) => ShardResponse::ZPopResult(items),
1993 Err(_) => ShardResponse::WrongType,
1994 },
1995 ShardRequest::LmpopSingle { key, left, count } => {
1996 let result = if *left {
1997 ks.lpop_count(key, *count)
1998 } else {
1999 ks.rpop_count(key, *count)
2000 };
2001 match result {
2002 Ok(Some(items)) => ShardResponse::Array(items),
2003 Ok(None) => ShardResponse::Value(None),
2004 Err(_) => ShardResponse::WrongType,
2005 }
2006 }
2007 ShardRequest::ZmpopSingle { key, min, count } => {
2008 let result = if *min {
2009 ks.zpopmin(key, *count)
2010 } else {
2011 ks.zpopmax(key, *count)
2012 };
2013 match result {
2014 Ok(items) if !items.is_empty() => ShardResponse::ZPopResult(items),
2015 Ok(_) => ShardResponse::Value(None),
2016 Err(_) => ShardResponse::WrongType,
2017 }
2018 }
2019 ShardRequest::DbSize => ShardResponse::KeyCount(ks.len()),
2020 ShardRequest::Stats => ShardResponse::Stats(ks.stats()),
2021 ShardRequest::KeyVersion { ref key } => ShardResponse::Version(ks.key_version(key)),
2022 ShardRequest::FlushDb => {
2023 ks.clear();
2024 ShardResponse::Ok
2025 }
2026 ShardRequest::Scan {
2027 cursor,
2028 count,
2029 pattern,
2030 } => {
2031 let (next_cursor, keys) = ks.scan_keys(*cursor, *count, pattern.as_deref());
2032 ShardResponse::Scan {
2033 cursor: next_cursor,
2034 keys,
2035 }
2036 }
2037 ShardRequest::HSet { key, fields } => write_result_len(ks.hset(key, fields)),
2038 ShardRequest::HGet { key, field } => match ks.hget(key, field) {
2039 Ok(val) => ShardResponse::Value(val.map(Value::String)),
2040 Err(_) => ShardResponse::WrongType,
2041 },
2042 ShardRequest::HGetAll { key } => match ks.hgetall(key) {
2043 Ok(fields) => ShardResponse::HashFields(fields),
2044 Err(_) => ShardResponse::WrongType,
2045 },
2046 ShardRequest::HDel { key, fields } => match ks.hdel(key, fields) {
2047 Ok(removed) => ShardResponse::HDelLen {
2048 count: removed.len(),
2049 removed,
2050 },
2051 Err(_) => ShardResponse::WrongType,
2052 },
2053 ShardRequest::HExists { key, field } => match ks.hexists(key, field) {
2054 Ok(exists) => ShardResponse::Bool(exists),
2055 Err(_) => ShardResponse::WrongType,
2056 },
2057 ShardRequest::HLen { key } => match ks.hlen(key) {
2058 Ok(len) => ShardResponse::Len(len),
2059 Err(_) => ShardResponse::WrongType,
2060 },
2061 ShardRequest::HIncrBy { key, field, delta } => incr_result(ks.hincrby(key, field, *delta)),
2062 ShardRequest::HIncrByFloat { key, field, delta } => {
2063 match ks.hincrbyfloat(key, field, *delta) {
2064 Ok(val) => ShardResponse::BulkString(val),
2065 Err(IncrFloatError::WrongType) => ShardResponse::WrongType,
2066 Err(IncrFloatError::OutOfMemory) => ShardResponse::OutOfMemory,
2067 Err(e) => ShardResponse::Err(e.to_string()),
2068 }
2069 }
2070 ShardRequest::HKeys { key } => match ks.hkeys(key) {
2071 Ok(keys) => ShardResponse::StringArray(keys),
2072 Err(_) => ShardResponse::WrongType,
2073 },
2074 ShardRequest::HVals { key } => match ks.hvals(key) {
2075 Ok(vals) => ShardResponse::Array(vals),
2076 Err(_) => ShardResponse::WrongType,
2077 },
2078 ShardRequest::HMGet { key, fields } => match ks.hmget(key, fields) {
2079 Ok(vals) => ShardResponse::OptionalArray(vals),
2080 Err(_) => ShardResponse::WrongType,
2081 },
2082 ShardRequest::HRandField {
2083 key,
2084 count,
2085 with_values,
2086 } => match ks.hrandfield(key, *count, *with_values) {
2087 Ok(pairs) => ShardResponse::HRandFieldResult(pairs),
2088 Err(_) => ShardResponse::WrongType,
2089 },
2090 ShardRequest::SAdd { key, members } => write_result_len(ks.sadd(key, members)),
2091 ShardRequest::SRem { key, members } => match ks.srem(key, members) {
2092 Ok(count) => ShardResponse::Len(count),
2093 Err(_) => ShardResponse::WrongType,
2094 },
2095 ShardRequest::SMembers { key } => match ks.smembers(key) {
2096 Ok(members) => ShardResponse::StringArray(members),
2097 Err(_) => ShardResponse::WrongType,
2098 },
2099 ShardRequest::SIsMember { key, member } => match ks.sismember(key, member) {
2100 Ok(exists) => ShardResponse::Bool(exists),
2101 Err(_) => ShardResponse::WrongType,
2102 },
2103 ShardRequest::SCard { key } => match ks.scard(key) {
2104 Ok(count) => ShardResponse::Len(count),
2105 Err(_) => ShardResponse::WrongType,
2106 },
2107 ShardRequest::SUnion { keys } => match ks.sunion(keys) {
2108 Ok(members) => ShardResponse::StringArray(members),
2109 Err(_) => ShardResponse::WrongType,
2110 },
2111 ShardRequest::SInter { keys } => match ks.sinter(keys) {
2112 Ok(members) => ShardResponse::StringArray(members),
2113 Err(_) => ShardResponse::WrongType,
2114 },
2115 ShardRequest::SDiff { keys } => match ks.sdiff(keys) {
2116 Ok(members) => ShardResponse::StringArray(members),
2117 Err(_) => ShardResponse::WrongType,
2118 },
2119 ShardRequest::SUnionStore { dest, keys } => store_set_response(ks.sunionstore(dest, keys)),
2120 ShardRequest::SInterStore { dest, keys } => store_set_response(ks.sinterstore(dest, keys)),
2121 ShardRequest::SDiffStore { dest, keys } => store_set_response(ks.sdiffstore(dest, keys)),
2122 ShardRequest::SRandMember { key, count } => match ks.srandmember(key, *count) {
2123 Ok(members) => ShardResponse::StringArray(members),
2124 Err(_) => ShardResponse::WrongType,
2125 },
2126 ShardRequest::SPop { key, count } => match ks.spop(key, *count) {
2127 Ok(members) => ShardResponse::StringArray(members),
2128 Err(_) => ShardResponse::WrongType,
2129 },
2130 ShardRequest::SMisMember { key, members } => match ks.smismember(key, members) {
2131 Ok(results) => ShardResponse::BoolArray(results),
2132 Err(_) => ShardResponse::WrongType,
2133 },
2134 ShardRequest::SMove {
2135 source,
2136 destination,
2137 member,
2138 } => match ks.smove(source, destination, member) {
2139 Ok(moved) => ShardResponse::Bool(moved),
2140 Err(WriteError::WrongType) => ShardResponse::WrongType,
2141 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
2142 },
2143 ShardRequest::SInterCard { keys, limit } => match ks.sintercard(keys, *limit) {
2144 Ok(n) => ShardResponse::Integer(n as i64),
2145 Err(_) => ShardResponse::WrongType,
2146 },
2147 ShardRequest::Expiretime { key } => ShardResponse::Integer(ks.expiretime(key)),
2148 ShardRequest::Pexpiretime { key } => ShardResponse::Integer(ks.pexpiretime(key)),
2149 ShardRequest::LMove {
2150 source,
2151 destination,
2152 src_left,
2153 dst_left,
2154 } => match ks.lmove(source, destination, *src_left, *dst_left) {
2155 Ok(Some(v)) => ShardResponse::Value(Some(Value::String(v))),
2156 Ok(None) => ShardResponse::Value(None),
2157 Err(e) => match e {
2158 WriteError::WrongType => ShardResponse::WrongType,
2159 WriteError::OutOfMemory => ShardResponse::OutOfMemory,
2160 },
2161 },
2162 ShardRequest::GetDel { key } => match ks.getdel(key) {
2163 Ok(Some(v)) => ShardResponse::Value(Some(Value::String(v))),
2164 Ok(None) => ShardResponse::Value(None),
2165 Err(_) => ShardResponse::WrongType,
2166 },
2167 ShardRequest::GetEx { key, expire } => {
2168 let dur = expire.map(|opt| opt.map(Duration::from_millis));
2169 match ks.getex(key, dur) {
2170 Ok(Some(v)) => ShardResponse::Value(Some(Value::String(v))),
2171 Ok(None) => ShardResponse::Value(None),
2172 Err(_) => ShardResponse::WrongType,
2173 }
2174 }
2175 ShardRequest::GetSet { key, value } => match ks.getset(key, value.clone()) {
2176 Ok(old) => ShardResponse::Value(old.map(Value::String)),
2177 Err(_) => ShardResponse::WrongType,
2178 },
2179 ShardRequest::MSetNx { pairs } => {
2180 let result = ks.msetnx(pairs);
2181 ShardResponse::Bool(result)
2182 }
2183 ShardRequest::ZDiff { keys } => match ks.zdiff(keys) {
2184 Ok(pairs) => ShardResponse::ScoredArray(pairs),
2185 Err(_) => ShardResponse::WrongType,
2186 },
2187 ShardRequest::ZInter { keys } => match ks.zinter(keys) {
2188 Ok(pairs) => ShardResponse::ScoredArray(pairs),
2189 Err(_) => ShardResponse::WrongType,
2190 },
2191 ShardRequest::ZUnion { keys } => match ks.zunion(keys) {
2192 Ok(pairs) => ShardResponse::ScoredArray(pairs),
2193 Err(_) => ShardResponse::WrongType,
2194 },
2195 ShardRequest::ZDiffStore { dest, keys } => match ks.zdiffstore(dest, keys) {
2196 Ok((count, members)) => ShardResponse::ZStoreResult { count, members },
2197 Err(_) => ShardResponse::WrongType,
2198 },
2199 ShardRequest::ZInterStore { dest, keys } => match ks.zinterstore(dest, keys) {
2200 Ok((count, members)) => ShardResponse::ZStoreResult { count, members },
2201 Err(_) => ShardResponse::WrongType,
2202 },
2203 ShardRequest::ZUnionStore { dest, keys } => match ks.zunionstore(dest, keys) {
2204 Ok((count, members)) => ShardResponse::ZStoreResult { count, members },
2205 Err(_) => ShardResponse::WrongType,
2206 },
2207 ShardRequest::ZRandMember {
2208 key,
2209 count,
2210 with_scores,
2211 } => match ks.zrandmember(key, *count, *with_scores) {
2212 Ok(pairs) => ShardResponse::ZRandMemberResult(pairs),
2213 Err(_) => ShardResponse::WrongType,
2214 },
2215 ShardRequest::SScan {
2216 key,
2217 cursor,
2218 count,
2219 pattern,
2220 } => match ks.scan_set(key, *cursor, *count, pattern.as_deref()) {
2221 Ok((next, members)) => {
2222 let items = members.into_iter().map(Bytes::from).collect();
2223 ShardResponse::CollectionScan {
2224 cursor: next,
2225 items,
2226 }
2227 }
2228 Err(_) => ShardResponse::WrongType,
2229 },
2230 ShardRequest::HScan {
2231 key,
2232 cursor,
2233 count,
2234 pattern,
2235 } => match ks.scan_hash(key, *cursor, *count, pattern.as_deref()) {
2236 Ok((next, fields)) => {
2237 let mut items = Vec::with_capacity(fields.len() * 2);
2238 for (field, value) in fields {
2239 items.push(Bytes::from(field));
2240 items.push(value);
2241 }
2242 ShardResponse::CollectionScan {
2243 cursor: next,
2244 items,
2245 }
2246 }
2247 Err(_) => ShardResponse::WrongType,
2248 },
2249 ShardRequest::ZScan {
2250 key,
2251 cursor,
2252 count,
2253 pattern,
2254 } => match ks.scan_sorted_set(key, *cursor, *count, pattern.as_deref()) {
2255 Ok((next, members)) => {
2256 let mut items = Vec::with_capacity(members.len() * 2);
2257 for (member, score) in members {
2258 items.push(Bytes::from(member));
2259 items.push(Bytes::from(score.to_string()));
2260 }
2261 ShardResponse::CollectionScan {
2262 cursor: next,
2263 items,
2264 }
2265 }
2266 Err(_) => ShardResponse::WrongType,
2267 },
2268 ShardRequest::CountKeysInSlot { slot } => {
2269 ShardResponse::KeyCount(ks.count_keys_in_slot(*slot))
2270 }
2271 ShardRequest::GetKeysInSlot { slot, count } => {
2272 ShardResponse::StringArray(ks.get_keys_in_slot(*slot, *count))
2273 }
2274 ShardRequest::DumpKey { key } => match ks.dump(key) {
2275 Some((value, ttl_ms)) => {
2276 let snap = persistence::value_to_snap(value);
2277 match snapshot::serialize_snap_value(&snap) {
2278 Ok(data) => ShardResponse::KeyDump { data, ttl_ms },
2279 Err(e) => ShardResponse::Err(format!("ERR snapshot serialization failed: {e}")),
2280 }
2281 }
2282 None => ShardResponse::Value(None),
2283 },
2284 ShardRequest::RestoreKey {
2285 key,
2286 ttl_ms,
2287 data,
2288 replace,
2289 } => match snapshot::deserialize_snap_value(data) {
2290 Ok(snap) => {
2291 let exists = ks.exists(key);
2292 if exists && !*replace {
2293 ShardResponse::Err("ERR Target key name already exists".into())
2294 } else {
2295 let value = persistence::snap_to_value(snap);
2296 let ttl = if *ttl_ms == 0 {
2297 None
2298 } else {
2299 Some(Duration::from_millis(*ttl_ms))
2300 };
2301 ks.restore(key.clone(), value, ttl);
2302 ShardResponse::Ok
2303 }
2304 }
2305 Err(e) => ShardResponse::Err(format!("ERR DUMP payload corrupted: {e}")),
2306 },
2307 #[cfg(feature = "vector")]
2308 ShardRequest::VAdd {
2309 key,
2310 element,
2311 vector,
2312 metric,
2313 quantization,
2314 connectivity,
2315 expansion_add,
2316 } => {
2317 use crate::types::vector::{DistanceMetric, QuantizationType};
2318 match ks.vadd(
2319 key,
2320 element.clone(),
2321 vector.clone(),
2322 DistanceMetric::from_u8(*metric),
2323 QuantizationType::from_u8(*quantization),
2324 *connectivity as usize,
2325 *expansion_add as usize,
2326 ) {
2327 Ok(result) => ShardResponse::VAddResult {
2328 element: result.element,
2329 vector: result.vector,
2330 added: result.added,
2331 },
2332 Err(crate::keyspace::VectorWriteError::WrongType) => ShardResponse::WrongType,
2333 Err(crate::keyspace::VectorWriteError::OutOfMemory) => ShardResponse::OutOfMemory,
2334 Err(crate::keyspace::VectorWriteError::IndexError(e))
2335 | Err(crate::keyspace::VectorWriteError::PartialBatch { message: e, .. }) => {
2336 ShardResponse::Err(format!("ERR vector index: {e}"))
2337 }
2338 }
2339 }
2340 #[cfg(feature = "vector")]
2341 ShardRequest::VAddBatch {
2342 key,
2343 entries,
2344 metric,
2345 quantization,
2346 connectivity,
2347 expansion_add,
2348 ..
2349 } => {
2350 use crate::types::vector::{DistanceMetric, QuantizationType};
2351 let owned_entries = std::mem::take(entries);
2356 match ks.vadd_batch(
2357 key,
2358 owned_entries,
2359 DistanceMetric::from_u8(*metric),
2360 QuantizationType::from_u8(*quantization),
2361 *connectivity as usize,
2362 *expansion_add as usize,
2363 ) {
2364 Ok(result) => ShardResponse::VAddBatchResult {
2365 added_count: result.added_count,
2366 applied: result.applied,
2367 },
2368 Err(crate::keyspace::VectorWriteError::WrongType) => ShardResponse::WrongType,
2369 Err(crate::keyspace::VectorWriteError::OutOfMemory) => ShardResponse::OutOfMemory,
2370 Err(crate::keyspace::VectorWriteError::IndexError(e)) => {
2371 ShardResponse::Err(format!("ERR vector index: {e}"))
2372 }
2373 Err(crate::keyspace::VectorWriteError::PartialBatch { applied, .. }) => {
2374 ShardResponse::VAddBatchResult {
2376 added_count: applied.len(),
2377 applied,
2378 }
2379 }
2380 }
2381 }
2382 #[cfg(feature = "vector")]
2383 ShardRequest::VSim {
2384 key,
2385 query,
2386 count,
2387 ef_search,
2388 } => match ks.vsim(key, query, *count, *ef_search) {
2389 Ok(results) => ShardResponse::VSimResult(
2390 results
2391 .into_iter()
2392 .map(|r| (r.element, r.distance))
2393 .collect(),
2394 ),
2395 Err(_) => ShardResponse::WrongType,
2396 },
2397 #[cfg(feature = "vector")]
2398 ShardRequest::VRem { key, element } => match ks.vrem(key, element) {
2399 Ok(removed) => ShardResponse::Bool(removed),
2400 Err(_) => ShardResponse::WrongType,
2401 },
2402 #[cfg(feature = "vector")]
2403 ShardRequest::VGet { key, element } => match ks.vget(key, element) {
2404 Ok(data) => ShardResponse::VectorData(data),
2405 Err(_) => ShardResponse::WrongType,
2406 },
2407 #[cfg(feature = "vector")]
2408 ShardRequest::VCard { key } => match ks.vcard(key) {
2409 Ok(count) => ShardResponse::Integer(count as i64),
2410 Err(_) => ShardResponse::WrongType,
2411 },
2412 #[cfg(feature = "vector")]
2413 ShardRequest::VDim { key } => match ks.vdim(key) {
2414 Ok(dim) => ShardResponse::Integer(dim as i64),
2415 Err(_) => ShardResponse::WrongType,
2416 },
2417 #[cfg(feature = "vector")]
2418 ShardRequest::VInfo { key } => match ks.vinfo(key) {
2419 Ok(Some(info)) => {
2420 let fields = vec![
2421 ("dim".to_owned(), info.dim.to_string()),
2422 ("count".to_owned(), info.count.to_string()),
2423 ("metric".to_owned(), info.metric.to_string()),
2424 ("quantization".to_owned(), info.quantization.to_string()),
2425 ("connectivity".to_owned(), info.connectivity.to_string()),
2426 ("expansion_add".to_owned(), info.expansion_add.to_string()),
2427 ];
2428 ShardResponse::VectorInfo(Some(fields))
2429 }
2430 Ok(None) => ShardResponse::VectorInfo(None),
2431 Err(_) => ShardResponse::WrongType,
2432 },
2433 #[cfg(feature = "protobuf")]
2434 ShardRequest::ProtoSet {
2435 key,
2436 type_name,
2437 data,
2438 expire,
2439 nx,
2440 xx,
2441 } => {
2442 if *nx && ks.exists(key) {
2443 return ShardResponse::Value(None);
2444 }
2445 if *xx && !ks.exists(key) {
2446 return ShardResponse::Value(None);
2447 }
2448 match ks.proto_set(key.clone(), type_name.clone(), data.clone(), *expire) {
2449 SetResult::Ok | SetResult::Blocked => ShardResponse::Ok,
2450 SetResult::OutOfMemory => ShardResponse::OutOfMemory,
2451 }
2452 }
2453 #[cfg(feature = "protobuf")]
2454 ShardRequest::ProtoGet { key } => match ks.proto_get(key) {
2455 Ok(val) => ShardResponse::ProtoValue(val),
2456 Err(_) => ShardResponse::WrongType,
2457 },
2458 #[cfg(feature = "protobuf")]
2459 ShardRequest::ProtoType { key } => match ks.proto_type(key) {
2460 Ok(name) => ShardResponse::ProtoTypeName(name),
2461 Err(_) => ShardResponse::WrongType,
2462 },
2463 #[cfg(feature = "protobuf")]
2466 ShardRequest::ProtoRegisterAof { .. } => ShardResponse::Ok,
2467 #[cfg(feature = "protobuf")]
2468 ShardRequest::ProtoSetField {
2469 key,
2470 field_path,
2471 value,
2472 } => dispatch_proto_field_op(ks, schema_registry, key, |reg, type_name, data, ttl| {
2473 let new_data = reg.set_field(type_name, data, field_path, value)?;
2474 Ok(ShardResponse::ProtoFieldUpdated {
2475 type_name: type_name.to_owned(),
2476 data: new_data,
2477 expire: ttl,
2478 })
2479 }),
2480 #[cfg(feature = "protobuf")]
2481 ShardRequest::ProtoDelField { key, field_path } => {
2482 dispatch_proto_field_op(ks, schema_registry, key, |reg, type_name, data, ttl| {
2483 let new_data = reg.clear_field(type_name, data, field_path)?;
2484 Ok(ShardResponse::ProtoFieldUpdated {
2485 type_name: type_name.to_owned(),
2486 data: new_data,
2487 expire: ttl,
2488 })
2489 })
2490 }
2491 ShardRequest::Snapshot
2493 | ShardRequest::SerializeSnapshot
2494 | ShardRequest::RewriteAof
2495 | ShardRequest::FlushDbAsync
2496 | ShardRequest::UpdateMemoryConfig { .. }
2497 | ShardRequest::BLPop { .. }
2498 | ShardRequest::BRPop { .. } => ShardResponse::Ok,
2499 }
2500}
2501
2502#[cfg(feature = "protobuf")]
2508fn dispatch_proto_field_op<F>(
2509 ks: &mut Keyspace,
2510 schema_registry: &Option<crate::schema::SharedSchemaRegistry>,
2511 key: &str,
2512 mutate: F,
2513) -> ShardResponse
2514where
2515 F: FnOnce(
2516 &crate::schema::SchemaRegistry,
2517 &str,
2518 &[u8],
2519 Option<Duration>,
2520 ) -> Result<ShardResponse, crate::schema::SchemaError>,
2521{
2522 let registry = match schema_registry {
2523 Some(r) => r,
2524 None => return ShardResponse::Err("protobuf support is not enabled".into()),
2525 };
2526
2527 let (type_name, data, remaining_ttl) = match ks.proto_get(key) {
2528 Ok(Some(tuple)) => tuple,
2529 Ok(None) => return ShardResponse::Value(None),
2530 Err(_) => return ShardResponse::WrongType,
2531 };
2532
2533 let reg = match registry.read() {
2534 Ok(r) => r,
2535 Err(_) => return ShardResponse::Err("schema registry lock poisoned".into()),
2536 };
2537
2538 let resp = match mutate(®, &type_name, &data, remaining_ttl) {
2539 Ok(r) => r,
2540 Err(e) => return ShardResponse::Err(e.to_string()),
2541 };
2542
2543 if let ShardResponse::ProtoFieldUpdated {
2545 ref type_name,
2546 ref data,
2547 expire,
2548 } = resp
2549 {
2550 ks.proto_set(key.to_owned(), type_name.clone(), data.clone(), expire);
2551 }
2552
2553 resp
2554}
2555
2556#[cfg(test)]
2557mod tests {
2558 use super::*;
2559
2560 fn test_dispatch(ks: &mut Keyspace, mut req: ShardRequest) -> ShardResponse {
2562 dispatch(
2563 ks,
2564 &mut req,
2565 #[cfg(feature = "protobuf")]
2566 &None,
2567 )
2568 }
2569
2570 #[test]
2571 fn dispatch_set_and_get() {
2572 let mut ks = Keyspace::new();
2573
2574 let resp = test_dispatch(
2575 &mut ks,
2576 ShardRequest::Set {
2577 key: "k".into(),
2578 value: Bytes::from("v"),
2579 expire: None,
2580 nx: false,
2581 xx: false,
2582 },
2583 );
2584 assert!(matches!(resp, ShardResponse::Ok));
2585
2586 let resp = test_dispatch(&mut ks, ShardRequest::Get { key: "k".into() });
2587 match resp {
2588 ShardResponse::Value(Some(Value::String(data))) => {
2589 assert_eq!(data, Bytes::from("v"));
2590 }
2591 other => panic!("expected Value(Some(String)), got {other:?}"),
2592 }
2593 }
2594
2595 #[test]
2596 fn dispatch_get_missing() {
2597 let mut ks = Keyspace::new();
2598 let resp = test_dispatch(&mut ks, ShardRequest::Get { key: "nope".into() });
2599 assert!(matches!(resp, ShardResponse::Value(None)));
2600 }
2601
2602 #[test]
2603 fn dispatch_del() {
2604 let mut ks = Keyspace::new();
2605 ks.set("key".into(), Bytes::from("val"), None, false, false);
2606
2607 let resp = test_dispatch(&mut ks, ShardRequest::Del { key: "key".into() });
2608 assert!(matches!(resp, ShardResponse::Bool(true)));
2609
2610 let resp = test_dispatch(&mut ks, ShardRequest::Del { key: "key".into() });
2611 assert!(matches!(resp, ShardResponse::Bool(false)));
2612 }
2613
2614 #[test]
2615 fn dispatch_exists() {
2616 let mut ks = Keyspace::new();
2617 ks.set("yes".into(), Bytes::from("here"), None, false, false);
2618
2619 let resp = test_dispatch(&mut ks, ShardRequest::Exists { key: "yes".into() });
2620 assert!(matches!(resp, ShardResponse::Bool(true)));
2621
2622 let resp = test_dispatch(&mut ks, ShardRequest::Exists { key: "no".into() });
2623 assert!(matches!(resp, ShardResponse::Bool(false)));
2624 }
2625
2626 #[test]
2627 fn dispatch_expire_and_ttl() {
2628 let mut ks = Keyspace::new();
2629 ks.set("key".into(), Bytes::from("val"), None, false, false);
2630
2631 let resp = test_dispatch(
2632 &mut ks,
2633 ShardRequest::Expire {
2634 key: "key".into(),
2635 seconds: 60,
2636 },
2637 );
2638 assert!(matches!(resp, ShardResponse::Bool(true)));
2639
2640 let resp = test_dispatch(&mut ks, ShardRequest::Ttl { key: "key".into() });
2641 match resp {
2642 ShardResponse::Ttl(TtlResult::Seconds(s)) => assert!((58..=60).contains(&s)),
2643 other => panic!("expected Ttl(Seconds), got {other:?}"),
2644 }
2645 }
2646
2647 #[test]
2648 fn dispatch_ttl_missing() {
2649 let mut ks = Keyspace::new();
2650 let resp = test_dispatch(&mut ks, ShardRequest::Ttl { key: "gone".into() });
2651 assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
2652 }
2653
2654 #[test]
2655 fn dispatch_incr_new_key() {
2656 let mut ks = Keyspace::new();
2657 let resp = test_dispatch(&mut ks, ShardRequest::Incr { key: "c".into() });
2658 assert!(matches!(resp, ShardResponse::Integer(1)));
2659 }
2660
2661 #[test]
2662 fn dispatch_decr_existing() {
2663 let mut ks = Keyspace::new();
2664 ks.set("n".into(), Bytes::from("10"), None, false, false);
2665 let resp = test_dispatch(&mut ks, ShardRequest::Decr { key: "n".into() });
2666 assert!(matches!(resp, ShardResponse::Integer(9)));
2667 }
2668
2669 #[test]
2670 fn dispatch_incr_non_integer() {
2671 let mut ks = Keyspace::new();
2672 ks.set("s".into(), Bytes::from("hello"), None, false, false);
2673 let resp = test_dispatch(&mut ks, ShardRequest::Incr { key: "s".into() });
2674 assert!(matches!(resp, ShardResponse::Err(_)));
2675 }
2676
2677 #[test]
2678 fn dispatch_incrby() {
2679 let mut ks = Keyspace::new();
2680 ks.set("n".into(), Bytes::from("10"), None, false, false);
2681 let resp = test_dispatch(
2682 &mut ks,
2683 ShardRequest::IncrBy {
2684 key: "n".into(),
2685 delta: 5,
2686 },
2687 );
2688 assert!(matches!(resp, ShardResponse::Integer(15)));
2689 }
2690
2691 #[test]
2692 fn dispatch_decrby() {
2693 let mut ks = Keyspace::new();
2694 ks.set("n".into(), Bytes::from("10"), None, false, false);
2695 let resp = test_dispatch(
2696 &mut ks,
2697 ShardRequest::DecrBy {
2698 key: "n".into(),
2699 delta: 3,
2700 },
2701 );
2702 assert!(matches!(resp, ShardResponse::Integer(7)));
2703 }
2704
2705 #[test]
2706 fn dispatch_incrby_new_key() {
2707 let mut ks = Keyspace::new();
2708 let resp = test_dispatch(
2709 &mut ks,
2710 ShardRequest::IncrBy {
2711 key: "new".into(),
2712 delta: 42,
2713 },
2714 );
2715 assert!(matches!(resp, ShardResponse::Integer(42)));
2716 }
2717
2718 #[test]
2719 fn dispatch_incrbyfloat() {
2720 let mut ks = Keyspace::new();
2721 ks.set("n".into(), Bytes::from("10.5"), None, false, false);
2722 let resp = test_dispatch(
2723 &mut ks,
2724 ShardRequest::IncrByFloat {
2725 key: "n".into(),
2726 delta: 2.3,
2727 },
2728 );
2729 match resp {
2730 ShardResponse::BulkString(val) => {
2731 let f: f64 = val.parse().unwrap();
2732 assert!((f - 12.8).abs() < 0.001);
2733 }
2734 other => panic!("expected BulkString, got {other:?}"),
2735 }
2736 }
2737
2738 #[test]
2739 fn dispatch_append() {
2740 let mut ks = Keyspace::new();
2741 ks.set("k".into(), Bytes::from("hello"), None, false, false);
2742 let resp = test_dispatch(
2743 &mut ks,
2744 ShardRequest::Append {
2745 key: "k".into(),
2746 value: Bytes::from(" world"),
2747 },
2748 );
2749 assert!(matches!(resp, ShardResponse::Len(11)));
2750 }
2751
2752 #[test]
2753 fn dispatch_strlen() {
2754 let mut ks = Keyspace::new();
2755 ks.set("k".into(), Bytes::from("hello"), None, false, false);
2756 let resp = test_dispatch(&mut ks, ShardRequest::Strlen { key: "k".into() });
2757 assert!(matches!(resp, ShardResponse::Len(5)));
2758 }
2759
2760 #[test]
2761 fn dispatch_strlen_missing() {
2762 let mut ks = Keyspace::new();
2763 let resp = test_dispatch(&mut ks, ShardRequest::Strlen { key: "nope".into() });
2764 assert!(matches!(resp, ShardResponse::Len(0)));
2765 }
2766
2767 #[test]
2768 fn dispatch_incrbyfloat_new_key() {
2769 let mut ks = Keyspace::new();
2770 let resp = test_dispatch(
2771 &mut ks,
2772 ShardRequest::IncrByFloat {
2773 key: "new".into(),
2774 delta: 2.72,
2775 },
2776 );
2777 match resp {
2778 ShardResponse::BulkString(val) => {
2779 let f: f64 = val.parse().unwrap();
2780 assert!((f - 2.72).abs() < 0.001);
2781 }
2782 other => panic!("expected BulkString, got {other:?}"),
2783 }
2784 }
2785
2786 #[test]
2787 fn dispatch_persist_removes_ttl() {
2788 let mut ks = Keyspace::new();
2789 ks.set(
2790 "key".into(),
2791 Bytes::from("val"),
2792 Some(Duration::from_secs(60)),
2793 false,
2794 false,
2795 );
2796
2797 let resp = test_dispatch(&mut ks, ShardRequest::Persist { key: "key".into() });
2798 assert!(matches!(resp, ShardResponse::Bool(true)));
2799
2800 let resp = test_dispatch(&mut ks, ShardRequest::Ttl { key: "key".into() });
2801 assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NoExpiry)));
2802 }
2803
2804 #[test]
2805 fn dispatch_persist_missing_key() {
2806 let mut ks = Keyspace::new();
2807 let resp = test_dispatch(&mut ks, ShardRequest::Persist { key: "nope".into() });
2808 assert!(matches!(resp, ShardResponse::Bool(false)));
2809 }
2810
2811 #[test]
2812 fn dispatch_pttl() {
2813 let mut ks = Keyspace::new();
2814 ks.set(
2815 "key".into(),
2816 Bytes::from("val"),
2817 Some(Duration::from_secs(60)),
2818 false,
2819 false,
2820 );
2821
2822 let resp = test_dispatch(&mut ks, ShardRequest::Pttl { key: "key".into() });
2823 match resp {
2824 ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
2825 assert!(ms > 59_000 && ms <= 60_000);
2826 }
2827 other => panic!("expected Ttl(Milliseconds), got {other:?}"),
2828 }
2829 }
2830
2831 #[test]
2832 fn dispatch_pttl_missing() {
2833 let mut ks = Keyspace::new();
2834 let resp = test_dispatch(&mut ks, ShardRequest::Pttl { key: "nope".into() });
2835 assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
2836 }
2837
2838 #[test]
2839 fn dispatch_pexpire() {
2840 let mut ks = Keyspace::new();
2841 ks.set("key".into(), Bytes::from("val"), None, false, false);
2842
2843 let resp = test_dispatch(
2844 &mut ks,
2845 ShardRequest::Pexpire {
2846 key: "key".into(),
2847 milliseconds: 5000,
2848 },
2849 );
2850 assert!(matches!(resp, ShardResponse::Bool(true)));
2851
2852 let resp = test_dispatch(&mut ks, ShardRequest::Pttl { key: "key".into() });
2853 match resp {
2854 ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
2855 assert!(ms > 4000 && ms <= 5000);
2856 }
2857 other => panic!("expected Ttl(Milliseconds), got {other:?}"),
2858 }
2859 }
2860
2861 #[test]
2862 fn dispatch_set_nx_when_key_missing() {
2863 let mut ks = Keyspace::new();
2864 let resp = test_dispatch(
2865 &mut ks,
2866 ShardRequest::Set {
2867 key: "k".into(),
2868 value: Bytes::from("v"),
2869 expire: None,
2870 nx: true,
2871 xx: false,
2872 },
2873 );
2874 assert!(matches!(resp, ShardResponse::Ok));
2875 assert!(ks.exists("k"));
2876 }
2877
2878 #[test]
2879 fn dispatch_set_nx_when_key_exists() {
2880 let mut ks = Keyspace::new();
2881 ks.set("k".into(), Bytes::from("old"), None, false, false);
2882
2883 let resp = test_dispatch(
2884 &mut ks,
2885 ShardRequest::Set {
2886 key: "k".into(),
2887 value: Bytes::from("new"),
2888 expire: None,
2889 nx: true,
2890 xx: false,
2891 },
2892 );
2893 assert!(matches!(resp, ShardResponse::Value(None)));
2895 match ks.get("k").unwrap() {
2897 Some(Value::String(data)) => assert_eq!(data, Bytes::from("old")),
2898 other => panic!("expected old value, got {other:?}"),
2899 }
2900 }
2901
2902 #[test]
2903 fn dispatch_set_xx_when_key_exists() {
2904 let mut ks = Keyspace::new();
2905 ks.set("k".into(), Bytes::from("old"), None, false, false);
2906
2907 let resp = test_dispatch(
2908 &mut ks,
2909 ShardRequest::Set {
2910 key: "k".into(),
2911 value: Bytes::from("new"),
2912 expire: None,
2913 nx: false,
2914 xx: true,
2915 },
2916 );
2917 assert!(matches!(resp, ShardResponse::Ok));
2918 match ks.get("k").unwrap() {
2919 Some(Value::String(data)) => assert_eq!(data, Bytes::from("new")),
2920 other => panic!("expected new value, got {other:?}"),
2921 }
2922 }
2923
2924 #[test]
2925 fn dispatch_set_xx_when_key_missing() {
2926 let mut ks = Keyspace::new();
2927 let resp = test_dispatch(
2928 &mut ks,
2929 ShardRequest::Set {
2930 key: "k".into(),
2931 value: Bytes::from("v"),
2932 expire: None,
2933 nx: false,
2934 xx: true,
2935 },
2936 );
2937 assert!(matches!(resp, ShardResponse::Value(None)));
2939 assert!(!ks.exists("k"));
2940 }
2941
2942 #[test]
2943 fn dispatch_flushdb_clears_all_keys() {
2944 let mut ks = Keyspace::new();
2945 ks.set("a".into(), Bytes::from("1"), None, false, false);
2946 ks.set("b".into(), Bytes::from("2"), None, false, false);
2947
2948 assert_eq!(ks.len(), 2);
2949
2950 let resp = test_dispatch(&mut ks, ShardRequest::FlushDb);
2951 assert!(matches!(resp, ShardResponse::Ok));
2952 assert_eq!(ks.len(), 0);
2953 }
2954
2955 #[test]
2956 fn dispatch_scan_returns_keys() {
2957 let mut ks = Keyspace::new();
2958 ks.set("user:1".into(), Bytes::from("a"), None, false, false);
2959 ks.set("user:2".into(), Bytes::from("b"), None, false, false);
2960 ks.set("item:1".into(), Bytes::from("c"), None, false, false);
2961
2962 let resp = test_dispatch(
2963 &mut ks,
2964 ShardRequest::Scan {
2965 cursor: 0,
2966 count: 10,
2967 pattern: None,
2968 },
2969 );
2970
2971 match resp {
2972 ShardResponse::Scan { cursor, keys } => {
2973 assert_eq!(cursor, 0); assert_eq!(keys.len(), 3);
2975 }
2976 _ => panic!("expected Scan response"),
2977 }
2978 }
2979
2980 #[test]
2981 fn dispatch_scan_with_pattern() {
2982 let mut ks = Keyspace::new();
2983 ks.set("user:1".into(), Bytes::from("a"), None, false, false);
2984 ks.set("user:2".into(), Bytes::from("b"), None, false, false);
2985 ks.set("item:1".into(), Bytes::from("c"), None, false, false);
2986
2987 let resp = test_dispatch(
2988 &mut ks,
2989 ShardRequest::Scan {
2990 cursor: 0,
2991 count: 10,
2992 pattern: Some("user:*".into()),
2993 },
2994 );
2995
2996 match resp {
2997 ShardResponse::Scan { cursor, keys } => {
2998 assert_eq!(cursor, 0);
2999 assert_eq!(keys.len(), 2);
3000 for k in &keys {
3001 assert!(k.starts_with("user:"));
3002 }
3003 }
3004 _ => panic!("expected Scan response"),
3005 }
3006 }
3007
3008 #[test]
3009 fn dispatch_keys() {
3010 let mut ks = Keyspace::new();
3011 ks.set("user:1".into(), Bytes::from("a"), None, false, false);
3012 ks.set("user:2".into(), Bytes::from("b"), None, false, false);
3013 ks.set("item:1".into(), Bytes::from("c"), None, false, false);
3014 let resp = test_dispatch(
3015 &mut ks,
3016 ShardRequest::Keys {
3017 pattern: "user:*".into(),
3018 },
3019 );
3020 match resp {
3021 ShardResponse::StringArray(mut keys) => {
3022 keys.sort();
3023 assert_eq!(keys, vec!["user:1", "user:2"]);
3024 }
3025 other => panic!("expected StringArray, got {other:?}"),
3026 }
3027 }
3028
3029 #[test]
3030 fn dispatch_rename() {
3031 let mut ks = Keyspace::new();
3032 ks.set("old".into(), Bytes::from("value"), None, false, false);
3033 let resp = test_dispatch(
3034 &mut ks,
3035 ShardRequest::Rename {
3036 key: "old".into(),
3037 newkey: "new".into(),
3038 },
3039 );
3040 assert!(matches!(resp, ShardResponse::Ok));
3041 assert!(!ks.exists("old"));
3042 assert!(ks.exists("new"));
3043 }
3044
3045 #[test]
3046 fn dispatch_rename_missing_key() {
3047 let mut ks = Keyspace::new();
3048 let resp = test_dispatch(
3049 &mut ks,
3050 ShardRequest::Rename {
3051 key: "missing".into(),
3052 newkey: "new".into(),
3053 },
3054 );
3055 assert!(matches!(resp, ShardResponse::Err(_)));
3056 }
3057
3058 #[test]
3059 fn dump_key_returns_serialized_value() {
3060 let mut ks = Keyspace::new();
3061 ks.set(
3062 "greeting".into(),
3063 Bytes::from("hello"),
3064 Some(Duration::from_secs(60)),
3065 false,
3066 false,
3067 );
3068
3069 let resp = test_dispatch(
3070 &mut ks,
3071 ShardRequest::DumpKey {
3072 key: "greeting".into(),
3073 },
3074 );
3075 match resp {
3076 ShardResponse::KeyDump { data, ttl_ms } => {
3077 assert!(!data.is_empty());
3078 assert!(ttl_ms > 0);
3079 let snap = snapshot::deserialize_snap_value(&data).unwrap();
3081 assert!(matches!(snap, SnapValue::String(ref b) if b == &Bytes::from("hello")));
3082 }
3083 other => panic!("expected KeyDump, got {other:?}"),
3084 }
3085 }
3086
3087 #[test]
3088 fn dump_key_missing_returns_none() {
3089 let mut ks = Keyspace::new();
3090 let resp = test_dispatch(&mut ks, ShardRequest::DumpKey { key: "nope".into() });
3091 assert!(matches!(resp, ShardResponse::Value(None)));
3092 }
3093
3094 #[test]
3095 fn restore_key_inserts_value() {
3096 let mut ks = Keyspace::new();
3097 let snap = SnapValue::String(Bytes::from("restored"));
3098 let data = snapshot::serialize_snap_value(&snap).unwrap();
3099
3100 let resp = test_dispatch(
3101 &mut ks,
3102 ShardRequest::RestoreKey {
3103 key: "mykey".into(),
3104 ttl_ms: 0,
3105 data: Bytes::from(data),
3106 replace: false,
3107 },
3108 );
3109 assert!(matches!(resp, ShardResponse::Ok));
3110 assert_eq!(
3111 ks.get("mykey").unwrap(),
3112 Some(Value::String(Bytes::from("restored")))
3113 );
3114 }
3115
3116 #[test]
3117 fn restore_key_with_ttl() {
3118 let mut ks = Keyspace::new();
3119 let snap = SnapValue::String(Bytes::from("temp"));
3120 let data = snapshot::serialize_snap_value(&snap).unwrap();
3121
3122 let resp = test_dispatch(
3123 &mut ks,
3124 ShardRequest::RestoreKey {
3125 key: "ttlkey".into(),
3126 ttl_ms: 30_000,
3127 data: Bytes::from(data),
3128 replace: false,
3129 },
3130 );
3131 assert!(matches!(resp, ShardResponse::Ok));
3132 match ks.pttl("ttlkey") {
3133 TtlResult::Milliseconds(ms) => assert!(ms > 29_000 && ms <= 30_000),
3134 other => panic!("expected Milliseconds, got {other:?}"),
3135 }
3136 }
3137
3138 #[test]
3139 fn restore_key_rejects_duplicate_without_replace() {
3140 let mut ks = Keyspace::new();
3141 ks.set("existing".into(), Bytes::from("old"), None, false, false);
3142
3143 let snap = SnapValue::String(Bytes::from("new"));
3144 let data = snapshot::serialize_snap_value(&snap).unwrap();
3145
3146 let resp = test_dispatch(
3147 &mut ks,
3148 ShardRequest::RestoreKey {
3149 key: "existing".into(),
3150 ttl_ms: 0,
3151 data: Bytes::from(data),
3152 replace: false,
3153 },
3154 );
3155 assert!(matches!(resp, ShardResponse::Err(_)));
3156 assert_eq!(
3158 ks.get("existing").unwrap(),
3159 Some(Value::String(Bytes::from("old")))
3160 );
3161 }
3162
3163 #[test]
3164 fn restore_key_replace_overwrites() {
3165 let mut ks = Keyspace::new();
3166 ks.set("existing".into(), Bytes::from("old"), None, false, false);
3167
3168 let snap = SnapValue::String(Bytes::from("new"));
3169 let data = snapshot::serialize_snap_value(&snap).unwrap();
3170
3171 let resp = test_dispatch(
3172 &mut ks,
3173 ShardRequest::RestoreKey {
3174 key: "existing".into(),
3175 ttl_ms: 0,
3176 data: Bytes::from(data),
3177 replace: true,
3178 },
3179 );
3180 assert!(matches!(resp, ShardResponse::Ok));
3181 assert_eq!(
3182 ks.get("existing").unwrap(),
3183 Some(Value::String(Bytes::from("new")))
3184 );
3185 }
3186
3187 #[test]
3188 fn dump_and_restore_hash_roundtrip() {
3189 let mut ks = Keyspace::new();
3190 ks.hset(
3191 "myhash",
3192 &[
3193 ("f1".into(), Bytes::from("v1")),
3194 ("f2".into(), Bytes::from("v2")),
3195 ],
3196 )
3197 .unwrap();
3198
3199 let resp = test_dispatch(
3201 &mut ks,
3202 ShardRequest::DumpKey {
3203 key: "myhash".into(),
3204 },
3205 );
3206 let (data, _ttl) = match resp {
3207 ShardResponse::KeyDump { data, ttl_ms } => (data, ttl_ms),
3208 other => panic!("expected KeyDump, got {other:?}"),
3209 };
3210
3211 let resp = test_dispatch(
3213 &mut ks,
3214 ShardRequest::RestoreKey {
3215 key: "myhash2".into(),
3216 ttl_ms: 0,
3217 data: Bytes::from(data),
3218 replace: false,
3219 },
3220 );
3221 assert!(matches!(resp, ShardResponse::Ok));
3222
3223 assert_eq!(ks.hget("myhash2", "f1").unwrap(), Some(Bytes::from("v1")));
3225 assert_eq!(ks.hget("myhash2", "f2").unwrap(), Some(Bytes::from("v2")));
3226 }
3227
3228 #[test]
3229 fn is_write_classifies_correctly() {
3230 assert!(ShardRequest::Set {
3232 key: "k".into(),
3233 value: Bytes::from("v"),
3234 expire: None,
3235 nx: false,
3236 xx: false,
3237 }
3238 .is_write());
3239 assert!(ShardRequest::Del { key: "k".into() }.is_write());
3240 assert!(ShardRequest::Incr { key: "k".into() }.is_write());
3241 assert!(ShardRequest::LPush {
3242 key: "k".into(),
3243 values: vec![],
3244 }
3245 .is_write());
3246 assert!(ShardRequest::HSet {
3247 key: "k".into(),
3248 fields: vec![],
3249 }
3250 .is_write());
3251 assert!(ShardRequest::SAdd {
3252 key: "k".into(),
3253 members: vec![],
3254 }
3255 .is_write());
3256 assert!(ShardRequest::FlushDb.is_write());
3257
3258 assert!(!ShardRequest::Get { key: "k".into() }.is_write());
3260 assert!(!ShardRequest::Exists { key: "k".into() }.is_write());
3261 assert!(!ShardRequest::Ttl { key: "k".into() }.is_write());
3262 assert!(!ShardRequest::DbSize.is_write());
3263 assert!(!ShardRequest::Stats.is_write());
3264 assert!(!ShardRequest::LLen { key: "k".into() }.is_write());
3265 assert!(!ShardRequest::HGet {
3266 key: "k".into(),
3267 field: "f".into(),
3268 }
3269 .is_write());
3270 assert!(!ShardRequest::SMembers { key: "k".into() }.is_write());
3271 }
3272}