use std::io::{BufRead, IsTerminal, Read, Write};
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{Context, Result};
use clap::{Args, CommandFactory, Parser, Subcommand, ValueEnum};
use crate::progress::{should_render, use_color, ColorChoice, ProgressReporter};
use snapdir_catalog::{
ancestors_json_line, locations_json_line, revisions_json_line, Catalog, SystemClock,
};
use snapdir_core::hash_file::HashFile;
use snapdir_core::{
cache, expand_excludes, snapshot_id, walk_with_guards, walk_with_meter, Blake3Hasher,
Blake3KeyedHasher, CopyGuard, ExcludeMatcher, ExpandedExclude, FollowMode, Hasher, Manifest,
ManifestEntry, Md5Hasher, Meter, PathMode, PathType, Phase, Sha256Hasher, Store, StoreError,
WalkOptions,
};
use snapdir_stores::{
is_hex64, limits, read_pack, resolve_adapter, write_pack_with_format, Adapter, B2Store,
Durability, ExternalStore, FileSink, FileStore, GcsStore, PackFormat, PackReadReport, PackSink,
RetryPolicy, S3Store, SplitStore, StreamSink, StreamStore, TransferAdaptivePolicy,
TransferConfig, DEFAULT_ZSTD_LEVEL, WIRE_CAPS, WIRE_VERSION,
};
/// Fixed upper limit on the adaptive concurrency ceiling.
/// NOTE(review): the code that applies this cap is outside this view — confirm how it is used.
const ADAPTIVE_CEILING_CAP: usize = 64;
/// Error context used when a command needs a store URI but neither `--store` nor the
/// `SNAPDIR_STORE` environment variable supplied one.
const NO_STORE_CONFIGURED: &str =
"no store configured: pass --store <uri> or set the SNAPDIR_STORE environment variable";
// Top-level command line: the universal (global) flags plus exactly one subcommand.
// `//` comments are used on purpose: clap's derive turns `///` doc comments into help
// text, which would change `--help` output.
#[derive(Debug, Parser)]
#[command(
name = "snapdir",
bin_name = "snapdir",
version,
propagate_version = true,
about = "Content-addressable directory snapshots.",
long_about = None
)]
pub struct Cli {
// Flags accepted by every subcommand (all declared `global = true`).
#[command(flatten)]
pub universal: UniversalArgs,
// The subcommand to execute; `Cli::run` merges its flags and dispatches it via `Ctx::run`.
#[command(subcommand)]
pub command: Command,
}
// Values accepted by `--color`. `ColorArg::resolve` maps them onto `progress::ColorChoice`.
// `//` comments (not `///`) keep clap's possible-value help text unchanged.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, ValueEnum)]
pub enum ColorArg {
// Used when `--color` is not given.
#[default]
Auto,
Always,
Never,
}
impl ColorArg {
    /// Converts the parsed `--color` value into the progress module's `ColorChoice`.
    fn resolve(self) -> ColorChoice {
        match self {
            Self::Never => ColorChoice::Never,
            Self::Always => ColorChoice::Always,
            Self::Auto => ColorChoice::Auto,
        }
    }
}
// Flags accepted by every subcommand (`global = true`). `Resolved::from_universal` copies
// them into the merged settings. `//` comments keep clap's help text unchanged.
#[derive(Debug, Args)]
pub struct UniversalArgs {
// `-q/--quiet`: suppresses informational stderr messages such as the dry-run notices.
#[arg(long, short = 'q', global = true)]
pub quiet: bool,
// `--color=auto|always|never`; defaults to `auto`.
#[arg(long, global = true, value_name = "WHEN", value_enum, default_value_t = ColorArg::Auto)]
pub color: ColorArg,
// `--no-progress` (or SNAPDIR_NO_PROGRESS): requests no progress display.
// NOTE(review): consumed by code outside this view (presumably `start_progress`).
#[arg(long, global = true, env = "SNAPDIR_NO_PROGRESS")]
pub no_progress: bool,
// `--verbose`: extra stderr lines such as `CACHED:`/`SAVED:` and purged checksums.
#[arg(long, global = true)]
pub verbose: bool,
}
// Directory-walk options shared by `id`, `stage` and `push`; merged by `merge_walk`.
// `//` comments keep clap's help text unchanged.
#[derive(Debug, Default, Args)]
pub struct WalkArgs {
// `--exclude PATTERN`: repeatable and comma-delimited, so `--exclude a,b --exclude c`
// yields three patterns.
#[arg(
long,
value_name = "PATTERN",
action = clap::ArgAction::Append,
value_delimiter = ','
)]
pub exclude: Vec<String>,
// `--walk-jobs` / SNAPDIR_WALK_JOBS: walker parallelism. `run_defaults` reports an unset
// or zero value as the available parallelism clamped to 1..=16.
#[arg(long, value_name = "N", env = "SNAPDIR_WALK_JOBS")]
pub walk_jobs: Option<usize>,
}
// Options shared by the transfer commands (stage, push, fetch, pull, checkout, sync);
// merged into `Resolved` by `merge_transfer`. `//` comments keep clap's help text unchanged.
#[derive(Debug, Default, Args)]
#[allow(clippy::struct_excessive_bools)]
pub struct TransferArgs {
// `--store` / SNAPDIR_STORE: URI of the store to transfer to or from.
#[arg(long, value_name = "URI", env = "SNAPDIR_STORE")]
pub store: Option<String>,
// `--catalog` / SNAPDIR_CATALOG: catalog name or path; `none` or empty disables logging.
#[arg(long, value_name = "NAME", env = "SNAPDIR_CATALOG")]
pub catalog: Option<String>,
// `--objects-store` / SNAPDIR_OBJECTS_STORE: a second store URI.
// NOTE(review): how it combines with --store is decided outside this view.
#[arg(long, value_name = "URI", env = "SNAPDIR_OBJECTS_STORE")]
pub objects_store: Option<String>,
// `--cache-dir` / SNAPDIR_CACHE_DIR: location of the local object cache.
#[arg(long, value_name = "DIR", env = "SNAPDIR_CACHE_DIR")]
pub cache_dir: Option<PathBuf>,
// `--id`: snapshot id to operate on.
#[arg(long, value_name = "ID")]
pub id: Option<String>,
// `-j/--jobs` / SNAPDIR_JOBS: requested transfer concurrency.
#[arg(long, short = 'j', value_name = "N", env = "SNAPDIR_JOBS")]
pub jobs: Option<usize>,
// `--limit-rate` / SNAPDIR_LIMIT_RATE: validated by `parse_rate_arg`, stored as given.
#[arg(long, value_name = "RATE", env = "SNAPDIR_LIMIT_RATE", value_parser = parse_rate_arg)]
pub limit_rate: Option<String>,
// `--adaptive[=FRACTION]` / SNAPDIR_ADAPTIVE: a bare `--adaptive` means 0.8; an explicit
// value must be attached with `=` (`require_equals`).
#[arg(
long,
value_name = "FRACTION",
num_args = 0..=1,
require_equals = true,
default_missing_value = "0.8",
env = "SNAPDIR_ADAPTIVE",
value_parser = parse_adaptive_fraction
)]
pub adaptive: Option<f64>,
// `--max-jobs` / SNAPDIR_MAX_JOBS: concurrency ceiling (presumably for adaptive mode —
// confirm in `transfer_config`).
#[arg(long, value_name = "N", env = "SNAPDIR_MAX_JOBS")]
pub max_jobs: Option<usize>,
// Retry tuning, resolved together by `resolve_retry_policy` (outside this view).
#[arg(long, value_name = "N")]
pub max_retries: Option<u32>,
#[arg(long, value_name = "MS")]
pub retry_base_ms: Option<u64>,
#[arg(long, value_name = "MS")]
pub retry_max_ms: Option<u64>,
// `--max-requests`: request budget; falls back to SNAPDIR_MAX_REQUESTS elsewhere.
#[arg(long, value_name = "N")]
pub max_requests: Option<u64>,
// Behavioural switches; `--dryrun` reports what would happen without writing.
// NOTE(review): `linked`, `force` and `keep` are interpreted outside this view.
#[arg(long)]
pub linked: bool,
#[arg(long)]
pub force: bool,
#[arg(long)]
pub keep: bool,
#[arg(long)]
pub dryrun: bool,
}
// Flags for `snapdir defaults`: the same knobs as the transfer/walk commands, so the command
// can report each knob's effective value. Merged by `merge_defaults`.
// `//` comments keep clap's help text unchanged.
#[derive(Debug, Default, Args)]
pub struct DefaultsArgs {
#[arg(long, value_name = "URI", env = "SNAPDIR_STORE")]
pub store: Option<String>,
#[arg(long, value_name = "NAME", env = "SNAPDIR_CATALOG")]
pub catalog: Option<String>,
#[arg(long, value_name = "URI", env = "SNAPDIR_OBJECTS_STORE")]
pub objects_store: Option<String>,
#[arg(long, value_name = "DIR", env = "SNAPDIR_CACHE_DIR")]
pub cache_dir: Option<PathBuf>,
#[arg(long, value_name = "N", env = "SNAPDIR_WALK_JOBS")]
pub walk_jobs: Option<usize>,
#[arg(long, short = 'j', value_name = "N", env = "SNAPDIR_JOBS")]
pub jobs: Option<usize>,
#[arg(long, value_name = "RATE", env = "SNAPDIR_LIMIT_RATE", value_parser = parse_rate_arg)]
pub limit_rate: Option<String>,
// A bare `--adaptive` means 0.8; explicit values need `=` (`require_equals`).
#[arg(
long,
value_name = "FRACTION",
num_args = 0..=1,
require_equals = true,
default_missing_value = "0.8",
env = "SNAPDIR_ADAPTIVE",
value_parser = parse_adaptive_fraction
)]
pub adaptive: Option<f64>,
#[arg(long, value_name = "N", env = "SNAPDIR_MAX_JOBS")]
pub max_jobs: Option<usize>,
#[arg(long, value_name = "N")]
pub max_retries: Option<u32>,
#[arg(long, value_name = "MS")]
pub retry_base_ms: Option<u64>,
#[arg(long, value_name = "MS")]
pub retry_max_ms: Option<u64>,
#[arg(long, value_name = "N")]
pub max_requests: Option<u64>,
}
// Flags for the catalog queries (locations, ancestors, revisions); merged by `merge_catalog`.
// `//` comments keep clap's help text unchanged.
#[derive(Debug, Default, Args)]
pub struct CatalogArgs {
#[arg(long, value_name = "NAME", env = "SNAPDIR_CATALOG")]
pub catalog: Option<String>,
// `--location`: optional filter for `ancestors`; the queried location for `revisions`
// (which falls back to --store when absent).
#[arg(long, value_name = "DIR|STORE")]
pub location: Option<String>,
// `--id`: snapshot whose ancestors are listed.
#[arg(long, value_name = "ID")]
pub id: Option<String>,
#[arg(long, value_name = "URI", env = "SNAPDIR_STORE")]
pub store: Option<String>,
#[arg(long, value_name = "DIR", env = "SNAPDIR_CACHE_DIR")]
pub cache_dir: Option<PathBuf>,
}
// Flags for `verify`, `verify-cache` and `flush-cache`; merged by `merge_cache_mgmt`.
// `//` comments keep clap's help text unchanged.
#[derive(Debug, Default, Args)]
pub struct CacheMgmtArgs {
#[arg(long, value_name = "URI", env = "SNAPDIR_STORE")]
pub store: Option<String>,
// `--id`: restricts `verify-cache`'s missing-object scan to one manifest; required by `verify`.
#[arg(long, value_name = "ID")]
pub id: Option<String>,
// `--purge`: `verify-cache` removes corrupt objects; `verify` rejects this flag.
#[arg(long)]
pub purge: bool,
#[arg(long)]
pub force: bool,
// `--dryrun`: report-only; no cache writes.
#[arg(long)]
pub dryrun: bool,
#[arg(long, value_name = "DIR", env = "SNAPDIR_CACHE_DIR")]
pub cache_dir: Option<PathBuf>,
}
// `--id` for `diff`; `Cli::run` copies it into `Resolved::id`.
// `//` comments keep clap's help text unchanged.
#[derive(Debug, Default, Args)]
pub struct DiffIdArgs {
#[arg(long, value_name = "ID")]
pub id: Option<String>,
}
// Store flags for the hidden plumbing commands (objects-needed, send-pack, receive-pack);
// merged by `merge_plumbing`. `//` comments keep clap's help text unchanged.
#[derive(Debug, Default, Args)]
pub struct PlumbingArgs {
#[arg(long, value_name = "URI", env = "SNAPDIR_STORE")]
pub store: Option<String>,
#[arg(long, value_name = "URI", env = "SNAPDIR_OBJECTS_STORE")]
pub objects_store: Option<String>,
}
/// Flat view of every setting a command may consult.
///
/// `Cli::run` starts from the universal flags (`Resolved::from_universal`) and then copies in
/// the flags of whichever subcommand was given via the `merge_*` helpers. Fields a subcommand
/// does not define keep their `Default` value (`None` / `false` / empty).
#[derive(Debug, Default)]
#[allow(clippy::struct_excessive_bools)]
pub struct Resolved {
pub cache_dir: Option<PathBuf>,
/// Catalog name or path; `Some("none")` or `Some("")` disables the catalog.
pub catalog: Option<String>,
pub store: Option<String>,
pub objects_store: Option<String>,
pub id: Option<String>,
pub exclude: Vec<String>,
pub linked: bool,
pub force: bool,
pub purge: bool,
pub keep: bool,
pub dryrun: bool,
pub verbose: bool,
pub no_progress: bool,
pub quiet: bool,
pub color: ColorArg,
pub location: Option<String>,
pub jobs: Option<usize>,
pub walk_jobs: Option<usize>,
pub limit_rate: Option<String>,
pub adaptive: Option<f64>,
pub max_jobs: Option<usize>,
pub max_retries: Option<u32>,
pub retry_base_ms: Option<u64>,
pub retry_max_ms: Option<u64>,
pub max_requests: Option<u64>,
}
impl Resolved {
fn from_universal(u: &UniversalArgs) -> Self {
Resolved {
quiet: u.quiet,
color: u.color,
no_progress: u.no_progress,
verbose: u.verbose,
..Resolved::default()
}
}
}
/// A fully parsed invocation: the merged settings plus the subcommand to execute.
#[derive(Debug)]
pub struct Ctx {
/// Universal flags merged with the subcommand's own flags by `Cli::run`.
globals: Resolved,
/// The subcommand, still carrying its command-specific arguments (paths, `--from`, ...).
command: Command,
}
// Hidden `send-pack --pack-format` choice; `PackFormatArg::resolve` turns it into a
// `PackFormat`. `//` comments keep clap's possible-value help unchanged.
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub enum PackFormatArg {
V1,
// Level taken from SNAPDIR_SSH_ZSTD_LEVEL, else DEFAULT_ZSTD_LEVEL.
Zstd,
}
impl PackFormatArg {
    /// Converts the `--pack-format` choice into a wire `PackFormat`.
    ///
    /// For `zstd`, the level is read from `SNAPDIR_SSH_ZSTD_LEVEL` (surrounding whitespace
    /// ignored). A missing or unparsable value falls back to `DEFAULT_ZSTD_LEVEL`. The
    /// environment is only consulted for `zstd`.
    fn resolve(self) -> PackFormat {
        match self {
            Self::Zstd => {
                let level = match std::env::var("SNAPDIR_SSH_ZSTD_LEVEL") {
                    Ok(raw) => raw.trim().parse::<i32>().unwrap_or(DEFAULT_ZSTD_LEVEL),
                    Err(_) => DEFAULT_ZSTD_LEVEL,
                };
                PackFormat::Zstd(level)
            }
            Self::V1 => PackFormat::V1,
        }
    }
}
// `diff --on-conflict` policy; `OnConflictArg::resolve` maps it onto `diff::OnConflict`.
// `//` comments keep clap's possible-value help unchanged.
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub enum OnConflictArg {
Error,
LastWins,
}
impl OnConflictArg {
fn resolve(self) -> crate::diff::OnConflict {
match self {
Self::Error => crate::diff::OnConflict::Error,
Self::LastWins => crate::diff::OnConflict::LastWins,
}
}
}
// Every subcommand. Each variant's flags are merged into `Resolved` by `Cli::run`, and the
// variant is then dispatched by `Ctx::run`. `//` comments (not `///`) are used on purpose so
// clap's generated help text is unchanged.
#[derive(Debug, Subcommand)]
pub enum Command {
// `manifest [PATH]`: print the manifest of a directory, then log a `manifest` catalog event.
Manifest {
#[arg(long)]
absolute: bool,
#[arg(long)]
no_follow: bool,
#[arg(long, value_name = "NAME")]
checksum_bin: Option<String>,
#[arg(
long,
value_name = "PATTERN",
action = clap::ArgAction::Append,
value_delimiter = ','
)]
exclude: Vec<String>,
#[arg(long, value_name = "N", env = "SNAPDIR_WALK_JOBS")]
walk_jobs: Option<usize>,
#[arg(long, value_name = "NAME", env = "SNAPDIR_CATALOG")]
catalog: Option<String>,
path: Option<PathBuf>,
},
// `id [PATH]`: print a directory's snapshot id, or that of a manifest piped on stdin.
Id {
#[command(flatten)]
walk: WalkArgs,
path: Option<PathBuf>,
},
// `stage [DIR]`: hash a directory into the local cache.
Stage {
#[command(flatten)]
walk: WalkArgs,
#[command(flatten)]
transfer: TransferArgs,
dir: Option<PathBuf>,
},
// `push [PATH]`: push a directory, or with `--id` and no PATH a cached snapshot, to the store.
Push {
#[command(flatten)]
walk: WalkArgs,
#[command(flatten)]
transfer: TransferArgs,
path: Option<PathBuf>,
},
// `fetch --id ID`: download a snapshot from the store into the local cache.
Fetch {
#[command(flatten)]
transfer: TransferArgs,
},
// `pull --id ID [PATH]`: fetch, then check out.
Pull {
#[command(flatten)]
transfer: TransferArgs,
path: Option<PathBuf>,
},
// `checkout --id ID [DIR]`: write a cached snapshot into a directory.
Checkout {
#[command(flatten)]
transfer: TransferArgs,
dir: Option<PathBuf>,
},
// `verify --id ID`: read a snapshot's manifest and objects back from the store.
Verify {
#[command(flatten)]
cache_mgmt: CacheMgmtArgs,
},
// `verify-cache`: re-hash cached objects and report missing ones (optionally `--purge`).
VerifyCache {
#[command(flatten)]
cache_mgmt: CacheMgmtArgs,
},
// `flush-cache`: empty the local cache.
FlushCache {
#[command(flatten)]
cache_mgmt: CacheMgmtArgs,
},
// Catalog queries: print JSON lines for locations / ancestors / revisions.
Locations {
#[command(flatten)]
catalog: CatalogArgs,
},
Ancestors {
#[command(flatten)]
catalog: CatalogArgs,
},
Revisions {
#[command(flatten)]
catalog: CatalogArgs,
},
// `defaults`: print each knob's effective value and where it came from.
Defaults {
#[command(flatten)]
config: DefaultsArgs,
},
// `sync --from STORE --to STORE`: implemented by `run_sync` (outside this view).
Sync {
#[command(flatten)]
transfer: TransferArgs,
#[arg(long, value_name = "STORE", env = "SNAPDIR_STORE")]
from: String,
#[arg(long, value_name = "STORE")]
to: String,
#[arg(long, value_name = "URI")]
from_objects: Option<String>,
#[arg(long, value_name = "URI")]
to_objects: Option<String>,
},
// `diff`: implemented by `run_diff` (outside this view); `--from`/`--to` are repeatable.
Diff {
#[command(flatten)]
id_arg: DiffIdArgs,
#[arg(long, value_name = "REF", action = clap::ArgAction::Append)]
from: Vec<String>,
#[arg(long, value_name = "REF", action = clap::ArgAction::Append)]
to: Vec<String>,
#[arg(long)]
all: bool,
#[arg(long)]
json: bool,
#[arg(long)]
exit_code: bool,
#[arg(long, value_name = "POLICY", value_enum, default_value_t = OnConflictArg::Error)]
on_conflict: OnConflictArg,
},
// `version`: prints the crate version; hidden `--capabilities` adds wire version and caps.
Version {
#[arg(long, hide = true)]
capabilities: bool,
},
// `autocomplete SHELL` (alias `completions`): print a shell completion script.
#[command(name = "autocomplete", alias = "completions")]
Completions {
shell: clap_complete::Shell,
},
// Hidden: render the man page to stdout.
#[command(hide = true)]
Man,
// Hidden plumbing commands; implemented outside this view.
#[command(hide = true)]
ObjectsNeeded {
#[command(flatten)]
plumbing: PlumbingArgs,
},
#[command(hide = true)]
SendPack {
#[command(flatten)]
plumbing: PlumbingArgs,
#[arg(long, value_name = "FILE|-")]
ids: PathBuf,
#[arg(long, value_name = "ID")]
manifest_id: Option<String>,
#[arg(long, value_name = "FORMAT", value_enum, default_value_t = PackFormatArg::V1, hide = true)]
pack_format: PackFormatArg,
},
#[command(hide = true)]
ReceivePack {
#[command(flatten)]
plumbing: PlumbingArgs,
#[arg(long, value_name = "ID")]
require_manifest: Option<String>,
},
}
impl Cli {
pub fn run(self) -> Result<()> {
let mut globals = Resolved::from_universal(&self.universal);
match &self.command {
Command::Manifest {
exclude,
walk_jobs,
catalog,
..
} => {
globals.exclude.clone_from(exclude);
globals.walk_jobs = *walk_jobs;
globals.catalog.clone_from(catalog);
}
Command::Id { walk, .. } => merge_walk(&mut globals, walk),
Command::Stage { walk, transfer, .. } | Command::Push { walk, transfer, .. } => {
merge_walk(&mut globals, walk);
merge_transfer(&mut globals, transfer);
}
Command::Fetch { transfer }
| Command::Pull { transfer, .. }
| Command::Checkout { transfer, .. }
| Command::Sync { transfer, .. } => merge_transfer(&mut globals, transfer),
Command::Defaults { config } => merge_defaults(&mut globals, config),
Command::Verify { cache_mgmt }
| Command::VerifyCache { cache_mgmt }
| Command::FlushCache { cache_mgmt } => merge_cache_mgmt(&mut globals, cache_mgmt),
Command::Locations { catalog }
| Command::Ancestors { catalog }
| Command::Revisions { catalog } => merge_catalog(&mut globals, catalog),
Command::Diff { id_arg, .. } => globals.id.clone_from(&id_arg.id),
Command::ObjectsNeeded { plumbing }
| Command::SendPack { plumbing, .. }
| Command::ReceivePack { plumbing, .. } => merge_plumbing(&mut globals, plumbing),
Command::Version { .. } | Command::Completions { .. } | Command::Man => {}
}
Ctx {
globals,
command: self.command,
}
.run()
}
}
fn merge_walk(g: &mut Resolved, w: &WalkArgs) {
g.exclude.clone_from(&w.exclude);
g.walk_jobs = w.walk_jobs;
}
fn merge_transfer(g: &mut Resolved, t: &TransferArgs) {
g.store.clone_from(&t.store);
g.catalog.clone_from(&t.catalog);
g.objects_store.clone_from(&t.objects_store);
g.cache_dir.clone_from(&t.cache_dir);
g.id.clone_from(&t.id);
g.jobs = t.jobs;
g.limit_rate.clone_from(&t.limit_rate);
g.adaptive = t.adaptive;
g.max_jobs = t.max_jobs;
g.max_retries = t.max_retries;
g.retry_base_ms = t.retry_base_ms;
g.retry_max_ms = t.retry_max_ms;
g.max_requests = t.max_requests;
g.linked = t.linked;
g.force = t.force;
g.keep = t.keep;
g.dryrun = t.dryrun;
}
fn merge_defaults(g: &mut Resolved, d: &DefaultsArgs) {
g.store.clone_from(&d.store);
g.catalog.clone_from(&d.catalog);
g.objects_store.clone_from(&d.objects_store);
g.cache_dir.clone_from(&d.cache_dir);
g.walk_jobs = d.walk_jobs;
g.jobs = d.jobs;
g.limit_rate.clone_from(&d.limit_rate);
g.adaptive = d.adaptive;
g.max_jobs = d.max_jobs;
g.max_retries = d.max_retries;
g.retry_base_ms = d.retry_base_ms;
g.retry_max_ms = d.retry_max_ms;
g.max_requests = d.max_requests;
}
fn merge_catalog(g: &mut Resolved, c: &CatalogArgs) {
g.catalog.clone_from(&c.catalog);
g.location.clone_from(&c.location);
g.id.clone_from(&c.id);
g.store.clone_from(&c.store);
g.cache_dir.clone_from(&c.cache_dir);
}
fn merge_cache_mgmt(g: &mut Resolved, c: &CacheMgmtArgs) {
g.store.clone_from(&c.store);
g.id.clone_from(&c.id);
g.purge = c.purge;
g.force = c.force;
g.dryrun = c.dryrun;
g.cache_dir.clone_from(&c.cache_dir);
}
fn merge_plumbing(g: &mut Resolved, p: &PlumbingArgs) {
g.store.clone_from(&p.store);
g.objects_store.clone_from(&p.objects_store);
}
impl Ctx {
    /// Executes the parsed subcommand using the merged settings in `self.globals`.
    ///
    /// `manifest`, `id`, `version`, `autocomplete` and the hidden `man` command are handled
    /// inline; everything else delegates to a `run_*` method. Where a directory walk runs, the
    /// progress reporter is finished before the walk's result is inspected, so errors and
    /// output appear after the progress display ends.
    ///
    /// # Errors
    /// Propagates the failure of whichever command ran.
    #[allow(clippy::too_many_lines)]
    pub fn run(&self) -> Result<()> {
        let settings = &self.globals;
        match &self.command {
            Command::Manifest {
                absolute,
                no_follow,
                checksum_bin,
                exclude,
                path,
                ..
            } => {
                // Patterns given to `manifest` itself win; otherwise use the merged globals.
                let patterns: &[String] = Some(exclude.as_slice())
                    .filter(|own| !own.is_empty())
                    .unwrap_or(settings.exclude.as_slice());
                let (meter, reporter) = self.start_progress(self.walk_jobs());
                let walked = self.build_manifest(
                    path.as_deref(),
                    *absolute,
                    *no_follow,
                    checksum_bin.as_deref(),
                    patterns,
                    meter.as_deref(),
                );
                reporter.finish();
                let manifest = walked?;
                println!("{manifest}");
                let snapshot = snapshot_id(&manifest, &Blake3Hasher::new());
                let root = resolve_root(path.as_deref())
                    .context("resolving the manifested directory path")?;
                // `allow_default = false`: unlike push/stage, manifest runs pass `false` to
                // `resolve_catalog`.
                self.log_event("manifest", &snapshot, &root.to_string_lossy(), false)
            }
            Command::Id { path, .. } => {
                let manifest = match path {
                    // No PATH and stdin is piped: hash the manifest read from stdin.
                    None if !std::io::stdin().is_terminal() => {
                        let mut text = String::new();
                        std::io::stdin()
                            .read_to_string(&mut text)
                            .context("reading manifest from stdin")?;
                        Manifest::parse(&text).context("parsing manifest from stdin")?
                    }
                    None => anyhow::bail!(
                        "no directory given and no manifest on stdin; \
                         pass a PATH (or `.`), or pipe a manifest"
                    ),
                    Some(dir) => {
                        let (meter, reporter) = self.start_progress(self.walk_jobs());
                        let walked = self.build_manifest(
                            Some(dir.as_path()),
                            false,
                            false,
                            None,
                            &settings.exclude,
                            meter.as_deref(),
                        );
                        reporter.finish();
                        walked?
                    }
                };
                println!("{}", snapshot_id(&manifest, &Blake3Hasher::new()));
                Ok(())
            }
            Command::Stage { dir, .. } => self.run_stage(dir.as_deref()),
            Command::Push { path, .. } => self.run_push(path.as_deref()),
            Command::Fetch { .. } => self.run_fetch(),
            Command::Pull { path, .. } => self.run_pull(path.as_deref()),
            Command::Checkout { dir, .. } => self.run_checkout(dir.as_deref()),
            Command::Verify { .. } => self.run_verify(),
            Command::VerifyCache { .. } => self.run_verify_cache(),
            Command::FlushCache { .. } => self.run_flush_cache(),
            Command::Locations { .. } => self.run_locations(),
            Command::Ancestors { .. } => self.run_ancestors(),
            Command::Revisions { .. } => self.run_revisions(),
            Command::Defaults { .. } => self.run_defaults(),
            Command::Sync {
                from,
                to,
                from_objects,
                to_objects,
                ..
            } => self.run_sync(from, to, from_objects.as_deref(), to_objects.as_deref()),
            Command::Diff {
                from,
                to,
                all,
                json,
                exit_code,
                on_conflict,
                ..
            } => self.run_diff(from, to, *all, *json, *exit_code, on_conflict.resolve()),
            Command::Version { capabilities: true } => {
                println!(
                    "snapdir {} wire={WIRE_VERSION} caps={}",
                    env!("CARGO_PKG_VERSION"),
                    WIRE_CAPS.join(",")
                );
                Ok(())
            }
            Command::Version {
                capabilities: false,
            } => {
                println!("snapdir {}", env!("CARGO_PKG_VERSION"));
                Ok(())
            }
            Command::Completions { shell } => {
                let mut app = Cli::command();
                clap_complete::generate(*shell, &mut app, "snapdir", &mut std::io::stdout());
                Ok(())
            }
            Command::Man => {
                let page = clap_mangen::Man::new(Cli::command());
                page.render(&mut std::io::stdout())
                    .context("rendering the man page")?;
                Ok(())
            }
            Command::ObjectsNeeded { .. } => self.run_objects_needed(),
            Command::SendPack {
                ids,
                manifest_id,
                pack_format,
                ..
            } => self.run_send_pack(ids, manifest_id.as_deref(), pack_format.resolve()),
            Command::ReceivePack {
                require_manifest, ..
            } => self.run_receive_pack(require_manifest.as_deref()),
        }
    }
}
/// Where a configuration knob's effective value came from, as reported by `defaults`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Source {
    /// Given on the command line.
    Flag,
    /// Taken from the knob's environment variable.
    Env,
    /// Neither of the above: the built-in default applies.
    Default,
}

