1use crate::megakernel::planner::MegakernelWorkItem;
2use crate::megakernel::protocol::{self, slot, SLOT_WORDS};
3use crate::megakernel::{scheduler, Megakernel, PackedOpDescriptor};
4use crate::PipelineError;
5
/// Number of `u32` words in one ring slot, as a host `usize`.
// NOTE(review): presumably mirrors `protocol::SLOT_WORDS`; confirm the two stay in sync.
const SLOT_WORDS_USIZE: usize = 16;
/// Word index of the status field inside a slot.
const STATUS_WORD_USIZE: usize = 0;
/// Word index of the opcode field inside a slot.
const OPCODE_WORD_USIZE: usize = 1;
/// Word index of the tenant id field inside a slot.
const TENANT_WORD_USIZE: usize = 2;
/// Word index of the scheduling priority field inside a slot.
const PRIORITY_WORD_USIZE: usize = 3;
/// Word index of the first argument word inside a slot.
const ARG0_WORD_USIZE: usize = 4;
/// Number of argument words in a slot; they occupy words `ARG0_WORD_USIZE..SLOT_WORDS_USIZE`.
const ARGS_PER_SLOT_USIZE: usize = 12;
13
/// Slot geometry of a ring buffer whose byte length has already been checked.
///
/// Built by `validate_ring_publish_view`; the helpers in this file take it by
/// value to turn slot indices into byte offsets.
#[derive(Debug, Clone, Copy)]
struct RingPublishView {
    /// Size of one slot in bytes (`SLOT_WORDS_USIZE * 4`).
    slot_bytes: usize,
    /// Number of whole slots contained in the ring (`ring_bytes.len() / slot_bytes`).
    slot_capacity: usize,
}
19
/// State change that can be applied to the status word of a ring slot.
///
/// Every variant has exactly one target status and is only legal from a fixed
/// set of current statuses.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RingSlotTransition {
    /// Moves an `EMPTY` or `DONE` slot to `PUBLISHED`.
    Publish,
    /// Moves a `PUBLISHED`, `YIELD`, or `REQUEUE` slot to `CLAIMED`.
    Claim,
    /// Moves a `CLAIMED` slot to `DONE`.
    Done,
    /// Moves a `PUBLISHED`, `CLAIMED`, `WAIT_IO`, `YIELD`, or `REQUEUE` slot to `FAULT`.
    Fault,
    /// Moves a `PUBLISHED`, `WAIT_IO`, `YIELD`, or `REQUEUE` slot back to `EMPTY`.
    Cancel,
}
34
35impl RingSlotTransition {
36 fn label(self) -> &'static str {
37 match self {
38 Self::Publish => "publish",
39 Self::Claim => "claim",
40 Self::Done => "done",
41 Self::Fault => "fault",
42 Self::Cancel => "cancel",
43 }
44 }
45
46 fn target_status(self) -> u32 {
47 match self {
48 Self::Publish => slot::PUBLISHED,
49 Self::Claim => slot::CLAIMED,
50 Self::Done => slot::DONE,
51 Self::Fault => slot::FAULT,
52 Self::Cancel => slot::EMPTY,
53 }
54 }
55
56 fn allows(self, current_status: u32) -> bool {
57 match self {
58 Self::Publish => matches!(current_status, slot::EMPTY | slot::DONE),
59 Self::Claim => matches!(
60 current_status,
61 slot::PUBLISHED | slot::YIELD | slot::REQUEUE
62 ),
63 Self::Done => current_status == slot::CLAIMED,
64 Self::Fault => matches!(
65 current_status,
66 slot::PUBLISHED | slot::CLAIMED | slot::WAIT_IO | slot::YIELD | slot::REQUEUE
67 ),
68 Self::Cancel => matches!(
69 current_status,
70 slot::PUBLISHED | slot::WAIT_IO | slot::YIELD | slot::REQUEUE
71 ),
72 }
73 }
74}
75
76fn validate_ring_publish_view(ring_bytes: &[u8]) -> Result<RingPublishView, PipelineError> {
77 let slot_bytes = SLOT_WORDS_USIZE
78 .checked_mul(4)
79 .ok_or(PipelineError::QueueFull {
80 queue: "submission",
81 fix: "slot byte width overflowed usize; keep SLOT_WORDS within the u32 ABI",
82 })?;
83 if ring_bytes.len() % slot_bytes != 0 {
84 return Err(PipelineError::QueueFull {
85 queue: "submission",
86 fix: "ring buffer byte length is not an exact multiple of SLOT_WORDS * 4; rebuild it with Megakernel::encode_empty_ring",
87 });
88 }
89 Ok(RingPublishView {
90 slot_bytes,
91 slot_capacity: ring_bytes.len() / slot_bytes,
92 })
93}
94
95impl Megakernel {
96 pub fn transition_slot_status(
109 ring_bytes: &mut [u8],
110 slot_idx: u32,
111 transition: RingSlotTransition,
112 ) -> Result<u32, PipelineError> {
113 if transition == RingSlotTransition::Publish {
114 return Err(PipelineError::QueueFull {
115 queue: "submission",
116 fix: "publish transitions must use publish_slot or a batch publisher so payload words are written before PUBLISHED status",
117 });
118 }
119 let view = validate_ring_publish_view(ring_bytes)?;
120 let current_status = read_slot_status_word(ring_bytes, view, slot_idx)?;
121 validate_slot_transition(current_status, transition)?;
122 write_slot_status_word(ring_bytes, view, slot_idx, transition.target_status())?;
123 Ok(current_status)
124 }
125
126 pub fn publish_slot(
133 ring_bytes: &mut [u8],
134 slot_idx: u32,
135 tenant_id: u32,
136 opcode: u32,
137 args: &[u32],
138 ) -> Result<(), PipelineError> {
139 let view = validate_ring_publish_view(ring_bytes)?;
140 Self::publish_slot_validated(ring_bytes, view, slot_idx, tenant_id, opcode, args)
141 }
142
143 pub fn encode_work_items_ring_into(
157 slot_count: u32,
158 tenant_id: u32,
159 items: &[MegakernelWorkItem],
160 ring_bytes: &mut Vec<u8>,
161 ) -> Result<(), PipelineError> {
162 let item_count = u32::try_from(items.len()).map_err(|_| PipelineError::QueueFull {
163 queue: "submission",
164 fix: "work item count exceeds u32::MAX; shard the megakernel queue before publishing",
165 })?;
166 if item_count > slot_count {
167 return Err(PipelineError::QueueFull {
168 queue: "submission",
169 fix: "work item count exceeds ring slot count; enlarge the launch geometry before publishing",
170 });
171 }
172 if ARGS_PER_SLOT_USIZE < 3 {
173 return Err(PipelineError::QueueFull {
174 queue: "submission",
175 fix: "MegakernelWorkItem publication requires three argument words; increase ARGS_PER_SLOT",
176 });
177 }
178 for item in items {
179 if let Err(fix) = protocol::opcode::validate_publish_opcode(item.op_handle) {
180 return Err(PipelineError::QueueFull {
181 queue: "submission",
182 fix,
183 });
184 }
185 }
186
187 protocol::try_encode_empty_ring_into(slot_count, ring_bytes)
188 .map_err(super::protocol_error)?;
189 let view = validate_ring_publish_view(ring_bytes)?;
190 debug_assert!(items.len() <= view.slot_capacity);
191
192 for (slot_idx, item) in items.iter().enumerate() {
193 let slot_idx = u32::try_from(slot_idx).map_err(|_| PipelineError::QueueFull {
194 queue: "submission",
195 fix: "work item publish slot index exceeds u32::MAX; split the publish batch",
196 })?;
197 write_work_item_unchecked(ring_bytes, view, slot_idx, tenant_id, item)?;
198 }
199 Ok(())
200 }
201
202 pub fn publish_work_items(
218 ring_bytes: &mut [u8],
219 start_slot: u32,
220 tenant_id: u32,
221 items: &[MegakernelWorkItem],
222 ) -> Result<u32, PipelineError> {
223 validate_work_items(items)?;
224 let item_count = u32::try_from(items.len()).map_err(|_| PipelineError::QueueFull {
225 queue: "submission",
226 fix: "work item count exceeds u32::MAX; shard the megakernel queue before publishing",
227 })?;
228 let view = validate_ring_publish_view(ring_bytes)?;
229 let end_slot = start_slot
230 .checked_add(item_count)
231 .ok_or(PipelineError::QueueFull {
232 queue: "submission",
233 fix: "work item publish slot index overflowed u32; split the publish batch",
234 })?;
235 if u32_to_usize(end_slot)? > view.slot_capacity {
236 return Err(PipelineError::QueueFull {
237 queue: "submission",
238 fix:
239 "work item publish exceeds ring slot count; enlarge the ring or split the batch",
240 });
241 }
242 for slot_idx in start_slot..end_slot {
243 validate_publishable_slot(ring_bytes, view, slot_idx)?;
244 }
245 for (offset, item) in items.iter().enumerate() {
246 let slot_idx = start_slot
247 .checked_add(u32::try_from(offset).map_err(|_| PipelineError::QueueFull {
248 queue: "submission",
249 fix: "work item publish offset exceeds u32::MAX; split the publish batch",
250 })?)
251 .ok_or(PipelineError::QueueFull {
252 queue: "submission",
253 fix: "work item publish slot index overflowed u32; split the publish batch",
254 })?;
255 write_work_item_unchecked(ring_bytes, view, slot_idx, tenant_id, item)?;
256 }
257 Ok(item_count)
258 }
259
260 pub fn encode_work_items_ring_words_into(
273 slot_count: u32,
274 tenant_id: u32,
275 items: &[MegakernelWorkItem],
276 ring_words: &mut Vec<u32>,
277 ) -> Result<(), PipelineError> {
278 validate_work_item_batch(slot_count, items)?;
279 let total_words = encoded_ring_word_count(slot_count)?;
280
281 if ring_words.len() != total_words {
282 ring_words.clear();
283 ring_words.resize(total_words, 0);
284 } else {
285 let slot_count = u32_to_usize(slot_count)?;
286 for slot_idx in items.len()..slot_count {
287 ring_words[slot_idx * SLOT_WORDS_USIZE + STATUS_WORD_USIZE] = slot::EMPTY;
288 }
289 }
290
291 for (slot_idx, item) in items.iter().enumerate() {
292 let base = slot_idx * SLOT_WORDS_USIZE;
293 ring_words[base + OPCODE_WORD_USIZE] = item.op_handle;
294 ring_words[base + TENANT_WORD_USIZE] = tenant_id;
295 ring_words[base + PRIORITY_WORD_USIZE] = scheduler::priority::NORMAL;
296 ring_words[base + ARG0_WORD_USIZE] = item.input_handle;
297 ring_words[base + ARG0_WORD_USIZE + 1] = item.output_handle;
298 ring_words[base + ARG0_WORD_USIZE + 2] = item.param;
299 ring_words[base + STATUS_WORD_USIZE] = slot::PUBLISHED;
300 }
301 Ok(())
302 }
303
304 fn publish_slot_validated(
305 ring_bytes: &mut [u8],
306 view: RingPublishView,
307 slot_idx: u32,
308 tenant_id: u32,
309 opcode: u32,
310 args: &[u32],
311 ) -> Result<(), PipelineError> {
312 if u32_to_usize(slot_idx)? >= view.slot_capacity {
313 return Err(PipelineError::QueueFull {
314 queue: "submission",
315 fix: "slot_idx exceeds ring slot count; enlarge the ring via encode_empty_ring",
316 });
317 }
318 if args.len() > ARGS_PER_SLOT_USIZE {
319 return Err(PipelineError::QueueFull {
320 queue: "submission",
321 fix: "too many args for one slot; 12 u32 args max per slot",
322 });
323 }
324 if let Err(fix) = protocol::opcode::validate_publish_opcode(opcode) {
325 return Err(PipelineError::QueueFull {
326 queue: "submission",
327 fix,
328 });
329 }
330
331 let base = slot_base(slot_idx, view)?;
332 let read_word = |buf: &[u8], word_idx: usize| -> Result<u32, PipelineError> {
333 let off = base + word_idx * 4;
334 let bytes = buf.get(off..off + 4).ok_or(PipelineError::QueueFull {
335 queue: "submission",
336 fix: "slot word is outside the validated ring buffer; validate ring length before publishing",
337 })?;
338 let mut word = [0u8; 4];
339 word.copy_from_slice(bytes);
340 Ok(u32::from_le_bytes(word))
341 };
342
343 let current_status = read_word(ring_bytes, STATUS_WORD_USIZE)?;
344 validate_slot_transition(current_status, RingSlotTransition::Publish)?;
345
346 let write_word = |buf: &mut [u8], word_idx: usize, value: u32| {
347 let off = base + word_idx * 4;
348 buf[off..off + 4].copy_from_slice(&value.to_le_bytes());
349 };
350
351 write_word(ring_bytes, OPCODE_WORD_USIZE, opcode);
352 write_word(ring_bytes, TENANT_WORD_USIZE, tenant_id);
353 write_word(ring_bytes, PRIORITY_WORD_USIZE, scheduler::priority::NORMAL);
354 let args_start = base + ARG0_WORD_USIZE * 4;
355 let args_end = args_start + ARGS_PER_SLOT_USIZE * 4;
356 ring_bytes[args_start..args_end].fill(0);
357 for (i, arg) in args.iter().enumerate() {
358 write_word(ring_bytes, ARG0_WORD_USIZE + i, *arg);
359 }
360 write_word(ring_bytes, STATUS_WORD_USIZE, slot::PUBLISHED);
362
363 Ok(())
364 }
365
366 pub fn publish_packed_slot<A>(
376 ring_bytes: &mut [u8],
377 slot_idx: u32,
378 tenant_id: u32,
379 ops: &[(u8, A)],
380 ) -> Result<(), PipelineError>
381 where
382 A: AsRef<[u32]>,
383 {
384 Self::publish_packed_slot_from(ring_bytes, slot_idx, tenant_id, ops.len(), |index| {
385 let (op_id, args) = &ops[index];
386 (*op_id, args.as_ref())
387 })
388 }
389
390 pub(crate) fn publish_packed_descriptors(
391 ring_bytes: &mut [u8],
392 slot_idx: u32,
393 tenant_id: u32,
394 ops: &[PackedOpDescriptor],
395 ) -> Result<(), PipelineError> {
396 Self::publish_packed_slot_from(ring_bytes, slot_idx, tenant_id, ops.len(), |index| {
397 let op = &ops[index];
398 (op.opcode, op.args.as_slice())
399 })
400 }
401
402 fn publish_packed_slot_from<'a>(
403 ring_bytes: &mut [u8],
404 slot_idx: u32,
405 tenant_id: u32,
406 op_count: usize,
407 mut op_at: impl FnMut(usize) -> (u8, &'a [u32]),
408 ) -> Result<(), PipelineError> {
409 let opcode_count = u8::try_from(op_count).map_err(|_| PipelineError::QueueFull {
410 queue: "submission",
411 fix: "packed slot supports at most 255 inner opcodes",
412 })?;
413 let metadata_bytes = op_count
414 .checked_mul(2)
415 .and_then(|bytes| bytes.checked_add(2))
416 .ok_or(PipelineError::QueueFull {
417 queue: "submission",
418 fix: "packed slot metadata length overflowed usize; reduce packed opcode count",
419 })?;
420 let metadata_words = metadata_bytes.div_ceil(4);
421 if metadata_words > ARGS_PER_SLOT_USIZE {
422 return Err(PipelineError::QueueFull {
423 queue: "submission",
424 fix: "packed slot metadata exceeds the 12-word slot argument budget",
425 });
426 }
427
428 let mut packed_args = [0u32; ARGS_PER_SLOT_USIZE];
429 let mut packed_arg_words = 0usize;
430 let mut args = [0u32; ARGS_PER_SLOT_USIZE];
431 write_packed_metadata_byte(&mut args, 0, opcode_count);
432 let metadata_payload_bytes =
433 metadata_words
434 .checked_mul(4)
435 .ok_or(PipelineError::QueueFull {
436 queue: "submission",
437 fix:
438 "packed slot metadata byte length overflowed usize; reduce packed opcode count",
439 })?;
440 for index in 0..op_count {
441 let arg_offset =
442 u8::try_from(packed_arg_words).map_err(|_| PipelineError::QueueFull {
443 queue: "submission",
444 fix: "packed slot arg offsets must fit in one u8 word index",
445 })?;
446 let (op_id, op_args) = op_at(index);
447 let end =
448 packed_arg_words
449 .checked_add(op_args.len())
450 .ok_or(PipelineError::QueueFull {
451 queue: "submission",
452 fix: "packed slot arg word count overflowed usize; reduce packed args",
453 })?;
454 let total_words = metadata_words
455 .checked_add(end)
456 .ok_or(PipelineError::QueueFull {
457 queue: "submission",
458 fix: "packed slot total word count overflowed usize; reduce packed args",
459 })?;
460 if total_words > ARGS_PER_SLOT_USIZE {
461 return Err(PipelineError::QueueFull {
462 queue: "submission",
463 fix: "packed slot payload exceeds the 12-word slot argument budget",
464 });
465 }
466 packed_args[packed_arg_words..end].copy_from_slice(op_args);
467 packed_arg_words = end;
468
469 let byte_index = 2 + index * 2;
470 debug_assert!(byte_index + 1 < metadata_payload_bytes);
471 write_packed_metadata_byte(&mut args, byte_index, op_id);
472 write_packed_metadata_byte(&mut args, byte_index + 1, arg_offset);
473 }
474
475 let packed_arg_words_u8 =
481 u8::try_from(packed_arg_words).map_err(|_| PipelineError::QueueFull {
482 queue: "submission",
483 fix: "packed slot total arg words must fit in one u8",
484 })?;
485 write_packed_metadata_byte(&mut args, 1, packed_arg_words_u8);
486 let total_words = metadata_words + packed_arg_words;
487 args[metadata_words..total_words].copy_from_slice(&packed_args[..packed_arg_words]);
488 Self::publish_slot(
489 ring_bytes,
490 slot_idx,
491 tenant_id,
492 protocol::opcode::PACKED_SLOT,
493 &args[..total_words],
494 )
495 }
496
497 pub fn batch_publish<A>(
506 ring_bytes: &mut [u8],
507 start_slot: u32,
508 tenant_id: u32,
509 items: &[(u32, A)], batch_tag: u32,
511 ) -> Result<u32, PipelineError>
512 where
513 A: AsRef<[u32]>,
514 {
515 let item_count = u32::try_from(items.len()).map_err(|_| PipelineError::QueueFull {
516 queue: "submission",
517 fix: "batch item count exceeds u32::MAX; split the publish batch",
518 })?;
519 let view = validate_ring_publish_view(ring_bytes)?;
520 let total_slots = item_count.checked_add(1).ok_or(PipelineError::QueueFull {
521 queue: "submission",
522 fix: "batch publish slot count overflowed u32; split the publish batch",
523 })?;
524 let end_slot = start_slot
525 .checked_add(total_slots)
526 .ok_or(PipelineError::QueueFull {
527 queue: "submission",
528 fix: "batch publish slot index overflowed u32; split the publish batch",
529 })?;
530 if u32_to_usize(end_slot)? > view.slot_capacity {
531 return Err(PipelineError::QueueFull {
532 queue: "submission",
533 fix: "batch publish exceeds ring slot count; enlarge the ring or split the batch",
534 });
535 }
536 for (opcode, args) in items {
537 validate_publish_payload(*opcode, args.as_ref())?;
538 }
539 validate_publish_payload(protocol::opcode::BATCH_FENCE, &[item_count, batch_tag])?;
540 for slot_idx in start_slot..end_slot {
541 validate_publishable_slot(ring_bytes, view, slot_idx)?;
542 }
543
544 for (offset, (opcode, args)) in items.iter().enumerate() {
545 let slot_idx = start_slot
546 .checked_add(u32::try_from(offset).map_err(|_| PipelineError::QueueFull {
547 queue: "submission",
548 fix: "batch publish offset exceeds u32::MAX; split the publish batch",
549 })?)
550 .ok_or(PipelineError::QueueFull {
551 queue: "submission",
552 fix: "batch publish slot index overflowed u32; split the publish batch",
553 })?;
554 write_slot_unchecked(
555 ring_bytes,
556 view,
557 slot_idx,
558 tenant_id,
559 *opcode,
560 args.as_ref(),
561 )?;
562 }
563 let fence_slot = start_slot
564 .checked_add(item_count)
565 .ok_or(PipelineError::QueueFull {
566 queue: "submission",
567 fix: "batch publish fence slot overflowed u32; split the publish batch",
568 })?;
569 write_slot_unchecked(
570 ring_bytes,
571 view,
572 fence_slot,
573 tenant_id,
574 protocol::opcode::BATCH_FENCE,
575 &[item_count, batch_tag],
576 )?;
577 fence_slot
578 .checked_add(1)
579 .and_then(|end| end.checked_sub(start_slot))
580 .ok_or(PipelineError::QueueFull {
581 queue: "submission",
582 fix: "batch publish consumed-slot count overflowed u32; split the publish batch",
583 })
584 }
585}
586
587fn validate_publish_payload(opcode: u32, args: &[u32]) -> Result<(), PipelineError> {
588 if args.len() > ARGS_PER_SLOT_USIZE {
589 return Err(PipelineError::QueueFull {
590 queue: "submission",
591 fix: "too many args for one slot; 12 u32 args max per slot",
592 });
593 }
594 if let Err(fix) = protocol::opcode::validate_publish_opcode(opcode) {
595 return Err(PipelineError::QueueFull {
596 queue: "submission",
597 fix,
598 });
599 }
600 Ok(())
601}
602
603fn u32_to_usize(value: u32) -> Result<usize, PipelineError> {
604 usize::try_from(value).map_err(|_| PipelineError::QueueFull {
605 queue: "submission",
606 fix: "u32 slot index cannot fit host usize; shard the megakernel ring for this target",
607 })
608}
609
610fn slot_base(slot_idx: u32, view: RingPublishView) -> Result<usize, PipelineError> {
611 u32_to_usize(slot_idx)?
612 .checked_mul(view.slot_bytes)
613 .ok_or(PipelineError::QueueFull {
614 queue: "submission",
615 fix: "slot byte offset overflowed usize; shard the ring before publishing",
616 })
617}
618
619fn validate_publishable_slot(
620 ring_bytes: &[u8],
621 view: RingPublishView,
622 slot_idx: u32,
623) -> Result<(), PipelineError> {
624 let current_status = read_slot_status_word(ring_bytes, view, slot_idx)?;
625 validate_slot_transition(current_status, RingSlotTransition::Publish)
626}
627
628fn validate_slot_transition(
629 current_status: u32,
630 transition: RingSlotTransition,
631) -> Result<(), PipelineError> {
632 if transition.allows(current_status) {
633 return Ok(());
634 }
635 Err(PipelineError::QueueFull {
636 queue: "submission",
637 fix: illegal_transition_fix(transition, current_status),
638 })
639}
640
641fn illegal_transition_fix(transition: RingSlotTransition, _current_status: u32) -> &'static str {
642 match transition {
643 RingSlotTransition::Publish => {
644 "slot is not publishable; only EMPTY and DONE slots may be written by the host"
645 }
646 RingSlotTransition::Claim => {
647 "illegal ring slot transition: claim requires PUBLISHED, YIELD, or REQUEUE status"
648 }
649 RingSlotTransition::Done => "illegal ring slot transition: done requires CLAIMED status",
650 RingSlotTransition::Fault => {
651 "illegal ring slot transition: fault requires an in-flight slot status"
652 }
653 RingSlotTransition::Cancel => {
654 "illegal ring slot transition: cancel requires an unclaimed in-flight slot status"
655 }
656 }
657}
658
659fn slot_status_offset(slot_idx: u32, view: RingPublishView) -> Result<usize, PipelineError> {
660 let base = slot_base(slot_idx, view)?;
661 base.checked_add(
662 STATUS_WORD_USIZE
663 .checked_mul(4)
664 .ok_or(PipelineError::QueueFull {
665 queue: "submission",
666 fix:
667 "slot status word byte offset overflowed usize; keep SLOT_WORDS within the u32 ABI",
668 })?,
669 )
670 .ok_or(PipelineError::QueueFull {
671 queue: "submission",
672 fix: "slot status byte offset overflowed usize; shard the ring before publishing",
673 })
674}
675
676fn read_slot_status_word(
677 ring_bytes: &[u8],
678 view: RingPublishView,
679 slot_idx: u32,
680) -> Result<u32, PipelineError> {
681 let status_offset = slot_status_offset(slot_idx, view)?;
682 let status_end = status_offset
683 .checked_add(4)
684 .ok_or(PipelineError::QueueFull {
685 queue: "submission",
686 fix: "slot status byte end overflowed usize; shard the ring before publishing",
687 })?;
688 let status_bytes = ring_bytes
689 .get(status_offset..status_end)
690 .ok_or(PipelineError::QueueFull {
691 queue: "submission",
692 fix: "slot status is outside the validated ring buffer; validate ring length before publishing",
693 })?;
694 Ok(u32::from_le_bytes([
695 status_bytes[0],
696 status_bytes[1],
697 status_bytes[2],
698 status_bytes[3],
699 ]))
700}
701
702fn write_slot_status_word(
703 ring_bytes: &mut [u8],
704 view: RingPublishView,
705 slot_idx: u32,
706 value: u32,
707) -> Result<(), PipelineError> {
708 let status_offset = slot_status_offset(slot_idx, view)?;
709 let status_end = status_offset
710 .checked_add(4)
711 .ok_or(PipelineError::QueueFull {
712 queue: "submission",
713 fix: "slot status byte end overflowed usize; shard the ring before publishing",
714 })?;
715 let status_bytes = ring_bytes
716 .get_mut(status_offset..status_end)
717 .ok_or(PipelineError::QueueFull {
718 queue: "submission",
719 fix: "slot status is outside the validated ring buffer; validate ring length before publishing",
720 })?;
721 status_bytes.copy_from_slice(&value.to_le_bytes());
722 Ok(())
723}
724
725fn write_slot_unchecked(
726 ring_bytes: &mut [u8],
727 view: RingPublishView,
728 slot_idx: u32,
729 tenant_id: u32,
730 opcode: u32,
731 args: &[u32],
732) -> Result<(), PipelineError> {
733 let base = slot_base(slot_idx, view)?;
734 write_slot_word(ring_bytes, base, OPCODE_WORD_USIZE, opcode);
735 write_slot_word(ring_bytes, base, TENANT_WORD_USIZE, tenant_id);
736 write_slot_word(
737 ring_bytes,
738 base,
739 PRIORITY_WORD_USIZE,
740 scheduler::priority::NORMAL,
741 );
742 let args_start = base + ARG0_WORD_USIZE * 4;
743 let args_end = args_start + ARGS_PER_SLOT_USIZE * 4;
744 ring_bytes[args_start..args_end].fill(0);
745 for (index, arg) in args.iter().enumerate() {
746 write_slot_word(ring_bytes, base, ARG0_WORD_USIZE + index, *arg);
747 }
748 write_slot_word(ring_bytes, base, STATUS_WORD_USIZE, slot::PUBLISHED);
749 Ok(())
750}
751
752fn write_work_item_unchecked(
753 ring_bytes: &mut [u8],
754 view: RingPublishView,
755 slot_idx: u32,
756 tenant_id: u32,
757 item: &MegakernelWorkItem,
758) -> Result<(), PipelineError> {
759 let base = slot_base(slot_idx, view)?;
760 write_slot_word(ring_bytes, base, OPCODE_WORD_USIZE, item.op_handle);
761 write_slot_word(ring_bytes, base, TENANT_WORD_USIZE, tenant_id);
762 write_slot_word(
763 ring_bytes,
764 base,
765 PRIORITY_WORD_USIZE,
766 scheduler::priority::NORMAL,
767 );
768 let args_start = base + ARG0_WORD_USIZE * 4;
769 let args_end = args_start + ARGS_PER_SLOT_USIZE * 4;
770 ring_bytes[args_start..args_end].fill(0);
771 write_slot_word(ring_bytes, base, ARG0_WORD_USIZE, item.input_handle);
772 write_slot_word(ring_bytes, base, ARG0_WORD_USIZE + 1, item.output_handle);
773 write_slot_word(ring_bytes, base, ARG0_WORD_USIZE + 2, item.param);
774 write_slot_word(ring_bytes, base, STATUS_WORD_USIZE, slot::PUBLISHED);
775 Ok(())
776}
777
778fn validate_work_item_batch(
779 slot_count: u32,
780 items: &[MegakernelWorkItem],
781) -> Result<(), PipelineError> {
782 let item_count = u32::try_from(items.len()).map_err(|_| PipelineError::QueueFull {
783 queue: "submission",
784 fix: "work item count exceeds u32::MAX; shard the megakernel queue before publishing",
785 })?;
786 if item_count > slot_count {
787 return Err(PipelineError::QueueFull {
788 queue: "submission",
789 fix: "work item count exceeds ring slot count; enlarge the launch geometry before publishing",
790 });
791 }
792 validate_work_items(items)
793}
794
795fn validate_work_items(items: &[MegakernelWorkItem]) -> Result<(), PipelineError> {
796 if ARGS_PER_SLOT_USIZE < 3 {
797 return Err(PipelineError::QueueFull {
798 queue: "submission",
799 fix: "MegakernelWorkItem publication requires three argument words; increase ARGS_PER_SLOT",
800 });
801 }
802 for item in items {
803 if let Err(fix) = protocol::opcode::validate_publish_opcode(item.op_handle) {
804 return Err(PipelineError::QueueFull {
805 queue: "submission",
806 fix,
807 });
808 }
809 }
810 Ok(())
811}
812
813fn encoded_ring_word_count(slot_count: u32) -> Result<usize, PipelineError> {
814 if slot_count > protocol::MAX_ENCODED_RING_SLOTS {
815 return Err(PipelineError::QueueFull {
816 queue: "submission",
817 fix: "split the dispatch into smaller ring shards before encoding; slot_count exceeds the megakernel allocation cap or host address space",
818 });
819 }
820 let words = slot_count
821 .checked_mul(SLOT_WORDS)
822 .ok_or(PipelineError::QueueFull {
823 queue: "submission",
824 fix: "split the dispatch into smaller ring shards before encoding; slot_count exceeds the megakernel protocol cap or host address space",
825 })?;
826 usize::try_from(words).map_err(|_| PipelineError::QueueFull {
827 queue: "submission",
828 fix: "split the dispatch into smaller ring shards before encoding; ring word count does not fit usize",
829 })
830}
831
/// Stores `value` as four little-endian bytes at word `word_idx` of the slot
/// that starts at byte offset `slot_base`.
///
/// # Panics
///
/// Panics when the four target bytes are not inside `ring_bytes`.
fn write_slot_word(ring_bytes: &mut [u8], slot_base: usize, word_idx: usize, value: u32) {
    let start = slot_base + word_idx * 4;
    let destination = &mut ring_bytes[start..start + 4];
    for (byte, encoded) in destination.iter_mut().zip(value.to_le_bytes()) {
        *byte = encoded;
    }
}
836
837fn write_packed_metadata_byte(args: &mut [u32; ARGS_PER_SLOT_USIZE], byte_index: usize, value: u8) {
838 let word_index = byte_index / 4;
839 let shift = ((byte_index % 4) * 8) as u32;
840 args[word_index] |= u32::from(value) << shift;
841}