Skip to main content

vyre_runtime/megakernel/
ring.rs

//! Ring producer / consumer traits for the megakernel host protocol.
//!
//! T036 / T037 in `VyreOffload/RELEASE_PLAN.md`. Today the protocol
//! module ships byte-oriented `encode_*` / `decode_*` helpers, and the
//! consumer (host) drives a `Vec<u8>` ring directly. To make the ring
//! source swappable — an in-process host, an out-of-process broker, or a
//! GPU-direct producer — we lift the two halves of that contract behind
//! traits and keep the existing path as the default in-process impl.
//!
//! The wire format is owned by [`super::protocol`]; this module sits
//! one level above it (the publishing/observation surface, not the bytes).
//!
//! ### Producer
//!
//! [`RingProducer::publish`] writes one encoded slot. The encoded bytes
//! come from a `protocol::encode_*` helper; the producer never inspects
//! them beyond their length. Producers are responsible for the
//! visibility/fence semantics the GPU expects (atomic store of the
//! status word last). The default in-process producer copies the whole
//! encoded slot with a single plain (non-atomic) memcpy and relies on the
//! protocol codec's byte ordering.
//!
//! ### Consumer
//!
//! [`RingConsumer::read_slot`] copies one slot's bytes into a
//! caller-provided buffer; consumers may then decode them with
//! `protocol::decode_*`. A consumer is decoupled from where the bytes are
//! stored (host RAM, GPU mirror, shared-memory broker) — only the byte
//! layout matters.
//!
//! ### Boundary
//!
//! Neither trait names a consumer-specific concept (no "expert", no
//! "MoE", no "shard"). Both traits are vyre-generic — see the
//! boundary rule in `AGENTS.md`.

