1use chrono::NaiveDate;
8use rand::Rng;
9use serde::{Deserialize, Serialize};
10use uuid::Uuid;
11
12use datasynth_core::models::AnomalyType;
13use datasynth_core::uuid_factory::{DeterministicUuidFactory, GeneratorType};
14
/// Tuning knobs for [`CascadeGenerator`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CascadeConfig {
    /// Maximum number of follow-on steps a single cascade may contain.
    pub max_depth: u32,
    /// Probability setting for continuing a cascade.
    /// NOTE(review): not read by any code in this file — confirm whether it is used elsewhere.
    pub continuation_probability: f64,
    /// Whether cascades may branch.
    /// NOTE(review): not read by any code in this file — confirm whether it is used elsewhere.
    pub allow_branching: bool,
    /// Upper bound on branches per cascade.
    /// NOTE(review): not read by any code in this file — confirm whether it is used elsewhere.
    pub max_branches: u32,
}
27
28impl Default for CascadeConfig {
29 fn default() -> Self {
30 Self {
31 max_depth: 4,
32 continuation_probability: 0.7,
33 allow_branching: true,
34 max_branches: 2,
35 }
36 }
37}
38
/// One follow-on anomaly within an [`ErrorCascade`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CascadeStep {
    /// 1-based position of this step within its cascade.
    pub step: u32,
    /// Anomaly to inject when this step fires.
    pub anomaly_type: AnomalyType,
    /// Delay in days after the previous step (or after the trigger for the
    /// first step); `ErrorCascade::next_step_date` accumulates these.
    pub lag_days: i32,
    /// Human-readable explanation of why this step follows the previous one.
    pub reason: String,
    /// Set to `true` by `mark_executed`.
    pub executed: bool,
    /// Document generated for this step, once executed.
    pub document_id: Option<String>,
    /// Anomaly record generated for this step, once executed.
    pub anomaly_id: Option<String>,
}
57
58impl CascadeStep {
59 pub fn new(step: u32, anomaly_type: AnomalyType, lag_days: i32) -> Self {
61 Self {
62 step,
63 anomaly_type,
64 lag_days,
65 reason: String::new(),
66 executed: false,
67 document_id: None,
68 anomaly_id: None,
69 }
70 }
71
72 pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
74 self.reason = reason.into();
75 self
76 }
77
78 pub fn mark_executed(&mut self, document_id: impl Into<String>, anomaly_id: impl Into<String>) {
80 self.executed = true;
81 self.document_id = Some(document_id.into());
82 self.anomaly_id = Some(anomaly_id.into());
83 }
84}
85
/// A chain of follow-on anomalies rooted at one triggering anomaly.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorCascade {
    /// Identifier for this cascade, drawn from a `DeterministicUuidFactory`.
    pub cascade_id: Uuid,
    /// Anomaly type that started the cascade.
    pub trigger: AnomalyType,
    /// Document on which the triggering anomaly occurred.
    pub trigger_document_id: String,
    /// Date of the triggering anomaly; step dates are offsets from this.
    pub trigger_date: NaiveDate,
    /// Planned steps, in execution order.
    pub steps: Vec<CascadeStep>,
    /// Index into `steps` of the next step to execute; the cascade is complete
    /// once this reaches `steps.len()`.
    pub current_step: usize,
}
102
103impl ErrorCascade {
104 pub fn new(
106 trigger: AnomalyType,
107 trigger_document_id: impl Into<String>,
108 trigger_date: NaiveDate,
109 uuid_factory: &DeterministicUuidFactory,
110 ) -> Self {
111 Self {
112 cascade_id: uuid_factory.next(),
113 trigger,
114 trigger_document_id: trigger_document_id.into(),
115 trigger_date,
116 steps: Vec::new(),
117 current_step: 0,
118 }
119 }
120
121 pub fn add_step(&mut self, step: CascadeStep) {
123 self.steps.push(step);
124 }
125
126 pub fn next_step(&self) -> Option<&CascadeStep> {
128 self.steps.get(self.current_step)
129 }
130
131 pub fn next_step_mut(&mut self) -> Option<&mut CascadeStep> {
133 self.steps.get_mut(self.current_step)
134 }
135
136 pub fn advance(&mut self) {
138 if self.current_step < self.steps.len() {
139 self.current_step += 1;
140 }
141 }
142
143 pub fn is_complete(&self) -> bool {
145 self.current_step >= self.steps.len()
146 }
147
148 pub fn next_step_date(&self) -> Option<NaiveDate> {
150 if let Some(step) = self.next_step() {
151 let total_lag: i32 = self.steps[..self.current_step]
153 .iter()
154 .map(|s| s.lag_days)
155 .sum::<i32>()
156 + step.lag_days;
157 Some(self.trigger_date + chrono::Duration::days(total_lag as i64))
158 } else {
159 None
160 }
161 }
162}
163
/// Starts, schedules and tracks [`ErrorCascade`]s from a set of templates.
pub struct CascadeGenerator {
    /// Generator settings (currently only `max_depth` is consulted here).
    config: CascadeConfig,
    /// Source of cascade IDs.
    uuid_factory: DeterministicUuidFactory,
    /// Cascades that still have (or recently had) pending steps; completed
    /// cascades remain here until `cleanup` is called.
    active_cascades: Vec<ErrorCascade>,
    /// Cascades moved out of `active_cascades` by `cleanup`.
    completed_cascades: Vec<ErrorCascade>,
    /// Cascade templates, searched in order; the first whose trigger matches wins.
    templates: Vec<CascadeTemplate>,
}
176
/// Recipe describing which follow-on steps a given trigger may produce.
#[derive(Debug, Clone)]
pub struct CascadeTemplate {
    /// Anomaly type this template responds to (compared with `==`).
    pub trigger: AnomalyType,
    /// Candidate steps, considered in order when a cascade is started.
    pub steps: Vec<CascadeStepTemplate>,
}
185
/// One candidate step within a [`CascadeTemplate`].
#[derive(Debug, Clone)]
pub struct CascadeStepTemplate {
    /// Anomaly to inject if this step is selected.
    pub anomaly_type: AnomalyType,
    /// Chance of selecting this step: it is included when a uniform `f64`
    /// draw is strictly less than this value.
    pub probability: f64,
    /// Lower bound (inclusive) of the lag in days.
    pub lag_min: i32,
    /// Upper bound (inclusive) of the lag in days.
    pub lag_max: i32,
    /// Reason text copied onto the generated `CascadeStep`.
    pub reason: String,
}
200
201impl CascadeStepTemplate {
202 pub fn new(
204 anomaly_type: AnomalyType,
205 probability: f64,
206 lag_range: (i32, i32),
207 reason: impl Into<String>,
208 ) -> Self {
209 Self {
210 anomaly_type,
211 probability,
212 lag_min: lag_range.0,
213 lag_max: lag_range.1,
214 reason: reason.into(),
215 }
216 }
217}
218
219impl Default for CascadeGenerator {
220 fn default() -> Self {
221 Self::new(CascadeConfig::default())
222 }
223}
224
225impl CascadeGenerator {
226 pub fn new(config: CascadeConfig) -> Self {
228 Self {
229 config,
230 uuid_factory: DeterministicUuidFactory::new(0, GeneratorType::Anomaly),
231 active_cascades: Vec::new(),
232 completed_cascades: Vec::new(),
233 templates: Self::default_templates(),
234 }
235 }
236
237 fn default_templates() -> Vec<CascadeTemplate> {
239 use datasynth_core::models::{ErrorType, ProcessIssueType};
240
241 vec![
242 CascadeTemplate {
244 trigger: AnomalyType::Error(ErrorType::MisclassifiedAccount),
245 steps: vec![
246 CascadeStepTemplate::new(
247 AnomalyType::Error(ErrorType::DuplicateEntry),
248 0.40,
249 (5, 15),
250 "Attempt to correct via additional entry",
251 ),
252 CascadeStepTemplate::new(
253 AnomalyType::Error(ErrorType::ReversedAmount),
254 0.30,
255 (10, 30),
256 "Reversal of original entry",
257 ),
258 CascadeStepTemplate::new(
259 AnomalyType::Error(ErrorType::WrongPeriod),
260 0.25,
261 (30, 60),
262 "Correction posted to wrong period",
263 ),
264 ],
265 },
266 CascadeTemplate {
268 trigger: AnomalyType::Error(ErrorType::WrongPeriod),
269 steps: vec![
270 CascadeStepTemplate::new(
271 AnomalyType::ProcessIssue(ProcessIssueType::LatePosting),
272 0.50,
273 (1, 5),
274 "Late correction posting",
275 ),
276 CascadeStepTemplate::new(
277 AnomalyType::Error(ErrorType::CutoffError),
278 0.35,
279 (5, 15),
280 "Additional cutoff issues from correction",
281 ),
282 ],
283 },
284 CascadeTemplate {
286 trigger: AnomalyType::Error(ErrorType::MissingField),
287 steps: vec![
288 CascadeStepTemplate::new(
289 AnomalyType::ProcessIssue(ProcessIssueType::MissingDocumentation),
290 0.60,
291 (1, 7),
292 "Request for missing documentation",
293 ),
294 CascadeStepTemplate::new(
295 AnomalyType::ProcessIssue(ProcessIssueType::LatePosting),
296 0.40,
297 (5, 14),
298 "Delayed posting while gathering info",
299 ),
300 ],
301 },
302 CascadeTemplate {
304 trigger: AnomalyType::Error(ErrorType::DuplicateEntry),
305 steps: vec![CascadeStepTemplate::new(
306 AnomalyType::Error(ErrorType::ReversedAmount),
307 0.70,
308 (1, 5),
309 "Reversal of duplicate",
310 )],
311 },
312 ]
313 }
314
315 pub fn maybe_start_cascade<R: Rng>(
317 &mut self,
318 trigger: &AnomalyType,
319 document_id: impl Into<String>,
320 date: NaiveDate,
321 rng: &mut R,
322 ) -> Option<Uuid> {
323 let template = self.templates.iter().find(|t| t.trigger == *trigger)?;
325
326 let mut cascade = ErrorCascade::new(trigger.clone(), document_id, date, &self.uuid_factory);
327
328 let mut step_num = 0u32;
330 for step_template in &template.steps {
331 if rng.random::<f64>() < step_template.probability {
332 step_num += 1;
333
334 if step_num > self.config.max_depth {
335 break;
336 }
337
338 let lag = if step_template.lag_min == step_template.lag_max {
339 step_template.lag_min
340 } else {
341 rng.random_range(step_template.lag_min..=step_template.lag_max)
342 };
343
344 let step = CascadeStep::new(step_num, step_template.anomaly_type.clone(), lag)
345 .with_reason(&step_template.reason);
346
347 cascade.add_step(step);
348 }
349 }
350
351 if cascade.steps.is_empty() {
353 return None;
354 }
355
356 let cascade_id = cascade.cascade_id;
357 self.active_cascades.push(cascade);
358 Some(cascade_id)
359 }
360
361 pub fn get_due_cascades(&mut self, date: NaiveDate) -> Vec<(Uuid, CascadeStep)> {
363 let mut due = Vec::new();
364
365 for cascade in &self.active_cascades {
366 if let Some(next_date) = cascade.next_step_date() {
367 if next_date <= date {
368 if let Some(step) = cascade.next_step() {
369 due.push((cascade.cascade_id, step.clone()));
370 }
371 }
372 }
373 }
374
375 due
376 }
377
378 pub fn execute_step(
380 &mut self,
381 cascade_id: Uuid,
382 document_id: impl Into<String>,
383 anomaly_id: impl Into<String>,
384 ) {
385 let doc_id = document_id.into();
386 let ano_id = anomaly_id.into();
387
388 if let Some(cascade) = self
389 .active_cascades
390 .iter_mut()
391 .find(|c| c.cascade_id == cascade_id)
392 {
393 if let Some(step) = cascade.next_step_mut() {
394 step.mark_executed(&doc_id, &ano_id);
395 }
396 cascade.advance();
397
398 if cascade.is_complete() {
400 }
402 }
403 }
404
405 pub fn cleanup(&mut self) {
407 let completed: Vec<_> = self
408 .active_cascades
409 .drain(..)
410 .filter(|c| !c.is_complete())
411 .collect();
412
413 let newly_completed: Vec<_> = self
414 .active_cascades
415 .iter()
416 .filter(|c| c.is_complete())
417 .cloned()
418 .collect();
419
420 self.completed_cascades.extend(newly_completed);
421 self.active_cascades = completed;
422 }
423
424 pub fn active_count(&self) -> usize {
426 self.active_cascades.len()
427 }
428
429 pub fn completed_count(&self) -> usize {
431 self.completed_cascades.len()
432 }
433
434 pub fn add_template(&mut self, template: CascadeTemplate) {
436 self.templates.push(template);
437 }
438}
439
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_core::models::ErrorType;
    use rand::SeedableRng;
    use rand_chacha::ChaCha8Rng;

    fn date(y: i32, m: u32, d: u32) -> NaiveDate {
        NaiveDate::from_ymd_opt(y, m, d).unwrap()
    }

    /// A template for a trigger not covered by the built-ins; every step is
    /// always accepted (probability 1.0) with a fixed lag.
    fn forced_template(step_count: usize, lag: i32) -> CascadeTemplate {
        CascadeTemplate {
            trigger: AnomalyType::Error(ErrorType::CutoffError),
            steps: (0..step_count)
                .map(|_| {
                    CascadeStepTemplate::new(
                        AnomalyType::Error(ErrorType::ReversedAmount),
                        1.0,
                        (lag, lag),
                        "Forced step",
                    )
                })
                .collect(),
        }
    }

    #[test]
    fn test_cascade_step() {
        let mut step = CascadeStep::new(1, AnomalyType::Error(ErrorType::DuplicateEntry), 5)
            .with_reason("Test reason");

        assert_eq!(step.step, 1);
        assert_eq!(step.lag_days, 5);
        assert_eq!(step.reason, "Test reason");
        assert!(!step.executed);
        assert!(step.document_id.is_none());
        assert!(step.anomaly_id.is_none());

        step.mark_executed("JE002", "ANO001");
        assert!(step.executed);
        assert_eq!(step.document_id.as_deref(), Some("JE002"));
        assert_eq!(step.anomaly_id.as_deref(), Some("ANO001"));
    }

    #[test]
    fn test_error_cascade() {
        let uuid_factory = DeterministicUuidFactory::new(42, GeneratorType::Anomaly);
        let mut cascade = ErrorCascade::new(
            AnomalyType::Error(ErrorType::MisclassifiedAccount),
            "JE001",
            date(2024, 1, 15),
            &uuid_factory,
        );

        cascade.add_step(CascadeStep::new(
            1,
            AnomalyType::Error(ErrorType::DuplicateEntry),
            5,
        ));
        cascade.add_step(CascadeStep::new(
            2,
            AnomalyType::Error(ErrorType::ReversedAmount),
            10,
        ));

        assert_eq!(cascade.steps.len(), 2);
        assert!(!cascade.is_complete());

        // Trigger date + 5 days.
        assert_eq!(cascade.next_step_date(), Some(date(2024, 1, 20)));

        // Lags accumulate: trigger date + 5 + 10 days.
        cascade.advance();
        assert_eq!(cascade.next_step_date(), Some(date(2024, 1, 30)));

        cascade.advance();
        assert!(cascade.is_complete());
        assert_eq!(cascade.next_step_date(), None);

        // Advancing past the end is a no-op.
        cascade.advance();
        assert_eq!(cascade.current_step, 2);
    }

    #[test]
    fn test_cascade_generator() {
        let mut generator = CascadeGenerator::new(CascadeConfig::default());
        let mut rng = ChaCha8Rng::seed_from_u64(42);

        let cascade_id = generator.maybe_start_cascade(
            &AnomalyType::Error(ErrorType::MisclassifiedAccount),
            "JE001",
            date(2024, 1, 15),
            &mut rng,
        );

        // A cascade is stored exactly when an ID is returned.
        assert_eq!(generator.active_count(), usize::from(cascade_id.is_some()));
    }

    #[test]
    fn test_cascade_generator_no_match() {
        let mut generator = CascadeGenerator::new(CascadeConfig::default());
        let mut rng = ChaCha8Rng::seed_from_u64(42);

        let cascade_id = generator.maybe_start_cascade(
            &AnomalyType::Fraud(datasynth_core::models::FraudType::SelfApproval),
            "JE001",
            date(2024, 1, 15),
            &mut rng,
        );

        assert!(cascade_id.is_none());
        assert_eq!(generator.active_count(), 0);
    }

    #[test]
    fn test_cascade_respects_max_depth() {
        let config = CascadeConfig {
            max_depth: 2,
            ..CascadeConfig::default()
        };
        let mut generator = CascadeGenerator::new(config);
        generator.add_template(forced_template(3, 1));
        let mut rng = ChaCha8Rng::seed_from_u64(1);

        let cascade_id = generator.maybe_start_cascade(
            &AnomalyType::Error(ErrorType::CutoffError),
            "JE010",
            date(2024, 2, 1),
            &mut rng,
        );

        assert!(cascade_id.is_some());
        assert_eq!(generator.active_cascades[0].steps.len(), 2);
    }

    #[test]
    fn test_cascade_lifecycle() {
        let mut generator = CascadeGenerator::default();
        generator.add_template(forced_template(1, 3));
        let mut rng = ChaCha8Rng::seed_from_u64(7);

        let cascade_id = generator
            .maybe_start_cascade(
                &AnomalyType::Error(ErrorType::CutoffError),
                "JE100",
                date(2024, 3, 1),
                &mut rng,
            )
            .unwrap();

        // Not due until trigger date + 3 days.
        assert!(generator.get_due_cascades(date(2024, 3, 3)).is_empty());
        let due = generator.get_due_cascades(date(2024, 3, 4));
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].0, cascade_id);

        generator.execute_step(cascade_id, "JE101", "ANO101");
        assert!(generator.get_due_cascades(date(2024, 12, 31)).is_empty());

        generator.cleanup();
        assert_eq!(generator.active_count(), 0);
    }
}