Skip to main content

vyre_runtime/megakernel/telemetry/
types.rs

1use super::slot;
2use rustc_hash::FxHashMap;
3
4/// Decoded top-level ring slot state.
5#[derive(Debug, Clone, Copy, PartialEq, Eq)]
6pub enum RingStatus {
7    /// Slot is free.
8    Empty,
9    /// Slot is published and waiting for a worker.
10    Published,
11    /// Slot has been claimed by a worker.
12    Claimed,
13    /// Slot completed and can be recycled.
14    Done,
15    /// Slot is waiting for an asynchronous IO continuation.
16    WaitIo,
17    /// Slot yielded execution back to the scheduler.
18    Yield,
19    /// Slot is heavily contested and has been requeued.
20    Requeue,
21    /// Slot hit a hardware or software fault constraint.
22    Fault,
23    /// Unknown raw wire value.
24    Unknown(u32),
25}
26
27impl RingStatus {
28    #[must_use]
29    pub(super) fn from_raw(raw: u32) -> Self {
30        match raw {
31            slot::EMPTY => Self::Empty,
32            slot::PUBLISHED => Self::Published,
33            slot::CLAIMED => Self::Claimed,
34            slot::DONE => Self::Done,
35            slot::WAIT_IO => Self::WaitIo,
36            slot::YIELD => Self::Yield,
37            slot::REQUEUE => Self::Requeue,
38            slot::FAULT => Self::Fault,
39            other => Self::Unknown(other),
40        }
41    }
42
43    /// Raw wire discriminant for sketching, replay, and compact telemetry.
44    #[must_use]
45    pub const fn raw(self) -> u32 {
46        match self {
47            Self::Empty => slot::EMPTY,
48            Self::Published => slot::PUBLISHED,
49            Self::Claimed => slot::CLAIMED,
50            Self::Done => slot::DONE,
51            Self::WaitIo => slot::WAIT_IO,
52            Self::Yield => slot::YIELD,
53            Self::Requeue => slot::REQUEUE,
54            Self::Fault => slot::FAULT,
55            Self::Unknown(raw) => raw,
56        }
57    }
58
59    /// Whether this status still represents in-flight work rather than a
60    /// terminal slot outcome.
61    #[must_use]
62    pub const fn is_active(self) -> bool {
63        matches!(
64            self,
65            Self::Published | Self::Claimed | Self::WaitIo | Self::Yield | Self::Requeue
66        )
67    }
68}
69
70/// Snapshot of one ring slot.
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct RingSlotSnapshot {
73    /// Zero-based slot index.
74    pub slot_idx: u32,
75    /// Current state.
76    pub status: RingStatus,
77    /// Tenant id assigned to the slot.
78    pub tenant_id: u32,
79    /// Top-level opcode currently stored in the slot.
80    pub opcode: u32,
81    /// First three argument words, useful for quick debugging.
82    pub args_prefix: [u32; 3],
83}
84
/// Per-ticket aggregate of slot counts for one ticketed route window.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WindowTelemetry {
    /// Stable ticket id encoded in `arg0`.
    pub ticket: u32,
    /// Tenant id shared by every slot emitted for this window.
    pub tenant_id: u32,
    /// Opcode shared by the window payload slots.
    pub opcode: u32,
    /// How many required slots the window has.
    pub required_slots: u32,
    /// How many lookahead slots the window has.
    pub lookahead_slots: u32,
    /// Slots currently published.
    pub published: u32,
    /// Slots currently claimed.
    pub claimed: u32,
    /// Slots that completed.
    pub done: u32,
    /// Slots waiting for I/O.
    pub wait_io: u32,
    /// Slots that yielded.
    pub yield_count: u32,
    /// Slots that were requeued.
    pub requeue: u32,
    /// Slots that faulted.
    pub fault: u32,
}

