use std::collections::{BTreeMap, BTreeSet};
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::Sender;
use std::sync::mpsc::SyncSender;
use cobre_comm::Communicator;
use cobre_core::{EntityId, System, TrainingEvent, entities::hydro::HydroGenerationModel};
use cobre_solver::{SolverError, SolverInterface};
use cobre_stochastic::{StochasticContext, context::OpeningTree};
use crate::{
FutureCostFunction, HorizonMode, InflowNonNegativityMethod, RiskMeasure, SddpError,
SimulationConfig, SimulationError, SimulationScenarioResult, SolverWorkspace, StageContext,
StageIndexer, StageTemplates, TrainingConfig, TrainingContext, TrainingOutcome, TrainingResult,
WorkspacePool, build_stage_templates,
cut_selection::{CutSelectionStrategy, parse_cut_selection_config},
hydro_models::{EvaporationModel, PrepareHydroModelsResult, ResolvedProductionModel},
lp_builder,
simulation::{EntityCounts, SimulationOutputSpec},
stopping_rule::{StoppingMode, StoppingRule, StoppingRuleSet},
};
pub const DEFAULT_FORWARD_PASSES: u32 = 1;
pub const DEFAULT_MAX_ITERATIONS: u64 = 100;
pub const DEFAULT_SEED: u64 = 42;
#[derive(Debug, Clone)]
pub struct StudyParams {
pub seed: u64,
pub forward_passes: u32,
pub stopping_rule_set: StoppingRuleSet,
pub n_scenarios: u32,
pub io_channel_capacity: usize,
pub policy_path: String,
pub inflow_method: InflowNonNegativityMethod,
pub cut_selection: Option<CutSelectionStrategy>,
pub cut_activity_tolerance: f64,
}
impl StudyParams {
pub fn from_config(config: &cobre_io::Config) -> Result<Self, SddpError> {
use cobre_io::config::StoppingRuleConfig;
let seed = config.training.seed.map_or(DEFAULT_SEED, i64::unsigned_abs);
let forward_passes = config
.training
.forward_passes
.unwrap_or(DEFAULT_FORWARD_PASSES);
let rule_configs = match &config.training.stopping_rules {
Some(rules) if !rules.is_empty() => rules.clone(),
_ => vec![StoppingRuleConfig::IterationLimit {
limit: u32::try_from(DEFAULT_MAX_ITERATIONS).unwrap_or(u32::MAX),
}],
};
let stopping_rules: Vec<StoppingRule> = rule_configs
.into_iter()
.map(|c| match c {
StoppingRuleConfig::IterationLimit { limit } => StoppingRule::IterationLimit {
limit: u64::from(limit),
},
StoppingRuleConfig::TimeLimit { seconds } => StoppingRule::TimeLimit { seconds },
StoppingRuleConfig::BoundStalling {
iterations,
tolerance,
} => StoppingRule::BoundStalling {
iterations: u64::from(iterations),
tolerance,
},
StoppingRuleConfig::Simulation { .. } => {
StoppingRule::IterationLimit {
limit: DEFAULT_MAX_ITERATIONS,
}
}
})
.collect();
let stopping_mode = if config.training.stopping_mode.eq_ignore_ascii_case("all") {
StoppingMode::All
} else {
StoppingMode::Any
};
let stopping_rule_set = StoppingRuleSet {
rules: stopping_rules,
mode: stopping_mode,
};
let n_scenarios = if config.simulation.enabled {
config.simulation.num_scenarios
} else {
0
};
let io_channel_capacity =
usize::try_from(config.simulation.io_channel_capacity).unwrap_or(64);
let policy_path = config.policy.path.clone();
let inflow_method = InflowNonNegativityMethod::from(&config.modeling.inflow_non_negativity);
let cut_selection = parse_cut_selection_config(&config.training.cut_selection)
.map_err(|msg| SddpError::Validation(format!("cut_selection config error: {msg}")))?;
let cut_activity_tolerance = config
.training
.cut_selection
.cut_activity_tolerance
.unwrap_or(0.0);
Ok(Self {
seed,
forward_passes,
stopping_rule_set,
n_scenarios,
io_channel_capacity,
policy_path,
inflow_method,
cut_selection,
cut_activity_tolerance,
})
}
}
#[derive(Debug)]
pub struct StudySetup {
stage_templates: StageTemplates,
stochastic: StochasticContext,
indexer: StageIndexer,
fcf: FutureCostFunction,
initial_state: Vec<f64>,
horizon: HorizonMode,
risk_measures: Vec<RiskMeasure>,
entity_counts: EntityCounts,
hydro_models: PrepareHydroModelsResult,
ncs_entity_ids_per_stage: Vec<Vec<i32>>,
ncs_max_gen: Vec<f64>,
block_counts_per_stage: Vec<usize>,
max_blocks: usize,
scaling_report: crate::scaling_report::ScalingReport,
seed: u64,
forward_passes: u32,
max_iterations: u64,
start_iteration: u64,
n_scenarios: u32,
io_channel_capacity: usize,
policy_path: String,
inflow_method: InflowNonNegativityMethod,
cut_selection: Option<CutSelectionStrategy>,
cut_activity_tolerance: f64,
stopping_rule_set: StoppingRuleSet,
}
impl StudySetup {
pub fn new(
system: &System,
config: &cobre_io::Config,
stochastic: StochasticContext,
hydro_models: PrepareHydroModelsResult,
) -> Result<Self, SddpError> {
let params = StudyParams::from_config(config)?;
Self::from_broadcast_params(
system,
stochastic,
params.seed,
params.forward_passes,
params.stopping_rule_set,
params.n_scenarios,
params.io_channel_capacity,
params.policy_path,
params.inflow_method,
params.cut_selection,
params.cut_activity_tolerance,
hydro_models,
)
}
#[allow(
clippy::too_many_arguments,
clippy::too_many_lines,
clippy::missing_panics_doc
)]
pub fn from_broadcast_params(
system: &System,
stochastic: StochasticContext,
seed: u64,
forward_passes: u32,
stopping_rule_set: StoppingRuleSet,
n_scenarios: u32,
io_channel_capacity: usize,
policy_path: String,
inflow_method: InflowNonNegativityMethod,
cut_selection: Option<CutSelectionStrategy>,
cut_activity_tolerance: f64,
hydro_models: PrepareHydroModelsResult,
) -> Result<Self, SddpError> {
use crate::scaling_report::{
LpDimensions, StageScalingReport, build_scaling_report, compute_coefficient_range,
summarize_scale_factors,
};
let mut stage_templates = build_stage_templates(
system,
&inflow_method,
stochastic.par(),
stochastic.normal(),
&hydro_models.production,
&hydro_models.evaporation,
)?;
{
let pg = system.policy_graph();
let study_stages: Vec<_> = system.stages().iter().filter(|s| s.id >= 0).collect();
stage_templates.discount_factors = study_stages
.iter()
.map(|stage| {
let rate = pg
.transitions
.iter()
.find(|tr| tr.source_id == stage.id)
.and_then(|tr| tr.annual_discount_rate_override)
.unwrap_or(pg.annual_discount_rate);
if rate == 0.0 {
1.0
} else {
let dt_days = f64::from(
i32::try_from((stage.end_date - stage.start_date).num_days())
.unwrap_or(i32::MAX),
);
1.0 / (1.0 + rate).powf(dt_days / 365.25)
}
})
.collect();
}
{
let n = stage_templates.discount_factors.len();
let mut cumulative = vec![1.0; n];
for t in 1..n {
cumulative[t] = cumulative[t - 1] * stage_templates.discount_factors[t - 1];
}
stage_templates.cumulative_discount_factors = cumulative;
}
if let Some(first) = stage_templates.templates.first() {
let theta_col = StageIndexer::new(stage_templates.n_hydros, first.max_par_order).theta;
for (s_idx, tmpl) in stage_templates.templates.iter_mut().enumerate() {
tmpl.objective[theta_col] *= stage_templates.discount_factors[s_idx];
}
}
let mut stage_scaling_reports = Vec::with_capacity(stage_templates.templates.len());
for (stage_id, tmpl) in stage_templates.templates.iter_mut().enumerate() {
let pre_scaling = compute_coefficient_range(tmpl);
let col_scale =
lp_builder::compute_col_scale(tmpl.num_cols, &tmpl.col_starts, &tmpl.values);
lp_builder::apply_col_scale(tmpl, &col_scale);
tmpl.col_scale.clone_from(&col_scale);
let row_scale = lp_builder::compute_row_scale(
tmpl.num_rows,
tmpl.num_cols,
&tmpl.col_starts,
&tmpl.row_indices,
&tmpl.values,
);
lp_builder::apply_row_scale(tmpl, &row_scale);
tmpl.row_scale.clone_from(&row_scale);
let post_scaling = compute_coefficient_range(tmpl);
stage_scaling_reports.push(StageScalingReport {
stage_id,
dimensions: LpDimensions {
num_cols: tmpl.num_cols,
num_rows: tmpl.num_rows,
num_nz: tmpl.num_nz,
},
pre_scaling,
post_scaling,
col_scale: summarize_scale_factors(&col_scale),
row_scale: summarize_scale_factors(&row_scale),
});
}
let scaling_report =
build_scaling_report(lp_builder::COST_SCALE_FACTOR, stage_scaling_reports);
let n_hydros_noise = stage_templates.n_hydros;
for (s_idx, tmpl) in stage_templates.templates.iter().enumerate() {
if !tmpl.row_scale.is_empty() {
let base_row = stage_templates.base_rows[s_idx];
for h in 0..n_hydros_noise {
stage_templates.noise_scale[s_idx * n_hydros_noise + h] *=
tmpl.row_scale[base_row + h];
}
}
}
if stage_templates.templates.is_empty() {
return Err(SddpError::Validation(
"system has no study stages".to_string(),
));
}
let stage_templates_ref = &stage_templates.templates;
let n_blks_stage0 = system.stages().first().map_or(1, |s| s.blocks.len().max(1));
let has_inflow_penalty =
inflow_method.has_slack_columns() && stage_templates_ref[0].n_hydro > 0;
let n_hydros = system.hydros().len();
let mut fpha_hydro_indices: Vec<usize> = Vec::new();
let mut fpha_planes: Vec<usize> = Vec::new();
let mut evap_hydro_indices: Vec<usize> = Vec::new();
for h_idx in 0..n_hydros {
if let ResolvedProductionModel::Fpha { planes, .. } =
hydro_models.production.model(h_idx, 0)
{
fpha_hydro_indices.push(h_idx);
fpha_planes.push(planes.len());
}
if matches!(
hydro_models.evaporation.model(h_idx),
EvaporationModel::Linearized { .. }
) {
evap_hydro_indices.push(h_idx);
}
}
let max_deficit_segments = system
.buses()
.iter()
.map(|b| b.deficit_segments.len())
.max()
.unwrap_or(0);
let eq_counts = crate::indexer::EquipmentCounts {
hydro_count: stage_templates_ref[0].n_hydro,
max_par_order: stage_templates_ref[0].max_par_order,
n_thermals: system.thermals().len(),
n_lines: system.lines().len(),
n_buses: system.buses().len(),
n_blks: n_blks_stage0,
has_inflow_penalty,
max_deficit_segments,
};
let fpha_cfg = crate::indexer::FphaColumnLayout {
hydro_indices: fpha_hydro_indices,
planes_per_hydro: fpha_planes,
};
let evap_cfg = crate::indexer::EvapConfig {
hydro_indices: evap_hydro_indices,
};
let mut indexer =
StageIndexer::with_equipment_and_evaporation(&eq_counts, &fpha_cfg, &evap_cfg);
if !stage_templates.ncs_col_starts.is_empty() {
let ncs_start = stage_templates.ncs_col_starts[0];
let n_ncs_stage0 = stage_templates.n_ncs_per_stage[0];
indexer.ncs_generation = ncs_start..(ncs_start + n_ncs_stage0 * n_blks_stage0);
for (s, &start) in stage_templates.ncs_col_starts.iter().enumerate() {
debug_assert_eq!(
start, ncs_start,
"NCS column start differs at stage {s}: expected {ncs_start}, got {start}"
);
}
}
if indexer.max_par_order > 0 && stochastic.par().n_hydros() > 0 {
let par = stochastic.par();
let ar_orders: Vec<usize> = (0..par.n_hydros()).map(|h| par.order(h)).collect();
indexer.set_nonzero_mask(&ar_orders);
}
let initial_state = build_initial_state(system, &indexer);
let n_stages = stage_templates_ref.len();
let max_iterations = max_iterations_from_rules(&stopping_rule_set);
let fcf_capacity_iterations = max_iterations.saturating_add(1);
let fcf = FutureCostFunction::new(
n_stages,
indexer.n_state,
forward_passes,
fcf_capacity_iterations,
0,
);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures: Vec<RiskMeasure> = system
.stages()
.iter()
.filter(|s| s.id >= 0)
.map(|s| RiskMeasure::from(s.risk_config))
.collect();
let entity_counts = build_entity_counts(system);
let ncs_entity_ids_per_stage: Vec<Vec<i32>> = stage_templates
.active_ncs_indices
.iter()
.map(|stage_indices| {
stage_indices
.iter()
.map(|&sys_idx| entity_counts.non_controllable_ids[sys_idx])
.collect()
})
.collect();
let ncs_max_gen: Vec<f64> = {
let stoch_ncs_ids = stochastic.ncs_entity_ids();
let mut result = Vec::with_capacity(stoch_ncs_ids.len());
for ncs_id in stoch_ncs_ids {
let max_gen = system
.non_controllable_sources()
.iter()
.find(|n| n.id == *ncs_id)
.map(|n| n.max_generation_mw)
.ok_or_else(|| {
SddpError::Validation(format!(
"stochastic NCS entity {ncs_id:?} not found in system non_controllable_sources"
))
})?;
result.push(max_gen);
}
result
};
let block_counts_per_stage: Vec<usize> = stage_templates
.block_hours_per_stage
.iter()
.map(Vec::len)
.collect();
let max_blocks = block_counts_per_stage.iter().copied().max().unwrap_or(0);
Ok(Self {
stage_templates,
stochastic,
indexer,
fcf,
initial_state,
horizon,
risk_measures,
entity_counts,
hydro_models,
ncs_entity_ids_per_stage,
ncs_max_gen,
block_counts_per_stage,
max_blocks,
scaling_report,
seed,
forward_passes,
max_iterations,
start_iteration: 0,
n_scenarios,
io_channel_capacity,
policy_path,
inflow_method,
cut_selection,
cut_activity_tolerance,
stopping_rule_set,
})
}
#[must_use]
pub fn templates_full(&self) -> &StageTemplates {
&self.stage_templates
}
#[must_use]
pub fn scaling_report(&self) -> &crate::scaling_report::ScalingReport {
&self.scaling_report
}
#[must_use]
pub fn stage_templates(&self) -> &[cobre_solver::StageTemplate] {
&self.stage_templates.templates
}
#[must_use]
pub fn base_rows(&self) -> &[usize] {
&self.stage_templates.base_rows
}
#[must_use]
pub fn noise_scale(&self) -> &[f64] {
&self.stage_templates.noise_scale
}
#[must_use]
pub fn stochastic(&self) -> &StochasticContext {
&self.stochastic
}
#[must_use]
pub fn indexer(&self) -> &StageIndexer {
&self.indexer
}
#[must_use]
pub fn fcf(&self) -> &FutureCostFunction {
&self.fcf
}
#[must_use]
pub fn fcf_mut(&mut self) -> &mut FutureCostFunction {
&mut self.fcf
}
pub fn replace_fcf(&mut self, fcf: FutureCostFunction) {
self.fcf = fcf;
}
#[must_use]
pub fn num_stages(&self) -> usize {
self.stage_templates.templates.len()
}
#[must_use]
pub fn initial_state(&self) -> &[f64] {
&self.initial_state
}
#[must_use]
pub fn horizon(&self) -> &HorizonMode {
&self.horizon
}
#[must_use]
pub fn risk_measures(&self) -> &[RiskMeasure] {
&self.risk_measures
}
#[must_use]
pub fn entity_counts(&self) -> &EntityCounts {
&self.entity_counts
}
#[must_use]
pub fn hydro_models(&self) -> &PrepareHydroModelsResult {
&self.hydro_models
}
#[must_use]
pub fn block_counts_per_stage(&self) -> &[usize] {
&self.block_counts_per_stage
}
#[must_use]
pub fn max_blocks(&self) -> usize {
self.max_blocks
}
#[must_use]
pub fn seed(&self) -> u64 {
self.seed
}
#[must_use]
pub fn forward_passes(&self) -> u32 {
self.forward_passes
}
#[must_use]
pub fn max_iterations(&self) -> u64 {
self.max_iterations
}
pub fn set_start_iteration(&mut self, iteration: u64) {
self.start_iteration = iteration;
}
#[must_use]
pub fn n_scenarios(&self) -> u32 {
self.n_scenarios
}
#[must_use]
pub fn io_channel_capacity(&self) -> usize {
self.io_channel_capacity
}
#[must_use]
pub fn policy_path(&self) -> &str {
&self.policy_path
}
#[must_use]
pub fn inflow_method(&self) -> &InflowNonNegativityMethod {
&self.inflow_method
}
#[must_use]
pub fn cut_selection(&self) -> Option<&CutSelectionStrategy> {
self.cut_selection.as_ref()
}
#[must_use]
pub fn cut_activity_tolerance(&self) -> f64 {
self.cut_activity_tolerance
}
#[must_use]
pub fn stopping_rule_set(&self) -> &StoppingRuleSet {
&self.stopping_rule_set
}
#[must_use]
pub fn stage_ctx(&self) -> StageContext<'_> {
StageContext {
templates: &self.stage_templates.templates,
base_rows: &self.stage_templates.base_rows,
noise_scale: &self.stage_templates.noise_scale,
n_hydros: self.stage_templates.n_hydros,
n_load_buses: self.stage_templates.n_load_buses,
load_balance_row_starts: &self.stage_templates.load_balance_row_starts,
load_bus_indices: &self.stage_templates.load_bus_indices,
block_counts_per_stage: &self.block_counts_per_stage,
ncs_max_gen: &self.ncs_max_gen,
discount_factors: &self.stage_templates.discount_factors,
cumulative_discount_factors: &self.stage_templates.cumulative_discount_factors,
}
}
#[must_use]
pub fn training_ctx(&self) -> TrainingContext<'_> {
TrainingContext {
horizon: &self.horizon,
indexer: &self.indexer,
inflow_method: &self.inflow_method,
stochastic: &self.stochastic,
initial_state: &self.initial_state,
}
}
pub fn train<S: SolverInterface + Send, C: Communicator>(
&mut self,
solver: &mut S,
comm: &C,
n_threads: usize,
solver_factory: impl Fn() -> Result<S, SolverError>,
event_sender: Option<Sender<TrainingEvent>>,
shutdown_flag: Option<&Arc<AtomicBool>>,
) -> Result<TrainingOutcome, SddpError> {
let training_config = TrainingConfig {
forward_passes: self.forward_passes,
max_iterations: self.max_iterations,
checkpoint_interval: None,
warm_start_cuts: 0,
event_sender,
cut_activity_tolerance: self.cut_activity_tolerance,
n_fwd_threads: n_threads,
max_blocks: self.max_blocks,
cut_selection: self.cut_selection.clone(),
shutdown_flag: shutdown_flag.map(Arc::clone),
start_iteration: self.start_iteration,
};
let stage_ctx = StageContext {
templates: &self.stage_templates.templates,
base_rows: &self.stage_templates.base_rows,
noise_scale: &self.stage_templates.noise_scale,
n_hydros: self.stage_templates.n_hydros,
n_load_buses: self.stage_templates.n_load_buses,
load_balance_row_starts: &self.stage_templates.load_balance_row_starts,
load_bus_indices: &self.stage_templates.load_bus_indices,
block_counts_per_stage: &self.block_counts_per_stage,
ncs_max_gen: &self.ncs_max_gen,
discount_factors: &self.stage_templates.discount_factors,
cumulative_discount_factors: &self.stage_templates.cumulative_discount_factors,
};
let training_ctx = TrainingContext {
horizon: &self.horizon,
indexer: &self.indexer,
inflow_method: &self.inflow_method,
stochastic: &self.stochastic,
initial_state: &self.initial_state,
};
crate::train(
solver,
training_config,
&mut self.fcf,
&stage_ctx,
&training_ctx,
self.stochastic.opening_tree(),
&self.risk_measures,
self.stopping_rule_set.clone(),
comm,
solver_factory,
)
}
pub fn simulate<S: SolverInterface + Send, C: Communicator>(
&self,
workspaces: &mut [SolverWorkspace<S>],
comm: &C,
result_tx: &SyncSender<SimulationScenarioResult>,
event_sender: Option<Sender<TrainingEvent>>,
stage_bases: &[Option<cobre_solver::Basis>],
) -> Result<crate::SimulationRunResult, SimulationError> {
let stage_ctx = self.stage_ctx();
let training_ctx = self.training_ctx();
let sim_config = self.simulation_config();
let output = SimulationOutputSpec {
result_tx,
zeta_per_stage: &self.stage_templates.zeta_per_stage,
block_hours_per_stage: &self.stage_templates.block_hours_per_stage,
entity_counts: &self.entity_counts,
generic_constraint_row_entries: &self.stage_templates.generic_constraint_row_entries,
ncs_col_starts: &self.stage_templates.ncs_col_starts,
n_ncs_per_stage: &self.stage_templates.n_ncs_per_stage,
ncs_entity_ids_per_stage: &self.ncs_entity_ids_per_stage,
diversion_upstream: &self.stage_templates.diversion_upstream,
hydro_productivities_per_stage: &self.stage_templates.hydro_productivities_per_stage,
event_sender,
};
crate::simulate(
workspaces,
&stage_ctx,
&self.fcf,
&training_ctx,
&sim_config,
output,
stage_bases,
comm,
)
}
#[must_use]
pub fn build_training_output(
&self,
result: &TrainingResult,
events: &[TrainingEvent],
) -> cobre_io::TrainingOutput {
crate::build_training_output(result, events, &self.fcf)
}
pub fn create_workspace_pool<S: SolverInterface + Send>(
&self,
n_threads: usize,
solver_factory: impl Fn() -> Result<S, SolverError>,
) -> Result<WorkspacePool<S>, SolverError> {
WorkspacePool::try_new(
n_threads,
self.indexer.hydro_count,
self.indexer.max_par_order,
self.indexer.n_state,
self.stage_templates.n_load_buses,
self.max_blocks,
solver_factory,
)
}
#[must_use]
pub fn simulation_config(&self) -> SimulationConfig {
SimulationConfig {
n_scenarios: self.n_scenarios,
io_channel_capacity: self.io_channel_capacity,
}
}
}
fn max_iterations_from_rules(rules: &StoppingRuleSet) -> u64 {
rules
.rules
.iter()
.filter_map(|r| {
if let StoppingRule::IterationLimit { limit } = r {
Some(*limit)
} else {
None
}
})
.max()
.unwrap_or(DEFAULT_MAX_ITERATIONS)
}
fn build_entity_counts(system: &System) -> EntityCounts {
EntityCounts {
hydro_ids: system.hydros().iter().map(|h| h.id.0).collect(),
hydro_productivities: system
.hydros()
.iter()
.map(|h| match &h.generation_model {
HydroGenerationModel::ConstantProductivity {
productivity_mw_per_m3s,
}
| HydroGenerationModel::LinearizedHead {
productivity_mw_per_m3s,
} => *productivity_mw_per_m3s,
HydroGenerationModel::Fpha => 0.0,
})
.collect(),
thermal_ids: system.thermals().iter().map(|t| t.id.0).collect(),
line_ids: system.lines().iter().map(|l| l.id.0).collect(),
bus_ids: system.buses().iter().map(|b| b.id.0).collect(),
pumping_station_ids: system.pumping_stations().iter().map(|p| p.id.0).collect(),
contract_ids: system.contracts().iter().map(|c| c.id.0).collect(),
non_controllable_ids: system
.non_controllable_sources()
.iter()
.map(|n| n.id.0)
.collect(),
}
}
fn build_initial_state(system: &System, indexer: &StageIndexer) -> Vec<f64> {
let mut state = vec![0.0_f64; indexer.n_state];
let hydros = system.hydros();
let ic = system.initial_conditions();
for hs in &ic.storage {
if let Ok(idx) = hydros.binary_search_by_key(&hs.hydro_id.0, |h| h.id.0) {
state[idx] = hs.value_hm3;
}
}
if indexer.max_par_order > 0 {
let n_h = indexer.hydro_count;
for pi in &ic.past_inflows {
if let Ok(idx) = hydros.binary_search_by_key(&pi.hydro_id.0, |h| h.id.0) {
let n_lags = pi.values_m3s.len().min(indexer.max_par_order);
for lag in 0..n_lags {
let slot = indexer.inflow_lags.start + lag * n_h + idx;
state[slot] = pi.values_m3s[lag];
}
}
}
}
state
}
#[derive(Debug)]
pub struct PrepareStochasticResult {
pub system: System,
pub stochastic: StochasticContext,
pub estimation_report: Option<crate::EstimationReport>,
}
fn load_user_opening_tree_inner(
case_dir: &Path,
system: &System,
) -> Result<Option<OpeningTree>, SddpError> {
let mut ctx = cobre_io::ValidationContext::new();
let manifest = cobre_io::validate_structure(case_dir, &mut ctx);
if !manifest.scenarios_noise_openings_parquet {
return Ok(None);
}
let path = case_dir.join("scenarios").join("noise_openings.parquet");
let rows = cobre_io::scenarios::load_noise_openings(Some(&path))?;
let n_hydros = system.hydros().len();
let mut load_bus_ids: Vec<EntityId> = system
.load_models()
.iter()
.filter(|m| m.std_mw > 0.0)
.map(|m| m.bus_id)
.collect();
load_bus_ids.sort_unstable_by_key(|id| id.0);
load_bus_ids.dedup();
let n_load_buses = load_bus_ids.len();
let expected_dim = n_hydros + n_load_buses;
let expected_stages = system.stages().iter().filter(|s| s.id >= 0).count();
let mut openings_by_stage: BTreeMap<i32, BTreeSet<u32>> = BTreeMap::new();
for row in &rows {
openings_by_stage
.entry(row.stage_id)
.or_default()
.insert(row.opening_index);
}
let openings_per_stage: Vec<usize> = openings_by_stage.values().map(BTreeSet::len).collect();
cobre_io::scenarios::validate_noise_openings(
&rows,
expected_dim,
expected_stages,
&openings_per_stage,
)?;
let tree = cobre_io::scenarios::assemble_opening_tree(rows, expected_dim);
Ok(Some(tree))
}
fn build_ncs_factor_entries(
system: &System,
) -> Vec<(
cobre_core::EntityId,
i32,
Vec<cobre_stochastic::normal::precompute::BlockFactorPair>,
)> {
use cobre_stochastic::normal::precompute::BlockFactorPair;
use std::collections::BTreeSet;
let stochastic_ncs: BTreeSet<cobre_core::EntityId> =
system.ncs_models().iter().map(|m| m.ncs_id).collect();
if stochastic_ncs.is_empty() {
return Vec::new();
}
let study_stages: Vec<_> = system.stages().iter().filter(|s| s.id >= 0).collect();
let ncs_ids: Vec<cobre_core::EntityId> = system
.non_controllable_sources()
.iter()
.map(|n| n.id)
.collect();
let mut entries = Vec::new();
for (ncs_idx, ncs_id) in ncs_ids.iter().enumerate() {
if !stochastic_ncs.contains(ncs_id) {
continue;
}
for (stage_idx, stage) in study_stages.iter().enumerate() {
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let block_pairs: Vec<BlockFactorPair> = stage
.blocks
.iter()
.enumerate()
.map(|(block_idx, _)| {
let factor = system
.resolved_ncs_factors()
.factor(ncs_idx, stage_idx, block_idx);
(block_idx as i32, factor)
})
.collect();
entries.push((*ncs_id, stage.id, block_pairs));
}
}
entries
}
fn load_load_factors_for_stochastic(
case_dir: &Path,
) -> Result<Vec<cobre_io::scenarios::LoadFactorEntry>, SddpError> {
let path = case_dir.join("scenarios").join("load_factors.json");
if !path.exists() {
return Ok(Vec::new());
}
cobre_io::scenarios::parse_load_factors(&path).map_err(SddpError::from)
}
pub fn prepare_stochastic(
system: System,
case_dir: &Path,
config: &cobre_io::Config,
seed: u64,
) -> Result<PrepareStochasticResult, SddpError> {
let (system, estimation_report) =
crate::estimation::estimate_from_history(system, case_dir, config)?;
let user_opening_tree = load_user_opening_tree_inner(case_dir, &system)?;
let load_factor_entries = load_load_factors_for_stochastic(case_dir)?;
let block_pairs: Vec<Vec<cobre_stochastic::normal::precompute::BlockFactorPair>> =
load_factor_entries
.iter()
.map(|e| {
e.block_factors
.iter()
.map(|bf| (bf.block_id, bf.factor))
.collect()
})
.collect();
let entity_factor_entries: Vec<cobre_stochastic::normal::precompute::EntityFactorEntry<'_>> =
load_factor_entries
.iter()
.zip(block_pairs.iter())
.map(|(e, pairs)| (e.bus_id, e.stage_id, pairs.as_slice()))
.collect();
let ncs_factor_entries = build_ncs_factor_entries(&system);
let ncs_entity_factor_entries: Vec<
cobre_stochastic::normal::precompute::EntityFactorEntry<'_>,
> = ncs_factor_entries
.iter()
.map(|(ncs_id, stage_id, pairs)| (*ncs_id, *stage_id, pairs.as_slice()))
.collect();
let stochastic = cobre_stochastic::build_stochastic_context(
&system,
seed,
&entity_factor_entries,
&ncs_entity_factor_entries,
user_opening_tree,
)?;
Ok(PrepareStochasticResult {
system,
stochastic,
estimation_report,
})
}
#[cfg(test)]
mod tests {
use super::StudySetup;
use crate::StageIndexer;
use crate::hydro_models::PrepareHydroModelsResult;
use cobre_core::{
BoundsCountsSpec, BoundsDefaults, BusStagePenalties, ContractStageBounds, HydroStageBounds,
HydroStagePenalties, LineStageBounds, LineStagePenalties, NcsStagePenalties,
PenaltiesCountsSpec, PenaltiesDefaults, PumpingStageBounds, ResolvedBounds,
ResolvedPenalties, ThermalStageBounds,
};
use cobre_core::{
EntityId, SystemBuilder,
entities::{
bus::{Bus, DeficitSegment},
hydro::{Hydro, HydroGenerationModel, HydroPenalties},
thermal::{Thermal, ThermalCostSegment},
},
scenario::{InflowModel, LoadModel},
temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, Stage, StageRiskConfig,
StageStateConfig,
},
};
use cobre_io::config::{
Config, CutSelectionConfig, EstimationConfig, ExportsConfig, InflowNonNegativityConfig,
ModelingConfig, PolicyConfig, SimulationConfig as IoSimulationConfig, StoppingRuleConfig,
TrainingConfig, TrainingSolverConfig, UpperBoundEvaluationConfig,
};
use cobre_stochastic::build_stochastic_context;
#[allow(
clippy::too_many_lines,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::items_after_statements
)]
fn minimal_system(n_stages: usize) -> cobre_core::System {
use chrono::NaiveDate;
let bus = Bus {
id: EntityId(1),
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 500.0,
}],
excess_cost: 0.0,
};
let thermal = Thermal {
id: EntityId(2),
name: "T1".to_string(),
bus_id: EntityId(1),
min_generation_mw: 0.0,
max_generation_mw: 100.0,
cost_segments: vec![ThermalCostSegment {
capacity_mw: 100.0,
cost_per_mwh: 50.0,
}],
gnl_config: None,
entry_stage_id: None,
exit_stage_id: None,
};
let hydro = Hydro {
id: EntityId(3),
name: "H1".to_string(),
bus_id: EntityId(1),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 200.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity {
productivity_mw_per_m3s: 2.5,
},
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
min_generation_mw: 0.0,
max_generation_mw: 250.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.01,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
let stages: Vec<Stage> = (0..n_stages)
.map(|i| Stage {
index: i,
id: i as i32,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: None,
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 1,
noise_method: NoiseMethod::Saa,
},
})
.collect();
let inflow_models: Vec<InflowModel> = (0..n_stages)
.map(|i| InflowModel {
hydro_id: EntityId(3),
stage_id: i as i32,
mean_m3s: 80.0,
std_m3s: 20.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
})
.collect();
let load_models: Vec<LoadModel> = (0..n_stages)
.map(|i| LoadModel {
bus_id: EntityId(1),
stage_id: i as i32,
mean_mw: 100.0,
std_mw: 0.0,
})
.collect();
let n_st = n_stages.max(1);
fn default_hydro_bounds() -> HydroStageBounds {
HydroStageBounds {
min_storage_hm3: 0.0,
max_storage_hm3: 200.0,
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
min_generation_mw: 0.0,
max_generation_mw: 250.0,
max_diversion_m3s: None,
filling_inflow_m3s: 0.0,
water_withdrawal_m3s: 0.0,
}
}
fn default_hydro_penalties() -> HydroStagePenalties {
HydroStagePenalties {
spillage_cost: 0.01,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 500.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
}
}
let bounds = ResolvedBounds::new(
&BoundsCountsSpec {
n_hydros: 1,
n_thermals: 1,
n_lines: 0,
n_pumping: 0,
n_contracts: 0,
n_stages: n_st,
},
&BoundsDefaults {
hydro: default_hydro_bounds(),
thermal: ThermalStageBounds {
min_generation_mw: 0.0,
max_generation_mw: 100.0,
},
line: LineStageBounds {
direct_mw: 0.0,
reverse_mw: 0.0,
},
pumping: PumpingStageBounds {
min_flow_m3s: 0.0,
max_flow_m3s: 0.0,
},
contract: ContractStageBounds {
min_mw: 0.0,
max_mw: 0.0,
price_per_mwh: 0.0,
},
},
);
let penalties = ResolvedPenalties::new(
&PenaltiesCountsSpec {
n_hydros: 1,
n_buses: 1,
n_lines: 0,
n_ncs: 0,
n_stages: n_st,
},
&PenaltiesDefaults {
hydro: default_hydro_penalties(),
bus: BusStagePenalties { excess_cost: 0.0 },
line: LineStagePenalties { exchange_cost: 0.0 },
ncs: NcsStagePenalties {
curtailment_cost: 0.0,
},
},
);
SystemBuilder::new()
.buses(vec![bus])
.thermals(vec![thermal])
.hydros(vec![hydro])
.stages(stages)
.inflow_models(inflow_models)
.load_models(load_models)
.bounds(bounds)
.penalties(penalties)
.build()
.expect("minimal_system: valid")
}
fn minimal_config(forward_passes: u32, max_iterations: u32) -> Config {
Config {
schema: None,
modeling: ModelingConfig {
inflow_non_negativity: InflowNonNegativityConfig {
method: "penalty".to_string(),
penalty_cost: 1000.0,
},
},
training: TrainingConfig {
enabled: true,
seed: Some(42),
forward_passes: Some(forward_passes),
stopping_rules: Some(vec![StoppingRuleConfig::IterationLimit {
limit: max_iterations,
}]),
stopping_mode: "any".to_string(),
cut_formulation: None,
forward_pass: None,
cut_selection: CutSelectionConfig::default(),
solver: TrainingSolverConfig::default(),
},
upper_bound_evaluation: UpperBoundEvaluationConfig::default(),
policy: PolicyConfig::default(),
simulation: IoSimulationConfig::default(),
exports: ExportsConfig::default(),
estimation: EstimationConfig::default(),
}
}
#[test]
fn new_minimal_valid_system_returns_ok() {
let system = minimal_system(2);
let config = minimal_config(1, 10);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let result = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
);
assert!(result.is_ok(), "expected Ok, got {result:?}");
let setup = result.unwrap();
assert!(!setup.stage_templates().is_empty());
}
#[test]
fn new_zero_stages_returns_validation_error() {
let system = minimal_system(0);
let config = minimal_config(1, 10);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let result = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
);
assert!(result.is_err(), "expected Err, got Ok");
let err = result.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("no study stages"),
"error message should contain 'no study stages': {msg}"
);
}
#[test]
fn accessor_methods_return_expected_values() {
let n_stages = 3;
let system = minimal_system(n_stages);
let config = minimal_config(2, 50);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let setup = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
)
.expect("setup");
assert_eq!(setup.stage_templates().len(), n_stages);
assert_eq!(setup.base_rows().len(), n_stages);
assert_eq!(setup.seed(), 42);
assert_eq!(setup.forward_passes(), 2);
assert_eq!(setup.max_iterations(), 50);
assert_eq!(setup.n_scenarios(), 0); assert_eq!(setup.policy_path(), "./policy");
assert_eq!(setup.block_counts_per_stage().len(), n_stages);
assert!(setup.max_blocks() > 0);
assert_eq!(setup.horizon().num_stages(), n_stages);
assert_eq!(setup.risk_measures().len(), n_stages);
assert_eq!(setup.fcf().pools.len(), n_stages);
assert_eq!(setup.entity_counts().hydro_ids.len(), 1);
assert_eq!(setup.entity_counts().thermal_ids.len(), 1);
}
#[test]
fn fcf_mut_allows_cut_insertion() {
let system = minimal_system(2);
let config = minimal_config(1, 10);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let mut setup = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
)
.expect("setup");
let n_state = setup.indexer().n_state;
let coefficients = vec![1.0_f64; n_state];
setup.fcf_mut().add_cut(0, 0, 0, 42.0, &coefficients);
assert_eq!(setup.fcf().total_active_cuts(), 1);
}
#[test]
fn inflow_method_reflects_config() {
use crate::InflowNonNegativityMethod;
let system = minimal_system(2);
let config = minimal_config(1, 10);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let setup = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
)
.expect("setup");
assert!(
!matches!(setup.inflow_method(), InflowNonNegativityMethod::None),
"expected penalty or truncation method"
);
}
#[test]
fn cut_selection_none_when_disabled() {
let system = minimal_system(2);
let config = minimal_config(1, 10);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let setup = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
)
.expect("setup");
assert!(
setup.cut_selection().is_none(),
"cut_selection should be None when disabled"
);
}
#[test]
fn stage_ctx_fields_match_study_setup() {
let n_stages = 3;
let system = minimal_system(n_stages);
let config = minimal_config(2, 10);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let setup = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
)
.expect("setup");
let ctx = setup.stage_ctx();
assert_eq!(
ctx.templates.len(),
setup.stage_templates().len(),
"templates length mismatch"
);
assert_eq!(
ctx.base_rows.len(),
setup.base_rows().len(),
"base_rows length mismatch"
);
assert_eq!(
ctx.noise_scale.len(),
setup.noise_scale().len(),
"noise_scale length mismatch"
);
assert_eq!(
ctx.n_hydros,
setup.entity_counts().hydro_ids.len(),
"n_hydros mismatch"
);
assert_eq!(
ctx.block_counts_per_stage.len(),
setup.block_counts_per_stage().len(),
"block_counts_per_stage length mismatch"
);
}
#[test]
fn training_ctx_fields_match_study_setup() {
let n_stages = 3;
let system = minimal_system(n_stages);
let config = minimal_config(2, 10);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let setup = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
)
.expect("setup");
let ctx = setup.training_ctx();
assert_eq!(
ctx.horizon.num_stages(),
setup.horizon().num_stages(),
"horizon num_stages mismatch"
);
assert_eq!(
ctx.indexer.n_state,
setup.indexer().n_state,
"indexer n_state mismatch"
);
assert_eq!(
ctx.initial_state.len(),
setup.initial_state().len(),
"initial_state length mismatch"
);
}
#[test]
fn train_completes_within_iteration_limit() {
use cobre_comm::LocalBackend;
use cobre_solver::highs::HighsSolver;
let system = minimal_system(2);
let config = minimal_config(1, 3);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let mut setup = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
)
.expect("setup");
let comm = LocalBackend;
let mut solver = HighsSolver::new().expect("solver");
let result = setup
.train(&mut solver, &comm, 1, HighsSolver::new, None, None)
.expect("train");
assert!(
result.result.iterations <= 3,
"expected iterations <= 3, got {}",
result.result.iterations
);
assert!(
result.result.iterations >= 1,
"expected at least 1 iteration, got {}",
result.result.iterations
);
}
#[test]
fn train_generates_cuts_in_fcf() {
use cobre_comm::LocalBackend;
use cobre_solver::highs::HighsSolver;
let system = minimal_system(2);
let config = minimal_config(1, 3);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let mut setup = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
)
.expect("setup");
let comm = LocalBackend;
let mut solver = HighsSolver::new().expect("solver");
setup
.train(&mut solver, &comm, 1, HighsSolver::new, None, None)
.expect("train");
assert!(
setup.fcf().pools[0].populated_count > 0,
"expected at least one cut in FCF pool[0] after training"
);
}
#[test]
fn simulation_config_reflects_setup_fields() {
use cobre_io::config::SimulationConfig as IoSimulationConfig;
let mut config = minimal_config(1, 5);
config.simulation = IoSimulationConfig {
enabled: true,
num_scenarios: 50,
io_channel_capacity: 16,
..IoSimulationConfig::default()
};
let system = minimal_system(2);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let setup = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
)
.expect("setup");
let sim_cfg = setup.simulation_config();
assert_eq!(sim_cfg.n_scenarios, setup.n_scenarios());
assert_eq!(sim_cfg.io_channel_capacity, setup.io_channel_capacity());
}
#[test]
fn create_workspace_pool_returns_correct_size() {
use cobre_solver::highs::HighsSolver;
let system = minimal_system(2);
let config = minimal_config(1, 3);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let setup = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
)
.expect("setup");
let pool = setup
.create_workspace_pool(2, HighsSolver::new)
.expect("workspace pool");
assert_eq!(pool.workspaces.len(), 2);
}
#[test]
fn build_training_output_non_empty() {
use cobre_comm::LocalBackend;
use cobre_solver::highs::HighsSolver;
let system = minimal_system(2);
let config = minimal_config(1, 2);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let mut setup = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
)
.expect("setup");
let comm = LocalBackend;
let mut solver = HighsSolver::new().expect("solver");
let (event_tx, event_rx) = std::sync::mpsc::channel();
let result = setup
.train(
&mut solver,
&comm,
1,
HighsSolver::new,
Some(event_tx),
None,
)
.expect("train");
let events: Vec<cobre_core::TrainingEvent> = event_rx.try_iter().collect();
let output = setup.build_training_output(&result.result, &events);
assert!(
!output.convergence_records.is_empty(),
"convergence_records must be non-empty after training"
);
}
#[test]
fn simulate_after_train_returns_nonempty_costs() {
use cobre_comm::LocalBackend;
use cobre_solver::highs::HighsSolver;
let mut config = minimal_config(1, 3);
config.simulation = cobre_io::config::SimulationConfig {
enabled: true,
num_scenarios: 3,
io_channel_capacity: 8,
..cobre_io::config::SimulationConfig::default()
};
let system = minimal_system(2);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let mut setup = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
)
.expect("setup");
let comm = LocalBackend;
let mut solver = HighsSolver::new().expect("solver");
setup
.train(&mut solver, &comm, 1, HighsSolver::new, None, None)
.expect("train");
let mut pool = setup
.create_workspace_pool(1, HighsSolver::new)
.expect("sim pool");
let io_capacity = setup.io_channel_capacity().max(1);
let (result_tx, result_rx) = std::sync::mpsc::sync_channel(io_capacity);
let drain_handle = std::thread::spawn(move || result_rx.into_iter().collect::<Vec<_>>());
let sim_result = setup
.simulate(&mut pool.workspaces, &comm, &result_tx, None, &[])
.expect("simulate");
drop(result_tx);
let _results = drain_handle.join().expect("drain thread");
assert!(
!sim_result.costs.is_empty(),
"simulate must return at least one cost entry"
);
assert_eq!(
sim_result.solver_stats.len(),
sim_result.costs.len(),
"one solver stats entry per scenario"
);
}
#[test]
fn study_params_from_config_defaults() {
use super::{DEFAULT_FORWARD_PASSES, DEFAULT_SEED, StudyParams};
use crate::stopping_rule::StoppingMode;
use cobre_io::config::{
Config, CutSelectionConfig, EstimationConfig, ExportsConfig, InflowNonNegativityConfig,
ModelingConfig, PolicyConfig, SimulationConfig as IoSimulationConfig, TrainingConfig,
TrainingSolverConfig, UpperBoundEvaluationConfig,
};
let config = Config {
schema: None,
modeling: ModelingConfig {
inflow_non_negativity: InflowNonNegativityConfig {
method: "none".to_string(),
penalty_cost: 0.0,
},
},
training: TrainingConfig {
enabled: true,
seed: None,
forward_passes: None,
stopping_rules: None,
stopping_mode: "any".to_string(),
cut_formulation: None,
forward_pass: None,
cut_selection: CutSelectionConfig::default(),
solver: TrainingSolverConfig::default(),
},
upper_bound_evaluation: UpperBoundEvaluationConfig::default(),
policy: PolicyConfig::default(),
simulation: IoSimulationConfig::default(),
exports: ExportsConfig::default(),
estimation: EstimationConfig::default(),
};
let params = StudyParams::from_config(&config).expect("from_config");
assert_eq!(
params.seed, DEFAULT_SEED,
"seed should default to DEFAULT_SEED"
);
assert_eq!(
params.forward_passes, DEFAULT_FORWARD_PASSES,
"forward_passes should default to DEFAULT_FORWARD_PASSES"
);
assert_eq!(
params.stopping_rule_set.rules.len(),
1,
"expected exactly 1 default stopping rule"
);
assert!(
matches!(
params.stopping_rule_set.rules[0],
crate::stopping_rule::StoppingRule::IterationLimit { .. }
),
"default rule should be IterationLimit"
);
assert!(
matches!(params.stopping_rule_set.mode, StoppingMode::Any),
"default stopping mode should be Any"
);
assert_eq!(
params.n_scenarios, 0,
"n_scenarios should be 0 when simulation disabled"
);
assert!(
params.cut_selection.is_none(),
"cut_selection should be None by default"
);
}
#[test]
fn study_params_from_config_explicit() {
use super::StudyParams;
use crate::stopping_rule::{StoppingMode, StoppingRule};
use cobre_io::config::{
Config, CutSelectionConfig, EstimationConfig, ExportsConfig, InflowNonNegativityConfig,
ModelingConfig, PolicyConfig, SimulationConfig as IoSimulationConfig,
StoppingRuleConfig, TrainingConfig, TrainingSolverConfig, UpperBoundEvaluationConfig,
};
let config = Config {
schema: None,
modeling: ModelingConfig {
inflow_non_negativity: InflowNonNegativityConfig {
method: "penalty".to_string(),
penalty_cost: 999.0,
},
},
training: TrainingConfig {
enabled: true,
seed: Some(1234),
forward_passes: Some(5),
stopping_rules: Some(vec![
StoppingRuleConfig::IterationLimit { limit: 50 },
StoppingRuleConfig::TimeLimit { seconds: 60.0 },
]),
stopping_mode: "all".to_string(),
cut_formulation: None,
forward_pass: None,
cut_selection: CutSelectionConfig::default(),
solver: TrainingSolverConfig::default(),
},
upper_bound_evaluation: UpperBoundEvaluationConfig::default(),
policy: PolicyConfig {
path: "./my_policy".to_string(),
..PolicyConfig::default()
},
simulation: IoSimulationConfig {
enabled: true,
num_scenarios: 200,
..IoSimulationConfig::default()
},
exports: ExportsConfig::default(),
estimation: EstimationConfig::default(),
};
let params = StudyParams::from_config(&config).expect("from_config");
assert_eq!(params.seed, 1234, "seed mismatch");
assert_eq!(params.forward_passes, 5, "forward_passes mismatch");
assert_eq!(
params.stopping_rule_set.rules.len(),
2,
"stopping rule count mismatch"
);
assert!(
matches!(
params.stopping_rule_set.rules[0],
StoppingRule::IterationLimit { limit: 50 }
),
"first rule should be IterationLimit(50)"
);
assert!(
matches!(
params.stopping_rule_set.rules[1],
StoppingRule::TimeLimit { seconds } if (seconds - 60.0).abs() < 1e-9
),
"second rule should be TimeLimit(60.0)"
);
assert!(
matches!(params.stopping_rule_set.mode, StoppingMode::All),
"stopping mode should be All"
);
assert_eq!(params.n_scenarios, 200, "n_scenarios mismatch");
assert_eq!(params.policy_path, "./my_policy", "policy_path mismatch");
}
fn write_minimal_case_dir(root: &std::path::Path) {
use std::fs;
fs::create_dir_all(root.join("system")).unwrap();
fs::write(root.join("config.json"), b"{}").unwrap();
fs::write(root.join("penalties.json"), b"{}").unwrap();
fs::write(root.join("stages.json"), b"{}").unwrap();
fs::write(root.join("initial_conditions.json"), b"{}").unwrap();
fs::write(root.join("system/buses.json"), b"{}").unwrap();
fs::write(root.join("system/lines.json"), b"{}").unwrap();
fs::write(root.join("system/hydros.json"), b"{}").unwrap();
fs::write(root.join("system/thermals.json"), b"{}").unwrap();
}
fn minimal_prepare_config() -> cobre_io::Config {
use cobre_io::config::{
Config, CutSelectionConfig, EstimationConfig, ExportsConfig, InflowNonNegativityConfig,
ModelingConfig, PolicyConfig, SimulationConfig as IoSimulationConfig, TrainingConfig,
TrainingSolverConfig, UpperBoundEvaluationConfig,
};
Config {
schema: None,
modeling: ModelingConfig {
inflow_non_negativity: InflowNonNegativityConfig {
method: "none".to_string(),
penalty_cost: 0.0,
},
},
training: TrainingConfig {
enabled: true,
seed: None,
forward_passes: None,
stopping_rules: None,
stopping_mode: "any".to_string(),
cut_formulation: None,
forward_pass: None,
cut_selection: CutSelectionConfig::default(),
solver: TrainingSolverConfig::default(),
},
upper_bound_evaluation: UpperBoundEvaluationConfig::default(),
policy: PolicyConfig::default(),
simulation: IoSimulationConfig::default(),
exports: ExportsConfig::default(),
estimation: EstimationConfig::default(),
}
}
#[test]
fn prepare_stochastic_no_history_no_tree_returns_none_report_and_generated_provenance() {
use super::prepare_stochastic;
use cobre_stochastic::provenance::ComponentProvenance;
use tempfile::TempDir;
let dir = TempDir::new().unwrap();
let root = dir.path();
write_minimal_case_dir(root);
let system = minimal_system(2);
let config = minimal_prepare_config();
let seed = 42_u64;
let result = prepare_stochastic(system, root, &config, seed)
.expect("prepare_stochastic should succeed with no optional files");
assert!(
result.estimation_report.is_none(),
"estimation_report must be None when no inflow_history.parquet is present"
);
assert_eq!(
result.stochastic.provenance().opening_tree,
ComponentProvenance::Generated,
"opening_tree provenance must be Generated when no user tree is supplied"
);
}
#[test]
fn prepare_stochastic_with_stats_file_present_skips_estimation() {
use super::prepare_stochastic;
use std::fs;
use tempfile::TempDir;
let dir = TempDir::new().unwrap();
let root = dir.path();
write_minimal_case_dir(root);
fs::create_dir_all(root.join("scenarios")).unwrap();
fs::write(root.join("scenarios/inflow_seasonal_stats.parquet"), b"").unwrap();
fs::write(root.join("scenarios/inflow_ar_coefficients.parquet"), b"").unwrap();
let system = minimal_system(2);
let config = minimal_prepare_config();
let seed = 42_u64;
let result = prepare_stochastic(system, root, &config, seed)
.expect("prepare_stochastic should succeed when stats file is present");
assert!(
result.estimation_report.is_none(),
"estimation_report must be None when inflow_seasonal_stats.parquet is present"
);
}
#[test]
fn prepare_stochastic_no_opening_tree_gives_non_user_supplied_provenance() {
use super::prepare_stochastic;
use cobre_stochastic::provenance::ComponentProvenance;
use tempfile::TempDir;
let dir = TempDir::new().unwrap();
let root = dir.path();
write_minimal_case_dir(root);
let system = minimal_system(2);
let config = minimal_prepare_config();
let result = prepare_stochastic(system, root, &config, 0)
.expect("prepare_stochastic must succeed with no opening tree file");
assert_ne!(
result.stochastic.provenance().opening_tree,
ComponentProvenance::UserSupplied,
"opening_tree provenance must not be UserSupplied when file is absent"
);
}
#[test]
fn default_from_system_gives_constant_and_no_evaporation() {
use crate::hydro_models::{
EvaporationModel, ProductionModelSource, ResolvedProductionModel,
};
let system = minimal_system(2);
let result = PrepareHydroModelsResult::default_from_system(&system);
assert_eq!(
result.provenance.production_sources.len(),
system.hydros().len(),
"production_sources length must equal n_hydros"
);
for (_, source) in &result.provenance.production_sources {
assert_eq!(
*source,
ProductionModelSource::DefaultConstant,
"all hydros must use DefaultConstant"
);
}
assert_eq!(
result.provenance.evaporation_sources.len(),
system.hydros().len(),
"evaporation_sources length must equal n_hydros"
);
assert!(
!result.evaporation.has_evaporation(),
"default result must have no evaporation"
);
let model = result.production.model(0, 0);
assert!(
matches!(model, ResolvedProductionModel::ConstantProductivity { .. }),
"default production model must be ConstantProductivity"
);
let evap = result.evaporation.model(0);
assert!(
matches!(evap, EvaporationModel::None),
"default evaporation model must be None"
);
}
#[test]
fn hydro_models_accessor_returns_stored_result() {
use crate::hydro_models::ProductionModelSource;
let system = minimal_system(2);
let config = minimal_config(1, 5);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let hydro_result = PrepareHydroModelsResult::default_from_system(&system);
let setup = StudySetup::new(&system, &config, stochastic, hydro_result).expect("setup");
let models = setup.hydro_models();
assert_eq!(
models.provenance.production_sources.len(),
system.hydros().len(),
"hydro_models() must return the stored result (provenance length mismatch)"
);
for (_, source) in &models.provenance.production_sources {
assert_eq!(
*source,
ProductionModelSource::DefaultConstant,
"stored result must preserve DefaultConstant provenance"
);
}
}
fn indexer_for_lag_test(hydro_count: usize, max_par_order: usize) -> StageIndexer {
StageIndexer::new(hydro_count, max_par_order)
}
#[allow(
clippy::too_many_lines,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::items_after_statements
)]
fn minimal_system_2_hydros_with_past_inflows(
n_stages: usize,
h1_past: Vec<f64>,
h2_past: Vec<f64>,
) -> cobre_core::System {
use chrono::NaiveDate;
let bus = Bus {
id: EntityId(1),
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 500.0,
}],
excess_cost: 0.0,
};
let make_hydro = |id: i32, name: &str| Hydro {
id: EntityId(id),
name: name.to_string(),
bus_id: EntityId(1),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 200.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity {
productivity_mw_per_m3s: 2.5,
},
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
min_generation_mw: 0.0,
max_generation_mw: 250.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.01,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
let n_st = n_stages.max(1);
let stages: Vec<Stage> = (0..n_stages)
.map(|i| Stage {
index: i,
id: i as i32,
start_date: NaiveDate::from_ymd_opt(2020, (i % 12 + 1) as u32, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(
if (i % 12 + 1) == 12 { 2021 } else { 2020 },
((i % 12 + 1) % 12 + 1) as u32,
1,
)
.unwrap(),
season_id: Some(i),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: true,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 1,
noise_method: NoiseMethod::Saa,
},
})
.collect();
let inflow_models: Vec<InflowModel> = (0..n_stages)
.flat_map(|i| {
[1_i32, 2].map(|hid| InflowModel {
hydro_id: EntityId(hid),
stage_id: i as i32,
mean_m3s: 80.0,
std_m3s: 20.0,
ar_coefficients: vec![0.5, 0.3],
residual_std_ratio: 0.8,
})
})
.collect();
let load_models: Vec<LoadModel> = (0..n_stages)
.map(|i| LoadModel {
bus_id: EntityId(1),
stage_id: i as i32,
mean_mw: 100.0,
std_mw: 0.0,
})
.collect();
fn default_hydro_bounds() -> HydroStageBounds {
HydroStageBounds {
min_storage_hm3: 0.0,
max_storage_hm3: 200.0,
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
min_generation_mw: 0.0,
max_generation_mw: 250.0,
max_diversion_m3s: None,
filling_inflow_m3s: 0.0,
water_withdrawal_m3s: 0.0,
}
}
fn default_hydro_penalties() -> HydroStagePenalties {
HydroStagePenalties {
spillage_cost: 0.01,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 500.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
}
}
let bounds = ResolvedBounds::new(
&BoundsCountsSpec {
n_hydros: 2,
n_thermals: 0,
n_lines: 0,
n_pumping: 0,
n_contracts: 0,
n_stages: n_st,
},
&BoundsDefaults {
hydro: default_hydro_bounds(),
thermal: ThermalStageBounds {
min_generation_mw: 0.0,
max_generation_mw: 0.0,
},
line: LineStageBounds {
direct_mw: 0.0,
reverse_mw: 0.0,
},
pumping: PumpingStageBounds {
min_flow_m3s: 0.0,
max_flow_m3s: 0.0,
},
contract: ContractStageBounds {
min_mw: 0.0,
max_mw: 0.0,
price_per_mwh: 0.0,
},
},
);
let penalties = ResolvedPenalties::new(
&PenaltiesCountsSpec {
n_hydros: 2,
n_buses: 1,
n_lines: 0,
n_ncs: 0,
n_stages: n_st,
},
&PenaltiesDefaults {
hydro: default_hydro_penalties(),
bus: BusStagePenalties { excess_cost: 0.0 },
line: LineStagePenalties { exchange_cost: 0.0 },
ncs: NcsStagePenalties {
curtailment_cost: 0.0,
},
},
);
let past_inflows = vec![
cobre_core::HydroPastInflows {
hydro_id: EntityId(1),
values_m3s: h1_past,
},
cobre_core::HydroPastInflows {
hydro_id: EntityId(2),
values_m3s: h2_past,
},
];
SystemBuilder::new()
.buses(vec![bus])
.thermals(vec![])
.hydros(vec![make_hydro(1, "H1"), make_hydro(2, "H2")])
.stages(stages)
.inflow_models(inflow_models)
.load_models(load_models)
.bounds(bounds)
.penalties(penalties)
.initial_conditions(cobre_core::InitialConditions {
storage: vec![],
filling_storage: vec![],
past_inflows,
})
.build()
.expect("minimal_system_2_hydros_with_past_inflows: valid")
}
#[test]
fn build_initial_state_populates_lags_from_past_inflows() {
use super::build_initial_state;
let system =
minimal_system_2_hydros_with_past_inflows(1, vec![600.0, 500.0], vec![200.0, 100.0]);
let indexer = indexer_for_lag_test(2, 2);
let state = build_initial_state(&system, &indexer);
let s = indexer.inflow_lags.start;
assert!(
(state[s] - 600.0).abs() < 1e-10,
"lag0 hydro 0: expected 600.0, got {}",
state[s]
);
assert!(
(state[s + 1] - 200.0).abs() < 1e-10,
"lag0 hydro 1: expected 200.0, got {}",
state[s + 1]
);
assert!(
(state[s + 2] - 500.0).abs() < 1e-10,
"lag1 hydro 0: expected 500.0, got {}",
state[s + 2]
);
assert!(
(state[s + 3] - 100.0).abs() < 1e-10,
"lag1 hydro 1: expected 100.0, got {}",
state[s + 3]
);
assert_eq!(
state.len(),
indexer.n_state,
"state length must equal n_state"
);
}
#[test]
fn build_initial_state_empty_past_inflows_leaves_zero_lags() {
use super::build_initial_state;
let system = minimal_system(2);
let indexer = indexer_for_lag_test(1, 3);
let state = build_initial_state(&system, &indexer);
let s = indexer.inflow_lags.start;
for l in 0..3 {
assert!(
state[s + l].abs() < 1e-10,
"lag slot {l} should be 0.0 when past_inflows is empty, got {}",
state[s + l]
);
}
}
#[test]
fn build_initial_state_unknown_hydro_in_past_inflows_stays_zero() {
use super::build_initial_state;
let system = {
minimal_system(2)
};
let indexer = indexer_for_lag_test(1, 2);
let state = build_initial_state(&system, &indexer);
let s = indexer.inflow_lags.start;
assert!(
state[s].abs() < 1e-10,
"lag 0 should be 0.0 when past_inflows is absent, got {}",
state[s]
);
assert!(
state[s + 1].abs() < 1e-10,
"lag 1 should be 0.0 when past_inflows is absent, got {}",
state[s + 1]
);
}
#[test]
fn study_setup_initial_state_has_nonzero_lags_from_past_inflows() {
let system =
minimal_system_2_hydros_with_past_inflows(3, vec![600.0, 500.0], vec![200.0, 100.0]);
let config = minimal_config(1, 10);
let stochastic =
build_stochastic_context(&system, 42, &[], &[], None).expect("stochastic context");
let setup = StudySetup::new(
&system,
&config,
stochastic,
PrepareHydroModelsResult::default_from_system(&system),
)
.expect("setup with past_inflows");
let state = setup.initial_state();
let n_hydros = 2;
let lag_start = n_hydros;
assert!(
(state[lag_start] - 600.0).abs() < 1e-10,
"lag0 hydro 0 should be 600.0 via StudySetup, got {}",
state[lag_start]
);
assert!(
(state[lag_start + 1] - 200.0).abs() < 1e-10,
"lag0 hydro 1 should be 200.0 via StudySetup, got {}",
state[lag_start + 1]
);
assert!(
(state[lag_start + 2] - 500.0).abs() < 1e-10,
"lag1 hydro 0 should be 500.0 via StudySetup, got {}",
state[lag_start + 2]
);
assert!(
(state[lag_start + 3] - 100.0).abs() < 1e-10,
"lag1 hydro 1 should be 100.0 via StudySetup, got {}",
state[lag_start + 3]
);
}
#[test]
fn build_initial_state_no_lags_state_is_storage_only() {
use super::build_initial_state;
let system = minimal_system(2);
let indexer = indexer_for_lag_test(1, 0);
assert_eq!(indexer.n_state, 1);
assert!(
indexer.inflow_lags.is_empty(),
"inflow_lags range should be empty for L=0"
);
let state = build_initial_state(&system, &indexer);
assert_eq!(state.len(), 1, "state length must equal n_state=1");
}
}