use std::sync::Arc;
use std::time::Duration;
use nv_core::config::{ReconnectPolicy, SourceSpec};
use nv_core::error::MediaError;
use nv_core::health::HealthEvent;
use nv_core::id::FeedId;
use nv_frame::FrameEnvelope;
use crate::bridge::PtzTelemetry;
use crate::decode::DecodePreference;
use crate::gpu_provider::SharedGpuProvider;
use crate::hook::PostDecodeHook;
#[derive(Clone, Default)]
pub enum DeviceResidency {
#[default]
Host,
Cuda,
Provider(SharedGpuProvider),
}
impl std::fmt::Debug for DeviceResidency {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Host => write!(f, "Host"),
Self::Cuda => write!(f, "Cuda"),
Self::Provider(p) => write!(f, "Provider({})", p.name()),
}
}
}
impl DeviceResidency {
#[inline]
pub fn is_device(&self) -> bool {
!matches!(self, Self::Host)
}
#[inline]
pub fn provider(&self) -> Option<&SharedGpuProvider> {
match self {
Self::Provider(p) => Some(p),
_ => None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceStatus {
Running,
Reconnecting,
Stopped,
}
#[derive(Debug, Clone)]
pub struct TickOutcome {
pub status: SourceStatus,
pub next_tick: Option<Duration>,
}
impl TickOutcome {
#[inline]
pub fn running() -> Self {
Self {
status: SourceStatus::Running,
next_tick: None,
}
}
#[inline]
pub fn reconnecting(delay: Duration) -> Self {
Self {
status: SourceStatus::Reconnecting,
next_tick: Some(delay),
}
}
#[inline]
pub fn stopped() -> Self {
Self {
status: SourceStatus::Stopped,
next_tick: None,
}
}
}
pub trait MediaIngress: Send + 'static {
fn start(&mut self, sink: Box<dyn FrameSink>) -> Result<(), MediaError>;
fn stop(&mut self) -> Result<(), MediaError>;
fn pause(&mut self) -> Result<(), MediaError>;
fn resume(&mut self) -> Result<(), MediaError>;
fn tick(&mut self) -> TickOutcome {
TickOutcome::running()
}
fn source_spec(&self) -> &SourceSpec;
fn feed_id(&self) -> FeedId;
fn decode_status(&self) -> Option<(nv_core::health::DecodeOutcome, String)> {
None
}
}
pub trait FrameSink: Send + Sync + 'static {
fn on_frame(&self, frame: FrameEnvelope);
fn on_error(&self, error: MediaError);
fn on_eos(&self);
fn wake(&self) {}
}
#[non_exhaustive]
pub struct IngressOptions {
pub feed_id: FeedId,
pub spec: SourceSpec,
pub reconnect: ReconnectPolicy,
pub ptz_provider: Option<Arc<dyn PtzProvider>>,
pub decode_preference: DecodePreference,
pub post_decode_hook: Option<PostDecodeHook>,
pub event_queue_capacity: usize,
pub device_residency: DeviceResidency,
}
impl IngressOptions {
#[must_use]
pub fn new(feed_id: FeedId, spec: SourceSpec, reconnect: ReconnectPolicy) -> Self {
Self {
feed_id,
spec,
reconnect,
ptz_provider: None,
decode_preference: DecodePreference::default(),
post_decode_hook: None,
event_queue_capacity: 64,
device_residency: DeviceResidency::default(),
}
}
#[must_use]
pub fn with_ptz_provider(mut self, provider: Arc<dyn PtzProvider>) -> Self {
self.ptz_provider = Some(provider);
self
}
#[must_use]
pub fn with_decode_preference(mut self, pref: DecodePreference) -> Self {
self.decode_preference = pref;
self
}
#[must_use]
pub fn with_post_decode_hook(mut self, hook: PostDecodeHook) -> Self {
self.post_decode_hook = Some(hook);
self
}
#[must_use]
pub fn with_event_queue_capacity(mut self, capacity: usize) -> Self {
self.event_queue_capacity = capacity;
self
}
#[must_use]
pub fn with_device_residency(mut self, residency: DeviceResidency) -> Self {
self.device_residency = residency;
self
}
}
pub trait MediaIngressFactory: Send + Sync + 'static {
fn create(&self, options: IngressOptions) -> Result<Box<dyn MediaIngress>, MediaError>;
}
pub trait HealthSink: Send + Sync + 'static {
fn emit(&self, event: HealthEvent);
}
pub trait PtzProvider: Send + Sync + 'static {
fn latest(&self) -> Option<PtzTelemetry>;
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn device_residency_default_is_host() {
assert!(matches!(DeviceResidency::default(), DeviceResidency::Host));
}
#[test]
fn device_residency_host_is_not_device() {
assert!(!DeviceResidency::Host.is_device());
}
#[test]
fn device_residency_cuda_is_device() {
assert!(DeviceResidency::Cuda.is_device());
}
#[test]
fn device_residency_host_has_no_provider() {
assert!(DeviceResidency::Host.provider().is_none());
}
#[test]
fn device_residency_cuda_has_no_provider() {
assert!(DeviceResidency::Cuda.provider().is_none());
}
#[test]
fn device_residency_debug_variants() {
assert_eq!(format!("{:?}", DeviceResidency::Host), "Host");
assert_eq!(format!("{:?}", DeviceResidency::Cuda), "Cuda");
}
#[test]
fn device_residency_provider_is_device() {
use crate::gpu_provider::GpuPipelineProvider;
#[cfg(feature = "gst-backend")]
use nv_core::error::MediaError;
#[cfg(feature = "gst-backend")]
use nv_core::id::FeedId;
#[cfg(feature = "gst-backend")]
use nv_frame::PixelFormat;
use std::sync::Arc;
struct StubProvider;
impl GpuPipelineProvider for StubProvider {
fn name(&self) -> &str {
"stub"
}
#[cfg(feature = "gst-backend")]
fn build_pipeline_tail(
&self,
_: PixelFormat,
) -> Result<crate::gpu_provider::GpuPipelineTail, MediaError> {
unimplemented!()
}
#[cfg(feature = "gst-backend")]
fn bridge_sample(
&self,
_: FeedId,
_: &Arc<std::sync::atomic::AtomicU64>,
_: PixelFormat,
_: &gstreamer::Sample,
_: Option<crate::PtzTelemetry>,
) -> Result<nv_frame::FrameEnvelope, MediaError> {
unimplemented!()
}
}
let p = DeviceResidency::Provider(Arc::new(StubProvider));
assert!(p.is_device());
assert!(p.provider().is_some());
assert_eq!(p.provider().unwrap().name(), "stub");
assert_eq!(format!("{p:?}"), "Provider(stub)");
}
fn test_feed_id() -> nv_core::id::FeedId {
nv_core::id::FeedId::new(1)
}
fn test_reconnect() -> nv_core::config::ReconnectPolicy {
nv_core::config::ReconnectPolicy::default()
}
#[test]
fn ingress_options_default_residency_is_host() {
let opts = IngressOptions::new(
test_feed_id(),
SourceSpec::file("/tmp/test.mp4"),
test_reconnect(),
);
assert!(matches!(opts.device_residency, DeviceResidency::Host));
}
#[test]
fn ingress_options_with_device_residency() {
let opts = IngressOptions::new(
test_feed_id(),
SourceSpec::file("/tmp/test.mp4"),
test_reconnect(),
)
.with_device_residency(DeviceResidency::Cuda);
assert!(matches!(opts.device_residency, DeviceResidency::Cuda));
}
}