tastty-driver 0.1.0

Terminal automation driver built on tastty
//! Shared exit-wait notifier driven by the session's exit reaper.
//!
//! One [`ExitNotifier`] lives per [`Session`](crate::Session). The reaper
//! thread transitions it from `Pending` to either [`ExitNotifierState::Done`]
//! or [`ExitNotifierState::Failed`] exactly once when the child exit (or a
//! reap failure) is observed, then wakes every waiting consumer:
//! [`ExitNotifier::wait_blocking`] for the synchronous
//! [`Session::wait_exit`](crate::Session::wait_exit) path, and
//! [`ExitWaitFuture`] for the async
//! [`Session::wait_exit_async`](crate::Session::wait_exit_async) path.
//!
//! Sync waiters block on a [`Condvar`]; async waiters register a [`Waker`]
//! that the reaper takes when it finalises the state.

use std::sync::{Condvar, Mutex};
use std::task::Waker;
use std::time::{Duration, Instant};

#[cfg(feature = "async")]
use std::future::Future;
#[cfg(feature = "async")]
use std::pin::Pin;
#[cfg(feature = "async")]
use std::sync::Arc;
#[cfg(feature = "async")]
use std::task::{Context, Poll};

use crate::{Error, ExitStatus, Result};

/// Exit-state cell shared between the thread that finalises it and every
/// waiter.
///
/// The state starts as `Pending` and is replaced by a terminal value through
/// `notify_exit` / `notify_failure`. Blocking waiters park on `condvar`;
/// wakers stored inside the `Pending` state are woken by the same notify
/// calls.
pub(crate) struct ExitNotifier {
    /// Current lifecycle state; every read and write goes through this lock.
    state: Mutex<ExitNotifierState>,
    /// Signalled with `notify_all` when the state leaves `Pending`; the
    /// blocking wait methods park on it.
    condvar: Condvar,
}

/// Lifecycle of an [`ExitNotifier`]: created as `Pending`, then replaced by
/// `Done` or `Failed`. The notify methods leave an already-finalised state
/// untouched, so the first terminal value written is the one that sticks.
enum ExitNotifierState {
    /// Reaper has not finalised yet. `wakers` holds one entry per pending
    /// async future, keyed by a monotonic id so a dropped future can
    /// remove its entry without disturbing siblings.
    Pending {
        // Only consulted by the `async` feature's [`ExitWaitFuture`]
        // registration path; the sync `wait_blocking` waiter is parked on
        // the condvar and never touches this field.
        #[cfg_attr(
            not(feature = "async"),
            expect(
                dead_code,
                reason = "key allocator only used by the async ExitWaitFuture path"
            )
        )]
        next_key: u64,
        // Taken out wholesale by the notify methods when the state is
        // finalised; each waker is then woken after the lock is released.
        wakers: Vec<(u64, Waker)>,
    },
    /// Reaper observed the child exit. The status is `Copy`, so every
    /// consumer takes its own copy.
    Done(ExitStatus),
    /// Reap failed, or the [`Session`](crate::Session) was dropped before
    /// exit was observed. Consumers receive a synthetic
    /// [`Error::ExitStatus`] wrapping
    /// [`tastty::Error::ExitStatusUnavailable`]; the original syscall
    /// error (if any) is logged at the reaper boundary but not retained.
    Failed,
}

fn failure_error() -> Error {
    Error::ExitStatus(tastty::Error::ExitStatusUnavailable)
}

impl ExitNotifier {
    pub(crate) fn new() -> Self {
        Self {
            state: Mutex::new(ExitNotifierState::Pending {
                next_key: 0,
                wakers: Vec::new(),
            }),
            condvar: Condvar::new(),
        }
    }

