use crate::traits::BlockStore;
use async_trait::async_trait;
use ipfrs_core::{Block, Cid, Error, Result};
use memmap2::{Mmap, MmapOptions};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// Configuration for the memory-mapped block store defined in this file.
///
/// Built with [`MmapConfig::new`] or `Default`, then adjusted through the
/// `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct MmapConfig {
/// Root directory under which block files are stored.
pub path: PathBuf,
/// Minimum file size, in bytes, for a block file to be memory-mapped;
/// smaller files are not mapped and are read with regular file I/O instead.
pub mmap_threshold: usize,
/// Huge-page toggle.
/// NOTE(review): nothing visible in this file reads this flag — confirm
/// whether it is consumed elsewhere or is not implemented yet.
pub use_huge_pages: bool,
/// When `true`, `MmapOptions::populate` is requested when a block file is
/// mapped (only compiled in on Unix targets).
pub populate: bool,
}
impl Default for MmapConfig {
fn default() -> Self {
Self {
path: PathBuf::from(".ipfrs/blocks-mmap"),
mmap_threshold: 1024 * 1024, use_huge_pages: false,
populate: false,
}
}
}
impl MmapConfig {
    /// Creates a configuration rooted at `path`; every other field takes its
    /// `Default` value.
    pub fn new(path: PathBuf) -> Self {
        let Self {
            mmap_threshold,
            use_huge_pages,
            populate,
            ..
        } = Self::default();
        Self {
            path,
            mmap_threshold,
            use_huge_pages,
            populate,
        }
    }

    /// Returns the configuration with `mmap_threshold` replaced by `threshold` (bytes).
    pub fn with_threshold(self, threshold: usize) -> Self {
        Self {
            mmap_threshold: threshold,
            ..self
        }
    }

    /// Returns the configuration with `use_huge_pages` set to `enable`.
    pub fn with_huge_pages(self, enable: bool) -> Self {
        Self {
            use_huge_pages: enable,
            ..self
        }
    }

    /// Returns the configuration with `populate` set to the given value.
    pub fn with_populate(self, populate: bool) -> Self {
        Self { populate, ..self }
    }

