use std::path::PathBuf;
use std::sync::Arc;
use clap::{Parser, Subcommand};
use object_store::{ObjectStore, aws::AmazonS3Builder};
use tracing::warn;
/// Errors reported to the user by this CLI.
///
/// Every variant carries a `#[source]` so `main` can walk the error
/// chain and print one `caused by:` line per underlying cause.
#[derive(Debug, thiserror::Error)]
enum CliError {
    /// Building the S3 client failed (endpoint/credential configuration).
    #[error("failed to initialize S3 store for {endpoint:?}")]
    StoreInit {
        endpoint: String,
        #[source]
        source: object_store::Error,
    },
    /// Enumerating backup ids under the given key prefix failed.
    #[error("failed to list backups at prefix {prefix:?}")]
    List {
        prefix: String,
        #[source]
        source: eat_rocks::Error,
    },
    /// The restore into the local target directory failed.
    #[error("restore to {} failed", target.display())]
    Restore {
        target: PathBuf,
        #[source]
        source: eat_rocks::Error,
    },
}
// CLI surface: S3 connection flags shared by all subcommands, plus the
// subcommand itself. Plain `//` comments are used deliberately — clap
// turns `///` doc comments on the struct/fields into `about` and
// per-flag `--help` text, which would change the program's output.
#[derive(Parser)]
#[command(version, about)]
struct Cli {
    // S3-compatible endpoint URL to connect to.
    #[arg(long)]
    endpoint: String,
    // Bucket name. When omitted, build_store substitutes a placeholder
    // and enables virtual-hosted-style requests (bucket encoded in the
    // endpoint hostname instead).
    #[arg(long)]
    bucket: Option<String>,
    // Key prefix under which backups live; empty string = bucket root.
    #[arg(long, default_value = "")]
    prefix: String,
    // Credentials: the paired `requires` attributes make clap enforce
    // both-or-neither. With neither present, requests go unsigned
    // (see build_store's with_skip_signature branch).
    #[arg(long, env = "AWS_ACCESS_KEY_ID", requires = "secret_access_key")]
    access_key_id: Option<String>,
    #[arg(long, env = "AWS_SECRET_ACCESS_KEY", requires = "access_key_id")]
    secret_access_key: Option<String>,
    #[command(subcommand)]
    command: Command,
}
// Subcommands. `//` comments on purpose: clap converts `///` doc
// comments on variants/fields into --help text, which would change
// the CLI's observable output.
#[derive(Subcommand)]
enum Command {
    // Print one summary line per backup found under the prefix.
    List,
    // Restore a backup into a local directory.
    Restore {
        // Backup to restore; when omitted the choice is made inside
        // eat_rocks::restore (presumably the latest — not visible here).
        #[arg(long)]
        backup_id: Option<u64>,
        // Optional separate directory for WAL files.
        #[arg(long)]
        wal_dir: Option<PathBuf>,
        // Parallelism for the restore, defaulted from the library.
        #[arg(long, default_value_t = eat_rocks::DEFAULT_CONCURRENCY)]
        concurrency: usize,
        // Inverted flag: passing --no-verify sets RestoreOptions.verify
        // to false.
        #[arg(long)]
        no_verify: bool,
        // Positional argument: local directory to restore into.
        target: PathBuf,
    },
}
impl Cli {
    /// Build the S3-backed object store handle from the connection flags.
    ///
    /// Returns `CliError::StoreInit` (boxed) if the builder rejects the
    /// configuration.
    fn build_store(&self) -> Result<Arc<dyn ObjectStore>, Box<CliError>> {
        // With no explicit bucket, a placeholder name is supplied and
        // virtual-hosted-style addressing is turned on, i.e. the bucket
        // is expected to live in the endpoint hostname.
        let mut s3 = AmazonS3Builder::new()
            .with_endpoint(&self.endpoint)
            .with_bucket_name(self.bucket.as_deref().unwrap_or("_"))
            .with_allow_http(true)
            .with_virtual_hosted_style_request(self.bucket.is_none());
        // clap's `requires` attributes guarantee the credentials arrive
        // as a pair or not at all.
        s3 = if let (Some(key_id), Some(secret)) =
            (&self.access_key_id, &self.secret_access_key)
        {
            s3.with_access_key_id(key_id)
                .with_secret_access_key(secret)
        } else if self.access_key_id.is_none() && self.secret_access_key.is_none() {
            // No credentials at all: send unsigned requests.
            s3.with_skip_signature(true)
        } else {
            unreachable!("clap `requires` ensures both or neither are present")
        };
        let store = s3.build().map_err(|source| {
            Box::new(CliError::StoreInit {
                endpoint: self.endpoint.clone(),
                source,
            })
        })?;
        Ok(Arc::new(store))
    }
}
#[tokio::main]
async fn main() {
    // Logging: honor the env-provided filter (RUST_LOG) when valid,
    // otherwise default to the "info" level.
    let filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
    tracing_subscriber::fmt()
        .with_env_filter(filter)
        .with_target(false)
        .init();
    let cli = Cli::parse();
    // Dispatch to the handler matching the parsed subcommand.
    let outcome = match &cli.command {
        Command::List => list(&cli).await,
        Command::Restore { .. } => cmd_restore(&cli).await,
    };
    let Err(e) = outcome else { return };
    // Print the top-level error, then walk and print its full source
    // chain before exiting non-zero.
    eprintln!("error: {e}");
    let mut cause: &dyn std::error::Error = &e;
    while let Some(source) = cause.source() {
        eprintln!(" caused by: {source}");
        cause = source;
    }
    std::process::exit(1);
}
/// `list` subcommand: enumerate backup ids under the prefix and print a
/// one-line summary for each.
async fn list(cli: &Cli) -> Result<(), Box<CliError>> {
    let store = cli.build_store()?;
    let ids = eat_rocks::list_backup_ids(&store, &cli.prefix)
        .await
        .map_err(|source| CliError::List {
            prefix: cli.prefix.clone(),
            source,
        })?;
    if ids.is_empty() {
        warn!("no backups found");
        return Ok(());
    }
    // Metadata reads are best-effort: a single unreadable entry is
    // logged as a warning so the rest of the listing still prints.
    for id in ids {
        match eat_rocks::fetch_meta(&store, &cli.prefix, id).await {
            Ok(meta) => println!(
                "backup {id:>4} | seq {:>12} | ts {} | {} files",
                meta.sequence_number,
                meta.timestamp,
                meta.files.len(),
            ),
            Err(e) => warn!(backup_id = id, error = %e, "failed to read backup metadata"),
        }
    }
    Ok(())
}
/// `restore` subcommand: download the selected backup into `target`.
async fn cmd_restore(cli: &Cli) -> Result<(), Box<CliError>> {
    // main() only routes Restore here, so any other variant is a bug.
    let (backup_id, target, wal_dir, concurrency, no_verify) = match &cli.command {
        Command::Restore {
            backup_id,
            target,
            wal_dir,
            concurrency,
            no_verify,
        } => (backup_id, target, wal_dir, concurrency, no_verify),
        _ => unreachable!(),
    };
    let store = cli.build_store()?;
    // The CLI exposes the inverted --no-verify flag; the library option
    // is positive, hence the negation.
    let options = eat_rocks::RestoreOptions {
        backup_id: *backup_id,
        concurrency: *concurrency,
        verify: !no_verify,
        wal_dir: wal_dir.clone(),
    };
    eat_rocks::restore(store, &cli.prefix, target, options)
        .await
        .map_err(|source| CliError::Restore {
            target: target.clone(),
            source,
        })?;
    Ok(())
}