// vyre_runtime/megakernel/telemetry/sketch.rs
use super::RingTelemetry;
use crate::PipelineError;
3
/// Count-Min sketch over `u32` keys with `u64` counters.
///
/// The counters form a `depth` x `width` grid stored in one flat vector, one
/// row after another.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CountMinSketch {
    /// Number of rows (one hashed bucket per row is touched for each key).
    depth: usize,
    /// Number of counters in each row.
    width: usize,
    /// Flat counter storage holding `depth * width` entries.
    counters: Vec<u64>,
}
16
17impl CountMinSketch {
18 pub fn new(depth: usize, width: usize) -> Result<Self, PipelineError> {
25 if depth == 0 || width == 0 {
26 return Err(PipelineError::QueueFull {
27 queue: "telemetry",
28 fix: "Count-Min sketch depth and width must be non-zero",
29 });
30 }
31 let len = depth.checked_mul(width).ok_or(PipelineError::QueueFull {
32 queue: "telemetry",
33 fix: "Count-Min sketch dimensions overflowed host address space; reduce depth or width",
34 })?;
35 let mut counters = Vec::new();
36 reserve_counter_capacity(&mut counters, len)?;
37 counters.resize(len, 0);
38 Ok(Self {
39 depth,
40 width,
41 counters,
42 })
43 }
44
45 #[must_use]
47 pub const fn depth(&self) -> usize {
48 self.depth
49 }
50
51 #[must_use]
53 pub const fn width(&self) -> usize {
54 self.width
55 }
56
57 #[must_use]
59 pub fn counters(&self) -> &[u64] {
60 &self.counters
61 }
62
63 pub fn clear(&mut self) {
65 self.counters.fill(0);
66 }
67
68 pub fn reset_shape(&mut self, depth: usize, width: usize) -> Result<(), PipelineError> {
75 if depth == 0 || width == 0 {
76 return Err(PipelineError::QueueFull {
77 queue: "telemetry",
78 fix: "Count-Min sketch depth and width must be non-zero",
79 });
80 }
81 let len = depth.checked_mul(width).ok_or(PipelineError::QueueFull {
82 queue: "telemetry",
83 fix: "Count-Min sketch dimensions overflowed host address space; reduce depth or width",
84 })?;
85 if self.depth == depth && self.width == width && self.counters.len() == len {
86 self.counters.fill(0);
87 return Ok(());
88 }
89 self.depth = depth;
90 self.width = width;
91 self.counters.clear();
92 reserve_counter_capacity(&mut self.counters, len)?;
93 self.counters.resize(len, 0);
94 Ok(())
95 }
96
97 pub fn add(&mut self, key: u32, amount: u64) {
99 if let Err(error) = self.try_add(key, amount) {
100 panic!("{error}");
101 }
102 }
103
104 pub fn try_add(&mut self, key: u32, amount: u64) -> Result<(), PipelineError> {
110 if amount == 0 {
111 return Ok(());
112 }
113 for row in 0..self.depth {
114 let idx = self.bucket(row, key);
115 self.counters[idx] = self.counters[idx].checked_add(amount).ok_or_else(|| {
116 PipelineError::Backend(format!(
117 "Count-Min sketch counter overflowed for row {row}, key {key}. Fix: snapshot and clear telemetry before counters reach u64::MAX."
118 ))
119 })?;
120 }
121 Ok(())
122 }
123
124 #[must_use]
126 pub fn estimate(&self, key: u32) -> u64 {
127 (0..self.depth)
128 .map(|row| self.counters[self.bucket(row, key)])
129 .min()
130 .unwrap_or(0)
131 }
132
133 pub fn merge(&mut self, other: &Self) -> Result<(), PipelineError> {
139 if self.depth != other.depth || self.width != other.width {
140 return Err(PipelineError::Backend(format!(
141 "cannot merge Count-Min sketches with shapes {}x{} and {}x{}. Fix: construct telemetry sketches with the same dimensions.",
142 self.depth, self.width, other.depth, other.width
143 )));
144 }
145 for (left, right) in self.counters.iter_mut().zip(&other.counters) {
146 *left = left.checked_add(*right).ok_or_else(|| {
147 PipelineError::Backend(
148 "Count-Min sketch merge overflowed u64. Fix: merge and clear telemetry more frequently."
149 .to_string(),
150 )
151 })?;
152 }
153 Ok(())
154 }
155
156 fn bucket(&self, row: usize, key: u32) -> usize {
157 let row_u64 = u64::try_from(row).unwrap_or_else(|error| {
158 panic!("Count-Min sketch row cannot fit u64: {error}. Fix: reduce sketch depth.")
159 });
160 let hash = splitmix64(u64::from(key) ^ row_u64.wrapping_mul(0x9E37_79B9_7F4A_7C15));
161 let bucket = usize::try_from(
162 hash % u64::try_from(self.width).unwrap_or_else(|error| {
163 panic!("Count-Min sketch width cannot fit u64: {error}. Fix: reduce sketch width.")
164 }),
165 )
166 .unwrap_or_else(|error| {
167 panic!("Count-Min sketch bucket cannot fit usize: {error}. Fix: reduce sketch width.")
168 });
169 row.checked_mul(self.width)
170 .and_then(|base| base.checked_add(bucket))
171 .unwrap_or_else(|| {
172 panic!(
173 "Count-Min sketch bucket index overflowed usize. Fix: reduce sketch depth or width."
174 )
175 })
176 }
177}
178
179fn reserve_counter_capacity(counters: &mut Vec<u64>, len: usize) -> Result<(), PipelineError> {
180 vyre_foundation::allocation::try_reserve_vec_to_capacity(counters, len).map_err(|source| {
181 PipelineError::Backend(format!(
182 "Count-Min sketch could not reserve {len} counters: {source}. Fix: reduce telemetry sketch depth or width."
183 ))
184 })
185}
186
187#[derive(Debug, Clone, PartialEq, Eq)]
189pub struct SketchTelemetry {
190 pub ring_opcode: CountMinSketch,
192 pub active_opcode: CountMinSketch,
194 pub tenant: CountMinSketch,
196 pub status: CountMinSketch,
198 pub dispatch_metrics: CountMinSketch,
200 pub total_slots: u64,
202 pub active_slots: u64,
204}
205
206#[derive(Debug)]
208pub struct SketchTelemetryScratch {
209 pub ring_opcode: CountMinSketch,
211 pub active_opcode: CountMinSketch,
213 pub tenant: CountMinSketch,
215 pub status: CountMinSketch,
217 pub dispatch_metrics: CountMinSketch,
219 pub(super) total_slots: u64,
220 pub(super) active_slots: u64,
221}
222
223impl SketchTelemetryScratch {
224 pub fn new(depth: usize, width: usize) -> Result<Self, PipelineError> {
230 Ok(Self {
231 ring_opcode: CountMinSketch::new(depth, width)?,
232 active_opcode: CountMinSketch::new(depth, width)?,
233 tenant: CountMinSketch::new(depth, width)?,
234 status: CountMinSketch::new(depth, width)?,
235 dispatch_metrics: CountMinSketch::new(depth, width)?,
236 total_slots: 0,
237 active_slots: 0,
238 })
239 }
240
241 fn reset(&mut self, depth: usize, width: usize) -> Result<(), PipelineError> {
242 self.ring_opcode.reset_shape(depth, width)?;
243 self.active_opcode.reset_shape(depth, width)?;
244 self.tenant.reset_shape(depth, width)?;
245 self.status.reset_shape(depth, width)?;
246 self.dispatch_metrics.reset_shape(depth, width)?;
247 self.total_slots = 0;
248 self.active_slots = 0;
249 Ok(())
250 }
251
252 #[must_use]
254 pub fn to_snapshot(&self) -> SketchTelemetry {
255 SketchTelemetry {
256 ring_opcode: self.ring_opcode.clone(),
257 active_opcode: self.active_opcode.clone(),
258 tenant: self.tenant.clone(),
259 status: self.status.clone(),
260 dispatch_metrics: self.dispatch_metrics.clone(),
261 total_slots: self.total_slots,
262 active_slots: self.active_slots,
263 }
264 }
265
266 #[must_use]
270 pub fn into_snapshot(self) -> SketchTelemetry {
271 SketchTelemetry {
272 ring_opcode: self.ring_opcode,
273 active_opcode: self.active_opcode,
274 tenant: self.tenant,
275 status: self.status,
276 dispatch_metrics: self.dispatch_metrics,
277 total_slots: self.total_slots,
278 active_slots: self.active_slots,
279 }
280 }
281}
282
283impl RingTelemetry {
284 pub fn sketch(&self, depth: usize, width: usize) -> Result<SketchTelemetry, PipelineError> {
295 let mut scratch = SketchTelemetryScratch::new(depth, width)?;
296 self.sketch_into(depth, width, &mut scratch)?;
297 Ok(scratch.into_snapshot())
298 }
299
300 pub fn sketch_into(
306 &self,
307 depth: usize,
308 width: usize,
309 scratch: &mut SketchTelemetryScratch,
310 ) -> Result<(), PipelineError> {
311 scratch.reset(depth, width)?;
312
313 for slot in &self.slots {
314 scratch.ring_opcode.try_add(slot.opcode, 1)?;
315 scratch.tenant.try_add(slot.tenant_id, 1)?;
316 scratch.status.try_add(slot.status.raw(), 1)?;
317 if slot.status.is_active() {
318 scratch.active_slots = scratch.active_slots.checked_add(1).ok_or_else(|| {
319 PipelineError::Backend(
320 "active megakernel telemetry slot count overflowed u64. Fix: snapshot telemetry before counters reach u64::MAX."
321 .to_string(),
322 )
323 })?;
324 scratch.active_opcode.try_add(slot.opcode, 1)?;
325 }
326 }
327
328 for (opcode_idx, count) in &self.control.metrics {
329 scratch
330 .dispatch_metrics
331 .try_add(*opcode_idx, u64::from(*count))?;
332 }
333 scratch.total_slots = u64::try_from(self.slots.len()).map_err(|error| {
334 PipelineError::Backend(format!(
335 "megakernel telemetry slot count cannot fit u64: {error}. Fix: shard telemetry snapshots before sketching."
336 ))
337 })?;
338 Ok(())
339 }
340}
341
/// Mixes `value` into a 64-bit hash.
///
/// Adds a fixed increment, then applies two xor-shift-multiply rounds and a
/// final xor-shift. All arithmetic wraps, so the function never panics.
fn splitmix64(value: u64) -> u64 {
    let seeded = value.wrapping_add(0x9E37_79B9_7F4A_7C15);
    let mixed = (seeded ^ (seeded >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    let mixed = (mixed ^ (mixed >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    mixed ^ (mixed >> 31)
}