# rust-parallel
Command-line utility to execute commands in parallel and aggregate their output.
It is similar to a simple Rust version of [GNU Parallel](https://www.gnu.org/software/parallel/).
Written in asynchronous Rust, it is quite fast; see [benchmarks](https://github.com/aaronriekenberg/rust-parallel/wiki/Benchmarks).
[![Crates.io][crates-badge]][crates-url]
[crates-badge]: https://img.shields.io/crates/v/rust-parallel.svg
[crates-url]: https://crates.io/crates/rust-parallel
# Goals:
* Use only safe rust.
* Use only asynchronous operations supported by [tokio](https://tokio.rs); do not use any blocking operations.
* Support an arbitrarily large number of input lines without `O(number of input lines)` memory usage. In support of this:
    * [`tokio::sync::Semaphore`](https://docs.rs/tokio/latest/tokio/sync/struct.Semaphore.html) is used carefully to limit the number of commands that run concurrently. To limit memory usage, tasks are not spawned for all input lines up front.
* Support running commands on local machine only, not on remote machines.
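The semaphore goal above can be illustrated with a small dependency-free sketch. This is an analogy, not the tool's code: rust-parallel uses the async `tokio::sync::Semaphore`, while the hypothetical `Semaphore` type and `run_bounded` function below use blocking threads, `Mutex`, and `Condvar` from the standard library. What it shares with the stated goal is the ordering: a permit is acquired before each task is spawned, so at most `limit` tasks exist at once no matter how many input lines arrive. It assumes `limit >= 1`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical blocking counting semaphore (stand-in for tokio's async one).
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Block until a permit is free, then take it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Return a permit and wake one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Runs one short task per input line, never more than `limit` at a time.
/// Returns (tasks completed, highest number of tasks observed running at once).
fn run_bounded<I: Iterator<Item = String>>(lines: I, limit: usize) -> (usize, usize) {
    let semaphore = Arc::new(Semaphore::new(limit));
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let completed = Arc::new(AtomicUsize::new(0));

    // Lines are pulled lazily from the iterator, one per free permit.
    for _line in lines {
        // Acquire BEFORE spawning, so pending work never piles up in memory.
        semaphore.acquire();
        let semaphore = Arc::clone(&semaphore);
        let running = Arc::clone(&running);
        let peak = Arc::clone(&peak);
        let completed = Arc::clone(&completed);
        thread::spawn(move || {
            let now = running.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(now, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(5)); // stand-in for running a command
            running.fetch_sub(1, Ordering::SeqCst);
            completed.fetch_add(1, Ordering::SeqCst);
            semaphore.release();
        });
    }

    // Wait for every task to finish by taking all permits back.
    for _ in 0..limit {
        semaphore.acquire();
    }
    (completed.load(Ordering::SeqCst), peak.load(Ordering::SeqCst))
}
```

Calling `run_bounded(lines, 3)` with any iterator of lines should report a peak of at most 3. Reclaiming all permits at the end plays the same role as an `acquire_many` call on the full permit count.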
# Tech Stack:
* [anyhow](https://github.com/dtolnay/anyhow) used for application error handling to propagate and format fatal errors.
* [clap](https://docs.rs/clap/latest/clap/) command line argument parser.
* [tokio](https://tokio.rs/) asynchronous runtime for rust. From tokio this app uses:
    * `async` / `await` functions (aka coroutines)
    * Singleton `CommandLineArgs` instance using [`tokio::sync::OnceCell`](https://docs.rs/tokio/latest/tokio/sync/struct.OnceCell.html).
    * Asynchronous command execution using [`tokio::process::Command`](https://docs.rs/tokio/latest/tokio/process/struct.Command.html)
    * [`tokio::sync::Semaphore`](https://docs.rs/tokio/latest/tokio/sync/struct.Semaphore.html) used to limit number of commands that run concurrently.
        * Life would be a bit easier if `acquire_many` took a `usize` parameter: https://github.com/tokio-rs/tokio/issues/4446
    * [`tokio::sync::Mutex`](https://docs.rs/tokio/latest/tokio/sync/struct.Mutex.html) used to protect access to stdout/stderr to prevent interleaved command output.
* [tracing](https://docs.rs/tracing/latest/tracing/) used for debug and warning logs.
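The mutex item above describes serialized access to stdout/stderr. Here is a minimal std-only sketch of that idea, assuming each command's output is available as one complete block before it is written. The names `write_block` and `write_concurrently` are hypothetical, `std::sync::Mutex` stands in for `tokio::sync::Mutex`, and an in-memory `Vec<u8>` stands in for stdout.

```rust
use std::io::Write;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: writes one command's complete output while holding the lock,
/// so no other writer can slip lines in between.
fn write_block<W: Write>(sink: &Mutex<W>, block: &[u8]) -> std::io::Result<()> {
    let mut guard = sink.lock().unwrap();
    guard.write_all(block)?;
    guard.flush()
}

/// Spawns `workers` threads; each writes a three-line block tagged with its id.
/// Returns everything written to the shared sink.
fn write_concurrently(workers: usize) -> String {
    let sink = Arc::new(Mutex::new(Vec::<u8>::new()));
    let handles: Vec<_> = (0..workers)
        .map(|id| {
            let sink = Arc::clone(&sink);
            thread::spawn(move || {
                let block = format!("{0} start\n{0} middle\n{0} end\n", id);
                write_block(&sink, block.as_bytes()).unwrap();
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let bytes = sink.lock().unwrap().clone();
    String::from_utf8(bytes).unwrap()
}
```

The order of the blocks varies from run to run, but the three lines of each block should always stay together.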
# Installation:
1. [Install Rust](https://www.rust-lang.org/learn/get-started)
2. Install the latest version of this app from [crates.io](https://crates.io/crates/rust-parallel):
```
$ cargo install rust-parallel
```
# Usage:
```
$ rust-parallel -h
Run commands in parallel
Usage: rust-parallel [OPTIONS] [INPUTS]...
Arguments:
[INPUTS]... Input file or - for stdin. Defaults to stdin if no inputs are specified
Options:
-j, --jobs <JOBS> Maximum number of commands to run in parallel, defauts to num cpus [default: 12]
-s, --shell-enabled Use /bin/sh -c shell to run commands
-h, --help Print help information
-V, --version Print version information
```
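To make the `-s` option more concrete, here is a hedged sketch of how one input line might become a program and its arguments. The `/bin/sh -c` form comes from the help text above. Splitting on whitespace when the shell is disabled is an assumption for illustration, and `build_argv` is a hypothetical name rather than a function from this project.

```rust
/// Hypothetical helper: turn one input line into an argv vector.
/// Returns None when the line holds no command.
fn build_argv(line: &str, shell_enabled: bool) -> Option<Vec<String>> {
    let line = line.trim();
    if line.is_empty() {
        return None;
    }
    if shell_enabled {
        // The shell receives the whole line, so pipes and quoting work.
        Some(vec!["/bin/sh".to_string(), "-c".to_string(), line.to_string()])
    } else {
        // Assumed behavior: plain whitespace splitting, no quoting rules.
        Some(line.split_whitespace().map(String::from).collect())
    }
}
```

The first element would be passed to `Command::new` and the rest to `.args(...)`. Under this assumption, a line such as `echo a | wc -c` only behaves like a pipeline when `-s` is given.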
# Demos:
Small demo of 5 echo commands. With `-j5` all 5 commands are run in parallel. With `-j1` commands are run sequentially:
```
$ cat >./test <<EOL
# input can contain comment lines (starting with #) and blank lines too
echo hi
echo there
echo how
echo are
echo you
EOL
hi
there
how
you
there
how
are
you
```
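The demo input above contains a comment line and a blank line. The sketch below shows one way such lines could be filtered out while keeping each command's original 1-based line number. `numbered_commands` is a hypothetical name and this is not taken from the project's source. It assumes lines are trimmed before the check.

```rust
/// Hypothetical filter: trims each line, drops blank lines and '#' comments,
/// and keeps the original 1-based line number of every remaining command.
fn numbered_commands<'a>(input: &'a str) -> impl Iterator<Item = (usize, &'a str)> + 'a {
    input
        .lines()
        .enumerate()
        .map(|(index, line)| (index + 1, line.trim()))
        .filter(|(_, line)| !line.is_empty() && !line.starts_with('#'))
}
```

Because skipped lines still count toward numbering, a command on the third physical line keeps line number 3 even when the two lines before it are a comment and a blank.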
Using `awk` to form commands:
```
MD5 ("abaxial") = ac3a53971d52d9ce3277eadf03f13a5e
MD5 ("abaze") = 0b08c52aa63d947b6a5601ee975bc3a4
MD5 ("abaxile") = 21f5fc27d7d34117596e41d8c001087e
MD5 ("abbacomes") = 76640eb0c929bc97d016731bfbe9a4f8
MD5 ("abbacy") = 08aeac72800adc98d2aba540b6195921
MD5 ("Abbadide") = 7add1d6f008790fa6783bc8798d8c803
MD5 ("abb") = ea01e5fd8e4d8832825acdd20eac5104
```
Using as part of a shell pipeline. The stdout and stderr of each command are copied to the stdout and stderr of the rust-parallel process:
```
MD5 ("abbacomes") = 76640eb0c929bc97d016731bfbe9a4f8
MD5 ("abbacy") = 08aeac72800adc98d2aba540b6195921
MD5 ("Abbadide") = 7add1d6f008790fa6783bc8798d8c803
```
Using input files. Multiple inputs can be specified; `-` means stdin:
```
$ cat >./test1 <<EOL
echo hi
echo there
echo how
EOL
$ cat >./test2 <<EOL
echo are
echo you
EOL
how
hi
are
you
```
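The input rules described above (`-` means stdin, and stdin is the default when no inputs are given) can be sketched as a small mapping function. The `Input` enum and `resolve_inputs` below are illustrative assumptions, not the project's actual types.

```rust
/// Hypothetical model of where command lines are read from.
#[derive(Debug, PartialEq)]
enum Input {
    Stdin,
    File(String),
}

/// Maps the positional arguments to inputs, in the order given.
fn resolve_inputs(args: &[&str]) -> Vec<Input> {
    if args.is_empty() {
        // No inputs specified: fall back to stdin.
        return vec![Input::Stdin];
    }
    args.iter()
        .map(|arg| {
            if *arg == "-" {
                Input::Stdin
            } else {
                Input::File(arg.to_string())
            }
        })
        .collect()
}
```

With this sketch, `resolve_inputs(&["test1", "-", "test2"])` yields the first file, then stdin, then the second file.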
With debug logs enabled:
```
2022-12-27T13:47:06.174003Z DEBUG rust_parallel::command_line_args: command_line_args = CommandLineArgs { jobs: 8, shell_enabled: false, inputs: [] }
2022-12-27T13:47:06.174067Z DEBUG rust_parallel::command: begin run_commands
2022-12-27T13:47:06.174107Z DEBUG rust_parallel::command: begin process_one_input input = Stdin
2022-12-27T13:47:06.174357Z DEBUG rust_parallel::command: trimmed_line # input can contain comment lines (starting with #) and blank lines too
2022-12-27T13:47:06.174381Z DEBUG rust_parallel::command: trimmed_line
2022-12-27T13:47:06.174392Z DEBUG rust_parallel::command: trimmed_line echo hi
2022-12-27T13:47:06.174440Z DEBUG rust_parallel::command: trimmed_line echo there
2022-12-27T13:47:06.174466Z DEBUG rust_parallel::command: trimmed_line echo how
2022-12-27T13:47:06.174485Z DEBUG rust_parallel::command: trimmed_line echo are
2022-12-27T13:47:06.174502Z DEBUG rust_parallel::command: trimmed_line echo you
2022-12-27T13:47:06.174514Z DEBUG rust_parallel::command: begin run command = Command { input_line_number: InputLineNumber { input: Stdin, line_number: 4 }, command: "echo there", shell_enabled: false }
2022-12-27T13:47:06.174534Z DEBUG rust_parallel::command: begin run command = Command { input_line_number: InputLineNumber { input: Stdin, line_number: 5 }, command: "echo how", shell_enabled: false }
2022-12-27T13:47:06.174511Z DEBUG rust_parallel::command: begin run command = Command { input_line_number: InputLineNumber { input: Stdin, line_number: 3 }, command: "echo hi", shell_enabled: false }
2022-12-27T13:47:06.174556Z DEBUG rust_parallel::command: begin run command = Command { input_line_number: InputLineNumber { input: Stdin, line_number: 6 }, command: "echo are", shell_enabled: false }
2022-12-27T13:47:06.174591Z DEBUG rust_parallel::command: end process_one_input input = Stdin
2022-12-27T13:47:06.174596Z DEBUG rust_parallel::command: begin run command = Command { input_line_number: InputLineNumber { input: Stdin, line_number: 7 }, command: "echo you", shell_enabled: false }
2022-12-27T13:47:06.175825Z DEBUG rust_parallel::command: before acquire_many command_semaphore = Semaphore { ll_sem: Semaphore { permits: 3 } }
2022-12-27T13:47:06.177347Z DEBUG rust_parallel::command: got command status = exit status: 0
2022-12-27T13:47:06.177352Z DEBUG rust_parallel::command: got command status = exit status: 0
there
2022-12-27T13:47:06.177591Z DEBUG rust_parallel::command: end run command = Command { input_line_number: InputLineNumber { input: Stdin, line_number: 4 }, command: "echo there", shell_enabled: false }
how
2022-12-27T13:47:06.177768Z DEBUG rust_parallel::command: end run command = Command { input_line_number: InputLineNumber { input: Stdin, line_number: 5 }, command: "echo how", shell_enabled: false }
2022-12-27T13:47:06.178158Z DEBUG rust_parallel::command: got command status = exit status: 0
are
2022-12-27T13:47:06.178256Z DEBUG rust_parallel::command: got command status = exit status: 0
2022-12-27T13:47:06.178260Z DEBUG rust_parallel::command: got command status = exit status: 0
2022-12-27T13:47:06.178334Z DEBUG rust_parallel::command: end run command = Command { input_line_number: InputLineNumber { input: Stdin, line_number: 6 }, command: "echo are", shell_enabled: false }
hi
2022-12-27T13:47:06.178503Z DEBUG rust_parallel::command: end run command = Command { input_line_number: InputLineNumber { input: Stdin, line_number: 3 }, command: "echo hi", shell_enabled: false }
you
2022-12-27T13:47:06.178694Z DEBUG rust_parallel::command: end run command = Command { input_line_number: InputLineNumber { input: Stdin, line_number: 7 }, command: "echo you", shell_enabled: false }
2022-12-27T13:47:06.178771Z DEBUG rust_parallel::command: end run_commands
2022-12-27T13:47:06.178795Z DEBUG rust_parallel: end try_main
```