//! Make a backup by walking a source tree and copying its contents into an
//! archive, deduplicating file content against previously stored blocks.

use std::fmt;
use std::io::prelude::*;
use std::mem::take;
use std::path::Path;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::time::{Duration, Instant};
use bytes::BytesMut;
use derive_more::{Add, AddAssign};
use itertools::Itertools;
use tracing::{trace, warn};
use crate::blockdir::Address;
use crate::change::Change;
use crate::counters::Counter;
use crate::io::read_with_retries;
use crate::monitor::Monitor;
use crate::stats::{write_compressed_size, write_count, write_duration, write_size};
use crate::stitch::IterStitchedIndexHunks;
use crate::*;
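/// Configuration of how to make a backup.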
pub struct BackupOptions<'cb> {
    /// Exclude entries matching these globs from the backup.
    pub exclude: Exclude,
    /// Maximum number of entries to store per index hunk.
    pub max_entries_per_hunk: usize,
    /// Called with each entry's change as it is stored.
    pub change_callback: Option<ChangeCallback<'cb>>,
    /// Maximum uncompressed size of a data block.
    pub max_block_size: usize,
    /// Files of this size or smaller are combined with other small files
    /// into shared blocks.
    pub small_file_cap: u64,
    /// Store file ownership (user and group); if false, ownership is cleared
    /// from each entry before it is stored.
    pub owner: bool,
}
impl Default for BackupOptions<'_> {
fn default() -> BackupOptions<'static> {
BackupOptions {
exclude: Exclude::nothing(),
max_entries_per_hunk: 100_000,
change_callback: None,
max_block_size: 20 << 20,
small_file_cap: 1 << 20,
owner: true,
}
}
}
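/// Back up a source tree into a new band in the archive, returning statistics
/// about what was stored.
///
/// An illustrative call site; the `archive` and `monitor` values are assumed
/// to have been constructed elsewhere:
///
/// ```ignore
/// let stats = backup(
///     &archive,
///     Path::new("/home/user/docs"),
///     &BackupOptions::default(),
///     monitor.clone(),
/// )?;
/// println!("{stats}");
/// ```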
pub fn backup(
archive: &Archive,
source_path: &Path,
options: &BackupOptions,
monitor: Arc<dyn Monitor>,
) -> Result<BackupStats> {
let start = Instant::now();
let mut writer = BackupWriter::begin(archive, options, monitor.clone())?;
let mut stats = BackupStats::default();
let source_tree = LiveTree::open(source_path)?;
let task = monitor.start_task("Backup".to_string());
let entry_iter =
source_tree.iter_entries(Apath::root(), options.exclude.clone(), monitor.clone())?;
for entry_group in entry_iter.chunks(options.max_entries_per_hunk).into_iter() {
for mut entry in entry_group {
if !options.owner {
entry.owner.clear();
}
match writer.copy_entry(&entry, &source_tree, options, monitor.clone()) {
Err(err) => {
monitor.error(err);
stats.errors += 1;
continue;
}
Ok(Some(entry_change)) => {
match entry_change.change {
Change::Changed { .. } => monitor.count(Counter::EntriesChanged, 1),
Change::Added { .. } => monitor.count(Counter::EntriesAdded, 1),
Change::Unchanged { .. } => monitor.count(Counter::EntriesUnchanged, 1),
Change::Deleted { .. } => monitor.count(Counter::EntriesDeleted, 1),
}
if let Some(cb) = &options.change_callback {
cb(&entry_change)?;
}
}
                Ok(None) => {}
}
task.set_name(format!("Backup {}", entry.apath()));
}
writer.flush_group(monitor.clone())?;
}
stats += writer.finish(monitor.clone())?;
stats.elapsed = start.elapsed();
let block_stats = &archive.block_dir.stats;
stats.read_blocks = block_stats.read_blocks.load(Relaxed);
stats.read_blocks_compressed_bytes = block_stats.read_block_compressed_bytes.load(Relaxed);
stats.read_blocks_uncompressed_bytes = block_stats.read_block_uncompressed_bytes.load(Relaxed);
Ok(stats)
}
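/// Writes entries from the source tree into a new band, deduplicating file
/// content against the basis (previous) index.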
struct BackupWriter {
band: Band,
index_builder: IndexWriter,
stats: BackupStats,
block_dir: Arc<BlockDir>,
    basis_index: crate::index::IndexEntryIter<IterStitchedIndexHunks>,
file_combiner: FileCombiner,
}
impl BackupWriter {
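    /// Begin writing a new band on the archive, using the last band (if any)
    /// as the deduplication basis. Refuses to start while a garbage
    /// collection lock is held.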
pub fn begin(
archive: &Archive,
options: &BackupOptions,
monitor: Arc<dyn Monitor>,
) -> Result<Self> {
if gc_lock::GarbageCollectionLock::is_locked(archive)? {
return Err(Error::GarbageCollectionLockHeld);
}
let basis_index = if let Some(basis_band_id) = archive.last_band_id()? {
IterStitchedIndexHunks::new(archive, basis_band_id, monitor)
} else {
IterStitchedIndexHunks::empty(archive, monitor)
}
.iter_entries(Apath::root(), Exclude::nothing());
let band = Band::create(archive)?;
let index_builder = band.index_builder();
Ok(BackupWriter {
band,
index_builder,
block_dir: archive.block_dir.clone(),
stats: BackupStats::default(),
basis_index,
file_combiner: FileCombiner::new(archive.block_dir.clone(), options.max_block_size),
})
}
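    /// Write the final index hunks, close the band, and return the
    /// accumulated statistics.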
fn finish(self, monitor: Arc<dyn Monitor>) -> Result<BackupStats> {
let hunks = self.index_builder.finish(monitor)?;
self.band.close(hunks as u64)?;
        Ok(self.stats)
}
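    /// Flush any buffered small files and finish the current index hunk.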
fn flush_group(&mut self, monitor: Arc<dyn Monitor>) -> Result<()> {
let (stats, mut entries) = self.file_combiner.drain(monitor.clone())?;
self.stats += stats;
self.index_builder.append_entries(&mut entries);
self.index_builder.finish_hunk(monitor)
}
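    /// Copy one entry from the source tree into the archive, dispatching on
    /// its kind.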
fn copy_entry(
&mut self,
entry: &EntryValue,
source: &LiveTree,
options: &BackupOptions,
monitor: Arc<dyn Monitor>,
) -> Result<Option<EntryChange>> {
match entry.kind() {
Kind::Dir => self.copy_dir(entry, monitor.as_ref()),
Kind::File => self.copy_file(entry, source, options, monitor.clone()),
Kind::Symlink => self.copy_symlink(entry, monitor.as_ref()),
Kind::Unknown => {
self.stats.unknown_kind += 1;
Ok(None)
}
}
}
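    /// Record a directory entry; directories have no content to store.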
fn copy_dir(
&mut self,
source_entry: &EntryValue,
monitor: &dyn Monitor,
) -> Result<Option<EntryChange>> {
monitor.count(Counter::Dirs, 1);
self.stats.directories += 1;
self.index_builder
.push_entry(IndexEntry::metadata_from(source_entry));
        Ok(None)
    }
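    /// Copy a file, deduplicating against the basis index: if the entry looks
    /// unchanged and all of its referenced blocks are still present, the old
    /// addresses are reused without rereading the file content.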
fn copy_file(
&mut self,
source_entry: &EntryValue,
from_tree: &LiveTree,
options: &BackupOptions,
monitor: Arc<dyn Monitor>,
) -> Result<Option<EntryChange>> {
self.stats.files += 1;
monitor.count(Counter::Files, 1);
let apath = source_entry.apath();
let result = if let Some(basis_entry) = self.basis_index.advance_to(apath) {
if content_heuristically_unchanged(source_entry, &basis_entry) {
if all_blocks_present(&basis_entry.addrs, &self.block_dir, &monitor) {
self.stats.unmodified_files += 1;
let new_entry = IndexEntry {
addrs: basis_entry.addrs.clone(),
..IndexEntry::metadata_from(source_entry)
};
let change = if new_entry == basis_entry {
EntryChange::unchanged(&basis_entry)
} else {
trace!(%apath, "Content same, metadata changed");
EntryChange::changed(&basis_entry, source_entry)
};
self.index_builder.push_entry(new_entry);
return Ok(Some(change));
} else {
                    warn!(
                        %apath,
                        "Some referenced blocks are missing or truncated; file will be stored again"
                    );
self.stats.modified_files += 1;
self.stats.replaced_damaged_blocks += 1;
Some(EntryChange::changed(&basis_entry, source_entry))
}
} else {
self.stats.modified_files += 1;
Some(EntryChange::changed(&basis_entry, source_entry))
}
} else {
self.stats.new_files += 1;
Some(EntryChange::added(source_entry))
};
let size = source_entry.size().expect("source entry has a size");
if size == 0 {
self.index_builder
.push_entry(IndexEntry::metadata_from(source_entry));
self.stats.empty_files += 1;
monitor.count(Counter::EmptyFiles, 1);
} else {
let mut source_file = from_tree.open_file(source_entry)?;
if size <= options.small_file_cap {
self.file_combiner
.push_file(source_entry, &mut source_file, monitor.clone())?;
monitor.count(Counter::SmallFiles, 1);
} else {
let addrs = store_file_content(
apath,
&mut source_file,
&self.block_dir,
&mut self.stats,
options.max_block_size,
monitor.clone(),
)?;
self.index_builder.push_entry(IndexEntry {
addrs,
..IndexEntry::metadata_from(source_entry)
});
}
}
Ok(result)
}
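    /// Record a symlink entry, including its target, in the index.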
fn copy_symlink(
&mut self,
source_entry: &EntryValue,
monitor: &dyn Monitor,
) -> Result<Option<EntryChange>> {
        monitor.count(Counter::Symlinks, 1);
        self.stats.symlinks += 1;
        assert!(
            source_entry.symlink_target().is_some(),
            "symlink entry must have a target"
        );
self.index_builder
.push_entry(IndexEntry::metadata_from(source_entry));
Ok(None)
}
}
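/// True if the block directory contains every distinct block referenced by
/// these addresses.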
fn all_blocks_present(
addresses: &[Address],
block_dir: &BlockDir,
monitor: &Arc<dyn Monitor>,
) -> bool {
addresses
.iter()
.map(|addr| &addr.hash)
.unique()
.all(|hash| block_dir.contains(hash, monitor.clone()).unwrap_or(false))
}
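/// Split a file's content into blocks of at most `max_block_size` bytes,
/// store or deduplicate each block, and return the addresses of the stored
/// content.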
fn store_file_content(
apath: &Apath,
from_file: &mut dyn Read,
block_dir: &BlockDir,
stats: &mut BackupStats,
max_block_size: usize,
monitor: Arc<dyn Monitor>,
) -> Result<Vec<Address>> {
let mut addresses = Vec::<Address>::with_capacity(1);
loop {
let buffer = read_with_retries(max_block_size, from_file).map_err(|source| {
Error::ReadSourceFile {
path: apath.to_string().into(),
source,
}
})?;
if buffer.is_empty() {
break;
}
let buffer = buffer.freeze();
monitor.count(Counter::FileBytes, buffer.len());
let len = buffer.len() as u64;
let hash = block_dir.store_or_deduplicate(buffer, stats, monitor.clone())?;
addresses.push(Address {
hash,
start: 0,
len,
});
}
match addresses.len() {
0 => {
monitor.count(Counter::EmptyFiles, 1);
stats.empty_files += 1;
}
1 => {
monitor.count(Counter::SingleBlockFiles, 1);
stats.single_block_files += 1
}
_ => {
monitor.count(Counter::MultiBlockFiles, 1);
stats.multi_block_files += 1
}
}
Ok(addresses)
}
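/// Combines multiple small files into a single block.
///
/// Each file's index entry is queued until the block is flushed, because the
/// address can only be assigned once the combined block has been hashed.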
struct FileCombiner {
buf: BytesMut,
queue: Vec<QueuedFile>,
finished: Vec<IndexEntry>,
stats: BackupStats,
block_dir: Arc<BlockDir>,
max_block_size: usize,
}
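/// A file waiting to be written into a combined block: its bytes occupy
/// `start..start + len` of the combiner's buffer, and its index entry does
/// not yet have an address.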
struct QueuedFile {
start: usize,
len: usize,
entry: IndexEntry,
}
impl FileCombiner {
fn new(block_dir: Arc<BlockDir>, max_block_size: usize) -> FileCombiner {
FileCombiner {
block_dir,
buf: BytesMut::new(),
queue: Vec::new(),
finished: Vec::new(),
stats: BackupStats::default(),
max_block_size,
}
}
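    /// Flush any pending block and return the accumulated stats and finished
    /// index entries, leaving the combiner empty.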
fn drain(&mut self, monitor: Arc<dyn Monitor>) -> Result<(BackupStats, Vec<IndexEntry>)> {
self.flush(monitor)?;
debug_assert!(self.queue.is_empty());
debug_assert!(self.buf.is_empty());
        Ok((take(&mut self.stats), take(&mut self.finished)))
}
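    /// Write the buffered bytes as one combined block and attach an address
    /// within that block to each queued entry.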
fn flush(&mut self, monitor: Arc<dyn Monitor>) -> Result<()> {
if self.queue.is_empty() {
debug_assert!(self.buf.is_empty());
return Ok(());
}
let hash = self.block_dir.store_or_deduplicate(
take(&mut self.buf).freeze(),
&mut self.stats,
monitor,
)?;
self.stats.combined_blocks += 1;
self.finished
.extend(self.queue.drain(..).map(|qf| IndexEntry {
addrs: vec![Address {
hash: hash.clone(),
start: qf.start.try_into().unwrap(),
len: qf.len.try_into().unwrap(),
}],
..qf.entry
}));
Ok(())
}
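    /// Append a small file's content to the buffer and queue its index entry,
    /// flushing a combined block once the buffer reaches `max_block_size`.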
fn push_file(
&mut self,
entry: &EntryValue,
from_file: &mut dyn Read,
monitor: Arc<dyn Monitor>,
) -> Result<()> {
let start = self.buf.len();
let expected_len: usize = entry
.size()
.expect("small file has no length")
.try_into()
.unwrap();
let index_entry = IndexEntry::metadata_from(entry);
if expected_len == 0 {
self.stats.empty_files += 1;
self.finished.push(index_entry);
return Ok(());
}
self.buf.resize(start + expected_len, 0);
        // A single `read` may return fewer bytes than requested, so loop
        // until EOF or until the buffer (sized to the expected length) fills.
        let mut len = 0;
        while start + len < self.buf.len() {
            let n = from_file
                .read(&mut self.buf[start + len..])
                .map_err(|source| Error::ReadSourceFile {
                    path: entry.apath.to_string().into(),
                    source,
                })?;
            if n == 0 {
                break; // EOF before the expected length: the file shrank.
            }
            len += n;
        }
        self.buf.truncate(start + len);
if len == 0 {
self.stats.empty_files += 1;
self.finished.push(index_entry);
return Ok(());
}
self.stats.small_combined_files += 1;
self.queue.push(QueuedFile {
start,
len,
entry: index_entry,
});
if self.buf.len() >= self.max_block_size {
self.flush(monitor)
} else {
Ok(())
}
}
}
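/// Heuristic for whether a file's content is unchanged from the basis entry:
/// same kind, same mtime, and same size. Edits that preserve all three will
/// not be detected, which is why this is only a heuristic.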
fn content_heuristically_unchanged<E: EntryTrait, O: EntryTrait>(
new_entry: &E,
basis_entry: &O,
) -> bool {
basis_entry.kind() == new_entry.kind()
&& basis_entry.mtime() == new_entry.mtime()
&& basis_entry.size() == new_entry.size()
}
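/// Counters and sizes accumulated while making a backup.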
#[derive(Add, AddAssign, Debug, Default, Eq, PartialEq, Clone)]
pub struct BackupStats {
pub files: usize,
pub symlinks: usize,
pub directories: usize,
pub unknown_kind: usize,
pub unmodified_files: usize,
pub modified_files: usize,
pub new_files: usize,
pub replaced_damaged_blocks: usize,
pub deduplicated_bytes: u64,
pub uncompressed_bytes: u64,
pub compressed_bytes: u64,
pub deduplicated_blocks: usize,
pub written_blocks: usize,
pub combined_blocks: usize,
pub empty_files: usize,
pub small_combined_files: usize,
pub single_block_files: usize,
pub multi_block_files: usize,
pub errors: usize,
pub elapsed: Duration,
pub read_blocks: usize,
pub read_blocks_uncompressed_bytes: usize,
pub read_blocks_compressed_bytes: usize,
}
impl fmt::Display for BackupStats {
fn fmt(&self, w: &mut fmt::Formatter<'_>) -> fmt::Result {
write_count(w, "files:", self.files);
write_count(w, " unmodified files", self.unmodified_files);
write_count(w, " modified files", self.modified_files);
write_count(w, " new files", self.new_files);
write_count(w, "symlinks", self.symlinks);
write_count(w, "directories", self.directories);
write_count(w, "unsupported file kind", self.unknown_kind);
writeln!(w).unwrap();
write_count(w, "files stored:", self.new_files + self.modified_files);
write_count(w, " empty files", self.empty_files);
write_count(w, " small combined files", self.small_combined_files);
write_count(w, " single block files", self.single_block_files);
write_count(w, " multi-block files", self.multi_block_files);
writeln!(w).unwrap();
write_count(w, "data blocks deduplicated:", self.deduplicated_blocks);
write_size(w, " saved", self.deduplicated_bytes);
writeln!(w).unwrap();
write_count(w, "new data blocks written:", self.written_blocks);
write_count(w, " blocks of combined files", self.combined_blocks);
write_compressed_size(w, self.compressed_bytes, self.uncompressed_bytes);
writeln!(w).unwrap();
write_count(w, "blocks read", self.read_blocks);
write_size(
w,
" uncompressed",
self.read_blocks_uncompressed_bytes as u64,
);
write_size(w, " compressed", self.read_blocks_compressed_bytes as u64);
writeln!(w).unwrap();
write_count(w, "errors", self.errors);
write_duration(w, "elapsed", self.elapsed)?;
Ok(())
}
}