// vyre_runtime/megakernel/telemetry.rs

//! Host-side telemetry decoders for the megakernel ring and control buffers.
//!
//! The runtime already exposes low-level helpers such as
//! `read_done_count`, `read_epoch`, and `read_metrics`. This module adds a
//! single structured snapshot surface useful for wrappers like VyreOffload.
6
7use super::protocol::{control, slot, ARG0_WORD, OPCODE_WORD, STATUS_WORD, TENANT_WORD};
8use super::scaling::{
9    MegakernelLaunchPolicy, MegakernelLaunchRecommendation, MegakernelLaunchRequest,
10    PriorityRequeueAccounting,
11};
12use super::staging_reserve::{
13    reserve_hash_map_capacity, reserve_vec_capacity as reserve_target_capacity,
14};
15use crate::PipelineError;
16
17mod sketch;
18mod types;
19pub use sketch::{CountMinSketch, SketchTelemetry, SketchTelemetryScratch};
20use types::WindowAccumulator;
21pub use types::{
22    ControlSnapshot, MegakernelRuntimeCounters, MegakernelWatchdogSnapshot, RingOccupancy,
23    RingSlotSnapshot, RingStatus, RingTelemetry, TelemetryDecodeScratch, WindowTelemetry,
24};
25
/// Number of 32-bit words in one ring slot as decoded by this module.
///
/// `slot_byte_len` multiplies this by four to get the byte stride used when
/// chunking a ring snapshot into slots.
const SLOT_WORDS_USIZE: usize = 16;
27
/// Reads the little-endian `u32` stored at word index `word_idx` of `buf`.
///
/// Returns `None` when the byte offset computation overflows `usize` or when
/// the four bytes of the word are not fully contained in `buf`.
fn read_word(buf: &[u8], word_idx: usize) -> Option<u32> {
    let start = word_idx.checked_mul(4)?;
    let end = start.checked_add(4)?;
    // `get` performs the bounds check; the slice is exactly four bytes long,
    // so the array conversion cannot fail in practice.
    let word: [u8; 4] = buf.get(start..end)?.try_into().ok()?;
    Some(u32::from_le_bytes(word))
}
34
35fn read_slot_chunk_word_exact(slot_bytes: &[u8], word_idx: u32) -> u32 {
36    let off = telemetry_u32_to_usize(word_idx, "slot word index")
37        .checked_mul(4)
38        .unwrap_or_else(|| {
39            panic!(
40                "megakernel telemetry slot word byte offset overflowed usize. Fix: keep slot word indices within host address space."
41            )
42        });
43    u32::from_le_bytes([
44        slot_bytes[off],
45        slot_bytes[off + 1],
46        slot_bytes[off + 2],
47        slot_bytes[off + 3],
48    ])
49}
50
/// Returns `true` when `values` is strictly ascending, i.e. sorted with no
/// duplicates.
///
/// Empty and single-element slices have no adjacent pairs and therefore
/// return `true`.
fn is_sorted_unique_u32(values: &[u32]) -> bool {
    values
        .iter()
        .zip(values.iter().skip(1))
        .all(|(prev, next)| prev < next)
}
54
55impl ControlSnapshot {
56    /// Decode a structured control-buffer view.
57    #[must_use]
58    pub fn decode(control_bytes: &[u8]) -> Self {
59        let mut out = Self::default();
60        Self::try_decode_into(control_bytes, &mut out).unwrap_or_else(|source| {
61            panic!(
62                "megakernel control telemetry decode failed: {source}. Fix: capture the full control buffer before telemetry decode."
63            )
64        });
65        out
66    }
67
68    /// Decode a structured control-buffer view into caller-owned storage.
69    pub fn decode_into(control_bytes: &[u8], out: &mut Self) {
70        Self::try_decode_into(control_bytes, out).unwrap_or_else(|source| {
71            panic!(
72                "megakernel control telemetry decode failed: {source}. Fix: capture the full control buffer before telemetry decode."
73            )
74        });
75    }
76
77    /// Strictly decode a structured control-buffer view.
78    ///
79    /// # Errors
80    ///
81    /// Returns [`PipelineError`] when any fixed control word is missing from
82    /// the control snapshot.
83    pub fn try_decode_into(control_bytes: &[u8], out: &mut Self) -> Result<(), PipelineError> {
84        validate_control_snapshot(control_bytes)?;
85        out.shutdown =
86            read_required_control_word(control_bytes, control_word_index(control::SHUTDOWN)?)? != 0;
87        out.done_count =
88            read_required_control_word(control_bytes, control_word_index(control::DONE_COUNT)?)?;
89        out.epoch = read_required_control_word(control_bytes, control_word_index(control::EPOCH)?)?;
90        out.metrics.clear();
91        reserve_target_capacity(
92            &mut out.metrics,
93            telemetry_u32_to_usize(control::METRICS_SLOTS, "metrics slot count"),
94            "metrics",
95        )?;
96        for i in 0..control::METRICS_SLOTS {
97            let count = read_required_control_word(
98                control_bytes,
99                control_offset_index(control::METRICS_BASE, i)?,
100            )?;
101            if count > 0 {
102                out.metrics.push((i, count));
103            }
104        }
105        out.tenant_fairness.clear();
106        reserve_target_capacity(
107            &mut out.tenant_fairness,
108            telemetry_u32_to_usize(control::TENANT_FAIRNESS_SLOTS, "tenant fairness slot count"),
109            "tenant fairness",
110        )?;
111        for i in 0..control::TENANT_FAIRNESS_SLOTS {
112            out.tenant_fairness.push(read_required_control_word(
113                control_bytes,
114                control_offset_index(control::TENANT_FAIRNESS_BASE, i)?,
115            )?);
116        }
117        out.priority_fairness.clear();
118        reserve_target_capacity(
119            &mut out.priority_fairness,
120            telemetry_u32_to_usize(
121                control::PRIORITY_FAIRNESS_SLOTS,
122                "priority fairness slot count",
123            ),
124            "priority fairness",
125        )?;
126        for i in 0..control::PRIORITY_FAIRNESS_SLOTS {
127            out.priority_fairness.push(read_required_control_word(
128                control_bytes,
129                control_offset_index(control::PRIORITY_FAIRNESS_BASE, i)?,
130            )?);
131        }
132        Ok(())
133    }
134}
135
136impl RingTelemetry {
137    /// Decode the ring and control buffers into one structured snapshot.
138    #[must_use]
139    pub fn decode(control_bytes: &[u8], ring_bytes: &[u8]) -> Self {
140        Self::decode_with_window_opcodes(control_bytes, ring_bytes, &[])
141    }
142
143    /// Strictly decode ring and control bytes after validating ABI alignment.
144    ///
145    /// # Errors
146    ///
147    /// Returns [`PipelineError`] when buffers are truncated or not aligned to
148    /// the megakernel wire protocol.
149    pub fn try_decode(control_bytes: &[u8], ring_bytes: &[u8]) -> Result<Self, PipelineError> {
150        Self::try_decode_with_window_opcodes(control_bytes, ring_bytes, &[])
151    }
152
153    /// Decode the ring and control buffers, additionally grouping any slots
154    /// whose opcode is present in `window_opcodes` into ticketed route-window
155    /// telemetry records.
156    #[must_use]
157    pub fn decode_with_window_opcodes(
158        control_bytes: &[u8],
159        ring_bytes: &[u8],
160        window_opcodes: &[u32],
161    ) -> Self {
162        validate_telemetry_buffers(control_bytes, ring_bytes).unwrap_or_else(|source| {
163            panic!(
164                "megakernel ring telemetry decode failed: {source}. Fix: capture full control and whole ring-slot buffers before telemetry decode."
165            )
166        });
167        let mut out = Self::default();
168        let mut scratch = TelemetryDecodeScratch::new();
169        Self::try_decode_with_window_opcodes_into_unchecked(
170            control_bytes,
171            ring_bytes,
172            window_opcodes,
173            &mut out,
174            &mut scratch,
175        )
176        .unwrap_or_else(|source| {
177            panic!(
178                "megakernel ring telemetry decode failed: {source}. Fix: capture full control and whole ring-slot buffers before telemetry decode."
179            )
180        });
181        out
182    }
183
184    /// Decode the ring and control buffers into caller-owned telemetry and
185    /// scratch storage.
186    pub fn decode_with_window_opcodes_into(
187        control_bytes: &[u8],
188        ring_bytes: &[u8],
189        window_opcodes: &[u32],
190        out: &mut Self,
191        scratch: &mut TelemetryDecodeScratch,
192    ) {
193        validate_telemetry_buffers(control_bytes, ring_bytes).unwrap_or_else(|source| {
194            panic!(
195                "megakernel ring telemetry decode failed: {source}. Fix: capture full control and whole ring-slot buffers before telemetry decode."
196            )
197        });
198        Self::try_decode_with_window_opcodes_into_unchecked(
199            control_bytes,
200            ring_bytes,
201            window_opcodes,
202            out,
203            scratch,
204        )
205        .unwrap_or_else(|source| {
206            panic!(
207                "megakernel ring telemetry decode failed: {source}. Fix: capture full control and whole ring-slot buffers before telemetry decode."
208            )
209        });
210    }
211
212    fn try_decode_with_window_opcodes_into_unchecked(
213        control_bytes: &[u8],
214        ring_bytes: &[u8],
215        window_opcodes: &[u32],
216        out: &mut Self,
217        scratch: &mut TelemetryDecodeScratch,
218    ) -> Result<(), PipelineError> {
219        enum WindowOpcodeMatcher<'a> {
220            None,
221            Single(u32),
222            DenseBitmap(u128),
223            SmallSlice(&'a [u32]),
224            LargeSlice(&'a [u32]),
225        }
226
227        ControlSnapshot::try_decode_into(control_bytes, &mut out.control)?;
228        let slot_count = ring_bytes.len() / slot_byte_len();
229        out.occupancy = RingOccupancy::default();
230        out.slots.clear();
231        reserve_target_capacity(&mut out.slots, slot_count, "ring slots")?;
232        out.windows.clear();
233        scratch.window_opcodes.clear();
234        scratch.windows.clear();
235        let window_opcode_lookup = if window_opcodes.is_empty() {
236            &[][..]
237        } else if is_sorted_unique_u32(window_opcodes) {
238            window_opcodes
239        } else {
240            reserve_target_capacity(
241                &mut scratch.window_opcodes,
242                window_opcodes.len(),
243                "window opcode scratch",
244            )?;
245            scratch.window_opcodes.extend_from_slice(window_opcodes);
246            scratch.window_opcodes.sort_unstable();
247            scratch.window_opcodes.dedup();
248            scratch.window_opcodes.as_slice()
249        };
250        let window_opcode_matcher = match window_opcode_lookup {
251            [] => WindowOpcodeMatcher::None,
252            [opcode] => WindowOpcodeMatcher::Single(*opcode),
253            opcodes if opcodes.len() > 1 && opcodes.iter().all(|opcode| *opcode < 128) => {
254                let bitmap = opcodes
255                    .iter()
256                    .fold(0_u128, |acc, &opcode| acc | (1_u128 << opcode));
257                WindowOpcodeMatcher::DenseBitmap(bitmap)
258            }
259            opcodes if opcodes.len() <= 8 => WindowOpcodeMatcher::SmallSlice(opcodes),
260            opcodes => WindowOpcodeMatcher::LargeSlice(opcodes),
261        };
262        if !matches!(window_opcode_matcher, WindowOpcodeMatcher::None) {
263            reserve_hash_map_capacity(
264                &mut scratch.windows,
265                slot_count,
266                "window accumulator scratch",
267            )?;
268        }
269        let decode_windows = !matches!(window_opcode_matcher, WindowOpcodeMatcher::None);
270
271        let slot_byte_len = slot_byte_len();
272        for (slot_idx, slot_bytes) in ring_bytes.chunks_exact(slot_byte_len).enumerate() {
273            let slot_idx = u32::try_from(slot_idx).unwrap_or_else(|source| {
274                panic!(
275                    "megakernel telemetry slot index cannot fit u32: {source}. Fix: shard ring snapshots before host decode."
276                )
277            });
278            let status_raw = read_slot_chunk_word_exact(slot_bytes, STATUS_WORD);
279            let status = RingStatus::from_raw(status_raw);
280            match status {
281                RingStatus::Empty => out.occupancy.empty += 1,
282                RingStatus::Published => out.occupancy.published += 1,
283                RingStatus::Claimed => out.occupancy.claimed += 1,
284                RingStatus::Done => out.occupancy.done += 1,
285                RingStatus::WaitIo => out.occupancy.wait_io += 1,
286                RingStatus::Yield => out.occupancy.yield_count += 1,
287                RingStatus::Requeue => out.occupancy.requeue += 1,
288                RingStatus::Fault => out.occupancy.fault += 1,
289                RingStatus::Unknown(_) => out.occupancy.unknown += 1,
290            }
291            let tenant_id = read_slot_chunk_word_exact(slot_bytes, TENANT_WORD);
292            let opcode = read_slot_chunk_word_exact(slot_bytes, OPCODE_WORD);
293            let args_prefix = [
294                read_slot_chunk_word_exact(slot_bytes, ARG0_WORD),
295                read_slot_chunk_word_exact(slot_bytes, ARG0_WORD + 1),
296                read_slot_chunk_word_exact(slot_bytes, ARG0_WORD + 2),
297            ];
298            let is_window_opcode = match window_opcode_matcher {
299                WindowOpcodeMatcher::None => false,
300                WindowOpcodeMatcher::Single(expected) => opcode == expected,
301                WindowOpcodeMatcher::DenseBitmap(bitmap) => {
302                    opcode < 128 && ((bitmap >> opcode) & 1) == 1
303                }
304                WindowOpcodeMatcher::SmallSlice(window_opcodes) => window_opcodes.contains(&opcode),
305                WindowOpcodeMatcher::LargeSlice(window_opcodes) => {
306                    window_opcodes.binary_search(&opcode).is_ok()
307                }
308            };
309            if decode_windows && is_window_opcode {
310                let ticket = args_prefix[0];
311                let class_tag = args_prefix[1];
312                let entry =
313                    scratch
314                        .windows
315                        .entry((ticket, opcode))
316                        .or_insert_with(|| WindowAccumulator {
317                            tenant_id,
318                            opcode,
319                            ..WindowAccumulator::default()
320                        });
321                match class_tag {
322                    0 => entry.required_slots += 1,
323                    1 => entry.lookahead_slots += 1,
324                    _ => {}
325                }
326                match status {
327                    RingStatus::Published => entry.published += 1,
328                    RingStatus::Claimed => entry.claimed += 1,
329                    RingStatus::Done => entry.done += 1,
330                    RingStatus::WaitIo => entry.wait_io += 1,
331                    RingStatus::Yield => entry.yield_count += 1,
332                    RingStatus::Requeue => entry.requeue += 1,
333                    RingStatus::Fault => entry.fault += 1,
334                    RingStatus::Empty | RingStatus::Unknown(_) => {}
335                }
336            }
337            out.slots.push(RingSlotSnapshot {
338                slot_idx,
339                status,
340                tenant_id,
341                opcode,
342                args_prefix,
343            });
344        }
345
346        reserve_target_capacity(&mut out.windows, scratch.windows.len(), "window output")?;
347        for (&(ticket, _), acc) in &scratch.windows {
348            out.windows.push(WindowTelemetry {
349                ticket,
350                tenant_id: acc.tenant_id,
351                opcode: acc.opcode,
352                required_slots: acc.required_slots,
353                lookahead_slots: acc.lookahead_slots,
354                published: acc.published,
355                claimed: acc.claimed,
356                done: acc.done,
357                wait_io: acc.wait_io,
358                yield_count: acc.yield_count,
359                requeue: acc.requeue,
360                fault: acc.fault,
361            });
362        }
363        out.windows
364            .sort_unstable_by_key(|window| (window.ticket, window.opcode));
365        Ok(())
366    }
367
368    /// Strictly decode ring/control bytes and group selected window opcodes.
369    ///
370    /// # Errors
371    ///
372    /// Returns [`PipelineError`] when buffers are truncated or not aligned to
373    /// the megakernel wire protocol.
374    pub fn try_decode_with_window_opcodes(
375        control_bytes: &[u8],
376        ring_bytes: &[u8],
377        window_opcodes: &[u32],
378    ) -> Result<Self, PipelineError> {
379        validate_telemetry_buffers(control_bytes, ring_bytes)?;
380        let mut out = Self::default();
381        let mut scratch = TelemetryDecodeScratch::new();
382        Self::try_decode_with_window_opcodes_into_unchecked(
383            control_bytes,
384            ring_bytes,
385            window_opcodes,
386            &mut out,
387            &mut scratch,
388        )?;
389        Ok(out)
390    }
391
392    /// Strictly decode ring/control bytes into caller-owned telemetry and
393    /// scratch storage.
394    ///
395    /// # Errors
396    ///
397    /// Returns [`PipelineError`] when buffers are truncated or not aligned to
398    /// the megakernel wire protocol.
399    pub fn try_decode_with_window_opcodes_into(
400        control_bytes: &[u8],
401        ring_bytes: &[u8],
402        window_opcodes: &[u32],
403        out: &mut Self,
404        scratch: &mut TelemetryDecodeScratch,
405    ) -> Result<(), PipelineError> {
406        validate_telemetry_buffers(control_bytes, ring_bytes)?;
407        Self::try_decode_with_window_opcodes_into_unchecked(
408            control_bytes,
409            ring_bytes,
410            window_opcodes,
411            out,
412            scratch,
413        )?;
414        Ok(())
415    }
416
417    /// Active slots matching a given opcode.
418    #[must_use]
419    pub fn active_slots_for_opcode(&self, opcode: u32) -> Vec<&RingSlotSnapshot> {
420        self.try_active_slots_for_opcode(opcode).unwrap_or_else(|source| {
421            panic!(
422                "megakernel active-slot telemetry query failed: {source}. Fix: decode into caller-owned reusable slot scratch."
423            )
424        })
425    }
426
427    /// Active slots matching a given opcode with fallible output staging.
428    ///
429    /// # Errors
430    ///
431    /// Returns [`PipelineError`] when output storage cannot be reserved.
432    pub fn try_active_slots_for_opcode(
433        &self,
434        opcode: u32,
435    ) -> Result<Vec<&RingSlotSnapshot>, PipelineError> {
436        let mut out = Vec::new();
437        self.try_active_slots_for_opcode_into(opcode, &mut out)?;
438        Ok(out)
439    }
440
441    /// Active slots matching a given opcode as an iterator.
442    pub fn active_slots_for_opcode_iter(
443        &self,
444        opcode: u32,
445    ) -> impl Iterator<Item = &RingSlotSnapshot> {
446        self.slots
447            .iter()
448            .filter(move |slot| slot.opcode == opcode && slot.status.is_active())
449    }
450
451    /// Active slots matching a given opcode into caller-owned storage.
452    pub fn active_slots_for_opcode_into<'a>(
453        &'a self,
454        opcode: u32,
455        out: &mut Vec<&'a RingSlotSnapshot>,
456    ) {
457        self.try_active_slots_for_opcode_into(opcode, out)
458            .unwrap_or_else(|source| {
459                panic!(
460                    "megakernel active-slot telemetry query failed: {source}. Fix: decode into caller-owned reusable slot scratch."
461                )
462            });
463    }
464
465    /// Active slots matching a given opcode into caller-owned storage.
466    ///
467    /// # Errors
468    ///
469    /// Returns [`PipelineError`] when output storage cannot be reserved.
470    pub fn try_active_slots_for_opcode_into<'a>(
471        &'a self,
472        opcode: u32,
473        out: &mut Vec<&'a RingSlotSnapshot>,
474    ) -> Result<(), PipelineError> {
475        out.clear();
476        reserve_target_capacity(out, self.slots.len(), "active slot output")?;
477        self.slots
478            .iter()
479            .filter(|slot| slot.opcode == opcode && slot.status.is_active())
480            .for_each(|slot| out.push(slot));
481        Ok(())
482    }
483
484    /// Unfinished ticketed windows.
485    #[must_use]
486    pub fn active_windows(&self) -> Vec<&WindowTelemetry> {
487        self.try_active_windows().unwrap_or_else(|source| {
488            panic!(
489                "megakernel active-window telemetry query failed: {source}. Fix: decode into caller-owned reusable window scratch."
490            )
491        })
492    }
493
494    /// Unfinished ticketed windows with fallible output staging.
495    ///
496    /// # Errors
497    ///
498    /// Returns [`PipelineError`] when output storage cannot be reserved.
499    pub fn try_active_windows(&self) -> Result<Vec<&WindowTelemetry>, PipelineError> {
500        let mut out = Vec::new();
501        self.try_active_windows_into(&mut out)?;
502        Ok(out)
503    }
504
505    /// Unfinished ticketed windows into caller-owned storage.
506    pub fn active_windows_into<'a>(&'a self, out: &mut Vec<&'a WindowTelemetry>) {
507        self.try_active_windows_into(out).unwrap_or_else(|source| {
508            panic!(
509                "megakernel active-window telemetry query failed: {source}. Fix: decode into caller-owned reusable window scratch."
510            )
511        });
512    }
513
514    /// Unfinished ticketed windows into caller-owned storage.
515    ///
516    /// # Errors
517    ///
518    /// Returns [`PipelineError`] when output storage cannot be reserved.
519    pub fn try_active_windows_into<'a>(
520        &'a self,
521        out: &mut Vec<&'a WindowTelemetry>,
522    ) -> Result<(), PipelineError> {
523        out.clear();
524        reserve_target_capacity(out, self.windows.len(), "active window output")?;
525        self.windows
526            .iter()
527            .filter(|window| window.is_active())
528            .for_each(|window| out.push(window));
529        Ok(())
530    }
531
532    /// Summarize priority requeue/aging pressure visible in the ring snapshot.
533    #[must_use]
534    pub fn priority_accounting(&self) -> PriorityRequeueAccounting {
535        PriorityRequeueAccounting {
536            requeue_count: u64::from(self.occupancy.requeue),
537            aged_promotions: 0,
538            max_priority_age: 0,
539        }
540    }
541
542    /// Aggregate queue, idle, fairness, and drain counters into one cheap
543    /// runtime snapshot for SRE dashboards and launch-policy feedback.
544    #[must_use]
545    pub fn runtime_counters(&self) -> MegakernelRuntimeCounters {
546        let total_slots = self.occupancy.total_slots();
547        let queue_depth = self.occupancy.queue_depth();
548        let gpu_idle_slots = self.occupancy.empty;
549        let gpu_idle_ppm = if total_slots == 0 {
550            0
551        } else {
552            let raw_idle_ppm = (u64::from(gpu_idle_slots) * 1_000_000) / u64::from(total_slots);
553            raw_idle_ppm.min(1_000_000) as u32
554        };
555        let frontier_density_bps = density_bps(queue_depth, total_slots);
556        let active_slots = total_slots.saturating_sub(gpu_idle_slots);
557        let occupancy_proxy_bps = density_bps(active_slots, total_slots);
558        let tenant_fairness_total = self
559            .control
560            .tenant_fairness
561            .iter()
562            .try_fold(0u64, |acc, &count| acc.checked_add(u64::from(count)))
563            .unwrap_or_else(|| {
564                panic!(
565                    "megakernel tenant fairness total overflowed u64. Fix: shard tenant counters before telemetry aggregation."
566                )
567            });
568        let priority_fairness_total = self
569            .control
570            .priority_fairness
571            .iter()
572            .try_fold(0u64, |acc, &count| acc.checked_add(u64::from(count)))
573            .unwrap_or_else(|| {
574                panic!(
575                    "megakernel priority fairness total overflowed u64. Fix: shard priority counters before telemetry aggregation."
576                )
577            });
578        let tenant_fairness_skew = fairness_skew(&self.control.tenant_fairness);
579        MegakernelRuntimeCounters {
580            total_slots,
581            queue_depth,
582            gpu_idle_slots,
583            gpu_idle_ppm,
584            frontier_density_bps,
585            occupancy_proxy_bps,
586            drained_slots: self.control.done_count,
587            unreclaimed_done_slots: self.occupancy.done,
588            tenant_fairness_total,
589            tenant_fairness_skew,
590            priority_fairness_total,
591            requeue_slots: self.occupancy.requeue,
592            fault_slots: self.occupancy.fault,
593        }
594    }
595
596    /// Derive persistent-kernel health from two snapshots without polling the
597    /// device or synchronizing with the GPU.
598    #[must_use]
599    pub fn health_since(&self, previous: &RingTelemetry) -> MegakernelWatchdogSnapshot {
600        let counters = self.runtime_counters();
601        let done_delta = self
602            .control
603            .done_count
604            .checked_sub(previous.control.done_count)
605            .unwrap_or_else(|| {
606                panic!(
607                    "megakernel done counter moved backwards from {} to {}. Fix: treat counter reset/wrap as a new telemetry epoch.",
608                    previous.control.done_count,
609                    self.control.done_count
610                )
611            });
612        let suspected_stall =
613            counters.queue_depth > 0 && done_delta == 0 && counters.fault_slots == 0;
614        MegakernelWatchdogSnapshot {
615            done_delta,
616            queue_depth: counters.queue_depth,
617            fault_slots: counters.fault_slots,
618            requeue_slots: counters.requeue_slots,
619            gpu_idle_ppm: counters.gpu_idle_ppm,
620            suspected_stall,
621        }
622    }
623
624    /// Feed telemetry into the shared launch policy.
625    ///
626    /// # Errors
627    ///
628    /// Returns a backend error when the supplied adapter limits are malformed.
629    pub fn recommend_launch(
630        &self,
631        mut request: MegakernelLaunchRequest,
632    ) -> Result<MegakernelLaunchRecommendation, vyre_driver::BackendError> {
633        let counters = self.runtime_counters();
634        if request.graph_node_count == 0 {
635            request.graph_node_count = counters.total_slots;
636        }
637        if request.graph_edge_count == 0 {
638            request.graph_edge_count = counters.queue_depth;
639        }
640        if request.frontier_density_bps == 0 {
641            request.frontier_density_bps = counters.frontier_density_bps;
642        }
643        request.hot_opcode_count = self
644            .control
645            .metrics
646            .iter()
647            .filter(|(_, count)| *count > 0)
648            .count()
649            .try_into()
650            .unwrap_or_else(|source| {
651                panic!(
652                    "megakernel hot opcode count cannot fit u32: {source}. Fix: cap metrics slots at the protocol boundary."
653                )
654            });
655        request.hot_window_count = self
656            .windows
657            .iter()
658            .filter(|window| {
659                window
660                    .required_slots
661                    .checked_add(window.lookahead_slots)
662                    .unwrap_or_else(|| {
663                        panic!(
664                            "megakernel route-window slot demand overflowed u32. Fix: shard route windows before telemetry aggregation."
665                        )
666                    })
667                    >= 4
668            })
669            .count()
670            .try_into()
671            .unwrap_or_else(|source| {
672                panic!(
673                    "megakernel hot window count cannot fit u32: {source}. Fix: shard telemetry windows before launch recommendation."
674                )
675            });
676        request.requeue_count = request
677            .requeue_count
678            .checked_add(u64::from(self.occupancy.requeue))
679            .unwrap_or_else(|| {
680                panic!(
681                    "megakernel requeue count overflowed u64. Fix: shard telemetry windows before launch recommendation."
682                )
683            });
684        MegakernelLaunchPolicy::standard().recommend(request)
685    }
686}
687
688
689fn read_required_control_word(control_bytes: &[u8], word_idx: usize) -> Result<u32, PipelineError> {
690    read_word(control_bytes, word_idx).ok_or_else(|| {
691        PipelineError::Backend(format!(
692            "megakernel control snapshot is missing required word {word_idx}. Fix: capture the full control buffer before telemetry decode."
693        ))
694    })
695}
696
/// Returns `numerator / denominator` in basis points (1/10_000), rounded
/// down.
///
/// A zero denominator yields `0`. Ratios too large for `u16` saturate at
/// `u16::MAX`.
fn density_bps(numerator: u32, denominator: u32) -> u16 {
    if denominator == 0 {
        return 0;
    }
    // Widen to u64 first: u32::MAX * 10_000 cannot overflow there.
    let bps = u64::from(numerator) * 10_000 / u64::from(denominator);
    u16::try_from(bps).unwrap_or(u16::MAX)
}
708
709fn validate_telemetry_buffers(
710    control_bytes: &[u8],
711    ring_bytes: &[u8],
712) -> Result<(), PipelineError> {
713    validate_control_snapshot(control_bytes)?;
714    let slot_bytes = slot_byte_len();
715    if ring_bytes.len() % slot_bytes != 0 {
716        return Err(PipelineError::Backend(format!(
717            "megakernel ring snapshot has {} bytes, not a multiple of slot size {slot_bytes}. Fix: capture whole ring slots.",
718            ring_bytes.len()
719        )));
720    }
721    let slot_count = ring_bytes.len() / slot_bytes;
722    if u32::try_from(slot_count).is_err() {
723        return Err(PipelineError::Backend(format!(
724            "megakernel ring snapshot has {slot_count} slots, above the u32 telemetry ABI. Fix: shard ring snapshots before host decode."
725        )));
726    }
727    Ok(())
728}
729
730fn validate_control_snapshot(control_bytes: &[u8]) -> Result<(), PipelineError> {
731    let min_control = super::protocol::control_byte_len(0).ok_or_else(|| {
732        PipelineError::Backend(
733            "megakernel control length overflowed usize. Fix: keep protocol constants bounded."
734                .to_string(),
735        )
736    })?;
737    if control_bytes.len() < min_control || control_bytes.len() % 4 != 0 {
738        return Err(PipelineError::Backend(format!(
739            "megakernel control snapshot has {} bytes, expected at least {min_control} and 4-byte alignment. Fix: capture the full control buffer.",
740            control_bytes.len()
741        )));
742    }
743    Ok(())
744}
745
746fn slot_byte_len() -> usize {
747    SLOT_WORDS_USIZE.checked_mul(4).unwrap_or_else(|| {
748        panic!(
749            "megakernel telemetry slot byte width overflowed usize. Fix: keep SLOT_WORDS within host address space."
750        )
751    })
752}
753
/// Converts a `u32` telemetry quantity to `usize`.
///
/// `label` names the quantity in the panic message.
///
/// # Panics
///
/// Panics when `value` does not fit the host `usize`.
fn telemetry_u32_to_usize(value: u32, label: &'static str) -> usize {
    match usize::try_from(value) {
        Ok(converted) => converted,
        Err(source) => panic!(
            "megakernel telemetry {label} value {value} cannot fit usize: {source}. Fix: shard telemetry buffers before host decode."
        ),
    }
}
761
762fn control_word_index(word: u32) -> Result<usize, PipelineError> {
763    usize::try_from(word).map_err(|source| {
764        PipelineError::Backend(format!(
765            "megakernel control word index {word} cannot fit usize: {source}. Fix: keep control ABI words within host address space."
766        ))
767    })
768}
769
770fn control_offset_index(base: u32, offset: u32) -> Result<usize, PipelineError> {
771    let word = base.checked_add(offset).ok_or_else(|| {
772        PipelineError::Backend(
773            "megakernel control word offset overflowed u32. Fix: shard telemetry arrays before host decode."
774                .to_string(),
775        )
776    })?;
777    control_word_index(word)
778}
779
/// Returns the spread between the largest and the smallest non-zero counter.
///
/// Zero counters are ignored. The result is `0` when `counters` has no
/// non-zero entry, and also when all non-zero entries are equal.
fn fairness_skew(counters: &[u32]) -> u32 {
    let mut nonzero = counters.iter().copied().filter(|&count| count != 0);
    let first = match nonzero.next() {
        Some(count) => count,
        None => return 0,
    };
    // Seeding both bounds with the first value keeps `min <= max` true
    // throughout, so the final subtraction cannot underflow.
    let (min, max) = nonzero.fold((first, first), |(min, max), count| {
        (min.min(count), max.max(count))
    });
    max - min
}
799
// The test bodies are spliced in from `telemetry_tests.rs` at compile time
// and are only compiled for test builds.
#[cfg(test)]
mod tests {
    include!("telemetry_tests.rs");
}
804