Skip to main content

vyre_runtime/megakernel/telemetry/
sketch.rs

1use super::RingTelemetry;
2use crate::PipelineError;
3
/// Count-Min sketch with a fixed number of hash rows, used for compact
/// megakernel telemetry.
///
/// State is deliberately nothing more than a flat `Vec<u64>` and the
/// `(depth, width)` pair, so that GPU control buffers can mirror the same
/// shape later. Bucket selection is deterministic and indexed by row seed;
/// no host randomness is consulted, so replay and regression tests stay
/// stable.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CountMinSketch {
    /// Number of independent hash rows.
    depth: usize,
    /// Number of counters in each hash row.
    width: usize,
    /// Row-major counter table; row `r` occupies `r * width .. (r + 1) * width`.
    counters: Vec<u64>,
}
16
17impl CountMinSketch {
18    /// Create a zeroed sketch with the requested dimensions.
19    ///
20    /// # Errors
21    ///
22    /// Returns [`PipelineError`] when either dimension is zero or the counter
23    /// table size overflows host address space.
24    pub fn new(depth: usize, width: usize) -> Result<Self, PipelineError> {
25        if depth == 0 || width == 0 {
26            return Err(PipelineError::QueueFull {
27                queue: "telemetry",
28                fix: "Count-Min sketch depth and width must be non-zero",
29            });
30        }
31        let len = depth.checked_mul(width).ok_or(PipelineError::QueueFull {
32            queue: "telemetry",
33            fix: "Count-Min sketch dimensions overflowed host address space; reduce depth or width",
34        })?;
35        let mut counters = Vec::new();
36        reserve_counter_capacity(&mut counters, len)?;
37        counters.resize(len, 0);
38        Ok(Self {
39            depth,
40            width,
41            counters,
42        })
43    }
44
45    /// Number of independent hash rows.
46    #[must_use]
47    pub const fn depth(&self) -> usize {
48        self.depth
49    }
50
51    /// Number of counters per hash row.
52    #[must_use]
53    pub const fn width(&self) -> usize {
54        self.width
55    }
56
57    /// Raw row-major counters. Intended for readback, replay, and tests.
58    #[must_use]
59    pub fn counters(&self) -> &[u64] {
60        &self.counters
61    }
62
63    /// Reset all counters to zero while retaining allocation.
64    pub fn clear(&mut self) {
65        self.counters.fill(0);
66    }
67
68    /// Resize this sketch shape and clear counters.
69    ///
70    /// # Errors
71    ///
72    /// Returns [`PipelineError`] when either dimension is zero or the counter
73    /// table size overflows host address space.
74    pub fn reset_shape(&mut self, depth: usize, width: usize) -> Result<(), PipelineError> {
75        if depth == 0 || width == 0 {
76            return Err(PipelineError::QueueFull {
77                queue: "telemetry",
78                fix: "Count-Min sketch depth and width must be non-zero",
79            });
80        }
81        let len = depth.checked_mul(width).ok_or(PipelineError::QueueFull {
82            queue: "telemetry",
83            fix: "Count-Min sketch dimensions overflowed host address space; reduce depth or width",
84        })?;
85        if self.depth == depth && self.width == width && self.counters.len() == len {
86            self.counters.fill(0);
87            return Ok(());
88        }
89        self.depth = depth;
90        self.width = width;
91        self.counters.clear();
92        reserve_counter_capacity(&mut self.counters, len)?;
93        self.counters.resize(len, 0);
94        Ok(())
95    }
96
97    /// Add `amount` to every row bucket selected for `key`.
98    pub fn add(&mut self, key: u32, amount: u64) {
99        if amount == 0 {
100            return;
101        }
102        for row in 0..self.depth {
103            let idx = self.bucket(row, key);
104            self.counters[idx] = self.counters[idx].saturating_add(amount);
105        }
106    }
107
108    /// Checked add of `amount` to every row bucket selected for `key`.
109    ///
110    /// # Errors
111    ///
112    /// Returns [`PipelineError`] when a counter would overflow u64.
113    pub fn try_add(&mut self, key: u32, amount: u64) -> Result<(), PipelineError> {
114        if amount == 0 {
115            return Ok(());
116        }
117        for row in 0..self.depth {
118            let idx = self.bucket(row, key);
119            self.counters[idx] = self.counters[idx].checked_add(amount).ok_or_else(|| {
120                PipelineError::Backend(format!(
121                    "Count-Min sketch counter overflowed for row {row}, key {key}. Fix: snapshot and clear telemetry before counters reach u64::MAX."
122                ))
123            })?;
124        }
125        Ok(())
126    }
127
128    /// Conservative point estimate for `key`.
129    #[must_use]
130    pub fn estimate(&self, key: u32) -> u64 {
131        (0..self.depth)
132            .map(|row| self.counters[self.bucket(row, key)])
133            .min()
134            .unwrap_or(0)
135    }
136
137    /// Merge another sketch with identical dimensions into this sketch.
138    ///
139    /// # Errors
140    ///
141    /// Returns [`PipelineError`] if the sketches have different shapes.
142    pub fn merge(&mut self, other: &Self) -> Result<(), PipelineError> {
143        if self.depth != other.depth || self.width != other.width {
144            return Err(PipelineError::Backend(format!(
145                "cannot merge Count-Min sketches with shapes {}x{} and {}x{}. Fix: construct telemetry sketches with the same dimensions.",
146                self.depth, self.width, other.depth, other.width
147            )));
148        }
149        for (left, right) in self.counters.iter_mut().zip(&other.counters) {
150            *left = left.checked_add(*right).ok_or_else(|| {
151                PipelineError::Backend(
152                    "Count-Min sketch merge overflowed u64. Fix: merge and clear telemetry more frequently."
153                        .to_string(),
154                )
155            })?;
156        }
157        Ok(())
158    }
159
160    fn bucket(&self, row: usize, key: u32) -> usize {
161        let row_u64 = row as u64;
162        let hash = splitmix64(u64::from(key) ^ row_u64.wrapping_mul(0x9E37_79B9_7F4A_7C15));
163        let bucket = (hash % self.width as u64) as usize;
164        row * self.width + bucket
165    }
166}
167
168fn reserve_counter_capacity(counters: &mut Vec<u64>, len: usize) -> Result<(), PipelineError> {
169    vyre_foundation::allocation::try_reserve_vec_to_capacity(counters, len).map_err(|source| {
170        PipelineError::Backend(format!(
171            "Count-Min sketch could not reserve {len} counters: {source}. Fix: reduce telemetry sketch depth or width."
172        ))
173    })
174}
175
176/// Compact sketch summary derived from a megakernel telemetry snapshot.
177#[derive(Debug, Clone, PartialEq, Eq)]
178pub struct SketchTelemetry {
179    /// Ring slots by opcode, regardless of terminal status.
180    pub ring_opcode: CountMinSketch,
181    /// Active ring slots by opcode.
182    pub active_opcode: CountMinSketch,
183    /// Ring slots by tenant id.
184    pub tenant: CountMinSketch,
185    /// Ring slots by raw status discriminant.
186    pub status: CountMinSketch,
187    /// Control-buffer dispatch metrics by opcode metric index.
188    pub dispatch_metrics: CountMinSketch,
189    /// Total decoded ring slots.
190    pub total_slots: u64,
191    /// Active decoded ring slots.
192    pub active_slots: u64,
193}
194
195/// Caller-owned scratch for repeated compact telemetry sketches.
196#[derive(Debug)]
197pub struct SketchTelemetryScratch {
198    /// Ring slots by opcode, regardless of terminal status.
199    pub ring_opcode: CountMinSketch,
200    /// Active ring slots by opcode.
201    pub active_opcode: CountMinSketch,
202    /// Ring slots by tenant id.
203    pub tenant: CountMinSketch,
204    /// Ring slots by raw status discriminant.
205    pub status: CountMinSketch,
206    /// Control-buffer dispatch metrics by opcode metric index.
207    pub dispatch_metrics: CountMinSketch,
208    pub(super) total_slots: u64,
209    pub(super) active_slots: u64,
210}
211
212impl SketchTelemetryScratch {
213    /// Create reusable sketch scratch with the requested dimensions.
214    ///
215    /// # Errors
216    ///
217    /// Returns [`PipelineError`] when dimensions are invalid.
218    pub fn new(depth: usize, width: usize) -> Result<Self, PipelineError> {
219        Ok(Self {
220            ring_opcode: CountMinSketch::new(depth, width)?,
221            active_opcode: CountMinSketch::new(depth, width)?,
222            tenant: CountMinSketch::new(depth, width)?,
223            status: CountMinSketch::new(depth, width)?,
224            dispatch_metrics: CountMinSketch::new(depth, width)?,
225            total_slots: 0,
226            active_slots: 0,
227        })
228    }
229
230    fn reset(&mut self, depth: usize, width: usize) -> Result<(), PipelineError> {
231        self.ring_opcode.reset_shape(depth, width)?;
232        self.active_opcode.reset_shape(depth, width)?;
233        self.tenant.reset_shape(depth, width)?;
234        self.status.reset_shape(depth, width)?;
235        self.dispatch_metrics.reset_shape(depth, width)?;
236        self.total_slots = 0;
237        self.active_slots = 0;
238        Ok(())
239    }
240
241    /// Convert this reusable scratch into an owned snapshot.
242    #[must_use]
243    pub fn to_snapshot(&self) -> SketchTelemetry {
244        SketchTelemetry {
245            ring_opcode: self.ring_opcode.clone(),
246            active_opcode: self.active_opcode.clone(),
247            tenant: self.tenant.clone(),
248            status: self.status.clone(),
249            dispatch_metrics: self.dispatch_metrics.clone(),
250            total_slots: self.total_slots,
251            active_slots: self.active_slots,
252        }
253    }
254
255    /// Move this scratch into an owned snapshot without cloning counter
256    /// arrays. Use this for one-shot sketches; keep [`Self::to_snapshot`] for
257    /// long-lived scratch that must be reused after sampling.
258    #[must_use]
259    pub fn into_snapshot(self) -> SketchTelemetry {
260        SketchTelemetry {
261            ring_opcode: self.ring_opcode,
262            active_opcode: self.active_opcode,
263            tenant: self.tenant,
264            status: self.status,
265            dispatch_metrics: self.dispatch_metrics,
266            total_slots: self.total_slots,
267            active_slots: self.active_slots,
268        }
269    }
270}
271
272impl RingTelemetry {
273    /// Build compact sketches from the decoded telemetry snapshot.
274    ///
275    /// This is the host mirror of the telemetry shape a GPU-resident
276    /// scheduler/fuzzer can maintain in control memory: hot opcodes, active
277    /// work, tenant pressure, status pressure, and dispatch metrics all become
278    /// bounded-size counters with deterministic replay semantics.
279    ///
280    /// # Errors
281    ///
282    /// Returns [`PipelineError`] when sketch dimensions are invalid.
283    pub fn sketch(&self, depth: usize, width: usize) -> Result<SketchTelemetry, PipelineError> {
284        let mut scratch = SketchTelemetryScratch::new(depth, width)?;
285        self.sketch_into(depth, width, &mut scratch)?;
286        Ok(scratch.into_snapshot())
287    }
288
289    /// Build compact sketches into caller-owned scratch.
290    ///
291    /// # Errors
292    ///
293    /// Returns [`PipelineError`] when sketch dimensions are invalid.
294    pub fn sketch_into(
295        &self,
296        depth: usize,
297        width: usize,
298        scratch: &mut SketchTelemetryScratch,
299    ) -> Result<(), PipelineError> {
300        scratch.reset(depth, width)?;
301
302        for slot in &self.slots {
303            scratch.ring_opcode.try_add(slot.opcode, 1)?;
304            scratch.tenant.try_add(slot.tenant_id, 1)?;
305            scratch.status.try_add(slot.status.raw(), 1)?;
306            if slot.status.is_active() {
307                scratch.active_slots = scratch.active_slots.checked_add(1).ok_or_else(|| {
308                    PipelineError::Backend(
309                        "active megakernel telemetry slot count overflowed u64. Fix: snapshot telemetry before counters reach u64::MAX."
310                            .to_string(),
311                    )
312                })?;
313                scratch.active_opcode.try_add(slot.opcode, 1)?;
314            }
315        }
316
317        for (opcode_idx, count) in &self.control.metrics {
318            scratch
319                .dispatch_metrics
320                .try_add(*opcode_idx, u64::from(*count))?;
321        }
322        scratch.total_slots = u64::try_from(self.slots.len()).map_err(|error| {
323            PipelineError::Backend(format!(
324                "megakernel telemetry slot count cannot fit u64: {error}. Fix: shard telemetry snapshots before sketching."
325            ))
326        })?;
327        Ok(())
328    }
329}
330
/// Deterministic 64-bit mixer: add a fixed increment, then apply two
/// xor-shift/multiply rounds and a final xor-shift. All arithmetic wraps.
fn splitmix64(value: u64) -> u64 {
    const INCREMENT: u64 = 0x9E37_79B9_7F4A_7C15;
    const FIRST_MULTIPLIER: u64 = 0xBF58_476D_1CE4_E5B9;
    const SECOND_MULTIPLIER: u64 = 0x94D0_49BB_1331_11EB;

    let mixed = value.wrapping_add(INCREMENT);
    let mixed = (mixed ^ (mixed >> 30)).wrapping_mul(FIRST_MULTIPLIER);
    let mixed = (mixed ^ (mixed >> 27)).wrapping_mul(SECOND_MULTIPLIER);
    mixed ^ (mixed >> 31)
}