1use std::sync::Arc;
7use std::thread;
8use std::time::Instant;
9
10use chrono::NaiveDate;
11use tracing::{debug, info, warn};
12
13use datasynth_config::schema::GeneratorConfig;
14use datasynth_core::error::SynthResult;
15use datasynth_core::models::{
16 documents::{
17 CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
18 },
19 ChartOfAccounts, Customer, Employee, JournalEntry, Material, Vendor,
20};
21use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
22use datasynth_core::traits::{
23 BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
24};
25
26#[derive(Debug, Clone)]
28pub enum GeneratedItem {
29 ChartOfAccounts(Box<ChartOfAccounts>),
31 Vendor(Box<Vendor>),
33 Customer(Box<Customer>),
35 Material(Box<Material>),
37 Employee(Box<Employee>),
39 JournalEntry(Box<JournalEntry>),
41 PurchaseOrder(Box<PurchaseOrder>),
43 GoodsReceipt(Box<GoodsReceipt>),
45 VendorInvoice(Box<VendorInvoice>),
47 Payment(Box<Payment>),
49 SalesOrder(Box<SalesOrder>),
51 Delivery(Box<Delivery>),
53 CustomerInvoice(Box<CustomerInvoice>),
55 Progress(StreamProgress),
57 PhaseComplete(String),
59}
60
61impl GeneratedItem {
62 pub fn type_name(&self) -> &'static str {
64 match self {
65 GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
66 GeneratedItem::Vendor(_) => "vendor",
67 GeneratedItem::Customer(_) => "customer",
68 GeneratedItem::Material(_) => "material",
69 GeneratedItem::Employee(_) => "employee",
70 GeneratedItem::JournalEntry(_) => "journal_entry",
71 GeneratedItem::PurchaseOrder(_) => "purchase_order",
72 GeneratedItem::GoodsReceipt(_) => "goods_receipt",
73 GeneratedItem::VendorInvoice(_) => "vendor_invoice",
74 GeneratedItem::Payment(_) => "payment",
75 GeneratedItem::SalesOrder(_) => "sales_order",
76 GeneratedItem::Delivery(_) => "delivery",
77 GeneratedItem::CustomerInvoice(_) => "customer_invoice",
78 GeneratedItem::Progress(_) => "progress",
79 GeneratedItem::PhaseComplete(_) => "phase_complete",
80 }
81 }
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub enum GenerationPhase {
87 ChartOfAccounts,
89 MasterData,
91 DocumentFlows,
93 OcpmEvents,
95 JournalEntries,
97 AnomalyInjection,
99 BalanceValidation,
101 DataQuality,
103 Complete,
105}
106
107impl GenerationPhase {
108 pub fn name(&self) -> &'static str {
110 match self {
111 GenerationPhase::ChartOfAccounts => "chart_of_accounts",
112 GenerationPhase::MasterData => "master_data",
113 GenerationPhase::DocumentFlows => "document_flows",
114 GenerationPhase::OcpmEvents => "ocpm_events",
115 GenerationPhase::JournalEntries => "journal_entries",
116 GenerationPhase::AnomalyInjection => "anomaly_injection",
117 GenerationPhase::BalanceValidation => "balance_validation",
118 GenerationPhase::DataQuality => "data_quality",
119 GenerationPhase::Complete => "complete",
120 }
121 }
122}
123
124#[derive(Debug, Clone)]
126pub struct StreamingOrchestratorConfig {
127 pub generator_config: GeneratorConfig,
129 pub stream_config: StreamConfig,
131 pub phases: Vec<GenerationPhase>,
133}
134
135impl StreamingOrchestratorConfig {
136 pub fn new(generator_config: GeneratorConfig) -> Self {
138 Self {
139 generator_config,
140 stream_config: StreamConfig::default(),
141 phases: vec![
142 GenerationPhase::ChartOfAccounts,
143 GenerationPhase::MasterData,
144 GenerationPhase::DocumentFlows,
145 GenerationPhase::JournalEntries,
146 ],
147 }
148 }
149
150 pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
152 Self {
153 generator_config,
154 stream_config: StreamConfig::default(),
155 phases: vec![
156 GenerationPhase::ChartOfAccounts,
157 GenerationPhase::MasterData,
158 GenerationPhase::DocumentFlows,
159 GenerationPhase::OcpmEvents,
160 GenerationPhase::JournalEntries,
161 GenerationPhase::AnomalyInjection,
162 GenerationPhase::DataQuality,
163 ],
164 }
165 }
166
167 pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
169 self.stream_config = config;
170 self
171 }
172
173 pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
175 self.phases = phases;
176 self
177 }
178}
179
180pub struct StreamingOrchestrator {
182 config: StreamingOrchestratorConfig,
183}
184
185impl StreamingOrchestrator {
186 pub fn new(config: StreamingOrchestratorConfig) -> Self {
188 Self { config }
189 }
190
191 pub fn from_generator_config(config: GeneratorConfig) -> Self {
193 Self::new(StreamingOrchestratorConfig::new(config))
194 }
195
196 pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
200 let (sender, receiver) = stream_channel(
201 self.config.stream_config.buffer_size,
202 self.config.stream_config.backpressure,
203 );
204
205 let control = Arc::new(StreamControl::new());
206 let control_clone = Arc::clone(&control);
207
208 let config = self.config.clone();
209
210 thread::spawn(move || {
212 let result = Self::run_generation(config, sender, control_clone);
213 if let Err(e) = result {
214 warn!("Streaming generation error: {}", e);
215 }
216 });
217
218 Ok((receiver, control))
219 }
220
221 fn run_generation(
223 config: StreamingOrchestratorConfig,
224 sender: StreamSender<GeneratedItem>,
225 control: Arc<StreamControl>,
226 ) -> SynthResult<()> {
227 let start_time = Instant::now();
228 let mut items_generated: u64 = 0;
229 let mut phases_completed = Vec::new();
230
231 let progress_interval = config.stream_config.progress_interval;
233
234 let mut progress = StreamProgress::new("initializing");
236 sender.send(StreamEvent::Progress(progress.clone()))?;
237
238 for phase in &config.phases {
239 if control.is_cancelled() {
240 info!("Generation cancelled");
241 break;
242 }
243
244 while control.is_paused() {
246 std::thread::sleep(std::time::Duration::from_millis(100));
247 if control.is_cancelled() {
248 break;
249 }
250 }
251
252 progress.phase = phase.name().to_string();
253 sender.send(StreamEvent::Progress(progress.clone()))?;
254
255 match phase {
256 GenerationPhase::ChartOfAccounts => {
257 let result =
258 Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
259 items_generated += result;
260 }
261 GenerationPhase::MasterData => {
262 let result = Self::generate_master_data_phase(
263 &config.generator_config,
264 &sender,
265 &control,
266 )?;
267 items_generated += result;
268 }
269 GenerationPhase::DocumentFlows => {
270 let result = Self::generate_document_flows_phase(
271 &config.generator_config,
272 &sender,
273 &control,
274 progress_interval,
275 &mut progress,
276 )?;
277 items_generated += result;
278 }
279 GenerationPhase::OcpmEvents => {
280 debug!("OCPM events phase - skipping (documents should be generated via P2P/O2C generators)");
282 }
283 GenerationPhase::JournalEntries => {
284 let result = Self::generate_journal_entries_phase(
285 &config.generator_config,
286 &sender,
287 &control,
288 progress_interval,
289 &mut progress,
290 )?;
291 items_generated += result;
292 }
293 GenerationPhase::AnomalyInjection | GenerationPhase::DataQuality => {
294 debug!(
296 "Phase {:?} operates on existing data (not streaming new items)",
297 phase
298 );
299 }
300 GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
301 debug!("Phase {:?} is a validation/completion phase", phase);
303 }
304 }
305
306 sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
308 phase.name().to_string(),
309 )))?;
310 phases_completed.push(phase.name().to_string());
311
312 progress.items_generated = items_generated;
314 progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
315 if progress.elapsed_ms > 0 {
316 progress.items_per_second =
317 (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
318 }
319 sender.send(StreamEvent::Progress(progress.clone()))?;
320 }
321
322 let stats = sender.stats();
324 let summary = StreamSummary {
325 total_items: items_generated,
326 total_time_ms: start_time.elapsed().as_millis() as u64,
327 avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
328 (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
329 } else {
330 0.0
331 },
332 error_count: 0,
333 dropped_count: stats.items_dropped,
334 peak_memory_mb: None,
335 phases_completed,
336 };
337
338 sender.send(StreamEvent::Complete(summary))?;
339 sender.close();
340
341 Ok(())
342 }
343
344 fn generate_coa_phase(
346 config: &GeneratorConfig,
347 sender: &StreamSender<GeneratedItem>,
348 control: &Arc<StreamControl>,
349 ) -> SynthResult<u64> {
350 use datasynth_generators::ChartOfAccountsGenerator;
351
352 if control.is_cancelled() {
353 return Ok(0);
354 }
355
356 info!("Generating Chart of Accounts");
357 let seed = config.global.seed.unwrap_or(42);
358 let complexity = config.chart_of_accounts.complexity;
359 let industry = config.global.industry;
360 let coa_framework = resolve_coa_framework_from_config(config);
361
362 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
363 .with_coa_framework(coa_framework);
364 let coa = coa_gen.generate();
365
366 let account_count = coa.account_count() as u64;
367 sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
368 coa,
369 ))))?;
370
371 Ok(account_count)
372 }
373
374 fn generate_master_data_phase(
376 config: &GeneratorConfig,
377 sender: &StreamSender<GeneratedItem>,
378 control: &Arc<StreamControl>,
379 ) -> SynthResult<u64> {
380 use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
381
382 let mut count: u64 = 0;
383 let seed = config.global.seed.unwrap_or(42);
384 let md_config = &config.master_data;
385 let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
386 .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
387
388 let company_code = config
389 .companies
390 .first()
391 .map(|c| c.code.as_str())
392 .unwrap_or("1000");
393
394 if control.is_cancelled() {
396 return Ok(count);
397 }
398
399 info!("Generating vendors");
400 let mut vendor_gen = VendorGenerator::new(seed);
401 for _ in 0..md_config.vendors.count {
402 if control.is_cancelled() {
403 break;
404 }
405 let vendor = vendor_gen.generate_vendor(company_code, effective_date);
406 sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
407 count += 1;
408 }
409
410 if control.is_cancelled() {
412 return Ok(count);
413 }
414
415 info!("Generating customers");
416 let mut customer_gen = CustomerGenerator::new(seed + 1);
417 for _ in 0..md_config.customers.count {
418 if control.is_cancelled() {
419 break;
420 }
421 let customer = customer_gen.generate_customer(company_code, effective_date);
422 sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
423 customer,
424 ))))?;
425 count += 1;
426 }
427
428 if control.is_cancelled() {
430 return Ok(count);
431 }
432
433 info!("Generating employees");
434 let mut employee_gen = EmployeeGenerator::new(seed + 4);
435 let dept = datasynth_generators::DepartmentDefinition {
437 code: "1000".to_string(),
438 name: "General".to_string(),
439 cost_center: "CC1000".to_string(),
440 headcount: 10,
441 system_roles: vec![],
442 transaction_codes: vec![],
443 };
444 for _ in 0..md_config.employees.count {
445 if control.is_cancelled() {
446 break;
447 }
448 let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
449 sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
450 employee,
451 ))))?;
452 count += 1;
453 }
454
455 Ok(count)
456 }
457
458 fn generate_journal_entries_phase(
463 config: &GeneratorConfig,
464 sender: &StreamSender<GeneratedItem>,
465 control: &Arc<StreamControl>,
466 progress_interval: u64,
467 progress: &mut StreamProgress,
468 ) -> SynthResult<u64> {
469 use datasynth_generators::{ChartOfAccountsGenerator, JournalEntryGenerator};
470 use std::sync::Arc;
471
472 let mut count: u64 = 0;
473 let seed = config.global.seed.unwrap_or(42);
474
475 let default_monthly = 500;
477 let total_entries: usize = config
478 .companies
479 .iter()
480 .map(|c| {
481 let monthly = (c.volume_weight * default_monthly as f64) as usize;
482 monthly.max(100) * config.global.period_months as usize
483 })
484 .sum();
485
486 progress.items_remaining = Some(total_entries as u64);
487 info!("Generating {} journal entries", total_entries);
488
489 let complexity = config.chart_of_accounts.complexity;
491 let industry = config.global.industry;
492 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
493 let coa = Arc::new(coa_gen.generate());
494
495 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
497 .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
498 let end_date =
499 start_date + chrono::Duration::days((config.global.period_months as i64) * 30);
500
501 let mut je_gen = JournalEntryGenerator::from_generator_config(
503 config,
504 Arc::clone(&coa),
505 start_date,
506 end_date,
507 seed,
508 );
509
510 for _ in 0..total_entries {
511 if control.is_cancelled() {
512 break;
513 }
514
515 while control.is_paused() {
517 std::thread::sleep(std::time::Duration::from_millis(100));
518 if control.is_cancelled() {
519 break;
520 }
521 }
522
523 let je = je_gen.generate();
524 sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
525 count += 1;
526
527 if count.is_multiple_of(progress_interval) {
529 progress.items_generated = count;
530 progress.items_remaining = Some(total_entries as u64 - count);
531 sender.send(StreamEvent::Progress(progress.clone()))?;
532 }
533 }
534
535 Ok(count)
536 }
537
538 fn generate_document_flows_phase(
544 config: &GeneratorConfig,
545 sender: &StreamSender<GeneratedItem>,
546 control: &Arc<StreamControl>,
547 progress_interval: u64,
548 progress: &mut StreamProgress,
549 ) -> SynthResult<u64> {
550 use chrono::Datelike;
551 use datasynth_generators::{
552 CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
553 };
554
555 let mut count: u64 = 0;
556 let seed = config.global.seed.unwrap_or(42);
557 let df_config = &config.document_flows;
558 let md_config = &config.master_data;
559
560 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
562 .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
563
564 let company_code = config
565 .companies
566 .first()
567 .map(|c| c.code.as_str())
568 .unwrap_or("1000");
569
570 let vendor_count = md_config.vendors.count.min(100);
572 let customer_count = md_config.customers.count.min(100);
573 let material_count = md_config.materials.count.min(50);
574
575 let mut vendor_gen = VendorGenerator::new(seed);
577 let mut customer_gen = CustomerGenerator::new(seed + 1);
578 let mut material_gen = MaterialGenerator::new(seed + 2);
579
580 let vendors: Vec<_> = (0..vendor_count)
581 .map(|_| vendor_gen.generate_vendor(company_code, start_date))
582 .collect();
583
584 let customers: Vec<_> = (0..customer_count)
585 .map(|_| customer_gen.generate_customer(company_code, start_date))
586 .collect();
587
588 let materials: Vec<_> = (0..material_count)
589 .map(|_| material_gen.generate_material(company_code, start_date))
590 .collect();
591
592 let base_chains = (config.global.period_months as usize * 50).max(100);
595
596 if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
598 info!("Generating P2P document flows");
599 let mut p2p_gen = P2PGenerator::new(seed + 100);
600
601 let chains_to_generate = base_chains.min(1000);
602 progress.items_remaining = Some(chains_to_generate as u64);
603
604 for i in 0..chains_to_generate {
605 if control.is_cancelled() {
606 break;
607 }
608
609 while control.is_paused() {
611 std::thread::sleep(std::time::Duration::from_millis(100));
612 if control.is_cancelled() {
613 break;
614 }
615 }
616
617 let vendor = &vendors[i % vendors.len()];
618 let material_refs: Vec<&datasynth_core::models::Material> =
619 vec![&materials[i % materials.len()]];
620
621 let days_offset = (i as i64 % (config.global.period_months as i64 * 30)).max(0);
623 let po_date = start_date + chrono::Duration::days(days_offset);
624 let fiscal_year = po_date.year() as u16;
625 let fiscal_period = po_date.month() as u8;
626
627 let chain = p2p_gen.generate_chain(
628 company_code,
629 vendor,
630 &material_refs,
631 po_date,
632 fiscal_year,
633 fiscal_period,
634 "SYSTEM",
635 );
636
637 sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
639 chain.purchase_order,
640 ))))?;
641 count += 1;
642
643 for gr in chain.goods_receipts {
644 sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
645 count += 1;
646 }
647
648 if let Some(vi) = chain.vendor_invoice {
649 sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
650 vi,
651 ))))?;
652 count += 1;
653 }
654
655 if let Some(payment) = chain.payment {
656 sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
657 count += 1;
658 }
659
660 if count.is_multiple_of(progress_interval) {
661 progress.items_generated = count;
662 sender.send(StreamEvent::Progress(progress.clone()))?;
663 }
664 }
665 }
666
667 if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
669 info!("Generating O2C document flows");
670 let mut o2c_gen = O2CGenerator::new(seed + 200);
671
672 let chains_to_generate = base_chains.min(1000);
673
674 for i in 0..chains_to_generate {
675 if control.is_cancelled() {
676 break;
677 }
678
679 while control.is_paused() {
680 std::thread::sleep(std::time::Duration::from_millis(100));
681 if control.is_cancelled() {
682 break;
683 }
684 }
685
686 let customer = &customers[i % customers.len()];
687 let material_refs: Vec<&datasynth_core::models::Material> =
688 vec![&materials[i % materials.len()]];
689
690 let days_offset = (i as i64 % (config.global.period_months as i64 * 30)).max(0);
691 let so_date = start_date + chrono::Duration::days(days_offset);
692 let fiscal_year = so_date.year() as u16;
693 let fiscal_period = so_date.month() as u8;
694
695 let chain = o2c_gen.generate_chain(
696 company_code,
697 customer,
698 &material_refs,
699 so_date,
700 fiscal_year,
701 fiscal_period,
702 "SYSTEM",
703 );
704
705 sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
706 chain.sales_order,
707 ))))?;
708 count += 1;
709
710 for delivery in chain.deliveries {
711 sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
712 delivery,
713 ))))?;
714 count += 1;
715 }
716
717 if let Some(ci) = chain.customer_invoice {
718 sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
719 ci,
720 ))))?;
721 count += 1;
722 }
723
724 if count.is_multiple_of(progress_interval) {
725 progress.items_generated = count;
726 sender.send(StreamEvent::Progress(progress.clone()))?;
727 }
728 }
729 }
730
731 Ok(count)
732 }
733
734 pub fn stats(&self) -> StreamingOrchestratorStats {
736 StreamingOrchestratorStats {
737 phases: self.config.phases.len(),
738 buffer_size: self.config.stream_config.buffer_size,
739 backpressure: self.config.stream_config.backpressure,
740 }
741 }
742}
743
744#[derive(Debug, Clone)]
746pub struct StreamingOrchestratorStats {
747 pub phases: usize,
749 pub buffer_size: usize,
751 pub backpressure: BackpressureStrategy,
753}
754
755fn resolve_coa_framework_from_config(
757 config: &GeneratorConfig,
758) -> datasynth_generators::coa_generator::CoAFramework {
759 use datasynth_generators::coa_generator::CoAFramework;
760 if config.accounting_standards.enabled {
761 match config.accounting_standards.framework {
762 Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
763 return CoAFramework::FrenchPcg;
764 }
765 Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
766 return CoAFramework::GermanSkr04;
767 }
768 _ => {}
769 }
770 }
771 CoAFramework::UsGaap
772}
773
774#[cfg(test)]
775#[allow(clippy::unwrap_used)]
776mod tests {
777 use super::*;
778 use datasynth_config::presets::create_preset;
779 use datasynth_config::schema::TransactionVolume;
780 use datasynth_core::models::{CoAComplexity, IndustrySector};
781
782 fn create_test_config() -> GeneratorConfig {
783 create_preset(
784 IndustrySector::Retail,
785 2,
786 3,
787 CoAComplexity::Small,
788 TransactionVolume::TenK,
789 )
790 }
791
792 #[test]
793 fn test_streaming_orchestrator_creation() {
794 let config = create_test_config();
795 let orchestrator = StreamingOrchestrator::from_generator_config(config);
796 let stats = orchestrator.stats();
797
798 assert!(stats.phases > 0);
799 assert!(stats.buffer_size > 0);
800 }
801
802 #[test]
803 fn test_streaming_generation() {
804 let mut config = create_test_config();
805 config.master_data.vendors.count = 5;
807 config.master_data.customers.count = 5;
808 config.master_data.employees.count = 5;
809 config.global.period_months = 1;
810
811 let streaming_config = StreamingOrchestratorConfig::new(config)
812 .with_phases(vec![
813 GenerationPhase::ChartOfAccounts,
814 GenerationPhase::MasterData,
815 ])
816 .with_stream_config(StreamConfig {
817 buffer_size: 100,
818 progress_interval: 10,
819 ..Default::default()
820 });
821
822 let orchestrator = StreamingOrchestrator::new(streaming_config);
823 let (receiver, _control) = orchestrator.stream().unwrap();
824
825 let mut items_count = 0;
826 let mut has_coa = false;
827 let mut has_completion = false;
828
829 for event in receiver {
830 match event {
831 StreamEvent::Data(item) => {
832 items_count += 1;
833 if matches!(item, GeneratedItem::ChartOfAccounts(_)) {
834 has_coa = true;
835 }
836 }
837 StreamEvent::Complete(_) => {
838 has_completion = true;
839 break;
840 }
841 _ => {}
842 }
843 }
844
845 assert!(items_count > 0);
846 assert!(has_coa);
847 assert!(has_completion);
848 }
849
850 #[test]
851 fn test_stream_cancellation() {
852 let mut config = create_test_config();
853 config.global.period_months = 12; let streaming_config = StreamingOrchestratorConfig::new(config)
856 .with_phases(vec![GenerationPhase::JournalEntries]);
857
858 let orchestrator = StreamingOrchestrator::new(streaming_config);
859 let (receiver, control) = orchestrator.stream().unwrap();
860
861 let mut items_count = 0;
863 for event in receiver {
864 if let StreamEvent::Data(_) = event {
865 items_count += 1;
866 if items_count >= 10 {
867 control.cancel();
868 break;
869 }
870 }
871 }
872
873 assert!(control.is_cancelled());
874 }
875
876 #[test]
877 fn test_generated_item_type_name() {
878 use datasynth_core::models::{CoAComplexity, IndustrySector};
879
880 let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
881 "TEST_COA".to_string(),
882 "Test Chart of Accounts".to_string(),
883 "US".to_string(),
884 IndustrySector::Manufacturing,
885 CoAComplexity::Small,
886 )));
887 assert_eq!(coa.type_name(), "chart_of_accounts");
888
889 let progress = GeneratedItem::Progress(StreamProgress::new("test"));
890 assert_eq!(progress.type_name(), "progress");
891 }
892}