rmux-server 0.1.1

Tokio daemon and request dispatcher for the RMUX terminal multiplexer.
Documentation
#![cfg(unix)]

use std::error::Error;
use std::fs;
use std::io;
use std::path::Path;
use std::time::Duration;

mod common;

use common::{
    read_response_exact, send_request, session_name, start_server, ClientConnection, TestHarness,
};
use rmux_proto::{
    encode_frame, AttachSessionRequest, ErrorResponse, HookLifecycle, HookName, KillSessionRequest,
    NewSessionRequest, Request, Response, RmuxError, ScopeSelector, SetHookRequest, TerminalSize,
};
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use tokio::time::{sleep, timeout};

const ATTACH_SETTLE_DELAY: Duration = Duration::from_millis(50);
const STEP_TIMEOUT: Duration = Duration::from_millis(150);
const WAIT_TIMEOUT: Duration = Duration::from_secs(2);

#[tokio::test]
async fn persistent_client_attached_hooks_run_on_every_attach() -> Result<(), Box<dyn Error>> {
    let harness = TestHarness::new("persistent-client-attached-hook");
    let socket_path = harness.socket_path().to_path_buf();
    let handle = start_server(&harness).await?;
    let output_path = hook_output_path(&socket_path);

    create_session(&socket_path, "alpha").await?;
    register_hook(
        &socket_path,
        ScopeSelector::Global,
        "printf ab >> {path} && printf cd >> {path}",
        &output_path,
        HookLifecycle::Persistent,
    )
    .await?;

    attach_once(&socket_path, "alpha").await?;
    attach_once(&socket_path, "alpha").await?;
    wait_for_file_contents(&output_path, "abcdabcd").await?;

    handle.shutdown().await?;
    Ok(())
}

#[tokio::test]
async fn one_shot_client_attached_hooks_are_removed_after_dispatch() -> Result<(), Box<dyn Error>> {
    let harness = TestHarness::new("oneshot-client-attached-hook");
    let socket_path = harness.socket_path().to_path_buf();
    let handle = start_server(&harness).await?;
    let output_path = hook_output_path(&socket_path);

    create_session(&socket_path, "keepalive").await?;
    create_session(&socket_path, "alpha").await?;
    register_hook(
        &socket_path,
        ScopeSelector::Session(session_name("alpha")),
        "printf once >> {path}",
        &output_path,
        HookLifecycle::OneShot,
    )
    .await?;

    attach_once(&socket_path, "alpha").await?;
    attach_once(&socket_path, "alpha").await?;
    wait_for_file_contents(&output_path, "once").await?;
    sleep(Duration::from_millis(100)).await;
    assert_eq!(fs::read_to_string(&output_path)?, "once");

    handle.shutdown().await?;
    Ok(())
}

#[tokio::test]
async fn session_scoped_hooks_are_cleared_when_sessions_are_killed() -> Result<(), Box<dyn Error>> {
    let harness = TestHarness::new("session-hook-cleanup-on-kill");
    let socket_path = harness.socket_path().to_path_buf();
    let handle = start_server(&harness).await?;
    let output_path = hook_output_path(&socket_path);

    create_session(&socket_path, "keepalive")
        .await
        .map_err(|error| format!("create keepalive: {error}"))?;
    create_session(&socket_path, "alpha")
        .await
        .map_err(|error| format!("create alpha before hook: {error}"))?;
    register_hook(
        &socket_path,
        ScopeSelector::Session(session_name("alpha")),
        "printf stale >> {path}",
        &output_path,
        HookLifecycle::Persistent,
    )
    .await
    .map_err(|error| format!("register alpha hook: {error}"))?;
    kill_session(&socket_path, "alpha")
        .await
        .map_err(|error| format!("kill alpha: {error}"))?;
    create_session(&socket_path, "alpha")
        .await
        .map_err(|error| format!("recreate alpha: {error}"))?;

    attach_once(&socket_path, "alpha")
        .await
        .map_err(|error| format!("attach recreated alpha: {error}"))?;
    sleep(Duration::from_millis(100)).await;
    assert!(
        !output_path.exists(),
        "recreated sessions must not inherit prior session-scoped hooks"
    );

    handle.shutdown().await?;
    Ok(())
}

#[tokio::test]
async fn session_created_hooks_run_only_after_successful_creates() -> Result<(), Box<dyn Error>> {
    let harness = TestHarness::new("session-created-hook");
    let socket_path = harness.socket_path().to_path_buf();
    let handle = start_server(&harness).await?;
    let output_path = hook_output_path(&socket_path);

    register_hook_for(
        &socket_path,
        ScopeSelector::Global,
        HookName::SessionCreated,
        "printf created > {path}",
        &output_path,
        HookLifecycle::Persistent,
    )
    .await?;

    create_session(&socket_path, "alpha").await?;
    wait_for_file_contents(&output_path, "created").await?;
    fs::remove_file(&output_path)?;

    let duplicate = send_request(
        &socket_path,
        &Request::NewSession(NewSessionRequest {
            session_name: session_name("alpha"),
            detached: true,
            size: Some(TerminalSize { cols: 80, rows: 24 }),
            environment: None,
        }),
    )
    .await?;
    assert!(matches!(duplicate, Response::Error(_)));
    sleep(Duration::from_millis(100)).await;
    assert!(
        !output_path.exists(),
        "failed session creates must not run session-created hooks"
    );

    handle.shutdown().await?;
    Ok(())
}

