//! eat-rocks 0.1.1
//!
//! Restore a RocksDB database from object storage.
//!
//! Documentation: see `--help` output for usage of the `list` and `restore` subcommands.
use std::path::PathBuf;
use std::sync::Arc;

use clap::{Parser, Subcommand};
use object_store::{ObjectStore, aws::AmazonS3Builder};
use tracing::warn;

/// Top-level CLI errors.
///
/// Each variant carries enough context to identify the failing operation and
/// wraps the underlying error as `#[source]`, so `main` can walk and print
/// the full causal chain.
#[derive(Debug, thiserror::Error)]
enum CliError {
    /// Building the S3 object store from the CLI flags failed.
    #[error("failed to initialize S3 store for {endpoint:?}")]
    StoreInit {
        endpoint: String,
        #[source]
        source: object_store::Error,
    },

    /// Listing backup ids under `--prefix` failed.
    #[error("failed to list backups at prefix {prefix:?}")]
    List {
        prefix: String,
        #[source]
        source: eat_rocks::Error,
    },

    /// The restore operation itself failed.
    #[error("restore to {} failed", target.display())]
    Restore {
        target: PathBuf,
        #[source]
        source: eat_rocks::Error,
    },
}

// CLI options shared by all subcommands.
//
// NOTE: the `///` comments on fields double as clap help text, so they are
// user-visible strings — edit with care. The struct itself deliberately has
// no doc comment so `about` keeps coming from the crate metadata.
#[derive(Parser)]
#[command(version, about)]
struct Cli {
    /// s3-compatible endpoint url
    ///
    /// for subdomain style (aka virtual-hosted) like tigris, include the bucket
    /// in the hostname, like `--endpoint https://constellation.t3.storage.dev`.
    #[arg(long)]
    endpoint: String,

    /// bucket name
    ///
    /// only for path-style buckets (minio, localstack). see `--endpoint`.
    #[arg(long)]
    bucket: Option<String>,

    /// Prefix within the bucket (path to backup root)
    #[arg(long, default_value = "")]
    prefix: String,

    /// access key ID, omit for public access
    // `requires` makes clap reject a key id given without the matching secret,
    // so downstream code only ever sees both-or-neither.
    #[arg(long, env = "AWS_ACCESS_KEY_ID", requires = "secret_access_key")]
    access_key_id: Option<String>,

    /// secret access key, omit for public access
    #[arg(long, env = "AWS_SECRET_ACCESS_KEY", requires = "access_key_id")]
    secret_access_key: Option<String>,

    #[command(subcommand)]
    command: Command,
}

// Subcommands. As with `Cli`, the `///` comments below are clap help text
// (user-visible strings), not ordinary documentation.
#[derive(Subcommand)]
enum Command {
    /// show available backups
    List,
    /// restore a backup to a local directory
    Restore {
        /// backup to restore
        ///
        /// default: latest. use `list` to see available backups.
        #[arg(long)]
        backup_id: Option<u64>,

        /// WAL directory (default: same as target)
        #[arg(long)]
        wal_dir: Option<PathBuf>,

        /// max concurrent actions against object storage
        #[arg(long, default_value_t = eat_rocks::DEFAULT_CONCURRENCY)]
        concurrency: usize,

        /// skip verifying crc32c checksums
        ///
        /// these are almost free (they're computed as the file is streamed),
        /// and the restore should typically be i/o-bound, so i'm not sure when/
        /// why turning this off would be useful.
        // Stored as a negative flag; the restore path passes `verify: !no_verify`.
        #[arg(long)]
        no_verify: bool,

        /// target directory for restored database
        // Positional argument (no `#[arg(long)]`).
        target: PathBuf,
    },
}

