Skip to main content

datasynth_runtime/
streaming_orchestrator.rs

1//! Streaming orchestrator for real-time data generation.
2//!
3//! This orchestrator provides streaming capabilities with backpressure,
4//! progress reporting, and control for real-time data generation.
5//!
6//! # Limitations — simplified pipeline
7//!
8//! This module implements a **simplified streaming pipeline** intended for
9//! interactive / server-side use cases where data is consumed as it is produced.
10//! It differs from the full-batch [`EnhancedOrchestrator`] in the following ways:
11//!
12//! - **Subset of generators**: Only journal entries, master data (vendors, customers,
13//!   materials, employees), document-flow documents (POs, GRs, invoices, payments,
14//!   sales orders, deliveries), and anomaly labels are streamed.  Advanced generators
15//!   (subledgers, period-close, audit, tax, ESG, banking, graph export, etc.) are
16//!   **not** invoked.
17//!
18//! - **No cross-generator state**: The streaming pipeline does not carry forward
19//!   balance tracker state, trial-balance snapshots, or depreciation runs between
20//!   periods.  Outputs are therefore not coherent across the full accounting cycle.
21//!
22//! - **No output sinks**: Records are emitted directly over the [`StreamSender`]
23//!   channel; CSV / JSON / Parquet file writing is the responsibility of the consumer.
24//!
25//! - **Backpressure model**: The orchestrator honours the [`BackpressureStrategy`]
26//!   configured on the [`StreamConfig`], but does not implement adaptive throttling
27//!   based on CPU / memory pressure (unlike the resource-guarded batch path).
28//!
29//! For full-fidelity, standards-compliant generation use
30//! [`EnhancedOrchestrator`](crate::enhanced_orchestrator::EnhancedOrchestrator) with
31//! a file-based output sink.
32//!
33//! [`EnhancedOrchestrator`]: crate::enhanced_orchestrator::EnhancedOrchestrator
34
35use std::sync::Arc;
36use std::thread;
37use std::time::Instant;
38
39use chrono::NaiveDate;
40use tracing::{info, warn};
41
42use datasynth_config::schema::GeneratorConfig;
43
/// Fallback RNG seed used whenever `global.seed` is absent from the generator
/// config, so that every unseeded run starts from the same seed.
const DEFAULT_SEED: u64 = 42_u64;
46use datasynth_core::error::SynthResult;
47use datasynth_core::models::{
48    documents::{
49        CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
50    },
51    AnomalyRateConfig, ChartOfAccounts, Customer, Employee, JournalEntry, LabeledAnomaly, Material,
52    Vendor,
53};
54use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
55use datasynth_core::traits::{
56    BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
57};
58
59/// Generated items that can be streamed.
60#[derive(Debug, Clone)]
61pub enum GeneratedItem {
62    /// Chart of Accounts.
63    ChartOfAccounts(Box<ChartOfAccounts>),
64    /// A vendor.
65    Vendor(Box<Vendor>),
66    /// A customer.
67    Customer(Box<Customer>),
68    /// A material.
69    Material(Box<Material>),
70    /// An employee.
71    Employee(Box<Employee>),
72    /// A journal entry.
73    JournalEntry(Box<JournalEntry>),
74    /// A purchase order (P2P).
75    PurchaseOrder(Box<PurchaseOrder>),
76    /// A goods receipt (P2P).
77    GoodsReceipt(Box<GoodsReceipt>),
78    /// A vendor invoice (P2P).
79    VendorInvoice(Box<VendorInvoice>),
80    /// A payment (P2P/O2C).
81    Payment(Box<Payment>),
82    /// A sales order (O2C).
83    SalesOrder(Box<SalesOrder>),
84    /// A delivery (O2C).
85    Delivery(Box<Delivery>),
86    /// A customer invoice (O2C).
87    CustomerInvoice(Box<CustomerInvoice>),
88    /// An anomaly label (injected during JE generation).
89    AnomalyLabel(Box<LabeledAnomaly>),
90    /// Progress update.
91    Progress(StreamProgress),
92    /// Phase completion marker.
93    PhaseComplete(String),
94}
95
96impl GeneratedItem {
97    /// Returns the item type name.
98    pub fn type_name(&self) -> &'static str {
99        match self {
100            GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
101            GeneratedItem::Vendor(_) => "vendor",
102            GeneratedItem::Customer(_) => "customer",
103            GeneratedItem::Material(_) => "material",
104            GeneratedItem::Employee(_) => "employee",
105            GeneratedItem::JournalEntry(_) => "journal_entry",
106            GeneratedItem::PurchaseOrder(_) => "purchase_order",
107            GeneratedItem::GoodsReceipt(_) => "goods_receipt",
108            GeneratedItem::VendorInvoice(_) => "vendor_invoice",
109            GeneratedItem::Payment(_) => "payment",
110            GeneratedItem::SalesOrder(_) => "sales_order",
111            GeneratedItem::Delivery(_) => "delivery",
112            GeneratedItem::CustomerInvoice(_) => "customer_invoice",
113            GeneratedItem::AnomalyLabel(_) => "anomaly_label",
114            GeneratedItem::Progress(_) => "progress",
115            GeneratedItem::PhaseComplete(_) => "phase_complete",
116        }
117    }
118}
119
/// A stage of the generation pipeline; a list of these makes up
/// `StreamingOrchestratorConfig::phases`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GenerationPhase {
    /// Build and emit the chart of accounts.
    ChartOfAccounts,
    /// Emit master data (vendors, customers, employees, ...).
    MasterData,
    /// Emit procure-to-pay and order-to-cash document chains.
    DocumentFlows,
    /// OCPM event logs (currently skipped in streaming mode).
    OcpmEvents,
    /// Emit journal entries.
    JournalEntries,
    /// Anomaly injection (applied inline during journal-entry generation).
    AnomalyInjection,
    /// Balance validation (not applicable in streaming mode).
    BalanceValidation,
    /// Data-quality injection (currently skipped in streaming mode).
    DataQuality,
    /// Terminal marker.
    Complete,
}

