Skip to main content

vyre_runtime/megakernel/
readback.rs

//! Typed host readback view for persistent megakernel outputs.
2
3use super::io;
4use super::protocol;
5use super::protocol_api::{validate_control_bytes, validate_debug_log_bytes};
6use crate::PipelineError;
7
/// Decoded megakernel output buffers in ABI order.
///
/// Fields are declared in the order the backend returns its output buffers:
/// control, ring, debug log, IO queue. Each field owns the raw bytes of the
/// corresponding buffer; no further decoding is performed by this type.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MegakernelReadback {
    /// Control buffer bytes after dispatch.
    pub control_bytes: Vec<u8>,
    /// Ring buffer bytes after dispatch.
    pub ring_bytes: Vec<u8>,
    /// Debug-log buffer bytes after dispatch.
    pub debug_log_bytes: Vec<u8>,
    /// IO queue bytes after dispatch.
    pub io_queue_bytes: Vec<u8>,
}
20
/// Host-visible byte volume for one strict megakernel readback.
///
/// Holds one byte count per readback buffer plus their sum. All counters
/// default to zero.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct MegakernelReadbackCounters {
    /// Bytes copied back for the control buffer.
    pub control_bytes: usize,
    /// Bytes copied back for the ring buffer.
    pub ring_bytes: usize,
    /// Bytes copied back for the debug log.
    pub debug_log_bytes: usize,
    /// Bytes copied back for the IO queue.
    pub io_queue_bytes: usize,
    /// Total host-visible readback bytes.
    pub total_bytes: usize,
}
35
36impl MegakernelReadback {
37    /// Decode the backend output vector produced by [`super::Megakernel`].
38    ///
39    /// # Errors
40    ///
41    /// Returns [`PipelineError::Backend`] when output count or protocol buffer
42    /// shapes do not match the persistent megakernel ABI.
43    pub fn from_outputs(outputs: Vec<Vec<u8>>, slot_count: u32) -> Result<Self, PipelineError> {
44        Self::validate_output_refs(&outputs, slot_count)?;
45        let [control, ring, debug_log, io_queue] =
46            <[Vec<u8>; 4]>::try_from(outputs).map_err(|outputs| {
47                PipelineError::Backend(format!(
48                    "megakernel readback returned {} buffers after validation, expected 4. Fix: keep output ownership immutable between validation and decode.",
49                    outputs.len()
50                ))
51            })?;
52        Ok(Self {
53            control_bytes: control,
54            ring_bytes: ring,
55            debug_log_bytes: debug_log,
56            io_queue_bytes: io_queue,
57        })
58    }
59
60    /// Decode backend outputs into caller-owned readback storage.
61    ///
62    /// # Errors
63    ///
64    /// Returns [`PipelineError::Backend`] when output count or protocol buffer
65    /// shapes do not match the persistent megakernel ABI.
66    pub fn from_outputs_into(
67        mut outputs: Vec<Vec<u8>>,
68        slot_count: u32,
69        out: &mut Self,
70    ) -> Result<(), PipelineError> {
71        Self::drain_outputs_into(&mut outputs, slot_count, out)
72    }
73
74    /// Decode backend outputs into caller-owned readback storage while
75    /// preserving the outer output-vector allocation for the next dispatch.
76    ///
77    /// # Errors
78    ///
79    /// Returns [`PipelineError::Backend`] when output count or protocol buffer
80    /// shapes do not match the persistent megakernel ABI.
81    pub fn drain_outputs_into(
82        outputs: &mut Vec<Vec<u8>>,
83        slot_count: u32,
84        out: &mut Self,
85    ) -> Result<(), PipelineError> {
86        Self::validate_output_refs(outputs, slot_count)?;
87        if outputs.len() != 4 {
88            return Err(PipelineError::Backend(format!(
89                "megakernel readback returned {} buffers after validation, expected 4. Fix: keep output ownership immutable during drain.",
90                outputs.len()
91            )));
92        }
93        std::mem::swap(&mut out.control_bytes, &mut outputs[0]);
94        std::mem::swap(&mut out.ring_bytes, &mut outputs[1]);
95        std::mem::swap(&mut out.debug_log_bytes, &mut outputs[2]);
96        std::mem::swap(&mut out.io_queue_bytes, &mut outputs[3]);
97        Ok(())
98    }
99
100    /// Number of slots described by this readback ring.
101    ///
102    /// # Errors
103    ///
104    /// Returns when the ring length is not a whole number of slot records.
105    pub fn slot_count(&self) -> Result<u32, PipelineError> {
106        let slot_words = usize::try_from(protocol::SLOT_WORDS).map_err(|_| {
107            PipelineError::Backend(
108                "megakernel SLOT_WORDS overflowed usize. Fix: reduce SLOT_WORDS.".to_string(),
109            )
110        })?;
111        let slot_bytes = slot_words
112            .checked_mul(std::mem::size_of::<u32>())
113            .ok_or_else(|| {
114                PipelineError::Backend(
115                    "megakernel slot byte width overflowed usize. Fix: reduce SLOT_WORDS."
116                        .to_string(),
117                )
118            })?;
119        if self.ring_bytes.len() % slot_bytes != 0 {
120            return Err(PipelineError::Backend(format!(
121                "megakernel readback ring has {} bytes, not a multiple of {slot_bytes}. Fix: rebuild the ring with Megakernel::encode_empty_ring.",
122                self.ring_bytes.len()
123            )));
124        }
125        u32::try_from(self.ring_bytes.len() / slot_bytes).map_err(|_| {
126            PipelineError::Backend(
127                "megakernel readback slot count overflowed u32. Fix: split the ring into smaller shards."
128                    .to_string(),
129            )
130        })
131    }
132
133    /// Host-visible readback byte counters for B.21 telemetry.
134    #[must_use]
135    pub fn counters(&self) -> MegakernelReadbackCounters {
136        let control_bytes = self.control_bytes.len();
137        let ring_bytes = self.ring_bytes.len();
138        let debug_log_bytes = self.debug_log_bytes.len();
139        let io_queue_bytes = self.io_queue_bytes.len();
140        MegakernelReadbackCounters {
141            control_bytes,
142            ring_bytes,
143            debug_log_bytes,
144            io_queue_bytes,
145            total_bytes: checked_add_usize(
146                checked_add_usize(
147                    checked_add_usize(control_bytes, ring_bytes, "megakernel readback total bytes"),
148                    debug_log_bytes,
149                    "megakernel readback total bytes",
150                ),
151                io_queue_bytes,
152                "megakernel readback total bytes",
153            ),
154        }
155    }
156
157    fn validate_output_refs(outputs: &[Vec<u8>], slot_count: u32) -> Result<(), PipelineError> {
158        let [control, ring, debug_log, io_queue] = outputs else {
159            return Err(PipelineError::Backend(format!(
160                "megakernel readback returned {} buffers, expected 4. Fix: keep builder output declarations aligned with control/ring/debug/io ABI order.",
161                outputs.len()
162            )));
163        };
164        validate_control_bytes(control)?;
165        validate_debug_log_bytes(debug_log)?;
166        io::validate_io_queue_bytes(io_queue)?;
167        let expected_ring_bytes = protocol::ring_byte_len(slot_count).ok_or_else(|| {
168            PipelineError::Backend(
169                "megakernel ring byte length overflowed usize during readback validation. Fix: split the ring into smaller shards."
170                    .to_string(),
171            )
172        })?;
173        if ring.len() != expected_ring_bytes {
174            return Err(PipelineError::Backend(format!(
175                "megakernel readback ring has {} bytes, expected {expected_ring_bytes}. Fix: read back the full ring buffer for the compiled slot count.",
176                ring.len()
177            )));
178        }
179        Ok(())
180    }
181}
182
/// Add two byte counts, panicking with a labelled message on overflow.
///
/// `label` names the quantity being accumulated and becomes the prefix of
/// the panic message.
///
/// # Panics
///
/// Panics when `left + right` does not fit in `usize`.
fn checked_add_usize(left: usize, right: usize, label: &str) -> usize {
    match left.checked_add(right) {
        Some(sum) => sum,
        None => panic!(
            "{label} overflowed usize. Fix: split megakernel readback buffers before telemetry/accounting."
        ),
    }
}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Build one well-formed output vector in ABI order (control, ring,
    /// debug log, IO queue) for a ring of `slot_count` slots.
    fn valid_outputs(slot_count: u32) -> Vec<Vec<u8>> {
        vec![
            crate::megakernel::Megakernel::try_encode_control(false, 1, 4).unwrap(),
            crate::megakernel::Megakernel::try_encode_empty_ring(slot_count).unwrap(),
            crate::megakernel::Megakernel::try_encode_empty_debug_log(
                protocol::debug::RECORD_CAPACITY,
            )
            .unwrap(),
            io::try_encode_empty_io_queue(io::IO_SLOT_COUNT).unwrap(),
        ]
    }

    /// Draining into a default readback must leave four (now empty) entries
    /// in `outputs` and move every buffer into the readback.
    #[test]
    fn drain_outputs_into_retains_reusable_output_slots() {
        let mut outputs = valid_outputs(4);
        let mut readback = MegakernelReadback::default();

        MegakernelReadback::drain_outputs_into(&mut outputs, 4, &mut readback)
            .expect("Fix: valid megakernel outputs must decode");

        assert_eq!(outputs.len(), 4);
        assert!(outputs.iter().all(Vec::is_empty));
        assert!(!readback.control_bytes.is_empty());
        assert!(!readback.ring_bytes.is_empty());
        assert!(!readback.debug_log_bytes.is_empty());
        assert!(!readback.io_queue_bytes.is_empty());
    }

    /// Each counter must equal its buffer length and the total must equal
    /// the sum of all four.
    #[test]
    fn readback_counters_report_total_volume() {
        let readback = MegakernelReadback::from_outputs(valid_outputs(4), 4)
            .expect("Fix: valid megakernel outputs must decode");
        let counters = readback.counters();

        assert_eq!(counters.control_bytes, readback.control_bytes.len());
        assert_eq!(counters.ring_bytes, readback.ring_bytes.len());
        assert_eq!(counters.debug_log_bytes, readback.debug_log_bytes.len());
        assert_eq!(counters.io_queue_bytes, readback.io_queue_bytes.len());
        assert_eq!(
            counters.total_bytes,
            readback.control_bytes.len()
                + readback.ring_bytes.len()
                + readback.debug_log_bytes.len()
                + readback.io_queue_bytes.len()
        );
    }
}