use super::protocol::{self, ProtocolError};
/// Number of 32-bit words that make up one ring slot.
const SLOT_WORDS_USIZE: usize = 16;
/// Word index, within a slot, of the status word that `done_count` compares
/// against `protocol::slot::DONE`.
const STATUS_WORD_USIZE: usize = 0;
/// Size of one ring slot in bytes (`SLOT_WORDS_USIZE` words of 4 bytes each, i.e. 64).
pub const SLOT_BYTES: usize = SLOT_WORDS_USIZE * 4;
/// Write side of a slot ring: stores one encoded slot at a given slot index.
pub trait RingProducer {
/// Stores `encoded` into the slot at `slot_idx`.
///
/// NOTE(review): the `HostRing` implementation in this file requires
/// `encoded.len() == SLOT_BYTES` and `slot_idx < slot_count()`, returning a
/// `ProtocolError` otherwise; other implementors presumably follow the same
/// contract — confirm.
fn publish(&mut self, slot_idx: u32, encoded: &[u8]) -> Result<(), ProtocolError>;
/// Number of slots in the ring.
fn slot_count(&self) -> u32;
/// Static label identifying this producer (`HostRing` returns `"in-process-host"`).
fn name(&self) -> &'static str;
}
/// Read side of a slot ring: copies slots out and counts the finished ones.
pub trait RingConsumer {
    /// Copies the slot at `slot_idx` into `out`.
    ///
    /// # Errors
    ///
    /// Implementation-defined; the default `done_count` treats any error as
    /// "this slot is not done" and moves on.
    fn read_slot(&self, slot_idx: u32, out: &mut [u8]) -> Result<(), ProtocolError>;

    /// Counts the slots whose status word equals `protocol::slot::DONE`.
    ///
    /// Every slot in `0..slot_count()` is read through `read_slot` into one
    /// `SLOT_BYTES`-sized scratch buffer that is reused across slots. Slots
    /// whose read fails are skipped rather than reported.
    ///
    /// # Panics
    ///
    /// Panics if the running count would overflow `u32`.
    fn done_count(&self) -> u32 {
        let status_at = STATUS_WORD_USIZE * 4;
        // One buffer for the whole walk, exactly like a hand-written loop would use.
        let mut scratch = [0u8; SLOT_BYTES];
        (0..self.slot_count())
            .filter(|&idx| {
                if self.read_slot(idx, &mut scratch).is_err() {
                    return false;
                }
                // The status word is stored little-endian.
                let status = u32::from_le_bytes([
                    scratch[status_at],
                    scratch[status_at + 1],
                    scratch[status_at + 2],
                    scratch[status_at + 3],
                ]);
                status == protocol::slot::DONE
            })
            .fold(0u32, |finished, _| match finished.checked_add(1) {
                Some(next) => next,
                None => panic!(
                    "megakernel ring consumer done_count overflowed u32. Fix: shard the ring before host observation."
                ),
            })
    }

    /// Number of slots in the ring.
    fn slot_count(&self) -> u32;
}
/// Slot ring kept in host memory as a single contiguous byte buffer.
pub struct HostRing {
/// Backing bytes, created by `protocol::try_encode_empty_ring` in `new`;
/// `publish`/`read_slot` address slot `i` at byte offset `i * SLOT_BYTES`.
bytes: Vec<u8>,
/// Number of slots; `publish` and `read_slot` reject indices `>= slot_count`.
slot_count: u32,
}
impl HostRing {
    /// Creates a ring of `slot_count` slots whose backing bytes come from
    /// `protocol::try_encode_empty_ring`.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `protocol::try_encode_empty_ring`.
    pub fn new(slot_count: u32) -> Result<Self, ProtocolError> {
        Ok(Self {
            bytes: protocol::try_encode_empty_ring(slot_count)?,
            slot_count,
        })
    }

    /// Read-only view of the whole backing buffer.
    #[must_use]
    pub fn as_bytes(&self) -> &[u8] {
        self.bytes.as_slice()
    }

