Skip to main content

running_process/observer/
mod.rs

1//! Phase 1 of #221: the process-observation capability model and the
2//! portable process-lifecycle baseline.
3//!
4//! This module defines the stable observation types — [`ObserverConfig`],
5//! [`ObserverCapabilities`], [`ObserverEvent`], and the
6//! [`ObserverSubscriber`] handle — plus the always-available lifecycle
7//! backend that emits [`started`](ObserverEventKind::Started) and
8//! [`exited`](ObserverEventKind::Exited) events for child processes spawned
9//! by this crate.
10//!
11//! ## Scope (Phase 1 only)
12//!
13//! Only the [`EventCategory::Lifecycle`] category is
14//! [`supported`](CapabilitySupport::Supported). Every other category
15//! ([`File`](EventCategory::File), [`Network`](EventCategory::Network),
16//! [`Process`](EventCategory::Process)) reports
17//! [`unavailable`](CapabilitySupport::Unavailable) with an honest reason,
18//! because syscall-level backends (seccomp/eBPF/ETW) are Phase 3 work and
19//! are deliberately not wired here.
20//!
21//! ## Off by default
22//!
23//! Observation is entirely opt-in. A [`NativeProcess`](crate::NativeProcess)
24//! emits no events unless an [`ObserverConfig`] is attached via
25//! [`NativeProcess::with_observer`](crate::NativeProcess::with_observer) (or
26//! the equivalent builder seam). With no observer configured the lifecycle
27//! hooks are inert: no channel, no allocation, no events.
28//!
29//! The handle is a plain `std::sync::mpsc` receiver so the lifecycle
30//! baseline stays free of the daemon runtime (tokio/IPC). Phase 2 layers the
31//! daemon-owned subscriber model on top of these same event types.
32
33use std::sync::mpsc::{Receiver, Sender};
34use std::time::{SystemTime, UNIX_EPOCH};
35
/// The kinds of process activity an observer can ask about.
///
/// Only [`Lifecycle`](Self::Lifecycle) is implemented in Phase 1. The other
/// variants are declared so that capability negotiation can list them as
/// `unavailable`, with an honest reason, until their Phase 3 platform
/// backends land.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventCategory {
    /// Start and exit of children spawned by this crate.
    Lifecycle,
    /// Filesystem activity (open/read/write/unlink); needs a Phase 3
    /// platform backend.
    File,
    /// Network activity (connect/accept/send/recv); needs a Phase 3
    /// platform backend.
    Network,
    /// Creation of descendant processes outside the crate's own spawn
    /// path; needs a Phase 3 platform backend.
    Process,
}

impl EventCategory {
    /// Every category covered by the capability matrix, in a stable order.
    pub const ALL: [Self; 4] = [Self::Lifecycle, Self::File, Self::Network, Self::Process];

    /// Stable lowercase name of this category.
    pub fn as_str(self) -> &'static str {
        // Exhaustive on purpose: adding a variant must force a name choice.
        match self {
            Self::Lifecycle => "lifecycle",
            Self::File => "file",
            Self::Network => "network",
            Self::Process => "process",
        }
    }
}
76
/// How well a single [`EventCategory`] can be observed after negotiation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CapabilitySupport {
    /// Fully observable on this platform.
    Supported,
    /// Observable, but with documented gaps or caveats.
    Partial,
    /// Not observable by the active backend set.
    Unavailable,
}

