Skip to main content

datasynth_runtime/
streaming_orchestrator.rs

1//! Streaming orchestrator for real-time data generation.
2//!
3//! This orchestrator provides streaming capabilities with backpressure,
4//! progress reporting, and control for real-time data generation.
5//!
6//! # Limitations — simplified pipeline
7//!
8//! This module implements a **simplified streaming pipeline** intended for
9//! interactive / server-side use cases where data is consumed as it is produced.
10//! It differs from the full-batch [`EnhancedOrchestrator`] in the following ways:
11//!
//! - **Subset of generators**: Only the chart of accounts, journal entries, master data
//!   (vendors, customers, materials, employees), document-flow documents (POs, GRs,
//!   vendor/customer invoices, payments, sales orders, deliveries), and anomaly labels
//!   are streamed. Advanced generators (subledgers, period-close, audit, tax, ESG,
//!   banking, graph export, etc.) are **not** invoked.
17//!
18//! - **No cross-generator state**: The streaming pipeline does not carry forward
19//!   balance tracker state, trial-balance snapshots, or depreciation runs between
20//!   periods.  Outputs are therefore not coherent across the full accounting cycle.
21//!
22//! - **No output sinks**: Records are emitted directly over the [`StreamSender`]
23//!   channel; CSV / JSON / Parquet file writing is the responsibility of the consumer.
24//!
25//! - **Backpressure model**: The orchestrator honours the [`BackpressureStrategy`]
26//!   configured on the [`StreamConfig`], but does not implement adaptive throttling
27//!   based on CPU / memory pressure (unlike the resource-guarded batch path).
28//!
29//! For full-fidelity, standards-compliant generation use
30//! [`EnhancedOrchestrator`](crate::enhanced_orchestrator::EnhancedOrchestrator) with
31//! a file-based output sink.
32//!
33//! [`EnhancedOrchestrator`]: crate::enhanced_orchestrator::EnhancedOrchestrator
34
35use std::sync::Arc;
36use std::thread;
37use std::time::Instant;
38
39use chrono::NaiveDate;
40use tracing::{info, warn};
41
42use datasynth_config::schema::GeneratorConfig;
43
/// Fallback RNG seed used when `global.seed` is absent from the generator
/// config; per-generator seeds are derived from it by fixed offsets.
const DEFAULT_SEED: u64 = 42_u64;
46use datasynth_core::error::SynthResult;
47use datasynth_core::models::{
48    documents::{
49        CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
50    },
51    AnomalyRateConfig, ChartOfAccounts, Customer, Employee, JournalEntry, LabeledAnomaly, Material,
52    Vendor,
53};
54use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
55use datasynth_core::traits::{
56    BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
57};
58
/// One record or control marker carried on the streaming channel.
///
/// Every data payload is boxed; [`GeneratedItem::Progress`] and
/// [`GeneratedItem::PhaseComplete`] are lightweight markers. Use
/// [`GeneratedItem::type_name`] to get a stable string tag for the variant.
#[derive(Debug, Clone)]
pub enum GeneratedItem {
    /// The generated Chart of Accounts (sent once by the CoA phase).
    ChartOfAccounts(Box<ChartOfAccounts>),
    /// A vendor master record.
    Vendor(Box<Vendor>),
    /// A customer master record.
    Customer(Box<Customer>),
    /// A material master record.
    Material(Box<Material>),
    /// An employee master record.
    Employee(Box<Employee>),
    /// A journal entry (possibly mutated by anomaly injection before sending).
    JournalEntry(Box<JournalEntry>),
    /// A purchase order (P2P).
    PurchaseOrder(Box<PurchaseOrder>),
    /// A goods receipt (P2P).
    GoodsReceipt(Box<GoodsReceipt>),
    /// A vendor invoice (P2P).
    VendorInvoice(Box<VendorInvoice>),
    /// A payment (P2P/O2C).
    Payment(Box<Payment>),
    /// A sales order (O2C).
    SalesOrder(Box<SalesOrder>),
    /// A delivery (O2C).
    Delivery(Box<Delivery>),
    /// A customer invoice (O2C).
    CustomerInvoice(Box<CustomerInvoice>),
    /// An anomaly label produced while injecting anomalies into journal entries.
    AnomalyLabel(Box<LabeledAnomaly>),
    /// A progress update.
    Progress(StreamProgress),
    /// End-of-phase marker; carries the phase name (`GenerationPhase::name`).
    PhaseComplete(String),
}
95
96impl GeneratedItem {
97    /// Returns the item type name.
98    pub fn type_name(&self) -> &'static str {
99        match self {
100            GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
101            GeneratedItem::Vendor(_) => "vendor",
102            GeneratedItem::Customer(_) => "customer",
103            GeneratedItem::Material(_) => "material",
104            GeneratedItem::Employee(_) => "employee",
105            GeneratedItem::JournalEntry(_) => "journal_entry",
106            GeneratedItem::PurchaseOrder(_) => "purchase_order",
107            GeneratedItem::GoodsReceipt(_) => "goods_receipt",
108            GeneratedItem::VendorInvoice(_) => "vendor_invoice",
109            GeneratedItem::Payment(_) => "payment",
110            GeneratedItem::SalesOrder(_) => "sales_order",
111            GeneratedItem::Delivery(_) => "delivery",
112            GeneratedItem::CustomerInvoice(_) => "customer_invoice",
113            GeneratedItem::AnomalyLabel(_) => "anomaly_label",
114            GeneratedItem::Progress(_) => "progress",
115            GeneratedItem::PhaseComplete(_) => "phase_complete",
116        }
117    }
118}
119
/// A stage of the generation pipeline.
///
/// The streaming orchestrator runs the phases listed in its configuration in
/// order; not every phase does work in streaming mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GenerationPhase {
    /// Chart of Accounts generation.
    ChartOfAccounts,
    /// Master data generation (vendors, customers, etc.).
    MasterData,
    /// Document flow generation (P2P, O2C).
    DocumentFlows,
    /// OCPM event log generation (currently skipped in streaming mode).
    OcpmEvents,
    /// Journal entry generation.
    JournalEntries,
    /// Anomaly injection (applied inline during journal entries when streaming).
    AnomalyInjection,
    /// Balance validation (not applicable in streaming mode).
    BalanceValidation,
    /// Data quality injection (currently skipped in streaming mode).
    DataQuality,
    /// Terminal marker (not applicable in streaming mode).
    Complete,
}

