// nv_core/health.rs
//! Health events, stop reasons, and decode outcome/preference classification.
//!
//! [`HealthEvent`] is the primary mechanism for observing runtime behavior.
//! Events are broadcast to subscribers via a channel. They cover source
//! lifecycle, stage errors, feed restarts, backpressure, and view-state changes.
//!
//! [`DecodeOutcome`] provides a backend-neutral classification of which
//! decoder class is in effect (hardware, software, or unknown).
//!
//! [`DecodePreference`] is the user-facing decode strategy selection
//! (auto, CPU-only, prefer-hardware, require-hardware).

use crate::error::{MediaError, StageError};
use crate::id::{FeedId, StageId};

// ---------------------------------------------------------------------------
// Decode outcome — backend-neutral decoder classification
// ---------------------------------------------------------------------------

/// Backend-neutral classification of the effective decoder.
///
/// Once the media backend has negotiated a decoder for a stream, this type
/// records which decoder class is actually in effect, without leaking
/// backend element names, GPU memory modes, or inference-framework details.
///
/// Carried by [`HealthEvent::DecodeDecision`] and the internal
/// `DecodeDecisionInfo` diagnostic report.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum DecodeOutcome {
    /// A hardware-accelerated video decoder is in use.
    Hardware,
    /// A software (CPU-only) video decoder is in use.
    Software,
    /// The backend could not determine which decoder class is active.
    ///
    /// This can happen with custom pipeline fragments or when the
    /// backend does not expose decoder identity.
    Unknown,
}

impl std::fmt::Display for DecodeOutcome {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Display mirrors the variant name exactly (same text as Debug).
        let label = match self {
            Self::Hardware => "Hardware",
            Self::Software => "Software",
            Self::Unknown => "Unknown",
        };
        f.write_str(label)
    }
}

// ---------------------------------------------------------------------------
// Decode preference — user-facing decode strategy
// ---------------------------------------------------------------------------

/// User-facing decode preference for a feed.
///
/// Selects the decoder strategy the media backend uses when building the
/// decode pipeline. Defaults to [`Auto`](Self::Auto), which keeps the
/// backend's existing selection heuristic unchanged.
///
/// This type is backend-neutral — it does not expose GStreamer element names,
/// GPU memory modes, or inference-framework details.
///
/// | Variant | Behavior |
/// |---|---|
/// | `Auto` | Backend picks the best available decoder (default). |
/// | `CpuOnly` | Force software decoding — never use a hardware decoder. |
/// | `PreferHardware` | Try hardware first; fall back to software silently. |
/// | `RequireHardware` | Demand hardware decoding; fail-fast if unavailable. |
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum DecodePreference {
    /// Automatically select the best decoder: prefer hardware, fall back to
    /// software. This is the current default behavior preserved exactly.
    #[default]
    Auto,

    /// Force software decoding. The backend must never attempt a hardware
    /// decoder. Useful in environments without GPU access or where
    /// deterministic CPU-only behaviour is required.
    CpuOnly,

    /// Prefer hardware decoding, but fall back to software silently if no
    /// hardware decoder is available. No error is raised on fallback.
    PreferHardware,

    /// Require hardware decoding. If no hardware decoder is available, the
    /// backend must fail-fast with a `MediaError` instead of silently
    /// falling back to software.
    RequireHardware,
}

impl std::fmt::Display for DecodePreference {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Display mirrors the variant name exactly (same text as Debug).
        f.write_str(match self {
            Self::Auto => "Auto",
            Self::CpuOnly => "CpuOnly",
            Self::PreferHardware => "PreferHardware",
            Self::RequireHardware => "RequireHardware",
        })
    }
}

/// A health event emitted by the runtime.
///
/// Subscribe via `Runtime::health_subscribe()` (aggregate stream).
/// Per-feed filtering is the subscriber's responsibility.
#[derive(Debug, Clone)]
pub enum HealthEvent {
    /// The video source connected successfully.
    SourceConnected { feed_id: FeedId },

    /// The video source disconnected.
    SourceDisconnected { feed_id: FeedId, reason: MediaError },

    /// The video source is attempting to reconnect.
    SourceReconnecting { feed_id: FeedId, attempt: u32 },

