use std::path::Path;
use std::sync::Arc;
use cid::CidGeneric;
use redb::{
CommitError, Database, ReadTransaction, ReadableTable, StorageError, TableDefinition,
TableError, TransactionError, WriteTransaction,
};
use tokio::task::spawn_blocking;
use crate::counter::Counter;
use crate::{Blockstore, Error, Result};
const BLOCKS_TABLE: TableDefinition<'static, &[u8], &[u8]> =
TableDefinition::new("BLOCKSTORE.BLOCKS");
#[derive(Debug)]
pub struct RedbBlockstore {
db: Arc<Database>,
task_counter: Counter,
}
impl RedbBlockstore {
pub async fn open(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().to_owned();
let db = spawn_blocking(|| Database::create(path))
.await?
.map_err(|e| Error::FatalDatabaseError(e.to_string()))?;
Ok(RedbBlockstore::new(Arc::new(db)))
}
pub fn in_memory() -> Result<Self> {
let db = Database::builder()
.create_with_backend(redb::backends::InMemoryBackend::new())
.map_err(|e| Error::FatalDatabaseError(e.to_string()))?;
Ok(RedbBlockstore::new(Arc::new(db)))
}
pub fn new(db: Arc<Database>) -> Self {
RedbBlockstore {
db,
task_counter: Counter::new(),
}
}
pub fn raw_db(&self) -> Arc<Database> {
self.db.clone()
}
async fn read_tx<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(&mut ReadTransaction) -> Result<T> + Send + 'static,
T: Send + 'static,
{
let db = self.db.clone();
let guard = self.task_counter.guard();
tokio::task::spawn_blocking(move || {
let _guard = guard;
{
let db = db;
let mut tx = db.begin_read()?;
f(&mut tx)
}
})
.await?
}
async fn write_tx<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(&mut WriteTransaction) -> Result<T> + Send + 'static,
T: Send + 'static,
{
let db = self.db.clone();
let guard = self.task_counter.guard();
tokio::task::spawn_blocking(move || {
let _guard = guard;
{
let db = db;
let mut tx = db.begin_write()?;
let res = f(&mut tx);
if res.is_ok() {
tx.commit()?;
} else {
tx.abort()?;
}
res
}
})
.await?
}
}
impl Blockstore for RedbBlockstore {
async fn get<const S: usize>(&self, cid: &CidGeneric<S>) -> Result<Option<Vec<u8>>> {
let cid = cid.to_bytes();
self.read_tx(move |tx| {
let blocks_table = match tx.open_table(BLOCKS_TABLE) {
Ok(val) => val,
Err(TableError::TableDoesNotExist(_)) => return Ok(None),
Err(e) => return Err(e.into()),
};
Ok(blocks_table
.get(&cid[..])?
.map(|guard| guard.value().to_owned()))
})
.await
}
async fn put_keyed<const S: usize>(&self, cid: &CidGeneric<S>, data: &[u8]) -> Result<()> {
let cid = cid.to_bytes();
let data = data.to_vec();
self.write_tx(move |tx| {
let mut blocks_table = tx.open_table(BLOCKS_TABLE)?;
if blocks_table.get(&cid[..])?.is_none() {
blocks_table.insert(&cid[..], &data[..])?;
}
Ok(())
})
.await
}
async fn remove<const S: usize>(&self, cid: &CidGeneric<S>) -> Result<()> {
let cid = cid.to_bytes();
self.write_tx(move |tx| {
let mut blocks_table = tx.open_table(BLOCKS_TABLE)?;
blocks_table.remove(&cid[..])?;
Ok(())
})
.await
}
async fn has<const S: usize>(&self, cid: &CidGeneric<S>) -> Result<bool> {
let cid = cid.to_bytes();
self.read_tx(move |tx| {
let blocks_table = match tx.open_table(BLOCKS_TABLE) {
Ok(val) => val,
Err(TableError::TableDoesNotExist(_)) => return Ok(false),
Err(e) => return Err(e.into()),
};
Ok(blocks_table.get(&cid[..])?.is_some())
})
.await
}
async fn close(mut self) -> Result<()> {
self.task_counter.wait_guards().await;
Ok(())
}
}
impl From<TransactionError> for Error {
fn from(e: TransactionError) -> Self {
match e {
TransactionError::ReadTransactionStillInUse(_) => {
unreachable!("redb::ReadTransaction::close is never used")
}
e => Error::FatalDatabaseError(e.to_string()),
}
}
}
impl From<TableError> for Error {
fn from(e: TableError) -> Self {
match e {
TableError::Storage(e) => e.into(),
TableError::TableAlreadyOpen(table, location) => {
unreachable!("Table {table} already opened from: {location}")
}
e @ TableError::TableDoesNotExist(_) => {
unreachable!("redb::ReadTransaction::open_table result not handled correctly: {e}");
}
e => Error::StoredDataError(e.to_string()),
}
}
}
impl From<StorageError> for Error {
fn from(e: StorageError) -> Self {
match e {
StorageError::ValueTooLarge(_) => Error::ValueTooLarge,
e => Error::FatalDatabaseError(e.to_string()),
}
}
}
impl From<CommitError> for Error {
fn from(e: CommitError) -> Self {
Error::FatalDatabaseError(e.to_string())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::cid_v1;
#[tokio::test]
async fn store_persists() {
let dir = tempfile::TempDir::with_prefix("redb-blockstore-test")
.unwrap()
.keep();
let db_path = dir.join("db");
let store = RedbBlockstore::open(&db_path).await.unwrap();
let cid = cid_v1::<64>(b"1");
let data = b"data";
store.put_keyed(&cid, data).await.unwrap();
spawn_blocking(move || drop(store)).await.unwrap();
let store = RedbBlockstore::open(&db_path).await.unwrap();
let received = store.get(&cid).await.unwrap();
assert_eq!(received, Some(data.to_vec()));
}
}