svod_runtime/devices/
cpu_queue.rs1use std::sync::Arc;
7
8use svod_device::device::Program as DeviceProgram;
9use svod_device::{Buffer, CpuTimelineSignal, ExecParams, HardwareQueue, TimelineSignal};
10use svod_dtype::DeviceSpec;
11
12use crate::error::Result;
13
/// An operation recorded by `CpuQueue` and executed, in recording order,
/// when the queue is submitted.
///
/// The pointer-carrying variants hold raw pointers into buffer storage that
/// were captured at record time; the enum does not keep the owning buffers
/// alive.
// NOTE(review): every variant is constructed and every field is read in this
// file, so the reason for this `allow` is not visible here — confirm whether
// it is still needed.
#[allow(dead_code)]
enum PendingOp {
    /// Wait (via `CpuTimelineSignal::wait`) until `signal` reaches `value`.
    Wait { signal: CpuTimelineSignal, value: u64 },
    /// Set `signal` to `value`.
    Signal { signal: CpuTimelineSignal, value: u64 },
    /// Launch `program` with raw buffer pointers and scalar arguments.
    Exec {
        /// The compiled program to run.
        program: Arc<dyn DeviceProgram>,
        /// Raw data pointers of the argument buffers, in argument order.
        buffer_ptrs: Vec<*mut u8>,
        /// Scalar (integer) arguments, copied from `ExecParams::vals`.
        vals: Vec<i64>,
        /// Launch grid size; `exec` in this file always records `Some`.
        global_size: Option<[usize; 3]>,
        /// Optional local (work-group) size, passed through from `ExecParams`.
        local_size: Option<[usize; 3]>,
    },
    /// Copy `size` bytes from `src_ptr` into `dst_ptr`.
    Copy { dst_ptr: *mut u8, src_ptr: *const u8, size: usize },
    /// Issue a sequentially-consistent atomic fence.
    MemoryBarrier,
}
34
// SAFETY(review): `PendingOp` stores raw pointers (`*mut u8` / `*const u8`),
// which are `!Send`, so `Send` is asserted manually here. Soundness depends on
// (a) the buffers behind those pointers staying alive and not being accessed
// concurrently until the queue is submitted, and (b) `Arc<dyn DeviceProgram>`
// being safe to move across threads. Neither is enforced in this file —
// confirm against the `Buffer` and `DeviceProgram` definitions.
unsafe impl Send for PendingOp {}
38
/// A `HardwareQueue` for the CPU that records operations and runs them
/// synchronously, in order, on the thread that calls `submit`.
pub struct CpuQueue {
    /// Operations recorded since the last `submit`.
    pending: Vec<PendingOp>,
    /// Problems detected while recording (size mismatches, allocation
    /// failures); `submit` reports them joined with "; " and clears the queue.
    errors: Vec<String>,
    /// Device this queue targets; `new` always sets `DeviceSpec::Cpu`.
    device: DeviceSpec,
}
51
52impl std::fmt::Debug for CpuQueue {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 f.debug_struct("CpuQueue")
55 .field("pending_count", &self.pending.len())
56 .field("error_count", &self.errors.len())
57 .field("device", &self.device)
58 .finish()
59 }
60}
61
62impl CpuQueue {
63 pub fn new() -> Self {
65 Self { pending: Vec::new(), errors: Vec::new(), device: DeviceSpec::Cpu }
66 }
67
68 fn execute_op(op: PendingOp) -> Result<()> {
70 match op {
71 PendingOp::Wait { signal, value } => {
72 signal.wait(value, 0).map_err(|e| crate::Error::Device { source: e })?;
73 }
74 PendingOp::Signal { signal, value } => {
75 signal.set(value);
76 }
77 PendingOp::Exec { program, buffer_ptrs, vals, global_size, local_size } => {
78 unsafe {
80 program
81 .execute(&buffer_ptrs, &vals, global_size, local_size)
82 .map_err(|e| crate::Error::Device { source: e })?;
83 }
84 }
85 PendingOp::Copy { dst_ptr, src_ptr, size } => {
86 unsafe {
88 std::ptr::copy_nonoverlapping(src_ptr, dst_ptr, size);
89 }
90 }
91 PendingOp::MemoryBarrier => {
92 std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);
94 }
95 }
96 Ok(())
97 }
98}
99
100impl Default for CpuQueue {
101 fn default() -> Self {
102 Self::new()
103 }
104}
105
106impl HardwareQueue for CpuQueue {
107 type Signal = CpuTimelineSignal;
108
109 fn wait(&mut self, signal: &Self::Signal, value: u64) -> &mut Self {
110 self.pending.push(PendingOp::Wait { signal: signal.clone(), value });
111 self
112 }
113
114 fn signal(&mut self, signal: &Self::Signal, value: u64) -> &mut Self {
115 self.pending.push(PendingOp::Signal { signal: signal.clone(), value });
116 self
117 }
118
119 fn exec(&mut self, program: Arc<dyn DeviceProgram>, buffers: &[&Buffer], params: &ExecParams) -> &mut Self {
120 let mut buffer_ptrs = Vec::with_capacity(buffers.len());
121 for buffer in buffers {
122 if let Err(err) = buffer.ensure_allocated() {
123 self.errors.push(format!("CPU queue exec buffer allocation failed: {err}"));
124 return self;
125 }
126 buffer_ptrs.push(unsafe { buffer.as_raw_ptr() });
129 }
130
131 self.pending.push(PendingOp::Exec {
132 program,
133 buffer_ptrs,
134 vals: params.vals.clone(),
135 global_size: Some(params.global_size),
136 local_size: params.local_size,
137 });
138
139 self
140 }
141
142 fn copy(&mut self, dst: &Buffer, src: &Buffer) -> &mut Self {
143 if src.size() != dst.size() {
144 self.errors.push(format!(
145 "CPU queue copy size mismatch: src={} bytes, dst={} bytes",
146 src.size(),
147 dst.size()
148 ));
149 return self;
150 }
151 if let Err(err) = dst.ensure_allocated() {
152 self.errors.push(format!("CPU queue copy dst allocation failed: {err}"));
153 return self;
154 }
155 if let Err(err) = src.ensure_allocated() {
156 self.errors.push(format!("CPU queue copy src allocation failed: {err}"));
157 return self;
158 }
159 let dst_ptr = unsafe { dst.as_raw_ptr() };
162 let src_ptr = unsafe { src.as_raw_ptr() as *const u8 };
163 let size = src.size();
164
165 self.pending.push(PendingOp::Copy { dst_ptr, src_ptr, size });
166 self
167 }
168
169 fn memory_barrier(&mut self) -> &mut Self {
170 self.pending.push(PendingOp::MemoryBarrier);
171 self
172 }
173
174 fn submit(&mut self) -> svod_device::Result<()> {
175 if !self.errors.is_empty() {
176 let errors = std::mem::take(&mut self.errors);
177 self.pending.clear();
178 return Err(svod_device::Error::Runtime { message: errors.join("; ") });
179 }
180
181 let ops = std::mem::take(&mut self.pending);
183
184 for op in ops {
185 Self::execute_op(op).map_err(|e| svod_device::Error::Runtime { message: e.to_string() })?;
186 }
187
188 Ok(())
189 }
190
191 fn device(&self) -> &DeviceSpec {
192 &self.device
193 }
194}
195
// Unit tests for this module are compiled only under `cfg(test)` and are loaded
// from `../test/unit/cpu_queue.rs`, resolved relative to this source file's
// directory via `#[path]`.
#[cfg(test)]
#[path = "../test/unit/cpu_queue.rs"]
mod tests;