1use std::path::{Path, PathBuf};
28use std::sync::Arc;
29
30use async_trait::async_trait;
31use ciborium::Value as CborValue;
32use indexmap::IndexMap;
33use redb::{Database, ReadableTable, ReadableTableMetadata, TableDefinition};
34use std::time::SystemTime;
35use vantage_core::{Result, error};
36use vantage_types::Record;
37
38use super::{Cache, CachedRows};
39
40const CACHE_FILE: &str = "vlive.redb";
41const TABLE_PREFIX: &str = "__vlive__";
42
43#[derive(Clone)]
46pub struct RedbCache {
47 db: Arc<Database>,
48 folder: PathBuf,
49}
50
51impl std::fmt::Debug for RedbCache {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 f.debug_struct("RedbCache")
54 .field("folder", &self.folder)
55 .finish()
56 }
57}
58
59impl RedbCache {
60 pub fn open(folder: impl AsRef<Path>) -> Result<Self> {
66 let folder = folder.as_ref().to_path_buf();
67 std::fs::create_dir_all(&folder)
68 .map_err(|e| error!("Failed to create cache folder", details = e.to_string()))?;
69 let path = folder.join(CACHE_FILE);
70 let db = Database::create(&path)
71 .map_err(|e| error!("Failed to open cache redb", details = e.to_string()))?;
72 Ok(Self {
73 db: Arc::new(db),
74 folder,
75 })
76 }
77
78 pub fn folder(&self) -> &Path {
81 &self.folder
82 }
83}
84
85fn split_key(key: &str) -> (&str, String) {
90 match key.find('/') {
91 Some(i) => (&key[..i], key[i + 1..].to_string()),
92 None => (key, String::from("__root__")),
93 }
94}
95
96fn redb_table_name(root: &str) -> String {
97 format!("{}{}", TABLE_PREFIX, root)
98}
99
100fn cache_table_def(name: &str) -> TableDefinition<'_, &'static str, &'static [u8]> {
101 TableDefinition::new(name)
102}
103
104fn encode_rows(rows: &CachedRows) -> Result<Vec<u8>> {
105 let secs = rows
109 .fetched_at
110 .duration_since(SystemTime::UNIX_EPOCH)
111 .map(|d| d.as_secs() as i64)
112 .unwrap_or(0);
113
114 let row_pairs: Vec<(CborValue, CborValue)> = rows
115 .rows
116 .iter()
117 .map(|(id, rec)| {
118 let entries: Vec<(CborValue, CborValue)> = rec
119 .iter()
120 .map(|(k, v)| (CborValue::Text(k.clone()), v.clone()))
121 .collect();
122 (CborValue::Text(id.clone()), CborValue::Map(entries))
123 })
124 .collect();
125
126 let envelope = CborValue::Map(vec![
127 (
128 CborValue::Text("fetched_at".into()),
129 CborValue::Integer(secs.into()),
130 ),
131 (CborValue::Text("rows".into()), CborValue::Map(row_pairs)),
132 ]);
133
134 let mut bytes = Vec::new();
135 ciborium::ser::into_writer(&envelope, &mut bytes)
136 .map_err(|e| error!("CBOR encode failed", details = e.to_string()))?;
137 Ok(bytes)
138}
139
140fn decode_rows(bytes: &[u8]) -> Result<CachedRows> {
141 let parsed: CborValue = ciborium::de::from_reader(bytes)
142 .map_err(|e| error!("CBOR decode failed", details = e.to_string()))?;
143 let mut secs: i64 = 0;
144 let mut rows: IndexMap<String, Record<CborValue>> = IndexMap::new();
145
146 let pairs = match parsed {
147 CborValue::Map(p) => p,
148 _ => return Err(error!("RedbCache: expected envelope to be a map")),
149 };
150 for (k, v) in pairs {
151 let key = match k {
152 CborValue::Text(s) => s,
153 _ => continue,
154 };
155 match (key.as_str(), v) {
156 ("fetched_at", CborValue::Integer(i)) => {
157 secs = i64::try_from(i).unwrap_or(0);
158 }
159 ("rows", CborValue::Map(row_pairs)) => {
160 for (rk, rv) in row_pairs {
161 let id = match rk {
162 CborValue::Text(s) => s,
163 _ => continue,
164 };
165 let mut rec: Record<CborValue> = Record::new();
166 if let CborValue::Map(field_pairs) = rv {
167 for (fk, fv) in field_pairs {
168 if let CborValue::Text(name) = fk {
169 rec.insert(name, fv);
170 }
171 }
172 }
173 rows.insert(id, rec);
174 }
175 }
176 _ => {}
177 }
178 }
179
180 let fetched_at =
181 SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(secs.try_into().unwrap_or(0));
182 Ok(CachedRows { rows, fetched_at })
183}
184
185#[async_trait]
186impl Cache for RedbCache {
187 async fn get(&self, key: &str) -> Result<Option<CachedRows>> {
188 let (root, sub) = split_key(key);
189 let table_name = redb_table_name(root);
190
191 let txn = self
192 .db
193 .begin_read()
194 .map_err(|e| error!("redb begin_read failed", details = e.to_string()))?;
195 let table = match txn.open_table(cache_table_def(&table_name)) {
196 Ok(t) => t,
197 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
198 Err(e) => {
199 return Err(error!(
200 "Failed to open cache table",
201 table = table_name,
202 details = e.to_string()
203 ));
204 }
205 };
206 let bytes = table
207 .get(sub.as_str())
208 .map_err(|e| error!("redb cache get failed", details = e.to_string()))?;
209 match bytes {
210 Some(b) => Ok(Some(decode_rows(b.value())?)),
211 None => Ok(None),
212 }
213 }
214
215 async fn put(&self, key: &str, rows: CachedRows) -> Result<()> {
216 let (root, sub) = split_key(key);
217 let table_name = redb_table_name(root);
218 let bytes = encode_rows(&rows)?;
219
220 let txn = self
221 .db
222 .begin_write()
223 .map_err(|e| error!("redb begin_write failed", details = e.to_string()))?;
224 {
225 let mut table = txn.open_table(cache_table_def(&table_name)).map_err(|e| {
226 error!(
227 "Failed to open cache table for write",
228 table = table_name,
229 details = e.to_string()
230 )
231 })?;
232 table
233 .insert(sub.as_str(), bytes.as_slice())
234 .map_err(|e| error!("redb cache insert failed", details = e.to_string()))?;
235 }
236 txn.commit()
237 .map_err(|e| error!("redb cache commit failed", details = e.to_string()))?;
238 Ok(())
239 }
240
241 async fn invalidate_prefix(&self, prefix: &str) -> Result<()> {
242 if !prefix.contains('/') {
245 let table_name = redb_table_name(prefix);
246 let txn = self
247 .db
248 .begin_write()
249 .map_err(|e| error!("redb begin_write failed", details = e.to_string()))?;
250 let _ = txn
253 .delete_table(cache_table_def(&table_name))
254 .map_err(|e| error!("delete_table failed", details = e.to_string()))?;
255 txn.commit()
256 .map_err(|e| error!("redb cache commit failed", details = e.to_string()))?;
257 return Ok(());
258 }
259
260 let (root, sub_prefix) = split_key(prefix);
264 let table_name = redb_table_name(root);
265
266 let txn = self
267 .db
268 .begin_write()
269 .map_err(|e| error!("redb begin_write failed", details = e.to_string()))?;
270 {
271 let mut table = match txn.open_table(cache_table_def(&table_name)) {
272 Ok(t) => t,
273 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(()),
274 Err(e) => {
275 return Err(error!(
276 "Failed to open cache table for invalidate",
277 table = table_name,
278 details = e.to_string()
279 ));
280 }
281 };
282 let mut to_delete: Vec<String> = Vec::new();
284 {
285 let iter = table
286 .iter()
287 .map_err(|e| error!("redb cache iter failed", details = e.to_string()))?;
288 for entry in iter {
289 let (k, _) = entry.map_err(|e| {
290 error!("redb cache iter entry failed", details = e.to_string())
291 })?;
292 if k.value().starts_with(&sub_prefix) {
293 to_delete.push(k.value().to_string());
294 }
295 }
296 }
297 for k in to_delete {
298 table
299 .remove(k.as_str())
300 .map_err(|e| error!("redb cache remove failed", details = e.to_string()))?;
301 }
302 if ReadableTableMetadata::len(&table).map_err(|e: redb::StorageError| {
304 error!("redb cache len failed", details = e.to_string())
305 })? == 0
306 {
307 drop(table);
308 let _ = txn
309 .delete_table(cache_table_def(&table_name))
310 .map_err(|e| error!("delete_table failed", details = e.to_string()))?;
311 }
312 }
313 txn.commit()
314 .map_err(|e| error!("redb cache commit failed", details = e.to_string()))?;
315 Ok(())
316 }
317}