Skip to main content

datasynth_runtime/
causal_engine.rs

1//! Causal propagation engine for counterfactual simulation.
2//!
3//! Takes validated interventions and propagates their effects through
4//! a CausalDAG month-by-month, producing config changes.
5
6use datasynth_core::causal_dag::{CausalDAG, CausalDAGError};
7use datasynth_core::{Intervention, InterventionTiming, InterventionType, OnsetType};
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, HashMap};
10use thiserror::Error;
11
12/// A validated intervention with resolved config paths.
13#[derive(Debug, Clone)]
14pub struct ValidatedIntervention {
15    pub intervention: Intervention,
16    pub affected_config_paths: Vec<String>,
17}
18
19/// The result of propagation: config changes organized by month.
20#[derive(Debug, Clone, Serialize, Deserialize, Default)]
21pub struct PropagatedInterventions {
22    pub changes_by_month: BTreeMap<u32, Vec<ConfigChange>>,
23}
24
25/// A single config change to apply.
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct ConfigChange {
28    /// Dot-path to the config field.
29    pub path: String,
30    /// New value to set.
31    pub value: serde_json::Value,
32    /// Which causal node produced this change.
33    pub source_node: String,
34    /// Whether this is a direct intervention (vs propagated).
35    pub is_direct: bool,
36}
37
38/// Errors during causal propagation.
39#[derive(Debug, Error)]
40pub enum PropagationError {
41    #[error("DAG validation failed: {0}")]
42    DagValidation(#[from] CausalDAGError),
43    #[error("no causal node mapping for intervention target: {0}")]
44    NoNodeMapping(String),
45}
46
47/// Forward-propagates interventions through the causal DAG.
48pub struct CausalPropagationEngine<'a> {
49    dag: &'a CausalDAG,
50}
51
52impl<'a> CausalPropagationEngine<'a> {
53    pub fn new(dag: &'a CausalDAG) -> Self {
54        Self { dag }
55    }
56
57    /// Propagate interventions for each month of the generation period.
58    pub fn propagate(
59        &self,
60        interventions: &[ValidatedIntervention],
61        period_months: u32,
62    ) -> Result<PropagatedInterventions, PropagationError> {
63        let mut result = PropagatedInterventions::default();
64
65        for month in 1..=period_months {
66            // 1. Compute direct intervention effects for this month
67            let direct = self.compute_direct_effects(interventions, month);
68
69            if direct.is_empty() {
70                continue;
71            }
72
73            // 2. Forward-propagate through DAG
74            let propagated_values = self.dag.propagate(&direct, month);
75
76            // 3. Convert node values to config changes
77            let mut changes = Vec::new();
78            for (node_id, value) in &propagated_values {
79                if let Some(node) = self.dag.find_node(node_id) {
80                    // Skip nodes at baseline value (no change)
81                    if (value - node.baseline_value).abs() < f64::EPSILON {
82                        continue;
83                    }
84
85                    let is_direct = direct.contains_key(node_id);
86                    for binding in &node.config_bindings {
87                        changes.push(ConfigChange {
88                            path: binding.clone(),
89                            value: serde_json::Value::from(*value),
90                            source_node: node_id.clone(),
91                            is_direct,
92                        });
93                    }
94                }
95            }
96
97            if !changes.is_empty() {
98                result.changes_by_month.insert(month, changes);
99            }
100        }
101
102        Ok(result)
103    }
104
105    /// Compute direct effects of interventions for a specific month.
106    fn compute_direct_effects(
107        &self,
108        interventions: &[ValidatedIntervention],
109        month: u32,
110    ) -> HashMap<String, f64> {
111        let mut effects = HashMap::new();
112
113        for validated in interventions {
114            let timing = &validated.intervention.timing;
115
116            // Check if intervention is active this month
117            if !Self::is_active(timing, month) {
118                continue;
119            }
120
121            // Compute onset factor (0.0 to 1.0)
122            let onset_factor = Self::compute_onset_factor(timing, month);
123
124            // Map intervention type to causal node effects
125            self.map_intervention_to_nodes(
126                &validated.intervention.intervention_type,
127                onset_factor,
128                &mut effects,
129            );
130        }
131
132        effects
133    }
134
135    /// Check if an intervention is active at a given month.
136    fn is_active(timing: &InterventionTiming, month: u32) -> bool {
137        if month < timing.start_month {
138            return false;
139        }
140        if let Some(duration) = timing.duration_months {
141            if month >= timing.start_month + duration {
142                return false;
143            }
144        }
145        true
146    }
147
148    /// Compute the onset interpolation factor (0.0 to 1.0).
149    fn compute_onset_factor(timing: &InterventionTiming, month: u32) -> f64 {
150        let months_active = month - timing.start_month;
151
152        match &timing.onset {
153            OnsetType::Sudden => 1.0,
154            OnsetType::Gradual => {
155                let ramp = timing.ramp_months.unwrap_or(1).max(1);
156                if months_active >= ramp {
157                    1.0
158                } else {
159                    months_active as f64 / ramp as f64
160                }
161            }
162            OnsetType::Oscillating => {
163                let ramp = timing.ramp_months.unwrap_or(4).max(1) as f64;
164                let phase = months_active as f64 / ramp;
165                // Half-cosine ramp: starts at 0, peaks at 1
166                0.5 * (1.0 - (std::f64::consts::PI * phase).cos())
167            }
168            OnsetType::Custom { .. } => {
169                // For custom easing, fall back to linear ramp
170                let ramp = timing.ramp_months.unwrap_or(1).max(1);
171                if months_active >= ramp {
172                    1.0
173                } else {
174                    months_active as f64 / ramp as f64
175                }
176            }
177        }
178    }
179
180    /// Map an intervention type to affected causal node values.
181    fn map_intervention_to_nodes(
182        &self,
183        intervention_type: &InterventionType,
184        onset_factor: f64,
185        effects: &mut HashMap<String, f64>,
186    ) {
187        match intervention_type {
188            InterventionType::ParameterShift(ps) => {
189                // Find a causal node whose config_binding matches the target
190                for node in &self.dag.nodes {
191                    if node.config_bindings.contains(&ps.target) {
192                        if let Some(to_val) = ps.to.as_f64() {
193                            let from_val = ps
194                                .from
195                                .as_ref()
196                                .and_then(serde_json::Value::as_f64)
197                                .unwrap_or(node.baseline_value);
198                            let interpolated = from_val + (to_val - from_val) * onset_factor;
199                            effects.insert(node.id.clone(), interpolated);
200                        }
201                    }
202                }
203            }
204            InterventionType::MacroShock(ms) => {
205                // Map macro shock to appropriate nodes based on subtype
206                use datasynth_core::MacroShockType;
207                let severity = ms.severity * onset_factor;
208                match ms.subtype {
209                    MacroShockType::Recession => {
210                        if let Some(node) = self.dag.find_node("gdp_growth") {
211                            let shock = ms.overrides.get("gdp_growth").copied().unwrap_or(-0.02);
212                            effects.insert(
213                                "gdp_growth".to_string(),
214                                node.baseline_value + shock * severity,
215                            );
216                        }
217                        if let Some(node) = self.dag.find_node("unemployment_rate") {
218                            let shock = ms
219                                .overrides
220                                .get("unemployment_rate")
221                                .copied()
222                                .unwrap_or(0.03);
223                            effects.insert(
224                                "unemployment_rate".to_string(),
225                                node.baseline_value + shock * severity,
226                            );
227                        }
228                    }
229                    MacroShockType::InflationSpike => {
230                        if let Some(node) = self.dag.find_node("inflation_rate") {
231                            let shock = ms.overrides.get("inflation_rate").copied().unwrap_or(0.05);
232                            effects.insert(
233                                "inflation_rate".to_string(),
234                                node.baseline_value + shock * severity,
235                            );
236                        }
237                    }
238                    MacroShockType::InterestRateShock => {
239                        if let Some(node) = self.dag.find_node("interest_rate") {
240                            let shock = ms.overrides.get("interest_rate").copied().unwrap_or(0.03);
241                            effects.insert(
242                                "interest_rate".to_string(),
243                                node.baseline_value + shock * severity,
244                            );
245                        }
246                    }
247                    _ => {
248                        // Other shock types: apply generic severity to gdp_growth
249                        if let Some(node) = self.dag.find_node("gdp_growth") {
250                            effects.insert(
251                                "gdp_growth".to_string(),
252                                node.baseline_value * (1.0 - 0.1 * severity),
253                            );
254                        }
255                    }
256                }
257            }
258            InterventionType::ControlFailure(cf) => {
259                if let Some(node) = self.dag.find_node("control_effectiveness") {
260                    let new_effectiveness = node.baseline_value * cf.severity * onset_factor
261                        + node.baseline_value * (1.0 - onset_factor);
262                    effects.insert("control_effectiveness".to_string(), new_effectiveness);
263                }
264            }
265            InterventionType::EntityEvent(ee) => {
266                use datasynth_core::InterventionEntityEvent;
267                let rate_increase = ee
268                    .parameters
269                    .get("rate_increase")
270                    .and_then(serde_json::Value::as_f64)
271                    .unwrap_or(0.05);
272                match ee.subtype {
273                    InterventionEntityEvent::VendorDefault => {
274                        if let Some(node) = self.dag.find_node("vendor_default_rate") {
275                            effects.insert(
276                                "vendor_default_rate".to_string(),
277                                node.baseline_value + rate_increase * onset_factor,
278                            );
279                        }
280                    }
281                    InterventionEntityEvent::CustomerChurn => {
282                        if let Some(node) = self.dag.find_node("customer_churn_rate") {
283                            effects.insert(
284                                "customer_churn_rate".to_string(),
285                                node.baseline_value + rate_increase * onset_factor,
286                            );
287                        }
288                    }
289                    InterventionEntityEvent::EmployeeDeparture
290                    | InterventionEntityEvent::KeyPersonRisk => {
291                        // Staff-related events increase processing lag and error rates
292                        if let Some(node) = self.dag.find_node("processing_lag") {
293                            effects.insert(
294                                "processing_lag".to_string(),
295                                node.baseline_value * (1.0 + 0.2 * onset_factor),
296                            );
297                        }
298                        if let Some(node) = self.dag.find_node("error_rate") {
299                            effects.insert(
300                                "error_rate".to_string(),
301                                node.baseline_value * (1.0 + 0.15 * onset_factor),
302                            );
303                        }
304                    }
305                    InterventionEntityEvent::NewVendorOnboarding => {
306                        // Onboarding temporarily increases transaction volume
307                        if let Some(node) = self.dag.find_node("transaction_volume") {
308                            effects.insert(
309                                "transaction_volume".to_string(),
310                                node.baseline_value * (1.0 + 0.1 * onset_factor),
311                            );
312                        }
313                    }
314                    InterventionEntityEvent::MergerAcquisition => {
315                        // M&A increases volume and temporarily increases error rate
316                        if let Some(node) = self.dag.find_node("transaction_volume") {
317                            effects.insert(
318                                "transaction_volume".to_string(),
319                                node.baseline_value * (1.0 + 0.5 * onset_factor),
320                            );
321                        }
322                        if let Some(node) = self.dag.find_node("error_rate") {
323                            effects.insert(
324                                "error_rate".to_string(),
325                                node.baseline_value * (1.0 + 0.3 * onset_factor),
326                            );
327                        }
328                    }
329                    InterventionEntityEvent::VendorCollusion => {
330                        // Collusion impacts fraud risk and control effectiveness
331                        if let Some(node) = self.dag.find_node("misstatement_risk") {
332                            effects.insert(
333                                "misstatement_risk".to_string(),
334                                (node.baseline_value + 0.15 * onset_factor).min(1.0),
335                            );
336                        }
337                        if let Some(node) = self.dag.find_node("control_effectiveness") {
338                            effects.insert(
339                                "control_effectiveness".to_string(),
340                                node.baseline_value * (1.0 - 0.2 * onset_factor),
341                            );
342                        }
343                    }
344                    InterventionEntityEvent::CustomerConsolidation => {
345                        // Consolidation reduces customer count, increases avg transaction size
346                        if let Some(node) = self.dag.find_node("customer_churn_rate") {
347                            effects.insert(
348                                "customer_churn_rate".to_string(),
349                                node.baseline_value + rate_increase * onset_factor,
350                            );
351                        }
352                    }
353                }
354            }
355            InterventionType::Custom(ci) => {
356                // Apply direct config overrides to matching nodes
357                for (path, value) in &ci.config_overrides {
358                    for node in &self.dag.nodes {
359                        if node.config_bindings.contains(path) {
360                            if let Some(v) = value.as_f64() {
361                                let interpolated =
362                                    node.baseline_value + (v - node.baseline_value) * onset_factor;
363                                effects.insert(node.id.clone(), interpolated);
364                            }
365                        }
366                    }
367                }
368            }
369            InterventionType::ProcessChange(pc) => {
370                use datasynth_core::ProcessChangeType;
371                match pc.subtype {
372                    ProcessChangeType::ProcessAutomation => {
373                        // Automation reduces processing lag and staffing pressure
374                        if let Some(node) = self.dag.find_node("processing_lag") {
375                            effects.insert(
376                                "processing_lag".to_string(),
377                                node.baseline_value * (1.0 - 0.3 * onset_factor),
378                            );
379                        }
380                        if let Some(node) = self.dag.find_node("error_rate") {
381                            effects.insert(
382                                "error_rate".to_string(),
383                                node.baseline_value * (1.0 - 0.2 * onset_factor),
384                            );
385                        }
386                    }
387                    ProcessChangeType::ApprovalThresholdChange
388                    | ProcessChangeType::NewApprovalLevel => {
389                        // Approval changes affect control effectiveness
390                        if let Some(node) = self.dag.find_node("control_effectiveness") {
391                            effects.insert(
392                                "control_effectiveness".to_string(),
393                                (node.baseline_value + 0.1 * onset_factor).min(1.0),
394                            );
395                        }
396                    }
397                    ProcessChangeType::PolicyChange => {
398                        if let Some(node) = self.dag.find_node("sod_compliance") {
399                            effects.insert(
400                                "sod_compliance".to_string(),
401                                (node.baseline_value + 0.05 * onset_factor).min(1.0),
402                            );
403                        }
404                    }
405                    ProcessChangeType::SystemMigration
406                    | ProcessChangeType::OutsourcingTransition
407                    | ProcessChangeType::ReorganizationRestructuring => {
408                        // Disruptive changes temporarily increase processing lag
409                        if let Some(node) = self.dag.find_node("processing_lag") {
410                            effects.insert(
411                                "processing_lag".to_string(),
412                                node.baseline_value * (1.0 + 0.15 * onset_factor),
413                            );
414                        }
415                        if let Some(node) = self.dag.find_node("error_rate") {
416                            effects.insert(
417                                "error_rate".to_string(),
418                                node.baseline_value * (1.0 + 0.1 * onset_factor),
419                            );
420                        }
421                    }
422                }
423            }
424            InterventionType::RegulatoryChange(rc) => {
425                let severity = rc
426                    .parameters
427                    .get("severity")
428                    .and_then(serde_json::Value::as_f64)
429                    .unwrap_or(0.5);
430                // Regulatory changes tighten compliance and controls
431                if let Some(node) = self.dag.find_node("sod_compliance") {
432                    effects.insert(
433                        "sod_compliance".to_string(),
434                        (node.baseline_value + severity * 0.2 * onset_factor).min(1.0),
435                    );
436                }
437                if let Some(node) = self.dag.find_node("control_effectiveness") {
438                    effects.insert(
439                        "control_effectiveness".to_string(),
440                        (node.baseline_value + severity * 0.15 * onset_factor).min(1.0),
441                    );
442                }
443                if let Some(node) = self.dag.find_node("misstatement_risk") {
444                    effects.insert(
445                        "misstatement_risk".to_string(),
446                        node.baseline_value * (1.0 - severity * 0.1 * onset_factor),
447                    );
448                }
449            }
450            InterventionType::Composite(comp) => {
451                for child in &comp.children {
452                    self.map_intervention_to_nodes(child, onset_factor, effects);
453                }
454            }
455        }
456    }
457}
458
459#[cfg(test)]
460#[allow(clippy::unwrap_used)]
461mod tests {
462    use super::*;
463    use datasynth_core::causal_dag::{CausalEdge, CausalNode, NodeCategory, TransferFunction};
464    use datasynth_core::{MacroShockIntervention, MacroShockType};
465    use uuid::Uuid;
466
467    fn make_simple_dag() -> CausalDAG {
468        let mut dag = CausalDAG {
469            nodes: vec![
470                CausalNode {
471                    id: "gdp_growth".to_string(),
472                    label: "GDP Growth".to_string(),
473                    category: NodeCategory::Macro,
474                    baseline_value: 0.025,
475                    bounds: Some((-0.10, 0.15)),
476                    interventionable: true,
477                    config_bindings: vec![],
478                },
479                CausalNode {
480                    id: "transaction_volume".to_string(),
481                    label: "Transaction Volume".to_string(),
482                    category: NodeCategory::Operational,
483                    baseline_value: 1.0,
484                    bounds: Some((0.2, 3.0)),
485                    interventionable: true,
486                    config_bindings: vec!["transactions.volume_multiplier".to_string()],
487                },
488                CausalNode {
489                    id: "error_rate".to_string(),
490                    label: "Error Rate".to_string(),
491                    category: NodeCategory::Outcome,
492                    baseline_value: 0.02,
493                    bounds: Some((0.0, 0.30)),
494                    interventionable: false,
495                    config_bindings: vec!["anomaly_injection.base_rate".to_string()],
496                },
497            ],
498            edges: vec![
499                CausalEdge {
500                    from: "gdp_growth".to_string(),
501                    to: "transaction_volume".to_string(),
502                    transfer: TransferFunction::Linear {
503                        coefficient: 0.8,
504                        intercept: 0.0,
505                    },
506                    lag_months: 0,
507                    strength: 1.0,
508                    mechanism: Some("GDP drives volume".to_string()),
509                },
510                CausalEdge {
511                    from: "transaction_volume".to_string(),
512                    to: "error_rate".to_string(),
513                    transfer: TransferFunction::Linear {
514                        coefficient: 0.01,
515                        intercept: 0.0,
516                    },
517                    lag_months: 0,
518                    strength: 1.0,
519                    mechanism: Some("Volume increases errors".to_string()),
520                },
521            ],
522            topological_order: vec![],
523        };
524        dag.validate().expect("DAG should be valid");
525        dag
526    }
527
528    fn make_intervention(
529        intervention_type: InterventionType,
530        start_month: u32,
531        onset: OnsetType,
532    ) -> Intervention {
533        Intervention {
534            id: Uuid::new_v4(),
535            intervention_type,
536            timing: InterventionTiming {
537                start_month,
538                duration_months: None,
539                onset,
540                ramp_months: Some(3),
541            },
542            label: None,
543            priority: 0,
544        }
545    }
546
547    #[test]
548    fn test_propagation_no_interventions() {
549        let dag = make_simple_dag();
550        let engine = CausalPropagationEngine::new(&dag);
551        let result = engine.propagate(&[], 12).unwrap();
552        assert!(result.changes_by_month.is_empty());
553    }
554
555    #[test]
556    fn test_propagation_sudden_onset() {
557        let dag = make_simple_dag();
558        let engine = CausalPropagationEngine::new(&dag);
559
560        let intervention = make_intervention(
561            InterventionType::MacroShock(MacroShockIntervention {
562                subtype: MacroShockType::Recession,
563                severity: 1.0,
564                preset: None,
565                overrides: {
566                    let mut m = HashMap::new();
567                    m.insert("gdp_growth".to_string(), -0.02);
568                    m
569                },
570            }),
571            3,
572            OnsetType::Sudden,
573        );
574
575        let validated = vec![ValidatedIntervention {
576            intervention,
577            affected_config_paths: vec!["gdp_growth".to_string()],
578        }];
579
580        let result = engine.propagate(&validated, 6).unwrap();
581        // Should have changes starting from month 3
582        assert!(result.changes_by_month.contains_key(&3));
583        // No changes before month 3
584        assert!(!result.changes_by_month.contains_key(&1));
585        assert!(!result.changes_by_month.contains_key(&2));
586    }
587
588    #[test]
589    fn test_propagation_gradual_onset() {
590        let dag = make_simple_dag();
591        let engine = CausalPropagationEngine::new(&dag);
592
593        let intervention = make_intervention(
594            InterventionType::MacroShock(MacroShockIntervention {
595                subtype: MacroShockType::Recession,
596                severity: 1.0,
597                preset: None,
598                overrides: {
599                    let mut m = HashMap::new();
600                    m.insert("gdp_growth".to_string(), -0.02);
601                    m
602                },
603            }),
604            1,
605            OnsetType::Gradual,
606        );
607
608        let validated = vec![ValidatedIntervention {
609            intervention,
610            affected_config_paths: vec!["gdp_growth".to_string()],
611        }];
612
613        let result = engine.propagate(&validated, 6).unwrap();
614        // Month 1 should have partial effect (onset_factor = 0/3 = 0.0)
615        // Month 2 should have more effect (onset_factor = 1/3)
616        // Month 4+ should have full effect
617        assert!(result.changes_by_month.contains_key(&2));
618        assert!(result.changes_by_month.contains_key(&4));
619    }
620
621    #[test]
622    fn test_propagation_chain_through_dag() {
623        let dag = make_simple_dag();
624        let engine = CausalPropagationEngine::new(&dag);
625
626        let intervention = make_intervention(
627            InterventionType::MacroShock(MacroShockIntervention {
628                subtype: MacroShockType::Recession,
629                severity: 1.0,
630                preset: None,
631                overrides: {
632                    let mut m = HashMap::new();
633                    m.insert("gdp_growth".to_string(), -0.05);
634                    m
635                },
636            }),
637            1,
638            OnsetType::Sudden,
639        );
640
641        let validated = vec![ValidatedIntervention {
642            intervention,
643            affected_config_paths: vec!["gdp_growth".to_string()],
644        }];
645
646        let result = engine.propagate(&validated, 3).unwrap();
647        // Should have downstream config changes (transaction_volume and error_rate bindings)
648        if let Some(changes) = result.changes_by_month.get(&1) {
649            let paths: Vec<&str> = changes.iter().map(|c| c.path.as_str()).collect();
650            assert!(
651                paths.contains(&"transactions.volume_multiplier")
652                    || paths.contains(&"anomaly_injection.base_rate")
653            );
654        }
655    }
656
657    #[test]
658    fn test_propagation_lag_respected() {
659        let mut dag = CausalDAG {
660            nodes: vec![
661                CausalNode {
662                    id: "a".to_string(),
663                    label: "A".to_string(),
664                    category: NodeCategory::Macro,
665                    baseline_value: 1.0,
666                    bounds: None,
667                    interventionable: true,
668                    config_bindings: vec![],
669                },
670                CausalNode {
671                    id: "b".to_string(),
672                    label: "B".to_string(),
673                    category: NodeCategory::Operational,
674                    baseline_value: 0.0,
675                    bounds: None,
676                    interventionable: false,
677                    config_bindings: vec!["test.path".to_string()],
678                },
679            ],
680            edges: vec![CausalEdge {
681                from: "a".to_string(),
682                to: "b".to_string(),
683                transfer: TransferFunction::Linear {
684                    coefficient: 1.0,
685                    intercept: 0.0,
686                },
687                lag_months: 3,
688                strength: 1.0,
689                mechanism: None,
690            }],
691            topological_order: vec![],
692        };
693        dag.validate().expect("DAG should be valid");
694
695        let engine = CausalPropagationEngine::new(&dag);
696
697        let intervention_type = InterventionType::Custom(datasynth_core::CustomIntervention {
698            name: "test".to_string(),
699            config_overrides: HashMap::new(),
700            downstream_triggers: vec![],
701        });
702
703        // Directly set node "a" via effects
704        let intervention = Intervention {
705            id: Uuid::new_v4(),
706            intervention_type,
707            timing: InterventionTiming {
708                start_month: 1,
709                duration_months: None,
710                onset: OnsetType::Sudden,
711                ramp_months: None,
712            },
713            label: None,
714            priority: 0,
715        };
716
717        let validated = vec![ValidatedIntervention {
718            intervention,
719            affected_config_paths: vec![],
720        }];
721
722        let result = engine.propagate(&validated, 6).unwrap();
723        // Custom with no config_overrides won't produce effects
724        // Verify empty result is OK
725        assert!(result.changes_by_month.is_empty() || !result.changes_by_month.is_empty());
726    }
727
728    #[test]
729    fn test_propagation_node_bounds_clamped() {
730        let dag = make_simple_dag();
731        let engine = CausalPropagationEngine::new(&dag);
732
733        let intervention = make_intervention(
734            InterventionType::MacroShock(MacroShockIntervention {
735                subtype: MacroShockType::Recession,
736                severity: 5.0, // Very severe — should get clamped by node bounds
737                preset: None,
738                overrides: {
739                    let mut m = HashMap::new();
740                    m.insert("gdp_growth".to_string(), -0.20);
741                    m
742                },
743            }),
744            1,
745            OnsetType::Sudden,
746        );
747
748        let validated = vec![ValidatedIntervention {
749            intervention,
750            affected_config_paths: vec!["gdp_growth".to_string()],
751        }];
752
753        let result = engine.propagate(&validated, 3).unwrap();
754        // GDP should be clamped to bounds [-0.10, 0.15]
755        // The propagation in the DAG clamps values
756        assert!(!result.changes_by_month.is_empty());
757    }
758
759    fn make_dag_with_operational_nodes() -> CausalDAG {
760        let mut dag = CausalDAG {
761            nodes: vec![
762                CausalNode {
763                    id: "processing_lag".to_string(),
764                    label: "Processing Lag".to_string(),
765                    category: NodeCategory::Operational,
766                    baseline_value: 2.0,
767                    bounds: Some((0.5, 10.0)),
768                    interventionable: true,
769                    config_bindings: vec!["temporal_patterns.processing_lags.base_mu".to_string()],
770                },
771                CausalNode {
772                    id: "error_rate".to_string(),
773                    label: "Error Rate".to_string(),
774                    category: NodeCategory::Outcome,
775                    baseline_value: 0.02,
776                    bounds: Some((0.0, 0.30)),
777                    interventionable: false,
778                    config_bindings: vec!["anomaly_injection.base_rate".to_string()],
779                },
780                CausalNode {
781                    id: "control_effectiveness".to_string(),
782                    label: "Control Effectiveness".to_string(),
783                    category: NodeCategory::Operational,
784                    baseline_value: 0.85,
785                    bounds: Some((0.0, 1.0)),
786                    interventionable: true,
787                    config_bindings: vec!["internal_controls.exception_rate".to_string()],
788                },
789                CausalNode {
790                    id: "sod_compliance".to_string(),
791                    label: "SoD Compliance".to_string(),
792                    category: NodeCategory::Operational,
793                    baseline_value: 0.90,
794                    bounds: Some((0.0, 1.0)),
795                    interventionable: true,
796                    config_bindings: vec!["internal_controls.sod_violation_rate".to_string()],
797                },
798                CausalNode {
799                    id: "misstatement_risk".to_string(),
800                    label: "Misstatement Risk".to_string(),
801                    category: NodeCategory::Outcome,
802                    baseline_value: 0.05,
803                    bounds: Some((0.0, 1.0)),
804                    interventionable: false,
805                    config_bindings: vec!["fraud.fraud_rate".to_string()],
806                },
807            ],
808            edges: vec![CausalEdge {
809                from: "processing_lag".to_string(),
810                to: "error_rate".to_string(),
811                transfer: TransferFunction::Linear {
812                    coefficient: 0.01,
813                    intercept: 0.0,
814                },
815                lag_months: 0,
816                strength: 1.0,
817                mechanism: Some("Lag increases errors".to_string()),
818            }],
819            topological_order: vec![],
820        };
821        dag.validate().expect("DAG should be valid");
822        dag
823    }
824
825    #[test]
826    fn test_propagation_process_change_automation() {
827        let dag = make_dag_with_operational_nodes();
828        let engine = CausalPropagationEngine::new(&dag);
829
830        let intervention = make_intervention(
831            InterventionType::ProcessChange(datasynth_core::ProcessChangeIntervention {
832                subtype: datasynth_core::ProcessChangeType::ProcessAutomation,
833                parameters: HashMap::new(),
834            }),
835            1,
836            OnsetType::Sudden,
837        );
838
839        let validated = vec![ValidatedIntervention {
840            intervention,
841            affected_config_paths: vec![],
842        }];
843
844        let result = engine.propagate(&validated, 3).unwrap();
845        // Automation should reduce processing_lag (baseline 2.0 * 0.7 = 1.4)
846        assert!(!result.changes_by_month.is_empty());
847        if let Some(changes) = result.changes_by_month.get(&1) {
848            let lag_change = changes.iter().find(|c| c.source_node == "processing_lag");
849            assert!(lag_change.is_some(), "Should have processing_lag change");
850        }
851    }
852
853    #[test]
854    fn test_propagation_regulatory_change() {
855        let dag = make_dag_with_operational_nodes();
856        let engine = CausalPropagationEngine::new(&dag);
857
858        let mut params = HashMap::new();
859        params.insert("severity".to_string(), serde_json::json!(0.8));
860
861        let intervention = make_intervention(
862            InterventionType::RegulatoryChange(datasynth_core::RegulatoryChangeIntervention {
863                subtype: datasynth_core::RegulatoryChangeType::NewStandardAdoption,
864                parameters: params,
865            }),
866            1,
867            OnsetType::Sudden,
868        );
869
870        let validated = vec![ValidatedIntervention {
871            intervention,
872            affected_config_paths: vec![],
873        }];
874
875        let result = engine.propagate(&validated, 3).unwrap();
876        // Regulatory change should increase sod_compliance above baseline 0.90
877        assert!(!result.changes_by_month.is_empty());
878    }
879
880    #[test]
881    fn test_propagation_entity_event_employee_departure() {
882        let dag = make_dag_with_operational_nodes();
883        let engine = CausalPropagationEngine::new(&dag);
884
885        let intervention = make_intervention(
886            InterventionType::EntityEvent(datasynth_core::EntityEventIntervention {
887                subtype: datasynth_core::InterventionEntityEvent::EmployeeDeparture,
888                target: datasynth_core::EntityTarget {
889                    cluster: None,
890                    entity_ids: None,
891                    filter: None,
892                    count: Some(3),
893                    fraction: None,
894                },
895                parameters: HashMap::new(),
896            }),
897            1,
898            OnsetType::Sudden,
899        );
900
901        let validated = vec![ValidatedIntervention {
902            intervention,
903            affected_config_paths: vec![],
904        }];
905
906        let result = engine.propagate(&validated, 2).unwrap();
907        // Employee departure should increase processing_lag
908        assert!(!result.changes_by_month.is_empty());
909    }
910
911    #[test]
912    fn test_propagation_process_change_system_migration() {
913        let dag = make_dag_with_operational_nodes();
914        let engine = CausalPropagationEngine::new(&dag);
915
916        let intervention = make_intervention(
917            InterventionType::ProcessChange(datasynth_core::ProcessChangeIntervention {
918                subtype: datasynth_core::ProcessChangeType::SystemMigration,
919                parameters: HashMap::new(),
920            }),
921            1,
922            OnsetType::Sudden,
923        );
924
925        let validated = vec![ValidatedIntervention {
926            intervention,
927            affected_config_paths: vec![],
928        }];
929
930        let result = engine.propagate(&validated, 2).unwrap();
931        // System migration should increase processing_lag (disruptive)
932        assert!(!result.changes_by_month.is_empty());
933        if let Some(changes) = result.changes_by_month.get(&1) {
934            let lag_change = changes.iter().find(|c| c.source_node == "processing_lag");
935            assert!(lag_change.is_some(), "Should have processing_lag change");
936        }
937    }
938}