rust-parallel 1.22.0

Fast command-line app written in Rust/Tokio that runs commands in parallel. Its interface is similar to GNU parallel or xargs.
use anyhow::Context;

use tokio::sync::mpsc::Sender;

use tracing::{debug, instrument, warn};

use std::sync::Arc;

use crate::{command_line_args::CommandLineArgs, parser::Parsers, progress::Progress};

use super::{BufferedInput, Input, InputList, InputMessage, buffered_reader::BufferedInputReader};

/// Task that reads the program's input, parses it into commands and forwards
/// each one as an [`InputMessage`] over a channel.
pub struct InputTask {
    /// Channel sender that every produced `InputMessage` is pushed into.
    sender: Sender<InputMessage>,
    /// Command line arguments; used to build the parsers, the input list and the input readers.
    command_line_args: &'static CommandLineArgs,
    /// Shared progress tracker; its total command count is bumped by one for every
    /// message handed to `send`.
    progress: Arc<Progress>,
    /// Parsers built from the command line arguments; supplies the parser for each input mode.
    parsers: Parsers,
}

impl InputTask {
    /// Creates an input task that will publish its messages on `sender`.
    ///
    /// The task keeps its own handle to `progress` (the `Arc` is cloned) and
    /// builds its parsers from `command_line_args`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `Parsers::new` if the parsers cannot be built.
    pub fn new(
        command_line_args: &'static CommandLineArgs,
        sender: Sender<InputMessage>,
        progress: &Arc<Progress>,
    ) -> anyhow::Result<Self> {
        // Field expressions run in the order written, so the fallible parser
        // construction still happens before anything else is touched.
        let task = Self {
            parsers: Parsers::new(command_line_args)?,
            progress: Arc::clone(progress),
            command_line_args,
            sender,
        };

        Ok(task)
    }

    async fn send(&self, input_message: InputMessage) {
        self.progress.increment_total_commands(1);

        if let Err(e) = self.sender.send(input_message).await {
            warn!("input sender send error: {}", e);
        }
    }

    #[instrument(
        name = "InputTask::process_buffered_input",
        skip_all,
        fields(
            buffered_input = %buffered_input
        ),
        level = "debug"
    )]
    async fn process_buffered_input(&self, buffered_input: BufferedInput) -> anyhow::Result<()> {
        debug!("begin process_buffered_input");

        let mut input_reader =
            BufferedInputReader::new(buffered_input, self.command_line_args).await?;

        let input = Input::Buffered(buffered_input);

        let parser = self.parsers.buffered_input_line_parser();

        loop {
            match input_reader
                .next_segment()
                .await
                .context("next_segment error")?
            {
                Some((line_number, segment)) => {
                    if let Some(command_and_args) = parser.parse_segment(segment) {
                        let input_message = (command_and_args, input, line_number.into()).into();
                        self.send(input_message).await;
                    }
                }
                None => {
                    debug!("input_reader.next_segment EOF");
                    break;
                }
            }
        }

        debug!("end process_buffered_input");

        Ok(())
    }

    #[instrument(
        name = "InputTask::process_command_line_args_input",
        skip_all,
        level = "debug"
    )]
    async fn process_command_line_args_input(&self) {
        debug!("begin process_command_line_args_input");

        let parser = self.parsers.command_line_args_parser();

        let input = Input::CommandLineArgs;

        let num_argument_groups = parser.num_argument_groups();

        for i in 0..num_argument_groups {
            let line_number = i + 1;

            if let Some(command_and_args) = parser.parse_next_argument_group() {
                let input_message = (command_and_args, input, line_number.into()).into();
                self.send(input_message).await;
            }
        }

        debug!("end process_command_line_args_input");
    }

    /// Reads stdin segment by segment in pipe mode and sends a message each
    /// time the pipe-mode parser yields a command.
    ///
    /// Every message carries the range of input lines consumed since the
    /// previously sent command. At end of input the parser is asked once more
    /// for a final command, which is sent with the remaining range.
    ///
    /// # Errors
    ///
    /// Fails if the stdin reader cannot be created, or if reading a segment
    /// fails (that error is wrapped with the context `next_segment error`).
    #[instrument(name = "InputTask::process_pipe_input", skip_all, level = "debug")]
    async fn process_pipe_input(&self) -> anyhow::Result<()> {
        debug!("begin process_pipe_input");

        let input = Input::Buffered(BufferedInput::Stdin);

        let mut input_reader =
            BufferedInputReader::new(BufferedInput::Stdin, self.command_line_args).await?;

        let parser = self.parsers.pipe_mode_parser();

        // Inclusive line range covered by the command currently being assembled.
        let mut range_start = 1usize;
        let mut range_end = range_start;

        loop {
            match input_reader
                .next_segment()
                .await
                .context("next_segment error")?
            {
                Some((line_number, segment)) => {
                    // Extend the range on every segment, whether or not it completes a command.
                    range_end = line_number;
                    if let Some(command_and_args) = parser.parse_segment(segment) {
                        let input_message =
                            (command_and_args, input, (range_start, range_end).into()).into();
                        self.send(input_message).await;
                        // The next command starts on the line after the one just consumed.
                        range_start = range_end + 1;
                    }
                }
                None => {
                    debug!("input_reader.next_segment EOF");
                    // Give the parser a chance to emit whatever it still holds.
                    if let Some(command_and_args) = parser.parse_last_command() {
                        let input_message =
                            (command_and_args, input, (range_start, range_end).into()).into();
                        self.send(input_message).await;
                    }
                    break;
                }
            }
        }

        // Matches the begin/end debug pairing used by the other process_* methods.
        debug!("end process_pipe_input");

        Ok(())
    }

    #[instrument(skip_all, name = "InputTask::run", level = "debug")]
    pub async fn run(self) {
        debug!("begin run");

        match super::build_input_list(self.command_line_args) {
            InputList::Buffered(buffered_inputs) => {
                for buffered_input in buffered_inputs {
                    self.process_buffered_input(buffered_input)
                        .await
                        .unwrap_or_else(|e| {
                            warn!(
                                "process_buffered_input error buffered_input = {}: {}",
                                buffered_input, e
                            );
                        });
                }
            }
            InputList::CommandLineArgs => self.process_command_line_args_input().await,
            InputList::Pipe => self.process_pipe_input().await.unwrap_or_else(|e| {
                warn!("process_pipe_input error: {}", e);
            }),
        }

        debug!("end run");
    }
}