tokio-process-tools 0.11.0

Correctness-focused async subprocess orchestration for Tokio: bounded output, multi-consumer streams, output detection, guaranteed cleanup and graceful termination.
Documentation
use super::super::SingleSubscriberOutputStream;
use super::common::{best_effort_no_replay_options, wait_for_no_active_consumer};
use crate::output_stream::Consumable;
use crate::output_stream::line::adapter::ParseLines;
use crate::output_stream::visitors::collect::CollectChunks;
use crate::output_stream::visitors::inspect::{InspectChunks, InspectChunksAsync};
use crate::{
    CollectedBytes, ConsumerCancelOutcome, LineParsingOptions, Next, RawCollectionOptions,
};
use assertr::prelude::*;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::sync::oneshot;

#[tokio::test]
async fn drop_allows_later_collector() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let stream = SingleSubscriberOutputStream::from_stream(
        read_half,
        "custom",
        best_effort_no_replay_options(),
    );

    let inspector = stream
        .consume(InspectChunks::builder().f(|_chunk| Next::Continue).build())
        .unwrap();
    drop(inspector);
    wait_for_no_active_consumer(&stream).await;

    let collector = stream
        .consume(CollectChunks::fold(
            CollectedBytes::new(),
            CollectedBytes::collector(RawCollectionOptions::TrustedUnbounded),
        ))
        .unwrap();
    write_half.write_all(b"later").await.unwrap();
    drop(write_half);

    let bytes = collector.wait().await.unwrap();
    assert_that!(bytes.bytes).is_equal_to(b"later".to_vec());
}

#[tokio::test]
async fn wait_cancellation_releases_single_subscriber_claim() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let stream = SingleSubscriberOutputStream::from_stream(
        read_half,
        "custom",
        best_effort_no_replay_options(),
    );

    let (entered_tx, entered_rx) = oneshot::channel();
    let mut entered_tx = Some(entered_tx);
    let inspector = stream
        .consume_async(ParseLines::inspect_async(
            LineParsingOptions::default(),
            move |_line| {
                if let Some(entered_tx) = entered_tx.take() {
                    entered_tx.send(()).unwrap();
                }
                async move { std::future::pending::<Next>().await }
            },
        ))
        .unwrap();

    write_half.write_all(b"ready\n").await.unwrap();
    entered_rx.await.unwrap();

    let result = tokio::time::timeout(Duration::from_millis(25), inspector.wait()).await;
    assert_that!(result.is_err()).is_true();
    wait_for_no_active_consumer(&stream).await;

    let collector = stream
        .consume(CollectChunks::fold(
            CollectedBytes::new(),
            CollectedBytes::collector(RawCollectionOptions::TrustedUnbounded),
        ))
        .unwrap();
    write_half.write_all(b"later\n").await.unwrap();
    drop(write_half);

    let bytes = collector.wait().await.unwrap();
    assert_that!(bytes.bytes).is_equal_to(b"later\n".to_vec());
}

#[tokio::test]
async fn cancel_aborts_hanging_async_callback_after_timeout() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let stream = SingleSubscriberOutputStream::from_stream(
        read_half,
        "custom",
        best_effort_no_replay_options(),
    );

    let (entered_tx, entered_rx) = oneshot::channel();
    let mut entered_tx = Some(entered_tx);
    let inspector = stream
        .consume_async(
            InspectChunksAsync::builder()
                .f(move |_chunk| {
                    if let Some(entered_tx) = entered_tx.take() {
                        entered_tx.send(()).unwrap();
                    }
                    async move { std::future::pending::<Next>().await }
                })
                .build(),
        )
        .unwrap();

    write_half.write_all(b"ready").await.unwrap();
    entered_rx.await.unwrap();

    let outcome = inspector.cancel(Duration::from_millis(25)).await.unwrap();
    assert_that!(matches!(outcome, ConsumerCancelOutcome::Aborted)).is_true();
    wait_for_no_active_consumer(&stream).await;

    let collector = stream
        .consume(CollectChunks::fold(
            CollectedBytes::new(),
            CollectedBytes::collector(RawCollectionOptions::TrustedUnbounded),
        ))
        .unwrap();
    write_half.write_all(b"later").await.unwrap();
    drop(write_half);

    let bytes = collector.wait().await.unwrap();
    assert_that!(bytes.bytes).is_equal_to(b"later".to_vec());
}

#[tokio::test]
async fn abort_releases_single_subscriber_claim() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let stream = SingleSubscriberOutputStream::from_stream(
        read_half,
        "custom",
        best_effort_no_replay_options(),
    );

    let (entered_tx, entered_rx) = oneshot::channel();
    let mut entered_tx = Some(entered_tx);
    let inspector = stream
        .consume_async(
            InspectChunksAsync::builder()
                .f(move |_chunk| {
                    if let Some(entered_tx) = entered_tx.take() {
                        entered_tx.send(()).unwrap();
                    }
                    async move { std::future::pending::<Next>().await }
                })
                .build(),
        )
        .unwrap();

    write_half.write_all(b"ready").await.unwrap();
    entered_rx.await.unwrap();

    inspector.abort().await;
    wait_for_no_active_consumer(&stream).await;

    let collector = stream
        .consume(CollectChunks::fold(
            CollectedBytes::new(),
            CollectedBytes::collector(RawCollectionOptions::TrustedUnbounded),
        ))
        .unwrap();
    write_half.write_all(b"later").await.unwrap();
    drop(write_half);

    let bytes = collector.wait().await.unwrap();
    assert_that!(bytes.bytes).is_equal_to(b"later".to_vec());
}