// File: datasynth_generators/anomaly/scheme_advancer.rs

//! Scheme advancer for managing multiple fraud schemes.
//!
//! The `SchemeAdvancer` coordinates the lifecycle of multiple fraud schemes,
//! handling scheme creation, advancement, and completion.

use chrono::{Datelike, NaiveDate};
use rand::Rng;
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use datasynth_core::models::{SchemeDetectionStatus, SchemeType};

use super::schemes::{
    FraudScheme, GradualEmbezzlementScheme, RevenueManipulationScheme, SchemeAction, SchemeContext,
    SchemeStatus, VendorKickbackScheme,
};

21/// Configuration for scheme generation.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct SchemeAdvancerConfig {
24    /// Probability of starting an embezzlement scheme per period.
25    pub embezzlement_probability: f64,
26    /// Probability of starting a revenue manipulation scheme per period.
27    pub revenue_manipulation_probability: f64,
28    /// Probability of starting a kickback scheme per period.
29    pub kickback_probability: f64,
30    /// Maximum number of concurrent schemes.
31    pub max_concurrent_schemes: usize,
32    /// Whether to allow the same perpetrator in multiple schemes.
33    pub allow_repeat_perpetrators: bool,
34    /// Random seed for reproducibility.
35    pub seed: u64,
36}
37
38impl Default for SchemeAdvancerConfig {
39    fn default() -> Self {
40        Self {
41            embezzlement_probability: 0.02,
42            revenue_manipulation_probability: 0.01,
43            kickback_probability: 0.01,
44            max_concurrent_schemes: 5,
45            allow_repeat_perpetrators: false,
46            seed: 42,
47        }
48    }
49}
50
51/// Summary of a completed scheme.
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct CompletedScheme {
54    /// Scheme ID.
55    pub scheme_id: Uuid,
56    /// Scheme type.
57    pub scheme_type: SchemeType,
58    /// Perpetrator ID.
59    pub perpetrator_id: String,
60    /// Start date.
61    pub start_date: Option<NaiveDate>,
62    /// End date.
63    pub end_date: NaiveDate,
64    /// Final status.
65    pub final_status: SchemeStatus,
66    /// Detection status.
67    pub detection_status: SchemeDetectionStatus,
68    /// Total financial impact.
69    pub total_impact: Decimal,
70    /// Number of stages completed.
71    pub stages_completed: u32,
72    /// Total transactions.
73    pub transaction_count: usize,
74}
75
76/// Label for an anomaly that's part of a multi-stage scheme.
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct MultiStageAnomalyLabel {
79    /// Anomaly ID.
80    pub anomaly_id: String,
81    /// Scheme ID.
82    pub scheme_id: Uuid,
83    /// Scheme type.
84    pub scheme_type: SchemeType,
85    /// Stage number within scheme.
86    pub stage_number: u32,
87    /// Stage name.
88    pub stage_name: String,
89    /// Total stages in scheme.
90    pub total_stages: u32,
91    /// Perpetrator ID.
92    pub perpetrator_id: String,
93    /// Whether scheme was ultimately detected.
94    pub scheme_detected: bool,
95}
96
97/// Manages the lifecycle of multiple fraud schemes.
98pub struct SchemeAdvancer {
99    config: SchemeAdvancerConfig,
100    rng: ChaCha8Rng,
101    /// Active schemes.
102    active_schemes: Vec<Box<dyn FraudScheme>>,
103    /// Completed schemes.
104    completed_schemes: Vec<CompletedScheme>,
105    /// Users who are currently perpetrators.
106    active_perpetrators: Vec<String>,
107    /// Vendors involved in active schemes.
108    active_vendors: Vec<String>,
109    /// Multi-stage labels generated.
110    labels: Vec<MultiStageAnomalyLabel>,
111}
112
113impl SchemeAdvancer {
114    /// Creates a new scheme advancer.
115    pub fn new(config: SchemeAdvancerConfig) -> Self {
116        let rng = ChaCha8Rng::seed_from_u64(config.seed);
117        Self {
118            config,
119            rng,
120            active_schemes: Vec::new(),
121            completed_schemes: Vec::new(),
122            active_perpetrators: Vec::new(),
123            active_vendors: Vec::new(),
124            labels: Vec::new(),
125        }
126    }
127
128    /// Potentially starts a new scheme based on probabilities.
129    pub fn maybe_start_scheme(&mut self, context: &SchemeContext) -> Option<Uuid> {
130        // Check if we can add more schemes
131        if self.active_schemes.len() >= self.config.max_concurrent_schemes {
132            return None;
133        }
134
135        // Check available perpetrators
136        let available_users: Vec<_> = if self.config.allow_repeat_perpetrators {
137            context.available_users.clone()
138        } else {
139            context
140                .available_users
141                .iter()
142                .filter(|u| !self.active_perpetrators.contains(u))
143                .cloned()
144                .collect()
145        };
146
147        if available_users.is_empty() {
148            return None;
149        }
150
151        // Determine which scheme type to start (if any)
152        let r = self.rng.gen::<f64>();
153        let total_prob = self.config.embezzlement_probability
154            + self.config.revenue_manipulation_probability
155            + self.config.kickback_probability;
156
157        if r > total_prob {
158            return None;
159        }
160
161        let normalized_r = r / total_prob;
162        let embezzlement_threshold = self.config.embezzlement_probability / total_prob;
163        let revenue_threshold =
164            embezzlement_threshold + self.config.revenue_manipulation_probability / total_prob;
165
166        let user_idx = self.rng.gen_range(0..available_users.len());
167        let perpetrator = available_users[user_idx].clone();
168
169        let scheme: Box<dyn FraudScheme> = if normalized_r < embezzlement_threshold {
170            // Start embezzlement scheme
171            let scheme = GradualEmbezzlementScheme::new(&perpetrator)
172                .with_accounts(context.available_accounts.clone());
173            Box::new(scheme)
174        } else if normalized_r < revenue_threshold {
175            // Start revenue manipulation scheme
176            let scheme = RevenueManipulationScheme::new(&perpetrator);
177            Box::new(scheme)
178        } else {
179            // Start kickback scheme - need a vendor
180            if context.available_counterparties.is_empty() {
181                return None;
182            }
183
184            let available_vendors: Vec<_> = context
185                .available_counterparties
186                .iter()
187                .filter(|v| !self.active_vendors.contains(v))
188                .cloned()
189                .collect();
190
191            if available_vendors.is_empty() {
192                return None;
193            }
194
195            let vendor_idx = self.rng.gen_range(0..available_vendors.len());
196            let vendor = available_vendors[vendor_idx].clone();
197
198            let inflation = 0.10 + self.rng.gen::<f64>() * 0.15; // 10-25%
199            let scheme =
200                VendorKickbackScheme::new(&perpetrator, &vendor).with_inflation_percent(inflation);
201
202            self.active_vendors.push(vendor);
203            Box::new(scheme)
204        };
205
206        let scheme_id = scheme.scheme_id();
207        self.active_perpetrators.push(perpetrator);
208        self.active_schemes.push(scheme);
209
210        Some(scheme_id)
211    }
212
213    /// Advances all active schemes and returns actions to execute.
214    pub fn advance_all(&mut self, context: &SchemeContext) -> Vec<SchemeAction> {
215        let mut all_actions = Vec::new();
216        let mut schemes_to_complete = Vec::new();
217
218        for (idx, scheme) in self.active_schemes.iter_mut().enumerate() {
219            // Create a local RNG for each scheme to ensure determinism
220            let mut scheme_rng = ChaCha8Rng::seed_from_u64(
221                self.config
222                    .seed
223                    .wrapping_add(scheme.scheme_id().as_u128() as u64),
224            );
225
226            let actions = scheme.advance(context, &mut scheme_rng);
227            all_actions.extend(actions);
228
229            // Check if scheme is done
230            if matches!(
231                scheme.status(),
232                SchemeStatus::Completed | SchemeStatus::Terminated | SchemeStatus::Detected
233            ) {
234                schemes_to_complete.push(idx);
235            }
236        }
237
238        // Complete finished schemes (iterate in reverse to maintain indices)
239        for idx in schemes_to_complete.into_iter().rev() {
240            let scheme = self.active_schemes.remove(idx);
241            let completed = CompletedScheme {
242                scheme_id: scheme.scheme_id(),
243                scheme_type: scheme.scheme_type(),
244                perpetrator_id: scheme.perpetrator_id().to_string(),
245                start_date: scheme.start_date(),
246                end_date: context.current_date,
247                final_status: scheme.status(),
248                detection_status: scheme.detection_status(),
249                total_impact: scheme.total_impact(),
250                stages_completed: scheme.current_stage_number(),
251                transaction_count: scheme.transaction_refs().len(),
252            };
253
254            // Remove perpetrator from active list
255            self.active_perpetrators
256                .retain(|p| p != scheme.perpetrator_id());
257
258            self.completed_schemes.push(completed);
259        }
260
261        all_actions
262    }
263
264    /// Records a label for a scheme action.
265    pub fn record_label(&mut self, anomaly_id: impl Into<String>, action: &SchemeAction) {
266        if let Some(scheme) = self
267            .active_schemes
268            .iter()
269            .find(|s| s.scheme_id() == action.scheme_id)
270        {
271            let label = MultiStageAnomalyLabel {
272                anomaly_id: anomaly_id.into(),
273                scheme_id: scheme.scheme_id(),
274                scheme_type: scheme.scheme_type(),
275                stage_number: action.stage,
276                stage_name: scheme.current_stage().name.clone(),
277                total_stages: scheme.stages().len() as u32,
278                perpetrator_id: scheme.perpetrator_id().to_string(),
279                scheme_detected: scheme.detection_status() != SchemeDetectionStatus::Undetected,
280            };
281            self.labels.push(label);
282        }
283    }
284
285    /// Returns all generated labels.
286    pub fn get_labels(&self) -> &[MultiStageAnomalyLabel] {
287        &self.labels
288    }
289
290    /// Returns completed schemes.
291    pub fn get_completed_schemes(&self) -> &[CompletedScheme] {
292        &self.completed_schemes
293    }
294
295    /// Returns the number of active schemes.
296    pub fn active_scheme_count(&self) -> usize {
297        self.active_schemes.len()
298    }
299
300    /// Returns the number of completed schemes.
301    pub fn completed_scheme_count(&self) -> usize {
302        self.completed_schemes.len()
303    }
304
305    /// Returns active schemes summary.
306    pub fn active_schemes_summary(&self) -> Vec<(Uuid, SchemeType, SchemeStatus)> {
307        self.active_schemes
308            .iter()
309            .map(|s| (s.scheme_id(), s.scheme_type(), s.status()))
310            .collect()
311    }
312
313    /// Gets a specific scheme by ID.
314    pub fn get_scheme(&self, scheme_id: Uuid) -> Option<&dyn FraudScheme> {
315        self.active_schemes
316            .iter()
317            .find(|s| s.scheme_id() == scheme_id)
318            .map(|s| s.as_ref())
319    }
320
321    /// Resets the advancer state.
322    pub fn reset(&mut self) {
323        self.active_schemes.clear();
324        self.completed_schemes.clear();
325        self.active_perpetrators.clear();
326        self.active_vendors.clear();
327        self.labels.clear();
328        self.rng = ChaCha8Rng::seed_from_u64(self.config.seed);
329    }
330
331    /// Returns statistics about schemes.
332    pub fn get_statistics(&self) -> SchemeStatistics {
333        let total_impact: Decimal = self
334            .completed_schemes
335            .iter()
336            .map(|s| s.total_impact)
337            .sum::<Decimal>()
338            + self
339                .active_schemes
340                .iter()
341                .map(|s| s.total_impact())
342                .sum::<Decimal>();
343
344        let detected_count = self
345            .completed_schemes
346            .iter()
347            .filter(|s| s.detection_status != SchemeDetectionStatus::Undetected)
348            .count();
349
350        let by_type = |t: SchemeType| {
351            self.completed_schemes
352                .iter()
353                .filter(|s| s.scheme_type == t)
354                .count()
355                + self
356                    .active_schemes
357                    .iter()
358                    .filter(|s| s.scheme_type() == t)
359                    .count()
360        };
361
362        SchemeStatistics {
363            total_schemes: self.active_schemes.len() + self.completed_schemes.len(),
364            active_schemes: self.active_schemes.len(),
365            completed_schemes: self.completed_schemes.len(),
366            detected_schemes: detected_count,
367            total_impact,
368            embezzlement_count: by_type(SchemeType::GradualEmbezzlement),
369            revenue_manipulation_count: by_type(SchemeType::RevenueManipulation),
370            kickback_count: by_type(SchemeType::VendorKickback),
371        }
372    }
373}
374
375/// Statistics about fraud schemes.
376#[derive(Debug, Clone, Serialize, Deserialize)]
377pub struct SchemeStatistics {
378    /// Total number of schemes (active + completed).
379    pub total_schemes: usize,
380    /// Number of active schemes.
381    pub active_schemes: usize,
382    /// Number of completed schemes.
383    pub completed_schemes: usize,
384    /// Number of detected schemes.
385    pub detected_schemes: usize,
386    /// Total financial impact.
387    pub total_impact: Decimal,
388    /// Number of embezzlement schemes.
389    pub embezzlement_count: usize,
390    /// Number of revenue manipulation schemes.
391    pub revenue_manipulation_count: usize,
392    /// Number of kickback schemes.
393    pub kickback_count: usize,
394}
395
#[cfg(test)]
mod tests {
    use super::*;

