tokio-process-tools 0.9.0

Correctness-focused async subprocess orchestration for Tokio: bounded output, multi-consumer streams, output detection, guaranteed cleanup and graceful termination.
Documentation
use super::super::SingleSubscriberOutputStream;
use super::super::state::ConfiguredShared;
use crate::AsyncChunkCollector;
use crate::output_stream::event::{Chunk, StreamEvent};
use crate::output_stream::policy::{Delivery, Replay};
use crate::{Next, NumBytes};
use crate::{NumBytesExt, ReplayRetention, StreamConfig};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;
use tokio::sync::oneshot;

/// Returns the default best-effort, no-replay [`StreamConfig`].
///
/// Delegates to [`best_effort_no_replay_options_with`] with a read chunk size
/// of `4.bytes()` and a limit of `4` buffered chunks.
pub(super) fn best_effort_no_replay_options() -> StreamConfig {
    let read_chunk_size = 4.bytes();
    let max_buffered_chunks = 4;
    best_effort_no_replay_options_with(read_chunk_size, max_buffered_chunks)
}

/// Builds a [`StreamConfig`] through `best_effort_delivery()` and `no_replay()`
/// with caller-chosen sizing.
///
/// * `read_chunk_size` - forwarded to the builder's `read_chunk_size`.
/// * `max_buffered_chunks` - forwarded to the builder's `max_buffered_chunks`.
pub(super) fn best_effort_no_replay_options_with(
    read_chunk_size: NumBytes,
    max_buffered_chunks: usize,
) -> StreamConfig {
    // Select the delivery/replay policies first, then apply the sizing knobs.
    let policies = StreamConfig::builder().best_effort_delivery().no_replay();
    let sized = policies
        .read_chunk_size(read_chunk_size)
        .max_buffered_chunks(max_buffered_chunks);
    sized.build()
}

/// Builds a reliable-delivery [`StreamConfig`] with replay enabled.
///
/// The builder is configured with `reliable_for_active_subscribers()`, then the
/// replay method matching `replay_retention` is applied. Read chunk size is
/// fixed at `4.bytes()` and the buffered-chunk limit at `4`.
pub(super) fn reliable_replay_options(
    replay_retention: ReplayRetention,
) -> StreamConfig<crate::ReliableDelivery, crate::ReplayEnabled> {
    let reliable = StreamConfig::builder().reliable_for_active_subscribers();

    // Each retention variant maps onto its dedicated builder method.
    let with_replay = match replay_retention {
        ReplayRetention::LastChunks(chunks) => reliable.replay_last_chunks(chunks),
        ReplayRetention::LastBytes(bytes) => reliable.replay_last_bytes(bytes),
        ReplayRetention::All => reliable.replay_all(),
    };

    with_replay
        .read_chunk_size(4.bytes())
        .max_buffered_chunks(4)
        .build()
}

/// Waits until the stream's active-subscriber watch holds `None`.
///
/// # Panics
///
/// Panics with "active subscriber watch closed" if `wait_for` returns an error.
pub(super) async fn wait_for_no_active_consumer<D, R>(stream: &SingleSubscriberOutputStream<D, R>)
where
    D: Delivery,
    R: Replay,
{
    let mut active_rx = stream.configured_shared.subscribe_active();
    active_rx
        .wait_for(|active| active.is_none())
        .await
        .expect("active subscriber watch closed");
}

/// Waits until the stream's shared state reports at least `bytes` ingested.
///
/// Convenience wrapper that forwards `stream.configured_shared` to
/// [`wait_for_bytes_ingested_shared`].
pub(super) async fn wait_for_bytes_ingested<D, R>(
    stream: &SingleSubscriberOutputStream<D, R>,
    bytes: u64,
) where
    D: Delivery,
    R: Replay,
{
    let shared = &stream.configured_shared;
    wait_for_bytes_ingested_shared(shared, bytes).await;
}

/// Waits until the bytes-ingested watch observes a value of at least `bytes`.
///
/// # Panics
///
/// Panics with "bytes-ingested watch closed" if `wait_for` returns an error.
pub(super) async fn wait_for_bytes_ingested_shared(shared: &ConfiguredShared, bytes: u64) {
    let mut ingested_rx = shared.subscribe_bytes_ingested();
    ingested_rx
        .wait_for(|ingested| bytes <= *ingested)
        .await
        .expect("bytes-ingested watch closed");
}

