// Source: vyre_driver/device_diagnostic_aggregation.rs

//! Backend-neutral device-side diagnostic aggregation planning.
//!
//! Frontend diagnostics are sparse relative to token/fact streams. Reading the
//! whole candidate stream back to the host and filtering on the CPU is wrong for
//! the release path: it moves bytes that the device already proved irrelevant.
//! This module plans resident counter and compact-record slabs for device-side
//! diagnostic aggregation, followed by a final-only readback of counters and
//! compact records.

use crate::accounting::{
    checked_add_u64_count as checked_add, checked_add_usize_count as checked_add_usize,
    checked_mul_u64_count as checked_mul, ArithmeticOverflow,
};
use crate::numeric::BackendNumericPolicy;
use crate::reservation_policy::{
    reserved_typed_vec as reserved_vec, ReservationPolicy, ReusableIndexScratch,
};

18const DEVICE_DIAGNOSTIC_AGGREGATION_RESERVATION: ReservationPolicy = ReservationPolicy::new(
19    "device diagnostic aggregation",
20    "shard diagnostic aggregation before launch planning",
21);
22
23const DEVICE_DIAGNOSTIC_AGGREGATION_NUMERIC: BackendNumericPolicy =
24    BackendNumericPolicy::new("device diagnostic aggregation");
25
/// One device-resident diagnostic shard before aggregation.
///
/// Planning rejects a shard whose id repeats within one call, whose
/// `raw_item_bytes` is zero, whose `emitted_diagnostics` exceeds
/// `candidate_items`, or which emits diagnostics with a zero
/// `diagnostic_record_bytes` or a zero `severity_mask`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DiagnosticShard {
    /// Stable shard id; compact ranges in the plan are ordered by this value.
    pub shard: u32,
    /// Candidate token/fact items inspected by the device.
    pub candidate_items: u64,
    /// Diagnostics emitted by the device for this shard.
    ///
    /// A shard with zero emitted diagnostics contributes counter bytes and raw
    /// bytes to the plan but no compact range.
    pub emitted_diagnostics: u64,
    /// Bytes per candidate item in the unaggregated stream.
    pub raw_item_bytes: u64,
    /// Bytes per compact diagnostic record.
    pub diagnostic_record_bytes: u64,
    /// Bytes for device-side counters and overflow flags for this shard.
    pub counter_bytes: u64,
    /// Non-zero severity/category mask represented by emitted diagnostics.
    pub severity_mask: u32,
}

/// One compact diagnostic readback range.
///
/// The planner emits one range per shard that has at least one emitted
/// diagnostic and lays the ranges out back to back in shard-id order.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DiagnosticCompactRange {
    /// Source shard id.
    pub shard: u32,
    /// Byte offset in the compact diagnostic slab: the sum of `bytes` over all
    /// preceding ranges.
    pub compact_offset: u64,
    /// Diagnostics represented in this range, after the per-shard cap is applied.
    pub records: u64,
    /// Bytes copied into the compact diagnostic slab (`records` times the
    /// shard's record width).
    pub bytes: u64,
}