    /// Config that starts an embezzlement scheme on every call. The other scheme types are
    /// disabled so the chosen type never depends on the random draw.
    fn embezzlement_only() -> SchemeAdvancerConfig {
        SchemeAdvancerConfig {
            embezzlement_probability: 1.0,
            revenue_manipulation_probability: 0.0,
            kickback_probability: 0.0,
            ..Default::default()
        }
    }

    fn start_date() -> NaiveDate {
        NaiveDate::from_ymd_opt(2024, 1, 15).unwrap()
    }

    /// Context for company "1000" with the given users and a single account.
    fn context_with_users(users: &[&str]) -> SchemeContext {
        SchemeContext::new(start_date(), "1000")
            .with_users(users.iter().map(|u| u.to_string()).collect())
            .with_accounts(vec!["5000".to_string()])
    }

    #[test]
    fn test_scheme_advancer_creation() {
        let advancer = SchemeAdvancer::new(SchemeAdvancerConfig::default());
        assert_eq!(advancer.active_scheme_count(), 0);
        assert_eq!(advancer.completed_scheme_count(), 0);
    }

    #[test]
    fn test_scheme_advancer_start_scheme() {
        let mut advancer = SchemeAdvancer::new(embezzlement_only());
        let context = context_with_users(&["USER001", "USER002"]);

        let scheme_id = advancer.maybe_start_scheme(&context);
        assert!(scheme_id.is_some());
        assert_eq!(advancer.active_scheme_count(), 1);
    }