impl Cli {
    /// Construct the S3-compatible object store described by the CLI flags.
    ///
    /// Credentials are either both present (signed requests) or both absent
    /// (anonymous access via `skip_signature`); clap's `requires` constraints
    /// rule out the mixed case before we get here.
    fn build_store(&self) -> Result<Arc<dyn ObjectStore>, Box<CliError>> {
        // With `--bucket`, requests use path-style addressing. Without it,
        // the caller must bake the bucket into the endpoint hostname
        // (virtual-hosted style), but the builder still requires *some*
        // bucket name, so feed it a `_` placeholder.
        let path_style = self.bucket.is_some();
        let bucket_name = self.bucket.as_deref().unwrap_or("_");

        let mut s3 = AmazonS3Builder::new()
            .with_endpoint(&self.endpoint)
            .with_bucket_name(bucket_name)
            .with_allow_http(true)
            .with_virtual_hosted_style_request(!path_style);

        s3 = if let (Some(key_id), Some(secret)) = (&self.access_key_id, &self.secret_access_key) {
            s3.with_access_key_id(key_id).with_secret_access_key(secret)
        } else if self.access_key_id.is_none() && self.secret_access_key.is_none() {
            s3.with_skip_signature(true)
        } else {
            unreachable!("clap `requires` ensures both or neither are present")
        };

        match s3.build() {
            Ok(store) => Ok(Arc::new(store)),
            Err(source) => Err(Box::new(CliError::StoreInit {
                endpoint: self.endpoint.clone(),
                source,
            })),
        }
    }
}

// ---------------------------------------------------------------------------

#[tokio::main]
async fn main() {
    // RUST_LOG controls verbosity; fall back to `info` when it is unset
    // (or unparsable).
    let filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
    tracing_subscriber::fmt()
        .with_env_filter(filter)
        .with_target(false)
        .init();

    let cli = Cli::parse();
    let outcome = match &cli.command {
        Command::List => list(&cli).await,
        Command::Restore { .. } => cmd_restore(&cli).await,
    };

    // On failure, print the error plus its full source chain and exit
    // with a non-zero status.
    if let Err(e) = outcome {
        eprintln!("error: {e}");
        let mut cause: &dyn std::error::Error = &e;
        while let Some(next) = cause.source() {
            eprintln!("  caused by: {next}");
            cause = next;
        }
        std::process::exit(1);
    }
}

/// List backup ids under `--prefix`, printing one summary line per backup
/// (sequence number, timestamp, file count).
async fn list(cli: &Cli) -> Result<(), Box<CliError>> {
    let store = cli.build_store()?;
    let prefix = &cli.prefix;

    let backup_ids = match eat_rocks::list_backup_ids(&store, prefix).await {
        Ok(ids) => ids,
        Err(source) => {
            return Err(Box::new(CliError::List {
                prefix: prefix.clone(),
                source,
            }));
        }
    };

    if backup_ids.is_empty() {
        warn!("no backups found");
        return Ok(());
    }

    // A backup whose metadata can't be fetched is reported but doesn't
    // abort the listing of the others.
    for &id in &backup_ids {
        match eat_rocks::fetch_meta(&store, prefix, id).await {
            Ok(meta) => println!(
                "backup {id:>4} | seq {:>12} | ts {} | {} files",
                meta.sequence_number,
                meta.timestamp,
                meta.files.len(),
            ),
            Err(e) => warn!(backup_id = id, error = %e, "failed to read backup metadata"),
        }
    }

    Ok(())
}

/// Restore the selected backup (default: latest) into `target`.
async fn cmd_restore(cli: &Cli) -> Result<(), Box<CliError>> {
    // `main` only dispatches here for the Restore subcommand, so the
    // `else` arm is genuinely unreachable.
    let Command::Restore {
        backup_id,
        target,
        wal_dir,
        concurrency,
        no_verify,
    } = &cli.command
    else {
        unreachable!()
    };

    // Note the flag inversion: the CLI exposes `--no-verify`, the library
    // takes a positive `verify`.
    let options = eat_rocks::RestoreOptions {
        backup_id: *backup_id,
        concurrency: *concurrency,
        verify: !no_verify,
        wal_dir: wal_dir.clone(),
    };

    let store = cli.build_store()?;

    eat_rocks::restore(store, &cli.prefix, target, options)
        .await
        .map_err(|source| CliError::Restore {
            target: target.clone(),
            source,
        })?;

    Ok(())
}