    /// An RTSP source is using insecure (non-TLS) transport.
    ///
    /// Emitted once per session start when the effective RTSP URL uses
    /// `rtsp://` instead of `rtsps://`. This is informational — the feed
    /// still operates, but the operator should consider migrating the
    /// camera to TLS or a firewalled network segment.
    ///
    /// Not emitted when `RtspSecurityPolicy::RequireTls` is set (that
    /// policy rejects insecure sources at config time).
    InsecureRtspSource {
        feed_id: FeedId,
        /// Redacted URL (credentials stripped) for operator diagnostics.
        redacted_url: String,
    },

    /// A stage returned an error for a single frame.
    /// The frame was dropped; the feed continues.
    StageError {
        feed_id: FeedId,
        stage_id: StageId,
        error: StageError,
    },

    /// A stage panicked. The feed will restart per its restart policy.
    StagePanic { feed_id: FeedId, stage_id: StageId },

    /// The feed is restarting.
    FeedRestarting { feed_id: FeedId, restart_count: u32 },

    /// The feed has stopped permanently.
    FeedStopped { feed_id: FeedId, reason: StopReason },

    /// Frames were dropped due to backpressure.
    BackpressureDrop {
        feed_id: FeedId,
        frames_dropped: u64,
    },

    /// Frames are being processed with significant wall-clock staleness.
    ///
    /// Emitted when the age of a frame at processing time (wall-clock
    /// now minus the wall-clock timestamp assigned at media bridge)
    /// exceeds a threshold, indicating the consumer is falling behind
    /// the source — typically due to buffer-pool starvation,
    /// inference backlog, or TCP accumulation.
    ///
    /// Events are coalesced: under sustained lag, the executor emits
    /// one event per throttle window (1 second) with `frames_lagged`
    /// reflecting the number of stale frames in that window.
    FrameLag {
        feed_id: FeedId,
        /// Frame age of the most recent stale frame, in milliseconds.
        frame_age_ms: u64,
        /// Number of frames exceeding the threshold since the last
        /// `FrameLag` event (per-event delta).
        frames_lagged: u64,
    },

    /// The view epoch changed (camera discontinuity detected).
    ViewEpochChanged {
        feed_id: FeedId,
        /// New epoch value (from `nv-view`, represented as u64 here to avoid
        /// circular dependency; re-exported with proper type in `nv-runtime`).
        epoch: u64,
    },

    /// The view validity has degraded due to camera motion.
    ViewDegraded {
        feed_id: FeedId,
        stability_score: f32,
    },

    /// A compensation transform was applied to active tracks.
    ViewCompensationApplied { feed_id: FeedId, epoch: u64 },

    /// The output broadcast channel is saturated — the internal sentinel
    /// receiver observed ring-buffer wrap.
    ///
    /// An internal sentinel receiver (the slowest possible consumer)
    /// monitors the aggregate output channel. When production outpaces
    /// the sentinel's drain interval, the ring buffer wraps past the
    /// sentinel's read position and `messages_lost` reports how many
    /// messages the sentinel missed.
    ///
    /// **What this means:** the channel is under backpressure. The
    /// sentinel observes worst-case wrap — it does **not** prove that
    /// any specific external subscriber lost messages. A subscriber
    /// consuming faster than the sentinel will experience less (or no)
    /// loss. Treat this as a saturation / capacity warning, not a
    /// per-subscriber loss report.
    ///
    /// **Attribution:** this is a runtime-global event. The output
    /// channel is shared across all feeds, so saturation is not
    /// attributable to any single feed.
    ///
    /// **`messages_lost` semantics:** per-event delta — the number of
    /// messages the sentinel missed since the previous `OutputLagged`
    /// event (or since runtime start for the first event). This is
    /// **not** a cumulative total and **not** a count of messages lost
    /// by external subscribers.
    ///
    /// **Throttling:** events are coalesced to prevent storms. The
    /// runtime emits one event on the initial saturation transition,
    /// then at most one per second during sustained saturation, each
    /// carrying the accumulated sentinel-observed delta for that
    /// interval.
    ///
    /// **Action:** consider increasing `output_capacity` or improving
    /// subscriber throughput.
    OutputLagged {
        /// Sentinel-observed ring-buffer wrap since the previous
        /// `OutputLagged` event (per-event delta, not cumulative).
        /// Reflects channel saturation, not guaranteed per-subscriber
        /// loss.
        messages_lost: u64,
    },

