1mod aof;
47mod blocking;
48mod persistence;
49
50use std::collections::{HashMap, VecDeque};
51use std::path::PathBuf;
52use std::time::Duration;
53
54use bytes::Bytes;
55use ember_persistence::aof::{AofRecord, AofWriter, FsyncPolicy};
56use ember_persistence::recovery::{self, RecoveredValue};
57use ember_persistence::snapshot::{self, SnapEntry, SnapValue, SnapshotWriter};
58use smallvec::{smallvec, SmallVec};
59use tokio::sync::{broadcast, mpsc, oneshot};
60use tracing::{debug, error, info, warn};
61
62use crate::dropper::DropHandle;
63use crate::error::ShardError;
64use crate::expiry;
65use crate::keyspace::{
66 IncrError, IncrFloatError, Keyspace, KeyspaceStats, LsetError, SetResult, ShardConfig,
67 TtlResult, WriteError,
68};
69use crate::types::sorted_set::{ScoreBound, ZAddFlags};
70use crate::types::Value;
71
/// Period of the shard's expiry timer. Each tick of this interval in
/// `run_shard` triggers one `expiry::run_expiration_cycle` pass over the
/// keyspace.
const EXPIRY_TICK: Duration = Duration::from_millis(100);
75
/// Period of the shard's fsync timer. `run_shard` only reacts to this tick
/// when the configured policy is `FsyncPolicy::EverySec`, in which case it
/// calls `sync` on the AOF writer once per interval.
const FSYNC_INTERVAL: Duration = Duration::from_secs(1);
78
/// One mutation record published to replication subscribers.
///
/// `process_single` sends one event per AOF record produced by a request
/// over the shard's `broadcast` channel.
#[derive(Debug, Clone)]
pub struct ReplicationEvent {
    /// Id of the shard that produced the record.
    pub shard_id: u16,
    /// Per-shard counter. It starts at zero when the shard task starts and is
    /// incremented by one before each event is sent, so the first event
    /// emitted carries offset 1.
    pub offset: u64,
    /// The record being replicated (the same records that are written to the
    /// AOF when a writer is configured).
    pub record: AofRecord,
}
93
/// Persistence settings for a single shard.
#[derive(Debug, Clone)]
pub struct ShardPersistenceConfig {
    /// Directory handed to shard recovery and used to derive the AOF path.
    pub data_dir: PathBuf,
    /// When true, `run_shard` opens an `AofWriter` for this shard.
    pub append_only: bool,
    /// Sync policy for the AOF. `Always` syncs after every request that
    /// produced records; `EverySec` syncs on the periodic fsync timer.
    pub fsync_policy: FsyncPolicy,
    /// Optional key. When present, recovery and the AOF writer use their
    /// encrypted variants.
    #[cfg(feature = "encryption")]
    pub encryption_key: Option<ember_persistence::encryption::EncryptionKey>,
}
108
/// A single command sent to a shard task.
///
/// Most variants are mapped by `dispatch` onto one `Keyspace` call and carry
/// exactly the arguments that call needs. The section comments below group
/// the variants by the data type their names suggest; the authoritative
/// semantics live in `Keyspace`.
#[derive(Debug)]
pub enum ShardRequest {
    // ---- string values ----
    Get {
        key: String,
    },
    /// Forwarded to `Keyspace::set`; a `SetResult::Blocked` outcome is
    /// reported to the caller as `ShardResponse::Value(None)`.
    Set {
        key: String,
        value: Bytes,
        expire: Option<Duration>,
        // presumably Redis-style "only if absent" — confirm in `Keyspace::set`
        nx: bool,
        // presumably Redis-style "only if present" — confirm in `Keyspace::set`
        xx: bool,
    },
    Incr {
        key: String,
    },
    Decr {
        key: String,
    },
    IncrBy {
        key: String,
        delta: i64,
    },
    /// Applied as `incr_by` with the negated delta; a delta whose negation
    /// overflows `i64` is answered with an error response.
    DecrBy {
        key: String,
        delta: i64,
    },
    IncrByFloat {
        key: String,
        delta: f64,
    },
    Append {
        key: String,
        value: Bytes,
    },
    Strlen {
        key: String,
    },
    GetRange {
        key: String,
        start: i64,
        end: i64,
    },
    SetRange {
        key: String,
        offset: usize,
        value: Bytes,
    },
    // ---- generic key operations ----
    Keys {
        pattern: String,
    },
    Rename {
        key: String,
        newkey: String,
    },
    Copy {
        source: String,
        destination: String,
        replace: bool,
    },
    ObjectEncoding {
        key: String,
    },
    Del {
        key: String,
    },
    Unlink {
        key: String,
    },
    Exists {
        key: String,
    },
    RandomKey,
    Touch {
        key: String,
    },
    Sort {
        key: String,
        desc: bool,
        alpha: bool,
        limit: Option<(i64, i64)>,
    },
    // ---- expiry ----
    Expire {
        key: String,
        seconds: u64,
    },
    Ttl {
        key: String,
    },
    Persist {
        key: String,
    },
    Pttl {
        key: String,
    },
    Pexpire {
        key: String,
        milliseconds: u64,
    },
    // ---- lists ----
    LPush {
        key: String,
        values: Vec<Bytes>,
    },
    RPush {
        key: String,
        values: Vec<Bytes>,
    },
    LPop {
        key: String,
    },
    RPop {
        key: String,
    },
    /// Blocking pop. Not routed through `dispatch`: `process_single` hands
    /// the `waiter` sender to `blocking::handle_blocking_pop`.
    BLPop {
        key: String,
        waiter: mpsc::Sender<(String, Bytes)>,
    },
    /// Same handling as `BLPop`, with the direction flag set to `false`.
    BRPop {
        key: String,
        waiter: mpsc::Sender<(String, Bytes)>,
    },
    LRange {
        key: String,
        start: i64,
        stop: i64,
    },
    LLen {
        key: String,
    },
    LIndex {
        key: String,
        index: i64,
    },
    LSet {
        key: String,
        index: i64,
        value: Bytes,
    },
    LTrim {
        key: String,
        start: i64,
        stop: i64,
    },
    LInsert {
        key: String,
        before: bool,
        pivot: Bytes,
        value: Bytes,
    },
    LRem {
        key: String,
        count: i64,
        value: Bytes,
    },
    LPos {
        key: String,
        element: Bytes,
        rank: i64,
        count: usize,
        maxlen: usize,
    },
    Type {
        key: String,
    },
    // ---- sorted sets ----
    ZAdd {
        key: String,
        members: Vec<(f64, String)>,
        nx: bool,
        xx: bool,
        gt: bool,
        lt: bool,
        ch: bool,
    },
    ZRem {
        key: String,
        members: Vec<String>,
    },
    ZScore {
        key: String,
        member: String,
    },
    ZRank {
        key: String,
        member: String,
    },
    ZRevRank {
        key: String,
        member: String,
    },
    ZCard {
        key: String,
    },
    ZRange {
        key: String,
        start: i64,
        stop: i64,
        with_scores: bool,
    },
    ZRevRange {
        key: String,
        start: i64,
        stop: i64,
        with_scores: bool,
    },
    ZCount {
        key: String,
        min: ScoreBound,
        max: ScoreBound,
    },
    ZIncrBy {
        key: String,
        increment: f64,
        member: String,
    },
    ZRangeByScore {
        key: String,
        min: ScoreBound,
        max: ScoreBound,
        offset: usize,
        count: Option<usize>,
    },
    ZRevRangeByScore {
        key: String,
        min: ScoreBound,
        max: ScoreBound,
        offset: usize,
        count: Option<usize>,
    },
    ZPopMin {
        key: String,
        count: usize,
    },
    ZPopMax {
        key: String,
        count: usize,
    },
    // ---- hashes ----
    HSet {
        key: String,
        fields: Vec<(String, Bytes)>,
    },
    HGet {
        key: String,
        field: String,
    },
    HGetAll {
        key: String,
    },
    HDel {
        key: String,
        fields: Vec<String>,
    },
    HExists {
        key: String,
        field: String,
    },
    HLen {
        key: String,
    },
    HIncrBy {
        key: String,
        field: String,
        delta: i64,
    },
    HKeys {
        key: String,
    },
    HVals {
        key: String,
    },
    HMGet {
        key: String,
        fields: Vec<String>,
    },
    // ---- sets ----
    SAdd {
        key: String,
        members: Vec<String>,
    },
    SRem {
        key: String,
        members: Vec<String>,
    },
    SMembers {
        key: String,
    },
    SIsMember {
        key: String,
        member: String,
    },
    SCard {
        key: String,
    },
    SUnion {
        keys: Vec<String>,
    },
    SInter {
        keys: Vec<String>,
    },
    SDiff {
        keys: Vec<String>,
    },
    SUnionStore {
        dest: String,
        keys: Vec<String>,
    },
    SInterStore {
        dest: String,
        keys: Vec<String>,
    },
    SDiffStore {
        dest: String,
        keys: Vec<String>,
    },
    SRandMember {
        key: String,
        count: i64,
    },
    SPop {
        key: String,
        count: usize,
    },
    SMisMember {
        key: String,
        members: Vec<String>,
    },
    // ---- shard-wide / administrative ----
    DbSize,
    Stats,
    KeyVersion {
        key: String,
    },
    /// Answered by `persistence::handle_snapshot` in `process_single`.
    Snapshot,
    /// Answered by `persistence::handle_serialize_snapshot` in
    /// `process_single`.
    SerializeSnapshot,
    /// Answered by `persistence::handle_rewrite` in `process_single`.
    RewriteAof,
    FlushDb,
    /// `process_single` calls `Keyspace::flush_async` and, when a drop handle
    /// is configured, defers the old entries to it.
    FlushDbAsync,
    // ---- cursor-based iteration ----
    Scan {
        cursor: u64,
        count: usize,
        pattern: Option<String>,
    },
    SScan {
        key: String,
        cursor: u64,
        count: usize,
        pattern: Option<String>,
    },
    HScan {
        key: String,
        cursor: u64,
        count: usize,
        pattern: Option<String>,
    },
    ZScan {
        key: String,
        cursor: u64,
        count: usize,
        pattern: Option<String>,
    },
    // ---- slot inspection and key dump/restore ----
    CountKeysInSlot {
        slot: u16,
    },
    GetKeysInSlot {
        slot: u16,
        count: usize,
    },
    DumpKey {
        key: String,
    },
    RestoreKey {
        key: String,
        ttl_ms: u64,
        data: bytes::Bytes,
        replace: bool,
    },
    // ---- vector sets (only with the `vector` feature) ----
    #[cfg(feature = "vector")]
    VAdd {
        key: String,
        element: String,
        vector: Vec<f32>,
        metric: u8,
        quantization: u8,
        connectivity: u32,
        expansion_add: u32,
    },
    #[cfg(feature = "vector")]
    VAddBatch {
        key: String,
        entries: Vec<(String, Vec<f32>)>,
        dim: usize,
        metric: u8,
        quantization: u8,
        connectivity: u32,
        expansion_add: u32,
    },
    #[cfg(feature = "vector")]
    VSim {
        key: String,
        query: Vec<f32>,
        count: usize,
        ef_search: usize,
    },
    #[cfg(feature = "vector")]
    VRem {
        key: String,
        element: String,
    },
    #[cfg(feature = "vector")]
    VGet {
        key: String,
        element: String,
    },
    #[cfg(feature = "vector")]
    VCard {
        key: String,
    },
    #[cfg(feature = "vector")]
    VDim {
        key: String,
    },
    #[cfg(feature = "vector")]
    VInfo {
        key: String,
    },
    // ---- protobuf values (only with the `protobuf` feature) ----
    #[cfg(feature = "protobuf")]
    ProtoSet {
        key: String,
        type_name: String,
        data: Bytes,
        expire: Option<Duration>,
        nx: bool,
        xx: bool,
    },
    #[cfg(feature = "protobuf")]
    ProtoGet {
        key: String,
    },
    #[cfg(feature = "protobuf")]
    ProtoType {
        key: String,
    },
    #[cfg(feature = "protobuf")]
    ProtoRegisterAof {
        name: String,
        descriptor: Bytes,
    },
    #[cfg(feature = "protobuf")]
    ProtoSetField {
        key: String,
        field_path: String,
        value: String,
    },
    #[cfg(feature = "protobuf")]
    ProtoDelField {
        key: String,
        field_path: String,
    },
}
618
619impl ShardRequest {
620 fn is_write(&self) -> bool {
624 #[allow(unreachable_patterns)]
625 match self {
626 ShardRequest::Set { .. }
627 | ShardRequest::Incr { .. }
628 | ShardRequest::Decr { .. }
629 | ShardRequest::IncrBy { .. }
630 | ShardRequest::DecrBy { .. }
631 | ShardRequest::IncrByFloat { .. }
632 | ShardRequest::Append { .. }
633 | ShardRequest::Del { .. }
634 | ShardRequest::Unlink { .. }
635 | ShardRequest::Rename { .. }
636 | ShardRequest::Copy { .. }
637 | ShardRequest::Expire { .. }
638 | ShardRequest::Persist { .. }
639 | ShardRequest::Pexpire { .. }
640 | ShardRequest::LPush { .. }
641 | ShardRequest::RPush { .. }
642 | ShardRequest::LPop { .. }
643 | ShardRequest::RPop { .. }
644 | ShardRequest::LSet { .. }
645 | ShardRequest::LTrim { .. }
646 | ShardRequest::LInsert { .. }
647 | ShardRequest::LRem { .. }
648 | ShardRequest::BLPop { .. }
649 | ShardRequest::BRPop { .. }
650 | ShardRequest::ZAdd { .. }
651 | ShardRequest::ZRem { .. }
652 | ShardRequest::ZIncrBy { .. }
653 | ShardRequest::ZPopMin { .. }
654 | ShardRequest::ZPopMax { .. }
655 | ShardRequest::HSet { .. }
656 | ShardRequest::HDel { .. }
657 | ShardRequest::HIncrBy { .. }
658 | ShardRequest::SAdd { .. }
659 | ShardRequest::SRem { .. }
660 | ShardRequest::SPop { .. }
661 | ShardRequest::SUnionStore { .. }
662 | ShardRequest::SInterStore { .. }
663 | ShardRequest::SDiffStore { .. }
664 | ShardRequest::FlushDb
665 | ShardRequest::FlushDbAsync
666 | ShardRequest::RestoreKey { .. } => true,
667 #[cfg(feature = "protobuf")]
668 ShardRequest::ProtoSet { .. }
669 | ShardRequest::ProtoRegisterAof { .. }
670 | ShardRequest::ProtoSetField { .. }
671 | ShardRequest::ProtoDelField { .. } => true,
672 #[cfg(feature = "vector")]
673 ShardRequest::VAdd { .. }
674 | ShardRequest::VAddBatch { .. }
675 | ShardRequest::VRem { .. } => true,
676 _ => false,
677 }
678 }
679}
680
/// The reply a shard sends back for one `ShardRequest`.
///
/// Which variant answers which request is decided in `dispatch` and
/// `process_single`; the notes below only cover pairings visible there.
#[derive(Debug)]
pub enum ShardResponse {
    /// An optional value. Also used with `None` when a `Set` is blocked
    /// (`SetResult::Blocked`).
    Value(Option<Value>),
    /// Plain success with no payload.
    Ok,
    /// Integer result, e.g. from the `Incr`/`Decr` family.
    Integer(i64),
    /// Boolean result, e.g. from `Del`, `Exists`, `Expire`, `Persist`.
    Bool(bool),
    /// Result of `Ttl` / `Pttl`.
    Ttl(TtlResult),
    /// The keyspace reported an out-of-memory condition for a write.
    OutOfMemory,
    KeyCount(usize),
    Stats(KeyspaceStats),
    /// A length, e.g. from `Strlen`, `LLen`, `Append`, `LPush`/`RPush`.
    Len(usize),
    /// A list of byte strings, e.g. from `LRange` and `Sort`.
    Array(Vec<Bytes>),
    TypeName(&'static str),
    /// Result of `ObjectEncoding`.
    EncodingName(Option<&'static str>),
    ZAddLen {
        count: usize,
        applied: Vec<(f64, String)>,
    },
    ZRemLen { count: usize, removed: Vec<String> },
    Score(Option<f64>),
    Rank(Option<usize>),
    ScoredArray(Vec<(String, f64)>),
    ZIncrByResult { new_score: f64, member: String },
    ZPopResult(Vec<(String, f64)>),
    /// A string payload, e.g. the new value after `IncrByFloat`.
    BulkString(String),
    /// The key holds a value of a different type than the command expects.
    WrongType,
    /// An error carrying the message text to report to the client.
    Err(String),
    Scan { cursor: u64, keys: Vec<String> },
    CollectionScan { cursor: u64, items: Vec<Bytes> },
    HashFields(Vec<(String, Bytes)>),
    HDelLen { count: usize, removed: Vec<String> },
    /// A list of strings, e.g. from `Keys`; `RandomKey` returns zero or one
    /// element.
    StringArray(Vec<String>),
    IntegerArray(Vec<i64>),
    BoolArray(Vec<bool>),
    /// Result of the `S*Store` commands (see `store_set_response`).
    SetStoreResult { count: usize, members: Vec<String> },
    KeyDump { data: Vec<u8>, ttl_ms: i64 },
    SnapshotData { shard_id: u16, data: Vec<u8> },
    OptionalArray(Vec<Option<Bytes>>),
    #[cfg(feature = "vector")]
    VAddResult {
        element: String,
        vector: Vec<f32>,
        added: bool,
    },
    #[cfg(feature = "vector")]
    VAddBatchResult {
        added_count: usize,
        applied: Vec<(String, Vec<f32>)>,
    },
    #[cfg(feature = "vector")]
    VSimResult(Vec<(String, f32)>),
    #[cfg(feature = "vector")]
    VectorData(Option<Vec<f32>>),
    #[cfg(feature = "vector")]
    VectorInfo(Option<Vec<(String, String)>>),
    #[cfg(feature = "protobuf")]
    ProtoValue(Option<(String, Bytes, Option<Duration>)>),
    #[cfg(feature = "protobuf")]
    ProtoTypeName(Option<String>),
    #[cfg(feature = "protobuf")]
    ProtoFieldUpdated {
        type_name: String,
        data: Bytes,
        expire: Option<Duration>,
    },
    Version(Option<u64>),
}
792
/// Envelope carried on the shard's mpsc channel: one or more requests
/// together with the channel(s) on which to send the response(s).
#[derive(Debug)]
pub enum ShardMessage {
    /// One request answered through a one-shot channel.
    Single {
        request: ShardRequest,
        reply: oneshot::Sender<ShardResponse>,
    },
    /// One request answered through a caller-supplied mpsc sender. The shard
    /// replies with `try_send`, so a full or closed channel drops the
    /// response (logged at debug level).
    SingleReusable {
        request: ShardRequest,
        reply: mpsc::Sender<ShardResponse>,
    },
    /// Several requests delivered in one message; `process_message` handles
    /// them in vector order, each with its own one-shot reply channel.
    Batch(Vec<(ShardRequest, oneshot::Sender<ShardResponse>)>),
}
822
/// Cloneable sender side of a shard's message channel. All requests to a
/// shard go through one of the `dispatch*` / `send` methods on this handle.
#[derive(Debug, Clone)]
pub struct ShardHandle {
    /// Channel into the shard task created by `prepare_shard`.
    tx: mpsc::Sender<ShardMessage>,
}
831
832impl ShardHandle {
833 pub async fn send(&self, request: ShardRequest) -> Result<ShardResponse, ShardError> {
837 let rx = self.dispatch(request).await?;
838 rx.await.map_err(|_| ShardError::Unavailable)
839 }
840
841 pub async fn dispatch(
846 &self,
847 request: ShardRequest,
848 ) -> Result<oneshot::Receiver<ShardResponse>, ShardError> {
849 let (reply_tx, reply_rx) = oneshot::channel();
850 let msg = ShardMessage::Single {
851 request,
852 reply: reply_tx,
853 };
854 self.tx
855 .send(msg)
856 .await
857 .map_err(|_| ShardError::Unavailable)?;
858 Ok(reply_rx)
859 }
860
861 pub async fn dispatch_reusable(
867 &self,
868 request: ShardRequest,
869 reply: mpsc::Sender<ShardResponse>,
870 ) -> Result<(), ShardError> {
871 self.tx
872 .send(ShardMessage::SingleReusable { request, reply })
873 .await
874 .map_err(|_| ShardError::Unavailable)
875 }
876
877 pub async fn dispatch_batch(
885 &self,
886 requests: Vec<ShardRequest>,
887 ) -> Result<Vec<oneshot::Receiver<ShardResponse>>, ShardError> {
888 if requests.len() == 1 {
889 let rx = self
890 .dispatch(requests.into_iter().next().expect("len == 1"))
891 .await?;
892 return Ok(vec![rx]);
893 }
894 let mut receivers = Vec::with_capacity(requests.len());
895 let mut entries = Vec::with_capacity(requests.len());
896 for request in requests {
897 let (tx, rx) = oneshot::channel();
898 entries.push((request, tx));
899 receivers.push(rx);
900 }
901 self.tx
902 .send(ShardMessage::Batch(entries))
903 .await
904 .map_err(|_| ShardError::Unavailable)?;
905 Ok(receivers)
906 }
907}
908
/// Everything `run_shard` needs, bundled so that creating the channel
/// (`prepare_shard`) and running the shard loop (`run_prepared`) can happen
/// as two separate steps.
pub struct PreparedShard {
    /// Receiving end of the channel whose sender lives in the `ShardHandle`.
    rx: mpsc::Receiver<ShardMessage>,
    /// Keyspace configuration; also supplies the shard id.
    config: ShardConfig,
    /// Persistence settings; `None` means no recovery and no AOF.
    persistence: Option<ShardPersistenceConfig>,
    /// Optional handle installed on the keyspace and used for async flushes.
    drop_handle: Option<DropHandle>,
    /// Optional broadcast channel on which replication events are published.
    replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
    /// Optional schema registry, passed through to request dispatch and
    /// populated from recovered schemas at startup.
    #[cfg(feature = "protobuf")]
    schema_registry: Option<crate::schema::SharedSchemaRegistry>,
}
924
925pub fn prepare_shard(
930 buffer: usize,
931 config: ShardConfig,
932 persistence: Option<ShardPersistenceConfig>,
933 drop_handle: Option<DropHandle>,
934 replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
935 #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
936) -> (ShardHandle, PreparedShard) {
937 let (tx, rx) = mpsc::channel(buffer);
938 let prepared = PreparedShard {
939 rx,
940 config,
941 persistence,
942 drop_handle,
943 replication_tx,
944 #[cfg(feature = "protobuf")]
945 schema_registry,
946 };
947 (ShardHandle { tx }, prepared)
948}
949
950pub async fn run_prepared(prepared: PreparedShard) {
955 run_shard(
956 prepared.rx,
957 prepared.config,
958 prepared.persistence,
959 prepared.drop_handle,
960 prepared.replication_tx,
961 #[cfg(feature = "protobuf")]
962 prepared.schema_registry,
963 )
964 .await
965}
966
967pub fn spawn_shard(
976 buffer: usize,
977 config: ShardConfig,
978 persistence: Option<ShardPersistenceConfig>,
979 drop_handle: Option<DropHandle>,
980 replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
981 #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
982) -> ShardHandle {
983 let (handle, prepared) = prepare_shard(
984 buffer,
985 config,
986 persistence,
987 drop_handle,
988 replication_tx,
989 #[cfg(feature = "protobuf")]
990 schema_registry,
991 );
992 tokio::spawn(run_prepared(prepared));
993 handle
994}
995
/// The shard's main loop.
///
/// Startup: builds the keyspace, restores persisted state (when persistence
/// is configured) and opens the AOF writer (when `append_only` is set).
/// Steady state: multiplexes incoming messages, the expiry timer and the
/// fsync timer until the message channel is closed, then performs a final
/// best-effort AOF sync.
async fn run_shard(
    mut rx: mpsc::Receiver<ShardMessage>,
    config: ShardConfig,
    persistence: Option<ShardPersistenceConfig>,
    drop_handle: Option<DropHandle>,
    replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
    #[cfg(feature = "protobuf")] schema_registry: Option<crate::schema::SharedSchemaRegistry>,
) {
    let shard_id = config.shard_id;
    let mut keyspace = Keyspace::with_config(config);

    if let Some(handle) = drop_handle.clone() {
        keyspace.set_drop_handle(handle);
    }

    // ---- recovery: rebuild in-memory values from persisted state ----
    if let Some(ref pcfg) = persistence {
        // Use the encrypted recovery path only when a key is configured.
        #[cfg(feature = "encryption")]
        let result = if let Some(ref key) = pcfg.encryption_key {
            recovery::recover_shard_encrypted(&pcfg.data_dir, shard_id, key.clone())
        } else {
            recovery::recover_shard(&pcfg.data_dir, shard_id)
        };
        #[cfg(not(feature = "encryption"))]
        let result = recovery::recover_shard(&pcfg.data_dir, shard_id);
        // Taken before the loop, so it also counts entries that are skipped
        // below (a vector index that fails to build).
        let count = result.entries.len();
        for entry in result.entries {
            // Convert the persistence-layer value into the keyspace's type.
            let value = match entry.value {
                RecoveredValue::String(data) => Value::String(data),
                RecoveredValue::List(deque) => Value::List(deque),
                RecoveredValue::SortedSet(members) => {
                    let mut ss = crate::types::sorted_set::SortedSet::new();
                    for (score, member) in members {
                        ss.add(&member, score);
                    }
                    Value::SortedSet(Box::new(ss))
                }
                RecoveredValue::Hash(map) => {
                    Value::Hash(Box::new(crate::types::hash::HashValue::from(map)))
                }
                RecoveredValue::Set(set) => Value::Set(Box::new(set)),
                #[cfg(feature = "vector")]
                RecoveredValue::Vector {
                    metric,
                    quantization,
                    connectivity,
                    expansion_add,
                    elements,
                } => {
                    use crate::types::vector::{DistanceMetric, QuantizationType, VectorSet};
                    // The dimension is taken from the first element; an empty
                    // set yields 0.
                    let dim = elements.first().map(|(_, v)| v.len()).unwrap_or(0);
                    match VectorSet::new(
                        dim,
                        DistanceMetric::from_u8(metric),
                        QuantizationType::from_u8(quantization),
                        connectivity as usize,
                        expansion_add as usize,
                    ) {
                        Ok(mut vs) => {
                            // A single bad element is logged and skipped; the
                            // rest of the set is still restored.
                            for (element, vector) in elements {
                                if let Err(e) = vs.add(element, &vector) {
                                    warn!("vector recovery: failed to add element: {e}");
                                }
                            }
                            Value::Vector(vs)
                        }
                        Err(e) => {
                            // The whole key is dropped if the index cannot be
                            // created.
                            warn!("vector recovery: failed to create index: {e}");
                            continue;
                        }
                    }
                }
                #[cfg(feature = "protobuf")]
                RecoveredValue::Proto { type_name, data } => Value::Proto { type_name, data },
            };
            keyspace.restore(entry.key, value, entry.ttl);
        }
        if count > 0 {
            info!(
                shard_id,
                recovered_keys = count,
                snapshot = result.loaded_snapshot,
                aof = result.replayed_aof,
                "recovered shard state"
            );
        }

        // Re-register recovered schemas. A failed `write()` on the registry
        // skips the restore silently.
        #[cfg(feature = "protobuf")]
        if let Some(ref registry) = schema_registry {
            if !result.schemas.is_empty() {
                if let Ok(mut reg) = registry.write() {
                    let schema_count = result.schemas.len();
                    for (name, descriptor) in result.schemas {
                        reg.restore(name, descriptor);
                    }
                    info!(
                        shard_id,
                        schemas = schema_count,
                        "restored schemas from AOF"
                    );
                }
            }
        }
    }

    // ---- AOF writer: only when persistence is configured with append_only ----
    let mut aof_writer: Option<AofWriter> = match &persistence {
        Some(pcfg) if pcfg.append_only => {
            let path = ember_persistence::aof::aof_path(&pcfg.data_dir, shard_id);
            #[cfg(feature = "encryption")]
            let result = if let Some(ref key) = pcfg.encryption_key {
                AofWriter::open_encrypted(path, key.clone())
            } else {
                AofWriter::open(path)
            };
            #[cfg(not(feature = "encryption"))]
            let result = AofWriter::open(path);
            match result {
                Ok(w) => Some(w),
                Err(e) => {
                    // The shard keeps running without an AOF if it cannot be
                    // opened.
                    warn!(shard_id, "failed to open AOF writer: {e}");
                    None
                }
            }
        }
        _ => None,
    };

    // Without persistence config there is nothing to sync.
    let fsync_policy = persistence
        .as_ref()
        .map(|p| p.fsync_policy)
        .unwrap_or(FsyncPolicy::No);

    // Counter stamped onto replication events; restarts at zero with the task.
    let mut replication_offset: u64 = 0;

    // Senders registered by blocking pops, per key; managed by the
    // `blocking` module.
    let mut lpop_waiters: HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>> = HashMap::new();
    let mut rpop_waiters: HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>> = HashMap::new();

    // AOF error state, updated through `aof::log_aof_error` and reset to
    // zero on recovery.
    let mut aof_errors: u32 = 0;

    // While set (and an AOF writer exists), `process_single` rejects writes.
    let mut disk_full: bool = false;

    let mut expiry_tick = tokio::time::interval(EXPIRY_TICK);
    // Skip missed ticks rather than firing a burst to catch up.
    expiry_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    let mut fsync_tick = tokio::time::interval(FSYNC_INTERVAL);
    fsync_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            msg = rx.recv() => {
                match msg {
                    Some(msg) => {
                        // Borrow all mutable shard state for the duration of
                        // this batch of messages.
                        let mut ctx = ProcessCtx {
                            keyspace: &mut keyspace,
                            aof_writer: &mut aof_writer,
                            fsync_policy,
                            persistence: &persistence,
                            drop_handle: &drop_handle,
                            shard_id,
                            replication_tx: &replication_tx,
                            replication_offset: &mut replication_offset,
                            lpop_waiters: &mut lpop_waiters,
                            rpop_waiters: &mut rpop_waiters,
                            aof_errors: &mut aof_errors,
                            disk_full: &mut disk_full,
                            #[cfg(feature = "protobuf")]
                            schema_registry: &schema_registry,
                        };
                        process_message(msg, &mut ctx);

                        // Drain everything already queued without going back
                        // through `select!` for each message.
                        while let Ok(msg) = rx.try_recv() {
                            process_message(msg, &mut ctx);
                        }
                    }
                    // `recv` returned `None`: the channel is closed, so leave
                    // the loop.
                    None => break,
                }
            }
            _ = expiry_tick.tick() => {
                expiry::run_expiration_cycle(&mut keyspace);
            }
            // This branch is disabled unless the policy is `EverySec`.
            _ = fsync_tick.tick(), if fsync_policy == FsyncPolicy::EverySec => {
                if let Some(ref mut writer) = aof_writer {
                    if let Err(e) = writer.sync() {
                        // `log_aof_error` returning true marks the disk as full.
                        if aof::log_aof_error(shard_id, &mut aof_errors, "sync", &e) {
                            disk_full = true;
                        }
                    } else if aof_errors > 0 {
                        // First successful sync after failures: reset the
                        // error state and re-enable writes if they were
                        // blocked.
                        let missed = aof_errors;
                        aof_errors = 0;
                        if disk_full {
                            disk_full = false;
                            info!(shard_id, missed_errors = missed, "aof sync recovered, accepting writes again");
                        } else {
                            info!(shard_id, missed_errors = missed, "aof sync recovered");
                        }
                    }
                }
            }
        }
    }

