1use std::sync::Arc;
36use std::thread;
37use std::time::Instant;
38
39use chrono::NaiveDate;
40use tracing::{info, warn};
41
42use datasynth_config::schema::GeneratorConfig;
43
/// Fallback RNG seed used by the generation phases when `config.global.seed` is not set.
const DEFAULT_SEED: u64 = 42;
46use datasynth_core::error::SynthResult;
47use datasynth_core::models::{
48 documents::{
49 CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
50 },
51 AnomalyRateConfig, ChartOfAccounts, Customer, Employee, JournalEntry, LabeledAnomaly, Material,
52 Vendor,
53};
54use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
55use datasynth_core::traits::{
56 BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
57};
58
59#[derive(Debug, Clone)]
61pub enum GeneratedItem {
62 ChartOfAccounts(Box<ChartOfAccounts>),
64 Vendor(Box<Vendor>),
66 Customer(Box<Customer>),
68 Material(Box<Material>),
70 Employee(Box<Employee>),
72 JournalEntry(Box<JournalEntry>),
74 PurchaseOrder(Box<PurchaseOrder>),
76 GoodsReceipt(Box<GoodsReceipt>),
78 VendorInvoice(Box<VendorInvoice>),
80 Payment(Box<Payment>),
82 SalesOrder(Box<SalesOrder>),
84 Delivery(Box<Delivery>),
86 CustomerInvoice(Box<CustomerInvoice>),
88 AnomalyLabel(Box<LabeledAnomaly>),
90 Progress(StreamProgress),
92 PhaseComplete(String),
94}
95
96impl GeneratedItem {
97 pub fn type_name(&self) -> &'static str {
99 match self {
100 GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
101 GeneratedItem::Vendor(_) => "vendor",
102 GeneratedItem::Customer(_) => "customer",
103 GeneratedItem::Material(_) => "material",
104 GeneratedItem::Employee(_) => "employee",
105 GeneratedItem::JournalEntry(_) => "journal_entry",
106 GeneratedItem::PurchaseOrder(_) => "purchase_order",
107 GeneratedItem::GoodsReceipt(_) => "goods_receipt",
108 GeneratedItem::VendorInvoice(_) => "vendor_invoice",
109 GeneratedItem::Payment(_) => "payment",
110 GeneratedItem::SalesOrder(_) => "sales_order",
111 GeneratedItem::Delivery(_) => "delivery",
112 GeneratedItem::CustomerInvoice(_) => "customer_invoice",
113 GeneratedItem::AnomalyLabel(_) => "anomaly_label",
114 GeneratedItem::Progress(_) => "progress",
115 GeneratedItem::PhaseComplete(_) => "phase_complete",
116 }
117 }
118}
119
/// A stage of the streaming generation pipeline.
///
/// Phases execute in the order they appear in the orchestrator's phase list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GenerationPhase {
    /// Chart of accounts.
    ChartOfAccounts,
    /// Master data (vendors, customers, employees).
    MasterData,
    /// Procure-to-pay and order-to-cash document chains.
    DocumentFlows,
    /// OCPM events.
    OcpmEvents,
    /// Journal entries.
    JournalEntries,
    /// Anomaly injection.
    AnomalyInjection,
    /// Balance validation.
    BalanceValidation,
    /// Data-quality injection.
    DataQuality,
    /// Terminal marker.
    Complete,
}

