Skip to main content

datasynth_generators/anomaly/
scheme_advancer.rs

1//! Scheme advancer for managing multiple fraud schemes.
2//!
3//! The SchemeAdvancer coordinates the lifecycle of multiple fraud schemes,
4//! handling scheme creation, advancement, and completion.
5
6use chrono::NaiveDate;
7use rand::Rng;
8use rand::SeedableRng;
9use rand_chacha::ChaCha8Rng;
10use rust_decimal::Decimal;
11use serde::{Deserialize, Serialize};
12use uuid::Uuid;
13
14use datasynth_core::models::{SchemeDetectionStatus, SchemeType};
15
16use super::schemes::{
17    FraudScheme, GradualEmbezzlementScheme, RevenueManipulationScheme, SchemeAction, SchemeContext,
18    SchemeStatus, VendorKickbackScheme,
19};
20
21/// Configuration for scheme generation.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct SchemeAdvancerConfig {
24    /// Probability of starting an embezzlement scheme per period.
25    pub embezzlement_probability: f64,
26    /// Probability of starting a revenue manipulation scheme per period.
27    pub revenue_manipulation_probability: f64,
28    /// Probability of starting a kickback scheme per period.
29    pub kickback_probability: f64,
30    /// Maximum number of concurrent schemes.
31    pub max_concurrent_schemes: usize,
32    /// Whether to allow the same perpetrator in multiple schemes.
33    pub allow_repeat_perpetrators: bool,
34    /// Random seed for reproducibility.
35    pub seed: u64,
36}
37
38impl Default for SchemeAdvancerConfig {
39    fn default() -> Self {
40        Self {
41            embezzlement_probability: 0.02,
42            revenue_manipulation_probability: 0.01,
43            kickback_probability: 0.01,
44            max_concurrent_schemes: 5,
45            allow_repeat_perpetrators: false,
46            seed: 42,
47        }
48    }
49}
50
51/// Summary of a completed scheme.
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct CompletedScheme {
54    /// Scheme ID.
55    pub scheme_id: Uuid,
56    /// Scheme type.
57    pub scheme_type: SchemeType,
58    /// Perpetrator ID.
59    pub perpetrator_id: String,
60    /// Start date.
61    pub start_date: Option<NaiveDate>,
62    /// End date.
63    pub end_date: NaiveDate,
64    /// Final status.
65    pub final_status: SchemeStatus,
66    /// Detection status.
67    pub detection_status: SchemeDetectionStatus,
68    /// Total financial impact.
69    pub total_impact: Decimal,
70    /// Number of stages completed.
71    pub stages_completed: u32,
72    /// Total transactions.
73    pub transaction_count: usize,
74}
75
76/// Label for an anomaly that's part of a multi-stage scheme.
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct MultiStageAnomalyLabel {
79    /// Anomaly ID.
80    pub anomaly_id: String,
81    /// Scheme ID.
82    pub scheme_id: Uuid,
83    /// Scheme type.
84    pub scheme_type: SchemeType,
85    /// Stage number within scheme.
86    pub stage_number: u32,
87    /// Stage name.
88    pub stage_name: String,
89    /// Total stages in scheme.
90    pub total_stages: u32,
91    /// Perpetrator ID.
92    pub perpetrator_id: String,
93    /// Whether scheme was ultimately detected.
94    pub scheme_detected: bool,
95}
96
97/// Manages the lifecycle of multiple fraud schemes.
98pub struct SchemeAdvancer {
99    config: SchemeAdvancerConfig,
100    rng: ChaCha8Rng,
101    /// Active schemes.
102    active_schemes: Vec<Box<dyn FraudScheme>>,
103    /// Completed schemes.
104    completed_schemes: Vec<CompletedScheme>,
105    /// Users who are currently perpetrators.
106    active_perpetrators: Vec<String>,
107    /// Vendors involved in active schemes.
108    active_vendors: Vec<String>,
109    /// Multi-stage labels generated.
110    labels: Vec<MultiStageAnomalyLabel>,
111}
112
113impl SchemeAdvancer {
114    /// Creates a new scheme advancer.
115    pub fn new(config: SchemeAdvancerConfig) -> Self {
116        let rng = ChaCha8Rng::seed_from_u64(config.seed);
117        Self {
118            config,
119            rng,
120            active_schemes: Vec::new(),
121            completed_schemes: Vec::new(),
122            active_perpetrators: Vec::new(),
123            active_vendors: Vec::new(),
124            labels: Vec::new(),
125        }
126    }
127
128    /// Potentially starts a new scheme based on probabilities.
129    pub fn maybe_start_scheme(&mut self, context: &SchemeContext) -> Option<Uuid> {
130        // Check if we can add more schemes
131        if self.active_schemes.len() >= self.config.max_concurrent_schemes {
132            return None;
133        }
134
135        // Check available perpetrators
136        let available_users: Vec<_> = if self.config.allow_repeat_perpetrators {
137            context.available_users.clone()
138        } else {
139            context
140                .available_users
141                .iter()
142                .filter(|u| !self.active_perpetrators.contains(u))
143                .cloned()
144                .collect()
145        };
146
147        if available_users.is_empty() {
148            return None;
149        }
150
151        // Determine which scheme type to start (if any)
152        let r = self.rng.gen::<f64>();
153        let total_prob = self.config.embezzlement_probability
154            + self.config.revenue_manipulation_probability
155            + self.config.kickback_probability;
156
157        if r > total_prob {
158            return None;
159        }
160
161        let normalized_r = r / total_prob;
162        let embezzlement_threshold = self.config.embezzlement_probability / total_prob;
163        let revenue_threshold =
164            embezzlement_threshold + self.config.revenue_manipulation_probability / total_prob;
165
166        let user_idx = self.rng.gen_range(0..available_users.len());
167        let perpetrator = available_users[user_idx].clone();
168
169        let scheme: Box<dyn FraudScheme> = if normalized_r < embezzlement_threshold {
170            // Start embezzlement scheme
171            let scheme = GradualEmbezzlementScheme::new(&perpetrator)
172                .with_accounts(context.available_accounts.clone());
173            Box::new(scheme)
174        } else if normalized_r < revenue_threshold {
175            // Start revenue manipulation scheme
176            let scheme = RevenueManipulationScheme::new(&perpetrator);
177            Box::new(scheme)
178        } else {
179            // Start kickback scheme - need a vendor
180            if context.available_counterparties.is_empty() {
181                return None;
182            }
183
184            let available_vendors: Vec<_> = context
185                .available_counterparties
186                .iter()
187                .filter(|v| !self.active_vendors.contains(v))
188                .cloned()
189                .collect();
190
191            if available_vendors.is_empty() {
192                return None;
193            }
194
195            let vendor_idx = self.rng.gen_range(0..available_vendors.len());
196            let vendor = available_vendors[vendor_idx].clone();
197
198            let inflation = 0.10 + self.rng.gen::<f64>() * 0.15; // 10-25%
199            let scheme =
200                VendorKickbackScheme::new(&perpetrator, &vendor).with_inflation_percent(inflation);
201
202            self.active_vendors.push(vendor);
203            Box::new(scheme)
204        };
205
206        let scheme_id = scheme.scheme_id();
207        self.active_perpetrators.push(perpetrator);
208        self.active_schemes.push(scheme);
209
210        Some(scheme_id)
211    }
212
213    /// Advances all active schemes and returns actions to execute.
214    pub fn advance_all(&mut self, context: &SchemeContext) -> Vec<SchemeAction> {
215        let mut all_actions = Vec::new();
216        let mut schemes_to_complete = Vec::new();
217
218        for (idx, scheme) in self.active_schemes.iter_mut().enumerate() {
219            // Create a local RNG for each scheme to ensure determinism
220            let mut scheme_rng = ChaCha8Rng::seed_from_u64(
221                self.config
222                    .seed
223                    .wrapping_add(scheme.scheme_id().as_u128() as u64),
224            );
225
226            let actions = scheme.advance(context, &mut scheme_rng);
227            all_actions.extend(actions);
228
229            // Check if scheme is done
230            if matches!(
231                scheme.status(),
232                SchemeStatus::Completed | SchemeStatus::Terminated | SchemeStatus::Detected
233            ) {
234                schemes_to_complete.push(idx);
235            }
236        }
237
238        // Complete finished schemes (iterate in reverse to maintain indices)
239        for idx in schemes_to_complete.into_iter().rev() {
240            let scheme = self.active_schemes.remove(idx);
241            let completed = CompletedScheme {
242                scheme_id: scheme.scheme_id(),
243                scheme_type: scheme.scheme_type(),
244                perpetrator_id: scheme.perpetrator_id().to_string(),
245                start_date: scheme.start_date(),
246                end_date: context.current_date,
247                final_status: scheme.status(),
248                detection_status: scheme.detection_status(),
249                total_impact: scheme.total_impact(),
250                stages_completed: scheme.current_stage_number(),
251                transaction_count: scheme.transaction_refs().len(),
252            };
253
254            // Remove perpetrator from active list
255            self.active_perpetrators
256                .retain(|p| p != scheme.perpetrator_id());
257
258            self.completed_schemes.push(completed);
259        }
260
261        all_actions
262    }
263
264    /// Records a label for a scheme action.
265    pub fn record_label(&mut self, anomaly_id: impl Into<String>, action: &SchemeAction) {
266        if let Some(scheme) = self
267            .active_schemes
268            .iter()
269            .find(|s| s.scheme_id() == action.scheme_id)
270        {
271            let label = MultiStageAnomalyLabel {
272                anomaly_id: anomaly_id.into(),
273                scheme_id: scheme.scheme_id(),
274                scheme_type: scheme.scheme_type(),
275                stage_number: action.stage,
276                stage_name: scheme.current_stage().name.clone(),
277                total_stages: scheme.stages().len() as u32,
278                perpetrator_id: scheme.perpetrator_id().to_string(),
279                scheme_detected: scheme.detection_status() != SchemeDetectionStatus::Undetected,
280            };
281            self.labels.push(label);
282        }
283    }
284
285    /// Returns all generated labels.
286    pub fn get_labels(&self) -> &[MultiStageAnomalyLabel] {
287        &self.labels
288    }
289
290    /// Returns completed schemes.
291    pub fn get_completed_schemes(&self) -> &[CompletedScheme] {
292        &self.completed_schemes
293    }
294
295    /// Returns the number of active schemes.
296    pub fn active_scheme_count(&self) -> usize {
297        self.active_schemes.len()
298    }
299
300    /// Returns the number of completed schemes.
301    pub fn completed_scheme_count(&self) -> usize {
302        self.completed_schemes.len()
303    }
304
305    /// Returns active schemes summary.
306    pub fn active_schemes_summary(&self) -> Vec<(Uuid, SchemeType, SchemeStatus)> {
307        self.active_schemes
308            .iter()
309            .map(|s| (s.scheme_id(), s.scheme_type(), s.status()))
310            .collect()
311    }
312
313    /// Gets a specific scheme by ID.
314    pub fn get_scheme(&self, scheme_id: Uuid) -> Option<&dyn FraudScheme> {
315        self.active_schemes
316            .iter()
317            .find(|s| s.scheme_id() == scheme_id)
318            .map(|s| s.as_ref())
319    }
320
321    /// Resets the advancer state.
322    pub fn reset(&mut self) {
323        self.active_schemes.clear();
324        self.completed_schemes.clear();
325        self.active_perpetrators.clear();
326        self.active_vendors.clear();
327        self.labels.clear();
328        self.rng = ChaCha8Rng::seed_from_u64(self.config.seed);
329    }
330
331    /// Returns statistics about schemes.
332    pub fn get_statistics(&self) -> SchemeStatistics {
333        let total_impact: Decimal = self
334            .completed_schemes
335            .iter()
336            .map(|s| s.total_impact)
337            .sum::<Decimal>()
338            + self
339                .active_schemes
340                .iter()
341                .map(|s| s.total_impact())
342                .sum::<Decimal>();
343
344        let detected_count = self
345            .completed_schemes
346            .iter()
347            .filter(|s| s.detection_status != SchemeDetectionStatus::Undetected)
348            .count();
349
350        let by_type = |t: SchemeType| {
351            self.completed_schemes
352                .iter()
353                .filter(|s| s.scheme_type == t)
354                .count()
355                + self
356                    .active_schemes
357                    .iter()
358                    .filter(|s| s.scheme_type() == t)
359                    .count()
360        };
361
362        SchemeStatistics {
363            total_schemes: self.active_schemes.len() + self.completed_schemes.len(),
364            active_schemes: self.active_schemes.len(),
365            completed_schemes: self.completed_schemes.len(),
366            detected_schemes: detected_count,
367            total_impact,
368            embezzlement_count: by_type(SchemeType::GradualEmbezzlement),
369            revenue_manipulation_count: by_type(SchemeType::RevenueManipulation),
370            kickback_count: by_type(SchemeType::VendorKickback),
371        }
372    }
373}
374
375/// Statistics about fraud schemes.
376#[derive(Debug, Clone, Serialize, Deserialize)]
377pub struct SchemeStatistics {
378    /// Total number of schemes (active + completed).
379    pub total_schemes: usize,
380    /// Number of active schemes.
381    pub active_schemes: usize,
382    /// Number of completed schemes.
383    pub completed_schemes: usize,
384    /// Number of detected schemes.
385    pub detected_schemes: usize,
386    /// Total financial impact.
387    pub total_impact: Decimal,
388    /// Number of embezzlement schemes.
389    pub embezzlement_count: usize,
390    /// Number of revenue manipulation schemes.
391    pub revenue_manipulation_count: usize,
392    /// Number of kickback schemes.
393    pub kickback_count: usize,
394}
395
#[cfg(test)]
// Unwrapping is fine in tests: a failed unwrap is a failed test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// First simulated day used by every test.
    fn first_day() -> NaiveDate {
        NaiveDate::from_ymd_opt(2024, 1, 15).unwrap()
    }

    /// Context on the first day with the given users and a single account.
    fn context_with_users(users: &[&str]) -> SchemeContext {
        SchemeContext::new(first_day(), "1000")
            .with_users(users.iter().map(|u| (*u).to_string()).collect::<Vec<String>>())
            .with_accounts(vec!["5000".to_string()])
    }

    /// Default configuration, except that an embezzlement scheme always starts.
    fn always_embezzle() -> SchemeAdvancerConfig {
        SchemeAdvancerConfig {
            embezzlement_probability: 1.0,
            ..Default::default()
        }
    }

    #[test]
    fn test_scheme_advancer_creation() {
        let advancer = SchemeAdvancer::new(SchemeAdvancerConfig::default());

        assert_eq!(advancer.active_scheme_count(), 0);
        assert_eq!(advancer.completed_scheme_count(), 0);
    }

    #[test]
    fn test_scheme_advancer_start_scheme() {
        let mut advancer = SchemeAdvancer::new(always_embezzle());
        let context = context_with_users(&["USER001", "USER002"]);

        let started = advancer.maybe_start_scheme(&context);

        assert!(started.is_some());
        assert_eq!(advancer.active_scheme_count(), 1);
    }

    #[test]
    fn test_scheme_advancer_max_concurrent() {
        let mut advancer = SchemeAdvancer::new(SchemeAdvancerConfig {
            max_concurrent_schemes: 2,
            ..always_embezzle()
        });
        let context = context_with_users(&["USER001", "USER002", "USER003"]);

        // Fill the advancer up to its limit, then try once more.
        advancer.maybe_start_scheme(&context);
        advancer.maybe_start_scheme(&context);
        let third = advancer.maybe_start_scheme(&context);

        assert_eq!(advancer.active_scheme_count(), 2);
        assert!(third.is_none()); // The limit blocks the third scheme.
    }

    #[test]
    fn test_scheme_advancer_advance_all() {
        let mut advancer = SchemeAdvancer::new(always_embezzle());
        let context = context_with_users(&["USER001"]);

        advancer.maybe_start_scheme(&context);

        // Step the scheme once per day for thirty consecutive days.
        for date in (0..30).map(|offset| first_day() + chrono::Duration::days(offset)) {
            let mut ctx = context.clone();
            ctx.current_date = date;

            let _actions = advancer.advance_all(&ctx);
        }

        assert_eq!(advancer.active_scheme_count(), 1);
    }

    #[test]
    fn test_scheme_advancer_statistics() {
        let mut advancer = SchemeAdvancer::new(always_embezzle());
        let context = context_with_users(&["USER001"]);

        advancer.maybe_start_scheme(&context);

        let stats = advancer.get_statistics();
        assert_eq!(stats.total_schemes, 1);
        assert_eq!(stats.active_schemes, 1);
        assert_eq!(stats.embezzlement_count, 1);
    }
}