use std::cmp::min;
use std::collections::HashMap;
use std::io::{self, Seek, SeekFrom, Read};
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, Arc};
use std::fs::{File};
use failure::Backtrace;
use futures::{Future, Async};
use futures_cpupool::{CpuPool, CpuFuture};
use dir_signature::v1::{self, ParseError};
use virtual_path::VPath;
pub use block_id::{BlockHash};
pub trait GetBlock {
type Data: AsRef<[u8]>;
type Error: fmt::Display;
type Future: Future<Item=Self::Data, Error=Self::Error> + 'static;
fn read_block(&self, hash: BlockHash, hint: BlockHint) -> Self::Future;
}
#[derive(Debug)]
pub struct BlockHint {
hint: Option<(VPath, PathBuf, u64)>,
}
trait Assert: Send + Sync + 'static {}
impl Assert for BlockHint {}
impl Assert for BlockHash {}
fn _object_safe() {
use futures::future::FutureResult;
let _: Option<&GetBlock<Data=Vec<u8>, Error=String,
Future=FutureResult<Vec<u8>, String>>> = None;
}
#[derive(Debug, Clone)]
struct BlockPointer {
path: Arc<PathBuf>,
offset: u64,
size: usize,
}
#[derive(Debug, Clone)]
pub struct ThreadedBlockReader {
pool: CpuPool,
blocks: Arc<RwLock<HashMap<BlockHash, BlockPointer>>>,
}
#[derive(Debug)]
pub struct FutureBlock(CpuFuture<Vec<u8>, ReadError>);
#[derive(Debug, Fail)]
pub enum ReadError {
#[fail(display="error reading file {:?}: {}", _0, _1)]
Fs(PathBuf, io::Error, Backtrace),
#[fail(display="block {} not found", _0)]
NotFound(BlockHash),
#[doc(hidden)]
#[fail(display="block lock was poisoned")]
LockError(Backtrace),
#[doc(hidden)]
#[fail(display="non-existent-error")]
__Nonexhaustive,
}
#[derive(Debug, Fail)]
pub enum DirError {
#[fail(display="error parsing index: {}", _0)]
ParseError(ParseError),
#[doc(hidden)]
#[fail(display="blocks lock was poisoned")]
LockError(Backtrace),
#[doc(hidden)]
#[fail(display="hash size is unsupported")]
HashSize(Backtrace),
#[doc(hidden)]
#[fail(display="non-existent-error")]
__Nonexhaustive,
}
impl ThreadedBlockReader {
pub fn new() -> ThreadedBlockReader {
ThreadedBlockReader {
pool: CpuPool::new(40),
blocks: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn new_num_threads(num: usize) -> ThreadedBlockReader {
ThreadedBlockReader {
pool: CpuPool::new(num),
blocks: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn register_dir<P: AsRef<Path>>(&self, dir: P, virtual_path: &VPath,
index_data: &[u8])
-> Result<(), DirError>
{
self._register_dir(dir.as_ref(), virtual_path, index_data)
}
fn _register_dir(&self, dir: &Path, _vpath: &VPath, index_data: &[u8])
-> Result<(), DirError>
{
let ref mut cur = io::Cursor::new(&index_data);
let mut parser = v1::Parser::new(cur).map_err(DirError::ParseError)?;
let header = parser.get_header();
let block_size = header.get_block_size();
let mut blocks = self.blocks.write()
.map_err(|_| DirError::LockError(Backtrace::new()))?;
for entry in parser.iter() {
match entry.expect("just created index is valid") {
v1::Entry::File { ref path, ref hashes, size, .. } => {
let path = Arc::new(dir.join(
path.strip_prefix("/").expect("paths are absolute")
));
let mut left = size;
for (idx, hash) in hashes.iter().enumerate() {
let id = BlockHash::from_bytes(hash)
.ok_or_else(||
DirError::HashSize(Backtrace::new()))?;
blocks.insert(id, BlockPointer {
path: path.clone(),
offset: idx as u64 * block_size,
size: min(left, block_size) as usize,
});
left = left.saturating_sub(block_size);
}
}
_ => {}
}
}
Ok(())
}
}
impl GetBlock for ThreadedBlockReader {
type Data = Vec<u8>;
type Error = ReadError;
type Future = FutureBlock;
fn read_block(&self, hash: BlockHash, _hint: BlockHint) -> FutureBlock {
let blocks = self.blocks.clone();
FutureBlock(self.pool.spawn_fn(move || {
let blocks = blocks.read()
.map_err(|_| ReadError::LockError(Backtrace::new()))?;
let pointer = blocks.get(&hash)
.ok_or_else(|| ReadError::NotFound(hash))?;
let mut result = vec![0u8; pointer.size];
let mut file = File::open(&*pointer.path)
.map_err(|e| ReadError::Fs(pointer.path.to_path_buf(), e,
Backtrace::new()))?;
file.seek(SeekFrom::Start(pointer.offset))
.map_err(|e| ReadError::Fs(pointer.path.to_path_buf(), e,
Backtrace::new()))?;
let bytes = file.read(&mut result[..])
.map_err(|e| ReadError::Fs(pointer.path.to_path_buf(), e,
Backtrace::new()))?;
result.truncate(bytes);
assert_eq!(result.len(), pointer.size);
Ok(result)
}))
}
}
impl Future for FutureBlock {
type Item = Vec<u8>;
type Error = ReadError;
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
self.0.poll()
}
}
impl BlockHint {
pub fn empty() -> BlockHint {
BlockHint {
hint: None,
}
}
}