use std::collections::{BTreeMap, BTreeSet};
use std::thread;
use anyhow::Result;
use camino::Utf8Path;
use clap::Parser;
use rayon::prelude::*;
use rustc_hash::FxHashSet;
use tracing::*;
use crate::backend;
use crate::backup;
use crate::config::Configuration;
use crate::file_util::nice_size;
use crate::hashing::ObjectId;
use crate::index;
use crate::pack;
use crate::read;
use crate::repack;
use crate::snapshot;
use crate::tree;
#[derive(Debug, Parser)]
#[command(verbatim_doc_comment)]
pub struct Args {
#[clap(short = 'n', long)]
dry_run: bool,
}
/// Runs the `prune` command: keeps packs whose blobs are all still reachable,
/// rewrites the live parts of sparsely-used packs into new packs, drops packs
/// with no live blobs, and replaces the repository's indexes to match.
///
/// With `--dry-run`, everything is walked and reported but nothing is uploaded
/// or deleted.
pub fn run(config: &Configuration, repository: &Utf8Path, args: Args) -> Result<()> {
    let (backend_config, cached_backend) = backend::open(
        repository,
        config.cache_size,
        backend::CacheBehavior::Normal,
    )?;
    // Gather what we need to decide which packs are still in use:
    // the master index, a blob -> pack lookup, and every snapshot's forest.
    let index = index::build_master_index(&cached_backend)?;
    let blob_map = index::blob_to_pack_map(&index)?;
    let snapshots = snapshot::load_chronologically(&cached_backend)?;
    let snapshots_and_forests = repack::load_forests(
        snapshots,
        &mut tree::Cache::new(&index, &blob_map, &cached_backend),
    )?;
    // Every blob reachable from any snapshot must be preserved.
    let reachable_blobs = reachable_blobs(snapshots_and_forests.par_iter().map(|s| &s.forest));
    // Packs that are entirely reachable are reused as-is; the rest are
    // pruning candidates...
    let (reusable_packs, packs_to_prune) = partition_reusable_packs(&index, &reachable_blobs);
    // ...split into packs with no live blobs (dropped outright) and packs
    // with some live blobs (their live parts get repacked).
    let (droppable_packs, sparse_packs) =
        partition_droppable_packs(&packs_to_prune, &reachable_blobs);
    drop(reachable_blobs); // No longer needed; this set can be large.
    let reusable_size = packs_blob_size(reusable_packs.values());
    if packs_to_prune.is_empty() {
        println!("All {reusable_size} in use! Nothing to do.");
        return Ok(());
    }
    // Every index currently in the backend will be superseded by the one we write.
    let superseded = cached_backend
        .list_indexes()?
        .iter()
        .map(|(idx, _idx_len)| idx)
        .map(backend::id_from_path)
        .collect::<Result<BTreeSet<ObjectId>>>()?;
    // Formats IDs as "id1, id2, ..." for the log lines below.
    fn idlist<S: ToString, I: Iterator<Item = S>>(p: I) -> String {
        p.map(|id| id.to_string()).collect::<Vec<_>>().join(", ")
    }
    trace!(
        "Packs [{}] are entirely in use",
        idlist(reusable_packs.keys())
    );
    debug!("Packs [{}] could be repacked", idlist(sparse_packs.keys()));
    debug!("Packs [{}] can be dropped", idlist(droppable_packs.keys()));
    debug!("Indexes [{}] could be replaced", idlist(superseded.iter()));
    println!(
        "Keep {} packs ({reusable_size}), rewrite {} ({}), drop {} ({}), and replace the {} current indexes",
        reusable_packs.len(),
        sparse_packs.len(),
        packs_blob_size(sparse_packs.values()),
        droppable_packs.len(),
        packs_blob_size(droppable_packs.values()),
        superseded.len()
    );
    // These were only needed for the report above.
    drop(sparse_packs);
    drop(droppable_packs);
    // Seed the new index with owned copies of the packs we keep verbatim.
    let reusable_packs: BTreeMap<ObjectId, pack::PackManifest> = reusable_packs
        .into_iter()
        .map(|(id, manifest)| (*id, manifest.clone()))
        .collect();
    let mut new_index = index::Index {
        packs: reusable_packs,
        supersedes: superseded.clone(),
    };
    // Blobs already accounted for in the new index; the snapshot walk below
    // skips these instead of repacking them again.
    let mut packed_blobs = index::blob_id_set(&new_index)?;
    // If a previous prune was interrupted, try to pick up where it left off.
    let maybe_resumable = backup::find_resumable(&cached_backend)?;
    let mut packs_to_upload = vec![];
    if let Some(backup::ResumableBackup {
        wip_index,
        cwd_packfiles,
    }) = maybe_resumable
    {
        // Only resume if the WIP index is consistent with what we'd build now;
        // otherwise fall through and start from scratch.
        if wip_index.supersedes != new_index.supersedes {
            warn!("WIP index file supersedes a different set of indexes. Starting over.");
        }
        else if !wip_index
            .packs
            .keys()
            .collect::<FxHashSet<_>>()
            .is_superset(&new_index.packs.keys().collect())
        {
            warn!("WIP index file isn't a superset of reusable packs. Starting over.");
        } else {
            // Resume: everything the WIP index already packed counts as done,
            // and its packfiles (still sitting in the CWD) get uploaded first.
            for manifest in wip_index.packs.values() {
                for entry in manifest {
                    packed_blobs.insert(entry.id);
                }
            }
            packs_to_upload = cwd_packfiles;
            new_index = wip_index;
        }
    }
    let bmode = if args.dry_run {
        backup::Mode::DryRun
    } else {
        backup::Mode::LiveFire
    };
    let back_stats = backup::BackupStatistics::default();
    let walk_stats = repack::WalkStatistics::default();
    // Scoped threads let the backup workers and the progress UI borrow the
    // locals above without 'static bounds.
    thread::scope(|s| -> Result<()> {
        let mut backup = backup::spawn_backup_threads(
            s,
            bmode,
            &backend_config,
            &cached_backend,
            new_index,
            &back_stats,
        );
        let progress_thread = repack::ui::ProgressThread::spawn(
            s,
            &back_stats,
            &walk_stats,
            &cached_backend.bytes_downloaded,
            &cached_backend.bytes_uploaded,
        );
        // Run the real work in a closure so the progress thread is always
        // joined, even when something below bails out early with `?`.
        let run_res = (|| {
            if !args.dry_run {
                // Upload packfiles left over from an interrupted run first.
                backup::upload_cwd_packfiles(&mut backup.upload_tx, &packs_to_upload)?;
            }
            drop(packs_to_upload);
            // Walk every snapshot, reading live blobs (from the sparse packs)
            // and feeding any not yet in `packed_blobs` to the backup threads.
            let mut reader = read::ChunkReader::new(&cached_backend, &index, &blob_map);
            let filter = |_p: &Utf8Path| true;
            repack::walk_snapshots(
                repack::Op::Prune,
                &snapshots_and_forests,
                filter,
                &mut reader,
                &mut packed_blobs,
                &mut backup,
                &walk_stats,
            )?;
            backup.join()?;
            Ok(())
        })();
        progress_thread.join();
        run_res
    })?;
    // Only after the new packs and index are safely uploaded is it safe to
    // delete what they replace.
    if !args.dry_run {
        info!("Prune complete, removing old indexes");
        for old_index in &superseded {
            cached_backend.remove_index(old_index)?;
        }
        for old_pack in packs_to_prune.keys() {
            cached_backend.remove_pack(old_pack)?;
        }
    } else {
        info!("Prune complete");
    }
    Ok(())
}
/// Collects the IDs of every blob referenced by any of the given forests.
///
/// Each forest is scanned in parallel; the per-forest sets are then merged
/// into a single set (empty if the iterator yields no forests).
fn reachable_blobs<'a, I: ParallelIterator<Item = &'a tree::Forest>>(
    forests: I,
) -> FxHashSet<ObjectId> {
    forests
        .map(blobs_in_forest)
        .reduce_with(|mut merged, next| {
            merged.extend(next);
            merged
        })
        .unwrap_or_default()
}
/// Returns the set of blobs a single forest references:
/// each tree's own ID plus the chunks of every tree in the forest.
fn blobs_in_forest(forest: &tree::Forest) -> FxHashSet<ObjectId> {
    forest
        .iter()
        .fold(FxHashSet::default(), |mut reachable, (tree_id, tree)| {
            reachable.insert(*tree_id);
            reachable.extend(tree::chunks_in_tree(tree));
            reachable
        })
}
#[allow(clippy::type_complexity)]
/// Splits the index's packs into (entirely reachable, candidates for pruning).
///
/// A pack lands in the first map only if *every* blob it holds is still
/// reachable from some snapshot; all other packs land in the second.
fn partition_reusable_packs<'a>(
    index: &'a index::Index,
    reachable_blobs: &FxHashSet<ObjectId>,
) -> (
    BTreeMap<&'a ObjectId, &'a pack::PackManifest>,
    BTreeMap<&'a ObjectId, &'a pack::PackManifest>,
) {
    let entirely_live = |manifest: &pack::PackManifest| {
        manifest
            .iter()
            .all(|entry| reachable_blobs.contains(&entry.id))
    };
    index
        .packs
        .iter()
        .partition(|&(_pack_id, manifest)| entirely_live(manifest))
}
#[allow(clippy::type_complexity)]
/// Splits the pruning candidates into (droppable, sparse).
///
/// A pack is droppable when *none* of its blobs are reachable anymore;
/// a sparse pack still holds some live blobs and must be rewritten.
fn partition_droppable_packs<'a>(
    packs_to_prune: &BTreeMap<&'a ObjectId, &'a pack::PackManifest>,
    reachable_blobs: &FxHashSet<ObjectId>,
) -> (
    BTreeMap<&'a ObjectId, &'a pack::PackManifest>,
    BTreeMap<&'a ObjectId, &'a pack::PackManifest>,
) {
    packs_to_prune.iter().partition(|&(_pack_id, manifest)| {
        manifest
            .iter()
            .all(|entry| !reachable_blobs.contains(&entry.id))
    })
}
/// Sums the blob lengths across all the given pack manifests and renders the
/// total as a human-readable size string.
fn packs_blob_size<'a, 'b: 'a, I: Iterator<Item = &'a &'b pack::PackManifest>>(
    manifests: I,
) -> String {
    let total_bytes: u64 = manifests
        .flat_map(|manifest| manifest.iter())
        .map(|entry| entry.length as u64)
        .sum();
    nice_size(total_bytes)
}