use std::time::Instant;
use cobre_comm::Communicator;
use cobre_solver::{Basis, RowBatch, SolverError, SolverInterface};
use cobre_stochastic::sample_forward;
use rayon::iter::{
IndexedParallelIterator, IntoParallelIterator, IntoParallelRefMutIterator, ParallelIterator,
};
use crate::{
FutureCostFunction, SddpError, StageIndexer, TrajectoryRecord,
context::{StageContext, TrainingContext},
lp_builder::COST_SCALE_FACTOR,
noise::{transform_inflow_noise, transform_load_noise, transform_ncs_noise},
workspace::{BasisStore, BasisStoreSliceMut, SolverWorkspace},
};
#[derive(Debug, Clone)]
#[must_use]
pub struct ForwardResult {
pub scenario_costs: Vec<f64>,
pub elapsed_ms: u64,
pub lp_solves: u64,
}
#[derive(Debug, Clone)]
#[must_use]
pub struct SyncResult {
pub global_ub_mean: f64,
pub global_ub_std: f64,
pub ci_95_half_width: f64,
pub sync_time_ms: u64,
}
pub fn sync_forward<C: Communicator>(
local: &ForwardResult,
comm: &C,
total_forward_passes: usize,
) -> Result<SyncResult, SddpError> {
let start = Instant::now();
let num_ranks = comm.size();
let my_rank = comm.rank();
let base = total_forward_passes / num_ranks;
let remainder = total_forward_passes % num_ranks;
let counts: Vec<usize> = (0..num_ranks)
.map(|r| base + usize::from(r < remainder))
.collect();
let mut displs = vec![0usize; num_ranks];
for r in 1..num_ranks {
displs[r] = displs[r - 1] + counts[r - 1];
}
let global_n = counts.iter().sum::<usize>();
debug_assert_eq!(
global_n, total_forward_passes,
"counts sum {global_n} != total_forward_passes {total_forward_passes}",
);
let mut global_costs = vec![0.0_f64; global_n];
debug_assert_eq!(
local.scenario_costs.len(),
counts[my_rank],
"rank {my_rank}: scenario_costs length {} != expected count {}",
local.scenario_costs.len(),
counts[my_rank],
);
comm.allgatherv(&local.scenario_costs, &mut global_costs, &counts, &displs)?;
#[allow(clippy::cast_precision_loss)]
let global_n_f64 = global_n as f64;
let mut cost_sum = 0.0_f64;
let mut cost_sum_sq = 0.0_f64;
for &c in &global_costs {
cost_sum += c;
cost_sum_sq += c * c;
}
let mean = cost_sum / global_n_f64;
let (std_dev, ci_95) = if global_n > 1 {
let variance = (cost_sum_sq - global_n_f64 * mean * mean) / (global_n_f64 - 1.0);
let sd = variance.max(0.0).sqrt();
let ci = 1.96_f64 * sd / global_n_f64.sqrt();
(sd, ci)
} else {
(0.0_f64, 0.0_f64)
};
#[allow(clippy::cast_possible_truncation)]
let sync_time_ms = start.elapsed().as_millis() as u64;
Ok(SyncResult {
global_ub_mean: mean,
global_ub_std: std_dev,
ci_95_half_width: ci_95,
sync_time_ms,
})
}
#[inline]
fn push_scaled_coefficient(batch: &mut RowBatch, j: usize, coeff: f64, col_scale: &[f64]) {
debug_assert!(
i32::try_from(j).is_ok(),
"column index j={j} exceeds i32::MAX"
);
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
batch.col_indices.push(j as i32);
let d = if col_scale.is_empty() {
1.0
} else {
col_scale[j]
};
batch.values.push(-coeff * d);
}
pub fn build_cut_row_batch_into(
batch: &mut RowBatch,
fcf: &FutureCostFunction,
stage: usize,
indexer: &StageIndexer,
col_scale: &[f64],
) {
batch.clear();
let n_state = indexer.n_state;
let theta_col = indexer.theta;
let mask = &indexer.nonzero_state_indices;
let is_sparse = !mask.is_empty();
let num_cuts: usize = fcf.active_cuts(stage).count();
if num_cuts == 0 {
batch.row_starts.push(0_i32);
return;
}
let nnz_per_cut = if is_sparse {
mask.len() + 1
} else {
n_state + 1
};
let total_nnz = num_cuts * nnz_per_cut;
let mut nz_offset = 0;
for (_slot, intercept, coefficients) in fcf.active_cuts(stage) {
debug_assert_eq!(
coefficients.len(),
n_state,
"cut coefficients length {got} != n_state {expected}",
got = coefficients.len(),
expected = n_state,
);
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
batch.row_starts.push(nz_offset as i32);
if is_sparse {
for &j in mask {
push_scaled_coefficient(batch, j, coefficients[j], col_scale);
}
} else {
for (j, &c) in coefficients.iter().enumerate() {
push_scaled_coefficient(batch, j, c, col_scale);
}
}
debug_assert!(
i32::try_from(theta_col).is_ok(),
"theta_col={theta_col} exceeds i32::MAX"
);
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
batch.col_indices.push(theta_col as i32);
let d_theta = if col_scale.is_empty() {
1.0
} else {
col_scale[theta_col]
};
batch.values.push(d_theta);
batch.row_lower.push(intercept);
batch.row_upper.push(f64::INFINITY);
nz_offset += nnz_per_cut;
}
#[allow(clippy::expect_used)]
batch.row_starts.push(
i32::try_from(total_nnz).expect("total_nnz exceeds i32::MAX; LP exceeds HiGHS API limit"),
);
batch.num_rows = num_cuts;
}
#[must_use]
pub fn build_cut_row_batch(
fcf: &FutureCostFunction,
stage: usize,
indexer: &StageIndexer,
col_scale: &[f64],
) -> RowBatch {
let mut batch = RowBatch {
num_rows: 0,
row_starts: Vec::new(),
col_indices: Vec::new(),
values: Vec::new(),
row_lower: Vec::new(),
row_upper: Vec::new(),
};
build_cut_row_batch_into(&mut batch, fcf, stage, indexer, col_scale);
batch
}
pub fn append_new_cuts_to_lp<S: SolverInterface>(
solver: &mut S,
fcf: &FutureCostFunction,
stage: usize,
indexer: &StageIndexer,
col_scale: &[f64],
row_map: &mut crate::cut::CutRowMap,
batch_buf: &mut RowBatch,
) -> usize {
batch_buf.clear();
let n_state = indexer.n_state;
let theta_col = indexer.theta;
let nnz_per_cut = n_state + 1;
let mut new_count = 0usize;
let mut nz_offset = 0usize;
for (slot, intercept, coefficients) in fcf.active_cuts(stage) {
if row_map.lp_row_for_slot(slot).is_some() {
continue;
}
debug_assert_eq!(
coefficients.len(),
n_state,
"cut coefficients length {got} != n_state {expected}",
got = coefficients.len(),
expected = n_state,
);
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
batch_buf.row_starts.push(nz_offset as i32);
for (j, &c) in coefficients.iter().enumerate() {
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
batch_buf.col_indices.push(j as i32);
let d = if col_scale.is_empty() {
1.0
} else {
col_scale[j]
};
batch_buf.values.push(-c * d);
}
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
batch_buf.col_indices.push(theta_col as i32);
let d_theta = if col_scale.is_empty() {
1.0
} else {
col_scale[theta_col]
};
batch_buf.values.push(d_theta);
batch_buf.row_lower.push(intercept);
batch_buf.row_upper.push(f64::INFINITY);
row_map.insert(slot);
new_count += 1;
nz_offset += nnz_per_cut;
}
if new_count > 0 {
let total_nnz = new_count * nnz_per_cut;
#[allow(clippy::expect_used)]
batch_buf.row_starts.push(
i32::try_from(total_nnz)
.expect("total_nnz exceeds i32::MAX; LP exceeds HiGHS API limit"),
);
batch_buf.num_rows = new_count;
solver.add_rows(batch_buf);
}
new_count
}
pub fn deactivate_cuts_in_lp<S: SolverInterface>(
solver: &mut S,
deactivation_set: &crate::cut_selection::DeactivationSet,
row_map: &mut crate::cut::CutRowMap,
) -> usize {
if deactivation_set.indices.is_empty() {
return 0;
}
let mut indices: Vec<usize> = Vec::with_capacity(deactivation_set.indices.len());
let mut lower: Vec<f64> = Vec::with_capacity(deactivation_set.indices.len());
let mut upper: Vec<f64> = Vec::with_capacity(deactivation_set.indices.len());
for &slot_u32 in &deactivation_set.indices {
let slot = slot_u32 as usize;
if let Some(lp_row) = row_map.lp_row_for_slot(slot) {
if row_map.is_slot_active(slot) {
indices.push(lp_row);
lower.push(f64::NEG_INFINITY);
upper.push(f64::INFINITY);
row_map.deactivate(slot);
}
}
}
if !indices.is_empty() {
solver.set_row_bounds(&indices, &lower, &upper);
}
indices.len()
}
pub struct ForwardPassBatch {
pub local_forward_passes: usize,
pub iteration: u64,
pub fwd_offset: usize,
}
#[inline]
pub(crate) fn partition(n_scenarios: usize, n_workers: usize, worker_id: usize) -> (usize, usize) {
if n_workers == 0 {
return (0, 0);
}
let base = n_scenarios / n_workers;
let remainder = n_scenarios % n_workers;
let start = base * worker_id + worker_id.min(remainder);
let end = start + base + usize::from(worker_id < remainder);
(start, end)
}
struct StageKey<'a> {
t: usize,
m: usize,
local_m: usize,
num_stages: usize,
iteration: u64,
raw_noise: &'a [f64],
}
#[allow(clippy::too_many_lines)]
fn run_forward_stage<S: SolverInterface + Send>(
ws: &mut SolverWorkspace<S>,
basis_slice: &mut BasisStoreSliceMut<'_>,
ctx: &StageContext<'_>,
training_ctx: &TrainingContext<'_>,
key: &StageKey<'_>,
worker_records: &mut [TrajectoryRecord],
) -> Result<f64, SddpError> {
let StageKey {
t,
m,
local_m,
num_stages,
iteration,
raw_noise,
} = *key;
let n_hydros = ctx.n_hydros;
let n_load_buses = ctx.n_load_buses;
let indexer = training_ctx.indexer;
let stochastic = training_ctx.stochastic;
let horizon = training_ctx.horizon;
let (state_ref, scratch) = (&ws.current_state[..], &mut ws.scratch);
transform_inflow_noise(raw_noise, t, state_ref, ctx, training_ctx, scratch);
let blk = if n_load_buses > 0 {
ctx.block_counts_per_stage[t]
} else {
0
};
transform_load_noise(
raw_noise,
n_hydros,
n_load_buses,
stochastic,
t,
blk,
&mut ws.scratch.load_rhs_buf,
);
let n_stochastic_ncs = stochastic.n_stochastic_ncs();
if n_stochastic_ncs > 0 {
transform_ncs_noise(
raw_noise,
n_hydros,
n_load_buses,
stochastic,
t,
ctx.block_counts_per_stage[t],
ctx.ncs_max_gen,
&mut ws.scratch.ncs_col_upper_buf,
);
}
ws.patch_buf.fill_forward_patches(
indexer,
&ws.current_state,
&ws.scratch.noise_buf,
ctx.base_rows[t],
&ctx.templates[t].row_scale,
);
if n_load_buses > 0 {
ws.patch_buf.fill_load_patches(
ctx.load_balance_row_starts[t],
ctx.block_counts_per_stage[t],
&ws.scratch.load_rhs_buf,
ctx.load_bus_indices,
&ctx.templates[t].row_scale,
);
}
ws.patch_buf.fill_z_inflow_patches(
indexer.z_inflow_row_start,
&ws.scratch.z_inflow_rhs_buf,
&ctx.templates[t].row_scale,
);
let pc = ws.patch_buf.forward_patch_count();
ws.solver.set_row_bounds(
&ws.patch_buf.indices[..pc],
&ws.patch_buf.lower[..pc],
&ws.patch_buf.upper[..pc],
);
if n_stochastic_ncs > 0 && !indexer.ncs_generation.is_empty() {
let n_blks = ctx.block_counts_per_stage[t];
let expected_len = n_stochastic_ncs * n_blks;
if ws.scratch.ncs_col_indices_buf.len() != expected_len {
ws.scratch.ncs_col_indices_buf.clear();
ws.scratch.ncs_col_lower_buf.clear();
for ncs_idx in 0..n_stochastic_ncs {
for blk in 0..n_blks {
ws.scratch
.ncs_col_indices_buf
.push(indexer.ncs_generation.start + ncs_idx * n_blks + blk);
ws.scratch.ncs_col_lower_buf.push(0.0);
}
}
}
ws.solver.set_col_bounds(
&ws.scratch.ncs_col_indices_buf,
&ws.scratch.ncs_col_lower_buf,
&ws.scratch.ncs_col_upper_buf,
);
}
if horizon.is_terminal(t + 1) {
ws.solver.set_col_bounds(&[indexer.theta], &[0.0], &[0.0]);
}
let view = match basis_slice.get(m, t) {
Some(rb) => ws.solver.solve_with_basis(rb),
None => ws.solver.solve(),
}
.map_err(|e| {
*basis_slice.get_mut(m, t) = None;
match e {
SolverError::Infeasible => SddpError::Infeasible {
stage: t,
iteration,
scenario: m,
},
other => SddpError::Solver(other),
}
})?;
let col_scale = &ctx.templates[t].col_scale;
let unscaled_primal = &mut ws.scratch.unscaled_primal;
if col_scale.is_empty() {
unscaled_primal.clear();
unscaled_primal.extend_from_slice(view.primal);
} else {
unscaled_primal.resize(view.primal.len(), 0.0);
for (j, (xp, &d)) in view.primal.iter().zip(col_scale).enumerate() {
unscaled_primal[j] = d * xp;
}
}
let d_t = ctx.discount_factors.get(t).copied().unwrap_or(1.0);
let stage_cost = (view.objective - d_t * unscaled_primal[indexer.theta]) * COST_SCALE_FACTOR;
let rec = &mut worker_records[local_m * num_stages + t];
rec.primal.clear();
rec.primal.extend_from_slice(unscaled_primal);
rec.dual.clear();
rec.stage_cost = stage_cost;
let lag_start = indexer.inflow_lags.start;
let lag_len = indexer.hydro_count * indexer.max_par_order;
ws.scratch.lag_matrix_buf.clear();
ws.scratch
.lag_matrix_buf
.extend_from_slice(&ws.current_state[lag_start..lag_start + lag_len]);
ws.current_state.clear();
ws.current_state
.extend_from_slice(&unscaled_primal[..indexer.n_state]);
crate::noise::shift_lag_state(
&mut ws.current_state,
&ws.scratch.lag_matrix_buf,
unscaled_primal,
indexer,
);
rec.state.clear();
rec.state.extend_from_slice(&ws.current_state);
if let Some(rb) = basis_slice.get_mut(m, t) {
ws.solver.get_basis(rb);
} else {
let mut rb = Basis::new(ctx.templates[t].num_cols, ctx.templates[t].num_rows);
ws.solver.get_basis(&mut rb);
*basis_slice.get_mut(m, t) = Some(rb);
}
Ok(stage_cost)
}
#[allow(clippy::too_many_arguments)]
pub fn run_forward_pass<S: SolverInterface + Send>(
workspaces: &mut [SolverWorkspace<S>],
basis_store: &mut BasisStore,
ctx: &StageContext<'_>,
fcf: &FutureCostFunction,
cut_batches: &mut [RowBatch],
training_ctx: &TrainingContext<'_>,
batch: &ForwardPassBatch,
records: &mut [TrajectoryRecord],
) -> Result<ForwardResult, SddpError> {
let TrainingContext {
horizon,
indexer,
stochastic,
initial_state,
..
} = training_ctx;
let ForwardPassBatch {
local_forward_passes,
iteration,
fwd_offset,
} = batch;
let (num_stages, forward_passes) = (horizon.num_stages(), *local_forward_passes);
debug_assert_eq!(records.len(), forward_passes * num_stages);
debug_assert_eq!(initial_state.len(), indexer.n_state);
let start = Instant::now();
for (t, batch) in cut_batches.iter_mut().enumerate().take(num_stages) {
build_cut_row_batch_into(batch, fcf, t, indexer, &ctx.templates[t].col_scale);
}
let tree_view = stochastic.tree_view();
let base_seed = stochastic.base_seed();
let n_workers = workspaces.len().max(1);
let mut remaining: &mut [TrajectoryRecord] = records;
let mut record_slices: Vec<&mut [TrajectoryRecord]> = Vec::with_capacity(n_workers);
for w in 0..n_workers {
let (start_m, end_m) = partition(forward_passes, n_workers, w);
let (slice, rest) = remaining.split_at_mut((end_m - start_m) * num_stages);
record_slices.push(slice);
remaining = rest;
}
let basis_slices: Vec<BasisStoreSliceMut<'_>> = basis_store.split_workers_mut(n_workers);
let worker_results: Vec<Result<(Vec<f64>, u64), SddpError>> = workspaces
.par_iter_mut()
.zip(record_slices.par_iter_mut())
.zip(basis_slices.into_par_iter())
.enumerate()
.map(|(w, ((ws, worker_records), mut basis_slice))| {
let (start_m, end_m) = partition(forward_passes, n_workers, w);
let n_local = end_m - start_m;
let mut trajectory_costs = vec![0.0_f64; n_local];
let local_solve_count_before = ws.solver.statistics().solve_count;
for t in 0..num_stages {
ws.solver.load_model(&ctx.templates[t]);
if cut_batches[t].num_rows > 0 {
ws.solver.add_rows(&cut_batches[t]);
}
let cum_d = ctx
.cumulative_discount_factors
.get(t)
.copied()
.unwrap_or(1.0);
for (local_m, m) in (start_m..end_m).enumerate() {
ws.current_state.clear();
let src: &[f64] = if t == 0 {
initial_state
} else {
&worker_records[local_m * num_stages + (t - 1)].state
};
ws.current_state.extend_from_slice(src);
let global_scenario = fwd_offset + m;
#[allow(clippy::cast_possible_truncation)]
let (i32, s32, t32) = (*iteration as u32, global_scenario as u32, t as u32);
let (_, raw_noise) = sample_forward(&tree_view, base_seed, i32, s32, t32, t);
let key = StageKey {
t,
m,
local_m,
num_stages,
iteration: *iteration,
raw_noise,
};
trajectory_costs[local_m] += cum_d
* run_forward_stage(
ws,
&mut basis_slice,
ctx,
training_ctx,
&key,
worker_records,
)?;
}
}
let local_solves = ws.solver.statistics().solve_count - local_solve_count_before;
Ok((trajectory_costs, local_solves))
})
.collect();
let mut scenario_costs = Vec::with_capacity(forward_passes);
let mut lp_solves = 0u64;
for result in worker_results {
let (worker_costs, w_solves) = result?;
scenario_costs.extend(worker_costs);
lp_solves += w_solves;
}
#[allow(clippy::cast_possible_truncation)]
let elapsed_ms = start.elapsed().as_millis() as u64;
Ok(ForwardResult {
scenario_costs,
elapsed_ms,
lp_solves,
})
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use chrono::NaiveDate;
use cobre_comm::{CommData, Communicator, ReduceOp};
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::scenario::{
CorrelationEntity, CorrelationGroup, CorrelationModel, CorrelationProfile, InflowModel,
LoadModel,
};
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, Stage, StageRiskConfig,
StageStateConfig,
};
use cobre_core::{Bus, DeficitSegment, EntityId, SystemBuilder};
use cobre_solver::{
Basis, LpSolution, RowBatch, SolverError, SolverInterface, SolverStatistics, StageTemplate,
};
use cobre_stochastic::StochasticContext;
use cobre_stochastic::context::build_stochastic_context;
use cobre_comm::LocalBackend;
use super::{
ForwardPassBatch, ForwardResult, SyncResult, build_cut_row_batch, partition,
run_forward_pass, sync_forward,
};
use crate::{
FutureCostFunction, HorizonMode, InflowNonNegativityMethod, StageIndexer, TrainingConfig,
TrajectoryRecord,
context::{StageContext, TrainingContext},
workspace::{BasisStore, SolverWorkspace},
};
fn empty_cut_batches(n_stages: usize) -> Vec<RowBatch> {
(0..n_stages)
.map(|_| RowBatch {
num_rows: 0,
row_starts: Vec::new(),
col_indices: Vec::new(),
values: Vec::new(),
row_lower: Vec::new(),
row_upper: Vec::new(),
})
.collect()
}
struct MockSolver {
solution: LpSolution,
infeasible_at: Option<usize>,
call_count: usize,
warm_start_calls: usize,
buf_primal: Vec<f64>,
buf_dual: Vec<f64>,
buf_reduced_costs: Vec<f64>,
}
impl MockSolver {
fn always_ok(solution: LpSolution) -> Self {
let buf_primal = solution.primal.clone();
let buf_dual = solution.dual.clone();
let buf_reduced_costs = solution.reduced_costs.clone();
Self {
solution,
infeasible_at: None,
call_count: 0,
warm_start_calls: 0,
buf_primal,
buf_dual,
buf_reduced_costs,
}
}
fn infeasible_on(solution: LpSolution, n: usize) -> Self {
let buf_primal = solution.primal.clone();
let buf_dual = solution.dual.clone();
let buf_reduced_costs = solution.reduced_costs.clone();
Self {
solution,
infeasible_at: Some(n),
call_count: 0,
warm_start_calls: 0,
buf_primal,
buf_dual,
buf_reduced_costs,
}
}
fn do_solve(&mut self) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
let call = self.call_count;
self.call_count += 1;
if self.infeasible_at == Some(call) {
return Err(SolverError::Infeasible);
}
self.buf_primal.clone_from(&self.solution.primal);
self.buf_dual.clone_from(&self.solution.dual);
self.buf_reduced_costs
.clone_from(&self.solution.reduced_costs);
Ok(cobre_solver::SolutionView {
objective: self.solution.objective,
primal: &self.buf_primal,
dual: &self.buf_dual,
reduced_costs: &self.buf_reduced_costs,
iterations: self.solution.iterations,
solve_time_seconds: self.solution.solve_time_seconds,
})
}
}
impl SolverInterface for MockSolver {
fn load_model(&mut self, _template: &StageTemplate) {}
fn add_rows(&mut self, _cuts: &RowBatch) {}
fn set_row_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn set_col_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn solve(&mut self) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
self.do_solve()
}
fn reset(&mut self) {}
fn get_basis(&mut self, _out: &mut Basis) {}
fn solve_with_basis(
&mut self,
_basis: &Basis,
) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
self.warm_start_calls += 1;
self.do_solve()
}
fn statistics(&self) -> SolverStatistics {
SolverStatistics {
solve_count: self.call_count as u64,
..SolverStatistics::default()
}
}
fn name(&self) -> &'static str {
"Mock"
}
}
fn minimal_template_1_0() -> StageTemplate {
StageTemplate {
num_cols: 4,
num_rows: 1,
num_nz: 1,
col_starts: vec![0_i32, 0, 0, 1, 1], row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, f64::NEG_INFINITY, 0.0, 0.0],
col_upper: vec![f64::INFINITY; 4],
objective: vec![0.0, 0.0, 0.0, 1.0], row_lower: vec![0.0],
row_upper: vec![0.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
}
}
fn fixed_solution(
num_cols: usize,
objective: f64,
theta_col: usize,
theta_val: f64,
) -> LpSolution {
let mut primal = vec![0.0_f64; num_cols];
primal[theta_col] = theta_val;
let num_rows = 1; LpSolution {
objective,
primal,
dual: vec![0.0; num_rows],
reduced_costs: vec![0.0; num_cols],
iterations: 0,
solve_time_seconds: 0.0,
}
}
fn empty_records(n: usize) -> Vec<TrajectoryRecord> {
(0..n)
.map(|_| TrajectoryRecord {
primal: Vec::new(),
dual: Vec::new(),
stage_cost: 0.0,
state: Vec::new(),
})
.collect()
}
#[allow(clippy::too_many_lines)]
fn make_stochastic_context_1_hydro_3_stages() -> StochasticContext {
let bus = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(1),
name: "H1".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity {
productivity_mw_per_m3s: 1.0,
},
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
let make_stage = |idx: usize, id: i32| Stage {
index: idx,
id,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 3,
noise_method: NoiseMethod::Saa,
},
};
let stages = vec![make_stage(0, 0), make_stage(1, 1), make_stage(2, 2)];
let inflow = |stage_id: i32| InflowModel {
hydro_id: EntityId(1),
stage_id,
mean_m3s: 100.0,
std_m3s: 30.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
};
let mut profiles = BTreeMap::new();
profiles.insert(
"default".to_string(),
CorrelationProfile {
groups: vec![CorrelationGroup {
name: "g1".to_string(),
entities: vec![CorrelationEntity {
entity_type: "inflow".to_string(),
id: EntityId(1),
}],
matrix: vec![vec![1.0]],
}],
},
);
let correlation = CorrelationModel {
method: "cholesky".to_string(),
profiles,
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus])
.hydros(vec![hydro])
.stages(stages)
.inflow_models(vec![inflow(0), inflow(1), inflow(2)])
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(&system, 42, &[], &[], None).unwrap()
}
#[test]
fn forward_result_field_access() {
let r = ForwardResult {
scenario_costs: vec![60.0, 70.0, 80.0, 90.0],
elapsed_ms: 123,
lp_solves: 0,
};
assert_eq!(r.scenario_costs.len(), 4);
assert_eq!(r.scenario_costs[0], 60.0);
assert_eq!(r.elapsed_ms, 123);
}
#[test]
fn forward_result_clone_and_debug() {
let r = ForwardResult {
scenario_costs: vec![1.0, 2.0],
elapsed_ms: 5,
lp_solves: 0,
};
let c = r.clone();
assert_eq!(c.scenario_costs.len(), r.scenario_costs.len());
assert_eq!(c.scenario_costs[0].to_bits(), r.scenario_costs[0].to_bits());
let s = format!("{r:?}");
assert!(s.contains("ForwardResult"));
}
#[test]
fn build_cut_row_batch_empty_cuts_returns_empty_batch() {
let fcf = FutureCostFunction::new(2, 1, 1, 10, 0);
let indexer = StageIndexer::new(1, 0);
let batch = build_cut_row_batch(&fcf, 0, &indexer, &[]);
assert_eq!(batch.num_rows, 0);
assert_eq!(batch.row_starts, vec![0]);
assert!(batch.col_indices.is_empty());
assert!(batch.values.is_empty());
assert!(batch.row_lower.is_empty());
assert!(batch.row_upper.is_empty());
}
#[test]
fn build_cut_row_batch_one_cut_correct_structure() {
let mut fcf = FutureCostFunction::new(2, 1, 1, 10, 0);
fcf.add_cut(0, 0, 0, 5.0, &[2.0]);
let indexer = StageIndexer::new(1, 0);
let batch = build_cut_row_batch(&fcf, 0, &indexer, &[]);
assert_eq!(batch.num_rows, 1);
assert_eq!(batch.row_starts, vec![0, 2]);
assert_eq!(batch.col_indices, vec![0, 3]); assert_eq!(batch.values, vec![-2.0, 1.0]);
assert_eq!(batch.row_lower, vec![5.0]);
assert!(batch.row_upper[0].is_infinite() && batch.row_upper[0] > 0.0);
}
#[test]
fn build_cut_row_batch_two_cuts_correct_row_starts() {
let mut fcf = FutureCostFunction::new(2, 2, 1, 10, 0);
fcf.add_cut(1, 0, 0, 10.0, &[1.0, 3.0]);
fcf.add_cut(1, 1, 0, 20.0, &[2.0, 4.0]);
let indexer = StageIndexer::new(1, 1);
let batch = build_cut_row_batch(&fcf, 1, &indexer, &[]);
assert_eq!(batch.num_rows, 2);
assert_eq!(batch.row_starts, vec![0, 3, 6]);
assert_eq!(batch.col_indices[0], 0);
assert_eq!(batch.col_indices[1], 1);
assert_eq!(batch.col_indices[2], 4); assert_eq!(batch.values[0], -1.0);
assert_eq!(batch.values[1], -3.0);
assert_eq!(batch.values[2], 1.0);
assert_eq!(batch.col_indices[3], 0);
assert_eq!(batch.col_indices[4], 1);
assert_eq!(batch.col_indices[5], 4); assert_eq!(batch.values[3], -2.0);
assert_eq!(batch.values[4], -4.0);
assert_eq!(batch.values[5], 1.0);
assert_eq!(batch.row_lower, vec![10.0, 20.0]);
assert!(batch.row_upper[0].is_infinite() && batch.row_upper[0] > 0.0);
assert!(batch.row_upper[1].is_infinite() && batch.row_upper[1] > 0.0);
}
#[test]
fn build_cut_row_batch_zero_coefficient_state_variable() {
let mut fcf = FutureCostFunction::new(1, 2, 1, 5, 0);
fcf.add_cut(0, 0, 0, 3.0, &[0.0, 7.0]);
let indexer = StageIndexer::new(1, 1);
let batch = build_cut_row_batch(&fcf, 0, &indexer, &[]);
assert_eq!(batch.num_rows, 1);
assert_eq!(batch.col_indices, vec![0, 1, 4]); assert_eq!(batch.values, vec![0.0, -7.0, 1.0]);
assert_eq!(batch.row_lower, vec![3.0]);
}
fn single_workspace(solver: MockSolver, indexer: &StageIndexer) -> SolverWorkspace<MockSolver> {
SolverWorkspace {
solver,
patch_buf: crate::lp_builder::PatchBuffer::new(
indexer.hydro_count,
indexer.max_par_order,
0,
0,
),
current_state: Vec::with_capacity(indexer.n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::with_capacity(indexer.hydro_count),
inflow_m3s_buf: Vec::with_capacity(indexer.hydro_count),
lag_matrix_buf: Vec::with_capacity(indexer.max_par_order * indexer.hydro_count),
par_inflow_buf: Vec::with_capacity(indexer.hydro_count),
eta_floor_buf: Vec::with_capacity(indexer.hydro_count),
zero_targets_buf: vec![0.0_f64; indexer.hydro_count],
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
},
}
}
#[test]
fn ac_two_scenarios_three_stages_fixed_solution() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let solver = MockSolver::always_ok(solution);
let fcf = FutureCostFunction::new(3, indexer.n_state, 2, 100, 0);
let config = TrainingConfig {
forward_passes: 2,
max_iterations: 100,
checkpoint_interval: None,
warm_start_cuts: 0,
event_sender: None,
cut_activity_tolerance: 0.0,
n_fwd_threads: 1,
max_blocks: 1,
cut_selection: None,
shutdown_flag: None,
start_iteration: 0,
};
let horizon = HorizonMode::Finite { num_stages: 3 };
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(2 * 3);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let mut ws = single_workspace(solver, &indexer);
let mut basis_store = BasisStore::new(config.forward_passes as usize, templates.len());
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
};
let result = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&ForwardPassBatch {
local_forward_passes: config.forward_passes as usize,
iteration: 0,
fwd_offset: 0,
},
&mut records,
)
.unwrap();
assert_eq!(result.scenario_costs.len(), 2);
for (i, record) in records.iter().enumerate() {
assert_eq!(
record.stage_cost, 70_000.0,
"record[{i}].stage_cost should be 70_000.0 ((objective - theta) * COST_SCALE_FACTOR)"
);
}
assert_eq!(result.scenario_costs[0], 210_000.0);
assert_eq!(result.scenario_costs[1], 210_000.0);
}
#[test]
fn ac_infeasible_at_stage_1_scenario_0_returns_infeasible_error() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let solver = MockSolver::infeasible_on(solution, 2);
let fcf = FutureCostFunction::new(3, indexer.n_state, 2, 100, 0);
let config = TrainingConfig {
forward_passes: 2,
max_iterations: 100,
checkpoint_interval: None,
warm_start_cuts: 0,
event_sender: None,
cut_activity_tolerance: 0.0,
n_fwd_threads: 1,
max_blocks: 1,
cut_selection: None,
shutdown_flag: None,
start_iteration: 0,
};
let horizon = HorizonMode::Finite { num_stages: 3 };
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(2 * 3);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let mut ws = single_workspace(solver, &indexer);
let mut basis_store = BasisStore::new(config.forward_passes as usize, templates.len());
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
};
let result = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&ForwardPassBatch {
local_forward_passes: config.forward_passes as usize,
iteration: 0,
fwd_offset: 0,
},
&mut records,
);
match result {
Err(crate::SddpError::Infeasible {
stage, scenario, ..
}) => {
assert_eq!(stage, 1, "expected stage=1");
assert_eq!(scenario, 0, "expected scenario=0");
}
other => panic!("expected Infeasible, got {other:?}"),
}
}
#[test]
fn ac_global_scenario_index_rank1_scenario0() {
let rank = 1usize;
let forward_passes = 3usize;
let m = 0usize;
let global_scenario = rank * forward_passes + m;
assert_eq!(global_scenario, 3);
}
#[test]
fn cost_statistics_accumulated_correctly() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let solver = MockSolver::always_ok(solution);
let fcf = FutureCostFunction::new(3, indexer.n_state, 2, 100, 0);
let config = TrainingConfig {
forward_passes: 2,
max_iterations: 100,
checkpoint_interval: None,
warm_start_cuts: 0,
event_sender: None,
cut_activity_tolerance: 0.0,
n_fwd_threads: 1,
max_blocks: 1,
cut_selection: None,
shutdown_flag: None,
start_iteration: 0,
};
let horizon = HorizonMode::Finite { num_stages: 3 };
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(2 * 3);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let mut ws = single_workspace(solver, &indexer);
let mut basis_store = BasisStore::new(config.forward_passes as usize, templates.len());
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
};
let result = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&ForwardPassBatch {
local_forward_passes: config.forward_passes as usize,
iteration: 0,
fwd_offset: 0,
},
&mut records,
)
.unwrap();
assert_eq!(result.scenario_costs.len(), 2);
assert_eq!(result.scenario_costs[0], 210_000.0);
assert_eq!(result.scenario_costs[1], 210_000.0);
let cost_sum: f64 = result.scenario_costs.iter().sum();
let cost_sum_sq: f64 = result.scenario_costs.iter().map(|c| c * c).sum();
assert_eq!(cost_sum, 420_000.0);
assert_eq!(cost_sum_sq, 210_000.0_f64.powi(2) * 2.0);
}
#[test]
fn sync_result_field_access() {
let r = SyncResult {
global_ub_mean: 75.0,
global_ub_std: 12.909,
ci_95_half_width: 12.651,
sync_time_ms: 7,
};
assert_eq!(r.global_ub_mean, 75.0);
assert_eq!(r.global_ub_std, 12.909);
assert_eq!(r.ci_95_half_width, 12.651);
assert_eq!(r.sync_time_ms, 7);
}
#[test]
fn sync_result_clone_and_debug() {
let r = SyncResult {
global_ub_mean: 2.0,
global_ub_std: 3.0,
ci_95_half_width: 4.0,
sync_time_ms: 5,
};
let c = r.clone();
assert_eq!(c.global_ub_mean, r.global_ub_mean);
assert_eq!(c.global_ub_std, r.global_ub_std);
let s = format!("{r:?}");
assert!(s.contains("SyncResult"));
}
#[test]
fn ub_statistics_four_scenarios_correct_mean_and_std() {
let local = ForwardResult {
scenario_costs: vec![60.0, 70.0, 80.0, 90.0],
elapsed_ms: 0,
lp_solves: 0,
};
let comm = LocalBackend;
let result = sync_forward(&local, &comm, 4).unwrap();
assert_eq!(result.global_ub_mean, 75.0, "mean must be 300/4 = 75");
let expected_std = (500.0_f64 / 3.0).sqrt();
let tolerance = 1e-9;
assert!(
(result.global_ub_std - expected_std).abs() < tolerance,
"std deviation {got} should be ≈ {expected_std}",
got = result.global_ub_std,
);
let expected_ci = 1.96_f64 * expected_std / 4.0_f64.sqrt();
assert!(
(result.ci_95_half_width - expected_ci).abs() < tolerance,
"ci_95 {got} should be ≈ {expected_ci}",
got = result.ci_95_half_width,
);
}
#[test]
fn ac_ticket_acceptance_criterion_ub_mean() {
let local = ForwardResult {
scenario_costs: vec![60.0, 70.0, 80.0, 90.0],
elapsed_ms: 0,
lp_solves: 0,
};
let comm = LocalBackend;
let result = sync_forward(&local, &comm, 4).unwrap();
assert_eq!(result.global_ub_mean, 75.0);
assert!(
result.global_ub_std > 0.0,
"std must be positive for 4 distinct scenarios"
);
}
#[test]
fn canonical_summation_identical_regardless_of_partition() {
let single_rank = ForwardResult {
scenario_costs: vec![1.0, 2.0, 3.0, 4.0],
elapsed_ms: 0,
lp_solves: 0,
};
let comm = LocalBackend;
let result_single = sync_forward(&single_rank, &comm, 4).unwrap();
let global_costs = [1.0_f64, 2.0, 3.0, 4.0];
let global_n = global_costs.len();
#[allow(clippy::cast_precision_loss)]
let global_n_f64 = global_n as f64;
let cost_sum: f64 = global_costs.iter().sum();
let cost_sum_sq: f64 = global_costs.iter().map(|c| c * c).sum();
let mean = cost_sum / global_n_f64;
let variance = (cost_sum_sq - global_n_f64 * mean * mean) / (global_n_f64 - 1.0);
let expected_std = variance.max(0.0).sqrt();
let expected_mean = mean;
assert_eq!(
result_single.global_ub_mean.to_bits(),
expected_mean.to_bits(),
"mean must be bit-identical to sequential summation of [1,2,3,4]"
);
assert_eq!(
result_single.global_ub_std.to_bits(),
expected_std.to_bits(),
"std must be bit-identical to sequential summation of [1,2,3,4]"
);
}
#[test]
fn bessel_correction_single_scenario_zero_std_and_ci() {
let local = ForwardResult {
scenario_costs: vec![500.0],
elapsed_ms: 0,
lp_solves: 0,
};
let comm = LocalBackend;
let result = sync_forward(&local, &comm, 1).unwrap();
assert_eq!(
result.global_ub_std, 0.0,
"std must be 0.0 for a single scenario (N=1 Bessel correction)"
);
assert_eq!(
result.ci_95_half_width, 0.0,
"ci_95 must be 0.0 for a single scenario"
);
}
#[test]
fn negative_variance_guard_produces_zero_std_not_nan() {
let v = 1.0e15_f64;
let local = ForwardResult {
scenario_costs: vec![v, v],
elapsed_ms: 0,
lp_solves: 0,
};
let comm = LocalBackend;
let result = sync_forward(&local, &comm, 2).unwrap();
assert!(
!result.global_ub_std.is_nan(),
"std must not be NaN even when floating-point variance is slightly negative"
);
assert_eq!(
result.global_ub_std, 0.0,
"std must be 0.0 when variance is zero (or clamps from tiny negative)"
);
}
#[test]
fn sync_forward_local_backend_global_equals_local() {
let local = ForwardResult {
scenario_costs: vec![420.0, 420.0],
elapsed_ms: 5,
lp_solves: 0,
};
let comm = LocalBackend;
let result = sync_forward(&local, &comm, 2).unwrap();
assert_eq!(
result.global_ub_mean, 420.0,
"global_ub_mean must equal the arithmetic mean of the cost vector"
);
}
#[test]
fn sync_forward_sync_time_ms_is_valid_u64() {
let local = ForwardResult {
scenario_costs: vec![50.0, 50.0],
elapsed_ms: 0,
lp_solves: 0,
};
let comm = LocalBackend;
let result = sync_forward(&local, &comm, 2).unwrap();
let _ = result.sync_time_ms;
}
#[test]
fn sync_forward_comm_error_wraps_as_sddp_communication() {
use cobre_comm::CommError;
struct FailingComm;
impl Communicator for FailingComm {
fn allgatherv<T: CommData>(
&self,
_send: &[T],
_recv: &mut [T],
_counts: &[usize],
_displs: &[usize],
) -> Result<(), CommError> {
Err(CommError::InvalidCommunicator)
}
fn allreduce<T: CommData>(
&self,
_send: &[T],
_recv: &mut [T],
_op: ReduceOp,
) -> Result<(), CommError> {
Err(CommError::InvalidCommunicator)
}
fn broadcast<T: CommData>(
&self,
_buf: &mut [T],
_root: usize,
) -> Result<(), CommError> {
Err(CommError::InvalidCommunicator)
}
fn barrier(&self) -> Result<(), CommError> {
Err(CommError::InvalidCommunicator)
}
fn rank(&self) -> usize {
0
}
fn size(&self) -> usize {
1
}
}
let local = ForwardResult {
scenario_costs: vec![100.0],
elapsed_ms: 0,
lp_solves: 0,
};
let comm = FailingComm;
let err = sync_forward(&local, &comm, 1).unwrap_err();
assert!(
matches!(err, crate::SddpError::Communication(_)),
"CommError must be wrapped as SddpError::Communication, got: {err:?}"
);
}
fn run_one_iteration(
ws: &mut SolverWorkspace<MockSolver>,
basis_store: &mut BasisStore,
) -> Result<(), crate::SddpError> {
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(3, indexer.n_state, 1, 100, 0);
let config = TrainingConfig {
forward_passes: 1,
max_iterations: 100,
checkpoint_interval: None,
warm_start_cuts: 0,
event_sender: None,
cut_activity_tolerance: 0.0,
n_fwd_threads: 1,
max_blocks: 1,
cut_selection: None,
shutdown_flag: None,
start_iteration: 0,
};
let horizon = HorizonMode::Finite { num_stages: 3 };
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(3);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
};
run_forward_pass(
std::slice::from_mut(ws),
basis_store,
&ctx,
&fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&ForwardPassBatch {
local_forward_passes: config.forward_passes as usize,
iteration: 0,
fwd_offset: 0,
},
&mut records,
)
.map(|_| ())
}
#[test]
fn warm_start_first_iteration_cold_second_iteration_warm() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let solver = MockSolver::always_ok(solution);
let mut ws = single_workspace(solver, &indexer);
let mut basis_store = BasisStore::new(1, 3);
run_one_iteration(&mut ws, &mut basis_store).unwrap();
assert_eq!(
ws.solver.warm_start_calls, 0,
"first iteration must use cold-start for all stages (warm_start_calls == 0)"
);
assert!(
(0..3).all(|t| basis_store.get(0, t).is_some()),
"basis_store must be fully populated for scenario 0 after the first iteration"
);
run_one_iteration(&mut ws, &mut basis_store).unwrap();
assert!(
ws.solver.warm_start_calls > 0,
"second iteration must use warm-start for at least one stage \
(warm_start_calls > 0, got {})",
ws.solver.warm_start_calls
);
}
#[test]
fn basis_invalidated_on_solver_error() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let solver = MockSolver::infeasible_on(solution, 4);
let mut ws = single_workspace(solver, &indexer);
let mut basis_store = BasisStore::new(1, 3);
run_one_iteration(&mut ws, &mut basis_store).unwrap();
assert!(
(0..3).all(|t| basis_store.get(0, t).is_some()),
"basis_store must be fully populated for scenario 0 after iteration 1"
);
let err = run_one_iteration(&mut ws, &mut basis_store).unwrap_err();
assert!(
matches!(err, crate::SddpError::Infeasible { stage: 1, .. }),
"expected Infeasible at stage 1, got: {err:?}"
);
assert!(
basis_store.get(0, 1).is_none(),
"basis_store.get(0, 1) must be None after solver error at stage 1"
);
assert!(
basis_store.get(0, 0).is_some(),
"basis_store.get(0, 0) must be Some (stage 0 succeeded before error)"
);
}
#[test]
fn test_forward_pass_parallel_cost_agreement() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let fcf = FutureCostFunction::new(3, indexer.n_state, 2, 100, 0);
let horizon = HorizonMode::Finite { num_stages: 3 };
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let n_scenarios = 10;
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
};
let mut ws1 = single_workspace(MockSolver::always_ok(solution.clone()), &indexer);
let mut records1 = empty_records(n_scenarios * 3);
let mut basis_store1 = BasisStore::new(n_scenarios, templates.len());
let result1 = run_forward_pass(
std::slice::from_mut(&mut ws1),
&mut basis_store1,
&ctx,
&fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&ForwardPassBatch {
local_forward_passes: n_scenarios,
iteration: 0,
fwd_offset: 0,
},
&mut records1,
)
.unwrap();
let mut workspaces4: Vec<SolverWorkspace<MockSolver>> = (0..4)
.map(|_| single_workspace(MockSolver::always_ok(solution.clone()), &indexer))
.collect();
let mut records4 = empty_records(n_scenarios * 3);
let mut basis_store4 = BasisStore::new(n_scenarios, templates.len());
let result4 = run_forward_pass(
&mut workspaces4,
&mut basis_store4,
&ctx,
&fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&ForwardPassBatch {
local_forward_passes: n_scenarios,
iteration: 0,
fwd_offset: 0,
},
&mut records4,
)
.unwrap();
assert_eq!(
result1.scenario_costs.len(),
result4.scenario_costs.len(),
"scenario_costs length must be identical for 1 and 4 workspaces"
);
for (i, (c1, c4)) in result1
.scenario_costs
.iter()
.zip(result4.scenario_costs.iter())
.enumerate()
{
assert_eq!(
c1.to_bits(),
c4.to_bits(),
"scenario_costs[{i}] must be bit-identical: 1-workspace={c1:.17e}, 4-workspace={c4:.17e}"
);
}
}
#[test]
fn test_forward_pass_work_distribution() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let fcf = FutureCostFunction::new(3, indexer.n_state, 2, 100, 0);
let horizon = HorizonMode::Finite { num_stages: 3 };
let num_stages = 3usize;
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let n_scenarios = 10usize;
let n_workers = 4usize;
let mut workspaces: Vec<SolverWorkspace<MockSolver>> = (0..n_workers)
.map(|_| single_workspace(MockSolver::always_ok(solution.clone()), &indexer))
.collect();
let mut records = empty_records(n_scenarios * num_stages);
let mut basis_store = BasisStore::new(n_scenarios, num_stages);
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
};
let _result = run_forward_pass(
&mut workspaces,
&mut basis_store,
&ctx,
&fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&ForwardPassBatch {
local_forward_passes: n_scenarios,
iteration: 0,
fwd_offset: 0,
},
&mut records,
)
.unwrap();
for (w, ws) in workspaces.iter().enumerate() {
let (start_m, end_m) = partition(n_scenarios, n_workers, w);
let assigned_scenarios = end_m - start_m;
let expected_solves = assigned_scenarios * num_stages;
let floor_scenarios = n_scenarios / n_workers;
let ceil_scenarios = n_scenarios.div_ceil(n_workers);
assert!(
assigned_scenarios == floor_scenarios || assigned_scenarios == ceil_scenarios,
"worker {w} assigned {assigned_scenarios} scenarios, expected {floor_scenarios} or {ceil_scenarios}"
);
let actual_solves = usize::try_from(ws.solver.statistics().solve_count)
.expect("solve_count fits in usize in tests");
assert_eq!(
actual_solves, expected_solves,
"worker {w} (scenarios [{start_m}, {end_m})) performed {actual_solves} solves, expected {expected_solves}"
);
}
let total_solves: usize = workspaces
.iter()
.map(|ws| {
usize::try_from(ws.solver.statistics().solve_count)
.expect("solve_count fits in usize in tests")
})
.sum();
assert_eq!(
total_solves,
n_scenarios * num_stages,
"total solve count {total_solves} must equal n_scenarios * num_stages = {}",
n_scenarios * num_stages
);
}
#[allow(clippy::too_many_lines)]
fn make_stochastic_1h_1s(mean_m3s: f64, std_m3s: f64) -> StochasticContext {
use std::collections::BTreeMap;
use chrono::NaiveDate;
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::scenario::{
CorrelationEntity, CorrelationGroup, CorrelationModel, CorrelationProfile, InflowModel,
};
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, Stage, StageRiskConfig,
StageStateConfig,
};
use cobre_core::{Bus, DeficitSegment, EntityId, SystemBuilder};
let bus = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(1),
name: "H1".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity {
productivity_mw_per_m3s: 1.0,
},
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
let stage = Stage {
index: 0,
id: 0,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 3,
noise_method: NoiseMethod::Saa,
},
};
let inflow_model = InflowModel {
hydro_id: EntityId(1),
stage_id: 0,
mean_m3s,
std_m3s,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
};
let mut profiles = BTreeMap::new();
profiles.insert(
"default".to_string(),
CorrelationProfile {
groups: vec![CorrelationGroup {
name: "g1".to_string(),
entities: vec![CorrelationEntity {
entity_type: "inflow".to_string(),
id: EntityId(1),
}],
matrix: vec![vec![1.0]],
}],
},
);
let correlation = CorrelationModel {
method: "cholesky".to_string(),
profiles,
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus])
.hydros(vec![hydro])
.stages(vec![stage])
.inflow_models(vec![inflow_model])
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(&system, 42, &[], &[], None).unwrap()
}
fn minimal_template_1_0_with_base(base_rhs: f64) -> StageTemplate {
StageTemplate {
num_cols: 4,
num_rows: 3,
num_nz: 1,
col_starts: vec![0_i32, 0, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, f64::NEG_INFINITY, 0.0, 0.0],
col_upper: vec![f64::INFINITY; 4],
objective: vec![0.0, 0.0, 0.0, 1.0],
row_lower: vec![0.0, 0.0, base_rhs],
row_upper: vec![0.0, 0.0, base_rhs],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
}
}
fn run_single_stage_forward(
stochastic: &StochasticContext,
inflow_method: &InflowNonNegativityMethod,
base_rhs: f64,
noise_scale_val: f64,
) -> Vec<f64> {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 0.0, indexer.theta, 0.0);
let solver = MockSolver::always_ok(solution);
let fcf = FutureCostFunction::new(1, indexer.n_state, 1, 10, 0);
let horizon = HorizonMode::Finite { num_stages: 1 };
let template = minimal_template_1_0_with_base(base_rhs);
let templates = vec![template];
let base_rows = vec![2usize];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(1);
let mut ws = single_workspace(solver, &indexer);
let mut basis_store = BasisStore::new(1, 1);
let noise_scale = vec![noise_scale_val];
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
};
let _ = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method,
stochastic,
initial_state: &initial_state,
},
&ForwardPassBatch {
local_forward_passes: 1,
iteration: 0,
fwd_offset: 0,
},
&mut records,
)
.unwrap();
ws.scratch.noise_buf.clone()
}
#[test]
fn truncation_clamps_negative_inflow_noise() {
let mean_m3s = -1000.0_f64;
let sigma = 1.0_f64;
let zeta = 1.0_f64; let base_rhs = zeta * mean_m3s;
let noise_scale_val = zeta * sigma;
let stochastic = make_stochastic_1h_1s(mean_m3s, sigma);
let noise_buf_truncation = run_single_stage_forward(
&stochastic,
&InflowNonNegativityMethod::Truncation,
base_rhs,
noise_scale_val,
);
assert_eq!(noise_buf_truncation.len(), 1, "noise_buf must have 1 entry");
assert!(
noise_buf_truncation[0] >= 0.0,
"after truncation, noise_buf[0] must be >= 0 (inflow cannot be negative), got {}",
noise_buf_truncation[0]
);
}
#[test]
fn truncation_no_clamp_when_inflow_positive() {
let mean_m3s = 1000.0_f64;
let sigma = 1.0_f64;
let zeta = 1.0_f64;
let base_rhs = zeta * mean_m3s;
let noise_scale_val = zeta * sigma;
let stochastic = make_stochastic_1h_1s(mean_m3s, sigma);
let noise_buf_truncation = run_single_stage_forward(
&stochastic,
&InflowNonNegativityMethod::Truncation,
base_rhs,
noise_scale_val,
);
let noise_buf_none = run_single_stage_forward(
&stochastic,
&InflowNonNegativityMethod::None,
base_rhs,
noise_scale_val,
);
assert_eq!(noise_buf_truncation.len(), 1);
assert_eq!(noise_buf_none.len(), 1);
assert_eq!(
noise_buf_truncation[0].to_bits(),
noise_buf_none[0].to_bits(),
"when inflow is positive, truncation must not alter the noise buffer (expected identical bits)"
);
}
#[test]
fn none_method_unchanged_with_truncation_code_present() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let solver = MockSolver::always_ok(solution);
let fcf = FutureCostFunction::new(3, indexer.n_state, 2, 100, 0);
let config = TrainingConfig {
forward_passes: 2,
max_iterations: 100,
checkpoint_interval: None,
warm_start_cuts: 0,
event_sender: None,
cut_activity_tolerance: 0.0,
n_fwd_threads: 1,
max_blocks: 1,
cut_selection: None,
shutdown_flag: None,
start_iteration: 0,
};
let horizon = HorizonMode::Finite { num_stages: 3 };
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(2 * 3);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let mut ws = single_workspace(solver, &indexer);
let mut basis_store = BasisStore::new(config.forward_passes as usize, templates.len());
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
};
let result = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&ForwardPassBatch {
local_forward_passes: config.forward_passes as usize,
iteration: 0,
fwd_offset: 0,
},
&mut records,
)
.unwrap();
assert_eq!(result.scenario_costs.len(), 2);
for (i, record) in records.iter().enumerate() {
assert_eq!(
record.stage_cost, 70_000.0,
"none_method: record[{i}].stage_cost should be 70_000.0 ((objective - theta) * COST_SCALE_FACTOR)"
);
}
}
#[allow(clippy::too_many_lines)]
fn make_stochastic_context_1_hydro_1_load_bus(mean_mw: f64, std_mw: f64) -> StochasticContext {
let bus0 = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let bus1 = Bus {
id: EntityId(1),
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(10),
name: "H10".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity {
productivity_mw_per_m3s: 1.0,
},
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: cobre_core::entities::hydro::HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
let stage = Stage {
index: 0,
id: 0,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 3,
noise_method: NoiseMethod::Saa,
},
};
let inflow_model = InflowModel {
hydro_id: EntityId(10),
stage_id: 0,
mean_m3s: 100.0,
std_m3s: 20.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
};
let load_model = LoadModel {
bus_id: EntityId(1),
stage_id: 0,
mean_mw,
std_mw,
};
let correlation = CorrelationModel {
method: "cholesky".to_string(),
profiles: std::collections::BTreeMap::new(),
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus0, bus1])
.hydros(vec![hydro])
.stages(vec![stage])
.inflow_models(vec![inflow_model])
.load_models(vec![load_model])
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(&system, 42, &[], &[], None).unwrap()
}
#[test]
fn test_forward_pass_parallel_infeasibility() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let fcf = FutureCostFunction::new(3, indexer.n_state, 2, 100, 0);
let horizon = HorizonMode::Finite { num_stages: 3 };
let num_stages = 3usize;
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let n_scenarios = 10usize;
let n_workers = 4usize;
let mut workspaces: Vec<SolverWorkspace<MockSolver>> = (0..n_workers)
.map(|w| {
let solver = if w == 1 {
MockSolver::infeasible_on(solution.clone(), 0)
} else {
MockSolver::always_ok(solution.clone())
};
single_workspace(solver, &indexer)
})
.collect();
let mut records = empty_records(n_scenarios * num_stages);
let mut basis_store = BasisStore::new(n_scenarios, num_stages);
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
};
let result = run_forward_pass(
&mut workspaces,
&mut basis_store,
&ctx,
&fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&ForwardPassBatch {
local_forward_passes: n_scenarios,
iteration: 0,
fwd_offset: 0,
},
&mut records,
);
match result {
Err(crate::SddpError::Infeasible {
stage,
scenario,
iteration,
}) => {
assert_eq!(
stage, 0,
"infeasible stage must be 0 (first stage of worker 1)"
);
assert_eq!(
scenario, 3,
"infeasible scenario must be 3 (start_m of worker 1)"
);
assert_eq!(
iteration, 0,
"iteration must be 0 (first training iteration)"
);
}
Err(other) => panic!("expected SddpError::Infeasible, got: {other:?}"),
Ok(_) => panic!("expected Err(SddpError::Infeasible), got Ok"),
}
}
#[test]
fn forward_pass_load_noise_positive_realization() {
let n_load_buses = 1usize;
let stochastic = make_stochastic_context_1_hydro_1_load_bus(300.0, 30.0);
let indexer = StageIndexer::new(1, 0);
let patch_buf = crate::lp_builder::PatchBuffer::new(1, 0, n_load_buses, 1);
let mut ws = SolverWorkspace {
solver: MockSolver::always_ok(fixed_solution(4, 100.0, indexer.theta, 30.0)),
patch_buf,
current_state: Vec::with_capacity(indexer.n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::with_capacity(1),
inflow_m3s_buf: Vec::with_capacity(1),
lag_matrix_buf: Vec::with_capacity(0),
par_inflow_buf: Vec::with_capacity(1),
eta_floor_buf: Vec::with_capacity(1),
zero_targets_buf: vec![0.0_f64; 1],
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::with_capacity(n_load_buses),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
},
};
let templates = vec![minimal_template_1_0_with_base(100.0)];
let base_rows = vec![2usize];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(1);
let fcf = FutureCostFunction::new(1, indexer.n_state, 1, 10, 0);
let horizon = HorizonMode::Finite { num_stages: 1 };
let mut basis_store = BasisStore::new(1, 1);
let load_balance_row_starts = vec![10usize];
let load_bus_indices = vec![0usize];
let block_counts_per_stage = vec![1usize];
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[1.0],
n_hydros: 1,
n_load_buses,
load_balance_row_starts: &load_balance_row_starts,
load_bus_indices: &load_bus_indices,
block_counts_per_stage: &block_counts_per_stage,
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
};
let _fwd = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&ForwardPassBatch {
local_forward_passes: 1,
iteration: 0,
fwd_offset: 0,
},
&mut records,
)
.unwrap();
assert_eq!(
ws.scratch.load_rhs_buf.len(),
n_load_buses,
"load_rhs_buf must have 1 entry (1 load bus x 1 block)"
);
assert!(
ws.scratch.load_rhs_buf[0] > 0.0,
"load realization must be positive with mean=300, std=30: got {}",
ws.scratch.load_rhs_buf[0]
);
let cat4_start = 2;
assert_eq!(
ws.patch_buf.lower[cat4_start], ws.scratch.load_rhs_buf[0],
"patch_buf lower must equal load_rhs_buf[0]"
);
assert_eq!(
ws.patch_buf.upper[cat4_start], ws.scratch.load_rhs_buf[0],
"patch_buf upper must equal load_rhs_buf[0] (equality constraint)"
);
assert_eq!(
ws.patch_buf.indices[cat4_start], 10,
"patch index must be load_balance_row_starts[0] + 0 * n_blks"
);
}
#[test]
fn forward_pass_load_noise_clamped_to_zero() {
let n_load_buses = 1usize;
let stochastic = make_stochastic_context_1_hydro_1_load_bus(-1000.0, 1.0);
let indexer = StageIndexer::new(1, 0);
let patch_buf = crate::lp_builder::PatchBuffer::new(1, 0, n_load_buses, 1);
let mut ws = SolverWorkspace {
solver: MockSolver::always_ok(fixed_solution(4, 100.0, indexer.theta, 30.0)),
patch_buf,
current_state: Vec::with_capacity(indexer.n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::with_capacity(1),
inflow_m3s_buf: Vec::with_capacity(1),
lag_matrix_buf: Vec::with_capacity(0),
par_inflow_buf: Vec::with_capacity(1),
eta_floor_buf: Vec::with_capacity(1),
zero_targets_buf: vec![0.0_f64; 1],
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::with_capacity(n_load_buses),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
},
};
let templates = vec![minimal_template_1_0_with_base(100.0)];
let base_rows = vec![2usize];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(1);
let fcf = FutureCostFunction::new(1, indexer.n_state, 1, 10, 0);
let horizon = HorizonMode::Finite { num_stages: 1 };
let mut basis_store = BasisStore::new(1, 1);
let load_balance_row_starts = vec![10usize];
let load_bus_indices = vec![0usize];
let block_counts_per_stage = vec![1usize];
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[1.0],
n_hydros: 1,
n_load_buses,
load_balance_row_starts: &load_balance_row_starts,
load_bus_indices: &load_bus_indices,
block_counts_per_stage: &block_counts_per_stage,
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
};
let _fwd = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&ForwardPassBatch {
local_forward_passes: 1,
iteration: 0,
fwd_offset: 0,
},
&mut records,
)
.unwrap();
assert_eq!(
ws.scratch.load_rhs_buf.len(),
n_load_buses,
"load_rhs_buf must have 1 entry (1 load bus x 1 block)"
);
assert_eq!(
ws.scratch.load_rhs_buf[0], 0.0,
"realization with mean=-1000 must be clamped to 0.0, got {}",
ws.scratch.load_rhs_buf[0]
);
let cat4_start = 2;
assert_eq!(
ws.patch_buf.lower[cat4_start], 0.0,
"patch lower must be 0.0 (clamped)"
);
assert_eq!(
ws.patch_buf.upper[cat4_start], 0.0,
"patch upper must be 0.0 (clamped)"
);
}
#[test]
fn forward_pass_no_load_buses_unchanged() {
let stochastic = make_stochastic_context_1_hydro_3_stages();
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let mut ws = single_workspace(MockSolver::always_ok(solution), &indexer);
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(3); let fcf = FutureCostFunction::new(3, indexer.n_state, 1, 10, 0);
let horizon = HorizonMode::Finite { num_stages: 3 };
let mut basis_store = BasisStore::new(1, 3);
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[], n_hydros: 0, n_load_buses: 0, load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1, 1, 1],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
};
let _fwd = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
},
&ForwardPassBatch {
local_forward_passes: 1,
iteration: 0,
fwd_offset: 0,
},
&mut records,
)
.unwrap();
assert_eq!(
ws.patch_buf.forward_patch_count(),
2,
"forward_patch_count must be N*(2+L)=2 when n_load_buses=0, got {}",
ws.patch_buf.forward_patch_count()
);
assert!(
ws.scratch.load_rhs_buf.is_empty(),
"load_rhs_buf must be empty when n_load_buses=0"
);
}
struct RecordingMockSolver {
last_batch: Option<RowBatch>,
add_rows_count: usize,
}
impl RecordingMockSolver {
fn new() -> Self {
Self {
last_batch: None,
add_rows_count: 0,
}
}
}
impl SolverInterface for RecordingMockSolver {
fn load_model(&mut self, _template: &StageTemplate) {}
fn add_rows(&mut self, cuts: &RowBatch) {
self.last_batch = Some(RowBatch {
num_rows: cuts.num_rows,
row_starts: cuts.row_starts.clone(),
col_indices: cuts.col_indices.clone(),
values: cuts.values.clone(),
row_lower: cuts.row_lower.clone(),
row_upper: cuts.row_upper.clone(),
});
self.add_rows_count += 1;
}
fn set_row_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn set_col_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn solve(&mut self) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
Err(SolverError::InternalError {
message: "not implemented for test".to_string(),
error_code: None,
})
}
fn reset(&mut self) {}
fn get_basis(&mut self, _out: &mut Basis) {}
fn solve_with_basis(
&mut self,
_basis: &Basis,
) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
Err(SolverError::InternalError {
message: "not implemented for test".to_string(),
error_code: None,
})
}
fn statistics(&self) -> SolverStatistics {
SolverStatistics::default()
}
fn name(&self) -> &'static str {
"RecordingMock"
}
}
fn empty_row_batch() -> RowBatch {
RowBatch {
num_rows: 0,
row_starts: Vec::new(),
col_indices: Vec::new(),
values: Vec::new(),
row_lower: Vec::new(),
row_upper: Vec::new(),
}
}
#[test]
fn append_new_cuts_returns_zero_when_no_new_cuts() {
use crate::cut::CutRowMap;
let fcf = crate::FutureCostFunction::new(2, 1, 1, 10, 0);
let indexer = crate::StageIndexer::new(1, 0);
let mut row_map = CutRowMap::new(10, 5);
let mut batch_buf = empty_row_batch();
let mut solver = RecordingMockSolver::new();
let count = super::append_new_cuts_to_lp(
&mut solver,
&fcf,
0,
&indexer,
&[],
&mut row_map,
&mut batch_buf,
);
assert_eq!(count, 0);
assert_eq!(solver.add_rows_count, 0);
}
#[test]
fn append_new_cuts_appends_all_on_empty_row_map() {
use crate::cut::CutRowMap;
let mut fcf = crate::FutureCostFunction::new(2, 1, 1, 10, 0);
fcf.add_cut(0, 0, 0, 10.0, &[1.0]); fcf.add_cut(0, 1, 0, 20.0, &[3.0]);
let indexer = crate::StageIndexer::new(1, 0);
let mut row_map = CutRowMap::new(10, 5);
let mut batch_buf = empty_row_batch();
let mut solver = RecordingMockSolver::new();
let count = super::append_new_cuts_to_lp(
&mut solver,
&fcf,
0,
&indexer,
&[],
&mut row_map,
&mut batch_buf,
);
assert_eq!(count, 2);
assert_eq!(solver.add_rows_count, 1);
assert_eq!(row_map.total_cut_rows(), 2);
assert_eq!(row_map.active_count(), 2);
assert_eq!(row_map.lp_row_for_slot(0), Some(5));
assert_eq!(row_map.lp_row_for_slot(1), Some(6));
}
#[test]
fn append_new_cuts_skips_already_mapped_cuts() {
use crate::cut::CutRowMap;
let mut fcf = crate::FutureCostFunction::new(2, 1, 1, 10, 0);
fcf.add_cut(0, 0, 0, 10.0, &[1.0]); fcf.add_cut(0, 1, 0, 20.0, &[3.0]);
let indexer = crate::StageIndexer::new(1, 0);
let mut row_map = CutRowMap::new(10, 5);
row_map.insert(0);
let mut batch_buf = empty_row_batch();
let mut solver = RecordingMockSolver::new();
let count = super::append_new_cuts_to_lp(
&mut solver,
&fcf,
0,
&indexer,
&[],
&mut row_map,
&mut batch_buf,
);
assert_eq!(count, 1);
assert_eq!(solver.add_rows_count, 1);
assert_eq!(row_map.total_cut_rows(), 2);
assert!(solver.last_batch.as_ref().is_some_and(|b| b.num_rows == 1));
}
#[test]
fn append_new_cuts_matches_build_cut_row_batch_into() {
use crate::cut::CutRowMap;
let mut fcf = crate::FutureCostFunction::new(2, 1, 1, 10, 0);
fcf.add_cut(0, 0, 0, 10.0, &[1.0]); fcf.add_cut(0, 1, 0, 20.0, &[3.0]);
let indexer = crate::StageIndexer::new(1, 0);
let mut expected_batch = empty_row_batch();
super::build_cut_row_batch_into(&mut expected_batch, &fcf, 0, &indexer, &[]);
let mut row_map = CutRowMap::new(10, 5);
let mut actual_batch = empty_row_batch();
let mut solver = RecordingMockSolver::new();
super::append_new_cuts_to_lp(
&mut solver,
&fcf,
0,
&indexer,
&[],
&mut row_map,
&mut actual_batch,
);
assert_eq!(actual_batch.num_rows, expected_batch.num_rows);
assert_eq!(actual_batch.row_starts, expected_batch.row_starts);
assert_eq!(actual_batch.col_indices, expected_batch.col_indices);
assert_eq!(actual_batch.values, expected_batch.values);
assert_eq!(actual_batch.row_lower, expected_batch.row_lower);
assert_eq!(actual_batch.row_upper, expected_batch.row_upper);
}
#[test]
fn append_new_cuts_with_scaling_matches_build() {
use crate::cut::CutRowMap;
let mut fcf = crate::FutureCostFunction::new(2, 1, 1, 10, 0);
fcf.add_cut(0, 0, 0, 10.0, &[1.0]);
let indexer = crate::StageIndexer::new(1, 0);
let col_scale = vec![0.5, 2.0, 1.0, 0.1];
let mut expected = empty_row_batch();
super::build_cut_row_batch_into(&mut expected, &fcf, 0, &indexer, &col_scale);
let mut row_map = CutRowMap::new(10, 5);
let mut actual = empty_row_batch();
let mut solver = RecordingMockSolver::new();
super::append_new_cuts_to_lp(
&mut solver,
&fcf,
0,
&indexer,
&col_scale,
&mut row_map,
&mut actual,
);
assert_eq!(actual.values, expected.values);
assert_eq!(actual.col_indices, expected.col_indices);
}
struct BoundRecordingMockSolver {
last_indices: Vec<usize>,
last_lower: Vec<f64>,
last_upper: Vec<f64>,
set_row_bounds_count: usize,
}
impl BoundRecordingMockSolver {
fn new() -> Self {
Self {
last_indices: Vec::new(),
last_lower: Vec::new(),
last_upper: Vec::new(),
set_row_bounds_count: 0,
}
}
}
impl SolverInterface for BoundRecordingMockSolver {
fn load_model(&mut self, _template: &StageTemplate) {}
fn add_rows(&mut self, _cuts: &RowBatch) {}
fn set_row_bounds(&mut self, indices: &[usize], lower: &[f64], upper: &[f64]) {
self.last_indices = indices.to_vec();
self.last_lower = lower.to_vec();
self.last_upper = upper.to_vec();
self.set_row_bounds_count += 1;
}
fn set_col_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn solve(&mut self) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
Err(SolverError::InternalError {
message: "not implemented".to_string(),
error_code: None,
})
}
fn reset(&mut self) {}
fn get_basis(&mut self, _out: &mut Basis) {}
fn solve_with_basis(
&mut self,
_basis: &Basis,
) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
Err(SolverError::InternalError {
message: "not implemented".to_string(),
error_code: None,
})
}
fn statistics(&self) -> SolverStatistics {
SolverStatistics::default()
}
fn name(&self) -> &'static str {
"BoundRecordingMock"
}
}
#[test]
fn deactivate_cuts_empty_set_returns_zero() {
use crate::cut::CutRowMap;
use crate::cut_selection::DeactivationSet;
let mut solver = BoundRecordingMockSolver::new();
let mut row_map = CutRowMap::new(10, 5);
let deact = DeactivationSet {
stage_index: 0,
indices: vec![],
};
let count = super::deactivate_cuts_in_lp(&mut solver, &deact, &mut row_map);
assert_eq!(count, 0);
assert_eq!(solver.set_row_bounds_count, 0);
}
#[test]
fn deactivate_cuts_zeros_bounds_for_mapped_slots() {
use crate::cut::CutRowMap;
use crate::cut_selection::DeactivationSet;
let mut solver = BoundRecordingMockSolver::new();
let mut row_map = CutRowMap::new(10, 5);
row_map.insert(0); row_map.insert(1); row_map.insert(2);
let deact = DeactivationSet {
stage_index: 0,
indices: vec![0, 2], };
let count = super::deactivate_cuts_in_lp(&mut solver, &deact, &mut row_map);
assert_eq!(count, 2);
assert_eq!(solver.set_row_bounds_count, 1);
assert_eq!(solver.last_indices, vec![5, 7]);
assert!(solver.last_lower.iter().all(|&v| v == f64::NEG_INFINITY));
assert!(solver.last_upper.iter().all(|&v| v == f64::INFINITY));
assert_eq!(row_map.active_count(), 1); }
#[test]
fn deactivate_cuts_skips_unmapped_slots() {
use crate::cut::CutRowMap;
use crate::cut_selection::DeactivationSet;
let mut solver = BoundRecordingMockSolver::new();
let mut row_map = CutRowMap::new(10, 5);
row_map.insert(0);
let deact = DeactivationSet {
stage_index: 0,
indices: vec![0, 3],
};
let count = super::deactivate_cuts_in_lp(&mut solver, &deact, &mut row_map);
assert_eq!(count, 1);
assert_eq!(solver.last_indices, vec![5]);
}
#[test]
fn deactivate_cuts_preserves_row_mapping() {
use crate::cut::CutRowMap;
use crate::cut_selection::DeactivationSet;
let mut solver = BoundRecordingMockSolver::new();
let mut row_map = CutRowMap::new(10, 5);
row_map.insert(0);
let deact = DeactivationSet {
stage_index: 0,
indices: vec![0],
};
super::deactivate_cuts_in_lp(&mut solver, &deact, &mut row_map);
assert_eq!(row_map.lp_row_for_slot(0), Some(5));
assert!(!row_map.is_slot_active(0));
}
#[test]
fn deactivate_already_deactivated_slot_is_noop() {
use crate::cut::CutRowMap;
use crate::cut_selection::DeactivationSet;
let mut solver = BoundRecordingMockSolver::new();
let mut row_map = CutRowMap::new(10, 5);
row_map.insert(0);
row_map.deactivate(0);
let deact = DeactivationSet {
stage_index: 0,
indices: vec![0],
};
let count = super::deactivate_cuts_in_lp(&mut solver, &deact, &mut row_map);
assert_eq!(count, 0);
assert_eq!(solver.set_row_bounds_count, 0);
}
}