Skip to main content

vyre_runtime/megakernel/
descriptor.rs

//! Typed host-side descriptors for publishing work into the megakernel ring.
//!
//! Wrappers such as `VyreOffload` should not have to hand-assemble
//! `(opcode, tenant_id, args)` tuples or know when to switch to the
//! packed-slot path. These descriptors provide an additive typed API
//! over the existing wire protocol.
7
8use super::staging_reserve::reserve_vec_capacity as reserve_descriptor_vec;
9use crate::PipelineError;
10
11use smallvec::SmallVec;
12
13const ARGS_PER_SLOT_USIZE: usize = 12;
14
15use super::{protocol, Megakernel};
16
17/// Built-in megakernel opcodes exposed as a typed host API.
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum BuiltinOpcode {
20    /// No-op heartbeat / probe.
21    Nop,
22    /// `control[arg1] = arg0`.
23    StoreU32,
24    /// `atomic_add(control[arg1], arg0)`.
25    AtomicAdd,
26    /// `control[OBSERVABLE_BASE + arg1] = control[arg0]`.
27    LoadU32,
28    /// Compare-and-swap on `control[arg0]`.
29    CompareSwap,
30    /// Copy `arg2` words from `control[arg0]` to `control[arg1]`.
31    Memcpy,
32    /// Single DFA transition step.
33    DfaStep,
34    /// Batch fence / epoch bump.
35    BatchFence,
36    /// Emit a debug log record.
37    Printf,
38    /// Set `SHUTDOWN=1`.
39    Shutdown,
40}
41
42impl BuiltinOpcode {
43    /// Underlying wire opcode.
44    #[must_use]
45    pub const fn into_wire(self) -> u32 {
46        match self {
47            Self::Nop => protocol::opcode::NOP,
48            Self::StoreU32 => protocol::opcode::STORE_U32,
49            Self::AtomicAdd => protocol::opcode::ATOMIC_ADD,
50            Self::LoadU32 => protocol::opcode::LOAD_U32,
51            Self::CompareSwap => protocol::opcode::COMPARE_SWAP,
52            Self::Memcpy => protocol::opcode::MEMCPY,
53            Self::DfaStep => protocol::opcode::DFA_STEP,
54            Self::BatchFence => protocol::opcode::BATCH_FENCE,
55            Self::Printf => protocol::opcode::PRINTF,
56            Self::Shutdown => protocol::opcode::SHUTDOWN,
57        }
58    }
59}
60
61/// A slot opcode can target either a builtin wire opcode or a caller-defined
62/// extension registered via an opcode handler (see `handlers` module).
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64pub enum SlotOpcode {
65    /// One of the frozen builtins in [`protocol::opcode`].
66    Builtin(BuiltinOpcode),
67    /// A custom extension opcode.
68    Custom(u32),
69}
70
71impl SlotOpcode {
72    /// Underlying wire opcode.
73    #[must_use]
74    pub const fn into_wire(self) -> u32 {
75        match self {
76            Self::Builtin(op) => op.into_wire(),
77            Self::Custom(op) => op,
78        }
79    }
80}
81
/// A single inner operation carried inside a `PACKED_SLOT`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PackedOpDescriptor {
    /// Inner opcode id. Must fit in `u8` due to the current wire format.
    pub opcode: u8,
    /// Positional `u32` arguments handed to the inner opcode.
    pub args: Vec<u32>,
}

