1use std::sync::Arc;
7use std::thread;
8use std::time::Instant;
9
10use chrono::NaiveDate;
11use tracing::{debug, info, warn};
12
13use datasynth_config::schema::GeneratorConfig;
14use datasynth_core::error::SynthResult;
15use datasynth_core::models::{
16 documents::{
17 CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
18 },
19 ChartOfAccounts, Customer, Employee, JournalEntry, Material, Vendor,
20};
21use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
22use datasynth_core::traits::{
23 BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
24};
25
26#[derive(Debug, Clone)]
28pub enum GeneratedItem {
29 ChartOfAccounts(Box<ChartOfAccounts>),
31 Vendor(Box<Vendor>),
33 Customer(Box<Customer>),
35 Material(Box<Material>),
37 Employee(Box<Employee>),
39 JournalEntry(Box<JournalEntry>),
41 PurchaseOrder(Box<PurchaseOrder>),
43 GoodsReceipt(Box<GoodsReceipt>),
45 VendorInvoice(Box<VendorInvoice>),
47 Payment(Box<Payment>),
49 SalesOrder(Box<SalesOrder>),
51 Delivery(Box<Delivery>),
53 CustomerInvoice(Box<CustomerInvoice>),
55 Progress(StreamProgress),
57 PhaseComplete(String),
59}
60
61impl GeneratedItem {
62 pub fn type_name(&self) -> &'static str {
64 match self {
65 GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
66 GeneratedItem::Vendor(_) => "vendor",
67 GeneratedItem::Customer(_) => "customer",
68 GeneratedItem::Material(_) => "material",
69 GeneratedItem::Employee(_) => "employee",
70 GeneratedItem::JournalEntry(_) => "journal_entry",
71 GeneratedItem::PurchaseOrder(_) => "purchase_order",
72 GeneratedItem::GoodsReceipt(_) => "goods_receipt",
73 GeneratedItem::VendorInvoice(_) => "vendor_invoice",
74 GeneratedItem::Payment(_) => "payment",
75 GeneratedItem::SalesOrder(_) => "sales_order",
76 GeneratedItem::Delivery(_) => "delivery",
77 GeneratedItem::CustomerInvoice(_) => "customer_invoice",
78 GeneratedItem::Progress(_) => "progress",
79 GeneratedItem::PhaseComplete(_) => "phase_complete",
80 }
81 }
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub enum GenerationPhase {
87 ChartOfAccounts,
89 MasterData,
91 DocumentFlows,
93 OcpmEvents,
95 JournalEntries,
97 AnomalyInjection,
99 BalanceValidation,
101 DataQuality,
103 Complete,
105}
106
107impl GenerationPhase {
108 pub fn name(&self) -> &'static str {
110 match self {
111 GenerationPhase::ChartOfAccounts => "chart_of_accounts",
112 GenerationPhase::MasterData => "master_data",
113 GenerationPhase::DocumentFlows => "document_flows",
114 GenerationPhase::OcpmEvents => "ocpm_events",
115 GenerationPhase::JournalEntries => "journal_entries",
116 GenerationPhase::AnomalyInjection => "anomaly_injection",
117 GenerationPhase::BalanceValidation => "balance_validation",
118 GenerationPhase::DataQuality => "data_quality",
119 GenerationPhase::Complete => "complete",
120 }
121 }
122}
123
124#[derive(Debug, Clone)]
126pub struct StreamingOrchestratorConfig {
127 pub generator_config: GeneratorConfig,
129 pub stream_config: StreamConfig,
131 pub phases: Vec<GenerationPhase>,
133}
134
135impl StreamingOrchestratorConfig {
136 pub fn new(generator_config: GeneratorConfig) -> Self {
138 Self {
139 generator_config,
140 stream_config: StreamConfig::default(),
141 phases: vec![
142 GenerationPhase::ChartOfAccounts,
143 GenerationPhase::MasterData,
144 GenerationPhase::DocumentFlows,
145 GenerationPhase::JournalEntries,
146 ],
147 }
148 }
149
150 pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
152 Self {
153 generator_config,
154 stream_config: StreamConfig::default(),
155 phases: vec![
156 GenerationPhase::ChartOfAccounts,
157 GenerationPhase::MasterData,
158 GenerationPhase::DocumentFlows,
159 GenerationPhase::OcpmEvents,
160 GenerationPhase::JournalEntries,
161 GenerationPhase::AnomalyInjection,
162 GenerationPhase::DataQuality,
163 ],
164 }
165 }
166
167 pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
169 self.stream_config = config;
170 self
171 }
172
173 pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
175 self.phases = phases;
176 self
177 }
178}
179
180pub struct StreamingOrchestrator {
182 config: StreamingOrchestratorConfig,
183}
184
185impl StreamingOrchestrator {
186 pub fn new(config: StreamingOrchestratorConfig) -> Self {
188 Self { config }
189 }
190
191 pub fn from_generator_config(config: GeneratorConfig) -> Self {
193 Self::new(StreamingOrchestratorConfig::new(config))
194 }
195
196 pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
200 let (sender, receiver) = stream_channel(
201 self.config.stream_config.buffer_size,
202 self.config.stream_config.backpressure,
203 );
204
205 let control = Arc::new(StreamControl::new());
206 let control_clone = Arc::clone(&control);
207
208 let config = self.config.clone();
209
210 thread::spawn(move || {
212 let result = Self::run_generation(config, sender, control_clone);
213 if let Err(e) = result {
214 warn!("Streaming generation error: {}", e);
215 }
216 });
217
218 Ok((receiver, control))
219 }
220
221 fn run_generation(
223 config: StreamingOrchestratorConfig,
224 sender: StreamSender<GeneratedItem>,
225 control: Arc<StreamControl>,
226 ) -> SynthResult<()> {
227 let start_time = Instant::now();
228 let mut items_generated: u64 = 0;
229 let mut phases_completed = Vec::new();
230
231 let progress_interval = config.stream_config.progress_interval;
233
234 let mut progress = StreamProgress::new("initializing");
236 sender.send(StreamEvent::Progress(progress.clone()))?;
237
238 for phase in &config.phases {
239 if control.is_cancelled() {
240 info!("Generation cancelled");
241 break;
242 }
243
244 while control.is_paused() {
246 std::thread::sleep(std::time::Duration::from_millis(100));
247 if control.is_cancelled() {
248 break;
249 }
250 }
251
252 progress.phase = phase.name().to_string();
253 sender.send(StreamEvent::Progress(progress.clone()))?;
254
255 match phase {
256 GenerationPhase::ChartOfAccounts => {
257 let result =
258 Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
259 items_generated += result;
260 }
261 GenerationPhase::MasterData => {
262 let result = Self::generate_master_data_phase(
263 &config.generator_config,
264 &sender,
265 &control,
266 )?;
267 items_generated += result;
268 }
269 GenerationPhase::DocumentFlows => {
270 let result = Self::generate_document_flows_phase(
271 &config.generator_config,
272 &sender,
273 &control,
274 progress_interval,
275 &mut progress,
276 )?;
277 items_generated += result;
278 }
279 GenerationPhase::OcpmEvents => {
280 debug!("OCPM events phase - skipping (documents should be generated via P2P/O2C generators)");
282 }
283 GenerationPhase::JournalEntries => {
284 let result = Self::generate_journal_entries_phase(
285 &config.generator_config,
286 &sender,
287 &control,
288 progress_interval,
289 &mut progress,
290 )?;
291 items_generated += result;
292 }
293 GenerationPhase::AnomalyInjection | GenerationPhase::DataQuality => {
294 debug!(
296 "Phase {:?} operates on existing data (not streaming new items)",
297 phase
298 );
299 }
300 GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
301 debug!("Phase {:?} is a validation/completion phase", phase);
303 }
304 }
305
306 sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
308 phase.name().to_string(),
309 )))?;
310 phases_completed.push(phase.name().to_string());
311
312 progress.items_generated = items_generated;
314 progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
315 if progress.elapsed_ms > 0 {
316 progress.items_per_second =
317 (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
318 }
319 sender.send(StreamEvent::Progress(progress.clone()))?;
320 }
321
322 let stats = sender.stats();
324 let summary = StreamSummary {
325 total_items: items_generated,
326 total_time_ms: start_time.elapsed().as_millis() as u64,
327 avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
328 (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
329 } else {
330 0.0
331 },
332 error_count: 0,
333 dropped_count: stats.items_dropped,
334 peak_memory_mb: None,
335 phases_completed,
336 };
337
338 sender.send(StreamEvent::Complete(summary))?;
339 sender.close();
340
341 Ok(())
342 }
343
344 fn generate_coa_phase(
346 config: &GeneratorConfig,
347 sender: &StreamSender<GeneratedItem>,
348 control: &Arc<StreamControl>,
349 ) -> SynthResult<u64> {
350 use datasynth_generators::ChartOfAccountsGenerator;
351
352 if control.is_cancelled() {
353 return Ok(0);
354 }
355
356 info!("Generating Chart of Accounts");
357 let seed = config.global.seed.unwrap_or(42);
358 let complexity = config.chart_of_accounts.complexity;
359 let industry = config.global.industry;
360
361 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
362 let coa = coa_gen.generate();
363
364 let account_count = coa.account_count() as u64;
365 sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
366 coa,
367 ))))?;
368
369 Ok(account_count)
370 }
371
372 fn generate_master_data_phase(
374 config: &GeneratorConfig,
375 sender: &StreamSender<GeneratedItem>,
376 control: &Arc<StreamControl>,
377 ) -> SynthResult<u64> {
378 use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
379
380 let mut count: u64 = 0;
381 let seed = config.global.seed.unwrap_or(42);
382 let md_config = &config.master_data;
383 let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
384 .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).unwrap());
385
386 let company_code = config
387 .companies
388 .first()
389 .map(|c| c.code.as_str())
390 .unwrap_or("1000");
391
392 if control.is_cancelled() {
394 return Ok(count);
395 }
396
397 info!("Generating vendors");
398 let mut vendor_gen = VendorGenerator::new(seed);
399 for _ in 0..md_config.vendors.count {
400 if control.is_cancelled() {
401 break;
402 }
403 let vendor = vendor_gen.generate_vendor(company_code, effective_date);
404 sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
405 count += 1;
406 }
407
408 if control.is_cancelled() {
410 return Ok(count);
411 }
412
413 info!("Generating customers");
414 let mut customer_gen = CustomerGenerator::new(seed + 1);
415 for _ in 0..md_config.customers.count {
416 if control.is_cancelled() {
417 break;
418 }
419 let customer = customer_gen.generate_customer(company_code, effective_date);
420 sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
421 customer,
422 ))))?;
423 count += 1;
424 }
425
426 if control.is_cancelled() {
428 return Ok(count);
429 }
430
431 info!("Generating employees");
432 let mut employee_gen = EmployeeGenerator::new(seed + 4);
433 let dept = datasynth_generators::DepartmentDefinition {
435 code: "1000".to_string(),
436 name: "General".to_string(),
437 cost_center: "CC1000".to_string(),
438 headcount: 10,
439 system_roles: vec![],
440 transaction_codes: vec![],
441 };
442 for _ in 0..md_config.employees.count {
443 if control.is_cancelled() {
444 break;
445 }
446 let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
447 sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
448 employee,
449 ))))?;
450 count += 1;
451 }
452
453 Ok(count)
454 }
455
456 fn generate_journal_entries_phase(
461 config: &GeneratorConfig,
462 sender: &StreamSender<GeneratedItem>,
463 control: &Arc<StreamControl>,
464 progress_interval: u64,
465 progress: &mut StreamProgress,
466 ) -> SynthResult<u64> {
467 use datasynth_generators::{ChartOfAccountsGenerator, JournalEntryGenerator};
468 use std::sync::Arc;
469
470 let mut count: u64 = 0;
471 let seed = config.global.seed.unwrap_or(42);
472
473 let default_monthly = 500;
475 let total_entries: usize = config
476 .companies
477 .iter()
478 .map(|c| {
479 let monthly = (c.volume_weight * default_monthly as f64) as usize;
480 monthly.max(100) * config.global.period_months as usize
481 })
482 .sum();
483
484 progress.items_remaining = Some(total_entries as u64);
485 info!("Generating {} journal entries", total_entries);
486
487 let complexity = config.chart_of_accounts.complexity;
489 let industry = config.global.industry;
490 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
491 let coa = Arc::new(coa_gen.generate());
492
493 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
495 .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).unwrap());
496 let end_date =
497 start_date + chrono::Duration::days((config.global.period_months as i64) * 30);
498
499 let mut je_gen = JournalEntryGenerator::from_generator_config(
501 config,
502 Arc::clone(&coa),
503 start_date,
504 end_date,
505 seed,
506 );
507
508 for _ in 0..total_entries {
509 if control.is_cancelled() {
510 break;
511 }
512
513 while control.is_paused() {
515 std::thread::sleep(std::time::Duration::from_millis(100));
516 if control.is_cancelled() {
517 break;
518 }
519 }
520
521 let je = je_gen.generate();
522 sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
523 count += 1;
524
525 if count % progress_interval == 0 {
527 progress.items_generated = count;
528 progress.items_remaining = Some(total_entries as u64 - count);
529 sender.send(StreamEvent::Progress(progress.clone()))?;
530 }
531 }
532
533 Ok(count)
534 }
535
536 fn generate_document_flows_phase(
542 config: &GeneratorConfig,
543 sender: &StreamSender<GeneratedItem>,
544 control: &Arc<StreamControl>,
545 progress_interval: u64,
546 progress: &mut StreamProgress,
547 ) -> SynthResult<u64> {
548 use chrono::Datelike;
549 use datasynth_generators::{
550 CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
551 };
552
553 let mut count: u64 = 0;
554 let seed = config.global.seed.unwrap_or(42);
555 let df_config = &config.document_flows;
556 let md_config = &config.master_data;
557
558 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
560 .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).unwrap());
561
562 let company_code = config
563 .companies
564 .first()
565 .map(|c| c.code.as_str())
566 .unwrap_or("1000");
567
568 let vendor_count = md_config.vendors.count.min(100);
570 let customer_count = md_config.customers.count.min(100);
571 let material_count = md_config.materials.count.min(50);
572
573 let mut vendor_gen = VendorGenerator::new(seed);
575 let mut customer_gen = CustomerGenerator::new(seed + 1);
576 let mut material_gen = MaterialGenerator::new(seed + 2);
577
578 let vendors: Vec<_> = (0..vendor_count)
579 .map(|_| vendor_gen.generate_vendor(company_code, start_date))
580 .collect();
581
582 let customers: Vec<_> = (0..customer_count)
583 .map(|_| customer_gen.generate_customer(company_code, start_date))
584 .collect();
585
586 let materials: Vec<_> = (0..material_count)
587 .map(|_| material_gen.generate_material(company_code, start_date))
588 .collect();
589
590 let base_chains = (config.global.period_months as usize * 50).max(100);
593
594 if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
596 info!("Generating P2P document flows");
597 let mut p2p_gen = P2PGenerator::new(seed + 100);
598
599 let chains_to_generate = base_chains.min(1000);
600 progress.items_remaining = Some(chains_to_generate as u64);
601
602 for i in 0..chains_to_generate {
603 if control.is_cancelled() {
604 break;
605 }
606
607 while control.is_paused() {
609 std::thread::sleep(std::time::Duration::from_millis(100));
610 if control.is_cancelled() {
611 break;
612 }
613 }
614
615 let vendor = &vendors[i % vendors.len()];
616 let material_refs: Vec<&datasynth_core::models::Material> =
617 vec![&materials[i % materials.len()]];
618
619 let days_offset = (i as i64 % (config.global.period_months as i64 * 30)).max(0);
621 let po_date = start_date + chrono::Duration::days(days_offset);
622 let fiscal_year = po_date.year() as u16;
623 let fiscal_period = po_date.month() as u8;
624
625 let chain = p2p_gen.generate_chain(
626 company_code,
627 vendor,
628 &material_refs,
629 po_date,
630 fiscal_year,
631 fiscal_period,
632 "SYSTEM",
633 );
634
635 sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
637 chain.purchase_order,
638 ))))?;
639 count += 1;
640
641 for gr in chain.goods_receipts {
642 sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
643 count += 1;
644 }
645
646 if let Some(vi) = chain.vendor_invoice {
647 sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
648 vi,
649 ))))?;
650 count += 1;
651 }
652
653 if let Some(payment) = chain.payment {
654 sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
655 count += 1;
656 }
657
658 if count % progress_interval == 0 {
659 progress.items_generated = count;
660 sender.send(StreamEvent::Progress(progress.clone()))?;
661 }
662 }
663 }
664
665 if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
667 info!("Generating O2C document flows");
668 let mut o2c_gen = O2CGenerator::new(seed + 200);
669
670 let chains_to_generate = base_chains.min(1000);
671
672 for i in 0..chains_to_generate {
673 if control.is_cancelled() {
674 break;
675 }
676
677 while control.is_paused() {
678 std::thread::sleep(std::time::Duration::from_millis(100));
679 if control.is_cancelled() {
680 break;
681 }
682 }
683
684 let customer = &customers[i % customers.len()];
685 let material_refs: Vec<&datasynth_core::models::Material> =
686 vec![&materials[i % materials.len()]];
687
688 let days_offset = (i as i64 % (config.global.period_months as i64 * 30)).max(0);
689 let so_date = start_date + chrono::Duration::days(days_offset);
690 let fiscal_year = so_date.year() as u16;
691 let fiscal_period = so_date.month() as u8;
692
693 let chain = o2c_gen.generate_chain(
694 company_code,
695 customer,
696 &material_refs,
697 so_date,
698 fiscal_year,
699 fiscal_period,
700 "SYSTEM",
701 );
702
703 sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
704 chain.sales_order,
705 ))))?;
706 count += 1;
707
708 for delivery in chain.deliveries {
709 sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
710 delivery,
711 ))))?;
712 count += 1;
713 }
714
715 if let Some(ci) = chain.customer_invoice {
716 sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
717 ci,
718 ))))?;
719 count += 1;
720 }
721
722 if count % progress_interval == 0 {
723 progress.items_generated = count;
724 sender.send(StreamEvent::Progress(progress.clone()))?;
725 }
726 }
727 }
728
729 Ok(count)
730 }
731
732 pub fn stats(&self) -> StreamingOrchestratorStats {
734 StreamingOrchestratorStats {
735 phases: self.config.phases.len(),
736 buffer_size: self.config.stream_config.buffer_size,
737 backpressure: self.config.stream_config.backpressure,
738 }
739 }
740}
741
742#[derive(Debug, Clone)]
744pub struct StreamingOrchestratorStats {
745 pub phases: usize,
747 pub buffer_size: usize,
749 pub backpressure: BackpressureStrategy,
751}
752
753#[cfg(test)]
754mod tests {
755 use super::*;
756 use datasynth_config::presets::create_preset;
757 use datasynth_config::schema::TransactionVolume;
758 use datasynth_core::models::{CoAComplexity, IndustrySector};
759
760 fn create_test_config() -> GeneratorConfig {
761 create_preset(
762 IndustrySector::Retail,
763 2,
764 3,
765 CoAComplexity::Small,
766 TransactionVolume::TenK,
767 )
768 }
769
770 #[test]
771 fn test_streaming_orchestrator_creation() {
772 let config = create_test_config();
773 let orchestrator = StreamingOrchestrator::from_generator_config(config);
774 let stats = orchestrator.stats();
775
776 assert!(stats.phases > 0);
777 assert!(stats.buffer_size > 0);
778 }
779
780 #[test]
781 fn test_streaming_generation() {
782 let mut config = create_test_config();
783 config.master_data.vendors.count = 5;
785 config.master_data.customers.count = 5;
786 config.master_data.employees.count = 5;
787 config.global.period_months = 1;
788
789 let streaming_config = StreamingOrchestratorConfig::new(config)
790 .with_phases(vec![
791 GenerationPhase::ChartOfAccounts,
792 GenerationPhase::MasterData,
793 ])
794 .with_stream_config(StreamConfig {
795 buffer_size: 100,
796 progress_interval: 10,
797 ..Default::default()
798 });
799
800 let orchestrator = StreamingOrchestrator::new(streaming_config);
801 let (receiver, _control) = orchestrator.stream().unwrap();
802
803 let mut items_count = 0;
804 let mut has_coa = false;
805 let mut has_completion = false;
806
807 for event in receiver {
808 match event {
809 StreamEvent::Data(item) => {
810 items_count += 1;
811 if matches!(item, GeneratedItem::ChartOfAccounts(_)) {
812 has_coa = true;
813 }
814 }
815 StreamEvent::Complete(_) => {
816 has_completion = true;
817 break;
818 }
819 _ => {}
820 }
821 }
822
823 assert!(items_count > 0);
824 assert!(has_coa);
825 assert!(has_completion);
826 }
827
828 #[test]
829 fn test_stream_cancellation() {
830 let mut config = create_test_config();
831 config.global.period_months = 12; let streaming_config = StreamingOrchestratorConfig::new(config)
834 .with_phases(vec![GenerationPhase::JournalEntries]);
835
836 let orchestrator = StreamingOrchestrator::new(streaming_config);
837 let (receiver, control) = orchestrator.stream().unwrap();
838
839 let mut items_count = 0;
841 for event in receiver {
842 if let StreamEvent::Data(_) = event {
843 items_count += 1;
844 if items_count >= 10 {
845 control.cancel();
846 break;
847 }
848 }
849 }
850
851 assert!(control.is_cancelled());
852 }
853
854 #[test]
855 fn test_generated_item_type_name() {
856 use datasynth_core::models::{CoAComplexity, IndustrySector};
857
858 let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
859 "TEST_COA".to_string(),
860 "Test Chart of Accounts".to_string(),
861 "US".to_string(),
862 IndustrySector::Manufacturing,
863 CoAComplexity::Small,
864 )));
865 assert_eq!(coa.type_name(), "chart_of_accounts");
866
867 let progress = GeneratedItem::Progress(StreamProgress::new("test"));
868 assert_eq!(progress.type_name(), "progress");
869 }
870}