//! rjrssync 0.1.1
//!
//! Fast rsync-like tool for incrementally copying files. Runs natively on both Windows and Linux
//! and uses the network for communication.
use aes_gcm::aead::OsRng;
use aes_gcm::{Aes128Gcm, KeyInit, Key};
use log::{debug, error, info, log, trace};
use rust_embed::RustEmbed;
use std::borrow::Cow;
use std::ffi::OsStr;
use std::io::LineWriter;
use std::net::TcpStream;
use std::str::FromStr;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::thread;
use std::time::Duration;
use std::{
    fmt::{self, Display},
    io::{BufRead, BufReader, Write},
    process::{ChildStderr, ChildStdin, ChildStdout, Stdio},
    sync::mpsc::{RecvError, SendError},
    thread::JoinHandle,
};
use tempdir::TempDir;

use crate::*;
use crate::encrypted_comms::AsyncEncryptedComms;

/// Abstraction of two-way communication channel between this boss and a doer, which might be
/// remote (communicating over an encrypted TCP connection) or local (communicating via a channel to a background thread).
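///
/// Illustrative usage (a sketch only, marked `ignore` because it needs a live doer to talk to):
/// ```ignore
/// let comms = setup_comms("", "", None, "dest".to_string(), false).expect("failed to set up comms");
/// comms.send_command(Command::ProfilingTimeSync).unwrap();
/// if let Ok(Response::ProfilingTimeSync(remote_time)) = comms.receive_response() {
///     println!("Remote profiling clock: {:?}", remote_time);
/// }
/// comms.shutdown();
/// ```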
#[allow(clippy::large_enum_variant)]
pub enum Comms {
    Local {
        debug_name: String, // To identify this Comms against others for debugging, when there are several
        thread: JoinHandle<Result<(), String>>,
        sender: memory_bound_channel::Sender<Command>,
        receiver: memory_bound_channel::Receiver<Response>,
    },
    Remote {
        debug_name: String, // To identify this Comms against others for debugging, when there are several
        ssh_process: std::process::Child,
        // Once the network socket is set up, we don't need to communicate over stdin/stdout any more,
        // but we keep them around anyway, just in case.
        stdin: LineWriter<ChildStdin>,
        stdout: BufReader<ChildStdout>,
        stderr_reading_thread: JoinHandle<()>,

        encrypted_comms: AsyncEncryptedComms<Command, Response>,
    },
}
impl Comms {
    pub fn get_sender(&self) -> &memory_bound_channel::Sender<Command> {
        match self {
            Comms::Local { sender, .. } => sender,
            Comms::Remote { encrypted_comms, .. } => &encrypted_comms.sender,
        }
    }

    pub fn get_receiver(&self) -> &memory_bound_channel::Receiver<Response> {
        match self {
            Comms::Local { receiver, .. } => receiver,
            Comms::Remote { encrypted_comms, .. } => &encrypted_comms.receiver,
        }
    }

    /// This will block if there is not enough capacity in the channel, so
    /// that we don't use up infinite memory if the doer is being slow.
    pub fn send_command(&self, c: Command) -> Result<(), String> {
        trace!("Sending command {:?} to {}", c, &self);
        self.get_sender().send(c).map_err(|_| format!("Lost communication with {}", &self))
    }

    /// Blocks until a response is received, if none is buffered in the channel.
    pub fn receive_response(&self) -> Result<Response, String> {
        trace!("Waiting for response from {}", &self);
        self.get_receiver().recv().map_err(|_| format!("Lost communication with {}", &self))
    }

    /// Never blocks, will return None if the channel is empty.
    pub fn try_receive_response(&self) -> Result<Option<Response>, String>  {
        trace!("Checking for response from {}", &self);
        match self.get_receiver().try_recv() {
            Ok(r) => Ok(Some(r)),
            Err(crossbeam::channel::TryRecvError::Empty) => Ok(None),
            Err(crossbeam::channel::TryRecvError::Disconnected) => Err(format!("Lost communication with {}", &self))
        }
    }

    // Tell the other end (thread or process over the network) to shut down once we're finished.
    // They should exit anyway due to a disconnection (of their channel or stdin), but this
    // gives a cleaner exit without errors.
    pub fn shutdown(self) {
        match self {
            Comms::Local { .. } => {
                // There's not much we can do about an error here, other than log it, which send_command already does, so we ignore any error.
                let _ = self.send_command(Command::Shutdown);
                // Join threads so that they're properly cleaned up including the profiling data
                if let Comms::Local { thread, .. } = self { // Always true, just need to extract the fields
                    if let Err(e) = thread.join().expect("Failed to join local doer thread") {
                        error!("Local doer thread exited with error: {e}");
                    }
                }
            }
            Comms::Remote { ref debug_name, .. } => {
                let _debug_name = debug_name.clone();
                // Synchronise the profiling clocks between local and remote profiling.
                // Do this by sending a special Command which the doer responds to immediately with its local 
                // profiling timer. We then compare that value with our own profiling clock to work out the offset.
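                // For example (made-up numbers): if our local clock reads start = 1.000s and end = 1.010s
                // around the round trip, and the doer reports remote_timestamp = 0.250s, then the offset is
                // (1.000 + 1.010)/2 - 0.250 = 0.755s, i.e. the remote profiling clock started ~0.755s after ours.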
                let profiling_offset = if cfg!(feature="profiling") {
                    // Do this a few times and take the average of the later samples
                    let mut samples = vec![];
                    for i in 0..5 {
                        let start = PROFILING_START.elapsed();
                        self.send_command(Command::ProfilingTimeSync).expect("Failed to send profiling time sync");
                        match self.receive_response() {
                            Ok(Response::ProfilingTimeSync(remote_timestamp)) => {
                                let end = PROFILING_START.elapsed();
                                trace!("Profiling sync: start: {:?}, end: {:?}, diff: {:?}, remote: {:?}", start, end, end-start, remote_timestamp);
                                if i >= 2 { // Skip the first two to make sure the doer is ready to go (e.g. code cached)
                                    // Take the average of our local start/end timestamps, assuming that the round trip is symmetrical.
                                    let sample = (start + end) / 2 - remote_timestamp;
                                    trace!("Profiling sync sample: {:?}", sample);
                                    samples.push(sample);
                                }
                            },
                            x => panic!("Unexpected response (expected ProfilingTimeSync): {:?}", x),
                        }
                    }
                    let average = samples.iter().sum::<std::time::Duration>() / samples.len() as u32;
                    trace!("Profiling sync average: {:?}", average);
                    average
                } else {
                    Duration::new(0, 0)
                };

                // There's not much we can do about an error here, other than log it, which send_command already does, so we ignore any error.
                let _ = self.send_command(Command::Shutdown);

                // Shutdown the comms cleanly, potentially getting profiling data at the same time
                if let Comms::Remote { encrypted_comms, mut ssh_process, stdin, stdout, stderr_reading_thread, .. } = self { // This is always true, we just need a way of getting the fields
                    // Wait for remote doers to send back any profiling data, if enabled
                    match encrypted_comms.receiver.recv() {
                        Ok(Response::ProfilingData(x)) => add_remote_profiling(x, _debug_name, profiling_offset),
                        x => error!("Unexpected response as final message (expected ProfilingData): {:?}", x),
                    }

                    encrypted_comms.shutdown();

                    // Wait for the ssh process to cleanly shutdown.
                    // We don't strictly need to do this for most cases, but it's nice to have a clean shutdown.
                    // We do, however, need to do this when the doer is printing its memory usage, to make sure that we receive it
                    // before closing down ourselves.
                    drop(stdin);
                    drop(stdout);
                    debug!("Waiting for stderr_reading_thread");
                    stderr_reading_thread.join().expect("Failed to join stderr_reading_thread");
                    debug!("Waiting for ssh child process");
                    let result = ssh_process.wait();
                    debug!("ssh child process wait result = {:?}", result);
                }
            }
        }
    }
}
impl Display for Comms {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Comms::Local { debug_name, .. } => write!(f, "{}", debug_name),
            Comms::Remote { debug_name, .. } => write!(f, "{}", debug_name),
        }
    }
}

/// Sets up communications with the given computer, which may be either remote or local (local if `remote_hostname` is empty).
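///
/// Illustrative sketch (the host name and port below are made up):
/// ```ignore
/// // Local doer, which runs on a background thread in this process:
/// let src = setup_comms("", "", None, "src".to_string(), false);
/// // Remote doer, launched over ssh and forced to redeploy first:
/// let dest = setup_comms("my-server", "me", Some(40129), "dest".to_string(), true);
/// ```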
pub fn setup_comms(
    remote_hostname: &str,
    remote_user: &str,
    remote_port_for_comms: Option<u16>,
    debug_name: String,
    force_redeploy: bool,
) -> Option<Comms> {
    profile_this!(format!("setup_comms {}", debug_name));
    debug!(
        "setup_comms with hostname '{}' and username '{}'. debug_name = {}",
        remote_hostname, remote_user, debug_name
    );

    // If the target is local, then start a thread to handle commands.
    // Use a separate thread to avoid synchronisation with the Boss (and both Source and Dest may be on same PC, so all three in one process),
    // and for consistency with remote doers.
    if remote_hostname.is_empty() {
        debug!("Spawning local thread for {} doer", debug_name);
        let debug_name = "Local ".to_string() + &debug_name + " doer";
        let (command_sender, command_receiver) = memory_bound_channel::new(BOSS_DOER_CHANNEL_MEMORY_CAPACITY);
        let (response_sender, response_receiver) = memory_bound_channel::new(BOSS_DOER_CHANNEL_MEMORY_CAPACITY);
        let thread_builder = thread::Builder::new().name(debug_name.clone());
        let thread = thread_builder.spawn(move || {
            doer_thread_running_on_boss(command_receiver, response_sender)
        }).unwrap();
        return Some(Comms::Local {
            debug_name,
            thread,
            sender: command_sender,
            receiver: response_receiver,
        });
    }

    // We first attempt to run a previously-deployed copy of the program on the remote, to save time.
    // If it exists and is a compatible version, we can use that. Otherwise we deploy a new version
    // and try again.
    let mut deploy = force_redeploy;
    for attempt in 0..2 {
        if deploy {
            if deploy_to_remote(remote_hostname, remote_user).is_ok() {
                debug!("Successfully deployed, attempting to run again");
            } else {
                error!("Failed to deploy to remote");
                return None;
            }
        }

        match launch_doer_via_ssh(remote_hostname, remote_user, remote_port_for_comms) {
            SshDoerLaunchResult::FailedToRunSsh => {
                return None; // No point trying again. launch_doer_via_ssh will have logged the error already.
            }
            SshDoerLaunchResult::NotPresentOnRemote
            | SshDoerLaunchResult::HandshakeIncompatibleVersion
                if attempt == 0 =>
            {
                deploy = true; // Will attempt to deploy on next loop iteration
            }
            SshDoerLaunchResult::NotPresentOnRemote
            | SshDoerLaunchResult::HandshakeIncompatibleVersion => {
                // If this happens on the second attempt then something is wrong
                error!("Failed to launch remote doer even after new deployment.");
                return None;
            }
            SshDoerLaunchResult::ExitedUnexpectedly | SshDoerLaunchResult::CommunicationError => {
                // No point trying again. launch_doer_via_ssh will have logged the error already.
                return None;
            }
            SshDoerLaunchResult::Success {
                ssh_process,
                stdin,
                stdout,
                stderr,
                secret_key,
                actual_port,
            } => {
                // Start a background thread to print out log messages from the remote doer,
                // which it can send over its stderr.
                let debug_name_clone = debug_name.clone();
                let stderr_reading_thread = std::thread::spawn(move || remote_doer_logging_thread(stderr, debug_name_clone));

                // Connect to the network port that the doer should be listening on
                let addr = (remote_hostname, actual_port);
                debug!("Connecting to doer over network at {:?}", addr);
                let tcp_connection = {
                    profile_this!("Connecting");
                    match TcpStream::connect(addr) {
                        Ok(t) => {
                            debug!("Connected! {:?}", t);
                            t
                        }
                        Err(e) => {
                            error!("Failed to connect to network port: {}", e);
                            return None;
                        }
                    }
                };

                let debug_comms_name = "Remote ".to_string() + &debug_name;
                return Some(Comms::Remote {
                    debug_name: debug_comms_name.clone(),
                    ssh_process,
                    stdin,
                    stdout,
                    stderr_reading_thread,
                    encrypted_comms: AsyncEncryptedComms::new(
                        tcp_connection,
                        secret_key,
                        0, // Nonce counters must be different, so that the sender and receiver don't reuse nonces
                        1,
                        ("boss", &debug_comms_name)
                    )
                });
            }
        };
    }
    panic!("Unreachable code");
}

fn remote_doer_logging_thread(mut stderr: BufReader<ChildStderr>, debug_name: String) {
    loop {
        let mut l: String = "".to_string();
        match stderr.read_line(&mut l) {
            Ok(0) => break, // end of stream
            Ok(_) => {
                l.pop(); // Remove the trailing newline
                // Use a custom target to indicate this is from a remote doer in the log output
                // Preserve the log level of the remote messages if possible
                match &l.splitn(4, ' ').collect::<Vec<&str>>()[..] {
                    [timestamp, level_str, target, msg] => {
                        let target = format!("remote {debug_name}: {target}");
                        match log::Level::from_str(level_str) {
                            Ok(level) => log!(target: &target, level, "{} {}", timestamp, msg),
                            Err(_) => debug!(target: &target, "{}", l),
                        }
                    }
                    _ => debug!(target: &format!("remote {debug_name}"), "{}", l),
                }
            }
            Err(_) => break,
        }
    }
}

/// Result of the `launch_doer_via_ssh` function.
enum SshDoerLaunchResult {
    /// The ssh process couldn't be started, for example because the ssh executable isn't available on the PATH.
    FailedToRunSsh,
    /// We connected to the remote computer, but couldn't launch rjrssync because it didn't exist.
    /// This would be expected if this computer has never been used as a remote target before.
    NotPresentOnRemote,
    /// ssh exited before the handshake with rjrssync took place. This could happen for many reasons,
    /// for example rjrssync couldn't launch correctly because it couldn't bind to a free port.
    ExitedUnexpectedly,
    /// Failed to send/receive some data on stdin/stdout while communicating with the doer.
    CommunicationError,
    /// rjrssync launched successfully on the remote computer, but it reported a version number that
    /// isn't compatible with our version.
    HandshakeIncompatibleVersion,
    /// rjrssync launched successfully on the remote computer, is a compatible version, and is now
    /// listening for an incoming network connection on `actual_port`. It has been provided
    /// with a secret shared key for encryption, which is stored here too.
    /// The fields here can be used to communicate with the remote rjrssync via its stdin/stdout,
    /// but we use the network connection for the main data transfer because it is faster (see README.md).
    Success {
        ssh_process: std::process::Child,
        stdin: LineWriter<ChildStdin>,
        stdout: BufReader<ChildStdout>,
        stderr: BufReader<ChildStderr>,
        secret_key: Key<Aes128Gcm>,
        actual_port: u16
    },
}

// Sent from the threads reading stdout and stderr of ssh back to the main thread.
enum OutputReaderThreadMsg {
    Line(String),
    Error(std::io::Error),
    StreamClosed,
    HandshakeStarted(String),
    HandshakeCompleted(String, OutputReaderStream), // Also sends back the stream, so the main thread can take back control
}

#[derive(Clone, Copy, PartialEq)]
enum OutputReaderStreamType {
    Stdout,
    Stderr,
}
impl Display for OutputReaderStreamType {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            OutputReaderStreamType::Stdout => write!(f, "stdout"),
            OutputReaderStreamType::Stderr => write!(f, "stderr"),
        }
    }
}

// To avoid having to use trait objects, this provides a simple abstraction over either stdout or stderr.
enum OutputReaderStream {
    Stdout(BufReader<ChildStdout>),
    Stderr(BufReader<ChildStderr>),
}
impl OutputReaderStream {
    fn get_type(&self) -> OutputReaderStreamType {
        match self {
            OutputReaderStream::Stdout(_) => OutputReaderStreamType::Stdout,
            OutputReaderStream::Stderr(_) => OutputReaderStreamType::Stderr,
        }
    }
    fn read_line(&mut self, buf: &mut String) -> std::io::Result<usize> {
        match self {
            OutputReaderStream::Stdout(b) => b.read_line(buf),
            OutputReaderStream::Stderr(b) => b.read_line(buf),
        }
    }
}

// Generic thread function for reading from stdout or stderr of the ssh process.
// It sends messages back to the main thread using a channel.
// We need to handle both in the same way - waiting until we complete the handshake indicating that the doer
// copy has started up correctly. We need a handshake on both threads so that each background thread knows when
// it's time to finish and return control of the stream to the main thread.
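//
// As a sketch: the doer announces itself with a line starting with HANDSHAKE_STARTED_MSG followed by its
// version number, and later a line starting with HANDSHAKE_COMPLETED_MSG followed by the TCP port it is
// listening on (both constants are defined elsewhere in the crate). launch_doer_via_ssh parses the version
// and port number from the remainder of each line.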
fn output_reader_thread_main(
    mut stream: OutputReaderStream,
    sender: Sender<(OutputReaderStreamType, OutputReaderThreadMsg)>,
) -> Result<(), SendError<(OutputReaderStreamType, OutputReaderThreadMsg)>> {
    let stream_type = stream.get_type();
    loop {
        let mut l: String = "".to_string();
        // Note we ignore errors on the sender here, as the other end should never have been dropped while it still cares
        // about our messages, but it may have been dropped if the main thread abandons the ssh process, letting it finish by itself.
        match stream.read_line(&mut l) {
            Err(e) => {
                sender.send((stream_type, OutputReaderThreadMsg::Error(e)))?;
                return Ok(());
            }
            Ok(0) => {
                // end of stream
                sender.send((stream_type, OutputReaderThreadMsg::StreamClosed))?;
                return Ok(());
            }
            Ok(_) => {
                l.pop(); // Remove the trailing newline
                if l.starts_with(HANDSHAKE_STARTED_MSG) {
                    // Check the version first before passing the secret, so that we can stop
                    // if it's the wrong version (as the secret exchange protocol may have changed!)
                    sender.send((
                        stream_type,
                        OutputReaderThreadMsg::HandshakeStarted(l),
                    ))?;
                } else if l.starts_with(HANDSHAKE_COMPLETED_MSG) {
                    // Remote end has booted up properly and is ready for network connection.
                    // Finish this thread and return control of the stdout to the main thread, so it can communicate directly
                    sender.send((
                        stream_type,
                        OutputReaderThreadMsg::HandshakeCompleted(l, stream),
                    ))?;
                    return Ok(());
                } else {
                    // A line of other content, for example a prompt or error from ssh itself
                    sender.send((stream_type, OutputReaderThreadMsg::Line(l)))?;
                }
            }
        }
    }
}

/// Attempts to launch a remote copy of rjrssync on the given remote computer using ssh.
/// Additionally checks that the remote doer is a compatible version, and is now
/// listening for an incoming network connection on the requested port. It is also provided
/// with a randomly generated secret shared key for encryption, which is returned to the caller
/// for setting up encrypted communication over the network connection.
fn launch_doer_via_ssh(remote_hostname: &str, remote_user: &str, remote_port_for_comms: Option<u16>) -> SshDoerLaunchResult {
    profile_this!();
    let user_prefix = if remote_user.is_empty() {
        "".to_string()
    } else {
        remote_user.to_string() + "@"
    };

    // Forward our logging configuration to the remote doer, so that our logging levels are in sync.
    // Note that forwarding it as an env var is more complicated on Windows (no "ENV=VALUE cmd" syntax), so
    // we use a command-line arg instead.
    // We need to forward both RUST_LOG and also any command-line setting (--quiet/--verbose flags).
    // Unfortunately there isn't a way to reconstruct a log filter string from the current config,
    // so we have to do it manually.
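    // For example, if RUST_LOG=debug is set locally we forward it verbatim as " --log-filter debug ";
    // otherwise we pass the current max level, e.g. " --log-filter INFO " (illustrative value).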
    let log_arg = if let Ok(l) = std::env::var("RUST_LOG") {
        format!(" --log-filter {} ", l)
    } else {
        format!(" --log-filter {} ", log::max_level()) // max_level will be affected by --quiet/--verbose
    };

    // Forward any specific port request to the remote doer
    let port_arg = match remote_port_for_comms {
        Some(p) => format!(" --port {p}"),
        None => "".to_string()
    };

    // Forward memory dumping flag to the remote doer
    let memory_dump_arg = match std::env::var("RJRSSYNC_TEST_DUMP_MEMORY_USAGE") {
        Ok(_) => " --dump-memory-usage".to_string(),
        Err(_) => "".to_string()
    };

    // Note we don't cd, so that relative paths for the path specified by the user on the remote
    // will be correct (relative to their ssh default dir, e.g. home dir)
    let doer_args = format!("--doer {} {} {}", log_arg, port_arg, memory_dump_arg);
    // Try launching using both Unix and Windows paths, as we don't know what the remote system is
    // We run a command that doesn't print out anything on both Windows and Linux, so we don't pollute the output
    // (we show all output from ssh, in case it contains prompts etc. that are useful/required for the user to see).
    // Note the \n to send a two-line command - it seems Windows ignores this, but Linux runs it.
    let windows_command = format!("{}rjrssync\\target\\release\\rjrssync.exe {}", REMOTE_TEMP_WINDOWS, doer_args);
    let unix_command = format!("{}rjrssync/target/release/rjrssync {}", REMOTE_TEMP_UNIX, doer_args);
    let remote_command = format!("echo >/dev/null # >nul & {windows_command}\n{unix_command}");
    debug!("Running remote command: {}", remote_command);
    // Note we use the user's existing ssh tool so that their config/settings will be used for
    // logging in to the remote system (as opposed to using an ssh library called from our code).
    let mut ssh_process = match std::process::Command::new("ssh")
        .arg(user_prefix + remote_hostname)
        .arg(remote_command)
        // Note that even though we're piping stdin, ssh still seems able to accept answers to prompts about
        // host key verification and password input somehow.
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
    {
        Ok(c) => c,
        Err(e) => {
            error!("Error launching ssh: {}", e);
            return SshDoerLaunchResult::FailedToRunSsh;
        }
    };

    // Some of the output from ssh (errors etc.) comes on stderr, so we need to display both stdout and stderr
    // to the user, before the handshake is established.
    debug!("Waiting for remote copy to send version handshake");
    let mut handshake_start_timer = Some(start_timer("Waiting for handshake start"));

    // unwrap is fine here, as the streams should always be available because we piped them all
    let mut ssh_stdin = LineWriter::new(ssh_process.stdin.take().unwrap());
    let ssh_stdout = ssh_process.stdout.take().unwrap();
    let ssh_stderr = ssh_process.stderr.take().unwrap();

    // Spawn a background thread for each stdout and stderr, to process messages we get from ssh
    // and forward them to the main thread. This is easier than some kind of async IO stuff.
    #[allow(clippy::type_complexity)]
    let (sender1, receiver): (
        Sender<(OutputReaderStreamType, OutputReaderThreadMsg)>,
        Receiver<(OutputReaderStreamType, OutputReaderThreadMsg)>,
    ) = mpsc::channel();
    let sender2 = sender1.clone();
    let thread_builder = thread::Builder::new().name("ssh_stdout_reader".to_string());
    thread_builder.spawn(move || {
        output_reader_thread_main(
            OutputReaderStream::Stdout(BufReader::new(ssh_stdout)),
            sender1,
        )
    }).unwrap();
    let thread_builder = thread::Builder::new().name("ssh_stderr_reader".to_string());
    thread_builder.spawn(move || {
        output_reader_thread_main(
            OutputReaderStream::Stderr(BufReader::new(ssh_stderr)),
            sender2,
        )
    }).unwrap();

    // Wait for messages from the background threads which are reading stdout and stderr
    #[derive(Default)]
    struct HandshookStdoutAndStderr {
        stdout: Option<BufReader<ChildStdout>>,
        stderr: Option<BufReader<ChildStderr>>,
        secret_key: Option<Key<Aes128Gcm>>,
    }
    let mut handshook_data = HandshookStdoutAndStderr::default();
    let mut _handshaking_timer = None;
    loop {
        match receiver.recv() {
            Ok((stream_type, OutputReaderThreadMsg::Line(l))) => {
                // Show ssh output to the user, as this might be useful/necessary
                info!("ssh {}: {}", stream_type, l);
                // Check for both the Linux (bash) and Windows (cmd) errors
                if l.contains("No such file or directory") ||
                    l.contains("The system cannot find the path specified") ||
                    l.contains("is not recognized as an internal or external command") {
                    debug!("rjrssync not present on remote computer");
                    // Note the stdin of the ssh will be dropped and this will tidy everything up nicely
                    return SshDoerLaunchResult::NotPresentOnRemote;
                }
            }
            Ok((stream_type, OutputReaderThreadMsg::HandshakeStarted(line))) => {
                handshake_start_timer.take();
                _handshaking_timer = Some(start_timer("Handshaking"));
                debug!("Handshake started on {}: {}", stream_type, line);

                let remote_version = line.split_at(HANDSHAKE_STARTED_MSG.len()).1;
                if remote_version != VERSION.to_string() {
                    debug!(
                        "Remote server has incompatible version ({} vs local version {})",
                        remote_version, VERSION
                    );
                    // Note the stdin of the ssh will be dropped and this will tidy everything up nicely
                    return SshDoerLaunchResult::HandshakeIncompatibleVersion;
                }

                // Generate and send a secret key, so that we can authenticate/encrypt the network connection
                // Only do this once (when stdout has passed the version check, not on stderr too)
                if stream_type == OutputReaderStreamType::Stdout {
                    debug!("Sending secret key");
                    // Note that we generate a new key for each doer, otherwise the nonces would be re-used with the same key
                    let key = Aes128Gcm::generate_key(&mut OsRng);
                    let mut msg = base64::encode(key).as_bytes().to_vec();
                    msg.push(b'\n');
                    if let Err(e) = ssh_stdin.write_all(&msg) {
                        error!("Failed to send secret: {}", e);
                        return SshDoerLaunchResult::CommunicationError;
                    }
                    handshook_data.secret_key = Some(key); // Remember the key - we'll need it too!
                }
            }
            Ok((stream_type, OutputReaderThreadMsg::HandshakeCompleted(line, s))) => {
                debug!("Handshake completed on {}: {}", stream_type, line);
                match s {
                    OutputReaderStream::Stdout(b) => handshook_data.stdout = Some(b),
                    OutputReaderStream::Stderr(b) => handshook_data.stderr = Some(b),
                }

                let actual_port : u16 = match line.split_at(HANDSHAKE_COMPLETED_MSG.len()).1.parse() {
                    Ok(p) => p,
                    Err(e) => {
                        error!("Failed to parse port number from line '{}': {e}", line);
                        return SshDoerLaunchResult::CommunicationError;
                    } 
                };

                // Need to wait for both stdout and stderr to pass the handshake
                if let HandshookStdoutAndStderr { stdout: Some(stdout), stderr: Some(stderr), secret_key: Some(secret_key) } = handshook_data {
                    return SshDoerLaunchResult::Success {
                        ssh_process,
                        stdin: ssh_stdin,
                        stdout,
                        stderr,
                        secret_key,
                        actual_port,
                    };
                };
            }
            Ok((stream_type, OutputReaderThreadMsg::Error(e))) => {
                error!("Error reading from {}: {}", stream_type, e);
                return SshDoerLaunchResult::CommunicationError;
            }
            Ok((stream_type, OutputReaderThreadMsg::StreamClosed)) => {
                debug!("ssh {} closed", stream_type);
            }
            Err(RecvError) => {
                // Both senders have been dropped, i.e. both background threads exited, before
                // we got any expected error (e.g. "No such file or directory"), so we don't know
                // why it exited.
                debug!("Both reader threads done, ssh must have exited. Waiting for process.");
                // Wait for the process to exit, for tidyness
                let result = ssh_process.wait();
                error!("ssh exited unexpectedly with {:?}", result);
                return SshDoerLaunchResult::ExitedUnexpectedly;
            }
        }
    }
}

// This embeds the source code of the program into the executable, so it can be deployed remotely and built on other platforms.
// We only include the src/ folder using this crate, and the Cargo.toml/lock files are included separately.
// This is because the RustEmbed crate doesn't cope well with including the whole repository and then filtering out the
// bits we don't want (e.g. target/, .git/): it walks the entire directory structure before filtering, which means
// that it looks through all the .git and target folders first, which is slow and error prone (intermittent errors possibly
// due to files being deleted partway through the build).
#[derive(RustEmbed)]
#[folder = "src/"]
#[prefix = "src/"] // So that these files are placed in a src/ subfolder when extracted
#[exclude = "bin/*"] // No need to copy testing code
struct EmbeddedSrcFolder;

const EMBEDDED_CARGO_TOML: &str = include_str!("../Cargo.toml");
const EMBEDDED_CARGO_LOCK: &[u8] = include_bytes!("../Cargo.lock");

/// Deploys the source code of rjrssync to the given remote computer and builds it, ready to be executed.
fn deploy_to_remote(remote_hostname: &str, remote_user: &str) -> Result<(), ()> {
    // We're about to show a bunch of output from scp/ssh, so this log message might as well be at the same severity,
    // so that the user knows what's happening.
    info!("Deploying onto '{}'", &remote_hostname); 
    profile_this!();

    let user_prefix = if remote_user.is_empty() {
        "".to_string()
    } else {
        remote_user.to_string() + "@"
    };

    // Copy our embedded source tree to the remote, so we can build it there.
    // (we can't simply copy the binary as it might not be compatible with the remote platform)
    // We use the user's existing ssh/scp tool so that their config/settings will be used for
    // logging in to the remote system (as opposed to using an ssh library called from our code).

    // Extract embedded source code to a temporary local folder
    let local_temp_dir = match TempDir::new("rjrssync") {
        Ok(x) => x,
        Err(e) => {
            error!("Error creating temp dir: {}", e);
            return Err(());
        }
    };
    debug!(
        "Extracting embedded source to local temp dir: {}",
        local_temp_dir.path().display()
    );
    // Add Cargo.lock/Cargo.toml to the list of files we're about to extract.
    // These aren't included in the src/ folder (see EmbeddedSrcFolder comments for reason)
    let mut all_files_iter: Vec<(Cow<'static, str>, Cow<[u8]>)> = EmbeddedSrcFolder::iter()
        .map(|p| (p.clone(), EmbeddedSrcFolder::get(&p).unwrap().data))
        .collect();
    // Cargo.toml has some special processing to remove lines that aren't relevant for remotely deployed
    // copies.
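    // For example, a (hypothetical) Cargo.toml snippet like:
    //   #if NonRemote
    //   [dependencies]
    //   some-local-only-crate = "1.0"
    //   #end
    // is stripped entirely (markers included) from the copy that gets deployed to the remote.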
    let mut processed_cargo_toml: Vec<u8> = vec![];
    let mut in_non_remote_block = false;
    for line in EMBEDDED_CARGO_TOML.lines() {
        match line {
            "#if NonRemote" => in_non_remote_block = true,
            "#end" => in_non_remote_block = false,
            l => if !in_non_remote_block {
                processed_cargo_toml.extend_from_slice(l.as_bytes());
                processed_cargo_toml.push(b'\n');
            }
        }
    }
    all_files_iter.push((Cow::from("Cargo.toml"), Cow::from(processed_cargo_toml)));
    all_files_iter.push((Cow::from("Cargo.lock"), Cow::from(EMBEDDED_CARGO_LOCK)));
    for (path, contents) in all_files_iter {
        // Add an extra "rjrssync" folder with a fixed name (as opposed to the temp dir, whose name varies), to work around SCP weirdness below.
        let local_temp_path = local_temp_dir.path().join("rjrssync").join(&*path);

        if let Err(e) = std::fs::create_dir_all(local_temp_path.parent().unwrap()) {
            error!("Error creating folders for local temp file '{}': {}", local_temp_path.display(), e);
            return Err(());
        }

        let mut f = match std::fs::File::create(&local_temp_path) {
            Ok(x) => x,
            Err(e) => {
                error!(
                    "Error creating local temp file {}: {}",
                    local_temp_path.display(),
                    e
                );
                return Err(());
            }
        };

        if let Err(e) = f.write_all(&contents) {
            error!("Error writing local temp file: {}", e);
            return Err(());
        }
    }

    // Determine if the target system is Windows or Linux, so that we know where to copy our files to
    // We run a command that doesn't print out anything on both Windows and Linux, so we don't pollute the output
    // (we show all output from ssh, in case it contains prompts etc. that are useful/required for the user to see).
    // Note the \n to send a two-line command - it seems Windows ignores this, but Linux runs it.
    let remote_command = format!("echo >/dev/null # >nul & echo This is a Windows system\necho This is a Linux system");
    debug!("Running remote command: {}", remote_command);
    let os_test_output = match run_process_with_live_output("ssh", &[user_prefix.clone() + remote_hostname, remote_command.to_string()]) {
        Err(e) => {
            error!("Error running ssh: {}", e);
            return Err(());
        }
        Ok(output) if output.exit_status.success() => output.stdout,
        Ok(output) => {
            error!("Error checking remote OS. Exit status from ssh: {}", output.exit_status);
            return Err(());
        }
    };

    // We could check for "linux" in the string, but there are other Unix systems we might want to support e.g. Mac,
    // so we fall back to Linux as a default.
    let is_windows = os_test_output.contains("Windows");

    // Deploy to remote target using scp
    // Note we need to deal with both the case where the remote folder doesn't exist and the case where it does, so
    // we copy into /tmp (which should always exist), rather than directly to /tmp/rjrssync, which may or may not exist.
    let source_spec = local_temp_dir.path().join("rjrssync");
    let remote_temp = if is_windows {
        REMOTE_TEMP_WINDOWS
    } else {
        REMOTE_TEMP_UNIX
    };
    let remote_spec = format!("{user_prefix}{remote_hostname}:{remote_temp}");
    debug!("Copying {} to {}", source_spec.display(), remote_spec);
    match run_process_with_live_output("scp", &[OsStr::new("-r"), source_spec.as_os_str(), OsStr::new(&remote_spec)]) {
        Err(e) => {
            error!("Error running scp: {}", e);
            return Err(());
        }
        Ok(s) if s.exit_status.success() => {
            // Good!
        }
        Ok(s) => {
            error!("Error copying source code. Exit status from scp: {}", s.exit_status);
            return Err(());
        }
    };

    // Build the program remotely (using the cargo on the remote system)
    // Note that we could merge this ssh command with the one to run the program once it's built (in launch_doer_via_ssh),
    // but this would make error reporting slightly more difficult: the command in launch_doer_via_ssh is trickier because
    // we parse its stdout, whereas for the command here we can simply wait for it to finish.
    let cargo_command = format!("cargo build --release{}",
        if cfg!(feature="profiling") {
            " --features=profiling" // If this is a profiling build, then turn on profiling on the remote side too
        } else {
            ""
        });
    let remote_command = if is_windows {
        format!("cd /d {REMOTE_TEMP_WINDOWS}\\rjrssync && {cargo_command}")
    } else {
        // Attempt to load .profile first, as cargo might not be on the PATH otherwise.
        // Still continue even if this fails, as it might not be available on this system.
        // Note that the previous attempt to do this (using $SHELL -lc '...') wasn't as good, as it runs .bashrc,
        // which is only meant for interactive shells, whereas .profile is meant for scripts too.
        format!("source ~/.profile; cd {REMOTE_TEMP_UNIX}/rjrssync && {cargo_command}")
    };
    debug!("Running remote command: {}", remote_command);
    match run_process_with_live_output("ssh", &[user_prefix + remote_hostname, remote_command]) {
        Err(e) => {
            error!("Error running ssh: {}", e);
            return Err(());
        }
        Ok(s) if s.exit_status.success() => {
            // Good!
        }
        Ok(s) => {
            error!("Error building on remote. Exit status from ssh: {}", s.exit_status);
            return Err(());
        }
    };

    Ok(())
}

struct ProcessOutput {
    exit_status: std::process::ExitStatus,
    stdout: String,
    #[allow(unused)]
    stderr: String,
}

/// Runs a child process and waits for it to exit. The stdout and stderr of the child process
/// are captured and forwarded to our own, with a prefix to indicate that they're from the child.
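///
/// Illustrative usage (a sketch; assumes `ssh` is on the PATH):
/// ```ignore
/// let output = run_process_with_live_output("ssh", &["user@host", "uname -a"])?;
/// if output.exit_status.success() {
///     println!("Remote uname: {}", output.stdout);
/// }
/// ```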
// We capture and then forward stdout and stderr, so the user can see what's happening and if there are any errors.
// Simply letting the child process inherit our stdout/stderr seems to cause problems with line endings getting messed
// up and losing output, and unwanted clearing of the screen.
// This is especially important if using --force-redeploy on a broken remote, as you don't see any errors from the initial
// attempt to connect either.
fn run_process_with_live_output<I, S>(program: &str, args: I) -> Result<ProcessOutput, String>
where
    I: IntoIterator<Item = S>,
    S: AsRef<OsStr>
{
    let mut c = std::process::Command::new(program);
    let c = c.args(args);
    debug!("Running {:?} {:?}...", c.get_program(), c.get_args());

    let mut child = match c
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
    {
        Ok(c) => c,
        Err(e) => return Err(format!("Error launching {}: {}", program, e)),
    };

    // unwrap is fine here, as the streams should always be available because we piped them all
    let child_stdout = child.stdout.take().unwrap();
    let child_stderr = child.stderr.take().unwrap();

    // Spawn a background thread for each stdout and stderr, to process messages we get from the child
    // and forward them to the main thread. This is easier than some kind of async IO stuff.
    let (sender1, receiver): (
        Sender<OutputReaderThreadMsg2>,
        Receiver<OutputReaderThreadMsg2>,
    ) = mpsc::channel();
    let sender2 = sender1.clone();
    let thread_builder = thread::Builder::new().name("child_stdout_reader".to_string());
    thread_builder.spawn(move || {
        output_reader_thread_main2(child_stdout, OutputReaderStreamType::Stdout, sender1)
    }).unwrap();
    let thread_builder = thread::Builder::new().name("child_stderr_reader".to_string());
    thread_builder.spawn(move || {
        output_reader_thread_main2(child_stderr, OutputReaderStreamType::Stderr, sender2)
    }).unwrap();

    let mut captured_stdout = String::new();
    let mut captured_stderr = String::new();
    loop {
        match receiver.recv() {
            Ok(OutputReaderThreadMsg2::Line(stream_type, l)) => {
                // Show output to the user, as this might be useful/necessary
                info!("{} {}: {}", program, stream_type, l);
                match stream_type {
                    OutputReaderStreamType::Stdout => captured_stdout += &(l + "\n"),
                    OutputReaderStreamType::Stderr => captured_stderr += &(l + "\n"),
                }
            }
            Ok(OutputReaderThreadMsg2::Error(stream_type, e)) => {
                return Err(format!("Error reading from {} {}: {}", program, stream_type, e));
            }
            Ok(OutputReaderThreadMsg2::StreamClosed(stream_type)) => {
                debug!("{} {} closed", program, stream_type);
            }
            Err(RecvError) => {
                // Both senders have been dropped, i.e. both background threads exited
                debug!("Both reader threads done, child process must have exited. Waiting for process.");
                // Wait for the process to exit, to get the exit code
                let result = match child.wait() {
                    Ok(r) => r,
                    Err(e) => return Err(format!("Error waiting for {}: {}", program, e)),
                };
                return Ok (ProcessOutput { exit_status: result, stdout: captured_stdout, stderr: captured_stderr });
            }
        }
    }
}

// Sent from the threads reading stdout and stderr of a child process back to the main thread.
enum OutputReaderThreadMsg2 {
    Line(OutputReaderStreamType, String),
    Error(OutputReaderStreamType, std::io::Error),
    StreamClosed(OutputReaderStreamType),
}

fn output_reader_thread_main2<S>(
    stream: S,
    stream_type: OutputReaderStreamType,
    sender: Sender<OutputReaderThreadMsg2>,
) -> Result<(), SendError<OutputReaderThreadMsg2>>
where S : std::io::Read {
    let mut reader = BufReader::new(stream);
    loop {
        let mut l: String = "".to_string();
        // Note we ignore errors on the sender here, as the other end should never have been dropped while it still cares
        // about our messages, but it may have been dropped if the main thread abandons the child process, letting it finish by itself.
        match reader.read_line(&mut l) {
            Err(e) => {
                sender.send(OutputReaderThreadMsg2::Error(stream_type, e))?;
                return Ok(());
            }
            Ok(0) => {
                // end of stream
                sender.send(OutputReaderThreadMsg2::StreamClosed(stream_type))?;
                return Ok(());
            }
            Ok(_) => {
                l.pop(); // Remove the trailing newline
                // A line of other content, for example a prompt or error from the child process itself
                sender.send(OutputReaderThreadMsg2::Line(stream_type, l))?;
            }
        }
    }
}