1use super::io;
4use super::protocol;
5use super::protocol_api::{validate_control_bytes, validate_debug_log_bytes};
6use crate::PipelineError;
7
8#[derive(Debug, Clone, Default, PartialEq, Eq)]
10pub struct MegakernelReadback {
11 pub control_bytes: Vec<u8>,
13 pub ring_bytes: Vec<u8>,
15 pub debug_log_bytes: Vec<u8>,
17 pub io_queue_bytes: Vec<u8>,
19}
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
23pub struct MegakernelReadbackCounters {
24 pub control_bytes: usize,
26 pub ring_bytes: usize,
28 pub debug_log_bytes: usize,
30 pub io_queue_bytes: usize,
32 pub total_bytes: usize,
34}
35
36impl MegakernelReadback {
37 pub fn from_outputs(outputs: Vec<Vec<u8>>, slot_count: u32) -> Result<Self, PipelineError> {
44 Self::validate_output_refs(&outputs, slot_count)?;
45 let [control, ring, debug_log, io_queue] =
46 <[Vec<u8>; 4]>::try_from(outputs).map_err(|outputs| {
47 PipelineError::Backend(format!(
48 "megakernel readback returned {} buffers after validation, expected 4. Fix: keep output ownership immutable between validation and decode.",
49 outputs.len()
50 ))
51 })?;
52 Ok(Self {
53 control_bytes: control,
54 ring_bytes: ring,
55 debug_log_bytes: debug_log,
56 io_queue_bytes: io_queue,
57 })
58 }
59
60 pub fn from_outputs_into(
67 mut outputs: Vec<Vec<u8>>,
68 slot_count: u32,
69 out: &mut Self,
70 ) -> Result<(), PipelineError> {
71 Self::drain_outputs_into(&mut outputs, slot_count, out)
72 }
73
74 pub fn drain_outputs_into(
82 outputs: &mut Vec<Vec<u8>>,
83 slot_count: u32,
84 out: &mut Self,
85 ) -> Result<(), PipelineError> {
86 Self::validate_output_refs(outputs, slot_count)?;
87 if outputs.len() != 4 {
88 return Err(PipelineError::Backend(format!(
89 "megakernel readback returned {} buffers after validation, expected 4. Fix: keep output ownership immutable during drain.",
90 outputs.len()
91 )));
92 }
93 std::mem::swap(&mut out.control_bytes, &mut outputs[0]);
94 std::mem::swap(&mut out.ring_bytes, &mut outputs[1]);
95 std::mem::swap(&mut out.debug_log_bytes, &mut outputs[2]);
96 std::mem::swap(&mut out.io_queue_bytes, &mut outputs[3]);
97 Ok(())
98 }
99
100 pub fn slot_count(&self) -> Result<u32, PipelineError> {
106 let slot_words = usize::try_from(protocol::SLOT_WORDS).map_err(|_| {
107 PipelineError::Backend(
108 "megakernel SLOT_WORDS overflowed usize. Fix: reduce SLOT_WORDS.".to_string(),
109 )
110 })?;
111 let slot_bytes = slot_words
112 .checked_mul(std::mem::size_of::<u32>())
113 .ok_or_else(|| {
114 PipelineError::Backend(
115 "megakernel slot byte width overflowed usize. Fix: reduce SLOT_WORDS."
116 .to_string(),
117 )
118 })?;
119 if self.ring_bytes.len() % slot_bytes != 0 {
120 return Err(PipelineError::Backend(format!(
121 "megakernel readback ring has {} bytes, not a multiple of {slot_bytes}. Fix: rebuild the ring with Megakernel::encode_empty_ring.",
122 self.ring_bytes.len()
123 )));
124 }
125 u32::try_from(self.ring_bytes.len() / slot_bytes).map_err(|_| {
126 PipelineError::Backend(
127 "megakernel readback slot count overflowed u32. Fix: split the ring into smaller shards."
128 .to_string(),
129 )
130 })
131 }
132
133 #[must_use]
135 pub fn counters(&self) -> MegakernelReadbackCounters {
136 let control_bytes = self.control_bytes.len();
137 let ring_bytes = self.ring_bytes.len();
138 let debug_log_bytes = self.debug_log_bytes.len();
139 let io_queue_bytes = self.io_queue_bytes.len();
140 MegakernelReadbackCounters {
141 control_bytes,
142 ring_bytes,
143 debug_log_bytes,
144 io_queue_bytes,
145 total_bytes: checked_add_usize(
146 checked_add_usize(
147 checked_add_usize(control_bytes, ring_bytes, "megakernel readback total bytes"),
148 debug_log_bytes,
149 "megakernel readback total bytes",
150 ),
151 io_queue_bytes,
152 "megakernel readback total bytes",
153 ),
154 }
155 }
156
157 fn validate_output_refs(outputs: &[Vec<u8>], slot_count: u32) -> Result<(), PipelineError> {
158 let [control, ring, debug_log, io_queue] = outputs else {
159 return Err(PipelineError::Backend(format!(
160 "megakernel readback returned {} buffers, expected 4. Fix: keep builder output declarations aligned with control/ring/debug/io ABI order.",
161 outputs.len()
162 )));
163 };
164 validate_control_bytes(control)?;
165 validate_debug_log_bytes(debug_log)?;
166 io::validate_io_queue_bytes(io_queue)?;
167 let expected_ring_bytes = protocol::ring_byte_len(slot_count).ok_or_else(|| {
168 PipelineError::Backend(
169 "megakernel ring byte length overflowed usize during readback validation. Fix: split the ring into smaller shards."
170 .to_string(),
171 )
172 })?;
173 if ring.len() != expected_ring_bytes {
174 return Err(PipelineError::Backend(format!(
175 "megakernel readback ring has {} bytes, expected {expected_ring_bytes}. Fix: read back the full ring buffer for the compiled slot count.",
176 ring.len()
177 )));
178 }
179 Ok(())
180 }
181}
182
183fn checked_add_usize(left: usize, right: usize, label: &str) -> usize {
184 left.checked_add(right).unwrap_or_else(|| {
185 panic!("{label} overflowed usize. Fix: split megakernel readback buffers before telemetry/accounting.")
186 })
187}
188
189#[cfg(test)]
190mod tests {
191 use super::*;
192
193 fn valid_outputs(slot_count: u32) -> Vec<Vec<u8>> {
194 vec![
195 crate::megakernel::Megakernel::try_encode_control(false, 1, 4).unwrap(),
196 crate::megakernel::Megakernel::try_encode_empty_ring(slot_count).unwrap(),
197 crate::megakernel::Megakernel::try_encode_empty_debug_log(
198 protocol::debug::RECORD_CAPACITY,
199 )
200 .unwrap(),
201 io::try_encode_empty_io_queue(io::IO_SLOT_COUNT).unwrap(),
202 ]
203 }
204
205 #[test]
206 fn drain_outputs_into_retains_reusable_output_slots() {
207 let mut outputs = valid_outputs(4);
208 let mut readback = MegakernelReadback::default();
209
210 MegakernelReadback::drain_outputs_into(&mut outputs, 4, &mut readback)
211 .expect("Fix: valid megakernel outputs must decode");
212
213 assert_eq!(outputs.len(), 4);
214 assert!(outputs.iter().all(Vec::is_empty));
215 assert!(!readback.control_bytes.is_empty());
216 assert!(!readback.ring_bytes.is_empty());
217 assert!(!readback.debug_log_bytes.is_empty());
218 assert!(!readback.io_queue_bytes.is_empty());
219 }
220
221 #[test]
222 fn readback_counters_report_total_volume() {
223 let readback = MegakernelReadback::from_outputs(valid_outputs(4), 4)
224 .expect("Fix: valid megakernel outputs must decode");
225 let counters = readback.counters();
226
227 assert_eq!(counters.control_bytes, readback.control_bytes.len());
228 assert_eq!(counters.ring_bytes, readback.ring_bytes.len());
229 assert_eq!(counters.debug_log_bytes, readback.debug_log_bytes.len());
230 assert_eq!(counters.io_queue_bytes, readback.io_queue_bytes.len());
231 assert_eq!(
232 counters.total_bytes,
233 readback.control_bytes.len()
234 + readback.ring_bytes.len()
235 + readback.debug_log_bytes.len()
236 + readback.io_queue_bytes.len()
237 );
238 }
239}