#[tokio::test]
async fn slow_hooks_do_not_block_attach_completion() -> Result<(), Box<dyn Error>> {
    let harness = TestHarness::new("nonblocking-client-attached-hook");
    let socket_path = harness.socket_path().to_path_buf();
    let handle = start_server(&harness).await?;
    let output_path = hook_output_path(&socket_path);

    create_session(&socket_path, "alpha").await?;
    register_hook(
        &socket_path,
        ScopeSelector::Global,
        "sleep 0.3; printf ready > {path}",
        &output_path,
        HookLifecycle::Persistent,
    )
    .await?;

    let (_, attach_stream) = timeout(STEP_TIMEOUT, async {
        ClientConnection::connect(&socket_path)
            .await?
            .begin_attach(AttachSessionRequest {
                target: session_name("alpha"),
            })
            .await
    })
    .await??;

    assert!(
        !output_path.exists(),
        "slow hook should not complete before attach returns"
    );
    wait_for_file_contents(&output_path, "ready").await?;

    drop(attach_stream);
    sleep(ATTACH_SETTLE_DELAY).await;
    handle.shutdown().await?;
    Ok(())
}

#[tokio::test]
async fn invalid_hook_event_wire_values_are_rejected() -> Result<(), Box<dyn Error>> {
    let harness = TestHarness::new("invalid-hook-event");
    let socket_path = harness.socket_path().to_path_buf();
    let handle = start_server(&harness).await?;
    let mut stream = UnixStream::connect(&socket_path).await?;
    let mut frame = encode_frame(&Request::SetHook(SetHookRequest {
        scope: ScopeSelector::Global,
        hook: HookName::ClientAttached,
        command: "true".to_owned(),
        lifecycle: HookLifecycle::Persistent,
    }))?;

    assert_eq!(&frame[12..16], &[0, 0, 0, 0]);
    frame[12..16].copy_from_slice(&70_u32.to_le_bytes());
    stream.write_all(&frame).await?;

    match read_response_exact(&mut stream).await? {
        Response::Error(ErrorResponse {
            error: RmuxError::Decode(message),
        }) => assert!(
            message.contains("variant"),
            "expected enum-variant decode failure, received: {message}"
        ),
        other => panic!("unexpected response for invalid hook event: {other:?}"),
    }

    create_session(&socket_path, "alpha").await?;
    handle.shutdown().await?;
    Ok(())
}

async fn create_session(socket_path: &Path, name: &str) -> Result<(), Box<dyn Error>> {
    let response = send_request(
        socket_path,
        &Request::NewSession(NewSessionRequest {
            session_name: session_name(name),
            detached: true,
            size: Some(TerminalSize { cols: 80, rows: 24 }),
            environment: None,
        }),
    )
    .await?;

    assert!(matches!(response, Response::NewSession(_)));
    Ok(())
}

async fn kill_session(socket_path: &Path, name: &str) -> Result<(), Box<dyn Error>> {
    let response = send_request(
        socket_path,
        &Request::KillSession(KillSessionRequest {
            target: session_name(name),
            kill_all_except_target: false,
            clear_alerts: false,
        }),
    )
    .await?;

    assert_eq!(
        response,
        Response::KillSession(rmux_proto::KillSessionResponse { existed: true })
    );
    Ok(())
}

async fn register_hook(
    socket_path: &Path,
    scope: ScopeSelector,
    command_template: &str,
    output_path: &Path,
    lifecycle: HookLifecycle,
) -> Result<(), Box<dyn Error>> {
    register_hook_for(
        socket_path,
        scope,
        HookName::ClientAttached,
        command_template,
        output_path,
        lifecycle,
    )
    .await
}

async fn register_hook_for(
    socket_path: &Path,
    scope: ScopeSelector,
    hook: HookName,
    command_template: &str,
    output_path: &Path,
    lifecycle: HookLifecycle,
) -> Result<(), Box<dyn Error>> {
    let command = command_template.replace("{path}", &shell_quote(output_path));
    let response = send_request(
        socket_path,
        &Request::SetHook(SetHookRequest {
            scope: scope.clone(),
            hook,
            command,
            lifecycle,
        }),
    )
    .await?;

    assert_eq!(
        response,
        Response::SetHook(rmux_proto::SetHookResponse {
            scope,
            hook,
            lifecycle,
        })
    );
    Ok(())
}

async fn attach_once(socket_path: &Path, name: &str) -> Result<(), Box<dyn Error>> {
    let (_, attach_stream) = ClientConnection::connect(socket_path)
        .await?
        .begin_attach(AttachSessionRequest {
            target: session_name(name),
        })
        .await?;

    drop(attach_stream);
    sleep(ATTACH_SETTLE_DELAY).await;
    Ok(())
}

fn hook_output_path(socket_path: &Path) -> std::path::PathBuf {
    socket_path
        .parent()
        .expect("test harness socket path has a parent directory")
        .join("hook-output.txt")
}

fn shell_quote(path: &Path) -> String {
    format!("'{}'", path.display().to_string().replace('\'', "'\\''"))
}

async fn wait_for_file_contents(path: &Path, expected: &str) -> Result<(), Box<dyn Error>> {
    for _ in 0..100 {
        match fs::read_to_string(path) {
            Ok(contents) if contents == expected => return Ok(()),
            Ok(_) | Err(_) => sleep(Duration::from_millis(20)).await,
        }
    }

    Err(io::Error::other(format!(
        "file '{}' never reached expected contents '{expected}' within {WAIT_TIMEOUT:?}",
        path.display()
    ))
    .into())
}