    /// Transition `Pending -> Done(status)` and wake every consumer. A
    /// no-op once the notifier is finalised, so the reaper's
    /// shutdown-fallback path can safely call this after a successful
    /// notify without double-firing.
    pub(crate) fn notify_exit(&self, status: ExitStatus) {
        let wakers = {
            let mut state = self
                .state
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            match std::mem::replace(&mut *state, ExitNotifierState::Done(status)) {
                ExitNotifierState::Pending { wakers, .. } => wakers,
                other => {
                    *state = other;
                    return;
                }
            }
        };
        self.condvar.notify_all();
        for (_, waker) in wakers {
            waker.wake();
        }
    }

    /// Transition `Pending -> Failed` and wake every consumer. Used by
    /// the reaper for both `try_wait` errors and the session-drop
    /// fallback.
    pub(crate) fn notify_failure(&self) {
        let wakers = {
            let mut state = self
                .state
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            match std::mem::replace(&mut *state, ExitNotifierState::Failed) {
                ExitNotifierState::Pending { wakers, .. } => wakers,
                other => {
                    *state = other;
                    return;
                }
            }
        };
        self.condvar.notify_all();
        for (_, waker) in wakers {
            waker.wake();
        }
    }

    /// Block the calling thread until the notifier is finalised. The
    /// caller's context is fully passive: no busy-poll, no per-call
    /// thread - just a `Condvar::wait` that the reaper notifies.
    pub(crate) fn wait_blocking(&self) -> Result<ExitStatus> {
        let mut guard = self
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        loop {
            match &*guard {
                ExitNotifierState::Done(status) => return Ok(*status),
                ExitNotifierState::Failed => return Err(failure_error()),
                ExitNotifierState::Pending { .. } => {
                    guard = self.condvar.wait(guard).unwrap_or_else(|p| p.into_inner());
                }
            }
        }
    }

    /// Bounded variant of [`wait_blocking`](Self::wait_blocking).
    ///
    /// Returns `Some(result)` when the notifier finalises within
    /// `timeout` (including the case where it is already finalised at
    /// call time), and `None` when the deadline elapses while still
    /// `Pending`. Like [`wait_blocking`](Self::wait_blocking), the wait
    /// is fully passive: the caller parks on the same condvar that the
    /// reaper notifies, with no per-call polling thread.
    pub(crate) fn wait_blocking_timeout(&self, timeout: Duration) -> Option<Result<ExitStatus>> {
        let deadline = Instant::now().checked_add(timeout);
        let mut guard = self
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        loop {
            match &*guard {
                ExitNotifierState::Done(status) => return Some(Ok(*status)),
                ExitNotifierState::Failed => return Some(Err(failure_error())),
                ExitNotifierState::Pending { .. } => {
                    let remaining = match deadline {
                        Some(deadline) => deadline.saturating_duration_since(Instant::now()),
                        None => Duration::MAX,
                    };
                    if remaining.is_zero() {
                        return None;
                    }
                    let (next, result) = self
                        .condvar
                        .wait_timeout(guard, remaining)
                        .unwrap_or_else(|p| p.into_inner());
                    guard = next;
                    if result.timed_out() && matches!(*guard, ExitNotifierState::Pending { .. }) {
                        return None;
                    }
                }
            }
        }
    }
}

