// nv_core/health.rs

//! Health events, stop reasons, and decode outcome/preference classification.
//!
//! [`HealthEvent`] is the primary mechanism for observing runtime behavior.
//! Events are broadcast to subscribers via a channel. They cover source
//! lifecycle, stage errors, feed restarts, backpressure, and view-state changes.
//!
//! [`DecodeOutcome`] provides a backend-neutral classification of which
//! decoder class is in effect (hardware, software, or unknown).
//!
//! [`DecodePreference`] is the user-facing decode strategy selection
//! (auto, CPU-only, prefer-hardware, require-hardware).

use crate::error::{MediaError, StageError};
use crate::id::{FeedId, StageId};
15
16// ---------------------------------------------------------------------------
17// Decode outcome — backend-neutral decoder classification
18// ---------------------------------------------------------------------------
19
/// Backend-neutral classification of the effective decoder.
///
/// After the media backend negotiates a decoder for a stream, this
/// categorises the result without exposing backend element names, GPU
/// memory modes, or inference-framework details.
///
/// Used in [`HealthEvent::DecodeDecision`] and the internal
/// `DecodeDecisionInfo` diagnostic report.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum DecodeOutcome {
    /// A hardware-accelerated video decoder is in use.
    Hardware,
    /// A software (CPU-only) video decoder is in use.
    Software,
    /// The backend could not determine which decoder class is active.
    ///
    /// This can happen with custom pipeline fragments or when the
    /// backend does not expose decoder identity.
    Unknown,
}

impl std::fmt::Display for DecodeOutcome {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Display mirrors the variant name exactly (matches derived Debug).
        f.write_str(match self {
            Self::Hardware => "Hardware",
            Self::Software => "Software",
            Self::Unknown => "Unknown",
        })
    }
}

// ---------------------------------------------------------------------------
// Decode preference — user-facing decode strategy
// ---------------------------------------------------------------------------

/// User-facing decode preference for a feed.
///
/// Controls which decoder strategy the media backend uses when constructing
/// the decode pipeline. The default is [`Auto`](Self::Auto), which preserves
/// the backend's existing selection heuristic.
///
/// This type is backend-neutral — it does not expose GStreamer element names,
/// GPU memory modes, or inference-framework details.
///
/// | Variant | Behavior |
/// |---|---|
/// | `Auto` | Backend picks the best available decoder (default). |
/// | `CpuOnly` | Force software decoding — never use a hardware decoder. |
/// | `PreferHardware` | Try hardware first; fall back to software silently. |
/// | `RequireHardware` | Demand hardware decoding; fail-fast if unavailable. |
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum DecodePreference {
    /// Automatically select the best decoder: prefer hardware, fall back to
    /// software. This is the current default behavior preserved exactly.
    #[default]
    Auto,

    /// Force software decoding. The backend must never attempt a hardware
    /// decoder. Useful in environments without GPU access or where
    /// deterministic CPU-only behaviour is required.
    CpuOnly,

    /// Prefer hardware decoding, but fall back to software silently if no
    /// hardware decoder is available. No error is raised on fallback.
    PreferHardware,

    /// Require hardware decoding. If no hardware decoder is available, the
    /// backend must fail-fast with a [`MediaError`]
    /// instead of silently falling back to software.
    RequireHardware,
}

