Skip to main content

rsigma_runtime/sources/
cache.rs

1//! Source resolution cache with in-memory and optional SQLite persistence.
2//!
3//! Stores last-known-good values so that `on_error: use_cached` can serve
4//! stale data when a source fetch fails. Supports optional TTL-based expiration.
5
6use std::collections::HashMap;
7use std::path::Path;
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10
11/// A cached entry with its stored timestamp.
12#[derive(Clone)]
13struct CacheEntry {
14    value: serde_json::Value,
15    stored_at: Instant,
16}
17
18/// Thread-safe cache for resolved source data.
19///
20/// Provides an in-memory layer with optional SQLite-backed disk persistence
21/// and optional TTL-based expiration.
22pub struct SourceCache {
23    entries: Mutex<HashMap<String, CacheEntry>>,
24    db: Option<Mutex<rusqlite::Connection>>,
25    ttl: Option<Duration>,
26}
27
28impl SourceCache {
29    /// Create a new in-memory-only cache (no TTL).
30    pub fn new() -> Self {
31        Self {
32            entries: Mutex::new(HashMap::new()),
33            db: None,
34            ttl: None,
35        }
36    }
37
38    /// Create a new in-memory-only cache with a TTL.
39    /// Entries older than `ttl` are considered expired and will not be returned.
40    pub fn with_ttl(ttl: Duration) -> Self {
41        Self {
42            entries: Mutex::new(HashMap::new()),
43            db: None,
44            ttl: Some(ttl),
45        }
46    }
47
48    /// Create a cache backed by a SQLite database at the given path.
49    ///
50    /// The table is created if it does not exist. Existing cached values
51    /// are loaded into memory on construction.
52    pub fn with_sqlite(path: &Path) -> Result<Self, String> {
53        Self::with_sqlite_and_ttl(path, None)
54    }
55
56    /// Create a SQLite-backed cache with an optional TTL.
57    pub fn with_sqlite_and_ttl(path: &Path, ttl: Option<Duration>) -> Result<Self, String> {
58        let conn = rusqlite::Connection::open(path)
59            .map_err(|e| format!("failed to open source cache DB: {e}"))?;
60
61        conn.execute_batch(
62            "CREATE TABLE IF NOT EXISTS source_cache (
63                source_id TEXT PRIMARY KEY,
64                value TEXT NOT NULL,
65                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
66            )",
67        )
68        .map_err(|e| format!("failed to create source_cache table: {e}"))?;
69
70        let entries = {
71            let mut map = HashMap::new();
72            let mut stmt = conn
73                .prepare("SELECT source_id, value FROM source_cache")
74                .map_err(|e| format!("failed to prepare SELECT: {e}"))?;
75            let rows = stmt
76                .query_map([], |row| {
77                    let id: String = row.get(0)?;
78                    let val: String = row.get(1)?;
79                    Ok((id, val))
80                })
81                .map_err(|e| format!("failed to query source_cache: {e}"))?;
82
83            for (id, val_str) in rows.flatten() {
84                if let Ok(val) = serde_json::from_str(&val_str) {
85                    map.insert(
86                        id,
87                        CacheEntry {
88                            value: val,
89                            stored_at: Instant::now(),
90                        },
91                    );
92                }
93            }
94            map
95        };
96
97        Ok(Self {
98            entries: Mutex::new(entries),
99            db: Some(Mutex::new(conn)),
100            ttl,
101        })
102    }
103
104    /// Store a resolved value in the cache (memory + disk if available).
105    pub fn store(&self, source_id: &str, value: &serde_json::Value) {
106        {
107            let mut entries = self.entries.lock().unwrap();
108            entries.insert(
109                source_id.to_string(),
110                CacheEntry {
111                    value: value.clone(),
112                    stored_at: Instant::now(),
113                },
114            );
115        }
116
117        if let Some(db) = &self.db {
118            let conn = db.lock().unwrap();
119            let val_str = serde_json::to_string(value).unwrap_or_default();
120            let _ = conn.execute(
121                "INSERT OR REPLACE INTO source_cache (source_id, value, updated_at) VALUES (?1, ?2, datetime('now'))",
122                rusqlite::params![source_id, val_str],
123            );
124        }
125    }
126
127    /// Retrieve a cached value for a source.
128    /// Returns `None` if no entry exists or if the entry has expired (when TTL is set).
129    pub fn get(&self, source_id: &str) -> Option<serde_json::Value> {
130        let entries = self.entries.lock().unwrap();
131        let entry = entries.get(source_id)?;
132
133        if let Some(ttl) = self.ttl
134            && entry.stored_at.elapsed() > ttl
135        {
136            return None;
137        }
138
139        Some(entry.value.clone())
140    }
141
142    /// Remove a cached entry (memory + disk).
143    pub fn invalidate(&self, source_id: &str) {
144        {
145            let mut entries = self.entries.lock().unwrap();
146            entries.remove(source_id);
147        }
148
149        if let Some(db) = &self.db {
150            let conn = db.lock().unwrap();
151            let _ = conn.execute(
152                "DELETE FROM source_cache WHERE source_id = ?1",
153                rusqlite::params![source_id],
154            );
155        }
156    }
157
158    /// Clear all cached entries (memory + disk).
159    pub fn clear(&self) {
160        {
161            let mut entries = self.entries.lock().unwrap();
162            entries.clear();
163        }
164
165        if let Some(db) = &self.db {
166            let conn = db.lock().unwrap();
167            let _ = conn.execute("DELETE FROM source_cache", []);
168        }
169    }
170
171    /// Remove all expired entries from the cache (memory + disk).
172    /// Only meaningful when a TTL is configured.
173    pub fn evict_expired(&self) {
174        let Some(ttl) = self.ttl else { return };
175
176        let expired_ids: Vec<String> = {
177            let entries = self.entries.lock().unwrap();
178            entries
179                .iter()
180                .filter(|(_, entry)| entry.stored_at.elapsed() > ttl)
181                .map(|(id, _)| id.clone())
182                .collect()
183        };
184
185        if expired_ids.is_empty() {
186            return;
187        }
188
189        {
190            let mut entries = self.entries.lock().unwrap();
191            for id in &expired_ids {
192                entries.remove(id);
193            }
194        }
195
196        if let Some(db) = &self.db {
197            let conn = db.lock().unwrap();
198            for id in &expired_ids {
199                let _ = conn.execute(
200                    "DELETE FROM source_cache WHERE source_id = ?1",
201                    rusqlite::params![id],
202                );
203            }
204        }
205    }
206
207    /// Returns the number of cached entries (including potentially expired ones).
208    pub fn len(&self) -> usize {
209        let entries = self.entries.lock().unwrap();
210        entries.len()
211    }
212
213    /// Returns true if the cache is empty.
214    pub fn is_empty(&self) -> bool {
215        self.len() == 0
216    }
217
218    /// Returns the configured TTL, if any.
219    pub fn ttl(&self) -> Option<Duration> {
220        self.ttl
221    }
222}
223
224impl Default for SourceCache {
225    fn default() -> Self {
226        Self::new()
227    }
228}