use crate::error::Error;
use crate::repo::paths::{block_path, filestem_to_block_cid};
use crate::repo::{BlockPut, BlockStore};
use crate::repo::{BlockRm, BlockRmError};
use crate::Block;
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt};
use futures_timer::Delay;
use libipld::Cid;
use std::collections::{BTreeSet, HashMap};
use std::io::{self, ErrorKind, Read};
use std::path::PathBuf;
use std::time::Duration;
use tokio::fs;
use tokio_stream::wrappers::ReadDirStream;
use super::RepoBlockCommand;
/// Filesystem-backed [`BlockStore`] handle.
///
/// Apart from `init` (which creates `path` directly), every trait method sends a
/// [`RepoBlockCommand`] over `tx` to the [`FsBlockStoreTask`] spawned by
/// [`FsBlockStore::new`] and awaits the reply on a oneshot channel.
#[derive(Debug)]
pub struct FsBlockStore {
/// Root directory of the store.
path: PathBuf,
/// Sending half of the command channel read by the background task.
tx: futures::channel::mpsc::Sender<RepoBlockCommand>,
}
/// Background worker that performs all block I/O for an [`FsBlockStore`].
///
/// Commands received on `rx` are handled one at a time by `start`.
pub struct FsBlockStoreTask {
/// How long a block written by `put` stays in `temp`.
timeout: Duration,
/// Recently written blocks, each paired with a timer. While an entry is present its CID
/// is treated as referenced by `cleanup`; `start` drops entries whose timer has fired.
temp: HashMap<Cid, Delay>,
/// Root directory of the store (shard directories live directly beneath it).
path: PathBuf,
/// Receiving half of the command channel.
rx: futures::channel::mpsc::Receiver<RepoBlockCommand>,
}
impl FsBlockStore {
pub fn new(path: PathBuf, duration: Duration) -> Self {
let (tx, rx) = futures::channel::mpsc::channel(1);
let mut task = FsBlockStoreTask {
path: path.clone(),
timeout: duration,
temp: HashMap::new(),
rx,
};
tokio::spawn(async move {
task.start().await;
});
Self { path, tx }
}
}
#[async_trait]
impl BlockStore for FsBlockStore {
async fn init(&self) -> Result<(), Error> {
fs::create_dir_all(self.path.clone()).await?;
Ok(())
}
async fn open(&self) -> Result<(), Error> {
Ok(())
}
async fn contains(&self, cid: &Cid) -> Result<bool, Error> {
let (tx, rx) = futures::channel::oneshot::channel();
let _ = self
.tx
.clone()
.send(RepoBlockCommand::Contains {
cid: *cid,
response: tx,
})
.await;
rx.await.map_err(anyhow::Error::from)?
}
async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error> {
let (tx, rx) = futures::channel::oneshot::channel();
let _ = self
.tx
.clone()
.send(RepoBlockCommand::Get {
cid: *cid,
response: tx,
})
.await;
rx.await.map_err(anyhow::Error::from)?
}
async fn size(&self, cid: &[Cid]) -> Result<Option<usize>, Error> {
let (tx, rx) = futures::channel::oneshot::channel();
let _ = self
.tx
.clone()
.send(RepoBlockCommand::Size {
cid: cid.to_vec(),
response: tx,
})
.await;
rx.await.map_err(anyhow::Error::from)?
}
async fn total_size(&self) -> Result<usize, Error> {
let (tx, rx) = futures::channel::oneshot::channel();
let _ = self
.tx
.clone()
.send(RepoBlockCommand::TotalSize { response: tx })
.await;
rx.await.map_err(anyhow::Error::from)?
}
async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error> {
let (tx, rx) = futures::channel::oneshot::channel();
let _ = self
.tx
.clone()
.send(RepoBlockCommand::PutBlock {
block,
response: tx,
})
.await;
rx.await.map_err(anyhow::Error::from)?
}
async fn remove(&self, cid: &Cid) -> Result<Result<BlockRm, BlockRmError>, Error> {
let (tx, rx) = futures::channel::oneshot::channel();
let _ = self
.tx
.clone()
.send(RepoBlockCommand::Remove {
cid: *cid,
response: tx,
})
.await;
rx.await.map_err(anyhow::Error::from)?
}
async fn remove_garbage(&self, references: BoxStream<'static, Cid>) -> Result<Vec<Cid>, Error> {
let (tx, rx) = futures::channel::oneshot::channel();
let _ = self
.tx
.clone()
.send(RepoBlockCommand::Cleanup {
refs: references,
response: tx,
})
.await;
rx.await.map_err(anyhow::Error::from)?
}
async fn list(&self) -> Result<Vec<Cid>, Error> {
let (tx, rx) = futures::channel::oneshot::channel();
let _ = self
.tx
.clone()
.send(RepoBlockCommand::List { response: tx })
.await;
rx.await.map_err(anyhow::Error::from)?
}
async fn wipe(&self) {
let (tx, rx) = futures::channel::oneshot::channel();
let _ = self
.tx
.clone()
.send(RepoBlockCommand::Wipe { response: tx })
.await;
let _ = rx.await.map_err(anyhow::Error::from);
}
}
impl FsBlockStoreTask {
    /// Runs the command loop until every [`FsBlockStore`] sender has been dropped.
    ///
    /// Commands are handled strictly one at a time. The `biased` first branch drives the
    /// timers in `temp`, dropping entries whose `Delay` has fired; it never completes.
    /// When `rx` yields `None` (channel closed) the loop exits, so the spawned task
    /// finishes instead of staying parked forever on the timer branch.
    async fn start(&mut self) {
        loop {
            tokio::select! {
                biased;
                _ = futures::future::poll_fn(|cx| {
                    self.temp.retain(|_, timer| timer.poll_unpin(cx).is_pending());
                    std::task::Poll::Pending
                }) => {}
                command = self.rx.next() => {
                    // `None`: all senders are gone, so no further command can ever arrive.
                    let command = match command {
                        Some(command) => command,
                        None => break,
                    };
                    match command {
                        RepoBlockCommand::Contains { cid, response } => {
                            let _ = response.send(self.contains(&cid).await);
                        }
                        RepoBlockCommand::Get { cid, response } => {
                            let _ = response.send(self.get(&cid).await);
                        }
                        RepoBlockCommand::PutBlock { block, response } => {
                            let _ = response.send(self.put(block).await);
                        }
                        RepoBlockCommand::Size { cid, response } => {
                            let _ = response.send(Ok(self.size(&cid).await));
                        }
                        RepoBlockCommand::TotalSize { response } => {
                            let _ = response.send(Ok(self.total_size().await));
                        }
                        RepoBlockCommand::Remove { cid, response } => {
                            let _ = response.send(self.remove(&cid).await);
                        }
                        RepoBlockCommand::Cleanup { refs, response } => {
                            let _ = response.send(self.cleanup(refs).await);
                        }
                        RepoBlockCommand::List { response } => {
                            let _ = response.send(self.list().await);
                        }
                        RepoBlockCommand::Wipe { response } => {
                            let _ = response.send({
                                self.wipe().await;
                                Ok(())
                            });
                        }
                    }
                }
            }
        }
    }
}
impl FsBlockStoreTask {
    /// Reports whether a regular file exists at the on-disk path of `cid`.
    ///
    /// A missing path yields `Ok(false)`; any other metadata error is returned.
    async fn contains(&self, cid: &Cid) -> Result<bool, Error> {
        let path = block_path(self.path.clone(), cid);
        let metadata = match fs::metadata(path).await {
            Ok(m) => m,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
            Err(e) => return Err(e.into()),
        };
        Ok(metadata.is_file())
    }