35use super::protocol::{self, ProtocolError};
36
/// Number of 32-bit words that make up one ring slot.
const SLOT_WORDS_USIZE: usize = 16;
/// Word index of the status word inside a slot (the first word).
const STATUS_WORD_USIZE: usize = 0;
/// Bytes per slot in the megakernel ring buffer: `SLOT_WORDS_USIZE`
/// little-endian `u32` words, i.e. 64 bytes (= `SLOT_WORDS * 4`).
pub const SLOT_BYTES: usize = SLOT_WORDS_USIZE * std::mem::size_of::<u32>();
41
42/// Producer half of the megakernel ring contract.
43///
44/// Implementations write encoded slot bytes (from
45/// [`protocol::encode_load_miss`] et al.) into a ring of `slot_count`
46/// fixed-size slots. The mapping from logical slot index to physical
47/// storage is the implementation's concern; consumers only see slot
48/// indices and the byte layout the protocol module defines.
49pub trait RingProducer {
50    /// Publish `encoded` into `slot_idx`. `encoded` must be exactly
51    /// [`SLOT_BYTES`] long; otherwise returns
52    /// [`ProtocolError::MisalignedByteLength`].
53    fn publish(&mut self, slot_idx: u32, encoded: &[u8]) -> Result<(), ProtocolError>;
54
55    /// Number of slots in the underlying ring.
56    fn slot_count(&self) -> u32;
57
58    /// Stable identifier for telemetry (e.g. `"in-process-host"`,
59    /// `"uring-cmd-nvme"`, `"gds"`).
60    fn name(&self) -> &'static str;
61}
62
63/// Consumer half of the megakernel ring contract.
64pub trait RingConsumer {
65    /// Copy slot `slot_idx`'s bytes into `out`. `out` must be exactly
66    /// [`SLOT_BYTES`] long; otherwise returns
67    /// [`ProtocolError::MisalignedByteLength`].
68    fn read_slot(&self, slot_idx: u32, out: &mut [u8]) -> Result<(), ProtocolError>;
69
70    /// Count slots currently in `DONE` status. The default
71    /// implementation walks the ring; specialized consumers (e.g. ones
72    /// backed by a control-buffer counter) may override.
73    fn done_count(&self) -> u32 {
74        let mut acc = 0u32;
75        let mut buf = [0u8; SLOT_BYTES];
76        for slot in 0..self.slot_count() {
77            if self.read_slot(slot, &mut buf).is_err() {
78                continue;
79            }
80            let status_offset = STATUS_WORD_USIZE * 4;
81            let word = u32::from_le_bytes([
82                buf[status_offset],
83                buf[status_offset + 1],
84                buf[status_offset + 2],
85                buf[status_offset + 3],
86            ]);
87            if word == protocol::slot::DONE {
88                acc = acc.checked_add(1).unwrap_or_else(|| {
89                    panic!(
90                        "megakernel ring consumer done_count overflowed u32. Fix: shard the ring before host observation."
91                    )
92                });
93            }
94        }
95        acc
96    }
97
98    /// Number of slots in the underlying ring.
99    fn slot_count(&self) -> u32;
100}
101
/// Default in-process ring stored in a host `Vec<u8>`.
///
/// `HostRing` implements both [`RingProducer`] (through `&mut self`) and
/// [`RingConsumer`] (through `&self`), so one value can drive both halves
/// of the contract, which is what the producer/consumer parity tests rely on.
///
/// The buffer is created by [`HostRing::new`]. Slot `i` occupies bytes
/// `i * SLOT_BYTES .. (i + 1) * SLOT_BYTES`.
#[derive(Debug, Clone)]
pub struct HostRing {
    /// Encoded ring contents, `SLOT_BYTES` bytes per slot.
    bytes: Vec<u8>,
    /// Number of slots addressable through the ring traits.
    slot_count: u32,
}
110
111impl HostRing {
112    /// Allocate a new ring of `slot_count` empty slots.
113    ///
114    /// # Errors
115    ///
116    /// Returns [`ProtocolError::ByteLengthOverflow`] if `slot_count`
117    /// exceeds [`protocol::MAX_ENCODED_RING_SLOTS`].
118    pub fn new(slot_count: u32) -> Result<Self, ProtocolError> {
119        let bytes = protocol::try_encode_empty_ring(slot_count)?;
120        Ok(Self { bytes, slot_count })
121    }
122
123    /// Borrow the underlying ring bytes (for the dispatch path that
124    /// still consumes `&[u8]` directly).
125    #[must_use]
126    pub fn as_bytes(&self) -> &[u8] {
127        &self.bytes
128    }
129
130    /// Mutably borrow the underlying ring bytes.
131    #[must_use]
132    pub fn as_bytes_mut(&mut self) -> &mut [u8] {
133        &mut self.bytes
134    }
135}
136
137fn ring_slot_base(slot_idx: u32) -> Result<usize, ProtocolError> {
138    usize::try_from(slot_idx)
139        .map_err(|_| ProtocolError::MissingWord {
140            buffer: "ring slot",
141            word_idx: usize::MAX,
142            byte_len: 0,
143            fix: "slot_idx cannot fit host usize; shard the megakernel ring before host access",
144        })?
145        .checked_mul(SLOT_BYTES)
146        .ok_or(ProtocolError::MissingWord {
147            buffer: "ring slot",
148            word_idx: usize::MAX,
149            byte_len: 0,
150            fix: "slot byte offset overflowed usize; shard the megakernel ring before host access",
151        })
152}
153
154fn ring_slot_word_index(slot_idx: u32) -> Result<usize, ProtocolError> {
155    usize::try_from(slot_idx)
156        .map_err(|_| ProtocolError::MissingWord {
157            buffer: "ring slot",
158            word_idx: usize::MAX,
159            byte_len: 0,
160            fix: "slot_idx cannot fit host usize; shard the megakernel ring before host access",
161        })?
162        .checked_mul(SLOT_WORDS_USIZE)
163        .ok_or(ProtocolError::MissingWord {
164            buffer: "ring slot",
165            word_idx: usize::MAX,
166            byte_len: 0,
167            fix: "slot word offset overflowed usize; shard the megakernel ring before host access",
168        })
169}
170
171impl RingProducer for HostRing {
172    fn publish(&mut self, slot_idx: u32, encoded: &[u8]) -> Result<(), ProtocolError> {
173        if encoded.len() != SLOT_BYTES {
174            return Err(ProtocolError::MisalignedByteLength {
175                buffer: "ring slot",
176                byte_len: encoded.len(),
177                fix: "encoded slot must be exactly SLOT_BYTES (64) long",
178            });
179        }
180        if slot_idx >= self.slot_count {
181            return Err(ProtocolError::MissingWord {
182                buffer: "ring slot",
183                word_idx: ring_slot_word_index(slot_idx)?,
184                byte_len: self.bytes.len(),
185                fix: "slot_idx must be < slot_count",
186            });
187        }
188        let base = ring_slot_base(slot_idx)?;
189        self.bytes[base..base + SLOT_BYTES].copy_from_slice(encoded);
190        Ok(())
191    }
192
193    fn slot_count(&self) -> u32 {
194        self.slot_count
195    }
196
197    fn name(&self) -> &'static str {
198        "in-process-host"
199    }
200}
201
202impl RingConsumer for HostRing {
203    fn read_slot(&self, slot_idx: u32, out: &mut [u8]) -> Result<(), ProtocolError> {
204        if out.len() != SLOT_BYTES {
205            return Err(ProtocolError::MisalignedByteLength {
206                buffer: "ring slot",
207                byte_len: out.len(),
208                fix: "out slice must be exactly SLOT_BYTES (64) long",
209            });
210        }
211        if slot_idx >= self.slot_count {
212            return Err(ProtocolError::MissingWord {
213                buffer: "ring slot",
214                word_idx: ring_slot_word_index(slot_idx)?,
215                byte_len: self.bytes.len(),
216                fix: "slot_idx must be < slot_count",
217            });
218        }
219        let base = ring_slot_base(slot_idx)?;
220        out.copy_from_slice(&self.bytes[base..base + SLOT_BYTES]);
221        Ok(())
222    }
223
224    fn done_count(&self) -> u32 {
225        let status_word_offset = STATUS_WORD_USIZE * 4;
226        let mut done = 0u32;
227        let slot_count = usize::try_from(self.slot_count).unwrap_or_else(|source| {
228            panic!(
229                "megakernel ring slot_count cannot fit usize: {source}. Fix: shard the ring before host observation."
230            )
231        });
232        for slot in 0..slot_count {
233            let base = slot
234                .checked_mul(SLOT_BYTES)
235                .and_then(|offset| offset.checked_add(status_word_offset))
236                .unwrap_or_else(|| {
237                    panic!(
238                        "megakernel ring status byte offset overflowed usize. Fix: shard the ring before host observation."
239                    )
240                });
241            let word = u32::from_le_bytes([
242                self.bytes[base],
243                self.bytes[base + 1],
244                self.bytes[base + 2],
245                self.bytes[base + 3],
246            ]);
247            if word == protocol::slot::DONE {
248                done = done.checked_add(1).unwrap_or_else(|| {
249                    panic!(
250                        "megakernel ring done count overflowed u32. Fix: shard the ring before host observation."
251                    )
252                });
253            }
254        }
255        done
256    }
257
258    fn slot_count(&self) -> u32 {
259        self.slot_count
260    }
261}
262
#[cfg(test)]
mod tests {
    use super::*;

