use crate::traits::BlockStore;
use async_trait::async_trait;
use ipfrs_core::{Block, Cid, Error, Result};
use sled::Db;
use std::path::PathBuf;
/// Settings used to open a [`SledBlockStore`].
#[derive(Debug, Clone)]
pub struct BlockStoreConfig {
/// Location handed to sled as the database path. `SledBlockStore::new`
/// creates this path's parent directory before opening the database.
pub path: PathBuf,
/// Cache size in bytes, forwarded to sled's `cache_capacity` setting.
pub cache_size: usize,
}
impl Default for BlockStoreConfig {
    /// Returns the baseline configuration: the relative path `.ipfrs/blocks`
    /// and a 100 MiB cache.
    fn default() -> Self {
        const DEFAULT_CACHE_BYTES: usize = 100 * 1024 * 1024;

        let path = PathBuf::from(".ipfrs/blocks");
        Self {
            path,
            cache_size: DEFAULT_CACHE_BYTES,
        }
    }
}
impl BlockStoreConfig {
    /// Number of bytes in one mebibyte; the unit used by the presets and by
    /// [`with_cache_mb`](Self::with_cache_mb).
    const BYTES_PER_MIB: usize = 1024 * 1024;

    /// Preset for local development: stores data at `/tmp/ipfrs-dev` with a
    /// 50 MiB cache.
    pub fn development() -> Self {
        Self {
            path: PathBuf::from("/tmp/ipfrs-dev"),
            cache_size: 50 * Self::BYTES_PER_MIB,
        }
    }

    /// Preset with a 500 MiB cache, storing data at the caller-supplied `path`.
    pub fn production(path: PathBuf) -> Self {
        Self {
            path,
            cache_size: 500 * Self::BYTES_PER_MIB,
        }
    }

    /// Preset with a 10 MiB cache, storing data at the caller-supplied `path`.
    pub fn embedded(path: PathBuf) -> Self {
        Self {
            path,
            cache_size: 10 * Self::BYTES_PER_MIB,
        }
    }

    /// Preset for tests: a 5 MiB cache and a path of the form
    /// `<temp dir>/ipfrs-test-<pid>`.
    ///
    /// The path depends only on the process id, so two calls made by the same
    /// process return the same directory.
    pub fn testing() -> Self {
        let temp_dir = std::env::temp_dir().join(format!("ipfrs-test-{}", std::process::id()));
        Self {
            path: temp_dir,
            cache_size: 5 * Self::BYTES_PER_MIB,
        }
    }

    /// Replaces the database path and returns the updated configuration.
    pub fn with_path(mut self, path: PathBuf) -> Self {
        self.path = path;
        self
    }

    /// Sets the cache size from a value expressed in mebibytes.
    ///
    /// The conversion to bytes saturates at `usize::MAX` rather than
    /// overflowing, so an oversized request can neither panic (debug builds)
    /// nor silently wrap around to a tiny cache (release builds).
    pub fn with_cache_mb(mut self, cache_mb: usize) -> Self {
        self.cache_size = cache_mb.saturating_mul(Self::BYTES_PER_MIB);
        self
    }

