1use super::protocol::{self, ProtocolError};
36
/// Number of 32-bit words that make up one ring slot.
const SLOT_WORDS_USIZE: usize = 16;
/// Index, counted in words, of the status word inside a slot.
const STATUS_WORD_USIZE: usize = 0;
/// Size of one ring slot in bytes: `SLOT_WORDS_USIZE` four-byte words (64).
pub const SLOT_BYTES: usize = SLOT_WORDS_USIZE * core::mem::size_of::<u32>();
41
/// Write side of a slot ring: stores already-encoded slot payloads by index.
pub trait RingProducer {
    /// Stores `encoded` into slot `slot_idx`, reporting failures as a
    /// [`ProtocolError`].
    ///
    /// NOTE(review): the `HostRing` implementation in this file requires
    /// `encoded.len() == SLOT_BYTES` and `slot_idx < slot_count()`; other
    /// implementors presumably share that contract — confirm.
    fn publish(&mut self, slot_idx: u32, encoded: &[u8]) -> Result<(), ProtocolError>;

    /// Number of slots in the ring.
    fn slot_count(&self) -> u32;

    /// Static label identifying this producer (`HostRing` returns
    /// `"in-process-host"`).
    fn name(&self) -> &'static str;
}
62
63pub trait RingConsumer {
65 fn read_slot(&self, slot_idx: u32, out: &mut [u8]) -> Result<(), ProtocolError>;
69
70 fn try_done_count(&self) -> Result<u32, ProtocolError> {
78 let mut acc = 0u32;
79 let mut buf = [0u8; SLOT_BYTES];
80 for slot in 0..self.slot_count() {
81 self.read_slot(slot, &mut buf)?;
82 if read_slot_status_word(&buf)? == protocol::slot::DONE {
83 acc = acc
84 .checked_add(1)
85 .ok_or(ProtocolError::ByteLengthOverflow {
86 buffer: "ring done count",
87 fix: "shard the ring before host observation",
88 })?;
89 }
90 }
91 Ok(acc)
92 }
93
94 #[deprecated(
99 note = "use RingConsumer::try_done_count so malformed ring snapshots do not collapse to zero"
100 )]
101 fn done_count(&self) -> u32 {
102 self.try_done_count().unwrap_or(0)
103 }
104
105 fn slot_count(&self) -> u32;
107}
108
/// In-process ring backed by a plain byte vector.
pub struct HostRing {
    /// Raw ring storage; `HostRing::new` fills it from
    /// `protocol::try_encode_empty_ring`.
    bytes: Vec<u8>,
    /// Number of slots the ring was created with.
    slot_count: u32,
}
117
118impl HostRing {
119 pub fn new(slot_count: u32) -> Result<Self, ProtocolError> {
126 let bytes = protocol::try_encode_empty_ring(slot_count)?;
127 Ok(Self { bytes, slot_count })
128 }
129
130 #[must_use]
133 pub fn as_bytes(&self) -> &[u8] {
134 &self.bytes
135 }
136
137 #[must_use]
139 pub fn as_bytes_mut(&mut self) -> &mut [u8] {
140 &mut self.bytes
141 }
142}
143
144fn ring_slot_base(slot_idx: u32) -> Result<usize, ProtocolError> {
145 usize::try_from(slot_idx)
146 .map_err(|_| ProtocolError::MissingWord {
147 buffer: "ring slot",
148 word_idx: usize::MAX,
149 byte_len: 0,
150 fix: "slot_idx cannot fit host usize; shard the megakernel ring before host access",
151 })?
152 .checked_mul(SLOT_BYTES)
153 .ok_or(ProtocolError::MissingWord {
154 buffer: "ring slot",
155 word_idx: usize::MAX,
156 byte_len: 0,
157 fix: "slot byte offset overflowed usize; shard the megakernel ring before host access",
158 })
159}
160
161fn ring_slot_word_index(slot_idx: u32) -> Result<usize, ProtocolError> {
162 usize::try_from(slot_idx)
163 .map_err(|_| ProtocolError::MissingWord {
164 buffer: "ring slot",
165 word_idx: usize::MAX,
166 byte_len: 0,
167 fix: "slot_idx cannot fit host usize; shard the megakernel ring before host access",
168 })?
169 .checked_mul(SLOT_WORDS_USIZE)
170 .ok_or(ProtocolError::MissingWord {
171 buffer: "ring slot",
172 word_idx: usize::MAX,
173 byte_len: 0,
174 fix: "slot word offset overflowed usize; shard the megakernel ring before host access",
175 })
176}
177
178fn read_slot_status_word(slot_bytes: &[u8]) -> Result<u32, ProtocolError> {
179 let status_offset =
180 STATUS_WORD_USIZE
181 .checked_mul(4)
182 .ok_or(ProtocolError::ByteLengthOverflow {
183 buffer: "ring slot status",
184 fix: "keep ring status word indices within host address space",
185 })?;
186 let status_end = status_offset
187 .checked_add(4)
188 .ok_or(ProtocolError::ByteLengthOverflow {
189 buffer: "ring slot status",
190 fix: "keep ring status word indices within host address space",
191 })?;
192 let bytes = slot_bytes
193 .get(status_offset..status_end)
194 .ok_or(ProtocolError::MissingWord {
195 buffer: "ring slot",
196 word_idx: STATUS_WORD_USIZE,
197 byte_len: slot_bytes.len(),
198 fix: "read a complete SLOT_BYTES slot before counting DONE status",
199 })?;
200 Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
201}
202
203impl RingProducer for HostRing {
204 fn publish(&mut self, slot_idx: u32, encoded: &[u8]) -> Result<(), ProtocolError> {
205 if encoded.len() != SLOT_BYTES {
206 return Err(ProtocolError::MisalignedByteLength {
207 buffer: "ring slot",
208 byte_len: encoded.len(),
209 fix: "encoded slot must be exactly SLOT_BYTES (64) long",
210 });
211 }
212 if slot_idx >= self.slot_count {
213 return Err(ProtocolError::MissingWord {
214 buffer: "ring slot",
215 word_idx: ring_slot_word_index(slot_idx)?,
216 byte_len: self.bytes.len(),
217 fix: "slot_idx must be < slot_count",
218 });
219 }
220 let base = ring_slot_base(slot_idx)?;
221 self.bytes[base..base + SLOT_BYTES].copy_from_slice(encoded);
222 Ok(())
223 }
224
225 fn slot_count(&self) -> u32 {
226 self.slot_count
227 }
228
229 fn name(&self) -> &'static str {
230 "in-process-host"
231 }
232}
233
234impl RingConsumer for HostRing {
235 fn read_slot(&self, slot_idx: u32, out: &mut [u8]) -> Result<(), ProtocolError> {
236 if out.len() != SLOT_BYTES {
237 return Err(ProtocolError::MisalignedByteLength {
238 buffer: "ring slot",
239 byte_len: out.len(),
240 fix: "out slice must be exactly SLOT_BYTES (64) long",
241 });
242 }
243 if slot_idx >= self.slot_count {
244 return Err(ProtocolError::MissingWord {
245 buffer: "ring slot",
246 word_idx: ring_slot_word_index(slot_idx)?,
247 byte_len: self.bytes.len(),
248 fix: "slot_idx must be < slot_count",
249 });
250 }
251 let base = ring_slot_base(slot_idx)?;
252 out.copy_from_slice(&self.bytes[base..base + SLOT_BYTES]);
253 Ok(())
254 }
255
256 fn try_done_count(&self) -> Result<u32, ProtocolError> {
257 let status_word_offset = STATUS_WORD_USIZE * 4;
258 let mut done = 0u32;
259 let slot_count =
260 usize::try_from(self.slot_count).map_err(|_| ProtocolError::ByteLengthOverflow {
261 buffer: "ring slot count",
262 fix: "shard the ring before host observation",
263 })?;
264 for slot in 0..slot_count {
265 let base = slot
266 .checked_mul(SLOT_BYTES)
267 .and_then(|offset| offset.checked_add(status_word_offset))
268 .ok_or(ProtocolError::ByteLengthOverflow {
269 buffer: "ring status offset",
270 fix: "shard the ring before host observation",
271 })?;
272 let end = base
273 .checked_add(4)
274 .ok_or(ProtocolError::ByteLengthOverflow {
275 buffer: "ring status offset",
276 fix: "shard the ring before host observation",
277 })?;
278 let word = read_slot_status_word(self.bytes.get(base..end).ok_or(
279 ProtocolError::MissingWord {
280 buffer: "ring slot",
281 word_idx: slot
282 .checked_mul(SLOT_WORDS_USIZE)
283 .and_then(|word| word.checked_add(STATUS_WORD_USIZE))
284 .unwrap_or(usize::MAX),
285 byte_len: self.bytes.len(),
286 fix: "slot_count and ring byte length disagree; rebuild HostRing through HostRing::new",
287 },
288 )?)?;
289 if word == protocol::slot::DONE {
290 done = done
291 .checked_add(1)
292 .ok_or(ProtocolError::ByteLengthOverflow {
293 buffer: "ring done count",
294 fix: "shard the ring before host observation",
295 })?;
296 }
297 }
298 Ok(done)
299 }
300
301 fn slot_count(&self) -> u32 {
302 self.slot_count
303 }
304}
305
#[cfg(test)]
mod tests {
    use super::*;