58/// Device diagnostic aggregation plan.
59#[derive(Clone, Debug, Eq, PartialEq)]
60pub struct DiagnosticAggregationPlan {
61    /// Compact readback ranges ordered by shard id.
62    pub compact_ranges: Vec<DiagnosticCompactRange>,
63    /// Total counter/overflow bytes read by the host.
64    pub counter_readback_bytes: u64,
65    /// Total compact diagnostic record bytes read by the host.
66    pub compact_readback_bytes: u64,
67    /// Total host readback bytes after device aggregation.
68    pub host_readback_bytes: u64,
69    /// Bytes that would have been read by a raw candidate-stream readback.
70    pub raw_candidate_readback_bytes: u64,
71    /// Bytes avoided by aggregating on device.
72    pub avoided_readback_bytes: u64,
73    /// Aggregate compression ratio in basis points.
74    pub compression_ratio_bps: u32,
75    /// Diagnostics omitted because per-shard caps were reached.
76    pub overflow_records: u64,
77    /// Whether any shard needs a device-side overflow flag.
78    pub requires_overflow_flag: bool,
79    /// Whether aggregation requires a device-side prefix scan over records.
80    pub requires_device_prefix_scan: bool,
81    /// This plan never requires host participation before final readback.
82    pub final_only_host_readback: bool,
83}
84
85/// Caller-owned scratch for repeated device diagnostic aggregation planning.
86#[derive(Debug, Default)]
87pub struct DiagnosticAggregationScratch {
88    index_scratch: ReusableIndexScratch<u32>,
89}
90
91impl DiagnosticAggregationScratch {
92    /// Allocate empty reusable diagnostic aggregation scratch.
93    #[must_use]
94    pub fn new() -> Self {
95        Self::default()
96    }
97
98    /// Allocate reusable diagnostic aggregation scratch for a known shard count.
99    ///
100    /// # Errors
101    ///
102    /// Returns [`DiagnosticAggregationError`] when scratch storage cannot be reserved.
103    pub fn try_with_capacity(shard_count: usize) -> Result<Self, DiagnosticAggregationError> {
104        let mut scratch = Self::default();
105        scratch.try_reserve_shards(shard_count)?;
106        Ok(scratch)
107    }
108
109    /// Reserve reusable diagnostic aggregation scratch for a known shard count.
110    ///
111    /// # Errors
112    ///
113    /// Returns [`DiagnosticAggregationError`] when scratch storage cannot be reserved.
114    pub fn try_reserve_shards(
115        &mut self,
116        shard_count: usize,
117    ) -> Result<(), DiagnosticAggregationError> {
118        self.index_scratch.try_reserve_with(
119            DEVICE_DIAGNOSTIC_AGGREGATION_RESERVATION,
120            shard_count,
121            "scratch.ids",
122            "scratch.ordered_indices",
123            storage_reserve_failed,
124        )
125    }
126
127    /// Retained duplicate-detection capacity.
128    #[must_use]
129    pub fn id_capacity(&self) -> usize {
130        self.index_scratch.seen_capacity()
131    }
132
133    /// Retained shard-ordering capacity.
134    #[must_use]
135    pub fn ordered_index_capacity(&self) -> usize {
136        self.index_scratch.ordered_index_capacity()
137    }
138}
139
/// Device diagnostic aggregation planning errors.
///
/// The [`std::fmt::Display`] text of every variant ends with a `Fix:` hint
/// describing the expected remediation.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DiagnosticAggregationError {
    /// Duplicate shard id.
    DuplicateShard {
        /// Duplicate shard id.
        shard: u32,
    },
    /// Candidate items cannot be zero for an emitted shard.
    ZeroCandidates {
        /// Invalid shard id.
        shard: u32,
    },
    /// Raw candidate ABI width must be non-zero.
    ZeroRawItemBytes {
        /// Invalid shard id.
        shard: u32,
    },
    /// Compact diagnostic ABI width must be non-zero when diagnostics exist.
    ZeroDiagnosticRecordBytes {
        /// Invalid shard id.
        shard: u32,
    },
    /// Diagnostic count cannot exceed inspected candidate items.
    EmittedExceedsCandidates {
        /// Invalid shard id.
        shard: u32,
        /// Emitted diagnostics.
        emitted_diagnostics: u64,
        /// Candidate items.
        candidate_items: u64,
    },
    /// Non-empty diagnostic shards need a non-zero severity/category mask.
    MissingSeverityMask {
        /// Invalid shard id.
        shard: u32,
    },
    /// Per-shard compact cap cannot be zero.
    ZeroRecordCap,
    /// Byte arithmetic overflowed.
    ByteCountOverflow {
        /// Field being computed.
        field: &'static str,
    },
    /// Aggregation slabs exceed the explicit device budget.
    OverBudget {
        /// Required resident/readback bytes.
        required_bytes: u64,
        /// Caller-provided budget.
        budget_bytes: u64,
    },
    /// Scratch or result-vector storage reservation failed before launch planning.
    StorageReserveFailed {
        /// Field being reserved.
        field: &'static str,
        /// Requested total capacity.
        requested: usize,
        /// Allocator failure details.
        message: String,
    },
}

