1use super::{
7 AccountTypeInfo, AnomalyInjectionConfig, AnomalyInjector, ChartOfAccountsTemplate,
8 CompanyArchetype, GeneratorConfig, TransactionGenerator,
9};
10use crate::models::{
11 AccountingNetwork, Decimal128, FraudPattern, GaapViolation, HybridTimestamp, NetworkSnapshot,
12 TemporalAlert, TransactionFlow,
13};
14use std::time::Duration;
15use tokio::sync::broadcast;
16use uuid::Uuid;
17
18#[derive(Debug, Clone)]
20pub struct PipelineConfig {
21 pub tick_duration: Duration,
23 pub batch_size: usize,
25 pub channel_buffer: usize,
27 pub inject_anomalies: bool,
29 pub anomaly_config: AnomalyInjectionConfig,
31}
32
33impl Default for PipelineConfig {
34 fn default() -> Self {
35 Self {
36 tick_duration: Duration::from_millis(100),
37 batch_size: 50,
38 channel_buffer: 1000,
39 inject_anomalies: true,
40 anomaly_config: AnomalyInjectionConfig::default(),
41 }
42 }
43}
44
45impl PipelineConfig {
46 pub fn fast() -> Self {
48 Self {
49 tick_duration: Duration::from_millis(10),
50 batch_size: 100,
51 ..Default::default()
52 }
53 }
54
55 pub fn educational() -> Self {
57 Self {
58 tick_duration: Duration::from_millis(500),
59 batch_size: 5,
60 ..Default::default()
61 }
62 }
63}
64
65#[derive(Debug, Clone)]
67pub enum PipelineEvent {
68 EntriesGenerated {
70 count: usize,
72 timestamp: HybridTimestamp,
74 },
75
76 FlowsCreated {
78 flows: Vec<TransactionFlow>,
80 timestamp: HybridTimestamp,
82 },
83
84 NetworkUpdated(NetworkSnapshot),
86
87 AnomalyDetected(Alert),
89
90 FraudPatternDetected(FraudPattern),
92
93 GaapViolationDetected(GaapViolation),
95
96 TemporalAnomalyDetected(TemporalAlert),
98
99 StatsUpdated(PipelineStats),
101
102 Paused,
104
105 Resumed,
107
108 Stopped,
110}
111
112#[derive(Debug, Clone)]
114pub struct Alert {
115 pub id: Uuid,
117 pub severity: AlertSeverity,
119 pub alert_type: String,
121 pub message: String,
123 pub accounts: Vec<u16>,
125 pub amount: Option<Decimal128>,
127 pub timestamp: HybridTimestamp,
129}
130
/// Severity attached to an [`Alert`].
///
/// Variants are declared from least to most severe, so the derived `Ord`
/// ranks `Info` lowest and `Critical` highest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    Info,
    Low,
    Medium,
    High,
    Critical,
}

impl AlertSeverity {
    /// Display color for this severity as three bytes (presumably RGB).
    pub fn color(&self) -> [u8; 3] {
        match self {
            AlertSeverity::Info => [100, 181, 246],
            AlertSeverity::Low => [255, 235, 59],
            AlertSeverity::Medium => [255, 152, 0],
            AlertSeverity::High => [244, 67, 54],
            AlertSeverity::Critical => [183, 28, 28],
        }
    }

