use std::sync::Arc;
use svod_device::device::Program as DeviceProgram;
use svod_device::{Buffer, CpuTimelineSignal, ExecParams, HardwareQueue, TimelineSignal};
use svod_dtype::DeviceSpec;
use crate::error::Result;
/// A single command recorded by `CpuQueue` and executed later, in recording
/// order, when the queue is submitted.
///
/// Buffer addresses are captured as raw pointers at record time, so nothing in
/// this type keeps the underlying buffers alive.
/// NOTE(review): callers presumably must keep every referenced `Buffer` alive
/// (and not reallocated) until `submit` returns — confirm against callers.
// NOTE(review): every variant is constructed and every field is read elsewhere
// in this file, so the reason for `allow(dead_code)` is not visible here —
// confirm whether it is still needed.
#[allow(dead_code)] enum PendingOp {
    /// Wait on `signal` for `value` via `CpuTimelineSignal::wait`.
    Wait { signal: CpuTimelineSignal, value: u64 },
    /// Set `signal` to `value`.
    Signal { signal: CpuTimelineSignal, value: u64 },
    /// Run `program` over the captured buffer pointers and scalar arguments.
    Exec {
        program: Arc<dyn DeviceProgram>,
        /// Raw addresses of the bound buffers, in the order given to `exec`.
        buffer_ptrs: Vec<*mut u8>,
        /// Scalar kernel arguments, cloned from `ExecParams::vals`.
        vals: Vec<i64>,
        global_size: Option<[usize; 3]>,
        local_size: Option<[usize; 3]>,
    },
    /// Copy `size` bytes from `src_ptr` to `dst_ptr`.
    Copy { dst_ptr: *mut u8, src_ptr: *const u8, size: usize },
    /// Issue a sequentially-consistent atomic fence.
    MemoryBarrier,
}
// SAFETY: the raw buffer pointers in `Exec` and `Copy` make this type `!Send`
// automatically. Moving such a pointer to another thread does not dereference
// it; dereferencing happens only in the queue's execution path.
// NOTE(review): this impl also overrides any `!Send` coming from
// `Arc<dyn DeviceProgram>` or `CpuTimelineSignal`. Whether `DeviceProgram` is
// bounded by `Send + Sync` is not visible here — confirm, otherwise this impl
// is unsound.
unsafe impl Send for PendingOp {}
/// In-order command queue for the CPU device.
///
/// Commands recorded through the `HardwareQueue` methods are buffered in
/// `pending` and only run on `submit`. The recording methods return `&mut Self`
/// rather than a `Result`. Problems found while recording (allocation failures,
/// size mismatches) are therefore stored in `errors` and reported by the next
/// `submit`.
pub struct CpuQueue {
    /// Recorded commands, in the order they were recorded.
    pending: Vec<PendingOp>,
    /// Messages for commands that could not be recorded.
    errors: Vec<String>,
    /// Device this queue reports through `HardwareQueue::device`.
    device: DeviceSpec,
}
impl std::fmt::Debug for CpuQueue {
    /// Prints counts rather than the commands themselves, which hold trait
    /// objects and raw pointers.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { pending, errors, device } = self;
        let mut out = f.debug_struct("CpuQueue");
        out.field("pending_count", &pending.len());
        out.field("error_count", &errors.len());
        out.field("device", device);
        out.finish()
    }
}
impl CpuQueue {
    /// Creates an empty queue (no recorded commands, no errors) targeting
    /// `DeviceSpec::Cpu`.
    pub fn new() -> Self {
        Self { pending: Vec::new(), errors: Vec::new(), device: DeviceSpec::Cpu }
    }

