oxicuda_driver/stream.rs
1//! CUDA stream management.
2//!
3//! Streams are command queues on the GPU. Commands within a stream
4//! execute in order. Different streams can execute concurrently.
5//!
6//! # Example
7//!
8//! ```rust,no_run
9//! # use std::sync::Arc;
10//! # use oxicuda_driver::context::Context;
11//! # use oxicuda_driver::stream::Stream;
12//! # fn main() -> Result<(), oxicuda_driver::error::CudaError> {
13//! // Assuming `ctx` is an Arc<Context> obtained from Context::new(...)
14//! # let ctx: Arc<Context> = unimplemented!();
15//! let stream = Stream::new(&ctx)?;
16//! // ... enqueue work on the stream ...
17//! stream.synchronize()?;
18//! # Ok(())
19//! # }
20//! ```
21
22use std::sync::Arc;
23
24use crate::context::Context;
25use crate::error::CudaResult;
26use crate::event::Event;
27use crate::ffi::{CU_STREAM_NON_BLOCKING, CUstream};
28use crate::loader::try_driver;
29
30/// A CUDA stream (GPU command queue).
31///
32/// Streams provide ordered, asynchronous execution of GPU commands.
33/// Commands enqueued on the same stream execute sequentially, while
34/// commands on different streams may execute concurrently.
35///
36/// The stream holds an [`Arc<Context>`] to ensure the parent context
37/// outlives the stream.
38pub struct Stream {
39 /// Raw CUDA stream handle.
40 raw: CUstream,
41 /// Keeps the parent context alive for the lifetime of the stream.
42 ctx: Arc<Context>,
43}
44
45// SAFETY: CUDA streams are safe to send between threads when properly
46// synchronised via the driver API. The Arc<Context> is already Send.
47unsafe impl Send for Stream {}
48
49impl Stream {
50 /// Creates a new stream with [`CU_STREAM_NON_BLOCKING`] flag.
51 ///
52 /// Non-blocking streams do not implicitly synchronise with the
53 /// default (NULL) stream, allowing maximum concurrency.
54 ///
55 /// # Errors
56 ///
57 /// Returns a [`CudaError`](crate::error::CudaError) if the driver
58 /// call fails (e.g. invalid context, out of resources).
59 pub fn new(ctx: &Arc<Context>) -> CudaResult<Self> {
60 let api = try_driver()?;
61 let mut raw = CUstream::default();
62 crate::cuda_call!((api.cu_stream_create)(&mut raw, CU_STREAM_NON_BLOCKING))?;
63 Ok(Self {
64 raw,
65 ctx: Arc::clone(ctx),
66 })
67 }
68
69 /// Creates a new stream with the specified priority and
70 /// [`CU_STREAM_NON_BLOCKING`] flag.
71 ///
72 /// Lower numerical values indicate higher priority. The valid range
73 /// can be queried via `cuCtxGetStreamPriorityRange`.
74 ///
75 /// # Errors
76 ///
77 /// Returns a [`CudaError`](crate::error::CudaError) if the priority
78 /// is out of range or the driver call otherwise fails.
79 pub fn with_priority(ctx: &Arc<Context>, priority: i32) -> CudaResult<Self> {
80 let api = try_driver()?;
81 let mut raw = CUstream::default();
82 crate::cuda_call!((api.cu_stream_create_with_priority)(
83 &mut raw,
84 CU_STREAM_NON_BLOCKING,
85 priority
86 ))?;
87 Ok(Self {
88 raw,
89 ctx: Arc::clone(ctx),
90 })
91 }
92
93 /// Blocks the calling thread until all previously enqueued commands
94 /// in this stream have completed.
95 ///
96 /// # Errors
97 ///
98 /// Returns a [`CudaError`](crate::error::CudaError) if any enqueued
99 /// operation failed or the driver reports an error.
100 pub fn synchronize(&self) -> CudaResult<()> {
101 let api = try_driver()?;
102 crate::cuda_call!((api.cu_stream_synchronize)(self.raw))
103 }
104
105 /// Makes all future work submitted to this stream wait until
106 /// the given event has been recorded and completed.
107 ///
108 /// This is the primary mechanism for inter-stream synchronisation:
109 /// record an [`Event`] on one stream, then call `wait_event` on
110 /// another stream to establish an ordering dependency.
111 ///
112 /// # Errors
113 ///
114 /// Returns a [`CudaError`](crate::error::CudaError) if the driver
115 /// call fails (e.g. invalid event handle).
116 pub fn wait_event(&self, event: &Event) -> CudaResult<()> {
117 let api = try_driver()?;
118 // flags = 0 is the only documented value.
119 crate::cuda_call!((api.cu_stream_wait_event)(self.raw, event.raw(), 0))
120 }
121
122 /// Returns the raw [`CUstream`] handle.
123 ///
124 /// # Safety (caller)
125 ///
126 /// The caller must not destroy or otherwise invalidate the handle
127 /// while this `Stream` is still alive.
128 #[inline]
129 pub fn raw(&self) -> CUstream {
130 self.raw
131 }
132
133 /// Returns a reference to the parent [`Context`].
134 #[inline]
135 pub fn context(&self) -> &Arc<Context> {
136 &self.ctx
137 }
138}
139
140impl Drop for Stream {
141 fn drop(&mut self) {
142 if let Ok(api) = try_driver() {
143 let rc = unsafe { (api.cu_stream_destroy_v2)(self.raw) };
144 if rc != 0 {
145 tracing::warn!(
146 cuda_error = rc,
147 stream = ?self.raw,
148 "cuStreamDestroy_v2 failed during drop"
149 );
150 }
151 }
152 }
153}