impl PackedOpDescriptor {
    /// Builds a packed inner op from its opcode id and positional arguments.
    ///
    /// The values are stored as given; no validation happens here.
    #[must_use]
    pub fn new(opcode: u8, args: Vec<u32>) -> Self {
        PackedOpDescriptor { opcode, args }
    }
}
98
99/// One top-level slot publication request.
100#[derive(Debug, Clone, PartialEq, Eq)]
101pub enum SlotDescriptor {
102    /// Publish one normal slot.
103    Single {
104        /// Tenant id used for the runtime's authorization mask.
105        tenant_id: u32,
106        /// Slot opcode.
107        opcode: SlotOpcode,
108        /// Positional `u32` arguments.
109        args: Vec<u32>,
110    },
111    /// Publish one packed slot containing several inner ops.
112    Packed {
113        /// Tenant id used for the runtime's authorization mask.
114        tenant_id: u32,
115        /// Inner packed ops.
116        ops: Vec<PackedOpDescriptor>,
117    },
118}
119
120impl SlotDescriptor {
121    /// Build a simple slot descriptor.
122    #[must_use]
123    pub fn single(tenant_id: u32, opcode: SlotOpcode, args: Vec<u32>) -> Self {
124        Self::Single {
125            tenant_id,
126            opcode,
127            args,
128        }
129    }
130
131    /// Build a packed-slot descriptor.
132    #[must_use]
133    pub fn packed(tenant_id: u32, ops: Vec<PackedOpDescriptor>) -> Self {
134        Self::Packed { tenant_id, ops }
135    }
136
137    /// Publish this slot into the ring at `slot_idx`.
138    ///
139    /// # Errors
140    ///
141    /// Propagates any wire-level publication error from the underlying ring
142    /// protocol helpers.
143    pub fn publish_into(&self, ring_bytes: &mut [u8], slot_idx: u32) -> Result<(), PipelineError> {
144        match self {
145            Self::Single {
146                tenant_id,
147                opcode,
148                args,
149            } => {
150                Megakernel::publish_slot(ring_bytes, slot_idx, *tenant_id, opcode.into_wire(), args)
151            }
152            Self::Packed { tenant_id, ops } => {
153                Megakernel::publish_packed_descriptors(ring_bytes, slot_idx, *tenant_id, ops)
154            }
155        }
156    }
157}
158
159/// A typed batch publication request.
160#[derive(Debug, Clone, PartialEq, Eq)]
161pub struct BatchDescriptor {
162    /// Slot index where the first item should be written.
163    pub start_slot: u32,
164    /// Items to publish in order.
165    pub items: Vec<SlotDescriptor>,
166}
167
168impl BatchDescriptor {
169    /// Convenience constructor.
170    #[must_use]
171    pub fn new(start_slot: u32, items: Vec<SlotDescriptor>) -> Self {
172        Self { start_slot, items }
173    }
174
175    /// Publish all items into the ring. Returns the number of slots consumed.
176    ///
177    /// # Errors
178    ///
179    /// Propagates any slot publication error.
180    pub fn publish_into(&self, ring_bytes: &mut [u8]) -> Result<u32, PipelineError> {
181        let item_count = u32::try_from(self.items.len()).map_err(|_| PipelineError::QueueFull {
182            queue: "submission",
183            fix: "batch size exceeds u32::MAX slots",
184        })?;
185        if item_count > 0 {
186            self.start_slot
187                .checked_add(item_count - 1)
188                .ok_or(PipelineError::QueueFull {
189                    queue: "submission",
190                    fix: "batch start plus item count overflows u32; split the descriptor batch before publishing",
191                })?;
192        }
193        for (slot_offset, item) in (0..item_count).zip(self.items.iter()) {
194            let slot_idx = self
195                .start_slot
196                .checked_add(slot_offset)
197                .ok_or(PipelineError::QueueFull {
198                queue: "submission",
199                fix:
200                    "batch slot index overflowed u32; split the descriptor batch before publishing",
201            })?;
202            item.publish_into(ring_bytes, slot_idx)?;
203        }
204        Ok(item_count)
205    }
206}
207
/// How an item published inside a window descriptor is classified.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowClass {
    /// Required work that must converge for the window to be usable.
    Required,
    /// Lookahead work that improves the next step but is not on the immediate critical path.
    Lookahead,
}

