tokio-process-tools 0.9.0

Correctness-focused async subprocess orchestration for Tokio: bounded output, multi-consumer streams, output detection, guaranteed cleanup and graceful termination.
Documentation
use super::super::SingleSubscriberOutputStream;
use super::common::{
    best_effort_no_replay_options, reliable_replay_options, wait_for_bytes_ingested,
    wait_for_no_active_consumer, wait_for_terminal,
};
use crate::output_stream::event::StreamEvent;
use crate::test_support::ReadErrorAfterBytes;
use crate::{
    ConsumerError, LineParsingOptions, NumBytesExt, RawCollectionOptions, ReplayRetention,
    StreamConfig, WaitForLineResult,
};
use assertr::prelude::*;
use std::io;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWriteExt;

#[tokio::test]
async fn typed_no_replay_starts_consumer_at_live_output() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let stream = SingleSubscriberOutputStream::from_stream(
        read_half,
        "custom",
        best_effort_no_replay_options(),
    );

    write_half.write_all(b"old").await.unwrap();
    write_half.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 3).await;

    let collector = stream
        .collect_chunks_into_vec(RawCollectionOptions::TrustedUnbounded)
        .unwrap();
    write_half.write_all(b"live").await.unwrap();
    write_half.flush().await.unwrap();
    drop(write_half);

    let bytes = collector.wait().await.unwrap();
    assert_that!(bytes.bytes).is_equal_to(b"live".to_vec());
}

#[tokio::test]
async fn typed_replay_all_delivers_pre_consumer_output_before_live_output() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let stream = SingleSubscriberOutputStream::from_stream(
        read_half,
        "custom",
        reliable_replay_options(ReplayRetention::All),
    );

    write_half.write_all(b"old").await.unwrap();
    write_half.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 3).await;

    let collector = stream
        .collect_chunks_into_vec(RawCollectionOptions::TrustedUnbounded)
        .unwrap();
    write_half.write_all(b"live").await.unwrap();
    write_half.flush().await.unwrap();
    drop(write_half);

    let bytes = collector.wait().await.unwrap();
    assert_that!(bytes.bytes).is_equal_to(b"oldlive".to_vec());
}

#[tokio::test]
async fn configured_subscription_snapshots_replay_buffer_from_shared_state() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let stream = SingleSubscriberOutputStream::from_stream(
        read_half,
        "custom",
        reliable_replay_options(ReplayRetention::All),
    );
    let shared = Arc::clone(&stream.configured_shared);

    write_half.write_all(b"old").await.unwrap();
    write_half.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 3).await;

    {
        let state = shared
            .state
            .lock()
            .expect("single-subscriber state poisoned");
        assert_that!(state.events.len()).is_equal_to(1);
    }

    let collector = stream
        .collect_chunks_into_vec(RawCollectionOptions::TrustedUnbounded)
        .unwrap();

    {
        let state = shared
            .state
            .lock()
            .expect("single-subscriber state poisoned");
        assert_that!(state.events.len()).is_equal_to(1);
    }

    write_half.write_all(b"live").await.unwrap();
    write_half.flush().await.unwrap();
    drop(write_half);

    let bytes = collector.wait().await.unwrap();
    assert_that!(bytes.bytes).is_equal_to(b"oldlive".to_vec());
}

#[tokio::test]
async fn no_replay_discards_output_between_consumers() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let stream = SingleSubscriberOutputStream::from_stream(
        read_half,
        "custom",
        best_effort_no_replay_options(),
    );

    let collector = stream
        .collect_chunks_into_vec(RawCollectionOptions::TrustedUnbounded)
        .unwrap();
    assert_that!(
        collector
            .cancel(Duration::from_secs(1))
            .await
            .unwrap()
            .expect_cancelled("collector should observe cancellation")
            .bytes
    )
    .is_empty();
    wait_for_no_active_consumer(&stream).await;

    write_half.write_all(b"idle").await.unwrap();
    write_half.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 4).await;

    let collector = stream
        .collect_chunks_into_vec(RawCollectionOptions::TrustedUnbounded)
        .unwrap();
    drop(write_half);

    let bytes = collector.wait().await.unwrap();
    assert_that!(bytes.bytes).is_empty();
}

#[tokio::test]
async fn replay_retains_output_between_consumers() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let stream = SingleSubscriberOutputStream::from_stream(
        read_half,
        "custom",
        reliable_replay_options(ReplayRetention::All),
    );

    let collector = stream
        .collect_chunks_into_vec(RawCollectionOptions::TrustedUnbounded)
        .unwrap();
    assert_that!(
        collector
            .cancel(Duration::from_secs(1))
            .await
            .unwrap()
            .expect_cancelled("collector should observe cancellation")
            .bytes
    )
    .is_empty();
    wait_for_no_active_consumer(&stream).await;

    write_half.write_all(b"idle").await.unwrap();
    write_half.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 4).await;

    let collector = stream
        .collect_chunks_into_vec(RawCollectionOptions::TrustedUnbounded)
        .unwrap();
    drop(write_half);

    let bytes = collector.wait().await.unwrap();
    assert_that!(bytes.bytes).is_equal_to(b"idle".to_vec());
}