    /// Runs a single recorded command immediately.
    ///
    /// # Errors
    ///
    /// Returns `crate::Error::Device` wrapping the underlying error when
    /// `CpuTimelineSignal::wait` or `DeviceProgram::execute` fails.
    /// `Signal`, `Copy` and `MemoryBarrier` commands never fail.
    fn execute_op(op: PendingOp) -> Result<()> {
        match op {
            PendingOp::Wait { signal, value } => {
                // NOTE(review): the meaning of the second argument (`0`, possibly a
                // timeout) is defined by `CpuTimelineSignal::wait` — confirm.
                signal.wait(value, 0).map_err(|e| crate::Error::Device { source: e })?;
            }
            PendingOp::Signal { signal, value } => {
                signal.set(value);
            }
            PendingOp::Exec { program, buffer_ptrs, vals, global_size, local_size } => {
                // SAFETY: `buffer_ptrs` were captured from buffers that had been
                // allocated when the command was recorded. The caller must keep them
                // alive until `submit` returns.
                // NOTE(review): confirm the full contract of `DeviceProgram::execute`.
                unsafe {
                    program
                        .execute(&buffer_ptrs, &vals, global_size, local_size)
                        .map_err(|e| crate::Error::Device { source: e })?;
                }
            }
            PendingOp::Copy { dst_ptr, src_ptr, size } => {
                // Use `ptr::copy` (memmove semantics), not `copy_nonoverlapping`.
                // Nothing prevents `src` and `dst` from being the same buffer, and
                // `copy_nonoverlapping` on overlapping ranges is undefined behavior.
                // SAFETY: both pointers come from buffers that were allocated when
                // the command was recorded, and whose sizes were checked equal to
                // `size`. The caller must keep them alive until `submit` returns.
                unsafe {
                    std::ptr::copy(src_ptr, dst_ptr, size);
                }
            }
            PendingOp::MemoryBarrier => {
                std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);
            }
        }
        Ok(())
    }
}
impl Default for CpuQueue {
fn default() -> Self {
Self::new()
}
}
impl HardwareQueue for CpuQueue {
type Signal = CpuTimelineSignal;
fn wait(&mut self, signal: &Self::Signal, value: u64) -> &mut Self {
self.pending.push(PendingOp::Wait { signal: signal.clone(), value });
self
}
fn signal(&mut self, signal: &Self::Signal, value: u64) -> &mut Self {
self.pending.push(PendingOp::Signal { signal: signal.clone(), value });
self
}
fn exec(&mut self, program: Arc<dyn DeviceProgram>, buffers: &[&Buffer], params: &ExecParams) -> &mut Self {
let mut buffer_ptrs = Vec::with_capacity(buffers.len());
for buffer in buffers {
if let Err(err) = buffer.ensure_allocated() {
self.errors.push(format!("CPU queue exec buffer allocation failed: {err}"));
return self;
}
buffer_ptrs.push(unsafe { buffer.as_raw_ptr() });
}
self.pending.push(PendingOp::Exec {
program,
buffer_ptrs,
vals: params.vals.clone(),
global_size: Some(params.global_size),
local_size: params.local_size,
});
self
}
fn copy(&mut self, dst: &Buffer, src: &Buffer) -> &mut Self {
if src.size() != dst.size() {
self.errors.push(format!(
"CPU queue copy size mismatch: src={} bytes, dst={} bytes",
src.size(),
dst.size()
));
return self;
}
if let Err(err) = dst.ensure_allocated() {
self.errors.push(format!("CPU queue copy dst allocation failed: {err}"));
return self;
}
if let Err(err) = src.ensure_allocated() {
self.errors.push(format!("CPU queue copy src allocation failed: {err}"));
return self;
}
let dst_ptr = unsafe { dst.as_raw_ptr() };
let src_ptr = unsafe { src.as_raw_ptr() as *const u8 };
let size = src.size();
self.pending.push(PendingOp::Copy { dst_ptr, src_ptr, size });
self
}
fn memory_barrier(&mut self) -> &mut Self {
self.pending.push(PendingOp::MemoryBarrier);
self
}
fn submit(&mut self) -> svod_device::Result<()> {
if !self.errors.is_empty() {
let errors = std::mem::take(&mut self.errors);
self.pending.clear();
return Err(svod_device::Error::Runtime { message: errors.join("; ") });
}
let ops = std::mem::take(&mut self.pending);
for op in ops {
Self::execute_op(op).map_err(|e| svod_device::Error::Runtime { message: e.to_string() })?;
}
Ok(())
}
fn device(&self) -> &DeviceSpec {
&self.device
}
}
// Unit tests for this module are kept in a separate file, loaded via the
// `#[path]` attribute below (path is relative to this source file).
#[cfg(test)]
#[path = "../test/unit/cpu_queue.rs"]
mod tests;