Skip to main content

datasynth_generators/anomaly/correlation/
cascade.rs

1//! Error cascade modeling.
2//!
3//! Models how errors propagate through a system, where one error
4//! leads to others (e.g., wrong account coding leads to reconciliation
5//! differences which lead to correcting entries).
6
7use chrono::NaiveDate;
8use rand::Rng;
9use serde::{Deserialize, Serialize};
10use uuid::Uuid;
11
12use datasynth_core::models::AnomalyType;
13
14/// Configuration for cascade generation.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct CascadeConfig {
17    /// Maximum depth of cascade (how many steps).
18    pub max_depth: u32,
19    /// Probability of cascade continuing at each step.
20    pub continuation_probability: f64,
21    /// Whether cascades can branch (multiple consequences from one step).
22    pub allow_branching: bool,
23    /// Maximum branches per step.
24    pub max_branches: u32,
25}
26
27impl Default for CascadeConfig {
28    fn default() -> Self {
29        Self {
30            max_depth: 4,
31            continuation_probability: 0.7,
32            allow_branching: true,
33            max_branches: 2,
34        }
35    }
36}
37
38/// A step in an error cascade.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct CascadeStep {
41    /// Step number in cascade.
42    pub step: u32,
43    /// Anomaly type for this step.
44    pub anomaly_type: AnomalyType,
45    /// Days after previous step.
46    pub lag_days: i32,
47    /// Description of why this step occurs.
48    pub reason: String,
49    /// Whether this step was executed.
50    pub executed: bool,
51    /// Document ID if executed.
52    pub document_id: Option<String>,
53    /// Anomaly ID if labeled.
54    pub anomaly_id: Option<String>,
55}
56
57impl CascadeStep {
58    /// Creates a new cascade step.
59    pub fn new(step: u32, anomaly_type: AnomalyType, lag_days: i32) -> Self {
60        Self {
61            step,
62            anomaly_type,
63            lag_days,
64            reason: String::new(),
65            executed: false,
66            document_id: None,
67            anomaly_id: None,
68        }
69    }
70
71    /// Sets the reason for this step.
72    pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
73        self.reason = reason.into();
74        self
75    }
76
77    /// Marks step as executed.
78    pub fn mark_executed(&mut self, document_id: impl Into<String>, anomaly_id: impl Into<String>) {
79        self.executed = true;
80        self.document_id = Some(document_id.into());
81        self.anomaly_id = Some(anomaly_id.into());
82    }
83}
84
85/// An error cascade instance.
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct ErrorCascade {
88    /// Unique cascade ID.
89    pub cascade_id: Uuid,
90    /// Trigger anomaly type.
91    pub trigger: AnomalyType,
92    /// Trigger document ID.
93    pub trigger_document_id: String,
94    /// Trigger date.
95    pub trigger_date: NaiveDate,
96    /// Steps in the cascade.
97    pub steps: Vec<CascadeStep>,
98    /// Current step index.
99    pub current_step: usize,
100}
101
102impl ErrorCascade {
103    /// Creates a new error cascade.
104    pub fn new(
105        trigger: AnomalyType,
106        trigger_document_id: impl Into<String>,
107        trigger_date: NaiveDate,
108    ) -> Self {
109        Self {
110            cascade_id: Uuid::new_v4(),
111            trigger,
112            trigger_document_id: trigger_document_id.into(),
113            trigger_date,
114            steps: Vec::new(),
115            current_step: 0,
116        }
117    }
118
119    /// Adds a step to the cascade.
120    pub fn add_step(&mut self, step: CascadeStep) {
121        self.steps.push(step);
122    }
123
124    /// Gets the next step to execute.
125    pub fn next_step(&self) -> Option<&CascadeStep> {
126        self.steps.get(self.current_step)
127    }
128
129    /// Gets the next step mutably.
130    pub fn next_step_mut(&mut self) -> Option<&mut CascadeStep> {
131        self.steps.get_mut(self.current_step)
132    }
133
134    /// Advances to the next step.
135    pub fn advance(&mut self) {
136        if self.current_step < self.steps.len() {
137            self.current_step += 1;
138        }
139    }
140
141    /// Returns whether the cascade is complete.
142    pub fn is_complete(&self) -> bool {
143        self.current_step >= self.steps.len()
144    }
145
146    /// Gets the expected date for the next step.
147    pub fn next_step_date(&self) -> Option<NaiveDate> {
148        if let Some(step) = self.next_step() {
149            // Calculate date based on trigger date plus accumulated lags
150            let total_lag: i32 = self.steps[..self.current_step]
151                .iter()
152                .map(|s| s.lag_days)
153                .sum::<i32>()
154                + step.lag_days;
155            Some(self.trigger_date + chrono::Duration::days(total_lag as i64))
156        } else {
157            None
158        }
159    }
160}
161
162/// Generator for error cascades.
163pub struct CascadeGenerator {
164    config: CascadeConfig,
165    /// Active cascades.
166    active_cascades: Vec<ErrorCascade>,
167    /// Completed cascades.
168    completed_cascades: Vec<ErrorCascade>,
169    /// Cascade templates by trigger type.
170    templates: Vec<CascadeTemplate>,
171}
172
173/// Template for a cascade based on trigger type.
174#[derive(Debug, Clone)]
175pub struct CascadeTemplate {
176    /// Trigger anomaly type.
177    pub trigger: AnomalyType,
178    /// Potential cascade steps.
179    pub steps: Vec<CascadeStepTemplate>,
180}
181
182/// Template for a cascade step.
183#[derive(Debug, Clone)]
184pub struct CascadeStepTemplate {
185    /// Anomaly type for this step.
186    pub anomaly_type: AnomalyType,
187    /// Probability this step occurs.
188    pub probability: f64,
189    /// Minimum lag days.
190    pub lag_min: i32,
191    /// Maximum lag days.
192    pub lag_max: i32,
193    /// Reason description.
194    pub reason: String,
195}
196
197impl CascadeStepTemplate {
198    /// Creates a new step template.
199    pub fn new(
200        anomaly_type: AnomalyType,
201        probability: f64,
202        lag_range: (i32, i32),
203        reason: impl Into<String>,
204    ) -> Self {
205        Self {
206            anomaly_type,
207            probability,
208            lag_min: lag_range.0,
209            lag_max: lag_range.1,
210            reason: reason.into(),
211        }
212    }
213}
214
215impl Default for CascadeGenerator {
216    fn default() -> Self {
217        Self::new(CascadeConfig::default())
218    }
219}
220
221impl CascadeGenerator {
222    /// Creates a new cascade generator.
223    pub fn new(config: CascadeConfig) -> Self {
224        Self {
225            config,
226            active_cascades: Vec::new(),
227            completed_cascades: Vec::new(),
228            templates: Self::default_templates(),
229        }
230    }
231
232    /// Creates default cascade templates.
233    fn default_templates() -> Vec<CascadeTemplate> {
234        use datasynth_core::models::{ErrorType, ProcessIssueType};
235
236        vec![
237            // Account misclassification cascade
238            CascadeTemplate {
239                trigger: AnomalyType::Error(ErrorType::MisclassifiedAccount),
240                steps: vec![
241                    CascadeStepTemplate::new(
242                        AnomalyType::Error(ErrorType::DuplicateEntry),
243                        0.40,
244                        (5, 15),
245                        "Attempt to correct via additional entry",
246                    ),
247                    CascadeStepTemplate::new(
248                        AnomalyType::Error(ErrorType::ReversedAmount),
249                        0.30,
250                        (10, 30),
251                        "Reversal of original entry",
252                    ),
253                    CascadeStepTemplate::new(
254                        AnomalyType::Error(ErrorType::WrongPeriod),
255                        0.25,
256                        (30, 60),
257                        "Correction posted to wrong period",
258                    ),
259                ],
260            },
261            // Wrong period cascade
262            CascadeTemplate {
263                trigger: AnomalyType::Error(ErrorType::WrongPeriod),
264                steps: vec![
265                    CascadeStepTemplate::new(
266                        AnomalyType::ProcessIssue(ProcessIssueType::LatePosting),
267                        0.50,
268                        (1, 5),
269                        "Late correction posting",
270                    ),
271                    CascadeStepTemplate::new(
272                        AnomalyType::Error(ErrorType::CutoffError),
273                        0.35,
274                        (5, 15),
275                        "Additional cutoff issues from correction",
276                    ),
277                ],
278            },
279            // Missing field cascade
280            CascadeTemplate {
281                trigger: AnomalyType::Error(ErrorType::MissingField),
282                steps: vec![
283                    CascadeStepTemplate::new(
284                        AnomalyType::ProcessIssue(ProcessIssueType::MissingDocumentation),
285                        0.60,
286                        (1, 7),
287                        "Request for missing documentation",
288                    ),
289                    CascadeStepTemplate::new(
290                        AnomalyType::ProcessIssue(ProcessIssueType::LatePosting),
291                        0.40,
292                        (5, 14),
293                        "Delayed posting while gathering info",
294                    ),
295                ],
296            },
297            // Duplicate entry cascade
298            CascadeTemplate {
299                trigger: AnomalyType::Error(ErrorType::DuplicateEntry),
300                steps: vec![CascadeStepTemplate::new(
301                    AnomalyType::Error(ErrorType::ReversedAmount),
302                    0.70,
303                    (1, 5),
304                    "Reversal of duplicate",
305                )],
306            },
307        ]
308    }
309
310    /// Starts a new cascade if a template matches.
311    pub fn maybe_start_cascade<R: Rng>(
312        &mut self,
313        trigger: &AnomalyType,
314        document_id: impl Into<String>,
315        date: NaiveDate,
316        rng: &mut R,
317    ) -> Option<Uuid> {
318        // Find matching template
319        let template = self.templates.iter().find(|t| t.trigger == *trigger)?;
320
321        let mut cascade = ErrorCascade::new(trigger.clone(), document_id, date);
322
323        // Generate steps from template
324        let mut step_num = 0u32;
325        for step_template in &template.steps {
326            if rng.gen::<f64>() < step_template.probability {
327                step_num += 1;
328
329                if step_num > self.config.max_depth {
330                    break;
331                }
332
333                let lag = if step_template.lag_min == step_template.lag_max {
334                    step_template.lag_min
335                } else {
336                    rng.gen_range(step_template.lag_min..=step_template.lag_max)
337                };
338
339                let step = CascadeStep::new(step_num, step_template.anomaly_type.clone(), lag)
340                    .with_reason(&step_template.reason);
341
342                cascade.add_step(step);
343            }
344        }
345
346        // Only create cascade if it has steps
347        if cascade.steps.is_empty() {
348            return None;
349        }
350
351        let cascade_id = cascade.cascade_id;
352        self.active_cascades.push(cascade);
353        Some(cascade_id)
354    }
355
356    /// Gets cascades that have steps due on or before a given date.
357    pub fn get_due_cascades(&mut self, date: NaiveDate) -> Vec<(Uuid, CascadeStep)> {
358        let mut due = Vec::new();
359
360        for cascade in &self.active_cascades {
361            if let Some(next_date) = cascade.next_step_date() {
362                if next_date <= date {
363                    if let Some(step) = cascade.next_step() {
364                        due.push((cascade.cascade_id, step.clone()));
365                    }
366                }
367            }
368        }
369
370        due
371    }
372
373    /// Marks a cascade step as executed and advances.
374    pub fn execute_step(
375        &mut self,
376        cascade_id: Uuid,
377        document_id: impl Into<String>,
378        anomaly_id: impl Into<String>,
379    ) {
380        let doc_id = document_id.into();
381        let ano_id = anomaly_id.into();
382
383        if let Some(cascade) = self
384            .active_cascades
385            .iter_mut()
386            .find(|c| c.cascade_id == cascade_id)
387        {
388            if let Some(step) = cascade.next_step_mut() {
389                step.mark_executed(&doc_id, &ano_id);
390            }
391            cascade.advance();
392
393            // Move to completed if done
394            if cascade.is_complete() {
395                // Will be handled by cleanup
396            }
397        }
398    }
399
400    /// Cleans up completed cascades.
401    pub fn cleanup(&mut self) {
402        let completed: Vec<_> = self
403            .active_cascades
404            .drain(..)
405            .filter(|c| !c.is_complete())
406            .collect();
407
408        let newly_completed: Vec<_> = self
409            .active_cascades
410            .iter()
411            .filter(|c| c.is_complete())
412            .cloned()
413            .collect();
414
415        self.completed_cascades.extend(newly_completed);
416        self.active_cascades = completed;
417    }
418
419    /// Returns active cascade count.
420    pub fn active_count(&self) -> usize {
421        self.active_cascades.len()
422    }
423
424    /// Returns completed cascade count.
425    pub fn completed_count(&self) -> usize {
426        self.completed_cascades.len()
427    }
428
429    /// Adds a custom template.
430    pub fn add_template(&mut self, template: CascadeTemplate) {
431        self.templates.push(template);
432    }
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_core::models::ErrorType;
    use rand::SeedableRng;
    use rand_chacha::ChaCha8Rng;

    /// Trigger date shared by the tests below.
    fn jan_15() -> NaiveDate {
        NaiveDate::from_ymd_opt(2024, 1, 15).unwrap()
    }

    #[test]
    fn test_cascade_step() {
        let built = CascadeStep::new(1, AnomalyType::Error(ErrorType::DuplicateEntry), 5)
            .with_reason("Test reason");

        assert_eq!(built.step, 1);
        assert_eq!(built.lag_days, 5);
        assert!(!built.executed);
    }

    #[test]
    fn test_error_cascade() {
        let mut cascade = ErrorCascade::new(
            AnomalyType::Error(ErrorType::MisclassifiedAccount),
            "JE001",
            jan_15(),
        );

        for (ordinal, kind, lag) in vec![
            (1, ErrorType::DuplicateEntry, 5),
            (2, ErrorType::ReversedAmount, 10),
        ] {
            cascade.add_step(CascadeStep::new(ordinal, AnomalyType::Error(kind), lag));
        }

        assert_eq!(cascade.steps.len(), 2);
        assert!(!cascade.is_complete());

        // The first step is due on the trigger date plus its own 5-day lag.
        assert_eq!(
            cascade.next_step_date(),
            Some(NaiveDate::from_ymd_opt(2024, 1, 20).unwrap())
        );
    }

    #[test]
    fn test_cascade_generator() {
        let mut generator = CascadeGenerator::new(CascadeConfig::default());
        let mut rng = ChaCha8Rng::seed_from_u64(42);

        let started = generator.maybe_start_cascade(
            &AnomalyType::Error(ErrorType::MisclassifiedAccount),
            "JE001",
            jan_15(),
            &mut rng,
        );

        // Whether a cascade starts depends on the RNG; if one did, it must be tracked.
        assert!(started.is_none() || generator.active_count() > 0);
    }

    #[test]
    fn test_cascade_generator_no_match() {
        let mut generator = CascadeGenerator::default();
        let mut rng = ChaCha8Rng::seed_from_u64(42);

        // No template is registered for this trigger.
        let started = generator.maybe_start_cascade(
            &AnomalyType::Fraud(datasynth_core::models::FraudType::SelfApproval),
            "JE001",
            jan_15(),
            &mut rng,
        );

        assert!(started.is_none());
    }
}