use std::io::IsTerminal;
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{Context, Result};
use clap::{Args, CommandFactory, Parser, Subcommand};
use crate::progress::{should_render, use_color, ColorChoice, ProgressReporter};
use snapdir_catalog::{
ancestors_json_line, locations_json_line, revisions_json_line, Catalog, SystemClock,
};
use snapdir_core::{
cache, expand_excludes, snapshot_id, walk_with_meter, Blake3Hasher, Blake3KeyedHasher,
ExcludeMatcher, ExpandedExclude, FollowMode, Hasher, Manifest, ManifestEntry, Md5Hasher, Meter,
PathMode, PathType, Phase, Sha256Hasher, Store, WalkOptions,
};
use snapdir_stores::{
resolve_adapter, Adapter, B2Store, ExternalStore, FileStore, GcsStore, S3Store, StreamStore,
TransferAdaptivePolicy, TransferConfig,
};
/// Hard upper bound on the adaptive-concurrency ceiling: `transfer_config` clamps the
/// ceiling it derives from `--max-jobs` / `--jobs` / the automatic default into
/// `1..=ADAPTIVE_CEILING_CAP`.
const ADAPTIVE_CEILING_CAP: usize = 64;
// Top-level command line: the shared options in `GlobalArgs` (every one of them is
// `global = true`, so they may appear before or after the subcommand) plus exactly one
// subcommand.
// NOTE: plain `//` comments are used on purpose on clap-derived items — clap's derive
// turns `///` doc comments into `--help` text, so adding them would change CLI output.
#[derive(Debug, Parser)]
#[command(
name = "snapdir",
bin_name = "snapdir",
version,
propagate_version = true,
about = "Content-addressable directory snapshots.",
long_about = None
)]
pub struct Cli {
// Options shared by every subcommand.
#[command(flatten)]
pub globals: GlobalArgs,
// The subcommand to execute; dispatched by `Cli::run`.
#[command(subcommand)]
pub command: Command,
}
// Options accepted by every subcommand (`global = true` on each field).
// `//` comments only: `///` would become clap `--help` text.
#[derive(Debug, Args)]
#[allow(clippy::struct_excessive_bools)]
pub struct GlobalArgs {
// Explicit cache directory; when absent `Cli::cache_dir` derives one from
// XDG_CACHE_HOME / HOME. Also fed to the exclude expansion in `build_manifest`.
#[arg(long, global = true, value_name = "DIR", env = "SNAPDIR_CACHE_DIR")]
pub cache_dir: Option<PathBuf>,
// Catalog name or path. A value containing a path separator is used as the DB path
// verbatim; otherwise it becomes `<cache_dir>/<name>-catalog.redb` (see
// `catalog_db_path`). Empty or absent disables catalog event logging.
#[arg(long, global = true, value_name = "NAME", env = "SNAPDIR_CATALOG")]
pub catalog: Option<String>,
// Remote store URL; its scheme picks the adapter via `resolve_adapter`.
#[arg(long, global = true, value_name = "URI")]
pub store: Option<String>,
// Snapshot id for fetch/checkout/pull/verify/sync/ancestors. For `push` without a
// path it selects the staged snapshot to upload.
#[arg(long, global = true, value_name = "ID")]
pub id: Option<String>,
// Exclude patterns; repeatable and comma-separated. Expanded by `combine_excludes`.
#[arg(
long,
global = true,
value_name = "PATTERN",
action = clap::ArgAction::Append,
value_delimiter = ','
)]
pub exclude: Vec<String>,
// NOTE(review): not read anywhere in this file's visible code — confirm where (or
// whether) `--paths` is consumed.
#[arg(
long,
global = true,
value_name = "PATTERN",
action = clap::ArgAction::Append,
value_delimiter = ','
)]
pub paths: Vec<String>,
// NOTE(review): `--linked`, `--force` and `--keep` are not read in this file's visible
// code — presumably kept for command-line compatibility; confirm.
#[arg(long, global = true)]
pub linked: bool,
#[arg(long, global = true)]
pub force: bool,
// Only honoured by `verify-cache` (removes corrupt objects); `verify` rejects it.
#[arg(long, global = true)]
pub purge: bool,
#[arg(long, global = true)]
pub keep: bool,
// Report what would happen without writing to stores, the cache, or the catalog.
#[arg(long, global = true)]
pub dryrun: bool,
// Extra diagnostics on stderr (transfer settings, CACHED/SAVED lines, purged objects).
#[arg(long, global = true)]
pub verbose: bool,
// NOTE(review): not read in this file's visible code — confirm.
#[arg(long, global = true)]
pub debug: bool,
// Disables the progress display (see `start_progress`).
#[arg(long, global = true, env = "SNAPDIR_NO_PROGRESS")]
pub no_progress: bool,
// Disables progress and suppresses most informational stderr messages.
#[arg(long, short = 'q', global = true)]
pub quiet: bool,
// Color mode for the progress display, interpreted by `ColorChoice::parse`.
#[arg(long, global = true, value_name = "WHEN", default_value = "auto")]
pub color: String,
// Catalog location filter for `ancestors`; required by `revisions`, which falls back
// to `--store` when it is absent.
#[arg(long, global = true, value_name = "DIR|STORE")]
pub location: Option<String>,
// Transfer concurrency; absent or 0 selects `TransferConfig::default()`'s value.
#[arg(
long,
short = 'j',
global = true,
value_name = "N",
env = "SNAPDIR_JOBS"
)]
pub jobs: Option<usize>,
// Bandwidth cap such as `10M` or `512KiB`, parsed by `parse_rate` into bytes/second.
#[arg(long, global = true, value_name = "RATE", env = "SNAPDIR_LIMIT_RATE")]
pub limit_rate: Option<String>,
// Enables adaptive concurrency targeting this fraction of capacity, in (0.0, 1.0].
// A bare `--adaptive` means 0.8; an explicit value must use `--adaptive=<F>`.
#[arg(
long,
global = true,
value_name = "FRACTION",
num_args = 0..=1,
require_equals = true,
default_missing_value = "0.8",
env = "SNAPDIR_ADAPTIVE",
value_parser = parse_adaptive_fraction
)]
pub adaptive: Option<f64>,
// Ceiling for adaptive concurrency. When absent, `--jobs` (if > 0) or the automatic
// default is used. Always clamped to 1..=ADAPTIVE_CEILING_CAP.
#[arg(long, global = true, value_name = "N", env = "SNAPDIR_MAX_JOBS")]
pub max_jobs: Option<usize>,
}
// Subcommands; each variant is dispatched by `Cli::run`.
// `//` comments only: `///` would become clap help text for the subcommand.
#[derive(Debug, Subcommand)]
pub enum Command {
// Print the manifest of `path` (default: current directory) and log a `manifest`
// catalog event.
Manifest {
// Emit absolute paths (PathMode::Absolute) instead of relative ones.
#[arg(long)]
absolute: bool,
// Do not follow symlinks while walking.
#[arg(long)]
no_follow: bool,
// Checksum algorithm: b3sum (default), md5sum or sha256sum.
#[arg(long, value_name = "NAME")]
checksum_bin: Option<String>,
// When non-empty, replaces (rather than extends) the global `--exclude` list.
#[arg(
long,
value_name = "PATTERN",
action = clap::ArgAction::Append,
value_delimiter = ','
)]
exclude: Vec<String>,
path: Option<PathBuf>,
},
// Print the snapshot id of `path` (default: current directory).
Id {
path: Option<PathBuf>,
},
// Hash `dir` and copy its objects into the local cache.
Stage {
dir: Option<PathBuf>,
},
// Upload `path` (or, with `--id` and no path, a staged snapshot) to `--store`.
Push {
path: Option<PathBuf>,
},
// Download snapshot `--id` from `--store` into the local cache.
Fetch,
// `fetch`, then `checkout` into `path`.
Pull {
path: Option<PathBuf>,
},
// Materialize cached snapshot `--id` into `dir` and restore its permissions.
Checkout {
dir: Option<PathBuf>,
},
// Download every object of `--id` from `--store` into a throwaway directory
// (presumably `fetch_files` performs the integrity check — confirm).
Verify,
// Re-hash cached objects; with `--purge`, remove the corrupt ones.
VerifyCache,
// Remove the local cache.
FlushCache,
// Catalog queries, printed one JSON line per record.
Locations,
Ancestors,
Revisions,
// Print effective defaults and SNAPDIR_* environment settings.
Defaults,
// Copy snapshot `--id` between two in-process stores.
Sync {
#[arg(long, value_name = "STORE")]
from: String,
#[arg(long, value_name = "STORE")]
to: String,
},
// Print `snapdir <version>`.
Version,
// Hidden: emit a shell completion script to stdout.
#[command(hide = true)]
Completions {
shell: clap_complete::Shell,
},
// Hidden: emit the man page (roff) to stdout.
#[command(hide = true)]
Man,
}
impl Cli {
    /// Dispatches the parsed subcommand to its handler.
    ///
    /// Most subcommands are implemented by a `run_*` method on `Cli`; `manifest` and `id`
    /// are handled by the small private helpers below.
    ///
    /// # Errors
    /// Propagates whatever error the selected handler returns.
    pub fn run(&self) -> Result<()> {
        match &self.command {
            Command::Manifest {
                absolute,
                no_follow,
                checksum_bin,
                exclude,
                path,
            } => {
                // `manifest --exclude` replaces the global `--exclude` list rather than extending it.
                let exclude = if exclude.is_empty() {
                    self.globals.exclude.as_slice()
                } else {
                    exclude.as_slice()
                };
                self.run_manifest(
                    path.as_deref(),
                    *absolute,
                    *no_follow,
                    checksum_bin.as_deref(),
                    exclude,
                )
            }
            Command::Id { path } => self.run_id(path.as_deref()),
            Command::Stage { dir } => self.run_stage(dir.as_deref()),
            Command::Push { path } => self.run_push(path.as_deref()),
            Command::Fetch => self.run_fetch(),
            Command::Pull { path } => self.run_pull(path.as_deref()),
            Command::Checkout { dir } => self.run_checkout(dir.as_deref()),
            Command::Verify => self.run_verify(),
            Command::VerifyCache => self.run_verify_cache(),
            Command::FlushCache => self.run_flush_cache(),
            Command::Locations => self.run_locations(),
            Command::Ancestors => self.run_ancestors(),
            Command::Revisions => self.run_revisions(),
            Command::Defaults => run_defaults(),
            Command::Sync { from, to } => self.run_sync(from, to),
            Command::Version => {
                println!("snapdir {}", env!("CARGO_PKG_VERSION"));
                Ok(())
            }
            Command::Completions { shell } => {
                clap_complete::generate(*shell, &mut Cli::command(), "snapdir", &mut std::io::stdout());
                Ok(())
            }
            Command::Man => clap_mangen::Man::new(Cli::command())
                .render(&mut std::io::stdout())
                .context("rendering the man page"),
        }
    }

