rmux-server 0.2.0

Tokio daemon and request dispatcher for the RMUX terminal multiplexer.
Documentation
#![cfg(unix)]

mod common;

use std::error::Error;
use std::io;
use std::time::Duration;

use common::{session_name, start_server, ClientConnection, TestHarness, PTY_TEST_LOCK};
use rmux_proto::{
    AttachMessage, AttachSessionRequest, ListClientsRequest, NewSessionRequest, OptionName,
    Request, Response, ScopeSelector, SetOptionMode, SetOptionRequest, SuspendClientRequest,
    TerminalSize,
};
use tokio::io::AsyncReadExt;
use tokio::time::{timeout, Instant};

const STEP_TIMEOUT: Duration = Duration::from_secs(3);

#[tokio::test]
async fn attach_session_emits_status_row_for_single_pane_session() -> Result<(), Box<dyn Error>> {
    let _guard = PTY_TEST_LOCK.lock().await;
    let harness = TestHarness::new("status-attach");
    let socket_path = harness.socket_path().to_path_buf();
    let handle = start_server(&harness).await?;
    let alpha = session_name("alpha");

    let created = common::send_request(
        &socket_path,
        &Request::NewSession(NewSessionRequest {
            session_name: alpha.clone(),
            detached: true,
            size: Some(TerminalSize { cols: 20, rows: 4 }),
            environment: None,
        }),
    )
    .await?;
    assert!(matches!(created, Response::NewSession(_)));

    let (_response, mut attach_stream) = ClientConnection::connect(&socket_path)
        .await?
        .begin_attach(AttachSessionRequest { target: alpha })
        .await?;

    let status_text = read_attach_data_until_contains(&mut attach_stream, "[alpha]").await?;
    assert!(status_text.contains("[alpha]"));
    assert!(status_text.contains("\u{1b}[4;1H"));
    assert!(!status_text.contains(''));

    drop(attach_stream);
    handle.shutdown().await?;
    Ok(())
}

#[tokio::test]
async fn attach_session_status_context_populates_session_attached() -> Result<(), Box<dyn Error>> {
    let _guard = PTY_TEST_LOCK.lock().await;
    let harness = TestHarness::new("status-session-attached");
    let socket_path = harness.socket_path().to_path_buf();
    let handle = start_server(&harness).await?;
    let alpha = session_name("alpha");

    let created = common::send_request(
        &socket_path,
        &Request::NewSession(NewSessionRequest {
            session_name: alpha.clone(),
            detached: true,
            size: Some(TerminalSize { cols: 30, rows: 4 }),
            environment: None,
        }),
    )
    .await?;
    assert!(matches!(created, Response::NewSession(_)));

    for (option, value) in [
        (OptionName::StatusLeft, "attached=#{session_attached}"),
        (OptionName::StatusRight, ""),
    ] {
        let response = common::send_request(
            &socket_path,
            &Request::SetOption(SetOptionRequest {
                scope: ScopeSelector::Session(alpha.clone()),
                option,
                value: value.to_owned(),
                mode: SetOptionMode::Replace,
            }),
        )
        .await?;
        assert!(matches!(response, Response::SetOption(_)));
    }

    let (_response, mut attach_stream) = ClientConnection::connect(&socket_path)
        .await?
        .begin_attach(AttachSessionRequest { target: alpha })
        .await?;

    let status_text = read_attach_data_until_contains(&mut attach_stream, "attached=1").await?;
    assert!(status_text.contains("attached=1"));

    drop(attach_stream);
    handle.shutdown().await?;
    Ok(())
}

