Skip to main content

datasynth_runtime/
streaming_orchestrator.rs

1//! Streaming orchestrator for real-time data generation.
2//!
3//! This orchestrator provides streaming capabilities with backpressure,
4//! progress reporting, and control for real-time data generation.
5
6use std::sync::Arc;
7use std::thread;
8use std::time::Instant;
9
10use chrono::NaiveDate;
11use tracing::{debug, info, warn};
12
13use datasynth_config::schema::GeneratorConfig;
14use datasynth_core::error::SynthResult;
15use datasynth_core::models::{
16    documents::{
17        CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
18    },
19    ChartOfAccounts, Customer, Employee, JournalEntry, Material, Vendor,
20};
21use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
22use datasynth_core::traits::{
23    BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
24};
25
/// Generated items that can be streamed.
///
/// Every model payload is stored behind a `Box`. The last two variants carry
/// bookkeeping information (progress snapshots and phase markers) rather than
/// generated business data.
#[derive(Debug, Clone)]
pub enum GeneratedItem {
    /// Chart of Accounts.
    ChartOfAccounts(Box<ChartOfAccounts>),
    /// A vendor.
    Vendor(Box<Vendor>),
    /// A customer.
    Customer(Box<Customer>),
    /// A material.
    Material(Box<Material>),
    /// An employee.
    Employee(Box<Employee>),
    /// A journal entry.
    JournalEntry(Box<JournalEntry>),
    /// A purchase order (P2P).
    PurchaseOrder(Box<PurchaseOrder>),
    /// A goods receipt (P2P).
    GoodsReceipt(Box<GoodsReceipt>),
    /// A vendor invoice (P2P).
    VendorInvoice(Box<VendorInvoice>),
    /// A payment (P2P/O2C).
    Payment(Box<Payment>),
    /// A sales order (O2C).
    SalesOrder(Box<SalesOrder>),
    /// A delivery (O2C).
    Delivery(Box<Delivery>),
    /// A customer invoice (O2C).
    CustomerInvoice(Box<CustomerInvoice>),
    /// Progress update.
    ///
    /// NOTE(review): the generation code visible in this file reports progress
    /// through `StreamEvent::Progress` and never constructs this variant outside
    /// of tests — confirm whether other callers rely on it.
    Progress(StreamProgress),
    /// Phase completion marker; the payload is the name of the finished phase.
    PhaseComplete(String),
}
60
61impl GeneratedItem {
62    /// Returns the item type name.
63    pub fn type_name(&self) -> &'static str {
64        match self {
65            GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
66            GeneratedItem::Vendor(_) => "vendor",
67            GeneratedItem::Customer(_) => "customer",
68            GeneratedItem::Material(_) => "material",
69            GeneratedItem::Employee(_) => "employee",
70            GeneratedItem::JournalEntry(_) => "journal_entry",
71            GeneratedItem::PurchaseOrder(_) => "purchase_order",
72            GeneratedItem::GoodsReceipt(_) => "goods_receipt",
73            GeneratedItem::VendorInvoice(_) => "vendor_invoice",
74            GeneratedItem::Payment(_) => "payment",
75            GeneratedItem::SalesOrder(_) => "sales_order",
76            GeneratedItem::Delivery(_) => "delivery",
77            GeneratedItem::CustomerInvoice(_) => "customer_invoice",
78            GeneratedItem::Progress(_) => "progress",
79            GeneratedItem::PhaseComplete(_) => "phase_complete",
80        }
81    }
82}
83
/// A stage of the generation pipeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GenerationPhase {
    /// Chart of Accounts generation.
    ChartOfAccounts,
    /// Master data generation (vendors, customers, etc.).
    MasterData,
    /// Document flow generation (P2P, O2C).
    DocumentFlows,
    /// OCPM event log generation.
    OcpmEvents,
    /// Journal entry generation.
    JournalEntries,
    /// Anomaly injection.
    AnomalyInjection,
    /// Balance validation.
    BalanceValidation,
    /// Data quality injection.
    DataQuality,
    /// Complete.
    Complete,
}

