running-process 4.4.0

Subprocess and PTY runtime for the running-process project
Documentation
//! Phase 1 of #221: the process-observation capability model and the
//! portable process-lifecycle baseline.
//!
//! This module defines the stable observation types — [`ObserverConfig`],
//! [`ObserverCapabilities`], [`ObserverEvent`], and the
//! [`ObserverSubscriber`] handle — plus the always-available lifecycle
//! backend that emits [`started`](ObserverEventKind::Started) and
//! [`exited`](ObserverEventKind::Exited) events for child processes spawned
//! by this crate.
//!
//! ## Scope (Phase 1 only)
//!
//! Only the [`EventCategory::Lifecycle`] category is
//! [`supported`](CapabilitySupport::Supported). Every other category
//! ([`File`](EventCategory::File), [`Network`](EventCategory::Network),
//! [`Process`](EventCategory::Process)) reports
//! [`unavailable`](CapabilitySupport::Unavailable) with an honest reason,
//! because syscall-level backends (seccomp/eBPF/ETW) are Phase 3 work and
//! are deliberately not wired here.
//!
//! ## Off by default
//!
//! Observation is entirely opt-in. A [`NativeProcess`](crate::NativeProcess)
//! emits no events unless an [`ObserverConfig`] is attached via
//! [`NativeProcess::with_observer`](crate::NativeProcess::with_observer) (or
//! the equivalent builder seam). With no observer configured the lifecycle
//! hooks are inert: no channel, no allocation, no events.
//!
//! The handle is a plain `std::sync::mpsc` receiver so the lifecycle
//! baseline stays free of the daemon runtime (tokio/IPC). Phase 2 layers the
//! daemon-owned subscriber model on top of these same event types.

use std::sync::mpsc::{Receiver, Sender};
use std::time::{SystemTime, UNIX_EPOCH};

/// Category of observable process activity.
///
/// Phase 1 only implements [`Lifecycle`](Self::Lifecycle). The remaining
/// categories exist so capability negotiation can report them as
/// `unavailable` with an honest reason until their Phase 3 platform backends
/// land.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventCategory {
    /// Process start and exit for children spawned by this crate.
    Lifecycle,
    /// Filesystem activity (open/read/write/unlink). Requires a Phase 3
    /// platform backend.
    File,
    /// Network activity (connect/accept/send/recv). Requires a Phase 3
    /// platform backend.
    Network,
    /// Descendant process creation outside the crate's own spawn path.
    /// Requires a Phase 3 platform backend.
    Process,
}

impl EventCategory {
    /// All categories the capability matrix reports on, in a stable order.
    pub const ALL: [EventCategory; 4] = [
        EventCategory::Lifecycle,
        EventCategory::File,
        EventCategory::Network,
        EventCategory::Process,
    ];

    /// Return the stable lowercase category name.
    pub fn as_str(self) -> &'static str {
        match self {
            EventCategory::Lifecycle => "lifecycle",
            EventCategory::File => "file",
            EventCategory::Network => "network",
            EventCategory::Process => "process",
        }
    }
}

/// Negotiated support level for a single [`EventCategory`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CapabilitySupport {
    /// The category is fully observable on this platform.
    Supported,
    /// The category is observable but with documented gaps or caveats.
    Partial,
    /// The category cannot be observed by the active backend set.
    Unavailable,
}

impl CapabilitySupport {
    /// Return the stable lowercase support-level name.
    pub fn as_str(self) -> &'static str {
        match self {
            CapabilitySupport::Supported => "supported",
            CapabilitySupport::Partial => "partial",
            CapabilitySupport::Unavailable => "unavailable",
        }
    }
}

/// Capability report for one [`EventCategory`]: the negotiated support
/// level, the backend that would serve it, and a human-readable reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CategoryCapability {
    /// Which category this entry describes.
    pub category: EventCategory,
    /// Negotiated support level.
    pub support: CapabilitySupport,
    /// Name of the backend serving (or that would serve) this category.
    pub backend: &'static str,
    /// Human-readable explanation, especially for `Partial`/`Unavailable`.
    pub reason: &'static str,
}

