// vernier_core/stream.rs
//! Streaming evaluator (ADR-0013).
//!
//! Composes around the locked spine: each `update()` runs `match_image`
//! over the new detections (via the existing `evaluate_with` path) and
//! appends one [`PerImageEval`] per `(category, area, image)` cell to an
//! internal sparse store. [`StreamingEvaluator::snapshot`] and
//! [`StreamingEvaluator::finalize`] call [`crate::accumulate`] over the
//! current store unchanged.
//!
//! Per ADR-0005 this module does NOT edit `matching.rs` or
//! `accumulate.rs`. It only orchestrates.
//!
//! ## v0 deferrals
//!
//! - Strict-mode `(score, stream_position)` tiebreak is not yet wired
//!   through the matching path; `next_dt_id` carries the monotonic
//!   counter for the future implementation.

19use std::collections::{HashMap, HashSet};
20use std::mem::size_of;
21
22use crate::accumulate::{accumulate, AccumulateParams, Accumulated, PerImageEval};
23use crate::dataset::{
24    AnnId, CategoryId, CocoDataset, CocoDetection, CocoDetections, DetectionInput, EvalDataset,
25    ImageId,
26};
27use crate::error::EvalError;
28use crate::evaluate::{evaluate_with, EvalImageMeta, EvalKernel, OwnedEvaluateParams};
29use crate::parity::{recall_thresholds, ParityMode};
30use crate::summarize::{summarize_detection, summarize_with, StatRequest, Summary};
31
/// Fallback budget used when [`MemoryBudget::system_total_bytes`]
/// cannot determine physical memory: 8 GiB.
const DEFAULT_BUDGET_BYTES: usize = 8 << 30;

