processkit 0.6.2

Child-process management: kill-on-drop process trees and async run-and-capture
Documentation
//! Non-consuming readiness probes: `wait_for_line` / `wait_for` /
//! `wait_for_port` and their shared polling loop.

use std::future::Future;
use std::net::SocketAddr;
use std::time::{Duration, Instant};

use tokio::net::TcpStream;

use crate::error::{Error, Result};

use super::RunningProcess;

/// How often [`RunningProcess::wait_for`] / [`wait_for_port`]
/// (RunningProcess::wait_for_port) re-check readiness — responsive without
/// busy-spinning; matches the 50 ms liveness-poll cadence used elsewhere.
const READINESS_POLL: Duration = Duration::from_millis(50);

/// Cap on a single `wait_for_port` connect attempt (clamped to the remaining
/// budget), so one stalled connect can't overrun the probe deadline.
const CONNECT_ATTEMPT_CAP: Duration = Duration::from_secs(1);

impl RunningProcess {
    /// Wait until a stdout line matches `predicate` (returning that line), or
    /// fail with [`Error::NotReady`] when `within` elapses — or immediately
    /// when stdout closes before a match (e.g. the child exited and no
    /// descendant kept the pipe open), since no further line can arrive. A
    /// child that exits while a descendant still holds its stdout keeps the
    /// stream open, so that case waits out the deadline — the pipe, not the
    /// process, is what this probe watches.
    ///
    /// The readiness idiom: start a server, wait for its "listening on …"
    /// banner, then use it — no arbitrary sleeps.
    ///
    /// # Caveats
    ///
    /// - **Consumes stdout** up to and including the matching line (this is
    ///   the one-shot [`stdout_lines`](Self::stdout_lines) stream underneath —
    ///   if it was already called, the probe sees a closed stream and reports
    ///   `NotReady` immediately). Continue with
    ///   [`finish_streamed`](Self::finish_streamed) for the exit code and
    ///   stderr; the other probes don't touch stdout.
    /// - A failed probe does **not** kill the child — unlike
    ///   [`Command::timeout`](crate::Command::timeout), whose deadline (if
    ///   configured) is armed by this call exactly as `stdout_lines` always
    ///   arms it, and still tears the tree down independently of the probe.
    pub async fn wait_for_line(
        &mut self,
        predicate: impl Fn(&str) -> bool,
        within: Duration,
    ) -> Result<String> {
        use tokio_stream::StreamExt;

        // Bound the borrow: `StdoutLines` owns its sink (no borrow of self), so
        // `self.program` stays readable after the search.
        let mut lines = self.stdout_lines();
        let search = async {
            while let Some(line) = lines.next().await {
                if predicate(&line) {
                    return Some(line);
                }
            }
            None // stdout closed before any match — readiness can't happen.
        };
        match tokio::time::timeout(within, search).await {
            Ok(Some(line)) => Ok(line),
            Ok(None) | Err(_) => Err(self.not_ready(within)),
        }
    }

    /// Wait until `check` (re-invoked every ~50 ms, first attempt immediate)
    /// returns `true`, or fail with [`Error::NotReady`] when `within` elapses —
    /// or immediately when the child exits first (a dead process never becomes
    /// ready).
    ///
    /// The check is any async predicate — an HTTP health endpoint, a file
    /// appearing, a database accepting connections. Doesn't touch the child's
    /// pipes, so it composes with any later consumption
    /// ([`wait`](Self::wait), [`output_string`](Self::output_string), …). A
    /// failed probe does not kill the child. The deadline bounds the polling
    /// loop, not an in-flight check: a slow `check` future can overrun
    /// `within` by its own duration.
    pub async fn wait_for<F, Fut>(&mut self, check: F, within: Duration) -> Result<()>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = bool>,
    {
        self.poll_until(check, within).await
    }

    /// Wait until a TCP connection to `addr` is accepted, or fail with
    /// [`Error::NotReady`] when `within` elapses — or immediately when the
    /// child exits first.
    ///
    /// One connect attempt per ~50 ms tick (each attempt itself bounded so a
    /// stalled connect can't overrun the deadline); the probe connection is
    /// dropped as soon as it succeeds. Doesn't touch the child's pipes; a
    /// failed probe does not kill the child.
    pub async fn wait_for_port(&mut self, addr: SocketAddr, within: Duration) -> Result<()> {
        let deadline = Instant::now() + within;
        self.poll_until(
            move || {
                let remaining = deadline.saturating_duration_since(Instant::now());
                async move {
                    // Clamp the attempt to the remaining budget; floor at 1ms so
                    // the final tick still makes a (brief) attempt.
                    let cap = CONNECT_ATTEMPT_CAP
                        .min(remaining)
                        .max(Duration::from_millis(1));
                    matches!(
                        tokio::time::timeout(cap, TcpStream::connect(addr)).await,
                        Ok(Ok(_))
                    )
                }
            },
            within,
        )
        .await
    }

    /// Re-run `check` on the readiness cadence until it passes, the child
    /// exits, or the deadline elapses.
    async fn poll_until<F, Fut>(&mut self, mut check: F, within: Duration) -> Result<()>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = bool>,
    {
        let deadline = Instant::now() + within;
        loop {
            if check().await {
                return Ok(());
            }
            // An exited child can never become ready — fail fast rather than
            // burning the rest of the deadline. (`Err` from try_wait means
            // "couldn't tell"; keep polling, the deadline still bounds us.)
            if matches!(self.child.try_wait(), Ok(Some(_))) {
                return Err(self.not_ready(within));
            }
            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                return Err(self.not_ready(within));
            }
            tokio::time::sleep(READINESS_POLL.min(remaining)).await;
        }
    }

    fn not_ready(&self, within: Duration) -> Error {
        Error::NotReady {
            program: self.program.clone(),
            timeout: within,
        }
    }
}