Skip to main content

vyre_runtime/megakernel/
telemetry.rs

1//! Host-side telemetry decoders for the megakernel ring and control buffers.
2//!
3//! The runtime already exposes low-level helpers such as
4//! `read_done_count`, `read_epoch`, and `read_metrics`. This module adds a
5//! single structured snapshot surface useful for wrappers like VyreOffload.
6
7use super::protocol::{control, slot, ARG0_WORD, OPCODE_WORD, STATUS_WORD, TENANT_WORD};
8use super::scaling::{
9    MegakernelLaunchPolicy, MegakernelLaunchRecommendation, MegakernelLaunchRequest,
10    PriorityRequeueAccounting,
11};
12use super::staging_reserve::{
13    reserve_hash_map_capacity, reserve_vec_capacity as reserve_target_capacity,
14};
15use crate::PipelineError;
16
17mod sketch;
18mod types;
19mod errors;
20pub use sketch::{CountMinSketch, SketchTelemetry, SketchTelemetryScratch};
21use types::WindowAccumulator;
22pub use types::{
23    ControlSnapshot, MegakernelRuntimeCounters, MegakernelRuntimeEvidence,
24    MegakernelWatchdogSnapshot, RingOccupancy, RingSlotSnapshot, RingStatus, RingTelemetry,
25    RuntimeEvidenceMetricCoverage, RuntimeEvidenceMetricFamily, TelemetryDecodeCapacityEvidence,
26    TelemetryDecodeScratch, WindowTelemetry, RUNTIME_IO_EVIDENCE_SCHEMA_VERSION,
27    TELEMETRY_DECODE_CAPACITY_SCHEMA_VERSION,
28};
29
/// Number of 32-bit words in one ring slot; each slot occupies
/// `SLOT_WORDS_USIZE * 4` bytes of the ring buffer.
const SLOT_WORDS_USIZE: usize = 16_usize;
31
/// Read the little-endian `u32` stored at word index `word_idx` of `buf`.
///
/// Returns `None` when the word lies (even partially) outside `buf`, or when
/// computing its byte range would overflow `usize`.
fn read_word(buf: &[u8], word_idx: usize) -> Option<u32> {
    let start = word_idx.checked_mul(4)?;
    let window = buf.get(start..start.checked_add(4)?)?;
    <[u8; 4]>::try_from(window).ok().map(u32::from_le_bytes)
}
38
39fn try_read_slot_chunk_word(slot_bytes: &[u8], word_idx: u32) -> Result<u32, PipelineError> {
40    let word_idx = telemetry_u32_to_usize(word_idx, "slot word index")?;
41    let off = word_idx.checked_mul(4).ok_or_else(|| {
42        errors::slot_word_offset_overflow()
43    })?;
44    let end = off.checked_add(4).ok_or_else(|| {
45        errors::slot_word_end_overflow()
46    })?;
47    let bytes = slot_bytes.get(off..end).ok_or_else(|| {
48        errors::missing_slot_word(word_idx, slot_bytes.len())
49    })?;
50    Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
51}
52
/// Returns `true` when `values` is strictly increasing, i.e. sorted in
/// ascending order with no duplicates. Empty and single-element slices
/// qualify trivially.
fn is_sorted_unique_u32(values: &[u32]) -> bool {
    values
        .iter()
        .zip(values.iter().skip(1))
        .all(|(prev, next)| prev < next)
}
56
57impl ControlSnapshot {
58    /// Decode a structured control-buffer view.
59    #[must_use]
60    #[cfg(any(test, feature = "legacy-infallible"))]
61    pub fn decode(control_bytes: &[u8]) -> Self {
62        match Self::try_decode(control_bytes) {
63            Ok(snapshot) => snapshot,
64            Err(_) => Self::default(),
65        }
66    }
67
68    /// Decode a structured control-buffer view into caller-owned storage.
69    #[cfg(any(test, feature = "legacy-infallible"))]
70    pub fn decode_into(control_bytes: &[u8], out: &mut Self) {
71        if Self::try_decode_into(control_bytes, out).is_err() {
72            *out = Self::default();
73        }
74    }
75
76    /// Strictly decode a structured control-buffer view into owned storage.
77    ///
78    /// # Errors
79    ///
80    /// Returns [`PipelineError`] when any fixed control word is missing from
81    /// the control snapshot.
82    pub fn try_decode(control_bytes: &[u8]) -> Result<Self, PipelineError> {
83        let mut out = Self::default();
84        Self::try_decode_into(control_bytes, &mut out)?;
85        Ok(out)
86    }
87
88    /// Strictly decode a structured control-buffer view.
89    ///
90    /// # Errors
91    ///
92    /// Returns [`PipelineError`] when any fixed control word is missing from
93    /// the control snapshot.
94    pub fn try_decode_into(control_bytes: &[u8], out: &mut Self) -> Result<(), PipelineError> {
95        validate_control_snapshot(control_bytes)?;
96        out.shutdown =
97            read_required_control_word(control_bytes, control_word_index(control::SHUTDOWN)?)? != 0;
98        out.done_count =
99            read_required_control_word(control_bytes, control_word_index(control::DONE_COUNT)?)?;
100        out.epoch = read_required_control_word(control_bytes, control_word_index(control::EPOCH)?)?;
101        out.metrics.clear();
102        reserve_target_capacity(
103            &mut out.metrics,
104            telemetry_u32_to_usize(control::METRICS_SLOTS, "metrics slot count")?,
105            "metrics",
106        )?;
107        for i in 0..control::METRICS_SLOTS {
108            let count = read_required_control_word(
109                control_bytes,
110                control_offset_index(control::METRICS_BASE, i)?,
111            )?;
112            if count > 0 {
113                out.metrics.push((i, count));
114            }
115        }
116        out.tenant_fairness.clear();
117        reserve_target_capacity(
118            &mut out.tenant_fairness,
119            telemetry_u32_to_usize(control::TENANT_FAIRNESS_SLOTS, "tenant fairness slot count")?,
120            "tenant fairness",
121        )?;
122        for i in 0..control::TENANT_FAIRNESS_SLOTS {
123            out.tenant_fairness.push(read_required_control_word(
124                control_bytes,
125                control_offset_index(control::TENANT_FAIRNESS_BASE, i)?,
126            )?);
127        }
128        out.priority_fairness.clear();
129        reserve_target_capacity(
130            &mut out.priority_fairness,
131            telemetry_u32_to_usize(
132                control::PRIORITY_FAIRNESS_SLOTS,
133                "priority fairness slot count",
134            )?,
135            "priority fairness",
136        )?;
137        for i in 0..control::PRIORITY_FAIRNESS_SLOTS {
138            out.priority_fairness.push(read_required_control_word(
139                control_bytes,
140                control_offset_index(control::PRIORITY_FAIRNESS_BASE, i)?,
141            )?);
142        }
143        Ok(())
144    }
145}
146
147impl RingTelemetry {
148    /// Decode the ring and control buffers into one structured snapshot.
149    #[must_use]
150    #[cfg(any(test, feature = "legacy-infallible"))]
151    pub fn decode(control_bytes: &[u8], ring_bytes: &[u8]) -> Self {
152        Self::decode_with_window_opcodes(control_bytes, ring_bytes, &[])
153    }
154
155    /// Strictly decode ring and control bytes after validating ABI alignment.
156    ///
157    /// # Errors
158    ///
159    /// Returns [`PipelineError`] when buffers are truncated or not aligned to
160    /// the megakernel wire protocol.
161    pub fn try_decode(control_bytes: &[u8], ring_bytes: &[u8]) -> Result<Self, PipelineError> {
162        Self::try_decode_with_window_opcodes(control_bytes, ring_bytes, &[])
163    }
164
165    /// Decode the ring and control buffers, additionally grouping any slots
166    /// whose opcode is present in `window_opcodes` into ticketed route-window
167    /// telemetry records.
168    #[must_use]
169    #[cfg(any(test, feature = "legacy-infallible"))]
170    pub fn decode_with_window_opcodes(
171        control_bytes: &[u8],
172        ring_bytes: &[u8],
173        window_opcodes: &[u32],
174    ) -> Self {
175        match Self::try_decode_with_window_opcodes(control_bytes, ring_bytes, window_opcodes) {
176            Ok(telemetry) => telemetry,
177            Err(_) => Self::default(),
178        }
179    }
180
181    /// Decode the ring and control buffers into caller-owned telemetry and
182    /// scratch storage.
183    #[cfg(any(test, feature = "legacy-infallible"))]
184    pub fn decode_with_window_opcodes_into(
185        control_bytes: &[u8],
186        ring_bytes: &[u8],
187        window_opcodes: &[u32],
188        out: &mut Self,
189        scratch: &mut TelemetryDecodeScratch,
190    ) {
191        Self::try_decode_with_window_opcodes_into(
192            control_bytes,
193            ring_bytes,
194            window_opcodes,
195            out,
196            scratch,
197        )
198        .unwrap_or_else(|_| {
199            *out = Self::default();
200            scratch.clear();
201        });
202    }
203
204    fn try_decode_with_window_opcodes_into_unchecked(
205        control_bytes: &[u8],
206        ring_bytes: &[u8],
207        window_opcodes: &[u32],
208        out: &mut Self,
209        scratch: &mut TelemetryDecodeScratch,
210    ) -> Result<(), PipelineError> {
211        enum WindowOpcodeMatcher<'a> {
212            None,
213            Single(u32),
214            DenseBitmap(u128),
215            SmallSlice(&'a [u32]),
216            LargeSlice(&'a [u32]),
217        }
218
219        ControlSnapshot::try_decode_into(control_bytes, &mut out.control)?;
220        let slot_count = ring_bytes.len() / slot_byte_len()?;
221        out.occupancy = RingOccupancy::default();
222        out.slots.clear();
223        reserve_target_capacity(&mut out.slots, slot_count, "ring slots")?;
224        out.windows.clear();
225        scratch.window_opcodes.clear();
226        scratch.windows.clear();
227        let window_opcode_lookup = if window_opcodes.is_empty() {
228            &[][..]
229        } else if is_sorted_unique_u32(window_opcodes) {
230            window_opcodes
231        } else {
232            reserve_target_capacity(
233                &mut scratch.window_opcodes,
234                window_opcodes.len(),
235                "window opcode scratch",
236            )?;
237            scratch.window_opcodes.extend_from_slice(window_opcodes);
238            scratch.window_opcodes.sort_unstable();
239            scratch.window_opcodes.dedup();
240            scratch.window_opcodes.as_slice()
241        };
242        let window_opcode_matcher = match window_opcode_lookup {
243            [] => WindowOpcodeMatcher::None,
244            [opcode] => WindowOpcodeMatcher::Single(*opcode),
245            opcodes if opcodes.len() > 1 && opcodes.iter().all(|opcode| *opcode < 128) => {
246                let bitmap = opcodes
247                    .iter()
248                    .fold(0_u128, |acc, &opcode| acc | (1_u128 << opcode));
249                WindowOpcodeMatcher::DenseBitmap(bitmap)
250            }
251            opcodes if opcodes.len() <= 8 => WindowOpcodeMatcher::SmallSlice(opcodes),
252            opcodes => WindowOpcodeMatcher::LargeSlice(opcodes),
253        };
254        if !matches!(window_opcode_matcher, WindowOpcodeMatcher::None) {
255            reserve_hash_map_capacity(
256                &mut scratch.windows,
257                slot_count,
258                "window accumulator scratch",
259            )?;
260        }
261        let decode_windows = !matches!(window_opcode_matcher, WindowOpcodeMatcher::None);
262
263        let slot_byte_len = slot_byte_len()?;
264        for (slot_idx, slot_bytes) in ring_bytes.chunks_exact(slot_byte_len).enumerate() {
265            let slot_idx = u32::try_from(slot_idx).map_err(|source| {
266                PipelineError::Backend(format!(
267                    "megakernel telemetry slot index cannot fit u32: {source}. Fix: shard ring snapshots before host decode."
268                ))
269            })?;
270            let status_raw = try_read_slot_chunk_word(slot_bytes, STATUS_WORD)?;
271            let status = RingStatus::from_raw(status_raw);
272            match status {
273                RingStatus::Empty => out.occupancy.empty += 1,
274                RingStatus::Published => out.occupancy.published += 1,
275                RingStatus::Claimed => out.occupancy.claimed += 1,
276                RingStatus::Done => out.occupancy.done += 1,
277                RingStatus::WaitIo => out.occupancy.wait_io += 1,
278                RingStatus::Yield => out.occupancy.yield_count += 1,
279                RingStatus::Requeue => out.occupancy.requeue += 1,
280                RingStatus::Fault => out.occupancy.fault += 1,
281                RingStatus::Unknown(_) => out.occupancy.unknown += 1,
282            }
283            let tenant_id = try_read_slot_chunk_word(slot_bytes, TENANT_WORD)?;
284            let opcode = try_read_slot_chunk_word(slot_bytes, OPCODE_WORD)?;
285            let args_prefix = [
286                try_read_slot_chunk_word(slot_bytes, ARG0_WORD)?,
287                try_read_slot_chunk_word(slot_bytes, ARG0_WORD + 1)?,
288                try_read_slot_chunk_word(slot_bytes, ARG0_WORD + 2)?,
289            ];
290            let is_window_opcode = match window_opcode_matcher {
291                WindowOpcodeMatcher::None => false,
292                WindowOpcodeMatcher::Single(expected) => opcode == expected,
293                WindowOpcodeMatcher::DenseBitmap(bitmap) => {
294                    opcode < 128 && ((bitmap >> opcode) & 1) == 1
295                }
296                WindowOpcodeMatcher::SmallSlice(window_opcodes) => window_opcodes.contains(&opcode),
297                WindowOpcodeMatcher::LargeSlice(window_opcodes) => {
298                    window_opcodes.binary_search(&opcode).is_ok()
299                }
300            };
301            if decode_windows && is_window_opcode {
302                let ticket = args_prefix[0];
303                let class_tag = args_prefix[1];
304                let entry =
305                    scratch
306                        .windows
307                        .entry((ticket, opcode))
308                        .or_insert_with(|| WindowAccumulator {
309                            tenant_id,
310                            opcode,
311                            ..WindowAccumulator::default()
312                        });
313                match class_tag {
314                    0 => entry.required_slots += 1,
315                    1 => entry.lookahead_slots += 1,
316                    _ => {}
317                }
318                match status {
319                    RingStatus::Published => entry.published += 1,
320                    RingStatus::Claimed => entry.claimed += 1,
321                    RingStatus::Done => entry.done += 1,
322                    RingStatus::WaitIo => entry.wait_io += 1,
323                    RingStatus::Yield => entry.yield_count += 1,
324                    RingStatus::Requeue => entry.requeue += 1,
325                    RingStatus::Fault => entry.fault += 1,
326                    RingStatus::Empty | RingStatus::Unknown(_) => {}
327                }
328            }
329            out.slots.push(RingSlotSnapshot {
330                slot_idx,
331                status,
332                tenant_id,
333                opcode,
334                args_prefix,
335            });
336        }
337
338        reserve_target_capacity(&mut out.windows, scratch.windows.len(), "window output")?;
339        for (&(ticket, _), acc) in &scratch.windows {
340            out.windows.push(WindowTelemetry {
341                ticket,
342                tenant_id: acc.tenant_id,
343                opcode: acc.opcode,
344                required_slots: acc.required_slots,
345                lookahead_slots: acc.lookahead_slots,
346                published: acc.published,
347                claimed: acc.claimed,
348                done: acc.done,
349                wait_io: acc.wait_io,
350                yield_count: acc.yield_count,
351                requeue: acc.requeue,
352                fault: acc.fault,
353            });
354        }
355        out.windows
356            .sort_unstable_by_key(|window| (window.ticket, window.opcode));
357        Ok(())
358    }
359
360    /// Strictly decode ring/control bytes and group selected window opcodes.
361    ///
362    /// # Errors
363    ///
364    /// Returns [`PipelineError`] when buffers are truncated or not aligned to
365    /// the megakernel wire protocol.
366    pub fn try_decode_with_window_opcodes(
367        control_bytes: &[u8],
368        ring_bytes: &[u8],
369        window_opcodes: &[u32],
370    ) -> Result<Self, PipelineError> {
371        validate_telemetry_buffers(control_bytes, ring_bytes)?;
372        let mut out = Self::default();
373        let mut scratch = TelemetryDecodeScratch::new();
374        Self::try_decode_with_window_opcodes_into_unchecked(
375            control_bytes,
376            ring_bytes,
377            window_opcodes,
378            &mut out,
379            &mut scratch,
380        )?;
381        Ok(out)
382    }
383
384    /// Strictly decode ring/control bytes into caller-owned telemetry and
385    /// scratch storage.
386    ///
387    /// # Errors
388    ///
389    /// Returns [`PipelineError`] when buffers are truncated or not aligned to
390    /// the megakernel wire protocol.
391    pub fn try_decode_with_window_opcodes_into(
392        control_bytes: &[u8],
393        ring_bytes: &[u8],
394        window_opcodes: &[u32],
395        out: &mut Self,
396        scratch: &mut TelemetryDecodeScratch,
397    ) -> Result<(), PipelineError> {
398        validate_telemetry_buffers(control_bytes, ring_bytes)?;
399        Self::try_decode_with_window_opcodes_into_unchecked(
400            control_bytes,
401            ring_bytes,
402            window_opcodes,
403            out,
404            scratch,
405        )?;
406        Ok(())
407    }
408
409    /// Active slots matching a given opcode.
410    #[must_use]
411    #[cfg(any(test, feature = "legacy-infallible"))]
412    pub fn active_slots_for_opcode(&self, opcode: u32) -> Vec<&RingSlotSnapshot> {
413        match self.try_active_slots_for_opcode(opcode) {
414            Ok(slots) => slots,
415            Err(_) => Vec::default(),
416        }
417    }
418
419    /// Active slots matching a given opcode with fallible output staging.
420    ///
421    /// # Errors
422    ///
423    /// Returns [`PipelineError`] when output storage cannot be reserved.
424    pub fn try_active_slots_for_opcode(
425        &self,
426        opcode: u32,
427    ) -> Result<Vec<&RingSlotSnapshot>, PipelineError> {
428        let mut out = Vec::default();
429        self.try_active_slots_for_opcode_into(opcode, &mut out)?;
430        Ok(out)
431    }
432
433    /// Active slots matching a given opcode as an iterator.
434    pub fn active_slots_for_opcode_iter(
435        &self,
436        opcode: u32,
437    ) -> impl Iterator<Item = &RingSlotSnapshot> {
438        self.slots
439            .iter()
440            .filter(move |slot| slot.opcode == opcode && slot.status.is_active())
441    }
442
443    /// Active slots matching a given opcode into caller-owned storage.
444    #[cfg(any(test, feature = "legacy-infallible"))]
445    pub fn active_slots_for_opcode_into<'a>(
446        &'a self,
447        opcode: u32,
448        out: &mut Vec<&'a RingSlotSnapshot>,
449    ) {
450        if self.try_active_slots_for_opcode_into(opcode, out).is_err() {
451            out.clear();
452        }
453    }
454
455    /// Active slots matching a given opcode into caller-owned storage.
456    ///
457    /// # Errors
458    ///
459    /// Returns [`PipelineError`] when output storage cannot be reserved.
460    pub fn try_active_slots_for_opcode_into<'a>(
461        &'a self,
462        opcode: u32,
463        out: &mut Vec<&'a RingSlotSnapshot>,
464    ) -> Result<(), PipelineError> {
465        out.clear();
466        reserve_target_capacity(out, self.slots.len(), "active slot output")?;
467        self.slots
468            .iter()
469            .filter(|slot| slot.opcode == opcode && slot.status.is_active())
470            .for_each(|slot| out.push(slot));
471        Ok(())
472    }
473
474    /// Unfinished ticketed windows.
475    #[must_use]
476    #[cfg(any(test, feature = "legacy-infallible"))]
477    pub fn active_windows(&self) -> Vec<&WindowTelemetry> {
478        match self.try_active_windows() {
479            Ok(windows) => windows,
480            Err(_) => Vec::default(),
481        }
482    }
483
484    /// Unfinished ticketed windows with fallible output staging.
485    ///
486    /// # Errors
487    ///
488    /// Returns [`PipelineError`] when output storage cannot be reserved.
489    pub fn try_active_windows(&self) -> Result<Vec<&WindowTelemetry>, PipelineError> {
490        let mut out = Vec::default();
491        self.try_active_windows_into(&mut out)?;
492        Ok(out)
493    }
494
495    /// Unfinished ticketed windows into caller-owned storage.
496    #[cfg(any(test, feature = "legacy-infallible"))]
497    pub fn active_windows_into<'a>(&'a self, out: &mut Vec<&'a WindowTelemetry>) {
498        if self.try_active_windows_into(out).is_err() {
499            out.clear();
500        }
501    }
502
503    /// Unfinished ticketed windows into caller-owned storage.
504    ///
505    /// # Errors
506    ///
507    /// Returns [`PipelineError`] when output storage cannot be reserved.
508    pub fn try_active_windows_into<'a>(
509        &'a self,
510        out: &mut Vec<&'a WindowTelemetry>,
511    ) -> Result<(), PipelineError> {
512        out.clear();
513        reserve_target_capacity(out, self.windows.len(), "active window output")?;
514        self.windows
515            .iter()
516            .filter(|window| window.is_active())
517            .for_each(|window| out.push(window));
518        Ok(())
519    }
520
521    /// Summarize priority requeue/aging pressure visible in the ring snapshot.
522    #[must_use]
523    pub fn priority_accounting(&self) -> PriorityRequeueAccounting {
524        PriorityRequeueAccounting {
525            requeue_count: u64::from(self.occupancy.requeue),
526            aged_promotions: 0,
527            max_priority_age: 0,
528        }
529    }
530
531    /// Aggregate queue, idle, fairness, and drain counters into one cheap
532    /// runtime snapshot for SRE dashboards and launch-policy feedback.
533    #[must_use]
534    #[cfg(any(test, feature = "legacy-infallible"))]
535    pub fn runtime_counters(&self) -> MegakernelRuntimeCounters {
536        match self.try_runtime_counters() {
537            Ok(counters) => counters,
538            Err(_) => zero_runtime_counters(),
539        }
540    }
541
542    /// Fallibly aggregate queue, idle, fairness, and drain counters.
543    ///
544    /// # Errors
545    ///
546    /// Returns [`PipelineError`] when counter aggregation overflows or decoded
547    /// telemetry contains an impossible relationship.
548    pub fn try_runtime_counters(&self) -> Result<MegakernelRuntimeCounters, PipelineError> {
549        let total_slots = self.occupancy.total_slots();
550        let queue_depth = self.occupancy.queue_depth();
551        let gpu_idle_slots = self.occupancy.empty;
552        let gpu_idle_ppm = if total_slots == 0 {
553            0
554        } else {
555            let raw_idle_ppm = (u64::from(gpu_idle_slots) * 1_000_000) / u64::from(total_slots);
556            raw_idle_ppm.min(1_000_000) as u32
557        };
558        let frontier_density_bps = try_density_bps(queue_depth, total_slots)?;
559        let active_slots = total_slots.saturating_sub(gpu_idle_slots);
560        let occupancy_proxy_bps = try_density_bps(active_slots, total_slots)?;
561        let tenant_fairness_total = try_sum_u32_as_u64(
562            &self.control.tenant_fairness,
563            "tenant fairness total",
564            "shard tenant counters before telemetry aggregation",
565        )?;
566        let priority_fairness_total = try_sum_u32_as_u64(
567            &self.control.priority_fairness,
568            "priority fairness total",
569            "shard priority counters before telemetry aggregation",
570        )?;
571        let tenant_fairness_skew = try_fairness_skew(&self.control.tenant_fairness)?;
572        Ok(MegakernelRuntimeCounters {
573            total_slots,
574            queue_depth,
575            gpu_idle_slots,
576            gpu_idle_ppm,
577            frontier_density_bps,
578            occupancy_proxy_bps,
579            drained_slots: self.control.done_count,
580            unreclaimed_done_slots: self.occupancy.done,
581            tenant_fairness_total,
582            tenant_fairness_skew,
583            priority_fairness_total,
584            requeue_slots: self.occupancy.requeue,
585            fault_slots: self.occupancy.fault,
586        })
587    }
588
589    /// Derive persistent-kernel health from two snapshots without polling the
590    /// device or synchronizing with the GPU.
591    #[must_use]
592    #[cfg(any(test, feature = "legacy-infallible"))]
593    pub fn health_since(&self, previous: &RingTelemetry) -> MegakernelWatchdogSnapshot {
594        match self.try_health_since(previous) {
595            Ok(snapshot) => snapshot,
596            Err(_) => zero_watchdog_snapshot(),
597        }
598    }
599
600    /// Fallibly derive persistent-kernel health from two snapshots.
601    ///
602    /// # Errors
603    ///
604    /// Returns [`PipelineError`] when counters wrap, move backwards, or cannot
605    /// be aggregated without overflow.
606    pub fn try_health_since(
607        &self,
608        previous: &RingTelemetry,
609    ) -> Result<MegakernelWatchdogSnapshot, PipelineError> {
610        let counters = self.try_runtime_counters()?;
611        let done_delta = self
612            .control
613            .done_count
614            .checked_sub(previous.control.done_count)
615            .ok_or_else(|| {
616                errors::done_counter_backwards(previous.control.done_count, self.control.done_count)
617            })?;
618        let suspected_stall =
619            counters.queue_depth > 0 && done_delta == 0 && counters.fault_slots == 0;
620        Ok(MegakernelWatchdogSnapshot {
621            done_delta,
622            queue_depth: counters.queue_depth,
623            fault_slots: counters.fault_slots,
624            requeue_slots: counters.requeue_slots,
625            gpu_idle_ppm: counters.gpu_idle_ppm,
626            suspected_stall,
627        })
628    }
629
630    /// Feed telemetry into the shared launch policy.
631    ///
632    /// # Errors
633    ///
634    /// Returns a backend error when the supplied adapter limits are malformed.
635    pub fn recommend_launch(
636        &self,
637        mut request: MegakernelLaunchRequest,
638    ) -> Result<MegakernelLaunchRecommendation, vyre_driver::BackendError> {
639        let counters = self
640            .try_runtime_counters()
641            .map_err(errors::launch_telemetry_failed)?;
642        if request.graph_node_count == 0 {
643            request.graph_node_count = counters.total_slots;
644        }
645        if request.graph_edge_count == 0 {
646            request.graph_edge_count = counters.queue_depth;
647        }
648        if request.frontier_density_bps == 0 {
649            request.frontier_density_bps = counters.frontier_density_bps;
650        }
651        request.hot_opcode_count = self
652            .control
653            .metrics
654            .iter()
655            .filter(|(_, count)| *count > 0)
656            .count()
657            .try_into()
658            .map_err(errors::hot_opcode_count_overflow)?;
659        let mut hot_window_count = 0usize;
660        for window in &self.windows {
661            let demand = window
662                .required_slots
663                .checked_add(window.lookahead_slots)
664                .ok_or_else(|| {
665                    errors::route_window_demand_overflow()
666                })?;
667            if demand >= 4 {
668                hot_window_count = hot_window_count.checked_add(1).ok_or_else(|| {
669                    errors::hot_window_count_overflow()
670                })?;
671            }
672        }
673        request.hot_window_count = hot_window_count
674            .try_into()
675            .map_err(errors::hot_window_count_too_wide)?;
676        request.requeue_count = request
677            .requeue_count
678            .checked_add(u64::from(self.occupancy.requeue))
679            .ok_or_else(errors::requeue_count_overflow)?;
680        MegakernelLaunchPolicy::standard().recommend(request)
681    }
682}
683
/// Runtime counters with every field set to zero.
///
/// Fallback value for the infallible `runtime_counters` accessor when
/// `try_runtime_counters` reports an error.
#[cfg(any(test, feature = "legacy-infallible"))]
fn zero_runtime_counters() -> MegakernelRuntimeCounters {
    MegakernelRuntimeCounters {
        // Slot and drain counts.
        total_slots: 0,
        queue_depth: 0,
        gpu_idle_slots: 0,
        requeue_slots: 0,
        fault_slots: 0,
        unreclaimed_done_slots: 0,
        drained_slots: 0,
        // Ratios.
        gpu_idle_ppm: 0,
        frontier_density_bps: 0,
        occupancy_proxy_bps: 0,
        // Fairness aggregates.
        tenant_fairness_total: 0,
        tenant_fairness_skew: 0,
        priority_fairness_total: 0,
    }
}
699        priority_fairness_total: 0,
700        requeue_slots: 0,
701        fault_slots: 0,
702    }
703}
704
705/// All-zero watchdog snapshot, returned by the infallible `health_since`
706/// accessor when the fallible derivation path reports an error.
707#[cfg(any(test, feature = "legacy-infallible"))]
708fn zero_watchdog_snapshot() -> MegakernelWatchdogSnapshot {
709    MegakernelWatchdogSnapshot {
710        done_delta: 0,
711        queue_depth: 0,
712        fault_slots: 0,
713        requeue_slots: 0,
714        gpu_idle_ppm: 0,
715        suspected_stall: false,
716    }
717}
718
719fn read_required_control_word(control_bytes: &[u8], word_idx: usize) -> Result<u32, PipelineError> {
720    read_word(control_bytes, word_idx).ok_or_else(|| errors::missing_control_word(word_idx))
721}
722
723fn try_density_bps(numerator: u32, denominator: u32) -> Result<u16, PipelineError> {
724    if denominator == 0 {
725        return Ok(0);
726    }
727    let bps = (u64::from(numerator) * 10_000) / u64::from(denominator);
728    u16::try_from(bps.min(u64::from(u16::MAX))).map_err(errors::density_bps_overflow)
729}
730
731fn validate_telemetry_buffers(
732    control_bytes: &[u8],
733    ring_bytes: &[u8],
734) -> Result<(), PipelineError> {
735    validate_control_snapshot(control_bytes)?;
736    let slot_bytes = slot_byte_len()?;
737    if ring_bytes.len() % slot_bytes != 0 {
738        return Err(errors::ring_slot_alignment(ring_bytes.len(), slot_bytes));
739    }
740    let slot_count = ring_bytes.len() / slot_bytes;
741    if u32::try_from(slot_count).is_err() {
742        return Err(errors::ring_slot_count_too_wide(slot_count));
743    }
744    Ok(())
745}
746
747fn validate_control_snapshot(control_bytes: &[u8]) -> Result<(), PipelineError> {
748    let min_control = super::protocol::control_byte_len(0).ok_or_else(|| {
749        errors::control_length_overflow()
750    })?;
751    if control_bytes.len() < min_control || control_bytes.len() % 4 != 0 {
752        return Err(errors::bad_control_snapshot(
753            control_bytes.len(),
754            min_control,
755        ));
756    }
757    Ok(())
758}
759
760fn slot_byte_len() -> Result<usize, PipelineError> {
761    SLOT_WORDS_USIZE.checked_mul(4).ok_or_else(|| {
762        errors::slot_byte_width_overflow()
763    })
764}
765
766fn telemetry_u32_to_usize(value: u32, label: &'static str) -> Result<usize, PipelineError> {
767    usize::try_from(value).map_err(|source| errors::telemetry_u32_to_usize(value, label, source))
768}
769
770fn control_word_index(word: u32) -> Result<usize, PipelineError> {
771    usize::try_from(word).map_err(|source| errors::control_word_index(word, source))
772}
773
774fn control_offset_index(base: u32, offset: u32) -> Result<usize, PipelineError> {
775    let word = base.checked_add(offset).ok_or_else(|| {
776        errors::control_word_offset_overflow()
777    })?;
778    control_word_index(word)
779}
780
781fn try_sum_u32_as_u64(
782    counters: &[u32],
783    label: &'static str,
784    fix: &'static str,
785) -> Result<u64, PipelineError> {
786    counters.iter().try_fold(0u64, |acc, &count| {
787        acc.checked_add(u64::from(count)).ok_or_else(|| {
788            errors::counter_sum_overflow(label, fix)
789        })
790    })
791}
792
793fn try_fairness_skew(counters: &[u32]) -> Result<u32, PipelineError> {
794    let mut min_nonzero = u32::MAX;
795    let mut max = 0u32;
796    for &count in counters {
797        if count != 0 {
798            min_nonzero = min_nonzero.min(count);
799            max = max.max(count);
800        }
801    }
802    if min_nonzero == u32::MAX {
803        Ok(0)
804    } else {
805        max.checked_sub(min_nonzero).ok_or_else(|| {
806            errors::fairness_skew_invalid(max, min_nonzero)
807        })
808    }
809}
810
811#[cfg(test)]
812mod tests {
813    include!("telemetry_tests.rs");
814}