    /// Prints the manifest of `path` to stdout, then records a `manifest` catalog event
    /// keyed by the snapshot id and the directory's absolute path.
    fn run_manifest(
        &self,
        path: Option<&Path>,
        absolute: bool,
        no_follow: bool,
        checksum_bin: Option<&str>,
        exclude: &[String],
    ) -> Result<()> {
        let manifest =
            self.build_manifest(path, absolute, no_follow, checksum_bin, exclude, None)?;
        println!("{manifest}");
        let id = snapshot_id(&manifest, &Blake3Hasher::new());
        let abs = resolve_root(path).context("resolving the manifested directory path")?;
        self.log_event("manifest", &id, &abs.to_string_lossy())
    }

    /// Prints the snapshot id of `path`, hashed with the global `--exclude` patterns.
    fn run_id(&self, path: Option<&Path>) -> Result<()> {
        let manifest =
            self.build_manifest(path, false, false, None, &self.globals.exclude, None)?;
        println!("{}", snapshot_id(&manifest, &Blake3Hasher::new()));
        Ok(())
    }
}
/// Prints the effective snapdir defaults to stdout, one line each, sorted and de-duplicated.
///
/// Always emits `SNAPDIR_BIN_PATH`, `SNAPDIR_MANIFEST_BIN_PATH` (both the running binary),
/// `SNAPDIR_MANIFEST_CONTEXT` and `SNAPDIR_MANIFEST_EXCLUDE`. It then adds every environment
/// variable whose name contains `SNAPDIR` (except names containing `VERSION`), rendered by
/// `reformat_env_default`.
///
/// # Errors
/// Fails only if the path of the running binary cannot be determined.
fn run_defaults() -> Result<()> {
    let bin_path = std::env::current_exe()
        .context("resolving the running binary path")?
        .display()
        .to_string();
    let manifest_context = std::env::var("SNAPDIR_MANIFEST_CONTEXT").unwrap_or_default();
    let manifest_exclude = std::env::var("SNAPDIR_MANIFEST_EXCLUDE").unwrap_or_default();
    let mut lines = vec![
        format!("SNAPDIR_MANIFEST_BIN_PATH={bin_path}"),
        format!("SNAPDIR_MANIFEST_CONTEXT={manifest_context}"),
        format!("SNAPDIR_MANIFEST_EXCLUDE={manifest_exclude}"),
    ];
    // `vars_os` rather than `vars`: `std::env::vars()` panics as soon as *any* variable in
    // the process environment (even an unrelated one) is not valid Unicode. Lossy conversion
    // keeps `snapdir defaults` working in such environments.
    for (key, value) in std::env::vars_os() {
        let key = key.to_string_lossy();
        if !key.contains("SNAPDIR") || key.contains("VERSION") {
            continue;
        }
        lines.push(reformat_env_default(&key, &value.to_string_lossy()));
    }
    lines.push(format!("SNAPDIR_BIN_PATH={bin_path}"));
    lines.sort();
    lines.dedup();
    for line in &lines {
        println!("{line}");
    }
    Ok(())
}
impl Cli {
fn run_push(&self, path: Option<&Path>) -> Result<()> {
self.log_transfer_config();
let jobs = self.transfer_config()?.concurrency.get();
let (meter, reporter) = self.start_progress(jobs);
if path.is_none() && self.globals.id.is_some() {
let id = self.require_id()?;
let cache = self.cache_store()?;
let manifest = cache.get_manifest(id).with_context(|| {
format!("manifest {id} not found in the local cache; stage or fetch it first")
})?;
let store_url = self
.globals
.store
.as_deref()
.context("missing --store option")?;
if self.globals.dryrun {
reporter.finish();
println!("{id}");
if !self.globals.quiet {
eprintln!("dry-run: would push {id} to {store_url} (no writes performed)");
}
return Ok(());
}
if let Some(m) = &meter {
m.set_total(total_object_bytes(&manifest));
m.set_phase(Phase::Transfer);
}
let store = self.resolve_store(meter.clone())?;
let scratch = ScratchDir::new("push")?;
cache
.fetch_files(&manifest, scratch.path())
.with_context(|| format!("materializing staged snapshot {id}"))?;
store
.push(&manifest, scratch.path())
.with_context(|| format!("pushing snapshot {id} to store"))?;
reporter.finish();
println!("{id}");
self.log_event("push", id, store_url)?;
return Ok(());
}
if let Some(m) = &meter {
m.set_phase(Phase::Hashing);
}
let manifest = self.build_manifest(
path,
false,
false,
None,
&self.globals.exclude,
meter.as_deref(),
)?;
let root = resolve_root(path).context("resolving push path")?;
let id = snapshot_id(&manifest, &Blake3Hasher::new());
let store_url = self
.globals
.store
.as_deref()
.context("missing --store option")?;
if self.globals.dryrun {
reporter.finish();
println!("{id}");
if !self.globals.quiet {
eprintln!("dry-run: would push {id} to {store_url} (no writes performed)");
}
return Ok(());
}
if let Some(m) = &meter {
m.set_total(total_object_bytes(&manifest));
m.set_phase(Phase::Transfer);
}
let store = self.resolve_store(meter.clone())?;
store
.push(&manifest, &root)
.with_context(|| format!("pushing snapshot {id} to store"))?;
reporter.finish();
println!("{id}");
self.log_event("push", &id, store_url)?;
Ok(())
}
/// `fetch` subcommand: downloads snapshot `--id` from `--store` into the local cache,
/// with a progress display that is finished whether or not the fetch succeeds.
fn run_fetch(&self) -> Result<()> {
    self.log_transfer_config();
    let concurrency = self.transfer_config()?.concurrency.get();
    let (meter, reporter) = self.start_progress(concurrency);
    if let Some(active) = meter.as_deref() {
        active.set_phase(Phase::Transfer);
    }
    let outcome = self.fetch_inner(meter.as_ref());
    reporter.finish();
    outcome
}
/// Core of `fetch`: copies snapshot `--id` from `--store` into the local cache.
///
/// Returns early when the manifest is already cached (printing `CACHED: <id>` under
/// `--verbose`). Objects are downloaded into a scratch directory and then pushed into the
/// cache. Under `--dryrun` only the manifest is read from the store.
fn fetch_inner(&self, meter: Option<&Arc<Meter>>) -> Result<()> {
    let cache = self.cache_store_with_meter(meter.cloned())?;
    let chatty = self.globals.verbose && !self.globals.quiet;
    let already_cached = self
        .globals
        .id
        .as_deref()
        .filter(|id| cache.get_manifest(*id).is_ok());
    if let Some(cached) = already_cached {
        if chatty {
            eprintln!("CACHED: {cached}");
        }
        return Ok(());
    }
    let store = self.resolve_store(meter.cloned())?;
    let id = self.require_id()?;
    let manifest = store
        .get_manifest(id)
        .with_context(|| format!("fetching manifest {id} from store"))?;
    if self.globals.dryrun {
        if !self.globals.quiet {
            eprintln!("dry-run: would fetch {id} into the local cache (no writes performed)");
        }
        return Ok(());
    }
    if let Some(active) = meter {
        active.set_total(total_object_bytes(&manifest));
    }
    let staging = ScratchDir::new("fetch")?;
    store
        .fetch_files(&manifest, staging.path())
        .with_context(|| format!("fetching objects for snapshot {id}"))?;
    cache
        .push(&manifest, staging.path())
        .with_context(|| format!("saving snapshot {id} to the local cache"))?;
    if chatty {
        eprintln!("SAVED: {id}");
    }
    Ok(())
}
/// `checkout` subcommand: materializes cached snapshot `--id` into `dir` (default: the
/// current directory). The progress display is finished on success and on error.
fn run_checkout(&self, dir: Option<&Path>) -> Result<()> {
    self.log_transfer_config();
    let concurrency = self.transfer_config()?.concurrency.get();
    let (meter, reporter) = self.start_progress(concurrency);
    if let Some(active) = meter.as_deref() {
        active.set_phase(Phase::Transfer);
    }
    let outcome = self.checkout_inner(dir, meter.as_ref());
    reporter.finish();
    outcome
}
/// Core of `checkout`: copies the files of cached snapshot `--id` into `dir`, then
/// reapplies the permissions recorded in its manifest.
///
/// Under `--dryrun` nothing is read from or written to the cache.
fn checkout_inner(&self, dir: Option<&Path>, meter: Option<&Arc<Meter>>) -> Result<()> {
    let id = self.require_id()?;
    let dest = resolve_root(dir).context("resolving checkout destination")?;
    if self.globals.dryrun {
        if !self.globals.quiet {
            let shown = dest.display();
            eprintln!("dry-run: would check out {id} to {shown} (no writes performed)");
        }
        return Ok(());
    }
    let cache = self.cache_store_with_meter(meter.cloned())?;
    let manifest = cache
        .get_manifest(id)
        .with_context(|| format!("manifest {id} not found locally; did you forget to fetch it?"))?;
    if let Some(active) = meter {
        active.set_total(total_object_bytes(&manifest));
    }
    cache
        .fetch_files(&manifest, &dest)
        .with_context(|| format!("checking out snapshot {id} to {}", dest.display()))?;
    restore_permissions(&manifest, &dest)
}
/// `pull` subcommand: `fetch` followed by `checkout` into `path`, sharing one progress
/// display. The checkout only runs if the fetch succeeded.
fn run_pull(&self, path: Option<&Path>) -> Result<()> {
    self.log_transfer_config();
    let concurrency = self.transfer_config()?.concurrency.get();
    let (meter, reporter) = self.start_progress(concurrency);
    if let Some(active) = meter.as_deref() {
        active.set_phase(Phase::Transfer);
    }
    let shared = meter.as_ref();
    let outcome = match self.fetch_inner(shared) {
        Ok(()) => self.checkout_inner(path, shared),
        Err(err) => Err(err),
    };
    reporter.finish();
    outcome
}
/// `verify` subcommand: reads manifest `--id` from `--store` and downloads all of its
/// objects into a throwaway scratch directory. Any failure is reported as an error
/// (presumably `fetch_files` checks object integrity — confirm in the store crate).
///
/// `--purge` is rejected here; corrupt-object removal is only offered by `verify-cache`.
fn run_verify(&self) -> Result<()> {
    anyhow::ensure!(
        !self.globals.purge,
        "snapdir: `verify` does not support --purge; use `verify-cache --purge` to remove corrupt objects from the local cache"
    );
    let remote = self.resolve_store(None)?;
    let id = self.require_id()?;
    let manifest = remote
        .get_manifest(id)
        .with_context(|| format!("verifying manifest {id}"))?;
    let sink = ScratchDir::new("verify")?;
    remote
        .fetch_files(&manifest, sink.path())
        .with_context(|| format!("verifying objects for snapshot {id}"))?;
    Ok(())
}
fn run_stage(&self, path: Option<&Path>) -> Result<()> {
self.log_transfer_config();
let jobs = self.transfer_config()?.concurrency.get();
let (meter, reporter) = self.start_progress(jobs);
if let Some(m) = &meter {
m.set_phase(Phase::Hashing);
}
let manifest = self.build_manifest(
path,
false,
false,
None,
&self.globals.exclude,
meter.as_deref(),
)?;
let root = resolve_root(path).context("resolving stage path")?;
let id = snapshot_id(&manifest, &Blake3Hasher::new());
if self.globals.dryrun {
reporter.finish();
println!("{id}");
if !self.globals.quiet {
eprintln!("dry-run: would stage {id} into the local cache (no writes performed)");
}
return Ok(());
}
if let Some(m) = &meter {
m.set_total(total_object_bytes(&manifest));
m.set_phase(Phase::Transfer);
}
let cache = self.cache_store_with_meter(meter.clone())?;
cache
.push(&manifest, &root)
.with_context(|| format!("staging snapshot {id} into the local cache"))?;
reporter.finish();
println!("{id}");
self.log_event("stage", &id, &root.to_string_lossy())?;
Ok(())
}
/// `verify-cache` subcommand: re-hashes every object in the local cache with BLAKE3.
///
/// Each corrupt checksum is reported on stderr. With `--purge` (and no `--dryrun`) the
/// corrupt objects are also removed; `--verbose` lists what was purged.
///
/// # Errors
/// Fails if the cache cannot be scanned, or if any corrupt object was found (whether or
/// not it was purged).
fn run_verify_cache(&self) -> Result<()> {
    let cache_dir = self.cache_dir();
    let purge = self.globals.purge && !self.globals.dryrun;
    // Informational output honours --quiet, like every other dry-run / verbose message in
    // this CLI. The corruption reports below are diagnostics for the returned error, so
    // they are always shown.
    if self.globals.purge && self.globals.dryrun && !self.globals.quiet {
        eprintln!("dry-run: would purge corrupt objects from the cache (no writes performed)");
    }
    let report = cache::verify_cache(&cache_dir, purge, &Blake3Hasher::new())
        .with_context(|| format!("verifying cache at {}", cache_dir.display()))?;
    for checksum in &report.corrupt {
        eprintln!("Checksum mismatch for {checksum}");
    }
    if purge && self.globals.verbose && !self.globals.quiet {
        for checksum in &report.purged {
            eprintln!("purged {checksum}");
        }
    }
    if report.is_clean() {
        return Ok(());
    }
    anyhow::bail!(
        "snapdir: {} corrupt object(s) in the cache",
        report.corrupt.len()
    )
}
/// `flush-cache` subcommand: removes the local cache via `cache::flush_cache`.
///
/// Under `--dryrun` nothing is removed; a notice is printed unless `--quiet` is set.
///
/// # Errors
/// Fails if `cache::flush_cache` reports an error.
fn run_flush_cache(&self) -> Result<()> {
    let cache_dir = self.cache_dir();
    if self.globals.dryrun {
        // Stay silent under --quiet, consistent with the other dry-run notices.
        if !self.globals.quiet {
            eprintln!(
                "dry-run: would flush the cache at {} (no writes performed)",
                cache_dir.display()
            );
        }
        return Ok(());
    }
    cache::flush_cache(&cache_dir)
        .with_context(|| format!("flushing cache at {}", cache_dir.display()))?;
    Ok(())
}
/// `locations` subcommand: prints every catalog location record as one JSON line.
fn run_locations(&self) -> Result<()> {
    let catalog = self.open_catalog()?;
    let records = catalog.locations().context("querying catalog locations")?;
    for line in records.into_iter().map(|record| locations_json_line(&record)) {
        println!("{line}");
    }
    Ok(())
}
/// `ancestors` subcommand: prints the catalog ancestry of `--id`, one JSON line per record,
/// optionally restricted to `--location`.
fn run_ancestors(&self) -> Result<()> {
    let catalog = self.open_catalog()?;
    let id = self.require_id()?;
    let records = catalog
        .ancestors(id, self.globals.location.as_deref())
        .with_context(|| format!("querying catalog ancestors of {id}"))?;
    for line in records.into_iter().map(|record| ancestors_json_line(&record)) {
        println!("{line}");
    }
    Ok(())
}
/// `revisions` subcommand: prints the catalog revisions recorded at a location, one JSON
/// line per record. The location is `--location`, falling back to `--store`.
fn run_revisions(&self) -> Result<()> {
    let catalog = self.open_catalog()?;
    let requested = self
        .globals
        .location
        .as_deref()
        .or(self.globals.store.as_deref());
    let location = requested.context("missing --location option")?;
    let records = catalog
        .revisions(location)
        .with_context(|| format!("querying catalog revisions at {location}"))?;
    for line in records.into_iter().map(|record| revisions_json_line(&record)) {
        println!("{line}");
    }
    Ok(())
}
/// Records `event` for snapshot `id` at `location` in the catalog, timestamped with the
/// system clock.
///
/// Logging is opt-in: when no catalog is configured (`--catalog` / `SNAPDIR_CATALOG`
/// absent or empty) this is a no-op.
///
/// # Errors
/// Fails if the catalog's directory cannot be created, the catalog cannot be opened, or
/// the event cannot be written.
fn log_event(&self, event: &str, id: &str, location: &str) -> Result<()> {
    if self.catalog_db_path().is_none() {
        return Ok(());
    }
    // Go through `open_catalog` so the catalog's parent directory is created first.
    // Opening the DB directly failed whenever that directory did not exist yet (e.g. the
    // cache dir on a fresh machine, which `manifest` and `push` never create).
    let catalog = self.open_catalog()?;
    catalog
        .log(event, id, location, &SystemClock)
        .with_context(|| format!("recording catalog event {event} for {id}"))?;
    Ok(())
}
/// Opens (creating its parent directory if needed) the catalog configured by `--catalog`.
///
/// # Errors
/// Fails when no catalog is configured, the directory cannot be created, or the DB
/// cannot be opened.
fn open_catalog(&self) -> Result<Catalog> {
    let db_path = self
        .catalog_db_path()
        .context("error: Missing SNAPDIR_CATALOG or --catalog")?;
    if let Some(dir) = db_path.parent() {
        std::fs::create_dir_all(dir)
            .with_context(|| format!("creating catalog directory {}", dir.display()))?;
    }
    let opened = Catalog::open(&db_path)
        .with_context(|| format!("opening catalog at {}", db_path.display()))?;
    Ok(opened)
}
/// Resolves the catalog DB path, or `None` when no (non-empty) catalog is configured.
///
/// A value containing the platform path separator is taken as a path verbatim. Any other
/// value is a catalog *name*, stored as `<cache_dir>/<name>-catalog.redb`.
fn catalog_db_path(&self) -> Option<PathBuf> {
    let name = self
        .globals
        .catalog
        .as_deref()
        .filter(|value| !value.is_empty())?;
    let is_explicit_path = name.contains(std::path::MAIN_SEPARATOR);
    Some(if is_explicit_path {
        PathBuf::from(name)
    } else {
        self.cache_dir().join(format!("{name}-catalog.redb"))
    })
}
/// Builds the store named by `--store`, configured with the current transfer settings and
/// wired to `meter` where the adapter supports one.
///
/// # Errors
/// Fails on a missing `--store`, an unrecognised protocol, a bad `--limit-rate`, or a
/// connection failure.
fn resolve_store(&self, meter: Option<Arc<Meter>>) -> Result<Box<dyn Store>> {
    let url = self
        .globals
        .store
        .as_deref()
        .context("missing --store option")?;
    let adapter = resolve_adapter(url).context("resolving --store protocol")?;
    store_for_adapter(&adapter, url, self.transfer_config()?, meter)
}
fn transfer_config(&self) -> Result<TransferConfig> {
let max_bytes_per_sec = match self.globals.limit_rate.as_deref() {
Some(rate) => Some(parse_rate(rate)?),
None => None,
};
let auto = TransferConfig::default().concurrency.get();
let concurrency = match self.globals.jobs {
Some(n) if n > 0 => n,
_ => auto,
};
let base = TransferConfig::new(concurrency, max_bytes_per_sec);
match self.globals.adaptive {
None => Ok(base),
Some(fraction) => {
let ceiling = self
.globals
.max_jobs
.or(self.globals.jobs.filter(|&n| n > 0))
.unwrap_or(auto)
.clamp(1, ADAPTIVE_CEILING_CAP);
Ok(base.with_adaptive(TransferAdaptivePolicy::On { fraction, ceiling }))
}
}
}
/// Under `--verbose` (and not `--quiet`), prints a one-line summary of the transfer
/// settings to stderr. It stays silent if they cannot be computed; the caller will hit
/// that error itself.
fn log_transfer_config(&self) {
    let wants_log = self.globals.verbose && !self.globals.quiet;
    if !wants_log {
        return;
    }
    let Ok(config) = self.transfer_config() else {
        return;
    };
    let summary = match config.adaptive {
        TransferAdaptivePolicy::On { fraction, ceiling } => {
            #[allow(
                clippy::cast_possible_truncation,
                clippy::cast_sign_loss,
                clippy::cast_precision_loss
            )]
            let pct = (fraction * 100.0).round() as u64;
            format!("adaptive: target {pct}% of capacity, ceiling {ceiling}")
        }
        TransferAdaptivePolicy::Off => {
            let concurrency = config.concurrency.get();
            format!("transfers: {concurrency} concurrent")
        }
    };
    match self.globals.limit_rate.as_deref() {
        Some(rate) => eprintln!("{summary}, limit {rate}"),
        None => eprintln!("{summary}"),
    }
}
/// Interprets the `--color` option (default `auto`).
fn color_choice(&self) -> ColorChoice {
    let requested = self.globals.color.as_str();
    ColorChoice::parse(requested)
}
/// Starts the progress reporter for a transfer that uses `jobs` workers.
///
/// Whether to render is decided by `should_render` from whether stderr is a terminal,
/// `--no-progress` / `--quiet`, and `$TERM`. When rendering, the shared meter is returned
/// for callers to update, and `TERM=dumb` selects ASCII output. Otherwise no meter is
/// returned, and the reporter is started over a throwaway meter with its render flag off,
/// so callers can still `finish()` it unconditionally.
fn start_progress(&self, jobs: usize) -> (Option<Arc<Meter>>, ProgressReporter) {
    let is_tty = std::io::stderr().is_terminal();
    let term = std::env::var("TERM").ok();
    let suppressed = self.globals.no_progress || self.globals.quiet;
    if !should_render(is_tty, suppressed, term.as_deref()) {
        let idle =
            ProgressReporter::start(Arc::new(Meter::new()), jobs, false, false, false, None);
        return (None, idle);
    }
    let meter = Arc::new(Meter::new());
    let no_color_env = std::env::var_os("NO_COLOR").is_some();
    let color = use_color(self.color_choice(), is_tty, no_color_env);
    let ascii = term.as_deref() == Some("dumb");
    let reporter = ProgressReporter::start(
        Arc::clone(&meter),
        jobs,
        true,
        color,
        ascii,
        self.globals.adaptive,
    );
    (Some(meter), reporter)
}
/// `sync` subcommand: copies snapshot `--id` from the store at `from_url` to the store at
/// `to_url` via `snapdir_stores::sync_snapshot`.
///
/// Both ends must be in-process adapters. On success the id is printed on stdout with a
/// copied/skipped summary on stderr. Under `--dryrun` only the would-copy count is reported.
///
/// # Errors
/// Fails if `--id` is missing, both URLs are equal, either store cannot be built, or the
/// sync itself fails.
fn run_sync(&self, from_url: &str, to_url: &str) -> Result<()> {
    let id = self.require_id()?;
    anyhow::ensure!(
        from_url != to_url,
        "sync --from and --to must differ (both are {from_url})"
    );
    let config = self.transfer_config()?;
    let source_adapter = resolve_adapter(from_url).context("resolving --from store protocol")?;
    let target_adapter = resolve_adapter(to_url).context("resolving --to store protocol")?;
    let source = stream_store_for_adapter(&source_adapter, from_url, config.clone(), None)?;
    let target = stream_store_for_adapter(&target_adapter, to_url, config.clone(), None)?;
    self.log_transfer_config();
    let (meter, reporter) = self.start_progress(config.concurrency.get());
    if let Some(active) = meter.as_deref() {
        active.set_phase(Phase::Transfer);
    }
    let outcome = snapdir_stores::sync_snapshot(
        &*source,
        &*target,
        id,
        &config,
        self.globals.dryrun,
        meter.as_deref(),
    );
    reporter.finish();
    let report =
        outcome.with_context(|| format!("syncing snapshot {id} from {from_url} to {to_url}"))?;
    if report.dry_run {
        if !self.globals.quiet {
            eprintln!(
                "dry-run: would copy {} object(s) for {id}",
                report.objects_copied
            );
        }
        return Ok(());
    }
    println!("{id}");
    if !self.globals.quiet {
        eprintln!(
            "synced {id}: {} copied, {} skipped ({} bytes)",
            report.objects_copied, report.objects_skipped, report.bytes_copied
        );
    }
    Ok(())
}
/// Opens the local cache as a `FileStore` rooted at `cache_dir()` with the current
/// transfer settings.
///
/// # Errors
/// Fails if `--limit-rate` cannot be parsed.
fn cache_store(&self) -> Result<FileStore> {
    let root = self.cache_dir();
    let config = self.transfer_config()?;
    Ok(FileStore::from_root_with_config(root, config))
}
/// Same as `cache_store`, with `meter` attached for progress reporting.
fn cache_store_with_meter(&self, meter: Option<Arc<Meter>>) -> Result<FileStore> {
    self.cache_store().map(|store| store.with_meter(meter))
}
/// Location of the local snapdir cache.
///
/// `--cache-dir` / `SNAPDIR_CACHE_DIR` wins. Otherwise the result is
/// `$XDG_CACHE_HOME/snapdir`, falling back to `$HOME/.cache/snapdir` when
/// `XDG_CACHE_HOME` is unset **or empty**.
fn cache_dir(&self) -> PathBuf {
    if let Some(dir) = &self.globals.cache_dir {
        return dir.clone();
    }
    let home = std::env::var("HOME").unwrap_or_default();
    // The XDG Base Directory spec treats an empty XDG_CACHE_HOME like an unset one.
    // Previously "" was used as the base, producing the root-level path "/snapdir".
    let base = std::env::var("XDG_CACHE_HOME")
        .ok()
        .filter(|value| !value.is_empty())
        .unwrap_or_else(|| format!("{home}/.cache"));
    PathBuf::from(format!("{base}/snapdir"))
}
fn require_id(&self) -> Result<&str> {
self.globals.id.as_deref().context("missing --id option")
}
/// Walks `path` (default: the current directory) and builds its manifest.
///
/// * `absolute` — record absolute instead of relative paths.
/// * `no_follow` — don't follow symlinks; an exclude pattern may also force this.
/// * `checksum_bin` — `b3sum` (default), `md5sum` or `sha256sum`. For BLAKE3, a non-empty
///   `SNAPDIR_MANIFEST_CONTEXT` switches to keyed hashing with that context.
/// * `exclude` — user patterns, expanded together with the runtime cache locations.
/// * `meter` — optional progress meter updated during the walk.
///
/// # Errors
/// Fails on an unresolvable path, an invalid exclude pattern, an unsupported checksum
/// binary, or any walk/hash failure.
fn build_manifest(
    &self,
    path: Option<&Path>,
    absolute: bool,
    no_follow: bool,
    checksum_bin: Option<&str>,
    exclude: &[String],
    meter: Option<&Meter>,
) -> Result<Manifest> {
    let root = resolve_root(path).context("resolving manifest path")?;
    let (home_cache, runtime_cache) = exclude_runtime_paths(self.globals.cache_dir.as_deref());
    let excludes = combine_excludes(exclude, &home_cache, &runtime_cache);
    let matcher = excludes
        .pattern
        .as_ref()
        .map(|pattern| ExcludeMatcher::new(pattern))
        .transpose()
        .context("compiling --exclude pattern")?;
    let follow = if no_follow || excludes.forces_no_follow {
        FollowMode::NoFollow
    } else {
        FollowMode::Follow
    };
    let path_mode = if absolute {
        PathMode::Absolute
    } else {
        PathMode::Relative
    };
    let options = WalkOptions {
        follow,
        path_mode,
        exclude: matcher,
    };
    match checksum_bin.unwrap_or("b3sum") {
        "b3sum" => match std::env::var("SNAPDIR_MANIFEST_CONTEXT").unwrap_or_default() {
            key if key.is_empty() => walk_with(&root, &options, &Blake3Hasher::new(), meter),
            key => walk_with(&root, &options, &Blake3KeyedHasher::new(key), meter),
        },
        "md5sum" => walk_with(&root, &options, &Md5Hasher::new(), meter),
        "sha256sum" => walk_with(&root, &options, &Sha256Hasher::new(), meter),
        other => anyhow::bail!("snapdir: unsupported --checksum-bin '{other}'"),
    }
}
}
fn store_for_adapter(
adapter: &Adapter,
store_url: &str,
config: TransferConfig,
meter: Option<Arc<Meter>>,
) -> Result<Box<dyn Store>> {
match adapter {
Adapter::File => Ok(Box::new(
FileStore::new_with_config(store_url, config).with_meter(meter),
)),
Adapter::S3 => {
let endpoint = std::env::var("SNAPDIR_S3_STORE_ENDPOINT_URL").ok();
let store = S3Store::connect_with(store_url, endpoint.as_deref(), config)
.with_context(|| format!("connecting to S3 store {store_url}"))?;
Ok(Box::new(store.with_meter(meter)))
}
Adapter::B2 => {
let endpoint = std::env::var("SNAPDIR_S3_STORE_ENDPOINT_URL").ok();
let region = std::env::var("SNAPDIR_B2_REGION")
.or_else(|_| std::env::var("AWS_REGION"))
.ok();
let store =
B2Store::connect_with(store_url, endpoint.as_deref(), region.as_deref(), config)
.with_context(|| format!("connecting to B2 store {store_url}"))?;
Ok(Box::new(store.with_meter(meter)))
}
Adapter::Gcs => {
let store = GcsStore::connect_with(store_url, config)
.with_context(|| format!("connecting to GCS store {store_url}"))?;
Ok(Box::new(store.with_meter(meter)))
}
Adapter::External { .. } => {
let store = ExternalStore::new(store_url)
.with_context(|| format!("resolving external store for {store_url}"))?;
Ok(Box::new(store))
}
}
}
fn stream_store_for_adapter(
adapter: &Adapter,
store_url: &str,
config: TransferConfig,
meter: Option<Arc<Meter>>,
) -> Result<Box<dyn StreamStore + Sync>> {
match adapter {
Adapter::File => Ok(Box::new(
FileStore::new_with_config(store_url, config).with_meter(meter),
)),
Adapter::S3 => {
let endpoint = std::env::var("SNAPDIR_S3_STORE_ENDPOINT_URL").ok();
let store = S3Store::connect_with(store_url, endpoint.as_deref(), config)
.with_context(|| format!("connecting to S3 store {store_url}"))?;
Ok(Box::new(store.with_meter(meter)))
}
Adapter::B2 => {
let endpoint = std::env::var("SNAPDIR_S3_STORE_ENDPOINT_URL").ok();
let region = std::env::var("SNAPDIR_B2_REGION")
.or_else(|_| std::env::var("AWS_REGION"))
.ok();
let store =
B2Store::connect_with(store_url, endpoint.as_deref(), region.as_deref(), config)
.with_context(|| format!("connecting to B2 store {store_url}"))?;
Ok(Box::new(store.with_meter(meter)))
}
Adapter::Gcs => {
let store = GcsStore::connect_with(store_url, config)
.with_context(|| format!("connecting to GCS store {store_url}"))?;
Ok(Box::new(store.with_meter(meter)))
}
Adapter::External { .. } => Err(anyhow::anyhow!(
"sync requires in-process stores (file/s3/b2/gcs); \
external `snapdir-*-store` URLs are not supported: {store_url}"
)),
}
}
/// Parses a `--limit-rate` value into bytes per second.
///
/// Accepts a non-negative decimal number, optionally followed (whitespace allowed) by a
/// binary unit `K`, `M` or `G` (powers of 1024). The unit may carry an IEC `i` and/or a
/// trailing `B`, case-insensitively: `10M`, `1.5MiB`, `512kb`. A bare `B` means bytes.
/// Fractional byte counts are truncated, and results too large for `u64` saturate at
/// `u64::MAX` (float-to-int `as` semantics).
///
/// # Errors
/// Rejects empty input, a missing or malformed number, negative values, and unknown
/// units. This includes a bare `i` / `iB` with no K/M/G prefix.
fn parse_rate(s: &str) -> Result<u64> {
    let trimmed = s.trim();
    anyhow::ensure!(!trimmed.is_empty(), "empty --limit-rate value");
    let split = trimmed
        .find(|c: char| !(c.is_ascii_digit() || c == '.'))
        .unwrap_or(trimmed.len());
    let (num, suffix) = trimmed.split_at(split);
    anyhow::ensure!(
        !num.is_empty(),
        "invalid --limit-rate '{s}': expected a number, optionally followed by K/M/G"
    );
    let value: f64 = num
        .parse()
        .with_context(|| format!("invalid --limit-rate '{s}': '{num}' is not a number"))?;
    anyhow::ensure!(
        value.is_finite() && value >= 0.0,
        "invalid --limit-rate '{s}': must be a non-negative number"
    );
    let unit = suffix.trim().to_ascii_lowercase();
    let unit = unit.strip_suffix('b').unwrap_or(&unit);
    // Drop the IEC `i` only when it follows a prefix (`Ki`, `Mi`, `Gi`). Stripping it
    // unconditionally turned typos such as `5i` or `5iB` into a silent "5 bytes".
    let unit = match unit.strip_suffix('i') {
        Some(prefix) if !prefix.is_empty() => prefix,
        _ => unit,
    };
    let multiplier: f64 = match unit {
        "" => 1.0,
        "k" => 1024.0,
        "m" => 1024.0 * 1024.0,
        "g" => 1024.0 * 1024.0 * 1024.0,
        other => {
            anyhow::bail!("invalid --limit-rate '{s}': unknown unit '{other}' (use K, M, or G)")
        }
    };
    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    Ok((value * multiplier) as u64)
}
/// clap value parser for `--adaptive`: accepts a finite fraction in the half-open interval
/// (0.0, 1.0], ignoring surrounding whitespace.
///
/// # Errors
/// Returns a user-facing message when the value is not a number or is out of range
/// (including NaN and infinities).
fn parse_adaptive_fraction(s: &str) -> Result<f64, String> {
    let Ok(value) = s.trim().parse::<f64>() else {
        return Err(format!(
            "invalid --adaptive '{s}': expected a number in (0.0, 1.0]"
        ));
    };
    let in_range = value.is_finite() && value > 0.0 && value <= 1.0;
    if in_range {
        Ok(value)
    } else {
        Err(format!(
            "invalid --adaptive '{s}': fraction must be in (0.0, 1.0]"
        ))
    }
}
/// Renders an environment variable as a CLI-flag-style line for `snapdir defaults`.
///
/// A leading `_SNAPDIR_` or `SNAPDIR_` is replaced by `--`. Then *every* `_` in the whole
/// `KEY=value` line becomes `-` and the line is lowercased, e.g.
/// `SNAPDIR_CACHE_DIR=/x` → `--cache-dir=/x`.
/// NOTE(review): the value is rewritten too (`a_B` → `a-b`); presumably intentional
/// display formatting — confirm.
fn reformat_env_default(key: &str, value: &str) -> String {
    let pair = format!("{key}={value}");
    let flag = match ["_SNAPDIR_", "SNAPDIR_"]
        .iter()
        .find_map(|prefix| pair.strip_prefix(*prefix))
    {
        Some(rest) => format!("--{rest}"),
        None => pair,
    };
    flag.replace('_', "-").to_lowercase()
}
/// Walks `root` with `hasher`, reporting progress to `meter`. Any failure is annotated
/// with the root being walked.
fn walk_with<H>(
    root: &Path,
    options: &WalkOptions,
    hasher: &H,
    meter: Option<&Meter>,
) -> Result<Manifest>
where
    H: Hasher,
{
    let walked = walk_with_meter(root, options, hasher, meter);
    walked.with_context(|| format!("walking {}", root.display()))
}
/// Sum of the sizes of all regular-file entries in `manifest`. Used as the progress total
/// for transfers.
fn total_object_bytes(manifest: &Manifest) -> u64 {
    let mut total: u64 = 0;
    for entry in manifest.entries() {
        if entry.path_type == PathType::File {
            total += entry.size;
        }
    }
    total
}
fn restore_permissions(manifest: &Manifest, dest: &Path) -> Result<()> {
for entry in manifest.entries() {
if entry.path_type == PathType::Directory {
continue;
}
apply_mode(dest, entry)?;
}
let mut dirs: Vec<&_> = manifest
.entries()
.iter()
.filter(|e| e.path_type == PathType::Directory)
.collect();
dirs.sort_by_key(|e| std::cmp::Reverse(e.path.len()));
for entry in dirs {
apply_mode(dest, entry)?;
}
Ok(())
}
fn apply_mode(dest: &Path, entry: &ManifestEntry) -> Result<()> {
let rel = entry.path.strip_prefix("./").unwrap_or(&entry.path);
let rel = rel.strip_suffix('/').unwrap_or(rel);
let target = if rel.is_empty() {
dest.to_path_buf()
} else {
dest.join(rel)
};
let mode = u32::from_str_radix(&entry.permissions, 8)
.with_context(|| format!("invalid permissions {:?}", entry.permissions))?;
std::fs::set_permissions(&target, std::fs::Permissions::from_mode(mode))
.with_context(|| format!("setting permissions on {}", target.display()))?;
Ok(())
}
struct ScratchDir {
path: PathBuf,
}
impl ScratchDir {
fn new(tag: &str) -> Result<Self> {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let n = COUNTER.fetch_add(1, Ordering::Relaxed);
let path =
std::env::temp_dir().join(format!("snapdir-cli-{tag}-{}-{n}", std::process::id()));
std::fs::create_dir_all(&path)
.with_context(|| format!("creating scratch dir {}", path.display()))?;
Ok(Self { path })
}
fn path(&self) -> &Path {
&self.path
}
}
impl Drop for ScratchDir {
fn drop(&mut self) {
let _ = std::fs::remove_dir_all(&self.path);
}
}
/// Turns an optional user-supplied path into an absolute one.
///
/// `None` means the current directory. Relative paths are joined onto the current
/// directory without being canonicalized (no symlink resolution, `..` kept as-is).
///
/// # Errors
/// Fails if the current directory cannot be determined.
fn resolve_root(path: Option<&Path>) -> Result<PathBuf> {
    let cwd = || std::env::current_dir().context("getting current directory");
    match path {
        Some(p) if p.is_absolute() => Ok(p.to_path_buf()),
        Some(p) => Ok(cwd()?.join(p)),
        None => {
            let here = cwd()?;
            if here.is_absolute() {
                Ok(here)
            } else {
                Ok(cwd()?.join(here))
            }
        }
    }
}
/// Expands each user exclude pattern with `expand_excludes` and merges the results.
///
/// Each expanded pattern is wrapped in a non-capturing group and the groups are joined
/// with `|`. `pattern` is `None` when nothing expanded to a pattern. `forces_no_follow`
/// is set if any expansion requested it.
fn combine_excludes(patterns: &[String], home_cache: &str, cache_dir: &str) -> ExpandedExclude {
    let expanded: Vec<_> = patterns
        .iter()
        .map(|pattern| expand_excludes(pattern, home_cache, cache_dir))
        .collect();
    let forces_no_follow = expanded.iter().any(|item| item.forces_no_follow);
    let groups: Vec<String> = expanded
        .into_iter()
        .filter_map(|item| item.pattern)
        .map(|ere| format!("(?:{ere})"))
        .collect();
    let pattern = if groups.is_empty() {
        None
    } else {
        Some(groups.join("|"))
    };
    ExpandedExclude {
        pattern,
        forces_no_follow,
    }
}
/// Returns the two runtime locations passed to `expand_excludes`:
/// `("$HOME/.cache/", <snapdir cache dir>)`.
///
/// The cache dir is `cache_dir` when given (`--cache-dir`). Otherwise it is
/// `$XDG_CACHE_HOME/snapdir`, falling back to `$HOME/.cache/snapdir` when `XDG_CACHE_HOME`
/// is unset or empty. This is intended to agree with the default used by `Cli::cache_dir`.
fn exclude_runtime_paths(cache_dir: Option<&Path>) -> (String, String) {
    let home = std::env::var("HOME").unwrap_or_default();
    let home_cache = format!("{home}/.cache/");
    let cache_dir = match cache_dir {
        Some(dir) => dir.display().to_string(),
        None => {
            // Per the XDG Base Directory spec, an empty XDG_CACHE_HOME means "use the
            // default". Treating "" as the base would yield the root-level "/snapdir".
            let base = std::env::var("XDG_CACHE_HOME")
                .ok()
                .filter(|value| !value.is_empty())
                .unwrap_or_else(|| format!("{home}/.cache"));
            format!("{base}/snapdir")
        }
    };
    (home_cache, cache_dir)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn remote_store_routing_resolves_every_scheme_to_its_adapter() {
assert_eq!(
resolve_adapter("file:///long/term/x").unwrap(),
Adapter::File
);
assert_eq!(resolve_adapter("s3://bucket/path").unwrap(), Adapter::S3);
assert_eq!(resolve_adapter("b2://bucket/path").unwrap(), Adapter::B2);
let gcs = resolve_adapter("gs://bucket/path").unwrap();
assert_eq!(gcs, Adapter::Gcs);
assert_eq!(gcs.name(), "gcs");
assert_eq!(gcs.store_binary(), "snapdir-gcs-store");
let xyz = resolve_adapter("xyz://bucket/path").unwrap();
assert_eq!(
xyz,
Adapter::External {
name: "xyz".to_owned()
}
);
assert!(!xyz.is_builtin());
assert_eq!(xyz.store_binary(), "snapdir-xyz-store");
}
#[test]
fn remote_store_routing_file_builds_filestore_without_io() {
let adapter = resolve_adapter("file:///tmp/snapdir-routing-test").unwrap();
let store = store_for_adapter(
&adapter,
"file:///tmp/snapdir-routing-test",
TransferConfig::default(),
None,
)
.unwrap();
assert!(store.get_manifest("0".repeat(64).as_str()).is_err());
}
#[test]
fn remote_store_routing_external_builds_shim_for_third_party_scheme() {
let adapter = resolve_adapter("xyz://bucket/base").unwrap();
let store = ExternalStore::new("xyz://bucket/base").unwrap();
assert_eq!(store.binary(), Path::new("snapdir-xyz-store"));
let routed = store_for_adapter(
&adapter,
"xyz://bucket/base",
TransferConfig::default(),
None,
);
assert!(routed.is_ok());
}
#[test]
fn remote_store_routing_rejects_invalid_protocol() {
assert!(resolve_adapter("NotAScheme://x").is_err());
}
fn cli_with(args: &[&str]) -> Cli {
unsafe {
std::env::remove_var("SNAPDIR_JOBS");
std::env::remove_var("SNAPDIR_LIMIT_RATE");
std::env::remove_var("SNAPDIR_ADAPTIVE");
std::env::remove_var("SNAPDIR_MAX_JOBS");
}
let mut full = vec!["snapdir"];
full.extend_from_slice(args);
full.push("defaults");
Cli::try_parse_from(full).expect("parse cli")
}
#[test]
fn transfer_flags_parse_rate() {
assert_eq!(parse_rate("10M").unwrap(), 10_485_760);
assert_eq!(parse_rate("512K").unwrap(), 524_288);
assert_eq!(parse_rate("1G").unwrap(), 1_073_741_824);
assert_eq!(parse_rate("1GiB").unwrap(), 1_073_741_824);
assert_eq!(parse_rate("1GB").unwrap(), 1_073_741_824);
assert_eq!(parse_rate("1000").unwrap(), 1000);
assert_eq!(parse_rate("1.5M").unwrap(), 1_572_864);
assert_eq!(parse_rate(" 2k ").unwrap(), 2048);
assert_eq!(parse_rate("1kib").unwrap(), 1024);
for bad in ["10X", "abc", "", " ", "M", "1.2.3", "-5M"] {
assert!(parse_rate(bad).is_err(), "expected {bad:?} to be rejected");
}
}
/// `--jobs 4` yields a concurrency of exactly four and leaves the rate uncapped.
#[test]
fn transfer_flags_jobs_explicit() {
    let cli = cli_with(&["--jobs", "4"]);
    let config = cli.transfer_config().unwrap();
    assert_eq!(config.concurrency.get(), 4);
    assert_eq!(config.max_bytes_per_sec, None);
}
/// `--jobs 1` yields a concurrency of one, i.e. sequential transfers.
#[test]
fn transfer_flags_jobs_one_is_sequential() {
    let cli = cli_with(&["--jobs", "1"]);
    let jobs = cli.transfer_config().unwrap().concurrency.get();
    assert_eq!(jobs, 1);
}
/// Without `--jobs`, concurrency falls back to `TransferConfig::default()`,
/// which this test also requires to lie within `1..=16`.
#[test]
fn transfer_flags_jobs_unset_is_auto() {
    let jobs = cli_with(&[]).transfer_config().unwrap().concurrency.get();
    // `RangeInclusive::contains` instead of a manual `>= && <=` pair
    // (clippy::manual_range_contains); report the offending value on failure.
    assert!(
        (1..=16).contains(&jobs),
        "auto concurrency {jobs} outside 1..=16"
    );
    assert_eq!(jobs, TransferConfig::default().concurrency.get());
}
/// `--jobs 0` means "auto": it must match `TransferConfig::default()`, which
/// this test also requires to lie within `1..=16`.
#[test]
fn transfer_flags_jobs_zero_is_auto() {
    let jobs = cli_with(&["--jobs", "0"])
        .transfer_config()
        .unwrap()
        .concurrency
        .get();
    // `RangeInclusive::contains` instead of a manual `>= && <=` pair
    // (clippy::manual_range_contains); report the offending value on failure.
    assert!(
        (1..=16).contains(&jobs),
        "auto concurrency {jobs} outside 1..=16"
    );
    assert_eq!(jobs, TransferConfig::default().concurrency.get());
}
/// `--limit-rate` is turned into a bytes-per-second cap, both on its own and
/// when combined with `-j`.
#[test]
fn transfer_flags_limit_rate_threads_into_config() {
    let rate_only = cli_with(&["--limit-rate", "1M"]).transfer_config().unwrap();
    assert_eq!(rate_only.max_bytes_per_sec, Some(1_048_576));

    let with_jobs = cli_with(&["-j", "2", "--limit-rate", "512K"])
        .transfer_config()
        .unwrap();
    assert_eq!(with_jobs.concurrency.get(), 2);
    assert_eq!(with_jobs.max_bytes_per_sec, Some(524_288));
}
/// An unparseable `--limit-rate` gets past clap (`cli_with` would panic
/// otherwise) but must make building the transfer config fail.
#[test]
fn transfer_flags_bad_limit_rate_errors() {
    let cli = cli_with(&["--limit-rate", "nope"]);
    let outcome = cli.transfer_config();
    assert!(outcome.is_err());
}
/// A bare `--adaptive` turns adaptive concurrency on with a fraction of 0.8 and
/// a ceiling equal to the default concurrency.
#[test]
fn adaptive_flag_without_value_defaults_to_point_eight() {
    let cfg = cli_with(&["--adaptive"]).transfer_config().unwrap();
    let TransferAdaptivePolicy::On { fraction, ceiling } = cfg.adaptive else {
        panic!("expected adaptive On");
    };
    assert!(
        (fraction - 0.8).abs() < 1e-9,
        "expected fraction ~0.8, got {fraction}"
    );
    let default_jobs = TransferConfig::default().concurrency.get();
    assert_eq!(ceiling, default_jobs);
}
/// `--adaptive=0.5` turns adaptive concurrency on with the given fraction.
#[test]
fn adaptive_flag_with_explicit_fraction() {
    let cfg = cli_with(&["--adaptive=0.5"]).transfer_config().unwrap();
    let TransferAdaptivePolicy::On { fraction, .. } = cfg.adaptive else {
        panic!("expected adaptive On");
    };
    let delta = (fraction - 0.5).abs();
    assert!(delta < 1e-9, "expected fraction ~0.5, got {fraction}");
}
/// `--adaptive` only accepts a fraction in `(0, 1]`. Zero, values above one,
/// negatives and non-numbers are rejected by clap, while exactly `1.0` is
/// accepted and enables the policy.
#[test]
fn adaptive_flag_out_of_range_is_rejected() {
    // Clear *all* transfer env vars, not just the adaptive ones. Otherwise a
    // stray (e.g. invalid) `SNAPDIR_JOBS` could make parsing fail for an
    // unrelated reason and let this test pass spuriously. Done once, before the
    // loop, since nothing inside the loop sets them again.
    //
    // SAFETY: `remove_var` is `unsafe` because it can race with code that reads
    // the environment outside std's internal lock (e.g. a libc `getenv` on
    // another thread). These tests only ever *remove* the variables, so racing
    // removals converge on the same state. NOTE(review): this assumes no test in
    // the binary sets them or reads the environment through FFI.
    unsafe {
        std::env::remove_var("SNAPDIR_JOBS");
        std::env::remove_var("SNAPDIR_LIMIT_RATE");
        std::env::remove_var("SNAPDIR_ADAPTIVE");
        std::env::remove_var("SNAPDIR_MAX_JOBS");
    }
    for bad in ["0", "0.0", "1.5", "-0.2", "nope"] {
        let arg = format!("--adaptive={bad}");
        let Err(err) = Cli::try_parse_from(["snapdir", arg.as_str(), "defaults"]) else {
            panic!("expected --adaptive={bad} to be rejected");
        };
        // Make sure the rejection is attributed to `--adaptive` itself rather
        // than some other argument.
        assert!(
            err.to_string().contains("--adaptive"),
            "expected --adaptive={bad} to be rejected by the --adaptive flag, got: {err}"
        );
    }
    let cfg = cli_with(&["--adaptive=1.0"]).transfer_config().unwrap();
    assert!(matches!(cfg.adaptive, TransferAdaptivePolicy::On { .. }));
}
/// Without `--adaptive`, the parsed config must match the pre-phase-18
/// `TransferConfig::new(jobs, rate)` construction: policy `Off`, with identical
/// concurrency and rate cap. Checked both with explicit flags and with none.
#[test]
fn adaptive_unset_equals_pre_phase18_value() {
    let cfg = cli_with(&["--jobs", "4", "--limit-rate", "1M"])
        .transfer_config()
        .unwrap();
    let expected = TransferConfig::new(4, Some(1_048_576));
    assert_eq!(cfg.adaptive, TransferAdaptivePolicy::Off);
    assert_eq!(cfg.adaptive, expected.adaptive);
    assert_eq!(cfg.concurrency, expected.concurrency);
    assert_eq!(cfg.max_bytes_per_sec, expected.max_bytes_per_sec);

    let cfg = cli_with(&[]).transfer_config().unwrap();
    let expected = TransferConfig::new(TransferConfig::default().concurrency.get(), None);
    assert_eq!(cfg.adaptive, TransferAdaptivePolicy::Off);
    // Compare against the legacy constructor here too, mirroring the first
    // scenario (this comparison was previously missing for the no-flag case).
    assert_eq!(cfg.adaptive, expected.adaptive);
    assert_eq!(cfg.concurrency, expected.concurrency);
    assert_eq!(cfg.max_bytes_per_sec, expected.max_bytes_per_sec);
}
/// The adaptive ceiling comes from `--max-jobs` when given. Otherwise it falls
/// back to `--jobs`. An explicit `--max-jobs` wins over `--jobs`, and
/// `--limit-rate` is still honoured alongside adaptive mode.
#[test]
fn adaptive_max_jobs_sets_ceiling() {
    // `--max-jobs` alone sets the ceiling.
    let max_jobs_only = cli_with(&["--adaptive", "--max-jobs", "8"])
        .transfer_config()
        .unwrap();
    let TransferAdaptivePolicy::On { ceiling, .. } = max_jobs_only.adaptive else {
        panic!("expected adaptive On");
    };
    assert_eq!(ceiling, 8);

    // Without `--max-jobs`, the ceiling follows `--jobs`.
    let jobs_as_ceiling = cli_with(&["--adaptive=0.6", "--jobs", "3", "--limit-rate", "512K"])
        .transfer_config()
        .unwrap();
    let TransferAdaptivePolicy::On { fraction, ceiling } = jobs_as_ceiling.adaptive else {
        panic!("expected adaptive On");
    };
    assert!((fraction - 0.6).abs() < 1e-9);
    assert_eq!(ceiling, 3);
    assert_eq!(jobs_as_ceiling.max_bytes_per_sec, Some(524_288));

    // `--max-jobs` takes precedence over `--jobs`.
    let both = cli_with(&["--adaptive", "--jobs", "2", "--max-jobs", "12"])
        .transfer_config()
        .unwrap();
    let TransferAdaptivePolicy::On { ceiling, .. } = both.adaptive else {
        panic!("expected adaptive On");
    };
    assert_eq!(ceiling, 12);
}
/// An oversized `--max-jobs` is clamped to `ADAPTIVE_CEILING_CAP`.
#[test]
fn adaptive_ceiling_is_clamped_to_cap() {
    let cli = cli_with(&["--adaptive", "--max-jobs", "9999"]);
    let cfg = cli.transfer_config().unwrap();
    let TransferAdaptivePolicy::On { ceiling, .. } = cfg.adaptive else {
        panic!("expected adaptive On");
    };
    assert_eq!(ceiling, ADAPTIVE_CEILING_CAP);
}
}