    /// The source reached end-of-stream (file sources).
    ///
    /// For non-looping file sources this is terminal: the feed stops
    /// with [`StopReason::EndOfStream`] rather than restarting.
    SourceEos { feed_id: FeedId },

    /// A decode decision was made for a feed's video stream.
    ///
    /// Emitted once per session start (not per frame) when the backend
    /// identifies which decoder was selected for the stream. Provides
    /// visibility into hardware vs. software decode selection without
    /// exposing backend-specific element names in required fields.
    ///
    /// `detail` is a backend-specific diagnostic string (e.g., the
    /// GStreamer element name). It is intended for logging — do not
    /// match on its contents.
    DecodeDecision {
        feed_id: FeedId,
        /// Hardware, Software, or Unknown.
        outcome: DecodeOutcome,
        /// The user-requested decode preference that was in effect.
        preference: DecodePreference,
        /// Whether the adaptive fallback cache overrode the requested
        /// preference for this session.
        fallback_active: bool,
        /// Human-readable reason for fallback (populated when
        /// `fallback_active` is `true`).
        fallback_reason: Option<String>,
        /// Backend-specific diagnostic detail (element name etc.).
        detail: String,
    },

    /// The per-feed `OutputSink` panicked during `emit()`.
    ///
    /// The output is dropped but the feed continues processing.
    /// The runtime wraps `OutputSink::emit()` in `catch_unwind` to
    /// prevent a misbehaving sink from tearing down the worker thread.
    SinkPanic { feed_id: FeedId },

    /// The per-feed sink worker did not shut down within the timeout.
    ///
    /// The sink thread is detached and a placeholder sink is installed.
    /// This typically means `OutputSink::emit()` is blocked on
    /// downstream I/O. Distinct from [`SinkPanic`](Self::SinkPanic)
    /// to allow operators to route hung-sink alerts separately from
    /// crash alerts.
    SinkTimeout { feed_id: FeedId },

    /// The per-feed sink queue is full — output was dropped.
    ///
    /// The feed continues processing; only the output delivery is
    /// dropped to prevent slow downstream I/O from blocking the
    /// perception pipeline.
    ///
    /// `outputs_dropped` is the number of outputs dropped since the
    /// last `SinkBackpressure` event (per-event delta).
    SinkBackpressure {
        feed_id: FeedId,
        outputs_dropped: u64,
    },

    /// Tracks were rejected by the temporal store's admission
    /// control because the hard cap was reached and no evictable
    /// candidates (Lost/Coasted/Tentative) were available.
    ///
    /// The feed continues processing. This event indicates tracker
    /// saturation — the scene has more confirmed objects than
    /// `max_concurrent_tracks` allows.
    ///
    /// Events are coalesced per frame: a single event is emitted
    /// with the total number of tracks rejected in that frame.
    TrackAdmissionRejected {
        feed_id: FeedId,
        /// Number of tracks rejected in this frame.
        rejected_count: u32,
    },

    /// The batch processor returned an error or panicked.
    ///
    /// All frames in the affected batch are dropped. Each feed thread
    /// waiting on that batch receives the error and skips the frame.
    BatchError {
        processor_id: StageId,
        batch_size: u32,
        error: StageError,
    },

    /// A feed's batch submission was rejected because the coordinator's
    /// queue is full or the coordinator has shut down. The frame is
    /// dropped for this feed.
    ///
    /// Events are coalesced: under sustained overload, the executor
    /// emits one event per throttle window (1 second) with
    /// `dropped_count` reflecting the number of frames rejected in
    /// that window. On recovery (a subsequent successful submission),
    /// any remaining accumulated count is flushed immediately.
    ///
    /// Indicates the batch coordinator is overloaded — either the
    /// processor is too slow for the combined frame rate, or
    /// `max_batch_size` is too small.
    BatchSubmissionRejected {
        feed_id: FeedId,
        processor_id: StageId,
        /// Number of frames rejected in this throttle window.
        dropped_count: u64,
    },

