1use std::sync::Arc;
7use std::thread;
8use std::time::Instant;
9
10use chrono::NaiveDate;
11use tracing::{info, warn};
12
13use datasynth_config::schema::GeneratorConfig;
14
/// Seed used by every generator in this module when `global.seed` is not set.
const DEFAULT_SEED: u64 = 42;
17use datasynth_core::error::SynthResult;
18use datasynth_core::models::{
19 documents::{
20 CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
21 },
22 AnomalyRateConfig, ChartOfAccounts, Customer, Employee, JournalEntry, LabeledAnomaly, Material,
23 Vendor,
24};
25use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
26use datasynth_core::traits::{
27 BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
28};
29
30#[derive(Debug, Clone)]
32pub enum GeneratedItem {
33 ChartOfAccounts(Box<ChartOfAccounts>),
35 Vendor(Box<Vendor>),
37 Customer(Box<Customer>),
39 Material(Box<Material>),
41 Employee(Box<Employee>),
43 JournalEntry(Box<JournalEntry>),
45 PurchaseOrder(Box<PurchaseOrder>),
47 GoodsReceipt(Box<GoodsReceipt>),
49 VendorInvoice(Box<VendorInvoice>),
51 Payment(Box<Payment>),
53 SalesOrder(Box<SalesOrder>),
55 Delivery(Box<Delivery>),
57 CustomerInvoice(Box<CustomerInvoice>),
59 AnomalyLabel(Box<LabeledAnomaly>),
61 Progress(StreamProgress),
63 PhaseComplete(String),
65}
66
67impl GeneratedItem {
68 pub fn type_name(&self) -> &'static str {
70 match self {
71 GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
72 GeneratedItem::Vendor(_) => "vendor",
73 GeneratedItem::Customer(_) => "customer",
74 GeneratedItem::Material(_) => "material",
75 GeneratedItem::Employee(_) => "employee",
76 GeneratedItem::JournalEntry(_) => "journal_entry",
77 GeneratedItem::PurchaseOrder(_) => "purchase_order",
78 GeneratedItem::GoodsReceipt(_) => "goods_receipt",
79 GeneratedItem::VendorInvoice(_) => "vendor_invoice",
80 GeneratedItem::Payment(_) => "payment",
81 GeneratedItem::SalesOrder(_) => "sales_order",
82 GeneratedItem::Delivery(_) => "delivery",
83 GeneratedItem::CustomerInvoice(_) => "customer_invoice",
84 GeneratedItem::AnomalyLabel(_) => "anomaly_label",
85 GeneratedItem::Progress(_) => "progress",
86 GeneratedItem::PhaseComplete(_) => "phase_complete",
87 }
88 }
89}
90
/// A named stage of the streaming generation pipeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GenerationPhase {
    /// Chart of accounts generation.
    ChartOfAccounts,
    /// Vendor, customer and employee generation.
    MasterData,
    /// P2P and O2C document chain generation.
    DocumentFlows,
    /// OCPM event generation (not yet supported in streaming mode).
    OcpmEvents,
    /// Journal entry generation.
    JournalEntries,
    /// Anomaly injection; in streaming mode it is applied inline while journal
    /// entries are generated.
    AnomalyInjection,
    /// Balance validation (not applicable in streaming mode).
    BalanceValidation,
    /// Data quality injection (not yet supported in streaming mode).
    DataQuality,
    /// Terminal marker (not applicable in streaming mode).
    Complete,
}