202impl ArithmeticOverflow for DiagnosticAggregationError {
203    fn arithmetic_overflow(field: &'static str) -> Self {
204        Self::ByteCountOverflow { field }
205    }
206}
207
208impl std::fmt::Display for DiagnosticAggregationError {
209    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210        match self {
211            Self::DuplicateShard { shard } => write!(
212                f,
213                "device diagnostic aggregation received duplicate shard {shard}. Fix: assign unique diagnostic shard ids before device compaction."
214            ),
215            Self::ZeroCandidates { shard } => write!(
216                f,
217                "device diagnostic shard {shard} emitted diagnostics with zero candidates. Fix: emit diagnostic shards only after device candidate classification."
218            ),
219            Self::ZeroRawItemBytes { shard } => write!(
220                f,
221                "device diagnostic shard {shard} has raw_item_bytes=0. Fix: pass the concrete token/fact candidate ABI width."
222            ),
223            Self::ZeroDiagnosticRecordBytes { shard } => write!(
224                f,
225                "device diagnostic shard {shard} has diagnostic_record_bytes=0. Fix: pass the compact diagnostic record ABI width."
226            ),
227            Self::EmittedExceedsCandidates {
228                shard,
229                emitted_diagnostics,
230                candidate_items,
231            } => write!(
232                f,
233                "device diagnostic shard {shard} emitted {emitted_diagnostics} diagnostics from {candidate_items} candidates. Fix: clamp emission to the device candidate count or split the shard."
234            ),
235            Self::MissingSeverityMask { shard } => write!(
236                f,
237                "device diagnostic shard {shard} emitted diagnostics without a severity/category mask. Fix: preserve diagnostic class bits during device aggregation."
238            ),
239            Self::ZeroRecordCap => write!(
240                f,
241                "device diagnostic aggregation received a zero per-shard record cap. Fix: set an explicit compact diagnostic cap before launch."
242            ),
243            Self::ByteCountOverflow { field } => write!(
244                f,
245                "device diagnostic aggregation overflowed while computing {field}. Fix: shard diagnostic aggregation before readback planning."
246            ),
247            Self::OverBudget {
248                required_bytes,
249                budget_bytes,
250            } => write!(
251                f,
252                "device diagnostic aggregation requires {required_bytes} bytes but budget allows {budget_bytes}. Fix: reduce per-shard caps, split shards, or raise the explicit device budget."
253            ),
254            Self::StorageReserveFailed {
255                field,
256                requested,
257                message,
258            } => write!(
259                f,
260                "device diagnostic aggregation failed to reserve {field} for {requested} entries: {message}. Fix: shard diagnostic aggregation before launch planning."
261            ),
262        }
263    }
264}
265
266impl std::error::Error for DiagnosticAggregationError {}
267
268/// Plan device-side diagnostic aggregation and final-only compact readback.
269///
270/// # Errors
271///
272/// Returns [`DiagnosticAggregationError`] when shards are invalid, byte
273/// accounting overflows, the explicit budget is exceeded, or planner storage
274/// cannot be reserved.
275pub fn plan_device_diagnostic_aggregation(
276    shards: &[DiagnosticShard],
277    max_records_per_shard: u64,
278    budget_bytes: u64,
279) -> Result<DiagnosticAggregationPlan, DiagnosticAggregationError> {
280    let mut scratch = DiagnosticAggregationScratch::try_with_capacity(shards.len())?;
281    plan_device_diagnostic_aggregation_with_scratch(
282        shards,
283        max_records_per_shard,
284        budget_bytes,
285        &mut scratch,
286    )
287}
288
289/// Plan device-side diagnostic aggregation using caller-owned temporary storage.
290///
291/// # Errors
292///
293/// Returns [`DiagnosticAggregationError`] when shards are invalid, byte
294/// accounting overflows, the explicit budget is exceeded, or planner storage
295/// cannot be reserved.
296pub fn plan_device_diagnostic_aggregation_with_scratch(
297    shards: &[DiagnosticShard],
298    max_records_per_shard: u64,
299    budget_bytes: u64,
300    scratch: &mut DiagnosticAggregationScratch,
301) -> Result<DiagnosticAggregationPlan, DiagnosticAggregationError> {
302    if max_records_per_shard == 0 {
303        return Err(DiagnosticAggregationError::ZeroRecordCap);
304    }
305
306    scratch.index_scratch.clear();
307    scratch.try_reserve_shards(shards.len())?;
308    let mut counter_readback_bytes = 0_u64;
309    let mut compact_readback_bytes = 0_u64;
310    let mut raw_candidate_readback_bytes = 0_u64;
311    let mut overflow_records = 0_u64;
312    let mut non_empty_diagnostic_shards = 0usize;
313
314    for (index, shard) in shards.iter().copied().enumerate() {
315        validate_shard(shard, &mut scratch.index_scratch)?;
316
317        let raw_bytes = checked_mul(
318            shard.candidate_items,
319            shard.raw_item_bytes,
320            "raw candidate readback bytes",
321        )?;
322        raw_candidate_readback_bytes = checked_add(
323            raw_candidate_readback_bytes,
324            raw_bytes,
325            "total raw candidate readback bytes",
326        )?;
327        counter_readback_bytes = checked_add(
328            counter_readback_bytes,
329            shard.counter_bytes,
330            "counter readback bytes",
331        )?;
332        if shard.emitted_diagnostics != 0 {
333            non_empty_diagnostic_shards = checked_add_usize(
334                non_empty_diagnostic_shards,
335                1,
336                "non-empty diagnostic shard count",
337            )?;
338        }
339        scratch.index_scratch.push_index(index);
340    }
341    scratch
342        .index_scratch
343        .sort_indices_unstable_by_key_if_needed(|index| shards[index].shard);
344
345    let mut compact_ranges =
346        reserved_aggregation_vec(non_empty_diagnostic_shards, "compact_ranges")?;
347
348    for &index in scratch.index_scratch.ordered_indices() {
349        let shard = shards[index];
350        if shard.emitted_diagnostics == 0 {
351            continue;
352        }
353
354        let compact_records = shard.emitted_diagnostics.min(max_records_per_shard);
355        let omitted = shard.emitted_diagnostics - compact_records;
356        overflow_records = checked_add(overflow_records, omitted, "overflow records")?;
357        let compact_bytes = checked_mul(
358            compact_records,
359            shard.diagnostic_record_bytes,
360            "compact diagnostic bytes",
361        )?;
362        compact_ranges.push(DiagnosticCompactRange {
363            shard: shard.shard,
364            compact_offset: compact_readback_bytes,
365            records: compact_records,
366            bytes: compact_bytes,
367        });
368        compact_readback_bytes = checked_add(
369            compact_readback_bytes,
370            compact_bytes,
371            "total compact diagnostic bytes",
372        )?;
373    }
374
375    let host_readback_bytes = checked_add(
376        counter_readback_bytes,
377        compact_readback_bytes,
378        "host diagnostic readback bytes",
379    )?;
380    if host_readback_bytes > budget_bytes {
381        return Err(DiagnosticAggregationError::OverBudget {
382            required_bytes: host_readback_bytes,
383            budget_bytes,
384        });
385    }
386    let compression_ratio_bps =
387        diagnostic_compression_ratio_bps(host_readback_bytes, raw_candidate_readback_bytes);
388
389    Ok(DiagnosticAggregationPlan {
390        compact_ranges,
391        counter_readback_bytes,
392        compact_readback_bytes,
393        host_readback_bytes,
394        raw_candidate_readback_bytes,
395        avoided_readback_bytes: avoided_readback_bytes(
396            raw_candidate_readback_bytes,
397            host_readback_bytes,
398        ),
399        compression_ratio_bps,
400        overflow_records,
401        requires_overflow_flag: overflow_records != 0,
402        requires_device_prefix_scan: non_empty_diagnostic_shards > 1,
403        final_only_host_readback: true,
404    })
405}
406
407fn validate_shard(
408    shard: DiagnosticShard,
409    scratch: &mut ReusableIndexScratch<u32>,
410) -> Result<(), DiagnosticAggregationError> {
411    if !scratch.insert_seen(shard.shard) {
412        return Err(DiagnosticAggregationError::DuplicateShard { shard: shard.shard });
413    }
414    if shard.raw_item_bytes == 0 {
415        return Err(DiagnosticAggregationError::ZeroRawItemBytes { shard: shard.shard });
416    }
417    if shard.emitted_diagnostics > shard.candidate_items {
418        return Err(DiagnosticAggregationError::EmittedExceedsCandidates {
419            shard: shard.shard,
420            emitted_diagnostics: shard.emitted_diagnostics,
421            candidate_items: shard.candidate_items,
422        });
423    }
424    if shard.emitted_diagnostics != 0 && shard.candidate_items == 0 {
425        return Err(DiagnosticAggregationError::ZeroCandidates { shard: shard.shard });
426    }
427    if shard.emitted_diagnostics != 0 && shard.diagnostic_record_bytes == 0 {
428        return Err(DiagnosticAggregationError::ZeroDiagnosticRecordBytes { shard: shard.shard });
429    }
430    if shard.emitted_diagnostics != 0 && shard.severity_mask == 0 {
431        return Err(DiagnosticAggregationError::MissingSeverityMask { shard: shard.shard });
432    }
433    Ok(())
434}
435
436/// Compression ratio of compact diagnostic readback relative to raw candidate readback.
437#[must_use]
438pub fn diagnostic_compression_ratio_bps(
439    host_readback_bytes: u64,
440    raw_candidate_readback_bytes: u64,
441) -> u32 {
442    DEVICE_DIAGNOSTIC_AGGREGATION_NUMERIC.ratio_basis_points_u64(
443        host_readback_bytes,
444        raw_candidate_readback_bytes,
445        0,
446        "diagnostic compression ratio",
447    )
448}
449
/// Bytes saved by reading the aggregated slabs instead of the raw candidate stream.
///
/// Returns `raw_candidate_readback_bytes - host_readback_bytes`, clamped to
/// zero when the aggregated readback is larger than the raw stream (for
/// example when counters dominate a tiny shard), so the subtraction can never
/// underflow.
fn avoided_readback_bytes(raw_candidate_readback_bytes: u64, host_readback_bytes: u64) -> u64 {
    if raw_candidate_readback_bytes >= host_readback_bytes {
        raw_candidate_readback_bytes - host_readback_bytes
    } else {
        0
    }
}

