1use super::protocol::{self, ProtocolError};
36
/// Number of 32-bit words that make up one ring slot.
const SLOT_WORDS_USIZE: usize = 16;
/// Word index, inside a slot, of the status word that is compared against
/// `protocol::slot::DONE`.
const STATUS_WORD_USIZE: usize = 0;
/// Size of one ring slot in bytes: `SLOT_WORDS_USIZE` words of four bytes each.
pub const SLOT_BYTES: usize = SLOT_WORDS_USIZE * core::mem::size_of::<u32>();
41
42pub trait RingProducer {
50 fn publish(&mut self, slot_idx: u32, encoded: &[u8]) -> Result<(), ProtocolError>;
54
55 fn slot_count(&self) -> u32;
57
58 fn name(&self) -> &'static str;
61}
62
63pub trait RingConsumer {
65 fn read_slot(&self, slot_idx: u32, out: &mut [u8]) -> Result<(), ProtocolError>;
69
70 fn done_count(&self) -> u32 {
74 let mut acc = 0u32;
75 let mut buf = [0u8; SLOT_BYTES];
76 for slot in 0..self.slot_count() {
77 if self.read_slot(slot, &mut buf).is_err() {
78 continue;
79 }
80 let status_offset = STATUS_WORD_USIZE * 4;
81 let word = u32::from_le_bytes([
82 buf[status_offset],
83 buf[status_offset + 1],
84 buf[status_offset + 2],
85 buf[status_offset + 3],
86 ]);
87 if word == protocol::slot::DONE {
88 acc = acc.checked_add(1).unwrap_or_else(|| {
89 panic!(
90 "megakernel ring consumer done_count overflowed u32. Fix: shard the ring before host observation."
91 )
92 });
93 }
94 }
95 acc
96 }
97
98 fn slot_count(&self) -> u32;
100}
101
/// Slot ring held in an owned byte vector, usable through both
/// `RingProducer` and `RingConsumer`.
pub struct HostRing {
    /// Number of slots requested when the ring was built.
    slot_count: u32,
    /// Raw ring image; slot `i` is addressed at byte offset `i * SLOT_BYTES`.
    bytes: Vec<u8>,
}
110
111impl HostRing {
112 pub fn new(slot_count: u32) -> Result<Self, ProtocolError> {
119 let bytes = protocol::try_encode_empty_ring(slot_count)?;
120 Ok(Self { bytes, slot_count })
121 }
122
123 #[must_use]
126 pub fn as_bytes(&self) -> &[u8] {
127 &self.bytes
128 }
129
130 #[must_use]
132 pub fn as_bytes_mut(&mut self) -> &mut [u8] {
133 &mut self.bytes
134 }
135}
136
137fn ring_slot_base(slot_idx: u32) -> Result<usize, ProtocolError> {
138 usize::try_from(slot_idx)
139 .map_err(|_| ProtocolError::MissingWord {
140 buffer: "ring slot",
141 word_idx: usize::MAX,
142 byte_len: 0,
143 fix: "slot_idx cannot fit host usize; shard the megakernel ring before host access",
144 })?
145 .checked_mul(SLOT_BYTES)
146 .ok_or(ProtocolError::MissingWord {
147 buffer: "ring slot",
148 word_idx: usize::MAX,
149 byte_len: 0,
150 fix: "slot byte offset overflowed usize; shard the megakernel ring before host access",
151 })
152}
153
154fn ring_slot_word_index(slot_idx: u32) -> Result<usize, ProtocolError> {
155 usize::try_from(slot_idx)
156 .map_err(|_| ProtocolError::MissingWord {
157 buffer: "ring slot",
158 word_idx: usize::MAX,
159 byte_len: 0,
160 fix: "slot_idx cannot fit host usize; shard the megakernel ring before host access",
161 })?
162 .checked_mul(SLOT_WORDS_USIZE)
163 .ok_or(ProtocolError::MissingWord {
164 buffer: "ring slot",
165 word_idx: usize::MAX,
166 byte_len: 0,
167 fix: "slot word offset overflowed usize; shard the megakernel ring before host access",
168 })
169}
170
171impl RingProducer for HostRing {
172 fn publish(&mut self, slot_idx: u32, encoded: &[u8]) -> Result<(), ProtocolError> {
173 if encoded.len() != SLOT_BYTES {
174 return Err(ProtocolError::MisalignedByteLength {
175 buffer: "ring slot",
176 byte_len: encoded.len(),
177 fix: "encoded slot must be exactly SLOT_BYTES (64) long",
178 });
179 }
180 if slot_idx >= self.slot_count {
181 return Err(ProtocolError::MissingWord {
182 buffer: "ring slot",
183 word_idx: ring_slot_word_index(slot_idx)?,
184 byte_len: self.bytes.len(),
185 fix: "slot_idx must be < slot_count",
186 });
187 }
188 let base = ring_slot_base(slot_idx)?;
189 self.bytes[base..base + SLOT_BYTES].copy_from_slice(encoded);
190 Ok(())
191 }
192
193 fn slot_count(&self) -> u32 {
194 self.slot_count
195 }
196
197 fn name(&self) -> &'static str {
198 "in-process-host"
199 }
200}
201
202impl RingConsumer for HostRing {
203 fn read_slot(&self, slot_idx: u32, out: &mut [u8]) -> Result<(), ProtocolError> {
204 if out.len() != SLOT_BYTES {
205 return Err(ProtocolError::MisalignedByteLength {
206 buffer: "ring slot",
207 byte_len: out.len(),
208 fix: "out slice must be exactly SLOT_BYTES (64) long",
209 });
210 }
211 if slot_idx >= self.slot_count {
212 return Err(ProtocolError::MissingWord {
213 buffer: "ring slot",
214 word_idx: ring_slot_word_index(slot_idx)?,
215 byte_len: self.bytes.len(),
216 fix: "slot_idx must be < slot_count",
217 });
218 }
219 let base = ring_slot_base(slot_idx)?;
220 out.copy_from_slice(&self.bytes[base..base + SLOT_BYTES]);
221 Ok(())
222 }
223
224 fn done_count(&self) -> u32 {
225 let status_word_offset = STATUS_WORD_USIZE * 4;
226 let mut done = 0u32;
227 let slot_count = usize::try_from(self.slot_count).unwrap_or_else(|source| {
228 panic!(
229 "megakernel ring slot_count cannot fit usize: {source}. Fix: shard the ring before host observation."
230 )
231 });
232 for slot in 0..slot_count {
233 let base = slot
234 .checked_mul(SLOT_BYTES)
235 .and_then(|offset| offset.checked_add(status_word_offset))
236 .unwrap_or_else(|| {
237 panic!(
238 "megakernel ring status byte offset overflowed usize. Fix: shard the ring before host observation."
239 )
240 });
241 let word = u32::from_le_bytes([
242 self.bytes[base],
243 self.bytes[base + 1],
244 self.bytes[base + 2],
245 self.bytes[base + 3],
246 ]);
247 if word == protocol::slot::DONE {
248 done = done.checked_add(1).unwrap_or_else(|| {
249 panic!(
250 "megakernel ring done count overflowed u32. Fix: shard the ring before host observation."
251 )
252 });
253 }
254 }
255 done
256 }
257
258 fn slot_count(&self) -> u32 {
259 self.slot_count
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;

    /// True when the rendered error text contains at least one of `needles`.
    fn mentions_any(err: &ProtocolError, needles: &[&str]) -> bool {
        needles
            .iter()
            .any(|needle| err.to_string().contains(*needle))
    }

    /// Publishing an encoded load-miss into slot 1 must be readable back
    /// through the consumer trait and decodable from the raw ring bytes.
    #[test]
    fn host_ring_publishes_and_round_trips_a_load_miss() {
        let mut host = HostRing::new(4).expect("Fix: ring constructs");
        let payload = protocol::encode_load_miss(123, true);

        RingProducer::publish(&mut host, 1, &payload).expect("Fix: publish");

        let mut readback = [0u8; SLOT_BYTES];
        RingConsumer::read_slot(&host, 1, &mut readback).expect("Fix: read_slot");
        assert_eq!(readback.as_slice(), payload.as_slice());

        assert_eq!(
            protocol::decode_load_miss(host.as_bytes(), 1),
            Some((123, true))
        );
    }

    /// Slot indices at or beyond `slot_count` are rejected by both traits.
    #[test]
    fn host_ring_rejects_out_of_range_slot() {
        let mut host = HostRing::new(2).unwrap();
        let payload = protocol::encode_load_miss(0, false);

        let err_hi = RingProducer::publish(&mut host, 2, &payload).expect_err("slot 2 OOB");
        assert!(
            mentions_any(&err_hi, &["slot", "range"]),
            "OOB publish error: {err_hi}"
        );

        let err_max =
            RingProducer::publish(&mut host, u32::MAX, &payload).expect_err("slot MAX OOB");
        assert!(
            mentions_any(&err_max, &["slot", "range"]),
            "MAX slot publish error: {err_max}"
        );

        let mut scratch = [0u8; SLOT_BYTES];
        let read_err = RingConsumer::read_slot(&host, 2, &mut scratch).expect_err("read OOB");
        assert!(
            mentions_any(&read_err, &["slot", "range"]),
            "OOB read error: {read_err}"
        );
    }

    /// Buffers that are not exactly `SLOT_BYTES` long are rejected.
    #[test]
    fn host_ring_rejects_mis_sized_encoded() {
        let mut host = HostRing::new(2).unwrap();

        let too_short = [0u8; SLOT_BYTES - 1];
        let short_pub =
            RingProducer::publish(&mut host, 0, &too_short).expect_err("short publish");
        assert!(
            mentions_any(&short_pub, &["SLOT", "byte"]),
            "short publish error: {short_pub}"
        );

        let too_long = [0u8; SLOT_BYTES + 1];
        let long_pub = RingProducer::publish(&mut host, 0, &too_long).expect_err("long publish");
        assert!(
            mentions_any(&long_pub, &["SLOT", "byte"]),
            "long publish error: {long_pub}"
        );

        let mut small_out = [0u8; SLOT_BYTES - 1];
        let short_read =
            RingConsumer::read_slot(&host, 0, &mut small_out).expect_err("short read buffer");
        assert!(
            mentions_any(&short_read, &["SLOT", "byte"]),
            "short read error: {short_read}"
        );
    }

    /// A fresh ring reports zero done slots; marking the status words of
    /// slots 0 and 2 as DONE makes the count two.
    #[test]
    fn default_done_count_walks_the_ring() {
        let mut host = HostRing::new(4).unwrap();
        assert_eq!(RingConsumer::done_count(&host), 0);

        let image = host.as_bytes_mut();
        for &done_slot in &[0usize, 2] {
            let status_at = done_slot * SLOT_BYTES + STATUS_WORD_USIZE * 4;
            image[status_at..status_at + 4].copy_from_slice(&protocol::slot::DONE.to_le_bytes());
        }

        assert_eq!(RingConsumer::done_count(&host), 2);
    }
}