    /// Computes the on-disk location of the block identified by `cid`:
    /// `<path>/<first two bytes of the CID string>/<CID string>`.
    ///
    /// CID strings shorter than two bytes use the whole string as the
    /// directory name.
    fn block_path(&self, cid: &Cid) -> PathBuf {
        let name = cid.to_string();
        let prefix_len = name.len().min(2);
        let mut location = self.path.join(&name[..prefix_len]);
        location.push(&name);
        location
    }
}
/// Block store that keeps every block in its own file below `config.path`
/// and serves files at or above `config.mmap_threshold` through memory maps.
pub struct MmapBlockStore {
/// Storage location and mapping options.
config: MmapConfig,
/// Memory maps created by `get_mmap`, keyed by CID and shared via `Arc`.
mmap_cache: Arc<RwLock<HashMap<Cid, Arc<Mmap>>>>,
}
impl MmapBlockStore {
pub fn new(config: MmapConfig) -> Result<Self> {
std::fs::create_dir_all(&config.path)
.map_err(|e| Error::Storage(format!("Failed to create directory: {e}")))?;
Ok(Self {
config,
mmap_cache: Arc::new(RwLock::new(HashMap::new())),
})
}
fn get_mmap(&self, cid: &Cid) -> Result<Option<Arc<Mmap>>> {
{
let cache = self.mmap_cache.read();
if let Some(mmap) = cache.get(cid) {
return Ok(Some(Arc::clone(mmap)));
}
}
let path = self.config.block_path(cid);
if !path.exists() {
return Ok(None);
}
let file = File::open(&path)
.map_err(|e| Error::Storage(format!("Failed to open block file: {e}")))?;
let metadata = file
.metadata()
.map_err(|e| Error::Storage(format!("Failed to get file metadata: {e}")))?;
if metadata.len() < self.config.mmap_threshold as u64 {
return Ok(None);
}
let mut mmap_opts = MmapOptions::new();
#[cfg(unix)]
{
if self.config.populate {
mmap_opts.populate();
}
}
let mmap = unsafe {
mmap_opts
.map(&file)
.map_err(|e| Error::Storage(format!("Failed to create mmap: {e}")))?
};
let mmap = Arc::new(mmap);
{
let mut cache = self.mmap_cache.write();
cache.insert(*cid, Arc::clone(&mmap));
}
Ok(Some(mmap))
}
fn read_block_file(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
let path = self.config.block_path(cid);
if !path.exists() {
return Ok(None);
}
let mut file = File::open(&path)
.map_err(|e| Error::Storage(format!("Failed to open block file: {e}")))?;
let mut data = Vec::new();
file.read_to_end(&mut data)
.map_err(|e| Error::Storage(format!("Failed to read block file: {e}")))?;
Ok(Some(data))
}
fn write_block_file(&self, cid: &Cid, data: &[u8]) -> Result<()> {
let path = self.config.block_path(cid);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| Error::Storage(format!("Failed to create directory: {e}")))?;
}
let mut file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&path)
.map_err(|e| Error::Storage(format!("Failed to create block file: {e}")))?;
file.write_all(data)
.map_err(|e| Error::Storage(format!("Failed to write block file: {e}")))?;
file.sync_all()
.map_err(|e| Error::Storage(format!("Failed to sync block file: {e}")))?;
Ok(())
}
/// Returns a reference to the configuration this store was created with.
pub fn config(&self) -> &MmapConfig {
&self.config
}
/// Removes every entry from the mapping cache. Block files are not touched,
/// and `Arc<Mmap>` clones already handed out stay usable by their holders.
pub fn clear_cache(&self) {
    let mut cache = self.mmap_cache.write();
    cache.clear();
}
/// Returns how many blocks currently have an entry in the mapping cache.
pub fn cache_size(&self) -> usize {
    let cache = self.mmap_cache.read();
    cache.len()
}
}
#[async_trait]
impl BlockStore for MmapBlockStore {
/// Stores `block` on disk under its CID, then evicts any cached mapping for
/// that CID so later reads map the file as it exists after this write.
///
/// # Errors
/// Propagates the `Error::Storage` produced by `write_block_file`.
async fn put(&self, block: &Block) -> Result<()> {
    let cid = block.cid();
    self.write_block_file(cid, block.data())?;
    self.mmap_cache.write().remove(cid);
    Ok(())
}
/// Fetches the block for `cid`, or `Ok(None)` when it is not stored.
///
/// A memory map is tried first and its bytes are copied out; when
/// `get_mmap` yields nothing the file is read with regular I/O instead.
///
/// # Errors
/// Propagates `Error::Storage` from `get_mmap` or `read_block_file`.
async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
    let payload = match self.get_mmap(cid)? {
        Some(mapped) => Some(bytes::Bytes::copy_from_slice(&mapped[..])),
        None => self.read_block_file(cid)?.map(bytes::Bytes::from),
    };
    Ok(payload.map(|data| Block::from_parts(*cid, data)))
}
/// Reports whether something exists on disk at the block path for `cid`.
/// Only the filesystem is consulted; the mapping cache is not.
async fn has(&self, cid: &Cid) -> Result<bool> {
    Ok(self.config.block_path(cid).exists())
}
async fn delete(&self, cid: &Cid) -> Result<()> {
self.mmap_cache.write().remove(cid);
let path = self.config.block_path(cid);
if path.exists() {
std::fs::remove_file(&path)
.map_err(|e| Error::Storage(format!("Failed to delete block file: {e}")))?;
}
Ok(())
}
/// Returns the number of blocks currently stored on disk.
///
/// The count comes from `list_cids`, so each call walks the storage
/// directory; avoid calling it in hot paths. If the directory cannot be
/// listed, 0 is returned because this signature has no way to report errors.
fn len(&self) -> usize {
    self.list_cids().map_or(0, |cids| cids.len())
}

