Skip to main content

datasynth_generators/anomaly/
injector.rs

1//! Main anomaly injection engine.
2//!
3//! The injector coordinates anomaly generation across all data types,
4//! managing rates, patterns, clustering, and label generation.
5
6use chrono::NaiveDate;
7use rand::Rng;
8use rand::SeedableRng;
9use rand_chacha::ChaCha8Rng;
10use rust_decimal::Decimal;
11use std::collections::HashMap;
12
13use datasynth_core::models::{
14    AnomalyCausalReason, AnomalyRateConfig, AnomalySummary, AnomalyType, ErrorType, FraudType,
15    JournalEntry, LabeledAnomaly, RelationalAnomalyType,
16};
17
18use super::patterns::{
19    should_inject_anomaly, AnomalyPatternConfig, ClusterManager, EntityTargetingManager,
20    TemporalPattern,
21};
22use super::strategies::{DuplicationStrategy, StrategyCollection};
23use super::types::AnomalyTypeSelector;
24
/// Configuration for the anomaly injector.
///
/// Usually built with `AnomalyInjectorConfigBuilder` or `Default::default()`.
#[derive(Debug, Clone)]
pub struct AnomalyInjectorConfig {
    /// Rate configuration: `total_rate` is the per-entry injection rate, and the
    /// per-category rates (fraud, error, process issue, statistical) choose which
    /// anomaly category an injected anomaly belongs to.
    pub rates: AnomalyRateConfig,
    /// Pattern configuration (temporal pattern, clustering and entity targeting settings).
    pub patterns: AnomalyPatternConfig,
    /// Seed for the injector's ChaCha8 RNG, for reproducible output.
    pub seed: u64,
    /// Whether automatic injection builds `LabeledAnomaly` records and flags entry headers.
    pub generate_labels: bool,
    /// Whether duplicate-type anomalies (duplicate entry / duplicate payment) may also
    /// produce a duplicated copy of the affected entry.
    pub allow_duplicates: bool,
    /// Maximum anomalies per document; documents already at this count are skipped.
    pub max_anomalies_per_document: usize,
    /// Company codes to target (empty = all).
    pub target_companies: Vec<String>,
    /// Inclusive posting-date range for injection (`None` = no date restriction).
    pub date_range: Option<(NaiveDate, NaiveDate)>,
}
45
46impl Default for AnomalyInjectorConfig {
47    fn default() -> Self {
48        Self {
49            rates: AnomalyRateConfig::default(),
50            patterns: AnomalyPatternConfig::default(),
51            seed: 42,
52            generate_labels: true,
53            allow_duplicates: true,
54            max_anomalies_per_document: 2,
55            target_companies: Vec::new(),
56            date_range: None,
57        }
58    }
59}
60
/// Result of an injection batch, as returned by `AnomalyInjector::process_entries`.
///
/// NOTE(review): as populated by `process_entries`, `entries_processed`,
/// `anomalies_injected`, `labels` and `summary` are cumulative since the injector was
/// created (or last reset), while `duplicates_created` and `modified_documents` only
/// cover the current call.
#[derive(Debug, Clone)]
pub struct InjectionBatchResult {
    /// Number of entries processed (cumulative, see the note above).
    pub entries_processed: usize,
    /// Number of anomalies injected (cumulative, see the note above).
    pub anomalies_injected: usize,
    /// Number of duplicate entries created during this call.
    pub duplicates_created: usize,
    /// Labels generated (cumulative, see the note above).
    pub labels: Vec<LabeledAnomaly>,
    /// Summary computed from `labels`.
    pub summary: AnomalySummary,
    /// Document numbers of entries modified during this call.
    pub modified_documents: Vec<String>,
}
77
/// Main anomaly injection engine.
///
/// Holds the configuration, a seeded RNG, the type selector and injection strategies,
/// and the labels and statistics accumulated across calls until `reset`.
// `entity_targeting` is built in `new` but not read anywhere in this file, hence the allow.
#[allow(dead_code)]
pub struct AnomalyInjector {
    /// Injector configuration.
    config: AnomalyInjectorConfig,
    /// RNG seeded from `config.seed`; passed to every random decision in this file.
    rng: ChaCha8Rng,
    /// Picks a concrete anomaly type within a chosen category.
    type_selector: AnomalyTypeSelector,
    /// Strategies that apply anomalies to journal entries.
    strategies: StrategyCollection,
    /// Assigns labeled anomalies to clusters.
    cluster_manager: ClusterManager,
    /// Entity-targeting manager (constructed but currently unused in this file).
    entity_targeting: EntityTargetingManager,
    /// Number of anomalies applied so far, keyed by document number.
    document_anomaly_counts: HashMap<String, usize>,
    /// All generated labels.
    labels: Vec<LabeledAnomaly>,
    /// Statistics.
    stats: InjectorStats,
}
94
/// Statistics tracked by the anomaly injector.
///
/// Counters are only written by the injector itself; callers read them through the
/// accessor methods (the fields stay private so the counts cannot be tampered with).
#[derive(Debug, Clone, Default)]
pub struct InjectorStats {
    total_processed: usize,
    total_injected: usize,
    by_category: HashMap<String, usize>,
    by_type: HashMap<String, usize>,
    by_company: HashMap<String, usize>,
    skipped_rate: usize,
    skipped_date: usize,
    skipped_company: usize,
    skipped_max_per_doc: usize,
}