impl WindowTelemetry {
    /// Whether this ticket still has unfinished work in the ring.
    ///
    /// The window is active as soon as any of the published, claimed,
    /// wait-IO, yield, or requeue counters is non-zero. The `done` and
    /// `fault` counters do not contribute.
    #[must_use]
    pub const fn is_active(&self) -> bool {
        let all_in_flight_counters_zero = self.published == 0
            && self.claimed == 0
            && self.wait_io == 0
            && self.yield_count == 0
            && self.requeue == 0;
        !all_in_flight_counters_zero
    }
}
125
/// Per-status slot counts across the ring.
///
/// The derived `Default` yields all-zero counts.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct RingOccupancy {
    /// Empty slots.
    pub empty: u32,
    /// Published slots.
    pub published: u32,
    /// Claimed slots.
    pub claimed: u32,
    /// Done slots.
    pub done: u32,
    /// Slots waiting for IO.
    pub wait_io: u32,
    /// Yielded slots.
    pub yield_count: u32,
    /// Requeued slots.
    pub requeue: u32,
    /// Faulted slots.
    pub fault: u32,
    /// Slots whose raw status value was not recognized.
    pub unknown: u32,
}
148
149impl RingOccupancy {
150    /// Total slots represented by this occupancy snapshot.
151    #[must_use]
152    pub fn total_slots(&self) -> u32 {
153        checked_status_sum(
154            [
155                self.empty,
156                self.published,
157                self.claimed,
158                self.done,
159                self.wait_io,
160                self.yield_count,
161                self.requeue,
162                self.fault,
163                self.unknown,
164            ],
165            "total ring slots",
166        )
167    }
168
169    /// Host-visible active queue depth: all non-empty slots that are not done.
170    #[must_use]
171    pub fn queue_depth(&self) -> u32 {
172        checked_status_sum(
173            [
174                self.published,
175                self.claimed,
176                self.wait_io,
177                self.yield_count,
178                self.requeue,
179                self.fault,
180                self.unknown,
181            ],
182            "ring queue depth",
183        )
184    }
185}
186
/// Version of the IO/runtime evidence schema emitted from megakernel
/// telemetry.
pub const RUNTIME_IO_EVIDENCE_SCHEMA_VERSION: u32 = 1;
189
/// Metric families that runtime IO/residency evidence is required to carry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeEvidenceMetricFamily {
    /// Ring occupancy metrics.
    Ring,
    /// Control-buffer decode metrics.
    Control,
    /// Host/device copy accounting metrics.
    Copy,
    /// Resident device-byte metrics.
    Residency,
}

