Skip to main content

nv_media/
ingress.rs

//! Public trait contracts for media ingress.
//!
//! These traits define the boundary between the media backend and the rest of
//! the library. They ensure that no backend-specific types leak into the public
//! API, and that alternative media backends can be substituted by implementing
//! [`MediaIngressFactory`].
//!
//! The default implementation is GStreamer-backed (see [`DefaultMediaFactory`](super::DefaultMediaFactory)).
//! All other crates interact through these traits, never through GStreamer types.
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use nv_core::config::{ReconnectPolicy, SourceSpec};
15use nv_core::error::MediaError;
16use nv_core::health::HealthEvent;
17use nv_core::id::FeedId;
18use nv_frame::FrameEnvelope;
19
20use crate::bridge::PtzTelemetry;
21use crate::decode::DecodePreference;
22use crate::gpu_provider::SharedGpuProvider;
23use crate::hook::PostDecodeHook;
24
25/// Where decoded frames reside after the media bridge.
26///
27/// Controls whether the pipeline tail maps frames to host memory
28/// (default), uses the built-in CUDA path, or delegates to a custom
29/// [`GpuPipelineProvider`](crate::GpuPipelineProvider).
30///
31/// # Resolution order
32///
33/// The pipeline builder resolves the device residency strategy in this
34/// order:
35///
36/// 1. **`Provider`** — the provider builds the pipeline tail and bridges
37///    frames.  No built-in CUDA elements are needed.  **If the provider
38///    fails, pipeline construction returns an error** — there is no
39///    silent fallback to Host, because the user explicitly selected a
40///    specific hardware integration.
41/// 2. **`Cuda`** — the built-in upstream CUDA path (`cudaupload`,
42///    `cudaconvert`).  Requires the `cuda` cargo feature and GStreamer
43///    ≥ 1.20.  Falls back to `Host` with a warning if the elements are
44///    not available.
45/// 3. **`Host`** — standard `videoconvert → appsink` path.  Frames
46///    arrive in CPU-accessible mapped memory.
47///
48/// # Feature gating
49///
50/// - `Cuda` requires the `cuda` cargo feature on `nv-media`.  Without
51///   it, requesting `Cuda` returns `MediaError::Unsupported`.
52/// - `Provider` does **not** require the `cuda` feature — the provider
53///   decides which GStreamer elements to use.
54///
55/// # Hardware extension
56///
57/// To add support for a new hardware backend (e.g., AMD ROCm), implement
58/// [`GpuPipelineProvider`](crate::GpuPipelineProvider) in a new crate
59/// and pass `DeviceResidency::Provider(Arc::new(MyProvider::new()))`.
60/// No changes to the core library are needed.
61#[derive(Clone, Default)]
62pub enum DeviceResidency {
63    /// Frames are mapped to CPU-accessible memory (zero-copy GStreamer
64    /// buffer mapping). This is the standard path.
65    #[default]
66    Host,
67    /// Frames remain on the GPU as CUDA device memory via the built-in
68    /// upstream GStreamer CUDA elements (`cudaupload`, `cudaconvert`).
69    ///
70    /// Requires the `cuda` cargo feature. On GStreamer < 1.20 (where the
71    /// CUDA elements are unavailable), falls back to `Host` with a
72    /// warning.
73    ///
74    /// Stages access the device pointer via
75    /// [`FrameEnvelope::accelerated_handle::<CudaBufferHandle>()`](nv_frame::FrameEnvelope::accelerated_handle).
76    /// CPU consumers can still call
77    /// [`FrameEnvelope::require_host_data()`](nv_frame::FrameEnvelope::require_host_data)
78    /// — the materializer downloads device data on first access and
79    /// caches the result.
80    Cuda,
81    /// Frames remain on a device managed by the supplied provider.
82    ///
83    /// The provider controls pipeline tail construction and frame
84    /// bridging.  This variant does **not** require the `cuda` cargo
85    /// feature — the provider decides which GStreamer elements and
86    /// memory model to use.
87    ///
88    /// See [`GpuPipelineProvider`](crate::GpuPipelineProvider) for the
89    /// trait contract.
90    Provider(SharedGpuProvider),
91}
92
93impl std::fmt::Debug for DeviceResidency {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        match self {
96            Self::Host => write!(f, "Host"),
97            Self::Cuda => write!(f, "Cuda"),
98            Self::Provider(p) => write!(f, "Provider({})", p.name()),
99        }
100    }
101}
102
103impl DeviceResidency {
104    /// Whether this residency requests device-resident frames (non-host).
105    #[inline]
106    pub fn is_device(&self) -> bool {
107        !matches!(self, Self::Host)
108    }
109
110    /// Extract the provider reference, if this is the `Provider` variant.
111    #[inline]
112    pub fn provider(&self) -> Option<&SharedGpuProvider> {
113        match self {
114            Self::Provider(p) => Some(p),
115            _ => None,
116        }
117    }
118}
119
/// Lifecycle state a media source reports from a [`tick()`](MediaIngress::tick).
///
/// From this the runtime decides whether the feed is still alive, is
/// currently recovering, or has stopped for good.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceStatus {
    /// Normal operation — frames are flowing, or will be.
    Running,
    /// A failure occurred and the source is trying to reconnect.
    ///
    /// No frames are flowing. The runtime should continue ticking so the
    /// reconnection state machine keeps advancing.
    Reconnecting,
    /// The source has stopped permanently: the reconnection budget is
    /// exhausted, a terminal EOS was reached, or it was stopped explicitly.
    Stopped,
}
137
138/// Result of a [`MediaIngress::tick()`] call.
139///
140/// Combines the source's lifecycle status with an optional scheduling hint
141/// that tells the runtime how soon the next tick is needed.
142#[derive(Debug, Clone)]
143pub struct TickOutcome {
144    /// Current lifecycle state of the source.
145    pub status: SourceStatus,
146    /// Suggested delay before the next `tick()` call.
147    ///
148    /// - `Some(d)` — the source has a pending deadline (e.g., a reconnect
149    ///   backoff that expires in `d`). The runtime should arrange to tick
150    ///   again after at most `d`.
151    /// - `None` — no specific urgency. The runtime will wait indefinitely
152    ///   for the next frame, EOS, or error — no periodic tick occurs.
153    ///   Sources that rely on polling to discover state changes (e.g., a
154    ///   stopped flag) **must** provide a `Some` hint.
155    pub next_tick: Option<Duration>,
156}
157
158impl TickOutcome {
159    /// Source is running, no specific tick urgency.
160    #[inline]
161    pub fn running() -> Self {
162        Self {
163            status: SourceStatus::Running,
164            next_tick: None,
165        }
166    }
167
168    /// Source is reconnecting; tick again after `delay`.
169    #[inline]
170    pub fn reconnecting(delay: Duration) -> Self {
171        Self {
172            status: SourceStatus::Reconnecting,
173            next_tick: Some(delay),
174        }
175    }
176
177    /// Source is permanently stopped.
178    #[inline]
179    pub fn stopped() -> Self {
180        Self {
181            status: SourceStatus::Stopped,
182            next_tick: None,
183        }
184    }
185}
186
187/// Trait contract for a media ingress source.
188///
189/// Abstracts source lifecycle so the runtime and pipeline code do not depend
190/// on GStreamer internals. The GStreamer-backed `MediaSource` implements this
191/// trait within `nv-media`.
192///
193/// Each `MediaIngress` instance handles exactly one feed. Implementations must
194/// be `Send` (moved onto the feed's I/O thread at startup) but need not be `Sync`.
195///
196/// # Lifecycle
197///
198/// 1. `start(sink)` — begin producing frames, delivering them via `sink`.
199/// 2. Frames flow: decoded frames are pushed to `sink.on_frame()`.
200/// 3. The runtime calls `tick()` on every processed frame **and** whenever
201///    the frame queue times out according to [`TickOutcome::next_tick`].
202///    This is event-driven: when `next_tick` is `None`, the runtime waits
203///    indefinitely for the next frame, EOS, or error — there is **no**
204///    fixed polling interval.
205/// 4. `pause()` / `resume()` — temporarily halt/restart frame production.
206/// 5. `stop()` — tear down the source and release all resources.
207///
208/// # Tick scheduling
209///
210/// The runtime does **not** poll at a fixed interval. Tick frequency is
211/// determined entirely by frame arrivals and the `next_tick` hint:
212///
213/// - When frames are flowing, `tick()` is called after each frame.
214/// - When the queue is idle and `next_tick` is `Some(d)`, the runtime
215///   wakes after `d` to call `tick()`.
216/// - When the queue is idle and `next_tick` is `None`, the runtime
217///   sleeps indefinitely — only a new frame, `on_error()`, `on_eos()`,
218///   or shutdown will wake it.
219///
220/// Sources that need periodic management (e.g., reconnection with
221/// backoff) **must** return `Some(remaining)` in `next_tick` so the
222/// runtime wakes at the right time. Sources that are purely
223/// frame-driven (e.g., test doubles) can return `None`.
224pub trait MediaIngress: Send + 'static {
225    /// Begin producing frames.
226    ///
227    /// Decoded frames are delivered to `sink`. The implementation may spawn
228    /// internal threads or use the calling thread, depending on the backend.
229    ///
230    /// On success the source enters the `Running` state immediately.
231    /// On failure the source remains in the `Idle` state and the caller
232    /// (typically the runtime) may retry. The returned [`MediaError`] carries
233    /// the typed reason for the failure (e.g., `Unsupported` when the
234    /// backend is not linked, `ConnectionFailed` when the stream cannot
235    /// be reached).
236    ///
237    /// # Errors
238    ///
239    /// Returns `MediaError` if the pipeline cannot be constructed or the
240    /// source cannot be connected on initial attempt.
241    fn start(&mut self, sink: Box<dyn FrameSink>) -> Result<(), MediaError>;
242
243    /// Stop the source and release all backend resources.
244    ///
245    /// After `stop()`, the source may not be restarted.
246    ///
247    /// # Errors
248    ///
249    /// Returns `MediaError` if teardown encounters an issue.
250    fn stop(&mut self) -> Result<(), MediaError>;
251
252    /// Pause frame production without releasing the connection.
253    ///
254    /// # Errors
255    ///
256    /// Returns `MediaError` if the backend does not support pausing or
257    /// the source is not in a running state.
258    fn pause(&mut self) -> Result<(), MediaError>;
259
260    /// Resume frame production after a pause.
261    ///
262    /// # Errors
263    ///
264    /// Returns `MediaError` if the source is not paused.
265    fn resume(&mut self) -> Result<(), MediaError>;
266
267    /// Drive internal lifecycle management: poll the backend bus for
268    /// errors/events and advance the reconnection state machine.
269    ///
270    /// Called by the runtime after each processed frame and whenever
271    /// the frame queue times out according to the previous
272    /// [`TickOutcome::next_tick`] hint. There is no fixed polling
273    /// interval — tick frequency is entirely event-driven.
274    ///
275    /// Implementations should:
276    ///
277    /// 1. Drain pending bus messages and process lifecycle events.
278    /// 2. If in a reconnecting state and the backoff deadline has elapsed,
279    ///    attempt reconnection.
280    /// 3. Return a [`TickOutcome`] carrying the current status and an
281    ///    optional hint for when the next tick is needed.
282    ///
283    /// The `next_tick` hint allows the runtime to sleep efficiently.
284    /// For example, when reconnecting with a 2-second backoff,
285    /// `next_tick` should be `Some(remaining)` so the runtime wakes
286    /// exactly when the backoff expires. Returning `None` means the
287    /// runtime will wait indefinitely for the next frame/error/EOS.
288    ///
289    /// Sources that need to poll for state changes **must** provide a
290    /// `Some` hint — without one, the runtime will not call `tick()`
291    /// again until a frame or error arrives.
292    ///
293    /// For sources that do not need periodic management (e.g., test
294    /// doubles that produce all frames upfront), the default
295    /// implementation returns [`TickOutcome::running()`].
296    fn tick(&mut self) -> TickOutcome {
297        TickOutcome::running()
298    }
299
300    /// The source specification this ingress was created from.
301    fn source_spec(&self) -> &SourceSpec;
302
303    /// The feed ID this ingress is associated with.
304    fn feed_id(&self) -> FeedId;
305
306    /// The effective decode status after backend negotiation.
307    ///
308    /// Returns `None` if no decode decision has been made yet (the stream
309    /// has not started, or the backend does not report decoder identity).
310    ///
311    /// The tuple contains `(outcome, backend_detail)` where
312    /// `backend_detail` is an opaque diagnostic string (e.g., the
313    /// GStreamer element name). Do not match on its contents.
314    fn decode_status(&self) -> Option<(nv_core::health::DecodeOutcome, String)> {
315        None
316    }
317}
318
319/// Receives decoded frames from a [`MediaIngress`] source.
320///
321/// Implemented by the runtime's pipeline executor. The media backend calls
322/// these methods on its internal thread — implementations must be
323/// `Send + Sync` because the callback may be invoked from a GStreamer
324/// streaming thread.
325pub trait FrameSink: Send + Sync + 'static {
326    /// A new decoded frame is available.
327    fn on_frame(&self, frame: FrameEnvelope);
328
329    /// The source encountered an error (e.g., transient decode failure,
330    /// connection loss, or timeout).
331    ///
332    /// The error preserves the original typed [`MediaError`] variant so
333    /// the receiver can distinguish connection failures from decode errors,
334    /// timeouts, etc. without parsing strings.
335    fn on_error(&self, error: MediaError);
336
337    /// End of stream — the source has no more frames to produce from
338    /// the current session.
339    ///
340    /// Called by the source FSM only when the session is definitively
341    /// over: a non-looping file reaching its end, or a live source whose
342    /// reconnection budget is exhausted. This is a terminal signal.
343    ///
344    /// Implementations should close the frame queue so the worker thread
345    /// observes the `Closed` state and exits the processing loop.
346    fn on_eos(&self);
347
348    /// Wake the consumer thread for control-plane processing.
349    ///
350    /// Called by the backend when a lifecycle-relevant bus event occurs
351    /// (error, EOS) to ensure the consumer thread ticks the source
352    /// promptly — even when no frames are flowing and the queue pop
353    /// has no deadline.
354    ///
355    /// The default implementation is a no-op. Implementations backed by
356    /// a frame queue should notify the queue's consumer condvar.
357    fn wake(&self) {}
358}
359
360/// Configuration bundle passed to [`MediaIngressFactory::create()`].
361///
362/// Replaces positional arguments with a named struct so new options can
363/// be added without breaking the trait signature.
364///
365/// Construct via [`IngressOptions::new()`] and the `with_*` builder
366/// methods. The struct is `#[non_exhaustive]`, so adding fields is
367/// not a semver break.
368///
369/// # Examples
370///
371/// ```
372/// use nv_core::config::{ReconnectPolicy, SourceSpec};
373/// use nv_core::id::FeedId;
374/// use nv_media::{DecodePreference, DeviceResidency, IngressOptions};
375///
376/// let options = IngressOptions::new(
377///         FeedId::new(1),
378///         SourceSpec::rtsp("rtsp://cam/stream"),
379///         ReconnectPolicy::default(),
380///     )
381///     .with_decode_preference(DecodePreference::CpuOnly)
382///     .with_device_residency(DeviceResidency::Host);
383/// ```
384#[non_exhaustive]
385pub struct IngressOptions {
386    /// Feed identifier.
387    pub feed_id: FeedId,
388    /// Source specification (RTSP URL, file path, etc.).
389    pub spec: SourceSpec,
390    /// Reconnection policy for the source.
391    pub reconnect: ReconnectPolicy,
392    /// Optional PTZ telemetry provider.
393    pub ptz_provider: Option<Arc<dyn PtzProvider>>,
394    /// Decode preference — controls hardware vs. software decode selection.
395    pub decode_preference: DecodePreference,
396    /// Optional post-decode hook — can inject a pipeline element between
397    /// the decoder and the color-space converter.
398    ///
399    /// **Ignored when `device_residency` is `Provider`** — the provider
400    /// controls the full decoder-to-tail path. Hooks are only evaluated
401    /// for `Host` and `Cuda` residency modes.
402    pub post_decode_hook: Option<PostDecodeHook>,
403    /// Maximum number of media events buffered before new events are
404    /// dropped. Default: 64.
405    pub event_queue_capacity: usize,
406    /// Where decoded frames reside after the bridge.
407    ///
408    /// [`DeviceResidency::Host`] (default) maps frames to CPU memory.
409    /// [`DeviceResidency::Cuda`] uses the built-in CUDA path.
410    /// [`DeviceResidency::Provider`] delegates to a custom provider.
411    ///
412    /// See [`DeviceResidency`] for resolution semantics.
413    pub device_residency: DeviceResidency,
414}
415
416impl IngressOptions {
417    /// Create a new options bundle with required fields.
418    ///
419    /// Optional fields default to:
420    /// - `ptz_provider`: `None`
421    /// - `decode_preference`: [`DecodePreference::Auto`]
422    #[must_use]
423    pub fn new(feed_id: FeedId, spec: SourceSpec, reconnect: ReconnectPolicy) -> Self {
424        Self {
425            feed_id,
426            spec,
427            reconnect,
428            ptz_provider: None,
429            decode_preference: DecodePreference::default(),
430            post_decode_hook: None,
431            event_queue_capacity: 64,
432            device_residency: DeviceResidency::default(),
433        }
434    }
435
436    /// Attach a PTZ telemetry provider.
437    #[must_use]
438    pub fn with_ptz_provider(mut self, provider: Arc<dyn PtzProvider>) -> Self {
439        self.ptz_provider = Some(provider);
440        self
441    }
442
443    /// Set the decode preference.
444    #[must_use]
445    pub fn with_decode_preference(mut self, pref: DecodePreference) -> Self {
446        self.decode_preference = pref;
447        self
448    }
449
450    /// Attach a post-decode hook.
451    #[must_use]
452    pub fn with_post_decode_hook(mut self, hook: PostDecodeHook) -> Self {
453        self.post_decode_hook = Some(hook);
454        self
455    }
456
457    /// Set the event queue capacity. Default: 64.
458    #[must_use]
459    pub fn with_event_queue_capacity(mut self, capacity: usize) -> Self {
460        self.event_queue_capacity = capacity;
461        self
462    }
463
464    /// Set the device residency mode for decoded frames.
465    ///
466    /// [`DeviceResidency::Host`] (default) maps frames to CPU memory.
467    /// [`DeviceResidency::Cuda`] uses the built-in CUDA path.
468    /// [`DeviceResidency::Provider`] delegates to a custom
469    /// [`GpuPipelineProvider`](crate::GpuPipelineProvider).
470    ///
471    /// See [`DeviceResidency`] for resolution semantics and feature
472    /// gating details.
473    #[must_use]
474    pub fn with_device_residency(mut self, residency: DeviceResidency) -> Self {
475        self.device_residency = residency;
476        self
477    }
478}
479
480/// Factory for creating [`MediaIngress`] instances from a source spec.
481///
482/// The runtime holds one factory and calls `create()` for each new feed.
483/// The default implementation ([`DefaultMediaFactory`](super::DefaultMediaFactory))
484/// produces backend-specific media sources. Custom implementations can
485/// substitute alternative backends or test doubles.
486pub trait MediaIngressFactory: Send + Sync + 'static {
487    /// Create a new media ingress for the given feed configuration.
488    ///
489    /// All feed-level options are bundled in [`IngressOptions`], which
490    /// includes an optional [`PtzProvider`] for feeds that have external
491    /// PTZ telemetry (e.g., ONVIF).
492    ///
493    /// # Errors
494    ///
495    /// Returns `MediaError` if the source specification is invalid or
496    /// the backend cannot handle the requested format.
497    fn create(&self, options: IngressOptions) -> Result<Box<dyn MediaIngress>, MediaError>;
498}
499
500/// Receives [`HealthEvent`]s emitted by a [`MediaIngress`] source.
501///
502/// The runtime typically provides an implementation that forwards events
503/// to subscribers via a channel. The media layer calls these methods from
504/// its management thread; implementations must be `Send + Sync`.
505pub trait HealthSink: Send + Sync + 'static {
506    /// Emit a health event.
507    fn emit(&self, event: HealthEvent);
508}
509
510/// Provides the latest PTZ telemetry for a feed.
511///
512/// Implemented by the runtime or an external ONVIF/serial adapter. The
513/// appsink callback queries this on every decoded frame to attach telemetry
514/// to the frame's [`TypedMetadata`](nv_core::TypedMetadata).
515///
516/// Must be `Send + Sync` since it is called from the GStreamer streaming thread.
517pub trait PtzProvider: Send + Sync + 'static {
518    /// Return the latest PTZ telemetry, or `None` if unavailable.
519    fn latest(&self) -> Option<PtzTelemetry>;
520}
521
#[cfg(test)]
mod tests {
    use super::*;