/// The full capability matrix produced by [`ObserverCapabilities::negotiate`].
///
/// Each [`EventCategory`] appears exactly once. Phase 1 reports
/// [`Lifecycle`](EventCategory::Lifecycle) as
/// [`Supported`](CapabilitySupport::Supported) and the rest as
/// [`Unavailable`](CapabilitySupport::Unavailable).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObserverCapabilities {
    categories: Vec<CategoryCapability>,
}

impl ObserverCapabilities {
    /// Negotiate the capability matrix for the current platform.
    ///
    /// Phase 1 is platform-agnostic: the portable lifecycle baseline is
    /// `Supported` on Windows, macOS, and Linux, and all syscall-level
    /// categories are honestly `Unavailable` pending Phase 3 backends.
    pub fn negotiate() -> Self {
        let categories = EventCategory::ALL
            .iter()
            .map(|&category| match category {
                EventCategory::Lifecycle => CategoryCapability {
                    category,
                    support: CapabilitySupport::Supported,
                    backend: "portable-lifecycle",
                    reason: "started/exited emitted from the crate spawn and reap path",
                },
                EventCategory::File => CategoryCapability {
                    category,
                    support: CapabilitySupport::Unavailable,
                    backend: "none",
                    reason: "requires Phase 3 platform backend (seccomp/eBPF/ETW)",
                },
                EventCategory::Network => CategoryCapability {
                    category,
                    support: CapabilitySupport::Unavailable,
                    backend: "none",
                    reason: "requires Phase 3 platform backend (seccomp/eBPF/ETW)",
                },
                EventCategory::Process => CategoryCapability {
                    category,
                    support: CapabilitySupport::Unavailable,
                    backend: "none",
                    reason: "requires Phase 3 platform backend (seccomp/eBPF/ETW)",
                },
            })
            .collect();
        Self { categories }
    }

    /// Return the capability entries in stable [`EventCategory::ALL`] order.
    pub fn categories(&self) -> &[CategoryCapability] {
        &self.categories
    }

    /// Look up the capability entry for one category.
    pub fn category(&self, category: EventCategory) -> &CategoryCapability {
        self.categories
            .iter()
            .find(|entry| entry.category == category)
            .expect("ObserverCapabilities always contains every EventCategory")
    }

    /// Return the negotiated support level for one category.
    pub fn support(&self, category: EventCategory) -> CapabilitySupport {
        self.category(category).support
    }

    /// Return whether a category is fully [`Supported`](CapabilitySupport::Supported).
    pub fn is_supported(&self, category: EventCategory) -> bool {
        self.support(category) == CapabilitySupport::Supported
    }
}

/// What happened to an observed process.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ObserverEventKind {
    /// The child process was spawned. Carries no extra payload.
    Started,
    /// The child process exited. Carries the OS exit code (Unix signal
    /// exits are negative signal numbers, matching the rest of the crate).
    Exited {
        /// Exit code of the child.
        exit_code: i32,
    },
}

impl ObserverEventKind {
    /// Return the stable lowercase event-kind name.
    pub fn as_str(&self) -> &'static str {
        match self {
            ObserverEventKind::Started => "started",
            ObserverEventKind::Exited { .. } => "exited",
        }
    }
}

/// A single observation emitted by the lifecycle baseline.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObserverEvent {
    /// Which category produced the event. Always
    /// [`EventCategory::Lifecycle`] in Phase 1.
    pub category: EventCategory,
    /// What happened.
    pub kind: ObserverEventKind,
    /// OS process id of the observed child.
    pub pid: u32,
    /// Milliseconds since the Unix epoch when the event was recorded.
    pub timestamp_ms: u128,
}

impl ObserverEvent {
    /// Construct an event, stamping it with the current wall-clock time.
    fn now(category: EventCategory, kind: ObserverEventKind, pid: u32) -> Self {
        let timestamp_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis())
            .unwrap_or(0);
        Self {
            category,
            kind,
            pid,
            timestamp_ms,
        }
    }
}

