use std::sync::Arc;
use bytes::Bytes;
use mnem_core::error::StoreError;
use mnem_core::id::Cid;
use mnem_core::store::{Blockstore, blockstore::recompute_cid};
use redb::{Database, ReadableDatabase};
use crate::{OBJECTS_TABLE, redb_err};
/// A [`Blockstore`] whose blocks live in a redb [`Database`].
///
/// Blocks are stored in [`OBJECTS_TABLE`], keyed by the byte form of their
/// [`Cid`]. The database handle is reference-counted, so the same database
/// can be handed to other components as well.
pub struct RedbBlockstore {
    /// Shared handle to the backing redb database.
    db: Arc<Database>,
}
impl RedbBlockstore {
    /// Wraps an already-opened redb database in a blockstore.
    ///
    /// This only stores the handle: no transaction is started and no table
    /// is opened or created here.
    #[must_use]
    pub const fn new(db: Arc<Database>) -> Self {
        Self { db }
    }
}
/// Opaque `Debug` output: prints only the type name and marks the struct as
/// non-exhaustive, without formatting the database handle.
impl std::fmt::Debug for RedbBlockstore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("RedbBlockstore");
        repr.finish_non_exhaustive()
    }
}
/// redb-backed implementation of [`Blockstore`].
///
/// Every method runs in its own transaction: reads use a read transaction,
/// while writes and deletes open a write transaction and commit it before
/// returning. redb failures are converted with [`redb_err`].
impl Blockstore for RedbBlockstore {
    /// Reports whether an entry exists under `cid`.
    ///
    /// # Errors
    ///
    /// Returns an error if the read transaction, the table open, or the
    /// lookup fails.
    fn has(&self, cid: &Cid) -> Result<bool, StoreError> {
        let key_bytes = cid.to_bytes();
        let read_tx = self.db.begin_read().map_err(redb_err)?;
        let objects = read_tx.open_table(OBJECTS_TABLE).map_err(redb_err)?;
        let entry = objects.get(&key_bytes[..]).map_err(redb_err)?;
        Ok(entry.is_some())
    }

    /// Fetches the bytes stored under `cid`, or `None` if there is no entry.
    ///
    /// The value is copied out of the table into a fresh [`Bytes`] buffer.
    ///
    /// # Errors
    ///
    /// Returns an error if the read transaction, the table open, or the
    /// lookup fails.
    fn get(&self, cid: &Cid) -> Result<Option<Bytes>, StoreError> {
        let key_bytes = cid.to_bytes();
        let read_tx = self.db.begin_read().map_err(redb_err)?;
        let objects = read_tx.open_table(OBJECTS_TABLE).map_err(redb_err)?;
        let entry = objects.get(&key_bytes[..]).map_err(redb_err)?;
        Ok(entry.map(|guard| Bytes::copy_from_slice(guard.value())))
    }

    /// Stores `data` under `cid` after checking that the two agree.
    ///
    /// When [`recompute_cid`] yields a CID for `data`, it must equal `cid`;
    /// when it yields `None`, the check is skipped and the block is written
    /// as-is.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::CidMismatch`] if the recomputed CID differs from
    /// the claimed one (nothing is written in that case), or a converted redb
    /// error if the write transaction fails.
    fn put(&self, cid: Cid, data: Bytes) -> Result<(), StoreError> {
        match recompute_cid(&cid, &data) {
            Some(computed) if computed != cid => {
                return Err(StoreError::CidMismatch {
                    claimed: cid,
                    computed,
                });
            }
            _ => {}
        }
        // Verification passed (or was not applicable); the write itself is
        // exactly the unverified insert-and-commit path.
        self.put_trusted(cid, data)
    }

    /// Stores `data` under `cid` without verifying that they match.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction, the table open, the
    /// insert, or the commit fails.
    fn put_trusted(&self, cid: Cid, data: Bytes) -> Result<(), StoreError> {
        let key_bytes = cid.to_bytes();
        let write_tx = self.db.begin_write().map_err(redb_err)?;
        let mut objects = write_tx.open_table(OBJECTS_TABLE).map_err(redb_err)?;
        objects.insert(&key_bytes[..], &data[..]).map_err(redb_err)?;
        // The table handle has to be released before the transaction commits.
        drop(objects);
        write_tx.commit().map_err(redb_err)?;
        Ok(())
    }