    #[test]
    fn test_scheme_advancer_max_concurrent() {
        let mut advancer = SchemeAdvancer::new(SchemeAdvancerConfig {
            max_concurrent_schemes: 2,
            ..embezzlement_only()
        });
        let context = context_with_users(&["USER001", "USER002", "USER003"]);

        // Start schemes up to max
        advancer.maybe_start_scheme(&context);
        advancer.maybe_start_scheme(&context);
        let third = advancer.maybe_start_scheme(&context);

        assert_eq!(advancer.active_scheme_count(), 2);
        assert!(third.is_none()); // Should not start third due to max
    }

    #[test]
    fn test_scheme_advancer_no_eligible_users() {
        let mut advancer = SchemeAdvancer::new(embezzlement_only());
        let context = context_with_users(&["USER001"]);

        assert!(advancer.maybe_start_scheme(&context).is_some());
        // The only user is already a perpetrator and repeats are not allowed.
        assert!(advancer.maybe_start_scheme(&context).is_none());
        assert_eq!(advancer.active_scheme_count(), 1);
    }

    #[test]
    fn test_scheme_advancer_zero_probabilities_never_start() {
        let mut advancer = SchemeAdvancer::new(SchemeAdvancerConfig {
            embezzlement_probability: 0.0,
            revenue_manipulation_probability: 0.0,
            kickback_probability: 0.0,
            ..Default::default()
        });
        let context = context_with_users(&["USER001", "USER002"]);

        for _ in 0..100 {
            assert!(advancer.maybe_start_scheme(&context).is_none());
        }
        assert_eq!(advancer.active_scheme_count(), 0);
    }

