1#![deny(unsafe_code)]
8
9#[allow(unused_imports)]
10use serde::{de, Deserialize, Serialize};
11
12#[cfg(any(feature = "redis", feature = "redis-cluster", feature = "sled"))]
14mod storage;
15#[cfg(feature = "redis")]
16mod storage_redis;
17#[cfg(feature = "redis-cluster")]
18mod storage_redis_cluster;
19#[cfg(feature = "sled")]
20mod storage_sled;
21
22#[cfg(any(feature = "redis", feature = "redis-cluster", feature = "sled"))]
24pub use storage::{DefaultStorageDB, List, Map, StorageDB, StorageList, StorageMap};
25#[cfg(feature = "redis")]
26pub use storage_redis::{RedisConfig, RedisStorageDB};
27#[cfg(feature = "redis-cluster")]
28pub use storage_redis_cluster::{
29    RedisConfig as RedisClusterConfig, RedisStorageDB as RedisClusterStorageDB,
30};
31#[cfg(feature = "sled")]
32pub use storage_sled::{SledConfig, SledStorageDB};
33
34pub type Result<T> = anyhow::Result<T>;
36
37#[cfg(any(feature = "redis", feature = "redis-cluster", feature = "sled"))]
45pub async fn init_db(cfg: &Config) -> Result<DefaultStorageDB> {
46    match cfg.typ {
47        #[cfg(feature = "sled")]
48        StorageType::Sled => {
49            let db = SledStorageDB::new(cfg.sled.clone()).await?;
50            Ok(DefaultStorageDB::Sled(db))
51        }
52        #[cfg(feature = "redis")]
53        StorageType::Redis => {
54            let db = RedisStorageDB::new(cfg.redis.clone()).await?;
55            Ok(DefaultStorageDB::Redis(db))
56        }
57        #[cfg(feature = "redis-cluster")]
58        StorageType::RedisCluster => {
59            let db = RedisClusterStorageDB::new(cfg.redis_cluster.clone()).await?;
60            Ok(DefaultStorageDB::RedisCluster(db))
61        }
62    }
63}
64
65#[derive(Debug, Clone, Deserialize, Serialize)]
70#[cfg(any(feature = "redis", feature = "redis-cluster", feature = "sled"))]
71pub struct Config {
72    #[serde(alias = "type")]
75    pub typ: StorageType,
76
77    #[serde(default)]
79    #[cfg(feature = "sled")]
80    pub sled: SledConfig,
81
82    #[serde(default)]
84    #[cfg(feature = "redis")]
85    pub redis: RedisConfig,
86
87    #[serde(default, rename = "redis-cluster")]
89    #[cfg(feature = "redis-cluster")]
90    pub redis_cluster: RedisClusterConfig,
91}
92
93#[derive(Debug, Clone, Serialize)]
97#[cfg(any(feature = "redis", feature = "redis-cluster", feature = "sled"))]
98pub enum StorageType {
99    #[cfg(feature = "sled")]
101    Sled,
102    #[cfg(feature = "redis")]
104    Redis,
105    #[cfg(feature = "redis-cluster")]
107    RedisCluster,
108}
109
110#[cfg(any(feature = "redis", feature = "redis-cluster", feature = "sled"))]
112impl<'de> de::Deserialize<'de> for StorageType {
113    #[inline]
114    fn deserialize<D>(deserializer: D) -> core::result::Result<Self, D::Error>
115    where
116        D: de::Deserializer<'de>,
117    {
118        let t = match (String::deserialize(deserializer)?)
119            .to_ascii_lowercase()
120            .as_str()
121        {
122            #[cfg(feature = "sled")]
123            "sled" => StorageType::Sled,
124            #[cfg(feature = "redis")]
125            "redis" => StorageType::Redis,
126            #[cfg(feature = "redis-cluster")]
127            "redis-cluster" => StorageType::RedisCluster,
128            _ => {
129                return Err(de::Error::custom(
130                    "invalid storage type, expected one of: 'sled', 'redis', 'redis-cluster'",
131                ))
132            }
133        };
134        Ok(t)
135    }
136}
137
138#[allow(dead_code)]
140pub(crate) type TimestampMillis = i64;
141
142#[allow(dead_code)]
146#[inline]
147pub(crate) fn timestamp_millis() -> TimestampMillis {
148    use std::time::{SystemTime, UNIX_EPOCH};
149    SystemTime::now()
150        .duration_since(UNIX_EPOCH)
151        .map(|dur| dur.as_millis() as TimestampMillis)
152        .unwrap_or_else(|_| {
153            let now = chrono::Local::now();
154            now.timestamp_millis() as TimestampMillis
155        })
156}
157
158#[cfg(test)]
159#[cfg(any(feature = "redis", feature = "redis-cluster", feature = "sled"))]
160mod tests {
161    use super::storage::*;
162    use super::*;
163    use std::borrow::Cow;
164    use std::time::Duration;
165    use tokio::time::sleep;
166
167    fn get_cfg(name: &str) -> Config {
168        let cfg = Config {
169            typ: {
170                cfg_if::cfg_if! {
171                    if #[cfg(feature = "sled")] {
172                        StorageType::Sled
173                    } else if #[cfg(feature = "redis-cluster")] {
174                        StorageType::RedisCluster
175                    } else if #[cfg(feature = "redis")] {
176                        StorageType::Redis
177                    } else {
178                        compile_error!("No storage backend feature enabled!");
179                    }
180                }
181            },
182            #[cfg(feature = "sled")]
183            sled: SledConfig {
184                path: format!("./.catch/{}", name),
185                cleanup_f: |_db| {},
186                ..Default::default()
187            },
188            #[cfg(feature = "redis")]
189            redis: RedisConfig {
190                url: "redis://127.0.0.1:6379/".into(),
191                prefix: name.to_owned(),
192            },
193            #[cfg(feature = "redis-cluster")]
194            redis_cluster: RedisClusterConfig {
195                urls: [
196                    "redis://127.0.0.1:6380/".into(),
197                    "redis://127.0.0.1:6381/".into(),
198                    "redis://127.0.0.1:6382/".into(),
199                ]
200                .into(),
201                prefix: name.to_owned(),
202            },
203        };
204        cfg
205    }
206
207    #[tokio::main]
208    #[test]
209    #[cfg(feature = "ttl")]
210    async fn test_sled_cleanup() {
211        use super::{SledStorageDB, StorageDB};
212        let cfg = Config {
213            typ: StorageType::Sled,
214            sled: SledConfig {
215                path: format!("./.catch/{}", "sled_cleanup"),
216                cache_capacity: convert::Bytesize::from(1024 * 1024 * 1024 * 3),
217                cleanup_f: move |_db| {
218                    #[cfg(feature = "ttl")]
219                    {
220                        let db = _db.clone();
221                        tokio::spawn(async move {
222                            let limit = 1000;
224                            for i in 0..10 {
225                                println!("{} a start cleanups ...", i,);
226                                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
228                                let mut total_cleanups = 0;
229                                let now = std::time::Instant::now();
230                                println!("{} b start cleanups ...", i,);
231                                loop {
232                                    println!("{} c start cleanups ...", i,);
233                                    let db1 = db.clone();
234                                    let count =
235                                        tokio::task::spawn_blocking(move || db1.cleanup(limit))
236                                            .await
237                                            .unwrap();
238                                    total_cleanups += count;
240                                    println!(
241                                        "{} def_cleanup: {}, total cleanups: {}, cost time: {:?}",
242                                        i,
243                                        count,
244                                        total_cleanups,
245                                        now.elapsed()
246                                    );
247
248                                    if count < limit {
249                                        break;
250                                    }
251                                }
252                                println!(
253                                    "{} total cleanups: {}, cost time: {:?}",
254                                    i,
255                                    total_cleanups,
256                                    now.elapsed()
257                                );
258                            }
259                            println!("&&&&&&&&&& test_sled_cleanup cleanup end. &&&&&&&&&&&");
260                        });
261                    }
262                },
263                ..Default::default()
264            },
265            redis: RedisConfig {
266                url: "redis://127.0.0.1:6379/".into(),
267                prefix: "sled_cleanup".to_owned(),
268            },
269            redis_cluster: RedisClusterConfig {
270                urls: [
271                    "redis://127.0.0.1:6380/".into(),
272                    "redis://127.0.0.1:6381/".into(),
273                    "redis://127.0.0.1:6382/".into(),
274                ]
275                .into(),
276                prefix: "sled_cleanup".to_owned(),
277            },
278        };
279
280        let db = SledStorageDB::new(cfg.sled.clone()).await.unwrap();
281        let max = 3000;
282
283        for i in 0..max {
284            let map = db.map(format!("map_{}", i), None).await.unwrap();
285            map.insert("k_1", &1).await.unwrap();
286            map.insert("k_2", &2).await.unwrap();
287            map.expire(100).await.unwrap();
288        }
289
290        for i in 0..max {
291            let list = db.list(format!("list_{}", i), None).await.unwrap();
292            list.push(&1).await.unwrap();
293            list.push(&2).await.unwrap();
294            list.expire(100).await.unwrap();
295        }
296
297        tokio::time::sleep(Duration::from_millis(120)).await;
298
299        println!(
300            "$$$ db_size: {:?}",
301            db.db_size().await,
302            );
305
306        tokio::time::sleep(Duration::from_secs(3)).await;
307        println!(
308            "$$$ db_size: {:?}",
309            db.db_size().await,
310            );
313    }
314
315    #[tokio::main]
316    #[test]
317    async fn test_stress() {
318        let cfg = get_cfg("stress");
319        let db = init_db(&cfg).await.unwrap();
320        let now = std::time::Instant::now();
321        for i in 0..10_000usize {
322            db.insert(i.to_be_bytes(), &i).await.unwrap();
323        }
324        let k_9999_val = db.get::<_, usize>(9999usize.to_be_bytes()).await.unwrap();
325        println!(
326            "test_stress 9999: {:?}, cost time: {:?}",
327            k_9999_val,
328            now.elapsed()
329        );
330        assert_eq!(k_9999_val, Some(9999));
331
332        let s_m_1 = db.map("s_m_1", None).await.unwrap();
333        s_m_1.clear().await.unwrap();
334        let now = std::time::Instant::now();
335        for i in 0..10_000usize {
336            s_m_1.insert(i.to_be_bytes(), &i).await.unwrap();
337        }
338        #[cfg(feature = "map_len")]
339        assert_eq!(s_m_1.len().await.unwrap(), 10_000);
340        let k_9999_val = s_m_1
341            .get::<_, usize>(9999usize.to_be_bytes())
342            .await
343            .unwrap();
344        println!(
345            "test_stress s_m_1 9999: {:?}, cost time: {:?}",
346            k_9999_val,
347            now.elapsed()
348        );
349        assert_eq!(k_9999_val, Some(9999));
350
351        let s_l_1 = db.list("s_l_1", None).await.unwrap();
352        s_l_1.clear().await.unwrap();
353        let now = std::time::Instant::now();
354        for i in 0..10_000usize {
355            s_l_1.push(&i).await.unwrap();
356        }
357        println!("test_stress s_l_1: {:?}", s_l_1.len().await.unwrap());
358        assert_eq!(s_l_1.len().await.unwrap(), 10_000);
359        let l_9999_val = s_l_1.get_index::<usize>(9999).await.unwrap();
360        println!(
361            "test_stress s_l_1 9999: {:?}, cost time: {:?}",
362            l_9999_val,
363            now.elapsed()
364        );
365        assert_eq!(l_9999_val, Some(9999));
366
367        tokio::time::sleep(Duration::from_secs(1)).await;
368
369        let now = std::time::Instant::now();
370        for i in 0..10_000usize {
371            let s_m = db.map(format!("s_m_{}", i), None).await.unwrap();
372            s_m.insert(i.to_be_bytes(), &i).await.unwrap();
373        }
374        println!("test_stress s_m, cost time: {:?}", now.elapsed());
375
376        let now = std::time::Instant::now();
377        for i in 0..10_000usize {
378            let s_l = db.list(format!("s_l_{}", i), None).await.unwrap();
379            s_l.push(&i).await.unwrap();
380        }
381        println!("test_stress s_l, cost time: {:?}", now.elapsed());
382
383        tokio::time::sleep(Duration::from_secs(3)).await;
384        println!("$$$ test_stress db_size: {:?}", db.db_size().await,);
385    }
386
387    #[cfg(feature = "ttl")]
388    #[tokio::main]
389    #[test]
390    async fn test_expiration_cleaning() {
391        let cfg = get_cfg("expiration_cleaning");
393        let db = init_db(&cfg).await.unwrap();
394        for i in 0..3usize {
395            let key = format!("k_{}", i);
396            db.insert(key.as_bytes(), &format!("v_{}", (i * 10)))
397                .await
398                .unwrap();
399            let res = db.expire(key, 1500).await.unwrap();
400            println!("expire res: {:?}", res);
401        }
402
403        let m_1 = db.map("m_1", None).await.unwrap();
404        m_1.insert("m_k_1", &1).await.unwrap();
405        m_1.insert("m_k_2", &2).await.unwrap();
406        let res = m_1.expire(1500).await.unwrap();
407        println!("m_1 expire res: {:?}", res);
408
409        let l_1 = db.list("l_1", None).await.unwrap();
410        l_1.clear().await.unwrap();
411        l_1.push(&11).await.unwrap();
412        l_1.push(&22).await.unwrap();
413
414        let res = l_1.expire(1500).await.unwrap();
415        println!("l_1 expire res: {:?}", res);
416
417        tokio::time::sleep(Duration::from_millis(1700)).await;
418        let k_0_val = db.get::<_, String>("k_0").await.unwrap();
419        println!("k_0_val: {:?}", k_0_val);
420        assert_eq!(k_0_val, None);
421
422        let m_k_2 = m_1.get::<_, i32>("m_k_2").await.unwrap();
423        println!("m_k_2: {:?}", m_k_2);
424        assert_eq!(m_k_2, None);
425
426        let l_all = l_1.all::<i32>().await.unwrap();
427        println!("l_all: {:?}", l_all);
428        assert_eq!(l_all, Vec::<i32>::new());
429
430        tokio::time::sleep(Duration::from_secs(1)).await;
431    }
432
433    #[tokio::main]
434    #[test]
435    async fn test_db_insert() {
436        let cfg = get_cfg("db_insert");
437
438        let db = init_db(&cfg).await.unwrap();
439        let db_key_1 = b"key_1";
440        let db_key_2 = b"key_2";
441        let db_val_1 = String::from("val_001");
442        let db_val_2 = String::from("val_002");
443        db.insert::<_, String>(db_key_1, &db_val_1).await.unwrap();
444        assert_eq!(
445            db.get::<_, String>(db_key_1).await.unwrap(),
446            Some(db_val_1.clone())
447        );
448        assert_eq!(db.get::<_, String>(db_key_2).await.unwrap(), None);
449
450        db.remove(db_key_1).await.unwrap();
451        assert_eq!(db.get::<_, String>(db_key_1).await.unwrap(), None);
452
453        db.insert::<_, String>(db_key_1, &db_val_1).await.unwrap();
454        db.insert::<_, String>(db_key_2, &db_val_2).await.unwrap();
455
456        assert_eq!(
457            db.get::<_, String>(db_key_1).await.unwrap(),
458            Some(db_val_1.clone())
459        );
460        assert_eq!(
461            db.get::<_, String>(db_key_2).await.unwrap(),
462            Some(db_val_2.clone())
463        );
464        db.remove(db_key_2).await.unwrap();
465        assert_eq!(db.get::<_, String>(db_key_2).await.unwrap(), None);
466
467        assert!(db.contains_key(db_key_1).await.unwrap());
468
469        let map_1 = db.map("map_1", None).await.unwrap();
470        map_1.insert("m_k_1", &100).await.unwrap();
471        assert!(db.map_contains_key("map_1").await.unwrap());
472
473        let map_2 = db.map("map_2", None).await.unwrap();
474        map_2.clear().await.unwrap();
475        println!(
476            "test_db_insert contains_key(map_2) {:?}",
477            db.map_contains_key("map_2").await
478        );
479        assert!(!db.map_contains_key("map_2").await.unwrap());
480
481        let list_1 = db.list("list_1", None).await.unwrap();
482        list_1.clear().await.unwrap();
483        println!(
484            "test_db_insert contains_key(list_1) {:?}",
485            db.list_contains_key("list_1").await
486        );
487        assert!(!db.list_contains_key("list_1").await.unwrap());
488        list_1.push(&20).await.unwrap();
489        assert!(db.list_contains_key("list_1").await.unwrap());
490    }
491
492    #[tokio::main]
493    #[test]
494    async fn test_db_remove() {
495        let cfg = get_cfg("db_remove");
496
497        let db = init_db(&cfg).await.unwrap();
498        let db_key_1 = b"key_11";
499        let db_key_2 = b"key_22";
500        let db_val_1 = String::from("val_001");
501        db.insert::<_, String>(db_key_1, &db_val_1).await.unwrap();
502        assert_eq!(
503            db.get::<_, String>(db_key_1).await.unwrap(),
504            Some(db_val_1.clone())
505        );
506        assert_eq!(db.contains_key(db_key_1).await.unwrap(), true);
507
508        db.remove(db_key_1).await.unwrap();
509        assert_eq!(db.get::<_, String>(db_key_1).await.unwrap(), None);
510        assert_eq!(db.contains_key(db_key_1).await.unwrap(), false);
511
512        let m2 = db.map(db_key_2, None).await.unwrap();
513        m2.clear().await.unwrap();
514        assert_eq!(db.contains_key(db_key_2).await.unwrap(), false);
515        m2.insert("m_k_1", &100).await.unwrap();
516        assert_eq!(db.map_contains_key(db_key_2).await.unwrap(), true);
517        m2.clear().await.unwrap();
518        assert_eq!(db.map_contains_key(db_key_2).await.unwrap(), false);
519        m2.insert("m_k_1", &100).await.unwrap();
520        assert_eq!(db.map_contains_key(db_key_2).await.unwrap(), true);
521        m2.remove("m_k_1").await.unwrap();
522        }
524
525    #[tokio::main]
526    #[test]
527    async fn test_db_contains_key() {
528        let cfg = get_cfg("db_contains_key");
529        let db = init_db(&cfg).await.unwrap();
530        db.remove("test_c_001").await.unwrap();
531        let c_res = db.contains_key("test_c_001").await.unwrap();
532        assert!(!c_res);
533
534        db.insert("test_c_001", &"val_001").await.unwrap();
535        let c_res = db.contains_key("test_c_001").await.unwrap();
536        assert!(c_res);
537
538        let map_001 = db.map("map_001", None).await.unwrap();
539        map_001.clear().await.unwrap();
540        map_001.insert("k1", &1).await.unwrap();
541        assert_eq!(map_001.is_empty().await.unwrap(), false);
542        #[cfg(feature = "map_len")]
543        assert_eq!(map_001.len().await.unwrap(), 1);
544        let c_res = db.map_contains_key("map_001").await.unwrap();
545        assert!(c_res);
546        map_001.clear().await.unwrap();
547        assert_eq!(map_001.is_empty().await.unwrap(), true);
548        #[cfg(feature = "map_len")]
549        assert_eq!(map_001.len().await.unwrap(), 0);
550        let c_res = db.map_contains_key("map_001").await.unwrap();
551        assert!(!c_res);
552
553        let l1 = db.list("list_001", None).await.unwrap();
554        l1.push(&"aa").await.unwrap();
555        l1.push(&"bb").await.unwrap();
556        assert_eq!(l1.is_empty().await.unwrap(), false);
557        let c_res = db.list_contains_key("list_001").await.unwrap();
558        assert!(c_res);
559
560        let map_002 = db.map("map_002", None).await.unwrap();
561        map_002.clear().await.unwrap();
562        #[cfg(feature = "map_len")]
563        println!("test_db_contains_key len: {}", map_002.len().await.unwrap());
564        println!(
565            "test_db_contains_key is_empty: {}",
566            map_002.is_empty().await.unwrap()
567        );
568        let c_res = db.map_contains_key("map_002").await.unwrap();
569        assert!(!c_res);
570        assert_eq!(map_002.is_empty().await.unwrap(), true);
571        #[cfg(feature = "map_len")]
572        assert_eq!(map_002.len().await.unwrap(), 0);
573
574        let list_002 = db.list("list_002", None).await.unwrap();
575        let c_res = db.list_contains_key("list_002").await.unwrap();
576        assert!(!c_res);
577        assert_eq!(list_002.is_empty().await.unwrap(), true);
578    }
579
580    #[tokio::main]
581    #[test]
582    async fn test_db_contains_key2() {
583        let cfg = get_cfg("db_contains_key2");
584        let db = init_db(&cfg).await.unwrap();
585        let max = 10;
586        for i in 0..max {
587            db.insert(format!("key_{}", i), &1).await.unwrap();
588        }
589
590        for i in 0..max {
591            let c_res = db.contains_key(format!("key_{}", i)).await.unwrap();
592            assert!(c_res);
593        }
594
595        for i in 0..max {
596            db.remove(format!("key_{}", i)).await.unwrap();
597        }
598
599        for i in 0..max {
600            let c_res = db.contains_key(format!("key_{}", i)).await.unwrap();
601            assert!(!c_res);
602        }
603    }
604
605    #[cfg(feature = "ttl")]
606    #[tokio::main]
607    #[test]
608    async fn test_db_expire() {
609        let cfg = get_cfg("expire");
610        let db = init_db(&cfg).await.unwrap();
611
612        let res_none = db.ttl("test_k001").await.unwrap();
613        println!("ttl res_none: {:?}", res_none);
614        assert_eq!(res_none, None);
615        db.insert("tkey_001", &10).await.unwrap();
617        let expire_res = db.expire("tkey_001", 1000).await.unwrap();
618        println!("expire_res: {:?}", expire_res);
619        let tkey_001_ttl = db.ttl("tkey_001").await.unwrap();
620        println!("tkey_001_ttl: {:?}", tkey_001_ttl);
621        assert!(tkey_001_ttl.is_some());
622        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
623        db.insert("tkey_001", &20).await.unwrap();
624        let tkey_001_ttl = db.ttl("tkey_001").await.unwrap();
625        println!("tkey_001_ttl: {:?}", tkey_001_ttl);
626        assert!(tkey_001_ttl.is_some() && tkey_001_ttl.unwrap() > 100000);
627
628        db.remove("ttl_key_1").await.unwrap();
630        let ttl_001_res = db.ttl("ttl_key_1").await.unwrap();
631        println!("ttl_001_res: {:?}", ttl_001_res);
632        assert!(ttl_001_res.is_none());
633        db.insert("ttl_key_1", &11).await.unwrap();
634        let ttl_001_res = db.ttl("ttl_key_1").await.unwrap();
635        println!("ttl_001_res: {:?}", ttl_001_res);
636        assert!(ttl_001_res.is_some());
637        let expire_res = db.expire("ttl_key_1", 1 * 1000).await.unwrap();
638        println!("expire_res: {:?}", expire_res);
639        assert_eq!(expire_res, expire_res);
640        let ttl_001_res = db.ttl("ttl_key_1").await.unwrap().unwrap();
641        println!("ttl_001_res: {:?}", ttl_001_res);
642        assert!(ttl_001_res <= 1 * 1000);
643        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
644        let ttl_001_res = db.ttl("ttl_key_1").await.unwrap().unwrap();
645        println!("<500 ttl_001_res: {:?}", ttl_001_res);
646        assert!(ttl_001_res <= 500);
647        db.insert("ttl_key_1", &11).await.unwrap();
648        let ttl_001_res = db.ttl("ttl_key_1").await.unwrap().unwrap();
649        println!("ttl_key_1 ttl_001_res: {:?}", ttl_001_res);
650        assert!(ttl_001_res > 1000);
652        let expire_res = db.expire("ttl_key_1", 300).await.unwrap();
653        println!("expire_res: {:?}", expire_res);
654        assert_eq!(expire_res, expire_res);
655        let ttl_001_res = db.ttl("ttl_key_1").await.unwrap().unwrap();
656        println!("<300 ttl_001_res: {:?}", ttl_001_res);
657        assert!(ttl_001_res > 200 && ttl_001_res <= 300);
658        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
659        let ttl_001_res = db.ttl("ttl_key_1").await.unwrap();
660        println!("None ttl_001_res: {:?}", ttl_001_res);
661        assert!(ttl_001_res.is_none());
662        db.insert("ttl_key_1", &111).await.unwrap();
665        let ttl_key_1_res = db.ttl("ttl_key_1").await.unwrap();
666        println!(
667            "ttl_key_1_res: {:?}",
668            ttl_key_1_res.map(|d| Duration::from_millis(d as u64))
669        );
670        assert!(ttl_key_1_res.is_some());
671
672        db.remove("ttl_key_1").await.unwrap();
673        let ttl_key_1_res = db.ttl("ttl_key_1").await.unwrap();
674        println!("ttl_key_1_res: {:?}", ttl_key_1_res);
675        assert!(ttl_key_1_res.is_none());
676
677        let expire_res = db.expire("ttl_key_1", 1 * 1000).await.unwrap();
678        println!("db expire_res: {:?}", expire_res);
679        assert!(!expire_res);
680
681        db.insert("ttl_key_1", &222).await.unwrap();
682        let ttl_key_1_res = db.ttl("ttl_key_1").await.unwrap();
683        println!(
684            "db ttl_key_1_res: {:?}",
685            ttl_key_1_res.map(|d| Duration::from_millis(d as u64))
686        );
687        assert!(ttl_key_1_res.is_some());
688
689        let expire_res = db.expire("ttl_key_1", 500).await.unwrap();
690        println!("db expire_res: {:?}", expire_res);
691        assert!(expire_res);
692        let ttl_key_1_res = db.ttl("ttl_key_1").await.unwrap();
693        println!(
694            "db ttl_key_1_res: {:?}",
695            ttl_key_1_res.map(|d| Duration::from_millis(d as u64))
696        );
697
698        tokio::time::sleep(std::time::Duration::from_millis(700)).await;
699        assert_eq!(db.get::<_, i32>("ttl_key_1").await.unwrap(), None);
700        assert_eq!(db.contains_key("ttl_key_1").await.unwrap(), false);
701
702        let mut ttl_001 = db.map("ttl_001", None).await.unwrap();
704        ttl_001.clear().await.unwrap();
705        let ttl_001_res_none = db.ttl("ttl_001").await.unwrap();
706        println!(
707            "1 test_db_expire map ttl_001_res_none: {:?}",
708            ttl_001_res_none
709        );
710        assert_eq!(ttl_001_res_none, None);
711
712        ttl_001.insert("k1", &11).await.unwrap();
713        ttl_001.insert("k2", &22).await.unwrap();
714
715        assert_eq!(ttl_001.is_empty().await.unwrap(), false);
716        #[cfg(feature = "map_len")]
717        assert_eq!(ttl_001.len().await.unwrap(), 2);
718        let ttl_001_res = ttl_001.ttl().await.unwrap();
719        println!("2 test_db_expire map ttl_001_res: {:?}", ttl_001_res);
720        assert_eq!(ttl_001_res.is_some(), true);
721
722        let expire_res = ttl_001.expire(1 * 1000).await.unwrap();
723        println!("3 test_db_expire map expire_res: {:?}", expire_res);
724        assert_eq!(expire_res, true);
725
726        let ttl_001_res = ttl_001.ttl().await.unwrap();
727        println!("4 test_db_expire map ttl_001_res: {:?}", ttl_001_res);
728        assert!(ttl_001_res.unwrap() <= 1 * 1000);
729
730        let k1_v = ttl_001.get::<_, i32>("k1").await.unwrap();
731        let k2_v = ttl_001.get::<_, i32>("k2").await.unwrap();
732        println!("test_db_expire k1_v: {:?}", k1_v);
733        println!("test_db_expire k2_v: {:?}", k2_v);
734        assert_eq!(k1_v, Some(11));
735        assert_eq!(k2_v, Some(22));
736
737        tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
738        assert_eq!(db.map_contains_key("ttl_001").await.unwrap(), false);
739        #[cfg(feature = "map_len")]
740        assert_eq!(ttl_001.len().await.unwrap(), 0);
741        assert_eq!(ttl_001.is_empty().await.unwrap(), true);
742        assert_eq!(
743            ttl_001.remove_and_fetch::<_, i32>("k1").await.unwrap(),
744            None
745        );
746        assert!(ttl_001.iter::<i32>().await.unwrap().next().await.is_none());
747        assert!(ttl_001.key_iter().await.unwrap().next().await.is_none());
748
749        let mut vals = Vec::new();
750        let mut iter = ttl_001.prefix_iter::<_, i32>("k").await.unwrap();
751        while let Some(item) = iter.next().await {
752            vals.push(item.unwrap())
753        }
754        drop(iter);
755        println!("Iter vals: {:?}", vals);
756
757        assert!(ttl_001
758            .prefix_iter::<_, i32>("k")
759            .await
760            .unwrap()
761            .next()
762            .await
763            .is_none());
764
765        let k1_v = ttl_001.get::<_, i32>("k1").await.unwrap();
766        let k2_v = ttl_001.get::<_, i32>("k2").await.unwrap();
767        println!("test_db_expire k1_v: {:?}", k1_v);
768        println!("test_db_expire k2_v: {:?}", k2_v);
769        assert_eq!(k1_v, None);
770        assert_eq!(k2_v, None);
771
772        let ttl_001_res = ttl_001.ttl().await.unwrap();
773        println!("ttl_001_res: {:?}", ttl_001_res);
774        assert!(ttl_001_res.is_none());
775        ttl_001.insert("k1", &11).await.unwrap();
776        let ttl_001_res = ttl_001.ttl().await.unwrap();
777        println!("xxxx ttl_001_res: {:?}", ttl_001_res);
778        assert!(ttl_001_res.is_some());
779        let expire_res = ttl_001.expire(1 * 1000).await.unwrap();
780        println!("expire_res: {:?}", expire_res);
781        assert_eq!(expire_res, expire_res);
782        let ttl_001_res = ttl_001.ttl().await.unwrap().unwrap();
783        println!("x0 ttl_001_res: {:?}", ttl_001_res);
784        assert!(ttl_001_res <= 1 * 1000);
785        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
786        let ttl_001_res = ttl_001.ttl().await.unwrap().unwrap();
787        println!("x1 ttl_001_res: {:?}", ttl_001_res);
788        assert!(ttl_001_res <= 500);
789        ttl_001.insert("k1", &11).await.unwrap();
790        let ttl_001_res = ttl_001.ttl().await.unwrap().unwrap();
791        println!("x2 ttl_001_res: {:?}", ttl_001_res);
792        assert!(ttl_001_res <= 500);
793        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
794        ttl_001.insert("k1", &11).await.unwrap();
795        let ttl_001_res = ttl_001.ttl().await.unwrap().unwrap();
796        println!(
797            "x3 ttl_001_res: {:?}  {:?}",
798            ttl_001_res,
799            (TimestampMillis::MAX - ttl_001_res)
800        );
801        assert!(ttl_001_res >= 10000);
802        assert_eq!(db.map_contains_key("ttl_001").await.unwrap(), true);
803
804        let mut l_ttl_001 = db.list("l_ttl_001", None).await.unwrap();
806        l_ttl_001.clear().await.unwrap();
807        let l_ttl_001_res_none = l_ttl_001.ttl().await.unwrap();
808        println!(
809            "1 test_db_expire list l_ttl_001_res_none: {:?}",
810            l_ttl_001_res_none
811        );
812        assert_eq!(db.list_contains_key("l_ttl_001").await.unwrap(), false);
813        assert_eq!(l_ttl_001.is_empty().await.unwrap(), true);
814        assert_eq!(l_ttl_001.len().await.unwrap(), 0);
815        assert_eq!(l_ttl_001_res_none, None);
816
817        l_ttl_001.push(&11).await.unwrap();
818        l_ttl_001.push(&22).await.unwrap();
819        assert_eq!(db.list_contains_key("l_ttl_001").await.unwrap(), true);
820        assert_eq!(l_ttl_001.is_empty().await.unwrap(), false);
821        assert_eq!(l_ttl_001.len().await.unwrap(), 2);
822        let l_ttl_001_res = l_ttl_001.ttl().await.unwrap();
823        println!("2 test_db_expire list l_ttl_001_res: {:?}", l_ttl_001_res);
824        assert_eq!(l_ttl_001_res.is_some(), true);
825
826        let expire_res = l_ttl_001.expire(1 * 1000).await.unwrap();
827        println!("3 test_db_expire list expire_res: {:?}", expire_res);
828        assert_eq!(expire_res, true);
829
830        let l_ttl_001_res = l_ttl_001.ttl().await.unwrap().unwrap();
831        println!("4 test_db_expire list l_ttl_001_res: {:?}", l_ttl_001_res);
832        assert!(l_ttl_001_res <= 1 * 1000);
833
834        let k1_v = l_ttl_001.get_index::<i32>(0).await.unwrap();
835        let k2_v = l_ttl_001.get_index::<i32>(1).await.unwrap();
836        println!("test_db_expire list k1_v: {:?}", k1_v);
837        println!("test_db_expire list k2_v: {:?}", k2_v);
838        assert_eq!(k1_v, Some(11));
839        assert_eq!(k2_v, Some(22));
840
841        tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
842        assert_eq!(db.list_contains_key("l_ttl_001").await.unwrap(), false);
843        assert_eq!(l_ttl_001.len().await.unwrap(), 0);
844        assert_eq!(l_ttl_001.is_empty().await.unwrap(), true);
845        assert_eq!(l_ttl_001.all::<i32>().await.unwrap().len(), 0);
846        assert!(l_ttl_001
847            .iter::<i32>()
848            .await
849            .unwrap()
850            .next()
851            .await
852            .is_none());
853        assert_eq!(l_ttl_001.pop::<i32>().await.unwrap(), None);
854        let k1_v = l_ttl_001.get_index::<i32>(0).await.unwrap();
855        let k2_v = l_ttl_001.get_index::<i32>(1).await.unwrap();
856        println!("test_db_expire list k1_v: {:?}", k1_v);
857        println!("test_db_expire list k2_v: {:?}", k2_v);
858        assert_eq!(k1_v, None);
859        assert_eq!(k2_v, None);
860
861        let l_ttl_001_res = l_ttl_001.ttl().await.unwrap();
862        println!("test_db_expire list l_ttl_001_res: {:?}", l_ttl_001_res);
863        assert!(l_ttl_001_res.is_none());
864        l_ttl_001.push(&11).await.unwrap();
865        let l_ttl_001_res = l_ttl_001.ttl().await.unwrap();
866        println!(
867            "xxxx test_db_expire list l_ttl_001_res: {:?}",
868            l_ttl_001_res
869        );
870        assert!(l_ttl_001_res.is_some());
871        let expire_res = l_ttl_001.expire(1 * 1000).await.unwrap();
872        println!("test_db_expire list expire_res: {:?}", expire_res);
873        assert_eq!(expire_res, expire_res);
874        let l_ttl_001_res = l_ttl_001.ttl().await.unwrap().unwrap();
875        println!("x0 test_db_expire list l_ttl_001_res: {:?}", l_ttl_001_res);
876        assert!(l_ttl_001_res <= 1 * 1000);
877        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
878        let l_ttl_001_res = l_ttl_001.ttl().await.unwrap();
879        println!("x1 test_db_expire list l_ttl_001_res: {:?}", l_ttl_001_res);
880        assert!(l_ttl_001_res.unwrap() <= 500);
881        l_ttl_001.push(&11).await.unwrap();
882
883        let l_ttl_001_res = l_ttl_001.ttl().await.unwrap().unwrap();
884        println!("x2 test_db_expire list l_ttl_001_res: {:?}", l_ttl_001_res);
885        assert!(l_ttl_001_res <= 500);
886
887        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
888        l_ttl_001.push(&11).await.unwrap();
889
890        let l_ttl_001_res = l_ttl_001.ttl().await.unwrap().unwrap();
891        println!(
892            "x3 test_db_expire list l_ttl_001_res: {:?}  {:?}",
893            l_ttl_001_res,
894            (TimestampMillis::MAX - l_ttl_001_res)
895        );
896        assert!(l_ttl_001_res >= 10000);
897    }
898
899    #[tokio::main]
900    #[test]
901    async fn test_map_insert() {
902        let cfg = get_cfg("map_insert");
903        let db = init_db(&cfg).await.unwrap();
904
905        let map001 = db.map("001", None).await.unwrap();
906        map001.clear().await.unwrap();
907        #[cfg(feature = "map_len")]
908        assert_eq!(map001.len().await.unwrap(), 0);
909
910        map001.insert("key_1", &1).await.unwrap();
911        map001.insert("key_2", &2).await.unwrap();
912        #[cfg(feature = "map_len")]
913        assert_eq!(map001.len().await.unwrap(), 2);
914
915        let val = map001.get::<_, i32>("key_1").await.unwrap();
916        println!("test_map_insert val: {:?}", val);
917        assert_eq!(val, Some(1));
918
919        map001.remove::<_>("key_1").await.unwrap();
920        let val = map001.get::<_, i32>("key_1").await.unwrap();
921        println!("test_map_insert val: {:?}", val);
922        assert_eq!(val, None);
923
924        #[cfg(feature = "map_len")]
925        println!("test_map_insert len: {:?}", map001.len().await.unwrap());
926        #[cfg(feature = "map_len")]
927        assert_eq!(map001.len().await.unwrap(), 1);
928    }
929
930    #[tokio::main]
931    #[test]
932    async fn test_map_contains_key() {
933        let cfg = get_cfg("map_contains_key");
934        let db = init_db(&cfg).await.unwrap();
935
936        let map001 = db.map("m001", None).await.unwrap();
937        map001.clear().await.unwrap();
938        #[cfg(feature = "map_len")]
939        assert_eq!(map001.len().await.unwrap(), 0);
940        assert_eq!(map001.contains_key("k001").await.unwrap(), false);
941
942        map001.insert("k001", &"val_001").await.unwrap();
943        assert_eq!(map001.contains_key("k001").await.unwrap(), true);
944
945        map001.remove::<_>("k001").await.unwrap();
946        assert_eq!(map001.contains_key("k001").await.unwrap(), false);
947        #[cfg(feature = "map_len")]
948        assert_eq!(map001.len().await.unwrap(), 0);
949    }
950
951    #[tokio::main]
952    #[test]
953    async fn test_map() {
954        let cfg = get_cfg("map");
955        let db = init_db(&cfg).await.unwrap();
956
957        let kv001 = db.map("tree_kv001", None).await.unwrap();
958        let kv_key_1 = b"kv_key_1";
959
960        kv001.clear().await.unwrap();
961        #[cfg(feature = "map_len")]
962        assert_eq!(kv001.len().await.unwrap(), 0);
963
964        let kv_val_1 = String::from("kv_val_001");
965        kv001
966            .insert::<_, String>(kv_key_1, &kv_val_1)
967            .await
968            .unwrap();
969        assert_eq!(
970            kv001.get::<_, String>(kv_key_1).await.unwrap(),
971            Some(kv_val_1.clone())
972        );
973        assert_eq!(kv001.get::<_, String>(b"kv_key_2").await.unwrap(), None);
974        #[cfg(feature = "map_len")]
975        assert_eq!(kv001.len().await.unwrap(), 1);
976        assert_eq!(kv001.is_empty().await.unwrap(), false);
977
978        assert!(kv001.contains_key(kv_key_1).await.unwrap());
979
980        kv001.remove(kv_key_1).await.unwrap();
981        assert_eq!(kv001.get::<_, String>(kv_key_1).await.unwrap(), None);
982        assert!(!kv001.contains_key(kv_key_1).await.unwrap());
983        #[cfg(feature = "map_len")]
984        assert_eq!(kv001.len().await.unwrap(), 0);
985        assert_eq!(kv001.is_empty().await.unwrap(), true);
986
987        assert_eq!(
988            kv001.remove_and_fetch::<_, String>(kv_key_1).await.unwrap(),
989            None
990        );
991        kv001
992            .insert::<_, String>(kv_key_1, &kv_val_1)
993            .await
994            .unwrap();
995        assert_eq!(
996            kv001.remove_and_fetch::<_, String>(kv_key_1).await.unwrap(),
997            Some(kv_val_1)
998        );
999        assert_eq!(
1000            kv001.remove_and_fetch::<_, String>(kv_key_1).await.unwrap(),
1001            None
1002        );
1003
1004        kv001.insert(b"kv_key_3", "3").await.unwrap();
1005        kv001.insert(b"kv_key_4", "4").await.unwrap();
1006        kv001.insert(b"kv_key_5", "5").await.unwrap();
1007        kv001.insert(b"kv_key_6", "6").await.unwrap();
1008        #[cfg(feature = "map_len")]
1009        assert_eq!(kv001.len().await.unwrap(), 4);
1010        kv001.remove_with_prefix("kv_key_").await.unwrap();
1011        #[cfg(feature = "map_len")]
1012        assert_eq!(kv001.len().await.unwrap(), 0);
1013        assert_eq!(kv001.is_empty().await.unwrap(), true);
1014    }
1015
1016    #[tokio::main]
1017    #[test]
1018    async fn test_map_iter2() {
1019        let cfg = get_cfg("map_iter2");
1020        let mut db = init_db(&cfg).await.unwrap();
1021
1022        let mut map_iter = db.map_iter().await.unwrap();
1023        while let Some(map) = map_iter.next().await {
1024            let map = map.unwrap();
1025            map.clear().await.unwrap();
1026        }
1027
1028        drop(map_iter);
1029
1030        let max = 10;
1031
1032        for i in 0..max {
1033            let map1 = db.map(format!("map-{}", i), None).await.unwrap();
1034            map1.insert(format!("map-{}-data", i), &i).await.unwrap();
1035        }
1036
1037        let mut map_iter = db.map_iter().await.unwrap();
1038
1039        let mut count = 0;
1041        while let Some(map) = map_iter.next().await {
1042            let mut map = map.unwrap();
1043            let mut iter = map.iter::<i32>().await.unwrap();
1044            while let Some(item) = iter.next().await {
1045                let (key, val) = item.unwrap();
1046                println!("key: {:?}, val: {:?}", String::from_utf8_lossy(&key), val);
1047            }
1048            count += 1;
1049        }
1050
1051        println!("max: {:?}, count: {:?}", max, count);
1052        assert_eq!(max, count);
1053    }
1054
1055    #[tokio::main]
1108    #[test]
1109    async fn test_batch() {
1110        let cfg = get_cfg("batch");
1111        let db = init_db(&cfg).await.unwrap();
1112
1113        let skv = db.map("batch_kv001", None).await.unwrap();
1114
1115        let mut kvs = Vec::new();
1116        for i in 0..100 {
1117            kvs.push((format!("key_{}", i).as_bytes().to_vec(), i));
1118        }
1119        skv.batch_insert(kvs.clone()).await.unwrap();
1120        #[cfg(feature = "map_len")]
1121        assert_eq!(skv.len().await.unwrap(), 100);
1122
1123        let mut ks = Vec::new();
1124        for i in 0..50 {
1125            ks.push(format!("key_{}", i).as_bytes().to_vec());
1126        }
1127        skv.batch_remove(ks).await.unwrap();
1128        #[cfg(feature = "map_len")]
1129        assert_eq!(skv.len().await.unwrap(), 50);
1130    }
1131
1132    #[tokio::main]
1133    #[test]
1134    async fn test_iter() {
1135        let cfg = get_cfg("iter");
1136        let db = init_db(&cfg).await.unwrap();
1137
1138        let mut skv = db.map("iter_kv002", None).await.unwrap();
1139        skv.clear().await.unwrap();
1140
1141        for i in 0..10 {
1142            skv.insert::<_, i32>(format!("key_{}", i), &i)
1143                .await
1144                .unwrap();
1145        }
1146
1147        let mut vals = Vec::new();
1148        let mut iter = skv.iter::<i32>().await.unwrap();
1149        while let Some(item) = iter.next().await {
1150            vals.push(item.unwrap())
1151        }
1152        drop(iter);
1153
1154        assert_eq!(
1155            vals,
1156            vec![
1157                (b"key_0".to_vec(), 0),
1158                (b"key_1".to_vec(), 1),
1159                (b"key_2".to_vec(), 2),
1160                (b"key_3".to_vec(), 3),
1161                (b"key_4".to_vec(), 4),
1162                (b"key_5".to_vec(), 5),
1163                (b"key_6".to_vec(), 6),
1164                (b"key_7".to_vec(), 7),
1165                (b"key_8".to_vec(), 8),
1166                (b"key_9".to_vec(), 9),
1167            ]
1168        );
1169
1170        let mut keys = Vec::new();
1171        let mut key_iter = skv.key_iter().await.unwrap();
1172        while let Some(item) = key_iter.next().await {
1173            keys.push(String::from_utf8(item.unwrap()).unwrap())
1174        }
1175        drop(key_iter);
1176
1177        assert_eq!(
1178            keys,
1179            vec![
1180                "key_0", "key_1", "key_2", "key_3", "key_4", "key_5", "key_6", "key_7", "key_8",
1181                "key_9"
1182            ]
1183        );
1184
1185        for i in 0..5 {
1186            skv.insert::<_, i32>(format!("key2_{}", i), &i)
1187                .await
1188                .unwrap();
1189        }
1190
1191        let mut vals = Vec::new();
1192        let mut prefix_iter = skv.prefix_iter::<_, i32>("key2_").await.unwrap();
1193        while let Some(item) = prefix_iter.next().await {
1194            vals.push(item.unwrap())
1195        }
1196
1197        assert_eq!(
1198            vals,
1199            vec![
1200                (b"key2_0".to_vec(), 0),
1201                (b"key2_1".to_vec(), 1),
1202                (b"key2_2".to_vec(), 2),
1203                (b"key2_3".to_vec(), 3),
1204                (b"key2_4".to_vec(), 4)
1205            ]
1206        );
1207    }
1208
1209    #[tokio::main]
1210    #[test]
1211    async fn test_list() {
1212        let cfg = get_cfg("array");
1213        let db = init_db(&cfg).await.unwrap();
1214
1215        let array_a = db.list("array_a", None).await.unwrap();
1216        let array_b = db.list("array_b", None).await.unwrap();
1217        let mut array_c = db.list("array_c", None).await.unwrap();
1218
1219        array_a.clear().await.unwrap();
1220        array_b.clear().await.unwrap();
1221        array_c.clear().await.unwrap();
1222
1223        db.insert("key_001", &1).await.unwrap();
1224        db.insert("key_002", &2).await.unwrap();
1225        db.insert("key_003", &3).await.unwrap();
1226
1227        for i in 0..5 {
1228            array_a.push(&i).await.unwrap();
1229        }
1230        assert_eq!(array_a.len().await.unwrap(), 5);
1231
1232        let vals = array_a.all::<i32>().await.unwrap();
1233        assert_eq!(vals.len(), 5);
1234        assert_eq!(vals, vec![0, 1, 2, 3, 4]);
1235
1236        let val_1 = array_a.get_index::<i32>(1).await.unwrap();
1237        assert_eq!(val_1, Some(1));
1238
1239        let val_0 = array_a.pop::<i32>().await.unwrap();
1240        assert_eq!(val_0, Some(0));
1241
1242        let val_1 = array_a.pop::<i32>().await.unwrap();
1243        assert_eq!(val_1, Some(1));
1244
1245        let vals = array_a.all::<i32>().await.unwrap();
1246        assert_eq!(vals.len(), 3);
1247        assert_eq!(vals, vec![2, 3, 4]);
1248
1249        for i in 0..20 {
1250            array_a.push_limit(&i, 5, true).await.unwrap();
1251        }
1252        assert_eq!(array_a.len().await.unwrap(), 5);
1253        let vals = array_a.all::<i32>().await.unwrap();
1254        assert_eq!(vals.len(), 5);
1255        assert_eq!(vals, vec![15, 16, 17, 18, 19]);
1256
1257        for i in 0..4 {
1258            array_b.push(&i).await.unwrap();
1259        }
1260
1261        for i in 0..3 {
1262            array_c.push(&i).await.unwrap();
1263        }
1264
1265        println!("array_c.len(): {}", array_c.len().await.unwrap());
1266
1267        let mut vals = Vec::new();
1268        let mut iter = array_c.iter::<i32>().await.unwrap();
1269        while let Some(val) = iter.next().await {
1270            let val = val.unwrap();
1271            vals.push(val);
1272            println!("array_c iter val: {}", val);
1273        }
1274        assert_eq!(vals, vec![0, 1, 2]);
1275    }
1276
1277    #[tokio::main]
1278    #[test]
1279    async fn test_list2() {
1280        let cfg = get_cfg("map_list");
1281        let db = init_db(&cfg).await.unwrap();
1282
1283        let ml001 = db.map("m_l_001", None).await.unwrap();
1284        ml001.clear().await.unwrap();
1285        #[cfg(feature = "map_len")]
1286        assert_eq!(ml001.len().await.unwrap(), 0);
1287        assert_eq!(ml001.is_empty().await.unwrap(), true);
1288
1289        let mut l001 = db.list("l_001", None).await.unwrap();
1290        l001.clear().await.unwrap();
1291        assert_eq!(l001.len().await.unwrap(), 0);
1292        assert_eq!(l001.is_empty().await.unwrap(), true);
1293        l001.push(&100).await.unwrap();
1294        l001.push(&101).await.unwrap();
1295        assert_eq!(l001.len().await.unwrap(), 2);
1296        assert_eq!(l001.is_empty().await.unwrap(), false);
1297
1298        for v in 100..200 {
1299            l001.push_limit(&v, 5, true).await.unwrap();
1300        }
1301        assert_eq!(l001.len().await.unwrap(), 5);
1302        assert_eq!(l001.is_empty().await.unwrap(), false);
1303
1304        let mut iter = l001.iter::<i32>().await.unwrap();
1305        let mut vals = Vec::new();
1306        while let Some(val) = iter.next().await {
1307            let val = val.unwrap();
1308            vals.push(val);
1309        }
1310        drop(iter);
1311        assert_eq!(vals, [195, 196, 197, 198, 199]);
1312
1313        assert_eq!(l001.all::<i32>().await.unwrap(), [195, 196, 197, 198, 199]);
1314
1315        assert_eq!(l001.get_index(0).await.unwrap(), Some(195));
1316        assert_eq!(l001.get_index(2).await.unwrap(), Some(197));
1317        assert_eq!(l001.get_index(4).await.unwrap(), Some(199));
1318        assert!(l001.get_index::<i32>(5).await.unwrap().is_none());
1319
1320        let mut pops = Vec::new();
1321        while let Some(item) = l001.pop::<i32>().await.unwrap() {
1322            println!("list pop item: {:?}", item);
1323            pops.push(item);
1324        }
1325        assert_eq!(pops, [195, 196, 197, 198, 199]);
1326
1327        assert_eq!(l001.len().await.unwrap(), 0);
1328        assert_eq!(l001.is_empty().await.unwrap(), true);
1329
1330        for v in 10..20 {
1331            l001.push_limit(&v, 5, true).await.unwrap();
1332        }
1333
1334        let l002 = db.list("l_002", None).await.unwrap();
1335        for v in 20..30 {
1336            l002.push_limit(&v, 5, true).await.unwrap();
1337        }
1338
1339        assert_eq!(l001.all::<i32>().await.unwrap(), [15, 16, 17, 18, 19]);
1340        assert_eq!(l002.all::<i32>().await.unwrap(), [25, 26, 27, 28, 29]);
1341
1342        assert_eq!(l001.len().await.unwrap(), 5);
1343        assert_eq!(l001.is_empty().await.unwrap(), false);
1344
1345        assert_eq!(l002.len().await.unwrap(), 5);
1346        assert_eq!(l002.is_empty().await.unwrap(), false);
1347
1348        l001.clear().await.unwrap();
1349        assert_eq!(l001.len().await.unwrap(), 0);
1350        assert_eq!(l001.is_empty().await.unwrap(), true);
1351
1352        l002.clear().await.unwrap();
1353        assert_eq!(l002.len().await.unwrap(), 0);
1354        assert_eq!(l002.is_empty().await.unwrap(), true);
1355    }
1356
1357    #[tokio::main]
1358    #[test]
1359    async fn test_list_iter() {
1360        let cfg = get_cfg("list_iter");
1361        let mut db = init_db(&cfg).await.unwrap();
1362
1363        let l1 = db.list("l1", None).await.unwrap();
1364        let l2 = db.list("l2", None).await.unwrap();
1365        let l3 = db.list("l3", None).await.unwrap();
1366        l1.clear().await.unwrap();
1367        l2.clear().await.unwrap();
1368        l3.clear().await.unwrap();
1369
1370        l1.push(&1).await.unwrap();
1371        l2.push(&1).await.unwrap();
1372        l2.push(&2).await.unwrap();
1373        l3.push(&1).await.unwrap();
1374        l3.push(&2).await.unwrap();
1375        l3.push(&3).await.unwrap();
1376
1377        let mut iter = db.list_iter().await.unwrap();
1378        while let Some(l) = iter.next().await {
1379            let l = l.unwrap();
1380            let name = String::from_utf8(l.name().to_vec());
1381            println!("list name: {:?}, len: {:?}", name, l.len().await);
1382            let len = l.len().await.unwrap();
1383            assert!(len == 1 || len == 2 || len == 3);
1384        }
1385    }
1386
1387    #[tokio::main]
1388    #[test]
1389    async fn test_list_iter2() {
1390        let cfg = get_cfg("list_iter2");
1391        let mut db = init_db(&cfg).await.unwrap();
1392
1393        let mut list_iter = db.list_iter().await.unwrap();
1394        while let Some(list) = list_iter.next().await {
1395            let list = list.unwrap();
1396            list.clear().await.unwrap();
1397        }
1398
1399        drop(list_iter);
1400
1401        let max = 10;
1402
1403        for i in 0..max {
1404            let list1 = db.list(format!("list-{}", i), None).await.unwrap();
1405            list1.push(&i).await.unwrap();
1406        }
1407
1408        let mut list_iter = db.list_iter().await.unwrap();
1409
1410        let mut count = 0;
1411        while let Some(list) = list_iter.next().await {
1412            let mut list = list.unwrap();
1413            let mut iter = list.iter::<i32>().await.unwrap();
1414            while let Some(item) = iter.next().await {
1415                let val = item.unwrap();
1416                println!("val: {:?}", val);
1417            }
1418            count += 1;
1419        }
1420
1421        println!("max: {:?}, count: {:?}", max, count);
1422        assert_eq!(max, count);
1423    }
1424
1425    #[tokio::main]
1426    #[test]
1427    async fn test_map_iter() {
1428        let cfg = get_cfg("async_map_iter");
1429        let mut db = init_db(&cfg).await.unwrap();
1430
1431        let m1 = db.map("m1", None).await.unwrap();
1432        let m2 = db.map("m2", None).await.unwrap();
1433        let m3 = db.map("m3", None).await.unwrap();
1434
1435        m1.insert("k1", &1).await.unwrap();
1436        m2.insert("k1", &1).await.unwrap();
1437        m2.insert("k2", &2).await.unwrap();
1438        m3.insert("k1", &1).await.unwrap();
1439        m3.insert("k2", &2).await.unwrap();
1440        m3.insert("k3", &3).await.unwrap();
1441
1442        let mut iter = db.map_iter().await.unwrap();
1443        let mut map_names = Vec::new();
1444        while let Some(m) = iter.next().await {
1445            let m = m.unwrap();
1446            map_names.push(String::from_utf8(m.name().to_vec()).unwrap());
1447            let name = String::from_utf8(m.name().to_vec());
1448            println!("map name: {:?}", name);
1449            #[cfg(feature = "map_len")]
1450            {
1451                let len = m.len().await.unwrap();
1452                println!("map len: {:?}", len);
1453                assert!(len == 1 || len == 2 || len == 3);
1454            }
1455        }
1456        for name in map_names.iter() {
1457            assert!(vec!["m1", "m2", "m3"].contains(&name.as_str()));
1458        }
1459    }
1460
1461    #[tokio::main]
1462    #[test]
1463    async fn test_counter() {
1464        let cfg = get_cfg("incr");
1465        let db = init_db(&cfg).await.unwrap();
1466
1467        db.remove("incr1").await.unwrap();
1468        db.remove("incr2").await.unwrap();
1469        db.remove("incr3").await.unwrap();
1470
1471        db.counter_incr("incr1", 3).await.unwrap();
1472        db.counter_incr("incr2", -3).await.unwrap();
1473        db.counter_incr("incr3", 10).await.unwrap();
1474
1475        assert_eq!(db.counter_get("incr1").await.unwrap(), Some(3));
1476        assert_eq!(db.counter_get("incr2").await.unwrap(), Some(-3));
1477        assert_eq!(db.counter_get("incr3").await.unwrap(), Some(10));
1478
1479        db.counter_decr("incr3", 2).await.unwrap();
1480        assert_eq!(db.counter_get("incr3").await.unwrap(), Some(8));
1481
1482        db.counter_decr("incr3", -3).await.unwrap();
1483        assert_eq!(db.counter_get("incr3").await.unwrap(), Some(11));
1484
1485        db.counter_set("incr3", 100).await.unwrap();
1486        assert_eq!(db.counter_get("incr3").await.unwrap(), Some(100));
1487
1488        db.counter_incr("incr3", 10).await.unwrap();
1489        assert_eq!(db.counter_get("incr3").await.unwrap(), Some(110));
1490
1491        assert_eq!(db.counter_get("incr4").await.unwrap(), None);
1492    }
1493
1494    #[tokio::main]
1495    #[test]
1496    async fn test_db_batch() {
1497        let cfg = get_cfg("db_batch_insert");
1498        let db = init_db(&cfg).await.unwrap();
1499
1500        let mut key_vals = Vec::new();
1501        for i in 0..100 {
1502            key_vals.push((format!("key_{}", i).as_bytes().to_vec(), i));
1503        }
1504
1505        db.batch_insert(key_vals).await.unwrap();
1506
1507        assert_eq!(db.get("key_99").await.unwrap(), Some(99));
1508        assert_eq!(db.get::<_, usize>("key_100").await.unwrap(), None);
1509
1510        let mut keys = Vec::new();
1511        for i in 0..50 {
1512            keys.push(format!("key_{}", i).as_bytes().to_vec());
1513        }
1514        db.batch_remove(keys).await.unwrap();
1515
1516        assert_eq!(db.get::<_, usize>("key_0").await.unwrap(), None);
1517        assert_eq!(db.get::<_, usize>("key_49").await.unwrap(), None);
1518        assert_eq!(db.get("key_50").await.unwrap(), Some(50));
1519
1520        let mut keys = Vec::new();
1521        for i in 50..100 {
1522            keys.push(format!("key_{}", i).as_bytes().to_vec());
1523        }
1524        db.batch_remove(keys).await.unwrap();
1525    }
1526
1527    #[tokio::main]
1528    #[test]
1529    async fn test_list_pushs() {
1530        let cfg = get_cfg("list_pushs");
1531        let db = init_db(&cfg).await.unwrap();
1532        let l11 = db.list("l11", None).await.unwrap();
1533        l11.clear().await.unwrap();
1534        let mut vals = Vec::new();
1535        for i in 0..10 {
1536            vals.push(i);
1537        }
1538        l11.pushs(vals).await.unwrap();
1539        assert_eq!(l11.len().await.unwrap(), 10);
1540        println!("{:?}", l11.all::<i32>().await.unwrap());
1541        assert_eq!(
1542            l11.all::<i32>().await.unwrap(),
1543            vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
1544        );
1545
1546        let mut vals = Vec::new();
1547        for i in 20..25 {
1548            vals.push(i);
1549        }
1550        l11.pushs(vals).await.unwrap();
1551        assert_eq!(l11.len().await.unwrap(), 15);
1552        println!("{:?}", l11.all::<i32>().await.unwrap());
1553        assert_eq!(
1554            l11.all::<i32>().await.unwrap(),
1555            vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24]
1556        );
1557    }
1558
1559    #[tokio::main]
1560    #[test]
1561    async fn test_list_pop() {
1562        let cfg = get_cfg("list_pop");
1563        let db = init_db(&cfg).await.unwrap();
1564        let l11 = db.list("l11", None).await.unwrap();
1565        l11.clear().await.unwrap();
1566        for i in 0..10 {
1567            l11.push(&i).await.unwrap();
1568        }
1569        println!("{:?}", l11.all::<i32>().await.unwrap());
1570        println!(
1571            "l11.get_index(): {:?}",
1572            l11.get_index::<i32>(0).await.unwrap()
1573        );
1574        println!("l11.pop(): {:?}", l11.pop::<i32>().await.unwrap());
1575        println!("l11.pop(): {:?}", l11.pop::<i32>().await.unwrap());
1576        println!(
1577            "all: {:?}, len: {:?}",
1578            l11.all::<i32>().await.unwrap(),
1579            l11.len().await
1580        );
1581        assert_eq!(l11.len().await.unwrap(), 8);
1582        assert_eq!(
1583            l11.all::<i32>().await.unwrap(),
1584            vec![2, 3, 4, 5, 6, 7, 8, 9]
1585        );
1586
1587        }
1627
1628    #[tokio::main]
1629    #[test]
1630    async fn test_session_iter() {
1631        let cfg = get_cfg("session");
1632        let mut db = init_db(&cfg).await.unwrap();
1633        let now = std::time::Instant::now();
1634        let mut iter = db.map_iter().await.unwrap();
1635        let mut count = 0;
1636        while let Some(m) = iter.next().await {
1637            let _m = m.unwrap();
1638            count += 1;
1640        }
1641        println!("count: {}, cost time: {:?}", count, now.elapsed());
1642    }
1643
1644    #[tokio::main]
1645    #[allow(dead_code)]
1646    async fn test_map_expire() {
1648        let cfg = get_cfg("map_expire");
1649        let db = init_db(&cfg).await.unwrap();
1650
1651        #[cfg(feature = "ttl")]
1652        #[cfg(feature = "map_len")]
1653        {
1654            let map1 = db.map("map1", Some(1000)).await.unwrap();
1655            println!("ttl: {:?}", map1.ttl().await.unwrap());
1656            map1.insert("k1", &1).await.unwrap();
1657            map1.insert("k2", &2).await.unwrap();
1658            println!("ttl: {:?}", map1.ttl().await.unwrap());
1659            assert_eq!(map1.is_empty().await.unwrap(), false);
1660            assert_eq!(map1.len().await.unwrap(), 2);
1661            sleep(Duration::from_millis(1200)).await;
1662            println!("ttl: {:?}", map1.ttl().await.unwrap());
1663            assert_eq!(map1.len().await.unwrap(), 0);
1664            assert_eq!(map1.is_empty().await.unwrap(), true);
1665            map1.clear().await.unwrap();
1666        }
1667
1668        let mut db1 = db.clone();
1669        tokio::spawn(async move {
1670            loop {
1671                sleep(Duration::from_millis(10000)).await;
1672                let mut iter = db1.map_iter().await.unwrap();
1673                let limit = 10;
1674                let mut c = 0;
1675                while let Some(map) = iter.next().await {
1676                    let map = map.unwrap();
1677                    println!(
1678                        "map.is_empty(): {:?}, now: {:?}",
1679                        map.is_empty().await.unwrap(),
1680                        timestamp_millis()
1681                    );
1682                    c += 1;
1683                    if c > limit {
1684                        break;
1685                    }
1686                }
1687            }
1688        });
1689
1690        for x in 0..500 {
1691            let db = db.clone();
1692            tokio::spawn(async move {
1693                for i in 0..10_000 {
1694                    let map = match db
1695                        .map(format!("map_{}_{}", x, i), Some(1000 * 60))
1696                        .await
1698                    {
1699                        Ok(map) => map,
1700                        Err(e) => {
1701                            println!("map_expire {:?}", e);
1702                            continue;
1703                        }
1704                    };
1705                    if let Err(e) = map.insert(format!("k1_{}", i), &i).await {
1706                        println!("insert {:?}", e);
1707                    }
1708                    sleep(Duration::from_millis(0)).await;
1709                    if let Err(e) = map.insert(format!("k2_{}", i), &i).await {
1710                        println!("insert {:?}", e);
1711                    }
1712                    sleep(Duration::from_millis(0)).await;
1713                    if let Err(e) = map.insert(format!("k3_{}", i), &i).await {
1714                        println!("insert {:?}", e);
1715                    }
1716                    sleep(Duration::from_millis(0)).await;
1717                }
1718                println!("********************* end {:?}", x);
1719            });
1720        }
1721
1722        sleep(Duration::from_secs(100000)).await;
1723    }
1724
1725    #[tokio::main]
1726    #[allow(dead_code)]
1727    #[cfg(feature = "sled")]
1728    async fn test_map_expire_list() {
1730        use super::{SledStorageDB, StorageDB};
1731        let cfg = Config {
1732            typ: StorageType::Sled,
1733            sled: SledConfig {
1734                path: format!("./.catch/{}", "map_expire_list"),
1735                cache_capacity: convert::Bytesize::from(1024 * 1024 * 1024 * 3),
1736                cleanup_f: move |_db| {
1737                    #[cfg(feature = "ttl")]
1738                    {
1739                        let db = _db.clone();
1740                        std::thread::spawn(move || {
1741                            let limit = 1000;
1742                            for _ in 0..5 {
1743                                std::thread::sleep(std::time::Duration::from_secs(3));
1744                                let mut total_cleanups = 0;
1745                                let now = std::time::Instant::now();
1746                                loop {
1747                                    let count = db.cleanup(limit);
1748                                    total_cleanups += count;
1749                                    println!(
1750                                        "def_cleanup: {}, total cleanups: {}, cost time: {:?}",
1751                                        count,
1752                                        total_cleanups,
1753                                        now.elapsed()
1754                                    );
1755
1756                                    if count < limit {
1757                                        break;
1758                                    }
1759
1760                                    std::thread::sleep(std::time::Duration::from_millis(10));
1761                                }
1762                                println!(
1763                                    "total cleanups: {}, cost time: {:?}",
1764                                    total_cleanups,
1765                                    now.elapsed()
1766                                );
1767                            }
1768                        });
1769                    }
1770                },
1771                ..Default::default()
1772            },
1773            #[cfg(feature = "redis")]
1774            redis: RedisConfig {
1775                url: "redis://127.0.0.1:6379/".into(),
1776                prefix: "map_expire_list".to_owned(),
1777            },
1778            #[cfg(feature = "redis-cluster")]
1779            redis_cluster: RedisClusterConfig {
1780                urls: [
1781                    "redis://127.0.0.1:6380/".into(),
1782                    "redis://127.0.0.1:6381/".into(),
1783                    "redis://127.0.0.1:6382/".into(),
1784                ]
1785                .into(),
1786                prefix: "map_expire_list".to_owned(),
1787            },
1788        };
1789
1790        let mut db = SledStorageDB::new(cfg.sled.clone()).await.unwrap();
1791
1792        let mut expireat_count = 0;
1793        for item in db.db.iter() {
1794            let (key, val) = item.unwrap();
1795            println!(
1796                "item: {:?}, val: {:?}",
1797                String::from_utf8_lossy(key.as_ref()),
1798                val.as_ref()
1799                    .try_into()
1800                    .map(|v: [u8; 8]| usize::from_be_bytes(v))
1801            );
1802            expireat_count += 1;
1803        }
1804        println!("expireat_count: {}", expireat_count);
1805
1806        let mut iter = db.map_iter().await.unwrap();
1807        let mut c = 0;
1809        let mut emptys = 0;
1810        while let Some(map) = iter.next().await {
1811            let map = map.unwrap();
1812            c += 1;
1813            if map.is_empty().await.unwrap() {
1814                emptys += 1;
1815            }
1816        }
1817        println!("c: {}, emptys: {}", c, emptys);
1818    }
1819
1820    #[tokio::main]
1821    #[test]
1822    async fn test_db_size() {
1823        let cfg = get_cfg("db_size");
1824        let mut db = init_db(&cfg).await.unwrap();
1825        let iter = db.scan("*").await.unwrap();
1826        for item in collect(iter).await {
1827            db.remove(item).await.unwrap();
1828        }
1829        println!("test_db_size db_size: {:?}", db.db_size().await);
1830        db.insert("k1", &1).await.unwrap();
1831        db.insert("k2", &2).await.unwrap();
1832        db.insert("k3", &3).await.unwrap();
1833        println!("test_db_size db_size: {:?}", db.db_size().await);
1834        let m = db.map("map1", None).await.unwrap();
1835        m.insert("mk1", &1).await.unwrap();
1836        m.insert("mk2", &2).await.unwrap();
1837        println!("test_db_size db_size: {:?}", db.db_size().await);
1838
1839        db.batch_insert(vec![
1840            (Vec::from("batch/len/1"), 11),
1841            (Vec::from("batch/len/2"), 22),
1842            (Vec::from("batch/len/3"), 33),
1843        ])
1844        .await
1845        .unwrap();
1846        println!("test_db_size db_size: {:?}", db.db_size().await);
1847    }
1848
1849    async fn collect(mut iter: Box<dyn AsyncIterator<Item = Result<Key>> + Send + '_>) -> Vec<Key> {
1850        let mut data = Vec::new();
1851        while let Some(key) = iter.next().await {
1852            data.push(key.unwrap())
1853        }
1854        data
1855    }
1856
1857    #[tokio::main]
1858    #[test]
1859    async fn test_scan() {
1860        let cfg = get_cfg("scan");
1861        let mut db = init_db(&cfg).await.unwrap();
1862        let iter = db.scan("*").await.unwrap();
1863        for item in collect(iter).await {
1864            println!("removed item: {:?}", String::from_utf8_lossy(&item));
1865            db.remove(item).await.unwrap();
1866        }
1867        println!("test_scan db_size: {:?}", db.db_size().await);
1868        db.insert("foo/abcd/1", &1).await.unwrap();
1869        db.insert("foo/abcd/2", &2).await.unwrap();
1870        db.insert("foo/abcd/3", &3).await.unwrap();
1871        db.insert("foo/abcd/**/4", &11).await.unwrap();
1872        db.insert("foo/abcd/*/4", &22).await.unwrap();
1873        db.insert("foo/abcd/*", &33).await.unwrap();
1874        db.insert("iot/abcd/5/a", &5).await.unwrap();
1875        db.insert("iot/abcd/6/b", &6).await.unwrap();
1876        db.insert("iot/abcd/7/c", &7).await.unwrap();
1877        db.insert("iot/abcd/", &8).await.unwrap();
1878        db.insert("iot/abcd", &9).await.unwrap();
1879
1880        println!("test_scan db_size: {:?}", db.db_size().await);
1881
1882        let format_topic = |t: &str| -> Cow<'_, str> {
1883            if t.len() == 1 {
1884                if t == "#" || t == "+" {
1885                    return Cow::Borrowed("*");
1886                }
1887            }
1888
1889            let t = t.replace("*", "\\*").replace("?", "\\?").replace("+", "*");
1890
1891            if t.len() > 1 && t.ends_with("/#") {
1892                Cow::Owned([&t[0..(t.len() - 2)], "*"].concat())
1893            } else {
1894                Cow::Owned(t)
1895            }
1896        };
1897
1898        let topic = format_topic("foo/abcd/#");
1900        println!("topic: {}", topic);
1901        let iter = db.scan(topic.as_bytes()).await.unwrap();
1902        let items = collect(iter).await;
1903        for item in items.iter() {
1904            println!("item: {:?}", String::from_utf8_lossy(&item));
1905        }
1906        assert_eq!(items.len(), 6);
1907
1908        let topic = format_topic("foo/abcd/*/#");
1910        println!("---topic: {} {}---", topic, "foo/abcd/\\**");
1911        let iter = db.scan(topic.as_bytes()).await.unwrap();
1912        assert_eq!(collect(iter).await.len(), 3);
1913
1914        let topic = format_topic("foo/abcd/*");
1916        println!("---topic: {} {}---", topic, "foo/abcd/\\*");
1917        let iter = db.scan(topic.as_bytes()).await.unwrap();
1918        assert_eq!(collect(iter).await.len(), 1);
1919
1920        let topic = format_topic("foo/abcd/+/#");
1922        println!("---topic: {} {}---", topic, "foo/abcd/*/*");
1923        let iter = db.scan(topic.as_bytes()).await.unwrap();
1924        assert_eq!(collect(iter).await.len(), 6);
1925
1926        let topic = format_topic("iot/abcd/#");
1928        println!("---topic: {} {}---", topic, "iot/abcd*");
1929        let iter = db.scan(topic.as_bytes()).await.unwrap();
1930        assert_eq!(collect(iter).await.len(), 5);
1931
1932        let topic = format_topic("iot/abcd/+");
1934        println!("---topic: {} {}---", topic, "iot/abcd/*");
1935        let iter = db.scan(topic.as_bytes()).await.unwrap();
1936        assert_eq!(collect(iter).await.len(), 4);
1937    }
1938
1939    #[tokio::main]
1940    #[cfg(feature = "len")]
1941    #[allow(dead_code)]
1942    async fn test_len() {
1944        let cfg = get_cfg("test_len");
1945        let mut db = init_db(&cfg).await.unwrap();
1946        println!("a test_len len: {:?}", db.len().await);
1947        let iter = db.scan("*").await.unwrap();
1948        for item in collect(iter).await {
1949            println!(
1950                "test_len remove item: {:?}",
1951                String::from_utf8_lossy(item.as_slice())
1952            );
1953            db.remove(item).await.unwrap();
1954        }
1955        println!("b test_len len: {:?}", db.len().await);
1956        db.insert("foo/len/1", &1).await.unwrap();
1957        db.insert("foo/len/2", &2).await.unwrap();
1958        db.insert("foo/len/3", &3).await.unwrap();
1959        db.insert("foo/len/4", &4).await.unwrap();
1960
1961        db.expire_at("foo/len/3", timestamp_millis() + 1 * 1000)
1962            .await
1963            .unwrap();
1964        db.expire("foo/len/4", 1000 * 2).await.unwrap();
1965        println!("test_len len: {:?}", db.len().await);
1966        assert_eq!(db.len().await.unwrap(), 4);
1967
1968        sleep(Duration::from_millis(1100)).await;
1969        println!("test_len len: {:?}", db.len().await);
1970        assert_eq!(db.len().await.unwrap(), 3);
1971
1972        sleep(Duration::from_millis(1100)).await;
1973        println!("test_len len: {:?}", db.len().await);
1974        assert_eq!(db.len().await.unwrap(), 2);
1975
1976        db.remove("foo/len/1").await.unwrap();
1977        println!("test_len len: {:?}", db.len().await);
1978        assert_eq!(db.len().await.unwrap(), 1);
1979
1980        db.batch_insert(vec![
1981            (Vec::from("batch/len/1"), 11),
1982            (Vec::from("batch/len/2"), 22),
1983            (Vec::from("batch/len/3"), 33),
1984        ])
1985        .await
1986        .unwrap();
1987        assert_eq!(db.len().await.unwrap(), 4);
1988
1989        db.batch_remove(vec![Vec::from("batch/len/1"), Vec::from("batch/len/2")])
1990            .await
1991            .unwrap();
1992
1993        assert_eq!(db.len().await.unwrap(), 2);
1994        println!("test_len len: {:?}", db.len().await);
1995    }
1996}