impl Source {
    /// Short tag printed after `source=` in `defaults` output.
    fn tag(self) -> &'static str {
        match self {
            Self::Default => "default",
            Self::Env => "env",
            Self::Flag => "flag",
        }
    }
}

/// Outcome of resolving the `--catalog` setting.
enum CatalogTarget {
    /// Catalog database file to open.
    Enabled(PathBuf),
    /// The catalog is turned off (`--catalog none`).
    Disabled,
}

/// Decides a knob's [`Source`]: an explicit flag wins, then a set environment variable
/// (an empty `env_name` means the knob has no variable), otherwise the default.
fn knob_source(flag_set: bool, env_name: &str) -> Source {
    if flag_set {
        return Source::Flag;
    }
    let from_env = !env_name.is_empty() && std::env::var_os(env_name).is_some();
    if from_env {
        Source::Env
    } else {
        Source::Default
    }
}
/// Tells the user (on stderr) that catalog queries have nothing to report because the
/// catalog is disabled.
fn print_catalog_disabled() {
    let notice = "catalog disabled (--catalog none): nothing to query";
    eprintln!("{notice}");
}
impl Ctx {
    /// Implements `snapdir defaults`.
    ///
    /// Prints one line per configuration knob as `<knob> <value> source=<flag|env|default>`.
    /// The source is `flag` when the option appears on the command line, `env` when its
    /// environment variable is set, and `default` otherwise. Afterwards, every other
    /// `SNAPDIR*` environment variable is listed in sorted order under an `other-env:` header;
    /// names containing `VERSION` are excluded, and the two legacy manifest variables are
    /// tagged `(legacy)`.
    ///
    /// # Errors
    /// Fails if the transfer configuration cannot be resolved.
    #[allow(clippy::too_many_lines)]
    fn run_defaults(&self) -> Result<()> {
        // The raw argv is scanned because `Resolved` keeps only merged values, not their origin.
        let argv: Vec<String> = std::env::args().collect();
        // Long options: `--name value` or `--name=value`.
        let has_flag = |name: &str| -> bool {
            let eq = format!("{name}=");
            argv.iter().any(|a| a == name || a.starts_with(&eq))
        };
        // Short options: besides `-j 4` and `-j=4`, clap accepts an attached value (`-j4`) and
        // clusters behind boolean shorts (`-qj4`). Look for the letter in the leading run of
        // option letters so those spellings are not misreported as `default`/`env`.
        let has_short = |letter: char| -> bool {
            argv.iter().any(|a| {
                a.strip_prefix('-')
                    .filter(|cluster| !cluster.starts_with('-'))
                    .is_some_and(|cluster| {
                        cluster
                            .chars()
                            .take_while(char::is_ascii_alphabetic)
                            .any(|c| c == letter)
                    })
            })
        };
        let mut out: Vec<String> = Vec::new();
        let mut emit = |knob: &str, value: &str, src: Source| {
            out.push(format!("{knob} {value} source={}", src.tag()));
        };
        let cache_dir = self.cache_dir();
        emit(
            "cache-dir",
            &cache_dir.display().to_string(),
            knob_source(has_flag("--cache-dir"), "SNAPDIR_CACHE_DIR"),
        );
        emit(
            "store",
            self.globals.store.as_deref().unwrap_or("none"),
            knob_source(has_flag("--store"), "SNAPDIR_STORE"),
        );
        emit(
            "objects-store",
            self.globals.objects_store.as_deref().unwrap_or("none"),
            knob_source(has_flag("--objects-store"), "SNAPDIR_OBJECTS_STORE"),
        );
        let catalog_value = match self.resolve_catalog(true) {
            CatalogTarget::Enabled(db) => db.display().to_string(),
            CatalogTarget::Disabled => "none".to_owned(),
        };
        emit(
            "catalog",
            &catalog_value,
            knob_source(has_flag("--catalog"), "SNAPDIR_CATALOG"),
        );
        let jobs = self.transfer_config()?.concurrency.get();
        emit(
            "jobs",
            &jobs.to_string(),
            knob_source(has_flag("--jobs") || has_short('j'), "SNAPDIR_JOBS"),
        );
        // Mirrors the fallback shown to users: unset or 0 means "available parallelism",
        // clamped to 1..=16.
        let walk_jobs = match self.globals.walk_jobs {
            Some(n) if n > 0 => n,
            _ => std::thread::available_parallelism()
                .map_or(1, std::num::NonZeroUsize::get)
                .clamp(1, 16),
        };
        emit(
            "walk-jobs",
            &walk_jobs.to_string(),
            knob_source(has_flag("--walk-jobs"), "SNAPDIR_WALK_JOBS"),
        );
        emit(
            "limit-rate",
            self.globals.limit_rate.as_deref().unwrap_or("none"),
            knob_source(has_flag("--limit-rate"), "SNAPDIR_LIMIT_RATE"),
        );
        let adaptive = self
            .globals
            .adaptive
            .map_or_else(|| "off".to_string(), |f| f.to_string());
        emit(
            "adaptive",
            &adaptive,
            knob_source(has_flag("--adaptive"), "SNAPDIR_ADAPTIVE"),
        );
        emit(
            "max-jobs",
            &self
                .globals
                .max_jobs
                .map_or_else(|| "none".to_string(), |n| n.to_string()),
            knob_source(has_flag("--max-jobs"), "SNAPDIR_MAX_JOBS"),
        );
        let retry = self.resolve_retry_policy();
        emit(
            "max-retries",
            &retry.max_attempts.to_string(),
            knob_source(has_flag("--max-retries"), "SNAPDIR_MAX_RETRIES"),
        );
        emit(
            "retry-base-ms",
            &retry.base.as_millis().to_string(),
            knob_source(has_flag("--retry-base-ms"), "SNAPDIR_RETRY_BASE_MS"),
        );
        emit(
            "retry-max-ms",
            &retry.cap.as_millis().to_string(),
            knob_source(has_flag("--retry-max-ms"), "SNAPDIR_RETRY_MAX_MS"),
        );
        let max_requests = self
            .globals
            .max_requests
            .or_else(|| env_u64("SNAPDIR_MAX_REQUESTS"));
        emit(
            "max-requests",
            &max_requests.map_or_else(|| "none".to_string(), |n| n.to_string()),
            knob_source(has_flag("--max-requests"), "SNAPDIR_MAX_REQUESTS"),
        );
        emit(
            "no-progress",
            if self.globals.no_progress {
                "true"
            } else {
                "false"
            },
            knob_source(has_flag("--no-progress"), "SNAPDIR_NO_PROGRESS"),
        );
        let color = match self.globals.color {
            ColorArg::Auto => "auto",
            ColorArg::Always => "always",
            ColorArg::Never => "never",
        };
        // `--color` has no environment variable, hence the empty name.
        emit("color", color, knob_source(has_flag("--color"), ""));
        // The remaining knobs are environment-only; no flag can set them.
        let fsync = match std::env::var("SNAPDIR_FSYNC").ok().as_deref() {
            Some("off") => "off",
            _ => "batch",
        };
        emit("fsync", fsync, knob_source(false, "SNAPDIR_FSYNC"));
        let clonefile_on = !matches!(std::env::var("SNAPDIR_CLONEFILE").as_deref(), Ok("0"));
        emit(
            "clonefile",
            if clonefile_on { "enabled" } else { "disabled" },
            knob_source(false, "SNAPDIR_CLONEFILE"),
        );
        let verify_on = matches!(std::env::var("SNAPDIR_VERIFY_COPIES").as_deref(), Ok("1"));
        emit(
            "verify-copies",
            if verify_on { "enabled" } else { "disabled" },
            knob_source(false, "SNAPDIR_VERIFY_COPIES"),
        );
        for line in &out {
            println!("{line}");
        }
        let mut others: Vec<(String, String)> = std::env::vars()
            .filter(|(k, _)| k.starts_with("SNAPDIR") && !k.contains("VERSION"))
            .collect();
        others.sort();
        let mut printed_header = false;
        for (key, value) in others {
            let legacy = key == "SNAPDIR_MANIFEST_CONTEXT" || key == "SNAPDIR_MANIFEST_EXCLUDE";
            if !printed_header {
                println!("other-env:");
                printed_header = true;
            }
            if legacy {
                println!(" {key}={value} (legacy)");
            } else {
                println!(" {key}={value}");
            }
        }
        Ok(())
    }
}
impl Ctx {
fn run_push(&self, path: Option<&Path>) -> Result<()> {
self.log_transfer_config();
let jobs = self.transfer_config()?.concurrency.get();
let (meter, reporter) = self.start_progress(jobs);
if path.is_none() && self.globals.id.is_some() {
let id = self.require_id()?;
let cache = self.cache_store()?;
let manifest = cache.get_manifest(id).with_context(|| {
format!("manifest {id} not found in the local cache; stage or fetch it first")
})?;
let store_url = self.globals.store.as_deref().context(NO_STORE_CONFIGURED)?;
if self.globals.dryrun {
reporter.finish();
println!("{id}");
if !self.globals.quiet {
eprintln!("dry-run: would push {id} to {store_url} (no writes performed)");
}
return Ok(());
}
if let Some(m) = &meter {
m.set_total(total_object_bytes(&manifest));
m.set_phase(Phase::Transfer);
}
let store = self.resolve_store(meter.clone())?;
if self.store_is_external()? {
store
.push(&manifest, &self.cache_dir())
.with_context(|| format!("pushing snapshot {id} to store"))?;
} else {
let scratch = ScratchDir::new("push")?;
cache
.fetch_files(&manifest, scratch.path())
.with_context(|| format!("materializing staged snapshot {id}"))?;
store
.push(&manifest, scratch.path())
.with_context(|| format!("pushing snapshot {id} to store"))?;
}
reporter.finish();
println!("{id}");
self.log_event("push", id, store_url, true)?;
return Ok(());
}
if let Some(m) = &meter {
m.set_phase(Phase::Hashing);
}
let manifest = self.build_manifest(
path,
false,
false,
None,
&self.globals.exclude,
meter.as_deref(),
)?;
let root = resolve_root(path).context("resolving push path")?;
let id = snapshot_id(&manifest, &Blake3Hasher::new());
let store_url = self.globals.store.as_deref().context(NO_STORE_CONFIGURED)?;
if self.globals.dryrun {
reporter.finish();
println!("{id}");
if !self.globals.quiet {
eprintln!("dry-run: would push {id} to {store_url} (no writes performed)");
}
return Ok(());
}
if let Some(m) = &meter {
m.set_total(total_object_bytes(&manifest));
m.set_phase(Phase::Transfer);
}
let store = self.resolve_store(meter.clone())?;
if self.store_is_external()? {
let cache = self.cache_store_with_meter(meter.clone())?;
cache
.push(&manifest, &root)
.with_context(|| format!("staging snapshot {id} into the local cache"))?;
store
.push(&manifest, &self.cache_dir())
.with_context(|| format!("pushing snapshot {id} to store"))?;
} else {
store
.push(&manifest, &root)
.with_context(|| format!("pushing snapshot {id} to store"))?;
}
reporter.finish();
println!("{id}");
self.log_event("push", &id, store_url, true)?;
Ok(())
}
/// Implements `snapdir fetch`: runs `fetch_inner` under a transfer-phase progress display.
///
/// The reporter is finished before the result (success or error) is returned.
fn run_fetch(&self) -> Result<()> {
    self.log_transfer_config();
    let concurrency = self.transfer_config()?.concurrency.get();
    let (meter, reporter) = self.start_progress(concurrency);
    if let Some(progress) = meter.as_deref() {
        progress.set_phase(Phase::Transfer);
    }
    let outcome = self.fetch_inner(meter.as_ref());
    reporter.finish();
    outcome
}
/// Brings snapshot `--id` from the configured store into the local cache.
///
/// - If the manifest is already cached and none of its objects are missing, returns early
///   (printing `CACHED: <id>` with `--verbose`, unless `--quiet`).
/// - If the manifest is cached but objects are missing, the fetch "heals" the cache.
/// - With `--dryrun` the manifest is still read from the store, but nothing is written.
///
/// `meter`, when present, receives the total object byte count and is handed to the cache
/// and store so they can report progress.
///
/// # Errors
/// Fails if `--id` is missing, the store cannot be resolved, or any fetch/save step fails.
fn fetch_inner(&self, meter: Option<&Arc<Meter>>) -> Result<()> {
let cache = self.cache_store_with_meter(meter.cloned())?;
// Set when a cached manifest exists but some of its objects are absent.
let mut healing = false;
if let Some(id) = self.globals.id.as_deref() {
// A cached manifest that fails to load is ignored here and re-fetched below.
if let Ok(manifest) = cache.get_manifest(id) {
if Self::missing_cache_objects(&manifest, &self.cache_dir()).is_empty() {
if self.globals.verbose && !self.globals.quiet {
eprintln!("CACHED: {id}");
}
return Ok(());
}
healing = true;
}
}
let store = self.resolve_store(meter.cloned())?;
let id = self.require_id()?;
let manifest = store
.get_manifest(id)
.with_context(|| format!("fetching manifest {id} from store"))?;
if self.globals.dryrun {
if !self.globals.quiet {
eprintln!("dry-run: would fetch {id} into the local cache (no writes performed)");
}
return Ok(());
}
if let Some(m) = meter {
m.set_total(total_object_bytes(&manifest));
}
if self.store_is_external()? {
// External stores fetch straight into the cache directory; only the manifest is
// then recorded explicitly.
self.split_read_hint(
store
.fetch_files(&manifest, &self.cache_dir())
.with_context(|| format!("fetching objects for snapshot {id}")),
)?;
cache
.put_manifest(id, &manifest)
.with_context(|| format!("saving snapshot {id} to the local cache"))?;
} else {
// Other stores fetch into a scratch tree, which is then pushed into the cache.
let scratch = ScratchDir::new("fetch")?;
self.split_read_hint(
store
.fetch_files(&manifest, scratch.path())
.with_context(|| format!("fetching objects for snapshot {id}")),
)?;
if healing {
// Remove the stale cached manifest before `cache.push`. NOTE(review): presumably
// `push` would otherwise treat the snapshot as already present — confirm.
let manifest_file = self
.cache_dir()
.join(snapdir_core::store::manifest_path(id));
if manifest_file.exists() {
std::fs::remove_file(&manifest_file).with_context(|| {
format!(
"removing stale cached manifest {} before re-fetch",
manifest_file.display()
)
})?;
}
}
cache
.push(&manifest, scratch.path())
.with_context(|| format!("saving snapshot {id} to the local cache"))?;
}
if self.globals.verbose && !self.globals.quiet {
eprintln!("SAVED: {id}");
}
Ok(())
}
/// Implements `snapdir checkout`: runs `checkout_inner` under a transfer-phase progress
/// display, finishing the reporter before returning the result.
fn run_checkout(&self, dir: Option<&Path>) -> Result<()> {
    self.log_transfer_config();
    let concurrency = self.transfer_config()?.concurrency.get();
    let (meter, reporter) = self.start_progress(concurrency);
    if let Some(progress) = meter.as_deref() {
        progress.set_phase(Phase::Transfer);
    }
    let outcome = self.checkout_inner(dir, meter.as_ref());
    reporter.finish();
    outcome
}
/// Materializes the cached snapshot `--id` into `dir` (default resolved by `resolve_root`)
/// and then restores file permissions from the manifest.
///
/// Refuses to start if any object referenced by the manifest is absent from the cache, so a
/// partial tree is never written. With `--dryrun`, only a notice is printed (unless `--quiet`).
///
/// # Errors
/// Fails if `--id` is missing, the manifest is not cached, objects are missing, or writing the
/// tree fails.
fn checkout_inner(&self, dir: Option<&Path>, meter: Option<&Arc<Meter>>) -> Result<()> {
    let id = self.require_id()?;
    let dest = resolve_root(dir).context("resolving checkout destination")?;
    let settings = &self.globals;
    if settings.dryrun {
        if !settings.quiet {
            eprintln!(
                "dry-run: would check out {id} to {} (no writes performed)",
                dest.display()
            );
        }
        return Ok(());
    }
    let cache = self.cache_store_with_meter(meter.cloned())?;
    let manifest = cache.get_manifest(id).with_context(|| {
        format!("manifest {id} not found locally; did you forget to fetch it?")
    })?;
    let absent = Self::missing_cache_objects(&manifest, &self.cache_dir());
    if let Some((checksum, path)) = absent.first() {
        anyhow::bail!(
            "snapdir: cannot check out {id}: object {checksum} for {path} is missing from the \
             cache ({} object(s) absent); re-run `fetch`/`pull` to restore it",
            absent.len()
        );
    }
    if let Some(progress) = meter {
        progress.set_total(total_object_bytes(&manifest));
    }
    cache
        .fetch_files(&manifest, &dest)
        .with_context(|| format!("checking out snapshot {id} to {}", dest.display()))?;
    restore_permissions(&manifest, &dest)?;
    Ok(())
}
/// Implements `snapdir pull`: fetch into the cache, then check out to `path`.
///
/// Both steps share one progress display; checkout is skipped if the fetch fails, and the
/// reporter is finished before the combined result is returned.
fn run_pull(&self, path: Option<&Path>) -> Result<()> {
    self.log_transfer_config();
    let concurrency = self.transfer_config()?.concurrency.get();
    let (meter, reporter) = self.start_progress(concurrency);
    if let Some(progress) = meter.as_deref() {
        progress.set_phase(Phase::Transfer);
    }
    let shared = meter.as_ref();
    let outcome = self
        .fetch_inner(shared)
        .and_then(|()| self.checkout_inner(path, shared));
    reporter.finish();
    outcome
}
/// Implements `snapdir verify`: reads snapshot `--id`'s manifest and every object back from
/// the store into a throw-away scratch directory.
///
/// # Errors
/// Rejects `--purge` (only `verify-cache` supports it). Otherwise fails if the store, the
/// manifest, or any object cannot be read.
fn run_verify(&self) -> Result<()> {
    anyhow::ensure!(
        !self.globals.purge,
        "snapdir: `verify` does not support --purge; use `verify-cache --purge` to remove corrupt objects from the local cache"
    );
    let store = self.resolve_store(None)?;
    let id = self.require_id()?;
    let manifest = store
        .get_manifest(id)
        .with_context(|| format!("verifying manifest {id}"))?;
    // The scratch directory only needs to outlive the fetch.
    let scratch = ScratchDir::new("verify")?;
    store
        .fetch_files(&manifest, scratch.path())
        .with_context(|| format!("verifying objects for snapshot {id}"))?;
    Ok(())
}
fn run_stage(&self, path: Option<&Path>) -> Result<()> {
self.log_transfer_config();
let jobs = self.transfer_config()?.concurrency.get();
let (meter, reporter) = self.start_progress(jobs);
if let Some(m) = &meter {
m.set_phase(Phase::Hashing);
}
let (manifest, copy_guards) = self.build_manifest_with_guards(
path,
false,
false,
None,
&self.globals.exclude,
meter.as_deref(),
)?;
let root = resolve_root(path).context("resolving stage path")?;
let id = snapshot_id(&manifest, &Blake3Hasher::new());
if self.globals.dryrun {
reporter.finish();
println!("{id}");
if !self.globals.quiet {
eprintln!("dry-run: would stage {id} into the local cache (no writes performed)");
}
return Ok(());
}
if let Some(m) = &meter {
m.set_total(total_object_bytes(&manifest));
m.set_phase(Phase::Transfer);
}
let cache = self
.cache_store_with_meter(meter.clone())?
.with_copy_guards(copy_guards);
cache
.push(&manifest, &root)
.with_context(|| format!("staging snapshot {id} into the local cache"))?;
reporter.finish();
println!("{id}");
self.log_event("stage", &id, &root.to_string_lossy(), true)?;
Ok(())
}
/// Implements `snapdir verify-cache`: re-hashes every cached object and checks that each
/// object referenced by the cached manifests (or only `--id`'s) is present.
///
/// With `--purge`, corrupt objects are removed. `--dryrun` turns the purge into a report-only
/// pass. Per-object diagnostics (`Checksum mismatch ...`, `Missing object ...`) are always
/// printed because they explain the failure. The informational dry-run notice and the
/// `--verbose` purge listing now honour `--quiet`, as every other dry-run/verbose message in
/// the CLI does.
///
/// # Errors
/// Fails if the cache cannot be read, or if any object is corrupt or missing.
fn run_verify_cache(&self) -> Result<()> {
    let settings = &self.globals;
    let cache_dir = self.cache_dir();
    let purge = settings.purge && !settings.dryrun;
    if settings.purge && settings.dryrun && !settings.quiet {
        eprintln!("dry-run: would purge corrupt objects from the cache (no writes performed)");
    }
    let report = cache::verify_cache(&cache_dir, purge, &Blake3Hasher::new())
        .with_context(|| format!("verifying cache at {}", cache_dir.display()))?;
    for checksum in &report.corrupt {
        eprintln!("Checksum mismatch for {checksum}");
    }
    if purge && settings.verbose && !settings.quiet {
        for checksum in &report.purged {
            eprintln!("purged {checksum}");
        }
    }
    let missing = self.missing_cache_objects_for_verify(&cache_dir)?;
    for (checksum, path) in &missing {
        eprintln!("Missing object {checksum} for {path}");
    }
    if report.is_clean() && missing.is_empty() {
        return Ok(());
    }
    anyhow::bail!(
        "snapdir: {} corrupt + {} missing object(s) in the cache",
        report.corrupt.len(),
        missing.len()
    )
}
/// Lists `(checksum, path)` for every object that is referenced but absent from the cache.
///
/// Only `--id`'s manifest is scanned when it is given; otherwise all cached manifests are. Each
/// checksum appears once, paired with the first path seen for it, and the result is sorted by
/// checksum (it comes out of a `BTreeMap`).
///
/// # Errors
/// Fails if the cache cannot be opened, manifests cannot be listed, or one cannot be read.
fn missing_cache_objects_for_verify(&self, cache_dir: &Path) -> Result<Vec<(String, String)>> {
    let cache = self.cache_store()?;
    let ids: Vec<String> = match self.globals.id.as_deref() {
        Some(only) => vec![only.to_owned()],
        None => cache
            .list_manifest_ids()
            .with_context(|| format!("listing cached manifests at {}", cache_dir.display()))?,
    };
    let mut first_path_by_checksum = std::collections::BTreeMap::new();
    for id in ids {
        let manifest = cache
            .get_manifest(&id)
            .with_context(|| format!("reading cached manifest {id}"))?;
        Self::missing_cache_objects(&manifest, cache_dir)
            .into_iter()
            .for_each(|(checksum, path)| {
                first_path_by_checksum.entry(checksum).or_insert(path);
            });
    }
    Ok(first_path_by_checksum.into_iter().collect())
}
/// Implements `snapdir flush-cache`: empties the local cache directory.
///
/// With `--dryrun` nothing is removed. The informational notice is suppressed by `--quiet`,
/// consistent with the other dry-run notices.
///
/// # Errors
/// Fails if flushing the cache fails.
fn run_flush_cache(&self) -> Result<()> {
    let cache_dir = self.cache_dir();
    if self.globals.dryrun {
        if !self.globals.quiet {
            eprintln!(
                "dry-run: would flush the cache at {} (no writes performed)",
                cache_dir.display()
            );
        }
        return Ok(());
    }
    cache::flush_cache(&cache_dir)
        .with_context(|| format!("flushing cache at {}", cache_dir.display()))?;
    Ok(())
}
/// Implements `snapdir locations`: prints one JSON line per location recorded in the catalog.
/// With the catalog disabled, prints a notice and succeeds.
fn run_locations(&self) -> Result<()> {
    let Some(catalog) = self.open_catalog()? else {
        print_catalog_disabled();
        return Ok(());
    };
    let records = catalog.locations().context("querying catalog locations")?;
    records
        .into_iter()
        .map(|record| locations_json_line(&record))
        .for_each(|line| println!("{line}"));
    Ok(())
}
/// `ancestors`: prints one JSON line per ancestor record of `--id` in the catalog.
///
/// `--location`, when given, is passed through to the catalog query. The catalog is opened
/// before `--id` is checked, so a disabled catalog reports its notice and succeeds even
/// without `--id`.
fn run_ancestors(&self) -> Result<()> {
    let catalog = match self.open_catalog()? {
        Some(catalog) => catalog,
        None => {
            print_catalog_disabled();
            return Ok(());
        }
    };
    let id = self.require_id()?;
    let records = catalog
        .ancestors(id, self.globals.location.as_deref())
        .with_context(|| format!("querying catalog ancestors of {id}"))?;
    for record in records {
        println!("{}", ancestors_json_line(&record));
    }
    Ok(())
}
/// `revisions`: prints one JSON line per revision recorded for a location.
///
/// The location is `--location`, falling back to `--store`; with neither set this fails
/// with "missing --location option" (unless the catalog is disabled, which just prints the
/// notice and succeeds).
fn run_revisions(&self) -> Result<()> {
    let catalog = match self.open_catalog()? {
        Some(catalog) => catalog,
        None => {
            print_catalog_disabled();
            return Ok(());
        }
    };
    let globals = &self.globals;
    let location = globals
        .location
        .as_deref()
        .or_else(|| globals.store.as_deref())
        .context("missing --location option")?;
    let records = catalog
        .revisions(location)
        .with_context(|| format!("querying catalog revisions at {location}"))?;
    for record in records {
        println!("{}", revisions_json_line(&record));
    }
    Ok(())
}
/// Records `event` for snapshot `id` at `location` in the catalog, timestamped with
/// [`SystemClock`].
///
/// Does nothing when `resolve_catalog(allow_default)` says the catalog is disabled. The
/// catalog file's parent directory is created first if needed.
fn log_event(&self, event: &str, id: &str, location: &str, allow_default: bool) -> Result<()> {
    let db = match self.resolve_catalog(allow_default) {
        CatalogTarget::Enabled(db) => db,
        _ => return Ok(()),
    };
    if let Some(dir) = db.parent() {
        std::fs::create_dir_all(dir)
            .with_context(|| format!("creating catalog directory {}", dir.display()))?;
    }
    Catalog::open(&db)
        .with_context(|| format!("opening catalog at {}", db.display()))?
        .log(event, id, location, &SystemClock)
        .with_context(|| format!("recording catalog event {event} for {id}"))?;
    Ok(())
}
/// Opens the catalog for read commands, using the default catalog when `--catalog` is unset.
///
/// Returns `Ok(None)` when the catalog is disabled. The parent directory of the catalog
/// file is created if it does not exist.
fn open_catalog(&self) -> Result<Option<Catalog>> {
    let db = match self.resolve_catalog(true) {
        CatalogTarget::Enabled(db) => db,
        _ => return Ok(None),
    };
    if let Some(dir) = db.parent() {
        std::fs::create_dir_all(dir)
            .with_context(|| format!("creating catalog directory {}", dir.display()))?;
    }
    Catalog::open(&db)
        .map(Some)
        .with_context(|| format!("opening catalog at {}", db.display()))
}
/// Decides which catalog file (if any) to use from `--catalog`:
///
/// - `none` or an empty value disables the catalog;
/// - a value containing the platform path separator is used as a literal file path;
/// - any other value names `<value>-catalog.redb` inside the cache directory;
/// - when unset, `<cache>/default-catalog.redb` is used if `allow_default`, otherwise the
///   catalog is disabled.
fn resolve_catalog(&self, allow_default: bool) -> CatalogTarget {
    let Some(value) = self.globals.catalog.as_deref() else {
        return if allow_default {
            CatalogTarget::Enabled(self.cache_dir().join("default-catalog.redb"))
        } else {
            CatalogTarget::Disabled
        };
    };
    if value.is_empty() || value == "none" {
        CatalogTarget::Disabled
    } else if value.contains(std::path::MAIN_SEPARATOR) {
        CatalogTarget::Enabled(PathBuf::from(value))
    } else {
        CatalogTarget::Enabled(self.cache_dir().join(format!("{value}-catalog.redb")))
    }
}
/// Builds the primary [`Store`] for this invocation.
///
/// With `--objects-store`, this is a split store (objects and manifests at different URLs).
/// Otherwise the `--store` URL scheme selects the backend, configured with that scheme's
/// transfer limits. `meter` is attached to the resulting store(s).
fn resolve_store(&self, meter: Option<Arc<Meter>>) -> Result<Box<dyn Store>> {
    if self.globals.objects_store.is_none() {
        let store_url = self.globals.store.as_deref().context(NO_STORE_CONFIGURED)?;
        let adapter = resolve_adapter(store_url).context("resolving --store protocol")?;
        let config = self.transfer_config_for(Some(adapter.name()))?;
        return store_for_adapter(&adapter, store_url, config, meter);
    }
    let split = self.resolve_split_store(meter)?;
    Ok(Box::new(split))
}
/// Builds a [`SplitStore`]: objects go to the `--objects-store` pool and manifests to the
/// `--store` location.
///
/// Both URLs are required. Each side is configured with the transfer limits of its own URL
/// scheme, and both sides share `meter`. The object side is resolved first, so its errors
/// take precedence.
fn resolve_split_store(&self, meter: Option<Arc<Meter>>) -> Result<SplitStore> {
    let globals = &self.globals;
    let pool_url = globals
        .objects_store
        .as_deref()
        .context("missing --objects-store option")?;
    let manifest_url = globals.store.as_deref().context(
        "missing --store option: --objects-store sets the object pool, but the manifest \
         location (--store / $SNAPDIR_STORE) is still required",
    )?;

    let pool_adapter = resolve_adapter(pool_url).context("resolving --objects-store protocol")?;
    let pool_config = self.transfer_config_for(Some(pool_adapter.name()))?;
    let pool = stream_store_for_adapter(&pool_adapter, pool_url, pool_config, meter.clone())?;

    let manifest_adapter =
        resolve_adapter(manifest_url).context("resolving --store protocol")?;
    let manifest_config = self.transfer_config_for(Some(manifest_adapter.name()))?;
    let manifests =
        stream_store_for_adapter(&manifest_adapter, manifest_url, manifest_config, meter)?;

    Ok(SplitStore::from_boxed(pool, manifests))
}
/// Reports whether `--store` is handled by an external `snapdir-*-store` adapter.
///
/// A split setup (`--objects-store`) always reports `false`.
fn store_is_external(&self) -> Result<bool> {
    if self.globals.objects_store.is_some() {
        return Ok(false);
    }
    let adapter = self
        .globals
        .store
        .as_deref()
        .context(NO_STORE_CONFIGURED)
        .and_then(|url| resolve_adapter(url).context("resolving --store protocol"))?;
    Ok(matches!(adapter, Adapter::External { .. }))
}
/// Transfer settings that are not tied to any backend scheme.
///
/// Only `--limit-rate` caps bytes per second, and no request-rate cap is applied.
fn transfer_config(&self) -> Result<TransferConfig> {
    let unscoped: Option<&str> = None;
    self.transfer_config_for(unscoped)
}
fn transfer_config_for(&self, scheme: Option<&str>) -> Result<TransferConfig> {
let limit_rate = match self.globals.limit_rate.as_deref() {
Some(rate) => Some(parse_rate(rate)?),
None => None,
};
let (max_requests_per_sec, max_bytes_per_sec) = match scheme {
Some(scheme) => self.resolve_rate_limits(scheme, limit_rate),
None => (None, limit_rate),
};
let auto = TransferConfig::default().concurrency.get();
let concurrency = match self.globals.jobs {
Some(n) if n > 0 => n,
_ => auto,
};
let base = TransferConfig::new(concurrency, max_bytes_per_sec)
.with_retry(self.resolve_retry_policy());
let base = TransferConfig {
max_requests_per_sec,
..base
};
match self.globals.adaptive {
None => Ok(base),
Some(fraction) => {
let ceiling = self
.globals
.max_jobs
.or(self.globals.jobs.filter(|&n| n > 0))
.unwrap_or(auto)
.clamp(1, ADAPTIVE_CEILING_CAP);
Ok(base.with_adaptive(TransferAdaptivePolicy::On { fraction, ceiling }))
}
}
}
/// Resolves the retry policy.
///
/// Each knob comes from its CLI flag, then its environment variable
/// (`SNAPDIR_MAX_RETRIES`, `SNAPDIR_RETRY_BASE_MS`, `SNAPDIR_RETRY_MAX_MS`), then
/// `RetryPolicy::default()`. Environment values that fail to parse are ignored, and so is a
/// `SNAPDIR_MAX_RETRIES` that does not fit in `u32`. Delays are given in milliseconds.
fn resolve_retry_policy(&self) -> RetryPolicy {
    let defaults = RetryPolicy::default();
    let globals = &self.globals;
    let millis = std::time::Duration::from_millis;
    let max_attempts = match globals.max_retries {
        Some(attempts) => attempts,
        None => env_u64("SNAPDIR_MAX_RETRIES")
            .and_then(|n| u32::try_from(n).ok())
            .unwrap_or(defaults.max_attempts),
    };
    let base = globals
        .retry_base_ms
        .or_else(|| env_u64("SNAPDIR_RETRY_BASE_MS"))
        .map_or(defaults.base, millis);
    let cap = globals
        .retry_max_ms
        .or_else(|| env_u64("SNAPDIR_RETRY_MAX_MS"))
        .map_or(defaults.cap, millis);
    RetryPolicy {
        max_attempts,
        base,
        cap,
    }
}
/// Returns `(max_requests_per_sec, max_bytes_per_sec)` for the backend `scheme`.
///
/// A positive `--max-requests` (or `SNAPDIR_MAX_REQUESTS`) overrides the request cap, and
/// `limit_rate` overrides the byte cap. Whichever is absent falls back to the smaller of
/// the backend's read and write limits from `limits::for_scheme`, if it has any.
fn resolve_rate_limits(
    &self,
    scheme: &str,
    limit_rate: Option<u64>,
) -> (Option<u64>, Option<u64>) {
    let backend = limits::for_scheme(scheme);
    let requests = self
        .globals
        .max_requests
        .or_else(|| env_u64("SNAPDIR_MAX_REQUESTS"))
        .filter(|&n| n > 0)
        .or_else(|| min_opt(backend.read_rps, backend.write_rps));
    let bytes = match limit_rate {
        Some(rate) => Some(rate),
        None => min_opt(backend.read_bps, backend.write_bps),
    };
    (requests, bytes)
}
/// Prints a one-line summary of the effective transfer settings to stderr.
///
/// Prints only when `--verbose` is set and `--quiet` is not. The scheme-less config from
/// `transfer_config` is summarised, and the `--limit-rate` text is appended when set. If
/// that config cannot be built, nothing is printed.
fn log_transfer_config(&self) {
    if !self.globals.verbose || self.globals.quiet {
        return;
    }
    let Ok(config) = self.transfer_config() else {
        return;
    };
    let summary = match config.adaptive {
        TransferAdaptivePolicy::On { fraction, ceiling } => {
            #[allow(
                clippy::cast_possible_truncation,
                clippy::cast_sign_loss,
                clippy::cast_precision_loss
            )]
            let pct = (fraction * 100.0).round() as u64;
            format!("adaptive: target {pct}% of capacity, ceiling {ceiling}")
        }
        TransferAdaptivePolicy::Off => {
            format!("transfers: {} concurrent", config.concurrency.get())
        }
    };
    if let Some(rate) = self.globals.limit_rate.as_deref() {
        eprintln!("{summary}, limit {rate}");
    } else {
        eprintln!("{summary}");
    }
}
/// Maps the `--color` flag onto the progress renderer's [`ColorChoice`].
fn color_choice(&self) -> ColorChoice {
    let requested = self.globals.color;
    requested.resolve()
}
/// Worker count for the directory walk.
///
/// Uses a positive `--walk-jobs` when given. Otherwise uses the machine's available
/// parallelism (1 when unknown), capped at 16.
fn walk_jobs(&self) -> usize {
    if let Some(requested) = self.globals.walk_jobs.filter(|&n| n > 0) {
        return requested;
    }
    let detected = std::thread::available_parallelism().map_or(1, std::num::NonZeroUsize::get);
    detected.clamp(1, 16)
}
/// Starts the progress reporter for a run with `jobs` workers.
///
/// The reporter renders only if `should_render` agrees. That takes into account whether
/// stderr is a TTY, `--no-progress`/`--quiet`, and `$TERM`. When it renders, a shared
/// [`Meter`] in the `Discovering` phase is returned with it. Color follows
/// `--color`/`NO_COLOR`, and `TERM=dumb` switches to ASCII output. Otherwise an inactive
/// reporter on a private meter is returned with `None`.
fn start_progress(&self, jobs: usize) -> (Option<Arc<Meter>>, ProgressReporter) {
    let is_tty = std::io::stderr().is_terminal();
    let term = std::env::var("TERM").ok();
    let suppressed = self.globals.no_progress || self.globals.quiet;
    if !should_render(is_tty, suppressed, term.as_deref()) {
        let idle =
            ProgressReporter::start(Arc::new(Meter::new()), jobs, false, false, false, None);
        return (None, idle);
    }
    let meter = Arc::new(Meter::new());
    meter.set_phase(Phase::Discovering);
    let color = use_color(
        self.color_choice(),
        is_tty,
        std::env::var_os("NO_COLOR").is_some(),
    );
    let ascii = term.as_deref() == Some("dumb");
    let reporter = ProgressReporter::start(
        Arc::clone(&meter),
        jobs,
        true,
        color,
        ascii,
        self.globals.adaptive,
    );
    (Some(meter), reporter)
}
/// Opens one side (`side` is `--from` or `--to`) of a `sync` as a streaming store.
///
/// Without `objects_url`, this is a plain store at `manifest_url`. With it, the result is a
/// split store: objects are read from or written to `objects_url`, and manifests go to
/// `manifest_url`. Each URL is configured with its own scheme's transfer limits, and no
/// progress meter is attached.
fn sync_side_store(
    &self,
    manifest_url: &str,
    objects_url: Option<&str>,
    side: &str,
) -> Result<Box<dyn StreamStore + Sync>> {
    let open = |url: &str, label: &str| -> Result<Box<dyn StreamStore + Sync>> {
        let adapter =
            resolve_adapter(url).with_context(|| format!("resolving {label} protocol"))?;
        let config = self.transfer_config_for(Some(adapter.name()))?;
        stream_store_for_adapter(&adapter, url, config, None)
    };
    let manifest_label = format!("{side} store");
    match objects_url {
        None => open(manifest_url, &manifest_label),
        Some(objects_url) => {
            // Object side first, matching the split-store construction order.
            let objects = open(objects_url, &format!("{side}-objects"))?;
            let manifests = open(manifest_url, &manifest_label)?;
            Ok(Box::new(SplitStore::from_boxed(objects, manifests)))
        }
    }
}
/// `sync`: copies snapshot `--id` from the `from_url` store to the `to_url` store using
/// `snapdir_stores::sync_snapshot`.
///
/// Either side may be split with its own object pool (`from_objects`/`to_objects`).
/// Transfer limits follow the source store's scheme. With `--dryrun` only the number of
/// objects that would be copied is reported. On a real run the id is printed to stdout and a
/// summary to stderr. Both stderr messages are suppressed by `--quiet`.
///
/// # Errors
/// Fails without `--id`, when `from_url == to_url`, or if store setup or the sync fails.
fn run_sync(
    &self,
    from_url: &str,
    to_url: &str,
    from_objects: Option<&str>,
    to_objects: Option<&str>,
) -> Result<()> {
    let id = self.require_id()?;
    anyhow::ensure!(
        from_url != to_url,
        "sync --from and --to must differ (both are {from_url})"
    );
    let source_adapter = resolve_adapter(from_url).context("resolving --from store protocol")?;
    let source = self.sync_side_store(from_url, from_objects, "--from")?;
    let destination = self.sync_side_store(to_url, to_objects, "--to")?;
    let config = self.transfer_config_for(Some(source_adapter.name()))?;
    self.log_transfer_config();
    let (meter, reporter) = self.start_progress(config.concurrency.get());
    if let Some(active) = meter.as_deref() {
        active.set_phase(Phase::Transfer);
    }
    let outcome = snapdir_stores::sync_snapshot(
        &*source,
        &*destination,
        id,
        &config,
        self.globals.dryrun,
        meter.as_deref(),
    );
    // Stop the progress display before any result or error reaches the terminal.
    reporter.finish();
    let report =
        outcome.with_context(|| format!("syncing snapshot {id} from {from_url} to {to_url}"))?;
    let quiet = self.globals.quiet;
    if report.dry_run {
        if !quiet {
            eprintln!(
                "dry-run: would copy {} object(s) for {id}",
                report.objects_copied
            );
        }
        return Ok(());
    }
    println!("{id}");
    if !quiet {
        eprintln!(
            "synced {id}: {} copied, {} skipped ({} bytes)",
            report.objects_copied, report.objects_skipped, report.bytes_copied
        );
    }
    Ok(())
}
/// `diff`: compares the union of manifests on the `--from` side with the union on the
/// `--to` side.
///
/// Rows are printed to stdout, as JSON when `json` is set and as porcelain text otherwise.
/// Manifests per ref are chosen by `resolve_side`. When two refs on the same side disagree
/// about a path, `union_side` reports a conflict; `on_conflict` decides whether that is an
/// error. `all` is forwarded to `classify` (presumably to include unchanged rows; confirm).
/// With `exit_code`, the process exits with status 1 after flushing stdout if any row is not
/// `Unchanged`.
fn run_diff(
    &self,
    from: &[String],
    to: &[String],
    all: bool,
    json: bool,
    exit_code: bool,
    on_conflict: crate::diff::OnConflict,
) -> Result<()> {
    use crate::diff::{classify, render_json, render_porcelain, union_side, Status};
    let conflict_error = |side: &str, path: &dyn std::fmt::Debug| {
        anyhow::anyhow!(
            "intra-side conflict on {side}: the path {path:?} has differing content across \
             two refs (collision); pass --on-conflict last-wins to let the last ref win"
        )
    };
    let from_manifests = self.resolve_side(from).context("resolving --from side")?;
    let to_manifests = self.resolve_side(to).context("resolving --to side")?;
    let from_map = union_side(&from_manifests, on_conflict)
        .map_err(|c| conflict_error("--from", &c.path))?;
    let to_map =
        union_side(&to_manifests, on_conflict).map_err(|c| conflict_error("--to", &c.path))?;
    let rows = classify(&from_map, &to_map, all);
    let differs = rows.iter().any(|row| row.status != Status::Unchanged);
    let mut out = std::io::stdout().lock();
    if json {
        writeln!(out, "{}", render_json(&rows))?;
    } else {
        write!(out, "{}", render_porcelain(&rows))?;
    }
    out.flush()?;
    if differs && exit_code {
        // `process::exit` skips destructors, so release the stdout lock explicitly first.
        drop(out);
        std::process::exit(1);
    }
    Ok(())
}
/// Loads the manifests for one side of a `diff`, where each entry of `refs` is a store URL.
///
/// If `--id` is set and a store lists that id, only that manifest is taken from that store.
/// Otherwise every manifest the store lists is taken. Manifests are returned in ref order,
/// then in listing order.
fn resolve_side(&self, refs: &[String]) -> Result<Vec<Manifest>> {
    let pinned = self.globals.id.as_deref();
    let mut collected = Vec::new();
    for store_url in refs {
        let adapter =
            resolve_adapter(store_url).context("resolving a --from/--to ref protocol")?;
        let config = self.transfer_config_for(Some(adapter.name()))?;
        let store = stream_store_for_adapter(&adapter, store_url, config, None)?;
        let listed: Vec<String> = store
            .list_manifest_ids()
            .with_context(|| format!("listing manifests in {store_url}"))?;
        let present_pin =
            pinned.filter(|pin| listed.iter().any(|candidate| candidate.as_str() == *pin));
        let ids = match present_pin {
            Some(pin) => vec![pin.to_owned()],
            None => listed,
        };
        for id in &ids {
            let manifest = store
                .get_manifest(id)
                .with_context(|| format!("reading manifest {id} from {store_url}"))?;
            collected.push(manifest);
        }
    }
    Ok(collected)
}
fn run_objects_needed(&self) -> Result<()> {
let ids = read_checksum_lines(std::io::stdin().lock())
.context("reading object checksums from stdin")?;
let ids = dedupe_preserving_order(ids);
let store = self.resolve_stream_store()?;
let needed = store
.objects_needed(&ids)
.context("querying the store for absent objects")?;
let stdout = std::io::stdout();
let mut out = stdout.lock();
for id in needed {
writeln!(out, "{id}")?;
}
Ok(())
}
fn run_send_pack(
&self,
ids: &Path,
manifest_id: Option<&str>,
format: PackFormat,
) -> Result<()> {
let ids = if ids == Path::new("-") {
read_checksum_lines(std::io::stdin().lock())
.context("reading object checksums from stdin")?
} else {
let file = std::fs::File::open(ids)
.with_context(|| format!("opening --ids file {}", ids.display()))?;
read_checksum_lines(std::io::BufReader::new(file))
.with_context(|| format!("reading object checksums from {}", ids.display()))?
};
let ids = dedupe_preserving_order(ids);
if let Some(id) = manifest_id {
anyhow::ensure!(
is_hex64(id),
"invalid --manifest-id {id:?}: expected 64 lowercase hex characters"
);
}
let store = self.resolve_stream_store()?;
let stdout = std::io::stdout();
let report = write_pack_with_format(&*store, &ids, manifest_id, format, stdout.lock())
.context("writing pack stream to stdout")?;
if !self.globals.quiet {
eprintln!(
"sent pack: {} object(s){}",
report.objects_written,
if report.manifest_written {
" + manifest"
} else {
""
}
);
}
Ok(())
}
fn run_receive_pack(&self, require_manifest: Option<&str>) -> Result<()> {
if let Some(id) = require_manifest {
anyhow::ensure!(
is_hex64(id),
"invalid --require-manifest {id:?}: expected 64 lowercase hex characters"
);
}
let store_url = self.globals.store.as_deref().context(NO_STORE_CONFIGURED)?;
let adapter = resolve_adapter(store_url).context("resolving --store protocol")?;
let config = self.transfer_config_for(Some(adapter.name()))?;
let stdin = std::io::stdin();
let (report, committed) = if matches!(adapter, Adapter::File) {
let durability = fsync_durability_from_env()?;
let store = FileStore::new_with_config(store_url, config);
let mut sink = FileSink::new(&store).with_durability(durability);
read_pack_recording(stdin.lock(), &mut sink)?
} else {
let store = stream_store_for_adapter(&adapter, store_url, config, None)?;
let mut sink = StreamSink::new(&*store);
read_pack_recording(stdin.lock(), &mut sink)?
};
if let Some(required) = require_manifest {
match committed.as_deref() {
Some(id) if id == required => {}
Some(other) => anyhow::bail!(
"pack committed manifest {other}, but --require-manifest expected {required}"
),
None => anyhow::bail!(
"pack stream carried no manifest record, but --require-manifest {required} \
was given"
),
}
}
if !self.globals.quiet {
eprintln!(
"received pack: {} object(s) written, {} skipped{}",
report.objects_written,
report.objects_skipped,
match &committed {
Some(id) => format!(", manifest {id}"),
None => String::new(),
}
);
}
Ok(())
}
/// Builds the streaming store for pack commands.
///
/// With `--objects-store`, this is a split store. Otherwise the backend is chosen by the
/// `--store` URL scheme. No progress meter is attached.
fn resolve_stream_store(&self) -> Result<Box<dyn StreamStore + Sync>> {
    if self.globals.objects_store.is_some() {
        let split = self.resolve_split_store(None)?;
        return Ok(Box::new(split));
    }
    let url = self.globals.store.as_deref().context(NO_STORE_CONFIGURED)?;
    let adapter = resolve_adapter(url).context("resolving --store protocol")?;
    let config = self.transfer_config_for(Some(adapter.name()))?;
    stream_store_for_adapter(&adapter, url, config, None)
}
/// Adds a hint about split object pools to a failed read.
///
/// When the command ran without `--objects-store`, an error gains a hint suggesting
/// `--objects-store`/`--from-objects`. Successes, and errors in split mode, pass through
/// unchanged.
fn split_read_hint<T>(&self, result: Result<T>) -> Result<T> {
    const HINT: &str = "if this snapshot was pushed with a split --objects-store, re-run with \
                        --objects-store/--from-objects pointing at that object pool";
    if self.globals.objects_store.is_some() {
        return result;
    }
    result.map_err(|err| err.context(HINT))
}
/// Opens the local cache directory as a [`FileStore`], using the scheme-less transfer
/// config.
fn cache_store(&self) -> Result<FileStore> {
    let config = self.transfer_config()?;
    Ok(FileStore::from_root_with_config(self.cache_dir(), config))
}
/// Same as `cache_store`, with `meter` attached to the returned store.
fn cache_store_with_meter(&self, meter: Option<Arc<Meter>>) -> Result<FileStore> {
    self.cache_store().map(|store| store.with_meter(meter))
}
/// The local cache directory.
///
/// Uses `--cache-dir` when given. Otherwise `$XDG_CACHE_HOME/snapdir`, or
/// `$HOME/.cache/snapdir` when `XDG_CACHE_HOME` is unset (an unset `HOME` counts as
/// empty).
///
/// NOTE(review): a set-but-empty `XDG_CACHE_HOME` is used verbatim, which yields
/// `/snapdir`; the XDG Base Directory spec says to treat it as unset.
/// `exclude_runtime_paths` duplicates this derivation, so the two must change together.
fn cache_dir(&self) -> PathBuf {
    match &self.globals.cache_dir {
        Some(dir) => dir.clone(),
        None => {
            let home = std::env::var("HOME").unwrap_or_default();
            let base =
                std::env::var("XDG_CACHE_HOME").unwrap_or_else(|_| format!("{home}/.cache"));
            PathBuf::from(format!("{base}/snapdir"))
        }
    }
}
/// Lists the `File` entries of `manifest` whose object is not a regular file under
/// `cache_dir`.
///
/// The object location is `cache_dir` joined with `object_path(checksum)`. Entries are
/// returned as `(checksum, path)` in manifest order.
fn missing_cache_objects(manifest: &Manifest, cache_dir: &Path) -> Vec<(String, String)> {
    manifest
        .entries()
        .iter()
        .filter(|entry| entry.path_type == PathType::File)
        .filter(|entry| {
            !cache_dir
                .join(snapdir_core::store::object_path(&entry.checksum))
                .is_file()
        })
        .map(|entry| (entry.checksum.clone(), entry.path.clone()))
        .collect()
}
fn require_id(&self) -> Result<&str> {
self.globals.id.as_deref().context("missing --id option")
}
/// Walks `path` (the current directory when `None`) and builds its [`Manifest`].
///
/// `checksum_bin` selects the hasher:
/// - `b3sum` (the default) uses BLAKE3, switching to `Blake3KeyedHasher` with the value of
///   `SNAPDIR_MANIFEST_CONTEXT` when that variable is non-empty;
/// - `md5sum` uses MD5;
/// - `sha256sum` uses SHA-256.
///
/// Any other value is an error.
fn build_manifest(
    &self,
    path: Option<&Path>,
    absolute: bool,
    no_follow: bool,
    checksum_bin: Option<&str>,
    exclude: &[String],
    meter: Option<&Meter>,
) -> Result<Manifest> {
    let (root, options) = self.resolve_walk(
        path,
        absolute,
        no_follow,
        exclude,
        "resolving manifest path",
    )?;
    match checksum_bin.unwrap_or("b3sum") {
        "b3sum" => {
            let key_context = std::env::var("SNAPDIR_MANIFEST_CONTEXT").unwrap_or_default();
            if key_context.is_empty() {
                walk_with(&root, &options, &Blake3Hasher::new(), meter)
            } else {
                walk_with(&root, &options, &Blake3KeyedHasher::new(key_context), meter)
            }
        }
        "md5sum" => walk_with(&root, &options, &Md5Hasher::new(), meter),
        "sha256sum" => walk_with(&root, &options, &Sha256Hasher::new(), meter),
        other => anyhow::bail!("snapdir: unsupported --checksum-bin '{other}'"),
    }
}
/// Like `build_manifest`, but also returns the per-path [`CopyGuard`]s produced by
/// `walk_with_guards`.
///
/// The hasher is chosen from `checksum_bin` exactly as in `build_manifest`. Root resolution
/// errors are reported as "resolving stage path".
fn build_manifest_with_guards(
    &self,
    path: Option<&Path>,
    absolute: bool,
    no_follow: bool,
    checksum_bin: Option<&str>,
    exclude: &[String],
    meter: Option<&Meter>,
) -> Result<(Manifest, std::collections::HashMap<PathBuf, CopyGuard>)> {
    let (root, options) =
        self.resolve_walk(path, absolute, no_follow, exclude, "resolving stage path")?;
    match checksum_bin.unwrap_or("b3sum") {
        "b3sum" => {
            let key_context = std::env::var("SNAPDIR_MANIFEST_CONTEXT").unwrap_or_default();
            if key_context.is_empty() {
                walk_with_guards_ctx(&root, &options, &Blake3Hasher::new(), meter)
            } else {
                let keyed = Blake3KeyedHasher::new(key_context);
                walk_with_guards_ctx(&root, &options, &keyed, meter)
            }
        }
        "md5sum" => walk_with_guards_ctx(&root, &options, &Md5Hasher::new(), meter),
        "sha256sum" => walk_with_guards_ctx(&root, &options, &Sha256Hasher::new(), meter),
        other => anyhow::bail!("snapdir: unsupported --checksum-bin '{other}'"),
    }
}
/// Resolves the walk root and the [`WalkOptions`] shared by manifest-building commands.
///
/// The root is `path` (or the current directory), made absolute and lexically normalized.
/// Resolution failures are wrapped with `context`. The user's `--exclude` patterns are
/// expanded against the home-cache and snapdir-cache paths and OR-ed into a single matcher.
/// `FollowMode::NoFollow` is chosen when `no_follow` is set or any expanded exclude
/// requires it. Paths are recorded absolutely when `absolute` is set.
fn resolve_walk(
    &self,
    path: Option<&Path>,
    absolute: bool,
    no_follow: bool,
    exclude: &[String],
    context: &'static str,
) -> Result<(PathBuf, WalkOptions)> {
    let root = resolve_root(path).context(context)?;
    let (home_cache, snapdir_cache) = exclude_runtime_paths(self.globals.cache_dir.as_deref());
    let excludes = combine_excludes(exclude, &home_cache, &snapdir_cache);
    let matcher = if let Some(pattern) = &excludes.pattern {
        Some(ExcludeMatcher::new(pattern).context("compiling --exclude pattern")?)
    } else {
        None
    };
    let follow = match (no_follow, excludes.forces_no_follow) {
        (false, false) => FollowMode::Follow,
        _ => FollowMode::NoFollow,
    };
    let path_mode = if absolute {
        PathMode::Absolute
    } else {
        PathMode::Relative
    };
    Ok((
        root,
        WalkOptions {
            follow,
            path_mode,
            exclude: matcher,
            walk_jobs: self.globals.walk_jobs,
        },
    ))
}
}
/// Builds a [`Store`] for `store_url` using the backend selected by `adapter`.
///
/// - S3 and B2 read the endpoint from `SNAPDIR_S3_STORE_ENDPOINT_URL`.
/// - B2 takes its region from `SNAPDIR_B2_REGION`, falling back to `AWS_REGION`.
/// - `meter` is attached to every built-in backend.
/// - External adapters are wrapped in an [`ExternalStore`]; `config` and `meter` are unused
///   for them.
fn store_for_adapter(
    adapter: &Adapter,
    store_url: &str,
    config: TransferConfig,
    meter: Option<Arc<Meter>>,
) -> Result<Box<dyn Store>> {
    let s3_endpoint = || std::env::var("SNAPDIR_S3_STORE_ENDPOINT_URL").ok();
    let store: Box<dyn Store> = match adapter {
        Adapter::File => Box::new(FileStore::new_with_config(store_url, config).with_meter(meter)),
        Adapter::S3 => {
            let endpoint = s3_endpoint();
            let s3 = S3Store::connect_with(store_url, endpoint.as_deref(), config)
                .with_context(|| format!("connecting to S3 store {store_url}"))?;
            Box::new(s3.with_meter(meter))
        }
        Adapter::B2 => {
            let endpoint = s3_endpoint();
            let region = std::env::var("SNAPDIR_B2_REGION")
                .or_else(|_| std::env::var("AWS_REGION"))
                .ok();
            let b2 = B2Store::connect_with(store_url, endpoint.as_deref(), region.as_deref(), config)
                .with_context(|| format!("connecting to B2 store {store_url}"))?;
            Box::new(b2.with_meter(meter))
        }
        Adapter::Gcs => {
            let gcs = GcsStore::connect_with(store_url, config)
                .with_context(|| format!("connecting to GCS store {store_url}"))?;
            Box::new(gcs.with_meter(meter))
        }
        Adapter::External { .. } => Box::new(
            ExternalStore::new(store_url)
                .with_context(|| format!("resolving external store for {store_url}"))?,
        ),
    };
    Ok(store)
}
/// Builds an in-process [`StreamStore`] for `store_url` using the backend selected by
/// `adapter`.
///
/// In this file it backs `sync`, `diff` refs, the pack commands (`objects-needed`,
/// `send-pack`, `receive-pack`) and split `--objects-store` setups.
/// - S3 and B2 read the endpoint from `SNAPDIR_S3_STORE_ENDPOINT_URL`.
/// - B2 takes its region from `SNAPDIR_B2_REGION`, falling back to `AWS_REGION`.
/// - `meter` is attached to every built-in backend.
///
/// # Errors
/// Fails if the backend cannot be constructed or connected. Also fails for an external
/// `snapdir-*-store` adapter, which this function does not support.
fn stream_store_for_adapter(
    adapter: &Adapter,
    store_url: &str,
    config: TransferConfig,
    meter: Option<Arc<Meter>>,
) -> Result<Box<dyn StreamStore + Sync>> {
    let s3_endpoint = || std::env::var("SNAPDIR_S3_STORE_ENDPOINT_URL").ok();
    let store: Box<dyn StreamStore + Sync> = match adapter {
        Adapter::File => Box::new(FileStore::new_with_config(store_url, config).with_meter(meter)),
        Adapter::S3 => {
            let endpoint = s3_endpoint();
            let s3 = S3Store::connect_with(store_url, endpoint.as_deref(), config)
                .with_context(|| format!("connecting to S3 store {store_url}"))?;
            Box::new(s3.with_meter(meter))
        }
        Adapter::B2 => {
            let endpoint = s3_endpoint();
            let region = std::env::var("SNAPDIR_B2_REGION")
                .or_else(|_| std::env::var("AWS_REGION"))
                .ok();
            let b2 = B2Store::connect_with(store_url, endpoint.as_deref(), region.as_deref(), config)
                .with_context(|| format!("connecting to B2 store {store_url}"))?;
            Box::new(b2.with_meter(meter))
        }
        Adapter::Gcs => {
            let gcs = GcsStore::connect_with(store_url, config)
                .with_context(|| format!("connecting to GCS store {store_url}"))?;
            Box::new(gcs.with_meter(meter))
        }
        // This helper is shared by many commands, not only `sync`, so the message must not
        // name a single command.
        Adapter::External { .. } => anyhow::bail!(
            "this command requires in-process stores (file/s3/b2/gcs); \
             external `snapdir-*-store` URLs are not supported: {store_url}"
        ),
    };
    Ok(store)
}
/// Reads one object checksum per line from `reader`, skipping empty lines.
///
/// Every other line must be exactly 64 lowercase hex characters. Reading stops at the
/// first I/O error or invalid line, and that error is returned.
fn read_checksum_lines(reader: impl BufRead) -> Result<Vec<String>> {
    reader
        .lines()
        .filter(|line| !matches!(line, Ok(text) if text.is_empty()))
        .map(|line| -> Result<String> {
            let checksum = line.context("reading checksum list")?;
            anyhow::ensure!(
                is_hex64(&checksum),
                "invalid object checksum {checksum:?}: expected 64 lowercase hex characters"
            );
            Ok(checksum)
        })
        .collect()
}
/// Drops repeated ids, keeping the first occurrence of each in its original position.
fn dedupe_preserving_order(ids: Vec<String>) -> Vec<String> {
    let mut seen: std::collections::HashSet<String> =
        std::collections::HashSet::with_capacity(ids.len());
    let mut unique = Vec::with_capacity(ids.len());
    for id in ids {
        if seen.insert(id.clone()) {
            unique.push(id);
        }
    }
    unique
}
/// Reads the pack-receive durability from `SNAPDIR_FSYNC`.
///
/// An unset, empty or `batch` value selects [`Durability::Batch`]; `off` selects
/// [`Durability::Off`]. Surrounding whitespace is ignored.
///
/// # Errors
/// Fails for any other value, or when the value is not valid UTF-8.
fn fsync_durability_from_env() -> Result<Durability> {
    let raw = match std::env::var("SNAPDIR_FSYNC") {
        Ok(raw) => raw,
        Err(std::env::VarError::NotPresent) => return Ok(Durability::Batch),
        Err(std::env::VarError::NotUnicode(_)) => anyhow::bail!(
            "invalid SNAPDIR_FSYNC: value is not valid UTF-8; expected `batch` (default) or `off`"
        ),
    };
    match raw.trim() {
        "off" => Ok(Durability::Off),
        "" | "batch" => Ok(Durability::Batch),
        other => anyhow::bail!(
            "invalid SNAPDIR_FSYNC {other:?}: expected `batch` (default) or `off`"
        ),
    }
}
/// A [`PackSink`] wrapper that delegates every operation to `inner` and remembers the id of
/// the most recent manifest that `inner` accepted through `put_manifest`.
struct RecordingSink<'a> {
    // The real destination sink; all pack operations are forwarded to it.
    inner: &'a mut dyn PackSink,
    // Id from the last successful `put_manifest`, or `None` if no manifest was committed.
    manifest_id: Option<String>,
}
/// Pure delegation to the wrapped sink, except that `put_manifest` also records the id
/// once the inner sink has accepted the manifest.
impl PackSink for RecordingSink<'_> {
    /// Forwards to the inner sink.
    fn has_object(&mut self, checksum: &str) -> Result<bool, StoreError> {
        let inner = &mut *self.inner;
        inner.has_object(checksum)
    }

    /// Forwards to the inner sink.
    fn stage_object(
        &mut self,
        checksum: &str,
        len: u64,
        payload: &mut dyn Read,
    ) -> Result<(), StoreError> {
        let inner = &mut *self.inner;
        inner.stage_object(checksum, len, payload)
    }

    /// Forwards to the inner sink.
    fn commit_object(&mut self, checksum: &str) -> Result<(), StoreError> {
        let inner = &mut *self.inner;
        inner.commit_object(checksum)
    }

    /// Forwards to the inner sink.
    fn abort_object(&mut self, checksum: &str) {
        let inner = &mut *self.inner;
        inner.abort_object(checksum);
    }

    /// Forwards to the inner sink. On success, remembers `id` as the committed manifest;
    /// on failure the previously recorded id is kept.
    fn put_manifest(&mut self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
        let outcome = self.inner.put_manifest(id, manifest);
        if outcome.is_ok() {
            self.manifest_id = Some(id.to_owned());
        }
        outcome
    }

    /// Forwards to the inner sink.
    fn flush_barrier(&mut self) -> Result<(), StoreError> {
        let inner = &mut *self.inner;
        inner.flush_barrier()
    }
}
/// Reads a pack stream from `input` into `sink`.
///
/// Returns the pack report together with the id of the last manifest the sink committed,
/// if any. Errors are labelled as coming from stdin, which is the only input used by
/// callers in this file.
fn read_pack_recording(
    input: impl Read,
    sink: &mut dyn PackSink,
) -> Result<(PackReadReport, Option<String>)> {
    let mut recorder = RecordingSink {
        inner: sink,
        manifest_id: None,
    };
    let report = read_pack(input, &mut recorder).context("reading pack stream from stdin")?;
    let RecordingSink { manifest_id, .. } = recorder;
    Ok((report, manifest_id))
}
/// Parses a wget-style byte rate (the `--limit-rate` value) into bytes per second.
///
/// The input is a non-negative decimal number, optionally followed by a unit. Units are
/// case-insensitive:
/// - none or `B`: plain bytes;
/// - `K`, `KB`, `Ki`, `KiB`: multiples of 1024;
/// - `M`, `MB`, `Mi`, `MiB`: multiples of 1024²;
/// - `G`, `GB`, `Gi`, `GiB`: multiples of 1024³.
///
/// Whitespace around the value and between the number and the unit is ignored. Fractional
/// byte counts are truncated, and values too large for `u64` saturate at `u64::MAX` (Rust
/// `as` casts from float to integer saturate).
///
/// # Errors
/// Fails on empty input, a missing or unparsable number, a negative or non-finite value, or
/// an unknown unit. This includes a bare `i`/`iB` with no K/M/G prefix, which the previous
/// suffix-stripping logic silently treated as plain bytes.
fn parse_rate(s: &str) -> Result<u64> {
    const KIB: f64 = 1024.0;
    let trimmed = s.trim();
    anyhow::ensure!(!trimmed.is_empty(), "empty --limit-rate value");
    let split = trimmed
        .find(|c: char| !(c.is_ascii_digit() || c == '.'))
        .unwrap_or(trimmed.len());
    let (num, suffix) = trimmed.split_at(split);
    anyhow::ensure!(
        !num.is_empty(),
        "invalid --limit-rate '{s}': expected a number, optionally followed by K/M/G"
    );
    let value: f64 = num
        .parse()
        .with_context(|| format!("invalid --limit-rate '{s}': '{num}' is not a number"))?;
    anyhow::ensure!(
        value.is_finite() && value >= 0.0,
        "invalid --limit-rate '{s}': must be a non-negative number"
    );
    let unit = suffix.trim();
    // An explicit table, so only real unit spellings are accepted.
    let multiplier = match unit.to_ascii_lowercase().as_str() {
        "" | "b" => 1.0,
        "k" | "kb" | "ki" | "kib" => KIB,
        "m" | "mb" | "mi" | "mib" => KIB * KIB,
        "g" | "gb" | "gi" | "gib" => KIB * KIB * KIB,
        _ => anyhow::bail!("invalid --limit-rate '{s}': unknown unit '{unit}' (use K, M, or G)"),
    };
    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    Ok((value * multiplier) as u64)
}
/// Validates a `--limit-rate` value with `parse_rate` and returns the original text
/// unchanged.
///
/// Any parse failure is replaced by a single usage-style message. Presumably this is wired
/// in as the CLI value parser (confirm against the `#[arg]` declaration).
fn parse_rate_arg(s: &str) -> Result<String, String> {
    if parse_rate(s).is_err() {
        return Err(format!(
            "invalid --limit-rate '{s}': expected a wget-style byte rate, e.g. 10M, 512K, or 1G"
        ));
    }
    Ok(s.to_owned())
}
/// Reads environment variable `key` and parses it, after trimming, as a `u64`.
///
/// Returns `None` if the variable is unset, not UTF-8, or not a valid number.
fn env_u64(key: &str) -> Option<u64> {
    let raw = std::env::var(key).ok()?;
    raw.trim().parse::<u64>().ok()
}
/// The smaller of two optional limits, where `None` means "no limit".
///
/// If only one side is `Some`, it wins; if both are `None`, the result is `None`.
fn min_opt(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    a.into_iter().chain(b).min()
}
/// Parses the `--adaptive` fraction, which must be a finite number in `(0.0, 1.0]`.
///
/// Surrounding whitespace is ignored. A parse failure and an out-of-range value produce
/// distinct messages.
fn parse_adaptive_fraction(s: &str) -> Result<f64, String> {
    match s.trim().parse::<f64>() {
        Ok(value) if value.is_finite() && value > 0.0 && value <= 1.0 => Ok(value),
        Ok(_) => Err(format!(
            "invalid --adaptive '{s}': fraction must be in (0.0, 1.0]"
        )),
        Err(_) => Err(format!(
            "invalid --adaptive '{s}': expected a number in (0.0, 1.0]"
        )),
    }
}
/// Runs `walk_with_meter` over `root` with `hasher`.
///
/// Any failure is labelled with "walking <root>".
fn walk_with<H>(
    root: &Path,
    options: &WalkOptions,
    hasher: &H,
    meter: Option<&Meter>,
) -> Result<Manifest>
where
    H: Hasher + HashFile + Sync,
{
    let describe = || format!("walking {}", root.display());
    walk_with_meter(root, options, hasher, meter).with_context(describe)
}
/// Runs `walk_with_guards` over `root` with `hasher`.
///
/// Returns the manifest and the per-path [`CopyGuard`]s. Any failure is labelled with
/// "walking <root>".
fn walk_with_guards_ctx<H>(
    root: &Path,
    options: &WalkOptions,
    hasher: &H,
    meter: Option<&Meter>,
) -> Result<(Manifest, std::collections::HashMap<PathBuf, CopyGuard>)>
where
    H: Hasher + HashFile + Sync,
{
    let describe = || format!("walking {}", root.display());
    walk_with_guards(root, options, hasher, meter).with_context(describe)
}
/// Sum of the recorded `size` of every `File` entry in `manifest`.
fn total_object_bytes(manifest: &Manifest) -> u64 {
    let mut total: u64 = 0;
    for entry in manifest.entries() {
        if entry.path_type == PathType::File {
            total += entry.size;
        }
    }
    total
}
fn restore_permissions(manifest: &Manifest, dest: &Path) -> Result<()> {
for entry in manifest.entries() {
if entry.path_type == PathType::Directory {
continue;
}
apply_mode(dest, entry)?;
}
let mut dirs: Vec<&_> = manifest
.entries()
.iter()
.filter(|e| e.path_type == PathType::Directory)
.collect();
dirs.sort_by_key(|e| std::cmp::Reverse(e.path.len()));
for entry in dirs {
apply_mode(dest, entry)?;
}
Ok(())
}
/// Sets the permissions of `entry`'s restored path under `dest` to its recorded octal mode.
///
/// A leading `./` and a trailing `/` are stripped from the manifest path; an empty
/// remainder targets `dest` itself.
///
/// NOTE(review): `Path::join` replaces `dest` when the remaining path is absolute (e.g.
/// manifests built with absolute paths), and `std::fs::set_permissions` follows symlinks.
/// Confirm both are acceptable for the manifests this is used with.
fn apply_mode(dest: &Path, entry: &ManifestEntry) -> Result<()> {
    let relative = entry.path.strip_prefix("./").unwrap_or(&entry.path);
    let relative = relative.strip_suffix('/').unwrap_or(relative);
    let target = match relative {
        "" => dest.to_path_buf(),
        rel => dest.join(rel),
    };
    let mode = u32::from_str_radix(&entry.permissions, 8)
        .with_context(|| format!("invalid permissions {:?}", entry.permissions))?;
    std::fs::set_permissions(&target, std::fs::Permissions::from_mode(mode))
        .with_context(|| format!("setting permissions on {}", target.display()))
}
/// A temporary directory under `std::env::temp_dir()`, removed recursively (best-effort)
/// when dropped.
///
/// Names have the form `snapdir-cli-<tag>-<pid>-<seq>`, where `seq` is a process-wide
/// counter.
struct ScratchDir {
    path: PathBuf,
}
impl ScratchDir {
    /// Creates (or reuses, if it already exists) the next scratch directory for `tag`.
    fn new(tag: &str) -> Result<Self> {
        use std::sync::atomic::{AtomicU64, Ordering};
        // Distinct per call within this process; the pid separates concurrent processes.
        static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);
        let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);
        let name = format!("snapdir-cli-{tag}-{}-{seq}", std::process::id());
        let path = std::env::temp_dir().join(name);
        std::fs::create_dir_all(&path)
            .with_context(|| format!("creating scratch dir {}", path.display()))?;
        Ok(Self { path })
    }

    /// The directory's location.
    fn path(&self) -> &Path {
        self.path.as_path()
    }
}
impl Drop for ScratchDir {
    fn drop(&mut self) {
        // Best-effort cleanup: errors are ignored so dropping never panics.
        let _ = std::fs::remove_dir_all(&self.path);
    }
}
/// Turns `path` (or the current directory when `None`) into an absolute path.
///
/// Relative paths are joined onto the current directory, and the result is then passed
/// through `lexically_normalize_root`. The filesystem is not touched.
fn resolve_root(path: Option<&Path>) -> Result<PathBuf> {
    let cwd = || std::env::current_dir().context("getting current directory");
    let raw = match path {
        Some(p) => p.to_path_buf(),
        None => cwd()?,
    };
    let absolute = if raw.is_relative() {
        cwd()?.join(raw)
    } else {
        raw
    };
    Ok(lexically_normalize_root(&absolute))
}
/// Rebuilds `abs` from its components, dropping `.` components.
///
/// `..` components are kept as-is; this is not full normalization. Redundant separators
/// and trailing slashes disappear because `Path::components` already omits them. A path
/// with no remaining components becomes the root `/`.
fn lexically_normalize_root(abs: &Path) -> PathBuf {
    use std::path::Component;
    let mut normalized: PathBuf = abs
        .components()
        .filter(|component| !matches!(component, Component::CurDir))
        .map(|component| component.as_os_str())
        .collect();
    // Every kept component is non-empty, so an empty result means nothing was kept.
    if normalized.as_os_str().is_empty() {
        normalized.push(Component::RootDir.as_os_str());
    }
    normalized
}
/// Expands each user `--exclude` pattern and combines the results.
///
/// Every produced expression is wrapped in a non-capturing group `(?:…)` and the groups are
/// joined with `|`. The pattern is `None` when nothing was produced. `forces_no_follow` is
/// set if any expansion requires it.
fn combine_excludes(patterns: &[String], home_cache: &str, cache_dir: &str) -> ExpandedExclude {
    let mut forces_no_follow = false;
    let groups: Vec<String> = patterns
        .iter()
        .filter_map(|pattern| {
            let expanded = expand_excludes(pattern, home_cache, cache_dir);
            forces_no_follow |= expanded.forces_no_follow;
            expanded.pattern.map(|ere| format!("(?:{ere})"))
        })
        .collect();
    let pattern = (!groups.is_empty()).then(|| groups.join("|"));
    ExpandedExclude {
        pattern,
        forces_no_follow,
    }
}
/// Returns the `(home_cache, cache_dir)` strings used to expand `--exclude` patterns.
///
/// `home_cache` is `$HOME/.cache/` (trailing slash included). `cache_dir` is the given
/// directory when present; otherwise `$XDG_CACHE_HOME/snapdir`, falling back to
/// `$HOME/.cache/snapdir`. An unset `HOME` counts as empty.
///
/// NOTE(review): this duplicates the default cache-dir derivation used for the real cache,
/// including using a set-but-empty `XDG_CACHE_HOME` verbatim. Keep the two in sync.
fn exclude_runtime_paths(cache_dir: Option<&Path>) -> (String, String) {
    let home = std::env::var("HOME").unwrap_or_default();
    let snapdir_cache = match cache_dir {
        Some(dir) => dir.display().to_string(),
        None => {
            let base =
                std::env::var("XDG_CACHE_HOME").unwrap_or_else(|_| format!("{home}/.cache"));
            format!("{base}/snapdir")
        }
    };
    (format!("{home}/.cache/"), snapdir_cache)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes every test in this module that parses the CLI or reads or
    /// mutates `SNAPDIR_*` environment variables.
    ///
    /// The default test harness runs tests on several threads, and
    /// `std::env::set_var` / `remove_var` are `unsafe` precisely because
    /// changing the environment while another thread reads it is undefined
    /// behaviour. CLI parsing and the `resolve_*` helpers may consult the
    /// environment, so tests never touch it directly: they go through an
    /// [`EnvGuard`], which holds this lock for the whole test.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Transfer knobs that [`cli_with`] unsets before every parse so that the
    /// parsed flags alone determine the transfer configuration.
    const TRANSFER_ENV_KEYS: [&str; 4] = [
        "SNAPDIR_JOBS",
        "SNAPDIR_LIMIT_RATE",
        "SNAPDIR_ADAPTIVE",
        "SNAPDIR_MAX_JOBS",
    ];

    /// Every `SNAPDIR_*` variable the tests below set or depend on. These are
    /// unset when an [`EnvGuard`] is acquired and again when it is dropped.
    const SNAPDIR_ENV_KEYS: [&str; 8] = [
        "SNAPDIR_JOBS",
        "SNAPDIR_LIMIT_RATE",
        "SNAPDIR_ADAPTIVE",
        "SNAPDIR_MAX_JOBS",
        "SNAPDIR_MAX_RETRIES",
        "SNAPDIR_RETRY_BASE_MS",
        "SNAPDIR_RETRY_MAX_MS",
        "SNAPDIR_MAX_REQUESTS",
    ];

    /// Exclusive, self-cleaning access to the process environment for one test.
    ///
    /// Acquiring takes [`ENV_LOCK`] and unsets [`SNAPDIR_ENV_KEYS`]. If a
    /// previous test panicked, the lock is poisoned; the guard recovers from
    /// that instead of panicking, so one failure does not make every later
    /// env-touching test fail.
    ///
    /// Dropping the guard unsets the keys again before the lock is released.
    /// This means values a test set never leak into the next test, even when
    /// the test panics mid-way.
    struct EnvGuard {
        _lock: std::sync::MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        /// Locks the environment and starts from a clean `SNAPDIR_*` state.
        fn acquire() -> Self {
            let guard = Self {
                _lock: ENV_LOCK
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner),
            };
            guard.clear();
            guard
        }

        /// Sets `key` to `value` until it is cleared or the guard is dropped.
        fn set(&self, key: &str, value: &str) {
            // SAFETY: holding `self` means ENV_LOCK is held, and every test in
            // this module that reads or writes the environment holds it too,
            // so no concurrent environment access happens from this module.
            unsafe { std::env::set_var(key, value) };
        }

        /// Unsets `key`.
        fn remove(&self, key: &str) {
            // SAFETY: same invariant as `set` — ENV_LOCK is held for the
            // lifetime of `self`.
            unsafe { std::env::remove_var(key) };
        }

        /// Unsets every key in [`SNAPDIR_ENV_KEYS`].
        fn clear(&self) {
            for key in SNAPDIR_ENV_KEYS {
                self.remove(key);
            }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // `Drop::drop` runs before the `_lock` field is dropped, so the
            // cleanup happens while the lock is still held.
            self.clear();
        }
    }

    #[test]
    fn remote_store_routing_resolves_every_scheme_to_its_adapter() {
        assert_eq!(
            resolve_adapter("file:///long/term/x").unwrap(),
            Adapter::File
        );
        assert_eq!(resolve_adapter("s3://bucket/path").unwrap(), Adapter::S3);
        assert_eq!(resolve_adapter("b2://bucket/path").unwrap(), Adapter::B2);
        let gcs = resolve_adapter("gs://bucket/path").unwrap();
        assert_eq!(gcs, Adapter::Gcs);
        assert_eq!(gcs.name(), "gcs");
        assert_eq!(gcs.store_binary(), "snapdir-gcs-store");
        let xyz = resolve_adapter("xyz://bucket/path").unwrap();
        assert_eq!(
            xyz,
            Adapter::External {
                name: "xyz".to_owned()
            }
        );
        assert!(!xyz.is_builtin());
        assert_eq!(xyz.store_binary(), "snapdir-xyz-store");
    }

    #[test]
    fn remote_store_routing_file_builds_filestore_without_io() {
        let adapter = resolve_adapter("file:///tmp/snapdir-routing-test").unwrap();
        let store = store_for_adapter(
            &adapter,
            "file:///tmp/snapdir-routing-test",
            TransferConfig::default(),
            None,
        )
        .unwrap();
        assert!(store.get_manifest("0".repeat(64).as_str()).is_err());
    }

    #[test]
    fn remote_store_routing_external_builds_shim_for_third_party_scheme() {
        let adapter = resolve_adapter("xyz://bucket/base").unwrap();
        let store = ExternalStore::new("xyz://bucket/base").unwrap();
        assert_eq!(store.binary(), Path::new("snapdir-xyz-store"));
        let routed = store_for_adapter(
            &adapter,
            "xyz://bucket/base",
            TransferConfig::default(),
            None,
        );
        assert!(routed.is_ok());
    }

    #[test]
    fn remote_store_routing_rejects_invalid_protocol() {
        assert!(resolve_adapter("NotAScheme://x").is_err());
    }

    /// Parses `snapdir fetch <args>` into a [`Ctx`].
    ///
    /// The transfer-related `SNAPDIR_*` variables are unset first, so only
    /// `args` (plus any non-transfer variables the test set deliberately)
    /// influence the result. The `&EnvGuard` parameter is compile-time proof
    /// that the caller holds [`ENV_LOCK`].
    fn cli_with(env: &EnvGuard, args: &[&str]) -> Ctx {
        for key in TRANSFER_ENV_KEYS {
            env.remove(key);
        }
        let mut full = vec!["snapdir", "fetch"];
        full.extend_from_slice(args);
        ctx_from(Cli::try_parse_from(full).expect("parse cli"))
    }

    /// Builds a [`Ctx`] from a parsed [`Cli`].
    ///
    /// Starts from the universal flags and merges in the flag groups carried
    /// by the selected subcommand (walk, transfer, cache-management, catalog,
    /// exclude/walk-jobs, or diff id).
    fn ctx_from(cli: Cli) -> Ctx {
        let mut globals = Resolved::from_universal(&cli.universal);
        match &cli.command {
            Command::Manifest {
                exclude, walk_jobs, ..
            } => {
                globals.exclude.clone_from(exclude);
                globals.walk_jobs = *walk_jobs;
            }
            Command::Id { walk, .. } => merge_walk(&mut globals, walk),
            Command::Stage { walk, transfer, .. } | Command::Push { walk, transfer, .. } => {
                merge_walk(&mut globals, walk);
                merge_transfer(&mut globals, transfer);
            }
            Command::Fetch { transfer }
            | Command::Pull { transfer, .. }
            | Command::Checkout { transfer, .. }
            | Command::Sync { transfer, .. } => merge_transfer(&mut globals, transfer),
            Command::Verify { cache_mgmt }
            | Command::VerifyCache { cache_mgmt }
            | Command::FlushCache { cache_mgmt } => merge_cache_mgmt(&mut globals, cache_mgmt),
            Command::Locations { catalog }
            | Command::Ancestors { catalog }
            | Command::Revisions { catalog } => merge_catalog(&mut globals, catalog),
            Command::Diff { id_arg, .. } => globals.id.clone_from(&id_arg.id),
            _ => {}
        }
        Ctx {
            globals,
            command: cli.command,
        }
    }

    #[test]
    fn transfer_flags_parse_rate() {
        assert_eq!(parse_rate("10M").unwrap(), 10_485_760);
        assert_eq!(parse_rate("512K").unwrap(), 524_288);
        assert_eq!(parse_rate("1G").unwrap(), 1_073_741_824);
        assert_eq!(parse_rate("1GiB").unwrap(), 1_073_741_824);
        assert_eq!(parse_rate("1GB").unwrap(), 1_073_741_824);
        assert_eq!(parse_rate("1000").unwrap(), 1000);
        assert_eq!(parse_rate("1.5M").unwrap(), 1_572_864);
        assert_eq!(parse_rate(" 2k ").unwrap(), 2048);
        assert_eq!(parse_rate("1kib").unwrap(), 1024);
        for bad in ["10X", "abc", "", " ", "M", "1.2.3", "-5M"] {
            assert!(parse_rate(bad).is_err(), "expected {bad:?} to be rejected");
        }
    }

    #[test]
    fn transfer_flags_jobs_explicit() {
        let env = EnvGuard::acquire();
        let cfg = cli_with(&env, &["--jobs", "4"]).transfer_config().unwrap();
        assert_eq!(cfg.concurrency.get(), 4);
        assert_eq!(cfg.max_bytes_per_sec, None);
    }

    #[test]
    fn transfer_flags_jobs_one_is_sequential() {
        let env = EnvGuard::acquire();
        let cfg = cli_with(&env, &["--jobs", "1"]).transfer_config().unwrap();
        assert_eq!(cfg.concurrency.get(), 1);
    }

    #[test]
    fn transfer_flags_jobs_unset_is_auto() {
        let env = EnvGuard::acquire();
        let cfg = cli_with(&env, &[]).transfer_config().unwrap();
        assert!(cfg.concurrency.get() >= 1 && cfg.concurrency.get() <= 16);
        assert_eq!(
            cfg.concurrency.get(),
            TransferConfig::default().concurrency.get()
        );
    }

    #[test]
    fn transfer_flags_jobs_zero_is_auto() {
        let env = EnvGuard::acquire();
        let cfg = cli_with(&env, &["--jobs", "0"]).transfer_config().unwrap();
        assert!(cfg.concurrency.get() >= 1 && cfg.concurrency.get() <= 16);
        assert_eq!(
            cfg.concurrency.get(),
            TransferConfig::default().concurrency.get()
        );
    }

    #[test]
    fn transfer_flags_limit_rate_threads_into_config() {
        let env = EnvGuard::acquire();
        let cfg = cli_with(&env, &["--limit-rate", "1M"])
            .transfer_config()
            .unwrap();
        assert_eq!(cfg.max_bytes_per_sec, Some(1_048_576));
        let cfg = cli_with(&env, &["-j", "2", "--limit-rate", "512K"])
            .transfer_config()
            .unwrap();
        assert_eq!(cfg.concurrency.get(), 2);
        assert_eq!(cfg.max_bytes_per_sec, Some(524_288));
    }

    #[test]
    fn transfer_flags_bad_limit_rate_errors() {
        let env = EnvGuard::acquire();
        assert!(
            Cli::try_parse_from(["snapdir", "fetch", "--limit-rate", "nope"]).is_err(),
            "a malformed --limit-rate must be rejected at parse time"
        );
        let cfg = cli_with(&env, &["--limit-rate", "1M"])
            .transfer_config()
            .unwrap();
        assert_eq!(cfg.max_bytes_per_sec, Some(1_048_576));
    }

    #[test]
    fn adaptive_flag_without_value_defaults_to_point_eight() {
        let env = EnvGuard::acquire();
        let cfg = cli_with(&env, &["--adaptive"]).transfer_config().unwrap();
        match cfg.adaptive {
            TransferAdaptivePolicy::On { fraction, ceiling } => {
                assert!(
                    (fraction - 0.8).abs() < 1e-9,
                    "expected fraction ~0.8, got {fraction}"
                );
                assert_eq!(ceiling, TransferConfig::default().concurrency.get());
            }
            TransferAdaptivePolicy::Off => panic!("expected adaptive On"),
        }
    }

    #[test]
    fn adaptive_flag_with_explicit_fraction() {
        let env = EnvGuard::acquire();
        let cfg = cli_with(&env, &["--adaptive=0.5"]).transfer_config().unwrap();
        match cfg.adaptive {
            TransferAdaptivePolicy::On { fraction, .. } => {
                assert!(
                    (fraction - 0.5).abs() < 1e-9,
                    "expected fraction ~0.5, got {fraction}"
                );
            }
            TransferAdaptivePolicy::Off => panic!("expected adaptive On"),
        }
    }

    #[test]
    fn adaptive_flag_out_of_range_is_rejected() {
        // The guard has already unset SNAPDIR_ADAPTIVE / SNAPDIR_MAX_JOBS and
        // nothing below sets them, so every parse sees only the flag.
        let env = EnvGuard::acquire();
        for bad in ["0", "0.0", "1.5", "-0.2", "nope"] {
            let arg = format!("--adaptive={bad}");
            assert!(
                Cli::try_parse_from(["snapdir", "fetch", &arg]).is_err(),
                "expected --adaptive={bad} to be rejected"
            );
        }
        let cfg = cli_with(&env, &["--adaptive=1.0"]).transfer_config().unwrap();
        assert!(matches!(cfg.adaptive, TransferAdaptivePolicy::On { .. }));
    }

    #[test]
    fn adaptive_unset_equals_pre_phase18_value() {
        let env = EnvGuard::acquire();
        let cfg = cli_with(&env, &["--jobs", "4", "--limit-rate", "1M"])
            .transfer_config()
            .unwrap();
        let expected = TransferConfig::new(4, Some(1_048_576));
        assert_eq!(cfg.adaptive, TransferAdaptivePolicy::Off);
        assert_eq!(cfg.adaptive, expected.adaptive);
        assert_eq!(cfg.concurrency, expected.concurrency);
        assert_eq!(cfg.max_bytes_per_sec, expected.max_bytes_per_sec);
        let cfg = cli_with(&env, &[]).transfer_config().unwrap();
        let expected = TransferConfig::new(TransferConfig::default().concurrency.get(), None);
        assert_eq!(cfg.adaptive, TransferAdaptivePolicy::Off);
        assert_eq!(cfg.concurrency, expected.concurrency);
        assert_eq!(cfg.max_bytes_per_sec, expected.max_bytes_per_sec);
    }

    #[test]
    fn adaptive_max_jobs_sets_ceiling() {
        let env = EnvGuard::acquire();
        let cfg = cli_with(&env, &["--adaptive", "--max-jobs", "8"])
            .transfer_config()
            .unwrap();
        match cfg.adaptive {
            TransferAdaptivePolicy::On { ceiling, .. } => assert_eq!(ceiling, 8),
            TransferAdaptivePolicy::Off => panic!("expected adaptive On"),
        }
        let cfg = cli_with(
            &env,
            &["--adaptive=0.6", "--jobs", "3", "--limit-rate", "512K"],
        )
        .transfer_config()
        .unwrap();
        match cfg.adaptive {
            TransferAdaptivePolicy::On { fraction, ceiling } => {
                assert!((fraction - 0.6).abs() < 1e-9);
                assert_eq!(ceiling, 3);
            }
            TransferAdaptivePolicy::Off => panic!("expected adaptive On"),
        }
        assert_eq!(cfg.max_bytes_per_sec, Some(524_288));
        let cfg = cli_with(&env, &["--adaptive", "--jobs", "2", "--max-jobs", "12"])
            .transfer_config()
            .unwrap();
        match cfg.adaptive {
            TransferAdaptivePolicy::On { ceiling, .. } => assert_eq!(ceiling, 12),
            TransferAdaptivePolicy::Off => panic!("expected adaptive On"),
        }
    }

    #[test]
    fn adaptive_ceiling_is_clamped_to_cap() {
        let env = EnvGuard::acquire();
        let cfg = cli_with(&env, &["--adaptive", "--max-jobs", "9999"])
            .transfer_config()
            .unwrap();
        match cfg.adaptive {
            TransferAdaptivePolicy::On { ceiling, .. } => {
                assert_eq!(ceiling, ADAPTIVE_CEILING_CAP);
            }
            TransferAdaptivePolicy::Off => panic!("expected adaptive On"),
        }
    }

    #[test]
    fn ratelimit_retry_policy_flag_beats_env_beats_default() {
        let env = EnvGuard::acquire();
        let p = cli_with(&env, &[]).resolve_retry_policy();
        assert_eq!(p, RetryPolicy::default());
        assert_eq!(p.max_attempts, 5);
        assert_eq!(p.base, std::time::Duration::from_millis(250));
        assert_eq!(p.cap, std::time::Duration::from_secs(30));

        env.set("SNAPDIR_MAX_RETRIES", "7");
        env.set("SNAPDIR_RETRY_BASE_MS", "100");
        env.set("SNAPDIR_RETRY_MAX_MS", "9000");
        let p = cli_with(&env, &[]).resolve_retry_policy();
        assert_eq!(p.max_attempts, 7);
        assert_eq!(p.base, std::time::Duration::from_millis(100));
        assert_eq!(p.cap, std::time::Duration::from_secs(9));

        // Flags win even while the env vars above are still set.
        let p = cli_with(
            &env,
            &[
                "--max-retries",
                "9",
                "--retry-base-ms",
                "500",
                "--retry-max-ms",
                "45000",
            ],
        )
        .resolve_retry_policy();
        assert_eq!(p.max_attempts, 9);
        assert_eq!(p.base, std::time::Duration::from_millis(500));
        assert_eq!(p.cap, std::time::Duration::from_secs(45));
    }

    #[test]
    fn ratelimit_rate_limits_per_backend_defaults() {
        let env = EnvGuard::acquire();
        let cli = cli_with(&env, &[]);
        assert_eq!(
            cli.resolve_rate_limits("b2", None),
            (Some(20), Some(25 * 1024 * 1024))
        );
        assert_eq!(cli.resolve_rate_limits("s3", None), (Some(3500), None));
        assert_eq!(cli.resolve_rate_limits("gcs", None), (Some(1000), None));
        assert_eq!(cli.resolve_rate_limits("gs", None), (Some(1000), None));
        assert_eq!(cli.resolve_rate_limits("file", None), (None, None));
        assert_eq!(cli.resolve_rate_limits("azure", None), (None, None));
    }

    #[test]
    fn ratelimit_max_requests_flag_beats_env_beats_backend_default() {
        let env = EnvGuard::acquire();
        assert_eq!(
            cli_with(&env, &[]).resolve_rate_limits("b2", None).0,
            Some(20)
        );
        assert_eq!(cli_with(&env, &[]).resolve_rate_limits("file", None).0, None);

        env.set("SNAPDIR_MAX_REQUESTS", "2");
        assert_eq!(cli_with(&env, &[]).resolve_rate_limits("b2", None).0, Some(2));
        assert_eq!(
            cli_with(&env, &[]).resolve_rate_limits("file", None).0,
            Some(2)
        );
        let cli = cli_with(&env, &["--max-requests", "3"]);
        assert_eq!(cli.resolve_rate_limits("b2", None).0, Some(3));
        assert_eq!(cli.resolve_rate_limits("file", None).0, Some(3));

        // `--max-requests 0` falls back to the per-backend default.
        env.clear();
        let cli = cli_with(&env, &["--max-requests", "0"]);
        assert_eq!(cli.resolve_rate_limits("b2", None).0, Some(20));
        assert_eq!(cli.resolve_rate_limits("file", None).0, None);
    }

    #[test]
    fn ratelimit_limit_rate_overrides_backend_byte_default() {
        let env = EnvGuard::acquire();
        let cli = cli_with(&env, &[]);
        assert_eq!(
            cli.resolve_rate_limits("b2", None).1,
            Some(25 * 1024 * 1024)
        );
        assert_eq!(
            cli.resolve_rate_limits("b2", Some(1_048_576)).1,
            Some(1_048_576)
        );
        assert_eq!(
            cli.resolve_rate_limits("file", Some(1_048_576)).1,
            Some(1_048_576)
        );
        assert_eq!(cli.resolve_rate_limits("file", None).1, None);
    }

    #[test]
    fn ratelimit_transfer_config_unchanged_when_knobs_unset() {
        let env = EnvGuard::acquire();
        let scheme_cfg = cli_with(&env, &[])
            .transfer_config_for(Some("file"))
            .unwrap();
        let no_scheme_cfg = cli_with(&env, &[]).transfer_config_for(None).unwrap();
        let historical = TransferConfig::new(TransferConfig::default().concurrency.get(), None);
        for cfg in [&scheme_cfg, &no_scheme_cfg] {
            assert_eq!(cfg.max_requests_per_sec, None);
            assert_eq!(cfg.max_bytes_per_sec, None);
            assert_eq!(cfg.retry, RetryPolicy::default());
            assert_eq!(cfg.adaptive, TransferAdaptivePolicy::Off);
            assert_eq!(cfg.concurrency, historical.concurrency);
        }
    }
}