1use std::sync::Arc;
36use std::thread;
37use std::time::Instant;
38
39use chrono::NaiveDate;
40use tracing::{info, warn};
41
42use datasynth_config::schema::GeneratorConfig;
43
/// Seed used by the generation phases when `config.global.seed` is `None`.
const DEFAULT_SEED: u64 = 42;
46use datasynth_core::error::SynthResult;
47use datasynth_core::models::{
48 documents::{
49 CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
50 },
51 AnomalyRateConfig, ChartOfAccounts, Customer, Employee, JournalEntry, LabeledAnomaly, Material,
52 Vendor,
53};
54use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
55use datasynth_core::traits::{
56 BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
57};
58
/// Payload carried inside `StreamEvent::Data` by the streaming orchestrator.
///
/// Every model payload is boxed, so the enum itself stays small no matter how
/// large the individual model structs are.
#[derive(Debug, Clone)]
pub enum GeneratedItem {
    /// A generated chart of accounts.
    ChartOfAccounts(Box<ChartOfAccounts>),
    /// A generated vendor.
    Vendor(Box<Vendor>),
    /// A generated customer.
    Customer(Box<Customer>),
    /// A generated material.
    // NOTE(review): no phase in this file currently sends this variant — confirm
    // whether materials are meant to be streamed.
    Material(Box<Material>),
    /// A generated employee.
    Employee(Box<Employee>),
    /// A generated journal entry.
    JournalEntry(Box<JournalEntry>),
    /// Purchase order of a P2P document chain.
    PurchaseOrder(Box<PurchaseOrder>),
    /// Goods receipt of a P2P document chain.
    GoodsReceipt(Box<GoodsReceipt>),
    /// Vendor invoice of a P2P document chain.
    VendorInvoice(Box<VendorInvoice>),
    /// Payment of a P2P document chain.
    Payment(Box<Payment>),
    /// Sales order of an O2C document chain.
    SalesOrder(Box<SalesOrder>),
    /// Delivery of an O2C document chain.
    Delivery(Box<Delivery>),
    /// Customer invoice of an O2C document chain.
    CustomerInvoice(Box<CustomerInvoice>),
    /// Label describing an anomaly injected into the journal entries.
    AnomalyLabel(Box<LabeledAnomaly>),
    /// A progress snapshot wrapped as a data item.
    // NOTE(review): the orchestrator reports progress through
    // `StreamEvent::Progress`; this variant is only constructed in tests here.
    Progress(StreamProgress),
    /// Name of a phase that has finished (the value of `GenerationPhase::name`).
    PhaseComplete(String),
}
95
96impl GeneratedItem {
97 pub fn type_name(&self) -> &'static str {
99 match self {
100 GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
101 GeneratedItem::Vendor(_) => "vendor",
102 GeneratedItem::Customer(_) => "customer",
103 GeneratedItem::Material(_) => "material",
104 GeneratedItem::Employee(_) => "employee",
105 GeneratedItem::JournalEntry(_) => "journal_entry",
106 GeneratedItem::PurchaseOrder(_) => "purchase_order",
107 GeneratedItem::GoodsReceipt(_) => "goods_receipt",
108 GeneratedItem::VendorInvoice(_) => "vendor_invoice",
109 GeneratedItem::Payment(_) => "payment",
110 GeneratedItem::SalesOrder(_) => "sales_order",
111 GeneratedItem::Delivery(_) => "delivery",
112 GeneratedItem::CustomerInvoice(_) => "customer_invoice",
113 GeneratedItem::AnomalyLabel(_) => "anomaly_label",
114 GeneratedItem::Progress(_) => "progress",
115 GeneratedItem::PhaseComplete(_) => "phase_complete",
116 }
117 }
118}
119
/// A stage of the generation pipeline that the streaming orchestrator can be
/// asked to run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GenerationPhase {
    /// Chart of accounts generation.
    ChartOfAccounts,
    /// Master data generation (vendors, customers, employees).
    MasterData,
    /// P2P / O2C document flow generation.
    DocumentFlows,
    /// OCPM event generation.
    OcpmEvents,
    /// Journal entry generation.
    JournalEntries,
    /// Anomaly injection.
    AnomalyInjection,
    /// Balance validation.
    BalanceValidation,
    /// Data quality injection.
    DataQuality,
    /// Terminal marker phase.
    Complete,
}