impl GenerationPhase {
    /// Returns the stable snake_case identifier of this phase.
    ///
    /// The orchestrator uses it for progress `phase` fields, `PhaseComplete`
    /// markers and the summary's list of completed phases.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::ChartOfAccounts => "chart_of_accounts",
            Self::MasterData => "master_data",
            Self::DocumentFlows => "document_flows",
            Self::OcpmEvents => "ocpm_events",
            Self::JournalEntries => "journal_entries",
            Self::AnomalyInjection => "anomaly_injection",
            Self::BalanceValidation => "balance_validation",
            Self::DataQuality => "data_quality",
            Self::Complete => "complete",
        }
    }
}
159
/// Configuration for a [`StreamingOrchestrator`] run.
///
/// Bundles the generator settings, the channel/stream settings, and the
/// ordered list of phases to execute.
#[derive(Debug, Clone)]
pub struct StreamingOrchestratorConfig {
    /// Generator configuration (seed, dates, companies, master-data counts, ...).
    pub generator_config: GeneratorConfig,
    /// Stream configuration (buffer size, backpressure strategy, progress interval).
    pub stream_config: StreamConfig,
    /// Phases to execute, in the order listed.
    pub phases: Vec<GenerationPhase>,
}
170
171impl StreamingOrchestratorConfig {
172    /// Creates a new streaming orchestrator configuration.
173    pub fn new(generator_config: GeneratorConfig) -> Self {
174        Self {
175            generator_config,
176            stream_config: StreamConfig::default(),
177            phases: vec![
178                GenerationPhase::ChartOfAccounts,
179                GenerationPhase::MasterData,
180                GenerationPhase::DocumentFlows,
181                GenerationPhase::JournalEntries,
182            ],
183        }
184    }
185
186    /// Creates a configuration with all phases enabled including OCPM.
187    pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
188        Self {
189            generator_config,
190            stream_config: StreamConfig::default(),
191            phases: vec![
192                GenerationPhase::ChartOfAccounts,
193                GenerationPhase::MasterData,
194                GenerationPhase::DocumentFlows,
195                GenerationPhase::OcpmEvents,
196                GenerationPhase::JournalEntries,
197                GenerationPhase::AnomalyInjection,
198                GenerationPhase::DataQuality,
199            ],
200        }
201    }
202
203    /// Sets the stream configuration.
204    pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
205        self.stream_config = config;
206        self
207    }
208
209    /// Sets the phases to execute.
210    pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
211        self.phases = phases;
212        self
213    }
214}
215
/// Streaming orchestrator for real-time data generation.
///
/// Each call to [`StreamingOrchestrator::stream`] clones the configuration,
/// runs generation on a newly spawned thread, and returns the receiving end of
/// the channel together with a shared control handle.
pub struct StreamingOrchestrator {
    /// Configuration cloned into the generation thread on every `stream` call.
    config: StreamingOrchestratorConfig,
}
220
221impl StreamingOrchestrator {
222    /// Creates a new streaming orchestrator.
223    pub fn new(config: StreamingOrchestratorConfig) -> Self {
224        Self { config }
225    }
226
227    /// Creates a streaming orchestrator from generator config with defaults.
228    pub fn from_generator_config(config: GeneratorConfig) -> Self {
229        Self::new(StreamingOrchestratorConfig::new(config))
230    }
231
232    /// Starts streaming generation.
233    ///
234    /// Returns a receiver for stream events and a control handle.
235    pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
236        let (sender, receiver) = stream_channel(
237            self.config.stream_config.buffer_size,
238            self.config.stream_config.backpressure,
239        );
240
241        let control = Arc::new(StreamControl::new());
242        let control_clone = Arc::clone(&control);
243
244        let config = self.config.clone();
245
246        // Spawn generation thread
247        thread::spawn(move || {
248            let result = Self::run_generation(config, sender, control_clone);
249            if let Err(e) = result {
250                warn!("Streaming generation error: {}", e);
251            }
252        });
253
254        Ok((receiver, control))
255    }
256
257    /// Runs the generation process.
258    fn run_generation(
259        config: StreamingOrchestratorConfig,
260        sender: StreamSender<GeneratedItem>,
261        control: Arc<StreamControl>,
262    ) -> SynthResult<()> {
263        let start_time = Instant::now();
264        let mut items_generated: u64 = 0;
265        let mut phases_completed = Vec::new();
266
267        // Track stats
268        let progress_interval = config.stream_config.progress_interval;
269
270        // Send initial progress
271        let mut progress = StreamProgress::new("initializing");
272        sender.send(StreamEvent::Progress(progress.clone()))?;
273
274        for phase in &config.phases {
275            if control.is_cancelled() {
276                info!("Generation cancelled");
277                break;
278            }
279
280            // Handle pause
281            while control.is_paused() {
282                std::thread::sleep(std::time::Duration::from_millis(100));
283                if control.is_cancelled() {
284                    break;
285                }
286            }
287
288            progress.phase = phase.name().to_string();
289            sender.send(StreamEvent::Progress(progress.clone()))?;
290
291            match phase {
292                GenerationPhase::ChartOfAccounts => {
293                    let result =
294                        Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
295                    items_generated += result;
296                }
297                GenerationPhase::MasterData => {
298                    let result = Self::generate_master_data_phase(
299                        &config.generator_config,
300                        &sender,
301                        &control,
302                    )?;
303                    items_generated += result;
304                }
305                GenerationPhase::DocumentFlows => {
306                    let result = Self::generate_document_flows_phase(
307                        &config.generator_config,
308                        &sender,
309                        &control,
310                        progress_interval,
311                        &mut progress,
312                    )?;
313                    items_generated += result;
314                }
315                GenerationPhase::OcpmEvents => {
316                    warn!("OCPM event generation is not yet supported in streaming mode; skipping");
317                }
318                GenerationPhase::JournalEntries => {
319                    let result = Self::generate_journal_entries_phase(
320                        &config.generator_config,
321                        &sender,
322                        &control,
323                        progress_interval,
324                        &mut progress,
325                    )?;
326                    items_generated += result;
327                }
328                GenerationPhase::AnomalyInjection => {
329                    info!("Anomaly injection applied inline during JE generation phase in streaming mode");
330                }
331                GenerationPhase::DataQuality => {
332                    info!(
333                        "Data quality injection is not yet supported in streaming mode; skipping"
334                    );
335                }
336                GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
337                    info!("Phase {:?} is not applicable in streaming mode", phase);
338                }
339            }
340
341            // Send phase completion
342            sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
343                phase.name().to_string(),
344            )))?;
345            phases_completed.push(phase.name().to_string());
346
347            // Update progress
348            progress.items_generated = items_generated;
349            progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
350            if progress.elapsed_ms > 0 {
351                progress.items_per_second =
352                    (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
353            }
354            sender.send(StreamEvent::Progress(progress.clone()))?;
355        }
356
357        // Send completion
358        let stats = sender.stats();
359        let summary = StreamSummary {
360            total_items: items_generated,
361            total_time_ms: start_time.elapsed().as_millis() as u64,
362            avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
363                (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
364            } else {
365                0.0
366            },
367            error_count: 0,
368            dropped_count: stats.items_dropped,
369            peak_memory_mb: None,
370            phases_completed,
371        };
372
373        sender.send(StreamEvent::Complete(summary))?;
374        sender.close();
375
376        Ok(())
377    }
378
379    /// Generates Chart of Accounts phase.
380    fn generate_coa_phase(
381        config: &GeneratorConfig,
382        sender: &StreamSender<GeneratedItem>,
383        control: &Arc<StreamControl>,
384    ) -> SynthResult<u64> {
385        use datasynth_generators::ChartOfAccountsGenerator;
386
387        if control.is_cancelled() {
388            return Ok(0);
389        }
390
391        info!("Generating Chart of Accounts");
392        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
393        let complexity = config.chart_of_accounts.complexity;
394        let industry = config.global.industry;
395        let coa_framework = resolve_coa_framework_from_config(config);
396
397        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
398            .with_coa_framework(coa_framework);
399        let coa = coa_gen.generate();
400
401        let account_count = coa.account_count() as u64;
402        sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
403            coa,
404        ))))?;
405
406        Ok(account_count)
407    }
408
409    /// Generates master data phase.
410    fn generate_master_data_phase(
411        config: &GeneratorConfig,
412        sender: &StreamSender<GeneratedItem>,
413        control: &Arc<StreamControl>,
414    ) -> SynthResult<u64> {
415        use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
416
417        let mut count: u64 = 0;
418        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
419        let md_config = &config.master_data;
420        let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
421            .unwrap_or_else(|e| {
422                tracing::warn!(
423                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
424                    config.global.start_date,
425                    e
426                );
427                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
428            });
429
430        let company_code = config
431            .companies
432            .first()
433            .map(|c| c.code.as_str())
434            .unwrap_or_else(|| {
435                tracing::warn!("No companies configured, defaulting to company code '1000'");
436                "1000"
437            });
438
439        // Generate vendors
440        if control.is_cancelled() {
441            return Ok(count);
442        }
443
444        info!("Generating vendors");
445        let mut vendor_gen = VendorGenerator::new(seed);
446        for _ in 0..md_config.vendors.count {
447            if control.is_cancelled() {
448                break;
449            }
450            let vendor = vendor_gen.generate_vendor(company_code, effective_date);
451            sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
452            count += 1;
453        }
454
455        // Generate customers
456        if control.is_cancelled() {
457            return Ok(count);
458        }
459
460        info!("Generating customers");
461        let mut customer_gen = CustomerGenerator::new(seed + 1);
462        for _ in 0..md_config.customers.count {
463            if control.is_cancelled() {
464                break;
465            }
466            let customer = customer_gen.generate_customer(company_code, effective_date);
467            sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
468                customer,
469            ))))?;
470            count += 1;
471        }
472
473        // Generate employees
474        if control.is_cancelled() {
475            return Ok(count);
476        }
477
478        info!("Generating employees");
479        let mut employee_gen = EmployeeGenerator::new(seed + 4);
480        // Use first department from config, falling back to a default
481        let dept = if let Some(first_custom) = config.departments.custom_departments.first() {
482            datasynth_generators::DepartmentDefinition {
483                code: first_custom.code.clone(),
484                name: first_custom.name.clone(),
485                cost_center: first_custom
486                    .cost_center
487                    .clone()
488                    .unwrap_or_else(|| format!("CC{}", first_custom.code)),
489                headcount: 10,
490                system_roles: vec![],
491                transaction_codes: vec![],
492            }
493        } else {
494            warn!("No departments configured, using default 'General' department");
495            datasynth_generators::DepartmentDefinition {
496                code: "1000".to_string(),
497                name: "General".to_string(),
498                cost_center: "CC1000".to_string(),
499                headcount: 10,
500                system_roles: vec![],
501                transaction_codes: vec![],
502            }
503        };
504        for _ in 0..md_config.employees.count {
505            if control.is_cancelled() {
506                break;
507            }
508            let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
509            sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
510                employee,
511            ))))?;
512            count += 1;
513        }
514
515        Ok(count)
516    }
517
518    /// Generates journal entries phase.
519    ///
520    /// Note: This is a simplified version that generates basic journal entries.
521    /// For full-featured generation with all options, use EnhancedOrchestrator.
522    ///
523    /// When anomaly injection is enabled in config, anomalies are applied inline
524    /// to each batch of generated JEs before streaming them out.
525    fn generate_journal_entries_phase(
526        config: &GeneratorConfig,
527        sender: &StreamSender<GeneratedItem>,
528        control: &Arc<StreamControl>,
529        progress_interval: u64,
530        progress: &mut StreamProgress,
531    ) -> SynthResult<u64> {
532        use datasynth_generators::{
533            AnomalyInjector, AnomalyInjectorConfig, ChartOfAccountsGenerator, JournalEntryGenerator,
534        };
535        use std::sync::Arc;
536
537        let mut count: u64 = 0;
538        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
539
540        // Calculate total entries to generate based on volume weights
541        let default_monthly = 500;
542        let total_entries: usize = config
543            .companies
544            .iter()
545            .map(|c| {
546                let monthly = (c.volume_weight * default_monthly as f64) as usize;
547                monthly.max(100) * config.global.period_months as usize
548            })
549            .sum();
550
551        progress.items_remaining = Some(total_entries as u64);
552        info!("Generating {} journal entries", total_entries);
553
554        // Generate a shared CoA for all companies
555        let complexity = config.chart_of_accounts.complexity;
556        let industry = config.global.industry;
557        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
558        let coa = Arc::new(coa_gen.generate());
559
560        // Parse start date
561        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
562            .unwrap_or_else(|e| {
563                tracing::warn!(
564                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
565                    config.global.start_date,
566                    e
567                );
568                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
569            });
570        let end_date = start_date
571            .checked_add_months(chrono::Months::new(config.global.period_months))
572            .unwrap_or(start_date + chrono::Duration::days(365));
573
574        // Create JE generator from config
575        let mut je_gen = JournalEntryGenerator::from_generator_config(
576            config,
577            Arc::clone(&coa),
578            start_date,
579            end_date,
580            seed,
581        );
582
583        // Create anomaly injector if enabled.
584        // Priority: anomaly_injection config > fraud config
585        let anomaly_enabled = config.anomaly_injection.enabled || config.fraud.enabled;
586        let mut anomaly_injector = if anomaly_enabled {
587            let total_rate = if config.anomaly_injection.enabled {
588                config.anomaly_injection.rates.total_rate
589            } else {
590                config.fraud.fraud_rate
591            };
592            let fraud_rate = if config.anomaly_injection.enabled {
593                config.anomaly_injection.rates.fraud_rate
594            } else {
595                AnomalyRateConfig::default().fraud_rate
596            };
597            let error_rate = if config.anomaly_injection.enabled {
598                config.anomaly_injection.rates.error_rate
599            } else {
600                AnomalyRateConfig::default().error_rate
601            };
602            let process_issue_rate = if config.anomaly_injection.enabled {
603                config.anomaly_injection.rates.process_rate
604            } else {
605                AnomalyRateConfig::default().process_issue_rate
606            };
607
608            let injector_config = AnomalyInjectorConfig {
609                rates: AnomalyRateConfig {
610                    total_rate,
611                    fraud_rate,
612                    error_rate,
613                    process_issue_rate,
614                    ..Default::default()
615                },
616                seed: seed + 5000,
617                ..Default::default()
618            };
619
620            info!(
621                "Anomaly injection enabled for streaming JE phase (total_rate={:.3})",
622                total_rate
623            );
624            Some(AnomalyInjector::new(injector_config))
625        } else {
626            None
627        };
628
629        // Generate JEs in batches when anomaly injection is active,
630        // or one-by-one when it is not.
631        let batch_size: usize = if anomaly_injector.is_some() { 100 } else { 1 };
632        let mut remaining = total_entries;
633
634        while remaining > 0 {
635            if control.is_cancelled() {
636                break;
637            }
638
639            let current_batch = remaining.min(batch_size);
640            let mut batch: Vec<JournalEntry> = Vec::with_capacity(current_batch);
641
642            for _ in 0..current_batch {
643                if control.is_cancelled() {
644                    break;
645                }
646
647                // Handle pause
648                while control.is_paused() {
649                    std::thread::sleep(std::time::Duration::from_millis(100));
650                    if control.is_cancelled() {
651                        break;
652                    }
653                }
654
655                batch.push(je_gen.generate());
656            }
657
658            if batch.is_empty() {
659                break;
660            }
661
662            // Apply anomaly injection to the batch if enabled
663            if let Some(ref mut injector) = anomaly_injector {
664                let result = injector.process_entries(&mut batch);
665
666                // Stream any generated anomaly labels
667                for label in result.labels {
668                    sender.send(StreamEvent::Data(GeneratedItem::AnomalyLabel(Box::new(
669                        label,
670                    ))))?;
671                }
672            }
673
674            // Send the (possibly mutated) JEs
675            for je in batch {
676                sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
677                count += 1;
678
679                // Send progress updates
680                if count.is_multiple_of(progress_interval) {
681                    progress.items_generated = count;
682                    progress.items_remaining = Some(total_entries as u64 - count);
683                    sender.send(StreamEvent::Progress(progress.clone()))?;
684                }
685            }
686
687            remaining = remaining.saturating_sub(current_batch);
688        }
689
690        Ok(count)
691    }
692
693    /// Generates document flows phase (P2P and O2C).
694    ///
695    /// Creates complete document chains:
696    /// - P2P: PurchaseOrder → GoodsReceipt → VendorInvoice → Payment
697    /// - O2C: SalesOrder → Delivery → CustomerInvoice
698    fn generate_document_flows_phase(
699        config: &GeneratorConfig,
700        sender: &StreamSender<GeneratedItem>,
701        control: &Arc<StreamControl>,
702        progress_interval: u64,
703        progress: &mut StreamProgress,
704    ) -> SynthResult<u64> {
705        use chrono::Datelike;
706        use datasynth_generators::{
707            CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
708        };
709
710        let mut count: u64 = 0;
711        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
712        let df_config = &config.document_flows;
713        let md_config = &config.master_data;
714
715        // Parse dates
716        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
717            .unwrap_or_else(|e| {
718                tracing::warn!(
719                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
720                    config.global.start_date,
721                    e
722                );
723                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
724            });
725        let end_date = start_date
726            .checked_add_months(chrono::Months::new(config.global.period_months))
727            .unwrap_or(start_date + chrono::Duration::days(365));
728        let total_period_days = (end_date - start_date).num_days().max(1);
729
730        let company_code = config
731            .companies
732            .first()
733            .map(|c| c.code.as_str())
734            .unwrap_or_else(|| {
735                tracing::warn!("No companies configured, defaulting to company code '1000'");
736                "1000"
737            });
738
739        // Use master data config counts for generating reference data
740        let vendor_count = md_config.vendors.count.min(100);
741        let customer_count = md_config.customers.count.min(100);
742        let material_count = md_config.materials.count.min(50);
743
744        // Generate some master data for document flows
745        let mut vendor_gen = VendorGenerator::new(seed);
746        let mut customer_gen = CustomerGenerator::new(seed + 1);
747        let mut material_gen = MaterialGenerator::new(seed + 2);
748
749        let vendors: Vec<_> = (0..vendor_count)
750            .map(|_| vendor_gen.generate_vendor(company_code, start_date))
751            .collect();
752
753        let customers: Vec<_> = (0..customer_count)
754            .map(|_| customer_gen.generate_customer(company_code, start_date))
755            .collect();
756
757        let materials: Vec<_> = (0..material_count)
758            .map(|_| material_gen.generate_material(company_code, start_date))
759            .collect();
760
761        // Determine number of chains based on transaction volume
762        // Use period months as a multiplier for document chains
763        let base_chains = (config.global.period_months as usize * 50).max(100);
764
765        // P2P Generation
766        if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
767            info!("Generating P2P document flows");
768            let mut p2p_gen = P2PGenerator::new(seed + 100);
769
770            let chains_to_generate = base_chains.min(1000);
771            progress.items_remaining = Some(chains_to_generate as u64);
772
773            for i in 0..chains_to_generate {
774                if control.is_cancelled() {
775                    break;
776                }
777
778                // Handle pause
779                while control.is_paused() {
780                    std::thread::sleep(std::time::Duration::from_millis(100));
781                    if control.is_cancelled() {
782                        break;
783                    }
784                }
785
786                let vendor = &vendors[i % vendors.len()];
787                let material_refs: Vec<&datasynth_core::models::Material> =
788                    vec![&materials[i % materials.len()]];
789
790                // Calculate posting date within the period
791                let days_offset = (i as i64 % total_period_days).max(0);
792                let po_date = start_date + chrono::Duration::days(days_offset);
793                let fiscal_year = po_date.year() as u16;
794                let fiscal_period = po_date.month() as u8;
795
796                let chain = p2p_gen.generate_chain(
797                    company_code,
798                    vendor,
799                    &material_refs,
800                    po_date,
801                    fiscal_year,
802                    fiscal_period,
803                    "SYSTEM",
804                );
805
806                // Send each document in the chain
807                sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
808                    chain.purchase_order,
809                ))))?;
810                count += 1;
811
812                for gr in chain.goods_receipts {
813                    sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
814                    count += 1;
815                }
816
817                if let Some(vi) = chain.vendor_invoice {
818                    sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
819                        vi,
820                    ))))?;
821                    count += 1;
822                }
823
824                if let Some(payment) = chain.payment {
825                    sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
826                    count += 1;
827                }
828
829                if count.is_multiple_of(progress_interval) {
830                    progress.items_generated = count;
831                    sender.send(StreamEvent::Progress(progress.clone()))?;
832                }
833            }
834        }
835
836        // O2C Generation
837        if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
838            info!("Generating O2C document flows");
839            let mut o2c_gen = O2CGenerator::new(seed + 200);
840
841            let chains_to_generate = base_chains.min(1000);
842
843            for i in 0..chains_to_generate {
844                if control.is_cancelled() {
845                    break;
846                }
847
848                while control.is_paused() {
849                    std::thread::sleep(std::time::Duration::from_millis(100));
850                    if control.is_cancelled() {
851                        break;
852                    }
853                }
854
855                let customer = &customers[i % customers.len()];
856                let material_refs: Vec<&datasynth_core::models::Material> =
857                    vec![&materials[i % materials.len()]];
858
859                let days_offset = (i as i64 % total_period_days).max(0);
860                let so_date = start_date + chrono::Duration::days(days_offset);
861                let fiscal_year = so_date.year() as u16;
862                let fiscal_period = so_date.month() as u8;
863
864                let chain = o2c_gen.generate_chain(
865                    company_code,
866                    customer,
867                    &material_refs,
868                    so_date,
869                    fiscal_year,
870                    fiscal_period,
871                    "SYSTEM",
872                );
873
874                sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
875                    chain.sales_order,
876                ))))?;
877                count += 1;
878
879                for delivery in chain.deliveries {
880                    sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
881                        delivery,
882                    ))))?;
883                    count += 1;
884                }
885
886                if let Some(ci) = chain.customer_invoice {
887                    sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
888                        ci,
889                    ))))?;
890                    count += 1;
891                }
892
893                if count.is_multiple_of(progress_interval) {
894                    progress.items_generated = count;
895                    sender.send(StreamEvent::Progress(progress.clone()))?;
896                }
897            }
898        }
899
900        Ok(count)
901    }
902
903    /// Returns the orchestrator configuration stats.
904    pub fn stats(&self) -> StreamingOrchestratorStats {
905        StreamingOrchestratorStats {
906            phases: self.config.phases.len(),
907            buffer_size: self.config.stream_config.buffer_size,
908            backpressure: self.config.stream_config.backpressure,
909        }
910    }
911}
912
/// Configuration snapshot for the streaming orchestrator, as returned by
/// [`StreamingOrchestrator::stats`].
#[derive(Debug, Clone)]
pub struct StreamingOrchestratorStats {
    /// Number of generation phases configured to run.
    pub phases: usize,
    /// Buffer size of the output stream, taken from the stream config's `buffer_size`.
    pub buffer_size: usize,
    /// Backpressure strategy, taken from the stream config's `backpressure`.
    pub backpressure: BackpressureStrategy,
}
923
924/// Resolve CoA framework from a GeneratorConfig.
925fn resolve_coa_framework_from_config(
926    config: &GeneratorConfig,
927) -> datasynth_generators::coa_generator::CoAFramework {
928    use datasynth_generators::coa_generator::CoAFramework;
929    if config.accounting_standards.enabled {
930        match config.accounting_standards.framework {
931            Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
932                return CoAFramework::FrenchPcg;
933            }
934            Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
935                return CoAFramework::GermanSkr04;
936            }
937            _ => {}
938        }
939    }
940    CoAFramework::UsGaap
941}
942
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    //! Tests for the streaming orchestrator: construction, end-to-end streaming of a
    //! reduced configuration, cancellation, and anomaly-label emission.

    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    /// Builds the small retail preset that every test starts from.
    fn create_test_config() -> GeneratorConfig {
        create_preset(
            IndustrySector::Retail,
            2,
            3,
            CoAComplexity::Small,
            TransactionVolume::TenK,
        )
    }

    #[test]
    fn test_streaming_orchestrator_creation() {
        let config = create_test_config();
        let orchestrator = StreamingOrchestrator::from_generator_config(config);
        let stats = orchestrator.stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    #[test]
    fn test_streaming_generation() {
        let mut config = create_test_config();
        // Reduce volume for testing
        config.master_data.vendors.count = 5;
        config.master_data.customers.count = 5;
        config.master_data.employees.count = 5;
        config.global.period_months = 1;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![
                GenerationPhase::ChartOfAccounts,
                GenerationPhase::MasterData,
            ])
            .with_stream_config(StreamConfig {
                buffer_size: 100,
                progress_interval: 10,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut items_count = 0;
        let mut has_coa = false;
        let mut has_completion = false;

        for event in receiver {
            match event {
                StreamEvent::Data(item) => {
                    items_count += 1;
                    if matches!(item, GeneratedItem::ChartOfAccounts(_)) {
                        has_coa = true;
                    }
                }
                StreamEvent::Complete(_) => {
                    has_completion = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(items_count > 0);
        assert!(has_coa);
        assert!(has_completion);
    }

    #[test]
    fn test_stream_cancellation() {
        let mut config = create_test_config();
        config.global.period_months = 12; // Longer generation

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries]);

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, control) = orchestrator.stream().unwrap();

        // Cancel mid-stream once a handful of data items have arrived.
        const CANCEL_AFTER: usize = 10;
        let mut items_count: usize = 0;
        for event in receiver {
            if let StreamEvent::Data(_) = event {
                items_count += 1;
                if items_count >= CANCEL_AFTER {
                    control.cancel();
                    break;
                }
            }
        }

        // Make it explicit that cancellation was actually exercised mid-stream,
        // rather than the stream running dry before reaching the threshold.
        assert!(
            items_count >= CANCEL_AFTER,
            "stream ended after {} data items, before cancellation was exercised",
            items_count
        );
        assert!(control.is_cancelled());
    }

    #[test]
    fn test_streaming_anomaly_injection() {
        let mut config = create_test_config();
        // Reduce volume for fast testing but keep enough entries for anomalies
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;

        // Enable anomaly injection with a high rate to guarantee some are created
        config.anomaly_injection.enabled = true;
        config.anomaly_injection.rates.total_rate = 0.20; // 20% to ensure hits
        config.anomaly_injection.rates.fraud_rate = 0.40;
        config.anomaly_injection.rates.error_rate = 0.40;
        config.anomaly_injection.rates.process_rate = 0.20;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(StreamConfig {
                buffer_size: 500,
                progress_interval: 50,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut je_count = 0;
        let mut label_count = 0;
        let mut has_completion = false;

        for event in receiver {
            match event {
                StreamEvent::Data(item) => match item {
                    GeneratedItem::JournalEntry(_) => je_count += 1,
                    GeneratedItem::AnomalyLabel(_) => label_count += 1,
                    _ => {}
                },
                StreamEvent::Complete(_) => {
                    has_completion = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(has_completion, "Stream should complete");
        assert!(je_count > 0, "Should generate journal entries");
        assert!(
            label_count > 0,
            "Should generate anomaly labels (got {} JEs, {} labels)",
            je_count,
            label_count
        );
    }

    #[test]
    fn test_streaming_no_anomalies_when_disabled() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;

        // Ensure anomaly injection is disabled
        config.anomaly_injection.enabled = false;
        config.fraud.enabled = false;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(StreamConfig {
                buffer_size: 500,
                progress_interval: 50,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut je_count = 0;
        let mut label_count = 0;
        let mut has_completion = false;

        for event in receiver {
            match event {
                StreamEvent::Data(GeneratedItem::JournalEntry(_)) => je_count += 1,
                StreamEvent::Data(GeneratedItem::AnomalyLabel(_)) => label_count += 1,
                StreamEvent::Complete(_) => {
                    has_completion = true;
                    break;
                }
                _ => {}
            }
        }

        // A zero label count only means something if the stream actually ran to
        // completion and produced journal entries that could have been labelled.
        assert!(has_completion, "Stream should complete");
        assert!(je_count > 0, "Should generate journal entries");
        assert_eq!(
            label_count, 0,
            "Should not generate anomaly labels when disabled"
        );
    }

    #[test]
    fn test_generated_item_type_name() {
        let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        )));
        assert_eq!(coa.type_name(), "chart_of_accounts");

        let progress = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress.type_name(), "progress");
    }
}