458fn reserved_aggregation_vec<T>(
459    capacity: usize,
460    field: &'static str,
461) -> Result<Vec<T>, DiagnosticAggregationError> {
462    reserved_vec(
463        DEVICE_DIAGNOSTIC_AGGREGATION_RESERVATION,
464        capacity,
465        field,
466        storage_reserve_failed,
467    )
468}
469
470fn storage_reserve_failed(
471    field: &'static str,
472    requested: usize,
473    message: String,
474) -> DiagnosticAggregationError {
475    DiagnosticAggregationError::StorageReserveFailed {
476        field,
477        requested,
478        message,
479    }
480}
481
#[cfg(test)]
mod tests {
    use super::*;

    /// Sparse shards are compacted in shard-id order and empty shards get no range.
    #[test]
    fn diagnostic_aggregation_compacts_sparse_device_diagnostics() {
        let plan = plan_device_diagnostic_aggregation(
            &[
                shard(2, 2_000, 4, 32, 24, 16, 0b010),
                shard(1, 1_000, 2, 32, 24, 16, 0b001),
                shard(3, 4_000, 0, 32, 24, 16, 0),
            ],
            64,
            1_024,
        )
        .expect("Fix: sparse diagnostics should aggregate on device");

        assert_eq!(
            plan.compact_ranges,
            vec![
                DiagnosticCompactRange {
                    shard: 1,
                    compact_offset: 0,
                    records: 2,
                    bytes: 48,
                },
                DiagnosticCompactRange {
                    shard: 2,
                    compact_offset: 48,
                    records: 4,
                    bytes: 96,
                },
            ]
        );
        assert_eq!(plan.counter_readback_bytes, 48);
        assert_eq!(plan.compact_readback_bytes, 144);
        assert_eq!(plan.host_readback_bytes, 192);
        assert_eq!(plan.raw_candidate_readback_bytes, 224_000);
        assert_eq!(plan.avoided_readback_bytes, 223_808);
        assert!(plan.compression_ratio_bps < 10);
        assert!(plan.requires_device_prefix_scan);
        assert!(plan.final_only_host_readback);
    }

