autocache 0.2.1

automatic cache management
Documentation
use std::sync::Arc;

use anyhow::{bail, Result};
use arc_swap::ArcSwapOption;
use chrono::prelude::*;
use futures::future::BoxFuture;

use crate::{cache::Cache, entry::EntryTrait};

#[derive(Clone)]
struct CacheItem<V> {
    time_to_remove_ms: Option<i64>,
    value: V,
}

impl<V> CacheItem<V> {
    fn need_to_remove(&self) -> bool {
        if self.time_to_remove_ms.is_none() {
            return false;
        }

        Utc.timestamp_millis_opt(self.time_to_remove_ms.unwrap())
            .unwrap()
            < chrono::Utc::now()
    }
}

pub struct TtlCache<K, V> {
    data: Arc<parking_lot::RwLock<im::OrdMap<K, CacheItem<V>>>>,

    ttl: Option<std::time::Duration>,
    expire_listener:
        ArcSwapOption<Box<dyn Fn(Vec<(K, V)>) -> BoxFuture<'static, ()> + Send + Sync>>,

    stop_notifier: ArcSwapOption<tokio::sync::Notify>,
}

impl<K, V> TtlCache<K, V> {
    pub fn new(ttl: Option<std::time::Duration>) -> Self {
        Self {
            data: Arc::new(parking_lot::RwLock::new(im::OrdMap::new())),
            ttl,
            expire_listener: None.into(),

            stop_notifier: None.into(),
        }
    }

    pub fn new_with_expire_listener(
        ttl: Option<std::time::Duration>,
        listener: impl Fn(Vec<(K, V)>) -> BoxFuture<'static, ()> + Send + Sync + 'static,
    ) -> Self {
        Self {
            data: Arc::new(parking_lot::RwLock::new(im::OrdMap::new())),
            ttl,
            expire_listener: ArcSwapOption::new(Some(Arc::new(Box::new(listener)))),

            stop_notifier: None.into(),
        }
    }

    pub fn set_expire_listener(
        &self,
        listener: impl Fn(Vec<(K, V)>) -> BoxFuture<'static, ()> + Send + Sync + 'static,
    ) -> Result<()> {
        if self.stop_notifier.load().is_some() {
            bail!("expire listener already set");
        }

        self.expire_listener
            .store(Some(Arc::new(Box::new(listener))));

        Ok(())
    }
}

impl<K, V> Cache for TtlCache<K, V>
where
    K: Ord + Sync + Send + Clone,
    V: Clone + Sync + Send,
{
    type Key = K;
    type Value = V;

    async fn mget(&self, keys: &[Self::Key]) -> Result<Vec<Self::Value>> {
        Ok(keys
            .iter()
            .filter_map(|key| self.data.read().get(key).cloned().map(|c| c.value))
            .collect::<Vec<_>>())
    }

    async fn mset(&self, kvs: &[(Self::Key, Self::Value)]) -> Result<()> {
        for kv in kvs.into_iter() {
            self.data.write().insert(
                kv.0.clone(),
                CacheItem {
                    time_to_remove_ms: self.ttl.map(|ttl| {
                        (chrono::Utc::now() + chrono::Duration::from_std(ttl).unwrap())
                            .timestamp_millis()
                    }),
                    value: kv.1.clone(),
                },
            );
        }

        Ok(())
    }

    async fn mdel(&self, keys: &[Self::Key]) -> Result<()> {
        keys.iter().for_each(|key| {
            self.data.write().remove(key);
        });

        Ok(())
    }

    fn name(&self) -> &'static str {
        "ttlcache"
    }
}

impl<K, V> TtlCache<K, V>
where
    K: Ord + Sync + Send + Clone + 'static,
    V: Clone + Sync + Send + EntryTrait<K> + 'static,
{
    async fn check_expires(
        cache: Arc<parking_lot::RwLock<im::OrdMap<K, CacheItem<V>>>>,
        expire_listener: Arc<Box<dyn Fn(Vec<(K, V)>) -> BoxFuture<'static, ()> + Send + Sync>>,
    ) {
        let cache_snap = cache.read().clone();

        let mut expires = Vec::with_capacity(128);

        for (key, ci) in cache_snap.iter() {
            if ci.value.is_expired() {
                expires.push((key.clone(), ci.value.clone()));

                if expires.len() == 100 {
                    expire_listener(expires.clone()).await;
                    expires.clear();
                }
            }
        }

        if !expires.is_empty() {
            expire_listener(expires.clone()).await;
        }
    }

    fn cleanup_ttl(cache: Arc<parking_lot::RwLock<im::OrdMap<K, CacheItem<V>>>>) {
        let cache_snap = cache.read().clone();

        let keys_to_remove = cache_snap
            .iter()
            .filter_map(|(key, ci)| {
                if ci.need_to_remove() {
                    Some(key.clone())
                } else {
                    None
                }
            })
            .take(100)
            .collect::<Vec<_>>();

        for key in keys_to_remove.iter() {
            cache.write().remove(key);
        }
    }

    pub fn start(&self) -> Result<()> {
        if self.stop_notifier.load().is_some() {
            return Ok(());
        }

        let Some(listener) = self.expire_listener.load().clone() else {
            bail!("expire listener is none");
        };

        let notifier = Arc::new(tokio::sync::Notify::new());
        self.stop_notifier.store(Some(notifier.clone()));

        let mut ticker = tokio::time::interval(std::time::Duration::from_secs(10));
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        let cache = self.data.clone();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = ticker.tick()=> {
                        Self::cleanup_ttl(cache.clone());
                        Self::check_expires(cache.clone(), listener.clone()).await;
                    }

                    _ = notifier.notified() => {
                        return;
                    }
                }
            }
        });

        Ok(())
    }

    pub fn stop(&self) -> Result<()> {
        if let Some(s) = self.stop_notifier.load().as_ref() {
            s.notify_one();
        }

        Ok(())
    }
}