//! nix-cache-watcher 0.0.7
//!
//! Upload new nix artifacts to an s3-compatible binary cache
//! nix-cache-watcher binary
#![warn(
    clippy::all,
    clippy::pedantic,
    rust_2018_idioms,
    missing_docs,
    clippy::missing_docs_in_private_items
)]
#![allow(
    clippy::option_if_let_else,
    clippy::module_name_repetitions,
    clippy::shadow_unrelated,
    clippy::must_use_candidate,
    clippy::implicit_hasher
)]

use std::{
    ffi::OsStr,
    fs::File,
    io::{BufWriter, IsTerminal, Write},
    os::unix::ffi::OsStrExt,
    path::{Path, PathBuf},
};

use clap::{Parser, Subcommand};
use color_eyre::eyre::{eyre, Context, Result};
use nix_cache_watcher::nix::{
    sign_store_paths, upload_paths_to_cache, NixConfiguration, StoreState,
};
use tracing::{debug, info, instrument, metadata::LevelFilter};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

/// Wrapper for the main command invocation
/// Wrapper for the main command invocation
// NOTE(review): the `///` doc comments on every field below double as clap's
// generated --help text, so they are user-visible output — edit with care.
#[derive(Parser)]
#[clap(name = "nix-cache-watcher")]
#[clap(version, author, about, long_about = None)]
// Subcommands report the same version string as the top-level command
#[clap(propagate_version = true)]
struct Cli {
    /// Control logging
    // global(true): flag is accepted after any subcommand as well
    #[clap(short, long, action, global(true))]
    verbose: bool,
    /// Should unicode be respected when printing paths?
    ///
    /// This will force non-unicode characters in paths to undergo lossy encoding to produce valid UTF-8.
    /// This option is automatically and forcibly enabled when outputting to a tty.
    #[clap(short = 'u', long, action, global(true))]
    respect_unicode: bool,
    /// Location to serialize the state of the store to
    #[clap(
        long,
        value_parser,
        value_name = "FILE",
        default_value = ".nix_store_state",
        global(true)
    )]
    state_location: PathBuf,
    /// How many arguments should we pass to `nix` when we shell out
    // Batching factor for sign/upload operations; not global, so only
    // available before the subcommand name
    #[clap(long, value_parser, default_value_t = 50)]
    fanout_factor: usize,
    /// Should we filter against a binary cache?
    ///
    /// When enabled, will check with the provided remote (different than the one being uploaded to)
    /// binary cache and skip operating on any paths that already exist in it
    #[clap(short, long, action, global(true))]
    filter_against_cache: bool,
    /// The URI of the cache to filter against
    ///
    /// This is expected to not contain a trailing slash
    // Only consulted when --filter-against-cache is set (see main)
    #[clap(
        short,
        long,
        value_parser,
        value_name = "URI",
        default_value = "https://cache.nixos.org",
        global(true)
    )]
    cache_uri: String,
    /// Should we filter against the local (upload destination) cache?
    ///
    /// The provided URI is expected to not contain a trailing slash
    #[clap(short, long, value_parser, value_name = "URI", global(true))]
    local_cache_uri: Option<String>,
    /// Subcommand
    #[clap(subcommand)]
    command: Commands,
}

/// Individual sub commands
/// Individual sub commands
// NOTE(review): the `///` doc comments here are clap help text (user-visible).
#[derive(Subcommand)]
enum Commands {
    /// Save the current state of the store, creating a checkpoint file
    SaveStore,
    /// Diff's the current state against the saved state and prints paths that have been added since
    /// the checkpoint was taken
    DiffStore,
    /// Signs paths in the nix store that are new since the last snapshot
    SignStore {
        /// Location of the key file to use for signing
        #[clap(short, long, value_parser, value_name = "FILE")]
        key_file: PathBuf,
    },
    /// Uploads new store paths to the cache by shelling out to the nix tooling
    UploadDiff {
        /// Store path
        // URI of the destination store, passed through to `nix copy --to`
        #[clap(short, long, value_parser, value_name = "STORE_PATH")]
        remote_store: String,
    },
}

