Skip to main content

svod_device/
queue.rs

//! Hardware command queue abstraction for parallel execution.
//!
//! This module provides a device-agnostic interface for command queues,
//! abstracting over CUDA streams, Metal command buffers, CPU task queues, etc.
//!
//! # Design
//!
//! The `HardwareQueue` trait uses method chaining: each recording method takes
//! `&mut self` and returns `&mut Self`, so operations can be chained before submission:
//!
//! ```ignore
//! queue
//!     .wait(&signal, 1)                // Wait for a dependency
//!     .exec(program, &bufs, &params)   // Execute a compiled program
//!     .signal(&signal, 2)              // Signal completion
//!     .submit()?;                      // Submit to hardware
//! ```
//!
//! # Queue Types
//!
//! Most devices have two queue types:
//! - **Compute queue**: for kernel execution.
//! - **Copy queue**: for DMA transfers. Optional; some devices have no separate
//!   copy queue and use a single queue for both.
23
24use std::sync::Arc;
25
26use svod_dtype::DeviceSpec;
27
28use crate::buffer::Buffer;
29use crate::device::Program;
30use crate::error::Result;
31use crate::sync::TimelineSignal;
32
/// Kernel launch parameters passed to `HardwareQueue::exec`.
///
/// Sizes are always stored as three dimensions; the 1D/2D constructors pad the
/// unused dimensions with `1`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecParams {
    /// Global work size (total number of work items per dimension).
    pub global_size: [usize; 3],
    /// Local work size (work-group size per dimension).
    ///
    /// `None` means direct global-id execution without an explicit work-group size.
    pub local_size: Option<[usize; 3]>,
    /// Positional runtime variable values.
    pub vals: Vec<i64>,
}
43
44impl ExecParams {
45    /// Create 1D execution parameters.
46    pub fn new_1d(global: usize, local: usize) -> Self {
47        Self { global_size: [global, 1, 1], local_size: Some([local, 1, 1]), vals: Vec::new() }
48    }
49
50    /// Create 2D execution parameters.
51    pub fn new_2d(global: [usize; 2], local: [usize; 2]) -> Self {
52        Self { global_size: [global[0], global[1], 1], local_size: Some([local[0], local[1], 1]), vals: Vec::new() }
53    }
54
55    /// Create 3D execution parameters.
56    pub fn new_3d(global: [usize; 3], local: [usize; 3]) -> Self {
57        Self { global_size: global, local_size: Some(local), vals: Vec::new() }
58    }
59
60    /// Attach positional runtime variable values.
61    pub fn with_vals(mut self, vals: Vec<i64>) -> Self {
62        self.vals = vals;
63        self
64    }
65}
66
67impl Default for ExecParams {
68    fn default() -> Self {
69        Self { global_size: [1, 1, 1], local_size: Some([1, 1, 1]), vals: Vec::new() }
70    }
71}
72
73/// Hardware command queue for submitting operations to a device.
74///
75/// Queues batch operations and submit them to hardware atomically.
76/// All operations are non-blocking until `submit()` is called.
77///
78/// # Thread Safety
79///
80/// Queues are `Send` but not necessarily `Sync`. Each queue should be
81/// owned by a single thread/task at a time.
82pub trait HardwareQueue: Send + std::fmt::Debug {
83    /// The timeline signal type used by this queue.
84    type Signal: TimelineSignal;
85
86    /// Wait for a signal to reach a value before executing subsequent operations.
87    ///
88    /// This creates a dependency: operations after this call won't start
89    /// until the signal reaches `value`.
90    fn wait(&mut self, signal: &Self::Signal, value: u64) -> &mut Self;
91
92    /// Signal a value after all previous operations complete.
93    ///
94    /// Operations submitted after this call may start before the signal is set.
95    fn signal(&mut self, signal: &Self::Signal, value: u64) -> &mut Self;
96
97    /// Execute a compiled program with the given buffers and parameters.
98    ///
99    /// # Arguments
100    ///
101    /// * `program` - The compiled program to execute
102    /// * `buffers` - Buffer arguments (raw pointers extracted internally)
103    /// * `params` - Execution parameters (grid size, etc.)
104    ///
105    /// # Safety
106    ///
107    /// Caller must ensure:
108    /// - All buffers are allocated
109    /// - No conflicting buffer accesses (handled by executor)
110    fn exec(&mut self, program: Arc<dyn Program>, buffers: &[&Buffer], params: &ExecParams) -> &mut Self;
111
112    /// Copy data between buffers.
113    ///
114    /// Both buffers must be accessible from this queue's device.
115    /// For cross-device copies, use the executor's transfer mechanism.
116    fn copy(&mut self, dst: &Buffer, src: &Buffer) -> &mut Self;
117
118    /// Insert a memory barrier.
119    ///
120    /// Ensures all previous memory operations are visible to subsequent operations.
121    /// Mostly needed for CPU and some GPU memory models.
122    fn memory_barrier(&mut self) -> &mut Self;
123
124    /// Submit all batched operations to the hardware.
125    ///
126    /// This is the only blocking point - it submits work but doesn't wait
127    /// for completion. Use signals to synchronize.
128    fn submit(&mut self) -> Result<()>;
129
130    /// Get the device this queue belongs to.
131    fn device(&self) -> &DeviceSpec;
132}
133
134/// Factory for creating hardware queues.
135///
136/// Each device implementation provides a factory that creates queues
137/// for that device type.
138pub trait QueueFactory: Send + Sync + std::fmt::Debug {
139    /// The queue type produced by this factory.
140    type Queue: HardwareQueue;
141
142    /// The signal type used by queues from this factory.
143    type Signal: TimelineSignal;
144
145    /// Create a new compute queue.
146    fn create_compute_queue(&self) -> Result<Self::Queue>;
147
148    /// Create a new copy/DMA queue if supported.
149    ///
150    /// Returns `None` if the device doesn't support separate copy queues.
151    fn create_copy_queue(&self) -> Result<Option<Self::Queue>>;
152
153    /// Create a new timeline signal.
154    fn create_signal(&self) -> Result<Arc<Self::Signal>>;
155
156    /// Get the device specification.
157    fn device(&self) -> &DeviceSpec;
158}
159
160/// Type-erased queue for use in the unified executor.
161///
162/// This wraps a concrete `HardwareQueue` implementation and provides
163/// a common interface that doesn't require knowing the signal type.
164pub struct DynQueue {
165    inner: Box<dyn DynQueueInner>,
166}
167
168impl std::fmt::Debug for DynQueue {
169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170        f.debug_struct("DynQueue").field("device", &self.inner.device()).finish()
171    }
172}
173
174impl DynQueue {
175    /// Create a new type-erased queue from a concrete implementation.
176    pub fn new<Q: HardwareQueue + 'static>(queue: Q) -> Self
177    where
178        Q::Signal: 'static,
179    {
180        Self { inner: Box::new(DynQueueWrapper { queue, _phantom: std::marker::PhantomData }) }
181    }
182
183    /// Wait for a type-erased signal.
184    pub fn wait(&mut self, signal: &dyn TimelineSignal, value: u64) -> &mut Self {
185        self.inner.wait_dyn(signal, value);
186        self
187    }
188
189    /// Signal completion.
190    pub fn signal(&mut self, signal: &dyn TimelineSignal, value: u64) -> &mut Self {
191        self.inner.signal_dyn(signal, value);
192        self
193    }
194
195    /// Execute a program.
196    pub fn exec(&mut self, program: Arc<dyn Program>, buffers: &[&Buffer], params: &ExecParams) -> &mut Self {
197        self.inner.exec_dyn(program, buffers, params);
198        self
199    }
200
201    /// Copy between buffers.
202    pub fn copy(&mut self, dst: &Buffer, src: &Buffer) -> &mut Self {
203        self.inner.copy_dyn(dst, src);
204        self
205    }
206
207    /// Insert memory barrier.
208    pub fn memory_barrier(&mut self) -> &mut Self {
209        self.inner.memory_barrier_dyn();
210        self
211    }
212
213    /// Submit to hardware.
214    pub fn submit(&mut self) -> Result<()> {
215        self.inner.submit_dyn()
216    }
217
218    /// Get the device.
219    pub fn device(&self) -> &DeviceSpec {
220        self.inner.device()
221    }
222}
223
224/// Internal trait for type erasure.
225trait DynQueueInner: Send + std::fmt::Debug {
226    fn wait_dyn(&mut self, signal: &dyn TimelineSignal, value: u64);
227    fn signal_dyn(&mut self, signal: &dyn TimelineSignal, value: u64);
228    fn exec_dyn(&mut self, program: Arc<dyn Program>, buffers: &[&Buffer], params: &ExecParams);
229    fn copy_dyn(&mut self, dst: &Buffer, src: &Buffer);
230    fn memory_barrier_dyn(&mut self);
231    fn submit_dyn(&mut self) -> Result<()>;
232    fn device(&self) -> &DeviceSpec;
233}
234
235/// Wrapper for concrete queue types.
236struct DynQueueWrapper<Q: HardwareQueue> {
237    queue: Q,
238    _phantom: std::marker::PhantomData<Q::Signal>,
239}
240
241impl<Q: HardwareQueue> std::fmt::Debug for DynQueueWrapper<Q> {
242    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243        f.debug_struct("DynQueueWrapper").field("queue", &self.queue).finish()
244    }
245}
246
247impl<Q: HardwareQueue + 'static> DynQueueInner for DynQueueWrapper<Q>
248where
249    Q::Signal: 'static,
250{
251    fn wait_dyn(&mut self, signal: &dyn TimelineSignal, value: u64) {
252        let signal = signal.as_any().downcast_ref::<Q::Signal>().unwrap_or_else(|| {
253            panic!("DynQueue wait signal type mismatch: queue device {:?}, signal {:?}", self.queue.device(), signal)
254        });
255        self.queue.wait(signal, value);
256    }
257
258    fn signal_dyn(&mut self, signal: &dyn TimelineSignal, value: u64) {
259        let signal = signal.as_any().downcast_ref::<Q::Signal>().unwrap_or_else(|| {
260            panic!("DynQueue signal type mismatch: queue device {:?}, signal {:?}", self.queue.device(), signal)
261        });
262        self.queue.signal(signal, value);
263    }
264
265    fn exec_dyn(&mut self, program: Arc<dyn Program>, buffers: &[&Buffer], params: &ExecParams) {
266        self.queue.exec(program, buffers, params);
267    }
268
269    fn copy_dyn(&mut self, dst: &Buffer, src: &Buffer) {
270        self.queue.copy(dst, src);
271    }
272
273    fn memory_barrier_dyn(&mut self) {
274        self.queue.memory_barrier();
275    }
276
277    fn submit_dyn(&mut self) -> Result<()> {
278        self.queue.submit()
279    }
280
281    fn device(&self) -> &DeviceSpec {
282        self.queue.device()
283    }
284}