lib/
cache_manager.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use std::collections::{BinaryHeap, HashMap};
use std::sync::{Arc, mpsc};
use std::thread;
use std::time::Duration;

use crate::cache_manager_config::CacheManagerConfig;
use crate::cache_task::CacheTask;
use crate::db_cache::CacheInvalidator;

/// Coordinates cache invalidation across the caches registered with it.
///
/// Tasks are received over an `mpsc` channel, buffered in a priority heap and
/// dispatched to the matching invalidator by the worker thread spawned in
/// `start` once they report themselves as expired.
pub struct CacheManager {
    /// Registered invalidators, keyed by the value their `cache_id()` returns.
    invalidators: HashMap<&'static str, Arc<dyn CacheInvalidator>>,
    /// Pending tasks; the greatest task according to `CacheTask`'s `Ord` is on top.
    // NOTE(review): presumably `Ord` puts the soonest-expiring task on top — confirm in `CacheTask`.
    priority_heap: BinaryHeap<CacheTask>,
    /// Timeouts (used as milliseconds) and the drain-size limit for the worker loop.
    config: CacheManagerConfig,
    /// Receiving half of the task channel, read by the worker thread.
    rx: mpsc::Receiver<CacheTask>,
    /// Sending half of the task channel; cloned out to producers by `sender`.
    tx: mpsc::Sender<CacheTask>,
}

impl CacheManager {
    pub fn new(config: CacheManagerConfig) -> Self {
        let (tx, rx) = mpsc::channel::<CacheTask>();

        Self { invalidators: HashMap::default(), priority_heap: Default::default(), config, rx, tx }
    }
    pub fn register<T>(&mut self, invalidator: Arc<T>)
    where
        T: CacheInvalidator + 'static,
    {
        if self.invalidators.contains_key(invalidator.cache_id()) {
            panic!("#{} cache currently registered!", invalidator.cache_id());
        }
        self.invalidators.insert(invalidator.cache_id(), invalidator);
    }

    pub fn sender(&self) -> mpsc::Sender<CacheTask> {
        self.tx.clone()
    }

    pub fn start(mut self) {
        thread::spawn(move || {
            let max_pending_ms_await = Duration::from_millis(self.config.max_pending_ms_await());
            let max_pending_bulk_ms_await = Duration::from_millis(self.config.max_pending_bulk_ms_await());
            let mut tasks_pushed = 0;

            loop {
                tasks_pushed = 0;
                if let Ok(task) = self.rx.recv_timeout(max_pending_ms_await) {
                    self.priority_heap.push(task);
                    tasks_pushed += 1;

                    while tasks_pushed < self.config.max_task_drain_size() {
                        match self.rx.recv_timeout(max_pending_bulk_ms_await) {
                            Ok(task) => {
                                self.priority_heap.push(task);
                                tasks_pushed += 1;
                            }
                            Err(_) => break,
                        }
                    }
                }

                loop {
                    match self.priority_heap.peek() {
                        Some(val) if !val.is_expired() => break,
                        Some(_) => {
                            if let Some(CacheTask::INVALIDATION { cache_id, key, .. }) = self.priority_heap.pop() {
                                self.invalidators.get(cache_id)
                                    .expect("Invalidator found")
                                    .invalidate(key);
                            }
                        }
                        None => break,
                    }
                }
            }
        });
    }
}