use super::io;
use super::protocol;
use super::protocol_api::{validate_control_bytes, validate_debug_log_bytes};
use crate::PipelineError;
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MegakernelReadback {
pub control_bytes: Vec<u8>,
pub ring_bytes: Vec<u8>,
pub debug_log_bytes: Vec<u8>,
pub io_queue_bytes: Vec<u8>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct MegakernelReadbackCounters {
pub control_bytes: usize,
pub ring_bytes: usize,
pub debug_log_bytes: usize,
pub io_queue_bytes: usize,
pub total_bytes: usize,
}
impl MegakernelReadback {
pub fn from_outputs(outputs: Vec<Vec<u8>>, slot_count: u32) -> Result<Self, PipelineError> {
Self::validate_output_refs(&outputs, slot_count)?;
let [control, ring, debug_log, io_queue] =
<[Vec<u8>; 4]>::try_from(outputs).map_err(|outputs| {
PipelineError::Backend(format!(
"megakernel readback returned {} buffers after validation, expected 4. Fix: keep output ownership immutable between validation and decode.",
outputs.len()
))
})?;
Ok(Self {
control_bytes: control,
ring_bytes: ring,
debug_log_bytes: debug_log,
io_queue_bytes: io_queue,
})
}
pub fn from_outputs_into(
mut outputs: Vec<Vec<u8>>,
slot_count: u32,
out: &mut Self,
) -> Result<(), PipelineError> {
Self::drain_outputs_into(&mut outputs, slot_count, out)
}
pub fn drain_outputs_into(
outputs: &mut Vec<Vec<u8>>,
slot_count: u32,
out: &mut Self,
) -> Result<(), PipelineError> {
Self::validate_output_refs(outputs, slot_count)?;
if outputs.len() != 4 {
return Err(PipelineError::Backend(format!(
"megakernel readback returned {} buffers after validation, expected 4. Fix: keep output ownership immutable during drain.",
outputs.len()
)));
}
std::mem::swap(&mut out.control_bytes, &mut outputs[0]);
std::mem::swap(&mut out.ring_bytes, &mut outputs[1]);
std::mem::swap(&mut out.debug_log_bytes, &mut outputs[2]);
std::mem::swap(&mut out.io_queue_bytes, &mut outputs[3]);
Ok(())
}
pub fn slot_count(&self) -> Result<u32, PipelineError> {
let slot_words = usize::try_from(protocol::SLOT_WORDS).map_err(|_| {
PipelineError::Backend(
"megakernel SLOT_WORDS overflowed usize. Fix: reduce SLOT_WORDS.".to_string(),
)
})?;
let slot_bytes = slot_words
.checked_mul(std::mem::size_of::<u32>())
.ok_or_else(|| {
PipelineError::Backend(
"megakernel slot byte width overflowed usize. Fix: reduce SLOT_WORDS."
.to_string(),
)
})?;
if self.ring_bytes.len() % slot_bytes != 0 {
return Err(PipelineError::Backend(format!(
"megakernel readback ring has {} bytes, not a multiple of {slot_bytes}. Fix: rebuild the ring with Megakernel::encode_empty_ring.",
self.ring_bytes.len()
)));
}
u32::try_from(self.ring_bytes.len() / slot_bytes).map_err(|_| {
PipelineError::Backend(
"megakernel readback slot count overflowed u32. Fix: split the ring into smaller shards."
.to_string(),
)
})
}
#[must_use]
pub fn counters(&self) -> MegakernelReadbackCounters {
let control_bytes = self.control_bytes.len();
let ring_bytes = self.ring_bytes.len();
let debug_log_bytes = self.debug_log_bytes.len();
let io_queue_bytes = self.io_queue_bytes.len();
MegakernelReadbackCounters {
control_bytes,
ring_bytes,
debug_log_bytes,
io_queue_bytes,
total_bytes: checked_add_usize(
checked_add_usize(
checked_add_usize(control_bytes, ring_bytes, "megakernel readback total bytes"),
debug_log_bytes,
"megakernel readback total bytes",
),
io_queue_bytes,
"megakernel readback total bytes",
),
}
}
fn validate_output_refs(outputs: &[Vec<u8>], slot_count: u32) -> Result<(), PipelineError> {
let [control, ring, debug_log, io_queue] = outputs else {
return Err(PipelineError::Backend(format!(
"megakernel readback returned {} buffers, expected 4. Fix: keep builder output declarations aligned with control/ring/debug/io ABI order.",
outputs.len()
)));
};
validate_control_bytes(control)?;
validate_debug_log_bytes(debug_log)?;
io::validate_io_queue_bytes(io_queue)?;
let expected_ring_bytes = protocol::ring_byte_len(slot_count).ok_or_else(|| {
PipelineError::Backend(
"megakernel ring byte length overflowed usize during readback validation. Fix: split the ring into smaller shards."
.to_string(),
)
})?;
if ring.len() != expected_ring_bytes {
return Err(PipelineError::Backend(format!(
"megakernel readback ring has {} bytes, expected {expected_ring_bytes}. Fix: read back the full ring buffer for the compiled slot count.",
ring.len()
)));
}
Ok(())
}
}
fn checked_add_usize(left: usize, right: usize, label: &str) -> usize {
let _ = label;
left.saturating_add(right)
}
#[cfg(test)]
mod tests {
use super::*;
fn valid_outputs(slot_count: u32) -> Vec<Vec<u8>> {
vec![
crate::megakernel::Megakernel::try_encode_control(false, 1, 4).unwrap(),
crate::megakernel::Megakernel::try_encode_empty_ring(slot_count).unwrap(),
crate::megakernel::Megakernel::try_encode_empty_debug_log(
protocol::debug::RECORD_CAPACITY,
)
.unwrap(),
io::try_encode_empty_io_queue(io::IO_SLOT_COUNT).unwrap(),
]
}
#[test]
fn drain_outputs_into_retains_reusable_output_slots() {
let mut outputs = valid_outputs(4);
let mut readback = MegakernelReadback::default();
MegakernelReadback::drain_outputs_into(&mut outputs, 4, &mut readback)
.expect("Fix: valid megakernel outputs must decode");
assert_eq!(outputs.len(), 4);
assert!(outputs.iter().all(Vec::is_empty));
assert!(!readback.control_bytes.is_empty());
assert!(!readback.ring_bytes.is_empty());
assert!(!readback.debug_log_bytes.is_empty());
assert!(!readback.io_queue_bytes.is_empty());
}
#[test]
fn readback_counters_report_total_volume() {
let readback = MegakernelReadback::from_outputs(valid_outputs(4), 4)
.expect("Fix: valid megakernel outputs must decode");
let counters = readback.counters();
assert_eq!(counters.control_bytes, readback.control_bytes.len());
assert_eq!(counters.ring_bytes, readback.ring_bytes.len());
assert_eq!(counters.debug_log_bytes, readback.debug_log_bytes.len());
assert_eq!(counters.io_queue_bytes, readback.io_queue_bytes.len());
assert_eq!(
counters.total_bytes,
readback.control_bytes.len()
+ readback.ring_bytes.len()
+ readback.debug_log_bytes.len()
+ readback.io_queue_bytes.len()
);
}
}