    /// Mutable view of the whole backing buffer; the length cannot change
    /// through a slice, only the contents.
    #[must_use]
    pub fn as_bytes_mut(&mut self) -> &mut [u8] {
        self.bytes.as_mut_slice()
    }
}
/// Byte offset of the first byte of slot `slot_idx` (`slot_idx * SLOT_BYTES`).
///
/// # Errors
///
/// Returns `ProtocolError::MissingWord` when `slot_idx` does not fit in
/// `usize`, or when the multiplication overflows `usize`.
fn ring_slot_base(slot_idx: u32) -> Result<usize, ProtocolError> {
    let slot = match usize::try_from(slot_idx) {
        Ok(slot) => slot,
        Err(_) => {
            return Err(ProtocolError::MissingWord {
                buffer: "ring slot",
                word_idx: usize::MAX,
                byte_len: 0,
                fix: "slot_idx cannot fit host usize; shard the megakernel ring before host access",
            });
        }
    };
    match slot.checked_mul(SLOT_BYTES) {
        Some(base) => Ok(base),
        None => Err(ProtocolError::MissingWord {
            buffer: "ring slot",
            word_idx: usize::MAX,
            byte_len: 0,
            fix: "slot byte offset overflowed usize; shard the megakernel ring before host access",
        }),
    }
}
/// Word index of the first word of slot `slot_idx` (`slot_idx * SLOT_WORDS_USIZE`).
///
/// # Errors
///
/// Returns `ProtocolError::MissingWord` when `slot_idx` does not fit in
/// `usize`, or when the multiplication overflows `usize`.
fn ring_slot_word_index(slot_idx: u32) -> Result<usize, ProtocolError> {
    // Both failure modes share every field except the remediation text.
    let unaddressable = |fix: &'static str| ProtocolError::MissingWord {
        buffer: "ring slot",
        word_idx: usize::MAX,
        byte_len: 0,
        fix,
    };
    let slot = usize::try_from(slot_idx).map_err(|_| {
        unaddressable(
            "slot_idx cannot fit host usize; shard the megakernel ring before host access",
        )
    })?;
    slot.checked_mul(SLOT_WORDS_USIZE).ok_or_else(|| {
        unaddressable(
            "slot word offset overflowed usize; shard the megakernel ring before host access",
        )
    })
}
impl RingProducer for HostRing {
    /// Copies `encoded` over the slot at `slot_idx`.
    ///
    /// # Errors
    ///
    /// * `ProtocolError::MisalignedByteLength` when `encoded` is not exactly
    ///   `SLOT_BYTES` long (checked first).
    /// * `ProtocolError::MissingWord` when `slot_idx >= slot_count`.
    /// * Any error from `ring_slot_word_index` / `ring_slot_base` while
    ///   computing offsets.
    fn publish(&mut self, slot_idx: u32, encoded: &[u8]) -> Result<(), ProtocolError> {
        match encoded.len() {
            SLOT_BYTES => {}
            byte_len => {
                return Err(ProtocolError::MisalignedByteLength {
                    buffer: "ring slot",
                    byte_len,
                    fix: "encoded slot must be exactly SLOT_BYTES (64) long",
                });
            }
        }
        if slot_idx < self.slot_count {
            let start = ring_slot_base(slot_idx)?;
            let end = start + SLOT_BYTES;
            self.bytes[start..end].copy_from_slice(encoded);
            return Ok(());
        }
        // Out of range: report the word index the caller tried to reach.
        let word_idx = ring_slot_word_index(slot_idx)?;
        Err(ProtocolError::MissingWord {
            buffer: "ring slot",
            word_idx,
            byte_len: self.bytes.len(),
            fix: "slot_idx must be < slot_count",
        })
    }

    /// Number of slots this ring was constructed with.
    fn slot_count(&self) -> u32 {
        self.slot_count
    }

