use super::staging_reserve::reserve_vec_capacity as reserve_descriptor_vec;
use crate::PipelineError;
use smallvec::SmallVec;
const ARGS_PER_SLOT_USIZE: usize = 12;
use super::{protocol, Megakernel};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BuiltinOpcode {
Nop,
StoreU32,
AtomicAdd,
LoadU32,
CompareSwap,
Memcpy,
DfaStep,
BatchFence,
Printf,
Shutdown,
}
impl BuiltinOpcode {
#[must_use]
pub const fn into_wire(self) -> u32 {
match self {
Self::Nop => protocol::opcode::NOP,
Self::StoreU32 => protocol::opcode::STORE_U32,
Self::AtomicAdd => protocol::opcode::ATOMIC_ADD,
Self::LoadU32 => protocol::opcode::LOAD_U32,
Self::CompareSwap => protocol::opcode::COMPARE_SWAP,
Self::Memcpy => protocol::opcode::MEMCPY,
Self::DfaStep => protocol::opcode::DFA_STEP,
Self::BatchFence => protocol::opcode::BATCH_FENCE,
Self::Printf => protocol::opcode::PRINTF,
Self::Shutdown => protocol::opcode::SHUTDOWN,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SlotOpcode {
Builtin(BuiltinOpcode),
Custom(u32),
}
impl SlotOpcode {
#[must_use]
pub const fn into_wire(self) -> u32 {
match self {
Self::Builtin(op) => op.into_wire(),
Self::Custom(op) => op,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PackedOpDescriptor {
pub opcode: u8,
pub args: Vec<u32>,
}
impl PackedOpDescriptor {
#[must_use]
pub fn new(opcode: u8, args: Vec<u32>) -> Self {
Self { opcode, args }
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SlotDescriptor {
Single {
tenant_id: u32,
opcode: SlotOpcode,
args: Vec<u32>,
},
Packed {
tenant_id: u32,
ops: Vec<PackedOpDescriptor>,
},
}
impl SlotDescriptor {
#[must_use]
pub fn single(tenant_id: u32, opcode: SlotOpcode, args: Vec<u32>) -> Self {
Self::Single {
tenant_id,
opcode,
args,
}
}
#[must_use]
pub fn packed(tenant_id: u32, ops: Vec<PackedOpDescriptor>) -> Self {
Self::Packed { tenant_id, ops }
}
pub fn publish_into(&self, ring_bytes: &mut [u8], slot_idx: u32) -> Result<(), PipelineError> {
match self {
Self::Single {
tenant_id,
opcode,
args,
} => {
Megakernel::publish_slot(ring_bytes, slot_idx, *tenant_id, opcode.into_wire(), args)
}
Self::Packed { tenant_id, ops } => {
Megakernel::publish_packed_descriptors(ring_bytes, slot_idx, *tenant_id, ops)
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchDescriptor {
pub start_slot: u32,
pub items: Vec<SlotDescriptor>,
}
impl BatchDescriptor {
#[must_use]
pub fn new(start_slot: u32, items: Vec<SlotDescriptor>) -> Self {
Self { start_slot, items }
}
pub fn publish_into(&self, ring_bytes: &mut [u8]) -> Result<u32, PipelineError> {
let item_count = u32::try_from(self.items.len()).map_err(|_| PipelineError::QueueFull {
queue: "submission",
fix: "batch size exceeds u32::MAX slots",
})?;
if item_count > 0 {
self.start_slot
.checked_add(item_count - 1)
.ok_or(PipelineError::QueueFull {
queue: "submission",
fix: "batch start plus item count overflows u32; split the descriptor batch before publishing",
})?;
}
for (slot_offset, item) in (0..item_count).zip(self.items.iter()) {
let slot_idx = self
.start_slot
.checked_add(slot_offset)
.ok_or(PipelineError::QueueFull {
queue: "submission",
fix:
"batch slot index overflowed u32; split the descriptor batch before publishing",
})?;
item.publish_into(ring_bytes, slot_idx)?;
}
Ok(item_count)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowClass {
Required,
Lookahead,
}
impl WindowClass {
#[must_use]
pub const fn into_wire(self) -> u32 {
match self {
Self::Required => 0,
Self::Lookahead => 1,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WindowDescriptor {
pub start_slot: u32,
pub tenant_id: u32,
pub opcode: SlotOpcode,
pub ticket: u32,
pub required: Vec<Vec<u32>>,
pub lookahead: Vec<Vec<u32>>,
}
impl WindowDescriptor {
#[must_use]
pub fn new(
start_slot: u32,
tenant_id: u32,
opcode: SlotOpcode,
ticket: u32,
required: Vec<Vec<u32>>,
lookahead: Vec<Vec<u32>>,
) -> Self {
Self {
start_slot,
tenant_id,
opcode,
ticket,
required,
lookahead,
}
}
#[must_use]
pub fn into_batch(&self) -> BatchDescriptor {
match self.try_into_batch() {
Ok(batch) => batch,
Err(_) => BatchDescriptor::new(self.start_slot, Vec::new()),
}
}
pub fn try_into_batch(&self) -> Result<BatchDescriptor, PipelineError> {
let item_count = self
.required
.len()
.checked_add(self.lookahead.len())
.ok_or(PipelineError::QueueFull {
queue: "submission",
fix:
"window item count overflowed usize; split the window before materializing a batch",
})?;
let mut items = Vec::new();
reserve_descriptor_vec(&mut items, item_count, "window batch item")?;
for payload in &self.required {
let mut args = window_payload_args(self.ticket, WindowClass::Required, payload)?;
args.push(self.ticket);
args.push(WindowClass::Required.into_wire());
args.extend(payload.iter().copied());
items.push(SlotDescriptor::single(self.tenant_id, self.opcode, args));
}
for payload in &self.lookahead {
let mut args = window_payload_args(self.ticket, WindowClass::Lookahead, payload)?;
args.push(self.ticket);
args.push(WindowClass::Lookahead.into_wire());
args.extend(payload.iter().copied());
items.push(SlotDescriptor::single(self.tenant_id, self.opcode, args));
}
Ok(BatchDescriptor::new(self.start_slot, items))
}
pub fn publish_into(&self, ring_bytes: &mut [u8]) -> Result<u32, PipelineError> {
let consumed = self
.required
.len()
.checked_add(self.lookahead.len())
.ok_or(PipelineError::QueueFull {
queue: "submission",
fix: "window item count overflowed usize; split the window before publishing",
})?;
let consumed_u32 = u32::try_from(consumed).map_err(|_| PipelineError::QueueFull {
queue: "submission",
fix: "window size exceeds u32::MAX slots; split the window before publishing",
})?;
if consumed_u32 == 0 {
return Ok(0);
}
self.start_slot
.checked_add(consumed_u32 - 1)
.ok_or(PipelineError::QueueFull {
queue: "submission",
fix: "window start plus item count overflows u32; split the window before publishing",
})?;
let mut slot_offset = 0u32;
let mut args = SmallVec::<[u32; ARGS_PER_SLOT_USIZE]>::new();
for payload in &self.required {
publish_window_payload(
ring_bytes,
self.start_slot,
&mut slot_offset,
self.tenant_id,
self.opcode,
self.ticket,
WindowClass::Required,
payload,
&mut args,
)?;
}
for payload in &self.lookahead {
publish_window_payload(
ring_bytes,
self.start_slot,
&mut slot_offset,
self.tenant_id,
self.opcode,
self.ticket,
WindowClass::Lookahead,
payload,
&mut args,
)?;
}
Ok(slot_offset)
}
}
fn window_payload_args(
_ticket: u32,
_class: WindowClass,
payload: &[u32],
) -> Result<Vec<u32>, PipelineError> {
let required_args = payload
.len()
.checked_add(2)
.ok_or(PipelineError::QueueFull {
queue: "submission",
fix: "window payload argument count overflowed usize; split the payload before materializing a batch",
})?;
if required_args > ARGS_PER_SLOT_USIZE {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "too many args for one window payload; ticket plus class plus payload must fit in 12 u32 args",
});
}
let mut args = Vec::new();
reserve_descriptor_vec(&mut args, required_args, "window payload arg")?;
Ok(args)
}
fn publish_window_payload(
ring_bytes: &mut [u8],
start_slot: u32,
slot_offset: &mut u32,
tenant_id: u32,
opcode: SlotOpcode,
ticket: u32,
class: WindowClass,
payload: &[u32],
args: &mut SmallVec<[u32; ARGS_PER_SLOT_USIZE]>,
) -> Result<(), PipelineError> {
let slot_idx = start_slot
.checked_add(*slot_offset)
.ok_or(PipelineError::QueueFull {
queue: "submission",
fix: "window slot index overflowed u32; split the window before publishing",
})?;
args.clear();
let required_args = payload
.len()
.checked_add(2)
.ok_or(PipelineError::QueueFull {
queue: "submission",
fix: "window payload argument count overflowed usize; split the payload before publishing",
})?;
if required_args > ARGS_PER_SLOT_USIZE {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "too many args for one window payload; ticket plus class plus payload must fit in 12 u32 args",
});
}
args.push(ticket);
args.push(class.into_wire());
args.extend_from_slice(payload);
Megakernel::publish_slot(ring_bytes, slot_idx, tenant_id, opcode.into_wire(), args)?;
*slot_offset = slot_offset.checked_add(1).ok_or(PipelineError::QueueFull {
queue: "submission",
fix: "window slot count overflowed u32; split the window before publishing",
})?;
Ok(())
}
#[cfg(test)]
mod tests;