eva_common/
cache.rs

1use crate::payload::{pack, unpack};
2use crate::{EResult, Error};
3use log::{error, trace};
4use serde::{de::DeserializeOwned, Serialize};
5use sqlx::{
6    sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteSynchronous},
7    ConnectOptions, Pool, Sqlite,
8};
9use std::str::FromStr;
10use std::time::Duration;
11use std::time::{Instant, SystemTime, UNIX_EPOCH};
12use tokio::task::JoinHandle;
13
14#[inline]
15fn now() -> Duration {
16    SystemTime::now()
17        .duration_since(UNIX_EPOCH)
18        .expect("time went backwards")
19}
20
21#[allow(clippy::module_name_repetitions)]
22pub struct TtlCache {
23    path: String,
24    ttl: Duration,
25    pool: Pool<Sqlite>,
26    fut_cleaner: JoinHandle<()>,
27}
28
29impl Drop for TtlCache {
30    fn drop(&mut self) {
31        self.fut_cleaner.abort();
32    }
33}
34
35const CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
36
37impl TtlCache {
38    #[allow(clippy::cast_possible_wrap)]
39    pub async fn create(
40        path: &str,
41        ttl: Duration,
42        timeout: Duration,
43        pool_size: u32,
44    ) -> EResult<Self> {
45        let mut connection_options = SqliteConnectOptions::from_str(&format!("sqlite://{path}"))?
46            .create_if_missing(true)
47            .synchronous(SqliteSynchronous::Extra)
48            .busy_timeout(timeout);
49        connection_options
50            .log_statements(log::LevelFilter::Trace)
51            .log_slow_statements(log::LevelFilter::Warn, Duration::from_secs(2));
52        let pool = SqlitePoolOptions::new()
53            .max_connections(pool_size)
54            .acquire_timeout(timeout)
55            .connect_with(connection_options)
56            .await?;
57        sqlx::query("CREATE TABLE IF NOT EXISTS kv(k VARCHAR(256), v BLOB, t INT, PRIMARY KEY(k))")
58            .execute(&pool)
59            .await?;
60        sqlx::query("CREATE INDEX IF NOT EXISTS kv_t ON kv(t)")
61            .execute(&pool)
62            .await?;
63        let p = pool.clone();
64        let db_path = path.to_owned();
65        let fut_cleaner = tokio::spawn(async move {
66            let mut next = Instant::now() + CLEANUP_INTERVAL;
67            loop {
68                trace!("cleaning up {} cache", db_path);
69                if let Err(e) = sqlx::query("DELETE FROM kv WHERE t < ?")
70                    .bind((now() - ttl).as_secs() as i64)
71                    .execute(&p)
72                    .await
73                {
74                    error!("cache {} error: {}", db_path, e);
75                }
76                let t = Instant::now();
77                if next > t {
78                    tokio::time::sleep(next - t).await;
79                }
80                next += CLEANUP_INTERVAL;
81            }
82        });
83        Ok(Self {
84            path: path.to_owned(),
85            ttl,
86            pool,
87            fut_cleaner,
88        })
89    }
90    #[allow(clippy::cast_possible_wrap)]
91    pub async fn set<V: Serialize>(&self, key: &str, value: &V) -> EResult<()> {
92        trace!("setting {} key {}", self.path, key);
93        if key.len() > 256 {
94            return Err(Error::invalid_data("key too long"));
95        }
96        sqlx::query("INSERT OR REPLACE INTO kv (k, v, t) VALUES (?, ?, ?)")
97            .bind(key)
98            .bind(pack(value)?)
99            .bind(now().as_secs() as i64)
100            .execute(&self.pool)
101            .await?;
102        Ok(())
103    }
104    pub async fn get<V: DeserializeOwned>(&self, key: &str) -> EResult<Option<V>> {
105        trace!("getting {} key {}", self.path, key);
106        let val: Option<(Vec<u8>,)> = sqlx::query_as("SELECT v FROM kv WHERE k = ? AND t > ?")
107            .bind(key)
108            .bind((now() - self.ttl).as_secs_f64())
109            .fetch_optional(&self.pool)
110            .await?;
111        if let Some(v) = val {
112            Ok(Some(unpack(&v.0)?))
113        } else {
114            Ok(None)
115        }
116    }
117    pub async fn delete(&self, key: &str) -> EResult<()> {
118        trace!("deleting {} key {}", self.path, key);
119        sqlx::query("DELETE FROM kv WHERE k = ?")
120            .bind(key)
121            .execute(&self.pool)
122            .await?;
123        Ok(())
124    }
125    pub async fn purge(&self) -> EResult<()> {
126        trace!("deleting all keys in {}", self.path);
127        sqlx::query("DELETE FROM kv").execute(&self.pool).await?;
128        Ok(())
129    }
130}