    /// Records beyond the per-shard cap are counted as overflow, not read back.
    #[test]
    fn diagnostic_aggregation_caps_overflow_without_host_filtering() {
        let plan =
            plan_device_diagnostic_aggregation(&[shard(7, 1_000, 10, 32, 16, 8, 0b111)], 3, 128)
                .expect("Fix: overflow should be represented by device-side flags");

        assert_eq!(plan.compact_ranges[0].records, 3);
        assert_eq!(plan.overflow_records, 7);
        assert!(plan.requires_overflow_flag);
        assert_eq!(plan.host_readback_bytes, 56);
        assert!(
            !plan.requires_device_prefix_scan,
            "Fix: a single non-empty diagnostic shard has compact offset zero and must not schedule a device prefix scan."
        );
    }

    /// The ratio must be computed exactly even when the byte counts are near `u64::MAX`.
    #[test]
    fn diagnostic_aggregation_ratio_does_not_saturate_before_division() {
        let plan = plan_device_diagnostic_aggregation(
            &[shard(9, u64::MAX / 32, 1, 32, 16, u64::MAX / 20, 0b001)],
            1,
            u64::MAX,
        )
        .expect("Fix: large diagnostic plans must retain exact ratio arithmetic");

        let expected = (((plan.host_readback_bytes as u128) * 10_000)
            / plan.raw_candidate_readback_bytes as u128) as u32;
        assert_eq!(plan.compression_ratio_bps, expected);
        assert!(plan.compression_ratio_bps > 100);
    }

