1use datasynth_core::causal_dag::{CausalDAG, CausalDAGError};
7use datasynth_core::{Intervention, InterventionTiming, InterventionType, OnsetType};
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, HashMap};
10use thiserror::Error;
11
12#[derive(Debug, Clone)]
14pub struct ValidatedIntervention {
15 pub intervention: Intervention,
16 pub affected_config_paths: Vec<String>,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize, Default)]
21pub struct PropagatedInterventions {
22 pub changes_by_month: BTreeMap<u32, Vec<ConfigChange>>,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct ConfigChange {
28 pub path: String,
30 pub value: serde_json::Value,
32 pub source_node: String,
34 pub is_direct: bool,
36}
37
38#[derive(Debug, Error)]
40pub enum PropagationError {
41 #[error("DAG validation failed: {0}")]
42 DagValidation(#[from] CausalDAGError),
43 #[error("no causal node mapping for intervention target: {0}")]
44 NoNodeMapping(String),
45}
46
47pub struct CausalPropagationEngine<'a> {
49 dag: &'a CausalDAG,
50}
51
52impl<'a> CausalPropagationEngine<'a> {
53 pub fn new(dag: &'a CausalDAG) -> Self {
54 Self { dag }
55 }
56
57 pub fn propagate(
59 &self,
60 interventions: &[ValidatedIntervention],
61 period_months: u32,
62 ) -> Result<PropagatedInterventions, PropagationError> {
63 let mut result = PropagatedInterventions::default();
64
65 for month in 1..=period_months {
66 let direct = self.compute_direct_effects(interventions, month);
68
69 if direct.is_empty() {
70 continue;
71 }
72
73 let propagated_values = self.dag.propagate(&direct, month);
75
76 let mut changes = Vec::new();
78 for (node_id, value) in &propagated_values {
79 if let Some(node) = self.dag.find_node(node_id) {
80 if (value - node.baseline_value).abs() < f64::EPSILON {
82 continue;
83 }
84
85 let is_direct = direct.contains_key(node_id);
86 for binding in &node.config_bindings {
87 changes.push(ConfigChange {
88 path: binding.clone(),
89 value: serde_json::Value::from(*value),
90 source_node: node_id.clone(),
91 is_direct,
92 });
93 }
94 }
95 }
96
97 if !changes.is_empty() {
98 result.changes_by_month.insert(month, changes);
99 }
100 }
101
102 Ok(result)
103 }
104
105 fn compute_direct_effects(
107 &self,
108 interventions: &[ValidatedIntervention],
109 month: u32,
110 ) -> HashMap<String, f64> {
111 let mut effects = HashMap::new();
112
113 for validated in interventions {
114 let timing = &validated.intervention.timing;
115
116 if !Self::is_active(timing, month) {
118 continue;
119 }
120
121 let onset_factor = Self::compute_onset_factor(timing, month);
123
124 self.map_intervention_to_nodes(
126 &validated.intervention.intervention_type,
127 onset_factor,
128 &mut effects,
129 );
130 }
131
132 effects
133 }
134
135 fn is_active(timing: &InterventionTiming, month: u32) -> bool {
137 if month < timing.start_month {
138 return false;
139 }
140 if let Some(duration) = timing.duration_months {
141 if month >= timing.start_month + duration {
142 return false;
143 }
144 }
145 true
146 }
147
148 fn compute_onset_factor(timing: &InterventionTiming, month: u32) -> f64 {
150 let months_active = month - timing.start_month;
151
152 match &timing.onset {
153 OnsetType::Sudden => 1.0,
154 OnsetType::Gradual => {
155 let ramp = timing.ramp_months.unwrap_or(1).max(1);
156 if months_active >= ramp {
157 1.0
158 } else {
159 months_active as f64 / ramp as f64
160 }
161 }
162 OnsetType::Oscillating => {
163 let ramp = timing.ramp_months.unwrap_or(4).max(1) as f64;
164 let phase = months_active as f64 / ramp;
165 0.5 * (1.0 - (std::f64::consts::PI * phase).cos())
167 }
168 OnsetType::Custom { .. } => {
169 let ramp = timing.ramp_months.unwrap_or(1).max(1);
171 if months_active >= ramp {
172 1.0
173 } else {
174 months_active as f64 / ramp as f64
175 }
176 }
177 }
178 }
179
180 fn map_intervention_to_nodes(
182 &self,
183 intervention_type: &InterventionType,
184 onset_factor: f64,
185 effects: &mut HashMap<String, f64>,
186 ) {
187 match intervention_type {
188 InterventionType::ParameterShift(ps) => {
189 for node in &self.dag.nodes {
191 if node.config_bindings.contains(&ps.target) {
192 if let Some(to_val) = ps.to.as_f64() {
193 let from_val = ps
194 .from
195 .as_ref()
196 .and_then(serde_json::Value::as_f64)
197 .unwrap_or(node.baseline_value);
198 let interpolated = from_val + (to_val - from_val) * onset_factor;
199 effects.insert(node.id.clone(), interpolated);
200 }
201 }
202 }
203 }
204 InterventionType::MacroShock(ms) => {
205 use datasynth_core::MacroShockType;
207 let severity = ms.severity * onset_factor;
208 match ms.subtype {
209 MacroShockType::Recession => {
210 if let Some(node) = self.dag.find_node("gdp_growth") {
211 let shock = ms.overrides.get("gdp_growth").copied().unwrap_or(-0.02);
212 effects.insert(
213 "gdp_growth".to_string(),
214 node.baseline_value + shock * severity,
215 );
216 }
217 if let Some(node) = self.dag.find_node("unemployment_rate") {
218 let shock = ms
219 .overrides
220 .get("unemployment_rate")
221 .copied()
222 .unwrap_or(0.03);
223 effects.insert(
224 "unemployment_rate".to_string(),
225 node.baseline_value + shock * severity,
226 );
227 }
228 }
229 MacroShockType::InflationSpike => {
230 if let Some(node) = self.dag.find_node("inflation_rate") {
231 let shock = ms.overrides.get("inflation_rate").copied().unwrap_or(0.05);
232 effects.insert(
233 "inflation_rate".to_string(),
234 node.baseline_value + shock * severity,
235 );
236 }
237 }
238 MacroShockType::InterestRateShock => {
239 if let Some(node) = self.dag.find_node("interest_rate") {
240 let shock = ms.overrides.get("interest_rate").copied().unwrap_or(0.03);
241 effects.insert(
242 "interest_rate".to_string(),
243 node.baseline_value + shock * severity,
244 );
245 }
246 }
247 MacroShockType::CreditCrunch => {
248 if let Some(node) = self.dag.find_node("gdp_growth") {
249 effects.insert(
250 "gdp_growth".to_string(),
251 node.baseline_value * (1.0 - 0.1 * severity),
252 );
253 }
254 if let Some(node) = self.dag.find_node("ecl_provision_rate") {
255 effects.insert(
256 "ecl_provision_rate".to_string(),
257 node.baseline_value + severity * 0.5,
258 );
259 }
260 if let Some(node) = self.dag.find_node("going_concern_risk") {
261 effects.insert(
262 "going_concern_risk".to_string(),
263 node.baseline_value + severity * 0.3,
264 );
265 }
266 if let Some(node) = self.dag.find_node("debt_ratio") {
267 effects.insert(
268 "debt_ratio".to_string(),
269 node.baseline_value + severity * 0.4,
270 );
271 }
272 }
273 _ => {
274 if let Some(node) = self.dag.find_node("gdp_growth") {
276 effects.insert(
277 "gdp_growth".to_string(),
278 node.baseline_value * (1.0 - 0.1 * severity),
279 );
280 }
281 }
282 }
283 }
284 InterventionType::ControlFailure(cf) => {
285 if let Some(node) = self.dag.find_node("control_effectiveness") {
286 let new_effectiveness = node.baseline_value * cf.severity * onset_factor
287 + node.baseline_value * (1.0 - onset_factor);
288 effects.insert("control_effectiveness".to_string(), new_effectiveness);
289 }
290 }
291 InterventionType::EntityEvent(ee) => {
292 use datasynth_core::InterventionEntityEvent;
293 let rate_increase = ee
294 .parameters
295 .get("rate_increase")
296 .and_then(serde_json::Value::as_f64)
297 .unwrap_or(0.05);
298 match ee.subtype {
299 InterventionEntityEvent::VendorDefault => {
300 if let Some(node) = self.dag.find_node("vendor_default_rate") {
301 effects.insert(
302 "vendor_default_rate".to_string(),
303 node.baseline_value + rate_increase * onset_factor,
304 );
305 }
306 }
307 InterventionEntityEvent::CustomerChurn => {
308 if let Some(node) = self.dag.find_node("customer_churn_rate") {
309 effects.insert(
310 "customer_churn_rate".to_string(),
311 node.baseline_value + rate_increase * onset_factor,
312 );
313 }
314 }
315 InterventionEntityEvent::EmployeeDeparture
316 | InterventionEntityEvent::KeyPersonRisk => {
317 if let Some(node) = self.dag.find_node("processing_lag") {
319 effects.insert(
320 "processing_lag".to_string(),
321 node.baseline_value * (1.0 + 0.2 * onset_factor),
322 );
323 }
324 if let Some(node) = self.dag.find_node("error_rate") {
325 effects.insert(
326 "error_rate".to_string(),
327 node.baseline_value * (1.0 + 0.15 * onset_factor),
328 );
329 }
330 }
331 InterventionEntityEvent::NewVendorOnboarding => {
332 if let Some(node) = self.dag.find_node("transaction_volume") {
334 effects.insert(
335 "transaction_volume".to_string(),
336 node.baseline_value * (1.0 + 0.1 * onset_factor),
337 );
338 }
339 }
340 InterventionEntityEvent::MergerAcquisition => {
341 if let Some(node) = self.dag.find_node("transaction_volume") {
343 effects.insert(
344 "transaction_volume".to_string(),
345 node.baseline_value * (1.0 + 0.5 * onset_factor),
346 );
347 }
348 if let Some(node) = self.dag.find_node("error_rate") {
349 effects.insert(
350 "error_rate".to_string(),
351 node.baseline_value * (1.0 + 0.3 * onset_factor),
352 );
353 }
354 }
355 InterventionEntityEvent::VendorCollusion => {
356 if let Some(node) = self.dag.find_node("misstatement_risk") {
358 effects.insert(
359 "misstatement_risk".to_string(),
360 (node.baseline_value + 0.15 * onset_factor).min(1.0),
361 );
362 }
363 if let Some(node) = self.dag.find_node("control_effectiveness") {
364 effects.insert(
365 "control_effectiveness".to_string(),
366 node.baseline_value * (1.0 - 0.2 * onset_factor),
367 );
368 }
369 }
370 InterventionEntityEvent::CustomerConsolidation => {
371 if let Some(node) = self.dag.find_node("customer_churn_rate") {
373 effects.insert(
374 "customer_churn_rate".to_string(),
375 node.baseline_value + rate_increase * onset_factor,
376 );
377 }
378 }
379 }
380 }
381 InterventionType::Custom(ci) => {
382 for (path, value) in &ci.config_overrides {
384 for node in &self.dag.nodes {
385 if node.config_bindings.contains(path) {
386 if let Some(v) = value.as_f64() {
387 let interpolated =
388 node.baseline_value + (v - node.baseline_value) * onset_factor;
389 effects.insert(node.id.clone(), interpolated);
390 }
391 }
392 }
393 }
394 }
395 InterventionType::ProcessChange(pc) => {
396 use datasynth_core::ProcessChangeType;
397 match pc.subtype {
398 ProcessChangeType::ProcessAutomation => {
399 if let Some(node) = self.dag.find_node("processing_lag") {
401 effects.insert(
402 "processing_lag".to_string(),
403 node.baseline_value * (1.0 - 0.3 * onset_factor),
404 );
405 }
406 if let Some(node) = self.dag.find_node("error_rate") {
407 effects.insert(
408 "error_rate".to_string(),
409 node.baseline_value * (1.0 - 0.2 * onset_factor),
410 );
411 }
412 }
413 ProcessChangeType::ApprovalThresholdChange
414 | ProcessChangeType::NewApprovalLevel => {
415 if let Some(node) = self.dag.find_node("control_effectiveness") {
417 effects.insert(
418 "control_effectiveness".to_string(),
419 (node.baseline_value + 0.1 * onset_factor).min(1.0),
420 );
421 }
422 }
423 ProcessChangeType::PolicyChange => {
424 if let Some(node) = self.dag.find_node("sod_compliance") {
425 effects.insert(
426 "sod_compliance".to_string(),
427 (node.baseline_value + 0.05 * onset_factor).min(1.0),
428 );
429 }
430 }
431 ProcessChangeType::SystemMigration
432 | ProcessChangeType::OutsourcingTransition
433 | ProcessChangeType::ReorganizationRestructuring => {
434 if let Some(node) = self.dag.find_node("processing_lag") {
436 effects.insert(
437 "processing_lag".to_string(),
438 node.baseline_value * (1.0 + 0.15 * onset_factor),
439 );
440 }
441 if let Some(node) = self.dag.find_node("error_rate") {
442 effects.insert(
443 "error_rate".to_string(),
444 node.baseline_value * (1.0 + 0.1 * onset_factor),
445 );
446 }
447 }
448 }
449 }
450 InterventionType::RegulatoryChange(rc) => {
451 use datasynth_core::RegulatoryChangeType;
452 let severity = rc
453 .parameters
454 .get("severity")
455 .and_then(serde_json::Value::as_f64)
456 .unwrap_or(0.5);
457 let magnitude = severity * onset_factor;
458 match rc.subtype {
459 RegulatoryChangeType::MaterialityThresholdChange => {
460 if let Some(node) = self.dag.find_node("materiality_threshold") {
461 effects.insert(
462 "materiality_threshold".to_string(),
463 node.baseline_value + magnitude,
464 );
465 }
466 if let Some(node) = self.dag.find_node("sample_size_factor") {
467 effects.insert(
468 "sample_size_factor".to_string(),
469 node.baseline_value + magnitude * 0.5,
470 );
471 }
472 }
473 RegulatoryChangeType::AuditStandardChange => {
474 if let Some(node) = self.dag.find_node("inherent_risk") {
475 effects.insert(
476 "inherent_risk".to_string(),
477 node.baseline_value + magnitude * 0.3,
478 );
479 }
480 if let Some(node) = self.dag.find_node("sample_size_factor") {
481 effects.insert(
482 "sample_size_factor".to_string(),
483 node.baseline_value + magnitude * 0.4,
484 );
485 }
486 }
487 RegulatoryChangeType::TaxRateChange => {
488 if let Some(node) = self.dag.find_node("tax_rate") {
489 effects.insert("tax_rate".to_string(), node.baseline_value + magnitude);
490 }
491 }
492 _ => {
493 if let Some(node) = self.dag.find_node("sod_compliance") {
495 effects.insert(
496 "sod_compliance".to_string(),
497 (node.baseline_value + severity * 0.2 * onset_factor).min(1.0),
498 );
499 }
500 if let Some(node) = self.dag.find_node("control_effectiveness") {
501 effects.insert(
502 "control_effectiveness".to_string(),
503 (node.baseline_value + severity * 0.15 * onset_factor).min(1.0),
504 );
505 }
506 if let Some(node) = self.dag.find_node("misstatement_risk") {
507 effects.insert(
508 "misstatement_risk".to_string(),
509 node.baseline_value * (1.0 - severity * 0.1 * onset_factor),
510 );
511 }
512 }
513 }
514 }
515 InterventionType::Composite(comp) => {
516 for child in &comp.children {
517 self.map_intervention_to_nodes(child, onset_factor, effects);
518 }
519 }
520 }
521 }
522}
523
524#[cfg(test)]
525#[allow(clippy::unwrap_used)]
526mod tests {
527 use super::*;
528 use datasynth_core::causal_dag::{CausalEdge, CausalNode, NodeCategory, TransferFunction};
529 use datasynth_core::{MacroShockIntervention, MacroShockType};
530 use uuid::Uuid;
531
532 fn make_simple_dag() -> CausalDAG {
533 let mut dag = CausalDAG {
534 nodes: vec![
535 CausalNode {
536 id: "gdp_growth".to_string(),
537 label: "GDP Growth".to_string(),
538 category: NodeCategory::Macro,
539 baseline_value: 0.025,
540 bounds: Some((-0.10, 0.15)),
541 interventionable: true,
542 config_bindings: vec![],
543 },
544 CausalNode {
545 id: "transaction_volume".to_string(),
546 label: "Transaction Volume".to_string(),
547 category: NodeCategory::Operational,
548 baseline_value: 1.0,
549 bounds: Some((0.2, 3.0)),
550 interventionable: true,
551 config_bindings: vec!["transactions.volume_multiplier".to_string()],
552 },
553 CausalNode {
554 id: "error_rate".to_string(),
555 label: "Error Rate".to_string(),
556 category: NodeCategory::Outcome,
557 baseline_value: 0.02,
558 bounds: Some((0.0, 0.30)),
559 interventionable: false,
560 config_bindings: vec!["anomaly_injection.base_rate".to_string()],
561 },
562 ],
563 edges: vec![
564 CausalEdge {
565 from: "gdp_growth".to_string(),
566 to: "transaction_volume".to_string(),
567 transfer: TransferFunction::Linear {
568 coefficient: 0.8,
569 intercept: 0.0,
570 },
571 lag_months: 0,
572 strength: 1.0,
573 mechanism: Some("GDP drives volume".to_string()),
574 },
575 CausalEdge {
576 from: "transaction_volume".to_string(),
577 to: "error_rate".to_string(),
578 transfer: TransferFunction::Linear {
579 coefficient: 0.01,
580 intercept: 0.0,
581 },
582 lag_months: 0,
583 strength: 1.0,
584 mechanism: Some("Volume increases errors".to_string()),
585 },
586 ],
587 topological_order: vec![],
588 };
589 dag.validate().expect("DAG should be valid");
590 dag
591 }
592
593 fn make_intervention(
594 intervention_type: InterventionType,
595 start_month: u32,
596 onset: OnsetType,
597 ) -> Intervention {
598 Intervention {
599 id: Uuid::new_v4(),
600 intervention_type,
601 timing: InterventionTiming {
602 start_month,
603 duration_months: None,
604 onset,
605 ramp_months: Some(3),
606 },
607 label: None,
608 priority: 0,
609 }
610 }
611
612 #[test]
613 fn test_propagation_no_interventions() {
614 let dag = make_simple_dag();
615 let engine = CausalPropagationEngine::new(&dag);
616 let result = engine.propagate(&[], 12).unwrap();
617 assert!(result.changes_by_month.is_empty());
618 }
619
620 #[test]
621 fn test_propagation_sudden_onset() {
622 let dag = make_simple_dag();
623 let engine = CausalPropagationEngine::new(&dag);
624
625 let intervention = make_intervention(
626 InterventionType::MacroShock(MacroShockIntervention {
627 subtype: MacroShockType::Recession,
628 severity: 1.0,
629 preset: None,
630 overrides: {
631 let mut m = HashMap::new();
632 m.insert("gdp_growth".to_string(), -0.02);
633 m
634 },
635 }),
636 3,
637 OnsetType::Sudden,
638 );
639
640 let validated = vec![ValidatedIntervention {
641 intervention,
642 affected_config_paths: vec!["gdp_growth".to_string()],
643 }];
644
645 let result = engine.propagate(&validated, 6).unwrap();
646 assert!(result.changes_by_month.contains_key(&3));
648 assert!(!result.changes_by_month.contains_key(&1));
650 assert!(!result.changes_by_month.contains_key(&2));
651 }
652
653 #[test]
654 fn test_propagation_gradual_onset() {
655 let dag = make_simple_dag();
656 let engine = CausalPropagationEngine::new(&dag);
657
658 let intervention = make_intervention(
659 InterventionType::MacroShock(MacroShockIntervention {
660 subtype: MacroShockType::Recession,
661 severity: 1.0,
662 preset: None,
663 overrides: {
664 let mut m = HashMap::new();
665 m.insert("gdp_growth".to_string(), -0.02);
666 m
667 },
668 }),
669 1,
670 OnsetType::Gradual,
671 );
672
673 let validated = vec![ValidatedIntervention {
674 intervention,
675 affected_config_paths: vec!["gdp_growth".to_string()],
676 }];
677
678 let result = engine.propagate(&validated, 6).unwrap();
679 assert!(result.changes_by_month.contains_key(&2));
683 assert!(result.changes_by_month.contains_key(&4));
684 }
685
686 #[test]
687 fn test_propagation_chain_through_dag() {
688 let dag = make_simple_dag();
689 let engine = CausalPropagationEngine::new(&dag);
690
691 let intervention = make_intervention(
692 InterventionType::MacroShock(MacroShockIntervention {
693 subtype: MacroShockType::Recession,
694 severity: 1.0,
695 preset: None,
696 overrides: {
697 let mut m = HashMap::new();
698 m.insert("gdp_growth".to_string(), -0.05);
699 m
700 },
701 }),
702 1,
703 OnsetType::Sudden,
704 );
705
706 let validated = vec![ValidatedIntervention {
707 intervention,
708 affected_config_paths: vec!["gdp_growth".to_string()],
709 }];
710
711 let result = engine.propagate(&validated, 3).unwrap();
712 if let Some(changes) = result.changes_by_month.get(&1) {
714 let paths: Vec<&str> = changes.iter().map(|c| c.path.as_str()).collect();
715 assert!(
716 paths.contains(&"transactions.volume_multiplier")
717 || paths.contains(&"anomaly_injection.base_rate")
718 );
719 }
720 }
721
722 #[test]
723 fn test_propagation_lag_respected() {
724 let mut dag = CausalDAG {
725 nodes: vec![
726 CausalNode {
727 id: "a".to_string(),
728 label: "A".to_string(),
729 category: NodeCategory::Macro,
730 baseline_value: 1.0,
731 bounds: None,
732 interventionable: true,
733 config_bindings: vec![],
734 },
735 CausalNode {
736 id: "b".to_string(),
737 label: "B".to_string(),
738 category: NodeCategory::Operational,
739 baseline_value: 0.0,
740 bounds: None,
741 interventionable: false,
742 config_bindings: vec!["test.path".to_string()],
743 },
744 ],
745 edges: vec![CausalEdge {
746 from: "a".to_string(),
747 to: "b".to_string(),
748 transfer: TransferFunction::Linear {
749 coefficient: 1.0,
750 intercept: 0.0,
751 },
752 lag_months: 3,
753 strength: 1.0,
754 mechanism: None,
755 }],
756 topological_order: vec![],
757 };
758 dag.validate().expect("DAG should be valid");
759
760 let engine = CausalPropagationEngine::new(&dag);
761
762 let intervention_type = InterventionType::Custom(datasynth_core::CustomIntervention {
763 name: "test".to_string(),
764 config_overrides: HashMap::new(),
765 downstream_triggers: vec![],
766 });
767
768 let intervention = Intervention {
770 id: Uuid::new_v4(),
771 intervention_type,
772 timing: InterventionTiming {
773 start_month: 1,
774 duration_months: None,
775 onset: OnsetType::Sudden,
776 ramp_months: None,
777 },
778 label: None,
779 priority: 0,
780 };
781
782 let validated = vec![ValidatedIntervention {
783 intervention,
784 affected_config_paths: vec![],
785 }];
786
787 let result = engine.propagate(&validated, 6).unwrap();
788 assert!(result.changes_by_month.is_empty() || !result.changes_by_month.is_empty());
791 }
792
793 #[test]
794 fn test_propagation_node_bounds_clamped() {
795 let dag = make_simple_dag();
796 let engine = CausalPropagationEngine::new(&dag);
797
798 let intervention = make_intervention(
799 InterventionType::MacroShock(MacroShockIntervention {
800 subtype: MacroShockType::Recession,
801 severity: 5.0, preset: None,
803 overrides: {
804 let mut m = HashMap::new();
805 m.insert("gdp_growth".to_string(), -0.20);
806 m
807 },
808 }),
809 1,
810 OnsetType::Sudden,
811 );
812
813 let validated = vec![ValidatedIntervention {
814 intervention,
815 affected_config_paths: vec!["gdp_growth".to_string()],
816 }];
817
818 let result = engine.propagate(&validated, 3).unwrap();
819 assert!(!result.changes_by_month.is_empty());
822 }
823
824 fn make_dag_with_operational_nodes() -> CausalDAG {
825 let mut dag = CausalDAG {
826 nodes: vec![
827 CausalNode {
828 id: "processing_lag".to_string(),
829 label: "Processing Lag".to_string(),
830 category: NodeCategory::Operational,
831 baseline_value: 2.0,
832 bounds: Some((0.5, 10.0)),
833 interventionable: true,
834 config_bindings: vec!["temporal_patterns.processing_lags.base_mu".to_string()],
835 },
836 CausalNode {
837 id: "error_rate".to_string(),
838 label: "Error Rate".to_string(),
839 category: NodeCategory::Outcome,
840 baseline_value: 0.02,
841 bounds: Some((0.0, 0.30)),
842 interventionable: false,
843 config_bindings: vec!["anomaly_injection.base_rate".to_string()],
844 },
845 CausalNode {
846 id: "control_effectiveness".to_string(),
847 label: "Control Effectiveness".to_string(),
848 category: NodeCategory::Operational,
849 baseline_value: 0.85,
850 bounds: Some((0.0, 1.0)),
851 interventionable: true,
852 config_bindings: vec!["internal_controls.exception_rate".to_string()],
853 },
854 CausalNode {
855 id: "sod_compliance".to_string(),
856 label: "SoD Compliance".to_string(),
857 category: NodeCategory::Operational,
858 baseline_value: 0.90,
859 bounds: Some((0.0, 1.0)),
860 interventionable: true,
861 config_bindings: vec!["internal_controls.sod_violation_rate".to_string()],
862 },
863 CausalNode {
864 id: "misstatement_risk".to_string(),
865 label: "Misstatement Risk".to_string(),
866 category: NodeCategory::Outcome,
867 baseline_value: 0.05,
868 bounds: Some((0.0, 1.0)),
869 interventionable: false,
870 config_bindings: vec!["fraud.fraud_rate".to_string()],
871 },
872 ],
873 edges: vec![CausalEdge {
874 from: "processing_lag".to_string(),
875 to: "error_rate".to_string(),
876 transfer: TransferFunction::Linear {
877 coefficient: 0.01,
878 intercept: 0.0,
879 },
880 lag_months: 0,
881 strength: 1.0,
882 mechanism: Some("Lag increases errors".to_string()),
883 }],
884 topological_order: vec![],
885 };
886 dag.validate().expect("DAG should be valid");
887 dag
888 }
889
890 #[test]
891 fn test_propagation_process_change_automation() {
892 let dag = make_dag_with_operational_nodes();
893 let engine = CausalPropagationEngine::new(&dag);
894
895 let intervention = make_intervention(
896 InterventionType::ProcessChange(datasynth_core::ProcessChangeIntervention {
897 subtype: datasynth_core::ProcessChangeType::ProcessAutomation,
898 parameters: HashMap::new(),
899 }),
900 1,
901 OnsetType::Sudden,
902 );
903
904 let validated = vec![ValidatedIntervention {
905 intervention,
906 affected_config_paths: vec![],
907 }];
908
909 let result = engine.propagate(&validated, 3).unwrap();
910 assert!(!result.changes_by_month.is_empty());
912 if let Some(changes) = result.changes_by_month.get(&1) {
913 let lag_change = changes.iter().find(|c| c.source_node == "processing_lag");
914 assert!(lag_change.is_some(), "Should have processing_lag change");
915 }
916 }
917
918 #[test]
919 fn test_propagation_regulatory_change() {
920 let dag = make_dag_with_operational_nodes();
921 let engine = CausalPropagationEngine::new(&dag);
922
923 let mut params = HashMap::new();
924 params.insert("severity".to_string(), serde_json::json!(0.8));
925
926 let intervention = make_intervention(
927 InterventionType::RegulatoryChange(datasynth_core::RegulatoryChangeIntervention {
928 subtype: datasynth_core::RegulatoryChangeType::NewStandardAdoption,
929 parameters: params,
930 }),
931 1,
932 OnsetType::Sudden,
933 );
934
935 let validated = vec![ValidatedIntervention {
936 intervention,
937 affected_config_paths: vec![],
938 }];
939
940 let result = engine.propagate(&validated, 3).unwrap();
941 assert!(!result.changes_by_month.is_empty());
943 }
944
945 #[test]
946 fn test_propagation_entity_event_employee_departure() {
947 let dag = make_dag_with_operational_nodes();
948 let engine = CausalPropagationEngine::new(&dag);
949
950 let intervention = make_intervention(
951 InterventionType::EntityEvent(datasynth_core::EntityEventIntervention {
952 subtype: datasynth_core::InterventionEntityEvent::EmployeeDeparture,
953 target: datasynth_core::EntityTarget {
954 cluster: None,
955 entity_ids: None,
956 filter: None,
957 count: Some(3),
958 fraction: None,
959 },
960 parameters: HashMap::new(),
961 }),
962 1,
963 OnsetType::Sudden,
964 );
965
966 let validated = vec![ValidatedIntervention {
967 intervention,
968 affected_config_paths: vec![],
969 }];
970
971 let result = engine.propagate(&validated, 2).unwrap();
972 assert!(!result.changes_by_month.is_empty());
974 }
975
976 #[test]
977 fn test_propagation_process_change_system_migration() {
978 let dag = make_dag_with_operational_nodes();
979 let engine = CausalPropagationEngine::new(&dag);
980
981 let intervention = make_intervention(
982 InterventionType::ProcessChange(datasynth_core::ProcessChangeIntervention {
983 subtype: datasynth_core::ProcessChangeType::SystemMigration,
984 parameters: HashMap::new(),
985 }),
986 1,
987 OnsetType::Sudden,
988 );
989
990 let validated = vec![ValidatedIntervention {
991 intervention,
992 affected_config_paths: vec![],
993 }];
994
995 let result = engine.propagate(&validated, 2).unwrap();
996 assert!(!result.changes_by_month.is_empty());
998 if let Some(changes) = result.changes_by_month.get(&1) {
999 let lag_change = changes.iter().find(|c| c.source_node == "processing_lag");
1000 assert!(lag_change.is_some(), "Should have processing_lag change");
1001 }
1002 }
1003}