use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::Path;
use chrono::{Months, NaiveDate};
use cobre_core::{EntityId, System};
use cobre_io::{
Config, FileManifest, LoadError, ValidationContext, parse_inflow_ar_coefficients,
parse_inflow_history,
scenarios::{
InflowAnnualComponentRow, InflowArCoefficientRow, InflowSeasonalStatsRow,
assemble_inflow_models,
},
validate_structure,
};
use cobre_stochastic::{
StochasticError,
par::aggregate::aggregate_observations_to_season,
par::fitting::{
ArCoefficientEstimate, ArEstimationConfig, SeasonalStats, StdRatioDivergence,
estimate_ar_coefficients_with_selection, estimate_correlation_with_season_map,
estimate_seasonal_stats_with_season_map,
},
};
pub use cobre_stochastic::par::fitting::EstimationReport;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EstimationPath {
Deterministic,
UserStatsWhiteNoise,
UserProvidedNoHistory,
FullEstimation,
UserArHistoryStats,
PartialEstimation,
UserProvidedAll,
}
impl EstimationPath {
#[must_use]
pub fn resolve(manifest: &cobre_io::FileManifest) -> Self {
match (
manifest.scenarios_inflow_history_parquet,
manifest.scenarios_inflow_seasonal_stats_parquet,
manifest.scenarios_inflow_ar_coefficients_parquet,
) {
(false, false, _) => Self::Deterministic,
(false, true, false) => Self::UserStatsWhiteNoise,
(false, true, true) => Self::UserProvidedNoHistory,
(true, false, false) => Self::FullEstimation,
(true, false, true) => Self::UserArHistoryStats,
(true, true, false) => Self::PartialEstimation,
(true, true, true) => Self::UserProvidedAll,
}
}
#[must_use]
pub fn as_str(self) -> &'static str {
match self {
Self::Deterministic => "deterministic",
Self::UserStatsWhiteNoise => "user_stats_white_noise",
Self::UserProvidedNoHistory => "user_provided_no_history",
Self::FullEstimation => "full_estimation",
Self::UserArHistoryStats => "user_ar_history_stats",
Self::PartialEstimation => "partial_estimation",
Self::UserProvidedAll => "user_provided_all",
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum EstimationError {
#[error("load error: {0}")]
Load(#[from] LoadError),
#[error("estimation failed: {0}")]
Stochastic(#[from] StochasticError),
}
pub fn estimate_from_history(
system: System,
case_dir: &Path,
config: &Config,
) -> Result<(System, Option<EstimationReport>, EstimationPath), EstimationError> {
let mut ctx = ValidationContext::new();
let manifest = validate_structure(case_dir, &mut ctx);
if ctx.into_result().is_err() {
return Ok((system, None, EstimationPath::Deterministic));
}
let path = EstimationPath::resolve(&manifest);
match path {
EstimationPath::Deterministic
| EstimationPath::UserStatsWhiteNoise
| EstimationPath::UserProvidedNoHistory
| EstimationPath::UserProvidedAll => Ok((system, None, path)),
EstimationPath::PartialEstimation => {
let (system, report) = run_partial_estimation(system, case_dir, config, &manifest)?;
Ok((system, Some(report), path))
}
EstimationPath::FullEstimation => {
let (system, report) = run_estimation(system, case_dir, config, &manifest)?;
Ok((system, Some(report), path))
}
EstimationPath::UserArHistoryStats => {
let (system, report) = run_user_ar_estimation(system, case_dir, config, &manifest)?;
Ok((system, Some(report), path))
}
}
}
fn run_estimation(
system: System,
case_dir: &Path,
config: &Config,
manifest: &FileManifest,
) -> Result<(System, EstimationReport), EstimationError> {
let history_path = case_dir.join("scenarios/inflow_history.parquet");
let history = parse_inflow_history(&history_path)?;
let observations: Vec<(EntityId, NaiveDate, f64)> = history
.iter()
.map(|row| (row.hydro_id, row.date, row.value_m3s))
.collect();
let hydro_ids: Vec<EntityId> = system.hydros().iter().map(|h| h.id).collect();
let study_stages = system.stages();
let season_map = system.policy_graph().season_map.as_ref();
let max_order = config.estimation.max_order as usize;
let prestudy = synthesize_prestudy_stages(study_stages, max_order, season_map);
let stages: Vec<cobre_core::temporal::Stage> = study_stages
.iter()
.cloned()
.chain(prestudy.iter().cloned())
.collect();
let stages = stages.as_slice();
let observations = if let Some(sm) = season_map {
aggregate_observations_to_season(&observations, study_stages, sm)?
} else {
observations
};
let seasonal_stats =
estimate_seasonal_stats_with_season_map(&observations, stages, &hydro_ids, season_map)?;
let (ar_estimates, estimation_report) = estimate_ar_coefficients_with_selection(
&observations,
&seasonal_stats,
stages,
&hydro_ids,
&ArEstimationConfig {
max_order,
max_coeff_magnitude: config.estimation.max_coefficient_magnitude,
season_map,
use_annual_component: matches!(
config.estimation.order_selection,
cobre_io::config::OrderSelectionMethod::PacfAnnual
),
},
)?;
let correlation = if manifest.scenarios_correlation_json {
system.correlation().clone()
} else {
estimate_correlation_with_season_map(
&observations,
&ar_estimates,
&seasonal_stats,
stages,
&hydro_ids,
season_map,
)?
};
let stats_rows = seasonal_stats_to_rows(&seasonal_stats, stages);
let coeff_rows = ar_estimates_to_rows(&ar_estimates, stages);
let annual_rows = ar_estimates_to_annual_rows(&ar_estimates, stages);
let inflow_models = assemble_inflow_models(stats_rows, coeff_rows, annual_rows)?;
Ok((
system.with_scenario_models(inflow_models, correlation),
estimation_report,
))
}
fn run_partial_estimation(
system: System,
case_dir: &Path,
config: &Config,
manifest: &FileManifest,
) -> Result<(System, EstimationReport), EstimationError> {
let hydro_ids: Vec<EntityId> = system.hydros().iter().map(|h| h.id).collect();
let study_stages = system.stages();
let season_map = system.policy_graph().season_map.as_ref();
let max_order = config.estimation.max_order as usize;
let prestudy = synthesize_prestudy_stages(study_stages, max_order, season_map);
let stages_owned: Vec<cobre_core::temporal::Stage> = study_stages
.iter()
.cloned()
.chain(prestudy.iter().cloned())
.collect();
let stages = stages_owned.as_slice();
let observations = load_and_aggregate_observations(case_dir, study_stages, season_map)?;
if system.inflow_models().is_empty() {
return Err(EstimationError::Load(
cobre_io::LoadError::ConstraintError {
description: "manifest indicates inflow_seasonal_stats.parquet is present \
but system.inflow_models() is empty; \
no user stats available for partial estimation"
.to_string(),
},
));
}
let fitting_stats =
estimate_seasonal_stats_with_season_map(&observations, stages, &hydro_ids, season_map)?;
let (ar_estimates, mut estimation_report) = estimate_ar_coefficients_with_selection(
&observations,
&fitting_stats,
stages,
&hydro_ids,
&ArEstimationConfig {
max_order,
max_coeff_magnitude: config.estimation.max_coefficient_magnitude,
season_map,
use_annual_component: matches!(
config.estimation.order_selection,
cobre_io::config::OrderSelectionMethod::PacfAnnual
),
},
)?;
let (white_noise_fallbacks, std_ratio_warnings) =
validate_partial_estimation_coverage(&system, &fitting_stats, study_stages)?;
let correlation = if manifest.scenarios_correlation_json {
system.correlation().clone()
} else {
estimate_correlation_with_season_map(
&observations,
&ar_estimates,
&fitting_stats,
stages,
&hydro_ids,
season_map,
)?
};
let mut stats_rows = user_stats_to_rows(&system);
stats_rows.extend(prestudy_seasonal_rows(&fitting_stats, &prestudy));
let coeff_rows = ar_estimates_to_rows(&ar_estimates, stages);
let annual_rows = ar_estimates_to_annual_rows(&ar_estimates, stages);
let inflow_models = assemble_inflow_models(stats_rows, coeff_rows, annual_rows)?;
estimation_report.white_noise_fallbacks = white_noise_fallbacks;
estimation_report.std_ratio_warnings = std_ratio_warnings;
Ok((
system.with_scenario_models(inflow_models, correlation),
estimation_report,
))
}
fn load_and_aggregate_observations(
case_dir: &Path,
stages: &[cobre_core::temporal::Stage],
season_map: Option<&cobre_core::temporal::SeasonMap>,
) -> Result<Vec<(EntityId, NaiveDate, f64)>, EstimationError> {
let history_path = case_dir.join("scenarios/inflow_history.parquet");
let history = parse_inflow_history(&history_path)?;
let observations: Vec<(EntityId, NaiveDate, f64)> = history
.iter()
.map(|row| (row.hydro_id, row.date, row.value_m3s))
.collect();
if let Some(sm) = season_map {
Ok(aggregate_observations_to_season(&observations, stages, sm)?)
} else {
Ok(observations)
}
}
type CoverageCheckResult = (Vec<EntityId>, Vec<StdRatioDivergence>);
fn validate_partial_estimation_coverage(
system: &System,
fitting_stats: &[SeasonalStats],
stages: &[cobre_core::temporal::Stage],
) -> Result<CoverageCheckResult, EstimationError> {
let estimated_hydro_ids: HashSet<EntityId> =
fitting_stats.iter().map(|s| s.entity_id).collect();
let user_stats_hydro_ids: HashSet<EntityId> =
system.inflow_models().iter().map(|m| m.hydro_id).collect();
let mut missing_stats: Vec<EntityId> = estimated_hydro_ids
.difference(&user_stats_hydro_ids)
.copied()
.collect();
missing_stats.sort();
if !missing_stats.is_empty() {
let ids: Vec<String> = missing_stats.iter().map(|id| id.0.to_string()).collect();
return Err(EstimationError::Load(
cobre_io::LoadError::ConstraintError {
description: format!(
"partial estimation: AR coefficients were estimated for hydro(s) \
[{ids}] but inflow_seasonal_stats.parquet has no entry for them; \
all hydros with estimated AR must have user-provided stats",
ids = ids.join(", ")
),
},
));
}
let mut white_noise_fallbacks: Vec<EntityId> = user_stats_hydro_ids
.difference(&estimated_hydro_ids)
.copied()
.collect();
white_noise_fallbacks.sort();
let std_ratio_warnings = check_std_ratio_divergence(system, fitting_stats, stages);
for w in &std_ratio_warnings {
tracing::warn!(
"hydro {} season {}->{} std ratio diverges {:.1}x between \
user ({:.2}) and estimated ({:.2})",
w.hydro_id.0,
w.season_a,
w.season_b,
w.divergence,
w.user_ratio,
w.estimated_ratio
);
}
Ok((white_noise_fallbacks, std_ratio_warnings))
}
fn run_user_ar_estimation(
system: System,
case_dir: &Path,
_config: &Config,
manifest: &FileManifest,
) -> Result<(System, EstimationReport), EstimationError> {
let history_path = case_dir.join("scenarios/inflow_history.parquet");
let history = parse_inflow_history(&history_path)?;
let observations: Vec<(EntityId, NaiveDate, f64)> = history
.iter()
.map(|row| (row.hydro_id, row.date, row.value_m3s))
.collect();
let hydro_ids: Vec<EntityId> = system.hydros().iter().map(|h| h.id).collect();
let stages = system.stages();
let season_map = system.policy_graph().season_map.as_ref();
let observations = if let Some(sm) = season_map {
aggregate_observations_to_season(&observations, stages, sm)?
} else {
observations
};
let seasonal_stats =
estimate_seasonal_stats_with_season_map(&observations, stages, &hydro_ids, season_map)?;
let ar_path = case_dir.join("scenarios/inflow_ar_coefficients.parquet");
let user_ar_rows = parse_inflow_ar_coefficients(&ar_path)?;
let user_ar_estimates = ar_rows_to_estimates(&user_ar_rows, stages);
let correlation = if manifest.scenarios_correlation_json {
system.correlation().clone()
} else {
estimate_correlation_with_season_map(
&observations,
&user_ar_estimates,
&seasonal_stats,
stages,
&hydro_ids,
season_map,
)?
};
let stats_rows = seasonal_stats_to_rows(&seasonal_stats, stages);
let inflow_models = assemble_inflow_models(stats_rows, user_ar_rows, vec![])?;
let estimation_report = EstimationReport {
entries: BTreeMap::new(),
method: "user_provided".to_string(),
white_noise_fallbacks: Vec::new(),
std_ratio_warnings: Vec::new(),
};
Ok((
system.with_scenario_models(inflow_models, correlation),
estimation_report,
))
}
fn ar_rows_to_estimates(
rows: &[InflowArCoefficientRow],
stages: &[cobre_core::temporal::Stage],
) -> Vec<ArCoefficientEstimate> {
let stage_id_to_season: HashMap<i32, usize> = stages
.iter()
.filter_map(|s| s.season_id.map(|sid| (s.id, sid)))
.collect();
let mut first_stage: HashMap<(EntityId, usize), i32> = HashMap::new();
let mut groups: BTreeMap<(EntityId, usize), (Vec<f64>, f64)> = BTreeMap::new();
for row in rows {
let Some(&season_id) = stage_id_to_season.get(&row.stage_id) else {
continue;
};
let key = (row.hydro_id, season_id);
let canonical_stage = first_stage.entry(key).or_insert(row.stage_id);
if *canonical_stage != row.stage_id {
continue;
}
let entry = groups
.entry(key)
.or_insert_with(|| (Vec::new(), row.residual_std_ratio));
entry.0.push(row.coefficient);
}
groups
.into_iter()
.map(
|((hydro_id, season_id), (coefficients, residual_std_ratio))| ArCoefficientEstimate {
hydro_id,
season_id,
coefficients,
residual_std_ratio,
annual: None,
},
)
.collect()
}
fn user_stats_to_rows(system: &System) -> Vec<InflowSeasonalStatsRow> {
system
.inflow_models()
.iter()
.map(|m| InflowSeasonalStatsRow {
hydro_id: m.hydro_id,
stage_id: m.stage_id,
mean_m3s: m.mean_m3s,
std_m3s: m.std_m3s,
})
.collect()
}
fn check_std_ratio_divergence(
system: &System,
fitting_stats: &[SeasonalStats],
stages: &[cobre_core::temporal::Stage],
) -> Vec<StdRatioDivergence> {
let stage_id_to_season: HashMap<i32, usize> = stages
.iter()
.filter_map(|s| s.season_id.map(|sid| (s.id, sid)))
.collect();
let mut user_std: BTreeMap<(EntityId, usize), f64> = BTreeMap::new();
for m in system.inflow_models() {
let Some(&season_id) = stage_id_to_season.get(&m.stage_id) else {
continue;
};
user_std.entry((m.hydro_id, season_id)).or_insert(m.std_m3s);
}
let mut est_std: BTreeMap<(EntityId, usize), f64> = BTreeMap::new();
for s in fitting_stats {
let Some(&season_id) = stage_id_to_season.get(&s.stage_id) else {
continue;
};
est_std.entry((s.entity_id, season_id)).or_insert(s.std);
}
let user_hydros: std::collections::BTreeSet<EntityId> =
user_std.keys().map(|(h, _)| *h).collect();
let est_hydros: std::collections::BTreeSet<EntityId> =
est_std.keys().map(|(h, _)| *h).collect();
let common_hydros: Vec<EntityId> = user_hydros.intersection(&est_hydros).copied().collect();
let mut warnings: Vec<StdRatioDivergence> = Vec::new();
for hydro_id in common_hydros {
let season_ids: Vec<usize> = {
let mut ids: Vec<usize> = user_std
.keys()
.filter(|(h, _)| *h == hydro_id)
.map(|(_, s)| *s)
.collect();
ids.sort_unstable();
ids.dedup();
ids
};
let n = season_ids.len();
if n < 2 {
continue;
}
for i in 0..n {
let season_a = season_ids[i];
let season_b = season_ids[(i + 1) % n];
let Some(&u_a) = user_std.get(&(hydro_id, season_a)) else {
continue;
};
let Some(&u_b) = user_std.get(&(hydro_id, season_b)) else {
continue;
};
let Some(&e_a) = est_std.get(&(hydro_id, season_a)) else {
continue;
};
let Some(&e_b) = est_std.get(&(hydro_id, season_b)) else {
continue;
};
if u_b.abs() < 1e-12 || e_b.abs() < 1e-12 {
continue;
}
let ratio_user = u_a / u_b;
let ratio_est = e_a / e_b;
if ratio_user.abs() < 1e-12 || ratio_est.abs() < 1e-12 {
continue;
}
let divergence = (ratio_user / ratio_est)
.abs()
.max((ratio_est / ratio_user).abs());
if divergence > 2.0 {
warnings.push(StdRatioDivergence {
hydro_id,
season_a,
season_b,
user_ratio: ratio_user,
estimated_ratio: ratio_est,
divergence,
});
}
}
}
warnings.sort_by_key(|w| (w.hydro_id, w.season_a));
warnings
}
fn synthesize_prestudy_stages(
stages: &[cobre_core::temporal::Stage],
max_order: usize,
season_map: Option<&cobre_core::temporal::SeasonMap>,
) -> Vec<cobre_core::temporal::Stage> {
let Some(sm) = season_map else {
return Vec::new();
};
let cycle_len = sm.seasons.len();
if max_order == 0 || cycle_len == 0 {
return Vec::new();
}
let Some(first) = stages
.iter()
.filter(|s| s.id >= 0 && s.season_id.is_some())
.min_by_key(|s| s.id)
else {
return Vec::new();
};
let Some(first_season) = first.season_id else {
return Vec::new();
};
let study_seasons: HashSet<usize> = stages.iter().filter_map(|s| s.season_id).collect();
let lag_window = max_order.min(cycle_len - 1);
let mut synthetic = Vec::with_capacity(lag_window);
for k in 1..=lag_window {
let season_k = (first_season + cycle_len - (k % cycle_len)) % cycle_len;
if study_seasons.contains(&season_k) {
continue;
}
let (Some(start_k), Some(end_k)) = (
first
.start_date
.checked_sub_months(Months::new(u32::try_from(k).unwrap_or(u32::MAX))),
first
.start_date
.checked_sub_months(Months::new(u32::try_from(k - 1).unwrap_or(u32::MAX))),
) else {
continue;
};
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let id = first.id - k as i32;
let mut stage = first.clone();
stage.index = 0;
stage.id = id;
stage.start_date = start_k;
stage.end_date = end_k;
stage.season_id = Some(season_k);
synthetic.push(stage);
}
synthetic
}
fn prestudy_seasonal_rows(
fitting_stats: &[SeasonalStats],
prestudy: &[cobre_core::temporal::Stage],
) -> Vec<InflowSeasonalStatsRow> {
if prestudy.is_empty() {
return Vec::new();
}
let prestudy_ids: HashSet<i32> = prestudy.iter().map(|s| s.id).collect();
let mut rows: Vec<InflowSeasonalStatsRow> = fitting_stats
.iter()
.filter(|s| prestudy_ids.contains(&s.stage_id))
.map(|s| InflowSeasonalStatsRow {
hydro_id: s.entity_id,
stage_id: s.stage_id,
mean_m3s: s.mean,
std_m3s: s.std,
})
.collect();
rows.sort_by_key(|r| (r.hydro_id.0, r.stage_id));
rows
}
fn seasonal_stats_to_rows(
stats: &[SeasonalStats],
stages: &[cobre_core::temporal::Stage],
) -> Vec<InflowSeasonalStatsRow> {
let stage_to_season: HashMap<i32, usize> = stages
.iter()
.filter_map(|s| s.season_id.map(|sid| (s.id, sid)))
.collect();
let mut season_to_stages: HashMap<usize, Vec<i32>> = HashMap::new();
for stage in stages {
if let Some(sid) = stage.season_id {
season_to_stages.entry(sid).or_default().push(stage.id);
}
}
let mut rows = Vec::with_capacity(stats.len() * 10);
for s in stats {
if let Some(&season_id) = stage_to_season.get(&s.stage_id)
&& let Some(stage_ids) = season_to_stages.get(&season_id)
{
for &stage_id in stage_ids {
rows.push(InflowSeasonalStatsRow {
hydro_id: s.entity_id,
stage_id,
mean_m3s: s.mean,
std_m3s: s.std,
});
}
continue;
}
rows.push(InflowSeasonalStatsRow {
hydro_id: s.entity_id,
stage_id: s.stage_id,
mean_m3s: s.mean,
std_m3s: s.std,
});
}
rows.sort_by_key(|r| (r.hydro_id.0, r.stage_id));
rows
}
fn ar_estimates_to_rows(
ar_estimates: &[ArCoefficientEstimate],
stages: &[cobre_core::temporal::Stage],
) -> Vec<InflowArCoefficientRow> {
let mut season_to_stages: HashMap<usize, Vec<i32>> = HashMap::new();
for stage in stages {
if let Some(sid) = stage.season_id {
season_to_stages.entry(sid).or_default().push(stage.id);
}
}
let mut rows: Vec<InflowArCoefficientRow> = Vec::new();
for est in ar_estimates {
let Some(stage_ids) = season_to_stages.get(&est.season_id) else {
continue;
};
for &stage_id in stage_ids {
for (lag_idx, &coeff) in est.coefficients.iter().enumerate() {
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let lag = (lag_idx + 1) as i32;
rows.push(InflowArCoefficientRow {
hydro_id: est.hydro_id,
stage_id,
lag,
coefficient: coeff,
residual_std_ratio: est.residual_std_ratio,
});
}
}
}
rows.sort_by_key(|r| (r.hydro_id.0, r.stage_id, r.lag));
rows
}
fn ar_estimates_to_annual_rows(
ar_estimates: &[ArCoefficientEstimate],
stages: &[cobre_core::temporal::Stage],
) -> Vec<InflowAnnualComponentRow> {
let mut season_to_stages: HashMap<usize, Vec<i32>> = HashMap::new();
for stage in stages {
if let Some(sid) = stage.season_id {
season_to_stages.entry(sid).or_default().push(stage.id);
}
}
let mut rows: Vec<InflowAnnualComponentRow> = Vec::new();
for est in ar_estimates {
let Some(ref ann) = est.annual else {
continue;
};
let Some(stage_ids) = season_to_stages.get(&est.season_id) else {
continue;
};
for &stage_id in stage_ids {
rows.push(InflowAnnualComponentRow {
hydro_id: est.hydro_id,
stage_id,
annual_coefficient: ann.coefficient,
annual_mean_m3s: ann.mean_m3s,
annual_std_m3s: ann.std_m3s,
});
}
}
rows.sort_by_key(|r| (r.hydro_id.0, r.stage_id));
rows
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::panic,
clippy::too_many_lines,
clippy::float_cmp,
clippy::doc_markdown,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::useless_vec
)]
mod tests {
use super::*;
use cobre_core::scenario::{CorrelationModel, InflowModel};
use cobre_core::{EntityId, SystemBuilder};
fn minimal_system_with_inflow_models(models: Vec<InflowModel>) -> System {
SystemBuilder::new()
.inflow_models(models)
.build()
.expect("valid system")
}
#[test]
fn test_with_scenario_models_replaces_fields() {
use cobre_core::{
Bus, DeficitSegment,
scenario::{CorrelationModel, InflowModel},
};
let bus = Bus {
id: EntityId(1),
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: Some(f64::INFINITY),
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let old_model = InflowModel {
hydro_id: EntityId(1),
stage_id: 0,
mean_m3s: 10.0,
std_m3s: 1.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
};
let system = SystemBuilder::new()
.buses(vec![bus])
.inflow_models(vec![old_model.clone(), {
let mut m = old_model.clone();
m.stage_id = 1;
m
}])
.build()
.expect("valid system");
assert_eq!(system.inflow_models().len(), 2);
assert_eq!(system.n_buses(), 1);
let new_models: Vec<InflowModel> = (0..4)
.map(|i| InflowModel {
hydro_id: EntityId(1),
stage_id: i,
mean_m3s: 50.0,
std_m3s: 5.0,
ar_coefficients: vec![0.4],
residual_std_ratio: 0.9,
annual: None,
})
.collect();
let new_corr = CorrelationModel::default();
let updated = system.with_scenario_models(new_models.clone(), new_corr.clone());
assert_eq!(updated.inflow_models().len(), 4, "expected 4 inflow models");
assert_eq!(
*updated.correlation(),
new_corr,
"correlation should equal new_corr"
);
assert_eq!(updated.n_buses(), 1, "buses must be preserved");
assert!(
updated.hydros().is_empty(),
"hydros must be preserved (empty)"
);
assert!(
updated.stages().is_empty(),
"stages must be preserved (empty)"
);
}
#[test]
fn test_with_scenario_models_clears_when_empty() {
let model = InflowModel {
hydro_id: EntityId(1),
stage_id: 0,
mean_m3s: 100.0,
std_m3s: 10.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
};
let system = minimal_system_with_inflow_models(vec![model]);
assert_eq!(system.inflow_models().len(), 1);
let updated = system.with_scenario_models(vec![], CorrelationModel::default());
assert!(updated.inflow_models().is_empty());
}
#[test]
fn test_estimate_explicit_stats_returns_unchanged() {
use tempfile::TempDir;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
create_required_files(case_dir);
let scenarios = case_dir.join("scenarios");
std::fs::create_dir_all(&scenarios).unwrap();
std::fs::write(scenarios.join("inflow_history.parquet"), b"").unwrap();
std::fs::write(scenarios.join("inflow_seasonal_stats.parquet"), b"").unwrap();
std::fs::write(scenarios.join("inflow_ar_coefficients.parquet"), b"").unwrap();
let model = InflowModel {
hydro_id: EntityId(1),
stage_id: 0,
mean_m3s: 100.0,
std_m3s: 10.0,
ar_coefficients: vec![0.5],
residual_std_ratio: 0.87,
annual: None,
};
let system = minimal_system_with_inflow_models(vec![model]);
let original_len = system.inflow_models().len();
let config = default_config();
let (result, report, _path) = estimate_from_history(system, case_dir, &config).unwrap();
assert_eq!(
result.inflow_models().len(),
original_len,
"explicit stats: system must be unchanged"
);
assert!(
report.is_none(),
"explicit stats path must return None report"
);
}
#[test]
fn test_estimate_no_history_returns_unchanged() {
use tempfile::TempDir;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
create_required_files(case_dir);
let model = InflowModel {
hydro_id: EntityId(1),
stage_id: 0,
mean_m3s: 100.0,
std_m3s: 10.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
};
let system = minimal_system_with_inflow_models(vec![model]);
let original_len = system.inflow_models().len();
let config = default_config();
let (result, report, _path) = estimate_from_history(system, case_dir, &config).unwrap();
assert_eq!(
result.inflow_models().len(),
original_len,
"no history: system must be unchanged"
);
assert!(report.is_none(), "no history path must return None report");
}
#[test]
fn test_estimation_path_resolve_all_8_combinations() {
use cobre_io::FileManifest;
let make = |history: bool, stats: bool, ar: bool| FileManifest {
scenarios_inflow_history_parquet: history,
scenarios_inflow_seasonal_stats_parquet: stats,
scenarios_inflow_ar_coefficients_parquet: ar,
..Default::default()
};
assert_eq!(
EstimationPath::resolve(&make(false, false, false)),
EstimationPath::Deterministic,
);
assert_eq!(
EstimationPath::resolve(&make(false, false, true)),
EstimationPath::Deterministic,
);
assert_eq!(
EstimationPath::resolve(&make(false, true, false)),
EstimationPath::UserStatsWhiteNoise,
);
assert_eq!(
EstimationPath::resolve(&make(false, true, true)),
EstimationPath::UserProvidedNoHistory,
);
assert_eq!(
EstimationPath::resolve(&make(true, false, false)),
EstimationPath::FullEstimation,
);
assert_eq!(
EstimationPath::resolve(&make(true, false, true)),
EstimationPath::UserArHistoryStats,
);
assert_eq!(
EstimationPath::resolve(&make(true, true, false)),
EstimationPath::PartialEstimation,
);
assert_eq!(
EstimationPath::resolve(&make(true, true, true)),
EstimationPath::UserProvidedAll,
);
}
#[test]
fn test_estimation_path_as_str_round_trip() {
let variants = [
EstimationPath::Deterministic,
EstimationPath::UserStatsWhiteNoise,
EstimationPath::UserProvidedNoHistory,
EstimationPath::FullEstimation,
EstimationPath::UserArHistoryStats,
EstimationPath::PartialEstimation,
EstimationPath::UserProvidedAll,
];
let strings: Vec<&str> = variants.iter().map(|v| v.as_str()).collect();
for s in &strings {
assert!(!s.is_empty(), "as_str() returned empty string");
}
let unique: std::collections::HashSet<&&str> = strings.iter().collect();
assert_eq!(
unique.len(),
variants.len(),
"as_str() must return unique strings for each variant"
);
}
#[test]
fn test_user_stats_to_rows_maps_all_models() {
let models = vec![
InflowModel {
hydro_id: EntityId(1),
stage_id: 0,
mean_m3s: 100.0,
std_m3s: 10.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
},
InflowModel {
hydro_id: EntityId(1),
stage_id: 1,
mean_m3s: 120.0,
std_m3s: 12.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
},
InflowModel {
hydro_id: EntityId(2),
stage_id: 0,
mean_m3s: 50.0,
std_m3s: 5.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
},
];
let system = minimal_system_with_inflow_models(models.clone());
let rows = user_stats_to_rows(&system);
assert_eq!(rows.len(), 3, "must produce one row per InflowModel");
for (model, row) in models.iter().zip(rows.iter()) {
assert_eq!(row.hydro_id, model.hydro_id, "hydro_id must be preserved");
assert_eq!(row.stage_id, model.stage_id, "stage_id must be preserved");
assert_eq!(
row.mean_m3s.to_bits(),
model.mean_m3s.to_bits(),
"mean_m3s must be bitwise identical"
);
assert_eq!(
row.std_m3s.to_bits(),
model.std_m3s.to_bits(),
"std_m3s must be bitwise identical"
);
}
}
#[test]
fn test_user_stats_to_rows_empty_system() {
let system = minimal_system_with_inflow_models(vec![]);
let rows = user_stats_to_rows(&system);
assert!(rows.is_empty(), "empty system must produce empty rows");
}
fn write_unit_test_inflow_history(path: &std::path::Path, hydro_id: i32, n_years: usize) {
use arrow::array::{Date32Array, Float64Array, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use chrono::NaiveDate;
use parquet::arrow::ArrowWriter;
use std::sync::Arc;
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
let date_to_days = |d: NaiveDate| -> i32 {
i32::try_from((d - epoch).num_days()).expect("date in Date32 range")
};
let (obs_s0, obs_s1) = simulate_two_season_par2(0.7, 0.15, n_years, 99);
let mut ids: Vec<i32> = Vec::with_capacity(n_years * 2);
let mut dates: Vec<i32> = Vec::with_capacity(n_years * 2);
let mut values: Vec<f64> = Vec::with_capacity(n_years * 2);
for y in 0..n_years {
let year = (1970 + y) as i32;
ids.push(hydro_id);
dates.push(date_to_days(NaiveDate::from_ymd_opt(year, 1, 15).unwrap()));
values.push(obs_s0[y] + 300.0);
ids.push(hydro_id);
dates.push(date_to_days(NaiveDate::from_ymd_opt(year, 7, 15).unwrap()));
values.push(obs_s1[y] + 300.0);
}
let schema = Arc::new(Schema::new(vec![
Field::new("hydro_id", DataType::Int32, false),
Field::new("date", DataType::Date32, false),
Field::new("value_m3s", DataType::Float64, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(ids)),
Arc::new(Date32Array::from(dates)),
Arc::new(Float64Array::from(values)),
],
)
.expect("valid batch");
let file = std::fs::File::create(path).expect("create parquet file");
let mut writer = ArrowWriter::try_new(file, schema, None).expect("ArrowWriter");
writer.write(&batch).expect("write batch");
writer.close().expect("close writer");
}
#[allow(clippy::cast_possible_wrap)]
fn build_system_with_user_stats(n_years: usize) -> System {
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::scenario::InflowModel;
use cobre_core::{Bus, DeficitSegment, EntityId, SystemBuilder};
let hydro_id = EntityId(1);
let bus = Bus {
id: EntityId(10),
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: Some(f64::INFINITY),
cost_per_mwh: 3000.0,
}],
excess_cost: 0.0,
};
let ref_year = 1970_i32;
let mut stages = Vec::with_capacity(n_years * 2);
for y in 0..n_years {
let year = ref_year + y as i32;
stages.push(make_two_season_stage(y * 2, (y * 2) as i32, 0, year, true));
stages.push(make_two_season_stage(
y * 2 + 1,
(y * 2 + 1) as i32,
1,
year,
false,
));
}
let inflow_models: Vec<InflowModel> = stages
.iter()
.map(|s| InflowModel {
hydro_id,
stage_id: s.id,
mean_m3s: 100.0,
std_m3s: 10.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
})
.collect();
let hydro = Hydro {
id: hydro_id,
name: "H1".to_string(),
bus_id: EntityId(10),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 5000.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity,
min_turbined_m3s: 0.0,
max_turbined_m3s: 1000.0,
specific_productivity_mw_per_m3s_per_m: None,
min_generation_mw: 0.0,
max_generation_mw: 900.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
turbined_cost: 0.0,
storage_violation_below_cost: 1000.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
SystemBuilder::new()
.buses(vec![bus])
.hydros(vec![hydro])
.stages(stages)
.inflow_models(inflow_models)
.build()
.expect("valid system with user stats")
}
fn setup_partial_estimation_case(case_dir: &std::path::Path, n_years: usize) {
create_required_files(case_dir);
let scenarios = case_dir.join("scenarios");
std::fs::create_dir_all(&scenarios).unwrap();
write_unit_test_inflow_history(
&scenarios.join("inflow_history.parquet"),
1, n_years,
);
std::fs::write(scenarios.join("inflow_seasonal_stats.parquet"), b"sentinel")
.expect("write sentinel");
}
#[test]
fn test_partial_estimation_preserves_user_stats() {
use tempfile::TempDir;
const N_YEARS: usize = 30; let dir = TempDir::new().unwrap();
let case_dir = dir.path();
setup_partial_estimation_case(case_dir, N_YEARS);
let system = build_system_with_user_stats(N_YEARS);
let config = default_config();
let (updated, report, path) = estimate_from_history(system, case_dir, &config)
.expect("partial estimation must succeed");
assert_eq!(
path,
EstimationPath::PartialEstimation,
"expected PartialEstimation path"
);
assert!(
report.is_some(),
"PartialEstimation must return Some(report)"
);
let models = updated.inflow_models();
assert!(
!models.is_empty(),
"partial estimation must produce at least one inflow model"
);
for m in models {
assert_eq!(
m.mean_m3s.to_bits(),
100.0_f64.to_bits(),
"mean_m3s must be bitwise identical to user value 100.0 for stage {}",
m.stage_id
);
assert_eq!(
m.std_m3s.to_bits(),
10.0_f64.to_bits(),
"std_m3s must be bitwise identical to user value 10.0 for stage {}",
m.stage_id
);
assert!(
!m.ar_coefficients.is_empty(),
"ar_coefficients must be non-empty for stage {} (estimated from history)",
m.stage_id
);
}
}
#[test]
fn test_partial_estimation_returns_report() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
setup_partial_estimation_case(case_dir, N_YEARS);
let system = build_system_with_user_stats(N_YEARS);
let config = default_config();
let (_updated, report, _path) = estimate_from_history(system, case_dir, &config)
.expect("partial estimation must succeed");
let report = report.expect("PartialEstimation must return Some(EstimationReport)");
assert_eq!(
report.method, "PACF",
"estimation method must be PACF, got '{}'",
report.method
);
assert_eq!(
report.entries.len(),
1,
"report must contain exactly 1 entry (one hydro), got {}",
report.entries.len()
);
assert!(
report.entries.contains_key(&EntityId(1)),
"report must contain an entry for hydro_id=1"
);
}
fn default_config() -> Config {
use cobre_io::config::{EstimationConfig, OrderSelectionMethod};
let mut cfg: Config = serde_json::from_str(MINIMAL_CONFIG_JSON).unwrap();
cfg.estimation = EstimationConfig {
max_order: 2,
order_selection: OrderSelectionMethod::Pacf,
min_observations_per_season: 2,
max_coefficient_magnitude: None,
};
cfg
}
const MINIMAL_CONFIG_JSON: &str = r#"{
"training": { "tree_seed": 42 },
"simulation": { "enabled": false, "num_scenarios": 0, "io_channel_capacity": 16 },
"modeling": {},
"policy": {},
"exports": {}
}"#;
fn create_required_files(case_dir: &std::path::Path) {
let _ = std::fs::create_dir_all(case_dir.join("system"));
let _ = std::fs::create_dir_all(case_dir.join("scenarios"));
let write = |name: &str| {
let _ = std::fs::write(case_dir.join(name), b"{}");
};
write("config.json");
write("penalties.json");
write("stages.json");
write("initial_conditions.json");
write("system/buses.json");
write("system/lines.json");
write("system/hydros.json");
write("system/thermals.json");
}
#[test]
fn test_estimation_report_structure() {
use cobre_stochastic::par::fitting::{ContributionReduction, build_estimation_report};
let h1 = EntityId(1);
let h2 = EntityId(2);
let n_seasons = 3_usize;
let mut estimates = Vec::new();
for &hydro_id in &[h1, h2] {
for season_id in 0..n_seasons {
estimates.push(ArCoefficientEstimate {
hydro_id,
season_id,
coefficients: vec![0.5, 0.3],
residual_std_ratio: 0.9,
annual: None,
});
}
}
let contribution_reductions: HashMap<EntityId, Vec<ContributionReduction>> = HashMap::new();
let report =
build_estimation_report(&estimates, n_seasons, &contribution_reductions, "PACF");
assert_eq!(report.entries.len(), 2, "expected 2 hydro entries");
for &hydro_id in &[h1, h2] {
let entry = report.entries.get(&hydro_id).expect("entry must exist");
assert_eq!(entry.selected_order, 2, "selected_order must be 2");
assert_eq!(
entry.coefficients.len(),
n_seasons,
"one coefficient vec per season"
);
}
}
#[test]
fn test_estimation_report_empty_for_pacf() {
use cobre_core::temporal::Stage;
let observations: Vec<(EntityId, chrono::NaiveDate, f64)> = vec![];
let seasonal_stats: Vec<SeasonalStats> = vec![];
let stages: Vec<Stage> = vec![];
let hydro_ids: Vec<EntityId> = vec![];
let max_order = 2;
let (_, report) = estimate_ar_coefficients_with_selection(
&observations,
&seasonal_stats,
&stages,
&hydro_ids,
&ArEstimationConfig {
max_order,
max_coeff_magnitude: None,
season_map: None,
use_annual_component: false,
},
)
.unwrap();
assert!(
report.entries.is_empty(),
"empty observations must produce empty EstimationReport"
);
}
fn make_expansion_stage(
index: usize,
id: i32,
season_id: Option<usize>,
) -> cobre_core::temporal::Stage {
use chrono::NaiveDate;
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, StageRiskConfig, StageStateConfig,
};
cobre_core::temporal::Stage {
index,
id,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id,
blocks: vec![Block {
index: 0,
name: "SINGLE".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 10,
noise_method: NoiseMethod::Saa,
},
}
}
#[test]
fn seasonal_stats_to_rows_includes_prestudy_stages() {
let stages = vec![
make_expansion_stage(0, -2, Some(1)),
make_expansion_stage(1, -1, Some(2)),
make_expansion_stage(2, 0, Some(0)),
make_expansion_stage(3, 1, Some(1)),
make_expansion_stage(4, 2, Some(2)),
];
let h1 = EntityId(1);
let stats = vec![
SeasonalStats {
entity_id: h1,
stage_id: 0,
mean: 100.0,
std: 20.0,
},
SeasonalStats {
entity_id: h1,
stage_id: 1,
mean: 110.0,
std: 22.0,
},
SeasonalStats {
entity_id: h1,
stage_id: 2,
mean: 120.0,
std: 24.0,
},
];
let rows = seasonal_stats_to_rows(&stats, &stages);
assert_eq!(rows.len(), 5, "expected 5 rows (3 study + 2 pre-study)");
let prestudy_rows: Vec<_> = rows.iter().filter(|r| r.stage_id < 0).collect();
assert_eq!(
prestudy_rows.len(),
2,
"expected 2 pre-study rows, got {}",
prestudy_rows.len()
);
let neg2 = rows.iter().find(|r| r.stage_id == -2).expect("row for -2");
assert!((neg2.mean_m3s - 110.0).abs() < f64::EPSILON);
assert!((neg2.std_m3s - 22.0).abs() < f64::EPSILON);
let neg1 = rows.iter().find(|r| r.stage_id == -1).expect("row for -1");
assert!((neg1.mean_m3s - 120.0).abs() < f64::EPSILON);
assert!((neg1.std_m3s - 24.0).abs() < f64::EPSILON);
for w in rows.windows(2) {
assert!(
(w[0].hydro_id.0, w[0].stage_id) <= (w[1].hydro_id.0, w[1].stage_id),
"rows not sorted"
);
}
}
#[test]
fn ar_estimates_to_rows_includes_prestudy_stages() {
let stages = vec![
make_expansion_stage(0, -2, Some(1)),
make_expansion_stage(1, -1, Some(2)),
make_expansion_stage(2, 0, Some(0)),
make_expansion_stage(3, 1, Some(1)),
make_expansion_stage(4, 2, Some(2)),
];
let h1 = EntityId(1);
let ar_estimates = vec![
ArCoefficientEstimate {
hydro_id: h1,
season_id: 0,
coefficients: vec![0.3],
residual_std_ratio: 0.9,
annual: None,
},
ArCoefficientEstimate {
hydro_id: h1,
season_id: 1,
coefficients: vec![0.4],
residual_std_ratio: 0.85,
annual: None,
},
ArCoefficientEstimate {
hydro_id: h1,
season_id: 2,
coefficients: vec![0.5],
residual_std_ratio: 0.8,
annual: None,
},
];
let rows = ar_estimates_to_rows(&ar_estimates, &stages);
assert_eq!(rows.len(), 5, "expected 5 rows");
let prestudy_rows: Vec<_> = rows.iter().filter(|r| r.stage_id < 0).collect();
assert_eq!(prestudy_rows.len(), 2);
let neg2 = rows.iter().find(|r| r.stage_id == -2).expect("row for -2");
assert!((neg2.coefficient - 0.4).abs() < f64::EPSILON);
assert!((neg2.residual_std_ratio - 0.85).abs() < f64::EPSILON);
let neg1 = rows.iter().find(|r| r.stage_id == -1).expect("row for -1");
assert!((neg1.coefficient - 0.5).abs() < f64::EPSILON);
assert!((neg1.residual_std_ratio - 0.8).abs() < f64::EPSILON);
}
#[test]
fn full_estimation_produces_prestudy_inflow_models() {
use cobre_io::scenarios::assemble_inflow_models;
let stages = vec![
make_expansion_stage(0, -2, Some(1)),
make_expansion_stage(1, -1, Some(2)),
make_expansion_stage(2, 0, Some(0)),
make_expansion_stage(3, 1, Some(1)),
make_expansion_stage(4, 2, Some(2)),
];
let h1 = EntityId(1);
let stats = vec![
SeasonalStats {
entity_id: h1,
stage_id: 0,
mean: 100.0,
std: 20.0,
},
SeasonalStats {
entity_id: h1,
stage_id: 1,
mean: 110.0,
std: 22.0,
},
SeasonalStats {
entity_id: h1,
stage_id: 2,
mean: 120.0,
std: 24.0,
},
];
let stats_rows = seasonal_stats_to_rows(&stats, &stages);
let ar_ests = vec![
ArCoefficientEstimate {
hydro_id: h1,
season_id: 0,
coefficients: vec![0.3],
residual_std_ratio: 0.9,
annual: None,
},
ArCoefficientEstimate {
hydro_id: h1,
season_id: 1,
coefficients: vec![0.4],
residual_std_ratio: 0.85,
annual: None,
},
ArCoefficientEstimate {
hydro_id: h1,
season_id: 2,
coefficients: vec![0.5],
residual_std_ratio: 0.8,
annual: None,
},
];
let coeff_rows = ar_estimates_to_rows(&ar_ests, &stages);
let inflow_models = assemble_inflow_models(stats_rows, coeff_rows, vec![])
.expect("assembly should succeed");
assert!(
inflow_models.iter().any(|m| m.stage_id < 0),
"expected pre-study InflowModel entries (negative stage_id)"
);
let prestudy_neg2 = inflow_models
.iter()
.find(|m| m.stage_id == -2)
.expect("InflowModel for stage -2");
assert!((prestudy_neg2.mean_m3s - 110.0).abs() < f64::EPSILON);
assert!((prestudy_neg2.std_m3s - 22.0).abs() < f64::EPSILON);
}
#[allow(
clippy::cast_precision_loss,
clippy::cast_sign_loss,
clippy::cast_lossless
)]
fn simulate_two_season_par2(
phi_1: f64,
phi_2: f64,
n_years: usize,
seed: u64,
) -> (Vec<f64>, Vec<f64>) {
let n_total = n_years * 2;
let burnin = 200;
let n_generate = n_total + burnin;
let mut values = vec![0.0_f64; n_generate + 2];
let mut lcg: u64 = seed;
let lcg_next = |s: u64| -> u64 {
s.wrapping_mul(6_364_136_223_846_793_005)
.wrapping_add(1_442_695_040_888_963_407)
};
for i in 2..n_generate + 2 {
lcg = lcg_next(lcg);
let u1 = (lcg >> 11) as f64 / (1u64 << 53) as f64;
lcg = lcg_next(lcg);
let u2 = (lcg >> 11) as f64 / (1u64 << 53) as f64;
let u1_safe = u1.max(1e-15);
let noise = (-2.0 * u1_safe.ln()).sqrt() * (2.0 * std::f64::consts::PI * u2).cos();
values[i] = phi_1 * values[i - 1] + phi_2 * values[i - 2] + noise;
}
let start = burnin + 2;
let mut obs_s0 = Vec::with_capacity(n_years);
let mut obs_s1 = Vec::with_capacity(n_years);
for y in 0..n_years {
obs_s0.push(values[start + y * 2]);
obs_s1.push(values[start + y * 2 + 1]);
}
(obs_s0, obs_s1)
}
fn make_two_season_stage(
index: usize,
id: i32,
season_id: usize,
year: i32,
first_half: bool,
) -> cobre_core::temporal::Stage {
use chrono::NaiveDate;
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, StageRiskConfig, StageStateConfig,
};
let (start_date, end_date) = if first_half {
(
NaiveDate::from_ymd_opt(year, 1, 1).unwrap(),
NaiveDate::from_ymd_opt(year, 7, 1).unwrap(),
)
} else {
(
NaiveDate::from_ymd_opt(year, 7, 1).unwrap(),
NaiveDate::from_ymd_opt(year + 1, 1, 1).unwrap(),
)
};
cobre_core::temporal::Stage {
index,
id,
start_date,
end_date,
season_id: Some(season_id),
blocks: vec![Block {
index: 0,
name: "SINGLE".to_string(),
duration_hours: 4380.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: false,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 1,
noise_method: NoiseMethod::Saa,
},
}
}
#[test]
#[allow(clippy::cast_sign_loss)]
fn test_ar_rows_to_estimates_groups_by_season() {
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, StageRiskConfig, StageStateConfig,
};
let make_stage = |id: i32, season_id: usize| cobre_core::temporal::Stage {
index: id as usize,
id,
start_date: chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
end_date: chrono::NaiveDate::from_ymd_opt(1970, 7, 1).unwrap(),
season_id: Some(season_id),
blocks: vec![Block {
index: 0,
name: "T".to_string(),
duration_hours: 1.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: false,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 1,
noise_method: NoiseMethod::Saa,
},
};
let stages = vec![make_stage(0, 0), make_stage(1, 0), make_stage(2, 1)];
let rows = vec![
InflowArCoefficientRow {
hydro_id: EntityId(1),
stage_id: 0,
lag: 1,
coefficient: 0.50,
residual_std_ratio: 0.85,
},
InflowArCoefficientRow {
hydro_id: EntityId(1),
stage_id: 1,
lag: 1,
coefficient: 0.50,
residual_std_ratio: 0.85,
},
InflowArCoefficientRow {
hydro_id: EntityId(1),
stage_id: 2,
lag: 1,
coefficient: 0.60,
residual_std_ratio: 0.80,
},
InflowArCoefficientRow {
hydro_id: EntityId(2),
stage_id: 0,
lag: 1,
coefficient: 0.40,
residual_std_ratio: 0.90,
},
InflowArCoefficientRow {
hydro_id: EntityId(2),
stage_id: 1,
lag: 1,
coefficient: 0.40,
residual_std_ratio: 0.90,
},
InflowArCoefficientRow {
hydro_id: EntityId(2),
stage_id: 2,
lag: 1,
coefficient: 0.35,
residual_std_ratio: 0.88,
},
];
let estimates = ar_rows_to_estimates(&rows, &stages);
assert_eq!(
estimates.len(),
4,
"expected 4 estimates (2 hydros * 2 seasons), got {}",
estimates.len()
);
let e = estimates
.iter()
.find(|e| e.hydro_id == EntityId(1) && e.season_id == 0)
.expect("hydro 1, season 0 estimate must exist");
assert_eq!(e.coefficients.len(), 1, "AR(1) must have 1 coefficient");
assert!(
(e.coefficients[0] - 0.50).abs() < f64::EPSILON,
"coeff must be 0.50, got {}",
e.coefficients[0]
);
assert!(
(e.residual_std_ratio - 0.85).abs() < f64::EPSILON,
"residual_std_ratio must be 0.85"
);
let e = estimates
.iter()
.find(|e| e.hydro_id == EntityId(1) && e.season_id == 1)
.expect("hydro 1, season 1 estimate must exist");
assert_eq!(e.coefficients.len(), 1);
assert!((e.coefficients[0] - 0.60).abs() < f64::EPSILON);
assert!((e.residual_std_ratio - 0.80).abs() < f64::EPSILON);
let e = estimates
.iter()
.find(|e| e.hydro_id == EntityId(2) && e.season_id == 0)
.expect("hydro 2, season 0 estimate must exist");
assert_eq!(e.coefficients.len(), 1);
assert!((e.coefficients[0] - 0.40).abs() < f64::EPSILON);
let e = estimates
.iter()
.find(|e| e.hydro_id == EntityId(2) && e.season_id == 1)
.expect("hydro 2, season 1 estimate must exist");
assert_eq!(e.coefficients.len(), 1);
assert!((e.coefficients[0] - 0.35).abs() < f64::EPSILON);
}
fn write_unit_test_ar_coefficients(
path: &std::path::Path,
hydro_id: i32,
stages: &[cobre_core::temporal::Stage],
coefficient: f64,
residual_std_ratio: f64,
) {
use arrow::array::{Float64Array, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use std::sync::Arc;
let schema = Arc::new(Schema::new(vec![
Field::new("hydro_id", DataType::Int32, false),
Field::new("stage_id", DataType::Int32, false),
Field::new("lag", DataType::Int32, false),
Field::new("coefficient", DataType::Float64, false),
Field::new("residual_std_ratio", DataType::Float64, false),
]));
let n = stages.len();
let hydro_ids: Vec<i32> = vec![hydro_id; n];
let stage_ids: Vec<i32> = stages.iter().map(|s| s.id).collect();
let lags: Vec<i32> = vec![1; n];
let coefficients: Vec<f64> = vec![coefficient; n];
let ratios: Vec<f64> = vec![residual_std_ratio; n];
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(hydro_ids)),
Arc::new(Int32Array::from(stage_ids)),
Arc::new(Int32Array::from(lags)),
Arc::new(Float64Array::from(coefficients)),
Arc::new(Float64Array::from(ratios)),
],
)
.expect("valid batch");
let file = std::fs::File::create(path).expect("create parquet file");
let mut writer = ArrowWriter::try_new(file, schema, None).expect("ArrowWriter");
writer.write(&batch).expect("write batch");
writer.close().expect("close writer");
}
#[allow(clippy::cast_possible_wrap)]
fn build_system_empty_models(n_years: usize) -> System {
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::{Bus, DeficitSegment, EntityId, SystemBuilder};
let hydro_id = EntityId(1);
let bus = Bus {
id: EntityId(10),
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: Some(f64::INFINITY),
cost_per_mwh: 3000.0,
}],
excess_cost: 0.0,
};
let ref_year = 1970_i32;
let mut stages = Vec::with_capacity(n_years * 2);
for y in 0..n_years {
let year = ref_year + y as i32;
stages.push(make_two_season_stage(y * 2, (y * 2) as i32, 0, year, true));
stages.push(make_two_season_stage(
y * 2 + 1,
(y * 2 + 1) as i32,
1,
year,
false,
));
}
let hydro = Hydro {
id: hydro_id,
name: "H1".to_string(),
bus_id: EntityId(10),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 5000.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity,
min_turbined_m3s: 0.0,
max_turbined_m3s: 1000.0,
specific_productivity_mw_per_m3s_per_m: None,
min_generation_mw: 0.0,
max_generation_mw: 900.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
turbined_cost: 0.0,
storage_violation_below_cost: 1000.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
SystemBuilder::new()
.buses(vec![bus])
.hydros(vec![hydro])
.stages(stages)
.build()
.expect("valid system with empty inflow models")
}
#[allow(clippy::cast_possible_wrap)]
fn setup_user_ar_case(
case_dir: &std::path::Path,
n_years: usize,
ar_coefficient: f64,
residual_std_ratio: f64,
) {
create_required_files(case_dir);
let scenarios = case_dir.join("scenarios");
std::fs::create_dir_all(&scenarios).unwrap();
write_unit_test_inflow_history(&scenarios.join("inflow_history.parquet"), 1, n_years);
let ref_year = 1970_i32;
let mut stages = Vec::with_capacity(n_years * 2);
for y in 0..n_years {
let year = ref_year + y as i32;
stages.push(make_two_season_stage(y * 2, (y * 2) as i32, 0, year, true));
stages.push(make_two_season_stage(
y * 2 + 1,
(y * 2 + 1) as i32,
1,
year,
false,
));
}
write_unit_test_ar_coefficients(
&scenarios.join("inflow_ar_coefficients.parquet"),
1,
&stages,
ar_coefficient,
residual_std_ratio,
);
}
#[test]
fn test_user_ar_estimation_preserves_ar_coefficients() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
const KNOWN_COEFF: f64 = 0.72;
const KNOWN_RATIO: f64 = 0.69;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
setup_user_ar_case(case_dir, N_YEARS, KNOWN_COEFF, KNOWN_RATIO);
let system = build_system_empty_models(N_YEARS);
let config = default_config();
let (updated, report, path) = estimate_from_history(system, case_dir, &config)
.expect("UserArHistoryStats estimation must succeed");
assert_eq!(
path,
EstimationPath::UserArHistoryStats,
"expected UserArHistoryStats path"
);
assert!(
report.is_some(),
"UserArHistoryStats must return Some(report)"
);
let models = updated.inflow_models();
assert!(
!models.is_empty(),
"estimation must produce at least one inflow model"
);
for m in models {
assert_eq!(
m.ar_coefficients.len(),
1,
"every model must have AR(1) coefficients (lag 1 only), stage {}",
m.stage_id
);
assert_eq!(
m.ar_coefficients[0].to_bits(),
KNOWN_COEFF.to_bits(),
"ar_coefficients[0] must be bitwise identical to {KNOWN_COEFF} for stage {}",
m.stage_id
);
assert_eq!(
m.residual_std_ratio.to_bits(),
KNOWN_RATIO.to_bits(),
"residual_std_ratio must be bitwise identical to {KNOWN_RATIO} for stage {}",
m.stage_id
);
}
}
#[test]
fn test_user_ar_estimation_estimates_stats_from_history() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
setup_user_ar_case(case_dir, N_YEARS, 0.55, 0.83);
let system = build_system_empty_models(N_YEARS);
let config = default_config();
let (updated, _report, _path) = estimate_from_history(system, case_dir, &config)
.expect("UserArHistoryStats estimation must succeed");
let models = updated.inflow_models();
assert!(
!models.is_empty(),
"estimation must produce at least one inflow model"
);
for m in models {
assert!(
m.mean_m3s.is_finite() && m.mean_m3s > 0.0,
"mean_m3s must be finite and positive, got {} for stage {}",
m.mean_m3s,
m.stage_id
);
assert!(
m.std_m3s.is_finite() && m.std_m3s >= 0.0,
"std_m3s must be finite and non-negative, got {} for stage {}",
m.std_m3s,
m.stage_id
);
}
}
#[test]
fn test_user_ar_estimation_returns_user_provided_report() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
setup_user_ar_case(case_dir, N_YEARS, 0.55, 0.83);
let system = build_system_empty_models(N_YEARS);
let config = default_config();
let (_updated, report, _path) = estimate_from_history(system, case_dir, &config)
.expect("UserArHistoryStats estimation must succeed");
let report = report.expect("UserArHistoryStats must return Some(EstimationReport)");
assert_eq!(
report.method, "user_provided",
"report method must be 'user_provided', got '{}'",
report.method
);
assert!(
report.entries.is_empty(),
"report entries must be empty (no AR was estimated), got {} entries",
report.entries.len()
);
}
fn make_hydro(hydro_id: EntityId, bus_id: EntityId) -> cobre_core::entities::hydro::Hydro {
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel};
Hydro {
id: hydro_id,
name: format!("H{}", hydro_id.0),
bus_id,
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 5000.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity,
min_turbined_m3s: 0.0,
max_turbined_m3s: 1000.0,
specific_productivity_mw_per_m3s_per_m: None,
min_generation_mw: 0.0,
max_generation_mw: 900.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: cobre_core::entities::hydro::HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
turbined_cost: 0.0,
storage_violation_below_cost: 1000.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
}
}
#[allow(clippy::cast_possible_wrap)]
fn build_two_hydro_system_selective_stats(
n_years: usize,
all_hydro_ids: &[EntityId],
stats_hydro_ids: &[EntityId],
) -> System {
use cobre_core::scenario::InflowModel;
use cobre_core::{Bus, DeficitSegment, SystemBuilder};
let bus_id = EntityId(10);
let bus = Bus {
id: bus_id,
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: Some(f64::INFINITY),
cost_per_mwh: 3000.0,
}],
excess_cost: 0.0,
};
let ref_year = 1970_i32;
let mut stages = Vec::with_capacity(n_years * 2);
for y in 0..n_years {
let year = ref_year + y as i32;
stages.push(make_two_season_stage(y * 2, (y * 2) as i32, 0, year, true));
stages.push(make_two_season_stage(
y * 2 + 1,
(y * 2 + 1) as i32,
1,
year,
false,
));
}
let inflow_models: Vec<InflowModel> = stats_hydro_ids
.iter()
.flat_map(|&hid| {
stages.iter().map(move |s| InflowModel {
hydro_id: hid,
stage_id: s.id,
mean_m3s: 100.0,
std_m3s: 10.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
})
})
.collect();
let hydros: Vec<_> = all_hydro_ids
.iter()
.map(|&hid| make_hydro(hid, bus_id))
.collect();
SystemBuilder::new()
.buses(vec![bus])
.hydros(hydros)
.stages(stages)
.inflow_models(inflow_models)
.build()
.expect("valid two-hydro system")
}
fn write_history_for_hydros(path: &std::path::Path, hydro_ids: &[i32], n_years: usize) {
use arrow::array::{Date32Array, Float64Array, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use chrono::NaiveDate;
use parquet::arrow::ArrowWriter;
use std::sync::Arc;
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
let date_to_days = |d: NaiveDate| -> i32 {
i32::try_from((d - epoch).num_days()).expect("date in Date32 range")
};
let (obs_s0, obs_s1) = simulate_two_season_par2(0.7, 0.15, n_years, 99);
let mut ids: Vec<i32> = Vec::new();
let mut dates: Vec<i32> = Vec::new();
let mut values: Vec<f64> = Vec::new();
for &hid in hydro_ids {
for y in 0..n_years {
let year = (1970 + y) as i32;
ids.push(hid);
dates.push(date_to_days(NaiveDate::from_ymd_opt(year, 1, 15).unwrap()));
values.push(obs_s0[y] + 300.0);
ids.push(hid);
dates.push(date_to_days(NaiveDate::from_ymd_opt(year, 7, 15).unwrap()));
values.push(obs_s1[y] + 300.0);
}
}
let schema = Arc::new(Schema::new(vec![
Field::new("hydro_id", DataType::Int32, false),
Field::new("date", DataType::Date32, false),
Field::new("value_m3s", DataType::Float64, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(ids)),
Arc::new(Date32Array::from(dates)),
Arc::new(Float64Array::from(values)),
],
)
.expect("valid batch");
let file = std::fs::File::create(path).expect("create parquet file");
let mut writer = ArrowWriter::try_new(file, schema, None).expect("ArrowWriter");
writer.write(&batch).expect("write batch");
writer.close().expect("close writer");
}
#[test]
fn test_partial_estimation_direction_a_missing_stats() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
create_required_files(case_dir);
let scenarios = case_dir.join("scenarios");
std::fs::create_dir_all(&scenarios).unwrap();
write_history_for_hydros(&scenarios.join("inflow_history.parquet"), &[1, 2], N_YEARS);
std::fs::write(scenarios.join("inflow_seasonal_stats.parquet"), b"sentinel")
.expect("write sentinel");
let system = build_two_hydro_system_selective_stats(
N_YEARS,
&[EntityId(1), EntityId(2)],
&[EntityId(1)], );
let config = default_config();
let result = estimate_from_history(system, case_dir, &config);
assert!(
result.is_err(),
"Direction A must return Err when hydro 2 has AR estimates but no user stats"
);
let err = result.unwrap_err();
let description = err.to_string();
assert!(
description.contains('2'),
"error description must contain the uncovered hydro ID '2', got: {description}"
);
}
#[test]
fn test_partial_estimation_direction_b_white_noise_fallback() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
create_required_files(case_dir);
let scenarios = case_dir.join("scenarios");
std::fs::create_dir_all(&scenarios).unwrap();
write_history_for_hydros(&scenarios.join("inflow_history.parquet"), &[1], N_YEARS);
std::fs::write(scenarios.join("inflow_seasonal_stats.parquet"), b"sentinel")
.expect("write sentinel");
let system = build_two_hydro_system_selective_stats(
N_YEARS,
&[EntityId(1), EntityId(2)],
&[EntityId(1), EntityId(2)], );
let config = default_config();
let (updated, report, path) = estimate_from_history(system, case_dir, &config)
.expect("Direction B must succeed (not an error)");
assert_eq!(
path,
EstimationPath::PartialEstimation,
"expected PartialEstimation path"
);
let report = report.expect("PartialEstimation must return Some(EstimationReport)");
assert_eq!(
report.white_noise_fallbacks,
vec![EntityId(2)],
"white_noise_fallbacks must be [EntityId(2)], got {:?}",
report.white_noise_fallbacks
);
let hydro2_models: Vec<_> = updated
.inflow_models()
.iter()
.filter(|m| m.hydro_id == EntityId(2))
.collect();
assert!(
!hydro2_models.is_empty(),
"returned system must have inflow models for hydro 2"
);
for m in &hydro2_models {
assert!(
m.ar_coefficients.is_empty(),
"hydro 2 must have empty ar_coefficients (white-noise fallback), stage {}",
m.stage_id
);
}
}
#[test]
fn test_partial_estimation_exact_coverage_no_fallback() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
setup_partial_estimation_case(case_dir, N_YEARS);
let system = build_system_with_user_stats(N_YEARS);
let config = default_config();
let (_updated, report, _path) =
estimate_from_history(system, case_dir, &config).expect("exact coverage must succeed");
let report = report.expect("PartialEstimation must return Some(EstimationReport)");
assert!(
report.white_noise_fallbacks.is_empty(),
"white_noise_fallbacks must be empty for exact coverage, got {:?}",
report.white_noise_fallbacks
);
}
#[test]
fn test_full_estimation_report_has_empty_fallbacks() {
use tempfile::TempDir;
const N_YEARS: usize = 30;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
create_required_files(case_dir);
let scenarios = case_dir.join("scenarios");
std::fs::create_dir_all(&scenarios).unwrap();
write_history_for_hydros(&scenarios.join("inflow_history.parquet"), &[1], N_YEARS);
let system = build_two_hydro_system_selective_stats(
N_YEARS,
&[EntityId(1)],
&[], );
let config = default_config();
let (_, report, path) =
estimate_from_history(system, case_dir, &config).expect("FullEstimation must succeed");
assert_eq!(
path,
EstimationPath::FullEstimation,
"expected FullEstimation path"
);
let report = report.expect("FullEstimation must return Some(EstimationReport)");
assert!(
report.white_noise_fallbacks.is_empty(),
"FullEstimation must never populate white_noise_fallbacks, got {:?}",
report.white_noise_fallbacks
);
}
fn collect_std_ratio_warnings(
hydro_id: EntityId,
user_stds: &[f64],
est_stds: &[f64],
) -> Vec<StdRatioDivergence> {
use cobre_core::scenario::InflowModel;
assert_eq!(
user_stds.len(),
est_stds.len(),
"user_stds and est_stds must have equal length"
);
let n = user_stds.len();
let stages: Vec<cobre_core::temporal::Stage> = (0..n)
.map(|i| {
let year = 1970_i32;
let first_half = i % 2 == 0;
make_two_season_stage(i, i as i32, i, year, first_half)
})
.collect();
let user_models: Vec<InflowModel> = (0..n)
.map(|i| InflowModel {
hydro_id,
stage_id: i as i32,
mean_m3s: 100.0,
std_m3s: user_stds[i],
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
})
.collect();
let system = SystemBuilder::new()
.inflow_models(user_models)
.stages(stages.clone())
.build()
.expect("valid system");
let fitting_stats: Vec<SeasonalStats> = (0..n)
.map(|i| SeasonalStats {
entity_id: hydro_id,
stage_id: i as i32,
mean: 100.0,
std: est_stds[i],
})
.collect();
check_std_ratio_divergence(&system, &fitting_stats, &stages)
}
#[test]
fn test_std_ratio_divergence_fires_when_ratios_diverge() {
let warnings = collect_std_ratio_warnings(EntityId(1), &[100.0, 20.0], &[100.0, 100.0]);
assert!(
!warnings.is_empty(),
"expected at least one StdRatioDivergence when ratio diverges by 5x"
);
let pair_0_1 = warnings.iter().find(|w| w.season_a == 0 && w.season_b == 1);
assert!(
pair_0_1.is_some(),
"expected a warning for season pair 0→1, got {warnings:?}"
);
let w = pair_0_1.unwrap();
assert_eq!(
w.hydro_id,
EntityId(1),
"warning must record the correct hydro_id"
);
assert!(
(w.divergence - 5.0).abs() < 1e-10,
"divergence for pair 0→1 must be 5.0, got {}",
w.divergence
);
}
#[test]
fn test_std_ratio_divergence_not_fires_when_similar() {
let warnings = collect_std_ratio_warnings(EntityId(1), &[100.0, 20.0], &[90.0, 18.0]);
assert!(
warnings.is_empty(),
"expected no StdRatioDivergence when ratios are similar, got {warnings:?}"
);
}
#[test]
fn test_std_ratio_divergence_skips_near_zero_std() {
let warnings = collect_std_ratio_warnings(EntityId(1), &[100.0, 0.0], &[90.0, 18.0]);
let _ = warnings; }
#[test]
fn test_std_ratio_divergence_wraps_last_to_first() {
let warnings =
collect_std_ratio_warnings(EntityId(1), &[100.0, 20.0, 50.0], &[100.0, 20.0, 10.0]);
assert!(
warnings.len() >= 2,
"expected at least 2 StdRatioDivergence entries (including wrap), got {}",
warnings.len()
);
let has_wrap = warnings.iter().any(|w| w.season_a == 2 && w.season_b == 0);
assert!(
has_wrap,
"expected a warning for the wrap-around pair season 2 → season 0"
);
}
use chrono::NaiveDate;
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, StageRiskConfig, StageStateConfig,
};
fn make_monthly_stages_for_annual(n_years: usize) -> Vec<cobre_core::temporal::Stage> {
let mut stages = Vec::new();
let mut idx = 0usize;
for year in 0..n_years {
for month in 0..12usize {
let y = 2000 + year as i32;
let m = month as u32 + 1;
let (ey, em) = if m == 12 { (y + 1, 1u32) } else { (y, m + 1) };
stages.push(cobre_core::temporal::Stage {
index: idx,
id: idx as i32,
start_date: NaiveDate::from_ymd_opt(y, m, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(ey, em, 1).unwrap(),
season_id: Some(month),
blocks: vec![Block {
index: 0,
name: "SINGLE".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 1,
noise_method: NoiseMethod::Saa,
},
});
idx += 1;
}
}
stages
}
fn synthetic_monthly_obs(
hydro_id: EntityId,
n_years: usize,
base: f64,
scale: f64,
drift: f64,
) -> Vec<(EntityId, NaiveDate, f64)> {
let mut obs = Vec::new();
for year in 0..n_years {
for month in 0..12usize {
let value = base
+ f64::from(u32::try_from(month + 1).unwrap()) * scale
+ f64::from(u32::try_from(year).unwrap()) * drift;
let date = NaiveDate::from_ymd_opt(
2000 + i32::try_from(year).unwrap(),
u32::try_from(month + 1).unwrap(),
1,
)
.unwrap();
obs.push((hydro_id, date, value));
}
}
obs
}
#[allow(clippy::cast_possible_wrap)]
fn build_two_hydro_monthly_system(n_years: usize) -> System {
use cobre_core::{Bus, DeficitSegment, SystemBuilder};
let bus_id = EntityId(10);
let bus = Bus {
id: bus_id,
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: Some(f64::INFINITY),
cost_per_mwh: 3000.0,
}],
excess_cost: 0.0,
};
let h1 = EntityId(1);
let h2 = EntityId(2);
let stages = make_monthly_stages_for_annual(n_years);
SystemBuilder::new()
.buses(vec![bus])
.hydros(vec![make_hydro(h1, bus_id), make_hydro(h2, bus_id)])
.stages(stages)
.build()
.expect("valid two-hydro monthly system")
}
fn write_monthly_inflow_history_two_hydros(path: &std::path::Path, n_years: usize) {
use arrow::array::{Date32Array, Float64Array, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use std::sync::Arc;
let h1 = EntityId(1);
let h2 = EntityId(2);
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
let date_to_days = |d: NaiveDate| -> i32 { i32::try_from((d - epoch).num_days()).unwrap() };
let obs_h1 = synthetic_monthly_obs(h1, n_years, 100.0, 5.0, 1.0);
let obs_h2 = synthetic_monthly_obs(h2, n_years, 200.0, 3.0, 0.5);
let mut ids: Vec<i32> = Vec::new();
let mut dates: Vec<i32> = Vec::new();
let mut values: Vec<f64> = Vec::new();
for &(eid, date, value) in obs_h1.iter().chain(obs_h2.iter()) {
ids.push(eid.0);
dates.push(date_to_days(date));
values.push(value);
}
let schema = Arc::new(Schema::new(vec![
Field::new("hydro_id", DataType::Int32, false),
Field::new("date", DataType::Date32, false),
Field::new("value_m3s", DataType::Float64, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(ids)),
Arc::new(Date32Array::from(dates)),
Arc::new(Float64Array::from(values)),
],
)
.expect("valid batch");
let file = std::fs::File::create(path).expect("create parquet file");
let mut writer = ArrowWriter::try_new(file, schema, None).expect("ArrowWriter");
writer.write(&batch).expect("write batch");
writer.close().expect("close writer");
}
#[test]
fn estimate_from_history_pacf_annual_populates_annual_field() {
use cobre_io::config::{EstimationConfig, OrderSelectionMethod};
use tempfile::TempDir;
const N_YEARS: usize = 5;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
create_required_files(case_dir);
let scenarios = case_dir.join("scenarios");
std::fs::create_dir_all(&scenarios).unwrap();
write_monthly_inflow_history_two_hydros(&scenarios.join("inflow_history.parquet"), N_YEARS);
let system = build_two_hydro_monthly_system(N_YEARS);
let mut config: Config = serde_json::from_str(MINIMAL_CONFIG_JSON).unwrap();
config.estimation = EstimationConfig {
max_order: 2,
order_selection: OrderSelectionMethod::PacfAnnual,
min_observations_per_season: 2,
max_coefficient_magnitude: None,
};
let (updated, report, path) = estimate_from_history(system, case_dir, &config)
.expect("full estimation with PacfAnnual must succeed");
assert_eq!(
path,
EstimationPath::FullEstimation,
"expected FullEstimation path"
);
assert!(report.is_some(), "FullEstimation must return Some(report)");
let models = updated.inflow_models();
assert!(
!models.is_empty(),
"estimation must produce at least one inflow model"
);
assert!(
models.iter().any(|m| m.annual.is_some()),
"PacfAnnual path must set annual=Some on at least one model"
);
}
#[test]
fn estimate_from_history_pacf_classical_keeps_annual_none() {
use cobre_io::config::{EstimationConfig, OrderSelectionMethod};
use tempfile::TempDir;
const N_YEARS: usize = 5;
let dir = TempDir::new().unwrap();
let case_dir = dir.path();
create_required_files(case_dir);
let scenarios = case_dir.join("scenarios");
std::fs::create_dir_all(&scenarios).unwrap();
write_monthly_inflow_history_two_hydros(&scenarios.join("inflow_history.parquet"), N_YEARS);
let system = build_two_hydro_monthly_system(N_YEARS);
let mut config: Config = serde_json::from_str(MINIMAL_CONFIG_JSON).unwrap();
config.estimation = EstimationConfig {
max_order: 2,
order_selection: OrderSelectionMethod::Pacf,
min_observations_per_season: 2,
max_coefficient_magnitude: None,
};
let (updated, report, path) = estimate_from_history(system, case_dir, &config)
.expect("full estimation with Pacf must succeed");
assert_eq!(
path,
EstimationPath::FullEstimation,
"expected FullEstimation path"
);
assert!(report.is_some(), "FullEstimation must return Some(report)");
let models = updated.inflow_models();
assert!(
!models.is_empty(),
"estimation must produce at least one inflow model"
);
assert!(
models.iter().all(|m| m.annual.is_none()),
"classical Pacf path must keep annual=None for every model"
);
}
#[test]
fn estimate_ar_coefficients_with_selection_classical_path_unchanged() {
let h1 = EntityId(1);
let h2 = EntityId(2);
let n_years = 30;
let stages = make_monthly_stages_for_annual(n_years);
let mut obs = synthetic_monthly_obs(h1, n_years, 100.0, 5.0, 1.0);
obs.extend(synthetic_monthly_obs(h2, n_years, 200.0, 3.0, 0.5));
let seasonal_stats = {
use cobre_stochastic::par::fitting::estimate_seasonal_stats_with_season_map;
estimate_seasonal_stats_with_season_map(&obs, &stages, &[h1, h2], None).unwrap()
};
let (estimates, report) = estimate_ar_coefficients_with_selection(
&obs,
&seasonal_stats,
&stages,
&[h1, h2],
&ArEstimationConfig {
max_order: 3,
max_coeff_magnitude: None,
season_map: None,
use_annual_component: false,
},
)
.expect("classical path must succeed");
assert_eq!(
report.method, "PACF",
"classical path must produce method=PACF, got {}",
report.method
);
for est in &estimates {
assert!(
est.annual.is_none(),
"classical path: hydro={} season={} must have annual=None",
est.hydro_id.0,
est.season_id
);
}
}
fn monthly_season_map() -> cobre_core::temporal::SeasonMap {
use cobre_core::temporal::{SeasonCycleType, SeasonDefinition, SeasonMap};
let seasons = (0..12usize)
.map(|m| SeasonDefinition {
id: m,
label: format!("M{m}"),
month_start: u32::try_from(m + 1).unwrap(),
day_start: None,
month_end: None,
day_end: None,
})
.collect();
SeasonMap {
cycle_type: SeasonCycleType::Monthly,
seasons,
}
}
fn partial_year_stages(
first_season: usize,
n: usize,
start_year: i32,
) -> Vec<cobre_core::temporal::Stage> {
(0..n)
.map(|k| {
let season = first_season + k;
let m = u32::try_from(season + 1).unwrap();
let (ey, em) = if m == 12 {
(start_year + 1, 1u32)
} else {
(start_year, m + 1)
};
cobre_core::temporal::Stage {
index: k,
id: i32::try_from(k).unwrap(),
start_date: NaiveDate::from_ymd_opt(start_year, m, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(ey, em, 1).unwrap(),
season_id: Some(season),
blocks: vec![Block {
index: 0,
name: "SINGLE".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 1,
noise_method: NoiseMethod::Saa,
},
}
})
.collect()
}
#[test]
fn partial_year_par2_synthesizes_prestudy_lag_models() {
let h1 = EntityId(1);
let n_years = 30;
let max_order = 2usize;
let season_map = monthly_season_map();
let study_stages = partial_year_stages(8, 4, 2030);
let obs = synthetic_monthly_obs(h1, n_years, 100.0, 5.0, 1.0);
let prestudy = synthesize_prestudy_stages(&study_stages, max_order, Some(&season_map));
assert_eq!(
prestudy.len(),
2,
"expected 2 synthetic pre-study stages, got {}",
prestudy.len()
);
let mut pre_ids: Vec<i32> = prestudy.iter().map(|s| s.id).collect();
pre_ids.sort_unstable();
assert_eq!(pre_ids, vec![-2, -1], "pre-study ids must be -1, -2");
let season_of = |id: i32| prestudy.iter().find(|s| s.id == id).unwrap().season_id;
assert_eq!(
season_of(-1),
Some(7),
"stage -1 must map to season 7 (Aug)"
);
assert_eq!(
season_of(-2),
Some(6),
"stage -2 must map to season 6 (Jul)"
);
let stages: Vec<cobre_core::temporal::Stage> = study_stages
.iter()
.cloned()
.chain(prestudy.iter().cloned())
.collect();
let seasonal_stats = {
use cobre_stochastic::par::fitting::estimate_seasonal_stats_with_season_map;
estimate_seasonal_stats_with_season_map(&obs, &stages, &[h1], Some(&season_map))
.expect("seasonal stats must fit without panic")
};
let (ar_estimates, _report) = estimate_ar_coefficients_with_selection(
&obs,
&seasonal_stats,
&stages,
&[h1],
&ArEstimationConfig {
max_order,
max_coeff_magnitude: None,
season_map: Some(&season_map),
use_annual_component: false,
},
)
.expect("AR(2) fit must succeed");
let stats_rows = seasonal_stats_to_rows(&seasonal_stats, &stages);
let coeff_rows = ar_estimates_to_rows(&ar_estimates, &stages);
let annual_rows = ar_estimates_to_annual_rows(&ar_estimates, &stages);
let inflow_models = assemble_inflow_models(stats_rows, coeff_rows, annual_rows)
.expect("assembly must succeed with pre-study rows present");
let neg_models: Vec<&cobre_core::scenario::InflowModel> =
inflow_models.iter().filter(|m| m.stage_id < 0).collect();
assert!(
neg_models.iter().any(|m| m.stage_id == -1)
&& neg_models.iter().any(|m| m.stage_id == -2),
"expected InflowModel entries at stage_id -1 and -2, got {:?}",
neg_models.iter().map(|m| m.stage_id).collect::<Vec<_>>()
);
let par = cobre_stochastic::PrecomputedPar::build(
&inflow_models,
&study_stages,
&[h1],
Some(season_map.seasons.len()),
)
.expect("PrecomputedPar build must succeed");
let par_order = par.max_order();
assert!(
par_order >= 1 && par_order <= max_order,
"selected order {par_order} must be in 1..={max_order}"
);
let psi0 = par.psi_slice(0, 0);
assert_eq!(
psi0.len(),
par_order,
"psi stride must equal selected order"
);
assert!(
psi0.iter().any(|&p| p.abs() > 1e-9),
"first study stage psi must be non-zero for pre-study lags, got {psi0:?}"
);
}
}