Skip to main content

vyre_runtime/megakernel/telemetry/
sketch.rs

1use super::RingTelemetry;
2use crate::PipelineError;
3
/// Fixed-depth Count-Min sketch used for compact megakernel telemetry.
///
/// Storage is a flat, row-major `Vec<u64>` of `depth * width` counters plus
/// the `(depth, width)` pair. This plain layout is deliberate, so the same
/// shape can later be mirrored by GPU control buffers. Row hashing is
/// deterministic and seeded only by the row index. No host randomness is
/// involved, so replays and regression tests stay stable.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CountMinSketch {
    /// Number of independent hash rows (validated non-zero by the constructors).
    depth: usize,
    /// Number of counters in each row (validated non-zero by the constructors).
    width: usize,
    /// Row-major counter table holding `depth * width` entries.
    counters: Vec<u64>,
}
16
17impl CountMinSketch {
18    /// Create a zeroed sketch with the requested dimensions.
19    ///
20    /// # Errors
21    ///
22    /// Returns [`PipelineError`] when either dimension is zero or the counter
23    /// table size overflows host address space.
24    pub fn new(depth: usize, width: usize) -> Result<Self, PipelineError> {
25        if depth == 0 || width == 0 {
26            return Err(PipelineError::QueueFull {
27                queue: "telemetry",
28                fix: "Count-Min sketch depth and width must be non-zero",
29            });
30        }
31        let len = depth.checked_mul(width).ok_or(PipelineError::QueueFull {
32            queue: "telemetry",
33            fix: "Count-Min sketch dimensions overflowed host address space; reduce depth or width",
34        })?;
35        let mut counters = Vec::new();
36        reserve_counter_capacity(&mut counters, len)?;
37        counters.resize(len, 0);
38        Ok(Self {
39            depth,
40            width,
41            counters,
42        })
43    }
44
45    /// Number of independent hash rows.
46    #[must_use]
47    pub const fn depth(&self) -> usize {
48        self.depth
49    }
50
51    /// Number of counters per hash row.
52    #[must_use]
53    pub const fn width(&self) -> usize {
54        self.width
55    }
56
57    /// Raw row-major counters. Intended for readback, replay, and tests.
58    #[must_use]
59    pub fn counters(&self) -> &[u64] {
60        &self.counters
61    }
62
63    /// Reset all counters to zero while retaining allocation.
64    pub fn clear(&mut self) {
65        self.counters.fill(0);
66    }
67
68    /// Resize this sketch shape and clear counters.
69    ///
70    /// # Errors
71    ///
72    /// Returns [`PipelineError`] when either dimension is zero or the counter
73    /// table size overflows host address space.
74    pub fn reset_shape(&mut self, depth: usize, width: usize) -> Result<(), PipelineError> {
75        if depth == 0 || width == 0 {
76            return Err(PipelineError::QueueFull {
77                queue: "telemetry",
78                fix: "Count-Min sketch depth and width must be non-zero",
79            });
80        }
81        let len = depth.checked_mul(width).ok_or(PipelineError::QueueFull {
82            queue: "telemetry",
83            fix: "Count-Min sketch dimensions overflowed host address space; reduce depth or width",
84        })?;
85        if self.depth == depth && self.width == width && self.counters.len() == len {
86            self.counters.fill(0);
87            return Ok(());
88        }
89        self.depth = depth;
90        self.width = width;
91        self.counters.clear();
92        reserve_counter_capacity(&mut self.counters, len)?;
93        self.counters.resize(len, 0);
94        Ok(())
95    }
96
97    /// Add `amount` to every row bucket selected for `key`.
98    pub fn add(&mut self, key: u32, amount: u64) {
99        if let Err(error) = self.try_add(key, amount) {
100            panic!("{error}");
101        }
102    }
103
104    /// Checked add of `amount` to every row bucket selected for `key`.
105    ///
106    /// # Errors
107    ///
108    /// Returns [`PipelineError`] when a counter would overflow u64.
109    pub fn try_add(&mut self, key: u32, amount: u64) -> Result<(), PipelineError> {
110        if amount == 0 {
111            return Ok(());
112        }
113        for row in 0..self.depth {
114            let idx = self.bucket(row, key);
115            self.counters[idx] = self.counters[idx].checked_add(amount).ok_or_else(|| {
116                PipelineError::Backend(format!(
117                    "Count-Min sketch counter overflowed for row {row}, key {key}. Fix: snapshot and clear telemetry before counters reach u64::MAX."
118                ))
119            })?;
120        }
121        Ok(())
122    }
123
124    /// Conservative point estimate for `key`.
125    #[must_use]
126    pub fn estimate(&self, key: u32) -> u64 {
127        (0..self.depth)
128            .map(|row| self.counters[self.bucket(row, key)])
129            .min()
130            .unwrap_or(0)
131    }
132
133    /// Merge another sketch with identical dimensions into this sketch.
134    ///
135    /// # Errors
136    ///
137    /// Returns [`PipelineError`] if the sketches have different shapes.
138    pub fn merge(&mut self, other: &Self) -> Result<(), PipelineError> {
139        if self.depth != other.depth || self.width != other.width {
140            return Err(PipelineError::Backend(format!(
141                "cannot merge Count-Min sketches with shapes {}x{} and {}x{}. Fix: construct telemetry sketches with the same dimensions.",
142                self.depth, self.width, other.depth, other.width
143            )));
144        }
145        for (left, right) in self.counters.iter_mut().zip(&other.counters) {
146            *left = left.checked_add(*right).ok_or_else(|| {
147                PipelineError::Backend(
148                    "Count-Min sketch merge overflowed u64. Fix: merge and clear telemetry more frequently."
149                        .to_string(),
150                )
151            })?;
152        }
153        Ok(())
154    }
155
156    fn bucket(&self, row: usize, key: u32) -> usize {
157        let row_u64 = u64::try_from(row).unwrap_or_else(|error| {
158            panic!("Count-Min sketch row cannot fit u64: {error}. Fix: reduce sketch depth.")
159        });
160        let hash = splitmix64(u64::from(key) ^ row_u64.wrapping_mul(0x9E37_79B9_7F4A_7C15));
161        let bucket = usize::try_from(
162            hash % u64::try_from(self.width).unwrap_or_else(|error| {
163                panic!("Count-Min sketch width cannot fit u64: {error}. Fix: reduce sketch width.")
164            }),
165        )
166        .unwrap_or_else(|error| {
167            panic!("Count-Min sketch bucket cannot fit usize: {error}. Fix: reduce sketch width.")
168        });
169        row.checked_mul(self.width)
170            .and_then(|base| base.checked_add(bucket))
171            .unwrap_or_else(|| {
172                panic!(
173                    "Count-Min sketch bucket index overflowed usize. Fix: reduce sketch depth or width."
174                )
175            })
176    }
177}
178
179fn reserve_counter_capacity(counters: &mut Vec<u64>, len: usize) -> Result<(), PipelineError> {
180    vyre_foundation::allocation::try_reserve_vec_to_capacity(counters, len).map_err(|source| {
181        PipelineError::Backend(format!(
182            "Count-Min sketch could not reserve {len} counters: {source}. Fix: reduce telemetry sketch depth or width."
183        ))
184    })
185}
186
187/// Compact sketch summary derived from a megakernel telemetry snapshot.
188#[derive(Debug, Clone, PartialEq, Eq)]
189pub struct SketchTelemetry {
190    /// Ring slots by opcode, regardless of terminal status.
191    pub ring_opcode: CountMinSketch,
192    /// Active ring slots by opcode.
193    pub active_opcode: CountMinSketch,
194    /// Ring slots by tenant id.
195    pub tenant: CountMinSketch,
196    /// Ring slots by raw status discriminant.
197    pub status: CountMinSketch,
198    /// Control-buffer dispatch metrics by opcode metric index.
199    pub dispatch_metrics: CountMinSketch,
200    /// Total decoded ring slots.
201    pub total_slots: u64,
202    /// Active decoded ring slots.
203    pub active_slots: u64,
204}
205
206/// Caller-owned scratch for repeated compact telemetry sketches.
207#[derive(Debug)]
208pub struct SketchTelemetryScratch {
209    /// Ring slots by opcode, regardless of terminal status.
210    pub ring_opcode: CountMinSketch,
211    /// Active ring slots by opcode.
212    pub active_opcode: CountMinSketch,
213    /// Ring slots by tenant id.
214    pub tenant: CountMinSketch,
215    /// Ring slots by raw status discriminant.
216    pub status: CountMinSketch,
217    /// Control-buffer dispatch metrics by opcode metric index.
218    pub dispatch_metrics: CountMinSketch,
219    pub(super) total_slots: u64,
220    pub(super) active_slots: u64,
221}
222
223impl SketchTelemetryScratch {
224    /// Create reusable sketch scratch with the requested dimensions.
225    ///
226    /// # Errors
227    ///
228    /// Returns [`PipelineError`] when dimensions are invalid.
229    pub fn new(depth: usize, width: usize) -> Result<Self, PipelineError> {
230        Ok(Self {
231            ring_opcode: CountMinSketch::new(depth, width)?,
232            active_opcode: CountMinSketch::new(depth, width)?,
233            tenant: CountMinSketch::new(depth, width)?,
234            status: CountMinSketch::new(depth, width)?,
235            dispatch_metrics: CountMinSketch::new(depth, width)?,
236            total_slots: 0,
237            active_slots: 0,
238        })
239    }
240
241    fn reset(&mut self, depth: usize, width: usize) -> Result<(), PipelineError> {
242        self.ring_opcode.reset_shape(depth, width)?;
243        self.active_opcode.reset_shape(depth, width)?;
244        self.tenant.reset_shape(depth, width)?;
245        self.status.reset_shape(depth, width)?;
246        self.dispatch_metrics.reset_shape(depth, width)?;
247        self.total_slots = 0;
248        self.active_slots = 0;
249        Ok(())
250    }
251
252    /// Convert this reusable scratch into an owned snapshot.
253    #[must_use]
254    pub fn to_snapshot(&self) -> SketchTelemetry {
255        SketchTelemetry {
256            ring_opcode: self.ring_opcode.clone(),
257            active_opcode: self.active_opcode.clone(),
258            tenant: self.tenant.clone(),
259            status: self.status.clone(),
260            dispatch_metrics: self.dispatch_metrics.clone(),
261            total_slots: self.total_slots,
262            active_slots: self.active_slots,
263        }
264    }
265
266    /// Move this scratch into an owned snapshot without cloning counter
267    /// arrays. Use this for one-shot sketches; keep [`Self::to_snapshot`] for
268    /// long-lived scratch that must be reused after sampling.
269    #[must_use]
270    pub fn into_snapshot(self) -> SketchTelemetry {
271        SketchTelemetry {
272            ring_opcode: self.ring_opcode,
273            active_opcode: self.active_opcode,
274            tenant: self.tenant,
275            status: self.status,
276            dispatch_metrics: self.dispatch_metrics,
277            total_slots: self.total_slots,
278            active_slots: self.active_slots,
279        }
280    }
281}
282
283impl RingTelemetry {
284    /// Build compact sketches from the decoded telemetry snapshot.
285    ///
286    /// This is the host mirror of the telemetry shape a GPU-resident
287    /// scheduler/fuzzer can maintain in control memory: hot opcodes, active
288    /// work, tenant pressure, status pressure, and dispatch metrics all become
289    /// bounded-size counters with deterministic replay semantics.
290    ///
291    /// # Errors
292    ///
293    /// Returns [`PipelineError`] when sketch dimensions are invalid.
294    pub fn sketch(&self, depth: usize, width: usize) -> Result<SketchTelemetry, PipelineError> {
295        let mut scratch = SketchTelemetryScratch::new(depth, width)?;
296        self.sketch_into(depth, width, &mut scratch)?;
297        Ok(scratch.into_snapshot())
298    }
299
300    /// Build compact sketches into caller-owned scratch.
301    ///
302    /// # Errors
303    ///
304    /// Returns [`PipelineError`] when sketch dimensions are invalid.
305    pub fn sketch_into(
306        &self,
307        depth: usize,
308        width: usize,
309        scratch: &mut SketchTelemetryScratch,
310    ) -> Result<(), PipelineError> {
311        scratch.reset(depth, width)?;
312
313        for slot in &self.slots {
314            scratch.ring_opcode.try_add(slot.opcode, 1)?;
315            scratch.tenant.try_add(slot.tenant_id, 1)?;
316            scratch.status.try_add(slot.status.raw(), 1)?;
317            if slot.status.is_active() {
318                scratch.active_slots = scratch.active_slots.checked_add(1).ok_or_else(|| {
319                    PipelineError::Backend(
320                        "active megakernel telemetry slot count overflowed u64. Fix: snapshot telemetry before counters reach u64::MAX."
321                            .to_string(),
322                    )
323                })?;
324                scratch.active_opcode.try_add(slot.opcode, 1)?;
325            }
326        }
327
328        for (opcode_idx, count) in &self.control.metrics {
329            scratch
330                .dispatch_metrics
331                .try_add(*opcode_idx, u64::from(*count))?;
332        }
333        scratch.total_slots = u64::try_from(self.slots.len()).map_err(|error| {
334            PipelineError::Backend(format!(
335                "megakernel telemetry slot count cannot fit u64: {error}. Fix: shard telemetry snapshots before sketching."
336            ))
337        })?;
338        Ok(())
339    }
340}
341
/// SplitMix64 finalizer: advance `value` by the golden-ratio increment, then
/// run two xorshift-multiply rounds and a final xorshift.
///
/// Every step is a bijection on `u64`, so distinct inputs always produce
/// distinct outputs.
fn splitmix64(value: u64) -> u64 {
    let mut z = value.wrapping_add(0x9E37_79B9_7F4A_7C15);
    z ^= z >> 30;
    z = z.wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z ^= z >> 27;
    z = z.wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}