rutin_server/cmd/commands/
key.rs

1use super::*;
2use crate::{
3    Int,
4    cmd::{CmdExecutor, CmdUnparsed},
5    conf::AccessControl,
6    error::{RutinError, RutinResult},
7    frame::Resp3,
8    persist::rdb::{
9        encode_hash_value, encode_list_value, encode_set_value, encode_str_value, encode_zset_value,
10    },
11    server::{AsyncStream, Handler, NEVER_EXPIRE, UNIX_EPOCH},
12    shared::db::ObjectValueType,
13    util::atoi,
14};
15use bytes::BytesMut;
16use itertools::Itertools;
17use std::{fmt::Debug, slice, time::Duration};
18use tokio::time::Instant;
19use tracing::instrument;
20
21#[derive(Debug)]
22enum Opt {
23    NX, // 要求键无过期时间
24    XX, // 要求键有过期时间
25    GT, // 要求new_expire > 键的过期时间
26    LT, // 要求new_expire < 键的过期时间
27}
28
29impl TryFrom<&[u8]> for Opt {
30    type Error = RutinError;
31
32    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
33        match value {
34            b"NX" => Ok(Opt::NX),
35            b"XX" => Ok(Opt::XX),
36            b"GT" => Ok(Opt::GT),
37            b"LT" => Ok(Opt::LT),
38            _ => Err("ERR invalid option is given".into()),
39        }
40    }
41}
42
43/// 该命令用于在 key 存在时删除 key。
44/// # Reply:
45///
46/// **Integer reply:** the number of keys that were removed.
47#[derive(Debug)]
48pub struct Del<A> {
49    pub keys: Vec<A>,
50}
51
52impl<A> CmdExecutor<A> for Del<A>
53where
54    A: CmdArg,
55{
56    #[instrument(
57        level = "debug",
58        skip(handler),
59        ret(level = "debug"),
60        err(level = "debug")
61    )]
62    async fn execute(
63        self,
64        handler: &mut Handler<impl AsyncStream>,
65    ) -> RutinResult<Option<CheapResp3>> {
66        let db = handler.shared.db();
67        let mut count = 0;
68
69        for key in self.keys {
70            if db.remove_object(&key).await.is_some() {
71                count += 1;
72            }
73        }
74
75        Ok(Some(Resp3::new_integer(count)))
76    }
77
78    fn parse(args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
79        if args.is_empty() {
80            return Err(RutinError::WrongArgNum);
81        }
82
83        let keys = args
84            .map(|k| {
85                if ac.deny_reading_or_writing_key(&k, Self::CATS_FLAG) {
86                    return Err(RutinError::NoPermission);
87                }
88                Ok(k)
89            })
90            .try_collect()?;
91
92        Ok(Del { keys })
93    }
94
95    fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
96    where
97        A: 'a,
98    {
99        Some(self.keys.iter())
100    }
101}
102
103/// 序列化给定 key ,并返回被序列化的值。
104/// # Reply:
105///
106/// **Bulk string reply:** the serialized value of the key.
107/// **Null reply:** the key does not exist.
108#[derive(Debug)]
109pub struct Dump<A> {
110    pub key: A,
111}
112
113impl<A> CmdExecutor<A> for Dump<A>
114where
115    A: CmdArg,
116{
117    #[instrument(
118        level = "debug",
119        skip(handler),
120        ret(level = "debug"),
121        err(level = "debug")
122    )]
123    async fn execute(
124        self,
125        handler: &mut Handler<impl AsyncStream>,
126    ) -> RutinResult<Option<CheapResp3>> {
127        let mut buf = BytesMut::with_capacity(1024);
128        handler
129            .shared
130            .db()
131            .visit_object(self.key.as_ref(), |obj| {
132                match obj.typ() {
133                    ObjectValueType::Str => encode_str_value(&mut buf, obj.on_str()?),
134                    ObjectValueType::List => encode_list_value(&mut buf, obj.on_list()?),
135                    ObjectValueType::Set => encode_set_value(&mut buf, obj.on_set()?),
136                    ObjectValueType::Hash => encode_hash_value(&mut buf, obj.on_hash()?),
137                    ObjectValueType::ZSet => encode_zset_value(&mut buf, obj.on_zset()?),
138                }
139
140                Ok(())
141            })
142            .await?;
143
144        Ok(Some(Resp3::new_blob_string(buf.freeze())))
145    }
146
147    fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
148        if args.len() != 1 {
149            return Err(RutinError::WrongArgNum);
150        }
151
152        let key = args.next().unwrap();
153        if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
154            return Err(RutinError::NoPermission);
155        }
156
157        Ok(Dump { key })
158    }
159
160    fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
161    where
162        A: 'a,
163    {
164        Some(slice::from_ref(&self.key).iter())
165    }
166}
167
168/// 检查给定 key 是否存在。
169/// # Reply:
170///
171/// **Integer reply:** the number of keys that exist from those specified as arguments.
172#[derive(Debug)]
173pub struct Exists<A> {
174    pub keys: Vec<A>,
175}
176
177impl<A> CmdExecutor<A> for Exists<A>
178where
179    A: CmdArg,
180{
181    #[instrument(
182        level = "debug",
183        skip(handler),
184        ret(level = "debug"),
185        err(level = "debug")
186    )]
187    async fn execute(
188        self,
189        handler: &mut Handler<impl AsyncStream>,
190    ) -> RutinResult<Option<CheapResp3>> {
191        for key in self.keys {
192            if !handler.shared.db().contains_object(key.as_ref()).await {
193                return Err(RutinError::from(0));
194            }
195        }
196
197        Ok(Some(Resp3::new_integer(1)))
198    }
199
200    fn parse(args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
201        if args.is_empty() {
202            return Err(RutinError::WrongArgNum);
203        }
204
205        let keys = args
206            .map(|k| {
207                if ac.deny_reading_or_writing_key(&k, Self::CATS_FLAG) {
208                    return Err(RutinError::NoPermission);
209                }
210                Ok(k)
211            })
212            .try_collect()?;
213
214        Ok(Exists { keys })
215    }
216
217    fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
218    where
219        A: 'a,
220    {
221        Some(self.keys.iter())
222    }
223}
224
225/// 为给定 key 设置过期时间,以秒计。
226/// # Reply:
227///
228/// **Integer reply:** 0 if the timeout was not set; for example, the key doesn't exist, or the operation was skipped because of the provided arguments.
229/// **Integer reply:** 1 if the timeout was set.
230#[derive(Debug)]
231pub struct Expire<A> {
232    key: A,
233    seconds: Duration,
234    opt: Option<Opt>,
235}
236
237impl<A> CmdExecutor<A> for Expire<A>
238where
239    A: CmdArg,
240{
241    #[instrument(
242        level = "debug",
243        skip(handler),
244        ret(level = "debug"),
245        err(level = "debug")
246    )]
247    async fn execute(
248        self,
249        handler: &mut Handler<impl AsyncStream>,
250    ) -> RutinResult<Option<CheapResp3>> {
251        let new_ex = Instant::now() + self.seconds;
252
253        let mut obj = handler.shared.db().object_entry(&self.key).await?;
254
255        let ex = obj.expire_mut().ok_or_else(|| RutinError::from(0))?;
256
257        match self.opt {
258            // 无过期时间,则设置
259            Some(Opt::NX) => {
260                if *ex == *NEVER_EXPIRE {
261                    *ex = new_ex;
262                    return Ok(Some(Resp3::new_integer(1)));
263                }
264            }
265            // 有过期时间,则设置
266            Some(Opt::XX) => {
267                if *ex != *NEVER_EXPIRE {
268                    *ex = new_ex;
269                    return Ok(Some(Resp3::new_integer(1)));
270                }
271            }
272            // 过期时间大于给定时间,则设置
273            Some(Opt::GT) => {
274                if new_ex > *ex {
275                    *ex = new_ex;
276                    return Ok(Some(Resp3::new_integer(1)));
277                }
278            }
279            // 过期时间小于给定时间,则设置
280            Some(Opt::LT) => {
281                if new_ex < *ex {
282                    *ex = new_ex;
283                    return Ok(Some(Resp3::new_integer(1)));
284                }
285            }
286            None => {
287                *ex = new_ex;
288                return Ok(Some(Resp3::new_integer(1)));
289            }
290        }
291
292        Err(RutinError::from(0))
293    }
294
295    fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
296        if args.len() != 2 && args.len() != 3 {
297            return Err(RutinError::WrongArgNum);
298        }
299
300        let key = args.next().unwrap();
301        if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
302            return Err(RutinError::NoPermission);
303        }
304
305        let seconds = Duration::from_secs(atoi(args.next().unwrap().as_ref())?);
306        let opt = match args.next() {
307            Some(b) => Some(Opt::try_from(b.as_ref())?),
308            None => None,
309        };
310
311        Ok(Expire { key, seconds, opt })
312    }
313
314    fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
315    where
316        A: 'a,
317    {
318        Some(slice::from_ref(&self.key).iter())
319    }
320}
321
322/// # Reply:
323///
324/// **Integer reply:** 0 if the timeout was not set; for example, the key doesn't exist, or the operation was skipped because of the provided arguments.
325/// **Integer reply:** 1 if the timeout was set.
326#[derive(Debug)]
327pub struct ExpireAt<A> {
328    key: A,
329    timestamp: Instant,
330    opt: Option<Opt>,
331}
332
333impl<A> CmdExecutor<A> for ExpireAt<A>
334where
335    A: CmdArg,
336{
337    #[instrument(
338        level = "debug",
339        skip(handler),
340        ret(level = "debug"),
341        err(level = "debug")
342    )]
343    async fn execute(
344        self,
345        handler: &mut Handler<impl AsyncStream>,
346    ) -> RutinResult<Option<CheapResp3>> {
347        let new_ex = self.timestamp;
348
349        let mut obj = handler.shared.db().object_entry(&self.key).await?;
350
351        let ex = obj.expire_mut().ok_or_else(|| RutinError::from(0))?;
352
353        match self.opt {
354            // 无过期时间,则设置
355            Some(Opt::NX) => {
356                if *ex == *NEVER_EXPIRE {
357                    *ex = new_ex;
358                    return Ok(Some(Resp3::new_integer(1)));
359                }
360            }
361            // 有过期时间,则设置
362            Some(Opt::XX) => {
363                if *ex != *NEVER_EXPIRE {
364                    *ex = new_ex;
365                    return Ok(Some(Resp3::new_integer(1)));
366                }
367            }
368            // 过期时间大于给定时间,则设置
369            Some(Opt::GT) => {
370                if new_ex > *ex {
371                    *ex = new_ex;
372                    return Ok(Some(Resp3::new_integer(1)));
373                }
374            }
375            // 过期时间小于给定时间,则设置
376            Some(Opt::LT) => {
377                if new_ex < *ex {
378                    *ex = new_ex;
379                    return Ok(Some(Resp3::new_integer(1)));
380                }
381            }
382            None => {
383                *ex = new_ex;
384                return Ok(Some(Resp3::new_integer(1)));
385            }
386        }
387
388        Err(RutinError::from(0))
389    }
390
391    fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
392        if args.len() != 2 && args.len() != 3 {
393            return Err(RutinError::WrongArgNum);
394        }
395
396        let key = args.next().unwrap();
397        if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
398            return Err(RutinError::NoPermission);
399        }
400
401        let timestamp = atoi::<u64>(args.next().unwrap().as_ref())?;
402        let timestamp = *UNIX_EPOCH + Duration::from_secs(timestamp);
403        if timestamp <= Instant::now() {
404            return Err("ERR invalid timestamp".into());
405        }
406
407        let opt = match args.next() {
408            Some(b) => Some(Opt::try_from(b.as_ref())?),
409            None => None,
410        };
411
412        Ok(ExpireAt {
413            key,
414            timestamp,
415            opt,
416        })
417    }
418
419    fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
420    where
421        A: 'a,
422    {
423        Some(slice::from_ref(&self.key).iter())
424    }
425}
426
427/// # Reply:
428///
429/// **Integer reply:** the expiration Unix timestamp in seconds.
430/// **Integer reply:** -1 if the key exists but has no associated expiration time.
431/// **Integer reply:** -2 if the key does not exist.
432#[derive(Debug)]
433pub struct ExpireTime<A> {
434    pub key: A,
435}
436
437impl<A> CmdExecutor<A> for ExpireTime<A>
438where
439    A: CmdArg,
440{
441    #[instrument(
442        level = "debug",
443        skip(handler),
444        ret(level = "debug"),
445        err(level = "debug")
446    )]
447    async fn execute(
448        self,
449        handler: &mut Handler<impl AsyncStream>,
450    ) -> RutinResult<Option<CheapResp3>> {
451        let ex = handler
452            .shared
453            .db()
454            .get_object_expired(self.key.as_ref())
455            .await
456            .map_err(|_| RutinError::from(-1))?;
457
458        if ex != *NEVER_EXPIRE {
459            Ok(Some(Resp3::new_integer(
460                ex.duration_since(*UNIX_EPOCH).as_secs() as Int,
461            )))
462        } else {
463            // 无过期时间
464            Err((-2).into())
465        }
466    }
467
468    fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
469        if args.len() != 1 {
470            return Err(RutinError::WrongArgNum);
471        }
472
473        let key = args.next().unwrap();
474        if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
475            return Err(RutinError::NoPermission);
476        }
477
478        Ok(ExpireTime { key })
479    }
480
481    fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
482    where
483        A: 'a,
484    {
485        Some(slice::from_ref(&self.key).iter())
486    }
487}
488
489/// # Reply:
490///
491/// **Array reply:** a list of keys matching pattern.
492#[derive(Debug)]
493pub struct Keys<A> {
494    pub pattern: A,
495}
496
497impl<A> CmdExecutor<A> for Keys<A>
498where
499    A: CmdArg,
500{
501    #[instrument(
502        level = "debug",
503        skip(handler),
504        ret(level = "debug"),
505        err(level = "debug")
506    )]
507    async fn execute(
508        self,
509        handler: &mut Handler<impl AsyncStream>,
510    ) -> RutinResult<Option<CheapResp3>> {
511        let shared = handler.shared;
512        // let outbox = handler.context.mailbox.outbox.clone();
513        let re = regex::bytes::Regex::new(std::str::from_utf8(self.pattern.as_ref())?)?;
514
515        // 避免阻塞woker thread
516        // tokio::task::spawn_blocking(move || {
517        let matched_keys = shared
518            .db()
519            .entries
520            .iter()
521            .filter_map(|entry| {
522                re.is_match(entry.key())
523                    .then(|| CheapResp3::new_blob_string(entry.key().clone()))
524            })
525            .collect::<Vec<CheapResp3>>();
526
527        // outbox
528        //     .send(Letter::Resp3(Resp3::new_array(matched_keys)))
529        //     .ok();
530        // });
531
532        Ok(Some(Resp3::new_array(matched_keys)))
533    }
534
535    fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
536        if args.len() != 1 {
537            return Err(RutinError::WrongArgNum);
538        }
539
540        let pattern = args.next().unwrap();
541        if ac.deny_reading_or_writing_key(&pattern, Self::CATS_FLAG) {
542            return Err(RutinError::NoPermission);
543        }
544
545        Ok(Keys { pattern })
546    }
547
548    fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
549    where
550        A: 'a,
551    {
552        // 由于无法确定匹配的键,所以无法确定哪些键会被读取
553        None::<std::option::Iter<'a, A>>
554    }
555}
556
557/// 移除 key 的过期时间,key 将持久保持。
558/// # Reply:
559///
560/// **Integer reply:** 0 if key does not exist or does not have an associated timeout.
561/// **Integer reply:** 1 if the timeout has been removed.
562#[derive(Debug)]
563pub struct Persist<A> {
564    pub key: A,
565}
566impl<A> CmdExecutor<A> for Persist<A>
567where
568    A: CmdArg,
569{
570    #[instrument(
571        level = "debug",
572        skip(handler),
573        ret(level = "debug"),
574        err(level = "debug")
575    )]
576    async fn execute(
577        self,
578        handler: &mut Handler<impl AsyncStream>,
579    ) -> RutinResult<Option<CheapResp3>> {
580        let mut obj = handler.shared.db().object_entry(self.key.as_ref()).await?;
581        let ex = obj.expire_mut().ok_or_else(|| RutinError::from(0))?;
582
583        if *ex == *NEVER_EXPIRE {
584            return Err(0.into());
585        }
586
587        *ex = *NEVER_EXPIRE;
588
589        Ok(Some(Resp3::new_integer(1)))
590    }
591
592    fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
593        if args.len() != 1 {
594            return Err(RutinError::WrongArgNum);
595        }
596
597        let key = args.next().unwrap();
598        if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
599            return Err(RutinError::NoPermission);
600        }
601
602        Ok(Persist { key })
603    }
604
605    fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
606    where
607        A: 'a,
608    {
609        Some(slice::from_ref(&self.key).iter())
610    }
611}
612
613/// 以毫秒为单位返回 key 的剩余的过期时间。
614/// # Reply:
615///
616/// **Integer reply:** TTL in milliseconds.
617/// **Integer reply:** -1 if the key exists but has no associated expiration.
618/// **Integer reply:** -2 if the key does not exist.
619#[derive(Debug)]
620pub struct Pttl<A> {
621    pub key: A,
622}
623
624impl<A> CmdExecutor<A> for Pttl<A>
625where
626    A: CmdArg,
627{
628    #[instrument(
629        level = "debug",
630        skip(handler),
631        ret(level = "debug"),
632        err(level = "debug")
633    )]
634    async fn execute(
635        self,
636        handler: &mut Handler<impl AsyncStream>,
637    ) -> RutinResult<Option<CheapResp3>> {
638        let ex = handler
639            .shared
640            .db()
641            .get_object_expired(self.key.as_ref())
642            .await
643            .map_err(|_| RutinError::from(-2))?;
644
645        if ex != *NEVER_EXPIRE {
646            let pttl = (ex - Instant::now()).as_millis();
647            Ok(Some(Resp3::new_integer(pttl as Int)))
648        } else {
649            Err((-1).into())
650        }
651    }
652
653    fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
654        if args.len() != 1 {
655            return Err(RutinError::WrongArgNum);
656        }
657
658        let key = args.next().unwrap();
659        if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
660            return Err(RutinError::NoPermission);
661        }
662
663        Ok(Pttl { key })
664    }
665
666    fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
667    where
668        A: 'a,
669    {
670        Some(slice::from_ref(&self.key).iter())
671    }
672}
673
674/// 以秒为单位,返回给定 key 的剩余生存时间(TTL, time to live)。
675/// # Reply:
676///
677/// **Integer reply:** TTL in seconds.
678/// **Integer reply:** -1 if the key exists but has no associated expiration.
679/// **Integer reply:** -2 if the key does not exist.
680#[derive(Debug)]
681pub struct Ttl<A> {
682    pub key: A,
683}
684
685impl<A> CmdExecutor<A> for Ttl<A>
686where
687    A: CmdArg,
688{
689    #[instrument(
690        level = "debug",
691        skip(handler),
692        ret(level = "debug"),
693        err(level = "debug")
694    )]
695    async fn execute(
696        self,
697        handler: &mut Handler<impl AsyncStream>,
698    ) -> RutinResult<Option<CheapResp3>> {
699        let ex = handler
700            .shared
701            .db()
702            .get_object_expired(self.key.as_ref())
703            .await
704            .map_err(|_| RutinError::from(-2))?;
705
706        if ex != *NEVER_EXPIRE {
707            let ttl = (ex - Instant::now()).as_secs();
708            Ok(Some(Resp3::new_integer(ttl as Int)))
709        } else {
710            Err((-1).into())
711        }
712    }
713
714    fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
715        if args.len() != 1 {
716            return Err(RutinError::WrongArgNum);
717        }
718
719        let key = args.next().unwrap();
720        if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
721            return Err(RutinError::NoPermission);
722        }
723
724        Ok(Ttl { key })
725    }
726
727    fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
728    where
729        A: 'a,
730    {
731        Some(slice::from_ref(&self.key).iter())
732    }
733}
734
735/// 返回 key 所储存的值的类型。
736/// # Reply:
737///
738/// **Simple string reply:** the type of key, or none when key doesn't exist.
739#[derive(Debug)]
740pub struct Type<A> {
741    pub key: A,
742}
743
744impl<A> CmdExecutor<A> for Type<A>
745where
746    A: CmdArg,
747{
748    #[instrument(
749        level = "debug",
750        skip(handler),
751        ret(level = "debug"),
752        err(level = "debug")
753    )]
754    async fn execute(
755        self,
756        handler: &mut Handler<impl AsyncStream>,
757    ) -> RutinResult<Option<CheapResp3>> {
758        let typ = handler
759            .shared
760            .db()
761            .get_object(self.key.as_ref())
762            .await?
763            .type_str();
764
765        Ok(Some(Resp3::new_simple_string(typ)))
766    }
767
768    fn parse(mut args: CmdUnparsed<A>, ac: &AccessControl) -> RutinResult<Self> {
769        if args.len() != 1 {
770            return Err(RutinError::WrongArgNum);
771        }
772
773        let key = args.next().unwrap();
774        if ac.deny_reading_or_writing_key(&key, Self::CATS_FLAG) {
775            return Err(RutinError::NoPermission);
776        }
777
778        Ok(Type { key })
779    }
780
781    fn keys<'a>(&'a self) -> Option<impl Iterator<Item = &'a A>>
782    where
783        A: 'a,
784    {
785        Some(slice::from_ref(&self.key).iter())
786    }
787}
788
789#[cfg(test)]
790mod cmd_key_tests {
791    use super::*;
792    use crate::{
793        server::{NEVER_EXPIRE, UNIX_EPOCH},
794        shared::db::{Hash, List, Object, Set, Str, ZSet},
795        util::gen_test_handler,
796    };
797
798    // 允许的时间误差
799    const ALLOWED_DELTA: u64 = 3;
800
801    #[tokio::test]
802    async fn del_test() {
803        let mut handler = gen_test_handler();
804        let db = handler.shared.db();
805
806        db.insert_object(
807            "key1".as_bytes(),
808            Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
809        )
810        .await
811        .unwrap();
812        assert!(db.contains_object("key1".as_bytes()).await);
813
814        // case: 键存在
815        let del_res = Del::test(&["DEL", "key1"], &mut handler)
816            .await
817            .unwrap()
818            .unwrap();
819        assert_eq!(del_res.into_integer_unchecked(), 1);
820        assert!(!handler.shared.db().contains_object("key1".as_bytes()).await);
821
822        // case: 键不存在
823        let del_res = Del::test(&["DEL", "key_nil"], &mut handler)
824            .await
825            .unwrap()
826            .unwrap();
827        assert_eq!(del_res.into_integer_unchecked(), 0);
828    }
829
830    #[tokio::test]
831    async fn exists_test() {
832        let mut handler = gen_test_handler();
833        let db = handler.shared.db();
834
835        db.insert_object(
836            "key1".as_bytes(),
837            Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
838        )
839        .await
840        .unwrap();
841        assert!(db.contains_object("key1".as_bytes()).await);
842
843        // case: 键存在
844        let exists_res = Exists::test(&["key1"], &mut handler)
845            .await
846            .unwrap()
847            .unwrap();
848        assert_eq!(exists_res.into_integer_unchecked(), 1);
849
850        // case: 键不存在
851        let _exists_res = Exists::test(&["key_nil"], &mut handler).await.unwrap_err();
852    }
853
854    #[tokio::test]
855    async fn expire_test() {
856        let mut handler = gen_test_handler();
857        let db = handler.shared.db();
858
859        db.insert_object(
860            "key1".as_bytes(),
861            Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
862        )
863        .await
864        .unwrap();
865        assert_eq!(
866            db.get_object("key1".as_bytes()).await.unwrap().expire,
867            *NEVER_EXPIRE
868        );
869
870        // case: 键存在,设置过期时间
871        let expire_res = Expire::test(&["key1", "10"], &mut handler)
872            .await
873            .unwrap()
874            .unwrap();
875        assert_eq!(expire_res.into_integer_unchecked(), 1);
876        assert!(
877            !handler
878                .shared
879                .db()
880                .get_object("key1".as_bytes())
881                .await
882                .unwrap()
883                .is_never_expired()
884        );
885
886        // case: 键不存在
887        let _expire_res = Expire::test(&["key_nil", "10"], &mut handler)
888            .await
889            .unwrap_err();
890
891        let db = handler.shared.db();
892
893        db.insert_object(
894            "key_with_ex".as_bytes(),
895            Object::with_expire(
896                Str::from("value_with_ex"),
897                Instant::now() + Duration::from_secs(10),
898            ),
899        )
900        .await
901        .unwrap();
902        db.insert_object(
903            "key_without_ex".as_bytes(),
904            Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
905        )
906        .await
907        .unwrap();
908
909        // case: with EX option
910        let _expire_res = Expire::test(&["key_with_ex", "10", "NX"], &mut handler)
911            .await
912            .unwrap_err();
913
914        let expire_res = Expire::test(&["key_without_ex", "10", "NX"], &mut handler)
915            .await
916            .unwrap()
917            .unwrap();
918        assert_eq!(expire_res.into_integer_unchecked(), 1);
919
920        let db = handler.shared.db();
921
922        db.insert_object(
923            "key_with_ex".as_bytes(),
924            Object::with_expire(
925                Str::from("value_with_ex"),
926                Instant::now() + Duration::from_secs(10),
927            ),
928        )
929        .await
930        .unwrap();
931        db.insert_object(
932            "key_without_ex".as_bytes(),
933            Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
934        )
935        .await
936        .unwrap();
937
938        // case: with NX option
939        let _expire_res = Expire::test(&["key_with_ex", "10", "NX"], &mut handler)
940            .await
941            .unwrap_err();
942
943        let expire_res = Expire::test(&["key_without_ex", "10", "NX"], &mut handler)
944            .await
945            .unwrap()
946            .unwrap();
947        assert_eq!(expire_res.into_integer_unchecked(), 1);
948
949        let db = handler.shared.db();
950
951        db.insert_object(
952            "key_with_ex".as_bytes(),
953            Object::with_expire(
954                Str::from("value_with_ex"),
955                Instant::now() + Duration::from_secs(10),
956            ),
957        )
958        .await
959        .unwrap();
960        db.insert_object(
961            "key_without_ex".as_bytes(),
962            Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
963        )
964        .await
965        .unwrap();
966
967        // case: with GT option
968        let _expire_res = Expire::test(&["key_with_ex", "5", "GT"], &mut handler)
969            .await
970            .unwrap_err();
971
972        let expire_res = Expire::test(&["key_with_ex", "20", "GT"], &mut handler)
973            .await
974            .unwrap()
975            .unwrap();
976        assert_eq!(expire_res.into_integer_unchecked(), 1);
977
978        let db = handler.shared.db();
979
980        db.insert_object(
981            "key_with_ex".as_bytes(),
982            Object::with_expire(
983                Str::from("value_with_ex"),
984                Instant::now() + Duration::from_secs(10),
985            ),
986        )
987        .await
988        .unwrap();
989        db.insert_object(
990            "key_without_ex".as_bytes(),
991            Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
992        )
993        .await
994        .unwrap();
995
996        // case: with LT option
997        let _expire_res = Expire::test(&["key_with_ex", "20", "LT"], &mut handler)
998            .await
999            .unwrap_err();
1000
1001        let expire_res = Expire::test(&["key_with_ex", "5", "LT"], &mut handler)
1002            .await
1003            .unwrap()
1004            .unwrap();
1005        assert_eq!(expire_res.into_integer_unchecked(), 1);
1006    }
1007
1008    #[tokio::test]
1009    async fn expire_at_test() {
1010        let mut handler = gen_test_handler();
1011        let db = handler.shared.db();
1012
1013        db.insert_object(
1014            "key1".as_bytes(),
1015            Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
1016        )
1017        .await
1018        .unwrap();
1019        assert!(
1020            db.get_object("key1".as_bytes())
1021                .await
1022                .unwrap()
1023                .is_never_expired()
1024        );
1025
1026        // case: 键存在,设置过期时间
1027        let expire_at_res = ExpireAt::test(&["key1", "1893427200"], &mut handler)
1028            .await
1029            .unwrap()
1030            .unwrap();
1031        assert_eq!(expire_at_res.into_integer_unchecked(), 1);
1032        assert!(
1033            !handler
1034                .shared
1035                .db()
1036                .get_object("key1".as_bytes())
1037                .await
1038                .unwrap()
1039                .is_never_expired()
1040        );
1041
1042        // case: 键不存在
1043        let _expire_at_res = ExpireAt::test(&["key_nil", "1893427200"], &mut handler)
1044            .await
1045            .unwrap_err();
1046
1047        let db = handler.shared.db();
1048
1049        db.insert_object(
1050            "key_with_ex".as_bytes(),
1051            Object::with_expire(
1052                Str::from("value_with_ex"),
1053                *UNIX_EPOCH + Duration::from_secs(1893427200),
1054            ),
1055        )
1056        .await
1057        .unwrap();
1058        db.insert_object(
1059            "key_without_ex".as_bytes(),
1060            Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
1061        )
1062        .await
1063        .unwrap();
1064
1065        // case: with NX option
1066        let _expire_at_res = ExpireAt::test(&["key_with_ex", "1893427200", "NX"], &mut handler)
1067            .await
1068            .unwrap_err();
1069
1070        let expire_at_res = ExpireAt::test(&["key_without_ex", "1893427200", "NX"], &mut handler)
1071            .await
1072            .unwrap()
1073            .unwrap();
1074        assert_eq!(expire_at_res.into_integer_unchecked(), 1);
1075
1076        let db = handler.shared.db();
1077
1078        db.insert_object(
1079            "key_with_ex".as_bytes(),
1080            Object::with_expire(
1081                Str::from("value_with_ex"),
1082                *UNIX_EPOCH + Duration::from_secs(1893427200),
1083            ),
1084        )
1085        .await
1086        .unwrap();
1087        db.insert_object(
1088            "key_without_ex".as_bytes(),
1089            Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
1090        )
1091        .await
1092        .unwrap();
1093
1094        // case: with GT option
1095        let _expire_at_res = ExpireAt::test(&["key_with_ex", "1893427000", "GT"], &mut handler)
1096            .await
1097            .unwrap_err();
1098
1099        let expire_at_res = ExpireAt::test(&["key_with_ex", "1893427201", "GT"], &mut handler)
1100            .await
1101            .unwrap()
1102            .unwrap();
1103        assert_eq!(expire_at_res.into_integer_unchecked(), 1);
1104
1105        let db = handler.shared.db();
1106
1107        db.insert_object(
1108            "key_with_ex".as_bytes(),
1109            Object::with_expire(
1110                Str::from("value_with_ex"),
1111                *UNIX_EPOCH + Duration::from_secs(1893427200),
1112            ),
1113        )
1114        .await
1115        .unwrap();
1116        db.insert_object(
1117            "key_without_ex".as_bytes(),
1118            Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
1119        )
1120        .await
1121        .unwrap();
1122
1123        // case: with LT option
1124        let _expire_at_res = ExpireAt::test(&["key_with_ex", "1893427201", "LT"], &mut handler)
1125            .await
1126            .unwrap_err();
1127
1128        let expire_at_res = ExpireAt::test(&["key_with_ex", "1893427000", "LT"], &mut handler)
1129            .await
1130            .unwrap()
1131            .unwrap();
1132        assert_eq!(expire_at_res.into_integer_unchecked(), 1);
1133    }
1134
1135    #[tokio::test]
1136    async fn expire_time_test() {
1137        let mut handler = gen_test_handler();
1138        let db = handler.shared.db();
1139
1140        db.insert_object(
1141            "key1".as_bytes(),
1142            Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
1143        )
1144        .await
1145        .unwrap();
1146        assert!(
1147            db.get_object("key1".as_bytes())
1148                .await
1149                .unwrap()
1150                .is_never_expired()
1151        );
1152        let expire = Instant::now() + Duration::from_secs(10);
1153        db.insert_object(
1154            "key_with_ex".as_bytes(),
1155            Object::with_expire(Str::from("value_with_ex"), expire),
1156        )
1157        .await
1158        .unwrap();
1159
1160        // case: 键存在,但没有过期时间
1161        let _expire_time_res = ExpireTime::test(&["key1"], &mut handler).await.unwrap_err();
1162
1163        // case: 键不存在
1164        let _expire_time_res = ExpireTime::test(&["key_nil"], &mut handler)
1165            .await
1166            .unwrap_err();
1167
1168        // case: 键存在且有过期时间
1169        let expire_time_res = ExpireTime::test(&["key_with_ex"], &mut handler)
1170            .await
1171            .unwrap()
1172            .unwrap();
1173        assert_eq!(
1174            expire_time_res.into_integer_unchecked(),
1175            expire.duration_since(*UNIX_EPOCH).as_secs() as i128
1176        );
1177    }
1178
1179    #[tokio::test(flavor = "multi_thread")]
1180    async fn keys_test() {
1181        let mut handler = gen_test_handler();
1182        let db = handler.shared.db();
1183
1184        db.insert_object(
1185            "key1".as_bytes(),
1186            Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
1187        )
1188        .await
1189        .unwrap();
1190        db.insert_object(
1191            "key2".as_bytes(),
1192            Object::with_expire(Str::from("value2"), *NEVER_EXPIRE),
1193        )
1194        .await
1195        .unwrap();
1196        db.insert_object(
1197            "key3".as_bytes(),
1198            Object::with_expire(Str::from("value3"), *NEVER_EXPIRE),
1199        )
1200        .await
1201        .unwrap();
1202        db.insert_object(
1203            "key4".as_bytes(),
1204            Object::with_expire(Str::from("value4"), *NEVER_EXPIRE),
1205        )
1206        .await
1207        .unwrap();
1208
1209        let keys_res = Keys::test(&[".*"], &mut handler).await.unwrap().unwrap();
1210        let result = keys_res.into_array_unchecked();
1211        assert!(
1212            result.contains(&Resp3::new_blob_string("key1"))
1213                && result.contains(&Resp3::new_blob_string("key2"))
1214                && result.contains(&Resp3::new_blob_string("key3"))
1215                && result.contains(&Resp3::new_blob_string("key4"))
1216        );
1217
1218        let keys_res = Keys::test(&["key*"], &mut handler).await.unwrap().unwrap();
1219        let result = keys_res.into_array_unchecked();
1220        assert!(
1221            result.contains(&Resp3::new_blob_string("key1"))
1222                && result.contains(&Resp3::new_blob_string("key2"))
1223                && result.contains(&Resp3::new_blob_string("key3"))
1224                && result.contains(&Resp3::new_blob_string("key4"))
1225        );
1226
1227        let keys_res = Keys::test(&["key1"], &mut handler).await.unwrap().unwrap();
1228        let result = keys_res.into_array_unchecked();
1229        assert!(result.contains(&Resp3::new_blob_string("key1")));
1230    }
1231
1232    #[tokio::test]
1233    async fn persist_test() {
1234        let mut handler = gen_test_handler();
1235        let db = handler.shared.db();
1236
1237        db.insert_object(
1238            "key_with_ex".as_bytes(),
1239            Object::with_expire(
1240                Str::from("value_with_ex"),
1241                Instant::now() + Duration::from_secs(10),
1242            ),
1243        )
1244        .await
1245        .unwrap();
1246        db.insert_object(
1247            "key_without_ex".as_bytes(),
1248            Object::with_expire(Str::from("value_without_ex"), *NEVER_EXPIRE),
1249        )
1250        .await
1251        .unwrap();
1252
1253        // case: 键存在,有过期时间
1254        let persist_res = Persist::test(&["key_with_ex"], &mut handler)
1255            .await
1256            .unwrap()
1257            .unwrap();
1258        assert_eq!(persist_res.into_integer_unchecked(), 1);
1259        assert!(
1260            handler
1261                .shared
1262                .db()
1263                .get_object("key_with_ex".as_bytes())
1264                .await
1265                .unwrap()
1266                .is_never_expired()
1267        );
1268
1269        // case: 键存在,没有过期时间
1270        let _persist_res = Persist::test(&["key_without_ex"], &mut handler)
1271            .await
1272            .unwrap_err();
1273
1274        // case: 键不存在
1275        let _persist_res = Persist::test(&["key_nil"], &mut handler).await.unwrap_err();
1276    }
1277
1278    #[tokio::test]
1279    async fn pttl_test() {
1280        let mut handler = gen_test_handler();
1281        let db = handler.shared.db();
1282
1283        db.insert_object(
1284            "key1".as_bytes(),
1285            Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
1286        )
1287        .await
1288        .unwrap();
1289        assert_eq!(
1290            db.get_object("key1".as_bytes()).await.unwrap().expire,
1291            *NEVER_EXPIRE
1292        );
1293        let dur = Duration::from_secs(10);
1294        let expire = Instant::now() + dur;
1295        db.insert_object(
1296            "key_with_ex".as_bytes(),
1297            Object::with_expire(Str::from("value_with_ex"), expire),
1298        )
1299        .await
1300        .unwrap();
1301
1302        // case: 键存在,但没有过期时间
1303        let _pttl_res = Pttl::test(&["key1"], &mut handler).await.unwrap_err();
1304
1305        // case: 键不存在
1306        let _pttl_res = Pttl::test(&["key_nil"], &mut handler).await.unwrap_err();
1307
1308        // case: 键存在且有过期时间
1309        let pttl_res = Pttl::test(&["key_with_ex"], &mut handler)
1310            .await
1311            .unwrap()
1312            .unwrap();
1313        let result = pttl_res.into_integer_unchecked() as u64;
1314        assert!(dur.as_millis() as u64 - result < ALLOWED_DELTA);
1315    }
1316
1317    #[tokio::test]
1318    async fn ttl_test() {
1319        let mut handler = gen_test_handler();
1320        let db = handler.shared.db();
1321
1322        db.insert_object(
1323            "key1".as_bytes(),
1324            Object::with_expire(Str::from("value1"), *NEVER_EXPIRE),
1325        )
1326        .await
1327        .unwrap();
1328        assert_eq!(
1329            db.entries.get("key1".as_bytes()).unwrap().expire,
1330            *NEVER_EXPIRE
1331        );
1332        let dur = Duration::from_secs(10);
1333        let expire = Instant::now() + dur;
1334        db.insert_object(
1335            "key_with_ex".as_bytes(),
1336            Object::with_expire(Str::from("value_with_ex"), expire),
1337        )
1338        .await
1339        .unwrap();
1340
1341        // case: 键存在,但没有过期时间
1342        let _ttl_res = Ttl::test(&["key1"], &mut handler).await.unwrap_err();
1343
1344        // case: 键不存在
1345        let _ttl_res = Ttl::test(&["key_nil"], &mut handler).await.unwrap_err();
1346
1347        // case: 键存在且有过期时间
1348        let ttl_res = Ttl::test(&["key_with_ex"], &mut handler)
1349            .await
1350            .unwrap()
1351            .unwrap();
1352        let result = ttl_res.into_integer_unchecked() as u64;
1353        assert!(dur.as_secs() - result < ALLOWED_DELTA);
1354    }
1355
1356    #[tokio::test]
1357    async fn type_test() {
1358        let mut handler = gen_test_handler();
1359        let db = handler.shared.db();
1360
1361        db.insert_object(
1362            "key1".as_bytes(),
1363            Object::with_expire(Str::default(), *NEVER_EXPIRE),
1364        )
1365        .await
1366        .unwrap();
1367        db.insert_object(
1368            "key2".as_bytes(),
1369            Object::with_expire(List::default(), *NEVER_EXPIRE),
1370        )
1371        .await
1372        .unwrap();
1373        db.insert_object(
1374            "key3".as_bytes(),
1375            Object::with_expire(Set::default(), *NEVER_EXPIRE),
1376        )
1377        .await
1378        .unwrap();
1379        db.insert_object(
1380            "key4".as_bytes(),
1381            Object::with_expire(Hash::default(), *NEVER_EXPIRE),
1382        )
1383        .await
1384        .unwrap();
1385        db.insert_object(
1386            "key5".as_bytes(),
1387            Object::with_expire(ZSet::default(), *NEVER_EXPIRE),
1388        )
1389        .await
1390        .unwrap();
1391
1392        // case: 键存在
1393        let typ_res = Type::test(&["key1"], &mut handler).await.unwrap().unwrap();
1394        assert_eq!(typ_res.into_simple_string_unchecked(), "string");
1395
1396        let typ_res = Type::test(&["key2"], &mut handler).await.unwrap().unwrap();
1397        assert_eq!(typ_res.into_simple_string_unchecked(), "list");
1398
1399        let typ_res = Type::test(&["key3"], &mut handler).await.unwrap().unwrap();
1400        assert_eq!(typ_res.into_simple_string_unchecked(), "set");
1401
1402        let typ_res = Type::test(&["key4"], &mut handler).await.unwrap().unwrap();
1403        assert_eq!(typ_res.into_simple_string_unchecked(), "hash");
1404
1405        let typ_res = Type::test(&["key5"], &mut handler).await.unwrap().unwrap();
1406        assert_eq!(typ_res.into_simple_string_unchecked(), "zset");
1407    }
1408}