nv_media/ingress.rs
1//! Public trait contracts for media ingress.
2//!
3//! These traits define the boundary between the media backend and the rest of
4//! the library. They ensure that no backend-specific types leak into the public
5//! API, and that alternative media backends can be substituted by implementing
6//! [`MediaIngressFactory`].
7//!
8//! The default implementation is GStreamer-backed (see [`DefaultMediaFactory`](super::DefaultMediaFactory)).
9//! All other crates interact through these traits, never through GStreamer types.
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use nv_core::config::{ReconnectPolicy, SourceSpec};
15use nv_core::error::MediaError;
16use nv_core::health::HealthEvent;
17use nv_core::id::FeedId;
18use nv_frame::FrameEnvelope;
19
20use crate::bridge::PtzTelemetry;
21use crate::decode::DecodePreference;
22use crate::gpu_provider::SharedGpuProvider;
23use crate::hook::PostDecodeHook;
24
25/// Where decoded frames reside after the media bridge.
26///
27/// Controls whether the pipeline tail maps frames to host memory
28/// (default), uses the built-in CUDA path, or delegates to a custom
29/// [`GpuPipelineProvider`](crate::GpuPipelineProvider).
30///
31/// # Resolution order
32///
33/// The pipeline builder resolves the device residency strategy in this
34/// order:
35///
36/// 1. **`Provider`** — the provider builds the pipeline tail and bridges
37/// frames. No built-in CUDA elements are needed. **If the provider
38/// fails, pipeline construction returns an error** — there is no
39/// silent fallback to Host, because the user explicitly selected a
40/// specific hardware integration.
41/// 2. **`Cuda`** — the built-in upstream CUDA path (`cudaupload`,
42/// `cudaconvert`). Requires the `cuda` cargo feature and GStreamer
43/// ≥ 1.20. Falls back to `Host` with a warning if the elements are
44/// not available.
45/// 3. **`Host`** — standard `videoconvert → appsink` path. Frames
46/// arrive in CPU-accessible mapped memory.
47///
48/// # Feature gating
49///
50/// - `Cuda` requires the `cuda` cargo feature on `nv-media`. Without
51/// it, requesting `Cuda` returns `MediaError::Unsupported`.
52/// - `Provider` does **not** require the `cuda` feature — the provider
53/// decides which GStreamer elements to use.
54///
55/// # Hardware extension
56///
57/// To add support for a new hardware backend (e.g., AMD ROCm), implement
58/// [`GpuPipelineProvider`](crate::GpuPipelineProvider) in a new crate
59/// and pass `DeviceResidency::Provider(Arc::new(MyProvider::new()))`.
60/// No changes to the core library are needed.
61#[derive(Clone, Default)]
62pub enum DeviceResidency {
63 /// Frames are mapped to CPU-accessible memory (zero-copy GStreamer
64 /// buffer mapping). This is the standard path.
65 #[default]
66 Host,
67 /// Frames remain on the GPU as CUDA device memory via the built-in
68 /// upstream GStreamer CUDA elements (`cudaupload`, `cudaconvert`).
69 ///
70 /// Requires the `cuda` cargo feature. On GStreamer < 1.20 (where the
71 /// CUDA elements are unavailable), falls back to `Host` with a
72 /// warning.
73 ///
74 /// Stages access the device pointer via
75 /// [`FrameEnvelope::accelerated_handle::<CudaBufferHandle>()`](nv_frame::FrameEnvelope::accelerated_handle).
76 /// CPU consumers can still call
77 /// [`FrameEnvelope::require_host_data()`](nv_frame::FrameEnvelope::require_host_data)
78 /// — the materializer downloads device data on first access and
79 /// caches the result.
80 Cuda,
81 /// Frames remain on a device managed by the supplied provider.
82 ///
83 /// The provider controls pipeline tail construction and frame
84 /// bridging. This variant does **not** require the `cuda` cargo
85 /// feature — the provider decides which GStreamer elements and
86 /// memory model to use.
87 ///
88 /// See [`GpuPipelineProvider`](crate::GpuPipelineProvider) for the
89 /// trait contract.
90 Provider(SharedGpuProvider),
91}
92
93impl std::fmt::Debug for DeviceResidency {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 match self {
96 Self::Host => write!(f, "Host"),
97 Self::Cuda => write!(f, "Cuda"),
98 Self::Provider(p) => write!(f, "Provider({})", p.name()),
99 }
100 }
101}
102
103impl DeviceResidency {
104 /// Whether this residency requests device-resident frames (non-host).
105 #[inline]
106 pub fn is_device(&self) -> bool {
107 !matches!(self, Self::Host)
108 }
109
110 /// Extract the provider reference, if this is the `Provider` variant.
111 #[inline]
112 pub fn provider(&self) -> Option<&SharedGpuProvider> {
113 match self {
114 Self::Provider(p) => Some(p),
115 _ => None,
116 }
117 }
118}
119
/// Lifecycle state a media source reports after a
/// [`tick()`](MediaIngress::tick).
///
/// The runtime inspects this value to tell whether the feed is still
/// alive, is currently recovering, or has stopped for good.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceStatus {
    /// Normal operation — frames are flowing or are about to.
    Running,
    /// A failure occurred and the source is trying to reconnect.
    ///
    /// No frames flow in this state. The runtime should continue ticking
    /// so the reconnection state machine keeps advancing.
    Reconnecting,
    /// The source has stopped permanently: the reconnection budget is
    /// exhausted, a terminal EOS was reached, or it was stopped explicitly.
    Stopped,
}
137
138/// Result of a [`MediaIngress::tick()`] call.
139///
140/// Combines the source's lifecycle status with an optional scheduling hint
141/// that tells the runtime how soon the next tick is needed.
142#[derive(Debug, Clone)]
143pub struct TickOutcome {
144 /// Current lifecycle state of the source.
145 pub status: SourceStatus,
146 /// Suggested delay before the next `tick()` call.
147 ///
148 /// - `Some(d)` — the source has a pending deadline (e.g., a reconnect
149 /// backoff that expires in `d`). The runtime should arrange to tick
150 /// again after at most `d`.
151 /// - `None` — no specific urgency. The runtime will wait indefinitely
152 /// for the next frame, EOS, or error — no periodic tick occurs.
153 /// Sources that rely on polling to discover state changes (e.g., a
154 /// stopped flag) **must** provide a `Some` hint.
155 pub next_tick: Option<Duration>,
156}
157
158impl TickOutcome {
159 /// Source is running, no specific tick urgency.
160 #[inline]
161 pub fn running() -> Self {
162 Self {
163 status: SourceStatus::Running,
164 next_tick: None,
165 }
166 }
167
168 /// Source is reconnecting; tick again after `delay`.
169 #[inline]
170 pub fn reconnecting(delay: Duration) -> Self {
171 Self {
172 status: SourceStatus::Reconnecting,
173 next_tick: Some(delay),
174 }
175 }
176
177 /// Source is permanently stopped.
178 #[inline]
179 pub fn stopped() -> Self {
180 Self {
181 status: SourceStatus::Stopped,
182 next_tick: None,
183 }
184 }
185}
186
187/// Trait contract for a media ingress source.
188///
189/// Abstracts source lifecycle so the runtime and pipeline code do not depend
190/// on GStreamer internals. The GStreamer-backed `MediaSource` implements this
191/// trait within `nv-media`.
192///
193/// Each `MediaIngress` instance handles exactly one feed. Implementations must
194/// be `Send` (moved onto the feed's I/O thread at startup) but need not be `Sync`.
195///
196/// # Lifecycle
197///
198/// 1. `start(sink)` — begin producing frames, delivering them via `sink`.
199/// 2. Frames flow: decoded frames are pushed to `sink.on_frame()`.
200/// 3. The runtime calls `tick()` on every processed frame **and** whenever
201/// the frame queue times out according to [`TickOutcome::next_tick`].
202/// This is event-driven: when `next_tick` is `None`, the runtime waits
203/// indefinitely for the next frame, EOS, or error — there is **no**
204/// fixed polling interval.
205/// 4. `pause()` / `resume()` — temporarily halt/restart frame production.
206/// 5. `stop()` — tear down the source and release all resources.
207///
208/// # Tick scheduling
209///
210/// The runtime does **not** poll at a fixed interval. Tick frequency is
211/// determined entirely by frame arrivals and the `next_tick` hint:
212///
213/// - When frames are flowing, `tick()` is called after each frame.
214/// - When the queue is idle and `next_tick` is `Some(d)`, the runtime
215/// wakes after `d` to call `tick()`.
216/// - When the queue is idle and `next_tick` is `None`, the runtime
217/// sleeps indefinitely — only a new frame, `on_error()`, `on_eos()`,
218/// or shutdown will wake it.
219///
220/// Sources that need periodic management (e.g., reconnection with
221/// backoff) **must** return `Some(remaining)` in `next_tick` so the
222/// runtime wakes at the right time. Sources that are purely
223/// frame-driven (e.g., test doubles) can return `None`.
224pub trait MediaIngress: Send + 'static {
225 /// Begin producing frames.
226 ///
227 /// Decoded frames are delivered to `sink`. The implementation may spawn
228 /// internal threads or use the calling thread, depending on the backend.
229 ///
230 /// On success the source enters the `Running` state immediately.
231 /// On failure the source remains in the `Idle` state and the caller
232 /// (typically the runtime) may retry. The returned [`MediaError`] carries
233 /// the typed reason for the failure (e.g., `Unsupported` when the
234 /// backend is not linked, `ConnectionFailed` when the stream cannot
235 /// be reached).
236 ///
237 /// # Errors
238 ///
239 /// Returns `MediaError` if the pipeline cannot be constructed or the
240 /// source cannot be connected on initial attempt.
241 fn start(&mut self, sink: Box<dyn FrameSink>) -> Result<(), MediaError>;
242
243 /// Stop the source and release all backend resources.
244 ///
245 /// After `stop()`, the source may not be restarted.
246 ///
247 /// # Errors
248 ///
249 /// Returns `MediaError` if teardown encounters an issue.
250 fn stop(&mut self) -> Result<(), MediaError>;
251
252 /// Pause frame production without releasing the connection.
253 ///
254 /// # Errors
255 ///
256 /// Returns `MediaError` if the backend does not support pausing or
257 /// the source is not in a running state.
258 fn pause(&mut self) -> Result<(), MediaError>;
259
260 /// Resume frame production after a pause.
261 ///
262 /// # Errors
263 ///
264 /// Returns `MediaError` if the source is not paused.
265 fn resume(&mut self) -> Result<(), MediaError>;
266
267 /// Drive internal lifecycle management: poll the backend bus for
268 /// errors/events and advance the reconnection state machine.
269 ///
270 /// Called by the runtime after each processed frame and whenever
271 /// the frame queue times out according to the previous
272 /// [`TickOutcome::next_tick`] hint. There is no fixed polling
273 /// interval — tick frequency is entirely event-driven.
274 ///
275 /// Implementations should:
276 ///
277 /// 1. Drain pending bus messages and process lifecycle events.
278 /// 2. If in a reconnecting state and the backoff deadline has elapsed,
279 /// attempt reconnection.
280 /// 3. Return a [`TickOutcome`] carrying the current status and an
281 /// optional hint for when the next tick is needed.
282 ///
283 /// The `next_tick` hint allows the runtime to sleep efficiently.
284 /// For example, when reconnecting with a 2-second backoff,
285 /// `next_tick` should be `Some(remaining)` so the runtime wakes
286 /// exactly when the backoff expires. Returning `None` means the
287 /// runtime will wait indefinitely for the next frame/error/EOS.
288 ///
289 /// Sources that need to poll for state changes **must** provide a
290 /// `Some` hint — without one, the runtime will not call `tick()`
291 /// again until a frame or error arrives.
292 ///
293 /// For sources that do not need periodic management (e.g., test
294 /// doubles that produce all frames upfront), the default
295 /// implementation returns [`TickOutcome::running()`].
296 fn tick(&mut self) -> TickOutcome {
297 TickOutcome::running()
298 }
299
300 /// The source specification this ingress was created from.
301 fn source_spec(&self) -> &SourceSpec;
302
303 /// The feed ID this ingress is associated with.
304 fn feed_id(&self) -> FeedId;
305
306 /// The effective decode status after backend negotiation.
307 ///
308 /// Returns `None` if no decode decision has been made yet (the stream
309 /// has not started, or the backend does not report decoder identity).
310 ///
311 /// The tuple contains `(outcome, backend_detail)` where
312 /// `backend_detail` is an opaque diagnostic string (e.g., the
313 /// GStreamer element name). Do not match on its contents.
314 fn decode_status(&self) -> Option<(nv_core::health::DecodeOutcome, String)> {
315 None
316 }
317}
318
319/// Receives decoded frames from a [`MediaIngress`] source.
320///
321/// Implemented by the runtime's pipeline executor. The media backend calls
322/// these methods on its internal thread — implementations must be
323/// `Send + Sync` because the callback may be invoked from a GStreamer
324/// streaming thread.
325pub trait FrameSink: Send + Sync + 'static {
326 /// A new decoded frame is available.
327 fn on_frame(&self, frame: FrameEnvelope);
328
329 /// The source encountered an error (e.g., transient decode failure,
330 /// connection loss, or timeout).
331 ///
332 /// The error preserves the original typed [`MediaError`] variant so
333 /// the receiver can distinguish connection failures from decode errors,
334 /// timeouts, etc. without parsing strings.
335 fn on_error(&self, error: MediaError);
336
337 /// End of stream — the source has no more frames to produce from
338 /// the current session.
339 ///
340 /// Called by the source FSM only when the session is definitively
341 /// over: a non-looping file reaching its end, or a live source whose
342 /// reconnection budget is exhausted. This is a terminal signal.
343 ///
344 /// Implementations should close the frame queue so the worker thread
345 /// observes the `Closed` state and exits the processing loop.
346 fn on_eos(&self);
347
348 /// Wake the consumer thread for control-plane processing.
349 ///
350 /// Called by the backend when a lifecycle-relevant bus event occurs
351 /// (error, EOS) to ensure the consumer thread ticks the source
352 /// promptly — even when no frames are flowing and the queue pop
353 /// has no deadline.
354 ///
355 /// The default implementation is a no-op. Implementations backed by
356 /// a frame queue should notify the queue's consumer condvar.
357 fn wake(&self) {}
358}
359
360/// Configuration bundle passed to [`MediaIngressFactory::create()`].
361///
362/// Replaces positional arguments with a named struct so new options can
363/// be added without breaking the trait signature.
364///
365/// Construct via [`IngressOptions::new()`] and the `with_*` builder
366/// methods. The struct is `#[non_exhaustive]`, so adding fields is
367/// not a semver break.
368///
369/// # Examples
370///
371/// ```
372/// use nv_core::config::{ReconnectPolicy, SourceSpec};
373/// use nv_core::id::FeedId;
374/// use nv_media::{DecodePreference, DeviceResidency, IngressOptions};
375///
376/// let options = IngressOptions::new(
377/// FeedId::new(1),
378/// SourceSpec::rtsp("rtsp://cam/stream"),
379/// ReconnectPolicy::default(),
380/// )
381/// .with_decode_preference(DecodePreference::CpuOnly)
382/// .with_device_residency(DeviceResidency::Host);
383/// ```
384#[non_exhaustive]
385pub struct IngressOptions {
386 /// Feed identifier.
387 pub feed_id: FeedId,
388 /// Source specification (RTSP URL, file path, etc.).
389 pub spec: SourceSpec,
390 /// Reconnection policy for the source.
391 pub reconnect: ReconnectPolicy,
392 /// Optional PTZ telemetry provider.
393 pub ptz_provider: Option<Arc<dyn PtzProvider>>,
394 /// Decode preference — controls hardware vs. software decode selection.
395 pub decode_preference: DecodePreference,
396 /// Optional post-decode hook — can inject a pipeline element between
397 /// the decoder and the color-space converter.
398 ///
399 /// **Ignored when `device_residency` is `Provider`** — the provider
400 /// controls the full decoder-to-tail path. Hooks are only evaluated
401 /// for `Host` and `Cuda` residency modes.
402 pub post_decode_hook: Option<PostDecodeHook>,
403 /// Maximum number of media events buffered before new events are
404 /// dropped. Default: 64.
405 pub event_queue_capacity: usize,
406 /// Where decoded frames reside after the bridge.
407 ///
408 /// [`DeviceResidency::Host`] (default) maps frames to CPU memory.
409 /// [`DeviceResidency::Cuda`] uses the built-in CUDA path.
410 /// [`DeviceResidency::Provider`] delegates to a custom provider.
411 ///
412 /// See [`DeviceResidency`] for resolution semantics.
413 pub device_residency: DeviceResidency,
414}
415
416impl IngressOptions {
417 /// Create a new options bundle with required fields.
418 ///
419 /// Optional fields default to:
420 /// - `ptz_provider`: `None`
421 /// - `decode_preference`: [`DecodePreference::Auto`]
422 #[must_use]
423 pub fn new(feed_id: FeedId, spec: SourceSpec, reconnect: ReconnectPolicy) -> Self {
424 Self {
425 feed_id,
426 spec,
427 reconnect,
428 ptz_provider: None,
429 decode_preference: DecodePreference::default(),
430 post_decode_hook: None,
431 event_queue_capacity: 64,
432 device_residency: DeviceResidency::default(),
433 }
434 }
435
436 /// Attach a PTZ telemetry provider.
437 #[must_use]
438 pub fn with_ptz_provider(mut self, provider: Arc<dyn PtzProvider>) -> Self {
439 self.ptz_provider = Some(provider);
440 self
441 }
442
443 /// Set the decode preference.
444 #[must_use]
445 pub fn with_decode_preference(mut self, pref: DecodePreference) -> Self {
446 self.decode_preference = pref;
447 self
448 }
449
450 /// Attach a post-decode hook.
451 #[must_use]
452 pub fn with_post_decode_hook(mut self, hook: PostDecodeHook) -> Self {
453 self.post_decode_hook = Some(hook);
454 self
455 }
456
457 /// Set the event queue capacity. Default: 64.
458 #[must_use]
459 pub fn with_event_queue_capacity(mut self, capacity: usize) -> Self {
460 self.event_queue_capacity = capacity;
461 self
462 }
463
464 /// Set the device residency mode for decoded frames.
465 ///
466 /// [`DeviceResidency::Host`] (default) maps frames to CPU memory.
467 /// [`DeviceResidency::Cuda`] uses the built-in CUDA path.
468 /// [`DeviceResidency::Provider`] delegates to a custom
469 /// [`GpuPipelineProvider`](crate::GpuPipelineProvider).
470 ///
471 /// See [`DeviceResidency`] for resolution semantics and feature
472 /// gating details.
473 #[must_use]
474 pub fn with_device_residency(mut self, residency: DeviceResidency) -> Self {
475 self.device_residency = residency;
476 self
477 }
478}
479
480/// Factory for creating [`MediaIngress`] instances from a source spec.
481///
482/// The runtime holds one factory and calls `create()` for each new feed.
483/// The default implementation ([`DefaultMediaFactory`](super::DefaultMediaFactory))
484/// produces backend-specific media sources. Custom implementations can
485/// substitute alternative backends or test doubles.
486pub trait MediaIngressFactory: Send + Sync + 'static {
487 /// Create a new media ingress for the given feed configuration.
488 ///
489 /// All feed-level options are bundled in [`IngressOptions`], which
490 /// includes an optional [`PtzProvider`] for feeds that have external
491 /// PTZ telemetry (e.g., ONVIF).
492 ///
493 /// # Errors
494 ///
495 /// Returns `MediaError` if the source specification is invalid or
496 /// the backend cannot handle the requested format.
497 fn create(&self, options: IngressOptions) -> Result<Box<dyn MediaIngress>, MediaError>;
498}
499
500/// Receives [`HealthEvent`]s emitted by a [`MediaIngress`] source.
501///
502/// The runtime typically provides an implementation that forwards events
503/// to subscribers via a channel. The media layer calls these methods from
504/// its management thread; implementations must be `Send + Sync`.
505pub trait HealthSink: Send + Sync + 'static {
506 /// Emit a health event.
507 fn emit(&self, event: HealthEvent);
508}
509
510/// Provides the latest PTZ telemetry for a feed.
511///
512/// Implemented by the runtime or an external ONVIF/serial adapter. The
513/// appsink callback queries this on every decoded frame to attach telemetry
514/// to the frame's [`TypedMetadata`](nv_core::TypedMetadata).
515///
516/// Must be `Send + Sync` since it is called from the GStreamer streaming thread.
517pub trait PtzProvider: Send + Sync + 'static {
518 /// Return the latest PTZ telemetry, or `None` if unavailable.
519 fn latest(&self) -> Option<PtzTelemetry>;
520}
521
#[cfg(test)]
mod tests {
    use super::*;

