use std::ptr::NonNull;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use crossbeam_queue::ArrayQueue;
use cudarc::driver::{
sys::{CUevent, CUevent_flags, CUresult, CUstream, CUstream_flags, CUstream_st},
CudaContext,
};
use vyre_driver::{backend::private, BackendError, PendingDispatch};
use crate::backend::telemetry::CudaTelemetry;
use crate::backend::{cuda_check, DispatchAllocations, HostTransferAllocations, ResidentUseGuard};
#[derive(Debug)]
pub(crate) struct CudaStream {
raw: CUstream,
}
unsafe impl Send for CudaStream {}
unsafe impl Sync for CudaStream {}
impl CudaStream {
pub(crate) fn non_blocking() -> Result<Self, BackendError> {
let raw = create_non_blocking_raw_stream("cuStreamCreate")?;
Ok(Self { raw: raw.as_ptr() })
}
#[must_use]
pub(crate) fn raw(&self) -> CUstream {
self.raw
}
pub(crate) fn synchronize(&self) -> Result<(), BackendError> {
synchronize_raw_stream(self.raw, "cuStreamSynchronize")
}
}
pub(crate) fn create_non_blocking_raw_stream(
label: &'static str,
) -> Result<NonNull<CUstream_st>, BackendError> {
let mut raw = std::ptr::null_mut();
unsafe {
cuda_check(
cudarc::driver::sys::cuStreamCreate(
&mut raw,
CUstream_flags::CU_STREAM_NON_BLOCKING as u32,
),
label,
)?;
}
NonNull::new(raw).ok_or_else(|| BackendError::DispatchFailed {
code: None,
message: format!(
"{label} returned a null stream after reporting success. Fix: update the CUDA driver or disable the CUDA path using this stream."
),
})
}
pub(crate) fn destroy_raw_stream(stream: CUstream, label: &'static str) {
if stream.is_null() {
return;
}
unsafe {
let result = cudarc::driver::sys::cuStreamDestroy_v2(stream);
if result != CUresult::CUDA_SUCCESS {
tracing::error!(
"Fix: {label} failed during CUDA stream drop with {result:?}; ensure pending work is synchronized before dropping dispatch resources."
);
}
}
}
pub(crate) fn query_raw_stream_ready(
stream: CUstream,
label: &'static str,
) -> Result<bool, BackendError> {
if stream.is_null() {
return Err(BackendError::InvalidProgram {
fix: format!(
"Fix: {label} received a null CUDA stream; use a backend-owned non-blocking stream instead of querying CUDA's legacy default stream."
),
});
}
let result = unsafe { cudarc::driver::sys::cuStreamQuery(stream) };
match result {
CUresult::CUDA_SUCCESS => Ok(true),
CUresult::CUDA_ERROR_NOT_READY => Ok(false),
other => cuda_check(other, label).map(|()| true),
}
}
pub(crate) fn synchronize_raw_stream(
stream: CUstream,
label: &'static str,
) -> Result<(), BackendError> {
if stream.is_null() {
return Err(BackendError::InvalidProgram {
fix: format!(
"Fix: {label} received a null CUDA stream; use a backend-owned non-blocking stream instead of the legacy default stream."
),
});
}
unsafe { cuda_check(cudarc::driver::sys::cuStreamSynchronize(stream), label) }
}
impl Drop for CudaStream {
fn drop(&mut self) {
destroy_raw_stream(self.raw, "cuStreamDestroy_v2");
}
}
#[derive(Debug)]
pub(crate) struct CudaEvent {
raw: CUevent,
}
unsafe impl Send for CudaEvent {}
unsafe impl Sync for CudaEvent {}
impl CudaEvent {
pub(crate) fn completion() -> Result<Self, BackendError> {
let raw = create_raw_event(
CUevent_flags::CU_EVENT_DISABLE_TIMING as u32,
"cuEventCreate",
)?;
Ok(Self { raw })
}
pub(crate) fn timing() -> Result<Self, BackendError> {
let raw = create_raw_event(0, "cuEventCreate")?;
Ok(Self { raw })
}
pub(crate) fn record(&self, stream: CUstream) -> Result<(), BackendError> {
if self.raw.is_null() {
return Err(BackendError::InvalidProgram {
fix: "Fix: cuEventRecord received a null CUDA event; acquire a backend-owned event before recording completion.".to_string(),
});
}
if stream.is_null() {
return Err(BackendError::InvalidProgram {
fix: "Fix: cuEventRecord received a null CUDA stream; record events on a backend-owned non-blocking stream instead of CUDA's legacy default stream.".to_string(),
});
}
unsafe {
cuda_check(
cudarc::driver::sys::cuEventRecord(self.raw, stream),
"cuEventRecord",
)
}
}
pub(crate) fn query_ready(&self) -> Result<bool, BackendError> {
if self.raw.is_null() {
return Err(BackendError::InvalidProgram {
fix: "Fix: cuEventQuery received a null CUDA event; pending dispatches must own a recorded completion event before readiness polling.".to_string(),
});
}
let result = unsafe { cudarc::driver::sys::cuEventQuery(self.raw) };
match result {
CUresult::CUDA_SUCCESS => Ok(true),
CUresult::CUDA_ERROR_NOT_READY => Ok(false),
other => cuda_check(other, "cuEventQuery").map(|()| true),
}
}
pub(crate) fn synchronize(&self) -> Result<(), BackendError> {
if self.raw.is_null() {
return Err(BackendError::InvalidProgram {
fix: "Fix: cuEventSynchronize received a null CUDA event; pending dispatches must own a recorded completion event before synchronization.".to_string(),
});
}
unsafe {
cuda_check(
cudarc::driver::sys::cuEventSynchronize(self.raw),
"cuEventSynchronize",
)
}
}
pub(crate) fn elapsed_time_ns(&self, end: &CudaEvent) -> Result<u64, BackendError> {
if self.raw.is_null() || end.raw.is_null() {
return Err(BackendError::InvalidProgram {
fix: "Fix: cuEventElapsedTime received a null CUDA timing event; record both timing events before reading elapsed time.".to_string(),
});
}
let mut elapsed_ms = 0.0f32;
unsafe {
cuda_check(
cudarc::driver::sys::cuEventElapsedTime(
(&mut elapsed_ms) as *mut f32,
self.raw,
end.raw,
),
"cuEventElapsedTime",
)?;
}
let elapsed_ns = f64::from(elapsed_ms) * 1_000_000.0;
if !elapsed_ns.is_finite() || elapsed_ns < 0.0 || elapsed_ns > u64::MAX as f64 {
return Err(BackendError::InvalidProgram {
fix: format!(
"Fix: CUDA event elapsed time {elapsed_ms} ms cannot fit u64 nanoseconds; inspect CUDA event timing and split the dispatch before telemetry overflows."
),
});
}
crate::numeric::CUDA_NUMERIC.rounded_f64_to_u64(elapsed_ns, "event elapsed nanoseconds")
}
}
impl Drop for CudaEvent {
fn drop(&mut self) {
destroy_raw_event(self.raw, "cuEventDestroy_v2");
}
}
fn create_raw_event(flags: u32, label: &'static str) -> Result<CUevent, BackendError> {
let mut raw = std::ptr::null_mut();
unsafe {
cuda_check(cudarc::driver::sys::cuEventCreate(&mut raw, flags), label)?;
}
if raw.is_null() {
return Err(BackendError::DispatchFailed {
code: None,
message: format!(
"{label} returned a null event after reporting success. Fix: update the CUDA driver or disable event-backed CUDA dispatch for this device."
),
});
}
Ok(raw)
}
fn destroy_raw_event(event: CUevent, label: &'static str) {
if event.is_null() {
return;
}
unsafe {
let result = cudarc::driver::sys::cuEventDestroy_v2(event);
if result != CUresult::CUDA_SUCCESS {
tracing::error!(
"Fix: {label} failed during CUDA event drop with {result:?}; ensure pending work is synchronized before dropping dispatch resources."
);
}
}
}
#[derive(Debug)]
pub(crate) struct CudaLaunchResourcePool {
streams: ArrayQueue<CudaStream>,
events: ArrayQueue<CudaEvent>,
timing_events: ArrayQueue<CudaEvent>,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CudaLaunchResourceCounts {
pub streams: usize,
pub completion_events: usize,
pub timing_events: usize,
}
#[derive(Debug)]
pub(crate) struct CudaLaunchResourceLease {
pool: Arc<CudaLaunchResourcePool>,
stream: Option<CudaStream>,
timing_events: Option<(CudaEvent, CudaEvent)>,
}
#[derive(Debug)]
pub(crate) struct CudaTimingEventPairLease {
pool: Arc<CudaLaunchResourcePool>,
timing_events: Option<(CudaEvent, CudaEvent)>,
}
impl CudaTimingEventPairLease {
pub(crate) fn acquire(pool: Arc<CudaLaunchResourcePool>) -> Result<Self, BackendError> {
let timing_events = pool.acquire_timing_event_pair()?;
Ok(Self {
pool,
timing_events: Some(timing_events),
})
}
pub(crate) fn events(&self) -> Result<&(CudaEvent, CudaEvent), BackendError> {
self.timing_events
.as_ref()
.ok_or_else(|| BackendError::InvalidProgram {
fix: "Fix: CUDA timing event pair lease was already consumed; acquire a fresh timing lease before recording graph replay events.".to_string(),
})
}
}
impl Drop for CudaTimingEventPairLease {
fn drop(&mut self) {
if let Some((start, end)) = self.timing_events.take() {
self.pool.release_timing_event(start);
self.pool.release_timing_event(end);
}
}
}
impl CudaLaunchResourceLease {
pub(crate) fn acquire(
pool: Arc<CudaLaunchResourcePool>,
capture_timing: bool,
) -> Result<Self, BackendError> {
let stream = pool.acquire_stream()?;
let timing_events = if capture_timing {
match pool.acquire_timing_event_pair() {
Ok(pair) => Some(pair),
Err(error) => {
pool.release_stream(stream);
return Err(error);
}
}
} else {
None
};
Ok(Self {
pool,
stream: Some(stream),
timing_events,
})
}
pub(crate) fn stream_raw(&self) -> Result<CUstream, BackendError> {
self.stream
.as_ref()
.map(CudaStream::raw)
.ok_or_else(|| BackendError::InvalidProgram {
fix: "Fix: CUDA launch resource lease stream was already consumed; acquire a fresh launch-resource lease before enqueueing CUDA work.".to_string(),
})
}
pub(crate) fn timing_events(&self) -> Result<Option<&(CudaEvent, CudaEvent)>, BackendError> {
if self.stream.is_none() {
return Err(BackendError::InvalidProgram {
fix: "Fix: CUDA launch resource lease timing events were queried after the stream was consumed; query timing events before transferring the lease into a pending dispatch.".to_string(),
});
}
Ok(self.timing_events.as_ref())
}
pub(crate) fn into_parts(
mut self,
) -> Result<(CudaStream, Option<(CudaEvent, CudaEvent)>), BackendError> {
let stream = self.stream.take().ok_or_else(|| BackendError::InvalidProgram {
fix: "Fix: CUDA launch resource lease stream was already consumed; pending dispatch ownership cannot be built twice from the same lease.".to_string(),
})?;
let timing_events = self.timing_events.take();
Ok((stream, timing_events))
}
}
impl Drop for CudaLaunchResourceLease {
fn drop(&mut self) {
if let Some((start, end)) = self.timing_events.take() {
self.pool.release_timing_event(start);
self.pool.release_timing_event(end);
}
if let Some(stream) = self.stream.take() {
self.pool.release_stream(stream);
}
}
}
impl CudaLaunchResourcePool {
pub(crate) fn new(max_cached: usize) -> Self {
let max_cached = max_cached.max(1);
Self {
streams: ArrayQueue::new(max_cached),
events: ArrayQueue::new(max_cached),
timing_events: ArrayQueue::new(max_cached),
}
}
pub(crate) fn acquire_stream(&self) -> Result<CudaStream, BackendError> {
if let Some(stream) = self.streams.pop() {
return Ok(stream);
}
CudaStream::non_blocking()
}
pub(crate) fn acquire_event(&self) -> Result<CudaEvent, BackendError> {
if let Some(event) = self.events.pop() {
return Ok(event);
}
CudaEvent::completion()
}
pub(crate) fn acquire_timing_event(&self) -> Result<CudaEvent, BackendError> {
if let Some(event) = self.timing_events.pop() {
return Ok(event);
}
CudaEvent::timing()
}
pub(crate) fn acquire_timing_event_pair(&self) -> Result<(CudaEvent, CudaEvent), BackendError> {
let start = self.acquire_timing_event()?;
match self.acquire_timing_event() {
Ok(end) => Ok((start, end)),
Err(error) => {
self.release_timing_event(start);
Err(error)
}
}
}
pub(crate) fn release_stream(&self, stream: CudaStream) {
if let Err(stream) = self.streams.push(stream) {
drop(stream);
}
}
pub(crate) fn release_event(&self, event: CudaEvent) {
if let Err(event) = self.events.push(event) {
drop(event);
}
}
pub(crate) fn release_timing_event(&self, event: CudaEvent) {
if let Err(event) = self.timing_events.push(event) {
drop(event);
}
}
pub(crate) fn cached_counts(&self) -> Result<(usize, usize), BackendError> {
Ok((self.streams.len(), self.events.len()))
}
pub(crate) fn cached_counts_detailed(&self) -> Result<CudaLaunchResourceCounts, BackendError> {
Ok(CudaLaunchResourceCounts {
streams: self.streams.len(),
completion_events: self.events.len(),
timing_events: self.timing_events.len(),
})
}
pub(crate) fn clear(&self) -> Result<(), BackendError> {
while self.streams.pop().is_some() {}
while self.events.pop().is_some() {}
while self.timing_events.pop().is_some() {}
Ok(())
}
}
#[derive(Debug)]
pub(crate) struct CudaPendingDispatch {
ctx: Arc<CudaContext>,
pool: Arc<CudaLaunchResourcePool>,
event: Option<CudaEvent>,
stream: Option<CudaStream>,
allocations: Option<DispatchAllocations>,
resident_use: Option<ResidentUseGuard>,
host_transfers: Option<HostTransferAllocations>,
outputs: Vec<Vec<u8>>,
timing_start: Option<CudaEvent>,
timing_end: Option<CudaEvent>,
ready_device_ns: Option<u64>,
telemetry: Arc<CudaTelemetry>,
completed: AtomicBool,
}
impl CudaPendingDispatch {
pub(crate) fn new_ready(
ctx: Arc<CudaContext>,
pool: Arc<CudaLaunchResourcePool>,
outputs: Vec<Vec<u8>>,
telemetry: Arc<CudaTelemetry>,
) -> Self {
Self {
ctx,
pool,
event: None,
stream: None,
allocations: None,
resident_use: None,
host_transfers: None,
outputs,
timing_start: None,
timing_end: None,
ready_device_ns: None,
telemetry,
completed: AtomicBool::new(true),
}
}
pub(crate) fn new_ready_timed(
ctx: Arc<CudaContext>,
pool: Arc<CudaLaunchResourcePool>,
outputs: Vec<Vec<u8>>,
device_ns: Option<u64>,
telemetry: Arc<CudaTelemetry>,
) -> Self {
Self {
ctx,
pool,
event: None,
stream: None,
allocations: None,
resident_use: None,
host_transfers: None,
outputs,
timing_start: None,
timing_end: None,
ready_device_ns: device_ns,
telemetry,
completed: AtomicBool::new(true),
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn new_resident_batch_pending(
ctx: Arc<CudaContext>,
pool: Arc<CudaLaunchResourcePool>,
event: CudaEvent,
stream: CudaStream,
allocations: DispatchAllocations,
resident_use: ResidentUseGuard,
host_transfers: HostTransferAllocations,
telemetry: Arc<CudaTelemetry>,
) -> Self {
Self::new(
ctx,
pool,
event,
stream,
allocations,
Some(resident_use),
Some(host_transfers),
Vec::new(),
telemetry,
)
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
ctx: Arc<CudaContext>,
pool: Arc<CudaLaunchResourcePool>,
event: CudaEvent,
stream: CudaStream,
allocations: DispatchAllocations,
resident_use: Option<ResidentUseGuard>,
host_transfers: Option<HostTransferAllocations>,
outputs: Vec<Vec<u8>>,
telemetry: Arc<CudaTelemetry>,
) -> Self {
Self {
ctx,
pool,
event: Some(event),
stream: Some(stream),
allocations: Some(allocations),
resident_use,
host_transfers,
outputs,
timing_start: None,
timing_end: None,
ready_device_ns: None,
telemetry,
completed: AtomicBool::new(false),
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn new_with_timing(
ctx: Arc<CudaContext>,
pool: Arc<CudaLaunchResourcePool>,
event: CudaEvent,
stream: CudaStream,
allocations: DispatchAllocations,
resident_use: Option<ResidentUseGuard>,
host_transfers: Option<HostTransferAllocations>,
outputs: Vec<Vec<u8>>,
timing_start: CudaEvent,
timing_end: CudaEvent,
telemetry: Arc<CudaTelemetry>,
) -> Self {
Self {
ctx,
pool,
event: Some(event),
stream: Some(stream),
allocations: Some(allocations),
resident_use,
host_transfers,
outputs,
timing_start: Some(timing_start),
timing_end: Some(timing_end),
ready_device_ns: None,
telemetry,
completed: AtomicBool::new(false),
}
}
fn bind_context(&self) -> Result<(), BackendError> {
self.ctx
.bind_to_thread()
.map_err(|e| BackendError::DispatchFailed {
code: None,
message: format!("CUDA context bind failed: {e}"),
})
}
fn synchronize(&self) -> Result<(), BackendError> {
if self.completed.load(Ordering::Acquire) {
return Ok(());
}
self.bind_context()?;
let event = self
.event
.as_ref()
.ok_or_else(|| BackendError::DispatchFailed {
code: None,
message: "CUDA pending dispatch completion event was already released".to_string(),
})?;
event.synchronize()?;
self.telemetry.record_sync_point();
self.completed.store(true, Ordering::Release);
Ok(())
}
fn release_launch_resources(&mut self) {
if let Some(event) = self.event.take() {
self.pool.release_event(event);
}
if let Some(event) = self.timing_start.take() {
self.pool.release_timing_event(event);
}
if let Some(event) = self.timing_end.take() {
self.pool.release_timing_event(event);
}
if let Some(stream) = self.stream.take() {
self.pool.release_stream(stream);
}
}
pub(crate) fn await_timed_result(
mut self,
) -> Result<(Vec<Vec<u8>>, Option<u64>), BackendError> {
self.synchronize()?;
let device_ns = match self.ready_device_ns.take() {
Some(device_ns) => Some(device_ns),
None => match (self.timing_start.as_ref(), self.timing_end.as_ref()) {
(Some(start), Some(end)) => Some(start.elapsed_time_ns(end)?),
_ => None,
},
};
self.release_launch_resources();
self.allocations.take();
self.resident_use.take();
let outputs = self.collect_outputs()?;
self.host_transfers.take();
Ok((outputs, device_ns))
}
fn collect_outputs(&mut self) -> Result<Vec<Vec<u8>>, BackendError> {
if let Some(transfers) = self.host_transfers.as_ref() {
let mut outputs = std::mem::take(&mut self.outputs);
transfers.collect_outputs_into(&mut outputs)?;
Ok(outputs)
} else {
Ok(std::mem::take(&mut self.outputs))
}
}
fn collect_outputs_into(&mut self, outputs: &mut Vec<Vec<u8>>) -> Result<(), BackendError> {
if let Some(transfers) = self.host_transfers.as_ref() {
transfers.collect_outputs_into(outputs)?;
} else {
vyre_driver::replace_output_buffers_preserving_slots(
std::mem::take(&mut self.outputs),
outputs,
);
}
Ok(())
}
}
impl private::Sealed for CudaPendingDispatch {}
impl PendingDispatch for CudaPendingDispatch {
fn is_ready(&self) -> bool {
if self.completed.load(Ordering::Acquire) {
return true;
}
if self.bind_context().is_err() {
return false;
}
let Some(event) = self.event.as_ref() else {
return true;
};
let ready = match event.query_ready() {
Ok(ready) => ready,
Err(error) => {
tracing::error!(
"Fix: CUDA pending dispatch readiness query failed: {error}. Await the dispatch to surface synchronization failure details."
);
false
}
};
if ready {
self.completed.store(true, Ordering::Release);
}
ready
}
fn await_result(mut self: Box<Self>) -> Result<Vec<Vec<u8>>, BackendError> {
self.synchronize()?;
self.release_launch_resources();
self.allocations.take();
self.resident_use.take();
let outputs = self.collect_outputs()?;
self.host_transfers.take();
Ok(outputs)
}
fn await_result_into(
mut self: Box<Self>,
outputs: &mut Vec<Vec<u8>>,
) -> Result<(), BackendError> {
self.synchronize()?;
self.release_launch_resources();
self.allocations.take();
self.resident_use.take();
self.collect_outputs_into(outputs)?;
self.host_transfers.take();
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::{query_raw_stream_ready, synchronize_raw_stream, CudaLaunchResourcePool};
#[test]
fn launch_resource_leases_do_not_panic_on_consumed_state() {
let source = include_str!("stream.rs");
assert!(
!source.contains(concat!(".expect", "(\"Fix: CUDA launch resource lease stream was already consumed")),
"Fix: CUDA launch resource leases must return typed backend errors when consumed twice, not panic."
);
assert!(
!source.contains(concat!(".expect", "(\"Fix: CUDA timing event pair lease was already consumed")),
"Fix: CUDA graph replay timing leases must return typed backend errors when consumed twice, not panic."
);
}
#[test]
fn launch_resource_counts_include_timing_events() {
let pool = CudaLaunchResourcePool::new(8);
let counts = pool
.cached_counts_detailed()
.expect("Fix: empty launch resource pool counts should be readable");
assert_eq!(counts.streams, 0);
assert_eq!(counts.completion_events, 0);
assert_eq!(counts.timing_events, 0);
let source = include_str!("stream.rs");
assert!(
source.contains("pub struct CudaLaunchResourceCounts")
&& source.contains("pub timing_events: usize")
&& source.contains("cached_counts_detailed"),
"Fix: CUDA launch-resource telemetry must expose timing-event cache pressure, not just streams and completion events."
);
}
#[test]
fn raw_stream_sync_rejects_null_default_stream() {
let err = synchronize_raw_stream(std::ptr::null_mut(), "unit sync")
.expect_err("Fix: raw stream sync must reject the legacy null stream");
assert!(
err.to_string().contains("null CUDA stream"),
"raw sync diagnostic must explain the default-stream hazard: {err}"
);
}
#[test]
fn raw_stream_query_rejects_null_default_stream() {
let err = query_raw_stream_ready(std::ptr::null_mut(), "unit query")
.expect_err("Fix: raw stream query must reject the legacy null stream");
assert!(
err.to_string().contains("null CUDA stream"),
"raw query diagnostic must explain the default-stream hazard: {err}"
);
}
#[test]
fn event_record_rejects_null_event_before_ffi() {
let event = super::CudaEvent {
raw: std::ptr::null_mut(),
};
let err = event
.record(std::ptr::null_mut())
.expect_err("Fix: event recording must reject invalid event handles before FFI");
assert!(
err.to_string().contains("null CUDA event"),
"event record diagnostic must explain the null-event hazard: {err}"
);
}
#[test]
fn event_record_rejects_null_default_stream_before_ffi() {
let event = std::mem::ManuallyDrop::new(super::CudaEvent {
raw: std::ptr::NonNull::<cudarc::driver::sys::CUevent_st>::dangling().as_ptr(),
});
let err = event
.record(std::ptr::null_mut())
.expect_err("Fix: event recording must reject CUDA's legacy null stream before FFI");
assert!(
err.to_string().contains("null CUDA stream"),
"event record diagnostic must explain the default-stream hazard: {err}"
);
}
#[test]
fn event_query_and_sync_reject_null_event_before_ffi() {
let event = super::CudaEvent {
raw: std::ptr::null_mut(),
};
let query_err = event
.query_ready()
.expect_err("Fix: event readiness query must reject null events before FFI");
assert!(
query_err.to_string().contains("null CUDA event"),
"event query diagnostic must explain the null-event hazard: {query_err}"
);
let sync_err = event
.synchronize()
.expect_err("Fix: event synchronize must reject null events before FFI");
assert!(
sync_err.to_string().contains("null CUDA event"),
"event sync diagnostic must explain the null-event hazard: {sync_err}"
);
}
#[test]
fn event_elapsed_time_rejects_null_timing_event_before_ffi() {
let event = super::CudaEvent {
raw: std::ptr::null_mut(),
};
let err = event
.elapsed_time_ns(&event)
.expect_err("Fix: elapsed timing must reject null events before FFI");
assert!(
err.to_string().contains("null CUDA timing event"),
"event elapsed diagnostic must explain the null-event hazard: {err}"
);
}
#[test]
fn stream_lifecycle_ffi_is_single_sourced_for_graph_capture() {
let stream = include_str!("stream.rs");
let cuda_graph = include_str!("backend/cuda_graph.rs");
let create_ffi = concat!("cudarc::driver::sys::", "cuStreamCreate(");
let destroy_ffi = concat!("cudarc::driver::sys::", "cuStreamDestroy_v2(");
assert_eq!(
stream.matches(create_ffi).count(),
1,
"Fix: raw CUDA stream creation must stay behind create_non_blocking_raw_stream."
);
assert_eq!(
stream.matches(destroy_ffi).count(),
1,
"Fix: raw CUDA stream destruction must stay behind destroy_raw_stream."
);
assert_eq!(
cuda_graph.matches(create_ffi).count() + cuda_graph.matches(destroy_ffi).count(),
0,
"Fix: cudaGraph capture must use the shared stream lifecycle helpers instead of direct stream FFI."
);
assert!(
stream.contains("fn create_non_blocking_raw_stream(")
&& stream.contains("returned a null stream after reporting success")
&& cuda_graph.contains("create_non_blocking_raw_stream"),
"Fix: shared CUDA stream creation must reject null-success handles and be used by cudaGraph."
);
}
#[test]
fn event_lifecycle_ffi_is_single_sourced() {
let stream = include_str!("stream.rs");
let create_ffi = concat!("cudarc::driver::sys::", "cuEventCreate(");
let destroy_ffi = concat!("cudarc::driver::sys::", "cuEventDestroy_v2(");
assert_eq!(
stream.matches(create_ffi).count(),
1,
"Fix: raw CUDA event creation must stay behind create_raw_event."
);
assert_eq!(
stream.matches(destroy_ffi).count(),
1,
"Fix: raw CUDA event destruction must stay behind destroy_raw_event."
);
assert!(
stream.contains("fn create_raw_event(")
&& stream.contains("returned a null event after reporting success")
&& stream.contains("fn destroy_raw_event(")
&& stream.contains("CudaEvent::completion")
&& stream.contains("CudaEvent::timing"),
"Fix: CUDA event lifecycle must use shared create/destroy helpers with null-success validation."
);
}
#[test]
fn graph_replay_uses_shared_stream_query_helper() {
let stream = include_str!("stream.rs");
let graph_replay = include_str!("backend/cuda_graph_replay.rs");
let query_ffi = concat!("cudarc::driver::sys::", "cuStreamQuery(");
assert_eq!(
stream.matches(query_ffi).count(),
1,
"Fix: raw CUDA stream query must stay behind query_raw_stream_ready."
);
assert_eq!(
graph_replay.matches(query_ffi).count(),
0,
"Fix: CUDA graph replay must use query_raw_stream_ready instead of raw cuStreamQuery."
);
assert!(
graph_replay.contains("query_raw_stream_ready")
&& stream.contains("fn query_raw_stream_ready("),
"Fix: graph replay polling must use the shared stream query helper."
);
}
}
impl Drop for CudaPendingDispatch {
fn drop(&mut self) {
if !self.completed.load(Ordering::Acquire) {
if let Err(error) = self.ctx.bind_to_thread() {
tracing::error!(
"Fix: failed to bind CUDA context while dropping pending dispatch: {error}. Dispatch completion could not be forced."
);
}
if let Some(stream) = self.stream.as_ref() {
if let Err(error) = stream.synchronize() {
tracing::error!(
"Fix: failed to synchronize CUDA stream while dropping pending dispatch: {error}. Dispatch completion state may be stale."
);
} else {
self.telemetry.record_sync_point();
}
}
self.completed.store(true, Ordering::Release);
}
self.release_launch_resources();
self.allocations.take();
self.resident_use.take();
self.host_transfers.take();
}
}