Skip to main content

datasynth_generators/anomaly/correlation/
cascade.rs

//! Error cascade modeling.
//!
//! Models how errors propagate through a system, where one error
//! leads to others (e.g., wrong account coding leads to reconciliation
//! differences, which in turn lead to correcting entries).
6
7use chrono::NaiveDate;
8use rand::Rng;
9use serde::{Deserialize, Serialize};
10use uuid::Uuid;
11
12use datasynth_core::models::AnomalyType;
13use datasynth_core::uuid_factory::{DeterministicUuidFactory, GeneratorType};
14
15/// Configuration for cascade generation.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct CascadeConfig {
18    /// Maximum depth of cascade (how many steps).
19    pub max_depth: u32,
20    /// Probability of cascade continuing at each step.
21    pub continuation_probability: f64,
22    /// Whether cascades can branch (multiple consequences from one step).
23    pub allow_branching: bool,
24    /// Maximum branches per step.
25    pub max_branches: u32,
26}
27
28impl Default for CascadeConfig {
29    fn default() -> Self {
30        Self {
31            max_depth: 4,
32            continuation_probability: 0.7,
33            allow_branching: true,
34            max_branches: 2,
35        }
36    }
37}
38
39/// A step in an error cascade.
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct CascadeStep {
42    /// Step number in cascade.
43    pub step: u32,
44    /// Anomaly type for this step.
45    pub anomaly_type: AnomalyType,
46    /// Days after previous step.
47    pub lag_days: i32,
48    /// Description of why this step occurs.
49    pub reason: String,
50    /// Whether this step was executed.
51    pub executed: bool,
52    /// Document ID if executed.
53    pub document_id: Option<String>,
54    /// Anomaly ID if labeled.
55    pub anomaly_id: Option<String>,
56}
57
58impl CascadeStep {
59    /// Creates a new cascade step.
60    pub fn new(step: u32, anomaly_type: AnomalyType, lag_days: i32) -> Self {
61        Self {
62            step,
63            anomaly_type,
64            lag_days,
65            reason: String::new(),
66            executed: false,
67            document_id: None,
68            anomaly_id: None,
69        }
70    }
71
72    /// Sets the reason for this step.
73    pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
74        self.reason = reason.into();
75        self
76    }
77
78    /// Marks step as executed.
79    pub fn mark_executed(&mut self, document_id: impl Into<String>, anomaly_id: impl Into<String>) {
80        self.executed = true;
81        self.document_id = Some(document_id.into());
82        self.anomaly_id = Some(anomaly_id.into());
83    }
84}
85
86/// An error cascade instance.
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct ErrorCascade {
89    /// Unique cascade ID.
90    pub cascade_id: Uuid,
91    /// Trigger anomaly type.
92    pub trigger: AnomalyType,
93    /// Trigger document ID.
94    pub trigger_document_id: String,
95    /// Trigger date.
96    pub trigger_date: NaiveDate,
97    /// Steps in the cascade.
98    pub steps: Vec<CascadeStep>,
99    /// Current step index.
100    pub current_step: usize,
101}
102
103impl ErrorCascade {
104    /// Creates a new error cascade.
105    pub fn new(
106        trigger: AnomalyType,
107        trigger_document_id: impl Into<String>,
108        trigger_date: NaiveDate,
109        uuid_factory: &DeterministicUuidFactory,
110    ) -> Self {
111        Self {
112            cascade_id: uuid_factory.next(),
113            trigger,
114            trigger_document_id: trigger_document_id.into(),
115            trigger_date,
116            steps: Vec::new(),
117            current_step: 0,
118        }
119    }
120
121    /// Adds a step to the cascade.
122    pub fn add_step(&mut self, step: CascadeStep) {
123        self.steps.push(step);
124    }
125
126    /// Gets the next step to execute.
127    pub fn next_step(&self) -> Option<&CascadeStep> {
128        self.steps.get(self.current_step)
129    }
130
131    /// Gets the next step mutably.
132    pub fn next_step_mut(&mut self) -> Option<&mut CascadeStep> {
133        self.steps.get_mut(self.current_step)
134    }
135
136    /// Advances to the next step.
137    pub fn advance(&mut self) {
138        if self.current_step < self.steps.len() {
139            self.current_step += 1;
140        }
141    }
142
143    /// Returns whether the cascade is complete.
144    pub fn is_complete(&self) -> bool {
145        self.current_step >= self.steps.len()
146    }
147
148    /// Gets the expected date for the next step.
149    pub fn next_step_date(&self) -> Option<NaiveDate> {
150        if let Some(step) = self.next_step() {
151            // Calculate date based on trigger date plus accumulated lags
152            let total_lag: i32 = self.steps[..self.current_step]
153                .iter()
154                .map(|s| s.lag_days)
155                .sum::<i32>()
156                + step.lag_days;
157            Some(self.trigger_date + chrono::Duration::days(total_lag as i64))
158        } else {
159            None
160        }
161    }
162}
163
164/// Generator for error cascades.
165pub struct CascadeGenerator {
166    config: CascadeConfig,
167    /// Deterministic UUID factory for cascade IDs.
168    uuid_factory: DeterministicUuidFactory,
169    /// Active cascades.
170    active_cascades: Vec<ErrorCascade>,
171    /// Completed cascades.
172    completed_cascades: Vec<ErrorCascade>,
173    /// Cascade templates by trigger type.
174    templates: Vec<CascadeTemplate>,
175}
176
177/// Template for a cascade based on trigger type.
178#[derive(Debug, Clone)]
179pub struct CascadeTemplate {
180    /// Trigger anomaly type.
181    pub trigger: AnomalyType,
182    /// Potential cascade steps.
183    pub steps: Vec<CascadeStepTemplate>,
184}
185
186/// Template for a cascade step.
187#[derive(Debug, Clone)]
188pub struct CascadeStepTemplate {
189    /// Anomaly type for this step.
190    pub anomaly_type: AnomalyType,
191    /// Probability this step occurs.
192    pub probability: f64,
193    /// Minimum lag days.
194    pub lag_min: i32,
195    /// Maximum lag days.
196    pub lag_max: i32,
197    /// Reason description.
198    pub reason: String,
199}
200
201impl CascadeStepTemplate {
202    /// Creates a new step template.
203    pub fn new(
204        anomaly_type: AnomalyType,
205        probability: f64,
206        lag_range: (i32, i32),
207        reason: impl Into<String>,
208    ) -> Self {
209        Self {
210            anomaly_type,
211            probability,
212            lag_min: lag_range.0,
213            lag_max: lag_range.1,
214            reason: reason.into(),
215        }
216    }
217}
218
219impl Default for CascadeGenerator {
220    fn default() -> Self {
221        Self::new(CascadeConfig::default())
222    }
223}
224
225impl CascadeGenerator {
226    /// Creates a new cascade generator.
227    pub fn new(config: CascadeConfig) -> Self {
228        Self {
229            config,
230            uuid_factory: DeterministicUuidFactory::new(0, GeneratorType::Anomaly),
231            active_cascades: Vec::new(),
232            completed_cascades: Vec::new(),
233            templates: Self::default_templates(),
234        }
235    }
236
237    /// Creates default cascade templates.
238    fn default_templates() -> Vec<CascadeTemplate> {
239        use datasynth_core::models::{ErrorType, ProcessIssueType};
240
241        vec![
242            // Account misclassification cascade
243            CascadeTemplate {
244                trigger: AnomalyType::Error(ErrorType::MisclassifiedAccount),
245                steps: vec![
246                    CascadeStepTemplate::new(
247                        AnomalyType::Error(ErrorType::DuplicateEntry),
248                        0.40,
249                        (5, 15),
250                        "Attempt to correct via additional entry",
251                    ),
252                    CascadeStepTemplate::new(
253                        AnomalyType::Error(ErrorType::ReversedAmount),
254                        0.30,
255                        (10, 30),
256                        "Reversal of original entry",
257                    ),
258                    CascadeStepTemplate::new(
259                        AnomalyType::Error(ErrorType::WrongPeriod),
260                        0.25,
261                        (30, 60),
262                        "Correction posted to wrong period",
263                    ),
264                ],
265            },
266            // Wrong period cascade
267            CascadeTemplate {
268                trigger: AnomalyType::Error(ErrorType::WrongPeriod),
269                steps: vec![
270                    CascadeStepTemplate::new(
271                        AnomalyType::ProcessIssue(ProcessIssueType::LatePosting),
272                        0.50,
273                        (1, 5),
274                        "Late correction posting",
275                    ),
276                    CascadeStepTemplate::new(
277                        AnomalyType::Error(ErrorType::CutoffError),
278                        0.35,
279                        (5, 15),
280                        "Additional cutoff issues from correction",
281                    ),
282                ],
283            },
284            // Missing field cascade
285            CascadeTemplate {
286                trigger: AnomalyType::Error(ErrorType::MissingField),
287                steps: vec![
288                    CascadeStepTemplate::new(
289                        AnomalyType::ProcessIssue(ProcessIssueType::MissingDocumentation),
290                        0.60,
291                        (1, 7),
292                        "Request for missing documentation",
293                    ),
294                    CascadeStepTemplate::new(
295                        AnomalyType::ProcessIssue(ProcessIssueType::LatePosting),
296                        0.40,
297                        (5, 14),
298                        "Delayed posting while gathering info",
299                    ),
300                ],
301            },
302            // Duplicate entry cascade
303            CascadeTemplate {
304                trigger: AnomalyType::Error(ErrorType::DuplicateEntry),
305                steps: vec![CascadeStepTemplate::new(
306                    AnomalyType::Error(ErrorType::ReversedAmount),
307                    0.70,
308                    (1, 5),
309                    "Reversal of duplicate",
310                )],
311            },
312        ]
313    }
314
315    /// Starts a new cascade if a template matches.
316    pub fn maybe_start_cascade<R: Rng>(
317        &mut self,
318        trigger: &AnomalyType,
319        document_id: impl Into<String>,
320        date: NaiveDate,
321        rng: &mut R,
322    ) -> Option<Uuid> {
323        // Find matching template
324        let template = self.templates.iter().find(|t| t.trigger == *trigger)?;
325
326        let mut cascade = ErrorCascade::new(trigger.clone(), document_id, date, &self.uuid_factory);
327
328        // Generate steps from template
329        let mut step_num = 0u32;
330        for step_template in &template.steps {
331            if rng.random::<f64>() < step_template.probability {
332                step_num += 1;
333
334                if step_num > self.config.max_depth {
335                    break;
336                }
337
338                let lag = if step_template.lag_min == step_template.lag_max {
339                    step_template.lag_min
340                } else {
341                    rng.random_range(step_template.lag_min..=step_template.lag_max)
342                };
343
344                let step = CascadeStep::new(step_num, step_template.anomaly_type.clone(), lag)
345                    .with_reason(&step_template.reason);
346
347                cascade.add_step(step);
348            }
349        }
350
351        // Only create cascade if it has steps
352        if cascade.steps.is_empty() {
353            return None;
354        }
355
356        let cascade_id = cascade.cascade_id;
357        self.active_cascades.push(cascade);
358        Some(cascade_id)
359    }
360
361    /// Gets cascades that have steps due on or before a given date.
362    pub fn get_due_cascades(&mut self, date: NaiveDate) -> Vec<(Uuid, CascadeStep)> {
363        let mut due = Vec::new();
364
365        for cascade in &self.active_cascades {
366            if let Some(next_date) = cascade.next_step_date() {
367                if next_date <= date {
368                    if let Some(step) = cascade.next_step() {
369                        due.push((cascade.cascade_id, step.clone()));
370                    }
371                }
372            }
373        }
374
375        due
376    }
377
378    /// Marks a cascade step as executed and advances.
379    pub fn execute_step(
380        &mut self,
381        cascade_id: Uuid,
382        document_id: impl Into<String>,
383        anomaly_id: impl Into<String>,
384    ) {
385        let doc_id = document_id.into();
386        let ano_id = anomaly_id.into();
387
388        if let Some(cascade) = self
389            .active_cascades
390            .iter_mut()
391            .find(|c| c.cascade_id == cascade_id)
392        {
393            if let Some(step) = cascade.next_step_mut() {
394                step.mark_executed(&doc_id, &ano_id);
395            }
396            cascade.advance();
397
398            // Move to completed if done
399            if cascade.is_complete() {
400                // Will be handled by cleanup
401            }
402        }
403    }
404
405    /// Cleans up completed cascades.
406    pub fn cleanup(&mut self) {
407        let completed: Vec<_> = self
408            .active_cascades
409            .drain(..)
410            .filter(|c| !c.is_complete())
411            .collect();
412
413        let newly_completed: Vec<_> = self
414            .active_cascades
415            .iter()
416            .filter(|c| c.is_complete())
417            .cloned()
418            .collect();
419
420        self.completed_cascades.extend(newly_completed);
421        self.active_cascades = completed;
422    }
423
424    /// Returns active cascade count.
425    pub fn active_count(&self) -> usize {
426        self.active_cascades.len()
427    }
428
429    /// Returns completed cascade count.
430    pub fn completed_count(&self) -> usize {
431        self.completed_cascades.len()
432    }
433
434    /// Adds a custom template.
435    pub fn add_template(&mut self, template: CascadeTemplate) {
436        self.templates.push(template);
437    }
438}
439
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    //! Unit tests for cascade steps, cascades and the cascade generator.

    use super::*;
    use datasynth_core::models::ErrorType;
    use rand::SeedableRng;
    use rand_chacha::ChaCha8Rng;

    #[test]
    fn test_cascade_step() {
        let step = CascadeStep::new(1, AnomalyType::Error(ErrorType::DuplicateEntry), 5)
            .with_reason("Test reason");

        assert_eq!(step.step, 1);
        assert_eq!(step.lag_days, 5);
        assert_eq!(step.reason, "Test reason");
        assert!(!step.executed);
        assert!(step.document_id.is_none());
        assert!(step.anomaly_id.is_none());
    }

    #[test]
    fn test_cascade_step_mark_executed() {
        let mut step = CascadeStep::new(1, AnomalyType::Error(ErrorType::DuplicateEntry), 5);
        step.mark_executed("JE002", "ANO001");

        assert!(step.executed);
        assert_eq!(step.document_id.as_deref(), Some("JE002"));
        assert_eq!(step.anomaly_id.as_deref(), Some("ANO001"));
    }

    #[test]
    fn test_error_cascade() {
        let uuid_factory = DeterministicUuidFactory::new(42, GeneratorType::Anomaly);
        let mut cascade = ErrorCascade::new(
            AnomalyType::Error(ErrorType::MisclassifiedAccount),
            "JE001",
            NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
            &uuid_factory,
        );

        cascade.add_step(CascadeStep::new(
            1,
            AnomalyType::Error(ErrorType::DuplicateEntry),
            5,
        ));
        cascade.add_step(CascadeStep::new(
            2,
            AnomalyType::Error(ErrorType::ReversedAmount),
            10,
        ));

        assert_eq!(cascade.steps.len(), 2);
        assert!(!cascade.is_complete());

        // Check next step date
        let next_date = cascade.next_step_date().unwrap();
        assert_eq!(next_date, NaiveDate::from_ymd_opt(2024, 1, 20).unwrap());
    }

    #[test]
    fn test_error_cascade_advance_accumulates_lags() {
        let uuid_factory = DeterministicUuidFactory::new(42, GeneratorType::Anomaly);
        let mut cascade = ErrorCascade::new(
            AnomalyType::Error(ErrorType::MisclassifiedAccount),
            "JE001",
            NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
            &uuid_factory,
        );
        cascade.add_step(CascadeStep::new(
            1,
            AnomalyType::Error(ErrorType::DuplicateEntry),
            5,
        ));
        cascade.add_step(CascadeStep::new(
            2,
            AnomalyType::Error(ErrorType::ReversedAmount),
            10,
        ));

        cascade.advance();
        // The second step is due 5 + 10 days after the trigger.
        assert_eq!(
            cascade.next_step_date(),
            Some(NaiveDate::from_ymd_opt(2024, 1, 30).unwrap())
        );

        cascade.advance();
        assert!(cascade.is_complete());
        assert!(cascade.next_step().is_none());
        assert_eq!(cascade.next_step_date(), None);

        // Advancing past the end is a no-op.
        cascade.advance();
        assert_eq!(cascade.current_step, 2);
    }

    #[test]
    fn test_cascade_generator() {
        let mut generator = CascadeGenerator::new(CascadeConfig::default());
        let mut rng = ChaCha8Rng::seed_from_u64(42);

        // Try to start a cascade for misclassified account
        let cascade_id = generator.maybe_start_cascade(
            &AnomalyType::Error(ErrorType::MisclassifiedAccount),
            "JE001",
            NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
            &mut rng,
        );

        // Whether a cascade starts depends on the RNG, but one is stored
        // exactly when an ID is returned.
        assert_eq!(cascade_id.is_some(), generator.active_count() == 1);
    }

    #[test]
    fn test_cascade_generator_no_match() {
        let mut generator = CascadeGenerator::new(CascadeConfig::default());
        let mut rng = ChaCha8Rng::seed_from_u64(42);

        // Try to start a cascade for an unregistered trigger
        let cascade_id = generator.maybe_start_cascade(
            &AnomalyType::Fraud(datasynth_core::models::FraudType::SelfApproval),
            "JE001",
            NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
            &mut rng,
        );

        assert!(cascade_id.is_none());
        assert_eq!(generator.active_count(), 0);
    }

    #[test]
    fn test_cascade_lifecycle() {
        let mut generator = CascadeGenerator::default();
        let trigger = AnomalyType::Error(ErrorType::DuplicateEntry);
        let start = NaiveDate::from_ymd_opt(2024, 1, 15).unwrap();
        let far_future = NaiveDate::from_ymd_opt(2024, 12, 31).unwrap();

        // The duplicate-entry template has a single 70% step, so one of these
        // seeds starts a cascade.
        let cascade_id = (0..64u64)
            .find_map(|seed| {
                let mut rng = ChaCha8Rng::seed_from_u64(seed);
                generator.maybe_start_cascade(&trigger, "JE001", start, &mut rng)
            })
            .unwrap();
        assert_eq!(generator.active_count(), 1);

        // That step lags 1-5 days, so nothing is due on the trigger date itself.
        assert!(generator.get_due_cascades(start).is_empty());

        let due = generator.get_due_cascades(far_future);
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].0, cascade_id);
        assert_eq!(
            due[0].1.anomaly_type,
            AnomalyType::Error(ErrorType::ReversedAmount)
        );

        generator.execute_step(cascade_id, "JE002", "ANO001");
        assert!(generator.get_due_cascades(far_future).is_empty());

        generator.cleanup();
        assert_eq!(generator.active_count(), 0);
    }
}