impl RuntimeEvidenceMetricFamily {
    /// Stable lowercase token naming this evidence family.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        let token = match self {
            Self::Residency => "residency",
            Self::Copy => "copy",
            Self::Control => "control",
            Self::Ring => "ring",
        };
        token
    }
}
215
216/// Coverage bits for the required runtime evidence metric families.
217#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
218pub struct RuntimeEvidenceMetricCoverage {
219    /// Ring occupancy metrics are present.
220    pub ring: bool,
221    /// Control-buffer decode metrics are present.
222    pub control: bool,
223    /// Host/device copy accounting metrics are present.
224    pub copy: bool,
225    /// Resident device-byte metrics are present.
226    pub residency: bool,
227}
228
229impl RuntimeEvidenceMetricCoverage {
230    /// Coverage with every runtime evidence family present.
231    #[must_use]
232    pub const fn complete() -> Self {
233        Self {
234            ring: true,
235            control: true,
236            copy: true,
237            residency: true,
238        }
239    }
240
241    /// Missing required metric families.
242    #[must_use]
243    pub fn missing_families(self) -> Vec<RuntimeEvidenceMetricFamily> {
244        let mut missing = Vec::new();
245        if !self.ring {
246            missing.push(RuntimeEvidenceMetricFamily::Ring);
247        }
248        if !self.control {
249            missing.push(RuntimeEvidenceMetricFamily::Control);
250        }
251        if !self.copy {
252            missing.push(RuntimeEvidenceMetricFamily::Copy);
253        }
254        if !self.residency {
255            missing.push(RuntimeEvidenceMetricFamily::Residency);
256        }
257        missing
258    }
259}
260
261/// Runtime-owned IO/residency evidence envelope for release and benchmark artifacts.
262#[derive(Debug, Clone, PartialEq, Eq)]
263pub struct MegakernelRuntimeEvidence {
264    /// Runtime evidence schema version.
265    pub schema_version: u32,
266    /// Device-resident bytes retained by the dispatch family.
267    pub resident_device_bytes: u64,
268    /// Host-visible copy bytes still required for this evidence sample.
269    pub host_copy_bytes: u64,
270    /// Host copy bytes avoided by resident handles or device-side IO.
271    pub host_copy_avoided_bytes: u64,
272    /// Ring occupancy snapshot.
273    pub ring_occupancy: RingOccupancy,
274    /// Control-buffer decode cost in nanoseconds.
275    pub control_decode_ns: u64,
276    /// Ring decode cost in nanoseconds.
277    pub ring_decode_ns: u64,
278    /// Required metric-family coverage.
279    pub coverage: RuntimeEvidenceMetricCoverage,
280}
281
282impl MegakernelRuntimeEvidence {
283    /// Construct a complete runtime evidence envelope.
284    #[must_use]
285    pub const fn complete(
286        resident_device_bytes: u64,
287        host_copy_bytes: u64,
288        host_copy_avoided_bytes: u64,
289        ring_occupancy: RingOccupancy,
290        control_decode_ns: u64,
291        ring_decode_ns: u64,
292    ) -> Self {
293        Self {
294            schema_version: RUNTIME_IO_EVIDENCE_SCHEMA_VERSION,
295            resident_device_bytes,
296            host_copy_bytes,
297            host_copy_avoided_bytes,
298            ring_occupancy,
299            control_decode_ns,
300            ring_decode_ns,
301            coverage: RuntimeEvidenceMetricCoverage::complete(),
302        }
303    }
304
305    /// Metric families missing from this evidence envelope.
306    #[must_use]
307    pub fn missing_metric_families(&self) -> Vec<RuntimeEvidenceMetricFamily> {
308        self.coverage.missing_families()
309    }
310
311    /// Whether all required runtime evidence families are present.
312    #[must_use]
313    pub fn is_complete(&self) -> bool {
314        self.schema_version == RUNTIME_IO_EVIDENCE_SCHEMA_VERSION
315            && self.missing_metric_families().is_empty()
316    }
317
318    /// Avoided host-copy bytes in basis points of total relevant copy volume.
319    #[must_use]
320    pub fn host_copy_avoidance_bps(&self) -> u16 {
321        let total = u128::from(self.host_copy_bytes)
322            .saturating_add(u128::from(self.host_copy_avoided_bytes));
323        if total == 0 {
324            return 0;
325        }
326        let bps = u128::from(self.host_copy_avoided_bytes)
327            .saturating_mul(10_000)
328            / total;
329        bps.min(10_000) as u16
330    }
331}
332
/// Add up a fixed-size set of status counters, clamping at `u32::MAX`
/// instead of wrapping.
///
/// `label` is accepted but currently unused; the callers in this file pass a
/// short description of the quantity being summed.
fn checked_status_sum<const N: usize>(values: [u32; N], label: &'static str) -> u32 {
    let _ = label;
    let mut total = 0_u32;
    for count in values.iter().copied() {
        total = total.saturating_add(count);
    }
    total
}
339
#[cfg(test)]
mod evidence_tests {
    use super::*;

    /// An envelope whose coverage lacks two families reports exactly those
    /// two, in declaration order, and is not complete.
    #[test]
    fn runtime_evidence_reports_missing_metric_families() {
        let partial_coverage = RuntimeEvidenceMetricCoverage {
            ring: true,
            control: false,
            copy: true,
            residency: false,
        };
        let evidence = MegakernelRuntimeEvidence {
            schema_version: RUNTIME_IO_EVIDENCE_SCHEMA_VERSION,
            resident_device_bytes: 0,
            host_copy_bytes: 0,
            host_copy_avoided_bytes: 0,
            ring_occupancy: RingOccupancy::default(),
            control_decode_ns: 0,
            ring_decode_ns: 0,
            coverage: partial_coverage,
        };

        let missing: Vec<&'static str> = evidence
            .missing_metric_families()
            .iter()
            .map(|family| family.as_str())
            .collect();

        assert_eq!(missing, vec!["control", "residency"]);
        assert!(!evidence.is_complete());
    }