    /// Each invalid input shape maps to its dedicated error variant.
    #[test]
    fn diagnostic_aggregation_rejects_invalid_or_cpu_shaped_inputs() {
        assert_eq!(
            plan_device_diagnostic_aggregation(
                &[shard(1, 8, 1, 32, 24, 8, 1), shard(1, 8, 1, 32, 24, 8, 1)],
                4,
                1_024,
            )
            .expect_err("duplicate shard should fail"),
            DiagnosticAggregationError::DuplicateShard { shard: 1 }
        );
        assert_eq!(
            plan_device_diagnostic_aggregation(&[shard(2, 8, 9, 32, 24, 8, 1)], 4, 1_024)
                .expect_err("emitted diagnostics cannot exceed candidates"),
            DiagnosticAggregationError::EmittedExceedsCandidates {
                shard: 2,
                emitted_diagnostics: 9,
                candidate_items: 8,
            }
        );
        assert_eq!(
            plan_device_diagnostic_aggregation(&[shard(3, 8, 1, 32, 24, 8, 0)], 4, 1_024)
                .expect_err("diagnostics must retain class mask"),
            DiagnosticAggregationError::MissingSeverityMask { shard: 3 }
        );
        assert_eq!(
            plan_device_diagnostic_aggregation(&[shard(4, 8, 1, 32, 24, 8, 1)], 4, 16)
                .expect_err("over budget plan should fail"),
            DiagnosticAggregationError::OverBudget {
                required_bytes: 32,
                budget_bytes: 16,
            }
        );
    }