impl CapabilitySupport {
    /// Stable lowercase name of this support level.
    pub fn as_str(self) -> &'static str {
        // Exhaustive on purpose: a new level must be given a name here.
        match self {
            Self::Supported => "supported",
            Self::Partial => "partial",
            Self::Unavailable => "unavailable",
        }
    }
}
98
99/// Capability report for one [`EventCategory`]: the negotiated support
100/// level, the backend that would serve it, and a human-readable reason.
101#[derive(Debug, Clone, PartialEq, Eq)]
102pub struct CategoryCapability {
103    /// Which category this entry describes.
104    pub category: EventCategory,
105    /// Negotiated support level.
106    pub support: CapabilitySupport,
107    /// Name of the backend serving (or that would serve) this category.
108    pub backend: &'static str,
109    /// Human-readable explanation, especially for `Partial`/`Unavailable`.
110    pub reason: &'static str,
111}
112
113/// The full capability matrix produced by [`ObserverCapabilities::negotiate`].
114///
115/// Each [`EventCategory`] appears exactly once. Phase 1 reports
116/// [`Lifecycle`](EventCategory::Lifecycle) as
117/// [`Supported`](CapabilitySupport::Supported) and the rest as
118/// [`Unavailable`](CapabilitySupport::Unavailable).
119#[derive(Debug, Clone, PartialEq, Eq)]
120pub struct ObserverCapabilities {
121    categories: Vec<CategoryCapability>,
122}
123
124impl ObserverCapabilities {
125    /// Negotiate the capability matrix for the current platform.
126    ///
127    /// Phase 1 is platform-agnostic: the portable lifecycle baseline is
128    /// `Supported` on Windows, macOS, and Linux, and all syscall-level
129    /// categories are honestly `Unavailable` pending Phase 3 backends.
130    pub fn negotiate() -> Self {
131        let categories = EventCategory::ALL
132            .iter()
133            .map(|&category| match category {
134                EventCategory::Lifecycle => CategoryCapability {
135                    category,
136                    support: CapabilitySupport::Supported,
137                    backend: "portable-lifecycle",
138                    reason: "started/exited emitted from the crate spawn and reap path",
139                },
140                EventCategory::File => CategoryCapability {
141                    category,
142                    support: CapabilitySupport::Unavailable,
143                    backend: "none",
144                    reason: "requires Phase 3 platform backend (seccomp/eBPF/ETW)",
145                },
146                EventCategory::Network => CategoryCapability {
147                    category,
148                    support: CapabilitySupport::Unavailable,
149                    backend: "none",
150                    reason: "requires Phase 3 platform backend (seccomp/eBPF/ETW)",
151                },
152                EventCategory::Process => CategoryCapability {
153                    category,
154                    support: CapabilitySupport::Unavailable,
155                    backend: "none",
156                    reason: "requires Phase 3 platform backend (seccomp/eBPF/ETW)",
157                },
158            })
159            .collect();
160        Self { categories }
161    }
162
163    /// Return the capability entries in stable [`EventCategory::ALL`] order.
164    pub fn categories(&self) -> &[CategoryCapability] {
165        &self.categories
166    }
167
168    /// Look up the capability entry for one category.
169    pub fn category(&self, category: EventCategory) -> &CategoryCapability {
170        self.categories
171            .iter()
172            .find(|entry| entry.category == category)
173            .expect("ObserverCapabilities always contains every EventCategory")
174    }
175
176    /// Return the negotiated support level for one category.
177    pub fn support(&self, category: EventCategory) -> CapabilitySupport {
178        self.category(category).support
179    }
180
181    /// Return whether a category is fully [`Supported`](CapabilitySupport::Supported).
182    pub fn is_supported(&self, category: EventCategory) -> bool {
183        self.support(category) == CapabilitySupport::Supported
184    }
185}
186
/// The thing that happened to an observed process.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ObserverEventKind {
    /// The child process was spawned. No extra payload.
    Started,
    /// The child process exited. Carries the OS exit code (Unix signal
    /// exits are negative signal numbers, matching the rest of the crate).
    Exited {
        /// Exit code reported for the child.
        exit_code: i32,
    },
}