#[async_std::main]
async fn main() -> Result<()> {
    // Parse the cli and setup color_eyre
    let cli = Cli::parse();
    color_eyre::install()?;
    // Setup the logging if on verbose mode
    if cli.verbose {
        tracing_subscriber::registry()
            .with(fmt::layer().with_writer(std::io::stderr).pretty())
            .with(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::DEBUG.into())
                    .from_env_lossy(),
            )
            .init();
    } else {
        tracing_subscriber::registry()
            .with(fmt::layer().with_writer(std::io::stderr).pretty())
            .with(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::WARN.into())
                    .from_env_lossy(),
            )
            .init();
    }
    // Generate the configuration
    // TODO: Arguments to override this
    let nix_config = NixConfiguration::default();
    // Get our cache_uri option
    let cache_uri: Option<&str> = if cli.filter_against_cache {
        Some(&cli.cache_uri)
    } else {
        None
    };
    // Run the subcommand
    match cli.command {
        // Save the current store state
        Commands::SaveStore => {
            // Generate the store state
            let store_state = StoreState::from_store(&nix_config)
                .context("Error reading state of the nix store")?;
            println!("Captured {} store paths", store_state.path_count());
            // Write it out to a file
            // First get the location, defauling to `.nix_store_state`
            let location = cli.state_location;
            // Open up a file at that path
            info!(?location, "Creating store state file");
            let file = File::create(&location).context("Failed to create store state file")?;
            // Setup the encoder for xz compression
            let mut encoder = xz2::write::XzEncoder::new(file, 3);
            // Encode into the file
            bincode::encode_into_std_write(&store_state, &mut encoder, bincode::config::standard())
                .context("Failed to serialize the store state to file")?;
            // Finish encoding and flush the file
            let mut file = encoder.finish().context("Failed to flush xz encoder")?;
            file.flush().context("Failed to close out file")?;
        }
        Commands::DiffStore => {
            // Try to restore the previous snapshot
            let location = cli.state_location;
            let diff = diff_state(
                &location,
                &nix_config,
                cache_uri,
                cli.local_cache_uri.as_deref(),
            )
            .await?;
            info!("{} new store paths detected", diff.len());
            // Print them out, this gets a bit complicated, so we delegate to a method
            print_paths(diff, cli.respect_unicode).context("Error printing paths")?;
        }
        Commands::SignStore { key_file } => {
            // Go ahead and diff the store
            let location = cli.state_location;
            let diff = diff_state(
                &location,
                &nix_config,
                cache_uri,
                cli.local_cache_uri.as_deref(),
            )
            .await?;
            info!("Signing {} new store paths", diff.len());
            // Ship them off for signing
            let results = sign_store_paths(diff, key_file, cli.fanout_factor)
                .context("Failed to sign some or all store paths")?;
            print!("Signed {} paths in {:?}", results.count, results.duration);
        }
        Commands::UploadDiff { remote_store } => {
            // Go ahead and diff the store
            let location = cli.state_location;
            let diff = diff_state(
                &location,
                &nix_config,
                cache_uri,
                cli.local_cache_uri.as_deref(),
            )
            .await?;
            info!("Uploading {} new store paths", diff.len());
            upload_paths_to_cache(diff, &remote_store, cli.fanout_factor)
                .context("Failed uploading paths to cache")?;
        }
    }
    Ok(())
}

/// Diff the current state against a state file
#[instrument]
async fn diff_state(
    location: &Path,
    nix_config: &NixConfiguration,
    filter: Option<&str>,
    local_cache: Option<&str>,
) -> Result<Vec<PathBuf>> {
    // Open up a file at that path
    let old_state = open_state_file(location)?;
    // Get current store state
    let store_state =
        StoreState::from_store(nix_config).context("Error reading state of the nix store")?;
    info!("Captured {} store paths", store_state.path_count());
    // Diff them
    let state = old_state
        .diff(&store_state)
        .context("Failed to diff store")?;
    let path_count = state.path_count();
    info!("{} new paths", path_count);
    // Filter it if asked too
    let state = match filter {
        Some(cache_uri) => {
            let filtered = state
                .filter_against_cache(cache_uri)
                .await
                .context("Failed to filter diff")?;
            info!(
                "{} paths filtered out against remote cache, {} paths remaining",
                path_count - filtered.path_count(),
                filtered.path_count()
            );
            filtered
        }
        None => state,
    };
    let path_count = state.path_count();
    // Also filter against local cache
    match local_cache {
        Some(cache_uri) => {
            let filtered = state
                .filter_against_cache(cache_uri)
                .await
                .context("Failed to filter diff")?;
            info!(
                "{} paths filtered out against local cache, {} paths remaining.",
                path_count - filtered.path_count(),
                filtered.path_count()
            );
            Ok(filtered.paths().collect())
        }
        None => Ok(state.paths().collect()),
    }
}

/// Open up a serialized store state
#[instrument]
fn open_state_file(location: &Path) -> Result<StoreState> {
    info!(?location, "Opening store state file");
    let file = File::open(&location).context("Failed to create store state file")?;
    // Setup the decoder for xz decompression
    let mut decoder = xz2::read::XzDecoder::new(file);
    // Decode from the file
    let state: StoreState =
        bincode::decode_from_std_read(&mut decoder, bincode::config::standard())
            .context("Failed to decode store state from snapshot file")?;
    // Filter it if asked to
    info!(
        "Decoded previous store state with {} captured paths",
        state.path_count()
    );
    Ok(state)
}

/// Output a list of paths
///
/// Takes a flag that indicates if unicode should be respected or not, and will forcibly respect
/// unicode when `Stdout` belongs to a tty
///
/// This method is overkill for dealing with top level nix-store paths, but I want to have it around
/// anyway
fn print_paths(
    paths: impl IntoIterator<Item = impl AsRef<Path>>,
    respect_unicode: bool,
) -> Result<()> {
    // Get a locked stdout
    let stdout = std::io::stdout().lock();
    // Buffer it
    let mut stdout = BufWriter::new(stdout);
    // Use the display encoding if either respect_unicode is set, or we are a tty
    if respect_unicode || atty::is(atty::Stream::Stdout) {
        debug!(
            ?respect_unicode,
            "Respecting unicode, this may have been forcibly enabled by printing to a tty."
        );
        // Is a tty
        for path in paths {
            writeln!(stdout, "{}", path.as_ref().display())
                .context("Failed writing path to stdout")?;
        }
    } else {
        debug!(?respect_unicode, "Not respecting unicode");
        // Not a tty
        for path in paths {
            // Since this application is additional tooling for Nix, we can be pretty sure that we
            // are running on a *nix system, so we can just yeet the path into bytes.
            let path: &OsStr = path.as_ref().as_ref();
            let path = path.as_bytes();
            // Write it out
            stdout
                .write_all(path)
                .context("Failed writing path to stdout")?;
            // And then a newline
            writeln!(stdout).context("Failed writing path to stdout")?;
        }
    }
    // Flush the writer
    let mut stdout = match stdout.into_inner() {
        Ok(x) => x,
        Err(_) => return Err(eyre!("Failed flushing stdout")),
    };
    stdout.flush().context("Failed flushing stdout")?;
    Ok(())
}