1use super::*;
2use crate::{
3 Int,
4 cmd::{CmdExecutor, CmdUnparsed},
5 conf::AccessControl,
6 error::{RutinError, RutinResult},
7 frame::Resp3,
8 persist::rdb::{
9 encode_hash_value, encode_list_value, encode_set_value, encode_str_value, encode_zset_value,
10 },
11 server::{AsyncStream, Handler, NEVER_EXPIRE, UNIX_EPOCH},
12 shared::db::ObjectValueType,
13 util::atoi,
14};
15use bytes::BytesMut;
16use itertools::Itertools;
17use std::{fmt::Debug, slice, time::Duration};
18use tokio::time::Instant;
19use tracing::instrument;
20
/// Conditional flags accepted by the expiry-setting commands in this file
/// (`Expire` and `ExpireAt`).
#[derive(Debug)]
enum Opt {
    /// Update only when the key currently never expires.
    NX,
    /// Update only when the key already has an expiry.
    XX,
    /// Update only when the new expiry is later than the current one.
    GT,
    /// Update only when the new expiry is earlier than the current one.
    LT,
}
28
29impl TryFrom<&[u8]> for Opt {
30 type Error = RutinError;
31
32 fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
33 match value {
34 b"NX" => Ok(Opt::NX),
35 b"XX" => Ok(Opt::XX),
36 b"GT" => Ok(Opt::GT),
37 b"LT" => Ok(Opt::LT),
38 _ => Err("ERR invalid option is given".into()),
39 }
40 }
41}
42
/// Arguments of the `DEL` command.
///
/// Executing it removes each listed key and replies with the number of keys
/// that were actually removed.
#[derive(Debug)]
pub struct Del<A> {
    /// Keys to remove.
    pub keys: Vec<A>,
}
51
52impl<A> CmdExecutor<A> for Del<A>
53where
54 A: CmdArg,
55{
56 #[instrument(
57 level = "debug",
58 skip(handler),
59 ret(level = "debug"),
60 err(level = "debug")
61 )]
62 async fn execute(
63 self,
64 handler: &mut Handler<impl AsyncStream>,
65 ) -> RutinResult<Option<CheapResp3>> {
66 let db = handler.shared.db();
67 let mut count = 0;
68
69 for key in self.keys {
70 if db.remove_object(&key).await.is_some() {
71 count += 1;
72 }
73 }
74
75 Ok(Some(Resp3::new_integer(count)))
76 }
77
78 fn parse(args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
79 if args.is_empty() {
80 return Err(RutinError::WrongArgNum);
81 }
82
83 let keys = args
84 .map(|k| {
85 if ac.deny_reading_or_writing_key(&k, Self::CATS_FLAG) {
86 return Err(RutinError::NoPermission);
87 }
88 Ok(k)
89 })
90 .try_collect()?;
91
92 Ok(Del { keys })
93 }
94
95 fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
96 where
97 A: 'a,
98 {
99 Some(self.keys.iter())
100 }
101}
102
/// Arguments of the `DUMP` command.
///
/// Executing it replies with the RDB-encoded value stored at `key` as a blob
/// string.
#[derive(Debug)]
pub struct Dump<A> {
    /// Key whose value is serialized.
    pub key: A,
}
112
113impl<A> CmdExecutor<A> for Dump<A>
114where
115 A: CmdArg,
116{
117 #[instrument(
118 level = "debug",
119 skip(handler),
120 ret(level = "debug"),
121 err(level = "debug")
122 )]
123 async fn execute(
124 self,
125 handler: &mut Handler<impl AsyncStream>,
126 ) -> RutinResult<Option<CheapResp3>> {
127 let mut buf = BytesMut::with_capacity(1024);
128 handler
129 .shared
130 .db()
131 .visit_object(self.key.as_ref(), |obj| {
132 match obj.typ() {
133 ObjectValueType::Str => encode_str_value(&mut buf, obj.on_str()?),
134 ObjectValueType::List => encode_list_value(&mut buf, obj.on_list()?),
135 ObjectValueType::Set => encode_set_value(&mut buf, obj.on_set()?),
136 ObjectValueType::Hash => encode_hash_value(&mut buf, obj.on_hash()?),
137 ObjectValueType::ZSet => encode_zset_value(&mut buf, obj.on_zset()?),
138 }
139
140 Ok(())
141 })
142 .await?;
143
144 Ok(Some(Resp3::new_blob_string(buf.freeze())))
145 }
146
147 fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
148 if args.len() != 1 {
149 return Err(RutinError::WrongArgNum);
150 }
151
152 let key = args.next().unwrap();
153 if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
154 return Err(RutinError::NoPermission);
155 }
156
157 Ok(Dump { key })
158 }
159
160 fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
161 where
162 A: 'a,
163 {
164 Some(slice::from_ref(&self.key).iter())
165 }
166}
167
/// Arguments of the `EXISTS` command: the keys whose presence is checked.
#[derive(Debug)]
pub struct Exists<A> {
    /// Keys to look up.
    pub keys: Vec<A>,
}
176
177impl<A> CmdExecutor<A> for Exists<A>
178where
179 A: CmdArg,
180{
181 #[instrument(
182 level = "debug",
183 skip(handler),
184 ret(level = "debug"),
185 err(level = "debug")
186 )]
187 async fn execute(
188 self,
189 handler: &mut Handler<impl AsyncStream>,
190 ) -> RutinResult<Option<CheapResp3>> {
191 for key in self.keys {
192 if !handler.shared.db().contains_object(key.as_ref()).await {
193 return Err(RutinError::from(0));
194 }
195 }
196
197 Ok(Some(Resp3::new_integer(1)))
198 }
199
200 fn parse(args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
201 if args.is_empty() {
202 return Err(RutinError::WrongArgNum);
203 }
204
205 let keys = args
206 .map(|k| {
207 if ac.deny_reading_or_writing_key(&k, Self::CATS_FLAG) {
208 return Err(RutinError::NoPermission);
209 }
210 Ok(k)
211 })
212 .try_collect()?;
213
214 Ok(Exists { keys })
215 }
216
217 fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
218 where
219 A: 'a,
220 {
221 Some(self.keys.iter())
222 }
223}
224
225#[derive(Debug)]
231pub struct Expire<A> {
232 key: A,
233 seconds: Duration,
234 opt: Option<Opt>,
235}
236
237impl<A> CmdExecutor<A> for Expire<A>
238where
239 A: CmdArg,
240{
241 #[instrument(
242 level = "debug",
243 skip(handler),
244 ret(level = "debug"),
245 err(level = "debug")
246 )]
247 async fn execute(
248 self,
249 handler: &mut Handler<impl AsyncStream>,
250 ) -> RutinResult<Option<CheapResp3>> {
251 let new_ex = Instant::now() + self.seconds;
252
253 let mut obj = handler.shared.db().object_entry(&self.key).await?;
254
255 let ex = obj.expire_mut().ok_or_else(|| RutinError::from(0))?;
256
257 match self.opt {
258 Some(Opt::NX) => {
260 if *ex == *NEVER_EXPIRE {
261 *ex = new_ex;
262 return Ok(Some(Resp3::new_integer(1)));
263 }
264 }
265 Some(Opt::XX) => {
267 if *ex != *NEVER_EXPIRE {
268 *ex = new_ex;
269 return Ok(Some(Resp3::new_integer(1)));
270 }
271 }
272 Some(Opt::GT) => {
274 if new_ex > *ex {
275 *ex = new_ex;
276 return Ok(Some(Resp3::new_integer(1)));
277 }
278 }
279 Some(Opt::LT) => {
281 if new_ex < *ex {
282 *ex = new_ex;
283 return Ok(Some(Resp3::new_integer(1)));
284 }
285 }
286 None => {
287 *ex = new_ex;
288 return Ok(Some(Resp3::new_integer(1)));
289 }
290 }
291
292 Err(RutinError::from(0))
293 }
294
295 fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
296 if args.len() != 2 && args.len() != 3 {
297 return Err(RutinError::WrongArgNum);
298 }
299
300 let key = args.next().unwrap();
301 if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
302 return Err(RutinError::NoPermission);
303 }
304
305 let seconds = Duration::from_secs(atoi(args.next().unwrap().as_ref())?);
306 let opt = match args.next() {
307 Some(b) => Some(Opt::try_from(b.as_ref())?),
308 None => None,
309 };
310
311 Ok(Expire { key, seconds, opt })
312 }
313
314 fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
315 where
316 A: 'a,
317 {
318 Some(slice::from_ref(&self.key).iter())
319 }
320}
321
322#[derive(Debug)]
327pub struct ExpireAt<A> {
328 key: A,
329 timestamp: Instant,
330 opt: Option<Opt>,
331}
332
333impl<A> CmdExecutor<A> for ExpireAt<A>
334where
335 A: CmdArg,
336{
337 #[instrument(
338 level = "debug",
339 skip(handler),
340 ret(level = "debug"),
341 err(level = "debug")
342 )]
343 async fn execute(
344 self,
345 handler: &mut Handler<impl AsyncStream>,
346 ) -> RutinResult<Option<CheapResp3>> {
347 let new_ex = self.timestamp;
348
349 let mut obj = handler.shared.db().object_entry(&self.key).await?;
350
351 let ex = obj.expire_mut().ok_or_else(|| RutinError::from(0))?;
352
353 match self.opt {
354 Some(Opt::NX) => {
356 if *ex == *NEVER_EXPIRE {
357 *ex = new_ex;
358 return Ok(Some(Resp3::new_integer(1)));
359 }
360 }
361 Some(Opt::XX) => {
363 if *ex != *NEVER_EXPIRE {
364 *ex = new_ex;
365 return Ok(Some(Resp3::new_integer(1)));
366 }
367 }
368 Some(Opt::GT) => {
370 if new_ex > *ex {
371 *ex = new_ex;
372 return Ok(Some(Resp3::new_integer(1)));
373 }
374 }
375 Some(Opt::LT) => {
377 if new_ex < *ex {
378 *ex = new_ex;
379 return Ok(Some(Resp3::new_integer(1)));
380 }
381 }
382 None => {
383 *ex = new_ex;
384 return Ok(Some(Resp3::new_integer(1)));
385 }
386 }
387
388 Err(RutinError::from(0))
389 }
390
391 fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
392 if args.len() != 2 && args.len() != 3 {
393 return Err(RutinError::WrongArgNum);
394 }
395
396 let key = args.next().unwrap();
397 if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
398 return Err(RutinError::NoPermission);
399 }
400
401 let timestamp = atoi::<u64>(args.next().unwrap().as_ref())?;
402 let timestamp = *UNIX_EPOCH + Duration::from_secs(timestamp);
403 if timestamp <= Instant::now() {
404 return Err("ERR invalid timestamp".into());
405 }
406
407 let opt = match args.next() {
408 Some(b) => Some(Opt::try_from(b.as_ref())?),
409 None => None,
410 };
411
412 Ok(ExpireAt {
413 key,
414 timestamp,
415 opt,
416 })
417 }
418
419 fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
420 where
421 A: 'a,
422 {
423 Some(slice::from_ref(&self.key).iter())
424 }
425}
426
/// Arguments of the `EXPIRETIME` command: query the absolute expiry of a key
/// as Unix seconds.
#[derive(Debug)]
pub struct ExpireTime<A> {
    /// Key whose expiry is queried.
    pub key: A,
}
436
437impl<A> CmdExecutor<A> for ExpireTime<A>
438where
439 A: CmdArg,
440{
441 #[instrument(
442 level = "debug",
443 skip(handler),
444 ret(level = "debug"),
445 err(level = "debug")
446 )]
447 async fn execute(
448 self,
449 handler: &mut Handler<impl AsyncStream>,
450 ) -> RutinResult<Option<CheapResp3>> {
451 let ex = handler
452 .shared
453 .db()
454 .get_object_expired(self.key.as_ref())
455 .await
456 .map_err(|_| RutinError::from(-1))?;
457
458 if ex != *NEVER_EXPIRE {
459 Ok(Some(Resp3::new_integer(
460 ex.duration_since(*UNIX_EPOCH).as_secs() as Int,
461 )))
462 } else {
463 Err((-2).into())
465 }
466 }
467
468 fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
469 if args.len() != 1 {
470 return Err(RutinError::WrongArgNum);
471 }
472
473 let key = args.next().unwrap();
474 if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
475 return Err(RutinError::NoPermission);
476 }
477
478 Ok(ExpireTime { key })
479 }
480
481 fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
482 where
483 A: 'a,
484 {
485 Some(slice::from_ref(&self.key).iter())
486 }
487}
488
/// Arguments of the `KEYS` command.
///
/// The pattern is compiled as a regular expression when the command runs.
#[derive(Debug)]
pub struct Keys<A> {
    /// Pattern the key names are matched against.
    pub pattern: A,
}
496
497impl<A> CmdExecutor<A> for Keys<A>
498where
499 A: CmdArg,
500{
501 #[instrument(
502 level = "debug",
503 skip(handler),
504 ret(level = "debug"),
505 err(level = "debug")
506 )]
507 async fn execute(
508 self,
509 handler: &mut Handler<impl AsyncStream>,
510 ) -> RutinResult<Option<CheapResp3>> {
511 let shared = handler.shared;
512 let re = regex::bytes::Regex::new(std::str::from_utf8(self.pattern.as_ref())?)?;
514
515 let matched_keys = shared
518 .db()
519 .entries
520 .iter()
521 .filter_map(|entry| {
522 re.is_match(entry.key())
523 .then(|| CheapResp3::new_blob_string(entry.key().clone()))
524 })
525 .collect::<Vec<CheapResp3>>();
526
527 Ok(Some(Resp3::new_array(matched_keys)))
533 }
534
535 fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
536 if args.len() != 1 {
537 return Err(RutinError::WrongArgNum);
538 }
539
540 let pattern = args.next().unwrap();
541 if ac.deny_reading_or_writing_key(&pattern, Self::CATS_FLAG) {
542 return Err(RutinError::NoPermission);
543 }
544
545 Ok(Keys { pattern })
546 }
547
548 fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
549 where
550 A: 'a,
551 {
552 None::<std::option::Iter<'a, A>>
554 }
555}
556
/// Arguments of the `PERSIST` command: remove the expiry from a key.
#[derive(Debug)]
pub struct Persist<A> {
    /// Key whose expiry is cleared.
    pub key: A,
}
566impl<A> CmdExecutor<A> for Persist<A>
567where
568 A: CmdArg,
569{
570 #[instrument(
571 level = "debug",
572 skip(handler),
573 ret(level = "debug"),
574 err(level = "debug")
575 )]
576 async fn execute(
577 self,
578 handler: &mut Handler<impl AsyncStream>,
579 ) -> RutinResult<Option<CheapResp3>> {
580 let mut obj = handler.shared.db().object_entry(self.key.as_ref()).await?;
581 let ex = obj.expire_mut().ok_or_else(|| RutinError::from(0))?;
582
583 if *ex == *NEVER_EXPIRE {
584 return Err(0.into());
585 }
586
587 *ex = *NEVER_EXPIRE;
588
589 Ok(Some(Resp3::new_integer(1)))
590 }
591
592 fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
593 if args.len() != 1 {
594 return Err(RutinError::WrongArgNum);
595 }
596
597 let key = args.next().unwrap();
598 if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
599 return Err(RutinError::NoPermission);
600 }
601
602 Ok(Persist { key })
603 }
604
605 fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
606 where
607 A: 'a,
608 {
609 Some(slice::from_ref(&self.key).iter())
610 }
611}
612
/// Arguments of the `PTTL` command: query the remaining time to live of a key
/// in milliseconds.
#[derive(Debug)]
pub struct Pttl<A> {
    /// Key whose remaining lifetime is queried.
    pub key: A,
}
623
624impl<A> CmdExecutor<A> for Pttl<A>
625where
626 A: CmdArg,
627{
628 #[instrument(
629 level = "debug",
630 skip(handler),
631 ret(level = "debug"),
632 err(level = "debug")
633 )]
634 async fn execute(
635 self,
636 handler: &mut Handler<impl AsyncStream>,
637 ) -> RutinResult<Option<CheapResp3>> {
638 let ex = handler
639 .shared
640 .db()
641 .get_object_expired(self.key.as_ref())
642 .await
643 .map_err(|_| RutinError::from(-2))?;
644
645 if ex != *NEVER_EXPIRE {
646 let pttl = (ex - Instant::now()).as_millis();
647 Ok(Some(Resp3::new_integer(pttl as Int)))
648 } else {
649 Err((-1).into())
650 }
651 }
652
653 fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
654 if args.len() != 1 {
655 return Err(RutinError::WrongArgNum);
656 }
657
658 let key = args.next().unwrap();
659 if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
660 return Err(RutinError::NoPermission);
661 }
662
663 Ok(Pttl { key })
664 }
665
666 fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
667 where
668 A: 'a,
669 {
670 Some(slice::from_ref(&self.key).iter())
671 }
672}
673
/// Arguments of the `TTL` command: query the remaining time to live of a key
/// in whole seconds.
#[derive(Debug)]
pub struct Ttl<A> {
    /// Key whose remaining lifetime is queried.
    pub key: A,
}
684
685impl<A> CmdExecutor<A> for Ttl<A>
686where
687 A: CmdArg,
688{
689 #[instrument(
690 level = "debug",
691 skip(handler),
692 ret(level = "debug"),
693 err(level = "debug")
694 )]
695 async fn execute(
696 self,
697 handler: &mut Handler<impl AsyncStream>,
698 ) -> RutinResult<Option<CheapResp3>> {
699 let ex = handler
700 .shared
701 .db()
702 .get_object_expired(self.key.as_ref())
703 .await
704 .map_err(|_| RutinError::from(-2))?;
705
706 if ex != *NEVER_EXPIRE {
707 let ttl = (ex - Instant::now()).as_secs();
708 Ok(Some(Resp3::new_integer(ttl as Int)))
709 } else {
710 Err((-1).into())
711 }
712 }
713
714 fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
715 if args.len() != 1 {
716 return Err(RutinError::WrongArgNum);
717 }
718
719 let key = args.next().unwrap();
720 if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
721 return Err(RutinError::NoPermission);
722 }
723
724 Ok(Ttl { key })
725 }
726
727 fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
728 where
729 A: 'a,
730 {
731 Some(slice::from_ref(&self.key).iter())
732 }
733}
734
/// Arguments of the `TYPE` command: query the type name of the value stored
/// at a key.
#[derive(Debug)]
pub struct Type<A> {
    /// Key whose value type is queried.
    pub key: A,
}
743
744impl<A> CmdExecutor<A> for Type<A>
745where
746 A: CmdArg,
747{
748 #[instrument(
749 level = "debug",
750 skip(handler),
751 ret(level = "debug"),
752 err(level = "debug")
753 )]
754 async fn execute(
755 self,
756 handler: &mut Handler<impl AsyncStream>,
757 ) -> RutinResult<Option<CheapResp3>> {
758 let typ = handler
759 .shared
760 .db()
761 .get_object(self.key.as_ref())
762 .await?
763 .type_str();
764
765 Ok(Some(Resp3::new_simple_string(typ)))
766 }
767
768 fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
769 if args.len() != 1 {
770 return Err(RutinError::WrongArgNum);
771 }
772
773 let key = args.next().unwrap();
774 if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
775 return Err(RutinError::NoPermission);
776 }
777
778 Ok(Type { key })
779 }
780
781 fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
782 where
783 A: 'a,
784 {
785 Some(slice::from_ref(&self.key).iter())
786 }
787}
788
789#[cfg(test)]
790mod cmd_key_tests {
791 use super::*;
792 use crate::{
793 server::{NEVER_EXPIRE, UNIX_EPOCH},
794 shared::db::{Hash, List, Object, Set, Str, ZSet},
795 util::gen_test_handler,
796 };
797
// Upper bound (exclusive) on the gap between the configured lifetime and the
// value reported by the command under test. `pttl_test` compares it against
// milliseconds, `ttl_test` against seconds.
const ALLOWED_DELTA: u64 = 3;
800
801 #[tokio::test]
802 async fn del_test() {
803 let mut handler = gen_test_handler();
804 let db = handler.shared.db();
805
806 db.insert_object(
807 "key1".as_bytes(),
808 Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
809 )
810 .await
811 .unwrap();
812 assert!(db.contains_object("key1".as_bytes()).await);
813
814 let del_res = Del::test(&["DEL", "key1"], &mut handler)
816 .await
817 .unwrap()
818 .unwrap();
819 assert_eq!(del_res.into_integer_unchecked(), 1);
820 assert!(!handler.shared.db().contains_object("key1".as_bytes()).await);
821
822 let del_res = Del::test(&["DEL", "key_nil"], &mut handler)
824 .await
825 .unwrap()
826 .unwrap();
827 assert_eq!(del_res.into_integer_unchecked(), 0);
828 }
829
830 #[tokio::test]
831 async fn exists_test() {
832 let mut handler = gen_test_handler();
833 let db = handler.shared.db();
834
835 db.insert_object(
836 "key1".as_bytes(),
837 Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
838 )
839 .await
840 .unwrap();
841 assert!(db.contains_object("key1".as_bytes()).await);
842
843 let exists_res = Exists::test(&["key1"], &mut handler)
845 .await
846 .unwrap()
847 .unwrap();
848 assert_eq!(exists_res.into_integer_unchecked(), 1);
849
850 let _exists_res = Exists::test(&["key_nil"], &mut handler).await.unwrap_err();
852 }
853
854 #[tokio::test]
855 async fn expire_test() {
856 let mut handler = gen_test_handler();
857 let db = handler.shared.db();
858
859 db.insert_object(
860 "key1".as_bytes(),
861 Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
862 )
863 .await
864 .unwrap();
865 assert_eq!(
866 db.get_object("key1".as_bytes()).await.unwrap().expire,
867 *NEVER_EXPIRE
868 );
869
870 let expire_res = Expire::test(&["key1", "10"], &mut handler)
872 .await
873 .unwrap()
874 .unwrap();
875 assert_eq!(expire_res.into_integer_unchecked(), 1);
876 assert!(
877 !handler
878 .shared
879 .db()
880 .get_object("key1".as_bytes())
881 .await
882 .unwrap()
883 .is_never_expired()
884 );
885
886 let _expire_res = Expire::test(&["key_nil", "10"], &mut handler)
888 .await
889 .unwrap_err();
890
891 let db = handler.shared.db();
892
893 db.insert_object(
894 "key_with_ex".as_bytes(),
895 Object::with_expire(
896 Str::from("value_with_ex"),
897 Instant::now() + Duration::from_secs(10),
898 ),
899 )
900 .await
901 .unwrap();
902 db.insert_object(
903 "key_without_ex".as_bytes(),
904 Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
905 )
906 .await
907 .unwrap();
908
909 let _expire_res = Expire::test(&["key_with_ex", "10", "NX"], &mut handler)
911 .await
912 .unwrap_err();
913
914 let expire_res = Expire::test(&["key_without_ex", "10", "NX"], &mut handler)
915 .await
916 .unwrap()
917 .unwrap();
918 assert_eq!(expire_res.into_integer_unchecked(), 1);
919
920 let db = handler.shared.db();
921
922 db.insert_object(
923 "key_with_ex".as_bytes(),
924 Object::with_expire(
925 Str::from("value_with_ex"),
926 Instant::now() + Duration::from_secs(10),
927 ),
928 )
929 .await
930 .unwrap();
931 db.insert_object(
932 "key_without_ex".as_bytes(),
933 Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
934 )
935 .await
936 .unwrap();
937
938 let _expire_res = Expire::test(&["key_with_ex", "10", "NX"], &mut handler)
940 .await
941 .unwrap_err();
942
943 let expire_res = Expire::test(&["key_without_ex", "10", "NX"], &mut handler)
944 .await
945 .unwrap()
946 .unwrap();
947 assert_eq!(expire_res.into_integer_unchecked(), 1);
948
949 let db = handler.shared.db();
950
951 db.insert_object(
952 "key_with_ex".as_bytes(),
953 Object::with_expire(
954 Str::from("value_with_ex"),
955 Instant::now() + Duration::from_secs(10),
956 ),
957 )
958 .await
959 .unwrap();
960 db.insert_object(
961 "key_without_ex".as_bytes(),
962 Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
963 )
964 .await
965 .unwrap();
966
967 let _expire_res = Expire::test(&["key_with_ex", "5", "GT"], &mut handler)
969 .await
970 .unwrap_err();
971
972 let expire_res = Expire::test(&["key_with_ex", "20", "GT"], &mut handler)
973 .await
974 .unwrap()
975 .unwrap();
976 assert_eq!(expire_res.into_integer_unchecked(), 1);
977
978 let db = handler.shared.db();
979
980 db.insert_object(
981 "key_with_ex".as_bytes(),
982 Object::with_expire(
983 Str::from("value_with_ex"),
984 Instant::now() + Duration::from_secs(10),
985 ),
986 )
987 .await
988 .unwrap();
989 db.insert_object(
990 "key_without_ex".as_bytes(),
991 Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
992 )
993 .await
994 .unwrap();
995
996 let _expire_res = Expire::test(&["key_with_ex", "20", "LT"], &mut handler)
998 .await
999 .unwrap_err();
1000
1001 let expire_res = Expire::test(&["key_with_ex", "5", "LT"], &mut handler)
1002 .await
1003 .unwrap()
1004 .unwrap();
1005 assert_eq!(expire_res.into_integer_unchecked(), 1);
1006 }
1007
1008 #[tokio::test]
1009 async fn expire_at_test() {
1010 let mut handler = gen_test_handler();
1011 let db = handler.shared.db();
1012
1013 db.insert_object(
1014 "key1".as_bytes(),
1015 Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
1016 )
1017 .await
1018 .unwrap();
1019 assert!(
1020 db.get_object("key1".as_bytes())
1021 .await
1022 .unwrap()
1023 .is_never_expired()
1024 );
1025
1026 let expire_at_res = ExpireAt::test(&["key1", "1893427200"], &mut handler)
1028 .await
1029 .unwrap()
1030 .unwrap();
1031 assert_eq!(expire_at_res.into_integer_unchecked(), 1);
1032 assert!(
1033 !handler
1034 .shared
1035 .db()
1036 .get_object("key1".as_bytes())
1037 .await
1038 .unwrap()
1039 .is_never_expired()
1040 );
1041
1042 let _expire_at_res = ExpireAt::test(&["key_nil", "1893427200"], &mut handler)
1044 .await
1045 .unwrap_err();
1046
1047 let db = handler.shared.db();
1048
1049 db.insert_object(
1050 "key_with_ex".as_bytes(),
1051 Object::with_expire(
1052 Str::from("value_with_ex"),
1053 *UNIX_EPOCH + Duration::from_secs(1893427200),
1054 ),
1055 )
1056 .await
1057 .unwrap();
1058 db.insert_object(
1059 "key_without_ex".as_bytes(),
1060 Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
1061 )
1062 .await
1063 .unwrap();
1064
1065 let _expire_at_res = ExpireAt::test(&["key_with_ex", "1893427200", "NX"], &mut handler)
1067 .await
1068 .unwrap_err();
1069
1070 let expire_at_res = ExpireAt::test(&["key_without_ex", "1893427200", "NX"], &mut handler)
1071 .await
1072 .unwrap()
1073 .unwrap();
1074 assert_eq!(expire_at_res.into_integer_unchecked(), 1);
1075
1076 let db = handler.shared.db();
1077
1078 db.insert_object(
1079 "key_with_ex".as_bytes(),
1080 Object::with_expire(
1081 Str::from("value_with_ex"),
1082 *UNIX_EPOCH + Duration::from_secs(1893427200),
1083 ),
1084 )
1085 .await
1086 .unwrap();
1087 db.insert_object(
1088 "key_without_ex".as_bytes(),
1089 Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
1090 )
1091 .await
1092 .unwrap();
1093
1094 let _expire_at_res = ExpireAt::test(&["key_with_ex", "1893427000", "GT"], &mut handler)
1096 .await
1097 .unwrap_err();
1098
1099 let expire_at_res = ExpireAt::test(&["key_with_ex", "1893427201", "GT"], &mut handler)
1100 .await
1101 .unwrap()
1102 .unwrap();
1103 assert_eq!(expire_at_res.into_integer_unchecked(), 1);
1104
1105 let db = handler.shared.db();
1106
1107 db.insert_object(
1108 "key_with_ex".as_bytes(),
1109 Object::with_expire(
1110 Str::from("value_with_ex"),
1111 *UNIX_EPOCH + Duration::from_secs(1893427200),
1112 ),
1113 )
1114 .await
1115 .unwrap();
1116 db.insert_object(
1117 "key_without_ex".as_bytes(),
1118 Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
1119 )
1120 .await
1121 .unwrap();
1122
1123 let _expire_at_res = ExpireAt::test(&["key_with_ex", "1893427201", "LT"], &mut handler)
1125 .await
1126 .unwrap_err();
1127
1128 let expire_at_res = ExpireAt::test(&["key_with_ex", "1893427000", "LT"], &mut handler)
1129 .await
1130 .unwrap()
1131 .unwrap();
1132 assert_eq!(expire_at_res.into_integer_unchecked(), 1);
1133 }
1134
1135 #[tokio::test]
1136 async fn expire_time_test() {
1137 let mut handler = gen_test_handler();
1138 let db = handler.shared.db();
1139
1140 db.insert_object(
1141 "key1".as_bytes(),
1142 Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
1143 )
1144 .await
1145 .unwrap();
1146 assert!(
1147 db.get_object("key1".as_bytes())
1148 .await
1149 .unwrap()
1150 .is_never_expired()
1151 );
1152 let expire = Instant::now() + Duration::from_secs(10);
1153 db.insert_object(
1154 "key_with_ex".as_bytes(),
1155 Object::with_expire(Str::from("value_with_ex"), expire),
1156 )
1157 .await
1158 .unwrap();
1159
1160 let _expire_time_res = ExpireTime::test(&["key1"], &mut handler).await.unwrap_err();
1162
1163 let _expire_time_res = ExpireTime::test(&["key_nil"], &mut handler)
1165 .await
1166 .unwrap_err();
1167
1168 let expire_time_res = ExpireTime::test(&["key_with_ex"], &mut handler)
1170 .await
1171 .unwrap()
1172 .unwrap();
1173 assert_eq!(
1174 expire_time_res.into_integer_unchecked(),
1175 expire.duration_since(*UNIX_EPOCH).as_secs() as i128
1176 );
1177 }
1178
1179 #[tokio::test(flavor = "multi_thread")]
1180 async fn keys_test() {
1181 let mut handler = gen_test_handler();
1182 let db = handler.shared.db();
1183
1184 db.insert_object(
1185 "key1".as_bytes(),
1186 Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
1187 )
1188 .await
1189 .unwrap();
1190 db.insert_object(
1191 "key2".as_bytes(),
1192 Object::with_expire(Str::from("value2"), *NEVER_EXPIRE),
1193 )
1194 .await
1195 .unwrap();
1196 db.insert_object(
1197 "key3".as_bytes(),
1198 Object::with_expire(Str::from("value3"), *NEVER_EXPIRE),
1199 )
1200 .await
1201 .unwrap();
1202 db.insert_object(
1203 "key4".as_bytes(),
1204 Object::with_expire(Str::from("value4"), *NEVER_EXPIRE),
1205 )
1206 .await
1207 .unwrap();
1208
1209 let keys_res = Keys::test(&[".*"], &mut handler).await.unwrap().unwrap();
1210 let result = keys_res.into_array_unchecked();
1211 assert!(
1212 result.contains(&Resp3::new_blob_string("key1"))
1213 && result.contains(&Resp3::new_blob_string("key2"))
1214 && result.contains(&Resp3::new_blob_string("key3"))
1215 && result.contains(&Resp3::new_blob_string("key4"))
1216 );
1217
1218 let keys_res = Keys::test(&["key*"], &mut handler).await.unwrap().unwrap();
1219 let result = keys_res.into_array_unchecked();
1220 assert!(
1221 result.contains(&Resp3::new_blob_string("key1"))
1222 && result.contains(&Resp3::new_blob_string("key2"))
1223 && result.contains(&Resp3::new_blob_string("key3"))
1224 && result.contains(&Resp3::new_blob_string("key4"))
1225 );
1226
1227 let keys_res = Keys::test(&["key1"], &mut handler).await.unwrap().unwrap();
1228 let result = keys_res.into_array_unchecked();
1229 assert!(result.contains(&Resp3::new_blob_string("key1")));
1230 }
1231
1232 #[tokio::test]
1233 async fn persist_test() {
1234 let mut handler = gen_test_handler();
1235 let db = handler.shared.db();
1236
1237 db.insert_object(
1238 "key_with_ex".as_bytes(),
1239 Object::with_expire(
1240 Str::from("value_with_ex"),
1241 Instant::now() + Duration::from_secs(10),
1242 ),
1243 )
1244 .await
1245 .unwrap();
1246 db.insert_object(
1247 "key_without_ex".as_bytes(),
1248 Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
1249 )
1250 .await
1251 .unwrap();
1252
1253 let persist_res = Persist::test(&["key_with_ex"], &mut handler)
1255 .await
1256 .unwrap()
1257 .unwrap();
1258 assert_eq!(persist_res.into_integer_unchecked(), 1);
1259 assert!(
1260 handler
1261 .shared
1262 .db()
1263 .get_object("key_with_ex".as_bytes())
1264 .await
1265 .unwrap()
1266 .is_never_expired()
1267 );
1268
1269 let _persist_res = Persist::test(&["key_without_ex"], &mut handler)
1271 .await
1272 .unwrap_err();
1273
1274 let _persist_res = Persist::test(&["key_nil"], &mut handler).await.unwrap_err();
1276 }
1277
1278 #[tokio::test]
1279 async fn pttl_test() {
1280 let mut handler = gen_test_handler();
1281 let db = handler.shared.db();
1282
1283 db.insert_object(
1284 "key1".as_bytes(),
1285 Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
1286 )
1287 .await
1288 .unwrap();
1289 assert_eq!(
1290 db.get_object("key1".as_bytes()).await.unwrap().expire,
1291 *NEVER_EXPIRE
1292 );
1293 let dur = Duration::from_secs(10);
1294 let expire = Instant::now() + dur;
1295 db.insert_object(
1296 "key_with_ex".as_bytes(),
1297 Object::with_expire(Str::from("value_with_ex"), expire),
1298 )
1299 .await
1300 .unwrap();
1301
1302 let _pttl_res = Pttl::test(&["key1"], &mut handler).await.unwrap_err();
1304
1305 let _pttl_res = Pttl::test(&["key_nil"], &mut handler).await.unwrap_err();
1307
1308 let pttl_res = Pttl::test(&["key_with_ex"], &mut handler)
1310 .await
1311 .unwrap()
1312 .unwrap();
1313 let result = pttl_res.into_integer_unchecked() as u64;
1314 assert!(dur.as_millis() as u64 - result < ALLOWED_DELTA);
1315 }
1316
1317 #[tokio::test]
1318 async fn ttl_test() {
1319 let mut handler = gen_test_handler();
1320 let db = handler.shared.db();
1321
1322 db.insert_object(
1323 "key1".as_bytes(),
1324 Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
1325 )
1326 .await
1327 .unwrap();
1328 assert_eq!(
1329 db.entries.get("key1".as_bytes()).unwrap().expire,
1330 *NEVER_EXPIRE
1331 );
1332 let dur = Duration::from_secs(10);
1333 let expire = Instant::now() + dur;
1334 db.insert_object(
1335 "key_with_ex".as_bytes(),
1336 Object::with_expire(Str::from("value_with_ex"), expire),
1337 )
1338 .await
1339 .unwrap();
1340
1341 let _ttl_res = Ttl::test(&["key1"], &mut handler).await.unwrap_err();
1343
1344 let _ttl_res = Ttl::test(&["key_nil"], &mut handler).await.unwrap_err();
1346
1347 let ttl_res = Ttl::test(&["key_with_ex"], &mut handler)
1349 .await
1350 .unwrap()
1351 .unwrap();
1352 let result = ttl_res.into_integer_unchecked() as u64;
1353 assert!(dur.as_secs() - result < ALLOWED_DELTA);
1354 }
1355
1356 #[tokio::test]
1357 async fn type_test() {
1358 let mut handler = gen_test_handler();
1359 let db = handler.shared.db();
1360
1361 db.insert_object(
1362 "key1".as_bytes(),
1363 Object::with_expire(Str::default(), *NEVER_EXPIRE),
1364 )
1365 .await
1366 .unwrap();
1367 db.insert_object(
1368 "key2".as_bytes(),
1369 Object::with_expire(List::default(), *NEVER_EXPIRE),
1370 )
1371 .await
1372 .unwrap();
1373 db.insert_object(
1374 "key3".as_bytes(),
1375 Object::with_expire(Set::default(), *NEVER_EXPIRE),
1376 )
1377 .await
1378 .unwrap();
1379 db.insert_object(
1380 "key4".as_bytes(),
1381 Object::with_expire(Hash::default(), *NEVER_EXPIRE),
1382 )
1383 .await
1384 .unwrap();
1385 db.insert_object(
1386 "key5".as_bytes(),
1387 Object::with_expire(ZSet::default(), *NEVER_EXPIRE),
1388 )
1389 .await
1390 .unwrap();
1391
1392 let typ_res = Type::test(&["key1"], &mut handler).await.unwrap().unwrap();
1394 assert_eq!(typ_res.into_simple_string_unchecked(), "string");
1395
1396 let typ_res = Type::test(&["key2"], &mut handler).await.unwrap().unwrap();
1397 assert_eq!(typ_res.into_simple_string_unchecked(), "list");
1398
1399 let typ_res = Type::test(&["key3"], &mut handler).await.unwrap().unwrap();
1400 assert_eq!(typ_res.into_simple_string_unchecked(), "set");
1401
1402 let typ_res = Type::test(&["key4"], &mut handler).await.unwrap().unwrap();
1403 assert_eq!(typ_res.into_simple_string_unchecked(), "hash");
1404
1405 let typ_res = Type::test(&["key5"], &mut handler).await.unwrap().unwrap();
1406 assert_eq!(typ_res.into_simple_string_unchecked(), "zset");
1407 }
1408}