eva_common/
cache.rs

1use crate::payload::{pack, unpack};
2use crate::{EResult, Error};
3use log::{error, trace};
4use serde::{Serialize, de::DeserializeOwned};
5use sqlx::{
6    ConnectOptions, Pool, Sqlite,
7    sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteSynchronous},
8};
9use std::str::FromStr;
10use std::time::Duration;
11use std::time::{Instant, SystemTime, UNIX_EPOCH};
12use tokio::task::JoinHandle;
13
14#[inline]
15fn now() -> Duration {
16    SystemTime::now()
17        .duration_since(UNIX_EPOCH)
18        .expect("time went backwards")
19}
20
21#[allow(clippy::module_name_repetitions)]
22pub struct TtlCache {
23    path: String,
24    ttl: Duration,
25    pool: Pool<Sqlite>,
26    fut_cleaner: JoinHandle<()>,
27}
28
29impl Drop for TtlCache {
30    fn drop(&mut self) {
31        self.fut_cleaner.abort();
32    }
33}
34
35const CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
36
37impl TtlCache {
38    #[allow(clippy::cast_possible_wrap)]
39    pub async fn create(
40        path: &str,
41        ttl: Duration,
42        timeout: Duration,
43        pool_size: u32,
44    ) -> EResult<Self> {
45        let connection_options = SqliteConnectOptions::from_str(&format!("sqlite://{path}"))?
46            .create_if_missing(true)
47            .synchronous(SqliteSynchronous::Extra)
48            .busy_timeout(timeout)
49            .log_statements(log::LevelFilter::Trace)
50            .log_slow_statements(log::LevelFilter::Warn, Duration::from_secs(2));
51        let pool = SqlitePoolOptions::new()
52            .max_connections(pool_size)
53            .acquire_timeout(timeout)
54            .connect_with(connection_options)
55            .await?;
56        sqlx::query("CREATE TABLE IF NOT EXISTS kv(k VARCHAR(256), v BLOB, t INT, PRIMARY KEY(k))")
57            .execute(&pool)
58            .await?;
59        sqlx::query("CREATE INDEX IF NOT EXISTS kv_t ON kv(t)")
60            .execute(&pool)
61            .await?;
62        let p = pool.clone();
63        let db_path = path.to_owned();
64        let fut_cleaner = tokio::spawn(async move {
65            let mut next = Instant::now() + CLEANUP_INTERVAL;
66            loop {
67                trace!("cleaning up {} cache", db_path);
68                if let Err(e) = sqlx::query("DELETE FROM kv WHERE t < ?")
69                    .bind((now() - ttl).as_secs() as i64)
70                    .execute(&p)
71                    .await
72                {
73                    error!("cache {} error: {}", db_path, e);
74                }
75                let t = Instant::now();
76                if next > t {
77                    tokio::time::sleep(next - t).await;
78                }
79                next += CLEANUP_INTERVAL;
80            }
81        });
82        Ok(Self {
83            path: path.to_owned(),
84            ttl,
85            pool,
86            fut_cleaner,
87        })
88    }
89    #[allow(clippy::cast_possible_wrap)]
90    pub async fn set<V: Serialize>(&self, key: &str, value: &V) -> EResult<()> {
91        trace!("setting {} key {}", self.path, key);
92        if key.len() > 256 {
93            return Err(Error::invalid_data("key too long"));
94        }
95        sqlx::query("INSERT OR REPLACE INTO kv (k, v, t) VALUES (?, ?, ?)")
96            .bind(key)
97            .bind(pack(value)?)
98            .bind(now().as_secs() as i64)
99            .execute(&self.pool)
100            .await?;
101        Ok(())
102    }
103    pub async fn get<V: DeserializeOwned>(&self, key: &str) -> EResult<Option<V>> {
104        trace!("getting {} key {}", self.path, key);
105        let val: Option<(Vec<u8>,)> = sqlx::query_as("SELECT v FROM kv WHERE k = ? AND t > ?")
106            .bind(key)
107            .bind((now() - self.ttl).as_secs_f64())
108            .fetch_optional(&self.pool)
109            .await?;
110        if let Some(v) = val {
111            Ok(Some(unpack(&v.0)?))
112        } else {
113            Ok(None)
114        }
115    }
116    pub async fn delete(&self, key: &str) -> EResult<()> {
117        trace!("deleting {} key {}", self.path, key);
118        sqlx::query("DELETE FROM kv WHERE k = ?")
119            .bind(key)
120            .execute(&self.pool)
121            .await?;
122        Ok(())
123    }
124    pub async fn purge(&self) -> EResult<()> {
125        trace!("deleting all keys in {}", self.path);
126        sqlx::query("DELETE FROM kv").execute(&self.pool).await?;
127        Ok(())
128    }
129}