rmux-client 0.1.2

Blocking local client and attach-mode plumbing for the RMUX terminal multiplexer.
Documentation
#![allow(dead_code)]

use std::error::Error;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

use rmux_proto::SessionName;
use rmux_server::{DaemonConfig, ServerDaemon};
use tokio::runtime::Builder;
use tokio::sync::oneshot;

static UNIQUE_ID: AtomicUsize = AtomicUsize::new(0);
const SERVER_READY_TIMEOUT: Duration = Duration::from_secs(5);

pub(crate) fn start_server(harness: &TestHarness) -> Result<TestServer, Box<dyn Error>> {
    TestServer::start(harness.socket_path().to_path_buf())
}

pub(crate) fn session_name(value: &str) -> SessionName {
    SessionName::new(value).expect("valid session name")
}

pub(crate) struct TestServer {
    shutdown: Option<oneshot::Sender<()>>,
    thread: Option<thread::JoinHandle<Result<(), String>>>,
}

impl TestServer {
    fn start(socket_path: PathBuf) -> Result<Self, Box<dyn Error>> {
        let (ready_tx, ready_rx) = mpsc::channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let thread = thread::spawn(move || -> Result<(), String> {
            let runtime = Builder::new_multi_thread()
                .worker_threads(2)
                .enable_all()
                .build()
                .map_err(|error| error.to_string())?;

            runtime.block_on(async move {
                let handle = match ServerDaemon::new(DaemonConfig::new(socket_path))
                    .bind()
                    .await
                {
                    Ok(handle) => handle,
                    Err(error) => {
                        let message = error.to_string();
                        let _ = ready_tx.send(Err(message.clone()));
                        return Err(message);
                    }
                };

                if ready_tx.send(Ok(())).is_err() {
                    return handle.shutdown().await.map_err(|error| error.to_string());
                }

                let _ = shutdown_rx.await;
                handle.shutdown().await.map_err(|error| error.to_string())
            })
        });

        match ready_rx.recv_timeout(SERVER_READY_TIMEOUT) {
            Ok(Ok(())) => Ok(Self {
                shutdown: Some(shutdown_tx),
                thread: Some(thread),
            }),
            Ok(Err(message)) => {
                join_server_thread(thread)?;
                Err(io::Error::other(message).into())
            }
            Err(error) => {
                let _ = shutdown_tx.send(());
                let _ = join_server_thread(thread);
                Err(io::Error::new(
                    io::ErrorKind::TimedOut,
                    format!("server failed to become ready: {error}"),
                )
                .into())
            }
        }
    }

    pub(crate) fn shutdown(&mut self) -> Result<(), Box<dyn Error>> {
        if let Some(shutdown) = self.shutdown.take() {
            let _ = shutdown.send(());
        }

        if let Some(thread) = self.thread.take() {
            join_server_thread(thread)?;
        }

        Ok(())
    }
}

impl Drop for TestServer {
    fn drop(&mut self) {
        let _ = self.shutdown();
    }
}

pub(crate) struct TestHarness {
    root: PathBuf,
    socket_path: PathBuf,
}

impl TestHarness {
    pub(crate) fn new(label: &str) -> Self {
        let unique_id = UNIQUE_ID.fetch_add(1, Ordering::Relaxed);
        let root = PathBuf::from("/tmp").join(format!(
            "rxc-{}-{unique_id}-{}",
            std::process::id(),
            compact_label(label)
        ));
        let socket_path = root.join("s.sock");

        Self { root, socket_path }
    }

    pub(crate) fn socket_path(&self) -> &Path {
        &self.socket_path
    }
}

impl Drop for TestHarness {
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.socket_path);
        let _ = std::fs::remove_dir_all(&self.root);
    }
}

fn compact_label(label: &str) -> String {
    label
        .chars()
        .filter(|character| character.is_ascii_alphanumeric())
        .take(12)
        .collect()
}

fn join_server_thread(
    thread: thread::JoinHandle<Result<(), String>>,
) -> Result<(), Box<dyn Error>> {
    thread
        .join()
        .map_err(|_| io::Error::other("server thread panicked"))?
        .map_err(io::Error::other)?;
    Ok(())
}