use std::collections::{BinaryHeap, HashMap};
use std::sync::{Arc, mpsc};
use std::thread;
use std::time::Duration;
use crate::cache_manager_config::CacheManagerConfig;
use crate::cache_task::CacheTask;
use crate::db_cache::CacheInvalidator;
/// Collects [`CacheTask`]s from an mpsc channel, orders them in a heap and
/// runs the registered [`CacheInvalidator`] for each task once it is expired.
pub struct CacheManager {
    /// Registered invalidators, keyed by the id returned from `cache_id()`.
    invalidators: HashMap<&'static str, Arc<dyn CacheInvalidator>>,
    /// Tasks received from the channel that have not been processed yet.
    priority_heap: BinaryHeap<CacheTask>,
    /// Source of the receive timeouts and of the maximum batch size.
    config: CacheManagerConfig,
    /// Receiving half of the task channel, polled by the worker thread.
    rx: mpsc::Receiver<CacheTask>,
    /// Sending half of the task channel; cloned by `sender()`.
    tx: mpsc::Sender<CacheTask>,
}
impl CacheManager {
pub fn new(config: CacheManagerConfig) -> Self {
let (tx, rx) = mpsc::channel::<CacheTask>();
Self { invalidators: HashMap::default(), priority_heap: Default::default(), config, rx, tx }
}
pub fn register<T>(&mut self, invalidator: Arc<T>)
where
T: CacheInvalidator + 'static,
{
if self.invalidators.contains_key(invalidator.cache_id()) {
panic!("#{} cache currently registered!", invalidator.cache_id());
}
self.invalidators.insert(invalidator.cache_id(), invalidator);
}
pub fn sender(&self) -> mpsc::Sender<CacheTask> {
self.tx.clone()
}
pub fn start(mut self) {
thread::spawn(move || {
let max_pending_ms_await = Duration::from_millis(self.config.max_pending_ms_await());
let max_pending_bulk_ms_await = Duration::from_millis(self.config.max_pending_bulk_ms_await());
let mut tasks_pushed = 0;
loop {
tasks_pushed = 0;
if let Ok(task) = self.rx.recv_timeout(max_pending_ms_await) {
self.priority_heap.push(task);
tasks_pushed += 1;
while tasks_pushed < self.config.max_task_drain_size() {
match self.rx.recv_timeout(max_pending_bulk_ms_await) {
Ok(task) => {
self.priority_heap.push(task);
tasks_pushed += 1;
}
Err(_) => break,
}
}
}
loop {
match self.priority_heap.peek() {
Some(val) if !val.is_expired() => break,
Some(_) => {
if let Some(CacheTask::INVALIDATION { cache_id, key, .. }) = self.priority_heap.pop() {
self.invalidators.get(cache_id)
.expect("Invalidator found")
.invalidate(key);
}
}
None => break,
}
}
}
});
}
}