    /// Feed id shared by the `IngressOptions` tests.
    fn test_feed_id() -> nv_core::id::FeedId {
        nv_core::id::FeedId::new(1)
    }

    /// Default reconnect policy shared by the `IngressOptions` tests.
    fn test_reconnect() -> nv_core::config::ReconnectPolicy {
        nv_core::config::ReconnectPolicy::default()
    }

    /// Options for a file source with every optional field at its default.
    fn file_options() -> IngressOptions {
        IngressOptions::new(
            test_feed_id(),
            SourceSpec::file("/tmp/test.mp4"),
            test_reconnect(),
        )
    }

    #[test]
    fn device_residency_default_is_host() {
        let residency = DeviceResidency::default();
        assert!(matches!(residency, DeviceResidency::Host));
    }

    #[test]
    fn device_residency_host_is_not_device() {
        let host = DeviceResidency::Host;
        assert!(!host.is_device());
    }

    #[test]
    fn device_residency_cuda_is_device() {
        let cuda = DeviceResidency::Cuda;
        assert!(cuda.is_device());
    }

    #[test]
    fn device_residency_host_has_no_provider() {
        let host = DeviceResidency::Host;
        assert!(host.provider().is_none());
    }

    #[test]
    fn device_residency_cuda_has_no_provider() {
        let cuda = DeviceResidency::Cuda;
        assert!(cuda.provider().is_none());
    }