impl GenerationPhase {
    /// Returns the fixed snake_case identifier of this phase.
    pub fn name(&self) -> &'static str {
        // The enum is `Copy`, so match on the value rather than the reference.
        match *self {
            Self::ChartOfAccounts => "chart_of_accounts",
            Self::MasterData => "master_data",
            Self::DocumentFlows => "document_flows",
            Self::OcpmEvents => "ocpm_events",
            Self::JournalEntries => "journal_entries",
            Self::AnomalyInjection => "anomaly_injection",
            Self::BalanceValidation => "balance_validation",
            Self::DataQuality => "data_quality",
            Self::Complete => "complete",
        }
    }
}
123
/// Configuration for streaming orchestration.
///
/// Bundles the generator settings, the stream channel settings, and the list
/// of phases the orchestrator walks through.
#[derive(Debug, Clone)]
pub struct StreamingOrchestratorConfig {
    /// Generator configuration handed to each phase.
    pub generator_config: GeneratorConfig,
    /// Stream configuration; the orchestrator reads its `buffer_size`,
    /// `backpressure`, and `progress_interval` fields.
    pub stream_config: StreamConfig,
    /// Phases to execute, processed in the order listed.
    pub phases: Vec<GenerationPhase>,
}
134
135impl StreamingOrchestratorConfig {
136    /// Creates a new streaming orchestrator configuration.
137    pub fn new(generator_config: GeneratorConfig) -> Self {
138        Self {
139            generator_config,
140            stream_config: StreamConfig::default(),
141            phases: vec![
142                GenerationPhase::ChartOfAccounts,
143                GenerationPhase::MasterData,
144                GenerationPhase::DocumentFlows,
145                GenerationPhase::JournalEntries,
146            ],
147        }
148    }
149
150    /// Creates a configuration with all phases enabled including OCPM.
151    pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
152        Self {
153            generator_config,
154            stream_config: StreamConfig::default(),
155            phases: vec![
156                GenerationPhase::ChartOfAccounts,
157                GenerationPhase::MasterData,
158                GenerationPhase::DocumentFlows,
159                GenerationPhase::OcpmEvents,
160                GenerationPhase::JournalEntries,
161                GenerationPhase::AnomalyInjection,
162                GenerationPhase::DataQuality,
163            ],
164        }
165    }
166
167    /// Sets the stream configuration.
168    pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
169        self.stream_config = config;
170        self
171    }
172
173    /// Sets the phases to execute.
174    pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
175        self.phases = phases;
176        self
177    }
178}
179
/// Streaming orchestrator for real-time data generation.
///
/// Holds only its configuration; each call to `stream` clones that
/// configuration and hands it to a freshly spawned generation thread.
pub struct StreamingOrchestrator {
    /// Settings cloned into the generation thread on every `stream` call.
    config: StreamingOrchestratorConfig,
}
184
185impl StreamingOrchestrator {
186    /// Creates a new streaming orchestrator.
187    pub fn new(config: StreamingOrchestratorConfig) -> Self {
188        Self { config }
189    }
190
191    /// Creates a streaming orchestrator from generator config with defaults.
192    pub fn from_generator_config(config: GeneratorConfig) -> Self {
193        Self::new(StreamingOrchestratorConfig::new(config))
194    }
195
196    /// Starts streaming generation.
197    ///
198    /// Returns a receiver for stream events and a control handle.
199    pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
200        let (sender, receiver) = stream_channel(
201            self.config.stream_config.buffer_size,
202            self.config.stream_config.backpressure,
203        );
204
205        let control = Arc::new(StreamControl::new());
206        let control_clone = Arc::clone(&control);
207
208        let config = self.config.clone();
209
210        // Spawn generation thread
211        thread::spawn(move || {
212            let result = Self::run_generation(config, sender, control_clone);
213            if let Err(e) = result {
214                warn!("Streaming generation error: {}", e);
215            }
216        });
217
218        Ok((receiver, control))
219    }
220
221    /// Runs the generation process.
222    fn run_generation(
223        config: StreamingOrchestratorConfig,
224        sender: StreamSender<GeneratedItem>,
225        control: Arc<StreamControl>,
226    ) -> SynthResult<()> {
227        let start_time = Instant::now();
228        let mut items_generated: u64 = 0;
229        let mut phases_completed = Vec::new();
230
231        // Track stats
232        let progress_interval = config.stream_config.progress_interval;
233
234        // Send initial progress
235        let mut progress = StreamProgress::new("initializing");
236        sender.send(StreamEvent::Progress(progress.clone()))?;
237
238        for phase in &config.phases {
239            if control.is_cancelled() {
240                info!("Generation cancelled");
241                break;
242            }
243
244            // Handle pause
245            while control.is_paused() {
246                std::thread::sleep(std::time::Duration::from_millis(100));
247                if control.is_cancelled() {
248                    break;
249                }
250            }
251
252            progress.phase = phase.name().to_string();
253            sender.send(StreamEvent::Progress(progress.clone()))?;
254
255            match phase {
256                GenerationPhase::ChartOfAccounts => {
257                    let result =
258                        Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
259                    items_generated += result;
260                }
261                GenerationPhase::MasterData => {
262                    let result = Self::generate_master_data_phase(
263                        &config.generator_config,
264                        &sender,
265                        &control,
266                    )?;
267                    items_generated += result;
268                }
269                GenerationPhase::DocumentFlows => {
270                    let result = Self::generate_document_flows_phase(
271                        &config.generator_config,
272                        &sender,
273                        &control,
274                        progress_interval,
275                        &mut progress,
276                    )?;
277                    items_generated += result;
278                }
279                GenerationPhase::OcpmEvents => {
280                    // OCPM event generation is optional and requires document flows
281                    debug!("OCPM events phase - skipping (documents should be generated via P2P/O2C generators)");
282                }
283                GenerationPhase::JournalEntries => {
284                    let result = Self::generate_journal_entries_phase(
285                        &config.generator_config,
286                        &sender,
287                        &control,
288                        progress_interval,
289                        &mut progress,
290                    )?;
291                    items_generated += result;
292                }
293                GenerationPhase::AnomalyInjection | GenerationPhase::DataQuality => {
294                    // These phases modify existing data; not directly generating new items
295                    debug!(
296                        "Phase {:?} operates on existing data (not streaming new items)",
297                        phase
298                    );
299                }
300                GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
301                    // Validation and completion phases don't generate items
302                    debug!("Phase {:?} is a validation/completion phase", phase);
303                }
304            }
305
306            // Send phase completion
307            sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
308                phase.name().to_string(),
309            )))?;
310            phases_completed.push(phase.name().to_string());
311
312            // Update progress
313            progress.items_generated = items_generated;
314            progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
315            if progress.elapsed_ms > 0 {
316                progress.items_per_second =
317                    (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
318            }
319            sender.send(StreamEvent::Progress(progress.clone()))?;
320        }
321
322        // Send completion
323        let stats = sender.stats();
324        let summary = StreamSummary {
325            total_items: items_generated,
326            total_time_ms: start_time.elapsed().as_millis() as u64,
327            avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
328                (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
329            } else {
330                0.0
331            },
332            error_count: 0,
333            dropped_count: stats.items_dropped,
334            peak_memory_mb: None,
335            phases_completed,
336        };
337
338        sender.send(StreamEvent::Complete(summary))?;
339        sender.close();
340
341        Ok(())
342    }
343
344    /// Generates Chart of Accounts phase.
345    fn generate_coa_phase(
346        config: &GeneratorConfig,
347        sender: &StreamSender<GeneratedItem>,
348        control: &Arc<StreamControl>,
349    ) -> SynthResult<u64> {
350        use datasynth_generators::ChartOfAccountsGenerator;
351
352        if control.is_cancelled() {
353            return Ok(0);
354        }
355
356        info!("Generating Chart of Accounts");
357        let seed = config.global.seed.unwrap_or(42);
358        let complexity = config.chart_of_accounts.complexity;
359        let industry = config.global.industry;
360        let coa_framework = resolve_coa_framework_from_config(config);
361
362        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
363            .with_coa_framework(coa_framework);
364        let coa = coa_gen.generate();
365
366        let account_count = coa.account_count() as u64;
367        sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
368            coa,
369        ))))?;
370
371        Ok(account_count)
372    }
373
374    /// Generates master data phase.
375    fn generate_master_data_phase(
376        config: &GeneratorConfig,
377        sender: &StreamSender<GeneratedItem>,
378        control: &Arc<StreamControl>,
379    ) -> SynthResult<u64> {
380        use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
381
382        let mut count: u64 = 0;
383        let seed = config.global.seed.unwrap_or(42);
384        let md_config = &config.master_data;
385        let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
386            .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
387
388        let company_code = config
389            .companies
390            .first()
391            .map(|c| c.code.as_str())
392            .unwrap_or("1000");
393
394        // Generate vendors
395        if control.is_cancelled() {
396            return Ok(count);
397        }
398
399        info!("Generating vendors");
400        let mut vendor_gen = VendorGenerator::new(seed);
401        for _ in 0..md_config.vendors.count {
402            if control.is_cancelled() {
403                break;
404            }
405            let vendor = vendor_gen.generate_vendor(company_code, effective_date);
406            sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
407            count += 1;
408        }
409
410        // Generate customers
411        if control.is_cancelled() {
412            return Ok(count);
413        }
414
415        info!("Generating customers");
416        let mut customer_gen = CustomerGenerator::new(seed + 1);
417        for _ in 0..md_config.customers.count {
418            if control.is_cancelled() {
419                break;
420            }
421            let customer = customer_gen.generate_customer(company_code, effective_date);
422            sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
423                customer,
424            ))))?;
425            count += 1;
426        }
427
428        // Generate employees
429        if control.is_cancelled() {
430            return Ok(count);
431        }
432
433        info!("Generating employees");
434        let mut employee_gen = EmployeeGenerator::new(seed + 4);
435        // Use first department from config
436        let dept = datasynth_generators::DepartmentDefinition {
437            code: "1000".to_string(),
438            name: "General".to_string(),
439            cost_center: "CC1000".to_string(),
440            headcount: 10,
441            system_roles: vec![],
442            transaction_codes: vec![],
443        };
444        for _ in 0..md_config.employees.count {
445            if control.is_cancelled() {
446                break;
447            }
448            let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
449            sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
450                employee,
451            ))))?;
452            count += 1;
453        }
454
455        Ok(count)
456    }
457
458    /// Generates journal entries phase.
459    ///
460    /// Note: This is a simplified version that generates basic journal entries.
461    /// For full-featured generation with all options, use EnhancedOrchestrator.
462    fn generate_journal_entries_phase(
463        config: &GeneratorConfig,
464        sender: &StreamSender<GeneratedItem>,
465        control: &Arc<StreamControl>,
466        progress_interval: u64,
467        progress: &mut StreamProgress,
468    ) -> SynthResult<u64> {
469        use datasynth_generators::{ChartOfAccountsGenerator, JournalEntryGenerator};
470        use std::sync::Arc;
471
472        let mut count: u64 = 0;
473        let seed = config.global.seed.unwrap_or(42);
474
475        // Calculate total entries to generate based on volume weights
476        let default_monthly = 500;
477        let total_entries: usize = config
478            .companies
479            .iter()
480            .map(|c| {
481                let monthly = (c.volume_weight * default_monthly as f64) as usize;
482                monthly.max(100) * config.global.period_months as usize
483            })
484            .sum();
485
486        progress.items_remaining = Some(total_entries as u64);
487        info!("Generating {} journal entries", total_entries);
488
489        // Generate a shared CoA for all companies
490        let complexity = config.chart_of_accounts.complexity;
491        let industry = config.global.industry;
492        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
493        let coa = Arc::new(coa_gen.generate());
494
495        // Parse start date
496        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
497            .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
498        let end_date =
499            start_date + chrono::Duration::days((config.global.period_months as i64) * 30);
500
501        // Create JE generator from config
502        let mut je_gen = JournalEntryGenerator::from_generator_config(
503            config,
504            Arc::clone(&coa),
505            start_date,
506            end_date,
507            seed,
508        );
509
510        for _ in 0..total_entries {
511            if control.is_cancelled() {
512                break;
513            }
514
515            // Handle pause
516            while control.is_paused() {
517                std::thread::sleep(std::time::Duration::from_millis(100));
518                if control.is_cancelled() {
519                    break;
520                }
521            }
522
523            let je = je_gen.generate();
524            sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
525            count += 1;
526
527            // Send progress updates
528            if count.is_multiple_of(progress_interval) {
529                progress.items_generated = count;
530                progress.items_remaining = Some(total_entries as u64 - count);
531                sender.send(StreamEvent::Progress(progress.clone()))?;
532            }
533        }
534
535        Ok(count)
536    }
537
538    /// Generates document flows phase (P2P and O2C).
539    ///
540    /// Creates complete document chains:
541    /// - P2P: PurchaseOrder → GoodsReceipt → VendorInvoice → Payment
542    /// - O2C: SalesOrder → Delivery → CustomerInvoice
543    fn generate_document_flows_phase(
544        config: &GeneratorConfig,
545        sender: &StreamSender<GeneratedItem>,
546        control: &Arc<StreamControl>,
547        progress_interval: u64,
548        progress: &mut StreamProgress,
549    ) -> SynthResult<u64> {
550        use chrono::Datelike;
551        use datasynth_generators::{
552            CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
553        };
554
555        let mut count: u64 = 0;
556        let seed = config.global.seed.unwrap_or(42);
557        let df_config = &config.document_flows;
558        let md_config = &config.master_data;
559
560        // Parse dates
561        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
562            .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
563
564        let company_code = config
565            .companies
566            .first()
567            .map(|c| c.code.as_str())
568            .unwrap_or("1000");
569
570        // Use master data config counts for generating reference data
571        let vendor_count = md_config.vendors.count.min(100);
572        let customer_count = md_config.customers.count.min(100);
573        let material_count = md_config.materials.count.min(50);
574
575        // Generate some master data for document flows
576        let mut vendor_gen = VendorGenerator::new(seed);
577        let mut customer_gen = CustomerGenerator::new(seed + 1);
578        let mut material_gen = MaterialGenerator::new(seed + 2);
579
580        let vendors: Vec<_> = (0..vendor_count)
581            .map(|_| vendor_gen.generate_vendor(company_code, start_date))
582            .collect();
583
584        let customers: Vec<_> = (0..customer_count)
585            .map(|_| customer_gen.generate_customer(company_code, start_date))
586            .collect();
587
588        let materials: Vec<_> = (0..material_count)
589            .map(|_| material_gen.generate_material(company_code, start_date))
590            .collect();
591
592        // Determine number of chains based on transaction volume
593        // Use period months as a multiplier for document chains
594        let base_chains = (config.global.period_months as usize * 50).max(100);
595
596        // P2P Generation
597        if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
598            info!("Generating P2P document flows");
599            let mut p2p_gen = P2PGenerator::new(seed + 100);
600
601            let chains_to_generate = base_chains.min(1000);
602            progress.items_remaining = Some(chains_to_generate as u64);
603
604            for i in 0..chains_to_generate {
605                if control.is_cancelled() {
606                    break;
607                }
608
609                // Handle pause
610                while control.is_paused() {
611                    std::thread::sleep(std::time::Duration::from_millis(100));
612                    if control.is_cancelled() {
613                        break;
614                    }
615                }
616
617                let vendor = &vendors[i % vendors.len()];
618                let material_refs: Vec<&datasynth_core::models::Material> =
619                    vec![&materials[i % materials.len()]];
620
621                // Calculate posting date within the period
622                let days_offset = (i as i64 % (config.global.period_months as i64 * 30)).max(0);
623                let po_date = start_date + chrono::Duration::days(days_offset);
624                let fiscal_year = po_date.year() as u16;
625                let fiscal_period = po_date.month() as u8;
626
627                let chain = p2p_gen.generate_chain(
628                    company_code,
629                    vendor,
630                    &material_refs,
631                    po_date,
632                    fiscal_year,
633                    fiscal_period,
634                    "SYSTEM",
635                );
636
637                // Send each document in the chain
638                sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
639                    chain.purchase_order,
640                ))))?;
641                count += 1;
642
643                for gr in chain.goods_receipts {
644                    sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
645                    count += 1;
646                }
647
648                if let Some(vi) = chain.vendor_invoice {
649                    sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
650                        vi,
651                    ))))?;
652                    count += 1;
653                }
654
655                if let Some(payment) = chain.payment {
656                    sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
657                    count += 1;
658                }
659
660                if count.is_multiple_of(progress_interval) {
661                    progress.items_generated = count;
662                    sender.send(StreamEvent::Progress(progress.clone()))?;
663                }
664            }
665        }
666
667        // O2C Generation
668        if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
669            info!("Generating O2C document flows");
670            let mut o2c_gen = O2CGenerator::new(seed + 200);
671
672            let chains_to_generate = base_chains.min(1000);
673
674            for i in 0..chains_to_generate {
675                if control.is_cancelled() {
676                    break;
677                }
678
679                while control.is_paused() {
680                    std::thread::sleep(std::time::Duration::from_millis(100));
681                    if control.is_cancelled() {
682                        break;
683                    }
684                }
685
686                let customer = &customers[i % customers.len()];
687                let material_refs: Vec<&datasynth_core::models::Material> =
688                    vec![&materials[i % materials.len()]];
689
690                let days_offset = (i as i64 % (config.global.period_months as i64 * 30)).max(0);
691                let so_date = start_date + chrono::Duration::days(days_offset);
692                let fiscal_year = so_date.year() as u16;
693                let fiscal_period = so_date.month() as u8;
694
695                let chain = o2c_gen.generate_chain(
696                    company_code,
697                    customer,
698                    &material_refs,
699                    so_date,
700                    fiscal_year,
701                    fiscal_period,
702                    "SYSTEM",
703                );
704
705                sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
706                    chain.sales_order,
707                ))))?;
708                count += 1;
709
710                for delivery in chain.deliveries {
711                    sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
712                        delivery,
713                    ))))?;
714                    count += 1;
715                }
716
717                if let Some(ci) = chain.customer_invoice {
718                    sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
719                        ci,
720                    ))))?;
721                    count += 1;
722                }
723
724                if count.is_multiple_of(progress_interval) {
725                    progress.items_generated = count;
726                    sender.send(StreamEvent::Progress(progress.clone()))?;
727                }
728            }
729        }
730
731        Ok(count)
732    }
733
734    /// Returns the orchestrator configuration stats.
735    pub fn stats(&self) -> StreamingOrchestratorStats {
736        StreamingOrchestratorStats {
737            phases: self.config.phases.len(),
738            buffer_size: self.config.stream_config.buffer_size,
739            backpressure: self.config.stream_config.backpressure,
740        }
741    }
742}
743
/// Statistics for the streaming orchestrator.
///
/// A snapshot of configuration values as returned by
/// `StreamingOrchestrator::stats`; it holds no runtime counters.
#[derive(Debug, Clone)]
pub struct StreamingOrchestratorStats {
    /// Number of phases configured.
    pub phases: usize,
    /// Buffer size, copied from the stream configuration.
    pub buffer_size: usize,
    /// Backpressure strategy, copied from the stream configuration.
    pub backpressure: BackpressureStrategy,
}
754
755/// Resolve CoA framework from a GeneratorConfig.
756fn resolve_coa_framework_from_config(
757    config: &GeneratorConfig,
758) -> datasynth_generators::coa_generator::CoAFramework {
759    use datasynth_generators::coa_generator::CoAFramework;
760    if config.accounting_standards.enabled {
761        match config.accounting_standards.framework {
762            Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
763                return CoAFramework::FrenchPcg;
764            }
765            Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
766                return CoAFramework::GermanSkr04;
767            }
768            _ => {}
769        }
770    }
771    CoAFramework::UsGaap
772}
773
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    /// Builds the retail preset shared by the tests below.
    fn create_test_config() -> GeneratorConfig {
        let industry = IndustrySector::Retail;
        let complexity = CoAComplexity::Small;
        let volume = TransactionVolume::TenK;
        create_preset(industry, 2, 3, complexity, volume)
    }

    /// A default orchestrator reports at least one phase and a non-empty buffer.
    #[test]
    fn test_streaming_orchestrator_creation() {
        let stats = StreamingOrchestrator::from_generator_config(create_test_config()).stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    /// Streaming the first two phases yields data, including the chart of
    /// accounts, and finishes with a completion event.
    #[test]
    fn test_streaming_generation() {
        let mut config = create_test_config();
        // Reduce volume for testing
        config.master_data.vendors.count = 5;
        config.master_data.customers.count = 5;
        config.master_data.employees.count = 5;
        config.global.period_months = 1;

        let stream_settings = StreamConfig {
            buffer_size: 100,
            progress_interval: 10,
            ..Default::default()
        };
        let phases = vec![
            GenerationPhase::ChartOfAccounts,
            GenerationPhase::MasterData,
        ];
        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(phases)
            .with_stream_config(stream_settings);

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut data_events = 0;
        let mut saw_coa = false;
        let mut saw_completion = false;

        for event in receiver {
            if let StreamEvent::Data(item) = &event {
                data_events += 1;
                if let GeneratedItem::ChartOfAccounts(_) = item {
                    saw_coa = true;
                }
            } else if matches!(event, StreamEvent::Complete(_)) {
                saw_completion = true;
                break;
            }
        }

        assert!(data_events > 0);
        assert!(saw_coa);
        assert!(saw_completion);
    }

    /// Cancelling after ten data events leaves the control handle cancelled.
    #[test]
    fn test_stream_cancellation() {
        let mut config = create_test_config();
        config.global.period_months = 12; // Longer generation

        let phases = vec![GenerationPhase::JournalEntries];
        let streaming_config = StreamingOrchestratorConfig::new(config).with_phases(phases);

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, control) = orchestrator.stream().unwrap();

        // Cancel after receiving some items
        let mut received = 0;
        for event in receiver {
            if !matches!(event, StreamEvent::Data(_)) {
                continue;
            }
            received += 1;
            if received >= 10 {
                control.cancel();
                break;
            }
        }

        assert!(control.is_cancelled());
    }

    /// `type_name` returns the expected label for a model variant and for the
    /// progress variant.
    #[test]
    fn test_generated_item_type_name() {
        let chart = ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        );
        let coa_item = GeneratedItem::ChartOfAccounts(Box::new(chart));
        assert_eq!(coa_item.type_name(), "chart_of_accounts");

        let progress_item = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress_item.type_name(), "progress");
    }
}