/// Fraction of the budget at which the soft warning fires: once
/// `total_used >= soft_warn * budget`, the next [`UpdateReport`]
/// carries `soft_warn_triggered = true` (fires at most once per
/// evaluator).
const DEFAULT_SOFT_WARN_FRACTION: f64 = 0.80;
40
/// Memory budget for a [`StreamingEvaluator`].
///
/// The evaluator retains one [`PerImageEval`] per `(category, area,
/// image)` cell that has ever received a detection, so its footprint
/// grows with dataset size and category count. This budget is the cap
/// that bounds that growth.
#[derive(Debug, Clone, Copy)]
pub struct MemoryBudget {
    /// Hard cap, in bytes. An `update()` whose projected post-insert
    /// total would exceed this returns [`EvalError::OutOfBudget`]
    /// without inserting anything.
    pub bytes: usize,
    /// Soft-warn threshold expressed as a fraction of `bytes`
    /// (typically `0.80`). The first [`UpdateReport`] after crossing it
    /// sets `soft_warn_triggered`. A value `> 1.0` disables the warn.
    pub soft_warn_fraction: f64,
}
58
59impl MemoryBudget {
60    /// `min(8 GiB, system_total / 2)`. The system-total query is
61    /// best-effort; a read failure falls back to 8 GiB.
62    pub fn auto_default() -> Self {
63        let half_total = Self::system_total_bytes()
64            .map(|t| t / 2)
65            .unwrap_or(DEFAULT_BUDGET_BYTES);
66        Self {
67            bytes: DEFAULT_BUDGET_BYTES.min(half_total),
68            soft_warn_fraction: DEFAULT_SOFT_WARN_FRACTION,
69        }
70    }
71
72    /// Read-the-system-once helper. Reads `/proc/meminfo` on Linux,
73    /// returns `None` on other platforms or on read/parse failure.
74    fn system_total_bytes() -> Option<usize> {
75        if cfg!(target_os = "linux") {
76            let contents = std::fs::read_to_string("/proc/meminfo").ok()?;
77            for line in contents.lines() {
78                if let Some(rest) = line.strip_prefix("MemTotal:") {
79                    let rest = rest.trim();
80                    let kb_part = rest.strip_suffix(" kB")?;
81                    let kb: usize = kb_part.trim().parse().ok()?;
82                    return Some(kb.saturating_mul(1024));
83                }
84            }
85            None
86        } else {
87            None
88        }
89    }
90}
91
92/// Static metadata describing the `(K, A, I)` evaluation grid the
93/// streaming evaluator targets.
94///
95/// Built once at construction from the [`CocoDataset`] and the
96/// [`OwnedEvaluateParams`] — never mutated thereafter. Mirrors the axes
97/// the unchanged batch [`crate::evaluate_with`] orchestrator emits.
98#[derive(Debug, Clone)]
99pub struct EvalGridMeta {
100    /// `K` axis size (number of categories, or `1` when `use_cats=false`).
101    pub n_categories: usize,
102    /// `A` axis size (number of area ranges).
103    pub n_area_ranges: usize,
104    /// `I` axis size (number of images in the GT dataset).
105    pub n_images: usize,
106    /// Maps each GT [`CategoryId`] to its position on the K-axis. Empty
107    /// when `use_cats=false`.
108    pub category_id_to_idx: HashMap<CategoryId, usize>,
109    /// Maps each GT [`ImageId`] to its position on the I-axis.
110    pub image_id_to_idx: HashMap<ImageId, usize>,
111}
112
113/// Sparse `(k, a, i) -> PerImageEval` store backing the streaming
114/// evaluator.
115///
116/// Cells absent from the store represent the same "no detections, no
117/// non-ignore GTs" condition as `None` entries in the batch
118/// [`crate::EvalGrid::eval_imgs`]. [`Self::flatten`] re-densifies the
119/// store into the dense `Vec<Option<PerImageEval>>` shape
120/// [`crate::accumulate`] consumes.
121#[derive(Debug, Clone, Default)]
122pub struct PerImageEvalStore {
123    /// Sparse cells keyed by `(k, a, i)`.
124    cells: HashMap<(usize, usize, usize), PerImageEval>,
125}
126
127impl PerImageEvalStore {
128    /// Empty store.
129    pub fn new() -> Self {
130        Self::default()
131    }
132
133    /// Number of populated cells.
134    pub fn len(&self) -> usize {
135        self.cells.len()
136    }
137
138    /// `true` if no cells have been inserted.
139    pub fn is_empty(&self) -> bool {
140        self.cells.is_empty()
141    }
142
143    /// Insert (or overwrite) the cell at `(k, a, i)`.
144    pub fn insert(&mut self, k: usize, a: usize, i: usize, cell: PerImageEval) {
145        self.cells.insert((k, a, i), cell);
146    }
147
148    /// Borrow the underlying `(k, a, i) -> PerImageEval` map. Used by
149    /// the distributed-eval encoder (ADR-0031) to walk cells in
150    /// canonical order.
151    pub(crate) fn as_map(&self) -> &HashMap<(usize, usize, usize), PerImageEval> {
152        &self.cells
153    }
154
155    /// Move-out variant of [`Self::as_map`] used by the
156    /// `from_partials` constructor to swap a freshly merged store
157    /// in.
158    pub(crate) fn from_map(cells: HashMap<(usize, usize, usize), PerImageEval>) -> Self {
159        Self { cells }
160    }
161
162    /// Densify into the `[k * A * I + a * I + i]`-laid-out
163    /// `Vec<Option<Box<PerImageEval>>>` that [`crate::accumulate`]
164    /// consumes. Cloning is intentional — `accumulate` borrows the
165    /// slice and the store must remain valid for further updates after
166    /// a snapshot. The cells are heap-boxed so the dense slot Vec only
167    /// pays for a pointer per slot at zero-init time (see
168    /// [`crate::EvalGrid::eval_imgs`]).
169    pub fn flatten(&self, meta: &EvalGridMeta) -> Vec<Option<Box<PerImageEval>>> {
170        let total = meta.n_categories * meta.n_area_ranges * meta.n_images;
171        let mut out: Vec<Option<Box<PerImageEval>>> = Vec::with_capacity(total);
172        for k in 0..meta.n_categories {
173            for a in 0..meta.n_area_ranges {
174                for i in 0..meta.n_images {
175                    out.push(self.cells.get(&(k, a, i)).cloned().map(Box::new));
176                }
177            }
178        }
179        out
180    }
181}
182
183/// Bundle returned by [`StreamingEvaluator::snapshot_with_cells`]:
184/// the canonical [`Summary`] plus the per-image cell store needed by
185/// the ADR-0018 calibration summarizer.
186///
187/// The cell store is a dense `Vec<Option<Box<PerImageEval>>>` in
188/// `k * A * I + a * I + i` row-major order — the same shape
189/// [`crate::accumulate`] and
190/// [`crate::calibration::summarize_calibration`] both consume directly.
191/// `n_categories`, `n_area_ranges`, `iou_thresholds`, and `parity_mode`
192/// are mirrored alongside so the FFI handle can build an
193/// `EvalCells` without re-deriving them from a [`StreamingEvaluator`]
194/// reference that may be on the worker thread.
195#[derive(Debug, Clone)]
196pub struct SnapshotWithCells {
197    /// Canonical [`Summary`] — bit-identical to what
198    /// [`StreamingEvaluator::snapshot`] would have produced for the
199    /// same evaluator state.
200    pub summary: Summary,
201    /// Dense per-image cell store, GT-only overlay applied — directly
202    /// consumable by [`crate::calibration::summarize_calibration`].
203    pub eval_imgs: Vec<Option<Box<PerImageEval>>>,
204    /// `K` axis size mirrored from [`EvalGridMeta::n_categories`].
205    pub n_categories: usize,
206    /// `A` axis size mirrored from [`EvalGridMeta::n_area_ranges`].
207    pub n_area_ranges: usize,
208    /// Pinned IoU thresholds — the kernel's T-axis.
209    pub iou_thresholds: Vec<f64>,
210    /// Parity mode the evaluator was constructed under.
211    pub parity_mode: ParityMode,
212}
213
/// Per-call diagnostics from [`StreamingEvaluator::update`].
///
/// Intended for training-loop logging (TensorBoard, console). Every
/// counter describes only the batch that produced this report; for
/// cumulative totals see [`StreamingEvaluator::detections_seen`] and
/// friends.
#[derive(Debug, Clone)]
pub struct UpdateReport {
    /// Detections accepted in this batch, after duplicate rejection.
    pub n_detections_accepted: usize,
    /// Distinct images that received detections in this batch.
    pub n_images_in_batch: usize,
    /// Populated cells produced by this batch and inserted.
    pub n_cells_inserted: usize,
    /// Set exactly once per evaluator: on the first report whose
    /// post-insert total crosses the soft-warn threshold.
    pub soft_warn_triggered: bool,
}
232
233/// Pre-parsed detection batch.
234///
235/// The streaming evaluator's normal entry point is [`StreamingEvaluator::update`]
236/// (raw JSON bytes); this struct names the parsed-but-not-yet-evaluated
237/// shape for callers that want to amortize the parse cost or supply
238/// detections from a non-JSON source. Generic over [`EvalKernel`] so the
239/// parsed form is type-tied to the evaluator's kernel choice — feeding a
240/// `ParsedDetections<BboxIou>` into a `StreamingEvaluator<OksSimilarity>`
241/// is a compile-time error.
242#[derive(Debug, Clone)]
243pub struct ParsedDetections<K: EvalKernel> {
244    /// Parsed detections.
245    pub detections: CocoDetections,
246    /// Type marker tying these detections to a specific kernel.
247    _kernel: std::marker::PhantomData<K>,
248}
249
250impl<K: EvalKernel> ParsedDetections<K> {
251    /// Wrap a pre-built [`CocoDetections`] for use with a streaming
252    /// evaluator of kernel `K`.
253    pub fn from_detections(detections: CocoDetections) -> Self {
254        Self {
255            detections,
256            _kernel: std::marker::PhantomData,
257        }
258    }
259
260    /// Parse detections from the `loadRes`-shaped JSON byte slice.
261    ///
262    /// # Errors
263    ///
264    /// Propagates [`EvalError::Json`] / [`EvalError::NonFinite`] from
265    /// the underlying [`CocoDetections::from_json_bytes`].
266    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, EvalError> {
267        Ok(Self::from_detections(CocoDetections::from_json_bytes(
268            bytes,
269        )?))
270    }
271}
272
/// Streaming COCO evaluator, ADR-0013.
///
/// Holds a [`CocoDataset`] plus a sparse store of [`PerImageEval`] cells
/// produced by per-batch `match_image` calls. [`Self::snapshot`] runs
/// [`accumulate`] + summarize over the current store at any time;
/// [`Self::finalize`] consumes the evaluator and returns the same
/// [`Summary`]. Bit-identical to a batch run over the union of all
/// `update()` batches submitted in order.
///
/// When `params.retain_iou` is `true`, the evaluator additionally
/// retains a [`crate::tables::RetainedIous`] store keyed by `(k, i)`, populated
/// incrementally as each batch's `evaluate_with` returns its per-batch
/// retentions. Consumed by the per_pair / per_detection table builders.
#[derive(Debug)]
pub struct StreamingEvaluator<K: EvalKernel> {
    /// Ground-truth dataset; immutable for the evaluator's lifetime.
    dataset: CocoDataset,
    /// Similarity kernel (e.g. bbox IoU, OKS) passed to `evaluate_with`.
    kernel: K,
    /// Owned evaluation parameters; `params.borrow()` is handed to each
    /// per-batch `evaluate_with` call.
    params: OwnedEvaluateParams,
    /// Parity mode fixed at construction and forwarded to every batch run.
    parity_mode: ParityMode,
    /// Static `(K, A, I)` axis metadata, built once in `new`.
    grid_meta: EvalGridMeta,
    /// Sparse per-image cell store; each cell is written exactly once.
    cells: PerImageEvalStore,
    /// Per-`(k, a, i)` `EvalImageMeta` retained alongside `cells` when
    /// `params.retain_iou` is true. Empty otherwise. Required by the
    /// per_detection / per_pair builders for `dt_ids` / `gt_ids` / etc.
    meta_cells: HashMap<(usize, usize, usize), EvalImageMeta>,
    /// Per-`(category, image)` IoU matrices retained across `update()`
    /// calls. `None` on the default path (`params.retain_iou=false`);
    /// `Some` when retention was opted into at construction.
    retained_ious: Option<crate::tables::RetainedIous>,
    /// Detection records accumulated across `update()` calls, kept only
    /// when `params.retain_iou` is true (consumed by `per_detection`'s
    /// `area` / optional `bbox` columns). Records are owned and carry
    /// their original ids; flushed into a fresh `CocoDetections` view
    /// at finalize/snapshot time.
    dets_seen: Vec<CocoDetection>,
    /// Raw image ids already submitted; drives duplicate-batch rejection.
    seen_images: HashSet<i64>,
    /// Image-grid indices for every entry in `seen_images`. Maintained
    /// incrementally so `compute_summary` can decide which GT-only
    /// cells to overlay without re-walking `seen_images` every call.
    seen_image_indices: HashSet<usize>,
    /// Lazily-built GT-only `(K, A, I)` grid used by `compute_summary`
    /// to file cells for images that have not yet received any
    /// detection. GT is immutable, so the grid is computed at most
    /// once across the evaluator's lifetime.
    gt_only_cells: Option<Vec<Option<Box<PerImageEval>>>>,
    /// Cumulative count of accepted detections across all updates.
    n_detections: usize,
    /// Monotonic DT-id counter. Reserved for the strict-mode
    /// `(score, stream_position)` tiebreak (ADR-0013 §Determinism); not
    /// yet consumed by the matching path.
    next_dt_id: i64,
    /// Optional rank identifier for distributed-eval merge (ADR-0031).
    /// `None` for single-rank usage. Set via [`Self::with_rank`] before
    /// the first `update`. Carried in the partial header and used as
    /// the strict-mode `(rank_id, local_position)` tiebreak when the
    /// matching path consumes it.
    rank_id: Option<crate::distributed::RankId>,
    // Memory accounting, split by component so OutOfBudget can report a
    // breakdown. Their sum is `memory_used_bytes()`.
    bytes_cells_struct: usize,
    bytes_dt_scores: usize,
    bytes_match_flags: usize,
    /// Configured hard cap + soft-warn threshold.
    budget: MemoryBudget,
    /// One-shot latch: set once the soft-warn report has been emitted.
    soft_warn_fired: bool,
}
335
336impl<K: EvalKernel> StreamingEvaluator<K> {
337    /// Construct a new streaming evaluator.
338    ///
339    /// # Errors
340    ///
341    /// Returns [`EvalError::InvalidConfig`] if `params.area_ranges` is
342    /// empty (the batch path tolerates this; the streaming evaluator
343    /// rejects it because the `(K, A, I)` grid would be degenerate).
344    pub fn new(
345        dataset: CocoDataset,
346        kernel: K,
347        params: OwnedEvaluateParams,
348        parity_mode: ParityMode,
349        budget: MemoryBudget,
350    ) -> Result<Self, EvalError> {
351        if params.area_ranges.is_empty() {
352            return Err(EvalError::InvalidConfig {
353                detail: "OwnedEvaluateParams.area_ranges must be non-empty".into(),
354            });
355        }
356        let grid_meta = build_grid_meta(&dataset, &params);
357        let retained_ious = if params.retain_iou {
358            Some(crate::tables::RetainedIous::new())
359        } else {
360            None
361        };
362        Ok(Self {
363            dataset,
364            kernel,
365            params,
366            parity_mode,
367            grid_meta,
368            cells: PerImageEvalStore::new(),
369            meta_cells: HashMap::new(),
370            retained_ious,
371            dets_seen: Vec::new(),
372            seen_images: HashSet::new(),
373            seen_image_indices: HashSet::new(),
374            gt_only_cells: None,
375            n_detections: 0,
376            next_dt_id: 1,
377            rank_id: None,
378            bytes_cells_struct: 0,
379            bytes_dt_scores: 0,
380            bytes_match_flags: 0,
381            budget,
382            soft_warn_fired: false,
383        })
384    }
385
386    /// Set this evaluator's rank identifier for distributed-eval merge
387    /// (ADR-0031). Builder shape: returns `Self`. Calling this after
388    /// the first [`Self::update`] is a programming error and returns
389    /// [`EvalError::InvalidConfig`] — rank identity is a
390    /// construction-time property, not a mid-run mutable parameter.
391    ///
392    /// # Errors
393    ///
394    /// [`EvalError::InvalidConfig`] when `n_detections > 0`.
395    pub fn with_rank(mut self, rank_id: crate::distributed::RankId) -> Result<Self, EvalError> {
396        if self.n_detections > 0 {
397            return Err(EvalError::InvalidConfig {
398                detail: "with_rank must be called before any update; rank identity is fixed at construction".into(),
399            });
400        }
401        self.rank_id = Some(rank_id);
402        Ok(self)
403    }
404
405    /// The rank id this evaluator was tagged with, if any.
406    pub fn rank_id(&self) -> Option<crate::distributed::RankId> {
407        self.rank_id
408    }
409
410    /// Number of distinct images with at least one accepted detection.
411    pub fn images_seen(&self) -> usize {
412        self.seen_images.len()
413    }
414
415    /// Number of detections accepted across all `update()` calls.
416    pub fn detections_seen(&self) -> usize {
417        self.n_detections
418    }
419
420    /// Number of GT images that have not yet received any detection.
421    pub fn images_pending(&self) -> usize {
422        self.grid_meta.n_images.saturating_sub(self.images_seen())
423    }
424
425    /// Total bytes the evaluator currently holds (sum of the three
426    /// breakdown components).
427    pub fn memory_used_bytes(&self) -> usize {
428        self.bytes_cells_struct + self.bytes_dt_scores + self.bytes_match_flags
429    }
430
431    /// View of the configured budget.
432    pub fn budget(&self) -> MemoryBudget {
433        self.budget
434    }
435
436    /// Read-only access to the static grid metadata.
437    pub fn grid_meta(&self) -> &EvalGridMeta {
438        &self.grid_meta
439    }
440
441    /// Read-only access to the per-`(category, image)` IoU matrices
442    /// retained when `params.retain_iou` was set at construction. `None`
443    /// on the default no-retention path.
444    pub fn retained_ious(&self) -> Option<&crate::tables::RetainedIous> {
445        self.retained_ious.as_ref()
446    }
447
448    /// Update with a new batch of detections, parsed from
449    /// `loadRes`-shaped JSON bytes.
450    ///
451    /// # Errors
452    ///
453    /// Propagates [`EvalError`] from the parse path, the underlying
454    /// [`evaluate_with`] call, and the budget check
455    /// ([`EvalError::OutOfBudget`]). On any error the evaluator state
456    /// is unchanged and remains usable.
457    pub fn update(&mut self, json_bytes: &[u8]) -> Result<UpdateReport, EvalError> {
458        let parsed = ParsedDetections::<K>::from_json_bytes(json_bytes)?;
459        self.update_parsed(parsed)
460    }
461
    /// Update with a pre-parsed batch.
    ///
    /// Runs the unchanged batch orchestrator over just this batch's
    /// detections, then commits the resulting cells into the sparse
    /// store. All fallible checks (duplicate image_ids, budget) run
    /// before any mutation, so an error leaves state untouched.
    ///
    /// # Errors
    ///
    /// - [`EvalError::InvalidAnnotation`] if any detection's `image_id`
    ///   has already been processed in a prior `update()` call (no
    ///   silent merge — submit duplicates as a single batch instead).
    /// - [`EvalError::OutOfBudget`] if the projected post-insert total
    ///   crosses [`MemoryBudget::bytes`]. State is unchanged on error.
    /// - Any [`EvalError`] from the underlying [`evaluate_with`] call.
    pub fn update_parsed(
        &mut self,
        parsed: ParsedDetections<K>,
    ) -> Result<UpdateReport, EvalError> {
        let detections = parsed.detections;

        // Reject any detection whose image_id was already seen in a
        // prior batch. This keeps update() additive: each cell is built
        // exactly once and never mutated, which is what makes
        // finalize() bit-identical to a batch run. (Duplicates within
        // a single batch are fine — they land in one set insert.)
        let mut batch_image_ids: HashSet<i64> = HashSet::new();
        for dt in detections.detections() {
            let id = dt.image_id.0;
            if self.seen_images.contains(&id) {
                return Err(EvalError::InvalidAnnotation {
                    detail: format!(
                        "image_id={id} was already submitted in a prior update(); \
                         StreamingEvaluator does not silently merge — submit all \
                         detections for an image in a single batch"
                    ),
                });
            }
            batch_image_ids.insert(id);
        }

        // Run the unchanged batch orchestrator over just this batch's
        // detections. The grid it returns has the same `(K, A, I)`
        // shape as the streaming evaluator's target grid (we share the
        // dataset and params), but the orchestrator iterates the *full*
        // GT image set — every image with any GTs produces cells, even
        // when no detection in this batch landed on it. We filter those
        // out below: streaming semantics file a cell exactly once,
        // when its image first appears in a batch.
        let mut grid = evaluate_with(
            &self.dataset,
            &detections,
            self.params.borrow(),
            self.parity_mode,
            &self.kernel,
        )?;

        // Map batch image_ids to their I-axis indices so we can keep
        // only the cells whose image appears in this batch. Anything
        // else would (a) double-count GTs on every empty / partial
        // update and (b) trip the duplicate-image_id guard the next
        // time those images receive their own detections.
        let mut batch_image_indices: HashSet<usize> = HashSet::with_capacity(batch_image_ids.len());
        for id in &batch_image_ids {
            if let Some(&idx) = self.grid_meta.image_id_to_idx.get(&ImageId(*id)) {
                batch_image_indices.insert(idx);
            }
            // Unknown image_ids fall through silently; the underlying
            // gather treats them as "no GTs / no DTs of interest" so
            // they produce no cells anyway.
        }

        // Pre-compute insertion cost. Iterate batch images first so the
        // walk is `O(batch_size * K * A)` instead of `O(I * K * A)` —
        // critical at COCO scale where `I` is in the thousands but a
        // typical training-loop batch covers tens of images.
        let n_t = self.params.iou_thresholds.len();
        let n_k = grid.n_categories;
        let n_a = grid.n_area_ranges;
        let n_i = grid.n_images;
        let mut staged: Vec<(usize, usize, usize, PerImageEval, CellCost)> = Vec::new();
        let mut cost_total = CellCost::default();
        for &i in &batch_image_indices {
            for k in 0..n_k {
                for a in 0..n_a {
                    // Row-major `(k, a, i)` flat index into the dense grid.
                    let flat = k * n_a * n_i + a * n_i + i;
                    if let Some(cell) = grid.eval_imgs.get(flat).and_then(|opt| opt.as_deref()) {
                        let cost = cell_cost(cell, n_t);
                        cost_total = cost_total.add(cost);
                        staged.push((k, a, i, cell.clone(), cost));
                    }
                }
            }
        }

        // Budget check: do not mutate state on overflow. The breakdown
        // reports what the totals *would* have been post-insert.
        let projected = self.memory_used_bytes() + cost_total.total();
        if projected > self.budget.bytes {
            let mut breakdown: HashMap<&'static str, usize> = HashMap::new();
            breakdown.insert(
                "cells_store",
                self.bytes_cells_struct + cost_total.cells_struct,
            );
            breakdown.insert("scores", self.bytes_dt_scores + cost_total.dt_scores);
            breakdown.insert(
                "match_flags",
                self.bytes_match_flags + cost_total.match_flags,
            );
            return Err(EvalError::OutOfBudget {
                used_bytes: projected,
                budget_bytes: self.budget.bytes,
                breakdown,
            });
        }

        // All-or-nothing commit. The error paths above ran before any
        // mutation; from here, every step is infallible.
        let n_cells_inserted = staged.len();
        for (k, a, i, cell, cost) in staged {
            self.cells.insert(k, a, i, cell);
            self.bytes_cells_struct += cost.cells_struct;
            self.bytes_dt_scores += cost.dt_scores;
            self.bytes_match_flags += cost.match_flags;
        }

        // Streaming rejects duplicate image_ids earlier, so each (k, i)
        // entry is inserted at most once across all `update()` calls.
        // `remove` moves the matrix out of the per-batch grid instead
        // of cloning it.
        if let (Some(store), Some(per_batch)) =
            (self.retained_ious.as_mut(), grid.retained_ious.as_mut())
        {
            for k in 0..n_k {
                for &i in &batch_image_indices {
                    if let Some(iou) = per_batch.remove(k, i) {
                        store.insert(k, i, iou);
                    }
                }
            }
        }

        // Retain `EvalImageMeta` and detection records when `retain_iou`
        // is on — both are needed by per_detection / per_pair table
        // builders. Cells without retention skip this work entirely.
        if self.params.retain_iou {
            for &i in &batch_image_indices {
                for k in 0..n_k {
                    for a in 0..n_a {
                        let flat = k * n_a * n_i + a * n_i + i;
                        // `Option::take` moves the boxed meta out of the
                        // per-batch grid; `*b` unboxes it for storage.
                        if let Some(meta) = grid
                            .eval_imgs_meta
                            .get_mut(flat)
                            .and_then(Option::take)
                            .map(|b| *b)
                        {
                            self.meta_cells.insert((k, a, i), meta);
                        }
                    }
                }
            }
            self.dets_seen
                .extend(detections.detections().iter().cloned());
        }

        // Commit the bookkeeping counters and seen-sets.
        let n_detections_accepted = detections.detections().len();
        self.n_detections += n_detections_accepted;
        self.next_dt_id = self.next_dt_id.saturating_add(n_detections_accepted as i64);
        for id in &batch_image_ids {
            self.seen_images.insert(*id);
        }
        for idx in &batch_image_indices {
            self.seen_image_indices.insert(*idx);
        }

        // One-shot soft warn: fires on the first report at or past the
        // threshold; a fraction > 1.0 keeps the threshold unreachable.
        let total_used = self.memory_used_bytes();
        let threshold = (self.budget.bytes as f64 * self.budget.soft_warn_fraction) as usize;
        let soft_warn_triggered = total_used >= threshold && !self.soft_warn_fired;
        if soft_warn_triggered {
            self.soft_warn_fired = true;
        }

        Ok(UpdateReport {
            n_detections_accepted,
            n_images_in_batch: batch_image_ids.len(),
            n_cells_inserted,
            soft_warn_triggered,
        })
    }
642
643    /// Compute a [`Summary`] over the current store. Cheap to call
644    /// repeatedly. Bit-identical to a batch run over the union of all
645    /// detections submitted via `update()` so far (modulo stream-order
646    /// ULP wobble in `corrected` mode — see ADR-0013 §Determinism).
647    ///
648    /// Takes `&mut self` because the first call materializes a cached
649    /// GT-only `(K, A, I)` grid for images that haven't received any
650    /// detection yet; subsequent snapshots reuse the cache.
651    ///
652    /// # Errors
653    ///
654    /// Propagates [`EvalError`] from the underlying [`accumulate`] or
655    /// summarize call.
656    pub fn snapshot(&mut self) -> Result<Summary, EvalError> {
657        self.compute_summary()
658    }
659
660    /// Consume the evaluator and return its final [`Summary`].
661    ///
662    /// # Errors
663    ///
664    /// Propagates [`EvalError`] from the underlying [`accumulate`] or
665    /// summarize call.
666    pub fn finalize(mut self) -> Result<Summary, EvalError> {
667        self.compute_summary()
668    }
669
670    /// Snapshot the current state, returning both the canonical
671    /// [`Summary`] and a deep copy of the per-image cell store needed
672    /// by the ADR-0018 calibration summarizer.
673    ///
674    /// The [`Summary`] is bit-identical to what [`Self::snapshot`] would
675    /// have produced for the same evaluator state — this method only
676    /// adds the cell-store retention; the kernel maths are unchanged.
677    /// Callers that don't need calibration should prefer
678    /// [`Self::snapshot`] / [`Self::finalize`]: this variant pays for
679    /// an extra densification of the sparse `(k, a, i)` store, with
680    /// GT-only overlay applied so the cells are immediately consumable
681    /// by [`crate::calibration::summarize_calibration`] (which folds
682    /// over `dt_scores` / `dt_matched` / `dt_ignore` and ignores
683    /// GT-only slots anyway, but the consistency keeps the streaming
684    /// and batch surfaces interchangeable).
685    ///
686    /// Takes `&mut self` for the same reason as [`Self::snapshot`]: the
687    /// first call materializes the cached GT-only `(K, A, I)` grid.
688    ///
689    /// # Errors
690    ///
691    /// Propagates [`EvalError`] from the underlying [`accumulate`] or
692    /// summarize call.
693    pub fn snapshot_with_cells(&mut self) -> Result<SnapshotWithCells, EvalError> {
694        self.compute_summary_and_cells()
695    }
696
697    /// Consume the evaluator and return both its final [`Summary`] and
698    /// the requested result tables.
699    ///
700    /// v0.5 supports the *cheap* tables ([`crate::tables::TablesRequest::per_image`],
701    /// [`crate::tables::TablesRequest::per_class`]) on the streaming path —
702    /// neither needs the per-cell `EvalImageMeta` the cells store would
703    /// have to also retain. `per_detection` / `per_pair` on streaming
704    /// returns [`EvalError::NotImplemented`]; callers who need those
705    /// today should run the same workload via [`crate::evaluate_with`]
706    /// in batch mode.
707    ///
708    /// # Errors
709    ///
710    /// - [`EvalError::NotImplemented`] when `request.per_detection` or
711    ///   `request.per_pair` is set.
712    /// - Any error from the underlying [`accumulate`] / summarize /
713    ///   [`crate::tables::build_per_image`] / [`crate::tables::build_per_class`] calls.
714    pub fn finalize_with_tables(
715        mut self,
716        request: crate::tables::TablesRequest,
717        config: &crate::tables::TablesConfig,
718    ) -> Result<(Summary, crate::tables::Tables), EvalError> {
719        self.compute_summary_and_tables(request, config)
720    }
721
722    /// Mid-stream version of [`Self::finalize_with_tables`]. See
723    /// [`Self::snapshot`] for the determinism caveat.
724    ///
725    /// # Errors
726    ///
727    /// Same conditions as [`Self::finalize_with_tables`].
728    pub fn snapshot_with_tables(
729        &mut self,
730        request: crate::tables::TablesRequest,
731        config: &crate::tables::TablesConfig,
732    ) -> Result<(Summary, crate::tables::Tables), EvalError> {
733        self.compute_summary_and_tables(request, config)
734    }
735
    /// Shared engine behind [`Self::finalize_with_tables`] and
    /// [`Self::snapshot_with_tables`]: densify the streaming store into
    /// a batch-shaped synthetic grid, summarize it, then build the
    /// requested tables from the same accumulator.
    ///
    /// NOTE(review): `max_dets` passed to `build_tables` is pinned to
    /// the detection `[1, 10, 100]` ladder even for keypoints kernels
    /// (whose summary uses the `[20]` ladder inside `summarize_dense`)
    /// — confirm the table builders are meant to ignore the kp ladder.
    fn compute_summary_and_tables(
        &mut self,
        request: crate::tables::TablesRequest,
        config: &crate::tables::TablesConfig,
    ) -> Result<(Summary, crate::tables::Tables), EvalError> {
        // Fail fast before the (comparatively expensive) densification
        // below: retained IoUs can only be opted into at construction.
        if request.requires_iou_retention() && !self.params.retain_iou {
            return Err(EvalError::InvalidConfig {
                detail: "per_detection / per_pair require retain_iou=True at \
                         StreamingEvaluator construction; rebuild the evaluator \
                         with retain_iou=True to opt in"
                    .into(),
            });
        }

        // Build a synthetic EvalGrid from the cells store + GT-only
        // overlay, identical in shape to what `evaluate_with` would
        // have produced for the union of all submitted batches. The
        // dense `eval_imgs` slice fully drives `build_per_image` /
        // `build_per_class`; `eval_imgs_meta` is densified from
        // `meta_cells` for per_detection / per_pair, and
        // `retained_ious` is cloned from the streaming store. GT-only
        // overlay images leave `eval_imgs_meta[flat]` at None — they
        // contribute no detections, so per_detection / per_pair skip them.
        let eval_imgs = self.densify_with_gt_overlay()?;
        let n_k = self.grid_meta.n_categories;
        let n_a = self.grid_meta.n_area_ranges;
        let n_i = self.grid_meta.n_images;
        let total = n_k * n_a * n_i;
        let eval_imgs_meta: Vec<Option<Box<EvalImageMeta>>> = if self.params.retain_iou {
            let mut out: Vec<Option<Box<EvalImageMeta>>> = Vec::with_capacity(total);
            // Densify in (K, A, I) row-major order to match the flat
            // layout of `eval_imgs`.
            for k in 0..n_k {
                for a in 0..n_a {
                    for i in 0..n_i {
                        out.push(self.meta_cells.get(&(k, a, i)).cloned().map(Box::new));
                    }
                }
            }
            out
        } else {
            vec![None; total]
        };

        let synthetic_grid = crate::evaluate::EvalGrid {
            eval_imgs,
            eval_imgs_meta,
            n_categories: n_k,
            n_area_ranges: n_a,
            n_images: n_i,
            retained_ious: self.retained_ious.clone(),
        };

        let max_dets: [usize; 3] = [1, 10, 100];
        let (summary, accumulated) = self.summarize_dense(&synthetic_grid.eval_imgs)?;

        // Build a fresh `CocoDetections` view over every detection seen
        // so far when per_detection is requested; cheaper tables don't
        // need the records, so skip the work entirely there.
        let detections_view = if request.per_detection {
            Some(CocoDetections::from_records(self.dets_seen.clone()))
        } else {
            None
        };
        let tables = crate::tables::build_tables(
            &synthetic_grid,
            &accumulated,
            &self.dataset,
            detections_view.as_ref(),
            self.retained_ious.as_ref(),
            &self.params.iou_thresholds,
            &max_dets,
            request,
            config,
        )?;
        Ok((summary, tables))
    }
811
812    /// Serialize the current evaluator state to an opaque byte blob
813    /// (ADR-0031 partial wire format). Non-consuming variant — the
814    /// evaluator stays usable for further `update` calls.
815    ///
816    /// # Errors
817    ///
818    /// [`EvalError::PartialFormatMismatch`] if rkyv archiving fails;
819    /// [`EvalError::InvalidConfig`] if params hashing fails.
820    pub fn snapshot_to_partial(&self) -> Result<Vec<u8>, EvalError> {
821        crate::distributed::encode(&self.encode_input())
822    }
823
824    /// Consuming variant of [`Self::snapshot_to_partial`]. The
825    /// evaluator is dropped after the partial is produced — the
826    /// expected shape for the rank-local final state in a
827    /// distributed-eval gather (ADR-0031).
828    ///
829    /// # Errors
830    ///
831    /// Same as [`Self::snapshot_to_partial`].
832    pub fn finalize_to_partial(self) -> Result<Vec<u8>, EvalError> {
833        crate::distributed::encode(&self.encode_input())
834    }
835
836    fn encode_input(&self) -> crate::distributed::EncodeInput<'_, K> {
837        crate::distributed::EncodeInput {
838            dataset: &self.dataset,
839            kernel: &self.kernel,
840            params: &self.params,
841            parity_mode: self.parity_mode,
842            rank_id: self.rank_id,
843            n_categories: self.grid_meta.n_categories as u32,
844            n_area_ranges: self.grid_meta.n_area_ranges as u32,
845            n_images: self.grid_meta.n_images as u32,
846            n_detections: self.n_detections as u64,
847            next_dt_id: self.next_dt_id,
848            seen_images: &self.seen_images,
849            cells: self.cells.as_map(),
850            meta_cells: if self.params.retain_iou {
851                Some(&self.meta_cells)
852            } else {
853                None
854            },
855            retained_ious: self.retained_ious.as_ref(),
856            dets_seen: if self.params.retain_iou {
857                Some(self.dets_seen.as_slice())
858            } else {
859                None
860            },
861            retain_iou: self.params.retain_iou,
862        }
863    }
864
865    /// Construct an evaluator equivalent to a batch run over the
866    /// union of all partials' submitted detections (ADR-0031).
867    ///
868    /// All partials must share `dataset_hash`, `params_hash`,
869    /// `parity_mode`, kernel kind, `retain_iou`, and grid dimensions.
870    /// In strict mode, every partial must declare a distinct
871    /// `rank_id`. Image-id sets across partials must be disjoint.
872    ///
873    /// # Errors
874    ///
875    /// - [`EvalError::PartialFormatMismatch`] on framing or rkyv
876    ///   validation failures (magic, version, CRC, kernel kind, grid
877    ///   dims, parity, retain_iou).
878    /// - [`EvalError::PartialDatasetMismatch`] on dataset_hash
879    ///   divergence.
880    /// - [`EvalError::PartialParamsMismatch`] on params_hash
881    ///   divergence.
882    /// - [`EvalError::PartialPartitionOverlap`] when two partials
883    ///   cover the same `image_id`.
884    /// - [`EvalError::PartialRankCollision`] when two strict-mode
885    ///   partials share a `rank_id`.
886    pub fn from_partials(
887        dataset: CocoDataset,
888        kernel: K,
889        params: OwnedEvaluateParams,
890        parity_mode: ParityMode,
891        budget: MemoryBudget,
892        partials: &[&[u8]],
893    ) -> Result<Self, EvalError> {
894        let mut ev = Self::new(dataset, kernel, params, parity_mode, budget)?;
895        let expected = crate::distributed::instance_expectation(
896            &ev.dataset,
897            &ev.kernel,
898            &ev.params,
899            parity_mode,
900            ev.grid_meta.n_categories as u32,
901            ev.grid_meta.n_area_ranges as u32,
902            ev.grid_meta.n_images as u32,
903        )?;
904        let mut acc =
905            crate::distributed::InstanceMergeAccumulator::new(parity_mode == ParityMode::Strict);
906        acc.set_retain_iou(ev.params.retain_iou);
907        for bytes in partials {
908            vernier_partial::with_validated_envelope(bytes, &expected, |view| acc.ingest(&view))?;
909        }
910        ev.install_merged_state(acc)?;
911        Ok(ev)
912    }
913
914    /// Swap a freshly merged [`crate::distributed::InstanceMergeAccumulator`]
915    /// into this evaluator's spine state. Internal helper for
916    /// [`Self::from_partials`].
917    fn install_merged_state(
918        &mut self,
919        acc: crate::distributed::InstanceMergeAccumulator,
920    ) -> Result<(), EvalError> {
921        // Bind every load-bearing field by name (`base`, `retain_iou`
922        // are merge-internal bookkeeping that doesn't carry into the
923        // spine — destructure asserts a future field addition is a
924        // compile error rather than silent loss).
925        let crate::distributed::InstanceMergeAccumulator {
926            base,
927            n_detections,
928            next_dt_id,
929            cells,
930            meta_cells,
931            retained_ious_map,
932            dets_seen,
933            retain_iou: _,
934        } = acc;
935        self.n_detections = n_detections;
936        self.next_dt_id = next_dt_id;
937        // base.image_ids() is the union image-id set;
938        // seen_image_indices is the parallel local-index set under
939        // the live grid_meta.
940        self.seen_image_indices = base
941            .image_ids()
942            .filter_map(|id| self.grid_meta.image_id_to_idx.get(&ImageId(id)).copied())
943            .collect();
944        self.seen_images = base.image_ids().collect();
945        self.cells = PerImageEvalStore::from_map(cells);
946        self.meta_cells = meta_cells;
947        if self.params.retain_iou {
948            self.retained_ious = Some(crate::tables::RetainedIous::from_map(retained_ious_map));
949        }
950        self.dets_seen = dets_seen;
951        Ok(())
952    }
953
954    /// Compute the GT-only `(K, A, I)` grid once and cache it. Subsequent
955    /// snapshots reuse the cached grid; GT is immutable, so the
956    /// underlying `evaluate_with` result never goes stale.
957    fn ensure_gt_only_cells(&mut self) -> Result<(), EvalError> {
958        if self.gt_only_cells.is_some() {
959            return Ok(());
960        }
961        let empty_dt = CocoDetections::from_inputs(Vec::new())?;
962        let grid = evaluate_with(
963            &self.dataset,
964            &empty_dt,
965            self.params.borrow(),
966            self.parity_mode,
967            &self.kernel,
968        )?;
969        self.gt_only_cells = Some(grid.eval_imgs);
970        Ok(())
971    }
972
973    fn compute_summary_and_cells(&mut self) -> Result<SnapshotWithCells, EvalError> {
974        let eval_imgs = self.densify_with_gt_overlay()?;
975        let (summary, _accumulated) = self.summarize_dense(&eval_imgs)?;
976        Ok(SnapshotWithCells {
977            summary,
978            eval_imgs,
979            n_categories: self.grid_meta.n_categories,
980            n_area_ranges: self.grid_meta.n_area_ranges,
981            iou_thresholds: self.params.iou_thresholds.clone(),
982            parity_mode: self.parity_mode,
983        })
984    }
985
986    fn compute_summary(&mut self) -> Result<Summary, EvalError> {
987        let eval_imgs = self.densify_with_gt_overlay()?;
988        let (summary, _accumulated) = self.summarize_dense(&eval_imgs)?;
989        Ok(summary)
990    }
991
992    /// Densify the per-image cell store with the GT-only overlay for
993    /// images that never received a detection across the stream.
994    /// Without this overlay, streaming `finalize().stats` diverges
995    /// from `Evaluator.evaluate(...).stats` whenever GT contains
996    /// images with no DTs anywhere — see ADR-0013 §"Per-image cell
997    /// coverage". Mutates `self` only to populate the lazy
998    /// `gt_only_cells` cache on first call.
999    fn densify_with_gt_overlay(&mut self) -> Result<Vec<Option<Box<PerImageEval>>>, EvalError> {
1000        let mut eval_imgs = self.cells.flatten(&self.grid_meta);
1001        if self.images_seen() >= self.grid_meta.n_images {
1002            return Ok(eval_imgs);
1003        }
1004        self.ensure_gt_only_cells()?;
1005        let n_k = self.grid_meta.n_categories;
1006        let n_a = self.grid_meta.n_area_ranges;
1007        let n_i = self.grid_meta.n_images;
1008        let gt_only = self
1009            .gt_only_cells
1010            .as_ref()
1011            .ok_or_else(|| EvalError::InvalidConfig {
1012                detail: "gt_only_cells cache missing after init".into(),
1013            })?;
1014        for i in 0..n_i {
1015            if self.seen_image_indices.contains(&i) {
1016                continue;
1017            }
1018            for k in 0..n_k {
1019                for a in 0..n_a {
1020                    let flat = k * n_a * n_i + a * n_i + i;
1021                    if let Some(cell) = gt_only.get(flat).and_then(|opt| opt.as_ref()) {
1022                        eval_imgs[flat] = Some(cell.clone());
1023                    }
1024                }
1025            }
1026        }
1027        Ok(eval_imgs)
1028    }
1029
1030    /// Accumulate-and-summarize over a pre-densified `eval_imgs` slice
1031    /// at the parity-pinned COCO `max_dets=[1,10,100]` ladder, with the
1032    /// ADR-0012 keypoints fork (single-rung `[20]` ladder + 10-stat
1033    /// plan). Returns `(Summary, Accumulated)` so callers that need the
1034    /// intermediate accumulator (table builders) can reuse it without
1035    /// re-running. The returned `Accumulated` is always the default-
1036    /// ladder one — the keypoints kp-ladder accumulator is internal.
1037    fn summarize_dense(
1038        &self,
1039        eval_imgs: &[Option<Box<PerImageEval>>],
1040    ) -> Result<(Summary, Accumulated), EvalError> {
1041        let max_dets: [usize; 3] = [1, 10, 100];
1042        let accum_params = AccumulateParams {
1043            iou_thresholds: &self.params.iou_thresholds,
1044            recall_thresholds: recall_thresholds(),
1045            max_dets: &max_dets,
1046            n_categories: self.grid_meta.n_categories,
1047            n_area_ranges: self.grid_meta.n_area_ranges,
1048            n_images: self.grid_meta.n_images,
1049        };
1050        let accumulated = accumulate(eval_imgs, accum_params, self.parity_mode)?;
1051        let summary = if self.kernel.is_keypoints() {
1052            let kp_max_dets: [usize; 1] = [20];
1053            let accum_params_kp = AccumulateParams {
1054                iou_thresholds: &self.params.iou_thresholds,
1055                recall_thresholds: recall_thresholds(),
1056                max_dets: &kp_max_dets,
1057                n_categories: self.grid_meta.n_categories,
1058                n_area_ranges: self.grid_meta.n_area_ranges,
1059                n_images: self.grid_meta.n_images,
1060            };
1061            let accumulated_kp = accumulate(eval_imgs, accum_params_kp, self.parity_mode)?;
1062            let plan = StatRequest::coco_keypoints_default();
1063            summarize_with(
1064                &accumulated_kp,
1065                &plan,
1066                &self.params.iou_thresholds,
1067                &kp_max_dets,
1068            )?
1069        } else {
1070            summarize_detection(&accumulated, &self.params.iou_thresholds, &max_dets)?
1071        };
1072        Ok((summary, accumulated))
1073    }
1074}
1075
/// Per-cell memory cost breakdown, mirroring the accounting buckets
/// reported in the `OutOfBudget` error.
#[derive(Debug, Default, Clone, Copy)]
struct CellCost {
    /// Fixed size of the `PerImageEval` struct shell itself.
    cells_struct: usize,
    /// Bytes backing the detection-score buffer (capacity-based).
    dt_scores: usize,
    /// Bytes for the two `(T, D)` boolean arrays (`dt_matched`,
    /// `dt_ignore`).
    match_flags: usize,
}
1083
1084impl CellCost {
1085    fn total(self) -> usize {
1086        self.cells_struct + self.dt_scores + self.match_flags
1087    }
1088    fn add(self, other: Self) -> Self {
1089        Self {
1090            cells_struct: self.cells_struct + other.cells_struct,
1091            dt_scores: self.dt_scores + other.dt_scores,
1092            match_flags: self.match_flags + other.match_flags,
1093        }
1094    }
1095}
1096
1097/// Compute the memory cost of a single [`PerImageEval`] under the
1098/// budget accounting policy (`bytes_cells_struct + bytes_dt_scores +
1099/// bytes_match_flags`).
1100fn cell_cost(cell: &PerImageEval, n_iou_thresholds: usize) -> CellCost {
1101    let n_d = cell.dt_scores.len();
1102    CellCost {
1103        cells_struct: size_of::<PerImageEval>(),
1104        dt_scores: cell.dt_scores.capacity() * size_of::<f64>(),
1105        // dt_matched + dt_ignore are both `(T, D)` Bool arrays.
1106        match_flags: n_iou_thresholds
1107            .saturating_mul(n_d)
1108            .saturating_mul(size_of::<bool>())
1109            .saturating_mul(2),
1110    }
1111}
1112
1113/// Build [`EvalGridMeta`] from the dataset and params, mirroring the
1114/// axis layout the batch [`crate::evaluate_with`] orchestrator emits.
1115fn build_grid_meta(dataset: &CocoDataset, params: &OwnedEvaluateParams) -> EvalGridMeta {
1116    let n_area_ranges = params.area_ranges.len();
1117    let n_images = dataset.images().len();
1118
1119    // Image ids: same id-ascending sort the batch orchestrator uses, so
1120    // I-axis indices match between the two paths.
1121    let mut image_ids: Vec<ImageId> = dataset.images().iter().map(|im| im.id).collect();
1122    image_ids.sort_unstable_by_key(|id| id.0);
1123    let mut image_id_to_idx: HashMap<ImageId, usize> = HashMap::with_capacity(n_images);
1124    for (i, id) in image_ids.into_iter().enumerate() {
1125        image_id_to_idx.insert(id, i);
1126    }
1127
1128    let (n_categories, category_id_to_idx) = if params.use_cats {
1129        let mut cat_ids: Vec<CategoryId> = dataset.categories().iter().map(|c| c.id).collect();
1130        cat_ids.sort_unstable_by_key(|c| c.0);
1131        let mut map: HashMap<CategoryId, usize> = HashMap::with_capacity(cat_ids.len());
1132        for (k, id) in cat_ids.iter().enumerate() {
1133            map.insert(*id, k);
1134        }
1135        (cat_ids.len(), map)
1136    } else {
1137        (1, HashMap::new())
1138    };
1139
1140    EvalGridMeta {
1141        n_categories,
1142        n_area_ranges,
1143        n_images,
1144        category_id_to_idx,
1145        image_id_to_idx,
1146    }
1147}
1148
1149// Silence the "imported but unused" warning on `CocoDetection` /
1150// `DetectionInput` / `AnnId` — these are part of the documented surface
1151// but unused by the compiled body. They live in the `use` block to
1152// keep the module's public-API touchpoints visible at a glance.
1153#[allow(dead_code)]
1154fn _docs_typecheck(_a: AnnId, _b: CocoDetection, _c: DetectionInput) {}
1155
1156#[cfg(test)]
1157mod tests {
1158    use super::*;
1159    use crate::dataset::{Bbox, CategoryMeta, CocoAnnotation, ImageMeta};
1160    use crate::evaluate::AreaRange;
1161    use crate::parity::iou_thresholds;
1162    use crate::similarity::BboxIou;
1163
1164    fn img(id: i64, w: u32, h: u32) -> ImageMeta {
1165        ImageMeta {
1166            id: ImageId(id),
1167            width: w,
1168            height: h,
1169            file_name: None,
1170        }
1171    }
1172
1173    fn cat(id: i64, name: &str) -> CategoryMeta {
1174        CategoryMeta {
1175            id: CategoryId(id),
1176            name: name.into(),
1177            supercategory: None,
1178        }
1179    }
1180
1181    fn ann(id: i64, image: i64, cat: i64, bbox: (f64, f64, f64, f64)) -> CocoAnnotation {
1182        CocoAnnotation {
1183            id: AnnId(id),
1184            image_id: ImageId(image),
1185            category_id: CategoryId(cat),
1186            area: bbox.2 * bbox.3,
1187            is_crowd: false,
1188            ignore_flag: None,
1189            bbox: Bbox {
1190                x: bbox.0,
1191                y: bbox.1,
1192                w: bbox.2,
1193                h: bbox.3,
1194            },
1195            segmentation: None,
1196            keypoints: None,
1197            num_keypoints: None,
1198        }
1199    }
1200
1201    fn tiny_dataset() -> CocoDataset {
1202        let images = vec![img(1, 100, 100), img(2, 100, 100)];
1203        let cats = vec![cat(1, "thing")];
1204        let anns = vec![
1205            ann(1, 1, 1, (0.0, 0.0, 10.0, 10.0)),
1206            ann(2, 2, 1, (50.0, 50.0, 10.0, 10.0)),
1207        ];
1208        CocoDataset::from_parts(images, anns, cats).unwrap()
1209    }
1210
1211    fn default_params() -> OwnedEvaluateParams {
1212        OwnedEvaluateParams {
1213            iou_thresholds: iou_thresholds().to_vec(),
1214            area_ranges: AreaRange::coco_default().to_vec(),
1215            max_dets_per_image: 100,
1216            use_cats: true,
1217            retain_iou: false,
1218        }
1219    }
1220
1221    #[test]
1222    fn auto_default_budget_is_nonzero() {
1223        let b = MemoryBudget::auto_default();
1224        assert!(b.bytes > 0);
1225        assert!((b.soft_warn_fraction - DEFAULT_SOFT_WARN_FRACTION).abs() < 1e-12);
1226    }
1227
1228    #[test]
1229    fn fresh_evaluator_reports_zero_counters() {
1230        let ds = tiny_dataset();
1231        let ev = StreamingEvaluator::new(
1232            ds,
1233            BboxIou,
1234            default_params(),
1235            ParityMode::Strict,
1236            MemoryBudget::auto_default(),
1237        )
1238        .unwrap();
1239        assert_eq!(ev.images_seen(), 0);
1240        assert_eq!(ev.detections_seen(), 0);
1241        assert_eq!(ev.memory_used_bytes(), 0);
1242        // 2 images in the dataset, 0 seen → 2 pending.
1243        assert_eq!(ev.images_pending(), 2);
1244        // K=1 cat, A=4 ranges, I=2 images.
1245        assert_eq!(ev.grid_meta().n_categories, 1);
1246        assert_eq!(ev.grid_meta().n_area_ranges, 4);
1247        assert_eq!(ev.grid_meta().n_images, 2);
1248    }
1249
1250    #[test]
1251    fn empty_update_returns_zero_counters() {
1252        let ds = tiny_dataset();
1253        let mut ev = StreamingEvaluator::new(
1254            ds,
1255            BboxIou,
1256            default_params(),
1257            ParityMode::Strict,
1258            MemoryBudget::auto_default(),
1259        )
1260        .unwrap();
1261        let report = ev.update(b"[]").unwrap();
1262        assert_eq!(report.n_detections_accepted, 0);
1263        assert_eq!(report.n_images_in_batch, 0);
1264        assert_eq!(report.n_cells_inserted, 0);
1265        assert!(!report.soft_warn_triggered);
1266        assert_eq!(ev.detections_seen(), 0);
1267        assert_eq!(ev.images_seen(), 0);
1268        assert_eq!(ev.memory_used_bytes(), 0);
1269    }
1270
1271    #[test]
1272    fn finalize_returns_summary_with_canonical_shape() {
1273        // 12 stats for a detection kernel; we don't pin values here —
1274        // the parity tests do that. Smoke check only.
1275        let ds = tiny_dataset();
1276        let ev = StreamingEvaluator::new(
1277            ds,
1278            BboxIou,
1279            default_params(),
1280            ParityMode::Strict,
1281            MemoryBudget::auto_default(),
1282        )
1283        .unwrap();
1284        let summary = ev.finalize().unwrap();
1285        assert_eq!(summary.lines.len(), 12);
1286    }
1287
1288    #[test]
1289    fn snapshot_with_cells_matches_snapshot_summary() {
1290        // ADR-0018 Unit 6: the cell-retention snapshot must produce a
1291        // bit-identical Summary to the canonical `snapshot()` path —
1292        // calibration adds the cells alongside, never mutates the
1293        // canonical kernel maths. Drive both paths from the same
1294        // evaluator state and compare stats[] element-wise.
1295        let ds = tiny_dataset();
1296        let mut ev = StreamingEvaluator::new(
1297            ds,
1298            BboxIou,
1299            default_params(),
1300            ParityMode::Strict,
1301            MemoryBudget::auto_default(),
1302        )
1303        .unwrap();
1304        let batch = br#"[{"image_id": 1, "category_id": 1, "score": 0.9, "bbox": [0, 0, 10, 10]}]"#;
1305        ev.update(batch).unwrap();
1306
1307        let canonical = ev.snapshot().unwrap();
1308        let bundle = ev.snapshot_with_cells().unwrap();
1309        // Bit-equal — these come from the same evaluator state with no
1310        // intermediate mutation; the summary kernel is deterministic.
1311        assert_eq!(bundle.summary.lines.len(), canonical.lines.len());
1312        for (a, b) in canonical.lines.iter().zip(bundle.summary.lines.iter()) {
1313            // `StatLine` is just floats; bit-equal under f64 ==.
1314            assert_eq!(a.value.to_bits(), b.value.to_bits());
1315        }
1316        // Axes + parity mirrored — these are the values the FFI handle
1317        // (`EvalCells`) needs to hand off to the calibration kernel.
1318        assert_eq!(bundle.n_categories, ev.grid_meta().n_categories);
1319        assert_eq!(bundle.n_area_ranges, ev.grid_meta().n_area_ranges);
1320        assert_eq!(bundle.iou_thresholds, ev.params.iou_thresholds);
1321        assert!(matches!(bundle.parity_mode, ParityMode::Strict));
1322        // Cell store densified to K * A * I — same shape `accumulate`
1323        // and `summarize_calibration` consume.
1324        let expected_len =
1325            ev.grid_meta().n_categories * ev.grid_meta().n_area_ranges * ev.grid_meta().n_images;
1326        assert_eq!(bundle.eval_imgs.len(), expected_len);
1327        // At least one populated cell from the single submitted batch.
1328        assert!(bundle.eval_imgs.iter().any(|c| c.is_some()));
1329    }
1330
1331    #[test]
1332    fn duplicate_image_id_across_updates_is_rejected() {
1333        let ds = tiny_dataset();
1334        let mut ev = StreamingEvaluator::new(
1335            ds,
1336            BboxIou,
1337            default_params(),
1338            ParityMode::Strict,
1339            MemoryBudget::auto_default(),
1340        )
1341        .unwrap();
1342        // First batch: a single DT on image 1.
1343        let batch1 =
1344            br#"[{"image_id": 1, "category_id": 1, "score": 0.9, "bbox": [0, 0, 10, 10]}]"#;
1345        ev.update(batch1).unwrap();
1346        assert_eq!(ev.images_seen(), 1);
1347
1348        // Second batch: another DT on the same image — must be rejected.
1349        let batch2 =
1350            br#"[{"image_id": 1, "category_id": 1, "score": 0.8, "bbox": [50, 50, 10, 10]}]"#;
1351        let err = ev.update(batch2).unwrap_err();
1352        assert!(matches!(err, EvalError::InvalidAnnotation { .. }));
1353        // State unchanged: still one image seen, original counters intact.
1354        assert_eq!(ev.images_seen(), 1);
1355        assert_eq!(ev.detections_seen(), 1);
1356    }
1357
1358    #[test]
1359    fn out_of_budget_does_not_mutate_state() {
1360        let ds = tiny_dataset();
1361        let tiny_budget = MemoryBudget {
1362            bytes: 1, // pathologically small — first cell will overflow
1363            soft_warn_fraction: 0.80,
1364        };
1365        let mut ev = StreamingEvaluator::new(
1366            ds,
1367            BboxIou,
1368            default_params(),
1369            ParityMode::Strict,
1370            tiny_budget,
1371        )
1372        .unwrap();
1373        let batch = br#"[{"image_id": 1, "category_id": 1, "score": 0.9, "bbox": [0, 0, 10, 10]}]"#;
1374        let err = ev.update(batch).unwrap_err();
1375        match err {
1376            EvalError::OutOfBudget {
1377                used_bytes,
1378                budget_bytes,
1379                breakdown,
1380            } => {
1381                assert!(used_bytes > budget_bytes);
1382                assert_eq!(budget_bytes, 1);
1383                assert!(breakdown.contains_key("cells_store"));
1384                assert!(breakdown.contains_key("scores"));
1385                assert!(breakdown.contains_key("match_flags"));
1386            }
1387            other => panic!("expected OutOfBudget, got {other:?}"),
1388        }
1389        // State unchanged.
1390        assert_eq!(ev.images_seen(), 0);
1391        assert_eq!(ev.detections_seen(), 0);
1392        assert_eq!(ev.memory_used_bytes(), 0);
1393    }
1394
1395    fn dt_json(image_id: i64, score: f64, bbox: (f64, f64, f64, f64)) -> Vec<u8> {
1396        let body = format!(
1397            r#"[{{"image_id":{image_id},"category_id":1,"score":{score},"bbox":[{},{},{},{}]}}]"#,
1398            bbox.0, bbox.1, bbox.2, bbox.3
1399        );
1400        body.into_bytes()
1401    }
1402
1403    #[test]
1404    fn from_partials_two_disjoint_partitions_equals_combined_stream() {
1405        // Rank 0 sees image 1; rank 1 sees image 2. Merge should produce
1406        // the same summary as a single evaluator that ate both images.
1407        let ds = tiny_dataset();
1408        let mut combined = StreamingEvaluator::new(
1409            ds.clone(),
1410            BboxIou,
1411            default_params(),
1412            ParityMode::Corrected,
1413            MemoryBudget::auto_default(),
1414        )
1415        .unwrap();
1416        combined
1417            .update(&dt_json(1, 0.9, (0.0, 0.0, 10.0, 10.0)))
1418            .unwrap();
1419        combined
1420            .update(&dt_json(2, 0.8, (50.0, 50.0, 10.0, 10.0)))
1421            .unwrap();
1422        let combined_summary = combined.finalize().unwrap();
1423
1424        let mut rank0 = StreamingEvaluator::new(
1425            ds.clone(),
1426            BboxIou,
1427            default_params(),
1428            ParityMode::Corrected,
1429            MemoryBudget::auto_default(),
1430        )
1431        .unwrap()
1432        .with_rank(0)
1433        .unwrap();
1434        rank0
1435            .update(&dt_json(1, 0.9, (0.0, 0.0, 10.0, 10.0)))
1436            .unwrap();
1437        let p0 = rank0.finalize_to_partial().unwrap();
1438
1439        let mut rank1 = StreamingEvaluator::new(
1440            ds.clone(),
1441            BboxIou,
1442            default_params(),
1443            ParityMode::Corrected,
1444            MemoryBudget::auto_default(),
1445        )
1446        .unwrap()
1447        .with_rank(1)
1448        .unwrap();
1449        rank1
1450            .update(&dt_json(2, 0.8, (50.0, 50.0, 10.0, 10.0)))
1451            .unwrap();
1452        let p1 = rank1.finalize_to_partial().unwrap();
1453
1454        let merged = StreamingEvaluator::<BboxIou>::from_partials(
1455            ds,
1456            BboxIou,
1457            default_params(),
1458            ParityMode::Corrected,
1459            MemoryBudget::auto_default(),
1460            &[&p0, &p1],
1461        )
1462        .unwrap();
1463        let merged_summary = merged.finalize().unwrap();
1464
1465        assert_eq!(combined_summary.stats(), merged_summary.stats());
1466    }
1467
1468    #[test]
1469    fn from_partials_overlap_returns_partition_overlap_error() {
1470        let ds = tiny_dataset();
1471        let mut a = StreamingEvaluator::new(
1472            ds.clone(),
1473            BboxIou,
1474            default_params(),
1475            ParityMode::Corrected,
1476            MemoryBudget::auto_default(),
1477        )
1478        .unwrap()
1479        .with_rank(0)
1480        .unwrap();
1481        a.update(&dt_json(1, 0.9, (0.0, 0.0, 10.0, 10.0))).unwrap();
1482        let pa = a.finalize_to_partial().unwrap();
1483
1484        // Both partials cover image 1 — the partition rule must reject.
1485        let mut b = StreamingEvaluator::new(
1486            ds.clone(),
1487            BboxIou,
1488            default_params(),
1489            ParityMode::Corrected,
1490            MemoryBudget::auto_default(),
1491        )
1492        .unwrap()
1493        .with_rank(1)
1494        .unwrap();
1495        b.update(&dt_json(1, 0.7, (5.0, 5.0, 10.0, 10.0))).unwrap();
1496        let pb = b.finalize_to_partial().unwrap();
1497
1498        let err = StreamingEvaluator::<BboxIou>::from_partials(
1499            ds,
1500            BboxIou,
1501            default_params(),
1502            ParityMode::Corrected,
1503            MemoryBudget::auto_default(),
1504            &[&pa, &pb],
1505        )
1506        .unwrap_err();
1507        assert!(matches!(
1508            err,
1509            EvalError::PartialPartitionOverlap {
1510                rank_a: 0,
1511                rank_b: 1,
1512                image_id: 1,
1513            }
1514        ));
1515    }
1516
1517    #[test]
1518    fn from_partials_strict_mode_rank_collision_rejected() {
1519        let ds = tiny_dataset();
1520        let mut a = StreamingEvaluator::new(
1521            ds.clone(),
1522            BboxIou,
1523            default_params(),
1524            ParityMode::Strict,
1525            MemoryBudget::auto_default(),
1526        )
1527        .unwrap()
1528        .with_rank(7)
1529        .unwrap();
1530        a.update(&dt_json(1, 0.9, (0.0, 0.0, 10.0, 10.0))).unwrap();
1531        let pa = a.finalize_to_partial().unwrap();
1532
1533        let mut b = StreamingEvaluator::new(
1534            ds.clone(),
1535            BboxIou,
1536            default_params(),
1537            ParityMode::Strict,
1538            MemoryBudget::auto_default(),
1539        )
1540        .unwrap()
1541        .with_rank(7)
1542        .unwrap();
1543        b.update(&dt_json(2, 0.8, (50.0, 50.0, 10.0, 10.0)))
1544            .unwrap();
1545        let pb = b.finalize_to_partial().unwrap();
1546
1547        let err = StreamingEvaluator::<BboxIou>::from_partials(
1548            ds,
1549            BboxIou,
1550            default_params(),
1551            ParityMode::Strict,
1552            MemoryBudget::auto_default(),
1553            &[&pa, &pb],
1554        )
1555        .unwrap_err();
1556        assert!(matches!(
1557            err,
1558            EvalError::PartialRankCollision { rank_id: 7 }
1559        ));
1560    }
1561
1562    #[test]
1563    fn from_partials_dataset_hash_mismatch_rejected() {
1564        let ds_a = tiny_dataset();
1565        // ds_b shifts the bbox of one annotation by 1 px → different hash.
1566        let images = vec![img(1, 100, 100), img(2, 100, 100)];
1567        let cats = vec![cat(1, "thing")];
1568        let anns = vec![
1569            ann(1, 1, 1, (1.0, 0.0, 10.0, 10.0)), // shifted from (0,0,10,10)
1570            ann(2, 2, 1, (50.0, 50.0, 10.0, 10.0)),
1571        ];
1572        let ds_b = CocoDataset::from_parts(images, anns, cats).unwrap();
1573        assert_ne!(ds_a.dataset_hash(), ds_b.dataset_hash());
1574
1575        let mut ev = StreamingEvaluator::new(
1576            ds_a,
1577            BboxIou,
1578            default_params(),
1579            ParityMode::Corrected,
1580            MemoryBudget::auto_default(),
1581        )
1582        .unwrap();
1583        ev.update(&dt_json(1, 0.9, (0.0, 0.0, 10.0, 10.0))).unwrap();
1584        let blob = ev.finalize_to_partial().unwrap();
1585
1586        let err = StreamingEvaluator::<BboxIou>::from_partials(
1587            ds_b,
1588            BboxIou,
1589            default_params(),
1590            ParityMode::Corrected,
1591            MemoryBudget::auto_default(),
1592            &[&blob],
1593        )
1594        .unwrap_err();
1595        assert!(matches!(err, EvalError::PartialDatasetMismatch { .. }));
1596    }
1597
1598    #[test]
1599    fn from_partials_params_hash_mismatch_rejected() {
1600        let ds = tiny_dataset();
1601        let mut ev = StreamingEvaluator::new(
1602            ds.clone(),
1603            BboxIou,
1604            default_params(),
1605            ParityMode::Corrected,
1606            MemoryBudget::auto_default(),
1607        )
1608        .unwrap();
1609        ev.update(&dt_json(1, 0.9, (0.0, 0.0, 10.0, 10.0))).unwrap();
1610        let blob = ev.finalize_to_partial().unwrap();
1611
1612        let mut other_params = default_params();
1613        other_params.max_dets_per_image = 50; // diverges from the default 100.
1614
1615        let err = StreamingEvaluator::<BboxIou>::from_partials(
1616            ds,
1617            BboxIou,
1618            other_params,
1619            ParityMode::Corrected,
1620            MemoryBudget::auto_default(),
1621            &[&blob],
1622        )
1623        .unwrap_err();
1624        assert!(matches!(err, EvalError::PartialParamsMismatch { .. }));
1625    }
1626
1627    #[test]
1628    fn with_rank_after_update_is_rejected() {
1629        let ds = tiny_dataset();
1630        let mut ev = StreamingEvaluator::new(
1631            ds,
1632            BboxIou,
1633            default_params(),
1634            ParityMode::Corrected,
1635            MemoryBudget::auto_default(),
1636        )
1637        .unwrap();
1638        ev.update(&dt_json(1, 0.9, (0.0, 0.0, 10.0, 10.0))).unwrap();
1639        let err = ev.with_rank(0).unwrap_err();
1640        assert!(matches!(err, EvalError::InvalidConfig { .. }));
1641    }
1642
1643    #[test]
1644    fn corrupted_partial_returns_format_mismatch() {
1645        let ds = tiny_dataset();
1646        let mut ev = StreamingEvaluator::new(
1647            ds.clone(),
1648            BboxIou,
1649            default_params(),
1650            ParityMode::Corrected,
1651            MemoryBudget::auto_default(),
1652        )
1653        .unwrap();
1654        ev.update(&dt_json(1, 0.9, (0.0, 0.0, 10.0, 10.0))).unwrap();
1655        let mut blob = ev.finalize_to_partial().unwrap();
1656        // Corrupt the magic bytes — earliest validation step.
1657        blob[0] = b'X';
1658
1659        let err = StreamingEvaluator::<BboxIou>::from_partials(
1660            ds,
1661            BboxIou,
1662            default_params(),
1663            ParityMode::Corrected,
1664            MemoryBudget::auto_default(),
1665            &[&blob],
1666        )
1667        .unwrap_err();
1668        assert!(matches!(
1669            err,
1670            EvalError::PartialFormatMismatch {
1671                kind: crate::error::PartialFormatErrorKind::WrongMagic { .. }
1672            }
1673        ));
1674    }
1675
1676    #[test]
1677    fn flatten_round_trips_to_dense_layout() {
1678        let mut store = PerImageEvalStore::new();
1679        // Insert one cell at (0, 0, 0) of a 1x1x2 grid.
1680        let cell = PerImageEval {
1681            dt_scores: vec![0.5],
1682            dt_matched: ndarray::Array2::default((1, 1)),
1683            dt_ignore: ndarray::Array2::default((1, 1)),
1684            gt_ignore: vec![false],
1685        };
1686        store.insert(0, 0, 0, cell);
1687        let meta = EvalGridMeta {
1688            n_categories: 1,
1689            n_area_ranges: 1,
1690            n_images: 2,
1691            category_id_to_idx: HashMap::new(),
1692            image_id_to_idx: HashMap::new(),
1693        };
1694        let dense = store.flatten(&meta);
1695        assert_eq!(dense.len(), 2);
1696        assert!(dense[0].is_some());
1697        assert!(dense[1].is_none());
1698    }
1699}