rust-parallel 1.22.0

Fast command-line app written in Rust/Tokio to run commands in parallel, with an interface similar to GNU parallel or xargs.
mod task;

use anyhow::Context;

use tokio::{
    sync::mpsc::{Sender, channel},
    task::JoinHandle,
};

use tracing::{debug, warn};

use std::process::{ExitStatus, Output};

use crate::{
    command_line_args::CommandLineArgs, common::OwnedCommandAndArgs, input::InputLineNumber,
};

/// Message sent over the output channel to the output task.
///
/// Built by [`OutputSender::send`] from a finished command's
/// [`std::process::Output`] plus the caller-supplied command and input line.
#[derive(Debug)]
struct OutputMessage {
    /// Exit status copied from `Output::status`.
    exit_status: ExitStatus,
    /// Raw bytes copied from `Output::stdout`.
    stdout: Vec<u8>,
    /// Raw bytes copied from `Output::stderr`.
    stderr: Vec<u8>,
    /// Command and arguments passed to `send` alongside the output
    /// (presumably the command that produced it; confirm against callers).
    command_and_args: OwnedCommandAndArgs,
    /// Input line number passed to `send` alongside the output
    /// (presumably the line the command was built from; confirm against callers).
    input_line_number: InputLineNumber,
}

/// Handle used to submit one command's output to the output channel.
///
/// Instances are handed out by [`OutputWriter::sender`], each wrapping a clone
/// of the writer's channel sender. [`OutputSender::send`] takes `self` by
/// value, so a handle can be used for a single send.
pub struct OutputSender {
    /// Sending half of the output channel.
    sender: Sender<OutputMessage>,
}

impl OutputSender {
    pub async fn send(
        self,
        output: Output,
        command_and_args: OwnedCommandAndArgs,
        input_line_number: InputLineNumber,
    ) {
        if output.status.success() && output.stdout.is_empty() && output.stderr.is_empty() {
            return;
        }

        let output_message = OutputMessage {
            exit_status: output.status,
            stdout: output.stdout,
            stderr: output.stderr,
            command_and_args,
            input_line_number,
        };

        if let Err(e) = self.sender.send(output_message).await {
            warn!("sender.send error: {}", e);
        }
    }
}

/// Owner of the output channel and of the spawned output task.
///
/// Created with [`OutputWriter::new`]; per-command senders are obtained via
/// [`OutputWriter::sender`], and [`OutputWriter::wait_for_completion`] awaits
/// the task.
pub struct OutputWriter {
    /// The writer's own sending half; cloned for each `OutputSender` and
    /// dropped in `wait_for_completion`.
    sender: Sender<OutputMessage>,
    /// Join handle of the task spawned in `new` to run `task::OutputTask`.
    output_task_join_handle: JoinHandle<()>,
}

impl OutputWriter {
    /// Creates the output channel and spawns the output task on the Tokio runtime.
    ///
    /// The channel capacity is taken from `command_line_args.channel_capacity`,
    /// and `command_line_args.keep_order` is forwarded to the output task.
    pub fn new(command_line_args: &CommandLineArgs) -> Self {
        let capacity = command_line_args.channel_capacity;
        let (sender, receiver) = channel(capacity);
        debug!("created output channel with capacity {}", capacity);

        let output_task = task::OutputTask::new(receiver, command_line_args.keep_order);
        let output_task_join_handle = tokio::spawn(output_task.run());

        Self {
            sender,
            output_task_join_handle,
        }
    }

    /// Returns a new [`OutputSender`] backed by a clone of this writer's channel sender.
    pub fn sender(&self) -> OutputSender {
        let sender = self.sender.clone();
        OutputSender { sender }
    }

    /// Drops this writer's own sender and then waits for the output task to finish.
    ///
    /// Dropping the sender first presumably lets the task observe the channel
    /// closing once every outstanding `OutputSender` is gone; confirm against
    /// `task::OutputTask`.
    ///
    /// # Errors
    ///
    /// Returns an error, with context attached, if awaiting the task's join
    /// handle fails.
    pub async fn wait_for_completion(self) -> anyhow::Result<()> {
        let Self {
            sender,
            output_task_join_handle,
        } = self;

        drop(sender);

        output_task_join_handle
            .await
            .context("OutputWriter::wait_for_completion: output_task_join_handle.await error")
    }
}