    // ---- shared fixtures -------------------------------------------------

    fn test_feed_id() -> nv_core::id::FeedId {
        nv_core::id::FeedId::new(1)
    }

    fn test_reconnect() -> nv_core::config::ReconnectPolicy {
        nv_core::config::ReconnectPolicy::default()
    }

    /// Options built from the required arguments only, so every optional
    /// field still holds its default.
    fn test_options() -> IngressOptions {
        IngressOptions::new(
            test_feed_id(),
            SourceSpec::file("/tmp/test.mp4"),
            test_reconnect(),
        )
    }

    // ---- DeviceResidency -------------------------------------------------

    #[test]
    fn device_residency_default_is_host() {
        assert!(matches!(DeviceResidency::default(), DeviceResidency::Host));
    }

    #[test]
    fn device_residency_host_is_not_device() {
        assert!(!DeviceResidency::Host.is_device());
    }

    #[test]
    fn device_residency_cuda_is_device() {
        assert!(DeviceResidency::Cuda.is_device());
    }

    #[test]
    fn device_residency_host_has_no_provider() {
        assert!(DeviceResidency::Host.provider().is_none());
    }

    #[test]
    fn device_residency_cuda_has_no_provider() {
        assert!(DeviceResidency::Cuda.provider().is_none());
    }

