io-streams 0.16.3

Unbuffered and unlocked I/O streams
Documentation
#![cfg_attr(can_vector, feature(can_vector))]
#![cfg_attr(write_all_vectored, feature(write_all_vectored))]

use cap_tempfile::{ambient_authority, tempdir, TempDir};
#[cfg(not(target_os = "wasi"))]
use io_streams::StreamDuplexer;
use io_streams::{StreamReader, StreamWriter};
use std::io::{copy, Read, Write};
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
use {socketpair::SocketpairStream, std::str};

/// Creates a fresh temporary directory using ambient authority.
///
/// # Panics
///
/// Panics if the temporary directory cannot be created.
fn tmpdir() -> TempDir {
    match tempdir(ambient_authority()) {
        Ok(dir) => dir,
        Err(err) => panic!(
            "expected to be able to create a temporary directory: {:?}",
            err
        ),
    }
}

/// Copies a short greeting from `in.txt` to `out.txt` with `std::io::copy`,
/// first through plain file streams and then (off WASI) through increasingly
/// nested piped-thread streams, checking the destination contents each time.
#[test]
fn test_copy() -> anyhow::Result<()> {
    const GREETING: &str = "Hello, world!";
    const SRC: &str = "in.txt";
    const DST: &str = "out.txt";

    let dir = tmpdir();

    // Seed the source file. The handle is kept until the test returns.
    let mut seed = dir.create(SRC)?;
    write!(seed, "{}", GREETING)?;

    // Reads back everything currently stored in the destination file.
    let slurp = || -> anyhow::Result<String> {
        let mut text = String::new();
        dir.open(DST)?.read_to_string(&mut text)?;
        Ok(text)
    };

    // Copies `source` into `sink`, flushes, checks that the destination holds
    // exactly one greeting, and then removes the destination file.
    let pump = |mut source: StreamReader, mut sink: StreamWriter| -> anyhow::Result<()> {
        copy(&mut source, &mut sink)?;
        sink.flush()?;
        assert_eq!(slurp()?, GREETING);
        dir.remove_file(DST)?;
        Ok(())
    };

    // Test regular file I/O.
    {
        let source = StreamReader::file(dir.open(SRC)?);
        let sink = StreamWriter::file(dir.create(DST)?);
        pump(source, sink)?;
    }

    // Test I/O through piped threads.
    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
    {
        let source = StreamReader::piped_thread(Box::new(dir.open(SRC)?))?;
        let sink = StreamWriter::piped_thread(Box::new(dir.create(DST)?))?;
        pump(source, sink)?;
    }

    // Test regular file I/O through piped threads, not because this is
    // amazingly useful, but because these things should compose and we can.
    // This also tests that `StreamReader` and `StreamWriter`
    // implement `Send`.
    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
    {
        let file_reader = StreamReader::file(dir.open(SRC)?);
        let source = StreamReader::piped_thread(Box::new(file_reader))?;
        let file_writer = StreamWriter::file(dir.create(DST)?);
        let sink = StreamWriter::piped_thread(Box::new(file_writer))?;
        pump(source, sink)?;
    }

    // They compose with themselves too.
    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
    {
        let file_reader = StreamReader::file(dir.open(SRC)?);
        let inner_reader = StreamReader::piped_thread(Box::new(file_reader))?;
        let source = StreamReader::piped_thread(Box::new(inner_reader))?;
        let file_writer = StreamWriter::file(dir.create(DST)?);
        let inner_writer = StreamWriter::piped_thread(Box::new(file_writer))?;
        let sink = StreamWriter::piped_thread(Box::new(inner_writer))?;
        pump(source, sink)?;
    }

    // Test flushing between writes.
    #[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
    {
        // Builds a doubly nested piped-thread reader over a fresh handle to
        // the source file.
        let nested_reader = || -> anyhow::Result<StreamReader> {
            let file_reader = StreamReader::file(dir.open(SRC)?);
            let inner_reader = StreamReader::piped_thread(Box::new(file_reader))?;
            Ok(StreamReader::piped_thread(Box::new(inner_reader))?)
        };

        let mut source = nested_reader()?;
        let mut sink = StreamWriter::piped_thread(Box::new(StreamWriter::piped_thread(
            Box::new(StreamWriter::file(dir.create(DST)?)),
        )?))?;

        // Each round appends one more greeting through the same writer; after
        // every flush the destination must hold all greetings written so far.
        let mut expected = String::new();
        for round in 0..3 {
            if round > 0 {
                source = nested_reader()?;
            }
            copy(&mut source, &mut sink)?;
            sink.flush()?;
            expected.push_str(GREETING);
            assert_eq!(slurp()?, expected);
        }
        dir.remove_file(DST)?;
    }

    Ok(())
}