    #[test]
    fn device_residency_debug_variants() {
        let host = format!("{:?}", DeviceResidency::Host);
        let cuda = format!("{:?}", DeviceResidency::Cuda);
        assert_eq!(host, "Host");
        assert_eq!(cuda, "Cuda");
    }

    #[test]
    fn device_residency_provider_is_device() {
        use crate::gpu_provider::GpuPipelineProvider;
        #[cfg(feature = "gst-backend")]
        use nv_core::error::MediaError;
        #[cfg(feature = "gst-backend")]
        use nv_core::id::FeedId;
        #[cfg(feature = "gst-backend")]
        use nv_frame::PixelFormat;
        use std::sync::Arc;

        // Minimal provider: only `name()` is exercised; the pipeline
        // methods exist solely to satisfy the trait when the GStreamer
        // backend is compiled in.
        struct StubProvider;
        impl GpuPipelineProvider for StubProvider {
            fn name(&self) -> &str {
                "stub"
            }
            #[cfg(feature = "gst-backend")]
            fn build_pipeline_tail(
                &self,
                _: PixelFormat,
            ) -> Result<crate::gpu_provider::GpuPipelineTail, MediaError> {
                unimplemented!()
            }
            #[cfg(feature = "gst-backend")]
            fn bridge_sample(
                &self,
                _: FeedId,
                _: &Arc<std::sync::atomic::AtomicU64>,
                _: PixelFormat,
                _: &gstreamer::Sample,
                _: Option<crate::PtzTelemetry>,
            ) -> Result<nv_frame::FrameEnvelope, MediaError> {
                unimplemented!()
            }
        }

        let residency = DeviceResidency::Provider(Arc::new(StubProvider));
        assert!(residency.is_device());
        assert!(residency.provider().is_some());
        assert_eq!(residency.provider().map(|gpu| gpu.name()), Some("stub"));
        assert_eq!(format!("{residency:?}"), "Provider(stub)");
    }

    #[test]
    fn ingress_options_default_residency_is_host() {
        let opts = file_options();
        assert!(matches!(opts.device_residency, DeviceResidency::Host));
    }

    #[test]
    fn ingress_options_with_device_residency() {
        let opts = file_options().with_device_residency(DeviceResidency::Cuda);
        assert!(matches!(opts.device_residency, DeviceResidency::Cuda));
    }
}