1use super::protocol::{control, slot, ARG0_WORD, OPCODE_WORD, STATUS_WORD, TENANT_WORD};
8use super::scaling::{
9 MegakernelLaunchPolicy, MegakernelLaunchRecommendation, MegakernelLaunchRequest,
10 PriorityRequeueAccounting,
11};
12use super::staging_reserve::{
13 reserve_hash_map_capacity, reserve_vec_capacity as reserve_target_capacity,
14};
15use crate::PipelineError;
16
17mod sketch;
18mod types;
19pub use sketch::{CountMinSketch, SketchTelemetry, SketchTelemetryScratch};
20use types::WindowAccumulator;
21pub use types::{
22 ControlSnapshot, MegakernelRuntimeCounters, MegakernelWatchdogSnapshot, RingOccupancy,
23 RingSlotSnapshot, RingStatus, RingTelemetry, TelemetryDecodeScratch, WindowTelemetry,
24};
25
/// Number of 32-bit words in one ring slot, as a host `usize`.
///
/// NOTE(review): presumably mirrors the protocol's `SLOT_WORDS`; confirm they stay in sync.
const SLOT_WORDS_USIZE: usize = 16_usize;
27
/// Reads the little-endian `u32` stored at 32-bit word `word_idx` of `buf`.
///
/// Returns `None` when the byte offset overflows `usize` or the word is not fully
/// contained in `buf`.
fn read_word(buf: &[u8], word_idx: usize) -> Option<u32> {
    word_idx
        .checked_mul(4)
        .and_then(|start| start.checked_add(4).map(|end| start..end))
        .and_then(|range| buf.get(range))
        .and_then(|bytes| <[u8; 4]>::try_from(bytes).ok())
        .map(u32::from_le_bytes)
}
34
35fn read_slot_chunk_word_exact(slot_bytes: &[u8], word_idx: u32) -> u32 {
36 let off = telemetry_u32_to_usize(word_idx, "slot word index")
37 .checked_mul(4)
38 .unwrap_or_else(|| {
39 panic!(
40 "megakernel telemetry slot word byte offset overflowed usize. Fix: keep slot word indices within host address space."
41 )
42 });
43 u32::from_le_bytes([
44 slot_bytes[off],
45 slot_bytes[off + 1],
46 slot_bytes[off + 2],
47 slot_bytes[off + 3],
48 ])
49}
50
/// Returns `true` when `values` is strictly ascending (sorted with no duplicates).
///
/// Empty and single-element slices trivially qualify.
fn is_sorted_unique_u32(values: &[u32]) -> bool {
    values
        .iter()
        .zip(values.iter().skip(1))
        .all(|(prev, next)| prev < next)
}
54
55impl ControlSnapshot {
56 #[must_use]
58 pub fn decode(control_bytes: &[u8]) -> Self {
59 let mut out = Self::default();
60 Self::try_decode_into(control_bytes, &mut out).unwrap_or_else(|source| {
61 panic!(
62 "megakernel control telemetry decode failed: {source}. Fix: capture the full control buffer before telemetry decode."
63 )
64 });
65 out
66 }
67
68 pub fn decode_into(control_bytes: &[u8], out: &mut Self) {
70 Self::try_decode_into(control_bytes, out).unwrap_or_else(|source| {
71 panic!(
72 "megakernel control telemetry decode failed: {source}. Fix: capture the full control buffer before telemetry decode."
73 )
74 });
75 }
76
77 pub fn try_decode_into(control_bytes: &[u8], out: &mut Self) -> Result<(), PipelineError> {
84 validate_control_snapshot(control_bytes)?;
85 out.shutdown =
86 read_required_control_word(control_bytes, control_word_index(control::SHUTDOWN)?)? != 0;
87 out.done_count =
88 read_required_control_word(control_bytes, control_word_index(control::DONE_COUNT)?)?;
89 out.epoch = read_required_control_word(control_bytes, control_word_index(control::EPOCH)?)?;
90 out.metrics.clear();
91 reserve_target_capacity(
92 &mut out.metrics,
93 telemetry_u32_to_usize(control::METRICS_SLOTS, "metrics slot count"),
94 "metrics",
95 )?;
96 for i in 0..control::METRICS_SLOTS {
97 let count = read_required_control_word(
98 control_bytes,
99 control_offset_index(control::METRICS_BASE, i)?,
100 )?;
101 if count > 0 {
102 out.metrics.push((i, count));
103 }
104 }
105 out.tenant_fairness.clear();
106 reserve_target_capacity(
107 &mut out.tenant_fairness,
108 telemetry_u32_to_usize(control::TENANT_FAIRNESS_SLOTS, "tenant fairness slot count"),
109 "tenant fairness",
110 )?;
111 for i in 0..control::TENANT_FAIRNESS_SLOTS {
112 out.tenant_fairness.push(read_required_control_word(
113 control_bytes,
114 control_offset_index(control::TENANT_FAIRNESS_BASE, i)?,
115 )?);
116 }
117 out.priority_fairness.clear();
118 reserve_target_capacity(
119 &mut out.priority_fairness,
120 telemetry_u32_to_usize(
121 control::PRIORITY_FAIRNESS_SLOTS,
122 "priority fairness slot count",
123 ),
124 "priority fairness",
125 )?;
126 for i in 0..control::PRIORITY_FAIRNESS_SLOTS {
127 out.priority_fairness.push(read_required_control_word(
128 control_bytes,
129 control_offset_index(control::PRIORITY_FAIRNESS_BASE, i)?,
130 )?);
131 }
132 Ok(())
133 }
134}
135
136impl RingTelemetry {
137 #[must_use]
139 pub fn decode(control_bytes: &[u8], ring_bytes: &[u8]) -> Self {
140 Self::decode_with_window_opcodes(control_bytes, ring_bytes, &[])
141 }
142
143 pub fn try_decode(control_bytes: &[u8], ring_bytes: &[u8]) -> Result<Self, PipelineError> {
150 Self::try_decode_with_window_opcodes(control_bytes, ring_bytes, &[])
151 }
152
153 #[must_use]
157 pub fn decode_with_window_opcodes(
158 control_bytes: &[u8],
159 ring_bytes: &[u8],
160 window_opcodes: &[u32],
161 ) -> Self {
162 validate_telemetry_buffers(control_bytes, ring_bytes).unwrap_or_else(|source| {
163 panic!(
164 "megakernel ring telemetry decode failed: {source}. Fix: capture full control and whole ring-slot buffers before telemetry decode."
165 )
166 });
167 let mut out = Self::default();
168 let mut scratch = TelemetryDecodeScratch::new();
169 Self::try_decode_with_window_opcodes_into_unchecked(
170 control_bytes,
171 ring_bytes,
172 window_opcodes,
173 &mut out,
174 &mut scratch,
175 )
176 .unwrap_or_else(|source| {
177 panic!(
178 "megakernel ring telemetry decode failed: {source}. Fix: capture full control and whole ring-slot buffers before telemetry decode."
179 )
180 });
181 out
182 }
183
184 pub fn decode_with_window_opcodes_into(
187 control_bytes: &[u8],
188 ring_bytes: &[u8],
189 window_opcodes: &[u32],
190 out: &mut Self,
191 scratch: &mut TelemetryDecodeScratch,
192 ) {
193 validate_telemetry_buffers(control_bytes, ring_bytes).unwrap_or_else(|source| {
194 panic!(
195 "megakernel ring telemetry decode failed: {source}. Fix: capture full control and whole ring-slot buffers before telemetry decode."
196 )
197 });
198 Self::try_decode_with_window_opcodes_into_unchecked(
199 control_bytes,
200 ring_bytes,
201 window_opcodes,
202 out,
203 scratch,
204 )
205 .unwrap_or_else(|source| {
206 panic!(
207 "megakernel ring telemetry decode failed: {source}. Fix: capture full control and whole ring-slot buffers before telemetry decode."
208 )
209 });
210 }
211
212 fn try_decode_with_window_opcodes_into_unchecked(
213 control_bytes: &[u8],
214 ring_bytes: &[u8],
215 window_opcodes: &[u32],
216 out: &mut Self,
217 scratch: &mut TelemetryDecodeScratch,
218 ) -> Result<(), PipelineError> {
219 enum WindowOpcodeMatcher<'a> {
220 None,
221 Single(u32),
222 DenseBitmap(u128),
223 SmallSlice(&'a [u32]),
224 LargeSlice(&'a [u32]),
225 }
226
227 ControlSnapshot::try_decode_into(control_bytes, &mut out.control)?;
228 let slot_count = ring_bytes.len() / slot_byte_len();
229 out.occupancy = RingOccupancy::default();
230 out.slots.clear();
231 reserve_target_capacity(&mut out.slots, slot_count, "ring slots")?;
232 out.windows.clear();
233 scratch.window_opcodes.clear();
234 scratch.windows.clear();
235 let window_opcode_lookup = if window_opcodes.is_empty() {
236 &[][..]
237 } else if is_sorted_unique_u32(window_opcodes) {
238 window_opcodes
239 } else {
240 reserve_target_capacity(
241 &mut scratch.window_opcodes,
242 window_opcodes.len(),
243 "window opcode scratch",
244 )?;
245 scratch.window_opcodes.extend_from_slice(window_opcodes);
246 scratch.window_opcodes.sort_unstable();
247 scratch.window_opcodes.dedup();
248 scratch.window_opcodes.as_slice()
249 };
250 let window_opcode_matcher = match window_opcode_lookup {
251 [] => WindowOpcodeMatcher::None,
252 [opcode] => WindowOpcodeMatcher::Single(*opcode),
253 opcodes if opcodes.len() > 1 && opcodes.iter().all(|opcode| *opcode < 128) => {
254 let bitmap = opcodes
255 .iter()
256 .fold(0_u128, |acc, &opcode| acc | (1_u128 << opcode));
257 WindowOpcodeMatcher::DenseBitmap(bitmap)
258 }
259 opcodes if opcodes.len() <= 8 => WindowOpcodeMatcher::SmallSlice(opcodes),
260 opcodes => WindowOpcodeMatcher::LargeSlice(opcodes),
261 };
262 if !matches!(window_opcode_matcher, WindowOpcodeMatcher::None) {
263 reserve_hash_map_capacity(
264 &mut scratch.windows,
265 slot_count,
266 "window accumulator scratch",
267 )?;
268 }
269 let decode_windows = !matches!(window_opcode_matcher, WindowOpcodeMatcher::None);
270
271 let slot_byte_len = slot_byte_len();
272 for (slot_idx, slot_bytes) in ring_bytes.chunks_exact(slot_byte_len).enumerate() {
273 let slot_idx = u32::try_from(slot_idx).unwrap_or_else(|source| {
274 panic!(
275 "megakernel telemetry slot index cannot fit u32: {source}. Fix: shard ring snapshots before host decode."
276 )
277 });
278 let status_raw = read_slot_chunk_word_exact(slot_bytes, STATUS_WORD);
279 let status = RingStatus::from_raw(status_raw);
280 match status {
281 RingStatus::Empty => out.occupancy.empty += 1,
282 RingStatus::Published => out.occupancy.published += 1,
283 RingStatus::Claimed => out.occupancy.claimed += 1,
284 RingStatus::Done => out.occupancy.done += 1,
285 RingStatus::WaitIo => out.occupancy.wait_io += 1,
286 RingStatus::Yield => out.occupancy.yield_count += 1,
287 RingStatus::Requeue => out.occupancy.requeue += 1,
288 RingStatus::Fault => out.occupancy.fault += 1,
289 RingStatus::Unknown(_) => out.occupancy.unknown += 1,
290 }
291 let tenant_id = read_slot_chunk_word_exact(slot_bytes, TENANT_WORD);
292 let opcode = read_slot_chunk_word_exact(slot_bytes, OPCODE_WORD);
293 let args_prefix = [
294 read_slot_chunk_word_exact(slot_bytes, ARG0_WORD),
295 read_slot_chunk_word_exact(slot_bytes, ARG0_WORD + 1),
296 read_slot_chunk_word_exact(slot_bytes, ARG0_WORD + 2),
297 ];
298 let is_window_opcode = match window_opcode_matcher {
299 WindowOpcodeMatcher::None => false,
300 WindowOpcodeMatcher::Single(expected) => opcode == expected,
301 WindowOpcodeMatcher::DenseBitmap(bitmap) => {
302 opcode < 128 && ((bitmap >> opcode) & 1) == 1
303 }
304 WindowOpcodeMatcher::SmallSlice(window_opcodes) => window_opcodes.contains(&opcode),
305 WindowOpcodeMatcher::LargeSlice(window_opcodes) => {
306 window_opcodes.binary_search(&opcode).is_ok()
307 }
308 };
309 if decode_windows && is_window_opcode {
310 let ticket = args_prefix[0];
311 let class_tag = args_prefix[1];
312 let entry =
313 scratch
314 .windows
315 .entry((ticket, opcode))
316 .or_insert_with(|| WindowAccumulator {
317 tenant_id,
318 opcode,
319 ..WindowAccumulator::default()
320 });
321 match class_tag {
322 0 => entry.required_slots += 1,
323 1 => entry.lookahead_slots += 1,
324 _ => {}
325 }
326 match status {
327 RingStatus::Published => entry.published += 1,
328 RingStatus::Claimed => entry.claimed += 1,
329 RingStatus::Done => entry.done += 1,
330 RingStatus::WaitIo => entry.wait_io += 1,
331 RingStatus::Yield => entry.yield_count += 1,
332 RingStatus::Requeue => entry.requeue += 1,
333 RingStatus::Fault => entry.fault += 1,
334 RingStatus::Empty | RingStatus::Unknown(_) => {}
335 }
336 }
337 out.slots.push(RingSlotSnapshot {
338 slot_idx,
339 status,
340 tenant_id,
341 opcode,
342 args_prefix,
343 });
344 }
345
346 reserve_target_capacity(&mut out.windows, scratch.windows.len(), "window output")?;
347 for (&(ticket, _), acc) in &scratch.windows {
348 out.windows.push(WindowTelemetry {
349 ticket,
350 tenant_id: acc.tenant_id,
351 opcode: acc.opcode,
352 required_slots: acc.required_slots,
353 lookahead_slots: acc.lookahead_slots,
354 published: acc.published,
355 claimed: acc.claimed,
356 done: acc.done,
357 wait_io: acc.wait_io,
358 yield_count: acc.yield_count,
359 requeue: acc.requeue,
360 fault: acc.fault,
361 });
362 }
363 out.windows
364 .sort_unstable_by_key(|window| (window.ticket, window.opcode));
365 Ok(())
366 }
367
368 pub fn try_decode_with_window_opcodes(
375 control_bytes: &[u8],
376 ring_bytes: &[u8],
377 window_opcodes: &[u32],
378 ) -> Result<Self, PipelineError> {
379 validate_telemetry_buffers(control_bytes, ring_bytes)?;
380 let mut out = Self::default();
381 let mut scratch = TelemetryDecodeScratch::new();
382 Self::try_decode_with_window_opcodes_into_unchecked(
383 control_bytes,
384 ring_bytes,
385 window_opcodes,
386 &mut out,
387 &mut scratch,
388 )?;
389 Ok(out)
390 }
391
392 pub fn try_decode_with_window_opcodes_into(
400 control_bytes: &[u8],
401 ring_bytes: &[u8],
402 window_opcodes: &[u32],
403 out: &mut Self,
404 scratch: &mut TelemetryDecodeScratch,
405 ) -> Result<(), PipelineError> {
406 validate_telemetry_buffers(control_bytes, ring_bytes)?;
407 Self::try_decode_with_window_opcodes_into_unchecked(
408 control_bytes,
409 ring_bytes,
410 window_opcodes,
411 out,
412 scratch,
413 )?;
414 Ok(())
415 }
416
417 #[must_use]
419 pub fn active_slots_for_opcode(&self, opcode: u32) -> Vec<&RingSlotSnapshot> {
420 self.try_active_slots_for_opcode(opcode).unwrap_or_else(|source| {
421 panic!(
422 "megakernel active-slot telemetry query failed: {source}. Fix: decode into caller-owned reusable slot scratch."
423 )
424 })
425 }
426
427 pub fn try_active_slots_for_opcode(
433 &self,
434 opcode: u32,
435 ) -> Result<Vec<&RingSlotSnapshot>, PipelineError> {
436 let mut out = Vec::new();
437 self.try_active_slots_for_opcode_into(opcode, &mut out)?;
438 Ok(out)
439 }
440
441 pub fn active_slots_for_opcode_iter(
443 &self,
444 opcode: u32,
445 ) -> impl Iterator<Item = &RingSlotSnapshot> {
446 self.slots
447 .iter()
448 .filter(move |slot| slot.opcode == opcode && slot.status.is_active())
449 }
450
451 pub fn active_slots_for_opcode_into<'a>(
453 &'a self,
454 opcode: u32,
455 out: &mut Vec<&'a RingSlotSnapshot>,
456 ) {
457 self.try_active_slots_for_opcode_into(opcode, out)
458 .unwrap_or_else(|source| {
459 panic!(
460 "megakernel active-slot telemetry query failed: {source}. Fix: decode into caller-owned reusable slot scratch."
461 )
462 });
463 }
464
465 pub fn try_active_slots_for_opcode_into<'a>(
471 &'a self,
472 opcode: u32,
473 out: &mut Vec<&'a RingSlotSnapshot>,
474 ) -> Result<(), PipelineError> {
475 out.clear();
476 reserve_target_capacity(out, self.slots.len(), "active slot output")?;
477 self.slots
478 .iter()
479 .filter(|slot| slot.opcode == opcode && slot.status.is_active())
480 .for_each(|slot| out.push(slot));
481 Ok(())
482 }
483
484 #[must_use]
486 pub fn active_windows(&self) -> Vec<&WindowTelemetry> {
487 self.try_active_windows().unwrap_or_else(|source| {
488 panic!(
489 "megakernel active-window telemetry query failed: {source}. Fix: decode into caller-owned reusable window scratch."
490 )
491 })
492 }
493
494 pub fn try_active_windows(&self) -> Result<Vec<&WindowTelemetry>, PipelineError> {
500 let mut out = Vec::new();
501 self.try_active_windows_into(&mut out)?;
502 Ok(out)
503 }
504
505 pub fn active_windows_into<'a>(&'a self, out: &mut Vec<&'a WindowTelemetry>) {
507 self.try_active_windows_into(out).unwrap_or_else(|source| {
508 panic!(
509 "megakernel active-window telemetry query failed: {source}. Fix: decode into caller-owned reusable window scratch."
510 )
511 });
512 }
513
514 pub fn try_active_windows_into<'a>(
520 &'a self,
521 out: &mut Vec<&'a WindowTelemetry>,
522 ) -> Result<(), PipelineError> {
523 out.clear();
524 reserve_target_capacity(out, self.windows.len(), "active window output")?;
525 self.windows
526 .iter()
527 .filter(|window| window.is_active())
528 .for_each(|window| out.push(window));
529 Ok(())
530 }
531
532 #[must_use]
534 pub fn priority_accounting(&self) -> PriorityRequeueAccounting {
535 PriorityRequeueAccounting {
536 requeue_count: u64::from(self.occupancy.requeue),
537 aged_promotions: 0,
538 max_priority_age: 0,
539 }
540 }
541
542 #[must_use]
545 pub fn runtime_counters(&self) -> MegakernelRuntimeCounters {
546 let total_slots = self.occupancy.total_slots();
547 let queue_depth = self.occupancy.queue_depth();
548 let gpu_idle_slots = self.occupancy.empty;
549 let gpu_idle_ppm = if total_slots == 0 {
550 0
551 } else {
552 let raw_idle_ppm = (u64::from(gpu_idle_slots) * 1_000_000) / u64::from(total_slots);
553 raw_idle_ppm.min(1_000_000) as u32
554 };
555 let frontier_density_bps = density_bps(queue_depth, total_slots);
556 let active_slots = total_slots.saturating_sub(gpu_idle_slots);
557 let occupancy_proxy_bps = density_bps(active_slots, total_slots);
558 let tenant_fairness_total = self
559 .control
560 .tenant_fairness
561 .iter()
562 .try_fold(0u64, |acc, &count| acc.checked_add(u64::from(count)))
563 .unwrap_or_else(|| {
564 panic!(
565 "megakernel tenant fairness total overflowed u64. Fix: shard tenant counters before telemetry aggregation."
566 )
567 });
568 let priority_fairness_total = self
569 .control
570 .priority_fairness
571 .iter()
572 .try_fold(0u64, |acc, &count| acc.checked_add(u64::from(count)))
573 .unwrap_or_else(|| {
574 panic!(
575 "megakernel priority fairness total overflowed u64. Fix: shard priority counters before telemetry aggregation."
576 )
577 });
578 let tenant_fairness_skew = fairness_skew(&self.control.tenant_fairness);
579 MegakernelRuntimeCounters {
580 total_slots,
581 queue_depth,
582 gpu_idle_slots,
583 gpu_idle_ppm,
584 frontier_density_bps,
585 occupancy_proxy_bps,
586 drained_slots: self.control.done_count,
587 unreclaimed_done_slots: self.occupancy.done,
588 tenant_fairness_total,
589 tenant_fairness_skew,
590 priority_fairness_total,
591 requeue_slots: self.occupancy.requeue,
592 fault_slots: self.occupancy.fault,
593 }
594 }
595
596 #[must_use]
599 pub fn health_since(&self, previous: &RingTelemetry) -> MegakernelWatchdogSnapshot {
600 let counters = self.runtime_counters();
601 let done_delta = self
602 .control
603 .done_count
604 .checked_sub(previous.control.done_count)
605 .unwrap_or_else(|| {
606 panic!(
607 "megakernel done counter moved backwards from {} to {}. Fix: treat counter reset/wrap as a new telemetry epoch.",
608 previous.control.done_count,
609 self.control.done_count
610 )
611 });
612 let suspected_stall =
613 counters.queue_depth > 0 && done_delta == 0 && counters.fault_slots == 0;
614 MegakernelWatchdogSnapshot {
615 done_delta,
616 queue_depth: counters.queue_depth,
617 fault_slots: counters.fault_slots,
618 requeue_slots: counters.requeue_slots,
619 gpu_idle_ppm: counters.gpu_idle_ppm,
620 suspected_stall,
621 }
622 }
623
624 pub fn recommend_launch(
630 &self,
631 mut request: MegakernelLaunchRequest,
632 ) -> Result<MegakernelLaunchRecommendation, vyre_driver::BackendError> {
633 let counters = self.runtime_counters();
634 if request.graph_node_count == 0 {
635 request.graph_node_count = counters.total_slots;
636 }
637 if request.graph_edge_count == 0 {
638 request.graph_edge_count = counters.queue_depth;
639 }
640 if request.frontier_density_bps == 0 {
641 request.frontier_density_bps = counters.frontier_density_bps;
642 }
643 request.hot_opcode_count = self
644 .control
645 .metrics
646 .iter()
647 .filter(|(_, count)| *count > 0)
648 .count()
649 .try_into()
650 .unwrap_or_else(|source| {
651 panic!(
652 "megakernel hot opcode count cannot fit u32: {source}. Fix: cap metrics slots at the protocol boundary."
653 )
654 });
655 request.hot_window_count = self
656 .windows
657 .iter()
658 .filter(|window| {
659 window
660 .required_slots
661 .checked_add(window.lookahead_slots)
662 .unwrap_or_else(|| {
663 panic!(
664 "megakernel route-window slot demand overflowed u32. Fix: shard route windows before telemetry aggregation."
665 )
666 })
667 >= 4
668 })
669 .count()
670 .try_into()
671 .unwrap_or_else(|source| {
672 panic!(
673 "megakernel hot window count cannot fit u32: {source}. Fix: shard telemetry windows before launch recommendation."
674 )
675 });
676 request.requeue_count = request
677 .requeue_count
678 .checked_add(u64::from(self.occupancy.requeue))
679 .unwrap_or_else(|| {
680 panic!(
681 "megakernel requeue count overflowed u64. Fix: shard telemetry windows before launch recommendation."
682 )
683 });
684 MegakernelLaunchPolicy::standard().recommend(request)
685 }
686}
687
688
689fn read_required_control_word(control_bytes: &[u8], word_idx: usize) -> Result<u32, PipelineError> {
690 read_word(control_bytes, word_idx).ok_or_else(|| {
691 PipelineError::Backend(format!(
692 "megakernel control snapshot is missing required word {word_idx}. Fix: capture the full control buffer before telemetry decode."
693 ))
694 })
695}
696
/// Expresses `numerator / denominator` in basis points (10,000 = 100%).
///
/// Returns `0` when `denominator` is zero. Results above `u16::MAX` saturate at `u16::MAX`.
fn density_bps(numerator: u32, denominator: u32) -> u16 {
    match denominator {
        0 => 0,
        denom => {
            let scaled = u64::from(numerator) * 10_000 / u64::from(denom);
            let clamped = scaled.min(u64::from(u16::MAX));
            u16::try_from(clamped).unwrap_or_else(|source| {
                panic!(
                    "megakernel density bps cannot fit u16 after clamp: {source}. Fix: repair density accounting."
                )
            })
        }
    }
}
708
709fn validate_telemetry_buffers(
710 control_bytes: &[u8],
711 ring_bytes: &[u8],
712) -> Result<(), PipelineError> {
713 validate_control_snapshot(control_bytes)?;
714 let slot_bytes = slot_byte_len();
715 if ring_bytes.len() % slot_bytes != 0 {
716 return Err(PipelineError::Backend(format!(
717 "megakernel ring snapshot has {} bytes, not a multiple of slot size {slot_bytes}. Fix: capture whole ring slots.",
718 ring_bytes.len()
719 )));
720 }
721 let slot_count = ring_bytes.len() / slot_bytes;
722 if u32::try_from(slot_count).is_err() {
723 return Err(PipelineError::Backend(format!(
724 "megakernel ring snapshot has {slot_count} slots, above the u32 telemetry ABI. Fix: shard ring snapshots before host decode."
725 )));
726 }
727 Ok(())
728}
729
730fn validate_control_snapshot(control_bytes: &[u8]) -> Result<(), PipelineError> {
731 let min_control = super::protocol::control_byte_len(0).ok_or_else(|| {
732 PipelineError::Backend(
733 "megakernel control length overflowed usize. Fix: keep protocol constants bounded."
734 .to_string(),
735 )
736 })?;
737 if control_bytes.len() < min_control || control_bytes.len() % 4 != 0 {
738 return Err(PipelineError::Backend(format!(
739 "megakernel control snapshot has {} bytes, expected at least {min_control} and 4-byte alignment. Fix: capture the full control buffer.",
740 control_bytes.len()
741 )));
742 }
743 Ok(())
744}
745
746fn slot_byte_len() -> usize {
747 SLOT_WORDS_USIZE.checked_mul(4).unwrap_or_else(|| {
748 panic!(
749 "megakernel telemetry slot byte width overflowed usize. Fix: keep SLOT_WORDS within host address space."
750 )
751 })
752}
753
/// Widens a telemetry `u32` to `usize`.
///
/// `label` identifies the value in the panic message.
///
/// # Panics
///
/// Panics when `value` does not fit the host `usize`.
fn telemetry_u32_to_usize(value: u32, label: &'static str) -> usize {
    match usize::try_from(value) {
        Ok(widened) => widened,
        Err(source) => panic!(
            "megakernel telemetry {label} value {value} cannot fit usize: {source}. Fix: shard telemetry buffers before host decode."
        ),
    }
}
761
762fn control_word_index(word: u32) -> Result<usize, PipelineError> {
763 usize::try_from(word).map_err(|source| {
764 PipelineError::Backend(format!(
765 "megakernel control word index {word} cannot fit usize: {source}. Fix: keep control ABI words within host address space."
766 ))
767 })
768}
769
770fn control_offset_index(base: u32, offset: u32) -> Result<usize, PipelineError> {
771 let word = base.checked_add(offset).ok_or_else(|| {
772 PipelineError::Backend(
773 "megakernel control word offset overflowed u32. Fix: shard telemetry arrays before host decode."
774 .to_string(),
775 )
776 })?;
777 control_word_index(word)
778}
779
/// Returns the spread between the largest and smallest nonzero counters.
///
/// Zero counters are ignored. Returns `0` when no nonzero counter exists.
fn fairness_skew(counters: &[u32]) -> u32 {
    let (min_nonzero, max) = counters
        .iter()
        .copied()
        .filter(|&count| count != 0)
        .fold((u32::MAX, 0_u32), |(lo, hi), count| (lo.min(count), hi.max(count)));
    if min_nonzero == u32::MAX {
        return 0;
    }
    max.checked_sub(min_nonzero).unwrap_or_else(|| {
        panic!(
            "megakernel fairness skew saw max {max} below min_nonzero {min_nonzero}. Fix: reject malformed fairness counters before telemetry aggregation."
        )
    })
}
799
// Unit tests live in `telemetry_tests.rs`; `include!` pastes that file verbatim into this
// test-only child module.
#[cfg(test)]
mod tests {
    include!("telemetry_tests.rs");
}
804