Skip to main content

datasynth_runtime/
streaming_orchestrator.rs

1//! Streaming orchestrator for real-time data generation.
2//!
3//! This orchestrator provides streaming capabilities with backpressure,
4//! progress reporting, and control for real-time data generation.
5
6use std::sync::Arc;
7use std::thread;
8use std::time::Instant;
9
10use chrono::NaiveDate;
11use tracing::{info, warn};
12
13use datasynth_config::schema::GeneratorConfig;
14
/// Fallback RNG seed, used by the generation phases whenever
/// `config.global.seed` is `None`.
const DEFAULT_SEED: u64 = 42;
17use datasynth_core::error::SynthResult;
18use datasynth_core::models::{
19    documents::{
20        CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
21    },
22    AnomalyRateConfig, ChartOfAccounts, Customer, Employee, JournalEntry, LabeledAnomaly, Material,
23    Vendor,
24};
25use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
26use datasynth_core::traits::{
27    BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
28};
29
30/// Generated items that can be streamed.
31#[derive(Debug, Clone)]
32pub enum GeneratedItem {
33    /// Chart of Accounts.
34    ChartOfAccounts(Box<ChartOfAccounts>),
35    /// A vendor.
36    Vendor(Box<Vendor>),
37    /// A customer.
38    Customer(Box<Customer>),
39    /// A material.
40    Material(Box<Material>),
41    /// An employee.
42    Employee(Box<Employee>),
43    /// A journal entry.
44    JournalEntry(Box<JournalEntry>),
45    /// A purchase order (P2P).
46    PurchaseOrder(Box<PurchaseOrder>),
47    /// A goods receipt (P2P).
48    GoodsReceipt(Box<GoodsReceipt>),
49    /// A vendor invoice (P2P).
50    VendorInvoice(Box<VendorInvoice>),
51    /// A payment (P2P/O2C).
52    Payment(Box<Payment>),
53    /// A sales order (O2C).
54    SalesOrder(Box<SalesOrder>),
55    /// A delivery (O2C).
56    Delivery(Box<Delivery>),
57    /// A customer invoice (O2C).
58    CustomerInvoice(Box<CustomerInvoice>),
59    /// An anomaly label (injected during JE generation).
60    AnomalyLabel(Box<LabeledAnomaly>),
61    /// Progress update.
62    Progress(StreamProgress),
63    /// Phase completion marker.
64    PhaseComplete(String),
65}
66
67impl GeneratedItem {
68    /// Returns the item type name.
69    pub fn type_name(&self) -> &'static str {
70        match self {
71            GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
72            GeneratedItem::Vendor(_) => "vendor",
73            GeneratedItem::Customer(_) => "customer",
74            GeneratedItem::Material(_) => "material",
75            GeneratedItem::Employee(_) => "employee",
76            GeneratedItem::JournalEntry(_) => "journal_entry",
77            GeneratedItem::PurchaseOrder(_) => "purchase_order",
78            GeneratedItem::GoodsReceipt(_) => "goods_receipt",
79            GeneratedItem::VendorInvoice(_) => "vendor_invoice",
80            GeneratedItem::Payment(_) => "payment",
81            GeneratedItem::SalesOrder(_) => "sales_order",
82            GeneratedItem::Delivery(_) => "delivery",
83            GeneratedItem::CustomerInvoice(_) => "customer_invoice",
84            GeneratedItem::AnomalyLabel(_) => "anomaly_label",
85            GeneratedItem::Progress(_) => "progress",
86            GeneratedItem::PhaseComplete(_) => "phase_complete",
87        }
88    }
89}
90
/// A stage of the generation pipeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GenerationPhase {
    /// Generates the Chart of Accounts.
    ChartOfAccounts,
    /// Generates master data (vendors, customers, etc.).
    MasterData,
    /// Generates document flows (P2P, O2C).
    DocumentFlows,
    /// Generates the OCPM event log.
    OcpmEvents,
    /// Generates journal entries.
    JournalEntries,
    /// Injects anomalies.
    AnomalyInjection,
    /// Validates balances.
    BalanceValidation,
    /// Injects data quality issues.
    DataQuality,
    /// Generation is complete.
    Complete,
}
113
114impl GenerationPhase {
115    /// Returns the phase name.
116    pub fn name(&self) -> &'static str {
117        match self {
118            GenerationPhase::ChartOfAccounts => "chart_of_accounts",
119            GenerationPhase::MasterData => "master_data",
120            GenerationPhase::DocumentFlows => "document_flows",
121            GenerationPhase::OcpmEvents => "ocpm_events",
122            GenerationPhase::JournalEntries => "journal_entries",
123            GenerationPhase::AnomalyInjection => "anomaly_injection",
124            GenerationPhase::BalanceValidation => "balance_validation",
125            GenerationPhase::DataQuality => "data_quality",
126            GenerationPhase::Complete => "complete",
127        }
128    }
129}
130
131/// Configuration for streaming orchestration.
132#[derive(Debug, Clone)]
133pub struct StreamingOrchestratorConfig {
134    /// Generator configuration.
135    pub generator_config: GeneratorConfig,
136    /// Stream configuration.
137    pub stream_config: StreamConfig,
138    /// Phases to execute.
139    pub phases: Vec<GenerationPhase>,
140}
141
142impl StreamingOrchestratorConfig {
143    /// Creates a new streaming orchestrator configuration.
144    pub fn new(generator_config: GeneratorConfig) -> Self {
145        Self {
146            generator_config,
147            stream_config: StreamConfig::default(),
148            phases: vec![
149                GenerationPhase::ChartOfAccounts,
150                GenerationPhase::MasterData,
151                GenerationPhase::DocumentFlows,
152                GenerationPhase::JournalEntries,
153            ],
154        }
155    }
156
157    /// Creates a configuration with all phases enabled including OCPM.
158    pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
159        Self {
160            generator_config,
161            stream_config: StreamConfig::default(),
162            phases: vec![
163                GenerationPhase::ChartOfAccounts,
164                GenerationPhase::MasterData,
165                GenerationPhase::DocumentFlows,
166                GenerationPhase::OcpmEvents,
167                GenerationPhase::JournalEntries,
168                GenerationPhase::AnomalyInjection,
169                GenerationPhase::DataQuality,
170            ],
171        }
172    }
173
174    /// Sets the stream configuration.
175    pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
176        self.stream_config = config;
177        self
178    }
179
180    /// Sets the phases to execute.
181    pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
182        self.phases = phases;
183        self
184    }
185}
186
187/// Streaming orchestrator for real-time data generation.
188pub struct StreamingOrchestrator {
189    config: StreamingOrchestratorConfig,
190}
191
192impl StreamingOrchestrator {
193    /// Creates a new streaming orchestrator.
194    pub fn new(config: StreamingOrchestratorConfig) -> Self {
195        Self { config }
196    }
197
198    /// Creates a streaming orchestrator from generator config with defaults.
199    pub fn from_generator_config(config: GeneratorConfig) -> Self {
200        Self::new(StreamingOrchestratorConfig::new(config))
201    }
202
203    /// Starts streaming generation.
204    ///
205    /// Returns a receiver for stream events and a control handle.
206    pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
207        let (sender, receiver) = stream_channel(
208            self.config.stream_config.buffer_size,
209            self.config.stream_config.backpressure,
210        );
211
212        let control = Arc::new(StreamControl::new());
213        let control_clone = Arc::clone(&control);
214
215        let config = self.config.clone();
216
217        // Spawn generation thread
218        thread::spawn(move || {
219            let result = Self::run_generation(config, sender, control_clone);
220            if let Err(e) = result {
221                warn!("Streaming generation error: {}", e);
222            }
223        });
224
225        Ok((receiver, control))
226    }
227
228    /// Runs the generation process.
229    fn run_generation(
230        config: StreamingOrchestratorConfig,
231        sender: StreamSender<GeneratedItem>,
232        control: Arc<StreamControl>,
233    ) -> SynthResult<()> {
234        let start_time = Instant::now();
235        let mut items_generated: u64 = 0;
236        let mut phases_completed = Vec::new();
237
238        // Track stats
239        let progress_interval = config.stream_config.progress_interval;
240
241        // Send initial progress
242        let mut progress = StreamProgress::new("initializing");
243        sender.send(StreamEvent::Progress(progress.clone()))?;
244
245        for phase in &config.phases {
246            if control.is_cancelled() {
247                info!("Generation cancelled");
248                break;
249            }
250
251            // Handle pause
252            while control.is_paused() {
253                std::thread::sleep(std::time::Duration::from_millis(100));
254                if control.is_cancelled() {
255                    break;
256                }
257            }
258
259            progress.phase = phase.name().to_string();
260            sender.send(StreamEvent::Progress(progress.clone()))?;
261
262            match phase {
263                GenerationPhase::ChartOfAccounts => {
264                    let result =
265                        Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
266                    items_generated += result;
267                }
268                GenerationPhase::MasterData => {
269                    let result = Self::generate_master_data_phase(
270                        &config.generator_config,
271                        &sender,
272                        &control,
273                    )?;
274                    items_generated += result;
275                }
276                GenerationPhase::DocumentFlows => {
277                    let result = Self::generate_document_flows_phase(
278                        &config.generator_config,
279                        &sender,
280                        &control,
281                        progress_interval,
282                        &mut progress,
283                    )?;
284                    items_generated += result;
285                }
286                GenerationPhase::OcpmEvents => {
287                    warn!("OCPM event generation is not yet supported in streaming mode; skipping");
288                }
289                GenerationPhase::JournalEntries => {
290                    let result = Self::generate_journal_entries_phase(
291                        &config.generator_config,
292                        &sender,
293                        &control,
294                        progress_interval,
295                        &mut progress,
296                    )?;
297                    items_generated += result;
298                }
299                GenerationPhase::AnomalyInjection => {
300                    info!("Anomaly injection applied inline during JE generation phase in streaming mode");
301                }
302                GenerationPhase::DataQuality => {
303                    info!(
304                        "Data quality injection is not yet supported in streaming mode; skipping"
305                    );
306                }
307                GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
308                    info!("Phase {:?} is not applicable in streaming mode", phase);
309                }
310            }
311
312            // Send phase completion
313            sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
314                phase.name().to_string(),
315            )))?;
316            phases_completed.push(phase.name().to_string());
317
318            // Update progress
319            progress.items_generated = items_generated;
320            progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
321            if progress.elapsed_ms > 0 {
322                progress.items_per_second =
323                    (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
324            }
325            sender.send(StreamEvent::Progress(progress.clone()))?;
326        }
327
328        // Send completion
329        let stats = sender.stats();
330        let summary = StreamSummary {
331            total_items: items_generated,
332            total_time_ms: start_time.elapsed().as_millis() as u64,
333            avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
334                (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
335            } else {
336                0.0
337            },
338            error_count: 0,
339            dropped_count: stats.items_dropped,
340            peak_memory_mb: None,
341            phases_completed,
342        };
343
344        sender.send(StreamEvent::Complete(summary))?;
345        sender.close();
346
347        Ok(())
348    }
349
350    /// Generates Chart of Accounts phase.
351    fn generate_coa_phase(
352        config: &GeneratorConfig,
353        sender: &StreamSender<GeneratedItem>,
354        control: &Arc<StreamControl>,
355    ) -> SynthResult<u64> {
356        use datasynth_generators::ChartOfAccountsGenerator;
357
358        if control.is_cancelled() {
359            return Ok(0);
360        }
361
362        info!("Generating Chart of Accounts");
363        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
364        let complexity = config.chart_of_accounts.complexity;
365        let industry = config.global.industry;
366        let coa_framework = resolve_coa_framework_from_config(config);
367
368        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
369            .with_coa_framework(coa_framework);
370        let coa = coa_gen.generate();
371
372        let account_count = coa.account_count() as u64;
373        sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
374            coa,
375        ))))?;
376
377        Ok(account_count)
378    }
379
380    /// Generates master data phase.
381    fn generate_master_data_phase(
382        config: &GeneratorConfig,
383        sender: &StreamSender<GeneratedItem>,
384        control: &Arc<StreamControl>,
385    ) -> SynthResult<u64> {
386        use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
387
388        let mut count: u64 = 0;
389        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
390        let md_config = &config.master_data;
391        let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
392            .unwrap_or_else(|e| {
393                tracing::warn!(
394                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
395                    config.global.start_date,
396                    e
397                );
398                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
399            });
400
401        let company_code = config
402            .companies
403            .first()
404            .map(|c| c.code.as_str())
405            .unwrap_or_else(|| {
406                tracing::warn!("No companies configured, defaulting to company code '1000'");
407                "1000"
408            });
409
410        // Generate vendors
411        if control.is_cancelled() {
412            return Ok(count);
413        }
414
415        info!("Generating vendors");
416        let mut vendor_gen = VendorGenerator::new(seed);
417        for _ in 0..md_config.vendors.count {
418            if control.is_cancelled() {
419                break;
420            }
421            let vendor = vendor_gen.generate_vendor(company_code, effective_date);
422            sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
423            count += 1;
424        }
425
426        // Generate customers
427        if control.is_cancelled() {
428            return Ok(count);
429        }
430
431        info!("Generating customers");
432        let mut customer_gen = CustomerGenerator::new(seed + 1);
433        for _ in 0..md_config.customers.count {
434            if control.is_cancelled() {
435                break;
436            }
437            let customer = customer_gen.generate_customer(company_code, effective_date);
438            sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
439                customer,
440            ))))?;
441            count += 1;
442        }
443
444        // Generate employees
445        if control.is_cancelled() {
446            return Ok(count);
447        }
448
449        info!("Generating employees");
450        let mut employee_gen = EmployeeGenerator::new(seed + 4);
451        // Use first department from config, falling back to a default
452        let dept = if let Some(first_custom) = config.departments.custom_departments.first() {
453            datasynth_generators::DepartmentDefinition {
454                code: first_custom.code.clone(),
455                name: first_custom.name.clone(),
456                cost_center: first_custom
457                    .cost_center
458                    .clone()
459                    .unwrap_or_else(|| format!("CC{}", first_custom.code)),
460                headcount: 10,
461                system_roles: vec![],
462                transaction_codes: vec![],
463            }
464        } else {
465            warn!("No departments configured, using default 'General' department");
466            datasynth_generators::DepartmentDefinition {
467                code: "1000".to_string(),
468                name: "General".to_string(),
469                cost_center: "CC1000".to_string(),
470                headcount: 10,
471                system_roles: vec![],
472                transaction_codes: vec![],
473            }
474        };
475        for _ in 0..md_config.employees.count {
476            if control.is_cancelled() {
477                break;
478            }
479            let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
480            sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
481                employee,
482            ))))?;
483            count += 1;
484        }
485
486        Ok(count)
487    }
488
489    /// Generates journal entries phase.
490    ///
491    /// Note: This is a simplified version that generates basic journal entries.
492    /// For full-featured generation with all options, use EnhancedOrchestrator.
493    ///
494    /// When anomaly injection is enabled in config, anomalies are applied inline
495    /// to each batch of generated JEs before streaming them out.
496    fn generate_journal_entries_phase(
497        config: &GeneratorConfig,
498        sender: &StreamSender<GeneratedItem>,
499        control: &Arc<StreamControl>,
500        progress_interval: u64,
501        progress: &mut StreamProgress,
502    ) -> SynthResult<u64> {
503        use datasynth_generators::{
504            AnomalyInjector, AnomalyInjectorConfig, ChartOfAccountsGenerator, JournalEntryGenerator,
505        };
506        use std::sync::Arc;
507
508        let mut count: u64 = 0;
509        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
510
511        // Calculate total entries to generate based on volume weights
512        let default_monthly = 500;
513        let total_entries: usize = config
514            .companies
515            .iter()
516            .map(|c| {
517                let monthly = (c.volume_weight * default_monthly as f64) as usize;
518                monthly.max(100) * config.global.period_months as usize
519            })
520            .sum();
521
522        progress.items_remaining = Some(total_entries as u64);
523        info!("Generating {} journal entries", total_entries);
524
525        // Generate a shared CoA for all companies
526        let complexity = config.chart_of_accounts.complexity;
527        let industry = config.global.industry;
528        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
529        let coa = Arc::new(coa_gen.generate());
530
531        // Parse start date
532        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
533            .unwrap_or_else(|e| {
534                tracing::warn!(
535                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
536                    config.global.start_date,
537                    e
538                );
539                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
540            });
541        let end_date = start_date
542            .checked_add_months(chrono::Months::new(config.global.period_months))
543            .unwrap_or(start_date + chrono::Duration::days(365));
544
545        // Create JE generator from config
546        let mut je_gen = JournalEntryGenerator::from_generator_config(
547            config,
548            Arc::clone(&coa),
549            start_date,
550            end_date,
551            seed,
552        );
553
554        // Create anomaly injector if enabled.
555        // Priority: anomaly_injection config > fraud config
556        let anomaly_enabled = config.anomaly_injection.enabled || config.fraud.enabled;
557        let mut anomaly_injector = if anomaly_enabled {
558            let total_rate = if config.anomaly_injection.enabled {
559                config.anomaly_injection.rates.total_rate
560            } else {
561                config.fraud.fraud_rate
562            };
563            let fraud_rate = if config.anomaly_injection.enabled {
564                config.anomaly_injection.rates.fraud_rate
565            } else {
566                AnomalyRateConfig::default().fraud_rate
567            };
568            let error_rate = if config.anomaly_injection.enabled {
569                config.anomaly_injection.rates.error_rate
570            } else {
571                AnomalyRateConfig::default().error_rate
572            };
573            let process_issue_rate = if config.anomaly_injection.enabled {
574                config.anomaly_injection.rates.process_rate
575            } else {
576                AnomalyRateConfig::default().process_issue_rate
577            };
578
579            let injector_config = AnomalyInjectorConfig {
580                rates: AnomalyRateConfig {
581                    total_rate,
582                    fraud_rate,
583                    error_rate,
584                    process_issue_rate,
585                    ..Default::default()
586                },
587                seed: seed + 5000,
588                ..Default::default()
589            };
590
591            info!(
592                "Anomaly injection enabled for streaming JE phase (total_rate={:.3})",
593                total_rate
594            );
595            Some(AnomalyInjector::new(injector_config))
596        } else {
597            None
598        };
599
600        // Generate JEs in batches when anomaly injection is active,
601        // or one-by-one when it is not.
602        let batch_size: usize = if anomaly_injector.is_some() { 100 } else { 1 };
603        let mut remaining = total_entries;
604
605        while remaining > 0 {
606            if control.is_cancelled() {
607                break;
608            }
609
610            let current_batch = remaining.min(batch_size);
611            let mut batch: Vec<JournalEntry> = Vec::with_capacity(current_batch);
612
613            for _ in 0..current_batch {
614                if control.is_cancelled() {
615                    break;
616                }
617
618                // Handle pause
619                while control.is_paused() {
620                    std::thread::sleep(std::time::Duration::from_millis(100));
621                    if control.is_cancelled() {
622                        break;
623                    }
624                }
625
626                batch.push(je_gen.generate());
627            }
628
629            if batch.is_empty() {
630                break;
631            }
632
633            // Apply anomaly injection to the batch if enabled
634            if let Some(ref mut injector) = anomaly_injector {
635                let result = injector.process_entries(&mut batch);
636
637                // Stream any generated anomaly labels
638                for label in result.labels {
639                    sender.send(StreamEvent::Data(GeneratedItem::AnomalyLabel(Box::new(
640                        label,
641                    ))))?;
642                }
643            }
644
645            // Send the (possibly mutated) JEs
646            for je in batch {
647                sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
648                count += 1;
649
650                // Send progress updates
651                if count.is_multiple_of(progress_interval) {
652                    progress.items_generated = count;
653                    progress.items_remaining = Some(total_entries as u64 - count);
654                    sender.send(StreamEvent::Progress(progress.clone()))?;
655                }
656            }
657
658            remaining = remaining.saturating_sub(current_batch);
659        }
660
661        Ok(count)
662    }
663
664    /// Generates document flows phase (P2P and O2C).
665    ///
666    /// Creates complete document chains:
667    /// - P2P: PurchaseOrder → GoodsReceipt → VendorInvoice → Payment
668    /// - O2C: SalesOrder → Delivery → CustomerInvoice
669    fn generate_document_flows_phase(
670        config: &GeneratorConfig,
671        sender: &StreamSender<GeneratedItem>,
672        control: &Arc<StreamControl>,
673        progress_interval: u64,
674        progress: &mut StreamProgress,
675    ) -> SynthResult<u64> {
676        use chrono::Datelike;
677        use datasynth_generators::{
678            CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
679        };
680
681        let mut count: u64 = 0;
682        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
683        let df_config = &config.document_flows;
684        let md_config = &config.master_data;
685
686        // Parse dates
687        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
688            .unwrap_or_else(|e| {
689                tracing::warn!(
690                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
691                    config.global.start_date,
692                    e
693                );
694                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
695            });
696        let end_date = start_date
697            .checked_add_months(chrono::Months::new(config.global.period_months))
698            .unwrap_or(start_date + chrono::Duration::days(365));
699        let total_period_days = (end_date - start_date).num_days().max(1);
700
701        let company_code = config
702            .companies
703            .first()
704            .map(|c| c.code.as_str())
705            .unwrap_or_else(|| {
706                tracing::warn!("No companies configured, defaulting to company code '1000'");
707                "1000"
708            });
709
710        // Use master data config counts for generating reference data
711        let vendor_count = md_config.vendors.count.min(100);
712        let customer_count = md_config.customers.count.min(100);
713        let material_count = md_config.materials.count.min(50);
714
715        // Generate some master data for document flows
716        let mut vendor_gen = VendorGenerator::new(seed);
717        let mut customer_gen = CustomerGenerator::new(seed + 1);
718        let mut material_gen = MaterialGenerator::new(seed + 2);
719
720        let vendors: Vec<_> = (0..vendor_count)
721            .map(|_| vendor_gen.generate_vendor(company_code, start_date))
722            .collect();
723
724        let customers: Vec<_> = (0..customer_count)
725            .map(|_| customer_gen.generate_customer(company_code, start_date))
726            .collect();
727
728        let materials: Vec<_> = (0..material_count)
729            .map(|_| material_gen.generate_material(company_code, start_date))
730            .collect();
731
732        // Determine number of chains based on transaction volume
733        // Use period months as a multiplier for document chains
734        let base_chains = (config.global.period_months as usize * 50).max(100);
735
736        // P2P Generation
737        if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
738            info!("Generating P2P document flows");
739            let mut p2p_gen = P2PGenerator::new(seed + 100);
740
741            let chains_to_generate = base_chains.min(1000);
742            progress.items_remaining = Some(chains_to_generate as u64);
743
744            for i in 0..chains_to_generate {
745                if control.is_cancelled() {
746                    break;
747                }
748
749                // Handle pause
750                while control.is_paused() {
751                    std::thread::sleep(std::time::Duration::from_millis(100));
752                    if control.is_cancelled() {
753                        break;
754                    }
755                }
756
757                let vendor = &vendors[i % vendors.len()];
758                let material_refs: Vec<&datasynth_core::models::Material> =
759                    vec![&materials[i % materials.len()]];
760
761                // Calculate posting date within the period
762                let days_offset = (i as i64 % total_period_days).max(0);
763                let po_date = start_date + chrono::Duration::days(days_offset);
764                let fiscal_year = po_date.year() as u16;
765                let fiscal_period = po_date.month() as u8;
766
767                let chain = p2p_gen.generate_chain(
768                    company_code,
769                    vendor,
770                    &material_refs,
771                    po_date,
772                    fiscal_year,
773                    fiscal_period,
774                    "SYSTEM",
775                );
776
777                // Send each document in the chain
778                sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
779                    chain.purchase_order,
780                ))))?;
781                count += 1;
782
783                for gr in chain.goods_receipts {
784                    sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
785                    count += 1;
786                }
787
788                if let Some(vi) = chain.vendor_invoice {
789                    sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
790                        vi,
791                    ))))?;
792                    count += 1;
793                }
794
795                if let Some(payment) = chain.payment {
796                    sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
797                    count += 1;
798                }
799
800                if count.is_multiple_of(progress_interval) {
801                    progress.items_generated = count;
802                    sender.send(StreamEvent::Progress(progress.clone()))?;
803                }
804            }
805        }
806
807        // O2C Generation
808        if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
809            info!("Generating O2C document flows");
810            let mut o2c_gen = O2CGenerator::new(seed + 200);
811
812            let chains_to_generate = base_chains.min(1000);
813
814            for i in 0..chains_to_generate {
815                if control.is_cancelled() {
816                    break;
817                }
818
819                while control.is_paused() {
820                    std::thread::sleep(std::time::Duration::from_millis(100));
821                    if control.is_cancelled() {
822                        break;
823                    }
824                }
825
826                let customer = &customers[i % customers.len()];
827                let material_refs: Vec<&datasynth_core::models::Material> =
828                    vec![&materials[i % materials.len()]];
829
830                let days_offset = (i as i64 % total_period_days).max(0);
831                let so_date = start_date + chrono::Duration::days(days_offset);
832                let fiscal_year = so_date.year() as u16;
833                let fiscal_period = so_date.month() as u8;
834
835                let chain = o2c_gen.generate_chain(
836                    company_code,
837                    customer,
838                    &material_refs,
839                    so_date,
840                    fiscal_year,
841                    fiscal_period,
842                    "SYSTEM",
843                );
844
845                sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
846                    chain.sales_order,
847                ))))?;
848                count += 1;
849
850                for delivery in chain.deliveries {
851                    sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
852                        delivery,
853                    ))))?;
854                    count += 1;
855                }
856
857                if let Some(ci) = chain.customer_invoice {
858                    sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
859                        ci,
860                    ))))?;
861                    count += 1;
862                }
863
864                if count.is_multiple_of(progress_interval) {
865                    progress.items_generated = count;
866                    sender.send(StreamEvent::Progress(progress.clone()))?;
867                }
868            }
869        }
870
871        Ok(count)
872    }
873
874    /// Returns the orchestrator configuration stats.
875    pub fn stats(&self) -> StreamingOrchestratorStats {
876        StreamingOrchestratorStats {
877            phases: self.config.phases.len(),
878            buffer_size: self.config.stream_config.buffer_size,
879            backpressure: self.config.stream_config.backpressure,
880        }
881    }
882}
883
884/// Statistics for the streaming orchestrator.
885#[derive(Debug, Clone)]
886pub struct StreamingOrchestratorStats {
887    /// Number of phases configured.
888    pub phases: usize,
889    /// Buffer size.
890    pub buffer_size: usize,
891    /// Backpressure strategy.
892    pub backpressure: BackpressureStrategy,
893}
894
895/// Resolve CoA framework from a GeneratorConfig.
896fn resolve_coa_framework_from_config(
897    config: &GeneratorConfig,
898) -> datasynth_generators::coa_generator::CoAFramework {
899    use datasynth_generators::coa_generator::CoAFramework;
900    if config.accounting_standards.enabled {
901        match config.accounting_standards.framework {
902            Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
903                return CoAFramework::FrenchPcg;
904            }
905            Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
906                return CoAFramework::GermanSkr04;
907            }
908            _ => {}
909        }
910    }
911    CoAFramework::UsGaap
912}
913
#[cfg(test)]
// Tests unwrap freely: a panic is exactly the failure signal wanted here.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    /// Baseline retail preset that every test in this module starts from.
    fn create_test_config() -> GeneratorConfig {
        let sector = IndustrySector::Retail;
        let complexity = CoAComplexity::Small;
        let volume = TransactionVolume::TenK;
        create_preset(sector, 2, 3, complexity, volume)
    }

    /// Baseline config cut down to three vendors, customers, and employees
    /// over a single month.
    fn small_single_month_config() -> GeneratorConfig {
        let mut config = create_test_config();
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;
        config
    }

    /// Stream settings shared by the anomaly-related journal entry tests.
    fn journal_entry_stream_config() -> StreamConfig {
        StreamConfig {
            buffer_size: 500,
            progress_interval: 50,
            ..Default::default()
        }
    }

    #[test]
    fn test_streaming_orchestrator_creation() {
        let stats = StreamingOrchestrator::from_generator_config(create_test_config()).stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    #[test]
    fn test_streaming_generation() {
        // Shrink the volume so the test stays quick.
        let mut generator_config = create_test_config();
        generator_config.master_data.vendors.count = 5;
        generator_config.master_data.customers.count = 5;
        generator_config.master_data.employees.count = 5;
        generator_config.global.period_months = 1;

        let stream_settings = StreamConfig {
            buffer_size: 100,
            progress_interval: 10,
            ..Default::default()
        };
        let phases = vec![
            GenerationPhase::ChartOfAccounts,
            GenerationPhase::MasterData,
        ];
        let orchestrator = StreamingOrchestrator::new(
            StreamingOrchestratorConfig::new(generator_config)
                .with_phases(phases)
                .with_stream_config(stream_settings),
        );
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut data_items = 0;
        let mut saw_coa = false;
        let mut completed = false;

        for event in receiver {
            match event {
                StreamEvent::Data(GeneratedItem::ChartOfAccounts(_)) => {
                    data_items += 1;
                    saw_coa = true;
                }
                StreamEvent::Data(_) => data_items += 1,
                StreamEvent::Complete(_) => {
                    completed = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(data_items > 0);
        assert!(saw_coa);
        assert!(completed);
    }

    #[test]
    fn test_stream_cancellation() {
        // Use a twelve-month period so generation runs longer.
        let mut generator_config = create_test_config();
        generator_config.global.period_months = 12;

        let orchestrator = StreamingOrchestrator::new(
            StreamingOrchestratorConfig::new(generator_config)
                .with_phases(vec![GenerationPhase::JournalEntries]),
        );
        let (receiver, control) = orchestrator.stream().unwrap();

        // Request cancellation once ten data items have arrived.
        let mut received = 0;
        for event in receiver {
            if !matches!(event, StreamEvent::Data(_)) {
                continue;
            }
            received += 1;
            if received >= 10 {
                control.cancel();
                break;
            }
        }

        assert!(control.is_cancelled());
    }

    #[test]
    fn test_streaming_anomaly_injection() {
        // Small volume for speed, but still enough entries for anomalies.
        let mut generator_config = small_single_month_config();

        // Turn injection on with a high (20%) total rate so some anomalies are created.
        let injection = &mut generator_config.anomaly_injection;
        injection.enabled = true;
        injection.rates.total_rate = 0.20;
        injection.rates.fraud_rate = 0.40;
        injection.rates.error_rate = 0.40;
        injection.rates.process_rate = 0.20;

        let orchestrator = StreamingOrchestrator::new(
            StreamingOrchestratorConfig::new(generator_config)
                .with_phases(vec![GenerationPhase::JournalEntries])
                .with_stream_config(journal_entry_stream_config()),
        );
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut journal_entries = 0;
        let mut labels = 0;
        let mut completed = false;

        for event in receiver {
            match event {
                StreamEvent::Data(GeneratedItem::JournalEntry(_)) => journal_entries += 1,
                StreamEvent::Data(GeneratedItem::AnomalyLabel(_)) => labels += 1,
                StreamEvent::Complete(_) => {
                    completed = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(completed, "Stream should complete");
        assert!(journal_entries > 0, "Should generate journal entries");
        assert!(
            labels > 0,
            "Should generate anomaly labels (got {} JEs, {} labels)",
            journal_entries,
            labels
        );
    }

    #[test]
    fn test_streaming_no_anomalies_when_disabled() {
        let mut generator_config = small_single_month_config();

        // Make sure both anomaly injection and fraud are switched off.
        generator_config.anomaly_injection.enabled = false;
        generator_config.fraud.enabled = false;

        let orchestrator = StreamingOrchestrator::new(
            StreamingOrchestratorConfig::new(generator_config)
                .with_phases(vec![GenerationPhase::JournalEntries])
                .with_stream_config(journal_entry_stream_config()),
        );
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut labels = 0;
        for event in receiver {
            match event {
                StreamEvent::Complete(_) => break,
                StreamEvent::Data(GeneratedItem::AnomalyLabel(_)) => labels += 1,
                _ => {}
            }
        }

        assert_eq!(
            labels, 0,
            "Should not generate anomaly labels when disabled"
        );
    }

    #[test]
    fn test_generated_item_type_name() {
        let chart = ChartOfAccounts::new(
            String::from("TEST_COA"),
            String::from("Test Chart of Accounts"),
            String::from("US"),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        );
        let coa_item = GeneratedItem::ChartOfAccounts(Box::new(chart));
        assert_eq!(coa_item.type_name(), "chart_of_accounts");

        let progress_item = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress_item.type_name(), "progress");
    }
}