actix_cloud/memorydb/
default.rs

1use std::{collections::HashMap, sync::Arc, time::Duration};
2
3use anyhow::anyhow;
4use async_trait::async_trait;
5use chrono::Utc;
6use glob::Pattern;
7use parking_lot::RwLock;
8
9use super::interface::MemoryDB;
10use crate::Result;
11
12struct Data(String, Option<u64>);
13
14impl Data {
15    fn now() -> Result<u64> {
16        Utc::now().timestamp().try_into().map_err(Into::into)
17    }
18
19    fn parse_ttl(ttl: Option<u64>) -> Result<Option<u64>> {
20        if let Some(x) = ttl {
21            Ok(Some(
22                Self::now()?
23                    .checked_add(x)
24                    .ok_or(anyhow!("timestamp overflow"))?,
25            ))
26        } else {
27            Ok(None)
28        }
29    }
30
31    fn new<S>(value: S, ttl: Option<u64>) -> Result<Self>
32    where
33        S: Into<String>,
34    {
35        Ok(Self(value.into(), Self::parse_ttl(ttl)?))
36    }
37
38    fn set_ttl(&mut self, ttl: Option<u64>) -> Result<()> {
39        self.1 = Self::parse_ttl(ttl)?;
40        Ok(())
41    }
42
43    fn get_ttl(&self) -> Result<Option<u64>> {
44        if let Some(x) = self.1 {
45            Ok(Some(
46                x.checked_sub(Self::now()?)
47                    .ok_or(anyhow!("timestamp overflow"))?,
48            ))
49        } else {
50            Ok(None)
51        }
52    }
53
54    fn valid(&self) -> Result<bool> {
55        if let Some(x) = self.1 {
56            if x > Self::now()? {
57                Ok(true)
58            } else {
59                Ok(false)
60            }
61        } else {
62            Ok(true)
63        }
64    }
65}
66
67#[derive(Clone)]
68pub struct DefaultBackend {
69    data: Arc<RwLock<HashMap<String, Data>>>,
70}
71
72impl DefaultBackend {
73    pub fn new() -> Self {
74        Self {
75            data: Arc::new(RwLock::new(HashMap::new())),
76        }
77    }
78}
79
80impl Default for DefaultBackend {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86#[async_trait]
87impl MemoryDB for DefaultBackend {
88    async fn set(&self, key: &str, value: &str) -> Result<()> {
89        self.data
90            .write()
91            .insert(key.to_owned(), Data::new(value, None)?);
92        Ok(())
93    }
94
95    async fn get(&self, key: &str) -> Result<Option<String>> {
96        let rlock = self.data.read();
97        if let Some(v) = rlock.get(key) {
98            if v.valid()? {
99                Ok(Some(v.0.to_owned()))
100            } else {
101                drop(rlock);
102                self.data.write().remove(key);
103                Ok(None)
104            }
105        } else {
106            Ok(None)
107        }
108    }
109
110    async fn get_del(&self, key: &str) -> Result<Option<String>> {
111        let v = self.data.write().remove(key);
112        if let Some(v) = v {
113            if v.valid()? {
114                return Ok(Some(v.0));
115            }
116        }
117        Ok(None)
118    }
119
120    async fn get_ex(&self, key: &str, ttl: &Duration) -> Result<Option<String>> {
121        let mut wlock = self.data.write();
122        if let Some(v) = wlock.get_mut(key) {
123            if v.valid()? {
124                v.set_ttl(Some(ttl.as_secs()))?;
125                Ok(Some(v.0.to_owned()))
126            } else {
127                wlock.remove(key);
128                Ok(None)
129            }
130        } else {
131            Ok(None)
132        }
133    }
134
135    async fn set_ex(&self, key: &str, value: &str, ttl: &Duration) -> Result<()> {
136        self.data
137            .write()
138            .insert(key.to_owned(), Data::new(value, Some(ttl.as_secs()))?);
139        Ok(())
140    }
141
142    async fn del(&self, key: &str) -> Result<bool> {
143        Ok(self.data.write().remove(key).is_some())
144    }
145
146    async fn expire(&self, key: &str, ttl: i64) -> Result<bool> {
147        if ttl <= 0 {
148            self.del(key).await
149        } else {
150            let mut wlock = self.data.write();
151            if let Some(v) = wlock.get_mut(key) {
152                if v.valid()? {
153                    v.set_ttl(Some(ttl as u64))?;
154                    Ok(true)
155                } else {
156                    wlock.remove(key);
157                    Ok(false)
158                }
159            } else {
160                Ok(false)
161            }
162        }
163    }
164
165    async fn flush(&self) -> Result<()> {
166        self.data.write().clear();
167        Ok(())
168    }
169
170    async fn keys(&self, key: &str) -> Result<Vec<String>> {
171        let mut ret = Vec::new();
172        let p = Pattern::new(key)?;
173        for (k, v) in self.data.read().iter() {
174            if v.valid()? && p.matches(k) {
175                ret.push(k.to_owned());
176            }
177        }
178        Ok(ret)
179    }
180
181    async fn dels(&self, keys: &[String]) -> Result<u64> {
182        let mut wlock = self.data.write();
183        let mut sum = 0;
184        for i in keys {
185            if wlock.remove(i).is_some() {
186                sum += 1;
187            }
188        }
189        Ok(sum)
190    }
191
192    async fn ttl(&self, key: &str) -> Result<Option<u64>> {
193        let rlock = self.data.read();
194        if let Some(v) = rlock.get(key) {
195            if v.valid()? {
196                Ok(v.get_ttl()?)
197            } else {
198                drop(rlock);
199                self.data.write().remove(key);
200                Ok(None)
201            }
202        } else {
203            Ok(None)
204        }
205    }
206}