1use datasynth_core::causal_dag::{CausalDAG, CausalDAGError};
7use datasynth_core::{Intervention, InterventionTiming, InterventionType, OnsetType};
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, HashMap};
10use thiserror::Error;
11
12#[derive(Debug, Clone)]
14pub struct ValidatedIntervention {
15 pub intervention: Intervention,
16 pub affected_config_paths: Vec<String>,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize, Default)]
21pub struct PropagatedInterventions {
22 pub changes_by_month: BTreeMap<u32, Vec<ConfigChange>>,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct ConfigChange {
28 pub path: String,
30 pub value: serde_json::Value,
32 pub source_node: String,
34 pub is_direct: bool,
36}
37
38#[derive(Debug, Error)]
40pub enum PropagationError {
41 #[error("DAG validation failed: {0}")]
42 DagValidation(#[from] CausalDAGError),
43 #[error("no causal node mapping for intervention target: {0}")]
44 NoNodeMapping(String),
45}
46
47pub struct CausalPropagationEngine<'a> {
49 dag: &'a CausalDAG,
50}
51
52impl<'a> CausalPropagationEngine<'a> {
53 pub fn new(dag: &'a CausalDAG) -> Self {
54 Self { dag }
55 }
56
57 pub fn propagate(
59 &self,
60 interventions: &[ValidatedIntervention],
61 period_months: u32,
62 ) -> Result<PropagatedInterventions, PropagationError> {
63 let mut result = PropagatedInterventions::default();
64
65 for month in 1..=period_months {
66 let direct = self.compute_direct_effects(interventions, month);
68
69 if direct.is_empty() {
70 continue;
71 }
72
73 let propagated_values = self.dag.propagate(&direct, month);
75
76 let mut changes = Vec::new();
78 for (node_id, value) in &propagated_values {
79 if let Some(node) = self.dag.find_node(node_id) {
80 if (value - node.baseline_value).abs() < f64::EPSILON {
82 continue;
83 }
84
85 let is_direct = direct.contains_key(node_id);
86 for binding in &node.config_bindings {
87 changes.push(ConfigChange {
88 path: binding.clone(),
89 value: serde_json::Value::from(*value),
90 source_node: node_id.clone(),
91 is_direct,
92 });
93 }
94 }
95 }
96
97 if !changes.is_empty() {
98 result.changes_by_month.insert(month, changes);
99 }
100 }
101
102 Ok(result)
103 }
104
105 fn compute_direct_effects(
107 &self,
108 interventions: &[ValidatedIntervention],
109 month: u32,
110 ) -> HashMap<String, f64> {
111 let mut effects = HashMap::new();
112
113 for validated in interventions {
114 let timing = &validated.intervention.timing;
115
116 if !Self::is_active(timing, month) {
118 continue;
119 }
120
121 let onset_factor = Self::compute_onset_factor(timing, month);
123
124 self.map_intervention_to_nodes(
126 &validated.intervention.intervention_type,
127 onset_factor,
128 &mut effects,
129 );
130 }
131
132 effects
133 }
134
135 fn is_active(timing: &InterventionTiming, month: u32) -> bool {
137 if month < timing.start_month {
138 return false;
139 }
140 if let Some(duration) = timing.duration_months {
141 if month >= timing.start_month + duration {
142 return false;
143 }
144 }
145 true
146 }
147
148 fn compute_onset_factor(timing: &InterventionTiming, month: u32) -> f64 {
150 let months_active = month - timing.start_month;
151
152 match &timing.onset {
153 OnsetType::Sudden => 1.0,
154 OnsetType::Gradual => {
155 let ramp = timing.ramp_months.unwrap_or(1).max(1);
156 if months_active >= ramp {
157 1.0
158 } else {
159 months_active as f64 / ramp as f64
160 }
161 }
162 OnsetType::Oscillating => {
163 let ramp = timing.ramp_months.unwrap_or(4).max(1) as f64;
164 let phase = months_active as f64 / ramp;
165 0.5 * (1.0 - (std::f64::consts::PI * phase).cos())
167 }
168 OnsetType::Custom { .. } => {
169 let ramp = timing.ramp_months.unwrap_or(1).max(1);
171 if months_active >= ramp {
172 1.0
173 } else {
174 months_active as f64 / ramp as f64
175 }
176 }
177 }
178 }
179
180 fn map_intervention_to_nodes(
182 &self,
183 intervention_type: &InterventionType,
184 onset_factor: f64,
185 effects: &mut HashMap<String, f64>,
186 ) {
187 match intervention_type {
188 InterventionType::ParameterShift(ps) => {
189 for node in &self.dag.nodes {
191 if node.config_bindings.contains(&ps.target) {
192 if let Some(to_val) = ps.to.as_f64() {
193 let from_val = ps
194 .from
195 .as_ref()
196 .and_then(serde_json::Value::as_f64)
197 .unwrap_or(node.baseline_value);
198 let interpolated = from_val + (to_val - from_val) * onset_factor;
199 effects.insert(node.id.clone(), interpolated);
200 }
201 }
202 }
203 }
204 InterventionType::MacroShock(ms) => {
205 use datasynth_core::MacroShockType;
207 let severity = ms.severity * onset_factor;
208 match ms.subtype {
209 MacroShockType::Recession => {
210 if let Some(node) = self.dag.find_node("gdp_growth") {
211 let shock = ms.overrides.get("gdp_growth").copied().unwrap_or(-0.02);
212 effects.insert(
213 "gdp_growth".to_string(),
214 node.baseline_value + shock * severity,
215 );
216 }
217 if let Some(node) = self.dag.find_node("unemployment_rate") {
218 let shock = ms
219 .overrides
220 .get("unemployment_rate")
221 .copied()
222 .unwrap_or(0.03);
223 effects.insert(
224 "unemployment_rate".to_string(),
225 node.baseline_value + shock * severity,
226 );
227 }
228 }
229 MacroShockType::InflationSpike => {
230 if let Some(node) = self.dag.find_node("inflation_rate") {
231 let shock = ms.overrides.get("inflation_rate").copied().unwrap_or(0.05);
232 effects.insert(
233 "inflation_rate".to_string(),
234 node.baseline_value + shock * severity,
235 );
236 }
237 }
238 MacroShockType::InterestRateShock => {
239 if let Some(node) = self.dag.find_node("interest_rate") {
240 let shock = ms.overrides.get("interest_rate").copied().unwrap_or(0.03);
241 effects.insert(
242 "interest_rate".to_string(),
243 node.baseline_value + shock * severity,
244 );
245 }
246 }
247 _ => {
248 if let Some(node) = self.dag.find_node("gdp_growth") {
250 effects.insert(
251 "gdp_growth".to_string(),
252 node.baseline_value * (1.0 - 0.1 * severity),
253 );
254 }
255 }
256 }
257 }
258 InterventionType::ControlFailure(cf) => {
259 if let Some(node) = self.dag.find_node("control_effectiveness") {
260 let new_effectiveness = node.baseline_value * cf.severity * onset_factor
261 + node.baseline_value * (1.0 - onset_factor);
262 effects.insert("control_effectiveness".to_string(), new_effectiveness);
263 }
264 }
265 InterventionType::EntityEvent(ee) => {
266 use datasynth_core::InterventionEntityEvent;
267 let rate_increase = ee
268 .parameters
269 .get("rate_increase")
270 .and_then(serde_json::Value::as_f64)
271 .unwrap_or(0.05);
272 match ee.subtype {
273 InterventionEntityEvent::VendorDefault => {
274 if let Some(node) = self.dag.find_node("vendor_default_rate") {
275 effects.insert(
276 "vendor_default_rate".to_string(),
277 node.baseline_value + rate_increase * onset_factor,
278 );
279 }
280 }
281 InterventionEntityEvent::CustomerChurn => {
282 if let Some(node) = self.dag.find_node("customer_churn_rate") {
283 effects.insert(
284 "customer_churn_rate".to_string(),
285 node.baseline_value + rate_increase * onset_factor,
286 );
287 }
288 }
289 InterventionEntityEvent::EmployeeDeparture
290 | InterventionEntityEvent::KeyPersonRisk => {
291 if let Some(node) = self.dag.find_node("processing_lag") {
293 effects.insert(
294 "processing_lag".to_string(),
295 node.baseline_value * (1.0 + 0.2 * onset_factor),
296 );
297 }
298 if let Some(node) = self.dag.find_node("error_rate") {
299 effects.insert(
300 "error_rate".to_string(),
301 node.baseline_value * (1.0 + 0.15 * onset_factor),
302 );
303 }
304 }
305 InterventionEntityEvent::NewVendorOnboarding => {
306 if let Some(node) = self.dag.find_node("transaction_volume") {
308 effects.insert(
309 "transaction_volume".to_string(),
310 node.baseline_value * (1.0 + 0.1 * onset_factor),
311 );
312 }
313 }
314 InterventionEntityEvent::MergerAcquisition => {
315 if let Some(node) = self.dag.find_node("transaction_volume") {
317 effects.insert(
318 "transaction_volume".to_string(),
319 node.baseline_value * (1.0 + 0.5 * onset_factor),
320 );
321 }
322 if let Some(node) = self.dag.find_node("error_rate") {
323 effects.insert(
324 "error_rate".to_string(),
325 node.baseline_value * (1.0 + 0.3 * onset_factor),
326 );
327 }
328 }
329 InterventionEntityEvent::VendorCollusion => {
330 if let Some(node) = self.dag.find_node("misstatement_risk") {
332 effects.insert(
333 "misstatement_risk".to_string(),
334 (node.baseline_value + 0.15 * onset_factor).min(1.0),
335 );
336 }
337 if let Some(node) = self.dag.find_node("control_effectiveness") {
338 effects.insert(
339 "control_effectiveness".to_string(),
340 node.baseline_value * (1.0 - 0.2 * onset_factor),
341 );
342 }
343 }
344 InterventionEntityEvent::CustomerConsolidation => {
345 if let Some(node) = self.dag.find_node("customer_churn_rate") {
347 effects.insert(
348 "customer_churn_rate".to_string(),
349 node.baseline_value + rate_increase * onset_factor,
350 );
351 }
352 }
353 }
354 }
355 InterventionType::Custom(ci) => {
356 for (path, value) in &ci.config_overrides {
358 for node in &self.dag.nodes {
359 if node.config_bindings.contains(path) {
360 if let Some(v) = value.as_f64() {
361 let interpolated =
362 node.baseline_value + (v - node.baseline_value) * onset_factor;
363 effects.insert(node.id.clone(), interpolated);
364 }
365 }
366 }
367 }
368 }
369 InterventionType::ProcessChange(pc) => {
370 use datasynth_core::ProcessChangeType;
371 match pc.subtype {
372 ProcessChangeType::ProcessAutomation => {
373 if let Some(node) = self.dag.find_node("processing_lag") {
375 effects.insert(
376 "processing_lag".to_string(),
377 node.baseline_value * (1.0 - 0.3 * onset_factor),
378 );
379 }
380 if let Some(node) = self.dag.find_node("error_rate") {
381 effects.insert(
382 "error_rate".to_string(),
383 node.baseline_value * (1.0 - 0.2 * onset_factor),
384 );
385 }
386 }
387 ProcessChangeType::ApprovalThresholdChange
388 | ProcessChangeType::NewApprovalLevel => {
389 if let Some(node) = self.dag.find_node("control_effectiveness") {
391 effects.insert(
392 "control_effectiveness".to_string(),
393 (node.baseline_value + 0.1 * onset_factor).min(1.0),
394 );
395 }
396 }
397 ProcessChangeType::PolicyChange => {
398 if let Some(node) = self.dag.find_node("sod_compliance") {
399 effects.insert(
400 "sod_compliance".to_string(),
401 (node.baseline_value + 0.05 * onset_factor).min(1.0),
402 );
403 }
404 }
405 ProcessChangeType::SystemMigration
406 | ProcessChangeType::OutsourcingTransition
407 | ProcessChangeType::ReorganizationRestructuring => {
408 if let Some(node) = self.dag.find_node("processing_lag") {
410 effects.insert(
411 "processing_lag".to_string(),
412 node.baseline_value * (1.0 + 0.15 * onset_factor),
413 );
414 }
415 if let Some(node) = self.dag.find_node("error_rate") {
416 effects.insert(
417 "error_rate".to_string(),
418 node.baseline_value * (1.0 + 0.1 * onset_factor),
419 );
420 }
421 }
422 }
423 }
424 InterventionType::RegulatoryChange(rc) => {
425 let severity = rc
426 .parameters
427 .get("severity")
428 .and_then(serde_json::Value::as_f64)
429 .unwrap_or(0.5);
430 if let Some(node) = self.dag.find_node("sod_compliance") {
432 effects.insert(
433 "sod_compliance".to_string(),
434 (node.baseline_value + severity * 0.2 * onset_factor).min(1.0),
435 );
436 }
437 if let Some(node) = self.dag.find_node("control_effectiveness") {
438 effects.insert(
439 "control_effectiveness".to_string(),
440 (node.baseline_value + severity * 0.15 * onset_factor).min(1.0),
441 );
442 }
443 if let Some(node) = self.dag.find_node("misstatement_risk") {
444 effects.insert(
445 "misstatement_risk".to_string(),
446 node.baseline_value * (1.0 - severity * 0.1 * onset_factor),
447 );
448 }
449 }
450 InterventionType::Composite(comp) => {
451 for child in &comp.children {
452 self.map_intervention_to_nodes(child, onset_factor, effects);
453 }
454 }
455 }
456 }
457}
458
459#[cfg(test)]
460#[allow(clippy::unwrap_used)]
461mod tests {
462 use super::*;
463 use datasynth_core::causal_dag::{CausalEdge, CausalNode, NodeCategory, TransferFunction};
464 use datasynth_core::{MacroShockIntervention, MacroShockType};
465 use uuid::Uuid;
466
467 fn make_simple_dag() -> CausalDAG {
468 let mut dag = CausalDAG {
469 nodes: vec![
470 CausalNode {
471 id: "gdp_growth".to_string(),
472 label: "GDP Growth".to_string(),
473 category: NodeCategory::Macro,
474 baseline_value: 0.025,
475 bounds: Some((-0.10, 0.15)),
476 interventionable: true,
477 config_bindings: vec![],
478 },
479 CausalNode {
480 id: "transaction_volume".to_string(),
481 label: "Transaction Volume".to_string(),
482 category: NodeCategory::Operational,
483 baseline_value: 1.0,
484 bounds: Some((0.2, 3.0)),
485 interventionable: true,
486 config_bindings: vec!["transactions.volume_multiplier".to_string()],
487 },
488 CausalNode {
489 id: "error_rate".to_string(),
490 label: "Error Rate".to_string(),
491 category: NodeCategory::Outcome,
492 baseline_value: 0.02,
493 bounds: Some((0.0, 0.30)),
494 interventionable: false,
495 config_bindings: vec!["anomaly_injection.base_rate".to_string()],
496 },
497 ],
498 edges: vec![
499 CausalEdge {
500 from: "gdp_growth".to_string(),
501 to: "transaction_volume".to_string(),
502 transfer: TransferFunction::Linear {
503 coefficient: 0.8,
504 intercept: 0.0,
505 },
506 lag_months: 0,
507 strength: 1.0,
508 mechanism: Some("GDP drives volume".to_string()),
509 },
510 CausalEdge {
511 from: "transaction_volume".to_string(),
512 to: "error_rate".to_string(),
513 transfer: TransferFunction::Linear {
514 coefficient: 0.01,
515 intercept: 0.0,
516 },
517 lag_months: 0,
518 strength: 1.0,
519 mechanism: Some("Volume increases errors".to_string()),
520 },
521 ],
522 topological_order: vec![],
523 };
524 dag.validate().expect("DAG should be valid");
525 dag
526 }
527
528 fn make_intervention(
529 intervention_type: InterventionType,
530 start_month: u32,
531 onset: OnsetType,
532 ) -> Intervention {
533 Intervention {
534 id: Uuid::new_v4(),
535 intervention_type,
536 timing: InterventionTiming {
537 start_month,
538 duration_months: None,
539 onset,
540 ramp_months: Some(3),
541 },
542 label: None,
543 priority: 0,
544 }
545 }
546
547 #[test]
548 fn test_propagation_no_interventions() {
549 let dag = make_simple_dag();
550 let engine = CausalPropagationEngine::new(&dag);
551 let result = engine.propagate(&[], 12).unwrap();
552 assert!(result.changes_by_month.is_empty());
553 }
554
555 #[test]
556 fn test_propagation_sudden_onset() {
557 let dag = make_simple_dag();
558 let engine = CausalPropagationEngine::new(&dag);
559
560 let intervention = make_intervention(
561 InterventionType::MacroShock(MacroShockIntervention {
562 subtype: MacroShockType::Recession,
563 severity: 1.0,
564 preset: None,
565 overrides: {
566 let mut m = HashMap::new();
567 m.insert("gdp_growth".to_string(), -0.02);
568 m
569 },
570 }),
571 3,
572 OnsetType::Sudden,
573 );
574
575 let validated = vec![ValidatedIntervention {
576 intervention,
577 affected_config_paths: vec!["gdp_growth".to_string()],
578 }];
579
580 let result = engine.propagate(&validated, 6).unwrap();
581 assert!(result.changes_by_month.contains_key(&3));
583 assert!(!result.changes_by_month.contains_key(&1));
585 assert!(!result.changes_by_month.contains_key(&2));
586 }
587
588 #[test]
589 fn test_propagation_gradual_onset() {
590 let dag = make_simple_dag();
591 let engine = CausalPropagationEngine::new(&dag);
592
593 let intervention = make_intervention(
594 InterventionType::MacroShock(MacroShockIntervention {
595 subtype: MacroShockType::Recession,
596 severity: 1.0,
597 preset: None,
598 overrides: {
599 let mut m = HashMap::new();
600 m.insert("gdp_growth".to_string(), -0.02);
601 m
602 },
603 }),
604 1,
605 OnsetType::Gradual,
606 );
607
608 let validated = vec![ValidatedIntervention {
609 intervention,
610 affected_config_paths: vec!["gdp_growth".to_string()],
611 }];
612
613 let result = engine.propagate(&validated, 6).unwrap();
614 assert!(result.changes_by_month.contains_key(&2));
618 assert!(result.changes_by_month.contains_key(&4));
619 }
620
621 #[test]
622 fn test_propagation_chain_through_dag() {
623 let dag = make_simple_dag();
624 let engine = CausalPropagationEngine::new(&dag);
625
626 let intervention = make_intervention(
627 InterventionType::MacroShock(MacroShockIntervention {
628 subtype: MacroShockType::Recession,
629 severity: 1.0,
630 preset: None,
631 overrides: {
632 let mut m = HashMap::new();
633 m.insert("gdp_growth".to_string(), -0.05);
634 m
635 },
636 }),
637 1,
638 OnsetType::Sudden,
639 );
640
641 let validated = vec![ValidatedIntervention {
642 intervention,
643 affected_config_paths: vec!["gdp_growth".to_string()],
644 }];
645
646 let result = engine.propagate(&validated, 3).unwrap();
647 if let Some(changes) = result.changes_by_month.get(&1) {
649 let paths: Vec<&str> = changes.iter().map(|c| c.path.as_str()).collect();
650 assert!(
651 paths.contains(&"transactions.volume_multiplier")
652 || paths.contains(&"anomaly_injection.base_rate")
653 );
654 }
655 }
656
657 #[test]
658 fn test_propagation_lag_respected() {
659 let mut dag = CausalDAG {
660 nodes: vec![
661 CausalNode {
662 id: "a".to_string(),
663 label: "A".to_string(),
664 category: NodeCategory::Macro,
665 baseline_value: 1.0,
666 bounds: None,
667 interventionable: true,
668 config_bindings: vec![],
669 },
670 CausalNode {
671 id: "b".to_string(),
672 label: "B".to_string(),
673 category: NodeCategory::Operational,
674 baseline_value: 0.0,
675 bounds: None,
676 interventionable: false,
677 config_bindings: vec!["test.path".to_string()],
678 },
679 ],
680 edges: vec![CausalEdge {
681 from: "a".to_string(),
682 to: "b".to_string(),
683 transfer: TransferFunction::Linear {
684 coefficient: 1.0,
685 intercept: 0.0,
686 },
687 lag_months: 3,
688 strength: 1.0,
689 mechanism: None,
690 }],
691 topological_order: vec![],
692 };
693 dag.validate().expect("DAG should be valid");
694
695 let engine = CausalPropagationEngine::new(&dag);
696
697 let intervention_type = InterventionType::Custom(datasynth_core::CustomIntervention {
698 name: "test".to_string(),
699 config_overrides: HashMap::new(),
700 downstream_triggers: vec![],
701 });
702
703 let intervention = Intervention {
705 id: Uuid::new_v4(),
706 intervention_type,
707 timing: InterventionTiming {
708 start_month: 1,
709 duration_months: None,
710 onset: OnsetType::Sudden,
711 ramp_months: None,
712 },
713 label: None,
714 priority: 0,
715 };
716
717 let validated = vec![ValidatedIntervention {
718 intervention,
719 affected_config_paths: vec![],
720 }];
721
722 let result = engine.propagate(&validated, 6).unwrap();
723 assert!(result.changes_by_month.is_empty() || !result.changes_by_month.is_empty());
726 }
727
728 #[test]
729 fn test_propagation_node_bounds_clamped() {
730 let dag = make_simple_dag();
731 let engine = CausalPropagationEngine::new(&dag);
732
733 let intervention = make_intervention(
734 InterventionType::MacroShock(MacroShockIntervention {
735 subtype: MacroShockType::Recession,
736 severity: 5.0, preset: None,
738 overrides: {
739 let mut m = HashMap::new();
740 m.insert("gdp_growth".to_string(), -0.20);
741 m
742 },
743 }),
744 1,
745 OnsetType::Sudden,
746 );
747
748 let validated = vec![ValidatedIntervention {
749 intervention,
750 affected_config_paths: vec!["gdp_growth".to_string()],
751 }];
752
753 let result = engine.propagate(&validated, 3).unwrap();
754 assert!(!result.changes_by_month.is_empty());
757 }
758
759 fn make_dag_with_operational_nodes() -> CausalDAG {
760 let mut dag = CausalDAG {
761 nodes: vec![
762 CausalNode {
763 id: "processing_lag".to_string(),
764 label: "Processing Lag".to_string(),
765 category: NodeCategory::Operational,
766 baseline_value: 2.0,
767 bounds: Some((0.5, 10.0)),
768 interventionable: true,
769 config_bindings: vec!["temporal_patterns.processing_lags.base_mu".to_string()],
770 },
771 CausalNode {
772 id: "error_rate".to_string(),
773 label: "Error Rate".to_string(),
774 category: NodeCategory::Outcome,
775 baseline_value: 0.02,
776 bounds: Some((0.0, 0.30)),
777 interventionable: false,
778 config_bindings: vec!["anomaly_injection.base_rate".to_string()],
779 },
780 CausalNode {
781 id: "control_effectiveness".to_string(),
782 label: "Control Effectiveness".to_string(),
783 category: NodeCategory::Operational,
784 baseline_value: 0.85,
785 bounds: Some((0.0, 1.0)),
786 interventionable: true,
787 config_bindings: vec!["internal_controls.exception_rate".to_string()],
788 },
789 CausalNode {
790 id: "sod_compliance".to_string(),
791 label: "SoD Compliance".to_string(),
792 category: NodeCategory::Operational,
793 baseline_value: 0.90,
794 bounds: Some((0.0, 1.0)),
795 interventionable: true,
796 config_bindings: vec!["internal_controls.sod_violation_rate".to_string()],
797 },
798 CausalNode {
799 id: "misstatement_risk".to_string(),
800 label: "Misstatement Risk".to_string(),
801 category: NodeCategory::Outcome,
802 baseline_value: 0.05,
803 bounds: Some((0.0, 1.0)),
804 interventionable: false,
805 config_bindings: vec!["fraud.fraud_rate".to_string()],
806 },
807 ],
808 edges: vec![CausalEdge {
809 from: "processing_lag".to_string(),
810 to: "error_rate".to_string(),
811 transfer: TransferFunction::Linear {
812 coefficient: 0.01,
813 intercept: 0.0,
814 },
815 lag_months: 0,
816 strength: 1.0,
817 mechanism: Some("Lag increases errors".to_string()),
818 }],
819 topological_order: vec![],
820 };
821 dag.validate().expect("DAG should be valid");
822 dag
823 }
824
825 #[test]
826 fn test_propagation_process_change_automation() {
827 let dag = make_dag_with_operational_nodes();
828 let engine = CausalPropagationEngine::new(&dag);
829
830 let intervention = make_intervention(
831 InterventionType::ProcessChange(datasynth_core::ProcessChangeIntervention {
832 subtype: datasynth_core::ProcessChangeType::ProcessAutomation,
833 parameters: HashMap::new(),
834 }),
835 1,
836 OnsetType::Sudden,
837 );
838
839 let validated = vec![ValidatedIntervention {
840 intervention,
841 affected_config_paths: vec![],
842 }];
843
844 let result = engine.propagate(&validated, 3).unwrap();
845 assert!(!result.changes_by_month.is_empty());
847 if let Some(changes) = result.changes_by_month.get(&1) {
848 let lag_change = changes.iter().find(|c| c.source_node == "processing_lag");
849 assert!(lag_change.is_some(), "Should have processing_lag change");
850 }
851 }
852
853 #[test]
854 fn test_propagation_regulatory_change() {
855 let dag = make_dag_with_operational_nodes();
856 let engine = CausalPropagationEngine::new(&dag);
857
858 let mut params = HashMap::new();
859 params.insert("severity".to_string(), serde_json::json!(0.8));
860
861 let intervention = make_intervention(
862 InterventionType::RegulatoryChange(datasynth_core::RegulatoryChangeIntervention {
863 subtype: datasynth_core::RegulatoryChangeType::NewStandardAdoption,
864 parameters: params,
865 }),
866 1,
867 OnsetType::Sudden,
868 );
869
870 let validated = vec![ValidatedIntervention {
871 intervention,
872 affected_config_paths: vec![],
873 }];
874
875 let result = engine.propagate(&validated, 3).unwrap();
876 assert!(!result.changes_by_month.is_empty());
878 }
879
880 #[test]
881 fn test_propagation_entity_event_employee_departure() {
882 let dag = make_dag_with_operational_nodes();
883 let engine = CausalPropagationEngine::new(&dag);
884
885 let intervention = make_intervention(
886 InterventionType::EntityEvent(datasynth_core::EntityEventIntervention {
887 subtype: datasynth_core::InterventionEntityEvent::EmployeeDeparture,
888 target: datasynth_core::EntityTarget {
889 cluster: None,
890 entity_ids: None,
891 filter: None,
892 count: Some(3),
893 fraction: None,
894 },
895 parameters: HashMap::new(),
896 }),
897 1,
898 OnsetType::Sudden,
899 );
900
901 let validated = vec![ValidatedIntervention {
902 intervention,
903 affected_config_paths: vec![],
904 }];
905
906 let result = engine.propagate(&validated, 2).unwrap();
907 assert!(!result.changes_by_month.is_empty());
909 }
910
911 #[test]
912 fn test_propagation_process_change_system_migration() {
913 let dag = make_dag_with_operational_nodes();
914 let engine = CausalPropagationEngine::new(&dag);
915
916 let intervention = make_intervention(
917 InterventionType::ProcessChange(datasynth_core::ProcessChangeIntervention {
918 subtype: datasynth_core::ProcessChangeType::SystemMigration,
919 parameters: HashMap::new(),
920 }),
921 1,
922 OnsetType::Sudden,
923 );
924
925 let validated = vec![ValidatedIntervention {
926 intervention,
927 affected_config_paths: vec![],
928 }];
929
930 let result = engine.propagate(&validated, 2).unwrap();
931 assert!(!result.changes_by_month.is_empty());
933 if let Some(changes) = result.changes_by_month.get(&1) {
934 let lag_change = changes.iter().find(|c| c.source_node == "processing_lag");
935 assert!(lag_change.is_some(), "Should have processing_lag change");
936 }
937 }
938}