    /// Reads the block stored for `cid`, doing the file I/O on Tokio's blocking pool.
    ///
    /// Returns `Ok(None)` if the file does not exist. The bytes are passed to `Block::new`
    /// together with `cid`, and any error it returns is propagated.
    async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error> {
        let path = block_path(self.path.clone(), cid);
        let cid = *cid;
        tokio::task::spawn_blocking(move || {
            let mut file = match std::fs::File::open(path) {
                Ok(file) => file,
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
                Err(e) => {
                    return Err(e.into());
                }
            };
            // Pre-size the buffer from the file length so reading does not regrow it.
            let len = file.metadata()?.len();
            let mut data = Vec::with_capacity(len as usize);
            file.read_to_end(&mut data)?;
            let block = Block::new(cid, data)?;
            Ok(Some(block))
        })
        .await?
    }

    /// Writes `block` to its sharded path, creating the shard directory when needed.
    ///
    /// The target file is created with `create_new`. If it already exists, the result is
    /// `BlockPut::Existed` and nothing is written. Otherwise the data goes through
    /// `write_through_tempfile`, and the block is recorded in `temp` for `self.timeout`.
    /// If the write fails, the partially created target file is removed (best effort).
    async fn put(&mut self, block: Block) -> Result<(Cid, BlockPut), Error> {
        let target_path = block_path(self.path.clone(), block.cid());
        let cid = *block.cid();
        // Outer io::Error: directory creation / exclusive create failed.
        // Inner io::Error: the data write itself failed (target already cleaned up).
        let je = tokio::task::spawn_blocking(move || {
            let sharded = target_path
                .parent()
                .expect("we already have at least the shard parent");
            std::fs::create_dir_all(sharded)?;
            let target = std::fs::OpenOptions::new()
                .write(true)
                .create_new(true)
                .open(&target_path)?;
            let temp_path = target_path.with_extension("tmp");
            match write_through_tempfile(target, &target_path, temp_path, block.data()) {
                Ok(()) => {
                    trace!("successfully wrote the block");
                    Ok::<_, std::io::Error>(Ok(block.data().len()))
                }
                Err(e) => {
                    match std::fs::remove_file(&target_path) {
                        Ok(_) => debug!("removed partially written {:?}", target_path),
                        Err(removal) => warn!(
                            "failed to remove partially written {:?}: {}",
                            target_path, removal
                        ),
                    }
                    Ok(Err(e))
                }
            }
        })
        .await
        .map_err(|e| {
            error!("blocking put task error: {}", e);
            e
        })?;
        match je {
            Ok(Ok(written)) => {
                trace!(bytes = written, "block writing succeeded");
                self.temp.insert(cid, Delay::new(self.timeout));
                Ok((cid, BlockPut::NewBlock))
            }
            Ok(Err(e)) => {
                trace!("write failed but hopefully the target was removed");
                Err(Error::new(e))
            }
            Err(e) if e.kind() == ErrorKind::AlreadyExists => {
                trace!("block exist: {}", e);
                Ok((cid, BlockPut::Existed))
            }
            Err(e) => Err(Error::new(e)),
        }
    }

