use serde::{Deserialize, Serialize};
use super::{RateStrategy, ReportInterval};
use crate::{CanonicalColumnName, aggregation::Aggregate, unit::MeasurementUnit};
/// Outcome of [`ResamplingPlanner::plan`] for a single measurement.
///
/// Records which resampling path was chosen, the rates that drove the choice,
/// the aggregate selected for the measurement, and a human-readable reason.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResamplingPlan {
    /// Name of the measurement this plan applies to (copied from the unit's `name`).
    pub measurement: CanonicalColumnName,
    /// How the measurement's samples are brought onto the report grid.
    pub path: ResamplingPath,
    /// Approximate width of the report interval's bucket, in milliseconds
    /// (from `IntervalBucket::approximate_ms`).
    pub target_rate_ms: i64,
    /// The measurement's configured sample rate in milliseconds; `None` when
    /// the measurement has no sample rate configured.
    pub native_rate_ms: Option<i64>,
    /// Aggregate chosen for this measurement: the interval's override for this
    /// measurement if present, otherwise the schema's default.
    pub aggregation: Aggregate,
    /// Whether `aggregation` came from the schema or from an interval override.
    pub aggregation_source: AggregationSource,
    /// Human-readable explanation of why `path` was chosen.
    pub reason: String,
}
/// Strategy the planner selected for mapping native samples onto the report grid.
///
/// Serialized in `snake_case` (e.g. `"passthrough"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ResamplingPath {
    /// Samples are used as-is: the native rate equals the interval, or no
    /// native rate is configured.
    Passthrough,
    /// Samples are folded into buckets: the native rate is finer than the
    /// interval, or the bucket spans the whole window.
    Aggregate,
    /// Samples are filled forward onto a finer grid: the native rate is coarser
    /// than the interval and the measurement declares an `upsample_strategy`.
    Upsample,
    /// Samples land only in the buckets they fall into: the native rate is
    /// coarser than the interval and upsampling was not selected.
    Sparse,
}
/// Where a plan's aggregate came from. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AggregationSource {
    /// Taken from the measurement's own `signal_aggregation()`.
    Schema,
    /// Taken from the report interval's `aggregation_override` entry for the
    /// measurement's name.
    Override,
}
/// Computes a [`ResamplingPlan`] for one measurement against one report interval.
///
/// Only borrows its inputs; it holds no other state.
pub struct ResamplingPlanner<'m, 'i> {
    /// Measurement being planned (supplies name, sample rate, upsample strategy,
    /// and default aggregation).
    unit: &'m MeasurementUnit,
    /// Report interval being planned against (supplies bucket, strategy, and
    /// aggregation overrides).
    interval: &'i ReportInterval,
}
impl<'m, 'i> ResamplingPlanner<'m, 'i> {
pub fn new(unit: &'m MeasurementUnit, interval: &'i ReportInterval) -> Self {
Self { unit, interval }
}
pub fn plan(&self) -> ResamplingPlan {
let native_rate_ms = self.unit.sample_rate_ms;
let target_rate_ms = self.interval.bucket.approximate_ms();
let (aggregation, aggregation_source) = self.choose_aggregation();
let (path, reason) = self.choose_path(native_rate_ms, target_rate_ms);
ResamplingPlan {
measurement: self.unit.name.clone(),
path,
target_rate_ms,
native_rate_ms,
aggregation,
aggregation_source,
reason,
}
}
fn choose_aggregation(&self) -> (Aggregate, AggregationSource) {
if let Some(ref overrides) = self.interval.aggregation_override
&& let Some(agg) = overrides.get(&self.unit.name)
{
return (*agg, AggregationSource::Override);
}
(self.unit.signal_aggregation(), AggregationSource::Schema)
}
fn choose_path(
&self,
native_rate_ms: Option<i64>,
target_rate_ms: i64,
) -> (ResamplingPath, String) {
if matches!(self.interval.bucket, super::IntervalBucket::WholeWindow,) {
return (
ResamplingPath::Aggregate,
"whole-window bucket folds every observation per subject".into(),
);
}
let Some(native) = native_rate_ms else {
return (
ResamplingPath::Passthrough,
"no native sample rate configured — passthrough".into(),
);
};
if native == target_rate_ms {
return (
ResamplingPath::Passthrough,
format!("native rate {native}ms matches interval"),
);
}
if native < target_rate_ms {
return (
ResamplingPath::Aggregate,
format!(
"native {native}ms finer than interval {target_rate_ms}ms — aggregate \
(no upsample/downsample choice needed)"
),
);
}
let has_upsample_strategy = self.unit.upsample_strategy.is_some();
match self.interval.strategy {
RateStrategy::Auto => {
if has_upsample_strategy {
(
ResamplingPath::Upsample,
format!(
"native {native}ms coarser than interval {target_rate_ms}ms; \
schema declares upsample_strategy — honoring author intent"
),
)
} else {
(
ResamplingPath::Sparse,
format!(
"native {native}ms coarser than interval {target_rate_ms}ms; \
no upsample_strategy declared on measurement — sparse"
),
)
}
}
RateStrategy::Upsample => {
if has_upsample_strategy {
(
ResamplingPath::Upsample,
format!(
"native {native}ms coarser than interval {target_rate_ms}ms, \
strategy=Upsample with declared upsample_strategy — forward-fill"
),
)
} else {
(
ResamplingPath::Sparse,
format!(
"native {native}ms coarser than interval {target_rate_ms}ms, \
strategy=Upsample but measurement has no upsample_strategy — \
cannot force what isn't configured; sparse"
),
)
}
}
RateStrategy::AggregateOrSparse => (
ResamplingPath::Sparse,
format!(
"native {native}ms coarser than interval {target_rate_ms}ms, \
strategy=AggregateOrSparse — sparse (ignores schema's upsample_strategy)"
),
),
RateStrategy::Native => (
ResamplingPath::Sparse,
format!(
"native {native}ms coarser than interval {target_rate_ms}ms, \
strategy=Native — sparse on report grid"
),
),
}
}
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;
    use crate::{
        MeasurementKind, ResampleStrategy, interval::IntervalBucket, signal_policy::SignalPolicy,
        unit::MeasurementUnit,
    };

