use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, RwLock};
use bytes::Bytes;
use lru::LruCache;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use tracing::{instrument, trace};
use crate::compress::snappy::{Compressor, Decompressor};
use crate::counters::Counter;
use crate::monitor::Monitor;
use crate::transport::ListDir;
use crate::*;
/// Number of leading hex characters of a block hash used as its subdirectory name.
const SUBDIR_NAME_CHARS: usize = 3;
/// A reference to a span of data within a stored block: the block's hash plus
/// a byte range into the uncompressed block content.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Address {
/// Hash of the block holding the data.
pub hash: BlockHash,
/// Byte offset into the uncompressed block where the span starts.
/// Omitted from the serialized form when zero.
#[serde(default)]
#[serde(skip_serializing_if = "crate::misc::zero_u64")]
pub start: u64,
/// Length of the span in bytes.
pub len: u64,
}
/// A directory of content-addressed, compressed data blocks.
///
/// Blocks are stored under a subdirectory named by a short prefix of their
/// hash (see `block_relpath`). Two in-memory LRU caches reduce transport
/// traffic: one holding decompressed block content, and one recording only
/// that a block is known to exist.
#[derive(Debug)]
pub struct BlockDir {
transport: Arc<dyn Transport>,
/// Cumulative read/cache counters.
pub stats: BlockDirStats,
/// Decompressed block content, keyed by hash.
cache: RwLock<LruCache<BlockHash, Bytes>>,
/// Blocks known to exist; the value carries no information.
exists: RwLock<LruCache<BlockHash, ()>>,
}
/// Return the transport-relative subdirectory name for a block hash:
/// its first `SUBDIR_NAME_CHARS` hex characters.
fn subdir_relpath(block_hash: &str) -> &str {
    let (prefix, _rest) = block_hash.split_at(SUBDIR_NAME_CHARS);
    prefix
}
/// Return the transport-relative path of a block file, e.g. `"abc/abcdef…"`.
pub fn block_relpath(hash: &BlockHash) -> String {
    let hex = hash.to_string();
    let subdir = subdir_relpath(&hex);
    [subdir, hex.as_str()].join("/")
}
impl BlockDir {
/// Open a block directory that already exists on the given transport.
pub fn open(transport: Arc<dyn Transport>) -> BlockDir {
    // Hold up to 100 decompressed blocks, and enough existence entries
    // that their hashes would occupy about 64 MiB.
    const BLOCK_CACHE_SIZE: usize = 100;
    const EXISTENCE_CACHE_SIZE: usize = (64 << 20) / BLAKE_HASH_SIZE_BYTES;
    let content_cache = LruCache::new(BLOCK_CACHE_SIZE.try_into().unwrap());
    let existence_cache = LruCache::new(EXISTENCE_CACHE_SIZE.try_into().unwrap());
    BlockDir {
        transport,
        stats: BlockDirStats::default(),
        cache: RwLock::new(content_cache),
        exists: RwLock::new(existence_cache),
    }
}
pub fn create(transport: Arc<dyn Transport>) -> Result<BlockDir> {
transport.create_dir("")?;
Ok(BlockDir::open(transport))
}
/// Store a block of data unless it is already present, returning its hash.
///
/// If a block with the same hash already exists, the write is skipped and
/// deduplication statistics and counters are incremented instead.
pub(crate) fn store_or_deduplicate(
    &self,
    block_data: Bytes,
    stats: &mut BackupStats,
    monitor: Arc<dyn Monitor>,
) -> Result<BlockHash> {
    let hash = BlockHash::hash_bytes(&block_data);
    let uncomp_len = block_data.len() as u64;
    if self.contains(&hash, monitor.clone())? {
        stats.deduplicated_blocks += 1;
        stats.deduplicated_bytes += uncomp_len;
        monitor.count(Counter::DeduplicatedBlocks, 1);
        monitor.count(Counter::DeduplicatedBlockBytes, block_data.len());
        return Ok(hash);
    }
    let compressed = Compressor::new().compress(&block_data)?;
    monitor.count(Counter::BlockWriteUncompressedBytes, block_data.len());
    let comp_len: u64 = compressed.len().try_into().unwrap();
    let hex_hash = hash.to_string();
    let relpath = block_relpath(&hash);
    // Make sure the hash-prefix subdirectory exists before writing into it.
    self.transport.create_dir(subdir_relpath(&hex_hash))?;
    self.transport.write_file(&relpath, &compressed)?;
    stats.written_blocks += 1;
    stats.uncompressed_bytes += uncomp_len;
    stats.compressed_bytes += comp_len;
    monitor.count(Counter::BlockWrites, 1);
    monitor.count(Counter::BlockWriteCompressedBytes, compressed.len());
    // Remember both the content and the existence of the new block.
    // (Was `push` for the existence cache; `put` is used everywhere else
    // in this file and inserts identically.)
    self.cache
        .write()
        .expect("Lock cache")
        .put(hash.clone(), block_data);
    self.exists
        .write()
        .expect("Lock exists cache")
        .put(hash.clone(), ());
    Ok(hash)
}
/// Return true if this block directory holds a non-empty file for `hash`.
///
/// The in-memory caches are consulted first; only on a miss is the
/// transport asked for file metadata. A zero-length block file counts
/// as not present.
pub fn contains(&self, hash: &BlockHash, monitor: Arc<dyn Monitor>) -> Result<bool> {
    let cached = self.cache.read().expect("Lock cache").contains(hash)
        || self.exists.read().expect("Lock exists cache").contains(hash);
    if cached {
        monitor.count(Counter::BlockExistenceCacheHit, 1);
        self.stats.cache_hit.fetch_add(1, Relaxed);
        return Ok(true);
    }
    monitor.count(Counter::BlockExistenceCacheMiss, 1);
    match self.transport.metadata(&block_relpath(hash)) {
        Ok(metadata) => {
            if metadata.kind == Kind::File && metadata.len > 0 {
                self.exists
                    .write()
                    .expect("Lock exists cache")
                    .put(hash.clone(), ());
                Ok(true)
            } else {
                // Empty or non-file entries are treated as absent.
                Ok(false)
            }
        }
        Err(err) if err.is_not_found() => Ok(false),
        Err(err) => {
            warn!(?err, ?hash, "Error checking presence of block");
            Err(err.into())
        }
    }
}
/// Return the on-disk (compressed) size of a block file.
pub fn compressed_size(&self, hash: &BlockHash) -> Result<u64> {
    let metadata = self.transport.metadata(&block_relpath(hash))?;
    Ok(metadata.len)
}
/// Read the bytes referenced by an address: a sub-span of one block.
///
/// Returns [`Error::BlockTooShort`] if the block does not contain the
/// requested range.
pub fn read_address(&self, address: &Address, monitor: Arc<dyn Monitor>) -> Result<Bytes> {
    let bytes = self.get_block_content(&address.hash, monitor)?;
    // NOTE(review): these casts truncate on 32-bit targets for spans over
    // 4 GiB; the checked_add below at least prevents the wrapped sum from
    // passing the bounds check.
    let len = address.len as usize;
    let start = address.start as usize;
    let actual_len = bytes.len();
    // Previously computed as `start + len`, which could overflow usize and
    // wrap past the bounds check; use checked addition instead.
    let end = match start.checked_add(len) {
        Some(end) if end <= actual_len => end,
        _ => {
            return Err(Error::BlockTooShort {
                hash: address.hash.clone(),
                actual_len,
                referenced_len: len,
            })
        }
    };
    Ok(bytes.slice(start..end))
}
/// Return the decompressed content of a block, verified against its hash.
///
/// Served from the in-memory content cache when possible. On a miss, the
/// compressed block is read from the transport, decompressed, checked
/// against `hash`, and inserted into both the content and existence
/// caches. Returns `Error::BlockCorrupt` if the decompressed bytes do not
/// hash to `hash`.
#[instrument(skip(self, monitor))]
pub fn get_block_content(&self, hash: &BlockHash, monitor: Arc<dyn Monitor>) -> Result<Bytes> {
// A write lock is taken even for a lookup because `LruCache::get`
// mutates the cache to update recency.
if let Some(hit) = self.cache.write().expect("Lock cache").get(hash) {
monitor.count(Counter::BlockContentCacheHit, 1);
self.stats.cache_hit.fetch_add(1, Relaxed);
trace!("Block cache hit");
return Ok(hit.clone());
}
monitor.count(Counter::BlockContentCacheMiss, 1);
let mut decompressor = Decompressor::new();
let block_relpath = block_relpath(hash);
let compressed_bytes = self.transport.read_file(&block_relpath)?;
let decompressed_bytes = decompressor.decompress(&compressed_bytes)?;
// Verify the content actually matches the hash it is addressed by.
let actual_hash = BlockHash::hash_bytes(&decompressed_bytes);
if actual_hash != *hash {
return Err(Error::BlockCorrupt { hash: hash.clone() });
}
// Populate both caches before reporting stats.
self.cache
.write()
.expect("Lock cache")
.put(hash.clone(), decompressed_bytes.clone());
self.exists.write().unwrap().put(hash.clone(), ());
self.stats.read_blocks.fetch_add(1, Relaxed);
monitor.count(Counter::BlockReads, 1);
self.stats
.read_block_compressed_bytes
.fetch_add(compressed_bytes.len(), Relaxed);
monitor.count(Counter::BlockReadCompressedBytes, compressed_bytes.len());
self.stats
.read_block_uncompressed_bytes
.fetch_add(decompressed_bytes.len(), Relaxed);
monitor.count(
Counter::BlockReadUncompressedBytes,
decompressed_bytes.len(),
);
Ok(decompressed_bytes)
}
/// Remove a block from the directory and evict it from both caches.
pub fn delete_block(&self, hash: &BlockHash) -> Result<()> {
    // Evict first so a concurrent lookup cannot see a cached entry for a
    // file that has just been removed.
    self.cache.write().expect("Lock cache").pop(hash);
    self.exists.write().expect("Lock exists cache").pop(hash);
    self.transport.remove_file(&block_relpath(hash))?;
    Ok(())
}
/// List the hash-prefix subdirectories of the block directory, warning
/// about (and skipping) any directory with an unexpected name.
fn subdirs(&self) -> Result<Vec<String>> {
    let ListDir { dirs, .. } = self.transport.list_dir("")?;
    let subdirs = dirs
        .into_iter()
        .filter(|dirname| {
            let expected = dirname.len() == SUBDIR_NAME_CHARS;
            if !expected {
                warn!("Unexpected subdirectory in blockdir: {dirname:?}");
            }
            expected
        })
        .collect();
    Ok(subdirs)
}
/// Return a parallel iterator over the hashes of all blocks present,
/// in arbitrary order.
///
/// Errors listing a subdirectory are reported to the monitor and that
/// subdirectory is skipped. File names that do not parse as block
/// hashes (e.g. temporary files) are silently ignored.
pub fn blocks(
&self,
monitor: Arc<dyn Monitor>,
) -> Result<impl ParallelIterator<Item = BlockHash>> {
let transport = self.transport.clone();
let task = monitor.start_task("List block subdir".to_string());
// Subdirectories are listed eagerly (so errors surface here); their
// contents are listed lazily and in parallel by the returned iterator.
let subdirs = self.subdirs()?;
task.set_total(subdirs.len());
Ok(subdirs
.into_par_iter()
.map(move |subdir_name| {
let r = transport.list_dir(&subdir_name);
task.increment(1);
r
})
.filter_map(move |iter_or| match iter_or {
Err(source) => {
monitor.error(Error::ListBlocks { source });
None
}
Ok(ListDir { files, .. }) => Some(files),
})
.flatten()
// Drop names that don't parse as hashes, e.g. temp files.
.filter_map(|name| name.parse().ok()))
}
/// Read every block in the directory and check that its content matches
/// its hash.
///
/// Returns a map from block hash to decompressed length for every block
/// that reads back correctly. Read and corruption errors are reported to
/// the monitor rather than aborting validation.
pub fn validate(&self, monitor: Arc<dyn Monitor>) -> Result<HashMap<BlockHash, usize>> {
    debug!("Start list blocks");
    // Collect into a set first so each block is checked exactly once.
    let blocks = self
        .blocks(monitor.clone())?
        .collect::<HashSet<BlockHash>>();
    debug!("Check {} blocks", blocks.len());
    let task = monitor.start_task("Validate blocks".to_string());
    task.set_total(blocks.len());
    // `filter_map` expresses "keep the Ok results" more directly than the
    // previous `flat_map` over an `Option`, with identical behavior.
    let block_lens = blocks
        .into_par_iter()
        .filter_map(
            |hash| match self.get_block_content(&hash, monitor.clone()) {
                Ok(bytes) => {
                    task.increment(1);
                    Some((hash, bytes.len()))
                }
                Err(err) => {
                    monitor.error(err);
                    None
                }
            },
        )
        .collect();
    Ok(block_lens)
}
}
/// Cumulative read and cache statistics for a `BlockDir`.
///
/// Counters are updated with relaxed atomics; they are approximate under
/// heavy concurrency but cheap to maintain.
#[derive(Debug, Default)]
pub struct BlockDirStats {
/// Blocks read and hash-verified from the transport.
pub read_blocks: AtomicUsize,
/// Total compressed bytes read from the transport.
pub read_block_compressed_bytes: AtomicUsize,
/// Total bytes after decompression.
pub read_block_uncompressed_bytes: AtomicUsize,
/// Hits in either the content cache or the existence cache.
pub cache_hit: AtomicUsize,
}
#[cfg(test)]
mod test {
use std::fs::{create_dir, write, OpenOptions};
use tempfile::TempDir;
use crate::monitor::test::TestMonitor;
use crate::transport::open_local_transport;
use super::*;
#[test]
fn empty_block_file_counts_as_not_present() {
    // Store a block and confirm it is found (second lookup via cache).
    let tempdir = TempDir::new().unwrap();
    let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
    let mut backup_stats = BackupStats::default();
    let monitor = TestMonitor::arc();
    let hash = blockdir
        .store_or_deduplicate(Bytes::from("stuff"), &mut backup_stats, monitor.clone())
        .unwrap();
    assert_eq!(monitor.get_counter(Counter::BlockWrites), 1);
    assert_eq!(monitor.get_counter(Counter::DeduplicatedBlocks), 0);
    assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheMiss), 1);
    assert!(blockdir.contains(&hash, monitor.clone()).unwrap());
    assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheMiss), 1);
    assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheHit), 1);

    // Reopen so the in-memory caches start empty, then truncate the block
    // file on disk: a zero-length file must not count as present.
    let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
    let monitor = TestMonitor::arc();
    OpenOptions::new()
        .write(true)
        .truncate(true)
        .create(false)
        .open(tempdir.path().join(block_relpath(&hash)))
        .expect("Truncate block");
    assert!(!blockdir.contains(&hash, monitor.clone()).unwrap());
    assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheHit), 0);
    assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheMiss), 1);
}
#[test]
fn temp_files_are_not_returned_as_blocks() {
    let tempdir = TempDir::new().unwrap();
    let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
    let monitor = TestMonitor::arc();
    // Plant a temp-prefixed file inside an otherwise valid subdirectory.
    let subdir = tempdir.path().join(subdir_relpath("123"));
    create_dir(&subdir).unwrap();
    let temp_name = format!("{}{}", TMP_PREFIX, "123123123");
    write(subdir.join(temp_name), b"123").unwrap();
    // Listing blocks must not report the temp file.
    let blocks: Vec<_> = blockdir.blocks(monitor.clone()).unwrap().collect();
    assert_eq!(blocks, []);
}
#[test]
fn cache_hit() {
    let tempdir = TempDir::new().unwrap();
    let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
    let mut backup_stats = BackupStats::default();
    let content = Bytes::from("stuff");
    let hash = blockdir
        .store_or_deduplicate(content.clone(), &mut backup_stats, TestMonitor::arc())
        .unwrap();
    // Storing a block does not itself count as a cache hit.
    assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 0);

    // An existence check right after storing is served from cache.
    let monitor = TestMonitor::arc();
    assert!(blockdir.contains(&hash, monitor.clone()).unwrap());
    assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 1);
    assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheHit), 1);

    // Reading content is served from the content cache, repeatedly.
    let monitor = TestMonitor::arc();
    let retrieved = blockdir.get_block_content(&hash, monitor.clone()).unwrap();
    assert_eq!(content, retrieved);
    assert_eq!(monitor.get_counter(Counter::BlockContentCacheHit), 1);
    assert_eq!(monitor.get_counter(Counter::BlockContentCacheMiss), 0);
    assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 2);

    let retrieved = blockdir.get_block_content(&hash, monitor.clone()).unwrap();
    assert_eq!(monitor.get_counter(Counter::BlockContentCacheHit), 2);
    assert_eq!(monitor.get_counter(Counter::BlockContentCacheMiss), 0);
    assert_eq!(content, retrieved);
    assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 3);
}
#[test]
fn existence_cache_hit() {
    let tempdir = TempDir::new().unwrap();
    let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());
    let mut backup_stats = BackupStats::default();
    let content = Bytes::from("stuff");
    let monitor = TestMonitor::arc();
    let hash = blockdir
        .store_or_deduplicate(content.clone(), &mut backup_stats, monitor.clone())
        .unwrap();

    // Reopen so all in-memory caches start empty.
    let monitor = TestMonitor::arc();
    let blockdir = BlockDir::open(open_local_transport(tempdir.path()).unwrap());

    // The first check must go to the transport; subsequent checks are
    // answered from the existence cache.
    assert!(blockdir.contains(&hash, monitor.clone()).unwrap());
    assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 0);
    assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheHit), 0);
    assert!(blockdir.contains(&hash, monitor.clone()).unwrap());
    assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 1);
    assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheHit), 1);
    assert!(blockdir.contains(&hash, monitor.clone()).unwrap());
    assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 2);
    assert_eq!(monitor.get_counter(Counter::BlockExistenceCacheHit), 2);

    // Existence caching does not populate the content cache: the first
    // content read is a miss.
    let retrieved = blockdir.get_block_content(&hash, monitor.clone()).unwrap();
    assert_eq!(content, retrieved);
    assert_eq!(monitor.get_counter(Counter::BlockContentCacheMiss), 1);
    assert_eq!(monitor.get_counter(Counter::BlockContentCacheHit), 0);
    assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 2);
}
}