1use std::sync::Arc;
36use std::thread;
37use std::time::Instant;
38
39use chrono::NaiveDate;
40use tracing::{info, warn};
41
42use datasynth_config::schema::GeneratorConfig;
43
/// Fallback RNG seed used by every phase when `global.seed` is not set in the configuration.
const DEFAULT_SEED: u64 = 0x2A;
46use datasynth_core::error::SynthResult;
47use datasynth_core::models::{
48 documents::{
49 CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
50 },
51 AnomalyRateConfig, ChartOfAccounts, Customer, Employee, JournalEntry, LabeledAnomaly, Material,
52 Vendor,
53};
54use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
55use datasynth_core::traits::{
56 BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
57};
58
59#[derive(Debug, Clone)]
61pub enum GeneratedItem {
62 ChartOfAccounts(Box<ChartOfAccounts>),
64 Vendor(Box<Vendor>),
66 Customer(Box<Customer>),
68 Material(Box<Material>),
70 Employee(Box<Employee>),
72 JournalEntry(Box<JournalEntry>),
74 PurchaseOrder(Box<PurchaseOrder>),
76 GoodsReceipt(Box<GoodsReceipt>),
78 VendorInvoice(Box<VendorInvoice>),
80 Payment(Box<Payment>),
82 SalesOrder(Box<SalesOrder>),
84 Delivery(Box<Delivery>),
86 CustomerInvoice(Box<CustomerInvoice>),
88 AnomalyLabel(Box<LabeledAnomaly>),
90 Progress(StreamProgress),
92 PhaseComplete(String),
94}
95
96impl GeneratedItem {
97 pub fn type_name(&self) -> &'static str {
99 match self {
100 GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
101 GeneratedItem::Vendor(_) => "vendor",
102 GeneratedItem::Customer(_) => "customer",
103 GeneratedItem::Material(_) => "material",
104 GeneratedItem::Employee(_) => "employee",
105 GeneratedItem::JournalEntry(_) => "journal_entry",
106 GeneratedItem::PurchaseOrder(_) => "purchase_order",
107 GeneratedItem::GoodsReceipt(_) => "goods_receipt",
108 GeneratedItem::VendorInvoice(_) => "vendor_invoice",
109 GeneratedItem::Payment(_) => "payment",
110 GeneratedItem::SalesOrder(_) => "sales_order",
111 GeneratedItem::Delivery(_) => "delivery",
112 GeneratedItem::CustomerInvoice(_) => "customer_invoice",
113 GeneratedItem::AnomalyLabel(_) => "anomaly_label",
114 GeneratedItem::Progress(_) => "progress",
115 GeneratedItem::PhaseComplete(_) => "phase_complete",
116 }
117 }
118}
119
/// A stage of the generation pipeline. Phases run in the order listed in
/// `StreamingOrchestratorConfig::phases`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GenerationPhase {
    /// Chart of accounts generation.
    ChartOfAccounts,
    /// Vendor, customer and employee master data.
    MasterData,
    /// Procure-to-pay and order-to-cash document chains.
    DocumentFlows,
    /// OCPM event generation (skipped with a warning in streaming mode).
    OcpmEvents,
    /// Journal entry generation (anomalies are injected inline when enabled).
    JournalEntries,
    /// Anomaly injection (a no-op phase in streaming mode; handled during journal entries).
    AnomalyInjection,
    /// Balance validation (not applicable in streaming mode).
    BalanceValidation,
    /// Data-quality injection (skipped in streaming mode).
    DataQuality,
    /// Terminal marker phase (not applicable in streaming mode).
    Complete,
}
142
143impl GenerationPhase {
144 pub fn name(&self) -> &'static str {
146 match self {
147 GenerationPhase::ChartOfAccounts => "chart_of_accounts",
148 GenerationPhase::MasterData => "master_data",
149 GenerationPhase::DocumentFlows => "document_flows",
150 GenerationPhase::OcpmEvents => "ocpm_events",
151 GenerationPhase::JournalEntries => "journal_entries",
152 GenerationPhase::AnomalyInjection => "anomaly_injection",
153 GenerationPhase::BalanceValidation => "balance_validation",
154 GenerationPhase::DataQuality => "data_quality",
155 GenerationPhase::Complete => "complete",
156 }
157 }
158}
159
160#[derive(Debug, Clone)]
162pub struct StreamingOrchestratorConfig {
163 pub generator_config: GeneratorConfig,
165 pub stream_config: StreamConfig,
167 pub phases: Vec<GenerationPhase>,
169}
170
171impl StreamingOrchestratorConfig {
172 pub fn new(generator_config: GeneratorConfig) -> Self {
174 Self {
175 generator_config,
176 stream_config: StreamConfig::default(),
177 phases: vec![
178 GenerationPhase::ChartOfAccounts,
179 GenerationPhase::MasterData,
180 GenerationPhase::DocumentFlows,
181 GenerationPhase::JournalEntries,
182 ],
183 }
184 }
185
186 pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
188 Self {
189 generator_config,
190 stream_config: StreamConfig::default(),
191 phases: vec![
192 GenerationPhase::ChartOfAccounts,
193 GenerationPhase::MasterData,
194 GenerationPhase::DocumentFlows,
195 GenerationPhase::OcpmEvents,
196 GenerationPhase::JournalEntries,
197 GenerationPhase::AnomalyInjection,
198 GenerationPhase::DataQuality,
199 ],
200 }
201 }
202
203 pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
205 self.stream_config = config;
206 self
207 }
208
209 pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
211 self.phases = phases;
212 self
213 }
214}
215
216pub struct StreamingOrchestrator {
218 config: StreamingOrchestratorConfig,
219}
220
221impl StreamingOrchestrator {
222 pub fn new(config: StreamingOrchestratorConfig) -> Self {
224 Self { config }
225 }
226
227 pub fn from_generator_config(config: GeneratorConfig) -> Self {
229 Self::new(StreamingOrchestratorConfig::new(config))
230 }
231
232 pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
236 let (sender, receiver) = stream_channel(
237 self.config.stream_config.buffer_size,
238 self.config.stream_config.backpressure,
239 );
240
241 let control = Arc::new(StreamControl::new());
242 let control_clone = Arc::clone(&control);
243
244 let config = self.config.clone();
245
246 thread::spawn(move || {
248 let result = Self::run_generation(config, sender, control_clone);
249 if let Err(e) = result {
250 warn!("Streaming generation error: {}", e);
251 }
252 });
253
254 Ok((receiver, control))
255 }
256
257 fn run_generation(
259 config: StreamingOrchestratorConfig,
260 sender: StreamSender<GeneratedItem>,
261 control: Arc<StreamControl>,
262 ) -> SynthResult<()> {
263 let start_time = Instant::now();
264 let mut items_generated: u64 = 0;
265 let mut phases_completed = Vec::new();
266
267 let progress_interval = config.stream_config.progress_interval;
269
270 let mut progress = StreamProgress::new("initializing");
272 sender.send(StreamEvent::Progress(progress.clone()))?;
273
274 for phase in &config.phases {
275 if control.is_cancelled() {
276 info!("Generation cancelled");
277 break;
278 }
279
280 while control.is_paused() {
282 std::thread::sleep(std::time::Duration::from_millis(100));
283 if control.is_cancelled() {
284 break;
285 }
286 }
287
288 progress.phase = phase.name().to_string();
289 sender.send(StreamEvent::Progress(progress.clone()))?;
290
291 match phase {
292 GenerationPhase::ChartOfAccounts => {
293 let result =
294 Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
295 items_generated += result;
296 }
297 GenerationPhase::MasterData => {
298 let result = Self::generate_master_data_phase(
299 &config.generator_config,
300 &sender,
301 &control,
302 )?;
303 items_generated += result;
304 }
305 GenerationPhase::DocumentFlows => {
306 let result = Self::generate_document_flows_phase(
307 &config.generator_config,
308 &sender,
309 &control,
310 progress_interval,
311 &mut progress,
312 )?;
313 items_generated += result;
314 }
315 GenerationPhase::OcpmEvents => {
316 warn!("OCPM event generation is not yet supported in streaming mode; skipping");
317 }
318 GenerationPhase::JournalEntries => {
319 let result = Self::generate_journal_entries_phase(
320 &config.generator_config,
321 &sender,
322 &control,
323 progress_interval,
324 &mut progress,
325 )?;
326 items_generated += result;
327 }
328 GenerationPhase::AnomalyInjection => {
329 info!("Anomaly injection applied inline during JE generation phase in streaming mode");
330 }
331 GenerationPhase::DataQuality => {
332 info!(
333 "Data quality injection is not yet supported in streaming mode; skipping"
334 );
335 }
336 GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
337 info!("Phase {:?} is not applicable in streaming mode", phase);
338 }
339 }
340
341 sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
343 phase.name().to_string(),
344 )))?;
345 phases_completed.push(phase.name().to_string());
346
347 progress.items_generated = items_generated;
349 progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
350 if progress.elapsed_ms > 0 {
351 progress.items_per_second =
352 (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
353 }
354 sender.send(StreamEvent::Progress(progress.clone()))?;
355 }
356
357 let stats = sender.stats();
359 let summary = StreamSummary {
360 total_items: items_generated,
361 total_time_ms: start_time.elapsed().as_millis() as u64,
362 avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
363 (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
364 } else {
365 0.0
366 },
367 error_count: 0,
368 dropped_count: stats.items_dropped,
369 peak_memory_mb: None,
370 phases_completed,
371 };
372
373 sender.send(StreamEvent::Complete(summary))?;
374 sender.close();
375
376 Ok(())
377 }
378
379 fn generate_coa_phase(
381 config: &GeneratorConfig,
382 sender: &StreamSender<GeneratedItem>,
383 control: &Arc<StreamControl>,
384 ) -> SynthResult<u64> {
385 use datasynth_generators::ChartOfAccountsGenerator;
386
387 if control.is_cancelled() {
388 return Ok(0);
389 }
390
391 info!("Generating Chart of Accounts");
392 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
393 let complexity = config.chart_of_accounts.complexity;
394 let industry = config.global.industry;
395 let coa_framework = resolve_coa_framework_from_config(config);
396
397 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
398 .with_coa_framework(coa_framework);
399 let coa = coa_gen.generate();
400
401 let account_count = coa.account_count() as u64;
402 sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
403 coa,
404 ))))?;
405
406 Ok(account_count)
407 }
408
409 fn generate_master_data_phase(
411 config: &GeneratorConfig,
412 sender: &StreamSender<GeneratedItem>,
413 control: &Arc<StreamControl>,
414 ) -> SynthResult<u64> {
415 use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
416
417 let mut count: u64 = 0;
418 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
419 let md_config = &config.master_data;
420 let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
421 .unwrap_or_else(|e| {
422 tracing::warn!(
423 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
424 config.global.start_date,
425 e
426 );
427 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
428 });
429
430 let company_code = config
431 .companies
432 .first()
433 .map(|c| c.code.as_str())
434 .unwrap_or_else(|| {
435 tracing::warn!("No companies configured, defaulting to company code '1000'");
436 "1000"
437 });
438
439 if control.is_cancelled() {
441 return Ok(count);
442 }
443
444 info!("Generating vendors");
445 let mut vendor_gen = VendorGenerator::new(seed);
446 for _ in 0..md_config.vendors.count {
447 if control.is_cancelled() {
448 break;
449 }
450 let vendor = vendor_gen.generate_vendor(company_code, effective_date);
451 sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
452 count += 1;
453 }
454
455 if control.is_cancelled() {
457 return Ok(count);
458 }
459
460 info!("Generating customers");
461 let mut customer_gen = CustomerGenerator::new(seed + 1);
462 for _ in 0..md_config.customers.count {
463 if control.is_cancelled() {
464 break;
465 }
466 let customer = customer_gen.generate_customer(company_code, effective_date);
467 sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
468 customer,
469 ))))?;
470 count += 1;
471 }
472
473 if control.is_cancelled() {
475 return Ok(count);
476 }
477
478 info!("Generating employees");
479 let mut employee_gen = EmployeeGenerator::new(seed + 4);
480 let dept = if let Some(first_custom) = config.departments.custom_departments.first() {
482 datasynth_generators::DepartmentDefinition {
483 code: first_custom.code.clone(),
484 name: first_custom.name.clone(),
485 cost_center: first_custom
486 .cost_center
487 .clone()
488 .unwrap_or_else(|| format!("CC{}", first_custom.code)),
489 headcount: 10,
490 system_roles: vec![],
491 transaction_codes: vec![],
492 }
493 } else {
494 warn!("No departments configured, using default 'General' department");
495 datasynth_generators::DepartmentDefinition {
496 code: "1000".to_string(),
497 name: "General".to_string(),
498 cost_center: "CC1000".to_string(),
499 headcount: 10,
500 system_roles: vec![],
501 transaction_codes: vec![],
502 }
503 };
504 for _ in 0..md_config.employees.count {
505 if control.is_cancelled() {
506 break;
507 }
508 let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
509 sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
510 employee,
511 ))))?;
512 count += 1;
513 }
514
515 Ok(count)
516 }
517
518 fn generate_journal_entries_phase(
526 config: &GeneratorConfig,
527 sender: &StreamSender<GeneratedItem>,
528 control: &Arc<StreamControl>,
529 progress_interval: u64,
530 progress: &mut StreamProgress,
531 ) -> SynthResult<u64> {
532 use datasynth_generators::{
533 AnomalyInjector, AnomalyInjectorConfig, ChartOfAccountsGenerator,
534 EnhancedInjectionConfig, JournalEntryGenerator,
535 };
536 use std::sync::Arc;
537
538 let mut count: u64 = 0;
539 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
540
541 let default_monthly = 500;
543 let total_entries: usize = config
544 .companies
545 .iter()
546 .map(|c| {
547 let monthly = (c.volume_weight * default_monthly as f64) as usize;
548 monthly.max(100) * config.global.period_months as usize
549 })
550 .sum();
551
552 progress.items_remaining = Some(total_entries as u64);
553 info!("Generating {} journal entries", total_entries);
554
555 let complexity = config.chart_of_accounts.complexity;
557 let industry = config.global.industry;
558 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
559 let coa = Arc::new(coa_gen.generate());
560
561 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
563 .unwrap_or_else(|e| {
564 tracing::warn!(
565 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
566 config.global.start_date,
567 e
568 );
569 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
570 });
571 let end_date = start_date
572 .checked_add_months(chrono::Months::new(config.global.period_months))
573 .unwrap_or(start_date + chrono::Duration::days(365));
574
575 let mut je_gen = JournalEntryGenerator::from_generator_config(
577 config,
578 Arc::clone(&coa),
579 start_date,
580 end_date,
581 seed,
582 );
583
584 let anomaly_enabled = config.anomaly_injection.enabled || config.fraud.enabled;
587 let mut anomaly_injector = if anomaly_enabled {
588 let total_rate = if config.anomaly_injection.enabled {
589 config.anomaly_injection.rates.total_rate
590 } else {
591 config.fraud.fraud_rate
592 };
593 let fraud_rate = if config.anomaly_injection.enabled {
594 config.anomaly_injection.rates.fraud_rate
595 } else {
596 AnomalyRateConfig::default().fraud_rate
597 };
598 let error_rate = if config.anomaly_injection.enabled {
599 config.anomaly_injection.rates.error_rate
600 } else {
601 AnomalyRateConfig::default().error_rate
602 };
603 let process_issue_rate = if config.anomaly_injection.enabled {
604 config.anomaly_injection.rates.process_rate
605 } else {
606 AnomalyRateConfig::default().process_issue_rate
607 };
608
609 let injector_config = AnomalyInjectorConfig {
610 rates: AnomalyRateConfig {
611 total_rate,
612 fraud_rate,
613 error_rate,
614 process_issue_rate,
615 ..Default::default()
616 },
617 enhanced: EnhancedInjectionConfig {
618 fraud_behavioral_bias: config.fraud.effective_bias().to_core(),
619 fraud_campaign: config.fraud.campaigns.clone(),
621 ..Default::default()
622 },
623 seed: seed + 5000,
624 ..Default::default()
625 };
626
627 info!(
628 "Anomaly injection enabled for streaming JE phase (total_rate={:.3})",
629 total_rate
630 );
631 Some(AnomalyInjector::new(injector_config))
632 } else {
633 None
634 };
635
636 let batch_size: usize = if anomaly_injector.is_some() { 100 } else { 1 };
639 let mut remaining = total_entries;
640
641 while remaining > 0 {
642 if control.is_cancelled() {
643 break;
644 }
645
646 let current_batch = remaining.min(batch_size);
647 let mut batch: Vec<JournalEntry> = Vec::with_capacity(current_batch);
648
649 for _ in 0..current_batch {
650 if control.is_cancelled() {
651 break;
652 }
653
654 while control.is_paused() {
656 std::thread::sleep(std::time::Duration::from_millis(100));
657 if control.is_cancelled() {
658 break;
659 }
660 }
661
662 batch.push(je_gen.generate());
663 }
664
665 if batch.is_empty() {
666 break;
667 }
668
669 if let Some(ref mut injector) = anomaly_injector {
671 let result = injector.process_entries(&mut batch);
672
673 for label in result.labels {
675 sender.send(StreamEvent::Data(GeneratedItem::AnomalyLabel(Box::new(
676 label,
677 ))))?;
678 }
679 }
680
681 for je in batch {
683 sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
684 count += 1;
685
686 if count.is_multiple_of(progress_interval) {
688 progress.items_generated = count;
689 progress.items_remaining = Some(total_entries as u64 - count);
690 sender.send(StreamEvent::Progress(progress.clone()))?;
691 }
692 }
693
694 remaining = remaining.saturating_sub(current_batch);
695 }
696
697 Ok(count)
698 }
699
700 fn generate_document_flows_phase(
706 config: &GeneratorConfig,
707 sender: &StreamSender<GeneratedItem>,
708 control: &Arc<StreamControl>,
709 progress_interval: u64,
710 progress: &mut StreamProgress,
711 ) -> SynthResult<u64> {
712 use chrono::Datelike;
713 use datasynth_generators::{
714 CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
715 };
716
717 let mut count: u64 = 0;
718 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
719 let df_config = &config.document_flows;
720 let md_config = &config.master_data;
721
722 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
724 .unwrap_or_else(|e| {
725 tracing::warn!(
726 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
727 config.global.start_date,
728 e
729 );
730 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
731 });
732 let end_date = start_date
733 .checked_add_months(chrono::Months::new(config.global.period_months))
734 .unwrap_or(start_date + chrono::Duration::days(365));
735 let total_period_days = (end_date - start_date).num_days().max(1);
736
737 let company_code = config
738 .companies
739 .first()
740 .map(|c| c.code.as_str())
741 .unwrap_or_else(|| {
742 tracing::warn!("No companies configured, defaulting to company code '1000'");
743 "1000"
744 });
745
746 let vendor_count = md_config.vendors.count.min(100);
748 let customer_count = md_config.customers.count.min(100);
749 let material_count = md_config.materials.count.min(50);
750
751 let mut vendor_gen = VendorGenerator::new(seed);
753 let mut customer_gen = CustomerGenerator::new(seed + 1);
754 let mut material_gen = MaterialGenerator::new(seed + 2);
755
756 let vendors: Vec<_> = (0..vendor_count)
757 .map(|_| vendor_gen.generate_vendor(company_code, start_date))
758 .collect();
759
760 let customers: Vec<_> = (0..customer_count)
761 .map(|_| customer_gen.generate_customer(company_code, start_date))
762 .collect();
763
764 let materials: Vec<_> = (0..material_count)
765 .map(|_| material_gen.generate_material(company_code, start_date))
766 .collect();
767
768 let base_chains = (config.global.period_months as usize * 50).max(100);
771
772 if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
774 info!("Generating P2P document flows");
775 let mut p2p_gen = P2PGenerator::new(seed + 100);
776
777 let chains_to_generate = base_chains.min(1000);
778 progress.items_remaining = Some(chains_to_generate as u64);
779
780 for i in 0..chains_to_generate {
781 if control.is_cancelled() {
782 break;
783 }
784
785 while control.is_paused() {
787 std::thread::sleep(std::time::Duration::from_millis(100));
788 if control.is_cancelled() {
789 break;
790 }
791 }
792
793 let vendor = &vendors[i % vendors.len()];
794 let material_refs: Vec<&datasynth_core::models::Material> =
795 vec![&materials[i % materials.len()]];
796
797 let days_offset = (i as i64 % total_period_days).max(0);
799 let po_date = start_date + chrono::Duration::days(days_offset);
800 let fiscal_year = po_date.year() as u16;
801 let fiscal_period = po_date.month() as u8;
802
803 let chain = p2p_gen.generate_chain(
804 company_code,
805 vendor,
806 &material_refs,
807 po_date,
808 fiscal_year,
809 fiscal_period,
810 "SYSTEM",
811 );
812
813 sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
815 chain.purchase_order,
816 ))))?;
817 count += 1;
818
819 for gr in chain.goods_receipts {
820 sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
821 count += 1;
822 }
823
824 if let Some(vi) = chain.vendor_invoice {
825 sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
826 vi,
827 ))))?;
828 count += 1;
829 }
830
831 if let Some(payment) = chain.payment {
832 sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
833 count += 1;
834 }
835
836 if count.is_multiple_of(progress_interval) {
837 progress.items_generated = count;
838 sender.send(StreamEvent::Progress(progress.clone()))?;
839 }
840 }
841 }
842
843 if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
845 info!("Generating O2C document flows");
846 let mut o2c_gen = O2CGenerator::new(seed + 200);
847
848 let chains_to_generate = base_chains.min(1000);
849
850 for i in 0..chains_to_generate {
851 if control.is_cancelled() {
852 break;
853 }
854
855 while control.is_paused() {
856 std::thread::sleep(std::time::Duration::from_millis(100));
857 if control.is_cancelled() {
858 break;
859 }
860 }
861
862 let customer = &customers[i % customers.len()];
863 let material_refs: Vec<&datasynth_core::models::Material> =
864 vec![&materials[i % materials.len()]];
865
866 let days_offset = (i as i64 % total_period_days).max(0);
867 let so_date = start_date + chrono::Duration::days(days_offset);
868 let fiscal_year = so_date.year() as u16;
869 let fiscal_period = so_date.month() as u8;
870
871 let chain = o2c_gen.generate_chain(
872 company_code,
873 customer,
874 &material_refs,
875 so_date,
876 fiscal_year,
877 fiscal_period,
878 "SYSTEM",
879 );
880
881 sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
882 chain.sales_order,
883 ))))?;
884 count += 1;
885
886 for delivery in chain.deliveries {
887 sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
888 delivery,
889 ))))?;
890 count += 1;
891 }
892
893 if let Some(ci) = chain.customer_invoice {
894 sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
895 ci,
896 ))))?;
897 count += 1;
898 }
899
900 if count.is_multiple_of(progress_interval) {
901 progress.items_generated = count;
902 sender.send(StreamEvent::Progress(progress.clone()))?;
903 }
904 }
905 }
906
907 Ok(count)
908 }
909
910 pub fn stats(&self) -> StreamingOrchestratorStats {
912 StreamingOrchestratorStats {
913 phases: self.config.phases.len(),
914 buffer_size: self.config.stream_config.buffer_size,
915 backpressure: self.config.stream_config.backpressure,
916 }
917 }
918}
919
920#[derive(Debug, Clone)]
922pub struct StreamingOrchestratorStats {
923 pub phases: usize,
925 pub buffer_size: usize,
927 pub backpressure: BackpressureStrategy,
929}
930
931fn resolve_coa_framework_from_config(
933 config: &GeneratorConfig,
934) -> datasynth_generators::coa_generator::CoAFramework {
935 use datasynth_generators::coa_generator::CoAFramework;
936 if config.accounting_standards.enabled {
937 match config.accounting_standards.framework {
938 Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
939 return CoAFramework::FrenchPcg;
940 }
941 Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
942 return CoAFramework::GermanSkr04;
943 }
944 _ => {}
945 }
946 }
947 CoAFramework::UsGaap
948}
949
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};
    use std::collections::HashSet;

    /// Small retail preset shared by all streaming tests.
    fn create_test_config() -> GeneratorConfig {
        create_preset(
            IndustrySector::Retail,
            2,
            3,
            CoAComplexity::Small,
            TransactionVolume::TenK,
        )
    }

    #[test]
    fn test_streaming_orchestrator_creation() {
        let config = create_test_config();
        let orchestrator = StreamingOrchestrator::from_generator_config(config);
        let stats = orchestrator.stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    #[test]
    fn test_streaming_generation() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 5;
        config.master_data.customers.count = 5;
        config.master_data.employees.count = 5;
        config.global.period_months = 1;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![
                GenerationPhase::ChartOfAccounts,
                GenerationPhase::MasterData,
            ])
            .with_stream_config(StreamConfig {
                buffer_size: 100,
                progress_interval: 10,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut items_count = 0;
        let mut has_coa = false;
        let mut has_completion = false;

        for event in receiver {
            match event {
                StreamEvent::Data(item) => {
                    items_count += 1;
                    if matches!(item, GeneratedItem::ChartOfAccounts(_)) {
                        has_coa = true;
                    }
                }
                StreamEvent::Complete(_) => {
                    has_completion = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(items_count > 0);
        assert!(has_coa);
        assert!(has_completion);
    }

    #[test]
    fn test_stream_cancellation() {
        let mut config = create_test_config();
        config.global.period_months = 12;
        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries]);

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, control) = orchestrator.stream().unwrap();

        let mut items_count = 0;
        for event in receiver {
            if let StreamEvent::Data(_) = event {
                items_count += 1;
                if items_count >= 10 {
                    control.cancel();
                    break;
                }
            }
        }

        assert!(control.is_cancelled());
    }

    #[test]
    fn test_streaming_anomaly_injection() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;

        config.anomaly_injection.enabled = true;
        config.anomaly_injection.rates.total_rate = 0.20;
        config.anomaly_injection.rates.fraud_rate = 0.40;
        config.anomaly_injection.rates.error_rate = 0.40;
        config.anomaly_injection.rates.process_rate = 0.20;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(StreamConfig {
                buffer_size: 500,
                progress_interval: 50,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut je_count = 0;
        let mut label_count = 0;
        let mut has_completion = false;

        for event in receiver {
            match event {
                StreamEvent::Data(item) => match item {
                    GeneratedItem::JournalEntry(_) => je_count += 1,
                    GeneratedItem::AnomalyLabel(_) => label_count += 1,
                    _ => {}
                },
                StreamEvent::Complete(_) => {
                    has_completion = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(has_completion, "Stream should complete");
        assert!(je_count > 0, "Should generate journal entries");
        assert!(
            label_count > 0,
            "Should generate anomaly labels (got {} JEs, {} labels)",
            je_count,
            label_count
        );
    }

    #[test]
    fn test_streaming_no_anomalies_when_disabled() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;

        config.anomaly_injection.enabled = false;
        config.fraud.enabled = false;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(StreamConfig {
                buffer_size: 500,
                progress_interval: 50,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut label_count = 0;

        for event in receiver {
            match event {
                StreamEvent::Data(GeneratedItem::AnomalyLabel(_)) => label_count += 1,
                StreamEvent::Complete(_) => break,
                _ => {}
            }
        }

        assert_eq!(
            label_count, 0,
            "Should not generate anomaly labels when disabled"
        );
    }

    #[test]
    fn test_phase_complete_markers_follow_requested_order() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 2;
        config.master_data.customers.count = 2;
        config.master_data.employees.count = 2;
        config.global.period_months = 1;

        let requested = vec![GenerationPhase::ChartOfAccounts, GenerationPhase::MasterData];
        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(requested.clone())
            .with_stream_config(StreamConfig {
                buffer_size: 100,
                progress_interval: 10,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut markers = Vec::new();
        let mut summary = None;
        for event in receiver {
            match event {
                StreamEvent::Data(GeneratedItem::PhaseComplete(name)) => markers.push(name),
                StreamEvent::Complete(s) => {
                    summary = Some(s);
                    break;
                }
                _ => {}
            }
        }

        let expected: Vec<String> = requested.iter().map(|p| p.name().to_string()).collect();
        assert_eq!(markers, expected);
        let summary = summary.expect("stream should complete");
        assert_eq!(summary.phases_completed, expected);
    }

    #[test]
    fn test_generation_phase_names_are_unique() {
        let all = [
            GenerationPhase::ChartOfAccounts,
            GenerationPhase::MasterData,
            GenerationPhase::DocumentFlows,
            GenerationPhase::OcpmEvents,
            GenerationPhase::JournalEntries,
            GenerationPhase::AnomalyInjection,
            GenerationPhase::BalanceValidation,
            GenerationPhase::DataQuality,
            GenerationPhase::Complete,
        ];
        let names: HashSet<&str> = all.iter().map(|p| p.name()).collect();
        assert_eq!(names.len(), all.len());
    }

    #[test]
    fn test_generated_item_type_name() {
        let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        )));
        assert_eq!(coa.type_name(), "chart_of_accounts");

        let progress = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress.type_name(), "progress");
    }
}