use std::time::Instant;
use cobre_comm::Communicator;
use cobre_solver::{RowBatch, SolverError, SolverInterface};
use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator};
use crate::{
FutureCostFunction, SddpError, TrajectoryRecord,
context::{StageContext, TrainingContext},
cut_sync::CutSyncBuffers,
forward::{build_cut_row_batch_into, partition},
noise::{transform_inflow_noise, transform_load_noise, transform_ncs_noise},
risk_measure::BackwardOutcome,
risk_measure::RiskMeasure,
solver_stats::{SolverStatsDelta, aggregate_solver_statistics},
state_exchange::ExchangeBuffers,
workspace::{BasisStore, SolverWorkspace},
};
#[derive(Debug, Clone)]
#[must_use]
pub struct BackwardResult {
pub cuts_generated: usize,
pub elapsed_ms: u64,
pub lp_solves: u64,
pub stage_stats: Vec<(usize, SolverStatsDelta)>,
pub state_exchange_time_ms: u64,
pub cut_batch_build_time_ms: u64,
pub rayon_overhead_time_ms: u64,
pub cut_sync_time_ms: u64,
}
struct StagedCut {
trial_point_idx: usize,
intercept: f64,
coefficients: Vec<f64>,
forward_pass_index: u32,
binding_increments: Vec<(usize, u64)>,
}
pub struct BackwardPassSpec<'a> {
pub exchange: &'a mut ExchangeBuffers,
pub records: &'a [TrajectoryRecord],
pub iteration: u64,
pub local_work: usize,
pub fwd_offset: usize,
pub risk_measures: &'a [RiskMeasure],
pub cut_activity_tolerance: f64,
pub cut_sync_bufs: &'a mut CutSyncBuffers,
pub probabilities_buf: &'a mut Vec<f64>,
pub successor_active_slots_buf: &'a mut Vec<usize>,
pub visited_archive: Option<&'a mut crate::visited_states::VisitedStatesArchive>,
}
struct SuccessorSpec<'a> {
t: usize,
successor: usize,
my_rank: usize,
probabilities: &'a [f64],
cut_batch: &'a RowBatch,
num_cuts_at_successor: usize,
template_num_rows: usize,
successor_active_slots: &'a [usize],
basis_store: &'a BasisStore,
cut_activity_tolerance: f64,
successor_populated_count: usize,
}
struct TrialAccumulators {
outcomes: Vec<BackwardOutcome>,
slot_increments: Vec<u64>,
}
fn load_backward_lp<S: SolverInterface + Send>(
ws: &mut SolverWorkspace<S>,
ctx: &StageContext<'_>,
cut_batch: &RowBatch,
s: usize,
) {
ws.solver.load_model(&ctx.templates[s]);
ws.solver.add_rows(cut_batch);
}
fn patch_opening_bounds<S: SolverInterface + Send>(
ws: &mut SolverWorkspace<S>,
ctx: &StageContext<'_>,
training_ctx: &TrainingContext<'_>,
raw_noise: &[f64],
x_hat: &[f64],
s: usize,
) {
let n_blks = if ctx.n_load_buses > 0 {
ctx.block_counts_per_stage[s]
} else {
0
};
transform_inflow_noise(raw_noise, s, x_hat, ctx, training_ctx, &mut ws.scratch);
transform_load_noise(
raw_noise,
ctx.n_hydros,
ctx.n_load_buses,
training_ctx.stochastic,
s,
n_blks,
&mut ws.scratch.load_rhs_buf,
);
let n_stochastic_ncs = training_ctx.stochastic.n_stochastic_ncs();
if n_stochastic_ncs > 0 {
transform_ncs_noise(
raw_noise,
ctx.n_hydros,
ctx.n_load_buses,
training_ctx.stochastic,
s,
ctx.block_counts_per_stage[s],
ctx.ncs_max_gen,
&mut ws.scratch.ncs_col_upper_buf,
);
}
ws.patch_buf.fill_forward_patches(
training_ctx.indexer,
x_hat,
&ws.scratch.noise_buf,
ctx.base_rows[s],
&ctx.templates[s].row_scale,
);
if ctx.n_load_buses > 0 {
ws.patch_buf.fill_load_patches(
ctx.load_balance_row_starts[s],
n_blks,
&ws.scratch.load_rhs_buf,
ctx.load_bus_indices,
&ctx.templates[s].row_scale,
);
}
ws.patch_buf.fill_z_inflow_patches(
training_ctx.indexer.z_inflow_row_start,
&ws.scratch.z_inflow_rhs_buf,
&ctx.templates[s].row_scale,
);
let pc = ws.patch_buf.forward_patch_count();
ws.solver.set_row_bounds(
&ws.patch_buf.indices[..pc],
&ws.patch_buf.lower[..pc],
&ws.patch_buf.upper[..pc],
);
if n_stochastic_ncs > 0 && !training_ctx.indexer.ncs_generation.is_empty() {
let n_blks_stage = ctx.block_counts_per_stage[s];
let expected_len = n_stochastic_ncs * n_blks_stage;
if ws.scratch.ncs_col_indices_buf.len() != expected_len {
ws.scratch.ncs_col_indices_buf.clear();
ws.scratch.ncs_col_lower_buf.clear();
for ncs_idx in 0..n_stochastic_ncs {
for blk in 0..n_blks_stage {
ws.scratch.ncs_col_indices_buf.push(
training_ctx.indexer.ncs_generation.start + ncs_idx * n_blks_stage + blk,
);
ws.scratch.ncs_col_lower_buf.push(0.0);
}
}
}
ws.solver.set_col_bounds(
&ws.scratch.ncs_col_indices_buf,
&ws.scratch.ncs_col_lower_buf,
&ws.scratch.ncs_col_upper_buf,
);
}
}
fn process_trial_point_backward<S: SolverInterface + Send>(
ws: &mut SolverWorkspace<S>,
ctx: &StageContext<'_>,
training_ctx: &TrainingContext<'_>,
spec: &BackwardPassSpec<'_>,
succ: &SuccessorSpec<'_>,
m: usize,
accum: &mut TrialAccumulators,
) -> Result<StagedCut, SddpError> {
let indexer = training_ctx.indexer;
let tree_view = training_ctx.stochastic.tree_view();
let x_hat = spec.exchange.state_at(succ.my_rank, m);
let scenario = spec.fwd_offset + m;
let s = succ.successor;
for omega in 0..succ.probabilities.len() {
let raw_noise = tree_view.opening(s, omega);
patch_opening_bounds(ws, ctx, training_ctx, raw_noise, x_hat, s);
let view = if omega == 0 {
if let Some(rb) = succ.basis_store.get(m, s) {
ws.solver.solve_with_basis(rb)
} else {
ws.solver.solve()
}
} else {
ws.solver.solve()
}
.map_err(|e| match e {
SolverError::Infeasible => SddpError::Infeasible {
stage: succ.t,
iteration: spec.iteration,
scenario,
},
other => SddpError::Solver(other),
})?;
let objective = view.objective;
let row_scale = &ctx.templates[s].row_scale;
let out = &mut accum.outcomes[omega];
if row_scale.is_empty() {
out.coefficients
.copy_from_slice(&view.dual[..indexer.n_state]);
} else {
for (i, &d) in view.dual[..indexer.n_state].iter().enumerate() {
out.coefficients[i] = d * row_scale[i];
}
}
out.objective_value = objective;
out.intercept = objective
- out
.coefficients
.iter()
.zip(x_hat)
.map(|(pi, x)| pi * x)
.sum::<f64>();
if succ.num_cuts_at_successor > 0 {
let cut_duals = &view.dual
[succ.template_num_rows..succ.template_num_rows + succ.num_cuts_at_successor];
for (cut_idx, &slot) in succ.successor_active_slots.iter().enumerate() {
if cut_duals[cut_idx] > succ.cut_activity_tolerance {
accum.slot_increments[slot] += 1;
}
}
}
}
let (agg_intercept, agg_coefficients) =
spec.risk_measures[succ.t].aggregate_cut(&accum.outcomes, succ.probabilities);
debug_assert!(
u32::try_from(scenario).is_ok(),
"global scenario index overflows u32"
);
#[allow(clippy::cast_possible_truncation)]
let forward_pass_index = scenario as u32;
let binding_increments: Vec<(usize, u64)> = accum
.slot_increments
.iter()
.enumerate()
.filter(|&(_, c)| *c > 0)
.map(|(s, c)| (s, *c))
.collect();
Ok(StagedCut {
trial_point_idx: m,
intercept: agg_intercept,
coefficients: agg_coefficients,
forward_pass_index,
binding_increments,
})
}
fn process_stage_backward<S: SolverInterface + Send>(
workspaces: &mut [SolverWorkspace<S>],
ctx: &StageContext<'_>,
training_ctx: &TrainingContext<'_>,
spec: &BackwardPassSpec<'_>,
succ: &SuccessorSpec<'_>,
) -> Vec<Result<Vec<StagedCut>, SddpError>> {
let n_openings = succ.probabilities.len();
let n_workers = workspaces.len().max(1);
let local_work = spec.local_work;
workspaces
.par_iter_mut()
.enumerate()
.map(|(worker_id, ws)| {
let (start_m, end_m) = partition(local_work, n_workers, worker_id);
if start_m < end_m {
load_backward_lp(ws, ctx, succ.cut_batch, succ.successor);
}
let mut staged: Vec<StagedCut> = Vec::with_capacity(end_m - start_m);
let n_state = training_ctx.indexer.n_state;
let mut accum = TrialAccumulators {
outcomes: (0..n_openings)
.map(|_| BackwardOutcome {
intercept: 0.0,
coefficients: vec![0.0; n_state],
objective_value: 0.0,
})
.collect(),
slot_increments: vec![0u64; succ.successor_populated_count],
};
for m in start_m..end_m {
accum.slot_increments.fill(0);
staged.push(process_trial_point_backward(
ws,
ctx,
training_ctx,
spec,
succ,
m,
&mut accum,
)?);
}
Ok(staged)
})
.collect()
}
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
pub fn run_backward_pass<S: SolverInterface + Send, C: Communicator>(
workspaces: &mut [SolverWorkspace<S>],
basis_store: &BasisStore,
ctx: &StageContext<'_>,
fcf: &mut FutureCostFunction,
cut_batches: &mut [RowBatch],
training_ctx: &TrainingContext<'_>,
spec: &mut BackwardPassSpec<'_>,
comm: &C,
) -> Result<BackwardResult, SddpError> {
let TrainingContext {
horizon,
indexer,
stochastic,
..
} = training_ctx;
let num_stages = horizon.num_stages();
let my_rank = comm.rank();
debug_assert_eq!(ctx.templates.len(), num_stages);
debug_assert_eq!(ctx.base_rows.len(), num_stages);
debug_assert_eq!(spec.risk_measures.len(), num_stages);
let start = Instant::now();
let solves_before: u64 = workspaces
.iter()
.map(|ws| ws.solver.statistics().solve_count)
.sum();
let mut cuts_generated: usize = 0;
let mut stage_stats: Vec<(usize, SolverStatsDelta)> = Vec::new();
let mut state_exchange_ms: u64 = 0;
let mut cut_batch_build_ms: u64 = 0;
let mut rayon_overhead_ms: u64 = 0;
let mut cut_sync_ms: u64 = 0;
#[allow(clippy::cast_precision_loss)]
let n_workers = workspaces.len() as f64;
let tree_view = stochastic.tree_view();
let mut staged_cuts_buf: Vec<StagedCut> = Vec::new();
for t in (0..num_stages.saturating_sub(1)).rev() {
if !spec.records.is_empty() {
let exch_start = Instant::now();
spec.exchange.exchange(spec.records, t, num_stages, comm)?;
#[allow(clippy::cast_possible_truncation)]
{
state_exchange_ms += exch_start.elapsed().as_millis() as u64;
}
}
if let Some(ref mut archive) = spec.visited_archive {
let total_fwd = spec.exchange.total_scenarios();
archive.archive_gathered_states(t, spec.exchange.gathered_states(), total_fwd);
}
let successor = t + 1;
let stage_stats_before = {
let pool_stats: Vec<_> = workspaces.iter().map(|w| w.solver.statistics()).collect();
aggregate_solver_statistics(&pool_stats)
};
let n_openings = tree_view.n_openings(successor);
spec.probabilities_buf.clear();
#[allow(clippy::cast_precision_loss)]
spec.probabilities_buf
.resize(n_openings, 1.0_f64 / n_openings as f64);
let batch_start = Instant::now();
build_cut_row_batch_into(
&mut cut_batches[successor],
fcf,
successor,
indexer,
&ctx.templates[successor].col_scale,
);
#[allow(clippy::cast_possible_truncation)]
{
cut_batch_build_ms += batch_start.elapsed().as_millis() as u64;
}
let num_cuts_at_successor = cut_batches[successor].num_rows;
let template_num_rows = ctx.templates[successor].num_rows;
spec.successor_active_slots_buf.clear();
spec.successor_active_slots_buf
.extend(fcf.active_cuts(successor).map(|(slot, _, _)| slot));
let succ_spec = SuccessorSpec {
t,
successor,
my_rank,
probabilities: spec.probabilities_buf,
cut_batch: &cut_batches[successor],
num_cuts_at_successor,
template_num_rows,
successor_active_slots: spec.successor_active_slots_buf,
basis_store,
cut_activity_tolerance: spec.cut_activity_tolerance,
successor_populated_count: fcf.pools[successor].populated_count,
};
let process_start = Instant::now();
let worker_staged = process_stage_backward(workspaces, ctx, training_ctx, spec, &succ_spec);
staged_cuts_buf.clear();
for worker_result in worker_staged {
staged_cuts_buf.extend(worker_result?);
}
debug_assert!(
staged_cuts_buf
.windows(2)
.all(|w| w[0].trial_point_idx <= w[1].trial_point_idx),
"backward pass cuts must be sorted by trial_point_idx after worker concatenation"
);
for cut in &staged_cuts_buf {
fcf.add_cut(
t,
spec.iteration,
cut.forward_pass_index,
cut.intercept,
&cut.coefficients,
);
cuts_generated += 1;
for &(slot, increment) in &cut.binding_increments {
fcf.pools[successor].metadata[slot].active_count += increment;
fcf.pools[successor].metadata[slot].last_active_iter = spec.iteration;
}
}
let sync_start = Instant::now();
let n_local = spec.cut_sync_bufs.pack_local_cuts(fcf, t, spec.iteration);
spec.cut_sync_bufs.sync_packed_cuts(t, n_local, fcf, comm)?;
#[allow(clippy::cast_possible_truncation)]
{
cut_sync_ms += sync_start.elapsed().as_millis() as u64;
}
let stage_stats_after = {
let pool_stats: Vec<_> = workspaces.iter().map(|w| w.solver.statistics()).collect();
aggregate_solver_statistics(&pool_stats)
};
let stage_delta = SolverStatsDelta::from_snapshots(&stage_stats_before, &stage_stats_after);
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
{
let process_wall_ms = process_start.elapsed().as_millis() as u64;
let avg_solve_ms = (stage_delta.solve_time_ms / n_workers) as u64;
rayon_overhead_ms += process_wall_ms.saturating_sub(avg_solve_ms);
}
stage_stats.push((successor, stage_delta));
}
#[allow(clippy::cast_possible_truncation)]
let elapsed_ms = start.elapsed().as_millis() as u64;
let solves_after: u64 = workspaces
.iter()
.map(|ws| ws.solver.statistics().solve_count)
.sum();
Ok(BackwardResult {
cuts_generated,
elapsed_ms,
lp_solves: solves_after - solves_before,
stage_stats,
state_exchange_time_ms: state_exchange_ms,
cut_batch_build_time_ms: cut_batch_build_ms,
rayon_overhead_time_ms: rayon_overhead_ms,
cut_sync_time_ms: cut_sync_ms,
})
}
#[cfg(test)]
mod tests {
use cobre_comm::{CommData, CommError, Communicator, ReduceOp};
use cobre_solver::{
Basis, LpSolution, RowBatch, SolverError, SolverInterface, SolverStatistics, StageTemplate,
};
use super::{BackwardPassSpec, BackwardResult, run_backward_pass};
use crate::{
ExchangeBuffers, FutureCostFunction, HorizonMode, InflowNonNegativityMethod, RiskMeasure,
StageIndexer, TrajectoryRecord,
context::{StageContext, TrainingContext},
cut_sync::CutSyncBuffers,
workspace::{BasisStore, SolverWorkspace},
};
fn empty_cut_batches(n_stages: usize) -> Vec<RowBatch> {
(0..n_stages)
.map(|_| RowBatch {
num_rows: 0,
row_starts: Vec::new(),
col_indices: Vec::new(),
values: Vec::new(),
row_lower: Vec::new(),
row_upper: Vec::new(),
})
.collect()
}
struct StubComm;
impl Communicator for StubComm {
fn allgatherv<T: CommData>(
&self,
send: &[T],
recv: &mut [T],
_counts: &[usize],
_displs: &[usize],
) -> Result<(), CommError> {
recv[..send.len()].copy_from_slice(send);
Ok(())
}
fn allreduce<T: CommData>(
&self,
_send: &[T],
_recv: &mut [T],
_op: ReduceOp,
) -> Result<(), CommError> {
unreachable!("StubComm allreduce not used in backward pass tests")
}
fn broadcast<T: CommData>(&self, _buf: &mut [T], _root: usize) -> Result<(), CommError> {
unreachable!("StubComm broadcast not used in backward pass tests")
}
fn barrier(&self) -> Result<(), CommError> {
Ok(())
}
fn rank(&self) -> usize {
0
}
fn size(&self) -> usize {
1
}
}
struct MockSolver {
solution: LpSolution,
infeasible_at: Option<usize>,
call_count: usize,
current_num_rows: usize,
warm_start_calls: usize,
buf_primal: Vec<f64>,
buf_dual: Vec<f64>,
buf_reduced_costs: Vec<f64>,
}
impl MockSolver {
fn always_ok(solution: LpSolution) -> Self {
let base_rows = solution.dual.len();
let buf_primal = solution.primal.clone();
let buf_dual = solution.dual.clone();
let buf_reduced_costs = solution.reduced_costs.clone();
Self {
solution,
infeasible_at: None,
call_count: 0,
current_num_rows: base_rows,
warm_start_calls: 0,
buf_primal,
buf_dual,
buf_reduced_costs,
}
}
fn infeasible_on(solution: LpSolution, n: usize) -> Self {
let base_rows = solution.dual.len();
let buf_primal = solution.primal.clone();
let buf_dual = solution.dual.clone();
let buf_reduced_costs = solution.reduced_costs.clone();
Self {
solution,
infeasible_at: Some(n),
call_count: 0,
current_num_rows: base_rows,
warm_start_calls: 0,
buf_primal,
buf_dual,
buf_reduced_costs,
}
}
}
impl SolverInterface for MockSolver {
fn load_model(&mut self, template: &StageTemplate) {
self.current_num_rows = template.num_rows;
}
fn add_rows(&mut self, cuts: &RowBatch) {
self.current_num_rows += cuts.num_rows;
}
fn set_row_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn set_col_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn solve(&mut self) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
let call = self.call_count;
self.call_count += 1;
if self.infeasible_at == Some(call) {
return Err(SolverError::Infeasible);
}
self.buf_primal.clone_from(&self.solution.primal);
self.buf_dual.clone_from(&self.solution.dual);
self.buf_dual.resize(self.current_num_rows, 0.0);
self.buf_reduced_costs
.clone_from(&self.solution.reduced_costs);
Ok(cobre_solver::SolutionView {
objective: self.solution.objective,
primal: &self.buf_primal,
dual: &self.buf_dual,
reduced_costs: &self.buf_reduced_costs,
iterations: self.solution.iterations,
solve_time_seconds: self.solution.solve_time_seconds,
})
}
fn reset(&mut self) {}
fn get_basis(&mut self, _out: &mut Basis) {}
fn solve_with_basis(
&mut self,
_basis: &Basis,
) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
self.warm_start_calls += 1;
self.solve()
}
fn statistics(&self) -> SolverStatistics {
SolverStatistics::default()
}
fn name(&self) -> &'static str {
"Mock"
}
}
fn minimal_template_1_0() -> StageTemplate {
StageTemplate {
num_cols: 3,
num_rows: 1,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![0.0],
row_upper: vec![0.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
}
}
fn solution_1_0(objective: f64, dual_storage: f64) -> LpSolution {
LpSolution {
objective,
primal: vec![0.0, 0.0, 0.0],
dual: vec![dual_storage],
reduced_costs: vec![0.0; 3],
iterations: 0,
solve_time_seconds: 0.0,
}
}
fn single_workspace(solver: MockSolver, n_state: usize) -> Vec<SolverWorkspace<MockSolver>> {
use crate::lp_builder::PatchBuffer;
vec![SolverWorkspace {
solver,
patch_buf: PatchBuffer::new(1, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
},
}]
}
fn empty_basis_store(num_scenarios: usize, num_stages: usize) -> BasisStore {
BasisStore::new(num_scenarios, num_stages)
}
fn basis_store_with_one(
num_scenarios: usize,
num_stages: usize,
scenario: usize,
stage: usize,
basis: Basis,
) -> BasisStore {
let mut store = BasisStore::new(num_scenarios, num_stages);
*store.get_mut(scenario, stage) = Some(basis);
store
}
fn exchange_with_states(n_state: usize, states: Vec<Vec<f64>>) -> ExchangeBuffers {
use cobre_comm::LocalBackend;
let local_count = states.len();
let mut bufs = ExchangeBuffers::new(n_state, local_count, 1);
let records: Vec<TrajectoryRecord> = states
.into_iter()
.map(|state| TrajectoryRecord {
primal: vec![],
dual: vec![],
stage_cost: 0.0,
state,
})
.collect();
let comm = LocalBackend;
bufs.exchange(&records, 0, 1, &comm).unwrap();
bufs
}
#[allow(clippy::too_many_lines)]
fn make_stochastic_context(
n_stages: usize,
branching_factor: usize,
) -> cobre_stochastic::StochasticContext {
use chrono::NaiveDate;
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::{
Bus, DeficitSegment, EntityId, SystemBuilder,
scenario::{
CorrelationEntity, CorrelationGroup, CorrelationModel, CorrelationProfile,
InflowModel,
},
temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, Stage, StageRiskConfig,
StageStateConfig,
},
};
use cobre_stochastic::context::build_stochastic_context;
use std::collections::BTreeMap;
let bus = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(1),
name: "H1".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity {
productivity_mw_per_m3s: 1.0,
},
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let make_stage = |idx: usize| Stage {
index: idx,
id: idx as i32,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor,
noise_method: NoiseMethod::Saa,
},
};
let stages: Vec<Stage> = (0..n_stages).map(make_stage).collect();
#[allow(clippy::cast_possible_truncation)]
let inflow = |stage_idx: usize| InflowModel {
hydro_id: EntityId(1),
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
stage_id: stage_idx as i32,
mean_m3s: 100.0,
std_m3s: 30.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
};
let inflow_models: Vec<InflowModel> = (0..n_stages).map(inflow).collect();
let mut profiles = BTreeMap::new();
profiles.insert(
"default".to_string(),
CorrelationProfile {
groups: vec![CorrelationGroup {
name: "g1".to_string(),
entities: vec![CorrelationEntity {
entity_type: "inflow".to_string(),
id: EntityId(1),
}],
matrix: vec![vec![1.0]],
}],
},
);
let correlation = CorrelationModel {
method: "cholesky".to_string(),
profiles,
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus])
.hydros(vec![hydro])
.stages(stages)
.inflow_models(inflow_models)
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(&system, 42, &[], &[], None).unwrap()
}
#[test]
fn backward_result_fields_accessible() {
let r = BackwardResult {
cuts_generated: 6,
elapsed_ms: 42,
lp_solves: 0,
stage_stats: Vec::new(),
state_exchange_time_ms: 0,
cut_batch_build_time_ms: 0,
rayon_overhead_time_ms: 0,
cut_sync_time_ms: 0,
};
assert_eq!(r.cuts_generated, 6);
assert_eq!(r.elapsed_ms, 42);
assert!(r.stage_stats.is_empty());
assert_eq!(r.state_exchange_time_ms, 0);
assert_eq!(r.cut_batch_build_time_ms, 0);
assert_eq!(r.rayon_overhead_time_ms, 0);
assert_eq!(r.cut_sync_time_ms, 0);
}
#[test]
fn backward_result_clone_and_debug() {
let r = BackwardResult {
cuts_generated: 3,
elapsed_ms: 100,
lp_solves: 0,
stage_stats: Vec::new(),
state_exchange_time_ms: 0,
cut_batch_build_time_ms: 0,
rayon_overhead_time_ms: 0,
cut_sync_time_ms: 0,
};
let c = r.clone();
assert_eq!(c.cuts_generated, 3);
let s = format!("{r:?}");
assert!(s.contains("BackwardResult"));
}
#[test]
fn dual_extraction_formula_coefficients_are_negated_duals() {
let d0 = 3.5_f64;
let d1 = -1.2_f64;
let dual = [d0, d1];
let coefficients: Vec<f64> = dual.iter().map(|&d| -d).collect();
assert!((coefficients[0] - (-d0)).abs() < f64::EPSILON);
assert!((coefficients[1] - (-d1)).abs() < f64::EPSILON);
}
#[test]
fn intercept_formula_matches_spec() {
let objective = 50.0_f64;
let coefficients = [2.0_f64, -1.0_f64];
let x_hat = [10.0_f64, 5.0_f64];
let pi_dot_x: f64 = coefficients
.iter()
.zip(x_hat.iter())
.map(|(p, x)| p * x)
.sum();
let intercept = objective - pi_dot_x;
assert!((intercept - 35.0).abs() < f64::EPSILON);
}
#[test]
fn single_stage_system_produces_no_cuts() {
let stochastic = make_stochastic_context(1, 2);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0()];
let base_rows = vec![1_usize];
let n_state = indexer.n_state;
let n_stages = 1_usize;
let forward_passes = 2_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let result = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
assert_eq!(result.cuts_generated, 0);
assert_eq!(fcf.total_active_cuts(), 0);
}
#[test]
fn two_stage_system_two_trial_points_generates_two_cuts_at_stage_0() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0); let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state; let forward_passes = 2_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let result = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
assert_eq!(result.cuts_generated, 2);
assert_eq!(fcf.active_cuts(0).count(), 2);
assert_eq!(fcf.active_cuts(1).count(), 0);
}
#[test]
fn cut_inserted_with_correct_stage_iteration_and_forward_pass_index() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 3_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 20, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![5.0], vec![10.0], vec![15.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(50.0, 0.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let _ = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 2,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
let meta = &fcf.pools[0].metadata[7];
assert_eq!(meta.iteration_generated, 2);
assert_eq!(meta.forward_pass_index, 1);
}
#[test]
fn no_cuts_generated_at_last_stage() {
let n_stages = 5_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -3.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let result = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
assert_eq!(result.cuts_generated, 4);
for t in 0..4 {
assert_eq!(fcf.active_cuts(t).count(), 1, "stage {t} should have 1 cut");
}
assert_eq!(fcf.active_cuts(4).count(), 0, "stage 4 must have no cuts");
}
#[test]
fn elapsed_ms_is_non_negative() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 2);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![5.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(10.0, 0.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let result = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
let _ = result.elapsed_ms;
}
#[test]
fn infeasible_solver_returns_sddp_infeasible_error() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(0.0, 0.0);
let solver = MockSolver::infeasible_on(solution, 0);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let result = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
);
assert!(
matches!(result, Err(crate::SddpError::Infeasible { .. })),
"expected SddpError::Infeasible, got: {result:?}",
);
}
#[test]
fn expectation_aggregation_mean_of_per_opening_intercepts() {
use crate::risk_measure::BackwardOutcome as BO;
let outcomes = vec![
BO {
intercept: 10.0,
coefficients: vec![],
objective_value: 10.0,
},
BO {
intercept: 20.0,
coefficients: vec![],
objective_value: 20.0,
},
BO {
intercept: 30.0,
coefficients: vec![],
objective_value: 30.0,
},
];
let probs = vec![1.0 / 3.0; 3];
let (intercept, _) = RiskMeasure::Expectation.aggregate_cut(&outcomes, &probs);
assert!(
(intercept - 20.0).abs() < 1e-10,
"expected 20.0, got {intercept}"
);
}
#[test]
fn cut_coefficients_and_intercept_match_dual_extraction_formula() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(80.0, -3.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let _ = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
let cuts: Vec<_> = fcf.active_cuts(0).collect();
assert_eq!(cuts.len(), 1);
let (_, intercept, coefficients) = &cuts[0];
assert!(
(intercept - 110.0).abs() < 1e-10,
"expected intercept=110.0, got {intercept}"
);
assert_eq!(coefficients.len(), 1);
assert!(
(coefficients[0] - (-3.0)).abs() < 1e-10,
"expected coefficient=-3.0, got {}",
coefficients[0]
);
}
#[test]
fn cut_gradient_sign_physically_correct() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![50.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -2.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let _ = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
let cuts: Vec<_> = fcf.active_cuts(0).collect();
assert_eq!(cuts.len(), 1, "expected exactly one cut");
let (_, _intercept, coefficients) = &cuts[0];
assert!(
coefficients[0] < 0.0,
"cut coefficient must be negative (more storage → less future cost), \
got {} — likely the Benders cut sign bug has been reintroduced",
coefficients[0]
);
assert!(
(coefficients[0] - (-2.0)).abs() < 1e-10,
"expected coefficient=-2.0, got {}",
coefficients[0]
);
}
#[test]
fn cut_is_tight_at_trial_point() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let x_hat = 30.0_f64;
let mut exchange = exchange_with_states(n_state, vec![vec![x_hat]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let q_xhat = 200.0_f64; let dual_storage = -4.0_f64;
let solution = solution_1_0(q_xhat, dual_storage);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let _ = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
let cuts: Vec<_> = fcf.active_cuts(0).collect();
assert_eq!(cuts.len(), 1);
let (_, intercept, coefficients) = &cuts[0];
let cut_at_xhat = intercept + coefficients[0] * x_hat;
assert!(
(cut_at_xhat - q_xhat).abs() < 1e-10,
"cut must be tight at trial point: \
cut_value={cut_at_xhat}, Q(x̂)={q_xhat}, \
intercept={intercept}, coeff={}, x̂={x_hat}",
coefficients[0]
);
}
#[test]
fn single_rank_backward_pass_with_local_backend_produces_correct_fcf() {
use cobre_comm::LocalBackend;
let n_stages = 3_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 2_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = LocalBackend;
let mut workspaces = single_workspace(solver, n_state);
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let result = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
assert_eq!(result.cuts_generated, 4);
assert_eq!(fcf.active_cuts(0).count(), 2);
assert_eq!(fcf.active_cuts(1).count(), 2);
assert_eq!(fcf.active_cuts(2).count(), 0);
}
#[test]
fn forward_pass_index_matches_global_scenario_index() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 6_u32; let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 20, 0);
let mut exchange = exchange_with_states(
n_state,
vec![
vec![1.0],
vec![2.0],
vec![3.0],
vec![4.0],
vec![5.0],
vec![6.0],
],
);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(50.0, 0.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let _ = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 2,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
let meta = &fcf.pools[0].metadata[17];
assert_eq!(meta.iteration_generated, 2, "iteration_generated must be 2");
assert_eq!(
meta.forward_pass_index, 5,
"forward_pass_index must be 5 (= global m)"
);
}
#[test]
fn warm_start_uses_prepopulated_forward_basis() {
let n_stages = 2_usize;
let n_openings = 1_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let pre_basis = Basis::new(templates[1].num_cols, templates[1].num_rows);
let mut workspaces = single_workspace(solver, n_state);
let basis_store = basis_store_with_one(exchange.local_count(), n_stages, 0, 1, pre_basis);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let _ = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
let warm_start_calls = workspaces[0].solver.warm_start_calls;
assert_eq!(
warm_start_calls, 1,
"first opening at successor stage must call solve_with_basis \
when basis_store.get(0, 1) is pre-populated (warm_start_calls == 1, got {warm_start_calls})"
);
}
#[test]
fn multi_opening_subsequent_openings_use_internal_hotstart() {
let n_stages = 2_usize;
let n_openings = 3_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let _ = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
let warm_start_calls = workspaces[0].solver.warm_start_calls;
assert_eq!(
warm_start_calls, 0,
"P3b: no solve_with_basis calls expected when BasisStore is empty \
(warm_start_calls == 0, got {warm_start_calls})"
);
}
#[test]
fn backward_solver_error_propagates() {
let n_stages = 2_usize;
let n_openings = 1_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(0.0, 0.0);
let solver = MockSolver::infeasible_on(solution, 0);
let comm = StubComm;
let pre_basis = Basis::new(templates[1].num_cols, templates[1].num_rows);
let mut workspaces = single_workspace(solver, n_state);
let basis_store = basis_store_with_one(exchange.local_count(), n_stages, 0, 1, pre_basis);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let result = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
);
assert!(
matches!(result, Err(crate::SddpError::Infeasible { .. })),
"expected SddpError::Infeasible, got: {result:?}",
);
assert!(
basis_store.get(0, 1).is_some(),
"BasisStore must not be mutated by the backward pass error path"
);
}
#[test]
#[allow(
clippy::too_many_lines,
clippy::cast_possible_truncation,
clippy::cast_precision_loss
)]
fn test_backward_pass_parallel_cut_determinism() {
use crate::lp_builder::PatchBuffer;
let n_stages = 3_usize;
let n_openings = 2_usize;
let n_trial_points = 8_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
#[allow(clippy::cast_possible_truncation)]
let forward_passes = n_trial_points as u32;
let states: Vec<Vec<f64>> = (0..n_trial_points).map(|i| vec![i as f64 + 1.0]).collect();
let mut exchange = exchange_with_states(n_state, states);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let comm = StubComm;
let mut fcf_1 = FutureCostFunction::new(n_stages, n_state, forward_passes, 20, 0);
let solver_1 = MockSolver::always_ok(solution.clone());
let mut workspaces_1 = vec![SolverWorkspace {
solver: solver_1,
patch_buf: PatchBuffer::new(1, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
},
}];
let basis_store_1 = empty_basis_store(exchange.local_count(), n_stages);
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
};
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let _ = run_backward_pass(
&mut workspaces_1,
&basis_store_1,
&ctx,
&mut fcf_1,
&mut empty_cut_batches(n_stages),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
let mut fcf_4 = FutureCostFunction::new(n_stages, n_state, forward_passes, 20, 0);
let mut workspaces_4: Vec<SolverWorkspace<MockSolver>> = (0..4)
.map(|_| SolverWorkspace {
solver: MockSolver::always_ok(solution.clone()),
patch_buf: PatchBuffer::new(1, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
},
})
.collect();
let basis_store_4 = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let _ = run_backward_pass(
&mut workspaces_4,
&basis_store_4,
&ctx,
&mut fcf_4,
&mut empty_cut_batches(n_stages),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
for t in 0..(n_stages - 1) {
let cuts_1: Vec<_> = fcf_1.active_cuts(t).collect();
let cuts_4: Vec<_> = fcf_4.active_cuts(t).collect();
assert_eq!(
cuts_1.len(),
cuts_4.len(),
"stage {t}: cut count differs (1 workspace: {}, 4 workspaces: {})",
cuts_1.len(),
cuts_4.len()
);
for (idx, ((slot_1, intercept_1, coeff_1), (slot_4, intercept_4, coeff_4))) in
cuts_1.iter().zip(cuts_4.iter()).enumerate()
{
assert_eq!(
slot_1, slot_4,
"stage {t} cut {idx}: slot mismatch ({slot_1} vs {slot_4})"
);
assert!(
(intercept_1 - intercept_4).abs() < 1e-12,
"stage {t} cut {idx}: intercept mismatch ({intercept_1} vs {intercept_4})"
);
assert_eq!(
coeff_1.len(),
coeff_4.len(),
"stage {t} cut {idx}: coefficient vector length mismatch"
);
for (j, (c1, c4)) in coeff_1.iter().zip(coeff_4.iter()).enumerate() {
assert!(
(c1 - c4).abs() < 1e-12,
"stage {t} cut {idx} coeff[{j}]: {c1} vs {c4}"
);
}
}
}
assert_eq!(fcf_1.active_cuts(n_stages - 1).count(), 0);
assert_eq!(fcf_4.active_cuts(n_stages - 1).count(), 0);
}
#[allow(clippy::too_many_lines)]
fn make_stochastic_context_with_load(
n_stages: usize,
branching_factor: usize,
mean_mw: f64,
std_mw: f64,
) -> cobre_stochastic::StochasticContext {
use chrono::NaiveDate;
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::scenario::{CorrelationModel, InflowModel, LoadModel};
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, Stage, StageRiskConfig,
StageStateConfig,
};
use cobre_core::{Bus, DeficitSegment, EntityId, SystemBuilder};
use cobre_stochastic::context::build_stochastic_context;
let bus0 = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let bus1 = Bus {
id: EntityId(1),
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(10),
name: "H10".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity {
productivity_mw_per_m3s: 1.0,
},
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
fpha_turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let make_stage = |idx: usize| Stage {
index: idx,
id: idx as i32,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor,
noise_method: NoiseMethod::Saa,
},
};
let stages: Vec<Stage> = (0..n_stages).map(make_stage).collect();
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let inflow_models: Vec<InflowModel> = (0..n_stages)
.map(|idx| InflowModel {
hydro_id: EntityId(10),
stage_id: idx as i32,
mean_m3s: 100.0,
std_m3s: 30.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
})
.collect();
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let load_models: Vec<LoadModel> = (0..n_stages)
.map(|idx| LoadModel {
bus_id: EntityId(1),
stage_id: idx as i32,
mean_mw,
std_mw,
})
.collect();
let correlation = CorrelationModel {
method: "cholesky".to_string(),
profiles: std::collections::BTreeMap::new(),
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus0, bus1])
.hydros(vec![hydro])
.stages(stages)
.inflow_models(inflow_models)
.load_models(load_models)
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(&system, 42, &[], &[], None).unwrap()
}
#[test]
#[allow(clippy::too_many_lines)]
fn backward_pass_load_patches_applied() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context_with_load(n_stages, n_openings, 300.0, 30.0);
let indexer = StageIndexer::new(1, 0);
let patch_buf = crate::lp_builder::PatchBuffer::new(1, 0, 1, 1);
let template = StageTemplate {
num_cols: 3,
num_rows: 2,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![50.0, 100.0],
row_upper: vec![50.0, 100.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
};
let templates = vec![template; n_stages];
let base_rows = vec![1_usize; n_stages];
let noise_scale = vec![1.0_f64; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -2.0);
let ws = SolverWorkspace {
solver: MockSolver::always_ok(solution),
patch_buf,
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::with_capacity(1),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
},
};
let mut workspaces = vec![ws];
let comm = StubComm;
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let load_balance_row_starts = vec![10_usize; n_stages];
let load_bus_indices = vec![0_usize];
let block_counts_per_stage = vec![1_usize; n_stages];
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let _ = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses: 1,
load_balance_row_starts: &load_balance_row_starts,
load_bus_indices: &load_bus_indices,
block_counts_per_stage: &block_counts_per_stage,
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
assert_eq!(
workspaces[0].scratch.load_rhs_buf.len(),
1,
"load_rhs_buf must have 1 entry (1 load bus × 1 block)"
);
assert!(
workspaces[0].scratch.load_rhs_buf[0] > 0.0,
"load realization must be positive with mean=300, std=30: got {}",
workspaces[0].scratch.load_rhs_buf[0]
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn backward_pass_no_load_buses_unchanged() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let patch_buf = crate::lp_builder::PatchBuffer::new(1, 0, 0, 0);
let template = StageTemplate {
num_cols: 3,
num_rows: 2,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![50.0, 100.0],
row_upper: vec![50.0, 100.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
};
let templates = vec![template; n_stages];
let base_rows = vec![1_usize; n_stages];
let noise_scale = vec![1.0_f64; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -2.0);
let ws = SolverWorkspace {
solver: MockSolver::always_ok(solution),
patch_buf,
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
},
};
let mut workspaces = vec![ws];
let comm = StubComm;
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let _ = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1_usize; 2],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
assert_eq!(
workspaces[0].patch_buf.forward_patch_count(),
3,
"forward_patch_count must be N*(2+L)+N=3 when n_load_buses=0, got {}",
workspaces[0].patch_buf.forward_patch_count()
);
assert!(
workspaces[0].scratch.load_rhs_buf.is_empty(),
"load_rhs_buf must be empty when n_load_buses=0"
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn backward_pass_cut_coefficients_unaffected() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context_with_load(n_stages, n_openings, 200.0, 20.0);
let indexer = StageIndexer::new(1, 0);
let patch_buf = crate::lp_builder::PatchBuffer::new(1, 0, 1, 1);
let template = StageTemplate {
num_cols: 3,
num_rows: 2,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![50.0, 100.0],
row_upper: vec![50.0, 100.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
};
let templates = vec![template; n_stages];
let base_rows = vec![1_usize; n_stages];
let noise_scale = vec![1.0_f64; n_stages];
let n_state = indexer.n_state; let forward_passes = 1_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 10, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(80.0, -3.0);
let ws = SolverWorkspace {
solver: MockSolver::always_ok(solution),
patch_buf,
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::with_capacity(1),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
},
};
let mut workspaces = vec![ws];
let comm = StubComm;
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let load_balance_row_starts = vec![10_usize; n_stages];
let load_bus_indices = vec![0_usize];
let block_counts_per_stage = vec![1_usize; n_stages];
let mut csb = CutSyncBuffers::new(n_state, 64, 1);
let result = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses: 1,
load_balance_row_starts: &load_balance_row_starts,
load_bus_indices: &load_bus_indices,
block_counts_per_stage: &block_counts_per_stage,
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
assert_eq!(result.cuts_generated, 1);
let cuts: Vec<_> = fcf.active_cuts(0).collect();
assert_eq!(cuts.len(), 1);
let (_, _intercept, coefficients) = &cuts[0];
assert_eq!(
coefficients.len(),
n_state,
"cut coefficients length must be n_state={n_state}, got {} — \
load buses must not add state variables",
coefficients.len()
);
}
#[test]
fn per_stage_cut_sync_invariant_after_bug1_fix() {
use cobre_comm::LocalBackend;
let n_stages = 4_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 3_u32;
let mut fcf = FutureCostFunction::new(n_stages, n_state, forward_passes, 20, 0);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0], vec![30.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = LocalBackend;
let mut workspaces = single_workspace(solver, n_state);
let basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, forward_passes as usize, 1);
let result = run_backward_pass(
&mut workspaces,
&basis_store,
&StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
},
&mut fcf,
&mut empty_cut_batches(templates.len()),
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
},
&mut BackwardPassSpec {
records: &[],
iteration: 1,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
probabilities_buf: &mut Vec::new(),
successor_active_slots_buf: &mut Vec::new(),
visited_archive: None,
},
&comm,
)
.unwrap();
assert_eq!(result.cuts_generated, 9);
assert_eq!(fcf.active_cuts(0).count(), 3, "stage 0 must have 3 cuts");
assert_eq!(fcf.active_cuts(1).count(), 3, "stage 1 must have 3 cuts");
assert_eq!(fcf.active_cuts(2).count(), 3, "stage 2 must have 3 cuts");
assert_eq!(
fcf.active_cuts(3).count(),
0,
"terminal stage must have 0 cuts"
);
assert!(
result.cut_sync_time_ms < 10_000,
"cut_sync_time_ms should be reasonable, got {}",
result.cut_sync_time_ms
);
}
}