1use std::sync::Arc;
18use std::collections::{BTreeMap, HashMap, VecDeque};
19use std::time::{SystemTime, Duration};
20
21use tokio::sync::{Mutex, MutexGuard};
22
23use super::container::Container;
24use super::container::ContainerPtr;
25
26type Key = super::Key;
27type Value = super::Value;
28type Arguments = super::Arguments;
29type ExecResult = super::ExecResult;
30
31impl super::Storage {
32 pub fn make_container(cnt: Container) -> ContainerPtr {
33 Arc::new(Mutex::new(cnt))
34 }
35 pub fn make_container_with<F: FnMut() -> Container>(mut factory: F) -> ContainerPtr {
36 Self::make_container(factory())
37 }
38
39 pub async fn try_get_container(&self, key: &Key) -> Option<ContainerPtr> {
40 let containers = self.containers.lock().await;
41 containers
42 .get(key)
43 .cloned()
44 }
45
46 pub async fn get_container<F: FnMut() -> Container>(&self, key: Key, factory: F) -> ContainerPtr {
47 let mut containers = self.containers.lock().await;
48 containers
49 .entry(key.clone())
50 .or_insert_with(||Self::make_container_with(factory))
51 .clone()
52 }
53
54 pub async fn try_get_containers(&self, keys: &Vec<Key>) -> Vec<Option<ContainerPtr>> {
55 let containers = self.containers.lock().await;
56
57 keys
58 .iter()
59 .map(|key| {
60 match containers.get(key) {
61 Some(v) => Some(v.clone()),
62 None => None,
63 }
64 })
65 .collect()
66 }
67
68 pub async fn get_containers<F: FnMut() -> Container>(&self, mut keys: Vec<Key>, mut factory: F) -> Vec<ContainerPtr> {
69 let mut containers = self.containers.lock().await;
70
71 keys
72 .drain(..)
73 .map(|key| {
74 if let Some(v) = containers.get(&key) {
75 v.clone()
76 } else {
77 let c = Self::make_container_with(||factory());
78 containers.insert(key, c.clone());
79 c
80 }
81 })
82 .collect()
83 }
84
85 pub async fn lock_all<'a, T: 'a>(mut writes: impl Iterator<Item=&'a Mutex<T>>, mut reads: impl Iterator<Item=Option<&'a Mutex<T>>>) -> (Vec<MutexGuard<'a, T>>, Vec<Option<MutexGuard<'a, T>>>) {
86 let mut mutexes = BTreeMap::<u64, &'a Mutex<T>>::new();
87 let mut guards = HashMap::<u64, MutexGuard<'a, T>>::new();
88 let mut output_order_writes = Vec::<u64>::new();
89 let mut output_order_reads = Vec::<u64>::new();
90 while let Some(m) = writes.next() {
91 let address = m as *const Mutex<T> as u64;
92 mutexes.insert(address, m);
93 output_order_writes.push(address);
94 }
95 while let Some(m) = reads.next() {
96 match m {
97 None => output_order_reads.push(0),
98 Some(m) => {
99 let address = m as *const Mutex<T> as u64;
100 mutexes.insert(address, m);
101 output_order_reads.push(address);
102 },
103 }
104 }
105 for (address, m) in mutexes {
106 guards.insert(address, m.lock().await);
107 }
108 let writes = output_order_writes
109 .iter()
110 .map(|a|guards.remove(a).unwrap())
111 .collect()
112 ;
113 let reads = output_order_reads
114 .iter()
115 .map(|a|{
116 match a {
117 0 => None,
118 a => Some(guards.remove(a).unwrap()),
119 }
120 })
121 .collect()
122 ;
123 (writes, reads)
124 }
125
126 pub async fn keys_keys(&self, mut args: Arguments) -> ExecResult {
127 let pattern = Self::extract_key(args.pop_front())?;
128 let pattern = std::str::from_utf8(&pattern[..]).map_err(|e|format!("{}", e))?;
129 let pattern = regex::bytes::Regex::new(pattern).map_err(|e|format!("{}", e))?;
130 let filter = |key: &&Key| -> bool {
131 pattern.is_match(&key[..])
132 };
133
134 let containers = self.containers.lock().await;
135
136 Ok(Value::Array(
137 containers
138 .keys()
139 .filter(filter)
140 .map(|key| Value::Buffer(key.clone()))
141 .collect()
142 ))
143 }
144
145 pub async fn keys_exists(&self, mut args: Arguments) -> ExecResult {
146 let containers = self.containers.lock().await;
147
148 let mut exists_count = 0;
149 while let Ok(key) = Self::extract_key(args.pop_front()) {
150 if let Some(_) = containers.get(&key) {
151 exists_count = exists_count + 1;
152 }
153 }
154 Ok(Value::Integer(exists_count))
155 }
156
157 pub async fn keys_now(&self, _args: Arguments) -> ExecResult {
158 let timepoint = SystemTime::now();
159 let timestamp = timepoint.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
160 Ok(Value::Integer(timestamp as i64))
161 }
162
163 pub async fn keys_pnow(&self, _args: Arguments) -> ExecResult {
164 let timepoint = SystemTime::now();
165 let timestamp = timepoint.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
166 Ok(Value::Integer(timestamp as i64))
167 }
168
169 pub async fn keys_del(&self, mut args: Arguments) -> ExecResult {
170 let mut containers = self.containers.lock().await;
171
172 let mut removed_count = 0;
173 while let Ok(key) = Self::extract_key(args.pop_front()) {
174 if let Some(_) = containers.remove(&key) {
175 removed_count = removed_count + 1;
176 }
177 }
178 Ok(Value::Integer(removed_count))
179 }
180
181 async fn key_expiration(&self, cnt: &ContainerPtr) -> Option<std::time::SystemTime> {
182 let cnt = cnt.lock().await;
183 match &*cnt {
184 Container::Set(c) => c.expiration_time,
185 Container::List(c) => c.expiration_time,
186 Container::Hash(c) => c.expiration_time,
187 Container::Strings(c) => c.expiration_time,
188 }
189 }
190
191 pub async fn keys_rename(&mut self, mut args: Arguments) -> ExecResult {
192 let key = Self::extract_key(args.pop_front())?;
193 let newkey = Self::extract_key(args.pop_front())?;
194
195 let mut containers = self.containers.lock().await;
196 let cnt = containers.remove(&key).ok_or_else(||format!("key '{:?}' not found", &key[..]))?;
197 let timepoint = self.key_expiration(&cnt).await;
198 containers.insert(newkey.clone(), cnt);
199 drop(containers);
200
201 if let Some(timepoint) = timepoint {
202 self.expire_key_at(&newkey, timepoint).await;
203 }
204 Ok(Value::Ok)
205 }
206
207 fn check_type(key_type: &str) -> Result<(), String> {
208 match key_type {
209 "set" => Ok(()),
210 "list" => Ok(()),
211 "hash" => Ok(()),
212 "string" => Ok(()),
213 t => Err(format!("Unexpected type '{}'", t)),
214 }
215 }
216
217 fn type_to_string(c: &Container) -> &str {
218 match *c {
219 Container::Set(_) => "set",
220 Container::List(_) => "list",
221 Container::Hash(_) => "hash",
222 Container::Strings(_) => "string",
223 }
224 }
225
226 pub async fn keys_type(&self, mut args: Arguments) -> ExecResult {
227 let keys = args.drain(..).filter_map(|a|Self::extract_key(Some(a)).ok()).collect();
228 let cnts = self.try_get_containers(&keys).await;
229 let mut types = VecDeque::new();
230 for c in cnts {
231 let ktype = match c {
232 None => Value::Nill,
233 Some(c) => {
234 let c = c.lock().await;
235 let t = Self::type_to_string(&c);
236 Value::Buffer(Vec::from(t.as_bytes()))
237 }
238 };
239 types.push_back(ktype);
240 }
241 match types.len() {
242 0 => Err(format!("TYPE key")),
243 1 => Ok(types.remove(0).unwrap()),
244 _ => Ok(Value::Array(types)),
245 }
246 }
247
248 fn get_expiration_time(c: &Container) -> Option<SystemTime> {
249 match c {
250 Container::Set(c) => c.expiration_time,
251 Container::List(c) => c.expiration_time,
252 Container::Hash(c) => c.expiration_time,
253 Container::Strings(c) => c.expiration_time,
254 }
255 }
256 fn set_expiration_time(c: &mut Container, t: Option<SystemTime>) {
257 let expire = match c {
258 Container::Set(c) => &mut c.expiration_time,
259 Container::List(c) => &mut c.expiration_time,
260 Container::Hash(c) => &mut c.expiration_time,
261 Container::Strings(c) => &mut c.expiration_time,
262 };
263 *expire = t;
264 }
265
266 async fn keys_expiration_time<F>(&mut self, mut args: Arguments, dur_to_i64: F) -> ExecResult
267 where F: FnOnce(Duration)->i64 {
268 let key = Self::extract_key(args.pop_front())?;
269 match self.try_get_container(&key).await {
270 None => Ok(Value::Integer(-2)),
271 Some(c) => {
272 let c = c.lock().await;
273 match Self::get_expiration_time(&*c) {
274 None => Ok(Value::Integer(-1)),
275 Some(tm) => {
276 let ttl = tm.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0));
277 Ok(Value::Integer(dur_to_i64(ttl)))
278 },
279 }
280 }
281 }
282 }
283
284 pub async fn keys_pttl(&mut self, args: Arguments) -> ExecResult {
285 self.keys_expiration_time(args, |ttl|ttl.as_millis() as i64).await
286 }
287
288 pub async fn keys_ttl(&mut self, args: Arguments) -> ExecResult {
289 self.keys_expiration_time(args, |ttl|ttl.as_secs() as i64).await
290 }
291
292 async fn keys_expire_impl(&mut self, key: Key, timepoint: SystemTime) -> ExecResult {
293 let c = self.try_get_container(&key).await;
294 match c {
295 None => Ok(Value::Bool(false)),
296 Some(c) => {
297 let mut c = c.lock().await;
298 Self::set_expiration_time(&mut *c, Some(timepoint));
299 drop(c);
300 self.expire_key_at(&key, timepoint).await;
301 Ok(Value::Bool(true))
302 },
303 }
304 }
305
306 pub async fn keys_expire(&mut self, mut args: Arguments) -> ExecResult {
307 let key = Self::extract_key(args.pop_front())?;
308 let seconds = Self::extract_unsigned_integer(args.pop_front())?;
309 let timepoint = SystemTime::now() + Duration::from_secs(seconds);
310 self.keys_expire_impl(key, timepoint).await
311 }
312
313 pub async fn keys_expire_at(&mut self, mut args: Arguments) -> ExecResult {
314 let key = Self::extract_key(args.pop_front())?;
315 let seconds = Self::extract_unsigned_integer(args.pop_front())?;
316 let timepoint = SystemTime::UNIX_EPOCH + Duration::from_secs(seconds);
317 self.keys_expire_impl(key, timepoint).await
318 }
319
320 pub async fn keys_pexpire(&mut self, mut args: Arguments) -> ExecResult {
321 let key = Self::extract_key(args.pop_front())?;
322 let millis = Self::extract_unsigned_integer(args.pop_front())?;
323 let timepoint = SystemTime::now() + Duration::from_millis(millis);
324 self.keys_expire_impl(key, timepoint).await
325 }
326
327 pub async fn keys_pexpire_at(&mut self, mut args: Arguments) -> ExecResult {
328 let key = Self::extract_key(args.pop_front())?;
329 let millis = Self::extract_unsigned_integer(args.pop_front())?;
330 let timepoint = SystemTime::UNIX_EPOCH + Duration::from_millis(millis);
331 self.keys_expire_impl(key, timepoint).await
332 }
333
334 pub async fn keys_check_expirations(&self) {
335 log::debug!("Begin expiration check");
336
337 let (now, expired) = {
338 let mut controller = self.expire_controller.lock().await;
339 controller.pop_now_and_expired_keys()
340 };
341
342 log::debug!("{:?}: {:?}", now, expired);
343
344 for key in expired {
345 if let Some(c) = self.try_get_container(&key).await {
346 let c = c.lock().await;
347 let tm = Self::get_expiration_time(&*c);
348 log::debug!("{:?}: {:?} vs {:?}", key, tm, now);
349 match tm {
350 Some(time) => {
351 if time > now {
352 log::warn!("{:?}: will removed at {:?}", key, time);
353 } else {
354 log::debug!("{:?}: expired and removed", key);
355 let mut containers = self.containers.lock().await;
356 containers.remove(&key);
357 }
358 },
359 None => (),
360 }
361 }
362 }
363 log::debug!("Check expiration done");
364 }
365
366 pub async fn keys_scan(&self, mut args: Arguments) -> ExecResult {
367 let start = Self::extract_index(args.pop_front())?;
368
369 let mut pattern: Option<String> = None;
370 let mut key_type: Option<String> = None;
371 let mut max_check = 100usize;
372
373 while let Some(subcmd) = Self::extract_string(args.pop_front()).ok() {
374 match &subcmd.to_uppercase()[..] {
375 "MATCH" => pattern = Some(Self::extract_string(args.pop_front())?),
376 "COUNT" => max_check = Self::extract_index(args.pop_front())?,
377 "TYPE" => key_type = Some(Self::extract_string(args.pop_front())?),
378 arg => return Err(format!("Unexpected argument '{}'", arg)),
379 }
380 }
381 if let Some(key_type) = &key_type {
382 Self::check_type(&key_type[..])?;
383 }
384
385 let pattern = match pattern {
386 None => None,
387 Some(pattern) => Some(regex::bytes::Regex::new(&pattern[..]).map_err(|e|format!("{}", e))?),
388 };
389
390 let containers = self.containers.lock().await;
391
392 let mut keys = vec![];
393
394 let end = start + max_check;
395 let mut next = start;
396 for i in start..end {
397 next = i;
398 if let Some((key, container)) = containers.get_index(i) {
399 if let Some(key_type) = &key_type {
400 match container.try_lock() {
401 Err(_) => break,
402 Ok(container) => {
403 let t = Self::type_to_string(&container);
404 if key_type != t {
405 continue;
406 }
407 }
408 }
409 }
410 if let Some(pattern) = &pattern {
411 if ! pattern.is_match(&key[..]) {
412 continue;
413 }
414 }
415 keys.push(key.clone());
416 } else {
417 next = 0;
418 break;
419 }
420 }
421
422 let next = Value::Integer(next as i64);
423 let keys = Value::Array(
424 keys
425 .drain(..)
426 .map(|key| Value::Buffer(key))
427 .collect()
428 );
429 Ok(Value::Array(vec![next, keys].into()))
430 }
431}
432