1use std::sync::Arc;
7use std::thread;
8use std::time::Instant;
9
10use chrono::NaiveDate;
11use tracing::{info, warn};
12
13use datasynth_config::schema::GeneratorConfig;
14
15const DEFAULT_SEED: u64 = 42;
17use datasynth_core::error::SynthResult;
18use datasynth_core::models::{
19 documents::{
20 CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
21 },
22 ChartOfAccounts, Customer, Employee, JournalEntry, Material, Vendor,
23};
24use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
25use datasynth_core::traits::{
26 BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
27};
28
29#[derive(Debug, Clone)]
31pub enum GeneratedItem {
32 ChartOfAccounts(Box<ChartOfAccounts>),
34 Vendor(Box<Vendor>),
36 Customer(Box<Customer>),
38 Material(Box<Material>),
40 Employee(Box<Employee>),
42 JournalEntry(Box<JournalEntry>),
44 PurchaseOrder(Box<PurchaseOrder>),
46 GoodsReceipt(Box<GoodsReceipt>),
48 VendorInvoice(Box<VendorInvoice>),
50 Payment(Box<Payment>),
52 SalesOrder(Box<SalesOrder>),
54 Delivery(Box<Delivery>),
56 CustomerInvoice(Box<CustomerInvoice>),
58 Progress(StreamProgress),
60 PhaseComplete(String),
62}
63
64impl GeneratedItem {
65 pub fn type_name(&self) -> &'static str {
67 match self {
68 GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
69 GeneratedItem::Vendor(_) => "vendor",
70 GeneratedItem::Customer(_) => "customer",
71 GeneratedItem::Material(_) => "material",
72 GeneratedItem::Employee(_) => "employee",
73 GeneratedItem::JournalEntry(_) => "journal_entry",
74 GeneratedItem::PurchaseOrder(_) => "purchase_order",
75 GeneratedItem::GoodsReceipt(_) => "goods_receipt",
76 GeneratedItem::VendorInvoice(_) => "vendor_invoice",
77 GeneratedItem::Payment(_) => "payment",
78 GeneratedItem::SalesOrder(_) => "sales_order",
79 GeneratedItem::Delivery(_) => "delivery",
80 GeneratedItem::CustomerInvoice(_) => "customer_invoice",
81 GeneratedItem::Progress(_) => "progress",
82 GeneratedItem::PhaseComplete(_) => "phase_complete",
83 }
84 }
85}
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq)]
89pub enum GenerationPhase {
90 ChartOfAccounts,
92 MasterData,
94 DocumentFlows,
96 OcpmEvents,
98 JournalEntries,
100 AnomalyInjection,
102 BalanceValidation,
104 DataQuality,
106 Complete,
108}
109
110impl GenerationPhase {
111 pub fn name(&self) -> &'static str {
113 match self {
114 GenerationPhase::ChartOfAccounts => "chart_of_accounts",
115 GenerationPhase::MasterData => "master_data",
116 GenerationPhase::DocumentFlows => "document_flows",
117 GenerationPhase::OcpmEvents => "ocpm_events",
118 GenerationPhase::JournalEntries => "journal_entries",
119 GenerationPhase::AnomalyInjection => "anomaly_injection",
120 GenerationPhase::BalanceValidation => "balance_validation",
121 GenerationPhase::DataQuality => "data_quality",
122 GenerationPhase::Complete => "complete",
123 }
124 }
125}
126
127#[derive(Debug, Clone)]
129pub struct StreamingOrchestratorConfig {
130 pub generator_config: GeneratorConfig,
132 pub stream_config: StreamConfig,
134 pub phases: Vec<GenerationPhase>,
136}
137
138impl StreamingOrchestratorConfig {
139 pub fn new(generator_config: GeneratorConfig) -> Self {
141 Self {
142 generator_config,
143 stream_config: StreamConfig::default(),
144 phases: vec![
145 GenerationPhase::ChartOfAccounts,
146 GenerationPhase::MasterData,
147 GenerationPhase::DocumentFlows,
148 GenerationPhase::JournalEntries,
149 ],
150 }
151 }
152
153 pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
155 Self {
156 generator_config,
157 stream_config: StreamConfig::default(),
158 phases: vec![
159 GenerationPhase::ChartOfAccounts,
160 GenerationPhase::MasterData,
161 GenerationPhase::DocumentFlows,
162 GenerationPhase::OcpmEvents,
163 GenerationPhase::JournalEntries,
164 GenerationPhase::AnomalyInjection,
165 GenerationPhase::DataQuality,
166 ],
167 }
168 }
169
170 pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
172 self.stream_config = config;
173 self
174 }
175
176 pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
178 self.phases = phases;
179 self
180 }
181}
182
183pub struct StreamingOrchestrator {
185 config: StreamingOrchestratorConfig,
186}
187
188impl StreamingOrchestrator {
189 pub fn new(config: StreamingOrchestratorConfig) -> Self {
191 Self { config }
192 }
193
194 pub fn from_generator_config(config: GeneratorConfig) -> Self {
196 Self::new(StreamingOrchestratorConfig::new(config))
197 }
198
199 pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
203 let (sender, receiver) = stream_channel(
204 self.config.stream_config.buffer_size,
205 self.config.stream_config.backpressure,
206 );
207
208 let control = Arc::new(StreamControl::new());
209 let control_clone = Arc::clone(&control);
210
211 let config = self.config.clone();
212
213 thread::spawn(move || {
215 let result = Self::run_generation(config, sender, control_clone);
216 if let Err(e) = result {
217 warn!("Streaming generation error: {}", e);
218 }
219 });
220
221 Ok((receiver, control))
222 }
223
224 fn run_generation(
226 config: StreamingOrchestratorConfig,
227 sender: StreamSender<GeneratedItem>,
228 control: Arc<StreamControl>,
229 ) -> SynthResult<()> {
230 let start_time = Instant::now();
231 let mut items_generated: u64 = 0;
232 let mut phases_completed = Vec::new();
233
234 let progress_interval = config.stream_config.progress_interval;
236
237 let mut progress = StreamProgress::new("initializing");
239 sender.send(StreamEvent::Progress(progress.clone()))?;
240
241 for phase in &config.phases {
242 if control.is_cancelled() {
243 info!("Generation cancelled");
244 break;
245 }
246
247 while control.is_paused() {
249 std::thread::sleep(std::time::Duration::from_millis(100));
250 if control.is_cancelled() {
251 break;
252 }
253 }
254
255 progress.phase = phase.name().to_string();
256 sender.send(StreamEvent::Progress(progress.clone()))?;
257
258 match phase {
259 GenerationPhase::ChartOfAccounts => {
260 let result =
261 Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
262 items_generated += result;
263 }
264 GenerationPhase::MasterData => {
265 let result = Self::generate_master_data_phase(
266 &config.generator_config,
267 &sender,
268 &control,
269 )?;
270 items_generated += result;
271 }
272 GenerationPhase::DocumentFlows => {
273 let result = Self::generate_document_flows_phase(
274 &config.generator_config,
275 &sender,
276 &control,
277 progress_interval,
278 &mut progress,
279 )?;
280 items_generated += result;
281 }
282 GenerationPhase::OcpmEvents => {
283 warn!("OCPM event generation is not yet supported in streaming mode; skipping");
284 }
285 GenerationPhase::JournalEntries => {
286 let result = Self::generate_journal_entries_phase(
287 &config.generator_config,
288 &sender,
289 &control,
290 progress_interval,
291 &mut progress,
292 )?;
293 items_generated += result;
294 }
295 GenerationPhase::AnomalyInjection | GenerationPhase::DataQuality => {
296 warn!(
297 "Phase {:?} requires post-processing of existing data and is skipped in streaming mode",
298 phase
299 );
300 }
301 GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
302 info!("Phase {:?} is not applicable in streaming mode", phase);
303 }
304 }
305
306 sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
308 phase.name().to_string(),
309 )))?;
310 phases_completed.push(phase.name().to_string());
311
312 progress.items_generated = items_generated;
314 progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
315 if progress.elapsed_ms > 0 {
316 progress.items_per_second =
317 (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
318 }
319 sender.send(StreamEvent::Progress(progress.clone()))?;
320 }
321
322 let stats = sender.stats();
324 let summary = StreamSummary {
325 total_items: items_generated,
326 total_time_ms: start_time.elapsed().as_millis() as u64,
327 avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
328 (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
329 } else {
330 0.0
331 },
332 error_count: 0,
333 dropped_count: stats.items_dropped,
334 peak_memory_mb: None,
335 phases_completed,
336 };
337
338 sender.send(StreamEvent::Complete(summary))?;
339 sender.close();
340
341 Ok(())
342 }
343
344 fn generate_coa_phase(
346 config: &GeneratorConfig,
347 sender: &StreamSender<GeneratedItem>,
348 control: &Arc<StreamControl>,
349 ) -> SynthResult<u64> {
350 use datasynth_generators::ChartOfAccountsGenerator;
351
352 if control.is_cancelled() {
353 return Ok(0);
354 }
355
356 info!("Generating Chart of Accounts");
357 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
358 let complexity = config.chart_of_accounts.complexity;
359 let industry = config.global.industry;
360 let coa_framework = resolve_coa_framework_from_config(config);
361
362 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
363 .with_coa_framework(coa_framework);
364 let coa = coa_gen.generate();
365
366 let account_count = coa.account_count() as u64;
367 sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
368 coa,
369 ))))?;
370
371 Ok(account_count)
372 }
373
374 fn generate_master_data_phase(
376 config: &GeneratorConfig,
377 sender: &StreamSender<GeneratedItem>,
378 control: &Arc<StreamControl>,
379 ) -> SynthResult<u64> {
380 use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
381
382 let mut count: u64 = 0;
383 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
384 let md_config = &config.master_data;
385 let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
386 .unwrap_or_else(|e| {
387 tracing::warn!(
388 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
389 config.global.start_date,
390 e
391 );
392 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
393 });
394
395 let company_code = config
396 .companies
397 .first()
398 .map(|c| c.code.as_str())
399 .unwrap_or_else(|| {
400 tracing::warn!("No companies configured, defaulting to company code '1000'");
401 "1000"
402 });
403
404 if control.is_cancelled() {
406 return Ok(count);
407 }
408
409 info!("Generating vendors");
410 let mut vendor_gen = VendorGenerator::new(seed);
411 for _ in 0..md_config.vendors.count {
412 if control.is_cancelled() {
413 break;
414 }
415 let vendor = vendor_gen.generate_vendor(company_code, effective_date);
416 sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
417 count += 1;
418 }
419
420 if control.is_cancelled() {
422 return Ok(count);
423 }
424
425 info!("Generating customers");
426 let mut customer_gen = CustomerGenerator::new(seed + 1);
427 for _ in 0..md_config.customers.count {
428 if control.is_cancelled() {
429 break;
430 }
431 let customer = customer_gen.generate_customer(company_code, effective_date);
432 sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
433 customer,
434 ))))?;
435 count += 1;
436 }
437
438 if control.is_cancelled() {
440 return Ok(count);
441 }
442
443 info!("Generating employees");
444 let mut employee_gen = EmployeeGenerator::new(seed + 4);
445 let dept = if let Some(first_custom) = config.departments.custom_departments.first() {
447 datasynth_generators::DepartmentDefinition {
448 code: first_custom.code.clone(),
449 name: first_custom.name.clone(),
450 cost_center: first_custom
451 .cost_center
452 .clone()
453 .unwrap_or_else(|| format!("CC{}", first_custom.code)),
454 headcount: 10,
455 system_roles: vec![],
456 transaction_codes: vec![],
457 }
458 } else {
459 warn!("No departments configured, using default 'General' department");
460 datasynth_generators::DepartmentDefinition {
461 code: "1000".to_string(),
462 name: "General".to_string(),
463 cost_center: "CC1000".to_string(),
464 headcount: 10,
465 system_roles: vec![],
466 transaction_codes: vec![],
467 }
468 };
469 for _ in 0..md_config.employees.count {
470 if control.is_cancelled() {
471 break;
472 }
473 let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
474 sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
475 employee,
476 ))))?;
477 count += 1;
478 }
479
480 Ok(count)
481 }
482
483 fn generate_journal_entries_phase(
488 config: &GeneratorConfig,
489 sender: &StreamSender<GeneratedItem>,
490 control: &Arc<StreamControl>,
491 progress_interval: u64,
492 progress: &mut StreamProgress,
493 ) -> SynthResult<u64> {
494 use datasynth_generators::{ChartOfAccountsGenerator, JournalEntryGenerator};
495 use std::sync::Arc;
496
497 let mut count: u64 = 0;
498 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
499
500 let default_monthly = 500;
502 let total_entries: usize = config
503 .companies
504 .iter()
505 .map(|c| {
506 let monthly = (c.volume_weight * default_monthly as f64) as usize;
507 monthly.max(100) * config.global.period_months as usize
508 })
509 .sum();
510
511 progress.items_remaining = Some(total_entries as u64);
512 info!("Generating {} journal entries", total_entries);
513
514 let complexity = config.chart_of_accounts.complexity;
516 let industry = config.global.industry;
517 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
518 let coa = Arc::new(coa_gen.generate());
519
520 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
522 .unwrap_or_else(|e| {
523 tracing::warn!(
524 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
525 config.global.start_date,
526 e
527 );
528 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
529 });
530 let end_date = start_date
531 .checked_add_months(chrono::Months::new(config.global.period_months))
532 .unwrap_or(start_date + chrono::Duration::days(365));
533
534 let mut je_gen = JournalEntryGenerator::from_generator_config(
536 config,
537 Arc::clone(&coa),
538 start_date,
539 end_date,
540 seed,
541 );
542
543 for _ in 0..total_entries {
544 if control.is_cancelled() {
545 break;
546 }
547
548 while control.is_paused() {
550 std::thread::sleep(std::time::Duration::from_millis(100));
551 if control.is_cancelled() {
552 break;
553 }
554 }
555
556 let je = je_gen.generate();
557 sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
558 count += 1;
559
560 if count.is_multiple_of(progress_interval) {
562 progress.items_generated = count;
563 progress.items_remaining = Some(total_entries as u64 - count);
564 sender.send(StreamEvent::Progress(progress.clone()))?;
565 }
566 }
567
568 Ok(count)
569 }
570
571 fn generate_document_flows_phase(
577 config: &GeneratorConfig,
578 sender: &StreamSender<GeneratedItem>,
579 control: &Arc<StreamControl>,
580 progress_interval: u64,
581 progress: &mut StreamProgress,
582 ) -> SynthResult<u64> {
583 use chrono::Datelike;
584 use datasynth_generators::{
585 CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
586 };
587
588 let mut count: u64 = 0;
589 let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
590 let df_config = &config.document_flows;
591 let md_config = &config.master_data;
592
593 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
595 .unwrap_or_else(|e| {
596 tracing::warn!(
597 "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
598 config.global.start_date,
599 e
600 );
601 NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
602 });
603 let end_date = start_date
604 .checked_add_months(chrono::Months::new(config.global.period_months))
605 .unwrap_or(start_date + chrono::Duration::days(365));
606 let total_period_days = (end_date - start_date).num_days().max(1);
607
608 let company_code = config
609 .companies
610 .first()
611 .map(|c| c.code.as_str())
612 .unwrap_or_else(|| {
613 tracing::warn!("No companies configured, defaulting to company code '1000'");
614 "1000"
615 });
616
617 let vendor_count = md_config.vendors.count.min(100);
619 let customer_count = md_config.customers.count.min(100);
620 let material_count = md_config.materials.count.min(50);
621
622 let mut vendor_gen = VendorGenerator::new(seed);
624 let mut customer_gen = CustomerGenerator::new(seed + 1);
625 let mut material_gen = MaterialGenerator::new(seed + 2);
626
627 let vendors: Vec<_> = (0..vendor_count)
628 .map(|_| vendor_gen.generate_vendor(company_code, start_date))
629 .collect();
630
631 let customers: Vec<_> = (0..customer_count)
632 .map(|_| customer_gen.generate_customer(company_code, start_date))
633 .collect();
634
635 let materials: Vec<_> = (0..material_count)
636 .map(|_| material_gen.generate_material(company_code, start_date))
637 .collect();
638
639 let base_chains = (config.global.period_months as usize * 50).max(100);
642
643 if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
645 info!("Generating P2P document flows");
646 let mut p2p_gen = P2PGenerator::new(seed + 100);
647
648 let chains_to_generate = base_chains.min(1000);
649 progress.items_remaining = Some(chains_to_generate as u64);
650
651 for i in 0..chains_to_generate {
652 if control.is_cancelled() {
653 break;
654 }
655
656 while control.is_paused() {
658 std::thread::sleep(std::time::Duration::from_millis(100));
659 if control.is_cancelled() {
660 break;
661 }
662 }
663
664 let vendor = &vendors[i % vendors.len()];
665 let material_refs: Vec<&datasynth_core::models::Material> =
666 vec![&materials[i % materials.len()]];
667
668 let days_offset = (i as i64 % total_period_days).max(0);
670 let po_date = start_date + chrono::Duration::days(days_offset);
671 let fiscal_year = po_date.year() as u16;
672 let fiscal_period = po_date.month() as u8;
673
674 let chain = p2p_gen.generate_chain(
675 company_code,
676 vendor,
677 &material_refs,
678 po_date,
679 fiscal_year,
680 fiscal_period,
681 "SYSTEM",
682 );
683
684 sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
686 chain.purchase_order,
687 ))))?;
688 count += 1;
689
690 for gr in chain.goods_receipts {
691 sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
692 count += 1;
693 }
694
695 if let Some(vi) = chain.vendor_invoice {
696 sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
697 vi,
698 ))))?;
699 count += 1;
700 }
701
702 if let Some(payment) = chain.payment {
703 sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
704 count += 1;
705 }
706
707 if count.is_multiple_of(progress_interval) {
708 progress.items_generated = count;
709 sender.send(StreamEvent::Progress(progress.clone()))?;
710 }
711 }
712 }
713
714 if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
716 info!("Generating O2C document flows");
717 let mut o2c_gen = O2CGenerator::new(seed + 200);
718
719 let chains_to_generate = base_chains.min(1000);
720
721 for i in 0..chains_to_generate {
722 if control.is_cancelled() {
723 break;
724 }
725
726 while control.is_paused() {
727 std::thread::sleep(std::time::Duration::from_millis(100));
728 if control.is_cancelled() {
729 break;
730 }
731 }
732
733 let customer = &customers[i % customers.len()];
734 let material_refs: Vec<&datasynth_core::models::Material> =
735 vec![&materials[i % materials.len()]];
736
737 let days_offset = (i as i64 % total_period_days).max(0);
738 let so_date = start_date + chrono::Duration::days(days_offset);
739 let fiscal_year = so_date.year() as u16;
740 let fiscal_period = so_date.month() as u8;
741
742 let chain = o2c_gen.generate_chain(
743 company_code,
744 customer,
745 &material_refs,
746 so_date,
747 fiscal_year,
748 fiscal_period,
749 "SYSTEM",
750 );
751
752 sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
753 chain.sales_order,
754 ))))?;
755 count += 1;
756
757 for delivery in chain.deliveries {
758 sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
759 delivery,
760 ))))?;
761 count += 1;
762 }
763
764 if let Some(ci) = chain.customer_invoice {
765 sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
766 ci,
767 ))))?;
768 count += 1;
769 }
770
771 if count.is_multiple_of(progress_interval) {
772 progress.items_generated = count;
773 sender.send(StreamEvent::Progress(progress.clone()))?;
774 }
775 }
776 }
777
778 Ok(count)
779 }
780
781 pub fn stats(&self) -> StreamingOrchestratorStats {
783 StreamingOrchestratorStats {
784 phases: self.config.phases.len(),
785 buffer_size: self.config.stream_config.buffer_size,
786 backpressure: self.config.stream_config.backpressure,
787 }
788 }
789}
790
791#[derive(Debug, Clone)]
793pub struct StreamingOrchestratorStats {
794 pub phases: usize,
796 pub buffer_size: usize,
798 pub backpressure: BackpressureStrategy,
800}
801
802fn resolve_coa_framework_from_config(
804 config: &GeneratorConfig,
805) -> datasynth_generators::coa_generator::CoAFramework {
806 use datasynth_generators::coa_generator::CoAFramework;
807 if config.accounting_standards.enabled {
808 match config.accounting_standards.framework {
809 Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
810 return CoAFramework::FrenchPcg;
811 }
812 Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
813 return CoAFramework::GermanSkr04;
814 }
815 _ => {}
816 }
817 }
818 CoAFramework::UsGaap
819}
820
821#[cfg(test)]
822#[allow(clippy::unwrap_used)]
823mod tests {
824 use super::*;
825 use datasynth_config::presets::create_preset;
826 use datasynth_config::schema::TransactionVolume;
827 use datasynth_core::models::{CoAComplexity, IndustrySector};
828
829 fn create_test_config() -> GeneratorConfig {
830 create_preset(
831 IndustrySector::Retail,
832 2,
833 3,
834 CoAComplexity::Small,
835 TransactionVolume::TenK,
836 )
837 }
838
839 #[test]
840 fn test_streaming_orchestrator_creation() {
841 let config = create_test_config();
842 let orchestrator = StreamingOrchestrator::from_generator_config(config);
843 let stats = orchestrator.stats();
844
845 assert!(stats.phases > 0);
846 assert!(stats.buffer_size > 0);
847 }
848
849 #[test]
850 fn test_streaming_generation() {
851 let mut config = create_test_config();
852 config.master_data.vendors.count = 5;
854 config.master_data.customers.count = 5;
855 config.master_data.employees.count = 5;
856 config.global.period_months = 1;
857
858 let streaming_config = StreamingOrchestratorConfig::new(config)
859 .with_phases(vec![
860 GenerationPhase::ChartOfAccounts,
861 GenerationPhase::MasterData,
862 ])
863 .with_stream_config(StreamConfig {
864 buffer_size: 100,
865 progress_interval: 10,
866 ..Default::default()
867 });
868
869 let orchestrator = StreamingOrchestrator::new(streaming_config);
870 let (receiver, _control) = orchestrator.stream().unwrap();
871
872 let mut items_count = 0;
873 let mut has_coa = false;
874 let mut has_completion = false;
875
876 for event in receiver {
877 match event {
878 StreamEvent::Data(item) => {
879 items_count += 1;
880 if matches!(item, GeneratedItem::ChartOfAccounts(_)) {
881 has_coa = true;
882 }
883 }
884 StreamEvent::Complete(_) => {
885 has_completion = true;
886 break;
887 }
888 _ => {}
889 }
890 }
891
892 assert!(items_count > 0);
893 assert!(has_coa);
894 assert!(has_completion);
895 }
896
897 #[test]
898 fn test_stream_cancellation() {
899 let mut config = create_test_config();
900 config.global.period_months = 12; let streaming_config = StreamingOrchestratorConfig::new(config)
903 .with_phases(vec![GenerationPhase::JournalEntries]);
904
905 let orchestrator = StreamingOrchestrator::new(streaming_config);
906 let (receiver, control) = orchestrator.stream().unwrap();
907
908 let mut items_count = 0;
910 for event in receiver {
911 if let StreamEvent::Data(_) = event {
912 items_count += 1;
913 if items_count >= 10 {
914 control.cancel();
915 break;
916 }
917 }
918 }
919
920 assert!(control.is_cancelled());
921 }
922
923 #[test]
924 fn test_generated_item_type_name() {
925 use datasynth_core::models::{CoAComplexity, IndustrySector};
926
927 let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
928 "TEST_COA".to_string(),
929 "Test Chart of Accounts".to_string(),
930 "US".to_string(),
931 IndustrySector::Manufacturing,
932 CoAComplexity::Small,
933 )));
934 assert_eq!(coa.type_name(), "chart_of_accounts");
935
936 let progress = GeneratedItem::Progress(StreamProgress::new("test"));
937 assert_eq!(progress.type_name(), "progress");
938 }
939}