1use chrono::NaiveDate;
8use rand::Rng;
9use serde::{Deserialize, Serialize};
10use uuid::Uuid;
11
12use datasynth_core::models::AnomalyType;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct CascadeConfig {
17 pub max_depth: u32,
19 pub continuation_probability: f64,
21 pub allow_branching: bool,
23 pub max_branches: u32,
25}
26
27impl Default for CascadeConfig {
28 fn default() -> Self {
29 Self {
30 max_depth: 4,
31 continuation_probability: 0.7,
32 allow_branching: true,
33 max_branches: 2,
34 }
35 }
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct CascadeStep {
41 pub step: u32,
43 pub anomaly_type: AnomalyType,
45 pub lag_days: i32,
47 pub reason: String,
49 pub executed: bool,
51 pub document_id: Option<String>,
53 pub anomaly_id: Option<String>,
55}
56
57impl CascadeStep {
58 pub fn new(step: u32, anomaly_type: AnomalyType, lag_days: i32) -> Self {
60 Self {
61 step,
62 anomaly_type,
63 lag_days,
64 reason: String::new(),
65 executed: false,
66 document_id: None,
67 anomaly_id: None,
68 }
69 }
70
71 pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
73 self.reason = reason.into();
74 self
75 }
76
77 pub fn mark_executed(&mut self, document_id: impl Into<String>, anomaly_id: impl Into<String>) {
79 self.executed = true;
80 self.document_id = Some(document_id.into());
81 self.anomaly_id = Some(anomaly_id.into());
82 }
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct ErrorCascade {
88 pub cascade_id: Uuid,
90 pub trigger: AnomalyType,
92 pub trigger_document_id: String,
94 pub trigger_date: NaiveDate,
96 pub steps: Vec<CascadeStep>,
98 pub current_step: usize,
100}
101
102impl ErrorCascade {
103 pub fn new(
105 trigger: AnomalyType,
106 trigger_document_id: impl Into<String>,
107 trigger_date: NaiveDate,
108 ) -> Self {
109 Self {
110 cascade_id: Uuid::new_v4(),
111 trigger,
112 trigger_document_id: trigger_document_id.into(),
113 trigger_date,
114 steps: Vec::new(),
115 current_step: 0,
116 }
117 }
118
119 pub fn add_step(&mut self, step: CascadeStep) {
121 self.steps.push(step);
122 }
123
124 pub fn next_step(&self) -> Option<&CascadeStep> {
126 self.steps.get(self.current_step)
127 }
128
129 pub fn next_step_mut(&mut self) -> Option<&mut CascadeStep> {
131 self.steps.get_mut(self.current_step)
132 }
133
134 pub fn advance(&mut self) {
136 if self.current_step < self.steps.len() {
137 self.current_step += 1;
138 }
139 }
140
141 pub fn is_complete(&self) -> bool {
143 self.current_step >= self.steps.len()
144 }
145
146 pub fn next_step_date(&self) -> Option<NaiveDate> {
148 if let Some(step) = self.next_step() {
149 let total_lag: i32 = self.steps[..self.current_step]
151 .iter()
152 .map(|s| s.lag_days)
153 .sum::<i32>()
154 + step.lag_days;
155 Some(self.trigger_date + chrono::Duration::days(total_lag as i64))
156 } else {
157 None
158 }
159 }
160}
161
162pub struct CascadeGenerator {
164 config: CascadeConfig,
165 active_cascades: Vec<ErrorCascade>,
167 completed_cascades: Vec<ErrorCascade>,
169 templates: Vec<CascadeTemplate>,
171}
172
173#[derive(Debug, Clone)]
175pub struct CascadeTemplate {
176 pub trigger: AnomalyType,
178 pub steps: Vec<CascadeStepTemplate>,
180}
181
182#[derive(Debug, Clone)]
184pub struct CascadeStepTemplate {
185 pub anomaly_type: AnomalyType,
187 pub probability: f64,
189 pub lag_min: i32,
191 pub lag_max: i32,
193 pub reason: String,
195}
196
197impl CascadeStepTemplate {
198 pub fn new(
200 anomaly_type: AnomalyType,
201 probability: f64,
202 lag_range: (i32, i32),
203 reason: impl Into<String>,
204 ) -> Self {
205 Self {
206 anomaly_type,
207 probability,
208 lag_min: lag_range.0,
209 lag_max: lag_range.1,
210 reason: reason.into(),
211 }
212 }
213}
214
215impl Default for CascadeGenerator {
216 fn default() -> Self {
217 Self::new(CascadeConfig::default())
218 }
219}
220
221impl CascadeGenerator {
222 pub fn new(config: CascadeConfig) -> Self {
224 Self {
225 config,
226 active_cascades: Vec::new(),
227 completed_cascades: Vec::new(),
228 templates: Self::default_templates(),
229 }
230 }
231
232 fn default_templates() -> Vec<CascadeTemplate> {
234 use datasynth_core::models::{ErrorType, ProcessIssueType};
235
236 vec![
237 CascadeTemplate {
239 trigger: AnomalyType::Error(ErrorType::MisclassifiedAccount),
240 steps: vec![
241 CascadeStepTemplate::new(
242 AnomalyType::Error(ErrorType::DuplicateEntry),
243 0.40,
244 (5, 15),
245 "Attempt to correct via additional entry",
246 ),
247 CascadeStepTemplate::new(
248 AnomalyType::Error(ErrorType::ReversedAmount),
249 0.30,
250 (10, 30),
251 "Reversal of original entry",
252 ),
253 CascadeStepTemplate::new(
254 AnomalyType::Error(ErrorType::WrongPeriod),
255 0.25,
256 (30, 60),
257 "Correction posted to wrong period",
258 ),
259 ],
260 },
261 CascadeTemplate {
263 trigger: AnomalyType::Error(ErrorType::WrongPeriod),
264 steps: vec![
265 CascadeStepTemplate::new(
266 AnomalyType::ProcessIssue(ProcessIssueType::LatePosting),
267 0.50,
268 (1, 5),
269 "Late correction posting",
270 ),
271 CascadeStepTemplate::new(
272 AnomalyType::Error(ErrorType::CutoffError),
273 0.35,
274 (5, 15),
275 "Additional cutoff issues from correction",
276 ),
277 ],
278 },
279 CascadeTemplate {
281 trigger: AnomalyType::Error(ErrorType::MissingField),
282 steps: vec![
283 CascadeStepTemplate::new(
284 AnomalyType::ProcessIssue(ProcessIssueType::MissingDocumentation),
285 0.60,
286 (1, 7),
287 "Request for missing documentation",
288 ),
289 CascadeStepTemplate::new(
290 AnomalyType::ProcessIssue(ProcessIssueType::LatePosting),
291 0.40,
292 (5, 14),
293 "Delayed posting while gathering info",
294 ),
295 ],
296 },
297 CascadeTemplate {
299 trigger: AnomalyType::Error(ErrorType::DuplicateEntry),
300 steps: vec![CascadeStepTemplate::new(
301 AnomalyType::Error(ErrorType::ReversedAmount),
302 0.70,
303 (1, 5),
304 "Reversal of duplicate",
305 )],
306 },
307 ]
308 }
309
310 pub fn maybe_start_cascade<R: Rng>(
312 &mut self,
313 trigger: &AnomalyType,
314 document_id: impl Into<String>,
315 date: NaiveDate,
316 rng: &mut R,
317 ) -> Option<Uuid> {
318 let template = self.templates.iter().find(|t| t.trigger == *trigger)?;
320
321 let mut cascade = ErrorCascade::new(trigger.clone(), document_id, date);
322
323 let mut step_num = 0u32;
325 for step_template in &template.steps {
326 if rng.gen::<f64>() < step_template.probability {
327 step_num += 1;
328
329 if step_num > self.config.max_depth {
330 break;
331 }
332
333 let lag = if step_template.lag_min == step_template.lag_max {
334 step_template.lag_min
335 } else {
336 rng.gen_range(step_template.lag_min..=step_template.lag_max)
337 };
338
339 let step = CascadeStep::new(step_num, step_template.anomaly_type.clone(), lag)
340 .with_reason(&step_template.reason);
341
342 cascade.add_step(step);
343 }
344 }
345
346 if cascade.steps.is_empty() {
348 return None;
349 }
350
351 let cascade_id = cascade.cascade_id;
352 self.active_cascades.push(cascade);
353 Some(cascade_id)
354 }
355
356 pub fn get_due_cascades(&mut self, date: NaiveDate) -> Vec<(Uuid, CascadeStep)> {
358 let mut due = Vec::new();
359
360 for cascade in &self.active_cascades {
361 if let Some(next_date) = cascade.next_step_date() {
362 if next_date <= date {
363 if let Some(step) = cascade.next_step() {
364 due.push((cascade.cascade_id, step.clone()));
365 }
366 }
367 }
368 }
369
370 due
371 }
372
373 pub fn execute_step(
375 &mut self,
376 cascade_id: Uuid,
377 document_id: impl Into<String>,
378 anomaly_id: impl Into<String>,
379 ) {
380 let doc_id = document_id.into();
381 let ano_id = anomaly_id.into();
382
383 if let Some(cascade) = self
384 .active_cascades
385 .iter_mut()
386 .find(|c| c.cascade_id == cascade_id)
387 {
388 if let Some(step) = cascade.next_step_mut() {
389 step.mark_executed(&doc_id, &ano_id);
390 }
391 cascade.advance();
392
393 if cascade.is_complete() {
395 }
397 }
398 }
399
400 pub fn cleanup(&mut self) {
402 let completed: Vec<_> = self
403 .active_cascades
404 .drain(..)
405 .filter(|c| !c.is_complete())
406 .collect();
407
408 let newly_completed: Vec<_> = self
409 .active_cascades
410 .iter()
411 .filter(|c| c.is_complete())
412 .cloned()
413 .collect();
414
415 self.completed_cascades.extend(newly_completed);
416 self.active_cascades = completed;
417 }
418
419 pub fn active_count(&self) -> usize {
421 self.active_cascades.len()
422 }
423
424 pub fn completed_count(&self) -> usize {
426 self.completed_cascades.len()
427 }
428
429 pub fn add_template(&mut self, template: CascadeTemplate) {
431 self.templates.push(template);
432 }
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_core::models::ErrorType;
    use rand::SeedableRng;
    use rand_chacha::ChaCha8Rng;

    fn date(y: i32, m: u32, d: u32) -> NaiveDate {
        NaiveDate::from_ymd_opt(y, m, d).unwrap()
    }

    #[test]
    fn test_cascade_step() {
        let step = CascadeStep::new(1, AnomalyType::Error(ErrorType::DuplicateEntry), 5)
            .with_reason("Test reason");

        assert_eq!(step.step, 1);
        assert_eq!(step.lag_days, 5);
        assert_eq!(step.reason, "Test reason");
        assert!(!step.executed);
        assert!(step.document_id.is_none());
        assert!(step.anomaly_id.is_none());
    }

    #[test]
    fn test_mark_executed() {
        let mut step = CascadeStep::new(1, AnomalyType::Error(ErrorType::DuplicateEntry), 5);
        step.mark_executed("JE002", "ANO001");

        assert!(step.executed);
        assert_eq!(step.document_id.as_deref(), Some("JE002"));
        assert_eq!(step.anomaly_id.as_deref(), Some("ANO001"));
    }

    #[test]
    fn test_error_cascade() {
        let mut cascade = ErrorCascade::new(
            AnomalyType::Error(ErrorType::MisclassifiedAccount),
            "JE001",
            date(2024, 1, 15),
        );

        cascade.add_step(CascadeStep::new(
            1,
            AnomalyType::Error(ErrorType::DuplicateEntry),
            5,
        ));
        cascade.add_step(CascadeStep::new(
            2,
            AnomalyType::Error(ErrorType::ReversedAmount),
            10,
        ));

        assert_eq!(cascade.steps.len(), 2);
        assert!(!cascade.is_complete());

        // The first step falls due 5 days after the trigger.
        assert_eq!(cascade.next_step_date(), Some(date(2024, 1, 20)));

        // Lags accumulate: the second step is due 5 + 10 days after the trigger.
        cascade.advance();
        assert_eq!(cascade.next_step_date(), Some(date(2024, 1, 30)));

        cascade.advance();
        assert!(cascade.is_complete());
        assert!(cascade.next_step_date().is_none());

        // Advancing a completed cascade is a no-op.
        cascade.advance();
        assert_eq!(cascade.current_step, 2);
    }

    #[test]
    fn test_cascade_generator() {
        let mut generator = CascadeGenerator::new(CascadeConfig::default());
        let mut rng = ChaCha8Rng::seed_from_u64(42);

        let cascade_id = generator.maybe_start_cascade(
            &AnomalyType::Error(ErrorType::MisclassifiedAccount),
            "JE001",
            date(2024, 1, 15),
            &mut rng,
        );

        // A cascade is registered if and only if an id was returned.
        assert_eq!(generator.active_count(), usize::from(cascade_id.is_some()));
    }

    #[test]
    fn test_cascade_generator_respects_max_depth() {
        let config = CascadeConfig {
            max_depth: 1,
            ..CascadeConfig::default()
        };
        let mut generator = CascadeGenerator::new(config);
        let mut rng = ChaCha8Rng::seed_from_u64(7);

        for i in 0..50 {
            generator.maybe_start_cascade(
                &AnomalyType::Error(ErrorType::MisclassifiedAccount),
                format!("JE{i:03}"),
                date(2024, 1, 15),
                &mut rng,
            );
        }

        for cascade in &generator.active_cascades {
            assert!(!cascade.steps.is_empty());
            assert!(cascade.steps.len() <= 1);
            assert_eq!(cascade.steps[0].step, 1);
        }
    }

    #[test]
    fn test_due_cascades_and_execute_step() {
        let mut generator = CascadeGenerator::default();
        let mut cascade = ErrorCascade::new(
            AnomalyType::Error(ErrorType::MisclassifiedAccount),
            "JE001",
            date(2024, 1, 15),
        );
        cascade.add_step(CascadeStep::new(
            1,
            AnomalyType::Error(ErrorType::DuplicateEntry),
            5,
        ));
        let id = cascade.cascade_id;
        generator.active_cascades.push(cascade);

        assert!(generator.get_due_cascades(date(2024, 1, 19)).is_empty());

        let due = generator.get_due_cascades(date(2024, 1, 20));
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].0, id);
        assert_eq!(due[0].1.step, 1);

        generator.execute_step(id, "JE002", "ANO001");
        assert!(generator.active_cascades[0].is_complete());
        assert!(generator.active_cascades[0].steps[0].executed);
        assert!(generator.get_due_cascades(date(2024, 12, 31)).is_empty());

        // Completed cascades leave the active list on cleanup.
        generator.cleanup();
        assert_eq!(generator.active_count(), 0);
    }

    #[test]
    fn test_cascade_generator_no_match() {
        let mut generator = CascadeGenerator::new(CascadeConfig::default());
        let mut rng = ChaCha8Rng::seed_from_u64(42);

        let cascade_id = generator.maybe_start_cascade(
            &AnomalyType::Fraud(datasynth_core::models::FraudType::SelfApproval),
            "JE001",
            date(2024, 1, 15),
            &mut rng,
        );

        assert!(cascade_id.is_none());
        assert_eq!(generator.active_count(), 0);
    }
}