1use super::{
2 control, debug, slot, DebugRecord, ProtocolError, CONTROL_MIN_WORDS, MAX_DEBUG_RECORDS,
3 MAX_ENCODED_DEBUG_RECORDS, MAX_ENCODED_OBSERVABLE_SLOTS, MAX_ENCODED_RING_SLOTS,
4 MAX_OBSERVABLE_SLOTS, MAX_RING_SLOTS, SLOT_WORDS, STATUS_WORD,
5};
6
7#[must_use]
9pub fn control_byte_len(observable_slots: u32) -> Option<usize> {
10 if observable_slots > MAX_OBSERVABLE_SLOTS {
11 return None;
12 }
13 let words = control::OBSERVABLE_BASE.checked_add(observable_slots)?;
14 words_to_bytes(words.max(CONTROL_MIN_WORDS))
15}
16
17#[must_use]
19pub fn ring_byte_len(slot_count: u32) -> Option<usize> {
20 if slot_count > MAX_RING_SLOTS {
21 return None;
22 }
23 let words = slot_count.checked_mul(SLOT_WORDS)?;
24 words_to_bytes(words)
25}
26
27#[must_use]
29pub fn debug_log_byte_len(record_capacity: u32) -> Option<usize> {
30 if record_capacity > MAX_DEBUG_RECORDS {
31 return None;
32 }
33 let record_words = record_capacity.checked_mul(debug::RECORD_WORDS)?;
34 let words = debug::RECORDS_BASE.checked_add(record_words)?;
35 words_to_bytes(words)
36}
37
38fn control_encode_capacity(observable_slots: u32) -> Result<usize, ProtocolError> {
39 if observable_slots > MAX_ENCODED_OBSERVABLE_SLOTS {
40 return Err(ProtocolError::ByteLengthOverflow {
41 buffer: "control",
42 fix: "shard observable results or reduce observable_slots to the megakernel allocation cap before encoding control",
43 });
44 }
45 control_byte_len(observable_slots).ok_or(ProtocolError::ByteLengthOverflow {
46 buffer: "control",
47 fix: "shard observable results or reduce observable_slots to the megakernel protocol cap before encoding control",
48 })
49}
50
51fn ring_encode_capacity(slot_count: u32) -> Result<usize, ProtocolError> {
52 if slot_count > MAX_ENCODED_RING_SLOTS {
53 return Err(ProtocolError::ByteLengthOverflow {
54 buffer: "ring",
55 fix: "split the dispatch into smaller ring shards before encoding; slot_count exceeds the megakernel allocation cap or host address space",
56 });
57 }
58 ring_byte_len(slot_count).ok_or(ProtocolError::ByteLengthOverflow {
59 buffer: "ring",
60 fix: "split the dispatch into smaller ring shards before encoding; slot_count exceeds the megakernel protocol cap or host address space",
61 })
62}
63
64fn debug_log_encode_capacity(record_capacity: u32) -> Result<usize, ProtocolError> {
65 if record_capacity > MAX_ENCODED_DEBUG_RECORDS {
66 return Err(ProtocolError::ByteLengthOverflow {
67 buffer: "debug_log",
68 fix:
69 "reduce debug-log record_capacity to the megakernel allocation cap before encoding",
70 });
71 }
72 debug_log_byte_len(record_capacity).ok_or(ProtocolError::ByteLengthOverflow {
73 buffer: "debug_log",
74 fix: "reduce debug-log record_capacity to the megakernel protocol cap before encoding",
75 })
76}
77
78pub fn encode_control(
85 shutdown: bool,
86 tenant_count: u32,
87 observable_slots: u32,
88) -> Result<Vec<u8>, ProtocolError> {
89 try_encode_control(shutdown, tenant_count, observable_slots)
90}
91
92pub fn try_encode_control(
99 shutdown: bool,
100 tenant_count: u32,
101 observable_slots: u32,
102) -> Result<Vec<u8>, ProtocolError> {
103 let total_bytes = control_encode_capacity(observable_slots)?;
104 let mut bytes = Vec::new();
105 try_reserve_protocol_capacity(
106 &mut bytes,
107 total_bytes,
108 "control",
109 "control encode could not reserve host staging bytes; reduce observable_slots or reuse a preallocated control buffer",
110 )?;
111 try_encode_control_into(shutdown, tenant_count, observable_slots, &mut bytes)?;
112 Ok(bytes)
113}
114
115pub fn try_encode_control_into(
125 shutdown: bool,
126 tenant_count: u32,
127 observable_slots: u32,
128 dst: &mut Vec<u8>,
129) -> Result<(), ProtocolError> {
130 let total_bytes = control_encode_capacity(observable_slots)?;
131 dst.clear();
132 try_reserve_protocol_capacity(
133 dst,
134 total_bytes,
135 "control",
136 "control encode could not reserve caller-owned staging bytes; reduce observable_slots or reuse a larger control buffer",
137 )?;
138 dst.resize(total_bytes, 0);
139
140 if shutdown {
141 write_word(
142 dst,
143 control_word_index(control::SHUTDOWN, "shutdown word")?,
144 1,
145 );
146 }
147 write_word(
148 dst,
149 control_word_index(control::TENANT_BASE, "tenant base word")?,
150 control::TENANT_BASE + 1,
151 );
152
153 let tenant_table_start = control_word_index(control::TENANT_BASE, "tenant base word")?
154 .checked_add(1)
155 .ok_or(ProtocolError::ByteLengthOverflow {
156 buffer: "control",
157 fix: "tenant table start overflowed usize; reduce control protocol constants",
158 })?;
159 let requested_tenant_words =
160 usize::try_from(tenant_count).map_err(|_| ProtocolError::ByteLengthOverflow {
161 buffer: "control",
162 fix: "tenant_count cannot fit host usize; split tenant tables before encoding",
163 })?;
164 let tenant_table_end = core::cmp::min(
165 tenant_table_start
166 .checked_add(requested_tenant_words)
167 .ok_or(ProtocolError::ByteLengthOverflow {
168 buffer: "control",
169 fix: "tenant table end overflowed usize; split tenant tables before encoding",
170 })?,
171 control_word_index(control::TENANT_QUOTA_BASE, "tenant quota base word")?,
172 );
173 for word_idx in tenant_table_start..tenant_table_end {
174 write_word(dst, word_idx, !0u32);
175 }
176
177 let quota_table_start =
178 control_word_index(control::TENANT_QUOTA_BASE, "tenant quota base word")?;
179 let quota_table_end = core::cmp::min(
180 quota_table_start
181 .checked_add(requested_tenant_words)
182 .ok_or(ProtocolError::ByteLengthOverflow {
183 buffer: "control",
184 fix: "quota table end overflowed usize; split tenant tables before encoding",
185 })?,
186 control_word_index(control::TENANT_FAIRNESS_BASE, "tenant fairness base word")?,
187 );
188 for word_idx in quota_table_start..quota_table_end {
189 write_word(dst, word_idx, 1_000_000);
190 }
191 Ok(())
192}
193
194pub fn encode_empty_ring(slot_count: u32) -> Result<Vec<u8>, ProtocolError> {
201 try_encode_empty_ring(slot_count)
202}
203
204pub fn try_encode_empty_ring(slot_count: u32) -> Result<Vec<u8>, ProtocolError> {
211 let total_bytes = ring_encode_capacity(slot_count)?;
212 let mut bytes = Vec::new();
213 try_reserve_protocol_capacity(
214 &mut bytes,
215 total_bytes,
216 "ring",
217 "ring encode could not reserve host staging bytes; split the dispatch into smaller ring shards or reuse a preallocated ring buffer",
218 )?;
219 try_encode_empty_ring_into(slot_count, &mut bytes)?;
220 Ok(bytes)
221}
222
223pub fn try_encode_empty_ring_into(slot_count: u32, dst: &mut Vec<u8>) -> Result<(), ProtocolError> {
232 let total_bytes = ring_encode_capacity(slot_count)?;
233 dst.clear();
234 try_reserve_protocol_capacity(
235 dst,
236 total_bytes,
237 "ring",
238 "ring encode could not reserve caller-owned staging bytes; split the dispatch into smaller ring shards or reuse a larger ring buffer",
239 )?;
240 dst.resize(total_bytes, 0);
241 Ok(())
242}
243
244pub fn encode_empty_debug_log(record_capacity: u32) -> Result<Vec<u8>, ProtocolError> {
251 try_encode_empty_debug_log(record_capacity)
252}
253
254pub fn try_encode_empty_debug_log(record_capacity: u32) -> Result<Vec<u8>, ProtocolError> {
261 let total_bytes = debug_log_encode_capacity(record_capacity)?;
262 let mut bytes = Vec::new();
263 try_reserve_protocol_capacity(
264 &mut bytes,
265 total_bytes,
266 "debug_log",
267 "debug-log encode could not reserve host staging bytes; reduce record_capacity or reuse a preallocated debug-log buffer",
268 )?;
269 try_encode_empty_debug_log_into(record_capacity, &mut bytes)?;
270 Ok(bytes)
271}
272
273pub fn try_encode_empty_debug_log_into(
282 record_capacity: u32,
283 dst: &mut Vec<u8>,
284) -> Result<(), ProtocolError> {
285 let total_bytes = debug_log_encode_capacity(record_capacity)?;
286 dst.clear();
287 try_reserve_protocol_capacity(
288 dst,
289 total_bytes,
290 "debug_log",
291 "debug-log encode could not reserve caller-owned staging bytes; reduce record_capacity or reuse a larger debug-log buffer",
292 )?;
293 dst.resize(total_bytes, 0);
294 Ok(())
295}
296
297#[must_use]
299pub fn read_done_count(control_bytes: &[u8]) -> u32 {
300 try_read_done_count(control_bytes).unwrap_or(0)
301}
302
303#[must_use]
305pub fn read_epoch(control_bytes: &[u8]) -> u32 {
306 try_read_epoch(control_bytes).unwrap_or(0)
307}
308
309pub fn try_read_done_count(control_bytes: &[u8]) -> Result<u32, ProtocolError> {
316 read_required_word(
317 "control",
318 control_bytes,
319 control_word_index(control::DONE_COUNT, "done-count word")?,
320 )
321}
322
323pub fn try_read_epoch(control_bytes: &[u8]) -> Result<u32, ProtocolError> {
330 read_required_word(
331 "control",
332 control_bytes,
333 control_word_index(control::EPOCH, "epoch word")?,
334 )
335}
336
337#[must_use]
339pub fn read_observable(control_bytes: &[u8], index: u32) -> u32 {
340 try_read_observable(control_bytes, index).unwrap_or(0)
341}
342
343pub fn try_read_observable(control_bytes: &[u8], index: u32) -> Result<u32, ProtocolError> {
350 let word_idx = control_word_index(
351 control::OBSERVABLE_BASE
352 .checked_add(index)
353 .ok_or(ProtocolError::ByteLengthOverflow {
354 buffer: "control",
355 fix: "observable index overflows the protocol word offset; shard observable reads",
356 })?,
357 "observable word index",
358 )?;
359 read_required_word("control", control_bytes, word_idx)
360}
361
362#[must_use]
364pub fn read_metrics(control_bytes: &[u8]) -> Vec<(u32, u32)> {
365 let mut result = Vec::new();
366 read_metrics_into(control_bytes, &mut result);
367 result
368}
369
370pub fn read_metrics_into(control_bytes: &[u8], out: &mut Vec<(u32, u32)>) {
374 out.clear();
375 let Ok(metrics_base) = control_word_index(control::METRICS_BASE, "metrics base word") else {
376 return;
377 };
378 let available_words = control_bytes.len() / 4;
379 if available_words <= metrics_base {
380 return;
381 }
382 let available_slots = (available_words - metrics_base).min(control::METRICS_SLOTS as usize);
383 let nonzero = count_nonzero_metrics_truncated(control_bytes, metrics_base, available_slots);
384 if try_reserve_target_capacity(out, nonzero).is_err() {
385 return;
386 }
387 for slot in 0..available_slots {
388 let word_idx = metrics_base + slot;
389 let Some(count) = read_word_unaligned(control_bytes, word_idx) else {
390 break;
391 };
392 if count > 0 {
393 out.push((slot as u32, count));
394 }
395 }
396}
397
398pub fn try_read_metrics(control_bytes: &[u8]) -> Result<Vec<(u32, u32)>, ProtocolError> {
405 let mut result = Vec::new();
406 try_read_metrics_into(control_bytes, &mut result)?;
407 Ok(result)
408}
409
410pub fn try_read_metrics_into(
419 control_bytes: &[u8],
420 out: &mut Vec<(u32, u32)>,
421) -> Result<(), ProtocolError> {
422 validate_word_aligned("control", control_bytes)?;
423 out.clear();
424 if let Ok(words) = bytemuck::try_cast_slice::<u8, u32>(control_bytes) {
425 try_reserve_target_capacity(
426 out,
427 count_nonzero_metrics_words_strict(words, control_bytes.len())?,
428 )?;
429 for i in 0..control::METRICS_SLOTS {
430 let word_idx = metrics_word_index(i)?;
431 let count =
432 words
433 .get(word_idx)
434 .copied()
435 .map(u32::from_le)
436 .ok_or(ProtocolError::MissingWord {
437 buffer: "control",
438 word_idx,
439 byte_len: control_bytes.len(),
440 fix: "decode only control buffers produced by the matching megakernel protocol encoder",
441 })?;
442 if count > 0 {
443 out.push((i, count));
444 }
445 }
446 return Ok(());
447 }
448 try_reserve_target_capacity(out, count_nonzero_metrics_unaligned_strict(control_bytes)?)?;
449 for i in 0..control::METRICS_SLOTS {
450 let word_idx = metrics_word_index(i)?;
451 let count = read_word_unaligned(control_bytes, word_idx)
452 .ok_or(ProtocolError::MissingWord {
453 buffer: "control",
454 word_idx,
455 byte_len: control_bytes.len(),
456 fix: "decode only control buffers produced by the matching megakernel protocol encoder",
457 })?;
458 if count > 0 {
459 out.push((i, count));
460 }
461 }
462 Ok(())
463}
464
465mod debug_log;
466
467pub use debug_log::{
468 read_debug_log, read_debug_log_into, try_read_debug_log, try_read_debug_log_into,
469};
470
471#[must_use]
477pub fn count_done_ring_slots(ring_bytes: &[u8], item_count: usize) -> Option<u64> {
478 if item_count == 0 {
479 return None;
480 }
481 let slot_words = usize::try_from(SLOT_WORDS).ok()?;
482 let required_bytes = item_count.checked_mul(slot_words)?.checked_mul(4)?;
483 if ring_bytes.len() < required_bytes {
484 return None;
485 }
486 let status_word = usize::try_from(STATUS_WORD).ok()?;
487 let words = bytemuck::try_cast_slice::<u8, u32>(ring_bytes).ok();
488 let done = (0..item_count)
489 .filter(|slot_idx| {
490 let word_idx = slot_idx
491 .checked_mul(slot_words)
492 .and_then(|base| base.checked_add(status_word));
493 word_idx.and_then(|idx| read_word_from_optional_words(words, ring_bytes, idx))
494 == Some(slot::DONE)
495 })
496 .count();
497 u64::try_from(done).ok()
498}
499
500pub fn try_count_done_ring_slots(
508 ring_bytes: &[u8],
509 item_count: usize,
510) -> Result<u64, ProtocolError> {
511 if item_count == 0 {
512 return Ok(0);
513 }
514 validate_word_aligned("ring", ring_bytes)?;
515 let slot_words =
516 usize::try_from(SLOT_WORDS).map_err(|_| ProtocolError::ByteLengthOverflow {
517 buffer: "ring",
518 fix: "keep SLOT_WORDS representable in host usize before decoding ring status",
519 })?;
520 let required_bytes = item_count
521 .checked_mul(slot_words)
522 .and_then(|words| words.checked_mul(4))
523 .ok_or(ProtocolError::ByteLengthOverflow {
524 buffer: "ring",
525 fix: "split the dispatch before ring status decode overflows host address space",
526 })?;
527 if ring_bytes.len() < required_bytes {
528 return Err(ProtocolError::MissingWord {
529 buffer: "ring",
530 word_idx: required_bytes / 4,
531 byte_len: ring_bytes.len(),
532 fix: "decode only full ring readbacks sized for the submitted megakernel item_count",
533 });
534 }
535 let status_word =
536 usize::try_from(STATUS_WORD).map_err(|_| ProtocolError::ByteLengthOverflow {
537 buffer: "ring",
538 fix: "keep STATUS_WORD representable in host usize before decoding ring status",
539 })?;
540 let words = bytemuck::try_cast_slice::<u8, u32>(ring_bytes).ok();
541 let mut done = 0_u64;
542 for slot_idx in 0..item_count {
543 let word_idx = slot_idx
544 .checked_mul(slot_words)
545 .and_then(|base| base.checked_add(status_word))
546 .ok_or(ProtocolError::ByteLengthOverflow {
547 buffer: "ring",
548 fix: "split the dispatch before ring status word indexing overflows host address space",
549 })?;
550 if read_word_from_optional_words(words, ring_bytes, word_idx) == Some(slot::DONE) {
551 done = done
552 .checked_add(1)
553 .ok_or(ProtocolError::ByteLengthOverflow {
554 buffer: "ring",
555 fix: "split the dispatch before DONE slot count exceeds u64",
556 })?;
557 }
558 }
559 Ok(done)
560}
561
562fn try_reserve_target_capacity<T>(
563 out: &mut Vec<T>,
564 target_capacity: usize,
565) -> Result<(), ProtocolError> {
566 try_reserve_protocol_capacity(
567 out,
568 target_capacity,
569 "control",
570 "host metrics decode could not reserve output records; reduce metrics fanout or decode into a reused scratch vector",
571 )
572}
573
574pub(super) fn try_reserve_protocol_capacity<T>(
575 out: &mut Vec<T>,
576 target_capacity: usize,
577 buffer: &'static str,
578 fix: &'static str,
579) -> Result<(), ProtocolError> {
580 vyre_foundation::allocation::try_reserve_vec_to_capacity(out, target_capacity)
581 .map_err(|_| ProtocolError::ByteLengthOverflow { buffer, fix })
582}
583
584fn count_nonzero_metrics_words_strict(
585 words: &[u32],
586 byte_len: usize,
587) -> Result<usize, ProtocolError> {
588 let mut count = 0usize;
589 for i in 0..control::METRICS_SLOTS {
590 let word_idx = metrics_word_index(i)?;
591 let word = words
592 .get(word_idx)
593 .copied()
594 .map(u32::from_le)
595 .ok_or(ProtocolError::MissingWord {
596 buffer: "control",
597 word_idx,
598 byte_len,
599 fix: "decode only control buffers produced by the matching megakernel protocol encoder",
600 })?;
601 if word > 0 {
602 count += 1;
603 }
604 }
605 Ok(count)
606}
607
608fn count_nonzero_metrics_unaligned_strict(control_bytes: &[u8]) -> Result<usize, ProtocolError> {
609 let mut count = 0usize;
610 for i in 0..control::METRICS_SLOTS {
611 let word_idx = metrics_word_index(i)?;
612 let word = read_word_unaligned(control_bytes, word_idx)
613 .ok_or(ProtocolError::MissingWord {
614 buffer: "control",
615 word_idx,
616 byte_len: control_bytes.len(),
617 fix: "decode only control buffers produced by the matching megakernel protocol encoder",
618 })?;
619 if word > 0 {
620 count += 1;
621 }
622 }
623 Ok(count)
624}
625
626fn count_nonzero_metrics_truncated(
627 control_bytes: &[u8],
628 metrics_base: usize,
629 available_slots: usize,
630) -> usize {
631 let mut count = 0usize;
632 for slot in 0..available_slots {
633 if read_word_unaligned(control_bytes, metrics_base + slot).unwrap_or(0) > 0 {
634 count += 1;
635 }
636 }
637 count
638}
639
640fn metrics_word_index(slot: u32) -> Result<usize, ProtocolError> {
641 let word =
642 control::METRICS_BASE
643 .checked_add(slot)
644 .ok_or(ProtocolError::ByteLengthOverflow {
645 buffer: "control",
646 fix: "metrics slot index overflows the protocol word offset; shard metrics reads",
647 })?;
648 control_word_index(word, "metrics word index")
649}
650
651fn control_word_index(word: u32, label: &'static str) -> Result<usize, ProtocolError> {
652 usize::try_from(word).map_err(|_| ProtocolError::ByteLengthOverflow {
653 buffer: "control",
654 fix: match label {
655 "observable word index" => {
656 "observable word index cannot fit host usize; shard observable reads"
657 }
658 "metrics word index" => "metrics word index cannot fit host usize; shard metrics reads",
659 _ => "control word index cannot fit host usize; shard protocol reads",
660 },
661 })
662}
663
664pub(crate) fn read_word(bytes: &[u8], word_idx: usize) -> Option<u32> {
665 if let Ok(words) = bytemuck::try_cast_slice::<u8, u32>(bytes) {
666 return words.get(word_idx).copied().map(u32::from_le);
667 }
668 read_word_unaligned(bytes, word_idx)
669}
670
671fn read_word_from_optional_words(
672 words: Option<&[u32]>,
673 bytes: &[u8],
674 word_idx: usize,
675) -> Option<u32> {
676 if let Some(words) = words {
677 return words.get(word_idx).copied().map(u32::from_le);
678 }
679 read_word_unaligned(bytes, word_idx)
680}
681
682fn read_word_unaligned(bytes: &[u8], word_idx: usize) -> Option<u32> {
683 let off = word_idx.checked_mul(4)?;
684 let end = off.checked_add(4)?;
685 let word = bytes.get(off..end)?;
686 Some(u32::from_le_bytes(word.try_into().ok()?))
687}
688
689fn read_required_word(
690 buffer: &'static str,
691 bytes: &[u8],
692 word_idx: usize,
693) -> Result<u32, ProtocolError> {
694 validate_word_aligned(buffer, bytes)?;
695 read_word(bytes, word_idx).ok_or(ProtocolError::MissingWord {
696 buffer,
697 word_idx,
698 byte_len: bytes.len(),
699 fix: "decode only buffers produced by the matching megakernel protocol encoder",
700 })
701}
702
703fn validate_word_aligned(buffer: &'static str, bytes: &[u8]) -> Result<(), ProtocolError> {
704 if bytes.len() % 4 == 0 {
705 Ok(())
706 } else {
707 Err(ProtocolError::MisalignedByteLength {
708 buffer,
709 byte_len: bytes.len(),
710 fix: "pass whole u32 protocol words; do not decode partial DMA/readback buffers",
711 })
712 }
713}
714
715pub(crate) fn write_word(bytes: &mut [u8], word_idx: usize, value: u32) {
716 let off = word_idx * 4;
717 bytes[off..off + 4].copy_from_slice(&value.to_le_bytes());
718}
719
720pub(super) fn words_to_bytes(words: u32) -> Option<usize> {
721 usize::try_from(words).ok()?.checked_mul(4)
722}