/// Returns `true` when no blocks are stored, consistent with [`Self::len`].
fn is_empty(&self) -> bool {
    self.len() == 0
}
fn list_cids(&self) -> Result<Vec<Cid>> {
let mut cids = Vec::new();
fn walk_dir(dir: &Path, cids: &mut Vec<Cid>) -> Result<()> {
if !dir.exists() {
return Ok(());
}
for entry in std::fs::read_dir(dir)
.map_err(|e| Error::Storage(format!("Failed to read directory: {e}")))?
{
let entry =
entry.map_err(|e| Error::Storage(format!("Failed to read entry: {e}")))?;
let path = entry.path();
if path.is_dir() {
walk_dir(&path, cids)?;
} else if path.is_file() {
if let Some(file_name) = path.file_name() {
if let Some(cid_str) = file_name.to_str() {
if let Ok(cid) = cid_str.parse::<Cid>() {
cids.push(cid);
}
}
}
}
}
Ok(())
}
walk_dir(&self.config.path, &mut cids)?;
Ok(cids)
}
/// No-op that always returns `Ok(())`. `write_block_file` already calls
/// `sync_all` on each file it writes, so nothing is buffered here.
async fn flush(&self) -> Result<()> {
Ok(())
}
}
impl MmapBlockStore {
#[allow(clippy::unused_async)]
pub async fn get_range(
&self,
cid: &Cid,
offset: u64,
length: usize,
) -> Result<Option<bytes::Bytes>> {
if let Some(mmap) = self.get_mmap(cid)? {
let start = offset as usize;
let end = (start + length).min(mmap.len());
if start >= mmap.len() {
return Ok(Some(bytes::Bytes::new()));
}
let data = bytes::Bytes::copy_from_slice(&mmap[start..end]);
return Ok(Some(data));
}
let path = self.config.block_path(cid);
if !path.exists() {
return Ok(None);
}
let mut file = File::open(&path)
.map_err(|e| Error::Storage(format!("Failed to open block file: {e}")))?;
file.seek(SeekFrom::Start(offset))
.map_err(|e| Error::Storage(format!("Failed to seek in block file: {e}")))?;
let mut buffer = vec![0u8; length];
let n = file
.read(&mut buffer)
.map_err(|e| Error::Storage(format!("Failed to read from block file: {e}")))?;
buffer.truncate(n);
Ok(Some(bytes::Bytes::from(buffer)))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Creates an empty store in a scratch directory below the OS temp dir.
    ///
    /// The directory name includes the test name and the process id so that
    /// tests running in parallel, or several runs on one machine, do not
    /// share state. `threshold` overrides the default mmap threshold.
    fn scratch_store(name: &str, threshold: Option<usize>) -> MmapBlockStore {
        let root =
            std::env::temp_dir().join(format!("ipfrs-test-mmap-{name}-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&root);
        let mut config = MmapConfig::new(root);
        if let Some(threshold) = threshold {
            config = config.with_threshold(threshold);
        }
        MmapBlockStore::new(config).unwrap()
    }

    /// Best-effort removal of a scratch store's directory.
    fn cleanup(store: &MmapBlockStore) {
        // Drop cached mappings first; some platforms refuse to delete mapped files.
        store.clear_cache();
        let _ = std::fs::remove_dir_all(&store.config().path);
    }

    #[tokio::test]
    async fn test_mmap_put_get_block() {
        let store = scratch_store("put-get", None);

        // Below the default threshold: served through regular file reads.
        let small_data = Bytes::from("small block");
        let small_block = Block::new(small_data.clone()).unwrap();
        store.put(&small_block).await.unwrap();
        let retrieved = store.get(small_block.cid()).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().data(), &small_data);

        // Above the default threshold: served through a memory map.
        let large_data = Bytes::from(vec![0u8; 2 * 1024 * 1024]);
        let large_block = Block::new(large_data.clone()).unwrap();
        store.put(&large_block).await.unwrap();
        let retrieved = store.get(large_block.cid()).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().data(), &large_data);

        assert!(store.has(small_block.cid()).await.unwrap());
        assert!(store.has(large_block.cid()).await.unwrap());

        store.delete(small_block.cid()).await.unwrap();
        assert!(!store.has(small_block.cid()).await.unwrap());

        cleanup(&store);
    }

    #[tokio::test]
    async fn test_mmap_partial_read() {
        let store = scratch_store("partial", Some(1024));

        let data = Bytes::from((0..10000).map(|i| (i % 256) as u8).collect::<Vec<u8>>());
        let block = Block::new(data.clone()).unwrap();
        store.put(&block).await.unwrap();

        let range = store.get_range(block.cid(), 100, 500).await.unwrap();
        assert!(range.is_some());
        let range_data = range.unwrap();
        assert_eq!(range_data.len(), 500);
        assert_eq!(&range_data[..], &data[100..600]);

        cleanup(&store);
    }

    #[tokio::test]
    async fn test_mmap_cache() {
        let store = scratch_store("cache", Some(1024));

        let data = Bytes::from(vec![0u8; 10000]);
        let block = Block::new(data.clone()).unwrap();
        store.put(&block).await.unwrap();
        assert_eq!(store.cache_size(), 0);

        // The first read creates the mapping; the second one reuses it.
        let _ = store.get(block.cid()).await.unwrap();
        assert_eq!(store.cache_size(), 1);
        let _ = store.get(block.cid()).await.unwrap();
        assert_eq!(store.cache_size(), 1);

        store.clear_cache();
        assert_eq!(store.cache_size(), 0);

        cleanup(&store);
    }

    #[tokio::test]
    async fn test_mmap_list_cids() {
        let store = scratch_store("list", None);

        let blocks: Vec<Block> = (0..5)
            .map(|i| Block::new(Bytes::from(format!("block {i}"))).unwrap())
            .collect();
        for block in &blocks {
            store.put(block).await.unwrap();
        }

        let cids = store.list_cids().unwrap();
        assert_eq!(cids.len(), 5);
        for block in &blocks {
            assert!(cids.contains(block.cid()));
        }

        cleanup(&store);
    }
}