Skip to main content

vyre_runtime/megakernel/
ring.rs

1//! Ring producer / consumer traits for the megakernel host protocol.
2//!
3//! T036 / T037 in `VyreOffload/RELEASE_PLAN.md`. Today the protocol
4//! module ships byte-oriented `encode_*` / `decode_*` helpers and the
5//! consumer (host) drives a `Vec<u8>` ring directly. To make the ring
6//! source swappable  -  in-process host, out-of-process broker, or a
7//! GPU-direct producer  -  we lift the two halves of that contract behind
8//! traits and keep the existing path as the default in-process impl.
9//!
10//! The wire format is owned by [`super::protocol`]; this module sits
11//! one level above it (publishing/observation surface, not bytes).
12//!
13//! ### Producer
14//!
15//! [`RingProducer::publish`] writes one encoded slot. The encoded bytes
16//! come from a `protocol::encode_*` helper; the producer never inspects
17//! them beyond their length. Producers are responsible for the
18//! visibility/fence semantics the GPU expects (atomic store of the
19//! status word last); the default in-process producer does this via the
20//! protocol codec's byte ordering and memcpy.
21//!
22//! ### Consumer
23//!
24//! [`RingConsumer::read_slot`] is a read-only view of one slot's bytes.
25//! Consumers may decode with `protocol::decode_*`. A consumer is
26//! decoupled from where the bytes are stored (host RAM, GPU mirror,
27//! shared-mem broker)  -  only the byte layout matters.
28//!
29//! ### Boundary
30//!
31//! Neither trait names a consumer-specific concept (no "expert", no
32//! "MoE", no "shard"). The two traits are vyre-generic  -  see the
33//! boundary rule in `AGENTS.md`.
34
35use super::protocol::{self, ProtocolError};
36
/// Number of 32-bit words that make up one ring slot.
const SLOT_WORDS_USIZE: usize = 16;
/// Index of the status word inside a slot; it is the slot's first word.
const STATUS_WORD_USIZE: usize = 0;
/// Bytes per slot in the megakernel ring buffer (= `SLOT_WORDS * 4`).
pub const SLOT_BYTES: usize = SLOT_WORDS_USIZE * core::mem::size_of::<u32>();
41
42/// Producer half of the megakernel ring contract.
43///
44/// Implementations write encoded slot bytes (from
45/// [`protocol::encode_load_miss`] et al.) into a ring of `slot_count`
46/// fixed-size slots. The mapping from logical slot index to physical
47/// storage is the implementation's concern; consumers only see slot
48/// indices and the byte layout the protocol module defines.
49pub trait RingProducer {
50    /// Publish `encoded` into `slot_idx`. `encoded` must be exactly
51    /// [`SLOT_BYTES`] long; otherwise returns
52    /// [`ProtocolError::MisalignedByteLength`].
53    fn publish(&mut self, slot_idx: u32, encoded: &[u8]) -> Result<(), ProtocolError>;
54
55    /// Number of slots in the underlying ring.
56    fn slot_count(&self) -> u32;
57
58    /// Stable identifier for telemetry (e.g. `"in-process-host"`,
59    /// `"uring-cmd-nvme"`, `"gds"`).
60    fn name(&self) -> &'static str;
61}
62
63/// Consumer half of the megakernel ring contract.
64pub trait RingConsumer {
65    /// Copy slot `slot_idx`'s bytes into `out`. `out` must be exactly
66    /// [`SLOT_BYTES`] long; otherwise returns
67    /// [`ProtocolError::MisalignedByteLength`].
68    fn read_slot(&self, slot_idx: u32, out: &mut [u8]) -> Result<(), ProtocolError>;
69
70    /// Fallibly count slots currently in `DONE` status.
71    ///
72    /// The default implementation walks the ring through [`Self::read_slot`].
73    /// Specialized consumers backed by a device/control-buffer counter may
74    /// override this method to avoid host reads. Unlike [`Self::done_count`],
75    /// this surface reports malformed slot bytes and host arithmetic overflow
76    /// as [`ProtocolError`] instead of panicking.
77    fn try_done_count(&self) -> Result<u32, ProtocolError> {
78        let mut acc = 0u32;
79        let mut buf = [0u8; SLOT_BYTES];
80        for slot in 0..self.slot_count() {
81            self.read_slot(slot, &mut buf)?;
82            if read_slot_status_word(&buf)? == protocol::slot::DONE {
83                acc = acc
84                    .checked_add(1)
85                    .ok_or(ProtocolError::ByteLengthOverflow {
86                        buffer: "ring done count",
87                        fix: "shard the ring before host observation",
88                    })?;
89            }
90        }
91        Ok(acc)
92    }
93
94    /// Compatibility-only lossy count of slots currently in `DONE` status.
95    ///
96    /// Runtime paths must call [`Self::try_done_count`] so malformed snapshots
97    /// and host arithmetic overflow remain observable as [`ProtocolError`].
98    #[deprecated(
99        note = "use RingConsumer::try_done_count so malformed ring snapshots do not collapse to zero"
100    )]
101    fn done_count(&self) -> u32 {
102        self.try_done_count().unwrap_or(0)
103    }
104
105    /// Number of slots in the underlying ring.
106    fn slot_count(&self) -> u32;
107}
108
/// Default in-process ring backed by a `Vec<u8>`.
///
/// `HostRing` implements both [`RingProducer`] (through `&mut self`) and
/// [`RingConsumer`] (through `&self`), so the producer/consumer parity
/// tests can drive both halves against the same buffer.
///
/// The fields are private and [`HostRing::as_bytes_mut`] hands out a
/// fixed-length slice, so code outside this module can rewrite slot
/// contents but cannot resize the buffer.
#[derive(Debug, Clone)]
pub struct HostRing {
    /// Encoded ring contents; slot `i` starts at byte `i * SLOT_BYTES`.
    bytes: Vec<u8>,
    /// Number of slots the ring is treated as holding.
    slot_count: u32,
}
117
118impl HostRing {
119    /// Allocate a new ring of `slot_count` empty slots.
120    ///
121    /// # Errors
122    ///
123    /// Returns [`ProtocolError::ByteLengthOverflow`] if `slot_count`
124    /// exceeds [`protocol::MAX_ENCODED_RING_SLOTS`].
125    pub fn new(slot_count: u32) -> Result<Self, ProtocolError> {
126        let bytes = protocol::try_encode_empty_ring(slot_count)?;
127        Ok(Self { bytes, slot_count })
128    }
129
130    /// Borrow the underlying ring bytes (for the dispatch path that
131    /// still consumes `&[u8]` directly).
132    #[must_use]
133    pub fn as_bytes(&self) -> &[u8] {
134        &self.bytes
135    }
136
137    /// Mutably borrow the underlying ring bytes.
138    #[must_use]
139    pub fn as_bytes_mut(&mut self) -> &mut [u8] {
140        &mut self.bytes
141    }
142}
143
144fn ring_slot_base(slot_idx: u32) -> Result<usize, ProtocolError> {
145    usize::try_from(slot_idx)
146        .map_err(|_| ProtocolError::MissingWord {
147            buffer: "ring slot",
148            word_idx: usize::MAX,
149            byte_len: 0,
150            fix: "slot_idx cannot fit host usize; shard the megakernel ring before host access",
151        })?
152        .checked_mul(SLOT_BYTES)
153        .ok_or(ProtocolError::MissingWord {
154            buffer: "ring slot",
155            word_idx: usize::MAX,
156            byte_len: 0,
157            fix: "slot byte offset overflowed usize; shard the megakernel ring before host access",
158        })
159}
160
161fn ring_slot_word_index(slot_idx: u32) -> Result<usize, ProtocolError> {
162    usize::try_from(slot_idx)
163        .map_err(|_| ProtocolError::MissingWord {
164            buffer: "ring slot",
165            word_idx: usize::MAX,
166            byte_len: 0,
167            fix: "slot_idx cannot fit host usize; shard the megakernel ring before host access",
168        })?
169        .checked_mul(SLOT_WORDS_USIZE)
170        .ok_or(ProtocolError::MissingWord {
171            buffer: "ring slot",
172            word_idx: usize::MAX,
173            byte_len: 0,
174            fix: "slot word offset overflowed usize; shard the megakernel ring before host access",
175        })
176}
177
178fn read_slot_status_word(slot_bytes: &[u8]) -> Result<u32, ProtocolError> {
179    let status_offset =
180        STATUS_WORD_USIZE
181            .checked_mul(4)
182            .ok_or(ProtocolError::ByteLengthOverflow {
183                buffer: "ring slot status",
184                fix: "keep ring status word indices within host address space",
185            })?;
186    let status_end = status_offset
187        .checked_add(4)
188        .ok_or(ProtocolError::ByteLengthOverflow {
189            buffer: "ring slot status",
190            fix: "keep ring status word indices within host address space",
191        })?;
192    let bytes = slot_bytes
193        .get(status_offset..status_end)
194        .ok_or(ProtocolError::MissingWord {
195            buffer: "ring slot",
196            word_idx: STATUS_WORD_USIZE,
197            byte_len: slot_bytes.len(),
198            fix: "read a complete SLOT_BYTES slot before counting DONE status",
199        })?;
200    Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
201}
202
203impl RingProducer for HostRing {
204    fn publish(&mut self, slot_idx: u32, encoded: &[u8]) -> Result<(), ProtocolError> {
205        if encoded.len() != SLOT_BYTES {
206            return Err(ProtocolError::MisalignedByteLength {
207                buffer: "ring slot",
208                byte_len: encoded.len(),
209                fix: "encoded slot must be exactly SLOT_BYTES (64) long",
210            });
211        }
212        if slot_idx >= self.slot_count {
213            return Err(ProtocolError::MissingWord {
214                buffer: "ring slot",
215                word_idx: ring_slot_word_index(slot_idx)?,
216                byte_len: self.bytes.len(),
217                fix: "slot_idx must be < slot_count",
218            });
219        }
220        let base = ring_slot_base(slot_idx)?;
221        self.bytes[base..base + SLOT_BYTES].copy_from_slice(encoded);
222        Ok(())
223    }
224
225    fn slot_count(&self) -> u32 {
226        self.slot_count
227    }
228
229    fn name(&self) -> &'static str {
230        "in-process-host"
231    }
232}
233
234impl RingConsumer for HostRing {
235    fn read_slot(&self, slot_idx: u32, out: &mut [u8]) -> Result<(), ProtocolError> {
236        if out.len() != SLOT_BYTES {
237            return Err(ProtocolError::MisalignedByteLength {
238                buffer: "ring slot",
239                byte_len: out.len(),
240                fix: "out slice must be exactly SLOT_BYTES (64) long",
241            });
242        }
243        if slot_idx >= self.slot_count {
244            return Err(ProtocolError::MissingWord {
245                buffer: "ring slot",
246                word_idx: ring_slot_word_index(slot_idx)?,
247                byte_len: self.bytes.len(),
248                fix: "slot_idx must be < slot_count",
249            });
250        }
251        let base = ring_slot_base(slot_idx)?;
252        out.copy_from_slice(&self.bytes[base..base + SLOT_BYTES]);
253        Ok(())
254    }
255
256    fn try_done_count(&self) -> Result<u32, ProtocolError> {
257        let status_word_offset = STATUS_WORD_USIZE * 4;
258        let mut done = 0u32;
259        let slot_count =
260            usize::try_from(self.slot_count).map_err(|_| ProtocolError::ByteLengthOverflow {
261                buffer: "ring slot count",
262                fix: "shard the ring before host observation",
263            })?;
264        for slot in 0..slot_count {
265            let base = slot
266                .checked_mul(SLOT_BYTES)
267                .and_then(|offset| offset.checked_add(status_word_offset))
268                .ok_or(ProtocolError::ByteLengthOverflow {
269                    buffer: "ring status offset",
270                    fix: "shard the ring before host observation",
271                })?;
272            let end = base
273                .checked_add(4)
274                .ok_or(ProtocolError::ByteLengthOverflow {
275                    buffer: "ring status offset",
276                    fix: "shard the ring before host observation",
277                })?;
278            let word = read_slot_status_word(self.bytes.get(base..end).ok_or(
279                ProtocolError::MissingWord {
280                    buffer: "ring slot",
281                    word_idx: slot
282                        .checked_mul(SLOT_WORDS_USIZE)
283                        .and_then(|word| word.checked_add(STATUS_WORD_USIZE))
284                        .unwrap_or(usize::MAX),
285                    byte_len: self.bytes.len(),
286                    fix: "slot_count and ring byte length disagree; rebuild HostRing through HostRing::new",
287                },
288            )?)?;
289            if word == protocol::slot::DONE {
290                done = done
291                    .checked_add(1)
292                    .ok_or(ProtocolError::ByteLengthOverflow {
293                        buffer: "ring done count",
294                        fix: "shard the ring before host observation",
295                    })?;
296            }
297        }
298        Ok(done)
299    }
300
301    fn slot_count(&self) -> u32 {
302        self.slot_count
303    }
304}
305
#[cfg(test)]
mod tests {
    use super::*;

