datasynth_generators/anomaly/
injector.rs

//! Main anomaly injection engine.
//!
//! The injector coordinates anomaly generation across all data types,
//! managing rates, patterns, clustering, and label generation.
5
6use chrono::NaiveDate;
7use rand::Rng;
8use rand::SeedableRng;
9use rand_chacha::ChaCha8Rng;
10use rust_decimal::Decimal;
11use std::collections::HashMap;
12
13use datasynth_core::models::{
14    AnomalyRateConfig, AnomalySummary, AnomalyType, ErrorType, FraudType, JournalEntry,
15    LabeledAnomaly, RelationalAnomalyType,
16};
17
18use super::patterns::{
19    should_inject_anomaly, AnomalyPatternConfig, ClusterManager, EntityTargetingManager,
20    TemporalPattern,
21};
22use super::strategies::{DuplicationStrategy, StrategyCollection};
23use super::types::AnomalyTypeSelector;
24
25/// Configuration for the anomaly injector.
26#[derive(Debug, Clone)]
27pub struct AnomalyInjectorConfig {
28    /// Rate configuration.
29    pub rates: AnomalyRateConfig,
30    /// Pattern configuration.
31    pub patterns: AnomalyPatternConfig,
32    /// Random seed for reproducibility.
33    pub seed: u64,
34    /// Whether to generate labels.
35    pub generate_labels: bool,
36    /// Whether to allow duplicate injection.
37    pub allow_duplicates: bool,
38    /// Maximum anomalies per document.
39    pub max_anomalies_per_document: usize,
40    /// Company codes to target (empty = all).
41    pub target_companies: Vec<String>,
42    /// Date range for injection.
43    pub date_range: Option<(NaiveDate, NaiveDate)>,
44}
45
46impl Default for AnomalyInjectorConfig {
47    fn default() -> Self {
48        Self {
49            rates: AnomalyRateConfig::default(),
50            patterns: AnomalyPatternConfig::default(),
51            seed: 42,
52            generate_labels: true,
53            allow_duplicates: true,
54            max_anomalies_per_document: 2,
55            target_companies: Vec::new(),
56            date_range: None,
57        }
58    }
59}
60
61/// Result of an injection batch.
62#[derive(Debug, Clone)]
63pub struct InjectionBatchResult {
64    /// Number of entries processed.
65    pub entries_processed: usize,
66    /// Number of anomalies injected.
67    pub anomalies_injected: usize,
68    /// Number of duplicates created.
69    pub duplicates_created: usize,
70    /// Labels generated.
71    pub labels: Vec<LabeledAnomaly>,
72    /// Summary of anomalies.
73    pub summary: AnomalySummary,
74    /// Entries that were modified (document numbers).
75    pub modified_documents: Vec<String>,
76}
77
78/// Main anomaly injection engine.
79#[allow(dead_code)]
80pub struct AnomalyInjector {
81    config: AnomalyInjectorConfig,
82    rng: ChaCha8Rng,
83    type_selector: AnomalyTypeSelector,
84    strategies: StrategyCollection,
85    cluster_manager: ClusterManager,
86    entity_targeting: EntityTargetingManager,
87    /// Tracking which documents already have anomalies.
88    document_anomaly_counts: HashMap<String, usize>,
89    /// All generated labels.
90    labels: Vec<LabeledAnomaly>,
91    /// Statistics.
92    stats: InjectorStats,
93}
94
/// Counters maintained by [`AnomalyInjector`] while it processes entries.
// `dead_code` is allowed because the counters are written by the injector but
// nothing in this file reads them back individually.
#[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub struct InjectorStats {
    /// Entries handed to the injector.
    total_processed: usize,
    /// Anomalies injected.
    total_injected: usize,
    /// Injected anomalies keyed by category name.
    by_category: HashMap<String, usize>,
    /// Injected anomalies keyed by anomaly type name.
    by_type: HashMap<String, usize>,
    /// Injected anomalies keyed by company code.
    by_company: HashMap<String, usize>,
    /// Entries skipped because of the injection rate.
    skipped_rate: usize,
    /// Entries skipped because their posting date is outside the date range.
    skipped_date: usize,
    /// Entries skipped because their company is not targeted.
    skipped_company: usize,
    /// Entries skipped because their document reached the per-document cap.
    skipped_max_per_doc: usize,
}
109
110impl AnomalyInjector {
111    /// Creates a new anomaly injector.
112    pub fn new(config: AnomalyInjectorConfig) -> Self {
113        let rng = ChaCha8Rng::seed_from_u64(config.seed);
114        let cluster_manager = ClusterManager::new(config.patterns.clustering.clone());
115        let entity_targeting =
116            EntityTargetingManager::new(config.patterns.entity_targeting.clone());
117
118        Self {
119            config,
120            rng,
121            type_selector: AnomalyTypeSelector::new(),
122            strategies: StrategyCollection::default(),
123            cluster_manager,
124            entity_targeting,
125            document_anomaly_counts: HashMap::new(),
126            labels: Vec::new(),
127            stats: InjectorStats::default(),
128        }
129    }
130
131    /// Processes a batch of journal entries, potentially injecting anomalies.
132    pub fn process_entries(&mut self, entries: &mut [JournalEntry]) -> InjectionBatchResult {
133        let mut modified_documents = Vec::new();
134        let mut duplicates = Vec::new();
135
136        for entry in entries.iter_mut() {
137            self.stats.total_processed += 1;
138
139            // Check if we should process this entry
140            if !self.should_process(entry) {
141                continue;
142            }
143
144            // Determine if we inject an anomaly
145            if should_inject_anomaly(
146                self.config.rates.total_rate,
147                entry.posting_date(),
148                &self.config.patterns.temporal_pattern,
149                &mut self.rng,
150            ) {
151                // Select anomaly category based on rates
152                let anomaly_type = self.select_anomaly_category();
153
154                // Apply the anomaly
155                if let Some(label) = self.inject_anomaly(entry, anomaly_type) {
156                    modified_documents.push(entry.document_number().clone());
157                    self.labels.push(label);
158                    self.stats.total_injected += 1;
159                }
160
161                // Check for duplicate injection
162                if self.config.allow_duplicates
163                    && matches!(
164                        self.labels.last().map(|l| &l.anomaly_type),
165                        Some(AnomalyType::Error(ErrorType::DuplicateEntry))
166                            | Some(AnomalyType::Fraud(FraudType::DuplicatePayment))
167                    )
168                {
169                    let dup_strategy = DuplicationStrategy::default();
170                    let duplicate = dup_strategy.duplicate(entry, &mut self.rng);
171                    duplicates.push(duplicate);
172                }
173            }
174        }
175
176        // Count duplicates
177        let duplicates_created = duplicates.len();
178
179        // Build summary
180        let summary = AnomalySummary::from_anomalies(&self.labels);
181
182        InjectionBatchResult {
183            entries_processed: self.stats.total_processed,
184            anomalies_injected: self.stats.total_injected,
185            duplicates_created,
186            labels: self.labels.clone(),
187            summary,
188            modified_documents,
189        }
190    }
191
192    /// Checks if an entry should be processed.
193    fn should_process(&mut self, entry: &JournalEntry) -> bool {
194        // Check company filter
195        if !self.config.target_companies.is_empty()
196            && !self
197                .config
198                .target_companies
199                .iter()
200                .any(|c| c == entry.company_code())
201        {
202            self.stats.skipped_company += 1;
203            return false;
204        }
205
206        // Check date range
207        if let Some((start, end)) = self.config.date_range {
208            if entry.posting_date() < start || entry.posting_date() > end {
209                self.stats.skipped_date += 1;
210                return false;
211            }
212        }
213
214        // Check max anomalies per document
215        let current_count = self
216            .document_anomaly_counts
217            .get(&entry.document_number())
218            .copied()
219            .unwrap_or(0);
220        if current_count >= self.config.max_anomalies_per_document {
221            self.stats.skipped_max_per_doc += 1;
222            return false;
223        }
224
225        true
226    }
227
228    /// Selects an anomaly category based on configured rates.
229    fn select_anomaly_category(&mut self) -> AnomalyType {
230        let r = self.rng.gen::<f64>();
231        let rates = &self.config.rates;
232
233        let mut cumulative = 0.0;
234
235        cumulative += rates.fraud_rate;
236        if r < cumulative {
237            return self.type_selector.select_fraud(&mut self.rng);
238        }
239
240        cumulative += rates.error_rate;
241        if r < cumulative {
242            return self.type_selector.select_error(&mut self.rng);
243        }
244
245        cumulative += rates.process_issue_rate;
246        if r < cumulative {
247            return self.type_selector.select_process_issue(&mut self.rng);
248        }
249
250        cumulative += rates.statistical_rate;
251        if r < cumulative {
252            return self.type_selector.select_statistical(&mut self.rng);
253        }
254
255        self.type_selector.select_relational(&mut self.rng)
256    }
257
258    /// Injects an anomaly into an entry.
259    fn inject_anomaly(
260        &mut self,
261        entry: &mut JournalEntry,
262        anomaly_type: AnomalyType,
263    ) -> Option<LabeledAnomaly> {
264        // Check if strategy can be applied
265        if !self.strategies.can_apply(entry, &anomaly_type) {
266            return None;
267        }
268
269        // Apply the strategy
270        let result = self
271            .strategies
272            .apply_strategy(entry, &anomaly_type, &mut self.rng);
273
274        if !result.success {
275            return None;
276        }
277
278        // Update document anomaly count
279        *self
280            .document_anomaly_counts
281            .entry(entry.document_number().clone())
282            .or_insert(0) += 1;
283
284        // Update statistics
285        let category = anomaly_type.category().to_string();
286        let type_name = anomaly_type.type_name();
287
288        *self.stats.by_category.entry(category).or_insert(0) += 1;
289        *self.stats.by_type.entry(type_name.clone()).or_insert(0) += 1;
290        *self
291            .stats
292            .by_company
293            .entry(entry.company_code().to_string())
294            .or_insert(0) += 1;
295
296        // Generate label
297        if self.config.generate_labels {
298            let anomaly_id = format!("ANO{:08}", self.labels.len() + 1);
299
300            let mut label = LabeledAnomaly::new(
301                anomaly_id,
302                anomaly_type.clone(),
303                entry.document_number().clone(),
304                "JE".to_string(),
305                entry.company_code().to_string(),
306                entry.posting_date(),
307            )
308            .with_description(&result.description)
309            .with_injection_strategy(&type_name);
310
311            // Add monetary impact
312            if let Some(impact) = result.monetary_impact {
313                label = label.with_monetary_impact(impact);
314            }
315
316            // Add related entities
317            for entity in &result.related_entities {
318                label = label.with_related_entity(entity);
319            }
320
321            // Add metadata
322            for (key, value) in &result.metadata {
323                label = label.with_metadata(key, value);
324            }
325
326            // Assign cluster
327            if let Some(cluster_id) =
328                self.cluster_manager
329                    .assign_cluster(entry.posting_date(), &type_name, &mut self.rng)
330            {
331                label = label.with_cluster(&cluster_id);
332            }
333
334            return Some(label);
335        }
336
337        None
338    }
339
340    /// Injects a specific anomaly type into an entry.
341    pub fn inject_specific(
342        &mut self,
343        entry: &mut JournalEntry,
344        anomaly_type: AnomalyType,
345    ) -> Option<LabeledAnomaly> {
346        self.inject_anomaly(entry, anomaly_type)
347    }
348
349    /// Creates a self-approval anomaly.
350    pub fn create_self_approval(
351        &mut self,
352        entry: &mut JournalEntry,
353        user_id: &str,
354    ) -> Option<LabeledAnomaly> {
355        let anomaly_type = AnomalyType::Fraud(FraudType::SelfApproval);
356
357        let label = LabeledAnomaly::new(
358            format!("ANO{:08}", self.labels.len() + 1),
359            anomaly_type,
360            entry.document_number().clone(),
361            "JE".to_string(),
362            entry.company_code().to_string(),
363            entry.posting_date(),
364        )
365        .with_description(&format!("User {} approved their own transaction", user_id))
366        .with_related_entity(user_id)
367        .with_injection_strategy("ManualSelfApproval");
368
369        // Set approver = requester
370        entry.header.created_by = user_id.to_string();
371
372        self.labels.push(label.clone());
373        Some(label)
374    }
375
376    /// Creates a segregation of duties violation.
377    pub fn create_sod_violation(
378        &mut self,
379        entry: &mut JournalEntry,
380        user_id: &str,
381        conflicting_duties: (&str, &str),
382    ) -> Option<LabeledAnomaly> {
383        let anomaly_type = AnomalyType::Fraud(FraudType::SegregationOfDutiesViolation);
384
385        let label = LabeledAnomaly::new(
386            format!("ANO{:08}", self.labels.len() + 1),
387            anomaly_type,
388            entry.document_number().clone(),
389            "JE".to_string(),
390            entry.company_code().to_string(),
391            entry.posting_date(),
392        )
393        .with_description(&format!(
394            "User {} performed conflicting duties: {} and {}",
395            user_id, conflicting_duties.0, conflicting_duties.1
396        ))
397        .with_related_entity(user_id)
398        .with_metadata("duty1", conflicting_duties.0)
399        .with_metadata("duty2", conflicting_duties.1)
400        .with_injection_strategy("ManualSoDViolation");
401
402        self.labels.push(label.clone());
403        Some(label)
404    }
405
406    /// Creates an intercompany mismatch anomaly.
407    pub fn create_ic_mismatch(
408        &mut self,
409        entry: &mut JournalEntry,
410        matching_company: &str,
411        expected_amount: Decimal,
412        actual_amount: Decimal,
413    ) -> Option<LabeledAnomaly> {
414        let anomaly_type = AnomalyType::Relational(RelationalAnomalyType::UnmatchedIntercompany);
415
416        let label = LabeledAnomaly::new(
417            format!("ANO{:08}", self.labels.len() + 1),
418            anomaly_type,
419            entry.document_number().clone(),
420            "JE".to_string(),
421            entry.company_code().to_string(),
422            entry.posting_date(),
423        )
424        .with_description(&format!(
425            "Intercompany mismatch with {}: expected {} but got {}",
426            matching_company, expected_amount, actual_amount
427        ))
428        .with_related_entity(matching_company)
429        .with_monetary_impact(actual_amount - expected_amount)
430        .with_metadata("expected_amount", &expected_amount.to_string())
431        .with_metadata("actual_amount", &actual_amount.to_string())
432        .with_injection_strategy("ManualICMismatch");
433
434        self.labels.push(label.clone());
435        Some(label)
436    }
437
438    /// Returns all generated labels.
439    pub fn get_labels(&self) -> &[LabeledAnomaly] {
440        &self.labels
441    }
442
443    /// Returns the anomaly summary.
444    pub fn get_summary(&self) -> AnomalySummary {
445        AnomalySummary::from_anomalies(&self.labels)
446    }
447
448    /// Returns injection statistics.
449    pub fn get_stats(&self) -> &InjectorStats {
450        &self.stats
451    }
452
453    /// Clears all labels and resets statistics.
454    pub fn reset(&mut self) {
455        self.labels.clear();
456        self.document_anomaly_counts.clear();
457        self.stats = InjectorStats::default();
458        self.cluster_manager = ClusterManager::new(self.config.patterns.clustering.clone());
459    }
460
461    /// Returns the number of clusters created.
462    pub fn cluster_count(&self) -> usize {
463        self.cluster_manager.cluster_count()
464    }
465}
466
467/// Builder for AnomalyInjectorConfig.
468pub struct AnomalyInjectorConfigBuilder {
469    config: AnomalyInjectorConfig,
470}
471
472impl AnomalyInjectorConfigBuilder {
473    /// Creates a new builder with default configuration.
474    pub fn new() -> Self {
475        Self {
476            config: AnomalyInjectorConfig::default(),
477        }
478    }
479
480    /// Sets the total anomaly rate.
481    pub fn with_total_rate(mut self, rate: f64) -> Self {
482        self.config.rates.total_rate = rate;
483        self
484    }
485
486    /// Sets the fraud rate (proportion of anomalies).
487    pub fn with_fraud_rate(mut self, rate: f64) -> Self {
488        self.config.rates.fraud_rate = rate;
489        self
490    }
491
492    /// Sets the error rate (proportion of anomalies).
493    pub fn with_error_rate(mut self, rate: f64) -> Self {
494        self.config.rates.error_rate = rate;
495        self
496    }
497
498    /// Sets the random seed.
499    pub fn with_seed(mut self, seed: u64) -> Self {
500        self.config.seed = seed;
501        self
502    }
503
504    /// Sets the temporal pattern.
505    pub fn with_temporal_pattern(mut self, pattern: TemporalPattern) -> Self {
506        self.config.patterns.temporal_pattern = pattern;
507        self
508    }
509
510    /// Enables or disables label generation.
511    pub fn with_labels(mut self, generate: bool) -> Self {
512        self.config.generate_labels = generate;
513        self
514    }
515
516    /// Sets target companies.
517    pub fn with_target_companies(mut self, companies: Vec<String>) -> Self {
518        self.config.target_companies = companies;
519        self
520    }
521
522    /// Sets the date range.
523    pub fn with_date_range(mut self, start: NaiveDate, end: NaiveDate) -> Self {
524        self.config.date_range = Some((start, end));
525        self
526    }
527
528    /// Builds the configuration.
529    pub fn build(self) -> AnomalyInjectorConfig {
530        self.config
531    }
532}
533
534impl Default for AnomalyInjectorConfigBuilder {
535    fn default() -> Self {
536        Self::new()
537    }
538}
539
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::NaiveDate;
    use datasynth_core::models::{JournalEntryLine, StatisticalAnomalyType};
    use rust_decimal_macros::dec;

    /// Builds a two-line entry (debit 1000 on account 5000, credit 1000 on
    /// account 1000) for company `1000`, posted on 2024-06-15.
    fn create_test_entry(doc_num: &str) -> JournalEntry {
        let posting_date = NaiveDate::from_ymd_opt(2024, 6, 15).unwrap();
        let mut entry = JournalEntry::new_simple(
            doc_num.to_string(),
            "1000".to_string(),
            posting_date,
            "Test Entry".to_string(),
        );

        let debit_line = JournalEntryLine {
            line_number: 1,
            gl_account: "5000".to_string(),
            debit_amount: dec!(1000),
            ..Default::default()
        };
        entry.add_line(debit_line);

        let credit_line = JournalEntryLine {
            line_number: 2,
            gl_account: "1000".to_string(),
            credit_amount: dec!(1000),
            ..Default::default()
        };
        entry.add_line(credit_line);

        entry
    }

    #[test]
    fn test_anomaly_injector_basic() {
        // A high rate so that a batch of 100 entries reliably yields anomalies.
        let config = AnomalyInjectorConfigBuilder::new()
            .with_total_rate(0.5)
            .with_seed(42)
            .build();
        let mut injector = AnomalyInjector::new(config);

        let mut entries: Vec<JournalEntry> = Vec::with_capacity(100);
        for i in 0..100 {
            entries.push(create_test_entry(&format!("JE{:04}", i)));
        }

        let result = injector.process_entries(&mut entries);

        assert!(result.anomalies_injected > 0);
        assert!(!result.labels.is_empty());
        // One label per injected anomaly.
        assert_eq!(result.labels.len(), result.anomalies_injected);
    }

    #[test]
    fn test_specific_injection() {
        let mut injector = AnomalyInjector::new(AnomalyInjectorConfig::default());
        let mut entry = create_test_entry("JE001");

        let injected = injector.inject_specific(
            &mut entry,
            AnomalyType::Statistical(StatisticalAnomalyType::UnusuallyHighAmount),
        );

        assert!(injected.is_some());
        let label = injected.unwrap();
        // The label must point at the entry it was injected into.
        assert!(!label.document_id.is_empty());
        assert_eq!(label.document_id, entry.document_number());
    }

    #[test]
    fn test_self_approval_injection() {
        let mut injector = AnomalyInjector::new(AnomalyInjectorConfig::default());
        let mut entry = create_test_entry("JE001");

        let created = injector.create_self_approval(&mut entry, "USER001");

        assert!(created.is_some());
        let label = created.unwrap();
        assert!(matches!(
            label.anomaly_type,
            AnomalyType::Fraud(FraudType::SelfApproval)
        ));
        assert!(label.related_entities.contains(&"USER001".to_string()));
    }

    #[test]
    fn test_company_filtering() {
        // Rate 1.0 would inject into every entry, but only company 2000 is targeted.
        let config = AnomalyInjectorConfigBuilder::new()
            .with_total_rate(1.0)
            .with_target_companies(vec!["2000".to_string()])
            .build();
        let mut injector = AnomalyInjector::new(config);

        // Both entries belong to company 1000.
        let mut entries: Vec<JournalEntry> = ["JE001", "JE002"]
            .iter()
            .copied()
            .map(create_test_entry)
            .collect();

        let result = injector.process_entries(&mut entries);

        // Nothing is injected because no entry belongs to company 2000.
        assert_eq!(result.anomalies_injected, 0);
    }
}