hyprstream_core/storage/
cache.rs

1use std::sync::Arc;
2use std::time::{SystemTime, Duration, UNIX_EPOCH};
3use tokio::sync::RwLock;
4use tonic::Status;
5
6/// Shared cache eviction manager that can be used across different storage backends.
7#[derive(Clone)]
8pub struct CacheManager {
9    ttl: Option<u64>,
10    last_eviction: Arc<RwLock<SystemTime>>,
11    min_eviction_interval: Duration,
12}
13
14impl CacheManager {
15    /// Creates a new cache manager with the specified TTL.
16    pub fn new(ttl: Option<u64>) -> Self {
17        Self {
18            ttl,
19            last_eviction: Arc::new(RwLock::new(SystemTime::now())),
20            min_eviction_interval: Duration::from_secs(60), // Default 60s between evictions
21        }
22    }
23
24    /// Sets a custom minimum interval between evictions.
25    pub fn set_min_eviction_interval(&mut self, interval: Duration) {
26        self.min_eviction_interval = interval;
27    }
28
29    /// Checks if eviction should be performed based on TTL and rate limiting.
30    /// Returns the cutoff timestamp if eviction should proceed, None otherwise.
31    pub async fn should_evict(&self) -> Result<Option<i64>, Status> {
32        match self.ttl {
33            None | Some(0) => Ok(None), // No TTL or TTL=0 means no eviction
34            Some(ttl) => {
35                let now = SystemTime::now();
36                let last = *self.last_eviction.read().await;
37                
38                // Check if enough time has passed since last eviction
39                if now.duration_since(last).unwrap_or(Duration::from_secs(0)) < self.min_eviction_interval {
40                    return Ok(None);
41                }
42
43                // Calculate cutoff timestamp
44                let cutoff = now
45                    .duration_since(UNIX_EPOCH)
46                    .map_err(|e| Status::internal(e.to_string()))?
47                    .as_secs() as i64
48                    - ttl as i64;
49
50                // Update last eviction time
51                *self.last_eviction.write().await = now;
52
53                Ok(Some(cutoff))
54            }
55        }
56    }
57
58    /// Generates an optimized SQL query for evicting expired entries.
59    pub fn eviction_query(&self, cutoff: i64) -> String {
60        format!(
61            "DELETE FROM metrics USING (
62                SELECT timestamp 
63                FROM metrics 
64                WHERE timestamp < {} 
65                LIMIT 10000
66            ) as expired 
67            WHERE metrics.timestamp = expired.timestamp",
68            cutoff
69        )
70    }
71}
72
73/// Trait for storage backends that support cache eviction.
74#[async_trait::async_trait]
75pub trait CacheEviction {
76    /// Executes the eviction query in the background.
77    async fn execute_eviction(&self, query: &str) -> Result<(), Status>;
78}