radish_database/
expire.rs

1/* Copyright (c) 2020 Dmitry Shatilov <shatilov dot diman at gmail dot com>
2 * 
3 * This program is free software: you can redistribute it and/or modify
4 * it under the terms of the GNU Affero General Public License as published by
5 * the Free Software Foundation, either version 3 of the License, or
6 * (at your option) any later version.
7
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 * GNU Affero General Public License for more details.
12
13 * You should have received a copy of the GNU Affero General Public License
14 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
15 */
16
17use std::time::{SystemTime, Duration};
18use std::collections::{BTreeMap, HashSet};
19
// Local shorthand for the key type declared in the parent module.
type Key = super::Key;
21
22pub struct ExpireController {
23	expires_queue: BTreeMap<SystemTime, HashSet<Key>>,
24}
25
26impl ExpireController {
27	pub fn new() -> Self {
28		Self {
29			expires_queue: BTreeMap::new(),
30		}
31	}
32
33	pub fn pop_now_and_expired_keys(&mut self) -> (SystemTime, HashSet<Key>) {
34		let pivot = SystemTime::now() + Duration::from_micros(1);
35		let tail = self.expires_queue.split_off(&pivot);
36		let mut expireds = std::mem::replace(&mut self.expires_queue, tail);
37		log::debug!("{:?} && {:?}", expireds, self.expires_queue);
38
39		let mut times = expireds.keys().cloned().collect::<Vec<SystemTime>>();
40		let mut out_keys = HashSet::new();
41		while let Some(time) = times.pop() {
42			if let Some(keys) = expireds.remove(&time) {
43				for key in keys {
44					out_keys.insert(key);
45				}
46			}
47		}
48		(pivot, out_keys)
49	}
50
51	pub fn expire_key_at(&mut self, key: &Key, timepoint: SystemTime) {
52		log::debug!("{:?}: will expired at {:?}", key, timepoint);
53
54		let keys = self.expires_queue.entry(timepoint).or_insert_with(||HashSet::new());
55		keys.insert(key.clone());
56	}
57}
58
59impl super::Storage {
60
61	pub async fn expire_key_at(&mut self, key: &Key, timepoint: SystemTime) {
62		let mut controller = self.expire_controller.lock().await;
63		controller.expire_key_at(key, timepoint);
64		drop(controller);
65		let awaker = self.expire_awaker.clone();
66		let mut awaker = awaker.lock().await;
67		if let Some(awaker) = &mut *awaker {
68			(*awaker)(timepoint);
69		}
70	}
71}
72