rmux-client 0.1.1

Blocking local client and attach-mode plumbing for the RMUX terminal multiplexer.
Documentation
use std::io;
use std::os::unix::net::UnixStream;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

use super::{ensure_server_running_with_probe, AutoStartError};
use crate::{ClientError, ConnectResult, Connection};

#[test]
fn auto_start_returns_existing_connection_without_launching() {
    let launch_calls = AtomicUsize::new(0);
    let mut connect = || -> Result<ConnectResult, ClientError> {
        let (client, _server) = UnixStream::pair().expect("create unix stream pair");
        let connection = Connection::new(client).expect("connection with timeout");
        Ok(ConnectResult::Connected(connection))
    };
    let mut launch = || -> Result<(), AutoStartError> {
        launch_calls.fetch_add(1, Ordering::Relaxed);
        Ok(())
    };

    let result = ensure_server_running_with_probe(
        PathBuf::from("/tmp/rmux-auto-start-existing.sock").as_path(),
        Duration::from_millis(10),
        Duration::from_millis(1),
        &mut connect,
        &mut launch,
        |_| Ok(()),
    );

    assert!(result.is_ok(), "connected server should be returned");
    assert_eq!(launch_calls.load(Ordering::Relaxed), 0);
}

#[test]
fn auto_start_launches_then_polls_until_connected() {
    let connect_calls = AtomicUsize::new(0);
    let launch_calls = AtomicUsize::new(0);
    let mut connect = || -> Result<ConnectResult, ClientError> {
        let call = connect_calls.fetch_add(1, Ordering::Relaxed);
        if call < 3 {
            return Ok(ConnectResult::Absent);
        }

        let (client, _server) = UnixStream::pair().expect("create unix stream pair");
        let connection = Connection::new(client).expect("connection with timeout");
        Ok(ConnectResult::Connected(connection))
    };
    let mut launch = || -> Result<(), AutoStartError> {
        launch_calls.fetch_add(1, Ordering::Relaxed);
        Ok(())
    };

    let result = ensure_server_running_with_probe(
        PathBuf::from("/tmp/rmux-auto-start-poll.sock").as_path(),
        Duration::from_millis(50),
        Duration::from_millis(1),
        &mut connect,
        &mut launch,
        |_| Ok(()),
    );

    assert!(result.is_ok(), "poll loop should eventually connect");
    assert_eq!(launch_calls.load(Ordering::Relaxed), 1);
    assert!(
        connect_calls.load(Ordering::Relaxed) >= 4,
        "expected at least initial absent check plus polling retries"
    );
}

#[test]
fn auto_start_propagates_real_connect_errors_without_launching() {
    let launch_calls = AtomicUsize::new(0);
    let mut connect = || -> Result<ConnectResult, ClientError> {
        Err(ClientError::Io(io::Error::new(
            io::ErrorKind::PermissionDenied,
            "permission denied",
        )))
    };
    let mut launch = || -> Result<(), AutoStartError> {
        launch_calls.fetch_add(1, Ordering::Relaxed);
        Ok(())
    };

    let error = ensure_server_running_with_probe(
        PathBuf::from("/tmp/rmux-auto-start-error.sock").as_path(),
        Duration::from_millis(10),
        Duration::from_millis(1),
        &mut connect,
        &mut launch,
        |_| Ok(()),
    )
    .expect_err("real connect error should fail");

    assert!(matches!(
        error,
        AutoStartError::Client(ClientError::Io(ref io_error))
            if io_error.kind() == io::ErrorKind::PermissionDenied
    ));
    assert_eq!(launch_calls.load(Ordering::Relaxed), 0);
}

#[test]
fn auto_start_propagates_real_poll_errors_after_launch() {
    let call_count = AtomicUsize::new(0);
    let mut connect = || -> Result<ConnectResult, ClientError> {
        let call = call_count.fetch_add(1, Ordering::Relaxed);
        if call == 0 {
            return Ok(ConnectResult::Absent);
        }

        Err(ClientError::Io(io::Error::new(
            io::ErrorKind::BrokenPipe,
            "broken pipe",
        )))
    };
    let mut launch = || -> Result<(), AutoStartError> { Ok(()) };

    let error = ensure_server_running_with_probe(
        PathBuf::from("/tmp/rmux-auto-start-poll-error.sock").as_path(),
        Duration::from_millis(10),
        Duration::from_millis(1),
        &mut connect,
        &mut launch,
        |_| Ok(()),
    )
    .expect_err("poll error should fail");

    assert!(matches!(
        error,
        AutoStartError::Client(ClientError::Io(ref io_error))
            if io_error.kind() == io::ErrorKind::BrokenPipe
    ));
}