#[cfg(feature = "async")]
impl ExitNotifier {
    /// Poll the notifier on behalf of one future.
    ///
    /// * Terminal state: returns `Poll::Ready` with the recorded status
    ///   (`Done`) or the synthetic exit-status error (`Failed`);
    ///   `key_slot` is left untouched.
    /// * `Pending` and `key_slot` names an existing entry: the stored
    ///   [`Waker`] is replaced only if it would not wake the same task as
    ///   `waker`, and `Poll::Pending` is returned.
    /// * `Pending` otherwise: a new key is allocated, `waker` is stored
    ///   under it, `key_slot` is set to that key, and `Poll::Pending` is
    ///   returned.
    fn poll_status(&self, key_slot: &mut Option<u64>, waker: &Waker) -> Poll<Result<ExitStatus>> {
        let mut guard = self
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        // Resolve terminal states up front; only `Pending` continues.
        let (next_key, wakers) = match &mut *guard {
            ExitNotifierState::Done(status) => return Poll::Ready(Ok(*status)),
            ExitNotifierState::Failed => return Poll::Ready(Err(failure_error())),
            ExitNotifierState::Pending { next_key, wakers } => (next_key, wakers),
        };

        // Re-poll: refresh the waker already registered under our key.
        if let Some(existing) = *key_slot {
            if let Some((_, registered)) = wakers.iter_mut().find(|(k, _)| *k == existing) {
                if !registered.will_wake(waker) {
                    *registered = waker.clone();
                }
                return Poll::Pending;
            }
        }

        // First poll (or key not found): register under a fresh key.
        let fresh = *next_key;
        *next_key += 1;
        wakers.push((fresh, waker.clone()));
        *key_slot = Some(fresh);
        Poll::Pending
    }

    /// Remove the waker registered under `key`, if the notifier is still
    /// `Pending`. Once the state is terminal there is no waker list left,
    /// so this does nothing.
    fn unregister(&self, key: u64) {
        let mut guard = self
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let ExitNotifierState::Pending { wakers, .. } = &mut *guard else {
            return;
        };
        wakers.retain(|entry| entry.0 != key);
    }
}

/// Future returned by
/// [`Session::wait_exit_async`](crate::Session::wait_exit_async).
///
/// Holds an [`Arc`] of the session's [`ExitNotifier`] and a slot for the
/// monotonic key under which its [`Waker`] is registered. The first poll
/// installs the waker; subsequent polls refresh it in place to handle
/// executor migration. When the session's reaper finalises the notifier,
/// every registered waker fires and each future's next poll observes the
/// terminal state.
///
/// Dropping the future removes its waker entry from the notifier so a
/// later wake does not visit a slot whose future is gone.
#[cfg(feature = "async")]
pub(crate) struct ExitWaitFuture {
    /// Notifier this future polls and, on drop, unregisters from.
    notifier: Arc<ExitNotifier>,
    /// Key of this future's registered waker. `None` before the first
    /// pending poll and again once the future has completed; `Drop` only
    /// unregisters when this is `Some`.
    key: Option<u64>,
    /// Set when `poll` has returned `Poll::Ready`; polling again after
    /// that panics.
    done: bool,
}

#[cfg(feature = "async")]
impl ExitWaitFuture {
    /// Create a future bound to `notifier`. Nothing is registered with the
    /// notifier here; the future starts with no waker key and not
    /// completed.
    pub(crate) fn new(notifier: Arc<ExitNotifier>) -> Self {
        let key = None;
        let done = false;
        Self {
            notifier,
            key,
            done,
        }
    }
}

#[cfg(feature = "async")]
impl Future for ExitWaitFuture {
    type Output = Result<ExitStatus>;

    /// Delegate to the notifier: returns `Poll::Pending` while the
    /// notifier is still pending (registering or refreshing this task's
    /// waker), and `Poll::Ready` with the exit result once it has been
    /// finalised.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let fut = self.get_mut();
        assert!(!fut.done, "ExitWaitFuture polled after completion");

        let outcome = fut.notifier.poll_status(&mut fut.key, cx.waker());
        if outcome.is_ready() {
            fut.done = true;
            // The notifier is already terminal, so its waker list is gone;
            // clearing the key makes `Drop` skip the unregister call.
            fut.key = None;
        }
        outcome
    }
}

#[cfg(feature = "async")]
impl Drop for ExitWaitFuture {
    /// Remove this future's waker entry from the notifier, if it still
    /// holds a registration key. A future that never registered, or that
    /// already completed, has no key and skips the call.
    fn drop(&mut self) {
        let Some(key) = self.key else {
            return;
        };
        self.notifier.unregister(key);
    }
}