impl GenerationPhase {
    /// Returns the snake_case identifier of this phase, used in progress
    /// reports and phase-completion markers.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::ChartOfAccounts => "chart_of_accounts",
            Self::MasterData => "master_data",
            Self::DocumentFlows => "document_flows",
            Self::OcpmEvents => "ocpm_events",
            Self::JournalEntries => "journal_entries",
            Self::AnomalyInjection => "anomaly_injection",
            Self::BalanceValidation => "balance_validation",
            Self::DataQuality => "data_quality",
            Self::Complete => "complete",
        }
    }
}
130
131#[derive(Debug, Clone)]
133pub struct StreamingOrchestratorConfig {
134 pub generator_config: GeneratorConfig,
136 pub stream_config: StreamConfig,
138 pub phases: Vec<GenerationPhase>,
140}
141
142impl StreamingOrchestratorConfig {
143 pub fn new(generator_config: GeneratorConfig) -> Self {
145 Self {
146 generator_config,
147 stream_config: StreamConfig::default(),
148 phases: vec![
149 GenerationPhase::ChartOfAccounts,
150 GenerationPhase::MasterData,
151 GenerationPhase::DocumentFlows,
152 GenerationPhase::JournalEntries,
153 ],
154 }
155 }
156
157 pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
159 Self {
160 generator_config,
161 stream_config: StreamConfig::default(),
162 phases: vec![
163 GenerationPhase::ChartOfAccounts,
164 GenerationPhase::MasterData,
165 GenerationPhase::DocumentFlows,
166 GenerationPhase::OcpmEvents,
167 GenerationPhase::JournalEntries,
168 GenerationPhase::AnomalyInjection,
169 GenerationPhase::DataQuality,
170 ],
171 }
172 }
173
174 pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
176 self.stream_config = config;
177 self
178 }
179
180 pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
182 self.phases = phases;
183 self
184 }
185}
186
187pub struct StreamingOrchestrator {
189 config: StreamingOrchestratorConfig,
190}
191
192impl StreamingOrchestrator {
193 pub fn new(config: StreamingOrchestratorConfig) -> Self {
195 Self { config }
196 }
197
198 pub fn from_generator_config(config: GeneratorConfig) -> Self {
200 Self::new(StreamingOrchestratorConfig::new(config))
201 }
202
203 pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
207 let (sender, receiver) = stream_channel(
208 self.config.stream_config.buffer_size,
209 self.config.stream_config.backpressure,
210 );
211
212 let control = Arc::new(StreamControl::new());
213 let control_clone = Arc::clone(&control);
214
215 let config = self.config.clone();
216
217 thread::spawn(move || {
219 let result = Self::run_generation(config, sender, control_clone);
220 if let Err(e) = result {
221 warn!("Streaming generation error: {}", e);
222 }
223 });
224
225 Ok((receiver, control))
226 }
227
228 fn run_generation(
230 config: StreamingOrchestratorConfig,
231 sender: StreamSender<GeneratedItem>,
232 control: Arc<StreamControl>,
233 ) -> SynthResult<()> {
234 let start_time = Instant::now();
235 let mut items_generated: u64 = 0;
236 let mut phases_completed = Vec::new();
237
238 let progress_interval = config.stream_config.progress_interval;
240
241 let mut progress = StreamProgress::new("initializing");
243 sender.send(StreamEvent::Progress(progress.clone()))?;
244
245 for phase in &config.phases {
246 if control.is_cancelled() {
247 info!("Generation cancelled");
248 break;
249 }
250
251 while control.is_paused() {
253 std::thread::sleep(std::time::Duration::from_millis(100));
254 if control.is_cancelled() {
255 break;
256 }
257 }
258
259 progress.phase = phase.name().to_string();
260 sender.send(StreamEvent::Progress(progress.clone()))?;
261
262 match phase {
263 GenerationPhase::ChartOfAccounts => {
264 let result =
265 Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
266 items_generated += result;
267 }
268 GenerationPhase::MasterData => {
269 let result = Self::generate_master_data_phase(
270 &config.generator_config,
271 &sender,
272 &control,
273 )?;
274 items_generated += result;
275 }
276 GenerationPhase::DocumentFlows => {
277 let result = Self::generate_document_flows_phase(
278 &config.generator_config,
279 &sender,
280 &control,
281 progress_interval,
282 &mut progress,
283 )?;
284 items_generated += result;
285 }
286 GenerationPhase::OcpmEvents => {
287 warn!("OCPM event generation is not yet supported in streaming mode; skipping");
288 }
289 GenerationPhase::JournalEntries => {
290 let result = Self::generate_journal_entries_phase(
291 &config.generator_config,
292 &sender,
293 &control,
294 progress_interval,
295 &mut progress,
296 )?;
297 items_generated += result;
298 }
299 GenerationPhase::AnomalyInjection => {
300 info!("Anomaly injection applied inline during JE generation phase in streaming mode");
301 }
302 GenerationPhase::DataQuality => {
303 info!(
304 "Data quality injection is not yet supported in streaming mode; skipping"
305 );
306 }
307 GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
308 info!("Phase {:?} is not applicable in streaming mode", phase);
309 }
310 }
311
312 sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
314 phase.name().to_string(),
315 )))?;
316 phases_completed.push(phase.name().to_string());
317
318 progress.items_generated = items_generated;
320 progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
321 if progress.elapsed_ms > 0 {
322 progress.items_per_second =
323 (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
324 }
325 sender.send(StreamEvent::Progress(progress.clone()))?;
326 }
327
328 let stats = sender.stats();
330 let summary = StreamSummary {
331 total_items: items_generated,
332 total_time_ms: start_time.elapsed().as_millis() as u64,
333 avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
334 (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
335 } else {
336 0.0
337 },
338 error_count: 0,
339 dropped_count: stats.items_dropped,
340 peak_memory_mb: None,
341 phases_completed,
342 };
343
344 sender.send(StreamEvent::Complete(summary))?;
345 sender.close();
346
347 Ok(())
348 }
349
350 fn generate_coa_phase(
352 config: &GeneratorConfig,
353 sender: &StreamSender<GeneratedItem>,
354 control: &Arc<StreamControl>,
355 ) -> SynthResult<u64> {
356 use datasynth_generators::ChartOfAccountsGenerator;
357
358 if control.is_cancelled() {
359 return Ok(0);
360 }
361
362 info!("Generating Chart of Accounts");
363 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
364 let complexity = config.chart_of_accounts.complexity;
365 let industry = config.global.industry;
366 let coa_framework = resolve_coa_framework_from_config(config);
367
368 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
369 .with_coa_framework(coa_framework);
370 let coa = coa_gen.generate();
371
372 let account_count = coa.account_count() as u64;
373 sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
374 coa,
375 ))))?;
376
377 Ok(account_count)
378 }
379
380 fn generate_master_data_phase(
382 config: &GeneratorConfig,
383 sender: &StreamSender<GeneratedItem>,
384 control: &Arc<StreamControl>,
385 ) -> SynthResult<u64> {
386 use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
387
388 let mut count: u64 = 0;
389 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
390 let md_config = &config.master_data;
391 let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
392 .unwrap_or_else(|e| {
393 tracing::warn!(
394 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
395 config.global.start_date,
396 e
397 );
398 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
399 });
400
401 let company_code = config
402 .companies
403 .first()
404 .map(|c| c.code.as_str())
405 .unwrap_or_else(|| {
406 tracing::warn!("No companies configured, defaulting to company code '1000'");
407 "1000"
408 });
409
410 if control.is_cancelled() {
412 return Ok(count);
413 }
414
415 info!("Generating vendors");
416 let mut vendor_gen = VendorGenerator::new(seed);
417 for _ in 0..md_config.vendors.count {
418 if control.is_cancelled() {
419 break;
420 }
421 let vendor = vendor_gen.generate_vendor(company_code, effective_date);
422 sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
423 count += 1;
424 }
425
426 if control.is_cancelled() {
428 return Ok(count);
429 }
430
431 info!("Generating customers");
432 let mut customer_gen = CustomerGenerator::new(seed + 1);
433 for _ in 0..md_config.customers.count {
434 if control.is_cancelled() {
435 break;
436 }
437 let customer = customer_gen.generate_customer(company_code, effective_date);
438 sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
439 customer,
440 ))))?;
441 count += 1;
442 }
443
444 if control.is_cancelled() {
446 return Ok(count);
447 }
448
449 info!("Generating employees");
450 let mut employee_gen = EmployeeGenerator::new(seed + 4);
451 let dept = if let Some(first_custom) = config.departments.custom_departments.first() {
453 datasynth_generators::DepartmentDefinition {
454 code: first_custom.code.clone(),
455 name: first_custom.name.clone(),
456 cost_center: first_custom
457 .cost_center
458 .clone()
459 .unwrap_or_else(|| format!("CC{}", first_custom.code)),
460 headcount: 10,
461 system_roles: vec![],
462 transaction_codes: vec![],
463 }
464 } else {
465 warn!("No departments configured, using default 'General' department");
466 datasynth_generators::DepartmentDefinition {
467 code: "1000".to_string(),
468 name: "General".to_string(),
469 cost_center: "CC1000".to_string(),
470 headcount: 10,
471 system_roles: vec![],
472 transaction_codes: vec![],
473 }
474 };
475 for _ in 0..md_config.employees.count {
476 if control.is_cancelled() {
477 break;
478 }
479 let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
480 sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
481 employee,
482 ))))?;
483 count += 1;
484 }
485
486 Ok(count)
487 }
488
489 fn generate_journal_entries_phase(
497 config: &GeneratorConfig,
498 sender: &StreamSender<GeneratedItem>,
499 control: &Arc<StreamControl>,
500 progress_interval: u64,
501 progress: &mut StreamProgress,
502 ) -> SynthResult<u64> {
503 use datasynth_generators::{
504 AnomalyInjector, AnomalyInjectorConfig, ChartOfAccountsGenerator, JournalEntryGenerator,
505 };
506 use std::sync::Arc;
507
508 let mut count: u64 = 0;
509 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
510
511 let default_monthly = 500;
513 let total_entries: usize = config
514 .companies
515 .iter()
516 .map(|c| {
517 let monthly = (c.volume_weight * default_monthly as f64) as usize;
518 monthly.max(100) * config.global.period_months as usize
519 })
520 .sum();
521
522 progress.items_remaining = Some(total_entries as u64);
523 info!("Generating {} journal entries", total_entries);
524
525 let complexity = config.chart_of_accounts.complexity;
527 let industry = config.global.industry;
528 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
529 let coa = Arc::new(coa_gen.generate());
530
531 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
533 .unwrap_or_else(|e| {
534 tracing::warn!(
535 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
536 config.global.start_date,
537 e
538 );
539 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
540 });
541 let end_date = start_date
542 .checked_add_months(chrono::Months::new(config.global.period_months))
543 .unwrap_or(start_date + chrono::Duration::days(365));
544
545 let mut je_gen = JournalEntryGenerator::from_generator_config(
547 config,
548 Arc::clone(&coa),
549 start_date,
550 end_date,
551 seed,
552 );
553
554 let anomaly_enabled = config.anomaly_injection.enabled || config.fraud.enabled;
557 let mut anomaly_injector = if anomaly_enabled {
558 let total_rate = if config.anomaly_injection.enabled {
559 config.anomaly_injection.rates.total_rate
560 } else {
561 config.fraud.fraud_rate
562 };
563 let fraud_rate = if config.anomaly_injection.enabled {
564 config.anomaly_injection.rates.fraud_rate
565 } else {
566 AnomalyRateConfig::default().fraud_rate
567 };
568 let error_rate = if config.anomaly_injection.enabled {
569 config.anomaly_injection.rates.error_rate
570 } else {
571 AnomalyRateConfig::default().error_rate
572 };
573 let process_issue_rate = if config.anomaly_injection.enabled {
574 config.anomaly_injection.rates.process_rate
575 } else {
576 AnomalyRateConfig::default().process_issue_rate
577 };
578
579 let injector_config = AnomalyInjectorConfig {
580 rates: AnomalyRateConfig {
581 total_rate,
582 fraud_rate,
583 error_rate,
584 process_issue_rate,
585 ..Default::default()
586 },
587 seed: seed + 5000,
588 ..Default::default()
589 };
590
591 info!(
592 "Anomaly injection enabled for streaming JE phase (total_rate={:.3})",
593 total_rate
594 );
595 Some(AnomalyInjector::new(injector_config))
596 } else {
597 None
598 };
599
600 let batch_size: usize = if anomaly_injector.is_some() { 100 } else { 1 };
603 let mut remaining = total_entries;
604
605 while remaining > 0 {
606 if control.is_cancelled() {
607 break;
608 }
609
610 let current_batch = remaining.min(batch_size);
611 let mut batch: Vec<JournalEntry> = Vec::with_capacity(current_batch);
612
613 for _ in 0..current_batch {
614 if control.is_cancelled() {
615 break;
616 }
617
618 while control.is_paused() {
620 std::thread::sleep(std::time::Duration::from_millis(100));
621 if control.is_cancelled() {
622 break;
623 }
624 }
625
626 batch.push(je_gen.generate());
627 }
628
629 if batch.is_empty() {
630 break;
631 }
632
633 if let Some(ref mut injector) = anomaly_injector {
635 let result = injector.process_entries(&mut batch);
636
637 for label in result.labels {
639 sender.send(StreamEvent::Data(GeneratedItem::AnomalyLabel(Box::new(
640 label,
641 ))))?;
642 }
643 }
644
645 for je in batch {
647 sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
648 count += 1;
649
650 if count.is_multiple_of(progress_interval) {
652 progress.items_generated = count;
653 progress.items_remaining = Some(total_entries as u64 - count);
654 sender.send(StreamEvent::Progress(progress.clone()))?;
655 }
656 }
657
658 remaining = remaining.saturating_sub(current_batch);
659 }
660
661 Ok(count)
662 }
663
664 fn generate_document_flows_phase(
670 config: &GeneratorConfig,
671 sender: &StreamSender<GeneratedItem>,
672 control: &Arc<StreamControl>,
673 progress_interval: u64,
674 progress: &mut StreamProgress,
675 ) -> SynthResult<u64> {
676 use chrono::Datelike;
677 use datasynth_generators::{
678 CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
679 };
680
681 let mut count: u64 = 0;
682 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
683 let df_config = &config.document_flows;
684 let md_config = &config.master_data;
685
686 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
688 .unwrap_or_else(|e| {
689 tracing::warn!(
690 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
691 config.global.start_date,
692 e
693 );
694 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
695 });
696 let end_date = start_date
697 .checked_add_months(chrono::Months::new(config.global.period_months))
698 .unwrap_or(start_date + chrono::Duration::days(365));
699 let total_period_days = (end_date - start_date).num_days().max(1);
700
701 let company_code = config
702 .companies
703 .first()
704 .map(|c| c.code.as_str())
705 .unwrap_or_else(|| {
706 tracing::warn!("No companies configured, defaulting to company code '1000'");
707 "1000"
708 });
709
710 let vendor_count = md_config.vendors.count.min(100);
712 let customer_count = md_config.customers.count.min(100);
713 let material_count = md_config.materials.count.min(50);
714
715 let mut vendor_gen = VendorGenerator::new(seed);
717 let mut customer_gen = CustomerGenerator::new(seed + 1);
718 let mut material_gen = MaterialGenerator::new(seed + 2);
719
720 let vendors: Vec<_> = (0..vendor_count)
721 .map(|_| vendor_gen.generate_vendor(company_code, start_date))
722 .collect();
723
724 let customers: Vec<_> = (0..customer_count)
725 .map(|_| customer_gen.generate_customer(company_code, start_date))
726 .collect();
727
728 let materials: Vec<_> = (0..material_count)
729 .map(|_| material_gen.generate_material(company_code, start_date))
730 .collect();
731
732 let base_chains = (config.global.period_months as usize * 50).max(100);
735
736 if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
738 info!("Generating P2P document flows");
739 let mut p2p_gen = P2PGenerator::new(seed + 100);
740
741 let chains_to_generate = base_chains.min(1000);
742 progress.items_remaining = Some(chains_to_generate as u64);
743
744 for i in 0..chains_to_generate {
745 if control.is_cancelled() {
746 break;
747 }
748
749 while control.is_paused() {
751 std::thread::sleep(std::time::Duration::from_millis(100));
752 if control.is_cancelled() {
753 break;
754 }
755 }
756
757 let vendor = &vendors[i % vendors.len()];
758 let material_refs: Vec<&datasynth_core::models::Material> =
759 vec![&materials[i % materials.len()]];
760
761 let days_offset = (i as i64 % total_period_days).max(0);
763 let po_date = start_date + chrono::Duration::days(days_offset);
764 let fiscal_year = po_date.year() as u16;
765 let fiscal_period = po_date.month() as u8;
766
767 let chain = p2p_gen.generate_chain(
768 company_code,
769 vendor,
770 &material_refs,
771 po_date,
772 fiscal_year,
773 fiscal_period,
774 "SYSTEM",
775 );
776
777 sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
779 chain.purchase_order,
780 ))))?;
781 count += 1;
782
783 for gr in chain.goods_receipts {
784 sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
785 count += 1;
786 }
787
788 if let Some(vi) = chain.vendor_invoice {
789 sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
790 vi,
791 ))))?;
792 count += 1;
793 }
794
795 if let Some(payment) = chain.payment {
796 sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
797 count += 1;
798 }
799
800 if count.is_multiple_of(progress_interval) {
801 progress.items_generated = count;
802 sender.send(StreamEvent::Progress(progress.clone()))?;
803 }
804 }
805 }
806
807 if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
809 info!("Generating O2C document flows");
810 let mut o2c_gen = O2CGenerator::new(seed + 200);
811
812 let chains_to_generate = base_chains.min(1000);
813
814 for i in 0..chains_to_generate {
815 if control.is_cancelled() {
816 break;
817 }
818
819 while control.is_paused() {
820 std::thread::sleep(std::time::Duration::from_millis(100));
821 if control.is_cancelled() {
822 break;
823 }
824 }
825
826 let customer = &customers[i % customers.len()];
827 let material_refs: Vec<&datasynth_core::models::Material> =
828 vec![&materials[i % materials.len()]];
829
830 let days_offset = (i as i64 % total_period_days).max(0);
831 let so_date = start_date + chrono::Duration::days(days_offset);
832 let fiscal_year = so_date.year() as u16;
833 let fiscal_period = so_date.month() as u8;
834
835 let chain = o2c_gen.generate_chain(
836 company_code,
837 customer,
838 &material_refs,
839 so_date,
840 fiscal_year,
841 fiscal_period,
842 "SYSTEM",
843 );
844
845 sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
846 chain.sales_order,
847 ))))?;
848 count += 1;
849
850 for delivery in chain.deliveries {
851 sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
852 delivery,
853 ))))?;
854 count += 1;
855 }
856
857 if let Some(ci) = chain.customer_invoice {
858 sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
859 ci,
860 ))))?;
861 count += 1;
862 }
863
864 if count.is_multiple_of(progress_interval) {
865 progress.items_generated = count;
866 sender.send(StreamEvent::Progress(progress.clone()))?;
867 }
868 }
869 }
870
871 Ok(count)
872 }
873
874 pub fn stats(&self) -> StreamingOrchestratorStats {
876 StreamingOrchestratorStats {
877 phases: self.config.phases.len(),
878 buffer_size: self.config.stream_config.buffer_size,
879 backpressure: self.config.stream_config.backpressure,
880 }
881 }
882}
883
884#[derive(Debug, Clone)]
886pub struct StreamingOrchestratorStats {
887 pub phases: usize,
889 pub buffer_size: usize,
891 pub backpressure: BackpressureStrategy,
893}
894
895fn resolve_coa_framework_from_config(
897 config: &GeneratorConfig,
898) -> datasynth_generators::coa_generator::CoAFramework {
899 use datasynth_generators::coa_generator::CoAFramework;
900 if config.accounting_standards.enabled {
901 match config.accounting_standards.framework {
902 Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
903 return CoAFramework::FrenchPcg;
904 }
905 Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
906 return CoAFramework::GermanSkr04;
907 }
908 _ => {}
909 }
910 }
911 CoAFramework::UsGaap
912}
913
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    /// Baseline configuration shared by the tests: a small retail preset.
    fn create_test_config() -> GeneratorConfig {
        create_preset(
            IndustrySector::Retail,
            2,
            3,
            CoAComplexity::Small,
            TransactionVolume::TenK,
        )
    }

    /// Stream settings used by the journal-entry-only tests.
    fn je_stream_config() -> StreamConfig {
        StreamConfig {
            buffer_size: 500,
            progress_interval: 50,
            ..Default::default()
        }
    }

    /// Reads events until `Complete`. Returns every data item seen and
    /// whether the completion event arrived before the channel closed.
    fn drain_until_complete(receiver: StreamReceiver<GeneratedItem>) -> (Vec<GeneratedItem>, bool) {
        let mut items = Vec::new();
        for event in receiver {
            match event {
                StreamEvent::Data(item) => items.push(item),
                StreamEvent::Complete(_) => return (items, true),
                _ => {}
            }
        }
        (items, false)
    }

    #[test]
    fn test_streaming_orchestrator_creation() {
        let stats = StreamingOrchestrator::from_generator_config(create_test_config()).stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    #[test]
    fn test_streaming_generation() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 5;
        config.master_data.customers.count = 5;
        config.master_data.employees.count = 5;
        config.global.period_months = 1;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![
                GenerationPhase::ChartOfAccounts,
                GenerationPhase::MasterData,
            ])
            .with_stream_config(StreamConfig {
                buffer_size: 100,
                progress_interval: 10,
                ..Default::default()
            });

        let (receiver, _control) = StreamingOrchestrator::new(streaming_config)
            .stream()
            .unwrap();
        let (items, completed) = drain_until_complete(receiver);

        assert!(!items.is_empty());
        assert!(items
            .iter()
            .any(|item| matches!(item, GeneratedItem::ChartOfAccounts(_))));
        assert!(completed);
    }

    #[test]
    fn test_stream_cancellation() {
        let mut config = create_test_config();
        config.global.period_months = 12;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries]);
        let (receiver, control) = StreamingOrchestrator::new(streaming_config)
            .stream()
            .unwrap();

        let mut data_seen = 0;
        for event in receiver {
            if !matches!(event, StreamEvent::Data(_)) {
                continue;
            }
            data_seen += 1;
            if data_seen >= 10 {
                control.cancel();
                break;
            }
        }

        assert!(control.is_cancelled());
    }

    #[test]
    fn test_streaming_anomaly_injection() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;

        config.anomaly_injection.enabled = true;
        config.anomaly_injection.rates.total_rate = 0.20;
        config.anomaly_injection.rates.fraud_rate = 0.40;
        config.anomaly_injection.rates.error_rate = 0.40;
        config.anomaly_injection.rates.process_rate = 0.20;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(je_stream_config());

        let (receiver, _control) = StreamingOrchestrator::new(streaming_config)
            .stream()
            .unwrap();
        let (items, completed) = drain_until_complete(receiver);

        let je_count = items
            .iter()
            .filter(|item| matches!(item, GeneratedItem::JournalEntry(_)))
            .count();
        let label_count = items
            .iter()
            .filter(|item| matches!(item, GeneratedItem::AnomalyLabel(_)))
            .count();

        assert!(completed, "Stream should complete");
        assert!(je_count > 0, "Should generate journal entries");
        assert!(
            label_count > 0,
            "Should generate anomaly labels (got {} JEs, {} labels)",
            je_count,
            label_count
        );
    }

    #[test]
    fn test_streaming_no_anomalies_when_disabled() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;

        config.anomaly_injection.enabled = false;
        config.fraud.enabled = false;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(je_stream_config());

        let (receiver, _control) = StreamingOrchestrator::new(streaming_config)
            .stream()
            .unwrap();
        let (items, _) = drain_until_complete(receiver);

        let label_count = items
            .iter()
            .filter(|item| matches!(item, GeneratedItem::AnomalyLabel(_)))
            .count();

        assert_eq!(
            label_count, 0,
            "Should not generate anomaly labels when disabled"
        );
    }

    #[test]
    fn test_generated_item_type_name() {
        let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        )));
        assert_eq!(coa.type_name(), "chart_of_accounts");

        let progress = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress.type_name(), "progress");
    }
}