    /// Sums the on-disk sizes of the given blocks.
    ///
    /// Blocks whose metadata cannot be read (e.g. missing) are skipped, and repeated CIDs
    /// are counted once. Always returns `Some`.
    async fn size(&self, cids: &[Cid]) -> Option<usize> {
        let mut block_sizes = HashMap::new();
        for cid in cids {
            let path = block_path(self.path.clone(), cid);
            if let Ok(size) = fs::metadata(path).await.map(|m| m.len() as usize) {
                block_sizes.insert(*cid, size);
            }
        }
        Some(block_sizes.values().sum())
    }

    /// Sums the on-disk sizes of every block file found by `list_stream`.
    ///
    /// Best effort: if the root directory cannot be read the result is `0`, and entries
    /// whose listing or metadata fails are skipped.
    async fn total_size(&self) -> usize {
        let blocks = match self.list_stream().await {
            Ok(blocks) => blocks,
            Err(_) => return 0,
        };
        blocks
            .filter_map(|entry| async move {
                match entry {
                    Ok((_, path)) => fs::metadata(path).await.ok().map(|m| m.len() as usize),
                    Err(_) => None,
                }
            })
            .fold(0usize, |total, size| async move { total + size })
            .await
    }

    /// Deletes the block file for `cid`.
    ///
    /// Returns `Ok(Err(BlockRmError::NotFound))` when there is no such file; any other I/O
    /// error is returned as the outer error. A successfully removed block is also dropped
    /// from `temp`, since there is nothing left to protect from garbage collection.
    async fn remove(&mut self, cid: &Cid) -> Result<Result<BlockRm, BlockRmError>, Error> {
        let path = block_path(self.path.clone(), cid);
        trace!(cid = %cid, "removing block after synchronizing");
        match fs::remove_file(path).await {
            Ok(()) => {
                self.temp.remove(cid);
                Ok(Ok(BlockRm::Removed(*cid)))
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                Ok(Err(BlockRmError::NotFound(*cid)))
            }
            Err(e) => Err(e.into()),
        }
    }