    /// Consumer that forwards the required methods to a `HostRing` but keeps
    /// the trait-provided `try_done_count`. `HostRing` overrides that method,
    /// so this wrapper is the only way to reach the default scan from here.
    struct DefaultScan<'a>(&'a HostRing);

    impl RingConsumer for DefaultScan<'_> {
        fn read_slot(&self, slot_idx: u32, out: &mut [u8]) -> Result<(), ProtocolError> {
            RingConsumer::read_slot(self.0, slot_idx, out)
        }

        fn slot_count(&self) -> u32 {
            RingConsumer::slot_count(self.0)
        }
    }

    /// A published slot reads back byte-for-byte and decodes through the
    /// protocol helpers.
    #[test]
    fn host_ring_publishes_and_round_trips_a_load_miss() {
        let mut ring = HostRing::new(4).expect("Fix: ring constructs");
        let encoded = protocol::encode_load_miss(123, true);

        RingProducer::publish(&mut ring, 1, &encoded).expect("Fix: publish");

        let mut slot_bytes = [0u8; SLOT_BYTES];
        RingConsumer::read_slot(&ring, 1, &mut slot_bytes).expect("Fix: read_slot");
        assert_eq!(slot_bytes.as_slice(), encoded.as_slice());

        let decoded = protocol::decode_load_miss(ring.as_bytes(), 1);
        assert_eq!(decoded, Some((123, true)));
    }

    /// Slot indices at or past `slot_count` are rejected for both publish and
    /// read.
    #[test]
    fn host_ring_rejects_out_of_range_slot() {
        let mut ring = HostRing::new(2).unwrap();
        let encoded = protocol::encode_load_miss(0, false);
        let err_hi = RingProducer::publish(&mut ring, 2, &encoded).expect_err("slot 2 OOB");
        assert!(
            matches!(err_hi, ProtocolError::MissingWord { .. }),
            "OOB publish error: {err_hi}"
        );
        let err_max =
            RingProducer::publish(&mut ring, u32::MAX, &encoded).expect_err("slot MAX OOB");
        assert!(
            matches!(err_max, ProtocolError::MissingWord { .. }),
            "MAX slot publish error: {err_max}"
        );

        let mut buf = [0u8; SLOT_BYTES];
        let read_err = RingConsumer::read_slot(&ring, 2, &mut buf).expect_err("read OOB");
        assert!(
            matches!(read_err, ProtocolError::MissingWord { .. }),
            "OOB read error: {read_err}"
        );
    }

    /// Payloads and output buffers that are not exactly `SLOT_BYTES` long are
    /// rejected.
    #[test]
    fn host_ring_rejects_mis_sized_encoded() {
        let mut ring = HostRing::new(2).unwrap();
        let short = [0u8; SLOT_BYTES - 1];
        let short_pub = RingProducer::publish(&mut ring, 0, &short).expect_err("short publish");
        assert!(
            matches!(short_pub, ProtocolError::MisalignedByteLength { .. }),
            "short publish error: {short_pub}"
        );
        let long = [0u8; SLOT_BYTES + 1];
        let long_pub = RingProducer::publish(&mut ring, 0, &long).expect_err("long publish");
        assert!(
            matches!(long_pub, ProtocolError::MisalignedByteLength { .. }),
            "long publish error: {long_pub}"
        );

        let mut short_out = [0u8; SLOT_BYTES - 1];
        let short_read =
            RingConsumer::read_slot(&ring, 0, &mut short_out).expect_err("short read buffer");
        assert!(
            matches!(short_read, ProtocolError::MisalignedByteLength { .. }),
            "short read error: {short_read}"
        );
    }

    /// Counts DONE slots through `HostRing`'s own `try_done_count` override.
    #[test]
    fn default_try_done_count_walks_the_ring() {
        let mut ring = HostRing::new(4).unwrap();
        assert_eq!(RingConsumer::try_done_count(&ring).unwrap(), 0);

        let bytes = ring.as_bytes_mut();
        let status_offset = STATUS_WORD_USIZE * 4;
        bytes[status_offset..status_offset + 4]
            .copy_from_slice(&protocol::slot::DONE.to_le_bytes());

        let status_offset_2 = 2 * SLOT_BYTES + STATUS_WORD_USIZE * 4;
        bytes[status_offset_2..status_offset_2 + 4]
            .copy_from_slice(&protocol::slot::DONE.to_le_bytes());

        assert_eq!(RingConsumer::try_done_count(&ring).unwrap(), 2);
    }

    /// The trait-provided scan (reached through `DefaultScan`) must agree with
    /// `HostRing`'s override on the same ring contents.
    #[test]
    fn trait_default_try_done_count_matches_host_ring_override() {
        let mut ring = HostRing::new(4).expect("Fix: ring constructs");
        assert_eq!(
            RingConsumer::try_done_count(&DefaultScan(&ring)).expect("Fix: default scan"),
            0
        );

        let done = protocol::slot::DONE.to_le_bytes();
        let bytes = ring.as_bytes_mut();
        for &slot in &[0usize, 2] {
            let at = slot * SLOT_BYTES + STATUS_WORD_USIZE * 4;
            bytes[at..at + 4].copy_from_slice(&done);
        }

        let via_default =
            RingConsumer::try_done_count(&DefaultScan(&ring)).expect("Fix: default scan");
        let via_override = RingConsumer::try_done_count(&ring).expect("Fix: override scan");
        assert_eq!(via_default, 2);
        assert_eq!(via_default, via_override);
    }

    /// A ring whose `slot_count` exceeds what its buffer holds yields an
    /// error rather than a panic.
    #[test]
    fn try_done_count_rejects_inconsistent_host_ring_bytes() {
        let ring = HostRing {
            bytes: vec![0u8; SLOT_BYTES],
            slot_count: 2,
        };

        let error = RingConsumer::try_done_count(&ring)
            .expect_err("Fix: malformed ring snapshots must not panic in fallible DONE count");
        assert!(
            matches!(error, ProtocolError::MissingWord { .. }),
            "Fix: malformed ring error must explain the slot-count/byte mismatch: {error}"
        );
    }
}