1use super::protocol::{control, slot, ARG0_WORD, OPCODE_WORD, STATUS_WORD, TENANT_WORD};
8use super::scaling::{
9 MegakernelLaunchPolicy, MegakernelLaunchRecommendation, MegakernelLaunchRequest,
10 PriorityRequeueAccounting,
11};
12use super::staging_reserve::{
13 reserve_hash_map_capacity, reserve_vec_capacity as reserve_target_capacity,
14};
15use crate::PipelineError;
16
17mod sketch;
18mod types;
19mod errors;
20pub use sketch::{CountMinSketch, SketchTelemetry, SketchTelemetryScratch};
21use types::WindowAccumulator;
22pub use types::{
23 ControlSnapshot, MegakernelRuntimeCounters, MegakernelRuntimeEvidence,
24 MegakernelWatchdogSnapshot, RingOccupancy, RingSlotSnapshot, RingStatus, RingTelemetry,
25 RuntimeEvidenceMetricCoverage, RuntimeEvidenceMetricFamily, TelemetryDecodeCapacityEvidence,
26 TelemetryDecodeScratch, WindowTelemetry, RUNTIME_IO_EVIDENCE_SCHEMA_VERSION,
27 TELEMETRY_DECODE_CAPACITY_SCHEMA_VERSION,
28};
29
/// Number of 32-bit words that make up one ring slot.
///
/// `slot_byte_len` multiplies this by four to obtain the byte stride used when
/// ring snapshots are split into per-slot chunks.
const SLOT_WORDS_USIZE: usize = 16;
31
/// Reads the little-endian `u32` stored at word position `word_idx` of `buf`.
///
/// Returns `None` when the byte offset (`word_idx * 4`) or its end overflows
/// `usize`, or when the four bytes are not fully contained in `buf`.
fn read_word(buf: &[u8], word_idx: usize) -> Option<u32> {
    let start = word_idx.checked_mul(4)?;
    let stop = start.checked_add(4)?;
    match buf.get(start..stop)? {
        &[b0, b1, b2, b3] => Some(u32::from_le_bytes([b0, b1, b2, b3])),
        // `get(start..stop)` only ever yields four bytes; kept for exhaustiveness.
        _ => None,
    }
}
38
39fn try_read_slot_chunk_word(slot_bytes: &[u8], word_idx: u32) -> Result<u32, PipelineError> {
40 let word_idx = telemetry_u32_to_usize(word_idx, "slot word index")?;
41 let off = word_idx.checked_mul(4).ok_or_else(|| {
42 errors::slot_word_offset_overflow()
43 })?;
44 let end = off.checked_add(4).ok_or_else(|| {
45 errors::slot_word_end_overflow()
46 })?;
47 let bytes = slot_bytes.get(off..end).ok_or_else(|| {
48 errors::missing_slot_word(word_idx, slot_bytes.len())
49 })?;
50 Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
51}
52
/// Returns `true` when `values` is strictly ascending, i.e. sorted with no duplicates.
///
/// Empty and single-element slices are trivially strictly ascending.
fn is_sorted_unique_u32(values: &[u32]) -> bool {
    values
        .iter()
        .zip(values.iter().skip(1))
        .all(|(prev, next)| prev < next)
}
56
57impl ControlSnapshot {
58 #[must_use]
60 #[cfg(any(test, feature = "legacy-infallible"))]
61 pub fn decode(control_bytes: &[u8]) -> Self {
62 match Self::try_decode(control_bytes) {
63 Ok(snapshot) => snapshot,
64 Err(_) => Self::default(),
65 }
66 }
67
68 #[cfg(any(test, feature = "legacy-infallible"))]
70 pub fn decode_into(control_bytes: &[u8], out: &mut Self) {
71 if Self::try_decode_into(control_bytes, out).is_err() {
72 *out = Self::default();
73 }
74 }
75
76 pub fn try_decode(control_bytes: &[u8]) -> Result<Self, PipelineError> {
83 let mut out = Self::default();
84 Self::try_decode_into(control_bytes, &mut out)?;
85 Ok(out)
86 }
87
88 pub fn try_decode_into(control_bytes: &[u8], out: &mut Self) -> Result<(), PipelineError> {
95 validate_control_snapshot(control_bytes)?;
96 out.shutdown =
97 read_required_control_word(control_bytes, control_word_index(control::SHUTDOWN)?)? != 0;
98 out.done_count =
99 read_required_control_word(control_bytes, control_word_index(control::DONE_COUNT)?)?;
100 out.epoch = read_required_control_word(control_bytes, control_word_index(control::EPOCH)?)?;
101 out.metrics.clear();
102 reserve_target_capacity(
103 &mut out.metrics,
104 telemetry_u32_to_usize(control::METRICS_SLOTS, "metrics slot count")?,
105 "metrics",
106 )?;
107 for i in 0..control::METRICS_SLOTS {
108 let count = read_required_control_word(
109 control_bytes,
110 control_offset_index(control::METRICS_BASE, i)?,
111 )?;
112 if count > 0 {
113 out.metrics.push((i, count));
114 }
115 }
116 out.tenant_fairness.clear();
117 reserve_target_capacity(
118 &mut out.tenant_fairness,
119 telemetry_u32_to_usize(control::TENANT_FAIRNESS_SLOTS, "tenant fairness slot count")?,
120 "tenant fairness",
121 )?;
122 for i in 0..control::TENANT_FAIRNESS_SLOTS {
123 out.tenant_fairness.push(read_required_control_word(
124 control_bytes,
125 control_offset_index(control::TENANT_FAIRNESS_BASE, i)?,
126 )?);
127 }
128 out.priority_fairness.clear();
129 reserve_target_capacity(
130 &mut out.priority_fairness,
131 telemetry_u32_to_usize(
132 control::PRIORITY_FAIRNESS_SLOTS,
133 "priority fairness slot count",
134 )?,
135 "priority fairness",
136 )?;
137 for i in 0..control::PRIORITY_FAIRNESS_SLOTS {
138 out.priority_fairness.push(read_required_control_word(
139 control_bytes,
140 control_offset_index(control::PRIORITY_FAIRNESS_BASE, i)?,
141 )?);
142 }
143 Ok(())
144 }
145}
146
147impl RingTelemetry {
148 #[must_use]
150 #[cfg(any(test, feature = "legacy-infallible"))]
151 pub fn decode(control_bytes: &[u8], ring_bytes: &[u8]) -> Self {
152 Self::decode_with_window_opcodes(control_bytes, ring_bytes, &[])
153 }
154
155 pub fn try_decode(control_bytes: &[u8], ring_bytes: &[u8]) -> Result<Self, PipelineError> {
162 Self::try_decode_with_window_opcodes(control_bytes, ring_bytes, &[])
163 }
164
165 #[must_use]
169 #[cfg(any(test, feature = "legacy-infallible"))]
170 pub fn decode_with_window_opcodes(
171 control_bytes: &[u8],
172 ring_bytes: &[u8],
173 window_opcodes: &[u32],
174 ) -> Self {
175 match Self::try_decode_with_window_opcodes(control_bytes, ring_bytes, window_opcodes) {
176 Ok(telemetry) => telemetry,
177 Err(_) => Self::default(),
178 }
179 }
180
181 #[cfg(any(test, feature = "legacy-infallible"))]
184 pub fn decode_with_window_opcodes_into(
185 control_bytes: &[u8],
186 ring_bytes: &[u8],
187 window_opcodes: &[u32],
188 out: &mut Self,
189 scratch: &mut TelemetryDecodeScratch,
190 ) {
191 Self::try_decode_with_window_opcodes_into(
192 control_bytes,
193 ring_bytes,
194 window_opcodes,
195 out,
196 scratch,
197 )
198 .unwrap_or_else(|_| {
199 *out = Self::default();
200 scratch.clear();
201 });
202 }
203
204 fn try_decode_with_window_opcodes_into_unchecked(
205 control_bytes: &[u8],
206 ring_bytes: &[u8],
207 window_opcodes: &[u32],
208 out: &mut Self,
209 scratch: &mut TelemetryDecodeScratch,
210 ) -> Result<(), PipelineError> {
211 enum WindowOpcodeMatcher<'a> {
212 None,
213 Single(u32),
214 DenseBitmap(u128),
215 SmallSlice(&'a [u32]),
216 LargeSlice(&'a [u32]),
217 }
218
219 ControlSnapshot::try_decode_into(control_bytes, &mut out.control)?;
220 let slot_count = ring_bytes.len() / slot_byte_len()?;
221 out.occupancy = RingOccupancy::default();
222 out.slots.clear();
223 reserve_target_capacity(&mut out.slots, slot_count, "ring slots")?;
224 out.windows.clear();
225 scratch.window_opcodes.clear();
226 scratch.windows.clear();
227 let window_opcode_lookup = if window_opcodes.is_empty() {
228 &[][..]
229 } else if is_sorted_unique_u32(window_opcodes) {
230 window_opcodes
231 } else {
232 reserve_target_capacity(
233 &mut scratch.window_opcodes,
234 window_opcodes.len(),
235 "window opcode scratch",
236 )?;
237 scratch.window_opcodes.extend_from_slice(window_opcodes);
238 scratch.window_opcodes.sort_unstable();
239 scratch.window_opcodes.dedup();
240 scratch.window_opcodes.as_slice()
241 };
242 let window_opcode_matcher = match window_opcode_lookup {
243 [] => WindowOpcodeMatcher::None,
244 [opcode] => WindowOpcodeMatcher::Single(*opcode),
245 opcodes if opcodes.len() > 1 && opcodes.iter().all(|opcode| *opcode < 128) => {
246 let bitmap = opcodes
247 .iter()
248 .fold(0_u128, |acc, &opcode| acc | (1_u128 << opcode));
249 WindowOpcodeMatcher::DenseBitmap(bitmap)
250 }
251 opcodes if opcodes.len() <= 8 => WindowOpcodeMatcher::SmallSlice(opcodes),
252 opcodes => WindowOpcodeMatcher::LargeSlice(opcodes),
253 };
254 if !matches!(window_opcode_matcher, WindowOpcodeMatcher::None) {
255 reserve_hash_map_capacity(
256 &mut scratch.windows,
257 slot_count,
258 "window accumulator scratch",
259 )?;
260 }
261 let decode_windows = !matches!(window_opcode_matcher, WindowOpcodeMatcher::None);
262
263 let slot_byte_len = slot_byte_len()?;
264 for (slot_idx, slot_bytes) in ring_bytes.chunks_exact(slot_byte_len).enumerate() {
265 let slot_idx = u32::try_from(slot_idx).map_err(|source| {
266 PipelineError::Backend(format!(
267 "megakernel telemetry slot index cannot fit u32: {source}. Fix: shard ring snapshots before host decode."
268 ))
269 })?;
270 let status_raw = try_read_slot_chunk_word(slot_bytes, STATUS_WORD)?;
271 let status = RingStatus::from_raw(status_raw);
272 match status {
273 RingStatus::Empty => out.occupancy.empty += 1,
274 RingStatus::Published => out.occupancy.published += 1,
275 RingStatus::Claimed => out.occupancy.claimed += 1,
276 RingStatus::Done => out.occupancy.done += 1,
277 RingStatus::WaitIo => out.occupancy.wait_io += 1,
278 RingStatus::Yield => out.occupancy.yield_count += 1,
279 RingStatus::Requeue => out.occupancy.requeue += 1,
280 RingStatus::Fault => out.occupancy.fault += 1,
281 RingStatus::Unknown(_) => out.occupancy.unknown += 1,
282 }
283 let tenant_id = try_read_slot_chunk_word(slot_bytes, TENANT_WORD)?;
284 let opcode = try_read_slot_chunk_word(slot_bytes, OPCODE_WORD)?;
285 let args_prefix = [
286 try_read_slot_chunk_word(slot_bytes, ARG0_WORD)?,
287 try_read_slot_chunk_word(slot_bytes, ARG0_WORD + 1)?,
288 try_read_slot_chunk_word(slot_bytes, ARG0_WORD + 2)?,
289 ];
290 let is_window_opcode = match window_opcode_matcher {
291 WindowOpcodeMatcher::None => false,
292 WindowOpcodeMatcher::Single(expected) => opcode == expected,
293 WindowOpcodeMatcher::DenseBitmap(bitmap) => {
294 opcode < 128 && ((bitmap >> opcode) & 1) == 1
295 }
296 WindowOpcodeMatcher::SmallSlice(window_opcodes) => window_opcodes.contains(&opcode),
297 WindowOpcodeMatcher::LargeSlice(window_opcodes) => {
298 window_opcodes.binary_search(&opcode).is_ok()
299 }
300 };
301 if decode_windows && is_window_opcode {
302 let ticket = args_prefix[0];
303 let class_tag = args_prefix[1];
304 let entry =
305 scratch
306 .windows
307 .entry((ticket, opcode))
308 .or_insert_with(|| WindowAccumulator {
309 tenant_id,
310 opcode,
311 ..WindowAccumulator::default()
312 });
313 match class_tag {
314 0 => entry.required_slots += 1,
315 1 => entry.lookahead_slots += 1,
316 _ => {}
317 }
318 match status {
319 RingStatus::Published => entry.published += 1,
320 RingStatus::Claimed => entry.claimed += 1,
321 RingStatus::Done => entry.done += 1,
322 RingStatus::WaitIo => entry.wait_io += 1,
323 RingStatus::Yield => entry.yield_count += 1,
324 RingStatus::Requeue => entry.requeue += 1,
325 RingStatus::Fault => entry.fault += 1,
326 RingStatus::Empty | RingStatus::Unknown(_) => {}
327 }
328 }
329 out.slots.push(RingSlotSnapshot {
330 slot_idx,
331 status,
332 tenant_id,
333 opcode,
334 args_prefix,
335 });
336 }
337
338 reserve_target_capacity(&mut out.windows, scratch.windows.len(), "window output")?;
339 for (&(ticket, _), acc) in &scratch.windows {
340 out.windows.push(WindowTelemetry {
341 ticket,
342 tenant_id: acc.tenant_id,
343 opcode: acc.opcode,
344 required_slots: acc.required_slots,
345 lookahead_slots: acc.lookahead_slots,
346 published: acc.published,
347 claimed: acc.claimed,
348 done: acc.done,
349 wait_io: acc.wait_io,
350 yield_count: acc.yield_count,
351 requeue: acc.requeue,
352 fault: acc.fault,
353 });
354 }
355 out.windows
356 .sort_unstable_by_key(|window| (window.ticket, window.opcode));
357 Ok(())
358 }
359
360 pub fn try_decode_with_window_opcodes(
367 control_bytes: &[u8],
368 ring_bytes: &[u8],
369 window_opcodes: &[u32],
370 ) -> Result<Self, PipelineError> {
371 validate_telemetry_buffers(control_bytes, ring_bytes)?;
372 let mut out = Self::default();
373 let mut scratch = TelemetryDecodeScratch::new();
374 Self::try_decode_with_window_opcodes_into_unchecked(
375 control_bytes,
376 ring_bytes,
377 window_opcodes,
378 &mut out,
379 &mut scratch,
380 )?;
381 Ok(out)
382 }
383
384 pub fn try_decode_with_window_opcodes_into(
392 control_bytes: &[u8],
393 ring_bytes: &[u8],
394 window_opcodes: &[u32],
395 out: &mut Self,
396 scratch: &mut TelemetryDecodeScratch,
397 ) -> Result<(), PipelineError> {
398 validate_telemetry_buffers(control_bytes, ring_bytes)?;
399 Self::try_decode_with_window_opcodes_into_unchecked(
400 control_bytes,
401 ring_bytes,
402 window_opcodes,
403 out,
404 scratch,
405 )?;
406 Ok(())
407 }
408
409 #[must_use]
411 #[cfg(any(test, feature = "legacy-infallible"))]
412 pub fn active_slots_for_opcode(&self, opcode: u32) -> Vec<&RingSlotSnapshot> {
413 match self.try_active_slots_for_opcode(opcode) {
414 Ok(slots) => slots,
415 Err(_) => Vec::default(),
416 }
417 }
418
419 pub fn try_active_slots_for_opcode(
425 &self,
426 opcode: u32,
427 ) -> Result<Vec<&RingSlotSnapshot>, PipelineError> {
428 let mut out = Vec::default();
429 self.try_active_slots_for_opcode_into(opcode, &mut out)?;
430 Ok(out)
431 }
432
433 pub fn active_slots_for_opcode_iter(
435 &self,
436 opcode: u32,
437 ) -> impl Iterator<Item = &RingSlotSnapshot> {
438 self.slots
439 .iter()
440 .filter(move |slot| slot.opcode == opcode && slot.status.is_active())
441 }
442
443 #[cfg(any(test, feature = "legacy-infallible"))]
445 pub fn active_slots_for_opcode_into<'a>(
446 &'a self,
447 opcode: u32,
448 out: &mut Vec<&'a RingSlotSnapshot>,
449 ) {
450 if self.try_active_slots_for_opcode_into(opcode, out).is_err() {
451 out.clear();
452 }
453 }
454
455 pub fn try_active_slots_for_opcode_into<'a>(
461 &'a self,
462 opcode: u32,
463 out: &mut Vec<&'a RingSlotSnapshot>,
464 ) -> Result<(), PipelineError> {
465 out.clear();
466 reserve_target_capacity(out, self.slots.len(), "active slot output")?;
467 self.slots
468 .iter()
469 .filter(|slot| slot.opcode == opcode && slot.status.is_active())
470 .for_each(|slot| out.push(slot));
471 Ok(())
472 }
473
474 #[must_use]
476 #[cfg(any(test, feature = "legacy-infallible"))]
477 pub fn active_windows(&self) -> Vec<&WindowTelemetry> {
478 match self.try_active_windows() {
479 Ok(windows) => windows,
480 Err(_) => Vec::default(),
481 }
482 }
483
484 pub fn try_active_windows(&self) -> Result<Vec<&WindowTelemetry>, PipelineError> {
490 let mut out = Vec::default();
491 self.try_active_windows_into(&mut out)?;
492 Ok(out)
493 }
494
495 #[cfg(any(test, feature = "legacy-infallible"))]
497 pub fn active_windows_into<'a>(&'a self, out: &mut Vec<&'a WindowTelemetry>) {
498 if self.try_active_windows_into(out).is_err() {
499 out.clear();
500 }
501 }
502
503 pub fn try_active_windows_into<'a>(
509 &'a self,
510 out: &mut Vec<&'a WindowTelemetry>,
511 ) -> Result<(), PipelineError> {
512 out.clear();
513 reserve_target_capacity(out, self.windows.len(), "active window output")?;
514 self.windows
515 .iter()
516 .filter(|window| window.is_active())
517 .for_each(|window| out.push(window));
518 Ok(())
519 }
520
521 #[must_use]
523 pub fn priority_accounting(&self) -> PriorityRequeueAccounting {
524 PriorityRequeueAccounting {
525 requeue_count: u64::from(self.occupancy.requeue),
526 aged_promotions: 0,
527 max_priority_age: 0,
528 }
529 }
530
531 #[must_use]
534 #[cfg(any(test, feature = "legacy-infallible"))]
535 pub fn runtime_counters(&self) -> MegakernelRuntimeCounters {
536 match self.try_runtime_counters() {
537 Ok(counters) => counters,
538 Err(_) => zero_runtime_counters(),
539 }
540 }
541
542 pub fn try_runtime_counters(&self) -> Result<MegakernelRuntimeCounters, PipelineError> {
549 let total_slots = self.occupancy.total_slots();
550 let queue_depth = self.occupancy.queue_depth();
551 let gpu_idle_slots = self.occupancy.empty;
552 let gpu_idle_ppm = if total_slots == 0 {
553 0
554 } else {
555 let raw_idle_ppm = (u64::from(gpu_idle_slots) * 1_000_000) / u64::from(total_slots);
556 raw_idle_ppm.min(1_000_000) as u32
557 };
558 let frontier_density_bps = try_density_bps(queue_depth, total_slots)?;
559 let active_slots = total_slots.saturating_sub(gpu_idle_slots);
560 let occupancy_proxy_bps = try_density_bps(active_slots, total_slots)?;
561 let tenant_fairness_total = try_sum_u32_as_u64(
562 &self.control.tenant_fairness,
563 "tenant fairness total",
564 "shard tenant counters before telemetry aggregation",
565 )?;
566 let priority_fairness_total = try_sum_u32_as_u64(
567 &self.control.priority_fairness,
568 "priority fairness total",
569 "shard priority counters before telemetry aggregation",
570 )?;
571 let tenant_fairness_skew = try_fairness_skew(&self.control.tenant_fairness)?;
572 Ok(MegakernelRuntimeCounters {
573 total_slots,
574 queue_depth,
575 gpu_idle_slots,
576 gpu_idle_ppm,
577 frontier_density_bps,
578 occupancy_proxy_bps,
579 drained_slots: self.control.done_count,
580 unreclaimed_done_slots: self.occupancy.done,
581 tenant_fairness_total,
582 tenant_fairness_skew,
583 priority_fairness_total,
584 requeue_slots: self.occupancy.requeue,
585 fault_slots: self.occupancy.fault,
586 })
587 }
588
589 #[must_use]
592 #[cfg(any(test, feature = "legacy-infallible"))]
593 pub fn health_since(&self, previous: &RingTelemetry) -> MegakernelWatchdogSnapshot {
594 match self.try_health_since(previous) {
595 Ok(snapshot) => snapshot,
596 Err(_) => zero_watchdog_snapshot(),
597 }
598 }
599
600 pub fn try_health_since(
607 &self,
608 previous: &RingTelemetry,
609 ) -> Result<MegakernelWatchdogSnapshot, PipelineError> {
610 let counters = self.try_runtime_counters()?;
611 let done_delta = self
612 .control
613 .done_count
614 .checked_sub(previous.control.done_count)
615 .ok_or_else(|| {
616 errors::done_counter_backwards(previous.control.done_count, self.control.done_count)
617 })?;
618 let suspected_stall =
619 counters.queue_depth > 0 && done_delta == 0 && counters.fault_slots == 0;
620 Ok(MegakernelWatchdogSnapshot {
621 done_delta,
622 queue_depth: counters.queue_depth,
623 fault_slots: counters.fault_slots,
624 requeue_slots: counters.requeue_slots,
625 gpu_idle_ppm: counters.gpu_idle_ppm,
626 suspected_stall,
627 })
628 }
629
630 pub fn recommend_launch(
636 &self,
637 mut request: MegakernelLaunchRequest,
638 ) -> Result<MegakernelLaunchRecommendation, vyre_driver::BackendError> {
639 let counters = self
640 .try_runtime_counters()
641 .map_err(errors::launch_telemetry_failed)?;
642 if request.graph_node_count == 0 {
643 request.graph_node_count = counters.total_slots;
644 }
645 if request.graph_edge_count == 0 {
646 request.graph_edge_count = counters.queue_depth;
647 }
648 if request.frontier_density_bps == 0 {
649 request.frontier_density_bps = counters.frontier_density_bps;
650 }
651 request.hot_opcode_count = self
652 .control
653 .metrics
654 .iter()
655 .filter(|(_, count)| *count > 0)
656 .count()
657 .try_into()
658 .map_err(errors::hot_opcode_count_overflow)?;
659 let mut hot_window_count = 0usize;
660 for window in &self.windows {
661 let demand = window
662 .required_slots
663 .checked_add(window.lookahead_slots)
664 .ok_or_else(|| {
665 errors::route_window_demand_overflow()
666 })?;
667 if demand >= 4 {
668 hot_window_count = hot_window_count.checked_add(1).ok_or_else(|| {
669 errors::hot_window_count_overflow()
670 })?;
671 }
672 }
673 request.hot_window_count = hot_window_count
674 .try_into()
675 .map_err(errors::hot_window_count_too_wide)?;
676 request.requeue_count = request
677 .requeue_count
678 .checked_add(u64::from(self.occupancy.requeue))
679 .ok_or_else(errors::requeue_count_overflow)?;
680 MegakernelLaunchPolicy::standard().recommend(request)
681 }
682}
683
/// All-zero runtime counters, used as the fallback value by the infallible
/// `runtime_counters` wrapper.
///
/// Only compiled for tests or with the `legacy-infallible` feature.
#[cfg(any(test, feature = "legacy-infallible"))]
fn zero_runtime_counters() -> MegakernelRuntimeCounters {
    MegakernelRuntimeCounters {
        // Slot totals.
        total_slots: 0,
        queue_depth: 0,
        gpu_idle_slots: 0,
        drained_slots: 0,
        unreclaimed_done_slots: 0,
        requeue_slots: 0,
        fault_slots: 0,
        // Derived ratios.
        gpu_idle_ppm: 0,
        frontier_density_bps: 0,
        occupancy_proxy_bps: 0,
        // Fairness aggregates.
        tenant_fairness_total: 0,
        tenant_fairness_skew: 0,
        priority_fairness_total: 0,
    }
}
704
/// All-zero watchdog snapshot with no suspected stall, used as the fallback
/// value by the infallible `health_since` wrapper.
///
/// Only compiled for tests or with the `legacy-infallible` feature.
#[cfg(any(test, feature = "legacy-infallible"))]
fn zero_watchdog_snapshot() -> MegakernelWatchdogSnapshot {
    MegakernelWatchdogSnapshot {
        suspected_stall: false,
        done_delta: 0,
        queue_depth: 0,
        fault_slots: 0,
        requeue_slots: 0,
        gpu_idle_ppm: 0,
    }
}
718
719fn read_required_control_word(control_bytes: &[u8], word_idx: usize) -> Result<u32, PipelineError> {
720 read_word(control_bytes, word_idx).ok_or_else(|| errors::missing_control_word(word_idx))
721}
722
723fn try_density_bps(numerator: u32, denominator: u32) -> Result<u16, PipelineError> {
724 if denominator == 0 {
725 return Ok(0);
726 }
727 let bps = (u64::from(numerator) * 10_000) / u64::from(denominator);
728 u16::try_from(bps.min(u64::from(u16::MAX))).map_err(errors::density_bps_overflow)
729}
730
731fn validate_telemetry_buffers(
732 control_bytes: &[u8],
733 ring_bytes: &[u8],
734) -> Result<(), PipelineError> {
735 validate_control_snapshot(control_bytes)?;
736 let slot_bytes = slot_byte_len()?;
737 if ring_bytes.len() % slot_bytes != 0 {
738 return Err(errors::ring_slot_alignment(ring_bytes.len(), slot_bytes));
739 }
740 let slot_count = ring_bytes.len() / slot_bytes;
741 if u32::try_from(slot_count).is_err() {
742 return Err(errors::ring_slot_count_too_wide(slot_count));
743 }
744 Ok(())
745}
746
747fn validate_control_snapshot(control_bytes: &[u8]) -> Result<(), PipelineError> {
748 let min_control = super::protocol::control_byte_len(0).ok_or_else(|| {
749 errors::control_length_overflow()
750 })?;
751 if control_bytes.len() < min_control || control_bytes.len() % 4 != 0 {
752 return Err(errors::bad_control_snapshot(
753 control_bytes.len(),
754 min_control,
755 ));
756 }
757 Ok(())
758}
759
760fn slot_byte_len() -> Result<usize, PipelineError> {
761 SLOT_WORDS_USIZE.checked_mul(4).ok_or_else(|| {
762 errors::slot_byte_width_overflow()
763 })
764}
765
766fn telemetry_u32_to_usize(value: u32, label: &'static str) -> Result<usize, PipelineError> {
767 usize::try_from(value).map_err(|source| errors::telemetry_u32_to_usize(value, label, source))
768}
769
770fn control_word_index(word: u32) -> Result<usize, PipelineError> {
771 usize::try_from(word).map_err(|source| errors::control_word_index(word, source))
772}
773
774fn control_offset_index(base: u32, offset: u32) -> Result<usize, PipelineError> {
775 let word = base.checked_add(offset).ok_or_else(|| {
776 errors::control_word_offset_overflow()
777 })?;
778 control_word_index(word)
779}
780
781fn try_sum_u32_as_u64(
782 counters: &[u32],
783 label: &'static str,
784 fix: &'static str,
785) -> Result<u64, PipelineError> {
786 counters.iter().try_fold(0u64, |acc, &count| {
787 acc.checked_add(u64::from(count)).ok_or_else(|| {
788 errors::counter_sum_overflow(label, fix)
789 })
790 })
791}
792
793fn try_fairness_skew(counters: &[u32]) -> Result<u32, PipelineError> {
794 let mut min_nonzero = u32::MAX;
795 let mut max = 0u32;
796 for &count in counters {
797 if count != 0 {
798 min_nonzero = min_nonzero.min(count);
799 max = max.max(count);
800 }
801 }
802 if min_nonzero == u32::MAX {
803 Ok(0)
804 } else {
805 max.checked_sub(min_nonzero).ok_or_else(|| {
806 errors::fairness_skew_invalid(max, min_nonzero)
807 })
808 }
809}
810
// Unit tests live in `telemetry_tests.rs` and are textually included so they
// can reach this module's private items.
#[cfg(test)]
mod tests {
    include!("telemetry_tests.rs");
}