tokio-process-tools 0.9.0

Correctness-focused async subprocess orchestration for Tokio: bounded output, multi-consumer streams, output detection, guaranteed cleanup and graceful termination.
Documentation
use super::common::wait_for_terminal_shared;
use super::reader_test_support::spawn_configured_reader;
use crate::NumBytesExt;
use crate::output_stream::event::StreamEvent;
use crate::output_stream::policy::DeliveryGuarantee;
use crate::test_support::ReadErrorAfterBytes;
use assertr::prelude::*;
use std::io;
use std::time::Duration;

#[tokio::test]
async fn configured_reader_publishes_read_error_without_panicking() {
    let (stream_reader, mut rx, _shared) = spawn_configured_reader(
        ReadErrorAfterBytes::new(b"ready\n", io::ErrorKind::BrokenPipe),
        DeliveryGuarantee::BestEffort,
        2.bytes(),
        64,
    );

    stream_reader.await.unwrap();

    let mut saw_error = false;
    while let Some(event) = tokio::time::timeout(Duration::from_secs(1), rx.recv())
        .await
        .expect("timed out waiting for stream event")
    {
        match event {
            StreamEvent::Chunk(_) | StreamEvent::Gap => {}
            StreamEvent::Eof => {
                assert_that!(()).fail("read failure must not be reported as EOF");
            }
            StreamEvent::ReadError(err) => {
                assert_that!(err.stream_name()).is_equal_to("custom");
                assert_that!(err.kind()).is_equal_to(io::ErrorKind::BrokenPipe);
                saw_error = true;
                break;
            }
        }
    }

    assert_that!(saw_error).is_true();
}

#[tokio::test(flavor = "multi_thread")]
async fn configured_reader_records_read_error_while_active_queue_is_full() {
    let (stream_reader, mut rx, shared) = spawn_configured_reader(
        ReadErrorAfterBytes::new(b"aabbcc", io::ErrorKind::BrokenPipe),
        DeliveryGuarantee::BestEffort,
        2.bytes(),
        1,
    );

    let terminal = tokio::time::timeout(Duration::from_secs(1), wait_for_terminal_shared(&shared))
        .await
        .expect("read error should be recorded even while live delivery is blocked");
    match terminal {
        StreamEvent::ReadError(err) => {
            assert_that!(err.stream_name()).is_equal_to("custom");
            assert_that!(err.kind()).is_equal_to(io::ErrorKind::BrokenPipe);
        }
        other => {
            assert_that!(&other).fail(format_args!("expected read error, got {other:?}"));
        }
    }
    assert_that!(stream_reader.is_finished()).is_false();
    drop(shared);

    match rx.recv().await.unwrap() {
        StreamEvent::Chunk(chunk) => {
            assert_that!(chunk.as_ref()).is_equal_to(b"aa".as_slice());
        }
        other => {
            assert_that!(&other).fail(format_args!("expected buffered chunk, got {other:?}"));
        }
    }
    assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Gap);
    match rx.recv().await.unwrap() {
        StreamEvent::ReadError(err) => {
            assert_that!(err.stream_name()).is_equal_to("custom");
            assert_that!(err.kind()).is_equal_to(io::ErrorKind::BrokenPipe);
        }
        other => {
            assert_that!(&other).fail(format_args!("expected read error, got {other:?}"));
        }
    }

    stream_reader.await.unwrap();
    assert_that!(
        tokio::time::timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("timed out waiting for stream closure")
    )
    .is_none();
}