Skip to main content

vantage_live/cache/
redb.rs

1//! redb-backed cache. Persists `CachedRows` on disk so cache state
2//! survives process restarts.
3//!
4//! ## Layout
5//!
6//! One redb file (`vlive.redb`) inside the caller-supplied folder.
7//! Inside the file, **one redb table per `cache_key`**, namespaced as
8//! `__vlive__{cache_key}` so cache tables can't collide with whatever
9//! else the user keeps in the same folder. Sub-keys inside that table
10//! are the part after the first `/` of the cache key (`page_1`,
11//! `id/foo`, etc.). Values are CBOR-encoded `CachedRows`.
12//!
13//! Per-table layout makes the hot invalidation path cheap:
14//! `invalidate_prefix("clients")` matches the whole cache_key →
15//! `delete_table("__vlive__clients")` is O(1)-ish (just unlinks the
16//! tree root). Sub-prefix invalidates fall back to scan + delete inside
17//! the table, but v1 of LiveTable always passes the bare cache_key.
18//!
19//! ## Concurrency
20//!
21//! redb takes an OS-level exclusive lock on its file — only one
22//! process can open the cache folder at a time. Trying to open the
23//! same folder from a second process returns
24//! `redb::DatabaseError::DatabaseAlreadyOpen`. If you need cross-process
25//! cache sharing, this isn't the layer for it (network cache: Redis).
26
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29
30use async_trait::async_trait;
31use ciborium::Value as CborValue;
32use indexmap::IndexMap;
33use redb::{Database, ReadableTable, ReadableTableMetadata, TableDefinition};
34use std::time::SystemTime;
35use vantage_core::{Result, error};
36use vantage_types::Record;
37
38use super::{Cache, CachedRows};
39
/// File name of the single redb database kept inside the cache folder.
const CACHE_FILE: &str = "vlive.redb";
/// Prefix put in front of a cache-key root to form its redb table name.
const TABLE_PREFIX: &str = "__vlive__";
42
43/// Redb-backed cache. Cheap to clone — the inner `Arc<Database>` is
44/// shared.
45#[derive(Clone)]
46pub struct RedbCache {
47    db: Arc<Database>,
48    folder: PathBuf,
49}
50
51impl std::fmt::Debug for RedbCache {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        f.debug_struct("RedbCache")
54            .field("folder", &self.folder)
55            .finish()
56    }
57}
58
59impl RedbCache {
60    /// Open or create a cache in `folder`. Creates the folder if it
61    /// doesn't exist; opens (or creates) `vlive.redb` inside.
62    ///
63    /// Returns an error if another process already has the cache open
64    /// (redb's exclusive file lock — see module docs).
65    pub fn open(folder: impl AsRef<Path>) -> Result<Self> {
66        let folder = folder.as_ref().to_path_buf();
67        std::fs::create_dir_all(&folder)
68            .map_err(|e| error!("Failed to create cache folder", details = e.to_string()))?;
69        let path = folder.join(CACHE_FILE);
70        let db = Database::create(&path)
71            .map_err(|e| error!("Failed to open cache redb", details = e.to_string()))?;
72        Ok(Self {
73            db: Arc::new(db),
74            folder,
75        })
76    }
77
78    /// Folder this cache lives in. Useful for diagnostics; do not use
79    /// to open a second handle while this one is alive.
80    pub fn folder(&self) -> &Path {
81        &self.folder
82    }
83}
84
/// Splits a cache key at its first `/` into `(root, sub_key)`.
///
/// `clients/page_1` becomes `("clients", "page_1")`; anything after the
/// first `/` (including further slashes) stays in the sub-key. A key
/// without any `/` is treated as the root on its own and gets the
/// `__root__` sentinel as its sub-key. A key that ends right after its
/// first `/` (e.g. `clients/`) yields an empty sub-key.
fn split_key(key: &str) -> (&str, String) {
    if let Some((root, sub_key)) = key.split_once('/') {
        (root, sub_key.to_owned())
    } else {
        (key, "__root__".to_owned())
    }
}
95
96fn redb_table_name(root: &str) -> String {
97    format!("{}{}", TABLE_PREFIX, root)
98}
99
100fn cache_table_def(name: &str) -> TableDefinition<'_, &'static str, &'static [u8]> {
101    TableDefinition::new(name)
102}
103
104fn encode_rows(rows: &CachedRows) -> Result<Vec<u8>> {
105    // CachedRows isn't directly Serialize, so encode field-by-field as a
106    // CBOR map. fetched_at is stored as seconds-since-epoch so the
107    // representation is portable / compact.
108    let secs = rows
109        .fetched_at
110        .duration_since(SystemTime::UNIX_EPOCH)
111        .map(|d| d.as_secs() as i64)
112        .unwrap_or(0);
113
114    let row_pairs: Vec<(CborValue, CborValue)> = rows
115        .rows
116        .iter()
117        .map(|(id, rec)| {
118            let entries: Vec<(CborValue, CborValue)> = rec
119                .iter()
120                .map(|(k, v)| (CborValue::Text(k.clone()), v.clone()))
121                .collect();
122            (CborValue::Text(id.clone()), CborValue::Map(entries))
123        })
124        .collect();
125
126    let envelope = CborValue::Map(vec![
127        (
128            CborValue::Text("fetched_at".into()),
129            CborValue::Integer(secs.into()),
130        ),
131        (CborValue::Text("rows".into()), CborValue::Map(row_pairs)),
132    ]);
133
134    let mut bytes = Vec::new();
135    ciborium::ser::into_writer(&envelope, &mut bytes)
136        .map_err(|e| error!("CBOR encode failed", details = e.to_string()))?;
137    Ok(bytes)
138}
139
140fn decode_rows(bytes: &[u8]) -> Result<CachedRows> {
141    let parsed: CborValue = ciborium::de::from_reader(bytes)
142        .map_err(|e| error!("CBOR decode failed", details = e.to_string()))?;
143    let mut secs: i64 = 0;
144    let mut rows: IndexMap<String, Record<CborValue>> = IndexMap::new();
145
146    let pairs = match parsed {
147        CborValue::Map(p) => p,
148        _ => return Err(error!("RedbCache: expected envelope to be a map")),
149    };
150    for (k, v) in pairs {
151        let key = match k {
152            CborValue::Text(s) => s,
153            _ => continue,
154        };
155        match (key.as_str(), v) {
156            ("fetched_at", CborValue::Integer(i)) => {
157                secs = i64::try_from(i).unwrap_or(0);
158            }
159            ("rows", CborValue::Map(row_pairs)) => {
160                for (rk, rv) in row_pairs {
161                    let id = match rk {
162                        CborValue::Text(s) => s,
163                        _ => continue,
164                    };
165                    let mut rec: Record<CborValue> = Record::new();
166                    if let CborValue::Map(field_pairs) = rv {
167                        for (fk, fv) in field_pairs {
168                            if let CborValue::Text(name) = fk {
169                                rec.insert(name, fv);
170                            }
171                        }
172                    }
173                    rows.insert(id, rec);
174                }
175            }
176            _ => {}
177        }
178    }
179
180    let fetched_at =
181        SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(secs.try_into().unwrap_or(0));
182    Ok(CachedRows { rows, fetched_at })
183}
184
185#[async_trait]
186impl Cache for RedbCache {
187    async fn get(&self, key: &str) -> Result<Option<CachedRows>> {
188        let (root, sub) = split_key(key);
189        let table_name = redb_table_name(root);
190
191        let txn = self
192            .db
193            .begin_read()
194            .map_err(|e| error!("redb begin_read failed", details = e.to_string()))?;
195        let table = match txn.open_table(cache_table_def(&table_name)) {
196            Ok(t) => t,
197            Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
198            Err(e) => {
199                return Err(error!(
200                    "Failed to open cache table",
201                    table = table_name,
202                    details = e.to_string()
203                ));
204            }
205        };
206        let bytes = table
207            .get(sub.as_str())
208            .map_err(|e| error!("redb cache get failed", details = e.to_string()))?;
209        match bytes {
210            Some(b) => Ok(Some(decode_rows(b.value())?)),
211            None => Ok(None),
212        }
213    }
214
215    async fn put(&self, key: &str, rows: CachedRows) -> Result<()> {
216        let (root, sub) = split_key(key);
217        let table_name = redb_table_name(root);
218        let bytes = encode_rows(&rows)?;
219
220        let txn = self
221            .db
222            .begin_write()
223            .map_err(|e| error!("redb begin_write failed", details = e.to_string()))?;
224        {
225            let mut table = txn.open_table(cache_table_def(&table_name)).map_err(|e| {
226                error!(
227                    "Failed to open cache table for write",
228                    table = table_name,
229                    details = e.to_string()
230                )
231            })?;
232            table
233                .insert(sub.as_str(), bytes.as_slice())
234                .map_err(|e| error!("redb cache insert failed", details = e.to_string()))?;
235        }
236        txn.commit()
237            .map_err(|e| error!("redb cache commit failed", details = e.to_string()))?;
238        Ok(())
239    }
240
241    async fn invalidate_prefix(&self, prefix: &str) -> Result<()> {
242        // The fast path: prefix is exactly a cache_key (no '/') →
243        // drop the whole table.
244        if !prefix.contains('/') {
245            let table_name = redb_table_name(prefix);
246            let txn = self
247                .db
248                .begin_write()
249                .map_err(|e| error!("redb begin_write failed", details = e.to_string()))?;
250            // delete_table returns Ok(false) if the table didn't exist,
251            // Ok(true) if it was dropped. Either way, swallow.
252            let _ = txn
253                .delete_table(cache_table_def(&table_name))
254                .map_err(|e| error!("delete_table failed", details = e.to_string()))?;
255            txn.commit()
256                .map_err(|e| error!("redb cache commit failed", details = e.to_string()))?;
257            return Ok(());
258        }
259
260        // Sub-prefix: scan inside the appropriate table and delete the
261        // matching sub-keys. Used by future surgical-invalidation paths;
262        // v1 LiveTable doesn't currently call this.
263        let (root, sub_prefix) = split_key(prefix);
264        let table_name = redb_table_name(root);
265
266        let txn = self
267            .db
268            .begin_write()
269            .map_err(|e| error!("redb begin_write failed", details = e.to_string()))?;
270        {
271            let mut table = match txn.open_table(cache_table_def(&table_name)) {
272                Ok(t) => t,
273                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(()),
274                Err(e) => {
275                    return Err(error!(
276                        "Failed to open cache table for invalidate",
277                        table = table_name,
278                        details = e.to_string()
279                    ));
280                }
281            };
282            // Collect keys matching prefix first (can't mutate during iter).
283            let mut to_delete: Vec<String> = Vec::new();
284            {
285                let iter = table
286                    .iter()
287                    .map_err(|e| error!("redb cache iter failed", details = e.to_string()))?;
288                for entry in iter {
289                    let (k, _) = entry.map_err(|e| {
290                        error!("redb cache iter entry failed", details = e.to_string())
291                    })?;
292                    if k.value().starts_with(&sub_prefix) {
293                        to_delete.push(k.value().to_string());
294                    }
295                }
296            }
297            for k in to_delete {
298                table
299                    .remove(k.as_str())
300                    .map_err(|e| error!("redb cache remove failed", details = e.to_string()))?;
301            }
302            // If the table is now empty, drop it so we don't leak empties.
303            if ReadableTableMetadata::len(&table).map_err(|e: redb::StorageError| {
304                error!("redb cache len failed", details = e.to_string())
305            })? == 0
306            {
307                drop(table);
308                let _ = txn
309                    .delete_table(cache_table_def(&table_name))
310                    .map_err(|e| error!("delete_table failed", details = e.to_string()))?;
311            }
312        }
313        txn.commit()
314            .map_err(|e| error!("redb cache commit failed", details = e.to_string()))?;
315        Ok(())
316    }
317}