/// Waits for the stream's terminal event and returns it.
///
/// Convenience wrapper that forwards `stream.configured_shared` to
/// [`wait_for_terminal_shared`].
pub(super) async fn wait_for_terminal<D, R>(
    stream: &SingleSubscriberOutputStream<D, R>,
) -> StreamEvent
where
    D: Delivery,
    R: Replay,
{
    let shared = &stream.configured_shared;
    wait_for_terminal_shared(shared).await
}

/// Waits until the terminal watch holds `Some(event)` and returns a clone of
/// that event.
///
/// # Panics
///
/// Panics with "terminal watch closed" if `wait_for` returns an error, and with
/// "terminal watch settled with None after wait_for is_some" if the observed
/// value is `None` even though the `is_some` predicate was satisfied.
pub(super) async fn wait_for_terminal_shared(shared: &ConfiguredShared) -> StreamEvent {
    let mut terminal_rx = shared.subscribe_terminal();
    let settled = terminal_rx
        .wait_for(Option::is_some)
        .await
        .expect("terminal watch closed");

    // Clone out of the borrowed watch value so an owned event can be returned.
    let terminal = settled.clone();
    terminal.expect("terminal watch settled with None after wait_for is_some")
}

/// Chunk collector whose `collect` future never completes.
///
/// Its `AsyncChunkCollector` implementation sends `()` on the stored one-shot
/// sender the first time `collect` is entered and then awaits a future that is
/// always pending.
pub(super) struct HangingChunkCollector {
    /// One-shot "entered" signal; taken (left as `None`) on the first `collect` call.
    entered_tx: Option<oneshot::Sender<()>>,
}

impl HangingChunkCollector {
    /// Creates a collector that will send `()` on `entered_tx` the first time
    /// its `collect` method is entered.
    pub(super) fn new(entered_tx: oneshot::Sender<()>) -> Self {
        let entered_tx = Some(entered_tx);
        Self { entered_tx }
    }
}

impl AsyncChunkCollector<Vec<u8>> for HangingChunkCollector {
    /// Signals entry once, then hangs forever.
    ///
    /// On the first call the stored sender is taken and `()` is sent on it.
    /// Every call then awaits a never-ready future, so no `Next` value is ever
    /// produced. The chunk and the accumulator are ignored.
    ///
    /// # Panics
    ///
    /// Panics if sending the entry signal fails.
    async fn collect<'a>(&'a mut self, _chunk: Chunk, _seen: &'a mut Vec<u8>) -> Next {
        let first_entry = self.entered_tx.take();
        if let Some(notify) = first_entry {
            notify.send(()).unwrap();
        }
        // The output type is inferred from the declared return type (`Next`).
        std::future::pending().await
    }
}

/// Writer whose `poll_write` always returns `Poll::Pending`.
///
/// Its `AsyncWrite` implementation sends `()` on the stored one-shot sender the
/// first time `poll_write` is called; flush and shutdown complete immediately.
pub(super) struct PendingWrite {
    /// One-shot "entered" signal; taken (left as `None`) on the first `poll_write` call.
    entered_tx: Option<oneshot::Sender<()>>,
}

impl PendingWrite {
    /// Creates a writer that will send `()` on `entered_tx` the first time
    /// `poll_write` is called.
    pub(super) fn new(entered_tx: oneshot::Sender<()>) -> Self {
        let entered_tx = Some(entered_tx);
        Self { entered_tx }
    }
}

impl AsyncWrite for PendingWrite {
    /// Signals entry once and never accepts any bytes.
    ///
    /// The first call takes the stored sender and sends `()` on it. Every call
    /// returns `Poll::Pending` without using the task context, so this method
    /// itself never arranges a wake-up.
    ///
    /// # Panics
    ///
    /// Panics if sending the entry signal fails.
    fn poll_write(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let first_entry = self.entered_tx.take();
        if let Some(notify) = first_entry {
            notify.send(()).unwrap();
        }
        Poll::Pending
    }

    /// Reports the flush as immediately complete.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let flushed: io::Result<()> = Ok(());
        Poll::Ready(flushed)
    }

    /// Reports the shutdown as immediately complete.
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let closed: io::Result<()> = Ok(());
        Poll::Ready(closed)
    }
}