    /// Every [`RateStrategy`] variant, for tests asserting that a path does not
    /// depend on the requested strategy.
    const ALL_STRATEGIES: [RateStrategy; 4] = [
        RateStrategy::Auto,
        RateStrategy::Upsample,
        RateStrategy::Native,
        RateStrategy::AggregateOrSparse,
    ];

    /// Builds a measurement with an instant signal policy and the given native rate.
    fn unit_named(name: &str, kind: MeasurementKind, native_rate_ms: i64) -> MeasurementUnit {
        MeasurementUnit::new("subject", "time", name, kind)
            .with_signal_policy(SignalPolicy::instant())
            .with_sample_rate_ms(native_rate_ms)
    }

    /// Measurement sampled once per minute.
    fn sump_unit() -> MeasurementUnit {
        unit_named("sump", MeasurementKind::Measure, 60_000)
    }

    /// Measurement sampled once per hour (coarser than a five-minute bucket).
    fn precip_unit() -> MeasurementUnit {
        unit_named("historical_precip", MeasurementKind::Measure, 3_600_000)
    }

    /// Fixed five-minute bucket.
    fn five_minutes() -> IntervalBucket {
        IntervalBucket::Fixed {
            duration_ms: 5 * 60_000,
        }
    }

    /// The one place that spells out a full `ReportInterval` literal: no
    /// aggregation overrides and a `Null` empty-bucket policy. Tests needing
    /// overrides layer them on with struct-update syntax, so adding a field to
    /// `ReportInterval` only touches this helper.
    fn report_interval(bucket: IntervalBucket, strategy: RateStrategy) -> ReportInterval {
        ReportInterval {
            bucket,
            strategy,
            aggregation_override: None,
            empty_bucket: super::super::EmptyBucketPolicy::Null,
        }
    }

    fn monthly_auto() -> ReportInterval {
        report_interval(IntervalBucket::Months(1), RateStrategy::Auto)
    }

    fn five_min_auto() -> ReportInterval {
        report_interval(five_minutes(), RateStrategy::Auto)
    }

    fn one_min_auto() -> ReportInterval {
        report_interval(IntervalBucket::Fixed { duration_ms: 60_000 }, RateStrategy::Auto)
    }

