Skip to main content

dsfb_dscd/
sweep.rs

1use std::collections::{BTreeMap, HashMap};
2use std::fs;
3use std::path::{Path, PathBuf};
4
5use anyhow::{anyhow, Context, Result};
6use csv::{StringRecord, Writer};
7use dsfb::sim::SimConfig;
8use dsfb_add::SimulationConfig;
9use rayon::prelude::*;
10
11use crate::config::{DscdScalingConfig, DscdSweepConfig, OutputPaths};
12use crate::graph::{
13    add_trust_gated_edge_with_provenance, reachable_from, DscdEdge, DscdGraph, Event, EventId,
14};
15use crate::integrations::{
16    compute_structural_growth_for_dscd, generate_dscd_events_from_dsfb, DscdObserverSample,
17};
18
/// One point on the trust-threshold sweep curve.
#[derive(Clone, Debug)]
pub struct ThresholdRecord {
    /// Trust threshold that was passed to graph construction for this point.
    pub tau: f64,
    /// `reachable_size` divided by the total number of events (0.0 when there are no events).
    pub expansion_ratio: f64,
    /// Number of events reported as reachable from the start event (0 when no start was given).
    pub reachable_size: usize,
    /// Optional value copied from the evaluation options; `None` when none was supplied.
    pub s_infty: Option<f64>,
}
26
/// Per-system-size summary row produced by the finite-size scaling run.
#[derive(Clone, Copy, Debug)]
struct ScalingSummaryRow {
    /// Event count `N` the row was computed for.
    n: usize,
    /// Threshold at which the expansion ratio first falls to the critical fraction.
    tau_star: f64,
    /// Distance between the 0.1 and 0.9 expansion-ratio crossings (never negative).
    width_0_1_to_0_9: f64,
    /// Largest absolute finite-difference slope of the expansion-ratio curve.
    max_derivative: f64,
}
34
35#[derive(Debug, Clone, Copy, Default)]
36struct ThresholdEvalOptions {
37    start: Option<EventId>,
38    max_depth: Option<usize>,
39    s_infty: Option<f64>,
40    progress_start: Option<usize>,
41    progress_end: Option<usize>,
42}
43
44#[derive(Debug, Clone, Copy)]
45struct CandidateEdge {
46    from: EventId,
47    to: EventId,
48    observer_id: u32,
49    trust: f64,
50    rewrite_rule_id: u32,
51}
52
53pub fn build_graph_from_samples(
54    events: &[Event],
55    samples: &[DscdObserverSample],
56    trust_threshold: f64,
57) -> DscdGraph {
58    let candidate_edges = prepare_candidate_edges(samples);
59    build_graph_from_candidate_edges(events, &candidate_edges, trust_threshold)
60}
61
62fn build_graph_from_candidate_edges(
63    events: &[Event],
64    candidate_edges: &[CandidateEdge],
65    trust_threshold: f64,
66) -> DscdGraph {
67    let mut graph = DscdGraph::default();
68    for event in events {
69        graph.add_event(event.clone());
70    }
71
72    for candidate in candidate_edges {
73        add_trust_gated_edge_with_provenance(
74            &mut graph,
75            candidate.from,
76            candidate.to,
77            candidate.observer_id,
78            candidate.trust,
79            trust_threshold,
80            candidate.rewrite_rule_id,
81        );
82    }
83
84    graph
85}
86
87fn prepare_candidate_edges(samples: &[DscdObserverSample]) -> Vec<CandidateEdge> {
88    let mut by_observer: BTreeMap<u32, Vec<&DscdObserverSample>> = BTreeMap::new();
89    for sample in samples {
90        by_observer
91            .entry(sample.observer_id)
92            .or_default()
93            .push(sample);
94    }
95
96    let mut candidate_edges = Vec::new();
97    for (observer_id, samples_for_observer) in &mut by_observer {
98        samples_for_observer.sort_by_key(|sample| sample.event_id);
99        for pair in samples_for_observer.windows(2) {
100            candidate_edges.push(CandidateEdge {
101                from: EventId(pair[0].event_id),
102                to: EventId(pair[1].event_id),
103                observer_id: *observer_id,
104                trust: pair[1].trust,
105                rewrite_rule_id: pair[0].rewrite_rule_id,
106            });
107        }
108    }
109
110    candidate_edges
111}
112
113pub fn run_trust_threshold_sweep(
114    cfg: &DscdSweepConfig,
115    dsfb_cfg: &SimConfig,
116    add_cfg: &SimulationConfig,
117    output_paths: &OutputPaths,
118) -> Result<Vec<ThresholdRecord>> {
119    cfg.validate()?;
120
121    report_progress(2, "generating DSCD events from DSFB");
122    let event_batch = generate_dscd_events_from_dsfb(dsfb_cfg, cfg.dsfb_params, cfg.num_events)?;
123    report_progress(15, format!("generated {} events", event_batch.events.len()));
124
125    report_progress(20, "computing structural growth baseline");
126    let growth = compute_structural_growth_for_dscd(add_cfg)?;
127
128    let tau_grid = cfg.tau_grid();
129    let tau_total = tau_grid.len();
130    report_progress(25, format!("prepared tau grid with {tau_total} steps"));
131
132    let start = event_batch.events.first().map(|event| event.id);
133    let first_tau = tau_grid[0];
134
135    report_progress(28, "writing graph event and edge snapshots");
136    let base_graph = build_graph_from_samples(
137        &event_batch.events,
138        &event_batch.observer_samples,
139        first_tau,
140    );
141    write_graph_events_csv(
142        &output_paths.run_dir.join("graph_events.csv"),
143        &event_batch.events,
144        &event_batch.observer_samples,
145    )?;
146    write_graph_edges_csv(
147        &output_paths.run_dir.join("graph_edges.csv"),
148        first_tau,
149        &base_graph.edges,
150    )?;
151
152    if let Some(edge) = base_graph.edges.first() {
153        let _ = write_edge_provenance_csv(
154            &output_paths.run_dir,
155            edge.edge_id,
156            &base_graph.edges,
157            &event_batch.observer_samples,
158        )?;
159    }
160
161    report_progress(30, "running trust-threshold sweep");
162    let records = compute_threshold_records(
163        &event_batch.events,
164        &event_batch.observer_samples,
165        &tau_grid,
166        ThresholdEvalOptions {
167            start,
168            max_depth: cfg.max_depth,
169            s_infty: Some(growth.s_infty),
170            progress_start: Some(30),
171            progress_end: Some(95),
172        },
173    );
174
175    report_progress(97, "writing threshold_sweep.csv");
176    write_threshold_sweep_csv(&output_paths.run_dir.join("threshold_sweep.csv"), &records)?;
177
178    report_progress(98, "writing finite_size_scaling.csv");
179    let transition_width = compute_width_0_1_to_0_9(&records);
180    let max_derivative = compute_max_derivative(&records).unwrap_or(0.0);
181    write_finite_size_scaling_csv(
182        &output_paths.run_dir.join("finite_size_scaling.csv"),
183        cfg.num_events,
184        transition_width,
185        max_derivative,
186    )?;
187
188    report_progress(100, "DSCD sweep complete");
189    Ok(records)
190}
191
192/// Run deterministic finite-size scaling for the DSCD threshold transition.
193///
194/// This routine is fully deterministic (no randomness) and is used to build the
195/// scaling outputs that support DSCD threshold-sharpening analysis (Theorem 4).
196pub fn run_threshold_scaling(config: &DscdScalingConfig, output_dir: &Path) -> Result<()> {
197    config.validate()?;
198    fs::create_dir_all(output_dir).with_context(|| {
199        format!(
200            "failed to create scaling output dir {}",
201            output_dir.display()
202        )
203    })?;
204
205    let mut summary_rows = Vec::with_capacity(config.event_counts.len());
206    let representative_n = config.event_counts.last().copied().unwrap_or(0);
207    for (idx, &n) in config.event_counts.iter().enumerate() {
208        report_progress(
209            5 + ((idx + 1) * 60) / config.event_counts.len(),
210            format!(
211                "scaling sweep for N={n} ({}/{})",
212                idx + 1,
213                config.event_counts.len()
214            ),
215        );
216
217        let dsfb_cfg = SimConfig {
218            steps: n,
219            ..SimConfig::default()
220        };
221        let event_batch = generate_dscd_events_from_dsfb(&dsfb_cfg, config.dsfb_params, n)?;
222        let base_graph = build_graph_from_samples(
223            &event_batch.events,
224            &event_batch.observer_samples,
225            config.tau_grid[0],
226        );
227
228        let records = compute_threshold_records(
229            &event_batch.events,
230            &event_batch.observer_samples,
231            &config.tau_grid,
232            ThresholdEvalOptions {
233                start: Some(config.initial_event),
234                max_depth: Some(config.max_path_length),
235                s_infty: None,
236                progress_start: None,
237                progress_end: None,
238            },
239        );
240
241        write_threshold_curve_csv(
242            &output_dir.join(format!("threshold_curve_N_{n}.csv")),
243            &records,
244        )?;
245
246        let row = ScalingSummaryRow {
247            n,
248            tau_star: compute_tau_star(&records, config.critical_fraction),
249            width_0_1_to_0_9: compute_width_0_1_to_0_9(&records),
250            max_derivative: compute_max_derivative(&records).unwrap_or(0.0),
251        };
252        summary_rows.push(row);
253
254        if n == representative_n {
255            write_graph_events_csv(
256                &output_dir.join("graph_events.csv"),
257                &event_batch.events,
258                &event_batch.observer_samples,
259            )?;
260            write_graph_edges_csv(
261                &output_dir.join("graph_edges.csv"),
262                config.tau_grid[0],
263                &base_graph.edges,
264            )?;
265            if let Some(edge) = base_graph.edges.first() {
266                let _ = write_edge_provenance_csv(
267                    output_dir,
268                    edge.edge_id,
269                    &base_graph.edges,
270                    &event_batch.observer_samples,
271                )?;
272            }
273        }
274    }
275
276    report_progress(92, "writing threshold_scaling_summary.csv");
277    write_threshold_scaling_summary_csv(
278        &output_dir.join("threshold_scaling_summary.csv"),
279        &summary_rows,
280    )?;
281    write_finite_size_scaling_series_csv(
282        &output_dir.join("finite_size_scaling.csv"),
283        &summary_rows,
284    )?;
285    report_progress(100, "threshold scaling complete");
286
287    Ok(())
288}
289
290/// Export full provenance for an edge identified by `edge_id`.
291///
292/// The output is written to `edge_provenance_<edge_id>.csv` in `run_dir`.
293pub fn export_edge_provenance_by_edge_id(run_dir: &Path, edge_id: u64) -> Result<PathBuf> {
294    let edge_row = load_edge_row_by_id(run_dir, edge_id)?;
295    let source_event_row =
296        load_source_event_row(run_dir, edge_row.source_event_id, edge_row.observer_id)?;
297    write_edge_provenance_row(run_dir, edge_row, source_event_row)
298}
299
300/// Export full provenance for an edge identified by `(source_event_id, target_event_id)`.
301pub fn export_edge_provenance_by_endpoints(
302    run_dir: &Path,
303    source_event_id: EventId,
304    target_event_id: EventId,
305) -> Result<PathBuf> {
306    let edge_row = load_edge_row_by_endpoints(run_dir, source_event_id.0, target_event_id.0)?;
307    let source_event_row =
308        load_source_event_row(run_dir, edge_row.source_event_id, edge_row.observer_id)?;
309    write_edge_provenance_row(run_dir, edge_row, source_event_row)
310}
311
312fn compute_threshold_records(
313    events: &[Event],
314    samples: &[DscdObserverSample],
315    tau_grid: &[f64],
316    options: ThresholdEvalOptions,
317) -> Vec<ThresholdRecord> {
318    let candidate_edges = prepare_candidate_edges(samples);
319
320    if options.progress_start.is_none() && options.progress_end.is_none() {
321        let worker_threads = scaling_worker_threads();
322        if worker_threads > 1 && tau_grid.len() > 1 {
323            if let Ok(pool) = rayon::ThreadPoolBuilder::new()
324                .num_threads(worker_threads)
325                .build()
326            {
327                return pool.install(|| {
328                    tau_grid
329                        .par_iter()
330                        .copied()
331                        .map(|tau| {
332                            compute_threshold_record_for_tau(events, &candidate_edges, tau, options)
333                        })
334                        .collect()
335                });
336            }
337        }
338
339        return tau_grid
340            .iter()
341            .copied()
342            .map(|tau| compute_threshold_record_for_tau(events, &candidate_edges, tau, options))
343            .collect();
344    }
345
346    let mut records = Vec::with_capacity(tau_grid.len());
347    let mut last_reported = options.progress_start.unwrap_or(0).saturating_sub(1);
348
349    for (idx, tau) in tau_grid.iter().copied().enumerate() {
350        records.push(compute_threshold_record_for_tau(
351            events,
352            &candidate_edges,
353            tau,
354            options,
355        ));
356
357        if let (Some(start_pct), Some(end_pct)) = (options.progress_start, options.progress_end) {
358            let span = end_pct.saturating_sub(start_pct);
359            let progress = start_pct + ((idx + 1) * span) / tau_grid.len().max(1);
360            if progress > last_reported {
361                report_progress(progress, format!("tau step {}/{}", idx + 1, tau_grid.len()));
362                last_reported = progress;
363            }
364        }
365    }
366
367    records
368}
369
370fn compute_threshold_record_for_tau(
371    events: &[Event],
372    candidate_edges: &[CandidateEdge],
373    tau: f64,
374    options: ThresholdEvalOptions,
375) -> ThresholdRecord {
376    let graph = build_graph_from_candidate_edges(events, candidate_edges, tau);
377    let reachable_size = options
378        .start
379        .map(|start_event| reachable_from(&graph, start_event, options.max_depth).len())
380        .unwrap_or(0);
381    let expansion_ratio = if events.is_empty() {
382        0.0
383    } else {
384        reachable_size as f64 / events.len() as f64
385    };
386
387    ThresholdRecord {
388        tau,
389        expansion_ratio,
390        reachable_size,
391        s_infty: options.s_infty,
392    }
393}
394
/// Number of worker threads to use for the parallel scaling sweep.
///
/// A value in the `DSFB_DSCD_THREADS` environment variable that parses as an
/// unsigned integer is clamped to `1..=available parallelism`. Otherwise the
/// available parallelism (1 if it cannot be determined) is clamped to `1..=8`.
fn scaling_worker_threads() -> usize {
    let available = match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    };

    let requested = std::env::var("DSFB_DSCD_THREADS")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok());

    match requested {
        Some(threads) => threads.clamp(1, available),
        None => available.clamp(1, 8),
    }
}
407
/// Prints a progress line of the form `[ 42%] message` to stderr.
///
/// Percentages above 100 are shown as 100.
fn report_progress(percent: usize, message: impl AsRef<str>) {
    let text = message.as_ref();
    let pct = if percent > 100 { 100 } else { percent };
    eprintln!("[{pct:>3}%] {}", text);
}
412
413fn write_graph_events_csv(
414    path: &Path,
415    events: &[Event],
416    samples: &[DscdObserverSample],
417) -> Result<()> {
418    let mut writer = Writer::from_path(path)?;
419    writer.write_record([
420        "event_id",
421        "time_index",
422        "observer_id",
423        "residual_state",
424        "rewrite_rule_id",
425        "trust_value",
426        "residual_summary",
427        "rewrite_rule_label",
428        "trust_profile",
429        "envelope_ok",
430        "timestamp",
431        "structural_tag",
432    ])?;
433
434    let event_by_id: HashMap<u64, &Event> =
435        events.iter().map(|event| (event.id.0, event)).collect();
436
437    for sample in samples {
438        let event_meta = event_by_id.get(&sample.event_id).copied();
439        writer.write_record([
440            sample.event_id.to_string(),
441            sample.time_index.to_string(),
442            sample.observer_id.to_string(),
443            sample.residual_state.as_str().to_string(),
444            sample.rewrite_rule_id.to_string(),
445            sample.trust.to_string(),
446            sample.residual_summary.to_string(),
447            sample.rewrite_rule_label.to_string(),
448            sample.trust_profile.as_str().to_string(),
449            sample.envelope_ok.to_string(),
450            event_meta
451                .and_then(|event| event.timestamp)
452                .map(|value| value.to_string())
453                .unwrap_or_default(),
454            event_meta
455                .and_then(|event| event.structural_tag)
456                .map(|value| value.to_string())
457                .unwrap_or_default(),
458        ])?;
459    }
460
461    writer.flush()?;
462    Ok(())
463}
464
465fn write_graph_edges_csv(path: &Path, tau_threshold: f64, edges: &[DscdEdge]) -> Result<()> {
466    let mut writer = Writer::from_path(path)?;
467    writer.write_record([
468        "edge_id",
469        "source_event_id",
470        "target_event_id",
471        "from_event_id",
472        "to_event_id",
473        "observer_id",
474        "trust_at_creation",
475        "trust_value",
476        "rewrite_rule_at_source",
477        "tau_threshold",
478    ])?;
479
480    for edge in edges {
481        writer.write_record([
482            edge.edge_id.to_string(),
483            edge.from.0.to_string(),
484            edge.to.0.to_string(),
485            edge.from.0.to_string(),
486            edge.to.0.to_string(),
487            edge.observer_id.to_string(),
488            edge.trust_at_creation.to_string(),
489            edge.trust_value.to_string(),
490            edge.rewrite_rule_at_source.to_string(),
491            tau_threshold.to_string(),
492        ])?;
493    }
494
495    writer.flush()?;
496    Ok(())
497}
498
499fn write_threshold_sweep_csv(path: &Path, records: &[ThresholdRecord]) -> Result<()> {
500    let mut writer = Writer::from_path(path)?;
501    writer.write_record(["tau", "expansion_ratio", "reachable_size", "s_infty"])?;
502
503    for record in records {
504        writer.write_record([
505            record.tau.to_string(),
506            record.expansion_ratio.to_string(),
507            record.reachable_size.to_string(),
508            record
509                .s_infty
510                .map(|value| value.to_string())
511                .unwrap_or_default(),
512        ])?;
513    }
514
515    writer.flush()?;
516    Ok(())
517}
518
519fn write_threshold_curve_csv(path: &Path, records: &[ThresholdRecord]) -> Result<()> {
520    let mut writer = Writer::from_path(path)?;
521    writer.write_record(["tau", "expansion_ratio"])?;
522    for record in records {
523        writer.write_record([record.tau.to_string(), record.expansion_ratio.to_string()])?;
524    }
525    writer.flush()?;
526    Ok(())
527}
528
529fn write_threshold_scaling_summary_csv(path: &Path, rows: &[ScalingSummaryRow]) -> Result<()> {
530    let mut writer = Writer::from_path(path)?;
531    writer.write_record(["N", "tau_star", "width_0_1_to_0_9", "max_derivative"])?;
532    for row in rows {
533        writer.write_record([
534            row.n.to_string(),
535            row.tau_star.to_string(),
536            row.width_0_1_to_0_9.to_string(),
537            row.max_derivative.to_string(),
538        ])?;
539    }
540    writer.flush()?;
541    Ok(())
542}
543
544fn compute_tau_star(records: &[ThresholdRecord], critical_fraction: f64) -> f64 {
545    interpolate_tau_at_or_below(records, critical_fraction)
546        .or_else(|| records.last().map(|record| record.tau))
547        .unwrap_or(0.0)
548}
549
550fn compute_width_0_1_to_0_9(records: &[ThresholdRecord]) -> f64 {
551    let tau_0_9 = interpolate_tau_at_or_below(records, 0.9)
552        .or_else(|| records.first().map(|record| record.tau))
553        .unwrap_or(0.0);
554    let tau_0_1 = interpolate_tau_at_or_below(records, 0.1)
555        .or_else(|| records.last().map(|record| record.tau))
556        .unwrap_or(tau_0_9);
557    (tau_0_1 - tau_0_9).max(0.0)
558}
559
560fn interpolate_tau_at_or_below(records: &[ThresholdRecord], threshold: f64) -> Option<f64> {
561    if records.is_empty() {
562        return None;
563    }
564
565    if records[0].expansion_ratio <= threshold {
566        return Some(records[0].tau);
567    }
568
569    for pair in records.windows(2) {
570        let prev = &pair[0];
571        let curr = &pair[1];
572        if curr.expansion_ratio <= threshold {
573            let drho = curr.expansion_ratio - prev.expansion_ratio;
574            if drho.abs() <= f64::EPSILON {
575                return Some(curr.tau);
576            }
577            let alpha = (threshold - prev.expansion_ratio) / drho;
578            return Some(prev.tau + alpha * (curr.tau - prev.tau));
579        }
580    }
581
582    None
583}
584
585fn compute_max_derivative(records: &[ThresholdRecord]) -> Option<f64> {
586    let mut max_derivative: Option<f64> = None;
587    for pair in records.windows(2) {
588        let dtau = pair[1].tau - pair[0].tau;
589        if dtau.abs() <= f64::EPSILON {
590            continue;
591        }
592
593        let derivative = ((pair[1].expansion_ratio - pair[0].expansion_ratio) / dtau).abs();
594        if !derivative.is_finite() {
595            continue;
596        }
597
598        max_derivative = Some(match max_derivative {
599            Some(current) => current.max(derivative),
600            None => derivative,
601        });
602    }
603
604    max_derivative
605}
606
607fn write_finite_size_scaling_csv(
608    path: &Path,
609    num_events: usize,
610    transition_width: f64,
611    max_derivative: f64,
612) -> Result<()> {
613    let mut writer = Writer::from_path(path)?;
614    writer.write_record([
615        "num_events",
616        "transition_width",
617        "width_0_1_to_0_9",
618        "max_derivative",
619    ])?;
620    writer.write_record([
621        num_events.to_string(),
622        transition_width.to_string(),
623        transition_width.to_string(),
624        max_derivative.to_string(),
625    ])?;
626    writer.flush()?;
627    Ok(())
628}
629
630fn write_finite_size_scaling_series_csv(path: &Path, rows: &[ScalingSummaryRow]) -> Result<()> {
631    let mut writer = Writer::from_path(path)?;
632    writer.write_record([
633        "num_events",
634        "transition_width",
635        "width_0_1_to_0_9",
636        "max_derivative",
637    ])?;
638    for row in rows {
639        writer.write_record([
640            row.n.to_string(),
641            row.width_0_1_to_0_9.to_string(),
642            row.width_0_1_to_0_9.to_string(),
643            row.max_derivative.to_string(),
644        ])?;
645    }
646    writer.flush()?;
647    Ok(())
648}
649
/// Parsed subset of one `graph_edges.csv` row.
#[derive(Clone, Copy, Debug)]
struct EdgeCsvRow {
    /// Value of the `edge_id` column.
    edge_id: u64,
    /// Value of the `source_event_id` column.
    source_event_id: u64,
    /// Value of the `target_event_id` column.
    target_event_id: u64,
    /// Value of the `observer_id` column.
    observer_id: u32,
    /// Value of the `trust_at_creation` column.
    trust_at_creation: f64,
    /// Value of the `rewrite_rule_at_source` column.
    rewrite_rule_at_source: u32,
}
659
/// Raw (unparsed) cells of one `graph_events.csv` row that are copied into the
/// edge provenance output.
#[derive(Clone, Debug)]
struct EventCsvRow {
    /// Text of the `time_index` column.
    time_index: String,
    /// Text of the `residual_state` column.
    residual_state: String,
    /// Text of the `rewrite_rule_id` column.
    rewrite_rule_id: String,
    /// Text of the `residual_summary` column.
    residual_summary: String,
    /// Text of the `trust_value` column.
    trust_value: String,
}
668
669fn write_edge_provenance_csv(
670    run_dir: &Path,
671    edge_id: u64,
672    edges: &[DscdEdge],
673    samples: &[DscdObserverSample],
674) -> Result<PathBuf> {
675    let edge = edges
676        .iter()
677        .find(|edge| edge.edge_id == edge_id)
678        .ok_or_else(|| anyhow!("edge_id {} not found", edge_id))?;
679
680    let source_sample = samples
681        .iter()
682        .find(|sample| sample.event_id == edge.from.0 && sample.observer_id == edge.observer_id);
683
684    let path = run_dir.join(format!("edge_provenance_{}.csv", edge_id));
685    let mut writer = Writer::from_path(&path)?;
686    writer.write_record([
687        "edge_id",
688        "source_event_id",
689        "target_event_id",
690        "observer_id",
691        "trust_at_creation",
692        "rewrite_rule_at_source",
693        "time_index",
694        "residual_state",
695        "rewrite_rule_id",
696        "residual_summary",
697        "trust_value",
698    ])?;
699
700    writer.write_record([
701        edge.edge_id.to_string(),
702        edge.from.0.to_string(),
703        edge.to.0.to_string(),
704        edge.observer_id.to_string(),
705        edge.trust_at_creation.to_string(),
706        edge.rewrite_rule_at_source.to_string(),
707        source_sample
708            .map(|sample| sample.time_index.to_string())
709            .unwrap_or_default(),
710        source_sample
711            .map(|sample| sample.residual_state.as_str().to_string())
712            .unwrap_or_default(),
713        source_sample
714            .map(|sample| sample.rewrite_rule_id.to_string())
715            .unwrap_or_default(),
716        source_sample
717            .map(|sample| sample.residual_summary.to_string())
718            .unwrap_or_default(),
719        source_sample
720            .map(|sample| sample.trust.to_string())
721            .unwrap_or_default(),
722    ])?;
723
724    writer.flush()?;
725    Ok(path)
726}
727
728fn load_edge_row_by_id(run_dir: &Path, edge_id: u64) -> Result<EdgeCsvRow> {
729    let path = run_dir.join("graph_edges.csv");
730    let mut reader = csv::Reader::from_path(&path)
731        .with_context(|| format!("failed to open {}", path.display()))?;
732    let headers = reader.headers()?.clone();
733
734    let edge_id_idx = header_index(&headers, "edge_id")?;
735    let source_idx = header_index(&headers, "source_event_id")?;
736    let target_idx = header_index(&headers, "target_event_id")?;
737    let observer_idx = header_index(&headers, "observer_id")?;
738    let trust_idx = header_index(&headers, "trust_at_creation")?;
739    let rewrite_idx = header_index(&headers, "rewrite_rule_at_source")?;
740
741    for row in reader.records() {
742        let row = row?;
743        if parse_u64(&row, edge_id_idx)? == edge_id {
744            return Ok(EdgeCsvRow {
745                edge_id,
746                source_event_id: parse_u64(&row, source_idx)?,
747                target_event_id: parse_u64(&row, target_idx)?,
748                observer_id: parse_u32(&row, observer_idx)?,
749                trust_at_creation: parse_f64(&row, trust_idx)?,
750                rewrite_rule_at_source: parse_u32(&row, rewrite_idx)?,
751            });
752        }
753    }
754
755    Err(anyhow!(
756        "edge_id {} not found in {}",
757        edge_id,
758        path.display()
759    ))
760}
761
762fn load_edge_row_by_endpoints(
763    run_dir: &Path,
764    source_event_id: u64,
765    target_event_id: u64,
766) -> Result<EdgeCsvRow> {
767    let path = run_dir.join("graph_edges.csv");
768    let mut reader = csv::Reader::from_path(&path)
769        .with_context(|| format!("failed to open {}", path.display()))?;
770    let headers = reader.headers()?.clone();
771
772    let edge_id_idx = header_index(&headers, "edge_id")?;
773    let source_idx = header_index(&headers, "source_event_id")?;
774    let target_idx = header_index(&headers, "target_event_id")?;
775    let observer_idx = header_index(&headers, "observer_id")?;
776    let trust_idx = header_index(&headers, "trust_at_creation")?;
777    let rewrite_idx = header_index(&headers, "rewrite_rule_at_source")?;
778
779    for row in reader.records() {
780        let row = row?;
781        if parse_u64(&row, source_idx)? == source_event_id
782            && parse_u64(&row, target_idx)? == target_event_id
783        {
784            return Ok(EdgeCsvRow {
785                edge_id: parse_u64(&row, edge_id_idx)?,
786                source_event_id,
787                target_event_id,
788                observer_id: parse_u32(&row, observer_idx)?,
789                trust_at_creation: parse_f64(&row, trust_idx)?,
790                rewrite_rule_at_source: parse_u32(&row, rewrite_idx)?,
791            });
792        }
793    }
794
795    Err(anyhow!(
796        "edge ({}, {}) not found in {}",
797        source_event_id,
798        target_event_id,
799        path.display()
800    ))
801}
802
803fn load_source_event_row(
804    run_dir: &Path,
805    source_event_id: u64,
806    observer_id: u32,
807) -> Result<Option<EventCsvRow>> {
808    let path = run_dir.join("graph_events.csv");
809    let mut reader = csv::Reader::from_path(&path)
810        .with_context(|| format!("failed to open {}", path.display()))?;
811    let headers = reader.headers()?.clone();
812
813    let event_id_idx = header_index(&headers, "event_id")?;
814    let observer_idx = header_index(&headers, "observer_id")?;
815    let time_idx = header_index(&headers, "time_index")?;
816    let state_idx = header_index(&headers, "residual_state")?;
817    let rewrite_idx = header_index(&headers, "rewrite_rule_id")?;
818    let residual_idx = header_index(&headers, "residual_summary")?;
819    let trust_idx = header_index(&headers, "trust_value")?;
820
821    for row in reader.records() {
822        let row = row?;
823        if parse_u64(&row, event_id_idx)? == source_event_id
824            && parse_u32(&row, observer_idx)? == observer_id
825        {
826            return Ok(Some(EventCsvRow {
827                time_index: row.get(time_idx).unwrap_or_default().to_string(),
828                residual_state: row.get(state_idx).unwrap_or_default().to_string(),
829                rewrite_rule_id: row.get(rewrite_idx).unwrap_or_default().to_string(),
830                residual_summary: row.get(residual_idx).unwrap_or_default().to_string(),
831                trust_value: row.get(trust_idx).unwrap_or_default().to_string(),
832            }));
833        }
834    }
835
836    Ok(None)
837}
838
839fn write_edge_provenance_row(
840    run_dir: &Path,
841    edge_row: EdgeCsvRow,
842    event_row: Option<EventCsvRow>,
843) -> Result<PathBuf> {
844    let path = run_dir.join(format!("edge_provenance_{}.csv", edge_row.edge_id));
845    let mut writer = Writer::from_path(&path)?;
846    writer.write_record([
847        "edge_id",
848        "source_event_id",
849        "target_event_id",
850        "observer_id",
851        "trust_at_creation",
852        "rewrite_rule_at_source",
853        "time_index",
854        "residual_state",
855        "rewrite_rule_id",
856        "residual_summary",
857        "trust_value",
858    ])?;
859
860    writer.write_record([
861        edge_row.edge_id.to_string(),
862        edge_row.source_event_id.to_string(),
863        edge_row.target_event_id.to_string(),
864        edge_row.observer_id.to_string(),
865        edge_row.trust_at_creation.to_string(),
866        edge_row.rewrite_rule_at_source.to_string(),
867        event_row
868            .as_ref()
869            .map(|row| row.time_index.clone())
870            .unwrap_or_default(),
871        event_row
872            .as_ref()
873            .map(|row| row.residual_state.clone())
874            .unwrap_or_default(),
875        event_row
876            .as_ref()
877            .map(|row| row.rewrite_rule_id.clone())
878            .unwrap_or_default(),
879        event_row
880            .as_ref()
881            .map(|row| row.residual_summary.clone())
882            .unwrap_or_default(),
883        event_row
884            .as_ref()
885            .map(|row| row.trust_value.clone())
886            .unwrap_or_default(),
887    ])?;
888
889    writer.flush()?;
890    Ok(path)
891}
892
893fn header_index(headers: &StringRecord, name: &str) -> Result<usize> {
894    headers
895        .iter()
896        .position(|column| column == name)
897        .ok_or_else(|| anyhow!("missing column '{}'", name))
898}
899
900fn parse_u64(record: &StringRecord, idx: usize) -> Result<u64> {
901    record
902        .get(idx)
903        .ok_or_else(|| anyhow!("missing field index {}", idx))?
904        .parse::<u64>()
905        .with_context(|| format!("failed to parse u64 at index {}", idx))
906}
907
908fn parse_u32(record: &StringRecord, idx: usize) -> Result<u32> {
909    record
910        .get(idx)
911        .ok_or_else(|| anyhow!("missing field index {}", idx))?
912        .parse::<u32>()
913        .with_context(|| format!("failed to parse u32 at index {}", idx))
914}
915
916fn parse_f64(record: &StringRecord, idx: usize) -> Result<f64> {
917    record
918        .get(idx)
919        .ok_or_else(|| anyhow!("missing field index {}", idx))?
920        .parse::<f64>()
921        .with_context(|| format!("failed to parse f64 at index {}", idx))
922}
923
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    use crate::integrations::{ResidualState, TrustProfile};

    /// Removes the wrapped directory when dropped.
    ///
    /// Tests that write to a scratch directory hold one of these so the
    /// directory is cleaned up even when an assertion panics part-way through.
    struct TempDirGuard(std::path::PathBuf);

    impl Drop for TempDirGuard {
        fn drop(&mut self) {
            // Best-effort: the directory may never have been created, and a
            // cleanup failure must not mask the real test outcome.
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    /// Five events with ids `0..5`, timestamp equal to the id and a structural
    /// tag of `id * 0.1`.
    fn toy_events() -> Vec<Event> {
        (0..5_u64)
            .map(|raw_id| Event {
                id: EventId(raw_id),
                timestamp: Some(raw_id as f64),
                structural_tag: Some(raw_id as f64 * 0.1),
            })
            .collect()
    }

    /// One observer-0 sample per toy event, with trust falling from 1.0 to 0.1
    /// and the residual summary rising from 0.0 to 0.4.
    fn toy_samples() -> Vec<DscdObserverSample> {
        vec![
            DscdObserverSample {
                event_id: 0,
                time_index: 0,
                observer_id: 0,
                trust: 1.0,
                residual_summary: 0.0,
                residual_state: ResidualState::Low,
                rewrite_rule_id: 0,
                rewrite_rule_label: "stable_envelope",
                trust_profile: TrustProfile::Medium,
                envelope_ok: true,
            },
            DscdObserverSample {
                event_id: 1,
                time_index: 1,
                observer_id: 0,
                trust: 0.8,
                residual_summary: 0.1,
                residual_state: ResidualState::Low,
                rewrite_rule_id: 0,
                rewrite_rule_label: "stable_envelope",
                trust_profile: TrustProfile::Medium,
                envelope_ok: true,
            },
            DscdObserverSample {
                event_id: 2,
                time_index: 2,
                observer_id: 0,
                trust: 0.6,
                residual_summary: 0.2,
                residual_state: ResidualState::Medium,
                rewrite_rule_id: 1,
                rewrite_rule_label: "moderate_envelope",
                trust_profile: TrustProfile::Medium,
                envelope_ok: true,
            },
            DscdObserverSample {
                event_id: 3,
                time_index: 3,
                observer_id: 0,
                trust: 0.3,
                residual_summary: 0.3,
                residual_state: ResidualState::High,
                rewrite_rule_id: 2,
                rewrite_rule_label: "high_residual_recovery",
                trust_profile: TrustProfile::Medium,
                envelope_ok: true,
            },
            DscdObserverSample {
                event_id: 4,
                time_index: 4,
                observer_id: 0,
                trust: 0.1,
                residual_summary: 0.4,
                residual_state: ResidualState::High,
                rewrite_rule_id: 3,
                rewrite_rule_label: "envelope_decay",
                trust_profile: TrustProfile::Medium,
                envelope_ok: false,
            },
        ]
    }

    /// Rebuilds the graph at increasing trust thresholds and checks that the
    /// fraction of events reachable from event 0 never grows.
    #[test]
    fn stricter_thresholds_do_not_increase_reachability() {
        let events = toy_events();
        let samples = toy_samples();
        let taus = [0.0, 0.25, 0.5, 0.75];

        let ratios: Vec<f64> = taus
            .iter()
            .map(|tau| {
                let graph = build_graph_from_samples(&events, &samples, *tau);
                reachable_from(&graph, EventId(0), None).len() as f64 / events.len() as f64
            })
            .collect();

        // Small slack so floating-point noise cannot fail the monotonicity check.
        assert!(ratios.windows(2).all(|pair| pair[0] + 1.0e-12 >= pair[1]));
    }

    /// Feeds a hand-written expansion curve to the finite-size metric helpers
    /// and checks the transition width and the maximum derivative.
    #[test]
    fn finite_size_metrics_are_computed_from_threshold_records() {
        let records = vec![
            ThresholdRecord {
                tau: 0.0,
                expansion_ratio: 1.0,
                reachable_size: 0,
                s_infty: None,
            },
            ThresholdRecord {
                tau: 0.2,
                expansion_ratio: 0.8,
                reachable_size: 0,
                s_infty: None,
            },
            ThresholdRecord {
                tau: 0.4,
                expansion_ratio: 0.2,
                reachable_size: 0,
                s_infty: None,
            },
            ThresholdRecord {
                tau: 0.6,
                expansion_ratio: 0.0,
                reachable_size: 0,
                s_infty: None,
            },
        ];

        let width = compute_width_0_1_to_0_9(&records);
        let max_derivative = compute_max_derivative(&records).expect("max derivative");

        assert!((width - 0.4).abs() < 1.0e-12);
        // The steepest segment is tau 0.2 -> 0.4, where the ratio changes by
        // 0.6 over 0.2 (presumably compared by magnitude; see
        // `compute_max_derivative`).
        assert!((max_derivative - 3.0).abs() < 1.0e-12);
    }

    /// Runs the scaling sweep into a scratch directory and checks that the
    /// summary CSV and one curve CSV per event count are written.
    #[test]
    fn scaling_writes_summary_and_curves() {
        // Nanosecond timestamp keeps the scratch directory name distinct
        // between runs.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock")
            .as_nanos();
        let out_dir = std::env::temp_dir().join(format!("dsfb-dscd-scaling-test-{}", now));
        // Declared before the CSV reader so the reader is dropped (and its
        // file handle closed) before the directory is removed, and so the
        // directory is removed even when an assertion below panics.
        let _cleanup = TempDirGuard(out_dir.clone());

        let cfg = DscdScalingConfig {
            event_counts: vec![64, 128],
            tau_grid: vec![0.0, 0.5, 1.0],
            initial_event: EventId(0),
            max_path_length: usize::MAX,
            critical_fraction: 0.5,
            ..DscdScalingConfig::default()
        };

        run_threshold_scaling(&cfg, &out_dir).expect("scaling should run");

        let summary_path = out_dir.join("threshold_scaling_summary.csv");
        assert!(summary_path.exists());
        assert!(out_dir.join("threshold_curve_N_64.csv").exists());
        assert!(out_dir.join("threshold_curve_N_128.csv").exists());

        let mut reader = csv::Reader::from_path(&summary_path).expect("open summary");
        let rows: Vec<_> = reader
            .records()
            .collect::<std::result::Result<_, _>>()
            .expect("rows");
        // One summary row per configured event count.
        assert_eq!(rows.len(), 2);

        let tau_star: f64 = rows[0][1].parse().expect("tau_star parse");
        let width: f64 = rows[0][2].parse().expect("width parse");
        assert!((0.0..=1.0).contains(&tau_star));
        assert!(width >= 0.0);
    }
}