impl std::fmt::Display for DecodePreference {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Display mirrors the variant name exactly (matches derived Debug).
        f.write_str(match self {
            Self::Auto => "Auto",
            Self::CpuOnly => "CpuOnly",
            Self::PreferHardware => "PreferHardware",
            Self::RequireHardware => "RequireHardware",
        })
    }
}
103/// A health event emitted by the runtime.
104///
105/// Subscribe via [`Runtime::health_subscribe()`](crate) (aggregate).
106/// Per-feed filtering is the subscriber's responsibility.
107#[derive(Debug, Clone)]
108pub enum HealthEvent {
109 /// The video source connected successfully.
110 SourceConnected { feed_id: FeedId },
111
112 /// The video source disconnected.
113 SourceDisconnected { feed_id: FeedId, reason: MediaError },
114
115 /// The video source is attempting to reconnect.
116 SourceReconnecting { feed_id: FeedId, attempt: u32 },
117
118 /// An RTSP source is using insecure (non-TLS) transport.
119 ///
120 /// Emitted once per session start when the effective RTSP URL uses
121 /// `rtsp://` instead of `rtsps://`. This is informational — the feed
122 /// still operates, but the operator should consider migrating the
123 /// camera to TLS or a firewalled network segment.
124 ///
125 /// Not emitted when `RtspSecurityPolicy::RequireTls` is set (that
126 /// policy rejects insecure sources at config time).
127 InsecureRtspSource {
128 feed_id: FeedId,
129 /// Redacted URL (credentials stripped) for operator diagnostics.
130 redacted_url: String,
131 },
132
133 /// A stage returned an error for a single frame.
134 /// The frame was dropped; the feed continues.
135 StageError {
136 feed_id: FeedId,
137 stage_id: StageId,
138 error: StageError,
139 },
140
141 /// A stage panicked. The feed will restart per its restart policy.
142 StagePanic { feed_id: FeedId, stage_id: StageId },
143
144 /// The feed is restarting.
145 FeedRestarting { feed_id: FeedId, restart_count: u32 },
146
147 /// The feed has stopped permanently.
148 FeedStopped { feed_id: FeedId, reason: StopReason },
149
150 /// Frames were dropped due to backpressure.
151 BackpressureDrop {
152 feed_id: FeedId,
153 frames_dropped: u64,
154 },
155
156 /// Frames are being processed with significant wall-clock staleness.
157 ///
158 /// Emitted when the age of a frame at processing time (wall-clock
159 /// now minus the wall-clock timestamp assigned at media bridge)
160 /// exceeds a threshold, indicating the consumer is falling behind
161 /// the source — typically due to buffer-pool starvation,
162 /// inference backlog, or TCP accumulation.
163 ///
164 /// Events are coalesced: under sustained lag, the executor emits
165 /// one event per throttle window (1 second) with `frames_lagged`
166 /// reflecting the number of stale frames in that window.
167 FrameLag {
168 feed_id: FeedId,
169 /// Frame age of the most recent stale frame, in milliseconds.
170 frame_age_ms: u64,
171 /// Number of frames exceeding the threshold since the last
172 /// `FrameLag` event (per-event delta).
173 frames_lagged: u64,
174 },
175
176 /// The view epoch changed (camera discontinuity detected).
177 ViewEpochChanged {
178 feed_id: FeedId,
179 /// New epoch value (from `nv-view`, represented as u64 here to avoid
180 /// circular dependency; re-exported with proper type in `nv-runtime`).
181 epoch: u64,
182 },
183
184 /// The view validity has degraded due to camera motion.
185 ViewDegraded {
186 feed_id: FeedId,
187 stability_score: f32,
188 },
189
190 /// A compensation transform was applied to active tracks.
191 ViewCompensationApplied { feed_id: FeedId, epoch: u64 },
192
193 /// The output broadcast channel is saturated — the internal sentinel
194 /// receiver observed ring-buffer wrap.
195 ///
196 /// An internal sentinel receiver (the slowest possible consumer)
197 /// monitors the aggregate output channel. When production outpaces
198 /// the sentinel's drain interval, the ring buffer wraps past the
199 /// sentinel's read position and `messages_lost` reports how many
200 /// messages the sentinel missed.
201 ///
202 /// **What this means:** the channel is under backpressure. The
203 /// sentinel observes worst-case wrap — it does **not** prove that
204 /// any specific external subscriber lost messages. A subscriber
205 /// consuming faster than the sentinel will experience less (or no)
206 /// loss. Treat this as a saturation / capacity warning, not a
207 /// per-subscriber loss report.
208 ///
209 /// **Attribution:** this is a runtime-global event. The output
210 /// channel is shared across all feeds, so saturation is not
211 /// attributable to any single feed.
212 ///
213 /// **`messages_lost` semantics:** per-event delta — the number of
214 /// messages the sentinel missed since the previous `OutputLagged`
215 /// event (or since runtime start for the first event). This is
216 /// **not** a cumulative total and **not** a count of messages lost
217 /// by external subscribers.
218 ///
219 /// **Throttling:** events are coalesced to prevent storms. The
220 /// runtime emits one event on the initial saturation transition,
221 /// then at most one per second during sustained saturation, each
222 /// carrying the accumulated sentinel-observed delta for that
223 /// interval.
224 ///
225 /// **Action:** consider increasing `output_capacity` or improving
226 /// subscriber throughput.
227 OutputLagged {
228 /// Sentinel-observed ring-buffer wrap since the previous
229 /// `OutputLagged` event (per-event delta, not cumulative).
230 /// Reflects channel saturation, not guaranteed per-subscriber
231 /// loss.
232 messages_lost: u64,
233 },
234
235 /// The source reached end-of-stream (file sources).
236 ///
237 /// For non-looping file sources this is terminal: the feed stops
238 /// with [`StopReason::EndOfStream`] rather than restarting.
239 SourceEos { feed_id: FeedId },
240
241 /// A decode decision was made for a feed's video stream.
242 ///
243 /// Emitted once per session start (not per frame) when the backend
244 /// identifies which decoder was selected for the stream. Provides
245 /// visibility into hardware vs. software decode selection without
246 /// exposing backend-specific element names in required fields.
247 ///
248 /// `detail` is a backend-specific diagnostic string (e.g., the
249 /// GStreamer element name). It is intended for logging — do not
250 /// match on its contents.
251 DecodeDecision {
252 feed_id: FeedId,
253 /// Hardware, Software, or Unknown.
254 outcome: DecodeOutcome,
255 /// The user-requested decode preference that was in effect.
256 preference: DecodePreference,
257 /// Whether the adaptive fallback cache overrode the requested
258 /// preference for this session.
259 fallback_active: bool,
260 /// Human-readable reason for fallback (populated when
261 /// `fallback_active` is `true`).
262 fallback_reason: Option<String>,
263 /// Backend-specific diagnostic detail (element name etc.).
264 detail: String,
265 },
266
267 /// The per-feed `OutputSink` panicked during `emit()`.
268 ///
269 /// The output is dropped but the feed continues processing.
270 /// The runtime wraps `OutputSink::emit()` in `catch_unwind` to
271 /// prevent a misbehaving sink from tearing down the worker thread.
272 SinkPanic { feed_id: FeedId },
273
274 /// The per-feed sink worker did not shut down within the timeout.
275 ///
276 /// The sink thread is detached and a placeholder sink is installed.
277 /// This typically means `OutputSink::emit()` is blocked on
278 /// downstream I/O. Distinct from [`SinkPanic`](Self::SinkPanic)
279 /// to allow operators to route hung-sink alerts separately from
280 /// crash alerts.
281 SinkTimeout { feed_id: FeedId },
282
283 /// The per-feed sink queue is full — output was dropped.
284 ///
285 /// The feed continues processing; only the output delivery is
286 /// dropped to prevent slow downstream I/O from blocking the
287 /// perception pipeline.
288 ///
289 /// `outputs_dropped` is the number of outputs dropped since the
290 /// last `SinkBackpressure` event (per-event delta).
291 SinkBackpressure {
292 feed_id: FeedId,
293 outputs_dropped: u64,
294 },
295
296 /// Tracks were rejected by the temporal store's admission
297 /// control because the hard cap was reached and no evictable
298 /// candidates (Lost/Coasted/Tentative) were available.
299 ///
300 /// The feed continues processing. This event indicates tracker
301 /// saturation — the scene has more confirmed objects than
302 /// `max_concurrent_tracks` allows.
303 ///
304 /// Events are coalesced per frame: a single event is emitted
305 /// with the total number of tracks rejected in that frame.
306 TrackAdmissionRejected {
307 feed_id: FeedId,
308 /// Number of tracks rejected in this frame.
309 rejected_count: u32,
310 },
311
312 /// The batch processor returned an error or panicked.
313 ///
314 /// All frames in the affected batch are dropped. Each feed thread
315 /// waiting on that batch receives the error and skips the frame.
316 BatchError {
317 processor_id: StageId,
318 batch_size: u32,
319 error: StageError,
320 },
321
322 /// A feed's batch submission was rejected because the coordinator's
323 /// queue is full or the coordinator has shut down. The frame is
324 /// dropped for this feed.
325 ///
326 /// Events are coalesced: under sustained overload, the executor
327 /// emits one event per throttle window (1 second) with
328 /// `dropped_count` reflecting the number of frames rejected in
329 /// that window. On recovery (a subsequent successful submission),
330 /// any remaining accumulated count is flushed immediately.
331 ///
332 /// Indicates the batch coordinator is overloaded — either the
333 /// processor is too slow for the combined frame rate, or
334 /// `max_batch_size` is too small.
335 BatchSubmissionRejected {
336 feed_id: FeedId,
337 processor_id: StageId,
338 /// Number of frames rejected in this throttle window.
339 dropped_count: u64,
340 },
341
342 /// A feed's batch response timed out — the coordinator did not
343 /// return a result within `max_latency + response_timeout`.
344 ///
345 /// Events are coalesced: under sustained timeout conditions, the
346 /// executor emits one event per throttle window (1 second) with
347 /// `timed_out_count` reflecting the number of timeouts in that
348 /// window. On recovery (a subsequent successful submission), any
349 /// remaining accumulated count is flushed immediately.
350 ///
351 /// Indicates the batch processor is slower than the configured
352 /// safety margin. Consider increasing `response_timeout` or
353 /// reducing `max_batch_size`.
354 BatchTimeout {
355 feed_id: FeedId,
356 processor_id: StageId,
357 /// Number of timeouts in this throttle window.
358 timed_out_count: u64,
359 },
360
361 /// A feed's batch submission was rejected because the feed already
362 /// has the maximum number of in-flight items in the coordinator
363 /// pipeline (default: 1).
364 ///
365 /// This occurs when a prior submission timed out on the feed side
366 /// but has not yet been processed (or drained) by the coordinator.
367 /// The in-flight cap prevents one feed from accumulating orphaned
368 /// items in the shared queue.
369 ///
370 /// Events are coalesced with the same 1-second throttle window as
371 /// other batch rejection events. On recovery, any remaining
372 /// accumulated count is flushed immediately.
373 BatchInFlightExceeded {
374 feed_id: FeedId,
375 processor_id: StageId,
376 /// Number of submissions rejected in this throttle window.
377 rejected_count: u64,
378 },
379 /// The effective device residency is lower than what was requested.
380 ///
381 /// This occurs when `DeviceResidency::Cuda` was requested but the
382 /// required GStreamer CUDA elements (`cudaupload`, `cudaconvert`)
383 /// are not available at runtime — the backend silently falls back to
384 /// host-memory decoding.
385 ///
386 /// Emitted once per session start alongside `DecodeDecision`.
387 ///
388 /// **Action:** install GStreamer >= 1.20 CUDA plugins, or switch to
389 /// a platform-specific provider (`DeviceResidency::Provider`) for
390 /// guaranteed GPU residency.
391 ResidencyDowngrade {
392 feed_id: FeedId,
393 /// What was requested (e.g., "Cuda").
394 requested: String,
395 /// What is actually in effect (e.g., "Host").
396 effective: String,
397 },
398}
/// Reason a feed stopped permanently.
#[derive(Debug, Clone)]
pub enum StopReason {
    /// Normal shutdown requested by the user.
    UserRequested,

