interprocess-docfix 1.2.2

Interprocess communication toolkit. Docs fixed.
Documentation
use {
    super::util::{NameGen, TestResult},
    anyhow::Context,
    futures::io::{AsyncReadExt, AsyncWriteExt},
    interprocess::os::windows::named_pipe::{
        tokio::{DuplexMsgPipeStream, PipeListenerOptionsExt},
        PipeListenerOptions, PipeMode,
    },
    std::{convert::TryInto, ffi::OsStr, io, sync::Arc, time::Duration},
    tokio::{sync::oneshot::Sender, task, time::sleep, try_join},
};

// Messages the server writes to every accepted connection, in this order; the client asserts
// that it receives exactly these bytes.
const SERVER_MSG_1: &[u8] = b"Server message 1";
const SERVER_MSG_2: &[u8] = b"Server message 2";

// Messages the client writes after connecting, in this order; the server's connection handler
// asserts that it receives exactly these bytes.
const CLIENT_MSG_1: &[u8] = b"Client message 1";
const CLIENT_MSG_2: &[u8] = b"Client message 2";

/// Server half of the Tokio message-pipe test.
///
/// Creates a message-mode named pipe listener under the first name yielded by [`NameGen`] that is
/// not already in use, reports that name through `name_sender`, then makes `num_clients` accept
/// attempts. Every accepted connection is handled on its own spawned task, which exchanges the
/// two client and two server messages concurrently and asserts on their contents.
///
/// # Errors
/// Returns an error if the listener cannot be created for a reason other than the name being
/// taken, if a connection task panics (this includes failed assertions inside the handler), or if
/// a connection task fails one of its pipe reads or writes.
///
/// # Panics
/// Panics if the name generator is exhausted without producing a usable name, or if `num_clients`
/// does not fit into `usize`.
pub async fn server(name_sender: Sender<String>, num_clients: u32) -> TestResult {
    /// Exchanges the test messages over one accepted connection.
    ///
    /// Reading and writing run concurrently so that neither side of the pipe has to wait for the
    /// other direction to finish first.
    async fn handle_conn(conn: DuplexMsgPipeStream) -> TestResult {
        let (mut reader, mut writer) = conn.split();
        // Buffers are sized exactly to the expected messages.
        let (mut buf1, mut buf2) = ([0; CLIENT_MSG_1.len()], [0; CLIENT_MSG_2.len()]);

        let read = async {
            let read = reader
                .read(&mut buf1)
                .await
                .context("First pipe receive failed")?;
            assert_eq!(read, CLIENT_MSG_1.len());
            assert_eq!(&buf1[0..read], CLIENT_MSG_1);

            let read = reader
                .read(&mut buf2)
                .await
                .context("Second pipe receive failed")?;
            assert_eq!(read, CLIENT_MSG_2.len());
            assert_eq!(&buf2[0..read], CLIENT_MSG_2);

            TestResult::Ok(())
        };
        let write = async {
            // The two sends carry distinct context strings (matching the receive path and the
            // client) so that a failure report says which send broke.
            let written = writer
                .write(SERVER_MSG_1)
                .await
                .context("First pipe send failed")?;
            assert_eq!(written, SERVER_MSG_1.len());

            let written = writer
                .write(SERVER_MSG_2)
                .await
                .context("Second pipe send failed")?;
            assert_eq!(written, SERVER_MSG_2.len());

            TestResult::Ok(())
        };
        try_join!(read, write)?;

        Ok(())
    }

    // Walk the generated names until one can be bound. `AddrInUse` means the name is taken, so
    // the next candidate is tried; any other error stops the search and is reported.
    let (name, listener) = NameGen::new(true)
        .find_map(|nm| {
            let rnm: &OsStr = nm.as_ref();
            let l = match PipeListenerOptions::new()
                .name(rnm)
                .mode(PipeMode::Messages)
                .create_tokio::<DuplexMsgPipeStream>()
            {
                Ok(l) => l,
                Err(e) if e.kind() == io::ErrorKind::AddrInUse => return None,
                Err(e) => return Some(Err(e)),
            };
            Some(Ok((nm, l)))
        })
        .unwrap()
        .context("Listener bind failed")?;

    // The result of the send is deliberately ignored.
    let _ = name_sender.send(name);

    let mut tasks = Vec::with_capacity(num_clients.try_into().unwrap());

    for _ in 0..num_clients {
        // NOTE: a failed accept is only logged and still uses up one of the `num_clients`
        // iterations, so fewer than `num_clients` connections are served in that case.
        let conn = match listener.accept().await {
            Ok(c) => c,
            Err(e) => {
                eprintln!("Incoming connection failed: {}", e);
                continue;
            }
        };
        let task = task::spawn(handle_conn(conn));
        tasks.push(task);
    }
    for task in tasks {
        // The outer error is the join failure (task panicked), the inner one is the handler's
        // own result.
        task.await
            .context("Server task panicked")?
            .context("Server task returned early with error")?;
    }

    Ok(())
}
/// Client half of the Tokio message-pipe test.
///
/// Connects to the pipe called `name`, retrying every 10 ms for as long as the connection attempt
/// reports `WouldBlock`, then concurrently receives the two server messages and sends the two
/// client messages, asserting on lengths and contents.
///
/// # Errors
/// Returns an error if connecting fails with anything other than `WouldBlock`, or if any pipe
/// read or write fails.
///
/// # Panics
/// Panics if a received message or a reported write length differs from what is expected.
pub async fn client(name: Arc<String>) -> TestResult {
    let stream = loop {
        let attempt = DuplexMsgPipeStream::connect(name.as_str());
        let pipe_busy = matches!(&attempt, Err(e) if e.kind() == io::ErrorKind::WouldBlock);
        if !pipe_busy {
            // Either a live connection or a real error; both end the retry loop.
            break attempt;
        }
        sleep(Duration::from_millis(10)).await;
    }
    .context("Connect failed")?;
    let (mut reader, mut writer) = stream.split();

    // Receive buffers sized exactly to the expected server messages.
    let mut first_buf = [0; SERVER_MSG_1.len()];
    let mut second_buf = [0; SERVER_MSG_2.len()];

    let inbound = async {
        let received = reader
            .read(&mut first_buf)
            .await
            .context("First pipe receive failed")?;
        assert_eq!(received, SERVER_MSG_1.len());
        assert_eq!(&first_buf[..received], SERVER_MSG_1);

        let received = reader
            .read(&mut second_buf)
            .await
            .context("Second pipe receive failed")?;
        assert_eq!(received, SERVER_MSG_2.len());
        assert_eq!(&second_buf[..received], SERVER_MSG_2);

        TestResult::Ok(())
    };
    let outbound = async {
        let sent = writer
            .write(CLIENT_MSG_1)
            .await
            .context("First pipe send failed")?;
        assert_eq!(sent, CLIENT_MSG_1.len());

        let sent = writer
            .write(CLIENT_MSG_2)
            .await
            .context("Second pipe send failed")?;
        assert_eq!(sent, CLIENT_MSG_2.len());

        TestResult::Ok(())
    };

    // Both directions are driven together; the first error from either one is returned.
    try_join!(inbound, outbound).map(|_| ())
}