1use super::{
2 control, debug, slot, DebugRecord, ProtocolError, CONTROL_MIN_WORDS, MAX_DEBUG_RECORDS,
3 MAX_ENCODED_DEBUG_RECORDS, MAX_ENCODED_OBSERVABLE_SLOTS, MAX_ENCODED_RING_SLOTS,
4 MAX_OBSERVABLE_SLOTS, MAX_RING_SLOTS, SLOT_WORDS, STATUS_WORD,
5};
6
7#[must_use]
9pub fn control_byte_len(observable_slots: u32) -> Option<usize> {
10 if observable_slots > MAX_OBSERVABLE_SLOTS {
11 return None;
12 }
13 let words = control::OBSERVABLE_BASE.checked_add(observable_slots)?;
14 words_to_bytes(words.max(CONTROL_MIN_WORDS))
15}
16
17#[must_use]
19pub fn ring_byte_len(slot_count: u32) -> Option<usize> {
20 if slot_count > MAX_RING_SLOTS {
21 return None;
22 }
23 let words = slot_count.checked_mul(SLOT_WORDS)?;
24 words_to_bytes(words)
25}
26
27#[must_use]
29pub fn debug_log_byte_len(record_capacity: u32) -> Option<usize> {
30 if record_capacity > MAX_DEBUG_RECORDS {
31 return None;
32 }
33 let record_words = record_capacity.checked_mul(debug::RECORD_WORDS)?;
34 let words = debug::RECORDS_BASE.checked_add(record_words)?;
35 words_to_bytes(words)
36}
37
38fn control_encode_capacity(observable_slots: u32) -> Result<usize, ProtocolError> {
39 if observable_slots > MAX_ENCODED_OBSERVABLE_SLOTS {
40 return Err(ProtocolError::ByteLengthOverflow {
41 buffer: "control",
42 fix: "shard observable results or reduce observable_slots to the megakernel allocation cap before encoding control",
43 });
44 }
45 control_byte_len(observable_slots).ok_or(ProtocolError::ByteLengthOverflow {
46 buffer: "control",
47 fix: "shard observable results or reduce observable_slots to the megakernel protocol cap before encoding control",
48 })
49}
50
51fn ring_encode_capacity(slot_count: u32) -> Result<usize, ProtocolError> {
52 if slot_count > MAX_ENCODED_RING_SLOTS {
53 return Err(ProtocolError::ByteLengthOverflow {
54 buffer: "ring",
55 fix: "split the dispatch into smaller ring shards before encoding; slot_count exceeds the megakernel allocation cap or host address space",
56 });
57 }
58 ring_byte_len(slot_count).ok_or(ProtocolError::ByteLengthOverflow {
59 buffer: "ring",
60 fix: "split the dispatch into smaller ring shards before encoding; slot_count exceeds the megakernel protocol cap or host address space",
61 })
62}
63
64fn debug_log_encode_capacity(record_capacity: u32) -> Result<usize, ProtocolError> {
65 if record_capacity > MAX_ENCODED_DEBUG_RECORDS {
66 return Err(ProtocolError::ByteLengthOverflow {
67 buffer: "debug_log",
68 fix:
69 "reduce debug-log record_capacity to the megakernel allocation cap before encoding",
70 });
71 }
72 debug_log_byte_len(record_capacity).ok_or(ProtocolError::ByteLengthOverflow {
73 buffer: "debug_log",
74 fix: "reduce debug-log record_capacity to the megakernel protocol cap before encoding",
75 })
76}
77
78pub fn encode_control(
85 shutdown: bool,
86 tenant_count: u32,
87 observable_slots: u32,
88) -> Result<Vec<u8>, ProtocolError> {
89 try_encode_control(shutdown, tenant_count, observable_slots)
90}
91
92pub fn try_encode_control(
99 shutdown: bool,
100 tenant_count: u32,
101 observable_slots: u32,
102) -> Result<Vec<u8>, ProtocolError> {
103 let total_bytes = control_encode_capacity(observable_slots)?;
104 let mut bytes = Vec::new();
105 try_reserve_protocol_capacity(
106 &mut bytes,
107 total_bytes,
108 "control",
109 "control encode could not reserve host staging bytes; reduce observable_slots or reuse a preallocated control buffer",
110 )?;
111 try_encode_control_into(shutdown, tenant_count, observable_slots, &mut bytes)?;
112 Ok(bytes)
113}
114
115pub fn try_encode_control_into(
125 shutdown: bool,
126 tenant_count: u32,
127 observable_slots: u32,
128 dst: &mut Vec<u8>,
129) -> Result<(), ProtocolError> {
130 let total_bytes = control_encode_capacity(observable_slots)?;
131 dst.clear();
132 try_reserve_protocol_capacity(
133 dst,
134 total_bytes,
135 "control",
136 "control encode could not reserve caller-owned staging bytes; reduce observable_slots or reuse a larger control buffer",
137 )?;
138 dst.resize(total_bytes, 0);
139
140 if shutdown {
141 write_word(
142 dst,
143 control_word_index(control::SHUTDOWN, "shutdown word")?,
144 1,
145 );
146 }
147 write_word(
148 dst,
149 control_word_index(control::TENANT_BASE, "tenant base word")?,
150 control::TENANT_BASE + 1,
151 );
152
153 let tenant_table_start = control_word_index(control::TENANT_BASE, "tenant base word")?
154 .checked_add(1)
155 .ok_or(ProtocolError::ByteLengthOverflow {
156 buffer: "control",
157 fix: "tenant table start overflowed usize; reduce control protocol constants",
158 })?;
159 let requested_tenant_words =
160 usize::try_from(tenant_count).map_err(|_| ProtocolError::ByteLengthOverflow {
161 buffer: "control",
162 fix: "tenant_count cannot fit host usize; split tenant tables before encoding",
163 })?;
164 let tenant_table_end = core::cmp::min(
165 tenant_table_start
166 .checked_add(requested_tenant_words)
167 .ok_or(ProtocolError::ByteLengthOverflow {
168 buffer: "control",
169 fix: "tenant table end overflowed usize; split tenant tables before encoding",
170 })?,
171 control_word_index(control::TENANT_QUOTA_BASE, "tenant quota base word")?,
172 );
173 for word_idx in tenant_table_start..tenant_table_end {
174 write_word(dst, word_idx, !0u32);
175 }
176
177 let quota_table_start =
178 control_word_index(control::TENANT_QUOTA_BASE, "tenant quota base word")?;
179 let quota_table_end = core::cmp::min(
180 quota_table_start
181 .checked_add(requested_tenant_words)
182 .ok_or(ProtocolError::ByteLengthOverflow {
183 buffer: "control",
184 fix: "quota table end overflowed usize; split tenant tables before encoding",
185 })?,
186 control_word_index(control::TENANT_FAIRNESS_BASE, "tenant fairness base word")?,
187 );
188 for word_idx in quota_table_start..quota_table_end {
189 write_word(dst, word_idx, 1_000_000);
190 }
191 Ok(())
192}
193
194pub fn encode_empty_ring(slot_count: u32) -> Result<Vec<u8>, ProtocolError> {
201 try_encode_empty_ring(slot_count)
202}
203
204pub fn try_encode_empty_ring(slot_count: u32) -> Result<Vec<u8>, ProtocolError> {
211 let total_bytes = ring_encode_capacity(slot_count)?;
212 let mut bytes = Vec::new();
213 try_reserve_protocol_capacity(
214 &mut bytes,
215 total_bytes,
216 "ring",
217 "ring encode could not reserve host staging bytes; split the dispatch into smaller ring shards or reuse a preallocated ring buffer",
218 )?;
219 try_encode_empty_ring_into(slot_count, &mut bytes)?;
220 Ok(bytes)
221}
222
223pub fn try_encode_empty_ring_into(slot_count: u32, dst: &mut Vec<u8>) -> Result<(), ProtocolError> {
232 let total_bytes = ring_encode_capacity(slot_count)?;
233 dst.clear();
234 try_reserve_protocol_capacity(
235 dst,
236 total_bytes,
237 "ring",
238 "ring encode could not reserve caller-owned staging bytes; split the dispatch into smaller ring shards or reuse a larger ring buffer",
239 )?;
240 dst.resize(total_bytes, 0);
241 Ok(())
242}
243
244pub fn encode_empty_debug_log(record_capacity: u32) -> Result<Vec<u8>, ProtocolError> {
251 try_encode_empty_debug_log(record_capacity)
252}
253
254pub fn try_encode_empty_debug_log(record_capacity: u32) -> Result<Vec<u8>, ProtocolError> {
261 let total_bytes = debug_log_encode_capacity(record_capacity)?;
262 let mut bytes = Vec::new();
263 try_reserve_protocol_capacity(
264 &mut bytes,
265 total_bytes,
266 "debug_log",
267 "debug-log encode could not reserve host staging bytes; reduce record_capacity or reuse a preallocated debug-log buffer",
268 )?;
269 try_encode_empty_debug_log_into(record_capacity, &mut bytes)?;
270 Ok(bytes)
271}
272
273pub fn try_encode_empty_debug_log_into(
282 record_capacity: u32,
283 dst: &mut Vec<u8>,
284) -> Result<(), ProtocolError> {
285 let total_bytes = debug_log_encode_capacity(record_capacity)?;
286 dst.clear();
287 try_reserve_protocol_capacity(
288 dst,
289 total_bytes,
290 "debug_log",
291 "debug-log encode could not reserve caller-owned staging bytes; reduce record_capacity or reuse a larger debug-log buffer",
292 )?;
293 dst.resize(total_bytes, 0);
294 Ok(())
295}
296
297#[must_use]
299pub fn read_done_count(control_bytes: &[u8]) -> u32 {
300 try_read_done_count(control_bytes).unwrap_or_else(|source| {
301 panic!(
302 "megakernel control done_count decode failed: {source}. Fix: use a complete control readback produced by the matching megakernel protocol encoder."
303 )
304 })
305}
306
307#[must_use]
309pub fn read_epoch(control_bytes: &[u8]) -> u32 {
310 try_read_epoch(control_bytes).unwrap_or_else(|source| {
311 panic!(
312 "megakernel control epoch decode failed: {source}. Fix: use a complete control readback produced by the matching megakernel protocol encoder."
313 )
314 })
315}
316
317pub fn try_read_done_count(control_bytes: &[u8]) -> Result<u32, ProtocolError> {
324 read_required_word(
325 "control",
326 control_bytes,
327 control_word_index(control::DONE_COUNT, "done-count word")?,
328 )
329}
330
331pub fn try_read_epoch(control_bytes: &[u8]) -> Result<u32, ProtocolError> {
338 read_required_word(
339 "control",
340 control_bytes,
341 control_word_index(control::EPOCH, "epoch word")?,
342 )
343}
344
345#[must_use]
347pub fn read_observable(control_bytes: &[u8], index: u32) -> u32 {
348 try_read_observable(control_bytes, index).unwrap_or(0)
349}
350
351pub fn try_read_observable(control_bytes: &[u8], index: u32) -> Result<u32, ProtocolError> {
358 let word_idx = control_word_index(
359 control::OBSERVABLE_BASE
360 .checked_add(index)
361 .ok_or(ProtocolError::ByteLengthOverflow {
362 buffer: "control",
363 fix: "observable index overflows the protocol word offset; shard observable reads",
364 })?,
365 "observable word index",
366 )?;
367 read_required_word("control", control_bytes, word_idx)
368}
369
370#[must_use]
372pub fn read_metrics(control_bytes: &[u8]) -> Vec<(u32, u32)> {
373 let mut result = Vec::new();
374 read_metrics_into(control_bytes, &mut result);
375 result
376}
377
378pub fn read_metrics_into(control_bytes: &[u8], out: &mut Vec<(u32, u32)>) {
382 out.clear();
383 let Ok(metrics_base) = control_word_index(control::METRICS_BASE, "metrics base word") else {
384 return;
385 };
386 let available_words = control_bytes.len() / 4;
387 if available_words <= metrics_base {
388 return;
389 }
390 let available_slots = (available_words - metrics_base).min(control::METRICS_SLOTS as usize);
391 let nonzero = count_nonzero_metrics_truncated(control_bytes, metrics_base, available_slots);
392 if try_reserve_target_capacity(out, nonzero).is_err() {
393 return;
394 }
395 for slot in 0..available_slots {
396 let word_idx = metrics_base + slot;
397 let Some(count) = read_word_unaligned(control_bytes, word_idx) else {
398 break;
399 };
400 if count > 0 {
401 out.push((slot as u32, count));
402 }
403 }
404}
405
406pub fn try_read_metrics(control_bytes: &[u8]) -> Result<Vec<(u32, u32)>, ProtocolError> {
413 let mut result = Vec::new();
414 try_read_metrics_into(control_bytes, &mut result)?;
415 Ok(result)
416}
417
418pub fn try_read_metrics_into(
427 control_bytes: &[u8],
428 out: &mut Vec<(u32, u32)>,
429) -> Result<(), ProtocolError> {
430 validate_word_aligned("control", control_bytes)?;
431 out.clear();
432 if let Ok(words) = bytemuck::try_cast_slice::<u8, u32>(control_bytes) {
433 try_reserve_target_capacity(
434 out,
435 count_nonzero_metrics_words_strict(words, control_bytes.len())?,
436 )?;
437 for i in 0..control::METRICS_SLOTS {
438 let word_idx = metrics_word_index(i)?;
439 let count =
440 words
441 .get(word_idx)
442 .copied()
443 .map(u32::from_le)
444 .ok_or(ProtocolError::MissingWord {
445 buffer: "control",
446 word_idx,
447 byte_len: control_bytes.len(),
448 fix: "decode only control buffers produced by the matching megakernel protocol encoder",
449 })?;
450 if count > 0 {
451 out.push((i, count));
452 }
453 }
454 return Ok(());
455 }
456 try_reserve_target_capacity(out, count_nonzero_metrics_unaligned_strict(control_bytes)?)?;
457 for i in 0..control::METRICS_SLOTS {
458 let word_idx = metrics_word_index(i)?;
459 let count = read_word_unaligned(control_bytes, word_idx)
460 .ok_or(ProtocolError::MissingWord {
461 buffer: "control",
462 word_idx,
463 byte_len: control_bytes.len(),
464 fix: "decode only control buffers produced by the matching megakernel protocol encoder",
465 })?;
466 if count > 0 {
467 out.push((i, count));
468 }
469 }
470 Ok(())
471}
472
473
474mod debug_log;
475
476pub use debug_log::{
477 read_debug_log, read_debug_log_into, try_read_debug_log, try_read_debug_log_into,
478};
479
480#[must_use]
486pub fn count_done_ring_slots(ring_bytes: &[u8], item_count: usize) -> Option<u64> {
487 if item_count == 0 {
488 return None;
489 }
490 let slot_words = usize::try_from(SLOT_WORDS).ok()?;
491 let required_bytes = item_count.checked_mul(slot_words)?.checked_mul(4)?;
492 if ring_bytes.len() < required_bytes {
493 return None;
494 }
495 let status_word = usize::try_from(STATUS_WORD).ok()?;
496 let words = bytemuck::try_cast_slice::<u8, u32>(ring_bytes).ok();
497 let done = (0..item_count)
498 .filter(|slot_idx| {
499 let word_idx = slot_idx
500 .checked_mul(slot_words)
501 .and_then(|base| base.checked_add(status_word));
502 word_idx.and_then(|idx| read_word_from_optional_words(words, ring_bytes, idx))
503 == Some(slot::DONE)
504 })
505 .count();
506 u64::try_from(done).ok()
507}
508
509pub fn try_count_done_ring_slots(
517 ring_bytes: &[u8],
518 item_count: usize,
519) -> Result<u64, ProtocolError> {
520 if item_count == 0 {
521 return Ok(0);
522 }
523 validate_word_aligned("ring", ring_bytes)?;
524 let slot_words =
525 usize::try_from(SLOT_WORDS).map_err(|_| ProtocolError::ByteLengthOverflow {
526 buffer: "ring",
527 fix: "keep SLOT_WORDS representable in host usize before decoding ring status",
528 })?;
529 let required_bytes = item_count
530 .checked_mul(slot_words)
531 .and_then(|words| words.checked_mul(4))
532 .ok_or(ProtocolError::ByteLengthOverflow {
533 buffer: "ring",
534 fix: "split the dispatch before ring status decode overflows host address space",
535 })?;
536 if ring_bytes.len() < required_bytes {
537 return Err(ProtocolError::MissingWord {
538 buffer: "ring",
539 word_idx: required_bytes / 4,
540 byte_len: ring_bytes.len(),
541 fix: "decode only full ring readbacks sized for the submitted megakernel item_count",
542 });
543 }
544 let status_word =
545 usize::try_from(STATUS_WORD).map_err(|_| ProtocolError::ByteLengthOverflow {
546 buffer: "ring",
547 fix: "keep STATUS_WORD representable in host usize before decoding ring status",
548 })?;
549 let words = bytemuck::try_cast_slice::<u8, u32>(ring_bytes).ok();
550 let mut done = 0_u64;
551 for slot_idx in 0..item_count {
552 let word_idx = slot_idx
553 .checked_mul(slot_words)
554 .and_then(|base| base.checked_add(status_word))
555 .ok_or(ProtocolError::ByteLengthOverflow {
556 buffer: "ring",
557 fix: "split the dispatch before ring status word indexing overflows host address space",
558 })?;
559 if read_word_from_optional_words(words, ring_bytes, word_idx) == Some(slot::DONE) {
560 done = done
561 .checked_add(1)
562 .ok_or(ProtocolError::ByteLengthOverflow {
563 buffer: "ring",
564 fix: "split the dispatch before DONE slot count exceeds u64",
565 })?;
566 }
567 }
568 Ok(done)
569}
570
571fn try_reserve_target_capacity<T>(
572 out: &mut Vec<T>,
573 target_capacity: usize,
574) -> Result<(), ProtocolError> {
575 try_reserve_protocol_capacity(
576 out,
577 target_capacity,
578 "control",
579 "host metrics decode could not reserve output records; reduce metrics fanout or decode into a reused scratch vector",
580 )
581}
582
583fn try_reserve_protocol_capacity<T>(
584 out: &mut Vec<T>,
585 target_capacity: usize,
586 buffer: &'static str,
587 fix: &'static str,
588) -> Result<(), ProtocolError> {
589 vyre_foundation::allocation::try_reserve_vec_to_capacity(out, target_capacity)
590 .map_err(|_| ProtocolError::ByteLengthOverflow { buffer, fix })
591}
592
593fn count_nonzero_metrics_words_strict(
594 words: &[u32],
595 byte_len: usize,
596) -> Result<usize, ProtocolError> {
597 let mut count = 0usize;
598 for i in 0..control::METRICS_SLOTS {
599 let word_idx = metrics_word_index(i)?;
600 let word = words
601 .get(word_idx)
602 .copied()
603 .map(u32::from_le)
604 .ok_or(ProtocolError::MissingWord {
605 buffer: "control",
606 word_idx,
607 byte_len,
608 fix: "decode only control buffers produced by the matching megakernel protocol encoder",
609 })?;
610 if word > 0 {
611 count += 1;
612 }
613 }
614 Ok(count)
615}
616
617fn count_nonzero_metrics_unaligned_strict(control_bytes: &[u8]) -> Result<usize, ProtocolError> {
618 let mut count = 0usize;
619 for i in 0..control::METRICS_SLOTS {
620 let word_idx = metrics_word_index(i)?;
621 let word = read_word_unaligned(control_bytes, word_idx)
622 .ok_or(ProtocolError::MissingWord {
623 buffer: "control",
624 word_idx,
625 byte_len: control_bytes.len(),
626 fix: "decode only control buffers produced by the matching megakernel protocol encoder",
627 })?;
628 if word > 0 {
629 count += 1;
630 }
631 }
632 Ok(count)
633}
634
635fn count_nonzero_metrics_truncated(
636 control_bytes: &[u8],
637 metrics_base: usize,
638 available_slots: usize,
639) -> usize {
640 let mut count = 0usize;
641 for slot in 0..available_slots {
642 if read_word_unaligned(control_bytes, metrics_base + slot).unwrap_or(0) > 0 {
643 count += 1;
644 }
645 }
646 count
647}
648
649fn metrics_word_index(slot: u32) -> Result<usize, ProtocolError> {
650 let word =
651 control::METRICS_BASE
652 .checked_add(slot)
653 .ok_or(ProtocolError::ByteLengthOverflow {
654 buffer: "control",
655 fix: "metrics slot index overflows the protocol word offset; shard metrics reads",
656 })?;
657 control_word_index(word, "metrics word index")
658}
659
660fn control_word_index(word: u32, label: &'static str) -> Result<usize, ProtocolError> {
661 usize::try_from(word).map_err(|_| ProtocolError::ByteLengthOverflow {
662 buffer: "control",
663 fix: match label {
664 "observable word index" => {
665 "observable word index cannot fit host usize; shard observable reads"
666 }
667 "metrics word index" => "metrics word index cannot fit host usize; shard metrics reads",
668 _ => "control word index cannot fit host usize; shard protocol reads",
669 },
670 })
671}
672
673pub(crate) fn read_word(bytes: &[u8], word_idx: usize) -> Option<u32> {
674 if let Ok(words) = bytemuck::try_cast_slice::<u8, u32>(bytes) {
675 return words.get(word_idx).copied().map(u32::from_le);
676 }
677 read_word_unaligned(bytes, word_idx)
678}
679
680fn read_word_from_optional_words(
681 words: Option<&[u32]>,
682 bytes: &[u8],
683 word_idx: usize,
684) -> Option<u32> {
685 if let Some(words) = words {
686 return words.get(word_idx).copied().map(u32::from_le);
687 }
688 read_word_unaligned(bytes, word_idx)
689}
690
691fn read_word_unaligned(bytes: &[u8], word_idx: usize) -> Option<u32> {
692 let off = word_idx.checked_mul(4)?;
693 let end = off.checked_add(4)?;
694 let word = bytes.get(off..end)?;
695 Some(u32::from_le_bytes(word.try_into().ok()?))
696}
697
698fn read_required_word(
699 buffer: &'static str,
700 bytes: &[u8],
701 word_idx: usize,
702) -> Result<u32, ProtocolError> {
703 validate_word_aligned(buffer, bytes)?;
704 read_word(bytes, word_idx).ok_or(ProtocolError::MissingWord {
705 buffer,
706 word_idx,
707 byte_len: bytes.len(),
708 fix: "decode only buffers produced by the matching megakernel protocol encoder",
709 })
710}
711
712fn validate_word_aligned(buffer: &'static str, bytes: &[u8]) -> Result<(), ProtocolError> {
713 if bytes.len() % 4 == 0 {
714 Ok(())
715 } else {
716 Err(ProtocolError::MisalignedByteLength {
717 buffer,
718 byte_len: bytes.len(),
719 fix: "pass whole u32 protocol words; do not decode partial DMA/readback buffers",
720 })
721 }
722}
723
724pub(crate) fn write_word(bytes: &mut [u8], word_idx: usize, value: u32) {
725 let off = word_idx * 4;
726 bytes[off..off + 4].copy_from_slice(&value.to_le_bytes());
727}
728
729fn words_to_bytes(words: u32) -> Option<usize> {
730 usize::try_from(words).ok()?.checked_mul(4)
731}
732