cobre-sddp 0.8.2

Stochastic Dual Dynamic Programming (SDDP) for hydrothermal dispatch and energy planning
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
//! Stochastic preprocessing pipeline: PAR estimation, opening tree loading, and stochastic context construction.

use std::collections::{BTreeMap, BTreeSet};
use std::path::Path;

use cobre_core::{
    EntityId, System,
    scenario::{SamplingScheme, ScenarioSource},
};
use cobre_stochastic::{OpeningTreeInputs, StochasticContext, context::OpeningTree};

use crate::{EstimationPath, EstimationReport, SddpError};

// ---------------------------------------------------------------------------
// PrepareStochasticResult + prepare_stochastic
// ---------------------------------------------------------------------------

/// Result of the stochastic preprocessing pipeline.
///
/// Bundles outputs from [`prepare_stochastic`].
#[derive(Debug)]
pub struct PrepareStochasticResult {
    /// Updated system with estimated PAR models (if estimation ran).
    pub system: System,
    /// Built stochastic context, ready to pass to [`crate::setup::StudySetup::new`].
    pub stochastic: StochasticContext,
    /// Estimation report (`Some` if `inflow_history.parquet` was present and
    /// `inflow_seasonal_stats.parquet` was absent, triggering auto-estimation).
    pub estimation_report: Option<EstimationReport>,
    /// Which of the 7 estimation path rows was taken during preprocessing.
    pub estimation_path: EstimationPath,
}

/// Load and validate a user-supplied opening tree from `scenarios/noise_openings.parquet`.
///
/// Returns `Ok(None)` if the file is absent.
///
/// # Errors
///
/// Returns [`SddpError::Io`] if the file exists but cannot be read or fails validation.
fn load_user_opening_tree_inner(
    case_dir: &Path,
    system: &System,
) -> Result<Option<OpeningTree>, SddpError> {
    let mut ctx = cobre_io::ValidationContext::new();
    let manifest = cobre_io::validate_structure(case_dir, &mut ctx);

    if !manifest.scenarios_noise_openings_parquet {
        return Ok(None);
    }

    let path = case_dir.join("scenarios").join("noise_openings.parquet");

    let rows = cobre_io::scenarios::load_noise_openings(Some(&path))?;

    let n_hydros = system.hydros().len();
    let mut load_bus_ids: Vec<EntityId> = system
        .load_models()
        .iter()
        .filter(|m| m.std_mw > 0.0)
        .map(|m| m.bus_id)
        .collect();
    load_bus_ids.sort_unstable_by_key(|id| id.0);
    load_bus_ids.dedup();
    let n_load_buses = load_bus_ids.len();
    let expected_dim = n_hydros + n_load_buses;

    let expected_stages = system.stages().iter().filter(|s| s.id >= 0).count();
    let mut openings_by_stage: BTreeMap<i32, BTreeSet<u32>> = BTreeMap::new();
    for row in &rows {
        openings_by_stage
            .entry(row.stage_id)
            .or_default()
            .insert(row.opening_index);
    }
    let openings_per_stage: Vec<usize> = openings_by_stage.values().map(BTreeSet::len).collect();

    cobre_io::scenarios::validate_noise_openings(
        &rows,
        expected_dim,
        expected_stages,
        &openings_per_stage,
    )?;

    let tree = cobre_io::scenarios::assemble_opening_tree(rows, expected_dim);
    Ok(Some(tree))
}

