#[cfg(test)]
use cobre_comm::Communicator;
use cobre_solver::{RowBatch, SolutionView, SolverInterface, SolverStatistics};
use crate::{
SddpError,
context::{StageContext, TrainingContext},
cut::pool::CutPool,
forward::write_capture_metadata,
noise::{NcsNoiseOffsets, transform_inflow_noise, transform_load_noise, transform_ncs_noise},
risk_measure::RiskMeasure,
solver_stats::SolverStatsDelta,
state_exchange::ExchangeBuffers,
workspace::{BasisStoreSliceMut, CapturedBasis, SolverWorkspace},
};
pub type StageWorkerOpeningDelta = (i32, i32, usize, SolverStatsDelta);
#[derive(Debug, Clone)]
#[must_use]
pub struct BackwardResult {
pub cuts_generated: usize,
pub elapsed_ms: u64,
pub lp_solves: u64,
pub stage_stats: Vec<(usize, Vec<StageWorkerOpeningDelta>)>,
pub state_exchange_time_ms: u64,
pub cut_batch_build_time_ms: u64,
pub setup_time_ms: u64,
pub load_imbalance_ms: u64,
pub scheduling_overhead_ms: u64,
pub cut_sync_time_ms: u64,
}
pub(crate) struct StagedCut {
pub(crate) trial_point_idx: usize,
pub(crate) intercept: f64,
pub(crate) coefficients: Vec<f64>,
pub(crate) forward_pass_index: u32,
}
pub(crate) struct SuccessorSpec<'a> {
pub(crate) t: usize,
pub(crate) successor: usize,
pub(crate) my_rank: usize,
pub(crate) probabilities: &'a [f64],
pub(crate) cut_batch: &'a RowBatch,
pub(crate) num_cuts_at_successor: usize,
pub(crate) template_num_rows: usize,
pub(crate) baked_template: &'a cobre_solver::StageTemplate,
pub(crate) successor_active_slots: &'a [usize],
pub(crate) cut_activity_tolerance: f64,
pub(crate) basis_activity_window: u32,
pub(crate) successor_populated_count: usize,
pub(crate) successor_pool: &'a CutPool,
}
pub(crate) fn load_backward_lp<S: SolverInterface + Send>(
ws: &mut SolverWorkspace<S>,
succ: &SuccessorSpec<'_>,
) {
ws.solver.load_model(succ.baked_template);
if succ.cut_batch.num_rows > 0 {
ws.solver.add_rows(succ.cut_batch);
}
}
fn patch_opening_bounds<S: SolverInterface + Send>(
ws: &mut SolverWorkspace<S>,
ctx: &StageContext<'_>,
training_ctx: &TrainingContext<'_>,
raw_noise: &[f64],
x_hat: &[f64],
s: usize,
) {
let n_blks = if ctx.n_load_buses > 0 {
ctx.block_counts_per_stage[s]
} else {
0
};
transform_inflow_noise(raw_noise, s, x_hat, ctx, training_ctx, &mut ws.scratch);
transform_load_noise(
raw_noise,
ctx.n_hydros,
ctx.n_load_buses,
training_ctx.stochastic,
s,
n_blks,
&mut ws.scratch.load_rhs_buf,
);
let n_stochastic_ncs = training_ctx.stochastic.n_stochastic_ncs();
if n_stochastic_ncs > 0 {
transform_ncs_noise(
raw_noise,
&NcsNoiseOffsets {
n_hydros: ctx.n_hydros,
n_load_buses: ctx.n_load_buses,
},
training_ctx.stochastic,
s,
ctx.block_counts_per_stage[s],
ctx.ncs_max_gen,
ctx.ncs_allow_curtailment,
&mut ws.scratch.ncs_col_lower_buf,
&mut ws.scratch.ncs_col_upper_buf,
);
}
ws.patch_buf.fill_forward_patches(
training_ctx.indexer,
x_hat,
&ws.scratch.noise_buf,
ctx.base_rows[s],
&ctx.templates[s].row_scale,
);
if ctx.n_load_buses > 0 {
ws.patch_buf.fill_load_patches(
ctx.load_balance_row_starts[s],
n_blks,
&ws.scratch.load_rhs_buf,
ctx.load_bus_indices,
&ctx.templates[s].row_scale,
);
}
ws.patch_buf.fill_z_inflow_patches(
training_ctx.indexer.z_inflow_row_start,
&ws.scratch.z_inflow_rhs_buf,
&ctx.templates[s].row_scale,
);
let pc = ws.patch_buf.forward_patch_count();
ws.solver.set_row_bounds(
&ws.patch_buf.indices[..pc],
&ws.patch_buf.lower[..pc],
&ws.patch_buf.upper[..pc],
);
if n_stochastic_ncs > 0 && !training_ctx.indexer.ncs_generation.is_empty() {
let n_blks_stage = ctx.block_counts_per_stage[s];
let expected_len = n_stochastic_ncs * n_blks_stage;
if ws.scratch.ncs_col_indices_buf.len() != expected_len {
ws.scratch.ncs_col_indices_buf.clear();
for ncs_idx in 0..n_stochastic_ncs {
for blk in 0..n_blks_stage {
ws.scratch.ncs_col_indices_buf.push(
training_ctx.indexer.ncs_generation.start + ncs_idx * n_blks_stage + blk,
);
}
}
}
ws.solver.set_col_bounds(
&ws.scratch.ncs_col_indices_buf,
&ws.scratch.ncs_col_lower_buf,
&ws.scratch.ncs_col_upper_buf,
);
}
}
#[inline]
fn resolve_backward_basis<'a>(
basis_slice: &'a BasisStoreSliceMut<'_>,
m: usize,
s: usize,
) -> Option<&'a CapturedBasis> {
basis_slice.get(m, s)
}
fn extract_duals_from_view(
view: &SolutionView<'_>,
n_state: usize,
row_scale: &[f64],
succ: &SuccessorSpec<'_>,
state_duals: &mut Vec<f64>,
cut_duals: &mut Vec<f64>,
) -> f64 {
let objective = view.objective;
state_duals.clear();
if row_scale.is_empty() {
state_duals.extend_from_slice(&view.dual[..n_state]);
} else {
state_duals.extend(
view.dual[..n_state]
.iter()
.zip(row_scale)
.map(|(&d, &rs)| d * rs),
);
}
debug_assert_eq!(
state_duals.len(),
n_state,
"state_duals must contain exactly n_state entries after fill"
);
cut_duals.clear();
if succ.num_cuts_at_successor > 0 {
cut_duals.extend_from_slice(
&view.dual[succ.template_num_rows..succ.template_num_rows + succ.num_cuts_at_successor],
);
}
objective
}
fn accumulate_opening_outcome<S: SolverInterface + Send>(
ws: &mut SolverWorkspace<S>,
succ: &SuccessorSpec<'_>,
omega: usize,
objective: f64,
x_hat: &[f64],
stats_before: &SolverStatistics,
stats_after: &SolverStatistics,
) {
let opening_delta = SolverStatsDelta::from_snapshots(stats_before, stats_after);
SolverStatsDelta::accumulate_into(
&mut ws.backward_accum.per_opening_stats[omega],
&opening_delta,
);
let out = &mut ws.backward_accum.outcomes[omega];
out.coefficients
.copy_from_slice(&ws.backward_accum.state_duals_buf);
out.objective_value = objective;
out.intercept = objective
- out
.coefficients
.iter()
.zip(x_hat)
.map(|(pi, x)| pi * x)
.sum::<f64>();
for (cut_idx, &slot) in succ.successor_active_slots.iter().enumerate() {
if ws
.backward_accum
.cut_duals_buf
.get(cut_idx)
.is_some_and(|&d| d > succ.cut_activity_tolerance)
{
ws.backward_accum.slot_increments[slot] += 1;
}
}
}
fn save_basis_at_omega_zero<S: SolverInterface + Send>(
ws: &mut SolverWorkspace<S>,
succ: &SuccessorSpec<'_>,
basis_slice: &mut BasisStoreSliceMut<'_>,
m: usize,
x_hat: &[f64],
) {
let s = succ.successor;
let num_cols = succ.baked_template.num_cols;
let base_row_count = succ.template_num_rows;
let cut_row_count = succ.num_cuts_at_successor;
let basis_row_capacity = base_row_count + cut_row_count;
if let Some(captured) = basis_slice.get_mut(m, s).as_mut() {
ws.solver.get_basis(&mut captured.basis);
write_capture_metadata(
captured,
succ.successor_pool,
base_row_count,
cut_row_count,
x_hat,
);
} else {
let mut captured = CapturedBasis::new(
num_cols,
basis_row_capacity,
base_row_count,
cut_row_count,
x_hat.len(),
);
ws.solver.get_basis(&mut captured.basis);
write_capture_metadata(
&mut captured,
succ.successor_pool,
base_row_count,
cut_row_count,
x_hat,
);
*basis_slice.get_mut(m, s) = Some(captured);
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn process_trial_point_backward<S: SolverInterface + Send>(
ws: &mut SolverWorkspace<S>,
ctx: &StageContext<'_>,
training_ctx: &TrainingContext<'_>,
exchange: &ExchangeBuffers,
fwd_offset: usize,
iteration: u64,
risk_measures: &[RiskMeasure],
succ: &SuccessorSpec<'_>,
basis_slice: &mut BasisStoreSliceMut<'_>,
m: usize,
) -> Result<StagedCut, SddpError> {
let indexer = training_ctx.indexer;
let tree_view = training_ctx.stochastic.tree_view();
let x_hat = exchange.state_at(succ.my_rank, m);
let scenario = fwd_offset + m;
let s = succ.successor;
debug_assert_eq!(
ws.backward_accum.per_opening_stats.len(),
succ.probabilities.len(),
"per_opening_stats must be initialised to n_openings before each stage's trial-point loop"
);
for omega in 0..succ.probabilities.len() {
let raw_noise = tree_view.opening(s, omega);
patch_opening_bounds(ws, ctx, training_ctx, raw_noise, x_hat, s);
let mut state_duals = std::mem::take(&mut ws.backward_accum.state_duals_buf);
let mut cut_duals = std::mem::take(&mut ws.backward_accum.cut_duals_buf);
let stats_before_omega = ws.solver.statistics();
let stored_basis = if omega == 0 {
resolve_backward_basis(basis_slice, m, s)
} else {
None
};
let inputs = crate::stage_solve::StageInputs {
stage_context: ctx,
indexer,
pool: succ.successor_pool,
current_state: x_hat,
stored_basis,
baked_template: succ.baked_template,
stage_index: s,
scenario_index: scenario,
iteration: Some(iteration),
horizon_is_terminal: false,
terminal_has_boundary_cuts: false,
basis_activity_window: succ.basis_activity_window,
};
let outcome =
crate::stage_solve::run_stage_solve(ws, crate::stage_solve::Phase::Backward, &inputs)?;
let crate::stage_solve::StageOutcome::Backward {
view,
recon_stats: _,
} = outcome
else {
unreachable!("run_stage_solve(Phase::Backward) returns Backward variant")
};
let objective = extract_duals_from_view(
&view,
indexer.n_state,
&ctx.templates[s].row_scale,
succ,
&mut state_duals,
&mut cut_duals,
);
let _ = view;
ws.backward_accum.state_duals_buf = state_duals;
ws.backward_accum.cut_duals_buf = cut_duals;
let stats_after_omega = ws.solver.statistics();
accumulate_opening_outcome(
ws,
succ,
omega,
objective,
x_hat,
&stats_before_omega,
&stats_after_omega,
);
if omega == 0 {
save_basis_at_omega_zero(ws, succ, basis_slice, m, x_hat);
}
}
let n_openings = succ.probabilities.len();
let mut agg_intercept = 0.0_f64;
risk_measures[succ.t].aggregate_cut_into(
&ws.backward_accum.outcomes[..n_openings],
succ.probabilities,
&mut agg_intercept,
&mut ws.backward_accum.agg_coefficients,
&mut ws.backward_accum.risk_scratch,
);
let agg_coefficients = ws.backward_accum.agg_coefficients.clone();
debug_assert!(
u32::try_from(scenario).is_ok(),
"global scenario index overflows u32"
);
#[allow(clippy::cast_possible_truncation)]
let forward_pass_index = scenario as u32;
let pop = ws.backward_accum.slot_increments.len();
for slot in 0..pop {
let count = ws.backward_accum.slot_increments[slot];
if count > 0 {
ws.backward_accum.metadata_sync_contribution[slot] += count;
ws.backward_accum.metadata_sync_window_contribution[slot] |= 1u32;
}
}
Ok(StagedCut {
trial_point_idx: m,
intercept: agg_intercept,
coefficients: agg_coefficients,
forward_pass_index,
})
}
#[cfg(test)]
fn run_backward_pass<S: SolverInterface + Send, C: Communicator>(
inputs: &mut crate::backward_pass_state::BackwardPassInputs<'_, S, C>,
) -> Result<BackwardResult, SddpError> {
let n_workers_local = inputs.workspaces.len();
let n_ranks = inputs.comm.size();
let num_stages = inputs.training_ctx.horizon.num_stages();
let bwd_max_openings = (0..num_stages)
.map(|t| inputs.training_ctx.stochastic.opening_tree().n_openings(t))
.max()
.unwrap_or(0);
let real_states_capacity =
inputs.exchange.real_total_scenarios() * inputs.training_ctx.indexer.n_state;
let mut bwd_state = crate::backward_pass_state::BackwardPassState::new(
n_workers_local,
n_ranks,
bwd_max_openings,
real_states_capacity,
);
bwd_state.run(inputs)
}
#[cfg(test)]
mod tests {
use cobre_comm::{CommData, CommError, Communicator, ReduceOp};
use cobre_solver::{
Basis, LpSolution, RowBatch, SolverError, SolverInterface, SolverStatistics, StageTemplate,
};
use cobre_core::scenario::SamplingScheme;
use super::{BackwardResult, run_backward_pass};
use crate::{
context::{StageContext, TrainingContext},
cut::FutureCostFunction,
cut_sync::CutSyncBuffers,
horizon_mode::HorizonMode,
indexer::StageIndexer,
inflow_method::InflowNonNegativityMethod,
risk_measure::RiskMeasure,
solver_stats::SolverStatsDelta,
state_exchange::ExchangeBuffers,
trajectory::TrajectoryRecord,
workspace::{BackwardAccumulators, BasisStore, SolverWorkspace},
};
fn empty_cut_batches(n_stages: usize) -> Vec<RowBatch> {
(0..n_stages)
.map(|_| RowBatch {
num_rows: 0,
row_starts: Vec::new(),
col_indices: Vec::new(),
values: Vec::new(),
row_lower: Vec::new(),
row_upper: Vec::new(),
})
.collect()
}
struct StubComm;
impl Communicator for StubComm {
fn allgatherv<T: CommData>(
&self,
send: &[T],
recv: &mut [T],
_counts: &[usize],
_displs: &[usize],
) -> Result<(), CommError> {
recv[..send.len()].copy_from_slice(send);
Ok(())
}
fn allreduce<T: CommData>(
&self,
send: &[T],
recv: &mut [T],
_op: ReduceOp,
) -> Result<(), CommError> {
recv[..send.len()].copy_from_slice(send);
Ok(())
}
fn broadcast<T: CommData>(&self, _buf: &mut [T], _root: usize) -> Result<(), CommError> {
unreachable!("StubComm broadcast not used in backward pass tests")
}
fn barrier(&self) -> Result<(), CommError> {
Ok(())
}
fn rank(&self) -> usize {
0
}
fn size(&self) -> usize {
1
}
fn abort(&self, error_code: i32) -> ! {
std::process::exit(error_code)
}
}
struct MockSolver {
solution: LpSolution,
infeasible_at: Option<usize>,
call_count: usize,
current_num_rows: usize,
warm_start_calls: usize,
cut_dual_padding: f64,
buf_primal: Vec<f64>,
buf_dual: Vec<f64>,
buf_reduced_costs: Vec<f64>,
}
impl MockSolver {
fn always_ok(solution: LpSolution) -> Self {
let base_rows = solution.dual.len();
let buf_primal = solution.primal.clone();
let buf_dual = solution.dual.clone();
let buf_reduced_costs = solution.reduced_costs.clone();
Self {
solution,
infeasible_at: None,
call_count: 0,
current_num_rows: base_rows,
warm_start_calls: 0,
cut_dual_padding: 0.0,
buf_primal,
buf_dual,
buf_reduced_costs,
}
}
fn infeasible_on(solution: LpSolution, n: usize) -> Self {
let base_rows = solution.dual.len();
let buf_primal = solution.primal.clone();
let buf_dual = solution.dual.clone();
let buf_reduced_costs = solution.reduced_costs.clone();
Self {
solution,
infeasible_at: Some(n),
call_count: 0,
current_num_rows: base_rows,
warm_start_calls: 0,
cut_dual_padding: 0.0,
buf_primal,
buf_dual,
buf_reduced_costs,
}
}
fn always_ok_with_binding_cuts(solution: LpSolution) -> Self {
let mut s = Self::always_ok(solution);
s.cut_dual_padding = 1.0;
s
}
}
impl SolverInterface for MockSolver {
fn solver_name_version(&self) -> String {
"MockSolver 0.0.0".to_string()
}
fn load_model(&mut self, template: &StageTemplate) {
self.current_num_rows = template.num_rows;
}
fn add_rows(&mut self, cuts: &RowBatch) {
self.current_num_rows += cuts.num_rows;
}
fn set_row_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn set_col_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn solve(
&mut self,
basis: Option<&Basis>,
) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
if basis.is_some() {
self.warm_start_calls += 1;
}
let call = self.call_count;
self.call_count += 1;
if self.infeasible_at == Some(call) {
return Err(SolverError::Infeasible);
}
self.buf_primal.clone_from(&self.solution.primal);
self.buf_dual.clone_from(&self.solution.dual);
self.buf_dual
.resize(self.current_num_rows, self.cut_dual_padding);
self.buf_reduced_costs
.clone_from(&self.solution.reduced_costs);
Ok(cobre_solver::SolutionView {
objective: self.solution.objective,
primal: &self.buf_primal,
dual: &self.buf_dual,
reduced_costs: &self.buf_reduced_costs,
iterations: self.solution.iterations,
solve_time_seconds: self.solution.solve_time_seconds,
})
}
fn get_basis(&mut self, _out: &mut Basis) {}
fn statistics(&self) -> SolverStatistics {
SolverStatistics::default()
}
fn name(&self) -> &'static str {
"Mock"
}
}
fn minimal_template_1_0() -> StageTemplate {
StageTemplate {
num_cols: 3,
num_rows: 1,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![0.0],
row_upper: vec![0.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
}
}
fn solution_1_0(objective: f64, dual_storage: f64) -> LpSolution {
LpSolution {
objective,
primal: vec![0.0, 0.0, 0.0],
dual: vec![dual_storage],
reduced_costs: vec![0.0; 3],
iterations: 0,
solve_time_seconds: 0.0,
}
}
fn single_workspace(solver: MockSolver, n_state: usize) -> Vec<SolverWorkspace<MockSolver>> {
use crate::lp_builder::PatchBuffer;
vec![SolverWorkspace {
rank: 0,
worker_id: 0,
solver,
patch_buf: PatchBuffer::new(1, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
current_state_scratch: Vec::new(),
recon_slot_lookup: Vec::new(),
promotion_scratch: crate::basis_reconstruct::PromotionScratch::default(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
}]
}
fn empty_basis_store(num_scenarios: usize, num_stages: usize) -> BasisStore {
BasisStore::new(num_scenarios, num_stages)
}
fn basis_store_with_one(
num_scenarios: usize,
num_stages: usize,
scenario: usize,
stage: usize,
basis: Basis,
) -> BasisStore {
let mut store = BasisStore::new(num_scenarios, num_stages);
*store.get_mut(scenario, stage) = Some(crate::workspace::CapturedBasis {
basis,
base_row_count: 0,
cut_row_slots: Vec::new(),
state_at_capture: Vec::new(),
});
store
}
fn exchange_with_states(n_state: usize, states: Vec<Vec<f64>>) -> ExchangeBuffers {
use cobre_comm::LocalBackend;
let local_count = states.len();
let mut bufs = ExchangeBuffers::new(n_state, local_count, 1);
let records: Vec<TrajectoryRecord> = states
.into_iter()
.map(|state| TrajectoryRecord {
primal: vec![],
dual: vec![],
stage_cost: 0.0,
state,
})
.collect();
let comm = LocalBackend;
bufs.exchange(&records, 0, 1, &comm).unwrap();
bufs
}
#[allow(clippy::too_many_lines)]
fn make_stochastic_context(
n_stages: usize,
branching_factor: usize,
) -> cobre_stochastic::StochasticContext {
use chrono::NaiveDate;
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::{
Bus, DeficitSegment, EntityId, SystemBuilder,
scenario::{
CorrelationEntity, CorrelationGroup, CorrelationModel, CorrelationProfile,
InflowModel,
},
temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, Stage, StageRiskConfig,
StageStateConfig,
},
};
use cobre_stochastic::context::{
ClassSchemes, OpeningTreeInputs, build_stochastic_context,
};
use std::collections::BTreeMap;
let bus = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(1),
name: "H1".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity,
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
specific_productivity_mw_per_m3s_per_m: None,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let make_stage = |idx: usize| Stage {
index: idx,
id: idx as i32,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor,
noise_method: NoiseMethod::Saa,
},
};
let stages: Vec<Stage> = (0..n_stages).map(make_stage).collect();
#[allow(clippy::cast_possible_truncation)]
let inflow = |stage_idx: usize| InflowModel {
hydro_id: EntityId(1),
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
stage_id: stage_idx as i32,
mean_m3s: 100.0,
std_m3s: 30.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
};
let inflow_models: Vec<InflowModel> = (0..n_stages).map(inflow).collect();
let mut profiles = BTreeMap::new();
profiles.insert(
"default".to_string(),
CorrelationProfile {
groups: vec![CorrelationGroup {
name: "g1".to_string(),
entities: vec![CorrelationEntity {
entity_type: "inflow".to_string(),
id: EntityId(1),
}],
matrix: vec![vec![1.0]],
}],
},
);
let correlation = CorrelationModel {
method: "spectral".to_string(),
profiles,
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus])
.hydros(vec![hydro])
.stages(stages)
.inflow_models(inflow_models)
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(
&system,
42,
None,
&[],
&[],
OpeningTreeInputs::default(),
ClassSchemes {
inflow: Some(SamplingScheme::InSample),
load: Some(SamplingScheme::InSample),
ncs: Some(SamplingScheme::InSample),
},
)
.unwrap()
}
#[test]
fn backward_result_fields_accessible() {
let r = BackwardResult {
cuts_generated: 6,
elapsed_ms: 42,
lp_solves: 0,
stage_stats: Vec::new(),
state_exchange_time_ms: 0,
cut_batch_build_time_ms: 0,
setup_time_ms: 0,
load_imbalance_ms: 0,
scheduling_overhead_ms: 0,
cut_sync_time_ms: 0,
};
assert_eq!(r.cuts_generated, 6);
assert_eq!(r.elapsed_ms, 42);
assert!(r.stage_stats.is_empty());
assert_eq!(r.state_exchange_time_ms, 0);
assert_eq!(r.cut_batch_build_time_ms, 0);
assert_eq!(r.setup_time_ms, 0);
assert_eq!(r.load_imbalance_ms, 0);
assert_eq!(r.scheduling_overhead_ms, 0);
assert_eq!(r.cut_sync_time_ms, 0);
}
#[test]
fn backward_result_clone_and_debug() {
let r = BackwardResult {
cuts_generated: 3,
elapsed_ms: 100,
lp_solves: 0,
stage_stats: Vec::new(),
state_exchange_time_ms: 0,
cut_batch_build_time_ms: 0,
setup_time_ms: 0,
load_imbalance_ms: 0,
scheduling_overhead_ms: 0,
cut_sync_time_ms: 0,
};
let c = r.clone();
assert_eq!(c.cuts_generated, 3);
let s = format!("{r:?}");
assert!(s.contains("BackwardResult"));
}
#[test]
fn dual_extraction_formula_coefficients_are_negated_duals() {
let d0 = 3.5_f64;
let d1 = -1.2_f64;
let dual = [d0, d1];
let coefficients: Vec<f64> = dual.iter().map(|&d| -d).collect();
assert!((coefficients[0] - (-d0)).abs() < f64::EPSILON);
assert!((coefficients[1] - (-d1)).abs() < f64::EPSILON);
}
#[test]
fn intercept_formula_matches_spec() {
let objective = 50.0_f64;
let coefficients = [2.0_f64, -1.0_f64];
let x_hat = [10.0_f64, 5.0_f64];
let pi_dot_x: f64 = coefficients
.iter()
.zip(x_hat.iter())
.map(|(p, x)| p * x)
.sum();
let intercept = objective - pi_dot_x;
assert!((intercept - 35.0).abs() < f64::EPSILON);
}
#[test]
fn single_stage_system_produces_no_cuts() {
let stochastic = make_stochastic_context(1, 2);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0()];
let base_rows = vec![1_usize];
let n_state = indexer.n_state;
let n_stages = 1_usize;
let forward_passes = 2_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(result.cuts_generated, 0);
assert_eq!(fcf.total_active_cuts(), 0);
}
#[test]
fn two_stage_system_two_trial_points_generates_two_cuts_at_stage_0() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0); let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state; let forward_passes = 2_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(result.cuts_generated, 2);
assert_eq!(fcf.active_cuts(0).count(), 2);
assert_eq!(fcf.active_cuts(1).count(), 0);
}
#[test]
fn cut_inserted_with_correct_stage_iteration_and_forward_pass_index() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 3_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 20, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![5.0], vec![10.0], vec![15.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(50.0, 0.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 2,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let meta = &fcf.pools[0].metadata[7];
assert_eq!(meta.iteration_generated, 2);
assert_eq!(meta.forward_pass_index, 1);
}
#[test]
fn no_cuts_generated_at_last_stage() {
let n_stages = 5_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -3.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(result.cuts_generated, 4);
for t in 0..4 {
assert_eq!(fcf.active_cuts(t).count(), 1, "stage {t} should have 1 cut");
}
assert_eq!(fcf.active_cuts(4).count(), 0, "stage 4 must have no cuts");
}
#[test]
fn elapsed_ms_is_non_negative() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 2);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![5.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(10.0, 0.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let _ = result.elapsed_ms;
}
#[test]
fn infeasible_solver_returns_sddp_infeasible_error() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(0.0, 0.0);
let solver = MockSolver::infeasible_on(solution, 0);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
});
assert!(
matches!(result, Err(crate::SddpError::Infeasible { .. })),
"expected SddpError::Infeasible, got: {result:?}",
);
}
#[test]
fn expectation_aggregation_mean_of_per_opening_intercepts() {
use crate::risk_measure::BackwardOutcome as BO;
let outcomes = vec![
BO {
intercept: 10.0,
coefficients: vec![],
objective_value: 10.0,
},
BO {
intercept: 20.0,
coefficients: vec![],
objective_value: 20.0,
},
BO {
intercept: 30.0,
coefficients: vec![],
objective_value: 30.0,
},
];
let probs = vec![1.0 / 3.0; 3];
let (intercept, _) = RiskMeasure::Expectation.aggregate_cut(&outcomes, &probs);
assert!(
(intercept - 20.0).abs() < 1e-10,
"expected 20.0, got {intercept}"
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn cut_coefficients_and_intercept_match_dual_extraction_formula() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(80.0, -3.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let cuts: Vec<_> = fcf.active_cuts(0).collect();
assert_eq!(cuts.len(), 1);
let (_, intercept, coefficients) = &cuts[0];
assert!(
(intercept - 110.0).abs() < 1e-10,
"expected intercept=110.0, got {intercept}"
);
assert_eq!(coefficients.len(), 1);
assert!(
(coefficients[0] - (-3.0)).abs() < 1e-10,
"expected coefficient=-3.0, got {}",
coefficients[0]
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn cut_gradient_sign_physically_correct() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![50.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -2.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let cuts: Vec<_> = fcf.active_cuts(0).collect();
assert_eq!(cuts.len(), 1, "expected exactly one cut");
let (_, _intercept, coefficients) = &cuts[0];
assert!(
coefficients[0] < 0.0,
"cut coefficient must be negative (more storage → less future cost), \
got {} — likely the Benders cut sign bug has been reintroduced",
coefficients[0]
);
assert!(
(coefficients[0] - (-2.0)).abs() < 1e-10,
"expected coefficient=-2.0, got {}",
coefficients[0]
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn cut_is_tight_at_trial_point() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let x_hat = 30.0_f64;
let mut exchange = exchange_with_states(n_state, vec![vec![x_hat]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let q_xhat = 200.0_f64; let dual_storage = -4.0_f64;
let solution = solution_1_0(q_xhat, dual_storage);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let cuts: Vec<_> = fcf.active_cuts(0).collect();
assert_eq!(cuts.len(), 1);
let (_, intercept, coefficients) = &cuts[0];
let cut_at_xhat = intercept + coefficients[0] * x_hat;
assert!(
(cut_at_xhat - q_xhat).abs() < 1e-10,
"cut must be tight at trial point: \
cut_value={cut_at_xhat}, Q(x̂)={q_xhat}, \
intercept={intercept}, coeff={}, x̂={x_hat}",
coefficients[0]
);
}
#[test]
fn single_rank_backward_pass_with_local_backend_produces_correct_fcf() {
use cobre_comm::LocalBackend;
let n_stages = 3_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 2_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = LocalBackend;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(result.cuts_generated, 4);
assert_eq!(fcf.active_cuts(0).count(), 2);
assert_eq!(fcf.active_cuts(1).count(), 2);
assert_eq!(fcf.active_cuts(2).count(), 0);
}
#[test]
#[allow(clippy::too_many_lines)]
fn forward_pass_index_matches_global_scenario_index() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 6_u32; let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 20, &vec![0; n_stages]);
let mut exchange = exchange_with_states(
n_state,
vec![
vec![1.0],
vec![2.0],
vec![3.0],
vec![4.0],
vec![5.0],
vec![6.0],
],
);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(50.0, 0.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 2,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let meta = &fcf.pools[0].metadata[17];
assert_eq!(meta.iteration_generated, 2, "iteration_generated must be 2");
assert_eq!(
meta.forward_pass_index, 5,
"forward_pass_index must be 5 (= global m)"
);
}
#[test]
fn warm_start_uses_prepopulated_forward_basis() {
let n_stages = 2_usize;
let n_openings = 1_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let pre_basis = Basis::new(templates[1].num_cols, templates[1].num_rows);
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store =
basis_store_with_one(exchange.local_count(), n_stages, 0, 1, pre_basis);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let warm_start_calls = workspaces[0].solver.warm_start_calls;
assert_eq!(
warm_start_calls, 1,
"first opening at successor stage must call solve(Some(&basis)) \
when basis_store.get(0, 1) is pre-populated (warm_start_calls == 1, got {warm_start_calls})"
);
}
#[test]
fn multi_opening_subsequent_openings_use_internal_hotstart() {
let n_stages = 2_usize;
let n_openings = 3_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let warm_start_calls = workspaces[0].solver.warm_start_calls;
assert_eq!(
warm_start_calls, 0,
"P3b: no warm-start calls expected when BasisStore is empty \
(warm_start_calls == 0, got {warm_start_calls})"
);
}
#[test]
fn backward_solver_error_propagates() {
let n_stages = 2_usize;
let n_openings = 1_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(0.0, 0.0);
let solver = MockSolver::infeasible_on(solution, 0);
let comm = StubComm;
let pre_basis = Basis::new(templates[1].num_cols, templates[1].num_rows);
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store =
basis_store_with_one(exchange.local_count(), n_stages, 0, 1, pre_basis);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
});
assert!(
matches!(result, Err(crate::SddpError::Infeasible { .. })),
"expected SddpError::Infeasible, got: {result:?}",
);
assert!(
basis_store.get(0, 1).is_some(),
"BasisStore must not be mutated by the backward pass error path"
);
}
#[test]
#[allow(
clippy::too_many_lines,
clippy::cast_possible_truncation,
clippy::cast_precision_loss
)]
fn test_backward_pass_parallel_cut_determinism() {
use crate::lp_builder::PatchBuffer;
let n_stages = 3_usize;
let n_openings = 2_usize;
let n_trial_points = 8_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
#[allow(clippy::cast_possible_truncation)]
let forward_passes = n_trial_points as u32;
let states: Vec<Vec<f64>> = (0..n_trial_points).map(|i| vec![i as f64 + 1.0]).collect();
let mut exchange = exchange_with_states(n_state, states);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let comm = StubComm;
let mut fcf_1 =
FutureCostFunction::new(n_stages, n_state, forward_passes, 20, &vec![0; n_stages]);
let solver_1 = MockSolver::always_ok(solution.clone());
let mut workspaces_1 = vec![SolverWorkspace {
rank: 0,
worker_id: 0,
solver: solver_1,
patch_buf: PatchBuffer::new(1, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
current_state_scratch: Vec::new(),
recon_slot_lookup: Vec::new(),
promotion_scratch: crate::basis_reconstruct::PromotionScratch::default(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
}];
let mut basis_store_1 = empty_basis_store(exchange.local_count(), n_stages);
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces_1,
basis_store: &mut basis_store_1,
ctx: &ctx,
baked: &templates,
fcf: &mut fcf_1,
cut_batches: &mut empty_cut_batches(n_stages),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let mut fcf_4 =
FutureCostFunction::new(n_stages, n_state, forward_passes, 20, &vec![0; n_stages]);
let mut workspaces_4: Vec<SolverWorkspace<MockSolver>> = (0..4_i32)
.map(|idx| SolverWorkspace {
rank: 0,
worker_id: idx,
solver: MockSolver::always_ok(solution.clone()),
patch_buf: PatchBuffer::new(1, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
current_state_scratch: Vec::new(),
recon_slot_lookup: Vec::new(),
promotion_scratch: crate::basis_reconstruct::PromotionScratch::default(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
})
.collect();
let mut basis_store_4 = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces_4,
basis_store: &mut basis_store_4,
ctx: &ctx,
baked: &templates,
fcf: &mut fcf_4,
cut_batches: &mut empty_cut_batches(n_stages),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
for t in 0..(n_stages - 1) {
let cuts_1: Vec<_> = fcf_1.active_cuts(t).collect();
let cuts_4: Vec<_> = fcf_4.active_cuts(t).collect();
assert_eq!(
cuts_1.len(),
cuts_4.len(),
"stage {t}: cut count differs (1 workspace: {}, 4 workspaces: {})",
cuts_1.len(),
cuts_4.len()
);
for (idx, ((slot_1, intercept_1, coeff_1), (slot_4, intercept_4, coeff_4))) in
cuts_1.iter().zip(cuts_4.iter()).enumerate()
{
assert_eq!(
slot_1, slot_4,
"stage {t} cut {idx}: slot mismatch ({slot_1} vs {slot_4})"
);
assert!(
(intercept_1 - intercept_4).abs() < 1e-12,
"stage {t} cut {idx}: intercept mismatch ({intercept_1} vs {intercept_4})"
);
assert_eq!(
coeff_1.len(),
coeff_4.len(),
"stage {t} cut {idx}: coefficient vector length mismatch"
);
for (j, (c1, c4)) in coeff_1.iter().zip(coeff_4.iter()).enumerate() {
assert!(
(c1 - c4).abs() < 1e-12,
"stage {t} cut {idx} coeff[{j}]: {c1} vs {c4}"
);
}
}
}
assert_eq!(fcf_1.active_cuts(n_stages - 1).count(), 0);
assert_eq!(fcf_4.active_cuts(n_stages - 1).count(), 0);
}
#[allow(clippy::too_many_lines)]
fn make_stochastic_context_with_load(
n_stages: usize,
branching_factor: usize,
mean_mw: f64,
std_mw: f64,
) -> cobre_stochastic::StochasticContext {
use chrono::NaiveDate;
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::scenario::{CorrelationModel, InflowModel, LoadModel};
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, Stage, StageRiskConfig,
StageStateConfig,
};
use cobre_core::{Bus, DeficitSegment, EntityId, SystemBuilder};
use cobre_stochastic::context::{
ClassSchemes, OpeningTreeInputs, build_stochastic_context,
};
let bus0 = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let bus1 = Bus {
id: EntityId(1),
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(10),
name: "H10".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity,
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
specific_productivity_mw_per_m3s_per_m: None,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let make_stage = |idx: usize| Stage {
index: idx,
id: idx as i32,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor,
noise_method: NoiseMethod::Saa,
},
};
let stages: Vec<Stage> = (0..n_stages).map(make_stage).collect();
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let inflow_models: Vec<InflowModel> = (0..n_stages)
.map(|idx| InflowModel {
hydro_id: EntityId(10),
stage_id: idx as i32,
mean_m3s: 100.0,
std_m3s: 30.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
})
.collect();
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let load_models: Vec<LoadModel> = (0..n_stages)
.map(|idx| LoadModel {
bus_id: EntityId(1),
stage_id: idx as i32,
mean_mw,
std_mw,
})
.collect();
let correlation = CorrelationModel {
method: "spectral".to_string(),
profiles: std::collections::BTreeMap::new(),
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus0, bus1])
.hydros(vec![hydro])
.stages(stages)
.inflow_models(inflow_models)
.load_models(load_models)
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(
&system,
42,
None,
&[],
&[],
OpeningTreeInputs::default(),
ClassSchemes {
inflow: Some(SamplingScheme::InSample),
load: Some(SamplingScheme::InSample),
ncs: Some(SamplingScheme::InSample),
},
)
.unwrap()
}
#[test]
#[allow(clippy::too_many_lines)]
fn backward_pass_load_patches_applied() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context_with_load(n_stages, n_openings, 300.0, 30.0);
let indexer = StageIndexer::new(1, 0);
let patch_buf = crate::lp_builder::PatchBuffer::new(1, 0, 1, 1);
let template = StageTemplate {
num_cols: 3,
num_rows: 2,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![50.0, 100.0],
row_upper: vec![50.0, 100.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
};
let templates = vec![template; n_stages];
let base_rows = vec![1_usize; n_stages];
let noise_scale = vec![1.0_f64; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -2.0);
let ws = SolverWorkspace {
rank: 0,
worker_id: 0,
solver: MockSolver::always_ok(solution),
patch_buf,
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::with_capacity(1),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
current_state_scratch: Vec::new(),
recon_slot_lookup: Vec::new(),
promotion_scratch: crate::basis_reconstruct::PromotionScratch::default(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
};
let mut workspaces = vec![ws];
let comm = StubComm;
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let load_balance_row_starts = vec![10_usize; n_stages];
let load_bus_indices = vec![0_usize];
let block_counts_per_stage = vec![1_usize; n_stages];
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses: 1,
load_balance_row_starts: &load_balance_row_starts,
load_bus_indices: &load_bus_indices,
block_counts_per_stage: &block_counts_per_stage,
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(
workspaces[0].scratch.load_rhs_buf.len(),
1,
"load_rhs_buf must have 1 entry (1 load bus × 1 block)"
);
assert!(
workspaces[0].scratch.load_rhs_buf[0] > 0.0,
"load realization must be positive with mean=300, std=30: got {}",
workspaces[0].scratch.load_rhs_buf[0]
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn backward_pass_no_load_buses_unchanged() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let patch_buf = crate::lp_builder::PatchBuffer::new(1, 0, 0, 0);
let template = StageTemplate {
num_cols: 3,
num_rows: 2,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![50.0, 100.0],
row_upper: vec![50.0, 100.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
};
let templates = vec![template; n_stages];
let base_rows = vec![1_usize; n_stages];
let noise_scale = vec![1.0_f64; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -2.0);
let ws = SolverWorkspace {
rank: 0,
worker_id: 0,
solver: MockSolver::always_ok(solution),
patch_buf,
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
current_state_scratch: Vec::new(),
recon_slot_lookup: Vec::new(),
promotion_scratch: crate::basis_reconstruct::PromotionScratch::default(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
};
let mut workspaces = vec![ws];
let comm = StubComm;
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1_usize; 2],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(
workspaces[0].patch_buf.forward_patch_count(),
3,
"forward_patch_count must be N*(2+L)+N=3 when n_load_buses=0, got {}",
workspaces[0].patch_buf.forward_patch_count()
);
assert!(
workspaces[0].scratch.load_rhs_buf.is_empty(),
"load_rhs_buf must be empty when n_load_buses=0"
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn backward_pass_cut_coefficients_unaffected() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context_with_load(n_stages, n_openings, 200.0, 20.0);
let indexer = StageIndexer::new(1, 0);
let patch_buf = crate::lp_builder::PatchBuffer::new(1, 0, 1, 1);
let template = StageTemplate {
num_cols: 3,
num_rows: 2,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![50.0, 100.0],
row_upper: vec![50.0, 100.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
};
let templates = vec![template; n_stages];
let base_rows = vec![1_usize; n_stages];
let noise_scale = vec![1.0_f64; n_stages];
let n_state = indexer.n_state; let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(80.0, -3.0);
let ws = SolverWorkspace {
rank: 0,
worker_id: 0,
solver: MockSolver::always_ok(solution),
patch_buf,
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::with_capacity(1),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
current_state_scratch: Vec::new(),
recon_slot_lookup: Vec::new(),
promotion_scratch: crate::basis_reconstruct::PromotionScratch::default(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
};
let mut workspaces = vec![ws];
let comm = StubComm;
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let load_balance_row_starts = vec![10_usize; n_stages];
let load_bus_indices = vec![0_usize];
let block_counts_per_stage = vec![1_usize; n_stages];
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses: 1,
load_balance_row_starts: &load_balance_row_starts,
load_bus_indices: &load_bus_indices,
block_counts_per_stage: &block_counts_per_stage,
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(result.cuts_generated, 1);
let cuts: Vec<_> = fcf.active_cuts(0).collect();
assert_eq!(cuts.len(), 1);
let (_, _intercept, coefficients) = &cuts[0];
assert_eq!(
coefficients.len(),
n_state,
"cut coefficients length must be n_state={n_state}, got {} — \
load buses must not add state variables",
coefficients.len()
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn per_stage_cut_sync_invariant_after_bug1_fix() {
use cobre_comm::LocalBackend;
let n_stages = 4_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 3_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 20, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0], vec![30.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = LocalBackend;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, forward_passes as usize, 1);
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 1,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(result.cuts_generated, 9);
assert_eq!(fcf.active_cuts(0).count(), 3, "stage 0 must have 3 cuts");
assert_eq!(fcf.active_cuts(1).count(), 3, "stage 1 must have 3 cuts");
assert_eq!(fcf.active_cuts(2).count(), 3, "stage 2 must have 3 cuts");
assert_eq!(
fcf.active_cuts(3).count(),
0,
"terminal stage must have 0 cuts"
);
assert!(
result.cut_sync_time_ms < 10_000,
"cut_sync_time_ms should be reasonable, got {}",
result.cut_sync_time_ms
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn metadata_sync_updates_active_count_and_last_active_iter() {
use cobre_comm::LocalBackend;
let n_stages = 3_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 3_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 20, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0], vec![30.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok_with_binding_cuts(solution);
let comm = LocalBackend;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, forward_passes as usize, 1);
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 1,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(result.cuts_generated, 6);
assert_eq!(fcf.pools[1].populated_count, 6);
for slot in 3..6 {
assert!(
fcf.pools[1].metadata[slot].active_count > 0,
"slot {slot} active_count should be > 0 (cuts were binding)"
);
assert_eq!(
fcf.pools[1].metadata[slot].active_count, 6,
"slot {slot} active_count should be 6 (3 trial points × 2 openings)"
);
assert_eq!(
fcf.pools[1].metadata[slot].last_active_iter, 1,
"slot {slot} last_active_iter should be 1 (current iteration)"
);
}
for slot in 3..6 {
assert_eq!(
fcf.pools[1].metadata[slot].active_window & 1,
1,
"slot {slot} active_window bit 0 should be set (cut was binding this iteration)"
);
}
for slot in 0..3 {
assert_eq!(
fcf.pools[1].metadata[slot].active_window, 0,
"slot {slot} active_window should be 0 (cut was not binding)"
);
}
assert_eq!(fcf.pools[2].populated_count, 0);
}
#[test]
fn active_window_pre_allocation_is_zero() {
use crate::cut::CutPool;
let n_state = 1;
let capacity = 8;
let pool = CutPool::new(capacity, n_state, 3, 0);
for m in &pool.metadata {
assert_eq!(
m.active_window, 0,
"newly allocated CutMetadata must have active_window == 0 before add_cut"
);
}
}
#[test]
fn add_cut_seeds_active_window_with_seed_bit() {
use crate::basis_reconstruct::{DEFAULT_RECENT_WINDOW_BITS, SEED_BIT};
use crate::cut::CutPool;
let mut pool = CutPool::new(8, 1, 3, 0);
pool.add_cut(
1,
0,
0.5,
&[1.0_f64],
);
assert_eq!(
pool.metadata[3].active_window, SEED_BIT,
"add_cut must seed active_window = SEED_BIT \
so the classifier treats the generating event as a bind signal."
);
assert_ne!(
pool.metadata[3].active_window & (DEFAULT_RECENT_WINDOW_BITS | SEED_BIT),
0,
"the seed must fire the classifier's new-cut LOWER branch"
);
assert_eq!(
SEED_BIT & DEFAULT_RECENT_WINDOW_BITS,
0,
"SEED_BIT must not overlap DEFAULT_RECENT_WINDOW_BITS"
);
}
#[test]
fn seed_bit_cleared_by_end_of_iter_shift() {
use crate::basis_reconstruct::{DEFAULT_RECENT_WINDOW_BITS, SEED_BIT};
let mut aw: u32 = SEED_BIT;
aw = (aw & !SEED_BIT) << 1;
assert_eq!(
aw, 0,
"pure seed with no binding observation must shift to 0"
);
let mut aw: u32 = SEED_BIT | 0b1;
aw = (aw & !SEED_BIT) << 1;
assert_eq!(
aw, 0b10,
"observed-binding bit 0 must survive and land at bit 1; \
seed bit must be cleared"
);
assert_ne!(
aw & DEFAULT_RECENT_WINDOW_BITS,
0,
"surviving bit 1 still fires the classifier in iter i+1 (real activity)"
);
}
#[test]
fn active_window_shift_clears_bit_zero() {
let mut window: u32 = 1; window <<= 1;
assert_eq!(window & 1, 0, "shift must clear bit 0");
assert_eq!(window & 2, 2, "shift must move bit 0 to bit 1");
let mut w: u32 = 0xFFFF_FFFF;
for _ in 0..32 {
w <<= 1;
}
assert_eq!(w, 0, "32 left-shifts of u32 must overflow to 0");
let mut w2: u32 = 0;
w2 <<= 1;
assert_eq!(w2, 0, "shift of 0 must remain 0");
}
#[test]
#[allow(clippy::too_many_lines)]
fn active_window_or_reduction_round_trip_local() {
use cobre_comm::LocalBackend;
let n_stages = 3_usize;
let n_openings = 1_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 20, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0], vec![30.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let local_count = exchange.local_count();
let solver = MockSolver::always_ok_with_binding_cuts(solution);
let comm = LocalBackend;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(local_count, n_stages);
let mut csb = CutSyncBuffers::new(n_state, local_count, 1);
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 1,
local_work: local_count,
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let active_slots: Vec<usize> = fcf.pools[1]
.active_cuts()
.map(|(slot, _, _)| slot)
.collect();
assert!(
!active_slots.is_empty(),
"pool[1] must have at least one active cut"
);
for slot in active_slots {
let window = fcf.pools[1].metadata[slot].active_window;
assert_eq!(
window & 1,
1,
"slot {slot} active_window bit 0 must be set after a binding backward pass \
(got {window:#010x})"
);
}
}
#[test]
fn active_window_cleared_at_iteration_start() {
use cobre_comm::LocalBackend;
let n_stages = 3_usize;
let n_openings = 1_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 20, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0], vec![30.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let local_count = exchange.local_count();
let solver = MockSolver::always_ok_with_binding_cuts(solution);
let comm = LocalBackend;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(local_count, n_stages);
let mut csb = CutSyncBuffers::new(n_state, local_count, 1);
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 1,
local_work: local_count,
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
for ws in &workspaces {
for &v in &ws.backward_accum.metadata_sync_window_contribution {
assert_eq!(
v, 0,
"metadata_sync_window_contribution must be cleared after backward pass"
);
}
}
}
#[allow(clippy::too_many_lines, clippy::cast_precision_loss)]
fn run_backward_pass_with_n_workers(n_workers: usize) -> FutureCostFunction {
use crate::lp_builder::PatchBuffer;
let n_stages = 2_usize;
let local_work = 6_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
#[allow(clippy::cast_possible_truncation)]
let forward_passes = local_work as u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 64, &vec![0; n_stages]);
let states: Vec<Vec<f64>> = (0..local_work)
.map(|i| vec![(i + 1) as f64 * 10.0])
.collect();
let mut exchange = exchange_with_states(n_state, states);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let mut workspaces: Vec<SolverWorkspace<MockSolver>> = (0..n_workers)
.map(|idx| SolverWorkspace {
rank: 0,
worker_id: i32::try_from(idx).expect("worker_id fits in i32"),
solver: MockSolver::always_ok(solution.clone()),
patch_buf: PatchBuffer::new(1, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
current_state_scratch: Vec::new(),
recon_slot_lookup: Vec::new(),
promotion_scratch: crate::basis_reconstruct::PromotionScratch::default(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
})
.collect();
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let comm = StubComm;
let mut csb = CutSyncBuffers::new(n_state, local_work, 1);
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(
result.cuts_generated, local_work,
"n_workers={n_workers}: expected {local_work} cuts, got {}",
result.cuts_generated,
);
fcf
}
#[test]
fn work_stealing_produces_identical_results_across_worker_counts() {
let fcf_1 = run_backward_pass_with_n_workers(1);
let fcf_3 = run_backward_pass_with_n_workers(3);
let num_stages = 2;
assert!(
fcf_1.active_cuts(0).count() > 0,
"1-worker run produced no cuts at stage 0"
);
for stage in 0..num_stages {
let cuts_1: Vec<_> = fcf_1.active_cuts(stage).collect();
let cuts_3: Vec<_> = fcf_3.active_cuts(stage).collect();
assert_eq!(
cuts_1.len(),
cuts_3.len(),
"stage {stage}: cut count mismatch ({} vs {})",
cuts_1.len(),
cuts_3.len(),
);
for (i, ((s1, int1, c1), (s3, int3, c3))) in cuts_1.iter().zip(&cuts_3).enumerate() {
assert_eq!(
s1, s3,
"stage {stage}, cut {i}: slot mismatch ({s1} vs {s3})"
);
assert_eq!(
int1, int3,
"stage {stage}, cut {i}: intercept mismatch ({int1} vs {int3})"
);
assert_eq!(
c1, c3,
"stage {stage}, cut {i}: coefficients mismatch ({c1:?} vs {c3:?})"
);
}
}
}
fn make_stats(
solve_s: f64,
load_s: f64,
set_bounds_s: f64,
basis_set_s: f64,
) -> SolverStatistics {
SolverStatistics {
total_solve_time_seconds: solve_s,
total_load_model_time_seconds: load_s,
total_set_bounds_time_seconds: set_bounds_s,
total_basis_set_time_seconds: basis_set_s,
..SolverStatistics::default()
}
}
fn decompose_overhead(
pairs: &[(SolverStatistics, SolverStatistics)],
parallel_wall_ms: u64,
) -> (u64, u64, u64) {
use crate::solver_stats::SolverStatsDelta;
#[allow(clippy::cast_precision_loss)]
let n_workers = pairs.len() as f64;
let worker_deltas: Vec<SolverStatsDelta> = pairs
.iter()
.map(|(before, after)| SolverStatsDelta::from_snapshots(before, after))
.collect();
let stage_setup_ms: f64 = worker_deltas
.iter()
.map(|d| d.load_model_time_ms + d.set_bounds_time_ms + d.basis_set_time_ms)
.sum();
let worker_totals: Vec<f64> = worker_deltas
.iter()
.map(|d| {
d.solve_time_ms + d.load_model_time_ms + d.set_bounds_time_ms + d.basis_set_time_ms
})
.collect();
let max_worker_ms = worker_totals.iter().copied().fold(0.0_f64, f64::max);
let avg_worker_ms = if worker_totals.is_empty() {
0.0_f64
} else {
worker_totals.iter().sum::<f64>() / n_workers
};
let stage_imbalance_ms = (max_worker_ms - avg_worker_ms).max(0.0);
#[allow(clippy::cast_precision_loss)]
let stage_scheduling_ms = (parallel_wall_ms as f64 - max_worker_ms).max(0.0);
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
(
stage_setup_ms as u64,
stage_imbalance_ms as u64,
stage_scheduling_ms as u64,
)
}
#[test]
fn decompose_four_workers_different_solve_times() {
let zero = SolverStatistics::default();
let pairs = vec![
(zero.clone(), make_stats(0.1, 0.0, 0.0, 0.0)), (zero.clone(), make_stats(0.2, 0.0, 0.0, 0.0)), (zero.clone(), make_stats(0.15, 0.0, 0.0, 0.0)), (zero.clone(), make_stats(0.18, 0.0, 0.0, 0.0)), ];
let (setup_ms, imbalance_ms, scheduling_ms) = decompose_overhead(&pairs, 250);
assert_eq!(setup_ms, 0, "no setup work expected");
assert_eq!(
imbalance_ms, 42,
"imbalance = trunc(max(200.0) - avg(157.5)) = trunc(42.5) = 42"
);
assert_eq!(scheduling_ms, 50, "scheduling overhead = wall - max_worker");
}
#[test]
fn decompose_setup_time_is_aggregate_non_solve_work() {
let zero = SolverStatistics::default();
let pairs = vec![
(zero.clone(), make_stats(0.0, 0.020, 0.0, 0.0)), (zero.clone(), make_stats(0.0, 0.025, 0.0, 0.0)), (zero.clone(), make_stats(0.0, 0.015, 0.0, 0.0)), (zero.clone(), make_stats(0.0, 0.022, 0.0, 0.0)), ];
let (setup_ms, _imbalance_ms, _scheduling_ms) = decompose_overhead(&pairs, 300);
assert_eq!(
setup_ms, 82,
"aggregate setup must sum all workers' non-solve work"
);
}
#[test]
fn decompose_identical_workers_zero_imbalance() {
let zero = SolverStatistics::default();
let after = make_stats(0.1, 0.01, 0.002, 0.001);
let pairs = vec![
(zero.clone(), after.clone()),
(zero.clone(), after.clone()),
(zero.clone(), after.clone()),
];
let (_, imbalance_ms, _) = decompose_overhead(&pairs, 200);
assert_eq!(
imbalance_ms, 0,
"identical workers must have zero imbalance"
);
}
#[test]
fn decompose_single_worker() {
let zero = SolverStatistics::default();
let after = make_stats(0.1, 0.020, 0.0, 0.0);
let pairs = vec![(zero.clone(), after)];
let (setup_ms, imbalance_ms, scheduling_ms) = decompose_overhead(&pairs, 150);
assert_eq!(setup_ms, 20, "single worker: setup = 20 ms");
assert_eq!(imbalance_ms, 0, "single worker: imbalance must be 0");
assert_eq!(
scheduling_ms, 30,
"single worker: scheduling = wall - worker_total"
);
}
#[test]
fn decompose_scheduling_clamped_when_worker_exceeds_wall() {
let zero = SolverStatistics::default();
let after = make_stats(0.2, 0.0, 0.0, 0.0);
let pairs = vec![(zero.clone(), after)];
let (_, _, scheduling_ms) = decompose_overhead(&pairs, 180);
assert_eq!(scheduling_ms, 0, "negative scheduling must be clamped to 0");
}
#[test]
#[allow(
clippy::too_many_lines,
clippy::cast_precision_loss,
clippy::cast_possible_truncation
)]
fn allgatherv_single_rank_two_workers_stage_stats_has_per_worker_entries() {
use crate::lp_builder::PatchBuffer;
let n_stages = 2_usize;
let n_openings = 4_usize;
let n_workers = 2_usize;
let local_work = 4_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let solution = solution_1_0(100.0, -5.0);
let states: Vec<Vec<f64>> = (0..local_work).map(|i| vec![(i + 1) as f64]).collect();
let mut exchange = exchange_with_states(n_state, states);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let mut workspaces: Vec<SolverWorkspace<MockSolver>> = (0..n_workers)
.map(|idx| SolverWorkspace {
rank: 0,
worker_id: i32::try_from(idx).expect("idx fits in i32"),
solver: MockSolver::always_ok(solution.clone()),
patch_buf: PatchBuffer::new(1, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
current_state_scratch: Vec::new(),
recon_slot_lookup: Vec::new(),
promotion_scratch: crate::basis_reconstruct::PromotionScratch::default(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
})
.collect();
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut fcf =
FutureCostFunction::new(n_stages, n_state, local_work as u32, 64, &vec![0; n_stages]);
let mut csb = CutSyncBuffers::new(n_state, local_work, 1);
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &StubComm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.expect("single-rank 2-worker backward must not error");
assert_eq!(
result.stage_stats.len(),
1,
"expected 1 backward stage entry (successor=1)"
);
let (successor, entries) = &result.stage_stats[0];
assert_eq!(*successor, 1_usize, "successor index must be 1");
for (rank, _wid, _omega, _delta) in entries {
assert_eq!(*rank, 0_i32, "all entries must have rank=0 for np=1");
}
let omega0_wids: Vec<i32> = entries
.iter()
.filter(|(_, _, omega, _)| *omega == 0)
.map(|(_, wid, _, _)| *wid)
.collect();
assert!(
omega0_wids.contains(&0),
"worker_id=0 must appear at omega=0"
);
assert!(
omega0_wids.contains(&1),
"worker_id=1 must appear at omega=0"
);
}
#[test]
#[allow(
clippy::too_many_lines,
clippy::cast_precision_loss,
clippy::cast_possible_truncation
)]
fn allgatherv_dual_rank_stub_stage_stats_contains_both_ranks() {
use crate::lp_builder::PatchBuffer;
struct DualRankStubComm;
impl Communicator for DualRankStubComm {
fn allgatherv<T: CommData>(
&self,
send: &[T],
recv: &mut [T],
counts: &[usize],
displs: &[usize],
) -> Result<(), CommError> {
for (r, (&count, &displ)) in counts.iter().zip(displs).enumerate() {
let src = &send[..count.min(send.len())];
recv[displ..displ + src.len()].copy_from_slice(src);
let _ = r; }
Ok(())
}
fn allreduce<T: CommData>(
&self,
send: &[T],
recv: &mut [T],
_op: ReduceOp,
) -> Result<(), CommError> {
recv[..send.len()].copy_from_slice(send);
Ok(())
}
fn broadcast<T: CommData>(
&self,
_buf: &mut [T],
_root: usize,
) -> Result<(), CommError> {
Ok(())
}
fn barrier(&self) -> Result<(), CommError> {
Ok(())
}
fn rank(&self) -> usize {
0
}
fn size(&self) -> usize {
2
}
fn abort(&self, error_code: i32) -> ! {
std::process::exit(error_code)
}
}
let n_stages = 2_usize;
let n_openings = 2_usize;
let n_workers = 1_usize;
let local_work = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let solution = solution_1_0(100.0, -5.0);
let states: Vec<Vec<f64>> = (0..local_work).map(|i| vec![(i + 1) as f64]).collect();
let mut exchange = exchange_with_states(n_state, states);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let mut workspaces: Vec<SolverWorkspace<MockSolver>> = (0..n_workers)
.map(|idx| SolverWorkspace {
rank: 0,
worker_id: i32::try_from(idx).expect("idx fits in i32"),
solver: MockSolver::always_ok(solution.clone()),
patch_buf: PatchBuffer::new(1, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
current_state_scratch: Vec::new(),
recon_slot_lookup: Vec::new(),
promotion_scratch: crate::basis_reconstruct::PromotionScratch::default(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
})
.collect();
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut fcf =
FutureCostFunction::new(n_stages, n_state, local_work as u32, 64, &vec![0; n_stages]);
let mut csb = CutSyncBuffers::new(n_state, local_work, 1);
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &DualRankStubComm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.expect("dual-rank stub backward must not error");
assert_eq!(result.stage_stats.len(), 1);
let (_, entries) = &result.stage_stats[0];
let ranks_seen: Vec<i32> = entries
.iter()
.map(|(rank, _, _, _)| *rank)
.collect::<std::collections::HashSet<i32>>()
.into_iter()
.collect();
assert!(
ranks_seen.contains(&0),
"rank=0 must appear in stage_stats; got {ranks_seen:?}"
);
assert!(
ranks_seen.contains(&1),
"rank=1 must appear in stage_stats; got {ranks_seen:?}"
);
}
#[allow(clippy::too_many_lines)]
fn run_one_trial_point_with_stores(
basis_store: &mut crate::workspace::BasisStore,
) -> Result<Vec<SolverWorkspace<MockSolver>>, crate::SddpError> {
use crate::context::StageContext;
let n_stages = 2_usize;
let n_openings = 1_usize;
let n_state = 1_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = StageIndexer::new(n_state, 0);
let solver = MockSolver::always_ok(solution_1_0(100.0, -5.0));
let mut workspaces = single_workspace(solver, n_state);
let ws = &mut workspaces[0];
ws.backward_accum
.outcomes
.push(crate::risk_measure::BackwardOutcome {
intercept: 0.0,
coefficients: vec![0.0; n_state],
objective_value: 0.0,
});
ws.backward_accum
.per_opening_stats
.push(SolverStatsDelta::default());
ws.backward_accum.agg_coefficients.resize(n_state, 0.0);
let exchange = exchange_with_states(n_state, vec![vec![5.0]]);
let templates: &'static _ = Box::leak(Box::new(vec![
minimal_template_1_0(),
minimal_template_1_0(),
]));
let base_rows: &'static _ = Box::leak(Box::new(vec![1_usize, 1_usize]));
let ctx: StageContext<'static> = StageContext {
templates,
base_rows,
noise_scale: Box::leak(Box::new(vec![])),
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: Box::leak(Box::new(vec![])),
load_bus_indices: Box::leak(Box::new(vec![])),
block_counts_per_stage: Box::leak(Box::new(vec![])),
ncs_max_gen: Box::leak(Box::new(vec![])),
ncs_allow_curtailment: Box::leak(Box::new(vec![])),
discount_factors: Box::leak(Box::new(vec![])),
cumulative_discount_factors: Box::leak(Box::new(vec![])),
stage_lag_transitions: Box::leak(Box::new(vec![])),
noise_group_ids: Box::leak(Box::new(vec![])),
downstream_par_order: 0,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let training_ctx = TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
};
let iteration: u64 = 1;
let fwd_offset: usize = 0;
let succ_probabilities = vec![1.0_f64; n_openings];
let successor_active_slots: Vec<usize> = vec![];
let baked_template = minimal_template_1_0();
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, &vec![0u32; n_stages]);
let empty_cut_batch = RowBatch {
num_rows: 0,
row_starts: Vec::new(),
col_indices: Vec::new(),
values: Vec::new(),
row_lower: Vec::new(),
row_upper: Vec::new(),
};
let succ_spec = super::SuccessorSpec {
t: 0,
successor: 1,
my_rank: 0,
probabilities: &succ_probabilities,
cut_batch: &empty_cut_batch,
num_cuts_at_successor: 0,
template_num_rows: baked_template.num_rows,
baked_template: &baked_template,
successor_active_slots: &successor_active_slots,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
successor_populated_count: fcf.pools[1].populated_count,
successor_pool: &fcf.pools[1],
};
let mut basis_slices = basis_store.split_workers_mut(1);
let mut basis_slice = basis_slices.remove(0);
let ws = &mut workspaces[0];
super::load_backward_lp(ws, &succ_spec);
ws.backward_accum
.per_opening_stats
.resize_with(n_openings, SolverStatsDelta::default);
for slot in &mut ws.backward_accum.per_opening_stats[..n_openings] {
*slot = SolverStatsDelta::default();
}
ws.backward_accum.slot_increments.resize(1, 0);
ws.backward_accum.slot_increments[..1].fill(0);
super::process_trial_point_backward(
ws,
&ctx,
&training_ctx,
&exchange,
fwd_offset,
iteration,
&risk_measures,
&succ_spec,
&mut basis_slice,
0,
)?;
Ok(workspaces)
}
#[test]
fn resolve_backward_basis_returns_some_when_slot_is_populated() {
use crate::workspace::{BasisStore, CapturedBasis};
let b = CapturedBasis::new(2, 2, 0, 0, 0);
let mut store = BasisStore::new(1, 2);
*store.get_mut(0, 1) = Some(b);
let slices = store.split_workers_mut(1);
let slice = &slices[0];
let basis_ref = super::resolve_backward_basis(slice, 0, 1);
assert!(basis_ref.is_some(), "expected Some when slot has a basis");
drop(slices);
}
#[test]
fn resolve_backward_basis_returns_none_when_slot_is_empty() {
use crate::workspace::BasisStore;
let mut store = BasisStore::new(1, 2);
let slices = store.split_workers_mut(1);
let slice = &slices[0];
let basis_ref = super::resolve_backward_basis(slice, 0, 1);
assert!(basis_ref.is_none(), "expected None for empty slot");
drop(slices);
}
#[test]
fn backward_write_populates_basis_store_at_omega_zero() {
use crate::workspace::BasisStore;
let mut basis_store = BasisStore::new(1, 2);
let workspaces = run_one_trial_point_with_stores(&mut basis_store).unwrap();
assert!(
basis_store.get(0, 1).is_some(),
"BasisStore[0, 1] must be Some after backward write at omega=0"
);
let captured = basis_store.get(0, 1).unwrap();
assert_eq!(
captured.state_at_capture,
vec![5.0_f64],
"state_at_capture must equal x_hat"
);
assert_eq!(
workspaces[0].solver.call_count, 1,
"solver must be called exactly once for a 1-opening backward pass"
);
}
#[test]
fn backward_write_preserves_slot_on_infeasibility_at_omega_zero() {
use cobre_solver::Basis;
use crate::workspace::{BasisStore, CapturedBasis};
let pre_existing = CapturedBasis {
basis: Basis::new(2, 2),
base_row_count: 99,
cut_row_slots: Vec::new(),
state_at_capture: vec![42.0],
};
let mut basis_store = BasisStore::new(1, 2);
*basis_store.get_mut(0, 1) = Some(pre_existing);
assert_eq!(
basis_store.get(0, 1).unwrap().base_row_count,
99,
"sentinel must be in place before the infeasible solve"
);
let result = run_one_trial_point_with_stores(&mut basis_store);
assert!(result.is_ok(), "expected Ok for successful solve");
assert!(
basis_store.get(0, 1).is_some(),
"BasisStore[0, 1] must not be None after successful reuse-path write at ω=0"
);
assert_eq!(
basis_store.get(0, 1).unwrap().state_at_capture,
vec![5.0_f64],
"state_at_capture must be updated to x_hat by the reuse path"
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn handshake_passes_with_local_backend() {
use crate::lp_builder::PatchBuffer;
let n_stages = 1_usize;
let n_workers = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0()];
let base_rows = vec![1_usize];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let mut workspaces: Vec<SolverWorkspace<MockSolver>> = (0..n_workers)
.map(|idx| SolverWorkspace {
rank: 0,
worker_id: i32::try_from(idx).expect("idx fits i32"),
solver: MockSolver::always_ok(solution.clone()),
patch_buf: PatchBuffer::new(1, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
current_state_scratch: Vec::new(),
recon_slot_lookup: Vec::new(),
promotion_scratch: crate::basis_reconstruct::PromotionScratch::default(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
})
.collect();
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 1, 1);
let comm = StubComm;
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(n_stages),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
});
assert!(
result.is_ok(),
"handshake must pass when all ranks have the same n_workers_local; got: {result:?}"
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn handshake_rejects_nonuniform_workers() {
struct NonUniformStubComm;
impl Communicator for NonUniformStubComm {
fn allgatherv<T: CommData>(
&self,
send: &[T],
recv: &mut [T],
_counts: &[usize],
_displs: &[usize],
) -> Result<(), CommError> {
recv[..send.len()].copy_from_slice(send);
Ok(())
}
fn allreduce<T: CommData>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
) -> Result<(), CommError> {
match op {
ReduceOp::Min => {
for r in recv.iter_mut() {
*r = T::default();
}
}
_ => {
recv[..send.len()].copy_from_slice(send);
}
}
Ok(())
}
fn broadcast<T: CommData>(
&self,
_buf: &mut [T],
_root: usize,
) -> Result<(), CommError> {
Ok(())
}
fn barrier(&self) -> Result<(), CommError> {
Ok(())
}
fn rank(&self) -> usize {
0
}
fn size(&self) -> usize {
2
}
fn abort(&self, error_code: i32) -> ! {
std::process::exit(error_code)
}
}
let n_stages = 1_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = StageIndexer::new(1, 0);
let templates = vec![minimal_template_1_0()];
let base_rows = vec![1_usize];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let comm = NonUniformStubComm;
let mut workspaces =
single_workspace(MockSolver::always_ok(solution_1_0(100.0, -5.0)), n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 1, 1);
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &templates,
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(n_stages),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
});
match result {
Err(crate::SddpError::Validation(ref msg)) => {
assert!(
msg.contains("non-uniform n_workers_local"),
"error message must contain 'non-uniform n_workers_local'; got: {msg}"
);
assert!(
msg.contains("min=0"),
"error message must mention min=0 (stub Min returns T::default()); got: {msg}"
);
assert!(
msg.contains("max=1"),
"error message must mention max=1 (stub Max echoes local=1); got: {msg}"
);
assert!(
msg.contains("local=1"),
"error message must mention local=1 (single workspace); got: {msg}"
);
}
other => panic!(
"expected Err(SddpError::Validation(_)) from non-uniform handshake, got: {other:?}"
),
}
}
}