rust-parallel 1.22.0

Fast command-line app written in Rust/Tokio for running commands in parallel, with an interface similar to GNU parallel or xargs.
mod metrics;
mod path_cache;

use anyhow::Context;

use tokio::sync::Semaphore;

use tracing::{Level, Span, debug, error, info, instrument, span_enabled, trace};

use std::sync::Arc;

use crate::{
    command_line_args::CommandLineArgs,
    common::OwnedCommandAndArgs,
    input::{InputLineNumber, InputMessage, InputProducer},
    output::{OutputSender, OutputWriter},
    process::ChildProcessFactory,
    progress::Progress,
};

use self::{metrics::CommandMetrics, path_cache::CommandPathCache};

/// A single command to execute, together with the input line it was built from.
#[derive(Debug)]
struct Command {
    /// Command path, arguments, and the stdin value handed to the child process.
    command_and_args: OwnedCommandAndArgs,
    /// Input line this command was generated from; shown in logs/`Display`
    /// output and forwarded alongside the command's output.
    input_line_number: InputLineNumber,
}

impl Command {
    /// Executes this command to completion.
    ///
    /// Bumps the "commands run" metric, spawns the child process through
    /// `context.child_process_factory`, and waits for it to exit. Spawn
    /// failures and execution errors are logged and recorded in
    /// `context.command_metrics`; an unsuccessful exit status is counted as an
    /// exit-status error. Output is forwarded through `output_sender` (along
    /// with the command and its input line) only when the child completes.
    #[instrument(
        name = "Command::run",
        skip_all,
        fields(
            cmd = ?self.command_and_args.command_path,
            args = ?self.command_and_args.args,
            line = %self.input_line_number,
            stdin = %self.command_and_args.stdin,
            child_pid,
        ),
        level = "debug")]
    async fn run(self, context: &CommandRunContext, output_sender: OutputSender) {
        debug!("begin run");

        let metrics = &context.command_metrics;

        let OwnedCommandAndArgs {
            command_path: path,
            args: arguments,
            stdin: input,
        } = &self.command_and_args;

        metrics.increment_commands_run();

        let spawn_result = context
            .child_process_factory
            .spawn(path, arguments, input.clone())
            .await;

        let child_process = match spawn_result {
            Ok(spawned) => spawned,
            Err(e) => {
                error!("spawn error command: {}: {}", self, e);
                metrics.increment_spawn_errors();
                return;
            }
        };

        if span_enabled!(Level::DEBUG) {
            // The pid only exists once the child is spawned, so the span's
            // `child_pid` field is filled in here rather than at span creation.
            Span::current().record("child_pid", child_process.id());
            debug!("spawned child process, awaiting completion");
        }

        match child_process.await_completion().await {
            Ok(output) => {
                debug!("command exit status = {}", output.status);

                let exited_successfully = output.status.success();
                if !exited_successfully {
                    metrics.increment_exit_status_errors();
                }

                output_sender
                    .send(output, self.command_and_args, self.input_line_number)
                    .await;
            }
            Err(e) => {
                error!("child process error command: {} error: {}", self, e);
                metrics.handle_child_process_execution_error(e);
            }
        }

        debug!("end run");
    }
}

impl std::fmt::Display for Command {
    /// Renders the command as `<command_and_args>,line=<input_line_number>`,
    /// the form used by dry-run output and error log messages.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            command_and_args,
            input_line_number,
        } = self;
        write!(f, "{command_and_args},line={input_line_number}")
    }
}

/// Reads inputs, turns each one into a command, and runs the commands
/// concurrently while collecting their output and metrics.
pub struct CommandService {
    /// Parsed command line options (dry run, exit_on_error, job count, ...).
    command_line_args: &'static CommandLineArgs,
    /// Resolves each input's command path before the command is spawned.
    command_path_cache: CommandPathCache,
    /// Created with `jobs` permits; each running command holds one, which
    /// bounds the number of commands running at once.
    command_semaphore: Arc<Semaphore>,
    /// State shared with every spawned command task.
    context: Arc<CommandRunContext>,
    /// Hands out an output sender per command and is awaited at the end of the run.
    output_writer: OutputWriter,
}

impl CommandService {
    pub fn new(command_line_args: &'static CommandLineArgs, progress: Arc<Progress>) -> Self {
        let context = Arc::new(CommandRunContext {
            child_process_factory: ChildProcessFactory::new(command_line_args),
            command_metrics: CommandMetrics::default(),
            progress,
        });
        Self {
            command_line_args,
            command_path_cache: CommandPathCache::new(command_line_args),
            command_semaphore: Arc::new(Semaphore::new(command_line_args.jobs)),
            context,
            output_writer: OutputWriter::new(command_line_args),
        }
    }

    async fn spawn_command(
        &self,
        command_and_args: OwnedCommandAndArgs,
        input_line_number: InputLineNumber,
    ) -> anyhow::Result<()> {
        let command = Command {
            command_and_args,
            input_line_number,
        };

        if self.command_line_args.dry_run {
            info!("{}", command);
            return Ok(());
        }

        if self.command_line_args.exit_on_error && self.context.command_metrics.error_occurred() {
            trace!("return from spawn_command due to exit_on_error");
            return Ok(());
        }

        let context_clone = Arc::clone(&self.context);

        let output_sender = self.output_writer.sender();

        let permit = Arc::clone(&self.command_semaphore)
            .acquire_owned()
            .await
            .context("command_semaphore.acquire_owned error")?;

        tokio::spawn(async move {
            command.run(&context_clone, output_sender).await;

            drop(permit);

            context_clone.progress.command_finished();
        });

        Ok(())
    }

    async fn process_input_message(&self, input_message: InputMessage) -> anyhow::Result<()> {
        let InputMessage {
            command_and_args,
            input_line_number,
        } = input_message;

        let Some(command_path) = self
            .command_path_cache
            .resolve_command_path(command_and_args.command_path.clone())
            .await?
        else {
            error!("command path cache error resolving command path: {command_and_args}");
            self.context.command_metrics.increment_spawn_errors();
            return Ok(());
        };

        let command_and_args = command_and_args.with_command_path(command_path);

        self.spawn_command(command_and_args, input_line_number)
            .await?;

        Ok(())
    }

    async fn process_inputs(&self) -> anyhow::Result<()> {
        let mut input_producer =
            InputProducer::new(self.command_line_args, &self.context.progress)?;

        while let Some(input_message) = input_producer.receiver().recv().await {
            self.process_input_message(input_message).await?;
        }

        input_producer.wait_for_completion().await?;

        Ok(())
    }

    #[instrument(name = "CommandService::run_commands", skip_all, level = "debug")]
    pub async fn run_commands(self) -> anyhow::Result<()> {
        debug!("begin run_commands");

        self.process_inputs().await?;

        debug!("before output_writer.wait_for_completion",);

        self.output_writer.wait_for_completion().await?;

        self.context.progress.finish();

        if self.context.command_metrics.error_occurred() {
            anyhow::bail!("command failures: {}", self.context.command_metrics);
        }

        debug!(
            "end run_commands command_metrics = {}",
            self.context.command_metrics
        );

        Ok(())
    }
}

/// State shared (through an `Arc`) between `CommandService` and every
/// spawned command task.
struct CommandRunContext {
    /// Spawns the child process for each command.
    child_process_factory: ChildProcessFactory,
    /// Counters for commands run, spawn errors, exit-status errors and
    /// child-process execution errors.
    command_metrics: CommandMetrics,
    /// Progress reporter; `command_finished` is called as each spawned command ends.
    progress: Arc<Progress>,
}