rust-parallel 0.1.11

Run commands in parallel
use anyhow::Context;

use awaitgroup::WaitGroup;

use tokio::{
    io::{AsyncBufReadExt, AsyncRead, BufReader},
    process::Command as TokioCommand,
    sync::{OwnedSemaphorePermit, Semaphore},
};

use tracing::{debug, warn};

use std::sync::Arc;

use crate::command_line_args;

#[derive(Debug, Clone, Copy)]
enum Input {
    Stdin,

    File { file_name: &'static str },
}

#[derive(Debug)]
struct Command {
    _input: Input,
    _line_number: u64,
    command: String,
    shell_enabled: bool,
}

impl Command {
    async fn run(self, _permit: OwnedSemaphorePermit, _worker: awaitgroup::Worker) {
        debug!("begin run_command command = {:?}", self);

        let command_output = if self.shell_enabled {
            TokioCommand::new("/bin/sh")
                .args(["-c", &self.command])
                .output()
                .await
        } else {
            let split: Vec<&str> = self.command.split_whitespace().collect();

            let command = split.get(0).expect("invalid command string");
            let args = &split[1..];

            TokioCommand::new(command).args(args).output().await
        };

        match command_output {
            Ok(output) => {
                debug!("got command status = {}", output.status);
                if output.stdout.len() > 0 {
                    print!("{}", &String::from_utf8_lossy(&output.stdout));
                }
                if output.stderr.len() > 0 {
                    eprint!("{}", &String::from_utf8_lossy(&output.stderr));
                }
            }
            Err(e) => {
                warn!("got error running command {:?}: {}", self, e);
            }
        };

        debug!("end run_command command = {:?}", self);
    }
}

pub struct CommandService {
    command_semaphore: Arc<Semaphore>,
    wait_group: WaitGroup,
}

impl CommandService {
    pub fn new() -> Self {
        Self {
            command_semaphore: Arc::new(Semaphore::new(*command_line_args::instance().jobs())),
            wait_group: WaitGroup::new(),
        }
    }

    async fn process_one_input(
        &self,
        input: Input,
        mut reader: BufReader<impl AsyncRead + Unpin>,
    ) -> anyhow::Result<()> {
        debug!("begin process_one_input input = {:?}", input);

        let args = command_line_args::instance();

        let mut line = String::new();
        let mut line_number = 0u64;

        loop {
            line.clear();

            let bytes_read = reader
                .read_line(&mut line)
                .await
                .context("read_line error")?;
            if bytes_read == 0 {
                break;
            }

            line_number += 1;

            let trimmed_line = line.trim();

            debug!("read line {}", trimmed_line);

            if trimmed_line.is_empty() || trimmed_line.starts_with("#") {
                continue;
            }

            let permit = Arc::clone(&self.command_semaphore)
                .acquire_owned()
                .await
                .context("command_semaphore.acquire_owned error")?;

            let worker = self.wait_group.worker();

            let command = Command {
                _input: input,
                _line_number: line_number,
                command: trimmed_line.to_owned(),
                shell_enabled: *args.shell_enabled(),
            };

            tokio::spawn(command.run(permit, worker));
        }

        debug!("end process_one_input input = {:?}", input);

        Ok(())
    }

    pub async fn spawn_commands(self) -> anyhow::Result<WaitGroup> {
        debug!("begin spawn_commands");

        let args = command_line_args::instance();

        let inputs = if args.inputs().is_empty() {
            vec![Input::Stdin]
        } else {
            args.inputs()
                .iter()
                .map(|input_name| {
                    if input_name == "-" {
                        Input::Stdin
                    } else {
                        Input::File {
                            file_name: input_name,
                        }
                    }
                })
                .collect()
        };

        for input in inputs {
            match input {
                Input::Stdin => {
                    let reader = BufReader::new(tokio::io::stdin());

                    self.process_one_input(input, reader).await?;
                }
                Input::File { file_name } => {
                    let file = tokio::fs::File::open(file_name).await.with_context(|| {
                        format!("error opening input file file_name = '{}'", file_name)
                    })?;
                    let reader = BufReader::new(file);

                    self.process_one_input(input, reader).await?;
                }
            }
        }

        debug!("end spawn_commands");

        Ok(self.wait_group)
    }
}