#[test]
fn auto_start_retries_transient_poll_errors_after_launch() {
    let call_count = AtomicUsize::new(0);
    let mut connect = || -> Result<ConnectResult, ClientError> {
        let call = call_count.fetch_add(1, Ordering::Relaxed);
        match call {
            0 => Ok(ConnectResult::Absent),
            1 | 2 => Err(ClientError::Io(io::Error::from(io::ErrorKind::WouldBlock))),
            _ => {
                let (client, _server) = UnixStream::pair().expect("create unix stream pair");
                let connection = Connection::new(client).expect("connection with timeout");
                Ok(ConnectResult::Connected(connection))
            }
        }
    };
    let mut launch = || -> Result<(), AutoStartError> { Ok(()) };

    let result = ensure_server_running_with_probe(
        PathBuf::from("/tmp/rmux-auto-start-would-block.sock").as_path(),
        Duration::from_millis(50),
        Duration::from_millis(1),
        &mut connect,
        &mut launch,
        |_| Ok(()),
    );

    assert!(result.is_ok(), "transient poll errors should keep polling");
    assert!(
        call_count.load(Ordering::Relaxed) >= 4,
        "expected absent, transient retries, then connected"
    );
}

#[test]
fn auto_start_waits_for_a_ready_response_after_connecting() {
    let connect_call_count = AtomicUsize::new(0);
    let probe_call_count = AtomicUsize::new(0);
    let mut connect = || -> Result<ConnectResult, ClientError> {
        let call = connect_call_count.fetch_add(1, Ordering::Relaxed);
        let (client, server) = UnixStream::pair().expect("create unix stream pair");
        match call {
            0 => Ok(ConnectResult::Absent),
            _ => {
                std::thread::spawn(move || {
                    drop(server);
                });
                let connection = Connection::new(client).expect("connection with timeout");
                Ok(ConnectResult::Connected(connection))
            }
        }
    };
    let mut launch = || -> Result<(), AutoStartError> { Ok(()) };
    let mut probe = |_: &mut Connection| -> Result<(), ClientError> {
        let call = probe_call_count.fetch_add(1, Ordering::Relaxed);
        if call == 0 {
            return Err(ClientError::Io(io::Error::from(io::ErrorKind::WouldBlock)));
        }
        Ok(())
    };

    let result = ensure_server_running_with_probe(
        PathBuf::from("/tmp/rmux-auto-start-ready.sock").as_path(),
        Duration::from_millis(50),
        Duration::from_millis(1),
        &mut connect,
        &mut launch,
        &mut probe,
    );

    assert!(
        result.is_ok(),
        "readiness probe should wait for a real response"
    );
    assert!(
        connect_call_count.load(Ordering::Relaxed) >= 3,
        "expected absent, unready connect, then ready connect"
    );
    assert!(
        probe_call_count.load(Ordering::Relaxed) >= 2,
        "expected an unready probe before the ready probe"
    );
}

#[test]
fn auto_start_times_out_if_server_never_appears() {
    let mut connect = || -> Result<ConnectResult, ClientError> { Ok(ConnectResult::Absent) };
    let mut launch = || -> Result<(), AutoStartError> { Ok(()) };
    let socket_path = PathBuf::from("/tmp/rmux-auto-start-timeout.sock");

    let error = ensure_server_running_with_probe(
        socket_path.as_path(),
        Duration::from_millis(10),
        Duration::from_millis(1),
        &mut connect,
        &mut launch,
        |_| Ok(()),
    )
    .expect_err("missing server should time out");

    assert!(matches!(
        error,
        AutoStartError::TimedOut {
            ref socket_path,
            waited
        } if socket_path == Path::new("/tmp/rmux-auto-start-timeout.sock")
            && waited == Duration::from_millis(10)
    ));
}

#[test]
fn auto_start_treats_competing_startup_success_as_connected() {
    let connect_results = Arc::new(Mutex::new(vec![
        Ok(ConnectResult::Absent),
        Ok(ConnectResult::Absent),
        Ok(ConnectResult::Connected(
            Connection::new(UnixStream::pair().expect("pair").0).expect("connection with timeout"),
        )),
    ]));
    let launch_calls = AtomicUsize::new(0);
    let connect_results_clone = Arc::clone(&connect_results);
    let mut connect = move || -> Result<ConnectResult, ClientError> {
        connect_results_clone
            .lock()
            .expect("lock results")
            .remove(0)
    };
    let mut launch = || -> Result<(), AutoStartError> {
        launch_calls.fetch_add(1, Ordering::Relaxed);
        Ok(())
    };

    let result = ensure_server_running_with_probe(
        PathBuf::from("/tmp/rmux-auto-start-race.sock").as_path(),
        Duration::from_millis(20),
        Duration::from_millis(1),
        &mut connect,
        &mut launch,
        |_| Ok(()),
    );

    assert!(
        result.is_ok(),
        "polling success should win even if another daemon bound first"
    );
    assert_eq!(launch_calls.load(Ordering::Relaxed), 1);
}