    #[test]
    fn device_residency_debug_variants() {
        assert_eq!(format!("{:?}", DeviceResidency::Host), "Host");
        assert_eq!(format!("{:?}", DeviceResidency::Cuda), "Cuda");
    }

    #[test]
    fn device_residency_provider_is_device() {
        use crate::gpu_provider::GpuPipelineProvider;
        #[cfg(feature = "gst-backend")]
        use nv_core::error::MediaError;
        #[cfg(feature = "gst-backend")]
        use nv_core::id::FeedId;
        #[cfg(feature = "gst-backend")]
        use nv_frame::PixelFormat;
        use std::sync::Arc;

        // Minimal provider: only `name()` is exercised. The pipeline
        // methods exist solely to satisfy the trait when the GStreamer
        // backend is compiled in and are never called.
        struct StubProvider;
        impl GpuPipelineProvider for StubProvider {
            fn name(&self) -> &str {
                "stub"
            }
            #[cfg(feature = "gst-backend")]
            fn build_pipeline_tail(
                &self,
                _: PixelFormat,
            ) -> Result<crate::gpu_provider::GpuPipelineTail, MediaError> {
                unimplemented!()
            }
            #[cfg(feature = "gst-backend")]
            fn bridge_sample(
                &self,
                _: FeedId,
                _: &Arc<std::sync::atomic::AtomicU64>,
                _: PixelFormat,
                _: &gstreamer::Sample,
                _: Option<crate::PtzTelemetry>,
            ) -> Result<nv_frame::FrameEnvelope, MediaError> {
                unimplemented!()
            }
        }

        let p = DeviceResidency::Provider(Arc::new(StubProvider));
        assert!(p.is_device());
        let provider = p
            .provider()
            .expect("Provider variant must expose its provider");
        assert_eq!(provider.name(), "stub");
        assert_eq!(format!("{p:?}"), "Provider(stub)");
    }

