vyre-runtime 0.6.2

Persistent megakernel + io_uring zero-copy streaming runtime for vyre - GPU as VIR0 bytecode interpreter
Documentation
//! Typed host readback view for persistent megakernel outputs.

use super::io;
use super::protocol;
use super::protocol_api::{validate_control_bytes, validate_debug_log_bytes};
use crate::PipelineError;

/// Decoded megakernel output buffers in ABI order.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MegakernelReadback {
    /// Control buffer bytes after dispatch.
    pub control_bytes: Vec<u8>,
    /// Ring buffer bytes after dispatch.
    pub ring_bytes: Vec<u8>,
    /// Debug-log buffer bytes after dispatch.
    pub debug_log_bytes: Vec<u8>,
    /// IO queue bytes after dispatch.
    pub io_queue_bytes: Vec<u8>,
}

/// Host-visible byte volume for one strict megakernel readback.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct MegakernelReadbackCounters {
    /// Bytes copied back for the control buffer.
    pub control_bytes: usize,
    /// Bytes copied back for the ring buffer.
    pub ring_bytes: usize,
    /// Bytes copied back for the debug log.
    pub debug_log_bytes: usize,
    /// Bytes copied back for the IO queue.
    pub io_queue_bytes: usize,
    /// Total host-visible readback bytes.
    pub total_bytes: usize,
}

impl MegakernelReadback {
    /// Decode the backend output vector produced by [`super::Megakernel`].
    ///
    /// # Errors
    ///
    /// Returns [`PipelineError::Backend`] when output count or protocol buffer
    /// shapes do not match the persistent megakernel ABI.
    pub fn from_outputs(outputs: Vec<Vec<u8>>, slot_count: u32) -> Result<Self, PipelineError> {
        Self::validate_output_refs(&outputs, slot_count)?;
        let [control, ring, debug_log, io_queue] =
            <[Vec<u8>; 4]>::try_from(outputs).map_err(|outputs| {
                PipelineError::Backend(format!(
                    "megakernel readback returned {} buffers after validation, expected 4. Fix: keep output ownership immutable between validation and decode.",
                    outputs.len()
                ))
            })?;
        Ok(Self {
            control_bytes: control,
            ring_bytes: ring,
            debug_log_bytes: debug_log,
            io_queue_bytes: io_queue,
        })
    }

    /// Decode backend outputs into caller-owned readback storage.
    ///
    /// # Errors
    ///
    /// Returns [`PipelineError::Backend`] when output count or protocol buffer
    /// shapes do not match the persistent megakernel ABI.
    pub fn from_outputs_into(
        mut outputs: Vec<Vec<u8>>,
        slot_count: u32,
        out: &mut Self,
    ) -> Result<(), PipelineError> {
        Self::drain_outputs_into(&mut outputs, slot_count, out)
    }

    /// Decode backend outputs into caller-owned readback storage while
    /// preserving the outer output-vector allocation for the next dispatch.
    ///
    /// # Errors
    ///
    /// Returns [`PipelineError::Backend`] when output count or protocol buffer
    /// shapes do not match the persistent megakernel ABI.
    pub fn drain_outputs_into(
        outputs: &mut Vec<Vec<u8>>,
        slot_count: u32,
        out: &mut Self,
    ) -> Result<(), PipelineError> {
        Self::validate_output_refs(outputs, slot_count)?;
        if outputs.len() != 4 {
            return Err(PipelineError::Backend(format!(
                "megakernel readback returned {} buffers after validation, expected 4. Fix: keep output ownership immutable during drain.",
                outputs.len()
            )));
        }
        std::mem::swap(&mut out.control_bytes, &mut outputs[0]);
        std::mem::swap(&mut out.ring_bytes, &mut outputs[1]);
        std::mem::swap(&mut out.debug_log_bytes, &mut outputs[2]);
        std::mem::swap(&mut out.io_queue_bytes, &mut outputs[3]);
        Ok(())
    }

    /// Number of slots described by this readback ring.
    ///
    /// # Errors
    ///
    /// Returns when the ring length is not a whole number of slot records.
    pub fn slot_count(&self) -> Result<u32, PipelineError> {
        let slot_words = usize::try_from(protocol::SLOT_WORDS).map_err(|_| {
            PipelineError::Backend(
                "megakernel SLOT_WORDS overflowed usize. Fix: reduce SLOT_WORDS.".to_string(),
            )
        })?;
        let slot_bytes = slot_words
            .checked_mul(std::mem::size_of::<u32>())
            .ok_or_else(|| {
                PipelineError::Backend(
                    "megakernel slot byte width overflowed usize. Fix: reduce SLOT_WORDS."
                        .to_string(),
                )
            })?;
        if self.ring_bytes.len() % slot_bytes != 0 {
            return Err(PipelineError::Backend(format!(
                "megakernel readback ring has {} bytes, not a multiple of {slot_bytes}. Fix: rebuild the ring with Megakernel::encode_empty_ring.",
                self.ring_bytes.len()
            )));
        }
        u32::try_from(self.ring_bytes.len() / slot_bytes).map_err(|_| {
            PipelineError::Backend(
                "megakernel readback slot count overflowed u32. Fix: split the ring into smaller shards."
                    .to_string(),
            )
        })
    }