impl GenerationPhase {
    /// Returns the stable snake_case identifier of this phase.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::ChartOfAccounts => "chart_of_accounts",
            Self::MasterData => "master_data",
            Self::DocumentFlows => "document_flows",
            Self::OcpmEvents => "ocpm_events",
            Self::JournalEntries => "journal_entries",
            Self::AnomalyInjection => "anomaly_injection",
            Self::BalanceValidation => "balance_validation",
            Self::DataQuality => "data_quality",
            Self::Complete => "complete",
        }
    }
}
159
160#[derive(Debug, Clone)]
162pub struct StreamingOrchestratorConfig {
163 pub generator_config: GeneratorConfig,
165 pub stream_config: StreamConfig,
167 pub phases: Vec<GenerationPhase>,
169}
170
171impl StreamingOrchestratorConfig {
172 pub fn new(generator_config: GeneratorConfig) -> Self {
174 Self {
175 generator_config,
176 stream_config: StreamConfig::default(),
177 phases: vec![
178 GenerationPhase::ChartOfAccounts,
179 GenerationPhase::MasterData,
180 GenerationPhase::DocumentFlows,
181 GenerationPhase::JournalEntries,
182 ],
183 }
184 }
185
186 pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
188 Self {
189 generator_config,
190 stream_config: StreamConfig::default(),
191 phases: vec![
192 GenerationPhase::ChartOfAccounts,
193 GenerationPhase::MasterData,
194 GenerationPhase::DocumentFlows,
195 GenerationPhase::OcpmEvents,
196 GenerationPhase::JournalEntries,
197 GenerationPhase::AnomalyInjection,
198 GenerationPhase::DataQuality,
199 ],
200 }
201 }
202
203 pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
205 self.stream_config = config;
206 self
207 }
208
209 pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
211 self.phases = phases;
212 self
213 }
214}
215
216pub struct StreamingOrchestrator {
218 config: StreamingOrchestratorConfig,
219}
220
221impl StreamingOrchestrator {
222 pub fn new(config: StreamingOrchestratorConfig) -> Self {
224 Self { config }
225 }
226
227 pub fn from_generator_config(config: GeneratorConfig) -> Self {
229 Self::new(StreamingOrchestratorConfig::new(config))
230 }
231
232 pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
236 let (sender, receiver) = stream_channel(
237 self.config.stream_config.buffer_size,
238 self.config.stream_config.backpressure,
239 );
240
241 let control = Arc::new(StreamControl::new());
242 let control_clone = Arc::clone(&control);
243
244 let config = self.config.clone();
245
246 thread::spawn(move || {
248 let result = Self::run_generation(config, sender, control_clone);
249 if let Err(e) = result {
250 warn!("Streaming generation error: {}", e);
251 }
252 });
253
254 Ok((receiver, control))
255 }
256
257 fn run_generation(
259 config: StreamingOrchestratorConfig,
260 sender: StreamSender<GeneratedItem>,
261 control: Arc<StreamControl>,
262 ) -> SynthResult<()> {
263 let start_time = Instant::now();
264 let mut items_generated: u64 = 0;
265 let mut phases_completed = Vec::new();
266
267 let progress_interval = config.stream_config.progress_interval;
269
270 let mut progress = StreamProgress::new("initializing");
272 sender.send(StreamEvent::Progress(progress.clone()))?;
273
274 for phase in &config.phases {
275 if control.is_cancelled() {
276 info!("Generation cancelled");
277 break;
278 }
279
280 while control.is_paused() {
282 std::thread::sleep(std::time::Duration::from_millis(100));
283 if control.is_cancelled() {
284 break;
285 }
286 }
287
288 progress.phase = phase.name().to_string();
289 sender.send(StreamEvent::Progress(progress.clone()))?;
290
291 match phase {
292 GenerationPhase::ChartOfAccounts => {
293 let result =
294 Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
295 items_generated += result;
296 }
297 GenerationPhase::MasterData => {
298 let result = Self::generate_master_data_phase(
299 &config.generator_config,
300 &sender,
301 &control,
302 )?;
303 items_generated += result;
304 }
305 GenerationPhase::DocumentFlows => {
306 let result = Self::generate_document_flows_phase(
307 &config.generator_config,
308 &sender,
309 &control,
310 progress_interval,
311 &mut progress,
312 )?;
313 items_generated += result;
314 }
315 GenerationPhase::OcpmEvents => {
316 warn!("OCPM event generation is not yet supported in streaming mode; skipping");
317 }
318 GenerationPhase::JournalEntries => {
319 let result = Self::generate_journal_entries_phase(
320 &config.generator_config,
321 &sender,
322 &control,
323 progress_interval,
324 &mut progress,
325 )?;
326 items_generated += result;
327 }
328 GenerationPhase::AnomalyInjection => {
329 info!("Anomaly injection applied inline during JE generation phase in streaming mode");
330 }
331 GenerationPhase::DataQuality => {
332 info!(
333 "Data quality injection is not yet supported in streaming mode; skipping"
334 );
335 }
336 GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
337 info!("Phase {:?} is not applicable in streaming mode", phase);
338 }
339 }
340
341 sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
343 phase.name().to_string(),
344 )))?;
345 phases_completed.push(phase.name().to_string());
346
347 progress.items_generated = items_generated;
349 progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
350 if progress.elapsed_ms > 0 {
351 progress.items_per_second =
352 (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
353 }
354 sender.send(StreamEvent::Progress(progress.clone()))?;
355 }
356
357 let stats = sender.stats();
359 let summary = StreamSummary {
360 total_items: items_generated,
361 total_time_ms: start_time.elapsed().as_millis() as u64,
362 avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
363 (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
364 } else {
365 0.0
366 },
367 error_count: 0,
368 dropped_count: stats.items_dropped,
369 peak_memory_mb: None,
370 phases_completed,
371 };
372
373 sender.send(StreamEvent::Complete(summary))?;
374 sender.close();
375
376 Ok(())
377 }
378
379 fn generate_coa_phase(
381 config: &GeneratorConfig,
382 sender: &StreamSender<GeneratedItem>,
383 control: &Arc<StreamControl>,
384 ) -> SynthResult<u64> {
385 use datasynth_generators::ChartOfAccountsGenerator;
386
387 if control.is_cancelled() {
388 return Ok(0);
389 }
390
391 info!("Generating Chart of Accounts");
392 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
393 let complexity = config.chart_of_accounts.complexity;
394 let industry = config.global.industry;
395 let coa_framework = resolve_coa_framework_from_config(config);
396
397 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
398 .with_coa_framework(coa_framework);
399 let coa = coa_gen.generate();
400
401 let account_count = coa.account_count() as u64;
402 sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
403 coa,
404 ))))?;
405
406 Ok(account_count)
407 }
408
409 fn generate_master_data_phase(
411 config: &GeneratorConfig,
412 sender: &StreamSender<GeneratedItem>,
413 control: &Arc<StreamControl>,
414 ) -> SynthResult<u64> {
415 use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
416
417 let mut count: u64 = 0;
418 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
419 let md_config = &config.master_data;
420 let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
421 .unwrap_or_else(|e| {
422 tracing::warn!(
423 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
424 config.global.start_date,
425 e
426 );
427 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
428 });
429
430 let company_code = config
431 .companies
432 .first()
433 .map(|c| c.code.as_str())
434 .unwrap_or_else(|| {
435 tracing::warn!("No companies configured, defaulting to company code '1000'");
436 "1000"
437 });
438
439 if control.is_cancelled() {
441 return Ok(count);
442 }
443
444 info!("Generating vendors");
445 let mut vendor_gen = VendorGenerator::new(seed);
446 for _ in 0..md_config.vendors.count {
447 if control.is_cancelled() {
448 break;
449 }
450 let vendor = vendor_gen.generate_vendor(company_code, effective_date);
451 sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
452 count += 1;
453 }
454
455 if control.is_cancelled() {
457 return Ok(count);
458 }
459
460 info!("Generating customers");
461 let mut customer_gen = CustomerGenerator::new(seed + 1);
462 for _ in 0..md_config.customers.count {
463 if control.is_cancelled() {
464 break;
465 }
466 let customer = customer_gen.generate_customer(company_code, effective_date);
467 sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
468 customer,
469 ))))?;
470 count += 1;
471 }
472
473 if control.is_cancelled() {
475 return Ok(count);
476 }
477
478 info!("Generating employees");
479 let mut employee_gen = EmployeeGenerator::new(seed + 4);
480 let dept = if let Some(first_custom) = config.departments.custom_departments.first() {
482 datasynth_generators::DepartmentDefinition {
483 code: first_custom.code.clone(),
484 name: first_custom.name.clone(),
485 cost_center: first_custom
486 .cost_center
487 .clone()
488 .unwrap_or_else(|| format!("CC{}", first_custom.code)),
489 headcount: 10,
490 system_roles: vec![],
491 transaction_codes: vec![],
492 }
493 } else {
494 warn!("No departments configured, using default 'General' department");
495 datasynth_generators::DepartmentDefinition {
496 code: "1000".to_string(),
497 name: "General".to_string(),
498 cost_center: "CC1000".to_string(),
499 headcount: 10,
500 system_roles: vec![],
501 transaction_codes: vec![],
502 }
503 };
504 for _ in 0..md_config.employees.count {
505 if control.is_cancelled() {
506 break;
507 }
508 let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
509 sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
510 employee,
511 ))))?;
512 count += 1;
513 }
514
515 Ok(count)
516 }
517
518 fn generate_journal_entries_phase(
526 config: &GeneratorConfig,
527 sender: &StreamSender<GeneratedItem>,
528 control: &Arc<StreamControl>,
529 progress_interval: u64,
530 progress: &mut StreamProgress,
531 ) -> SynthResult<u64> {
532 use datasynth_generators::{
533 AnomalyInjector, AnomalyInjectorConfig, ChartOfAccountsGenerator,
534 EnhancedInjectionConfig, JournalEntryGenerator,
535 };
536 use std::sync::Arc;
537
538 let mut count: u64 = 0;
539 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
540
541 let default_monthly = 500;
543 let total_entries: usize = config
544 .companies
545 .iter()
546 .map(|c| {
547 let monthly = (c.volume_weight * default_monthly as f64) as usize;
548 monthly.max(100) * config.global.period_months as usize
549 })
550 .sum();
551
552 progress.items_remaining = Some(total_entries as u64);
553 info!("Generating {} journal entries", total_entries);
554
555 let complexity = config.chart_of_accounts.complexity;
557 let industry = config.global.industry;
558 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
559 let coa = Arc::new(coa_gen.generate());
560
561 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
563 .unwrap_or_else(|e| {
564 tracing::warn!(
565 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
566 config.global.start_date,
567 e
568 );
569 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
570 });
571 let end_date = start_date
572 .checked_add_months(chrono::Months::new(config.global.period_months))
573 .unwrap_or(start_date + chrono::Duration::days(365));
574
575 let mut je_gen = JournalEntryGenerator::from_generator_config(
577 config,
578 Arc::clone(&coa),
579 start_date,
580 end_date,
581 seed,
582 );
583 let company_currencies: std::collections::HashMap<String, String> = config
585 .companies
586 .iter()
587 .map(|c| {
588 (
589 c.code.clone(),
590 c.functional_currency
591 .clone()
592 .unwrap_or_else(|| c.currency.clone()),
593 )
594 })
595 .collect();
596 je_gen = je_gen.with_company_currencies(company_currencies);
597
598 let anomaly_enabled = config.anomaly_injection.enabled || config.fraud.enabled;
601 let mut anomaly_injector = if anomaly_enabled {
602 let total_rate = if config.anomaly_injection.enabled {
603 config.anomaly_injection.rates.total_rate
604 } else {
605 config.fraud.fraud_rate
606 };
607 let fraud_rate = if config.anomaly_injection.enabled {
608 config.anomaly_injection.rates.fraud_rate
609 } else {
610 AnomalyRateConfig::default().fraud_rate
611 };
612 let error_rate = if config.anomaly_injection.enabled {
613 config.anomaly_injection.rates.error_rate
614 } else {
615 AnomalyRateConfig::default().error_rate
616 };
617 let process_issue_rate = if config.anomaly_injection.enabled {
618 config.anomaly_injection.rates.process_rate
619 } else {
620 AnomalyRateConfig::default().process_issue_rate
621 };
622
623 let injector_config = AnomalyInjectorConfig {
624 rates: AnomalyRateConfig {
625 total_rate,
626 fraud_rate,
627 error_rate,
628 process_issue_rate,
629 ..Default::default()
630 },
631 enhanced: EnhancedInjectionConfig {
632 fraud_behavioral_bias: config.fraud.effective_bias().to_core(),
633 fraud_campaign: config.fraud.campaigns.clone(),
635 ..Default::default()
636 },
637 seed: seed + 5000,
638 ..Default::default()
639 };
640
641 info!(
642 "Anomaly injection enabled for streaming JE phase (total_rate={:.3})",
643 total_rate
644 );
645 Some(AnomalyInjector::new(injector_config))
646 } else {
647 None
648 };
649
650 let batch_size: usize = if anomaly_injector.is_some() { 100 } else { 1 };
653 let mut remaining = total_entries;
654
655 while remaining > 0 {
656 if control.is_cancelled() {
657 break;
658 }
659
660 let current_batch = remaining.min(batch_size);
661 let mut batch: Vec<JournalEntry> = Vec::with_capacity(current_batch);
662
663 for _ in 0..current_batch {
664 if control.is_cancelled() {
665 break;
666 }
667
668 while control.is_paused() {
670 std::thread::sleep(std::time::Duration::from_millis(100));
671 if control.is_cancelled() {
672 break;
673 }
674 }
675
676 batch.push(je_gen.generate());
677 }
678
679 if batch.is_empty() {
680 break;
681 }
682
683 if let Some(ref mut injector) = anomaly_injector {
685 let result = injector.process_entries(&mut batch);
686
687 for label in result.labels {
689 sender.send(StreamEvent::Data(GeneratedItem::AnomalyLabel(Box::new(
690 label,
691 ))))?;
692 }
693 }
694
695 for je in batch {
697 sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
698 count += 1;
699
700 if count.is_multiple_of(progress_interval) {
702 progress.items_generated = count;
703 progress.items_remaining = Some(total_entries as u64 - count);
704 sender.send(StreamEvent::Progress(progress.clone()))?;
705 }
706 }
707
708 remaining = remaining.saturating_sub(current_batch);
709 }
710
711 Ok(count)
712 }
713
714 fn generate_document_flows_phase(
720 config: &GeneratorConfig,
721 sender: &StreamSender<GeneratedItem>,
722 control: &Arc<StreamControl>,
723 progress_interval: u64,
724 progress: &mut StreamProgress,
725 ) -> SynthResult<u64> {
726 use chrono::Datelike;
727 use datasynth_generators::{
728 CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
729 };
730
731 let mut count: u64 = 0;
732 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
733 let df_config = &config.document_flows;
734 let md_config = &config.master_data;
735
736 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
738 .unwrap_or_else(|e| {
739 tracing::warn!(
740 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
741 config.global.start_date,
742 e
743 );
744 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
745 });
746 let end_date = start_date
747 .checked_add_months(chrono::Months::new(config.global.period_months))
748 .unwrap_or(start_date + chrono::Duration::days(365));
749 let total_period_days = (end_date - start_date).num_days().max(1);
750
751 let company_code = config
752 .companies
753 .first()
754 .map(|c| c.code.as_str())
755 .unwrap_or_else(|| {
756 tracing::warn!("No companies configured, defaulting to company code '1000'");
757 "1000"
758 });
759
760 let vendor_count = md_config.vendors.count.min(100);
762 let customer_count = md_config.customers.count.min(100);
763 let material_count = md_config.materials.count.min(50);
764
765 let mut vendor_gen = VendorGenerator::new(seed);
767 let mut customer_gen = CustomerGenerator::new(seed + 1);
768 let mut material_gen = MaterialGenerator::new(seed + 2);
769
770 let vendors: Vec<_> = (0..vendor_count)
771 .map(|_| vendor_gen.generate_vendor(company_code, start_date))
772 .collect();
773
774 let customers: Vec<_> = (0..customer_count)
775 .map(|_| customer_gen.generate_customer(company_code, start_date))
776 .collect();
777
778 let materials: Vec<_> = (0..material_count)
779 .map(|_| material_gen.generate_material(company_code, start_date))
780 .collect();
781
782 let base_chains = (config.global.period_months as usize * 50).max(100);
785
786 if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
788 info!("Generating P2P document flows");
789 let mut p2p_gen = P2PGenerator::new(seed + 100);
790
791 let chains_to_generate = base_chains.min(1000);
792 progress.items_remaining = Some(chains_to_generate as u64);
793
794 for i in 0..chains_to_generate {
795 if control.is_cancelled() {
796 break;
797 }
798
799 while control.is_paused() {
801 std::thread::sleep(std::time::Duration::from_millis(100));
802 if control.is_cancelled() {
803 break;
804 }
805 }
806
807 let vendor = &vendors[i % vendors.len()];
808 let material_refs: Vec<&datasynth_core::models::Material> =
809 vec![&materials[i % materials.len()]];
810
811 let days_offset = (i as i64 % total_period_days).max(0);
813 let po_date = start_date + chrono::Duration::days(days_offset);
814 let fiscal_year = po_date.year() as u16;
815 let fiscal_period = po_date.month() as u8;
816
817 let chain = p2p_gen.generate_chain(
818 company_code,
819 vendor,
820 &material_refs,
821 po_date,
822 fiscal_year,
823 fiscal_period,
824 "SYSTEM",
825 );
826
827 sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
829 chain.purchase_order,
830 ))))?;
831 count += 1;
832
833 for gr in chain.goods_receipts {
834 sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
835 count += 1;
836 }
837
838 if let Some(vi) = chain.vendor_invoice {
839 sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
840 vi,
841 ))))?;
842 count += 1;
843 }
844
845 if let Some(payment) = chain.payment {
846 sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
847 count += 1;
848 }
849
850 if count.is_multiple_of(progress_interval) {
851 progress.items_generated = count;
852 sender.send(StreamEvent::Progress(progress.clone()))?;
853 }
854 }
855 }
856
857 if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
859 info!("Generating O2C document flows");
860 let mut o2c_gen = O2CGenerator::new(seed + 200);
861
862 let chains_to_generate = base_chains.min(1000);
863
864 for i in 0..chains_to_generate {
865 if control.is_cancelled() {
866 break;
867 }
868
869 while control.is_paused() {
870 std::thread::sleep(std::time::Duration::from_millis(100));
871 if control.is_cancelled() {
872 break;
873 }
874 }
875
876 let customer = &customers[i % customers.len()];
877 let material_refs: Vec<&datasynth_core::models::Material> =
878 vec![&materials[i % materials.len()]];
879
880 let days_offset = (i as i64 % total_period_days).max(0);
881 let so_date = start_date + chrono::Duration::days(days_offset);
882 let fiscal_year = so_date.year() as u16;
883 let fiscal_period = so_date.month() as u8;
884
885 let chain = o2c_gen.generate_chain(
886 company_code,
887 customer,
888 &material_refs,
889 so_date,
890 fiscal_year,
891 fiscal_period,
892 "SYSTEM",
893 );
894
895 sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
896 chain.sales_order,
897 ))))?;
898 count += 1;
899
900 for delivery in chain.deliveries {
901 sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
902 delivery,
903 ))))?;
904 count += 1;
905 }
906
907 if let Some(ci) = chain.customer_invoice {
908 sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
909 ci,
910 ))))?;
911 count += 1;
912 }
913
914 if count.is_multiple_of(progress_interval) {
915 progress.items_generated = count;
916 sender.send(StreamEvent::Progress(progress.clone()))?;
917 }
918 }
919 }
920
921 Ok(count)
922 }
923
924 pub fn stats(&self) -> StreamingOrchestratorStats {
926 StreamingOrchestratorStats {
927 phases: self.config.phases.len(),
928 buffer_size: self.config.stream_config.buffer_size,
929 backpressure: self.config.stream_config.backpressure,
930 }
931 }
932}
933
/// Snapshot of a streaming orchestrator's configuration, as returned by its
/// `stats()` method. It describes configured values, not runtime counters.
#[derive(Debug, Clone)]
pub struct StreamingOrchestratorStats {
    /// Number of phases in the configured phase list.
    pub phases: usize,
    /// Channel buffer size taken from the stream configuration.
    pub buffer_size: usize,
    /// Backpressure strategy taken from the stream configuration.
    pub backpressure: BackpressureStrategy,
}
944
945fn resolve_coa_framework_from_config(
947 config: &GeneratorConfig,
948) -> datasynth_generators::coa_generator::CoAFramework {
949 use datasynth_generators::coa_generator::CoAFramework;
950 if config.accounting_standards.enabled {
951 match config.accounting_standards.framework {
952 Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
953 return CoAFramework::FrenchPcg;
954 }
955 Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
956 return CoAFramework::GermanSkr04;
957 }
958 _ => {}
959 }
960 }
961 CoAFramework::UsGaap
962}
963
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    /// Builds the Retail preset (small CoA, `TenK` volume) shared by these tests.
    // NOTE(review): the meaning of the `2` and `3` arguments is defined by
    // `create_preset`; confirm there before changing them.
    fn create_test_config() -> GeneratorConfig {
        create_preset(
            IndustrySector::Retail,
            2,
            3,
            CoAComplexity::Small,
            TransactionVolume::TenK,
        )
    }

    /// The default config yields a non-empty phase list and a positive buffer.
    #[test]
    fn test_streaming_orchestrator_creation() {
        let config = create_test_config();
        let orchestrator = StreamingOrchestrator::from_generator_config(config);
        let stats = orchestrator.stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    /// Runs only the CoA and master-data phases with tiny master-data counts
    /// and checks that data, the CoA item and the completion summary arrive.
    #[test]
    fn test_streaming_generation() {
        let mut config = create_test_config();
        // Keep master data small so the test stays fast.
        config.master_data.vendors.count = 5;
        config.master_data.customers.count = 5;
        config.master_data.employees.count = 5;
        config.global.period_months = 1;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![
                GenerationPhase::ChartOfAccounts,
                GenerationPhase::MasterData,
            ])
            .with_stream_config(StreamConfig {
                buffer_size: 100,
                progress_interval: 10,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut items_count = 0;
        let mut has_coa = false;
        let mut has_completion = false;

        // Drain events until the producer sends its final summary.
        for event in receiver {
            match event {
                StreamEvent::Data(item) => {
                    items_count += 1;
                    if matches!(item, GeneratedItem::ChartOfAccounts(_)) {
                        has_coa = true;
                    }
                }
                StreamEvent::Complete(_) => {
                    has_completion = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(items_count > 0);
        assert!(has_coa);
        assert!(has_completion);
    }

    /// Cancels a journal-entry stream after ten data items.
    #[test]
    fn test_stream_cancellation() {
        let mut config = create_test_config();
        // The JE phase generates at least 100 entries per company per month, so
        // a 12-month period keeps the producer busy when the consumer cancels.
        config.global.period_months = 12;
        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries]);

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, control) = orchestrator.stream().unwrap();

        let mut items_count = 0;
        // Read a handful of data items, then cancel and stop consuming.
        for event in receiver {
            if let StreamEvent::Data(_) = event {
                items_count += 1;
                if items_count >= 10 {
                    control.cancel();
                    break;
                }
            }
        }

        // NOTE(review): this only checks that the flag was set; it does not
        // verify that the producer thread actually stopped generating.
        assert!(control.is_cancelled());
    }

    /// With anomaly injection enabled, the JE phase emits anomaly labels
    /// alongside journal entries.
    #[test]
    fn test_streaming_anomaly_injection() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;

        config.anomaly_injection.enabled = true;
        // A 20% total rate; the fraud/error/process shares below sum to 1.0.
        config.anomaly_injection.rates.total_rate = 0.20;
        config.anomaly_injection.rates.fraud_rate = 0.40;
        config.anomaly_injection.rates.error_rate = 0.40;
        config.anomaly_injection.rates.process_rate = 0.20;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(StreamConfig {
                buffer_size: 500,
                progress_interval: 50,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut je_count = 0;
        let mut label_count = 0;
        let mut has_completion = false;

        for event in receiver {
            match event {
                StreamEvent::Data(item) => match item {
                    GeneratedItem::JournalEntry(_) => je_count += 1,
                    GeneratedItem::AnomalyLabel(_) => label_count += 1,
                    _ => {}
                },
                StreamEvent::Complete(_) => {
                    has_completion = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(has_completion, "Stream should complete");
        assert!(je_count > 0, "Should generate journal entries");
        // Probabilistic: with a 20% rate over a full month of entries, zero
        // labels would be extremely unlikely for a given seed.
        assert!(
            label_count > 0,
            "Should generate anomaly labels (got {} JEs, {} labels)",
            je_count,
            label_count
        );
    }

    /// With both anomaly injection and fraud disabled, no labels are emitted.
    #[test]
    fn test_streaming_no_anomalies_when_disabled() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;

        // Both switches must be off: the JE phase enables injection when either is set.
        config.anomaly_injection.enabled = false;
        config.fraud.enabled = false;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(StreamConfig {
                buffer_size: 500,
                progress_interval: 50,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut label_count = 0;

        for event in receiver {
            match event {
                StreamEvent::Data(GeneratedItem::AnomalyLabel(_)) => label_count += 1,
                StreamEvent::Complete(_) => break,
                _ => {}
            }
        }

        assert_eq!(
            label_count, 0,
            "Should not generate anomaly labels when disabled"
        );
    }

    /// `type_name` returns the expected tags for a CoA item and a progress item.
    #[test]
    fn test_generated_item_type_name() {
        // Repeats the module-level import; harmless.
        use datasynth_core::models::{CoAComplexity, IndustrySector};

        let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        )));
        assert_eq!(coa.type_name(), "chart_of_accounts");

        let progress = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress.type_name(), "progress");
    }
}