use std::collections::VecDeque;
use std::sync::atomic::AtomicU64;
#[cfg(feature = "gst-backend")]
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use nv_core::config::SourceSpec;
use nv_core::error::MediaError;
use nv_core::id::FeedId;
use crate::bus::BusMessage;
#[cfg(feature = "gst-backend")]
use crate::clock::PtsTracker;
use crate::decode::{DecoderSelection, SelectedDecoderInfo, SelectedDecoderSlot};
use crate::event::MediaEvent;
use crate::hook::PostDecodeHook;
use crate::ingress::{DeviceResidency, FrameSink, PtzProvider};
use crate::pipeline::OutputFormat;
#[cfg(feature = "gst-backend")]
use crate::pipeline::PipelineBuilder;
/// Shared, mutex-guarded FIFO of [`MediaEvent`]s.
///
/// In this file the appsink callback installed by `GstSession::start` pushes
/// `MediaEvent::Discontinuity` entries onto it. The consumer that drains the
/// queue is not part of this file.
pub(crate) type EventQueue = Arc<Mutex<VecDeque<MediaEvent>>>;
/// Default bound on the number of queued events; this file's tests use it as
/// the value of `SessionConfig::event_queue_capacity`.
pub(crate) const EVENT_QUEUE_CAPACITY: usize = 64;
/// Length of a consecutive bridge-failure streak at which the appsink
/// callback reports a `MediaError::DecodeFailed` to the frame sink.
#[cfg(feature = "gst-backend")]
const BRIDGE_FAIL_ESCALATION_THRESHOLD: u64 = 30;
/// Everything `GstSession::start` needs to build and run one media session.
pub(crate) struct SessionConfig {
/// Identifier of the feed; attached to log records and handed to the
/// sample-bridging functions.
pub feed_id: FeedId,
/// Source description passed to `PipelineBuilder::new`.
pub spec: SourceSpec,
/// Decoder selection forwarded to the pipeline builder.
pub decoder: DecoderSelection,
/// Output format forwarded to the pipeline builder.
pub output_format: OutputFormat,
/// Optional PTZ provider; when present, its `latest()` value is passed along
/// with every bridged sample.
pub ptz_provider: Option<Arc<dyn PtzProvider>>,
/// Optional hook forwarded to the pipeline builder.
pub post_decode_hook: Option<PostDecodeHook>,
/// Maximum number of events kept in the event queue; discontinuity events
/// arriving while the queue is at this length are dropped with a warning.
/// Only read when the `gst-backend` feature is enabled, hence the
/// conditional `allow(dead_code)`.
#[cfg_attr(not(feature = "gst-backend"), allow(dead_code))]
pub event_queue_capacity: usize,
/// Requested device residency; forwarded to the pipeline builder and exposed
/// through `GstSession::requested_residency`.
pub device_residency: DeviceResidency,
}
/// Manual `Debug` implementation for [`SessionConfig`].
///
/// The PTZ provider and the post-decode hook are printed as `Some("..")` /
/// `None`, so only their presence is shown. `device_residency` is not
/// printed; the output ends with `..` (via `finish_non_exhaustive`) so readers
/// can tell that the listing is incomplete.
impl std::fmt::Debug for SessionConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Presence markers for the fields whose contents are not printed.
        let ptz_marker = self.ptz_provider.as_ref().map(|_| "..");
        let hook_marker = self.post_decode_hook.as_ref().map(|_| "..");
        f.debug_struct("SessionConfig")
            .field("feed_id", &self.feed_id)
            .field("spec", &self.spec)
            .field("decoder", &self.decoder)
            .field("output_format", &self.output_format)
            .field("ptz_provider", &ptz_marker)
            .field("post_decode_hook", &hook_marker)
            .field("event_queue_capacity", &self.event_queue_capacity)
            .finish_non_exhaustive()
    }
}
/// Lifecycle state tracked by [`GstSession`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SessionState {
/// State of a freshly started session and of a resumed one; required by
/// `pause` and `seek_start`.
Running,
/// Set by `pause`; required by `resume`.
Paused,
/// Set by `stop`; once reached, `stop` becomes a no-op.
Stopped,
}
/// One media session: the session state plus, with the `gst-backend`
/// feature, the GStreamer pipeline, its bus and its appsink.
pub(crate) struct GstSession {
/// Feed identifier copied from the configuration.
#[allow(dead_code)] feed_id: FeedId,
/// Configuration the session was started with; read by
/// `requested_residency`.
#[allow(dead_code)]
config: SessionConfig,
/// Current lifecycle state, updated by `pause`, `resume` and `stop`.
state: SessionState,
/// Frame sequence counter; a clone of this `Arc` is handed to the bridging
/// functions from the appsink callback.
#[allow(dead_code)] frame_seq: Arc<AtomicU64>,
/// Value of `gpu_resident` reported by the pipeline builder (`false` for the
/// test stub).
gpu_resident: bool,
/// Slot holding information about the selected decoder, if any.
selected_decoder: SelectedDecoderSlot,
/// Pipeline whose state is driven by `pause`, `resume`, `stop` and
/// `seek_start`.
#[cfg(feature = "gst-backend")]
pipeline: gstreamer::Pipeline,
/// Bus drained by `poll_bus`.
#[cfg(feature = "gst-backend")]
bus: gstreamer::Bus,
/// Appsink carrying the sample callbacks; never read after construction.
/// NOTE(review): presumably stored only to keep the element handle alive —
/// confirm.
#[cfg(feature = "gst-backend")]
_appsink: gstreamer_app::AppSink,
}
// SAFETY: NOTE(review): this impl asserts that every field of `GstSession`
// may be moved to another thread; the compiler does not verify it. It
// presumably relies on the GStreamer handles and on the trait objects stored
// in `SessionConfig` being safe to send — confirm against those types.
unsafe impl Send for GstSession {}
impl GstSession {
/// Builds the GStreamer pipeline described by `config`, installs the appsink
/// and bus callbacks, and sets the pipeline to `Playing`.
///
/// Frames are delivered to `sink` from the appsink `new_sample` callback;
/// PTS discontinuities detected there are pushed onto `event_queue` as long
/// as it holds fewer than `config.event_queue_capacity` entries.
///
/// # Errors
///
/// * `MediaError::Unsupported` if GStreamer cannot be initialised.
/// * Whatever `PipelineBuilder::build` returns on failure.
/// * `MediaError::Unsupported` if the pipeline cannot be set to `Playing`; in
///   that case the pipeline is first returned to `Null` (best effort) so it
///   is not dropped in an intermediate state.
#[cfg(feature = "gst-backend")]
pub fn start(
    config: SessionConfig,
    sink: Arc<dyn FrameSink>,
    event_queue: EventQueue,
) -> Result<Self, MediaError> {
    use gstreamer as gst;
    use gstreamer::prelude::*;
    gst::init().map_err(|e| MediaError::Unsupported {
        detail: format!("GStreamer init failed: {e}"),
    })?;
    let built = PipelineBuilder::new(config.spec.clone())
        .decoder(config.decoder.clone())
        .output_format(config.output_format)
        .post_decode_hook(config.post_decode_hook.clone())
        .device_residency(config.device_residency.clone())
        .build()?;
    let feed_id = config.feed_id;
    let output_format = built.output_format;
    let gpu_resident = built.gpu_resident;
    let gpu_provider = built.gpu_provider.clone();
    let frame_seq = Arc::new(AtomicU64::new(0));
    let pts_tracker = Arc::new(Mutex::new(PtsTracker::new()));
    let ptz = config.ptz_provider.clone();
    // Handles moved into the callbacks below.
    let seq_counter = Arc::clone(&frame_seq);
    let sink_clone = Arc::clone(&sink);
    let sink_wake = Arc::clone(&sink);
    let pts_clone = Arc::clone(&pts_tracker);
    let eq_clone = Arc::clone(&event_queue);
    let eq_capacity = config.event_queue_capacity;
    // Total failures drive log throttling; the consecutive count drives
    // escalation and is reset by every successfully bridged frame.
    let bridge_fail_count = Arc::new(AtomicU64::new(0));
    let consecutive_fails = Arc::new(AtomicU64::new(0));
    let fail_counter = Arc::clone(&bridge_fail_count);
    let consec_counter = Arc::clone(&consecutive_fails);
    built.appsink.set_callbacks(
        gstreamer_app::AppSinkCallbacks::builder()
            .new_sample(move |appsink| {
                // A failed pull is reported to GStreamer as end-of-stream.
                let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
                let ptz_telemetry = ptz.as_ref().and_then(|p| p.latest());
                // Pick the bridge: custom GPU provider, built-in CUDA path,
                // or the host-memory bridge.
                let bridge_result = if gpu_resident {
                    if let Some(ref provider) = gpu_provider {
                        provider.bridge_sample(
                            feed_id,
                            &seq_counter,
                            output_format.to_pixel_format(),
                            &sample,
                            ptz_telemetry,
                        )
                    } else {
                        #[cfg(feature = "cuda")]
                        {
                            crate::gpu::bridge_gst_sample_device(
                                feed_id,
                                &seq_counter,
                                output_format,
                                &sample,
                                ptz_telemetry,
                            )
                        }
                        #[cfg(not(feature = "cuda"))]
                        {
                            let _ = ptz_telemetry;
                            Err(nv_core::error::MediaError::Unsupported {
                                detail: "CUDA feature not enabled".into(),
                            })
                        }
                    }
                } else {
                    crate::bridge::bridge_gst_sample(
                        feed_id,
                        &seq_counter,
                        output_format,
                        &sample,
                        ptz_telemetry,
                    )
                };
                match bridge_result {
                    Ok(frame) => {
                        // A good frame ends any failure streak.
                        consec_counter.store(0, Ordering::Relaxed);
                        let pts_ns = frame.ts().as_nanos();
                        if let Ok(mut tracker) = pts_clone.lock() {
                            let result = tracker.observe(pts_ns);
                            if let crate::clock::PtsResult::Discontinuity {
                                gap_ns,
                                prev_ns,
                                current_ns,
                            } = result
                            {
                                if let Ok(mut q) = eq_clone.lock() {
                                    // Bounded queue: new events are dropped
                                    // when it is full.
                                    if q.len() < eq_capacity {
                                        q.push_back(MediaEvent::Discontinuity {
                                            gap_ns,
                                            prev_pts_ns: prev_ns,
                                            current_pts_ns: current_ns,
                                        });
                                    } else {
                                        tracing::warn!(
                                            feed_id = %feed_id,
                                            "event queue full, dropping discontinuity event"
                                        );
                                    }
                                } else {
                                    tracing::warn!(
                                        feed_id = %feed_id,
                                        "event queue lock poisoned, dropping discontinuity event"
                                    );
                                }
                            }
                        } else {
                            tracing::warn!(
                                feed_id = %feed_id,
                                "PTS tracker lock poisoned, skipping discontinuity detection"
                            );
                        }
                        sink_clone.on_frame(frame);
                        Ok(gst::FlowSuccess::Ok)
                    }
                    Err(e) => {
                        let n = fail_counter.fetch_add(1, Ordering::Relaxed) + 1;
                        let consecutive = consec_counter.fetch_add(1, Ordering::Relaxed) + 1;
                        // Log the first three failures, then every 100th.
                        if n <= 3 || n.is_multiple_of(100) {
                            tracing::warn!(
                                feed_id = %feed_id,
                                error = %e,
                                total_failures = n,
                                consecutive,
                                "bridge failed, dropping frame{}",
                                if n == 3 { " (further warnings throttled, logged every 100)" } else { "" },
                            );
                        }
                        // Escalate exactly once per streak, when the streak
                        // reaches the threshold.
                        if consecutive == BRIDGE_FAIL_ESCALATION_THRESHOLD {
                            tracing::error!(
                                feed_id = %feed_id,
                                consecutive,
                                "bridge failure streak reached escalation threshold — \
                                 reporting error to source FSM for recovery",
                            );
                            sink_clone.on_error(nv_core::error::MediaError::DecodeFailed {
                                detail: format!(
                                    "bridge failed on {consecutive} consecutive frames: {e}"
                                ),
                            });
                        }
                        // The frame is dropped but the stream keeps flowing.
                        Ok(gst::FlowSuccess::Ok)
                    }
                }
            })
            .eos(move |_appsink| {
                sink.wake();
            })
            .build(),
    );
    // The bus handler only holds a weak reference to the sink.
    let sink_bus = Arc::downgrade(&sink_wake);
    built.bus.set_sync_handler(move |_bus, msg| {
        use gstreamer::MessageView;
        match msg.view() {
            MessageView::Error(_)
            | MessageView::Warning(_)
            | MessageView::Eos(_)
            | MessageView::StreamStart(_) => {
                if let Some(s) = sink_bus.upgrade() {
                    s.wake();
                }
            }
            _ => {}
        }
        // Messages stay on the bus so `poll_bus` can still pop them.
        gst::BusSyncReply::Pass
    });
    if let Err(e) = built.pipeline.set_state(gst::State::Playing) {
        // A failed transition can leave elements in an intermediate state;
        // return the pipeline to Null before it is dropped. Best effort: the
        // original failure is what gets reported.
        let _ = built.pipeline.set_state(gst::State::Null);
        return Err(MediaError::Unsupported {
            detail: format!("failed to set pipeline to Playing: {e}"),
        });
    }
    tracing::info!(
        feed_id = %feed_id,
        source = ?config.spec,
        "GStreamer pipeline started"
    );
    Ok(Self {
        feed_id,
        config,
        state: SessionState::Running,
        gpu_resident,
        frame_seq,
        selected_decoder: built.selected_decoder,
        pipeline: built.pipeline,
        bus: built.bus,
        _appsink: built.appsink,
    })
}
/// Fallback `start` for builds without the `gst-backend` feature.
///
/// # Errors
///
/// Always returns `MediaError::Unsupported`: no pipeline can be created
/// without the backend.
#[cfg(not(feature = "gst-backend"))]
pub fn start(
    config: SessionConfig,
    _sink: Arc<dyn FrameSink>,
    _event_queue: EventQueue,
) -> Result<Self, MediaError> {
    // The configuration is accepted only for signature parity and discarded.
    let _ = config;
    let detail = String::from("GStreamer backend not linked (enable the `gst-backend` feature)");
    Err(MediaError::Unsupported { detail })
}
/// Test-only constructor: returns a session in the `Running` state without
/// building a real media pipeline.
///
/// With the `gst-backend` feature the GStreamer fields are filled with
/// freshly created, empty objects.
#[cfg(test)]
pub fn start_stub(config: SessionConfig) -> Self {
    let feed_id = config.feed_id;
    let frame_seq = Arc::new(AtomicU64::new(0));
    let selected_decoder = Arc::new(Mutex::new(None));
    #[cfg(feature = "gst-backend")]
    let pipeline = {
        // GStreamer must be initialised before any object is created.
        gstreamer::init().unwrap();
        gstreamer::Pipeline::new()
    };
    // NOTE(review): the bus is taken from a second, throwaway pipeline, so it
    // is not the bus of `pipeline` above.
    #[cfg(feature = "gst-backend")]
    let bus = {
        use gstreamer::prelude::*;
        gstreamer::Pipeline::new().bus().unwrap()
    };
    #[cfg(feature = "gst-backend")]
    let appsink = gstreamer_app::AppSink::builder().build();
    Self {
        feed_id,
        config,
        state: SessionState::Running,
        gpu_resident: false,
        frame_seq,
        selected_decoder,
        #[cfg(feature = "gst-backend")]
        pipeline,
        #[cfg(feature = "gst-backend")]
        bus,
        #[cfg(feature = "gst-backend")]
        _appsink: appsink,
    }
}
/// Pops messages from the pipeline bus until one maps to a [`BusMessage`].
///
/// Returns `None` once the bus is empty. Message kinds without a mapping
/// are skipped. `StateChanged` messages are reported only when their source
/// is this session's pipeline; state changes of other sources are skipped.
#[cfg(feature = "gst-backend")]
pub fn poll_bus(&self) -> Option<BusMessage> {
    use gstreamer::MessageView;
    use gstreamer::prelude::*;
    let pipeline_obj: &gstreamer::Object = self.pipeline.upcast_ref();
    while let Some(msg) = self.bus.pop() {
        let mapped = match msg.view() {
            MessageView::Eos(_) => Some(BusMessage::Eos),
            MessageView::Error(e) => {
                let debug = e.debug().map(|d| d.to_string());
                Some(BusMessage::Error {
                    message: e.error().to_string(),
                    debug,
                })
            }
            MessageView::Warning(w) => {
                let debug = w.debug().map(|d| d.to_string());
                Some(BusMessage::Warning {
                    message: w.error().to_string(),
                    debug,
                })
            }
            MessageView::StateChanged(sc) => {
                if msg.src() == Some(pipeline_obj) {
                    Some(BusMessage::StateChanged {
                        old: map_gst_state(sc.old()),
                        new: map_gst_state(sc.current()),
                    })
                } else {
                    None
                }
            }
            MessageView::StreamStart(_) => Some(BusMessage::StreamStart),
            MessageView::Latency(_) => Some(BusMessage::Latency),
            MessageView::Buffering(b) => Some(BusMessage::Buffering {
                // The percentage arrives as a signed integer; floor it at 0
                // so a negative value cannot wrap to a huge unsigned one.
                percent: b.percent().max(0) as u32,
            }),
            _ => None,
        };
        if mapped.is_some() {
            return mapped;
        }
    }
    None
}
/// Fallback `poll_bus` for builds without the `gst-backend` feature: there is
/// no bus, so this always returns `None`.
#[cfg(not(feature = "gst-backend"))]
pub fn poll_bus(&self) -> Option<BusMessage> {
None
}
/// Pauses a running session.
///
/// # Errors
///
/// Returns `MediaError::Unsupported` if the session is not `Running`, or
/// (with the `gst-backend` feature) if the pipeline rejects the transition
/// to `Paused`. The session state is left unchanged on error.
pub fn pause(&mut self) -> Result<(), MediaError> {
    if !matches!(self.state, SessionState::Running) {
        return Err(MediaError::Unsupported {
            detail: "can only pause a running session".into(),
        });
    }
    #[cfg(feature = "gst-backend")]
    {
        use gstreamer::prelude::*;
        if let Err(e) = self.pipeline.set_state(gstreamer::State::Paused) {
            return Err(MediaError::Unsupported {
                detail: format!("failed to pause pipeline: {e}"),
            });
        }
    }
    self.state = SessionState::Paused;
    Ok(())
}
/// Resumes a paused session.
///
/// # Errors
///
/// Returns `MediaError::Unsupported` if the session is not `Paused`, or
/// (with the `gst-backend` feature) if the pipeline rejects the transition
/// to `Playing`. The session state is left unchanged on error.
pub fn resume(&mut self) -> Result<(), MediaError> {
    if !matches!(self.state, SessionState::Paused) {
        return Err(MediaError::Unsupported {
            detail: "can only resume a paused session".into(),
        });
    }
    #[cfg(feature = "gst-backend")]
    {
        use gstreamer::prelude::*;
        if let Err(e) = self.pipeline.set_state(gstreamer::State::Playing) {
            return Err(MediaError::Unsupported {
                detail: format!("failed to resume pipeline: {e}"),
            });
        }
    }
    self.state = SessionState::Running;
    Ok(())
}
/// Stops the session. Calling it again on a stopped session does nothing.
///
/// Always returns `Ok(())`: with the `gst-backend` feature the result of
/// the transition to `Null` is deliberately ignored.
pub fn stop(&mut self) -> Result<(), MediaError> {
    if !matches!(self.state, SessionState::Stopped) {
        #[cfg(feature = "gst-backend")]
        {
            use gstreamer::prelude::*;
            // Best-effort teardown; a failure here is not reported.
            let _ = self.pipeline.set_state(gstreamer::State::Null);
        }
        self.state = SessionState::Stopped;
    }
    Ok(())
}
/// Seeks a running session back to position zero.
///
/// With the `gst-backend` feature this issues a flushing, key-unit seek on
/// the pipeline; without it only the state check is performed.
///
/// # Errors
///
/// Returns `MediaError::Unsupported` if the session is not `Running` or if
/// the pipeline seek fails.
pub fn seek_start(&mut self) -> Result<(), MediaError> {
    if !matches!(self.state, SessionState::Running) {
        return Err(MediaError::Unsupported {
            detail: "can only seek a running session".into(),
        });
    }
    #[cfg(feature = "gst-backend")]
    {
        use gstreamer::prelude::*;
        let flags = gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT;
        let outcome = self.pipeline.seek_simple(flags, gstreamer::ClockTime::ZERO);
        if outcome.is_err() {
            return Err(MediaError::Unsupported {
                detail: "pipeline seek to start failed".into(),
            });
        }
    }
    Ok(())
}
/// Returns a copy of the selected-decoder information, if any.
///
/// Yields `None` both when the slot is empty and when its lock is poisoned.
pub fn selected_decoder(&self) -> Option<SelectedDecoderInfo> {
    match self.selected_decoder.lock() {
        Ok(slot) => (*slot).clone(),
        Err(_) => None,
    }
}
/// Returns the `gpu_resident` flag recorded when the session was created
/// (taken from the built pipeline in `start`, `false` for the test stub).
pub fn gpu_resident(&self) -> bool {
self.gpu_resident
}
/// Returns the device residency stored in the session's configuration, i.e.
/// the value that was requested.
#[allow(dead_code)] pub fn requested_residency(&self) -> &DeviceResidency {
&self.config.device_residency
}
}
/// Test-only helpers for inspecting and seeding session internals.
#[cfg(test)]
impl GstSession {
    /// Returns the current lifecycle state.
    pub fn state(&self) -> SessionState {
        self.state
    }