    /// Fixed label for the in-process host ring.
    fn name(&self) -> &'static str {
        "in-process-host"
    }
}
impl RingConsumer for HostRing {
    /// Copies the slot at `slot_idx` into `out`.
    ///
    /// # Errors
    ///
    /// * `ProtocolError::MisalignedByteLength` when `out` is not exactly
    ///   `SLOT_BYTES` long (checked first).
    /// * `ProtocolError::MissingWord` when `slot_idx >= slot_count`.
    /// * Any error from `ring_slot_word_index` / `ring_slot_base` while
    ///   computing offsets.
    fn read_slot(&self, slot_idx: u32, out: &mut [u8]) -> Result<(), ProtocolError> {
        let out_len = out.len();
        if out_len != SLOT_BYTES {
            return Err(ProtocolError::MisalignedByteLength {
                buffer: "ring slot",
                byte_len: out_len,
                fix: "out slice must be exactly SLOT_BYTES (64) long",
            });
        }
        if slot_idx >= self.slot_count {
            let word_idx = ring_slot_word_index(slot_idx)?;
            return Err(ProtocolError::MissingWord {
                buffer: "ring slot",
                word_idx,
                byte_len: self.bytes.len(),
                fix: "slot_idx must be < slot_count",
            });
        }
        ring_slot_base(slot_idx)
            .map(|start| out.copy_from_slice(&self.bytes[start..start + SLOT_BYTES]))
    }

    /// Counts slots whose status word equals `protocol::slot::DONE`, reading
    /// the status words directly from the backing buffer instead of copying
    /// each slot through `read_slot`.
    ///
    /// # Panics
    ///
    /// Panics if `slot_count` does not fit in `usize`, if a status byte offset
    /// overflows `usize`, if the count overflows `u32`, or if the backing
    /// buffer is shorter than the offsets being read.
    fn done_count(&self) -> u32 {
        let total_slots = match usize::try_from(self.slot_count) {
            Ok(total) => total,
            Err(source) => panic!(
                "megakernel ring slot_count cannot fit usize: {source}. Fix: shard the ring before host observation."
            ),
        };
        let status_byte = STATUS_WORD_USIZE * 4;
        let mut finished: u32 = 0;
        for slot in 0..total_slots {
            let at = match slot
                .checked_mul(SLOT_BYTES)
                .and_then(|slot_base| slot_base.checked_add(status_byte))
            {
                Some(at) => at,
                None => panic!(
                    "megakernel ring status byte offset overflowed usize. Fix: shard the ring before host observation."
                ),
            };
            // The status word is stored little-endian.
            let status = u32::from_le_bytes([
                self.bytes[at],
                self.bytes[at + 1],
                self.bytes[at + 2],
                self.bytes[at + 3],
            ]);
            if status != protocol::slot::DONE {
                continue;
            }
            finished = match finished.checked_add(1) {
                Some(next) => next,
                None => panic!(
                    "megakernel ring done count overflowed u32. Fix: shard the ring before host observation."
                ),
            };
        }
        finished
    }

    /// Number of slots this ring was constructed with.
    fn slot_count(&self) -> u32 {
        self.slot_count
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Consumer that forwards only the *required* `RingConsumer` methods to a
    /// `HostRing`. Because it does not override `done_count`, calling
    /// `done_count` on it runs the trait's default body, which `HostRing`
    /// itself never does (it supplies its own `done_count`).
    struct DefaultOnly<'a>(&'a HostRing);

    impl RingConsumer for DefaultOnly<'_> {
        fn read_slot(&self, slot_idx: u32, out: &mut [u8]) -> Result<(), ProtocolError> {
            RingConsumer::read_slot(self.0, slot_idx, out)
        }