    /// A complete envelope exposes its inputs, the occupancy sums, and the
    /// copy-avoidance ratio.
    #[test]
    fn runtime_evidence_records_copy_avoidance_and_occupancy() {
        let occupancy = RingOccupancy {
            empty: 1,
            published: 2,
            claimed: 3,
            done: 4,
            wait_io: 5,
            yield_count: 6,
            requeue: 7,
            fault: 8,
            unknown: 9,
        };
        let evidence = MegakernelRuntimeEvidence::complete(4096, 1024, 3072, occupancy, 11, 13);

        assert!(evidence.is_complete());
        assert_eq!(evidence.resident_device_bytes, 4096);
        assert_eq!(evidence.ring_occupancy.total_slots(), 45);
        assert_eq!(evidence.ring_occupancy.queue_depth(), 40);
        assert_eq!(evidence.host_copy_avoidance_bps(), 7500);
    }
}
400
/// Structured view of the control buffer.
///
/// The derived `Default` yields a cleared flag, zero counters, and empty
/// vectors.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ControlSnapshot {
    /// The shutdown flag.
    pub shutdown: bool,
    /// Total number of drained slots.
    pub done_count: u32,
    /// Epoch value (batch fences).
    pub epoch: u32,
    /// Non-zero opcode metrics.
    // NOTE(review): presumably `(opcode, value)` pairs; confirm against the decoder.
    pub metrics: Vec<(u32, u32)>,
    /// Cumulative per-tenant fairness counters.
    pub tenant_fairness: Vec<u32>,
    /// Cumulative per-priority fairness counters.
    pub priority_fairness: Vec<u32>,
}
417
/// Runtime performance counters aggregated from a single telemetry snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MegakernelRuntimeCounters {
    /// Total number of ring slots the snapshot represents.
    pub total_slots: u32,
    /// Active queue depth: published/claimed/waiting/requeued/fault/unknown slots.
    // NOTE(review): `RingOccupancy::queue_depth` also counts yielded slots;
    // confirm whether this field is filled from it.
    pub queue_depth: u32,
    /// Empty ring slots, used as the host-visible idle-capacity signal.
    pub gpu_idle_slots: u32,
    /// Idle slots, in parts per million of the ring size.
    pub gpu_idle_ppm: u32,
    /// Active frontier density, in basis points of the ring size.
    pub frontier_density_bps: u16,
    /// Occupancy proxy in basis points: non-idle slots divided by total slots.
    pub occupancy_proxy_bps: u16,
    /// Total slots the GPU has drained according to the control buffer.
    pub drained_slots: u32,
    /// Done slots visible in the ring snapshot and pending reclaim.
    pub unreclaimed_done_slots: u32,
    /// Sum of the tenant fairness counters.
    pub tenant_fairness_total: u64,
    /// Max minus min non-zero tenant fairness counter.
    pub tenant_fairness_skew: u32,
    /// Sum of the priority fairness counters.
    pub priority_fairness_total: u64,
    /// Requeued slots visible in the ring.
    pub requeue_slots: u32,
    /// Faulted slots visible in the ring.
    pub fault_slots: u32,
}
448
/// Watchdog view derived from two host-visible telemetry snapshots.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MegakernelWatchdogSnapshot {
    /// Growth in drained slots from the previous snapshot to the current one.
    pub done_delta: u32,
    /// Active queue depth in the current snapshot.
    pub queue_depth: u32,
    /// Faulted slots in the current snapshot.
    pub fault_slots: u32,
    /// Requeued slots in the current snapshot.
    pub requeue_slots: u32,
    /// Idle slots in the current snapshot, in parts per million.
    pub gpu_idle_ppm: u32,
    /// Set when work remains queued but no drain progress was observed.
    pub suspected_stall: bool,
}
465
466/// Combined host-visible telemetry for a megakernel run.
467#[derive(Debug, Clone, PartialEq, Eq, Default)]
468pub struct RingTelemetry {
469    /// Decoded control-buffer snapshot.
470    pub control: ControlSnapshot,
471    /// Occupancy summary.
472    pub occupancy: RingOccupancy,
473    /// All decoded slots.
474    pub slots: Vec<RingSlotSnapshot>,
475    /// Decoded ticketed windows for any caller-specified window opcodes.
476    pub windows: Vec<WindowTelemetry>,
477}
478
/// Version of the telemetry decode capacity evidence schema.
pub const TELEMETRY_DECODE_CAPACITY_SCHEMA_VERSION: u32 = 1;