impl ObserverEventKind {
    /// Stable lowercase name of this event kind; any payload is ignored.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Started => "started",
            Self::Exited { exit_code: _ } => "exited",
        }
    }
}
209
210/// A single observation emitted by the lifecycle baseline.
211#[derive(Debug, Clone, PartialEq, Eq)]
212pub struct ObserverEvent {
213    /// Which category produced the event. Always
214    /// [`EventCategory::Lifecycle`] in Phase 1.
215    pub category: EventCategory,
216    /// What happened.
217    pub kind: ObserverEventKind,
218    /// OS process id of the observed child.
219    pub pid: u32,
220    /// Milliseconds since the Unix epoch when the event was recorded.
221    pub timestamp_ms: u128,
222}
223
224impl ObserverEvent {
225    /// Construct an event, stamping it with the current wall-clock time.
226    fn now(category: EventCategory, kind: ObserverEventKind, pid: u32) -> Self {
227        let timestamp_ms = SystemTime::now()
228            .duration_since(UNIX_EPOCH)
229            .map(|d| d.as_millis())
230            .unwrap_or(0);
231        Self {
232            category,
233            kind,
234            pid,
235            timestamp_ms,
236        }
237    }
238}
239
240/// Opt-in configuration that turns process observation on for a single
241/// [`NativeProcess`](crate::NativeProcess).
242///
243/// Constructing a config does not by itself observe anything; it is attached
244/// to a process via
245/// [`NativeProcess::with_observer`](crate::NativeProcess::with_observer).
246/// With no config attached, the process emits no events (off by default).
247#[derive(Debug, Clone)]
248pub struct ObserverConfig {
249    categories: Vec<EventCategory>,
250}
251
252impl ObserverConfig {
253    /// Create a config that observes only the Phase 1 lifecycle baseline.
254    ///
255    /// This is the recommended Phase 1 constructor: it requests exactly the
256    /// category that is actually `Supported`.
257    pub fn lifecycle() -> Self {
258        Self {
259            categories: vec![EventCategory::Lifecycle],
260        }
261    }
262
263    /// Create a config requesting an explicit set of categories.
264    ///
265    /// Categories that are not `Supported` on this platform simply never
266    /// produce events in Phase 1; callers should consult
267    /// [`ObserverCapabilities::negotiate`] to learn which ones are honored.
268    pub fn with_categories(categories: impl IntoIterator<Item = EventCategory>) -> Self {
269        Self {
270            categories: categories.into_iter().collect(),
271        }
272    }
273
274    /// Return whether this config requested observation of `category`.
275    pub fn observes(&self, category: EventCategory) -> bool {
276        self.categories.contains(&category)
277    }
278
279    /// The categories this config requested, in insertion order.
280    pub fn categories(&self) -> &[EventCategory] {
281        &self.categories
282    }
283}
284
285/// Receiver handle for observation events.
286///
287/// Returned by
288/// [`NativeProcess::with_observer`](crate::NativeProcess::with_observer).
289/// Dropping the subscriber detaches it; the emitter tolerates a closed
290/// channel and never blocks on a slow or absent consumer.
291pub struct ObserverSubscriber {
292    rx: Receiver<ObserverEvent>,
293}
294
295impl ObserverSubscriber {
296    /// Receive the next event, blocking until one arrives or the emitter is
297    /// dropped. Returns `None` once no more events can arrive.
298    pub fn recv(&self) -> Option<ObserverEvent> {
299        self.rx.recv().ok()
300    }
301
302    /// Try to receive an event without blocking.
303    pub fn try_recv(&self) -> Option<ObserverEvent> {
304        self.rx.try_recv().ok()
305    }
306
307    /// Drain all currently-queued events without blocking.
308    pub fn drain(&self) -> Vec<ObserverEvent> {
309        let mut events = Vec::new();
310        while let Ok(event) = self.rx.try_recv() {
311            events.push(event);
312        }
313        events
314    }
315
316    /// Borrow the underlying receiver for advanced use (e.g. `iter`/`select`).
317    pub fn receiver(&self) -> &Receiver<ObserverEvent> {
318        &self.rx
319    }
320}
321
322/// Internal emitter held by a [`NativeProcess`](crate::NativeProcess) when an
323/// [`ObserverConfig`] is attached.
324///
325/// `None` on a process means observation is off, so the lifecycle hooks are
326/// inert. This keeps the off-by-default path allocation-free.
327pub(crate) struct ObserverEmitter {
328    config: ObserverConfig,
329    tx: Sender<ObserverEvent>,
330}
331
332impl ObserverEmitter {
333    /// Build an emitter from a config and hand back the paired subscriber.
334    pub(crate) fn new(config: ObserverConfig) -> (Self, ObserverSubscriber) {
335        let (tx, rx) = std::sync::mpsc::channel();
336        (Self { config, tx }, ObserverSubscriber { rx })
337    }
338
339    /// Emit a `started` event for `pid` if the config observes lifecycle.
340    pub(crate) fn emit_started(&self, pid: u32) {
341        if !self.config.observes(EventCategory::Lifecycle) {
342            return;
343        }
344        // Ignore send errors: a dropped subscriber must never break the
345        // process spawn/reap path.
346        let _ = self.tx.send(ObserverEvent::now(
347            EventCategory::Lifecycle,
348            ObserverEventKind::Started,
349            pid,
350        ));
351    }
352
353    /// Emit an `exited` event for `pid` if the config observes lifecycle.
354    pub(crate) fn emit_exited(&self, pid: u32, exit_code: i32) {
355        if !self.config.observes(EventCategory::Lifecycle) {
356            return;
357        }
358        let _ = self.tx.send(ObserverEvent::now(
359            EventCategory::Lifecycle,
360            ObserverEventKind::Exited { exit_code },
361            pid,
362        ));
363    }
364}
365
366#[cfg(test)]
367mod tests;