tokio-process-tools 0.9.0

Correctness-focused async subprocess orchestration for Tokio: bounded output, multi-consumer streams, output detection, guaranteed cleanup and graceful termination.
Documentation
use super::super::SingleSubscriberOutputStream;
use super::common::wait_for_terminal_shared;
use super::reader_test_support::{recv_event_with_timeout, spawn_configured_reader};
use crate::output_stream::event::StreamEvent;
use crate::output_stream::policy::DeliveryGuarantee;
use crate::{DEFAULT_READ_CHUNK_SIZE, LineParsingOptions, Next, NumBytesExt, StreamConfig};
use assertr::prelude::*;
use std::io::{self, Cursor};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
use tokio::time::sleep;
use tracing_test::traced_test;

#[derive(Debug)]
struct AlwaysReadyBytes {
    remaining: usize,
    bytes_read: Arc<AtomicUsize>,
}

impl AlwaysReadyBytes {
    fn new(total_bytes: usize, bytes_read: Arc<AtomicUsize>) -> Self {
        Self {
            remaining: total_bytes,
            bytes_read,
        }
    }
}

impl AsyncRead for AlwaysReadyBytes {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        const BYTES: [u8; 1024] = [b'x'; 1024];

        if self.remaining == 0 {
            return Poll::Ready(Ok(()));
        }

        let len = self.remaining.min(buf.remaining()).min(BYTES.len());
        buf.put_slice(&BYTES[..len]);
        self.remaining -= len;
        self.bytes_read.fetch_add(len, Ordering::Relaxed);
        Poll::Ready(Ok(()))
    }
}

#[tokio::test(flavor = "multi_thread")]
async fn configured_reader_sends_pending_gap_before_terminal_eof() {
    let read = Cursor::new(b"aabbcc".to_vec());
    let (stream_reader, mut rx, shared) =
        spawn_configured_reader(read, DeliveryGuarantee::BestEffort, 2.bytes(), 1);
    drop(shared);

    match rx.recv().await.unwrap() {
        StreamEvent::Chunk(chunk) => {
            assert_that!(chunk.as_ref()).is_equal_to(b"aa".as_slice());
        }
        other => {
            assert_that!(&other).fail(format_args!("expected first chunk, got {other:?}"));
        }
    }

    let mut previous = None;
    while let Some(event) = rx.recv().await {
        match event {
            StreamEvent::Eof => {
                assert_that!(previous).is_equal_to(Some(StreamEvent::Gap));
                break;
            }
            StreamEvent::Chunk(_) => {}
            event @ StreamEvent::Gap => {
                previous = Some(event);
            }
            StreamEvent::ReadError(err) => {
                assert_that!(&err).fail(format_args!("unexpected read error: {err}"));
            }
        }
    }

    stream_reader.await.unwrap();
    assert_that!(rx.recv().await).is_none();
}

#[tokio::test(flavor = "multi_thread")]
async fn configured_reader_records_eof_while_active_queue_is_full() {
    let read = Cursor::new(b"aabbcc".to_vec());
    let (stream_reader, mut rx, shared) =
        spawn_configured_reader(read, DeliveryGuarantee::BestEffort, 2.bytes(), 1);

    let terminal = tokio::time::timeout(Duration::from_secs(1), wait_for_terminal_shared(&shared))
        .await
        .expect("EOF should be recorded even while live delivery is blocked");
    assert_that!(terminal).is_equal_to(StreamEvent::Eof);
    assert_that!(stream_reader.is_finished()).is_false();
    drop(shared);

    match recv_event_with_timeout(&mut rx).await {
        StreamEvent::Chunk(chunk) => {
            assert_that!(chunk.as_ref()).is_equal_to(b"aa".as_slice());
        }
        other => {
            assert_that!(&other).fail(format_args!("expected buffered chunk, got {other:?}"));
        }
    }
    assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Gap);
    assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Eof);

    stream_reader.await.unwrap();
    assert_that!(rx.recv().await).is_none();
}

#[tokio::test(flavor = "current_thread")]
async fn configured_best_effort_yields_when_pending_gap_channel_is_full() {
    let total_bytes = 1024;
    let bytes_read = Arc::new(AtomicUsize::new(0));
    let (stream_reader, mut rx, _shared) = spawn_configured_reader(
        AlwaysReadyBytes::new(total_bytes, Arc::clone(&bytes_read)),
        DeliveryGuarantee::BestEffort,
        1.bytes(),
        1,
    );

    match rx.recv().await.unwrap() {
        StreamEvent::Chunk(chunk) => {
            assert_that!(chunk.as_ref()).is_equal_to(b"x".as_slice());
        }
        other => {
            assert_that!(&other).fail(format_args!("expected first chunk, got {other:?}"));
        }
    }

    let observed = bytes_read.load(Ordering::Relaxed);
    assert_that!(observed < total_bytes)
        .with_detail_message(format!(
            "reader consumed all {total_bytes} bytes before yielding"
        ))
        .is_true();

    drop(rx);
    stream_reader.await.unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn configured_reader_sends_pending_gap_before_resumed_chunk_delivery() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let (stream_reader, mut rx, shared) =
        spawn_configured_reader(read_half, DeliveryGuarantee::BestEffort, 2.bytes(), 2);
    drop(shared);

    write_half.write_all(b"aabbcc").await.unwrap();
    write_half.flush().await.unwrap();
    sleep(Duration::from_millis(25)).await;

    for expected in [b"aa".as_slice(), b"bb".as_slice()] {
        match rx.recv().await.unwrap() {
            StreamEvent::Chunk(chunk) => {
                assert_that!(chunk.as_ref()).is_equal_to(expected);
            }
            other => {
                assert_that!(&other).fail(format_args!("expected buffered chunk, got {other:?}"));
            }
        }
    }

    write_half.write_all(b"dd").await.unwrap();
    write_half.flush().await.unwrap();
    drop(write_half);

    assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Gap);
    match rx.recv().await.unwrap() {
        StreamEvent::Chunk(chunk) => {
            assert_that!(chunk.as_ref()).is_equal_to(b"dd".as_slice());
        }
        other => {
            assert_that!(&other).fail(format_args!("expected resumed chunk, got {other:?}"));
        }
    }
    assert_that!(rx.recv().await.unwrap()).is_equal_to(StreamEvent::Eof);

    stream_reader.await.unwrap();
    assert_that!(rx.recv().await).is_none();
}

#[tokio::test]
#[traced_test]
async fn handles_backpressure_by_dropping_newer_chunks_after_channel_buffer_filled_up() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let os = SingleSubscriberOutputStream::from_stream(
        read_half,
        "custom",
        StreamConfig::builder()
            .best_effort_delivery()
            .no_replay()
            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
            .max_buffered_chunks(2)
            .build(),
    );

    let inspector = os
        .inspect_lines_async(
            |_line| async move {
                sleep(Duration::from_millis(100)).await;
                Next::Continue
            },
            LineParsingOptions::default(),
        )
        .unwrap();

    let producer = tokio::spawn(async move {
        for count in 1..=15 {
            write_half
                .write_all(format!("{count}\n").as_bytes())
                .await
                .unwrap();
            sleep(Duration::from_millis(25)).await;
        }
    });

    producer.await.unwrap();
    inspector.wait().await.unwrap();
    drop(os);

    logs_assert(|lines: &[&str]| {
        let lagged_logs = lines
            .iter()
            .filter(|line| line.contains("Stream reader is lagging behind lagged="))
            .count();
        if lagged_logs == 0 {
            return Err("Expected at least one lagged log".to_string());
        }
        Ok(())
    });
}