    /// When counters outweigh the raw stream the savings are reported as zero.
    #[test]
    fn diagnostic_aggregation_reports_zero_avoided_bytes_when_counters_exceed_raw_stream() {
        let plan = plan_device_diagnostic_aggregation(
            &[shard(1, 1, 0, 1, 8, 64, 0)],
            1,
            128,
        )
        .expect("Fix: diagnostic aggregation should report negative savings as zero avoided bytes, not fail with underflow");

        assert_eq!(plan.raw_candidate_readback_bytes, 1);
        assert_eq!(plan.host_readback_bytes, 64);
        assert_eq!(plan.avoided_readback_bytes, 0);
        assert_eq!(plan.compression_ratio_bps, 640_000);
        assert!(plan.final_only_host_readback);
    }

    /// Source-level guard: scans this file's own text for required and forbidden constructs.
    ///
    /// Forbidden needles are assembled with `concat!` so the needle text in
    /// this test does not match itself.
    #[test]
    fn diagnostic_aggregation_avoids_tree_sets_and_shard_vector_copies() {
        let src = include_str!("device_diagnostic_aggregation.rs");
        assert!(
            !src.contains(concat!("BTree", "Set")),
            "Fix: diagnostic aggregation should hash shard ids and sort compact-readback indices once."
        );
        assert!(
            !src.contains(concat!("shards", ".to_vec()")),
            "Fix: diagnostic aggregation should not copy all shard records before final compact-range ordering."
        );
        assert!(
            src.contains("fn avoided_readback_bytes(")
                && src.contains("raw_candidate_readback_bytes >= host_readback_bytes"),
            "Fix: avoided-readback telemetry must explicitly clamp negative savings to zero after checked host/raw accounting."
        );
        assert!(
            src.contains("DiagnosticAggregationScratch::try_with_capacity(shards.len())?"),
            "Fix: diagnostic aggregation must stage scratch with fallible release-path allocation."
        );
        assert!(
            src.contains("scratch.try_reserve_shards(shards.len())?"),
            "Fix: caller-owned diagnostic aggregation scratch must grow through fallible reservation."
        );
        assert!(
            src.contains("ReusableIndexScratch"),
            "Fix: diagnostic aggregation duplicate detection and ordering scratch must share the paired typed fallible reservation helper."
        );
        assert!(
            src.contains("StorageReserveFailed"),
            "Fix: diagnostic aggregation allocation failures must surface as actionable launch-planning errors."
        );
        assert!(
            !src.contains(concat!("FxHashSet::with_capacity", "_and_hasher")),
            "Fix: diagnostic aggregation scratch hash storage must not allocate infallibly."
        );
        assert!(
            !src.contains(concat!("Vec::with_capacity", "(shard_count)"))
                && !src.contains(concat!("Vec::with_capacity", "(shards.len())")),
            "Fix: diagnostic aggregation scratch/result vectors must not allocate infallibly."
        );
    }

    /// A scratch sized by a wide plan is reused, without shrinking, by a smaller one.
    #[test]
    fn diagnostic_aggregation_reuses_caller_owned_shard_planning_scratch() {
        let mut scratch =
            DiagnosticAggregationScratch::try_with_capacity(128).expect("Fix: scratch capacity");
        let wide = (0..128)
            .rev()
            .map(|index| shard(index, 1_024, 1, 32, 16, 8, 1))
            .collect::<Vec<_>>();
        let first =
            plan_device_diagnostic_aggregation_with_scratch(&wide, 4, 1 << 20, &mut scratch)
                .expect("Fix: wide diagnostic aggregation should plan with reusable scratch");
        let id_capacity = scratch.id_capacity();
        let ordered_index_capacity = scratch.ordered_index_capacity();

        assert_eq!(first.compact_ranges.len(), 128);
        assert_eq!(first.compact_ranges[0].shard, 0);

        let second = plan_device_diagnostic_aggregation_with_scratch(
            &[
                shard(9, 1_000, 0, 32, 24, 16, 0),
                shard(3, 1_000, 7, 32, 24, 16, 1),
            ],
            3,
            1 << 20,
            &mut scratch,
        )
        .expect("Fix: smaller diagnostic aggregation should reuse previous scratch");

        assert_eq!(second.compact_ranges[0].shard, 3);
        assert_eq!(second.overflow_records, 4);
        assert!(scratch.id_capacity() >= id_capacity);
        assert!(scratch.ordered_index_capacity() >= ordered_index_capacity);
    }