    /// Deletes every stored block whose CID is in neither `refs` nor `temp`.
    ///
    /// Returns the CIDs that were removed. Stops at the first I/O error; files already
    /// deleted by then stay deleted.
    // NOTE(review): membership is checked on the exact `Cid` produced by
    // `filestem_to_block_cid`. The `remove` test compares listed CIDs by `hash()` only,
    // which suggests the listed form may differ from the form a caller stored
    // (e.g. v0 vs v1). If so, references in the other form would not match — confirm.
    async fn cleanup(&mut self, refs: BoxStream<'_, Cid>) -> Result<Vec<Cid>, Error> {
        let mut refs = refs.collect::<BTreeSet<_>>().await;
        refs.extend(self.temp.keys().cloned());
        let blocks = self.list_stream().await?;
        let removed_blocks = blocks
            .try_filter(|(cid, _)| futures::future::ready(!refs.contains(cid)))
            .try_filter_map(|(cid, path)| async move {
                fs::remove_file(path).await?;
                Ok(Some(cid))
            })
            .try_collect()
            .await?;
        Ok(removed_blocks)
    }

    /// Streams `(cid, path)` for every `*.data` file one level below the root.
    ///
    /// Only directories directly under the root are descended into, and only entries with a
    /// `data` extension whose stem is accepted by `filestem_to_block_cid` are yielded. The
    /// path is rebuilt from the CID with `block_path`.
    async fn list_stream(&self) -> Result<BoxStream<'_, Result<(Cid, PathBuf), io::Error>>, Error> {
        let root = &self.path;
        let stream = ReadDirStream::new(fs::read_dir(root).await?);
        Ok(stream
            // First level: shard directories; plain files at the root are ignored.
            .try_filter_map(|d| async move {
                Ok(if d.file_type().await?.is_dir() {
                    Some(ReadDirStream::new(fs::read_dir(d.path()).await?))
                } else {
                    None
                })
            })
            .try_flatten()
            // Second level: keep `*.data` entries whose stem parses as a CID.
            .try_filter_map(|d| {
                let name = d.file_name();
                let path: &std::path::Path = name.as_ref();
                futures::future::ready(if path.extension() != Some("data".as_ref()) {
                    Ok(None)
                } else {
                    Ok(filestem_to_block_cid(path.file_stem()))
                })
            })
            .map_ok(move |cid| {
                let path = block_path(root.clone(), &cid);
                (cid, path)
            })
            .boxed())
    }

    /// Collects the CIDs of all stored blocks (see `list_stream`).
    async fn list(&self) -> Result<Vec<Cid>, Error> {
        let cids = self
            .list_stream()
            .await?
            .map_ok(|(cid, _)| cid)
            .try_collect()
            .await?;
        Ok(cids)
    }

    /// Currently a no-op: nothing is removed from disk.
    async fn wipe(&mut self) {}
}
/// Writes `data` to `temp_path`, syncs it to disk, then renames it over `target_path`.
///
/// `target` is the handle the caller got when creating `target_path`; it is closed before
/// the rename. If any step fails, the temporary file is removed (best effort) and the
/// original error is returned. Removing the partially created target is left to the caller.
fn write_through_tempfile(
    target: std::fs::File,
    target_path: impl AsRef<std::path::Path>,
    temp_path: impl AsRef<std::path::Path>,
    data: &[u8],
) -> Result<(), std::io::Error> {
    use std::io::Write;
    let temp_path = temp_path.as_ref();
    let outcome = (move || -> Result<(), std::io::Error> {
        let mut temp = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(temp_path)?;
        temp.write_all(data)?;
        temp.flush()?;
        // Make the bytes durable before they become visible under the final name.
        temp.sync_all()?;
        drop(temp);
        drop(target);
        std::fs::rename(temp_path, target_path)
    })();
    if outcome.is_err() {
        // Best effort: don't leave a stray temporary file behind; the original error wins.
        let _ = std::fs::remove_file(temp_path);
    }
    outcome
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Block;
    use hex_literal::hex;
    use libipld::{
        multihash::{Code, MultihashDigest},
        Cid, IpldCodec,
    };
    use std::convert::TryFrom;
    use std::env::temp_dir;
    use std::sync::Arc;

    /// Full put/get/contains/remove round trip on an empty store.
    #[tokio::test]
    async fn test_fs_blockstore() {
        let mut tmp = temp_dir();
        tmp.push("blockstore1");
        std::fs::remove_dir_all(tmp.clone()).ok();
        let store = FsBlockStore::new(tmp.clone(), Duration::ZERO);
        let data = b"1".to_vec();
        let cid = Cid::new_v1(IpldCodec::Raw.into(), Code::Sha2_256.digest(&data));
        let block = Block::new(cid, data).unwrap();
        store.init().await.unwrap();
        store.open().await.unwrap();
        let contains = store.contains(&cid).await.unwrap();
        assert!(!contains);
        let get = store.get(&cid).await.unwrap();
        assert_eq!(get, None);
        if store.remove(&cid).await.unwrap().is_ok() {
            panic!("block should not be found")
        }
        let put = store.put(block.clone()).await.unwrap();
        assert_eq!(put.0, cid.to_owned());
        let contains = store.contains(&cid);
        assert!(contains.await.unwrap());
        let get = store.get(&cid);
        assert_eq!(get.await.unwrap(), Some(block.clone()));
        store.remove(&cid).await.unwrap().unwrap();
        let contains = store.contains(&cid);
        assert!(!contains.await.unwrap());
        let get = store.get(&cid);
        assert_eq!(get.await.unwrap(), None);
        std::fs::remove_dir_all(tmp).ok();
    }

    /// A block written through one handle is visible through a second handle on the same path.
    #[tokio::test]
    async fn test_fs_blockstore_open() {
        let mut tmp = temp_dir();
        tmp.push("blockstore2");
        std::fs::remove_dir_all(&tmp).ok();
        let data = b"1".to_vec();
        let cid = Cid::new_v1(IpldCodec::Raw.into(), Code::Sha2_256.digest(&data));
        let block = Block::new(cid, data).unwrap();
        let block_store = FsBlockStore::new(tmp.clone(), Duration::ZERO);
        block_store.init().await.unwrap();
        block_store.open().await.unwrap();
        assert!(!block_store.contains(block.cid()).await.unwrap());
        block_store.put(block.clone()).await.unwrap();
        let block_store = FsBlockStore::new(tmp.clone(), Duration::ZERO);
        block_store.open().await.unwrap();
        assert!(block_store.contains(block.cid()).await.unwrap());
        assert_eq!(block_store.get(block.cid()).await.unwrap().unwrap(), block);
        std::fs::remove_dir_all(&tmp).ok();
    }

    /// `list` returns every stored block.
    #[tokio::test]
    async fn test_fs_blockstore_list() {
        let mut tmp = temp_dir();
        tmp.push("blockstore_list");
        std::fs::remove_dir_all(&tmp).ok();
        let block_store = FsBlockStore::new(tmp.clone(), Duration::ZERO);
        block_store.init().await.unwrap();
        block_store.open().await.unwrap();
        for data in &[b"1", b"2", b"3"] {
            let data_slice = data.to_vec();
            let cid = Cid::new_v1(IpldCodec::Raw.into(), Code::Sha2_256.digest(&data_slice));
            let block = Block::new(cid, data_slice).unwrap();
            block_store.put(block.clone()).await.unwrap();
        }
        let cids = block_store.list().await.unwrap();
        assert_eq!(cids.len(), 3);
        for cid in cids.iter() {
            assert!(block_store.contains(cid).await.unwrap());
        }
        std::fs::remove_dir_all(&tmp).ok();
    }

    /// Concurrent puts of the same block: exactly one writes, the rest see `Existed`.
    #[tokio::test]
    async fn race_to_insert_new() {
        let mut tmp = temp_dir();
        tmp.push("race_to_insert_new");
        std::fs::remove_dir_all(&tmp).ok();
        let single = FsBlockStore::new(tmp.clone(), Duration::ZERO);
        single.init().await.unwrap();
        let single = Arc::new(single);
        let cid = Cid::try_from("QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL").unwrap();
        let data = hex!("0a0d08021207666f6f6261720a1807");
        let block = Block::new(cid, data.into()).unwrap();
        let count = 10;
        let (writes, existing) = race_to_insert_scenario(count, block, &single).await;
        assert_eq!(writes, 1);
        assert_eq!(existing, count - 1);
        std::fs::remove_dir_all(&tmp).ok();
    }

    /// Spawns `count` tasks that all `put` the same block at once.
    ///
    /// Returns `(new_writes, already_existed)`. Panics if any put fails or any task
    /// fails to join, so a failure is reported directly rather than as a count mismatch.
    async fn race_to_insert_scenario(
        count: usize,
        block: Block,
        blockstore: &Arc<FsBlockStore>,
    ) -> (usize, usize) {
        let barrier = Arc::new(tokio::sync::Barrier::new(count));
        let join_handles = (0..count)
            .map(|_| {
                tokio::spawn({
                    let bs = Arc::clone(blockstore);
                    let barrier = Arc::clone(&barrier);
                    let block = block.clone();
                    async move {
                        barrier.wait().await;
                        bs.put(block).await
                    }
                })
            })
            .collect::<Vec<_>>();
        let mut writes = 0usize;
        let mut existing = 0usize;
        for jh in join_handles {
            match jh.await {
                Ok(Ok((_, BlockPut::NewBlock))) => writes += 1,
                Ok(Ok((_, BlockPut::Existed))) => existing += 1,
                Ok(Err(e)) => panic!("put failed: {e}"),
                Err(e) => panic!("put task failed to join: {e}"),
            }
        }
        (writes, existing)
    }

    /// A put block shows up in `list` (compared by multihash) and disappears after `remove`.
    #[tokio::test]
    async fn remove() {
        let mut tmp = temp_dir();
        tmp.push("remove");
        std::fs::remove_dir_all(&tmp).ok();
        let single = FsBlockStore::new(tmp.clone(), Duration::ZERO);
        single.init().await.unwrap();
        let cid = Cid::try_from("QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL").unwrap();
        let data = hex!("0a0d08021207666f6f6261720a1807");
        let block = Block::new(cid, data.into()).unwrap();
        assert_eq!(single.list().await.unwrap().len(), 0);
        single.put(block).await.unwrap();
        assert_eq!(single.list().await.unwrap()[0].hash(), cid.hash());
        single.remove(&cid).await.unwrap().unwrap();
        assert_eq!(single.list().await.unwrap().len(), 0);
        std::fs::remove_dir_all(&tmp).ok();
    }
}