/// Copies a string-backed reader into a null writer and flushes it, expecting
/// every step to succeed.
#[test]
#[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
fn test_null_output() -> anyhow::Result<()> {
    let mut source = StreamReader::str("send to null")?;
    let mut sink = StreamWriter::null()?;

    copy(&mut source, &mut sink)?;
    sink.flush()?;

    Ok(())
}

/// Reads a null reader to the end and asserts that nothing was produced.
#[test]
#[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
fn test_null_input() -> anyhow::Result<()> {
    let mut source = StreamReader::null()?;

    let mut contents = String::new();
    source.read_to_string(&mut contents)?;
    assert!(contents.is_empty());

    Ok(())
}

/// Exercises both directions of a null duplexer: reading it to the end must
/// yield an empty string, and copying data into it followed by a flush must
/// succeed.
#[test]
#[cfg(not(target_os = "wasi"))] // WASI doesn't support pipes yet
fn test_null_duplex() -> anyhow::Result<()> {
    let mut duplex = StreamDuplexer::null()?;

    // Read side.
    let mut drained = String::new();
    duplex.read_to_string(&mut drained)?;
    assert!(drained.is_empty());

    // Write side.
    let mut source = StreamReader::str("send to null")?;
    copy(&mut source, &mut duplex)?;
    duplex.flush()?;

    Ok(())
}

/// Holds a three-message conversation with a worker closure run through
/// `StreamDuplexer::socketed_thread_func`: the test sends "hello world", the
/// worker answers "greetings", and the test finishes with "goodbye". Each
/// side asserts on the exact line it receives.
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
#[test]
fn test_socketed_thread_func() -> anyhow::Result<()> {
    // The worker's half of the conversation; it hands the stream back when done.
    let worker = |mut stream: SocketpairStream| -> std::io::Result<SocketpairStream> {
        let mut scratch = [0_u8; 4096];

        let got = stream.read(&mut scratch)?;
        assert_eq!(str::from_utf8(&scratch[..got]).unwrap(), "hello world\n");

        writeln!(stream, "greetings")?;
        stream.flush()?;

        let got = stream.read(&mut scratch)?;
        assert_eq!(str::from_utf8(&scratch[..got]).unwrap(), "goodbye\n");

        Ok(stream)
    };
    let mut duplex = StreamDuplexer::socketed_thread_func(Box::new(worker))?;

    // The test's half of the conversation.
    writeln!(duplex, "hello world")?;
    duplex.flush()?;

    let mut reply = [0_u8; 4096];
    let got = duplex.read(&mut reply)?;
    assert_eq!(str::from_utf8(&reply[..got]).unwrap(), "greetings\n");

    writeln!(duplex, "goodbye")?;
    duplex.flush()?;

    Ok(())
}

/// Test double used as the inner stream of the socketed-thread duplexers.
///
/// The wrapped `bool` records which operation is expected next: `read`
/// requires it to be `false` and sets it to `true`, while `write` requires it
/// to be `true` and sets it back to `false`. Constructing it with `false`
/// therefore expects a read first, and with `true` a write first.
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
struct Mock(bool);

/// `Read` side of the mock: produces a run of `0xab` bytes and enforces that
/// reads and writes alternate.
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
impl Read for Mock {
    /// Fills all but the last byte of `buf` with `0xab` and returns that
    /// count (`buf.len() - 1`), then marks the mock as expecting a write.
    ///
    /// # Panics
    ///
    /// Panics if the flag is already set (a read was not followed by a
    /// write), or if `buf` is empty.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        assert!(!self.0);
        self.0 = true;
        assert!(!buf.is_empty());
        let len = buf.len() - 1;
        // Fill in place rather than building a temporary `Vec` to copy from.
        buf[..len].fill(0xab_u8);
        Ok(len)
    }
}

/// `Write` side of the mock: accepts only runs of `0xcd` bytes and enforces
/// that reads and writes alternate.
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
impl Write for Mock {
    /// Checks that every byte of `buf` is `0xcd`, marks the mock as expecting
    /// a read, and reports the whole buffer as written.
    ///
    /// # Panics
    ///
    /// Panics if the flag is not set (no read preceded this write), or if
    /// `buf` contains any byte other than `0xcd`.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        assert!(self.0);
        self.0 = false;

        let accepted = buf.len();
        let expected = vec![0xcd_u8; accepted];
        assert_eq!(buf, expected.as_slice());

        Ok(accepted)
    }

    /// Nothing is buffered, so flushing always succeeds.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