    /// A feed's batch response timed out — the coordinator did not
    /// return a result within `max_latency + response_timeout`.
    ///
    /// Events are coalesced: under sustained timeout conditions, the
    /// executor emits one event per throttle window (1 second) with
    /// `timed_out_count` reflecting the number of timeouts in that
    /// window. On recovery (a subsequent successful submission), any
    /// remaining accumulated count is flushed immediately.
    ///
    /// Indicates the batch processor is slower than the configured
    /// safety margin. Consider increasing `response_timeout` or
    /// reducing `max_batch_size`.
    BatchTimeout {
        feed_id: FeedId,
        processor_id: StageId,
        /// Number of timeouts in this throttle window.
        timed_out_count: u64,
    },

    /// A feed's batch submission was rejected because the feed already
    /// has the maximum number of in-flight items in the coordinator
    /// pipeline (default: 1).
    ///
    /// This occurs when a prior submission timed out on the feed side
    /// but has not yet been processed (or drained) by the coordinator.
    /// The in-flight cap prevents one feed from accumulating orphaned
    /// items in the shared queue.
    ///
    /// Events are coalesced with the same 1-second throttle window as
    /// other batch rejection events. On recovery, any remaining
    /// accumulated count is flushed immediately.
    BatchInFlightExceeded {
        feed_id: FeedId,
        processor_id: StageId,
        /// Number of submissions rejected in this throttle window.
        rejected_count: u64,
    },

    /// The effective device residency is lower than what was requested.
    ///
    /// This occurs when `DeviceResidency::Cuda` was requested but the
    /// required GStreamer CUDA elements (`cudaupload`, `cudaconvert`)
    /// are not available at runtime — the backend silently falls back to
    /// host-memory decoding.
    ///
    /// Emitted once per session start alongside `DecodeDecision`.
    ///
    /// **Action:** install GStreamer >= 1.20 CUDA plugins, or switch to
    /// a platform-specific provider (`DeviceResidency::Provider`) for
    /// guaranteed GPU residency.
    ResidencyDowngrade {
        feed_id: FeedId,
        /// What was requested (e.g., "Cuda").
        requested: String,
        /// What is actually in effect (e.g., "Host").
        effective: String,
    },
}

/// Reason a feed stopped permanently.
///
/// Carried by [`HealthEvent::FeedStopped`].
///
/// Derives `PartialEq`/`Eq` (consistent with the other public enums in
/// this module, e.g. [`DecodeOutcome`]) so subscribers and tests can
/// compare stop reasons directly instead of matching on `Debug` output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StopReason {
    /// Normal shutdown requested by the user.
    UserRequested,

    /// The restart limit was exceeded after repeated failures.
    RestartLimitExceeded {
        /// Number of restarts attempted before the feed gave up.
        restart_count: u32,
    },

    /// End of stream on a file source.
    EndOfStream,

    /// An unrecoverable error occurred.
    Fatal {
        /// Human-readable description of the fatal condition.
        detail: String,
    },
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn decode_outcome_display() {
        // Display must render exactly the variant name.
        for (outcome, expected) in [
            (DecodeOutcome::Hardware, "Hardware"),
            (DecodeOutcome::Software, "Software"),
            (DecodeOutcome::Unknown, "Unknown"),
        ] {
            assert_eq!(outcome.to_string(), expected);
        }
    }

    #[test]
    fn decode_preference_display() {
        // Display must render exactly the variant name.
        for (preference, expected) in [
            (DecodePreference::Auto, "Auto"),
            (DecodePreference::CpuOnly, "CpuOnly"),
            (DecodePreference::PreferHardware, "PreferHardware"),
            (DecodePreference::RequireHardware, "RequireHardware"),
        ] {
            assert_eq!(preference.to_string(), expected);
        }
    }

    #[test]
    fn stop_reason_variants() {
        // Ensure Debug formatting doesn't panic for any variant.
        let reasons = [
            StopReason::UserRequested,
            StopReason::RestartLimitExceeded { restart_count: 3 },
            StopReason::EndOfStream,
            StopReason::Fatal {
                detail: "test".into(),
            },
        ];
        for reason in &reasons {
            let _ = format!("{reason:?}");
        }
    }

    #[test]
    fn health_event_is_send_sync() {
        // HealthEvent must be broadcastable across threads.
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<HealthEvent>();
    }
}
463}