1use std::sync::Arc;
7use std::thread;
8use std::time::Instant;
9
10use chrono::NaiveDate;
11use tracing::{debug, info, warn};
12
13use datasynth_config::schema::GeneratorConfig;
14use datasynth_core::error::SynthResult;
15use datasynth_core::models::{
16 documents::{
17 CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
18 },
19 ChartOfAccounts, Customer, Employee, JournalEntry, Material, Vendor,
20};
21use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
22use datasynth_core::traits::{
23 BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
24};
25
26#[derive(Debug, Clone)]
28pub enum GeneratedItem {
29 ChartOfAccounts(Box<ChartOfAccounts>),
31 Vendor(Box<Vendor>),
33 Customer(Box<Customer>),
35 Material(Box<Material>),
37 Employee(Box<Employee>),
39 JournalEntry(Box<JournalEntry>),
41 PurchaseOrder(Box<PurchaseOrder>),
43 GoodsReceipt(Box<GoodsReceipt>),
45 VendorInvoice(Box<VendorInvoice>),
47 Payment(Box<Payment>),
49 SalesOrder(Box<SalesOrder>),
51 Delivery(Box<Delivery>),
53 CustomerInvoice(Box<CustomerInvoice>),
55 Progress(StreamProgress),
57 PhaseComplete(String),
59}
60
61impl GeneratedItem {
62 pub fn type_name(&self) -> &'static str {
64 match self {
65 GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
66 GeneratedItem::Vendor(_) => "vendor",
67 GeneratedItem::Customer(_) => "customer",
68 GeneratedItem::Material(_) => "material",
69 GeneratedItem::Employee(_) => "employee",
70 GeneratedItem::JournalEntry(_) => "journal_entry",
71 GeneratedItem::PurchaseOrder(_) => "purchase_order",
72 GeneratedItem::GoodsReceipt(_) => "goods_receipt",
73 GeneratedItem::VendorInvoice(_) => "vendor_invoice",
74 GeneratedItem::Payment(_) => "payment",
75 GeneratedItem::SalesOrder(_) => "sales_order",
76 GeneratedItem::Delivery(_) => "delivery",
77 GeneratedItem::CustomerInvoice(_) => "customer_invoice",
78 GeneratedItem::Progress(_) => "progress",
79 GeneratedItem::PhaseComplete(_) => "phase_complete",
80 }
81 }
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub enum GenerationPhase {
87 ChartOfAccounts,
89 MasterData,
91 DocumentFlows,
93 OcpmEvents,
95 JournalEntries,
97 AnomalyInjection,
99 BalanceValidation,
101 DataQuality,
103 Complete,
105}
106
107impl GenerationPhase {
108 pub fn name(&self) -> &'static str {
110 match self {
111 GenerationPhase::ChartOfAccounts => "chart_of_accounts",
112 GenerationPhase::MasterData => "master_data",
113 GenerationPhase::DocumentFlows => "document_flows",
114 GenerationPhase::OcpmEvents => "ocpm_events",
115 GenerationPhase::JournalEntries => "journal_entries",
116 GenerationPhase::AnomalyInjection => "anomaly_injection",
117 GenerationPhase::BalanceValidation => "balance_validation",
118 GenerationPhase::DataQuality => "data_quality",
119 GenerationPhase::Complete => "complete",
120 }
121 }
122}
123
124#[derive(Debug, Clone)]
126pub struct StreamingOrchestratorConfig {
127 pub generator_config: GeneratorConfig,
129 pub stream_config: StreamConfig,
131 pub phases: Vec<GenerationPhase>,
133}
134
135impl StreamingOrchestratorConfig {
136 pub fn new(generator_config: GeneratorConfig) -> Self {
138 Self {
139 generator_config,
140 stream_config: StreamConfig::default(),
141 phases: vec![
142 GenerationPhase::ChartOfAccounts,
143 GenerationPhase::MasterData,
144 GenerationPhase::DocumentFlows,
145 GenerationPhase::JournalEntries,
146 ],
147 }
148 }
149
150 pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
152 Self {
153 generator_config,
154 stream_config: StreamConfig::default(),
155 phases: vec![
156 GenerationPhase::ChartOfAccounts,
157 GenerationPhase::MasterData,
158 GenerationPhase::DocumentFlows,
159 GenerationPhase::OcpmEvents,
160 GenerationPhase::JournalEntries,
161 GenerationPhase::AnomalyInjection,
162 GenerationPhase::DataQuality,
163 ],
164 }
165 }
166
167 pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
169 self.stream_config = config;
170 self
171 }
172
173 pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
175 self.phases = phases;
176 self
177 }
178}
179
180pub struct StreamingOrchestrator {
182 config: StreamingOrchestratorConfig,
183}
184
185impl StreamingOrchestrator {
186 pub fn new(config: StreamingOrchestratorConfig) -> Self {
188 Self { config }
189 }
190
191 pub fn from_generator_config(config: GeneratorConfig) -> Self {
193 Self::new(StreamingOrchestratorConfig::new(config))
194 }
195
196 pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
200 let (sender, receiver) = stream_channel(
201 self.config.stream_config.buffer_size,
202 self.config.stream_config.backpressure,
203 );
204
205 let control = Arc::new(StreamControl::new());
206 let control_clone = Arc::clone(&control);
207
208 let config = self.config.clone();
209
210 thread::spawn(move || {
212 let result = Self::run_generation(config, sender, control_clone);
213 if let Err(e) = result {
214 warn!("Streaming generation error: {}", e);
215 }
216 });
217
218 Ok((receiver, control))
219 }
220
221 fn run_generation(
223 config: StreamingOrchestratorConfig,
224 sender: StreamSender<GeneratedItem>,
225 control: Arc<StreamControl>,
226 ) -> SynthResult<()> {
227 let start_time = Instant::now();
228 let mut items_generated: u64 = 0;
229 let mut phases_completed = Vec::new();
230
231 let progress_interval = config.stream_config.progress_interval;
233
234 let mut progress = StreamProgress::new("initializing");
236 sender.send(StreamEvent::Progress(progress.clone()))?;
237
238 for phase in &config.phases {
239 if control.is_cancelled() {
240 info!("Generation cancelled");
241 break;
242 }
243
244 while control.is_paused() {
246 std::thread::sleep(std::time::Duration::from_millis(100));
247 if control.is_cancelled() {
248 break;
249 }
250 }
251
252 progress.phase = phase.name().to_string();
253 sender.send(StreamEvent::Progress(progress.clone()))?;
254
255 match phase {
256 GenerationPhase::ChartOfAccounts => {
257 let result =
258 Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
259 items_generated += result;
260 }
261 GenerationPhase::MasterData => {
262 let result = Self::generate_master_data_phase(
263 &config.generator_config,
264 &sender,
265 &control,
266 )?;
267 items_generated += result;
268 }
269 GenerationPhase::DocumentFlows => {
270 let result = Self::generate_document_flows_phase(
271 &config.generator_config,
272 &sender,
273 &control,
274 progress_interval,
275 &mut progress,
276 )?;
277 items_generated += result;
278 }
279 GenerationPhase::OcpmEvents => {
280 debug!("OCPM events phase - skipping (documents should be generated via P2P/O2C generators)");
282 }
283 GenerationPhase::JournalEntries => {
284 let result = Self::generate_journal_entries_phase(
285 &config.generator_config,
286 &sender,
287 &control,
288 progress_interval,
289 &mut progress,
290 )?;
291 items_generated += result;
292 }
293 GenerationPhase::AnomalyInjection | GenerationPhase::DataQuality => {
294 debug!(
296 "Phase {:?} operates on existing data (not streaming new items)",
297 phase
298 );
299 }
300 GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
301 debug!("Phase {:?} is a validation/completion phase", phase);
303 }
304 }
305
306 sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
308 phase.name().to_string(),
309 )))?;
310 phases_completed.push(phase.name().to_string());
311
312 progress.items_generated = items_generated;
314 progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
315 if progress.elapsed_ms > 0 {
316 progress.items_per_second =
317 (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
318 }
319 sender.send(StreamEvent::Progress(progress.clone()))?;
320 }
321
322 let stats = sender.stats();
324 let summary = StreamSummary {
325 total_items: items_generated,
326 total_time_ms: start_time.elapsed().as_millis() as u64,
327 avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
328 (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
329 } else {
330 0.0
331 },
332 error_count: 0,
333 dropped_count: stats.items_dropped,
334 peak_memory_mb: None,
335 phases_completed,
336 };
337
338 sender.send(StreamEvent::Complete(summary))?;
339 sender.close();
340
341 Ok(())
342 }
343
344 fn generate_coa_phase(
346 config: &GeneratorConfig,
347 sender: &StreamSender<GeneratedItem>,
348 control: &Arc<StreamControl>,
349 ) -> SynthResult<u64> {
350 use datasynth_generators::ChartOfAccountsGenerator;
351
352 if control.is_cancelled() {
353 return Ok(0);
354 }
355
356 info!("Generating Chart of Accounts");
357 let seed = config.global.seed.unwrap_or(42);
358 let complexity = config.chart_of_accounts.complexity;
359 let industry = config.global.industry;
360 let use_french_pcg = config.accounting_standards.enabled
361 && matches!(
362 config.accounting_standards.framework,
363 Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap)
364 );
365
366 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
367 .with_french_pcg(use_french_pcg);
368 let coa = coa_gen.generate();
369
370 let account_count = coa.account_count() as u64;
371 sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
372 coa,
373 ))))?;
374
375 Ok(account_count)
376 }
377
378 fn generate_master_data_phase(
380 config: &GeneratorConfig,
381 sender: &StreamSender<GeneratedItem>,
382 control: &Arc<StreamControl>,
383 ) -> SynthResult<u64> {
384 use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
385
386 let mut count: u64 = 0;
387 let seed = config.global.seed.unwrap_or(42);
388 let md_config = &config.master_data;
389 let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
390 .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
391
392 let company_code = config
393 .companies
394 .first()
395 .map(|c| c.code.as_str())
396 .unwrap_or("1000");
397
398 if control.is_cancelled() {
400 return Ok(count);
401 }
402
403 info!("Generating vendors");
404 let mut vendor_gen = VendorGenerator::new(seed);
405 for _ in 0..md_config.vendors.count {
406 if control.is_cancelled() {
407 break;
408 }
409 let vendor = vendor_gen.generate_vendor(company_code, effective_date);
410 sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
411 count += 1;
412 }
413
414 if control.is_cancelled() {
416 return Ok(count);
417 }
418
419 info!("Generating customers");
420 let mut customer_gen = CustomerGenerator::new(seed + 1);
421 for _ in 0..md_config.customers.count {
422 if control.is_cancelled() {
423 break;
424 }
425 let customer = customer_gen.generate_customer(company_code, effective_date);
426 sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
427 customer,
428 ))))?;
429 count += 1;
430 }
431
432 if control.is_cancelled() {
434 return Ok(count);
435 }
436
437 info!("Generating employees");
438 let mut employee_gen = EmployeeGenerator::new(seed + 4);
439 let dept = datasynth_generators::DepartmentDefinition {
441 code: "1000".to_string(),
442 name: "General".to_string(),
443 cost_center: "CC1000".to_string(),
444 headcount: 10,
445 system_roles: vec![],
446 transaction_codes: vec![],
447 };
448 for _ in 0..md_config.employees.count {
449 if control.is_cancelled() {
450 break;
451 }
452 let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
453 sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
454 employee,
455 ))))?;
456 count += 1;
457 }
458
459 Ok(count)
460 }
461
462 fn generate_journal_entries_phase(
467 config: &GeneratorConfig,
468 sender: &StreamSender<GeneratedItem>,
469 control: &Arc<StreamControl>,
470 progress_interval: u64,
471 progress: &mut StreamProgress,
472 ) -> SynthResult<u64> {
473 use datasynth_generators::{ChartOfAccountsGenerator, JournalEntryGenerator};
474 use std::sync::Arc;
475
476 let mut count: u64 = 0;
477 let seed = config.global.seed.unwrap_or(42);
478
479 let default_monthly = 500;
481 let total_entries: usize = config
482 .companies
483 .iter()
484 .map(|c| {
485 let monthly = (c.volume_weight * default_monthly as f64) as usize;
486 monthly.max(100) * config.global.period_months as usize
487 })
488 .sum();
489
490 progress.items_remaining = Some(total_entries as u64);
491 info!("Generating {} journal entries", total_entries);
492
493 let complexity = config.chart_of_accounts.complexity;
495 let industry = config.global.industry;
496 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
497 let coa = Arc::new(coa_gen.generate());
498
499 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
501 .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
502 let end_date =
503 start_date + chrono::Duration::days((config.global.period_months as i64) * 30);
504
505 let mut je_gen = JournalEntryGenerator::from_generator_config(
507 config,
508 Arc::clone(&coa),
509 start_date,
510 end_date,
511 seed,
512 );
513
514 for _ in 0..total_entries {
515 if control.is_cancelled() {
516 break;
517 }
518
519 while control.is_paused() {
521 std::thread::sleep(std::time::Duration::from_millis(100));
522 if control.is_cancelled() {
523 break;
524 }
525 }
526
527 let je = je_gen.generate();
528 sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
529 count += 1;
530
531 if count.is_multiple_of(progress_interval) {
533 progress.items_generated = count;
534 progress.items_remaining = Some(total_entries as u64 - count);
535 sender.send(StreamEvent::Progress(progress.clone()))?;
536 }
537 }
538
539 Ok(count)
540 }
541
542 fn generate_document_flows_phase(
548 config: &GeneratorConfig,
549 sender: &StreamSender<GeneratedItem>,
550 control: &Arc<StreamControl>,
551 progress_interval: u64,
552 progress: &mut StreamProgress,
553 ) -> SynthResult<u64> {
554 use chrono::Datelike;
555 use datasynth_generators::{
556 CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
557 };
558
559 let mut count: u64 = 0;
560 let seed = config.global.seed.unwrap_or(42);
561 let df_config = &config.document_flows;
562 let md_config = &config.master_data;
563
564 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
566 .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
567
568 let company_code = config
569 .companies
570 .first()
571 .map(|c| c.code.as_str())
572 .unwrap_or("1000");
573
574 let vendor_count = md_config.vendors.count.min(100);
576 let customer_count = md_config.customers.count.min(100);
577 let material_count = md_config.materials.count.min(50);
578
579 let mut vendor_gen = VendorGenerator::new(seed);
581 let mut customer_gen = CustomerGenerator::new(seed + 1);
582 let mut material_gen = MaterialGenerator::new(seed + 2);
583
584 let vendors: Vec<_> = (0..vendor_count)
585 .map(|_| vendor_gen.generate_vendor(company_code, start_date))
586 .collect();
587
588 let customers: Vec<_> = (0..customer_count)
589 .map(|_| customer_gen.generate_customer(company_code, start_date))
590 .collect();
591
592 let materials: Vec<_> = (0..material_count)
593 .map(|_| material_gen.generate_material(company_code, start_date))
594 .collect();
595
596 let base_chains = (config.global.period_months as usize * 50).max(100);
599
600 if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
602 info!("Generating P2P document flows");
603 let mut p2p_gen = P2PGenerator::new(seed + 100);
604
605 let chains_to_generate = base_chains.min(1000);
606 progress.items_remaining = Some(chains_to_generate as u64);
607
608 for i in 0..chains_to_generate {
609 if control.is_cancelled() {
610 break;
611 }
612
613 while control.is_paused() {
615 std::thread::sleep(std::time::Duration::from_millis(100));
616 if control.is_cancelled() {
617 break;
618 }
619 }
620
621 let vendor = &vendors[i % vendors.len()];
622 let material_refs: Vec<&datasynth_core::models::Material> =
623 vec![&materials[i % materials.len()]];
624
625 let days_offset = (i as i64 % (config.global.period_months as i64 * 30)).max(0);
627 let po_date = start_date + chrono::Duration::days(days_offset);
628 let fiscal_year = po_date.year() as u16;
629 let fiscal_period = po_date.month() as u8;
630
631 let chain = p2p_gen.generate_chain(
632 company_code,
633 vendor,
634 &material_refs,
635 po_date,
636 fiscal_year,
637 fiscal_period,
638 "SYSTEM",
639 );
640
641 sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
643 chain.purchase_order,
644 ))))?;
645 count += 1;
646
647 for gr in chain.goods_receipts {
648 sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
649 count += 1;
650 }
651
652 if let Some(vi) = chain.vendor_invoice {
653 sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
654 vi,
655 ))))?;
656 count += 1;
657 }
658
659 if let Some(payment) = chain.payment {
660 sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
661 count += 1;
662 }
663
664 if count.is_multiple_of(progress_interval) {
665 progress.items_generated = count;
666 sender.send(StreamEvent::Progress(progress.clone()))?;
667 }
668 }
669 }
670
671 if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
673 info!("Generating O2C document flows");
674 let mut o2c_gen = O2CGenerator::new(seed + 200);
675
676 let chains_to_generate = base_chains.min(1000);
677
678 for i in 0..chains_to_generate {
679 if control.is_cancelled() {
680 break;
681 }
682
683 while control.is_paused() {
684 std::thread::sleep(std::time::Duration::from_millis(100));
685 if control.is_cancelled() {
686 break;
687 }
688 }
689
690 let customer = &customers[i % customers.len()];
691 let material_refs: Vec<&datasynth_core::models::Material> =
692 vec![&materials[i % materials.len()]];
693
694 let days_offset = (i as i64 % (config.global.period_months as i64 * 30)).max(0);
695 let so_date = start_date + chrono::Duration::days(days_offset);
696 let fiscal_year = so_date.year() as u16;
697 let fiscal_period = so_date.month() as u8;
698
699 let chain = o2c_gen.generate_chain(
700 company_code,
701 customer,
702 &material_refs,
703 so_date,
704 fiscal_year,
705 fiscal_period,
706 "SYSTEM",
707 );
708
709 sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
710 chain.sales_order,
711 ))))?;
712 count += 1;
713
714 for delivery in chain.deliveries {
715 sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
716 delivery,
717 ))))?;
718 count += 1;
719 }
720
721 if let Some(ci) = chain.customer_invoice {
722 sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
723 ci,
724 ))))?;
725 count += 1;
726 }
727
728 if count.is_multiple_of(progress_interval) {
729 progress.items_generated = count;
730 sender.send(StreamEvent::Progress(progress.clone()))?;
731 }
732 }
733 }
734
735 Ok(count)
736 }
737
738 pub fn stats(&self) -> StreamingOrchestratorStats {
740 StreamingOrchestratorStats {
741 phases: self.config.phases.len(),
742 buffer_size: self.config.stream_config.buffer_size,
743 backpressure: self.config.stream_config.backpressure,
744 }
745 }
746}
747
748#[derive(Debug, Clone)]
750pub struct StreamingOrchestratorStats {
751 pub phases: usize,
753 pub buffer_size: usize,
755 pub backpressure: BackpressureStrategy,
757}
758
759#[cfg(test)]
760#[allow(clippy::unwrap_used)]
761mod tests {
762 use super::*;
763 use datasynth_config::presets::create_preset;
764 use datasynth_config::schema::TransactionVolume;
765 use datasynth_core::models::{CoAComplexity, IndustrySector};
766
767 fn create_test_config() -> GeneratorConfig {
768 create_preset(
769 IndustrySector::Retail,
770 2,
771 3,
772 CoAComplexity::Small,
773 TransactionVolume::TenK,
774 )
775 }
776
777 #[test]
778 fn test_streaming_orchestrator_creation() {
779 let config = create_test_config();
780 let orchestrator = StreamingOrchestrator::from_generator_config(config);
781 let stats = orchestrator.stats();
782
783 assert!(stats.phases > 0);
784 assert!(stats.buffer_size > 0);
785 }
786
787 #[test]
788 fn test_streaming_generation() {
789 let mut config = create_test_config();
790 config.master_data.vendors.count = 5;
792 config.master_data.customers.count = 5;
793 config.master_data.employees.count = 5;
794 config.global.period_months = 1;
795
796 let streaming_config = StreamingOrchestratorConfig::new(config)
797 .with_phases(vec![
798 GenerationPhase::ChartOfAccounts,
799 GenerationPhase::MasterData,
800 ])
801 .with_stream_config(StreamConfig {
802 buffer_size: 100,
803 progress_interval: 10,
804 ..Default::default()
805 });
806
807 let orchestrator = StreamingOrchestrator::new(streaming_config);
808 let (receiver, _control) = orchestrator.stream().unwrap();
809
810 let mut items_count = 0;
811 let mut has_coa = false;
812 let mut has_completion = false;
813
814 for event in receiver {
815 match event {
816 StreamEvent::Data(item) => {
817 items_count += 1;
818 if matches!(item, GeneratedItem::ChartOfAccounts(_)) {
819 has_coa = true;
820 }
821 }
822 StreamEvent::Complete(_) => {
823 has_completion = true;
824 break;
825 }
826 _ => {}
827 }
828 }
829
830 assert!(items_count > 0);
831 assert!(has_coa);
832 assert!(has_completion);
833 }
834
835 #[test]
836 fn test_stream_cancellation() {
837 let mut config = create_test_config();
838 config.global.period_months = 12; let streaming_config = StreamingOrchestratorConfig::new(config)
841 .with_phases(vec![GenerationPhase::JournalEntries]);
842
843 let orchestrator = StreamingOrchestrator::new(streaming_config);
844 let (receiver, control) = orchestrator.stream().unwrap();
845
846 let mut items_count = 0;
848 for event in receiver {
849 if let StreamEvent::Data(_) = event {
850 items_count += 1;
851 if items_count >= 10 {
852 control.cancel();
853 break;
854 }
855 }
856 }
857
858 assert!(control.is_cancelled());
859 }
860
861 #[test]
862 fn test_generated_item_type_name() {
863 use datasynth_core::models::{CoAComplexity, IndustrySector};
864
865 let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
866 "TEST_COA".to_string(),
867 "Test Chart of Accounts".to_string(),
868 "US".to_string(),
869 IndustrySector::Manufacturing,
870 CoAComplexity::Small,
871 )));
872 assert_eq!(coa.type_name(), "chart_of_accounts");
873
874 let progress = GeneratedItem::Progress(StreamProgress::new("test"));
875 assert_eq!(progress.type_name(), "progress");
876 }
877}