impl WindowClass {
    /// Returns the stable on-the-wire tag: `Required` is `0`, `Lookahead` is `1`.
    ///
    /// The values are spelled out explicitly rather than derived from the
    /// declaration order, so reordering the variants cannot change the
    /// encoding.
    #[must_use]
    pub const fn into_wire(self) -> u32 {
        match self {
            WindowClass::Lookahead => 1,
            WindowClass::Required => 0,
        }
    }
}
227
228/// A ticketed window of related slot publications.
229///
230/// Each emitted slot receives a stable prefix of `[window_ticket, class_tag]`
231/// followed by the caller-supplied payload, so wrappers can submit required and
232/// lookahead work as one structured batch without hand-assembling the prefix.
233#[derive(Debug, Clone, PartialEq, Eq)]
234pub struct WindowDescriptor {
235    /// Slot index where the first window item should be written.
236    pub start_slot: u32,
237    /// Tenant id used for all emitted slots.
238    pub tenant_id: u32,
239    /// Slot opcode shared by all emitted slots.
240    pub opcode: SlotOpcode,
241    /// Stable ticket id correlating every slot in this window.
242    pub ticket: u32,
243    /// Required entries for the window.
244    pub required: Vec<Vec<u32>>,
245    /// Lookahead entries for the window.
246    pub lookahead: Vec<Vec<u32>>,
247}
248
249impl WindowDescriptor {
250    /// Convenience constructor.
251    #[must_use]
252    pub fn new(
253        start_slot: u32,
254        tenant_id: u32,
255        opcode: SlotOpcode,
256        ticket: u32,
257        required: Vec<Vec<u32>>,
258        lookahead: Vec<Vec<u32>>,
259    ) -> Self {
260        Self {
261            start_slot,
262            tenant_id,
263            opcode,
264            ticket,
265            required,
266            lookahead,
267        }
268    }
269
270    /// Convert the window into a typed batch publication.
271    #[must_use]
272    pub fn into_batch(&self) -> BatchDescriptor {
273        match self.try_into_batch() {
274            Ok(batch) => batch,
275            Err(error) => panic!("{error}"),
276        }
277    }
278
279    /// Convert the window into a typed batch publication with explicit staging
280    /// and ABI-bound errors.
281    pub fn try_into_batch(&self) -> Result<BatchDescriptor, PipelineError> {
282        let item_count = self
283            .required
284            .len()
285            .checked_add(self.lookahead.len())
286            .ok_or(PipelineError::QueueFull {
287            queue: "submission",
288            fix:
289                "window item count overflowed usize; split the window before materializing a batch",
290        })?;
291        let mut items = Vec::new();
292        reserve_descriptor_vec(&mut items, item_count, "window batch item")?;
293        for payload in &self.required {
294            let mut args = window_payload_args(self.ticket, WindowClass::Required, payload)?;
295            args.push(self.ticket);
296            args.push(WindowClass::Required.into_wire());
297            args.extend(payload.iter().copied());
298            items.push(SlotDescriptor::single(self.tenant_id, self.opcode, args));
299        }
300        for payload in &self.lookahead {
301            let mut args = window_payload_args(self.ticket, WindowClass::Lookahead, payload)?;
302            args.push(self.ticket);
303            args.push(WindowClass::Lookahead.into_wire());
304            args.extend(payload.iter().copied());
305            items.push(SlotDescriptor::single(self.tenant_id, self.opcode, args));
306        }
307        Ok(BatchDescriptor::new(self.start_slot, items))
308    }
309
310    /// Publish the full window into the ring and return the number of emitted slots.
311    pub fn publish_into(&self, ring_bytes: &mut [u8]) -> Result<u32, PipelineError> {
312        let consumed = self
313            .required
314            .len()
315            .checked_add(self.lookahead.len())
316            .ok_or(PipelineError::QueueFull {
317                queue: "submission",
318                fix: "window item count overflowed usize; split the window before publishing",
319            })?;
320        let consumed_u32 = u32::try_from(consumed).map_err(|_| PipelineError::QueueFull {
321            queue: "submission",
322            fix: "window size exceeds u32::MAX slots; split the window before publishing",
323        })?;
324        if consumed_u32 == 0 {
325            return Ok(0);
326        }
327        self.start_slot
328            .checked_add(consumed_u32 - 1)
329            .ok_or(PipelineError::QueueFull {
330                queue: "submission",
331                fix: "window start plus item count overflows u32; split the window before publishing",
332            })?;
333
334        let mut slot_offset = 0u32;
335        let mut args = SmallVec::<[u32; ARGS_PER_SLOT_USIZE]>::new();
336        for payload in &self.required {
337            publish_window_payload(
338                ring_bytes,
339                self.start_slot,
340                &mut slot_offset,
341                self.tenant_id,
342                self.opcode,
343                self.ticket,
344                WindowClass::Required,
345                payload,
346                &mut args,
347            )?;
348        }
349        for payload in &self.lookahead {
350            publish_window_payload(
351                ring_bytes,
352                self.start_slot,
353                &mut slot_offset,
354                self.tenant_id,
355                self.opcode,
356                self.ticket,
357                WindowClass::Lookahead,
358                payload,
359                &mut args,
360            )?;
361        }
362        Ok(slot_offset)
363    }
364}
365
366fn window_payload_args(
367    _ticket: u32,
368    _class: WindowClass,
369    payload: &[u32],
370) -> Result<Vec<u32>, PipelineError> {
371    let required_args = payload
372        .len()
373        .checked_add(2)
374        .ok_or(PipelineError::QueueFull {
375            queue: "submission",
376            fix: "window payload argument count overflowed usize; split the payload before materializing a batch",
377        })?;
378    if required_args > ARGS_PER_SLOT_USIZE {
379        return Err(PipelineError::QueueFull {
380            queue: "submission",
381            fix: "too many args for one window payload; ticket plus class plus payload must fit in 12 u32 args",
382        });
383    }
384    let mut args = Vec::new();
385    reserve_descriptor_vec(&mut args, required_args, "window payload arg")?;
386    Ok(args)
387}
388
389fn publish_window_payload(
390    ring_bytes: &mut [u8],
391    start_slot: u32,
392    slot_offset: &mut u32,
393    tenant_id: u32,
394    opcode: SlotOpcode,
395    ticket: u32,
396    class: WindowClass,
397    payload: &[u32],
398    args: &mut SmallVec<[u32; ARGS_PER_SLOT_USIZE]>,
399) -> Result<(), PipelineError> {
400    let slot_idx = start_slot
401        .checked_add(*slot_offset)
402        .ok_or(PipelineError::QueueFull {
403            queue: "submission",
404            fix: "window slot index overflowed u32; split the window before publishing",
405        })?;
406    args.clear();
407    let required_args = payload
408        .len()
409        .checked_add(2)
410        .ok_or(PipelineError::QueueFull {
411        queue: "submission",
412        fix: "window payload argument count overflowed usize; split the payload before publishing",
413    })?;
414    if required_args > ARGS_PER_SLOT_USIZE {
415        return Err(PipelineError::QueueFull {
416            queue: "submission",
417            fix: "too many args for one window payload; ticket plus class plus payload must fit in 12 u32 args",
418        });
419    }
420    args.push(ticket);
421    args.push(class.into_wire());
422    args.extend_from_slice(payload);
423    Megakernel::publish_slot(ring_bytes, slot_idx, tenant_id, opcode.into_wire(), args)?;
424    *slot_offset = slot_offset.checked_add(1).ok_or(PipelineError::QueueFull {
425        queue: "submission",
426        fix: "window slot count overflowed u32; split the window before publishing",
427    })?;
428    Ok(())
429}
430
431#[cfg(test)]
432mod tests;