1use crate::payload::{pack, unpack};
2use crate::{EResult, Error};
3use log::{error, trace};
4use serde::{Serialize, de::DeserializeOwned};
5use sqlx::{
6 ConnectOptions, Pool, Sqlite,
7 sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteSynchronous},
8};
9use std::str::FromStr;
10use std::time::Duration;
11use std::time::{Instant, SystemTime, UNIX_EPOCH};
12use tokio::task::JoinHandle;
13
/// Returns the current wall-clock time as the duration elapsed since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
#[inline]
fn now() -> Duration {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.expect("time went backwards")
}
20
/// Key-value cache stored in a SQLite database whose entries expire after a fixed
/// time-to-live.
///
/// A background task spawned by `create` periodically deletes expired rows; the task
/// is aborted when the cache is dropped.
#[allow(clippy::module_name_repetitions)]
pub struct TtlCache {
    /// Database path as passed to `create`; used to label log messages.
    path: String,
    /// How long an entry stays readable after it has been written.
    ttl: Duration,
    /// Connection pool used by every cache operation.
    pool: Pool<Sqlite>,
    /// Handle of the background cleanup task.
    fut_cleaner: JoinHandle<()>,
}
28
29impl Drop for TtlCache {
30 fn drop(&mut self) {
31 self.fut_cleaner.abort();
32 }
33}
34
/// Interval between two consecutive runs of the background cleanup task.
const CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
36
37impl TtlCache {
38 #[allow(clippy::cast_possible_wrap)]
39 pub async fn create(
40 path: &str,
41 ttl: Duration,
42 timeout: Duration,
43 pool_size: u32,
44 ) -> EResult<Self> {
45 let connection_options = SqliteConnectOptions::from_str(&format!("sqlite://{path}"))?
46 .create_if_missing(true)
47 .synchronous(SqliteSynchronous::Extra)
48 .busy_timeout(timeout)
49 .log_statements(log::LevelFilter::Trace)
50 .log_slow_statements(log::LevelFilter::Warn, Duration::from_secs(2));
51 let pool = SqlitePoolOptions::new()
52 .max_connections(pool_size)
53 .acquire_timeout(timeout)
54 .connect_with(connection_options)
55 .await?;
56 sqlx::query("CREATE TABLE IF NOT EXISTS kv(k VARCHAR(256), v BLOB, t INT, PRIMARY KEY(k))")
57 .execute(&pool)
58 .await?;
59 sqlx::query("CREATE INDEX IF NOT EXISTS kv_t ON kv(t)")
60 .execute(&pool)
61 .await?;
62 let p = pool.clone();
63 let db_path = path.to_owned();
64 let fut_cleaner = tokio::spawn(async move {
65 let mut next = Instant::now() + CLEANUP_INTERVAL;
66 loop {
67 trace!("cleaning up {} cache", db_path);
68 if let Err(e) = sqlx::query("DELETE FROM kv WHERE t < ?")
69 .bind((now() - ttl).as_secs() as i64)
70 .execute(&p)
71 .await
72 {
73 error!("cache {} error: {}", db_path, e);
74 }
75 let t = Instant::now();
76 if next > t {
77 tokio::time::sleep(next - t).await;
78 }
79 next += CLEANUP_INTERVAL;
80 }
81 });
82 Ok(Self {
83 path: path.to_owned(),
84 ttl,
85 pool,
86 fut_cleaner,
87 })
88 }
89 #[allow(clippy::cast_possible_wrap)]
90 pub async fn set<V: Serialize>(&self, key: &str, value: &V) -> EResult<()> {
91 trace!("setting {} key {}", self.path, key);
92 if key.len() > 256 {
93 return Err(Error::invalid_data("key too long"));
94 }
95 sqlx::query("INSERT OR REPLACE INTO kv (k, v, t) VALUES (?, ?, ?)")
96 .bind(key)
97 .bind(pack(value)?)
98 .bind(now().as_secs() as i64)
99 .execute(&self.pool)
100 .await?;
101 Ok(())
102 }
103 pub async fn get<V: DeserializeOwned>(&self, key: &str) -> EResult<Option<V>> {
104 trace!("getting {} key {}", self.path, key);
105 let val: Option<(Vec<u8>,)> = sqlx::query_as("SELECT v FROM kv WHERE k = ? AND t > ?")
106 .bind(key)
107 .bind((now() - self.ttl).as_secs_f64())
108 .fetch_optional(&self.pool)
109 .await?;
110 if let Some(v) = val {
111 Ok(Some(unpack(&v.0)?))
112 } else {
113 Ok(None)
114 }
115 }
116 pub async fn delete(&self, key: &str) -> EResult<()> {
117 trace!("deleting {} key {}", self.path, key);
118 sqlx::query("DELETE FROM kv WHERE k = ?")
119 .bind(key)
120 .execute(&self.pool)
121 .await?;
122 Ok(())
123 }
124 pub async fn purge(&self) -> EResult<()> {
125 trace!("deleting all keys in {}", self.path);
126 sqlx::query("DELETE FROM kv").execute(&self.pool).await?;
127 Ok(())
128 }
129}