use std::sync::atomic::{fence, Ordering};
use crate::PipelineError;
use super::super::protocol::slot;
use super::helpers::queue_word_index;
use super::{io_op, io_status, io_word, IoCompletion, IO_SLOT_COUNT, IO_SLOT_WORDS};
#[derive(Debug, Clone)]
pub struct MegakernelIoQueue {
words: Vec<u32>,
slot_count: u32,
}
impl MegakernelIoQueue {
pub fn new(slot_count: u32) -> Result<Self, PipelineError> {
if slot_count == 0 {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "MegakernelIoQueue requires at least one slot",
});
}
if slot_count > IO_SLOT_COUNT {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "MegakernelIoQueue exceeds the compiled IO poll window of 64 slots; enlarge IO_SLOT_COUNT and rebuild the megakernel before publishing more than 64 completions",
});
}
let word_count = slot_count
.checked_mul(IO_SLOT_WORDS)
.ok_or(PipelineError::QueueFull {
queue: "submission",
fix: "io_queue word count overflows u32; shard the queue before allocating",
})?;
let word_count = usize::try_from(word_count).map_err(|error| {
PipelineError::Backend(format!(
"io_queue word count cannot fit host usize: {error}. Fix: shard the queue before allocating."
))
})?;
Ok(Self {
words: vec![0; word_count],
slot_count,
})
}
#[must_use]
pub fn as_bytes(&self) -> &[u8] {
bytemuck::cast_slice(&self.words)
}
#[must_use]
pub fn as_mut_bytes(&mut self) -> &mut [u8] {
bytemuck::cast_slice_mut(&mut self.words)
}
#[must_use]
pub fn slot_count(&self) -> u32 {
self.slot_count
}
pub fn publish_slot(
&mut self,
queue_slot: u32,
mapped_slot: u32,
byte_count: u32,
tag: u32,
) -> Result<(), PipelineError> {
if queue_slot >= self.slot_count {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "io_queue slot exceeds MegakernelIoQueue::slot_count; enlarge the queue or publish into a valid slot id",
});
}
let current_status = self.read_word(queue_slot, io_word::STATUS);
if current_status != slot::EMPTY {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "io_queue slot still in flight; wait for the GPU to recycle it before publishing again",
});
}
self.write_word_unfenced(queue_slot, io_word::OP_TYPE, io_op::READ);
self.write_word_unfenced(queue_slot, io_word::SRC_HANDLE, 0);
self.write_word_unfenced(queue_slot, io_word::DST_HANDLE, mapped_slot);
self.write_word_unfenced(queue_slot, io_word::OFFSET_LO, 0);
self.write_word_unfenced(queue_slot, io_word::OFFSET_HI, 0);
self.write_word_unfenced(queue_slot, io_word::BYTE_COUNT, byte_count);
self.write_word_unfenced(queue_slot, io_word::TAG, tag);
fence(Ordering::Release);
self.write_word_unfenced(queue_slot, io_word::STATUS, slot::PUBLISHED);
fence(Ordering::Release);
Ok(())
}
pub fn submit_dma_read(
&mut self,
queue_slot: u32,
src_handle: u32,
dst_handle: u32,
byte_count: u32,
tag: u32,
) -> Result<(), PipelineError> {
if queue_slot >= self.slot_count {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "io_queue slot exceeds MegakernelIoQueue::slot_count; enlarge the queue or submit into a valid slot id",
});
}
let current_status = self.read_word(queue_slot, io_word::STATUS);
if current_status != slot::EMPTY {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "io_queue slot still in flight; wait for completion before submitting a new request",
});
}
self.write_word_unfenced(queue_slot, io_word::OP_TYPE, io_op::READ);
self.write_word_unfenced(queue_slot, io_word::SRC_HANDLE, src_handle);
self.write_word_unfenced(queue_slot, io_word::DST_HANDLE, dst_handle);
self.write_word_unfenced(queue_slot, io_word::OFFSET_LO, 0);
self.write_word_unfenced(queue_slot, io_word::OFFSET_HI, 0);
self.write_word_unfenced(queue_slot, io_word::BYTE_COUNT, byte_count);
self.write_word_unfenced(queue_slot, io_word::TAG, tag);
fence(Ordering::Release);
self.write_word_unfenced(queue_slot, io_word::STATUS, slot::PUBLISHED);
fence(Ordering::Release);
Ok(())
}
#[must_use]
pub fn completion(&self, queue_slot: u32) -> Option<IoCompletion> {
if queue_slot >= self.slot_count {
return None;
}
let status = self.read_word(queue_slot, io_word::STATUS);
if status == slot::EMPTY {
return None;
}
Some(IoCompletion {
slot_idx: queue_slot,
mapped_slot: self.read_word_unfenced(queue_slot, io_word::DST_HANDLE),
byte_count: self.read_word_unfenced(queue_slot, io_word::BYTE_COUNT),
tag: self.read_word_unfenced(queue_slot, io_word::TAG),
})
}
#[must_use]
pub fn is_recycled(&self, queue_slot: u32) -> bool {
if queue_slot >= self.slot_count {
return false;
}
let status = self.read_word(queue_slot, io_word::STATUS);
match status {
slot::EMPTY => true,
slot::PUBLISHED | slot::CLAIMED | io_status::OK | io_status::ERROR | slot::DONE => {
false
}
_ => false,
}
}
fn read_word(&self, slot_idx: u32, word: u32) -> u32 {
let idx = queue_word_index(slot_idx, word);
fence(Ordering::Acquire);
self.words[idx]
}
fn read_word_unfenced(&self, slot_idx: u32, word: u32) -> u32 {
let idx = queue_word_index(slot_idx, word);
self.words[idx]
}
fn write_word_unfenced(&mut self, slot_idx: u32, word: u32, value: u32) {
let idx = queue_word_index(slot_idx, word);
self.words[idx] = value;
}
}