Skip to main content

svod_runtime/devices/
cpu_queue.rs

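//! CPU hardware queue implementation.
//!
//! The `CpuQueue` records kernel executions, buffer copies, signal waits/sets and memory
//! barriers, then runs them serially, in recording order, on the thread that calls
//! `submit()`. Parallelism across kernels is handled by `ExecutionPlan`, not by this queue.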
5
6use std::sync::Arc;
7
8use svod_device::device::Program as DeviceProgram;
9use svod_device::{Buffer, CpuTimelineSignal, ExecParams, HardwareQueue, TimelineSignal};
10use svod_dtype::DeviceSpec;
11
12use crate::error::Result;
13
14/// Pending operation in the CPU queue.
15#[allow(dead_code)] // Will be used when batching is implemented
16enum PendingOp {
17    /// Wait for a signal to reach a value.
18    Wait { signal: CpuTimelineSignal, value: u64 },
19    /// Signal a value.
20    Signal { signal: CpuTimelineSignal, value: u64 },
21    /// Execute a program.
22    Exec {
23        program: Arc<dyn DeviceProgram>,
24        buffer_ptrs: Vec<*mut u8>,
25        vals: Vec<i64>,
26        global_size: Option<[usize; 3]>,
27        local_size: Option<[usize; 3]>,
28    },
29    /// Copy between buffers.
30    Copy { dst_ptr: *mut u8, src_ptr: *const u8, size: usize },
31    /// Memory barrier (no-op on CPU, for API compatibility).
32    MemoryBarrier,
33}
34
35// SAFETY: Buffer pointers are only used during submit() which is single-threaded.
36// The scheduler ensures exclusive access to buffers during execution.
37unsafe impl Send for PendingOp {}
38
39/// CPU command queue using rayon for parallel execution.
40///
41/// Operations are batched and submitted to rayon's thread pool.
42/// The queue itself is not thread-safe - use one per thread.
43pub struct CpuQueue {
44    /// Pending operations to execute on submit.
45    pending: Vec<PendingOp>,
46    /// Deferred builder-style errors surfaced by submit().
47    errors: Vec<String>,
48    /// Device specification.
49    device: DeviceSpec,
50}
51
52impl std::fmt::Debug for CpuQueue {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        f.debug_struct("CpuQueue")
55            .field("pending_count", &self.pending.len())
56            .field("error_count", &self.errors.len())
57            .field("device", &self.device)
58            .finish()
59    }
60}
61
62impl CpuQueue {
63    /// Create a new CPU queue.
64    pub fn new() -> Self {
65        Self { pending: Vec::new(), errors: Vec::new(), device: DeviceSpec::Cpu }
66    }
67
68    /// Execute a single pending operation.
69    fn execute_op(op: PendingOp) -> Result<()> {
70        match op {
71            PendingOp::Wait { signal, value } => {
72                signal.wait(value, 0).map_err(|e| crate::Error::Device { source: e })?;
73            }
74            PendingOp::Signal { signal, value } => {
75                signal.set(value);
76            }
77            PendingOp::Exec { program, buffer_ptrs, vals, global_size, local_size } => {
78                // SAFETY: Scheduler guarantees exclusive access during execution
79                unsafe {
80                    program
81                        .execute(&buffer_ptrs, &vals, global_size, local_size)
82                        .map_err(|e| crate::Error::Device { source: e })?;
83                }
84            }
85            PendingOp::Copy { dst_ptr, src_ptr, size } => {
86                // SAFETY: Scheduler guarantees exclusive access
87                unsafe {
88                    std::ptr::copy_nonoverlapping(src_ptr, dst_ptr, size);
89                }
90            }
91            PendingOp::MemoryBarrier => {
92                // CPU memory model is already coherent (no-op)
93                std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);
94            }
95        }
96        Ok(())
97    }
98}
99
100impl Default for CpuQueue {
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106impl HardwareQueue for CpuQueue {
107    type Signal = CpuTimelineSignal;
108
109    fn wait(&mut self, signal: &Self::Signal, value: u64) -> &mut Self {
110        self.pending.push(PendingOp::Wait { signal: signal.clone(), value });
111        self
112    }
113
114    fn signal(&mut self, signal: &Self::Signal, value: u64) -> &mut Self {
115        self.pending.push(PendingOp::Signal { signal: signal.clone(), value });
116        self
117    }
118
119    fn exec(&mut self, program: Arc<dyn DeviceProgram>, buffers: &[&Buffer], params: &ExecParams) -> &mut Self {
120        let mut buffer_ptrs = Vec::with_capacity(buffers.len());
121        for buffer in buffers {
122            if let Err(err) = buffer.ensure_allocated() {
123                self.errors.push(format!("CPU queue exec buffer allocation failed: {err}"));
124                return self;
125            }
126            // SAFETY: the buffer was just allocated and the queued op executes before
127            // submit() returns; scheduler ownership rules protect aliasing.
128            buffer_ptrs.push(unsafe { buffer.as_raw_ptr() });
129        }
130
131        self.pending.push(PendingOp::Exec {
132            program,
133            buffer_ptrs,
134            vals: params.vals.clone(),
135            global_size: Some(params.global_size),
136            local_size: params.local_size,
137        });
138
139        self
140    }
141
142    fn copy(&mut self, dst: &Buffer, src: &Buffer) -> &mut Self {
143        if src.size() != dst.size() {
144            self.errors.push(format!(
145                "CPU queue copy size mismatch: src={} bytes, dst={} bytes",
146                src.size(),
147                dst.size()
148            ));
149            return self;
150        }
151        if let Err(err) = dst.ensure_allocated() {
152            self.errors.push(format!("CPU queue copy dst allocation failed: {err}"));
153            return self;
154        }
155        if let Err(err) = src.ensure_allocated() {
156            self.errors.push(format!("CPU queue copy src allocation failed: {err}"));
157            return self;
158        }
159        // SAFETY: both buffers are allocated; the queued op executes before submit() returns
160        // and scheduler ownership rules protect aliasing.
161        let dst_ptr = unsafe { dst.as_raw_ptr() };
162        let src_ptr = unsafe { src.as_raw_ptr() as *const u8 };
163        let size = src.size();
164
165        self.pending.push(PendingOp::Copy { dst_ptr, src_ptr, size });
166        self
167    }
168
169    fn memory_barrier(&mut self) -> &mut Self {
170        self.pending.push(PendingOp::MemoryBarrier);
171        self
172    }
173
174    fn submit(&mut self) -> svod_device::Result<()> {
175        if !self.errors.is_empty() {
176            let errors = std::mem::take(&mut self.errors);
177            self.pending.clear();
178            return Err(svod_device::Error::Runtime { message: errors.join("; ") });
179        }
180
181        // CPU queue is intentionally serial; parallelism across kernels is handled by ExecutionPlan.
182        let ops = std::mem::take(&mut self.pending);
183
184        for op in ops {
185            Self::execute_op(op).map_err(|e| svod_device::Error::Runtime { message: e.to_string() })?;
186        }
187
188        Ok(())
189    }
190
191    fn device(&self) -> &DeviceSpec {
192        &self.device
193    }
194}
195
196#[cfg(test)]
197#[path = "../test/unit/cpu_queue.rs"]
198mod tests;