    // Final sync on shutdown; a failure here is deliberately ignored.
    if let Some(ref mut writer) = aof_writer {
        let _ = writer.sync();
    }
}
1216
/// Borrowed view of the shard's state, built by `run_shard` and passed to
/// the message-processing functions so they do not need a long parameter
/// list.
struct ProcessCtx<'a> {
    /// The shard's data.
    keyspace: &'a mut Keyspace,
    /// AOF writer, or `None` when the AOF is disabled or failed to open.
    aof_writer: &'a mut Option<AofWriter>,
    /// Sync policy; `Always` makes `process_single` sync after each request
    /// that produced records.
    fsync_policy: FsyncPolicy,
    /// Persistence settings, forwarded to the snapshot/rewrite handlers.
    persistence: &'a Option<ShardPersistenceConfig>,
    /// Receives the old entries of an async flush, when present.
    drop_handle: &'a Option<DropHandle>,
    shard_id: u16,
    /// Broadcast channel for replication events, when present.
    replication_tx: &'a Option<broadcast::Sender<ReplicationEvent>>,
    /// Incremented once per replicated record.
    replication_offset: &'a mut u64,
    /// Per-key waiter queues; used by the `blocking` module.
    lpop_waiters: &'a mut HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>>,
    rpop_waiters: &'a mut HashMap<String, VecDeque<mpsc::Sender<(String, Bytes)>>>,
    /// AOF error state passed to `aof::log_aof_error`; reset to zero on
    /// recovery.
    aof_errors: &'a mut u32,
    /// When true (and an AOF writer exists) write requests are rejected.
    disk_full: &'a mut bool,
    #[cfg(feature = "protobuf")]
    schema_registry: &'a Option<crate::schema::SharedSchemaRegistry>,
}
1242
1243fn process_message(msg: ShardMessage, ctx: &mut ProcessCtx<'_>) {
1248 match msg {
1249 ShardMessage::Single { request, reply } => {
1250 process_single(request, ReplySender::Oneshot(reply), ctx);
1251 }
1252 ShardMessage::SingleReusable { request, reply } => {
1253 process_single(request, ReplySender::Reusable(reply), ctx);
1254 }
1255 ShardMessage::Batch(entries) => {
1256 for (request, reply) in entries {
1257 process_single(request, ReplySender::Oneshot(reply), ctx);
1258 }
1259 }
1260 }
1261}
1262
/// The two kinds of reply channel a request can arrive with, unified so
/// `process_single` can answer either through one `send` call.
enum ReplySender {
    /// Reply channel of `ShardMessage::Single` and of each batch entry.
    Oneshot(oneshot::Sender<ShardResponse>),
    /// Reply channel of `ShardMessage::SingleReusable`.
    Reusable(mpsc::Sender<ShardResponse>),
}
1271
1272impl ReplySender {
1273 fn send(self, response: ShardResponse) {
1274 match self {
1275 ReplySender::Oneshot(tx) => {
1276 let _ = tx.send(response);
1277 }
1278 ReplySender::Reusable(tx) => {
1279 if let Err(e) = tx.try_send(response) {
1283 debug!("reusable reply channel full or closed: {e}");
1284 }
1285 }
1286 }
1287 }
1288}
1289
/// Executes one request against the shard and answers it on `reply`.
///
/// Order of operations: disk-full gate, blocking-pop hand-off, keyspace
/// dispatch, waking blocked poppers, AOF append (and sync under `Always`),
/// replication broadcast, and finally the reply. Snapshot, rewrite and
/// async-flush requests are answered by their dedicated handlers instead of
/// the `dispatch` result.
fn process_single(mut request: ShardRequest, reply: ReplySender, ctx: &mut ProcessCtx<'_>) {
    let fsync_policy = ctx.fsync_policy;
    let shard_id = ctx.shard_id;

    // Reject writes while the disk is flagged full. The gate only applies
    // when an AOF writer exists; reads always pass.
    if *ctx.disk_full && ctx.aof_writer.is_some() && request.is_write() {
        reply.send(ShardResponse::Err(
            "ERR disk full, write rejected — free disk space to resume writes".into(),
        ));
        return;
    }

    // Blocking pops take ownership of `reply` and are handled entirely by
    // the `blocking` module (`true` = BLPop, `false` = BRPop).
    match request {
        ShardRequest::BLPop { key, waiter } => {
            blocking::handle_blocking_pop(&key, waiter, true, reply, ctx);
            return;
        }
        ShardRequest::BRPop { key, waiter } => {
            blocking::handle_blocking_pop(&key, waiter, false, reply, ctx);
            return;
        }
        _ => {}
    }

    // Classify now: `request` is moved into `to_aof_records` below.
    let request_kind = describe_request(&request);
    let mut response = dispatch(
        ctx.keyspace,
        &mut request,
        #[cfg(feature = "protobuf")]
        ctx.schema_registry,
    );

    // A push that succeeded (`Len` response) may satisfy waiters blocked on
    // this key.
    if let ShardRequest::LPush { ref key, .. } | ShardRequest::RPush { ref key, .. } = request {
        if matches!(response, ShardResponse::Len(_)) {
            blocking::wake_blocked_waiters(key, ctx);
        }
    }

    // Consumes the request; takes the response mutably, so it may alter it.
    let records = aof::to_aof_records(request, &mut response);

    if let Some(ref mut writer) = *ctx.aof_writer {
        let mut batch_ok = true;
        // Every record is attempted even if an earlier one failed.
        for record in &records {
            if let Err(e) = writer.write_record(record) {
                // `log_aof_error` returning true marks the disk as full.
                if aof::log_aof_error(shard_id, ctx.aof_errors, "write", &e) {
                    *ctx.disk_full = true;
                }
                batch_ok = false;
            }
        }
        if !records.is_empty() && fsync_policy == FsyncPolicy::Always {
            if let Err(e) = writer.sync() {
                if aof::log_aof_error(shard_id, ctx.aof_errors, "sync", &e) {
                    *ctx.disk_full = true;
                }
                batch_ok = false;
            }
        }
        // NOTE(review): `batch_ok` is also true when `records` is empty, so
        // a request that wrote nothing (e.g. a read) clears the error state
        // and `disk_full` here. With writes rejected while `disk_full` is
        // set and the fsync timer only active under `EverySec`, this may be
        // the only path that re-enables writes for other policies — confirm
        // whether that is intended before tightening the condition.
        if batch_ok && *ctx.aof_errors > 0 {
            let missed = *ctx.aof_errors;
            *ctx.aof_errors = 0;
            *ctx.disk_full = false;
            info!(shard_id, missed_errors = missed, "aof writes recovered");
        }
    }

    if let Some(ref tx) = *ctx.replication_tx {
        // One event per record, each with its own offset. Records are sent
        // regardless of whether the AOF write above succeeded.
        for record in records {
            *ctx.replication_offset += 1;
            // A send error is ignored.
            let _ = tx.send(ReplicationEvent {
                shard_id,
                offset: *ctx.replication_offset,
                record,
            });
        }
    }

    // Requests answered by a dedicated handler; the `dispatch` response is
    // discarded for these.
    match request_kind {
        RequestKind::Snapshot => {
            let resp = persistence::handle_snapshot(ctx.keyspace, ctx.persistence, shard_id);
            reply.send(resp);
            return;
        }
        RequestKind::SerializeSnapshot => {
            let resp = persistence::handle_serialize_snapshot(ctx.keyspace, shard_id);
            reply.send(resp);
            return;
        }
        RequestKind::RewriteAof => {
            let resp = persistence::handle_rewrite(
                ctx.keyspace,
                ctx.persistence,
                ctx.aof_writer,
                shard_id,
                #[cfg(feature = "protobuf")]
                ctx.schema_registry,
            );
            reply.send(resp);
            return;
        }
        RequestKind::FlushDbAsync => {
            // Detach the old entries and, when a drop handle exists, defer
            // them to it.
            let old_entries = ctx.keyspace.flush_async();
            if let Some(ref handle) = *ctx.drop_handle {
                handle.defer_entries(old_entries);
            }
            reply.send(ShardResponse::Ok);
            return;
        }
        RequestKind::Other => {}
    }

    reply.send(response);
}
1419
/// Coarse classification of a request, captured by `process_single` before
/// the request is consumed, so it can still pick the dedicated handler
/// afterwards.
enum RequestKind {
    Snapshot,
    SerializeSnapshot,
    RewriteAof,
    FlushDbAsync,
    /// Any request whose reply is simply the `dispatch` response.
    Other,
}
1429
1430fn describe_request(req: &ShardRequest) -> RequestKind {
1431 match req {
1432 ShardRequest::Snapshot => RequestKind::Snapshot,
1433 ShardRequest::SerializeSnapshot => RequestKind::SerializeSnapshot,
1434 ShardRequest::RewriteAof => RequestKind::RewriteAof,
1435 ShardRequest::FlushDbAsync => RequestKind::FlushDbAsync,
1436 _ => RequestKind::Other,
1437 }
1438}
1439
1440fn incr_result(result: Result<i64, IncrError>) -> ShardResponse {
1442 match result {
1443 Ok(val) => ShardResponse::Integer(val),
1444 Err(IncrError::WrongType) => ShardResponse::WrongType,
1445 Err(IncrError::OutOfMemory) => ShardResponse::OutOfMemory,
1446 Err(e) => ShardResponse::Err(e.to_string()),
1447 }
1448}
1449
1450fn write_result_len(result: Result<usize, WriteError>) -> ShardResponse {
1452 match result {
1453 Ok(len) => ShardResponse::Len(len),
1454 Err(WriteError::WrongType) => ShardResponse::WrongType,
1455 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1456 }
1457}
1458
1459fn store_set_response(result: Result<(usize, Vec<String>), WriteError>) -> ShardResponse {
1460 match result {
1461 Ok((count, members)) => ShardResponse::SetStoreResult { count, members },
1462 Err(WriteError::WrongType) => ShardResponse::WrongType,
1463 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1464 }
1465}
1466
1467fn dispatch(
1471 ks: &mut Keyspace,
1472 req: &mut ShardRequest,
1473 #[cfg(feature = "protobuf")] schema_registry: &Option<crate::schema::SharedSchemaRegistry>,
1474) -> ShardResponse {
1475 match req {
1476 ShardRequest::Get { key } => match ks.get_string(key) {
1477 Ok(val) => ShardResponse::Value(val.map(Value::String)),
1478 Err(_) => ShardResponse::WrongType,
1479 },
1480 ShardRequest::Set {
1481 key,
1482 value,
1483 expire,
1484 nx,
1485 xx,
1486 } => match ks.set(key.clone(), value.clone(), *expire, *nx, *xx) {
1487 SetResult::Ok => ShardResponse::Ok,
1488 SetResult::Blocked => ShardResponse::Value(None),
1489 SetResult::OutOfMemory => ShardResponse::OutOfMemory,
1490 },
1491 ShardRequest::Incr { key } => incr_result(ks.incr(key)),
1492 ShardRequest::Decr { key } => incr_result(ks.decr(key)),
1493 ShardRequest::IncrBy { key, delta } => incr_result(ks.incr_by(key, *delta)),
1494 ShardRequest::DecrBy { key, delta } => match delta.checked_neg() {
1495 Some(neg) => incr_result(ks.incr_by(key, neg)),
1496 None => ShardResponse::Err("ERR increment or decrement would overflow".into()),
1497 },
1498 ShardRequest::IncrByFloat { key, delta } => match ks.incr_by_float(key, *delta) {
1499 Ok(val) => ShardResponse::BulkString(val),
1500 Err(IncrFloatError::WrongType) => ShardResponse::WrongType,
1501 Err(IncrFloatError::OutOfMemory) => ShardResponse::OutOfMemory,
1502 Err(e) => ShardResponse::Err(e.to_string()),
1503 },
1504 ShardRequest::Append { key, value } => write_result_len(ks.append(key, value)),
1505 ShardRequest::Strlen { key } => match ks.strlen(key) {
1506 Ok(len) => ShardResponse::Len(len),
1507 Err(_) => ShardResponse::WrongType,
1508 },
1509 ShardRequest::GetRange { key, start, end } => match ks.getrange(key, *start, *end) {
1510 Ok(data) => ShardResponse::Value(Some(Value::String(data))),
1511 Err(_) => ShardResponse::WrongType,
1512 },
1513 ShardRequest::SetRange { key, offset, value } => {
1514 write_result_len(ks.setrange(key, *offset, value))
1515 }
1516 ShardRequest::Keys { pattern } => {
1517 let keys = ks.keys(pattern);
1518 ShardResponse::StringArray(keys)
1519 }
1520 ShardRequest::Rename { key, newkey } => {
1521 use crate::keyspace::RenameError;
1522 match ks.rename(key, newkey) {
1523 Ok(()) => ShardResponse::Ok,
1524 Err(RenameError::NoSuchKey) => ShardResponse::Err("ERR no such key".into()),
1525 }
1526 }
1527 ShardRequest::Copy {
1528 source,
1529 destination,
1530 replace,
1531 } => {
1532 use crate::keyspace::CopyError;
1533 match ks.copy(source, destination, *replace) {
1534 Ok(copied) => ShardResponse::Bool(copied),
1535 Err(CopyError::NoSuchKey) => ShardResponse::Err("ERR no such key".into()),
1536 Err(CopyError::OutOfMemory) => ShardResponse::OutOfMemory,
1537 }
1538 }
1539 ShardRequest::ObjectEncoding { key } => ShardResponse::EncodingName(ks.encoding(key)),
1540 ShardRequest::Del { key } => ShardResponse::Bool(ks.del(key)),
1541 ShardRequest::Unlink { key } => ShardResponse::Bool(ks.unlink(key)),
1542 ShardRequest::Exists { key } => ShardResponse::Bool(ks.exists(key)),
1543 ShardRequest::RandomKey => match ks.random_key() {
1544 Some(k) => ShardResponse::StringArray(vec![k]),
1545 None => ShardResponse::StringArray(vec![]),
1546 },
1547 ShardRequest::Touch { key } => ShardResponse::Bool(ks.touch(key)),
1548 ShardRequest::Sort {
1549 key,
1550 desc,
1551 alpha,
1552 limit,
1553 } => match ks.sort(key, *desc, *alpha, *limit) {
1554 Ok(items) => ShardResponse::Array(items),
1555 Err(_) => ShardResponse::WrongType,
1556 },
1557 ShardRequest::Expire { key, seconds } => ShardResponse::Bool(ks.expire(key, *seconds)),
1558 ShardRequest::Ttl { key } => ShardResponse::Ttl(ks.ttl(key)),
1559 ShardRequest::Persist { key } => ShardResponse::Bool(ks.persist(key)),
1560 ShardRequest::Pttl { key } => ShardResponse::Ttl(ks.pttl(key)),
1561 ShardRequest::Pexpire { key, milliseconds } => {
1562 ShardResponse::Bool(ks.pexpire(key, *milliseconds))
1563 }
1564 ShardRequest::LPush { key, values } => write_result_len(ks.lpush(key, values)),
1565 ShardRequest::RPush { key, values } => write_result_len(ks.rpush(key, values)),
1566 ShardRequest::LPop { key } => match ks.lpop(key) {
1567 Ok(val) => ShardResponse::Value(val.map(Value::String)),
1568 Err(_) => ShardResponse::WrongType,
1569 },
1570 ShardRequest::RPop { key } => match ks.rpop(key) {
1571 Ok(val) => ShardResponse::Value(val.map(Value::String)),
1572 Err(_) => ShardResponse::WrongType,
1573 },
1574 ShardRequest::LRange { key, start, stop } => match ks.lrange(key, *start, *stop) {
1575 Ok(items) => ShardResponse::Array(items),
1576 Err(_) => ShardResponse::WrongType,
1577 },
1578 ShardRequest::LLen { key } => match ks.llen(key) {
1579 Ok(len) => ShardResponse::Len(len),
1580 Err(_) => ShardResponse::WrongType,
1581 },
1582 ShardRequest::LIndex { key, index } => match ks.lindex(key, *index) {
1583 Ok(val) => ShardResponse::Value(val.map(Value::String)),
1584 Err(_) => ShardResponse::WrongType,
1585 },
1586 ShardRequest::LSet { key, index, value } => match ks.lset(key, *index, value.clone()) {
1587 Ok(()) => ShardResponse::Ok,
1588 Err(e) => match e {
1589 LsetError::WrongType => ShardResponse::WrongType,
1590 LsetError::NoSuchKey => ShardResponse::Err("ERR no such key".into()),
1591 LsetError::IndexOutOfRange => ShardResponse::Err("ERR index out of range".into()),
1592 },
1593 },
1594 ShardRequest::LTrim { key, start, stop } => match ks.ltrim(key, *start, *stop) {
1595 Ok(()) => ShardResponse::Ok,
1596 Err(_) => ShardResponse::WrongType,
1597 },
1598 ShardRequest::LInsert {
1599 key,
1600 before,
1601 pivot,
1602 value,
1603 } => match ks.linsert(key, *before, pivot, value.clone()) {
1604 Ok(n) => ShardResponse::Integer(n),
1605 Err(WriteError::WrongType) => ShardResponse::WrongType,
1606 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1607 },
1608 ShardRequest::LRem { key, count, value } => match ks.lrem(key, *count, value) {
1609 Ok(n) => ShardResponse::Len(n),
1610 Err(_) => ShardResponse::WrongType,
1611 },
1612 ShardRequest::LPos {
1613 key,
1614 element,
1615 rank,
1616 count,
1617 maxlen,
1618 } => match ks.lpos(key, element, *rank, *count, *maxlen) {
1619 Ok(positions) => ShardResponse::IntegerArray(positions),
1620 Err(_) => ShardResponse::WrongType,
1621 },
1622 ShardRequest::Type { key } => ShardResponse::TypeName(ks.value_type(key)),
1623 ShardRequest::ZAdd {
1624 key,
1625 members,
1626 nx,
1627 xx,
1628 gt,
1629 lt,
1630 ch,
1631 } => {
1632 let flags = ZAddFlags {
1633 nx: *nx,
1634 xx: *xx,
1635 gt: *gt,
1636 lt: *lt,
1637 ch: *ch,
1638 };
1639 match ks.zadd(key, members, &flags) {
1640 Ok(result) => ShardResponse::ZAddLen {
1641 count: result.count,
1642 applied: result.applied,
1643 },
1644 Err(WriteError::WrongType) => ShardResponse::WrongType,
1645 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1646 }
1647 }
1648 ShardRequest::ZRem { key, members } => match ks.zrem(key, members) {
1649 Ok(removed) => ShardResponse::ZRemLen {
1650 count: removed.len(),
1651 removed,
1652 },
1653 Err(_) => ShardResponse::WrongType,
1654 },
1655 ShardRequest::ZScore { key, member } => match ks.zscore(key, member) {
1656 Ok(score) => ShardResponse::Score(score),
1657 Err(_) => ShardResponse::WrongType,
1658 },
1659 ShardRequest::ZRank { key, member } => match ks.zrank(key, member) {
1660 Ok(rank) => ShardResponse::Rank(rank),
1661 Err(_) => ShardResponse::WrongType,
1662 },
1663 ShardRequest::ZCard { key } => match ks.zcard(key) {
1664 Ok(len) => ShardResponse::Len(len),
1665 Err(_) => ShardResponse::WrongType,
1666 },
1667 ShardRequest::ZRevRank { key, member } => match ks.zrevrank(key, member) {
1668 Ok(rank) => ShardResponse::Rank(rank),
1669 Err(_) => ShardResponse::WrongType,
1670 },
1671 ShardRequest::ZRange {
1672 key, start, stop, ..
1673 } => match ks.zrange(key, *start, *stop) {
1674 Ok(items) => ShardResponse::ScoredArray(items),
1675 Err(_) => ShardResponse::WrongType,
1676 },
1677 ShardRequest::ZRevRange {
1678 key, start, stop, ..
1679 } => match ks.zrevrange(key, *start, *stop) {
1680 Ok(items) => ShardResponse::ScoredArray(items),
1681 Err(_) => ShardResponse::WrongType,
1682 },
1683 ShardRequest::ZCount { key, min, max } => match ks.zcount(key, *min, *max) {
1684 Ok(count) => ShardResponse::Len(count),
1685 Err(_) => ShardResponse::WrongType,
1686 },
1687 ShardRequest::ZIncrBy {
1688 key,
1689 increment,
1690 member,
1691 } => match ks.zincrby(key, *increment, member) {
1692 Ok(new_score) => ShardResponse::ZIncrByResult {
1693 new_score,
1694 member: member.clone(),
1695 },
1696 Err(WriteError::WrongType) => ShardResponse::WrongType,
1697 Err(WriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1698 },
1699 ShardRequest::ZRangeByScore {
1700 key,
1701 min,
1702 max,
1703 offset,
1704 count,
1705 } => match ks.zrangebyscore(key, *min, *max, *offset, *count) {
1706 Ok(items) => ShardResponse::ScoredArray(items),
1707 Err(_) => ShardResponse::WrongType,
1708 },
1709 ShardRequest::ZRevRangeByScore {
1710 key,
1711 min,
1712 max,
1713 offset,
1714 count,
1715 } => match ks.zrevrangebyscore(key, *min, *max, *offset, *count) {
1716 Ok(items) => ShardResponse::ScoredArray(items),
1717 Err(_) => ShardResponse::WrongType,
1718 },
1719 ShardRequest::ZPopMin { key, count } => match ks.zpopmin(key, *count) {
1720 Ok(items) => ShardResponse::ZPopResult(items),
1721 Err(_) => ShardResponse::WrongType,
1722 },
1723 ShardRequest::ZPopMax { key, count } => match ks.zpopmax(key, *count) {
1724 Ok(items) => ShardResponse::ZPopResult(items),
1725 Err(_) => ShardResponse::WrongType,
1726 },
1727 ShardRequest::DbSize => ShardResponse::KeyCount(ks.len()),
1728 ShardRequest::Stats => ShardResponse::Stats(ks.stats()),
1729 ShardRequest::KeyVersion { ref key } => ShardResponse::Version(ks.key_version(key)),
1730 ShardRequest::FlushDb => {
1731 ks.clear();
1732 ShardResponse::Ok
1733 }
1734 ShardRequest::Scan {
1735 cursor,
1736 count,
1737 pattern,
1738 } => {
1739 let (next_cursor, keys) = ks.scan_keys(*cursor, *count, pattern.as_deref());
1740 ShardResponse::Scan {
1741 cursor: next_cursor,
1742 keys,
1743 }
1744 }
1745 ShardRequest::HSet { key, fields } => write_result_len(ks.hset(key, fields)),
1746 ShardRequest::HGet { key, field } => match ks.hget(key, field) {
1747 Ok(val) => ShardResponse::Value(val.map(Value::String)),
1748 Err(_) => ShardResponse::WrongType,
1749 },
1750 ShardRequest::HGetAll { key } => match ks.hgetall(key) {
1751 Ok(fields) => ShardResponse::HashFields(fields),
1752 Err(_) => ShardResponse::WrongType,
1753 },
1754 ShardRequest::HDel { key, fields } => match ks.hdel(key, fields) {
1755 Ok(removed) => ShardResponse::HDelLen {
1756 count: removed.len(),
1757 removed,
1758 },
1759 Err(_) => ShardResponse::WrongType,
1760 },
1761 ShardRequest::HExists { key, field } => match ks.hexists(key, field) {
1762 Ok(exists) => ShardResponse::Bool(exists),
1763 Err(_) => ShardResponse::WrongType,
1764 },
1765 ShardRequest::HLen { key } => match ks.hlen(key) {
1766 Ok(len) => ShardResponse::Len(len),
1767 Err(_) => ShardResponse::WrongType,
1768 },
1769 ShardRequest::HIncrBy { key, field, delta } => incr_result(ks.hincrby(key, field, *delta)),
1770 ShardRequest::HKeys { key } => match ks.hkeys(key) {
1771 Ok(keys) => ShardResponse::StringArray(keys),
1772 Err(_) => ShardResponse::WrongType,
1773 },
1774 ShardRequest::HVals { key } => match ks.hvals(key) {
1775 Ok(vals) => ShardResponse::Array(vals),
1776 Err(_) => ShardResponse::WrongType,
1777 },
1778 ShardRequest::HMGet { key, fields } => match ks.hmget(key, fields) {
1779 Ok(vals) => ShardResponse::OptionalArray(vals),
1780 Err(_) => ShardResponse::WrongType,
1781 },
1782 ShardRequest::SAdd { key, members } => write_result_len(ks.sadd(key, members)),
1783 ShardRequest::SRem { key, members } => match ks.srem(key, members) {
1784 Ok(count) => ShardResponse::Len(count),
1785 Err(_) => ShardResponse::WrongType,
1786 },
1787 ShardRequest::SMembers { key } => match ks.smembers(key) {
1788 Ok(members) => ShardResponse::StringArray(members),
1789 Err(_) => ShardResponse::WrongType,
1790 },
1791 ShardRequest::SIsMember { key, member } => match ks.sismember(key, member) {
1792 Ok(exists) => ShardResponse::Bool(exists),
1793 Err(_) => ShardResponse::WrongType,
1794 },
1795 ShardRequest::SCard { key } => match ks.scard(key) {
1796 Ok(count) => ShardResponse::Len(count),
1797 Err(_) => ShardResponse::WrongType,
1798 },
1799 ShardRequest::SUnion { keys } => match ks.sunion(keys) {
1800 Ok(members) => ShardResponse::StringArray(members),
1801 Err(_) => ShardResponse::WrongType,
1802 },
1803 ShardRequest::SInter { keys } => match ks.sinter(keys) {
1804 Ok(members) => ShardResponse::StringArray(members),
1805 Err(_) => ShardResponse::WrongType,
1806 },
1807 ShardRequest::SDiff { keys } => match ks.sdiff(keys) {
1808 Ok(members) => ShardResponse::StringArray(members),
1809 Err(_) => ShardResponse::WrongType,
1810 },
1811 ShardRequest::SUnionStore { dest, keys } => store_set_response(ks.sunionstore(dest, keys)),
1812 ShardRequest::SInterStore { dest, keys } => store_set_response(ks.sinterstore(dest, keys)),
1813 ShardRequest::SDiffStore { dest, keys } => store_set_response(ks.sdiffstore(dest, keys)),
1814 ShardRequest::SRandMember { key, count } => match ks.srandmember(key, *count) {
1815 Ok(members) => ShardResponse::StringArray(members),
1816 Err(_) => ShardResponse::WrongType,
1817 },
1818 ShardRequest::SPop { key, count } => match ks.spop(key, *count) {
1819 Ok(members) => ShardResponse::StringArray(members),
1820 Err(_) => ShardResponse::WrongType,
1821 },
1822 ShardRequest::SMisMember { key, members } => match ks.smismember(key, members) {
1823 Ok(results) => ShardResponse::BoolArray(results),
1824 Err(_) => ShardResponse::WrongType,
1825 },
1826 ShardRequest::SScan {
1827 key,
1828 cursor,
1829 count,
1830 pattern,
1831 } => match ks.scan_set(key, *cursor, *count, pattern.as_deref()) {
1832 Ok((next, members)) => {
1833 let items = members.into_iter().map(Bytes::from).collect();
1834 ShardResponse::CollectionScan {
1835 cursor: next,
1836 items,
1837 }
1838 }
1839 Err(_) => ShardResponse::WrongType,
1840 },
1841 ShardRequest::HScan {
1842 key,
1843 cursor,
1844 count,
1845 pattern,
1846 } => match ks.scan_hash(key, *cursor, *count, pattern.as_deref()) {
1847 Ok((next, fields)) => {
1848 let mut items = Vec::with_capacity(fields.len() * 2);
1849 for (field, value) in fields {
1850 items.push(Bytes::from(field));
1851 items.push(value);
1852 }
1853 ShardResponse::CollectionScan {
1854 cursor: next,
1855 items,
1856 }
1857 }
1858 Err(_) => ShardResponse::WrongType,
1859 },
1860 ShardRequest::ZScan {
1861 key,
1862 cursor,
1863 count,
1864 pattern,
1865 } => match ks.scan_sorted_set(key, *cursor, *count, pattern.as_deref()) {
1866 Ok((next, members)) => {
1867 let mut items = Vec::with_capacity(members.len() * 2);
1868 for (member, score) in members {
1869 items.push(Bytes::from(member));
1870 items.push(Bytes::from(score.to_string()));
1871 }
1872 ShardResponse::CollectionScan {
1873 cursor: next,
1874 items,
1875 }
1876 }
1877 Err(_) => ShardResponse::WrongType,
1878 },
1879 ShardRequest::CountKeysInSlot { slot } => {
1880 ShardResponse::KeyCount(ks.count_keys_in_slot(*slot))
1881 }
1882 ShardRequest::GetKeysInSlot { slot, count } => {
1883 ShardResponse::StringArray(ks.get_keys_in_slot(*slot, *count))
1884 }
1885 ShardRequest::DumpKey { key } => match ks.dump(key) {
1886 Some((value, ttl_ms)) => {
1887 let snap = persistence::value_to_snap(value);
1888 match snapshot::serialize_snap_value(&snap) {
1889 Ok(data) => ShardResponse::KeyDump { data, ttl_ms },
1890 Err(e) => ShardResponse::Err(format!("ERR snapshot serialization failed: {e}")),
1891 }
1892 }
1893 None => ShardResponse::Value(None),
1894 },
1895 ShardRequest::RestoreKey {
1896 key,
1897 ttl_ms,
1898 data,
1899 replace,
1900 } => match snapshot::deserialize_snap_value(data) {
1901 Ok(snap) => {
1902 let exists = ks.exists(key);
1903 if exists && !*replace {
1904 ShardResponse::Err("ERR Target key name already exists".into())
1905 } else {
1906 let value = persistence::snap_to_value(snap);
1907 let ttl = if *ttl_ms == 0 {
1908 None
1909 } else {
1910 Some(Duration::from_millis(*ttl_ms))
1911 };
1912 ks.restore(key.clone(), value, ttl);
1913 ShardResponse::Ok
1914 }
1915 }
1916 Err(e) => ShardResponse::Err(format!("ERR DUMP payload corrupted: {e}")),
1917 },
1918 #[cfg(feature = "vector")]
1919 ShardRequest::VAdd {
1920 key,
1921 element,
1922 vector,
1923 metric,
1924 quantization,
1925 connectivity,
1926 expansion_add,
1927 } => {
1928 use crate::types::vector::{DistanceMetric, QuantizationType};
1929 match ks.vadd(
1930 key,
1931 element.clone(),
1932 vector.clone(),
1933 DistanceMetric::from_u8(*metric),
1934 QuantizationType::from_u8(*quantization),
1935 *connectivity as usize,
1936 *expansion_add as usize,
1937 ) {
1938 Ok(result) => ShardResponse::VAddResult {
1939 element: result.element,
1940 vector: result.vector,
1941 added: result.added,
1942 },
1943 Err(crate::keyspace::VectorWriteError::WrongType) => ShardResponse::WrongType,
1944 Err(crate::keyspace::VectorWriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1945 Err(crate::keyspace::VectorWriteError::IndexError(e))
1946 | Err(crate::keyspace::VectorWriteError::PartialBatch { message: e, .. }) => {
1947 ShardResponse::Err(format!("ERR vector index: {e}"))
1948 }
1949 }
1950 }
1951 #[cfg(feature = "vector")]
1952 ShardRequest::VAddBatch {
1953 key,
1954 entries,
1955 metric,
1956 quantization,
1957 connectivity,
1958 expansion_add,
1959 ..
1960 } => {
1961 use crate::types::vector::{DistanceMetric, QuantizationType};
1962 let owned_entries = std::mem::take(entries);
1967 match ks.vadd_batch(
1968 key,
1969 owned_entries,
1970 DistanceMetric::from_u8(*metric),
1971 QuantizationType::from_u8(*quantization),
1972 *connectivity as usize,
1973 *expansion_add as usize,
1974 ) {
1975 Ok(result) => ShardResponse::VAddBatchResult {
1976 added_count: result.added_count,
1977 applied: result.applied,
1978 },
1979 Err(crate::keyspace::VectorWriteError::WrongType) => ShardResponse::WrongType,
1980 Err(crate::keyspace::VectorWriteError::OutOfMemory) => ShardResponse::OutOfMemory,
1981 Err(crate::keyspace::VectorWriteError::IndexError(e)) => {
1982 ShardResponse::Err(format!("ERR vector index: {e}"))
1983 }
1984 Err(crate::keyspace::VectorWriteError::PartialBatch { applied, .. }) => {
1985 ShardResponse::VAddBatchResult {
1987 added_count: applied.len(),
1988 applied,
1989 }
1990 }
1991 }
1992 }
1993 #[cfg(feature = "vector")]
1994 ShardRequest::VSim {
1995 key,
1996 query,
1997 count,
1998 ef_search,
1999 } => match ks.vsim(key, query, *count, *ef_search) {
2000 Ok(results) => ShardResponse::VSimResult(
2001 results
2002 .into_iter()
2003 .map(|r| (r.element, r.distance))
2004 .collect(),
2005 ),
2006 Err(_) => ShardResponse::WrongType,
2007 },
2008 #[cfg(feature = "vector")]
2009 ShardRequest::VRem { key, element } => match ks.vrem(key, element) {
2010 Ok(removed) => ShardResponse::Bool(removed),
2011 Err(_) => ShardResponse::WrongType,
2012 },
2013 #[cfg(feature = "vector")]
2014 ShardRequest::VGet { key, element } => match ks.vget(key, element) {
2015 Ok(data) => ShardResponse::VectorData(data),
2016 Err(_) => ShardResponse::WrongType,
2017 },
2018 #[cfg(feature = "vector")]
2019 ShardRequest::VCard { key } => match ks.vcard(key) {
2020 Ok(count) => ShardResponse::Integer(count as i64),
2021 Err(_) => ShardResponse::WrongType,
2022 },
2023 #[cfg(feature = "vector")]
2024 ShardRequest::VDim { key } => match ks.vdim(key) {
2025 Ok(dim) => ShardResponse::Integer(dim as i64),
2026 Err(_) => ShardResponse::WrongType,
2027 },
2028 #[cfg(feature = "vector")]
2029 ShardRequest::VInfo { key } => match ks.vinfo(key) {
2030 Ok(Some(info)) => {
2031 let fields = vec![
2032 ("dim".to_owned(), info.dim.to_string()),
2033 ("count".to_owned(), info.count.to_string()),
2034 ("metric".to_owned(), info.metric.to_string()),
2035 ("quantization".to_owned(), info.quantization.to_string()),
2036 ("connectivity".to_owned(), info.connectivity.to_string()),
2037 ("expansion_add".to_owned(), info.expansion_add.to_string()),
2038 ];
2039 ShardResponse::VectorInfo(Some(fields))
2040 }
2041 Ok(None) => ShardResponse::VectorInfo(None),
2042 Err(_) => ShardResponse::WrongType,
2043 },
2044 #[cfg(feature = "protobuf")]
2045 ShardRequest::ProtoSet {
2046 key,
2047 type_name,
2048 data,
2049 expire,
2050 nx,
2051 xx,
2052 } => {
2053 if *nx && ks.exists(key) {
2054 return ShardResponse::Value(None);
2055 }
2056 if *xx && !ks.exists(key) {
2057 return ShardResponse::Value(None);
2058 }
2059 match ks.proto_set(key.clone(), type_name.clone(), data.clone(), *expire) {
2060 SetResult::Ok | SetResult::Blocked => ShardResponse::Ok,
2061 SetResult::OutOfMemory => ShardResponse::OutOfMemory,
2062 }
2063 }
2064 #[cfg(feature = "protobuf")]
2065 ShardRequest::ProtoGet { key } => match ks.proto_get(key) {
2066 Ok(val) => ShardResponse::ProtoValue(val),
2067 Err(_) => ShardResponse::WrongType,
2068 },
2069 #[cfg(feature = "protobuf")]
2070 ShardRequest::ProtoType { key } => match ks.proto_type(key) {
2071 Ok(name) => ShardResponse::ProtoTypeName(name),
2072 Err(_) => ShardResponse::WrongType,
2073 },
2074 #[cfg(feature = "protobuf")]
2077 ShardRequest::ProtoRegisterAof { .. } => ShardResponse::Ok,
2078 #[cfg(feature = "protobuf")]
2079 ShardRequest::ProtoSetField {
2080 key,
2081 field_path,
2082 value,
2083 } => dispatch_proto_field_op(ks, schema_registry, key, |reg, type_name, data, ttl| {
2084 let new_data = reg.set_field(type_name, data, field_path, value)?;
2085 Ok(ShardResponse::ProtoFieldUpdated {
2086 type_name: type_name.to_owned(),
2087 data: new_data,
2088 expire: ttl,
2089 })
2090 }),
2091 #[cfg(feature = "protobuf")]
2092 ShardRequest::ProtoDelField { key, field_path } => {
2093 dispatch_proto_field_op(ks, schema_registry, key, |reg, type_name, data, ttl| {
2094 let new_data = reg.clear_field(type_name, data, field_path)?;
2095 Ok(ShardResponse::ProtoFieldUpdated {
2096 type_name: type_name.to_owned(),
2097 data: new_data,
2098 expire: ttl,
2099 })
2100 })
2101 }
2102 ShardRequest::Snapshot
2104 | ShardRequest::SerializeSnapshot
2105 | ShardRequest::RewriteAof
2106 | ShardRequest::FlushDbAsync
2107 | ShardRequest::BLPop { .. }
2108 | ShardRequest::BRPop { .. } => ShardResponse::Ok,
2109 }
2110}
2111
#[cfg(feature = "protobuf")]
/// Shared read-modify-write driver for the protobuf field commands.
///
/// Loads the protobuf value stored at `key`, hands its type name, encoded
/// bytes and remaining TTL to `mutate` together with the schema registry, and
/// — when `mutate` yields `ShardResponse::ProtoFieldUpdated` — writes the new
/// encoding back under the same key with the expiry carried in that response.
///
/// Early returns, all taken before `mutate` runs:
/// - `Err("protobuf support is not enabled")` when `schema_registry` is `None`
/// - `Value(None)` when `proto_get` reports no value for `key`
/// - `WrongType` when `proto_get` fails
/// - `Err("schema registry lock poisoned")` when `registry.read()` fails
///
/// A schema error from `mutate` is returned as `Err` carrying the error's
/// display text. If the write-back is refused with `SetResult::OutOfMemory`
/// the function returns `ShardResponse::OutOfMemory` instead of the update
/// response, so callers never see a "field updated" reply for a value that
/// was not stored.
fn dispatch_proto_field_op<F>(
    ks: &mut Keyspace,
    schema_registry: &Option<crate::schema::SharedSchemaRegistry>,
    key: &str,
    mutate: F,
) -> ShardResponse
where
    F: FnOnce(
        &crate::schema::SchemaRegistry,
        &str,
        &[u8],
        Option<Duration>,
    ) -> Result<ShardResponse, crate::schema::SchemaError>,
{
    let registry = match schema_registry {
        Some(r) => r,
        None => return ShardResponse::Err("protobuf support is not enabled".into()),
    };

    let (type_name, data, remaining_ttl) = match ks.proto_get(key) {
        Ok(Some(tuple)) => tuple,
        Ok(None) => return ShardResponse::Value(None),
        Err(_) => return ShardResponse::WrongType,
    };

    let reg = match registry.read() {
        Ok(r) => r,
        Err(_) => return ShardResponse::Err("schema registry lock poisoned".into()),
    };

    let resp = match mutate(&reg, &type_name, &data, remaining_ttl) {
        Ok(r) => r,
        Err(e) => return ShardResponse::Err(e.to_string()),
    };

    // Persist the mutated encoding. Only an update response carries data to
    // store; any other response from `mutate` is passed through untouched.
    if let ShardResponse::ProtoFieldUpdated {
        ref type_name,
        ref data,
        expire,
    } = resp
    {
        let stored = ks.proto_set(key.to_owned(), type_name.clone(), data.clone(), expire);
        // Mirror the ProtoSet arm of `dispatch`: a refused write must surface
        // as OutOfMemory rather than be reported as a successful update.
        if matches!(stored, SetResult::OutOfMemory) {
            return ShardResponse::OutOfMemory;
        }
    }

    resp
}
2165
2166#[cfg(test)]
2167mod tests {
2168 use super::*;
2169
2170 fn test_dispatch(ks: &mut Keyspace, mut req: ShardRequest) -> ShardResponse {
2172 dispatch(
2173 ks,
2174 &mut req,
2175 #[cfg(feature = "protobuf")]
2176 &None,
2177 )
2178 }
2179
2180 #[test]
2181 fn dispatch_set_and_get() {
2182 let mut ks = Keyspace::new();
2183
2184 let resp = test_dispatch(
2185 &mut ks,
2186 ShardRequest::Set {
2187 key: "k".into(),
2188 value: Bytes::from("v"),
2189 expire: None,
2190 nx: false,
2191 xx: false,
2192 },
2193 );
2194 assert!(matches!(resp, ShardResponse::Ok));
2195
2196 let resp = test_dispatch(&mut ks, ShardRequest::Get { key: "k".into() });
2197 match resp {
2198 ShardResponse::Value(Some(Value::String(data))) => {
2199 assert_eq!(data, Bytes::from("v"));
2200 }
2201 other => panic!("expected Value(Some(String)), got {other:?}"),
2202 }
2203 }
2204
2205 #[test]
2206 fn dispatch_get_missing() {
2207 let mut ks = Keyspace::new();
2208 let resp = test_dispatch(&mut ks, ShardRequest::Get { key: "nope".into() });
2209 assert!(matches!(resp, ShardResponse::Value(None)));
2210 }
2211
2212 #[test]
2213 fn dispatch_del() {
2214 let mut ks = Keyspace::new();
2215 ks.set("key".into(), Bytes::from("val"), None, false, false);
2216
2217 let resp = test_dispatch(&mut ks, ShardRequest::Del { key: "key".into() });
2218 assert!(matches!(resp, ShardResponse::Bool(true)));
2219
2220 let resp = test_dispatch(&mut ks, ShardRequest::Del { key: "key".into() });
2221 assert!(matches!(resp, ShardResponse::Bool(false)));
2222 }
2223
2224 #[test]
2225 fn dispatch_exists() {
2226 let mut ks = Keyspace::new();
2227 ks.set("yes".into(), Bytes::from("here"), None, false, false);
2228
2229 let resp = test_dispatch(&mut ks, ShardRequest::Exists { key: "yes".into() });
2230 assert!(matches!(resp, ShardResponse::Bool(true)));
2231
2232 let resp = test_dispatch(&mut ks, ShardRequest::Exists { key: "no".into() });
2233 assert!(matches!(resp, ShardResponse::Bool(false)));
2234 }
2235
2236 #[test]
2237 fn dispatch_expire_and_ttl() {
2238 let mut ks = Keyspace::new();
2239 ks.set("key".into(), Bytes::from("val"), None, false, false);
2240
2241 let resp = test_dispatch(
2242 &mut ks,
2243 ShardRequest::Expire {
2244 key: "key".into(),
2245 seconds: 60,
2246 },
2247 );
2248 assert!(matches!(resp, ShardResponse::Bool(true)));
2249
2250 let resp = test_dispatch(&mut ks, ShardRequest::Ttl { key: "key".into() });
2251 match resp {
2252 ShardResponse::Ttl(TtlResult::Seconds(s)) => assert!((58..=60).contains(&s)),
2253 other => panic!("expected Ttl(Seconds), got {other:?}"),
2254 }
2255 }
2256
2257 #[test]
2258 fn dispatch_ttl_missing() {
2259 let mut ks = Keyspace::new();
2260 let resp = test_dispatch(&mut ks, ShardRequest::Ttl { key: "gone".into() });
2261 assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
2262 }
2263
2264 #[test]
2265 fn dispatch_incr_new_key() {
2266 let mut ks = Keyspace::new();
2267 let resp = test_dispatch(&mut ks, ShardRequest::Incr { key: "c".into() });
2268 assert!(matches!(resp, ShardResponse::Integer(1)));
2269 }
2270
2271 #[test]
2272 fn dispatch_decr_existing() {
2273 let mut ks = Keyspace::new();
2274 ks.set("n".into(), Bytes::from("10"), None, false, false);
2275 let resp = test_dispatch(&mut ks, ShardRequest::Decr { key: "n".into() });
2276 assert!(matches!(resp, ShardResponse::Integer(9)));
2277 }
2278
2279 #[test]
2280 fn dispatch_incr_non_integer() {
2281 let mut ks = Keyspace::new();
2282 ks.set("s".into(), Bytes::from("hello"), None, false, false);
2283 let resp = test_dispatch(&mut ks, ShardRequest::Incr { key: "s".into() });
2284 assert!(matches!(resp, ShardResponse::Err(_)));
2285 }
2286
2287 #[test]
2288 fn dispatch_incrby() {
2289 let mut ks = Keyspace::new();
2290 ks.set("n".into(), Bytes::from("10"), None, false, false);
2291 let resp = test_dispatch(
2292 &mut ks,
2293 ShardRequest::IncrBy {
2294 key: "n".into(),
2295 delta: 5,
2296 },
2297 );
2298 assert!(matches!(resp, ShardResponse::Integer(15)));
2299 }
2300
2301 #[test]
2302 fn dispatch_decrby() {
2303 let mut ks = Keyspace::new();
2304 ks.set("n".into(), Bytes::from("10"), None, false, false);
2305 let resp = test_dispatch(
2306 &mut ks,
2307 ShardRequest::DecrBy {
2308 key: "n".into(),
2309 delta: 3,
2310 },
2311 );
2312 assert!(matches!(resp, ShardResponse::Integer(7)));
2313 }
2314
2315 #[test]
2316 fn dispatch_incrby_new_key() {
2317 let mut ks = Keyspace::new();
2318 let resp = test_dispatch(
2319 &mut ks,
2320 ShardRequest::IncrBy {
2321 key: "new".into(),
2322 delta: 42,
2323 },
2324 );
2325 assert!(matches!(resp, ShardResponse::Integer(42)));
2326 }
2327
2328 #[test]
2329 fn dispatch_incrbyfloat() {
2330 let mut ks = Keyspace::new();
2331 ks.set("n".into(), Bytes::from("10.5"), None, false, false);
2332 let resp = test_dispatch(
2333 &mut ks,
2334 ShardRequest::IncrByFloat {
2335 key: "n".into(),
2336 delta: 2.3,
2337 },
2338 );
2339 match resp {
2340 ShardResponse::BulkString(val) => {
2341 let f: f64 = val.parse().unwrap();
2342 assert!((f - 12.8).abs() < 0.001);
2343 }
2344 other => panic!("expected BulkString, got {other:?}"),
2345 }
2346 }
2347
2348 #[test]
2349 fn dispatch_append() {
2350 let mut ks = Keyspace::new();
2351 ks.set("k".into(), Bytes::from("hello"), None, false, false);
2352 let resp = test_dispatch(
2353 &mut ks,
2354 ShardRequest::Append {
2355 key: "k".into(),
2356 value: Bytes::from(" world"),
2357 },
2358 );
2359 assert!(matches!(resp, ShardResponse::Len(11)));
2360 }
2361
2362 #[test]
2363 fn dispatch_strlen() {
2364 let mut ks = Keyspace::new();
2365 ks.set("k".into(), Bytes::from("hello"), None, false, false);
2366 let resp = test_dispatch(&mut ks, ShardRequest::Strlen { key: "k".into() });
2367 assert!(matches!(resp, ShardResponse::Len(5)));
2368 }
2369
2370 #[test]
2371 fn dispatch_strlen_missing() {
2372 let mut ks = Keyspace::new();
2373 let resp = test_dispatch(&mut ks, ShardRequest::Strlen { key: "nope".into() });
2374 assert!(matches!(resp, ShardResponse::Len(0)));
2375 }
2376
2377 #[test]
2378 fn dispatch_incrbyfloat_new_key() {
2379 let mut ks = Keyspace::new();
2380 let resp = test_dispatch(
2381 &mut ks,
2382 ShardRequest::IncrByFloat {
2383 key: "new".into(),
2384 delta: 2.72,
2385 },
2386 );
2387 match resp {
2388 ShardResponse::BulkString(val) => {
2389 let f: f64 = val.parse().unwrap();
2390 assert!((f - 2.72).abs() < 0.001);
2391 }
2392 other => panic!("expected BulkString, got {other:?}"),
2393 }
2394 }
2395
2396 #[test]
2397 fn dispatch_persist_removes_ttl() {
2398 let mut ks = Keyspace::new();
2399 ks.set(
2400 "key".into(),
2401 Bytes::from("val"),
2402 Some(Duration::from_secs(60)),
2403 false,
2404 false,
2405 );
2406
2407 let resp = test_dispatch(&mut ks, ShardRequest::Persist { key: "key".into() });
2408 assert!(matches!(resp, ShardResponse::Bool(true)));
2409
2410 let resp = test_dispatch(&mut ks, ShardRequest::Ttl { key: "key".into() });
2411 assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NoExpiry)));
2412 }
2413
2414 #[test]
2415 fn dispatch_persist_missing_key() {
2416 let mut ks = Keyspace::new();
2417 let resp = test_dispatch(&mut ks, ShardRequest::Persist { key: "nope".into() });
2418 assert!(matches!(resp, ShardResponse::Bool(false)));
2419 }
2420
2421 #[test]
2422 fn dispatch_pttl() {
2423 let mut ks = Keyspace::new();
2424 ks.set(
2425 "key".into(),
2426 Bytes::from("val"),
2427 Some(Duration::from_secs(60)),
2428 false,
2429 false,
2430 );
2431
2432 let resp = test_dispatch(&mut ks, ShardRequest::Pttl { key: "key".into() });
2433 match resp {
2434 ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
2435 assert!(ms > 59_000 && ms <= 60_000);
2436 }
2437 other => panic!("expected Ttl(Milliseconds), got {other:?}"),
2438 }
2439 }
2440
2441 #[test]
2442 fn dispatch_pttl_missing() {
2443 let mut ks = Keyspace::new();
2444 let resp = test_dispatch(&mut ks, ShardRequest::Pttl { key: "nope".into() });
2445 assert!(matches!(resp, ShardResponse::Ttl(TtlResult::NotFound)));
2446 }
2447
2448 #[test]
2449 fn dispatch_pexpire() {
2450 let mut ks = Keyspace::new();
2451 ks.set("key".into(), Bytes::from("val"), None, false, false);
2452
2453 let resp = test_dispatch(
2454 &mut ks,
2455 ShardRequest::Pexpire {
2456 key: "key".into(),
2457 milliseconds: 5000,
2458 },
2459 );
2460 assert!(matches!(resp, ShardResponse::Bool(true)));
2461
2462 let resp = test_dispatch(&mut ks, ShardRequest::Pttl { key: "key".into() });
2463 match resp {
2464 ShardResponse::Ttl(TtlResult::Milliseconds(ms)) => {
2465 assert!(ms > 4000 && ms <= 5000);
2466 }
2467 other => panic!("expected Ttl(Milliseconds), got {other:?}"),
2468 }
2469 }
2470
2471 #[test]
2472 fn dispatch_set_nx_when_key_missing() {
2473 let mut ks = Keyspace::new();
2474 let resp = test_dispatch(
2475 &mut ks,
2476 ShardRequest::Set {
2477 key: "k".into(),
2478 value: Bytes::from("v"),
2479 expire: None,
2480 nx: true,
2481 xx: false,
2482 },
2483 );
2484 assert!(matches!(resp, ShardResponse::Ok));
2485 assert!(ks.exists("k"));
2486 }
2487
2488 #[test]
2489 fn dispatch_set_nx_when_key_exists() {
2490 let mut ks = Keyspace::new();
2491 ks.set("k".into(), Bytes::from("old"), None, false, false);
2492
2493 let resp = test_dispatch(
2494 &mut ks,
2495 ShardRequest::Set {
2496 key: "k".into(),
2497 value: Bytes::from("new"),
2498 expire: None,
2499 nx: true,
2500 xx: false,
2501 },
2502 );
2503 assert!(matches!(resp, ShardResponse::Value(None)));
2505 match ks.get("k").unwrap() {
2507 Some(Value::String(data)) => assert_eq!(data, Bytes::from("old")),
2508 other => panic!("expected old value, got {other:?}"),
2509 }
2510 }
2511
2512 #[test]
2513 fn dispatch_set_xx_when_key_exists() {
2514 let mut ks = Keyspace::new();
2515 ks.set("k".into(), Bytes::from("old"), None, false, false);
2516
2517 let resp = test_dispatch(
2518 &mut ks,
2519 ShardRequest::Set {
2520 key: "k".into(),
2521 value: Bytes::from("new"),
2522 expire: None,
2523 nx: false,
2524 xx: true,
2525 },
2526 );
2527 assert!(matches!(resp, ShardResponse::Ok));
2528 match ks.get("k").unwrap() {
2529 Some(Value::String(data)) => assert_eq!(data, Bytes::from("new")),
2530 other => panic!("expected new value, got {other:?}"),
2531 }
2532 }
2533
2534 #[test]
2535 fn dispatch_set_xx_when_key_missing() {
2536 let mut ks = Keyspace::new();
2537 let resp = test_dispatch(
2538 &mut ks,
2539 ShardRequest::Set {
2540 key: "k".into(),
2541 value: Bytes::from("v"),
2542 expire: None,
2543 nx: false,
2544 xx: true,
2545 },
2546 );
2547 assert!(matches!(resp, ShardResponse::Value(None)));
2549 assert!(!ks.exists("k"));
2550 }
2551
2552 #[test]
2553 fn dispatch_flushdb_clears_all_keys() {
2554 let mut ks = Keyspace::new();
2555 ks.set("a".into(), Bytes::from("1"), None, false, false);
2556 ks.set("b".into(), Bytes::from("2"), None, false, false);
2557
2558 assert_eq!(ks.len(), 2);
2559
2560 let resp = test_dispatch(&mut ks, ShardRequest::FlushDb);
2561 assert!(matches!(resp, ShardResponse::Ok));
2562 assert_eq!(ks.len(), 0);
2563 }
2564
2565 #[test]
2566 fn dispatch_scan_returns_keys() {
2567 let mut ks = Keyspace::new();
2568 ks.set("user:1".into(), Bytes::from("a"), None, false, false);
2569 ks.set("user:2".into(), Bytes::from("b"), None, false, false);
2570 ks.set("item:1".into(), Bytes::from("c"), None, false, false);
2571
2572 let resp = test_dispatch(
2573 &mut ks,
2574 ShardRequest::Scan {
2575 cursor: 0,
2576 count: 10,
2577 pattern: None,
2578 },
2579 );
2580
2581 match resp {
2582 ShardResponse::Scan { cursor, keys } => {
2583 assert_eq!(cursor, 0); assert_eq!(keys.len(), 3);
2585 }
2586 _ => panic!("expected Scan response"),
2587 }
2588 }
2589
/// `Scan` with the pattern `user:*` returns only the two `user:` keys and
/// skips the `item:` key.
#[test]
fn dispatch_scan_with_pattern() {
    let mut keyspace = Keyspace::new();
    for (key, value) in [("user:1", "a"), ("user:2", "b"), ("item:1", "c")] {
        keyspace.set(key.into(), Bytes::from(value), None, false, false);
    }

    let request = ShardRequest::Scan {
        cursor: 0,
        count: 10,
        pattern: Some("user:*".into()),
    };
    let reply = test_dispatch(&mut keyspace, request);

    if let ShardResponse::Scan { cursor, keys } = reply {
        assert_eq!(cursor, 0);
        assert_eq!(keys.len(), 2);
        // Every surviving key must carry the filtered prefix.
        assert!(keys.iter().all(|key| key.starts_with("user:")));
    } else {
        panic!("expected Scan response");
    }
}
2617
/// `Keys` with the pattern `user:*` replies with a `StringArray` holding
/// exactly the matching key names.
#[test]
fn dispatch_keys() {
    let mut keyspace = Keyspace::new();
    for (key, value) in [("user:1", "a"), ("user:2", "b"), ("item:1", "c")] {
        keyspace.set(key.into(), Bytes::from(value), None, false, false);
    }

    let request = ShardRequest::Keys {
        pattern: "user:*".into(),
    };
    let mut matched = match test_dispatch(&mut keyspace, request) {
        ShardResponse::StringArray(keys) => keys,
        other => panic!("expected StringArray, got {other:?}"),
    };

    // Sort before comparing so the test does not rely on the reply order.
    matched.sort();
    assert_eq!(matched, vec!["user:1", "user:2"]);
}
2638
/// `Rename` moves a key: the reply is `Ok`, the old name no longer exists,
/// and the original value is readable under the new name.
#[test]
fn dispatch_rename() {
    let mut ks = Keyspace::new();
    ks.set("old".into(), Bytes::from("value"), None, false, false);
    let resp = test_dispatch(
        &mut ks,
        ShardRequest::Rename {
            key: "old".into(),
            newkey: "new".into(),
        },
    );
    assert!(matches!(resp, ShardResponse::Ok));
    assert!(!ks.exists("old"));
    assert!(ks.exists("new"));
    // Existence alone would still pass if the rename dropped or corrupted
    // the payload, so check that the value itself made it across.
    assert_eq!(
        ks.get("new").unwrap(),
        Some(Value::String(Bytes::from("value")))
    );
}
2654
/// `Rename` of a key that does not exist replies with an error and must not
/// create anything under the destination name.
#[test]
fn dispatch_rename_missing_key() {
    let mut ks = Keyspace::new();
    let resp = test_dispatch(
        &mut ks,
        ShardRequest::Rename {
            key: "missing".into(),
            newkey: "new".into(),
        },
    );
    assert!(matches!(resp, ShardResponse::Err(_)));
    // A failed rename must have no side effects on the destination key.
    assert!(!ks.exists("new"));
}
2667
/// `DumpKey` on a string key with a TTL replies with a non-empty payload and
/// a positive remaining TTL; the payload decodes back to the stored string.
#[test]
fn dump_key_returns_serialized_value() {
    let mut keyspace = Keyspace::new();
    let ttl = Some(Duration::from_secs(60));
    keyspace.set("greeting".into(), Bytes::from("hello"), ttl, false, false);

    let request = ShardRequest::DumpKey {
        key: "greeting".into(),
    };
    let (payload, remaining_ms) = match test_dispatch(&mut keyspace, request) {
        ShardResponse::KeyDump { data, ttl_ms } => (data, ttl_ms),
        other => panic!("expected KeyDump, got {other:?}"),
    };

    assert!(!payload.is_empty());
    assert!(remaining_ms > 0);

    // Decode the payload to confirm it carries the original string value.
    let decoded = snapshot::deserialize_snap_value(&payload).unwrap();
    assert!(matches!(&decoded, SnapValue::String(text) if text == &Bytes::from("hello")));
}
2696
/// `DumpKey` on an absent key replies with `ShardResponse::Value(None)`.
#[test]
fn dump_key_missing_returns_none() {
    let mut keyspace = Keyspace::new();
    let request = ShardRequest::DumpKey { key: "nope".into() };

    let reply = test_dispatch(&mut keyspace, request);

    assert!(matches!(reply, ShardResponse::Value(None)));
}
2703
/// `RestoreKey` with a serialized string payload and `ttl_ms: 0` replies `Ok`
/// and makes the string readable under the requested key.
#[test]
fn restore_key_inserts_value() {
    let mut keyspace = Keyspace::new();
    let serialized =
        snapshot::serialize_snap_value(&SnapValue::String(Bytes::from("restored"))).unwrap();

    let request = ShardRequest::RestoreKey {
        key: "mykey".into(),
        ttl_ms: 0,
        data: Bytes::from(serialized),
        replace: false,
    };
    let reply = test_dispatch(&mut keyspace, request);
    assert!(matches!(reply, ShardResponse::Ok));

    let stored = keyspace.get("mykey").unwrap();
    assert_eq!(stored, Some(Value::String(Bytes::from("restored"))));
}
2725
/// `RestoreKey` with `ttl_ms: 30_000` replies `Ok` and the key then reports a
/// remaining TTL in the range `(29_000, 30_000]` milliseconds.
#[test]
fn restore_key_with_ttl() {
    let mut keyspace = Keyspace::new();
    let serialized =
        snapshot::serialize_snap_value(&SnapValue::String(Bytes::from("temp"))).unwrap();

    let request = ShardRequest::RestoreKey {
        key: "ttlkey".into(),
        ttl_ms: 30_000,
        data: Bytes::from(serialized),
        replace: false,
    };
    let reply = test_dispatch(&mut keyspace, request);
    assert!(matches!(reply, ShardResponse::Ok));

    match keyspace.pttl("ttlkey") {
        TtlResult::Milliseconds(remaining) => {
            // Tolerate up to a second elapsing between the restore and this check.
            assert!(remaining > 29_000);
            assert!(remaining <= 30_000);
        }
        other => panic!("expected Milliseconds, got {other:?}"),
    }
}
2747
/// `RestoreKey` with `replace: false` onto an existing key replies with an
/// error and leaves the existing value in place.
#[test]
fn restore_key_rejects_duplicate_without_replace() {
    let mut keyspace = Keyspace::new();
    keyspace.set("existing".into(), Bytes::from("old"), None, false, false);

    let serialized =
        snapshot::serialize_snap_value(&SnapValue::String(Bytes::from("new"))).unwrap();
    let request = ShardRequest::RestoreKey {
        key: "existing".into(),
        ttl_ms: 0,
        data: Bytes::from(serialized),
        replace: false,
    };

    let reply = test_dispatch(&mut keyspace, request);
    assert!(matches!(reply, ShardResponse::Err(_)));

    // The rejected restore must not have clobbered the original value.
    let stored = keyspace.get("existing").unwrap();
    assert_eq!(stored, Some(Value::String(Bytes::from("old"))));
}
2772
/// `RestoreKey` with `replace: true` onto an existing key replies `Ok` and
/// swaps the stored value for the restored one.
#[test]
fn restore_key_replace_overwrites() {
    let mut keyspace = Keyspace::new();
    keyspace.set("existing".into(), Bytes::from("old"), None, false, false);

    let serialized =
        snapshot::serialize_snap_value(&SnapValue::String(Bytes::from("new"))).unwrap();
    let request = ShardRequest::RestoreKey {
        key: "existing".into(),
        ttl_ms: 0,
        data: Bytes::from(serialized),
        replace: true,
    };

    let reply = test_dispatch(&mut keyspace, request);
    assert!(matches!(reply, ShardResponse::Ok));

    let stored = keyspace.get("existing").unwrap();
    assert_eq!(stored, Some(Value::String(Bytes::from("new"))));
}
2796
/// Round-trips a two-field hash: the payload produced by `DumpKey` is fed to
/// `RestoreKey` under a different name, and both fields must read back.
#[test]
fn dump_and_restore_hash_roundtrip() {
    let mut keyspace = Keyspace::new();
    keyspace
        .hset(
            "myhash",
            &[
                ("f1".into(), Bytes::from("v1")),
                ("f2".into(), Bytes::from("v2")),
            ],
        )
        .unwrap();

    // Step 1: dump the source hash. The reported TTL is not used by this test.
    let dump_request = ShardRequest::DumpKey {
        key: "myhash".into(),
    };
    let payload = match test_dispatch(&mut keyspace, dump_request) {
        ShardResponse::KeyDump { data, .. } => data,
        other => panic!("expected KeyDump, got {other:?}"),
    };

    // Step 2: restore the payload under a fresh key name.
    let restore_request = ShardRequest::RestoreKey {
        key: "myhash2".into(),
        ttl_ms: 0,
        data: Bytes::from(payload),
        replace: false,
    };
    let reply = test_dispatch(&mut keyspace, restore_request);
    assert!(matches!(reply, ShardResponse::Ok));

    // Step 3: every original field must be readable from the restored copy.
    for (field, expected) in [("f1", "v1"), ("f2", "v2")] {
        assert_eq!(
            keyspace.hget("myhash2", field).unwrap(),
            Some(Bytes::from(expected))
        );
    }
}
2837
/// Checks `ShardRequest::is_write` on a sample of requests: the first group
/// must be reported as writes, the second group must not.
#[test]
fn is_write_classifies_correctly() {
    // Requests expected to be classified as writes.
    let set = ShardRequest::Set {
        key: "k".into(),
        value: Bytes::from("v"),
        expire: None,
        nx: false,
        xx: false,
    };
    let del = ShardRequest::Del { key: "k".into() };
    let incr = ShardRequest::Incr { key: "k".into() };
    let lpush = ShardRequest::LPush {
        key: "k".into(),
        values: Vec::new(),
    };
    let hset = ShardRequest::HSet {
        key: "k".into(),
        fields: Vec::new(),
    };
    let sadd = ShardRequest::SAdd {
        key: "k".into(),
        members: Vec::new(),
    };
    let flushdb = ShardRequest::FlushDb;

    assert!(set.is_write());
    assert!(del.is_write());
    assert!(incr.is_write());
    assert!(lpush.is_write());
    assert!(hset.is_write());
    assert!(sadd.is_write());
    assert!(flushdb.is_write());

    // Requests expected NOT to be classified as writes.
    let get = ShardRequest::Get { key: "k".into() };
    let exists = ShardRequest::Exists { key: "k".into() };
    let ttl = ShardRequest::Ttl { key: "k".into() };
    let dbsize = ShardRequest::DbSize;
    let stats = ShardRequest::Stats;
    let llen = ShardRequest::LLen { key: "k".into() };
    let hget = ShardRequest::HGet {
        key: "k".into(),
        field: "f".into(),
    };
    let smembers = ShardRequest::SMembers { key: "k".into() };

    assert!(!get.is_write());
    assert!(!exists.is_write());
    assert!(!ttl.is_write());
    assert!(!dbsize.is_write());
    assert!(!stats.is_write());
    assert!(!llen.is_write());
    assert!(!hget.is_write());
    assert!(!smembers.is_write());
}
2882}