    /// Emoji shown next to alerts of this severity.
    ///
    /// The glyphs are written as Unicode escapes so the source stays pure ASCII
    /// and cannot be re-garbled by a tool that assumes a legacy 8-bit encoding
    /// (the previous literals had been decoded as ISO-8859-4 mojibake).
    pub fn icon(&self) -> &'static str {
        match self {
            // U+2139 INFORMATION SOURCE + U+FE0F (emoji presentation)
            AlertSeverity::Info => "\u{2139}\u{fe0f}",
            // U+26A0 WARNING SIGN + U+FE0F (emoji presentation)
            AlertSeverity::Low => "\u{26a0}\u{fe0f}",
            // U+1F536 LARGE ORANGE DIAMOND
            AlertSeverity::Medium => "\u{1f536}",
            // U+1F534 LARGE RED CIRCLE
            AlertSeverity::High => "\u{1f534}",
            // U+1F6A8 POLICE CARS REVOLVING LIGHT
            AlertSeverity::Critical => "\u{1f6a8}",
        }
    }
}
169
/// Counters and throughput figures maintained by the pipeline.
#[derive(Default, Clone, Debug)]
pub struct PipelineStats {
    /// Total journal entries generated.
    pub entries_generated: u64,
    /// Total transaction flows created from those entries.
    pub flows_created: u64,
    /// Number of entries into which the injector reported injecting an anomaly.
    pub anomalies_detected: u64,
    /// `entries_generated` divided by `running_time_seconds`.
    pub entries_per_second: f64,
    /// `flows_created` divided by `running_time_seconds`.
    pub flows_per_second: f64,
    /// Entry counts bucketed by solving method, indexed by the method's discriminant.
    pub method_distribution: [u32; 5],
    /// Seconds elapsed since the first productive tick.
    pub running_time_seconds: f64,
}
188
189pub struct DataFabricPipeline {
191 entity_id: Uuid,
193 generator: TransactionGenerator,
195 injector: Option<AnomalyInjector>,
197 network: AccountingNetwork,
199 config: PipelineConfig,
201 event_sender: broadcast::Sender<PipelineEvent>,
203 is_running: bool,
205 is_paused: bool,
207 stats: PipelineStats,
209 start_time: Option<std::time::Instant>,
211}
212
213impl DataFabricPipeline {
214 pub fn new(
216 archetype: CompanyArchetype,
217 generator_config: GeneratorConfig,
218 pipeline_config: PipelineConfig,
219 ) -> Self {
220 let entity_id = Uuid::new_v4();
221 let generator = TransactionGenerator::new(archetype.clone(), generator_config);
222
223 let coa = ChartOfAccountsTemplate::for_archetype(&archetype);
225 let mut network = AccountingNetwork::new(entity_id, 2024, 1);
226
227 for account_def in &coa.accounts {
229 let (node, metadata) = account_def.to_account(network.accounts.len() as u16);
230 network.add_account(node, metadata);
231 }
232
233 let injector = if pipeline_config.inject_anomalies {
235 let mut inj = AnomalyInjector::new(pipeline_config.anomaly_config.clone(), None);
236
237 for (idx, def) in coa.accounts.iter().enumerate() {
239 use crate::models::AccountType;
240 let info = AccountTypeInfo {
241 is_asset: def.account_type == AccountType::Asset,
242 is_liability: def.account_type == AccountType::Liability,
243 is_revenue: def.account_type == AccountType::Revenue,
244 is_expense: def.account_type == AccountType::Expense,
245 is_equity: def.account_type == AccountType::Equity,
246 is_cash: def.semantics & crate::models::AccountSemantics::IS_CASH != 0,
247 is_suspense: def.semantics & crate::models::AccountSemantics::IS_SUSPENSE != 0,
248 };
249 inj.register_account(idx as u16, info);
250 }
251
252 Some(inj)
253 } else {
254 None
255 };
256
257 let (event_sender, _) = broadcast::channel(pipeline_config.channel_buffer);
258
259 Self {
260 entity_id,
261 generator,
262 injector,
263 network,
264 config: pipeline_config,
265 event_sender,
266 is_running: false,
267 is_paused: false,
268 stats: PipelineStats::default(),
269 start_time: None,
270 }
271 }
272
273 pub fn subscribe(&self) -> broadcast::Receiver<PipelineEvent> {
275 self.event_sender.subscribe()
276 }
277
278 pub fn network_snapshot(&self) -> NetworkSnapshot {
280 self.network.snapshot()
281 }
282
283 pub fn network(&self) -> &AccountingNetwork {
285 &self.network
286 }
287
288 pub fn network_mut(&mut self) -> &mut AccountingNetwork {
290 &mut self.network
291 }
292
293 pub fn stats(&self) -> &PipelineStats {
295 &self.stats
296 }
297
298 pub fn is_running(&self) -> bool {
300 self.is_running
301 }
302
303 pub fn is_paused(&self) -> bool {
305 self.is_paused
306 }
307
308 pub fn tick(&mut self) -> Vec<TransactionFlow> {
311 if self.is_paused {
312 return Vec::new();
313 }
314
315 if self.start_time.is_none() {
317 self.start_time = Some(std::time::Instant::now());
318 }
319
320 let entries = self.generator.generate_batch(self.config.batch_size);
322 let entry_count = entries.len();
323 self.stats.entries_generated += entry_count as u64;
324
325 let _ = self.event_sender.send(PipelineEvent::EntriesGenerated {
327 count: entry_count,
328 timestamp: HybridTimestamp::now(),
329 });
330
331 let mut all_flows = Vec::new();
333
334 for entry in entries {
335 let (final_entry, debit_lines, credit_lines, _anomaly_label) =
337 if let Some(ref mut injector) = self.injector {
338 let result =
339 injector.process(entry.entry, entry.debit_lines, entry.credit_lines);
340
341 if result.anomaly_injected {
342 self.stats.anomalies_detected += 1;
343
344 if let Some(ref label) = result.anomaly_label {
346 let alert = self.create_alert_from_label(label, &result.entry);
347 let _ = self
348 .event_sender
349 .send(PipelineEvent::AnomalyDetected(alert));
350 }
351 }
352
353 (
354 result.entry,
355 result.debit_lines,
356 result.credit_lines,
357 result.anomaly_label,
358 )
359 } else {
360 (entry.entry, entry.debit_lines, entry.credit_lines, None)
361 };
362
363 let flows = self.transform_to_flows(&final_entry, &debit_lines, &credit_lines);
365 self.stats.flows_created += flows.len() as u64;
366
367 let method_idx = final_entry.solving_method as usize;
369 if method_idx < 5 {
370 self.stats.method_distribution[method_idx] += 1;
371 }
372
373 for flow in &flows {
375 self.network.add_flow(flow.clone());
376 }
377
378 all_flows.extend(flows);
379 }
380
381 if !all_flows.is_empty() {
383 let _ = self.event_sender.send(PipelineEvent::FlowsCreated {
384 flows: all_flows.clone(),
385 timestamp: HybridTimestamp::now(),
386 });
387 }
388
389 self.network.update_statistics();
391
392 let _ = self
394 .event_sender
395 .send(PipelineEvent::NetworkUpdated(self.network.snapshot()));
396
397 if let Some(start) = self.start_time {
399 self.stats.running_time_seconds = start.elapsed().as_secs_f64();
400 if self.stats.running_time_seconds > 0.0 {
401 self.stats.entries_per_second =
402 self.stats.entries_generated as f64 / self.stats.running_time_seconds;
403 self.stats.flows_per_second =
404 self.stats.flows_created as f64 / self.stats.running_time_seconds;
405 }
406 }
407
408 all_flows
409 }
410
411 fn transform_to_flows(
413 &self,
414 entry: &crate::models::JournalEntry,
415 debit_lines: &[crate::models::JournalLineItem],
416 credit_lines: &[crate::models::JournalLineItem],
417 ) -> Vec<TransactionFlow> {
418 use crate::models::SolvingMethod;
419
420 match entry.solving_method {
421 SolvingMethod::MethodA => {
422 if let (Some(debit), Some(credit)) = (debit_lines.first(), credit_lines.first()) {
424 vec![TransactionFlow::with_provenance(
425 debit.account_index,
426 credit.account_index,
427 debit.amount,
428 entry.id,
429 0,
430 0,
431 entry.posting_date,
432 SolvingMethod::MethodA,
433 1.0,
434 )]
435 } else {
436 Vec::new()
437 }
438 }
439
440 SolvingMethod::MethodB => {
441 let n = debit_lines.len().min(credit_lines.len());
443 (0..n)
444 .map(|i| {
445 TransactionFlow::with_provenance(
446 debit_lines[i].account_index,
447 credit_lines[i].account_index,
448 debit_lines[i].amount,
449 entry.id,
450 i as u16,
451 i as u16,
452 entry.posting_date,
453 SolvingMethod::MethodB,
454 1.0,
455 )
456 })
457 .collect()
458 }
459
460 _ => {
461 let total_credit: f64 = credit_lines.iter().map(|c| c.amount.to_f64()).sum();
463
464 if total_credit == 0.0 {
465 return Vec::new();
466 }
467
468 let mut flows = Vec::new();
469 for debit in debit_lines {
470 let debit_amount = debit.amount.to_f64();
471 for credit in credit_lines {
472 let credit_ratio = credit.amount.to_f64() / total_credit;
473 let flow_amount = Decimal128::from_f64(debit_amount * credit_ratio);
474 let confidence = entry.average_confidence * credit_ratio as f32;
475
476 flows.push(TransactionFlow::with_provenance(
477 debit.account_index,
478 credit.account_index,
479 flow_amount,
480 entry.id,
481 0,
482 0,
483 entry.posting_date,
484 entry.solving_method,
485 confidence,
486 ));
487 }
488 }
489 flows
490 }
491 }
492 }
493
494 fn create_alert_from_label(
496 &self,
497 label: &super::AnomalyLabel,
498 entry: &crate::models::JournalEntry,
499 ) -> Alert {
500 let (alert_type, message, severity) = match label {
501 super::AnomalyLabel::FraudPattern(pattern) => {
502 let severity = match pattern {
503 crate::models::FraudPatternType::CircularFlow => AlertSeverity::Critical,
504 crate::models::FraudPatternType::HighVelocity => AlertSeverity::High,
505 crate::models::FraudPatternType::ThresholdClustering => AlertSeverity::High,
506 _ => AlertSeverity::Medium,
507 };
508 (
509 format!("Fraud: {:?}", pattern),
510 pattern.description().to_string(),
511 severity,
512 )
513 }
514 super::AnomalyLabel::GaapViolation(violation) => {
515 let severity = match violation.default_severity() {
516 crate::models::ViolationSeverity::Critical => AlertSeverity::Critical,
517 crate::models::ViolationSeverity::High => AlertSeverity::High,
518 crate::models::ViolationSeverity::Medium => AlertSeverity::Medium,
519 crate::models::ViolationSeverity::Low => AlertSeverity::Low,
520 };
521 (
522 format!("GAAP: {:?}", violation),
523 violation.description().to_string(),
524 severity,
525 )
526 }
527 super::AnomalyLabel::TimingAnomaly(desc) => (
528 "Timing".to_string(),
529 format!("Timing anomaly: {}", desc),
530 AlertSeverity::Medium,
531 ),
532 super::AnomalyLabel::AmountAnomaly(desc) => (
533 "Amount".to_string(),
534 format!("Amount anomaly: {}", desc),
535 AlertSeverity::Medium,
536 ),
537 };
538
539 Alert {
540 id: Uuid::new_v4(),
541 severity,
542 alert_type,
543 message,
544 accounts: vec![],
545 amount: Some(entry.total_debits),
546 timestamp: entry.posting_date,
547 }
548 }
549
550 pub fn pause(&mut self) {
552 self.is_paused = true;
553 let _ = self.event_sender.send(PipelineEvent::Paused);
554 }
555
556 pub fn resume(&mut self) {
558 self.is_paused = false;
559 let _ = self.event_sender.send(PipelineEvent::Resumed);
560 }
561
562 pub fn stop(&mut self) {
564 self.is_running = false;
565 let _ = self.event_sender.send(PipelineEvent::Stopped);
566 }
567
568 pub fn reset(&mut self) {
570 self.network = AccountingNetwork::new(self.entity_id, 2024, 1);
571
572 self.stats = PipelineStats::default();
576 self.start_time = None;
577
578 if let Some(ref mut injector) = self.injector {
579 injector.reset_stats();
580 }
581 }
582}
583
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pipeline for the standard retail archetype.
    fn retail_pipeline(
        gen_config: GeneratorConfig,
        pipe_config: PipelineConfig,
    ) -> DataFabricPipeline {
        DataFabricPipeline::new(CompanyArchetype::retail_standard(), gen_config, pipe_config)
    }

    /// Generator configuration seeded with 42; everything else is default.
    fn seeded_generator() -> GeneratorConfig {
        GeneratorConfig {
            seed: Some(42),
            ..Default::default()
        }
    }

    #[test]
    fn test_pipeline_creation() {
        let pipeline = retail_pipeline(GeneratorConfig::default(), PipelineConfig::default());

        assert!(!pipeline.is_running());
        assert!(!pipeline.is_paused());
    }

    #[test]
    fn test_pipeline_tick() {
        let config = PipelineConfig {
            batch_size: 10,
            inject_anomalies: false,
            ..Default::default()
        };
        let mut pipeline = retail_pipeline(seeded_generator(), config);

        let produced = pipeline.tick();
        let stats = pipeline.stats();

        assert!(!produced.is_empty());
        assert!(stats.entries_generated > 0);
        assert!(stats.flows_created > 0);
    }

    #[test]
    fn test_pipeline_pause_resume() {
        let mut pipeline = retail_pipeline(GeneratorConfig::default(), PipelineConfig::default());

        pipeline.tick();
        let baseline = pipeline.stats().entries_generated;

        // While paused, a tick must not generate anything.
        pipeline.pause();
        assert!(pipeline.is_paused());
        pipeline.tick();
        assert_eq!(pipeline.stats().entries_generated, baseline);

        // After resuming, generation continues.
        pipeline.resume();
        assert!(!pipeline.is_paused());
        pipeline.tick();
        assert!(pipeline.stats().entries_generated > baseline);
    }

    #[test]
    fn test_pipeline_with_anomalies() {
        let config = PipelineConfig {
            batch_size: 100,
            inject_anomalies: true,
            anomaly_config: AnomalyInjectionConfig {
                injection_rate: 0.5,
                ..Default::default()
            },
            ..Default::default()
        };
        let mut pipeline = retail_pipeline(seeded_generator(), config);

        (0..10).for_each(|_| {
            pipeline.tick();
        });

        assert!(pipeline.stats().anomalies_detected > 0);
    }
}