Skip to main content

vyre_runtime/megakernel/telemetry/
types.rs

1use super::slot;
2use rustc_hash::FxHashMap;
3
4/// Decoded top-level ring slot state.
5#[derive(Debug, Clone, Copy, PartialEq, Eq)]
6pub enum RingStatus {
7    /// Slot is free.
8    Empty,
9    /// Slot is published and waiting for a worker.
10    Published,
11    /// Slot has been claimed by a worker.
12    Claimed,
13    /// Slot completed and can be recycled.
14    Done,
15    /// Slot is waiting for an asynchronous IO continuation.
16    WaitIo,
17    /// Slot yielded execution back to the scheduler.
18    Yield,
19    /// Slot is heavily contested and has been requeued.
20    Requeue,
21    /// Slot hit a hardware or software fault constraint.
22    Fault,
23    /// Unknown raw wire value.
24    Unknown(u32),
25}
26
27impl RingStatus {
28    #[must_use]
29    pub(super) fn from_raw(raw: u32) -> Self {
30        match raw {
31            slot::EMPTY => Self::Empty,
32            slot::PUBLISHED => Self::Published,
33            slot::CLAIMED => Self::Claimed,
34            slot::DONE => Self::Done,
35            slot::WAIT_IO => Self::WaitIo,
36            slot::YIELD => Self::Yield,
37            slot::REQUEUE => Self::Requeue,
38            slot::FAULT => Self::Fault,
39            other => Self::Unknown(other),
40        }
41    }
42
43    /// Raw wire discriminant for sketching, replay, and compact telemetry.
44    #[must_use]
45    pub const fn raw(self) -> u32 {
46        match self {
47            Self::Empty => slot::EMPTY,
48            Self::Published => slot::PUBLISHED,
49            Self::Claimed => slot::CLAIMED,
50            Self::Done => slot::DONE,
51            Self::WaitIo => slot::WAIT_IO,
52            Self::Yield => slot::YIELD,
53            Self::Requeue => slot::REQUEUE,
54            Self::Fault => slot::FAULT,
55            Self::Unknown(raw) => raw,
56        }
57    }
58
59    /// Whether this status still represents in-flight work rather than a
60    /// terminal slot outcome.
61    #[must_use]
62    pub const fn is_active(self) -> bool {
63        matches!(
64            self,
65            Self::Published | Self::Claimed | Self::WaitIo | Self::Yield | Self::Requeue
66        )
67    }
68}
69
70/// Snapshot of one ring slot.
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct RingSlotSnapshot {
73    /// Zero-based slot index.
74    pub slot_idx: u32,
75    /// Current state.
76    pub status: RingStatus,
77    /// Tenant id assigned to the slot.
78    pub tenant_id: u32,
79    /// Top-level opcode currently stored in the slot.
80    pub opcode: u32,
81    /// First three argument words, useful for quick debugging.
82    pub args_prefix: [u32; 3],
83}
84
85/// Aggregated telemetry for one ticketed route window.
86#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct WindowTelemetry {
88    /// Stable ticket id encoded in `arg0`.
89    pub ticket: u32,
90    /// Tenant id shared by all emitted slots in this window.
91    pub tenant_id: u32,
92    /// Opcode shared by the window payload slots.
93    pub opcode: u32,
94    /// Number of required slots in the window.
95    pub required_slots: u32,
96    /// Number of lookahead slots in the window.
97    pub lookahead_slots: u32,
98    /// Number of slots currently published.
99    pub published: u32,
100    /// Number of slots currently claimed.
101    pub claimed: u32,
102    /// Number of slots completed.
103    pub done: u32,
104    /// Number of slots waiting for I/O.
105    pub wait_io: u32,
106    /// Number of yielded slots.
107    pub yield_count: u32,
108    /// Number of requeued slots.
109    pub requeue: u32,
110    /// Number of faulted slots.
111    pub fault: u32,
112}
113
114impl WindowTelemetry {
115    /// Whether this ticket still has unfinished work in the ring.
116    #[must_use]
117    pub const fn is_active(&self) -> bool {
118        self.published > 0
119            || self.claimed > 0
120            || self.wait_io > 0
121            || self.yield_count > 0
122            || self.requeue > 0
123    }
124}
125
126/// Slot occupancy counts across the ring.
127#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
128pub struct RingOccupancy {
129    /// Number of empty slots.
130    pub empty: u32,
131    /// Number of published slots.
132    pub published: u32,
133    /// Number of claimed slots.
134    pub claimed: u32,
135    /// Number of done slots.
136    pub done: u32,
137    /// Number of slots waiting for IO.
138    pub wait_io: u32,
139    /// Number of slots yielded.
140    pub yield_count: u32,
141    /// Number of requeued slots.
142    pub requeue: u32,
143    /// Number of faulted slots.
144    pub fault: u32,
145    /// Number of slots with unrecognized raw status values.
146    pub unknown: u32,
147}
148
149impl RingOccupancy {
150    /// Total slots represented by this occupancy snapshot.
151    #[must_use]
152    pub fn total_slots(&self) -> u32 {
153        checked_status_sum(
154            [
155                self.empty,
156                self.published,
157                self.claimed,
158                self.done,
159                self.wait_io,
160                self.yield_count,
161                self.requeue,
162                self.fault,
163                self.unknown,
164            ],
165            "total ring slots",
166        )
167    }
168
169    /// Host-visible active queue depth: all non-empty slots that are not done.
170    #[must_use]
171    pub fn queue_depth(&self) -> u32 {
172        checked_status_sum(
173            [
174                self.published,
175                self.claimed,
176                self.wait_io,
177                self.yield_count,
178                self.requeue,
179                self.fault,
180                self.unknown,
181            ],
182            "ring queue depth",
183        )
184    }
185}
186
187fn checked_status_sum<const N: usize>(values: [u32; N], label: &'static str) -> u32 {
188    values
189        .into_iter()
190        .try_fold(0_u32, |acc, value| acc.checked_add(value))
191        .unwrap_or_else(|| {
192            panic!("megakernel telemetry {label} overflowed u32. Fix: shard the ring snapshot.")
193        })
194}
195
196/// Structured view of the control buffer.
197#[derive(Debug, Clone, PartialEq, Eq, Default)]
198pub struct ControlSnapshot {
199    /// Shutdown flag.
200    pub shutdown: bool,
201    /// Total drained slots.
202    pub done_count: u32,
203    /// Epoch value (batch fences).
204    pub epoch: u32,
205    /// Non-zero opcode metrics.
206    pub metrics: Vec<(u32, u32)>,
207    /// Per-tenant fairness counters (cumulative).
208    pub tenant_fairness: Vec<u32>,
209    /// Per-priority fairness counters (cumulative).
210    pub priority_fairness: Vec<u32>,
211}
212
213/// Aggregated runtime performance counters derived from one telemetry snapshot.
214#[derive(Debug, Clone, Copy, PartialEq, Eq)]
215pub struct MegakernelRuntimeCounters {
216    /// Total ring slots represented by the snapshot.
217    pub total_slots: u32,
218    /// Active queue depth: published/claimed/waiting/requeued/fault/unknown slots.
219    pub queue_depth: u32,
220    /// Empty ring slots, used as the host-visible idle-capacity signal.
221    pub gpu_idle_slots: u32,
222    /// Idle slots in parts per million of the ring size.
223    pub gpu_idle_ppm: u32,
224    /// Active frontier density in basis points of the ring size.
225    pub frontier_density_bps: u16,
226    /// Occupancy proxy in basis points: non-idle slots divided by total slots.
227    pub occupancy_proxy_bps: u16,
228    /// Total slots the GPU has drained according to the control buffer.
229    pub drained_slots: u32,
230    /// Done slots visible in the ring snapshot and pending reclaim.
231    pub unreclaimed_done_slots: u32,
232    /// Sum of tenant fairness counters.
233    pub tenant_fairness_total: u64,
234    /// Max minus min non-zero tenant fairness counter.
235    pub tenant_fairness_skew: u32,
236    /// Sum of priority fairness counters.
237    pub priority_fairness_total: u64,
238    /// Requeued slots visible in the ring.
239    pub requeue_slots: u32,
240    /// Faulted slots visible in the ring.
241    pub fault_slots: u32,
242}
243
244/// Watchdog view computed from two host-visible telemetry snapshots.
245#[derive(Debug, Clone, Copy, PartialEq, Eq)]
246pub struct MegakernelWatchdogSnapshot {
247    /// Increase in drained slots between the previous and current snapshot.
248    pub done_delta: u32,
249    /// Current active queue depth.
250    pub queue_depth: u32,
251    /// Current faulted slots.
252    pub fault_slots: u32,
253    /// Current requeued slots.
254    pub requeue_slots: u32,
255    /// Current idle slots in parts per million.
256    pub gpu_idle_ppm: u32,
257    /// True when work remains queued but no drain progress was observed.
258    pub suspected_stall: bool,
259}
260
261/// Combined host-visible telemetry for a megakernel run.
262#[derive(Debug, Clone, PartialEq, Eq, Default)]
263pub struct RingTelemetry {
264    /// Decoded control-buffer snapshot.
265    pub control: ControlSnapshot,
266    /// Occupancy summary.
267    pub occupancy: RingOccupancy,
268    /// All decoded slots.
269    pub slots: Vec<RingSlotSnapshot>,
270    /// Decoded ticketed windows for any caller-specified window opcodes.
271    pub windows: Vec<WindowTelemetry>,
272}
273
274/// Caller-owned scratch for repeated megakernel telemetry decodes.
275///
276/// Long-running supervisors poll telemetry at high frequency. Reusing this
277/// scratch keeps each sample to straight-line buffer rewrites rather than
278/// per-poll map allocation.
279#[derive(Debug, Default)]
280pub struct TelemetryDecodeScratch {
281    pub(super) window_opcodes: Vec<u32>,
282    pub(super) windows: FxHashMap<(u32, u32), WindowAccumulator>,
283}
284
285impl TelemetryDecodeScratch {
286    /// Construct empty decode scratch.
287    #[must_use]
288    pub fn new() -> Self {
289        Self {
290            window_opcodes: Vec::new(),
291            windows: FxHashMap::default(),
292        }
293    }
294}
295
296#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
297pub(super) struct WindowAccumulator {
298    pub(super) tenant_id: u32,
299    pub(super) opcode: u32,
300    pub(super) required_slots: u32,
301    pub(super) lookahead_slots: u32,
302    pub(super) published: u32,
303    pub(super) claimed: u32,
304    pub(super) done: u32,
305    pub(super) wait_io: u32,
306    pub(super) yield_count: u32,
307    pub(super) requeue: u32,
308    pub(super) fault: u32,
309}