pssh-rs 0.2.7

pssh-rs is a parallel SSH tool written in Rust.
use ansi_term::Colour::{Green, Red};
use args::HostInfo;
use rayon::prelude::*;
use ssh2::Session;
use std::io::prelude::*;
use std::io::Write;
use std::net::TcpStream;
use std::sync::mpsc::sync_channel;
use std::time::Duration;
use structopt::StructOpt;

mod args;

/// Result of a remote command that ran to completion.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CommandOutcome {
    /// Exit status reported for the remote command.
    exit_status: i32,
    /// Bytes captured from the command's standard output.
    out: Vec<u8>,
    /// Bytes captured from the command's standard error.
    err: Vec<u8>,
}

/// Entry point: runs the configured command on every host in parallel and prints each result.
///
/// Execution is fanned out over the global rayon thread pool (sized by `args.num_threads`).
/// Every worker sends `(index, host, result)` over a channel to a single printer thread, so
/// only one thread prints results. When `args.keep_stable` is set, results are printed in
/// host-list order; otherwise they are printed in arrival order.
///
/// # Errors
/// Returns an error if the host list cannot be obtained or the global thread pool cannot be
/// built.
///
/// # Panics
/// Panics if printing a result fails, or if the printer thread is gone while workers are still
/// delivering results.
fn main() -> anyhow::Result<()> {
    let args = args::CommandLineArgs::from_args();
    let hosts = args.get_hosts()?;
    // Only the number of hosts is needed by the printer thread; copying the length avoids
    // cloning the whole host list (credentials included).
    let host_count = hosts.len();
    let keep_stable = args.keep_stable;

    rayon::ThreadPoolBuilder::new().num_threads(args.num_threads).build_global()?;

    // Capacity equals the number of hosts, so a worker never blocks on `send`.
    let (sender, receiver) = sync_channel(host_count);
    std::thread::scope(|s| {
        // Printer thread: the single consumer of all results.
        s.spawn(move || {
            // One slot per host; a slot is filled when its result arrives and emptied again
            // once it has been printed.
            let mut pending = Vec::with_capacity(host_count);
            pending.resize_with(host_count, || None);
            // Index of the next host whose result is due to be printed in ordered mode.
            let mut print_index: usize = 0;

            // `recv` fails once every sender has been dropped, which ends the loop.
            while let Ok((index, host, result)) = receiver.recv() {
                if !keep_stable {
                    print_command_result(host, &result).expect("failed to print command result");
                    continue;
                }

                pending[index] = Some((host, result));
                // Flush the contiguous run of results that are now ready, in host order.
                while let Some(slot) = pending.get_mut(print_index) {
                    let Some((host, result)) = slot.take() else {
                        break;
                    };
                    print_command_result(host, &result).expect("failed to print command result");
                    print_index += 1;
                }
            }
        });

        hosts.par_iter().enumerate().for_each(|(index, host)| {
            let result = remote_exec_command(host, &args.command);
            sender
                .send((index, host, result))
                .expect("printer thread exited before all results were delivered");
        });

        // The scope joins the printer thread on exit, and that thread only stops once the
        // channel is closed, so the sender has to be dropped before the scope ends.
        drop(sender);
    });

    Ok(())
}

/// Runs `command` on `host` over SSH and collects its exit status and output.
///
/// The function opens a TCP connection (bounded by `host.timeout_ms`), performs the SSH
/// handshake under the same timeout, authenticates with the host's username and password, and
/// executes the command on a new session channel. `PSSH_RS_IP` and `PSSH_RS_PORT` are offered
/// to the remote environment on a best-effort basis: a refusal is ignored.
///
/// Standard output and standard error are read concurrently, and each is capped at 1 MiB.
///
/// # Errors
/// Returns an error if the address does not parse as a socket address, or if connecting, the
/// handshake, authentication, opening the channel, starting the command, waiting for the
/// channel to close, or fetching the exit status fails. A failure while reading either output
/// stream is only reported on stderr; whatever was read up to that point is still returned.
fn remote_exec_command(host: &HostInfo, command: &str) -> anyhow::Result<CommandOutcome> {
    // Maximum number of bytes captured from each of the remote stdout and stderr streams.
    const MAX_CAPTURED_BYTES: u64 = 1024 * 1024;

    let addr = format!("{}:{}", host.host, host.port);
    let connect_timeout = Duration::from_millis(u64::from(host.timeout_ms));
    let tcp = TcpStream::connect_timeout(&addr.parse()?, connect_timeout)?;

    let mut sess = Session::new()?;
    sess.set_tcp_stream(tcp);
    // The timeout only guards the handshake; it is reset to 0 right after.
    // NOTE(review): 0 presumably disables the timeout so long-running commands are not cut
    // off - confirm against the ssh2 documentation.
    sess.set_timeout(host.timeout_ms);
    sess.handshake()?;
    sess.set_timeout(0);
    sess.userauth_password(&host.username, &host.password)?;

    let mut channel = sess.channel_session()?;
    // Best effort: the results are deliberately ignored.
    let _ = channel.setenv("PSSH_RS_IP", &host.host);
    let _ = channel.setenv("PSSH_RS_PORT", &host.port.to_string());
    channel.exec(command)?;

    let (mut stdout_buf, mut stderr_buf) = (Vec::new(), Vec::new());

    // Both streams are read at the same time on scoped threads; the scope returns once both
    // readers have finished.
    // NOTE(review): when a stream exceeds the cap, the remainder is left unread - verify how
    // `wait_close` below behaves in that case.
    std::thread::scope(|s| {
        s.spawn(|| {
            // Stream id 0 is read as the command's standard output.
            if let Err(read_err) = channel.stream(0).take(MAX_CAPTURED_BYTES).read_to_end(&mut stdout_buf) {
                // Diagnostics go to stderr so they cannot mix into the printed command output.
                eprintln!("[{addr}] failed to read stdout: {read_err}");
            }
        });
        s.spawn(|| {
            if let Err(read_err) = channel.stderr().take(MAX_CAPTURED_BYTES).read_to_end(&mut stderr_buf) {
                eprintln!("[{addr}] failed to read stderr: {read_err}");
            }
        });
    });

    channel.wait_close()?;

    let exit_status = channel.exit_status()?;
    Ok(CommandOutcome { exit_status, out: stdout_buf, err: stderr_buf })
}

/// Prints the result of running the command on `host` to stdout.
///
/// A failed execution is reported as a single red `[host:port ERROR: ...]` line; a completed
/// one is handed to `print_command_outcome` together with the `host:port` label.
///
/// # Errors
/// Returns whatever error `print_command_outcome` returns.
fn print_command_result(host: &HostInfo, result: &anyhow::Result<CommandOutcome>) -> anyhow::Result<()> {
    let addr = format!("{}:{}", host.host, host.port);

    // Handle the failure case first so the success path can be a plain tail call.
    let outcome = match result {
        Ok(outcome) => outcome,
        Err(err) => {
            println!("{}", Red.paint(format!("[{addr} ERROR: {err}]")));
            return Ok(());
        }
    };

    print_command_outcome(&addr, outcome)
}

fn print_command_outcome(addr: &str, outcomd: &CommandOutcome) -> anyhow::Result<()> {
    if outcomd.exit_status == 0 {
        println!("{}", Green.paint(format!("[{addr} OK]")));
    } else {
        println!("{}", Red.paint(format!("[{addr} ERROR: exit with {}]", outcomd.exit_status)));
    }

    std::io::stdout().write_all(&outcomd.out)?;
    std::io::stdout().write_all(&outcomd.err)?;

    Ok(())
}