rsigma_runtime/sources/
cache.rs1use std::collections::HashMap;
7use std::path::Path;
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10
11#[derive(Clone)]
13struct CacheEntry {
14 value: serde_json::Value,
15 stored_at: Instant,
16}
17
18pub struct SourceCache {
23 entries: Mutex<HashMap<String, CacheEntry>>,
24 db: Option<Mutex<rusqlite::Connection>>,
25 ttl: Option<Duration>,
26}
27
28impl SourceCache {
29 pub fn new() -> Self {
31 Self {
32 entries: Mutex::new(HashMap::new()),
33 db: None,
34 ttl: None,
35 }
36 }
37
38 pub fn with_ttl(ttl: Duration) -> Self {
41 Self {
42 entries: Mutex::new(HashMap::new()),
43 db: None,
44 ttl: Some(ttl),
45 }
46 }
47
48 pub fn with_sqlite(path: &Path) -> Result<Self, String> {
53 Self::with_sqlite_and_ttl(path, None)
54 }
55
56 pub fn with_sqlite_and_ttl(path: &Path, ttl: Option<Duration>) -> Result<Self, String> {
58 let conn = rusqlite::Connection::open(path)
59 .map_err(|e| format!("failed to open source cache DB: {e}"))?;
60
61 conn.execute_batch(
62 "CREATE TABLE IF NOT EXISTS source_cache (
63 source_id TEXT PRIMARY KEY,
64 value TEXT NOT NULL,
65 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
66 )",
67 )
68 .map_err(|e| format!("failed to create source_cache table: {e}"))?;
69
70 let entries = {
71 let mut map = HashMap::new();
72 let mut stmt = conn
73 .prepare("SELECT source_id, value FROM source_cache")
74 .map_err(|e| format!("failed to prepare SELECT: {e}"))?;
75 let rows = stmt
76 .query_map([], |row| {
77 let id: String = row.get(0)?;
78 let val: String = row.get(1)?;
79 Ok((id, val))
80 })
81 .map_err(|e| format!("failed to query source_cache: {e}"))?;
82
83 for (id, val_str) in rows.flatten() {
84 if let Ok(val) = serde_json::from_str(&val_str) {
85 map.insert(
86 id,
87 CacheEntry {
88 value: val,
89 stored_at: Instant::now(),
90 },
91 );
92 }
93 }
94 map
95 };
96
97 Ok(Self {
98 entries: Mutex::new(entries),
99 db: Some(Mutex::new(conn)),
100 ttl,
101 })
102 }
103
104 pub fn store(&self, source_id: &str, value: &serde_json::Value) {
106 {
107 let mut entries = self.entries.lock().unwrap();
108 entries.insert(
109 source_id.to_string(),
110 CacheEntry {
111 value: value.clone(),
112 stored_at: Instant::now(),
113 },
114 );
115 }
116
117 if let Some(db) = &self.db {
118 let conn = db.lock().unwrap();
119 let val_str = serde_json::to_string(value).unwrap_or_default();
120 let _ = conn.execute(
121 "INSERT OR REPLACE INTO source_cache (source_id, value, updated_at) VALUES (?1, ?2, datetime('now'))",
122 rusqlite::params![source_id, val_str],
123 );
124 }
125 }
126
127 pub fn get(&self, source_id: &str) -> Option<serde_json::Value> {
130 let entries = self.entries.lock().unwrap();
131 let entry = entries.get(source_id)?;
132
133 if let Some(ttl) = self.ttl
134 && entry.stored_at.elapsed() > ttl
135 {
136 return None;
137 }
138
139 Some(entry.value.clone())
140 }
141
142 pub fn invalidate(&self, source_id: &str) {
144 {
145 let mut entries = self.entries.lock().unwrap();
146 entries.remove(source_id);
147 }
148
149 if let Some(db) = &self.db {
150 let conn = db.lock().unwrap();
151 let _ = conn.execute(
152 "DELETE FROM source_cache WHERE source_id = ?1",
153 rusqlite::params![source_id],
154 );
155 }
156 }
157
158 pub fn clear(&self) {
160 {
161 let mut entries = self.entries.lock().unwrap();
162 entries.clear();
163 }
164
165 if let Some(db) = &self.db {
166 let conn = db.lock().unwrap();
167 let _ = conn.execute("DELETE FROM source_cache", []);
168 }
169 }
170
171 pub fn evict_expired(&self) {
174 let Some(ttl) = self.ttl else { return };
175
176 let expired_ids: Vec<String> = {
177 let entries = self.entries.lock().unwrap();
178 entries
179 .iter()
180 .filter(|(_, entry)| entry.stored_at.elapsed() > ttl)
181 .map(|(id, _)| id.clone())
182 .collect()
183 };
184
185 if expired_ids.is_empty() {
186 return;
187 }
188
189 {
190 let mut entries = self.entries.lock().unwrap();
191 for id in &expired_ids {
192 entries.remove(id);
193 }
194 }
195
196 if let Some(db) = &self.db {
197 let conn = db.lock().unwrap();
198 for id in &expired_ids {
199 let _ = conn.execute(
200 "DELETE FROM source_cache WHERE source_id = ?1",
201 rusqlite::params![id],
202 );
203 }
204 }
205 }
206
207 pub fn len(&self) -> usize {
209 let entries = self.entries.lock().unwrap();
210 entries.len()
211 }
212
213 pub fn is_empty(&self) -> bool {
215 self.len() == 0
216 }
217
218 pub fn ttl(&self) -> Option<Duration> {
220 self.ttl
221 }
222}
223
224impl Default for SourceCache {
225 fn default() -> Self {
226 Self::new()
227 }
228}