Skip to main content

vyre_driver/
result_compaction.rs

1//! Backend-neutral compact result readback planning.
2
3use crate::accounting::{
4    checked_add_u64_count as checked_add, checked_add_usize_count as checked_add_usize,
5    checked_sub_u64_count as checked_sub, ArithmeticOverflow,
6};
7use crate::numeric::BackendNumericPolicy;
8use crate::reservation_policy::{
9    reserved_typed_vec as reserved_vec, ReservationPolicy, ReusableIndexScratch,
10};
11
12const RESULT_COMPACTION_RESERVATION: ReservationPolicy = ReservationPolicy::new(
13    "result compaction",
14    "shard result readback planning before launch",
15);
16
17const RESULT_COMPACTION_NUMERIC: BackendNumericPolicy =
18    BackendNumericPolicy::new("result compaction");
19
/// A kernel output slot as handed to the planner, prior to any compaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ResultSlot {
    /// Identifier of the output slot; planning rejects repeated ids.
    pub slot: u32,
    /// Number of bytes in the slot that carry real kernel output.
    pub meaningful_bytes: u64,
    /// Bytes allocated for the slot, i.e. what a full readback would cover.
    pub capacity_bytes: u64,
}
30
/// Placement of one slot's meaningful bytes inside the compact readback slab.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CompactResultRecord {
    /// Output slot the bytes originate from.
    pub slot: u32,
    /// Byte offset of this record within the compact slab.
    pub compact_offset: u64,
    /// Count of meaningful bytes placed at `compact_offset`.
    pub bytes: u64,
}
41
42/// Compact result readback plan.
43#[derive(Clone, Debug, Eq, PartialEq)]
44pub struct ResultCompactionPlan {
45    /// Records copied into the compact slab.
46    pub compact_records: Vec<CompactResultRecord>,
47    /// Output slots left as direct readback ranges.
48    pub direct_slots: Vec<u32>,
49    /// Total allocated/readback capacity across all output slots.
50    pub full_capacity_bytes: u64,
51    /// Total compact slab bytes.
52    pub compact_bytes: u64,
53    /// Total direct readback bytes.
54    pub direct_bytes: u64,
55    /// Total bytes actually selected for readback after compaction planning.
56    pub selected_readback_bytes: u64,
57    /// Bytes avoided compared with reading full output capacities.
58    pub avoided_readback_bytes: u64,
59    /// Avoided readback as floor basis points of full capacity.
60    pub avoided_readback_basis_points: u32,
61}
62
63/// Caller-owned scratch for repeated result-compaction planning.
64#[derive(Debug, Default)]
65pub struct ResultCompactionScratch {
66    index_scratch: ReusableIndexScratch<u32>,
67}
68
69impl ResultCompactionScratch {
70    /// Allocate empty reusable compaction scratch.
71    #[must_use]
72    pub fn new() -> Self {
73        Self::default()
74    }
75
76    /// Allocate reusable compaction scratch for a known output-slot count.
77    ///
78    /// # Errors
79    ///
80    /// Returns [`ResultCompactionError`] when scratch storage cannot be reserved.
81    pub fn try_with_capacity(slot_count: usize) -> Result<Self, ResultCompactionError> {
82        let mut scratch = Self::default();
83        scratch.try_reserve_slots(slot_count)?;
84        Ok(scratch)
85    }
86
87    /// Reserve reusable compaction scratch for a known output-slot count.
88    ///
89    /// # Errors
90    ///
91    /// Returns [`ResultCompactionError`] when scratch storage cannot be reserved.
92    pub fn try_reserve_slots(&mut self, slot_count: usize) -> Result<(), ResultCompactionError> {
93        self.index_scratch.try_reserve_with(
94            RESULT_COMPACTION_RESERVATION,
95            slot_count,
96            "scratch.ids",
97            "scratch.ordered_indices",
98            storage_reserve_failed,
99        )
100    }
101
102    /// Retained duplicate-detection capacity.
103    #[must_use]
104    pub fn id_capacity(&self) -> usize {
105        self.index_scratch.seen_capacity()
106    }
107
108    /// Retained slot-ordering capacity.
109    #[must_use]
110    pub fn ordered_index_capacity(&self) -> usize {
111        self.index_scratch.ordered_index_capacity()
112    }
113}
114
/// Failures that result-compaction planning can report.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResultCompactionError {
    /// The same output slot id was supplied more than once.
    DuplicateSlot {
        /// The repeated slot id.
        slot: u32,
    },
    /// A slot claims more meaningful bytes than it has capacity for.
    MeaningfulExceedsCapacity {
        /// Offending output slot.
        slot: u32,
        /// Meaningful bytes reported for the slot.
        meaningful_bytes: u64,
        /// Capacity reported for the slot.
        capacity_bytes: u64,
    },
    /// A checked byte or count computation overflowed.
    ByteCountOverflow {
        /// Name of the quantity that was being computed.
        field: &'static str,
    },
    /// Reserving scratch or result-vector storage failed while planning.
    StorageReserveFailed {
        /// Name of the storage that was being reserved.
        field: &'static str,
        /// Total capacity that was requested.
        requested: usize,
        /// Details reported for the allocation failure.
        message: String,
    },
}
147
148impl ArithmeticOverflow for ResultCompactionError {
149    fn arithmetic_overflow(field: &'static str) -> Self {
150        Self::ByteCountOverflow { field }
151    }
152}
153
154impl std::fmt::Display for ResultCompactionError {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        match self {
157            Self::DuplicateSlot { slot } => write!(
158                f,
159                "result compaction received duplicate output slot {slot}. Fix: assign unique output slots before readback planning."
160            ),
161            Self::MeaningfulExceedsCapacity {
162                slot,
163                meaningful_bytes,
164                capacity_bytes,
165            } => write!(
166                f,
167                "result slot {slot} has meaningful_bytes={meaningful_bytes} above capacity_bytes={capacity_bytes}. Fix: compute compact result sizes before dispatch readback."
168            ),
169            Self::ByteCountOverflow { field } => write!(
170                f,
171                "result compaction overflowed while computing {field}. Fix: shard compact result readback before launch."
172            ),
173            Self::StorageReserveFailed {
174                field,
175                requested,
176                message,
177            } => write!(
178                f,
179                "result compaction failed to reserve {field} for {requested} entries: {message}. Fix: shard result readback planning before launch."
180            ),
181        }
182    }
183}
184
185impl std::error::Error for ResultCompactionError {}
186
187/// Plan compact readback for small outputs.
188///
189/// # Errors
190///
191/// Returns [`ResultCompactionError`] when slots are invalid, byte accounting
192/// overflows, or result storage cannot be reserved.
193pub fn plan_result_compaction(
194    slots: &[ResultSlot],
195    max_compact_record_bytes: u64,
196) -> Result<ResultCompactionPlan, ResultCompactionError> {
197    let mut scratch = ResultCompactionScratch::try_with_capacity(slots.len())?;
198    plan_result_compaction_with_scratch(slots, max_compact_record_bytes, &mut scratch)
199}
200
201/// Plan compact readback using caller-owned temporary storage.
202///
203/// # Errors
204///
205/// Returns [`ResultCompactionError`] when slots are invalid, byte accounting
206/// overflows, or result storage cannot be reserved.
207pub fn plan_result_compaction_with_scratch(
208    slots: &[ResultSlot],
209    max_compact_record_bytes: u64,
210    scratch: &mut ResultCompactionScratch,
211) -> Result<ResultCompactionPlan, ResultCompactionError> {
212    scratch.index_scratch.clear();
213    scratch.try_reserve_slots(slots.len())?;
214    let mut full_capacity_bytes = 0_u64;
215    let mut compact_record_count = 0usize;
216    let mut direct_slot_count = 0usize;
217
218    for (index, slot) in slots.iter().copied().enumerate() {
219        if !scratch.index_scratch.insert_seen(slot.slot) {
220            return Err(ResultCompactionError::DuplicateSlot { slot: slot.slot });
221        }
222        if slot.meaningful_bytes > slot.capacity_bytes {
223            return Err(ResultCompactionError::MeaningfulExceedsCapacity {
224                slot: slot.slot,
225                meaningful_bytes: slot.meaningful_bytes,
226                capacity_bytes: slot.capacity_bytes,
227            });
228        }
229        full_capacity_bytes = checked_add(
230            full_capacity_bytes,
231            slot.capacity_bytes,
232            "full capacity bytes",
233        )?;
234        if slot.meaningful_bytes != 0 {
235            if slot.meaningful_bytes <= max_compact_record_bytes {
236                compact_record_count =
237                    checked_add_usize(compact_record_count, 1, "compact record count")?;
238            } else {
239                direct_slot_count = checked_add_usize(direct_slot_count, 1, "direct slot count")?;
240            }
241        }
242        scratch.index_scratch.push_index(index);
243    }
244    scratch
245        .index_scratch
246        .sort_indices_unstable_by_key_if_needed(|index| slots[index].slot);
247
248    let mut compact_records = reserved_result_vec(compact_record_count, "compact_records")?;
249    let mut direct_slots = reserved_result_vec(direct_slot_count, "direct_slots")?;
250    let mut compact_bytes = 0_u64;
251    let mut direct_bytes = 0_u64;
252
253    for &index in scratch.index_scratch.ordered_indices() {
254        let slot = slots[index];
255        if slot.meaningful_bytes == 0 {
256            continue;
257        }
258        if slot.meaningful_bytes <= max_compact_record_bytes {
259            compact_records.push(CompactResultRecord {
260                slot: slot.slot,
261                compact_offset: compact_bytes,
262                bytes: slot.meaningful_bytes,
263            });
264            compact_bytes = checked_add(compact_bytes, slot.meaningful_bytes, "compact bytes")?;
265        } else {
266            direct_slots.push(slot.slot);
267            direct_bytes = checked_add(direct_bytes, slot.meaningful_bytes, "direct bytes")?;
268        }
269    }
270
271    let selected_readback_bytes =
272        checked_add(compact_bytes, direct_bytes, "selected readback bytes")?;
273    let avoided_readback_bytes = checked_sub(
274        full_capacity_bytes,
275        selected_readback_bytes,
276        "avoided readback bytes",
277    )?;
278
279    Ok(ResultCompactionPlan {
280        compact_records,
281        direct_slots,
282        full_capacity_bytes,
283        compact_bytes,
284        direct_bytes,
285        selected_readback_bytes,
286        avoided_readback_bytes,
287        avoided_readback_basis_points: RESULT_COMPACTION_NUMERIC.ratio_basis_points_u64(
288            avoided_readback_bytes,
289            full_capacity_bytes,
290            0,
291            "result-compaction avoided-readback",
292        ),
293    })
294}
295
296fn reserved_result_vec<T>(
297    capacity: usize,
298    field: &'static str,
299) -> Result<Vec<T>, ResultCompactionError> {
300    reserved_vec(
301        RESULT_COMPACTION_RESERVATION,
302        capacity,
303        field,
304        storage_reserve_failed,
305    )
306}
307
308fn storage_reserve_failed(
309    field: &'static str,
310    requested: usize,
311    message: String,
312) -> ResultCompactionError {
313    ResultCompactionError::StorageReserveFailed {
314        field,
315        requested,
316        message,
317    }
318}
319
320#[cfg(test)]
321mod tests {
322    use super::*;
323
324    #[test]
325    fn result_compaction_packs_small_outputs_and_skips_empty_slots() {
326        let plan =
327            plan_result_compaction(&[slot(2, 0, 128), slot(1, 12, 128), slot(3, 24, 256)], 32)
328                .expect("Fix: small outputs should compact");
329
330        assert_eq!(
331            plan.compact_records,
332            vec![
333                CompactResultRecord {
334                    slot: 1,
335                    compact_offset: 0,
336                    bytes: 12,
337                },
338                CompactResultRecord {
339                    slot: 3,
340                    compact_offset: 12,
341                    bytes: 24,
342                },
343            ]
344        );
345        assert_eq!(plan.direct_slots, Vec::<u32>::new());
346        assert_eq!(plan.full_capacity_bytes, 512);
347        assert_eq!(plan.compact_bytes, 36);
348        assert_eq!(plan.direct_bytes, 0);
349        assert_eq!(plan.selected_readback_bytes, 36);
350        assert_eq!(plan.avoided_readback_bytes, 476);
351        assert_eq!(plan.avoided_readback_basis_points, 9_296);
352    }
353
354    #[test]
355    fn result_compaction_keeps_large_outputs_direct() {
356        let plan = plan_result_compaction(&[slot(1, 64, 128), slot(2, 512, 1_024)], 128)
357            .expect("Fix: mixed outputs should plan");
358
359        assert_eq!(plan.compact_records.len(), 1);
360        assert_eq!(plan.direct_slots, vec![2]);
361        assert_eq!(plan.full_capacity_bytes, 1_152);
362        assert_eq!(plan.compact_bytes, 64);
363        assert_eq!(plan.direct_bytes, 512);
364        assert_eq!(plan.selected_readback_bytes, 576);
365        assert_eq!(plan.avoided_readback_bytes, 576);
366        assert_eq!(plan.avoided_readback_basis_points, 5_000);
367    }
368
369    #[test]
370    fn result_compaction_reports_zero_work_telemetry_without_division() {
371        let plan = plan_result_compaction(&[slot(4, 0, 0), slot(9, 0, 0)], 128)
372            .expect("Fix: zero-capacity outputs should plan");
373
374        assert!(plan.compact_records.is_empty());
375        assert!(plan.direct_slots.is_empty());
376        assert_eq!(plan.full_capacity_bytes, 0);
377        assert_eq!(plan.compact_bytes, 0);
378        assert_eq!(plan.direct_bytes, 0);
379        assert_eq!(plan.selected_readback_bytes, 0);
380        assert_eq!(plan.avoided_readback_bytes, 0);
381        assert_eq!(plan.avoided_readback_basis_points, 0);
382    }
383
384    #[test]
385    fn result_compaction_rejects_invalid_slots() {
386        assert_eq!(
387            plan_result_compaction(&[slot(1, 1, 8), slot(1, 1, 8)], 4)
388                .expect_err("duplicate slots should fail"),
389            ResultCompactionError::DuplicateSlot { slot: 1 }
390        );
391        assert_eq!(
392            plan_result_compaction(&[slot(2, 9, 8)], 4)
393                .expect_err("meaningful bytes above capacity should fail"),
394            ResultCompactionError::MeaningfulExceedsCapacity {
395                slot: 2,
396                meaningful_bytes: 9,
397                capacity_bytes: 8,
398            }
399        );
400    }
401
402    #[test]
403    fn result_compaction_avoids_tree_sets_and_slot_vector_copies() {
404        let src = include_str!("result_compaction.rs");
405        assert!(
406            !src.contains(concat!("BTree", "Set")),
407            "Fix: result compaction duplicate detection should use a hash set; slot ordering should be a final index sort."
408        );
409        assert!(
410            !src.contains(concat!("slots", ".to_vec()")),
411            "Fix: result compaction should sort slot indices rather than copying every slot before planning readback."
412        );
413        assert!(
414            !src.contains(concat!(".", "saturating_sub")),
415            "Fix: result compaction avoided-readback accounting must be exact, not saturating."
416        );
417        assert!(
418            !src.contains(concat!(" as ", "f32")) && !src.contains(concat!(" as ", "f64")),
419            "Fix: result compaction efficiency telemetry must use integer arithmetic, not lossy floats."
420        );
421        assert!(
422            src.contains("pub full_capacity_bytes: u64")
423                && src.contains("pub selected_readback_bytes: u64")
424                && src.contains("pub avoided_readback_basis_points: u32"),
425            "Fix: result compaction plans must expose checked capacity and integer reduction telemetry."
426        );
427        assert!(src.contains("RESULT_COMPACTION_NUMERIC.ratio_basis_points_u64"));
428        assert!(
429            !src.contains(concat!("fn ", "ratio_basis_points(")),
430            "Fix: result compaction must not carry a local numeric wrapper around the shared numeric policy."
431        );
432        assert!(
433            src.contains("ResultCompactionScratch::try_with_capacity(slots.len())?"),
434            "Fix: result compaction must stage scratch with fallible release-path allocation."
435        );
436        assert!(
437            src.contains("scratch.try_reserve_slots(slots.len())?"),
438            "Fix: caller-owned result compaction scratch must grow through fallible reservation."
439        );
440        assert!(
441            src.contains("ReusableIndexScratch"),
442            "Fix: result compaction duplicate detection and ordering scratch must share the paired typed fallible reservation helper."
443        );
444        assert!(
445            src.contains("StorageReserveFailed"),
446            "Fix: result compaction allocation failures must surface as actionable launch-planning errors."
447        );
448        assert!(
449            !src.contains(concat!("FxHashSet::with_capacity", "_and_hasher")),
450            "Fix: result compaction scratch hash storage must not allocate infallibly."
451        );
452        assert!(
453            !src.contains(concat!("Vec::with_capacity", "(slot_count)"))
454                && !src.contains(concat!("Vec::with_capacity", "(slots.len())")),
455            "Fix: result compaction scratch/result vectors must not allocate infallibly."
456        );
457    }
458
459    #[test]
460    fn result_compaction_reuses_caller_owned_slot_planning_scratch() {
461        let mut scratch =
462            ResultCompactionScratch::try_with_capacity(96).expect("Fix: scratch capacity");
463        let wide = (0..96)
464            .rev()
465            .map(|index| slot(index, 8, 64))
466            .collect::<Vec<_>>();
467        let first = plan_result_compaction_with_scratch(&wide, 16, &mut scratch)
468            .expect("Fix: wide compact result set should plan with reusable scratch");
469        let id_capacity = scratch.id_capacity();
470        let ordered_index_capacity = scratch.ordered_index_capacity();
471
472        assert_eq!(first.compact_records.len(), 96);
473        assert_eq!(first.compact_records[0].slot, 0);
474
475        let second = plan_result_compaction_with_scratch(
476            &[slot(7, 0, 128), slot(3, 512, 1_024), slot(5, 16, 128)],
477            32,
478            &mut scratch,
479        )
480        .expect("Fix: smaller mixed result set should reuse previous scratch");
481
482        assert_eq!(second.compact_records[0].slot, 5);
483        assert_eq!(second.direct_slots, vec![3]);
484        assert!(scratch.id_capacity() >= id_capacity);
485        assert!(scratch.ordered_index_capacity() >= ordered_index_capacity);
486    }
487
488    #[test]
489    fn generated_result_compaction_profiles_preserve_exact_telemetry_for_4096_shapes() {
490        let mut scratch = ResultCompactionScratch::default();
491        for slot_count in 1u32..=128 {
492            for compact_threshold in 0u64..32 {
493                let slots = (0..slot_count)
494                    .rev()
495                    .map(|slot_id| {
496                        let meaningful = u64::from((slot_id % 17) + 1);
497                        ResultSlot {
498                            slot: slot_id,
499                            meaningful_bytes: meaningful,
500                            capacity_bytes: meaningful + compact_threshold + 8,
501                        }
502                    })
503                    .collect::<Vec<_>>();
504
505                let plan =
506                    plan_result_compaction_with_scratch(&slots, compact_threshold, &mut scratch)
507                        .expect("Fix: generated result compaction profile should plan");
508
509                let expected_full = slots.iter().map(|slot| slot.capacity_bytes).sum::<u64>();
510                let expected_selected = slots.iter().map(|slot| slot.meaningful_bytes).sum::<u64>();
511                assert_eq!(plan.full_capacity_bytes, expected_full);
512                assert_eq!(plan.selected_readback_bytes, expected_selected);
513                assert_eq!(
514                    plan.avoided_readback_bytes,
515                    expected_full - expected_selected
516                );
517                assert!(plan
518                    .compact_records
519                    .windows(2)
520                    .all(|pair| pair[0].slot < pair[1].slot));
521                assert!(plan.direct_slots.windows(2).all(|pair| pair[0] < pair[1]));
522            }
523        }
524    }
525
526    fn slot(slot: u32, meaningful_bytes: u64, capacity_bytes: u64) -> ResultSlot {
527        ResultSlot {
528            slot,
529            meaningful_bytes,
530            capacity_bytes,
531        }
532    }
533}