use datasynth_core::causal_dag::{CausalDAG, CausalDAGError};
use datasynth_core::{Intervention, InterventionTiming, InterventionType, OnsetType};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use thiserror::Error;
#[derive(Debug, Clone)]
pub struct ValidatedIntervention {
pub intervention: Intervention,
pub affected_config_paths: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PropagatedInterventions {
pub changes_by_month: BTreeMap<u32, Vec<ConfigChange>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigChange {
pub path: String,
pub value: serde_json::Value,
pub source_node: String,
pub is_direct: bool,
}
#[derive(Debug, Error)]
pub enum PropagationError {
#[error("DAG validation failed: {0}")]
DagValidation(#[from] CausalDAGError),
#[error("no causal node mapping for intervention target: {0}")]
NoNodeMapping(String),
}
pub struct CausalPropagationEngine<'a> {
dag: &'a CausalDAG,
}
impl<'a> CausalPropagationEngine<'a> {
pub fn new(dag: &'a CausalDAG) -> Self {
Self { dag }
}
pub fn propagate(
&self,
interventions: &[ValidatedIntervention],
period_months: u32,
) -> Result<PropagatedInterventions, PropagationError> {
let mut result = PropagatedInterventions::default();
for month in 1..=period_months {
let direct = self.compute_direct_effects(interventions, month);
if direct.is_empty() {
continue;
}
let propagated_values = self.dag.propagate(&direct, month);
let mut changes = Vec::new();
for (node_id, value) in &propagated_values {
if let Some(node) = self.dag.find_node(node_id) {
if (value - node.baseline_value).abs() < f64::EPSILON {
continue;
}
let is_direct = direct.contains_key(node_id);
for binding in &node.config_bindings {
changes.push(ConfigChange {
path: binding.clone(),
value: serde_json::Value::from(*value),
source_node: node_id.clone(),
is_direct,
});
}
}
}
if !changes.is_empty() {
result.changes_by_month.insert(month, changes);
}
}
Ok(result)
}
fn compute_direct_effects(
&self,
interventions: &[ValidatedIntervention],
month: u32,
) -> HashMap<String, f64> {
let mut effects = HashMap::new();
for validated in interventions {
let timing = &validated.intervention.timing;
if !Self::is_active(timing, month) {
continue;
}
let onset_factor = Self::compute_onset_factor(timing, month);
self.map_intervention_to_nodes(
&validated.intervention.intervention_type,
onset_factor,
&mut effects,
);
}
effects
}
fn is_active(timing: &InterventionTiming, month: u32) -> bool {
if month < timing.start_month {
return false;
}
if let Some(duration) = timing.duration_months {
if month >= timing.start_month + duration {
return false;
}
}
true
}
fn compute_onset_factor(timing: &InterventionTiming, month: u32) -> f64 {
let months_active = month - timing.start_month;
match &timing.onset {
OnsetType::Sudden => 1.0,
OnsetType::Gradual => {
let ramp = timing.ramp_months.unwrap_or(1).max(1);
if months_active >= ramp {
1.0
} else {
months_active as f64 / ramp as f64
}
}
OnsetType::Oscillating => {
let ramp = timing.ramp_months.unwrap_or(4).max(1) as f64;
let phase = months_active as f64 / ramp;
0.5 * (1.0 - (std::f64::consts::PI * phase).cos())
}
OnsetType::Custom { .. } => {
let ramp = timing.ramp_months.unwrap_or(1).max(1);
if months_active >= ramp {
1.0
} else {
months_active as f64 / ramp as f64
}
}
}
}
fn map_intervention_to_nodes(
&self,
intervention_type: &InterventionType,
onset_factor: f64,
effects: &mut HashMap<String, f64>,
) {
match intervention_type {
InterventionType::ParameterShift(ps) => {
for node in &self.dag.nodes {
if node.config_bindings.contains(&ps.target) {
if let Some(to_val) = ps.to.as_f64() {
let from_val = ps
.from
.as_ref()
.and_then(serde_json::Value::as_f64)
.unwrap_or(node.baseline_value);
let interpolated = from_val + (to_val - from_val) * onset_factor;
effects.insert(node.id.clone(), interpolated);
}
}
}
}
InterventionType::MacroShock(ms) => {
use datasynth_core::MacroShockType;
let severity = ms.severity * onset_factor;
match ms.subtype {
MacroShockType::Recession => {
if let Some(node) = self.dag.find_node("gdp_growth") {
let shock = ms.overrides.get("gdp_growth").copied().unwrap_or(-0.02);
effects.insert(
"gdp_growth".to_string(),
node.baseline_value + shock * severity,
);
}
if let Some(node) = self.dag.find_node("unemployment_rate") {
let shock = ms
.overrides
.get("unemployment_rate")
.copied()
.unwrap_or(0.03);
effects.insert(
"unemployment_rate".to_string(),
node.baseline_value + shock * severity,
);
}
}
MacroShockType::InflationSpike => {
if let Some(node) = self.dag.find_node("inflation_rate") {
let shock = ms.overrides.get("inflation_rate").copied().unwrap_or(0.05);
effects.insert(
"inflation_rate".to_string(),
node.baseline_value + shock * severity,
);
}
}
MacroShockType::InterestRateShock => {
if let Some(node) = self.dag.find_node("interest_rate") {
let shock = ms.overrides.get("interest_rate").copied().unwrap_or(0.03);
effects.insert(
"interest_rate".to_string(),
node.baseline_value + shock * severity,
);
}
}
MacroShockType::CreditCrunch => {
if let Some(node) = self.dag.find_node("gdp_growth") {
effects.insert(
"gdp_growth".to_string(),
node.baseline_value * (1.0 - 0.1 * severity),
);
}
if let Some(node) = self.dag.find_node("ecl_provision_rate") {
effects.insert(
"ecl_provision_rate".to_string(),
node.baseline_value + severity * 0.5,
);
}
if let Some(node) = self.dag.find_node("going_concern_risk") {
effects.insert(
"going_concern_risk".to_string(),
node.baseline_value + severity * 0.3,
);
}
if let Some(node) = self.dag.find_node("debt_ratio") {
effects.insert(
"debt_ratio".to_string(),
node.baseline_value + severity * 0.4,
);
}
}
_ => {
if let Some(node) = self.dag.find_node("gdp_growth") {
effects.insert(
"gdp_growth".to_string(),
node.baseline_value * (1.0 - 0.1 * severity),
);
}
}
}
}
InterventionType::ControlFailure(cf) => {
if let Some(node) = self.dag.find_node("control_effectiveness") {
let new_effectiveness = node.baseline_value * cf.severity * onset_factor
+ node.baseline_value * (1.0 - onset_factor);
effects.insert("control_effectiveness".to_string(), new_effectiveness);
}
}
InterventionType::EntityEvent(ee) => {
use datasynth_core::InterventionEntityEvent;
let rate_increase = ee
.parameters
.get("rate_increase")
.and_then(serde_json::Value::as_f64)
.unwrap_or(0.05);
match ee.subtype {
InterventionEntityEvent::VendorDefault => {
if let Some(node) = self.dag.find_node("vendor_default_rate") {
effects.insert(
"vendor_default_rate".to_string(),
node.baseline_value + rate_increase * onset_factor,
);
}
}
InterventionEntityEvent::CustomerChurn => {
if let Some(node) = self.dag.find_node("customer_churn_rate") {
effects.insert(
"customer_churn_rate".to_string(),
node.baseline_value + rate_increase * onset_factor,
);
}
}
InterventionEntityEvent::EmployeeDeparture
| InterventionEntityEvent::KeyPersonRisk => {
if let Some(node) = self.dag.find_node("processing_lag") {
effects.insert(
"processing_lag".to_string(),
node.baseline_value * (1.0 + 0.2 * onset_factor),
);
}
if let Some(node) = self.dag.find_node("error_rate") {
effects.insert(
"error_rate".to_string(),
node.baseline_value * (1.0 + 0.15 * onset_factor),
);
}
}
InterventionEntityEvent::NewVendorOnboarding => {
if let Some(node) = self.dag.find_node("transaction_volume") {
effects.insert(
"transaction_volume".to_string(),
node.baseline_value * (1.0 + 0.1 * onset_factor),
);
}
}
InterventionEntityEvent::MergerAcquisition => {
if let Some(node) = self.dag.find_node("transaction_volume") {
effects.insert(
"transaction_volume".to_string(),
node.baseline_value * (1.0 + 0.5 * onset_factor),
);
}
if let Some(node) = self.dag.find_node("error_rate") {
effects.insert(
"error_rate".to_string(),
node.baseline_value * (1.0 + 0.3 * onset_factor),
);
}
}
InterventionEntityEvent::VendorCollusion => {
if let Some(node) = self.dag.find_node("misstatement_risk") {
effects.insert(
"misstatement_risk".to_string(),
(node.baseline_value + 0.15 * onset_factor).min(1.0),
);
}
if let Some(node) = self.dag.find_node("control_effectiveness") {
effects.insert(
"control_effectiveness".to_string(),
node.baseline_value * (1.0 - 0.2 * onset_factor),
);
}
}
InterventionEntityEvent::CustomerConsolidation => {
if let Some(node) = self.dag.find_node("customer_churn_rate") {
effects.insert(
"customer_churn_rate".to_string(),
node.baseline_value + rate_increase * onset_factor,
);
}
}
}
}
InterventionType::Custom(ci) => {
for (path, value) in &ci.config_overrides {
for node in &self.dag.nodes {
if node.config_bindings.contains(path) {
if let Some(v) = value.as_f64() {
let interpolated =
node.baseline_value + (v - node.baseline_value) * onset_factor;
effects.insert(node.id.clone(), interpolated);
}
}
}
}
}
InterventionType::ProcessChange(pc) => {
use datasynth_core::ProcessChangeType;
match pc.subtype {
ProcessChangeType::ProcessAutomation => {
if let Some(node) = self.dag.find_node("processing_lag") {
effects.insert(
"processing_lag".to_string(),
node.baseline_value * (1.0 - 0.3 * onset_factor),
);
}
if let Some(node) = self.dag.find_node("error_rate") {
effects.insert(
"error_rate".to_string(),
node.baseline_value * (1.0 - 0.2 * onset_factor),
);
}
}
ProcessChangeType::ApprovalThresholdChange
| ProcessChangeType::NewApprovalLevel => {
if let Some(node) = self.dag.find_node("control_effectiveness") {
effects.insert(
"control_effectiveness".to_string(),
(node.baseline_value + 0.1 * onset_factor).min(1.0),
);
}
}
ProcessChangeType::PolicyChange => {
if let Some(node) = self.dag.find_node("sod_compliance") {
effects.insert(
"sod_compliance".to_string(),
(node.baseline_value + 0.05 * onset_factor).min(1.0),
);
}
}
ProcessChangeType::SystemMigration
| ProcessChangeType::OutsourcingTransition
| ProcessChangeType::ReorganizationRestructuring => {
if let Some(node) = self.dag.find_node("processing_lag") {
effects.insert(
"processing_lag".to_string(),
node.baseline_value * (1.0 + 0.15 * onset_factor),
);
}
if let Some(node) = self.dag.find_node("error_rate") {
effects.insert(
"error_rate".to_string(),
node.baseline_value * (1.0 + 0.1 * onset_factor),
);
}
}
}
}
InterventionType::RegulatoryChange(rc) => {
use datasynth_core::RegulatoryChangeType;
let severity = rc
.parameters
.get("severity")
.and_then(serde_json::Value::as_f64)
.unwrap_or(0.5);
let magnitude = severity * onset_factor;
match rc.subtype {
RegulatoryChangeType::MaterialityThresholdChange => {
if let Some(node) = self.dag.find_node("materiality_threshold") {
effects.insert(
"materiality_threshold".to_string(),
node.baseline_value + magnitude,
);
}
if let Some(node) = self.dag.find_node("sample_size_factor") {
effects.insert(
"sample_size_factor".to_string(),
node.baseline_value + magnitude * 0.5,
);
}
}
RegulatoryChangeType::AuditStandardChange => {
if let Some(node) = self.dag.find_node("inherent_risk") {
effects.insert(
"inherent_risk".to_string(),
node.baseline_value + magnitude * 0.3,
);
}
if let Some(node) = self.dag.find_node("sample_size_factor") {
effects.insert(
"sample_size_factor".to_string(),
node.baseline_value + magnitude * 0.4,
);
}
}
RegulatoryChangeType::TaxRateChange => {
if let Some(node) = self.dag.find_node("tax_rate") {
effects.insert("tax_rate".to_string(), node.baseline_value + magnitude);
}
}
_ => {
if let Some(node) = self.dag.find_node("sod_compliance") {
effects.insert(
"sod_compliance".to_string(),
(node.baseline_value + severity * 0.2 * onset_factor).min(1.0),
);
}
if let Some(node) = self.dag.find_node("control_effectiveness") {
effects.insert(
"control_effectiveness".to_string(),
(node.baseline_value + severity * 0.15 * onset_factor).min(1.0),
);
}
if let Some(node) = self.dag.find_node("misstatement_risk") {
effects.insert(
"misstatement_risk".to_string(),
node.baseline_value * (1.0 - severity * 0.1 * onset_factor),
);
}
}
}
}
InterventionType::Composite(comp) => {
for child in &comp.children {
self.map_intervention_to_nodes(child, onset_factor, effects);
}
}
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use datasynth_core::causal_dag::{CausalEdge, CausalNode, NodeCategory, TransferFunction};
use datasynth_core::{MacroShockIntervention, MacroShockType};
use uuid::Uuid;
fn make_simple_dag() -> CausalDAG {
let mut dag = CausalDAG {
nodes: vec![
CausalNode {
id: "gdp_growth".to_string(),
label: "GDP Growth".to_string(),
category: NodeCategory::Macro,
baseline_value: 0.025,
bounds: Some((-0.10, 0.15)),
interventionable: true,
config_bindings: vec![],
},
CausalNode {
id: "transaction_volume".to_string(),
label: "Transaction Volume".to_string(),
category: NodeCategory::Operational,
baseline_value: 1.0,
bounds: Some((0.2, 3.0)),
interventionable: true,
config_bindings: vec!["transactions.volume_multiplier".to_string()],
},
CausalNode {
id: "error_rate".to_string(),
label: "Error Rate".to_string(),
category: NodeCategory::Outcome,
baseline_value: 0.02,
bounds: Some((0.0, 0.30)),
interventionable: false,
config_bindings: vec!["anomaly_injection.base_rate".to_string()],
},
],
edges: vec![
CausalEdge {
from: "gdp_growth".to_string(),
to: "transaction_volume".to_string(),
transfer: TransferFunction::Linear {
coefficient: 0.8,
intercept: 0.0,
},
lag_months: 0,
strength: 1.0,
mechanism: Some("GDP drives volume".to_string()),
},
CausalEdge {
from: "transaction_volume".to_string(),
to: "error_rate".to_string(),
transfer: TransferFunction::Linear {
coefficient: 0.01,
intercept: 0.0,
},
lag_months: 0,
strength: 1.0,
mechanism: Some("Volume increases errors".to_string()),
},
],
topological_order: vec![],
};
dag.validate().expect("DAG should be valid");
dag
}
fn make_intervention(
intervention_type: InterventionType,
start_month: u32,
onset: OnsetType,
) -> Intervention {
Intervention {
id: Uuid::new_v4(),
intervention_type,
timing: InterventionTiming {
start_month,
duration_months: None,
onset,
ramp_months: Some(3),
},
label: None,
priority: 0,
}
}
#[test]
fn test_propagation_no_interventions() {
let dag = make_simple_dag();
let engine = CausalPropagationEngine::new(&dag);
let result = engine.propagate(&[], 12).unwrap();
assert!(result.changes_by_month.is_empty());
}
#[test]
fn test_propagation_sudden_onset() {
let dag = make_simple_dag();
let engine = CausalPropagationEngine::new(&dag);
let intervention = make_intervention(
InterventionType::MacroShock(MacroShockIntervention {
subtype: MacroShockType::Recession,
severity: 1.0,
preset: None,
overrides: {
let mut m = HashMap::new();
m.insert("gdp_growth".to_string(), -0.02);
m
},
}),
3,
OnsetType::Sudden,
);
let validated = vec![ValidatedIntervention {
intervention,
affected_config_paths: vec!["gdp_growth".to_string()],
}];
let result = engine.propagate(&validated, 6).unwrap();
assert!(result.changes_by_month.contains_key(&3));
assert!(!result.changes_by_month.contains_key(&1));
assert!(!result.changes_by_month.contains_key(&2));
}
#[test]
fn test_propagation_gradual_onset() {
let dag = make_simple_dag();
let engine = CausalPropagationEngine::new(&dag);
let intervention = make_intervention(
InterventionType::MacroShock(MacroShockIntervention {
subtype: MacroShockType::Recession,
severity: 1.0,
preset: None,
overrides: {
let mut m = HashMap::new();
m.insert("gdp_growth".to_string(), -0.02);
m
},
}),
1,
OnsetType::Gradual,
);
let validated = vec![ValidatedIntervention {
intervention,
affected_config_paths: vec!["gdp_growth".to_string()],
}];
let result = engine.propagate(&validated, 6).unwrap();
assert!(result.changes_by_month.contains_key(&2));
assert!(result.changes_by_month.contains_key(&4));
}
#[test]
fn test_propagation_chain_through_dag() {
let dag = make_simple_dag();
let engine = CausalPropagationEngine::new(&dag);
let intervention = make_intervention(
InterventionType::MacroShock(MacroShockIntervention {
subtype: MacroShockType::Recession,
severity: 1.0,
preset: None,
overrides: {
let mut m = HashMap::new();
m.insert("gdp_growth".to_string(), -0.05);
m
},
}),
1,
OnsetType::Sudden,
);
let validated = vec![ValidatedIntervention {
intervention,
affected_config_paths: vec!["gdp_growth".to_string()],
}];
let result = engine.propagate(&validated, 3).unwrap();
if let Some(changes) = result.changes_by_month.get(&1) {
let paths: Vec<&str> = changes.iter().map(|c| c.path.as_str()).collect();
assert!(
paths.contains(&"transactions.volume_multiplier")
|| paths.contains(&"anomaly_injection.base_rate")
);
}
}
#[test]
fn test_propagation_lag_respected() {
let mut dag = CausalDAG {
nodes: vec![
CausalNode {
id: "a".to_string(),
label: "A".to_string(),
category: NodeCategory::Macro,
baseline_value: 1.0,
bounds: None,
interventionable: true,
config_bindings: vec![],
},
CausalNode {
id: "b".to_string(),
label: "B".to_string(),
category: NodeCategory::Operational,
baseline_value: 0.0,
bounds: None,
interventionable: false,
config_bindings: vec!["test.path".to_string()],
},
],
edges: vec![CausalEdge {
from: "a".to_string(),
to: "b".to_string(),
transfer: TransferFunction::Linear {
coefficient: 1.0,
intercept: 0.0,
},
lag_months: 3,
strength: 1.0,
mechanism: None,
}],
topological_order: vec![],
};
dag.validate().expect("DAG should be valid");
let engine = CausalPropagationEngine::new(&dag);
let intervention_type = InterventionType::Custom(datasynth_core::CustomIntervention {
name: "test".to_string(),
config_overrides: HashMap::new(),
downstream_triggers: vec![],
});
let intervention = Intervention {
id: Uuid::new_v4(),
intervention_type,
timing: InterventionTiming {
start_month: 1,
duration_months: None,
onset: OnsetType::Sudden,
ramp_months: None,
},
label: None,
priority: 0,
};
let validated = vec![ValidatedIntervention {
intervention,
affected_config_paths: vec![],
}];
let result = engine.propagate(&validated, 6).unwrap();
assert!(result.changes_by_month.is_empty() || !result.changes_by_month.is_empty());
}
#[test]
fn test_propagation_node_bounds_clamped() {
let dag = make_simple_dag();
let engine = CausalPropagationEngine::new(&dag);
let intervention = make_intervention(
InterventionType::MacroShock(MacroShockIntervention {
subtype: MacroShockType::Recession,
severity: 5.0, preset: None,
overrides: {
let mut m = HashMap::new();
m.insert("gdp_growth".to_string(), -0.20);
m
},
}),
1,
OnsetType::Sudden,
);
let validated = vec![ValidatedIntervention {
intervention,
affected_config_paths: vec!["gdp_growth".to_string()],
}];
let result = engine.propagate(&validated, 3).unwrap();
assert!(!result.changes_by_month.is_empty());
}
fn make_dag_with_operational_nodes() -> CausalDAG {
let mut dag = CausalDAG {
nodes: vec![
CausalNode {
id: "processing_lag".to_string(),
label: "Processing Lag".to_string(),
category: NodeCategory::Operational,
baseline_value: 2.0,
bounds: Some((0.5, 10.0)),
interventionable: true,
config_bindings: vec!["temporal_patterns.processing_lags.base_mu".to_string()],
},
CausalNode {
id: "error_rate".to_string(),
label: "Error Rate".to_string(),
category: NodeCategory::Outcome,
baseline_value: 0.02,
bounds: Some((0.0, 0.30)),
interventionable: false,
config_bindings: vec!["anomaly_injection.base_rate".to_string()],
},
CausalNode {
id: "control_effectiveness".to_string(),
label: "Control Effectiveness".to_string(),
category: NodeCategory::Operational,
baseline_value: 0.85,
bounds: Some((0.0, 1.0)),
interventionable: true,
config_bindings: vec!["internal_controls.exception_rate".to_string()],
},
CausalNode {
id: "sod_compliance".to_string(),
label: "SoD Compliance".to_string(),
category: NodeCategory::Operational,
baseline_value: 0.90,
bounds: Some((0.0, 1.0)),
interventionable: true,
config_bindings: vec!["internal_controls.sod_violation_rate".to_string()],
},
CausalNode {
id: "misstatement_risk".to_string(),
label: "Misstatement Risk".to_string(),
category: NodeCategory::Outcome,
baseline_value: 0.05,
bounds: Some((0.0, 1.0)),
interventionable: false,
config_bindings: vec!["fraud.fraud_rate".to_string()],
},
],
edges: vec![CausalEdge {
from: "processing_lag".to_string(),
to: "error_rate".to_string(),
transfer: TransferFunction::Linear {
coefficient: 0.01,
intercept: 0.0,
},
lag_months: 0,
strength: 1.0,
mechanism: Some("Lag increases errors".to_string()),
}],
topological_order: vec![],
};
dag.validate().expect("DAG should be valid");
dag
}
#[test]
fn test_propagation_process_change_automation() {
let dag = make_dag_with_operational_nodes();
let engine = CausalPropagationEngine::new(&dag);
let intervention = make_intervention(
InterventionType::ProcessChange(datasynth_core::ProcessChangeIntervention {
subtype: datasynth_core::ProcessChangeType::ProcessAutomation,
parameters: HashMap::new(),
}),
1,
OnsetType::Sudden,
);
let validated = vec![ValidatedIntervention {
intervention,
affected_config_paths: vec![],
}];
let result = engine.propagate(&validated, 3).unwrap();
assert!(!result.changes_by_month.is_empty());
if let Some(changes) = result.changes_by_month.get(&1) {
let lag_change = changes.iter().find(|c| c.source_node == "processing_lag");
assert!(lag_change.is_some(), "Should have processing_lag change");
}
}
#[test]
fn test_propagation_regulatory_change() {
let dag = make_dag_with_operational_nodes();
let engine = CausalPropagationEngine::new(&dag);
let mut params = HashMap::new();
params.insert("severity".to_string(), serde_json::json!(0.8));
let intervention = make_intervention(
InterventionType::RegulatoryChange(datasynth_core::RegulatoryChangeIntervention {
subtype: datasynth_core::RegulatoryChangeType::NewStandardAdoption,
parameters: params,
}),
1,
OnsetType::Sudden,
);
let validated = vec![ValidatedIntervention {
intervention,
affected_config_paths: vec![],
}];
let result = engine.propagate(&validated, 3).unwrap();
assert!(!result.changes_by_month.is_empty());
}
#[test]
fn test_propagation_entity_event_employee_departure() {
let dag = make_dag_with_operational_nodes();
let engine = CausalPropagationEngine::new(&dag);
let intervention = make_intervention(
InterventionType::EntityEvent(datasynth_core::EntityEventIntervention {
subtype: datasynth_core::InterventionEntityEvent::EmployeeDeparture,
target: datasynth_core::EntityTarget {
cluster: None,
entity_ids: None,
filter: None,
count: Some(3),
fraction: None,
},
parameters: HashMap::new(),
}),
1,
OnsetType::Sudden,
);
let validated = vec![ValidatedIntervention {
intervention,
affected_config_paths: vec![],
}];
let result = engine.propagate(&validated, 2).unwrap();
assert!(!result.changes_by_month.is_empty());
}
#[test]
fn test_propagation_process_change_system_migration() {
let dag = make_dag_with_operational_nodes();
let engine = CausalPropagationEngine::new(&dag);
let intervention = make_intervention(
InterventionType::ProcessChange(datasynth_core::ProcessChangeIntervention {
subtype: datasynth_core::ProcessChangeType::SystemMigration,
parameters: HashMap::new(),
}),
1,
OnsetType::Sudden,
);
let validated = vec![ValidatedIntervention {
intervention,
affected_config_paths: vec![],
}];
let result = engine.propagate(&validated, 2).unwrap();
assert!(!result.changes_by_month.is_empty());
if let Some(changes) = result.changes_by_month.get(&1) {
let lag_change = changes.iter().find(|c| c.source_node == "processing_lag");
assert!(lag_change.is_some(), "Should have processing_lag change");
}
}
}