    fn whole_window(strategy: RateStrategy) -> ReportInterval {
        report_interval(IntervalBucket::WholeWindow, strategy)
    }

    #[test]
    fn aggregate_when_native_finer_than_interval() {
        let plan = ResamplingPlanner::new(&sump_unit(), &monthly_auto()).plan();
        assert_eq!(plan.path, ResamplingPath::Aggregate);
        assert_eq!(plan.aggregation, Aggregate::Mean);
        assert_eq!(plan.aggregation_source, AggregationSource::Schema);
    }

    #[test]
    fn aggregate_path_independent_of_strategy_when_native_finer() {
        for strategy in ALL_STRATEGIES {
            let interval = report_interval(IntervalBucket::Months(1), strategy);
            let plan = ResamplingPlanner::new(&sump_unit(), &interval).plan();
            assert_eq!(
                plan.path,
                ResamplingPath::Aggregate,
                "strategy {strategy:?} must choose Aggregate when native < interval",
            );
        }
    }

    #[test]
    fn passthrough_when_native_matches_interval() {
        let plan = ResamplingPlanner::new(&sump_unit(), &one_min_auto()).plan();
        assert_eq!(plan.path, ResamplingPath::Passthrough);
    }

    #[test]
    fn passthrough_when_no_native_rate_configured() {
        let mut unit = sump_unit();
        unit.sample_rate_ms = None;
        for strategy in ALL_STRATEGIES {
            let interval = report_interval(five_minutes(), strategy);
            let plan = ResamplingPlanner::new(&unit, &interval).plan();
            assert_eq!(
                plan.path,
                ResamplingPath::Passthrough,
                "missing native rate must pass through for strategy={strategy:?}",
            );
            assert_eq!(plan.native_rate_ms, None);
            assert!(
                plan.reason.contains("no native sample rate"),
                "unexpected reason: {:?}",
                plan.reason,
            );
        }
    }

    #[test]
    fn auto_sparse_when_native_coarser_and_no_upsample_declared() {
        let plan = ResamplingPlanner::new(&precip_unit(), &five_min_auto()).plan();
        assert_eq!(plan.path, ResamplingPath::Sparse);
        assert!(plan.reason.contains("no upsample_strategy declared"));
    }

    #[test]
    fn auto_upsamples_when_schema_declares_upsample_strategy() {
        let precip = precip_unit().with_upsample(ResampleStrategy::ForwardFill);
        let plan = ResamplingPlanner::new(&precip, &five_min_auto()).plan();
        assert_eq!(
            plan.path,
            ResamplingPath::Upsample,
            "Auto + schema upsample_strategy → Upsample (honor author intent)",
        );
        assert!(plan.reason.contains("honoring author intent"));
    }

    #[test]
    fn aggregate_or_sparse_ignores_schema_upsample_declaration() {
        let precip = precip_unit().with_upsample(ResampleStrategy::ForwardFill);
        let interval = report_interval(five_minutes(), RateStrategy::AggregateOrSparse);
        let plan = ResamplingPlanner::new(&precip, &interval).plan();
        assert_eq!(plan.path, ResamplingPath::Sparse);
        assert!(plan.reason.contains("ignores schema's upsample_strategy"));
    }

    #[test]
    fn sparse_when_native_strategy_and_native_coarser() {
        let interval = report_interval(five_minutes(), RateStrategy::Native);
        let plan = ResamplingPlanner::new(&precip_unit(), &interval).plan();
        assert_eq!(plan.path, ResamplingPath::Sparse);
    }

    #[test]
    fn upsample_when_strategy_requests_and_measurement_declares_upsample() {
        let precip = precip_unit().with_upsample(ResampleStrategy::ForwardFill);
        let interval = report_interval(five_minutes(), RateStrategy::Upsample);
        let plan = ResamplingPlanner::new(&precip, &interval).plan();
        assert_eq!(plan.path, ResamplingPath::Upsample);
    }

    #[test]
    fn falls_back_to_sparse_when_upsample_requested_but_not_declared() {
        let interval = report_interval(five_minutes(), RateStrategy::Upsample);
        let plan = ResamplingPlanner::new(&precip_unit(), &interval).plan();
        assert_eq!(plan.path, ResamplingPath::Sparse);
        assert!(plan.reason.contains("no upsample_strategy"));
    }

