hyprstream_core/storage/
cache.rs1use std::sync::Arc;
2use std::time::{SystemTime, Duration, UNIX_EPOCH};
3use tokio::sync::RwLock;
4use tonic::Status;
5
6#[derive(Clone)]
8pub struct CacheManager {
9 ttl: Option<u64>,
10 last_eviction: Arc<RwLock<SystemTime>>,
11 min_eviction_interval: Duration,
12}
13
14impl CacheManager {
15 pub fn new(ttl: Option<u64>) -> Self {
17 Self {
18 ttl,
19 last_eviction: Arc::new(RwLock::new(SystemTime::now())),
20 min_eviction_interval: Duration::from_secs(60), }
22 }
23
24 pub fn set_min_eviction_interval(&mut self, interval: Duration) {
26 self.min_eviction_interval = interval;
27 }
28
29 pub async fn should_evict(&self) -> Result<Option<i64>, Status> {
32 match self.ttl {
33 None | Some(0) => Ok(None), Some(ttl) => {
35 let now = SystemTime::now();
36 let last = *self.last_eviction.read().await;
37
38 if now.duration_since(last).unwrap_or(Duration::from_secs(0)) < self.min_eviction_interval {
40 return Ok(None);
41 }
42
43 let cutoff = now
45 .duration_since(UNIX_EPOCH)
46 .map_err(|e| Status::internal(e.to_string()))?
47 .as_secs() as i64
48 - ttl as i64;
49
50 *self.last_eviction.write().await = now;
52
53 Ok(Some(cutoff))
54 }
55 }
56 }
57
58 pub fn eviction_query(&self, cutoff: i64) -> String {
60 format!(
61 "DELETE FROM metrics USING (
62 SELECT timestamp
63 FROM metrics
64 WHERE timestamp < {}
65 LIMIT 10000
66 ) as expired
67 WHERE metrics.timestamp = expired.timestamp",
68 cutoff
69 )
70 }
71}
72
73#[async_trait::async_trait]
75pub trait CacheEviction {
76 async fn execute_eviction(&self, query: &str) -> Result<(), Status>;
78}