use super::{
AccountTypeInfo, AnomalyInjectionConfig, AnomalyInjector, ChartOfAccountsTemplate,
CompanyArchetype, GeneratorConfig, TransactionGenerator,
};
use crate::models::{
AccountingNetwork, Decimal128, FraudPattern, GaapViolation, HybridTimestamp, NetworkSnapshot,
TemporalAlert, TransactionFlow,
};
use std::time::Duration;
use tokio::sync::broadcast;
use uuid::Uuid;
/// Tuning knobs for [`DataFabricPipeline`].
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Interval between ticks. Not read by the pipeline's `tick` itself —
    /// presumably consumed by whatever drives the tick loop (verify).
    pub tick_duration: Duration,
    /// Number of journal entries requested from the generator per tick.
    pub batch_size: usize,
    /// Capacity of the broadcast channel used for pipeline events.
    pub channel_buffer: usize,
    /// Whether the pipeline builds an anomaly injector at construction time.
    pub inject_anomalies: bool,
    /// Configuration handed to the anomaly injector when injection is enabled.
    pub anomaly_config: AnomalyInjectionConfig,
}

impl Default for PipelineConfig {
    /// 100 ms ticks, 50 entries per tick, a 1000-event buffer, anomaly
    /// injection on with the injector's default configuration.
    fn default() -> Self {
        let anomaly_config = AnomalyInjectionConfig::default();
        PipelineConfig {
            tick_duration: Duration::from_millis(100),
            batch_size: 50,
            channel_buffer: 1000,
            inject_anomalies: true,
            anomaly_config,
        }
    }
}

impl PipelineConfig {
    /// High-throughput preset: 10 ms ticks with 100 entries each; every
    /// other field comes from [`Default`].
    pub fn fast() -> Self {
        Self::preset(Duration::from_millis(10), 100)
    }

    /// Slow, easy-to-follow preset: 500 ms ticks with 5 entries each; every
    /// other field comes from [`Default`].
    pub fn educational() -> Self {
        Self::preset(Duration::from_millis(500), 5)
    }

    /// Default configuration with the tick interval and batch size overridden.
    fn preset(tick_duration: Duration, batch_size: usize) -> Self {
        PipelineConfig {
            tick_duration,
            batch_size,
            ..Self::default()
        }
    }
}
/// Notifications broadcast by [`DataFabricPipeline`] to its subscribers.
#[derive(Debug, Clone)]
pub enum PipelineEvent {
/// A batch of journal entries was pulled from the generator during a tick.
EntriesGenerated {
/// Number of entries in the batch.
count: usize,
/// Time the event was created (`HybridTimestamp::now()` at send time).
timestamp: HybridTimestamp,
},
/// Flows produced during one tick; only sent when the tick created at least one flow.
FlowsCreated {
/// Every flow created during the tick, in creation order.
flows: Vec<TransactionFlow>,
/// Time the event was created (`HybridTimestamp::now()` at send time).
timestamp: HybridTimestamp,
},
/// Snapshot of the accounting network taken after a tick's flows were added.
NetworkUpdated(NetworkSnapshot),
/// Alert describing an anomaly; in this file raised when the anomaly injector
/// reports an injected anomaly that carries a label.
AnomalyDetected(Alert),
/// A detected fraud pattern.
FraudPatternDetected(FraudPattern),
/// A detected GAAP violation.
GaapViolationDetected(GaapViolation),
/// A temporal anomaly alert.
TemporalAnomalyDetected(TemporalAlert),
/// A copy of the pipeline's running statistics.
StatsUpdated(PipelineStats),
/// Sent when the pipeline is paused.
Paused,
/// Sent when the pipeline is resumed.
Resumed,
/// Sent when the pipeline is stopped.
Stopped,
}
/// A user-facing alert raised by the pipeline.
#[derive(Debug, Clone)]
pub struct Alert {
/// Unique identifier of this alert.
pub id: Uuid,
/// How serious the alert is.
pub severity: AlertSeverity,
/// Short category label, e.g. `"Timing"` or `"Fraud: <pattern>"`.
pub alert_type: String,
/// Human-readable description.
pub message: String,
/// Indices of the accounts involved; may be empty.
pub accounts: Vec<u16>,
/// Monetary amount associated with the alert, if any (the entry's total
/// debits for alerts built from anomaly labels).
pub amount: Option<Decimal128>,
/// When the underlying event happened (the entry's posting date for alerts
/// built from anomaly labels).
pub timestamp: HybridTimestamp,
}
/// Severity of an [`Alert`], ordered from least (`Info`) to most
/// (`Critical`) severe via the derived `Ord`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    Info,
    Low,
    Medium,
    High,
    Critical,
}