/// Build NCS entity factor entries from `System::resolved_ncs_factors()`.
///
/// Converts the dense 3D table into `(entity_id, stage_id, block_pairs)` tuples
/// for `PrecomputedNormal::build`.
#[must_use]
pub fn build_ncs_factor_entries(
    system: &System,
) -> Vec<(
    cobre_core::EntityId,
    i32,
    Vec<cobre_stochastic::normal::precompute::BlockFactorPair>,
)> {
    use cobre_stochastic::normal::precompute::BlockFactorPair;
    use std::collections::BTreeSet;

    let stochastic_ncs: BTreeSet<cobre_core::EntityId> =
        system.ncs_models().iter().map(|m| m.ncs_id).collect();

    if stochastic_ncs.is_empty() {
        return Vec::new();
    }

    let study_stages: Vec<_> = system.stages().iter().filter(|s| s.id >= 0).collect();
    let ncs_ids: Vec<cobre_core::EntityId> = system
        .non_controllable_sources()
        .iter()
        .map(|n| n.id)
        .collect();

    let mut entries = Vec::new();
    for (ncs_idx, ncs_id) in ncs_ids.iter().enumerate() {
        if !stochastic_ncs.contains(ncs_id) {
            continue;
        }
        for (stage_idx, stage) in study_stages.iter().enumerate() {
            #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
            let block_pairs: Vec<BlockFactorPair> = stage
                .blocks
                .iter()
                .enumerate()
                .map(|(block_idx, _)| {
                    let factor = system
                        .resolved_ncs_factors()
                        .factor(ncs_idx, stage_idx, block_idx);
                    // block_idx is a small count (< 1000 in practice); fits in i32.
                    (block_idx as i32, factor)
                })
                .collect();
            entries.push((*ncs_id, stage.id, block_pairs));
        }
    }
    entries
}

/// Load `scenarios/load_factors.json` from the case directory, returning an
/// empty vec when the file is absent. This is consumed by the stochastic
/// context builder for per-block noise scaling.
///
/// # Errors
///
/// Returns [`SddpError`] if the file exists but cannot be read or parsed.
pub fn load_load_factors_for_stochastic(
    case_dir: &Path,
) -> Result<Vec<cobre_io::scenarios::LoadFactorEntry>, SddpError> {
    let path = case_dir.join("scenarios").join("load_factors.json");
    if !path.exists() {
        return Ok(Vec::new());
    }
    cobre_io::scenarios::parse_load_factors(&path).map_err(SddpError::from)
}

/// Build the `HistoricalScenarioLibrary` for the opening tree when any stage
/// uses `NoiseMethod::HistoricalResiduals`. Returns `None` when no stage
/// needs historical draws.
fn build_opening_tree_library(
    system: &System,
    training_source: &ScenarioSource,
) -> Result<Option<cobre_stochastic::HistoricalScenarioLibrary>, SddpError> {
    use cobre_core::temporal::NoiseMethod;
    let needs_historical_tree = system
        .stages()
        .iter()
        .any(|s| s.id >= 0 && s.scenario_config.noise_method == NoiseMethod::HistoricalResiduals);
    if !needs_historical_tree {
        return Ok(None);
    }
    let study_stages: Vec<_> = system
        .stages()
        .iter()
        .filter(|s| s.id >= 0)
        .cloned()
        .collect();
    let hydro_ids: Vec<EntityId> = system.hydros().iter().map(|h| h.id).collect();
    let cycle_len = system
        .policy_graph()
        .season_map
        .as_ref()
        .map(|sm| sm.seasons.len());
    let par = cobre_stochastic::PrecomputedPar::build(
        system.inflow_models(),
        &study_stages,
        &hydro_ids,
        cycle_len,
    )?;
    let max_order = par.max_order();
    let user_pool = training_source.historical_years.as_ref();
    let window_years = cobre_stochastic::discover_historical_windows(
        system.inflow_history(),
        &hydro_ids,
        &study_stages,
        max_order,
        user_pool,
        system.policy_graph().season_map.as_ref(),
        1,
    )?;
    let mut lib = cobre_stochastic::HistoricalScenarioLibrary::new(
        window_years.len(),
        study_stages.len(),
        hydro_ids.len(),
        max_order,
        window_years.clone(),
    );
    // Compute stage_lag_transitions so the η-inversion rolling chain matches
    // the forward-pass lag accumulator. `downstream_par_order = max_order`
    // ensures the chain is wide enough for all AR lags.
    let season_map_ref = system.policy_graph().season_map.as_ref();
    // `precompute_stage_lag_transitions` requires a non-optional &SeasonMap.
    // When the system has no season_map, supply an empty noop map (same
    // pattern used in `StudySetup::new`).
    let noop_season_map;
    let effective_season_map: &cobre_core::temporal::SeasonMap = if let Some(sm) = season_map_ref {
        sm
    } else {
        noop_season_map = cobre_core::temporal::SeasonMap {
            cycle_type: cobre_core::temporal::SeasonCycleType::Monthly,
            seasons: Vec::new(),
        };
        &noop_season_map
    };
    let stage_lag_transitions = crate::lag_transition::precompute_stage_lag_transitions(
        &study_stages,
        effective_season_map,
        max_order,
    );
    cobre_stochastic::standardize_historical_windows(
        &mut lib,
        system.inflow_history(),
        &hydro_ids,
        &study_stages,
        &par,
        &window_years,
        season_map_ref,
        &system.initial_conditions().past_inflows,
        &stage_lag_transitions,
    );
    Ok(Some(lib))
}

