Skip to main content

vyre_runtime/megakernel/protocol_api/
publish.rs

1use crate::megakernel::planner::MegakernelWorkItem;
2use crate::megakernel::protocol::{self, slot, SLOT_WORDS};
3use crate::megakernel::{scheduler, Megakernel, PackedOpDescriptor};
4use crate::PipelineError;
5
/// Number of `u32` words in one ring slot, as a host `usize`.
const SLOT_WORDS_USIZE: usize = 16;
/// Word index of the slot status within a slot.
const STATUS_WORD_USIZE: usize = 0;
/// Word index of the slot opcode within a slot.
const OPCODE_WORD_USIZE: usize = 1;
/// Word index of the tenant id within a slot.
const TENANT_WORD_USIZE: usize = 2;
/// Word index of the scheduling priority within a slot.
const PRIORITY_WORD_USIZE: usize = 3;
/// Word index of the first argument word within a slot.
const ARG0_WORD_USIZE: usize = 4;
/// Number of argument words that follow `ARG0_WORD_USIZE` in a slot.
const ARGS_PER_SLOT_USIZE: usize = 12;

// Compile-time layout guard. The publishers in this file index slots with
// these constants and zero-fill `ARG0..ARG0 + ARGS_PER_SLOT`; if the numbers
// ever drift apart the build fails here instead of corrupting a neighbouring
// slot at runtime.
const _: () = {
    // The argument tail must end exactly at the slot boundary.
    assert!(ARG0_WORD_USIZE + ARGS_PER_SLOT_USIZE == SLOT_WORDS_USIZE);
    // Header words must live in front of the argument tail.
    assert!(STATUS_WORD_USIZE < ARG0_WORD_USIZE);
    assert!(OPCODE_WORD_USIZE < ARG0_WORD_USIZE);
    assert!(TENANT_WORD_USIZE < ARG0_WORD_USIZE);
    assert!(PRIORITY_WORD_USIZE < ARG0_WORD_USIZE);
    // Work-item publication writes three argument words per slot.
    assert!(ARGS_PER_SLOT_USIZE >= 3);
};
13
/// Geometry of a ring byte buffer, as computed by `validate_ring_publish_view`.
#[derive(Debug, Clone, Copy)]
struct RingPublishView {
    /// Width of one slot in bytes (`SLOT_WORDS_USIZE * 4`).
    slot_bytes: usize,
    /// Number of whole slots in the buffer (`ring_bytes.len() / slot_bytes`).
    slot_capacity: usize,
}
19
/// Explicit host-observable ring slot lifecycle transition.
///
/// Each variant names one edge of the slot state machine. The statuses a
/// transition may start from are enforced by `RingSlotTransition::allows`,
/// and the status it writes is given by `RingSlotTransition::target_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RingSlotTransition {
    /// Host publishes a fully written slot (from `EMPTY` or `DONE` to
    /// `PUBLISHED`).
    Publish,
    /// Worker claims a published or scheduler-ready slot (from `PUBLISHED`,
    /// `YIELD`, or `REQUEUE` to `CLAIMED`).
    Claim,
    /// Worker marks a claimed slot done (from `CLAIMED` to `DONE`).
    Done,
    /// Runtime marks an in-flight slot faulted (from `PUBLISHED`, `CLAIMED`,
    /// `WAIT_IO`, `YIELD`, or `REQUEUE` to `FAULT`).
    Fault,
    /// Host cancels an unclaimed in-flight slot (from `PUBLISHED`, `WAIT_IO`,
    /// `YIELD`, or `REQUEUE` back to `EMPTY`).
    Cancel,
}
34
35impl RingSlotTransition {
36    fn label(self) -> &'static str {
37        match self {
38            Self::Publish => "publish",
39            Self::Claim => "claim",
40            Self::Done => "done",
41            Self::Fault => "fault",
42            Self::Cancel => "cancel",
43        }
44    }
45
46    fn target_status(self) -> u32 {
47        match self {
48            Self::Publish => slot::PUBLISHED,
49            Self::Claim => slot::CLAIMED,
50            Self::Done => slot::DONE,
51            Self::Fault => slot::FAULT,
52            Self::Cancel => slot::EMPTY,
53        }
54    }
55
56    fn allows(self, current_status: u32) -> bool {
57        match self {
58            Self::Publish => matches!(current_status, slot::EMPTY | slot::DONE),
59            Self::Claim => matches!(
60                current_status,
61                slot::PUBLISHED | slot::YIELD | slot::REQUEUE
62            ),
63            Self::Done => current_status == slot::CLAIMED,
64            Self::Fault => matches!(
65                current_status,
66                slot::PUBLISHED | slot::CLAIMED | slot::WAIT_IO | slot::YIELD | slot::REQUEUE
67            ),
68            Self::Cancel => matches!(
69                current_status,
70                slot::PUBLISHED | slot::WAIT_IO | slot::YIELD | slot::REQUEUE
71            ),
72        }
73    }
74}
75
76fn validate_ring_publish_view(ring_bytes: &[u8]) -> Result<RingPublishView, PipelineError> {
77    let slot_bytes = SLOT_WORDS_USIZE
78        .checked_mul(4)
79        .ok_or(PipelineError::QueueFull {
80            queue: "submission",
81            fix: "slot byte width overflowed usize; keep SLOT_WORDS within the u32 ABI",
82        })?;
83    if ring_bytes.len() % slot_bytes != 0 {
84        return Err(PipelineError::QueueFull {
85            queue: "submission",
86            fix: "ring buffer byte length is not an exact multiple of SLOT_WORDS * 4; rebuild it with Megakernel::encode_empty_ring",
87        });
88    }
89    Ok(RingPublishView {
90        slot_bytes,
91        slot_capacity: ring_bytes.len() / slot_bytes,
92    })
93}
94
95impl Megakernel {
96    /// Apply one explicit lifecycle transition to a ring slot status word.
97    ///
98    /// This helper is for host-side tests, recovery, cancellation, and
99    /// diagnostics. Normal publication should still use [`Megakernel::publish_slot`]
100    /// or batch publishers so payload words are written before the PUBLISHED
101    /// status barrier.
102    ///
103    /// # Errors
104    ///
105    /// Returns [`PipelineError::QueueFull`] when the slot is out of bounds,
106    /// the ring is malformed, or the requested transition is illegal for the
107    /// current status word.
108    pub fn transition_slot_status(
109        ring_bytes: &mut [u8],
110        slot_idx: u32,
111        transition: RingSlotTransition,
112    ) -> Result<u32, PipelineError> {
113        if transition == RingSlotTransition::Publish {
114            return Err(PipelineError::QueueFull {
115                queue: "submission",
116                fix: "publish transitions must use publish_slot or a batch publisher so payload words are written before PUBLISHED status",
117            });
118        }
119        let view = validate_ring_publish_view(ring_bytes)?;
120        let current_status = read_slot_status_word(ring_bytes, view, slot_idx)?;
121        validate_slot_transition(current_status, transition)?;
122        write_slot_status_word(ring_bytes, view, slot_idx, transition.target_status())?;
123        Ok(current_status)
124    }
125
126    /// Publish one opcode into `ring_bytes[slot_idx]`.
127    ///
128    /// # Errors
129    ///
130    /// [`PipelineError::QueueFull`] when out of bounds, too many args,
131    /// or the slot is still in flight.
132    pub fn publish_slot(
133        ring_bytes: &mut [u8],
134        slot_idx: u32,
135        tenant_id: u32,
136        opcode: u32,
137        args: &[u32],
138    ) -> Result<(), PipelineError> {
139        let view = validate_ring_publish_view(ring_bytes)?;
140        Self::publish_slot_validated(ring_bytes, view, slot_idx, tenant_id, opcode, args)
141    }
142
143    /// Reset `ring_bytes` to an empty ring and publish a contiguous `MegakernelWorkItem`
144    /// queue into slots `0..items.len()`.
145    ///
146    /// This is the hot-path publisher for one-shot megakernel launches. It
147    /// validates the full batch before mutating `ring_bytes`, encodes an empty
148    /// ring once, writes the fixed `MegakernelWorkItem` ABI directly, and stores
149    /// [`slot::PUBLISHED`] last for each slot.
150    ///
151    /// # Errors
152    ///
153    /// Returns [`PipelineError::QueueFull`] when `slot_count` cannot encode,
154    /// the queue does not fit in the ring, the slot ABI cannot hold a
155    /// `MegakernelWorkItem`, or an item opcode is not publishable.
156    pub fn encode_work_items_ring_into(
157        slot_count: u32,
158        tenant_id: u32,
159        items: &[MegakernelWorkItem],
160        ring_bytes: &mut Vec<u8>,
161    ) -> Result<(), PipelineError> {
162        let item_count = u32::try_from(items.len()).map_err(|_| PipelineError::QueueFull {
163            queue: "submission",
164            fix: "work item count exceeds u32::MAX; shard the megakernel queue before publishing",
165        })?;
166        if item_count > slot_count {
167            return Err(PipelineError::QueueFull {
168                queue: "submission",
169                fix: "work item count exceeds ring slot count; enlarge the launch geometry before publishing",
170            });
171        }
172        if ARGS_PER_SLOT_USIZE < 3 {
173            return Err(PipelineError::QueueFull {
174                queue: "submission",
175                fix: "MegakernelWorkItem publication requires three argument words; increase ARGS_PER_SLOT",
176            });
177        }
178        for item in items {
179            if let Err(fix) = protocol::opcode::validate_publish_opcode(item.op_handle) {
180                return Err(PipelineError::QueueFull {
181                    queue: "submission",
182                    fix,
183                });
184            }
185        }
186
187        protocol::try_encode_empty_ring_into(slot_count, ring_bytes)
188            .map_err(super::protocol_error)?;
189        let view = validate_ring_publish_view(ring_bytes)?;
190        debug_assert!(items.len() <= view.slot_capacity);
191
192        for (slot_idx, item) in items.iter().enumerate() {
193            let slot_idx = u32::try_from(slot_idx).map_err(|_| PipelineError::QueueFull {
194                queue: "submission",
195                fix: "work item publish slot index exceeds u32::MAX; split the publish batch",
196            })?;
197            write_work_item_unchecked(ring_bytes, view, slot_idx, tenant_id, item)?;
198        }
199        Ok(())
200    }
201
202    /// Publish a contiguous fixed-ABI work-item window into an existing ring
203    /// without resetting unrelated slots.
204    ///
205    /// This is the resident hot path for repeated megakernel queue updates:
206    /// validate the whole target window first, then write each slot once and
207    /// store [`slot::PUBLISHED`] last. Unlike
208    /// [`Megakernel::encode_work_items_ring_into`], this does not clear the
209    /// full ring, so sparse updates scale with `items.len()` rather than
210    /// `slot_count`.
211    ///
212    /// # Errors
213    ///
214    /// Returns [`PipelineError::QueueFull`] when the target window is outside
215    /// the ring, any slot is still in flight, or an item opcode is not
216    /// publishable.
217    pub fn publish_work_items(
218        ring_bytes: &mut [u8],
219        start_slot: u32,
220        tenant_id: u32,
221        items: &[MegakernelWorkItem],
222    ) -> Result<u32, PipelineError> {
223        validate_work_items(items)?;
224        let item_count = u32::try_from(items.len()).map_err(|_| PipelineError::QueueFull {
225            queue: "submission",
226            fix: "work item count exceeds u32::MAX; shard the megakernel queue before publishing",
227        })?;
228        let view = validate_ring_publish_view(ring_bytes)?;
229        let end_slot = start_slot
230            .checked_add(item_count)
231            .ok_or(PipelineError::QueueFull {
232                queue: "submission",
233                fix: "work item publish slot index overflowed u32; split the publish batch",
234            })?;
235        if u32_to_usize(end_slot)? > view.slot_capacity {
236            return Err(PipelineError::QueueFull {
237                queue: "submission",
238                fix:
239                    "work item publish exceeds ring slot count; enlarge the ring or split the batch",
240            });
241        }
242        for slot_idx in start_slot..end_slot {
243            validate_publishable_slot(ring_bytes, view, slot_idx)?;
244        }
245        for (offset, item) in items.iter().enumerate() {
246            let slot_idx = start_slot
247                .checked_add(u32::try_from(offset).map_err(|_| PipelineError::QueueFull {
248                    queue: "submission",
249                    fix: "work item publish offset exceeds u32::MAX; split the publish batch",
250                })?)
251                .ok_or(PipelineError::QueueFull {
252                    queue: "submission",
253                    fix: "work item publish slot index overflowed u32; split the publish batch",
254                })?;
255            write_work_item_unchecked(ring_bytes, view, slot_idx, tenant_id, item)?;
256        }
257        Ok(item_count)
258    }
259
260    /// Reset `ring_words` to an empty ring and publish a contiguous `MegakernelWorkItem`
261    /// queue as native little-endian u32 words.
262    ///
263    /// This is equivalent to [`Megakernel::encode_work_items_ring_into`] but
264    /// avoids thousands of tiny byte-slice stores on hot dispatch paths. Callers
265    /// can pass the result to backends as bytes with `bytemuck::cast_slice`.
266    ///
267    /// # Errors
268    ///
269    /// Returns [`PipelineError::QueueFull`] when `slot_count` cannot encode,
270    /// the queue does not fit in the ring, the slot ABI cannot hold a
271    /// `MegakernelWorkItem`, or an item opcode is not publishable.
272    pub fn encode_work_items_ring_words_into(
273        slot_count: u32,
274        tenant_id: u32,
275        items: &[MegakernelWorkItem],
276        ring_words: &mut Vec<u32>,
277    ) -> Result<(), PipelineError> {
278        validate_work_item_batch(slot_count, items)?;
279        let total_words = encoded_ring_word_count(slot_count)?;
280
281        if ring_words.len() != total_words {
282            ring_words.clear();
283            ring_words.resize(total_words, 0);
284        } else {
285            let slot_count = u32_to_usize(slot_count)?;
286            for slot_idx in items.len()..slot_count {
287                ring_words[slot_idx * SLOT_WORDS_USIZE + STATUS_WORD_USIZE] = slot::EMPTY;
288            }
289        }
290
291        for (slot_idx, item) in items.iter().enumerate() {
292            let base = slot_idx * SLOT_WORDS_USIZE;
293            ring_words[base + OPCODE_WORD_USIZE] = item.op_handle;
294            ring_words[base + TENANT_WORD_USIZE] = tenant_id;
295            ring_words[base + PRIORITY_WORD_USIZE] = scheduler::priority::NORMAL;
296            ring_words[base + ARG0_WORD_USIZE] = item.input_handle;
297            ring_words[base + ARG0_WORD_USIZE + 1] = item.output_handle;
298            ring_words[base + ARG0_WORD_USIZE + 2] = item.param;
299            ring_words[base + STATUS_WORD_USIZE] = slot::PUBLISHED;
300        }
301        Ok(())
302    }
303
304    fn publish_slot_validated(
305        ring_bytes: &mut [u8],
306        view: RingPublishView,
307        slot_idx: u32,
308        tenant_id: u32,
309        opcode: u32,
310        args: &[u32],
311    ) -> Result<(), PipelineError> {
312        if u32_to_usize(slot_idx)? >= view.slot_capacity {
313            return Err(PipelineError::QueueFull {
314                queue: "submission",
315                fix: "slot_idx exceeds ring slot count; enlarge the ring via encode_empty_ring",
316            });
317        }
318        if args.len() > ARGS_PER_SLOT_USIZE {
319            return Err(PipelineError::QueueFull {
320                queue: "submission",
321                fix: "too many args for one slot; 12 u32 args max per slot",
322            });
323        }
324        if let Err(fix) = protocol::opcode::validate_publish_opcode(opcode) {
325            return Err(PipelineError::QueueFull {
326                queue: "submission",
327                fix,
328            });
329        }
330
331        let base = slot_base(slot_idx, view)?;
332        let read_word = |buf: &[u8], word_idx: usize| -> Result<u32, PipelineError> {
333            let off = base + word_idx * 4;
334            let bytes = buf.get(off..off + 4).ok_or(PipelineError::QueueFull {
335                queue: "submission",
336                fix: "slot word is outside the validated ring buffer; validate ring length before publishing",
337            })?;
338            let mut word = [0u8; 4];
339            word.copy_from_slice(bytes);
340            Ok(u32::from_le_bytes(word))
341        };
342
343        let current_status = read_word(ring_bytes, STATUS_WORD_USIZE)?;
344        validate_slot_transition(current_status, RingSlotTransition::Publish)?;
345
346        let write_word = |buf: &mut [u8], word_idx: usize, value: u32| {
347            let off = base + word_idx * 4;
348            buf[off..off + 4].copy_from_slice(&value.to_le_bytes());
349        };
350
351        write_word(ring_bytes, OPCODE_WORD_USIZE, opcode);
352        write_word(ring_bytes, TENANT_WORD_USIZE, tenant_id);
353        write_word(ring_bytes, PRIORITY_WORD_USIZE, scheduler::priority::NORMAL);
354        let args_start = base + ARG0_WORD_USIZE * 4;
355        let args_end = args_start + ARGS_PER_SLOT_USIZE * 4;
356        ring_bytes[args_start..args_end].fill(0);
357        for (i, arg) in args.iter().enumerate() {
358            write_word(ring_bytes, ARG0_WORD_USIZE + i, *arg);
359        }
360        // Status last  -  PUBLISH is the publish barrier.
361        write_word(ring_bytes, STATUS_WORD_USIZE, slot::PUBLISHED);
362
363        Ok(())
364    }
365
366    /// Publish one packed slot containing multiple inner ops.
367    ///
368    /// The inner opcode id is stored as `u8`; args are packed into the slot's
369    /// 12-word payload tail and addressed by per-op `arg_offset` values.
370    ///
371    /// # Errors
372    ///
373    /// Returns [`PipelineError::QueueFull`] when the packed payload exceeds
374    /// the slot capacity or when the target slot is not publishable.
375    pub fn publish_packed_slot<A>(
376        ring_bytes: &mut [u8],
377        slot_idx: u32,
378        tenant_id: u32,
379        ops: &[(u8, A)],
380    ) -> Result<(), PipelineError>
381    where
382        A: AsRef<[u32]>,
383    {
384        Self::publish_packed_slot_from(ring_bytes, slot_idx, tenant_id, ops.len(), |index| {
385            let (op_id, args) = &ops[index];
386            (*op_id, args.as_ref())
387        })
388    }
389
390    pub(crate) fn publish_packed_descriptors(
391        ring_bytes: &mut [u8],
392        slot_idx: u32,
393        tenant_id: u32,
394        ops: &[PackedOpDescriptor],
395    ) -> Result<(), PipelineError> {
396        Self::publish_packed_slot_from(ring_bytes, slot_idx, tenant_id, ops.len(), |index| {
397            let op = &ops[index];
398            (op.opcode, op.args.as_slice())
399        })
400    }
401
402    fn publish_packed_slot_from<'a>(
403        ring_bytes: &mut [u8],
404        slot_idx: u32,
405        tenant_id: u32,
406        op_count: usize,
407        mut op_at: impl FnMut(usize) -> (u8, &'a [u32]),
408    ) -> Result<(), PipelineError> {
409        let opcode_count = u8::try_from(op_count).map_err(|_| PipelineError::QueueFull {
410            queue: "submission",
411            fix: "packed slot supports at most 255 inner opcodes",
412        })?;
413        let metadata_bytes = op_count
414            .checked_mul(2)
415            .and_then(|bytes| bytes.checked_add(2))
416            .ok_or(PipelineError::QueueFull {
417                queue: "submission",
418                fix: "packed slot metadata length overflowed usize; reduce packed opcode count",
419            })?;
420        let metadata_words = metadata_bytes.div_ceil(4);
421        if metadata_words > ARGS_PER_SLOT_USIZE {
422            return Err(PipelineError::QueueFull {
423                queue: "submission",
424                fix: "packed slot metadata exceeds the 12-word slot argument budget",
425            });
426        }
427
428        let mut packed_args = [0u32; ARGS_PER_SLOT_USIZE];
429        let mut packed_arg_words = 0usize;
430        let mut args = [0u32; ARGS_PER_SLOT_USIZE];
431        write_packed_metadata_byte(&mut args, 0, opcode_count);
432        let metadata_payload_bytes =
433            metadata_words
434                .checked_mul(4)
435                .ok_or(PipelineError::QueueFull {
436                queue: "submission",
437                fix:
438                    "packed slot metadata byte length overflowed usize; reduce packed opcode count",
439            })?;
440        for index in 0..op_count {
441            let arg_offset =
442                u8::try_from(packed_arg_words).map_err(|_| PipelineError::QueueFull {
443                    queue: "submission",
444                    fix: "packed slot arg offsets must fit in one u8 word index",
445                })?;
446            let (op_id, op_args) = op_at(index);
447            let end =
448                packed_arg_words
449                    .checked_add(op_args.len())
450                    .ok_or(PipelineError::QueueFull {
451                        queue: "submission",
452                        fix: "packed slot arg word count overflowed usize; reduce packed args",
453                    })?;
454            let total_words = metadata_words
455                .checked_add(end)
456                .ok_or(PipelineError::QueueFull {
457                    queue: "submission",
458                    fix: "packed slot total word count overflowed usize; reduce packed args",
459                })?;
460            if total_words > ARGS_PER_SLOT_USIZE {
461                return Err(PipelineError::QueueFull {
462                    queue: "submission",
463                    fix: "packed slot payload exceeds the 12-word slot argument budget",
464                });
465            }
466            packed_args[packed_arg_words..end].copy_from_slice(op_args);
467            packed_arg_words = end;
468
469            let byte_index = 2 + index * 2;
470            debug_assert!(byte_index + 1 < metadata_payload_bytes);
471            write_packed_metadata_byte(&mut args, byte_index, op_id);
472            write_packed_metadata_byte(&mut args, byte_index + 1, arg_offset);
473        }
474
475        // Byte 1: total packed arg word count, so the host-side
476        // decoder can slice off the correct portion without relying
477        // on trailing-zero heuristics (slot memory can legitimately
478        // contain zero arg values, and rings aren't guaranteed zero
479        // after wrap-around).
480        let packed_arg_words_u8 =
481            u8::try_from(packed_arg_words).map_err(|_| PipelineError::QueueFull {
482                queue: "submission",
483                fix: "packed slot total arg words must fit in one u8",
484            })?;
485        write_packed_metadata_byte(&mut args, 1, packed_arg_words_u8);
486        let total_words = metadata_words + packed_arg_words;
487        args[metadata_words..total_words].copy_from_slice(&packed_args[..packed_arg_words]);
488        Self::publish_slot(
489            ring_bytes,
490            slot_idx,
491            tenant_id,
492            protocol::opcode::PACKED_SLOT,
493            &args[..total_words],
494        )
495    }
496
497    /// Publish multiple slots atomically  -  the final slot is a
498    /// `BATCH_FENCE` that signals completion to the host. This is
499    /// the high-throughput entry point for scanner pipelines: publish
500    /// N work items + 1 fence in one call.
501    ///
502    /// # Errors
503    ///
504    /// [`PipelineError::QueueFull`] if any slot rejects.
505    pub fn batch_publish<A>(
506        ring_bytes: &mut [u8],
507        start_slot: u32,
508        tenant_id: u32,
509        items: &[(u32, A)], // (opcode, args) pairs
510        batch_tag: u32,
511    ) -> Result<u32, PipelineError>
512    where
513        A: AsRef<[u32]>,
514    {
515        let item_count = u32::try_from(items.len()).map_err(|_| PipelineError::QueueFull {
516            queue: "submission",
517            fix: "batch item count exceeds u32::MAX; split the publish batch",
518        })?;
519        let view = validate_ring_publish_view(ring_bytes)?;
520        let total_slots = item_count.checked_add(1).ok_or(PipelineError::QueueFull {
521            queue: "submission",
522            fix: "batch publish slot count overflowed u32; split the publish batch",
523        })?;
524        let end_slot = start_slot
525            .checked_add(total_slots)
526            .ok_or(PipelineError::QueueFull {
527                queue: "submission",
528                fix: "batch publish slot index overflowed u32; split the publish batch",
529            })?;
530        if u32_to_usize(end_slot)? > view.slot_capacity {
531            return Err(PipelineError::QueueFull {
532                queue: "submission",
533                fix: "batch publish exceeds ring slot count; enlarge the ring or split the batch",
534            });
535        }
536        for (opcode, args) in items {
537            validate_publish_payload(*opcode, args.as_ref())?;
538        }
539        validate_publish_payload(protocol::opcode::BATCH_FENCE, &[item_count, batch_tag])?;
540        for slot_idx in start_slot..end_slot {
541            validate_publishable_slot(ring_bytes, view, slot_idx)?;
542        }
543
544        for (offset, (opcode, args)) in items.iter().enumerate() {
545            let slot_idx = start_slot
546                .checked_add(u32::try_from(offset).map_err(|_| PipelineError::QueueFull {
547                    queue: "submission",
548                    fix: "batch publish offset exceeds u32::MAX; split the publish batch",
549                })?)
550                .ok_or(PipelineError::QueueFull {
551                    queue: "submission",
552                    fix: "batch publish slot index overflowed u32; split the publish batch",
553                })?;
554            write_slot_unchecked(
555                ring_bytes,
556                view,
557                slot_idx,
558                tenant_id,
559                *opcode,
560                args.as_ref(),
561            )?;
562        }
563        let fence_slot = start_slot
564            .checked_add(item_count)
565            .ok_or(PipelineError::QueueFull {
566                queue: "submission",
567                fix: "batch publish fence slot overflowed u32; split the publish batch",
568            })?;
569        write_slot_unchecked(
570            ring_bytes,
571            view,
572            fence_slot,
573            tenant_id,
574            protocol::opcode::BATCH_FENCE,
575            &[item_count, batch_tag],
576        )?;
577        fence_slot
578            .checked_add(1)
579            .and_then(|end| end.checked_sub(start_slot))
580            .ok_or(PipelineError::QueueFull {
581                queue: "submission",
582                fix: "batch publish consumed-slot count overflowed u32; split the publish batch",
583            })
584    }
585}
586
587fn validate_publish_payload(opcode: u32, args: &[u32]) -> Result<(), PipelineError> {
588    if args.len() > ARGS_PER_SLOT_USIZE {
589        return Err(PipelineError::QueueFull {
590            queue: "submission",
591            fix: "too many args for one slot; 12 u32 args max per slot",
592        });
593    }
594    if let Err(fix) = protocol::opcode::validate_publish_opcode(opcode) {
595        return Err(PipelineError::QueueFull {
596            queue: "submission",
597            fix,
598        });
599    }
600    Ok(())
601}
602
603fn u32_to_usize(value: u32) -> Result<usize, PipelineError> {
604    usize::try_from(value).map_err(|_| PipelineError::QueueFull {
605        queue: "submission",
606        fix: "u32 slot index cannot fit host usize; shard the megakernel ring for this target",
607    })
608}
609
610fn slot_base(slot_idx: u32, view: RingPublishView) -> Result<usize, PipelineError> {
611    u32_to_usize(slot_idx)?
612        .checked_mul(view.slot_bytes)
613        .ok_or(PipelineError::QueueFull {
614            queue: "submission",
615            fix: "slot byte offset overflowed usize; shard the ring before publishing",
616        })
617}
618
619fn validate_publishable_slot(
620    ring_bytes: &[u8],
621    view: RingPublishView,
622    slot_idx: u32,
623) -> Result<(), PipelineError> {
624    let current_status = read_slot_status_word(ring_bytes, view, slot_idx)?;
625    validate_slot_transition(current_status, RingSlotTransition::Publish)
626}
627
628fn validate_slot_transition(
629    current_status: u32,
630    transition: RingSlotTransition,
631) -> Result<(), PipelineError> {
632    if transition.allows(current_status) {
633        return Ok(());
634    }
635    Err(PipelineError::QueueFull {
636        queue: "submission",
637        fix: illegal_transition_fix(transition, current_status),
638    })
639}
640
641fn illegal_transition_fix(transition: RingSlotTransition, _current_status: u32) -> &'static str {
642    match transition {
643        RingSlotTransition::Publish => {
644            "slot is not publishable; only EMPTY and DONE slots may be written by the host"
645        }
646        RingSlotTransition::Claim => {
647            "illegal ring slot transition: claim requires PUBLISHED, YIELD, or REQUEUE status"
648        }
649        RingSlotTransition::Done => "illegal ring slot transition: done requires CLAIMED status",
650        RingSlotTransition::Fault => {
651            "illegal ring slot transition: fault requires an in-flight slot status"
652        }
653        RingSlotTransition::Cancel => {
654            "illegal ring slot transition: cancel requires an unclaimed in-flight slot status"
655        }
656    }
657}
658
659fn slot_status_offset(slot_idx: u32, view: RingPublishView) -> Result<usize, PipelineError> {
660    let base = slot_base(slot_idx, view)?;
661    base.checked_add(
662        STATUS_WORD_USIZE
663            .checked_mul(4)
664            .ok_or(PipelineError::QueueFull {
665            queue: "submission",
666            fix:
667                "slot status word byte offset overflowed usize; keep SLOT_WORDS within the u32 ABI",
668        })?,
669    )
670    .ok_or(PipelineError::QueueFull {
671        queue: "submission",
672        fix: "slot status byte offset overflowed usize; shard the ring before publishing",
673    })
674}
675
676fn read_slot_status_word(
677    ring_bytes: &[u8],
678    view: RingPublishView,
679    slot_idx: u32,
680) -> Result<u32, PipelineError> {
681    let status_offset = slot_status_offset(slot_idx, view)?;
682    let status_end = status_offset
683        .checked_add(4)
684        .ok_or(PipelineError::QueueFull {
685            queue: "submission",
686            fix: "slot status byte end overflowed usize; shard the ring before publishing",
687        })?;
688    let status_bytes = ring_bytes
689        .get(status_offset..status_end)
690        .ok_or(PipelineError::QueueFull {
691            queue: "submission",
692            fix: "slot status is outside the validated ring buffer; validate ring length before publishing",
693        })?;
694    Ok(u32::from_le_bytes([
695        status_bytes[0],
696        status_bytes[1],
697        status_bytes[2],
698        status_bytes[3],
699    ]))
700}
701
702fn write_slot_status_word(
703    ring_bytes: &mut [u8],
704    view: RingPublishView,
705    slot_idx: u32,
706    value: u32,
707) -> Result<(), PipelineError> {
708    let status_offset = slot_status_offset(slot_idx, view)?;
709    let status_end = status_offset
710        .checked_add(4)
711        .ok_or(PipelineError::QueueFull {
712            queue: "submission",
713            fix: "slot status byte end overflowed usize; shard the ring before publishing",
714        })?;
715    let status_bytes = ring_bytes
716        .get_mut(status_offset..status_end)
717        .ok_or(PipelineError::QueueFull {
718            queue: "submission",
719            fix: "slot status is outside the validated ring buffer; validate ring length before publishing",
720        })?;
721    status_bytes.copy_from_slice(&value.to_le_bytes());
722    Ok(())
723}
724
725fn write_slot_unchecked(
726    ring_bytes: &mut [u8],
727    view: RingPublishView,
728    slot_idx: u32,
729    tenant_id: u32,
730    opcode: u32,
731    args: &[u32],
732) -> Result<(), PipelineError> {
733    let base = slot_base(slot_idx, view)?;
734    write_slot_word(ring_bytes, base, OPCODE_WORD_USIZE, opcode);
735    write_slot_word(ring_bytes, base, TENANT_WORD_USIZE, tenant_id);
736    write_slot_word(
737        ring_bytes,
738        base,
739        PRIORITY_WORD_USIZE,
740        scheduler::priority::NORMAL,
741    );
742    let args_start = base + ARG0_WORD_USIZE * 4;
743    let args_end = args_start + ARGS_PER_SLOT_USIZE * 4;
744    ring_bytes[args_start..args_end].fill(0);
745    for (index, arg) in args.iter().enumerate() {
746        write_slot_word(ring_bytes, base, ARG0_WORD_USIZE + index, *arg);
747    }
748    write_slot_word(ring_bytes, base, STATUS_WORD_USIZE, slot::PUBLISHED);
749    Ok(())
750}
751
752fn write_work_item_unchecked(
753    ring_bytes: &mut [u8],
754    view: RingPublishView,
755    slot_idx: u32,
756    tenant_id: u32,
757    item: &MegakernelWorkItem,
758) -> Result<(), PipelineError> {
759    let base = slot_base(slot_idx, view)?;
760    write_slot_word(ring_bytes, base, OPCODE_WORD_USIZE, item.op_handle);
761    write_slot_word(ring_bytes, base, TENANT_WORD_USIZE, tenant_id);
762    write_slot_word(
763        ring_bytes,
764        base,
765        PRIORITY_WORD_USIZE,
766        scheduler::priority::NORMAL,
767    );
768    let args_start = base + ARG0_WORD_USIZE * 4;
769    let args_end = args_start + ARGS_PER_SLOT_USIZE * 4;
770    ring_bytes[args_start..args_end].fill(0);
771    write_slot_word(ring_bytes, base, ARG0_WORD_USIZE, item.input_handle);
772    write_slot_word(ring_bytes, base, ARG0_WORD_USIZE + 1, item.output_handle);
773    write_slot_word(ring_bytes, base, ARG0_WORD_USIZE + 2, item.param);
774    write_slot_word(ring_bytes, base, STATUS_WORD_USIZE, slot::PUBLISHED);
775    Ok(())
776}
777
778fn validate_work_item_batch(
779    slot_count: u32,
780    items: &[MegakernelWorkItem],
781) -> Result<(), PipelineError> {
782    let item_count = u32::try_from(items.len()).map_err(|_| PipelineError::QueueFull {
783        queue: "submission",
784        fix: "work item count exceeds u32::MAX; shard the megakernel queue before publishing",
785    })?;
786    if item_count > slot_count {
787        return Err(PipelineError::QueueFull {
788            queue: "submission",
789            fix: "work item count exceeds ring slot count; enlarge the launch geometry before publishing",
790        });
791    }
792    validate_work_items(items)
793}
794
795fn validate_work_items(items: &[MegakernelWorkItem]) -> Result<(), PipelineError> {
796    if ARGS_PER_SLOT_USIZE < 3 {
797        return Err(PipelineError::QueueFull {
798            queue: "submission",
799            fix: "MegakernelWorkItem publication requires three argument words; increase ARGS_PER_SLOT",
800        });
801    }
802    for item in items {
803        if let Err(fix) = protocol::opcode::validate_publish_opcode(item.op_handle) {
804            return Err(PipelineError::QueueFull {
805                queue: "submission",
806                fix,
807            });
808        }
809    }
810    Ok(())
811}
812
813fn encoded_ring_word_count(slot_count: u32) -> Result<usize, PipelineError> {
814    if slot_count > protocol::MAX_ENCODED_RING_SLOTS {
815        return Err(PipelineError::QueueFull {
816            queue: "submission",
817            fix: "split the dispatch into smaller ring shards before encoding; slot_count exceeds the megakernel allocation cap or host address space",
818        });
819    }
820    let words = slot_count
821        .checked_mul(SLOT_WORDS)
822        .ok_or(PipelineError::QueueFull {
823            queue: "submission",
824            fix: "split the dispatch into smaller ring shards before encoding; slot_count exceeds the megakernel protocol cap or host address space",
825        })?;
826    usize::try_from(words).map_err(|_| PipelineError::QueueFull {
827        queue: "submission",
828        fix: "split the dispatch into smaller ring shards before encoding; ring word count does not fit usize",
829    })
830}
831
/// Store `value` little-endian at word `word_idx` of the slot that starts at
/// byte `slot_base`.
///
/// Panics if the four target bytes are not inside `ring_bytes`.
fn write_slot_word(ring_bytes: &mut [u8], slot_base: usize, word_idx: usize, value: u32) {
    let start = slot_base + word_idx * 4;
    let target = &mut ring_bytes[start..start + 4];
    for (dst, src) in target.iter_mut().zip(value.to_le_bytes()) {
        *dst = src;
    }
}
836
837fn write_packed_metadata_byte(args: &mut [u32; ARGS_PER_SLOT_USIZE], byte_index: usize, value: u8) {
838    let word_index = byte_index / 4;
839    let shift = ((byte_index % 4) * 8) as u32;
840    args[word_index] |= u32::from(value) << shift;
841}