Skip to main content

hydra_engine_wds/analysis/
binning.rs

1use super::errors::AnalysisComputeError;
2use crate::io::analysis_io::{
3    AnalysisArtifact, AnalysisSource, ContinuousDistribution, DistributionSet, HistogramBin,
4    StatusDistribution, SummaryStats, ANALYSIS_SCHEMA_VERSION,
5};
6use crate::io::out_reader;
7use crate::simulation::Simulation;
8use crate::RuntimeEstimate;
9
10/// Selects which result variables are included in an [`AnalysisArtifact`].
11///
12/// Setting a field to `false` omits that variable entirely; the corresponding
13/// histograms and summary statistics are not computed or stored. Use
14/// [`AnalysisSelection::all`] to enable every variable.
15#[derive(Debug, Clone, Copy)]
16pub struct AnalysisSelection {
17    /// Include nodal gauge-pressure histograms and summaries.
18    pub pressure: bool,
19    /// Include nodal hydraulic-head histograms and summaries.
20    pub head: bool,
21    /// Include link volumetric-flow histograms and summaries.
22    pub flow: bool,
23    /// Include link mean-velocity histograms and summaries.
24    pub velocity: bool,
25    /// Include link status (open/closed) distributions.
26    pub status: bool,
27}
28
29impl AnalysisSelection {
30    /// Returns an `AnalysisSelection` with every variable enabled.
31    pub fn all() -> Self {
32        Self {
33            pressure: true,
34            head: true,
35            flow: true,
36            velocity: true,
37            status: true,
38        }
39    }
40
41    fn any(self) -> bool {
42        self.pressure || self.head || self.flow || self.velocity || self.status
43    }
44
45    fn any_continuous(self) -> bool {
46        self.pressure || self.head || self.flow || self.velocity
47    }
48
49    fn continuous_count(self) -> usize {
50        usize::from(self.pressure)
51            + usize::from(self.head)
52            + usize::from(self.flow)
53            + usize::from(self.velocity)
54    }
55}
56
57/// Estimate analysis effort from summary metadata and module selection.
58pub fn estimate_analysis_runtime_millis(
59    node_count: usize,
60    link_count: usize,
61    period_count: usize,
62    selection: AnalysisSelection,
63) -> RuntimeEstimate {
64    if !selection.any() || period_count == 0 {
65        return RuntimeEstimate::Low;
66    }
67
68    let nodes = node_count.max(1) as f64;
69    let links = link_count.max(1) as f64;
70    let periods = period_count.max(1) as f64;
71
72    let node_modules = usize::from(selection.pressure) + usize::from(selection.head);
73    let link_modules = usize::from(selection.flow) + usize::from(selection.velocity);
74    let status_weight = if selection.status { 0.35 } else { 0.0 };
75
76    let per_period_ops =
77        nodes * node_modules as f64 + links * (link_modules as f64 + status_weight);
78    let pass_factor = if selection.any_continuous() {
79        1.75
80    } else {
81        1.0
82    };
83    let complexity_score = periods * per_period_ops * pass_factor;
84
85    if complexity_score < 4_000_000.0 {
86        RuntimeEstimate::Low
87    } else if complexity_score < 40_000_000.0 {
88        RuntimeEstimate::Medium
89    } else {
90        RuntimeEstimate::High
91    }
92}
93
94/// Build an aggregated analysis artifact from simulation snapshots.
95pub fn build_analysis_artifact(sim: &Simulation) -> Result<AnalysisArtifact, AnalysisComputeError> {
96    let times = sim.snapshot_times();
97    if times.is_empty() {
98        return Err(AnalysisComputeError::NoSnapshots);
99    }
100
101    let mut pressure_values = Vec::new();
102    let mut head_values = Vec::new();
103    let mut flow_values = Vec::new();
104    let mut velocity_values = Vec::new();
105    let mut status = StatusDistribution::default();
106
107    for t in &times {
108        let node_results = sim.all_node_results_at(*t)?;
109        for node in node_results {
110            pressure_values.push(node.pressure);
111            head_values.push(node.head);
112        }
113
114        let link_results = sim.all_link_results_at(*t)?;
115        for link in link_results {
116            flow_values.push(link.flow.abs());
117            velocity_values.push(link.velocity);
118            increment_status_from_sim_value(&mut status, link.status);
119        }
120    }
121
122    Ok(AnalysisArtifact {
123        schema_version: ANALYSIS_SCHEMA_VERSION,
124        source: AnalysisSource {
125            output_file: "network.out".to_string(),
126            report_file: "report.json".to_string(),
127            period_count: times.len(),
128        },
129        distributions: DistributionSet {
130            pressure: make_distribution(&pressure_values, 20),
131            head: make_distribution(&head_values, 20),
132            flow: make_distribution(&flow_values, 20),
133            velocity: make_distribution(&velocity_values, 20),
134            status,
135        },
136        demand_reliability: None,
137        service_compliance: None,
138    })
139}
140
141/// Build an aggregated analysis artifact from a persisted `.out` file.
142pub fn build_analysis_artifact_from_out(
143    out_path: &std::path::Path,
144) -> Result<AnalysisArtifact, AnalysisComputeError> {
145    build_analysis_artifact_from_out_with_progress_and_selection(
146        out_path,
147        AnalysisSelection::all(),
148        |_percent, _phase, _processed, _total| {},
149    )
150}
151
152/// Build an aggregated analysis artifact from a persisted `.out` file,
153/// reporting incremental progress through read and aggregation stages.
154pub fn build_analysis_artifact_from_out_with_progress<F>(
155    out_path: &std::path::Path,
156    on_progress: F,
157) -> Result<AnalysisArtifact, AnalysisComputeError>
158where
159    F: FnMut(f64, &'static str, usize, usize),
160{
161    build_analysis_artifact_from_out_with_progress_and_selection(
162        out_path,
163        AnalysisSelection::all(),
164        on_progress,
165    )
166}
167
168/// Build an [`AnalysisArtifact`] from a `.out` file with per-period progress
169/// callbacks and configurable variable selection.
170///
171/// `on_progress` is called after each reporting period with
172/// `(pct_complete: f64, phase_label: &str, current_period: usize, total_periods: usize)`.
173/// Pass a no-op closure when progress reporting is not needed.
174pub fn build_analysis_artifact_from_out_with_progress_and_selection<F>(
175    out_path: &std::path::Path,
176    selection: AnalysisSelection,
177    mut on_progress: F,
178) -> Result<AnalysisArtifact, AnalysisComputeError>
179where
180    F: FnMut(f64, &'static str, usize, usize),
181{
182    let meta = out_reader::read_metadata_checked(out_path)
183        .map_err(|e| AnalysisComputeError::OutRead(e.to_string()))?;
184
185    if meta.n_periods == 0 {
186        return Err(AnalysisComputeError::NoSnapshots);
187    }
188
189    if !selection.any() {
190        on_progress(100.0, "Finalizing analysis artifact", 0, 0);
191        return Ok(AnalysisArtifact {
192            schema_version: ANALYSIS_SCHEMA_VERSION,
193            source: AnalysisSource {
194                output_file: "network.out".to_string(),
195                report_file: "report.json".to_string(),
196                period_count: meta.n_periods,
197            },
198            distributions: DistributionSet::default(),
199            demand_reliability: None,
200            service_compliance: None,
201        });
202    }
203
204    on_progress(0.0, "Scanning periods", 0, meta.n_periods);
205
206    let mut pressure_stats = selection.pressure.then_some(RunningStats::default());
207    let mut head_stats = selection.head.then_some(RunningStats::default());
208    let mut flow_stats = selection.flow.then_some(RunningStats::default());
209    let mut velocity_stats = selection.velocity.then_some(RunningStats::default());
210    let mut status = StatusDistribution::default();
211    let needs_pass2 = selection.any_continuous();
212    let module_finalize_steps = selection.continuous_count();
213    let total_work =
214        (meta.n_periods + if needs_pass2 { meta.n_periods } else { 0 } + module_finalize_steps + 1)
215            as f64;
216
217    // Pass 1: compute exact min/max/mean and status counts.
218    for period in 0..meta.n_periods {
219        let pr = out_reader::read_period(out_path, &meta, period)
220            .map_err(AnalysisComputeError::OutRead)?;
221
222        if let Some(stats) = pressure_stats.as_mut() {
223            for v in pr.node_pressure.iter().copied() {
224                stats.observe(v as f64);
225            }
226        }
227        if let Some(stats) = head_stats.as_mut() {
228            for v in pr.node_head.iter().copied() {
229                stats.observe(v as f64);
230            }
231        }
232        if let Some(stats) = flow_stats.as_mut() {
233            for v in pr.link_flow.iter().copied() {
234                stats.observe((v as f64).abs());
235            }
236        }
237        if let Some(stats) = velocity_stats.as_mut() {
238            for v in pr.link_velocity.iter().copied() {
239                stats.observe(v as f64);
240            }
241        }
242        if selection.status {
243            for v in pr.link_status {
244                increment_status_from_out_value(&mut status, v as f64);
245            }
246        }
247
248        let percent = 100.0 * (period + 1) as f64 / total_work;
249        on_progress(percent, "Scanning periods", period + 1, meta.n_periods);
250    }
251
252    let mut pressure_hist = pressure_stats
253        .as_ref()
254        .map(|stats| HistogramAccumulator::new_from_stats(stats, 20));
255    let mut head_hist = head_stats
256        .as_ref()
257        .map(|stats| HistogramAccumulator::new_from_stats(stats, 20));
258    let mut flow_hist = flow_stats
259        .as_ref()
260        .map(|stats| HistogramAccumulator::new_from_stats(stats, 20));
261    let mut velocity_hist = velocity_stats
262        .as_ref()
263        .map(|stats| HistogramAccumulator::new_from_stats(stats, 20));
264
265    if needs_pass2 {
266        // Pass 2: fill histograms using fixed ranges from pass 1.
267        on_progress(
268            100.0 * meta.n_periods as f64 / total_work,
269            "Binning periods",
270            0,
271            meta.n_periods,
272        );
273        for period in 0..meta.n_periods {
274            let pr = out_reader::read_period(out_path, &meta, period)
275                .map_err(AnalysisComputeError::OutRead)?;
276
277            if let Some(hist) = pressure_hist.as_mut() {
278                for v in pr.node_pressure {
279                    hist.observe(v as f64);
280                }
281            }
282            if let Some(hist) = head_hist.as_mut() {
283                for v in pr.node_head {
284                    hist.observe(v as f64);
285                }
286            }
287            if let Some(hist) = flow_hist.as_mut() {
288                for v in pr.link_flow {
289                    hist.observe((v as f64).abs());
290                }
291            }
292            if let Some(hist) = velocity_hist.as_mut() {
293                for v in pr.link_velocity {
294                    hist.observe(v as f64);
295                }
296            }
297
298            let done_work = meta.n_periods + period + 1;
299            let percent = 100.0 * done_work as f64 / total_work;
300            on_progress(percent, "Binning periods", period + 1, meta.n_periods);
301        }
302    }
303
304    let mut completed_work = meta.n_periods + if needs_pass2 { meta.n_periods } else { 0 };
305    let mut pressure = ContinuousDistribution::default();
306    let mut head = ContinuousDistribution::default();
307    let mut flow = ContinuousDistribution::default();
308    let mut velocity = ContinuousDistribution::default();
309
310    if selection.pressure {
311        on_progress(
312            100.0 * completed_work as f64 / total_work,
313            "Computing pressure distribution",
314            meta.n_periods,
315            meta.n_periods,
316        );
317        if let (Some(hist), Some(stats)) = (pressure_hist.take(), pressure_stats) {
318            pressure = hist.into_distribution(&stats);
319        }
320        completed_work += 1;
321    }
322
323    if selection.head {
324        on_progress(
325            100.0 * completed_work as f64 / total_work,
326            "Computing head distribution",
327            meta.n_periods,
328            meta.n_periods,
329        );
330        if let (Some(hist), Some(stats)) = (head_hist.take(), head_stats) {
331            head = hist.into_distribution(&stats);
332        }
333        completed_work += 1;
334    }
335
336    if selection.flow {
337        on_progress(
338            100.0 * completed_work as f64 / total_work,
339            "Computing flow distribution",
340            meta.n_periods,
341            meta.n_periods,
342        );
343        if let (Some(hist), Some(stats)) = (flow_hist.take(), flow_stats) {
344            flow = hist.into_distribution(&stats);
345        }
346        completed_work += 1;
347    }
348
349    if selection.velocity {
350        on_progress(
351            100.0 * completed_work as f64 / total_work,
352            "Computing velocity distribution",
353            meta.n_periods,
354            meta.n_periods,
355        );
356        if let (Some(hist), Some(stats)) = (velocity_hist.take(), velocity_stats) {
357            velocity = hist.into_distribution(&stats);
358        }
359        completed_work += 1;
360    }
361
362    on_progress(
363        100.0 * completed_work as f64 / total_work,
364        "Finalizing analysis artifact",
365        meta.n_periods,
366        meta.n_periods,
367    );
368
369    Ok(AnalysisArtifact {
370        schema_version: ANALYSIS_SCHEMA_VERSION,
371        source: AnalysisSource {
372            output_file: "network.out".to_string(),
373            report_file: "report.json".to_string(),
374            period_count: meta.n_periods,
375        },
376        distributions: DistributionSet {
377            pressure,
378            head,
379            flow,
380            velocity,
381            status: if selection.status {
382                status
383            } else {
384                StatusDistribution::default()
385            },
386        },
387        demand_reliability: None,
388        service_compliance: None,
389    })
390}
391
392fn increment_status_from_sim_value(status: &mut StatusDistribution, value: f64) {
393    match value.round() as i32 {
394        0 => status.closed += 1,
395        1 => status.open += 1,
396        2 => status.active += 1,
397        _ => status.other += 1,
398    }
399}
400
401fn increment_status_from_out_value(status: &mut StatusDistribution, value: f64) {
402    // `.out` stores EPANET StatusType codes: 0,1,2,3,4,6,7.
403    match value.round() as i32 {
404        3 => status.open += 1,
405        4 | 6 => status.active += 1,
406        0 | 1 | 2 | 7 => status.closed += 1,
407        _ => status.other += 1,
408    }
409}
410
411#[derive(Clone, Copy)]
412struct RunningStats {
413    min: f64,
414    max: f64,
415    sum: f64,
416    count: u64,
417}
418
419impl Default for RunningStats {
420    fn default() -> Self {
421        Self {
422            min: f64::INFINITY,
423            max: f64::NEG_INFINITY,
424            sum: 0.0,
425            count: 0,
426        }
427    }
428}
429
430impl RunningStats {
431    fn observe(&mut self, value: f64) {
432        if value < self.min {
433            self.min = value;
434        }
435        if value > self.max {
436            self.max = value;
437        }
438        self.sum += value;
439        self.count += 1;
440    }
441
442    fn mean(&self) -> f64 {
443        if self.count == 0 {
444            0.0
445        } else {
446            self.sum / self.count as f64
447        }
448    }
449}
450
451struct HistogramAccumulator {
452    min: f64,
453    max: f64,
454    width: f64,
455    counts: Vec<u64>,
456}
457
458impl HistogramAccumulator {
459    fn new_from_stats(stats: &RunningStats, n_bins: usize) -> Self {
460        let bins = n_bins.max(1);
461        let span = (stats.max - stats.min).abs();
462        let width = if stats.count == 0 || span <= f64::EPSILON {
463            1.0
464        } else {
465            (stats.max - stats.min) / bins as f64
466        };
467        Self {
468            min: if stats.count == 0 { 0.0 } else { stats.min },
469            max: if stats.count == 0 { 0.0 } else { stats.max },
470            width,
471            counts: vec![0; bins],
472        }
473    }
474
475    fn observe(&mut self, value: f64) {
476        if self.counts.is_empty() {
477            return;
478        }
479        if (self.max - self.min).abs() <= f64::EPSILON {
480            self.counts[0] += 1;
481            return;
482        }
483        let mut idx = ((value - self.min) / self.width).floor() as isize;
484        if idx < 0 {
485            idx = 0;
486        }
487        if idx as usize >= self.counts.len() {
488            idx = self.counts.len() as isize - 1;
489        }
490        self.counts[idx as usize] += 1;
491    }
492
493    fn into_distribution(self, stats: &RunningStats) -> ContinuousDistribution {
494        if stats.count == 0 {
495            return ContinuousDistribution::default();
496        }
497
498        if (stats.max - stats.min).abs() <= f64::EPSILON {
499            return ContinuousDistribution {
500                bins: vec![HistogramBin {
501                    start: stats.min,
502                    end: stats.max,
503                    count: stats.count,
504                }],
505                summary: SummaryStats {
506                    min: stats.min,
507                    max: stats.max,
508                    mean: stats.mean(),
509                    p05: stats.min,
510                    p25: stats.min,
511                    p50: stats.min,
512                    p75: stats.min,
513                    p95: stats.min,
514                },
515                thresholds: None,
516            };
517        }
518
519        let bins: Vec<HistogramBin> = self
520            .counts
521            .iter()
522            .enumerate()
523            .map(|(i, &count)| HistogramBin {
524                start: self.min + i as f64 * self.width,
525                end: self.min + (i + 1) as f64 * self.width,
526                count,
527            })
528            .collect();
529
530        let p05 = approx_quantile_from_bins(&bins, 0.05, stats.min, stats.max);
531        let p25 = approx_quantile_from_bins(&bins, 0.25, stats.min, stats.max);
532        let p50 = approx_quantile_from_bins(&bins, 0.50, stats.min, stats.max);
533        let p75 = approx_quantile_from_bins(&bins, 0.75, stats.min, stats.max);
534        let p95 = approx_quantile_from_bins(&bins, 0.95, stats.min, stats.max);
535
536        ContinuousDistribution {
537            bins,
538            summary: SummaryStats {
539                min: stats.min,
540                max: stats.max,
541                mean: stats.mean(),
542                p05,
543                p25,
544                p50,
545                p75,
546                p95,
547            },
548            thresholds: None,
549        }
550    }
551}
552
553fn approx_quantile_from_bins(bins: &[HistogramBin], q: f64, min: f64, max: f64) -> f64 {
554    if bins.is_empty() {
555        return 0.0;
556    }
557    let total: u64 = bins.iter().map(|b| b.count).sum();
558    if total == 0 {
559        return 0.0;
560    }
561
562    let target = (q.clamp(0.0, 1.0) * (total.saturating_sub(1)) as f64).round() as u64;
563    let mut cumulative = 0u64;
564    for bin in bins {
565        let next = cumulative + bin.count;
566        if target < next {
567            if bin.count == 0 {
568                return bin.start;
569            }
570            let offset_in_bin = target.saturating_sub(cumulative) as f64 / bin.count as f64;
571            let estimate = bin.start + (bin.end - bin.start) * offset_in_bin;
572            return estimate.clamp(min, max);
573        }
574        cumulative = next;
575    }
576
577    max
578}
579
580fn make_distribution(values: &[f64], n_bins: usize) -> ContinuousDistribution {
581    if values.is_empty() {
582        return ContinuousDistribution::default();
583    }
584
585    let mut sorted = values.to_vec();
586    sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
587
588    let min = sorted[0];
589    let max = sorted[sorted.len() - 1];
590    let mean = sorted.iter().sum::<f64>() / sorted.len() as f64;
591
592    let bins = if (max - min).abs() <= f64::EPSILON {
593        vec![HistogramBin {
594            start: min,
595            end: max,
596            count: sorted.len() as u64,
597        }]
598    } else {
599        let width = (max - min) / n_bins as f64;
600        let mut counts = vec![0u64; n_bins];
601        for value in &sorted {
602            let mut idx = ((value - min) / width).floor() as usize;
603            if idx >= n_bins {
604                idx = n_bins - 1;
605            }
606            counts[idx] += 1;
607        }
608
609        (0..n_bins)
610            .map(|i| HistogramBin {
611                start: min + i as f64 * width,
612                end: min + (i + 1) as f64 * width,
613                count: counts[i],
614            })
615            .collect()
616    };
617
618    ContinuousDistribution {
619        bins,
620        summary: SummaryStats {
621            min,
622            max,
623            mean,
624            p05: quantile(&sorted, 0.05),
625            p25: quantile(&sorted, 0.25),
626            p50: quantile(&sorted, 0.50),
627            p75: quantile(&sorted, 0.75),
628            p95: quantile(&sorted, 0.95),
629        },
630        thresholds: None,
631    }
632}
633
634fn quantile(sorted: &[f64], q: f64) -> f64 {
635    if sorted.is_empty() {
636        return 0.0;
637    }
638    if sorted.len() == 1 {
639        return sorted[0];
640    }
641    let clamped_q = q.clamp(0.0, 1.0);
642    let pos = clamped_q * (sorted.len() - 1) as f64;
643    let lo = pos.floor() as usize;
644    let hi = pos.ceil() as usize;
645    if lo == hi {
646        sorted[lo]
647    } else {
648        let w = pos - lo as f64;
649        sorted[lo] * (1.0 - w) + sorted[hi] * w
650    }
651}
652
653#[cfg(test)]
654mod tests {
655    use super::*;
656
657    #[test]
658    fn quantile_interpolates() {
659        let s = vec![1.0, 2.0, 3.0, 4.0];
660        assert!((quantile(&s, 0.5) - 2.5).abs() < 1e-12);
661    }
662
663    #[test]
664    fn analysis_estimate_increases_with_periods() {
665        let selection = AnalysisSelection::all();
666        let short = estimate_analysis_runtime_millis(500, 600, 24, selection);
667        let long = estimate_analysis_runtime_millis(500, 600, 240, selection);
668        assert!(long >= short);
669    }
670
671    #[test]
672    fn analysis_estimate_increases_with_selected_modules() {
673        let fast = AnalysisSelection {
674            pressure: true,
675            head: false,
676            flow: false,
677            velocity: false,
678            status: true,
679        };
680        let full = AnalysisSelection::all();
681        let fast_est = estimate_analysis_runtime_millis(1_200, 1_400, 96, fast);
682        let full_est = estimate_analysis_runtime_millis(1_200, 1_400, 96, full);
683        assert!(full_est >= fast_est);
684    }
685
686    #[test]
687    fn extreme_analysis_case_maps_to_high_effort() {
688        let full = AnalysisSelection::all();
689        let effort = estimate_analysis_runtime_millis(100_000, 120_000, 20_000, full);
690        assert_eq!(effort, RuntimeEstimate::High);
691    }
692}