nv_runtime/feed_handle.rs
1//! Feed handle and runtime-observable types.
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use nv_core::id::FeedId;
7use nv_core::metrics::FeedMetrics;
8
9/// Handle to a running feed.
10///
11/// Provides per-feed monitoring and control: health events, metrics,
12/// queue telemetry, uptime, pause/resume, and stop.
13///
14/// Backed by `Arc<FeedSharedState>` — cloning is cheap (Arc bump).
15/// Metrics are read from the same atomic counters that the feed worker
16/// thread writes.
17#[derive(Clone)]
18pub struct FeedHandle {
19 shared: Arc<crate::worker::FeedSharedState>,
20}
21
/// Point-in-time snapshot of a feed's queue depths and capacities.
///
/// **Source queue**: bounded frame queue between media ingress and pipeline.
/// **Sink queue**: bounded channel between pipeline and output sink.
///
/// Values are approximate under concurrent access — sufficient for
/// monitoring and dashboards, not for synchronization.
///
/// This is plain `Copy` data: two snapshots compare field by field, and
/// `QueueTelemetry::default()` is the all-zero snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct QueueTelemetry {
    /// Current number of frames in the source queue.
    pub source_depth: usize,
    /// Maximum capacity of the source queue.
    pub source_capacity: usize,
    /// Current number of outputs in the sink queue.
    pub sink_depth: usize,
    /// Maximum capacity of the sink queue.
    pub sink_capacity: usize,
}
40
41/// Snapshot of the decode method selected by the media backend.
42///
43/// Available after the stream starts and the backend confirms decoder
44/// negotiation. Use [`FeedHandle::decode_status()`] to poll.
45#[derive(Debug, Clone)]
46pub struct DecodeStatus {
47 /// Whether hardware or software decoding was selected.
48 pub outcome: nv_core::health::DecodeOutcome,
49 /// Backend-specific detail string (e.g., GStreamer element name).
50 ///
51 /// Intended for diagnostics and dashboards — do not match on its
52 /// contents programmatically.
53 pub detail: String,
54}
55
56impl FeedHandle {
57 /// Create a feed handle (internal — constructed by the runtime).
58 pub(crate) fn new(shared: Arc<crate::worker::FeedSharedState>) -> Self {
59 Self { shared }
60 }
61
62 /// The feed's unique identifier.
63 #[must_use]
64 pub fn id(&self) -> FeedId {
65 self.shared.id
66 }
67
68 /// Whether the feed is currently paused.
69 #[must_use]
70 pub fn is_paused(&self) -> bool {
71 self.shared
72 .paused
73 .load(std::sync::atomic::Ordering::Relaxed)
74 }
75
76 /// Whether the worker thread is still alive.
77 #[must_use]
78 pub fn is_alive(&self) -> bool {
79 self.shared.alive.load(std::sync::atomic::Ordering::Relaxed)
80 }
81
82 /// Get a snapshot of the feed's current metrics.
83 ///
84 /// Reads live atomic counters maintained by the feed worker thread.
85 #[must_use]
86 pub fn metrics(&self) -> FeedMetrics {
87 self.shared.metrics()
88 }
89
90 /// Get a snapshot of the feed's source and sink queue depths/capacities.
91 ///
92 /// Source queue depth reads the frame queue's internal lock briefly.
93 /// Sink queue depth reads an atomic counter with no locking.
94 ///
95 /// If no processing session is active (between restarts or after
96 /// shutdown), both depths return 0.
97 #[must_use]
98 pub fn queue_telemetry(&self) -> QueueTelemetry {
99 let (source_depth, source_capacity) = self.shared.source_queue_telemetry();
100 QueueTelemetry {
101 source_depth,
102 source_capacity,
103 sink_depth: self
104 .shared
105 .sink_occupancy
106 .load(std::sync::atomic::Ordering::Relaxed),
107 sink_capacity: self
108 .shared
109 .sink_capacity
110 .load(std::sync::atomic::Ordering::Relaxed),
111 }
112 }
113
114 /// Elapsed time since the feed's current processing session started.
115 ///
116 /// **Semantics: session-scoped uptime.** The clock resets on each
117 /// successful start or restart. If the feed has not started yet or
118 /// is between restart attempts, the value reflects the time since the
119 /// last session began.
120 ///
121 /// Useful for monitoring feed stability: a feed that restarts
122 /// frequently will show low uptime values.
123 #[must_use]
124 pub fn uptime(&self) -> Duration {
125 self.shared.session_uptime()
126 }
127
128 /// The decode method confirmed by the media backend for this feed.
129 ///
130 /// Returns `None` if no decode decision has been made yet (the
131 /// stream has not started, the backend has not negotiated a decoder,
132 /// or the feed is between restarts).
133 #[must_use]
134 pub fn decode_status(&self) -> Option<DecodeStatus> {
135 let (outcome, detail) = self.shared.decode_status()?;
136 Some(DecodeStatus { outcome, detail })
137 }
138
139 /// Get a consolidated diagnostics snapshot of this feed.
140 ///
141 /// Composes lifecycle state, metrics, queue depths, decode status,
142 /// and view-system health into a single read. All data comes from
143 /// the same atomic counters the individual accessors use — this is
144 /// a convenience composite, not a new data source.
145 ///
146 /// Suitable for periodic polling (1–5 s) by dashboards and health
147 /// probes.
148 #[must_use]
149 pub fn diagnostics(&self) -> crate::diagnostics::FeedDiagnostics {
150 use crate::diagnostics::{ViewDiagnostics, ViewStatus};
151
152 let metrics = self.metrics();
153 let validity_ordinal = self
154 .shared
155 .view_context_validity
156 .load(std::sync::atomic::Ordering::Relaxed);
157 let status = match validity_ordinal {
158 0 => ViewStatus::Stable,
159 1 => ViewStatus::Degraded,
160 _ => ViewStatus::Invalid,
161 };
162 let stability_bits = self
163 .shared
164 .view_stability_score
165 .load(std::sync::atomic::Ordering::Relaxed);
166
167 crate::diagnostics::FeedDiagnostics {
168 feed_id: self.id(),
169 alive: self.is_alive(),
170 paused: self.is_paused(),
171 uptime: self.uptime(),
172 metrics,
173 queues: self.queue_telemetry(),
174 decode: self.decode_status(),
175 view: ViewDiagnostics {
176 epoch: metrics.view_epoch,
177 stability_score: f32::from_bits(stability_bits),
178 status,
179 },
180 batch_processor_id: self.shared.batch_processor_id,
181 }
182 }
183
184 /// Pause the feed (stop pulling frames from source; stages idle).
185 ///
186 /// Uses a condvar to wake the worker without spin-sleeping.
187 ///
188 /// # Errors
189 ///
190 /// Returns an error if the feed is already paused.
191 pub fn pause(&self) -> Result<(), nv_core::NvError> {
192 let was_paused = self
193 .shared
194 .paused
195 .swap(true, std::sync::atomic::Ordering::Relaxed);
196 if was_paused {
197 return Err(nv_core::NvError::Runtime(
198 nv_core::error::RuntimeError::AlreadyPaused,
199 ));
200 }
201 // Mirror into the condvar-guarded bool.
202 let (lock, _cvar) = &self.shared.pause_condvar;
203 *lock.lock().unwrap_or_else(|e| e.into_inner()) = true;
204 Ok(())
205 }
206
207 /// Resume a paused feed.
208 ///
209 /// Notifies the worker thread via condvar so it wakes immediately.
210 ///
211 /// # Errors
212 ///
213 /// Returns an error if the feed is not paused.
214 pub fn resume(&self) -> Result<(), nv_core::NvError> {
215 let was_paused = self
216 .shared
217 .paused
218 .swap(false, std::sync::atomic::Ordering::Relaxed);
219 if !was_paused {
220 return Err(nv_core::NvError::Runtime(
221 nv_core::error::RuntimeError::NotPaused,
222 ));
223 }
224 // Mirror into the condvar-guarded bool and wake the worker.
225 let (lock, cvar) = &self.shared.pause_condvar;
226 *lock.lock().unwrap_or_else(|e| e.into_inner()) = false;
227 cvar.notify_one();
228 Ok(())
229 }
230}