vyre_runtime/megakernel/telemetry/types.rs
1use super::slot;
2use rustc_hash::FxHashMap;
3
/// Decoded top-level ring slot state.
///
/// Recognized wire values decode to the unit variants below; any other raw
/// word is carried unchanged in [`RingStatus::Unknown`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RingStatus {
    /// Slot is free.
    Empty,
    /// Slot is published and waiting for a worker.
    Published,
    /// Slot has been claimed by a worker.
    Claimed,
    /// Slot completed and can be recycled.
    Done,
    /// Slot is waiting for an asynchronous IO continuation.
    WaitIo,
    /// Slot yielded execution back to the scheduler.
    Yield,
    /// Slot is heavily contested and has been requeued.
    Requeue,
    /// Slot hit a hardware or software fault constraint.
    Fault,
    /// Unknown raw wire value, preserved verbatim.
    Unknown(u32),
}
26
27impl RingStatus {
28 #[must_use]
29 pub(super) fn from_raw(raw: u32) -> Self {
30 match raw {
31 slot::EMPTY => Self::Empty,
32 slot::PUBLISHED => Self::Published,
33 slot::CLAIMED => Self::Claimed,
34 slot::DONE => Self::Done,
35 slot::WAIT_IO => Self::WaitIo,
36 slot::YIELD => Self::Yield,
37 slot::REQUEUE => Self::Requeue,
38 slot::FAULT => Self::Fault,
39 other => Self::Unknown(other),
40 }
41 }
42
43 /// Raw wire discriminant for sketching, replay, and compact telemetry.
44 #[must_use]
45 pub const fn raw(self) -> u32 {
46 match self {
47 Self::Empty => slot::EMPTY,
48 Self::Published => slot::PUBLISHED,
49 Self::Claimed => slot::CLAIMED,
50 Self::Done => slot::DONE,
51 Self::WaitIo => slot::WAIT_IO,
52 Self::Yield => slot::YIELD,
53 Self::Requeue => slot::REQUEUE,
54 Self::Fault => slot::FAULT,
55 Self::Unknown(raw) => raw,
56 }
57 }
58
59 /// Whether this status still represents in-flight work rather than a
60 /// terminal slot outcome.
61 #[must_use]
62 pub const fn is_active(self) -> bool {
63 matches!(
64 self,
65 Self::Published | Self::Claimed | Self::WaitIo | Self::Yield | Self::Requeue
66 )
67 }
68}
69
/// Snapshot of one ring slot.
///
/// Plain decoded data: the slot's position, its status, and the leading words
/// of its payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RingSlotSnapshot {
    /// Zero-based slot index.
    pub slot_idx: u32,
    /// Current state.
    pub status: RingStatus,
    /// Tenant id assigned to the slot.
    pub tenant_id: u32,
    /// Top-level opcode currently stored in the slot.
    pub opcode: u32,
    /// First three argument words, useful for quick debugging.
    pub args_prefix: [u32; 3],
}
84
/// Aggregated telemetry for one ticketed route window.
///
/// The per-status fields are slot counts for this window. Only the
/// published, claimed, IO-waiting, yielded and requeued counts decide whether
/// the window is reported as active by `is_active`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WindowTelemetry {
    /// Stable ticket id encoded in `arg0`.
    pub ticket: u32,
    /// Tenant id shared by all emitted slots in this window.
    pub tenant_id: u32,
    /// Opcode shared by the window payload slots.
    pub opcode: u32,
    /// Number of required slots in the window.
    pub required_slots: u32,
    /// Number of lookahead slots in the window.
    pub lookahead_slots: u32,
    /// Number of slots currently published.
    pub published: u32,
    /// Number of slots currently claimed.
    pub claimed: u32,
    /// Number of slots completed.
    pub done: u32,
    /// Number of slots waiting for I/O.
    pub wait_io: u32,
    /// Number of yielded slots.
    pub yield_count: u32,
    /// Number of requeued slots.
    pub requeue: u32,
    /// Number of faulted slots.
    pub fault: u32,
}
113
114impl WindowTelemetry {
115 /// Whether this ticket still has unfinished work in the ring.
116 #[must_use]
117 pub const fn is_active(&self) -> bool {
118 self.published > 0
119 || self.claimed > 0
120 || self.wait_io > 0
121 || self.yield_count > 0
122 || self.requeue > 0
123 }
124}
125
/// Slot occupancy counts across the ring.
///
/// One counter per decoded slot status; the derived `Default` is the
/// all-zero snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct RingOccupancy {
    /// Number of empty slots.
    pub empty: u32,
    /// Number of published slots.
    pub published: u32,
    /// Number of claimed slots.
    pub claimed: u32,
    /// Number of done slots.
    pub done: u32,
    /// Number of slots waiting for IO.
    pub wait_io: u32,
    /// Number of slots yielded.
    pub yield_count: u32,
    /// Number of requeued slots.
    pub requeue: u32,
    /// Number of faulted slots.
    pub fault: u32,
    /// Number of slots with unrecognized raw status values.
    pub unknown: u32,
}
148
149impl RingOccupancy {
150 /// Total slots represented by this occupancy snapshot.
151 #[must_use]
152 pub fn total_slots(&self) -> u32 {
153 checked_status_sum(
154 [
155 self.empty,
156 self.published,
157 self.claimed,
158 self.done,
159 self.wait_io,
160 self.yield_count,
161 self.requeue,
162 self.fault,
163 self.unknown,
164 ],
165 "total ring slots",
166 )
167 }
168
169 /// Host-visible active queue depth: all non-empty slots that are not done.
170 #[must_use]
171 pub fn queue_depth(&self) -> u32 {
172 checked_status_sum(
173 [
174 self.published,
175 self.claimed,
176 self.wait_io,
177 self.yield_count,
178 self.requeue,
179 self.fault,
180 self.unknown,
181 ],
182 "ring queue depth",
183 )
184 }
185}
186
/// Add up a fixed set of slot counters, refusing to wrap.
///
/// `label` names the quantity being summed and is only used in the panic
/// message.
///
/// # Panics
///
/// Panics if the running total exceeds `u32::MAX`.
fn checked_status_sum<const N: usize>(values: [u32; N], label: &'static str) -> u32 {
    let mut total = 0_u32;
    for count in values {
        total = match total.checked_add(count) {
            Some(next) => next,
            None => {
                panic!("megakernel telemetry {label} overflowed u32. Fix: shard the ring snapshot.")
            }
        };
    }
    total
}
195
/// Structured view of the control buffer.
///
/// The derived `Default` is a snapshot with the shutdown flag clear, zeroed
/// counters and empty vectors.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ControlSnapshot {
    /// Shutdown flag.
    pub shutdown: bool,
    /// Total drained slots.
    pub done_count: u32,
    /// Epoch value (batch fences).
    pub epoch: u32,
    /// Non-zero opcode metrics.
    // NOTE(review): presumably `(opcode, value)` pairs — confirm against the decoder.
    pub metrics: Vec<(u32, u32)>,
    /// Per-tenant fairness counters (cumulative).
    pub tenant_fairness: Vec<u32>,
    /// Per-priority fairness counters (cumulative).
    pub priority_fairness: Vec<u32>,
}
212
/// Aggregated runtime performance counters derived from one telemetry snapshot.
///
/// Ratios are stored as integers: `*_ppm` fields are parts per million and
/// `*_bps` fields are basis points.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MegakernelRuntimeCounters {
    /// Total ring slots represented by the snapshot.
    pub total_slots: u32,
    /// Active queue depth: published/claimed/waiting/requeued/fault/unknown slots.
    // NOTE(review): `RingOccupancy::queue_depth` also counts yielded slots;
    // confirm this field is derived from it and, if so, mention yielded here.
    pub queue_depth: u32,
    /// Empty ring slots, used as the host-visible idle-capacity signal.
    pub gpu_idle_slots: u32,
    /// Idle slots in parts per million of the ring size.
    pub gpu_idle_ppm: u32,
    /// Active frontier density in basis points of the ring size.
    pub frontier_density_bps: u16,
    /// Occupancy proxy in basis points: non-idle slots divided by total slots.
    pub occupancy_proxy_bps: u16,
    /// Total slots the GPU has drained according to the control buffer.
    pub drained_slots: u32,
    /// Done slots visible in the ring snapshot and pending reclaim.
    pub unreclaimed_done_slots: u32,
    /// Sum of tenant fairness counters.
    pub tenant_fairness_total: u64,
    /// Max minus min non-zero tenant fairness counter.
    pub tenant_fairness_skew: u32,
    /// Sum of priority fairness counters.
    pub priority_fairness_total: u64,
    /// Requeued slots visible in the ring.
    pub requeue_slots: u32,
    /// Faulted slots visible in the ring.
    pub fault_slots: u32,
}
243
/// Watchdog view computed from two host-visible telemetry snapshots.
///
/// `done_delta` compares the previous and current snapshot; the remaining
/// fields describe the current snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MegakernelWatchdogSnapshot {
    /// Increase in drained slots between the previous and current snapshot.
    pub done_delta: u32,
    /// Current active queue depth.
    pub queue_depth: u32,
    /// Current faulted slots.
    pub fault_slots: u32,
    /// Current requeued slots.
    pub requeue_slots: u32,
    /// Current idle slots in parts per million.
    pub gpu_idle_ppm: u32,
    /// True when work remains queued but no drain progress was observed.
    pub suspected_stall: bool,
}
260
/// Combined host-visible telemetry for a megakernel run.
///
/// Bundles the control-buffer view, the occupancy summary, the per-slot
/// snapshots and the per-window aggregates into one value.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RingTelemetry {
    /// Decoded control-buffer snapshot.
    pub control: ControlSnapshot,
    /// Occupancy summary.
    pub occupancy: RingOccupancy,
    /// All decoded slots.
    pub slots: Vec<RingSlotSnapshot>,
    /// Decoded ticketed windows for any caller-specified window opcodes.
    pub windows: Vec<WindowTelemetry>,
}
273
/// Caller-owned scratch for repeated megakernel telemetry decodes.
///
/// Long-running supervisors poll telemetry at high frequency. Reusing this
/// scratch keeps each sample to straight-line buffer rewrites rather than
/// per-poll map allocation.
#[derive(Debug, Default)]
pub struct TelemetryDecodeScratch {
    /// Reusable buffer of window opcodes.
    // NOTE(review): presumably the caller-specified window opcodes for the
    // decode in progress — confirm against the decoder.
    pub(super) window_opcodes: Vec<u32>,
    /// Reusable per-window accumulators, keyed by a pair of `u32` values.
    // NOTE(review): the key's meaning is not visible here; it likely involves
    // the ticket, which `WindowAccumulator` does not store — confirm.
    pub(super) windows: FxHashMap<(u32, u32), WindowAccumulator>,
}
284
285impl TelemetryDecodeScratch {
286 /// Construct empty decode scratch.
287 #[must_use]
288 pub fn new() -> Self {
289 Self {
290 window_opcodes: Vec::new(),
291 windows: FxHashMap::default(),
292 }
293 }
294}
295
/// Running per-window counters stored in `TelemetryDecodeScratch::windows`.
///
/// Carries the same fields as `WindowTelemetry` except `ticket`.
// NOTE(review): presumably the ticket is part of the scratch map key — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub(super) struct WindowAccumulator {
    /// Tenant id recorded for the window.
    pub(super) tenant_id: u32,
    /// Opcode recorded for the window.
    pub(super) opcode: u32,
    /// Number of required slots in the window.
    pub(super) required_slots: u32,
    /// Number of lookahead slots in the window.
    pub(super) lookahead_slots: u32,
    /// Count of published slots.
    pub(super) published: u32,
    /// Count of claimed slots.
    pub(super) claimed: u32,
    /// Count of done slots.
    pub(super) done: u32,
    /// Count of slots waiting for I/O.
    pub(super) wait_io: u32,
    /// Count of yielded slots.
    pub(super) yield_count: u32,
    /// Count of requeued slots.
    pub(super) requeue: u32,
    /// Count of faulted slots.
    pub(super) fault: u32,
}