    #[test]
    fn aggregation_uses_schema_default_when_no_override() {
        let plan = ResamplingPlanner::new(&sump_unit(), &monthly_auto()).plan();
        assert_eq!(plan.aggregation, Aggregate::Mean);
        assert_eq!(plan.aggregation_source, AggregationSource::Schema);
    }

    #[test]
    fn aggregation_override_wins_over_schema_default() {
        let mut overrides = HashMap::new();
        overrides.insert(CanonicalColumnName::new("sump"), Aggregate::Max);
        let interval = ReportInterval {
            aggregation_override: Some(overrides),
            ..monthly_auto()
        };
        let plan = ResamplingPlanner::new(&sump_unit(), &interval).plan();
        assert_eq!(plan.aggregation, Aggregate::Max);
        assert_eq!(plan.aggregation_source, AggregationSource::Override);
    }

    #[test]
    fn aggregation_override_for_different_measurement_is_ignored() {
        let mut overrides = HashMap::new();
        overrides.insert(
            CanonicalColumnName::new("historical_precip"),
            Aggregate::Sum,
        );
        let interval = ReportInterval {
            aggregation_override: Some(overrides),
            ..monthly_auto()
        };
        let plan = ResamplingPlanner::new(&sump_unit(), &interval).plan();
        assert_eq!(plan.aggregation, Aggregate::Mean);
        assert_eq!(plan.aggregation_source, AggregationSource::Schema);
    }

    #[test]
    fn plan_records_measurement_name_and_rates() {
        let plan = ResamplingPlanner::new(&sump_unit(), &monthly_auto()).plan();
        assert_eq!(plan.measurement, CanonicalColumnName::new("sump"));
        assert_eq!(plan.native_rate_ms, Some(60_000));
        assert_eq!(
            plan.target_rate_ms,
            IntervalBucket::Months(1).approximate_ms()
        );
        assert!(
            !plan.reason.is_empty(),
            "reason must be populated for diagnostics"
        );
    }

    #[test]
    fn interval_bucket_approximate_ms_orders_correctly() {
        let minute = IntervalBucket::Fixed {
            duration_ms: 60_000,
        }
        .approximate_ms();
        let hour = IntervalBucket::Hours(1).approximate_ms();
        let day = IntervalBucket::Days(1).approximate_ms();
        let week = IntervalBucket::Weeks(1).approximate_ms();
        let month = IntervalBucket::Months(1).approximate_ms();
        let whole = IntervalBucket::WholeWindow.approximate_ms();
        assert!(minute < hour);
        assert!(hour < day);
        assert!(day < week);
        assert!(week < month);
        assert!(month < whole, "WholeWindow must sort coarsest");
    }

    #[test]
    fn whole_window_always_picks_aggregate_regardless_of_strategy() {
        for strategy in ALL_STRATEGIES {
            let plan = ResamplingPlanner::new(&sump_unit(), &whole_window(strategy)).plan();
            assert_eq!(
                plan.path,
                ResamplingPath::Aggregate,
                "WholeWindow must pick Aggregate for strategy={strategy:?}",
            );
            assert!(
                plan.reason.contains("whole-window"),
                "reason must name the bucket: got {:?}",
                plan.reason,
            );
        }
    }

    #[test]
    fn whole_window_takes_precedence_over_missing_native_rate() {
        let mut unit = sump_unit();
        unit.sample_rate_ms = None;
        let plan = ResamplingPlanner::new(&unit, &whole_window(RateStrategy::Auto)).plan();
        assert_eq!(plan.path, ResamplingPath::Aggregate);
        assert!(plan.reason.contains("whole-window"));
    }

    #[test]
    fn whole_window_aggregation_honors_override() {
        let mut overrides = HashMap::new();
        overrides.insert(CanonicalColumnName::new("sump"), Aggregate::Max);
        let interval = ReportInterval {
            aggregation_override: Some(overrides),
            ..whole_window(RateStrategy::Auto)
        };
        let plan = ResamplingPlanner::new(&sump_unit(), &interval).plan();
        assert_eq!(plan.path, ResamplingPath::Aggregate);
        assert_eq!(plan.aggregation, Aggregate::Max);
        assert_eq!(plan.aggregation_source, AggregationSource::Override);
    }
}