/// Evidence that a telemetry decode ran on caller-owned output and scratch
/// buffers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TelemetryDecodeCapacityEvidence {
    /// Schema version of this evidence record.
    pub schema_version: u32,
    /// How many ring slots were decoded into the output snapshot.
    pub decoded_slot_count: usize,
    /// Capacity of the caller-owned ring-slot output buffer.
    pub slot_output_capacity: usize,
    /// How many route-window rows were decoded into the output snapshot.
    pub decoded_window_count: usize,
    /// Capacity of the caller-owned route-window output buffer.
    pub window_output_capacity: usize,
    /// Capacity retained for the sorted window-opcode scratch.
    pub window_opcode_scratch_capacity: usize,
    /// Capacity retained for the route-window accumulator scratch.
    pub window_accumulator_scratch_capacity: usize,
    /// Set when the evidence was produced from caller-owned scratch.
    pub uses_caller_owned_scratch: bool,
}

impl TelemetryDecodeCapacityEvidence {
    /// Whether the record is current, caller-owned, and has enough capacity
    /// for the decoded rows.
    ///
    /// Requires the current schema version, `uses_caller_owned_scratch`, a
    /// slot output capacity of at least `decoded_slot_count`, and window
    /// output and accumulator capacities of at least `decoded_window_count`.
    /// `window_opcode_scratch_capacity` is not part of the check.
    #[must_use]
    pub fn is_complete(self) -> bool {
        let provenance_ok = self.schema_version == TELEMETRY_DECODE_CAPACITY_SCHEMA_VERSION
            && self.uses_caller_owned_scratch;
        if !provenance_ok {
            return false;
        }

        let slots_fit = self.decoded_slot_count <= self.slot_output_capacity;
        let windows_fit = self.decoded_window_count <= self.window_output_capacity;
        let accumulators_fit =
            self.decoded_window_count <= self.window_accumulator_scratch_capacity;
        slots_fit && windows_fit && accumulators_fit
    }
}
514
515impl RingTelemetry {
516    /// Build capacity evidence for a strict caller-owned telemetry decode.
517    #[must_use]
518    pub fn decode_capacity_evidence(
519        &self,
520        scratch: &TelemetryDecodeScratch,
521    ) -> TelemetryDecodeCapacityEvidence {
522        TelemetryDecodeCapacityEvidence {
523            schema_version: TELEMETRY_DECODE_CAPACITY_SCHEMA_VERSION,
524            decoded_slot_count: self.slots.len(),
525            slot_output_capacity: self.slots.capacity(),
526            decoded_window_count: self.windows.len(),
527            window_output_capacity: self.windows.capacity(),
528            window_opcode_scratch_capacity: scratch.window_opcodes.capacity(),
529            window_accumulator_scratch_capacity: scratch.windows.capacity(),
530            uses_caller_owned_scratch: true,
531        }
532    }
533}
534
535/// Caller-owned scratch for repeated megakernel telemetry decodes.
536///
537/// Long-running supervisors poll telemetry at high frequency. Reusing this
538/// scratch keeps each sample to straight-line buffer rewrites rather than
539/// per-poll map allocation.
540#[derive(Debug, Default)]
541pub struct TelemetryDecodeScratch {
542    pub(super) window_opcodes: Vec<u32>,
543    pub(super) windows: FxHashMap<(u32, u32), WindowAccumulator>,
544}
545
546impl TelemetryDecodeScratch {
547    /// Construct empty decode scratch.
548    #[must_use]
549    pub fn new() -> Self {
550        Self {
551            window_opcodes: Vec::new(),
552            windows: FxHashMap::default(),
553        }
554    }
555
556    /// Clear retained decode rows without releasing allocated scratch capacity.
557    pub fn clear(&mut self) {
558        self.window_opcodes.clear();
559        self.windows.clear();
560    }
561}
562
563#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
564pub(super) struct WindowAccumulator {
565    pub(super) tenant_id: u32,
566    pub(super) opcode: u32,
567    pub(super) required_slots: u32,
568    pub(super) lookahead_slots: u32,
569    pub(super) published: u32,
570    pub(super) claimed: u32,
571    pub(super) done: u32,
572    pub(super) wait_io: u32,
573    pub(super) yield_count: u32,
574    pub(super) requeue: u32,
575    pub(super) fault: u32,
576}