impl AlertSeverity {
    /// RGB display color for this severity (blue → yellow → orange → red → dark red).
    pub fn color(&self) -> [u8; 3] {
        let (red, green, blue) = match *self {
            Self::Info => (100, 181, 246),
            Self::Low => (255, 235, 59),
            Self::Medium => (255, 152, 0),
            Self::High => (244, 67, 54),
            Self::Critical => (183, 28, 28),
        };
        [red, green, blue]
    }

    /// Emoji icon used to decorate alerts of this severity.
    pub fn icon(&self) -> &'static str {
        match *self {
            Self::Info => "ℹ️",
            Self::Low => "⚠️",
            Self::Medium => "🔶",
            Self::High => "🔴",
            Self::Critical => "🚨",
        }
    }
}
/// Cumulative counters and throughput figures maintained by [`DataFabricPipeline`].
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
/// Total journal entries generated across all ticks.
pub entries_generated: u64,
/// Total transaction flows created across all ticks.
pub flows_created: u64,
/// Number of entries for which the anomaly injector reported an injected anomaly.
pub anomalies_detected: u64,
/// `entries_generated / running_time_seconds`; left unchanged while elapsed time is 0.
pub entries_per_second: f64,
/// `flows_created / running_time_seconds`; left unchanged while elapsed time is 0.
pub flows_per_second: f64,
/// Entry count per solving method, indexed by `solving_method as usize`;
/// methods whose index is 5 or more are not counted.
pub method_distribution: [u32; 5],
/// Seconds since the first unpaused tick (paused intervals are included).
pub running_time_seconds: f64,
}
pub struct DataFabricPipeline {
entity_id: Uuid,
generator: TransactionGenerator,
injector: Option<AnomalyInjector>,
network: AccountingNetwork,
config: PipelineConfig,
event_sender: broadcast::Sender<PipelineEvent>,
is_running: bool,
is_paused: bool,
stats: PipelineStats,
start_time: Option<std::time::Instant>,
}
impl DataFabricPipeline {
pub fn new(
archetype: CompanyArchetype,
generator_config: GeneratorConfig,
pipeline_config: PipelineConfig,
) -> Self {
let entity_id = Uuid::new_v4();
let generator = TransactionGenerator::new(archetype.clone(), generator_config);
let coa = ChartOfAccountsTemplate::for_archetype(&archetype);
let mut network = AccountingNetwork::new(entity_id, 2024, 1);
for account_def in &coa.accounts {
let (node, metadata) = account_def.to_account(network.accounts.len() as u16);
network.add_account(node, metadata);
}
let injector = if pipeline_config.inject_anomalies {
let mut inj = AnomalyInjector::new(pipeline_config.anomaly_config.clone(), None);
for (idx, def) in coa.accounts.iter().enumerate() {
use crate::models::AccountType;
let info = AccountTypeInfo {
is_asset: def.account_type == AccountType::Asset,
is_liability: def.account_type == AccountType::Liability,
is_revenue: def.account_type == AccountType::Revenue,
is_expense: def.account_type == AccountType::Expense,
is_equity: def.account_type == AccountType::Equity,
is_cash: def.semantics & crate::models::AccountSemantics::IS_CASH != 0,
is_suspense: def.semantics & crate::models::AccountSemantics::IS_SUSPENSE != 0,
};
inj.register_account(idx as u16, info);
}
Some(inj)
} else {
None
};
let (event_sender, _) = broadcast::channel(pipeline_config.channel_buffer);
Self {
entity_id,
generator,
injector,
network,
config: pipeline_config,
event_sender,
is_running: false,
is_paused: false,
stats: PipelineStats::default(),
start_time: None,
}
}
pub fn subscribe(&self) -> broadcast::Receiver<PipelineEvent> {
self.event_sender.subscribe()
}
pub fn network_snapshot(&self) -> NetworkSnapshot {
self.network.snapshot()
}
pub fn network(&self) -> &AccountingNetwork {
&self.network
}
pub fn network_mut(&mut self) -> &mut AccountingNetwork {
&mut self.network
}
pub fn stats(&self) -> &PipelineStats {
&self.stats
}
pub fn is_running(&self) -> bool {
self.is_running
}
pub fn is_paused(&self) -> bool {
self.is_paused
}
pub fn tick(&mut self) -> Vec<TransactionFlow> {
if self.is_paused {
return Vec::new();
}
if self.start_time.is_none() {
self.start_time = Some(std::time::Instant::now());
}
let entries = self.generator.generate_batch(self.config.batch_size);
let entry_count = entries.len();
self.stats.entries_generated += entry_count as u64;
let _ = self.event_sender.send(PipelineEvent::EntriesGenerated {
count: entry_count,
timestamp: HybridTimestamp::now(),
});
let mut all_flows = Vec::new();
for entry in entries {
let (final_entry, debit_lines, credit_lines, _anomaly_label) =
if let Some(ref mut injector) = self.injector {
let result =
injector.process(entry.entry, entry.debit_lines, entry.credit_lines);
if result.anomaly_injected {
self.stats.anomalies_detected += 1;
if let Some(ref label) = result.anomaly_label {
let alert = self.create_alert_from_label(label, &result.entry);
let _ = self
.event_sender
.send(PipelineEvent::AnomalyDetected(alert));
}
}
(
result.entry,
result.debit_lines,
result.credit_lines,
result.anomaly_label,
)
} else {
(entry.entry, entry.debit_lines, entry.credit_lines, None)
};
let flows = self.transform_to_flows(&final_entry, &debit_lines, &credit_lines);
self.stats.flows_created += flows.len() as u64;
let method_idx = final_entry.solving_method as usize;
if method_idx < 5 {
self.stats.method_distribution[method_idx] += 1;
}
for flow in &flows {
self.network.add_flow(flow.clone());
}
all_flows.extend(flows);
}
if !all_flows.is_empty() {
let _ = self.event_sender.send(PipelineEvent::FlowsCreated {
flows: all_flows.clone(),
timestamp: HybridTimestamp::now(),
});
}
self.network.update_statistics();
let _ = self
.event_sender
.send(PipelineEvent::NetworkUpdated(self.network.snapshot()));
if let Some(start) = self.start_time {
self.stats.running_time_seconds = start.elapsed().as_secs_f64();
if self.stats.running_time_seconds > 0.0 {
self.stats.entries_per_second =
self.stats.entries_generated as f64 / self.stats.running_time_seconds;
self.stats.flows_per_second =
self.stats.flows_created as f64 / self.stats.running_time_seconds;
}
}
all_flows
}
fn transform_to_flows(
&self,
entry: &crate::models::JournalEntry,
debit_lines: &[crate::models::JournalLineItem],
credit_lines: &[crate::models::JournalLineItem],
) -> Vec<TransactionFlow> {
use crate::models::SolvingMethod;
match entry.solving_method {
SolvingMethod::MethodA => {
if let (Some(debit), Some(credit)) = (debit_lines.first(), credit_lines.first()) {
vec![TransactionFlow::with_provenance(
debit.account_index,
credit.account_index,
debit.amount,
entry.id,
0,
0,
entry.posting_date,
SolvingMethod::MethodA,
1.0,
)]
} else {
Vec::new()
}
}
SolvingMethod::MethodB => {
let n = debit_lines.len().min(credit_lines.len());
(0..n)
.map(|i| {
TransactionFlow::with_provenance(
debit_lines[i].account_index,
credit_lines[i].account_index,
debit_lines[i].amount,
entry.id,
i as u16,
i as u16,
entry.posting_date,
SolvingMethod::MethodB,
1.0,
)
})
.collect()
}
_ => {
let total_credit: f64 = credit_lines.iter().map(|c| c.amount.to_f64()).sum();
if total_credit == 0.0 {
return Vec::new();
}
let mut flows = Vec::new();
for debit in debit_lines {
let debit_amount = debit.amount.to_f64();
for credit in credit_lines {
let credit_ratio = credit.amount.to_f64() / total_credit;
let flow_amount = Decimal128::from_f64(debit_amount * credit_ratio);
let confidence = entry.average_confidence * credit_ratio as f32;
flows.push(TransactionFlow::with_provenance(
debit.account_index,
credit.account_index,
flow_amount,
entry.id,
0,
0,
entry.posting_date,
entry.solving_method,
confidence,
));
}
}
flows
}
}
}
fn create_alert_from_label(
&self,
label: &super::AnomalyLabel,
entry: &crate::models::JournalEntry,
) -> Alert {
let (alert_type, message, severity) = match label {
super::AnomalyLabel::FraudPattern(pattern) => {
let severity = match pattern {
crate::models::FraudPatternType::CircularFlow => AlertSeverity::Critical,
crate::models::FraudPatternType::HighVelocity => AlertSeverity::High,
crate::models::FraudPatternType::ThresholdClustering => AlertSeverity::High,
_ => AlertSeverity::Medium,
};
(
format!("Fraud: {:?}", pattern),
pattern.description().to_string(),
severity,
)
}
super::AnomalyLabel::GaapViolation(violation) => {
let severity = match violation.default_severity() {
crate::models::ViolationSeverity::Critical => AlertSeverity::Critical,
crate::models::ViolationSeverity::High => AlertSeverity::High,
crate::models::ViolationSeverity::Medium => AlertSeverity::Medium,
crate::models::ViolationSeverity::Low => AlertSeverity::Low,
};
(
format!("GAAP: {:?}", violation),
violation.description().to_string(),
severity,
)
}
super::AnomalyLabel::TimingAnomaly(desc) => (
"Timing".to_string(),
format!("Timing anomaly: {}", desc),
AlertSeverity::Medium,
),
super::AnomalyLabel::AmountAnomaly(desc) => (
"Amount".to_string(),
format!("Amount anomaly: {}", desc),
AlertSeverity::Medium,
),
};
Alert {
id: Uuid::new_v4(),
severity,
alert_type,
message,
accounts: vec![],
amount: Some(entry.total_debits),
timestamp: entry.posting_date,
}
}
pub fn pause(&mut self) {
self.is_paused = true;
let _ = self.event_sender.send(PipelineEvent::Paused);
}
pub fn resume(&mut self) {
self.is_paused = false;
let _ = self.event_sender.send(PipelineEvent::Resumed);
}
pub fn stop(&mut self) {
self.is_running = false;
let _ = self.event_sender.send(PipelineEvent::Stopped);
}
pub fn reset(&mut self) {
self.network = AccountingNetwork::new(self.entity_id, 2024, 1);
self.stats = PipelineStats::default();
self.start_time = None;
if let Some(ref mut injector) = self.injector {
injector.reset_stats();
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pipeline for the standard retail archetype.
    fn retail_pipeline(
        gen_config: GeneratorConfig,
        pipe_config: PipelineConfig,
    ) -> DataFabricPipeline {
        DataFabricPipeline::new(CompanyArchetype::retail_standard(), gen_config, pipe_config)
    }

    /// Generator configuration with a fixed seed for reproducible runs.
    fn seeded_generator() -> GeneratorConfig {
        GeneratorConfig {
            seed: Some(42),
            ..Default::default()
        }
    }

    #[test]
    fn test_pipeline_creation() {
        let pipeline = retail_pipeline(GeneratorConfig::default(), PipelineConfig::default());
        assert!(!pipeline.is_running());
        assert!(!pipeline.is_paused());
    }

    #[test]
    fn test_pipeline_tick() {
        let pipe_config = PipelineConfig {
            batch_size: 10,
            inject_anomalies: false,
            ..Default::default()
        };
        let mut pipeline = retail_pipeline(seeded_generator(), pipe_config);

        let flows = pipeline.tick();

        assert!(!flows.is_empty());
        let stats = pipeline.stats();
        assert!(stats.entries_generated > 0);
        assert!(stats.flows_created > 0);
    }

    #[test]
    fn test_pipeline_pause_resume() {
        let mut pipeline = retail_pipeline(GeneratorConfig::default(), PipelineConfig::default());
        pipeline.tick();
        let before_pause = pipeline.stats().entries_generated;

        pipeline.pause();
        assert!(pipeline.is_paused());
        pipeline.tick();
        assert_eq!(pipeline.stats().entries_generated, before_pause);

        pipeline.resume();
        assert!(!pipeline.is_paused());
        pipeline.tick();
        assert!(pipeline.stats().entries_generated > before_pause);
    }

    #[test]
    fn test_pipeline_with_anomalies() {
        let pipe_config = PipelineConfig {
            batch_size: 100,
            inject_anomalies: true,
            anomaly_config: AnomalyInjectionConfig {
                injection_rate: 0.5,
                ..Default::default()
            },
            ..Default::default()
        };
        let mut pipeline = retail_pipeline(seeded_generator(), pipe_config);

        (0..10).for_each(|_| {
            pipeline.tick();
        });

        assert!(pipeline.stats().anomalies_detected > 0);
    }
}