    /// The restart limit was exceeded after repeated failures.
    RestartLimitExceeded { restart_count: u32 },

    /// End of stream on a file source.
    EndOfStream,

    /// An unrecoverable error occurred.
    Fatal { detail: String },
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn decode_outcome_display() {
        assert_eq!(DecodeOutcome::Hardware.to_string(), "Hardware");
        assert_eq!(DecodeOutcome::Software.to_string(), "Software");
        assert_eq!(DecodeOutcome::Unknown.to_string(), "Unknown");
    }

    #[test]
    fn decode_preference_display() {
        assert_eq!(DecodePreference::Auto.to_string(), "Auto");
        assert_eq!(DecodePreference::CpuOnly.to_string(), "CpuOnly");
        assert_eq!(
            DecodePreference::PreferHardware.to_string(),
            "PreferHardware"
        );
        assert_eq!(
            DecodePreference::RequireHardware.to_string(),
            "RequireHardware"
        );
    }

    #[test]
    fn stop_reason_variants() {
        let user = StopReason::UserRequested;
        let restart = StopReason::RestartLimitExceeded { restart_count: 3 };
        let eos = StopReason::EndOfStream;
        let fatal = StopReason::Fatal {
            detail: "test".into(),
        };

        // Ensure Debug formatting doesn't panic.
        let _ = format!("{user:?}");
        let _ = format!("{restart:?}");
        let _ = format!("{eos:?}");
        let _ = format!("{fatal:?}");
    }

    #[test]
    fn health_event_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        // HealthEvent must be broadcastable across threads.
        assert_send_sync::<HealthEvent>();
    }
}
463}