rmux-server 0.1.1

Tokio daemon and request dispatcher for the RMUX terminal multiplexer.
Documentation
use super::*;

#[tokio::test]
async fn forward_attach_clears_persistent_overlay_with_fresh_switch_frame() {
    let handler = Arc::new(RequestHandler::new());
    let session_name = SessionName::new("alpha").expect("valid session name");
    let (stream, mut peer) = tokio::net::UnixStream::pair().expect("attach stream pair");
    let (shutdown_tx, shutdown_rx) = watch::channel(());
    let (control_tx, control_rx) = mpsc::unbounded_channel();
    let closing = Arc::new(AtomicBool::new(false));
    let live_input = LiveAttachInputContext {
        handler,
        attach_pid: std::process::id(),
    };

    let attach_task = tokio::spawn(forward_attach(
        stream,
        test_attach_target(&session_name, b"BASE-OLD", None),
        Vec::new(),
        shutdown_rx,
        control_rx,
        closing,
        Arc::new(AtomicU64::new(0)),
        live_input,
    ));

    let initial = read_attach_data_until(&mut peer, b"BASE-OLD").await;
    assert!(
        String::from_utf8_lossy(&initial).contains("BASE-OLD"),
        "initial attach should render the base pane"
    );

    control_tx
        .send(AttachControl::Overlay(OverlayFrame::persistent_with_state(
            b"MENU-OLD".to_vec(),
            0,
            1,
            7,
        )))
        .expect("send initial persistent overlay");
    let _ = read_attach_data_until(&mut peer, b"MENU-OLD").await;

    control_tx
        .send(AttachControl::AdvancePersistentOverlayState(8))
        .expect("send overlay state advance");
    control_tx
        .send(AttachControl::Overlay(OverlayFrame::persistent_with_state(
            Vec::new(),
            0,
            2,
            8,
        )))
        .expect("send persistent overlay clear");
    control_tx
        .send(AttachControl::switch(test_attach_target(
            &session_name,
            b"BASE-FRESH",
            None,
        )))
        .expect("send refreshed attach target");

    let refresh = read_attach_data_until(&mut peer, b"BASE-FRESH").await;
    let refresh_text = String::from_utf8_lossy(&refresh);
    assert!(
        !refresh_text.contains("BASE-OLD"),
        "overlay teardown must not paint stale base before the fresh switch: {refresh_text:?}"
    );

    shutdown_tx.send(()).expect("request attach shutdown");
    let result = attach_task.await.expect("attach task join");
    assert!(
        result.is_ok(),
        "forward_attach should stay healthy: {result:?}"
    );
}

#[tokio::test]
async fn forward_attach_does_not_paint_stale_base_while_overlay_dismiss_refresh_is_pending() {
    let handler = Arc::new(RequestHandler::new());
    let session_name = SessionName::new("alpha").expect("valid session name");
    let (stream, mut peer) = tokio::net::UnixStream::pair().expect("attach stream pair");
    let (shutdown_tx, shutdown_rx) = watch::channel(());
    let (control_tx, control_rx) = mpsc::unbounded_channel();
    let closing = Arc::new(AtomicBool::new(false));
    let live_input = LiveAttachInputContext {
        handler,
        attach_pid: std::process::id(),
    };

    let attach_task = tokio::spawn(forward_attach(
        stream,
        test_attach_target(&session_name, b"STALE-BASE", None),
        Vec::new(),
        shutdown_rx,
        control_rx,
        closing,
        Arc::new(AtomicU64::new(0)),
        live_input,
    ));

    let _ = read_attach_data_until(&mut peer, b"STALE-BASE").await;
    control_tx
        .send(AttachControl::Overlay(OverlayFrame::persistent_with_state(
            b"MENU-OLD".to_vec(),
            0,
            1,
            7,
        )))
        .expect("send initial persistent overlay");
    let _ = read_attach_data_until(&mut peer, b"MENU-OLD").await;

    control_tx
        .send(AttachControl::AdvancePersistentOverlayState(8))
        .expect("send overlay state advance");
    let pending_bytes = read_attach_data_for(&mut peer, Duration::from_millis(100)).await;
    let pending_text = String::from_utf8_lossy(&pending_bytes);
    assert!(
        !pending_text.contains("STALE-BASE"),
        "state advance must wait for the fresh switch instead of repainting a stale base: {pending_text:?}"
    );

    control_tx
        .send(AttachControl::switch(test_attach_target(
            &session_name,
            b"FRESH-BASE",
            None,
        )))
        .expect("send refreshed attach target");
    let refresh = read_attach_data_until(&mut peer, b"FRESH-BASE").await;
    let refresh_text = String::from_utf8_lossy(&refresh);
    assert!(
        !refresh_text.contains("STALE-BASE"),
        "overlay teardown must be resolved by the fresh switch: {refresh_text:?}"
    );

    shutdown_tx.send(()).expect("request attach shutdown");
    let result = attach_task.await.expect("attach task join");
    assert!(
        result.is_ok(),
        "forward_attach should stay healthy: {result:?}"
    );
}

async fn read_attach_data_for(peer: &mut tokio::net::UnixStream, duration: Duration) -> Vec<u8> {
    let mut collected = Vec::new();
    let mut frame_bytes = [0_u8; 4096];
    let mut decoder = AttachFrameDecoder::new();
    let deadline = tokio::time::sleep(duration);
    tokio::pin!(deadline);

    loop {
        tokio::select! {
            _ = &mut deadline => break,
            result = peer.read(&mut frame_bytes) => {
                let bytes_read = result.expect("read peer bytes");
                if bytes_read == 0 {
                    break;
                }
                decoder.push_bytes(&frame_bytes[..bytes_read]);
                while let Some(message) = decoder.next_message().expect("decode attach frame") {
                    if let AttachMessage::Data(bytes) = message {
                        collected.extend_from_slice(&bytes);
                    }
                }
            }
        }
    }

    collected
}