use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::thread;
use anyhow::{Result, bail};
use clap::Parser;
use console::Term;
use rayon::prelude::*;
use rustc_hash::{FxHashMap, FxHashSet};
use tracing::*;
use crate::backend;
use crate::config::Configuration;
use crate::hashing::ObjectId;
use crate::index;
use crate::pack;
use crate::progress::{ProgressThread, print_download_line, spinner};
use crate::snapshot;
use crate::tree;
// CLI arguments for the `check` command.
//
// NOTE(review): `///` doc comments on this struct or its fields would become
// clap help text (because of `verbatim_doc_comment`), so commentary is kept
// in plain `//` comments to leave the CLI output unchanged.
#[derive(Debug, Parser)]
#[clap(verbatim_doc_comment)]
pub struct Args {
    // When set, download every pack and verify its contents against the
    // manifest, instead of only checking that indexed packs exist.
    #[clap(short, long)]
    read_packs: bool,
}
/// Shared progress counters for the parallel `--read-packs` verification.
///
/// The `*_total` fields are fixed before workers start; the `Atomic*` fields
/// are incremented by rayon workers and read by the progress-printing thread.
#[derive(Default)]
pub struct ReadStatus {
    // Total number of packs in the master index (set up front).
    packs_total: u32,
    // Packs verified (or failed) so far.
    packs_read: AtomicU32,
    // Total number of blobs across all pack manifests (set up front).
    blobs_total: u64,
    // Blobs verified so far; incremented as pack verification progresses.
    blobs_read: AtomicU64,
}
/// Verifies repository integrity: packs, indexes, and snapshot reachability.
///
/// With `--read-packs`, every pack is downloaded and verified blob-by-blob
/// against its manifest (in parallel via rayon, with a progress thread).
/// Otherwise only pack *presence* in the backend's pack list is checked.
/// Either way, we then warn about packs missing from all indexes and error
/// on any chunk a snapshot references that no indexed pack contains.
///
/// # Errors
/// Returns an error if any broken packs or missing chunks were found, or if
/// reading the backend/index/snapshots fails.
pub fn run(config: &Configuration, repository: &camino::Utf8Path, args: Args) -> Result<()> {
    // Set once any integrity problem is found; we keep going so the user
    // sees *all* problems before we bail at the very end.
    let mut trouble = false;
    // NOTE(review): `AlwaysRead` presumably bypasses cached copies so we
    // verify what's actually stored in the backend — confirm its semantics.
    let (_cfg, cached_backend) = backend::open(
        repository,
        config.cache_size,
        backend::CacheBehavior::AlwaysRead,
    )?;
    let index = index::build_master_index(&cached_backend)?;
    info!("Downloading pack list");
    let all_packs = cached_backend.list_packs()?;
    // Incremented from multiple rayon workers, hence atomic.
    let borked_packs = AtomicU32::new(0);
    if args.read_packs {
        // Totals are fixed before any worker starts; the atomic counters
        // start at zero via Default.
        let stats = ReadStatus {
            packs_total: index.packs.len() as u32,
            blobs_total: index
                .packs
                .values()
                .map(|manifest| manifest.len() as u64)
                .sum(),
            ..Default::default()
        };
        // Scoped threads let the progress thread borrow `stats` and
        // `cached_backend` without requiring 'static lifetimes or Arc.
        thread::scope(|s| -> Result<()> {
            let progress = ProgressThread::spawn(s, |i| {
                print_progress(i, &Term::stdout(), &stats, &cached_backend.bytes_downloaded)
            });
            // Verify packs in parallel. Failures are logged and counted
            // rather than aborting, so every pack gets inspected.
            index.packs.par_iter().for_each(|(pack_id, manifest)| {
                match check_pack(&cached_backend, pack_id, manifest, &stats.blobs_read) {
                    Ok(()) => debug!("Pack {pack_id} verified"),
                    Err(e) => {
                        error!("Pack {pack_id}: {e:?}");
                        borked_packs.fetch_add(1, Ordering::Relaxed);
                    }
                };
                // Count the pack as processed whether or not it verified.
                stats.packs_read.fetch_add(1, Ordering::Relaxed);
            });
            progress.join();
            Ok(())
        })?;
    } else {
        // Cheap mode: only confirm each indexed pack exists in the backend.
        info!("Checking that all indexed packs are present");
        for pack_id in index.packs.keys() {
            match backend::probe_pack(&all_packs, pack_id) {
                Ok(()) => debug!("Pack {} found", pack_id),
                Err(e) => {
                    error!("{e:?}"); borked_packs.fetch_add(1, Ordering::Relaxed);
                }
            }
        }
    }
    // All workers have joined by now, so any ordering would observe the
    // final count.
    let borked_packs = borked_packs.load(Ordering::SeqCst);
    if borked_packs != 0 {
        error!("{} broken packs", borked_packs);
        trouble = true;
    }
    // Unindexed packs are only a warning (see warn_on_unreachable_packs),
    // not a failure.
    info!("Checking for unreachable packs (not listed in indexes)");
    warn_on_unreachable_packs(&index, &all_packs)?;
    info!("Checking that all chunks in snapshots are reachable");
    let blob_map = index::blob_to_pack_map(&index)?;
    // Walk every snapshot's tree and record which snapshots need each chunk.
    let chunks_to_snapshots = map_chunks_to_snapshots(
        &cached_backend,
        &mut tree::Cache::new(&index, &blob_map, &cached_backend),
    )?;
    // A chunk referenced by a snapshot but absent from the blob -> pack map
    // means that snapshot's data can't be restored.
    let mut missing_chunks: usize = 0;
    for (chunk, snapshots) in &chunks_to_snapshots {
        if !blob_map.contains_key(chunk) {
            error!(
                "Chunk {} is unreachable! (Used by snapshots {})",
                chunk,
                snapshots
                    .iter()
                    .map(|id| id.short_name())
                    .collect::<Vec<String>>()
                    .join(", ")
            );
            missing_chunks += 1;
        }
    }
    if missing_chunks > 0 {
        error!("{} missing chunks", missing_chunks);
        trouble = true;
    }
    if trouble {
        bail!("Check failed!");
    } else {
        info!("Check complete");
        Ok(())
    }
}
/// Downloads the given pack and verifies each of its blobs against the
/// manifest.
///
/// `blobs_read` is incremented as verification progresses so the progress
/// thread can report completion.
///
/// # Errors
/// Returns an error if the pack can't be fetched or any blob fails to verify.
fn check_pack(
    cached_backend: &backend::CachedBackend,
    pack_id: &ObjectId,
    manifest: &[pack::PackManifestEntry],
    blobs_read: &AtomicU64,
) -> Result<()> {
    // Fetch the pack, then walk it against the manifest.
    let mut reader = cached_backend.read_pack(pack_id)?;
    pack::verify(&mut reader, manifest, blobs_read)?;
    Ok(())
}
/// Warns about packs that exist in the backend but aren't listed in any index.
///
/// These are a warning rather than an error because an in-flight `backup`
/// legitimately uploads packs before writing the index that lists them.
///
/// Returns the total size in bytes of all packs in the backend.
///
/// # Errors
/// Returns an error if any pack's ID can't be parsed from its path.
pub fn warn_on_unreachable_packs(index: &index::Index, all_packs: &[(String, u64)]) -> Result<u64> {
    // Sum the sizes in a dedicated pass instead of mutating an accumulator
    // from inside a `.map()` closure (side effects in map are a clippy smell
    // and easy to miss when reading the chain).
    let total_pack_size: u64 = all_packs.iter().map(|(_, pack_len)| pack_len).sum();
    let pack_ids = all_packs
        .iter()
        .map(|(pack, _)| backend::id_from_path(pack))
        .collect::<Result<Vec<_>>>()?;
    let mut unlisted_packs: usize = 0;
    for pack_id in pack_ids {
        if !index.packs.contains_key(&pack_id) {
            warn!("Pack {pack_id} not listed in any index");
            unlisted_packs += 1;
        }
    }
    if unlisted_packs > 0 {
        warn!(
            "{unlisted_packs} {} unreachable. Consider running `rebuild-index` if you aren't running `backup` right now.",
            if unlisted_packs == 1 {
                "pack is"
            } else {
                "packs are"
            }
        );
    }
    Ok(total_pack_size)
}
/// Builds a reverse map from each chunk ID to the set of snapshots that
/// reference it, by walking every tree reachable from each snapshot's root.
///
/// # Errors
/// Returns an error if the snapshot list, a snapshot, or its tree forest
/// can't be loaded.
fn map_chunks_to_snapshots(
    cached_backend: &backend::CachedBackend,
    tree_cache: &mut tree::Cache,
) -> Result<FxHashMap<ObjectId, FxHashSet<ObjectId>>> {
    let mut reverse_map: FxHashMap<ObjectId, FxHashSet<ObjectId>> = FxHashMap::default();
    for (snapshot_path, _snapshot_len) in cached_backend.list_snapshots()? {
        let snapshot_id = backend::id_from_path(&snapshot_path)?;
        let snapshot = snapshot::load(&snapshot_id, cached_backend)?;
        // Every tree in the forest hanging off this snapshot's root.
        let forest = tree::forest_from_root(&snapshot.tree, tree_cache)?;
        for tree in forest.values() {
            for chunk in tree::chunks_in_tree(tree) {
                // Record that this snapshot needs the chunk.
                reverse_map.entry(chunk).or_default().insert(snapshot_id);
            }
        }
    }
    Ok(reverse_map)
}
/// Redraws the two-line progress display: pack/blob counts plus the running
/// download total.
///
/// `i` is the tick count from `ProgressThread`; on every tick after the
/// first, the two lines printed by the previous tick are cleared first.
///
/// # Errors
/// Returns an error if the terminal lines can't be cleared.
fn print_progress(i: usize, term: &Term, stats: &ReadStatus, down: &AtomicU64) -> Result<()> {
    if i > 0 {
        // Overwrite the two lines from the previous tick.
        term.clear_last_lines(2)?;
    }
    let s = spinner(i);
    let p = stats.packs_read.load(Ordering::Relaxed);
    let tp = stats.packs_total;
    let b = stats.blobs_read.load(Ordering::Relaxed);
    let tb = stats.blobs_total;
    // Guard the 0/0 case: a repository with no blobs would otherwise print
    // "NaN%"; report it as fully done instead.
    let perc = if tb == 0 {
        100.0
    } else {
        b as f64 / tb as f64 * 100.0
    };
    println!("{s} {p}/{tp} packs | {b}/{tb} blobs ({perc:.0}%)");
    let db = down.load(Ordering::Relaxed);
    print_download_line(db);
    Ok(())
}