1use std::collections::{BTreeMap, HashMap};
2use std::fs;
3use std::path::{Path, PathBuf};
4
5use anyhow::{anyhow, Context, Result};
6use csv::{StringRecord, Writer};
7use dsfb::sim::SimConfig;
8use dsfb_add::SimulationConfig;
9use rayon::prelude::*;
10
11use crate::config::{DscdScalingConfig, DscdSweepConfig, OutputPaths};
12use crate::graph::{
13 add_trust_gated_edge_with_provenance, reachable_from, DscdEdge, DscdGraph, Event, EventId,
14};
15use crate::integrations::{
16 compute_structural_growth_for_dscd, generate_dscd_events_from_dsfb, DscdObserverSample,
17};
18
/// One point on the trust-threshold sweep curve: the reachability measured
/// when edges are gated at trust threshold `tau`.
#[derive(Debug, Clone)]
pub struct ThresholdRecord {
    /// Trust threshold used to gate edges for this measurement.
    pub tau: f64,
    /// `reachable_size / |events|` (0.0 when there are no events).
    pub expansion_ratio: f64,
    /// Number of events reachable from the configured start event
    /// (0 when no start event was supplied).
    pub reachable_size: usize,
    /// Structural-growth baseline, copied verbatim from the eval options;
    /// only the sweep entry point supplies it, the scaling run leaves it None.
    pub s_infty: Option<f64>,
}
26
/// Per-N summary row produced by the finite-size scaling run.
#[derive(Debug, Clone, Copy)]
struct ScalingSummaryRow {
    /// Number of events in this run.
    n: usize,
    /// Interpolated tau where the expansion ratio first drops to the
    /// configured critical fraction (see `compute_tau_star`).
    tau_star: f64,
    /// Transition width: tau(ratio<=0.1) - tau(ratio<=0.9), floored at 0.
    width_0_1_to_0_9: f64,
    /// Largest finite |d(ratio)/d(tau)| over adjacent grid points.
    max_derivative: f64,
}
34
/// Optional knobs for one sweep evaluation (see `compute_threshold_records`).
#[derive(Debug, Clone, Copy, Default)]
struct ThresholdEvalOptions {
    /// Root event for reachability; `None` makes every reachable_size 0.
    start: Option<EventId>,
    /// Depth cap forwarded to `reachable_from` (`None` = unbounded).
    max_depth: Option<usize>,
    /// Baseline copied verbatim into each record's `s_infty` field.
    s_infty: Option<f64>,
    /// Lower bound (percent) of the progress-reporting band. When BOTH
    /// progress fields are `None` the sweep may run in parallel instead.
    progress_start: Option<usize>,
    /// Upper bound (percent) of the progress-reporting band.
    progress_end: Option<usize>,
}
43
/// A potential edge between two consecutive samples of one observer's
/// stream; materialized into the graph only if it passes the trust gate.
#[derive(Debug, Clone, Copy)]
struct CandidateEdge {
    // Earlier event of the consecutive pair (sorted by event_id).
    from: EventId,
    // Later event of the pair.
    to: EventId,
    // Observer whose sample stream produced the pair.
    observer_id: u32,
    // Trust carried by the *destination* sample of the pair.
    trust: f64,
    // Rewrite rule carried by the *source* sample of the pair.
    rewrite_rule_id: u32,
}
52
53pub fn build_graph_from_samples(
54 events: &[Event],
55 samples: &[DscdObserverSample],
56 trust_threshold: f64,
57) -> DscdGraph {
58 let candidate_edges = prepare_candidate_edges(samples);
59 build_graph_from_candidate_edges(events, &candidate_edges, trust_threshold)
60}
61
62fn build_graph_from_candidate_edges(
63 events: &[Event],
64 candidate_edges: &[CandidateEdge],
65 trust_threshold: f64,
66) -> DscdGraph {
67 let mut graph = DscdGraph::default();
68 for event in events {
69 graph.add_event(event.clone());
70 }
71
72 for candidate in candidate_edges {
73 add_trust_gated_edge_with_provenance(
74 &mut graph,
75 candidate.from,
76 candidate.to,
77 candidate.observer_id,
78 candidate.trust,
79 trust_threshold,
80 candidate.rewrite_rule_id,
81 );
82 }
83
84 graph
85}
86
87fn prepare_candidate_edges(samples: &[DscdObserverSample]) -> Vec<CandidateEdge> {
88 let mut by_observer: BTreeMap<u32, Vec<&DscdObserverSample>> = BTreeMap::new();
89 for sample in samples {
90 by_observer
91 .entry(sample.observer_id)
92 .or_default()
93 .push(sample);
94 }
95
96 let mut candidate_edges = Vec::new();
97 for (observer_id, samples_for_observer) in &mut by_observer {
98 samples_for_observer.sort_by_key(|sample| sample.event_id);
99 for pair in samples_for_observer.windows(2) {
100 candidate_edges.push(CandidateEdge {
101 from: EventId(pair[0].event_id),
102 to: EventId(pair[1].event_id),
103 observer_id: *observer_id,
104 trust: pair[1].trust,
105 rewrite_rule_id: pair[0].rewrite_rule_id,
106 });
107 }
108 }
109
110 candidate_edges
111}
112
/// End-to-end trust-threshold sweep.
///
/// Generates DSCD events from a DSFB simulation, snapshots the base graph
/// (events, edges, and one sample provenance row) into `output_paths.run_dir`,
/// evaluates the expansion ratio over the configured tau grid, and writes
/// `threshold_sweep.csv` plus a single-run `finite_size_scaling.csv`.
///
/// Progress is reported to stderr throughout (2%..100%). Returns the per-tau
/// records so callers can post-process them.
///
/// # Errors
/// Propagates config validation, event generation, growth computation, and
/// any CSV-writing failure.
pub fn run_trust_threshold_sweep(
    cfg: &DscdSweepConfig,
    dsfb_cfg: &SimConfig,
    add_cfg: &SimulationConfig,
    output_paths: &OutputPaths,
) -> Result<Vec<ThresholdRecord>> {
    cfg.validate()?;

    report_progress(2, "generating DSCD events from DSFB");
    let event_batch = generate_dscd_events_from_dsfb(dsfb_cfg, cfg.dsfb_params, cfg.num_events)?;
    report_progress(15, format!("generated {} events", event_batch.events.len()));

    report_progress(20, "computing structural growth baseline");
    let growth = compute_structural_growth_for_dscd(add_cfg)?;

    let tau_grid = cfg.tau_grid();
    let tau_total = tau_grid.len();
    report_progress(25, format!("prepared tau grid with {tau_total} steps"));

    // Reachability is rooted at the first generated event, if any.
    let start = event_batch.events.first().map(|event| event.id);
    // NOTE(review): indexing assumes cfg.validate() guarantees a non-empty
    // tau grid — confirm against DscdSweepConfig::validate.
    let first_tau = tau_grid[0];

    report_progress(28, "writing graph event and edge snapshots");
    // Snapshots are taken at the first grid point's threshold.
    let base_graph = build_graph_from_samples(
        &event_batch.events,
        &event_batch.observer_samples,
        first_tau,
    );
    write_graph_events_csv(
        &output_paths.run_dir.join("graph_events.csv"),
        &event_batch.events,
        &event_batch.observer_samples,
    )?;
    write_graph_edges_csv(
        &output_paths.run_dir.join("graph_edges.csv"),
        first_tau,
        &base_graph.edges,
    )?;

    // Example provenance export for the first edge (path discarded, but
    // errors still propagate via `?`). Skipped when there are no edges.
    if let Some(edge) = base_graph.edges.first() {
        let _ = write_edge_provenance_csv(
            &output_paths.run_dir,
            edge.edge_id,
            &base_graph.edges,
            &event_batch.observer_samples,
        )?;
    }

    report_progress(30, "running trust-threshold sweep");
    // Supplying the 30..95 progress band forces the sequential (reporting)
    // path inside compute_threshold_records.
    let records = compute_threshold_records(
        &event_batch.events,
        &event_batch.observer_samples,
        &tau_grid,
        ThresholdEvalOptions {
            start,
            max_depth: cfg.max_depth,
            s_infty: Some(growth.s_infty),
            progress_start: Some(30),
            progress_end: Some(95),
        },
    );

    report_progress(97, "writing threshold_sweep.csv");
    write_threshold_sweep_csv(&output_paths.run_dir.join("threshold_sweep.csv"), &records)?;

    report_progress(98, "writing finite_size_scaling.csv");
    let transition_width = compute_width_0_1_to_0_9(&records);
    let max_derivative = compute_max_derivative(&records).unwrap_or(0.0);
    write_finite_size_scaling_csv(
        &output_paths.run_dir.join("finite_size_scaling.csv"),
        cfg.num_events,
        transition_width,
        max_derivative,
    )?;

    report_progress(100, "DSCD sweep complete");
    Ok(records)
}
191
/// Finite-size scaling run: repeats the threshold sweep for each event count
/// in `config.event_counts` and summarizes how the transition sharpens with N.
///
/// Per N it writes `threshold_curve_N_<n>.csv`; for the representative run
/// (the LAST configured N) it also snapshots the graph events/edges and one
/// provenance row. Finally it writes `threshold_scaling_summary.csv` and a
/// multi-row `finite_size_scaling.csv` into `output_dir`.
///
/// # Errors
/// Propagates config validation, directory creation, event generation, and
/// CSV-writing failures.
pub fn run_threshold_scaling(config: &DscdScalingConfig, output_dir: &Path) -> Result<()> {
    config.validate()?;
    fs::create_dir_all(output_dir).with_context(|| {
        format!(
            "failed to create scaling output dir {}",
            output_dir.display()
        )
    })?;

    let mut summary_rows = Vec::with_capacity(config.event_counts.len());
    // The largest (last-listed) N gets the graph snapshots below.
    let representative_n = config.event_counts.last().copied().unwrap_or(0);
    for (idx, &n) in config.event_counts.iter().enumerate() {
        // Maps run index into the 5%..65% progress band.
        report_progress(
            5 + ((idx + 1) * 60) / config.event_counts.len(),
            format!(
                "scaling sweep for N={n} ({}/{})",
                idx + 1,
                config.event_counts.len()
            ),
        );

        let dsfb_cfg = SimConfig {
            steps: n,
            ..SimConfig::default()
        };
        let event_batch = generate_dscd_events_from_dsfb(&dsfb_cfg, config.dsfb_params, n)?;
        // NOTE(review): indexing assumes config.validate() guarantees a
        // non-empty tau grid — confirm against DscdScalingConfig::validate.
        let base_graph = build_graph_from_samples(
            &event_batch.events,
            &event_batch.observer_samples,
            config.tau_grid[0],
        );

        // No progress band here, which also allows the parallel path inside
        // compute_threshold_records.
        let records = compute_threshold_records(
            &event_batch.events,
            &event_batch.observer_samples,
            &config.tau_grid,
            ThresholdEvalOptions {
                start: Some(config.initial_event),
                max_depth: Some(config.max_path_length),
                s_infty: None,
                progress_start: None,
                progress_end: None,
            },
        );

        write_threshold_curve_csv(
            &output_dir.join(format!("threshold_curve_N_{n}.csv")),
            &records,
        )?;

        let row = ScalingSummaryRow {
            n,
            tau_star: compute_tau_star(&records, config.critical_fraction),
            width_0_1_to_0_9: compute_width_0_1_to_0_9(&records),
            max_derivative: compute_max_derivative(&records).unwrap_or(0.0),
        };
        summary_rows.push(row);

        // Snapshot only the representative run to keep output size bounded.
        if n == representative_n {
            write_graph_events_csv(
                &output_dir.join("graph_events.csv"),
                &event_batch.events,
                &event_batch.observer_samples,
            )?;
            write_graph_edges_csv(
                &output_dir.join("graph_edges.csv"),
                config.tau_grid[0],
                &base_graph.edges,
            )?;
            // Path discarded; errors still propagate via `?`.
            if let Some(edge) = base_graph.edges.first() {
                let _ = write_edge_provenance_csv(
                    output_dir,
                    edge.edge_id,
                    &base_graph.edges,
                    &event_batch.observer_samples,
                )?;
            }
        }
    }

    report_progress(92, "writing threshold_scaling_summary.csv");
    write_threshold_scaling_summary_csv(
        &output_dir.join("threshold_scaling_summary.csv"),
        &summary_rows,
    )?;
    write_finite_size_scaling_series_csv(
        &output_dir.join("finite_size_scaling.csv"),
        &summary_rows,
    )?;
    report_progress(100, "threshold scaling complete");

    Ok(())
}
289
290pub fn export_edge_provenance_by_edge_id(run_dir: &Path, edge_id: u64) -> Result<PathBuf> {
294 let edge_row = load_edge_row_by_id(run_dir, edge_id)?;
295 let source_event_row =
296 load_source_event_row(run_dir, edge_row.source_event_id, edge_row.observer_id)?;
297 write_edge_provenance_row(run_dir, edge_row, source_event_row)
298}
299
300pub fn export_edge_provenance_by_endpoints(
302 run_dir: &Path,
303 source_event_id: EventId,
304 target_event_id: EventId,
305) -> Result<PathBuf> {
306 let edge_row = load_edge_row_by_endpoints(run_dir, source_event_id.0, target_event_id.0)?;
307 let source_event_row =
308 load_source_event_row(run_dir, edge_row.source_event_id, edge_row.observer_id)?;
309 write_edge_provenance_row(run_dir, edge_row, source_event_row)
310}
311
/// Evaluate one `ThresholdRecord` per tau in `tau_grid`.
///
/// Two execution modes:
/// * No progress band requested (both progress fields `None`): taus are
///   independent, so a dedicated rayon pool sized by
///   `scaling_worker_threads()` is tried first; falls back to a plain
///   sequential pass when the pool can't be built or parallelism is moot.
/// * Progress band requested: sequential pass mapping the loop index into
///   the `[progress_start, progress_end]` percent range.
fn compute_threshold_records(
    events: &[Event],
    samples: &[DscdObserverSample],
    tau_grid: &[f64],
    options: ThresholdEvalOptions,
) -> Vec<ThresholdRecord> {
    // Candidate edges are threshold-independent: compute once, share across taus.
    let candidate_edges = prepare_candidate_edges(samples);

    if options.progress_start.is_none() && options.progress_end.is_none() {
        let worker_threads = scaling_worker_threads();
        // Parallelism only pays off with >1 thread and >1 tau to evaluate.
        if worker_threads > 1 && tau_grid.len() > 1 {
            if let Ok(pool) = rayon::ThreadPoolBuilder::new()
                .num_threads(worker_threads)
                .build()
            {
                return pool.install(|| {
                    tau_grid
                        .par_iter()
                        .copied()
                        .map(|tau| {
                            compute_threshold_record_for_tau(events, &candidate_edges, tau, options)
                        })
                        .collect()
                });
            }
        }

        // Sequential fallback (pool construction failed or not worthwhile).
        return tau_grid
            .iter()
            .copied()
            .map(|tau| compute_threshold_record_for_tau(events, &candidate_edges, tau, options))
            .collect();
    }

    let mut records = Vec::with_capacity(tau_grid.len());
    // Start just below the band so the first update at progress_start prints.
    let mut last_reported = options.progress_start.unwrap_or(0).saturating_sub(1);

    for (idx, tau) in tau_grid.iter().copied().enumerate() {
        records.push(compute_threshold_record_for_tau(
            events,
            &candidate_edges,
            tau,
            options,
        ));

        // Reports only when BOTH band edges are set; a single `Some` value
        // silently disables reporting (and skipped the parallel path above).
        if let (Some(start_pct), Some(end_pct)) = (options.progress_start, options.progress_end) {
            let span = end_pct.saturating_sub(start_pct);
            let progress = start_pct + ((idx + 1) * span) / tau_grid.len().max(1);
            if progress > last_reported {
                report_progress(progress, format!("tau step {}/{}", idx + 1, tau_grid.len()));
                last_reported = progress;
            }
        }
    }

    records
}
369
370fn compute_threshold_record_for_tau(
371 events: &[Event],
372 candidate_edges: &[CandidateEdge],
373 tau: f64,
374 options: ThresholdEvalOptions,
375) -> ThresholdRecord {
376 let graph = build_graph_from_candidate_edges(events, candidate_edges, tau);
377 let reachable_size = options
378 .start
379 .map(|start_event| reachable_from(&graph, start_event, options.max_depth).len())
380 .unwrap_or(0);
381 let expansion_ratio = if events.is_empty() {
382 0.0
383 } else {
384 reachable_size as f64 / events.len() as f64
385 };
386
387 ThresholdRecord {
388 tau,
389 expansion_ratio,
390 reachable_size,
391 s_infty: options.s_infty,
392 }
393}
394
/// Number of worker threads for the parallel sweep path.
///
/// Defaults to the machine's available parallelism capped at 8. The
/// `DSFB_DSCD_THREADS` environment variable, when it parses as a `usize`,
/// overrides the default and is clamped to `[1, available]`; unparseable
/// values fall back to the default.
fn scaling_worker_threads() -> usize {
    let available = std::thread::available_parallelism().map_or(1, |value| value.get());

    match std::env::var("DSFB_DSCD_THREADS")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
    {
        Some(requested) => requested.clamp(1, available),
        None => available.clamp(1, 8),
    }
}
407
/// Emit a progress line like `[ 42%] message` to stderr; percentages above
/// 100 are clamped to 100.
fn report_progress(percent: usize, message: impl AsRef<str>) {
    eprintln!("[{:>3}%] {}", percent.min(100), message.as_ref());
}
412
413fn write_graph_events_csv(
414 path: &Path,
415 events: &[Event],
416 samples: &[DscdObserverSample],
417) -> Result<()> {
418 let mut writer = Writer::from_path(path)?;
419 writer.write_record([
420 "event_id",
421 "time_index",
422 "observer_id",
423 "residual_state",
424 "rewrite_rule_id",
425 "trust_value",
426 "residual_summary",
427 "rewrite_rule_label",
428 "trust_profile",
429 "envelope_ok",
430 "timestamp",
431 "structural_tag",
432 ])?;
433
434 let event_by_id: HashMap<u64, &Event> =
435 events.iter().map(|event| (event.id.0, event)).collect();
436
437 for sample in samples {
438 let event_meta = event_by_id.get(&sample.event_id).copied();
439 writer.write_record([
440 sample.event_id.to_string(),
441 sample.time_index.to_string(),
442 sample.observer_id.to_string(),
443 sample.residual_state.as_str().to_string(),
444 sample.rewrite_rule_id.to_string(),
445 sample.trust.to_string(),
446 sample.residual_summary.to_string(),
447 sample.rewrite_rule_label.to_string(),
448 sample.trust_profile.as_str().to_string(),
449 sample.envelope_ok.to_string(),
450 event_meta
451 .and_then(|event| event.timestamp)
452 .map(|value| value.to_string())
453 .unwrap_or_default(),
454 event_meta
455 .and_then(|event| event.structural_tag)
456 .map(|value| value.to_string())
457 .unwrap_or_default(),
458 ])?;
459 }
460
461 writer.flush()?;
462 Ok(())
463}
464
465fn write_graph_edges_csv(path: &Path, tau_threshold: f64, edges: &[DscdEdge]) -> Result<()> {
466 let mut writer = Writer::from_path(path)?;
467 writer.write_record([
468 "edge_id",
469 "source_event_id",
470 "target_event_id",
471 "from_event_id",
472 "to_event_id",
473 "observer_id",
474 "trust_at_creation",
475 "trust_value",
476 "rewrite_rule_at_source",
477 "tau_threshold",
478 ])?;
479
480 for edge in edges {
481 writer.write_record([
482 edge.edge_id.to_string(),
483 edge.from.0.to_string(),
484 edge.to.0.to_string(),
485 edge.from.0.to_string(),
486 edge.to.0.to_string(),
487 edge.observer_id.to_string(),
488 edge.trust_at_creation.to_string(),
489 edge.trust_value.to_string(),
490 edge.rewrite_rule_at_source.to_string(),
491 tau_threshold.to_string(),
492 ])?;
493 }
494
495 writer.flush()?;
496 Ok(())
497}
498
499fn write_threshold_sweep_csv(path: &Path, records: &[ThresholdRecord]) -> Result<()> {
500 let mut writer = Writer::from_path(path)?;
501 writer.write_record(["tau", "expansion_ratio", "reachable_size", "s_infty"])?;
502
503 for record in records {
504 writer.write_record([
505 record.tau.to_string(),
506 record.expansion_ratio.to_string(),
507 record.reachable_size.to_string(),
508 record
509 .s_infty
510 .map(|value| value.to_string())
511 .unwrap_or_default(),
512 ])?;
513 }
514
515 writer.flush()?;
516 Ok(())
517}
518
519fn write_threshold_curve_csv(path: &Path, records: &[ThresholdRecord]) -> Result<()> {
520 let mut writer = Writer::from_path(path)?;
521 writer.write_record(["tau", "expansion_ratio"])?;
522 for record in records {
523 writer.write_record([record.tau.to_string(), record.expansion_ratio.to_string()])?;
524 }
525 writer.flush()?;
526 Ok(())
527}
528
529fn write_threshold_scaling_summary_csv(path: &Path, rows: &[ScalingSummaryRow]) -> Result<()> {
530 let mut writer = Writer::from_path(path)?;
531 writer.write_record(["N", "tau_star", "width_0_1_to_0_9", "max_derivative"])?;
532 for row in rows {
533 writer.write_record([
534 row.n.to_string(),
535 row.tau_star.to_string(),
536 row.width_0_1_to_0_9.to_string(),
537 row.max_derivative.to_string(),
538 ])?;
539 }
540 writer.flush()?;
541 Ok(())
542}
543
544fn compute_tau_star(records: &[ThresholdRecord], critical_fraction: f64) -> f64 {
545 interpolate_tau_at_or_below(records, critical_fraction)
546 .or_else(|| records.last().map(|record| record.tau))
547 .unwrap_or(0.0)
548}
549
550fn compute_width_0_1_to_0_9(records: &[ThresholdRecord]) -> f64 {
551 let tau_0_9 = interpolate_tau_at_or_below(records, 0.9)
552 .or_else(|| records.first().map(|record| record.tau))
553 .unwrap_or(0.0);
554 let tau_0_1 = interpolate_tau_at_or_below(records, 0.1)
555 .or_else(|| records.last().map(|record| record.tau))
556 .unwrap_or(tau_0_9);
557 (tau_0_1 - tau_0_9).max(0.0)
558}
559
560fn interpolate_tau_at_or_below(records: &[ThresholdRecord], threshold: f64) -> Option<f64> {
561 if records.is_empty() {
562 return None;
563 }
564
565 if records[0].expansion_ratio <= threshold {
566 return Some(records[0].tau);
567 }
568
569 for pair in records.windows(2) {
570 let prev = &pair[0];
571 let curr = &pair[1];
572 if curr.expansion_ratio <= threshold {
573 let drho = curr.expansion_ratio - prev.expansion_ratio;
574 if drho.abs() <= f64::EPSILON {
575 return Some(curr.tau);
576 }
577 let alpha = (threshold - prev.expansion_ratio) / drho;
578 return Some(prev.tau + alpha * (curr.tau - prev.tau));
579 }
580 }
581
582 None
583}
584
585fn compute_max_derivative(records: &[ThresholdRecord]) -> Option<f64> {
586 let mut max_derivative: Option<f64> = None;
587 for pair in records.windows(2) {
588 let dtau = pair[1].tau - pair[0].tau;
589 if dtau.abs() <= f64::EPSILON {
590 continue;
591 }
592
593 let derivative = ((pair[1].expansion_ratio - pair[0].expansion_ratio) / dtau).abs();
594 if !derivative.is_finite() {
595 continue;
596 }
597
598 max_derivative = Some(match max_derivative {
599 Some(current) => current.max(derivative),
600 None => derivative,
601 });
602 }
603
604 max_derivative
605}
606
607fn write_finite_size_scaling_csv(
608 path: &Path,
609 num_events: usize,
610 transition_width: f64,
611 max_derivative: f64,
612) -> Result<()> {
613 let mut writer = Writer::from_path(path)?;
614 writer.write_record([
615 "num_events",
616 "transition_width",
617 "width_0_1_to_0_9",
618 "max_derivative",
619 ])?;
620 writer.write_record([
621 num_events.to_string(),
622 transition_width.to_string(),
623 transition_width.to_string(),
624 max_derivative.to_string(),
625 ])?;
626 writer.flush()?;
627 Ok(())
628}
629
630fn write_finite_size_scaling_series_csv(path: &Path, rows: &[ScalingSummaryRow]) -> Result<()> {
631 let mut writer = Writer::from_path(path)?;
632 writer.write_record([
633 "num_events",
634 "transition_width",
635 "width_0_1_to_0_9",
636 "max_derivative",
637 ])?;
638 for row in rows {
639 writer.write_record([
640 row.n.to_string(),
641 row.width_0_1_to_0_9.to_string(),
642 row.width_0_1_to_0_9.to_string(),
643 row.max_derivative.to_string(),
644 ])?;
645 }
646 writer.flush()?;
647 Ok(())
648}
649
/// Edge fields parsed back out of a `graph_edges.csv` row.
#[derive(Debug, Clone, Copy)]
struct EdgeCsvRow {
    // "edge_id" column.
    edge_id: u64,
    // "source_event_id" column.
    source_event_id: u64,
    // "target_event_id" column.
    target_event_id: u64,
    // "observer_id" column.
    observer_id: u32,
    // "trust_at_creation" column.
    trust_at_creation: f64,
    // "rewrite_rule_at_source" column.
    rewrite_rule_at_source: u32,
}
659
/// Selected `graph_events.csv` fields for one sample, kept as the raw CSV
/// strings — they are re-emitted verbatim into provenance rows, so no
/// parsing is needed.
#[derive(Debug, Clone)]
struct EventCsvRow {
    time_index: String,
    residual_state: String,
    rewrite_rule_id: String,
    residual_summary: String,
    trust_value: String,
}
668
/// Write a one-row provenance CSV (`edge_provenance_<edge_id>.csv`) for the
/// edge with `edge_id`, joining against the in-memory observer samples.
///
/// The trailing sample columns come from the sample that matches the edge's
/// source event AND observer; they are left blank when no such sample
/// exists. Errors if `edge_id` is absent from `edges`. Returns the path of
/// the file written.
fn write_edge_provenance_csv(
    run_dir: &Path,
    edge_id: u64,
    edges: &[DscdEdge],
    samples: &[DscdObserverSample],
) -> Result<PathBuf> {
    let edge = edges
        .iter()
        .find(|edge| edge.edge_id == edge_id)
        .ok_or_else(|| anyhow!("edge_id {} not found", edge_id))?;

    // Source-side sample: same event id and same observer as the edge.
    let source_sample = samples
        .iter()
        .find(|sample| sample.event_id == edge.from.0 && sample.observer_id == edge.observer_id);

    let path = run_dir.join(format!("edge_provenance_{}.csv", edge_id));
    let mut writer = Writer::from_path(&path)?;
    writer.write_record([
        "edge_id",
        "source_event_id",
        "target_event_id",
        "observer_id",
        "trust_at_creation",
        "rewrite_rule_at_source",
        "time_index",
        "residual_state",
        "rewrite_rule_id",
        "residual_summary",
        "trust_value",
    ])?;

    // Sample-derived columns default to the empty string when the source
    // sample could not be matched.
    writer.write_record([
        edge.edge_id.to_string(),
        edge.from.0.to_string(),
        edge.to.0.to_string(),
        edge.observer_id.to_string(),
        edge.trust_at_creation.to_string(),
        edge.rewrite_rule_at_source.to_string(),
        source_sample
            .map(|sample| sample.time_index.to_string())
            .unwrap_or_default(),
        source_sample
            .map(|sample| sample.residual_state.as_str().to_string())
            .unwrap_or_default(),
        source_sample
            .map(|sample| sample.rewrite_rule_id.to_string())
            .unwrap_or_default(),
        source_sample
            .map(|sample| sample.residual_summary.to_string())
            .unwrap_or_default(),
        source_sample
            .map(|sample| sample.trust.to_string())
            .unwrap_or_default(),
    ])?;

    writer.flush()?;
    Ok(path)
}
727
/// Find and parse the `graph_edges.csv` row whose edge_id matches, resolving
/// columns by header name (so the file's column order may vary).
///
/// Errors if the file can't be opened, a required column is missing, a field
/// fails to parse, or the edge_id does not appear in any row.
fn load_edge_row_by_id(run_dir: &Path, edge_id: u64) -> Result<EdgeCsvRow> {
    let path = run_dir.join("graph_edges.csv");
    let mut reader = csv::Reader::from_path(&path)
        .with_context(|| format!("failed to open {}", path.display()))?;
    let headers = reader.headers()?.clone();

    // Resolve all column positions up front so per-row work is index-based.
    let edge_id_idx = header_index(&headers, "edge_id")?;
    let source_idx = header_index(&headers, "source_event_id")?;
    let target_idx = header_index(&headers, "target_event_id")?;
    let observer_idx = header_index(&headers, "observer_id")?;
    let trust_idx = header_index(&headers, "trust_at_creation")?;
    let rewrite_idx = header_index(&headers, "rewrite_rule_at_source")?;

    for row in reader.records() {
        let row = row?;
        // First matching row wins.
        if parse_u64(&row, edge_id_idx)? == edge_id {
            return Ok(EdgeCsvRow {
                edge_id,
                source_event_id: parse_u64(&row, source_idx)?,
                target_event_id: parse_u64(&row, target_idx)?,
                observer_id: parse_u32(&row, observer_idx)?,
                trust_at_creation: parse_f64(&row, trust_idx)?,
                rewrite_rule_at_source: parse_u32(&row, rewrite_idx)?,
            });
        }
    }

    Err(anyhow!(
        "edge_id {} not found in {}",
        edge_id,
        path.display()
    ))
}
761
/// Find and parse the `graph_edges.csv` row whose (source, target) endpoint
/// pair matches, resolving columns by header name.
///
/// Mirrors `load_edge_row_by_id` but keyed on the endpoints; the first
/// matching row wins. Errors on open/column/parse failure or when no row
/// has the requested endpoints.
fn load_edge_row_by_endpoints(
    run_dir: &Path,
    source_event_id: u64,
    target_event_id: u64,
) -> Result<EdgeCsvRow> {
    let path = run_dir.join("graph_edges.csv");
    let mut reader = csv::Reader::from_path(&path)
        .with_context(|| format!("failed to open {}", path.display()))?;
    let headers = reader.headers()?.clone();

    // Resolve all column positions up front so per-row work is index-based.
    let edge_id_idx = header_index(&headers, "edge_id")?;
    let source_idx = header_index(&headers, "source_event_id")?;
    let target_idx = header_index(&headers, "target_event_id")?;
    let observer_idx = header_index(&headers, "observer_id")?;
    let trust_idx = header_index(&headers, "trust_at_creation")?;
    let rewrite_idx = header_index(&headers, "rewrite_rule_at_source")?;

    for row in reader.records() {
        let row = row?;
        if parse_u64(&row, source_idx)? == source_event_id
            && parse_u64(&row, target_idx)? == target_event_id
        {
            return Ok(EdgeCsvRow {
                edge_id: parse_u64(&row, edge_id_idx)?,
                source_event_id,
                target_event_id,
                observer_id: parse_u32(&row, observer_idx)?,
                trust_at_creation: parse_f64(&row, trust_idx)?,
                rewrite_rule_at_source: parse_u32(&row, rewrite_idx)?,
            });
        }
    }

    Err(anyhow!(
        "edge ({}, {}) not found in {}",
        source_event_id,
        target_event_id,
        path.display()
    ))
}
802
/// Look up the `graph_events.csv` row matching (event id, observer id).
///
/// Returns `Ok(None)` when no row matches — a missing source event is
/// non-fatal for provenance export (its columns are left blank downstream).
/// Fields are kept as raw strings; only the two key columns are parsed.
fn load_source_event_row(
    run_dir: &Path,
    source_event_id: u64,
    observer_id: u32,
) -> Result<Option<EventCsvRow>> {
    let path = run_dir.join("graph_events.csv");
    let mut reader = csv::Reader::from_path(&path)
        .with_context(|| format!("failed to open {}", path.display()))?;
    let headers = reader.headers()?.clone();

    // Resolve column positions by name so column order may vary.
    let event_id_idx = header_index(&headers, "event_id")?;
    let observer_idx = header_index(&headers, "observer_id")?;
    let time_idx = header_index(&headers, "time_index")?;
    let state_idx = header_index(&headers, "residual_state")?;
    let rewrite_idx = header_index(&headers, "rewrite_rule_id")?;
    let residual_idx = header_index(&headers, "residual_summary")?;
    let trust_idx = header_index(&headers, "trust_value")?;

    for row in reader.records() {
        let row = row?;
        if parse_u64(&row, event_id_idx)? == source_event_id
            && parse_u32(&row, observer_idx)? == observer_id
        {
            // Missing non-key fields degrade to "" rather than erroring.
            return Ok(Some(EventCsvRow {
                time_index: row.get(time_idx).unwrap_or_default().to_string(),
                residual_state: row.get(state_idx).unwrap_or_default().to_string(),
                rewrite_rule_id: row.get(rewrite_idx).unwrap_or_default().to_string(),
                residual_summary: row.get(residual_idx).unwrap_or_default().to_string(),
                trust_value: row.get(trust_idx).unwrap_or_default().to_string(),
            }));
        }
    }

    Ok(None)
}
838
839fn write_edge_provenance_row(
840 run_dir: &Path,
841 edge_row: EdgeCsvRow,
842 event_row: Option<EventCsvRow>,
843) -> Result<PathBuf> {
844 let path = run_dir.join(format!("edge_provenance_{}.csv", edge_row.edge_id));
845 let mut writer = Writer::from_path(&path)?;
846 writer.write_record([
847 "edge_id",
848 "source_event_id",
849 "target_event_id",
850 "observer_id",
851 "trust_at_creation",
852 "rewrite_rule_at_source",
853 "time_index",
854 "residual_state",
855 "rewrite_rule_id",
856 "residual_summary",
857 "trust_value",
858 ])?;
859
860 writer.write_record([
861 edge_row.edge_id.to_string(),
862 edge_row.source_event_id.to_string(),
863 edge_row.target_event_id.to_string(),
864 edge_row.observer_id.to_string(),
865 edge_row.trust_at_creation.to_string(),
866 edge_row.rewrite_rule_at_source.to_string(),
867 event_row
868 .as_ref()
869 .map(|row| row.time_index.clone())
870 .unwrap_or_default(),
871 event_row
872 .as_ref()
873 .map(|row| row.residual_state.clone())
874 .unwrap_or_default(),
875 event_row
876 .as_ref()
877 .map(|row| row.rewrite_rule_id.clone())
878 .unwrap_or_default(),
879 event_row
880 .as_ref()
881 .map(|row| row.residual_summary.clone())
882 .unwrap_or_default(),
883 event_row
884 .as_ref()
885 .map(|row| row.trust_value.clone())
886 .unwrap_or_default(),
887 ])?;
888
889 writer.flush()?;
890 Ok(path)
891}
892
893fn header_index(headers: &StringRecord, name: &str) -> Result<usize> {
894 headers
895 .iter()
896 .position(|column| column == name)
897 .ok_or_else(|| anyhow!("missing column '{}'", name))
898}
899
900fn parse_u64(record: &StringRecord, idx: usize) -> Result<u64> {
901 record
902 .get(idx)
903 .ok_or_else(|| anyhow!("missing field index {}", idx))?
904 .parse::<u64>()
905 .with_context(|| format!("failed to parse u64 at index {}", idx))
906}
907
908fn parse_u32(record: &StringRecord, idx: usize) -> Result<u32> {
909 record
910 .get(idx)
911 .ok_or_else(|| anyhow!("missing field index {}", idx))?
912 .parse::<u32>()
913 .with_context(|| format!("failed to parse u32 at index {}", idx))
914}
915
916fn parse_f64(record: &StringRecord, idx: usize) -> Result<f64> {
917 record
918 .get(idx)
919 .ok_or_else(|| anyhow!("missing field index {}", idx))?
920 .parse::<f64>()
921 .with_context(|| format!("failed to parse f64 at index {}", idx))
922}
923
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    use crate::integrations::{ResidualState, TrustProfile};

    /// Five events with ids 0..4 and simple synthetic metadata.
    fn toy_events() -> Vec<Event> {
        (0..5_u64)
            .map(|raw_id| Event {
                id: EventId(raw_id),
                timestamp: Some(raw_id as f64),
                structural_tag: Some(raw_id as f64 * 0.1),
            })
            .collect()
    }

    /// One observer (id 0) sampling events 0..4 with strictly decreasing
    /// trust (1.0 -> 0.1), so raising tau prunes later edges first.
    fn toy_samples() -> Vec<DscdObserverSample> {
        vec![
            DscdObserverSample {
                event_id: 0,
                time_index: 0,
                observer_id: 0,
                trust: 1.0,
                residual_summary: 0.0,
                residual_state: ResidualState::Low,
                rewrite_rule_id: 0,
                rewrite_rule_label: "stable_envelope",
                trust_profile: TrustProfile::Medium,
                envelope_ok: true,
            },
            DscdObserverSample {
                event_id: 1,
                time_index: 1,
                observer_id: 0,
                trust: 0.8,
                residual_summary: 0.1,
                residual_state: ResidualState::Low,
                rewrite_rule_id: 0,
                rewrite_rule_label: "stable_envelope",
                trust_profile: TrustProfile::Medium,
                envelope_ok: true,
            },
            DscdObserverSample {
                event_id: 2,
                time_index: 2,
                observer_id: 0,
                trust: 0.6,
                residual_summary: 0.2,
                residual_state: ResidualState::Medium,
                rewrite_rule_id: 1,
                rewrite_rule_label: "moderate_envelope",
                trust_profile: TrustProfile::Medium,
                envelope_ok: true,
            },
            DscdObserverSample {
                event_id: 3,
                time_index: 3,
                observer_id: 0,
                trust: 0.3,
                residual_summary: 0.3,
                residual_state: ResidualState::High,
                rewrite_rule_id: 2,
                rewrite_rule_label: "high_residual_recovery",
                trust_profile: TrustProfile::Medium,
                envelope_ok: true,
            },
            DscdObserverSample {
                event_id: 4,
                time_index: 4,
                observer_id: 0,
                trust: 0.1,
                residual_summary: 0.4,
                residual_state: ResidualState::High,
                rewrite_rule_id: 3,
                rewrite_rule_label: "envelope_decay",
                trust_profile: TrustProfile::Medium,
                envelope_ok: false,
            },
        ]
    }

    /// Monotonicity: raising tau can only prune edges, so the reachable
    /// fraction from event 0 must be non-increasing across the tau grid.
    #[test]
    fn stricter_thresholds_do_not_increase_reachability() {
        let events = toy_events();
        let samples = toy_samples();
        let taus = [0.0, 0.25, 0.5, 0.75];

        let ratios: Vec<f64> = taus
            .iter()
            .map(|tau| {
                let graph = build_graph_from_samples(&events, &samples, *tau);
                reachable_from(&graph, EventId(0), None).len() as f64 / events.len() as f64
            })
            .collect();

        // Small epsilon tolerates float noise in the ratio computation.
        assert!(ratios.windows(2).all(|pair| pair[0] + 1.0e-12 >= pair[1]));
    }

    /// Width and max-derivative metrics computed from a hand-built curve:
    /// 0.9 is crossed at tau=0.1 (interpolated), 0.1 at tau=0.5, so the
    /// width is 0.4; the steepest segment has |slope| (0.8-0.2)/0.2 = 3.
    #[test]
    fn finite_size_metrics_are_computed_from_threshold_records() {
        let records = vec![
            ThresholdRecord {
                tau: 0.0,
                expansion_ratio: 1.0,
                reachable_size: 0,
                s_infty: None,
            },
            ThresholdRecord {
                tau: 0.2,
                expansion_ratio: 0.8,
                reachable_size: 0,
                s_infty: None,
            },
            ThresholdRecord {
                tau: 0.4,
                expansion_ratio: 0.2,
                reachable_size: 0,
                s_infty: None,
            },
            ThresholdRecord {
                tau: 0.6,
                expansion_ratio: 0.0,
                reachable_size: 0,
                s_infty: None,
            },
        ];

        let width = compute_width_0_1_to_0_9(&records);
        let max_derivative = compute_max_derivative(&records).expect("max derivative");

        assert!((width - 0.4).abs() < 1.0e-12);
        assert!((max_derivative - 3.0).abs() < 1.0e-12);
    }

    /// End-to-end scaling run into a unique temp dir: checks that the
    /// summary and per-N curve files exist and the summary values are sane.
    #[test]
    fn scaling_writes_summary_and_curves() {
        // Nanosecond suffix keeps concurrent test runs from colliding.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock")
            .as_nanos();
        let out_dir = std::env::temp_dir().join(format!("dsfb-dscd-scaling-test-{}", now));

        let cfg = DscdScalingConfig {
            event_counts: vec![64, 128],
            tau_grid: vec![0.0, 0.5, 1.0],
            initial_event: EventId(0),
            max_path_length: usize::MAX,
            critical_fraction: 0.5,
            ..DscdScalingConfig::default()
        };

        run_threshold_scaling(&cfg, &out_dir).expect("scaling should run");

        let summary_path = out_dir.join("threshold_scaling_summary.csv");
        assert!(summary_path.exists());
        assert!(out_dir.join("threshold_curve_N_64.csv").exists());
        assert!(out_dir.join("threshold_curve_N_128.csv").exists());

        let mut reader = csv::Reader::from_path(&summary_path).expect("open summary");
        let rows: Vec<_> = reader
            .records()
            .collect::<std::result::Result<_, _>>()
            .expect("rows");
        // One summary row per configured event count.
        assert_eq!(rows.len(), 2);

        let tau_star: f64 = rows[0][1].parse().expect("tau_star parse");
        let width: f64 = rows[0][2].parse().expect("width parse");
        assert!((0.0..=1.0).contains(&tau_star));
        assert!(width >= 0.0);

        // Best-effort cleanup; failure to remove the temp dir is not a test error.
        let _ = fs::remove_dir_all(&out_dir);
    }
}