Skip to main content

nv_runtime/
feed_handle.rs

1//! Feed handle and runtime-observable types.
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use nv_core::id::FeedId;
7use nv_core::metrics::FeedMetrics;
8
9/// Handle to a running feed.
10///
11/// Provides per-feed monitoring and control: health events, metrics,
12/// queue telemetry, uptime, pause/resume, and stop.
13///
14/// Backed by `Arc<FeedSharedState>` — cloning is cheap (Arc bump).
15/// Metrics are read from the same atomic counters that the feed worker
16/// thread writes.
17#[derive(Clone)]
18pub struct FeedHandle {
19    shared: Arc<crate::worker::FeedSharedState>,
20}
21
22/// Snapshot of queue depths and capacities for a feed.
23///
24/// **Source queue**: bounded frame queue between media ingress and pipeline.
25/// **Sink queue**: bounded channel between pipeline and output sink.
26///
27/// Values are approximate under concurrent access — sufficient for
28/// monitoring and dashboards, not for synchronization.
29#[derive(Debug, Clone, Copy)]
30pub struct QueueTelemetry {
31    /// Current number of frames in the source queue.
32    pub source_depth: usize,
33    /// Maximum capacity of the source queue.
34    pub source_capacity: usize,
35    /// Current number of outputs in the sink queue.
36    pub sink_depth: usize,
37    /// Maximum capacity of the sink queue.
38    pub sink_capacity: usize,
39}
40
41/// Snapshot of the decode method selected by the media backend.
42///
43/// Available after the stream starts and the backend confirms decoder
44/// negotiation. Use [`FeedHandle::decode_status()`] to poll.
45#[derive(Debug, Clone)]
46pub struct DecodeStatus {
47    /// Whether hardware or software decoding was selected.
48    pub outcome: nv_core::health::DecodeOutcome,
49    /// Backend-specific detail string (e.g., GStreamer element name).
50    ///
51    /// Intended for diagnostics and dashboards — do not match on its
52    /// contents programmatically.
53    pub detail: String,
54}
55
56impl FeedHandle {
57    /// Create a feed handle (internal — constructed by the runtime).
58    pub(crate) fn new(shared: Arc<crate::worker::FeedSharedState>) -> Self {
59        Self { shared }
60    }
61
62    /// The feed's unique identifier.
63    #[must_use]
64    pub fn id(&self) -> FeedId {
65        self.shared.id
66    }
67
68    /// Whether the feed is currently paused.
69    #[must_use]
70    pub fn is_paused(&self) -> bool {
71        self.shared
72            .paused
73            .load(std::sync::atomic::Ordering::Relaxed)
74    }
75
76    /// Whether the worker thread is still alive.
77    #[must_use]
78    pub fn is_alive(&self) -> bool {
79        self.shared.alive.load(std::sync::atomic::Ordering::Relaxed)
80    }
81
82    /// Get a snapshot of the feed's current metrics.
83    ///
84    /// Reads live atomic counters maintained by the feed worker thread.
85    #[must_use]
86    pub fn metrics(&self) -> FeedMetrics {
87        self.shared.metrics()
88    }
89
90    /// Get a snapshot of the feed's source and sink queue depths/capacities.
91    ///
92    /// Source queue depth reads the frame queue's internal lock briefly.
93    /// Sink queue depth reads an atomic counter with no locking.
94    ///
95    /// If no processing session is active (between restarts or after
96    /// shutdown), both depths return 0.
97    #[must_use]
98    pub fn queue_telemetry(&self) -> QueueTelemetry {
99        let (source_depth, source_capacity) = self.shared.source_queue_telemetry();
100        QueueTelemetry {
101            source_depth,
102            source_capacity,
103            sink_depth: self
104                .shared
105                .sink_occupancy
106                .load(std::sync::atomic::Ordering::Relaxed),
107            sink_capacity: self
108                .shared
109                .sink_capacity
110                .load(std::sync::atomic::Ordering::Relaxed),
111        }
112    }
113
114    /// Elapsed time since the feed's current processing session started.
115    ///
116    /// **Semantics: session-scoped uptime.** The clock resets on each
117    /// successful start or restart. If the feed has not started yet or
118    /// is between restart attempts, the value reflects the time since the
119    /// last session began.
120    ///
121    /// Useful for monitoring feed stability: a feed that restarts
122    /// frequently will show low uptime values.
123    #[must_use]
124    pub fn uptime(&self) -> Duration {
125        self.shared.session_uptime()
126    }
127
128    /// The decode method confirmed by the media backend for this feed.
129    ///
130    /// Returns `None` if no decode decision has been made yet (the
131    /// stream has not started, the backend has not negotiated a decoder,
132    /// or the feed is between restarts).
133    #[must_use]
134    pub fn decode_status(&self) -> Option<DecodeStatus> {
135        let (outcome, detail) = self.shared.decode_status()?;
136        Some(DecodeStatus { outcome, detail })
137    }
138
139    /// Get a consolidated diagnostics snapshot of this feed.
140    ///
141    /// Composes lifecycle state, metrics, queue depths, decode status,
142    /// and view-system health into a single read. All data comes from
143    /// the same atomic counters the individual accessors use — this is
144    /// a convenience composite, not a new data source.
145    ///
146    /// Suitable for periodic polling (1–5 s) by dashboards and health
147    /// probes.
148    #[must_use]
149    pub fn diagnostics(&self) -> crate::diagnostics::FeedDiagnostics {
150        use crate::diagnostics::{ViewDiagnostics, ViewStatus};
151
152        let metrics = self.metrics();
153        let validity_ordinal = self
154            .shared
155            .view_context_validity
156            .load(std::sync::atomic::Ordering::Relaxed);
157        let status = match validity_ordinal {
158            0 => ViewStatus::Stable,
159            1 => ViewStatus::Degraded,
160            _ => ViewStatus::Invalid,
161        };
162        let stability_bits = self
163            .shared
164            .view_stability_score
165            .load(std::sync::atomic::Ordering::Relaxed);
166
167        crate::diagnostics::FeedDiagnostics {
168            feed_id: self.id(),
169            alive: self.is_alive(),
170            paused: self.is_paused(),
171            uptime: self.uptime(),
172            metrics,
173            queues: self.queue_telemetry(),
174            decode: self.decode_status(),
175            view: ViewDiagnostics {
176                epoch: metrics.view_epoch,
177                stability_score: f32::from_bits(stability_bits),
178                status,
179            },
180            batch_processor_id: self.shared.batch_processor_id,
181        }
182    }
183
184    /// Pause the feed (stop pulling frames from source; stages idle).
185    ///
186    /// Uses a condvar to wake the worker without spin-sleeping.
187    ///
188    /// # Errors
189    ///
190    /// Returns an error if the feed is already paused.
191    pub fn pause(&self) -> Result<(), nv_core::NvError> {
192        let was_paused = self
193            .shared
194            .paused
195            .swap(true, std::sync::atomic::Ordering::Relaxed);
196        if was_paused {
197            return Err(nv_core::NvError::Runtime(
198                nv_core::error::RuntimeError::AlreadyPaused,
199            ));
200        }
201        // Mirror into the condvar-guarded bool.
202        let (lock, _cvar) = &self.shared.pause_condvar;
203        *lock.lock().unwrap_or_else(|e| e.into_inner()) = true;
204        Ok(())
205    }
206
207    /// Resume a paused feed.
208    ///
209    /// Notifies the worker thread via condvar so it wakes immediately.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the feed is not paused.
214    pub fn resume(&self) -> Result<(), nv_core::NvError> {
215        let was_paused = self
216            .shared
217            .paused
218            .swap(false, std::sync::atomic::Ordering::Relaxed);
219        if !was_paused {
220            return Err(nv_core::NvError::Runtime(
221                nv_core::error::RuntimeError::NotPaused,
222            ));
223        }
224        // Mirror into the condvar-guarded bool and wake the worker.
225        let (lock, cvar) = &self.shared.pause_condvar;
226        *lock.lock().unwrap_or_else(|e| e.into_inner()) = false;
227        cvar.notify_one();
228        Ok(())
229    }
230}