//! Return shape for subset-pipeline stages.
//!
//! Every optional stage (interval aggregation, group_by, …) is written
//! as a "total function" — it accepts an `Option<&Params>` and returns
//! a [`StageOutcome`]. When params are `None`, the stage no-ops and
//! returns [`StageOutcome::passthrough`].
//!
//! The caller merges each stage's outcome into its accumulators with
//! `.extend()` calls, so the main orchestration flow contains no
//! `if let Some` branches per stage. The pattern is:
//!
//! ```ignore
//! let outcome = interval::apply_interval(data, request.report_interval.as_ref(), …)?;
//! data = outcome.data;
//! interval_stats.extend(outcome.stats);
//! stage_trace.extend(outcome.diags);
//!
//! let outcome = group::apply_group_by(data, request.group_by.as_ref(), …)?;
//! data = outcome.data;
//! group_stats.extend(outcome.stats);
//! stage_trace.extend(outcome.diags);
//! ```
//!
//! `extend` with an empty collection is a no-op, so skipped stages cost
//! nothing at the call site. The whole pipeline reads as a linear
//! sequence of reductions.
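//!
//! A minimal, self-contained sketch of the pattern, assuming simplified
//! types. It is not the real implementation: `Frame` (`Vec<i64>`) and
//! `Diag` (`String`) stand in for `DataFrame` and `StageDiag`, and the
//! `EveryNth` stage, `apply_every_nth`, and `run` are hypothetical names
//! invented for illustration.
//!
//! ```rust
//! // Hypothetical stand-ins for the real DataFrame / StageDiag types.
//! type Frame = Vec<i64>;
//! type Diag = String;
//!
//! // Simplified mirror of `StageOutcome<S>`.
//! struct StageOutcome<S> {
//!     data: Frame,
//!     stats: Vec<S>,
//!     diags: Vec<Diag>,
//! }
//!
//! impl<S> StageOutcome<S> {
//!     // No-op result: input unchanged, no stats, no trace entries.
//!     fn passthrough(data: Frame) -> Self {
//!         StageOutcome { data, stats: Vec::new(), diags: Vec::new() }
//!     }
//! }
//!
//! // Hypothetical stage params: keep only values divisible by `step`.
//! struct EveryNth {
//!     step: i64,
//! }
//!
//! // A "total" stage: `None` params short-circuit to `passthrough`.
//! fn apply_every_nth(
//!     data: Frame,
//!     params: Option<&EveryNth>,
//! ) -> Result<StageOutcome<usize>, String> {
//!     let Some(p) = params else {
//!         return Ok(StageOutcome::passthrough(data));
//!     };
//!     if p.step <= 0 {
//!         return Err(format!("step must be positive, got {}", p.step));
//!     }
//!     let before = data.len();
//!     let kept: Frame = data.into_iter().filter(|v| v % p.step == 0).collect();
//!     let dropped = before - kept.len();
//!     Ok(StageOutcome {
//!         stats: vec![dropped],
//!         diags: vec![format!("every_nth(step={}): dropped {dropped} rows", p.step)],
//!         data: kept,
//!     })
//! }
//!
//! // Orchestration: a linear sequence with no per-stage `if let Some`.
//! fn run(
//!     mut data: Frame,
//!     first: Option<&EveryNth>,
//!     second: Option<&EveryNth>,
//! ) -> Result<(Frame, Vec<usize>, Vec<usize>, Vec<Diag>), String> {
//!     let mut first_stats = Vec::new();
//!     let mut second_stats = Vec::new();
//!     let mut stage_trace = Vec::new();
//!
//!     let outcome = apply_every_nth(data, first)?;
//!     data = outcome.data;
//!     first_stats.extend(outcome.stats);
//!     stage_trace.extend(outcome.diags);
//!
//!     let outcome = apply_every_nth(data, second)?;
//!     data = outcome.data;
//!     second_stats.extend(outcome.stats);
//!     stage_trace.extend(outcome.diags);
//!
//!     Ok((data, first_stats, second_stats, stage_trace))
//! }
//! ```
//!
//! Because `passthrough` returns empty `Vec`s, the `extend` calls for a
//! skipped stage leave the accumulators untouched, and `?` propagates a
//! stage error exactly as in the snippet above.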
use DataFrame;
use StageDiag;
/// The return shape for a subset-pipeline stage.
///
/// - `data` — the transformed DataFrame (or the input unchanged if the
/// stage no-opped).
/// - `stats` — per-cell diagnostics specific to the stage (empty when
/// the stage didn't run). Each stage defines its own `S` type
/// (e.g., `IntervalStats`, `GroupStats`).
/// - `diags` — entries to append to the subset's stage trace (empty
/// when the stage didn't run).