impl InjectorStats {
    /// Number of entries seen by the injector, including skipped ones.
    pub fn total_processed(&self) -> usize {
        self.total_processed
    }

    /// Number of anomalies recorded as injected.
    pub fn total_injected(&self) -> usize {
        self.total_injected
    }

    /// Injected anomaly counts keyed by anomaly category name.
    pub fn by_category(&self) -> &HashMap<String, usize> {
        &self.by_category
    }

    /// Injected anomaly counts keyed by anomaly type name.
    pub fn by_type(&self) -> &HashMap<String, usize> {
        &self.by_type
    }

    /// Injected anomaly counts keyed by company code.
    pub fn by_company(&self) -> &HashMap<String, usize> {
        &self.by_company
    }

    /// Count of entries skipped by the injection-rate roll, as recorded by the injector.
    pub fn skipped_rate(&self) -> usize {
        self.skipped_rate
    }

    /// Count of entries skipped because their posting date was outside the date range.
    pub fn skipped_date(&self) -> usize {
        self.skipped_date
    }

    /// Count of entries skipped because their company was not targeted.
    pub fn skipped_company(&self) -> usize {
        self.skipped_company
    }

    /// Count of entries skipped because their document hit the per-document cap.
    pub fn skipped_max_per_doc(&self) -> usize {
        self.skipped_max_per_doc
    }
}
109
110impl AnomalyInjector {
111    /// Creates a new anomaly injector.
112    pub fn new(config: AnomalyInjectorConfig) -> Self {
113        let rng = ChaCha8Rng::seed_from_u64(config.seed);
114        let cluster_manager = ClusterManager::new(config.patterns.clustering.clone());
115        let entity_targeting =
116            EntityTargetingManager::new(config.patterns.entity_targeting.clone());
117
118        Self {
119            config,
120            rng,
121            type_selector: AnomalyTypeSelector::new(),
122            strategies: StrategyCollection::default(),
123            cluster_manager,
124            entity_targeting,
125            document_anomaly_counts: HashMap::new(),
126            labels: Vec::new(),
127            stats: InjectorStats::default(),
128        }
129    }
130
131    /// Processes a batch of journal entries, potentially injecting anomalies.
132    pub fn process_entries(&mut self, entries: &mut [JournalEntry]) -> InjectionBatchResult {
133        let mut modified_documents = Vec::new();
134        let mut duplicates = Vec::new();
135
136        for entry in entries.iter_mut() {
137            self.stats.total_processed += 1;
138
139            // Check if we should process this entry
140            if !self.should_process(entry) {
141                continue;
142            }
143
144            // Determine if we inject an anomaly
145            if should_inject_anomaly(
146                self.config.rates.total_rate,
147                entry.posting_date(),
148                &self.config.patterns.temporal_pattern,
149                &mut self.rng,
150            ) {
151                // Select anomaly category based on rates
152                let anomaly_type = self.select_anomaly_category();
153
154                // Apply the anomaly
155                if let Some(label) = self.inject_anomaly(entry, anomaly_type) {
156                    modified_documents.push(entry.document_number().clone());
157                    self.labels.push(label);
158                    self.stats.total_injected += 1;
159                }
160
161                // Check for duplicate injection
162                if self.config.allow_duplicates
163                    && matches!(
164                        self.labels.last().map(|l| &l.anomaly_type),
165                        Some(AnomalyType::Error(ErrorType::DuplicateEntry))
166                            | Some(AnomalyType::Fraud(FraudType::DuplicatePayment))
167                    )
168                {
169                    let dup_strategy = DuplicationStrategy::default();
170                    let duplicate = dup_strategy.duplicate(entry, &mut self.rng);
171                    duplicates.push(duplicate);
172                }
173            }
174        }
175
176        // Count duplicates
177        let duplicates_created = duplicates.len();
178
179        // Build summary
180        let summary = AnomalySummary::from_anomalies(&self.labels);
181
182        InjectionBatchResult {
183            entries_processed: self.stats.total_processed,
184            anomalies_injected: self.stats.total_injected,
185            duplicates_created,
186            labels: self.labels.clone(),
187            summary,
188            modified_documents,
189        }
190    }
191
192    /// Checks if an entry should be processed.
193    fn should_process(&mut self, entry: &JournalEntry) -> bool {
194        // Check company filter
195        if !self.config.target_companies.is_empty()
196            && !self
197                .config
198                .target_companies
199                .iter()
200                .any(|c| c == entry.company_code())
201        {
202            self.stats.skipped_company += 1;
203            return false;
204        }
205
206        // Check date range
207        if let Some((start, end)) = self.config.date_range {
208            if entry.posting_date() < start || entry.posting_date() > end {
209                self.stats.skipped_date += 1;
210                return false;
211            }
212        }
213
214        // Check max anomalies per document
215        let current_count = self
216            .document_anomaly_counts
217            .get(&entry.document_number())
218            .copied()
219            .unwrap_or(0);
220        if current_count >= self.config.max_anomalies_per_document {
221            self.stats.skipped_max_per_doc += 1;
222            return false;
223        }
224
225        true
226    }
227
228    /// Selects an anomaly category based on configured rates.
229    fn select_anomaly_category(&mut self) -> AnomalyType {
230        let r = self.rng.gen::<f64>();
231        let rates = &self.config.rates;
232
233        let mut cumulative = 0.0;
234
235        cumulative += rates.fraud_rate;
236        if r < cumulative {
237            return self.type_selector.select_fraud(&mut self.rng);
238        }
239
240        cumulative += rates.error_rate;
241        if r < cumulative {
242            return self.type_selector.select_error(&mut self.rng);
243        }
244
245        cumulative += rates.process_issue_rate;
246        if r < cumulative {
247            return self.type_selector.select_process_issue(&mut self.rng);
248        }
249
250        cumulative += rates.statistical_rate;
251        if r < cumulative {
252            return self.type_selector.select_statistical(&mut self.rng);
253        }
254
255        self.type_selector.select_relational(&mut self.rng)
256    }
257
258    /// Injects an anomaly into an entry.
259    fn inject_anomaly(
260        &mut self,
261        entry: &mut JournalEntry,
262        anomaly_type: AnomalyType,
263    ) -> Option<LabeledAnomaly> {
264        // Check if strategy can be applied
265        if !self.strategies.can_apply(entry, &anomaly_type) {
266            return None;
267        }
268
269        // Apply the strategy
270        let result = self
271            .strategies
272            .apply_strategy(entry, &anomaly_type, &mut self.rng);
273
274        if !result.success {
275            return None;
276        }
277
278        // Update document anomaly count
279        *self
280            .document_anomaly_counts
281            .entry(entry.document_number().clone())
282            .or_insert(0) += 1;
283
284        // Update statistics
285        let category = anomaly_type.category().to_string();
286        let type_name = anomaly_type.type_name();
287
288        *self.stats.by_category.entry(category).or_insert(0) += 1;
289        *self.stats.by_type.entry(type_name.clone()).or_insert(0) += 1;
290        *self
291            .stats
292            .by_company
293            .entry(entry.company_code().to_string())
294            .or_insert(0) += 1;
295
296        // Generate label
297        if self.config.generate_labels {
298            let anomaly_id = format!("ANO{:08}", self.labels.len() + 1);
299
300            // Update entry header with anomaly tracking fields
301            entry.header.is_anomaly = true;
302            entry.header.anomaly_id = Some(anomaly_id.clone());
303            entry.header.anomaly_type = Some(type_name.clone());
304
305            // Also set fraud flag if this is a fraud anomaly
306            if matches!(anomaly_type, AnomalyType::Fraud(_)) {
307                entry.header.is_fraud = true;
308                if let AnomalyType::Fraud(ref ft) = anomaly_type {
309                    entry.header.fraud_type = Some(*ft);
310                }
311            }
312
313            let mut label = LabeledAnomaly::new(
314                anomaly_id,
315                anomaly_type.clone(),
316                entry.document_number().clone(),
317                "JE".to_string(),
318                entry.company_code().to_string(),
319                entry.posting_date(),
320            )
321            .with_description(&result.description)
322            .with_injection_strategy(&type_name);
323
324            // Add causal reason with injection context (provenance tracking)
325            let causal_reason = AnomalyCausalReason::RandomRate {
326                base_rate: self.config.rates.total_rate,
327            };
328            label = label.with_causal_reason(causal_reason);
329
330            // Add monetary impact
331            if let Some(impact) = result.monetary_impact {
332                label = label.with_monetary_impact(impact);
333            }
334
335            // Add related entities
336            for entity in &result.related_entities {
337                label = label.with_related_entity(entity);
338            }
339
340            // Add metadata
341            for (key, value) in &result.metadata {
342                label = label.with_metadata(key, value);
343            }
344
345            // Assign cluster and update causal reason if in cluster
346            if let Some(cluster_id) =
347                self.cluster_manager
348                    .assign_cluster(entry.posting_date(), &type_name, &mut self.rng)
349            {
350                label = label.with_cluster(&cluster_id);
351                // Update causal reason to reflect cluster membership
352                label = label.with_causal_reason(AnomalyCausalReason::ClusterMembership {
353                    cluster_id: cluster_id.clone(),
354                });
355            }
356
357            return Some(label);
358        }
359
360        None
361    }
362
363    /// Injects a specific anomaly type into an entry.
364    pub fn inject_specific(
365        &mut self,
366        entry: &mut JournalEntry,
367        anomaly_type: AnomalyType,
368    ) -> Option<LabeledAnomaly> {
369        self.inject_anomaly(entry, anomaly_type)
370    }
371
372    /// Creates a self-approval anomaly.
373    pub fn create_self_approval(
374        &mut self,
375        entry: &mut JournalEntry,
376        user_id: &str,
377    ) -> Option<LabeledAnomaly> {
378        let anomaly_type = AnomalyType::Fraud(FraudType::SelfApproval);
379
380        let label = LabeledAnomaly::new(
381            format!("ANO{:08}", self.labels.len() + 1),
382            anomaly_type,
383            entry.document_number().clone(),
384            "JE".to_string(),
385            entry.company_code().to_string(),
386            entry.posting_date(),
387        )
388        .with_description(&format!("User {} approved their own transaction", user_id))
389        .with_related_entity(user_id)
390        .with_injection_strategy("ManualSelfApproval")
391        .with_causal_reason(AnomalyCausalReason::EntityTargeting {
392            target_type: "User".to_string(),
393            target_id: user_id.to_string(),
394        });
395
396        // Set entry header anomaly tracking fields
397        entry.header.is_anomaly = true;
398        entry.header.is_fraud = true;
399        entry.header.anomaly_id = Some(label.anomaly_id.clone());
400        entry.header.anomaly_type = Some("SelfApproval".to_string());
401        entry.header.fraud_type = Some(FraudType::SelfApproval);
402
403        // Set approver = requester
404        entry.header.created_by = user_id.to_string();
405
406        self.labels.push(label.clone());
407        Some(label)
408    }
409
410    /// Creates a segregation of duties violation.
411    pub fn create_sod_violation(
412        &mut self,
413        entry: &mut JournalEntry,
414        user_id: &str,
415        conflicting_duties: (&str, &str),
416    ) -> Option<LabeledAnomaly> {
417        let anomaly_type = AnomalyType::Fraud(FraudType::SegregationOfDutiesViolation);
418
419        let label = LabeledAnomaly::new(
420            format!("ANO{:08}", self.labels.len() + 1),
421            anomaly_type,
422            entry.document_number().clone(),
423            "JE".to_string(),
424            entry.company_code().to_string(),
425            entry.posting_date(),
426        )
427        .with_description(&format!(
428            "User {} performed conflicting duties: {} and {}",
429            user_id, conflicting_duties.0, conflicting_duties.1
430        ))
431        .with_related_entity(user_id)
432        .with_metadata("duty1", conflicting_duties.0)
433        .with_metadata("duty2", conflicting_duties.1)
434        .with_injection_strategy("ManualSoDViolation")
435        .with_causal_reason(AnomalyCausalReason::EntityTargeting {
436            target_type: "User".to_string(),
437            target_id: user_id.to_string(),
438        });
439
440        // Set entry header anomaly tracking fields
441        entry.header.is_anomaly = true;
442        entry.header.is_fraud = true;
443        entry.header.anomaly_id = Some(label.anomaly_id.clone());
444        entry.header.anomaly_type = Some("SegregationOfDutiesViolation".to_string());
445        entry.header.fraud_type = Some(FraudType::SegregationOfDutiesViolation);
446
447        self.labels.push(label.clone());
448        Some(label)
449    }
450
451    /// Creates an intercompany mismatch anomaly.
452    pub fn create_ic_mismatch(
453        &mut self,
454        entry: &mut JournalEntry,
455        matching_company: &str,
456        expected_amount: Decimal,
457        actual_amount: Decimal,
458    ) -> Option<LabeledAnomaly> {
459        let anomaly_type = AnomalyType::Relational(RelationalAnomalyType::UnmatchedIntercompany);
460
461        let label = LabeledAnomaly::new(
462            format!("ANO{:08}", self.labels.len() + 1),
463            anomaly_type,
464            entry.document_number().clone(),
465            "JE".to_string(),
466            entry.company_code().to_string(),
467            entry.posting_date(),
468        )
469        .with_description(&format!(
470            "Intercompany mismatch with {}: expected {} but got {}",
471            matching_company, expected_amount, actual_amount
472        ))
473        .with_related_entity(matching_company)
474        .with_monetary_impact(actual_amount - expected_amount)
475        .with_metadata("expected_amount", &expected_amount.to_string())
476        .with_metadata("actual_amount", &actual_amount.to_string())
477        .with_injection_strategy("ManualICMismatch")
478        .with_causal_reason(AnomalyCausalReason::EntityTargeting {
479            target_type: "Intercompany".to_string(),
480            target_id: matching_company.to_string(),
481        });
482
483        // Set entry header anomaly tracking fields
484        entry.header.is_anomaly = true;
485        entry.header.anomaly_id = Some(label.anomaly_id.clone());
486        entry.header.anomaly_type = Some("UnmatchedIntercompany".to_string());
487
488        self.labels.push(label.clone());
489        Some(label)
490    }
491
492    /// Returns all generated labels.
493    pub fn get_labels(&self) -> &[LabeledAnomaly] {
494        &self.labels
495    }
496
497    /// Returns the anomaly summary.
498    pub fn get_summary(&self) -> AnomalySummary {
499        AnomalySummary::from_anomalies(&self.labels)
500    }
501
502    /// Returns injection statistics.
503    pub fn get_stats(&self) -> &InjectorStats {
504        &self.stats
505    }
506
507    /// Clears all labels and resets statistics.
508    pub fn reset(&mut self) {
509        self.labels.clear();
510        self.document_anomaly_counts.clear();
511        self.stats = InjectorStats::default();
512        self.cluster_manager = ClusterManager::new(self.config.patterns.clustering.clone());
513    }
514
515    /// Returns the number of clusters created.
516    pub fn cluster_count(&self) -> usize {
517        self.cluster_manager.cluster_count()
518    }
519}
520
/// Builder for AnomalyInjectorConfig, starting from `AnomalyInjectorConfig::default()`.
pub struct AnomalyInjectorConfigBuilder {
    /// Configuration being built.
    config: AnomalyInjectorConfig,
}
525
526impl AnomalyInjectorConfigBuilder {
527    /// Creates a new builder with default configuration.
528    pub fn new() -> Self {
529        Self {
530            config: AnomalyInjectorConfig::default(),
531        }
532    }
533
534    /// Sets the total anomaly rate.
535    pub fn with_total_rate(mut self, rate: f64) -> Self {
536        self.config.rates.total_rate = rate;
537        self
538    }
539
540    /// Sets the fraud rate (proportion of anomalies).
541    pub fn with_fraud_rate(mut self, rate: f64) -> Self {
542        self.config.rates.fraud_rate = rate;
543        self
544    }
545
546    /// Sets the error rate (proportion of anomalies).
547    pub fn with_error_rate(mut self, rate: f64) -> Self {
548        self.config.rates.error_rate = rate;
549        self
550    }
551
552    /// Sets the random seed.
553    pub fn with_seed(mut self, seed: u64) -> Self {
554        self.config.seed = seed;
555        self
556    }
557
558    /// Sets the temporal pattern.
559    pub fn with_temporal_pattern(mut self, pattern: TemporalPattern) -> Self {
560        self.config.patterns.temporal_pattern = pattern;
561        self
562    }
563
564    /// Enables or disables label generation.
565    pub fn with_labels(mut self, generate: bool) -> Self {
566        self.config.generate_labels = generate;
567        self
568    }
569
570    /// Sets target companies.
571    pub fn with_target_companies(mut self, companies: Vec<String>) -> Self {
572        self.config.target_companies = companies;
573        self
574    }
575
576    /// Sets the date range.
577    pub fn with_date_range(mut self, start: NaiveDate, end: NaiveDate) -> Self {
578        self.config.date_range = Some((start, end));
579        self
580    }
581
582    /// Builds the configuration.
583    pub fn build(self) -> AnomalyInjectorConfig {
584        self.config
585    }
586}
587
588impl Default for AnomalyInjectorConfigBuilder {
589    fn default() -> Self {
590        Self::new()
591    }
592}
593
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::NaiveDate;
    use datasynth_core::models::{JournalEntryLine, StatisticalAnomalyType};
    use rust_decimal_macros::dec;

    /// Builds a balanced two-line test entry for company "1000" dated 2024-06-15.
    fn create_test_entry(doc_num: &str) -> JournalEntry {
        let mut entry = JournalEntry::new_simple(
            doc_num.to_string(),
            "1000".to_string(),
            NaiveDate::from_ymd_opt(2024, 6, 15).unwrap(),
            "Test Entry".to_string(),
        );

        entry.add_line(JournalEntryLine {
            line_number: 1,
            gl_account: "5000".to_string(),
            debit_amount: dec!(1000),
            ..Default::default()
        });

        entry.add_line(JournalEntryLine {
            line_number: 2,
            gl_account: "1000".to_string(),
            credit_amount: dec!(1000),
            ..Default::default()
        });

        entry
    }

    #[test]
    fn test_anomaly_injector_basic() {
        let config = AnomalyInjectorConfigBuilder::new()
            .with_total_rate(0.5) // High rate for testing
            .with_seed(42)
            .build();

        let mut injector = AnomalyInjector::new(config);

        let mut entries: Vec<_> = (0..100)
            .map(|i| create_test_entry(&format!("JE{:04}", i)))
            .collect();

        let result = injector.process_entries(&mut entries);

        // With 50% rate, we should have some anomalies
        assert!(result.anomalies_injected > 0);
        assert!(!result.labels.is_empty());
        assert_eq!(result.labels.len(), result.anomalies_injected);
    }

    #[test]
    fn test_specific_injection() {
        let config = AnomalyInjectorConfig::default();
        let mut injector = AnomalyInjector::new(config);

        let mut entry = create_test_entry("JE001");
        let anomaly_type = AnomalyType::Statistical(StatisticalAnomalyType::UnusuallyHighAmount);

        let label = injector.inject_specific(&mut entry, anomaly_type);

        assert!(label.is_some());
        let label = label.unwrap();
        // The label's document_id is taken from `entry.document_number()`.
        assert!(!label.document_id.is_empty());
        assert_eq!(label.document_id, entry.document_number());
    }

    #[test]
    fn test_self_approval_injection() {
        let config = AnomalyInjectorConfig::default();
        let mut injector = AnomalyInjector::new(config);

        let mut entry = create_test_entry("JE001");
        let label = injector.create_self_approval(&mut entry, "USER001");

        assert!(label.is_some());
        let label = label.unwrap();
        assert!(matches!(
            label.anomaly_type,
            AnomalyType::Fraud(FraudType::SelfApproval)
        ));
        assert!(label.related_entities.contains(&"USER001".to_string()));
        assert!(entry.header.is_anomaly);
        assert!(entry.header.is_fraud);
        assert_eq!(entry.header.created_by, "USER001");
        assert_eq!(
            entry.header.anomaly_id.as_deref(),
            Some(label.anomaly_id.as_str())
        );
    }

    #[test]
    fn test_sod_violation_injection() {
        let mut injector = AnomalyInjector::new(AnomalyInjectorConfig::default());
        let mut entry = create_test_entry("JE001");

        let label = injector
            .create_sod_violation(&mut entry, "USER001", ("create_vendor", "approve_payment"))
            .expect("manual SoD injection always returns a label");

        assert!(matches!(
            label.anomaly_type,
            AnomalyType::Fraud(FraudType::SegregationOfDutiesViolation)
        ));
        assert!(label.related_entities.contains(&"USER001".to_string()));
        assert!(entry.header.is_anomaly);
        assert!(entry.header.is_fraud);
        assert!(matches!(
            entry.header.fraud_type,
            Some(FraudType::SegregationOfDutiesViolation)
        ));
    }

    #[test]
    fn test_ic_mismatch_injection() {
        let mut injector = AnomalyInjector::new(AnomalyInjectorConfig::default());
        let mut entry = create_test_entry("JE001");

        let label = injector
            .create_ic_mismatch(&mut entry, "2000", dec!(1000), dec!(900))
            .expect("manual IC mismatch injection always returns a label");

        assert!(matches!(
            label.anomaly_type,
            AnomalyType::Relational(RelationalAnomalyType::UnmatchedIntercompany)
        ));
        assert!(label.related_entities.contains(&"2000".to_string()));
        assert!(entry.header.is_anomaly);
        assert_eq!(
            entry.header.anomaly_type.as_deref(),
            Some("UnmatchedIntercompany")
        );
    }

    #[test]
    fn test_manual_anomaly_ids_are_sequential() {
        let mut injector = AnomalyInjector::new(AnomalyInjectorConfig::default());
        let mut first = create_test_entry("JE001");
        let mut second = create_test_entry("JE002");

        let a = injector.create_self_approval(&mut first, "USER001").unwrap();
        let b = injector.create_self_approval(&mut second, "USER002").unwrap();

        assert_eq!(a.anomaly_id, "ANO00000001");
        assert_eq!(b.anomaly_id, "ANO00000002");
        assert_eq!(injector.get_labels().len(), 2);
    }

    #[test]
    fn test_company_filtering() {
        let config = AnomalyInjectorConfigBuilder::new()
            .with_total_rate(1.0) // Inject all
            .with_target_companies(vec!["2000".to_string()])
            .build();

        let mut injector = AnomalyInjector::new(config);

        let mut entries = vec![
            create_test_entry("JE001"), // company 1000
            create_test_entry("JE002"), // company 1000
        ];

        let result = injector.process_entries(&mut entries);

        // No anomalies because entries are in company 1000, not 2000
        assert_eq!(result.anomalies_injected, 0);
    }

    #[test]
    fn test_date_range_filtering() {
        let config = AnomalyInjectorConfigBuilder::new()
            .with_total_rate(1.0) // Inject all eligible entries
            .with_date_range(
                NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
                NaiveDate::from_ymd_opt(2024, 1, 31).unwrap(),
            )
            .build();

        let mut injector = AnomalyInjector::new(config);
        let mut entries = vec![create_test_entry("JE001"), create_test_entry("JE002")];

        let result = injector.process_entries(&mut entries);

        // Test entries are dated 2024-06-15, outside the January range.
        assert_eq!(result.entries_processed, 2);
        assert_eq!(result.anomalies_injected, 0);
        assert!(result.modified_documents.is_empty());
    }

    #[test]
    fn test_reset_clears_labels() {
        let mut injector = AnomalyInjector::new(AnomalyInjectorConfig::default());
        let mut entry = create_test_entry("JE001");

        injector.create_self_approval(&mut entry, "USER001");
        assert_eq!(injector.get_labels().len(), 1);

        injector.reset();
        assert!(injector.get_labels().is_empty());
    }
}