use crate::SddpError;
#[cfg(test)]
fn run_backward_pass<S, C: Communicator>(
inputs: &mut crate::backward_pass_state::BackwardPassInputs<'_, S, C>,
) -> Result<BackwardResult, SddpError>
where
S: SolverInterface<Profile = cobre_solver::ActiveProfile> + Send,
{
let n_workers_local = inputs.workspaces.len();
let n_ranks = inputs.comm.size();
let num_stages = inputs.training_ctx.horizon.num_stages();
let bwd_max_openings = (0..num_stages)
.map(|t| inputs.training_ctx.stochastic.opening_tree().n_openings(t))
.max()
.unwrap_or(0);
let real_states_capacity =
inputs.exchange.real_total_scenarios() * inputs.training_ctx.indexer.n_state;
let mut bwd_state = crate::backward_pass_state::BackwardPassState::new(
n_workers_local,
n_ranks,
bwd_max_openings,
real_states_capacity,
);
bwd_state.run(inputs)
}
use cobre_comm::{CommData, CommError, Communicator, ReduceOp};
use cobre_solver::{
Basis, LpSolution, ProfiledSolver, RowBatch, SolverError, SolverInterface, SolverStatistics,
StageTemplate,
};
use cobre_core::scenario::SamplingScheme;
use super::BackwardResult;
use crate::{
context::{StageContext, TrainingContext},
cut::FutureCostFunction,
cut_sync::CutSyncBuffers,
horizon_mode::HorizonMode,
indexer::StageIndexer,
inflow_method::InflowNonNegativityMethod,
risk_measure::RiskMeasure,
solver_stats::SolverStatsDelta,
state_exchange::ExchangeBuffers,
trajectory::TrajectoryRecord,
workspace::{BackwardAccumulators, BasisStore, SolverWorkspace},
};
fn empty_cut_batches(n_stages: usize) -> Vec<RowBatch> {
(0..n_stages)
.map(|_| RowBatch {
num_rows: 0,
row_starts: Vec::new(),
col_indices: Vec::new(),
values: Vec::new(),
row_lower: Vec::new(),
row_upper: Vec::new(),
})
.collect()
}
struct StubComm;
impl Communicator for StubComm {
fn allgatherv<T: CommData>(
&self,
send: &[T],
recv: &mut [T],
_counts: &[usize],
_displs: &[usize],
) -> Result<(), CommError> {
recv[..send.len()].copy_from_slice(send);
Ok(())
}
fn allreduce<T: CommData>(
&self,
send: &[T],
recv: &mut [T],
_op: ReduceOp,
) -> Result<(), CommError> {
recv[..send.len()].copy_from_slice(send);
Ok(())
}
fn broadcast<T: CommData>(&self, _buf: &mut [T], _root: usize) -> Result<(), CommError> {
unreachable!("StubComm broadcast not used in backward pass tests")
}
fn barrier(&self) -> Result<(), CommError> {
Ok(())
}
fn rank(&self) -> usize {
0
}
fn size(&self) -> usize {
1
}
fn abort(&self, error_code: i32) -> ! {
std::process::exit(error_code)
}
}
struct MockSolver {
solution: LpSolution,
infeasible_at: Option<usize>,
call_count: usize,
current_num_rows: usize,
warm_start_calls: usize,
cut_dual_padding: f64,
buf_primal: Vec<f64>,
buf_dual: Vec<f64>,
buf_reduced_costs: Vec<f64>,
}
impl MockSolver {
fn always_ok(solution: LpSolution) -> Self {
let base_rows = solution.dual.len();
let buf_primal = solution.primal.clone();
let buf_dual = solution.dual.clone();
let buf_reduced_costs = solution.reduced_costs.clone();
Self {
solution,
infeasible_at: None,
call_count: 0,
current_num_rows: base_rows,
warm_start_calls: 0,
cut_dual_padding: 0.0,
buf_primal,
buf_dual,
buf_reduced_costs,
}
}
fn infeasible_on(solution: LpSolution, n: usize) -> Self {
let base_rows = solution.dual.len();
let buf_primal = solution.primal.clone();
let buf_dual = solution.dual.clone();
let buf_reduced_costs = solution.reduced_costs.clone();
Self {
solution,
infeasible_at: Some(n),
call_count: 0,
current_num_rows: base_rows,
warm_start_calls: 0,
cut_dual_padding: 0.0,
buf_primal,
buf_dual,
buf_reduced_costs,
}
}
fn always_ok_with_binding_cuts(solution: LpSolution) -> Self {
let mut s = Self::always_ok(solution);
s.cut_dual_padding = 1.0;
s
}
}
impl SolverInterface for MockSolver {
type Profile = cobre_solver::ActiveProfile;
fn apply_profile(&mut self, _profile: &cobre_solver::ActiveProfile) {}
fn solver_name_version(&self) -> String {
"MockSolver 0.0.0".to_string()
}
fn load_model(&mut self, template: &StageTemplate) {
self.current_num_rows = template.num_rows;
}
fn add_rows(&mut self, cuts: &RowBatch) {
self.current_num_rows += cuts.num_rows;
}
fn set_row_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn set_col_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn solve(
&mut self,
basis: Option<&Basis>,
) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
if basis.is_some() {
self.warm_start_calls += 1;
}
let call = self.call_count;
self.call_count += 1;
if self.infeasible_at == Some(call) {
return Err(SolverError::Infeasible);
}
self.buf_primal.clone_from(&self.solution.primal);
self.buf_dual.clone_from(&self.solution.dual);
self.buf_dual
.resize(self.current_num_rows, self.cut_dual_padding);
self.buf_reduced_costs
.clone_from(&self.solution.reduced_costs);
Ok(cobre_solver::SolutionView {
objective: self.solution.objective,
primal: &self.buf_primal,
dual: &self.buf_dual,
reduced_costs: &self.buf_reduced_costs,
iterations: self.solution.iterations,
solve_time_seconds: self.solution.solve_time_seconds,
})
}
fn get_basis(&mut self, _out: &mut Basis) {}
fn statistics(&self) -> SolverStatistics {
SolverStatistics::default()
}
fn statistics_into(&self, out: &mut SolverStatistics) {
out.copy_from(&SolverStatistics::default());
}
fn name(&self) -> &'static str {
"Mock"
}
}
fn minimal_template_1_0() -> StageTemplate {
StageTemplate {
num_cols: 3,
num_rows: 1,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![0.0],
row_upper: vec![0.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
}
}
fn solution_1_0(objective: f64, dual_storage: f64) -> LpSolution {
let mut reduced_costs = vec![0.0; 3];
reduced_costs[2] = dual_storage;
LpSolution {
objective,
primal: vec![0.0, 0.0, 0.0],
dual: vec![dual_storage],
reduced_costs,
iterations: 0,
solve_time_seconds: 0.0,
}
}
fn staged_cut_coefficients<'a>(cut: &super::StagedCut, arena: &'a [f64]) -> &'a [f64] {
&arena[cut.coefficients_range.clone()]
}
fn single_workspace(solver: MockSolver, n_state: usize) -> Vec<SolverWorkspace<MockSolver>> {
use crate::lp_builder::PatchBuffer;
vec![SolverWorkspace {
rank: 0,
worker_id: 0,
solver: ProfiledSolver::new(solver),
patch_buf: PatchBuffer::new(1, 0, 0, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
recon_slot_lookup: Vec::new(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
anticipated_state_buf: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
}]
}
fn empty_basis_store(num_scenarios: usize, num_stages: usize) -> BasisStore {
BasisStore::new(num_scenarios, num_stages)
}
fn basis_store_with_one(
num_scenarios: usize,
num_stages: usize,
scenario: usize,
stage: usize,
basis: Basis,
) -> BasisStore {
let mut store = BasisStore::new(num_scenarios, num_stages);
let base_row_count = basis.row_status.len();
*store.get_mut(scenario, stage) = Some(crate::workspace::CapturedBasis {
basis,
base_row_count,
cut_row_slots: Vec::new(),
state_at_capture: Vec::new(),
});
store
}
fn exchange_with_states(n_state: usize, states: Vec<Vec<f64>>) -> ExchangeBuffers {
use cobre_comm::LocalBackend;
let local_count = states.len();
let mut bufs = ExchangeBuffers::new(n_state, local_count, 1);
let records: Vec<TrajectoryRecord> = states
.into_iter()
.map(|state| TrajectoryRecord {
primal: vec![],
dual: vec![],
stage_cost: 0.0,
state,
})
.collect();
let comm = LocalBackend;
bufs.exchange(&records, 0, 1, &comm).unwrap();
bufs
}
#[allow(clippy::too_many_lines)]
fn make_stochastic_context(
n_stages: usize,
branching_factor: usize,
) -> cobre_stochastic::StochasticContext {
use chrono::NaiveDate;
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::{
Bus, DeficitSegment, EntityId, SystemBuilder,
scenario::{
CorrelationEntity, CorrelationGroup, CorrelationModel, CorrelationProfile, InflowModel,
},
temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, Stage, StageRiskConfig,
StageStateConfig,
},
};
use cobre_stochastic::context::{ClassSchemes, OpeningTreeInputs, build_stochastic_context};
use std::collections::BTreeMap;
let bus = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(1),
name: "H1".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity,
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
specific_productivity_mw_per_m3s_per_m: None,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let make_stage = |idx: usize| Stage {
index: idx,
id: idx as i32,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor,
noise_method: NoiseMethod::Saa,
},
};
let stages: Vec<Stage> = (0..n_stages).map(make_stage).collect();
#[allow(clippy::cast_possible_truncation)]
let inflow = |stage_idx: usize| InflowModel {
hydro_id: EntityId(1),
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
stage_id: stage_idx as i32,
mean_m3s: 100.0,
std_m3s: 30.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
};
let inflow_models: Vec<InflowModel> = (0..n_stages).map(inflow).collect();
let mut profiles = BTreeMap::new();
profiles.insert(
"default".to_string(),
CorrelationProfile {
groups: vec![CorrelationGroup {
name: "g1".to_string(),
entities: vec![CorrelationEntity {
entity_type: "inflow".to_string(),
id: EntityId(1),
}],
matrix: vec![vec![1.0]],
}],
},
);
let correlation = CorrelationModel {
method: "spectral".to_string(),
profiles,
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus])
.hydros(vec![hydro])
.stages(stages)
.inflow_models(inflow_models)
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(
&system,
42,
None,
&[],
&[],
OpeningTreeInputs::default(),
ClassSchemes {
inflow: Some(SamplingScheme::InSample),
load: Some(SamplingScheme::InSample),
ncs: Some(SamplingScheme::InSample),
},
)
.unwrap()
}
#[test]
fn backward_result_fields_accessible() {
let r = BackwardResult {
cuts_generated: 6,
elapsed_ms: 42,
lp_solves: 0,
stage_stats: Vec::new(),
state_exchange_time_ms: 0,
cut_batch_build_time_ms: 0,
setup_time_ms: 0,
load_imbalance_ms: 0,
scheduling_overhead_ms: 0,
cut_sync_time_ms: 0,
};
assert_eq!(r.cuts_generated, 6);
assert_eq!(r.elapsed_ms, 42);
assert!(r.stage_stats.is_empty());
assert_eq!(r.state_exchange_time_ms, 0);
assert_eq!(r.cut_batch_build_time_ms, 0);
assert_eq!(r.setup_time_ms, 0);
assert_eq!(r.load_imbalance_ms, 0);
assert_eq!(r.scheduling_overhead_ms, 0);
assert_eq!(r.cut_sync_time_ms, 0);
}
#[test]
fn backward_result_clone_and_debug() {
let r = BackwardResult {
cuts_generated: 3,
elapsed_ms: 100,
lp_solves: 0,
stage_stats: Vec::new(),
state_exchange_time_ms: 0,
cut_batch_build_time_ms: 0,
setup_time_ms: 0,
load_imbalance_ms: 0,
scheduling_overhead_ms: 0,
cut_sync_time_ms: 0,
};
let c = r.clone();
assert_eq!(c.cuts_generated, 3);
let s = format!("{r:?}");
assert!(s.contains("BackwardResult"));
}
#[test]
fn dual_extraction_formula_coefficients_are_negated_duals() {
let d0 = 3.5_f64;
let d1 = -1.2_f64;
let dual = [d0, d1];
let coefficients: Vec<f64> = dual.iter().map(|&d| -d).collect();
assert!((coefficients[0] - (-d0)).abs() < f64::EPSILON);
assert!((coefficients[1] - (-d1)).abs() < f64::EPSILON);
}
#[test]
fn intercept_formula_matches_spec() {
let objective = 50.0_f64;
let coefficients = [2.0_f64, -1.0_f64];
let x_hat = [10.0_f64, 5.0_f64];
let pi_dot_x: f64 = coefficients
.iter()
.zip(x_hat.iter())
.map(|(p, x)| p * x)
.sum();
let intercept = objective - pi_dot_x;
assert!((intercept - 35.0).abs() < f64::EPSILON);
}
#[test]
fn single_stage_system_produces_no_cuts() {
let stochastic = make_stochastic_context(1, 2);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0()];
let base_rows = vec![1_usize];
let n_state = indexer.n_state;
let n_stages = 1_usize;
let forward_passes = 2_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(result.cuts_generated, 0);
assert_eq!(fcf.total_active_cuts(), 0);
}
#[test]
fn two_stage_system_two_trial_points_generates_two_cuts_at_stage_0() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
}; let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state; let forward_passes = 2_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(result.cuts_generated, 2);
assert_eq!(fcf.active_cuts(0).count(), 2);
assert_eq!(fcf.active_cuts(1).count(), 0);
}
#[test]
fn cut_inserted_with_correct_stage_iteration_and_forward_pass_index() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 3_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 20, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![5.0], vec![10.0], vec![15.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(50.0, 0.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 2,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let meta = &fcf.pools[0].metadata[7];
assert_eq!(meta.iteration_generated, 2);
assert_eq!(meta.forward_pass_index, 1);
}
#[test]
fn no_cuts_generated_at_last_stage() {
let n_stages = 5_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -3.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(result.cuts_generated, 4);
for t in 0..4 {
assert_eq!(fcf.active_cuts(t).count(), 1, "stage {t} should have 1 cut");
}
assert_eq!(fcf.active_cuts(4).count(), 0, "stage 4 must have no cuts");
}
#[test]
fn elapsed_ms_is_non_negative() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 2);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![5.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(10.0, 0.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let _ = result.elapsed_ms;
}
#[test]
fn infeasible_solver_returns_sddp_infeasible_error() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(0.0, 0.0);
let solver = MockSolver::infeasible_on(solution, 0);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
});
assert!(
matches!(result, Err(crate::SddpError::Infeasible { .. })),
"expected SddpError::Infeasible, got: {result:?}",
);
}
#[test]
fn expectation_aggregation_mean_of_per_opening_intercepts() {
use crate::risk_measure::BackwardOutcome as BO;
let outcomes = vec![
BO {
intercept: 10.0,
coefficients: vec![],
objective_value: 10.0,
},
BO {
intercept: 20.0,
coefficients: vec![],
objective_value: 20.0,
},
BO {
intercept: 30.0,
coefficients: vec![],
objective_value: 30.0,
},
];
let probs = vec![1.0 / 3.0; 3];
let (intercept, _) = RiskMeasure::Expectation.aggregate_cut(&outcomes, &probs);
assert!(
(intercept - 20.0).abs() < 1e-10,
"expected 20.0, got {intercept}"
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn cut_coefficients_and_intercept_match_dual_extraction_formula() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(80.0, -3.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let cuts: Vec<_> = fcf.active_cuts(0).collect();
assert_eq!(cuts.len(), 1);
let (_, intercept, coefficients) = &cuts[0];
assert!(
(intercept - 110.0).abs() < 1e-10,
"expected intercept=110.0, got {intercept}"
);
assert_eq!(coefficients.len(), 1);
assert!(
(coefficients[0] - (-3.0)).abs() < 1e-10,
"expected coefficient=-3.0, got {}",
coefficients[0]
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn cut_gradient_sign_physically_correct() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![50.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -2.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let cuts: Vec<_> = fcf.active_cuts(0).collect();
assert_eq!(cuts.len(), 1, "expected exactly one cut");
let (_, _intercept, coefficients) = &cuts[0];
assert!(
coefficients[0] < 0.0,
"cut coefficient must be negative (more storage → less future cost), \
got {} — likely the Benders cut sign bug has been reintroduced",
coefficients[0]
);
assert!(
(coefficients[0] - (-2.0)).abs() < 1e-10,
"expected coefficient=-2.0, got {}",
coefficients[0]
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn cut_is_tight_at_trial_point() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let x_hat = 30.0_f64;
let mut exchange = exchange_with_states(n_state, vec![vec![x_hat]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let q_xhat = 200.0_f64; let dual_storage = -4.0_f64;
let solution = solution_1_0(q_xhat, dual_storage);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let cuts: Vec<_> = fcf.active_cuts(0).collect();
assert_eq!(cuts.len(), 1);
let (_, intercept, coefficients) = &cuts[0];
let cut_at_xhat = intercept + coefficients[0] * x_hat;
assert!(
(cut_at_xhat - q_xhat).abs() < 1e-10,
"cut must be tight at trial point: \
cut_value={cut_at_xhat}, Q(x̂)={q_xhat}, \
intercept={intercept}, coeff={}, x̂={x_hat}",
coefficients[0]
);
}
#[test]
fn single_rank_backward_pass_with_local_backend_produces_correct_fcf() {
use cobre_comm::LocalBackend;
let n_stages = 3_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 2_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = LocalBackend;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(result.cuts_generated, 4);
assert_eq!(fcf.active_cuts(0).count(), 2);
assert_eq!(fcf.active_cuts(1).count(), 2);
assert_eq!(fcf.active_cuts(2).count(), 0);
}
#[test]
#[allow(clippy::too_many_lines)]
fn forward_pass_index_matches_global_scenario_index() {
let n_stages = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 6_u32; let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 20, &vec![0; n_stages]);
let mut exchange = exchange_with_states(
n_state,
vec![
vec![1.0],
vec![2.0],
vec![3.0],
vec![4.0],
vec![5.0],
vec![6.0],
],
);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(50.0, 0.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 2,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let meta = &fcf.pools[0].metadata[17];
assert_eq!(meta.iteration_generated, 2, "iteration_generated must be 2");
assert_eq!(
meta.forward_pass_index, 5,
"forward_pass_index must be 5 (= global m)"
);
}
#[test]
fn warm_start_uses_prepopulated_forward_basis() {
let n_stages = 2_usize;
let n_openings = 1_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let pre_basis = Basis::new(templates[1].num_cols, templates[1].num_rows);
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = basis_store_with_one(exchange.local_count(), n_stages, 0, 1, pre_basis);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let warm_start_calls = workspaces[0].solver.inner().warm_start_calls;
assert_eq!(
warm_start_calls, 1,
"first opening at successor stage must call solve(Some(&basis)) \
when basis_store.get(0, 1) is pre-populated (warm_start_calls == 1, got {warm_start_calls})"
);
}
#[test]
fn multi_opening_subsequent_openings_use_internal_hotstart() {
let n_stages = 2_usize;
let n_openings = 3_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = StubComm;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let warm_start_calls = workspaces[0].solver.inner().warm_start_calls;
assert_eq!(
warm_start_calls, 0,
"P3b: no warm-start calls expected when BasisStore is empty \
(warm_start_calls == 0, got {warm_start_calls})"
);
}
#[test]
fn backward_solver_error_propagates() {
let n_stages = 2_usize;
let n_openings = 1_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(0.0, 0.0);
let solver = MockSolver::infeasible_on(solution, 0);
let comm = StubComm;
let pre_basis = Basis::new(templates[1].num_cols, templates[1].num_rows);
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = basis_store_with_one(exchange.local_count(), n_stages, 0, 1, pre_basis);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
});
assert!(
matches!(result, Err(crate::SddpError::Infeasible { .. })),
"expected SddpError::Infeasible, got: {result:?}",
);
assert!(
basis_store.get(0, 1).is_some(),
"BasisStore must not be mutated by the backward pass error path"
);
}
#[test]
#[allow(
clippy::too_many_lines,
clippy::cast_possible_truncation,
clippy::cast_precision_loss
)]
fn test_backward_pass_parallel_cut_determinism() {
use crate::lp_builder::PatchBuffer;
let n_stages = 3_usize;
let n_openings = 2_usize;
let n_trial_points = 8_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
#[allow(clippy::cast_possible_truncation)]
let forward_passes = n_trial_points as u32;
let states: Vec<Vec<f64>> = (0..n_trial_points).map(|i| vec![i as f64 + 1.0]).collect();
let mut exchange = exchange_with_states(n_state, states);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let comm = StubComm;
let mut fcf_1 =
FutureCostFunction::new(n_stages, n_state, forward_passes, 20, &vec![0; n_stages]);
let solver_1 = MockSolver::always_ok(solution.clone());
let mut workspaces_1 = vec![SolverWorkspace {
rank: 0,
worker_id: 0,
solver: ProfiledSolver::new(solver_1),
patch_buf: PatchBuffer::new(1, 0, 0, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
recon_slot_lookup: Vec::new(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
anticipated_state_buf: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
}];
let mut basis_store_1 = empty_basis_store(exchange.local_count(), n_stages);
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces_1,
basis_store: &mut basis_store_1,
ctx: &ctx,
baked: &mut templates.clone(),
fcf: &mut fcf_1,
cut_batches: &mut empty_cut_batches(n_stages),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
let mut fcf_4 =
FutureCostFunction::new(n_stages, n_state, forward_passes, 20, &vec![0; n_stages]);
let mut workspaces_4: Vec<SolverWorkspace<MockSolver>> = (0..4_i32)
.map(|idx| SolverWorkspace {
rank: 0,
worker_id: idx,
solver: ProfiledSolver::new(MockSolver::always_ok(solution.clone())),
patch_buf: PatchBuffer::new(1, 0, 0, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
recon_slot_lookup: Vec::new(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
anticipated_state_buf: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
})
.collect();
let mut basis_store_4 = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces_4,
basis_store: &mut basis_store_4,
ctx: &ctx,
baked: &mut templates.clone(),
fcf: &mut fcf_4,
cut_batches: &mut empty_cut_batches(n_stages),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
for t in 0..(n_stages - 1) {
let cuts_1: Vec<_> = fcf_1.active_cuts(t).collect();
let cuts_4: Vec<_> = fcf_4.active_cuts(t).collect();
assert_eq!(
cuts_1.len(),
cuts_4.len(),
"stage {t}: cut count differs (1 workspace: {}, 4 workspaces: {})",
cuts_1.len(),
cuts_4.len()
);
for (idx, ((slot_1, intercept_1, coeff_1), (slot_4, intercept_4, coeff_4))) in
cuts_1.iter().zip(cuts_4.iter()).enumerate()
{
assert_eq!(
slot_1, slot_4,
"stage {t} cut {idx}: slot mismatch ({slot_1} vs {slot_4})"
);
assert!(
(intercept_1 - intercept_4).abs() < 1e-12,
"stage {t} cut {idx}: intercept mismatch ({intercept_1} vs {intercept_4})"
);
assert_eq!(
coeff_1.len(),
coeff_4.len(),
"stage {t} cut {idx}: coefficient vector length mismatch"
);
for (j, (c1, c4)) in coeff_1.iter().zip(coeff_4.iter()).enumerate() {
assert!(
(c1 - c4).abs() < 1e-12,
"stage {t} cut {idx} coeff[{j}]: {c1} vs {c4}"
);
}
}
}
assert_eq!(fcf_1.active_cuts(n_stages - 1).count(), 0);
assert_eq!(fcf_4.active_cuts(n_stages - 1).count(), 0);
}
#[allow(clippy::too_many_lines)]
fn make_stochastic_context_with_load(
n_stages: usize,
branching_factor: usize,
mean_mw: f64,
std_mw: f64,
) -> cobre_stochastic::StochasticContext {
use chrono::NaiveDate;
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::scenario::{CorrelationModel, InflowModel, LoadModel};
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, Stage, StageRiskConfig,
StageStateConfig,
};
use cobre_core::{Bus, DeficitSegment, EntityId, SystemBuilder};
use cobre_stochastic::context::{ClassSchemes, OpeningTreeInputs, build_stochastic_context};
let bus0 = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let bus1 = Bus {
id: EntityId(1),
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(10),
name: "H10".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity,
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
specific_productivity_mw_per_m3s_per_m: None,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let make_stage = |idx: usize| Stage {
index: idx,
id: idx as i32,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor,
noise_method: NoiseMethod::Saa,
},
};
let stages: Vec<Stage> = (0..n_stages).map(make_stage).collect();
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let inflow_models: Vec<InflowModel> = (0..n_stages)
.map(|idx| InflowModel {
hydro_id: EntityId(10),
stage_id: idx as i32,
mean_m3s: 100.0,
std_m3s: 30.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
})
.collect();
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let load_models: Vec<LoadModel> = (0..n_stages)
.map(|idx| LoadModel {
bus_id: EntityId(1),
stage_id: idx as i32,
mean_mw,
std_mw,
})
.collect();
let correlation = CorrelationModel {
method: "spectral".to_string(),
profiles: std::collections::BTreeMap::new(),
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus0, bus1])
.hydros(vec![hydro])
.stages(stages)
.inflow_models(inflow_models)
.load_models(load_models)
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(
&system,
42,
None,
&[],
&[],
OpeningTreeInputs::default(),
ClassSchemes {
inflow: Some(SamplingScheme::InSample),
load: Some(SamplingScheme::InSample),
ncs: Some(SamplingScheme::InSample),
},
)
.unwrap()
}
#[test]
#[allow(clippy::too_many_lines)]
fn backward_pass_load_patches_applied() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context_with_load(n_stages, n_openings, 300.0, 30.0);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let patch_buf = crate::lp_builder::PatchBuffer::new(1, 0, 1, 1, 0, 0);
let template = StageTemplate {
num_cols: 3,
num_rows: 2,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![50.0, 100.0],
row_upper: vec![50.0, 100.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
};
let templates = vec![template; n_stages];
let base_rows = vec![1_usize; n_stages];
let noise_scale = vec![1.0_f64; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -2.0);
let ws = SolverWorkspace {
rank: 0,
worker_id: 0,
solver: ProfiledSolver::new(MockSolver::always_ok(solution)),
patch_buf,
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::with_capacity(1),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
recon_slot_lookup: Vec::new(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
anticipated_state_buf: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
};
let mut workspaces = vec![ws];
let comm = StubComm;
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let load_balance_row_starts = vec![10_usize; n_stages];
let load_bus_indices = vec![0_usize];
let block_counts_per_stage = vec![1_usize; n_stages];
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses: 1,
load_balance_row_starts: &load_balance_row_starts,
load_bus_indices: &load_bus_indices,
block_counts_per_stage: &block_counts_per_stage,
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(
workspaces[0].scratch.load_rhs_buf.len(),
1,
"load_rhs_buf must have 1 entry (1 load bus × 1 block)"
);
assert!(
workspaces[0].scratch.load_rhs_buf[0] > 0.0,
"load realization must be positive with mean=300, std=30: got {}",
workspaces[0].scratch.load_rhs_buf[0]
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn backward_pass_no_load_buses_unchanged() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let patch_buf = crate::lp_builder::PatchBuffer::new(1, 0, 0, 0, 0, 0);
let template = StageTemplate {
num_cols: 3,
num_rows: 2,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![50.0, 100.0],
row_upper: vec![50.0, 100.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
};
let templates = vec![template; n_stages];
let base_rows = vec![1_usize; n_stages];
let noise_scale = vec![1.0_f64; n_stages];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -2.0);
let ws = SolverWorkspace {
rank: 0,
worker_id: 0,
solver: ProfiledSolver::new(MockSolver::always_ok(solution)),
patch_buf,
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
recon_slot_lookup: Vec::new(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
anticipated_state_buf: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
};
let mut workspaces = vec![ws];
let comm = StubComm;
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let _ = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1_usize; 2],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(
workspaces[0].patch_buf.forward_patch_count(),
2,
"forward_patch_count must be N+z_inflow=2 when n_load_buses=0, got {}",
workspaces[0].patch_buf.forward_patch_count()
);
assert!(
workspaces[0].scratch.load_rhs_buf.is_empty(),
"load_rhs_buf must be empty when n_load_buses=0"
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn backward_pass_cut_coefficients_unaffected() {
let n_stages = 2_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context_with_load(n_stages, n_openings, 200.0, 20.0);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let patch_buf = crate::lp_builder::PatchBuffer::new(1, 0, 1, 1, 0, 0);
let template = StageTemplate {
num_cols: 3,
num_rows: 2,
num_nz: 1,
col_starts: vec![0_i32, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, 0.0, 0.0],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY],
objective: vec![0.0, 0.0, 1.0],
row_lower: vec![50.0, 100.0],
row_upper: vec![50.0, 100.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
};
let templates = vec![template; n_stages];
let base_rows = vec![1_usize; n_stages];
let noise_scale = vec![1.0_f64; n_stages];
let n_state = indexer.n_state; let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(80.0, -3.0);
let ws = SolverWorkspace {
rank: 0,
worker_id: 0,
solver: ProfiledSolver::new(MockSolver::always_ok(solution)),
patch_buf,
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::with_capacity(1),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
recon_slot_lookup: Vec::new(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
anticipated_state_buf: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
};
let mut workspaces = vec![ws];
let comm = StubComm;
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let load_balance_row_starts = vec![10_usize; n_stages];
let load_bus_indices = vec![0_usize];
let block_counts_per_stage = vec![1_usize; n_stages];
let mut csb = CutSyncBuffers::with_distribution(n_state, 64, 1, exchange.local_count());
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses: 1,
load_balance_row_starts: &load_balance_row_starts,
load_bus_indices: &load_bus_indices,
block_counts_per_stage: &block_counts_per_stage,
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(result.cuts_generated, 1);
let cuts: Vec<_> = fcf.active_cuts(0).collect();
assert_eq!(cuts.len(), 1);
let (_, _intercept, coefficients) = &cuts[0];
assert_eq!(
coefficients.len(),
n_state,
"cut coefficients length must be n_state={n_state}, got {} — \
load buses must not add state variables",
coefficients.len()
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn per_stage_cut_sync_invariant_after_bug1_fix() {
use cobre_comm::LocalBackend;
let n_stages = 4_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 3_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 20, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0], vec![30.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok(solution);
let comm = LocalBackend;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, forward_passes as usize, 1);
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 1,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(result.cuts_generated, 9);
assert_eq!(fcf.active_cuts(0).count(), 3, "stage 0 must have 3 cuts");
assert_eq!(fcf.active_cuts(1).count(), 3, "stage 1 must have 3 cuts");
assert_eq!(fcf.active_cuts(2).count(), 3, "stage 2 must have 3 cuts");
assert_eq!(
fcf.active_cuts(3).count(),
0,
"terminal stage must have 0 cuts"
);
assert!(
result.cut_sync_time_ms < 10_000,
"cut_sync_time_ms should be reasonable, got {}",
result.cut_sync_time_ms
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn metadata_sync_updates_active_count_and_last_active_iter() {
use cobre_comm::LocalBackend;
let n_stages = 3_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let forward_passes = 3_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 20, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0], vec![20.0], vec![30.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let solver = MockSolver::always_ok_with_binding_cuts(solution);
let comm = LocalBackend;
let mut workspaces = single_workspace(solver, n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, forward_passes as usize, 1);
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 1,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(result.cuts_generated, 6);
assert_eq!(fcf.pools[1].populated_count, 6);
for slot in 3..6 {
assert!(
fcf.pools[1].metadata[slot].active_count > 0,
"slot {slot} active_count should be > 0 (cuts were binding)"
);
assert_eq!(
fcf.pools[1].metadata[slot].active_count, 6,
"slot {slot} active_count should be 6 (3 trial points × 2 openings)"
);
assert_eq!(
fcf.pools[1].metadata[slot].last_active_iter, 1,
"slot {slot} last_active_iter should be 1 (current iteration)"
);
}
assert_eq!(fcf.pools[2].populated_count, 0);
}
#[allow(clippy::too_many_lines, clippy::cast_precision_loss)]
fn run_backward_pass_with_n_workers(n_workers: usize) -> FutureCostFunction {
use crate::lp_builder::PatchBuffer;
let n_stages = 2_usize;
let local_work = 6_usize;
let n_openings = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
#[allow(clippy::cast_possible_truncation)]
let forward_passes = local_work as u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 64, &vec![0; n_stages]);
let states: Vec<Vec<f64>> = (0..local_work)
.map(|i| vec![(i + 1) as f64 * 10.0])
.collect();
let mut exchange = exchange_with_states(n_state, states);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let mut workspaces: Vec<SolverWorkspace<MockSolver>> = (0..n_workers)
.map(|idx| SolverWorkspace {
rank: 0,
worker_id: i32::try_from(idx).expect("worker_id fits in i32"),
solver: ProfiledSolver::new(MockSolver::always_ok(solution.clone())),
patch_buf: PatchBuffer::new(1, 0, 0, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
recon_slot_lookup: Vec::new(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
anticipated_state_buf: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
})
.collect();
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let comm = StubComm;
let mut csb = CutSyncBuffers::new(n_state, local_work, 1);
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.unwrap();
assert_eq!(
result.cuts_generated, local_work,
"n_workers={n_workers}: expected {local_work} cuts, got {}",
result.cuts_generated,
);
fcf
}
#[test]
fn work_stealing_produces_identical_results_across_worker_counts() {
let fcf_1 = run_backward_pass_with_n_workers(1);
let fcf_3 = run_backward_pass_with_n_workers(3);
let num_stages = 2;
assert!(
fcf_1.active_cuts(0).count() > 0,
"1-worker run produced no cuts at stage 0"
);
for stage in 0..num_stages {
let cuts_1: Vec<_> = fcf_1.active_cuts(stage).collect();
let cuts_3: Vec<_> = fcf_3.active_cuts(stage).collect();
assert_eq!(
cuts_1.len(),
cuts_3.len(),
"stage {stage}: cut count mismatch ({} vs {})",
cuts_1.len(),
cuts_3.len(),
);
for (i, ((s1, int1, c1), (s3, int3, c3))) in cuts_1.iter().zip(&cuts_3).enumerate() {
assert_eq!(
s1, s3,
"stage {stage}, cut {i}: slot mismatch ({s1} vs {s3})"
);
assert_eq!(
int1, int3,
"stage {stage}, cut {i}: intercept mismatch ({int1} vs {int3})"
);
assert_eq!(
c1, c3,
"stage {stage}, cut {i}: coefficients mismatch ({c1:?} vs {c3:?})"
);
}
}
}
fn make_stats(solve_s: f64, load_s: f64, set_bounds_s: f64, basis_set_s: f64) -> SolverStatistics {
SolverStatistics {
total_solve_time_seconds: solve_s,
total_load_model_time_seconds: load_s,
total_set_bounds_time_seconds: set_bounds_s,
total_basis_set_time_seconds: basis_set_s,
..SolverStatistics::default()
}
}
fn decompose_overhead(
pairs: &[(SolverStatistics, SolverStatistics)],
parallel_wall_ms: u64,
) -> (u64, u64, u64) {
use crate::solver_stats::SolverStatsDelta;
#[allow(clippy::cast_precision_loss)]
let n_workers = pairs.len() as f64;
let worker_deltas: Vec<SolverStatsDelta> = pairs
.iter()
.map(|(before, after)| SolverStatsDelta::from_snapshots(before, after))
.collect();
let stage_setup_ms: f64 = worker_deltas
.iter()
.map(|d| d.load_model_time_ms + d.set_bounds_time_ms + d.basis_set_time_ms)
.sum();
let worker_totals: Vec<f64> = worker_deltas
.iter()
.map(|d| {
d.solve_time_ms + d.load_model_time_ms + d.set_bounds_time_ms + d.basis_set_time_ms
})
.collect();
let max_worker_ms = worker_totals.iter().copied().fold(0.0_f64, f64::max);
let avg_worker_ms = if worker_totals.is_empty() {
0.0_f64
} else {
worker_totals.iter().sum::<f64>() / n_workers
};
let stage_imbalance_ms = (max_worker_ms - avg_worker_ms).max(0.0);
#[allow(clippy::cast_precision_loss)]
let stage_scheduling_ms = (parallel_wall_ms as f64 - max_worker_ms).max(0.0);
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
(
stage_setup_ms as u64,
stage_imbalance_ms as u64,
stage_scheduling_ms as u64,
)
}
#[test]
fn decompose_four_workers_different_solve_times() {
let zero = SolverStatistics::default();
let pairs = vec![
(zero.clone(), make_stats(0.1, 0.0, 0.0, 0.0)), (zero.clone(), make_stats(0.2, 0.0, 0.0, 0.0)), (zero.clone(), make_stats(0.15, 0.0, 0.0, 0.0)), (zero.clone(), make_stats(0.18, 0.0, 0.0, 0.0)), ];
let (setup_ms, imbalance_ms, scheduling_ms) = decompose_overhead(&pairs, 250);
assert_eq!(setup_ms, 0, "no setup work expected");
assert_eq!(
imbalance_ms, 42,
"imbalance = trunc(max(200.0) - avg(157.5)) = trunc(42.5) = 42"
);
assert_eq!(scheduling_ms, 50, "scheduling overhead = wall - max_worker");
}
#[test]
fn decompose_setup_time_is_aggregate_non_solve_work() {
let zero = SolverStatistics::default();
let pairs = vec![
(zero.clone(), make_stats(0.0, 0.020, 0.0, 0.0)), (zero.clone(), make_stats(0.0, 0.025, 0.0, 0.0)), (zero.clone(), make_stats(0.0, 0.015, 0.0, 0.0)), (zero.clone(), make_stats(0.0, 0.022, 0.0, 0.0)), ];
let (setup_ms, _imbalance_ms, _scheduling_ms) = decompose_overhead(&pairs, 300);
assert_eq!(
setup_ms, 82,
"aggregate setup must sum all workers' non-solve work"
);
}
#[test]
fn decompose_identical_workers_zero_imbalance() {
let zero = SolverStatistics::default();
let after = make_stats(0.1, 0.01, 0.002, 0.001);
let pairs = vec![
(zero.clone(), after.clone()),
(zero.clone(), after.clone()),
(zero.clone(), after.clone()),
];
let (_, imbalance_ms, _) = decompose_overhead(&pairs, 200);
assert_eq!(
imbalance_ms, 0,
"identical workers must have zero imbalance"
);
}
#[test]
fn decompose_single_worker() {
let zero = SolverStatistics::default();
let after = make_stats(0.1, 0.020, 0.0, 0.0);
let pairs = vec![(zero.clone(), after)];
let (setup_ms, imbalance_ms, scheduling_ms) = decompose_overhead(&pairs, 150);
assert_eq!(setup_ms, 20, "single worker: setup = 20 ms");
assert_eq!(imbalance_ms, 0, "single worker: imbalance must be 0");
assert_eq!(
scheduling_ms, 30,
"single worker: scheduling = wall - worker_total"
);
}
#[test]
fn decompose_scheduling_clamped_when_worker_exceeds_wall() {
let zero = SolverStatistics::default();
let after = make_stats(0.2, 0.0, 0.0, 0.0);
let pairs = vec![(zero.clone(), after)];
let (_, _, scheduling_ms) = decompose_overhead(&pairs, 180);
assert_eq!(scheduling_ms, 0, "negative scheduling must be clamped to 0");
}
#[test]
#[allow(
clippy::too_many_lines,
clippy::cast_precision_loss,
clippy::cast_possible_truncation
)]
fn allgatherv_single_rank_two_workers_stage_stats_has_per_worker_entries() {
use crate::lp_builder::PatchBuffer;
let n_stages = 2_usize;
let n_openings = 4_usize;
let n_workers = 2_usize;
let local_work = 4_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let solution = solution_1_0(100.0, -5.0);
let states: Vec<Vec<f64>> = (0..local_work).map(|i| vec![(i + 1) as f64]).collect();
let mut exchange = exchange_with_states(n_state, states);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let mut workspaces: Vec<SolverWorkspace<MockSolver>> = (0..n_workers)
.map(|idx| SolverWorkspace {
rank: 0,
worker_id: i32::try_from(idx).expect("idx fits in i32"),
solver: ProfiledSolver::new(MockSolver::always_ok(solution.clone())),
patch_buf: PatchBuffer::new(1, 0, 0, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
recon_slot_lookup: Vec::new(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
anticipated_state_buf: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
})
.collect();
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut fcf =
FutureCostFunction::new(n_stages, n_state, local_work as u32, 64, &vec![0; n_stages]);
let mut csb = CutSyncBuffers::new(n_state, local_work, 1);
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &StubComm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.expect("single-rank 2-worker backward must not error");
assert_eq!(
result.stage_stats.len(),
1,
"expected 1 backward stage entry (successor=1)"
);
let (successor, entries) = &result.stage_stats[0];
assert_eq!(*successor, 1_usize, "successor index must be 1");
for (rank, _wid, _omega, _delta) in entries {
assert_eq!(*rank, 0_i32, "all entries must have rank=0 for np=1");
}
let omega0_wids: Vec<i32> = entries
.iter()
.filter(|(_, _, omega, _)| *omega == 0)
.map(|(_, wid, _, _)| *wid)
.collect();
assert!(
omega0_wids.contains(&0),
"worker_id=0 must appear at omega=0"
);
assert!(
omega0_wids.contains(&1),
"worker_id=1 must appear at omega=0"
);
}
#[test]
#[allow(
clippy::too_many_lines,
clippy::cast_precision_loss,
clippy::cast_possible_truncation
)]
fn allgatherv_dual_rank_stub_stage_stats_contains_both_ranks() {
use crate::lp_builder::PatchBuffer;
struct DualRankStubComm;
impl Communicator for DualRankStubComm {
fn allgatherv<T: CommData>(
&self,
send: &[T],
recv: &mut [T],
counts: &[usize],
displs: &[usize],
) -> Result<(), CommError> {
for (r, (&count, &displ)) in counts.iter().zip(displs).enumerate() {
let src = &send[..count.min(send.len())];
recv[displ..displ + src.len()].copy_from_slice(src);
let _ = r; }
Ok(())
}
fn allreduce<T: CommData>(
&self,
send: &[T],
recv: &mut [T],
_op: ReduceOp,
) -> Result<(), CommError> {
recv[..send.len()].copy_from_slice(send);
Ok(())
}
fn broadcast<T: CommData>(&self, _buf: &mut [T], _root: usize) -> Result<(), CommError> {
Ok(())
}
fn barrier(&self) -> Result<(), CommError> {
Ok(())
}
fn rank(&self) -> usize {
0
}
fn size(&self) -> usize {
2
}
fn abort(&self, error_code: i32) -> ! {
std::process::exit(error_code)
}
}
let n_stages = 2_usize;
let n_openings = 2_usize;
let n_workers = 1_usize;
let local_work = 2_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0(); n_stages];
let base_rows = vec![1_usize; n_stages];
let n_state = indexer.n_state;
let solution = solution_1_0(100.0, -5.0);
let states: Vec<Vec<f64>> = (0..local_work).map(|i| vec![(i + 1) as f64]).collect();
let mut exchange = exchange_with_states(n_state, states);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let mut workspaces: Vec<SolverWorkspace<MockSolver>> = (0..n_workers)
.map(|idx| SolverWorkspace {
rank: 0,
worker_id: i32::try_from(idx).expect("idx fits in i32"),
solver: ProfiledSolver::new(MockSolver::always_ok(solution.clone())),
patch_buf: PatchBuffer::new(1, 0, 0, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
recon_slot_lookup: Vec::new(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
anticipated_state_buf: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
})
.collect();
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut fcf =
FutureCostFunction::new(n_stages, n_state, local_work as u32, 64, &vec![0; n_stages]);
let mut csb = CutSyncBuffers::new(n_state, local_work, 1);
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(templates.len()),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &DualRankStubComm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
})
.expect("dual-rank stub backward must not error");
assert_eq!(result.stage_stats.len(), 1);
let (_, entries) = &result.stage_stats[0];
let ranks_seen: Vec<i32> = entries
.iter()
.map(|(rank, _, _, _)| *rank)
.collect::<std::collections::HashSet<i32>>()
.into_iter()
.collect();
assert!(
ranks_seen.contains(&0),
"rank=0 must appear in stage_stats; got {ranks_seen:?}"
);
assert!(
ranks_seen.contains(&1),
"rank=1 must appear in stage_stats; got {ranks_seen:?}"
);
}
#[allow(clippy::too_many_lines)]
fn run_one_trial_point_with_stores(
basis_store: &mut crate::workspace::BasisStore,
) -> Result<Vec<SolverWorkspace<MockSolver>>, crate::SddpError> {
use crate::context::StageContext;
let n_stages = 2_usize;
let n_openings = 1_usize;
let n_state = 1_usize;
let stochastic = make_stochastic_context(n_stages, n_openings);
let indexer = {
let mut ix = StageIndexer::new(n_state, 0);
ix.finalize_for_test();
ix
};
let solver = MockSolver::always_ok(solution_1_0(100.0, -5.0));
let mut workspaces = single_workspace(solver, n_state);
let ws = &mut workspaces[0];
ws.backward_accum
.outcomes
.push(crate::risk_measure::BackwardOutcome {
intercept: 0.0,
coefficients: vec![0.0; n_state],
objective_value: 0.0,
});
ws.backward_accum
.per_opening_stats
.push(SolverStatsDelta::default());
ws.backward_accum.agg_coefficients.resize(n_state, 0.0);
let exchange = exchange_with_states(n_state, vec![vec![5.0]]);
let templates: &'static _ = Box::leak(Box::new(vec![
minimal_template_1_0(),
minimal_template_1_0(),
]));
let base_rows: &'static _ = Box::leak(Box::new(vec![1_usize, 1_usize]));
let ctx: StageContext<'static> = StageContext {
templates,
base_rows,
noise_scale: Box::leak(Box::new(vec![])),
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: Box::leak(Box::new(vec![])),
load_bus_indices: Box::leak(Box::new(vec![])),
block_counts_per_stage: Box::leak(Box::new(vec![])),
ncs_max_gen: Box::leak(Box::new(vec![])),
ncs_allow_curtailment: Box::leak(Box::new(vec![])),
discount_factors: Box::leak(Box::new(vec![])),
cumulative_discount_factors: Box::leak(Box::new(vec![])),
stage_lag_transitions: Box::leak(Box::new(vec![])),
noise_group_ids: Box::leak(Box::new(vec![])),
downstream_par_order: 0,
};
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let training_ctx = TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
};
let iteration: u64 = 1;
let fwd_offset: usize = 0;
let succ_probabilities = vec![1.0_f64; n_openings];
let successor_active_slots: Vec<usize> = vec![];
let baked_template = minimal_template_1_0();
let fcf = FutureCostFunction::new(n_stages, 1, 1, 10, &vec![0u32; n_stages]);
let empty_cut_batch = RowBatch {
num_rows: 0,
row_starts: Vec::new(),
col_indices: Vec::new(),
values: Vec::new(),
row_lower: Vec::new(),
row_upper: Vec::new(),
};
let succ_spec = super::SuccessorSpec {
t: 0,
successor: 1,
my_rank: 0,
probabilities: &succ_probabilities,
cut_batch: &empty_cut_batch,
num_cuts_at_successor: 0,
template_num_rows: baked_template.num_rows,
baked_template: &baked_template,
successor_active_slots: &successor_active_slots,
cut_activity_tolerance: 0.0,
successor_populated_count: fcf.pools[1].populated_count,
successor_pool: &fcf.pools[1],
};
let mut basis_slices = basis_store.split_workers_mut(1);
let mut basis_slice = basis_slices.remove(0);
let ws = &mut workspaces[0];
super::load_backward_lp(ws, &succ_spec);
ws.backward_accum
.per_opening_stats
.resize_with(n_openings, SolverStatsDelta::default);
for slot in &mut ws.backward_accum.per_opening_stats[..n_openings] {
*slot = SolverStatsDelta::default();
}
ws.backward_accum.slot_increments.resize(1, 0);
ws.backward_accum.slot_increments[..1].fill(0);
if ws.backward_accum.agg_arena.len() < n_state {
ws.backward_accum.agg_arena.resize(n_state, 0.0_f64);
}
super::process_trial_point_backward(
ws,
&ctx,
&training_ctx,
&exchange,
fwd_offset,
iteration,
&risk_measures,
&succ_spec,
&mut basis_slice,
&super::StageOpeningSolver::Baked,
0,
0,
)?;
Ok(workspaces)
}
#[test]
fn resolve_backward_basis_returns_some_when_slot_is_populated() {
use crate::workspace::{BasisStore, CapturedBasis};
let b = CapturedBasis::new(2, 2, 0, 0, 0);
let mut store = BasisStore::new(1, 2);
*store.get_mut(0, 1) = Some(b);
let slices = store.split_workers_mut(1);
let slice = &slices[0];
let basis_ref = super::resolve_backward_basis(slice, 0, 1);
assert!(basis_ref.is_some(), "expected Some when slot has a basis");
drop(slices);
}
#[test]
fn resolve_backward_basis_returns_none_when_slot_is_empty() {
use crate::workspace::BasisStore;
let mut store = BasisStore::new(1, 2);
let slices = store.split_workers_mut(1);
let slice = &slices[0];
let basis_ref = super::resolve_backward_basis(slice, 0, 1);
assert!(basis_ref.is_none(), "expected None for empty slot");
drop(slices);
}
#[test]
fn backward_write_populates_basis_store_at_omega_zero() {
use crate::workspace::BasisStore;
let mut basis_store = BasisStore::new(1, 2);
let workspaces = run_one_trial_point_with_stores(&mut basis_store).unwrap();
assert!(
basis_store.get(0, 1).is_some(),
"BasisStore[0, 1] must be Some after backward write at omega=0"
);
let captured = basis_store.get(0, 1).unwrap();
assert_eq!(
captured.state_at_capture,
vec![5.0_f64],
"state_at_capture must equal x_hat"
);
assert_eq!(
workspaces[0].solver.inner().call_count,
1,
"solver must be called exactly once for a 1-opening backward pass"
);
}
#[test]
fn backward_write_preserves_slot_on_infeasibility_at_omega_zero() {
use cobre_solver::Basis;
use crate::workspace::{BasisStore, CapturedBasis};
let pre_existing = CapturedBasis {
basis: Basis::new(2, 2),
base_row_count: 2,
cut_row_slots: Vec::new(),
state_at_capture: vec![42.0],
};
let mut basis_store = BasisStore::new(1, 2);
*basis_store.get_mut(0, 1) = Some(pre_existing);
assert_eq!(
basis_store.get(0, 1).unwrap().state_at_capture,
vec![42.0_f64],
"sentinel must be in place before the infeasible solve"
);
let result = run_one_trial_point_with_stores(&mut basis_store);
assert!(result.is_ok(), "expected Ok for successful solve");
assert!(
basis_store.get(0, 1).is_some(),
"BasisStore[0, 1] must not be None after successful reuse-path write at ω=0"
);
assert_eq!(
basis_store.get(0, 1).unwrap().state_at_capture,
vec![5.0_f64],
"state_at_capture must be updated to x_hat by the reuse path"
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn handshake_passes_with_local_backend() {
use crate::lp_builder::PatchBuffer;
let n_stages = 1_usize;
let n_workers = 2_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0()];
let base_rows = vec![1_usize];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let solution = solution_1_0(100.0, -5.0);
let mut workspaces: Vec<SolverWorkspace<MockSolver>> = (0..n_workers)
.map(|idx| SolverWorkspace {
rank: 0,
worker_id: i32::try_from(idx).expect("idx fits i32"),
solver: ProfiledSolver::new(MockSolver::always_ok(solution.clone())),
patch_buf: PatchBuffer::new(1, 0, 0, 0, 0, 0),
current_state: Vec::with_capacity(n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::new(),
inflow_m3s_buf: Vec::new(),
lag_matrix_buf: Vec::new(),
par_inflow_buf: Vec::new(),
eta_floor_buf: Vec::new(),
zero_targets_buf: Vec::new(),
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
recon_slot_lookup: Vec::new(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
anticipated_state_buf: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
})
.collect();
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 1, 1);
let comm = StubComm;
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(n_stages),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
});
assert!(
result.is_ok(),
"handshake must pass when all ranks have the same n_workers_local; got: {result:?}"
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn handshake_rejects_nonuniform_workers() {
struct NonUniformStubComm;
impl Communicator for NonUniformStubComm {
fn allgatherv<T: CommData>(
&self,
send: &[T],
recv: &mut [T],
_counts: &[usize],
_displs: &[usize],
) -> Result<(), CommError> {
recv[..send.len()].copy_from_slice(send);
Ok(())
}
fn allreduce<T: CommData>(
&self,
send: &[T],
recv: &mut [T],
op: ReduceOp,
) -> Result<(), CommError> {
match op {
ReduceOp::Min => {
for r in recv.iter_mut() {
*r = T::default();
}
}
_ => {
recv[..send.len()].copy_from_slice(send);
}
}
Ok(())
}
fn broadcast<T: CommData>(&self, _buf: &mut [T], _root: usize) -> Result<(), CommError> {
Ok(())
}
fn barrier(&self) -> Result<(), CommError> {
Ok(())
}
fn rank(&self) -> usize {
0
}
fn size(&self) -> usize {
2
}
fn abort(&self, error_code: i32) -> ! {
std::process::exit(error_code)
}
}
let n_stages = 1_usize;
let stochastic = make_stochastic_context(n_stages, 1);
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let templates = vec![minimal_template_1_0()];
let base_rows = vec![1_usize];
let n_state = indexer.n_state;
let forward_passes = 1_u32;
let mut fcf =
FutureCostFunction::new(n_stages, n_state, forward_passes, 10, &vec![0; n_stages]);
let mut exchange = exchange_with_states(n_state, vec![vec![10.0]]);
let horizon = HorizonMode::Finite {
num_stages: n_stages,
};
let risk_measures = vec![RiskMeasure::Expectation; n_stages];
let comm = NonUniformStubComm;
let mut workspaces =
single_workspace(MockSolver::always_ok(solution_1_0(100.0, -5.0)), n_state);
let mut basis_store = empty_basis_store(exchange.local_count(), n_stages);
let mut csb = CutSyncBuffers::new(n_state, 1, 1);
let result = run_backward_pass(&mut crate::backward_pass_state::BackwardPassInputs {
workspaces: &mut workspaces,
basis_store: &mut basis_store,
ctx: &StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
},
baked: &mut templates.clone(),
fcf: &mut fcf,
cut_batches: &mut empty_cut_batches(n_stages),
training_ctx: &TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: None,
noise_key_diag: None,
},
comm: &comm,
records: &[],
iteration: 0,
local_work: exchange.local_count(),
fwd_offset: 0,
risk_measures: &risk_measures,
exchange: &mut exchange,
cut_activity_tolerance: 0.0,
cut_sync_bufs: &mut csb,
visited_archive: None,
event_sender: None,
});
match result {
Err(crate::SddpError::Validation(ref msg)) => {
assert!(
msg.contains("non-uniform n_workers_local"),
"error message must contain 'non-uniform n_workers_local'; got: {msg}"
);
assert!(
msg.contains("min=0"),
"error message must mention min=0 (stub Min returns T::default()); got: {msg}"
);
assert!(
msg.contains("max=1"),
"error message must mention max=1 (stub Max echoes local=1); got: {msg}"
);
assert!(
msg.contains("local=1"),
"error message must mention local=1 (single workspace); got: {msg}"
);
}
other => panic!(
"expected Err(SddpError::Validation(_)) from non-uniform handshake, got: {other:?}"
),
}
}
fn make_anticipated_indexer_local(
n_anticipated: usize,
k_max: usize,
anticipated_lead_stages: Vec<usize>,
) -> StageIndexer {
use crate::indexer::{EquipmentCounts, EvapConfig, FphaColumnLayout};
let mut indexer = StageIndexer::with_equipment_and_evaporation(
&EquipmentCounts {
hydro_count: 0,
max_par_order: 0,
n_thermals: 0,
n_lines: 0,
n_buses: 1,
n_blks: 1,
has_inflow_penalty: false,
max_deficit_segments: 1,
n_anticipated,
k_max,
anticipated_lead_stages,
anticipated_thermal_indices: (0..n_anticipated).collect(),
},
&FphaColumnLayout {
hydro_indices: vec![],
planes_per_hydro: vec![],
},
&EvapConfig {
hydro_indices: vec![],
},
);
indexer.finalize_for_test();
indexer
}
#[test]
fn cut_coefficient_sign_convention_slot_zero_k2() {
let indexer = make_anticipated_indexer_local(1, 2, vec![2]);
assert_eq!(indexer.anticipated_state.start, 0);
assert_eq!(indexer.n_state, 2);
let mut fcf = FutureCostFunction::new(3, indexer.n_state, 1, 10, &[0; 3]);
let mut coefficients = vec![0.0_f64; indexer.n_state];
coefficients[indexer.anticipated_state.start] = 7.5;
fcf.add_cut(1, 0, 0, 0.0, &coefficients);
let mut batch = RowBatch {
num_rows: 0,
row_starts: Vec::new(),
col_indices: Vec::new(),
values: Vec::new(),
row_lower: Vec::new(),
row_upper: Vec::new(),
};
crate::cut::row::build_cut_row_batch_into(&mut batch, &fcf, 1, &indexer, &[]);
let lp_col = indexer.state_to_lp_column(indexer.anticipated_state.start);
assert_eq!(lp_col, 1);
let pos = batch
.col_indices
.iter()
.position(|&c| c == lp_col as i32)
.expect("lp_col must appear in batch.col_indices");
assert!(
(batch.values[pos] - (-7.5_f64)).abs() < f64::EPSILON,
"expected batch.values[pos]=-7.5 for lp_col={lp_col}, got {}",
batch.values[pos]
);
}
use crate::cut_selection::{CutMetadata, CutSelectionStrategy};
use crate::dcs::DcsParams;
use crate::lp_builder::PatchBuffer;
use crate::workspace::WorkspaceSizing;
use cobre_solver::ActiveSolver;
fn dcs_core_template() -> StageTemplate {
StageTemplate {
num_cols: 4,
num_rows: 1,
num_nz: 2,
col_starts: vec![0_i32, 1, 1, 2, 2],
row_indices: vec![0_i32, 0],
values: vec![1.0, -1.0],
col_lower: vec![0.0, 0.0, 0.0, -1.0e6],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY, 1.0e6],
objective: vec![0.0, 0.0, 0.0, 1.0],
row_lower: vec![0.0],
row_upper: vec![0.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
}
}
fn dcs_two_stage_fcf() -> FutureCostFunction {
let n_stages = 2;
let mut fcf = FutureCostFunction::new(n_stages, 1, 8, 10, &vec![0; n_stages]);
fcf.add_cut(1, 0, 0, 1.0, &[0.0]);
fcf.add_cut(1, 0, 1, 0.0, &[2.0]);
fcf.add_cut(1, 0, 2, 3.0, &[0.0]);
let meta = |generated: u64, last: u64| CutMetadata {
iteration_generated: generated,
forward_pass_index: 0,
active_count: 0,
last_active_iter: last,
};
fcf.pools[1].metadata[0] = meta(1, 5);
fcf.pools[1].metadata[1] = meta(1, 1); fcf.pools[1].metadata[2] = meta(1, 5);
fcf
}
fn dcs_active_workspace() -> Vec<SolverWorkspace<ActiveSolver>> {
let sizing = WorkspaceSizing {
hydro_count: 1,
max_par_order: 0,
n_load_buses: 0,
max_blocks: 0,
downstream_par_order: 0,
max_openings: 1,
initial_pool_capacity: 16,
n_state: 1,
max_local_fwd: 1,
total_forward_passes: 1,
noise_dim: 1,
n_anticipated: 0,
k_max: 0,
};
let solver = ActiveSolver::new().expect("ActiveSolver::new()");
vec![SolverWorkspace::new(
0,
0,
solver,
PatchBuffer::new(1, 0, 0, 0, 0, 0),
1,
sizing,
)]
}
fn run_dcs_backward_trial_point(
dcs: Option<DcsParams>,
iteration: u64,
) -> (super::StagedCut, Vec<f64>, Vec<u64>) {
run_dcs_backward_trial_point_at(dcs, iteration, 2.0)
}
fn run_dcs_backward_trial_point_at(
dcs: Option<DcsParams>,
iteration: u64,
x_hat: f64,
) -> (super::StagedCut, Vec<f64>, Vec<u64>) {
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let n_state = indexer.n_state;
let core = dcs_core_template();
let templates = vec![core.clone(), core.clone()];
let base_rows = vec![0_usize, 0_usize];
let stochastic = make_stochastic_context(2, 1);
let horizon = HorizonMode::Finite { num_stages: 2 };
let risk_measures = vec![RiskMeasure::Expectation; 2];
let mut fcf = dcs_two_stage_fcf();
let cut_batch = crate::cut::row::build_cut_row_batch(&fcf, 1, &indexer, &[]);
let successor_active_slots: Vec<usize> = (0..fcf.pools[1].populated_count).collect();
let num_cuts = successor_active_slots.len();
let mut exchange = exchange_with_states(n_state, vec![vec![x_hat]]);
let mut workspaces = dcs_active_workspace();
let mut basis_store = empty_basis_store(exchange.local_count(), 2);
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let training_ctx = TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs,
noise_key_diag: None,
};
let probabilities = vec![1.0_f64];
let succ = super::SuccessorSpec {
t: 0,
successor: 1,
my_rank: 0,
probabilities: &probabilities,
cut_batch: &cut_batch,
num_cuts_at_successor: num_cuts,
template_num_rows: core.num_rows,
baked_template: &core,
successor_active_slots: &successor_active_slots,
cut_activity_tolerance: 0.0,
successor_populated_count: fcf.pools[1].populated_count,
successor_pool: &fcf.pools[1],
};
let mut basis_slices = basis_store.split_workers_mut(1);
let ws = &mut workspaces[0];
let opening_solver = super::StageOpeningSolver::from_dcs_params(
dcs.filter(|params| params.is_active(iteration)),
);
opening_solver.prepare(ws, &ctx, &succ, iteration);
let n_openings = succ.probabilities.len();
while ws.backward_accum.outcomes.len() < n_openings {
ws.backward_accum
.outcomes
.push(crate::risk_measure::BackwardOutcome {
intercept: 0.0,
coefficients: vec![0.0_f64; n_state],
objective_value: 0.0,
});
}
let pop = succ.successor_populated_count;
if ws.backward_accum.slot_increments.len() < pop {
ws.backward_accum.slot_increments.resize(pop, 0);
}
ws.backward_accum.slot_increments[..pop].fill(0);
if ws.backward_accum.agg_coefficients.len() < n_state {
ws.backward_accum.agg_coefficients.resize(n_state, 0.0);
}
if ws.backward_accum.agg_arena.len() < n_state {
ws.backward_accum.agg_arena.resize(n_state, 0.0);
}
if ws.backward_accum.metadata_sync_contribution.len() < pop {
ws.backward_accum.metadata_sync_contribution.resize(pop, 0);
}
ws.backward_accum.metadata_sync_contribution[..pop].fill(0);
ws.backward_accum
.per_opening_stats
.resize_with(n_openings, SolverStatsDelta::default);
for slot in &mut ws.backward_accum.per_opening_stats[..n_openings] {
*slot = SolverStatsDelta::default();
}
let cut = super::process_trial_point_backward(
ws,
&ctx,
&training_ctx,
&exchange,
0,
iteration,
&risk_measures,
&succ,
&mut basis_slices[0],
&opening_solver,
0,
0,
)
.expect("backward trial-point solve must succeed");
let coefficients = staged_cut_coefficients(&cut, &ws.backward_accum.agg_arena).to_vec();
let meta_sync = ws.backward_accum.metadata_sync_contribution[..pop].to_vec();
let _ = (&mut fcf, &mut exchange);
(cut, coefficients, meta_sync)
}
fn dcs_params(start_iteration: u64) -> DcsParams {
DcsParams {
k1: None,
k2: 2,
nadic: 10,
epsilon_viol: 1e-10,
start_iteration,
max_inner_iterations: 50,
}
}
#[test]
fn backward_dcs_cut_equals_all_cuts_cut() {
let iteration = 5;
let (baked_cut, baked_coefficients, _) = run_dcs_backward_trial_point(None, iteration);
let (dcs_cut, dcs_coefficients, _) =
run_dcs_backward_trial_point(Some(dcs_params(2)), iteration);
assert!(
(baked_cut.intercept - dcs_cut.intercept).abs() < 1e-9,
"intercept: baked {} vs DCS {}",
baked_cut.intercept,
dcs_cut.intercept
);
assert_eq!(baked_coefficients.len(), dcs_coefficients.len());
for (i, (b, d)) in baked_coefficients.iter().zip(&dcs_coefficients).enumerate() {
assert!(
(b - d).abs() < 1e-9,
"coefficient[{i}]: baked {b} vs DCS {d}"
);
}
assert!(
(baked_coefficients[0] - 2.0).abs() < 1e-9,
"baked gradient must be the binding cut's 2.0, got {}",
baked_coefficients[0]
);
}
#[test]
fn backward_dcs_off_is_identical_to_baseline() {
let (cut_a, coefficients_a, _) = run_dcs_backward_trial_point(None, 5);
let (cut_b, coefficients_b, _) = run_dcs_backward_trial_point(None, 5);
assert_eq!(cut_a.intercept, cut_b.intercept);
assert_eq!(coefficients_a, coefficients_b);
assert!((coefficients_a[0] - 2.0).abs() < 1e-9);
}
#[test]
fn backward_dcs_inactive_before_start_iteration() {
let (baked_cut, baked_coefficients, baked_meta) = run_dcs_backward_trial_point(None, 1);
let (early_cut, early_coefficients, early_meta) =
run_dcs_backward_trial_point(Some(dcs_params(4)), 1);
assert_eq!(baked_cut.intercept, early_cut.intercept);
assert_eq!(baked_coefficients, early_coefficients);
assert_eq!(baked_meta, early_meta);
}
#[test]
fn backward_dcs_binding_counts_match_baked() {
let (_, _, baked_meta) = run_dcs_backward_trial_point(None, 5);
let (_, _, dcs_meta) = run_dcs_backward_trial_point(Some(dcs_params(2)), 5);
assert_eq!(
baked_meta,
vec![0, 1, 0],
"baked path must bump exactly binding slot 1, got {baked_meta:?}"
);
assert_eq!(
dcs_meta, baked_meta,
"DCS binding-count metadata must match baked (slot-correct via the \
resident CutRowMap), got DCS {dcs_meta:?} vs baked {baked_meta:?}"
);
}
#[test]
fn from_strategy_gates_the_backward_dcs_field() {
let dynamic = CutSelectionStrategy::Dynamic {
k1: None,
k2: 5,
nadic: 10,
epsilon_viol: 1e-10,
start_iteration: 2,
};
assert!(DcsParams::from_strategy(&dynamic).is_some());
let level1 = CutSelectionStrategy::Level1 {
check_frequency: 5,
tie_tolerance: 1e-10,
};
assert!(DcsParams::from_strategy(&level1).is_none());
}
fn dcs_params_k1(start_iteration: u64, k1: Option<u32>) -> DcsParams {
DcsParams {
k1,
k2: 2,
nadic: 10,
epsilon_viol: 1e-10,
start_iteration,
max_inner_iterations: 50,
}
}
#[test]
fn backward_dcs_exactness_and_terminates() {
let iteration = 5;
let (baked, baked_coefficients, _) = run_dcs_backward_trial_point(None, iteration);
let (dcs, dcs_coefficients, _) = run_dcs_backward_trial_point(Some(dcs_params(2)), iteration);
assert!((baked.intercept - dcs.intercept).abs() < 1e-9);
for (b, d) in baked_coefficients.iter().zip(&dcs_coefficients) {
assert!((b - d).abs() < 1e-9, "coeff mismatch baked {b} vs DCS {d}");
}
let tight = DcsParams {
max_inner_iterations: 1,
..dcs_params(2)
};
let (dcs_tc, dcs_tc_coefficients, _) = run_dcs_backward_trial_point(Some(tight), iteration);
assert!((baked.intercept - dcs_tc.intercept).abs() < 1e-9);
for (b, d) in baked_coefficients.iter().zip(&dcs_tc_coefficients) {
assert!(
(b - d).abs() < 1e-9,
"TC-fallback coeff mismatch baked {b} vs DCS {d}"
);
}
}
#[test]
fn backward_dcs_finite_k1_window_takes_effect() {
let iteration = 5;
let (baked, baked_coefficients, _) = run_dcs_backward_trial_point(None, iteration);
assert!((baked_coefficients[0] - 2.0).abs() < 1e-9);
let (windowed, windowed_coefficients, _) =
run_dcs_backward_trial_point(Some(dcs_params_k1(2, Some(1))), iteration);
assert!(
(windowed_coefficients[0] - baked_coefficients[0]).abs() > 1e-6
|| (windowed.intercept - baked.intercept).abs() > 1e-6,
"finite k1 must change the cut vs all-cuts (windowed coeff {} intercept {}; \
all-cuts coeff {} intercept {})",
windowed_coefficients[0],
windowed.intercept,
baked_coefficients[0],
baked.intercept,
);
}
#[test]
fn backward_dcs_run_to_run_determinism() {
let (cut_a, coefficients_a, meta_a) = run_dcs_backward_trial_point(Some(dcs_params(2)), 5);
let (cut_b, coefficients_b, meta_b) = run_dcs_backward_trial_point(Some(dcs_params(2)), 5);
assert_eq!(
cut_a.intercept.to_bits(),
cut_b.intercept.to_bits(),
"intercept must be bit-identical run-to-run"
);
assert_eq!(coefficients_a.len(), coefficients_b.len());
for (a, b) in coefficients_a.iter().zip(&coefficients_b) {
assert_eq!(
a.to_bits(),
b.to_bits(),
"coefficient must be bit-identical run-to-run"
);
}
assert_eq!(
meta_a, meta_b,
"binding-count metadata must be bit-identical run-to-run"
);
}
#[test]
#[cfg_attr(not(feature = "slow-tests"), ignore = "slow DCS exactness sweep")]
fn backward_dcs_exactness_sweep() {
let x_hats = [0.0_f64, 0.5, 1.0, 1.5, 2.0, 3.0, 5.0];
let iterations = [3_u64, 5, 7];
for &iteration in &iterations {
for &x in &x_hats {
let (baked, baked_coefficients, _) =
run_dcs_backward_trial_point_at(None, iteration, x);
let (dcs, dcs_coefficients, _) =
run_dcs_backward_trial_point_at(Some(dcs_params(2)), iteration, x);
assert!(
(baked.intercept - dcs.intercept).abs() < 1e-9,
"sweep iter {iteration} x_hat {x}: intercept baked {} vs DCS {}",
baked.intercept,
dcs.intercept
);
for (i, (b, d)) in baked_coefficients.iter().zip(&dcs_coefficients).enumerate() {
assert!(
(b - d).abs() < 1e-9,
"sweep iter {iteration} x_hat {x}: coeff[{i}] baked {b} vs DCS {d}"
);
}
}
}
}
fn dcs_baked_template_with_one_cut() -> StageTemplate {
StageTemplate {
num_cols: 4,
num_rows: 2,
num_nz: 4,
col_starts: vec![0_i32, 2, 2, 3, 4],
row_indices: vec![0_i32, 1, 0, 1],
values: vec![1.0, -5.0, -1.0, 1.0],
col_lower: vec![0.0, 0.0, 0.0, -1.0e6],
col_upper: vec![f64::INFINITY, f64::INFINITY, f64::INFINITY, 1.0e6],
objective: vec![0.0, 0.0, 0.0, 1.0],
row_lower: vec![0.0, 0.0],
row_upper: vec![0.0, f64::INFINITY],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
}
}
#[test]
fn backward_dcs_baked_cuts_present_no_duplicate_rows() {
let iteration = 5;
let indexer = {
let mut ix = StageIndexer::new(1, 0);
ix.finalize_for_test();
ix
};
let n_state = indexer.n_state;
let base = dcs_core_template();
let baked = dcs_baked_template_with_one_cut();
let templates = vec![base.clone(), base.clone()];
let base_rows = vec![0_usize, 0_usize];
let stochastic = make_stochastic_context(2, 1);
let horizon = HorizonMode::Finite { num_stages: 2 };
let risk_measures = vec![RiskMeasure::Expectation; 2];
let mut fcf = dcs_two_stage_fcf();
let cut_batch = crate::cut::row::build_cut_row_batch(&fcf, 1, &indexer, &[]);
let successor_active_slots: Vec<usize> = (0..fcf.pools[1].populated_count).collect();
let num_cuts = successor_active_slots.len();
let mut exchange = exchange_with_states(n_state, vec![vec![2.0]]);
let mut workspaces = dcs_active_workspace();
let mut basis_store = empty_basis_store(exchange.local_count(), 2);
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let training_ctx = TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &[],
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
dcs: Some(dcs_params(2)),
noise_key_diag: None,
};
let probabilities = vec![1.0_f64];
let succ = super::SuccessorSpec {
t: 0,
successor: 1,
my_rank: 0,
probabilities: &probabilities,
cut_batch: &cut_batch,
num_cuts_at_successor: num_cuts,
template_num_rows: base.num_rows,
baked_template: &baked,
successor_active_slots: &successor_active_slots,
cut_activity_tolerance: 0.0,
successor_populated_count: fcf.pools[1].populated_count,
successor_pool: &fcf.pools[1],
};
let mut basis_slices = basis_store.split_workers_mut(1);
let ws = &mut workspaces[0];
let opening_solver = super::StageOpeningSolver::from_dcs_params(
training_ctx
.dcs
.filter(|params| params.is_active(iteration)),
);
opening_solver.prepare(ws, &ctx, &succ, iteration);
let n_openings = succ.probabilities.len();
while ws.backward_accum.outcomes.len() < n_openings {
ws.backward_accum
.outcomes
.push(crate::risk_measure::BackwardOutcome {
intercept: 0.0,
coefficients: vec![0.0_f64; n_state],
objective_value: 0.0,
});
}
let pop = succ.successor_populated_count;
if ws.backward_accum.slot_increments.len() < pop {
ws.backward_accum.slot_increments.resize(pop, 0);
}
ws.backward_accum.slot_increments[..pop].fill(0);
if ws.backward_accum.agg_coefficients.len() < n_state {
ws.backward_accum.agg_coefficients.resize(n_state, 0.0);
}
if ws.backward_accum.agg_arena.len() < n_state {
ws.backward_accum.agg_arena.resize(n_state, 0.0);
}
if ws.backward_accum.metadata_sync_contribution.len() < pop {
ws.backward_accum.metadata_sync_contribution.resize(pop, 0);
}
ws.backward_accum.metadata_sync_contribution[..pop].fill(0);
ws.backward_accum
.per_opening_stats
.resize_with(n_openings, SolverStatsDelta::default);
for slot in &mut ws.backward_accum.per_opening_stats[..n_openings] {
*slot = SolverStatsDelta::default();
}
let dcs_cut = super::process_trial_point_backward(
ws,
&ctx,
&training_ctx,
&exchange,
0,
iteration,
&risk_measures,
&succ,
&mut basis_slices[0],
&opening_solver,
0,
0,
)
.expect("DCS backward solve with baked cuts present must succeed");
let dcs_coefficients = staged_cut_coefficients(&dcs_cut, &ws.backward_accum.agg_arena).to_vec();
let _ = (&mut fcf, &mut exchange);
let (allcuts, allcuts_coefficients, _) = run_dcs_backward_trial_point(None, iteration);
assert!(
(dcs_cut.intercept - allcuts.intercept).abs() < 1e-9,
"intercept: DCS {} vs all-cuts {}",
dcs_cut.intercept,
allcuts.intercept
);
assert_eq!(dcs_coefficients.len(), allcuts_coefficients.len());
for (i, (d, a)) in dcs_coefficients
.iter()
.zip(&allcuts_coefficients)
.enumerate()
{
assert!((d - a).abs() < 1e-9, "coeff[{i}]: DCS {d} vs all-cuts {a}");
}
assert!((dcs_coefficients[0] - 2.0).abs() < 1e-9);
}