use std::sync::mpsc::Sender;
use std::time::Instant;
use cobre_comm::Communicator;
use cobre_core::{TrainingEvent, WelfordAccumulator};
use cobre_solver::{RowBatch, SolverInterface, StageTemplate};
use cobre_stochastic::context::ClassSchemes;
use cobre_stochastic::{
ClassDimensions, ForwardSampler, ForwardSamplerConfig, build_forward_sampler,
};
use crate::{
context::{StageContext, TrainingContext},
cut::FutureCostFunction,
cut::pool::CutPool,
error::SddpError,
indexer::StageIndexer,
lp_builder::COST_SCALE_FACTOR,
noise::{NcsNoiseOffsets, transform_inflow_noise, transform_load_noise, transform_ncs_noise},
solver_stats::SolverStatsDelta,
trajectory::TrajectoryRecord,
workspace::{BasisStore, BasisStoreSliceMut, CapturedBasis, SolverWorkspace},
};
#[derive(Debug, Clone)]
#[must_use]
pub struct ForwardResult {
pub scenario_costs: Vec<f64>,
pub elapsed_ms: u64,
pub lp_solves: u64,
pub setup_time_ms: u64,
pub load_imbalance_ms: u64,
pub scheduling_overhead_ms: u64,
pub stage_stats: Vec<SolverStatsDelta>,
}
#[derive(Debug, Clone)]
#[must_use]
pub struct SyncResult {
pub global_ub_mean: f64,
pub global_ub_std: f64,
pub ci_95_half_width: f64,
pub sync_time_ms: u64,
}
pub fn sync_forward<C: Communicator>(
local: &ForwardResult,
comm: &C,
total_forward_passes: usize,
) -> Result<SyncResult, SddpError> {
let start = Instant::now();
let num_ranks = comm.size();
let my_rank = comm.rank();
let base = total_forward_passes / num_ranks;
let remainder = total_forward_passes % num_ranks;
let counts: Vec<usize> = (0..num_ranks)
.map(|r| base + usize::from(r < remainder))
.collect();
let mut displs = vec![0usize; num_ranks];
for r in 1..num_ranks {
displs[r] = displs[r - 1] + counts[r - 1];
}
let global_n = counts.iter().sum::<usize>();
debug_assert_eq!(
global_n, total_forward_passes,
"counts sum {global_n} != total_forward_passes {total_forward_passes}",
);
let mut global_costs = vec![0.0_f64; global_n];
debug_assert_eq!(
local.scenario_costs.len(),
counts[my_rank],
"rank {my_rank}: scenario_costs length {} != expected count {}",
local.scenario_costs.len(),
counts[my_rank],
);
comm.allgatherv(&local.scenario_costs, &mut global_costs, &counts, &displs)?;
let mut welford = WelfordAccumulator::new();
for &c in &global_costs {
welford.update(c);
}
let mean = welford.mean();
let (std_dev, ci_95) = if global_n > 1 {
let sd = welford.sample_std_dev();
let ci = welford.sample_ci_95_half_width();
(sd, ci)
} else {
(0.0_f64, 0.0_f64)
};
#[allow(clippy::cast_possible_truncation)]
let sync_time_ms = start.elapsed().as_millis() as u64;
Ok(SyncResult {
global_ub_mean: mean,
global_ub_std: std_dev,
ci_95_half_width: ci_95,
sync_time_ms,
})
}
#[allow(clippy::empty_line_after_doc_comments)]
#[inline]
fn push_scaled_coefficient(batch: &mut RowBatch, j: usize, coeff: f64, col_scale: &[f64]) {
debug_assert!(
i32::try_from(j).is_ok(),
"column index j={j} exceeds i32::MAX"
);
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
batch.col_indices.push(j as i32);
let d = if col_scale.is_empty() {
1.0
} else {
col_scale[j]
};
batch.values.push(-coeff * d);
}
pub fn build_cut_row_batch_into(
batch: &mut RowBatch,
fcf: &FutureCostFunction,
stage: usize,
indexer: &StageIndexer,
col_scale: &[f64],
) {
batch.clear();
let n_state = indexer.n_state;
let theta_col = indexer.theta;
let mask = &indexer.nonzero_state_indices;
let is_sparse = !mask.is_empty();
let num_cuts: usize = fcf.pools[stage].active_count();
if num_cuts == 0 {
batch.row_starts.push(0_i32);
return;
}
let nnz_per_cut = if is_sparse {
mask.len() + 1
} else {
n_state + 1
};
let total_nnz = num_cuts * nnz_per_cut;
let mut nz_offset = 0;
for (_slot, intercept, coefficients) in fcf.active_cuts(stage) {
debug_assert_eq!(
coefficients.len(),
n_state,
"cut coefficients length {got} != n_state {expected}",
got = coefficients.len(),
expected = n_state,
);
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
batch.row_starts.push(nz_offset as i32);
if is_sparse {
for &j in mask {
let lp_col = indexer.state_to_lp_column(j);
push_scaled_coefficient(batch, lp_col, coefficients[j], col_scale);
}
} else {
for (j, &c) in coefficients.iter().enumerate() {
let lp_col = indexer.state_to_lp_column(j);
push_scaled_coefficient(batch, lp_col, c, col_scale);
}
}
debug_assert!(
i32::try_from(theta_col).is_ok(),
"theta_col={theta_col} exceeds i32::MAX"
);
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
batch.col_indices.push(theta_col as i32);
let d_theta = if col_scale.is_empty() {
1.0
} else {
col_scale[theta_col]
};
batch.values.push(d_theta);
batch.row_lower.push(intercept);
batch.row_upper.push(f64::INFINITY);
nz_offset += nnz_per_cut;
}
#[allow(clippy::expect_used)]
batch.row_starts.push(
i32::try_from(total_nnz).expect("total_nnz exceeds i32::MAX; LP exceeds HiGHS API limit"),
);
batch.num_rows = num_cuts;
}
#[must_use]
pub fn build_cut_row_batch(
fcf: &FutureCostFunction,
stage: usize,
indexer: &StageIndexer,
col_scale: &[f64],
) -> RowBatch {
let mut batch = RowBatch {
num_rows: 0,
row_starts: Vec::new(),
col_indices: Vec::new(),
values: Vec::new(),
row_lower: Vec::new(),
row_upper: Vec::new(),
};
build_cut_row_batch_into(&mut batch, fcf, stage, indexer, col_scale);
batch
}
pub fn build_delta_cut_row_batch_into(
batch: &mut RowBatch,
fcf: &FutureCostFunction,
stage: usize,
indexer: &StageIndexer,
col_scale: &[f64],
current_iteration: u64,
) {
batch.clear();
let n_state = indexer.n_state;
let theta_col = indexer.theta;
let mask = &indexer.nonzero_state_indices;
let is_sparse = !mask.is_empty();
let num_cuts: usize = fcf.pools[stage]
.active_delta_cuts(current_iteration)
.count();
if num_cuts == 0 {
batch.row_starts.push(0_i32);
return;
}
let nnz_per_cut = if is_sparse {
mask.len() + 1
} else {
n_state + 1
};
let total_nnz = num_cuts * nnz_per_cut;
let mut nz_offset = 0;
for (_slot, intercept, coefficients) in fcf.pools[stage].active_delta_cuts(current_iteration) {
debug_assert_eq!(
coefficients.len(),
n_state,
"cut coefficients length {got} != n_state {expected}",
got = coefficients.len(),
expected = n_state,
);
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
batch.row_starts.push(nz_offset as i32);
if is_sparse {
for &j in mask {
let lp_col = indexer.state_to_lp_column(j);
push_scaled_coefficient(batch, lp_col, coefficients[j], col_scale);
}
} else {
for (j, &c) in coefficients.iter().enumerate() {
let lp_col = indexer.state_to_lp_column(j);
push_scaled_coefficient(batch, lp_col, c, col_scale);
}
}
debug_assert!(
i32::try_from(theta_col).is_ok(),
"theta_col={theta_col} exceeds i32::MAX"
);
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
batch.col_indices.push(theta_col as i32);
let d_theta = if col_scale.is_empty() {
1.0
} else {
col_scale[theta_col]
};
batch.values.push(d_theta);
batch.row_lower.push(intercept);
batch.row_upper.push(f64::INFINITY);
nz_offset += nnz_per_cut;
}
#[allow(clippy::expect_used)]
batch.row_starts.push(
i32::try_from(total_nnz).expect("total_nnz exceeds i32::MAX; LP exceeds HiGHS API limit"),
);
batch.num_rows = num_cuts;
}
pub fn append_new_cuts_to_lp<S: SolverInterface>(
solver: &mut S,
fcf: &FutureCostFunction,
stage: usize,
indexer: &StageIndexer,
col_scale: &[f64],
row_map: &mut crate::cut::CutRowMap,
batch_buf: &mut RowBatch,
) -> usize {
batch_buf.clear();
let n_state = indexer.n_state;
let theta_col = indexer.theta;
let mask = &indexer.nonzero_state_indices;
let is_sparse = !mask.is_empty();
let nnz_per_cut = if is_sparse {
mask.len() + 1
} else {
n_state + 1
};
let mut new_count = 0usize;
let mut nz_offset = 0usize;
for (slot, intercept, coefficients) in fcf.active_cuts(stage) {
if row_map.lp_row_for_slot(slot).is_some() {
continue;
}
debug_assert_eq!(
coefficients.len(),
n_state,
"cut coefficients length {got} != n_state {expected}",
got = coefficients.len(),
expected = n_state,
);
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
batch_buf.row_starts.push(nz_offset as i32);
if is_sparse {
for &j in mask {
let lp_col = indexer.state_to_lp_column(j);
push_scaled_coefficient(batch_buf, lp_col, coefficients[j], col_scale);
}
} else {
for (j, &c) in coefficients.iter().enumerate() {
let lp_col = indexer.state_to_lp_column(j);
push_scaled_coefficient(batch_buf, lp_col, c, col_scale);
}
}
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
batch_buf.col_indices.push(theta_col as i32);
let d_theta = if col_scale.is_empty() {
1.0
} else {
col_scale[theta_col]
};
batch_buf.values.push(d_theta);
batch_buf.row_lower.push(intercept);
batch_buf.row_upper.push(f64::INFINITY);
row_map.insert(slot);
new_count += 1;
nz_offset += nnz_per_cut;
}
if new_count > 0 {
let total_nnz = new_count * nnz_per_cut;
#[allow(clippy::expect_used)]
batch_buf.row_starts.push(
i32::try_from(total_nnz)
.expect("total_nnz exceeds i32::MAX; LP exceeds HiGHS API limit"),
);
batch_buf.num_rows = new_count;
solver.add_rows(batch_buf);
}
new_count
}
pub struct ForwardPassBatch<'a> {
pub local_forward_passes: usize,
pub total_forward_passes: usize,
pub iteration: u64,
pub fwd_offset: usize,
pub event_sender: Option<&'a Sender<TrainingEvent>>,
pub basis_activity_window: u32,
}
#[inline]
pub(crate) fn partition(n_scenarios: usize, n_workers: usize, worker_id: usize) -> (usize, usize) {
if n_workers == 0 {
return (0, 0);
}
let base = n_scenarios / n_workers;
let remainder = n_scenarios % n_workers;
let start = base * worker_id + worker_id.min(remainder);
let end = start + base + usize::from(worker_id < remainder);
(start, end)
}
pub(crate) struct StageKey<'a> {
pub(crate) t: usize,
pub(crate) m: usize,
pub(crate) local_m: usize,
pub(crate) num_stages: usize,
pub(crate) iteration: u64,
pub(crate) raw_noise: &'a [f64],
pub(crate) basis_row_capacity: usize,
pub(crate) terminal_has_boundary_cuts: bool,
pub(crate) pool: &'a CutPool,
pub(crate) baked_template: &'a StageTemplate,
pub(crate) basis_activity_window: u32,
}
#[allow(clippy::cast_possible_truncation)]
pub(crate) fn write_capture_metadata(
captured: &mut CapturedBasis,
pool: &CutPool,
base_row_count: usize,
cut_row_count: usize,
current_state: &[f64],
) {
captured.cut_row_slots.clear();
for (slot, _intercept, _coeffs) in pool.active_cuts().take(cut_row_count) {
captured.cut_row_slots.push(slot as u32);
}
captured.state_at_capture.clear();
captured.state_at_capture.extend_from_slice(current_state);
captured.base_row_count = base_row_count;
let expected_len = base_row_count + cut_row_count;
if captured.basis.row_status.len() != expected_len {
captured.basis.row_status.resize(
expected_len,
crate::basis_reconstruct::HIGHS_BASIS_STATUS_BASIC,
);
}
}
#[allow(clippy::too_many_lines)]
pub(crate) fn run_forward_stage<S: SolverInterface + Send>(
ws: &mut SolverWorkspace<S>,
basis_slice: &mut BasisStoreSliceMut<'_>,
ctx: &StageContext<'_>,
training_ctx: &TrainingContext<'_>,
key: &StageKey<'_>,
worker_records: &mut [TrajectoryRecord],
) -> Result<f64, SddpError> {
let StageKey {
t,
m,
local_m,
num_stages,
iteration,
raw_noise,
basis_row_capacity,
terminal_has_boundary_cuts,
pool,
baked_template,
basis_activity_window,
} = *key;
let n_hydros = ctx.n_hydros;
let n_load_buses = ctx.n_load_buses;
let indexer = training_ctx.indexer;
let stochastic = training_ctx.stochastic;
let horizon = training_ctx.horizon;
let (state_ref, scratch) = (&ws.current_state[..], &mut ws.scratch);
transform_inflow_noise(raw_noise, t, state_ref, ctx, training_ctx, scratch);
let blk = if n_load_buses > 0 {
ctx.block_counts_per_stage[t]
} else {
0
};
transform_load_noise(
raw_noise,
n_hydros,
n_load_buses,
stochastic,
t,
blk,
&mut ws.scratch.load_rhs_buf,
);
let n_stochastic_ncs = stochastic.n_stochastic_ncs();
if n_stochastic_ncs > 0 {
transform_ncs_noise(
raw_noise,
&NcsNoiseOffsets {
n_hydros,
n_load_buses,
},
stochastic,
t,
ctx.block_counts_per_stage[t],
ctx.ncs_max_gen,
ctx.ncs_allow_curtailment,
&mut ws.scratch.ncs_col_lower_buf,
&mut ws.scratch.ncs_col_upper_buf,
);
}
ws.patch_buf.fill_forward_patches(
indexer,
&ws.current_state,
&ws.scratch.noise_buf,
ctx.base_rows[t],
&ctx.templates[t].row_scale,
);
if n_load_buses > 0 {
ws.patch_buf.fill_load_patches(
ctx.load_balance_row_starts[t],
ctx.block_counts_per_stage[t],
&ws.scratch.load_rhs_buf,
ctx.load_bus_indices,
&ctx.templates[t].row_scale,
);
}
ws.patch_buf.fill_z_inflow_patches(
indexer.z_inflow_row_start,
&ws.scratch.z_inflow_rhs_buf,
&ctx.templates[t].row_scale,
);
let pc = ws.patch_buf.forward_patch_count();
ws.solver.set_row_bounds(
&ws.patch_buf.indices[..pc],
&ws.patch_buf.lower[..pc],
&ws.patch_buf.upper[..pc],
);
if n_stochastic_ncs > 0 && !indexer.ncs_generation.is_empty() {
let n_blks = ctx.block_counts_per_stage[t];
let expected_len = n_stochastic_ncs * n_blks;
if ws.scratch.ncs_col_indices_buf.len() != expected_len {
ws.scratch.ncs_col_indices_buf.clear();
for ncs_idx in 0..n_stochastic_ncs {
for blk in 0..n_blks {
ws.scratch
.ncs_col_indices_buf
.push(indexer.ncs_generation.start + ncs_idx * n_blks + blk);
}
}
}
ws.solver.set_col_bounds(
&ws.scratch.ncs_col_indices_buf,
&ws.scratch.ncs_col_lower_buf,
&ws.scratch.ncs_col_upper_buf,
);
}
if horizon.is_terminal(t + 1) && !terminal_has_boundary_cuts {
ws.solver.set_col_bounds(&[indexer.theta], &[0.0], &[0.0]);
}
let mut current_state_local: Vec<f64> = std::mem::take(&mut ws.scratch.current_state_scratch);
current_state_local.clear();
current_state_local.extend_from_slice(&ws.current_state[..indexer.n_state]);
let inputs = crate::stage_solve::StageInputs {
stage_context: ctx,
indexer,
pool,
current_state: ¤t_state_local,
stored_basis: basis_slice.get_mut(m, t).as_ref(),
baked_template,
stage_index: t,
scenario_index: m,
iteration: Some(iteration),
horizon_is_terminal: horizon.is_terminal(t + 1),
terminal_has_boundary_cuts,
basis_activity_window,
};
let outcome =
crate::stage_solve::run_stage_solve(ws, crate::stage_solve::Phase::Forward, &inputs)
.map_err(|e| {
if matches!(e, SddpError::Infeasible { .. }) {
*basis_slice.get_mut(m, t) = None;
}
e
})?;
let crate::stage_solve::StageOutcome::Forward {
view,
recon_stats: _,
} = outcome
else {
unreachable!("run_stage_solve(Phase::Forward, ...) returns Forward variant")
};
let col_scale = &ctx.templates[t].col_scale;
let view_objective = view.objective;
let unscaled_primal: Vec<f64> = if col_scale.is_empty() {
view.primal.to_vec()
} else {
view.primal
.iter()
.zip(col_scale)
.map(|(&xp, &d)| d * xp)
.collect()
};
let _ = view;
ws.scratch.current_state_scratch = current_state_local;
let d_t = ctx.discount_factors.get(t).copied().unwrap_or(1.0);
let stage_cost = (view_objective - d_t * unscaled_primal[indexer.theta]) * COST_SCALE_FACTOR;
let rec = &mut worker_records[local_m * num_stages + t];
rec.primal.clear();
rec.dual.clear();
rec.stage_cost = stage_cost;
let lag_start = indexer.inflow_lags.start;
let lag_len = indexer.hydro_count * indexer.max_par_order;
ws.scratch.lag_matrix_buf.clear();
ws.scratch
.lag_matrix_buf
.extend_from_slice(&ws.current_state[lag_start..lag_start + lag_len]);
ws.current_state.clear();
ws.current_state
.extend_from_slice(&unscaled_primal[..indexer.n_state]);
let stage_lag = ctx.stage_lag_transitions.get(t).copied().unwrap_or(
cobre_core::temporal::StageLagTransition {
accumulate_weight: 1.0,
spillover_weight: 0.0,
finalize_period: true,
accumulate_downstream: false,
downstream_accumulate_weight: 0.0,
downstream_spillover_weight: 0.0,
downstream_finalize: false,
rebuild_from_downstream: false,
},
);
let downstream_par_order = ws
.scratch
.downstream_completed_lags
.len()
.checked_div(ws.scratch.lag_accumulator.len())
.unwrap_or(0);
crate::noise::accumulate_and_shift_lag_state(
&mut ws.current_state,
&ws.scratch.lag_matrix_buf,
&unscaled_primal,
indexer,
&stage_lag,
&mut crate::noise::LagAccumState {
accumulator: &mut ws.scratch.lag_accumulator,
weight_accum: &mut ws.scratch.lag_weight_accum,
},
&mut crate::noise::DownstreamAccumState {
accumulator: &mut ws.scratch.downstream_accumulator,
weight_accum: &mut ws.scratch.downstream_weight_accum,
completed_lags: &mut ws.scratch.downstream_completed_lags,
n_completed: &mut ws.scratch.downstream_n_completed,
par_order: downstream_par_order,
},
);
rec.state.clear();
rec.state.extend_from_slice(&ws.current_state);
let cut_row_count = basis_row_capacity.saturating_sub(ctx.templates[t].num_rows);
if let Some(captured) = basis_slice.get_mut(m, t) {
ws.solver.get_basis(&mut captured.basis);
write_capture_metadata(
captured,
pool,
ctx.templates[t].num_rows,
cut_row_count,
&ws.current_state[..indexer.n_state],
);
} else {
let mut captured = CapturedBasis::new(
ctx.templates[t].num_cols,
basis_row_capacity,
ctx.templates[t].num_rows,
cut_row_count,
indexer.n_state,
);
ws.solver.get_basis(&mut captured.basis);
write_capture_metadata(
&mut captured,
pool,
ctx.templates[t].num_rows,
cut_row_count,
&ws.current_state[..indexer.n_state],
);
*basis_slice.get_mut(m, t) = Some(captured);
}
Ok(stage_cost)
}
pub fn build_sampler_from_ctx<'a>(
ctx: &'a TrainingContext<'a>,
) -> Result<ForwardSampler<'a>, SddpError> {
let stochastic = ctx.stochastic;
build_forward_sampler(ForwardSamplerConfig {
class_schemes: ClassSchemes {
inflow: Some(ctx.inflow_scheme),
load: Some(ctx.load_scheme),
ncs: Some(ctx.ncs_scheme),
},
ctx: stochastic,
stages: ctx.stages,
dims: ClassDimensions {
n_hydros: stochastic.n_hydros(),
n_load_buses: stochastic.n_load_buses(),
n_ncs: stochastic.n_stochastic_ncs(),
},
historical_library: ctx.historical_library,
external_inflow_library: ctx.external_inflow_library,
external_load_library: ctx.external_load_library,
external_ncs_library: ctx.external_ncs_library,
})
.map_err(SddpError::Stochastic)
}
pub fn run_forward_pass<S: SolverInterface + Send>(
workspaces: &mut [SolverWorkspace<S>],
basis_store: &mut BasisStore,
ctx: &StageContext<'_>,
baked: &[StageTemplate],
fcf: &FutureCostFunction,
training_ctx: &TrainingContext<'_>,
batch: &ForwardPassBatch<'_>,
records: &mut [TrajectoryRecord],
) -> Result<ForwardResult, SddpError> {
use crate::forward_pass_state::{ForwardPassInputs, ForwardPassState};
let n_workers = workspaces.len().max(1);
let num_stages = training_ctx.horizon.num_stages();
let mut state = ForwardPassState::new(n_workers, num_stages, batch.local_forward_passes);
let mut inputs = ForwardPassInputs {
workspaces,
basis_store,
ctx,
baked,
fcf,
training_ctx,
records,
local_forward_passes: batch.local_forward_passes,
total_forward_passes: batch.total_forward_passes,
iteration: batch.iteration,
fwd_offset: batch.fwd_offset,
event_sender: batch.event_sender,
basis_activity_window: batch.basis_activity_window,
};
state.run(&mut inputs)
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use chrono::NaiveDate;
use cobre_comm::{CommData, Communicator, ReduceOp};
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::scenario::{
CorrelationEntity, CorrelationGroup, CorrelationModel, CorrelationProfile, InflowModel,
LoadModel, SamplingScheme,
};
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, Stage, StageRiskConfig,
StageStateConfig,
};
use cobre_core::{Bus, DeficitSegment, EntityId, SystemBuilder};
use cobre_solver::{
Basis, LpSolution, RowBatch, SolverError, SolverInterface, SolverStatistics, StageTemplate,
};
use cobre_stochastic::StochasticContext;
use cobre_stochastic::context::{ClassSchemes, OpeningTreeInputs, build_stochastic_context};
use cobre_comm::LocalBackend;
use super::{
ForwardPassBatch, ForwardResult, SyncResult, build_cut_row_batch,
build_delta_cut_row_batch_into, partition, run_forward_pass, sync_forward,
};
use crate::{
StoppingMode, StoppingRule, StoppingRuleSet, TrainingConfig,
config::{CutManagementConfig, EventConfig, LoopConfig},
context::{StageContext, TrainingContext},
cut::FutureCostFunction,
horizon_mode::HorizonMode,
indexer::StageIndexer,
inflow_method::InflowNonNegativityMethod,
risk_measure::RiskMeasure,
trajectory::TrajectoryRecord,
workspace::{BackwardAccumulators, BasisStore, SolverWorkspace},
};
struct MockSolver {
solution: LpSolution,
infeasible_at: Option<usize>,
call_count: usize,
warm_start_calls: usize,
buf_primal: Vec<f64>,
buf_dual: Vec<f64>,
buf_reduced_costs: Vec<f64>,
}
impl MockSolver {
fn always_ok(solution: LpSolution) -> Self {
let buf_primal = solution.primal.clone();
let buf_dual = solution.dual.clone();
let buf_reduced_costs = solution.reduced_costs.clone();
Self {
solution,
infeasible_at: None,
call_count: 0,
warm_start_calls: 0,
buf_primal,
buf_dual,
buf_reduced_costs,
}
}
fn infeasible_on(solution: LpSolution, n: usize) -> Self {
let buf_primal = solution.primal.clone();
let buf_dual = solution.dual.clone();
let buf_reduced_costs = solution.reduced_costs.clone();
Self {
solution,
infeasible_at: Some(n),
call_count: 0,
warm_start_calls: 0,
buf_primal,
buf_dual,
buf_reduced_costs,
}
}
fn do_solve(&mut self) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
let call = self.call_count;
self.call_count += 1;
if self.infeasible_at == Some(call) {
return Err(SolverError::Infeasible);
}
self.buf_primal.clone_from(&self.solution.primal);
self.buf_dual.clone_from(&self.solution.dual);
self.buf_reduced_costs
.clone_from(&self.solution.reduced_costs);
Ok(cobre_solver::SolutionView {
objective: self.solution.objective,
primal: &self.buf_primal,
dual: &self.buf_dual,
reduced_costs: &self.buf_reduced_costs,
iterations: self.solution.iterations,
solve_time_seconds: self.solution.solve_time_seconds,
})
}
}
impl SolverInterface for MockSolver {
fn solver_name_version(&self) -> String {
"MockSolver 0.0.0".to_string()
}
fn load_model(&mut self, _template: &StageTemplate) {}
fn add_rows(&mut self, _cuts: &RowBatch) {}
fn set_row_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn set_col_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn solve(
&mut self,
basis: Option<&Basis>,
) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
if basis.is_some() {
self.warm_start_calls += 1;
}
self.do_solve()
}
fn get_basis(&mut self, _out: &mut Basis) {}
fn statistics(&self) -> SolverStatistics {
SolverStatistics {
solve_count: self.call_count as u64,
..SolverStatistics::default()
}
}
fn name(&self) -> &'static str {
"Mock"
}
}
fn minimal_template_1_0() -> StageTemplate {
StageTemplate {
num_cols: 4,
num_rows: 1,
num_nz: 1,
col_starts: vec![0_i32, 0, 0, 1, 1], row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, f64::NEG_INFINITY, 0.0, 0.0],
col_upper: vec![f64::INFINITY; 4],
objective: vec![0.0, 0.0, 0.0, 1.0], row_lower: vec![0.0],
row_upper: vec![0.0],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
}
}
fn fixed_solution(
num_cols: usize,
objective: f64,
theta_col: usize,
theta_val: f64,
) -> LpSolution {
let mut primal = vec![0.0_f64; num_cols];
primal[theta_col] = theta_val;
let num_rows = 1; LpSolution {
objective,
primal,
dual: vec![0.0; num_rows],
reduced_costs: vec![0.0; num_cols],
iterations: 0,
solve_time_seconds: 0.0,
}
}
fn empty_records(n: usize) -> Vec<TrajectoryRecord> {
(0..n)
.map(|_| TrajectoryRecord {
primal: Vec::new(),
dual: Vec::new(),
stage_cost: 0.0,
state: Vec::new(),
})
.collect()
}
#[allow(clippy::too_many_lines)]
fn make_stochastic_context_1_hydro_3_stages() -> StochasticContext {
let bus = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(1),
name: "H1".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity,
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
specific_productivity_mw_per_m3s_per_m: None,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
let make_stage = |idx: usize, id: i32| Stage {
index: idx,
id,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 3,
noise_method: NoiseMethod::Saa,
},
};
let stages = vec![make_stage(0, 0), make_stage(1, 1), make_stage(2, 2)];
let inflow = |stage_id: i32| InflowModel {
hydro_id: EntityId(1),
stage_id,
mean_m3s: 100.0,
std_m3s: 30.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
};
let mut profiles = BTreeMap::new();
profiles.insert(
"default".to_string(),
CorrelationProfile {
groups: vec![CorrelationGroup {
name: "g1".to_string(),
entities: vec![CorrelationEntity {
entity_type: "inflow".to_string(),
id: EntityId(1),
}],
matrix: vec![vec![1.0]],
}],
},
);
let correlation = CorrelationModel {
method: "spectral".to_string(),
profiles,
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus])
.hydros(vec![hydro])
.stages(stages)
.inflow_models(vec![inflow(0), inflow(1), inflow(2)])
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(
&system,
42,
None,
&[],
&[],
OpeningTreeInputs::default(),
ClassSchemes {
inflow: Some(SamplingScheme::InSample),
load: Some(SamplingScheme::InSample),
ncs: Some(SamplingScheme::InSample),
},
)
.unwrap()
}
#[test]
fn forward_result_field_access() {
let r = ForwardResult {
scenario_costs: vec![60.0, 70.0, 80.0, 90.0],
elapsed_ms: 123,
lp_solves: 0,
setup_time_ms: 0,
load_imbalance_ms: 0,
scheduling_overhead_ms: 0,
stage_stats: Vec::new(),
};
assert_eq!(r.scenario_costs.len(), 4);
assert_eq!(r.scenario_costs[0], 60.0);
assert_eq!(r.elapsed_ms, 123);
}
#[test]
fn forward_result_clone_and_debug() {
let r = ForwardResult {
scenario_costs: vec![1.0, 2.0],
elapsed_ms: 5,
lp_solves: 0,
setup_time_ms: 0,
load_imbalance_ms: 0,
scheduling_overhead_ms: 0,
stage_stats: Vec::new(),
};
let c = r.clone();
assert_eq!(c.scenario_costs.len(), r.scenario_costs.len());
assert_eq!(c.scenario_costs[0].to_bits(), r.scenario_costs[0].to_bits());
let s = format!("{r:?}");
assert!(s.contains("ForwardResult"));
}
#[test]
fn forward_overhead_decomposition_four_workers() {
use cobre_solver::SolverStatistics;
use crate::solver_stats::SolverStatsDelta;
fn make_stats(
solve_s: f64,
load_model_s: f64,
set_bounds_s: f64,
basis_set_s: f64,
) -> SolverStatistics {
SolverStatistics {
total_solve_time_seconds: solve_s,
total_load_model_time_seconds: load_model_s,
total_set_bounds_time_seconds: set_bounds_s,
total_basis_set_time_seconds: basis_set_s,
..SolverStatistics::default()
}
}
let befores = [
make_stats(0.0, 0.0, 0.0, 0.0),
make_stats(0.0, 0.0, 0.0, 0.0),
make_stats(0.0, 0.0, 0.0, 0.0),
make_stats(0.0, 0.0, 0.0, 0.0),
];
let afters = [
make_stats(0.500, 0.050, 0.0, 0.0),
make_stats(0.600, 0.060, 0.0, 0.0),
make_stats(0.550, 0.045, 0.0, 0.0),
make_stats(0.580, 0.055, 0.0, 0.0),
];
let deltas: Vec<SolverStatsDelta> = befores
.iter()
.zip(&afters)
.map(|(b, a)| SolverStatsDelta::from_snapshots(b, a))
.collect();
let setup_ms: f64 = deltas
.iter()
.map(|d| d.load_model_time_ms + d.set_bounds_time_ms + d.basis_set_time_ms)
.sum();
let worker_totals: Vec<f64> = deltas
.iter()
.map(|d| {
d.solve_time_ms + d.load_model_time_ms + d.set_bounds_time_ms + d.basis_set_time_ms
})
.collect();
let n_workers_f = 4.0_f64;
let max_ms = worker_totals.iter().copied().fold(0.0_f64, f64::max);
let avg_ms = worker_totals.iter().sum::<f64>() / n_workers_f;
let imbalance_ms = (max_ms - avg_ms).max(0.0);
let parallel_wall_ms = 700_u64; #[allow(clippy::cast_precision_loss)]
let scheduling_ms = (parallel_wall_ms as f64 - max_ms).max(0.0);
assert!(
(setup_ms - 210.0).abs() < 0.001,
"setup_time_ms should be 210, got {setup_ms}"
);
assert!(
(max_ms - 660.0).abs() < 0.001,
"max_worker_ms should be 660, got {max_ms}"
);
assert!(
(avg_ms - 610.0).abs() < 0.001,
"avg_worker_ms should be 610, got {avg_ms}"
);
assert!(
(imbalance_ms - 50.0).abs() < 0.001,
"load_imbalance_ms should be 50, got {imbalance_ms}"
);
assert!(
(scheduling_ms - 40.0).abs() < 0.001,
"scheduling_overhead_ms should be 40, got {scheduling_ms}"
);
}
#[test]
fn forward_overhead_decomposition_single_worker_zero_imbalance() {
use cobre_solver::SolverStatistics;
use crate::solver_stats::SolverStatsDelta;
let before = SolverStatistics::default();
let after = SolverStatistics {
total_solve_time_seconds: 1.0,
total_load_model_time_seconds: 0.1,
..SolverStatistics::default()
};
let deltas = [SolverStatsDelta::from_snapshots(&before, &after)];
let worker_totals: Vec<f64> = deltas
.iter()
.map(|d| {
d.solve_time_ms + d.load_model_time_ms + d.set_bounds_time_ms + d.basis_set_time_ms
})
.collect();
let n_workers_f = 1.0_f64;
let max_ms = worker_totals.iter().copied().fold(0.0_f64, f64::max);
let avg_ms = worker_totals.iter().sum::<f64>() / n_workers_f;
let imbalance_ms = (max_ms - avg_ms).max(0.0);
assert_eq!(
imbalance_ms, 0.0,
"load_imbalance_ms must be 0.0 for a single worker"
);
}
#[test]
fn forward_overhead_scheduling_clamped_to_zero_on_clock_skew() {
use cobre_solver::SolverStatistics;
use crate::solver_stats::SolverStatsDelta;
let before = SolverStatistics::default();
let after = SolverStatistics {
total_solve_time_seconds: 1.0, ..SolverStatistics::default()
};
let deltas = [SolverStatsDelta::from_snapshots(&before, &after)];
let max_ms = deltas
.iter()
.map(|d| d.solve_time_ms)
.fold(0.0_f64, f64::max);
let parallel_wall_ms = 800_u64;
#[allow(clippy::cast_precision_loss)]
let scheduling_ms = (parallel_wall_ms as f64 - max_ms).max(0.0);
assert_eq!(
scheduling_ms, 0.0,
"scheduling_overhead_ms must clamp to 0.0 on clock skew"
);
}
#[test]
fn build_cut_row_batch_empty_cuts_returns_empty_batch() {
let fcf = FutureCostFunction::new(2, 1, 1, 10, &[0; 2]);
let indexer = StageIndexer::new(1, 0);
let batch = build_cut_row_batch(&fcf, 0, &indexer, &[]);
assert_eq!(batch.num_rows, 0);
assert_eq!(batch.row_starts, vec![0]);
assert!(batch.col_indices.is_empty());
assert!(batch.values.is_empty());
assert!(batch.row_lower.is_empty());
assert!(batch.row_upper.is_empty());
}
#[test]
fn build_cut_row_batch_one_cut_correct_structure() {
let mut fcf = FutureCostFunction::new(2, 1, 1, 10, &[0; 2]);
fcf.add_cut(0, 0, 0, 5.0, &[2.0]);
let indexer = StageIndexer::new(1, 0);
let batch = build_cut_row_batch(&fcf, 0, &indexer, &[]);
assert_eq!(batch.num_rows, 1);
assert_eq!(batch.row_starts, vec![0, 2]);
assert_eq!(batch.col_indices, vec![0, 3]); assert_eq!(batch.values, vec![-2.0, 1.0]);
assert_eq!(batch.row_lower, vec![5.0]);
assert!(batch.row_upper[0].is_infinite() && batch.row_upper[0] > 0.0);
}
#[test]
fn build_cut_row_batch_two_cuts_correct_row_starts() {
let mut fcf = FutureCostFunction::new(2, 2, 1, 10, &[0; 2]);
fcf.add_cut(1, 0, 0, 10.0, &[1.0, 3.0]);
fcf.add_cut(1, 1, 0, 20.0, &[2.0, 4.0]);
let indexer = StageIndexer::new(1, 1);
let batch = build_cut_row_batch(&fcf, 1, &indexer, &[]);
assert_eq!(batch.num_rows, 2);
assert_eq!(batch.row_starts, vec![0, 3, 6]);
assert_eq!(batch.col_indices[0], 0); assert_eq!(batch.col_indices[1], 2); assert_eq!(batch.col_indices[2], 4); assert_eq!(batch.values[0], -1.0);
assert_eq!(batch.values[1], -3.0);
assert_eq!(batch.values[2], 1.0);
assert_eq!(batch.col_indices[3], 0); assert_eq!(batch.col_indices[4], 2); assert_eq!(batch.col_indices[5], 4); assert_eq!(batch.values[3], -2.0);
assert_eq!(batch.values[4], -4.0);
assert_eq!(batch.values[5], 1.0);
assert_eq!(batch.row_lower, vec![10.0, 20.0]);
assert!(batch.row_upper[0].is_infinite() && batch.row_upper[0] > 0.0);
assert!(batch.row_upper[1].is_infinite() && batch.row_upper[1] > 0.0);
}
#[test]
fn build_cut_row_batch_zero_coefficient_state_variable() {
let mut fcf = FutureCostFunction::new(1, 2, 1, 5, &[0; 1]);
fcf.add_cut(0, 0, 0, 3.0, &[0.0, 7.0]);
let indexer = StageIndexer::new(1, 1);
let batch = build_cut_row_batch(&fcf, 0, &indexer, &[]);
assert_eq!(batch.num_rows, 1);
assert_eq!(batch.col_indices, vec![0, 2, 4]); assert_eq!(batch.values, vec![0.0, -7.0, 1.0]);
assert_eq!(batch.row_lower, vec![3.0]);
}
fn single_workspace(solver: MockSolver, indexer: &StageIndexer) -> SolverWorkspace<MockSolver> {
SolverWorkspace {
rank: 0,
worker_id: 0,
solver,
patch_buf: crate::lp_builder::PatchBuffer::new(
indexer.hydro_count,
indexer.max_par_order,
0,
0,
),
current_state: Vec::with_capacity(indexer.n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::with_capacity(indexer.hydro_count),
inflow_m3s_buf: Vec::with_capacity(indexer.hydro_count),
lag_matrix_buf: Vec::with_capacity(indexer.max_par_order * indexer.hydro_count),
par_inflow_buf: Vec::with_capacity(indexer.hydro_count),
eta_floor_buf: Vec::with_capacity(indexer.hydro_count),
zero_targets_buf: vec![0.0_f64; indexer.hydro_count],
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::new(),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
current_state_scratch: Vec::new(),
recon_slot_lookup: Vec::new(),
promotion_scratch: crate::basis_reconstruct::PromotionScratch::default(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
}
}
fn make_stages_3() -> Vec<Stage> {
let make_stage = |idx: usize, id: i32| Stage {
index: idx,
id,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 3,
noise_method: NoiseMethod::Saa,
},
};
vec![make_stage(0, 0), make_stage(1, 1), make_stage(2, 2)]
}
#[test]
#[allow(clippy::too_many_lines)]
fn ac_two_scenarios_three_stages_fixed_solution() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let solver = MockSolver::always_ok(solution);
let fcf = FutureCostFunction::new(3, indexer.n_state, 2, 100, &[0; 3]);
let config = TrainingConfig {
loop_config: LoopConfig {
forward_passes: 2,
max_iterations: 100,
start_iteration: 0,
n_fwd_threads: 1,
max_blocks: 1,
stopping_rules: StoppingRuleSet {
rules: vec![StoppingRule::IterationLimit { limit: 100 }],
mode: StoppingMode::Any,
},
},
cut_management: CutManagementConfig {
cut_selection: None,
budget: None,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
warm_start_cuts: 0,
risk_measures: vec![RiskMeasure::Expectation],
},
events: EventConfig {
event_sender: None,
checkpoint_interval: None,
shutdown_flag: None,
export_states: false,
},
};
let horizon = HorizonMode::Finite { num_stages: 3 };
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(2 * 3);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let stages = make_stages_3();
let mut ws = single_workspace(solver, &indexer);
let mut basis_store =
BasisStore::new(config.loop_config.forward_passes as usize, templates.len());
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let result = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&templates,
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &stages,
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
&ForwardPassBatch {
local_forward_passes: config.loop_config.forward_passes as usize,
total_forward_passes: config.loop_config.forward_passes as usize,
iteration: 0,
fwd_offset: 0,
event_sender: None,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
},
&mut records,
)
.unwrap();
assert_eq!(result.scenario_costs.len(), 2);
for (i, record) in records.iter().enumerate() {
assert_eq!(
record.stage_cost, 70_000.0,
"record[{i}].stage_cost should be 70_000.0 ((objective - theta) * COST_SCALE_FACTOR)"
);
}
assert_eq!(result.scenario_costs[0], 210_000.0);
assert_eq!(result.scenario_costs[1], 210_000.0);
}
#[test]
#[allow(clippy::too_many_lines)]
fn ac_infeasible_at_stage_1_scenario_0_returns_infeasible_error() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let solver = MockSolver::infeasible_on(solution, 2);
let fcf = FutureCostFunction::new(3, indexer.n_state, 2, 100, &[0; 3]);
let config = TrainingConfig {
loop_config: LoopConfig {
forward_passes: 2,
max_iterations: 100,
start_iteration: 0,
n_fwd_threads: 1,
max_blocks: 1,
stopping_rules: StoppingRuleSet {
rules: vec![StoppingRule::IterationLimit { limit: 100 }],
mode: StoppingMode::Any,
},
},
cut_management: CutManagementConfig {
cut_selection: None,
budget: None,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
warm_start_cuts: 0,
risk_measures: vec![RiskMeasure::Expectation],
},
events: EventConfig {
event_sender: None,
checkpoint_interval: None,
shutdown_flag: None,
export_states: false,
},
};
let horizon = HorizonMode::Finite { num_stages: 3 };
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(2 * 3);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let stages = make_stages_3();
let mut ws = single_workspace(solver, &indexer);
let mut basis_store =
BasisStore::new(config.loop_config.forward_passes as usize, templates.len());
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let result = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&templates,
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &stages,
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
&ForwardPassBatch {
local_forward_passes: config.loop_config.forward_passes as usize,
total_forward_passes: config.loop_config.forward_passes as usize,
iteration: 0,
fwd_offset: 0,
event_sender: None,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
},
&mut records,
);
match result {
Err(crate::SddpError::Infeasible {
stage, scenario, ..
}) => {
assert_eq!(stage, 1, "expected stage=1");
assert_eq!(scenario, 0, "expected scenario=0");
}
other => panic!("expected Infeasible, got {other:?}"),
}
}
#[test]
fn ac_global_scenario_index_rank1_scenario0() {
let rank = 1usize;
let forward_passes = 3usize;
let m = 0usize;
let global_scenario = rank * forward_passes + m;
assert_eq!(global_scenario, 3);
}
#[test]
#[allow(clippy::too_many_lines)]
fn cost_statistics_accumulated_correctly() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let solver = MockSolver::always_ok(solution);
let fcf = FutureCostFunction::new(3, indexer.n_state, 2, 100, &[0; 3]);
let config = TrainingConfig {
loop_config: LoopConfig {
forward_passes: 2,
max_iterations: 100,
start_iteration: 0,
n_fwd_threads: 1,
max_blocks: 1,
stopping_rules: StoppingRuleSet {
rules: vec![StoppingRule::IterationLimit { limit: 100 }],
mode: StoppingMode::Any,
},
},
cut_management: CutManagementConfig {
cut_selection: None,
budget: None,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
warm_start_cuts: 0,
risk_measures: vec![RiskMeasure::Expectation],
},
events: EventConfig {
event_sender: None,
checkpoint_interval: None,
shutdown_flag: None,
export_states: false,
},
};
let horizon = HorizonMode::Finite { num_stages: 3 };
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(2 * 3);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let stages = make_stages_3();
let mut ws = single_workspace(solver, &indexer);
let mut basis_store =
BasisStore::new(config.loop_config.forward_passes as usize, templates.len());
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let result = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&templates,
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &stages,
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
&ForwardPassBatch {
local_forward_passes: config.loop_config.forward_passes as usize,
total_forward_passes: config.loop_config.forward_passes as usize,
iteration: 0,
fwd_offset: 0,
event_sender: None,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
},
&mut records,
)
.unwrap();
assert_eq!(result.scenario_costs.len(), 2);
assert_eq!(result.scenario_costs[0], 210_000.0);
assert_eq!(result.scenario_costs[1], 210_000.0);
let cost_sum: f64 = result.scenario_costs.iter().sum();
let cost_sum_sq: f64 = result.scenario_costs.iter().map(|c| c * c).sum();
assert_eq!(cost_sum, 420_000.0);
assert_eq!(cost_sum_sq, 210_000.0_f64.powi(2) * 2.0);
}
#[test]
fn sync_result_field_access() {
let r = SyncResult {
global_ub_mean: 75.0,
global_ub_std: 12.909,
ci_95_half_width: 12.651,
sync_time_ms: 7,
};
assert_eq!(r.global_ub_mean, 75.0);
assert_eq!(r.global_ub_std, 12.909);
assert_eq!(r.ci_95_half_width, 12.651);
assert_eq!(r.sync_time_ms, 7);
}
#[test]
fn sync_result_clone_and_debug() {
let r = SyncResult {
global_ub_mean: 2.0,
global_ub_std: 3.0,
ci_95_half_width: 4.0,
sync_time_ms: 5,
};
let c = r.clone();
assert_eq!(c.global_ub_mean, r.global_ub_mean);
assert_eq!(c.global_ub_std, r.global_ub_std);
let s = format!("{r:?}");
assert!(s.contains("SyncResult"));
}
#[test]
fn ub_statistics_four_scenarios_correct_mean_and_std() {
let local = ForwardResult {
scenario_costs: vec![60.0, 70.0, 80.0, 90.0],
elapsed_ms: 0,
lp_solves: 0,
setup_time_ms: 0,
load_imbalance_ms: 0,
scheduling_overhead_ms: 0,
stage_stats: Vec::new(),
};
let comm = LocalBackend;
let result = sync_forward(&local, &comm, 4).unwrap();
assert_eq!(result.global_ub_mean, 75.0, "mean must be 300/4 = 75");
let expected_std = (500.0_f64 / 3.0).sqrt();
let tolerance = 1e-9;
assert!(
(result.global_ub_std - expected_std).abs() < tolerance,
"std deviation {got} should be ≈ {expected_std}",
got = result.global_ub_std,
);
let expected_ci = 1.96_f64 * expected_std / 4.0_f64.sqrt();
assert!(
(result.ci_95_half_width - expected_ci).abs() < tolerance,
"ci_95 {got} should be ≈ {expected_ci}",
got = result.ci_95_half_width,
);
}
#[test]
fn acceptance_criterion_ub_mean() {
let local = ForwardResult {
scenario_costs: vec![60.0, 70.0, 80.0, 90.0],
elapsed_ms: 0,
lp_solves: 0,
setup_time_ms: 0,
load_imbalance_ms: 0,
scheduling_overhead_ms: 0,
stage_stats: Vec::new(),
};
let comm = LocalBackend;
let result = sync_forward(&local, &comm, 4).unwrap();
assert_eq!(result.global_ub_mean, 75.0);
assert!(
result.global_ub_std > 0.0,
"std must be positive for 4 distinct scenarios"
);
}
#[test]
fn canonical_summation_identical_regardless_of_partition() {
let single_rank = ForwardResult {
scenario_costs: vec![1.0, 2.0, 3.0, 4.0],
elapsed_ms: 0,
lp_solves: 0,
setup_time_ms: 0,
load_imbalance_ms: 0,
scheduling_overhead_ms: 0,
stage_stats: Vec::new(),
};
let comm = LocalBackend;
let result_single = sync_forward(&single_rank, &comm, 4).unwrap();
let global_costs = [1.0_f64, 2.0, 3.0, 4.0];
let global_n = global_costs.len();
#[allow(clippy::cast_precision_loss)]
let global_n_f64 = global_n as f64;
let cost_sum: f64 = global_costs.iter().sum();
let cost_sum_sq: f64 = global_costs.iter().map(|c| c * c).sum();
let mean = cost_sum / global_n_f64;
let variance = (cost_sum_sq - global_n_f64 * mean * mean) / (global_n_f64 - 1.0);
let expected_std = variance.max(0.0).sqrt();
let expected_mean = mean;
assert_eq!(
result_single.global_ub_mean.to_bits(),
expected_mean.to_bits(),
"mean must be bit-identical to sequential summation of [1,2,3,4]"
);
assert_eq!(
result_single.global_ub_std.to_bits(),
expected_std.to_bits(),
"std must be bit-identical to sequential summation of [1,2,3,4]"
);
}
#[test]
fn bessel_correction_single_scenario_zero_std_and_ci() {
let local = ForwardResult {
scenario_costs: vec![500.0],
elapsed_ms: 0,
lp_solves: 0,
setup_time_ms: 0,
load_imbalance_ms: 0,
scheduling_overhead_ms: 0,
stage_stats: Vec::new(),
};
let comm = LocalBackend;
let result = sync_forward(&local, &comm, 1).unwrap();
assert_eq!(
result.global_ub_std, 0.0,
"std must be 0.0 for a single scenario (N=1 Bessel correction)"
);
assert_eq!(
result.ci_95_half_width, 0.0,
"ci_95 must be 0.0 for a single scenario"
);
}
#[test]
fn negative_variance_guard_produces_zero_std_not_nan() {
let v = 1.0e15_f64;
let local = ForwardResult {
scenario_costs: vec![v, v],
elapsed_ms: 0,
lp_solves: 0,
setup_time_ms: 0,
load_imbalance_ms: 0,
scheduling_overhead_ms: 0,
stage_stats: Vec::new(),
};
let comm = LocalBackend;
let result = sync_forward(&local, &comm, 2).unwrap();
assert!(
!result.global_ub_std.is_nan(),
"std must not be NaN even when floating-point variance is slightly negative"
);
assert_eq!(
result.global_ub_std, 0.0,
"std must be 0.0 when variance is zero (or clamps from tiny negative)"
);
}
#[test]
fn sync_forward_local_backend_global_equals_local() {
let local = ForwardResult {
scenario_costs: vec![420.0, 420.0],
elapsed_ms: 5,
lp_solves: 0,
setup_time_ms: 0,
load_imbalance_ms: 0,
scheduling_overhead_ms: 0,
stage_stats: Vec::new(),
};
let comm = LocalBackend;
let result = sync_forward(&local, &comm, 2).unwrap();
assert_eq!(
result.global_ub_mean, 420.0,
"global_ub_mean must equal the arithmetic mean of the cost vector"
);
}
#[test]
fn sync_forward_sync_time_ms_is_valid_u64() {
let local = ForwardResult {
scenario_costs: vec![50.0, 50.0],
elapsed_ms: 0,
lp_solves: 0,
setup_time_ms: 0,
load_imbalance_ms: 0,
scheduling_overhead_ms: 0,
stage_stats: Vec::new(),
};
let comm = LocalBackend;
let result = sync_forward(&local, &comm, 2).unwrap();
let _ = result.sync_time_ms;
}
#[test]
fn sync_forward_comm_error_wraps_as_sddp_communication() {
use cobre_comm::CommError;
struct FailingComm;
impl Communicator for FailingComm {
fn allgatherv<T: CommData>(
&self,
_send: &[T],
_recv: &mut [T],
_counts: &[usize],
_displs: &[usize],
) -> Result<(), CommError> {
Err(CommError::InvalidCommunicator)
}
fn allreduce<T: CommData>(
&self,
_send: &[T],
_recv: &mut [T],
_op: ReduceOp,
) -> Result<(), CommError> {
Err(CommError::InvalidCommunicator)
}
fn broadcast<T: CommData>(
&self,
_buf: &mut [T],
_root: usize,
) -> Result<(), CommError> {
Err(CommError::InvalidCommunicator)
}
fn barrier(&self) -> Result<(), CommError> {
Err(CommError::InvalidCommunicator)
}
fn rank(&self) -> usize {
0
}
fn size(&self) -> usize {
1
}
fn abort(&self, error_code: i32) -> ! {
std::process::exit(error_code)
}
}
let local = ForwardResult {
scenario_costs: vec![100.0],
elapsed_ms: 0,
lp_solves: 0,
setup_time_ms: 0,
load_imbalance_ms: 0,
scheduling_overhead_ms: 0,
stage_stats: Vec::new(),
};
let comm = FailingComm;
let err = sync_forward(&local, &comm, 1).unwrap_err();
assert!(
matches!(err, crate::SddpError::Communication(_)),
"CommError must be wrapped as SddpError::Communication, got: {err:?}"
);
}
fn run_one_iteration(
ws: &mut SolverWorkspace<MockSolver>,
basis_store: &mut BasisStore,
) -> Result<(), crate::SddpError> {
let indexer = StageIndexer::new(1, 0);
let fcf = FutureCostFunction::new(3, indexer.n_state, 1, 100, &[0; 3]);
let config = TrainingConfig {
loop_config: LoopConfig {
forward_passes: 1,
max_iterations: 100,
start_iteration: 0,
n_fwd_threads: 1,
max_blocks: 1,
stopping_rules: StoppingRuleSet {
rules: vec![StoppingRule::IterationLimit { limit: 100 }],
mode: StoppingMode::Any,
},
},
cut_management: CutManagementConfig {
cut_selection: None,
budget: None,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
warm_start_cuts: 0,
risk_measures: vec![RiskMeasure::Expectation],
},
events: EventConfig {
event_sender: None,
checkpoint_interval: None,
shutdown_flag: None,
export_states: false,
},
};
let horizon = HorizonMode::Finite { num_stages: 3 };
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(3);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let stages = make_stages_3();
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
run_forward_pass(
std::slice::from_mut(ws),
basis_store,
&ctx,
&templates,
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &stages,
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
&ForwardPassBatch {
local_forward_passes: config.loop_config.forward_passes as usize,
total_forward_passes: config.loop_config.forward_passes as usize,
iteration: 0,
fwd_offset: 0,
event_sender: None,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
},
&mut records,
)
.map(|_| ())
}
#[test]
fn warm_start_first_iteration_cold_second_iteration_warm() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let solver = MockSolver::always_ok(solution);
let mut ws = single_workspace(solver, &indexer);
let mut basis_store = BasisStore::new(1, 3);
run_one_iteration(&mut ws, &mut basis_store).unwrap();
assert_eq!(
ws.solver.warm_start_calls, 0,
"first iteration must use cold-start for all stages (warm_start_calls == 0)"
);
assert!(
(0..3).all(|t| basis_store.get(0, t).is_some()),
"basis_store must be fully populated for scenario 0 after the first iteration"
);
run_one_iteration(&mut ws, &mut basis_store).unwrap();
assert!(
ws.solver.warm_start_calls > 0,
"second iteration must use warm-start for at least one stage \
(warm_start_calls > 0, got {})",
ws.solver.warm_start_calls
);
}
#[test]
fn basis_invalidated_on_solver_error() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let solver = MockSolver::infeasible_on(solution, 4);
let mut ws = single_workspace(solver, &indexer);
let mut basis_store = BasisStore::new(1, 3);
run_one_iteration(&mut ws, &mut basis_store).unwrap();
assert!(
(0..3).all(|t| basis_store.get(0, t).is_some()),
"basis_store must be fully populated for scenario 0 after iteration 1"
);
let err = run_one_iteration(&mut ws, &mut basis_store).unwrap_err();
assert!(
matches!(err, crate::SddpError::Infeasible { stage: 1, .. }),
"expected Infeasible at stage 1, got: {err:?}"
);
assert!(
basis_store.get(0, 1).is_none(),
"basis_store.get(0, 1) must be None after solver error at stage 1"
);
assert!(
basis_store.get(0, 0).is_some(),
"basis_store.get(0, 0) must be Some (stage 0 succeeded before error)"
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn test_forward_pass_parallel_cost_agreement() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let stages = make_stages_3();
let fcf = FutureCostFunction::new(3, indexer.n_state, 2, 100, &[0; 3]);
let horizon = HorizonMode::Finite { num_stages: 3 };
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let n_scenarios = 10;
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let mut ws1 = single_workspace(MockSolver::always_ok(solution.clone()), &indexer);
let mut records1 = empty_records(n_scenarios * 3);
let mut basis_store1 = BasisStore::new(n_scenarios, templates.len());
let result1 = run_forward_pass(
std::slice::from_mut(&mut ws1),
&mut basis_store1,
&ctx,
&templates,
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &stages,
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
&ForwardPassBatch {
local_forward_passes: n_scenarios,
total_forward_passes: n_scenarios,
iteration: 0,
fwd_offset: 0,
event_sender: None,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
},
&mut records1,
)
.unwrap();
let mut workspaces4: Vec<SolverWorkspace<MockSolver>> = (0..4)
.map(|_| single_workspace(MockSolver::always_ok(solution.clone()), &indexer))
.collect();
let mut records4 = empty_records(n_scenarios * 3);
let mut basis_store4 = BasisStore::new(n_scenarios, templates.len());
let result4 = run_forward_pass(
&mut workspaces4,
&mut basis_store4,
&ctx,
&templates,
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &stages,
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
&ForwardPassBatch {
local_forward_passes: n_scenarios,
total_forward_passes: n_scenarios,
iteration: 0,
fwd_offset: 0,
event_sender: None,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
},
&mut records4,
)
.unwrap();
assert_eq!(
result1.scenario_costs.len(),
result4.scenario_costs.len(),
"scenario_costs length must be identical for 1 and 4 workspaces"
);
for (i, (c1, c4)) in result1
.scenario_costs
.iter()
.zip(result4.scenario_costs.iter())
.enumerate()
{
assert_eq!(
c1.to_bits(),
c4.to_bits(),
"scenario_costs[{i}] must be bit-identical: 1-workspace={c1:.17e}, 4-workspace={c4:.17e}"
);
}
}
#[allow(clippy::too_many_lines)]
#[test]
fn test_forward_pass_work_distribution() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let stages = make_stages_3();
let fcf = FutureCostFunction::new(3, indexer.n_state, 2, 100, &[0; 3]);
let horizon = HorizonMode::Finite { num_stages: 3 };
let num_stages = 3usize;
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let n_scenarios = 10usize;
let n_workers = 4usize;
let mut workspaces: Vec<SolverWorkspace<MockSolver>> = (0..n_workers)
.map(|_| single_workspace(MockSolver::always_ok(solution.clone()), &indexer))
.collect();
let mut records = empty_records(n_scenarios * num_stages);
let mut basis_store = BasisStore::new(n_scenarios, num_stages);
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let _result = run_forward_pass(
&mut workspaces,
&mut basis_store,
&ctx,
&templates,
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &stages,
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
&ForwardPassBatch {
local_forward_passes: n_scenarios,
total_forward_passes: n_scenarios,
iteration: 0,
fwd_offset: 0,
event_sender: None,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
},
&mut records,
)
.unwrap();
for (w, ws) in workspaces.iter().enumerate() {
let (start_m, end_m) = partition(n_scenarios, n_workers, w);
let assigned_scenarios = end_m - start_m;
let expected_solves = assigned_scenarios * num_stages;
let floor_scenarios = n_scenarios / n_workers;
let ceil_scenarios = n_scenarios.div_ceil(n_workers);
assert!(
assigned_scenarios == floor_scenarios || assigned_scenarios == ceil_scenarios,
"worker {w} assigned {assigned_scenarios} scenarios, expected {floor_scenarios} or {ceil_scenarios}"
);
let actual_solves = usize::try_from(ws.solver.statistics().solve_count)
.expect("solve_count fits in usize in tests");
assert_eq!(
actual_solves, expected_solves,
"worker {w} (scenarios [{start_m}, {end_m})) performed {actual_solves} solves, expected {expected_solves}"
);
}
let total_solves: usize = workspaces
.iter()
.map(|ws| {
usize::try_from(ws.solver.statistics().solve_count)
.expect("solve_count fits in usize in tests")
})
.sum();
assert_eq!(
total_solves,
n_scenarios * num_stages,
"total solve count {total_solves} must equal n_scenarios * num_stages = {}",
n_scenarios * num_stages
);
}
#[allow(clippy::too_many_lines)]
fn make_stochastic_1h_1s(mean_m3s: f64, std_m3s: f64) -> StochasticContext {
use std::collections::BTreeMap;
use chrono::NaiveDate;
use cobre_core::entities::hydro::{Hydro, HydroGenerationModel, HydroPenalties};
use cobre_core::scenario::{
CorrelationEntity, CorrelationGroup, CorrelationModel, CorrelationProfile, InflowModel,
};
use cobre_core::temporal::{
Block, BlockMode, NoiseMethod, ScenarioSourceConfig, Stage, StageRiskConfig,
StageStateConfig,
};
use cobre_core::{Bus, DeficitSegment, EntityId, SystemBuilder};
let bus = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(1),
name: "H1".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity,
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
specific_productivity_mw_per_m3s_per_m: None,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
let stage = Stage {
index: 0,
id: 0,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 3,
noise_method: NoiseMethod::Saa,
},
};
let inflow_model = InflowModel {
hydro_id: EntityId(1),
stage_id: 0,
mean_m3s,
std_m3s,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
};
let mut profiles = BTreeMap::new();
profiles.insert(
"default".to_string(),
CorrelationProfile {
groups: vec![CorrelationGroup {
name: "g1".to_string(),
entities: vec![CorrelationEntity {
entity_type: "inflow".to_string(),
id: EntityId(1),
}],
matrix: vec![vec![1.0]],
}],
},
);
let correlation = CorrelationModel {
method: "spectral".to_string(),
profiles,
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus])
.hydros(vec![hydro])
.stages(vec![stage])
.inflow_models(vec![inflow_model])
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(
&system,
42,
None,
&[],
&[],
OpeningTreeInputs::default(),
ClassSchemes {
inflow: Some(SamplingScheme::InSample),
load: Some(SamplingScheme::InSample),
ncs: Some(SamplingScheme::InSample),
},
)
.unwrap()
}
fn minimal_template_1_0_with_base(base_rhs: f64) -> StageTemplate {
StageTemplate {
num_cols: 4,
num_rows: 3,
num_nz: 1,
col_starts: vec![0_i32, 0, 0, 1, 1],
row_indices: vec![0_i32],
values: vec![1.0],
col_lower: vec![0.0, f64::NEG_INFINITY, 0.0, 0.0],
col_upper: vec![f64::INFINITY; 4],
objective: vec![0.0, 0.0, 0.0, 1.0],
row_lower: vec![0.0, 0.0, base_rhs],
row_upper: vec![0.0, 0.0, base_rhs],
n_state: 1,
n_transfer: 0,
n_dual_relevant: 1,
n_hydro: 1,
max_par_order: 0,
col_scale: Vec::new(),
row_scale: Vec::new(),
}
}
fn run_single_stage_forward(
stochastic: &StochasticContext,
inflow_method: InflowNonNegativityMethod,
base_rhs: f64,
noise_scale_val: f64,
) -> Vec<f64> {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 0.0, indexer.theta, 0.0);
let solver = MockSolver::always_ok(solution);
let fcf = FutureCostFunction::new(1, indexer.n_state, 1, 10, &[0; 1]);
let horizon = HorizonMode::Finite { num_stages: 1 };
let template = minimal_template_1_0_with_base(base_rhs);
let templates = vec![template];
let base_rows = vec![2usize];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(1);
let mut ws = single_workspace(solver, &indexer);
let mut basis_store = BasisStore::new(1, 1);
let noise_scale = vec![noise_scale_val];
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &noise_scale,
n_hydros: 1,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let stages = vec![Stage {
index: 0,
id: 0,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 1,
noise_method: NoiseMethod::Saa,
},
}];
let _ = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&templates,
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &inflow_method,
stochastic,
initial_state: &initial_state,
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &stages,
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
&ForwardPassBatch {
local_forward_passes: 1,
total_forward_passes: 1,
iteration: 0,
fwd_offset: 0,
event_sender: None,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
},
&mut records,
)
.unwrap();
ws.scratch.noise_buf.clone()
}
#[test]
fn truncation_clamps_negative_inflow_noise() {
let mean_m3s = -1000.0_f64;
let sigma = 1.0_f64;
let zeta = 1.0_f64; let base_rhs = zeta * mean_m3s;
let noise_scale_val = zeta * sigma;
let stochastic = make_stochastic_1h_1s(mean_m3s, sigma);
let noise_buf_truncation = run_single_stage_forward(
&stochastic,
InflowNonNegativityMethod::Truncation,
base_rhs,
noise_scale_val,
);
assert_eq!(noise_buf_truncation.len(), 1, "noise_buf must have 1 entry");
assert!(
noise_buf_truncation[0] >= 0.0,
"after truncation, noise_buf[0] must be >= 0 (inflow cannot be negative), got {}",
noise_buf_truncation[0]
);
}
#[test]
fn truncation_no_clamp_when_inflow_positive() {
let mean_m3s = 1000.0_f64;
let sigma = 1.0_f64;
let zeta = 1.0_f64;
let base_rhs = zeta * mean_m3s;
let noise_scale_val = zeta * sigma;
let stochastic = make_stochastic_1h_1s(mean_m3s, sigma);
let noise_buf_truncation = run_single_stage_forward(
&stochastic,
InflowNonNegativityMethod::Truncation,
base_rhs,
noise_scale_val,
);
let noise_buf_none = run_single_stage_forward(
&stochastic,
InflowNonNegativityMethod::None,
base_rhs,
noise_scale_val,
);
assert_eq!(noise_buf_truncation.len(), 1);
assert_eq!(noise_buf_none.len(), 1);
assert_eq!(
noise_buf_truncation[0].to_bits(),
noise_buf_none[0].to_bits(),
"when inflow is positive, truncation must not alter the noise buffer (expected identical bits)"
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn none_method_unchanged_with_truncation_code_present() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let solver = MockSolver::always_ok(solution);
let fcf = FutureCostFunction::new(3, indexer.n_state, 2, 100, &[0; 3]);
let config = TrainingConfig {
loop_config: LoopConfig {
forward_passes: 2,
max_iterations: 100,
start_iteration: 0,
n_fwd_threads: 1,
max_blocks: 1,
stopping_rules: StoppingRuleSet {
rules: vec![StoppingRule::IterationLimit { limit: 100 }],
mode: StoppingMode::Any,
},
},
cut_management: CutManagementConfig {
cut_selection: None,
budget: None,
cut_activity_tolerance: 0.0,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
warm_start_cuts: 0,
risk_measures: vec![RiskMeasure::Expectation],
},
events: EventConfig {
event_sender: None,
checkpoint_interval: None,
shutdown_flag: None,
export_states: false,
},
};
let horizon = HorizonMode::Finite { num_stages: 3 };
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(2 * 3);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let stages = make_stages_3();
let mut ws = single_workspace(solver, &indexer);
let mut basis_store =
BasisStore::new(config.loop_config.forward_passes as usize, templates.len());
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let result = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&templates,
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &stages,
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
&ForwardPassBatch {
local_forward_passes: config.loop_config.forward_passes as usize,
total_forward_passes: config.loop_config.forward_passes as usize,
iteration: 0,
fwd_offset: 0,
event_sender: None,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
},
&mut records,
)
.unwrap();
assert_eq!(result.scenario_costs.len(), 2);
for (i, record) in records.iter().enumerate() {
assert_eq!(
record.stage_cost, 70_000.0,
"none_method: record[{i}].stage_cost should be 70_000.0 ((objective - theta) * COST_SCALE_FACTOR)"
);
}
}
#[allow(clippy::too_many_lines)]
fn make_stochastic_context_1_hydro_1_load_bus(mean_mw: f64, std_mw: f64) -> StochasticContext {
let bus0 = Bus {
id: EntityId(0),
name: "B0".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let bus1 = Bus {
id: EntityId(1),
name: "B1".to_string(),
deficit_segments: vec![DeficitSegment {
depth_mw: None,
cost_per_mwh: 1000.0,
}],
excess_cost: 0.0,
};
let hydro = Hydro {
id: EntityId(10),
name: "H10".to_string(),
bus_id: EntityId(0),
downstream_id: None,
entry_stage_id: None,
exit_stage_id: None,
min_storage_hm3: 0.0,
max_storage_hm3: 100.0,
min_outflow_m3s: 0.0,
max_outflow_m3s: None,
generation_model: HydroGenerationModel::ConstantProductivity,
min_turbined_m3s: 0.0,
max_turbined_m3s: 100.0,
specific_productivity_mw_per_m3s_per_m: None,
min_generation_mw: 0.0,
max_generation_mw: 100.0,
tailrace: None,
hydraulic_losses: None,
efficiency: None,
evaporation_coefficients_mm: None,
evaporation_reference_volumes_hm3: None,
diversion: None,
filling: None,
penalties: cobre_core::entities::hydro::HydroPenalties {
spillage_cost: 0.0,
diversion_cost: 0.0,
turbined_cost: 0.0,
storage_violation_below_cost: 0.0,
filling_target_violation_cost: 0.0,
turbined_violation_below_cost: 0.0,
outflow_violation_below_cost: 0.0,
outflow_violation_above_cost: 0.0,
generation_violation_below_cost: 0.0,
evaporation_violation_cost: 0.0,
water_withdrawal_violation_cost: 0.0,
water_withdrawal_violation_pos_cost: 0.0,
water_withdrawal_violation_neg_cost: 0.0,
evaporation_violation_pos_cost: 0.0,
evaporation_violation_neg_cost: 0.0,
inflow_nonnegativity_cost: 1000.0,
},
};
let stage = Stage {
index: 0,
id: 0,
start_date: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
end_date: NaiveDate::from_ymd_opt(2024, 2, 1).unwrap(),
season_id: Some(0),
blocks: vec![Block {
index: 0,
name: "S".to_string(),
duration_hours: 744.0,
}],
block_mode: BlockMode::Parallel,
state_config: StageStateConfig {
storage: true,
inflow_lags: false,
},
risk_config: StageRiskConfig::Expectation,
scenario_config: ScenarioSourceConfig {
branching_factor: 3,
noise_method: NoiseMethod::Saa,
},
};
let inflow_model = InflowModel {
hydro_id: EntityId(10),
stage_id: 0,
mean_m3s: 100.0,
std_m3s: 20.0,
ar_coefficients: vec![],
residual_std_ratio: 1.0,
annual: None,
};
let load_model = LoadModel {
bus_id: EntityId(1),
stage_id: 0,
mean_mw,
std_mw,
};
let correlation = CorrelationModel {
method: "spectral".to_string(),
profiles: std::collections::BTreeMap::new(),
schedule: vec![],
};
let system = SystemBuilder::new()
.buses(vec![bus0, bus1])
.hydros(vec![hydro])
.stages(vec![stage])
.inflow_models(vec![inflow_model])
.load_models(vec![load_model])
.correlation(correlation)
.build()
.unwrap();
build_stochastic_context(
&system,
42,
None,
&[],
&[],
OpeningTreeInputs::default(),
ClassSchemes {
inflow: Some(SamplingScheme::InSample),
load: Some(SamplingScheme::InSample),
ncs: Some(SamplingScheme::InSample),
},
)
.unwrap()
}
#[test]
fn test_forward_pass_parallel_infeasibility() {
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let stochastic = make_stochastic_context_1_hydro_3_stages();
let stages = make_stages_3();
let fcf = FutureCostFunction::new(3, indexer.n_state, 2, 100, &[0; 3]);
let horizon = HorizonMode::Finite { num_stages: 3 };
let num_stages = 3usize;
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let n_scenarios = 10usize;
let n_workers = 4usize;
let mut workspaces: Vec<SolverWorkspace<MockSolver>> = (0..n_workers)
.map(|w| {
let solver = if w == 1 {
MockSolver::infeasible_on(solution.clone(), 0)
} else {
MockSolver::always_ok(solution.clone())
};
single_workspace(solver, &indexer)
})
.collect();
let mut records = empty_records(n_scenarios * num_stages);
let mut basis_store = BasisStore::new(n_scenarios, num_stages);
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[],
n_hydros: 0,
n_load_buses: 0,
load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1usize, 1, 1],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let result = run_forward_pass(
&mut workspaces,
&mut basis_store,
&ctx,
&templates,
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &stages,
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
&ForwardPassBatch {
local_forward_passes: n_scenarios,
total_forward_passes: n_scenarios,
iteration: 0,
fwd_offset: 0,
event_sender: None,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
},
&mut records,
);
match result {
Err(crate::SddpError::Infeasible {
stage,
scenario,
iteration,
}) => {
assert_eq!(
stage, 0,
"infeasible stage must be 0 (first stage of worker 1)"
);
assert_eq!(
scenario, 3,
"infeasible scenario must be 3 (start_m of worker 1)"
);
assert_eq!(
iteration, 0,
"iteration must be 0 (first training iteration)"
);
}
Err(other) => panic!("expected SddpError::Infeasible, got: {other:?}"),
Ok(_) => panic!("expected Err(SddpError::Infeasible), got Ok"),
}
}
#[test]
#[allow(clippy::too_many_lines)]
fn forward_pass_load_noise_positive_realization() {
let n_load_buses = 1usize;
let stochastic = make_stochastic_context_1_hydro_1_load_bus(300.0, 30.0);
let indexer = StageIndexer::new(1, 0);
let patch_buf = crate::lp_builder::PatchBuffer::new(1, 0, n_load_buses, 1);
let mut ws = SolverWorkspace {
rank: 0,
worker_id: 0,
solver: MockSolver::always_ok(fixed_solution(4, 100.0, indexer.theta, 30.0)),
patch_buf,
current_state: Vec::with_capacity(indexer.n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::with_capacity(1),
inflow_m3s_buf: Vec::with_capacity(1),
lag_matrix_buf: Vec::with_capacity(0),
par_inflow_buf: Vec::with_capacity(1),
eta_floor_buf: Vec::with_capacity(1),
zero_targets_buf: vec![0.0_f64; 1],
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::with_capacity(n_load_buses),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
current_state_scratch: Vec::new(),
recon_slot_lookup: Vec::new(),
promotion_scratch: crate::basis_reconstruct::PromotionScratch::default(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
};
let templates = vec![minimal_template_1_0_with_base(100.0)];
let base_rows = vec![2usize];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(1);
let fcf = FutureCostFunction::new(1, indexer.n_state, 1, 10, &[0; 1]);
let horizon = HorizonMode::Finite { num_stages: 1 };
let mut basis_store = BasisStore::new(1, 1);
let load_balance_row_starts = vec![10usize];
let load_bus_indices = vec![0usize];
let block_counts_per_stage = vec![1usize];
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[1.0],
n_hydros: 1,
n_load_buses,
load_balance_row_starts: &load_balance_row_starts,
load_bus_indices: &load_bus_indices,
block_counts_per_stage: &block_counts_per_stage,
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let _fwd = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&templates,
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
&ForwardPassBatch {
local_forward_passes: 1,
total_forward_passes: 1,
iteration: 0,
fwd_offset: 0,
event_sender: None,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
},
&mut records,
)
.unwrap();
assert_eq!(
ws.scratch.load_rhs_buf.len(),
n_load_buses,
"load_rhs_buf must have 1 entry (1 load bus x 1 block)"
);
assert!(
ws.scratch.load_rhs_buf[0] > 0.0,
"load realization must be positive with mean=300, std=30: got {}",
ws.scratch.load_rhs_buf[0]
);
let cat4_start = 2;
assert_eq!(
ws.patch_buf.lower[cat4_start], ws.scratch.load_rhs_buf[0],
"patch_buf lower must equal load_rhs_buf[0]"
);
assert_eq!(
ws.patch_buf.upper[cat4_start], ws.scratch.load_rhs_buf[0],
"patch_buf upper must equal load_rhs_buf[0] (equality constraint)"
);
assert_eq!(
ws.patch_buf.indices[cat4_start], 10,
"patch index must be load_balance_row_starts[0] + 0 * n_blks"
);
}
#[test]
#[allow(clippy::too_many_lines)]
fn forward_pass_load_noise_clamped_to_zero() {
let n_load_buses = 1usize;
let stochastic = make_stochastic_context_1_hydro_1_load_bus(-1000.0, 1.0);
let indexer = StageIndexer::new(1, 0);
let patch_buf = crate::lp_builder::PatchBuffer::new(1, 0, n_load_buses, 1);
let mut ws = SolverWorkspace {
rank: 0,
worker_id: 0,
solver: MockSolver::always_ok(fixed_solution(4, 100.0, indexer.theta, 30.0)),
patch_buf,
current_state: Vec::with_capacity(indexer.n_state),
scratch: crate::workspace::ScratchBuffers {
noise_buf: Vec::with_capacity(1),
inflow_m3s_buf: Vec::with_capacity(1),
lag_matrix_buf: Vec::with_capacity(0),
par_inflow_buf: Vec::with_capacity(1),
eta_floor_buf: Vec::with_capacity(1),
zero_targets_buf: vec![0.0_f64; 1],
ncs_col_upper_buf: Vec::new(),
ncs_col_lower_buf: Vec::new(),
ncs_col_indices_buf: Vec::new(),
load_rhs_buf: Vec::with_capacity(n_load_buses),
row_lower_buf: Vec::new(),
z_inflow_rhs_buf: Vec::new(),
effective_eta_buf: Vec::new(),
unscaled_primal: Vec::new(),
unscaled_dual: Vec::new(),
lag_accumulator: vec![],
lag_weight_accum: 0.0,
downstream_accumulator: Vec::new(),
downstream_weight_accum: 0.0,
downstream_completed_lags: Vec::new(),
downstream_n_completed: 0,
current_state_scratch: Vec::new(),
recon_slot_lookup: Vec::new(),
promotion_scratch: crate::basis_reconstruct::PromotionScratch::default(),
trajectory_costs_buf: Vec::new(),
raw_noise_buf: Vec::new(),
perm_scratch: Vec::new(),
},
scratch_basis: Basis::new(0, 0),
backward_accum: BackwardAccumulators::default(),
worker_timing_buf: cobre_core::WorkerPhaseTimings::default(),
};
let templates = vec![minimal_template_1_0_with_base(100.0)];
let base_rows = vec![2usize];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(1);
let fcf = FutureCostFunction::new(1, indexer.n_state, 1, 10, &[0; 1]);
let horizon = HorizonMode::Finite { num_stages: 1 };
let mut basis_store = BasisStore::new(1, 1);
let load_balance_row_starts = vec![10usize];
let load_bus_indices = vec![0usize];
let block_counts_per_stage = vec![1usize];
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[1.0],
n_hydros: 1,
n_load_buses,
load_balance_row_starts: &load_balance_row_starts,
load_bus_indices: &load_bus_indices,
block_counts_per_stage: &block_counts_per_stage,
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let _fwd = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&templates,
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &[],
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
&ForwardPassBatch {
local_forward_passes: 1,
total_forward_passes: 1,
iteration: 0,
fwd_offset: 0,
event_sender: None,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
},
&mut records,
)
.unwrap();
assert_eq!(
ws.scratch.load_rhs_buf.len(),
n_load_buses,
"load_rhs_buf must have 1 entry (1 load bus x 1 block)"
);
assert_eq!(
ws.scratch.load_rhs_buf[0], 0.0,
"realization with mean=-1000 must be clamped to 0.0, got {}",
ws.scratch.load_rhs_buf[0]
);
let cat4_start = 2;
assert_eq!(
ws.patch_buf.lower[cat4_start], 0.0,
"patch lower must be 0.0 (clamped)"
);
assert_eq!(
ws.patch_buf.upper[cat4_start], 0.0,
"patch upper must be 0.0 (clamped)"
);
}
#[test]
fn forward_pass_no_load_buses_unchanged() {
let stochastic = make_stochastic_context_1_hydro_3_stages();
let stages = make_stages_3();
let indexer = StageIndexer::new(1, 0);
let solution = fixed_solution(4, 100.0, indexer.theta, 30.0);
let mut ws = single_workspace(MockSolver::always_ok(solution), &indexer);
let templates = vec![
minimal_template_1_0(),
minimal_template_1_0(),
minimal_template_1_0(),
];
let base_rows = vec![2usize, 2, 2];
let initial_state = vec![0.0_f64; indexer.n_state];
let mut records = empty_records(3); let fcf = FutureCostFunction::new(3, indexer.n_state, 1, 10, &[0; 3]);
let horizon = HorizonMode::Finite { num_stages: 3 };
let mut basis_store = BasisStore::new(1, 3);
let ctx = StageContext {
templates: &templates,
base_rows: &base_rows,
noise_scale: &[], n_hydros: 0, n_load_buses: 0, load_balance_row_starts: &[],
load_bus_indices: &[],
block_counts_per_stage: &[1, 1, 1],
ncs_max_gen: &[],
ncs_allow_curtailment: &[],
discount_factors: &[],
cumulative_discount_factors: &[],
stage_lag_transitions: &[],
noise_group_ids: &[],
downstream_par_order: 0,
};
let _fwd = run_forward_pass(
std::slice::from_mut(&mut ws),
&mut basis_store,
&ctx,
&templates,
&fcf,
&TrainingContext {
horizon: &horizon,
indexer: &indexer,
inflow_method: &InflowNonNegativityMethod::None,
stochastic: &stochastic,
initial_state: &initial_state,
inflow_scheme: SamplingScheme::InSample,
load_scheme: SamplingScheme::InSample,
ncs_scheme: SamplingScheme::InSample,
stages: &stages,
historical_library: None,
external_inflow_library: None,
external_load_library: None,
external_ncs_library: None,
recent_accum_seed: &[],
recent_weight_seed: 0.0,
},
&ForwardPassBatch {
local_forward_passes: 1,
total_forward_passes: 1,
iteration: 0,
fwd_offset: 0,
event_sender: None,
basis_activity_window: crate::basis_reconstruct::DEFAULT_BASIS_ACTIVITY_WINDOW,
},
&mut records,
)
.unwrap();
assert_eq!(
ws.patch_buf.forward_patch_count(),
2,
"forward_patch_count must be N*(2+L)=2 when n_load_buses=0, got {}",
ws.patch_buf.forward_patch_count()
);
assert!(
ws.scratch.load_rhs_buf.is_empty(),
"load_rhs_buf must be empty when n_load_buses=0"
);
}
struct RecordingMockSolver {
last_batch: Option<RowBatch>,
add_rows_count: usize,
}
impl RecordingMockSolver {
fn new() -> Self {
Self {
last_batch: None,
add_rows_count: 0,
}
}
}
impl SolverInterface for RecordingMockSolver {
fn solver_name_version(&self) -> String {
"MockSolver 0.0.0".to_string()
}
fn load_model(&mut self, _template: &StageTemplate) {}
fn add_rows(&mut self, cuts: &RowBatch) {
self.last_batch = Some(RowBatch {
num_rows: cuts.num_rows,
row_starts: cuts.row_starts.clone(),
col_indices: cuts.col_indices.clone(),
values: cuts.values.clone(),
row_lower: cuts.row_lower.clone(),
row_upper: cuts.row_upper.clone(),
});
self.add_rows_count += 1;
}
fn set_row_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn set_col_bounds(&mut self, _indices: &[usize], _lower: &[f64], _upper: &[f64]) {}
fn solve(
&mut self,
_basis: Option<&Basis>,
) -> Result<cobre_solver::SolutionView<'_>, SolverError> {
Err(SolverError::InternalError {
message: "not implemented for test".to_string(),
error_code: None,
})
}
fn get_basis(&mut self, _out: &mut Basis) {}
fn statistics(&self) -> SolverStatistics {
SolverStatistics::default()
}
fn name(&self) -> &'static str {
"RecordingMock"
}
}
fn empty_row_batch() -> RowBatch {
RowBatch {
num_rows: 0,
row_starts: Vec::new(),
col_indices: Vec::new(),
values: Vec::new(),
row_lower: Vec::new(),
row_upper: Vec::new(),
}
}
#[test]
fn append_new_cuts_returns_zero_when_no_new_cuts() {
use crate::cut::CutRowMap;
let fcf = crate::cut::FutureCostFunction::new(2, 1, 1, 10, &[0; 2]);
let indexer = crate::indexer::StageIndexer::new(1, 0);
let mut row_map = CutRowMap::new(10, 5);
let mut batch_buf = empty_row_batch();
let mut solver = RecordingMockSolver::new();
let count = super::append_new_cuts_to_lp(
&mut solver,
&fcf,
0,
&indexer,
&[],
&mut row_map,
&mut batch_buf,
);
assert_eq!(count, 0);
assert_eq!(solver.add_rows_count, 0);
}
#[test]
fn append_new_cuts_appends_all_on_empty_row_map() {
use crate::cut::CutRowMap;
let mut fcf = crate::cut::FutureCostFunction::new(2, 1, 1, 10, &[0; 2]);
fcf.add_cut(0, 0, 0, 10.0, &[1.0]); fcf.add_cut(0, 1, 0, 20.0, &[3.0]);
let indexer = crate::indexer::StageIndexer::new(1, 0);
let mut row_map = CutRowMap::new(10, 5);
let mut batch_buf = empty_row_batch();
let mut solver = RecordingMockSolver::new();
let count = super::append_new_cuts_to_lp(
&mut solver,
&fcf,
0,
&indexer,
&[],
&mut row_map,
&mut batch_buf,
);
assert_eq!(count, 2);
assert_eq!(solver.add_rows_count, 1);
assert_eq!(row_map.total_cut_rows(), 2);
assert_eq!(row_map.lp_row_for_slot(0), Some(5));
assert_eq!(row_map.lp_row_for_slot(1), Some(6));
}
#[test]
fn append_new_cuts_skips_already_mapped_cuts() {
use crate::cut::CutRowMap;
let mut fcf = crate::cut::FutureCostFunction::new(2, 1, 1, 10, &[0; 2]);
fcf.add_cut(0, 0, 0, 10.0, &[1.0]); fcf.add_cut(0, 1, 0, 20.0, &[3.0]);
let indexer = crate::indexer::StageIndexer::new(1, 0);
let mut row_map = CutRowMap::new(10, 5);
row_map.insert(0);
let mut batch_buf = empty_row_batch();
let mut solver = RecordingMockSolver::new();
let count = super::append_new_cuts_to_lp(
&mut solver,
&fcf,
0,
&indexer,
&[],
&mut row_map,
&mut batch_buf,
);
assert_eq!(count, 1);
assert_eq!(solver.add_rows_count, 1);
assert_eq!(row_map.total_cut_rows(), 2);
assert!(solver.last_batch.as_ref().is_some_and(|b| b.num_rows == 1));
}
#[test]
fn append_new_cuts_matches_build_cut_row_batch_into() {
use crate::cut::CutRowMap;
let mut fcf = crate::cut::FutureCostFunction::new(2, 1, 1, 10, &[0; 2]);
fcf.add_cut(0, 0, 0, 10.0, &[1.0]); fcf.add_cut(0, 1, 0, 20.0, &[3.0]);
let indexer = crate::indexer::StageIndexer::new(1, 0);
let mut expected_batch = empty_row_batch();
super::build_cut_row_batch_into(&mut expected_batch, &fcf, 0, &indexer, &[]);
let mut row_map = CutRowMap::new(10, 5);
let mut actual_batch = empty_row_batch();
let mut solver = RecordingMockSolver::new();
super::append_new_cuts_to_lp(
&mut solver,
&fcf,
0,
&indexer,
&[],
&mut row_map,
&mut actual_batch,
);
assert_eq!(actual_batch.num_rows, expected_batch.num_rows);
assert_eq!(actual_batch.row_starts, expected_batch.row_starts);
assert_eq!(actual_batch.col_indices, expected_batch.col_indices);
assert_eq!(actual_batch.values, expected_batch.values);
assert_eq!(actual_batch.row_lower, expected_batch.row_lower);
assert_eq!(actual_batch.row_upper, expected_batch.row_upper);
}
#[test]
fn append_new_cuts_with_scaling_matches_build() {
use crate::cut::CutRowMap;
let mut fcf = crate::cut::FutureCostFunction::new(2, 1, 1, 10, &[0; 2]);
fcf.add_cut(0, 0, 0, 10.0, &[1.0]);
let indexer = crate::indexer::StageIndexer::new(1, 0);
let col_scale = vec![0.5, 2.0, 1.0, 0.1];
let mut expected = empty_row_batch();
super::build_cut_row_batch_into(&mut expected, &fcf, 0, &indexer, &col_scale);
let mut row_map = CutRowMap::new(10, 5);
let mut actual = empty_row_batch();
let mut solver = RecordingMockSolver::new();
super::append_new_cuts_to_lp(
&mut solver,
&fcf,
0,
&indexer,
&col_scale,
&mut row_map,
&mut actual,
);
assert_eq!(actual.values, expected.values);
assert_eq!(actual.col_indices, expected.col_indices);
}
fn empty_delta_batch() -> RowBatch {
RowBatch {
num_rows: 0,
row_starts: Vec::new(),
col_indices: Vec::new(),
values: Vec::new(),
row_lower: Vec::new(),
row_upper: Vec::new(),
}
}
#[test]
fn test_build_delta_empty_pool() {
let fcf = FutureCostFunction::new(2, 1, 1, 10, &[0; 2]);
let indexer = StageIndexer::new(1, 0);
let mut batch = empty_delta_batch();
build_delta_cut_row_batch_into(&mut batch, &fcf, 0, &indexer, &[], 1);
assert_eq!(batch.num_rows, 0);
assert_eq!(batch.row_starts, vec![0_i32]);
assert!(batch.col_indices.is_empty());
assert!(batch.values.is_empty());
assert!(batch.row_lower.is_empty());
assert!(batch.row_upper.is_empty());
}
#[test]
fn test_build_delta_single_iteration_filter() {
let mut fcf = FutureCostFunction::new(2, 1, 1, 10, &[0; 2]);
fcf.add_cut(0, 1, 0, 10.0, &[1.0]);
fcf.add_cut(0, 2, 0, 20.0, &[2.0]);
fcf.add_cut(0, 3, 0, 30.0, &[3.0]);
let indexer = StageIndexer::new(1, 0);
let mut batch = empty_delta_batch();
build_delta_cut_row_batch_into(&mut batch, &fcf, 0, &indexer, &[], 2);
assert_eq!(batch.num_rows, 1);
assert_eq!(batch.row_lower, vec![20.0]);
assert_eq!(batch.row_starts, vec![0_i32, 2_i32]);
assert_eq!(batch.values[0], -2.0);
}
#[test]
fn test_build_delta_skips_deactivated_cuts() {
let mut fcf = FutureCostFunction::new(2, 1, 2, 10, &[0; 2]);
fcf.add_cut(0, 1, 0, 10.0, &[1.0]);
fcf.add_cut(0, 1, 1, 20.0, &[2.0]);
fcf.pools[0].deactivate(&[2]);
let indexer = StageIndexer::new(1, 0);
let mut batch = empty_delta_batch();
build_delta_cut_row_batch_into(&mut batch, &fcf, 0, &indexer, &[], 1);
assert_eq!(batch.num_rows, 1);
assert_eq!(batch.row_lower, vec![20.0]);
}
#[test]
fn test_build_delta_excludes_warm_start_cuts() {
use cobre_io::OwnedPolicyCutRecord;
let warm_record = OwnedPolicyCutRecord {
cut_id: 0,
slot_index: 0,
coefficients: vec![5.0],
intercept: 99.0,
iteration: 0,
forward_pass_index: 0,
is_active: true,
};
let mut pool = crate::cut::pool::CutPool::new_with_warm_start(1, 2, 10, &[warm_record]);
pool.add_cut(1, 0, 7.0, &[1.0]);
let mut fcf = FutureCostFunction::new(2, 1, 2, 10, &[0; 2]);
fcf.pools[0] = pool;
let indexer = StageIndexer::new(1, 0);
let mut batch = empty_delta_batch();
build_delta_cut_row_batch_into(&mut batch, &fcf, 0, &indexer, &[], 1);
assert_eq!(batch.num_rows, 1);
assert_eq!(batch.row_lower, vec![7.0]);
}
#[test]
fn test_build_delta_matches_full_batch_when_pool_has_only_current_iter() {
let mut fcf = FutureCostFunction::new(2, 1, 2, 10, &[0; 2]);
fcf.add_cut(0, 1, 0, 10.0, &[1.0]);
fcf.add_cut(0, 1, 1, 20.0, &[3.0]);
let indexer = StageIndexer::new(1, 0);
let mut batch_full = empty_delta_batch();
super::build_cut_row_batch_into(&mut batch_full, &fcf, 0, &indexer, &[]);
let mut batch_delta = empty_delta_batch();
build_delta_cut_row_batch_into(&mut batch_delta, &fcf, 0, &indexer, &[], 1);
assert_eq!(batch_delta.num_rows, batch_full.num_rows);
assert_eq!(batch_delta.row_starts, batch_full.row_starts);
assert_eq!(batch_delta.col_indices, batch_full.col_indices);
assert_eq!(batch_delta.values, batch_full.values);
assert_eq!(batch_delta.row_lower, batch_full.row_lower);
assert_eq!(batch_delta.row_upper, batch_full.row_upper);
}
#[test]
fn test_build_delta_sparse_path() {
let indexer = StageIndexer::new(1, 1);
let mask_len = indexer.nonzero_state_indices.len();
if mask_len == 0 {
return;
}
let mut fcf = FutureCostFunction::new(2, indexer.n_state, 1, 10, &[0; 2]);
fcf.add_cut(0, 1, 0, 5.0, &vec![1.0; indexer.n_state]);
let mut batch = empty_delta_batch();
build_delta_cut_row_batch_into(&mut batch, &fcf, 0, &indexer, &[], 1);
assert_eq!(batch.num_rows, 1);
assert_eq!(batch.col_indices.len(), mask_len + 1);
}
#[test]
fn test_build_delta_reuses_out_buffer() {
let mut fcf = FutureCostFunction::new(2, 1, 1, 10, &[0; 2]);
fcf.add_cut(0, 1, 0, 11.0, &[1.0]);
fcf.add_cut(0, 2, 0, 22.0, &[2.0]);
let indexer = StageIndexer::new(1, 0);
let mut batch = empty_delta_batch();
build_delta_cut_row_batch_into(&mut batch, &fcf, 0, &indexer, &[], 1);
assert_eq!(batch.num_rows, 1);
assert_eq!(batch.row_lower, vec![11.0]);
build_delta_cut_row_batch_into(&mut batch, &fcf, 0, &indexer, &[], 2);
assert_eq!(batch.num_rows, 1);
assert_eq!(batch.row_lower, vec![22.0]);
assert_eq!(batch.row_starts.len(), 2); }
#[test]
fn test_build_delta_clears_row_starts() {
let mut fcf = FutureCostFunction::new(2, 1, 1, 10, &[0; 2]);
fcf.add_cut(0, 1, 0, 5.0, &[1.0]);
let indexer = StageIndexer::new(1, 0);
let mut batch = RowBatch {
num_rows: 5,
row_starts: vec![0_i32, 2, 4, 6, 8, 10],
col_indices: vec![0_i32; 10],
values: vec![99.0_f64; 10],
row_lower: vec![0.0_f64; 5],
row_upper: vec![0.0_f64; 5],
};
build_delta_cut_row_batch_into(&mut batch, &fcf, 0, &indexer, &[], 1);
assert_eq!(batch.row_starts[0], 0_i32);
assert_eq!(batch.num_rows, 1);
assert_eq!(batch.row_starts.len(), 2);
}
}