actix_cloud/memorydb/
default.rs1use std::{
2 cmp::{max, Reverse},
3 collections::HashMap,
4 sync::Arc,
5 time::Duration,
6};
7
8use anyhow::bail;
9use async_trait::async_trait;
10use chrono::Utc;
11use glob::Pattern;
12use parking_lot::{RwLock, RwLockWriteGuard};
13use priority_queue::PriorityQueue;
14
15use super::interface::MemoryDB;
16use crate::Result;
17
18struct Data(String, Option<i64>);
19
20impl Data {
21 fn now() -> i64 {
22 Utc::now().timestamp()
23 }
24
25 fn parse_ttl(ttl: Option<i64>) -> Option<i64> {
26 ttl.map(|x| Self::now().saturating_add(x))
27 }
28
29 fn new<S>(value: S, ttl: Option<i64>) -> Self
30 where
31 S: Into<String>,
32 {
33 Self(value.into(), Self::parse_ttl(ttl))
34 }
35
36 fn set_ttl(&mut self, ttl: Option<i64>) {
37 self.1 = Self::parse_ttl(ttl);
38 }
39
40 fn get_ttl(&self) -> Option<i64> {
41 self.1.map(|x| x.saturating_sub(Self::now()))
42 }
43
44 fn valid(&self) -> bool {
45 if let Some(x) = self.1 {
46 x > Self::now()
47 } else {
48 true
49 }
50 }
51}
52
53#[derive(Clone)]
54pub struct DefaultBackend {
55 data: Arc<RwLock<HashMap<String, Data>>>,
56 capacity: Option<usize>,
57}
58
59impl DefaultBackend {
60 pub fn new(capacity: Option<usize>) -> Self {
61 Self {
62 data: Default::default(),
63 capacity,
64 }
65 }
66
67 fn gc(&self, wlock: &mut RwLockWriteGuard<HashMap<String, Data>>, num: usize) -> usize {
72 let mut queue = PriorityQueue::new();
73 let mut delete = Vec::new();
74 for (k, v) in wlock.iter() {
75 if !v.valid() {
76 delete.push(k.to_owned());
77 } else if let Some(x) = v.1 {
78 queue.push(k.to_owned(), Reverse(x));
79 }
80 }
81 for i in &delete {
82 wlock.remove(i);
83 }
84 let mut ret = delete.len();
85 if ret < num {
86 let remain = num - ret;
87 for _ in 0..remain {
88 if let Some(k) = queue.pop() {
89 wlock.remove(&k.0);
90 ret += 1;
91 } else {
92 return ret;
93 }
94 }
95 }
96 ret
97 }
98}
99
100impl Default for DefaultBackend {
101 fn default() -> Self {
102 Self::new(None)
103 }
104}
105
106#[async_trait]
107impl MemoryDB for DefaultBackend {
108 async fn set(&self, key: &str, value: &str) -> Result<()> {
109 let mut wlock = self.data.write();
110 if let Some(x) = self.capacity {
112 if x == wlock.len()
113 && self.gc(&mut wlock, max(x / 10, 1)) == 0
114 && wlock.get(key).is_none()
115 {
116 bail!("Capacity is full");
117 }
118 }
119 wlock.insert(key.to_owned(), Data::new(value, None));
120 Ok(())
121 }
122
123 async fn get(&self, key: &str) -> Result<Option<String>> {
124 let rlock = self.data.read();
125 if let Some(v) = rlock.get(key) {
126 if v.valid() {
127 Ok(Some(v.0.to_owned()))
128 } else {
129 drop(rlock);
130 self.data.write().remove(key);
131 Ok(None)
132 }
133 } else {
134 Ok(None)
135 }
136 }
137
138 async fn get_del(&self, key: &str) -> Result<Option<String>> {
139 let v = self.data.write().remove(key);
140 if let Some(v) = v {
141 if v.valid() {
142 return Ok(Some(v.0));
143 }
144 }
145 Ok(None)
146 }
147
148 async fn get_ex(&self, key: &str, ttl: &Duration) -> Result<Option<String>> {
149 let mut wlock = self.data.write();
150 if let Some(v) = wlock.get_mut(key) {
151 if v.valid() {
152 v.set_ttl(Some(ttl.as_secs().try_into()?));
153 Ok(Some(v.0.to_owned()))
154 } else {
155 wlock.remove(key);
156 Ok(None)
157 }
158 } else {
159 Ok(None)
160 }
161 }
162
163 async fn set_ex(&self, key: &str, value: &str, ttl: &Duration) -> Result<()> {
164 let mut wlock = self.data.write();
165 if let Some(x) = self.capacity {
167 if x == wlock.len()
168 && self.gc(&mut wlock, max(x / 10, 1)) == 0
169 && wlock.get(key).is_none()
170 {
171 bail!("Capacity is full");
172 }
173 }
174 wlock.insert(
175 key.to_owned(),
176 Data::new(value, Some(ttl.as_secs().try_into()?)),
177 );
178 Ok(())
179 }
180
181 async fn del(&self, key: &str) -> Result<bool> {
182 Ok(self.data.write().remove(key).is_some())
183 }
184
185 async fn expire(&self, key: &str, ttl: i64) -> Result<bool> {
186 if ttl <= 0 {
187 self.del(key).await
188 } else {
189 let mut wlock = self.data.write();
190 if let Some(v) = wlock.get_mut(key) {
191 if v.valid() {
192 v.set_ttl(Some(ttl));
193 Ok(true)
194 } else {
195 wlock.remove(key);
196 Ok(false)
197 }
198 } else {
199 Ok(false)
200 }
201 }
202 }
203
204 async fn flush(&self) -> Result<()> {
205 self.data.write().clear();
206 Ok(())
207 }
208
209 async fn keys(&self, key: &str) -> Result<Vec<String>> {
210 let mut ret = Vec::new();
211 let p = Pattern::new(key)?;
212 for (k, v) in self.data.read().iter() {
213 if v.valid() && p.matches(k) {
214 ret.push(k.to_owned());
215 }
216 }
217 Ok(ret)
218 }
219
220 async fn dels(&self, keys: &[String]) -> Result<u64> {
221 let mut wlock = self.data.write();
222 let mut sum = 0;
223 for i in keys {
224 if wlock.remove(i).is_some() {
225 sum += 1;
226 }
227 }
228 Ok(sum)
229 }
230
231 async fn ttl(&self, key: &str) -> Result<Option<i64>> {
232 let rlock = self.data.read();
233 if let Some(v) = rlock.get(key) {
234 if v.valid() {
235 Ok(v.get_ttl())
236 } else {
237 drop(rlock);
238 self.data.write().remove(key);
239 Ok(None)
240 }
241 } else {
242 Ok(None)
243 }
244 }
245}