use std::collections::BTreeSet;
use std::sync::{atomic::AtomicU64, mpsc::sync_channel};
use std::thread;
use anyhow::{Context, Result, ensure};
use clap::Parser;
use rayon::prelude::*;
use tracing::*;
use crate::backend;
use crate::config::Configuration;
use crate::hashing::ObjectId;
use crate::index;
use crate::pack;
use crate::upload;
#[derive(Debug, Parser)]
#[command(verbatim_doc_comment)]
pub struct Args {
#[clap(short = 'n', long)]
dry_run: bool,
}
pub fn run(config: &Configuration, repository: &camino::Utf8Path, args: Args) -> Result<()> {
let (_cfg, cached_backend) = backend::open(
repository,
config.cache_size,
backend::CacheBehavior::Normal,
)?;
let superseded = cached_backend
.list_indexes()?
.iter()
.map(|(idx, _idx_len)| idx)
.map(backend::id_from_path)
.collect::<Result<BTreeSet<ObjectId>>>()?;
let replacing = index::Index {
supersedes: superseded.clone(),
..Default::default()
};
let (pack_tx, pack_rx) = sync_channel(num_cpus::get_physical());
let (upload_tx, upload_rx) = sync_channel(0);
let indexed_packs = AtomicU64::new(0); let indexer = thread::spawn(move || {
index::index(
index::Resumable::No,
replacing,
pack_rx,
upload_tx,
&indexed_packs,
)
});
info!("Reading all packs to build a new index");
cached_backend
.list_packs()?
.par_iter()
.try_for_each_with::<_, _, Result<()>>(pack_tx, |pack_tx, (pack_file, _pack_len)| {
let id = backend::id_from_path(pack_file)?;
let manifest = pack::load_manifest(&id, &cached_backend)?;
let metadata = pack::PackMetadata { id, manifest };
pack_tx
.send(metadata)
.context("Pack thread closed unexpectedly")?;
Ok(())
})?;
let umode = if args.dry_run {
upload::Mode::DryRun
} else {
upload::Mode::LiveFire
};
upload::upload(umode, &cached_backend, upload_rx)?;
ensure!(indexer.join().unwrap()?, "No new index built");
if !args.dry_run {
info!("Uploaded a new index; removing previous ones");
for old_index in superseded {
cached_backend.remove_index(&old_index)?;
}
}
Ok(())
}