Skip to main content

vyre_runtime/megakernel/protocol/
codec.rs

1use super::{
2    control, debug, slot, DebugRecord, ProtocolError, CONTROL_MIN_WORDS, MAX_DEBUG_RECORDS,
3    MAX_ENCODED_DEBUG_RECORDS, MAX_ENCODED_OBSERVABLE_SLOTS, MAX_ENCODED_RING_SLOTS,
4    MAX_OBSERVABLE_SLOTS, MAX_RING_SLOTS, SLOT_WORDS, STATUS_WORD,
5};
6
7/// Return the number of bytes required by a control buffer with `observable_slots`.
8#[must_use]
9pub fn control_byte_len(observable_slots: u32) -> Option<usize> {
10    if observable_slots > MAX_OBSERVABLE_SLOTS {
11        return None;
12    }
13    let words = control::OBSERVABLE_BASE.checked_add(observable_slots)?;
14    words_to_bytes(words.max(CONTROL_MIN_WORDS))
15}
16
17/// Return the number of bytes required by a ring buffer with `slot_count` slots.
18#[must_use]
19pub fn ring_byte_len(slot_count: u32) -> Option<usize> {
20    if slot_count > MAX_RING_SLOTS {
21        return None;
22    }
23    let words = slot_count.checked_mul(SLOT_WORDS)?;
24    words_to_bytes(words)
25}
26
27/// Return the number of bytes required by a debug-log buffer.
28#[must_use]
29pub fn debug_log_byte_len(record_capacity: u32) -> Option<usize> {
30    if record_capacity > MAX_DEBUG_RECORDS {
31        return None;
32    }
33    let record_words = record_capacity.checked_mul(debug::RECORD_WORDS)?;
34    let words = debug::RECORDS_BASE.checked_add(record_words)?;
35    words_to_bytes(words)
36}
37
38fn control_encode_capacity(observable_slots: u32) -> Result<usize, ProtocolError> {
39    if observable_slots > MAX_ENCODED_OBSERVABLE_SLOTS {
40        return Err(ProtocolError::ByteLengthOverflow {
41            buffer: "control",
42            fix: "shard observable results or reduce observable_slots to the megakernel allocation cap before encoding control",
43        });
44    }
45    control_byte_len(observable_slots).ok_or(ProtocolError::ByteLengthOverflow {
46        buffer: "control",
47        fix: "shard observable results or reduce observable_slots to the megakernel protocol cap before encoding control",
48    })
49}
50
51fn ring_encode_capacity(slot_count: u32) -> Result<usize, ProtocolError> {
52    if slot_count > MAX_ENCODED_RING_SLOTS {
53        return Err(ProtocolError::ByteLengthOverflow {
54            buffer: "ring",
55            fix: "split the dispatch into smaller ring shards before encoding; slot_count exceeds the megakernel allocation cap or host address space",
56        });
57    }
58    ring_byte_len(slot_count).ok_or(ProtocolError::ByteLengthOverflow {
59        buffer: "ring",
60        fix: "split the dispatch into smaller ring shards before encoding; slot_count exceeds the megakernel protocol cap or host address space",
61    })
62}
63
64fn debug_log_encode_capacity(record_capacity: u32) -> Result<usize, ProtocolError> {
65    if record_capacity > MAX_ENCODED_DEBUG_RECORDS {
66        return Err(ProtocolError::ByteLengthOverflow {
67            buffer: "debug_log",
68            fix:
69                "reduce debug-log record_capacity to the megakernel allocation cap before encoding",
70        });
71    }
72    debug_log_byte_len(record_capacity).ok_or(ProtocolError::ByteLengthOverflow {
73        buffer: "debug_log",
74        fix: "reduce debug-log record_capacity to the megakernel protocol cap before encoding",
75    })
76}
77
78/// Encode a control-buffer payload.
79///
80/// # Errors
81///
82/// Returns [`ProtocolError`] when the requested observable region overflows
83/// host address space.
84pub fn encode_control(
85    shutdown: bool,
86    tenant_count: u32,
87    observable_slots: u32,
88) -> Result<Vec<u8>, ProtocolError> {
89    try_encode_control(shutdown, tenant_count, observable_slots)
90}
91
92/// Strictly encode a control-buffer payload.
93///
94/// # Errors
95///
96/// Returns [`ProtocolError`] when the requested observable region overflows
97/// host address space.
98pub fn try_encode_control(
99    shutdown: bool,
100    tenant_count: u32,
101    observable_slots: u32,
102) -> Result<Vec<u8>, ProtocolError> {
103    let total_bytes = control_encode_capacity(observable_slots)?;
104    let mut bytes = Vec::new();
105    try_reserve_protocol_capacity(
106        &mut bytes,
107        total_bytes,
108        "control",
109        "control encode could not reserve host staging bytes; reduce observable_slots or reuse a preallocated control buffer",
110    )?;
111    try_encode_control_into(shutdown, tenant_count, observable_slots, &mut bytes)?;
112    Ok(bytes)
113}
114
115/// Strictly encode a control-buffer payload into caller-owned storage.
116///
117/// Clears and resizes `dst` to the exact control-buffer byte length, reusing
118/// any existing allocation.
119///
120/// # Errors
121///
122/// Returns [`ProtocolError`] when the requested observable region overflows
123/// host address space.
124pub fn try_encode_control_into(
125    shutdown: bool,
126    tenant_count: u32,
127    observable_slots: u32,
128    dst: &mut Vec<u8>,
129) -> Result<(), ProtocolError> {
130    let total_bytes = control_encode_capacity(observable_slots)?;
131    dst.clear();
132    try_reserve_protocol_capacity(
133        dst,
134        total_bytes,
135        "control",
136        "control encode could not reserve caller-owned staging bytes; reduce observable_slots or reuse a larger control buffer",
137    )?;
138    dst.resize(total_bytes, 0);
139
140    if shutdown {
141        write_word(
142            dst,
143            control_word_index(control::SHUTDOWN, "shutdown word")?,
144            1,
145        );
146    }
147    write_word(
148        dst,
149        control_word_index(control::TENANT_BASE, "tenant base word")?,
150        control::TENANT_BASE + 1,
151    );
152
153    let tenant_table_start = control_word_index(control::TENANT_BASE, "tenant base word")?
154        .checked_add(1)
155        .ok_or(ProtocolError::ByteLengthOverflow {
156            buffer: "control",
157            fix: "tenant table start overflowed usize; reduce control protocol constants",
158        })?;
159    let requested_tenant_words =
160        usize::try_from(tenant_count).map_err(|_| ProtocolError::ByteLengthOverflow {
161            buffer: "control",
162            fix: "tenant_count cannot fit host usize; split tenant tables before encoding",
163        })?;
164    let tenant_table_end = core::cmp::min(
165        tenant_table_start
166            .checked_add(requested_tenant_words)
167            .ok_or(ProtocolError::ByteLengthOverflow {
168                buffer: "control",
169                fix: "tenant table end overflowed usize; split tenant tables before encoding",
170            })?,
171        control_word_index(control::TENANT_QUOTA_BASE, "tenant quota base word")?,
172    );
173    for word_idx in tenant_table_start..tenant_table_end {
174        write_word(dst, word_idx, !0u32);
175    }
176
177    let quota_table_start =
178        control_word_index(control::TENANT_QUOTA_BASE, "tenant quota base word")?;
179    let quota_table_end = core::cmp::min(
180        quota_table_start
181            .checked_add(requested_tenant_words)
182            .ok_or(ProtocolError::ByteLengthOverflow {
183                buffer: "control",
184                fix: "quota table end overflowed usize; split tenant tables before encoding",
185            })?,
186        control_word_index(control::TENANT_FAIRNESS_BASE, "tenant fairness base word")?,
187    );
188    for word_idx in quota_table_start..quota_table_end {
189        write_word(dst, word_idx, 1_000_000);
190    }
191    Ok(())
192}
193
194/// Encode an empty ring buffer with `slot_count` slots.
195///
196/// # Errors
197///
198/// Returns [`ProtocolError`] when the requested ring size overflows host
199/// address space.
200pub fn encode_empty_ring(slot_count: u32) -> Result<Vec<u8>, ProtocolError> {
201    try_encode_empty_ring(slot_count)
202}
203
204/// Strictly encode an empty ring buffer with `slot_count` slots.
205///
206/// # Errors
207///
208/// Returns [`ProtocolError`] when the requested ring size overflows host
209/// address space.
210pub fn try_encode_empty_ring(slot_count: u32) -> Result<Vec<u8>, ProtocolError> {
211    let total_bytes = ring_encode_capacity(slot_count)?;
212    let mut bytes = Vec::new();
213    try_reserve_protocol_capacity(
214        &mut bytes,
215        total_bytes,
216        "ring",
217        "ring encode could not reserve host staging bytes; split the dispatch into smaller ring shards or reuse a preallocated ring buffer",
218    )?;
219    try_encode_empty_ring_into(slot_count, &mut bytes)?;
220    Ok(bytes)
221}
222
223/// Strictly encode an empty ring buffer into caller-owned storage.
224///
225/// Clears and resizes `dst` to the exact ring byte length, reusing allocation.
226///
227/// # Errors
228///
229/// Returns [`ProtocolError`] when the requested ring size overflows host
230/// address space.
231pub fn try_encode_empty_ring_into(slot_count: u32, dst: &mut Vec<u8>) -> Result<(), ProtocolError> {
232    let total_bytes = ring_encode_capacity(slot_count)?;
233    dst.clear();
234    try_reserve_protocol_capacity(
235        dst,
236        total_bytes,
237        "ring",
238        "ring encode could not reserve caller-owned staging bytes; split the dispatch into smaller ring shards or reuse a larger ring buffer",
239    )?;
240    dst.resize(total_bytes, 0);
241    Ok(())
242}
243
244/// Encode an empty PRINTF channel buffer.
245///
246/// # Errors
247///
248/// Returns [`ProtocolError`] when the requested debug-log size overflows host
249/// address space.
250pub fn encode_empty_debug_log(record_capacity: u32) -> Result<Vec<u8>, ProtocolError> {
251    try_encode_empty_debug_log(record_capacity)
252}
253
254/// Strictly encode an empty PRINTF channel buffer.
255///
256/// # Errors
257///
258/// Returns [`ProtocolError`] when the requested debug-log size overflows host
259/// address space.
260pub fn try_encode_empty_debug_log(record_capacity: u32) -> Result<Vec<u8>, ProtocolError> {
261    let total_bytes = debug_log_encode_capacity(record_capacity)?;
262    let mut bytes = Vec::new();
263    try_reserve_protocol_capacity(
264        &mut bytes,
265        total_bytes,
266        "debug_log",
267        "debug-log encode could not reserve host staging bytes; reduce record_capacity or reuse a preallocated debug-log buffer",
268    )?;
269    try_encode_empty_debug_log_into(record_capacity, &mut bytes)?;
270    Ok(bytes)
271}
272
273/// Strictly encode an empty PRINTF channel buffer into caller-owned storage.
274///
275/// Clears and resizes `dst` to the exact debug-log byte length, reusing allocation.
276///
277/// # Errors
278///
279/// Returns [`ProtocolError`] when the requested debug-log size overflows host
280/// address space.
281pub fn try_encode_empty_debug_log_into(
282    record_capacity: u32,
283    dst: &mut Vec<u8>,
284) -> Result<(), ProtocolError> {
285    let total_bytes = debug_log_encode_capacity(record_capacity)?;
286    dst.clear();
287    try_reserve_protocol_capacity(
288        dst,
289        total_bytes,
290        "debug_log",
291        "debug-log encode could not reserve caller-owned staging bytes; reduce record_capacity or reuse a larger debug-log buffer",
292    )?;
293    dst.resize(total_bytes, 0);
294    Ok(())
295}
296
297/// Decode the kernel's `done_count` from a control buffer.
298#[must_use]
299pub fn read_done_count(control_bytes: &[u8]) -> u32 {
300    try_read_done_count(control_bytes).unwrap_or_else(|source| {
301        panic!(
302            "megakernel control done_count decode failed: {source}. Fix: use a complete control readback produced by the matching megakernel protocol encoder."
303        )
304    })
305}
306
307/// Read the epoch counter from a control buffer.
308#[must_use]
309pub fn read_epoch(control_bytes: &[u8]) -> u32 {
310    try_read_epoch(control_bytes).unwrap_or_else(|source| {
311        panic!(
312            "megakernel control epoch decode failed: {source}. Fix: use a complete control readback produced by the matching megakernel protocol encoder."
313        )
314    })
315}
316
317/// Strictly decode the kernel's `done_count` from a control buffer.
318///
319/// # Errors
320///
321/// Returns [`ProtocolError`] when the buffer is not word-aligned or is too
322/// short to contain the fixed control header.
323pub fn try_read_done_count(control_bytes: &[u8]) -> Result<u32, ProtocolError> {
324    read_required_word(
325        "control",
326        control_bytes,
327        control_word_index(control::DONE_COUNT, "done-count word")?,
328    )
329}
330
331/// Strictly decode the epoch counter from a control buffer.
332///
333/// # Errors
334///
335/// Returns [`ProtocolError`] when the buffer is not word-aligned or is too
336/// short to contain the epoch word.
337pub fn try_read_epoch(control_bytes: &[u8]) -> Result<u32, ProtocolError> {
338    read_required_word(
339        "control",
340        control_bytes,
341        control_word_index(control::EPOCH, "epoch word")?,
342    )
343}
344
345/// Read an observable result word from a control buffer.
346#[must_use]
347pub fn read_observable(control_bytes: &[u8], index: u32) -> u32 {
348    try_read_observable(control_bytes, index).unwrap_or(0)
349}
350
351/// Strictly read an observable result word from a control buffer.
352///
353/// # Errors
354///
355/// Returns [`ProtocolError`] when the buffer is not word-aligned, the index
356/// overflows the observable word offset, or the word is outside the buffer.
357pub fn try_read_observable(control_bytes: &[u8], index: u32) -> Result<u32, ProtocolError> {
358    let word_idx = control_word_index(
359        control::OBSERVABLE_BASE
360            .checked_add(index)
361            .ok_or(ProtocolError::ByteLengthOverflow {
362                buffer: "control",
363                fix: "observable index overflows the protocol word offset; shard observable reads",
364            })?,
365        "observable word index",
366    )?;
367    read_required_word("control", control_bytes, word_idx)
368}
369
370/// Read per-opcode metrics counters from a control buffer.
371#[must_use]
372pub fn read_metrics(control_bytes: &[u8]) -> Vec<(u32, u32)> {
373    let mut result = Vec::new();
374    read_metrics_into(control_bytes, &mut result);
375    result
376}
377
378/// Read per-opcode metrics counters into caller-owned storage.
379///
380/// Clears `out`, then reuses its allocation.
381pub fn read_metrics_into(control_bytes: &[u8], out: &mut Vec<(u32, u32)>) {
382    out.clear();
383    let Ok(metrics_base) = control_word_index(control::METRICS_BASE, "metrics base word") else {
384        return;
385    };
386    let available_words = control_bytes.len() / 4;
387    if available_words <= metrics_base {
388        return;
389    }
390    let available_slots = (available_words - metrics_base).min(control::METRICS_SLOTS as usize);
391    let nonzero = count_nonzero_metrics_truncated(control_bytes, metrics_base, available_slots);
392    if try_reserve_target_capacity(out, nonzero).is_err() {
393        return;
394    }
395    for slot in 0..available_slots {
396        let word_idx = metrics_base + slot;
397        let Some(count) = read_word_unaligned(control_bytes, word_idx) else {
398            break;
399        };
400        if count > 0 {
401            out.push((slot as u32, count));
402        }
403    }
404}
405
406/// Strictly read per-opcode metrics counters from a control buffer.
407///
408/// # Errors
409///
410/// Returns [`ProtocolError`] when the buffer is not word-aligned or is too
411/// short for the fixed metrics window.
412pub fn try_read_metrics(control_bytes: &[u8]) -> Result<Vec<(u32, u32)>, ProtocolError> {
413    let mut result = Vec::new();
414    try_read_metrics_into(control_bytes, &mut result)?;
415    Ok(result)
416}
417
418/// Strictly read per-opcode metrics counters into caller-owned storage.
419///
420/// Clears `out`, then reuses its allocation.
421///
422/// # Errors
423///
424/// Returns [`ProtocolError`] when the buffer is not word-aligned or is too
425/// short for the fixed metrics window.
426pub fn try_read_metrics_into(
427    control_bytes: &[u8],
428    out: &mut Vec<(u32, u32)>,
429) -> Result<(), ProtocolError> {
430    validate_word_aligned("control", control_bytes)?;
431    out.clear();
432    if let Ok(words) = bytemuck::try_cast_slice::<u8, u32>(control_bytes) {
433        try_reserve_target_capacity(
434            out,
435            count_nonzero_metrics_words_strict(words, control_bytes.len())?,
436        )?;
437        for i in 0..control::METRICS_SLOTS {
438            let word_idx = metrics_word_index(i)?;
439            let count =
440                words
441                    .get(word_idx)
442                    .copied()
443                    .map(u32::from_le)
444                    .ok_or(ProtocolError::MissingWord {
445                        buffer: "control",
446                        word_idx,
447                        byte_len: control_bytes.len(),
448                        fix: "decode only control buffers produced by the matching megakernel protocol encoder",
449                    })?;
450            if count > 0 {
451                out.push((i, count));
452            }
453        }
454        return Ok(());
455    }
456    try_reserve_target_capacity(out, count_nonzero_metrics_unaligned_strict(control_bytes)?)?;
457    for i in 0..control::METRICS_SLOTS {
458        let word_idx = metrics_word_index(i)?;
459        let count = read_word_unaligned(control_bytes, word_idx)
460            .ok_or(ProtocolError::MissingWord {
461            buffer: "control",
462            word_idx,
463            byte_len: control_bytes.len(),
464            fix: "decode only control buffers produced by the matching megakernel protocol encoder",
465        })?;
466        if count > 0 {
467            out.push((i, count));
468        }
469    }
470    Ok(())
471}
472
473
474mod debug_log;
475
476pub use debug_log::{
477    read_debug_log, read_debug_log_into, try_read_debug_log, try_read_debug_log_into,
478};
479
480/// Count DONE slots in a ring-buffer readback.
481///
482/// Returns `None` when the supplied bytes cannot contain `item_count` whole
483/// slots. This is intentionally part of the protocol module: DONE status is an
484/// ABI word, not a backend-specific readback rule.
485#[must_use]
486pub fn count_done_ring_slots(ring_bytes: &[u8], item_count: usize) -> Option<u64> {
487    if item_count == 0 {
488        return None;
489    }
490    let slot_words = usize::try_from(SLOT_WORDS).ok()?;
491    let required_bytes = item_count.checked_mul(slot_words)?.checked_mul(4)?;
492    if ring_bytes.len() < required_bytes {
493        return None;
494    }
495    let status_word = usize::try_from(STATUS_WORD).ok()?;
496    let words = bytemuck::try_cast_slice::<u8, u32>(ring_bytes).ok();
497    let done = (0..item_count)
498        .filter(|slot_idx| {
499            let word_idx = slot_idx
500                .checked_mul(slot_words)
501                .and_then(|base| base.checked_add(status_word));
502            word_idx.and_then(|idx| read_word_from_optional_words(words, ring_bytes, idx))
503                == Some(slot::DONE)
504        })
505        .count();
506    u64::try_from(done).ok()
507}
508
509/// Strictly count DONE slots in a ring-buffer readback.
510///
511/// # Errors
512///
513/// Returns [`ProtocolError`] when `ring_bytes` cannot contain `item_count`
514/// complete ring slots or when the byte count overflows the host protocol
515/// domain.
516pub fn try_count_done_ring_slots(
517    ring_bytes: &[u8],
518    item_count: usize,
519) -> Result<u64, ProtocolError> {
520    if item_count == 0 {
521        return Ok(0);
522    }
523    validate_word_aligned("ring", ring_bytes)?;
524    let slot_words =
525        usize::try_from(SLOT_WORDS).map_err(|_| ProtocolError::ByteLengthOverflow {
526            buffer: "ring",
527            fix: "keep SLOT_WORDS representable in host usize before decoding ring status",
528        })?;
529    let required_bytes = item_count
530        .checked_mul(slot_words)
531        .and_then(|words| words.checked_mul(4))
532        .ok_or(ProtocolError::ByteLengthOverflow {
533            buffer: "ring",
534            fix: "split the dispatch before ring status decode overflows host address space",
535        })?;
536    if ring_bytes.len() < required_bytes {
537        return Err(ProtocolError::MissingWord {
538            buffer: "ring",
539            word_idx: required_bytes / 4,
540            byte_len: ring_bytes.len(),
541            fix: "decode only full ring readbacks sized for the submitted megakernel item_count",
542        });
543    }
544    let status_word =
545        usize::try_from(STATUS_WORD).map_err(|_| ProtocolError::ByteLengthOverflow {
546            buffer: "ring",
547            fix: "keep STATUS_WORD representable in host usize before decoding ring status",
548        })?;
549    let words = bytemuck::try_cast_slice::<u8, u32>(ring_bytes).ok();
550    let mut done = 0_u64;
551    for slot_idx in 0..item_count {
552        let word_idx = slot_idx
553            .checked_mul(slot_words)
554            .and_then(|base| base.checked_add(status_word))
555            .ok_or(ProtocolError::ByteLengthOverflow {
556                buffer: "ring",
557                fix: "split the dispatch before ring status word indexing overflows host address space",
558            })?;
559        if read_word_from_optional_words(words, ring_bytes, word_idx) == Some(slot::DONE) {
560            done = done
561                .checked_add(1)
562                .ok_or(ProtocolError::ByteLengthOverflow {
563                    buffer: "ring",
564                    fix: "split the dispatch before DONE slot count exceeds u64",
565                })?;
566        }
567    }
568    Ok(done)
569}
570
571fn try_reserve_target_capacity<T>(
572    out: &mut Vec<T>,
573    target_capacity: usize,
574) -> Result<(), ProtocolError> {
575    try_reserve_protocol_capacity(
576        out,
577        target_capacity,
578        "control",
579        "host metrics decode could not reserve output records; reduce metrics fanout or decode into a reused scratch vector",
580    )
581}
582
583fn try_reserve_protocol_capacity<T>(
584    out: &mut Vec<T>,
585    target_capacity: usize,
586    buffer: &'static str,
587    fix: &'static str,
588) -> Result<(), ProtocolError> {
589    vyre_foundation::allocation::try_reserve_vec_to_capacity(out, target_capacity)
590        .map_err(|_| ProtocolError::ByteLengthOverflow { buffer, fix })
591}
592
593fn count_nonzero_metrics_words_strict(
594    words: &[u32],
595    byte_len: usize,
596) -> Result<usize, ProtocolError> {
597    let mut count = 0usize;
598    for i in 0..control::METRICS_SLOTS {
599        let word_idx = metrics_word_index(i)?;
600        let word = words
601            .get(word_idx)
602            .copied()
603            .map(u32::from_le)
604            .ok_or(ProtocolError::MissingWord {
605            buffer: "control",
606            word_idx,
607            byte_len,
608            fix: "decode only control buffers produced by the matching megakernel protocol encoder",
609        })?;
610        if word > 0 {
611            count += 1;
612        }
613    }
614    Ok(count)
615}
616
617fn count_nonzero_metrics_unaligned_strict(control_bytes: &[u8]) -> Result<usize, ProtocolError> {
618    let mut count = 0usize;
619    for i in 0..control::METRICS_SLOTS {
620        let word_idx = metrics_word_index(i)?;
621        let word = read_word_unaligned(control_bytes, word_idx)
622            .ok_or(ProtocolError::MissingWord {
623            buffer: "control",
624            word_idx,
625            byte_len: control_bytes.len(),
626            fix: "decode only control buffers produced by the matching megakernel protocol encoder",
627        })?;
628        if word > 0 {
629            count += 1;
630        }
631    }
632    Ok(count)
633}
634
635fn count_nonzero_metrics_truncated(
636    control_bytes: &[u8],
637    metrics_base: usize,
638    available_slots: usize,
639) -> usize {
640    let mut count = 0usize;
641    for slot in 0..available_slots {
642        if read_word_unaligned(control_bytes, metrics_base + slot).unwrap_or(0) > 0 {
643            count += 1;
644        }
645    }
646    count
647}
648
649fn metrics_word_index(slot: u32) -> Result<usize, ProtocolError> {
650    let word =
651        control::METRICS_BASE
652            .checked_add(slot)
653            .ok_or(ProtocolError::ByteLengthOverflow {
654                buffer: "control",
655                fix: "metrics slot index overflows the protocol word offset; shard metrics reads",
656            })?;
657    control_word_index(word, "metrics word index")
658}
659
660fn control_word_index(word: u32, label: &'static str) -> Result<usize, ProtocolError> {
661    usize::try_from(word).map_err(|_| ProtocolError::ByteLengthOverflow {
662        buffer: "control",
663        fix: match label {
664            "observable word index" => {
665                "observable word index cannot fit host usize; shard observable reads"
666            }
667            "metrics word index" => "metrics word index cannot fit host usize; shard metrics reads",
668            _ => "control word index cannot fit host usize; shard protocol reads",
669        },
670    })
671}
672
673pub(crate) fn read_word(bytes: &[u8], word_idx: usize) -> Option<u32> {
674    if let Ok(words) = bytemuck::try_cast_slice::<u8, u32>(bytes) {
675        return words.get(word_idx).copied().map(u32::from_le);
676    }
677    read_word_unaligned(bytes, word_idx)
678}
679
680fn read_word_from_optional_words(
681    words: Option<&[u32]>,
682    bytes: &[u8],
683    word_idx: usize,
684) -> Option<u32> {
685    if let Some(words) = words {
686        return words.get(word_idx).copied().map(u32::from_le);
687    }
688    read_word_unaligned(bytes, word_idx)
689}
690
/// Read the little-endian `u32` at word index `word_idx` without requiring
/// any alignment of `bytes`.
///
/// Returns `None` when the byte offset overflows or the four bytes are not
/// fully inside `bytes`.
fn read_word_unaligned(bytes: &[u8], word_idx: usize) -> Option<u32> {
    let start = word_idx.checked_mul(4)?;
    let stop = start.checked_add(4)?;
    let chunk: [u8; 4] = bytes.get(start..stop)?.try_into().ok()?;
    Some(u32::from_le_bytes(chunk))
}
697
698fn read_required_word(
699    buffer: &'static str,
700    bytes: &[u8],
701    word_idx: usize,
702) -> Result<u32, ProtocolError> {
703    validate_word_aligned(buffer, bytes)?;
704    read_word(bytes, word_idx).ok_or(ProtocolError::MissingWord {
705        buffer,
706        word_idx,
707        byte_len: bytes.len(),
708        fix: "decode only buffers produced by the matching megakernel protocol encoder",
709    })
710}
711
712fn validate_word_aligned(buffer: &'static str, bytes: &[u8]) -> Result<(), ProtocolError> {
713    if bytes.len() % 4 == 0 {
714        Ok(())
715    } else {
716        Err(ProtocolError::MisalignedByteLength {
717            buffer,
718            byte_len: bytes.len(),
719            fix: "pass whole u32 protocol words; do not decode partial DMA/readback buffers",
720        })
721    }
722}
723
/// Write `value` as a little-endian `u32` at word index `word_idx`.
///
/// # Panics
///
/// Panics when the four target bytes are not fully inside `bytes`. The byte
/// offset is computed with checked arithmetic, so an index whose offset
/// overflows `usize` also panics instead of wrapping around and overwriting
/// an unrelated word in builds without overflow checks.
pub(crate) fn write_word(bytes: &mut [u8], word_idx: usize, value: u32) {
    let byte_len = bytes.len();
    let span = word_idx
        .checked_mul(4)
        .and_then(|start| start.checked_add(4).map(|end| start..end));
    match span {
        Some(span) if span.end <= byte_len => {
            bytes[span].copy_from_slice(&value.to_le_bytes());
        }
        _ => panic!(
            "write_word: word index {} is outside a {}-byte protocol buffer",
            word_idx, byte_len
        ),
    }
}
728
/// Convert a count of `u32` words into a byte count.
///
/// Returns `None` when the word count does not fit host `usize` or the
/// multiplication by four overflows.
fn words_to_bytes(words: u32) -> Option<usize> {
    match usize::try_from(words) {
        Ok(word_count) => word_count.checked_mul(4),
        Err(_) => None,
    }
}
732