    // ---- TickOutcome -----------------------------------------------------

    #[test]
    fn tick_outcome_running_has_no_hint() {
        let outcome = TickOutcome::running();
        assert_eq!(outcome.status, SourceStatus::Running);
        assert!(outcome.next_tick.is_none());
    }

    #[test]
    fn tick_outcome_reconnecting_carries_delay() {
        let delay = Duration::from_secs(2);
        let outcome = TickOutcome::reconnecting(delay);
        assert_eq!(outcome.status, SourceStatus::Reconnecting);
        assert_eq!(outcome.next_tick, Some(delay));
    }

    #[test]
    fn tick_outcome_stopped_has_no_hint() {
        let outcome = TickOutcome::stopped();
        assert_eq!(outcome.status, SourceStatus::Stopped);
        assert!(outcome.next_tick.is_none());
    }

    // ---- IngressOptions --------------------------------------------------

    #[test]
    fn ingress_options_default_residency_is_host() {
        let opts = test_options();
        assert!(matches!(opts.device_residency, DeviceResidency::Host));
    }

    #[test]
    fn ingress_options_with_device_residency() {
        let opts = test_options().with_device_residency(DeviceResidency::Cuda);
        assert!(matches!(opts.device_residency, DeviceResidency::Cuda));
    }

    #[test]
    fn ingress_options_new_applies_defaults() {
        let opts = test_options();
        assert!(opts.ptz_provider.is_none());
        assert!(opts.post_decode_hook.is_none());
        assert_eq!(opts.event_queue_capacity, 64);
    }

    #[test]
    fn ingress_options_with_event_queue_capacity() {
        let opts = test_options().with_event_queue_capacity(8);
        assert_eq!(opts.event_queue_capacity, 8);
        // Unrelated fields keep their defaults.
        assert!(matches!(opts.device_residency, DeviceResidency::Host));
    }
}