use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::io;
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use blake2_rfc::blake2b;
use blake2_rfc::blake2b::Blake2b;
use nutmeg::models::UnboundedModel;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use thousands::Separable;
use crate::blockhash::BlockHash;
use crate::compress::snappy::{Compressor, Decompressor};
use crate::kind::Kind;
use crate::stats::{BackupStats, Sizes, ValidateStats};
use crate::transport::local::LocalTransport;
use crate::transport::{DirEntry, ListDirNames, Transport};
use crate::*;
/// Length of a block file's name: the block's hash rendered as hex,
/// two characters per hash byte.
const BLOCKDIR_FILE_NAME_LEN: usize = crate::BLAKE_HASH_SIZE_BYTES * 2;
/// Number of leading hex-hash characters used to name the fan-out
/// subdirectory that holds a block file.
const SUBDIR_NAME_CHARS: usize = 3;
/// The address of a range of bytes inside the decompressed content of one
/// stored block.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Address {
/// Hash identifying the block that contains the data.
pub hash: BlockHash,
/// Byte offset of the range within the decompressed block.
/// Zero is the common case and is omitted when serializing.
#[serde(default)]
#[serde(skip_serializing_if = "crate::misc::zero_u64")]
pub start: u64,
/// Length of the addressed range, in bytes.
pub len: u64,
}
/// A content-addressed store of compressed blocks.
///
/// Each block file is named by the hex hash of its decompressed content and
/// lives in a fan-out subdirectory named by the first few characters of that
/// hash (see `block_relpath`).
#[derive(Clone, Debug)]
pub struct BlockDir {
// Storage backend; the `Arc` makes `BlockDir` cheap to clone.
transport: Arc<dyn Transport>,
}
/// Returns the name of the fan-out subdirectory for a block: the first
/// `SUBDIR_NAME_CHARS` characters of its hex hash.
///
/// Panics (like the equivalent slice) if `block_hash` is shorter than
/// `SUBDIR_NAME_CHARS`.
fn subdir_relpath(block_hash: &str) -> &str {
    block_hash.split_at(SUBDIR_NAME_CHARS).0
}
fn block_relpath(hash: &BlockHash) -> String {
let hash_hex = hash.to_string();
format!("{}/{}", subdir_relpath(&hash_hex), hash_hex)
}
impl BlockDir {
/// Opens an existing blockdir on the local filesystem at `path`.
pub fn open_path(path: &Path) -> BlockDir {
BlockDir::open(Box::new(LocalTransport::new(path)))
}
/// Opens a blockdir reached through an arbitrary `Transport`.
///
/// Does not check that the directory actually exists.
pub fn open(transport: Box<dyn Transport>) -> BlockDir {
BlockDir {
transport: Arc::from(transport),
}
}
/// Creates a new blockdir at `path` on the local filesystem.
pub fn create_path(path: &Path) -> Result<BlockDir> {
BlockDir::create(Box::new(LocalTransport::new(path)))
}
/// Creates the blockdir's root directory on `transport`.
///
/// Returns `Error::CreateBlockDir` if the directory cannot be created.
pub fn create(transport: Box<dyn Transport>) -> Result<BlockDir> {
transport
.create_dir("")
.map_err(|source| Error::CreateBlockDir { source })?;
Ok(BlockDir {
transport: Arc::from(transport),
})
}
/// Compresses `in_buf` and writes it as the block file for `hash`,
/// returning the compressed length in bytes.
///
/// The caller guarantees that `hash` is the hash of `in_buf`; it is not
/// recomputed here.
pub(crate) fn compress_and_store(&mut self, in_buf: &[u8], hash: &BlockHash) -> Result<u64> {
let mut compressor = Compressor::new();
let compressed = compressor.compress(in_buf)?;
let comp_len: u64 = compressed.len().try_into().unwrap();
let hex_hash = hash.to_string();
let relpath = block_relpath(hash);
self.transport.create_dir(subdir_relpath(&hex_hash))?;
self.transport
.write_file(&relpath, compressed)
.or_else(|io_err| {
// If the file already exists, its content must match (the store is
// content-addressed), so report it and treat the write as a success —
// presumably this covers a race with another writer of the same block.
if io_err.kind() == io::ErrorKind::AlreadyExists {
ui::problem(&format!(
"Unexpected late detection of existing block {hex_hash:?}"
));
Ok(())
} else {
Err(Error::WriteBlock {
hash: hex_hash,
source: io_err,
})
}
})?;
Ok(comp_len)
}
/// Stores `block_data`, or skips the write when a block with the same hash
/// already exists; updates `stats` either way and returns the hash.
pub(crate) fn store_or_deduplicate(
&mut self,
block_data: &[u8],
stats: &mut BackupStats,
) -> Result<BlockHash> {
let hash = self.hash_bytes(block_data);
if self.contains(&hash)? {
stats.deduplicated_blocks += 1;
stats.deduplicated_bytes += block_data.len() as u64;
} else {
let comp_len = self.compress_and_store(block_data, &hash)?;
stats.written_blocks += 1;
stats.uncompressed_bytes += block_data.len() as u64;
stats.compressed_bytes += comp_len;
}
Ok(hash)
}
/// True if a block file for `hash` exists. Does not read or verify it.
pub fn contains(&self, hash: &BlockHash) -> Result<bool> {
self.transport
.is_file(&block_relpath(hash))
.map_err(Error::from)
}
/// Returns the on-disk (compressed) size of the block file for `hash`.
pub fn compressed_size(&self, hash: &BlockHash) -> Result<u64> {
Ok(self.transport.metadata(&block_relpath(hash))?.len)
}
/// Reads the byte range `[start, start+len)` described by `address` out of
/// its decompressed block.
///
/// Returns `Error::AddressTooLong` when the range runs past the end of the
/// decompressed content, plus any error from reading/verifying the block.
pub fn get(&self, address: &Address) -> Result<(Vec<u8>, Sizes)> {
let (mut decompressed, sizes) = self.get_block_content(&address.hash)?;
let len = address.len as usize;
let start = address.start as usize;
let actual_len = decompressed.len();
// NOTE(review): `start + len` could overflow usize on hostile input;
// presumably addresses come from trusted index data — confirm.
if (start + len) > actual_len {
return Err(Error::AddressTooLong {
address: address.to_owned(),
actual_len,
});
}
if start != 0 {
let trimmed = decompressed[start..(start + len)].to_owned();
Ok((trimmed, sizes))
} else {
// Common prefix/whole-block case: truncate in place, no extra copy.
decompressed.truncate(len);
Ok((decompressed, sizes))
}
}
/// Deletes the block file for `hash`.
pub fn delete_block(&self, hash: &BlockHash) -> Result<()> {
self.transport
.remove_file(&block_relpath(hash))
.map_err(Error::from)
}
/// Lists the fan-out subdirectory names, warning about and skipping any
/// entry whose name is not exactly `SUBDIR_NAME_CHARS` long.
fn subdirs(&self) -> Result<Vec<String>> {
let ListDirNames { mut dirs, .. } = self.transport.list_dir_names("")?;
dirs.retain(|dirname| {
if dirname.len() == SUBDIR_NAME_CHARS {
true
} else {
ui::problem(&format!("Unexpected subdirectory in blockdir: {dirname:?}"));
false
}
});
Ok(dirs)
}
/// Iterates the directory entries of plausible block files in all
/// subdirectories, skipping temporary files and oddly named entries.
///
/// Listing errors on individual subdirectories or entries are reported to
/// the UI and skipped rather than propagated.
fn iter_block_dir_entries(&self) -> Result<impl Iterator<Item = DirEntry>> {
let transport = self.transport.clone();
Ok(self
.subdirs()?
.into_iter()
.map(move |subdir_name| transport.iter_dir_entries(&subdir_name))
.filter_map(|iter_or| {
// Failure to open one subdirectory listing: report and continue.
if let Err(ref err) = iter_or {
ui::problem(&format!("Error listing block directory: {:?}", &err));
}
iter_or.ok()
})
.flatten()
.filter_map(|iter_or| {
// Failure on one directory entry: likewise report and continue.
if let Err(ref err) = iter_or {
ui::problem(&format!("Error listing block subdirectory: {:?}", &err));
}
iter_or.ok()
})
.filter(|DirEntry { name, kind, .. }| {
// Keep only files named like a full hex hash, excluding leftover
// temporary files.
*kind == Kind::File
&& name.len() == BLOCKDIR_FILE_NAME_LEN
&& !name.starts_with(TMP_PREFIX)
}))
}
/// Iterates the hashes of all blocks present, silently dropping names that
/// fail to parse as hashes.
pub fn block_names(&self) -> Result<impl Iterator<Item = BlockHash>> {
// NOTE(review): this view is updated once with a no-op and never
// incremented afterwards — possibly dead, or kept only to show the
// "List blocks" banner; confirm intent.
let progress = nutmeg::View::new("List blocks", ui::nutmeg_options());
progress.update(|_| ());
Ok(self
.iter_block_dir_entries()?
.filter_map(|de| de.name.parse().ok()))
}
/// Collects the hashes of all blocks present into a set, showing an
/// unbounded progress count while listing.
pub fn block_names_set(&self) -> Result<HashSet<BlockHash>> {
let progress = nutmeg::View::new(UnboundedModel::new("List blocks"), ui::nutmeg_options());
Ok(self
.iter_block_dir_entries()?
.filter_map(|de| de.name.parse().ok())
.inspect(|_| progress.update(|model| model.increment(1)))
.collect())
}
/// Reads and hash-verifies every block in parallel, returning a map from
/// block hash to decompressed length.
///
/// Blocks that fail to read or verify are counted in
/// `stats.block_error_count` instead of aborting validation.
pub fn validate(&self, stats: &mut ValidateStats) -> Result<HashMap<BlockHash, usize>> {
ui::println("Count blocks...");
let blocks = self.block_names_set()?;
crate::ui::println(&format!(
"Check {} blocks...",
blocks.len().separate_with_commas()
));
stats.block_read_count = blocks.len().try_into().unwrap();
// Progress model shared (via the view) by the parallel workers below.
struct ProgressModel {
total_blocks: usize,
blocks_done: usize,
bytes_done: usize,
start: Instant,
}
impl nutmeg::Model for ProgressModel {
fn render(&mut self, _width: usize) -> String {
format!(
"Check block {}/{}: {} done, {} MB checked, {} remaining",
self.blocks_done,
self.total_blocks,
nutmeg::percent_done(self.blocks_done, self.total_blocks),
self.bytes_done / 1_000_000,
nutmeg::estimate_remaining(&self.start, self.blocks_done, self.total_blocks)
)
}
}
let progress_bar = nutmeg::View::new(
ProgressModel {
total_blocks: blocks.len(),
blocks_done: 0,
bytes_done: 0,
start: Instant::now(),
},
ui::nutmeg_options(),
);
// Check blocks in parallel; a failed block maps to `None`.
let results: Vec<Option<(BlockHash, usize)>> = blocks
.into_par_iter()
.map(|hash| {
let r = self
.get_block_content(&hash)
.map(|(bytes, _sizes)| (hash, bytes.len()))
.ok();
let bytes = r.as_ref().map(|x| x.1).unwrap_or_default();
progress_bar.update(|model| {
model.blocks_done += 1;
model.bytes_done += bytes
});
r
})
.collect();
stats.block_error_count += results.iter().filter(|o| o.is_none()).count();
let len_map: HashMap<BlockHash, usize> = results
.into_iter()
.flatten() .collect();
Ok(len_map)
}
/// Reads, decompresses, and hash-verifies the block for `hash`, returning
/// the decompressed bytes together with compressed/uncompressed sizes.
///
/// Returns `Error::ReadBlock` if the file cannot be read and
/// `Error::BlockCorrupt` if the decompressed content does not hash back to
/// `hash`.
pub fn get_block_content(&self, hash: &BlockHash) -> Result<(Vec<u8>, Sizes)> {
let mut decompressor = Decompressor::new();
let block_relpath = block_relpath(hash);
let compressed_bytes =
self.transport
.read_file(&block_relpath)
.map_err(|source| Error::ReadBlock {
source,
hash: hash.to_string(),
})?;
let decompressed_bytes = decompressor.decompress(&compressed_bytes)?;
// Recompute the hash of the decompressed content to detect corruption.
let actual_hash = BlockHash::from(blake2b::blake2b(
BLAKE_HASH_SIZE_BYTES,
&[],
decompressed_bytes,
));
if actual_hash != *hash {
ui::problem(&format!(
"Block file {:?} has actual decompressed hash {}",
&block_relpath, actual_hash
));
return Err(Error::BlockCorrupt {
hash: hash.to_string(),
actual_hash: actual_hash.to_string(),
});
}
let sizes = Sizes {
uncompressed: decompressed_bytes.len() as u64,
compressed: compressed_bytes.len() as u64,
};
// `take_buffer` hands back the decompressor's owned buffer — presumably
// the same decompressed content without another copy; confirm.
Ok((decompressor.take_buffer(), sizes))
}
/// Computes the BLAKE2b hash of `in_buf` at the blockdir's hash width.
fn hash_bytes(&self, in_buf: &[u8]) -> BlockHash {
let mut hasher = Blake2b::new(BLAKE_HASH_SIZE_BYTES);
hasher.update(in_buf);
BlockHash::from(hasher.finalize())
}
}