    /// Consumer that forwards reads to a [`HostRing`] but does NOT
    /// override `done_count`. Calls therefore go through the trait's
    /// default slot-by-slot walk instead of `HostRing`'s override.
    struct DefaultWalk<'a>(&'a HostRing);

    impl RingConsumer for DefaultWalk<'_> {
        fn read_slot(&self, slot_idx: u32, out: &mut [u8]) -> Result<(), ProtocolError> {
            RingConsumer::read_slot(self.0, slot_idx, out)
        }

        fn slot_count(&self) -> u32 {
            RingConsumer::slot_count(self.0)
        }
    }

    /// Stamp `DONE` into the status word of `slot` by writing the raw bytes.
    fn mark_done(ring: &mut HostRing, slot: usize) {
        let status_offset = slot * SLOT_BYTES + STATUS_WORD_USIZE * 4;
        ring.as_bytes_mut()[status_offset..status_offset + 4]
            .copy_from_slice(&protocol::slot::DONE.to_le_bytes());
    }

    /// Parity: a slot published via the trait must round-trip through
    /// the consumer trait and decode identically via the existing
    /// `protocol::decode_load_miss` helper.
    #[test]
    fn host_ring_publishes_and_round_trips_a_load_miss() {
        let mut ring = HostRing::new(4).expect("Fix: ring constructs");
        let encoded = protocol::encode_load_miss(123, true);

        RingProducer::publish(&mut ring, 1, &encoded).expect("Fix: publish");

        let mut slot_bytes = [0u8; SLOT_BYTES];
        RingConsumer::read_slot(&ring, 1, &mut slot_bytes).expect("Fix: read_slot");
        assert_eq!(slot_bytes.as_slice(), encoded.as_slice());

        // And, importantly, the existing decoder must read it back from
        // the ring bytes at slot 1.
        let decoded = protocol::decode_load_miss(ring.as_bytes(), 1);
        assert_eq!(decoded, Some((123, true)));
    }

    #[test]
    fn host_ring_rejects_out_of_range_slot() {
        let mut ring = HostRing::new(2).unwrap();
        let encoded = protocol::encode_load_miss(0, false);
        let err_hi = RingProducer::publish(&mut ring, 2, &encoded).expect_err("slot 2 OOB");
        assert!(
            err_hi.to_string().contains("slot") || err_hi.to_string().contains("range"),
            "OOB publish error: {err_hi}"
        );
        let err_max =
            RingProducer::publish(&mut ring, u32::MAX, &encoded).expect_err("slot MAX OOB");
        assert!(
            err_max.to_string().contains("slot") || err_max.to_string().contains("range"),
            "MAX slot publish error: {err_max}"
        );

        let mut buf = [0u8; SLOT_BYTES];
        let read_err = RingConsumer::read_slot(&ring, 2, &mut buf).expect_err("read OOB");
        assert!(
            read_err.to_string().contains("slot") || read_err.to_string().contains("range"),
            "OOB read error: {read_err}"
        );
    }

    #[test]
    fn host_ring_rejects_mis_sized_encoded() {
        let mut ring = HostRing::new(2).unwrap();
        let short = [0u8; SLOT_BYTES - 1];
        let short_pub = RingProducer::publish(&mut ring, 0, &short).expect_err("short publish");
        assert!(
            short_pub.to_string().contains("SLOT") || short_pub.to_string().contains("byte"),
            "short publish error: {short_pub}"
        );
        let long = [0u8; SLOT_BYTES + 1];
        let long_pub = RingProducer::publish(&mut ring, 0, &long).expect_err("long publish");
        assert!(
            long_pub.to_string().contains("SLOT") || long_pub.to_string().contains("byte"),
            "long publish error: {long_pub}"
        );

        let mut short_out = [0u8; SLOT_BYTES - 1];
        let short_read =
            RingConsumer::read_slot(&ring, 0, &mut short_out).expect_err("short read buffer");
        assert!(
            short_read.to_string().contains("SLOT") || short_read.to_string().contains("byte"),
            "short read error: {short_read}"
        );
    }

    /// `HostRing`'s `done_count` override reads status words directly from
    /// the buffer; stamping DONE into slots must show up in the count.
    #[test]
    fn host_ring_done_count_counts_stamped_slots() {
        let mut ring = HostRing::new(4).unwrap();
        // Empty ring: done_count is 0.
        assert_eq!(RingConsumer::done_count(&ring), 0);

        mark_done(&mut ring, 0);
        mark_done(&mut ring, 2);

        assert_eq!(RingConsumer::done_count(&ring), 2);
    }

    /// The trait's default `done_count` (a `read_slot` walk) is never used
    /// by `HostRing` itself, so exercise it through a non-overriding
    /// wrapper and check it agrees with the override.
    #[test]
    fn trait_default_done_count_matches_host_override() {
        let mut ring = HostRing::new(4).expect("Fix: ring constructs");
        assert_eq!(DefaultWalk(&ring).done_count(), 0);

        mark_done(&mut ring, 1);
        mark_done(&mut ring, 3);

        assert_eq!(DefaultWalk(&ring).done_count(), 2);
        assert_eq!(RingConsumer::done_count(&ring), 2);
    }
}