1use std::collections::HashMap;
16use std::path::PathBuf;
17use std::sync::Arc;
18
19use chrono::{Datelike, NaiveDate};
20use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
21use serde::{Deserialize, Serialize};
22use tracing::{debug, info, warn};
23
24use datasynth_banking::{
25 models::{BankAccount, BankTransaction, BankingCustomer},
26 BankingOrchestratorBuilder,
27};
28use datasynth_config::schema::GeneratorConfig;
29use datasynth_core::error::{SynthError, SynthResult};
30use datasynth_core::models::audit::{
31 AuditEngagement, AuditEvidence, AuditFinding, ProfessionalJudgment, RiskAssessment, Workpaper,
32};
33use datasynth_core::models::subledger::ap::APInvoice;
34use datasynth_core::models::subledger::ar::ARInvoice;
35use datasynth_core::models::*;
36use datasynth_core::{DegradationActions, DegradationLevel, ResourceGuard, ResourceGuardBuilder};
37use datasynth_fingerprint::{
38 io::FingerprintReader,
39 models::Fingerprint,
40 synthesis::{ConfigSynthesizer, CopulaGeneratorSpec, SynthesisOptions},
41};
42use datasynth_generators::{
43 AnomalyInjector,
45 AnomalyInjectorConfig,
46 AssetGenerator,
47 AuditEngagementGenerator,
49 BalanceTrackerConfig,
50 ChartOfAccountsGenerator,
52 CustomerGenerator,
53 DataQualityConfig,
54 DataQualityInjector,
56 DataQualityStats,
57 DocumentFlowJeConfig,
59 DocumentFlowJeGenerator,
60 DocumentFlowLinker,
62 EmployeeGenerator,
63 EvidenceGenerator,
64 FindingGenerator,
65 JournalEntryGenerator,
66 JudgmentGenerator,
67 LatePaymentDistribution,
68 MaterialGenerator,
69 O2CDocumentChain,
70 O2CGenerator,
71 O2CGeneratorConfig,
72 O2CPaymentBehavior,
73 P2PDocumentChain,
74 P2PGenerator,
76 P2PGeneratorConfig,
77 P2PPaymentBehavior,
78 RiskAssessmentGenerator,
79 RunningBalanceTracker,
81 ValidationError,
82 VendorGenerator,
84 WorkpaperGenerator,
85};
86use datasynth_graph::{
87 PyGExportConfig, PyGExporter, TransactionGraphBuilder, TransactionGraphConfig,
88};
89use datasynth_ocpm::{
90 EventLogMetadata, O2cDocuments, OcpmEventGenerator, OcpmEventLog, OcpmGeneratorConfig,
91 P2pDocuments,
92};
93
94use datasynth_config::schema::{O2CFlowConfig, P2PFlowConfig};
95use datasynth_core::models::documents::PaymentMethod;
96
97fn convert_p2p_config(schema_config: &P2PFlowConfig) -> P2PGeneratorConfig {
103 let payment_behavior = &schema_config.payment_behavior;
104 let late_dist = &payment_behavior.late_payment_days_distribution;
105
106 P2PGeneratorConfig {
107 three_way_match_rate: schema_config.three_way_match_rate,
108 partial_delivery_rate: schema_config.partial_delivery_rate,
109 over_delivery_rate: 0.02, price_variance_rate: schema_config.price_variance_rate,
111 max_price_variance_percent: schema_config.max_price_variance_percent,
112 avg_days_po_to_gr: schema_config.average_po_to_gr_days,
113 avg_days_gr_to_invoice: schema_config.average_gr_to_invoice_days,
114 avg_days_invoice_to_payment: schema_config.average_invoice_to_payment_days,
115 payment_method_distribution: vec![
116 (PaymentMethod::BankTransfer, 0.60),
117 (PaymentMethod::Check, 0.25),
118 (PaymentMethod::Wire, 0.10),
119 (PaymentMethod::CreditCard, 0.05),
120 ],
121 early_payment_discount_rate: 0.30, payment_behavior: P2PPaymentBehavior {
123 late_payment_rate: payment_behavior.late_payment_rate,
124 late_payment_distribution: LatePaymentDistribution {
125 slightly_late_1_to_7: late_dist.slightly_late_1_to_7,
126 late_8_to_14: late_dist.late_8_to_14,
127 very_late_15_to_30: late_dist.very_late_15_to_30,
128 severely_late_31_to_60: late_dist.severely_late_31_to_60,
129 extremely_late_over_60: late_dist.extremely_late_over_60,
130 },
131 partial_payment_rate: payment_behavior.partial_payment_rate,
132 payment_correction_rate: payment_behavior.payment_correction_rate,
133 },
134 }
135}
136
137fn convert_o2c_config(schema_config: &O2CFlowConfig) -> O2CGeneratorConfig {
139 let payment_behavior = &schema_config.payment_behavior;
140
141 O2CGeneratorConfig {
142 credit_check_failure_rate: schema_config.credit_check_failure_rate,
143 partial_shipment_rate: schema_config.partial_shipment_rate,
144 avg_days_so_to_delivery: schema_config.average_so_to_delivery_days,
145 avg_days_delivery_to_invoice: schema_config.average_delivery_to_invoice_days,
146 avg_days_invoice_to_payment: schema_config.average_invoice_to_receipt_days,
147 late_payment_rate: 0.15, bad_debt_rate: schema_config.bad_debt_rate,
149 returns_rate: schema_config.return_rate,
150 cash_discount_take_rate: schema_config.cash_discount.taken_rate,
151 payment_method_distribution: vec![
152 (PaymentMethod::BankTransfer, 0.50),
153 (PaymentMethod::Check, 0.30),
154 (PaymentMethod::Wire, 0.15),
155 (PaymentMethod::CreditCard, 0.05),
156 ],
157 payment_behavior: O2CPaymentBehavior {
158 partial_payment_rate: payment_behavior.partial_payments.rate,
159 short_payment_rate: payment_behavior.short_payments.rate,
160 max_short_percent: payment_behavior.short_payments.max_short_percent,
161 on_account_rate: payment_behavior.on_account_payments.rate,
162 payment_correction_rate: payment_behavior.payment_corrections.rate,
163 avg_days_until_remainder: payment_behavior.partial_payments.avg_days_until_remainder,
164 },
165 }
166}
167
/// Phase toggles and volume settings for `EnhancedOrchestrator::generate`.
#[derive(Debug, Clone)]
pub struct PhaseConfig {
    /// Run phase 2 (vendors, customers, materials, assets, employees).
    pub generate_master_data: bool,
    /// Run phase 3 (P2P/O2C document flows and subledger linking) and derive JEs from flows.
    pub generate_document_flows: bool,
    /// Run phase 3c (OCPM event log from document flows).
    pub generate_ocpm_events: bool,
    /// Run phase 4 (standalone journal entries).
    pub generate_journal_entries: bool,
    /// Run phase 5 (anomaly injection into journal entries).
    pub inject_anomalies: bool,
    /// Run phase 7 (data-quality variation injection into journal entries).
    pub inject_data_quality: bool,
    /// Run phase 6 (journal-entry balance validation).
    pub validate_balances: bool,
    /// Whether progress bars are wanted; set by `EnhancedOrchestrator::with_progress`.
    pub show_progress: bool,
    /// Vendors to generate per company (presumably read by master-data generation — confirm).
    pub vendors_per_company: usize,
    /// Customers to generate per company (presumably read by master-data generation — confirm).
    pub customers_per_company: usize,
    /// Materials to generate per company (presumably read by master-data generation — confirm).
    pub materials_per_company: usize,
    /// Fixed assets to generate per company (presumably read by master-data generation — confirm).
    pub assets_per_company: usize,
    /// Employees to generate per company (presumably read by master-data generation — confirm).
    pub employees_per_company: usize,
    /// Number of P2P document chains (NOTE(review): total vs. per-company not visible here).
    pub p2p_chains: usize,
    /// Number of O2C document chains (NOTE(review): total vs. per-company not visible here).
    pub o2c_chains: usize,
    /// Run phase 8 (audit engagements, workpapers, evidence, risks, findings, judgments).
    pub generate_audit: bool,
    /// Audit engagements to generate.
    pub audit_engagements: usize,
    /// Workpapers per audit engagement.
    pub workpapers_per_engagement: usize,
    /// Evidence items per workpaper.
    pub evidence_per_workpaper: usize,
    /// Risk assessments per engagement.
    pub risks_per_engagement: usize,
    /// Findings per engagement.
    pub findings_per_engagement: usize,
    /// Professional judgments per engagement.
    pub judgments_per_engagement: usize,
    /// Run phase 9 (banking KYC/AML); also requires `config.banking.enabled`.
    pub generate_banking: bool,
    /// Run phase 10 (graph export); `config.graph_export.enabled` also enables it.
    pub generate_graph_export: bool,
}
220
221impl Default for PhaseConfig {
222 fn default() -> Self {
223 Self {
224 generate_master_data: true,
225 generate_document_flows: true,
226 generate_ocpm_events: false, generate_journal_entries: true,
228 inject_anomalies: false,
229 inject_data_quality: false, validate_balances: true,
231 show_progress: true,
232 vendors_per_company: 50,
233 customers_per_company: 100,
234 materials_per_company: 200,
235 assets_per_company: 50,
236 employees_per_company: 100,
237 p2p_chains: 100,
238 o2c_chains: 100,
239 generate_audit: false, audit_engagements: 5,
241 workpapers_per_engagement: 20,
242 evidence_per_workpaper: 5,
243 risks_per_engagement: 15,
244 findings_per_engagement: 8,
245 judgments_per_engagement: 10,
246 generate_banking: false, generate_graph_export: false, }
249 }
250}
251
/// Master-data entities held by the orchestrator and copied into the generation result.
#[derive(Debug, Clone, Default)]
pub struct MasterDataSnapshot {
    /// Generated vendors.
    pub vendors: Vec<Vendor>,
    /// Generated customers.
    pub customers: Vec<Customer>,
    /// Generated materials.
    pub materials: Vec<Material>,
    /// Generated fixed assets.
    pub assets: Vec<FixedAsset>,
    /// Generated employees.
    pub employees: Vec<Employee>,
}
266
/// Summary of a multi-layer hypergraph export (returned by the hypergraph export step).
#[derive(Debug, Clone)]
pub struct HypergraphExportInfo {
    /// Number of nodes exported.
    pub node_count: usize,
    /// Number of pairwise edges exported.
    pub edge_count: usize,
    /// Number of hyperedges exported.
    pub hyperedge_count: usize,
    /// Where the hypergraph was written (presumably a directory — confirm with exporter).
    pub output_path: PathBuf,
}
279
/// Document-flow output: complete P2P/O2C chains plus flattened per-document lists.
#[derive(Debug, Clone, Default)]
pub struct DocumentFlowSnapshot {
    /// Procure-to-pay chains.
    pub p2p_chains: Vec<P2PDocumentChain>,
    /// Order-to-cash chains.
    pub o2c_chains: Vec<O2CDocumentChain>,
    /// Purchase orders across all P2P chains.
    pub purchase_orders: Vec<documents::PurchaseOrder>,
    /// Goods receipts across all P2P chains.
    pub goods_receipts: Vec<documents::GoodsReceipt>,
    /// Vendor invoices across all P2P chains.
    pub vendor_invoices: Vec<documents::VendorInvoice>,
    /// Sales orders across all O2C chains.
    pub sales_orders: Vec<documents::SalesOrder>,
    /// Deliveries across all O2C chains.
    pub deliveries: Vec<documents::Delivery>,
    /// Customer invoices across all O2C chains.
    pub customer_invoices: Vec<documents::CustomerInvoice>,
    /// Payments (NOTE(review): presumably both vendor and customer payments — confirm).
    pub payments: Vec<documents::Payment>,
}
302
/// Subledger records produced by linking document flows to AP/AR.
#[derive(Debug, Clone, Default)]
pub struct SubledgerSnapshot {
    /// Accounts-payable invoices.
    pub ap_invoices: Vec<APInvoice>,
    /// Accounts-receivable invoices.
    pub ar_invoices: Vec<ARInvoice>,
}
311
/// Object-centric process-mining output; `Default` (no log, zero counts) when the phase is skipped.
#[derive(Debug, Clone, Default)]
pub struct OcpmSnapshot {
    /// The generated event log, if any.
    pub event_log: Option<OcpmEventLog>,
    /// Number of events in the log.
    pub event_count: usize,
    /// Number of objects in the log.
    pub object_count: usize,
    /// Number of cases in the log.
    pub case_count: usize,
}
324
/// Audit-phase output; empty when audit generation is disabled.
#[derive(Debug, Clone, Default)]
pub struct AuditSnapshot {
    /// Audit engagements.
    pub engagements: Vec<AuditEngagement>,
    /// Workpapers across all engagements.
    pub workpapers: Vec<Workpaper>,
    /// Evidence items across all workpapers.
    pub evidence: Vec<AuditEvidence>,
    /// Risk assessments.
    pub risk_assessments: Vec<RiskAssessment>,
    /// Audit findings.
    pub findings: Vec<AuditFinding>,
    /// Professional judgments.
    pub judgments: Vec<ProfessionalJudgment>,
}
341
/// Banking KYC/AML output; empty when banking generation is disabled.
#[derive(Debug, Clone, Default)]
pub struct BankingSnapshot {
    /// Banking customers.
    pub customers: Vec<BankingCustomer>,
    /// Bank accounts.
    pub accounts: Vec<BankAccount>,
    /// Bank transactions.
    pub transactions: Vec<BankTransaction>,
    /// Number of transactions flagged as suspicious.
    pub suspicious_count: usize,
    /// Number of scenarios generated (NOTE(review): semantics defined by the banking orchestrator).
    pub scenario_count: usize,
}
356
/// Accounting-network graph export output; `Default` when skipped or when export failed.
#[derive(Debug, Clone, Default)]
pub struct GraphExportSnapshot {
    /// Whether any export happened.
    pub exported: bool,
    /// Number of graphs exported.
    pub graph_count: usize,
    /// Per-graph export details (presumably keyed by graph name — confirm).
    pub exports: HashMap<String, GraphExportInfo>,
}
367
/// Details of a single exported graph.
#[derive(Debug, Clone)]
pub struct GraphExportInfo {
    /// Graph name.
    pub name: String,
    /// Export format label (e.g. a PyG-style format; exact values set by the exporter).
    pub format: String,
    /// Location the graph was written to.
    pub output_path: PathBuf,
    /// Number of nodes in the graph.
    pub node_count: usize,
    /// Number of edges in the graph.
    pub edge_count: usize,
}
382
/// Ground-truth labels for injected anomalies; empty when injection is skipped.
#[derive(Debug, Clone, Default)]
pub struct AnomalyLabels {
    /// One label per injected anomaly; its length is reported as `anomalies_injected`.
    pub labels: Vec<LabeledAnomaly>,
    /// Optional aggregate summary of the injection run.
    pub summary: Option<AnomalySummary>,
    /// Anomaly counts per type (presumably keyed by type name — confirm).
    pub by_type: HashMap<String, usize>,
}
393
/// Outcome of journal-entry balance validation.
///
/// The `Default` value (all `false`/zero/empty) is returned when validation is skipped.
#[derive(Debug, Clone, Default)]
pub struct BalanceValidationResult {
    /// Whether validation ran (presumably `true` whenever the validator produced this value).
    pub validated: bool,
    /// Whether the validator considered the entries balanced.
    pub is_balanced: bool,
    /// Number of journal entries processed.
    pub entries_processed: u64,
    /// Sum of debit amounts seen.
    pub total_debits: rust_decimal::Decimal,
    /// Sum of credit amounts seen.
    pub total_credits: rust_decimal::Decimal,
    /// Number of distinct accounts tracked.
    pub accounts_tracked: usize,
    /// Number of distinct companies tracked.
    pub companies_tracked: usize,
    /// Validation errors reported by the balance tracker.
    pub validation_errors: Vec<ValidationError>,
    /// Whether any individual entry was unbalanced (NOTE(review): semantics set by the validator).
    pub has_unbalanced_entries: bool,
}
416
/// Everything produced by one `EnhancedOrchestrator::generate` run.
#[derive(Debug)]
pub struct EnhancedGenerationResult {
    /// The generated chart of accounts (a copy of the orchestrator's shared instance).
    pub chart_of_accounts: ChartOfAccounts,
    /// Master data at the end of the run.
    pub master_data: MasterDataSnapshot,
    /// P2P/O2C document flows.
    pub document_flows: DocumentFlowSnapshot,
    /// AP/AR subledger records linked from document flows.
    pub subledger: SubledgerSnapshot,
    /// OCPM event-log output.
    pub ocpm: OcpmSnapshot,
    /// Audit output.
    pub audit: AuditSnapshot,
    /// Banking KYC/AML output.
    pub banking: BankingSnapshot,
    /// Accounting-network graph export output.
    pub graph_export: GraphExportSnapshot,
    /// Journal entries after anomaly and data-quality injection.
    pub journal_entries: Vec<JournalEntry>,
    /// Labels for injected anomalies.
    pub anomaly_labels: AnomalyLabels,
    /// Balance validation, run after anomaly injection and before data-quality injection.
    pub balance_validation: BalanceValidationResult,
    /// Data-quality injection statistics.
    pub data_quality_stats: DataQualityStats,
    /// Aggregate counts for the run.
    pub statistics: EnhancedGenerationStatistics,
}
447
/// Serializable aggregate counts for a generation run; fields of skipped phases stay zero.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EnhancedGenerationStatistics {
    /// Journal entries (flow-derived plus standalone), counted at the end of the JE phase.
    pub total_entries: u64,
    /// Sum of line counts over those journal entries.
    pub total_line_items: u64,
    /// Accounts in the chart of accounts.
    pub accounts_count: usize,
    /// Companies in the configuration.
    pub companies_count: usize,
    /// Configured period length in months.
    pub period_months: u32,
    /// Vendors generated.
    pub vendor_count: usize,
    /// Customers generated.
    pub customer_count: usize,
    /// Materials generated.
    pub material_count: usize,
    /// Fixed assets generated.
    pub asset_count: usize,
    /// Employees generated.
    pub employee_count: usize,
    /// P2P chains generated.
    pub p2p_chain_count: usize,
    /// O2C chains generated.
    pub o2c_chain_count: usize,
    /// AP invoices linked into the subledger.
    pub ap_invoice_count: usize,
    /// AR invoices linked into the subledger.
    pub ar_invoice_count: usize,
    /// OCPM events generated.
    pub ocpm_event_count: usize,
    /// OCPM objects generated.
    pub ocpm_object_count: usize,
    /// OCPM cases generated.
    pub ocpm_case_count: usize,
    /// Audit engagements generated.
    pub audit_engagement_count: usize,
    /// Audit workpapers generated.
    pub audit_workpaper_count: usize,
    /// Audit evidence items generated.
    pub audit_evidence_count: usize,
    /// Audit risk assessments generated.
    pub audit_risk_count: usize,
    /// Audit findings generated.
    pub audit_finding_count: usize,
    /// Professional judgments generated.
    pub audit_judgment_count: usize,
    /// Anomaly labels produced by anomaly injection.
    pub anomalies_injected: usize,
    /// Records with data-quality issues after injection.
    pub data_quality_issues: usize,
    /// Banking customers generated.
    pub banking_customer_count: usize,
    /// Bank accounts generated.
    pub banking_account_count: usize,
    /// Bank transactions generated.
    pub banking_transaction_count: usize,
    /// Bank transactions flagged as suspicious.
    pub banking_suspicious_count: usize,
    /// Graphs exported (presumably populated by the graph export step — confirm).
    pub graph_export_count: usize,
    /// Nodes across exported graphs (populated during graph export).
    pub graph_node_count: usize,
    /// Edges across exported graphs (populated during graph export).
    pub graph_edge_count: usize,
}
498
/// Drives the multi-phase synthetic-data workflow (CoA, master data, flows, JEs, extras).
pub struct EnhancedOrchestrator {
    /// Validated generator configuration.
    config: GeneratorConfig,
    /// Which phases to run and how many records to produce.
    phase_config: PhaseConfig,
    /// Chart of accounts, cached once generated.
    coa: Option<Arc<ChartOfAccounts>>,
    /// Master data accumulated during the master-data phase.
    master_data: MasterDataSnapshot,
    /// Base RNG seed (from config, or random when unset).
    seed: u64,
    /// Progress-bar container; `Some` only when progress display was requested.
    multi_progress: Option<MultiProgress>,
    /// Memory/disk guard consulted between phases.
    resource_guard: ResourceGuard,
    /// Output location, if set via `with_output_path`; also feeds the disk check.
    output_path: Option<PathBuf>,
    /// Copula generators synthesized from a fingerprint (empty otherwise).
    copula_generators: Vec<CopulaGeneratorSpec>,
}
514
515impl EnhancedOrchestrator {
516 pub fn new(config: GeneratorConfig, phase_config: PhaseConfig) -> SynthResult<Self> {
518 datasynth_config::validate_config(&config)?;
519
520 let seed = config.global.seed.unwrap_or_else(rand::random);
521
522 let resource_guard = Self::build_resource_guard(&config, None);
524
525 Ok(Self {
526 config,
527 phase_config,
528 coa: None,
529 master_data: MasterDataSnapshot::default(),
530 seed,
531 multi_progress: None,
532 resource_guard,
533 output_path: None,
534 copula_generators: Vec::new(),
535 })
536 }
537
538 pub fn with_defaults(config: GeneratorConfig) -> SynthResult<Self> {
540 Self::new(config, PhaseConfig::default())
541 }
542
543 pub fn with_progress(mut self, show: bool) -> Self {
545 self.phase_config.show_progress = show;
546 if show {
547 self.multi_progress = Some(MultiProgress::new());
548 }
549 self
550 }
551
552 pub fn with_output_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
554 let path = path.into();
555 self.output_path = Some(path.clone());
556 self.resource_guard = Self::build_resource_guard(&self.config, Some(path));
558 self
559 }
560
561 pub fn has_copulas(&self) -> bool {
566 !self.copula_generators.is_empty()
567 }
568
569 pub fn copulas(&self) -> &[CopulaGeneratorSpec] {
575 &self.copula_generators
576 }
577
578 pub fn copulas_mut(&mut self) -> &mut [CopulaGeneratorSpec] {
582 &mut self.copula_generators
583 }
584
585 pub fn sample_from_copula(&mut self, copula_name: &str) -> Option<Vec<f64>> {
589 self.copula_generators
590 .iter_mut()
591 .find(|c| c.name == copula_name)
592 .map(|c| c.generator.sample())
593 }
594
595 pub fn from_fingerprint(
618 fingerprint_path: &std::path::Path,
619 phase_config: PhaseConfig,
620 scale: f64,
621 ) -> SynthResult<Self> {
622 info!("Loading fingerprint from: {}", fingerprint_path.display());
623
624 let reader = FingerprintReader::new();
626 let fingerprint = reader
627 .read_from_file(fingerprint_path)
628 .map_err(|e| SynthError::config(format!("Failed to read fingerprint: {}", e)))?;
629
630 Self::from_fingerprint_data(fingerprint, phase_config, scale)
631 }
632
633 pub fn from_fingerprint_data(
640 fingerprint: Fingerprint,
641 phase_config: PhaseConfig,
642 scale: f64,
643 ) -> SynthResult<Self> {
644 info!(
645 "Synthesizing config from fingerprint (version: {}, tables: {})",
646 fingerprint.manifest.version,
647 fingerprint.schema.tables.len()
648 );
649
650 let seed: u64 = rand::random();
652
653 let options = SynthesisOptions {
655 scale,
656 seed: Some(seed),
657 preserve_correlations: true,
658 inject_anomalies: true,
659 };
660 let synthesizer = ConfigSynthesizer::with_options(options);
661
662 let synthesis_result = synthesizer
664 .synthesize_full(&fingerprint, seed)
665 .map_err(|e| {
666 SynthError::config(format!(
667 "Failed to synthesize config from fingerprint: {}",
668 e
669 ))
670 })?;
671
672 let mut config = if let Some(ref industry) = fingerprint.manifest.source.industry {
674 Self::base_config_for_industry(industry)
675 } else {
676 Self::base_config_for_industry("manufacturing")
677 };
678
679 config = Self::apply_config_patch(config, &synthesis_result.config_patch);
681
682 info!(
684 "Config synthesized: {} tables, scale={:.2}, copula generators: {}",
685 fingerprint.schema.tables.len(),
686 scale,
687 synthesis_result.copula_generators.len()
688 );
689
690 if !synthesis_result.copula_generators.is_empty() {
691 for spec in &synthesis_result.copula_generators {
692 info!(
693 " Copula '{}' for table '{}': {} columns",
694 spec.name,
695 spec.table,
696 spec.columns.len()
697 );
698 }
699 }
700
701 let mut orchestrator = Self::new(config, phase_config)?;
703
704 orchestrator.copula_generators = synthesis_result.copula_generators;
706
707 Ok(orchestrator)
708 }
709
710 fn base_config_for_industry(industry: &str) -> GeneratorConfig {
712 use datasynth_config::presets::create_preset;
713 use datasynth_config::TransactionVolume;
714 use datasynth_core::models::{CoAComplexity, IndustrySector};
715
716 let sector = match industry.to_lowercase().as_str() {
717 "manufacturing" => IndustrySector::Manufacturing,
718 "retail" => IndustrySector::Retail,
719 "financial" | "financial_services" => IndustrySector::FinancialServices,
720 "healthcare" => IndustrySector::Healthcare,
721 "technology" | "tech" => IndustrySector::Technology,
722 _ => IndustrySector::Manufacturing,
723 };
724
725 create_preset(
727 sector,
728 1, 12, CoAComplexity::Medium,
731 TransactionVolume::TenK,
732 )
733 }
734
735 fn apply_config_patch(
737 mut config: GeneratorConfig,
738 patch: &datasynth_fingerprint::synthesis::ConfigPatch,
739 ) -> GeneratorConfig {
740 use datasynth_fingerprint::synthesis::ConfigValue;
741
742 for (key, value) in patch.values() {
743 match (key.as_str(), value) {
744 ("transactions.count", ConfigValue::Integer(n)) => {
747 info!(
748 "Fingerprint suggests {} transactions (apply via company volumes)",
749 n
750 );
751 }
752 ("global.period_months", ConfigValue::Integer(n)) => {
753 config.global.period_months = *n as u32;
754 }
755 ("global.start_date", ConfigValue::String(s)) => {
756 config.global.start_date = s.clone();
757 }
758 ("global.seed", ConfigValue::Integer(n)) => {
759 config.global.seed = Some(*n as u64);
760 }
761 ("fraud.enabled", ConfigValue::Bool(b)) => {
762 config.fraud.enabled = *b;
763 }
764 ("fraud.fraud_rate", ConfigValue::Float(f)) => {
765 config.fraud.fraud_rate = *f;
766 }
767 ("data_quality.enabled", ConfigValue::Bool(b)) => {
768 config.data_quality.enabled = *b;
769 }
770 ("anomaly_injection.enabled", ConfigValue::Bool(b)) => {
772 config.fraud.enabled = *b;
773 }
774 ("anomaly_injection.overall_rate", ConfigValue::Float(f)) => {
775 config.fraud.fraud_rate = *f;
776 }
777 _ => {
778 debug!("Ignoring unknown config patch key: {}", key);
779 }
780 }
781 }
782
783 config
784 }
785
786 fn build_resource_guard(
788 config: &GeneratorConfig,
789 output_path: Option<PathBuf>,
790 ) -> ResourceGuard {
791 let mut builder = ResourceGuardBuilder::new();
792
793 if config.global.memory_limit_mb > 0 {
795 builder = builder.memory_limit(config.global.memory_limit_mb);
796 }
797
798 if let Some(path) = output_path {
800 builder = builder.output_path(path).min_free_disk(100); }
802
803 builder = builder.conservative();
805
806 builder.build()
807 }
808
809 fn check_resources(&self) -> SynthResult<DegradationLevel> {
814 self.resource_guard.check()
815 }
816
817 fn check_resources_with_log(&self, phase: &str) -> SynthResult<DegradationLevel> {
819 let level = self.resource_guard.check()?;
820
821 if level != DegradationLevel::Normal {
822 warn!(
823 "Resource degradation at {}: level={}, memory={}MB, disk={}MB",
824 phase,
825 level,
826 self.resource_guard.current_memory_mb(),
827 self.resource_guard.available_disk_mb()
828 );
829 }
830
831 Ok(level)
832 }
833
834 fn get_degradation_actions(&self) -> DegradationActions {
836 self.resource_guard.get_actions()
837 }
838
839 fn check_memory_limit(&self) -> SynthResult<()> {
841 self.check_resources()?;
842 Ok(())
843 }
844
    /// Runs every enabled phase in order and collects the results.
    ///
    /// Order:
    /// 1. chart of accounts
    /// 2. master data
    /// 3. document flows (with subledger linking)
    /// 4. OCPM events
    /// 5. journal entries
    /// 6. anomaly injection
    /// 7. balance validation
    /// 8. data-quality injection
    /// 9. audit
    /// 10. banking
    /// 11. graph export
    /// 12. hypergraph export
    ///
    /// # Errors
    /// Returns a resource error when the initial check reports `DegradationLevel::Emergency`,
    /// and propagates errors from the phases. Graph and hypergraph export failures are logged
    /// by their phases instead of being returned.
    pub fn generate(&mut self) -> SynthResult<EnhancedGenerationResult> {
        info!("Starting enhanced generation workflow");
        info!(
            "Config: industry={:?}, period_months={}, companies={}",
            self.config.global.industry,
            self.config.global.period_months,
            self.config.companies.len()
        );

        // Refuse to start when resources are already exhausted.
        let initial_level = self.check_resources_with_log("initial")?;
        if initial_level == DegradationLevel::Emergency {
            return Err(SynthError::resource(
                "Insufficient resources to start generation",
            ));
        }

        let mut stats = EnhancedGenerationStatistics {
            companies_count: self.config.companies.len(),
            period_months: self.config.global.period_months,
            ..Default::default()
        };

        let coa = self.phase_chart_of_accounts(&mut stats)?;

        self.phase_master_data(&mut stats)?;

        let (document_flows, subledger) = self.phase_document_flows(&mut stats)?;

        let ocpm = self.phase_ocpm_events(&document_flows, &mut stats)?;

        let mut entries = self.phase_journal_entries(&coa, &document_flows, &mut stats)?;

        // Degradation actions are read once, after entries exist, and shared by the
        // anomaly and data-quality phases.
        let actions = self.get_degradation_actions();

        let anomaly_labels = self.phase_anomaly_injection(&mut entries, &actions, &mut stats)?;

        // Balances are validated after anomaly injection but before data-quality injection.
        let balance_validation = self.phase_balance_validation(&entries)?;

        let data_quality_stats =
            self.phase_data_quality_injection(&mut entries, &actions, &mut stats)?;

        let audit = self.phase_audit_data(&entries, &mut stats)?;

        let banking = self.phase_banking_data(&mut stats)?;

        let graph_export = self.phase_graph_export(&entries, &coa, &mut stats)?;

        // The hypergraph export returns no snapshot.
        self.phase_hypergraph_export(&coa, &entries, &document_flows, &mut stats)?;

        let resource_stats = self.resource_guard.stats();
        info!(
            "Generation workflow complete. Resource stats: memory_peak={}MB, disk_written={}bytes, degradation_level={}",
            resource_stats.memory.peak_resident_bytes / (1024 * 1024),
            resource_stats.disk.estimated_bytes_written,
            resource_stats.degradation_level
        );

        Ok(EnhancedGenerationResult {
            // Deep copy: the orchestrator keeps its own `Arc` in `self.coa`.
            chart_of_accounts: (*coa).clone(),
            master_data: self.master_data.clone(),
            document_flows,
            subledger,
            ocpm,
            audit,
            banking,
            graph_export,
            journal_entries: entries,
            anomaly_labels,
            balance_validation,
            data_quality_stats,
            statistics: stats,
        })
    }