    /// Sets the cache size directly in bytes.
    pub fn with_cache_bytes(mut self, cache_bytes: usize) -> Self {
        self.cache_size = cache_bytes;
        self
    }
}
/// Block store that keeps its data in a sled database.
pub struct SledBlockStore {
/// Handle to the sled database opened by `SledBlockStore::new`.
db: Db,
}
impl SledBlockStore {
pub fn new(config: BlockStoreConfig) -> Result<Self> {
if let Some(parent) = config.path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| Error::Storage(format!("Failed to create directory: {e}")))?;
}
let db = sled::Config::new()
.path(&config.path)
.cache_capacity(config.cache_size as u64)
.open()
.map_err(|e| Error::Storage(format!("Failed to open database: {e}")))?;
Ok(Self { db })
}
}
#[async_trait]
impl BlockStore for SledBlockStore {
async fn put(&self, block: &Block) -> Result<()> {
let key = block.cid().to_bytes();
let value = block.data().to_vec();
self.db
.insert(key, value)
.map_err(|e| Error::Storage(format!("Failed to insert block: {e}")))?;
self.db
.flush_async()
.await
.map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
Ok(())
}
async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
let key = cid.to_bytes();
match self.db.get(&key) {
Ok(Some(value)) => {
let data = bytes::Bytes::from(value.to_vec());
Ok(Some(Block::from_parts(*cid, data)))
}
Ok(None) => Ok(None),
Err(e) => Err(Error::Storage(format!("Failed to get block: {e}"))),
}
}
async fn has(&self, cid: &Cid) -> Result<bool> {
let key = cid.to_bytes();
self.db
.contains_key(&key)
.map_err(|e| Error::Storage(format!("Failed to check block: {e}")))
}
async fn delete(&self, cid: &Cid) -> Result<()> {
let key = cid.to_bytes();
self.db
.remove(&key)
.map_err(|e| Error::Storage(format!("Failed to delete block: {e}")))?;
self.db
.flush_async()
.await
.map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
Ok(())
}
fn len(&self) -> usize {
self.db.len()
}
fn is_empty(&self) -> bool {
self.db.is_empty()
}
fn list_cids(&self) -> Result<Vec<Cid>> {
let mut cids = Vec::new();
for item in self.db.iter() {
let (key, _) = item.map_err(|e| Error::Storage(format!("Iteration error: {e}")))?;
let cid = Cid::try_from(key.to_vec())
.map_err(|e| Error::Cid(format!("Failed to parse CID: {e}")))?;
cids.push(cid);
}
Ok(cids)
}
async fn put_many(&self, blocks: &[Block]) -> Result<()> {
let mut batch = sled::Batch::default();
for block in blocks {
let key = block.cid().to_bytes();
let value = block.data().to_vec();
batch.insert(key, value);
}
self.db
.apply_batch(batch)
.map_err(|e| Error::Storage(format!("Failed to apply batch: {e}")))?;
self.db
.flush_async()
.await
.map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
Ok(())
}
async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
let mut results = Vec::with_capacity(cids.len());
for cid in cids {
let key = cid.to_bytes();
match self.db.get(&key) {
Ok(Some(value)) => {
let data = bytes::Bytes::from(value.to_vec());
results.push(Some(Block::from_parts(*cid, data)));
}
Ok(None) => results.push(None),
Err(e) => return Err(Error::Storage(format!("Failed to get block: {e}"))),
}
}
Ok(results)
}
async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
let mut results = Vec::with_capacity(cids.len());
for cid in cids {
let key = cid.to_bytes();
let exists = self
.db
.contains_key(&key)
.map_err(|e| Error::Storage(format!("Failed to check block: {e}")))?;
results.push(exists);
}
Ok(results)
}
async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
let mut batch = sled::Batch::default();
for cid in cids {
let key = cid.to_bytes();
batch.remove(key);
}
self.db
.apply_batch(batch)
.map_err(|e| Error::Storage(format!("Failed to apply batch: {e}")))?;
self.db
.flush_async()
.await
.map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
Ok(())
}
async fn flush(&self) -> Result<()> {
self.db
.flush_async()
.await
.map_err(|e| Error::Storage(format!("Failed to flush: {e}")))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Round-trips a single block through `put`, `get`, `has` and `delete`.
    #[tokio::test]
    async fn test_put_get_block() {
        // Use a per-process directory under the platform temp dir instead of
        // one hard-coded `/tmp` path shared by every run on the machine.
        let db_path =
            std::env::temp_dir().join(format!("ipfrs-test-blockstore-{}", std::process::id()));
        let config = BlockStoreConfig {
            path: db_path.clone(),
            cache_size: 1024 * 1024,
        };
        // Best-effort: clear anything left over from an earlier run.
        let _ = std::fs::remove_dir_all(&db_path);

        let store = SledBlockStore::new(config).unwrap();
        let data = Bytes::from("hello world");
        let block = Block::new(data.clone()).unwrap();

        store.put(&block).await.unwrap();
        let retrieved = store.get(block.cid()).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().data(), &data);
        assert!(store.has(block.cid()).await.unwrap());

        store.delete(block.cid()).await.unwrap();
        assert!(!store.has(block.cid()).await.unwrap());

        // Close the database before removing its directory; cleanup is
        // best-effort so a failure here does not fail the test.
        drop(store);
        let _ = std::fs::remove_dir_all(&db_path);
    }
}