        fn slot_count(&self) -> u32 {
            // Fully qualified: `HostRing` has `slot_count` from both traits.
            RingConsumer::slot_count(self.0)
        }
    }

    /// Overwrites the status word of `slot` with `protocol::slot::DONE`.
    fn mark_done(ring: &mut HostRing, slot: usize) {
        let at = slot * SLOT_BYTES + STATUS_WORD_USIZE * 4;
        ring.as_bytes_mut()[at..at + 4].copy_from_slice(&protocol::slot::DONE.to_le_bytes());
    }

    #[test]
    fn host_ring_publishes_and_round_trips_a_load_miss() {
        let mut ring = HostRing::new(4).expect("Fix: ring constructs");
        let encoded = protocol::encode_load_miss(123, true);
        RingProducer::publish(&mut ring, 1, &encoded).expect("Fix: publish");
        let mut slot_bytes = [0u8; SLOT_BYTES];
        RingConsumer::read_slot(&ring, 1, &mut slot_bytes).expect("Fix: read_slot");
        assert_eq!(slot_bytes.as_slice(), encoded.as_slice());
        let decoded = protocol::decode_load_miss(ring.as_bytes(), 1);
        assert_eq!(decoded, Some((123, true)));
    }

    #[test]
    fn host_ring_rejects_out_of_range_slot() {
        let mut ring = HostRing::new(2).unwrap();
        let encoded = protocol::encode_load_miss(0, false);
        let err_hi = RingProducer::publish(&mut ring, 2, &encoded).expect_err("slot 2 OOB");
        assert!(
            err_hi.to_string().contains("slot") || err_hi.to_string().contains("range"),
            "OOB publish error: {err_hi}"
        );
        let err_max =
            RingProducer::publish(&mut ring, u32::MAX, &encoded).expect_err("slot MAX OOB");
        assert!(
            err_max.to_string().contains("slot") || err_max.to_string().contains("range"),
            "MAX slot publish error: {err_max}"
        );
        let mut buf = [0u8; SLOT_BYTES];
        let read_err = RingConsumer::read_slot(&ring, 2, &mut buf).expect_err("read OOB");
        assert!(
            read_err.to_string().contains("slot") || read_err.to_string().contains("range"),
            "OOB read error: {read_err}"
        );
    }

    #[test]
    fn host_ring_rejects_mis_sized_encoded() {
        let mut ring = HostRing::new(2).unwrap();
        let short = [0u8; SLOT_BYTES - 1];
        let short_pub = RingProducer::publish(&mut ring, 0, &short).expect_err("short publish");
        assert!(
            short_pub.to_string().contains("SLOT") || short_pub.to_string().contains("byte"),
            "short publish error: {short_pub}"
        );
        let long = [0u8; SLOT_BYTES + 1];
        let long_pub = RingProducer::publish(&mut ring, 0, &long).expect_err("long publish");
        assert!(
            long_pub.to_string().contains("SLOT") || long_pub.to_string().contains("byte"),
            "long publish error: {long_pub}"
        );
        let mut short_out = [0u8; SLOT_BYTES - 1];
        let short_read =
            RingConsumer::read_slot(&ring, 0, &mut short_out).expect_err("short read buffer");
        assert!(
            short_read.to_string().contains("SLOT") || short_read.to_string().contains("byte"),
            "short read error: {short_read}"
        );
    }

    /// Exercises the trait's default `done_count` through `DefaultOnly`, so
    /// the slot-by-slot `read_slot` walk is actually what gets tested.
    #[test]
    fn default_done_count_walks_the_ring() {
        let mut ring = HostRing::new(4).unwrap();
        assert_eq!(RingConsumer::done_count(&DefaultOnly(&ring)), 0);
        mark_done(&mut ring, 0);
        mark_done(&mut ring, 2);
        assert_eq!(RingConsumer::done_count(&DefaultOnly(&ring)), 2);
    }

    /// `HostRing`'s own `done_count` must report the same totals as the
    /// trait default it replaces.
    #[test]
    fn host_ring_done_count_override_matches_default() {
        let mut ring = HostRing::new(4).unwrap();
        assert_eq!(RingConsumer::done_count(&ring), 0);
        mark_done(&mut ring, 0);
        mark_done(&mut ring, 2);
        assert_eq!(RingConsumer::done_count(&ring), 2);
        assert_eq!(
            RingConsumer::done_count(&ring),
            RingConsumer::done_count(&DefaultOnly(&ring))
        );
    }
}