/// Compute per-stage external scenario counts for opening tree clamping.
///
/// When any entity class uses External sampling, the external library is padded
/// to a uniform scenario count after loading. The opening tree generator must
/// clamp per-stage openings to the pre-padding raw count. The element-wise
/// minimum across entity classes is returned.
fn compute_external_scenario_counts(
    system: &System,
    training_source: &ScenarioSource,
) -> Option<Vec<usize>> {
    let study_stages: Vec<_> = system
        .stages()
        .iter()
        .filter(|s| s.id >= 0)
        .cloned()
        .collect();
    let n_stages = study_stages.len();

    let inflow_counts: Option<Vec<usize>> =
        if training_source.inflow_scheme == SamplingScheme::External && n_stages > 0 {
            let external_rows = system.external_scenarios();
            let n_hydros = system.hydros().len();
            let mut rows_per_stage = vec![0usize; n_stages];
            #[allow(clippy::cast_sign_loss)]
            for row in external_rows {
                let s = row.stage_id as usize;
                if s < n_stages {
                    rows_per_stage[s] += 1;
                }
            }
            Some(if n_hydros > 0 {
                rows_per_stage.iter().map(|&r| r / n_hydros).collect()
            } else {
                vec![0usize; n_stages]
            })
        } else {
            None
        };

    let load_counts: Option<Vec<usize>> =
        if training_source.load_scheme == SamplingScheme::External && n_stages > 0 {
            let external_rows = system.external_load_scenarios();
            let mut bus_ids: Vec<EntityId> = system
                .load_models()
                .iter()
                .filter(|m| m.std_mw > 0.0)
                .map(|m| m.bus_id)
                .collect();
            bus_ids.sort_unstable_by_key(|id| id.0);
            bus_ids.dedup();
            let n_buses = bus_ids.len();
            let mut rows_per_stage = vec![0usize; n_stages];
            #[allow(clippy::cast_sign_loss)]
            for row in external_rows {
                let s = row.stage_id as usize;
                if s < n_stages {
                    rows_per_stage[s] += 1;
                }
            }
            Some(if n_buses > 0 {
                rows_per_stage.iter().map(|&r| r / n_buses).collect()
            } else {
                vec![0usize; n_stages]
            })
        } else {
            None
        };

    let ncs_counts: Option<Vec<usize>> =
        if training_source.ncs_scheme == SamplingScheme::External && n_stages > 0 {
            let external_rows = system.external_ncs_scenarios();
            let mut ncs_ids: Vec<EntityId> = system.ncs_models().iter().map(|m| m.ncs_id).collect();
            ncs_ids.sort_unstable_by_key(|id| id.0);
            ncs_ids.dedup();
            let n_ncs = ncs_ids.len();
            let mut rows_per_stage = vec![0usize; n_stages];
            #[allow(clippy::cast_sign_loss)]
            for row in external_rows {
                let s = row.stage_id as usize;
                if s < n_stages {
                    rows_per_stage[s] += 1;
                }
            }
            Some(if n_ncs > 0 {
                rows_per_stage.iter().map(|&r| r / n_ncs).collect()
            } else {
                vec![0usize; n_stages]
            })
        } else {
            None
        };

    match (inflow_counts, load_counts, ncs_counts) {
        (None, None, None) => None,
        (Some(a), None, None) => Some(a),
        (None, Some(b), None) => Some(b),
        (None, None, Some(c)) => Some(c),
        (Some(a), Some(b), None) => Some(a.iter().zip(b.iter()).map(|(&x, &y)| x.min(y)).collect()),
        (Some(a), None, Some(c)) => Some(a.iter().zip(c.iter()).map(|(&x, &y)| x.min(y)).collect()),
        (None, Some(b), Some(c)) => Some(b.iter().zip(c.iter()).map(|(&x, &y)| x.min(y)).collect()),
        (Some(a), Some(b), Some(c)) => Some(
            a.iter()
                .zip(b.iter())
                .zip(c.iter())
                .map(|((&x, &y), &z)| x.min(y).min(z))
                .collect(),
        ),
    }
}