    #[test]
    fn test_scheme_advancer_advance_all() {
        let mut advancer = SchemeAdvancer::new(embezzlement_only());
        let context = context_with_users(&["USER001"]);

        advancer.maybe_start_scheme(&context);

        // Advance for several days
        for day in 0..30 {
            let mut ctx = context.clone();
            ctx.current_date = start_date() + chrono::Duration::days(day);

            let _actions = advancer.advance_all(&ctx);
        }

        // Whether the scheme is still running or has already finished (e.g. been detected)
        // depends on the scheme's own random draws; either way it must still be tracked.
        assert_eq!(
            advancer.active_scheme_count() + advancer.completed_scheme_count(),
            1
        );
    }

    #[test]
    fn test_scheme_advancer_statistics() {
        let mut advancer = SchemeAdvancer::new(embezzlement_only());
        let context = context_with_users(&["USER001"]);

        advancer.maybe_start_scheme(&context);

        let stats = advancer.get_statistics();
        assert_eq!(stats.total_schemes, 1);
        assert_eq!(stats.active_schemes, 1);
        assert_eq!(stats.embezzlement_count, 1);
    }

    #[test]
    fn test_scheme_advancer_reset() {
        let mut advancer = SchemeAdvancer::new(embezzlement_only());
        let context = context_with_users(&["USER001"]);

        advancer.maybe_start_scheme(&context);
        advancer.reset();

        assert_eq!(advancer.active_scheme_count(), 0);
        assert_eq!(advancer.completed_scheme_count(), 0);
        assert!(advancer.get_labels().is_empty());
        // The freed perpetrator can start a new scheme after the reset.
        assert!(advancer.maybe_start_scheme(&context).is_some());
    }
}