Skip to main content

vyre_runtime/megakernel/protocol_api/
publish.rs

1use crate::megakernel::planner::MegakernelWorkItem;
2use crate::megakernel::protocol::{self, slot, SLOT_WORDS};
3use crate::megakernel::{scheduler, Megakernel, PackedOpDescriptor};
4use crate::PipelineError;
5
// Host-side view of the ring slot layout, kept as `usize` so the values can
// index byte and word buffers without conversions.
// NOTE(review): `SLOT_WORDS_USIZE` presumably mirrors `protocol::SLOT_WORDS`
// (which is imported above) — confirm the two stay in sync.

/// Number of u32 words in one ring slot.
const SLOT_WORDS_USIZE: usize = 16;
/// Word index of the slot status (written last when publishing).
const STATUS_WORD_USIZE: usize = 0;
/// Word index of the slot opcode.
const OPCODE_WORD_USIZE: usize = 1;
/// Word index of the tenant id.
const TENANT_WORD_USIZE: usize = 2;
/// Word index of the scheduling priority.
const PRIORITY_WORD_USIZE: usize = 3;
/// Word index of the first argument word.
const ARG0_WORD_USIZE: usize = 4;
/// Number of argument words in one slot; together with the four header
/// words above this fills the 16-word slot.
const ARGS_PER_SLOT_USIZE: usize = 12;
13
/// Slot geometry of a ring byte buffer, as produced by
/// `validate_ring_publish_view`.
#[derive(Debug, Clone, Copy)]
struct RingPublishView {
    /// Size of one slot in bytes (`SLOT_WORDS_USIZE * 4`).
    slot_bytes: usize,
    /// Number of whole slots in the buffer (`ring_bytes.len() / slot_bytes`).
    slot_capacity: usize,
}
19
20fn validate_ring_publish_view(ring_bytes: &[u8]) -> Result<RingPublishView, PipelineError> {
21    let slot_bytes = SLOT_WORDS_USIZE
22        .checked_mul(4)
23        .ok_or(PipelineError::QueueFull {
24            queue: "submission",
25            fix: "slot byte width overflowed usize; keep SLOT_WORDS within the u32 ABI",
26        })?;
27    if ring_bytes.len() % slot_bytes != 0 {
28        return Err(PipelineError::QueueFull {
29            queue: "submission",
30            fix: "ring buffer byte length is not an exact multiple of SLOT_WORDS * 4; rebuild it with Megakernel::encode_empty_ring",
31        });
32    }
33    Ok(RingPublishView {
34        slot_bytes,
35        slot_capacity: ring_bytes.len() / slot_bytes,
36    })
37}
38
39impl Megakernel {
40    /// Publish one opcode into `ring_bytes[slot_idx]`.
41    ///
42    /// # Errors
43    ///
44    /// [`PipelineError::QueueFull`] when out of bounds, too many args,
45    /// or the slot is still in flight.
46    pub fn publish_slot(
47        ring_bytes: &mut [u8],
48        slot_idx: u32,
49        tenant_id: u32,
50        opcode: u32,
51        args: &[u32],
52    ) -> Result<(), PipelineError> {
53        let view = validate_ring_publish_view(ring_bytes)?;
54        Self::publish_slot_validated(ring_bytes, view, slot_idx, tenant_id, opcode, args)
55    }
56
57    /// Reset `ring_bytes` to an empty ring and publish a contiguous `MegakernelWorkItem`
58    /// queue into slots `0..items.len()`.
59    ///
60    /// This is the hot-path publisher for one-shot megakernel launches. It
61    /// validates the full batch before mutating `ring_bytes`, encodes an empty
62    /// ring once, writes the fixed `MegakernelWorkItem` ABI directly, and stores
63    /// [`slot::PUBLISHED`] last for each slot.
64    ///
65    /// # Errors
66    ///
67    /// Returns [`PipelineError::QueueFull`] when `slot_count` cannot encode,
68    /// the queue does not fit in the ring, the slot ABI cannot hold a
69    /// `MegakernelWorkItem`, or an item opcode is not publishable.
70    pub fn encode_work_items_ring_into(
71        slot_count: u32,
72        tenant_id: u32,
73        items: &[MegakernelWorkItem],
74        ring_bytes: &mut Vec<u8>,
75    ) -> Result<(), PipelineError> {
76        let item_count = u32::try_from(items.len()).map_err(|_| PipelineError::QueueFull {
77            queue: "submission",
78            fix: "work item count exceeds u32::MAX; shard the megakernel queue before publishing",
79        })?;
80        if item_count > slot_count {
81            return Err(PipelineError::QueueFull {
82                queue: "submission",
83                fix: "work item count exceeds ring slot count; enlarge the launch geometry before publishing",
84            });
85        }
86        if ARGS_PER_SLOT_USIZE < 3 {
87            return Err(PipelineError::QueueFull {
88                queue: "submission",
89                fix: "MegakernelWorkItem publication requires three argument words; increase ARGS_PER_SLOT",
90            });
91        }
92        for item in items {
93            if let Err(fix) = protocol::opcode::validate_publish_opcode(item.op_handle) {
94                return Err(PipelineError::QueueFull {
95                    queue: "submission",
96                    fix,
97                });
98            }
99        }
100
101        protocol::try_encode_empty_ring_into(slot_count, ring_bytes)
102            .map_err(super::protocol_error)?;
103        let view = validate_ring_publish_view(ring_bytes)?;
104        debug_assert!(items.len() <= view.slot_capacity);
105
106        for (slot_idx, item) in items.iter().enumerate() {
107            let slot_idx = u32::try_from(slot_idx).map_err(|_| PipelineError::QueueFull {
108                queue: "submission",
109                fix: "work item publish slot index exceeds u32::MAX; split the publish batch",
110            })?;
111            write_work_item_unchecked(ring_bytes, view, slot_idx, tenant_id, item)?;
112        }
113        Ok(())
114    }
115
116    /// Publish a contiguous fixed-ABI work-item window into an existing ring
117    /// without resetting unrelated slots.
118    ///
119    /// This is the resident hot path for repeated megakernel queue updates:
120    /// validate the whole target window first, then write each slot once and
121    /// store [`slot::PUBLISHED`] last. Unlike
122    /// [`Megakernel::encode_work_items_ring_into`], this does not clear the
123    /// full ring, so sparse updates scale with `items.len()` rather than
124    /// `slot_count`.
125    ///
126    /// # Errors
127    ///
128    /// Returns [`PipelineError::QueueFull`] when the target window is outside
129    /// the ring, any slot is still in flight, or an item opcode is not
130    /// publishable.
131    pub fn publish_work_items(
132        ring_bytes: &mut [u8],
133        start_slot: u32,
134        tenant_id: u32,
135        items: &[MegakernelWorkItem],
136    ) -> Result<u32, PipelineError> {
137        validate_work_items(items)?;
138        let item_count = u32::try_from(items.len()).map_err(|_| PipelineError::QueueFull {
139            queue: "submission",
140            fix: "work item count exceeds u32::MAX; shard the megakernel queue before publishing",
141        })?;
142        let view = validate_ring_publish_view(ring_bytes)?;
143        let end_slot = start_slot
144            .checked_add(item_count)
145            .ok_or(PipelineError::QueueFull {
146                queue: "submission",
147                fix: "work item publish slot index overflowed u32; split the publish batch",
148            })?;
149        if u32_to_usize(end_slot)? > view.slot_capacity {
150            return Err(PipelineError::QueueFull {
151                queue: "submission",
152                fix:
153                    "work item publish exceeds ring slot count; enlarge the ring or split the batch",
154            });
155        }
156        for slot_idx in start_slot..end_slot {
157            validate_publishable_slot(ring_bytes, view, slot_idx)?;
158        }
159        for (offset, item) in items.iter().enumerate() {
160            let slot_idx = start_slot
161                .checked_add(u32::try_from(offset).map_err(|_| PipelineError::QueueFull {
162                    queue: "submission",
163                    fix: "work item publish offset exceeds u32::MAX; split the publish batch",
164                })?)
165                .ok_or(PipelineError::QueueFull {
166                    queue: "submission",
167                    fix: "work item publish slot index overflowed u32; split the publish batch",
168                })?;
169            write_work_item_unchecked(ring_bytes, view, slot_idx, tenant_id, item)?;
170        }
171        Ok(item_count)
172    }
173
174    /// Reset `ring_words` to an empty ring and publish a contiguous `MegakernelWorkItem`
175    /// queue as native little-endian u32 words.
176    ///
177    /// This is equivalent to [`Megakernel::encode_work_items_ring_into`] but
178    /// avoids thousands of tiny byte-slice stores on hot dispatch paths. Callers
179    /// can pass the result to backends as bytes with `bytemuck::cast_slice`.
180    ///
181    /// # Errors
182    ///
183    /// Returns [`PipelineError::QueueFull`] when `slot_count` cannot encode,
184    /// the queue does not fit in the ring, the slot ABI cannot hold a
185    /// `MegakernelWorkItem`, or an item opcode is not publishable.
186    pub fn encode_work_items_ring_words_into(
187        slot_count: u32,
188        tenant_id: u32,
189        items: &[MegakernelWorkItem],
190        ring_words: &mut Vec<u32>,
191    ) -> Result<(), PipelineError> {
192        validate_work_item_batch(slot_count, items)?;
193        let total_words = encoded_ring_word_count(slot_count)?;
194
195        if ring_words.len() != total_words {
196            ring_words.clear();
197            ring_words.resize(total_words, 0);
198        } else {
199            let slot_count = u32_to_usize(slot_count)?;
200            for slot_idx in items.len()..slot_count {
201                ring_words[slot_idx * SLOT_WORDS_USIZE + STATUS_WORD_USIZE] = slot::EMPTY;
202            }
203        }
204
205        for (slot_idx, item) in items.iter().enumerate() {
206            let base = slot_idx * SLOT_WORDS_USIZE;
207            ring_words[base + OPCODE_WORD_USIZE] = item.op_handle;
208            ring_words[base + TENANT_WORD_USIZE] = tenant_id;
209            ring_words[base + PRIORITY_WORD_USIZE] = scheduler::priority::NORMAL;
210            ring_words[base + ARG0_WORD_USIZE] = item.input_handle;
211            ring_words[base + ARG0_WORD_USIZE + 1] = item.output_handle;
212            ring_words[base + ARG0_WORD_USIZE + 2] = item.param;
213            ring_words[base + STATUS_WORD_USIZE] = slot::PUBLISHED;
214        }
215        Ok(())
216    }
217
218    fn publish_slot_validated(
219        ring_bytes: &mut [u8],
220        view: RingPublishView,
221        slot_idx: u32,
222        tenant_id: u32,
223        opcode: u32,
224        args: &[u32],
225    ) -> Result<(), PipelineError> {
226        if u32_to_usize(slot_idx)? >= view.slot_capacity {
227            return Err(PipelineError::QueueFull {
228                queue: "submission",
229                fix: "slot_idx exceeds ring slot count; enlarge the ring via encode_empty_ring",
230            });
231        }
232        if args.len() > ARGS_PER_SLOT_USIZE {
233            return Err(PipelineError::QueueFull {
234                queue: "submission",
235                fix: "too many args for one slot; 12 u32 args max per slot",
236            });
237        }
238        if let Err(fix) = protocol::opcode::validate_publish_opcode(opcode) {
239            return Err(PipelineError::QueueFull {
240                queue: "submission",
241                fix,
242            });
243        }
244
245        let base = slot_base(slot_idx, view)?;
246        let read_word = |buf: &[u8], word_idx: usize| -> Result<u32, PipelineError> {
247            let off = base + word_idx * 4;
248            let bytes = buf.get(off..off + 4).ok_or(PipelineError::QueueFull {
249                queue: "submission",
250                fix: "slot word is outside the validated ring buffer; validate ring length before publishing",
251            })?;
252            let mut word = [0u8; 4];
253            word.copy_from_slice(bytes);
254            Ok(u32::from_le_bytes(word))
255        };
256
257        let current_status = read_word(ring_bytes, STATUS_WORD_USIZE)?;
258        if current_status != slot::EMPTY && current_status != slot::DONE {
259            return Err(PipelineError::QueueFull {
260                queue: "submission",
261                fix:
262                    "slot is not publishable; only EMPTY and DONE slots may be written by the host",
263            });
264        }
265
266        let write_word = |buf: &mut [u8], word_idx: usize, value: u32| {
267            let off = base + word_idx * 4;
268            buf[off..off + 4].copy_from_slice(&value.to_le_bytes());
269        };
270
271        write_word(ring_bytes, OPCODE_WORD_USIZE, opcode);
272        write_word(ring_bytes, TENANT_WORD_USIZE, tenant_id);
273        write_word(ring_bytes, PRIORITY_WORD_USIZE, scheduler::priority::NORMAL);
274        let args_start = base + ARG0_WORD_USIZE * 4;
275        let args_end = args_start + ARGS_PER_SLOT_USIZE * 4;
276        ring_bytes[args_start..args_end].fill(0);
277        for (i, arg) in args.iter().enumerate() {
278            write_word(ring_bytes, ARG0_WORD_USIZE + i, *arg);
279        }
280        // Status last  -  PUBLISH is the publish barrier.
281        write_word(ring_bytes, STATUS_WORD_USIZE, slot::PUBLISHED);
282
283        Ok(())
284    }
285
286    /// Publish one packed slot containing multiple inner ops.
287    ///
288    /// The inner opcode id is stored as `u8`; args are packed into the slot's
289    /// 12-word payload tail and addressed by per-op `arg_offset` values.
290    ///
291    /// # Errors
292    ///
293    /// Returns [`PipelineError::QueueFull`] when the packed payload exceeds
294    /// the slot capacity or when the target slot is not publishable.
295    pub fn publish_packed_slot<A>(
296        ring_bytes: &mut [u8],
297        slot_idx: u32,
298        tenant_id: u32,
299        ops: &[(u8, A)],
300    ) -> Result<(), PipelineError>
301    where
302        A: AsRef<[u32]>,
303    {
304        Self::publish_packed_slot_from(ring_bytes, slot_idx, tenant_id, ops.len(), |index| {
305            let (op_id, args) = &ops[index];
306            (*op_id, args.as_ref())
307        })
308    }
309
310    pub(crate) fn publish_packed_descriptors(
311        ring_bytes: &mut [u8],
312        slot_idx: u32,
313        tenant_id: u32,
314        ops: &[PackedOpDescriptor],
315    ) -> Result<(), PipelineError> {
316        Self::publish_packed_slot_from(ring_bytes, slot_idx, tenant_id, ops.len(), |index| {
317            let op = &ops[index];
318            (op.opcode, op.args.as_slice())
319        })
320    }
321
322    fn publish_packed_slot_from<'a>(
323        ring_bytes: &mut [u8],
324        slot_idx: u32,
325        tenant_id: u32,
326        op_count: usize,
327        mut op_at: impl FnMut(usize) -> (u8, &'a [u32]),
328    ) -> Result<(), PipelineError> {
329        let opcode_count = u8::try_from(op_count).map_err(|_| PipelineError::QueueFull {
330            queue: "submission",
331            fix: "packed slot supports at most 255 inner opcodes",
332        })?;
333        let metadata_bytes = op_count
334            .checked_mul(2)
335            .and_then(|bytes| bytes.checked_add(2))
336            .ok_or(PipelineError::QueueFull {
337                queue: "submission",
338                fix: "packed slot metadata length overflowed usize; reduce packed opcode count",
339            })?;
340        let metadata_words = metadata_bytes.div_ceil(4);
341        if metadata_words > ARGS_PER_SLOT_USIZE {
342            return Err(PipelineError::QueueFull {
343                queue: "submission",
344                fix: "packed slot metadata exceeds the 12-word slot argument budget",
345            });
346        }
347
348        let mut packed_args = [0u32; ARGS_PER_SLOT_USIZE];
349        let mut packed_arg_words = 0usize;
350        let mut args = [0u32; ARGS_PER_SLOT_USIZE];
351        write_packed_metadata_byte(&mut args, 0, opcode_count);
352        let metadata_payload_bytes =
353            metadata_words
354                .checked_mul(4)
355                .ok_or(PipelineError::QueueFull {
356                queue: "submission",
357                fix:
358                    "packed slot metadata byte length overflowed usize; reduce packed opcode count",
359            })?;
360        for index in 0..op_count {
361            let arg_offset =
362                u8::try_from(packed_arg_words).map_err(|_| PipelineError::QueueFull {
363                    queue: "submission",
364                    fix: "packed slot arg offsets must fit in one u8 word index",
365                })?;
366            let (op_id, op_args) = op_at(index);
367            let end =
368                packed_arg_words
369                    .checked_add(op_args.len())
370                    .ok_or(PipelineError::QueueFull {
371                        queue: "submission",
372                        fix: "packed slot arg word count overflowed usize; reduce packed args",
373                    })?;
374            let total_words = metadata_words
375                .checked_add(end)
376                .ok_or(PipelineError::QueueFull {
377                    queue: "submission",
378                    fix: "packed slot total word count overflowed usize; reduce packed args",
379                })?;
380            if total_words > ARGS_PER_SLOT_USIZE {
381                return Err(PipelineError::QueueFull {
382                    queue: "submission",
383                    fix: "packed slot payload exceeds the 12-word slot argument budget",
384                });
385            }
386            packed_args[packed_arg_words..end].copy_from_slice(op_args);
387            packed_arg_words = end;
388
389            let byte_index = 2 + index * 2;
390            debug_assert!(byte_index + 1 < metadata_payload_bytes);
391            write_packed_metadata_byte(&mut args, byte_index, op_id);
392            write_packed_metadata_byte(&mut args, byte_index + 1, arg_offset);
393        }
394
395        // Byte 1: total packed arg word count, so the host-side
396        // decoder can slice off the correct portion without relying
397        // on trailing-zero heuristics (slot memory can legitimately
398        // contain zero arg values, and rings aren't guaranteed zero
399        // after wrap-around).
400        let packed_arg_words_u8 =
401            u8::try_from(packed_arg_words).map_err(|_| PipelineError::QueueFull {
402                queue: "submission",
403                fix: "packed slot total arg words must fit in one u8",
404            })?;
405        write_packed_metadata_byte(&mut args, 1, packed_arg_words_u8);
406        let total_words = metadata_words + packed_arg_words;
407        args[metadata_words..total_words].copy_from_slice(&packed_args[..packed_arg_words]);
408        Self::publish_slot(
409            ring_bytes,
410            slot_idx,
411            tenant_id,
412            protocol::opcode::PACKED_SLOT,
413            &args[..total_words],
414        )
415    }
416
417    /// Publish multiple slots atomically  -  the final slot is a
418    /// `BATCH_FENCE` that signals completion to the host. This is
419    /// the high-throughput entry point for scanner pipelines: publish
420    /// N work items + 1 fence in one call.
421    ///
422    /// # Errors
423    ///
424    /// [`PipelineError::QueueFull`] if any slot rejects.
425    pub fn batch_publish<A>(
426        ring_bytes: &mut [u8],
427        start_slot: u32,
428        tenant_id: u32,
429        items: &[(u32, A)], // (opcode, args) pairs
430        batch_tag: u32,
431    ) -> Result<u32, PipelineError>
432    where
433        A: AsRef<[u32]>,
434    {
435        let item_count = u32::try_from(items.len()).map_err(|_| PipelineError::QueueFull {
436            queue: "submission",
437            fix: "batch item count exceeds u32::MAX; split the publish batch",
438        })?;
439        let view = validate_ring_publish_view(ring_bytes)?;
440        let total_slots = item_count.checked_add(1).ok_or(PipelineError::QueueFull {
441            queue: "submission",
442            fix: "batch publish slot count overflowed u32; split the publish batch",
443        })?;
444        let end_slot = start_slot
445            .checked_add(total_slots)
446            .ok_or(PipelineError::QueueFull {
447                queue: "submission",
448                fix: "batch publish slot index overflowed u32; split the publish batch",
449            })?;
450        if u32_to_usize(end_slot)? > view.slot_capacity {
451            return Err(PipelineError::QueueFull {
452                queue: "submission",
453                fix: "batch publish exceeds ring slot count; enlarge the ring or split the batch",
454            });
455        }
456        for (opcode, args) in items {
457            validate_publish_payload(*opcode, args.as_ref())?;
458        }
459        validate_publish_payload(protocol::opcode::BATCH_FENCE, &[item_count, batch_tag])?;
460        for slot_idx in start_slot..end_slot {
461            validate_publishable_slot(ring_bytes, view, slot_idx)?;
462        }
463
464        for (offset, (opcode, args)) in items.iter().enumerate() {
465            let slot_idx = start_slot
466                .checked_add(u32::try_from(offset).map_err(|_| PipelineError::QueueFull {
467                    queue: "submission",
468                    fix: "batch publish offset exceeds u32::MAX; split the publish batch",
469                })?)
470                .ok_or(PipelineError::QueueFull {
471                    queue: "submission",
472                    fix: "batch publish slot index overflowed u32; split the publish batch",
473                })?;
474            write_slot_unchecked(
475                ring_bytes,
476                view,
477                slot_idx,
478                tenant_id,
479                *opcode,
480                args.as_ref(),
481            )?;
482        }
483        let fence_slot = start_slot
484            .checked_add(item_count)
485            .ok_or(PipelineError::QueueFull {
486                queue: "submission",
487                fix: "batch publish fence slot overflowed u32; split the publish batch",
488            })?;
489        write_slot_unchecked(
490            ring_bytes,
491            view,
492            fence_slot,
493            tenant_id,
494            protocol::opcode::BATCH_FENCE,
495            &[item_count, batch_tag],
496        )?;
497        fence_slot
498            .checked_add(1)
499            .and_then(|end| end.checked_sub(start_slot))
500            .ok_or(PipelineError::QueueFull {
501                queue: "submission",
502                fix: "batch publish consumed-slot count overflowed u32; split the publish batch",
503            })
504    }
505}
506
507
508fn validate_publish_payload(opcode: u32, args: &[u32]) -> Result<(), PipelineError> {
509    if args.len() > ARGS_PER_SLOT_USIZE {
510        return Err(PipelineError::QueueFull {
511            queue: "submission",
512            fix: "too many args for one slot; 12 u32 args max per slot",
513        });
514    }
515    if let Err(fix) = protocol::opcode::validate_publish_opcode(opcode) {
516        return Err(PipelineError::QueueFull {
517            queue: "submission",
518            fix,
519        });
520    }
521    Ok(())
522}
523
524fn u32_to_usize(value: u32) -> Result<usize, PipelineError> {
525    usize::try_from(value).map_err(|_| PipelineError::QueueFull {
526        queue: "submission",
527        fix: "u32 slot index cannot fit host usize; shard the megakernel ring for this target",
528    })
529}
530
531fn slot_base(slot_idx: u32, view: RingPublishView) -> Result<usize, PipelineError> {
532    u32_to_usize(slot_idx)?
533        .checked_mul(view.slot_bytes)
534        .ok_or(PipelineError::QueueFull {
535            queue: "submission",
536            fix: "slot byte offset overflowed usize; shard the ring before publishing",
537        })
538}
539
540fn validate_publishable_slot(
541    ring_bytes: &[u8],
542    view: RingPublishView,
543    slot_idx: u32,
544) -> Result<(), PipelineError> {
545    let base = slot_base(slot_idx, view)?;
546    let status_offset =
547        base.checked_add(STATUS_WORD_USIZE.checked_mul(4).ok_or(
548            PipelineError::QueueFull {
549                queue: "submission",
550                fix: "slot status word byte offset overflowed usize; keep SLOT_WORDS within the u32 ABI",
551            },
552        )?)
553        .ok_or(PipelineError::QueueFull {
554            queue: "submission",
555            fix: "slot status byte offset overflowed usize; shard the ring before publishing",
556        })?;
557    let status_bytes = ring_bytes
558        .get(status_offset..status_offset + 4)
559        .ok_or(PipelineError::QueueFull {
560            queue: "submission",
561            fix: "slot status is outside the validated ring buffer; validate ring length before publishing",
562        })?;
563    let current_status = u32::from_le_bytes([
564        status_bytes[0],
565        status_bytes[1],
566        status_bytes[2],
567        status_bytes[3],
568    ]);
569    if current_status != slot::EMPTY && current_status != slot::DONE {
570        return Err(PipelineError::QueueFull {
571            queue: "submission",
572            fix: "slot is not publishable; only EMPTY and DONE slots may be written by the host",
573        });
574    }
575    Ok(())
576}
577
578fn write_slot_unchecked(
579    ring_bytes: &mut [u8],
580    view: RingPublishView,
581    slot_idx: u32,
582    tenant_id: u32,
583    opcode: u32,
584    args: &[u32],
585) -> Result<(), PipelineError> {
586    let base = slot_base(slot_idx, view)?;
587    write_slot_word(ring_bytes, base, OPCODE_WORD_USIZE, opcode);
588    write_slot_word(ring_bytes, base, TENANT_WORD_USIZE, tenant_id);
589    write_slot_word(
590        ring_bytes,
591        base,
592        PRIORITY_WORD_USIZE,
593        scheduler::priority::NORMAL,
594    );
595    let args_start = base + ARG0_WORD_USIZE * 4;
596    let args_end = args_start + ARGS_PER_SLOT_USIZE * 4;
597    ring_bytes[args_start..args_end].fill(0);
598    for (index, arg) in args.iter().enumerate() {
599        write_slot_word(ring_bytes, base, ARG0_WORD_USIZE + index, *arg);
600    }
601    write_slot_word(ring_bytes, base, STATUS_WORD_USIZE, slot::PUBLISHED);
602    Ok(())
603}
604
605fn write_work_item_unchecked(
606    ring_bytes: &mut [u8],
607    view: RingPublishView,
608    slot_idx: u32,
609    tenant_id: u32,
610    item: &MegakernelWorkItem,
611) -> Result<(), PipelineError> {
612    let base = slot_base(slot_idx, view)?;
613    write_slot_word(ring_bytes, base, OPCODE_WORD_USIZE, item.op_handle);
614    write_slot_word(ring_bytes, base, TENANT_WORD_USIZE, tenant_id);
615    write_slot_word(
616        ring_bytes,
617        base,
618        PRIORITY_WORD_USIZE,
619        scheduler::priority::NORMAL,
620    );
621    let args_start = base + ARG0_WORD_USIZE * 4;
622    let args_end = args_start + ARGS_PER_SLOT_USIZE * 4;
623    ring_bytes[args_start..args_end].fill(0);
624    write_slot_word(ring_bytes, base, ARG0_WORD_USIZE, item.input_handle);
625    write_slot_word(ring_bytes, base, ARG0_WORD_USIZE + 1, item.output_handle);
626    write_slot_word(ring_bytes, base, ARG0_WORD_USIZE + 2, item.param);
627    write_slot_word(ring_bytes, base, STATUS_WORD_USIZE, slot::PUBLISHED);
628    Ok(())
629}
630
631fn validate_work_item_batch(
632    slot_count: u32,
633    items: &[MegakernelWorkItem],
634) -> Result<(), PipelineError> {
635    let item_count = u32::try_from(items.len()).map_err(|_| PipelineError::QueueFull {
636        queue: "submission",
637        fix: "work item count exceeds u32::MAX; shard the megakernel queue before publishing",
638    })?;
639    if item_count > slot_count {
640        return Err(PipelineError::QueueFull {
641            queue: "submission",
642            fix: "work item count exceeds ring slot count; enlarge the launch geometry before publishing",
643        });
644    }
645    validate_work_items(items)
646}
647
648fn validate_work_items(items: &[MegakernelWorkItem]) -> Result<(), PipelineError> {
649    if ARGS_PER_SLOT_USIZE < 3 {
650        return Err(PipelineError::QueueFull {
651            queue: "submission",
652            fix: "MegakernelWorkItem publication requires three argument words; increase ARGS_PER_SLOT",
653        });
654    }
655    for item in items {
656        if let Err(fix) = protocol::opcode::validate_publish_opcode(item.op_handle) {
657            return Err(PipelineError::QueueFull {
658                queue: "submission",
659                fix,
660            });
661        }
662    }
663    Ok(())
664}
665
666fn encoded_ring_word_count(slot_count: u32) -> Result<usize, PipelineError> {
667    if slot_count > protocol::MAX_ENCODED_RING_SLOTS {
668        return Err(PipelineError::QueueFull {
669            queue: "submission",
670            fix: "split the dispatch into smaller ring shards before encoding; slot_count exceeds the megakernel allocation cap or host address space",
671        });
672    }
673    let words = slot_count
674        .checked_mul(SLOT_WORDS)
675        .ok_or(PipelineError::QueueFull {
676            queue: "submission",
677            fix: "split the dispatch into smaller ring shards before encoding; slot_count exceeds the megakernel protocol cap or host address space",
678        })?;
679    usize::try_from(words).map_err(|_| PipelineError::QueueFull {
680        queue: "submission",
681        fix: "split the dispatch into smaller ring shards before encoding; ring word count does not fit usize",
682    })
683}
684
/// Stores `value` as four little-endian bytes at word `word_idx` of the slot
/// that starts at byte offset `slot_base`.
///
/// # Panics
///
/// Panics when the four target bytes are not inside `ring_bytes`.
fn write_slot_word(ring_bytes: &mut [u8], slot_base: usize, word_idx: usize, value: u32) {
    let start = slot_base + word_idx * 4;
    let encoded = value.to_le_bytes();
    let target = &mut ring_bytes[start..start + 4];
    target.copy_from_slice(&encoded);
}
689
690fn write_packed_metadata_byte(args: &mut [u32; ARGS_PER_SLOT_USIZE], byte_index: usize, value: u8) {
691    let word_index = byte_index / 4;
692    let shift = u32::try_from((byte_index % 4) * 8).unwrap_or_else(|source| {
693        panic!(
694            "packed metadata byte shift cannot fit u32: {source}. Fix: keep packed metadata byte indices within one u32 word lane."
695        )
696    });
697    args[word_index] |= u32::from(value) << shift;
698}
699