impl GenerationPhase {
    /// Returns the snake_case identifier of this phase.
    pub fn name(&self) -> &'static str {
        let label = match *self {
            Self::ChartOfAccounts => "chart_of_accounts",
            Self::MasterData => "master_data",
            Self::DocumentFlows => "document_flows",
            Self::OcpmEvents => "ocpm_events",
            Self::JournalEntries => "journal_entries",
            Self::AnomalyInjection => "anomaly_injection",
            Self::BalanceValidation => "balance_validation",
            Self::DataQuality => "data_quality",
            Self::Complete => "complete",
        };
        label
    }
}
159
/// Everything the streaming orchestrator needs to run: what to generate, how
/// to stream it, and which phases to execute.
#[derive(Debug, Clone)]
pub struct StreamingOrchestratorConfig {
    /// Generator configuration handed to every phase.
    pub generator_config: GeneratorConfig,
    /// Channel settings; the orchestrator reads `buffer_size`, `backpressure`
    /// and `progress_interval` from it.
    pub stream_config: StreamConfig,
    /// Phases to run, in execution order.
    pub phases: Vec<GenerationPhase>,
}
170
171impl StreamingOrchestratorConfig {
172 pub fn new(generator_config: GeneratorConfig) -> Self {
174 Self {
175 generator_config,
176 stream_config: StreamConfig::default(),
177 phases: vec![
178 GenerationPhase::ChartOfAccounts,
179 GenerationPhase::MasterData,
180 GenerationPhase::DocumentFlows,
181 GenerationPhase::JournalEntries,
182 ],
183 }
184 }
185
186 pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
188 Self {
189 generator_config,
190 stream_config: StreamConfig::default(),
191 phases: vec![
192 GenerationPhase::ChartOfAccounts,
193 GenerationPhase::MasterData,
194 GenerationPhase::DocumentFlows,
195 GenerationPhase::OcpmEvents,
196 GenerationPhase::JournalEntries,
197 GenerationPhase::AnomalyInjection,
198 GenerationPhase::DataQuality,
199 ],
200 }
201 }
202
203 pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
205 self.stream_config = config;
206 self
207 }
208
209 pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
211 self.phases = phases;
212 self
213 }
214}
215
/// Runs the configured generation phases on a background thread and exposes
/// the results as a stream of events.
pub struct StreamingOrchestrator {
    /// Configuration cloned into the worker thread on every `stream()` call.
    config: StreamingOrchestratorConfig,
}
220
221impl StreamingOrchestrator {
222 pub fn new(config: StreamingOrchestratorConfig) -> Self {
224 Self { config }
225 }
226
227 pub fn from_generator_config(config: GeneratorConfig) -> Self {
229 Self::new(StreamingOrchestratorConfig::new(config))
230 }
231
232 pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
236 let (sender, receiver) = stream_channel(
237 self.config.stream_config.buffer_size,
238 self.config.stream_config.backpressure,
239 );
240
241 let control = Arc::new(StreamControl::new());
242 let control_clone = Arc::clone(&control);
243
244 let config = self.config.clone();
245
246 thread::spawn(move || {
248 let result = Self::run_generation(config, sender, control_clone);
249 if let Err(e) = result {
250 warn!("Streaming generation error: {}", e);
251 }
252 });
253
254 Ok((receiver, control))
255 }
256
257 fn run_generation(
259 config: StreamingOrchestratorConfig,
260 sender: StreamSender<GeneratedItem>,
261 control: Arc<StreamControl>,
262 ) -> SynthResult<()> {
263 let start_time = Instant::now();
264 let mut items_generated: u64 = 0;
265 let mut phases_completed = Vec::new();
266
267 let progress_interval = config.stream_config.progress_interval;
269
270 let mut progress = StreamProgress::new("initializing");
272 sender.send(StreamEvent::Progress(progress.clone()))?;
273
274 for phase in &config.phases {
275 if control.is_cancelled() {
276 info!("Generation cancelled");
277 break;
278 }
279
280 while control.is_paused() {
282 std::thread::sleep(std::time::Duration::from_millis(100));
283 if control.is_cancelled() {
284 break;
285 }
286 }
287
288 progress.phase = phase.name().to_string();
289 sender.send(StreamEvent::Progress(progress.clone()))?;
290
291 match phase {
292 GenerationPhase::ChartOfAccounts => {
293 let result =
294 Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
295 items_generated += result;
296 }
297 GenerationPhase::MasterData => {
298 let result = Self::generate_master_data_phase(
299 &config.generator_config,
300 &sender,
301 &control,
302 )?;
303 items_generated += result;
304 }
305 GenerationPhase::DocumentFlows => {
306 let result = Self::generate_document_flows_phase(
307 &config.generator_config,
308 &sender,
309 &control,
310 progress_interval,
311 &mut progress,
312 )?;
313 items_generated += result;
314 }
315 GenerationPhase::OcpmEvents => {
316 warn!("OCPM event generation is not yet supported in streaming mode; skipping");
317 }
318 GenerationPhase::JournalEntries => {
319 let result = Self::generate_journal_entries_phase(
320 &config.generator_config,
321 &sender,
322 &control,
323 progress_interval,
324 &mut progress,
325 )?;
326 items_generated += result;
327 }
328 GenerationPhase::AnomalyInjection => {
329 info!("Anomaly injection applied inline during JE generation phase in streaming mode");
330 }
331 GenerationPhase::DataQuality => {
332 info!(
333 "Data quality injection is not yet supported in streaming mode; skipping"
334 );
335 }
336 GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
337 info!("Phase {:?} is not applicable in streaming mode", phase);
338 }
339 }
340
341 sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
343 phase.name().to_string(),
344 )))?;
345 phases_completed.push(phase.name().to_string());
346
347 progress.items_generated = items_generated;
349 progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
350 if progress.elapsed_ms > 0 {
351 progress.items_per_second =
352 (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
353 }
354 sender.send(StreamEvent::Progress(progress.clone()))?;
355 }
356
357 let stats = sender.stats();
359 let summary = StreamSummary {
360 total_items: items_generated,
361 total_time_ms: start_time.elapsed().as_millis() as u64,
362 avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
363 (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
364 } else {
365 0.0
366 },
367 error_count: 0,
368 dropped_count: stats.items_dropped,
369 peak_memory_mb: None,
370 phases_completed,
371 };
372
373 sender.send(StreamEvent::Complete(summary))?;
374 sender.close();
375
376 Ok(())
377 }
378
379 fn generate_coa_phase(
381 config: &GeneratorConfig,
382 sender: &StreamSender<GeneratedItem>,
383 control: &Arc<StreamControl>,
384 ) -> SynthResult<u64> {
385 use datasynth_generators::ChartOfAccountsGenerator;
386
387 if control.is_cancelled() {
388 return Ok(0);
389 }
390
391 info!("Generating Chart of Accounts");
392 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
393 let complexity = config.chart_of_accounts.complexity;
394 let industry = config.global.industry;
395 let coa_framework = resolve_coa_framework_from_config(config);
396
397 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
398 .with_coa_framework(coa_framework);
399 let coa = coa_gen.generate();
400
401 let account_count = coa.account_count() as u64;
402 sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
403 coa,
404 ))))?;
405
406 Ok(account_count)
407 }
408
409 fn generate_master_data_phase(
411 config: &GeneratorConfig,
412 sender: &StreamSender<GeneratedItem>,
413 control: &Arc<StreamControl>,
414 ) -> SynthResult<u64> {
415 use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
416
417 let mut count: u64 = 0;
418 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
419 let md_config = &config.master_data;
420 let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
421 .unwrap_or_else(|e| {
422 tracing::warn!(
423 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
424 config.global.start_date,
425 e
426 );
427 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
428 });
429
430 let company_code = config
431 .companies
432 .first()
433 .map(|c| c.code.as_str())
434 .unwrap_or_else(|| {
435 tracing::warn!("No companies configured, defaulting to company code '1000'");
436 "1000"
437 });
438
439 if control.is_cancelled() {
441 return Ok(count);
442 }
443
444 info!("Generating vendors");
445 let mut vendor_gen = VendorGenerator::new(seed);
446 for _ in 0..md_config.vendors.count {
447 if control.is_cancelled() {
448 break;
449 }
450 let vendor = vendor_gen.generate_vendor(company_code, effective_date);
451 sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
452 count += 1;
453 }
454
455 if control.is_cancelled() {
457 return Ok(count);
458 }
459
460 info!("Generating customers");
461 let mut customer_gen = CustomerGenerator::new(seed + 1);
462 for _ in 0..md_config.customers.count {
463 if control.is_cancelled() {
464 break;
465 }
466 let customer = customer_gen.generate_customer(company_code, effective_date);
467 sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
468 customer,
469 ))))?;
470 count += 1;
471 }
472
473 if control.is_cancelled() {
475 return Ok(count);
476 }
477
478 info!("Generating employees");
479 let mut employee_gen = EmployeeGenerator::new(seed + 4);
480 let dept = if let Some(first_custom) = config.departments.custom_departments.first() {
482 datasynth_generators::DepartmentDefinition {
483 code: first_custom.code.clone(),
484 name: first_custom.name.clone(),
485 cost_center: first_custom
486 .cost_center
487 .clone()
488 .unwrap_or_else(|| format!("CC{}", first_custom.code)),
489 headcount: 10,
490 system_roles: vec![],
491 transaction_codes: vec![],
492 }
493 } else {
494 warn!("No departments configured, using default 'General' department");
495 datasynth_generators::DepartmentDefinition {
496 code: "1000".to_string(),
497 name: "General".to_string(),
498 cost_center: "CC1000".to_string(),
499 headcount: 10,
500 system_roles: vec![],
501 transaction_codes: vec![],
502 }
503 };
504 for _ in 0..md_config.employees.count {
505 if control.is_cancelled() {
506 break;
507 }
508 let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
509 sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
510 employee,
511 ))))?;
512 count += 1;
513 }
514
515 Ok(count)
516 }
517
518 fn generate_journal_entries_phase(
526 config: &GeneratorConfig,
527 sender: &StreamSender<GeneratedItem>,
528 control: &Arc<StreamControl>,
529 progress_interval: u64,
530 progress: &mut StreamProgress,
531 ) -> SynthResult<u64> {
532 use datasynth_generators::{
533 AnomalyInjector, AnomalyInjectorConfig, ChartOfAccountsGenerator, JournalEntryGenerator,
534 };
535 use std::sync::Arc;
536
537 let mut count: u64 = 0;
538 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
539
540 let default_monthly = 500;
542 let total_entries: usize = config
543 .companies
544 .iter()
545 .map(|c| {
546 let monthly = (c.volume_weight * default_monthly as f64) as usize;
547 monthly.max(100) * config.global.period_months as usize
548 })
549 .sum();
550
551 progress.items_remaining = Some(total_entries as u64);
552 info!("Generating {} journal entries", total_entries);
553
554 let complexity = config.chart_of_accounts.complexity;
556 let industry = config.global.industry;
557 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
558 let coa = Arc::new(coa_gen.generate());
559
560 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
562 .unwrap_or_else(|e| {
563 tracing::warn!(
564 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
565 config.global.start_date,
566 e
567 );
568 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
569 });
570 let end_date = start_date
571 .checked_add_months(chrono::Months::new(config.global.period_months))
572 .unwrap_or(start_date + chrono::Duration::days(365));
573
574 let mut je_gen = JournalEntryGenerator::from_generator_config(
576 config,
577 Arc::clone(&coa),
578 start_date,
579 end_date,
580 seed,
581 );
582
583 let anomaly_enabled = config.anomaly_injection.enabled || config.fraud.enabled;
586 let mut anomaly_injector = if anomaly_enabled {
587 let total_rate = if config.anomaly_injection.enabled {
588 config.anomaly_injection.rates.total_rate
589 } else {
590 config.fraud.fraud_rate
591 };
592 let fraud_rate = if config.anomaly_injection.enabled {
593 config.anomaly_injection.rates.fraud_rate
594 } else {
595 AnomalyRateConfig::default().fraud_rate
596 };
597 let error_rate = if config.anomaly_injection.enabled {
598 config.anomaly_injection.rates.error_rate
599 } else {
600 AnomalyRateConfig::default().error_rate
601 };
602 let process_issue_rate = if config.anomaly_injection.enabled {
603 config.anomaly_injection.rates.process_rate
604 } else {
605 AnomalyRateConfig::default().process_issue_rate
606 };
607
608 let injector_config = AnomalyInjectorConfig {
609 rates: AnomalyRateConfig {
610 total_rate,
611 fraud_rate,
612 error_rate,
613 process_issue_rate,
614 ..Default::default()
615 },
616 seed: seed + 5000,
617 ..Default::default()
618 };
619
620 info!(
621 "Anomaly injection enabled for streaming JE phase (total_rate={:.3})",
622 total_rate
623 );
624 Some(AnomalyInjector::new(injector_config))
625 } else {
626 None
627 };
628
629 let batch_size: usize = if anomaly_injector.is_some() { 100 } else { 1 };
632 let mut remaining = total_entries;
633
634 while remaining > 0 {
635 if control.is_cancelled() {
636 break;
637 }
638
639 let current_batch = remaining.min(batch_size);
640 let mut batch: Vec<JournalEntry> = Vec::with_capacity(current_batch);
641
642 for _ in 0..current_batch {
643 if control.is_cancelled() {
644 break;
645 }
646
647 while control.is_paused() {
649 std::thread::sleep(std::time::Duration::from_millis(100));
650 if control.is_cancelled() {
651 break;
652 }
653 }
654
655 batch.push(je_gen.generate());
656 }
657
658 if batch.is_empty() {
659 break;
660 }
661
662 if let Some(ref mut injector) = anomaly_injector {
664 let result = injector.process_entries(&mut batch);
665
666 for label in result.labels {
668 sender.send(StreamEvent::Data(GeneratedItem::AnomalyLabel(Box::new(
669 label,
670 ))))?;
671 }
672 }
673
674 for je in batch {
676 sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
677 count += 1;
678
679 if count.is_multiple_of(progress_interval) {
681 progress.items_generated = count;
682 progress.items_remaining = Some(total_entries as u64 - count);
683 sender.send(StreamEvent::Progress(progress.clone()))?;
684 }
685 }
686
687 remaining = remaining.saturating_sub(current_batch);
688 }
689
690 Ok(count)
691 }
692
693 fn generate_document_flows_phase(
699 config: &GeneratorConfig,
700 sender: &StreamSender<GeneratedItem>,
701 control: &Arc<StreamControl>,
702 progress_interval: u64,
703 progress: &mut StreamProgress,
704 ) -> SynthResult<u64> {
705 use chrono::Datelike;
706 use datasynth_generators::{
707 CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
708 };
709
710 let mut count: u64 = 0;
711 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
712 let df_config = &config.document_flows;
713 let md_config = &config.master_data;
714
715 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
717 .unwrap_or_else(|e| {
718 tracing::warn!(
719 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
720 config.global.start_date,
721 e
722 );
723 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
724 });
725 let end_date = start_date
726 .checked_add_months(chrono::Months::new(config.global.period_months))
727 .unwrap_or(start_date + chrono::Duration::days(365));
728 let total_period_days = (end_date - start_date).num_days().max(1);
729
730 let company_code = config
731 .companies
732 .first()
733 .map(|c| c.code.as_str())
734 .unwrap_or_else(|| {
735 tracing::warn!("No companies configured, defaulting to company code '1000'");
736 "1000"
737 });
738
739 let vendor_count = md_config.vendors.count.min(100);
741 let customer_count = md_config.customers.count.min(100);
742 let material_count = md_config.materials.count.min(50);
743
744 let mut vendor_gen = VendorGenerator::new(seed);
746 let mut customer_gen = CustomerGenerator::new(seed + 1);
747 let mut material_gen = MaterialGenerator::new(seed + 2);
748
749 let vendors: Vec<_> = (0..vendor_count)
750 .map(|_| vendor_gen.generate_vendor(company_code, start_date))
751 .collect();
752
753 let customers: Vec<_> = (0..customer_count)
754 .map(|_| customer_gen.generate_customer(company_code, start_date))
755 .collect();
756
757 let materials: Vec<_> = (0..material_count)
758 .map(|_| material_gen.generate_material(company_code, start_date))
759 .collect();
760
761 let base_chains = (config.global.period_months as usize * 50).max(100);
764
765 if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
767 info!("Generating P2P document flows");
768 let mut p2p_gen = P2PGenerator::new(seed + 100);
769
770 let chains_to_generate = base_chains.min(1000);
771 progress.items_remaining = Some(chains_to_generate as u64);
772
773 for i in 0..chains_to_generate {
774 if control.is_cancelled() {
775 break;
776 }
777
778 while control.is_paused() {
780 std::thread::sleep(std::time::Duration::from_millis(100));
781 if control.is_cancelled() {
782 break;
783 }
784 }
785
786 let vendor = &vendors[i % vendors.len()];
787 let material_refs: Vec<&datasynth_core::models::Material> =
788 vec![&materials[i % materials.len()]];
789
790 let days_offset = (i as i64 % total_period_days).max(0);
792 let po_date = start_date + chrono::Duration::days(days_offset);
793 let fiscal_year = po_date.year() as u16;
794 let fiscal_period = po_date.month() as u8;
795
796 let chain = p2p_gen.generate_chain(
797 company_code,
798 vendor,
799 &material_refs,
800 po_date,
801 fiscal_year,
802 fiscal_period,
803 "SYSTEM",
804 );
805
806 sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
808 chain.purchase_order,
809 ))))?;
810 count += 1;
811
812 for gr in chain.goods_receipts {
813 sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
814 count += 1;
815 }
816
817 if let Some(vi) = chain.vendor_invoice {
818 sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
819 vi,
820 ))))?;
821 count += 1;
822 }
823
824 if let Some(payment) = chain.payment {
825 sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
826 count += 1;
827 }
828
829 if count.is_multiple_of(progress_interval) {
830 progress.items_generated = count;
831 sender.send(StreamEvent::Progress(progress.clone()))?;
832 }
833 }
834 }
835
836 if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
838 info!("Generating O2C document flows");
839 let mut o2c_gen = O2CGenerator::new(seed + 200);
840
841 let chains_to_generate = base_chains.min(1000);
842
843 for i in 0..chains_to_generate {
844 if control.is_cancelled() {
845 break;
846 }
847
848 while control.is_paused() {
849 std::thread::sleep(std::time::Duration::from_millis(100));
850 if control.is_cancelled() {
851 break;
852 }
853 }
854
855 let customer = &customers[i % customers.len()];
856 let material_refs: Vec<&datasynth_core::models::Material> =
857 vec![&materials[i % materials.len()]];
858
859 let days_offset = (i as i64 % total_period_days).max(0);
860 let so_date = start_date + chrono::Duration::days(days_offset);
861 let fiscal_year = so_date.year() as u16;
862 let fiscal_period = so_date.month() as u8;
863
864 let chain = o2c_gen.generate_chain(
865 company_code,
866 customer,
867 &material_refs,
868 so_date,
869 fiscal_year,
870 fiscal_period,
871 "SYSTEM",
872 );
873
874 sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
875 chain.sales_order,
876 ))))?;
877 count += 1;
878
879 for delivery in chain.deliveries {
880 sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
881 delivery,
882 ))))?;
883 count += 1;
884 }
885
886 if let Some(ci) = chain.customer_invoice {
887 sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
888 ci,
889 ))))?;
890 count += 1;
891 }
892
893 if count.is_multiple_of(progress_interval) {
894 progress.items_generated = count;
895 sender.send(StreamEvent::Progress(progress.clone()))?;
896 }
897 }
898 }
899
900 Ok(count)
901 }
902
903 pub fn stats(&self) -> StreamingOrchestratorStats {
905 StreamingOrchestratorStats {
906 phases: self.config.phases.len(),
907 buffer_size: self.config.stream_config.buffer_size,
908 backpressure: self.config.stream_config.backpressure,
909 }
910 }
911}
912
/// Configuration snapshot returned by `StreamingOrchestrator::stats`.
#[derive(Debug, Clone)]
pub struct StreamingOrchestratorStats {
    /// Number of configured phases.
    pub phases: usize,
    /// Buffer size passed to the stream channel.
    pub buffer_size: usize,
    /// Backpressure strategy passed to the stream channel.
    pub backpressure: BackpressureStrategy,
}
923
924fn resolve_coa_framework_from_config(
926 config: &GeneratorConfig,
927) -> datasynth_generators::coa_generator::CoAFramework {
928 use datasynth_generators::coa_generator::CoAFramework;
929 if config.accounting_standards.enabled {
930 match config.accounting_standards.framework {
931 Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
932 return CoAFramework::FrenchPcg;
933 }
934 Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
935 return CoAFramework::GermanSkr04;
936 }
937 _ => {}
938 }
939 }
940 CoAFramework::UsGaap
941}
942
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    /// Builds the preset configuration shared by all tests in this module.
    fn create_test_config() -> GeneratorConfig {
        create_preset(
            IndustrySector::Retail,
            2,
            3,
            CoAComplexity::Small,
            TransactionVolume::TenK,
        )
    }

    /// Wraps `config` so that only the journal-entry phase runs, with the
    /// buffer and progress settings used by the anomaly tests.
    fn journal_entries_only(config: GeneratorConfig) -> StreamingOrchestratorConfig {
        let stream_config = StreamConfig {
            buffer_size: 500,
            progress_interval: 50,
            ..Default::default()
        };

        StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(stream_config)
    }

    /// A freshly built orchestrator reports non-empty phases and buffer.
    #[test]
    fn test_streaming_orchestrator_creation() {
        let orchestrator = StreamingOrchestrator::from_generator_config(create_test_config());
        let stats = orchestrator.stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    /// Chart of accounts plus master data streams data, a chart of accounts
    /// and a completion event.
    #[test]
    fn test_streaming_generation() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 5;
        config.master_data.customers.count = 5;
        config.master_data.employees.count = 5;
        config.global.period_months = 1;

        let stream_config = StreamConfig {
            buffer_size: 100,
            progress_interval: 10,
            ..Default::default()
        };
        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![
                GenerationPhase::ChartOfAccounts,
                GenerationPhase::MasterData,
            ])
            .with_stream_config(stream_config);

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut data_events = 0;
        let mut saw_coa = false;
        let mut completed = false;

        for event in receiver {
            match event {
                StreamEvent::Complete(_) => {
                    completed = true;
                    break;
                }
                StreamEvent::Data(item) => {
                    data_events += 1;
                    if matches!(item, GeneratedItem::ChartOfAccounts(_)) {
                        saw_coa = true;
                    }
                }
                _ => {}
            }
        }

        assert!(data_events > 0);
        assert!(saw_coa);
        assert!(completed);
    }

    /// Cancelling after ten data events leaves the control handle cancelled.
    #[test]
    fn test_stream_cancellation() {
        let mut config = create_test_config();
        config.global.period_months = 12;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries]);

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, control) = orchestrator.stream().unwrap();

        let mut data_events = 0;
        for event in receiver {
            if !matches!(event, StreamEvent::Data(_)) {
                continue;
            }
            data_events += 1;
            if data_events >= 10 {
                control.cancel();
                break;
            }
        }

        assert!(control.is_cancelled());
    }

    /// With anomaly injection enabled, the JE phase streams both journal
    /// entries and anomaly labels.
    #[test]
    fn test_streaming_anomaly_injection() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;

        config.anomaly_injection.enabled = true;
        config.anomaly_injection.rates.total_rate = 0.20;
        config.anomaly_injection.rates.fraud_rate = 0.40;
        config.anomaly_injection.rates.error_rate = 0.40;
        config.anomaly_injection.rates.process_rate = 0.20;

        let orchestrator = StreamingOrchestrator::new(journal_entries_only(config));
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut je_count = 0;
        let mut label_count = 0;
        let mut has_completion = false;

        for event in receiver {
            match event {
                StreamEvent::Data(GeneratedItem::JournalEntry(_)) => je_count += 1,
                StreamEvent::Data(GeneratedItem::AnomalyLabel(_)) => label_count += 1,
                StreamEvent::Complete(_) => {
                    has_completion = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(has_completion, "Stream should complete");
        assert!(je_count > 0, "Should generate journal entries");
        assert!(
            label_count > 0,
            "Should generate anomaly labels (got {} JEs, {} labels)",
            je_count,
            label_count
        );
    }

    /// With both anomaly and fraud injection disabled, no labels are streamed.
    #[test]
    fn test_streaming_no_anomalies_when_disabled() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;

        config.anomaly_injection.enabled = false;
        config.fraud.enabled = false;

        let orchestrator = StreamingOrchestrator::new(journal_entries_only(config));
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut label_count = 0;

        for event in receiver {
            if matches!(event, StreamEvent::Complete(_)) {
                break;
            }
            if matches!(event, StreamEvent::Data(GeneratedItem::AnomalyLabel(_))) {
                label_count += 1;
            }
        }

        assert_eq!(
            label_count, 0,
            "Should not generate anomaly labels when disabled"
        );
    }

    /// `type_name` reports the expected label for boxed and unboxed variants.
    #[test]
    fn test_generated_item_type_name() {
        let chart = ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        );
        let coa = GeneratedItem::ChartOfAccounts(Box::new(chart));
        assert_eq!(coa.type_name(), "chart_of_accounts");

        let progress = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress.type_name(), "progress");
    }
}