    /// Overwrites the selected-decoder slot with `info`.
    ///
    /// Does nothing if the slot's lock is poisoned.
    pub fn set_selected_decoder(&self, info: Option<SelectedDecoderInfo>) {
        let Ok(mut guard) = self.selected_decoder.lock() else {
            return;
        };
        *guard = info;
    }
}
/// Translates a GStreamer element state into the crate's `ElementState`.
///
/// `Null`, `Ready`, `Paused` and `Playing` map to the variant of the same
/// name. Any other value (for example `VoidPending`) falls back to
/// `ElementState::Null`.
#[cfg(feature = "gst-backend")]
fn map_gst_state(state: gstreamer::State) -> crate::bus::ElementState {
    use crate::bus::ElementState;
    match state {
        // Null and Ready are kept distinct so a Null -> Ready transition is
        // not reported with identical old and new states.
        gstreamer::State::Null => ElementState::Null,
        gstreamer::State::Ready => ElementState::Ready,
        gstreamer::State::Paused => ElementState::Paused,
        gstreamer::State::Playing => ElementState::Playing,
        _ => ElementState::Null,
    }
}
/// Makes sure a session is stopped when it goes out of scope.
impl Drop for GstSession {
    fn drop(&mut self) {
        // Already stopped: nothing left to tear down.
        if matches!(self.state, SessionState::Stopped) {
            return;
        }
        // Errors cannot be surfaced from `drop`, so the result is discarded.
        let _ = self.stop();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use nv_core::config::RtspTransport;

    /// Minimal RTSP configuration shared by the state-machine tests.
    fn test_config() -> SessionConfig {
        let spec = SourceSpec::Rtsp {
            url: "rtsp://test/stream".into(),
            transport: RtspTransport::Tcp,
            security: nv_core::security::RtspSecurityPolicy::AllowInsecure,
        };
        SessionConfig {
            feed_id: FeedId::new(1),
            spec,
            decoder: DecoderSelection::Auto,
            output_format: OutputFormat::default(),
            ptz_provider: None,
            post_decode_hook: None,
            event_queue_capacity: EVENT_QUEUE_CAPACITY,
            device_residency: DeviceResidency::default(),
        }
    }

    /// A stub session reports `Running` right after construction.
    #[test]
    fn stub_session_starts_running() {
        let session = GstSession::start_stub(test_config());
        assert_eq!(session.state(), SessionState::Running);
    }

    /// `pause` followed by `resume` goes Running -> Paused -> Running.
    #[test]
    fn pause_resume_cycle() {
        let mut session = GstSession::start_stub(test_config());

        session.pause().unwrap();
        assert_eq!(session.state(), SessionState::Paused);

        session.resume().unwrap();
        assert_eq!(session.state(), SessionState::Running);
    }

    /// `stop` reaches `Stopped`, and a second `stop` still succeeds.
    #[test]
    fn stop_is_terminal() {
        let mut session = GstSession::start_stub(test_config());

        session.stop().unwrap();
        assert_eq!(session.state(), SessionState::Stopped);

        session.stop().unwrap();
    }

    /// A stopped session refuses to pause.
    #[test]
    fn cannot_pause_when_not_running() {
        let mut session = GstSession::start_stub(test_config());
        session.stop().unwrap();
        assert!(session.pause().is_err());
    }

    /// A running session refuses to resume.
    #[test]
    fn cannot_resume_when_not_paused() {
        let mut session = GstSession::start_stub(test_config());
        assert!(session.resume().is_err());
    }

    /// Dropping a running session must complete without panicking.
    #[test]
    fn drop_stops_session() {
        let session = GstSession::start_stub(test_config());
        assert_eq!(session.state(), SessionState::Running);
        drop(session);
    }

    // Compile-time check that `GstSession` is `Send`.
    const _: () = {
        const fn assert_send<T: Send>() {}
        assert_send::<GstSession>();
    };
}