934
935 fn phase_chart_of_accounts(
941 &mut self,
942 stats: &mut EnhancedGenerationStatistics,
943 ) -> SynthResult<Arc<ChartOfAccounts>> {
944 info!("Phase 1: Generating Chart of Accounts");
945 let coa = self.generate_coa()?;
946 stats.accounts_count = coa.account_count();
947 info!(
948 "Chart of Accounts generated: {} accounts",
949 stats.accounts_count
950 );
951 self.check_resources_with_log("post-coa")?;
952 Ok(coa)
953 }
954
955 fn phase_master_data(&mut self, stats: &mut EnhancedGenerationStatistics) -> SynthResult<()> {
957 if self.phase_config.generate_master_data {
958 info!("Phase 2: Generating Master Data");
959 self.generate_master_data()?;
960 stats.vendor_count = self.master_data.vendors.len();
961 stats.customer_count = self.master_data.customers.len();
962 stats.material_count = self.master_data.materials.len();
963 stats.asset_count = self.master_data.assets.len();
964 stats.employee_count = self.master_data.employees.len();
965 info!(
966 "Master data generated: {} vendors, {} customers, {} materials, {} assets, {} employees",
967 stats.vendor_count, stats.customer_count, stats.material_count,
968 stats.asset_count, stats.employee_count
969 );
970 self.check_resources_with_log("post-master-data")?;
971 } else {
972 debug!("Phase 2: Skipped (master data generation disabled)");
973 }
974 Ok(())
975 }
976
977 fn phase_document_flows(
979 &mut self,
980 stats: &mut EnhancedGenerationStatistics,
981 ) -> SynthResult<(DocumentFlowSnapshot, SubledgerSnapshot)> {
982 let mut document_flows = DocumentFlowSnapshot::default();
983 let mut subledger = SubledgerSnapshot::default();
984
985 if self.phase_config.generate_document_flows && !self.master_data.vendors.is_empty() {
986 info!("Phase 3: Generating Document Flows");
987 self.generate_document_flows(&mut document_flows)?;
988 stats.p2p_chain_count = document_flows.p2p_chains.len();
989 stats.o2c_chain_count = document_flows.o2c_chains.len();
990 info!(
991 "Document flows generated: {} P2P chains, {} O2C chains",
992 stats.p2p_chain_count, stats.o2c_chain_count
993 );
994
995 debug!("Phase 3b: Linking document flows to subledgers");
997 subledger = self.link_document_flows_to_subledgers(&document_flows)?;
998 stats.ap_invoice_count = subledger.ap_invoices.len();
999 stats.ar_invoice_count = subledger.ar_invoices.len();
1000 debug!(
1001 "Subledgers linked: {} AP invoices, {} AR invoices",
1002 stats.ap_invoice_count, stats.ar_invoice_count
1003 );
1004
1005 self.check_resources_with_log("post-document-flows")?;
1006 } else {
1007 debug!("Phase 3: Skipped (document flow generation disabled or no master data)");
1008 }
1009
1010 Ok((document_flows, subledger))
1011 }
1012
1013 fn phase_ocpm_events(
1015 &mut self,
1016 document_flows: &DocumentFlowSnapshot,
1017 stats: &mut EnhancedGenerationStatistics,
1018 ) -> SynthResult<OcpmSnapshot> {
1019 if self.phase_config.generate_ocpm_events && !document_flows.p2p_chains.is_empty() {
1020 info!("Phase 3c: Generating OCPM Events");
1021 let ocpm_snapshot = self.generate_ocpm_events(document_flows)?;
1022 stats.ocpm_event_count = ocpm_snapshot.event_count;
1023 stats.ocpm_object_count = ocpm_snapshot.object_count;
1024 stats.ocpm_case_count = ocpm_snapshot.case_count;
1025 info!(
1026 "OCPM events generated: {} events, {} objects, {} cases",
1027 stats.ocpm_event_count, stats.ocpm_object_count, stats.ocpm_case_count
1028 );
1029 self.check_resources_with_log("post-ocpm")?;
1030 Ok(ocpm_snapshot)
1031 } else {
1032 debug!("Phase 3c: Skipped (OCPM generation disabled or no document flows)");
1033 Ok(OcpmSnapshot::default())
1034 }
1035 }
1036
1037 fn phase_journal_entries(
1039 &mut self,
1040 coa: &Arc<ChartOfAccounts>,
1041 document_flows: &DocumentFlowSnapshot,
1042 stats: &mut EnhancedGenerationStatistics,
1043 ) -> SynthResult<Vec<JournalEntry>> {
1044 let mut entries = Vec::new();
1045
1046 if self.phase_config.generate_document_flows && !document_flows.p2p_chains.is_empty() {
1048 debug!("Phase 4a: Generating JEs from document flows");
1049 let flow_entries = self.generate_jes_from_document_flows(document_flows)?;
1050 debug!("Generated {} JEs from document flows", flow_entries.len());
1051 entries.extend(flow_entries);
1052 }
1053
1054 if self.phase_config.generate_journal_entries {
1056 info!("Phase 4: Generating Journal Entries");
1057 let je_entries = self.generate_journal_entries(coa)?;
1058 info!("Generated {} standalone journal entries", je_entries.len());
1059 entries.extend(je_entries);
1060 } else {
1061 debug!("Phase 4: Skipped (journal entry generation disabled)");
1062 }
1063
1064 if !entries.is_empty() {
1065 stats.total_entries = entries.len() as u64;
1066 stats.total_line_items = entries.iter().map(|e| e.line_count() as u64).sum();
1067 info!(
1068 "Total entries: {}, total line items: {}",
1069 stats.total_entries, stats.total_line_items
1070 );
1071 self.check_resources_with_log("post-journal-entries")?;
1072 }
1073
1074 Ok(entries)
1075 }
1076
1077 fn phase_anomaly_injection(
1079 &mut self,
1080 entries: &mut [JournalEntry],
1081 actions: &DegradationActions,
1082 stats: &mut EnhancedGenerationStatistics,
1083 ) -> SynthResult<AnomalyLabels> {
1084 if self.phase_config.inject_anomalies
1085 && !entries.is_empty()
1086 && !actions.skip_anomaly_injection
1087 {
1088 info!("Phase 5: Injecting Anomalies");
1089 let result = self.inject_anomalies(entries)?;
1090 stats.anomalies_injected = result.labels.len();
1091 info!("Injected {} anomalies", stats.anomalies_injected);
1092 self.check_resources_with_log("post-anomaly-injection")?;
1093 Ok(result)
1094 } else if actions.skip_anomaly_injection {
1095 warn!("Phase 5: Skipped due to resource degradation");
1096 Ok(AnomalyLabels::default())
1097 } else {
1098 debug!("Phase 5: Skipped (anomaly injection disabled or no entries)");
1099 Ok(AnomalyLabels::default())
1100 }
1101 }
1102
1103 fn phase_balance_validation(
1105 &mut self,
1106 entries: &[JournalEntry],
1107 ) -> SynthResult<BalanceValidationResult> {
1108 if self.phase_config.validate_balances && !entries.is_empty() {
1109 debug!("Phase 6: Validating Balances");
1110 let balance_validation = self.validate_journal_entries(entries)?;
1111 if balance_validation.is_balanced {
1112 debug!("Balance validation passed");
1113 } else {
1114 warn!(
1115 "Balance validation found {} errors",
1116 balance_validation.validation_errors.len()
1117 );
1118 }
1119 Ok(balance_validation)
1120 } else {
1121 Ok(BalanceValidationResult::default())
1122 }
1123 }
1124
1125 fn phase_data_quality_injection(
1127 &mut self,
1128 entries: &mut [JournalEntry],
1129 actions: &DegradationActions,
1130 stats: &mut EnhancedGenerationStatistics,
1131 ) -> SynthResult<DataQualityStats> {
1132 if self.phase_config.inject_data_quality
1133 && !entries.is_empty()
1134 && !actions.skip_data_quality
1135 {
1136 info!("Phase 7: Injecting Data Quality Variations");
1137 let dq_stats = self.inject_data_quality(entries)?;
1138 stats.data_quality_issues = dq_stats.records_with_issues;
1139 info!("Injected {} data quality issues", stats.data_quality_issues);
1140 self.check_resources_with_log("post-data-quality")?;
1141 Ok(dq_stats)
1142 } else if actions.skip_data_quality {
1143 warn!("Phase 7: Skipped due to resource degradation");
1144 Ok(DataQualityStats::default())
1145 } else {
1146 debug!("Phase 7: Skipped (data quality injection disabled or no entries)");
1147 Ok(DataQualityStats::default())
1148 }
1149 }
1150
1151 fn phase_audit_data(
1153 &mut self,
1154 entries: &[JournalEntry],
1155 stats: &mut EnhancedGenerationStatistics,
1156 ) -> SynthResult<AuditSnapshot> {
1157 if self.phase_config.generate_audit {
1158 info!("Phase 8: Generating Audit Data");
1159 let audit_snapshot = self.generate_audit_data(entries)?;
1160 stats.audit_engagement_count = audit_snapshot.engagements.len();
1161 stats.audit_workpaper_count = audit_snapshot.workpapers.len();
1162 stats.audit_evidence_count = audit_snapshot.evidence.len();
1163 stats.audit_risk_count = audit_snapshot.risk_assessments.len();
1164 stats.audit_finding_count = audit_snapshot.findings.len();
1165 stats.audit_judgment_count = audit_snapshot.judgments.len();
1166 info!(
1167 "Audit data generated: {} engagements, {} workpapers, {} evidence, {} risks, {} findings, {} judgments",
1168 stats.audit_engagement_count, stats.audit_workpaper_count,
1169 stats.audit_evidence_count, stats.audit_risk_count,
1170 stats.audit_finding_count, stats.audit_judgment_count
1171 );
1172 self.check_resources_with_log("post-audit")?;
1173 Ok(audit_snapshot)
1174 } else {
1175 debug!("Phase 8: Skipped (audit generation disabled)");
1176 Ok(AuditSnapshot::default())
1177 }
1178 }
1179
1180 fn phase_banking_data(
1182 &mut self,
1183 stats: &mut EnhancedGenerationStatistics,
1184 ) -> SynthResult<BankingSnapshot> {
1185 if self.phase_config.generate_banking && self.config.banking.enabled {
1186 info!("Phase 9: Generating Banking KYC/AML Data");
1187 let banking_snapshot = self.generate_banking_data()?;
1188 stats.banking_customer_count = banking_snapshot.customers.len();
1189 stats.banking_account_count = banking_snapshot.accounts.len();
1190 stats.banking_transaction_count = banking_snapshot.transactions.len();
1191 stats.banking_suspicious_count = banking_snapshot.suspicious_count;
1192 info!(
1193 "Banking data generated: {} customers, {} accounts, {} transactions ({} suspicious)",
1194 stats.banking_customer_count, stats.banking_account_count,
1195 stats.banking_transaction_count, stats.banking_suspicious_count
1196 );
1197 self.check_resources_with_log("post-banking")?;
1198 Ok(banking_snapshot)
1199 } else {
1200 debug!("Phase 9: Skipped (banking generation disabled)");
1201 Ok(BankingSnapshot::default())
1202 }
1203 }
1204
1205 fn phase_graph_export(
1207 &mut self,
1208 entries: &[JournalEntry],
1209 coa: &Arc<ChartOfAccounts>,
1210 stats: &mut EnhancedGenerationStatistics,
1211 ) -> SynthResult<GraphExportSnapshot> {
1212 if (self.phase_config.generate_graph_export || self.config.graph_export.enabled)
1213 && !entries.is_empty()
1214 {
1215 info!("Phase 10: Exporting Accounting Network Graphs");
1216 match self.export_graphs(entries, coa, stats) {
1217 Ok(snapshot) => {
1218 info!(
1219 "Graph export complete: {} graphs ({} nodes, {} edges)",
1220 snapshot.graph_count, stats.graph_node_count, stats.graph_edge_count
1221 );
1222 Ok(snapshot)
1223 }
1224 Err(e) => {
1225 warn!("Phase 10: Graph export failed: {}", e);
1226 Ok(GraphExportSnapshot::default())
1227 }
1228 }
1229 } else {
1230 debug!("Phase 10: Skipped (graph export disabled or no entries)");
1231 Ok(GraphExportSnapshot::default())
1232 }
1233 }
1234
1235 fn phase_hypergraph_export(
1237 &self,
1238 coa: &Arc<ChartOfAccounts>,
1239 entries: &[JournalEntry],
1240 document_flows: &DocumentFlowSnapshot,
1241 stats: &mut EnhancedGenerationStatistics,
1242 ) -> SynthResult<()> {
1243 if self.config.graph_export.hypergraph.enabled && !entries.is_empty() {
1244 info!("Phase 10b: Exporting Multi-Layer Hypergraph");
1245 match self.export_hypergraph(coa, entries, document_flows, stats) {
1246 Ok(info) => {
1247 info!(
1248 "Hypergraph export complete: {} nodes, {} edges, {} hyperedges",
1249 info.node_count, info.edge_count, info.hyperedge_count
1250 );
1251 }
1252 Err(e) => {
1253 warn!("Phase 10b: Hypergraph export failed: {}", e);
1254 }
1255 }
1256 } else {
1257 debug!("Phase 10b: Skipped (hypergraph export disabled or no entries)");
1258 }
1259 Ok(())
1260 }
1261
1262 fn generate_coa(&mut self) -> SynthResult<Arc<ChartOfAccounts>> {
1264 let pb = self.create_progress_bar(1, "Generating Chart of Accounts");
1265
1266 let mut gen = ChartOfAccountsGenerator::new(
1267 self.config.chart_of_accounts.complexity,
1268 self.config.global.industry,
1269 self.seed,
1270 );
1271
1272 let coa = Arc::new(gen.generate());
1273 self.coa = Some(Arc::clone(&coa));
1274
1275 if let Some(pb) = pb {
1276 pb.finish_with_message("Chart of Accounts complete");
1277 }
1278
1279 Ok(coa)
1280 }
1281
1282 fn generate_master_data(&mut self) -> SynthResult<()> {
1284 let start_date = NaiveDate::parse_from_str(&self.config.global.start_date, "%Y-%m-%d")
1285 .map_err(|e| SynthError::config(format!("Invalid start_date: {}", e)))?;
1286 let end_date = start_date + chrono::Months::new(self.config.global.period_months);
1287
1288 let total = self.config.companies.len() as u64 * 5; let pb = self.create_progress_bar(total, "Generating Master Data");
1290
1291 for (i, company) in self.config.companies.iter().enumerate() {
1292 let company_seed = self.seed.wrapping_add(i as u64 * 1000);
1293
1294 let mut vendor_gen = VendorGenerator::new(company_seed);
1296 let vendor_pool = vendor_gen.generate_vendor_pool(
1297 self.phase_config.vendors_per_company,
1298 &company.code,
1299 start_date,
1300 );
1301 self.master_data.vendors.extend(vendor_pool.vendors);
1302 if let Some(pb) = &pb {
1303 pb.inc(1);
1304 }
1305
1306 let mut customer_gen = CustomerGenerator::new(company_seed + 100);
1308 let customer_pool = customer_gen.generate_customer_pool(
1309 self.phase_config.customers_per_company,
1310 &company.code,
1311 start_date,
1312 );
1313 self.master_data.customers.extend(customer_pool.customers);
1314 if let Some(pb) = &pb {
1315 pb.inc(1);
1316 }
1317
1318 let mut material_gen = MaterialGenerator::new(company_seed + 200);
1320 let material_pool = material_gen.generate_material_pool(
1321 self.phase_config.materials_per_company,
1322 &company.code,
1323 start_date,
1324 );
1325 self.master_data.materials.extend(material_pool.materials);
1326 if let Some(pb) = &pb {
1327 pb.inc(1);
1328 }
1329
1330 let mut asset_gen = AssetGenerator::new(company_seed + 300);
1332 let asset_pool = asset_gen.generate_asset_pool(
1333 self.phase_config.assets_per_company,
1334 &company.code,
1335 (start_date, end_date),
1336 );
1337 self.master_data.assets.extend(asset_pool.assets);
1338 if let Some(pb) = &pb {
1339 pb.inc(1);
1340 }
1341
1342 let mut employee_gen = EmployeeGenerator::new(company_seed + 400);
1344 let employee_pool =
1345 employee_gen.generate_company_pool(&company.code, (start_date, end_date));
1346 self.master_data.employees.extend(employee_pool.employees);
1347 if let Some(pb) = &pb {
1348 pb.inc(1);
1349 }
1350 }
1351
1352 if let Some(pb) = pb {
1353 pb.finish_with_message("Master data generation complete");
1354 }
1355
1356 Ok(())
1357 }
1358
1359 fn generate_document_flows(&mut self, flows: &mut DocumentFlowSnapshot) -> SynthResult<()> {
1361 let start_date = NaiveDate::parse_from_str(&self.config.global.start_date, "%Y-%m-%d")
1362 .map_err(|e| SynthError::config(format!("Invalid start_date: {}", e)))?;
1363
1364 let p2p_count = self
1366 .phase_config
1367 .p2p_chains
1368 .min(self.master_data.vendors.len() * 2);
1369 let pb = self.create_progress_bar(p2p_count as u64, "Generating P2P Document Flows");
1370
1371 let p2p_config = convert_p2p_config(&self.config.document_flows.p2p);
1373 let mut p2p_gen = P2PGenerator::with_config(self.seed + 1000, p2p_config);
1374
1375 for i in 0..p2p_count {
1376 let vendor = &self.master_data.vendors[i % self.master_data.vendors.len()];
1377 let materials: Vec<&Material> = self
1378 .master_data
1379 .materials
1380 .iter()
1381 .skip(i % self.master_data.materials.len().max(1))
1382 .take(2.min(self.master_data.materials.len()))
1383 .collect();
1384
1385 if materials.is_empty() {
1386 continue;
1387 }
1388
1389 let company = &self.config.companies[i % self.config.companies.len()];
1390 let po_date = start_date + chrono::Duration::days((i * 3) as i64 % 365);
1391 let fiscal_period = po_date.month() as u8;
1392 let created_by = self
1393 .master_data
1394 .employees
1395 .first()
1396 .map(|e| e.user_id.as_str())
1397 .unwrap_or("SYSTEM");
1398
1399 let chain = p2p_gen.generate_chain(
1400 &company.code,
1401 vendor,
1402 &materials,
1403 po_date,
1404 start_date.year() as u16,
1405 fiscal_period,
1406 created_by,
1407 );
1408
1409 flows.purchase_orders.push(chain.purchase_order.clone());
1411 flows.goods_receipts.extend(chain.goods_receipts.clone());
1412 if let Some(vi) = &chain.vendor_invoice {
1413 flows.vendor_invoices.push(vi.clone());
1414 }
1415 if let Some(payment) = &chain.payment {
1416 flows.payments.push(payment.clone());
1417 }
1418 flows.p2p_chains.push(chain);
1419
1420 if let Some(pb) = &pb {
1421 pb.inc(1);
1422 }
1423 }
1424
1425 if let Some(pb) = pb {
1426 pb.finish_with_message("P2P document flows complete");
1427 }
1428
1429 let o2c_count = self
1431 .phase_config
1432 .o2c_chains
1433 .min(self.master_data.customers.len() * 2);
1434 let pb = self.create_progress_bar(o2c_count as u64, "Generating O2C Document Flows");
1435
1436 let o2c_config = convert_o2c_config(&self.config.document_flows.o2c);
1438 let mut o2c_gen = O2CGenerator::with_config(self.seed + 2000, o2c_config);
1439
1440 for i in 0..o2c_count {
1441 let customer = &self.master_data.customers[i % self.master_data.customers.len()];
1442 let materials: Vec<&Material> = self
1443 .master_data
1444 .materials
1445 .iter()
1446 .skip(i % self.master_data.materials.len().max(1))
1447 .take(2.min(self.master_data.materials.len()))
1448 .collect();
1449
1450 if materials.is_empty() {
1451 continue;
1452 }
1453
1454 let company = &self.config.companies[i % self.config.companies.len()];
1455 let so_date = start_date + chrono::Duration::days((i * 2) as i64 % 365);
1456 let fiscal_period = so_date.month() as u8;
1457 let created_by = self
1458 .master_data
1459 .employees
1460 .first()
1461 .map(|e| e.user_id.as_str())
1462 .unwrap_or("SYSTEM");
1463
1464 let chain = o2c_gen.generate_chain(
1465 &company.code,
1466 customer,
1467 &materials,
1468 so_date,
1469 start_date.year() as u16,
1470 fiscal_period,
1471 created_by,
1472 );
1473
1474 flows.sales_orders.push(chain.sales_order.clone());
1476 flows.deliveries.extend(chain.deliveries.clone());
1477 if let Some(ci) = &chain.customer_invoice {
1478 flows.customer_invoices.push(ci.clone());
1479 }
1480 if let Some(receipt) = &chain.customer_receipt {
1481 flows.payments.push(receipt.clone());
1482 }
1483 flows.o2c_chains.push(chain);
1484
1485 if let Some(pb) = &pb {
1486 pb.inc(1);
1487 }
1488 }
1489
1490 if let Some(pb) = pb {
1491 pb.finish_with_message("O2C document flows complete");
1492 }
1493
1494 Ok(())
1495 }
1496
1497 fn generate_journal_entries(
1499 &mut self,
1500 coa: &Arc<ChartOfAccounts>,
1501 ) -> SynthResult<Vec<JournalEntry>> {
1502 let total = self.calculate_total_transactions();
1503 let pb = self.create_progress_bar(total, "Generating Journal Entries");
1504
1505 let start_date = NaiveDate::parse_from_str(&self.config.global.start_date, "%Y-%m-%d")
1506 .map_err(|e| SynthError::config(format!("Invalid start_date: {}", e)))?;
1507 let end_date = start_date + chrono::Months::new(self.config.global.period_months);
1508
1509 let company_codes: Vec<String> = self
1510 .config
1511 .companies
1512 .iter()
1513 .map(|c| c.code.clone())
1514 .collect();
1515
1516 let generator = JournalEntryGenerator::new_with_params(
1517 self.config.transactions.clone(),
1518 Arc::clone(coa),
1519 company_codes,
1520 start_date,
1521 end_date,
1522 self.seed,
1523 );
1524
1525 let mut generator = generator
1529 .with_master_data(
1530 &self.master_data.vendors,
1531 &self.master_data.customers,
1532 &self.master_data.materials,
1533 )
1534 .with_persona_errors(true)
1535 .with_fraud_config(self.config.fraud.clone());
1536
1537 if self.config.temporal.enabled {
1539 let drift_config = self.config.temporal.to_core_config();
1540 generator = generator.with_drift_config(drift_config, self.seed + 100);
1541 }
1542
1543 let mut entries = Vec::with_capacity(total as usize);
1544
1545 self.check_memory_limit()?;
1547
1548 const MEMORY_CHECK_INTERVAL: u64 = 1000;
1550
1551 for i in 0..total {
1552 let entry = generator.generate();
1553 entries.push(entry);
1554 if let Some(pb) = &pb {
1555 pb.inc(1);
1556 }
1557
1558 if (i + 1) % MEMORY_CHECK_INTERVAL == 0 {
1560 self.check_memory_limit()?;
1561 }
1562 }
1563
1564 if let Some(pb) = pb {
1565 pb.finish_with_message("Journal entries complete");
1566 }
1567
1568 Ok(entries)
1569 }
1570
1571 fn generate_jes_from_document_flows(
1576 &mut self,
1577 flows: &DocumentFlowSnapshot,
1578 ) -> SynthResult<Vec<JournalEntry>> {
1579 let total_chains = flows.p2p_chains.len() + flows.o2c_chains.len();
1580 let pb = self.create_progress_bar(total_chains as u64, "Generating Document Flow JEs");
1581
1582 let mut generator = DocumentFlowJeGenerator::with_config_and_seed(
1583 DocumentFlowJeConfig::default(),
1584 self.seed,
1585 );
1586 let mut entries = Vec::new();
1587
1588 for chain in &flows.p2p_chains {
1590 let chain_entries = generator.generate_from_p2p_chain(chain);
1591 entries.extend(chain_entries);
1592 if let Some(pb) = &pb {
1593 pb.inc(1);
1594 }
1595 }
1596
1597 for chain in &flows.o2c_chains {
1599 let chain_entries = generator.generate_from_o2c_chain(chain);
1600 entries.extend(chain_entries);
1601 if let Some(pb) = &pb {
1602 pb.inc(1);
1603 }
1604 }
1605
1606 if let Some(pb) = pb {
1607 pb.finish_with_message(format!(
1608 "Generated {} JEs from document flows",
1609 entries.len()
1610 ));
1611 }
1612
1613 Ok(entries)
1614 }
1615
1616 fn link_document_flows_to_subledgers(
1621 &mut self,
1622 flows: &DocumentFlowSnapshot,
1623 ) -> SynthResult<SubledgerSnapshot> {
1624 let total = flows.vendor_invoices.len() + flows.customer_invoices.len();
1625 let pb = self.create_progress_bar(total as u64, "Linking Subledgers");
1626
1627 let mut linker = DocumentFlowLinker::new();
1628
1629 let ap_invoices = linker.batch_create_ap_invoices(&flows.vendor_invoices);
1631 if let Some(pb) = &pb {
1632 pb.inc(flows.vendor_invoices.len() as u64);
1633 }
1634
1635 let ar_invoices = linker.batch_create_ar_invoices(&flows.customer_invoices);
1637 if let Some(pb) = &pb {
1638 pb.inc(flows.customer_invoices.len() as u64);
1639 }
1640
1641 if let Some(pb) = pb {
1642 pb.finish_with_message(format!(
1643 "Linked {} AP and {} AR invoices",
1644 ap_invoices.len(),
1645 ar_invoices.len()
1646 ));
1647 }
1648
1649 Ok(SubledgerSnapshot {
1650 ap_invoices,
1651 ar_invoices,
1652 })
1653 }
1654
1655 fn generate_ocpm_events(&mut self, flows: &DocumentFlowSnapshot) -> SynthResult<OcpmSnapshot> {
1660 let total_chains = flows.p2p_chains.len() + flows.o2c_chains.len();
1661 let pb = self.create_progress_bar(total_chains as u64, "Generating OCPM Events");
1662
1663 let metadata = EventLogMetadata::new("SyntheticData OCPM Log");
1665 let mut event_log = OcpmEventLog::with_metadata(metadata).with_standard_types();
1666
1667 let ocpm_config = OcpmGeneratorConfig {
1669 generate_p2p: true,
1670 generate_o2c: true,
1671 happy_path_rate: 0.75,
1672 exception_path_rate: 0.20,
1673 error_path_rate: 0.05,
1674 add_duration_variability: true,
1675 duration_std_dev_factor: 0.3,
1676 };
1677 let mut ocpm_gen = OcpmEventGenerator::with_config(self.seed + 3000, ocpm_config);
1678
1679 let available_users: Vec<String> = self
1681 .master_data
1682 .employees
1683 .iter()
1684 .take(20)
1685 .map(|e| e.user_id.clone())
1686 .collect();
1687
1688 for chain in &flows.p2p_chains {
1690 let po = &chain.purchase_order;
1691 let documents = P2pDocuments::new(
1692 &po.header.document_id,
1693 &po.vendor_id,
1694 &po.header.company_code,
1695 po.total_net_amount,
1696 &po.header.currency,
1697 )
1698 .with_goods_receipt(
1699 chain
1700 .goods_receipts
1701 .first()
1702 .map(|gr| gr.header.document_id.as_str())
1703 .unwrap_or(""),
1704 )
1705 .with_invoice(
1706 chain
1707 .vendor_invoice
1708 .as_ref()
1709 .map(|vi| vi.header.document_id.as_str())
1710 .unwrap_or(""),
1711 )
1712 .with_payment(
1713 chain
1714 .payment
1715 .as_ref()
1716 .map(|p| p.header.document_id.as_str())
1717 .unwrap_or(""),
1718 );
1719
1720 let start_time =
1721 chrono::DateTime::from_naive_utc_and_offset(po.header.entry_timestamp, chrono::Utc);
1722 let result = ocpm_gen.generate_p2p_case(&documents, start_time, &available_users);
1723
1724 for event in result.events {
1726 event_log.add_event(event);
1727 }
1728 for object in result.objects {
1729 event_log.add_object(object);
1730 }
1731 for relationship in result.relationships {
1732 event_log.add_relationship(relationship);
1733 }
1734 event_log.add_case(result.case_trace);
1735
1736 if let Some(pb) = &pb {
1737 pb.inc(1);
1738 }
1739 }
1740
1741 for chain in &flows.o2c_chains {
1743 let so = &chain.sales_order;
1744 let documents = O2cDocuments::new(
1745 &so.header.document_id,
1746 &so.customer_id,
1747 &so.header.company_code,
1748 so.total_net_amount,
1749 &so.header.currency,
1750 )
1751 .with_delivery(
1752 chain
1753 .deliveries
1754 .first()
1755 .map(|d| d.header.document_id.as_str())
1756 .unwrap_or(""),
1757 )
1758 .with_invoice(
1759 chain
1760 .customer_invoice
1761 .as_ref()
1762 .map(|ci| ci.header.document_id.as_str())
1763 .unwrap_or(""),
1764 )
1765 .with_receipt(
1766 chain
1767 .customer_receipt
1768 .as_ref()
1769 .map(|r| r.header.document_id.as_str())
1770 .unwrap_or(""),
1771 );
1772
1773 let start_time =
1774 chrono::DateTime::from_naive_utc_and_offset(so.header.entry_timestamp, chrono::Utc);
1775 let result = ocpm_gen.generate_o2c_case(&documents, start_time, &available_users);
1776
1777 for event in result.events {
1779 event_log.add_event(event);
1780 }
1781 for object in result.objects {
1782 event_log.add_object(object);
1783 }
1784 for relationship in result.relationships {
1785 event_log.add_relationship(relationship);
1786 }
1787 event_log.add_case(result.case_trace);
1788
1789 if let Some(pb) = &pb {
1790 pb.inc(1);
1791 }
1792 }
1793
1794 event_log.compute_variants();
1796
1797 let summary = event_log.summary();
1798
1799 if let Some(pb) = pb {
1800 pb.finish_with_message(format!(
1801 "Generated {} OCPM events, {} objects",
1802 summary.event_count, summary.object_count
1803 ));
1804 }
1805
1806 Ok(OcpmSnapshot {
1807 event_count: summary.event_count,
1808 object_count: summary.object_count,
1809 case_count: summary.case_count,
1810 event_log: Some(event_log),
1811 })
1812 }
1813
1814 fn inject_anomalies(&mut self, entries: &mut [JournalEntry]) -> SynthResult<AnomalyLabels> {
1816 let pb = self.create_progress_bar(entries.len() as u64, "Injecting Anomalies");
1817
1818 let anomaly_config = AnomalyInjectorConfig {
1819 rates: AnomalyRateConfig {
1820 total_rate: 0.02,
1821 ..Default::default()
1822 },
1823 seed: self.seed + 5000,
1824 ..Default::default()
1825 };
1826
1827 let mut injector = AnomalyInjector::new(anomaly_config);
1828 let result = injector.process_entries(entries);
1829
1830 if let Some(pb) = &pb {
1831 pb.inc(entries.len() as u64);
1832 pb.finish_with_message("Anomaly injection complete");
1833 }
1834
1835 let mut by_type = HashMap::new();
1836 for label in &result.labels {
1837 *by_type
1838 .entry(format!("{:?}", label.anomaly_type))
1839 .or_insert(0) += 1;
1840 }
1841
1842 Ok(AnomalyLabels {
1843 labels: result.labels,
1844 summary: Some(result.summary),
1845 by_type,
1846 })
1847 }
1848
1849 fn validate_journal_entries(
1858 &mut self,
1859 entries: &[JournalEntry],
1860 ) -> SynthResult<BalanceValidationResult> {
1861 let clean_entries: Vec<&JournalEntry> = entries
1863 .iter()
1864 .filter(|e| {
1865 e.header
1866 .header_text
1867 .as_ref()
1868 .map(|t| !t.contains("[HUMAN_ERROR:"))
1869 .unwrap_or(true)
1870 })
1871 .collect();
1872
1873 let pb = self.create_progress_bar(clean_entries.len() as u64, "Validating Balances");
1874
1875 let config = BalanceTrackerConfig {
1877 validate_on_each_entry: false, track_history: false, fail_on_validation_error: false, ..Default::default()
1881 };
1882
1883 let mut tracker = RunningBalanceTracker::new(config);
1884
1885 let clean_refs: Vec<JournalEntry> = clean_entries.into_iter().cloned().collect();
1887 let errors = tracker.apply_entries(&clean_refs);
1888
1889 if let Some(pb) = &pb {
1890 pb.inc(entries.len() as u64);
1891 }
1892
1893 let has_unbalanced = tracker
1896 .get_validation_errors()
1897 .iter()
1898 .any(|e| e.error_type == datasynth_generators::ValidationErrorType::UnbalancedEntry);
1899
1900 let mut all_errors = errors;
1903 all_errors.extend(tracker.get_validation_errors().iter().cloned());
1904 let company_codes: Vec<String> = self
1905 .config
1906 .companies
1907 .iter()
1908 .map(|c| c.code.clone())
1909 .collect();
1910
1911 let end_date = NaiveDate::parse_from_str(&self.config.global.start_date, "%Y-%m-%d")
1912 .map(|d| d + chrono::Months::new(self.config.global.period_months))
1913 .unwrap_or_else(|_| chrono::Local::now().date_naive());
1914
1915 for company_code in &company_codes {
1916 if let Err(e) = tracker.validate_balance_sheet(company_code, end_date, None) {
1917 all_errors.push(e);
1918 }
1919 }
1920
1921 let stats = tracker.get_statistics();
1923
1924 let is_balanced = all_errors.is_empty();
1926
1927 if let Some(pb) = pb {
1928 let msg = if is_balanced {
1929 "Balance validation passed"
1930 } else {
1931 "Balance validation completed with errors"
1932 };
1933 pb.finish_with_message(msg);
1934 }
1935
1936 Ok(BalanceValidationResult {
1937 validated: true,
1938 is_balanced,
1939 entries_processed: stats.entries_processed,
1940 total_debits: stats.total_debits,
1941 total_credits: stats.total_credits,
1942 accounts_tracked: stats.accounts_tracked,
1943 companies_tracked: stats.companies_tracked,
1944 validation_errors: all_errors,
1945 has_unbalanced_entries: has_unbalanced,
1946 })
1947 }
1948
1949 fn inject_data_quality(
1954 &mut self,
1955 entries: &mut [JournalEntry],
1956 ) -> SynthResult<DataQualityStats> {
1957 let pb = self.create_progress_bar(entries.len() as u64, "Injecting Data Quality Issues");
1958
1959 let config = DataQualityConfig::minimal();
1961 let mut injector = DataQualityInjector::new(config);
1962
1963 let context = HashMap::new();
1965
1966 for entry in entries.iter_mut() {
1967 if let Some(text) = &entry.header.header_text {
1969 let processed = injector.process_text_field(
1970 "header_text",
1971 text,
1972 &entry.header.document_id.to_string(),
1973 &context,
1974 );
1975 match processed {
1976 Some(new_text) if new_text != *text => {
1977 entry.header.header_text = Some(new_text);
1978 }
1979 None => {
1980 entry.header.header_text = None; }
1982 _ => {}
1983 }
1984 }
1985
1986 if let Some(ref_text) = &entry.header.reference {
1988 let processed = injector.process_text_field(
1989 "reference",
1990 ref_text,
1991 &entry.header.document_id.to_string(),
1992 &context,
1993 );
1994 match processed {
1995 Some(new_text) if new_text != *ref_text => {
1996 entry.header.reference = Some(new_text);
1997 }
1998 None => {
1999 entry.header.reference = None;
2000 }
2001 _ => {}
2002 }
2003 }
2004
2005 let user_persona = entry.header.user_persona.clone();
2007 if let Some(processed) = injector.process_text_field(
2008 "user_persona",
2009 &user_persona,
2010 &entry.header.document_id.to_string(),
2011 &context,
2012 ) {
2013 if processed != user_persona {
2014 entry.header.user_persona = processed;
2015 }
2016 }
2017
2018 for line in &mut entry.lines {
2020 if let Some(ref text) = line.line_text {
2022 let processed = injector.process_text_field(
2023 "line_text",
2024 text,
2025 &entry.header.document_id.to_string(),
2026 &context,
2027 );
2028 match processed {
2029 Some(new_text) if new_text != *text => {
2030 line.line_text = Some(new_text);
2031 }
2032 None => {
2033 line.line_text = None;
2034 }
2035 _ => {}
2036 }
2037 }
2038
2039 if let Some(cc) = &line.cost_center {
2041 let processed = injector.process_text_field(
2042 "cost_center",
2043 cc,
2044 &entry.header.document_id.to_string(),
2045 &context,
2046 );
2047 match processed {
2048 Some(new_cc) if new_cc != *cc => {
2049 line.cost_center = Some(new_cc);
2050 }
2051 None => {
2052 line.cost_center = None;
2053 }
2054 _ => {}
2055 }
2056 }
2057 }
2058
2059 if let Some(pb) = &pb {
2060 pb.inc(1);
2061 }
2062 }
2063
2064 if let Some(pb) = pb {
2065 pb.finish_with_message("Data quality injection complete");
2066 }
2067
2068 Ok(injector.stats().clone())
2069 }
2070
2071 fn generate_audit_data(&mut self, entries: &[JournalEntry]) -> SynthResult<AuditSnapshot> {
2082 let start_date = NaiveDate::parse_from_str(&self.config.global.start_date, "%Y-%m-%d")
2083 .map_err(|e| SynthError::config(format!("Invalid start_date: {}", e)))?;
2084 let fiscal_year = start_date.year() as u16;
2085 let period_end = start_date + chrono::Months::new(self.config.global.period_months);
2086
2087 let total_revenue: rust_decimal::Decimal = entries
2089 .iter()
2090 .flat_map(|e| e.lines.iter())
2091 .filter(|l| l.credit_amount > rust_decimal::Decimal::ZERO)
2092 .map(|l| l.credit_amount)
2093 .sum();
2094
2095 let total_items = (self.phase_config.audit_engagements * 50) as u64; let pb = self.create_progress_bar(total_items, "Generating Audit Data");
2097
2098 let mut snapshot = AuditSnapshot::default();
2099
2100 let mut engagement_gen = AuditEngagementGenerator::new(self.seed + 7000);
2102 let mut workpaper_gen = WorkpaperGenerator::new(self.seed + 7100);
2103 let mut evidence_gen = EvidenceGenerator::new(self.seed + 7200);
2104 let mut risk_gen = RiskAssessmentGenerator::new(self.seed + 7300);
2105 let mut finding_gen = FindingGenerator::new(self.seed + 7400);
2106 let mut judgment_gen = JudgmentGenerator::new(self.seed + 7500);
2107
2108 let accounts: Vec<String> = self
2110 .coa
2111 .as_ref()
2112 .map(|coa| {
2113 coa.get_postable_accounts()
2114 .iter()
2115 .map(|acc| acc.account_code().to_string())
2116 .collect()
2117 })
2118 .unwrap_or_default();
2119
2120 for (i, company) in self.config.companies.iter().enumerate() {
2122 let company_revenue = total_revenue
2124 * rust_decimal::Decimal::try_from(company.volume_weight).unwrap_or_default();
2125
2126 let engagements_for_company =
2128 self.phase_config.audit_engagements / self.config.companies.len().max(1);
2129 let extra = if i < self.phase_config.audit_engagements % self.config.companies.len() {
2130 1
2131 } else {
2132 0
2133 };
2134
2135 for _eng_idx in 0..(engagements_for_company + extra) {
2136 let engagement = engagement_gen.generate_engagement(
2138 &company.code,
2139 &company.name,
2140 fiscal_year,
2141 period_end,
2142 company_revenue,
2143 None, );
2145
2146 if let Some(pb) = &pb {
2147 pb.inc(1);
2148 }
2149
2150 let team_members: Vec<String> = engagement.team_member_ids.clone();
2152
2153 let workpapers =
2155 workpaper_gen.generate_complete_workpaper_set(&engagement, &team_members);
2156
2157 for wp in &workpapers {
2158 if let Some(pb) = &pb {
2159 pb.inc(1);
2160 }
2161
2162 let evidence = evidence_gen.generate_evidence_for_workpaper(
2164 wp,
2165 &team_members,
2166 wp.preparer_date,
2167 );
2168
2169 for _ in &evidence {
2170 if let Some(pb) = &pb {
2171 pb.inc(1);
2172 }
2173 }
2174
2175 snapshot.evidence.extend(evidence);
2176 }
2177
2178 let risks =
2180 risk_gen.generate_risks_for_engagement(&engagement, &team_members, &accounts);
2181
2182 for _ in &risks {
2183 if let Some(pb) = &pb {
2184 pb.inc(1);
2185 }
2186 }
2187 snapshot.risk_assessments.extend(risks);
2188
2189 let findings = finding_gen.generate_findings_for_engagement(
2191 &engagement,
2192 &workpapers,
2193 &team_members,
2194 );
2195
2196 for _ in &findings {
2197 if let Some(pb) = &pb {
2198 pb.inc(1);
2199 }
2200 }
2201 snapshot.findings.extend(findings);
2202
2203 let judgments =
2205 judgment_gen.generate_judgments_for_engagement(&engagement, &team_members);
2206
2207 for _ in &judgments {
2208 if let Some(pb) = &pb {
2209 pb.inc(1);
2210 }
2211 }
2212 snapshot.judgments.extend(judgments);
2213
2214 snapshot.workpapers.extend(workpapers);
2216 snapshot.engagements.push(engagement);
2217 }
2218 }
2219
2220 if let Some(pb) = pb {
2221 pb.finish_with_message(format!(
2222 "Audit data: {} engagements, {} workpapers, {} evidence",
2223 snapshot.engagements.len(),
2224 snapshot.workpapers.len(),
2225 snapshot.evidence.len()
2226 ));
2227 }
2228
2229 Ok(snapshot)
2230 }
2231
2232 fn export_graphs(
2239 &mut self,
2240 entries: &[JournalEntry],
2241 _coa: &Arc<ChartOfAccounts>,
2242 stats: &mut EnhancedGenerationStatistics,
2243 ) -> SynthResult<GraphExportSnapshot> {
2244 let pb = self.create_progress_bar(100, "Exporting Graphs");
2245
2246 let mut snapshot = GraphExportSnapshot::default();
2247
2248 let output_dir = self
2250 .output_path
2251 .clone()
2252 .unwrap_or_else(|| PathBuf::from(&self.config.output.output_directory));
2253 let graph_dir = output_dir.join(&self.config.graph_export.output_subdirectory);
2254
2255 for graph_type in &self.config.graph_export.graph_types {
2257 if let Some(pb) = &pb {
2258 pb.inc(10);
2259 }
2260
2261 let graph_config = TransactionGraphConfig {
2263 include_vendors: false,
2264 include_customers: false,
2265 create_debit_credit_edges: true,
2266 include_document_nodes: graph_type.include_document_nodes,
2267 min_edge_weight: graph_type.min_edge_weight,
2268 aggregate_parallel_edges: graph_type.aggregate_edges,
2269 };
2270
2271 let mut builder = TransactionGraphBuilder::new(graph_config);
2272 builder.add_journal_entries(entries);
2273 let graph = builder.build();
2274
2275 stats.graph_node_count += graph.node_count();
2277 stats.graph_edge_count += graph.edge_count();
2278
2279 if let Some(pb) = &pb {
2280 pb.inc(40);
2281 }
2282
2283 for format in &self.config.graph_export.formats {
2285 let format_dir = graph_dir.join(&graph_type.name).join(format_name(*format));
2286
2287 if let Err(e) = std::fs::create_dir_all(&format_dir) {
2289 warn!("Failed to create graph output directory: {}", e);
2290 continue;
2291 }
2292
2293 match format {
2294 datasynth_config::schema::GraphExportFormat::PytorchGeometric => {
2295 let pyg_config = PyGExportConfig {
2296 common: datasynth_graph::CommonExportConfig {
2297 export_node_features: true,
2298 export_edge_features: true,
2299 export_node_labels: true,
2300 export_edge_labels: true,
2301 export_masks: true,
2302 train_ratio: self.config.graph_export.train_ratio,
2303 val_ratio: self.config.graph_export.validation_ratio,
2304 seed: self.config.graph_export.split_seed.unwrap_or(self.seed),
2305 },
2306 one_hot_categoricals: false,
2307 };
2308
2309 let exporter = PyGExporter::new(pyg_config);
2310 match exporter.export(&graph, &format_dir) {
2311 Ok(metadata) => {
2312 snapshot.exports.insert(
2313 format!("{}_{}", graph_type.name, "pytorch_geometric"),
2314 GraphExportInfo {
2315 name: graph_type.name.clone(),
2316 format: "pytorch_geometric".to_string(),
2317 output_path: format_dir.clone(),
2318 node_count: metadata.num_nodes,
2319 edge_count: metadata.num_edges,
2320 },
2321 );
2322 snapshot.graph_count += 1;
2323 }
2324 Err(e) => {
2325 warn!("Failed to export PyTorch Geometric graph: {}", e);
2326 }
2327 }
2328 }
2329 datasynth_config::schema::GraphExportFormat::Neo4j => {
2330 debug!("Neo4j export not yet implemented for accounting networks");
2332 }
2333 datasynth_config::schema::GraphExportFormat::Dgl => {
2334 debug!("DGL export not yet implemented for accounting networks");
2336 }
2337 datasynth_config::schema::GraphExportFormat::RustGraph => {
2338 use datasynth_graph::{
2339 RustGraphExportConfig, RustGraphExporter, RustGraphOutputFormat,
2340 };
2341
2342 let rustgraph_config = RustGraphExportConfig {
2343 include_features: true,
2344 include_temporal: true,
2345 include_labels: true,
2346 source_name: "datasynth".to_string(),
2347 batch_id: None,
2348 output_format: RustGraphOutputFormat::JsonLines,
2349 export_node_properties: true,
2350 export_edge_properties: true,
2351 pretty_print: false,
2352 };
2353
2354 let exporter = RustGraphExporter::new(rustgraph_config);
2355 match exporter.export(&graph, &format_dir) {
2356 Ok(metadata) => {
2357 snapshot.exports.insert(
2358 format!("{}_{}", graph_type.name, "rustgraph"),
2359 GraphExportInfo {
2360 name: graph_type.name.clone(),
2361 format: "rustgraph".to_string(),
2362 output_path: format_dir.clone(),
2363 node_count: metadata.num_nodes,
2364 edge_count: metadata.num_edges,
2365 },
2366 );
2367 snapshot.graph_count += 1;
2368 }
2369 Err(e) => {
2370 warn!("Failed to export RustGraph: {}", e);
2371 }
2372 }
2373 }
2374 datasynth_config::schema::GraphExportFormat::RustGraphHypergraph => {
2375 debug!("RustGraphHypergraph format is handled in Phase 10b (hypergraph export)");
2377 }
2378 }
2379 }
2380
2381 if let Some(pb) = &pb {
2382 pb.inc(40);
2383 }
2384 }
2385
2386 stats.graph_export_count = snapshot.graph_count;
2387 snapshot.exported = snapshot.graph_count > 0;
2388
2389 if let Some(pb) = pb {
2390 pb.finish_with_message(format!(
2391 "Graphs exported: {} graphs ({} nodes, {} edges)",
2392 snapshot.graph_count, stats.graph_node_count, stats.graph_edge_count
2393 ));
2394 }
2395
2396 Ok(snapshot)
2397 }
2398
2399 fn export_hypergraph(
2406 &self,
2407 coa: &Arc<ChartOfAccounts>,
2408 entries: &[JournalEntry],
2409 document_flows: &DocumentFlowSnapshot,
2410 stats: &mut EnhancedGenerationStatistics,
2411 ) -> SynthResult<HypergraphExportInfo> {
2412 use datasynth_graph::builders::hypergraph::{HypergraphBuilder, HypergraphConfig};
2413 use datasynth_graph::exporters::hypergraph::{HypergraphExportConfig, HypergraphExporter};
2414 use datasynth_graph::models::hypergraph::AggregationStrategy;
2415
2416 let hg_settings = &self.config.graph_export.hypergraph;
2417
2418 let aggregation_strategy = match hg_settings.aggregation_strategy.as_str() {
2420 "truncate" => AggregationStrategy::Truncate,
2421 "pool_by_counterparty" => AggregationStrategy::PoolByCounterparty,
2422 "pool_by_time_period" => AggregationStrategy::PoolByTimePeriod,
2423 "importance_sample" => AggregationStrategy::ImportanceSample,
2424 _ => AggregationStrategy::PoolByCounterparty,
2425 };
2426
2427 let builder_config = HypergraphConfig {
2428 max_nodes: hg_settings.max_nodes,
2429 aggregation_strategy,
2430 include_coso: hg_settings.governance_layer.include_coso,
2431 include_controls: hg_settings.governance_layer.include_controls,
2432 include_sox: hg_settings.governance_layer.include_sox,
2433 include_vendors: hg_settings.governance_layer.include_vendors,
2434 include_customers: hg_settings.governance_layer.include_customers,
2435 include_employees: hg_settings.governance_layer.include_employees,
2436 include_p2p: hg_settings.process_layer.include_p2p,
2437 include_o2c: hg_settings.process_layer.include_o2c,
2438 events_as_hyperedges: hg_settings.process_layer.events_as_hyperedges,
2439 docs_per_counterparty_threshold: hg_settings
2440 .process_layer
2441 .docs_per_counterparty_threshold,
2442 include_accounts: hg_settings.accounting_layer.include_accounts,
2443 je_as_hyperedges: hg_settings.accounting_layer.je_as_hyperedges,
2444 include_cross_layer_edges: hg_settings.cross_layer.enabled,
2445 };
2446
2447 let mut builder = HypergraphBuilder::new(builder_config);
2448
2449 builder.add_coso_framework();
2451
2452 if hg_settings.governance_layer.include_controls && self.config.internal_controls.enabled {
2455 let controls = InternalControl::standard_controls();
2456 builder.add_controls(&controls);
2457 }
2458
2459 builder.add_vendors(&self.master_data.vendors);
2461 builder.add_customers(&self.master_data.customers);
2462 builder.add_employees(&self.master_data.employees);
2463
2464 builder.add_p2p_documents(
2466 &document_flows.purchase_orders,
2467 &document_flows.goods_receipts,
2468 &document_flows.vendor_invoices,
2469 &document_flows.payments,
2470 );
2471 builder.add_o2c_documents(
2472 &document_flows.sales_orders,
2473 &document_flows.deliveries,
2474 &document_flows.customer_invoices,
2475 );
2476
2477 builder.add_accounts(coa);
2479 builder.add_journal_entries_as_hyperedges(entries);
2480
2481 let hypergraph = builder.build();
2483
2484 let output_dir = self
2486 .output_path
2487 .clone()
2488 .unwrap_or_else(|| PathBuf::from(&self.config.output.output_directory));
2489 let hg_dir = output_dir
2490 .join(&self.config.graph_export.output_subdirectory)
2491 .join(&hg_settings.output_subdirectory);
2492
2493 let exporter = HypergraphExporter::new(HypergraphExportConfig::default());
2494 let metadata = exporter
2495 .export(&hypergraph, &hg_dir)
2496 .map_err(|e| SynthError::generation(format!("Hypergraph export failed: {}", e)))?;
2497
2498 stats.graph_node_count += metadata.num_nodes;
2500 stats.graph_edge_count += metadata.num_edges;
2501 stats.graph_export_count += 1;
2502
2503 Ok(HypergraphExportInfo {
2504 node_count: metadata.num_nodes,
2505 edge_count: metadata.num_edges,
2506 hyperedge_count: metadata.num_hyperedges,
2507 output_path: hg_dir,
2508 })
2509 }
2510
2511 fn generate_banking_data(&mut self) -> SynthResult<BankingSnapshot> {
2516 let pb = self.create_progress_bar(100, "Generating Banking Data");
2517
2518 let orchestrator = BankingOrchestratorBuilder::new()
2520 .config(self.config.banking.clone())
2521 .seed(self.seed + 9000)
2522 .build();
2523
2524 if let Some(pb) = &pb {
2525 pb.inc(10);
2526 }
2527
2528 let result = orchestrator.generate();
2530
2531 if let Some(pb) = &pb {
2532 pb.inc(90);
2533 pb.finish_with_message(format!(
2534 "Banking: {} customers, {} transactions",
2535 result.customers.len(),
2536 result.transactions.len()
2537 ));
2538 }
2539
2540 Ok(BankingSnapshot {
2541 customers: result.customers,
2542 accounts: result.accounts,
2543 transactions: result.transactions,
2544 suspicious_count: result.stats.suspicious_count,
2545 scenario_count: result.scenarios.len(),
2546 })
2547 }
2548
2549 fn calculate_total_transactions(&self) -> u64 {
2551 let months = self.config.global.period_months as f64;
2552 self.config
2553 .companies
2554 .iter()
2555 .map(|c| {
2556 let annual = c.annual_transaction_volume.count() as f64;
2557 let weighted = annual * c.volume_weight;
2558 (weighted * months / 12.0) as u64
2559 })
2560 .sum()
2561 }
2562
2563 fn create_progress_bar(&self, total: u64, message: &str) -> Option<ProgressBar> {
2565 if !self.phase_config.show_progress {
2566 return None;
2567 }
2568
2569 let pb = if let Some(mp) = &self.multi_progress {
2570 mp.add(ProgressBar::new(total))
2571 } else {
2572 ProgressBar::new(total)
2573 };
2574
2575 pb.set_style(
2576 ProgressStyle::default_bar()
2577 .template(&format!(
2578 "{{spinner:.green}} {} [{{elapsed_precise}}] [{{bar:40.cyan/blue}}] {{pos}}/{{len}} ({{per_sec}})",
2579 message
2580 ))
2581 .expect("Progress bar template should be valid - uses only standard indicatif placeholders")
2582 .progress_chars("#>-"),
2583 );
2584
2585 Some(pb)
2586 }
2587
2588 pub fn get_coa(&self) -> Option<Arc<ChartOfAccounts>> {
2590 self.coa.clone()
2591 }
2592
2593 pub fn get_master_data(&self) -> &MasterDataSnapshot {
2595 &self.master_data
2596 }
2597}
2598
2599fn format_name(format: datasynth_config::schema::GraphExportFormat) -> &'static str {
2601 match format {
2602 datasynth_config::schema::GraphExportFormat::PytorchGeometric => "pytorch_geometric",
2603 datasynth_config::schema::GraphExportFormat::Neo4j => "neo4j",
2604 datasynth_config::schema::GraphExportFormat::Dgl => "dgl",
2605 datasynth_config::schema::GraphExportFormat::RustGraph => "rustgraph",
2606 datasynth_config::schema::GraphExportFormat::RustGraphHypergraph => "rustgraph_hypergraph",
2607 }
2608}
2609
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_config::schema::*;

    /// Builds a minimal configuration for the tests:
    /// - one company (code "1000", USD, `TransactionVolume::TenK`);
    /// - a one-month period starting 2024-01-01 with fixed seed 42;
    /// - a small chart of accounts;
    /// - every other section at its default.
    fn create_test_config() -> GeneratorConfig {
        GeneratorConfig {
            global: GlobalConfig {
                industry: IndustrySector::Manufacturing,
                start_date: "2024-01-01".to_string(),
                period_months: 1,
                seed: Some(42),
                parallel: false,
                group_currency: "USD".to_string(),
                worker_threads: 0,
                memory_limit_mb: 0,
            },
            companies: vec![CompanyConfig {
                code: "1000".to_string(),
                name: "Test Company".to_string(),
                currency: "USD".to_string(),
                country: "US".to_string(),
                annual_transaction_volume: TransactionVolume::TenK,
                volume_weight: 1.0,
                fiscal_year_variant: "K4".to_string(),
            }],
            chart_of_accounts: ChartOfAccountsConfig {
                complexity: CoAComplexity::Small,
                industry_specific: true,
                custom_accounts: None,
                min_hierarchy_depth: 2,
                max_hierarchy_depth: 4,
            },
            transactions: TransactionConfig::default(),
            output: OutputConfig::default(),
            fraud: FraudConfig::default(),
            internal_controls: InternalControlsConfig::default(),
            business_processes: BusinessProcessConfig::default(),
            user_personas: UserPersonaConfig::default(),
            templates: TemplateConfig::default(),
            approval: ApprovalConfig::default(),
            departments: DepartmentConfig::default(),
            master_data: MasterDataConfig::default(),
            document_flows: DocumentFlowConfig::default(),
            intercompany: IntercompanyConfig::default(),
            balance: BalanceConfig::default(),
            ocpm: OcpmConfig::default(),
            audit: AuditGenerationConfig::default(),
            banking: datasynth_banking::BankingConfig::default(),
            data_quality: DataQualitySchemaConfig::default(),
            scenario: ScenarioConfig::default(),
            temporal: TemporalDriftConfig::default(),
            graph_export: GraphExportConfig::default(),
            streaming: StreamingSchemaConfig::default(),
            rate_limit: RateLimitSchemaConfig::default(),
            temporal_attributes: TemporalAttributeSchemaConfig::default(),
            relationships: RelationshipSchemaConfig::default(),
            accounting_standards: AccountingStandardsConfig::default(),
            audit_standards: AuditStandardsConfig::default(),
            distributions: Default::default(),
            temporal_patterns: Default::default(),
            vendor_network: VendorNetworkSchemaConfig::default(),
            customer_segmentation: CustomerSegmentationSchemaConfig::default(),
            relationship_strength: RelationshipStrengthSchemaConfig::default(),
            cross_process_links: CrossProcessLinksSchemaConfig::default(),
            organizational_events: OrganizationalEventsSchemaConfig::default(),
            behavioral_drift: BehavioralDriftSchemaConfig::default(),
            market_drift: MarketDriftSchemaConfig::default(),
            drift_labeling: DriftLabelingSchemaConfig::default(),
            anomaly_injection: Default::default(),
            industry_specific: Default::default(),
        }
    }

    /// The orchestrator can be constructed from the test configuration.
    #[test]
    fn test_enhanced_orchestrator_creation() {
        let config = create_test_config();
        // `expect` reports the construction error on failure, which
        // `assert!(result.is_ok())` would hide.
        let _orchestrator = EnhancedOrchestrator::with_defaults(config)
            .expect("orchestrator should be constructible from the test config");
    }

    /// Journal-entry-only generation succeeds and produces entries.
    #[test]
    fn test_minimal_generation() {
        let config = create_test_config();
        let phase_config = PhaseConfig {
            generate_master_data: false,
            generate_document_flows: false,
            generate_journal_entries: true,
            inject_anomalies: false,
            show_progress: false,
            ..Default::default()
        };

        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config).unwrap();
        let result = orchestrator
            .generate()
            .expect("minimal generation should succeed");

        assert!(!result.journal_entries.is_empty());
    }

    /// Master-data-only generation fills vendors, customers and materials.
    #[test]
    fn test_master_data_generation() {
        let config = create_test_config();
        let phase_config = PhaseConfig {
            generate_master_data: true,
            generate_document_flows: false,
            generate_journal_entries: false,
            inject_anomalies: false,
            show_progress: false,
            vendors_per_company: 5,
            customers_per_company: 5,
            materials_per_company: 10,
            assets_per_company: 5,
            employees_per_company: 10,
            ..Default::default()
        };

        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config).unwrap();
        let result = orchestrator.generate().unwrap();

        assert!(!result.master_data.vendors.is_empty());
        assert!(!result.master_data.customers.is_empty());
        assert!(!result.master_data.materials.is_empty());
    }

    /// With master data available, P2P and O2C chains and their documents are produced.
    #[test]
    fn test_document_flow_generation() {
        let config = create_test_config();
        let phase_config = PhaseConfig {
            generate_master_data: true,
            generate_document_flows: true,
            generate_journal_entries: false,
            inject_anomalies: false,
            inject_data_quality: false,
            validate_balances: false,
            generate_ocpm_events: false,
            show_progress: false,
            vendors_per_company: 5,
            customers_per_company: 5,
            materials_per_company: 10,
            assets_per_company: 5,
            employees_per_company: 10,
            p2p_chains: 5,
            o2c_chains: 5,
            ..Default::default()
        };

        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config).unwrap();
        let result = orchestrator.generate().unwrap();

        assert!(!result.document_flows.p2p_chains.is_empty());
        assert!(!result.document_flows.o2c_chains.is_empty());

        assert!(!result.document_flows.purchase_orders.is_empty());
        assert!(!result.document_flows.sales_orders.is_empty());
    }

    /// Enabling anomaly injection still yields entries and populates the label summary.
    #[test]
    fn test_anomaly_injection() {
        let config = create_test_config();
        let phase_config = PhaseConfig {
            generate_master_data: false,
            generate_document_flows: false,
            generate_journal_entries: true,
            inject_anomalies: true,
            show_progress: false,
            ..Default::default()
        };

        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config).unwrap();
        let result = orchestrator.generate().unwrap();

        assert!(!result.journal_entries.is_empty());

        assert!(result.anomaly_labels.summary.is_some());
    }

    /// End-to-end run exercising:
    /// - master data and document flows;
    /// - journal entries and subledgers;
    /// - balance validation.
    #[test]
    fn test_full_generation_pipeline() {
        let config = create_test_config();
        let phase_config = PhaseConfig {
            generate_master_data: true,
            generate_document_flows: true,
            generate_journal_entries: true,
            inject_anomalies: false,
            inject_data_quality: false,
            validate_balances: true,
            generate_ocpm_events: false,
            show_progress: false,
            vendors_per_company: 3,
            customers_per_company: 3,
            materials_per_company: 5,
            assets_per_company: 3,
            employees_per_company: 5,
            p2p_chains: 3,
            o2c_chains: 3,
            ..Default::default()
        };

        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config).unwrap();
        let result = orchestrator.generate().unwrap();

        assert!(!result.master_data.vendors.is_empty());
        assert!(!result.master_data.customers.is_empty());
        assert!(!result.document_flows.p2p_chains.is_empty());
        assert!(!result.document_flows.o2c_chains.is_empty());
        assert!(!result.journal_entries.is_empty());
        assert!(result.statistics.accounts_count > 0);

        assert!(!result.subledger.ap_invoices.is_empty());
        assert!(!result.subledger.ar_invoices.is_empty());

        assert!(result.balance_validation.validated);
        assert!(result.balance_validation.entries_processed > 0);
    }

    /// Every vendor/customer invoice gets a matching AP/AR subledger invoice,
    /// and the statistics counts agree with the subledger.
    #[test]
    fn test_subledger_linking() {
        let config = create_test_config();
        let phase_config = PhaseConfig {
            generate_master_data: true,
            generate_document_flows: true,
            generate_journal_entries: false,
            inject_anomalies: false,
            inject_data_quality: false,
            validate_balances: false,
            generate_ocpm_events: false,
            show_progress: false,
            vendors_per_company: 5,
            customers_per_company: 5,
            materials_per_company: 10,
            assets_per_company: 3,
            employees_per_company: 5,
            p2p_chains: 5,
            o2c_chains: 5,
            ..Default::default()
        };

        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config).unwrap();
        let result = orchestrator.generate().unwrap();

        assert!(!result.document_flows.vendor_invoices.is_empty());
        assert!(!result.document_flows.customer_invoices.is_empty());

        assert!(!result.subledger.ap_invoices.is_empty());
        assert!(!result.subledger.ar_invoices.is_empty());

        assert_eq!(
            result.subledger.ap_invoices.len(),
            result.document_flows.vendor_invoices.len()
        );

        assert_eq!(
            result.subledger.ar_invoices.len(),
            result.document_flows.customer_invoices.len()
        );

        assert_eq!(
            result.statistics.ap_invoice_count,
            result.subledger.ap_invoices.len()
        );
        assert_eq!(
            result.statistics.ar_invoice_count,
            result.subledger.ar_invoices.len()
        );
    }

    /// Balance validation runs, finds no unbalanced entries, and total debits
    /// equal total credits.
    #[test]
    fn test_balance_validation() {
        let config = create_test_config();
        let phase_config = PhaseConfig {
            generate_master_data: false,
            generate_document_flows: false,
            generate_journal_entries: true,
            inject_anomalies: false,
            validate_balances: true,
            show_progress: false,
            ..Default::default()
        };

        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config).unwrap();
        let result = orchestrator.generate().unwrap();

        assert!(result.balance_validation.validated);
        assert!(result.balance_validation.entries_processed > 0);

        assert!(!result.balance_validation.has_unbalanced_entries);

        assert_eq!(
            result.balance_validation.total_debits,
            result.balance_validation.total_credits
        );
    }

    /// Statistics counters match the sizes of the generated collections.
    #[test]
    fn test_statistics_accuracy() {
        let config = create_test_config();
        let phase_config = PhaseConfig {
            generate_master_data: true,
            generate_document_flows: false,
            generate_journal_entries: true,
            inject_anomalies: false,
            show_progress: false,
            vendors_per_company: 10,
            customers_per_company: 20,
            materials_per_company: 15,
            assets_per_company: 5,
            employees_per_company: 8,
            ..Default::default()
        };

        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config).unwrap();
        let result = orchestrator.generate().unwrap();

        assert_eq!(
            result.statistics.vendor_count,
            result.master_data.vendors.len()
        );
        assert_eq!(
            result.statistics.customer_count,
            result.master_data.customers.len()
        );
        assert_eq!(
            result.statistics.material_count,
            result.master_data.materials.len()
        );
        assert_eq!(
            result.statistics.total_entries as usize,
            result.journal_entries.len()
        );
    }

    /// Pins the documented defaults of `PhaseConfig`.
    #[test]
    fn test_phase_config_defaults() {
        let config = PhaseConfig::default();
        assert!(config.generate_master_data);
        assert!(config.generate_document_flows);
        assert!(config.generate_journal_entries);
        assert!(!config.inject_anomalies);
        assert!(config.validate_balances);
        assert!(config.show_progress);
        assert!(config.vendors_per_company > 0);
        assert!(config.customers_per_company > 0);
    }

    /// No chart of accounts exists before `generate()` has run.
    #[test]
    fn test_get_coa_before_generation() {
        let config = create_test_config();
        let orchestrator = EnhancedOrchestrator::with_defaults(config).unwrap();

        assert!(orchestrator.get_coa().is_none());
    }

    /// A chart of accounts is available after journal-entry generation.
    #[test]
    fn test_get_coa_after_generation() {
        let config = create_test_config();
        let phase_config = PhaseConfig {
            generate_master_data: false,
            generate_document_flows: false,
            generate_journal_entries: true,
            inject_anomalies: false,
            show_progress: false,
            ..Default::default()
        };

        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config).unwrap();
        let _ = orchestrator.generate().unwrap();

        assert!(orchestrator.get_coa().is_some());
    }

    /// `get_master_data` exposes the vendors generated by the run.
    #[test]
    fn test_get_master_data() {
        let config = create_test_config();
        let phase_config = PhaseConfig {
            generate_master_data: true,
            generate_document_flows: false,
            generate_journal_entries: false,
            inject_anomalies: false,
            show_progress: false,
            vendors_per_company: 5,
            customers_per_company: 5,
            materials_per_company: 5,
            assets_per_company: 5,
            employees_per_company: 5,
            ..Default::default()
        };

        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config).unwrap();
        let _ = orchestrator.generate().unwrap();

        let master_data = orchestrator.get_master_data();
        assert!(!master_data.vendors.is_empty());
    }

    /// `with_progress(false)` disables progress output on the phase config.
    #[test]
    fn test_with_progress_builder() {
        let config = create_test_config();
        let orchestrator = EnhancedOrchestrator::with_defaults(config)
            .unwrap()
            .with_progress(false);

        assert!(!orchestrator.phase_config.show_progress);
    }

    /// Master data is generated per company, so a second company at least
    /// doubles the vendor and customer counts.
    #[test]
    fn test_multi_company_generation() {
        let mut config = create_test_config();
        config.companies.push(CompanyConfig {
            code: "2000".to_string(),
            name: "Subsidiary".to_string(),
            currency: "EUR".to_string(),
            country: "DE".to_string(),
            annual_transaction_volume: TransactionVolume::TenK,
            volume_weight: 0.5,
            fiscal_year_variant: "K4".to_string(),
        });

        let phase_config = PhaseConfig {
            generate_master_data: true,
            generate_document_flows: false,
            generate_journal_entries: true,
            inject_anomalies: false,
            show_progress: false,
            vendors_per_company: 5,
            customers_per_company: 5,
            materials_per_company: 5,
            assets_per_company: 5,
            employees_per_company: 5,
            ..Default::default()
        };

        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config).unwrap();
        let result = orchestrator.generate().unwrap();

        assert!(result.statistics.vendor_count >= 10);
        assert!(result.statistics.customer_count >= 10);
        assert_eq!(result.statistics.companies_count, 2);
    }

    /// Without master data there are no counterparties, so document flows are skipped.
    #[test]
    fn test_empty_master_data_skips_document_flows() {
        let config = create_test_config();
        let phase_config = PhaseConfig {
            generate_master_data: false,
            generate_document_flows: true,
            generate_journal_entries: false,
            inject_anomalies: false,
            show_progress: false,
            ..Default::default()
        };

        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config).unwrap();
        let result = orchestrator.generate().unwrap();

        assert!(result.document_flows.p2p_chains.is_empty());
        assert!(result.document_flows.o2c_chains.is_empty());
    }

    /// `total_line_items` equals the sum of line counts across all entries.
    #[test]
    fn test_journal_entry_line_item_count() {
        let config = create_test_config();
        let phase_config = PhaseConfig {
            generate_master_data: false,
            generate_document_flows: false,
            generate_journal_entries: true,
            inject_anomalies: false,
            show_progress: false,
            ..Default::default()
        };

        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config).unwrap();
        let result = orchestrator.generate().unwrap();

        let calculated_line_items: u64 = result
            .journal_entries
            .iter()
            .map(|e| e.line_count() as u64)
            .sum();
        assert_eq!(result.statistics.total_line_items, calculated_line_items);
    }

    /// Audit generation fills every audit collection, and the statistics
    /// counters match the collection sizes.
    #[test]
    fn test_audit_generation() {
        let config = create_test_config();
        let phase_config = PhaseConfig {
            generate_master_data: false,
            generate_document_flows: false,
            generate_journal_entries: true,
            inject_anomalies: false,
            show_progress: false,
            generate_audit: true,
            audit_engagements: 2,
            workpapers_per_engagement: 5,
            evidence_per_workpaper: 2,
            risks_per_engagement: 3,
            findings_per_engagement: 2,
            judgments_per_engagement: 2,
            ..Default::default()
        };

        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config).unwrap();
        let result = orchestrator.generate().unwrap();

        assert_eq!(result.audit.engagements.len(), 2);
        assert!(!result.audit.workpapers.is_empty());
        assert!(!result.audit.evidence.is_empty());
        assert!(!result.audit.risk_assessments.is_empty());
        assert!(!result.audit.findings.is_empty());
        assert!(!result.audit.judgments.is_empty());

        assert_eq!(
            result.statistics.audit_engagement_count,
            result.audit.engagements.len()
        );
        assert_eq!(
            result.statistics.audit_workpaper_count,
            result.audit.workpapers.len()
        );
        assert_eq!(
            result.statistics.audit_evidence_count,
            result.audit.evidence.len()
        );
        assert_eq!(
            result.statistics.audit_risk_count,
            result.audit.risk_assessments.len()
        );
        assert_eq!(
            result.statistics.audit_finding_count,
            result.audit.findings.len()
        );
        assert_eq!(
            result.statistics.audit_judgment_count,
            result.audit.judgments.len()
        );
    }
}