1use crate::accounting::{
10 checked_add_u64_count as checked_add, checked_add_usize_count as checked_add_usize,
11 checked_mul_u64_count as checked_mul, ArithmeticOverflow,
12};
13use crate::numeric::BackendNumericPolicy;
14use crate::reservation_policy::{
15 reserved_typed_vec as reserved_vec, ReservationPolicy, ReusableIndexScratch,
16};
17
18const DEVICE_DIAGNOSTIC_AGGREGATION_RESERVATION: ReservationPolicy = ReservationPolicy::new(
19 "device diagnostic aggregation",
20 "shard diagnostic aggregation before launch planning",
21);
22
23const DEVICE_DIAGNOSTIC_AGGREGATION_NUMERIC: BackendNumericPolicy =
24 BackendNumericPolicy::new("device diagnostic aggregation");
25
26#[derive(Clone, Copy, Debug, Eq, PartialEq)]
28pub struct DiagnosticShard {
29 pub shard: u32,
31 pub candidate_items: u64,
33 pub emitted_diagnostics: u64,
35 pub raw_item_bytes: u64,
37 pub diagnostic_record_bytes: u64,
39 pub counter_bytes: u64,
41 pub severity_mask: u32,
43}
44
45#[derive(Clone, Copy, Debug, Eq, PartialEq)]
47pub struct DiagnosticCompactRange {
48 pub shard: u32,
50 pub compact_offset: u64,
52 pub records: u64,
54 pub bytes: u64,
56}
57
58#[derive(Clone, Debug, Eq, PartialEq)]
60pub struct DiagnosticAggregationPlan {
61 pub compact_ranges: Vec<DiagnosticCompactRange>,
63 pub counter_readback_bytes: u64,
65 pub compact_readback_bytes: u64,
67 pub host_readback_bytes: u64,
69 pub raw_candidate_readback_bytes: u64,
71 pub avoided_readback_bytes: u64,
73 pub compression_ratio_bps: u32,
75 pub overflow_records: u64,
77 pub requires_overflow_flag: bool,
79 pub requires_device_prefix_scan: bool,
81 pub final_only_host_readback: bool,
83}
84
85#[derive(Debug, Default)]
87pub struct DiagnosticAggregationScratch {
88 index_scratch: ReusableIndexScratch<u32>,
89}
90
91impl DiagnosticAggregationScratch {
92 #[must_use]
94 pub fn new() -> Self {
95 Self::default()
96 }
97
98 pub fn try_with_capacity(shard_count: usize) -> Result<Self, DiagnosticAggregationError> {
104 let mut scratch = Self::default();
105 scratch.try_reserve_shards(shard_count)?;
106 Ok(scratch)
107 }
108
109 pub fn try_reserve_shards(
115 &mut self,
116 shard_count: usize,
117 ) -> Result<(), DiagnosticAggregationError> {
118 self.index_scratch.try_reserve_with(
119 DEVICE_DIAGNOSTIC_AGGREGATION_RESERVATION,
120 shard_count,
121 "scratch.ids",
122 "scratch.ordered_indices",
123 storage_reserve_failed,
124 )
125 }
126
127 #[must_use]
129 pub fn id_capacity(&self) -> usize {
130 self.index_scratch.seen_capacity()
131 }
132
133 #[must_use]
135 pub fn ordered_index_capacity(&self) -> usize {
136 self.index_scratch.ordered_index_capacity()
137 }
138}
139
140#[derive(Clone, Debug, Eq, PartialEq)]
142pub enum DiagnosticAggregationError {
143 DuplicateShard {
145 shard: u32,
147 },
148 ZeroCandidates {
150 shard: u32,
152 },
153 ZeroRawItemBytes {
155 shard: u32,
157 },
158 ZeroDiagnosticRecordBytes {
160 shard: u32,
162 },
163 EmittedExceedsCandidates {
165 shard: u32,
167 emitted_diagnostics: u64,
169 candidate_items: u64,
171 },
172 MissingSeverityMask {
174 shard: u32,
176 },
177 ZeroRecordCap,
179 ByteCountOverflow {
181 field: &'static str,
183 },
184 OverBudget {
186 required_bytes: u64,
188 budget_bytes: u64,
190 },
191 StorageReserveFailed {
193 field: &'static str,
195 requested: usize,
197 message: String,
199 },
200}
201
202impl ArithmeticOverflow for DiagnosticAggregationError {
203 fn arithmetic_overflow(field: &'static str) -> Self {
204 Self::ByteCountOverflow { field }
205 }
206}
207
208impl std::fmt::Display for DiagnosticAggregationError {
209 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 match self {
211 Self::DuplicateShard { shard } => write!(
212 f,
213 "device diagnostic aggregation received duplicate shard {shard}. Fix: assign unique diagnostic shard ids before device compaction."
214 ),
215 Self::ZeroCandidates { shard } => write!(
216 f,
217 "device diagnostic shard {shard} emitted diagnostics with zero candidates. Fix: emit diagnostic shards only after device candidate classification."
218 ),
219 Self::ZeroRawItemBytes { shard } => write!(
220 f,
221 "device diagnostic shard {shard} has raw_item_bytes=0. Fix: pass the concrete token/fact candidate ABI width."
222 ),
223 Self::ZeroDiagnosticRecordBytes { shard } => write!(
224 f,
225 "device diagnostic shard {shard} has diagnostic_record_bytes=0. Fix: pass the compact diagnostic record ABI width."
226 ),
227 Self::EmittedExceedsCandidates {
228 shard,
229 emitted_diagnostics,
230 candidate_items,
231 } => write!(
232 f,
233 "device diagnostic shard {shard} emitted {emitted_diagnostics} diagnostics from {candidate_items} candidates. Fix: clamp emission to the device candidate count or split the shard."
234 ),
235 Self::MissingSeverityMask { shard } => write!(
236 f,
237 "device diagnostic shard {shard} emitted diagnostics without a severity/category mask. Fix: preserve diagnostic class bits during device aggregation."
238 ),
239 Self::ZeroRecordCap => write!(
240 f,
241 "device diagnostic aggregation received a zero per-shard record cap. Fix: set an explicit compact diagnostic cap before launch."
242 ),
243 Self::ByteCountOverflow { field } => write!(
244 f,
245 "device diagnostic aggregation overflowed while computing {field}. Fix: shard diagnostic aggregation before readback planning."
246 ),
247 Self::OverBudget {
248 required_bytes,
249 budget_bytes,
250 } => write!(
251 f,
252 "device diagnostic aggregation requires {required_bytes} bytes but budget allows {budget_bytes}. Fix: reduce per-shard caps, split shards, or raise the explicit device budget."
253 ),
254 Self::StorageReserveFailed {
255 field,
256 requested,
257 message,
258 } => write!(
259 f,
260 "device diagnostic aggregation failed to reserve {field} for {requested} entries: {message}. Fix: shard diagnostic aggregation before launch planning."
261 ),
262 }
263 }
264}
265
266impl std::error::Error for DiagnosticAggregationError {}
267
268pub fn plan_device_diagnostic_aggregation(
276 shards: &[DiagnosticShard],
277 max_records_per_shard: u64,
278 budget_bytes: u64,
279) -> Result<DiagnosticAggregationPlan, DiagnosticAggregationError> {
280 let mut scratch = DiagnosticAggregationScratch::try_with_capacity(shards.len())?;
281 plan_device_diagnostic_aggregation_with_scratch(
282 shards,
283 max_records_per_shard,
284 budget_bytes,
285 &mut scratch,
286 )
287}
288
289pub fn plan_device_diagnostic_aggregation_with_scratch(
297 shards: &[DiagnosticShard],
298 max_records_per_shard: u64,
299 budget_bytes: u64,
300 scratch: &mut DiagnosticAggregationScratch,
301) -> Result<DiagnosticAggregationPlan, DiagnosticAggregationError> {
302 if max_records_per_shard == 0 {
303 return Err(DiagnosticAggregationError::ZeroRecordCap);
304 }
305
306 scratch.index_scratch.clear();
307 scratch.try_reserve_shards(shards.len())?;
308 let mut counter_readback_bytes = 0_u64;
309 let mut compact_readback_bytes = 0_u64;
310 let mut raw_candidate_readback_bytes = 0_u64;
311 let mut overflow_records = 0_u64;
312 let mut non_empty_diagnostic_shards = 0usize;
313
314 for (index, shard) in shards.iter().copied().enumerate() {
315 validate_shard(shard, &mut scratch.index_scratch)?;
316
317 let raw_bytes = checked_mul(
318 shard.candidate_items,
319 shard.raw_item_bytes,
320 "raw candidate readback bytes",
321 )?;
322 raw_candidate_readback_bytes = checked_add(
323 raw_candidate_readback_bytes,
324 raw_bytes,
325 "total raw candidate readback bytes",
326 )?;
327 counter_readback_bytes = checked_add(
328 counter_readback_bytes,
329 shard.counter_bytes,
330 "counter readback bytes",
331 )?;
332 if shard.emitted_diagnostics != 0 {
333 non_empty_diagnostic_shards = checked_add_usize(
334 non_empty_diagnostic_shards,
335 1,
336 "non-empty diagnostic shard count",
337 )?;
338 }
339 scratch.index_scratch.push_index(index);
340 }
341 scratch
342 .index_scratch
343 .sort_indices_unstable_by_key_if_needed(|index| shards[index].shard);
344
345 let mut compact_ranges =
346 reserved_aggregation_vec(non_empty_diagnostic_shards, "compact_ranges")?;
347
348 for &index in scratch.index_scratch.ordered_indices() {
349 let shard = shards[index];
350 if shard.emitted_diagnostics == 0 {
351 continue;
352 }
353
354 let compact_records = shard.emitted_diagnostics.min(max_records_per_shard);
355 let omitted = shard.emitted_diagnostics - compact_records;
356 overflow_records = checked_add(overflow_records, omitted, "overflow records")?;
357 let compact_bytes = checked_mul(
358 compact_records,
359 shard.diagnostic_record_bytes,
360 "compact diagnostic bytes",
361 )?;
362 compact_ranges.push(DiagnosticCompactRange {
363 shard: shard.shard,
364 compact_offset: compact_readback_bytes,
365 records: compact_records,
366 bytes: compact_bytes,
367 });
368 compact_readback_bytes = checked_add(
369 compact_readback_bytes,
370 compact_bytes,
371 "total compact diagnostic bytes",
372 )?;
373 }
374
375 let host_readback_bytes = checked_add(
376 counter_readback_bytes,
377 compact_readback_bytes,
378 "host diagnostic readback bytes",
379 )?;
380 if host_readback_bytes > budget_bytes {
381 return Err(DiagnosticAggregationError::OverBudget {
382 required_bytes: host_readback_bytes,
383 budget_bytes,
384 });
385 }
386 let compression_ratio_bps =
387 diagnostic_compression_ratio_bps(host_readback_bytes, raw_candidate_readback_bytes);
388
389 Ok(DiagnosticAggregationPlan {
390 compact_ranges,
391 counter_readback_bytes,
392 compact_readback_bytes,
393 host_readback_bytes,
394 raw_candidate_readback_bytes,
395 avoided_readback_bytes: avoided_readback_bytes(
396 raw_candidate_readback_bytes,
397 host_readback_bytes,
398 ),
399 compression_ratio_bps,
400 overflow_records,
401 requires_overflow_flag: overflow_records != 0,
402 requires_device_prefix_scan: non_empty_diagnostic_shards > 1,
403 final_only_host_readback: true,
404 })
405}
406
407fn validate_shard(
408 shard: DiagnosticShard,
409 scratch: &mut ReusableIndexScratch<u32>,
410) -> Result<(), DiagnosticAggregationError> {
411 if !scratch.insert_seen(shard.shard) {
412 return Err(DiagnosticAggregationError::DuplicateShard { shard: shard.shard });
413 }
414 if shard.raw_item_bytes == 0 {
415 return Err(DiagnosticAggregationError::ZeroRawItemBytes { shard: shard.shard });
416 }
417 if shard.emitted_diagnostics > shard.candidate_items {
418 return Err(DiagnosticAggregationError::EmittedExceedsCandidates {
419 shard: shard.shard,
420 emitted_diagnostics: shard.emitted_diagnostics,
421 candidate_items: shard.candidate_items,
422 });
423 }
424 if shard.emitted_diagnostics != 0 && shard.candidate_items == 0 {
425 return Err(DiagnosticAggregationError::ZeroCandidates { shard: shard.shard });
426 }
427 if shard.emitted_diagnostics != 0 && shard.diagnostic_record_bytes == 0 {
428 return Err(DiagnosticAggregationError::ZeroDiagnosticRecordBytes { shard: shard.shard });
429 }
430 if shard.emitted_diagnostics != 0 && shard.severity_mask == 0 {
431 return Err(DiagnosticAggregationError::MissingSeverityMask { shard: shard.shard });
432 }
433 Ok(())
434}
435
436#[must_use]
438pub fn diagnostic_compression_ratio_bps(
439 host_readback_bytes: u64,
440 raw_candidate_readback_bytes: u64,
441) -> u32 {
442 DEVICE_DIAGNOSTIC_AGGREGATION_NUMERIC.ratio_basis_points_u64(
443 host_readback_bytes,
444 raw_candidate_readback_bytes,
445 0,
446 "diagnostic compression ratio",
447 )
448}
449
450fn avoided_readback_bytes(raw_candidate_readback_bytes: u64, host_readback_bytes: u64) -> u64 {
451 if raw_candidate_readback_bytes >= host_readback_bytes {
452 raw_candidate_readback_bytes - host_readback_bytes
453 } else {
454 0
455 }
456}
457
458fn reserved_aggregation_vec<T>(
459 capacity: usize,
460 field: &'static str,
461) -> Result<Vec<T>, DiagnosticAggregationError> {
462 reserved_vec(
463 DEVICE_DIAGNOSTIC_AGGREGATION_RESERVATION,
464 capacity,
465 field,
466 storage_reserve_failed,
467 )
468}
469
470fn storage_reserve_failed(
471 field: &'static str,
472 requested: usize,
473 message: String,
474) -> DiagnosticAggregationError {
475 DiagnosticAggregationError::StorageReserveFailed {
476 field,
477 requested,
478 message,
479 }
480}
481
482#[cfg(test)]
483mod tests {
484 use super::*;
485
486 #[test]
487 fn diagnostic_aggregation_compacts_sparse_device_diagnostics() {
488 let plan = plan_device_diagnostic_aggregation(
489 &[
490 shard(2, 2_000, 4, 32, 24, 16, 0b010),
491 shard(1, 1_000, 2, 32, 24, 16, 0b001),
492 shard(3, 4_000, 0, 32, 24, 16, 0),
493 ],
494 64,
495 1_024,
496 )
497 .expect("Fix: sparse diagnostics should aggregate on device");
498
499 assert_eq!(
500 plan.compact_ranges,
501 vec![
502 DiagnosticCompactRange {
503 shard: 1,
504 compact_offset: 0,
505 records: 2,
506 bytes: 48,
507 },
508 DiagnosticCompactRange {
509 shard: 2,
510 compact_offset: 48,
511 records: 4,
512 bytes: 96,
513 },
514 ]
515 );
516 assert_eq!(plan.counter_readback_bytes, 48);
517 assert_eq!(plan.compact_readback_bytes, 144);
518 assert_eq!(plan.host_readback_bytes, 192);
519 assert_eq!(plan.raw_candidate_readback_bytes, 224_000);
520 assert_eq!(plan.avoided_readback_bytes, 223_808);
521 assert!(plan.compression_ratio_bps < 10);
522 assert!(plan.requires_device_prefix_scan);
523 assert!(plan.final_only_host_readback);
524 }
525
526 #[test]
527 fn diagnostic_aggregation_caps_overflow_without_host_filtering() {
528 let plan =
529 plan_device_diagnostic_aggregation(&[shard(7, 1_000, 10, 32, 16, 8, 0b111)], 3, 128)
530 .expect("Fix: overflow should be represented by device-side flags");
531
532 assert_eq!(plan.compact_ranges[0].records, 3);
533 assert_eq!(plan.overflow_records, 7);
534 assert!(plan.requires_overflow_flag);
535 assert_eq!(plan.host_readback_bytes, 56);
536 assert!(
537 !plan.requires_device_prefix_scan,
538 "Fix: a single non-empty diagnostic shard has compact offset zero and must not schedule a device prefix scan."
539 );
540 }
541
542 #[test]
543 fn diagnostic_aggregation_ratio_does_not_saturate_before_division() {
544 let plan = plan_device_diagnostic_aggregation(
545 &[shard(9, u64::MAX / 32, 1, 32, 16, u64::MAX / 20, 0b001)],
546 1,
547 u64::MAX,
548 )
549 .expect("Fix: large diagnostic plans must retain exact ratio arithmetic");
550
551 let expected = (((plan.host_readback_bytes as u128) * 10_000)
552 / plan.raw_candidate_readback_bytes as u128) as u32;
553 assert_eq!(plan.compression_ratio_bps, expected);
554 assert!(plan.compression_ratio_bps > 100);
555 }
556
557 #[test]
558 fn diagnostic_aggregation_rejects_invalid_or_cpu_shaped_inputs() {
559 assert_eq!(
560 plan_device_diagnostic_aggregation(
561 &[shard(1, 8, 1, 32, 24, 8, 1), shard(1, 8, 1, 32, 24, 8, 1)],
562 4,
563 1_024,
564 )
565 .expect_err("duplicate shard should fail"),
566 DiagnosticAggregationError::DuplicateShard { shard: 1 }
567 );
568 assert_eq!(
569 plan_device_diagnostic_aggregation(&[shard(2, 8, 9, 32, 24, 8, 1)], 4, 1_024)
570 .expect_err("emitted diagnostics cannot exceed candidates"),
571 DiagnosticAggregationError::EmittedExceedsCandidates {
572 shard: 2,
573 emitted_diagnostics: 9,
574 candidate_items: 8,
575 }
576 );
577 assert_eq!(
578 plan_device_diagnostic_aggregation(&[shard(3, 8, 1, 32, 24, 8, 0)], 4, 1_024)
579 .expect_err("diagnostics must retain class mask"),
580 DiagnosticAggregationError::MissingSeverityMask { shard: 3 }
581 );
582 assert_eq!(
583 plan_device_diagnostic_aggregation(&[shard(4, 8, 1, 32, 24, 8, 1)], 4, 16)
584 .expect_err("over budget plan should fail"),
585 DiagnosticAggregationError::OverBudget {
586 required_bytes: 32,
587 budget_bytes: 16,
588 }
589 );
590 }
591
592 #[test]
593 fn diagnostic_aggregation_reports_zero_avoided_bytes_when_counters_exceed_raw_stream() {
594 let plan = plan_device_diagnostic_aggregation(
595 &[shard(1, 1, 0, 1, 8, 64, 0)],
596 1,
597 128,
598 )
599 .expect("Fix: diagnostic aggregation should report negative savings as zero avoided bytes, not fail with underflow");
600
601 assert_eq!(plan.raw_candidate_readback_bytes, 1);
602 assert_eq!(plan.host_readback_bytes, 64);
603 assert_eq!(plan.avoided_readback_bytes, 0);
604 assert_eq!(plan.compression_ratio_bps, 640_000);
605 assert!(plan.final_only_host_readback);
606 }
607
608 #[test]
609 fn diagnostic_aggregation_avoids_tree_sets_and_shard_vector_copies() {
610 let src = include_str!("device_diagnostic_aggregation.rs");
611 assert!(
612 !src.contains(concat!("BTree", "Set")),
613 "Fix: diagnostic aggregation should hash shard ids and sort compact-readback indices once."
614 );
615 assert!(
616 !src.contains(concat!("shards", ".to_vec()")),
617 "Fix: diagnostic aggregation should not copy all shard records before final compact-range ordering."
618 );
619 assert!(
620 src.contains("fn avoided_readback_bytes(")
621 && src.contains("raw_candidate_readback_bytes >= host_readback_bytes"),
622 "Fix: avoided-readback telemetry must explicitly clamp negative savings to zero after checked host/raw accounting."
623 );
624 assert!(
625 src.contains("DiagnosticAggregationScratch::try_with_capacity(shards.len())?"),
626 "Fix: diagnostic aggregation must stage scratch with fallible release-path allocation."
627 );
628 assert!(
629 src.contains("scratch.try_reserve_shards(shards.len())?"),
630 "Fix: caller-owned diagnostic aggregation scratch must grow through fallible reservation."
631 );
632 assert!(
633 src.contains("ReusableIndexScratch"),
634 "Fix: diagnostic aggregation duplicate detection and ordering scratch must share the paired typed fallible reservation helper."
635 );
636 assert!(
637 src.contains("StorageReserveFailed"),
638 "Fix: diagnostic aggregation allocation failures must surface as actionable launch-planning errors."
639 );
640 assert!(
641 !src.contains(concat!("FxHashSet::with_capacity", "_and_hasher")),
642 "Fix: diagnostic aggregation scratch hash storage must not allocate infallibly."
643 );
644 assert!(
645 !src.contains(concat!("Vec::with_capacity", "(shard_count)"))
646 && !src.contains(concat!("Vec::with_capacity", "(shards.len())")),
647 "Fix: diagnostic aggregation scratch/result vectors must not allocate infallibly."
648 );
649 }
650
651 #[test]
652 fn diagnostic_aggregation_reuses_caller_owned_shard_planning_scratch() {
653 let mut scratch =
654 DiagnosticAggregationScratch::try_with_capacity(128).expect("Fix: scratch capacity");
655 let wide = (0..128)
656 .rev()
657 .map(|index| shard(index, 1_024, 1, 32, 16, 8, 1))
658 .collect::<Vec<_>>();
659 let first =
660 plan_device_diagnostic_aggregation_with_scratch(&wide, 4, 1 << 20, &mut scratch)
661 .expect("Fix: wide diagnostic aggregation should plan with reusable scratch");
662 let id_capacity = scratch.id_capacity();
663 let ordered_index_capacity = scratch.ordered_index_capacity();
664
665 assert_eq!(first.compact_ranges.len(), 128);
666 assert_eq!(first.compact_ranges[0].shard, 0);
667
668 let second = plan_device_diagnostic_aggregation_with_scratch(
669 &[
670 shard(9, 1_000, 0, 32, 24, 16, 0),
671 shard(3, 1_000, 7, 32, 24, 16, 1),
672 ],
673 3,
674 1 << 20,
675 &mut scratch,
676 )
677 .expect("Fix: smaller diagnostic aggregation should reuse previous scratch");
678
679 assert_eq!(second.compact_ranges[0].shard, 3);
680 assert_eq!(second.overflow_records, 4);
681 assert!(scratch.id_capacity() >= id_capacity);
682 assert!(scratch.ordered_index_capacity() >= ordered_index_capacity);
683 }
684
685 #[test]
686 fn generated_diagnostic_aggregation_profiles_preserve_exact_telemetry_for_4096_shapes() {
687 let mut scratch = DiagnosticAggregationScratch::default();
688 for shard_count in 1u32..=128 {
689 for cap in 1u64..=32 {
690 let shards = (0..shard_count)
691 .rev()
692 .map(|id| {
693 let candidates = u64::from((id % 19) + 1) * 8;
694 let emitted = u64::from(id % 7);
695 shard(
696 id,
697 candidates,
698 emitted.min(candidates),
699 16,
700 12,
701 8,
702 if emitted == 0 { 0 } else { 1 << (id % 8) },
703 )
704 })
705 .collect::<Vec<_>>();
706
707 let plan = plan_device_diagnostic_aggregation_with_scratch(
708 &shards,
709 cap,
710 u64::MAX,
711 &mut scratch,
712 )
713 .expect("Fix: generated diagnostic aggregation profile should plan");
714
715 let expected_raw = shards
716 .iter()
717 .map(|shard| shard.candidate_items * shard.raw_item_bytes)
718 .sum::<u64>();
719 let expected_counter = shards.iter().map(|shard| shard.counter_bytes).sum::<u64>();
720 let expected_compact = shards
721 .iter()
722 .map(|shard| shard.emitted_diagnostics.min(cap) * shard.diagnostic_record_bytes)
723 .sum::<u64>();
724 assert_eq!(plan.raw_candidate_readback_bytes, expected_raw);
725 assert_eq!(plan.counter_readback_bytes, expected_counter);
726 assert_eq!(plan.compact_readback_bytes, expected_compact);
727 assert_eq!(
728 plan.host_readback_bytes,
729 expected_counter + expected_compact
730 );
731 assert!(plan
732 .compact_ranges
733 .windows(2)
734 .all(|pair| pair[0].shard < pair[1].shard));
735 assert!(plan.final_only_host_readback);
736 }
737 }
738 }
739
740 #[test]
741 fn diagnostic_aggregation_production_ratio_path_does_not_panic() {
742 let source = include_str!("device_diagnostic_aggregation.rs");
743 let production = source
744 .split("#[cfg(test)]")
745 .next()
746 .expect("Fix: diagnostic aggregation source must contain production section");
747 assert!(
748 !production.contains(".expect(")
749 && !production.contains(concat!("panic", "!("))
750 && !production.contains(".unwrap_or_else("),
751 "Fix: diagnostic aggregation production planning must return errors or bounded telemetry instead of panicking."
752 );
753 assert_eq!(
754 diagnostic_compression_ratio_bps(u64::MAX, 1),
755 u32::MAX,
756 "Fix: diagnostic compression telemetry must remain bounded when a pathological ratio exceeds export width."
757 );
758 }
759
760 fn shard(
761 shard: u32,
762 candidate_items: u64,
763 emitted_diagnostics: u64,
764 raw_item_bytes: u64,
765 diagnostic_record_bytes: u64,
766 counter_bytes: u64,
767 severity_mask: u32,
768 ) -> DiagnosticShard {
769 DiagnosticShard {
770 shard,
771 candidate_items,
772 emitted_diagnostics,
773 raw_item_bytes,
774 diagnostic_record_bytes,
775 counter_bytes,
776 severity_mask,
777 }
778 }
779}