use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::mpsc::{Sender, SyncSender};
use std::time::Instant;
use cobre_comm::Communicator;
use cobre_core::{EntityId, TrainingEvent};
use cobre_solver::{Basis, RowBatch, SolverError, SolverInterface};
use cobre_stochastic::sample_forward;
use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator};
use crate::{
FutureCostFunction,
context::{StageContext, TrainingContext},
forward::{build_cut_row_batch, partition},
lp_builder::COST_SCALE_FACTOR,
noise::{transform_inflow_noise, transform_load_noise, transform_ncs_noise},
simulation::{
config::SimulationConfig,
error::SimulationError,
extraction::EntityCounts,
extraction::{
SolutionView, StageExtractionSpec, accumulate_category_costs, assign_scenarios,
extract_stage_result,
},
types::{ScenarioCategoryCosts, SimulationScenarioResult, SimulationStageResult},
},
solver_stats::SolverStatsDelta,
workspace::SolverWorkspace,
};
const SIMULATION_SEED_OFFSET: u32 = u32::MAX / 2;
type WorkerCosts = Vec<(u32, f64, ScenarioCategoryCosts)>;
type WorkerStats = Vec<(u32, SolverStatsDelta)>;
#[derive(Debug)]
pub struct SimulationRunResult {
pub costs: Vec<(u32, f64, ScenarioCategoryCosts)>,
pub solver_stats: Vec<(u32, SolverStatsDelta)>,
}
pub struct SimulationOutputSpec<'a> {
pub result_tx: &'a SyncSender<SimulationScenarioResult>,
pub zeta_per_stage: &'a [f64],
pub block_hours_per_stage: &'a [Vec<f64>],
pub entity_counts: &'a EntityCounts,
pub generic_constraint_row_entries: &'a [Vec<crate::lp_builder::GenericConstraintRowEntry>],
pub ncs_col_starts: &'a [usize],
pub n_ncs_per_stage: &'a [usize],
pub ncs_entity_ids_per_stage: &'a [Vec<i32>],
pub diversion_upstream: &'a HashMap<EntityId, Vec<usize>>,
pub hydro_productivities_per_stage: &'a [Vec<f64>],
pub event_sender: Option<Sender<TrainingEvent>>,
}
struct ScenarioIds {
scenario_id: u32,
global_scenario: u32,
num_stages: usize,
}
#[allow(clippy::too_many_arguments)]
fn build_row_lower_unscaled<'a>(
template_row_lower: &[f64],
row_scale: &[f64],
load_rhs_buf: &[f64],
scratch_buf: &'a mut Vec<f64>,
n_load_buses: usize,
load_balance_row_start: usize,
n_blks: usize,
load_bus_indices: &[usize],
) -> &'a [f64] {
scratch_buf.clear();
scratch_buf.reserve(template_row_lower.len());
if row_scale.is_empty() {
scratch_buf.extend_from_slice(template_row_lower);
} else {
for (i, &val) in template_row_lower.iter().enumerate() {
let scale = if i < row_scale.len() && row_scale[i] != 0.0 {
row_scale[i]
} else {
1.0
};
scratch_buf.push(val / scale);
}
}
if n_load_buses > 0 && !load_rhs_buf.is_empty() {
let mut rhs_idx = 0;
for &bus_pos in load_bus_indices {
for blk in 0..n_blks {
scratch_buf[load_balance_row_start + bus_pos * n_blks + blk] =
load_rhs_buf[rhs_idx];
rhs_idx += 1;
}
}
}
&scratch_buf[..template_row_lower.len()]
}
struct SimStageIds {
t: usize,
stage_id_u32: u32,
scenario_id: u32,
}
fn apply_ncs_col_bounds<S: SolverInterface>(
solver: &mut S,
scratch: &mut crate::workspace::ScratchBuffers,
ncs_generation_start: usize,
n_stochastic_ncs: usize,
n_blks: usize,
) {
let expected_len = n_stochastic_ncs * n_blks;
if scratch.ncs_col_indices_buf.len() != expected_len {
scratch.ncs_col_indices_buf.clear();
scratch.ncs_col_lower_buf.clear();
for ncs_idx in 0..n_stochastic_ncs {
for blk in 0..n_blks {
scratch
.ncs_col_indices_buf
.push(ncs_generation_start + ncs_idx * n_blks + blk);
scratch.ncs_col_lower_buf.push(0.0);
}
}
}
solver.set_col_bounds(
&scratch.ncs_col_indices_buf,
&scratch.ncs_col_lower_buf,
&scratch.ncs_col_upper_buf,
);
}
#[allow(clippy::too_many_lines)]
fn solve_simulation_stage<S: SolverInterface>(
ws: &mut crate::workspace::SolverWorkspace<S>,
ctx: &StageContext<'_>,
training_ctx: &TrainingContext<'_>,
cut_batch: &RowBatch,
output: &SimulationOutputSpec<'_>,
ids: &SimStageIds,
warm_basis: Option<&Basis>,
) -> Result<(f64, SimulationStageResult), SimulationError> {
let TrainingContext {
indexer,
stochastic,
..
} = training_ctx;
let t = ids.t;
ws.solver.load_model(&ctx.templates[t]);
ws.solver.add_rows(cut_batch);
ws.patch_buf.fill_forward_patches(
indexer,
&ws.current_state,
&ws.scratch.noise_buf,
ctx.base_rows[t],
&ctx.templates[t].row_scale,
);
if ctx.n_load_buses > 0 {
ws.patch_buf.fill_load_patches(
ctx.load_balance_row_starts[t],
ctx.block_counts_per_stage[t],
&ws.scratch.load_rhs_buf,
ctx.load_bus_indices,
&ctx.templates[t].row_scale,
);
}
ws.patch_buf.fill_z_inflow_patches(
indexer.z_inflow_row_start,
&ws.scratch.z_inflow_rhs_buf,
&ctx.templates[t].row_scale,
);
let pc = ws.patch_buf.forward_patch_count();
ws.solver.set_row_bounds(
&ws.patch_buf.indices[..pc],
&ws.patch_buf.lower[..pc],
&ws.patch_buf.upper[..pc],
);
let n_stochastic_ncs = stochastic.n_stochastic_ncs();
if n_stochastic_ncs > 0 && !indexer.ncs_generation.is_empty() {
apply_ncs_col_bounds(
&mut ws.solver,
&mut ws.scratch,
indexer.ncs_generation.start,
n_stochastic_ncs,
ctx.block_counts_per_stage[t],
);
}
let view = if let Some(basis) = warm_basis {
ws.solver.solve_with_basis(basis)
} else {
ws.solver.solve()
}
.map_err(|e| match e {
SolverError::Infeasible => SimulationError::LpInfeasible {
scenario_id: ids.scenario_id,
stage_id: ids.stage_id_u32,
solver_message: "LP infeasible".to_string(),
},
other => SimulationError::SolverError {
scenario_id: ids.scenario_id,
stage_id: ids.stage_id_u32,
solver_message: other.to_string(),
},
})?;
let col_scale = &ctx.templates[ids.t].col_scale;
{
let buf = &mut ws.scratch.unscaled_primal;
if col_scale.is_empty() {
buf.clear();
buf.extend_from_slice(view.primal);
} else {
buf.resize(view.primal.len(), 0.0);
for (j, (xp, &d)) in view.primal.iter().zip(col_scale).enumerate() {
buf[j] = d * xp;
}
}
}
let row_scale = &ctx.templates[ids.t].row_scale;
{
let buf = &mut ws.scratch.unscaled_dual;
if row_scale.is_empty() {
buf.clear();
buf.extend_from_slice(view.dual);
} else {
buf.resize(view.dual.len(), 0.0);
for (i, &d) in view.dual.iter().enumerate() {
let scale = if i < row_scale.len() {
row_scale[i]
} else {
1.0
};
buf[i] = d * scale;
}
}
}
let unscaled_primal = std::mem::take(&mut ws.scratch.unscaled_primal);
let unscaled_dual = std::mem::take(&mut ws.scratch.unscaled_dual);
let unscaled_view = cobre_solver::SolutionView {
objective: view.objective,
primal: &unscaled_primal,
dual: &unscaled_dual,
reduced_costs: view.reduced_costs,
iterations: view.iterations,
solve_time_seconds: view.solve_time_seconds,
};
let (immediate_cost, result) = extract_sim_stage_result(
&mut ws.scratch,
ctx,
output,
indexer,
ids,
&unscaled_view,
n_stochastic_ncs,
);
let lag_start = indexer.inflow_lags.start;
let lag_len = indexer.hydro_count * indexer.max_par_order;
ws.scratch.lag_matrix_buf.clear();
ws.scratch
.lag_matrix_buf
.extend_from_slice(&ws.current_state[lag_start..lag_start + lag_len]);
ws.current_state.clear();
ws.current_state
.extend_from_slice(&unscaled_primal[..indexer.n_state]);
crate::noise::shift_lag_state(
&mut ws.current_state,
&ws.scratch.lag_matrix_buf,
&unscaled_primal,
indexer,
);
ws.scratch.unscaled_primal = unscaled_primal;
ws.scratch.unscaled_dual = unscaled_dual;
Ok((immediate_cost, result))
}
fn extract_sim_stage_result(
scratch: &mut crate::workspace::ScratchBuffers,
ctx: &StageContext<'_>,
output: &SimulationOutputSpec<'_>,
indexer: &crate::StageIndexer,
ids: &SimStageIds,
view: &cobre_solver::SolutionView<'_>,
n_stochastic_ncs: usize,
) -> (f64, SimulationStageResult) {
let t = ids.t;
let immediate_cost = (view.objective - view.primal[indexer.theta]) * COST_SCALE_FACTOR;
scratch.inflow_m3s_buf.clear();
for h in 0..ctx.n_hydros {
scratch
.inflow_m3s_buf
.push(view.primal[indexer.z_inflow.start + h]);
}
let blk_hrs = output
.block_hours_per_stage
.get(t)
.map_or(&[][..], |v| v.as_slice());
let (load_row_start, load_n_blks) = if ctx.n_load_buses > 0 {
(
ctx.load_balance_row_starts[t],
ctx.block_counts_per_stage[t],
)
} else {
(0, 0)
};
let row_lower_ref = build_row_lower_unscaled(
&ctx.templates[t].row_lower,
&ctx.templates[t].row_scale,
&scratch.load_rhs_buf,
&mut scratch.row_lower_buf,
ctx.n_load_buses,
load_row_start,
load_n_blks,
ctx.load_bus_indices,
);
let ncs_n = output.n_ncs_per_stage.get(t).copied().unwrap_or(0);
let ncs_col_start = output.ncs_col_starts.get(t).copied().unwrap_or(0);
let stage_n_blks = ctx.block_counts_per_stage.get(t).copied().unwrap_or(0);
let ncs_col_upper: &[f64] = if n_stochastic_ncs > 0
&& n_stochastic_ncs == ncs_n
&& !scratch.ncs_col_upper_buf.is_empty()
{
&scratch.ncs_col_upper_buf
} else if ncs_n > 0 && stage_n_blks > 0 {
let start = ncs_col_start;
let end = start + ncs_n * stage_n_blks;
if end <= ctx.templates[t].col_upper.len() {
&ctx.templates[t].col_upper[start..end]
} else {
&[]
}
} else {
&[]
};
let result = extract_stage_result(
&SolutionView {
primal: view.primal,
dual: view.dual,
objective: view.objective,
objective_coeffs: &ctx.templates[t].objective,
row_lower: row_lower_ref,
},
&StageExtractionSpec {
indexer,
entity_counts: output.entity_counts,
inflow_m3s_per_hydro: &scratch.inflow_m3s_buf,
block_hours: blk_hrs,
generic_constraint_entries: output
.generic_constraint_row_entries
.get(t)
.map_or(&[], Vec::as_slice),
ncs_col_start,
n_ncs: ncs_n,
ncs_entity_ids: output
.ncs_entity_ids_per_stage
.get(t)
.map_or(&[], Vec::as_slice),
ncs_col_upper,
diversion_upstream: output.diversion_upstream,
hydro_productivities: output
.hydro_productivities_per_stage
.get(t)
.map_or(&[], Vec::as_slice),
col_scale: &ctx.templates[t].col_scale,
row_scale: &ctx.templates[t].row_scale,
},
ids.stage_id_u32,
);
(immediate_cost, result)
}
fn process_scenario_stages<S: SolverInterface>(
ws: &mut crate::workspace::SolverWorkspace<S>,
ctx: &StageContext<'_>,
training_ctx: &TrainingContext<'_>,
cut_batches: &[RowBatch],
output: &SimulationOutputSpec<'_>,
ids: &ScenarioIds,
stage_bases: &[Option<Basis>],
) -> Result<(f64, Vec<SimulationStageResult>), SimulationError> {
let TrainingContext {
indexer,
stochastic,
initial_state,
..
} = training_ctx;
ws.current_state.clear();
ws.current_state.extend_from_slice(initial_state);
let tree_view = stochastic.tree_view();
let base_seed = stochastic.base_seed();
let mut total_cost = 0.0_f64;
let mut stage_results = Vec::with_capacity(ids.num_stages);
#[allow(clippy::needless_range_loop)] for t in 0..ids.num_stages {
#[allow(clippy::cast_possible_truncation)]
let stage_id_u32 = t as u32;
let (_opening_idx, raw_noise) = sample_forward(
&tree_view,
base_seed,
0,
ids.global_scenario,
stage_id_u32,
t,
);
transform_inflow_noise(
raw_noise,
t,
&ws.current_state,
ctx,
training_ctx,
&mut ws.scratch,
);
transform_load_noise(
raw_noise,
ctx.n_hydros,
ctx.n_load_buses,
stochastic,
t,
if ctx.n_load_buses > 0 {
ctx.block_counts_per_stage[t]
} else {
0
},
&mut ws.scratch.load_rhs_buf,
);
let n_stochastic_ncs = stochastic.n_stochastic_ncs();
if n_stochastic_ncs > 0 {
transform_ncs_noise(
raw_noise,
ctx.n_hydros,
ctx.n_load_buses,
stochastic,
t,
ctx.block_counts_per_stage[t],
ctx.ncs_max_gen,
&mut ws.scratch.ncs_col_upper_buf,
);
}
let (cost, result) = solve_simulation_stage(
ws,
ctx,
training_ctx,
&cut_batches[t],
output,
&SimStageIds {
t,
stage_id_u32,
scenario_id: ids.scenario_id,
},
stage_bases.get(t).and_then(Option::as_ref),
)?;
total_cost += cost;
stage_results.push(result);
}
let _ = indexer; Ok((total_cost, stage_results))
}
fn emit_sim_progress(
sender: Option<&Sender<TrainingEvent>>,
scenario_cost: f64,
solve_time_ms: f64,
lp_solves: u64,
completed: u32,
total: u32,
elapsed_ms: u64,
) {
if let Some(s) = sender {
let _ = s.send(TrainingEvent::SimulationProgress {
scenarios_complete: completed,
scenarios_total: total,
elapsed_ms,
scenario_cost,
solve_time_ms,
lp_solves,
});
}
}
fn dispatch_scenario_result(
output: &SimulationOutputSpec<'_>,
scenario_id: u32,
total_cost: f64,
stage_results: Vec<SimulationStageResult>,
) -> Result<(u32, f64, ScenarioCategoryCosts), SimulationError> {
let mut category_costs = ScenarioCategoryCosts {
resource_cost: 0.0,
recourse_cost: 0.0,
violation_cost: 0.0,
regularization_cost: 0.0,
imputed_cost: 0.0,
};
for sr in &stage_results {
for c in &sr.costs {
accumulate_category_costs(c, &mut category_costs);
}
}
let compact_category = category_costs.clone();
output
.result_tx
.send(SimulationScenarioResult {
scenario_id,
total_cost,
per_category_costs: category_costs,
stages: stage_results,
})
.map_err(|_| SimulationError::ChannelClosed)?;
Ok((scenario_id, total_cost, compact_category))
}
#[allow(
clippy::needless_pass_by_value,
clippy::too_many_lines,
clippy::too_many_arguments
)]
pub fn simulate<S: SolverInterface + Send, C: Communicator>(
workspaces: &mut [SolverWorkspace<S>],
ctx: &StageContext<'_>,
fcf: &FutureCostFunction,
training_ctx: &TrainingContext<'_>,
config: &SimulationConfig,
output: SimulationOutputSpec<'_>,
stage_bases: &[Option<Basis>],
comm: &C,
) -> Result<SimulationRunResult, SimulationError> {
let TrainingContext {
horizon,
indexer,
initial_state,
..
} = training_ctx;
let num_stages = horizon.num_stages();
let rank = comm.rank();
debug_assert_eq!(
ctx.templates.len(),
num_stages,
"templates.len()={} != num_stages={num_stages}",
ctx.templates.len()
);
debug_assert_eq!(
ctx.base_rows.len(),
num_stages,
"base_rows.len()={} != num_stages={num_stages}",
ctx.base_rows.len()
);
debug_assert_eq!(
initial_state.len(),
indexer.n_state,
"initial_state.len()={} != n_state={}",
initial_state.len(),
indexer.n_state
);
let cut_batches: Vec<RowBatch> = (0..num_stages)
.map(|t| build_cut_row_batch(fcf, t, indexer, &ctx.templates[t].col_scale))
.collect();
let scenario_range = assign_scenarios(config.n_scenarios, rank, comm.size());
#[allow(clippy::cast_possible_truncation)]
let local_count = (scenario_range.end - scenario_range.start) as usize;
let scenario_start = scenario_range.start as usize;
let n_workers = workspaces.len().max(1);
let sim_start = Instant::now();
let scenarios_complete = AtomicU32::new(0);
let worker_results: Vec<Result<(WorkerCosts, WorkerStats), SimulationError>> = workspaces
.par_iter_mut()
.enumerate()
.map(|(w, ws)| {
let (start_local, end_local) = partition(local_count, n_workers, w);
let worker_sender: Option<Sender<TrainingEvent>> = output.event_sender.clone();
let n_scenarios = end_local - start_local;
let mut worker_costs = Vec::with_capacity(n_scenarios);
let mut worker_stats = Vec::with_capacity(n_scenarios);
for local_idx in start_local..end_local {
#[allow(clippy::cast_possible_truncation)]
let scenario_id = (scenario_start + local_idx) as u32;
let global_scenario = SIMULATION_SEED_OFFSET.saturating_add(scenario_id);
let stats_before = ws.solver.statistics();
let (total_cost, stage_results) = process_scenario_stages(
ws,
ctx,
training_ctx,
&cut_batches,
&output,
&ScenarioIds {
scenario_id,
global_scenario,
num_stages,
},
stage_bases,
)?;
let stats_after = ws.solver.statistics();
let scenario_delta = SolverStatsDelta::from_snapshots(&stats_before, &stats_after);
let scenario_solve_time_ms = scenario_delta.solve_time_ms;
let scenario_lp_solves = scenario_delta.lp_solves;
worker_stats.push((scenario_id, scenario_delta));
worker_costs.push(dispatch_scenario_result(
&output,
scenario_id,
total_cost,
stage_results,
)?);
let completed = scenarios_complete.fetch_add(1, Ordering::Relaxed) + 1;
#[allow(clippy::cast_possible_truncation)]
emit_sim_progress(
worker_sender.as_ref(),
total_cost,
scenario_solve_time_ms,
scenario_lp_solves,
completed,
config.n_scenarios,
sim_start.elapsed().as_millis() as u64,
);
}
Ok((worker_costs, worker_stats))
})
.collect();
let mut all_costs = Vec::with_capacity(local_count);
let mut all_stats = Vec::with_capacity(local_count);
for result in worker_results {
let (costs, stats) = result?;
all_costs.extend(costs);
all_stats.extend(stats);
}
all_costs.sort_by_key(|&(id, _, _)| id);
all_stats.sort_by_key(|&(id, _)| id);
if let Some(sender) = output.event_sender {
#[allow(clippy::cast_possible_truncation)]
let _ = sender.send(TrainingEvent::SimulationFinished {
scenarios: config.n_scenarios,
output_dir: String::new(),
elapsed_ms: sim_start.elapsed().as_millis() as u64,
});
}
Ok(SimulationRunResult {
costs: all_costs,
solver_stats: all_stats,
})
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic, clippy::too_many_lines)]
mod tests {
use std::collections::HashMap;
use std::sync::mpsc;
use cobre_comm::{CommData, CommError, Communicator, ReduceOp};
use cobre_solver::{
Basis, LpSolution, RowBatch, SolverError, SolverInterface, SolverStatistics, StageTemplate,
};
use cobre_stochastic::StochasticContext;
use super::{SimulationOutputSpec, simulate};
use crate::{
FutureCostFunction, HorizonMode, InflowNonNegativityMethod, PatchBuffer, StageIndexer,
context::{StageContext, TrainingContext},
simulation::{config::SimulationConfig, error::SimulationError, extraction::EntityCounts},
workspace::{ScratchBuffers, SolverWorkspace},
};
struct StubComm {
rank: usize,
size: usize,
}
impl Communicator for StubComm {
fn allgatherv<T: CommData>(
&self,
_send: &[T],
_recv: &mut [T],
_counts: &[usize],
_displs: &[usize],
) -> Result<(), CommError> {
unreachable!("StubComm allgatherv not used in simulate tests")
}
fn allreduce<T: CommData>(
&self,
_send: &[T],
_recv: &mut [T],
_op: ReduceOp,
) -> Result<(), CommError> {
unreachable!("StubComm allreduce not used in simulate tests")
}
fn broadcast<T: CommData>(&self, _buf: &mut [T], _root: usize) -> Result<(), CommError> {
unreachable!("StubComm broadcast not used in simulate tests")
}
fn barrier(&self) -> Result<(), CommError> {
Ok(())
}
fn rank(&self) -> usize {
self.rank
}
fn size(&self) -> usize {
self.size
}
}
struct MockSolver {
solution: LpSolution,
infeasible_at: Option<usize>,
call_count: usize,
buf_primal: Vec<f64>,
buf_dual: Vec<f64>,
buf_reduced_costs: Vec<f64>,
}
impl MockSolver {
fn always_ok(solution: LpSolution) -> Self {
let buf_primal = solution.primal.clone();
let buf_dual = solution.dual.clone();
let buf_reduced_costs = solution.reduced_costs.clone();
Self {
solution,
infeasible_at: None,
call_count: 0,
buf_primal,
buf_dual,
buf_reduced_costs,
}
}
fn infeasible_on(solution: LpSolution, n: usize) -> Self {
let buf_primal = solution.primal.clone();
let buf_dual = solution.dual.clone();
let buf_reduced_costs = solution.reduced_costs.clone();
Self {
solution,
infeasible_at: Some(n),
call_count: 0,
buf_primal,
buf_dual,
buf_reduced_costs,
}
}
fn do_solve(&mut self) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
let call = self.call_count;
self.call_count += 1;
if self.infeasible_at == Some(call) {
return Err(SolverError::Infeasible);
}
self.buf_primal.clone_from(&self.solution.primal);
self.buf_dual.clone_from(&self.solution.dual);
self.buf_reduced_costs
.clone_from(&self.solution.reduced_costs);
Ok(cobre_solver::SolutionView {
objective: self.solution.objective,
primal: &self.buf_primal,
dual: &self.buf_dual,
reduced_costs: &self.buf_reduced_costs,
iterations: self.solution.iterations,
solve_time_seconds: self.solution.solve_time_seconds,
})
}
}
impl SolverInterface for MockSolver {
fn load_model(&mut self, _template: &StageTemplate) {}
fn add_rows(&mut self, _cuts: &RowBatch) {}
fn set_row_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn set_col_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn solve(&mut self) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
self.do_solve()
}
fn reset(&mut self) {}
fn get_basis(&mut self, _out: &mut Basis) {}
fn solve_with_basis(
&mut self,
_basis: &Basis,
) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
self.do_solve()
}
fn statistics(&self) -> SolverStatistics {
SolverStatistics::default()
}
fn name(&self) -> &'static str {
"Mock"
}
}
fn minimal_template_1_0() -> StageTemplate {
StageTemplate {
num_cols: 4,
num_rows: 2,
num_nz: 1,
col_starts: vec![0_i32, 0, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, f64::NEG_INFINITY, 0.0, 0.0],
col_upper: vec![f64::INFINITY; 4],
objective: vec![0.0, 0.0, 0.0, 1.0],
row_lower: vec![0.0, 0.0],
row_upper: vec![0.0, 0.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
}
}
fn fixed_solution(objective: f64, theta_val: f64) -> LpSolution {
let num_cols = 4; let mut primal = vec![0.0_f64; num_cols];
primal[3] = theta_val; LpSolution {
objective,
primal,
dual: vec![0.0_f64; 2],
reduced_costs: vec![0.0_f64; num_cols],
iterations: 0,
solve_time_seconds: 0.0,
}
}
fn entity_counts_1_hydro() -> EntityCounts {
EntityCounts {
hydro_ids: vec![1],
hydro_productivities: vec![1.0],
thermal_ids: vec![],
line_ids: vec![],
bus_ids: vec![],
pumping_station_ids: vec![],
contract_ids: vec![],
non_controllable_ids: vec![],
}
}
fn make_stochastic_context(n_stages: usize) -> StochasticContext {
use std::collections::BTreeMap;
use chrono::NaiveDate;
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::scenario::{
CorrelationEntity, CorrelationGroup, CorrelationModel, CorrelationProfile, InflowModel,
};
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, Stage, StageRiskConfig,
StageStateConfig,
};
use cobre_core::{Bus, DeficitSegment, EntityId, SystemBuilder};
use cobre_stochastic::context::build_stochastic_context;
let bus = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(1),
name: "H1".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity {
productivity_mw_per_m3s: 1.0,
},
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
},
};
let make_stage = |idx: usize, id: i32| Stage {
index: idx,
id,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 3,
noise_method: NoiseMethod::Saa,
},
};
let stages: Vec<Stage> = (0..n_stages)
.map(|i| make_stage(i, i32::try_from(i).unwrap()))
.collect();
let inflow = |stage_id: i32| InflowModel {
hydro_id: EntityId(1),
stage_id,
mean_m3s: 100.0,
std_m3s: 30.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
};
let inflow_models: Vec<InflowModel> = (0..n_stages)
.map(|i| inflow(i32::try_from(i).unwrap()))
.collect();
let mut profiles = BTreeMap::new();
profiles.insert(
"default".to_string(),
CorrelationProfile {
groups: vec![CorrelationGroup {
name: "g1".to_string(),
entities: vec![CorrelationEntity {
entity_type: "inflow".to_string(),
id: EntityId(1),
}],
matrix: vec![vec![1.0]],
}],
},
);
let correlation = CorrelationModel {
method: "cholesky".to_string(),
profiles,
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus])
.hydros(vec![hydro])
.stages(stages)
.inflow_models(inflow_models)
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(&system, 42, &[], &[], None).unwrap()
}
fn hydro_productivities_1hydro(n_stages: usize) -> Vec<Vec<f64>> {
vec![vec![1.0]; n_stages]
}
fn single_workspace(solver: MockSolver) -> Vec<SolverWorkspace<MockSolver>> {
vec![SolverWorkspace {
solver,
patch_buf: PatchBuffer::new(1, 0, 0, 0), current_state: Vec::with_capacity(1),
scratch: ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
},
}]
}
#[test]
fn simulate_single_rank_4_scenarios_produces_4_results() {
let n_stages = 2;
let templates: Vec<StageTemplate> = (0..n_stages).map(|_| minimal_template_1_0()).collect();
let base_rows: Vec<usize> = vec![0; n_stages];
let indexer = StageIndexer::new(1, 0); let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let stochastic = make_stochastic_context(n_stages);
let config = SimulationConfig {
n_scenarios: 4,
io_channel_capacity: 16,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(100.0, 30.0); let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (tx, rx) = mpsc::sync_channel(16);
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace(solver);
let result = simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
);
assert!(result.is_ok(), "simulate returned error: {result:?}");
let run_result = result.unwrap();
assert_eq!(
run_result.costs.len(),
4,
"cost buffer should have 4 entries"
);
let mut received = 0;
while rx.try_recv().is_ok() {
received += 1;
}
assert_eq!(received, 4, "channel should have received 4 results");
}
#[test]
fn simulate_infeasible_returns_lp_infeasible_error() {
let n_stages = 2;
let templates: Vec<StageTemplate> = (0..n_stages).map(|_| minimal_template_1_0()).collect();
let base_rows: Vec<usize> = vec![0; n_stages];
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let stochastic = make_stochastic_context(n_stages);
let config = SimulationConfig {
n_scenarios: 4,
io_channel_capacity: 16,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(100.0, 30.0);
let solver = MockSolver::infeasible_on(solution, 5);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (tx, _rx) = mpsc::sync_channel(16);
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace(solver);
let result = simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
);
match result {
Err(SimulationError::LpInfeasible {
scenario_id,
stage_id,
..
}) => {
assert_eq!(scenario_id, 2, "expected scenario_id=2, got {scenario_id}");
assert_eq!(stage_id, 1, "expected stage_id=1, got {stage_id}");
}
other => panic!("expected LpInfeasible, got {other:?}"),
}
}
#[test]
fn simulate_infeasible_at_scenario2_stage3() {
let n_stages = 4;
let templates: Vec<StageTemplate> = (0..n_stages).map(|_| minimal_template_1_0()).collect();
let base_rows: Vec<usize> = vec![0; n_stages];
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let stochastic = make_stochastic_context(n_stages);
let config = SimulationConfig {
n_scenarios: 4,
io_channel_capacity: 16,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(100.0, 30.0);
let solver = MockSolver::infeasible_on(solution, 11);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (tx, _rx) = mpsc::sync_channel(16);
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace(solver);
let result = simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
);
match result {
Err(SimulationError::LpInfeasible {
scenario_id,
stage_id,
..
}) => {
assert_eq!(scenario_id, 2, "expected scenario_id=2, got {scenario_id}");
assert_eq!(stage_id, 3, "expected stage_id=3, got {stage_id}");
}
other => panic!("expected LpInfeasible, got {other:?}"),
}
}
#[test]
fn simulate_channel_closed_returns_error() {
let n_stages = 2;
let templates: Vec<StageTemplate> = (0..n_stages).map(|_| minimal_template_1_0()).collect();
let base_rows: Vec<usize> = vec![0; n_stages];
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let stochastic = make_stochastic_context(n_stages);
let config = SimulationConfig {
n_scenarios: 2,
io_channel_capacity: 1,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(100.0, 30.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (tx, rx) = mpsc::sync_channel(1);
drop(rx);
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace(solver);
let result = simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
);
assert!(
matches!(result, Err(SimulationError::ChannelClosed)),
"expected ChannelClosed, got {result:?}"
);
}
#[test]
fn simulate_total_cost_equals_sum_of_stage_costs() {
let n_stages = 3;
let templates: Vec<StageTemplate> = (0..n_stages).map(|_| minimal_template_1_0()).collect();
let base_rows: Vec<usize> = vec![0; n_stages];
let indexer = StageIndexer::new(1, 0); let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let stochastic = make_stochastic_context(n_stages);
let config = SimulationConfig {
n_scenarios: 2,
io_channel_capacity: 16,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let objective = 100.0_f64;
let theta_val = 30.0_f64;
let expected_stage_cost = (objective - theta_val) * 1000.0; #[allow(clippy::cast_precision_loss)]
let expected_total_cost = expected_stage_cost * n_stages as f64;
let solution = fixed_solution(objective, theta_val);
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (tx, _rx) = mpsc::sync_channel(16);
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace(solver);
let run_result = simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
)
.unwrap();
assert_eq!(run_result.costs.len(), 2);
for (scenario_id, total_cost, _) in &run_result.costs {
assert!(
(total_cost - expected_total_cost).abs() < 1e-9,
"scenario {scenario_id}: expected total_cost={expected_total_cost}, got {total_cost}"
);
}
}
#[test]
fn simulate_cost_buffer_scenario_ids_match_assigned_range() {
let n_stages = 1;
let templates: Vec<StageTemplate> = (0..n_stages).map(|_| minimal_template_1_0()).collect();
let base_rows: Vec<usize> = vec![0; n_stages];
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let stochastic = make_stochastic_context(n_stages);
let config = SimulationConfig {
n_scenarios: 6,
io_channel_capacity: 16,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(50.0, 10.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 2 };
let entity_counts = entity_counts_1_hydro();
let (tx, _rx) = mpsc::sync_channel(16);
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace(solver);
let run_result = simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
)
.unwrap();
assert_eq!(
run_result.costs.len(),
3,
"rank 0 should process 3 scenarios"
);
let ids: Vec<u32> = run_result.costs.iter().map(|(id, _, _)| *id).collect();
assert_eq!(
ids,
vec![0, 1, 2],
"scenario IDs must match assigned range 0..3"
);
}
#[test]
fn simulate_channel_receives_results_in_scenario_order() {
let n_stages = 1;
let templates: Vec<StageTemplate> = (0..n_stages).map(|_| minimal_template_1_0()).collect();
let base_rows: Vec<usize> = vec![0; n_stages];
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let stochastic = make_stochastic_context(n_stages);
let config = SimulationConfig {
n_scenarios: 3,
io_channel_capacity: 16,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(100.0, 20.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (tx, rx) = mpsc::sync_channel(16);
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace(solver);
simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
)
.unwrap();
let received: Vec<u32> = (0..3).map(|_| rx.recv().unwrap().scenario_id).collect();
assert_eq!(received, vec![0, 1, 2]);
}
#[test]
fn test_simulation_parallel_cost_determinism() {
let n_stages = 2;
let n_scenarios = 20u32;
let templates: Vec<StageTemplate> = (0..n_stages).map(|_| minimal_template_1_0()).collect();
let base_rows: Vec<usize> = vec![0; n_stages];
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let stochastic = make_stochastic_context(n_stages);
let config = SimulationConfig {
n_scenarios,
io_channel_capacity: 64,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let objective = 100.0_f64;
let theta_val = 30.0_f64;
let solution = fixed_solution(objective, theta_val);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let hprod = hydro_productivities_1hydro(n_stages);
let (tx1, _rx1) = mpsc::sync_channel(64);
let mut workspaces_1 = single_workspace(MockSolver::always_ok(solution.clone()));
let result_1 = simulate(
&mut workspaces_1,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &tx1,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
)
.unwrap();
let (tx4, _rx4) = mpsc::sync_channel(64);
let mut workspaces_4: Vec<SolverWorkspace<MockSolver>> = (0..4)
.map(|_| SolverWorkspace {
solver: MockSolver::always_ok(solution.clone()),
patch_buf: PatchBuffer::new(1, 0, 0, 0),
current_state: Vec::with_capacity(1),
scratch: ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
},
})
.collect();
let result_4 = simulate(
&mut workspaces_4,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &tx4,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
)
.unwrap();
let costs_1 = &result_1.costs;
let costs_4 = &result_4.costs;
assert_eq!(
costs_1.len(),
n_scenarios as usize,
"1-workspace: 20 entries"
);
assert_eq!(
costs_4.len(),
n_scenarios as usize,
"4-workspace: 20 entries"
);
let ids_1: Vec<u32> = costs_1.iter().map(|(id, _, _)| *id).collect();
let ids_4: Vec<u32> = costs_4.iter().map(|(id, _, _)| *id).collect();
let expected_ids: Vec<u32> = (0..n_scenarios).collect();
assert_eq!(ids_1, expected_ids, "1-workspace: sorted scenario IDs");
assert_eq!(ids_4, expected_ids, "4-workspace: sorted scenario IDs");
for i in 0..n_scenarios as usize {
let (id1, cost1, _) = &costs_1[i];
let (id4, cost4, _) = &costs_4[i];
assert_eq!(id1, id4, "scenario_id mismatch at index {i}");
assert!(
(cost1 - cost4).abs() < 1e-9,
"cost mismatch for scenario {id1}: 1-ws={cost1}, 4-ws={cost4}"
);
}
}
#[test]
fn simulate_emits_progress_events() {
use cobre_core::TrainingEvent;
let n_stages = 2;
let templates: Vec<StageTemplate> = (0..n_stages).map(|_| minimal_template_1_0()).collect();
let base_rows: Vec<usize> = vec![0; n_stages];
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let stochastic = make_stochastic_context(n_stages);
let config = SimulationConfig {
n_scenarios: 10,
io_channel_capacity: 32,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(100.0, 30.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (result_tx, _result_rx) = mpsc::sync_channel(32);
let (event_tx, event_rx) = mpsc::channel::<TrainingEvent>();
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace(solver);
let result = simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &result_tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: Some(event_tx),
},
&[],
&comm,
);
assert!(result.is_ok(), "simulate returned error: {result:?}");
let events: Vec<TrainingEvent> = event_rx.iter().collect();
let progress_events: Vec<_> = events
.iter()
.filter(|e| matches!(e, TrainingEvent::SimulationProgress { .. }))
.collect();
assert!(
!progress_events.is_empty(),
"at least 1 SimulationProgress event expected"
);
for event in &progress_events {
let TrainingEvent::SimulationProgress {
scenarios_complete,
scenario_cost,
..
} = event
else {
continue;
};
assert!(
*scenarios_complete > 0,
"scenarios_complete must be > 0, got {scenarios_complete}"
);
assert!(
scenario_cost.is_finite() && !scenario_cost.is_nan(),
"scenario_cost must be finite and non-NaN, got {scenario_cost}"
);
}
}
#[test]
fn simulate_no_events_when_sender_is_none() {
let n_stages = 2;
let templates: Vec<StageTemplate> = (0..n_stages).map(|_| minimal_template_1_0()).collect();
let base_rows: Vec<usize> = vec![0; n_stages];
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let stochastic = make_stochastic_context(n_stages);
let config = SimulationConfig {
n_scenarios: 4,
io_channel_capacity: 16,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(100.0, 30.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (result_tx, _result_rx) = mpsc::sync_channel(16);
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace(solver);
let result = simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &result_tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
);
assert!(result.is_ok(), "simulate returned error: {result:?}");
let run_result = result.unwrap();
assert_eq!(
run_result.costs.len(),
4,
"cost buffer must have 4 entries when event_sender is None"
);
}
#[test]
fn simulate_progress_events_received_before_return() {
use cobre_core::TrainingEvent;
let n_stages = 1;
let n_scenarios = 10;
let templates: Vec<StageTemplate> = (0..n_stages).map(|_| minimal_template_1_0()).collect();
let base_rows: Vec<usize> = vec![0; n_stages];
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let stochastic = make_stochastic_context(n_stages);
let config = SimulationConfig {
n_scenarios,
io_channel_capacity: 32,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(100.0, 30.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (result_tx, _result_rx) = mpsc::sync_channel(32);
let (event_tx, event_rx) = mpsc::channel::<TrainingEvent>();
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace(solver);
simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &result_tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: Some(event_tx),
},
&[],
&comm,
)
.unwrap();
let events: Vec<TrainingEvent> = event_rx.iter().collect();
let progress_count = events
.iter()
.filter(|e| matches!(e, TrainingEvent::SimulationProgress { .. }))
.count();
assert!(
progress_count > 0,
"expected SimulationProgress events in channel after simulate() returns, got 0"
);
assert_eq!(
progress_count, n_scenarios as usize,
"expected {n_scenarios} SimulationProgress events (one per scenario), got {progress_count}"
);
}
#[test]
fn simulate_progress_scenario_cost_equals_total_cost() {
use cobre_core::TrainingEvent;
let n_stages = 1;
let n_scenarios = 5_u32;
let templates: Vec<StageTemplate> = (0..n_stages).map(|_| minimal_template_1_0()).collect();
let base_rows: Vec<usize> = vec![0; n_stages];
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let stochastic = make_stochastic_context(n_stages);
let config = SimulationConfig {
n_scenarios,
io_channel_capacity: 32,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(100.0, 30.0);
let expected_stage_cost = 70_000.0_f64;
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (result_tx, _result_rx) = mpsc::sync_channel(32);
let (event_tx, event_rx) = mpsc::channel::<TrainingEvent>();
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace(solver);
simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &result_tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: Some(event_tx),
},
&[],
&comm,
)
.unwrap();
let events: Vec<TrainingEvent> = event_rx.iter().collect();
let progress_events: Vec<_> = events
.iter()
.filter(|e| matches!(e, TrainingEvent::SimulationProgress { .. }))
.collect();
assert_eq!(
progress_events.len(),
n_scenarios as usize,
"expected {n_scenarios} progress events"
);
for event in &progress_events {
let TrainingEvent::SimulationProgress { scenario_cost, .. } = event else {
continue;
};
assert!(
(scenario_cost - expected_stage_cost).abs() < 1e-9,
"scenario_cost must equal expected cost {expected_stage_cost}, got {scenario_cost}"
);
}
}
#[test]
fn simulate_emits_simulation_finished_as_last_event() {
use cobre_core::TrainingEvent;
let n_stages = 1;
let n_scenarios = 6_u32;
let templates: Vec<StageTemplate> = (0..n_stages).map(|_| minimal_template_1_0()).collect();
let base_rows: Vec<usize> = vec![0; n_stages];
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let stochastic = make_stochastic_context(n_stages);
let config = SimulationConfig {
n_scenarios,
io_channel_capacity: 32,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(100.0, 30.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (result_tx, _result_rx) = mpsc::sync_channel(32);
let (event_tx, event_rx) = mpsc::channel::<TrainingEvent>();
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace(solver);
simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &result_tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: Some(event_tx),
},
&[],
&comm,
)
.unwrap();
let events: Vec<TrainingEvent> = event_rx.iter().collect();
assert!(
events.len() > n_scenarios as usize,
"expected at least {} events, got {}",
n_scenarios + 1,
events.len()
);
let last = events.last().unwrap();
assert!(
matches!(last, TrainingEvent::SimulationFinished { .. }),
"last event must be SimulationFinished, got {last:?}"
);
let TrainingEvent::SimulationFinished { scenarios, .. } = last else {
panic!("last event is not SimulationFinished");
};
assert_eq!(
*scenarios, n_scenarios,
"SimulationFinished.scenarios must equal n_scenarios={n_scenarios}, got {scenarios}"
);
let progress_count = events
.iter()
.filter(|e| matches!(e, TrainingEvent::SimulationProgress { .. }))
.count();
assert_eq!(
progress_count, n_scenarios as usize,
"expected {n_scenarios} SimulationProgress events before SimulationFinished"
);
}
#[test]
fn simulate_progress_scenario_cost_is_finite() {
use cobre_core::TrainingEvent;
let n_stages = 1;
let templates: Vec<StageTemplate> = (0..n_stages).map(|_| minimal_template_1_0()).collect();
let base_rows: Vec<usize> = vec![0; n_stages];
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let stochastic = make_stochastic_context(n_stages);
let config = SimulationConfig {
n_scenarios: 5,
io_channel_capacity: 16,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(100.0, 30.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (result_tx, _result_rx) = mpsc::sync_channel(16);
let (event_tx, event_rx) = mpsc::channel::<TrainingEvent>();
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace(solver);
simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &result_tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: Some(event_tx),
},
&[],
&comm,
)
.unwrap();
let events: Vec<TrainingEvent> = event_rx.iter().collect();
let progress_events: Vec<_> = events
.iter()
.filter(|e| matches!(e, TrainingEvent::SimulationProgress { .. }))
.collect();
for event in &progress_events {
let TrainingEvent::SimulationProgress { scenario_cost, .. } = event else {
continue;
};
assert!(
scenario_cost.is_finite() && !scenario_cost.is_nan(),
"scenario_cost must be finite and non-NaN, got {scenario_cost}"
);
}
}
fn make_stochastic_context_1_hydro_1_load_bus_sim(
mean_mw: f64,
std_mw: f64,
) -> StochasticContext {
use std::collections::BTreeMap;
use chrono::NaiveDate;
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::scenario::{CorrelationModel, InflowModel, LoadModel};
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, Stage, StageRiskConfig,
StageStateConfig,
};
use cobre_core::{Bus, DeficitSegment, EntityId, SystemBuilder};
use cobre_stochastic::context::build_stochastic_context;
let bus0 = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let bus1 = Bus {
id: EntityId(1),
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(10),
name: "H10".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity {
productivity_mw_per_m3s: 1.0,
},
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
},
};
let stage = Stage {
index: 0,
id: 0,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 3,
noise_method: NoiseMethod::Saa,
},
};
let inflow_model = InflowModel {
hydro_id: EntityId(10),
stage_id: 0,
mean_m3s: 100.0,
std_m3s: 20.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
};
let load_model = LoadModel {
bus_id: EntityId(1),
stage_id: 0,
mean_mw,
std_mw,
};
let correlation = CorrelationModel {
method: "cholesky".to_string(),
profiles: BTreeMap::new(),
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus0, bus1])
.hydros(vec![hydro])
.stages(vec![stage])
.inflow_models(vec![inflow_model])
.load_models(vec![load_model])
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(&system, 42, &[], &[], None).unwrap()
}
#[test]
fn simulation_load_patches_applied() {
let n_stages = 1;
let template = StageTemplate {
num_cols: 3,
num_rows: 3,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![0.0, 100.0, 300.0], row_upper: vec![0.0, 100.0, 300.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 3,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
};
let templates = vec![template];
let base_rows = vec![1usize];
let n_load_buses = 1usize;
let stochastic = make_stochastic_context_1_hydro_1_load_bus_sim(300.0, 30.0);
let indexer = StageIndexer::new(1, 0); let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let config = SimulationConfig {
n_scenarios: 1,
io_channel_capacity: 4,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(100.0, 30.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (tx, _rx) = mpsc::sync_channel(4);
let mut workspaces = vec![SolverWorkspace {
solver,
patch_buf: PatchBuffer::new(1, 0, n_load_buses, 1),
current_state: Vec::with_capacity(1),
scratch: ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::with_capacity(n_load_buses),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
},
}];
let load_balance_row_starts = vec![2usize];
let load_bus_indices = vec![0usize];
let block_counts_per_stage = vec![1usize];
let noise_scale = vec![1.0_f64];
let hprod = hydro_productivities_1hydro(n_stages);
simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses,
load_balance_row_starts: &load_balance_row_starts,
load_bus_indices: &load_bus_indices,
block_counts_per_stage: &block_counts_per_stage,
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
)
.unwrap();
assert_eq!(
workspaces[0].scratch.load_rhs_buf.len(),
n_load_buses,
"load_rhs_buf must have 1 entry (1 load bus x 1 block)"
);
assert!(
workspaces[0].scratch.load_rhs_buf[0] > 0.0,
"realization must be positive with mean=300, std=30: got {}",
workspaces[0].scratch.load_rhs_buf[0]
);
let d_observed = workspaces[0].scratch.load_rhs_buf[0];
let mean_mw_val = 300.0_f64;
let std_mw_val = 30.0_f64;
assert!(
d_observed != mean_mw_val,
"realization must differ from template mean (noise was applied)"
);
let eta_back = (d_observed - mean_mw_val) / std_mw_val;
let recomputed = (mean_mw_val + std_mw_val * eta_back).max(0.0);
assert!(
(d_observed - recomputed).abs() < 1e-10,
"formula consistency: d={d_observed}, eta_back={eta_back}, recomputed={recomputed}"
);
let cat4_start = 2; assert_eq!(
workspaces[0].patch_buf.lower[cat4_start], workspaces[0].scratch.load_rhs_buf[0],
"patch_buf lower at load slot must equal load_rhs_buf[0]"
);
assert_eq!(
workspaces[0].patch_buf.upper[cat4_start], workspaces[0].scratch.load_rhs_buf[0],
"patch_buf upper at load slot must equal load_rhs_buf[0] (equality constraint)"
);
assert!(
!workspaces[0].scratch.row_lower_buf.is_empty(),
"row_lower_buf must be populated for extraction"
);
assert_eq!(
workspaces[0].scratch.row_lower_buf[2], d_observed,
"extraction row_lower_buf must contain stochastic load, not template mean"
);
assert!(
(workspaces[0].scratch.row_lower_buf[2] - mean_mw_val).abs() > 1e-6,
"extracted load_mw must differ from template mean {mean_mw_val}: got {}",
workspaces[0].scratch.row_lower_buf[2]
);
}
#[test]
fn simulation_no_load_buses_unchanged() {
let n_stages = 1;
let templates = vec![minimal_template_1_0()];
let base_rows = vec![0usize];
let stochastic = make_stochastic_context(n_stages);
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let config = SimulationConfig {
n_scenarios: 1,
io_channel_capacity: 4,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(100.0, 30.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (tx, _rx) = mpsc::sync_channel(4);
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace(solver);
simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
)
.unwrap();
assert!(
workspaces[0].scratch.load_rhs_buf.is_empty(),
"load_rhs_buf must be empty when n_load_buses=0"
);
assert_eq!(
workspaces[0].patch_buf.forward_patch_count(),
2,
"forward_patch_count must be N*(2+L)=2 when n_load_buses=0, got {}",
workspaces[0].patch_buf.forward_patch_count()
);
}
#[test]
fn simulation_inflow_extraction_unaffected() {
let n_stages = 1;
let template = StageTemplate {
num_cols: 3,
num_rows: 3,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![0.0, 100.0, 300.0],
row_upper: vec![0.0, 100.0, 300.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 3,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
};
let templates = vec![template];
let base_rows = vec![1usize];
let n_load_buses = 1usize;
let stochastic = make_stochastic_context_1_hydro_1_load_bus_sim(300.0, 30.0);
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, 0);
let config = SimulationConfig {
n_scenarios: 1,
io_channel_capacity: 4,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![50.0_f64];
let solution = fixed_solution(100.0, 30.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (tx, _rx) = mpsc::sync_channel(4);
let mut workspaces = vec![SolverWorkspace {
solver,
patch_buf: PatchBuffer::new(1, 0, n_load_buses, 1),
current_state: Vec::with_capacity(1),
scratch: ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::with_capacity(n_load_buses),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
},
}];
let load_balance_row_starts = vec![2usize];
let load_bus_indices = vec![0usize];
let block_counts_per_stage = vec![1usize];
let noise_scale = vec![1.0_f64];
let hprod = hydro_productivities_1hydro(n_stages);
simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses,
load_balance_row_starts: &load_balance_row_starts,
load_bus_indices: &load_bus_indices,
block_counts_per_stage: &block_counts_per_stage,
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
)
.unwrap();
assert_eq!(
workspaces[0].scratch.noise_buf.len(),
1,
"noise_buf must have 1 entry (1 hydro), not contaminated by load noise: len={}",
workspaces[0].scratch.noise_buf.len()
);
assert!(
workspaces[0].scratch.noise_buf[0] > 50.0 && workspaces[0].scratch.noise_buf[0] < 200.0,
"noise_buf[0] must be a reasonable inflow value near 100.0, got {}",
workspaces[0].scratch.noise_buf[0]
);
assert_eq!(
workspaces[0].scratch.load_rhs_buf.len(),
n_load_buses,
"load_rhs_buf must have 1 entry alongside noise_buf"
);
}
fn make_stochastic_1h_1s(mean_m3s: f64, std_m3s: f64) -> StochasticContext {
use std::collections::BTreeMap;
use chrono::NaiveDate;
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::scenario::{
CorrelationEntity, CorrelationGroup, CorrelationModel, CorrelationProfile, InflowModel,
};
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, Stage, StageRiskConfig,
StageStateConfig,
};
use cobre_core::{Bus, DeficitSegment, EntityId, SystemBuilder};
use cobre_stochastic::context::build_stochastic_context;
let bus = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(1),
name: "H1".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity {
productivity_mw_per_m3s: 1.0,
},
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
},
};
let stage = Stage {
index: 0,
id: 0,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 3,
noise_method: NoiseMethod::Saa,
},
};
let inflow_model = InflowModel {
hydro_id: EntityId(1),
stage_id: 0,
mean_m3s,
std_m3s,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
};
let mut profiles = BTreeMap::new();
profiles.insert(
"default".to_string(),
CorrelationProfile {
groups: vec![CorrelationGroup {
name: "g1".to_string(),
entities: vec![CorrelationEntity {
entity_type: "inflow".to_string(),
id: EntityId(1),
}],
matrix: vec![vec![1.0]],
}],
},
);
let correlation = CorrelationModel {
method: "cholesky".to_string(),
profiles,
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus])
.hydros(vec![hydro])
.stages(vec![stage])
.inflow_models(vec![inflow_model])
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(&system, 42, &[], &[], None).unwrap()
}
fn minimal_template_1_0_with_base(base_rhs: f64) -> StageTemplate {
StageTemplate {
num_cols: 3,
num_rows: 1,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![base_rhs],
row_upper: vec![base_rhs],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
}
}
fn single_workspace_with_hydros(
solver: MockSolver,
hydro_count: usize,
) -> Vec<SolverWorkspace<MockSolver>> {
vec![SolverWorkspace {
solver,
patch_buf: PatchBuffer::new(hydro_count, 0, 0, 0),
current_state: Vec::with_capacity(hydro_count),
scratch: ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: vec![0.0_f64; hydro_count],
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
},
}]
}
#[test]
fn simulation_truncation_clamps_negative_inflow_noise() {
let mean_m3s = -1000.0_f64;
let sigma = 1.0_f64;
let zeta = 1.0_f64; let base_rhs = zeta * mean_m3s;
let noise_scale_val = zeta * sigma;
let n_stages = 1;
let stochastic = make_stochastic_1h_1s(mean_m3s, sigma);
let template = minimal_template_1_0_with_base(base_rhs);
let templates = vec![template];
let base_rows = vec![0_usize];
let noise_scale = vec![noise_scale_val];
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, indexer.n_state, 1, 10, 0);
let config = SimulationConfig {
n_scenarios: 4,
io_channel_capacity: 16,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![0.0_f64];
let solution = fixed_solution(0.0, 0.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (tx, _rx) = mpsc::sync_channel(16);
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace_with_hydros(solver, 1);
simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[n_stages],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::Truncation,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
)
.unwrap();
assert_eq!(
workspaces[0].scratch.noise_buf.len(),
1,
"noise_buf must have exactly 1 entry for 1 hydro"
);
assert!(
workspaces[0].scratch.noise_buf[0] >= 0.0,
"after truncation, noise_buf[0] must be >= 0 (inflow cannot be negative), got {}",
workspaces[0].scratch.noise_buf[0]
);
}
#[test]
fn simulation_none_method_produces_raw_negative_noise() {
let mean_m3s = -1000.0_f64;
let sigma = 1.0_f64;
let zeta = 1.0_f64;
let base_rhs = zeta * mean_m3s;
let noise_scale_val = zeta * sigma;
let n_stages = 1;
let stochastic = make_stochastic_1h_1s(mean_m3s, sigma);
let template = minimal_template_1_0_with_base(base_rhs);
let templates = vec![template];
let base_rows = vec![0_usize];
let noise_scale = vec![noise_scale_val];
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(n_stages, indexer.n_state, 1, 10, 0);
let config = SimulationConfig {
n_scenarios: 4,
io_channel_capacity: 16,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let initial_state = vec![0.0_f64];
let solution = fixed_solution(0.0, 0.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm { rank: 0, size: 1 };
let entity_counts = entity_counts_1_hydro();
let (tx, _rx) = mpsc::sync_channel(16);
let hprod = hydro_productivities_1hydro(n_stages);
let mut workspaces = single_workspace_with_hydros(solver, 1);
simulate(
&mut workspaces,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[n_stages],
ncs_max_gen: &[],
},
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&config,
SimulationOutputSpec {
result_tx: &tx,
zeta_per_stage: &[],
block_hours_per_stage: &[],
entity_counts: &entity_counts,
generic_constraint_row_entries: &[],
ncs_col_starts: &[],
n_ncs_per_stage: &[],
ncs_entity_ids_per_stage: &[],
diversion_upstream: &HashMap::new(),
hydro_productivities_per_stage: &hprod,
event_sender: None,
},
&[],
&comm,
)
.unwrap();
assert_eq!(
workspaces[0].scratch.noise_buf.len(),
1,
"noise_buf must have exactly 1 entry for 1 hydro"
);
assert!(
workspaces[0].scratch.noise_buf[0] < 0.0,
"with None method, noise_buf[0] must be negative (raw eta applied), got {}",
workspaces[0].scratch.noise_buf[0]
);
}
}