1use std::sync::Arc;
7use std::thread;
8use std::time::Instant;
9
10use chrono::NaiveDate;
11use tracing::{info, warn};
12
13use datasynth_config::schema::GeneratorConfig;
14use datasynth_core::error::SynthResult;
15use datasynth_core::models::{
16 documents::{
17 CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
18 },
19 ChartOfAccounts, Customer, Employee, JournalEntry, Material, Vendor,
20};
21use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
22use datasynth_core::traits::{
23 BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
24};
25
26#[derive(Debug, Clone)]
28pub enum GeneratedItem {
29 ChartOfAccounts(Box<ChartOfAccounts>),
31 Vendor(Box<Vendor>),
33 Customer(Box<Customer>),
35 Material(Box<Material>),
37 Employee(Box<Employee>),
39 JournalEntry(Box<JournalEntry>),
41 PurchaseOrder(Box<PurchaseOrder>),
43 GoodsReceipt(Box<GoodsReceipt>),
45 VendorInvoice(Box<VendorInvoice>),
47 Payment(Box<Payment>),
49 SalesOrder(Box<SalesOrder>),
51 Delivery(Box<Delivery>),
53 CustomerInvoice(Box<CustomerInvoice>),
55 Progress(StreamProgress),
57 PhaseComplete(String),
59}
60
61impl GeneratedItem {
62 pub fn type_name(&self) -> &'static str {
64 match self {
65 GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
66 GeneratedItem::Vendor(_) => "vendor",
67 GeneratedItem::Customer(_) => "customer",
68 GeneratedItem::Material(_) => "material",
69 GeneratedItem::Employee(_) => "employee",
70 GeneratedItem::JournalEntry(_) => "journal_entry",
71 GeneratedItem::PurchaseOrder(_) => "purchase_order",
72 GeneratedItem::GoodsReceipt(_) => "goods_receipt",
73 GeneratedItem::VendorInvoice(_) => "vendor_invoice",
74 GeneratedItem::Payment(_) => "payment",
75 GeneratedItem::SalesOrder(_) => "sales_order",
76 GeneratedItem::Delivery(_) => "delivery",
77 GeneratedItem::CustomerInvoice(_) => "customer_invoice",
78 GeneratedItem::Progress(_) => "progress",
79 GeneratedItem::PhaseComplete(_) => "phase_complete",
80 }
81 }
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub enum GenerationPhase {
87 ChartOfAccounts,
89 MasterData,
91 DocumentFlows,
93 OcpmEvents,
95 JournalEntries,
97 AnomalyInjection,
99 BalanceValidation,
101 DataQuality,
103 Complete,
105}
106
107impl GenerationPhase {
108 pub fn name(&self) -> &'static str {
110 match self {
111 GenerationPhase::ChartOfAccounts => "chart_of_accounts",
112 GenerationPhase::MasterData => "master_data",
113 GenerationPhase::DocumentFlows => "document_flows",
114 GenerationPhase::OcpmEvents => "ocpm_events",
115 GenerationPhase::JournalEntries => "journal_entries",
116 GenerationPhase::AnomalyInjection => "anomaly_injection",
117 GenerationPhase::BalanceValidation => "balance_validation",
118 GenerationPhase::DataQuality => "data_quality",
119 GenerationPhase::Complete => "complete",
120 }
121 }
122}
123
124#[derive(Debug, Clone)]
126pub struct StreamingOrchestratorConfig {
127 pub generator_config: GeneratorConfig,
129 pub stream_config: StreamConfig,
131 pub phases: Vec<GenerationPhase>,
133}
134
135impl StreamingOrchestratorConfig {
136 pub fn new(generator_config: GeneratorConfig) -> Self {
138 Self {
139 generator_config,
140 stream_config: StreamConfig::default(),
141 phases: vec![
142 GenerationPhase::ChartOfAccounts,
143 GenerationPhase::MasterData,
144 GenerationPhase::DocumentFlows,
145 GenerationPhase::JournalEntries,
146 ],
147 }
148 }
149
150 pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
152 Self {
153 generator_config,
154 stream_config: StreamConfig::default(),
155 phases: vec![
156 GenerationPhase::ChartOfAccounts,
157 GenerationPhase::MasterData,
158 GenerationPhase::DocumentFlows,
159 GenerationPhase::OcpmEvents,
160 GenerationPhase::JournalEntries,
161 GenerationPhase::AnomalyInjection,
162 GenerationPhase::DataQuality,
163 ],
164 }
165 }
166
167 pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
169 self.stream_config = config;
170 self
171 }
172
173 pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
175 self.phases = phases;
176 self
177 }
178}
179
180pub struct StreamingOrchestrator {
182 config: StreamingOrchestratorConfig,
183}
184
185impl StreamingOrchestrator {
186 pub fn new(config: StreamingOrchestratorConfig) -> Self {
188 Self { config }
189 }
190
191 pub fn from_generator_config(config: GeneratorConfig) -> Self {
193 Self::new(StreamingOrchestratorConfig::new(config))
194 }
195
196 pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
200 let (sender, receiver) = stream_channel(
201 self.config.stream_config.buffer_size,
202 self.config.stream_config.backpressure,
203 );
204
205 let control = Arc::new(StreamControl::new());
206 let control_clone = Arc::clone(&control);
207
208 let config = self.config.clone();
209
210 thread::spawn(move || {
212 let result = Self::run_generation(config, sender, control_clone);
213 if let Err(e) = result {
214 warn!("Streaming generation error: {}", e);
215 }
216 });
217
218 Ok((receiver, control))
219 }
220
221 fn run_generation(
223 config: StreamingOrchestratorConfig,
224 sender: StreamSender<GeneratedItem>,
225 control: Arc<StreamControl>,
226 ) -> SynthResult<()> {
227 let start_time = Instant::now();
228 let mut items_generated: u64 = 0;
229 let mut phases_completed = Vec::new();
230
231 let progress_interval = config.stream_config.progress_interval;
233
234 let mut progress = StreamProgress::new("initializing");
236 sender.send(StreamEvent::Progress(progress.clone()))?;
237
238 for phase in &config.phases {
239 if control.is_cancelled() {
240 info!("Generation cancelled");
241 break;
242 }
243
244 while control.is_paused() {
246 std::thread::sleep(std::time::Duration::from_millis(100));
247 if control.is_cancelled() {
248 break;
249 }
250 }
251
252 progress.phase = phase.name().to_string();
253 sender.send(StreamEvent::Progress(progress.clone()))?;
254
255 match phase {
256 GenerationPhase::ChartOfAccounts => {
257 let result =
258 Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
259 items_generated += result;
260 }
261 GenerationPhase::MasterData => {
262 let result = Self::generate_master_data_phase(
263 &config.generator_config,
264 &sender,
265 &control,
266 )?;
267 items_generated += result;
268 }
269 GenerationPhase::DocumentFlows => {
270 let result = Self::generate_document_flows_phase(
271 &config.generator_config,
272 &sender,
273 &control,
274 progress_interval,
275 &mut progress,
276 )?;
277 items_generated += result;
278 }
279 GenerationPhase::OcpmEvents => {
280 warn!("OCPM event generation is not yet supported in streaming mode; skipping");
281 }
282 GenerationPhase::JournalEntries => {
283 let result = Self::generate_journal_entries_phase(
284 &config.generator_config,
285 &sender,
286 &control,
287 progress_interval,
288 &mut progress,
289 )?;
290 items_generated += result;
291 }
292 GenerationPhase::AnomalyInjection | GenerationPhase::DataQuality => {
293 warn!(
294 "Phase {:?} requires post-processing of existing data and is skipped in streaming mode",
295 phase
296 );
297 }
298 GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
299 info!("Phase {:?} is not applicable in streaming mode", phase);
300 }
301 }
302
303 sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
305 phase.name().to_string(),
306 )))?;
307 phases_completed.push(phase.name().to_string());
308
309 progress.items_generated = items_generated;
311 progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
312 if progress.elapsed_ms > 0 {
313 progress.items_per_second =
314 (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
315 }
316 sender.send(StreamEvent::Progress(progress.clone()))?;
317 }
318
319 let stats = sender.stats();
321 let summary = StreamSummary {
322 total_items: items_generated,
323 total_time_ms: start_time.elapsed().as_millis() as u64,
324 avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
325 (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
326 } else {
327 0.0
328 },
329 error_count: 0,
330 dropped_count: stats.items_dropped,
331 peak_memory_mb: None,
332 phases_completed,
333 };
334
335 sender.send(StreamEvent::Complete(summary))?;
336 sender.close();
337
338 Ok(())
339 }
340
341 fn generate_coa_phase(
343 config: &GeneratorConfig,
344 sender: &StreamSender<GeneratedItem>,
345 control: &Arc<StreamControl>,
346 ) -> SynthResult<u64> {
347 use datasynth_generators::ChartOfAccountsGenerator;
348
349 if control.is_cancelled() {
350 return Ok(0);
351 }
352
353 info!("Generating Chart of Accounts");
354 let seed = config.global.seed.unwrap_or(42);
355 let complexity = config.chart_of_accounts.complexity;
356 let industry = config.global.industry;
357 let coa_framework = resolve_coa_framework_from_config(config);
358
359 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
360 .with_coa_framework(coa_framework);
361 let coa = coa_gen.generate();
362
363 let account_count = coa.account_count() as u64;
364 sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
365 coa,
366 ))))?;
367
368 Ok(account_count)
369 }
370
371 fn generate_master_data_phase(
373 config: &GeneratorConfig,
374 sender: &StreamSender<GeneratedItem>,
375 control: &Arc<StreamControl>,
376 ) -> SynthResult<u64> {
377 use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
378
379 let mut count: u64 = 0;
380 let seed = config.global.seed.unwrap_or(42);
381 let md_config = &config.master_data;
382 let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
383 .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
384
385 let company_code = config
386 .companies
387 .first()
388 .map(|c| c.code.as_str())
389 .unwrap_or("1000");
390
391 if control.is_cancelled() {
393 return Ok(count);
394 }
395
396 info!("Generating vendors");
397 let mut vendor_gen = VendorGenerator::new(seed);
398 for _ in 0..md_config.vendors.count {
399 if control.is_cancelled() {
400 break;
401 }
402 let vendor = vendor_gen.generate_vendor(company_code, effective_date);
403 sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
404 count += 1;
405 }
406
407 if control.is_cancelled() {
409 return Ok(count);
410 }
411
412 info!("Generating customers");
413 let mut customer_gen = CustomerGenerator::new(seed + 1);
414 for _ in 0..md_config.customers.count {
415 if control.is_cancelled() {
416 break;
417 }
418 let customer = customer_gen.generate_customer(company_code, effective_date);
419 sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
420 customer,
421 ))))?;
422 count += 1;
423 }
424
425 if control.is_cancelled() {
427 return Ok(count);
428 }
429
430 info!("Generating employees");
431 let mut employee_gen = EmployeeGenerator::new(seed + 4);
432 let dept = if let Some(first_custom) = config.departments.custom_departments.first() {
434 datasynth_generators::DepartmentDefinition {
435 code: first_custom.code.clone(),
436 name: first_custom.name.clone(),
437 cost_center: first_custom
438 .cost_center
439 .clone()
440 .unwrap_or_else(|| format!("CC{}", first_custom.code)),
441 headcount: 10,
442 system_roles: vec![],
443 transaction_codes: vec![],
444 }
445 } else {
446 warn!("No departments configured, using default 'General' department");
447 datasynth_generators::DepartmentDefinition {
448 code: "1000".to_string(),
449 name: "General".to_string(),
450 cost_center: "CC1000".to_string(),
451 headcount: 10,
452 system_roles: vec![],
453 transaction_codes: vec![],
454 }
455 };
456 for _ in 0..md_config.employees.count {
457 if control.is_cancelled() {
458 break;
459 }
460 let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
461 sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
462 employee,
463 ))))?;
464 count += 1;
465 }
466
467 Ok(count)
468 }
469
470 fn generate_journal_entries_phase(
475 config: &GeneratorConfig,
476 sender: &StreamSender<GeneratedItem>,
477 control: &Arc<StreamControl>,
478 progress_interval: u64,
479 progress: &mut StreamProgress,
480 ) -> SynthResult<u64> {
481 use datasynth_generators::{ChartOfAccountsGenerator, JournalEntryGenerator};
482 use std::sync::Arc;
483
484 let mut count: u64 = 0;
485 let seed = config.global.seed.unwrap_or(42);
486
487 let default_monthly = 500;
489 let total_entries: usize = config
490 .companies
491 .iter()
492 .map(|c| {
493 let monthly = (c.volume_weight * default_monthly as f64) as usize;
494 monthly.max(100) * config.global.period_months as usize
495 })
496 .sum();
497
498 progress.items_remaining = Some(total_entries as u64);
499 info!("Generating {} journal entries", total_entries);
500
501 let complexity = config.chart_of_accounts.complexity;
503 let industry = config.global.industry;
504 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
505 let coa = Arc::new(coa_gen.generate());
506
507 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
509 .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
510 let end_date = start_date
511 .checked_add_months(chrono::Months::new(config.global.period_months))
512 .unwrap_or(start_date + chrono::Duration::days(365));
513
514 let mut je_gen = JournalEntryGenerator::from_generator_config(
516 config,
517 Arc::clone(&coa),
518 start_date,
519 end_date,
520 seed,
521 );
522
523 for _ in 0..total_entries {
524 if control.is_cancelled() {
525 break;
526 }
527
528 while control.is_paused() {
530 std::thread::sleep(std::time::Duration::from_millis(100));
531 if control.is_cancelled() {
532 break;
533 }
534 }
535
536 let je = je_gen.generate();
537 sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
538 count += 1;
539
540 if count.is_multiple_of(progress_interval) {
542 progress.items_generated = count;
543 progress.items_remaining = Some(total_entries as u64 - count);
544 sender.send(StreamEvent::Progress(progress.clone()))?;
545 }
546 }
547
548 Ok(count)
549 }
550
551 fn generate_document_flows_phase(
557 config: &GeneratorConfig,
558 sender: &StreamSender<GeneratedItem>,
559 control: &Arc<StreamControl>,
560 progress_interval: u64,
561 progress: &mut StreamProgress,
562 ) -> SynthResult<u64> {
563 use chrono::Datelike;
564 use datasynth_generators::{
565 CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
566 };
567
568 let mut count: u64 = 0;
569 let seed = config.global.seed.unwrap_or(42);
570 let df_config = &config.document_flows;
571 let md_config = &config.master_data;
572
573 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
575 .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
576 let end_date = start_date
577 .checked_add_months(chrono::Months::new(config.global.period_months))
578 .unwrap_or(start_date + chrono::Duration::days(365));
579 let total_period_days = (end_date - start_date).num_days().max(1);
580
581 let company_code = config
582 .companies
583 .first()
584 .map(|c| c.code.as_str())
585 .unwrap_or("1000");
586
587 let vendor_count = md_config.vendors.count.min(100);
589 let customer_count = md_config.customers.count.min(100);
590 let material_count = md_config.materials.count.min(50);
591
592 let mut vendor_gen = VendorGenerator::new(seed);
594 let mut customer_gen = CustomerGenerator::new(seed + 1);
595 let mut material_gen = MaterialGenerator::new(seed + 2);
596
597 let vendors: Vec<_> = (0..vendor_count)
598 .map(|_| vendor_gen.generate_vendor(company_code, start_date))
599 .collect();
600
601 let customers: Vec<_> = (0..customer_count)
602 .map(|_| customer_gen.generate_customer(company_code, start_date))
603 .collect();
604
605 let materials: Vec<_> = (0..material_count)
606 .map(|_| material_gen.generate_material(company_code, start_date))
607 .collect();
608
609 let base_chains = (config.global.period_months as usize * 50).max(100);
612
613 if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
615 info!("Generating P2P document flows");
616 let mut p2p_gen = P2PGenerator::new(seed + 100);
617
618 let chains_to_generate = base_chains.min(1000);
619 progress.items_remaining = Some(chains_to_generate as u64);
620
621 for i in 0..chains_to_generate {
622 if control.is_cancelled() {
623 break;
624 }
625
626 while control.is_paused() {
628 std::thread::sleep(std::time::Duration::from_millis(100));
629 if control.is_cancelled() {
630 break;
631 }
632 }
633
634 let vendor = &vendors[i % vendors.len()];
635 let material_refs: Vec<&datasynth_core::models::Material> =
636 vec![&materials[i % materials.len()]];
637
638 let days_offset = (i as i64 % total_period_days).max(0);
640 let po_date = start_date + chrono::Duration::days(days_offset);
641 let fiscal_year = po_date.year() as u16;
642 let fiscal_period = po_date.month() as u8;
643
644 let chain = p2p_gen.generate_chain(
645 company_code,
646 vendor,
647 &material_refs,
648 po_date,
649 fiscal_year,
650 fiscal_period,
651 "SYSTEM",
652 );
653
654 sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
656 chain.purchase_order,
657 ))))?;
658 count += 1;
659
660 for gr in chain.goods_receipts {
661 sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
662 count += 1;
663 }
664
665 if let Some(vi) = chain.vendor_invoice {
666 sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
667 vi,
668 ))))?;
669 count += 1;
670 }
671
672 if let Some(payment) = chain.payment {
673 sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
674 count += 1;
675 }
676
677 if count.is_multiple_of(progress_interval) {
678 progress.items_generated = count;
679 sender.send(StreamEvent::Progress(progress.clone()))?;
680 }
681 }
682 }
683
684 if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
686 info!("Generating O2C document flows");
687 let mut o2c_gen = O2CGenerator::new(seed + 200);
688
689 let chains_to_generate = base_chains.min(1000);
690
691 for i in 0..chains_to_generate {
692 if control.is_cancelled() {
693 break;
694 }
695
696 while control.is_paused() {
697 std::thread::sleep(std::time::Duration::from_millis(100));
698 if control.is_cancelled() {
699 break;
700 }
701 }
702
703 let customer = &customers[i % customers.len()];
704 let material_refs: Vec<&datasynth_core::models::Material> =
705 vec![&materials[i % materials.len()]];
706
707 let days_offset = (i as i64 % total_period_days).max(0);
708 let so_date = start_date + chrono::Duration::days(days_offset);
709 let fiscal_year = so_date.year() as u16;
710 let fiscal_period = so_date.month() as u8;
711
712 let chain = o2c_gen.generate_chain(
713 company_code,
714 customer,
715 &material_refs,
716 so_date,
717 fiscal_year,
718 fiscal_period,
719 "SYSTEM",
720 );
721
722 sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
723 chain.sales_order,
724 ))))?;
725 count += 1;
726
727 for delivery in chain.deliveries {
728 sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
729 delivery,
730 ))))?;
731 count += 1;
732 }
733
734 if let Some(ci) = chain.customer_invoice {
735 sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
736 ci,
737 ))))?;
738 count += 1;
739 }
740
741 if count.is_multiple_of(progress_interval) {
742 progress.items_generated = count;
743 sender.send(StreamEvent::Progress(progress.clone()))?;
744 }
745 }
746 }
747
748 Ok(count)
749 }
750
751 pub fn stats(&self) -> StreamingOrchestratorStats {
753 StreamingOrchestratorStats {
754 phases: self.config.phases.len(),
755 buffer_size: self.config.stream_config.buffer_size,
756 backpressure: self.config.stream_config.backpressure,
757 }
758 }
759}
760
761#[derive(Debug, Clone)]
763pub struct StreamingOrchestratorStats {
764 pub phases: usize,
766 pub buffer_size: usize,
768 pub backpressure: BackpressureStrategy,
770}
771
772fn resolve_coa_framework_from_config(
774 config: &GeneratorConfig,
775) -> datasynth_generators::coa_generator::CoAFramework {
776 use datasynth_generators::coa_generator::CoAFramework;
777 if config.accounting_standards.enabled {
778 match config.accounting_standards.framework {
779 Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
780 return CoAFramework::FrenchPcg;
781 }
782 Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
783 return CoAFramework::GermanSkr04;
784 }
785 _ => {}
786 }
787 }
788 CoAFramework::UsGaap
789}
790
791#[cfg(test)]
792#[allow(clippy::unwrap_used)]
793mod tests {
794 use super::*;
795 use datasynth_config::presets::create_preset;
796 use datasynth_config::schema::TransactionVolume;
797 use datasynth_core::models::{CoAComplexity, IndustrySector};
798
799 fn create_test_config() -> GeneratorConfig {
800 create_preset(
801 IndustrySector::Retail,
802 2,
803 3,
804 CoAComplexity::Small,
805 TransactionVolume::TenK,
806 )
807 }
808
809 #[test]
810 fn test_streaming_orchestrator_creation() {
811 let config = create_test_config();
812 let orchestrator = StreamingOrchestrator::from_generator_config(config);
813 let stats = orchestrator.stats();
814
815 assert!(stats.phases > 0);
816 assert!(stats.buffer_size > 0);
817 }
818
819 #[test]
820 fn test_streaming_generation() {
821 let mut config = create_test_config();
822 config.master_data.vendors.count = 5;
824 config.master_data.customers.count = 5;
825 config.master_data.employees.count = 5;
826 config.global.period_months = 1;
827
828 let streaming_config = StreamingOrchestratorConfig::new(config)
829 .with_phases(vec![
830 GenerationPhase::ChartOfAccounts,
831 GenerationPhase::MasterData,
832 ])
833 .with_stream_config(StreamConfig {
834 buffer_size: 100,
835 progress_interval: 10,
836 ..Default::default()
837 });
838
839 let orchestrator = StreamingOrchestrator::new(streaming_config);
840 let (receiver, _control) = orchestrator.stream().unwrap();
841
842 let mut items_count = 0;
843 let mut has_coa = false;
844 let mut has_completion = false;
845
846 for event in receiver {
847 match event {
848 StreamEvent::Data(item) => {
849 items_count += 1;
850 if matches!(item, GeneratedItem::ChartOfAccounts(_)) {
851 has_coa = true;
852 }
853 }
854 StreamEvent::Complete(_) => {
855 has_completion = true;
856 break;
857 }
858 _ => {}
859 }
860 }
861
862 assert!(items_count > 0);
863 assert!(has_coa);
864 assert!(has_completion);
865 }
866
867 #[test]
868 fn test_stream_cancellation() {
869 let mut config = create_test_config();
870 config.global.period_months = 12; let streaming_config = StreamingOrchestratorConfig::new(config)
873 .with_phases(vec![GenerationPhase::JournalEntries]);
874
875 let orchestrator = StreamingOrchestrator::new(streaming_config);
876 let (receiver, control) = orchestrator.stream().unwrap();
877
878 let mut items_count = 0;
880 for event in receiver {
881 if let StreamEvent::Data(_) = event {
882 items_count += 1;
883 if items_count >= 10 {
884 control.cancel();
885 break;
886 }
887 }
888 }
889
890 assert!(control.is_cancelled());
891 }
892
893 #[test]
894 fn test_generated_item_type_name() {
895 use datasynth_core::models::{CoAComplexity, IndustrySector};
896
897 let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
898 "TEST_COA".to_string(),
899 "Test Chart of Accounts".to_string(),
900 "US".to_string(),
901 IndustrySector::Manufacturing,
902 CoAComplexity::Small,
903 )));
904 assert_eq!(coa.type_name(), "chart_of_accounts");
905
906 let progress = GeneratedItem::Progress(StreamProgress::new("test"));
907 assert_eq!(progress.type_name(), "progress");
908 }
909}