/// Readiness reporting for the mock, derived from its read/write flag.
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
impl system_interface::io::ReadReady for Mock {
    /// Reports `1` ready byte while the flag is clear (a read is expected
    /// next) and `0` while it is set (a write is expected next).
    fn num_ready_bytes(&self) -> std::io::Result<u64> {
        let read_is_next = !self.0;
        Ok(u64::from(read_is_next))
    }
}

// Empty `Duplex` impl for `Mock`; no items are provided here.
// NOTE(review): presumably this is what lets `Mock` be boxed and passed to the
// `StreamDuplexer::socketed_thread*` constructors in the tests — confirm
// against the `duplex` crate.
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
impl duplex::Duplex for Mock {}

/// Drives a `Mock` that expects a read first through
/// `StreamDuplexer::socketed_thread_read_first`, performing two
/// read-then-write rounds and checking byte counts and contents.
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
#[test]
fn test_socketed_thread_read_first() -> anyhow::Result<()> {
    const REQUEST: &[u8] = b"\xcd\xcd\xcd\xcd";

    let mut duplex = StreamDuplexer::socketed_thread_read_first(Box::new(Mock(false)))?;
    let mut reply = [0_u8; 4];

    for _ in 0..2 {
        let filled = duplex.read(&mut reply)?;
        assert_eq!(filled, 4);
        assert_eq!(reply, [0xab_u8; 4]);

        let written = duplex.write(REQUEST)?;
        assert_eq!(written, 4);
        duplex.flush()?;
    }

    Ok(())
}

/// Drives a `Mock` that expects a write first through
/// `StreamDuplexer::socketed_thread_write_first`, performing two
/// write-then-read rounds and checking byte counts and contents.
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
#[test]
fn test_socketed_thread_write_first() -> anyhow::Result<()> {
    const REQUEST: &[u8] = b"\xcd\xcd\xcd\xcd";

    let mut duplex = StreamDuplexer::socketed_thread_write_first(Box::new(Mock(true)))?;
    let mut reply = [0_u8; 4];

    for _ in 0..2 {
        let written = duplex.write(REQUEST)?;
        assert_eq!(written, 4);
        duplex.flush()?;

        let filled = duplex.read(&mut reply)?;
        assert_eq!(filled, 4);
        assert_eq!(reply, [0xab_u8; 4]);
    }

    Ok(())
}

/// Drives a `Mock` that expects a read first through
/// `StreamDuplexer::socketed_thread`, which is not told which side goes
/// first. Two read-then-write rounds are performed; the second write is one
/// byte shorter than the first.
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
#[test]
fn test_socketed_thread_auto_read_first() -> anyhow::Result<()> {
    // One payload per round: four bytes, then three.
    const REQUESTS: [&[u8]; 2] = [b"\xcd\xcd\xcd\xcd", b"\xcd\xcd\xcd"];

    let mut duplex = StreamDuplexer::socketed_thread(Box::new(Mock(false)))?;
    let mut reply = [0_u8; 4];

    for &request in REQUESTS.iter() {
        let filled = duplex.read(&mut reply)?;
        assert_eq!(filled, 4);
        assert_eq!(reply, [0xab_u8; 4]);

        let written = duplex.write(request)?;
        assert_eq!(written, request.len());
        duplex.flush()?;
    }

    Ok(())
}

/// Drives a `Mock` that expects a write first through
/// `StreamDuplexer::socketed_thread`, which is not told which side goes
/// first. Two write-then-read rounds are performed; the second write is one
/// byte shorter than the first.
#[cfg(all(not(target_os = "wasi"), feature = "socketpair"))]
#[test]
fn test_socketed_thread_auto_write_first() -> anyhow::Result<()> {
    // One payload per round: four bytes, then three.
    const REQUESTS: [&[u8]; 2] = [b"\xcd\xcd\xcd\xcd", b"\xcd\xcd\xcd"];

    let mut duplex = StreamDuplexer::socketed_thread(Box::new(Mock(true)))?;
    let mut reply = [0_u8; 4];

    for &request in REQUESTS.iter() {
        let written = duplex.write(request)?;
        assert_eq!(written, request.len());
        duplex.flush()?;

        let filled = duplex.read(&mut reply)?;
        assert_eq!(filled, 4);
        assert_eq!(reply, [0xab_u8; 4]);
    }

    Ok(())
}

/// Smoke test for the standard-stream constructors.
#[test]
fn test_stdio() {
    // Test that we can at least construct the stdio streams. Each result is
    // discarded immediately, whatever it is, before the next stream is built.
    drop(StreamReader::stdin());
    drop(StreamWriter::stdout());
    drop(StreamWriter::stderr());
}