use crate::accounting::{
    checked_add_u64_count as checked_add, checked_add_usize_count as checked_add_usize,
    checked_sub_u64_count as checked_sub, ArithmeticOverflow,
};
use crate::numeric::BackendNumericPolicy;
use crate::reservation_policy::{
    reserved_typed_vec as reserved_vec, ReservationPolicy, ReusableIndexScratch,
};
11
12const RESULT_COMPACTION_RESERVATION: ReservationPolicy = ReservationPolicy::new(
13 "result compaction",
14 "shard result readback planning before launch",
15);
16
17const RESULT_COMPACTION_NUMERIC: BackendNumericPolicy =
18 BackendNumericPolicy::new("result compaction");
19
/// One output slot offered to the readback planner.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ResultSlot {
    /// Caller-assigned slot identifier; the planner rejects repeated identifiers.
    pub slot: u32,
    /// Number of bytes in the slot that actually carry results. Zero means the slot
    /// contributes nothing to readback.
    pub meaningful_bytes: u64,
    /// Total bytes allocated for the slot; the planner rejects slots whose
    /// `meaningful_bytes` exceed this value.
    pub capacity_bytes: u64,
}
30
/// Placement of one slot's meaningful bytes inside the packed (compact) readback region.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CompactResultRecord {
    /// Identifier of the slot this record describes.
    pub slot: u32,
    /// Byte offset of the record within the compact region; the planner assigns it as
    /// the running total of the bytes of all records emitted before this one.
    pub compact_offset: u64,
    /// Number of meaningful bytes stored for the slot.
    pub bytes: u64,
}
41
42#[derive(Clone, Debug, Eq, PartialEq)]
44pub struct ResultCompactionPlan {
45 pub compact_records: Vec<CompactResultRecord>,
47 pub direct_slots: Vec<u32>,
49 pub full_capacity_bytes: u64,
51 pub compact_bytes: u64,
53 pub direct_bytes: u64,
55 pub selected_readback_bytes: u64,
57 pub avoided_readback_bytes: u64,
59 pub avoided_readback_basis_points: u32,
61}
62
63#[derive(Debug, Default)]
65pub struct ResultCompactionScratch {
66 index_scratch: ReusableIndexScratch<u32>,
67}
68
69impl ResultCompactionScratch {
70 #[must_use]
72 pub fn new() -> Self {
73 Self::default()
74 }
75
76 pub fn try_with_capacity(slot_count: usize) -> Result<Self, ResultCompactionError> {
82 let mut scratch = Self::default();
83 scratch.try_reserve_slots(slot_count)?;
84 Ok(scratch)
85 }
86
87 pub fn try_reserve_slots(&mut self, slot_count: usize) -> Result<(), ResultCompactionError> {
93 self.index_scratch.try_reserve_with(
94 RESULT_COMPACTION_RESERVATION,
95 slot_count,
96 "scratch.ids",
97 "scratch.ordered_indices",
98 storage_reserve_failed,
99 )
100 }
101
102 #[must_use]
104 pub fn id_capacity(&self) -> usize {
105 self.index_scratch.seen_capacity()
106 }
107
108 #[must_use]
110 pub fn ordered_index_capacity(&self) -> usize {
111 self.index_scratch.ordered_index_capacity()
112 }
113}
114
/// Reasons readback planning can fail.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ResultCompactionError {
    /// The same slot identifier appeared more than once in the input.
    DuplicateSlot {
        /// The repeated slot identifier.
        slot: u32,
    },
    /// A slot claimed more meaningful bytes than its capacity.
    MeaningfulExceedsCapacity {
        /// Identifier of the offending slot.
        slot: u32,
        /// Meaningful byte count reported for the slot.
        meaningful_bytes: u64,
        /// Capacity reported for the slot.
        capacity_bytes: u64,
    },
    /// A checked arithmetic step overflowed (or underflowed) while accumulating totals.
    ByteCountOverflow {
        /// Name of the quantity that was being computed.
        field: &'static str,
    },
    /// Fallible reservation of scratch or result storage failed.
    StorageReserveFailed {
        /// Label of the storage that could not be reserved.
        field: &'static str,
        /// Number of entries that was requested.
        requested: usize,
        /// Description of the underlying failure.
        message: String,
    },
}
147
148impl ArithmeticOverflow for ResultCompactionError {
149 fn arithmetic_overflow(field: &'static str) -> Self {
150 Self::ByteCountOverflow { field }
151 }
152}
153
154impl std::fmt::Display for ResultCompactionError {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 match self {
157 Self::DuplicateSlot { slot } => write!(
158 f,
159 "result compaction received duplicate output slot {slot}. Fix: assign unique output slots before readback planning."
160 ),
161 Self::MeaningfulExceedsCapacity {
162 slot,
163 meaningful_bytes,
164 capacity_bytes,
165 } => write!(
166 f,
167 "result slot {slot} has meaningful_bytes={meaningful_bytes} above capacity_bytes={capacity_bytes}. Fix: compute compact result sizes before dispatch readback."
168 ),
169 Self::ByteCountOverflow { field } => write!(
170 f,
171 "result compaction overflowed while computing {field}. Fix: shard compact result readback before launch."
172 ),
173 Self::StorageReserveFailed {
174 field,
175 requested,
176 message,
177 } => write!(
178 f,
179 "result compaction failed to reserve {field} for {requested} entries: {message}. Fix: shard result readback planning before launch."
180 ),
181 }
182 }
183}
184
185impl std::error::Error for ResultCompactionError {}
186
187pub fn plan_result_compaction(
194 slots: &[ResultSlot],
195 max_compact_record_bytes: u64,
196) -> Result<ResultCompactionPlan, ResultCompactionError> {
197 let mut scratch = ResultCompactionScratch::try_with_capacity(slots.len())?;
198 plan_result_compaction_with_scratch(slots, max_compact_record_bytes, &mut scratch)
199}
200
201pub fn plan_result_compaction_with_scratch(
208 slots: &[ResultSlot],
209 max_compact_record_bytes: u64,
210 scratch: &mut ResultCompactionScratch,
211) -> Result<ResultCompactionPlan, ResultCompactionError> {
212 scratch.index_scratch.clear();
213 scratch.try_reserve_slots(slots.len())?;
214 let mut full_capacity_bytes = 0_u64;
215 let mut compact_record_count = 0usize;
216 let mut direct_slot_count = 0usize;
217
218 for (index, slot) in slots.iter().copied().enumerate() {
219 if !scratch.index_scratch.insert_seen(slot.slot) {
220 return Err(ResultCompactionError::DuplicateSlot { slot: slot.slot });
221 }
222 if slot.meaningful_bytes > slot.capacity_bytes {
223 return Err(ResultCompactionError::MeaningfulExceedsCapacity {
224 slot: slot.slot,
225 meaningful_bytes: slot.meaningful_bytes,
226 capacity_bytes: slot.capacity_bytes,
227 });
228 }
229 full_capacity_bytes = checked_add(
230 full_capacity_bytes,
231 slot.capacity_bytes,
232 "full capacity bytes",
233 )?;
234 if slot.meaningful_bytes != 0 {
235 if slot.meaningful_bytes <= max_compact_record_bytes {
236 compact_record_count =
237 checked_add_usize(compact_record_count, 1, "compact record count")?;
238 } else {
239 direct_slot_count = checked_add_usize(direct_slot_count, 1, "direct slot count")?;
240 }
241 }
242 scratch.index_scratch.push_index(index);
243 }
244 scratch
245 .index_scratch
246 .sort_indices_unstable_by_key_if_needed(|index| slots[index].slot);
247
248 let mut compact_records = reserved_result_vec(compact_record_count, "compact_records")?;
249 let mut direct_slots = reserved_result_vec(direct_slot_count, "direct_slots")?;
250 let mut compact_bytes = 0_u64;
251 let mut direct_bytes = 0_u64;
252
253 for &index in scratch.index_scratch.ordered_indices() {
254 let slot = slots[index];
255 if slot.meaningful_bytes == 0 {
256 continue;
257 }
258 if slot.meaningful_bytes <= max_compact_record_bytes {
259 compact_records.push(CompactResultRecord {
260 slot: slot.slot,
261 compact_offset: compact_bytes,
262 bytes: slot.meaningful_bytes,
263 });
264 compact_bytes = checked_add(compact_bytes, slot.meaningful_bytes, "compact bytes")?;
265 } else {
266 direct_slots.push(slot.slot);
267 direct_bytes = checked_add(direct_bytes, slot.meaningful_bytes, "direct bytes")?;
268 }
269 }
270
271 let selected_readback_bytes =
272 checked_add(compact_bytes, direct_bytes, "selected readback bytes")?;
273 let avoided_readback_bytes = checked_sub(
274 full_capacity_bytes,
275 selected_readback_bytes,
276 "avoided readback bytes",
277 )?;
278
279 Ok(ResultCompactionPlan {
280 compact_records,
281 direct_slots,
282 full_capacity_bytes,
283 compact_bytes,
284 direct_bytes,
285 selected_readback_bytes,
286 avoided_readback_bytes,
287 avoided_readback_basis_points: RESULT_COMPACTION_NUMERIC.ratio_basis_points_u64(
288 avoided_readback_bytes,
289 full_capacity_bytes,
290 0,
291 "result-compaction avoided-readback",
292 ),
293 })
294}
295
296fn reserved_result_vec<T>(
297 capacity: usize,
298 field: &'static str,
299) -> Result<Vec<T>, ResultCompactionError> {
300 reserved_vec(
301 RESULT_COMPACTION_RESERVATION,
302 capacity,
303 field,
304 storage_reserve_failed,
305 )
306}
307
308fn storage_reserve_failed(
309 field: &'static str,
310 requested: usize,
311 message: String,
312) -> ResultCompactionError {
313 ResultCompactionError::StorageReserveFailed {
314 field,
315 requested,
316 message,
317 }
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// Small outputs are packed in ascending slot order with running offsets, and a
    /// slot with zero meaningful bytes still counts toward capacity but is not emitted.
    #[test]
    fn result_compaction_packs_small_outputs_and_skips_empty_slots() {
        let plan =
            plan_result_compaction(&[slot(2, 0, 128), slot(1, 12, 128), slot(3, 24, 256)], 32)
                .expect("Fix: small outputs should compact");

        assert_eq!(
            plan.compact_records,
            vec![
                CompactResultRecord {
                    slot: 1,
                    compact_offset: 0,
                    bytes: 12,
                },
                CompactResultRecord {
                    slot: 3,
                    compact_offset: 12,
                    bytes: 24,
                },
            ]
        );
        assert_eq!(plan.direct_slots, Vec::<u32>::new());
        assert_eq!(plan.full_capacity_bytes, 512);
        assert_eq!(plan.compact_bytes, 36);
        assert_eq!(plan.direct_bytes, 0);
        assert_eq!(plan.selected_readback_bytes, 36);
        assert_eq!(plan.avoided_readback_bytes, 476);
        assert_eq!(plan.avoided_readback_basis_points, 9_296);
    }

    /// A slot above the compact threshold is listed as direct and accounted separately.
    #[test]
    fn result_compaction_keeps_large_outputs_direct() {
        let plan = plan_result_compaction(&[slot(1, 64, 128), slot(2, 512, 1_024)], 128)
            .expect("Fix: mixed outputs should plan");

        assert_eq!(plan.compact_records.len(), 1);
        assert_eq!(plan.direct_slots, vec![2]);
        assert_eq!(plan.full_capacity_bytes, 1_152);
        assert_eq!(plan.compact_bytes, 64);
        assert_eq!(plan.direct_bytes, 512);
        assert_eq!(plan.selected_readback_bytes, 576);
        assert_eq!(plan.avoided_readback_bytes, 576);
        assert_eq!(plan.avoided_readback_basis_points, 5_000);
    }

    /// With zero total capacity every total, including the basis-point ratio, is zero.
    #[test]
    fn result_compaction_reports_zero_work_telemetry_without_division() {
        let plan = plan_result_compaction(&[slot(4, 0, 0), slot(9, 0, 0)], 128)
            .expect("Fix: zero-capacity outputs should plan");

        assert!(plan.compact_records.is_empty());
        assert!(plan.direct_slots.is_empty());
        assert_eq!(plan.full_capacity_bytes, 0);
        assert_eq!(plan.compact_bytes, 0);
        assert_eq!(plan.direct_bytes, 0);
        assert_eq!(plan.selected_readback_bytes, 0);
        assert_eq!(plan.avoided_readback_bytes, 0);
        assert_eq!(plan.avoided_readback_basis_points, 0);
    }

    /// Duplicate slot ids and meaningful bytes above capacity are both rejected.
    #[test]
    fn result_compaction_rejects_invalid_slots() {
        assert_eq!(
            plan_result_compaction(&[slot(1, 1, 8), slot(1, 1, 8)], 4)
                .expect_err("duplicate slots should fail"),
            ResultCompactionError::DuplicateSlot { slot: 1 }
        );
        assert_eq!(
            plan_result_compaction(&[slot(2, 9, 8)], 4)
                .expect_err("meaningful bytes above capacity should fail"),
            ResultCompactionError::MeaningfulExceedsCapacity {
                slot: 2,
                meaningful_bytes: 9,
                capacity_bytes: 8,
            }
        );
    }

    /// Source-level guard: scans this file's own text for banned constructs and for the
    /// required fallible-allocation call sites. Banned needles are assembled with
    /// `concat!` so that this test's source never contains them itself.
    #[test]
    fn result_compaction_avoids_tree_sets_and_slot_vector_copies() {
        let src = include_str!("result_compaction.rs");
        assert!(
            !src.contains(concat!("BTree", "Set")),
            "Fix: result compaction duplicate detection should use a hash set; slot ordering should be a final index sort."
        );
        assert!(
            !src.contains(concat!("slots", ".to_vec()")),
            "Fix: result compaction should sort slot indices rather than copying every slot before planning readback."
        );
        assert!(
            !src.contains(concat!(".", "saturating_sub")),
            "Fix: result compaction avoided-readback accounting must be exact, not saturating."
        );
        assert!(
            !src.contains(concat!(" as ", "f32")) && !src.contains(concat!(" as ", "f64")),
            "Fix: result compaction efficiency telemetry must use integer arithmetic, not lossy floats."
        );
        assert!(
            src.contains("pub full_capacity_bytes: u64")
                && src.contains("pub selected_readback_bytes: u64")
                && src.contains("pub avoided_readback_basis_points: u32"),
            "Fix: result compaction plans must expose checked capacity and integer reduction telemetry."
        );
        assert!(src.contains("RESULT_COMPACTION_NUMERIC.ratio_basis_points_u64"));
        assert!(
            !src.contains(concat!("fn ", "ratio_basis_points(")),
            "Fix: result compaction must not carry a local numeric wrapper around the shared numeric policy."
        );
        assert!(
            src.contains("ResultCompactionScratch::try_with_capacity(slots.len())?"),
            "Fix: result compaction must stage scratch with fallible release-path allocation."
        );
        assert!(
            src.contains("scratch.try_reserve_slots(slots.len())?"),
            "Fix: caller-owned result compaction scratch must grow through fallible reservation."
        );
        assert!(
            src.contains("ReusableIndexScratch"),
            "Fix: result compaction duplicate detection and ordering scratch must share the paired typed fallible reservation helper."
        );
        assert!(
            src.contains("StorageReserveFailed"),
            "Fix: result compaction allocation failures must surface as actionable launch-planning errors."
        );
        assert!(
            !src.contains(concat!("FxHashSet::with_capacity", "_and_hasher")),
            "Fix: result compaction scratch hash storage must not allocate infallibly."
        );
        assert!(
            !src.contains(concat!("Vec::with_capacity", "(slot_count)"))
                && !src.contains(concat!("Vec::with_capacity", "(slots.len())")),
            "Fix: result compaction scratch/result vectors must not allocate infallibly."
        );
    }

    /// A scratch sized for a wide input is reused for a smaller one: results stay
    /// correct and the reported capacities do not shrink.
    #[test]
    fn result_compaction_reuses_caller_owned_slot_planning_scratch() {
        let mut scratch =
            ResultCompactionScratch::try_with_capacity(96).expect("Fix: scratch capacity");
        let wide = (0..96)
            .rev()
            .map(|index| slot(index, 8, 64))
            .collect::<Vec<_>>();
        let first = plan_result_compaction_with_scratch(&wide, 16, &mut scratch)
            .expect("Fix: wide compact result set should plan with reusable scratch");
        let id_capacity = scratch.id_capacity();
        let ordered_index_capacity = scratch.ordered_index_capacity();

        assert_eq!(first.compact_records.len(), 96);
        assert_eq!(first.compact_records[0].slot, 0);

        let second = plan_result_compaction_with_scratch(
            &[slot(7, 0, 128), slot(3, 512, 1_024), slot(5, 16, 128)],
            32,
            &mut scratch,
        )
        .expect("Fix: smaller mixed result set should reuse previous scratch");

        assert_eq!(second.compact_records[0].slot, 5);
        assert_eq!(second.direct_slots, vec![3]);
        assert!(scratch.id_capacity() >= id_capacity);
        assert!(scratch.ordered_index_capacity() >= ordered_index_capacity);
    }

    /// Sweeps 128 slot counts x 32 thresholds (4096 shapes) through one shared scratch
    /// and checks the totals against independent sums plus strict ascending ordering.
    #[test]
    fn generated_result_compaction_profiles_preserve_exact_telemetry_for_4096_shapes() {
        let mut scratch = ResultCompactionScratch::default();
        for slot_count in 1u32..=128 {
            for compact_threshold in 0u64..32 {
                // Slots are generated in descending id order so the planner must reorder.
                let slots = (0..slot_count)
                    .rev()
                    .map(|slot_id| {
                        let meaningful = u64::from((slot_id % 17) + 1);
                        ResultSlot {
                            slot: slot_id,
                            meaningful_bytes: meaningful,
                            capacity_bytes: meaningful + compact_threshold + 8,
                        }
                    })
                    .collect::<Vec<_>>();

                let plan =
                    plan_result_compaction_with_scratch(&slots, compact_threshold, &mut scratch)
                        .expect("Fix: generated result compaction profile should plan");

                let expected_full = slots.iter().map(|slot| slot.capacity_bytes).sum::<u64>();
                let expected_selected = slots.iter().map(|slot| slot.meaningful_bytes).sum::<u64>();
                assert_eq!(plan.full_capacity_bytes, expected_full);
                assert_eq!(plan.selected_readback_bytes, expected_selected);
                assert_eq!(
                    plan.avoided_readback_bytes,
                    expected_full - expected_selected
                );
                assert!(plan
                    .compact_records
                    .windows(2)
                    .all(|pair| pair[0].slot < pair[1].slot));
                assert!(plan.direct_slots.windows(2).all(|pair| pair[0] < pair[1]));
            }
        }
    }

    /// Shorthand constructor for a [`ResultSlot`] used throughout these tests.
    fn slot(slot: u32, meaningful_bytes: u64, capacity_bytes: u64) -> ResultSlot {
        ResultSlot {
            slot,
            meaningful_bytes,
            capacity_bytes,
        }
    }
}