use std::fs::{self, File};
use std::str::FromStr;
use std::sync::{
atomic::AtomicU64,
mpsc::{Receiver, SyncSender, sync_channel},
};
use std::thread;
use anyhow::{Context, Result, bail};
use camino::Utf8Path;
use rustc_hash::FxHashSet;
use tracing::*;
use crate::backend;
use crate::blob::Blob;
use crate::hashing::ObjectId;
use crate::index;
use crate::pack;
use crate::upload;
/// Whether a backup run actually persists anything.
pub enum Mode {
    /// Go through the motions without persisting: mapped to
    /// `upload::Mode::DryRun` and a non-resumable index downstream.
    DryRun,
    /// The real thing: mapped to `upload::Mode::LiveFire` and a
    /// resumable index downstream.
    LiveFire,
}
/// Handles to a running backup pipeline, created by [`spawn_backup_threads`].
///
/// Send blobs into the channels, then call [`Backup::join`] to close them
/// and wait for the whole pipeline to drain.
pub struct Backup<'scope, 'env> {
    /// Chunk blobs sent here go to the "chunk packer" thread.
    pub chunk_tx: SyncSender<Blob>,
    /// Tree blobs sent here go to the "tree packer" thread.
    pub tree_tx: SyncSender<Blob>,
    /// `(file name, open file)` pairs sent here go to the "uploader" thread.
    pub upload_tx: SyncSender<(String, File)>,
    /// Counters shared with (and updated by) the worker threads.
    pub statistics: &'env BackupStatistics,
    /// Join handle for the "backup master" thread that owns the workers.
    threads: thread::ScopedJoinHandle<'scope, Result<()>>,
}
/// Running totals for a backup, updated atomically by the worker threads.
///
/// The exact accounting lives in `pack::pack` and `index::index`, which
/// receive references to these counters.
#[derive(Debug, Default)]
pub struct BackupStatistics {
    // Updated by the chunk packer (passed to pack::pack for chunk blobs).
    pub chunk_bytes: AtomicU64,
    // Updated by the tree packer (passed to pack::pack for tree blobs).
    pub tree_bytes: AtomicU64,
    // Shared by both packers — presumably post-compression byte totals;
    // confirm against pack::pack.
    pub compressed_bytes: AtomicU64,
    // Updated by the indexer (passed to index::index).
    pub indexed_packs: AtomicU64,
}
impl Backup<'_, '_> {
pub fn join(self) -> Result<()> {
drop(self.chunk_tx);
drop(self.tree_tx);
drop(self.upload_tx);
self.threads.join().unwrap()?;
match fs::remove_file(index::WIP_NAME) {
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
otherwise => otherwise,
}
.with_context(|| format!("Couldn't remove {}", index::WIP_NAME))?;
Ok(())
}
}
/// Spawns the backup pipeline as scoped threads and returns handles to it.
///
/// The returned [`Backup`] owns the sending halves of the chunk, tree,
/// and upload channels; drop them (via [`Backup::join`]) to shut the
/// pipeline down.
pub fn spawn_backup_threads<'scope, 'env>(
    s: &'scope thread::Scope<'scope, 'env>,
    mode: Mode,
    backend_config: &'env backend::Configuration,
    cached_backend: &'env backend::CachedBackend,
    starting_index: index::Index,
    statistics: &'env BackupStatistics,
) -> Backup<'scope, 'env> {
    // Rendezvous channels (capacity 0): senders block until a worker is
    // ready, keeping memory usage bounded.
    let (chunk_tx, chunk_rx) = sync_channel(0);
    let (tree_tx, tree_rx) = sync_channel(0);
    let (upload_tx, upload_rx) = sync_channel(0);

    // The master thread gets its own clone of the upload sender so the
    // workers it spawns can also feed the uploader.
    let master_upload_tx = upload_tx.clone();
    let threads = thread::Builder::new()
        .name(String::from("backup master"))
        .spawn_scoped(s, move || {
            backup_master_thread(
                mode,
                chunk_rx,
                tree_rx,
                master_upload_tx,
                upload_rx,
                backend_config,
                cached_backend,
                statistics,
                starting_index,
            )
        })
        .unwrap();

    Backup {
        chunk_tx,
        tree_tx,
        upload_tx,
        statistics,
        threads,
    }
}
/// Runs the backup pipeline: two packers (chunks and trees), an indexer,
/// and an uploader, each on its own scoped thread.
///
/// Joins every worker before returning, logging each worker's error and
/// bailing if any of them failed.
#[expect(clippy::too_many_arguments)]
fn backup_master_thread<'env>(
    mode: Mode,
    chunk_rx: Receiver<Blob>,
    tree_rx: Receiver<Blob>,
    upload_tx: SyncSender<(String, File)>,
    upload_rx: Receiver<(String, File)>,
    backend_config: &'env backend::Configuration,
    cached_backend: &'env backend::CachedBackend,
    statistics: &'env BackupStatistics,
    starting_index: index::Index,
) -> Result<()> {
    // Both packers report finished packs to the single indexer through
    // clones of one sender.
    let (chunk_index_tx, index_rx) = sync_channel(0);
    let tree_index_tx = chunk_index_tx.clone();

    // The packers and the indexer each get their own handle to the
    // uploader; all three must drop for the upload channel to close.
    let chunk_pack_upload_tx = upload_tx;
    let tree_pack_upload_tx = chunk_pack_upload_tx.clone();
    let index_upload_tx = chunk_pack_upload_tx.clone();

    // Borrow the counters individually so each `move` closure only
    // captures the references it needs.
    let pack_size = backend_config.pack_size;
    let chunk_bytes = &statistics.chunk_bytes;
    let tree_bytes = &statistics.tree_bytes;
    let comp_bytes = &statistics.compressed_bytes;
    let indexed_packs = &statistics.indexed_packs;

    // Decide up front how the indexer and uploader should treat this run.
    let (resumable, upload_mode) = match mode {
        Mode::LiveFire => (index::Resumable::Yes, upload::Mode::LiveFire),
        Mode::DryRun => (index::Resumable::No, upload::Mode::DryRun),
    };

    thread::scope(|s| {
        let chunk_packer = thread::Builder::new()
            .name(String::from("chunk packer"))
            .spawn_scoped(s, move || {
                pack::pack(
                    pack_size,
                    chunk_rx,
                    chunk_index_tx,
                    chunk_pack_upload_tx,
                    chunk_bytes,
                    comp_bytes,
                )
            })
            .unwrap();

        let tree_packer = thread::Builder::new()
            .name(String::from("tree packer"))
            .spawn_scoped(s, move || {
                pack::pack(
                    pack_size,
                    tree_rx,
                    tree_index_tx,
                    tree_pack_upload_tx,
                    tree_bytes,
                    comp_bytes,
                )
            })
            .unwrap();

        let indexer = thread::Builder::new()
            .name(String::from("indexer"))
            .spawn_scoped(s, move || {
                index::index(
                    resumable,
                    starting_index,
                    index_rx,
                    index_upload_tx,
                    indexed_packs,
                )
            })
            .unwrap();

        let uploader = thread::Builder::new()
            .name(String::from("uploader"))
            .spawn_scoped(s, move || {
                upload::upload(upload_mode, cached_backend, upload_rx)
            })
            .unwrap();

        // Join everything and gather failures instead of bailing on the
        // first one — we want every worker's error in the log.
        let mut errors: Vec<anyhow::Error> = Vec::new();
        if let Err(e) = chunk_packer.join().unwrap() {
            errors.push(e.context("Packing chunks failed"));
        }
        if let Err(e) = tree_packer.join().unwrap() {
            errors.push(e.context("Packing trees failed"));
        }
        if let Err(e) = indexer.join().unwrap() {
            errors.push(e.context("Indexing failed"));
        }
        if let Err(e) = uploader.join().unwrap() {
            errors.push(e.context("Uploading failed"));
        }

        if errors.is_empty() {
            Ok(())
        } else {
            for e in errors {
                error!("{:?}", e);
            }
            bail!("backup failed");
        }
    })
}
/// State recovered from an interrupted backup, found by [`find_resumable`].
#[derive(Default)]
pub struct ResumableBackup {
    /// The work-in-progress index left behind by the previous run.
    pub wip_index: index::Index,
    /// Packfiles from that index still sitting in the working directory,
    /// i.e. written but not yet uploaded.
    pub cwd_packfiles: Vec<ObjectId>,
}
/// Looks for an interrupted backup to resume.
///
/// Returns `Ok(None)` if no work-in-progress index exists. If one does,
/// every packfile it mentions must either be in the working directory or
/// already in the backend; otherwise this bails.
pub fn find_resumable(backend: &backend::CachedBackend) -> Result<Option<ResumableBackup>> {
    let Some(wip_index) = index::read_wip()? else {
        trace!("No WIP index file found, nothing to resume");
        return Ok(None);
    };
    info!("WIP index file found, resuming where we left off...");

    debug!("Looking for packfiles that haven't been uploaded...");
    let cwd_packfiles = find_cwd_packfiles(&wip_index)?;

    // Anything in the WIP index that *isn't* in the working directory
    // must already have made it to the backend.
    let mut missing_packfiles: FxHashSet<ObjectId> = wip_index.packs.keys().copied().collect();
    for p in &cwd_packfiles {
        // find_cwd_packfiles only returns IDs from the index, so this
        // can't fail.
        assert!(missing_packfiles.remove(p));
    }

    if !missing_packfiles.is_empty() {
        debug!("Checking backend for other packfiles in the index...");
        let packs = backend.list_packs()?;
        // Probe every pack (logging each failure) before giving up.
        let mut errs = false;
        for p in &missing_packfiles {
            match backend::probe_pack(&packs, p) {
                Ok(_) => trace!("Found pack {p}"),
                Err(e) => {
                    error!("{e}");
                    errs = true;
                }
            }
        }
        if errs {
            bail!("WIP index file references packfiles not backed up or in the working directory.");
        }
    }

    Ok(Some(ResumableBackup {
        wip_index,
        cwd_packfiles,
    }))
}
/// Scans the current directory for `<id>.pack` files that appear in the
/// given index, returning their IDs.
///
/// Pack-like files that aren't in the index are logged and ignored.
fn find_cwd_packfiles(index: &index::Index) -> Result<Vec<ObjectId>> {
    let cwd = std::env::current_dir()?;
    let cwd: &Utf8Path = TryFrom::try_from(cwd.as_path())
        .with_context(|| format!("current directory {} isn't UTF-8", cwd.display()))?;

    let mut packfiles = Vec::new();
    for entry in cwd.read_dir_utf8()? {
        let entry = entry?;
        // Only consider regular files named exactly `<stem>.pack`.
        // (Check the cheap name tokens before stat-ing the file type.)
        let name_tokens: Vec<_> = entry.file_name().split('.').collect();
        let is_pack_file =
            name_tokens.len() == 2 && name_tokens[1] == "pack" && entry.file_type()?.is_file();
        if !is_pack_file {
            continue;
        }
        match ObjectId::from_str(name_tokens[0]) {
            Ok(id) if index.packs.contains_key(&id) => {
                trace!("Found {} in the WIP index", entry.file_name());
                packfiles.push(id);
            }
            Ok(_) => warn!(
                "Found {} but it isn't in the WIP index. Ignoring",
                entry.file_name()
            ),
            // A stem that isn't a valid object ID isn't one of ours.
            Err(_) => {}
        }
    }
    Ok(packfiles)
}
/// Queues packfiles left in the working directory by a previous,
/// interrupted run for upload.
///
/// Each ID must correspond to a file named `<id>.pack` in the current
/// directory.
///
/// `SyncSender::send` takes `&self`, so a shared borrow suffices here;
/// the previous `&mut` needlessly demanded exclusive access
/// (clippy: `needless_pass_by_ref_mut`). `&mut SyncSender` arguments
/// still coerce, so existing callers are unaffected.
///
/// # Errors
/// Fails if a packfile can't be opened, or if the uploader hung up
/// (its receiving end was dropped).
pub fn upload_cwd_packfiles(up: &SyncSender<(String, File)>, packs: &[ObjectId]) -> Result<()> {
    for p in packs {
        let name = format!("{p}.pack");
        let fd = File::open(&name).with_context(|| format!("Couldn't open {name}"))?;
        // Blocks until the uploader accepts the file (rendezvous channel).
        up.send((name, fd))
            .context("uploader channel exited early")?;
    }
    Ok(())
}