    /// Removes the entry stored under `cid`.
    ///
    /// The result of the removal is discarded, so deleting a CID that is not
    /// present still returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction, the table open, the
    /// removal, or the commit fails.
    fn delete(&self, cid: &Cid) -> Result<(), StoreError> {
        let key_bytes = cid.to_bytes();
        let write_tx = self.db.begin_write().map_err(redb_err)?;
        let mut objects = write_tx.open_table(OBJECTS_TABLE).map_err(redb_err)?;
        objects.remove(&key_bytes[..]).map_err(redb_err)?;
        // The table handle has to be released before the transaction commits.
        drop(objects);
        write_tx.commit().map_err(redb_err)?;
        Ok(())
    }
}
// Unit tests for `RedbBlockstore`, each running against its own freshly
// created database file in the system temp directory.
#[cfg(test)]
mod tests {
    use super::*;
    use mnem_core::codec::hash_to_cid;
    use redb::Database;
    use serde::Serialize;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Minimal serializable payload used to derive (bytes, CID) pairs.
    #[derive(Serialize)]
    struct Sample {
        n: u32,
    }

    /// Per-process sequence number that keeps temp file names distinct.
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Builds a temp-dir path for a database file that is unique per process
    /// and per call, removing any stale file already at that path.
    fn tmp_file(name: &str) -> PathBuf {
        let file_name = format!(
            "mnem-redb-blockstore-{name}-{}-{}.redb",
            std::process::id(),
            COUNTER.fetch_add(1, Ordering::Relaxed)
        );
        let mut path = std::env::temp_dir();
        path.push(file_name);
        // Best effort: the file usually does not exist, so the error is ignored.
        let _ = std::fs::remove_file(&path);
        path
    }

    /// Creates a blockstore over a fresh database, with the objects table
    /// created up front so read transactions can open it.
    fn new_store() -> RedbBlockstore {
        let database = Database::create(tmp_file("bs")).unwrap();
        let init_tx = database.begin_write().unwrap();
        drop(init_tx.open_table(OBJECTS_TABLE).unwrap());
        init_tx.commit().unwrap();
        RedbBlockstore::new(Arc::new(database))
    }

    #[test]
    fn put_get_has_delete_round_trip() {
        let store = new_store();
        let (payload, cid) = hash_to_cid(&Sample { n: 7 }).unwrap();

        assert!(!store.has(&cid).unwrap());

        store.put(cid.clone(), payload.clone()).unwrap();
        assert!(store.has(&cid).unwrap());
        let fetched = store.get(&cid).unwrap();
        assert_eq!(fetched, Some(payload));

        store.delete(&cid).unwrap();
        assert!(!store.has(&cid).unwrap());
    }

    #[test]
    fn put_is_idempotent() {
        let store = new_store();
        let (payload, cid) = hash_to_cid(&Sample { n: 42 }).unwrap();

        store.put(cid.clone(), payload.clone()).unwrap();
        store.put(cid.clone(), payload).unwrap();

        assert!(store.has(&cid).unwrap());
    }

    #[test]
    fn delete_missing_is_ok() {
        let store = new_store();
        let (_, cid) = hash_to_cid(&Sample { n: 99 }).unwrap();

        store.delete(&cid).unwrap();
    }

    #[test]
    fn put_trusted_round_trip() {
        let store = new_store();
        let (payload, cid) = hash_to_cid(&Sample { n: 17 }).unwrap();

        store.put_trusted(cid.clone(), payload.clone()).unwrap();

        assert!(store.has(&cid).unwrap());
        let fetched = store.get(&cid).unwrap();
        assert_eq!(fetched, Some(payload));
    }

    #[test]
    fn put_trusted_skips_verification_by_design() {
        let store = new_store();
        let (_, cid) = hash_to_cid(&Sample { n: 1 }).unwrap();
        let mismatched = Bytes::from_static(b"not the sample");

        store
            .put_trusted(cid.clone(), mismatched.clone())
            .expect("put_trusted skips verify");

        let fetched = store.get(&cid).unwrap();
        assert_eq!(fetched, Some(mismatched));
    }

    #[test]
    fn put_rejects_cid_mismatch() {
        let store = new_store();
        let (_, cid) = hash_to_cid(&Sample { n: 1 }).unwrap();
        let mismatched = Bytes::from_static(b"not the sample");

        let err = store.put(cid.clone(), mismatched).unwrap_err();
        if let StoreError::CidMismatch { claimed, .. } = &err {
            assert_eq!(claimed, &cid);
        } else {
            panic!("wrong variant: {err:?}");
        }

        // A rejected put must not leave anything behind.
        assert!(!store.has(&cid).unwrap());
    }
}