    /// Host-visible readback byte counters for B.21 telemetry.
    #[must_use]
    pub fn counters(&self) -> MegakernelReadbackCounters {
        let control_bytes = self.control_bytes.len();
        let ring_bytes = self.ring_bytes.len();
        let debug_log_bytes = self.debug_log_bytes.len();
        let io_queue_bytes = self.io_queue_bytes.len();
        MegakernelReadbackCounters {
            control_bytes,
            ring_bytes,
            debug_log_bytes,
            io_queue_bytes,
            total_bytes: checked_add_usize(
                checked_add_usize(
                    checked_add_usize(control_bytes, ring_bytes, "megakernel readback total bytes"),
                    debug_log_bytes,
                    "megakernel readback total bytes",
                ),
                io_queue_bytes,
                "megakernel readback total bytes",
            ),
        }
    }

    fn validate_output_refs(outputs: &[Vec<u8>], slot_count: u32) -> Result<(), PipelineError> {
        let [control, ring, debug_log, io_queue] = outputs else {
            return Err(PipelineError::Backend(format!(
                "megakernel readback returned {} buffers, expected 4. Fix: keep builder output declarations aligned with control/ring/debug/io ABI order.",
                outputs.len()
            )));
        };
        validate_control_bytes(control)?;
        validate_debug_log_bytes(debug_log)?;
        io::validate_io_queue_bytes(io_queue)?;
        let expected_ring_bytes = protocol::ring_byte_len(slot_count).ok_or_else(|| {
            PipelineError::Backend(
                "megakernel ring byte length overflowed usize during readback validation. Fix: split the ring into smaller shards."
                    .to_string(),
            )
        })?;
        if ring.len() != expected_ring_bytes {
            return Err(PipelineError::Backend(format!(
                "megakernel readback ring has {} bytes, expected {expected_ring_bytes}. Fix: read back the full ring buffer for the compiled slot count.",
                ring.len()
            )));
        }
        Ok(())
    }
}

fn checked_add_usize(left: usize, right: usize, label: &str) -> usize {
    left.checked_add(right).unwrap_or_else(|| {
        panic!("{label} overflowed usize. Fix: split megakernel readback buffers before telemetry/accounting.")
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    fn valid_outputs(slot_count: u32) -> Vec<Vec<u8>> {
        vec![
            crate::megakernel::Megakernel::try_encode_control(false, 1, 4).unwrap(),
            crate::megakernel::Megakernel::try_encode_empty_ring(slot_count).unwrap(),
            crate::megakernel::Megakernel::try_encode_empty_debug_log(
                protocol::debug::RECORD_CAPACITY,
            )
            .unwrap(),
            io::try_encode_empty_io_queue(io::IO_SLOT_COUNT).unwrap(),
        ]
    }

    #[test]
    fn drain_outputs_into_retains_reusable_output_slots() {
        let mut outputs = valid_outputs(4);
        let mut readback = MegakernelReadback::default();

        MegakernelReadback::drain_outputs_into(&mut outputs, 4, &mut readback)
            .expect("Fix: valid megakernel outputs must decode");

        assert_eq!(outputs.len(), 4);
        assert!(outputs.iter().all(Vec::is_empty));
        assert!(!readback.control_bytes.is_empty());
        assert!(!readback.ring_bytes.is_empty());
        assert!(!readback.debug_log_bytes.is_empty());
        assert!(!readback.io_queue_bytes.is_empty());
    }

    #[test]
    fn readback_counters_report_total_volume() {
        let readback = MegakernelReadback::from_outputs(valid_outputs(4), 4)
            .expect("Fix: valid megakernel outputs must decode");
        let counters = readback.counters();

        assert_eq!(counters.control_bytes, readback.control_bytes.len());
        assert_eq!(counters.ring_bytes, readback.ring_bytes.len());
        assert_eq!(counters.debug_log_bytes, readback.debug_log_bytes.len());
        assert_eq!(counters.io_queue_bytes, readback.io_queue_bytes.len());
        assert_eq!(
            counters.total_bytes,
            readback.control_bytes.len()
                + readback.ring_bytes.len()
                + readback.debug_log_bytes.len()
                + readback.io_queue_bytes.len()
        );
    }
}