impl GenerationPhase {
    /// Stable snake_case identifier of the phase, as reported in progress
    /// events and phase-completion markers.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::ChartOfAccounts => "chart_of_accounts",
            Self::MasterData => "master_data",
            Self::DocumentFlows => "document_flows",
            Self::OcpmEvents => "ocpm_events",
            Self::JournalEntries => "journal_entries",
            Self::AnomalyInjection => "anomaly_injection",
            Self::BalanceValidation => "balance_validation",
            Self::DataQuality => "data_quality",
            Self::Complete => "complete",
        }
    }
}
159
160/// Configuration for streaming orchestration.
161#[derive(Debug, Clone)]
162pub struct StreamingOrchestratorConfig {
163    /// Generator configuration.
164    pub generator_config: GeneratorConfig,
165    /// Stream configuration.
166    pub stream_config: StreamConfig,
167    /// Phases to execute.
168    pub phases: Vec<GenerationPhase>,
169}
170
171impl StreamingOrchestratorConfig {
172    /// Creates a new streaming orchestrator configuration.
173    pub fn new(generator_config: GeneratorConfig) -> Self {
174        Self {
175            generator_config,
176            stream_config: StreamConfig::default(),
177            phases: vec![
178                GenerationPhase::ChartOfAccounts,
179                GenerationPhase::MasterData,
180                GenerationPhase::DocumentFlows,
181                GenerationPhase::JournalEntries,
182            ],
183        }
184    }
185
186    /// Creates a configuration with all phases enabled including OCPM.
187    pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
188        Self {
189            generator_config,
190            stream_config: StreamConfig::default(),
191            phases: vec![
192                GenerationPhase::ChartOfAccounts,
193                GenerationPhase::MasterData,
194                GenerationPhase::DocumentFlows,
195                GenerationPhase::OcpmEvents,
196                GenerationPhase::JournalEntries,
197                GenerationPhase::AnomalyInjection,
198                GenerationPhase::DataQuality,
199            ],
200        }
201    }
202
203    /// Sets the stream configuration.
204    pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
205        self.stream_config = config;
206        self
207    }
208
209    /// Sets the phases to execute.
210    pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
211        self.phases = phases;
212        self
213    }
214}
215
216/// Streaming orchestrator for real-time data generation.
217pub struct StreamingOrchestrator {
218    config: StreamingOrchestratorConfig,
219}
220
221impl StreamingOrchestrator {
222    /// Creates a new streaming orchestrator.
223    pub fn new(config: StreamingOrchestratorConfig) -> Self {
224        Self { config }
225    }
226
227    /// Creates a streaming orchestrator from generator config with defaults.
228    pub fn from_generator_config(config: GeneratorConfig) -> Self {
229        Self::new(StreamingOrchestratorConfig::new(config))
230    }
231
232    /// Starts streaming generation.
233    ///
234    /// Returns a receiver for stream events and a control handle.
235    pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
236        let (sender, receiver) = stream_channel(
237            self.config.stream_config.buffer_size,
238            self.config.stream_config.backpressure,
239        );
240
241        let control = Arc::new(StreamControl::new());
242        let control_clone = Arc::clone(&control);
243
244        let config = self.config.clone();
245
246        // Spawn generation thread
247        thread::spawn(move || {
248            let result = Self::run_generation(config, sender, control_clone);
249            if let Err(e) = result {
250                warn!("Streaming generation error: {}", e);
251            }
252        });
253
254        Ok((receiver, control))
255    }
256
257    /// Runs the generation process.
258    fn run_generation(
259        config: StreamingOrchestratorConfig,
260        sender: StreamSender<GeneratedItem>,
261        control: Arc<StreamControl>,
262    ) -> SynthResult<()> {
263        let start_time = Instant::now();
264        let mut items_generated: u64 = 0;
265        let mut phases_completed = Vec::new();
266
267        // Track stats
268        let progress_interval = config.stream_config.progress_interval;
269
270        // Send initial progress
271        let mut progress = StreamProgress::new("initializing");
272        sender.send(StreamEvent::Progress(progress.clone()))?;
273
274        for phase in &config.phases {
275            if control.is_cancelled() {
276                info!("Generation cancelled");
277                break;
278            }
279
280            // Handle pause
281            while control.is_paused() {
282                std::thread::sleep(std::time::Duration::from_millis(100));
283                if control.is_cancelled() {
284                    break;
285                }
286            }
287
288            progress.phase = phase.name().to_string();
289            sender.send(StreamEvent::Progress(progress.clone()))?;
290
291            match phase {
292                GenerationPhase::ChartOfAccounts => {
293                    let result =
294                        Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
295                    items_generated += result;
296                }
297                GenerationPhase::MasterData => {
298                    let result = Self::generate_master_data_phase(
299                        &config.generator_config,
300                        &sender,
301                        &control,
302                    )?;
303                    items_generated += result;
304                }
305                GenerationPhase::DocumentFlows => {
306                    let result = Self::generate_document_flows_phase(
307                        &config.generator_config,
308                        &sender,
309                        &control,
310                        progress_interval,
311                        &mut progress,
312                    )?;
313                    items_generated += result;
314                }
315                GenerationPhase::OcpmEvents => {
316                    warn!("OCPM event generation is not yet supported in streaming mode; skipping");
317                }
318                GenerationPhase::JournalEntries => {
319                    let result = Self::generate_journal_entries_phase(
320                        &config.generator_config,
321                        &sender,
322                        &control,
323                        progress_interval,
324                        &mut progress,
325                    )?;
326                    items_generated += result;
327                }
328                GenerationPhase::AnomalyInjection => {
329                    info!("Anomaly injection applied inline during JE generation phase in streaming mode");
330                }
331                GenerationPhase::DataQuality => {
332                    info!(
333                        "Data quality injection is not yet supported in streaming mode; skipping"
334                    );
335                }
336                GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
337                    info!("Phase {:?} is not applicable in streaming mode", phase);
338                }
339            }
340
341            // Send phase completion
342            sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
343                phase.name().to_string(),
344            )))?;
345            phases_completed.push(phase.name().to_string());
346
347            // Update progress
348            progress.items_generated = items_generated;
349            progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
350            if progress.elapsed_ms > 0 {
351                progress.items_per_second =
352                    (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
353            }
354            sender.send(StreamEvent::Progress(progress.clone()))?;
355        }
356
357        // Send completion
358        let stats = sender.stats();
359        let summary = StreamSummary {
360            total_items: items_generated,
361            total_time_ms: start_time.elapsed().as_millis() as u64,
362            avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
363                (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
364            } else {
365                0.0
366            },
367            error_count: 0,
368            dropped_count: stats.items_dropped,
369            peak_memory_mb: None,
370            phases_completed,
371        };
372
373        sender.send(StreamEvent::Complete(summary))?;
374        sender.close();
375
376        Ok(())
377    }
378
379    /// Generates Chart of Accounts phase.
380    fn generate_coa_phase(
381        config: &GeneratorConfig,
382        sender: &StreamSender<GeneratedItem>,
383        control: &Arc<StreamControl>,
384    ) -> SynthResult<u64> {
385        use datasynth_generators::ChartOfAccountsGenerator;
386
387        if control.is_cancelled() {
388            return Ok(0);
389        }
390
391        info!("Generating Chart of Accounts");
392        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
393        let complexity = config.chart_of_accounts.complexity;
394        let industry = config.global.industry;
395        let coa_framework = resolve_coa_framework_from_config(config);
396
397        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
398            .with_coa_framework(coa_framework);
399        let coa = coa_gen.generate();
400
401        let account_count = coa.account_count() as u64;
402        sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
403            coa,
404        ))))?;
405
406        Ok(account_count)
407    }
408
409    /// Generates master data phase.
410    fn generate_master_data_phase(
411        config: &GeneratorConfig,
412        sender: &StreamSender<GeneratedItem>,
413        control: &Arc<StreamControl>,
414    ) -> SynthResult<u64> {
415        use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
416
417        let mut count: u64 = 0;
418        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
419        let md_config = &config.master_data;
420        let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
421            .unwrap_or_else(|e| {
422                tracing::warn!(
423                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
424                    config.global.start_date,
425                    e
426                );
427                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
428            });
429
430        let company_code = config
431            .companies
432            .first()
433            .map(|c| c.code.as_str())
434            .unwrap_or_else(|| {
435                tracing::warn!("No companies configured, defaulting to company code '1000'");
436                "1000"
437            });
438
439        // Generate vendors
440        if control.is_cancelled() {
441            return Ok(count);
442        }
443
444        info!("Generating vendors");
445        let mut vendor_gen = VendorGenerator::new(seed);
446        for _ in 0..md_config.vendors.count {
447            if control.is_cancelled() {
448                break;
449            }
450            let vendor = vendor_gen.generate_vendor(company_code, effective_date);
451            sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
452            count += 1;
453        }
454
455        // Generate customers
456        if control.is_cancelled() {
457            return Ok(count);
458        }
459
460        info!("Generating customers");
461        let mut customer_gen = CustomerGenerator::new(seed + 1);
462        for _ in 0..md_config.customers.count {
463            if control.is_cancelled() {
464                break;
465            }
466            let customer = customer_gen.generate_customer(company_code, effective_date);
467            sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
468                customer,
469            ))))?;
470            count += 1;
471        }
472
473        // Generate employees
474        if control.is_cancelled() {
475            return Ok(count);
476        }
477
478        info!("Generating employees");
479        let mut employee_gen = EmployeeGenerator::new(seed + 4);
480        // Use first department from config, falling back to a default
481        let dept = if let Some(first_custom) = config.departments.custom_departments.first() {
482            datasynth_generators::DepartmentDefinition {
483                code: first_custom.code.clone(),
484                name: first_custom.name.clone(),
485                cost_center: first_custom
486                    .cost_center
487                    .clone()
488                    .unwrap_or_else(|| format!("CC{}", first_custom.code)),
489                headcount: 10,
490                system_roles: vec![],
491                transaction_codes: vec![],
492            }
493        } else {
494            warn!("No departments configured, using default 'General' department");
495            datasynth_generators::DepartmentDefinition {
496                code: "1000".to_string(),
497                name: "General".to_string(),
498                cost_center: "CC1000".to_string(),
499                headcount: 10,
500                system_roles: vec![],
501                transaction_codes: vec![],
502            }
503        };
504        for _ in 0..md_config.employees.count {
505            if control.is_cancelled() {
506                break;
507            }
508            let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
509            sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
510                employee,
511            ))))?;
512            count += 1;
513        }
514
515        Ok(count)
516    }
517
518    /// Generates journal entries phase.
519    ///
520    /// Note: This is a simplified version that generates basic journal entries.
521    /// For full-featured generation with all options, use EnhancedOrchestrator.
522    ///
523    /// When anomaly injection is enabled in config, anomalies are applied inline
524    /// to each batch of generated JEs before streaming them out.
525    fn generate_journal_entries_phase(
526        config: &GeneratorConfig,
527        sender: &StreamSender<GeneratedItem>,
528        control: &Arc<StreamControl>,
529        progress_interval: u64,
530        progress: &mut StreamProgress,
531    ) -> SynthResult<u64> {
532        use datasynth_generators::{
533            AnomalyInjector, AnomalyInjectorConfig, ChartOfAccountsGenerator,
534            EnhancedInjectionConfig, JournalEntryGenerator,
535        };
536        use std::sync::Arc;
537
538        let mut count: u64 = 0;
539        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
540
541        // Calculate total entries to generate based on volume weights
542        let default_monthly = 500;
543        let total_entries: usize = config
544            .companies
545            .iter()
546            .map(|c| {
547                let monthly = (c.volume_weight * default_monthly as f64) as usize;
548                monthly.max(100) * config.global.period_months as usize
549            })
550            .sum();
551
552        progress.items_remaining = Some(total_entries as u64);
553        info!("Generating {} journal entries", total_entries);
554
555        // Generate a shared CoA for all companies
556        let complexity = config.chart_of_accounts.complexity;
557        let industry = config.global.industry;
558        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
559        let coa = Arc::new(coa_gen.generate());
560
561        // Parse start date
562        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
563            .unwrap_or_else(|e| {
564                tracing::warn!(
565                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
566                    config.global.start_date,
567                    e
568                );
569                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
570            });
571        let end_date = start_date
572            .checked_add_months(chrono::Months::new(config.global.period_months))
573            .unwrap_or(start_date + chrono::Duration::days(365));
574
575        // Create JE generator from config
576        let mut je_gen = JournalEntryGenerator::from_generator_config(
577            config,
578            Arc::clone(&coa),
579            start_date,
580            end_date,
581            seed,
582        );
583
584        // Create anomaly injector if enabled.
585        // Priority: anomaly_injection config > fraud config
586        let anomaly_enabled = config.anomaly_injection.enabled || config.fraud.enabled;
587        let mut anomaly_injector = if anomaly_enabled {
588            let total_rate = if config.anomaly_injection.enabled {
589                config.anomaly_injection.rates.total_rate
590            } else {
591                config.fraud.fraud_rate
592            };
593            let fraud_rate = if config.anomaly_injection.enabled {
594                config.anomaly_injection.rates.fraud_rate
595            } else {
596                AnomalyRateConfig::default().fraud_rate
597            };
598            let error_rate = if config.anomaly_injection.enabled {
599                config.anomaly_injection.rates.error_rate
600            } else {
601                AnomalyRateConfig::default().error_rate
602            };
603            let process_issue_rate = if config.anomaly_injection.enabled {
604                config.anomaly_injection.rates.process_rate
605            } else {
606                AnomalyRateConfig::default().process_issue_rate
607            };
608
609            let injector_config = AnomalyInjectorConfig {
610                rates: AnomalyRateConfig {
611                    total_rate,
612                    fraud_rate,
613                    error_rate,
614                    process_issue_rate,
615                    ..Default::default()
616                },
617                enhanced: EnhancedInjectionConfig {
618                    fraud_behavioral_bias: config.fraud.effective_bias().to_core(),
619                    // Persistent fraud campaigns (A1) — off unless config opts in.
620                    fraud_campaign: config.fraud.campaigns.clone(),
621                    ..Default::default()
622                },
623                seed: seed + 5000,
624                ..Default::default()
625            };
626
627            info!(
628                "Anomaly injection enabled for streaming JE phase (total_rate={:.3})",
629                total_rate
630            );
631            Some(AnomalyInjector::new(injector_config))
632        } else {
633            None
634        };
635
636        // Generate JEs in batches when anomaly injection is active,
637        // or one-by-one when it is not.
638        let batch_size: usize = if anomaly_injector.is_some() { 100 } else { 1 };
639        let mut remaining = total_entries;
640
641        while remaining > 0 {
642            if control.is_cancelled() {
643                break;
644            }
645
646            let current_batch = remaining.min(batch_size);
647            let mut batch: Vec<JournalEntry> = Vec::with_capacity(current_batch);
648
649            for _ in 0..current_batch {
650                if control.is_cancelled() {
651                    break;
652                }
653
654                // Handle pause
655                while control.is_paused() {
656                    std::thread::sleep(std::time::Duration::from_millis(100));
657                    if control.is_cancelled() {
658                        break;
659                    }
660                }
661
662                batch.push(je_gen.generate());
663            }
664
665            if batch.is_empty() {
666                break;
667            }
668
669            // Apply anomaly injection to the batch if enabled
670            if let Some(ref mut injector) = anomaly_injector {
671                let result = injector.process_entries(&mut batch);
672
673                // Stream any generated anomaly labels
674                for label in result.labels {
675                    sender.send(StreamEvent::Data(GeneratedItem::AnomalyLabel(Box::new(
676                        label,
677                    ))))?;
678                }
679            }
680
681            // Send the (possibly mutated) JEs
682            for je in batch {
683                sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
684                count += 1;
685
686                // Send progress updates
687                if count.is_multiple_of(progress_interval) {
688                    progress.items_generated = count;
689                    progress.items_remaining = Some(total_entries as u64 - count);
690                    sender.send(StreamEvent::Progress(progress.clone()))?;
691                }
692            }
693
694            remaining = remaining.saturating_sub(current_batch);
695        }
696
697        Ok(count)
698    }
699
700    /// Generates document flows phase (P2P and O2C).
701    ///
702    /// Creates complete document chains:
703    /// - P2P: PurchaseOrder → GoodsReceipt → VendorInvoice → Payment
704    /// - O2C: SalesOrder → Delivery → CustomerInvoice
705    fn generate_document_flows_phase(
706        config: &GeneratorConfig,
707        sender: &StreamSender<GeneratedItem>,
708        control: &Arc<StreamControl>,
709        progress_interval: u64,
710        progress: &mut StreamProgress,
711    ) -> SynthResult<u64> {
712        use chrono::Datelike;
713        use datasynth_generators::{
714            CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
715        };
716
717        let mut count: u64 = 0;
718        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
719        let df_config = &config.document_flows;
720        let md_config = &config.master_data;
721
722        // Parse dates
723        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
724            .unwrap_or_else(|e| {
725                tracing::warn!(
726                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
727                    config.global.start_date,
728                    e
729                );
730                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
731            });
732        let end_date = start_date
733            .checked_add_months(chrono::Months::new(config.global.period_months))
734            .unwrap_or(start_date + chrono::Duration::days(365));
735        let total_period_days = (end_date - start_date).num_days().max(1);
736
737        let company_code = config
738            .companies
739            .first()
740            .map(|c| c.code.as_str())
741            .unwrap_or_else(|| {
742                tracing::warn!("No companies configured, defaulting to company code '1000'");
743                "1000"
744            });
745
746        // Use master data config counts for generating reference data
747        let vendor_count = md_config.vendors.count.min(100);
748        let customer_count = md_config.customers.count.min(100);
749        let material_count = md_config.materials.count.min(50);
750
751        // Generate some master data for document flows
752        let mut vendor_gen = VendorGenerator::new(seed);
753        let mut customer_gen = CustomerGenerator::new(seed + 1);
754        let mut material_gen = MaterialGenerator::new(seed + 2);
755
756        let vendors: Vec<_> = (0..vendor_count)
757            .map(|_| vendor_gen.generate_vendor(company_code, start_date))
758            .collect();
759
760        let customers: Vec<_> = (0..customer_count)
761            .map(|_| customer_gen.generate_customer(company_code, start_date))
762            .collect();
763
764        let materials: Vec<_> = (0..material_count)
765            .map(|_| material_gen.generate_material(company_code, start_date))
766            .collect();
767
768        // Determine number of chains based on transaction volume
769        // Use period months as a multiplier for document chains
770        let base_chains = (config.global.period_months as usize * 50).max(100);
771
772        // P2P Generation
773        if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
774            info!("Generating P2P document flows");
775            let mut p2p_gen = P2PGenerator::new(seed + 100);
776
777            let chains_to_generate = base_chains.min(1000);
778            progress.items_remaining = Some(chains_to_generate as u64);
779
780            for i in 0..chains_to_generate {
781                if control.is_cancelled() {
782                    break;
783                }
784
785                // Handle pause
786                while control.is_paused() {
787                    std::thread::sleep(std::time::Duration::from_millis(100));
788                    if control.is_cancelled() {
789                        break;
790                    }
791                }
792
793                let vendor = &vendors[i % vendors.len()];
794                let material_refs: Vec<&datasynth_core::models::Material> =
795                    vec![&materials[i % materials.len()]];
796
797                // Calculate posting date within the period
798                let days_offset = (i as i64 % total_period_days).max(0);
799                let po_date = start_date + chrono::Duration::days(days_offset);
800                let fiscal_year = po_date.year() as u16;
801                let fiscal_period = po_date.month() as u8;
802
803                let chain = p2p_gen.generate_chain(
804                    company_code,
805                    vendor,
806                    &material_refs,
807                    po_date,
808                    fiscal_year,
809                    fiscal_period,
810                    "SYSTEM",
811                );
812
813                // Send each document in the chain
814                sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
815                    chain.purchase_order,
816                ))))?;
817                count += 1;
818
819                for gr in chain.goods_receipts {
820                    sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
821                    count += 1;
822                }
823
824                if let Some(vi) = chain.vendor_invoice {
825                    sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
826                        vi,
827                    ))))?;
828                    count += 1;
829                }
830
831                if let Some(payment) = chain.payment {
832                    sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
833                    count += 1;
834                }
835
836                if count.is_multiple_of(progress_interval) {
837                    progress.items_generated = count;
838                    sender.send(StreamEvent::Progress(progress.clone()))?;
839                }
840            }
841        }
842
843        // O2C Generation
844        if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
845            info!("Generating O2C document flows");
846            let mut o2c_gen = O2CGenerator::new(seed + 200);
847
848            let chains_to_generate = base_chains.min(1000);
849
850            for i in 0..chains_to_generate {
851                if control.is_cancelled() {
852                    break;
853                }
854
855                while control.is_paused() {
856                    std::thread::sleep(std::time::Duration::from_millis(100));
857                    if control.is_cancelled() {
858                        break;
859                    }
860                }
861
862                let customer = &customers[i % customers.len()];
863                let material_refs: Vec<&datasynth_core::models::Material> =
864                    vec![&materials[i % materials.len()]];
865
866                let days_offset = (i as i64 % total_period_days).max(0);
867                let so_date = start_date + chrono::Duration::days(days_offset);
868                let fiscal_year = so_date.year() as u16;
869                let fiscal_period = so_date.month() as u8;
870
871                let chain = o2c_gen.generate_chain(
872                    company_code,
873                    customer,
874                    &material_refs,
875                    so_date,
876                    fiscal_year,
877                    fiscal_period,
878                    "SYSTEM",
879                );
880
881                sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
882                    chain.sales_order,
883                ))))?;
884                count += 1;
885
886                for delivery in chain.deliveries {
887                    sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
888                        delivery,
889                    ))))?;
890                    count += 1;
891                }
892
893                if let Some(ci) = chain.customer_invoice {
894                    sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
895                        ci,
896                    ))))?;
897                    count += 1;
898                }
899
900                if count.is_multiple_of(progress_interval) {
901                    progress.items_generated = count;
902                    sender.send(StreamEvent::Progress(progress.clone()))?;
903                }
904            }
905        }
906
907        Ok(count)
908    }
909
910    /// Returns the orchestrator configuration stats.
911    pub fn stats(&self) -> StreamingOrchestratorStats {
912        StreamingOrchestratorStats {
913            phases: self.config.phases.len(),
914            buffer_size: self.config.stream_config.buffer_size,
915            backpressure: self.config.stream_config.backpressure,
916        }
917    }
918}
919
/// Statistics for the streaming orchestrator.
///
/// Populated by `StreamingOrchestrator::stats` purely from the orchestrator's
/// configuration; it does not carry any runtime progress counters.
#[derive(Debug, Clone)]
pub struct StreamingOrchestratorStats {
    /// Number of phases configured (length of the configured phase list).
    pub phases: usize,
    /// Buffer size, copied from the stream configuration's `buffer_size`.
    pub buffer_size: usize,
    /// Backpressure strategy, copied from the stream configuration.
    pub backpressure: BackpressureStrategy,
}
930
931/// Resolve CoA framework from a GeneratorConfig.
932fn resolve_coa_framework_from_config(
933    config: &GeneratorConfig,
934) -> datasynth_generators::coa_generator::CoAFramework {
935    use datasynth_generators::coa_generator::CoAFramework;
936    if config.accounting_standards.enabled {
937        match config.accounting_standards.framework {
938            Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
939                return CoAFramework::FrenchPcg;
940            }
941            Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
942                return CoAFramework::GermanSkr04;
943            }
944            _ => {}
945        }
946    }
947    CoAFramework::UsGaap
948}
949
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    /// Baseline retail preset (small CoA, `TenK` volume) shared by the tests.
    fn create_test_config() -> GeneratorConfig {
        create_preset(
            IndustrySector::Retail,
            2,
            3,
            CoAComplexity::Small,
            TransactionVolume::TenK,
        )
    }

    /// Converting a `GeneratorConfig` yields non-empty phase and buffer stats.
    #[test]
    fn test_streaming_orchestrator_creation() {
        let config = create_test_config();
        let orchestrator = StreamingOrchestrator::from_generator_config(config);
        let stats = orchestrator.stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    /// CoA + master-data phases stream data, include a CoA item, and complete.
    #[test]
    fn test_streaming_generation() {
        let mut config = create_test_config();
        // Reduce volume for testing
        config.master_data.vendors.count = 5;
        config.master_data.customers.count = 5;
        config.master_data.employees.count = 5;
        config.global.period_months = 1;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![
                GenerationPhase::ChartOfAccounts,
                GenerationPhase::MasterData,
            ])
            .with_stream_config(StreamConfig {
                buffer_size: 100,
                progress_interval: 10,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut items_count = 0;
        let mut has_coa = false;
        let mut has_completion = false;

        for event in receiver {
            match event {
                StreamEvent::Data(item) => {
                    items_count += 1;
                    if matches!(item, GeneratedItem::ChartOfAccounts(_)) {
                        has_coa = true;
                    }
                }
                StreamEvent::Complete(_) => {
                    has_completion = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(items_count > 0);
        assert!(has_coa);
        assert!(has_completion);
    }

    /// Cancelling via the control handle after 10 data items is observable.
    #[test]
    fn test_stream_cancellation() {
        let mut config = create_test_config();
        config.global.period_months = 12; // Longer generation

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries]);

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, control) = orchestrator.stream().unwrap();

        // Cancel after receiving some items
        let mut items_count = 0;
        for event in receiver {
            if let StreamEvent::Data(_) = event {
                items_count += 1;
                if items_count >= 10 {
                    control.cancel();
                    break;
                }
            }
        }

        // Make the precondition explicit: cancel() must actually have been
        // reached, rather than the stream ending before 10 data items.
        assert_eq!(
            items_count, 10,
            "stream ended before cancellation was triggered"
        );
        assert!(control.is_cancelled());
    }

    /// With a high anomaly rate, journal entries are accompanied by labels.
    #[test]
    fn test_streaming_anomaly_injection() {
        let mut config = create_test_config();
        // Reduce volume for fast testing but keep enough entries for anomalies
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;

        // Enable anomaly injection with a high rate to guarantee some are created
        config.anomaly_injection.enabled = true;
        config.anomaly_injection.rates.total_rate = 0.20; // 20% to ensure hits
        config.anomaly_injection.rates.fraud_rate = 0.40;
        config.anomaly_injection.rates.error_rate = 0.40;
        config.anomaly_injection.rates.process_rate = 0.20;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(StreamConfig {
                buffer_size: 500,
                progress_interval: 50,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut je_count = 0;
        let mut label_count = 0;
        let mut has_completion = false;

        for event in receiver {
            match event {
                StreamEvent::Data(item) => match item {
                    GeneratedItem::JournalEntry(_) => je_count += 1,
                    GeneratedItem::AnomalyLabel(_) => label_count += 1,
                    _ => {}
                },
                StreamEvent::Complete(_) => {
                    has_completion = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(has_completion, "Stream should complete");
        assert!(je_count > 0, "Should generate journal entries");
        assert!(
            label_count > 0,
            "Should generate anomaly labels (got {} JEs, {} labels)",
            je_count,
            label_count
        );
    }

    /// With anomaly injection and fraud disabled, no labels are emitted.
    #[test]
    fn test_streaming_no_anomalies_when_disabled() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 3;
        config.master_data.customers.count = 3;
        config.master_data.employees.count = 3;
        config.global.period_months = 1;

        // Ensure anomaly injection is disabled
        config.anomaly_injection.enabled = false;
        config.fraud.enabled = false;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(StreamConfig {
                buffer_size: 500,
                progress_interval: 50,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut je_count = 0;
        let mut label_count = 0;
        let mut has_completion = false;

        for event in receiver {
            match event {
                StreamEvent::Data(GeneratedItem::JournalEntry(_)) => je_count += 1,
                StreamEvent::Data(GeneratedItem::AnomalyLabel(_)) => label_count += 1,
                StreamEvent::Complete(_) => {
                    has_completion = true;
                    break;
                }
                _ => {}
            }
        }

        // Guard against a vacuous pass: "zero labels" only means something if
        // the stream actually ran to completion and produced journal entries.
        assert!(has_completion, "Stream should complete");
        assert!(je_count > 0, "Should generate journal entries");
        assert_eq!(
            label_count, 0,
            "Should not generate anomaly labels when disabled"
        );
    }

    /// `type_name` returns the expected snake_case tag for sample variants.
    #[test]
    fn test_generated_item_type_name() {
        // CoAComplexity / IndustrySector come from the module-level import.
        let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        )));
        assert_eq!(coa.type_name(), "chart_of_accounts");

        let progress = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress.type_name(), "progress");
    }
}