    /// Overwrite slot `slot`'s status word with `DONE` in the raw ring bytes.
    fn stamp_done(ring: &mut HostRing, slot: usize) {
        let at = slot * SLOT_BYTES + STATUS_WORD_USIZE * 4;
        ring.as_bytes_mut()[at..at + 4].copy_from_slice(&protocol::slot::DONE.to_le_bytes());
    }

    /// Consumer that forwards slot reads to a borrowed [`HostRing`] but does
    /// not override `try_done_count`, so the trait's provided walk is what
    /// runs.
    struct DefaultWalk<'a>(&'a HostRing);

    impl RingConsumer for DefaultWalk<'_> {
        fn read_slot(&self, slot_idx: u32, out: &mut [u8]) -> Result<(), ProtocolError> {
            RingConsumer::read_slot(self.0, slot_idx, out)
        }

        fn slot_count(&self) -> u32 {
            RingConsumer::slot_count(self.0)
        }
    }

    /// Parity: a slot published via the trait must round-trip through
    /// the consumer trait and decode identically via the existing
    /// `protocol::decode_load_miss` helper.
    #[test]
    fn host_ring_publishes_and_round_trips_a_load_miss() {
        let mut ring = HostRing::new(4).expect("Fix: ring constructs");
        let payload = protocol::encode_load_miss(123, true);
        RingProducer::publish(&mut ring, 1, &payload).expect("Fix: publish");

        let mut observed = [0u8; SLOT_BYTES];
        RingConsumer::read_slot(&ring, 1, &mut observed).expect("Fix: read_slot");
        assert_eq!(observed.as_slice(), payload.as_slice());

        // The legacy byte-oriented decoder must agree with the trait path
        // when reading slot 1 straight out of the ring bytes.
        assert_eq!(
            protocol::decode_load_miss(ring.as_bytes(), 1),
            Some((123, true))
        );
    }

    #[test]
    fn host_ring_rejects_out_of_range_slot() {
        let mut ring = HostRing::new(2).unwrap();
        let payload = protocol::encode_load_miss(0, false);

        let past_end = RingProducer::publish(&mut ring, 2, &payload).expect_err("slot 2 OOB");
        assert!(
            matches!(past_end, ProtocolError::MissingWord { .. }),
            "OOB publish error: {past_end}"
        );

        let far_past_end =
            RingProducer::publish(&mut ring, u32::MAX, &payload).expect_err("slot MAX OOB");
        assert!(
            matches!(far_past_end, ProtocolError::MissingWord { .. }),
            "MAX slot publish error: {far_past_end}"
        );

        let mut sink = [0u8; SLOT_BYTES];
        let read_past_end = RingConsumer::read_slot(&ring, 2, &mut sink).expect_err("read OOB");
        assert!(
            matches!(read_past_end, ProtocolError::MissingWord { .. }),
            "OOB read error: {read_past_end}"
        );
    }

    #[test]
    fn host_ring_rejects_mis_sized_encoded() {
        let mut ring = HostRing::new(2).unwrap();

        let too_short = [0u8; SLOT_BYTES - 1];
        let short_err = RingProducer::publish(&mut ring, 0, &too_short).expect_err("short publish");
        assert!(
            matches!(short_err, ProtocolError::MisalignedByteLength { .. }),
            "short publish error: {short_err}"
        );

        let too_long = [0u8; SLOT_BYTES + 1];
        let long_err = RingProducer::publish(&mut ring, 0, &too_long).expect_err("long publish");
        assert!(
            matches!(long_err, ProtocolError::MisalignedByteLength { .. }),
            "long publish error: {long_err}"
        );

        let mut cramped = [0u8; SLOT_BYTES - 1];
        let read_err =
            RingConsumer::read_slot(&ring, 0, &mut cramped).expect_err("short read buffer");
        assert!(
            matches!(read_err, ProtocolError::MisalignedByteLength { .. }),
            "short read error: {read_err}"
        );
    }

    /// `HostRing`'s `try_done_count` override must count every slot whose
    /// status word has been stamped `DONE` by hand.
    #[test]
    fn default_try_done_count_walks_the_ring() {
        let mut ring = HostRing::new(4).unwrap();
        assert_eq!(RingConsumer::try_done_count(&ring).unwrap(), 0);

        stamp_done(&mut ring, 0);
        stamp_done(&mut ring, 2);

        assert_eq!(RingConsumer::try_done_count(&ring).unwrap(), 2);
    }

    /// The trait's provided `try_done_count` (and the deprecated lossy
    /// `done_count` built on it) must agree with `HostRing`'s override.
    #[test]
    #[allow(deprecated)]
    fn trait_default_try_done_count_matches_host_ring_override() {
        let mut ring = HostRing::new(4).unwrap();
        assert_eq!(RingConsumer::try_done_count(&DefaultWalk(&ring)).unwrap(), 0);

        stamp_done(&mut ring, 1);
        stamp_done(&mut ring, 3);

        assert_eq!(RingConsumer::try_done_count(&DefaultWalk(&ring)).unwrap(), 2);
        assert_eq!(RingConsumer::done_count(&DefaultWalk(&ring)), 2);
        assert_eq!(RingConsumer::try_done_count(&ring).unwrap(), 2);
    }

    #[test]
    fn try_done_count_rejects_inconsistent_host_ring_bytes() {
        // One slot's worth of bytes but a claimed count of two.
        let lopsided = HostRing {
            bytes: vec![0u8; SLOT_BYTES],
            slot_count: 2,
        };

        let error = RingConsumer::try_done_count(&lopsided)
            .expect_err("Fix: malformed ring snapshots must not panic in fallible DONE count");
        assert!(
            matches!(error, ProtocolError::MissingWord { .. }),
            "Fix: malformed ring error must explain the slot-count/byte mismatch: {error}"
        );
    }
}