1use super::{
2 control, debug, slot, DebugRecord, ProtocolError, CONTROL_MIN_WORDS, MAX_DEBUG_RECORDS,
3 MAX_ENCODED_DEBUG_RECORDS, MAX_ENCODED_OBSERVABLE_SLOTS, MAX_ENCODED_RING_SLOTS,
4 MAX_OBSERVABLE_SLOTS, MAX_RING_SLOTS, SLOT_WORDS, STATUS_WORD,
5};
6
7#[must_use]
9pub fn control_byte_len(observable_slots: u32) -> Option<usize> {
10 if observable_slots > MAX_OBSERVABLE_SLOTS {
11 return None;
12 }
13 let words = control::OBSERVABLE_BASE.checked_add(observable_slots)?;
14 words_to_bytes(words.max(CONTROL_MIN_WORDS))
15}
16
17#[must_use]
19pub fn ring_byte_len(slot_count: u32) -> Option<usize> {
20 if slot_count > MAX_RING_SLOTS {
21 return None;
22 }
23 let words = slot_count.checked_mul(SLOT_WORDS)?;
24 words_to_bytes(words)
25}
26
27#[must_use]
29pub fn debug_log_byte_len(record_capacity: u32) -> Option<usize> {
30 if record_capacity > MAX_DEBUG_RECORDS {
31 return None;
32 }
33 let record_words = record_capacity.checked_mul(debug::RECORD_WORDS)?;
34 let words = debug::RECORDS_BASE.checked_add(record_words)?;
35 words_to_bytes(words)
36}
37
38fn control_encode_capacity(observable_slots: u32) -> Result<usize, ProtocolError> {
39 if observable_slots > MAX_ENCODED_OBSERVABLE_SLOTS {
40 return Err(ProtocolError::ByteLengthOverflow {
41 buffer: "control",
42 fix: "shard observable results or reduce observable_slots to the megakernel allocation cap before encoding control",
43 });
44 }
45 control_byte_len(observable_slots).ok_or(ProtocolError::ByteLengthOverflow {
46 buffer: "control",
47 fix: "shard observable results or reduce observable_slots to the megakernel protocol cap before encoding control",
48 })
49}
50
51fn ring_encode_capacity(slot_count: u32) -> Result<usize, ProtocolError> {
52 if slot_count > MAX_ENCODED_RING_SLOTS {
53 return Err(ProtocolError::ByteLengthOverflow {
54 buffer: "ring",
55 fix: "split the dispatch into smaller ring shards before encoding; slot_count exceeds the megakernel allocation cap or host address space",
56 });
57 }
58 ring_byte_len(slot_count).ok_or(ProtocolError::ByteLengthOverflow {
59 buffer: "ring",
60 fix: "split the dispatch into smaller ring shards before encoding; slot_count exceeds the megakernel protocol cap or host address space",
61 })
62}
63
64fn debug_log_encode_capacity(record_capacity: u32) -> Result<usize, ProtocolError> {
65 if record_capacity > MAX_ENCODED_DEBUG_RECORDS {
66 return Err(ProtocolError::ByteLengthOverflow {
67 buffer: "debug_log",
68 fix:
69 "reduce debug-log record_capacity to the megakernel allocation cap before encoding",
70 });
71 }
72 debug_log_byte_len(record_capacity).ok_or(ProtocolError::ByteLengthOverflow {
73 buffer: "debug_log",
74 fix: "reduce debug-log record_capacity to the megakernel protocol cap before encoding",
75 })
76}
77
78pub fn encode_control(
85 shutdown: bool,
86 tenant_count: u32,
87 observable_slots: u32,
88) -> Result<Vec<u8>, ProtocolError> {
89 try_encode_control(shutdown, tenant_count, observable_slots)
90}
91
92pub fn try_encode_control(
99 shutdown: bool,
100 tenant_count: u32,
101 observable_slots: u32,
102) -> Result<Vec<u8>, ProtocolError> {
103 let total_bytes = control_encode_capacity(observable_slots)?;
104 let mut bytes = Vec::new();
105 try_reserve_protocol_capacity(
106 &mut bytes,
107 total_bytes,
108 "control",
109 "control encode could not reserve host staging bytes; reduce observable_slots or reuse a preallocated control buffer",
110 )?;
111 try_encode_control_into(shutdown, tenant_count, observable_slots, &mut bytes)?;
112 Ok(bytes)
113}
114
115pub fn try_encode_control_into(
125 shutdown: bool,
126 tenant_count: u32,
127 observable_slots: u32,
128 dst: &mut Vec<u8>,
129) -> Result<(), ProtocolError> {
130 let total_bytes = control_encode_capacity(observable_slots)?;
131 dst.clear();
132 try_reserve_protocol_capacity(
133 dst,
134 total_bytes,
135 "control",
136 "control encode could not reserve caller-owned staging bytes; reduce observable_slots or reuse a larger control buffer",
137 )?;
138 dst.resize(total_bytes, 0);
139
140 if shutdown {
141 write_word(
142 dst,
143 control_word_index(control::SHUTDOWN, "shutdown word")?,
144 1,
145 );
146 }
147 write_word(
148 dst,
149 control_word_index(control::TENANT_BASE, "tenant base word")?,
150 control::TENANT_BASE + 1,
151 );
152
153 let tenant_table_start = control_word_index(control::TENANT_BASE, "tenant base word")?
154 .checked_add(1)
155 .ok_or(ProtocolError::ByteLengthOverflow {
156 buffer: "control",
157 fix: "tenant table start overflowed usize; reduce control protocol constants",
158 })?;
159 let requested_tenant_words =
160 usize::try_from(tenant_count).map_err(|_| ProtocolError::ByteLengthOverflow {
161 buffer: "control",
162 fix: "tenant_count cannot fit host usize; split tenant tables before encoding",
163 })?;
164 let tenant_table_end = core::cmp::min(
165 tenant_table_start
166 .checked_add(requested_tenant_words)
167 .ok_or(ProtocolError::ByteLengthOverflow {
168 buffer: "control",
169 fix: "tenant table end overflowed usize; split tenant tables before encoding",
170 })?,
171 control_word_index(control::TENANT_QUOTA_BASE, "tenant quota base word")?,
172 );
173 for word_idx in tenant_table_start..tenant_table_end {
174 write_word(dst, word_idx, !0u32);
175 }
176
177 let quota_table_start =
178 control_word_index(control::TENANT_QUOTA_BASE, "tenant quota base word")?;
179 let quota_table_end = core::cmp::min(
180 quota_table_start
181 .checked_add(requested_tenant_words)
182 .ok_or(ProtocolError::ByteLengthOverflow {
183 buffer: "control",
184 fix: "quota table end overflowed usize; split tenant tables before encoding",
185 })?,
186 control_word_index(control::TENANT_FAIRNESS_BASE, "tenant fairness base word")?,
187 );
188 for word_idx in quota_table_start..quota_table_end {
189 write_word(dst, word_idx, 1_000_000);
190 }
191 Ok(())
192}
193
194pub fn encode_empty_ring(slot_count: u32) -> Result<Vec<u8>, ProtocolError> {
201 try_encode_empty_ring(slot_count)
202}
203
204pub fn try_encode_empty_ring(slot_count: u32) -> Result<Vec<u8>, ProtocolError> {
211 let total_bytes = ring_encode_capacity(slot_count)?;
212 let mut bytes = Vec::new();
213 try_reserve_protocol_capacity(
214 &mut bytes,
215 total_bytes,
216 "ring",
217 "ring encode could not reserve host staging bytes; split the dispatch into smaller ring shards or reuse a preallocated ring buffer",
218 )?;
219 try_encode_empty_ring_into(slot_count, &mut bytes)?;
220 Ok(bytes)
221}
222
223pub fn try_encode_empty_ring_into(slot_count: u32, dst: &mut Vec<u8>) -> Result<(), ProtocolError> {
232 let total_bytes = ring_encode_capacity(slot_count)?;
233 dst.clear();
234 try_reserve_protocol_capacity(
235 dst,
236 total_bytes,
237 "ring",
238 "ring encode could not reserve caller-owned staging bytes; split the dispatch into smaller ring shards or reuse a larger ring buffer",
239 )?;
240 dst.resize(total_bytes, 0);
241 Ok(())
242}
243
244pub fn encode_empty_debug_log(record_capacity: u32) -> Result<Vec<u8>, ProtocolError> {
251 try_encode_empty_debug_log(record_capacity)
252}
253
254pub fn try_encode_empty_debug_log(record_capacity: u32) -> Result<Vec<u8>, ProtocolError> {
261 let total_bytes = debug_log_encode_capacity(record_capacity)?;
262 let mut bytes = Vec::new();
263 try_reserve_protocol_capacity(
264 &mut bytes,
265 total_bytes,
266 "debug_log",
267 "debug-log encode could not reserve host staging bytes; reduce record_capacity or reuse a preallocated debug-log buffer",
268 )?;
269 try_encode_empty_debug_log_into(record_capacity, &mut bytes)?;
270 Ok(bytes)
271}
272
273pub fn try_encode_empty_debug_log_into(
282 record_capacity: u32,
283 dst: &mut Vec<u8>,
284) -> Result<(), ProtocolError> {
285 let total_bytes = debug_log_encode_capacity(record_capacity)?;
286 dst.clear();
287 try_reserve_protocol_capacity(
288 dst,
289 total_bytes,
290 "debug_log",
291 "debug-log encode could not reserve caller-owned staging bytes; reduce record_capacity or reuse a larger debug-log buffer",
292 )?;
293 dst.resize(total_bytes, 0);
294 Ok(())
295}
296
297#[must_use]
299pub fn read_done_count(control_bytes: &[u8]) -> u32 {
300 try_read_done_count(control_bytes).unwrap_or_else(|source| {
301 panic!(
302 "megakernel control done_count decode failed: {source}. Fix: use a complete control readback produced by the matching megakernel protocol encoder."
303 )
304 })
305}
306
307#[must_use]
309pub fn read_epoch(control_bytes: &[u8]) -> u32 {
310 try_read_epoch(control_bytes).unwrap_or_else(|source| {
311 panic!(
312 "megakernel control epoch decode failed: {source}. Fix: use a complete control readback produced by the matching megakernel protocol encoder."
313 )
314 })
315}
316
317pub fn try_read_done_count(control_bytes: &[u8]) -> Result<u32, ProtocolError> {
324 read_required_word(
325 "control",
326 control_bytes,
327 control_word_index(control::DONE_COUNT, "done-count word")?,
328 )
329}
330
331pub fn try_read_epoch(control_bytes: &[u8]) -> Result<u32, ProtocolError> {
338 read_required_word(
339 "control",
340 control_bytes,
341 control_word_index(control::EPOCH, "epoch word")?,
342 )
343}
344
345#[must_use]
347pub fn read_observable(control_bytes: &[u8], index: u32) -> u32 {
348 try_read_observable(control_bytes, index).unwrap_or(0)
349}
350
351pub fn try_read_observable(control_bytes: &[u8], index: u32) -> Result<u32, ProtocolError> {
358 let word_idx = control_word_index(
359 control::OBSERVABLE_BASE
360 .checked_add(index)
361 .ok_or(ProtocolError::ByteLengthOverflow {
362 buffer: "control",
363 fix: "observable index overflows the protocol word offset; shard observable reads",
364 })?,
365 "observable word index",
366 )?;
367 read_required_word("control", control_bytes, word_idx)
368}
369
370#[must_use]
372pub fn read_metrics(control_bytes: &[u8]) -> Vec<(u32, u32)> {
373 let mut result = Vec::new();
374 read_metrics_into(control_bytes, &mut result);
375 result
376}
377
378pub fn read_metrics_into(control_bytes: &[u8], out: &mut Vec<(u32, u32)>) {
382 out.clear();
383 let Ok(metrics_base) = control_word_index(control::METRICS_BASE, "metrics base word") else {
384 return;
385 };
386 let available_words = control_bytes.len() / 4;
387 if available_words <= metrics_base {
388 return;
389 }
390 let available_slots = (available_words - metrics_base).min(control::METRICS_SLOTS as usize);
391 let nonzero = count_nonzero_metrics_truncated(control_bytes, metrics_base, available_slots);
392 if try_reserve_target_capacity(out, nonzero).is_err() {
393 return;
394 }
395 for slot in 0..available_slots {
396 let word_idx = metrics_base + slot;
397 let Some(count) = read_word_unaligned(control_bytes, word_idx) else {
398 break;
399 };
400 if count > 0 {
401 out.push((slot as u32, count));
402 }
403 }
404}
405
406pub fn try_read_metrics(control_bytes: &[u8]) -> Result<Vec<(u32, u32)>, ProtocolError> {
413 let mut result = Vec::new();
414 try_read_metrics_into(control_bytes, &mut result)?;
415 Ok(result)
416}
417
418pub fn try_read_metrics_into(
427 control_bytes: &[u8],
428 out: &mut Vec<(u32, u32)>,
429) -> Result<(), ProtocolError> {
430 validate_word_aligned("control", control_bytes)?;
431 out.clear();
432 if let Ok(words) = bytemuck::try_cast_slice::<u8, u32>(control_bytes) {
433 try_reserve_target_capacity(
434 out,
435 count_nonzero_metrics_words_strict(words, control_bytes.len())?,
436 )?;
437 for i in 0..control::METRICS_SLOTS {
438 let word_idx = metrics_word_index(i)?;
439 let count =
440 words
441 .get(word_idx)
442 .copied()
443 .map(u32::from_le)
444 .ok_or(ProtocolError::MissingWord {
445 buffer: "control",
446 word_idx,
447 byte_len: control_bytes.len(),
448 fix: "decode only control buffers produced by the matching megakernel protocol encoder",
449 })?;
450 if count > 0 {
451 out.push((i, count));
452 }
453 }
454 return Ok(());
455 }
456 try_reserve_target_capacity(out, count_nonzero_metrics_unaligned_strict(control_bytes)?)?;
457 for i in 0..control::METRICS_SLOTS {
458 let word_idx = metrics_word_index(i)?;
459 let count = read_word_unaligned(control_bytes, word_idx)
460 .ok_or(ProtocolError::MissingWord {
461 buffer: "control",
462 word_idx,
463 byte_len: control_bytes.len(),
464 fix: "decode only control buffers produced by the matching megakernel protocol encoder",
465 })?;
466 if count > 0 {
467 out.push((i, count));
468 }
469 }
470 Ok(())
471}
472
473mod debug_log;
474
475pub use debug_log::{
476 read_debug_log, read_debug_log_into, try_read_debug_log, try_read_debug_log_into,
477};
478
479#[must_use]
485pub fn count_done_ring_slots(ring_bytes: &[u8], item_count: usize) -> Option<u64> {
486 if item_count == 0 {
487 return None;
488 }
489 let slot_words = usize::try_from(SLOT_WORDS).ok()?;
490 let required_bytes = item_count.checked_mul(slot_words)?.checked_mul(4)?;
491 if ring_bytes.len() < required_bytes {
492 return None;
493 }
494 let status_word = usize::try_from(STATUS_WORD).ok()?;
495 let words = bytemuck::try_cast_slice::<u8, u32>(ring_bytes).ok();
496 let done = (0..item_count)
497 .filter(|slot_idx| {
498 let word_idx = slot_idx
499 .checked_mul(slot_words)
500 .and_then(|base| base.checked_add(status_word));
501 word_idx.and_then(|idx| read_word_from_optional_words(words, ring_bytes, idx))
502 == Some(slot::DONE)
503 })
504 .count();
505 u64::try_from(done).ok()
506}
507
508pub fn try_count_done_ring_slots(
516 ring_bytes: &[u8],
517 item_count: usize,
518) -> Result<u64, ProtocolError> {
519 if item_count == 0 {
520 return Ok(0);
521 }
522 validate_word_aligned("ring", ring_bytes)?;
523 let slot_words =
524 usize::try_from(SLOT_WORDS).map_err(|_| ProtocolError::ByteLengthOverflow {
525 buffer: "ring",
526 fix: "keep SLOT_WORDS representable in host usize before decoding ring status",
527 })?;
528 let required_bytes = item_count
529 .checked_mul(slot_words)
530 .and_then(|words| words.checked_mul(4))
531 .ok_or(ProtocolError::ByteLengthOverflow {
532 buffer: "ring",
533 fix: "split the dispatch before ring status decode overflows host address space",
534 })?;
535 if ring_bytes.len() < required_bytes {
536 return Err(ProtocolError::MissingWord {
537 buffer: "ring",
538 word_idx: required_bytes / 4,
539 byte_len: ring_bytes.len(),
540 fix: "decode only full ring readbacks sized for the submitted megakernel item_count",
541 });
542 }
543 let status_word =
544 usize::try_from(STATUS_WORD).map_err(|_| ProtocolError::ByteLengthOverflow {
545 buffer: "ring",
546 fix: "keep STATUS_WORD representable in host usize before decoding ring status",
547 })?;
548 let words = bytemuck::try_cast_slice::<u8, u32>(ring_bytes).ok();
549 let mut done = 0_u64;
550 for slot_idx in 0..item_count {
551 let word_idx = slot_idx
552 .checked_mul(slot_words)
553 .and_then(|base| base.checked_add(status_word))
554 .ok_or(ProtocolError::ByteLengthOverflow {
555 buffer: "ring",
556 fix: "split the dispatch before ring status word indexing overflows host address space",
557 })?;
558 if read_word_from_optional_words(words, ring_bytes, word_idx) == Some(slot::DONE) {
559 done = done
560 .checked_add(1)
561 .ok_or(ProtocolError::ByteLengthOverflow {
562 buffer: "ring",
563 fix: "split the dispatch before DONE slot count exceeds u64",
564 })?;
565 }
566 }
567 Ok(done)
568}
569
570fn try_reserve_target_capacity<T>(
571 out: &mut Vec<T>,
572 target_capacity: usize,
573) -> Result<(), ProtocolError> {
574 try_reserve_protocol_capacity(
575 out,
576 target_capacity,
577 "control",
578 "host metrics decode could not reserve output records; reduce metrics fanout or decode into a reused scratch vector",
579 )
580}
581
582fn try_reserve_protocol_capacity<T>(
583 out: &mut Vec<T>,
584 target_capacity: usize,
585 buffer: &'static str,
586 fix: &'static str,
587) -> Result<(), ProtocolError> {
588 vyre_foundation::allocation::try_reserve_vec_to_capacity(out, target_capacity)
589 .map_err(|_| ProtocolError::ByteLengthOverflow { buffer, fix })
590}
591
592fn count_nonzero_metrics_words_strict(
593 words: &[u32],
594 byte_len: usize,
595) -> Result<usize, ProtocolError> {
596 let mut count = 0usize;
597 for i in 0..control::METRICS_SLOTS {
598 let word_idx = metrics_word_index(i)?;
599 let word = words
600 .get(word_idx)
601 .copied()
602 .map(u32::from_le)
603 .ok_or(ProtocolError::MissingWord {
604 buffer: "control",
605 word_idx,
606 byte_len,
607 fix: "decode only control buffers produced by the matching megakernel protocol encoder",
608 })?;
609 if word > 0 {
610 count += 1;
611 }
612 }
613 Ok(count)
614}
615
616fn count_nonzero_metrics_unaligned_strict(control_bytes: &[u8]) -> Result<usize, ProtocolError> {
617 let mut count = 0usize;
618 for i in 0..control::METRICS_SLOTS {
619 let word_idx = metrics_word_index(i)?;
620 let word = read_word_unaligned(control_bytes, word_idx)
621 .ok_or(ProtocolError::MissingWord {
622 buffer: "control",
623 word_idx,
624 byte_len: control_bytes.len(),
625 fix: "decode only control buffers produced by the matching megakernel protocol encoder",
626 })?;
627 if word > 0 {
628 count += 1;
629 }
630 }
631 Ok(count)
632}
633
634fn count_nonzero_metrics_truncated(
635 control_bytes: &[u8],
636 metrics_base: usize,
637 available_slots: usize,
638) -> usize {
639 let mut count = 0usize;
640 for slot in 0..available_slots {
641 if read_word_unaligned(control_bytes, metrics_base + slot).unwrap_or(0) > 0 {
642 count += 1;
643 }
644 }
645 count
646}
647
648fn metrics_word_index(slot: u32) -> Result<usize, ProtocolError> {
649 let word =
650 control::METRICS_BASE
651 .checked_add(slot)
652 .ok_or(ProtocolError::ByteLengthOverflow {
653 buffer: "control",
654 fix: "metrics slot index overflows the protocol word offset; shard metrics reads",
655 })?;
656 control_word_index(word, "metrics word index")
657}
658
659fn control_word_index(word: u32, label: &'static str) -> Result<usize, ProtocolError> {
660 usize::try_from(word).map_err(|_| ProtocolError::ByteLengthOverflow {
661 buffer: "control",
662 fix: match label {
663 "observable word index" => {
664 "observable word index cannot fit host usize; shard observable reads"
665 }
666 "metrics word index" => "metrics word index cannot fit host usize; shard metrics reads",
667 _ => "control word index cannot fit host usize; shard protocol reads",
668 },
669 })
670}
671
672pub(crate) fn read_word(bytes: &[u8], word_idx: usize) -> Option<u32> {
673 if let Ok(words) = bytemuck::try_cast_slice::<u8, u32>(bytes) {
674 return words.get(word_idx).copied().map(u32::from_le);
675 }
676 read_word_unaligned(bytes, word_idx)
677}
678
679fn read_word_from_optional_words(
680 words: Option<&[u32]>,
681 bytes: &[u8],
682 word_idx: usize,
683) -> Option<u32> {
684 if let Some(words) = words {
685 return words.get(word_idx).copied().map(u32::from_le);
686 }
687 read_word_unaligned(bytes, word_idx)
688}
689
/// Reads little-endian word `word_idx` byte-wise, without any alignment requirement.
///
/// Yields `None` when the byte offset overflows or the four bytes are not all present.
fn read_word_unaligned(bytes: &[u8], word_idx: usize) -> Option<u32> {
    let start = word_idx.checked_mul(4)?;
    let stop = start.checked_add(4)?;
    bytes
        .get(start..stop)
        .and_then(|chunk| <[u8; 4]>::try_from(chunk).ok())
        .map(u32::from_le_bytes)
}
696
697fn read_required_word(
698 buffer: &'static str,
699 bytes: &[u8],
700 word_idx: usize,
701) -> Result<u32, ProtocolError> {
702 validate_word_aligned(buffer, bytes)?;
703 read_word(bytes, word_idx).ok_or(ProtocolError::MissingWord {
704 buffer,
705 word_idx,
706 byte_len: bytes.len(),
707 fix: "decode only buffers produced by the matching megakernel protocol encoder",
708 })
709}
710
711fn validate_word_aligned(buffer: &'static str, bytes: &[u8]) -> Result<(), ProtocolError> {
712 if bytes.len() % 4 == 0 {
713 Ok(())
714 } else {
715 Err(ProtocolError::MisalignedByteLength {
716 buffer,
717 byte_len: bytes.len(),
718 fix: "pass whole u32 protocol words; do not decode partial DMA/readback buffers",
719 })
720 }
721}
722
/// Writes `value` as a little-endian word at word index `word_idx` of `bytes`.
///
/// # Panics
///
/// Panics when the four-byte span for `word_idx` does not lie inside `bytes`. The byte
/// offset is computed with checked arithmetic, so an index whose offset would overflow
/// `usize` also panics instead of wrapping around to an unrelated in-bounds word when
/// overflow checks are disabled.
pub(crate) fn write_word(bytes: &mut [u8], word_idx: usize, value: u32) {
    let byte_len = bytes.len();
    let span = word_idx
        .checked_mul(4)
        .and_then(|start| start.checked_add(4).map(|end| start..end));
    let target = match span {
        Some(range) => bytes.get_mut(range),
        None => None,
    };
    match target {
        Some(word_bytes) => word_bytes.copy_from_slice(&value.to_le_bytes()),
        None => panic!(
            "write_word: word index {} does not fit in a {}-byte buffer",
            word_idx, byte_len
        ),
    }
}
727
/// Converts a `u32` word count into a byte count (four bytes per word).
///
/// Yields `None` when the count does not fit `usize` or the multiplication overflows.
fn words_to_bytes(words: u32) -> Option<usize> {
    match usize::try_from(words) {
        Ok(word_count) => word_count.checked_mul(4),
        Err(_) => None,
    }
}