tokio-process-tools 0.9.0

Correctness-focused async subprocess orchestration for Tokio: bounded output, multi-consumer streams, output detection, guaranteed cleanup and graceful termination.
Documentation
use super::super::SingleSubscriberOutputStream;
use super::common::wait_for_no_active_consumer;
use crate::{NumBytesExt, RawCollectionOptions, StreamConfig};
use assertr::prelude::*;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::time::sleep;

#[tokio::test]
async fn reader_drains_after_consumer_drop() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let stream = SingleSubscriberOutputStream::from_stream(
        read_half,
        "custom",
        StreamConfig::builder()
            .reliable_for_active_subscribers()
            .no_replay()
            .read_chunk_size(16.bytes())
            .max_buffered_chunks(1)
            .build(),
    );

    let collector = stream
        .collect_chunks_into_vec(RawCollectionOptions::TrustedUnbounded)
        .unwrap();
    drop(collector);
    wait_for_no_active_consumer(&stream).await;

    let idle_output = vec![b'x'; 4096];
    tokio::time::timeout(Duration::from_secs(1), write_half.write_all(&idle_output))
        .await
        .expect("reader should keep draining with no active consumer")
        .unwrap();
    sleep(Duration::from_millis(25)).await;

    let collector = stream
        .collect_chunks_into_vec(RawCollectionOptions::TrustedUnbounded)
        .unwrap();
    write_half.write_all(b"tail").await.unwrap();
    drop(write_half);

    let bytes = collector.wait().await.unwrap();
    assert_that!(bytes.bytes).is_equal_to(b"tail".to_vec());
}