use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_trait::async_trait;
use ciborium::Value as CborValue;
use indexmap::IndexMap;
use redb::{Database, ReadableTable, ReadableTableMetadata, TableDefinition};
use std::time::SystemTime;
use vantage_core::{Result, error};
use vantage_types::Record;
use super::{Cache, CachedRows};
const CACHE_FILE: &str = "vlive.redb";
const TABLE_PREFIX: &str = "__vlive__";
#[derive(Clone)]
pub struct RedbCache {
db: Arc<Database>,
folder: PathBuf,
}
impl std::fmt::Debug for RedbCache {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RedbCache")
.field("folder", &self.folder)
.finish()
}
}
impl RedbCache {
pub fn open(folder: impl AsRef<Path>) -> Result<Self> {
let folder = folder.as_ref().to_path_buf();
std::fs::create_dir_all(&folder)
.map_err(|e| error!("Failed to create cache folder", details = e.to_string()))?;
let path = folder.join(CACHE_FILE);
let db = Database::create(&path)
.map_err(|e| error!("Failed to open cache redb", details = e.to_string()))?;
Ok(Self {
db: Arc::new(db),
folder,
})
}
pub fn folder(&self) -> &Path {
&self.folder
}
}
fn split_key(key: &str) -> (&str, String) {
match key.find('/') {
Some(i) => (&key[..i], key[i + 1..].to_string()),
None => (key, String::from("__root__")),
}
}
fn redb_table_name(root: &str) -> String {
format!("{}{}", TABLE_PREFIX, root)
}
fn cache_table_def(name: &str) -> TableDefinition<'_, &'static str, &'static [u8]> {
TableDefinition::new(name)
}
fn encode_rows(rows: &CachedRows) -> Result<Vec<u8>> {
let secs = rows
.fetched_at
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
let row_pairs: Vec<(CborValue, CborValue)> = rows
.rows
.iter()
.map(|(id, rec)| {
let entries: Vec<(CborValue, CborValue)> = rec
.iter()
.map(|(k, v)| (CborValue::Text(k.clone()), v.clone()))
.collect();
(CborValue::Text(id.clone()), CborValue::Map(entries))
})
.collect();
let envelope = CborValue::Map(vec![
(
CborValue::Text("fetched_at".into()),
CborValue::Integer(secs.into()),
),
(CborValue::Text("rows".into()), CborValue::Map(row_pairs)),
]);
let mut bytes = Vec::new();
ciborium::ser::into_writer(&envelope, &mut bytes)
.map_err(|e| error!("CBOR encode failed", details = e.to_string()))?;
Ok(bytes)
}
fn decode_rows(bytes: &[u8]) -> Result<CachedRows> {
let parsed: CborValue = ciborium::de::from_reader(bytes)
.map_err(|e| error!("CBOR decode failed", details = e.to_string()))?;
let mut secs: i64 = 0;
let mut rows: IndexMap<String, Record<CborValue>> = IndexMap::new();
let pairs = match parsed {
CborValue::Map(p) => p,
_ => return Err(error!("RedbCache: expected envelope to be a map")),
};
for (k, v) in pairs {
let key = match k {
CborValue::Text(s) => s,
_ => continue,
};
match (key.as_str(), v) {
("fetched_at", CborValue::Integer(i)) => {
secs = i64::try_from(i).unwrap_or(0);
}
("rows", CborValue::Map(row_pairs)) => {
for (rk, rv) in row_pairs {
let id = match rk {
CborValue::Text(s) => s,
_ => continue,
};
let mut rec: Record<CborValue> = Record::new();
if let CborValue::Map(field_pairs) = rv {
for (fk, fv) in field_pairs {
if let CborValue::Text(name) = fk {
rec.insert(name, fv);
}
}
}
rows.insert(id, rec);
}
}
_ => {}
}
}
let fetched_at =
SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(secs.try_into().unwrap_or(0));
Ok(CachedRows { rows, fetched_at })
}
#[async_trait]
impl Cache for RedbCache {
async fn get(&self, key: &str) -> Result<Option<CachedRows>> {
let (root, sub) = split_key(key);
let table_name = redb_table_name(root);
let txn = self
.db
.begin_read()
.map_err(|e| error!("redb begin_read failed", details = e.to_string()))?;
let table = match txn.open_table(cache_table_def(&table_name)) {
Ok(t) => t,
Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
Err(e) => {
return Err(error!(
"Failed to open cache table",
table = table_name,
details = e.to_string()
));
}
};
let bytes = table
.get(sub.as_str())
.map_err(|e| error!("redb cache get failed", details = e.to_string()))?;
match bytes {
Some(b) => Ok(Some(decode_rows(b.value())?)),
None => Ok(None),
}
}
async fn put(&self, key: &str, rows: CachedRows) -> Result<()> {
let (root, sub) = split_key(key);
let table_name = redb_table_name(root);
let bytes = encode_rows(&rows)?;
let txn = self
.db
.begin_write()
.map_err(|e| error!("redb begin_write failed", details = e.to_string()))?;
{
let mut table = txn.open_table(cache_table_def(&table_name)).map_err(|e| {
error!(
"Failed to open cache table for write",
table = table_name,
details = e.to_string()
)
})?;
table
.insert(sub.as_str(), bytes.as_slice())
.map_err(|e| error!("redb cache insert failed", details = e.to_string()))?;
}
txn.commit()
.map_err(|e| error!("redb cache commit failed", details = e.to_string()))?;
Ok(())
}
async fn invalidate_prefix(&self, prefix: &str) -> Result<()> {
if !prefix.contains('/') {
let table_name = redb_table_name(prefix);
let txn = self
.db
.begin_write()
.map_err(|e| error!("redb begin_write failed", details = e.to_string()))?;
let _ = txn
.delete_table(cache_table_def(&table_name))
.map_err(|e| error!("delete_table failed", details = e.to_string()))?;
txn.commit()
.map_err(|e| error!("redb cache commit failed", details = e.to_string()))?;
return Ok(());
}
let (root, sub_prefix) = split_key(prefix);
let table_name = redb_table_name(root);
let txn = self
.db
.begin_write()
.map_err(|e| error!("redb begin_write failed", details = e.to_string()))?;
{
let mut table = match txn.open_table(cache_table_def(&table_name)) {
Ok(t) => t,
Err(redb::TableError::TableDoesNotExist(_)) => return Ok(()),
Err(e) => {
return Err(error!(
"Failed to open cache table for invalidate",
table = table_name,
details = e.to_string()
));
}
};
let mut to_delete: Vec<String> = Vec::new();
{
let iter = table
.iter()
.map_err(|e| error!("redb cache iter failed", details = e.to_string()))?;
for entry in iter {
let (k, _) = entry.map_err(|e| {
error!("redb cache iter entry failed", details = e.to_string())
})?;
if k.value().starts_with(&sub_prefix) {
to_delete.push(k.value().to_string());
}
}
}
for k in to_delete {
table
.remove(k.as_str())
.map_err(|e| error!("redb cache remove failed", details = e.to_string()))?;
}
if ReadableTableMetadata::len(&table).map_err(|e: redb::StorageError| {
error!("redb cache len failed", details = e.to_string())
})? == 0
{
drop(table);
let _ = txn
.delete_table(cache_table_def(&table_name))
.map_err(|e| error!("delete_table failed", details = e.to_string()))?;
}
}
txn.commit()
.map_err(|e| error!("redb cache commit failed", details = e.to_string()))?;
Ok(())
}
}