#[tokio::test]
async fn status_interval_refreshes_time_formats_without_pane_output() -> Result<(), Box<dyn Error>>
{
    let _guard = PTY_TEST_LOCK.lock().await;
    let harness = TestHarness::new("status-interval");
    let socket_path = harness.socket_path().to_path_buf();
    let handle = start_server(&harness).await?;
    let alpha = session_name("alpha");

    let created = common::send_request(
        &socket_path,
        &Request::NewSession(NewSessionRequest {
            session_name: alpha.clone(),
            detached: true,
            size: Some(TerminalSize { cols: 40, rows: 4 }),
            environment: None,
        }),
    )
    .await?;
    assert!(matches!(created, Response::NewSession(_)));

    for (option, value) in [
        (OptionName::StatusInterval, "1"),
        (OptionName::StatusLeft, "[#{session_name}] "),
        (OptionName::StatusRight, "tick=%S"),
    ] {
        let response = common::send_request(
            &socket_path,
            &Request::SetOption(SetOptionRequest {
                scope: ScopeSelector::Session(alpha.clone()),
                option,
                value: value.to_owned(),
                mode: SetOptionMode::Replace,
            }),
        )
        .await?;
        assert!(matches!(response, Response::SetOption(_)));
    }

    let (_response, mut attach_stream) = ClientConnection::connect(&socket_path)
        .await?
        .begin_attach(AttachSessionRequest { target: alpha })
        .await?;

    let first_status = read_attach_data_until_contains(&mut attach_stream, "tick=").await?;
    let first_tick = extract_tick_second(&first_status)
        .ok_or_else(|| io::Error::other(format!("missing first tick in {first_status:?}")))?;
    let deadline = Instant::now() + Duration::from_secs(4);
    let mut output = String::new();

    while Instant::now() < deadline {
        let message = match timeout(
            deadline.saturating_duration_since(Instant::now()),
            read_attach_message(&mut attach_stream),
        )
        .await
        {
            Ok(message) => message?,
            Err(_) => break,
        };
        let Some(message) = message else {
            break;
        };
        if let AttachMessage::Data(bytes) = message {
            output.push_str(&String::from_utf8_lossy(&bytes));
            if extract_tick_second(&output).is_some_and(|tick| tick != first_tick) {
                drop(attach_stream);
                handle.shutdown().await?;
                return Ok(());
            }
        }
    }

    drop(attach_stream);
    handle.shutdown().await?;
    Err(io::Error::other(format!(
        "status interval never refreshed tick from {first_tick}; output was {output:?}"
    ))
    .into())
}

#[tokio::test]
async fn status_interval_does_not_refresh_suspended_attach_client() -> Result<(), Box<dyn Error>> {
    let _guard = PTY_TEST_LOCK.lock().await;
    let harness = TestHarness::new("status-interval-suspend");
    let socket_path = harness.socket_path().to_path_buf();
    let handle = start_server(&harness).await?;
    let alpha = session_name("alpha");

    let created = common::send_request(
        &socket_path,
        &Request::NewSession(NewSessionRequest {
            session_name: alpha.clone(),
            detached: true,
            size: Some(TerminalSize { cols: 40, rows: 4 }),
            environment: None,
        }),
    )
    .await?;
    assert!(matches!(created, Response::NewSession(_)));

    for (option, value) in [
        (OptionName::StatusInterval, "1"),
        (OptionName::StatusLeft, "[#{session_name}] "),
        (OptionName::StatusRight, "tick=%S"),
    ] {
        let response = common::send_request(
            &socket_path,
            &Request::SetOption(SetOptionRequest {
                scope: ScopeSelector::Session(alpha.clone()),
                option,
                value: value.to_owned(),
                mode: SetOptionMode::Replace,
            }),
        )
        .await?;
        assert!(matches!(response, Response::SetOption(_)));
    }

    let (_response, mut attach_stream) = ClientConnection::connect(&socket_path)
        .await?
        .begin_attach(AttachSessionRequest {
            target: alpha.clone(),
        })
        .await?;

    let _initial_status = read_attach_data_until_contains(&mut attach_stream, "tick=").await?;
    let attach_pid = attached_client_pid(&socket_path, &alpha).await?;
    let suspended = common::send_request(
        &socket_path,
        &Request::SuspendClient(SuspendClientRequest {
            target_client: Some(attach_pid),
        }),
    )
    .await?;
    assert!(matches!(suspended, Response::SuspendClient(_)));
    read_attach_until_suspend(&mut attach_stream).await?;

    let deadline = Instant::now() + Duration::from_millis(2200);
    while Instant::now() < deadline {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match timeout(remaining, read_attach_message(&mut attach_stream)).await {
            Ok(Ok(Some(AttachMessage::Data(bytes)))) => {
                drop(attach_stream);
                handle.shutdown().await?;
                return Err(io::Error::other(format!(
                    "suspended attach client received status refresh bytes: {:?}",
                    String::from_utf8_lossy(&bytes)
                ))
                .into());
            }
            Ok(Ok(Some(_))) => {}
            Ok(Ok(None)) | Err(_) => break,
            Ok(Err(error)) => return Err(error),
        }
    }

    drop(attach_stream);
    handle.shutdown().await?;
    Ok(())
}

