actix_cloud/memorydb/
default.rs

1use std::{
2    cmp::{max, Reverse},
3    collections::HashMap,
4    sync::Arc,
5    time::Duration,
6};
7
8use anyhow::bail;
9use async_trait::async_trait;
10use chrono::Utc;
11use glob::Pattern;
12use parking_lot::{RwLock, RwLockWriteGuard};
13use priority_queue::PriorityQueue;
14
15use super::interface::MemoryDB;
16use crate::Result;
17
18struct Data(String, Option<i64>);
19
20impl Data {
21    fn now() -> i64 {
22        Utc::now().timestamp()
23    }
24
25    fn parse_ttl(ttl: Option<i64>) -> Option<i64> {
26        ttl.map(|x| Self::now().saturating_add(x))
27    }
28
29    fn new<S>(value: S, ttl: Option<i64>) -> Self
30    where
31        S: Into<String>,
32    {
33        Self(value.into(), Self::parse_ttl(ttl))
34    }
35
36    fn set_ttl(&mut self, ttl: Option<i64>) {
37        self.1 = Self::parse_ttl(ttl);
38    }
39
40    fn get_ttl(&self) -> Option<i64> {
41        self.1.map(|x| x.saturating_sub(Self::now()))
42    }
43
44    fn valid(&self) -> bool {
45        if let Some(x) = self.1 {
46            x > Self::now()
47        } else {
48            true
49        }
50    }
51}
52
53#[derive(Clone)]
54pub struct DefaultBackend {
55    data: Arc<RwLock<HashMap<String, Data>>>,
56    capacity: Option<usize>,
57}
58
59impl DefaultBackend {
60    pub fn new(capacity: Option<usize>) -> Self {
61        Self {
62            data: Default::default(),
63            capacity,
64        }
65    }
66
67    /// Evict `num` keys from memory. Return evicted number.
68    ///
69    /// - Evict any expired keys (`x`).
70    /// - If `x < num`, evict at most `num-x` keys sorted by TTL.
71    fn gc(&self, wlock: &mut RwLockWriteGuard<HashMap<String, Data>>, num: usize) -> usize {
72        let mut queue = PriorityQueue::new();
73        let mut delete = Vec::new();
74        for (k, v) in wlock.iter() {
75            if !v.valid() {
76                delete.push(k.to_owned());
77            } else if let Some(x) = v.1 {
78                queue.push(k.to_owned(), Reverse(x));
79            }
80        }
81        for i in &delete {
82            wlock.remove(i);
83        }
84        let mut ret = delete.len();
85        if ret < num {
86            let remain = num - ret;
87            for _ in 0..remain {
88                if let Some(k) = queue.pop() {
89                    wlock.remove(&k.0);
90                    ret += 1;
91                } else {
92                    return ret;
93                }
94            }
95        }
96        ret
97    }
98}
99
100impl Default for DefaultBackend {
101    fn default() -> Self {
102        Self::new(None)
103    }
104}
105
106#[async_trait]
107impl MemoryDB for DefaultBackend {
108    async fn set(&self, key: &str, value: &str) -> Result<()> {
109        let mut wlock = self.data.write();
110        // full
111        if let Some(x) = self.capacity {
112            if x == wlock.len()
113                && self.gc(&mut wlock, max(x / 10, 1)) == 0
114                && wlock.get(key).is_none()
115            {
116                bail!("Capacity is full");
117            }
118        }
119        wlock.insert(key.to_owned(), Data::new(value, None));
120        Ok(())
121    }
122
123    async fn get(&self, key: &str) -> Result<Option<String>> {
124        let rlock = self.data.read();
125        if let Some(v) = rlock.get(key) {
126            if v.valid() {
127                Ok(Some(v.0.to_owned()))
128            } else {
129                drop(rlock);
130                self.data.write().remove(key);
131                Ok(None)
132            }
133        } else {
134            Ok(None)
135        }
136    }
137
138    async fn get_del(&self, key: &str) -> Result<Option<String>> {
139        let v = self.data.write().remove(key);
140        if let Some(v) = v {
141            if v.valid() {
142                return Ok(Some(v.0));
143            }
144        }
145        Ok(None)
146    }
147
148    async fn get_ex(&self, key: &str, ttl: &Duration) -> Result<Option<String>> {
149        let mut wlock = self.data.write();
150        if let Some(v) = wlock.get_mut(key) {
151            if v.valid() {
152                v.set_ttl(Some(ttl.as_secs().try_into()?));
153                Ok(Some(v.0.to_owned()))
154            } else {
155                wlock.remove(key);
156                Ok(None)
157            }
158        } else {
159            Ok(None)
160        }
161    }
162
163    async fn set_ex(&self, key: &str, value: &str, ttl: &Duration) -> Result<()> {
164        let mut wlock = self.data.write();
165        // full
166        if let Some(x) = self.capacity {
167            if x == wlock.len()
168                && self.gc(&mut wlock, max(x / 10, 1)) == 0
169                && wlock.get(key).is_none()
170            {
171                bail!("Capacity is full");
172            }
173        }
174        wlock.insert(
175            key.to_owned(),
176            Data::new(value, Some(ttl.as_secs().try_into()?)),
177        );
178        Ok(())
179    }
180
181    async fn del(&self, key: &str) -> Result<bool> {
182        Ok(self.data.write().remove(key).is_some())
183    }
184
185    async fn expire(&self, key: &str, ttl: i64) -> Result<bool> {
186        if ttl <= 0 {
187            self.del(key).await
188        } else {
189            let mut wlock = self.data.write();
190            if let Some(v) = wlock.get_mut(key) {
191                if v.valid() {
192                    v.set_ttl(Some(ttl));
193                    Ok(true)
194                } else {
195                    wlock.remove(key);
196                    Ok(false)
197                }
198            } else {
199                Ok(false)
200            }
201        }
202    }
203
204    async fn flush(&self) -> Result<()> {
205        self.data.write().clear();
206        Ok(())
207    }
208
209    async fn keys(&self, key: &str) -> Result<Vec<String>> {
210        let mut ret = Vec::new();
211        let p = Pattern::new(key)?;
212        for (k, v) in self.data.read().iter() {
213            if v.valid() && p.matches(k) {
214                ret.push(k.to_owned());
215            }
216        }
217        Ok(ret)
218    }
219
220    async fn dels(&self, keys: &[String]) -> Result<u64> {
221        let mut wlock = self.data.write();
222        let mut sum = 0;
223        for i in keys {
224            if wlock.remove(i).is_some() {
225                sum += 1;
226            }
227        }
228        Ok(sum)
229    }
230
231    async fn ttl(&self, key: &str) -> Result<Option<i64>> {
232        let rlock = self.data.read();
233        if let Some(v) = rlock.get(key) {
234            if v.valid() {
235                Ok(v.get_ttl())
236            } else {
237                drop(rlock);
238                self.data.write().remove(key);
239                Ok(None)
240            }
241        } else {
242            Ok(None)
243        }
244    }
245}