Skip to main content

oxicuda_driver/
stream.rs

//! CUDA stream management.
//!
//! Streams are command queues on the GPU. Commands within a stream
//! execute in order. Different streams can execute concurrently.
//!
//! # Example
//!
//! ```rust,no_run
//! # use std::sync::Arc;
//! # use oxicuda_driver::context::Context;
//! # use oxicuda_driver::stream::Stream;
//! # fn main() -> Result<(), oxicuda_driver::error::CudaError> {
//! // Assuming `ctx` is an Arc<Context> obtained from Context::new(...)
//! # let ctx: Arc<Context> = unimplemented!();
//! let stream = Stream::new(&ctx)?;
//! // ... enqueue work on the stream ...
//! stream.synchronize()?;
//! # Ok(())
//! # }
//! ```
21
22use std::sync::Arc;
23
24use crate::context::Context;
25use crate::error::CudaResult;
26use crate::event::Event;
27use crate::ffi::{CU_STREAM_NON_BLOCKING, CUstream};
28use crate::loader::try_driver;
29
30/// A CUDA stream (GPU command queue).
31///
32/// Streams provide ordered, asynchronous execution of GPU commands.
33/// Commands enqueued on the same stream execute sequentially, while
34/// commands on different streams may execute concurrently.
35///
36/// The stream holds an [`Arc<Context>`] to ensure the parent context
37/// outlives the stream.
38pub struct Stream {
39    /// Raw CUDA stream handle.
40    raw: CUstream,
41    /// Keeps the parent context alive for the lifetime of the stream.
42    ctx: Arc<Context>,
43}
44
45// SAFETY: CUDA streams are safe to send between threads when properly
46// synchronised via the driver API. The Arc<Context> is already Send.
47unsafe impl Send for Stream {}
48
49impl Stream {
50    /// Creates a new stream with [`CU_STREAM_NON_BLOCKING`] flag.
51    ///
52    /// Non-blocking streams do not implicitly synchronise with the
53    /// default (NULL) stream, allowing maximum concurrency.
54    ///
55    /// # Errors
56    ///
57    /// Returns a [`CudaError`](crate::error::CudaError) if the driver
58    /// call fails (e.g. invalid context, out of resources).
59    pub fn new(ctx: &Arc<Context>) -> CudaResult<Self> {
60        let api = try_driver()?;
61        let mut raw = CUstream::default();
62        crate::cuda_call!((api.cu_stream_create)(&mut raw, CU_STREAM_NON_BLOCKING))?;
63        Ok(Self {
64            raw,
65            ctx: Arc::clone(ctx),
66        })
67    }
68
69    /// Creates a new stream with the specified priority and
70    /// [`CU_STREAM_NON_BLOCKING`] flag.
71    ///
72    /// Lower numerical values indicate higher priority. The valid range
73    /// can be queried via `cuCtxGetStreamPriorityRange`.
74    ///
75    /// # Errors
76    ///
77    /// Returns a [`CudaError`](crate::error::CudaError) if the priority
78    /// is out of range or the driver call otherwise fails.
79    pub fn with_priority(ctx: &Arc<Context>, priority: i32) -> CudaResult<Self> {
80        let api = try_driver()?;
81        let mut raw = CUstream::default();
82        crate::cuda_call!((api.cu_stream_create_with_priority)(
83            &mut raw,
84            CU_STREAM_NON_BLOCKING,
85            priority
86        ))?;
87        Ok(Self {
88            raw,
89            ctx: Arc::clone(ctx),
90        })
91    }
92
93    /// Blocks the calling thread until all previously enqueued commands
94    /// in this stream have completed.
95    ///
96    /// # Errors
97    ///
98    /// Returns a [`CudaError`](crate::error::CudaError) if any enqueued
99    /// operation failed or the driver reports an error.
100    pub fn synchronize(&self) -> CudaResult<()> {
101        let api = try_driver()?;
102        crate::cuda_call!((api.cu_stream_synchronize)(self.raw))
103    }
104
105    /// Makes all future work submitted to this stream wait until
106    /// the given event has been recorded and completed.
107    ///
108    /// This is the primary mechanism for inter-stream synchronisation:
109    /// record an [`Event`] on one stream, then call `wait_event` on
110    /// another stream to establish an ordering dependency.
111    ///
112    /// # Errors
113    ///
114    /// Returns a [`CudaError`](crate::error::CudaError) if the driver
115    /// call fails (e.g. invalid event handle).
116    pub fn wait_event(&self, event: &Event) -> CudaResult<()> {
117        let api = try_driver()?;
118        // flags = 0 is the only documented value.
119        crate::cuda_call!((api.cu_stream_wait_event)(self.raw, event.raw(), 0))
120    }
121
122    /// Returns the raw [`CUstream`] handle.
123    ///
124    /// # Safety (caller)
125    ///
126    /// The caller must not destroy or otherwise invalidate the handle
127    /// while this `Stream` is still alive.
128    #[inline]
129    pub fn raw(&self) -> CUstream {
130        self.raw
131    }
132
133    /// Returns a reference to the parent [`Context`].
134    #[inline]
135    pub fn context(&self) -> &Arc<Context> {
136        &self.ctx
137    }
138}
139
140impl Drop for Stream {
141    fn drop(&mut self) {
142        if let Ok(api) = try_driver() {
143            let rc = unsafe { (api.cu_stream_destroy_v2)(self.raw) };
144            if rc != 0 {
145                tracing::warn!(
146                    cuda_error = rc,
147                    stream = ?self.raw,
148                    "cuStreamDestroy_v2 failed during drop"
149                );
150            }
151        }
152    }
153}