interprocess 2.4.0

Interprocess communication toolkit
Documentation
use {
    super::drive_server,
    crate::{
        os::windows::named_pipe::{
            pipe_mode, DuplexPipeStream, PipeListener, PipeMode, RecvPipeStream, SendPipeStream,
        },
        tests::util::*,
        SubUsizeExt,
    },
    color_eyre::eyre::{ensure, WrapErr},
    recvmsg::{MsgBuf, RecvMsg, RecvResult},
    std::{
        str,
        sync::{
            mpsc::{Receiver, Sender},
            Arc,
        },
    },
};

fn msgs(server: bool) -> [Box<str>; 2] {
    [
        message(Some(format_args!("First")), server, None),
        message(Some(format_args!("Second")), server, None),
    ]
}
fn futf8(m: &[u8]) -> TestResult<&str> {
    str::from_utf8(m).context("received message was not valid UTF-8")
}

fn handle_conn_duplex(
    listener: &mut PipeListener<pipe_mode::Messages, pipe_mode::Messages>,
) -> TestResult {
    let (mut recver, mut sender) = listener.accept().opname("accept")?.split();

    let [msg1, msg2] = msgs(false);
    recv(&mut recver, msg1, 0)?;
    recv(&mut recver, msg2, 1)?;

    let [msg1, msg2] = msgs(true);
    send(&mut sender, msg1, 0)?;
    send(&mut sender, msg2, 1)?;

    DuplexPipeStream::reunite(recver, sender).opname("reunite")?;
    Ok(())
}
fn handle_conn_cts(
    listener: &mut PipeListener<pipe_mode::Messages, pipe_mode::None>,
) -> TestResult {
    let mut recver = listener.accept().opname("accept")?;
    let [msg1, msg2] = msgs(false);
    recv(&mut recver, msg1, 0)?;
    recv(&mut recver, msg2, 1)
}
fn handle_conn_stc(
    listener: &mut PipeListener<pipe_mode::None, pipe_mode::Messages>,
) -> TestResult {
    let mut sender = listener.accept().opname("accept")?;
    let [msg1, msg2] = msgs(true);
    send(&mut sender, msg1, 0)?;
    send(&mut sender, msg2, 1)
}

pub fn server_duplex(
    id: &str,
    name_sender: Sender<Arc<str>>,
    num_clients: u32,
    doa_sync: Receiver<()>,
) -> TestResult {
    drive_server(
        id,
        name_sender,
        num_clients,
        |plo| plo.mode(PipeMode::Messages).create_duplex::<pipe_mode::Messages>(),
        handle_conn_duplex,
        doa_sync,
    )
}
pub fn server_cts(
    id: &str,
    name_sender: Sender<Arc<str>>,
    num_clients: u32,
    doa_sync: Receiver<()>,
) -> TestResult {
    drive_server(
        id,
        name_sender,
        num_clients,
        |plo| plo.mode(PipeMode::Messages).create_recv_only::<pipe_mode::Messages>(),
        handle_conn_cts,
        doa_sync,
    )
}
pub fn server_stc(
    id: &str,
    name_sender: Sender<Arc<str>>,
    num_clients: u32,
    doa_sync: Receiver<()>,
) -> TestResult {
    drive_server(
        id,
        name_sender,
        num_clients,
        |plo| plo.mode(PipeMode::Messages).create_send_only::<pipe_mode::Messages>(),
        handle_conn_stc,
        doa_sync,
    )
}

pub fn client_duplex(name: &str, doa: bool) -> TestResult {
    let conn =
        DuplexPipeStream::<pipe_mode::Messages>::connect_by_path(name).opname("connect")?;
    if doa {
        return Ok(());
    }

    let (mut recver, mut sender) = conn.split();

    let [msg1, msg2] = msgs(false);
    send(&mut sender, msg1, 0)?;
    send(&mut sender, msg2, 1)?;

    let [msg1, msg2] = msgs(true);
    recv(&mut recver, msg1, 0)?;
    recv(&mut recver, msg2, 1)?;

    DuplexPipeStream::reunite(recver, sender).opname("reunite")?;
    Ok(())
}
pub fn client_cts(name: &str, doa: bool) -> TestResult {
    let mut sender =
        SendPipeStream::<pipe_mode::Messages>::connect_by_path(name).opname("connect")?;
    if doa {
        return Ok(());
    }
    let [msg1, msg2] = msgs(false);
    send(&mut sender, msg1, 0)?;
    send(&mut sender, msg2, 1)
}
pub fn client_stc(name: &str, doa: bool) -> TestResult {
    let mut recver =
        RecvPipeStream::<pipe_mode::Messages>::connect_by_path(name).opname("connect")?;
    if doa {
        return Ok(());
    }
    let [msg1, msg2] = msgs(true);
    recv(&mut recver, msg1, 0)?;
    recv(&mut recver, msg2, 1)
}

fn recv(
    conn: &mut RecvPipeStream<pipe_mode::Messages>,
    exp: impl AsRef<str>,
    nr: u8,
) -> TestResult {
    let fs = ["first", "second"][nr.to_usize()];
    let exp_ = exp.as_ref();
    let mut len = exp_.len();
    if nr == 2 {
        len -= 1; // tests spill
    }
    let mut buf = MsgBuf::from(Vec::with_capacity(len));

    let rslt = conn.recv_msg(&mut buf, None).with_context(|| format!("{} receive failed", fs))?;

    ensure_eq!(futf8(buf.filled_part())?, exp_);
    if nr == 2 {
        ensure!(matches!(rslt, RecvResult::Spilled));
    } else {
        ensure!(matches!(rslt, RecvResult::Fit));
    }
    Ok(())
}

fn send(
    conn: &mut SendPipeStream<pipe_mode::Messages>,
    msg: impl AsRef<str>,
    nr: u8,
) -> TestResult {
    let msg_ = msg.as_ref();
    let fs = ["first", "second"][nr.to_usize()];

    let sent = conn.send(msg_.as_bytes()).wrap_err_with(|| format!("{} send failed", fs))?;

    ensure_eq!(sent, msg_.len());
    Ok(())
}