use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::Path;
use chrono::NaiveDate;
use cobre_core::{EntityId, System};
use cobre_io::{
Config, FileManifest, LoadError, ValidationContext, parse_inflow_ar_coefficients,
parse_inflow_history,
scenarios::{InflowArCoefficientRow, InflowSeasonalStatsRow, assemble_inflow_models},
validate_structure,
};
use cobre_stochastic::{
StochasticError,
par::contribution::{
check_negative_contributions, compute_contributions, find_max_valid_order,
has_negative_phi1,
},
par::fitting::{
ArCoefficientEstimate, SeasonalStats, estimate_ar_coefficients_with_season_map,
estimate_correlation_with_season_map, estimate_periodic_ar_coefficients,
estimate_seasonal_stats_with_season_map, find_season_for_date, periodic_pacf,
select_order_pacf,
},
};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EstimationPath {
Deterministic,
UserStatsWhiteNoise,
UserProvidedNoHistory,
FullEstimation,
UserArHistoryStats,
PartialEstimation,
UserProvidedAll,
}
impl EstimationPath {
#[must_use]
pub fn resolve(manifest: &cobre_io::FileManifest) -> Self {
match (
manifest.scenarios_inflow_history_parquet,
manifest.scenarios_inflow_seasonal_stats_parquet,
manifest.scenarios_inflow_ar_coefficients_parquet,
) {
(false, false, _) => Self::Deterministic,
(false, true, false) => Self::UserStatsWhiteNoise,
(false, true, true) => Self::UserProvidedNoHistory,
(true, false, false) => Self::FullEstimation,
(true, false, true) => Self::UserArHistoryStats,
(true, true, false) => Self::PartialEstimation,
(true, true, true) => Self::UserProvidedAll,
}
}
#[must_use]
pub fn as_str(self) -> &'static str {
match self {
Self::Deterministic => "deterministic",
Self::UserStatsWhiteNoise => "user_stats_white_noise",
Self::UserProvidedNoHistory => "user_provided_no_history",
Self::FullEstimation => "full_estimation",
Self::UserArHistoryStats => "user_ar_history_stats",
Self::PartialEstimation => "partial_estimation",
Self::UserProvidedAll => "user_provided_all",
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum EstimationError {
#[error("load error: {0}")]
Load(#[from] LoadError),
#[error("estimation failed: {0}")]
Stochastic(#[from] StochasticError),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReductionReason {
MagnitudeBound,
Phi1Negative,
NegativeContribution,
}
impl ReductionReason {
#[must_use]
pub fn as_str(self) -> &'static str {
match self {
Self::MagnitudeBound => "magnitude_bound",
Self::Phi1Negative => "phi1_negative",
Self::NegativeContribution => "negative_contribution",
}
}
}
#[derive(Debug, Clone)]
pub struct ContributionReduction {
pub season_id: usize,
pub original_order: usize,
pub reduced_order: usize,
pub contributions: Vec<f64>,
pub reason: ReductionReason,
}
#[derive(Debug, Clone)]
pub struct HydroEstimationEntry {
pub selected_order: u32,
pub coefficients: Vec<Vec<f64>>,
pub contribution_reductions: Vec<ContributionReduction>,
}
#[derive(Debug, Clone)]
pub struct LagScaleWarning {
pub hydro_id: EntityId,
pub lag_value: f64,
pub user_mean: f64,
pub estimated_mean: f64,
}
#[must_use]
#[derive(Debug, Clone)]
pub struct EstimationReport {
pub entries: BTreeMap<EntityId, HydroEstimationEntry>,
pub method: String,
pub white_noise_fallbacks: Vec<EntityId>,
pub lag_scale_warnings: Vec<LagScaleWarning>,
pub std_ratio_warnings: Vec<StdRatioDivergence>,
}
#[derive(Debug, Clone)]
pub struct StdRatioDivergence {
pub hydro_id: EntityId,
pub season_a: usize,
pub season_b: usize,
pub user_ratio: f64,
pub estimated_ratio: f64,
pub divergence: f64,
}
#[derive(Debug, Clone)]
pub struct ContributionValidationResult {
pub valid: bool,
pub max_valid_order: usize,
pub contributions: Vec<f64>,
}
fn validate_order_contributions(
season_id: usize,
n_seasons: usize,
current_order: usize,
all_season_coefficients: &[Vec<f64>],
std_by_season: &[f64],
) -> ContributionValidationResult {
if current_order == 0 {
return ContributionValidationResult {
valid: true,
max_valid_order: 0,
contributions: Vec::new(),
};
}
let coeff_refs: Vec<&[f64]> = all_season_coefficients.iter().map(Vec::as_slice).collect();
let contributions = compute_contributions(
season_id,
n_seasons,
current_order,
&coeff_refs,
std_by_season,
);
let valid = !check_negative_contributions(&contributions);
let max_valid_order = if valid {
current_order
} else {
find_max_valid_order(&contributions)
};
ContributionValidationResult {
valid,
max_valid_order,
contributions,
}
}
pub fn estimate_from_history(
system: System,
case_dir: &Path,
config: &Config,
) -> Result<(System, Option<EstimationReport>, EstimationPath), EstimationError> {
let mut ctx = ValidationContext::new();
let manifest = validate_structure(case_dir, &mut ctx);
if ctx.into_result().is_err() {
return Ok((system, None, EstimationPath::Deterministic));
}
let path = EstimationPath::resolve(&manifest);
match path {
EstimationPath::Deterministic
| EstimationPath::UserStatsWhiteNoise
| EstimationPath::UserProvidedNoHistory
| EstimationPath::UserProvidedAll => Ok((system, None, path)),
EstimationPath::PartialEstimation => {
let (system, report) = run_partial_estimation(system, case_dir, config, &manifest)?;
Ok((system, Some(report), path))
}
EstimationPath::FullEstimation => {
let (system, report) = run_estimation(system, case_dir, config, &manifest)?;
Ok((system, Some(report), path))
}
EstimationPath::UserArHistoryStats => {
let (system, report) = run_user_ar_estimation(system, case_dir, config, &manifest)?;
Ok((system, Some(report), path))
}
}
}
fn run_estimation(
system: System,
case_dir: &Path,
config: &Config,
manifest: &FileManifest,
) -> Result<(System, EstimationReport), EstimationError> {
let history_path = case_dir.join("scenarios/inflow_history.parquet");
let history = parse_inflow_history(&history_path)?;
let observations: Vec<(EntityId, NaiveDate, f64)> = history
.iter()
.map(|row| (row.hydro_id, row.date, row.value_m3s))
.collect();
let hydro_ids: Vec<EntityId> = system.hydros().iter().map(|h| h.id).collect();
let stages = system.stages();
let season_map = system.policy_graph().season_map.as_ref();
let seasonal_stats =
estimate_seasonal_stats_with_season_map(&observations, stages, &hydro_ids, season_map)?;
let max_order = config.estimation.max_order as usize;
let (ar_estimates, estimation_report) = estimate_ar_coefficients_with_selection(
&observations,
&seasonal_stats,
stages,
&hydro_ids,
&ArEstimationConfig {
max_order,
max_coeff_magnitude: config.estimation.max_coefficient_magnitude,
season_map,
},
)?;
let correlation = if manifest.scenarios_correlation_json {
system.correlation().clone()
} else {
estimate_correlation_with_season_map(
&observations,
&ar_estimates,
&seasonal_stats,
stages,
&hydro_ids,
season_map,
)?
};
let stats_rows = seasonal_stats_to_rows(&seasonal_stats, stages);
let coeff_rows = ar_estimates_to_rows(&ar_estimates, stages);
let inflow_models = assemble_inflow_models(stats_rows, coeff_rows)?;
Ok((
system.with_scenario_models(inflow_models, correlation),
estimation_report,
))
}
#[allow(clippy::too_many_lines)]
fn run_partial_estimation(
system: System,
case_dir: &Path,
config: &Config,
manifest: &FileManifest,
) -> Result<(System, EstimationReport), EstimationError> {
let history_path = case_dir.join("scenarios/inflow_history.parquet");
let history = parse_inflow_history(&history_path)?;
let observations: Vec<(EntityId, NaiveDate, f64)> = history
.iter()
.map(|row| (row.hydro_id, row.date, row.value_m3s))
.collect();
let hydro_ids: Vec<EntityId> = system.hydros().iter().map(|h| h.id).collect();
let stages = system.stages();
let season_map = system.policy_graph().season_map.as_ref();
if system.inflow_models().is_empty() {
return Err(EstimationError::Load(
cobre_io::LoadError::ConstraintError {
description: "manifest indicates inflow_seasonal_stats.parquet is present \
but system.inflow_models() is empty; \
no user stats available for partial estimation"
.to_string(),
},
));
}
let fitting_stats =
estimate_seasonal_stats_with_season_map(&observations, stages, &hydro_ids, season_map)?;
let max_order = config.estimation.max_order as usize;
let (ar_estimates, estimation_report) = estimate_ar_coefficients_with_selection(
&observations,
&fitting_stats,
stages,
&hydro_ids,
&ArEstimationConfig {
max_order,
max_coeff_magnitude: config.estimation.max_coefficient_magnitude,
season_map,
},
)?;
let estimated_hydro_ids: HashSet<EntityId> =
fitting_stats.iter().map(|s| s.entity_id).collect();
let user_stats_hydro_ids: HashSet<EntityId> =
system.inflow_models().iter().map(|m| m.hydro_id).collect();
let mut missing_stats: Vec<EntityId> = estimated_hydro_ids
.difference(&user_stats_hydro_ids)
.copied()
.collect();
missing_stats.sort();
if !missing_stats.is_empty() {
let ids: Vec<String> = missing_stats.iter().map(|id| id.0.to_string()).collect();
return Err(EstimationError::Load(
cobre_io::LoadError::ConstraintError {
description: format!(
"partial estimation: AR coefficients were estimated for hydro(s) \
[{ids}] but inflow_seasonal_stats.parquet has no entry for them; \
all hydros with estimated AR must have user-provided stats",
ids = ids.join(", ")
),
},
));
}
let mut white_noise_fallbacks: Vec<EntityId> = user_stats_hydro_ids
.difference(&estimated_hydro_ids)
.copied()
.collect();
white_noise_fallbacks.sort();
let lag_scale_warnings: Vec<LagScaleWarning> = {
let estimated_mean_at_stage0: BTreeMap<EntityId, f64> = fitting_stats
.iter()
.filter(|s| s.stage_id == 0)
.map(|s| (s.entity_id, s.mean))
.collect();
let user_mean_at_stage0: BTreeMap<EntityId, f64> = system
.inflow_models()
.iter()
.filter(|m| m.stage_id == 0)
.map(|m| (m.hydro_id, m.mean_m3s))
.collect();
let mut sorted_past_inflows: Vec<&cobre_core::HydroPastInflows> =
system.initial_conditions().past_inflows.iter().collect();
sorted_past_inflows.sort_by_key(|p| p.hydro_id);
let mut warnings: Vec<LagScaleWarning> = Vec::new();
for past in sorted_past_inflows {
let Some(&lag_value) = past.values_m3s.first() else {
continue;
};
let Some(&estimated_mean) = estimated_mean_at_stage0.get(&past.hydro_id) else {
continue;
};
let Some(&user_mean) = user_mean_at_stage0.get(&past.hydro_id) else {
continue;
};
if (lag_value - estimated_mean).abs() < (lag_value - user_mean).abs() {
eprintln!(
"warning: hydro {} initial lag ({:.1}) is closer to estimated mean \
({:.1}) than user mean ({:.1}) -- initial conditions may be at \
historical scale",
past.hydro_id.0, lag_value, estimated_mean, user_mean
);
warnings.push(LagScaleWarning {
hydro_id: past.hydro_id,
lag_value,
user_mean,
estimated_mean,
});
}
}
warnings
};
let std_ratio_warnings = check_std_ratio_divergence(&system, &fitting_stats, stages);
for w in &std_ratio_warnings {
eprintln!(
"warning: hydro {} season {}->{} std ratio diverges {:.1}x between \
user ({:.2}) and estimated ({:.2})",
w.hydro_id.0, w.season_a, w.season_b, w.divergence, w.user_ratio, w.estimated_ratio
);
}
let correlation = if manifest.scenarios_correlation_json {
system.correlation().clone()
} else {
estimate_correlation_with_season_map(
&observations,
&ar_estimates,
&fitting_stats,
stages,
&hydro_ids,
season_map,
)?
};
let user_stats_rows = user_stats_to_rows(&system);
let coeff_rows = ar_estimates_to_rows(&ar_estimates, stages);
let inflow_models = assemble_inflow_models(user_stats_rows, coeff_rows)?;
let mut estimation_report = estimation_report;
estimation_report.white_noise_fallbacks = white_noise_fallbacks;
estimation_report.lag_scale_warnings = lag_scale_warnings;
estimation_report.std_ratio_warnings = std_ratio_warnings;
Ok((
system.with_scenario_models(inflow_models, correlation),
estimation_report,
))
}
fn run_user_ar_estimation(
system: System,
case_dir: &Path,
_config: &Config,
manifest: &FileManifest,
) -> Result<(System, EstimationReport), EstimationError> {
let history_path = case_dir.join("scenarios/inflow_history.parquet");
let history = parse_inflow_history(&history_path)?;
let observations: Vec<(EntityId, NaiveDate, f64)> = history
.iter()
.map(|row| (row.hydro_id, row.date, row.value_m3s))
.collect();
let hydro_ids: Vec<EntityId> = system.hydros().iter().map(|h| h.id).collect();
let stages = system.stages();
let season_map = system.policy_graph().season_map.as_ref();
let seasonal_stats =
estimate_seasonal_stats_with_season_map(&observations, stages, &hydro_ids, season_map)?;
let ar_path = case_dir.join("scenarios/inflow_ar_coefficients.parquet");
let user_ar_rows = parse_inflow_ar_coefficients(&ar_path)?;
let user_ar_estimates = ar_rows_to_estimates(&user_ar_rows, stages);
let correlation = if manifest.scenarios_correlation_json {
system.correlation().clone()
} else {
estimate_correlation_with_season_map(
&observations,
&user_ar_estimates,
&seasonal_stats,
stages,
&hydro_ids,
season_map,
)?
};
let stats_rows = seasonal_stats_to_rows(&seasonal_stats, stages);
let inflow_models = assemble_inflow_models(stats_rows, user_ar_rows)?;
let estimation_report = EstimationReport {
entries: BTreeMap::new(),
method: "user_provided".to_string(),
white_noise_fallbacks: Vec::new(),
lag_scale_warnings: Vec::new(),
std_ratio_warnings: Vec::new(),
};
Ok((
system.with_scenario_models(inflow_models, correlation),
estimation_report,
))
}
fn ar_rows_to_estimates(
rows: &[InflowArCoefficientRow],
stages: &[cobre_core::temporal::Stage],
) -> Vec<ArCoefficientEstimate> {
let stage_id_to_season: HashMap<i32, usize> = stages
.iter()
.filter_map(|s| s.season_id.map(|sid| (s.id, sid)))
.collect();
let mut first_stage: HashMap<(EntityId, usize), i32> = HashMap::new();
let mut groups: BTreeMap<(EntityId, usize), (Vec<f64>, f64)> = BTreeMap::new();
for row in rows {
let Some(&season_id) = stage_id_to_season.get(&row.stage_id) else {
continue;
};
let key = (row.hydro_id, season_id);
let canonical_stage = first_stage.entry(key).or_insert(row.stage_id);
if *canonical_stage != row.stage_id {
continue;
}
let entry = groups
.entry(key)
.or_insert_with(|| (Vec::new(), row.residual_std_ratio));
entry.0.push(row.coefficient);
}
groups
.into_iter()
.map(
|((hydro_id, season_id), (coefficients, residual_std_ratio))| ArCoefficientEstimate {
hydro_id,
season_id,
coefficients,
residual_std_ratio,
},
)
.collect()
}
fn user_stats_to_rows(system: &System) -> Vec<InflowSeasonalStatsRow> {
system
.inflow_models()
.iter()
.map(|m| InflowSeasonalStatsRow {
hydro_id: m.hydro_id,
stage_id: m.stage_id,
mean_m3s: m.mean_m3s,
std_m3s: m.std_m3s,
})
.collect()
}
fn check_std_ratio_divergence(
system: &System,
fitting_stats: &[SeasonalStats],
stages: &[cobre_core::temporal::Stage],
) -> Vec<StdRatioDivergence> {
let stage_id_to_season: HashMap<i32, usize> = stages
.iter()
.filter_map(|s| s.season_id.map(|sid| (s.id, sid)))
.collect();
let mut user_std: BTreeMap<(EntityId, usize), f64> = BTreeMap::new();
for m in system.inflow_models() {
let Some(&season_id) = stage_id_to_season.get(&m.stage_id) else {
continue;
};
user_std.entry((m.hydro_id, season_id)).or_insert(m.std_m3s);
}
let mut est_std: BTreeMap<(EntityId, usize), f64> = BTreeMap::new();
for s in fitting_stats {
let Some(&season_id) = stage_id_to_season.get(&s.stage_id) else {
continue;
};
est_std.entry((s.entity_id, season_id)).or_insert(s.std);
}
let user_hydros: std::collections::BTreeSet<EntityId> =
user_std.keys().map(|(h, _)| *h).collect();
let est_hydros: std::collections::BTreeSet<EntityId> =
est_std.keys().map(|(h, _)| *h).collect();
let common_hydros: Vec<EntityId> = user_hydros.intersection(&est_hydros).copied().collect();
let mut warnings: Vec<StdRatioDivergence> = Vec::new();
for hydro_id in common_hydros {
let season_ids: Vec<usize> = {
let mut ids: Vec<usize> = user_std
.keys()
.filter(|(h, _)| *h == hydro_id)
.map(|(_, s)| *s)
.collect();
ids.sort_unstable();
ids.dedup();
ids
};
let n = season_ids.len();
if n < 2 {
continue;
}
for i in 0..n {
let season_a = season_ids[i];
let season_b = season_ids[(i + 1) % n];
let Some(&u_a) = user_std.get(&(hydro_id, season_a)) else {
continue;
};
let Some(&u_b) = user_std.get(&(hydro_id, season_b)) else {
continue;
};
let Some(&e_a) = est_std.get(&(hydro_id, season_a)) else {
continue;
};
let Some(&e_b) = est_std.get(&(hydro_id, season_b)) else {
continue;
};
if u_b.abs() < 1e-12 || e_b.abs() < 1e-12 {
continue;
}
let ratio_user = u_a / u_b;
let ratio_est = e_a / e_b;
if ratio_user.abs() < 1e-12 || ratio_est.abs() < 1e-12 {
continue;
}
let divergence = (ratio_user / ratio_est)
.abs()
.max((ratio_est / ratio_user).abs());
if divergence > 2.0 {
warnings.push(StdRatioDivergence {
hydro_id,
season_a,
season_b,
user_ratio: ratio_user,
estimated_ratio: ratio_est,
divergence,
});
}
}
}
warnings.sort_by_key(|w| (w.hydro_id, w.season_a));
warnings
}
struct ArEstimationConfig<'a> {
max_order: usize,
max_coeff_magnitude: Option<f64>,
season_map: Option<&'a cobre_core::temporal::SeasonMap>,
}
fn estimate_ar_coefficients_with_selection(
observations: &[(EntityId, NaiveDate, f64)],
seasonal_stats: &[SeasonalStats],
stages: &[cobre_core::temporal::Stage],
hydro_ids: &[EntityId],
cfg: &ArEstimationConfig<'_>,
) -> Result<(Vec<ArCoefficientEstimate>, EstimationReport), StochasticError> {
estimate_ar_with_pacf(
observations,
seasonal_stats,
stages,
hydro_ids,
cfg.max_order,
cfg.season_map,
cfg.max_coeff_magnitude,
)
}
#[allow(clippy::too_many_lines)]
fn estimate_ar_with_pacf(
observations: &[(EntityId, NaiveDate, f64)],
seasonal_stats: &[SeasonalStats],
stages: &[cobre_core::temporal::Stage],
hydro_ids: &[EntityId],
max_order: usize,
season_map: Option<&cobre_core::temporal::SeasonMap>,
max_coeff_magnitude: Option<f64>,
) -> Result<(Vec<ArCoefficientEstimate>, EstimationReport), StochasticError> {
if max_order == 0 {
let estimates = estimate_ar_coefficients_with_season_map(
observations,
seasonal_stats,
stages,
hydro_ids,
0,
season_map,
)?;
let report = EstimationReport {
entries: BTreeMap::new(),
method: "PACF".to_string(),
white_noise_fallbacks: Vec::new(),
lag_scale_warnings: Vec::new(),
std_ratio_warnings: Vec::new(),
};
return Ok((estimates, report));
}
let mut stage_index = stages
.iter()
.filter_map(|s| s.season_id.map(|sid| (s.start_date, s.end_date, s.id, sid)))
.collect::<Vec<_>>();
stage_index.sort_unstable_by_key(|(start, _, _, _)| *start);
let stage_id_to_season: HashMap<i32, usize> = stage_index
.iter()
.map(|&(_, _, stage_id, season_id)| (stage_id, season_id))
.collect();
let stats_map: HashMap<(EntityId, usize), &SeasonalStats> = seasonal_stats
.iter()
.filter_map(|s| {
let season_id = stage_id_to_season.get(&s.stage_id).copied()?;
Some(((s.entity_id, season_id), s))
})
.collect();
let n_seasons: usize = {
let mut max_season = 0usize;
for &(_, _, _, season_id) in &stage_index {
if season_id >= max_season {
max_season = season_id + 1;
}
}
max_season
};
let entity_set: HashSet<EntityId> = hydro_ids.iter().copied().collect();
let mut group_obs: HashMap<(EntityId, usize), Vec<f64>> = HashMap::new();
for &(entity_id, date, value) in observations {
if !entity_set.contains(&entity_id) {
continue;
}
let Some(season_id) = find_season_for_date(&stage_index, date)
.or_else(|| season_map.and_then(|sm| sm.season_for_date(date)))
else {
continue;
};
group_obs
.entry((entity_id, season_id))
.or_default()
.push(value);
}
let z_alpha = 1.96_f64;
let mut estimates: Vec<ArCoefficientEstimate> = Vec::new();
for &hydro_id in hydro_ids {
let mut obs_by_season: Vec<Vec<f64>> = vec![Vec::new(); n_seasons];
let mut stats_by_season: Vec<(f64, f64)> = vec![(0.0, 0.0); n_seasons];
for season in 0..n_seasons {
if let Some(obs) = group_obs.get(&(hydro_id, season)) {
obs_by_season[season].clone_from(obs);
}
if let Some(stats) = stats_map.get(&(hydro_id, season)) {
stats_by_season[season] = (stats.mean, stats.std);
}
}
let obs_refs: Vec<&[f64]> = obs_by_season.iter().map(Vec::as_slice).collect();
for season in 0..n_seasons {
let stats_s = stats_by_season[season];
if stats_s.1 == 0.0 || obs_by_season[season].len() < 2 {
estimates.push(ArCoefficientEstimate {
hydro_id,
season_id: season,
coefficients: Vec::new(),
residual_std_ratio: 1.0,
});
continue;
}
let n_obs = obs_by_season[season].len();
let pacf_values =
periodic_pacf(season, max_order, n_seasons, &obs_refs, &stats_by_season);
let pacf_result = select_order_pacf(&pacf_values, n_obs, z_alpha);
let selected_order = pacf_result.selected_order;
let yw_result = estimate_periodic_ar_coefficients(
season,
selected_order,
n_seasons,
&obs_refs,
&stats_by_season,
);
estimates.push(ArCoefficientEstimate {
hydro_id,
season_id: season,
coefficients: yw_result.coefficients,
residual_std_ratio: yw_result.residual_std_ratio,
});
}
}
let reductions = iterative_pacf_reduction(
&mut estimates,
n_seasons,
hydro_ids,
&group_obs,
&stats_map,
max_order,
z_alpha,
max_coeff_magnitude,
);
let report = build_estimation_report(&estimates, n_seasons, &reductions, "PACF");
Ok((estimates, report))
}
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
fn iterative_pacf_reduction(
estimates: &mut [ArCoefficientEstimate],
n_seasons: usize,
hydro_ids: &[EntityId],
group_obs: &HashMap<(EntityId, usize), Vec<f64>>,
stats_map: &HashMap<(EntityId, usize), &SeasonalStats>,
initial_max_order: usize,
z_alpha: f64,
max_coeff_magnitude: Option<f64>,
) -> HashMap<EntityId, Vec<ContributionReduction>> {
let mut all_reductions: HashMap<EntityId, Vec<ContributionReduction>> = HashMap::new();
if let Some(threshold) = max_coeff_magnitude {
for est in estimates.iter_mut() {
let has_explosive = est.coefficients.iter().any(|c| c.abs() > threshold);
if has_explosive {
let original_order = est.coefficients.len();
all_reductions
.entry(est.hydro_id)
.or_default()
.push(ContributionReduction {
season_id: est.season_id,
original_order,
reduced_order: 0,
contributions: Vec::new(),
reason: ReductionReason::MagnitudeBound,
});
est.coefficients.clear();
est.residual_std_ratio = 1.0;
}
}
}
for est in estimates.iter_mut() {
if has_negative_phi1(&est.coefficients) {
let original_order = est.coefficients.len();
all_reductions
.entry(est.hydro_id)
.or_default()
.push(ContributionReduction {
season_id: est.season_id,
original_order,
reduced_order: 0,
contributions: Vec::new(),
reason: ReductionReason::Phi1Negative,
});
est.coefficients.clear();
est.residual_std_ratio = 1.0;
}
}
let mut hydro_indices: BTreeMap<EntityId, Vec<usize>> = BTreeMap::new();
for (idx, est) in estimates.iter().enumerate() {
hydro_indices.entry(est.hydro_id).or_default().push(idx);
}
for &hydro_id in hydro_ids {
let Some(indices) = hydro_indices.get(&hydro_id) else {
continue;
};
let mut obs_by_season: Vec<Vec<f64>> = vec![Vec::new(); n_seasons];
let mut stats_by_season: Vec<(f64, f64)> = vec![(0.0, 0.0); n_seasons];
for season in 0..n_seasons {
if let Some(obs) = group_obs.get(&(hydro_id, season)) {
obs_by_season[season].clone_from(obs);
}
if let Some(stats) = stats_map.get(&(hydro_id, season)) {
stats_by_season[season] = (stats.mean, stats.std);
}
}
let std_by_season: Vec<f64> = (0..n_seasons)
.map(|sid| stats_map.get(&(hydro_id, sid)).map_or(0.0, |s| s.std))
.collect();
let mut max_orders: Vec<usize> = vec![initial_max_order; n_seasons];
let mut all_coeffs: Vec<Vec<f64>> = vec![Vec::new(); n_seasons];
for &idx in indices {
let est = &estimates[idx];
if est.season_id < n_seasons {
all_coeffs[est.season_id].clone_from(&est.coefficients);
}
}
let mut frozen: Vec<bool> = vec![false; n_seasons];
for &idx in indices {
let sid = estimates[idx].season_id;
if estimates[idx].coefficients.is_empty() {
frozen[sid] = true;
}
}
let obs_refs: Vec<&[f64]> = obs_by_season.iter().map(Vec::as_slice).collect();
loop {
let mut failing_seasons: Vec<usize> = Vec::new();
for &idx in indices {
let season_id = estimates[idx].season_id;
if frozen[season_id] {
continue;
}
let current_order = estimates[idx].coefficients.len();
if current_order == 0 {
continue;
}
let result = validate_order_contributions(
season_id,
n_seasons,
current_order,
&all_coeffs,
&std_by_season,
);
if !result.valid {
all_reductions
.entry(hydro_id)
.or_default()
.push(ContributionReduction {
season_id,
original_order: current_order,
reduced_order: result.max_valid_order,
contributions: result.contributions,
reason: ReductionReason::NegativeContribution,
});
failing_seasons.push(season_id);
}
}
if failing_seasons.is_empty() {
break; }
let mut any_reselected = false;
for &season_id in &failing_seasons {
if max_orders[season_id] == 0 {
continue;
}
max_orders[season_id] -= 1;
if max_orders[season_id] == 0 {
for &idx in indices {
if estimates[idx].season_id == season_id {
estimates[idx].coefficients.clear();
estimates[idx].residual_std_ratio = 1.0;
all_coeffs[season_id].clear();
frozen[season_id] = true;
}
}
continue;
}
let stats_s = stats_by_season[season_id];
if stats_s.1 == 0.0 || obs_by_season[season_id].len() < 2 {
frozen[season_id] = true;
continue;
}
let n_obs = obs_by_season[season_id].len();
let pacf_values = periodic_pacf(
season_id,
max_orders[season_id],
n_seasons,
&obs_refs,
&stats_by_season,
);
let pacf_result = select_order_pacf(&pacf_values, n_obs, z_alpha);
let selected_order = pacf_result.selected_order;
let yw_result = estimate_periodic_ar_coefficients(
season_id,
selected_order,
n_seasons,
&obs_refs,
&stats_by_season,
);
for &idx in indices {
if estimates[idx].season_id == season_id {
estimates[idx]
.coefficients
.clone_from(&yw_result.coefficients);
estimates[idx].residual_std_ratio = yw_result.residual_std_ratio;
all_coeffs[season_id].clone_from(&yw_result.coefficients);
}
}
if has_negative_phi1(&all_coeffs[season_id]) {
let original_order = all_coeffs[season_id].len();
all_reductions
.entry(hydro_id)
.or_default()
.push(ContributionReduction {
season_id,
original_order,
reduced_order: 0,
contributions: Vec::new(),
reason: ReductionReason::Phi1Negative,
});
for &idx in indices {
if estimates[idx].season_id == season_id {
estimates[idx].coefficients.clear();
estimates[idx].residual_std_ratio = 1.0;
all_coeffs[season_id].clear();
frozen[season_id] = true;
}
}
} else {
any_reselected = true;
}
}
if !any_reselected {
break; }
}
}
all_reductions
}
#[cfg(test)]
fn apply_contribution_validation(
estimates: &mut [ArCoefficientEstimate],
n_seasons: usize,
stats_map: &HashMap<(EntityId, usize), &SeasonalStats>,
sigma2_map: &HashMap<(EntityId, usize), Vec<f64>>,
max_coeff_magnitude: Option<f64>,
) -> HashMap<EntityId, Vec<ContributionReduction>> {
let mut all_reductions: HashMap<EntityId, Vec<ContributionReduction>> = HashMap::new();
if let Some(threshold) = max_coeff_magnitude {
for est in estimates.iter_mut() {
let has_explosive = est.coefficients.iter().any(|c| c.abs() > threshold);
if has_explosive {
let original_order = est.coefficients.len();
all_reductions
.entry(est.hydro_id)
.or_default()
.push(ContributionReduction {
season_id: est.season_id,
original_order,
reduced_order: 0,
contributions: Vec::new(), reason: ReductionReason::MagnitudeBound,
});
est.coefficients.clear();
est.residual_std_ratio = 1.0;
}
}
}
for est in estimates.iter_mut() {
if has_negative_phi1(&est.coefficients) {
let original_order = est.coefficients.len();
all_reductions
.entry(est.hydro_id)
.or_default()
.push(ContributionReduction {
season_id: est.season_id,
original_order,
reduced_order: 0,
contributions: Vec::new(),
reason: ReductionReason::Phi1Negative,
});
est.coefficients.clear();
est.residual_std_ratio = 1.0;
}
}
let mut hydro_indices: BTreeMap<EntityId, Vec<usize>> = BTreeMap::new();
for (idx, est) in estimates.iter().enumerate() {
hydro_indices.entry(est.hydro_id).or_default().push(idx);
}
for (&hydro_id, indices) in &hydro_indices {
let std_by_season: Vec<f64> = (0..n_seasons)
.map(|sid| stats_map.get(&(hydro_id, sid)).map_or(0.0, |s| s.std))
.collect();
let mut all_coeffs: Vec<Vec<f64>> = vec![Vec::new(); n_seasons];
for &idx in indices {
let est = &estimates[idx];
if est.season_id < n_seasons {
all_coeffs[est.season_id].clone_from(&est.coefficients);
}
}
for &idx in indices {
let season_id = estimates[idx].season_id;
let mut current_order = estimates[idx].coefficients.len();
loop {
let result = validate_order_contributions(
season_id,
n_seasons,
current_order,
&all_coeffs,
&std_by_season,
);
if result.valid || current_order == 0 {
break;
}
let original_order = current_order;
let reduced_order = result.max_valid_order;
all_reductions
.entry(hydro_id)
.or_default()
.push(ContributionReduction {
season_id,
original_order,
reduced_order,
contributions: result.contributions,
reason: ReductionReason::NegativeContribution,
});
estimates[idx].coefficients.truncate(reduced_order);
if reduced_order == 0 {
estimates[idx].residual_std_ratio = 1.0;
} else if let Some(sigma2_vec) = sigma2_map.get(&(hydro_id, season_id)) {
if reduced_order <= sigma2_vec.len() {
let sigma2 = sigma2_vec[reduced_order - 1];
estimates[idx].residual_std_ratio =
if sigma2 <= 0.0 { 1.0 } else { sigma2.sqrt() };
}
}
all_coeffs[season_id].clone_from(&estimates[idx].coefficients);
current_order = reduced_order;
}
}
}
all_reductions
}
pub(crate) fn build_estimation_report(
estimates: &[ArCoefficientEstimate],
n_seasons: usize,
contribution_reductions: &HashMap<EntityId, Vec<ContributionReduction>>,
method: &str,
) -> EstimationReport {
let mut hydro_coeffs: BTreeMap<EntityId, Vec<(usize, Vec<f64>)>> = BTreeMap::new();
for est in estimates {
hydro_coeffs
.entry(est.hydro_id)
.or_default()
.push((est.season_id, est.coefficients.clone()));
}
let mut entries: BTreeMap<EntityId, HydroEstimationEntry> = BTreeMap::new();
for (hydro_id, mut season_coeffs) in hydro_coeffs {
season_coeffs.sort_by_key(|(season_id, _)| *season_id);
let selected_order = season_coeffs
.iter()
.map(|(_, coeffs)| coeffs.len())
.max()
.unwrap_or(0);
let season_map: HashMap<usize, Vec<f64>> = season_coeffs.into_iter().collect();
let coefficients: Vec<Vec<f64>> = (0..n_seasons)
.map(|sid| season_map.get(&sid).cloned().unwrap_or_default())
.collect();
let reductions = contribution_reductions
.get(&hydro_id)
.cloned()
.unwrap_or_default();
#[allow(clippy::cast_possible_truncation)]
entries.insert(
hydro_id,
HydroEstimationEntry {
selected_order: selected_order as u32,
coefficients,
contribution_reductions: reductions,
},
);
}
EstimationReport {
entries,
method: method.to_string(),
white_noise_fallbacks: Vec::new(),
lag_scale_warnings: Vec::new(),
std_ratio_warnings: Vec::new(),
}
}
fn seasonal_stats_to_rows(
stats: &[SeasonalStats],
stages: &[cobre_core::temporal::Stage],
) -> Vec<InflowSeasonalStatsRow> {
let stage_to_season: HashMap<i32, usize> = stages
.iter()
.filter_map(|s| s.season_id.map(|sid| (s.id, sid)))
.collect();
let mut season_to_stages: HashMap<usize, Vec<i32>> = HashMap::new();
for stage in stages {
if let Some(sid) = stage.season_id {
season_to_stages.entry(sid).or_default().push(stage.id);
}
}
let mut rows = Vec::with_capacity(stats.len() * 10);
for s in stats {
if let Some(&season_id) = stage_to_season.get(&s.stage_id) {
if let Some(stage_ids) = season_to_stages.get(&season_id) {
for &stage_id in stage_ids {
rows.push(InflowSeasonalStatsRow {
hydro_id: s.entity_id,
stage_id,
mean_m3s: s.mean,
std_m3s: s.std,
});
}
continue;
}
}
rows.push(InflowSeasonalStatsRow {
hydro_id: s.entity_id,
stage_id: s.stage_id,
mean_m3s: s.mean,
std_m3s: s.std,
});
}
rows.sort_by_key(|r| (r.hydro_id.0, r.stage_id));
rows
}
fn ar_estimates_to_rows(
ar_estimates: &[ArCoefficientEstimate],
stages: &[cobre_core::temporal::Stage],
) -> Vec<InflowArCoefficientRow> {
let mut season_to_stages: HashMap<usize, Vec<i32>> = HashMap::new();
for stage in stages {
if let Some(sid) = stage.season_id {
season_to_stages.entry(sid).or_default().push(stage.id);
}
}
let mut rows: Vec<InflowArCoefficientRow> = Vec::new();
for est in ar_estimates {
let Some(stage_ids) = season_to_stages.get(&est.season_id) else {
continue;
};
for &stage_id in stage_ids {
for (lag_idx, &coeff) in est.coefficients.iter().enumerate() {
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let lag = (lag_idx + 1) as i32;
rows.push(InflowArCoefficientRow {
hydro_id: est.hydro_id,
stage_id,
lag,
coefficient: coeff,
residual_std_ratio: est.residual_std_ratio,
});
}
}
}
rows.sort_by_key(|r| (r.hydro_id.0, r.stage_id, r.lag));
rows
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::panic,
clippy::too_many_lines,
clippy::float_cmp,
clippy::doc_markdown,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::useless_vec
)]
mod tests {
use super::*;
use cobre_core::scenario::{CorrelationModel, InflowModel};
use cobre_core::{EntityId, SystemBuilder};
fn minimal_system_with_inflow_models(models: Vec<InflowModel>) -> System {
SystemBuilder::new()
.inflow_models(models)
.build()
.expect("valid system")
}
#[test]
fn test_with_scenario_models_replaces_fields() {
use cobre_core::{
Bus, DeficitSegment,
scenario::{CorrelationModel, InflowModel},
};
let bus = Bus {
id: EntityId(1),
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: Some(f64::INFINITY),
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let old_model = InflowModel {
hydro_id: EntityId(1),
stage_id: 0,
mean_m3s: 10.0,
std_m3s: 1.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
};
let system = SystemBuilder::new()
.buses(vec![bus])
.inflow_models(vec![old_model.clone(), {
let mut m = old_model.clone();
m.stage_id = 1;
m
}])
.build()
.expect("valid system");
assert_eq!(system.inflow_models().len(), 2);
assert_eq!(system.n_buses(), 1);
let new_models: Vec<InflowModel> = (0..4)
.map(|i| InflowModel {
hydro_id: EntityId(1),
stage_id: i,
mean_m3s: 50.0,
std_m3s: 5.0,
ar_coefficients: vec![0.4],
residual_std_ratio: 0.9,
})
.collect();
let new_corr = CorrelationModel::default();
let updated = system.with_scenario_models(new_models.clone(), new_corr.clone());
assert_eq!(updated.inflow_models().len(), 4, "expected 4 inflow models");
assert_eq!(
*updated.correlation(),
new_corr,
"correlation should equal new_corr"
);
assert_eq!(updated.n_buses(), 1, "buses must be preserved");
assert!(
updated.hydros().is_empty(),
"hydros must be preserved (empty)"
);
assert!(
updated.stages().is_empty(),
"stages must be preserved (empty)"
);
}
#[test]
fn test_with_scenario_models_clears_when_empty() {
let model = InflowModel {
hydro_id: EntityId(1),
stage_id: 0,
mean_m3s: 100.0,
std_m3s: 10.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
};
let system = minimal_system_with_inflow_models(vec![model]);
assert_eq!(system.inflow_models().len(), 1);
let updated = system.with_scenario_models(vec![], CorrelationModel::default());
assert!(updated.inflow_models().is_empty());
}
#[test]
fn test_estimate_explicit_stats_returns_unchanged() {
use tempfile::TempDir;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
create_required_files(case_dir);
let scenarios = case_dir.join("scenarios");
std::fs::create_dir_all(&scenarios).unwrap();
std::fs::write(scenarios.join("inflow_history.parquet"), b"").unwrap();
std::fs::write(scenarios.join("inflow_seasonal_stats.parquet"), b"").unwrap();
std::fs::write(scenarios.join("inflow_ar_coefficients.parquet"), b"").unwrap();
let model = InflowModel {
hydro_id: EntityId(1),
stage_id: 0,
mean_m3s: 100.0,
std_m3s: 10.0,
ar_coefficients: vec![0.5],
residual_std_ratio: 0.87,
};
let system = minimal_system_with_inflow_models(vec![model]);
let original_len = system.inflow_models().len();
let config = default_config();
let (result, report, _path) = estimate_from_history(system, case_dir, &config).unwrap();
assert_eq!(
result.inflow_models().len(),
original_len,
"explicit stats: system must be unchanged"
);
assert!(
report.is_none(),
"explicit stats path must return None report"
);
}
#[test]
fn test_estimate_no_history_returns_unchanged() {
use tempfile::TempDir;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
create_required_files(case_dir);
let model = InflowModel {
hydro_id: EntityId(1),
stage_id: 0,
mean_m3s: 100.0,
std_m3s: 10.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
};
let system = minimal_system_with_inflow_models(vec![model]);
let original_len = system.inflow_models().len();
let config = default_config();
let (result, report, _path) = estimate_from_history(system, case_dir, &config).unwrap();
assert_eq!(
result.inflow_models().len(),
original_len,
"no history: system must be unchanged"
);
assert!(report.is_none(), "no history path must return None report");
}
#[test]
fn test_estimation_path_resolve_all_8_combinations() {
use cobre_io::FileManifest;
let make = |history: bool, stats: bool, ar: bool| FileManifest {
scenarios_inflow_history_parquet: history,
scenarios_inflow_seasonal_stats_parquet: stats,
scenarios_inflow_ar_coefficients_parquet: ar,
..Default::default()
};
assert_eq!(
EstimationPath::resolve(&make(false, false, false)),
EstimationPath::Deterministic,
);
assert_eq!(
EstimationPath::resolve(&make(false, false, true)),
EstimationPath::Deterministic,
);
assert_eq!(
EstimationPath::resolve(&make(false, true, false)),
EstimationPath::UserStatsWhiteNoise,
);
assert_eq!(
EstimationPath::resolve(&make(false, true, true)),
EstimationPath::UserProvidedNoHistory,
);
assert_eq!(
EstimationPath::resolve(&make(true, false, false)),
EstimationPath::FullEstimation,
);
assert_eq!(
EstimationPath::resolve(&make(true, false, true)),
EstimationPath::UserArHistoryStats,
);
assert_eq!(
EstimationPath::resolve(&make(true, true, false)),
EstimationPath::PartialEstimation,
);
assert_eq!(
EstimationPath::resolve(&make(true, true, true)),
EstimationPath::UserProvidedAll,
);
}
#[test]
fn test_estimation_path_as_str_round_trip() {
let variants = [
EstimationPath::Deterministic,
EstimationPath::UserStatsWhiteNoise,
EstimationPath::UserProvidedNoHistory,
EstimationPath::FullEstimation,
EstimationPath::UserArHistoryStats,
EstimationPath::PartialEstimation,
EstimationPath::UserProvidedAll,
];
let strings: Vec<&str> = variants.iter().map(|v| v.as_str()).collect();
for s in &strings {
assert!(!s.is_empty(), "as_str() returned empty string");
}
let unique: std::collections::HashSet<&&str> = strings.iter().collect();
assert_eq!(
unique.len(),
variants.len(),
"as_str() must return unique strings for each variant"
);
}
#[test]
fn test_user_stats_to_rows_maps_all_models() {
let models = vec![
InflowModel {
hydro_id: EntityId(1),
stage_id: 0,
mean_m3s: 100.0,
std_m3s: 10.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
},
InflowModel {
hydro_id: EntityId(1),
stage_id: 1,
mean_m3s: 120.0,
std_m3s: 12.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
},
InflowModel {
hydro_id: EntityId(2),
stage_id: 0,
mean_m3s: 50.0,
std_m3s: 5.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
},
];
let system = minimal_system_with_inflow_models(models.clone());
let rows = user_stats_to_rows(&system);
assert_eq!(rows.len(), 3, "must produce one row per InflowModel");
for (model, row) in models.iter().zip(rows.iter()) {
assert_eq!(row.hydro_id, model.hydro_id, "hydro_id must be preserved");
assert_eq!(row.stage_id, model.stage_id, "stage_id must be preserved");
assert_eq!(
row.mean_m3s.to_bits(),
model.mean_m3s.to_bits(),
"mean_m3s must be bitwise identical"
);
assert_eq!(
row.std_m3s.to_bits(),
model.std_m3s.to_bits(),
"std_m3s must be bitwise identical"
);
}
}
#[test]
fn test_user_stats_to_rows_empty_system() {
let system = minimal_system_with_inflow_models(vec![]);
let rows = user_stats_to_rows(&system);
assert!(rows.is_empty(), "empty system must produce empty rows");
}
fn write_unit_test_inflow_history(path: &std::path::Path, hydro_id: i32, n_years: usize) {
use arrow::array::{Date32Array, Float64Array, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use chrono::NaiveDate;
use parquet::arrow::ArrowWriter;
use std::sync::Arc;
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
let date_to_days = |d: NaiveDate| -> i32 {
i32::try_from((d - epoch).num_days()).expect("date in Date32 range")
};
let (obs_s0, obs_s1) = simulate_two_season_par2(0.7, 0.15, n_years, 99);
let mut ids: Vec<i32> = Vec::with_capacity(n_years * 2);
let mut dates: Vec<i32> = Vec::with_capacity(n_years * 2);
let mut values: Vec<f64> = Vec::with_capacity(n_years * 2);
for y in 0..n_years {
let year = (1970 + y) as i32;
ids.push(hydro_id);
dates.push(date_to_days(NaiveDate::from_ymd_opt(year, 1, 15).unwrap()));
values.push(obs_s0[y] + 300.0);
ids.push(hydro_id);
dates.push(date_to_days(NaiveDate::from_ymd_opt(year, 7, 15).unwrap()));
values.push(obs_s1[y] + 300.0);
}
let schema = Arc::new(Schema::new(vec![
Field::new("hydro_id", DataType::Int32, false),
Field::new("date", DataType::Date32, false),
Field::new("value_m3s", DataType::Float64, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(ids)),
Arc::new(Date32Array::from(dates)),
Arc::new(Float64Array::from(values)),
],
)
.expect("valid batch");
let file = std::fs::File::create(path).expect("create parquet file");
let mut writer = ArrowWriter::try_new(file, schema, None).expect("ArrowWriter");
writer.write(&batch).expect("write batch");
writer.close().expect("close writer");
}
#[allow(clippy::cast_possible_wrap)]
fn build_system_with_user_stats(n_years: usize) -> System {
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::scenario::InflowModel;
use cobre_core::{Bus, DeficitSegment, EntityId, SystemBuilder};
let hydro_id = EntityId(1);
let bus = Bus {
id: EntityId(10),
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: Some(f64::INFINITY),
cost_per_mwh: 3000.0,
}],
excess_cost: 0.0,
};
let ref_year = 1970_i32;
let mut stages = Vec::with_capacity(n_years * 2);
for y in 0..n_years {
let year = ref_year + y as i32;
stages.push(make_two_season_stage(y * 2, (y * 2) as i32, 0, year, true));
stages.push(make_two_season_stage(
y * 2 + 1,
(y * 2 + 1) as i32,
1,
year,
false,
));
}
let inflow_models: Vec<InflowModel> = stages
.iter()
.map(|s| InflowModel {
hydro_id,
stage_id: s.id,
mean_m3s: 100.0,
std_m3s: 10.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
})
.collect();
let hydro = Hydro {
id: hydro_id,
name: "H1".to_string(),
bus_id: EntityId(10),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 5000.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity {
productivity_mw_per_m3s: 0.9,
},
min_turbined_m3s: 0.0,
max_turbined_m3s: 1000.0,
min_generation_mw: 0.0,
max_generation_mw: 900.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 1000.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
SystemBuilder::new()
.buses(vec![bus])
.hydros(vec![hydro])
.stages(stages)
.inflow_models(inflow_models)
.build()
.expect("valid system with user stats")
}
fn setup_partial_estimation_case(case_dir: &std::path::Path, n_years: usize) {
create_required_files(case_dir);
let scenarios = case_dir.join("scenarios");
std::fs::create_dir_all(&scenarios).unwrap();
write_unit_test_inflow_history(
&scenarios.join("inflow_history.parquet"),
1, n_years,
);
std::fs::write(scenarios.join("inflow_seasonal_stats.parquet"), b"sentinel")
.expect("write sentinel");
}
#[test]
fn test_partial_estimation_preserves_user_stats() {
use tempfile::TempDir;
const N_YEARS: usize = 30; let dir = TempDir::new().unwrap();
let case_dir = dir.path();
setup_partial_estimation_case(case_dir, N_YEARS);
let system = build_system_with_user_stats(N_YEARS);
let config = default_config();
let (updated, report, path) = estimate_from_history(system, case_dir, &config)
.expect("partial estimation must succeed");
assert_eq!(
path,
EstimationPath::PartialEstimation,
"expected PartialEstimation path"
);
assert!(
report.is_some(),
"PartialEstimation must return Some(report)"
);
let models = updated.inflow_models();
assert!(
!models.is_empty(),
"partial estimation must produce at least one inflow model"
);
for m in models {
assert_eq!(
m.mean_m3s.to_bits(),
100.0_f64.to_bits(),
"mean_m3s must be bitwise identical to user value 100.0 for stage {}",
m.stage_id
);
assert_eq!(
m.std_m3s.to_bits(),
10.0_f64.to_bits(),
"std_m3s must be bitwise identical to user value 10.0 for stage {}",
m.stage_id
);
assert!(
!m.ar_coefficients.is_empty(),
"ar_coefficients must be non-empty for stage {} (estimated from history)",
m.stage_id
);
}
}
#[test]
fn test_partial_estimation_returns_report() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
setup_partial_estimation_case(case_dir, N_YEARS);
let system = build_system_with_user_stats(N_YEARS);
let config = default_config();
let (_updated, report, _path) = estimate_from_history(system, case_dir, &config)
.expect("partial estimation must succeed");
let report = report.expect("PartialEstimation must return Some(EstimationReport)");
assert_eq!(
report.method, "PACF",
"estimation method must be PACF, got '{}'",
report.method
);
assert_eq!(
report.entries.len(),
1,
"report must contain exactly 1 entry (one hydro), got {}",
report.entries.len()
);
assert!(
report.entries.contains_key(&EntityId(1)),
"report must contain an entry for hydro_id=1"
);
}
#[allow(clippy::too_many_lines, clippy::similar_names)]
fn collect_lag_scale_warnings(
hydro_id: EntityId,
lag_value: f64,
user_mean: f64,
estimated_mean: f64,
include_past_inflows: bool,
) -> Vec<LagScaleWarning> {
use cobre_core::scenario::InflowModel;
use cobre_core::{HydroPastInflows, InitialConditions};
let fitting_stats = vec![SeasonalStats {
entity_id: hydro_id,
stage_id: 0,
mean: estimated_mean,
std: 50.0,
}];
let user_model = InflowModel {
hydro_id,
stage_id: 0,
mean_m3s: user_mean,
std_m3s: 50.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
};
let past_inflows = if include_past_inflows {
vec![HydroPastInflows {
hydro_id,
values_m3s: vec![lag_value],
}]
} else {
vec![]
};
let ic = InitialConditions {
storage: vec![],
filling_storage: vec![],
past_inflows,
};
let system = SystemBuilder::new()
.inflow_models(vec![user_model])
.initial_conditions(ic)
.build()
.expect("valid system");
let estimated_mean_at_stage0: BTreeMap<EntityId, f64> = fitting_stats
.iter()
.filter(|s| s.stage_id == 0)
.map(|s| (s.entity_id, s.mean))
.collect();
let user_mean_at_stage0: BTreeMap<EntityId, f64> = system
.inflow_models()
.iter()
.filter(|m| m.stage_id == 0)
.map(|m| (m.hydro_id, m.mean_m3s))
.collect();
let mut sorted_past_inflows: Vec<&cobre_core::HydroPastInflows> =
system.initial_conditions().past_inflows.iter().collect();
sorted_past_inflows.sort_by_key(|p| p.hydro_id);
let mut warnings: Vec<LagScaleWarning> = Vec::new();
for past in sorted_past_inflows {
let Some(&lv) = past.values_m3s.first() else {
continue;
};
let Some(&est_mean) = estimated_mean_at_stage0.get(&past.hydro_id) else {
continue;
};
let Some(&usr_mean) = user_mean_at_stage0.get(&past.hydro_id) else {
continue;
};
if (lv - est_mean).abs() < (lv - usr_mean).abs() {
warnings.push(LagScaleWarning {
hydro_id: past.hydro_id,
lag_value: lv,
user_mean: usr_mean,
estimated_mean: est_mean,
});
}
}
warnings
}
#[test]
fn test_lag_scale_warning_fires_when_closer_to_estimated() {
let warnings = collect_lag_scale_warnings(
EntityId(1),
500.0, 800.0, 480.0, true, );
assert_eq!(
warnings.len(),
1,
"expected exactly one LagScaleWarning when lag is closer to estimated mean"
);
assert_eq!(
warnings[0].hydro_id,
EntityId(1),
"warning must record the correct hydro_id"
);
assert!((warnings[0].lag_value - 500.0).abs() < f64::EPSILON);
assert!((warnings[0].user_mean - 800.0).abs() < f64::EPSILON);
assert!((warnings[0].estimated_mean - 480.0).abs() < f64::EPSILON);
}
#[test]
fn test_lag_scale_warning_not_fires_when_closer_to_user() {
let warnings = collect_lag_scale_warnings(
EntityId(1),
790.0, 800.0, 480.0, true, );
assert!(
warnings.is_empty(),
"expected no LagScaleWarning when lag is closer to user mean"
);
}
#[test]
fn test_lag_scale_warning_empty_past_inflows() {
let warnings = collect_lag_scale_warnings(
EntityId(1),
500.0, 800.0, 480.0, false, );
assert!(
warnings.is_empty(),
"expected no warnings when past_inflows is empty"
);
}
#[test]
fn test_lag_scale_warning_skips_hydro_without_history() {
use cobre_core::scenario::InflowModel;
use cobre_core::{HydroPastInflows, InitialConditions};
let hydro_id = EntityId(1);
let other_hydro_id = EntityId(2);
let fitting_stats = vec![SeasonalStats {
entity_id: other_hydro_id,
stage_id: 0,
mean: 480.0,
std: 50.0,
}];
let user_model = InflowModel {
hydro_id,
stage_id: 0,
mean_m3s: 800.0,
std_m3s: 50.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
};
let ic = InitialConditions {
storage: vec![],
filling_storage: vec![],
past_inflows: vec![HydroPastInflows {
hydro_id,
values_m3s: vec![500.0],
}],
};
let system = SystemBuilder::new()
.inflow_models(vec![user_model])
.initial_conditions(ic)
.build()
.expect("valid system");
let estimated_mean_at_stage0: BTreeMap<EntityId, f64> = fitting_stats
.iter()
.filter(|s| s.stage_id == 0)
.map(|s| (s.entity_id, s.mean))
.collect();
let user_mean_at_stage0: BTreeMap<EntityId, f64> = system
.inflow_models()
.iter()
.filter(|m| m.stage_id == 0)
.map(|m| (m.hydro_id, m.mean_m3s))
.collect();
let mut sorted_past_inflows: Vec<&cobre_core::HydroPastInflows> =
system.initial_conditions().past_inflows.iter().collect();
sorted_past_inflows.sort_by_key(|p| p.hydro_id);
let mut warnings: Vec<LagScaleWarning> = Vec::new();
for past in sorted_past_inflows {
let Some(&lv) = past.values_m3s.first() else {
continue;
};
let Some(&est_mean) = estimated_mean_at_stage0.get(&past.hydro_id) else {
continue;
};
let Some(&usr_mean) = user_mean_at_stage0.get(&past.hydro_id) else {
continue;
};
if (lv - est_mean).abs() < (lv - usr_mean).abs() {
warnings.push(LagScaleWarning {
hydro_id: past.hydro_id,
lag_value: lv,
user_mean: usr_mean,
estimated_mean: est_mean,
});
}
}
assert!(
warnings.is_empty(),
"hydro without fitting_stats entry must be skipped (no warning)"
);
}
fn default_config() -> Config {
use cobre_io::config::{EstimationConfig, OrderSelectionMethod};
let mut cfg: Config = serde_json::from_str(MINIMAL_CONFIG_JSON).unwrap();
cfg.estimation = EstimationConfig {
max_order: 2,
order_selection: OrderSelectionMethod::Pacf,
min_observations_per_season: 2,
max_coefficient_magnitude: None,
};
cfg
}
const MINIMAL_CONFIG_JSON: &str = r#"{
"training": { "tree_seed": 42 },
"simulation": { "enabled": false, "num_scenarios": 0, "io_channel_capacity": 16 },
"modeling": {},
"policy": {},
"exports": {},
"output": {}
}"#;
fn create_required_files(case_dir: &std::path::Path) {
let _ = std::fs::create_dir_all(case_dir.join("system"));
let _ = std::fs::create_dir_all(case_dir.join("scenarios"));
let write = |name: &str| {
let _ = std::fs::write(case_dir.join(name), b"{}");
};
write("config.json");
write("penalties.json");
write("stages.json");
write("initial_conditions.json");
write("system/buses.json");
write("system/lines.json");
write("system/hydros.json");
write("system/thermals.json");
}
#[test]
fn test_estimation_report_structure() {
let h1 = EntityId(1);
let h2 = EntityId(2);
let n_seasons = 3_usize;
let mut estimates = Vec::new();
for &hydro_id in &[h1, h2] {
for season_id in 0..n_seasons {
estimates.push(ArCoefficientEstimate {
hydro_id,
season_id,
coefficients: vec![0.5, 0.3],
residual_std_ratio: 0.9,
});
}
}
let contribution_reductions: HashMap<EntityId, Vec<ContributionReduction>> = HashMap::new();
let report =
build_estimation_report(&estimates, n_seasons, &contribution_reductions, "PACF");
assert_eq!(report.entries.len(), 2, "expected 2 hydro entries");
for &hydro_id in &[h1, h2] {
let entry = report.entries.get(&hydro_id).expect("entry must exist");
assert_eq!(entry.selected_order, 2, "selected_order must be 2");
assert_eq!(
entry.coefficients.len(),
n_seasons,
"one coefficient vec per season"
);
}
}
#[test]
fn test_estimation_report_empty_for_pacf() {
use cobre_core::temporal::Stage;
let observations: Vec<(EntityId, chrono::NaiveDate, f64)> = vec![];
let seasonal_stats: Vec<SeasonalStats> = vec![];
let stages: Vec<Stage> = vec![];
let hydro_ids: Vec<EntityId> = vec![];
let max_order = 2;
let (_, report) = estimate_ar_coefficients_with_selection(
&observations,
&seasonal_stats,
&stages,
&hydro_ids,
&ArEstimationConfig {
max_order,
max_coeff_magnitude: None,
season_map: None,
},
)
.unwrap();
assert!(
report.entries.is_empty(),
"empty observations must produce empty EstimationReport"
);
}
#[test]
fn test_contribution_order_zero_fallback() {
let result = validate_order_contributions(
0, 1, 1, &[vec![-1.5]], &[10.0], );
assert!(!result.valid);
assert_eq!(result.max_valid_order, 0);
}
#[test]
fn test_contribution_order_zero_input_passes() {
let result = validate_order_contributions(0, 1, 0, &[Vec::new()], &[10.0]);
assert!(result.valid);
assert_eq!(result.max_valid_order, 0);
assert!(result.contributions.is_empty());
}
#[test]
fn test_contribution_stable_model_passes() {
let result = validate_order_contributions(
0, 1, 2, &[vec![0.4, 0.2]], &[10.0], );
assert!(result.valid);
assert_eq!(result.max_valid_order, 2);
assert_eq!(result.contributions.len(), 2);
}
#[test]
fn test_apply_contribution_validation_reduces_explosive() {
let hydro_id = EntityId(1);
let n_seasons = 1;
let mut estimates = vec![ArCoefficientEstimate {
hydro_id,
season_id: 0,
coefficients: vec![0.3, -0.8],
residual_std_ratio: 0.9,
}];
let stats = vec![SeasonalStats {
entity_id: hydro_id,
stage_id: 0,
mean: 100.0,
std: 10.0,
}];
let stats_map: HashMap<(EntityId, usize), &SeasonalStats> =
stats.iter().map(|s| ((s.entity_id, 0_usize), s)).collect();
let mut sigma2_map: HashMap<(EntityId, usize), Vec<f64>> = HashMap::new();
sigma2_map.insert((hydro_id, 0), vec![0.81, 0.75]);
let reductions = apply_contribution_validation(
&mut estimates,
n_seasons,
&stats_map,
&sigma2_map,
None, );
assert_eq!(
estimates[0].coefficients.len(),
1,
"explosive AR(2) should be reduced to AR(1)"
);
assert!((estimates[0].coefficients[0] - 0.3).abs() < 1e-10);
assert!(
(estimates[0].residual_std_ratio - 0.81_f64.sqrt()).abs() < 1e-10,
"residual_std_ratio should be recomputed from sigma2_per_order[0]"
);
let entity_reductions = reductions.get(&hydro_id).expect("should have reductions");
assert_eq!(entity_reductions.len(), 1);
assert_eq!(entity_reductions[0].original_order, 2);
assert_eq!(entity_reductions[0].reduced_order, 1);
assert_eq!(entity_reductions[0].season_id, 0);
}
#[test]
fn test_pimental_like_multi_season_reduction() {
let hydro_id = EntityId(156);
let n_seasons = 12;
let mut estimates: Vec<ArCoefficientEstimate> = (0..n_seasons)
.map(|s| ArCoefficientEstimate {
hydro_id,
season_id: s,
coefficients: if s == 7 {
vec![0.5, 48.9]
} else {
vec![0.1] },
residual_std_ratio: 0.95,
})
.collect();
let stds: Vec<f64> = (0..n_seasons)
.map(|s| if s == 7 { 5.0 } else { 200.0 })
.collect();
let stats: Vec<SeasonalStats> = (0..n_seasons)
.map(|s| SeasonalStats {
entity_id: hydro_id,
stage_id: s as i32,
mean: 100.0,
std: stds[s],
})
.collect();
let stats_map: HashMap<(EntityId, usize), &SeasonalStats> = stats
.iter()
.enumerate()
.map(|(s, st)| ((hydro_id, s), st))
.collect();
let sigma2_map: HashMap<(EntityId, usize), Vec<f64>> = HashMap::new();
let reductions = apply_contribution_validation(
&mut estimates,
n_seasons,
&stats_map,
&sigma2_map,
None, );
let august_order = estimates[7].coefficients.len();
for (s, est) in estimates.iter().enumerate() {
if s != 7 {
assert_eq!(est.coefficients.len(), 1, "season {s} should remain AR(1)");
}
}
if august_order < 2 {
let entity_reductions = reductions.get(&hydro_id).expect("should have reductions");
assert!(
entity_reductions.iter().any(|r| r.season_id == 7),
"August reduction should be recorded"
);
}
}
#[test]
fn test_all_negative_fallback_to_white_noise() {
let hydro_id = EntityId(1);
let n_seasons = 1;
let mut estimates = vec![ArCoefficientEstimate {
hydro_id,
season_id: 0,
coefficients: vec![-2.0],
residual_std_ratio: 0.8,
}];
let stats = vec![SeasonalStats {
entity_id: hydro_id,
stage_id: 0,
mean: 50.0,
std: 10.0,
}];
let stats_map: HashMap<(EntityId, usize), &SeasonalStats> =
stats.iter().map(|s| ((s.entity_id, 0_usize), s)).collect();
let sigma2_map: HashMap<(EntityId, usize), Vec<f64>> = HashMap::new();
let _reductions = apply_contribution_validation(
&mut estimates,
n_seasons,
&stats_map,
&sigma2_map,
None, );
assert!(
estimates[0].coefficients.is_empty(),
"should fall back to order 0"
);
assert!(
(estimates[0].residual_std_ratio - 1.0).abs() < 1e-10,
"white-noise residual ratio should be 1.0"
);
}
#[test]
fn phi1_rejection_sets_order_to_zero() {
let hydro_id = EntityId(1);
let n_seasons = 2;
let mut estimates = vec![
ArCoefficientEstimate {
hydro_id,
season_id: 0,
coefficients: vec![-0.3, 0.5],
residual_std_ratio: 0.8,
},
ArCoefficientEstimate {
hydro_id,
season_id: 1,
coefficients: vec![0.4, 0.2],
residual_std_ratio: 0.7,
},
];
let stats = vec![
SeasonalStats {
entity_id: hydro_id,
stage_id: 0,
mean: 50.0,
std: 10.0,
},
SeasonalStats {
entity_id: hydro_id,
stage_id: 1,
mean: 60.0,
std: 12.0,
},
];
let stats_map: HashMap<(EntityId, usize), &SeasonalStats> = stats
.iter()
.enumerate()
.map(|(i, s)| ((s.entity_id, i), s))
.collect();
let sigma2_map: HashMap<(EntityId, usize), Vec<f64>> = HashMap::new();
let reductions =
apply_contribution_validation(&mut estimates, n_seasons, &stats_map, &sigma2_map, None);
assert!(
estimates[0].coefficients.is_empty(),
"season 0 should be cleared to order 0"
);
assert!(
(estimates[0].residual_std_ratio - 1.0).abs() < 1e-10,
"season 0 residual_std_ratio should be 1.0"
);
assert_eq!(
estimates[1].coefficients,
vec![0.4, 0.2],
"season 1 should be unchanged"
);
let hydro_reductions = reductions.get(&hydro_id).expect("should have reductions");
let r = hydro_reductions
.iter()
.find(|r| r.season_id == 0)
.expect("should have reduction for season 0");
assert_eq!(r.original_order, 2);
assert_eq!(r.reduced_order, 0);
assert!(r.contributions.is_empty());
}
#[test]
fn phi1_rejection_before_contribution_analysis() {
let hydro_id = EntityId(1);
let n_seasons = 1;
let mut estimates = vec![ArCoefficientEstimate {
hydro_id,
season_id: 0,
coefficients: vec![-0.01, 0.5],
residual_std_ratio: 0.8,
}];
let stats = vec![SeasonalStats {
entity_id: hydro_id,
stage_id: 0,
mean: 50.0,
std: 10.0,
}];
let stats_map: HashMap<(EntityId, usize), &SeasonalStats> =
stats.iter().map(|s| ((s.entity_id, 0_usize), s)).collect();
let sigma2_map: HashMap<(EntityId, usize), Vec<f64>> = HashMap::new();
let reductions =
apply_contribution_validation(&mut estimates, n_seasons, &stats_map, &sigma2_map, None);
assert!(
estimates[0].coefficients.is_empty(),
"phi_1 = -0.01 should trigger rejection"
);
assert!(
reductions.contains_key(&hydro_id),
"should have a reduction entry"
);
}
#[test]
fn phi1_zero_is_not_rejected() {
let hydro_id = EntityId(1);
let n_seasons = 1;
let mut estimates = vec![ArCoefficientEstimate {
hydro_id,
season_id: 0,
coefficients: vec![0.0, 0.3],
residual_std_ratio: 0.8,
}];
let stats = vec![SeasonalStats {
entity_id: hydro_id,
stage_id: 0,
mean: 50.0,
std: 10.0,
}];
let stats_map: HashMap<(EntityId, usize), &SeasonalStats> =
stats.iter().map(|s| ((s.entity_id, 0_usize), s)).collect();
let sigma2_map: HashMap<(EntityId, usize), Vec<f64>> = HashMap::new();
let _reductions =
apply_contribution_validation(&mut estimates, n_seasons, &stats_map, &sigma2_map, None);
assert_eq!(
estimates[0].coefficients.len(),
2,
"phi_1 = 0.0 should not be rejected"
);
}
#[test]
fn phi1_rejection_interacts_with_magnitude_bound() {
let hydro_id = EntityId(1);
let n_seasons = 1;
let mut estimates = vec![ArCoefficientEstimate {
hydro_id,
season_id: 0,
coefficients: vec![-50.0, 0.3],
residual_std_ratio: 0.8,
}];
let stats = vec![SeasonalStats {
entity_id: hydro_id,
stage_id: 0,
mean: 50.0,
std: 10.0,
}];
let stats_map: HashMap<(EntityId, usize), &SeasonalStats> =
stats.iter().map(|s| ((s.entity_id, 0_usize), s)).collect();
let sigma2_map: HashMap<(EntityId, usize), Vec<f64>> = HashMap::new();
let reductions = apply_contribution_validation(
&mut estimates,
n_seasons,
&stats_map,
&sigma2_map,
Some(10.0), );
assert!(
estimates[0].coefficients.is_empty(),
"should be cleared to order 0"
);
let hydro_reductions = reductions.get(&hydro_id).expect("should have reductions");
assert_eq!(
hydro_reductions.len(),
1,
"should have exactly 1 reduction entry (magnitude bound only)"
);
}
#[allow(clippy::cast_precision_loss, clippy::cast_lossless)] fn generate_ar_observations(coefficients: &[f64], n: usize) -> Vec<f64> {
let p = coefficients.len();
let mut values = vec![0.0_f64; n + p];
let mut seed: u64 = 42;
for i in p..(n + p) {
seed = seed.wrapping_mul(6_364_136_223_846_793_005).wrapping_add(1);
let noise = ((seed >> 33) as f64 / (u32::MAX as f64) - 0.5) * 2.0;
let mut val = noise;
for (j, c) in coefficients.iter().enumerate() {
val += c * values[i - j - 1];
}
values[i] = val;
}
values[p..].to_vec()
}
#[test]
fn iterative_reduction_terminates_at_zero() {
let hydro_id = EntityId(1);
let n_seasons = 1;
let mut estimates = vec![ArCoefficientEstimate {
hydro_id,
season_id: 0,
coefficients: vec![0.3, -0.8],
residual_std_ratio: 0.8,
}];
let stats = vec![SeasonalStats {
entity_id: hydro_id,
stage_id: 0,
mean: 50.0,
std: 10.0,
}];
let stats_map: HashMap<(EntityId, usize), &SeasonalStats> =
stats.iter().map(|s| ((s.entity_id, 0_usize), s)).collect();
let sigma2_map: HashMap<(EntityId, usize), Vec<f64>> = HashMap::new();
let reductions =
apply_contribution_validation(&mut estimates, n_seasons, &stats_map, &sigma2_map, None);
assert!(
estimates[0].coefficients.len() < 2,
"order should be reduced from 2; got {}",
estimates[0].coefficients.len()
);
assert!(
reductions.contains_key(&hydro_id),
"should have a reduction entry"
);
}
#[test]
fn iterative_reduction_only_affects_failing_seasons() {
let hydro_id = EntityId(1);
let n_seasons = 2;
let mut estimates = vec![
ArCoefficientEstimate {
hydro_id,
season_id: 0,
coefficients: vec![0.3, -0.8],
residual_std_ratio: 0.8,
},
ArCoefficientEstimate {
hydro_id,
season_id: 1,
coefficients: vec![0.4, 0.2],
residual_std_ratio: 0.7,
},
];
let stats = vec![
SeasonalStats {
entity_id: hydro_id,
stage_id: 0,
mean: 50.0,
std: 10.0,
},
SeasonalStats {
entity_id: hydro_id,
stage_id: 1,
mean: 60.0,
std: 10.0,
},
];
let stats_map: HashMap<(EntityId, usize), &SeasonalStats> = stats
.iter()
.enumerate()
.map(|(i, s)| ((s.entity_id, i), s))
.collect();
let sigma2_map: HashMap<(EntityId, usize), Vec<f64>> = HashMap::new();
let _reductions =
apply_contribution_validation(&mut estimates, n_seasons, &stats_map, &sigma2_map, None);
assert!(
estimates[0].coefficients.len() < 2,
"season 0 order should be reduced from 2; got {}",
estimates[0].coefficients.len()
);
assert_eq!(
estimates[1].coefficients,
vec![0.4, 0.2],
"season 1 should be unchanged"
);
}
#[test]
fn iterative_pacf_reduction_with_synthetic_observations() {
let hydro_id = EntityId(1);
let n_seasons = 1;
let obs = generate_ar_observations(&[0.5, 0.2], 100);
let mut group_obs: HashMap<(EntityId, usize), Vec<f64>> = HashMap::new();
group_obs.insert((hydro_id, 0), obs);
let stats = vec![SeasonalStats {
entity_id: hydro_id,
stage_id: 0,
mean: 0.0,
std: 1.0,
}];
let stats_map: HashMap<(EntityId, usize), &SeasonalStats> =
stats.iter().map(|s| ((s.entity_id, 0_usize), s)).collect();
let mut estimates = vec![ArCoefficientEstimate {
hydro_id,
season_id: 0,
coefficients: vec![0.5, 0.2, -5.0], residual_std_ratio: 0.8,
}];
let reductions = iterative_pacf_reduction(
&mut estimates,
n_seasons,
&[hydro_id],
&group_obs,
&stats_map,
3, 1.96,
None,
);
assert!(
estimates[0].coefficients.len() < 3,
"order should be reduced from 3; got {}",
estimates[0].coefficients.len()
);
assert!(
reductions.contains_key(&hydro_id),
"should have a reduction entry"
);
}
#[test]
fn fixed_path_uses_truncation_not_reselection() {
let hydro_id = EntityId(1);
let n_seasons = 1;
let mut estimates = vec![ArCoefficientEstimate {
hydro_id,
season_id: 0,
coefficients: vec![0.5, 0.2, -0.8],
residual_std_ratio: 0.8,
}];
let stats = vec![SeasonalStats {
entity_id: hydro_id,
stage_id: 0,
mean: 50.0,
std: 10.0,
}];
let stats_map: HashMap<(EntityId, usize), &SeasonalStats> =
stats.iter().map(|s| ((s.entity_id, 0_usize), s)).collect();
let sigma2_map: HashMap<(EntityId, usize), Vec<f64>> = HashMap::new();
let reductions =
apply_contribution_validation(&mut estimates, n_seasons, &stats_map, &sigma2_map, None);
let final_order = estimates[0].coefficients.len();
assert!(
final_order <= 2,
"Fixed path should truncate; got order {final_order}"
);
assert!(
reductions.contains_key(&hydro_id),
"should have a reduction entry"
);
let r = &reductions[&hydro_id][0];
assert_eq!(r.original_order, 3);
assert!(r.reduced_order <= 2, "truncation should produce order <= 2");
}
#[test]
fn combined_strategies_produce_correct_reduction_reasons() {
let h1 = EntityId(1);
let h2 = EntityId(2);
let n_seasons = 2;
let mut estimates = vec![
ArCoefficientEstimate {
hydro_id: h1,
season_id: 0,
coefficients: vec![-0.3, 0.5],
residual_std_ratio: 0.8,
},
ArCoefficientEstimate {
hydro_id: h1,
season_id: 1,
coefficients: vec![0.5, 0.2, -0.8],
residual_std_ratio: 0.7,
},
ArCoefficientEstimate {
hydro_id: h2,
season_id: 0,
coefficients: vec![50.0],
residual_std_ratio: 0.9,
},
ArCoefficientEstimate {
hydro_id: h2,
season_id: 1,
coefficients: vec![0.4, 0.2],
residual_std_ratio: 0.7,
},
];
let stats = vec![
SeasonalStats {
entity_id: h1,
stage_id: 0,
mean: 50.0,
std: 10.0,
},
SeasonalStats {
entity_id: h1,
stage_id: 1,
mean: 60.0,
std: 10.0,
},
SeasonalStats {
entity_id: h2,
stage_id: 0,
mean: 70.0,
std: 15.0,
},
SeasonalStats {
entity_id: h2,
stage_id: 1,
mean: 80.0,
std: 12.0,
},
];
let stats_map: HashMap<(EntityId, usize), &SeasonalStats> = stats
.iter()
.enumerate()
.map(|(i, s)| ((s.entity_id, i % 2), s))
.collect();
let sigma2_map: HashMap<(EntityId, usize), Vec<f64>> = HashMap::new();
let reductions = apply_contribution_validation(
&mut estimates,
n_seasons,
&stats_map,
&sigma2_map,
Some(10.0),
);
let h1_reductions = &reductions[&h1];
let h1_s0 = h1_reductions
.iter()
.find(|r| r.season_id == 0)
.expect("should have reduction for H1 S0");
assert_eq!(h1_s0.reason, ReductionReason::Phi1Negative);
assert_eq!(h1_s0.reduced_order, 0);
let h1_s1 = h1_reductions
.iter()
.find(|r| r.season_id == 1)
.expect("should have reduction for H1 S1");
assert_eq!(h1_s1.reason, ReductionReason::NegativeContribution);
let h2_reductions = &reductions[&h2];
let h2_s0 = h2_reductions
.iter()
.find(|r| r.season_id == 0)
.expect("should have reduction for H2 S0");
assert_eq!(h2_s0.reason, ReductionReason::MagnitudeBound);
assert_eq!(h2_s0.reduced_order, 0);
assert!(
!h2_reductions.iter().any(|r| r.season_id == 1),
"H2 S1 should have no reduction"
);
}
fn make_expansion_stage(
index: usize,
id: i32,
season_id: Option<usize>,
) -> cobre_core::temporal::Stage {
use chrono::NaiveDate;
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, StageRiskConfig, StageStateConfig,
};
cobre_core::temporal::Stage {
index,
id,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id,
blocks: vec![Block {
index: 0,
name: "SINGLE".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 10,
noise_method: NoiseMethod::Saa,
},
}
}
#[test]
fn seasonal_stats_to_rows_includes_prestudy_stages() {
let stages = vec![
make_expansion_stage(0, -2, Some(1)),
make_expansion_stage(1, -1, Some(2)),
make_expansion_stage(2, 0, Some(0)),
make_expansion_stage(3, 1, Some(1)),
make_expansion_stage(4, 2, Some(2)),
];
let h1 = EntityId(1);
let stats = vec![
SeasonalStats {
entity_id: h1,
stage_id: 0,
mean: 100.0,
std: 20.0,
},
SeasonalStats {
entity_id: h1,
stage_id: 1,
mean: 110.0,
std: 22.0,
},
SeasonalStats {
entity_id: h1,
stage_id: 2,
mean: 120.0,
std: 24.0,
},
];
let rows = seasonal_stats_to_rows(&stats, &stages);
assert_eq!(rows.len(), 5, "expected 5 rows (3 study + 2 pre-study)");
let prestudy_rows: Vec<_> = rows.iter().filter(|r| r.stage_id < 0).collect();
assert_eq!(
prestudy_rows.len(),
2,
"expected 2 pre-study rows, got {}",
prestudy_rows.len()
);
let neg2 = rows.iter().find(|r| r.stage_id == -2).expect("row for -2");
assert!((neg2.mean_m3s - 110.0).abs() < f64::EPSILON);
assert!((neg2.std_m3s - 22.0).abs() < f64::EPSILON);
let neg1 = rows.iter().find(|r| r.stage_id == -1).expect("row for -1");
assert!((neg1.mean_m3s - 120.0).abs() < f64::EPSILON);
assert!((neg1.std_m3s - 24.0).abs() < f64::EPSILON);
for w in rows.windows(2) {
assert!(
(w[0].hydro_id.0, w[0].stage_id) <= (w[1].hydro_id.0, w[1].stage_id),
"rows not sorted"
);
}
}
#[test]
fn ar_estimates_to_rows_includes_prestudy_stages() {
let stages = vec![
make_expansion_stage(0, -2, Some(1)),
make_expansion_stage(1, -1, Some(2)),
make_expansion_stage(2, 0, Some(0)),
make_expansion_stage(3, 1, Some(1)),
make_expansion_stage(4, 2, Some(2)),
];
let h1 = EntityId(1);
let ar_estimates = vec![
ArCoefficientEstimate {
hydro_id: h1,
season_id: 0,
coefficients: vec![0.3],
residual_std_ratio: 0.9,
},
ArCoefficientEstimate {
hydro_id: h1,
season_id: 1,
coefficients: vec![0.4],
residual_std_ratio: 0.85,
},
ArCoefficientEstimate {
hydro_id: h1,
season_id: 2,
coefficients: vec![0.5],
residual_std_ratio: 0.8,
},
];
let rows = ar_estimates_to_rows(&ar_estimates, &stages);
assert_eq!(rows.len(), 5, "expected 5 rows");
let prestudy_rows: Vec<_> = rows.iter().filter(|r| r.stage_id < 0).collect();
assert_eq!(prestudy_rows.len(), 2);
let neg2 = rows.iter().find(|r| r.stage_id == -2).expect("row for -2");
assert!((neg2.coefficient - 0.4).abs() < f64::EPSILON);
assert!((neg2.residual_std_ratio - 0.85).abs() < f64::EPSILON);
let neg1 = rows.iter().find(|r| r.stage_id == -1).expect("row for -1");
assert!((neg1.coefficient - 0.5).abs() < f64::EPSILON);
assert!((neg1.residual_std_ratio - 0.8).abs() < f64::EPSILON);
}
#[test]
fn full_estimation_produces_prestudy_inflow_models() {
use cobre_io::scenarios::assemble_inflow_models;
let stages = vec![
make_expansion_stage(0, -2, Some(1)),
make_expansion_stage(1, -1, Some(2)),
make_expansion_stage(2, 0, Some(0)),
make_expansion_stage(3, 1, Some(1)),
make_expansion_stage(4, 2, Some(2)),
];
let h1 = EntityId(1);
let stats = vec![
SeasonalStats {
entity_id: h1,
stage_id: 0,
mean: 100.0,
std: 20.0,
},
SeasonalStats {
entity_id: h1,
stage_id: 1,
mean: 110.0,
std: 22.0,
},
SeasonalStats {
entity_id: h1,
stage_id: 2,
mean: 120.0,
std: 24.0,
},
];
let stats_rows = seasonal_stats_to_rows(&stats, &stages);
let ar_ests = vec![
ArCoefficientEstimate {
hydro_id: h1,
season_id: 0,
coefficients: vec![0.3],
residual_std_ratio: 0.9,
},
ArCoefficientEstimate {
hydro_id: h1,
season_id: 1,
coefficients: vec![0.4],
residual_std_ratio: 0.85,
},
ArCoefficientEstimate {
hydro_id: h1,
season_id: 2,
coefficients: vec![0.5],
residual_std_ratio: 0.8,
},
];
let coeff_rows = ar_estimates_to_rows(&ar_ests, &stages);
let inflow_models =
assemble_inflow_models(stats_rows, coeff_rows).expect("assembly should succeed");
assert!(
inflow_models.iter().any(|m| m.stage_id < 0),
"expected pre-study InflowModel entries (negative stage_id)"
);
let prestudy_neg2 = inflow_models
.iter()
.find(|m| m.stage_id == -2)
.expect("InflowModel for stage -2");
assert!((prestudy_neg2.mean_m3s - 110.0).abs() < f64::EPSILON);
assert!((prestudy_neg2.std_m3s - 22.0).abs() < f64::EPSILON);
}
#[allow(
clippy::cast_precision_loss,
clippy::cast_sign_loss,
clippy::cast_lossless
)]
fn simulate_two_season_par2(
phi_1: f64,
phi_2: f64,
n_years: usize,
seed: u64,
) -> (Vec<f64>, Vec<f64>) {
let n_total = n_years * 2;
let burnin = 200;
let n_generate = n_total + burnin;
let mut values = vec![0.0_f64; n_generate + 2];
let mut lcg: u64 = seed;
let lcg_next = |s: u64| -> u64 {
s.wrapping_mul(6_364_136_223_846_793_005)
.wrapping_add(1_442_695_040_888_963_407)
};
for i in 2..n_generate + 2 {
lcg = lcg_next(lcg);
let u1 = (lcg >> 11) as f64 / (1u64 << 53) as f64;
lcg = lcg_next(lcg);
let u2 = (lcg >> 11) as f64 / (1u64 << 53) as f64;
let u1_safe = u1.max(1e-15);
let noise = (-2.0 * u1_safe.ln()).sqrt() * (2.0 * std::f64::consts::PI * u2).cos();
values[i] = phi_1 * values[i - 1] + phi_2 * values[i - 2] + noise;
}
let start = burnin + 2;
let mut obs_s0 = Vec::with_capacity(n_years);
let mut obs_s1 = Vec::with_capacity(n_years);
for y in 0..n_years {
obs_s0.push(values[start + y * 2]);
obs_s1.push(values[start + y * 2 + 1]);
}
(obs_s0, obs_s1)
}
fn make_two_season_stage(
index: usize,
id: i32,
season_id: usize,
year: i32,
first_half: bool,
) -> cobre_core::temporal::Stage {
use chrono::NaiveDate;
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, StageRiskConfig, StageStateConfig,
};
let (start_date, end_date) = if first_half {
(
NaiveDate::from_ymd_opt(year, 1, 1).unwrap(),
NaiveDate::from_ymd_opt(year, 7, 1).unwrap(),
)
} else {
(
NaiveDate::from_ymd_opt(year, 7, 1).unwrap(),
NaiveDate::from_ymd_opt(year + 1, 1, 1).unwrap(),
)
};
cobre_core::temporal::Stage {
index,
id,
start_date,
end_date,
season_id: Some(season_id),
blocks: vec![Block {
index: 0,
name: "SINGLE".to_string(),
duration_hours: 4380.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: false,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 1,
noise_method: NoiseMethod::Saa,
},
}
}
fn two_season_map() -> cobre_core::temporal::SeasonMap {
use cobre_core::temporal::{SeasonCycleType, SeasonDefinition, SeasonMap};
SeasonMap {
cycle_type: SeasonCycleType::Custom,
seasons: vec![
SeasonDefinition {
id: 0,
label: "H1".to_string(),
month_start: 1,
day_start: Some(1),
month_end: Some(6),
day_end: Some(30),
},
SeasonDefinition {
id: 1,
label: "H2".to_string(),
month_start: 7,
day_start: Some(1),
month_end: Some(12),
day_end: Some(31),
},
],
}
}
#[test]
#[allow(clippy::cast_precision_loss)]
fn iterative_pacf_reduction_stable_par2_not_spuriously_reduced() {
use cobre_stochastic::par::fitting::{
estimate_periodic_ar_coefficients, periodic_pacf, select_order_pacf,
};
let hydro_id = EntityId(1);
let n_seasons = 2;
let n_years = 500_usize;
let (obs_s0, obs_s1) = simulate_two_season_par2(0.7, 0.15, n_years, 137);
let mut group_obs: HashMap<(EntityId, usize), Vec<f64>> = HashMap::new();
group_obs.insert((hydro_id, 0), obs_s0.clone());
group_obs.insert((hydro_id, 1), obs_s1.clone());
let mean_s0 = obs_s0.iter().sum::<f64>() / obs_s0.len() as f64;
let mean_s1 = obs_s1.iter().sum::<f64>() / obs_s1.len() as f64;
let std_s0 = {
let v = obs_s0.iter().map(|x| (x - mean_s0).powi(2)).sum::<f64>()
/ (obs_s0.len() - 1) as f64;
v.sqrt()
};
let std_s1 = {
let v = obs_s1.iter().map(|x| (x - mean_s1).powi(2)).sum::<f64>()
/ (obs_s1.len() - 1) as f64;
v.sqrt()
};
let stats_storage = vec![
SeasonalStats {
entity_id: hydro_id,
stage_id: 0,
mean: mean_s0,
std: std_s0,
},
SeasonalStats {
entity_id: hydro_id,
stage_id: 1,
mean: mean_s1,
std: std_s1,
},
];
let stats_map: HashMap<(EntityId, usize), &SeasonalStats> = stats_storage
.iter()
.enumerate()
.map(|(s, st)| ((hydro_id, s), st))
.collect();
let stats_by_season_pop = {
let n = obs_s0.len() as f64;
let mu0 = mean_s0;
let mu1 = mean_s1;
let s0 = (obs_s0.iter().map(|x| (x - mu0).powi(2)).sum::<f64>() / n).sqrt();
let s1 = (obs_s1.iter().map(|x| (x - mu1).powi(2)).sum::<f64>() / n).sqrt();
vec![(mu0, s0), (mu1, s1)]
};
let obs_refs: Vec<&[f64]> = vec![&obs_s0, &obs_s1];
let z_alpha = 1.96_f64;
let max_order = 2_usize;
let mut estimates: Vec<ArCoefficientEstimate> = Vec::new();
for season in 0..n_seasons {
let n_obs = obs_refs[season].len();
let pacf_values = periodic_pacf(
season,
max_order,
n_seasons,
&obs_refs,
&stats_by_season_pop,
);
let selected = select_order_pacf(&pacf_values, n_obs, z_alpha).selected_order;
let yw = estimate_periodic_ar_coefficients(
season,
selected,
n_seasons,
&obs_refs,
&stats_by_season_pop,
);
estimates.push(ArCoefficientEstimate {
hydro_id,
season_id: season,
coefficients: yw.coefficients,
residual_std_ratio: yw.residual_std_ratio,
});
}
for est in &estimates {
assert_eq!(
est.coefficients.len(),
2,
"season {} should select order 2; got {}",
est.season_id,
est.coefficients.len()
);
}
let coeffs_before: Vec<Vec<f64>> =
estimates.iter().map(|e| e.coefficients.clone()).collect();
let reductions = iterative_pacf_reduction(
&mut estimates,
n_seasons,
&[hydro_id],
&group_obs,
&stats_map,
max_order,
z_alpha,
None,
);
for est in &estimates {
assert_eq!(
est.coefficients.len(),
2,
"stable PAR(2) season {} should remain order 2; got {}",
est.season_id,
est.coefficients.len()
);
}
for (est, before) in estimates.iter().zip(coeffs_before.iter()) {
if est.coefficients.len() == before.len() {
for (a, b) in est.coefficients.iter().zip(before.iter()) {
assert!(
(a - b).abs() < 1e-10,
"season {} coefficient drift: {a} vs {b}",
est.season_id
);
}
} else {
let yw_direct = estimate_periodic_ar_coefficients(
est.season_id,
est.coefficients.len(),
n_seasons,
&obs_refs,
&stats_by_season_pop,
);
for (a, b) in est.coefficients.iter().zip(yw_direct.coefficients.iter()) {
assert!(
(a - b).abs() < 1e-10,
"season {} post-reduction coeff {a} vs YW {b}",
est.season_id
);
}
}
}
assert!(!reductions.contains_key(&hydro_id));
}
#[test]
#[allow(clippy::cast_precision_loss)]
fn roundtrip_estimation_two_season_par2_recovers_coefficients() {
use chrono::NaiveDate;
let hydro_id = EntityId(1);
let n_years = 1000_usize;
let true_phi1 = 0.7_f64;
let true_phi2 = 0.15_f64;
let (obs_s0, obs_s1) = simulate_two_season_par2(true_phi1, true_phi2, n_years, 42);
let ref_year = 2000_i32;
let stages = vec![
make_two_season_stage(0, 0, 0, ref_year, true),
make_two_season_stage(1, 1, 1, ref_year, false),
];
let n_f = n_years as f64;
let mu0 = obs_s0.iter().sum::<f64>() / n_f;
let mu1 = obs_s1.iter().sum::<f64>() / n_f;
let std0 = (obs_s0.iter().map(|x| (x - mu0).powi(2)).sum::<f64>() / (n_f - 1.0)).sqrt();
let std1 = (obs_s1.iter().map(|x| (x - mu1).powi(2)).sum::<f64>() / (n_f - 1.0)).sqrt();
let seasonal_stats = vec![
SeasonalStats {
entity_id: hydro_id,
stage_id: 0, mean: mu0,
std: std0,
},
SeasonalStats {
entity_id: hydro_id,
stage_id: 1, mean: mu1,
std: std1,
},
];
let season_map = two_season_map();
let mut observations: Vec<(EntityId, NaiveDate, f64)> = Vec::new();
for y in 0..n_years {
let year = (1970 + y) as i32;
observations.push((
hydro_id,
NaiveDate::from_ymd_opt(year, 1, 1).unwrap(),
obs_s0[y],
));
observations.push((
hydro_id,
NaiveDate::from_ymd_opt(year, 7, 1).unwrap(),
obs_s1[y],
));
}
let (estimates, _report) = estimate_ar_coefficients_with_selection(
&observations,
&seasonal_stats,
&stages,
&[hydro_id],
&ArEstimationConfig {
max_order: 2,
max_coeff_magnitude: None,
season_map: Some(&season_map),
},
)
.expect("estimation must succeed");
let est_s0 = estimates
.iter()
.find(|e| e.hydro_id == hydro_id && e.season_id == 0)
.expect("season 0 estimate must exist");
let est_s1 = estimates
.iter()
.find(|e| e.hydro_id == hydro_id && e.season_id == 1)
.expect("season 1 estimate must exist");
for est in [est_s0, est_s1] {
assert!(
est.coefficients.len() >= 2,
"season {} should select at least order 2; got {}",
est.season_id,
est.coefficients.len()
);
assert!(
(est.coefficients[0] - true_phi1).abs() < 0.15,
"season {} phi_1={:.4} should be within 0.15 of true {true_phi1:.4}",
est.season_id,
est.coefficients[0]
);
assert!(
(est.coefficients[1] - true_phi2).abs() < 0.15,
"season {} phi_2={:.4} should be within 0.15 of true {true_phi2:.4}",
est.season_id,
est.coefficients[1]
);
assert!(
est.residual_std_ratio.is_finite() && est.residual_std_ratio > 0.0,
"season {} residual_std_ratio={} should be positive finite",
est.season_id,
est.residual_std_ratio
);
}
}
#[test]
#[allow(clippy::cast_sign_loss)]
fn test_ar_rows_to_estimates_groups_by_season() {
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, StageRiskConfig, StageStateConfig,
};
let make_stage = |id: i32, season_id: usize| cobre_core::temporal::Stage {
index: id as usize,
id,
start_date: chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
end_date: chrono::NaiveDate::from_ymd_opt(1970, 7, 1).unwrap(),
season_id: Some(season_id),
blocks: vec![Block {
index: 0,
name: "T".to_string(),
duration_hours: 1.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: false,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 1,
noise_method: NoiseMethod::Saa,
},
};
let stages = vec![make_stage(0, 0), make_stage(1, 0), make_stage(2, 1)];
let rows = vec![
InflowArCoefficientRow {
hydro_id: EntityId(1),
stage_id: 0,
lag: 1,
coefficient: 0.50,
residual_std_ratio: 0.85,
},
InflowArCoefficientRow {
hydro_id: EntityId(1),
stage_id: 1,
lag: 1,
coefficient: 0.50,
residual_std_ratio: 0.85,
},
InflowArCoefficientRow {
hydro_id: EntityId(1),
stage_id: 2,
lag: 1,
coefficient: 0.60,
residual_std_ratio: 0.80,
},
InflowArCoefficientRow {
hydro_id: EntityId(2),
stage_id: 0,
lag: 1,
coefficient: 0.40,
residual_std_ratio: 0.90,
},
InflowArCoefficientRow {
hydro_id: EntityId(2),
stage_id: 1,
lag: 1,
coefficient: 0.40,
residual_std_ratio: 0.90,
},
InflowArCoefficientRow {
hydro_id: EntityId(2),
stage_id: 2,
lag: 1,
coefficient: 0.35,
residual_std_ratio: 0.88,
},
];
let estimates = ar_rows_to_estimates(&rows, &stages);
assert_eq!(
estimates.len(),
4,
"expected 4 estimates (2 hydros * 2 seasons), got {}",
estimates.len()
);
let e = estimates
.iter()
.find(|e| e.hydro_id == EntityId(1) && e.season_id == 0)
.expect("hydro 1, season 0 estimate must exist");
assert_eq!(e.coefficients.len(), 1, "AR(1) must have 1 coefficient");
assert!(
(e.coefficients[0] - 0.50).abs() < f64::EPSILON,
"coeff must be 0.50, got {}",
e.coefficients[0]
);
assert!(
(e.residual_std_ratio - 0.85).abs() < f64::EPSILON,
"residual_std_ratio must be 0.85"
);
let e = estimates
.iter()
.find(|e| e.hydro_id == EntityId(1) && e.season_id == 1)
.expect("hydro 1, season 1 estimate must exist");
assert_eq!(e.coefficients.len(), 1);
assert!((e.coefficients[0] - 0.60).abs() < f64::EPSILON);
assert!((e.residual_std_ratio - 0.80).abs() < f64::EPSILON);
let e = estimates
.iter()
.find(|e| e.hydro_id == EntityId(2) && e.season_id == 0)
.expect("hydro 2, season 0 estimate must exist");
assert_eq!(e.coefficients.len(), 1);
assert!((e.coefficients[0] - 0.40).abs() < f64::EPSILON);
let e = estimates
.iter()
.find(|e| e.hydro_id == EntityId(2) && e.season_id == 1)
.expect("hydro 2, season 1 estimate must exist");
assert_eq!(e.coefficients.len(), 1);
assert!((e.coefficients[0] - 0.35).abs() < f64::EPSILON);
}
fn write_unit_test_ar_coefficients(
path: &std::path::Path,
hydro_id: i32,
stages: &[cobre_core::temporal::Stage],
coefficient: f64,
residual_std_ratio: f64,
) {
use arrow::array::{Float64Array, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use std::sync::Arc;
let schema = Arc::new(Schema::new(vec![
Field::new("hydro_id", DataType::Int32, false),
Field::new("stage_id", DataType::Int32, false),
Field::new("lag", DataType::Int32, false),
Field::new("coefficient", DataType::Float64, false),
Field::new("residual_std_ratio", DataType::Float64, false),
]));
let n = stages.len();
let hydro_ids: Vec<i32> = vec![hydro_id; n];
let stage_ids: Vec<i32> = stages.iter().map(|s| s.id).collect();
let lags: Vec<i32> = vec![1; n];
let coefficients: Vec<f64> = vec![coefficient; n];
let ratios: Vec<f64> = vec![residual_std_ratio; n];
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(hydro_ids)),
Arc::new(Int32Array::from(stage_ids)),
Arc::new(Int32Array::from(lags)),
Arc::new(Float64Array::from(coefficients)),
Arc::new(Float64Array::from(ratios)),
],
)
.expect("valid batch");
let file = std::fs::File::create(path).expect("create parquet file");
let mut writer = ArrowWriter::try_new(file, schema, None).expect("ArrowWriter");
writer.write(&batch).expect("write batch");
writer.close().expect("close writer");
}
#[allow(clippy::cast_possible_wrap)]
fn build_system_empty_models(n_years: usize) -> System {
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::{Bus, DeficitSegment, EntityId, SystemBuilder};
let hydro_id = EntityId(1);
let bus = Bus {
id: EntityId(10),
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: Some(f64::INFINITY),
cost_per_mwh: 3000.0,
}],
excess_cost: 0.0,
};
let ref_year = 1970_i32;
let mut stages = Vec::with_capacity(n_years * 2);
for y in 0..n_years {
let year = ref_year + y as i32;
stages.push(make_two_season_stage(y * 2, (y * 2) as i32, 0, year, true));
stages.push(make_two_season_stage(
y * 2 + 1,
(y * 2 + 1) as i32,
1,
year,
false,
));
}
let hydro = Hydro {
id: hydro_id,
name: "H1".to_string(),
bus_id: EntityId(10),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 5000.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity {
productivity_mw_per_m3s: 0.9,
},
min_turbined_m3s: 0.0,
max_turbined_m3s: 1000.0,
min_generation_mw: 0.0,
max_generation_mw: 900.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 1000.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
SystemBuilder::new()
.buses(vec![bus])
.hydros(vec![hydro])
.stages(stages)
.build()
.expect("valid system with empty inflow models")
}
#[allow(clippy::cast_possible_wrap)]
fn setup_user_ar_case(
case_dir: &std::path::Path,
n_years: usize,
ar_coefficient: f64,
residual_std_ratio: f64,
) {
create_required_files(case_dir);
let scenarios = case_dir.join("scenarios");
std::fs::create_dir_all(&scenarios).unwrap();
write_unit_test_inflow_history(&scenarios.join("inflow_history.parquet"), 1, n_years);
let ref_year = 1970_i32;
let mut stages = Vec::with_capacity(n_years * 2);
for y in 0..n_years {
let year = ref_year + y as i32;
stages.push(make_two_season_stage(y * 2, (y * 2) as i32, 0, year, true));
stages.push(make_two_season_stage(
y * 2 + 1,
(y * 2 + 1) as i32,
1,
year,
false,
));
}
write_unit_test_ar_coefficients(
&scenarios.join("inflow_ar_coefficients.parquet"),
1,
&stages,
ar_coefficient,
residual_std_ratio,
);
}
#[test]
fn test_user_ar_estimation_preserves_ar_coefficients() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
const KNOWN_COEFF: f64 = 0.72;
const KNOWN_RATIO: f64 = 0.69;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
setup_user_ar_case(case_dir, N_YEARS, KNOWN_COEFF, KNOWN_RATIO);
let system = build_system_empty_models(N_YEARS);
let config = default_config();
let (updated, report, path) = estimate_from_history(system, case_dir, &config)
.expect("UserArHistoryStats estimation must succeed");
assert_eq!(
path,
EstimationPath::UserArHistoryStats,
"expected UserArHistoryStats path"
);
assert!(
report.is_some(),
"UserArHistoryStats must return Some(report)"
);
let models = updated.inflow_models();
assert!(
!models.is_empty(),
"estimation must produce at least one inflow model"
);
for m in models {
assert_eq!(
m.ar_coefficients.len(),
1,
"every model must have AR(1) coefficients (lag 1 only), stage {}",
m.stage_id
);
assert_eq!(
m.ar_coefficients[0].to_bits(),
KNOWN_COEFF.to_bits(),
"ar_coefficients[0] must be bitwise identical to {KNOWN_COEFF} for stage {}",
m.stage_id
);
assert_eq!(
m.residual_std_ratio.to_bits(),
KNOWN_RATIO.to_bits(),
"residual_std_ratio must be bitwise identical to {KNOWN_RATIO} for stage {}",
m.stage_id
);
}
}
#[test]
fn test_user_ar_estimation_estimates_stats_from_history() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
setup_user_ar_case(case_dir, N_YEARS, 0.55, 0.83);
let system = build_system_empty_models(N_YEARS);
let config = default_config();
let (updated, _report, _path) = estimate_from_history(system, case_dir, &config)
.expect("UserArHistoryStats estimation must succeed");
let models = updated.inflow_models();
assert!(
!models.is_empty(),
"estimation must produce at least one inflow model"
);
for m in models {
assert!(
m.mean_m3s.is_finite() && m.mean_m3s > 0.0,
"mean_m3s must be finite and positive, got {} for stage {}",
m.mean_m3s,
m.stage_id
);
assert!(
m.std_m3s.is_finite() && m.std_m3s >= 0.0,
"std_m3s must be finite and non-negative, got {} for stage {}",
m.std_m3s,
m.stage_id
);
}
}
#[test]
fn test_user_ar_estimation_returns_user_provided_report() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
setup_user_ar_case(case_dir, N_YEARS, 0.55, 0.83);
let system = build_system_empty_models(N_YEARS);
let config = default_config();
let (_updated, report, _path) = estimate_from_history(system, case_dir, &config)
.expect("UserArHistoryStats estimation must succeed");
let report = report.expect("UserArHistoryStats must return Some(EstimationReport)");
assert_eq!(
report.method, "user_provided",
"report method must be 'user_provided', got '{}'",
report.method
);
assert!(
report.entries.is_empty(),
"report entries must be empty (no AR was estimated), got {} entries",
report.entries.len()
);
}
fn make_hydro(hydro_id: EntityId, bus_id: EntityId) -> cobre_core::entities::hydro::Hydro {
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel};
Hydro {
id: hydro_id,
name: format!("H{}", hydro_id.0),
bus_id,
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 5000.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity {
productivity_mw_per_m3s: 0.9,
},
min_turbined_m3s: 0.0,
max_turbined_m3s: 1000.0,
min_generation_mw: 0.0,
max_generation_mw: 900.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: cobre_core::entities::hydro::HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 1000.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
}
}
#[allow(clippy::cast_possible_wrap)]
fn build_two_hydro_system_selective_stats(
n_years: usize,
all_hydro_ids: &[EntityId],
stats_hydro_ids: &[EntityId],
) -> System {
use cobre_core::scenario::InflowModel;
use cobre_core::{Bus, DeficitSegment, SystemBuilder};
let bus_id = EntityId(10);
let bus = Bus {
id: bus_id,
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: Some(f64::INFINITY),
cost_per_mwh: 3000.0,
}],
excess_cost: 0.0,
};
let ref_year = 1970_i32;
let mut stages = Vec::with_capacity(n_years * 2);
for y in 0..n_years {
let year = ref_year + y as i32;
stages.push(make_two_season_stage(y * 2, (y * 2) as i32, 0, year, true));
stages.push(make_two_season_stage(
y * 2 + 1,
(y * 2 + 1) as i32,
1,
year,
false,
));
}
let inflow_models: Vec<InflowModel> = stats_hydro_ids
.iter()
.flat_map(|&hid| {
stages.iter().map(move |s| InflowModel {
hydro_id: hid,
stage_id: s.id,
mean_m3s: 100.0,
std_m3s: 10.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
})
})
.collect();
let hydros: Vec<_> = all_hydro_ids
.iter()
.map(|&hid| make_hydro(hid, bus_id))
.collect();
SystemBuilder::new()
.buses(vec![bus])
.hydros(hydros)
.stages(stages)
.inflow_models(inflow_models)
.build()
.expect("valid two-hydro system")
}
fn write_history_for_hydros(path: &std::path::Path, hydro_ids: &[i32], n_years: usize) {
use arrow::array::{Date32Array, Float64Array, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use chrono::NaiveDate;
use parquet::arrow::ArrowWriter;
use std::sync::Arc;
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
let date_to_days = |d: NaiveDate| -> i32 {
i32::try_from((d - epoch).num_days()).expect("date in Date32 range")
};
let (obs_s0, obs_s1) = simulate_two_season_par2(0.7, 0.15, n_years, 99);
let mut ids: Vec<i32> = Vec::new();
let mut dates: Vec<i32> = Vec::new();
let mut values: Vec<f64> = Vec::new();
for &hid in hydro_ids {
for y in 0..n_years {
let year = (1970 + y) as i32;
ids.push(hid);
dates.push(date_to_days(NaiveDate::from_ymd_opt(year, 1, 15).unwrap()));
values.push(obs_s0[y] + 300.0);
ids.push(hid);
dates.push(date_to_days(NaiveDate::from_ymd_opt(year, 7, 15).unwrap()));
values.push(obs_s1[y] + 300.0);
}
}
let schema = Arc::new(Schema::new(vec![
Field::new("hydro_id", DataType::Int32, false),
Field::new("date", DataType::Date32, false),
Field::new("value_m3s", DataType::Float64, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(ids)),
Arc::new(Date32Array::from(dates)),
Arc::new(Float64Array::from(values)),
],
)
.expect("valid batch");
let file = std::fs::File::create(path).expect("create parquet file");
let mut writer = ArrowWriter::try_new(file, schema, None).expect("ArrowWriter");
writer.write(&batch).expect("write batch");
writer.close().expect("close writer");
}
#[test]
fn test_partial_estimation_direction_a_missing_stats() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
create_required_files(case_dir);
let scenarios = case_dir.join("scenarios");
std::fs::create_dir_all(&scenarios).unwrap();
write_history_for_hydros(&scenarios.join("inflow_history.parquet"), &[1, 2], N_YEARS);
std::fs::write(scenarios.join("inflow_seasonal_stats.parquet"), b"sentinel")
.expect("write sentinel");
let system = build_two_hydro_system_selective_stats(
N_YEARS,
&[EntityId(1), EntityId(2)],
&[EntityId(1)], );
let config = default_config();
let result = estimate_from_history(system, case_dir, &config);
assert!(
result.is_err(),
"Direction A must return Err when hydro 2 has AR estimates but no user stats"
);
let err = result.unwrap_err();
let description = err.to_string();
assert!(
description.contains('2'),
"error description must contain the uncovered hydro ID '2', got: {description}"
);
}
#[test]
fn test_partial_estimation_direction_b_white_noise_fallback() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
create_required_files(case_dir);
let scenarios = case_dir.join("scenarios");
std::fs::create_dir_all(&scenarios).unwrap();
write_history_for_hydros(&scenarios.join("inflow_history.parquet"), &[1], N_YEARS);
std::fs::write(scenarios.join("inflow_seasonal_stats.parquet"), b"sentinel")
.expect("write sentinel");
let system = build_two_hydro_system_selective_stats(
N_YEARS,
&[EntityId(1), EntityId(2)],
&[EntityId(1), EntityId(2)], );
let config = default_config();
let (updated, report, path) = estimate_from_history(system, case_dir, &config)
.expect("Direction B must succeed (not an error)");
assert_eq!(
path,
EstimationPath::PartialEstimation,
"expected PartialEstimation path"
);
let report = report.expect("PartialEstimation must return Some(EstimationReport)");
assert_eq!(
report.white_noise_fallbacks,
vec![EntityId(2)],
"white_noise_fallbacks must be [EntityId(2)], got {:?}",
report.white_noise_fallbacks
);
let hydro2_models: Vec<_> = updated
.inflow_models()
.iter()
.filter(|m| m.hydro_id == EntityId(2))
.collect();
assert!(
!hydro2_models.is_empty(),
"returned system must have inflow models for hydro 2"
);
for m in &hydro2_models {
assert!(
m.ar_coefficients.is_empty(),
"hydro 2 must have empty ar_coefficients (white-noise fallback), stage {}",
m.stage_id
);
}
}
#[test]
fn test_partial_estimation_exact_coverage_no_fallback() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
setup_partial_estimation_case(case_dir, N_YEARS);
let system = build_system_with_user_stats(N_YEARS);
let config = default_config();
let (_updated, report, _path) =
estimate_from_history(system, case_dir, &config).expect("exact coverage must succeed");
let report = report.expect("PartialEstimation must return Some(EstimationReport)");
assert!(
report.white_noise_fallbacks.is_empty(),
"white_noise_fallbacks must be empty for exact coverage, got {:?}",
report.white_noise_fallbacks
);
}
#[test]
fn test_full_estimation_report_has_empty_fallbacks() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
create_required_files(case_dir);
let scenarios = case_dir.join("scenarios");
std::fs::create_dir_all(&scenarios).unwrap();
write_history_for_hydros(&scenarios.join("inflow_history.parquet"), &[1], N_YEARS);
let system = build_two_hydro_system_selective_stats(
N_YEARS,
&[EntityId(1)],
&[], );
let config = default_config();
let (_, report, path) =
estimate_from_history(system, case_dir, &config).expect("FullEstimation must succeed");
assert_eq!(
path,
EstimationPath::FullEstimation,
"expected FullEstimation path"
);
let report = report.expect("FullEstimation must return Some(EstimationReport)");
assert!(
report.white_noise_fallbacks.is_empty(),
"FullEstimation must never populate white_noise_fallbacks, got {:?}",
report.white_noise_fallbacks
);
}
fn collect_std_ratio_warnings(
hydro_id: EntityId,
user_stds: &[f64],
est_stds: &[f64],
) -> Vec<StdRatioDivergence> {
use cobre_core::scenario::InflowModel;
assert_eq!(
user_stds.len(),
est_stds.len(),
"user_stds and est_stds must have equal length"
);
let n = user_stds.len();
let stages: Vec<cobre_core::temporal::Stage> = (0..n)
.map(|i| {
let year = 1970_i32;
let first_half = i % 2 == 0;
make_two_season_stage(i, i as i32, i, year, first_half)
})
.collect();
let user_models: Vec<InflowModel> = (0..n)
.map(|i| InflowModel {
hydro_id,
stage_id: i as i32,
mean_m3s: 100.0,
std_m3s: user_stds[i],
ar_coefficients: vec![],
residual_std_ratio: 1.0,
})
.collect();
let system = SystemBuilder::new()
.inflow_models(user_models)
.stages(stages.clone())
.build()
.expect("valid system");
let fitting_stats: Vec<SeasonalStats> = (0..n)
.map(|i| SeasonalStats {
entity_id: hydro_id,
stage_id: i as i32,
mean: 100.0,
std: est_stds[i],
})
.collect();
check_std_ratio_divergence(&system, &fitting_stats, &stages)
}
#[test]
fn test_std_ratio_divergence_fires_when_ratios_diverge() {
let warnings = collect_std_ratio_warnings(EntityId(1), &[100.0, 20.0], &[100.0, 100.0]);
assert!(
!warnings.is_empty(),
"expected at least one StdRatioDivergence when ratio diverges by 5x"
);
let pair_0_1 = warnings.iter().find(|w| w.season_a == 0 && w.season_b == 1);
assert!(
pair_0_1.is_some(),
"expected a warning for season pair 0→1, got {warnings:?}"
);
let w = pair_0_1.unwrap();
assert_eq!(
w.hydro_id,
EntityId(1),
"warning must record the correct hydro_id"
);
assert!(
(w.divergence - 5.0).abs() < 1e-10,
"divergence for pair 0→1 must be 5.0, got {}",
w.divergence
);
}
#[test]
fn test_std_ratio_divergence_not_fires_when_similar() {
let warnings = collect_std_ratio_warnings(EntityId(1), &[100.0, 20.0], &[90.0, 18.0]);
assert!(
warnings.is_empty(),
"expected no StdRatioDivergence when ratios are similar, got {warnings:?}"
);
}
#[test]
fn test_std_ratio_divergence_skips_near_zero_std() {
let warnings = collect_std_ratio_warnings(EntityId(1), &[100.0, 0.0], &[90.0, 18.0]);
let _ = warnings; }
#[test]
fn test_std_ratio_divergence_wraps_last_to_first() {
let warnings =
collect_std_ratio_warnings(EntityId(1), &[100.0, 20.0, 50.0], &[100.0, 20.0, 10.0]);
assert!(
warnings.len() >= 2,
"expected at least 2 StdRatioDivergence entries (including wrap), got {}",
warnings.len()
);
let has_wrap = warnings.iter().any(|w| w.season_a == 2 && w.season_b == 0);
assert!(
has_wrap,
"expected a warning for the wrap-around pair season 2 → season 0"
);
}
}