//! Count-Min sketch summaries of megakernel ring telemetry
//! (`vyre_runtime/megakernel/telemetry/sketch.rs`).

use crate::PipelineError;
use super::RingTelemetry;

/// Count-Min sketch over `u32` keys with `u64` counters.
///
/// Counters are stored row-major: `depth` hash rows of `width` buckets each.
/// After [`CountMinSketch::new`] or a successful [`CountMinSketch::reset_shape`],
/// `counters.len() == depth * width` and both dimensions are non-zero.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CountMinSketch {
    /// Number of independent hash rows.
    depth: usize,
    /// Number of buckets in each row.
    width: usize,
    /// Row-major counter storage; bucket `b` of row `r` lives at `r * width + b`.
    counters: Vec<u64>,
}

17impl CountMinSketch {
18 pub fn new(depth: usize, width: usize) -> Result<Self, PipelineError> {
25 if depth == 0 || width == 0 {
26 return Err(PipelineError::QueueFull {
27 queue: "telemetry",
28 fix: "Count-Min sketch depth and width must be non-zero",
29 });
30 }
31 let len = depth.checked_mul(width).ok_or(PipelineError::QueueFull {
32 queue: "telemetry",
33 fix: "Count-Min sketch dimensions overflowed host address space; reduce depth or width",
34 })?;
35 let mut counters = Vec::new();
36 reserve_counter_capacity(&mut counters, len)?;
37 counters.resize(len, 0);
38 Ok(Self {
39 depth,
40 width,
41 counters,
42 })
43 }
44
45 #[must_use]
47 pub const fn depth(&self) -> usize {
48 self.depth
49 }
50
51 #[must_use]
53 pub const fn width(&self) -> usize {
54 self.width
55 }
56
57 #[must_use]
59 pub fn counters(&self) -> &[u64] {
60 &self.counters
61 }
62
63 pub fn clear(&mut self) {
65 self.counters.fill(0);
66 }
67
68 pub fn reset_shape(&mut self, depth: usize, width: usize) -> Result<(), PipelineError> {
75 if depth == 0 || width == 0 {
76 return Err(PipelineError::QueueFull {
77 queue: "telemetry",
78 fix: "Count-Min sketch depth and width must be non-zero",
79 });
80 }
81 let len = depth.checked_mul(width).ok_or(PipelineError::QueueFull {
82 queue: "telemetry",
83 fix: "Count-Min sketch dimensions overflowed host address space; reduce depth or width",
84 })?;
85 if self.depth == depth && self.width == width && self.counters.len() == len {
86 self.counters.fill(0);
87 return Ok(());
88 }
89 self.depth = depth;
90 self.width = width;
91 self.counters.clear();
92 reserve_counter_capacity(&mut self.counters, len)?;
93 self.counters.resize(len, 0);
94 Ok(())
95 }
96
97 pub fn add(&mut self, key: u32, amount: u64) {
99 if amount == 0 {
100 return;
101 }
102 for row in 0..self.depth {
103 let idx = self.bucket(row, key);
104 self.counters[idx] = self.counters[idx].saturating_add(amount);
105 }
106 }
107
108 pub fn try_add(&mut self, key: u32, amount: u64) -> Result<(), PipelineError> {
114 if amount == 0 {
115 return Ok(());
116 }
117 for row in 0..self.depth {
118 let idx = self.bucket(row, key);
119 self.counters[idx] = self.counters[idx].checked_add(amount).ok_or_else(|| {
120 PipelineError::Backend(format!(
121 "Count-Min sketch counter overflowed for row {row}, key {key}. Fix: snapshot and clear telemetry before counters reach u64::MAX."
122 ))
123 })?;
124 }
125 Ok(())
126 }
127
128 #[must_use]
130 pub fn estimate(&self, key: u32) -> u64 {
131 (0..self.depth)
132 .map(|row| self.counters[self.bucket(row, key)])
133 .min()
134 .unwrap_or(0)
135 }
136
137 pub fn merge(&mut self, other: &Self) -> Result<(), PipelineError> {
143 if self.depth != other.depth || self.width != other.width {
144 return Err(PipelineError::Backend(format!(
145 "cannot merge Count-Min sketches with shapes {}x{} and {}x{}. Fix: construct telemetry sketches with the same dimensions.",
146 self.depth, self.width, other.depth, other.width
147 )));
148 }
149 for (left, right) in self.counters.iter_mut().zip(&other.counters) {
150 *left = left.checked_add(*right).ok_or_else(|| {
151 PipelineError::Backend(
152 "Count-Min sketch merge overflowed u64. Fix: merge and clear telemetry more frequently."
153 .to_string(),
154 )
155 })?;
156 }
157 Ok(())
158 }
159
160 fn bucket(&self, row: usize, key: u32) -> usize {
161 let row_u64 = row as u64;
162 let hash = splitmix64(u64::from(key) ^ row_u64.wrapping_mul(0x9E37_79B9_7F4A_7C15));
163 let bucket = (hash % self.width as u64) as usize;
164 row * self.width + bucket
165 }
166}
167
168fn reserve_counter_capacity(counters: &mut Vec<u64>, len: usize) -> Result<(), PipelineError> {
169 vyre_foundation::allocation::try_reserve_vec_to_capacity(counters, len).map_err(|source| {
170 PipelineError::Backend(format!(
171 "Count-Min sketch could not reserve {len} counters: {source}. Fix: reduce telemetry sketch depth or width."
172 ))
173 })
174}
175
176#[derive(Debug, Clone, PartialEq, Eq)]
178pub struct SketchTelemetry {
179 pub ring_opcode: CountMinSketch,
181 pub active_opcode: CountMinSketch,
183 pub tenant: CountMinSketch,
185 pub status: CountMinSketch,
187 pub dispatch_metrics: CountMinSketch,
189 pub total_slots: u64,
191 pub active_slots: u64,
193}
194
195#[derive(Debug)]
197pub struct SketchTelemetryScratch {
198 pub ring_opcode: CountMinSketch,
200 pub active_opcode: CountMinSketch,
202 pub tenant: CountMinSketch,
204 pub status: CountMinSketch,
206 pub dispatch_metrics: CountMinSketch,
208 pub(super) total_slots: u64,
209 pub(super) active_slots: u64,
210}
211
212impl SketchTelemetryScratch {
213 pub fn new(depth: usize, width: usize) -> Result<Self, PipelineError> {
219 Ok(Self {
220 ring_opcode: CountMinSketch::new(depth, width)?,
221 active_opcode: CountMinSketch::new(depth, width)?,
222 tenant: CountMinSketch::new(depth, width)?,
223 status: CountMinSketch::new(depth, width)?,
224 dispatch_metrics: CountMinSketch::new(depth, width)?,
225 total_slots: 0,
226 active_slots: 0,
227 })
228 }
229
230 fn reset(&mut self, depth: usize, width: usize) -> Result<(), PipelineError> {
231 self.ring_opcode.reset_shape(depth, width)?;
232 self.active_opcode.reset_shape(depth, width)?;
233 self.tenant.reset_shape(depth, width)?;
234 self.status.reset_shape(depth, width)?;
235 self.dispatch_metrics.reset_shape(depth, width)?;
236 self.total_slots = 0;
237 self.active_slots = 0;
238 Ok(())
239 }
240
241 #[must_use]
243 pub fn to_snapshot(&self) -> SketchTelemetry {
244 SketchTelemetry {
245 ring_opcode: self.ring_opcode.clone(),
246 active_opcode: self.active_opcode.clone(),
247 tenant: self.tenant.clone(),
248 status: self.status.clone(),
249 dispatch_metrics: self.dispatch_metrics.clone(),
250 total_slots: self.total_slots,
251 active_slots: self.active_slots,
252 }
253 }
254
255 #[must_use]
259 pub fn into_snapshot(self) -> SketchTelemetry {
260 SketchTelemetry {
261 ring_opcode: self.ring_opcode,
262 active_opcode: self.active_opcode,
263 tenant: self.tenant,
264 status: self.status,
265 dispatch_metrics: self.dispatch_metrics,
266 total_slots: self.total_slots,
267 active_slots: self.active_slots,
268 }
269 }
270}
271
272impl RingTelemetry {
273 pub fn sketch(&self, depth: usize, width: usize) -> Result<SketchTelemetry, PipelineError> {
284 let mut scratch = SketchTelemetryScratch::new(depth, width)?;
285 self.sketch_into(depth, width, &mut scratch)?;
286 Ok(scratch.into_snapshot())
287 }
288
289 pub fn sketch_into(
295 &self,
296 depth: usize,
297 width: usize,
298 scratch: &mut SketchTelemetryScratch,
299 ) -> Result<(), PipelineError> {
300 scratch.reset(depth, width)?;
301
302 for slot in &self.slots {
303 scratch.ring_opcode.try_add(slot.opcode, 1)?;
304 scratch.tenant.try_add(slot.tenant_id, 1)?;
305 scratch.status.try_add(slot.status.raw(), 1)?;
306 if slot.status.is_active() {
307 scratch.active_slots = scratch.active_slots.checked_add(1).ok_or_else(|| {
308 PipelineError::Backend(
309 "active megakernel telemetry slot count overflowed u64. Fix: snapshot telemetry before counters reach u64::MAX."
310 .to_string(),
311 )
312 })?;
313 scratch.active_opcode.try_add(slot.opcode, 1)?;
314 }
315 }
316
317 for (opcode_idx, count) in &self.control.metrics {
318 scratch
319 .dispatch_metrics
320 .try_add(*opcode_idx, u64::from(*count))?;
321 }
322 scratch.total_slots = u64::try_from(self.slots.len()).map_err(|error| {
323 PipelineError::Backend(format!(
324 "megakernel telemetry slot count cannot fit u64: {error}. Fix: shard telemetry snapshots before sketching."
325 ))
326 })?;
327 Ok(())
328 }
329}
330
/// One SplitMix64 step: adds the 64-bit golden-ratio increment to `value` and
/// applies the SplitMix64 finalizer (xor-shift / multiply rounds).
///
/// Every stage is a bijection on `u64`, so distinct inputs always hash to
/// distinct outputs.
fn splitmix64(mut value: u64) -> u64 {
    value = value.wrapping_add(0x9E37_79B9_7F4A_7C15);
    value = (value ^ (value >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    value = (value ^ (value >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    value ^ (value >> 31)
}