async fn attached_client_pid(
    socket_path: &std::path::Path,
    session_name: &rmux_proto::SessionName,
) -> Result<String, Box<dyn Error>> {
    match common::send_request(
        socket_path,
        &Request::ListClients(ListClientsRequest {
            format: Some("#{client_pid}".to_owned()),
            filter: None,
            sort_order: None,
            reversed: false,
            target_session: Some(session_name.clone()),
        }),
    )
    .await?
    {
        Response::ListClients(response) => {
            assert_eq!(response.match_count, 1);
            Ok(String::from_utf8_lossy(response.output.stdout())
                .trim()
                .to_owned())
        }
        other => {
            Err(io::Error::other(format!("unexpected list-clients response: {other:?}")).into())
        }
    }
}

async fn read_attach_until_suspend(
    stream: &mut tokio::net::UnixStream,
) -> Result<(), Box<dyn Error>> {
    let deadline = Instant::now() + STEP_TIMEOUT;

    while Instant::now() < deadline {
        let message = match timeout(
            deadline.saturating_duration_since(Instant::now()),
            read_attach_message(stream),
        )
        .await
        {
            Ok(message) => message?,
            Err(_) => break,
        };
        match message {
            Some(AttachMessage::Suspend) => return Ok(()),
            Some(AttachMessage::Data(bytes)) if bytes == [5] => return Ok(()),
            Some(_) => {}
            None => break,
        }
    }

    Err(io::Error::other("attach stream never received suspend control").into())
}

async fn read_attach_data_until_contains(
    stream: &mut tokio::net::UnixStream,
    needle: &str,
) -> Result<String, Box<dyn Error>> {
    let deadline = Instant::now() + STEP_TIMEOUT;
    let mut output = String::new();

    while Instant::now() < deadline {
        let message = match timeout(
            deadline.saturating_duration_since(Instant::now()),
            read_attach_message(stream),
        )
        .await
        {
            Ok(message) => message?,
            Err(_) => break,
        };

        let Some(message) = message else {
            break;
        };

        if let AttachMessage::Data(bytes) = message {
            output.push_str(&String::from_utf8_lossy(&bytes));
            if output.contains(needle) {
                return Ok(output);
            }
        }
    }

    Err(io::Error::other(format!(
        "attach stream never included expected status marker {needle:?}; output was {output:?}"
    ))
    .into())
}

async fn read_attach_message(
    stream: &mut tokio::net::UnixStream,
) -> Result<Option<AttachMessage>, Box<dyn Error>> {
    let mut tag = [0_u8; 1];
    let bytes_read = stream.read(&mut tag).await?;
    if bytes_read == 0 {
        return Ok(None);
    }

    match tag[0] {
        1 => {
            let mut length = [0_u8; 4];
            stream.read_exact(&mut length).await?;
            let payload_len = u32::from_le_bytes(length) as usize;
            let mut payload = vec![0_u8; payload_len];
            stream.read_exact(&mut payload).await?;
            Ok(Some(AttachMessage::Data(payload)))
        }
        2 => {
            let mut size = [0_u8; 4];
            stream.read_exact(&mut size).await?;
            Ok(Some(AttachMessage::Resize(rmux_proto::TerminalSize {
                cols: u16::from_le_bytes([size[0], size[1]]),
                rows: u16::from_le_bytes([size[2], size[3]]),
            })))
        }
        5 => Ok(Some(AttachMessage::Suspend)),
        other => Err(rmux_proto::RmuxError::Decode(format!(
            "unknown attach-stream message tag {other}"
        ))
        .into()),
    }
}

fn extract_tick_second(output: &str) -> Option<String> {
    let start = output.rfind("tick=")? + "tick=".len();
    let tick = output.get(start..start + 2)?;
    tick.bytes()
        .all(|byte| byte.is_ascii_digit())
        .then(|| tick.to_owned())
}