tokio-process-tools 0.9.0

Correctness-focused async subprocess orchestration for Tokio: bounded output, multi-consumer streams, output detection, guaranteed cleanup and graceful termination.
Documentation
use super::super::BroadcastOutputStream;
use super::common::{best_effort_no_replay_options, reliable_options};
use crate::{LineParsingOptions, ReplayRetention, WaitForLineResult};
use assertr::prelude::*;
use std::time::Duration;

#[tokio::test]
async fn dropping_fast_stream_closes_waiting_subscribers() {
    let (read_half, _write_half) = tokio::io::duplex(64);
    let stream =
        BroadcastOutputStream::from_stream(read_half, "custom", best_effort_no_replay_options());

    let waiter = stream.wait_for_line(
        Duration::from_secs(1),
        |line| line == "never",
        LineParsingOptions::default(),
    );
    drop(stream);

    let result = tokio::time::timeout(Duration::from_secs(1), waiter).await;
    assert_that!(result)
        .is_ok()
        .is_equal_to(Ok(WaitForLineResult::StreamClosed));
}

#[tokio::test]
async fn dropping_fanout_replay_stream_closes_waiting_subscribers() {
    let (read_half, _write_half) = tokio::io::duplex(64);
    let stream = BroadcastOutputStream::from_stream(
        read_half,
        "custom",
        reliable_options(ReplayRetention::All),
    );

    let waiter = stream.wait_for_line(
        Duration::from_secs(1),
        |line| line == "never",
        LineParsingOptions::default(),
    );
    drop(stream);

    let result = tokio::time::timeout(Duration::from_secs(1), waiter).await;
    assert_that!(result)
        .is_ok()
        .is_equal_to(Ok(WaitForLineResult::StreamClosed));
}