use std::path::Path;
use cobre_core::System;
use crate::SddpError;
mod evaporation;
mod export;
mod production;
mod summary;
mod types;
pub use evaporation::{resolve_evaporation_models, resolve_evaporation_models_from_artifacts};
pub use export::{
build_deviation_summary, build_evaporation_model_rows, build_fpha_deviation_point_rows,
};
pub use production::{resolve_production_models, resolve_production_models_from_artifacts};
pub use summary::build_hydro_model_summary;
#[derive(Debug, Clone, Copy, Default)]
pub struct HydroFitTimings {
pub production_fit_seconds: f64,
pub evaporation_fit_seconds: f64,
}
pub use types::{
EvaporationModel, EvaporationModelSet, EvaporationReferenceSource, EvaporationSource,
FphaFitDeviationEntry, FphaHydroDetail, FphaPlane, HydroModelProvenance, HydroModelSummary,
LinearizedEvaporation, PrepareHydroModelsResult, ProductionModelSet, ProductionModelSource,
ResolvedProductionModel,
};
pub fn prepare_hydro_models(
system: &System,
case_dir: &Path,
collect_deviation_points: bool,
) -> Result<PrepareHydroModelsResult, SddpError> {
let artifacts = load_artifacts_for_hydro_models(case_dir)?;
prepare_hydro_models_from_artifacts(system, &artifacts, collect_deviation_points, None)
}
pub fn prepare_hydro_models_from_artifacts(
system: &System,
artifacts: &cobre_io::CaseArtifacts,
collect_deviation_points: bool,
timings: Option<&mut HydroFitTimings>,
) -> Result<PrepareHydroModelsResult, SddpError> {
let production_start = std::time::Instant::now();
let (
production,
productivity_override,
production_sources,
fpha_export_rows,
reference_volumes_hm3,
fpha_fit_deviations,
fpha_deviation_point_rows,
) = resolve_production_models_from_artifacts(system, artifacts, collect_deviation_points)?;
let production_fit_seconds = production_start.elapsed().as_secs_f64();
let evaporation_start = std::time::Instant::now();
let (evaporation, evaporation_sources, evaporation_reference_sources) =
resolve_evaporation_models_from_artifacts(system, artifacts)?;
let evaporation_fit_seconds = evaporation_start.elapsed().as_secs_f64();
if let Some(timings) = timings {
timings.production_fit_seconds = production_fit_seconds;
timings.evaporation_fit_seconds = evaporation_fit_seconds;
}
let mut vha_geometry_by_hydro: std::collections::HashMap<
cobre_core::EntityId,
Vec<cobre_io::HydroGeometryRow>,
> = std::collections::HashMap::new();
for row in &artifacts.hydro_geometry {
vha_geometry_by_hydro
.entry(row.hydro_id)
.or_default()
.push(row.clone());
}
for rows in vha_geometry_by_hydro.values_mut() {
rows.sort_by(|a, b| a.volume_hm3.total_cmp(&b.volume_hm3));
}
Ok(PrepareHydroModelsResult {
production,
productivity_override,
evaporation,
provenance: HydroModelProvenance {
production_sources,
evaporation_sources,
evaporation_reference_sources,
},
fpha_export_rows,
reference_volumes_hm3,
vha_geometry_by_hydro,
fpha_fit_deviations,
fpha_deviation_point_rows,
})
}
fn load_artifacts_for_hydro_models(case_dir: &Path) -> Result<cobre_io::CaseArtifacts, SddpError> {
let mut ctx = cobre_io::ValidationContext::new();
let manifest = cobre_io::validate_structure(case_dir, &mut ctx);
ctx.into_result().map_err(SddpError::from)?;
let prod_path = if manifest.system_hydro_production_models_json {
Some(case_dir.join("system").join("hydro_production_models.json"))
} else {
None
};
let geom_path = if manifest.system_hydro_geometry_parquet {
Some(case_dir.join("system").join("hydro_geometry.parquet"))
} else {
None
};
let fpha_path = if manifest.system_fpha_hyperplanes_parquet {
Some(case_dir.join("system").join("fpha_hyperplanes.parquet"))
} else {
None
};
let prod_eff_path = case_dir
.join("system")
.join("hydro_energy_productivity.parquet");
let prod_eff_path_opt = if prod_eff_path.exists() {
Some(prod_eff_path.as_path())
} else {
None
};
let tailrace_path = if manifest.system_tailrace_curves_parquet {
Some(case_dir.join("system").join("tailrace_curves.parquet"))
} else {
None
};
let production_file = cobre_io::extensions::load_production_models(prod_path.as_deref())?;
Ok(cobre_io::CaseArtifacts {
file_manifest: manifest,
hydro_geometry: cobre_io::extensions::load_hydro_geometry(geom_path.as_deref())?,
production_models: production_file.configs,
plane_reduction: production_file.plane_reduction,
hydro_energy_productivity: cobre_io::load_hydro_energy_productivity(prod_eff_path_opt)?,
fpha_hyperplanes: cobre_io::extensions::load_fpha_hyperplanes(fpha_path.as_deref())?,
scalar_parameters: Vec::new(),
tailrace_curves: cobre_io::extensions::load_tailrace_curves(tailrace_path.as_deref())?,
})
}
#[cfg(test)]
#[allow(
clippy::doc_markdown,
clippy::match_wildcard_for_single_variants,
clippy::cast_precision_loss,
clippy::unwrap_used,
clippy::expect_used,
clippy::panic
)]
mod tests {
#[test]
fn prepare_hydro_models_fpha_export_rows_are_identical_across_ranks() {
let case_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.parent()
.expect("cobre-sddp parent dir must exist")
.parent()
.expect("crates parent dir must exist")
.join("examples/deterministic/d07-fpha-computed");
let system =
cobre_io::load_case(&case_dir).expect("d07-fpha-computed must load successfully");
let result_rank0 = super::prepare_hydro_models(&system, &case_dir, false)
.expect("prepare_hydro_models must succeed for rank 0");
let result_rank1 = super::prepare_hydro_models(&system, &case_dir, false)
.expect("prepare_hydro_models must succeed for rank 1");
assert!(
!result_rank0.fpha_export_rows.is_empty(),
"rank 0: fpha_export_rows must be non-empty for a computed-FPHA case"
);
assert_eq!(
result_rank0.fpha_export_rows, result_rank1.fpha_export_rows,
"fpha_export_rows must be bit-identical across ranks (deterministic preprocessing)"
);
}
#[test]
fn prepare_hydro_models_carries_sorted_vha_geometry() {
let case_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.parent()
.expect("cobre-sddp parent dir must exist")
.parent()
.expect("crates parent dir must exist")
.join("examples/deterministic/d07-fpha-computed");
let system =
cobre_io::load_case(&case_dir).expect("d07-fpha-computed must load successfully");
let result = super::prepare_hydro_models(&system, &case_dir, false)
.expect("prepare_hydro_models must succeed");
assert!(
!result.vha_geometry_by_hydro.is_empty(),
"a computed-FPHA case ships VHA geometry; the map must be populated"
);
for rows in result.vha_geometry_by_hydro.values() {
assert!(!rows.is_empty(), "no plant entry may be empty");
assert!(
rows.windows(2).all(|w| w[0].volume_hm3 <= w[1].volume_hm3),
"each plant's VHA rows must be sorted by ascending volume"
);
}
}
#[test]
fn fit_is_thread_count_invariant() {
type FitOutputs = (
Vec<cobre_io::FphaHyperplaneRow>,
Vec<super::FphaFitDeviationEntry>,
);
let case_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.parent()
.expect("cobre-sddp parent dir must exist")
.parent()
.expect("crates parent dir must exist")
.join("examples/deterministic/d31-backwater-reference-volume");
let system = cobre_io::load_case(&case_dir)
.expect("d31-backwater-reference-volume must load successfully");
let resolve_under_pool = |n: usize| -> FitOutputs {
rayon::ThreadPoolBuilder::new()
.num_threads(n)
.build()
.expect("rayon pool must build")
.install(|| {
let result = super::prepare_hydro_models(&system, &case_dir, false)
.expect("prepare_hydro_models must succeed");
(result.fpha_export_rows, result.fpha_fit_deviations)
})
};
let thread_counts = [1usize, 2, 4];
let outputs: Vec<FitOutputs> = thread_counts
.iter()
.map(|&n| resolve_under_pool(n))
.collect();
let rows: Vec<&Vec<cobre_io::FphaHyperplaneRow>> = outputs.iter().map(|(r, _)| r).collect();
let deviations: Vec<&Vec<super::FphaFitDeviationEntry>> =
outputs.iter().map(|(_, d)| d).collect();
assert!(
!rows[0].is_empty(),
"the computed-FPHA case must export at least one plane for the fit path to run"
);
assert!(
!deviations[0].is_empty(),
"the computed-FPHA case must record at least one fit deviation"
);
let assert_bit_identical = |a: &[cobre_io::FphaHyperplaneRow],
b: &[cobre_io::FphaHyperplaneRow],
threads_a: usize,
threads_b: usize| {
assert_eq!(
a.len(),
b.len(),
"row count must match across {threads_a}- and {threads_b}-thread pools"
);
for (ra, rb) in a.iter().zip(b) {
assert_eq!(ra.hydro_id, rb.hydro_id, "hydro_id must match");
assert_eq!(ra.stage_id, rb.stage_id, "stage_id must match");
assert_eq!(ra.plane_id, rb.plane_id, "plane_id must match");
assert_eq!(
ra.gamma_0.to_bits(),
rb.gamma_0.to_bits(),
"gamma_0 must be bit-identical across pool sizes"
);
assert_eq!(
ra.gamma_v.to_bits(),
rb.gamma_v.to_bits(),
"gamma_v must be bit-identical across pool sizes"
);
assert_eq!(
ra.gamma_q.to_bits(),
rb.gamma_q.to_bits(),
"gamma_q must be bit-identical across pool sizes"
);
assert_eq!(
ra.gamma_s.to_bits(),
rb.gamma_s.to_bits(),
"gamma_s must be bit-identical across pool sizes"
);
assert_eq!(
ra.kappa.to_bits(),
rb.kappa.to_bits(),
"kappa must be bit-identical across pool sizes"
);
}
};
let assert_deviations_bit_identical =
|a: &[super::FphaFitDeviationEntry],
b: &[super::FphaFitDeviationEntry],
threads_a: usize,
threads_b: usize| {
assert_eq!(
a.len(),
b.len(),
"deviation count must match across {threads_a}- and {threads_b}-thread pools"
);
for (da, db) in a.iter().zip(b) {
assert_eq!(da.hydro_id, db.hydro_id, "deviation hydro_id must match");
assert_eq!(da.stage_id, db.stage_id, "deviation stage_id must match");
assert_eq!(
da.mean_abs_mw.to_bits(),
db.mean_abs_mw.to_bits(),
"mean_abs_mw must be bit-identical across pool sizes"
);
assert_eq!(
da.max_abs_mw.to_bits(),
db.max_abs_mw.to_bits(),
"max_abs_mw must be bit-identical across pool sizes"
);
assert_eq!(
da.mean_signed_mw.to_bits(),
db.mean_signed_mw.to_bits(),
"mean_signed_mw must be bit-identical across pool sizes"
);
assert_eq!(
da.relative.to_bits(),
db.relative.to_bits(),
"relative must be bit-identical across pool sizes"
);
}
};
for ((rows_n, deviations_n), &n) in rows.iter().zip(&deviations).zip(&thread_counts).skip(1)
{
assert_bit_identical(rows[0], rows_n, thread_counts[0], n);
assert_deviations_bit_identical(deviations[0], deviations_n, thread_counts[0], n);
}
}
#[test]
fn deviation_points_are_thread_count_invariant_when_on() {
let case_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.parent()
.expect("cobre-sddp parent dir must exist")
.parent()
.expect("crates parent dir must exist")
.join("examples/deterministic/d07-fpha-computed");
let system =
cobre_io::load_case(&case_dir).expect("d07-fpha-computed must load successfully");
let resolve_under_pool = |n: usize| -> Vec<cobre_io::FphaDeviationPointRow> {
rayon::ThreadPoolBuilder::new()
.num_threads(n)
.build()
.expect("rayon pool must build")
.install(|| {
super::prepare_hydro_models(&system, &case_dir, true)
.expect("prepare_hydro_models must succeed")
.fpha_deviation_point_rows
})
};
let thread_counts = [1usize, 2, 4];
let outputs: Vec<Vec<cobre_io::FphaDeviationPointRow>> = thread_counts
.iter()
.map(|&n| resolve_under_pool(n))
.collect();
assert!(
!outputs[0].is_empty(),
"flag-on computed-FPHA case must emit at least one deviation point row"
);
for (rows_n, &n) in outputs.iter().zip(&thread_counts).skip(1) {
assert_eq!(
outputs[0].len(),
rows_n.len(),
"deviation-point row count must match across {}- and {n}-thread pools",
thread_counts[0]
);
for (a, b) in outputs[0].iter().zip(rows_n) {
assert_eq!(a.hydro_id, b.hydro_id, "hydro_id must match");
assert_eq!(a.stage_id, b.stage_id, "stage_id must match");
assert_eq!(a.v.to_bits(), b.v.to_bits(), "v must be bit-identical");
assert_eq!(a.q.to_bits(), b.q.to_bits(), "q must be bit-identical");
assert_eq!(
a.fph_exact.to_bits(),
b.fph_exact.to_bits(),
"fph_exact must be bit-identical"
);
assert_eq!(
a.fpha_fitted.to_bits(),
b.fpha_fitted.to_bits(),
"fpha_fitted must be bit-identical"
);
assert_eq!(
a.deviation.to_bits(),
b.deviation.to_bits(),
"deviation must be bit-identical"
);
assert_eq!(
a.relative.to_bits(),
b.relative.to_bits(),
"relative must be bit-identical"
);
}
}
}
#[test]
fn deviation_points_off_is_empty_and_does_not_perturb_export_rows() {
let case_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.parent()
.expect("cobre-sddp parent dir must exist")
.parent()
.expect("crates parent dir must exist")
.join("examples/deterministic/d07-fpha-computed");
let system =
cobre_io::load_case(&case_dir).expect("d07-fpha-computed must load successfully");
let off = super::prepare_hydro_models(&system, &case_dir, false)
.expect("prepare_hydro_models (off) must succeed");
let on = super::prepare_hydro_models(&system, &case_dir, true)
.expect("prepare_hydro_models (on) must succeed");
assert!(
off.fpha_deviation_point_rows.is_empty(),
"flag-off must collect no deviation point rows"
);
assert!(
!on.fpha_deviation_point_rows.is_empty(),
"flag-on must collect deviation point rows for the computed-FPHA case"
);
assert_eq!(
off.fpha_export_rows.len(),
on.fpha_export_rows.len(),
"export-row count must not depend on the deviation-points opt-in"
);
for (a, b) in off.fpha_export_rows.iter().zip(&on.fpha_export_rows) {
assert_eq!(a.hydro_id, b.hydro_id, "hydro_id must match");
assert_eq!(a.stage_id, b.stage_id, "stage_id must match");
assert_eq!(a.plane_id, b.plane_id, "plane_id must match");
assert_eq!(
a.gamma_0.to_bits(),
b.gamma_0.to_bits(),
"gamma_0 must be bit-identical with the flag on vs off"
);
assert_eq!(
a.gamma_v.to_bits(),
b.gamma_v.to_bits(),
"gamma_v must match"
);
assert_eq!(
a.gamma_q.to_bits(),
b.gamma_q.to_bits(),
"gamma_q must match"
);
assert_eq!(
a.gamma_s.to_bits(),
b.gamma_s.to_bits(),
"gamma_s must match"
);
assert_eq!(a.kappa.to_bits(), b.kappa.to_bits(), "kappa must match");
}
}
}