1use std::sync::Arc;
36use std::thread;
37use std::time::Instant;
38
39use chrono::NaiveDate;
40use tracing::{info, warn};
41
42use datasynth_config::schema::GeneratorConfig;
43
/// Seed used by every generation phase when `global.seed` is not set in the configuration.
const DEFAULT_SEED: u64 = 42;
46use datasynth_core::error::SynthResult;
47use datasynth_core::models::{
48 documents::{
49 CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
50 },
51 AnomalyRateConfig, ChartOfAccounts, Customer, Employee, JournalEntry, LabeledAnomaly, Material,
52 Vendor,
53};
54use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
55use datasynth_core::traits::{
56 BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
57};
58
/// A single item emitted on the generation stream (inside `StreamEvent::Data`).
///
/// Entity payloads are boxed; control-style variants (`Progress`, `PhaseComplete`)
/// carry their values inline.
#[derive(Debug, Clone)]
pub enum GeneratedItem {
    /// The generated chart of accounts (emitted once by the CoA phase).
    ChartOfAccounts(Box<ChartOfAccounts>),
    /// A vendor master record.
    Vendor(Box<Vendor>),
    /// A customer master record.
    Customer(Box<Customer>),
    /// A material master record.
    Material(Box<Material>),
    /// An employee master record.
    Employee(Box<Employee>),
    /// A journal entry (possibly modified by anomaly injection).
    JournalEntry(Box<JournalEntry>),
    /// Purchase order from a procure-to-pay chain.
    PurchaseOrder(Box<PurchaseOrder>),
    /// Goods receipt from a procure-to-pay chain.
    GoodsReceipt(Box<GoodsReceipt>),
    /// Vendor invoice from a procure-to-pay chain.
    VendorInvoice(Box<VendorInvoice>),
    /// Payment from a procure-to-pay chain.
    Payment(Box<Payment>),
    /// Sales order from an order-to-cash chain.
    SalesOrder(Box<SalesOrder>),
    /// Delivery from an order-to-cash chain.
    Delivery(Box<Delivery>),
    /// Customer invoice from an order-to-cash chain.
    CustomerInvoice(Box<CustomerInvoice>),
    /// Label describing an anomaly injected into streamed journal entries.
    AnomalyLabel(Box<LabeledAnomaly>),
    /// A progress snapshot carried as a data item. Note that the orchestrator in this
    /// module reports progress via `StreamEvent::Progress` instead.
    Progress(StreamProgress),
    /// Marker sent after a phase finishes; carries the phase's `GenerationPhase::name()`.
    PhaseComplete(String),
}
95
96impl GeneratedItem {
97 pub fn type_name(&self) -> &'static str {
99 match self {
100 GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
101 GeneratedItem::Vendor(_) => "vendor",
102 GeneratedItem::Customer(_) => "customer",
103 GeneratedItem::Material(_) => "material",
104 GeneratedItem::Employee(_) => "employee",
105 GeneratedItem::JournalEntry(_) => "journal_entry",
106 GeneratedItem::PurchaseOrder(_) => "purchase_order",
107 GeneratedItem::GoodsReceipt(_) => "goods_receipt",
108 GeneratedItem::VendorInvoice(_) => "vendor_invoice",
109 GeneratedItem::Payment(_) => "payment",
110 GeneratedItem::SalesOrder(_) => "sales_order",
111 GeneratedItem::Delivery(_) => "delivery",
112 GeneratedItem::CustomerInvoice(_) => "customer_invoice",
113 GeneratedItem::AnomalyLabel(_) => "anomaly_label",
114 GeneratedItem::Progress(_) => "progress",
115 GeneratedItem::PhaseComplete(_) => "phase_complete",
116 }
117 }
118}
119
/// A stage of the generation pipeline. Phases run in the order listed in
/// `StreamingOrchestratorConfig::phases`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GenerationPhase {
    /// Generate and stream the chart of accounts.
    ChartOfAccounts,
    /// Generate and stream master data (vendors, customers, employees, ...).
    MasterData,
    /// Generate and stream P2P / O2C document chains.
    DocumentFlows,
    /// OCPM event generation; currently skipped (with a warning) in streaming mode.
    OcpmEvents,
    /// Generate and stream journal entries.
    JournalEntries,
    /// Anomaly injection; in streaming mode it is applied inline during the
    /// journal-entry phase, so this phase itself does no work.
    AnomalyInjection,
    /// Balance validation; not applicable in streaming mode.
    BalanceValidation,
    /// Data-quality injection; currently skipped in streaming mode.
    DataQuality,
    /// Terminal marker phase; not applicable in streaming mode.
    Complete,
}
142
143impl GenerationPhase {
144 pub fn name(&self) -> &'static str {
146 match self {
147 GenerationPhase::ChartOfAccounts => "chart_of_accounts",
148 GenerationPhase::MasterData => "master_data",
149 GenerationPhase::DocumentFlows => "document_flows",
150 GenerationPhase::OcpmEvents => "ocpm_events",
151 GenerationPhase::JournalEntries => "journal_entries",
152 GenerationPhase::AnomalyInjection => "anomaly_injection",
153 GenerationPhase::BalanceValidation => "balance_validation",
154 GenerationPhase::DataQuality => "data_quality",
155 GenerationPhase::Complete => "complete",
156 }
157 }
158}
159
/// Configuration for a [`StreamingOrchestrator`] run.
#[derive(Debug, Clone)]
pub struct StreamingOrchestratorConfig {
    /// Generator configuration read by every phase (seed, dates, companies, volumes, ...).
    pub generator_config: GeneratorConfig,
    /// Stream channel settings: buffer size, backpressure strategy and progress interval.
    pub stream_config: StreamConfig,
    /// Phases to run, in execution order.
    pub phases: Vec<GenerationPhase>,
}
170
171impl StreamingOrchestratorConfig {
172 pub fn new(generator_config: GeneratorConfig) -> Self {
174 Self {
175 generator_config,
176 stream_config: StreamConfig::default(),
177 phases: vec![
178 GenerationPhase::ChartOfAccounts,
179 GenerationPhase::MasterData,
180 GenerationPhase::DocumentFlows,
181 GenerationPhase::JournalEntries,
182 ],
183 }
184 }
185
186 pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
188 Self {
189 generator_config,
190 stream_config: StreamConfig::default(),
191 phases: vec![
192 GenerationPhase::ChartOfAccounts,
193 GenerationPhase::MasterData,
194 GenerationPhase::DocumentFlows,
195 GenerationPhase::OcpmEvents,
196 GenerationPhase::JournalEntries,
197 GenerationPhase::AnomalyInjection,
198 GenerationPhase::DataQuality,
199 ],
200 }
201 }
202
203 pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
205 self.stream_config = config;
206 self
207 }
208
209 pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
211 self.phases = phases;
212 self
213 }
214}
215
/// Runs the configured generation phases on a background thread and streams the
/// results over a channel created from `stream_config`.
pub struct StreamingOrchestrator {
    /// Phases, stream settings and generator configuration for this orchestrator.
    config: StreamingOrchestratorConfig,
}
220
221impl StreamingOrchestrator {
222 pub fn new(config: StreamingOrchestratorConfig) -> Self {
224 Self { config }
225 }
226
227 pub fn from_generator_config(config: GeneratorConfig) -> Self {
229 Self::new(StreamingOrchestratorConfig::new(config))
230 }
231
232 pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
236 let (sender, receiver) = stream_channel(
237 self.config.stream_config.buffer_size,
238 self.config.stream_config.backpressure,
239 );
240
241 let control = Arc::new(StreamControl::new());
242 let control_clone = Arc::clone(&control);
243
244 let config = self.config.clone();
245
246 thread::spawn(move || {
248 let result = Self::run_generation(config, sender, control_clone);
249 if let Err(e) = result {
250 warn!("Streaming generation error: {}", e);
251 }
252 });
253
254 Ok((receiver, control))
255 }
256
257 fn run_generation(
259 config: StreamingOrchestratorConfig,
260 sender: StreamSender<GeneratedItem>,
261 control: Arc<StreamControl>,
262 ) -> SynthResult<()> {
263 let start_time = Instant::now();
264 let mut items_generated: u64 = 0;
265 let mut phases_completed = Vec::new();
266
267 let progress_interval = config.stream_config.progress_interval;
269
270 let mut progress = StreamProgress::new("initializing");
272 sender.send(StreamEvent::Progress(progress.clone()))?;
273
274 for phase in &config.phases {
275 if control.is_cancelled() {
276 info!("Generation cancelled");
277 break;
278 }
279
280 while control.is_paused() {
282 std::thread::sleep(std::time::Duration::from_millis(100));
283 if control.is_cancelled() {
284 break;
285 }
286 }
287
288 progress.phase = phase.name().to_string();
289 sender.send(StreamEvent::Progress(progress.clone()))?;
290
291 match phase {
292 GenerationPhase::ChartOfAccounts => {
293 let result =
294 Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
295 items_generated += result;
296 }
297 GenerationPhase::MasterData => {
298 let result = Self::generate_master_data_phase(
299 &config.generator_config,
300 &sender,
301 &control,
302 )?;
303 items_generated += result;
304 }
305 GenerationPhase::DocumentFlows => {
306 let result = Self::generate_document_flows_phase(
307 &config.generator_config,
308 &sender,
309 &control,
310 progress_interval,
311 &mut progress,
312 )?;
313 items_generated += result;
314 }
315 GenerationPhase::OcpmEvents => {
316 warn!("OCPM event generation is not yet supported in streaming mode; skipping");
317 }
318 GenerationPhase::JournalEntries => {
319 let result = Self::generate_journal_entries_phase(
320 &config.generator_config,
321 &sender,
322 &control,
323 progress_interval,
324 &mut progress,
325 )?;
326 items_generated += result;
327 }
328 GenerationPhase::AnomalyInjection => {
329 info!("Anomaly injection applied inline during JE generation phase in streaming mode");
330 }
331 GenerationPhase::DataQuality => {
332 info!(
333 "Data quality injection is not yet supported in streaming mode; skipping"
334 );
335 }
336 GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
337 info!("Phase {:?} is not applicable in streaming mode", phase);
338 }
339 }
340
341 sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
343 phase.name().to_string(),
344 )))?;
345 phases_completed.push(phase.name().to_string());
346
347 progress.items_generated = items_generated;
349 progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
350 if progress.elapsed_ms > 0 {
351 progress.items_per_second =
352 (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
353 }
354 sender.send(StreamEvent::Progress(progress.clone()))?;
355 }
356
357 let stats = sender.stats();
359 let summary = StreamSummary {
360 total_items: items_generated,
361 total_time_ms: start_time.elapsed().as_millis() as u64,
362 avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
363 (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
364 } else {
365 0.0
366 },
367 error_count: 0,
368 dropped_count: stats.items_dropped,
369 peak_memory_mb: None,
370 phases_completed,
371 };
372
373 sender.send(StreamEvent::Complete(summary))?;
374 sender.close();
375
376 Ok(())
377 }
378
379 fn generate_coa_phase(
381 config: &GeneratorConfig,
382 sender: &StreamSender<GeneratedItem>,
383 control: &Arc<StreamControl>,
384 ) -> SynthResult<u64> {
385 use datasynth_generators::ChartOfAccountsGenerator;
386
387 if control.is_cancelled() {
388 return Ok(0);
389 }
390
391 info!("Generating Chart of Accounts");
392 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
393 let complexity = config.chart_of_accounts.complexity;
394 let industry = config.global.industry;
395 let coa_framework = resolve_coa_framework_from_config(config);
396
397 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
398 .with_coa_framework(coa_framework);
399 let coa = coa_gen.generate();
400
401 let account_count = coa.account_count() as u64;
402 sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
403 coa,
404 ))))?;
405
406 Ok(account_count)
407 }
408
409 fn generate_master_data_phase(
411 config: &GeneratorConfig,
412 sender: &StreamSender<GeneratedItem>,
413 control: &Arc<StreamControl>,
414 ) -> SynthResult<u64> {
415 use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
416
417 let mut count: u64 = 0;
418 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
419 let md_config = &config.master_data;
420 let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
421 .unwrap_or_else(|e| {
422 tracing::warn!(
423 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
424 config.global.start_date,
425 e
426 );
427 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
428 });
429
430 let company_code = config
431 .companies
432 .first()
433 .map(|c| c.code.as_str())
434 .unwrap_or_else(|| {
435 tracing::warn!("No companies configured, defaulting to company code '1000'");
436 "1000"
437 });
438
439 if control.is_cancelled() {
441 return Ok(count);
442 }
443
444 info!("Generating vendors");
445 let mut vendor_gen = VendorGenerator::new(seed);
446 for _ in 0..md_config.vendors.count {
447 if control.is_cancelled() {
448 break;
449 }
450 let vendor = vendor_gen.generate_vendor(company_code, effective_date);
451 sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
452 count += 1;
453 }
454
455 if control.is_cancelled() {
457 return Ok(count);
458 }
459
460 info!("Generating customers");
461 let mut customer_gen = CustomerGenerator::new(seed + 1);
462 for _ in 0..md_config.customers.count {
463 if control.is_cancelled() {
464 break;
465 }
466 let customer = customer_gen.generate_customer(company_code, effective_date);
467 sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
468 customer,
469 ))))?;
470 count += 1;
471 }
472
473 if control.is_cancelled() {
475 return Ok(count);
476 }
477
478 info!("Generating employees");
479 let mut employee_gen = EmployeeGenerator::new(seed + 4);
480 let dept = if let Some(first_custom) = config.departments.custom_departments.first() {
482 datasynth_generators::DepartmentDefinition {
483 code: first_custom.code.clone(),
484 name: first_custom.name.clone(),
485 cost_center: first_custom
486 .cost_center
487 .clone()
488 .unwrap_or_else(|| format!("CC{}", first_custom.code)),
489 headcount: 10,
490 system_roles: vec![],
491 transaction_codes: vec![],
492 }
493 } else {
494 warn!("No departments configured, using default 'General' department");
495 datasynth_generators::DepartmentDefinition {
496 code: "1000".to_string(),
497 name: "General".to_string(),
498 cost_center: "CC1000".to_string(),
499 headcount: 10,
500 system_roles: vec![],
501 transaction_codes: vec![],
502 }
503 };
504 for _ in 0..md_config.employees.count {
505 if control.is_cancelled() {
506 break;
507 }
508 let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
509 sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
510 employee,
511 ))))?;
512 count += 1;
513 }
514
515 Ok(count)
516 }
517
518 fn generate_journal_entries_phase(
526 config: &GeneratorConfig,
527 sender: &StreamSender<GeneratedItem>,
528 control: &Arc<StreamControl>,
529 progress_interval: u64,
530 progress: &mut StreamProgress,
531 ) -> SynthResult<u64> {
532 use datasynth_generators::{
533 AnomalyInjector, AnomalyInjectorConfig, ChartOfAccountsGenerator, JournalEntryGenerator,
534 };
535 use std::sync::Arc;
536
537 let mut count: u64 = 0;
538 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
539
540 let default_monthly = 500;
542 let total_entries: usize = config
543 .companies
544 .iter()
545 .map(|c| {
546 let monthly = (c.volume_weight * default_monthly as f64) as usize;
547 monthly.max(100) * config.global.period_months as usize
548 })
549 .sum();
550
551 progress.items_remaining = Some(total_entries as u64);
552 info!("Generating {} journal entries", total_entries);
553
554 let complexity = config.chart_of_accounts.complexity;
556 let industry = config.global.industry;
557 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
558 let coa = Arc::new(coa_gen.generate());
559
560 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
562 .unwrap_or_else(|e| {
563 tracing::warn!(
564 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
565 config.global.start_date,
566 e
567 );
568 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
569 });
570 let end_date = start_date
571 .checked_add_months(chrono::Months::new(config.global.period_months))
572 .unwrap_or(start_date + chrono::Duration::days(365));
573
574 let mut je_gen = JournalEntryGenerator::from_generator_config(
576 config,
577 Arc::clone(&coa),
578 start_date,
579 end_date,
580 seed,
581 );
582
583 let anomaly_enabled = config.anomaly_injection.enabled || config.fraud.enabled;
586 let mut anomaly_injector = if anomaly_enabled {
587 let total_rate = if config.anomaly_injection.enabled {
588 config.anomaly_injection.rates.total_rate
589 } else {
590 config.fraud.fraud_rate
591 };
592 let fraud_rate = if config.anomaly_injection.enabled {
593 config.anomaly_injection.rates.fraud_rate
594 } else {
595 AnomalyRateConfig::default().fraud_rate
596 };
597 let error_rate = if config.anomaly_injection.enabled {
598 config.anomaly_injection.rates.error_rate
599 } else {
600 AnomalyRateConfig::default().error_rate
601 };
602 let process_issue_rate = if config.anomaly_injection.enabled {
603 config.anomaly_injection.rates.process_rate
604 } else {
605 AnomalyRateConfig::default().process_issue_rate
606 };
607
608 let injector_config = AnomalyInjectorConfig {
609 rates: AnomalyRateConfig {
610 total_rate,
611 fraud_rate,
612 error_rate,
613 process_issue_rate,
614 ..Default::default()
615 },
616 seed: seed + 5000,
617 ..Default::default()
618 };
619
620 info!(
621 "Anomaly injection enabled for streaming JE phase (total_rate={:.3})",
622 total_rate
623 );
624 Some(AnomalyInjector::new(injector_config))
625 } else {
626 None
627 };
628
629 let batch_size: usize = if anomaly_injector.is_some() { 100 } else { 1 };
632 let mut remaining = total_entries;
633
634 while remaining > 0 {
635 if control.is_cancelled() {
636 break;
637 }
638
639 let current_batch = remaining.min(batch_size);
640 let mut batch: Vec<JournalEntry> = Vec::with_capacity(current_batch);
641
642 for _ in 0..current_batch {
643 if control.is_cancelled() {
644 break;
645 }
646
647 while control.is_paused() {
649 std::thread::sleep(std::time::Duration::from_millis(100));
650 if control.is_cancelled() {
651 break;
652 }
653 }
654
655 batch.push(je_gen.generate());
656 }
657
658 if batch.is_empty() {
659 break;
660 }
661
662 if let Some(ref mut injector) = anomaly_injector {
664 let result = injector.process_entries(&mut batch);
665
666 for label in result.labels {
668 sender.send(StreamEvent::Data(GeneratedItem::AnomalyLabel(Box::new(
669 label,
670 ))))?;
671 }
672 }
673
674 for je in batch {
676 sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
677 count += 1;
678
679 if count.is_multiple_of(progress_interval) {
681 progress.items_generated = count;
682 progress.items_remaining = Some(total_entries as u64 - count);
683 sender.send(StreamEvent::Progress(progress.clone()))?;
684 }
685 }
686
687 remaining = remaining.saturating_sub(current_batch);
688 }
689
690 Ok(count)
691 }
692
693 fn generate_document_flows_phase(
699 config: &GeneratorConfig,
700 sender: &StreamSender<GeneratedItem>,
701 control: &Arc<StreamControl>,
702 progress_interval: u64,
703 progress: &mut StreamProgress,
704 ) -> SynthResult<u64> {
705 use chrono::Datelike;
706 use datasynth_generators::{
707 CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
708 };
709
710 let mut count: u64 = 0;
711 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
712 let df_config = &config.document_flows;
713 let md_config = &config.master_data;
714
715 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
717 .unwrap_or_else(|e| {
718 tracing::warn!(
719 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
720 config.global.start_date,
721 e
722 );
723 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
724 });
725 let end_date = start_date
726 .checked_add_months(chrono::Months::new(config.global.period_months))
727 .unwrap_or(start_date + chrono::Duration::days(365));
728 let total_period_days = (end_date - start_date).num_days().max(1);
729
730 let company_code = config
731 .companies
732 .first()
733 .map(|c| c.code.as_str())
734 .unwrap_or_else(|| {
735 tracing::warn!("No companies configured, defaulting to company code '1000'");
736 "1000"
737 });
738
739 let vendor_count = md_config.vendors.count.min(100);
741 let customer_count = md_config.customers.count.min(100);
742 let material_count = md_config.materials.count.min(50);
743
744 let mut vendor_gen = VendorGenerator::new(seed);
746 let mut customer_gen = CustomerGenerator::new(seed + 1);
747 let mut material_gen = MaterialGenerator::new(seed + 2);
748
749 let vendors: Vec<_> = (0..vendor_count)
750 .map(|_| vendor_gen.generate_vendor(company_code, start_date))
751 .collect();
752
753 let customers: Vec<_> = (0..customer_count)
754 .map(|_| customer_gen.generate_customer(company_code, start_date))
755 .collect();
756
757 let materials: Vec<_> = (0..material_count)
758 .map(|_| material_gen.generate_material(company_code, start_date))
759 .collect();
760
761 let base_chains = (config.global.period_months as usize * 50).max(100);
764
765 if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
767 info!("Generating P2P document flows");
768 let mut p2p_gen = P2PGenerator::new(seed + 100);
769
770 let chains_to_generate = base_chains.min(1000);
771 progress.items_remaining = Some(chains_to_generate as u64);
772
773 for i in 0..chains_to_generate {
774 if control.is_cancelled() {
775 break;
776 }
777
778 while control.is_paused() {
780 std::thread::sleep(std::time::Duration::from_millis(100));
781 if control.is_cancelled() {
782 break;
783 }
784 }
785
786 let vendor = &vendors[i % vendors.len()];
787 let material_refs: Vec<&datasynth_core::models::Material> =
788 vec![&materials[i % materials.len()]];
789
790 let days_offset = (i as i64 % total_period_days).max(0);
792 let po_date = start_date + chrono::Duration::days(days_offset);
793 let fiscal_year = po_date.year() as u16;
794 let fiscal_period = po_date.month() as u8;
795
796 let chain = p2p_gen.generate_chain(
797 company_code,
798 vendor,
799 &material_refs,
800 po_date,
801 fiscal_year,
802 fiscal_period,
803 "SYSTEM",
804 );
805
806 sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
808 chain.purchase_order,
809 ))))?;
810 count += 1;
811
812 for gr in chain.goods_receipts {
813 sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
814 count += 1;
815 }
816
817 if let Some(vi) = chain.vendor_invoice {
818 sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
819 vi,
820 ))))?;
821 count += 1;
822 }
823
824 if let Some(payment) = chain.payment {
825 sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
826 count += 1;
827 }
828
829 if count.is_multiple_of(progress_interval) {
830 progress.items_generated = count;
831 sender.send(StreamEvent::Progress(progress.clone()))?;
832 }
833 }
834 }
835
836 if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
838 info!("Generating O2C document flows");
839 let mut o2c_gen = O2CGenerator::new(seed + 200);
840
841 let chains_to_generate = base_chains.min(1000);
842
843 for i in 0..chains_to_generate {
844 if control.is_cancelled() {
845 break;
846 }
847
848 while control.is_paused() {
849 std::thread::sleep(std::time::Duration::from_millis(100));
850 if control.is_cancelled() {
851 break;
852 }
853 }
854
855 let customer = &customers[i % customers.len()];
856 let material_refs: Vec<&datasynth_core::models::Material> =
857 vec![&materials[i % materials.len()]];
858
859 let days_offset = (i as i64 % total_period_days).max(0);
860 let so_date = start_date + chrono::Duration::days(days_offset);
861 let fiscal_year = so_date.year() as u16;
862 let fiscal_period = so_date.month() as u8;
863
864 let chain = o2c_gen.generate_chain(
865 company_code,
866 customer,
867 &material_refs,
868 so_date,
869 fiscal_year,
870 fiscal_period,
871 "SYSTEM",
872 );
873
874 sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
875 chain.sales_order,
876 ))))?;
877 count += 1;
878
879 for delivery in chain.deliveries {
880 sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
881 delivery,
882 ))))?;
883 count += 1;
884 }
885
886 if let Some(ci) = chain.customer_invoice {
887 sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
888 ci,
889 ))))?;
890 count += 1;
891 }
892
893 if count.is_multiple_of(progress_interval) {
894 progress.items_generated = count;
895 sender.send(StreamEvent::Progress(progress.clone()))?;
896 }
897 }
898 }
899
900 Ok(count)
901 }
902
903 pub fn stats(&self) -> StreamingOrchestratorStats {
905 StreamingOrchestratorStats {
906 phases: self.config.phases.len(),
907 buffer_size: self.config.stream_config.buffer_size,
908 backpressure: self.config.stream_config.backpressure,
909 }
910 }
911}
912
/// Static snapshot of a [`StreamingOrchestrator`]'s configuration.
#[derive(Debug, Clone)]
pub struct StreamingOrchestratorStats {
    /// Number of phases configured to run.
    pub phases: usize,
    /// Stream channel buffer size.
    pub buffer_size: usize,
    /// Backpressure strategy used by the stream channel.
    pub backpressure: BackpressureStrategy,
}
923
924fn resolve_coa_framework_from_config(
926 config: &GeneratorConfig,
927) -> datasynth_generators::coa_generator::CoAFramework {
928 use datasynth_generators::coa_generator::CoAFramework;
929 if config.accounting_standards.enabled {
930 match config.accounting_standards.framework {
931 Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
932 return CoAFramework::FrenchPcg;
933 }
934 Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
935 return CoAFramework::GermanSkr04;
936 }
937 _ => {}
938 }
939 }
940 CoAFramework::UsGaap
941}
942
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    /// Small retail preset shared by the tests below.
    fn create_test_config() -> GeneratorConfig {
        create_preset(
            IndustrySector::Retail,
            2,
            3,
            CoAComplexity::Small,
            TransactionVolume::TenK,
        )
    }

    #[test]
    fn test_streaming_orchestrator_creation() {
        let config = create_test_config();
        let orchestrator = StreamingOrchestrator::from_generator_config(config);
        let stats = orchestrator.stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    #[test]
    fn test_streaming_generation() {
        let mut config = create_test_config();
        // Keep master data tiny so the test stays fast.
        config.master_data.vendors.count = 5;
        config.master_data.customers.count = 5;
        config.master_data.employees.count = 5;
        config.global.period_months = 1;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![
                GenerationPhase::ChartOfAccounts,
                GenerationPhase::MasterData,
            ])
            .with_stream_config(StreamConfig {
                buffer_size: 100,
                progress_interval: 10,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut items_count = 0;
        let mut has_coa = false;
        let mut has_completion = false;

        for event in receiver {
            match event {
                StreamEvent::Data(item) => {
                    items_count += 1;
                    if matches!(item, GeneratedItem::ChartOfAccounts(_)) {
                        has_coa = true;
                    }
                }
                StreamEvent::Complete(_) => {
                    has_completion = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(items_count > 0);
        assert!(has_coa);
        assert!(has_completion);
    }

    #[test]
    fn test_stream_cancellation() {
        let mut config = create_test_config();
        // A long period yields far more entries than we consume before cancelling.
        config.global.period_months = 12;
        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries]);

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, control) = orchestrator.stream().unwrap();

        let mut items_count = 0;
        for event in receiver {
            if let StreamEvent::Data(_) = event {
                items_count += 1;
                if items_count >= 10 {
                    control.cancel();
                    break;
                }
            }
        }

        assert_eq!(items_count, 10);
        assert!(control.is_cancelled());
    }

    #[test]
    fn test_streaming_anomaly_injection() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;

        // A high total rate makes at least one label overwhelmingly likely.
        config.anomaly_injection.enabled = true;
        config.anomaly_injection.rates.total_rate = 0.20;
        config.anomaly_injection.rates.fraud_rate = 0.40;
        config.anomaly_injection.rates.error_rate = 0.40;
        config.anomaly_injection.rates.process_rate = 0.20;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(StreamConfig {
                buffer_size: 500,
                progress_interval: 50,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut je_count = 0;
        let mut label_count = 0;
        let mut has_completion = false;

        for event in receiver {
            match event {
                StreamEvent::Data(item) => match item {
                    GeneratedItem::JournalEntry(_) => je_count += 1,
                    GeneratedItem::AnomalyLabel(_) => label_count += 1,
                    _ => {}
                },
                StreamEvent::Complete(_) => {
                    has_completion = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(has_completion, "Stream should complete");
        assert!(je_count > 0, "Should generate journal entries");
        assert!(
            label_count > 0,
            "Should generate anomaly labels (got {} JEs, {} labels)",
            je_count,
            label_count
        );
    }

    #[test]
    fn test_streaming_no_anomalies_when_disabled() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;

        // Both switches that enable the injector must be off.
        config.anomaly_injection.enabled = false;
        config.fraud.enabled = false;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(StreamConfig {
                buffer_size: 500,
                progress_interval: 50,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut label_count = 0;

        for event in receiver {
            match event {
                StreamEvent::Data(GeneratedItem::AnomalyLabel(_)) => label_count += 1,
                StreamEvent::Complete(_) => break,
                _ => {}
            }
        }

        assert_eq!(
            label_count, 0,
            "Should not generate anomaly labels when disabled"
        );
    }

    #[test]
    fn test_generated_item_type_name() {
        let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        )));
        assert_eq!(coa.type_name(), "chart_of_accounts");

        let progress = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress.type_name(), "progress");
    }

    #[test]
    fn test_generation_phase_names_are_unique() {
        let phases = [
            GenerationPhase::ChartOfAccounts,
            GenerationPhase::MasterData,
            GenerationPhase::DocumentFlows,
            GenerationPhase::OcpmEvents,
            GenerationPhase::JournalEntries,
            GenerationPhase::AnomalyInjection,
            GenerationPhase::BalanceValidation,
            GenerationPhase::DataQuality,
            GenerationPhase::Complete,
        ];
        let names: std::collections::HashSet<&str> = phases.iter().map(|p| p.name()).collect();
        assert_eq!(names.len(), phases.len());
    }

    #[test]
    fn test_with_all_phases_includes_optional_phases() {
        let config = StreamingOrchestratorConfig::with_all_phases(create_test_config());
        assert!(config.phases.contains(&GenerationPhase::OcpmEvents));
        assert!(config.phases.contains(&GenerationPhase::AnomalyInjection));
        assert!(config.phases.contains(&GenerationPhase::DataQuality));
        assert_eq!(config.phases.first(), Some(&GenerationPhase::ChartOfAccounts));
    }
}