use std::mem::MaybeUninit;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use nv_core::TypedMetadata;
use nv_core::error::MediaError;
use nv_core::id::FeedId;
use nv_core::timestamp::{MonotonicTs, WallTs};
use nv_frame::{FrameEnvelope, HostBytes, HostMaterializeFn, PixelFormat};
use crate::bridge::PtzTelemetry;
use crate::pipeline::OutputFormat;
/// Memory type name that a buffer's first memory block must report (via
/// `is_type`) before it is treated as CUDA device memory.
const GST_CUDA_MEMORY_TYPE_NAME: &str = "cuda-memory";
/// Extra map flag OR-ed with `GST_MAP_READ` when mapping a buffer to obtain a
/// device address instead of host bytes.
///
/// Defined as the first bit above `GST_MAP_FLAG_LAST`, i.e. `1 << 17`.
// NOTE(review): presumably mirrors the `GST_MAP_CUDA` macro from GStreamer's
// CUDA library — confirm against the GStreamer version being linked.
const GST_MAP_CUDA: u32 = gstreamer::ffi::GST_MAP_FLAG_LAST << 1;
/// Handle to a video frame that lives in CUDA device memory.
///
/// Bundles the raw device address with the frame geometry and owns a
/// reference to the GStreamer buffer the address was extracted from.
pub struct CudaBufferHandle {
/// Address reported by mapping the buffer with `GST_MAP_CUDA`, stored as an
/// integer. It is never dereferenced in this file.
pub device_ptr: u64,
/// Size in bytes reported by the buffer mapping.
pub len: usize,
/// Frame width in pixels, taken from the sample's `VideoInfo`.
pub width: u32,
/// Frame height in pixels, taken from the sample's `VideoInfo`.
pub height: u32,
/// Stride of plane 0, taken from the sample's `VideoInfo`.
pub stride: u32,
/// Pixel format derived from the pipeline's configured output format.
pub format: PixelFormat,
/// Owned reference to the source buffer. It is never read; it is held so the
/// buffer is released only when the handle itself is dropped.
// NOTE(review): presumably this is what keeps `device_ptr` valid — confirm
// against the GStreamer CUDA allocator's lifetime rules.
_gst_buffer: gstreamer::Buffer,
}
// SAFETY: the handle exposes only plain-data fields plus a private buffer
// reference that nothing in this file touches after construction, so moving or
// sharing the handle between threads performs no unsynchronised access here.
// NOTE(review): soundness also assumes `gstreamer::Buffer` and `PixelFormat`
// may be shared across threads — confirm. If both are already `Send + Sync`,
// these manual impls are redundant and could be dropped.
unsafe impl Send for CudaBufferHandle {}
unsafe impl Sync for CudaBufferHandle {}
/// Maps `buffer` with `GST_MAP_READ | GST_MAP_CUDA` and reports the mapped
/// address and size as `(device_ptr, len)`.
///
/// The mapping is released again before returning, so the caller receives the
/// address as a plain integer rather than a live mapping.
///
/// # Errors
///
/// Returns [`MediaError::DecodeFailed`] when `gst_buffer_map` reports failure.
// NOTE(review): handing out the address after unmapping assumes it stays valid
// for as long as the underlying buffer is alive — confirm against the
// GStreamer CUDA allocator documentation.
fn extract_cuda_device_ptr(buffer: &gstreamer::BufferRef) -> Result<(u64, usize), MediaError> {
    #[allow(unused_imports)]
    use gstreamer::glib::translate::ToGlibPtr;

    let map_flags = gstreamer::ffi::GST_MAP_READ | GST_MAP_CUDA;
    let mut raw_map = MaybeUninit::<gstreamer::ffi::GstMapInfo>::zeroed();

    // SAFETY: `buffer` is a live reference, so its raw pointer is valid for the
    // duration of the call, and `raw_map` provides writable storage for one
    // `GstMapInfo`.
    let mapped = unsafe {
        gstreamer::ffi::gst_buffer_map(buffer.as_ptr() as *mut _, raw_map.as_mut_ptr(), map_flags)
    };
    if mapped == gstreamer::glib::ffi::GFALSE {
        return Err(MediaError::DecodeFailed {
            detail: "gst_buffer_map with GST_MAP_CUDA failed — \
                     CUDA allocator may not be available"
                .into(),
        });
    }

    // SAFETY: the storage was zero-initialised and the map call reported
    // success, so it holds a populated `GstMapInfo`.
    let (device_ptr, size) = unsafe {
        let filled = raw_map.assume_init();
        (filled.data as u64, filled.size)
    };

    // SAFETY: this releases exactly the mapping established above, using the
    // same buffer and the same map-info storage.
    unsafe {
        gstreamer::ffi::gst_buffer_unmap(buffer.as_ptr() as *mut _, raw_map.as_mut_ptr());
    }

    Ok((device_ptr, size))
}
/// Smallest mapped size, in bytes, that `validate_device_ptr` accepts; with the
/// current value of 1 this rejects only zero-length mappings.
const MIN_DEVICE_BUFFER_SIZE: usize = 1;
/// Sanity-checks a mapped CUDA buffer against the declared frame geometry.
///
/// Checks run in order and the first failure wins:
/// 1. `ptr` must be non-zero.
/// 2. `len` must be at least `MIN_DEVICE_BUFFER_SIZE`.
/// 3. `len` must cover `width * height` bytes (one byte per pixel as a lower
///    bound). This check is skipped when either dimension is zero.
///
/// # Errors
///
/// Returns [`MediaError::DecodeFailed`] describing the first check that failed.
fn validate_device_ptr(ptr: u64, len: usize, width: u32, height: u32) -> Result<(), MediaError> {
    let problem: Option<String> = if ptr == 0 {
        Some(
            "CUDA device pointer is null — the GStreamer CUDA allocator \
             may not have handled GST_MAP_CUDA correctly, or the buffer \
             does not contain device memory"
                .into(),
        )
    } else if len < MIN_DEVICE_BUFFER_SIZE {
        Some(format!(
            "CUDA device buffer size is {len} bytes — expected at least \
             {MIN_DEVICE_BUFFER_SIZE} byte(s) for a mapped frame"
        ))
    } else {
        // Saturating multiply so absurd dimensions cannot wrap around into a
        // small requirement.
        let min_plausible = (width as usize).saturating_mul(height as usize);
        if min_plausible > 0 && len < min_plausible {
            Some(format!(
                "CUDA device buffer too small for declared geometry: \
                 buffer is {len} bytes but {width}×{height} requires at \
                 least {min_plausible} bytes (1 byte/pixel minimum)"
            ))
        } else {
            None
        }
    };

    match problem {
        Some(detail) => Err(MediaError::DecodeFailed { detail }),
        None => Ok(()),
    }
}
/// Converts a GStreamer sample backed by CUDA device memory into a
/// device-resident [`FrameEnvelope`].
///
/// Steps, in order:
/// 1. Read width, height and plane-0 stride from the sample caps.
/// 2. Take the sample's buffer, derive the monotonic timestamp from its PTS
///    (0 when the buffer has no PTS), and capture the current wall-clock time.
/// 3. Reserve the next sequence number from `seq`.
/// 4. Verify the first memory block is of type `"cuda-memory"`, extract the
///    device address and size, and validate them against the frame geometry.
/// 5. Build a [`CudaBufferHandle`] that owns a reference to the buffer, plus a
///    lazy materializer that maps the buffer for host reads and copies the
///    bytes into a `Vec` on demand.
///
/// `ptz`, when present, is inserted into the frame's metadata.
///
/// Note that the sequence number is reserved before the memory checks in step
/// 4, so a frame rejected there still consumes a sequence value.
///
/// # Errors
///
/// Returns [`MediaError::DecodeFailed`] when:
/// - the sample has no caps or the caps cannot be parsed as video info,
/// - the plane-0 stride is missing or negative,
/// - the sample has no buffer or the buffer has no memory block,
/// - the memory block is not CUDA device memory,
/// - mapping with `GST_MAP_CUDA` fails, or
/// - the mapped pointer/size fail validation.
pub(crate) fn bridge_gst_sample_device(
    feed_id: FeedId,
    seq: &Arc<AtomicU64>,
    output_format: OutputFormat,
    sample: &gstreamer::Sample,
    ptz: Option<PtzTelemetry>,
) -> Result<FrameEnvelope, MediaError> {
    let caps = sample.caps().ok_or_else(|| MediaError::DecodeFailed {
        detail: "sample has no caps".into(),
    })?;
    let video_info =
        gstreamer_video::VideoInfo::from_caps(caps).map_err(|e| MediaError::DecodeFailed {
            detail: format!("failed to parse VideoInfo from caps: {e}"),
        })?;
    let width = video_info.width();
    let height = video_info.height();

    // Strides are reported as signed integers. A blind `as u32` would turn a
    // negative stride into a huge positive one, and indexing `[0]` would panic
    // on an empty slice — reject both cases explicitly instead.
    let stride = match video_info.stride().first().copied() {
        // Lossless: a non-negative i32 always fits in u32.
        Some(raw_stride) if raw_stride >= 0 => raw_stride as u32,
        other => {
            return Err(MediaError::DecodeFailed {
                detail: format!(
                    "VideoInfo reports an unusable plane-0 stride ({other:?}) — \
                     expected a non-negative value"
                ),
            });
        }
    };
    let format = output_format.to_pixel_format();

    let buffer = sample
        .buffer_owned()
        .ok_or_else(|| MediaError::DecodeFailed {
            detail: "sample has no buffer".into(),
        })?;

    // A buffer without a PTS is stamped with 0.
    let pts_ns = buffer.pts().map(|pts| pts.nseconds()).unwrap_or(0);
    let ts = MonotonicTs::from_nanos(pts_ns);
    let wall_ts = WallTs::now();
    let frame_seq = seq.fetch_add(1, Ordering::Relaxed);

    let memory = buffer.memory(0).ok_or_else(|| MediaError::DecodeFailed {
        detail: "buffer has no memory block".into(),
    })?;
    if !memory.is_type(GST_CUDA_MEMORY_TYPE_NAME) {
        return Err(MediaError::DecodeFailed {
            detail: format!(
                "buffer memory is not CUDA device memory (expected type \
                 \"{GST_CUDA_MEMORY_TYPE_NAME}\") — is the cudaupload \
                 element present and working?"
            ),
        });
    }

    let (device_ptr, len) = extract_cuda_device_ptr(&buffer)?;
    validate_device_ptr(device_ptr, len, width, height)?;

    // The handle holds its own reference to the buffer.
    let handle = Arc::new(CudaBufferHandle {
        device_ptr,
        len,
        width,
        height,
        stride,
        format,
        _gst_buffer: buffer.clone(),
    });

    // The local reference is no longer needed, so move it into the closure
    // rather than taking a second clone.
    let mat_buffer = buffer;
    let materialize: HostMaterializeFn = Box::new(move || {
        let map = mat_buffer.map_readable().map_err(|_| {
            nv_frame::FrameAccessError::MaterializationFailed {
                detail: "failed to map CUDA buffer to host".into(),
            }
        })?;
        Ok(HostBytes::from_vec(map.as_slice().to_vec()))
    });

    let mut metadata = TypedMetadata::new();
    if let Some(telemetry) = ptz {
        metadata.insert(telemetry);
    }

    Ok(FrameEnvelope::new_device(
        feed_id,
        frame_seq,
        ts,
        wall_ts,
        width,
        height,
        format,
        stride,
        handle,
        Some(materialize),
        metadata,
    ))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Unwraps a failed validation and returns the `DecodeFailed` detail text,
    /// panicking on any other error variant.
    fn decode_failed_detail(outcome: Result<(), MediaError>) -> String {
        match outcome.unwrap_err() {
            MediaError::DecodeFailed { detail } => detail,
            other => panic!("expected DecodeFailed, got {other}"),
        }
    }

    // --- constants -------------------------------------------------------

    #[test]
    fn gst_map_cuda_flag_value() {
        // The CUDA flag is the first bit above GStreamer's last built-in map flag.
        assert_eq!(gstreamer::ffi::GST_MAP_FLAG_LAST, 1 << 16);
        assert_eq!(GST_MAP_CUDA, 1 << 17);
        assert_eq!(GST_MAP_CUDA, 131_072);
    }

    #[test]
    fn cuda_memory_type_name_matches_gstreamer_convention() {
        assert_eq!(GST_CUDA_MEMORY_TYPE_NAME, "cuda-memory");
    }

    // --- handle ----------------------------------------------------------

    #[test]
    fn cuda_buffer_handle_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<CudaBufferHandle>();
    }

    // --- validate_device_ptr: rejections ---------------------------------

    #[test]
    fn validate_rejects_null_pointer() {
        let detail = decode_failed_detail(validate_device_ptr(0, 1024, 32, 32));
        assert!(detail.contains("null"), "expected 'null' in: {detail}");
    }

    #[test]
    fn validate_rejects_zero_length() {
        let detail = decode_failed_detail(validate_device_ptr(0xDEAD_BEEF, 0, 32, 32));
        assert!(
            detail.contains("0 bytes"),
            "expected size mention in: {detail}"
        );
    }

    #[test]
    fn validate_rejects_undersized_buffer() {
        let detail = decode_failed_detail(validate_device_ptr(0xDEAD_BEEF, 1000, 640, 480));
        assert!(
            detail.contains("too small"),
            "expected 'too small' in: {detail}"
        );
        assert!(detail.contains("640"), "should mention width: {detail}");
        assert!(detail.contains("480"), "should mention height: {detail}");
    }

    // --- validate_device_ptr: acceptances --------------------------------

    #[test]
    fn validate_accepts_valid_pointer() {
        validate_device_ptr(0x1000_0000, 921_600, 640, 480).unwrap();
    }

    #[test]
    fn validate_accepts_larger_than_minimum() {
        validate_device_ptr(0x1000_0000, 1_000_000, 640, 480).unwrap();
    }

    #[test]
    fn validate_accepts_1x1_frame() {
        validate_device_ptr(0x1000_0000, 1, 1, 1).unwrap();
    }
}