/// Run the stochastic preprocessing pipeline: PAR estimation, block factor
/// loading, opening-tree library construction, and stochastic context build.
///
/// # Errors
///
/// Returns [`SddpError::Io`] on file read/parse/validation failure,
/// or [`SddpError::Stochastic`] on PAR/decomposition failure.
pub fn prepare_stochastic(
    system: System,
    case_dir: &Path,
    config: &cobre_io::Config,
    seed: u64,
    training_source: &ScenarioSource,
) -> Result<PrepareStochasticResult, SddpError> {
    let (system, estimation_report, estimation_path) =
        crate::estimation::estimate_from_history(system, case_dir, config)?;

    let user_opening_tree = load_user_opening_tree_inner(case_dir, &system)?;

    // Load block-level load factors (optional).
    let load_factor_entries = load_load_factors_for_stochastic(case_dir)?;
    let block_pairs: Vec<Vec<cobre_stochastic::normal::precompute::BlockFactorPair>> =
        load_factor_entries
            .iter()
            .map(|e| {
                e.block_factors
                    .iter()
                    .map(|bf| (bf.block_id, bf.factor))
                    .collect()
            })
            .collect();
    let entity_factor_entries: Vec<cobre_stochastic::normal::precompute::EntityFactorEntry<'_>> =
        load_factor_entries
            .iter()
            .zip(block_pairs.iter())
            .map(|(e, pairs)| (e.bus_id, e.stage_id, pairs.as_slice()))
            .collect();

    let ncs_factor_entries = build_ncs_factor_entries(&system);
    let ncs_entity_factor_entries: Vec<
        cobre_stochastic::normal::precompute::EntityFactorEntry<'_>,
    > = ncs_factor_entries
        .iter()
        .map(|(ncs_id, stage_id, pairs)| (*ncs_id, *stage_id, pairs.as_slice()))
        .collect();

    let opening_tree_library = build_opening_tree_library(&system, training_source)?;
    let external_scenario_counts = compute_external_scenario_counts(&system, training_source);

    // Compute noise group IDs for noise-group sharing.
    // Groups stages with the same (season_id, year) so weekly stages within
    // the same monthly bucket share noise draws in the opening tree.
    // For uniform monthly studies each stage has a unique group ID, so no
    // sharing is triggered and the opening tree is identical to the pre-noise-
    // sharing baseline.
    let opening_tree_noise_group_ids: Vec<u32> = {
        let study_stages: Vec<_> = system
            .stages()
            .iter()
            .filter(|s| s.id >= 0)
            .cloned()
            .collect();
        crate::lag_transition::precompute_noise_groups(&study_stages)
    };

    let forward_seed = training_source.seed.map(i64::unsigned_abs);
    let stochastic = cobre_stochastic::build_stochastic_context(
        &system,
        seed,
        forward_seed,
        &entity_factor_entries,
        &ncs_entity_factor_entries,
        OpeningTreeInputs {
            user_tree: user_opening_tree,
            historical_library: opening_tree_library.as_ref(),
            external_scenario_counts,
            noise_group_ids: Some(opening_tree_noise_group_ids),
        },
        cobre_stochastic::ClassSchemes {
            inflow: Some(training_source.inflow_scheme),
            load: Some(training_source.load_scheme),
            ncs: Some(training_source.ncs_scheme),
        },
    )?;

    Ok(PrepareStochasticResult {
        system,
        stochastic,
        estimation_report,
        estimation_path,
    })
}