1use super::errors::AnalysisComputeError;
2use crate::io::analysis_io::{
3 AnalysisArtifact, AnalysisSource, ContinuousDistribution, DistributionSet, HistogramBin,
4 StatusDistribution, SummaryStats, ANALYSIS_SCHEMA_VERSION,
5};
6use crate::io::out_reader;
7use crate::simulation::Simulation;
8use crate::RuntimeEstimate;
9
10#[derive(Debug, Clone, Copy)]
16pub struct AnalysisSelection {
17 pub pressure: bool,
19 pub head: bool,
21 pub flow: bool,
23 pub velocity: bool,
25 pub status: bool,
27}
28
29impl AnalysisSelection {
30 pub fn all() -> Self {
32 Self {
33 pressure: true,
34 head: true,
35 flow: true,
36 velocity: true,
37 status: true,
38 }
39 }
40
41 fn any(self) -> bool {
42 self.pressure || self.head || self.flow || self.velocity || self.status
43 }
44
45 fn any_continuous(self) -> bool {
46 self.pressure || self.head || self.flow || self.velocity
47 }
48
49 fn continuous_count(self) -> usize {
50 usize::from(self.pressure)
51 + usize::from(self.head)
52 + usize::from(self.flow)
53 + usize::from(self.velocity)
54 }
55}
56
57pub fn estimate_analysis_runtime_millis(
59 node_count: usize,
60 link_count: usize,
61 period_count: usize,
62 selection: AnalysisSelection,
63) -> RuntimeEstimate {
64 if !selection.any() || period_count == 0 {
65 return RuntimeEstimate::Low;
66 }
67
68 let nodes = node_count.max(1) as f64;
69 let links = link_count.max(1) as f64;
70 let periods = period_count.max(1) as f64;
71
72 let node_modules = usize::from(selection.pressure) + usize::from(selection.head);
73 let link_modules = usize::from(selection.flow) + usize::from(selection.velocity);
74 let status_weight = if selection.status { 0.35 } else { 0.0 };
75
76 let per_period_ops =
77 nodes * node_modules as f64 + links * (link_modules as f64 + status_weight);
78 let pass_factor = if selection.any_continuous() {
79 1.75
80 } else {
81 1.0
82 };
83 let complexity_score = periods * per_period_ops * pass_factor;
84
85 if complexity_score < 4_000_000.0 {
86 RuntimeEstimate::Low
87 } else if complexity_score < 40_000_000.0 {
88 RuntimeEstimate::Medium
89 } else {
90 RuntimeEstimate::High
91 }
92}
93
94pub fn build_analysis_artifact(sim: &Simulation) -> Result<AnalysisArtifact, AnalysisComputeError> {
96 let times = sim.snapshot_times();
97 if times.is_empty() {
98 return Err(AnalysisComputeError::NoSnapshots);
99 }
100
101 let mut pressure_values = Vec::new();
102 let mut head_values = Vec::new();
103 let mut flow_values = Vec::new();
104 let mut velocity_values = Vec::new();
105 let mut status = StatusDistribution::default();
106
107 for t in × {
108 let node_results = sim.all_node_results_at(*t)?;
109 for node in node_results {
110 pressure_values.push(node.pressure);
111 head_values.push(node.head);
112 }
113
114 let link_results = sim.all_link_results_at(*t)?;
115 for link in link_results {
116 flow_values.push(link.flow.abs());
117 velocity_values.push(link.velocity);
118 increment_status_from_sim_value(&mut status, link.status);
119 }
120 }
121
122 Ok(AnalysisArtifact {
123 schema_version: ANALYSIS_SCHEMA_VERSION,
124 source: AnalysisSource {
125 output_file: "network.out".to_string(),
126 report_file: "report.json".to_string(),
127 period_count: times.len(),
128 },
129 distributions: DistributionSet {
130 pressure: make_distribution(&pressure_values, 20),
131 head: make_distribution(&head_values, 20),
132 flow: make_distribution(&flow_values, 20),
133 velocity: make_distribution(&velocity_values, 20),
134 status,
135 },
136 demand_reliability: None,
137 service_compliance: None,
138 })
139}
140
141pub fn build_analysis_artifact_from_out(
143 out_path: &std::path::Path,
144) -> Result<AnalysisArtifact, AnalysisComputeError> {
145 build_analysis_artifact_from_out_with_progress_and_selection(
146 out_path,
147 AnalysisSelection::all(),
148 |_percent, _phase, _processed, _total| {},
149 )
150}
151
152pub fn build_analysis_artifact_from_out_with_progress<F>(
155 out_path: &std::path::Path,
156 on_progress: F,
157) -> Result<AnalysisArtifact, AnalysisComputeError>
158where
159 F: FnMut(f64, &'static str, usize, usize),
160{
161 build_analysis_artifact_from_out_with_progress_and_selection(
162 out_path,
163 AnalysisSelection::all(),
164 on_progress,
165 )
166}
167
168pub fn build_analysis_artifact_from_out_with_progress_and_selection<F>(
175 out_path: &std::path::Path,
176 selection: AnalysisSelection,
177 mut on_progress: F,
178) -> Result<AnalysisArtifact, AnalysisComputeError>
179where
180 F: FnMut(f64, &'static str, usize, usize),
181{
182 let meta = out_reader::read_metadata_checked(out_path)
183 .map_err(|e| AnalysisComputeError::OutRead(e.to_string()))?;
184
185 if meta.n_periods == 0 {
186 return Err(AnalysisComputeError::NoSnapshots);
187 }
188
189 if !selection.any() {
190 on_progress(100.0, "Finalizing analysis artifact", 0, 0);
191 return Ok(AnalysisArtifact {
192 schema_version: ANALYSIS_SCHEMA_VERSION,
193 source: AnalysisSource {
194 output_file: "network.out".to_string(),
195 report_file: "report.json".to_string(),
196 period_count: meta.n_periods,
197 },
198 distributions: DistributionSet::default(),
199 demand_reliability: None,
200 service_compliance: None,
201 });
202 }
203
204 on_progress(0.0, "Scanning periods", 0, meta.n_periods);
205
206 let mut pressure_stats = selection.pressure.then_some(RunningStats::default());
207 let mut head_stats = selection.head.then_some(RunningStats::default());
208 let mut flow_stats = selection.flow.then_some(RunningStats::default());
209 let mut velocity_stats = selection.velocity.then_some(RunningStats::default());
210 let mut status = StatusDistribution::default();
211 let needs_pass2 = selection.any_continuous();
212 let module_finalize_steps = selection.continuous_count();
213 let total_work =
214 (meta.n_periods + if needs_pass2 { meta.n_periods } else { 0 } + module_finalize_steps + 1)
215 as f64;
216
217 for period in 0..meta.n_periods {
219 let pr = out_reader::read_period(out_path, &meta, period)
220 .map_err(AnalysisComputeError::OutRead)?;
221
222 if let Some(stats) = pressure_stats.as_mut() {
223 for v in pr.node_pressure.iter().copied() {
224 stats.observe(v as f64);
225 }
226 }
227 if let Some(stats) = head_stats.as_mut() {
228 for v in pr.node_head.iter().copied() {
229 stats.observe(v as f64);
230 }
231 }
232 if let Some(stats) = flow_stats.as_mut() {
233 for v in pr.link_flow.iter().copied() {
234 stats.observe((v as f64).abs());
235 }
236 }
237 if let Some(stats) = velocity_stats.as_mut() {
238 for v in pr.link_velocity.iter().copied() {
239 stats.observe(v as f64);
240 }
241 }
242 if selection.status {
243 for v in pr.link_status {
244 increment_status_from_out_value(&mut status, v as f64);
245 }
246 }
247
248 let percent = 100.0 * (period + 1) as f64 / total_work;
249 on_progress(percent, "Scanning periods", period + 1, meta.n_periods);
250 }
251
252 let mut pressure_hist = pressure_stats
253 .as_ref()
254 .map(|stats| HistogramAccumulator::new_from_stats(stats, 20));
255 let mut head_hist = head_stats
256 .as_ref()
257 .map(|stats| HistogramAccumulator::new_from_stats(stats, 20));
258 let mut flow_hist = flow_stats
259 .as_ref()
260 .map(|stats| HistogramAccumulator::new_from_stats(stats, 20));
261 let mut velocity_hist = velocity_stats
262 .as_ref()
263 .map(|stats| HistogramAccumulator::new_from_stats(stats, 20));
264
265 if needs_pass2 {
266 on_progress(
268 100.0 * meta.n_periods as f64 / total_work,
269 "Binning periods",
270 0,
271 meta.n_periods,
272 );
273 for period in 0..meta.n_periods {
274 let pr = out_reader::read_period(out_path, &meta, period)
275 .map_err(AnalysisComputeError::OutRead)?;
276
277 if let Some(hist) = pressure_hist.as_mut() {
278 for v in pr.node_pressure {
279 hist.observe(v as f64);
280 }
281 }
282 if let Some(hist) = head_hist.as_mut() {
283 for v in pr.node_head {
284 hist.observe(v as f64);
285 }
286 }
287 if let Some(hist) = flow_hist.as_mut() {
288 for v in pr.link_flow {
289 hist.observe((v as f64).abs());
290 }
291 }
292 if let Some(hist) = velocity_hist.as_mut() {
293 for v in pr.link_velocity {
294 hist.observe(v as f64);
295 }
296 }
297
298 let done_work = meta.n_periods + period + 1;
299 let percent = 100.0 * done_work as f64 / total_work;
300 on_progress(percent, "Binning periods", period + 1, meta.n_periods);
301 }
302 }
303
304 let mut completed_work = meta.n_periods + if needs_pass2 { meta.n_periods } else { 0 };
305 let mut pressure = ContinuousDistribution::default();
306 let mut head = ContinuousDistribution::default();
307 let mut flow = ContinuousDistribution::default();
308 let mut velocity = ContinuousDistribution::default();
309
310 if selection.pressure {
311 on_progress(
312 100.0 * completed_work as f64 / total_work,
313 "Computing pressure distribution",
314 meta.n_periods,
315 meta.n_periods,
316 );
317 if let (Some(hist), Some(stats)) = (pressure_hist.take(), pressure_stats) {
318 pressure = hist.into_distribution(&stats);
319 }
320 completed_work += 1;
321 }
322
323 if selection.head {
324 on_progress(
325 100.0 * completed_work as f64 / total_work,
326 "Computing head distribution",
327 meta.n_periods,
328 meta.n_periods,
329 );
330 if let (Some(hist), Some(stats)) = (head_hist.take(), head_stats) {
331 head = hist.into_distribution(&stats);
332 }
333 completed_work += 1;
334 }
335
336 if selection.flow {
337 on_progress(
338 100.0 * completed_work as f64 / total_work,
339 "Computing flow distribution",
340 meta.n_periods,
341 meta.n_periods,
342 );
343 if let (Some(hist), Some(stats)) = (flow_hist.take(), flow_stats) {
344 flow = hist.into_distribution(&stats);
345 }
346 completed_work += 1;
347 }
348
349 if selection.velocity {
350 on_progress(
351 100.0 * completed_work as f64 / total_work,
352 "Computing velocity distribution",
353 meta.n_periods,
354 meta.n_periods,
355 );
356 if let (Some(hist), Some(stats)) = (velocity_hist.take(), velocity_stats) {
357 velocity = hist.into_distribution(&stats);
358 }
359 completed_work += 1;
360 }
361
362 on_progress(
363 100.0 * completed_work as f64 / total_work,
364 "Finalizing analysis artifact",
365 meta.n_periods,
366 meta.n_periods,
367 );
368
369 Ok(AnalysisArtifact {
370 schema_version: ANALYSIS_SCHEMA_VERSION,
371 source: AnalysisSource {
372 output_file: "network.out".to_string(),
373 report_file: "report.json".to_string(),
374 period_count: meta.n_periods,
375 },
376 distributions: DistributionSet {
377 pressure,
378 head,
379 flow,
380 velocity,
381 status: if selection.status {
382 status
383 } else {
384 StatusDistribution::default()
385 },
386 },
387 demand_reliability: None,
388 service_compliance: None,
389 })
390}
391
392fn increment_status_from_sim_value(status: &mut StatusDistribution, value: f64) {
393 match value.round() as i32 {
394 0 => status.closed += 1,
395 1 => status.open += 1,
396 2 => status.active += 1,
397 _ => status.other += 1,
398 }
399}
400
401fn increment_status_from_out_value(status: &mut StatusDistribution, value: f64) {
402 match value.round() as i32 {
404 3 => status.open += 1,
405 4 | 6 => status.active += 1,
406 0 | 1 | 2 | 7 => status.closed += 1,
407 _ => status.other += 1,
408 }
409}
410
411#[derive(Clone, Copy)]
412struct RunningStats {
413 min: f64,
414 max: f64,
415 sum: f64,
416 count: u64,
417}
418
419impl Default for RunningStats {
420 fn default() -> Self {
421 Self {
422 min: f64::INFINITY,
423 max: f64::NEG_INFINITY,
424 sum: 0.0,
425 count: 0,
426 }
427 }
428}
429
430impl RunningStats {
431 fn observe(&mut self, value: f64) {
432 if value < self.min {
433 self.min = value;
434 }
435 if value > self.max {
436 self.max = value;
437 }
438 self.sum += value;
439 self.count += 1;
440 }
441
442 fn mean(&self) -> f64 {
443 if self.count == 0 {
444 0.0
445 } else {
446 self.sum / self.count as f64
447 }
448 }
449}
450
451struct HistogramAccumulator {
452 min: f64,
453 max: f64,
454 width: f64,
455 counts: Vec<u64>,
456}
457
458impl HistogramAccumulator {
459 fn new_from_stats(stats: &RunningStats, n_bins: usize) -> Self {
460 let bins = n_bins.max(1);
461 let span = (stats.max - stats.min).abs();
462 let width = if stats.count == 0 || span <= f64::EPSILON {
463 1.0
464 } else {
465 (stats.max - stats.min) / bins as f64
466 };
467 Self {
468 min: if stats.count == 0 { 0.0 } else { stats.min },
469 max: if stats.count == 0 { 0.0 } else { stats.max },
470 width,
471 counts: vec![0; bins],
472 }
473 }
474
475 fn observe(&mut self, value: f64) {
476 if self.counts.is_empty() {
477 return;
478 }
479 if (self.max - self.min).abs() <= f64::EPSILON {
480 self.counts[0] += 1;
481 return;
482 }
483 let mut idx = ((value - self.min) / self.width).floor() as isize;
484 if idx < 0 {
485 idx = 0;
486 }
487 if idx as usize >= self.counts.len() {
488 idx = self.counts.len() as isize - 1;
489 }
490 self.counts[idx as usize] += 1;
491 }
492
493 fn into_distribution(self, stats: &RunningStats) -> ContinuousDistribution {
494 if stats.count == 0 {
495 return ContinuousDistribution::default();
496 }
497
498 if (stats.max - stats.min).abs() <= f64::EPSILON {
499 return ContinuousDistribution {
500 bins: vec![HistogramBin {
501 start: stats.min,
502 end: stats.max,
503 count: stats.count,
504 }],
505 summary: SummaryStats {
506 min: stats.min,
507 max: stats.max,
508 mean: stats.mean(),
509 p05: stats.min,
510 p25: stats.min,
511 p50: stats.min,
512 p75: stats.min,
513 p95: stats.min,
514 },
515 thresholds: None,
516 };
517 }
518
519 let bins: Vec<HistogramBin> = self
520 .counts
521 .iter()
522 .enumerate()
523 .map(|(i, &count)| HistogramBin {
524 start: self.min + i as f64 * self.width,
525 end: self.min + (i + 1) as f64 * self.width,
526 count,
527 })
528 .collect();
529
530 let p05 = approx_quantile_from_bins(&bins, 0.05, stats.min, stats.max);
531 let p25 = approx_quantile_from_bins(&bins, 0.25, stats.min, stats.max);
532 let p50 = approx_quantile_from_bins(&bins, 0.50, stats.min, stats.max);
533 let p75 = approx_quantile_from_bins(&bins, 0.75, stats.min, stats.max);
534 let p95 = approx_quantile_from_bins(&bins, 0.95, stats.min, stats.max);
535
536 ContinuousDistribution {
537 bins,
538 summary: SummaryStats {
539 min: stats.min,
540 max: stats.max,
541 mean: stats.mean(),
542 p05,
543 p25,
544 p50,
545 p75,
546 p95,
547 },
548 thresholds: None,
549 }
550 }
551}
552
553fn approx_quantile_from_bins(bins: &[HistogramBin], q: f64, min: f64, max: f64) -> f64 {
554 if bins.is_empty() {
555 return 0.0;
556 }
557 let total: u64 = bins.iter().map(|b| b.count).sum();
558 if total == 0 {
559 return 0.0;
560 }
561
562 let target = (q.clamp(0.0, 1.0) * (total.saturating_sub(1)) as f64).round() as u64;
563 let mut cumulative = 0u64;
564 for bin in bins {
565 let next = cumulative + bin.count;
566 if target < next {
567 if bin.count == 0 {
568 return bin.start;
569 }
570 let offset_in_bin = target.saturating_sub(cumulative) as f64 / bin.count as f64;
571 let estimate = bin.start + (bin.end - bin.start) * offset_in_bin;
572 return estimate.clamp(min, max);
573 }
574 cumulative = next;
575 }
576
577 max
578}
579
580fn make_distribution(values: &[f64], n_bins: usize) -> ContinuousDistribution {
581 if values.is_empty() {
582 return ContinuousDistribution::default();
583 }
584
585 let mut sorted = values.to_vec();
586 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
587
588 let min = sorted[0];
589 let max = sorted[sorted.len() - 1];
590 let mean = sorted.iter().sum::<f64>() / sorted.len() as f64;
591
592 let bins = if (max - min).abs() <= f64::EPSILON {
593 vec![HistogramBin {
594 start: min,
595 end: max,
596 count: sorted.len() as u64,
597 }]
598 } else {
599 let width = (max - min) / n_bins as f64;
600 let mut counts = vec![0u64; n_bins];
601 for value in &sorted {
602 let mut idx = ((value - min) / width).floor() as usize;
603 if idx >= n_bins {
604 idx = n_bins - 1;
605 }
606 counts[idx] += 1;
607 }
608
609 (0..n_bins)
610 .map(|i| HistogramBin {
611 start: min + i as f64 * width,
612 end: min + (i + 1) as f64 * width,
613 count: counts[i],
614 })
615 .collect()
616 };
617
618 ContinuousDistribution {
619 bins,
620 summary: SummaryStats {
621 min,
622 max,
623 mean,
624 p05: quantile(&sorted, 0.05),
625 p25: quantile(&sorted, 0.25),
626 p50: quantile(&sorted, 0.50),
627 p75: quantile(&sorted, 0.75),
628 p95: quantile(&sorted, 0.95),
629 },
630 thresholds: None,
631 }
632}
633
634fn quantile(sorted: &[f64], q: f64) -> f64 {
635 if sorted.is_empty() {
636 return 0.0;
637 }
638 if sorted.len() == 1 {
639 return sorted[0];
640 }
641 let clamped_q = q.clamp(0.0, 1.0);
642 let pos = clamped_q * (sorted.len() - 1) as f64;
643 let lo = pos.floor() as usize;
644 let hi = pos.ceil() as usize;
645 if lo == hi {
646 sorted[lo]
647 } else {
648 let w = pos - lo as f64;
649 sorted[lo] * (1.0 - w) + sorted[hi] * w
650 }
651}
652
653#[cfg(test)]
654mod tests {
655 use super::*;
656
657 #[test]
658 fn quantile_interpolates() {
659 let s = vec![1.0, 2.0, 3.0, 4.0];
660 assert!((quantile(&s, 0.5) - 2.5).abs() < 1e-12);
661 }
662
663 #[test]
664 fn analysis_estimate_increases_with_periods() {
665 let selection = AnalysisSelection::all();
666 let short = estimate_analysis_runtime_millis(500, 600, 24, selection);
667 let long = estimate_analysis_runtime_millis(500, 600, 240, selection);
668 assert!(long >= short);
669 }
670
671 #[test]
672 fn analysis_estimate_increases_with_selected_modules() {
673 let fast = AnalysisSelection {
674 pressure: true,
675 head: false,
676 flow: false,
677 velocity: false,
678 status: true,
679 };
680 let full = AnalysisSelection::all();
681 let fast_est = estimate_analysis_runtime_millis(1_200, 1_400, 96, fast);
682 let full_est = estimate_analysis_runtime_millis(1_200, 1_400, 96, full);
683 assert!(full_est >= fast_est);
684 }
685
686 #[test]
687 fn extreme_analysis_case_maps_to_high_effort() {
688 let full = AnalysisSelection::all();
689 let effort = estimate_analysis_runtime_millis(100_000, 120_000, 20_000, full);
690 assert_eq!(effort, RuntimeEstimate::High);
691 }
692}