1use crate::payload::{pack, unpack};
2use crate::{EResult, Error};
3use log::{error, trace};
4use serde::{de::DeserializeOwned, Serialize};
5use sqlx::{
6 sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteSynchronous},
7 ConnectOptions, Pool, Sqlite,
8};
9use std::str::FromStr;
10use std::time::Duration;
11use std::time::{Instant, SystemTime, UNIX_EPOCH};
12use tokio::task::JoinHandle;
13
14#[inline]
15fn now() -> Duration {
16 SystemTime::now()
17 .duration_since(UNIX_EPOCH)
18 .expect("time went backwards")
19}
20
21#[allow(clippy::module_name_repetitions)]
22pub struct TtlCache {
23 path: String,
24 ttl: Duration,
25 pool: Pool<Sqlite>,
26 fut_cleaner: JoinHandle<()>,
27}
28
29impl Drop for TtlCache {
30 fn drop(&mut self) {
31 self.fut_cleaner.abort();
32 }
33}
34
35const CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
36
37impl TtlCache {
38 #[allow(clippy::cast_possible_wrap)]
39 pub async fn create(
40 path: &str,
41 ttl: Duration,
42 timeout: Duration,
43 pool_size: u32,
44 ) -> EResult<Self> {
45 let mut connection_options = SqliteConnectOptions::from_str(&format!("sqlite://{path}"))?
46 .create_if_missing(true)
47 .synchronous(SqliteSynchronous::Extra)
48 .busy_timeout(timeout);
49 connection_options
50 .log_statements(log::LevelFilter::Trace)
51 .log_slow_statements(log::LevelFilter::Warn, Duration::from_secs(2));
52 let pool = SqlitePoolOptions::new()
53 .max_connections(pool_size)
54 .acquire_timeout(timeout)
55 .connect_with(connection_options)
56 .await?;
57 sqlx::query("CREATE TABLE IF NOT EXISTS kv(k VARCHAR(256), v BLOB, t INT, PRIMARY KEY(k))")
58 .execute(&pool)
59 .await?;
60 sqlx::query("CREATE INDEX IF NOT EXISTS kv_t ON kv(t)")
61 .execute(&pool)
62 .await?;
63 let p = pool.clone();
64 let db_path = path.to_owned();
65 let fut_cleaner = tokio::spawn(async move {
66 let mut next = Instant::now() + CLEANUP_INTERVAL;
67 loop {
68 trace!("cleaning up {} cache", db_path);
69 if let Err(e) = sqlx::query("DELETE FROM kv WHERE t < ?")
70 .bind((now() - ttl).as_secs() as i64)
71 .execute(&p)
72 .await
73 {
74 error!("cache {} error: {}", db_path, e);
75 }
76 let t = Instant::now();
77 if next > t {
78 tokio::time::sleep(next - t).await;
79 }
80 next += CLEANUP_INTERVAL;
81 }
82 });
83 Ok(Self {
84 path: path.to_owned(),
85 ttl,
86 pool,
87 fut_cleaner,
88 })
89 }
90 #[allow(clippy::cast_possible_wrap)]
91 pub async fn set<V: Serialize>(&self, key: &str, value: &V) -> EResult<()> {
92 trace!("setting {} key {}", self.path, key);
93 if key.len() > 256 {
94 return Err(Error::invalid_data("key too long"));
95 }
96 sqlx::query("INSERT OR REPLACE INTO kv (k, v, t) VALUES (?, ?, ?)")
97 .bind(key)
98 .bind(pack(value)?)
99 .bind(now().as_secs() as i64)
100 .execute(&self.pool)
101 .await?;
102 Ok(())
103 }
104 pub async fn get<V: DeserializeOwned>(&self, key: &str) -> EResult<Option<V>> {
105 trace!("getting {} key {}", self.path, key);
106 let val: Option<(Vec<u8>,)> = sqlx::query_as("SELECT v FROM kv WHERE k = ? AND t > ?")
107 .bind(key)
108 .bind((now() - self.ttl).as_secs_f64())
109 .fetch_optional(&self.pool)
110 .await?;
111 if let Some(v) = val {
112 Ok(Some(unpack(&v.0)?))
113 } else {
114 Ok(None)
115 }
116 }
117 pub async fn delete(&self, key: &str) -> EResult<()> {
118 trace!("deleting {} key {}", self.path, key);
119 sqlx::query("DELETE FROM kv WHERE k = ?")
120 .bind(key)
121 .execute(&self.pool)
122 .await?;
123 Ok(())
124 }
125 pub async fn purge(&self) -> EResult<()> {
126 trace!("deleting all keys in {}", self.path);
127 sqlx::query("DELETE FROM kv").execute(&self.pool).await?;
128 Ok(())
129 }
130}