use crate::traits::BlockStore;
use async_trait::async_trait;
use ipfrs_core::{Block, Cid, Error, Result};
use parity_db::{Db, Options};
use std::path::PathBuf;
use std::sync::Arc;
const BLOCKS_COLUMN: u8 = 0;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParityDbPreset {
FastWrite,
Balanced,
LowMemory,
}
#[derive(Debug, Clone)]
pub struct ParityDbConfig {
pub path: PathBuf,
pub preset: ParityDbPreset,
pub custom_options: Option<Options>,
}
impl ParityDbConfig {
pub fn new(path: PathBuf, preset: ParityDbPreset) -> Self {
Self {
path,
preset,
custom_options: None,
}
}
pub fn fast_write(path: PathBuf) -> Self {
Self::new(path, ParityDbPreset::FastWrite)
}
pub fn balanced(path: PathBuf) -> Self {
Self::new(path, ParityDbPreset::Balanced)
}
pub fn low_memory(path: PathBuf) -> Self {
Self::new(path, ParityDbPreset::LowMemory)
}
fn build_options(&self) -> Options {
if let Some(ref custom) = self.custom_options {
return custom.clone();
}
let mut options = Options::with_columns(&self.path, 1);
match self.preset {
ParityDbPreset::FastWrite => {
options.columns[BLOCKS_COLUMN as usize].btree_index = true;
options.columns[BLOCKS_COLUMN as usize].compression =
parity_db::CompressionType::Lz4;
options.sync_wal = false; options.sync_data = false; }
ParityDbPreset::Balanced => {
options.columns[BLOCKS_COLUMN as usize].btree_index = true;
options.columns[BLOCKS_COLUMN as usize].compression =
parity_db::CompressionType::Lz4;
options.sync_wal = true;
options.sync_data = false;
}
ParityDbPreset::LowMemory => {
options.columns[BLOCKS_COLUMN as usize].btree_index = false; options.columns[BLOCKS_COLUMN as usize].compression =
parity_db::CompressionType::Lz4;
options.sync_wal = true;
options.sync_data = true;
}
}
options
}
}
impl Default for ParityDbConfig {
fn default() -> Self {
Self::balanced(PathBuf::from(".ipfrs/blocks-paritydb"))
}
}
pub struct ParityDbBlockStore {
db: Arc<Db>,
}
impl ParityDbBlockStore {
pub fn new(config: ParityDbConfig) -> Result<Self> {
if let Some(parent) = config.path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| Error::Storage(format!("Failed to create directory: {e}")))?;
}
let options = config.build_options();
let db = Db::open_or_create(&options)
.map_err(|e| Error::Storage(format!("Failed to open ParityDB: {e}")))?;
Ok(Self { db: Arc::new(db) })
}
pub fn db(&self) -> &Arc<Db> {
&self.db
}
}
#[async_trait]
impl BlockStore for ParityDbBlockStore {
async fn put(&self, block: &Block) -> Result<()> {
let key = block.cid().to_bytes();
let value = block.data().to_vec();
let transaction = vec![(BLOCKS_COLUMN, key, Some(value))];
self.db
.commit(transaction)
.map_err(|e| Error::Storage(format!("Failed to insert block: {e}")))?;
Ok(())
}
async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
let key = cid.to_bytes();
match self.db.get(BLOCKS_COLUMN, &key) {
Ok(Some(value)) => {
let data = bytes::Bytes::from(value);
Ok(Some(Block::from_parts(*cid, data)))
}
Ok(None) => Ok(None),
Err(e) => Err(Error::Storage(format!("Failed to get block: {e}"))),
}
}
async fn has(&self, cid: &Cid) -> Result<bool> {
let key = cid.to_bytes();
match self.db.get(BLOCKS_COLUMN, &key) {
Ok(Some(_)) => Ok(true),
Ok(None) => Ok(false),
Err(e) => Err(Error::Storage(format!("Failed to check block: {e}"))),
}
}
async fn delete(&self, cid: &Cid) -> Result<()> {
let key = cid.to_bytes();
let transaction = vec![(BLOCKS_COLUMN, key, None)];
self.db
.commit(transaction)
.map_err(|e| Error::Storage(format!("Failed to delete block: {e}")))?;
Ok(())
}
fn len(&self) -> usize {
0
}
fn is_empty(&self) -> bool {
false
}
fn list_cids(&self) -> Result<Vec<Cid>> {
let mut cids = Vec::new();
let mut iter = self
.db
.iter(BLOCKS_COLUMN)
.map_err(|e| Error::Storage(format!("Failed to create iterator: {e}")))?;
while let Some((key, _value)) = iter
.next()
.map_err(|e| Error::Storage(format!("Iterator error: {e}")))?
{
let cid = Cid::try_from(key.to_vec())
.map_err(|e| Error::Cid(format!("Failed to parse CID: {e}")))?;
cids.push(cid);
}
Ok(cids)
}
async fn put_many(&self, blocks: &[Block]) -> Result<()> {
let mut transaction = Vec::new();
for block in blocks {
let key = block.cid().to_bytes();
let value = block.data().to_vec();
transaction.push((BLOCKS_COLUMN, key, Some(value)));
}
self.db
.commit(transaction)
.map_err(|e| Error::Storage(format!("Failed to apply batch: {e}")))?;
Ok(())
}
async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
let mut results = Vec::with_capacity(cids.len());
for cid in cids {
let key = cid.to_bytes();
match self.db.get(BLOCKS_COLUMN, &key) {
Ok(Some(value)) => {
let data = bytes::Bytes::from(value);
results.push(Some(Block::from_parts(*cid, data)));
}
Ok(None) => results.push(None),
Err(e) => return Err(Error::Storage(format!("Failed to get block: {e}"))),
}
}
Ok(results)
}
async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
let mut results = Vec::with_capacity(cids.len());
for cid in cids {
let key = cid.to_bytes();
match self.db.get(BLOCKS_COLUMN, &key) {
Ok(Some(_)) => results.push(true),
Ok(None) => results.push(false),
Err(e) => return Err(Error::Storage(format!("Failed to check block: {e}"))),
}
}
Ok(results)
}
async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
let mut transaction = Vec::new();
for cid in cids {
let key = cid.to_bytes();
transaction.push((BLOCKS_COLUMN, key, None));
}
self.db
.commit(transaction)
.map_err(|e| Error::Storage(format!("Failed to apply batch: {e}")))?;
Ok(())
}
async fn flush(&self) -> Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
#[tokio::test]
async fn test_paritydb_put_get_block() {
let config = ParityDbConfig::balanced(PathBuf::from("/tmp/ipfrs-test-paritydb"));
let _ = std::fs::remove_dir_all(&config.path);
let store = ParityDbBlockStore::new(config).unwrap();
let data = Bytes::from("hello paritydb");
let block = Block::new(data.clone()).unwrap();
store.put(&block).await.unwrap();
let retrieved = store.get(block.cid()).await.unwrap();
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().data(), &data);
assert!(store.has(block.cid()).await.unwrap());
store.delete(block.cid()).await.unwrap();
assert!(!store.has(block.cid()).await.unwrap());
}
#[tokio::test]
async fn test_paritydb_batch_operations() {
let config = ParityDbConfig::fast_write(PathBuf::from("/tmp/ipfrs-test-paritydb-batch"));
let _ = std::fs::remove_dir_all(&config.path);
let store = ParityDbBlockStore::new(config).unwrap();
let blocks: Vec<Block> = (0..10)
.map(|i| {
let data = Bytes::from(format!("block {}", i));
Block::new(data).unwrap()
})
.collect();
store.put_many(&blocks).await.unwrap();
let cids: Vec<Cid> = blocks.iter().map(|b| *b.cid()).collect();
let exists = store.has_many(&cids).await.unwrap();
assert!(exists.iter().all(|&x| x));
let retrieved = store.get_many(&cids).await.unwrap();
assert_eq!(retrieved.len(), blocks.len());
for (i, opt_block) in retrieved.iter().enumerate() {
assert!(opt_block.is_some());
assert_eq!(opt_block.as_ref().unwrap().data(), blocks[i].data());
}
store.delete_many(&cids).await.unwrap();
let exists = store.has_many(&cids).await.unwrap();
assert!(exists.iter().all(|&x| !x));
}
#[tokio::test]
async fn test_paritydb_presets() {
let config1 = ParityDbConfig::fast_write(PathBuf::from("/tmp/ipfrs-test-paritydb-fast"));
let _ = std::fs::remove_dir_all(&config1.path);
assert_eq!(config1.preset, ParityDbPreset::FastWrite);
let _store1 = ParityDbBlockStore::new(config1).unwrap();
let config2 = ParityDbConfig::balanced(PathBuf::from("/tmp/ipfrs-test-paritydb-balanced"));
let _ = std::fs::remove_dir_all(&config2.path);
assert_eq!(config2.preset, ParityDbPreset::Balanced);
let _store2 = ParityDbBlockStore::new(config2).unwrap();
let config3 = ParityDbConfig::low_memory(PathBuf::from("/tmp/ipfrs-test-paritydb-lowmem"));
let _ = std::fs::remove_dir_all(&config3.path);
assert_eq!(config3.preset, ParityDbPreset::LowMemory);
let _store3 = ParityDbBlockStore::new(config3).unwrap();
}
#[tokio::test]
async fn test_paritydb_list_cids() {
let config = ParityDbConfig::balanced(PathBuf::from("/tmp/ipfrs-test-paritydb-list"));
let _ = std::fs::remove_dir_all(&config.path);
let store = ParityDbBlockStore::new(config).unwrap();
let blocks: Vec<Block> = (0..5)
.map(|i| {
let data = Bytes::from(format!("block {}", i));
Block::new(data).unwrap()
})
.collect();
store.put_many(&blocks).await.unwrap();
let cids = store.list_cids().unwrap();
assert_eq!(cids.len(), 5);
for block in &blocks {
assert!(cids.contains(block.cid()));
}
}
}