/// Opt-in configuration that turns process observation on for a single
/// [`NativeProcess`](crate::NativeProcess).
///
/// Constructing a config does not by itself observe anything; it is attached
/// to a process via
/// [`NativeProcess::with_observer`](crate::NativeProcess::with_observer).
/// With no config attached, the process emits no events (off by default).
#[derive(Debug, Clone)]
pub struct ObserverConfig {
    categories: Vec<EventCategory>,
}

impl ObserverConfig {
    /// Create a config that observes only the Phase 1 lifecycle baseline.
    ///
    /// This is the recommended Phase 1 constructor: it requests exactly the
    /// category that is actually `Supported`.
    pub fn lifecycle() -> Self {
        Self {
            categories: vec![EventCategory::Lifecycle],
        }
    }

    /// Create a config requesting an explicit set of categories.
    ///
    /// Categories that are not `Supported` on this platform simply never
    /// produce events in Phase 1; callers should consult
    /// [`ObserverCapabilities::negotiate`] to learn which ones are honored.
    pub fn with_categories(categories: impl IntoIterator<Item = EventCategory>) -> Self {
        Self {
            categories: categories.into_iter().collect(),
        }
    }

    /// Return whether this config requested observation of `category`.
    pub fn observes(&self, category: EventCategory) -> bool {
        self.categories.contains(&category)
    }

    /// The categories this config requested, in insertion order.
    pub fn categories(&self) -> &[EventCategory] {
        &self.categories
    }
}

/// Receiver handle for observation events.
///
/// Returned by
/// [`NativeProcess::with_observer`](crate::NativeProcess::with_observer).
/// Dropping the subscriber detaches it; the emitter tolerates a closed
/// channel and never blocks on a slow or absent consumer.
pub struct ObserverSubscriber {
    rx: Receiver<ObserverEvent>,
}

impl ObserverSubscriber {
    /// Receive the next event, blocking until one arrives or the emitter is
    /// dropped. Returns `None` once no more events can arrive.
    pub fn recv(&self) -> Option<ObserverEvent> {
        self.rx.recv().ok()
    }

    /// Try to receive an event without blocking.
    pub fn try_recv(&self) -> Option<ObserverEvent> {
        self.rx.try_recv().ok()
    }

    /// Drain all currently-queued events without blocking.
    pub fn drain(&self) -> Vec<ObserverEvent> {
        let mut events = Vec::new();
        while let Ok(event) = self.rx.try_recv() {
            events.push(event);
        }
        events
    }

    /// Borrow the underlying receiver for advanced use (e.g. `iter`/`select`).
    pub fn receiver(&self) -> &Receiver<ObserverEvent> {
        &self.rx
    }
}

/// Internal emitter held by a [`NativeProcess`](crate::NativeProcess) when an
/// [`ObserverConfig`] is attached.
///
/// `None` on a process means observation is off, so the lifecycle hooks are
/// inert. This keeps the off-by-default path allocation-free.
pub(crate) struct ObserverEmitter {
    config: ObserverConfig,
    tx: Sender<ObserverEvent>,
}

impl ObserverEmitter {
    /// Build an emitter from a config and hand back the paired subscriber.
    pub(crate) fn new(config: ObserverConfig) -> (Self, ObserverSubscriber) {
        let (tx, rx) = std::sync::mpsc::channel();
        (Self { config, tx }, ObserverSubscriber { rx })
    }

    /// Emit a `started` event for `pid` if the config observes lifecycle.
    pub(crate) fn emit_started(&self, pid: u32) {
        if !self.config.observes(EventCategory::Lifecycle) {
            return;
        }
        // Ignore send errors: a dropped subscriber must never break the
        // process spawn/reap path.
        let _ = self.tx.send(ObserverEvent::now(
            EventCategory::Lifecycle,
            ObserverEventKind::Started,
            pid,
        ));
    }

    /// Emit an `exited` event for `pid` if the config observes lifecycle.
    pub(crate) fn emit_exited(&self, pid: u32, exit_code: i32) {
        if !self.config.observes(EventCategory::Lifecycle) {
            return;
        }
        let _ = self.tx.send(ObserverEvent::now(
            EventCategory::Lifecycle,
            ObserverEventKind::Exited { exit_code },
            pid,
        ));
    }
}

#[cfg(test)]
mod tests;