#[tokio::test]
async fn replay_retention_limits_later_consumer() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let stream = SingleSubscriberOutputStream::from_stream(
        read_half,
        "custom",
        reliable_replay_options(ReplayRetention::LastChunks(1)),
    );

    let collector = stream
        .collect_chunks_into_vec(RawCollectionOptions::TrustedUnbounded)
        .unwrap();
    assert_that!(
        collector
            .cancel(Duration::from_secs(1))
            .await
            .unwrap()
            .expect_cancelled("collector should observe cancellation")
            .bytes
    )
    .is_empty();
    wait_for_no_active_consumer(&stream).await;

    write_half.write_all(b"aabbcc").await.unwrap();
    write_half.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 6).await;

    let collector = stream
        .collect_chunks_into_vec(RawCollectionOptions::TrustedUnbounded)
        .unwrap();
    drop(write_half);

    let bytes = collector.wait().await.unwrap();
    assert_that!(bytes.bytes).is_equal_to(b"cc".to_vec());
}

#[tokio::test]
async fn later_consumer_observes_eof() {
    let (read_half, write_half) = tokio::io::duplex(64);
    let stream = SingleSubscriberOutputStream::from_stream(
        read_half,
        "custom",
        best_effort_no_replay_options(),
    );

    drop(write_half);
    assert_that!(wait_for_terminal(&stream).await).is_equal_to(StreamEvent::Eof);

    let bytes = stream
        .collect_chunks_into_vec(RawCollectionOptions::TrustedUnbounded)
        .unwrap()
        .wait()
        .await
        .unwrap();
    assert_that!(bytes.bytes).is_empty();
}

#[tokio::test]
async fn later_consumer_observes_read_error() {
    let stream = SingleSubscriberOutputStream::from_stream(
        ReadErrorAfterBytes::new(b"before-error", io::ErrorKind::BrokenPipe),
        "custom",
        best_effort_no_replay_options(),
    );

    let _ = wait_for_terminal(&stream).await;

    match stream
        .collect_chunks_into_vec(RawCollectionOptions::TrustedUnbounded)
        .unwrap()
        .wait()
        .await
    {
        Err(ConsumerError::StreamRead { source }) => {
            assert_that!(source.stream_name()).is_equal_to("custom");
            assert_that!(source.kind()).is_equal_to(io::ErrorKind::BrokenPipe);
        }
        other => {
            assert_that!(&other).fail(format_args!("expected stream read error, got {other:?}"));
        }
    }
}

#[tokio::test]
async fn configured_subscription_after_seal_starts_live_with_empty_shared_replay() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let stream = SingleSubscriberOutputStream::from_stream(
        read_half,
        "custom",
        reliable_replay_options(ReplayRetention::All),
    );
    let shared = Arc::clone(&stream.configured_shared);

    write_half.write_all(b"old").await.unwrap();
    write_half.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 3).await;

    {
        let state = shared
            .state
            .lock()
            .expect("single-subscriber state poisoned");
        assert_that!(state.events.len()).is_equal_to(1);
    }

    stream.seal_replay();
    let collector = stream
        .collect_chunks_into_vec(RawCollectionOptions::TrustedUnbounded)
        .unwrap();

    {
        let state = shared
            .state
            .lock()
            .expect("single-subscriber state poisoned");
        assert_that!(state.events.len()).is_equal_to(0);
    }

    write_half.write_all(b"live").await.unwrap();
    write_half.flush().await.unwrap();
    drop(write_half);

    let bytes = collector.wait().await.unwrap();
    assert_that!(bytes.bytes).is_equal_to(b"live".to_vec());
}

#[tokio::test]
async fn typed_wait_after_seal_starts_live() {
    let (read_half, mut write_half) = tokio::io::duplex(64);
    let options = StreamConfig::builder()
        .reliable_for_active_subscribers()
        .replay_all()
        .read_chunk_size(4.bytes())
        .max_buffered_chunks(4)
        .build();
    let stream = SingleSubscriberOutputStream::from_stream(read_half, "custom", options);

    write_half.write_all(b"ready\n").await.unwrap();
    write_half.flush().await.unwrap();
    wait_for_bytes_ingested(&stream, 6).await;
    stream.seal_replay();

    let waiter = stream
        .wait_for_line(
            Duration::from_secs(1),
            |line| line == "live",
            LineParsingOptions::default(),
        )
        .unwrap();

    write_half.write_all(b"live\n").await.unwrap();
    drop(write_half);

    assert_that!(waiter.await).is_equal_to(Ok(WaitForLineResult::Matched));
}