    /// Sweeps 128 shard counts by 32 caps and compares the plan with naive totals.
    #[test]
    fn generated_diagnostic_aggregation_profiles_preserve_exact_telemetry_for_4096_shapes() {
        let mut scratch = DiagnosticAggregationScratch::default();
        for shard_count in 1u32..=128 {
            for cap in 1u64..=32 {
                let shards = (0..shard_count)
                    .rev()
                    .map(|id| {
                        let candidates = u64::from((id % 19) + 1) * 8;
                        let emitted = u64::from(id % 7);
                        shard(
                            id,
                            candidates,
                            emitted.min(candidates),
                            16,
                            12,
                            8,
                            if emitted == 0 { 0 } else { 1 << (id % 8) },
                        )
                    })
                    .collect::<Vec<_>>();

                let plan = plan_device_diagnostic_aggregation_with_scratch(
                    &shards,
                    cap,
                    u64::MAX,
                    &mut scratch,
                )
                .expect("Fix: generated diagnostic aggregation profile should plan");

                let expected_raw = shards
                    .iter()
                    .map(|shard| shard.candidate_items * shard.raw_item_bytes)
                    .sum::<u64>();
                let expected_counter = shards.iter().map(|shard| shard.counter_bytes).sum::<u64>();
                let expected_compact = shards
                    .iter()
                    .map(|shard| shard.emitted_diagnostics.min(cap) * shard.diagnostic_record_bytes)
                    .sum::<u64>();
                assert_eq!(plan.raw_candidate_readback_bytes, expected_raw);
                assert_eq!(plan.counter_readback_bytes, expected_counter);
                assert_eq!(plan.compact_readback_bytes, expected_compact);
                assert_eq!(
                    plan.host_readback_bytes,
                    expected_counter + expected_compact
                );
                assert!(plan
                    .compact_ranges
                    .windows(2)
                    .all(|pair| pair[0].shard < pair[1].shard));
                assert!(plan.final_only_host_readback);
            }
        }
    }

    /// The production half of this file must not contain panicking constructs.
    #[test]
    fn diagnostic_aggregation_production_ratio_path_does_not_panic() {
        let source = include_str!("device_diagnostic_aggregation.rs");
        // Everything before the first test-only attribute is the production section.
        let production = source
            .split("#[cfg(test)]")
            .next()
            .expect("Fix: diagnostic aggregation source must contain production section");
        assert!(
            !production.contains(".expect(")
                && !production.contains(concat!("panic", "!("))
                && !production.contains(".unwrap_or_else("),
            "Fix: diagnostic aggregation production planning must return errors or bounded telemetry instead of panicking."
        );
        assert_eq!(
            diagnostic_compression_ratio_bps(u64::MAX, 1),
            u32::MAX,
            "Fix: diagnostic compression telemetry must remain bounded when a pathological ratio exceeds export width."
        );
    }

    /// Positional constructor for [`DiagnosticShard`]; arguments follow field order.
    fn shard(
        shard: u32,
        candidate_items: u64,
        emitted_diagnostics: u64,
        raw_item_bytes: u64,
        diagnostic_record_bytes: u64,
        counter_bytes: u64,
        severity_mask: u32,
    ) -> DiagnosticShard {
        DiagnosticShard {
            shard,
            candidate_items,
            emitted_diagnostics,
            raw_item_bytes,
            diagnostic_record_bytes,
            counter_bytes,
            severity_mask,
        }
    }
}