radish_database/
keys.rs

1/* Copyright (c) 2020 Dmitry Shatilov <shatilov dot diman at gmail dot com>
2 * 
3 * This program is free software: you can redistribute it and/or modify
4 * it under the terms of the GNU Affero General Public License as published by
5 * the Free Software Foundation, either version 3 of the License, or
6 * (at your option) any later version.
7
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 * GNU Affero General Public License for more details.
12
13 * You should have received a copy of the GNU Affero General Public License
14 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
15 */
16
17use std::sync::Arc;
18use std::collections::{BTreeMap, HashMap, VecDeque};
19use std::time::{SystemTime, Duration};
20
21use tokio::sync::{Mutex, MutexGuard};
22
23use super::container::Container;
24use super::container::ContainerPtr;
25
26type Key = super::Key;
27type Value = super::Value;
28type Arguments = super::Arguments;
29type ExecResult = super::ExecResult;
30
31impl super::Storage {
32	pub fn make_container(cnt: Container) -> ContainerPtr {
33		Arc::new(Mutex::new(cnt))
34	}
35	pub fn make_container_with<F: FnMut() -> Container>(mut factory: F) -> ContainerPtr {
36		Self::make_container(factory())
37	}
38
39	pub async fn try_get_container(&self, key: &Key) -> Option<ContainerPtr> {
40		let containers = self.containers.lock().await;
41		containers
42		.get(key)
43		.cloned()
44	}
45
46	pub async fn get_container<F: FnMut() -> Container>(&self, key: Key, factory: F) -> ContainerPtr {
47		let mut containers = self.containers.lock().await;
48		containers
49		.entry(key.clone())
50		.or_insert_with(||Self::make_container_with(factory))
51		.clone()
52	}
53
54	pub async fn try_get_containers(&self, keys: &Vec<Key>) -> Vec<Option<ContainerPtr>> {
55		let containers = self.containers.lock().await;
56
57		keys
58		.iter()
59		.map(|key| {
60			match containers.get(key) {
61				Some(v) => Some(v.clone()),
62				None => None,
63			}
64		})
65		.collect()
66	}
67
68	pub async fn get_containers<F: FnMut() -> Container>(&self, mut keys: Vec<Key>, mut factory: F) -> Vec<ContainerPtr> {
69		let mut containers = self.containers.lock().await;
70
71		keys
72		.drain(..)
73		.map(|key| {
74			if let Some(v) = containers.get(&key) {
75				v.clone()
76			} else {
77				let c = Self::make_container_with(||factory());
78				containers.insert(key, c.clone());
79				c
80			}
81		})
82		.collect()
83	}
84
85	pub async fn lock_all<'a, T: 'a>(mut writes: impl Iterator<Item=&'a Mutex<T>>, mut reads: impl Iterator<Item=Option<&'a Mutex<T>>>) -> (Vec<MutexGuard<'a, T>>, Vec<Option<MutexGuard<'a, T>>>) {
86		let mut mutexes = BTreeMap::<u64, &'a Mutex<T>>::new();
87		let mut guards = HashMap::<u64, MutexGuard<'a, T>>::new();
88		let mut output_order_writes = Vec::<u64>::new();
89		let mut output_order_reads = Vec::<u64>::new();
90		while let Some(m) = writes.next() {
91			let address = m as *const Mutex<T> as u64;
92			mutexes.insert(address, m);
93			output_order_writes.push(address);
94		}
95		while let Some(m) = reads.next() {
96			match m {
97				None => output_order_reads.push(0),
98				Some(m) => {
99					let address = m as *const Mutex<T> as u64;
100					mutexes.insert(address, m);
101					output_order_reads.push(address);
102				},
103			}
104		}
105		for (address, m) in mutexes {
106			guards.insert(address, m.lock().await);
107		}
108		let writes = output_order_writes
109			.iter()
110			.map(|a|guards.remove(a).unwrap())
111			.collect()
112		;
113		let reads = output_order_reads
114			.iter()
115			.map(|a|{
116				match a {
117					0 => None,
118					a => Some(guards.remove(a).unwrap()),
119				}
120			})
121			.collect()
122		;
123		(writes, reads)
124	}
125
126	pub async fn keys_keys(&self, mut args: Arguments) -> ExecResult {
127		let pattern = Self::extract_key(args.pop_front())?;
128		let pattern = std::str::from_utf8(&pattern[..]).map_err(|e|format!("{}", e))?;
129		let pattern = regex::bytes::Regex::new(pattern).map_err(|e|format!("{}", e))?;
130		let filter = |key: &&Key| -> bool {
131			pattern.is_match(&key[..])
132		};
133
134		let containers = self.containers.lock().await;
135
136		Ok(Value::Array(
137			containers
138			.keys()
139			.filter(filter)
140			.map(|key| Value::Buffer(key.clone()))
141			.collect()
142		))
143	}
144
145	pub async fn keys_exists(&self, mut args: Arguments) -> ExecResult {
146		let containers = self.containers.lock().await;
147
148		let mut exists_count = 0;
149		while let Ok(key) = Self::extract_key(args.pop_front()) {
150			if let Some(_) = containers.get(&key) {
151				exists_count = exists_count + 1;
152			}
153		}
154		Ok(Value::Integer(exists_count))
155	}
156
157	pub async fn keys_now(&self, _args: Arguments) -> ExecResult {
158		let timepoint = SystemTime::now();
159		let timestamp = timepoint.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
160		Ok(Value::Integer(timestamp as i64))
161	}
162
163	pub async fn keys_pnow(&self, _args: Arguments) -> ExecResult {
164		let timepoint = SystemTime::now();
165		let timestamp = timepoint.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
166		Ok(Value::Integer(timestamp as i64))
167	}
168
169	pub async fn keys_del(&self, mut args: Arguments) -> ExecResult {
170		let mut containers = self.containers.lock().await;
171
172		let mut removed_count = 0;
173		while let Ok(key) = Self::extract_key(args.pop_front()) {
174			if let Some(_) = containers.remove(&key) {
175				removed_count = removed_count + 1;
176			}
177		}
178		Ok(Value::Integer(removed_count))
179	}
180
181	async fn key_expiration(&self, cnt: &ContainerPtr) -> Option<std::time::SystemTime> {
182		let cnt = cnt.lock().await;
183		match &*cnt {
184			Container::Set(c) => c.expiration_time,
185			Container::List(c) => c.expiration_time,
186			Container::Hash(c) => c.expiration_time,
187			Container::Strings(c) => c.expiration_time,
188		}
189	}
190
191	pub async fn keys_rename(&mut self, mut args: Arguments) -> ExecResult {
192		let key = Self::extract_key(args.pop_front())?;
193		let newkey = Self::extract_key(args.pop_front())?;
194
195		let mut containers = self.containers.lock().await;
196		let cnt = containers.remove(&key).ok_or_else(||format!("key '{:?}' not found", &key[..]))?;
197		let timepoint = self.key_expiration(&cnt).await;
198		containers.insert(newkey.clone(), cnt);
199		drop(containers);
200
201		if let Some(timepoint) = timepoint {
202			self.expire_key_at(&newkey, timepoint).await;
203		}
204		Ok(Value::Ok)
205	}
206
207	fn check_type(key_type: &str) -> Result<(), String> {
208		match key_type {
209			"set" => Ok(()),
210			"list" => Ok(()),
211			"hash" => Ok(()),
212			"string" => Ok(()),
213			t => Err(format!("Unexpected type '{}'", t)),
214		}
215	}
216
217	fn type_to_string(c: &Container) -> &str {
218		match *c {
219			Container::Set(_) => "set",
220			Container::List(_) => "list",
221			Container::Hash(_) => "hash",
222			Container::Strings(_) => "string",
223		}
224	}
225
226	pub async fn keys_type(&self, mut args: Arguments) -> ExecResult {
227		let keys = args.drain(..).filter_map(|a|Self::extract_key(Some(a)).ok()).collect();
228		let cnts = self.try_get_containers(&keys).await;
229		let mut types = VecDeque::new();
230		for c in cnts {
231			let ktype = match c {
232				None => Value::Nill,
233				Some(c) => {
234					let c = c.lock().await;
235					let t = Self::type_to_string(&c);
236					Value::Buffer(Vec::from(t.as_bytes()))
237				}
238			};
239			types.push_back(ktype);
240		}
241		match types.len() {
242			0 => Err(format!("TYPE key")),
243			1 => Ok(types.remove(0).unwrap()),
244			_ => Ok(Value::Array(types)),
245		}
246	}
247
248	fn get_expiration_time(c: &Container) -> Option<SystemTime> {
249		match c {
250			Container::Set(c) => c.expiration_time,
251			Container::List(c) => c.expiration_time,
252			Container::Hash(c) => c.expiration_time,
253			Container::Strings(c) => c.expiration_time,
254		}
255	}
256	fn set_expiration_time(c: &mut Container, t: Option<SystemTime>) {
257		let expire = match c {
258			Container::Set(c) => &mut c.expiration_time,
259			Container::List(c) => &mut c.expiration_time,
260			Container::Hash(c) => &mut c.expiration_time,
261			Container::Strings(c) => &mut c.expiration_time,
262		};
263		*expire = t;
264	}
265
266	async fn keys_expiration_time<F>(&mut self, mut args: Arguments, dur_to_i64: F) -> ExecResult
267	where F: FnOnce(Duration)->i64 {
268		let key = Self::extract_key(args.pop_front())?;
269		match self.try_get_container(&key).await {
270			None => Ok(Value::Integer(-2)),
271			Some(c) => {
272				let c = c.lock().await;
273				match Self::get_expiration_time(&*c) {
274					None => Ok(Value::Integer(-1)),
275					Some(tm) => {
276						let ttl = tm.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0));
277						Ok(Value::Integer(dur_to_i64(ttl)))
278					},
279				}
280			}
281		}
282	}
283
284	pub async fn keys_pttl(&mut self, args: Arguments) -> ExecResult {
285		self.keys_expiration_time(args, |ttl|ttl.as_millis() as i64).await
286	}
287
288	pub async fn keys_ttl(&mut self, args: Arguments) -> ExecResult {
289		self.keys_expiration_time(args, |ttl|ttl.as_secs() as i64).await
290	}
291
292	async fn keys_expire_impl(&mut self, key: Key, timepoint: SystemTime) -> ExecResult {
293		let c = self.try_get_container(&key).await;
294		match c {
295			None => Ok(Value::Bool(false)),
296			Some(c) => {
297				let mut c = c.lock().await;
298				Self::set_expiration_time(&mut *c, Some(timepoint));
299				drop(c);
300				self.expire_key_at(&key, timepoint).await;
301				Ok(Value::Bool(true))
302			},
303		}
304	}
305
306	pub async fn keys_expire(&mut self, mut args: Arguments) -> ExecResult {
307		let key = Self::extract_key(args.pop_front())?;
308		let seconds = Self::extract_unsigned_integer(args.pop_front())?;
309		let timepoint = SystemTime::now() + Duration::from_secs(seconds);
310		self.keys_expire_impl(key, timepoint).await
311	}
312
313	pub async fn keys_expire_at(&mut self, mut args: Arguments) -> ExecResult {
314		let key = Self::extract_key(args.pop_front())?;
315		let seconds = Self::extract_unsigned_integer(args.pop_front())?;
316		let timepoint = SystemTime::UNIX_EPOCH + Duration::from_secs(seconds);
317		self.keys_expire_impl(key, timepoint).await
318	}
319
320	pub async fn keys_pexpire(&mut self, mut args: Arguments) -> ExecResult {
321		let key = Self::extract_key(args.pop_front())?;
322		let millis = Self::extract_unsigned_integer(args.pop_front())?;
323		let timepoint = SystemTime::now() + Duration::from_millis(millis);
324		self.keys_expire_impl(key, timepoint).await
325	}
326
327	pub async fn keys_pexpire_at(&mut self, mut args: Arguments) -> ExecResult {
328		let key = Self::extract_key(args.pop_front())?;
329		let millis = Self::extract_unsigned_integer(args.pop_front())?;
330		let timepoint = SystemTime::UNIX_EPOCH + Duration::from_millis(millis);
331		self.keys_expire_impl(key, timepoint).await
332	}
333
334	pub async fn keys_check_expirations(&self) {
335		log::debug!("Begin expiration check");
336
337		let (now, expired) = {
338			let mut controller = self.expire_controller.lock().await;
339			controller.pop_now_and_expired_keys()
340		};
341
342		log::debug!("{:?}: {:?}", now, expired);
343
344		for key in expired {
345			if let Some(c) = self.try_get_container(&key).await {
346				let c = c.lock().await;
347				let tm = Self::get_expiration_time(&*c);
348				log::debug!("{:?}: {:?} vs {:?}", key, tm, now);
349				match tm {
350					Some(time) => {
351						if time > now {
352							log::warn!("{:?}: will removed at {:?}", key, time);
353						} else {
354							log::debug!("{:?}: expired and removed", key);
355							let mut containers = self.containers.lock().await;
356							containers.remove(&key);
357						}
358					},
359					None => (),
360				}
361			}
362		}
363		log::debug!("Check expiration done");
364	}
365
366	pub async fn keys_scan(&self, mut args: Arguments) -> ExecResult {
367		let start = Self::extract_index(args.pop_front())?;
368
369		let mut pattern: Option<String> = None;
370		let mut key_type: Option<String> = None;
371		let mut max_check = 100usize;
372
373		while let Some(subcmd) = Self::extract_string(args.pop_front()).ok() {
374			match &subcmd.to_uppercase()[..] {
375				"MATCH" => pattern = Some(Self::extract_string(args.pop_front())?),
376				"COUNT" => max_check = Self::extract_index(args.pop_front())?,
377				"TYPE" => key_type = Some(Self::extract_string(args.pop_front())?),
378				arg => return Err(format!("Unexpected argument '{}'", arg)),
379			}
380		}
381		if let Some(key_type) = &key_type {
382			Self::check_type(&key_type[..])?;
383		}
384
385		let pattern = match pattern {
386			None => None,
387			Some(pattern) => Some(regex::bytes::Regex::new(&pattern[..]).map_err(|e|format!("{}", e))?),
388		};
389
390		let containers = self.containers.lock().await;
391
392		let mut keys = vec![];
393
394		let end = start + max_check;
395		let mut next = start;
396		for i in start..end {
397			next = i;
398			if let Some((key, container)) = containers.get_index(i) {
399				if let Some(key_type) = &key_type {
400					match container.try_lock() {
401						Err(_) => break,
402						Ok(container) => {
403							let t = Self::type_to_string(&container);
404							if key_type != t {
405								continue;
406							}
407						}
408					}
409				}
410				if let Some(pattern) = &pattern {
411					if ! pattern.is_match(&key[..]) {
412						continue;
413					}
414				}
415				keys.push(key.clone());
416			} else {
417				next = 0;
418				break;
419			}
420		}
421
422		let next = Value::Integer(next as i64);
423		let keys = Value::Array(
424			keys
425			.drain(..)
426			.map(|key| Value::Buffer(key))
427			.collect()
428		);
429		Ok(Value::Array(vec![next, keys].into()))
430	}
431}
432