datasynth_generators/anomaly/
scheme_advancer.rs1use chrono::NaiveDate;
7use rand::Rng;
8use rand::SeedableRng;
9use rand_chacha::ChaCha8Rng;
10use rust_decimal::Decimal;
11use serde::{Deserialize, Serialize};
12use uuid::Uuid;
13
14use datasynth_core::models::{SchemeDetectionStatus, SchemeType};
15
16use super::schemes::{
17 FraudScheme, GradualEmbezzlementScheme, RevenueManipulationScheme, SchemeAction, SchemeContext,
18 SchemeStatus, VendorKickbackScheme,
19};
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct SchemeAdvancerConfig {
24 pub embezzlement_probability: f64,
26 pub revenue_manipulation_probability: f64,
28 pub kickback_probability: f64,
30 pub max_concurrent_schemes: usize,
32 pub allow_repeat_perpetrators: bool,
34 pub seed: u64,
36}
37
38impl Default for SchemeAdvancerConfig {
39 fn default() -> Self {
40 Self {
41 embezzlement_probability: 0.02,
42 revenue_manipulation_probability: 0.01,
43 kickback_probability: 0.01,
44 max_concurrent_schemes: 5,
45 allow_repeat_perpetrators: false,
46 seed: 42,
47 }
48 }
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct CompletedScheme {
54 pub scheme_id: Uuid,
56 pub scheme_type: SchemeType,
58 pub perpetrator_id: String,
60 pub start_date: Option<NaiveDate>,
62 pub end_date: NaiveDate,
64 pub final_status: SchemeStatus,
66 pub detection_status: SchemeDetectionStatus,
68 pub total_impact: Decimal,
70 pub stages_completed: u32,
72 pub transaction_count: usize,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct MultiStageAnomalyLabel {
79 pub anomaly_id: String,
81 pub scheme_id: Uuid,
83 pub scheme_type: SchemeType,
85 pub stage_number: u32,
87 pub stage_name: String,
89 pub total_stages: u32,
91 pub perpetrator_id: String,
93 pub scheme_detected: bool,
95}
96
97pub struct SchemeAdvancer {
99 config: SchemeAdvancerConfig,
100 rng: ChaCha8Rng,
101 active_schemes: Vec<Box<dyn FraudScheme>>,
103 completed_schemes: Vec<CompletedScheme>,
105 active_perpetrators: Vec<String>,
107 active_vendors: Vec<String>,
109 labels: Vec<MultiStageAnomalyLabel>,
111}
112
113impl SchemeAdvancer {
114 pub fn new(config: SchemeAdvancerConfig) -> Self {
116 let rng = ChaCha8Rng::seed_from_u64(config.seed);
117 Self {
118 config,
119 rng,
120 active_schemes: Vec::new(),
121 completed_schemes: Vec::new(),
122 active_perpetrators: Vec::new(),
123 active_vendors: Vec::new(),
124 labels: Vec::new(),
125 }
126 }
127
128 pub fn maybe_start_scheme(&mut self, context: &SchemeContext) -> Option<Uuid> {
130 if self.active_schemes.len() >= self.config.max_concurrent_schemes {
132 return None;
133 }
134
135 let available_users: Vec<_> = if self.config.allow_repeat_perpetrators {
137 context.available_users.clone()
138 } else {
139 context
140 .available_users
141 .iter()
142 .filter(|u| !self.active_perpetrators.contains(u))
143 .cloned()
144 .collect()
145 };
146
147 if available_users.is_empty() {
148 return None;
149 }
150
151 let r = self.rng.gen::<f64>();
153 let total_prob = self.config.embezzlement_probability
154 + self.config.revenue_manipulation_probability
155 + self.config.kickback_probability;
156
157 if r > total_prob {
158 return None;
159 }
160
161 let normalized_r = r / total_prob;
162 let embezzlement_threshold = self.config.embezzlement_probability / total_prob;
163 let revenue_threshold =
164 embezzlement_threshold + self.config.revenue_manipulation_probability / total_prob;
165
166 let user_idx = self.rng.gen_range(0..available_users.len());
167 let perpetrator = available_users[user_idx].clone();
168
169 let scheme: Box<dyn FraudScheme> = if normalized_r < embezzlement_threshold {
170 let scheme = GradualEmbezzlementScheme::new(&perpetrator)
172 .with_accounts(context.available_accounts.clone());
173 Box::new(scheme)
174 } else if normalized_r < revenue_threshold {
175 let scheme = RevenueManipulationScheme::new(&perpetrator);
177 Box::new(scheme)
178 } else {
179 if context.available_counterparties.is_empty() {
181 return None;
182 }
183
184 let available_vendors: Vec<_> = context
185 .available_counterparties
186 .iter()
187 .filter(|v| !self.active_vendors.contains(v))
188 .cloned()
189 .collect();
190
191 if available_vendors.is_empty() {
192 return None;
193 }
194
195 let vendor_idx = self.rng.gen_range(0..available_vendors.len());
196 let vendor = available_vendors[vendor_idx].clone();
197
198 let inflation = 0.10 + self.rng.gen::<f64>() * 0.15; let scheme =
200 VendorKickbackScheme::new(&perpetrator, &vendor).with_inflation_percent(inflation);
201
202 self.active_vendors.push(vendor);
203 Box::new(scheme)
204 };
205
206 let scheme_id = scheme.scheme_id();
207 self.active_perpetrators.push(perpetrator);
208 self.active_schemes.push(scheme);
209
210 Some(scheme_id)
211 }
212
213 pub fn advance_all(&mut self, context: &SchemeContext) -> Vec<SchemeAction> {
215 let mut all_actions = Vec::new();
216 let mut schemes_to_complete = Vec::new();
217
218 for (idx, scheme) in self.active_schemes.iter_mut().enumerate() {
219 let mut scheme_rng = ChaCha8Rng::seed_from_u64(
221 self.config
222 .seed
223 .wrapping_add(scheme.scheme_id().as_u128() as u64),
224 );
225
226 let actions = scheme.advance(context, &mut scheme_rng);
227 all_actions.extend(actions);
228
229 if matches!(
231 scheme.status(),
232 SchemeStatus::Completed | SchemeStatus::Terminated | SchemeStatus::Detected
233 ) {
234 schemes_to_complete.push(idx);
235 }
236 }
237
238 for idx in schemes_to_complete.into_iter().rev() {
240 let scheme = self.active_schemes.remove(idx);
241 let completed = CompletedScheme {
242 scheme_id: scheme.scheme_id(),
243 scheme_type: scheme.scheme_type(),
244 perpetrator_id: scheme.perpetrator_id().to_string(),
245 start_date: scheme.start_date(),
246 end_date: context.current_date,
247 final_status: scheme.status(),
248 detection_status: scheme.detection_status(),
249 total_impact: scheme.total_impact(),
250 stages_completed: scheme.current_stage_number(),
251 transaction_count: scheme.transaction_refs().len(),
252 };
253
254 self.active_perpetrators
256 .retain(|p| p != scheme.perpetrator_id());
257
258 self.completed_schemes.push(completed);
259 }
260
261 all_actions
262 }
263
264 pub fn record_label(&mut self, anomaly_id: impl Into<String>, action: &SchemeAction) {
266 if let Some(scheme) = self
267 .active_schemes
268 .iter()
269 .find(|s| s.scheme_id() == action.scheme_id)
270 {
271 let label = MultiStageAnomalyLabel {
272 anomaly_id: anomaly_id.into(),
273 scheme_id: scheme.scheme_id(),
274 scheme_type: scheme.scheme_type(),
275 stage_number: action.stage,
276 stage_name: scheme.current_stage().name.clone(),
277 total_stages: scheme.stages().len() as u32,
278 perpetrator_id: scheme.perpetrator_id().to_string(),
279 scheme_detected: scheme.detection_status() != SchemeDetectionStatus::Undetected,
280 };
281 self.labels.push(label);
282 }
283 }
284
285 pub fn get_labels(&self) -> &[MultiStageAnomalyLabel] {
287 &self.labels
288 }
289
290 pub fn get_completed_schemes(&self) -> &[CompletedScheme] {
292 &self.completed_schemes
293 }
294
295 pub fn active_scheme_count(&self) -> usize {
297 self.active_schemes.len()
298 }
299
300 pub fn completed_scheme_count(&self) -> usize {
302 self.completed_schemes.len()
303 }
304
305 pub fn active_schemes_summary(&self) -> Vec<(Uuid, SchemeType, SchemeStatus)> {
307 self.active_schemes
308 .iter()
309 .map(|s| (s.scheme_id(), s.scheme_type(), s.status()))
310 .collect()
311 }
312
313 pub fn get_scheme(&self, scheme_id: Uuid) -> Option<&dyn FraudScheme> {
315 self.active_schemes
316 .iter()
317 .find(|s| s.scheme_id() == scheme_id)
318 .map(|s| s.as_ref())
319 }
320
321 pub fn reset(&mut self) {
323 self.active_schemes.clear();
324 self.completed_schemes.clear();
325 self.active_perpetrators.clear();
326 self.active_vendors.clear();
327 self.labels.clear();
328 self.rng = ChaCha8Rng::seed_from_u64(self.config.seed);
329 }
330
331 pub fn get_statistics(&self) -> SchemeStatistics {
333 let total_impact: Decimal = self
334 .completed_schemes
335 .iter()
336 .map(|s| s.total_impact)
337 .sum::<Decimal>()
338 + self
339 .active_schemes
340 .iter()
341 .map(|s| s.total_impact())
342 .sum::<Decimal>();
343
344 let detected_count = self
345 .completed_schemes
346 .iter()
347 .filter(|s| s.detection_status != SchemeDetectionStatus::Undetected)
348 .count();
349
350 let by_type = |t: SchemeType| {
351 self.completed_schemes
352 .iter()
353 .filter(|s| s.scheme_type == t)
354 .count()
355 + self
356 .active_schemes
357 .iter()
358 .filter(|s| s.scheme_type() == t)
359 .count()
360 };
361
362 SchemeStatistics {
363 total_schemes: self.active_schemes.len() + self.completed_schemes.len(),
364 active_schemes: self.active_schemes.len(),
365 completed_schemes: self.completed_schemes.len(),
366 detected_schemes: detected_count,
367 total_impact,
368 embezzlement_count: by_type(SchemeType::GradualEmbezzlement),
369 revenue_manipulation_count: by_type(SchemeType::RevenueManipulation),
370 kickback_count: by_type(SchemeType::VendorKickback),
371 }
372 }
373}
374
375#[derive(Debug, Clone, Serialize, Deserialize)]
377pub struct SchemeStatistics {
378 pub total_schemes: usize,
380 pub active_schemes: usize,
382 pub completed_schemes: usize,
384 pub detected_schemes: usize,
386 pub total_impact: Decimal,
388 pub embezzlement_count: usize,
390 pub revenue_manipulation_count: usize,
392 pub kickback_count: usize,
394}
395
#[cfg(test)]
mod tests {
    use super::*;

    /// Date on which every test scenario starts (2024-01-15).
    fn start_date() -> NaiveDate {
        NaiveDate::from_ymd_opt(2024, 1, 15).unwrap()
    }

    /// Standard test context: `start_date()`, `"1000"`, the given users and account `5000`.
    fn context_for(users: &[&str]) -> SchemeContext {
        let users: Vec<String> = users.iter().map(|&u| u.to_owned()).collect();
        SchemeContext::new(start_date(), "1000")
            .with_users(users)
            .with_accounts(vec!["5000".to_string()])
    }

    /// Config whose embezzlement probability alone already reaches 1.0.
    fn embezzlement_only() -> SchemeAdvancerConfig {
        SchemeAdvancerConfig {
            embezzlement_probability: 1.0,
            ..Default::default()
        }
    }

    #[test]
    fn test_scheme_advancer_creation() {
        let advancer = SchemeAdvancer::new(SchemeAdvancerConfig::default());
        assert_eq!(
            (advancer.active_scheme_count(), advancer.completed_scheme_count()),
            (0, 0)
        );
    }

    #[test]
    fn test_scheme_advancer_start_scheme() {
        let mut advancer = SchemeAdvancer::new(embezzlement_only());
        let context = context_for(&["USER001", "USER002"]);

        let started = advancer.maybe_start_scheme(&context);

        assert!(started.is_some());
        assert_eq!(advancer.active_scheme_count(), 1);
    }

    #[test]
    fn test_scheme_advancer_max_concurrent() {
        let mut advancer = SchemeAdvancer::new(SchemeAdvancerConfig {
            max_concurrent_schemes: 2,
            ..embezzlement_only()
        });
        let context = context_for(&["USER001", "USER002", "USER003"]);

        let outcomes: Vec<_> = (0..3)
            .map(|_| advancer.maybe_start_scheme(&context))
            .collect();

        assert_eq!(advancer.active_scheme_count(), 2);
        assert!(outcomes[2].is_none());
    }

    #[test]
    fn test_scheme_advancer_advance_all() {
        let mut advancer = SchemeAdvancer::new(embezzlement_only());
        let context = context_for(&["USER001"]);
        advancer.maybe_start_scheme(&context);

        let days = (0..30).map(|offset| start_date() + chrono::Duration::days(offset));
        for date in days {
            let mut daily = context.clone();
            daily.current_date = date;
            let _ = advancer.advance_all(&daily);
        }

        assert_eq!(advancer.active_scheme_count(), 1);
    }

    #[test]
    fn test_scheme_advancer_statistics() {
        let mut advancer = SchemeAdvancer::new(embezzlement_only());
        advancer.maybe_start_scheme(&context_for(&["USER001"]));

        let stats = advancer.get_statistics();

        assert_eq!(
            (stats.total_schemes, stats.active_schemes, stats.embezzlement_count),
            (1, 1, 1)
        );
    }
}