Skip to main content

datasynth_runtime/
streaming_orchestrator.rs

1//! Streaming orchestrator for real-time data generation.
2//!
3//! This orchestrator provides streaming capabilities with backpressure,
4//! progress reporting, and control for real-time data generation.
5//!
6//! # Limitations — simplified pipeline
7//!
8//! This module implements a **simplified streaming pipeline** intended for
9//! interactive / server-side use cases where data is consumed as it is produced.
10//! It differs from the full-batch [`EnhancedOrchestrator`] in the following ways:
11//!
12//! - **Subset of generators**: Only journal entries, master data (vendors, customers,
13//!   materials, employees), document-flow documents (POs, GRs, invoices, payments,
14//!   sales orders, deliveries), and anomaly labels are streamed.  Advanced generators
15//!   (subledgers, period-close, audit, tax, ESG, banking, graph export, etc.) are
16//!   **not** invoked.
17//!
18//! - **No cross-generator state**: The streaming pipeline does not carry forward
19//!   balance tracker state, trial-balance snapshots, or depreciation runs between
20//!   periods.  Outputs are therefore not coherent across the full accounting cycle.
21//!
22//! - **No output sinks**: Records are emitted directly over the [`StreamSender`]
23//!   channel; CSV / JSON / Parquet file writing is the responsibility of the consumer.
24//!
25//! - **Backpressure model**: The orchestrator honours the [`BackpressureStrategy`]
26//!   configured on the [`StreamConfig`], but does not implement adaptive throttling
27//!   based on CPU / memory pressure (unlike the resource-guarded batch path).
28//!
29//! For full-fidelity, standards-compliant generation use
30//! [`EnhancedOrchestrator`](crate::enhanced_orchestrator::EnhancedOrchestrator) with
31//! a file-based output sink.
32//!
33//! [`EnhancedOrchestrator`]: crate::enhanced_orchestrator::EnhancedOrchestrator
34
35use std::sync::Arc;
36use std::thread;
37use std::time::Instant;
38
39use chrono::NaiveDate;
40use tracing::{info, warn};
41
42use datasynth_config::schema::GeneratorConfig;
43
/// Fallback RNG seed used by the generation phases whenever `config.global.seed` is `None`.
const DEFAULT_SEED: u64 = 42;
46use datasynth_core::error::SynthResult;
47use datasynth_core::models::{
48    documents::{
49        CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
50    },
51    AnomalyRateConfig, ChartOfAccounts, Customer, Employee, JournalEntry, LabeledAnomaly, Material,
52    Vendor,
53};
54use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
55use datasynth_core::traits::{
56    BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
57};
58
59/// Generated items that can be streamed.
60#[derive(Debug, Clone)]
61pub enum GeneratedItem {
62    /// Chart of Accounts.
63    ChartOfAccounts(Box<ChartOfAccounts>),
64    /// A vendor.
65    Vendor(Box<Vendor>),
66    /// A customer.
67    Customer(Box<Customer>),
68    /// A material.
69    Material(Box<Material>),
70    /// An employee.
71    Employee(Box<Employee>),
72    /// A journal entry.
73    JournalEntry(Box<JournalEntry>),
74    /// A purchase order (P2P).
75    PurchaseOrder(Box<PurchaseOrder>),
76    /// A goods receipt (P2P).
77    GoodsReceipt(Box<GoodsReceipt>),
78    /// A vendor invoice (P2P).
79    VendorInvoice(Box<VendorInvoice>),
80    /// A payment (P2P/O2C).
81    Payment(Box<Payment>),
82    /// A sales order (O2C).
83    SalesOrder(Box<SalesOrder>),
84    /// A delivery (O2C).
85    Delivery(Box<Delivery>),
86    /// A customer invoice (O2C).
87    CustomerInvoice(Box<CustomerInvoice>),
88    /// An anomaly label (injected during JE generation).
89    AnomalyLabel(Box<LabeledAnomaly>),
90    /// Progress update.
91    Progress(StreamProgress),
92    /// Phase completion marker.
93    PhaseComplete(String),
94}
95
96impl GeneratedItem {
97    /// Returns the item type name.
98    pub fn type_name(&self) -> &'static str {
99        match self {
100            GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
101            GeneratedItem::Vendor(_) => "vendor",
102            GeneratedItem::Customer(_) => "customer",
103            GeneratedItem::Material(_) => "material",
104            GeneratedItem::Employee(_) => "employee",
105            GeneratedItem::JournalEntry(_) => "journal_entry",
106            GeneratedItem::PurchaseOrder(_) => "purchase_order",
107            GeneratedItem::GoodsReceipt(_) => "goods_receipt",
108            GeneratedItem::VendorInvoice(_) => "vendor_invoice",
109            GeneratedItem::Payment(_) => "payment",
110            GeneratedItem::SalesOrder(_) => "sales_order",
111            GeneratedItem::Delivery(_) => "delivery",
112            GeneratedItem::CustomerInvoice(_) => "customer_invoice",
113            GeneratedItem::AnomalyLabel(_) => "anomaly_label",
114            GeneratedItem::Progress(_) => "progress",
115            GeneratedItem::PhaseComplete(_) => "phase_complete",
116        }
117    }
118}
119
/// A stage of the generation pipeline that can be requested from the orchestrator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GenerationPhase {
    /// Builds the chart of accounts.
    ChartOfAccounts,
    /// Builds master data (vendors, customers, etc.).
    MasterData,
    /// Builds P2P and O2C document flows.
    DocumentFlows,
    /// Builds the OCPM event log.
    OcpmEvents,
    /// Builds journal entries.
    JournalEntries,
    /// Injects anomalies.
    AnomalyInjection,
    /// Validates balances.
    BalanceValidation,
    /// Injects data-quality issues.
    DataQuality,
    /// Terminal marker: generation is complete.
    Complete,
}

impl GenerationPhase {
    /// Returns the snake_case name of this phase.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::ChartOfAccounts => "chart_of_accounts",
            Self::MasterData => "master_data",
            Self::DocumentFlows => "document_flows",
            Self::OcpmEvents => "ocpm_events",
            Self::JournalEntries => "journal_entries",
            Self::AnomalyInjection => "anomaly_injection",
            Self::BalanceValidation => "balance_validation",
            Self::DataQuality => "data_quality",
            Self::Complete => "complete",
        }
    }
}
159
160/// Configuration for streaming orchestration.
161#[derive(Debug, Clone)]
162pub struct StreamingOrchestratorConfig {
163    /// Generator configuration.
164    pub generator_config: GeneratorConfig,
165    /// Stream configuration.
166    pub stream_config: StreamConfig,
167    /// Phases to execute.
168    pub phases: Vec<GenerationPhase>,
169}
170
171impl StreamingOrchestratorConfig {
172    /// Creates a new streaming orchestrator configuration.
173    pub fn new(generator_config: GeneratorConfig) -> Self {
174        Self {
175            generator_config,
176            stream_config: StreamConfig::default(),
177            phases: vec![
178                GenerationPhase::ChartOfAccounts,
179                GenerationPhase::MasterData,
180                GenerationPhase::DocumentFlows,
181                GenerationPhase::JournalEntries,
182            ],
183        }
184    }
185
186    /// Creates a configuration with all phases enabled including OCPM.
187    pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
188        Self {
189            generator_config,
190            stream_config: StreamConfig::default(),
191            phases: vec![
192                GenerationPhase::ChartOfAccounts,
193                GenerationPhase::MasterData,
194                GenerationPhase::DocumentFlows,
195                GenerationPhase::OcpmEvents,
196                GenerationPhase::JournalEntries,
197                GenerationPhase::AnomalyInjection,
198                GenerationPhase::DataQuality,
199            ],
200        }
201    }
202
203    /// Sets the stream configuration.
204    pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
205        self.stream_config = config;
206        self
207    }
208
209    /// Sets the phases to execute.
210    pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
211        self.phases = phases;
212        self
213    }
214}
215
216/// Streaming orchestrator for real-time data generation.
217pub struct StreamingOrchestrator {
218    config: StreamingOrchestratorConfig,
219}
220
221impl StreamingOrchestrator {
222    /// Creates a new streaming orchestrator.
223    pub fn new(config: StreamingOrchestratorConfig) -> Self {
224        Self { config }
225    }
226
227    /// Creates a streaming orchestrator from generator config with defaults.
228    pub fn from_generator_config(config: GeneratorConfig) -> Self {
229        Self::new(StreamingOrchestratorConfig::new(config))
230    }
231
232    /// Starts streaming generation.
233    ///
234    /// Returns a receiver for stream events and a control handle.
235    pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
236        let (sender, receiver) = stream_channel(
237            self.config.stream_config.buffer_size,
238            self.config.stream_config.backpressure,
239        );
240
241        let control = Arc::new(StreamControl::new());
242        let control_clone = Arc::clone(&control);
243
244        let config = self.config.clone();
245
246        // Spawn generation thread
247        thread::spawn(move || {
248            let result = Self::run_generation(config, sender, control_clone);
249            if let Err(e) = result {
250                warn!("Streaming generation error: {}", e);
251            }
252        });
253
254        Ok((receiver, control))
255    }
256
257    /// Runs the generation process.
258    fn run_generation(
259        config: StreamingOrchestratorConfig,
260        sender: StreamSender<GeneratedItem>,
261        control: Arc<StreamControl>,
262    ) -> SynthResult<()> {
263        let start_time = Instant::now();
264        let mut items_generated: u64 = 0;
265        let mut phases_completed = Vec::new();
266
267        // Track stats
268        let progress_interval = config.stream_config.progress_interval;
269
270        // Send initial progress
271        let mut progress = StreamProgress::new("initializing");
272        sender.send(StreamEvent::Progress(progress.clone()))?;
273
274        for phase in &config.phases {
275            if control.is_cancelled() {
276                info!("Generation cancelled");
277                break;
278            }
279
280            // Handle pause
281            while control.is_paused() {
282                std::thread::sleep(std::time::Duration::from_millis(100));
283                if control.is_cancelled() {
284                    break;
285                }
286            }
287
288            progress.phase = phase.name().to_string();
289            sender.send(StreamEvent::Progress(progress.clone()))?;
290
291            match phase {
292                GenerationPhase::ChartOfAccounts => {
293                    let result =
294                        Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
295                    items_generated += result;
296                }
297                GenerationPhase::MasterData => {
298                    let result = Self::generate_master_data_phase(
299                        &config.generator_config,
300                        &sender,
301                        &control,
302                    )?;
303                    items_generated += result;
304                }
305                GenerationPhase::DocumentFlows => {
306                    let result = Self::generate_document_flows_phase(
307                        &config.generator_config,
308                        &sender,
309                        &control,
310                        progress_interval,
311                        &mut progress,
312                    )?;
313                    items_generated += result;
314                }
315                GenerationPhase::OcpmEvents => {
316                    warn!("OCPM event generation is not yet supported in streaming mode; skipping");
317                }
318                GenerationPhase::JournalEntries => {
319                    let result = Self::generate_journal_entries_phase(
320                        &config.generator_config,
321                        &sender,
322                        &control,
323                        progress_interval,
324                        &mut progress,
325                    )?;
326                    items_generated += result;
327                }
328                GenerationPhase::AnomalyInjection => {
329                    info!("Anomaly injection applied inline during JE generation phase in streaming mode");
330                }
331                GenerationPhase::DataQuality => {
332                    info!(
333                        "Data quality injection is not yet supported in streaming mode; skipping"
334                    );
335                }
336                GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
337                    info!("Phase {:?} is not applicable in streaming mode", phase);
338                }
339            }
340
341            // Send phase completion
342            sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
343                phase.name().to_string(),
344            )))?;
345            phases_completed.push(phase.name().to_string());
346
347            // Update progress
348            progress.items_generated = items_generated;
349            progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
350            if progress.elapsed_ms > 0 {
351                progress.items_per_second =
352                    (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
353            }
354            sender.send(StreamEvent::Progress(progress.clone()))?;
355        }
356
357        // Send completion
358        let stats = sender.stats();
359        let summary = StreamSummary {
360            total_items: items_generated,
361            total_time_ms: start_time.elapsed().as_millis() as u64,
362            avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
363                (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
364            } else {
365                0.0
366            },
367            error_count: 0,
368            dropped_count: stats.items_dropped,
369            peak_memory_mb: None,
370            phases_completed,
371        };
372
373        sender.send(StreamEvent::Complete(summary))?;
374        sender.close();
375
376        Ok(())
377    }
378
379    /// Generates Chart of Accounts phase.
380    fn generate_coa_phase(
381        config: &GeneratorConfig,
382        sender: &StreamSender<GeneratedItem>,
383        control: &Arc<StreamControl>,
384    ) -> SynthResult<u64> {
385        use datasynth_generators::ChartOfAccountsGenerator;
386
387        if control.is_cancelled() {
388            return Ok(0);
389        }
390
391        info!("Generating Chart of Accounts");
392        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
393        let complexity = config.chart_of_accounts.complexity;
394        let industry = config.global.industry;
395        let coa_framework = resolve_coa_framework_from_config(config);
396
397        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
398            .with_coa_framework(coa_framework);
399        let coa = coa_gen.generate();
400
401        let account_count = coa.account_count() as u64;
402        sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
403            coa,
404        ))))?;
405
406        Ok(account_count)
407    }
408
409    /// Generates master data phase.
410    fn generate_master_data_phase(
411        config: &GeneratorConfig,
412        sender: &StreamSender<GeneratedItem>,
413        control: &Arc<StreamControl>,
414    ) -> SynthResult<u64> {
415        use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
416
417        let mut count: u64 = 0;
418        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
419        let md_config = &config.master_data;
420        let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
421            .unwrap_or_else(|e| {
422                tracing::warn!(
423                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
424                    config.global.start_date,
425                    e
426                );
427                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
428            });
429
430        let company_code = config
431            .companies
432            .first()
433            .map(|c| c.code.as_str())
434            .unwrap_or_else(|| {
435                tracing::warn!("No companies configured, defaulting to company code '1000'");
436                "1000"
437            });
438
439        // Generate vendors
440        if control.is_cancelled() {
441            return Ok(count);
442        }
443
444        info!("Generating vendors");
445        let mut vendor_gen = VendorGenerator::new(seed);
446        for _ in 0..md_config.vendors.count {
447            if control.is_cancelled() {
448                break;
449            }
450            let vendor = vendor_gen.generate_vendor(company_code, effective_date);
451            sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
452            count += 1;
453        }
454
455        // Generate customers
456        if control.is_cancelled() {
457            return Ok(count);
458        }
459
460        info!("Generating customers");
461        let mut customer_gen = CustomerGenerator::new(seed + 1);
462        for _ in 0..md_config.customers.count {
463            if control.is_cancelled() {
464                break;
465            }
466            let customer = customer_gen.generate_customer(company_code, effective_date);
467            sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
468                customer,
469            ))))?;
470            count += 1;
471        }
472
473        // Generate employees
474        if control.is_cancelled() {
475            return Ok(count);
476        }
477
478        info!("Generating employees");
479        let mut employee_gen = EmployeeGenerator::new(seed + 4);
480        // Use first department from config, falling back to a default
481        let dept = if let Some(first_custom) = config.departments.custom_departments.first() {
482            datasynth_generators::DepartmentDefinition {
483                code: first_custom.code.clone(),
484                name: first_custom.name.clone(),
485                cost_center: first_custom
486                    .cost_center
487                    .clone()
488                    .unwrap_or_else(|| format!("CC{}", first_custom.code)),
489                headcount: 10,
490                system_roles: vec![],
491                transaction_codes: vec![],
492            }
493        } else {
494            warn!("No departments configured, using default 'General' department");
495            datasynth_generators::DepartmentDefinition {
496                code: "1000".to_string(),
497                name: "General".to_string(),
498                cost_center: "CC1000".to_string(),
499                headcount: 10,
500                system_roles: vec![],
501                transaction_codes: vec![],
502            }
503        };
504        for _ in 0..md_config.employees.count {
505            if control.is_cancelled() {
506                break;
507            }
508            let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
509            sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
510                employee,
511            ))))?;
512            count += 1;
513        }
514
515        Ok(count)
516    }
517
518    /// Generates journal entries phase.
519    ///
520    /// Note: This is a simplified version that generates basic journal entries.
521    /// For full-featured generation with all options, use EnhancedOrchestrator.
522    ///
523    /// When anomaly injection is enabled in config, anomalies are applied inline
524    /// to each batch of generated JEs before streaming them out.
525    fn generate_journal_entries_phase(
526        config: &GeneratorConfig,
527        sender: &StreamSender<GeneratedItem>,
528        control: &Arc<StreamControl>,
529        progress_interval: u64,
530        progress: &mut StreamProgress,
531    ) -> SynthResult<u64> {
532        use datasynth_generators::{
533            AnomalyInjector, AnomalyInjectorConfig, ChartOfAccountsGenerator,
534            EnhancedInjectionConfig, JournalEntryGenerator,
535        };
536        use std::sync::Arc;
537
538        let mut count: u64 = 0;
539        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
540
541        // Calculate total entries to generate based on volume weights
542        let default_monthly = 500;
543        let total_entries: usize = config
544            .companies
545            .iter()
546            .map(|c| {
547                let monthly = (c.volume_weight * default_monthly as f64) as usize;
548                monthly.max(100) * config.global.period_months as usize
549            })
550            .sum();
551
552        progress.items_remaining = Some(total_entries as u64);
553        info!("Generating {} journal entries", total_entries);
554
555        // Generate a shared CoA for all companies
556        let complexity = config.chart_of_accounts.complexity;
557        let industry = config.global.industry;
558        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
559        let coa = Arc::new(coa_gen.generate());
560
561        // Parse start date
562        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
563            .unwrap_or_else(|e| {
564                tracing::warn!(
565                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
566                    config.global.start_date,
567                    e
568                );
569                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
570            });
571        let end_date = start_date
572            .checked_add_months(chrono::Months::new(config.global.period_months))
573            .unwrap_or(start_date + chrono::Duration::days(365));
574
575        // Create JE generator from config
576        let mut je_gen = JournalEntryGenerator::from_generator_config(
577            config,
578            Arc::clone(&coa),
579            start_date,
580            end_date,
581            seed,
582        );
583        // P2 (multi-currency): per-entity functional currency on the document header.
584        let company_currencies: std::collections::HashMap<String, String> = config
585            .companies
586            .iter()
587            .map(|c| {
588                (
589                    c.code.clone(),
590                    c.functional_currency
591                        .clone()
592                        .unwrap_or_else(|| c.currency.clone()),
593                )
594            })
595            .collect();
596        je_gen = je_gen.with_company_currencies(company_currencies);
597
598        // Create anomaly injector if enabled.
599        // Priority: anomaly_injection config > fraud config
600        let anomaly_enabled = config.anomaly_injection.enabled || config.fraud.enabled;
601        let mut anomaly_injector = if anomaly_enabled {
602            let total_rate = if config.anomaly_injection.enabled {
603                config.anomaly_injection.rates.total_rate
604            } else {
605                config.fraud.fraud_rate
606            };
607            let fraud_rate = if config.anomaly_injection.enabled {
608                config.anomaly_injection.rates.fraud_rate
609            } else {
610                AnomalyRateConfig::default().fraud_rate
611            };
612            let error_rate = if config.anomaly_injection.enabled {
613                config.anomaly_injection.rates.error_rate
614            } else {
615                AnomalyRateConfig::default().error_rate
616            };
617            let process_issue_rate = if config.anomaly_injection.enabled {
618                config.anomaly_injection.rates.process_rate
619            } else {
620                AnomalyRateConfig::default().process_issue_rate
621            };
622
623            let injector_config = AnomalyInjectorConfig {
624                rates: AnomalyRateConfig {
625                    total_rate,
626                    fraud_rate,
627                    error_rate,
628                    process_issue_rate,
629                    ..Default::default()
630                },
631                enhanced: EnhancedInjectionConfig {
632                    fraud_behavioral_bias: config.fraud.effective_bias().to_core(),
633                    // Persistent fraud campaigns (A1) — off unless config opts in.
634                    fraud_campaign: config.fraud.campaigns.clone(),
635                    ..Default::default()
636                },
637                seed: seed + 5000,
638                ..Default::default()
639            };
640
641            info!(
642                "Anomaly injection enabled for streaming JE phase (total_rate={:.3})",
643                total_rate
644            );
645            Some(AnomalyInjector::new(injector_config))
646        } else {
647            None
648        };
649
650        // Generate JEs in batches when anomaly injection is active,
651        // or one-by-one when it is not.
652        let batch_size: usize = if anomaly_injector.is_some() { 100 } else { 1 };
653        let mut remaining = total_entries;
654
655        while remaining > 0 {
656            if control.is_cancelled() {
657                break;
658            }
659
660            let current_batch = remaining.min(batch_size);
661            let mut batch: Vec<JournalEntry> = Vec::with_capacity(current_batch);
662
663            for _ in 0..current_batch {
664                if control.is_cancelled() {
665                    break;
666                }
667
668                // Handle pause
669                while control.is_paused() {
670                    std::thread::sleep(std::time::Duration::from_millis(100));
671                    if control.is_cancelled() {
672                        break;
673                    }
674                }
675
676                batch.push(je_gen.generate());
677            }
678
679            if batch.is_empty() {
680                break;
681            }
682
683            // Apply anomaly injection to the batch if enabled
684            if let Some(ref mut injector) = anomaly_injector {
685                let result = injector.process_entries(&mut batch);
686
687                // Stream any generated anomaly labels
688                for label in result.labels {
689                    sender.send(StreamEvent::Data(GeneratedItem::AnomalyLabel(Box::new(
690                        label,
691                    ))))?;
692                }
693            }
694
695            // Send the (possibly mutated) JEs
696            for je in batch {
697                sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
698                count += 1;
699
700                // Send progress updates
701                if count.is_multiple_of(progress_interval) {
702                    progress.items_generated = count;
703                    progress.items_remaining = Some(total_entries as u64 - count);
704                    sender.send(StreamEvent::Progress(progress.clone()))?;
705                }
706            }
707
708            remaining = remaining.saturating_sub(current_batch);
709        }
710
711        Ok(count)
712    }
713
714    /// Generates document flows phase (P2P and O2C).
715    ///
716    /// Creates complete document chains:
717    /// - P2P: PurchaseOrder → GoodsReceipt → VendorInvoice → Payment
718    /// - O2C: SalesOrder → Delivery → CustomerInvoice
719    fn generate_document_flows_phase(
720        config: &GeneratorConfig,
721        sender: &StreamSender<GeneratedItem>,
722        control: &Arc<StreamControl>,
723        progress_interval: u64,
724        progress: &mut StreamProgress,
725    ) -> SynthResult<u64> {
726        use chrono::Datelike;
727        use datasynth_generators::{
728            CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
729        };
730
731        let mut count: u64 = 0;
732        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
733        let df_config = &config.document_flows;
734        let md_config = &config.master_data;
735
736        // Parse dates
737        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
738            .unwrap_or_else(|e| {
739                tracing::warn!(
740                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
741                    config.global.start_date,
742                    e
743                );
744                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
745            });
746        let end_date = start_date
747            .checked_add_months(chrono::Months::new(config.global.period_months))
748            .unwrap_or(start_date + chrono::Duration::days(365));
749        let total_period_days = (end_date - start_date).num_days().max(1);
750
751        let company_code = config
752            .companies
753            .first()
754            .map(|c| c.code.as_str())
755            .unwrap_or_else(|| {
756                tracing::warn!("No companies configured, defaulting to company code '1000'");
757                "1000"
758            });
759
760        // Use master data config counts for generating reference data
761        let vendor_count = md_config.vendors.count.min(100);
762        let customer_count = md_config.customers.count.min(100);
763        let material_count = md_config.materials.count.min(50);
764
765        // Generate some master data for document flows
766        let mut vendor_gen = VendorGenerator::new(seed);
767        let mut customer_gen = CustomerGenerator::new(seed + 1);
768        let mut material_gen = MaterialGenerator::new(seed + 2);
769
770        let vendors: Vec<_> = (0..vendor_count)
771            .map(|_| vendor_gen.generate_vendor(company_code, start_date))
772            .collect();
773
774        let customers: Vec<_> = (0..customer_count)
775            .map(|_| customer_gen.generate_customer(company_code, start_date))
776            .collect();
777
778        let materials: Vec<_> = (0..material_count)
779            .map(|_| material_gen.generate_material(company_code, start_date))
780            .collect();
781
782        // Determine number of chains based on transaction volume
783        // Use period months as a multiplier for document chains
784        let base_chains = (config.global.period_months as usize * 50).max(100);
785
786        // P2P Generation
787        if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
788            info!("Generating P2P document flows");
789            let mut p2p_gen = P2PGenerator::new(seed + 100);
790
791            let chains_to_generate = base_chains.min(1000);
792            progress.items_remaining = Some(chains_to_generate as u64);
793
794            for i in 0..chains_to_generate {
795                if control.is_cancelled() {
796                    break;
797                }
798
799                // Handle pause
800                while control.is_paused() {
801                    std::thread::sleep(std::time::Duration::from_millis(100));
802                    if control.is_cancelled() {
803                        break;
804                    }
805                }
806
807                let vendor = &vendors[i % vendors.len()];
808                let material_refs: Vec<&datasynth_core::models::Material> =
809                    vec![&materials[i % materials.len()]];
810
811                // Calculate posting date within the period
812                let days_offset = (i as i64 % total_period_days).max(0);
813                let po_date = start_date + chrono::Duration::days(days_offset);
814                let fiscal_year = po_date.year() as u16;
815                let fiscal_period = po_date.month() as u8;
816
817                let chain = p2p_gen.generate_chain(
818                    company_code,
819                    vendor,
820                    &material_refs,
821                    po_date,
822                    fiscal_year,
823                    fiscal_period,
824                    "SYSTEM",
825                );
826
827                // Send each document in the chain
828                sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
829                    chain.purchase_order,
830                ))))?;
831                count += 1;
832
833                for gr in chain.goods_receipts {
834                    sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
835                    count += 1;
836                }
837
838                if let Some(vi) = chain.vendor_invoice {
839                    sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
840                        vi,
841                    ))))?;
842                    count += 1;
843                }
844
845                if let Some(payment) = chain.payment {
846                    sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
847                    count += 1;
848                }
849
850                if count.is_multiple_of(progress_interval) {
851                    progress.items_generated = count;
852                    sender.send(StreamEvent::Progress(progress.clone()))?;
853                }
854            }
855        }
856
857        // O2C Generation
858        if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
859            info!("Generating O2C document flows");
860            let mut o2c_gen = O2CGenerator::new(seed + 200);
861
862            let chains_to_generate = base_chains.min(1000);
863
864            for i in 0..chains_to_generate {
865                if control.is_cancelled() {
866                    break;
867                }
868
869                while control.is_paused() {
870                    std::thread::sleep(std::time::Duration::from_millis(100));
871                    if control.is_cancelled() {
872                        break;
873                    }
874                }
875
876                let customer = &customers[i % customers.len()];
877                let material_refs: Vec<&datasynth_core::models::Material> =
878                    vec![&materials[i % materials.len()]];
879
880                let days_offset = (i as i64 % total_period_days).max(0);
881                let so_date = start_date + chrono::Duration::days(days_offset);
882                let fiscal_year = so_date.year() as u16;
883                let fiscal_period = so_date.month() as u8;
884
885                let chain = o2c_gen.generate_chain(
886                    company_code,
887                    customer,
888                    &material_refs,
889                    so_date,
890                    fiscal_year,
891                    fiscal_period,
892                    "SYSTEM",
893                );
894
895                sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
896                    chain.sales_order,
897                ))))?;
898                count += 1;
899
900                for delivery in chain.deliveries {
901                    sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
902                        delivery,
903                    ))))?;
904                    count += 1;
905                }
906
907                if let Some(ci) = chain.customer_invoice {
908                    sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
909                        ci,
910                    ))))?;
911                    count += 1;
912                }
913
914                if count.is_multiple_of(progress_interval) {
915                    progress.items_generated = count;
916                    sender.send(StreamEvent::Progress(progress.clone()))?;
917                }
918            }
919        }
920
921        Ok(count)
922    }
923
924    /// Returns the orchestrator configuration stats.
925    pub fn stats(&self) -> StreamingOrchestratorStats {
926        StreamingOrchestratorStats {
927            phases: self.config.phases.len(),
928            buffer_size: self.config.stream_config.buffer_size,
929            backpressure: self.config.stream_config.backpressure,
930        }
931    }
932}
933
934/// Statistics for the streaming orchestrator.
935#[derive(Debug, Clone)]
936pub struct StreamingOrchestratorStats {
937    /// Number of phases configured.
938    pub phases: usize,
939    /// Buffer size.
940    pub buffer_size: usize,
941    /// Backpressure strategy.
942    pub backpressure: BackpressureStrategy,
943}
944
945/// Resolve CoA framework from a GeneratorConfig.
946fn resolve_coa_framework_from_config(
947    config: &GeneratorConfig,
948) -> datasynth_generators::coa_generator::CoAFramework {
949    use datasynth_generators::coa_generator::CoAFramework;
950    if config.accounting_standards.enabled {
951        match config.accounting_standards.framework {
952            Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
953                return CoAFramework::FrenchPcg;
954            }
955            Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
956                return CoAFramework::GermanSkr04;
957            }
958            _ => {}
959        }
960    }
961    CoAFramework::UsGaap
962}
963
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    /// Builds the retail preset that every test in this module starts from.
    fn create_test_config() -> GeneratorConfig {
        create_preset(
            IndustrySector::Retail,
            2,
            3,
            CoAComplexity::Small,
            TransactionVolume::TenK,
        )
    }

    /// An orchestrator built from a generator config reports at least one
    /// phase and a non-zero buffer size.
    #[test]
    fn test_streaming_orchestrator_creation() {
        let stats = StreamingOrchestrator::from_generator_config(create_test_config()).stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    /// Streaming the chart-of-accounts and master-data phases yields data
    /// items, including a chart of accounts, followed by a completion event.
    #[test]
    fn test_streaming_generation() {
        let mut config = create_test_config();
        // Keep the run small so the test stays fast.
        let master_data = &mut config.master_data;
        master_data.vendors.count = 5;
        master_data.customers.count = 5;
        master_data.employees.count = 5;
        config.global.period_months = 1;

        let stream_config = StreamConfig {
            buffer_size: 100,
            progress_interval: 10,
            ..Default::default()
        };
        let phases = vec![
            GenerationPhase::ChartOfAccounts,
            GenerationPhase::MasterData,
        ];
        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(phases)
            .with_stream_config(stream_config);

        let (receiver, _control) = StreamingOrchestrator::new(streaming_config)
            .stream()
            .unwrap();

        let mut data_items = 0;
        let mut saw_coa = false;
        let mut completed = false;

        for event in receiver {
            match event {
                StreamEvent::Data(GeneratedItem::ChartOfAccounts(_)) => {
                    data_items += 1;
                    saw_coa = true;
                }
                StreamEvent::Data(_) => data_items += 1,
                StreamEvent::Complete(_) => {
                    completed = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(data_items > 0);
        assert!(saw_coa);
        assert!(completed);
    }

    /// Cancelling through the control handle after ten data items leaves the
    /// handle in the cancelled state.
    #[test]
    fn test_stream_cancellation() {
        let mut config = create_test_config();
        // A full year gives the generator plenty of work to be interrupted.
        config.global.period_months = 12;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries]);

        let (receiver, control) = StreamingOrchestrator::new(streaming_config)
            .stream()
            .unwrap();

        // Only data events count towards the cancellation threshold.
        let mut received = 0;
        for event in receiver {
            if !matches!(event, StreamEvent::Data(_)) {
                continue;
            }
            received += 1;
            if received >= 10 {
                control.cancel();
                break;
            }
        }

        assert!(control.is_cancelled());
    }

    /// With anomaly injection enabled at a high rate, the journal-entry phase
    /// emits both journal entries and anomaly labels before completing.
    #[test]
    fn test_streaming_anomaly_injection() {
        let mut config = create_test_config();
        // Small master data for speed; one month still yields enough entries.
        let master_data = &mut config.master_data;
        master_data.vendors.count = 3;
        master_data.customers.count = 3;
        master_data.employees.count = 3;
        config.global.period_months = 1;

        // A 20% total rate makes it all but certain that some labels appear.
        config.anomaly_injection.enabled = true;
        let rates = &mut config.anomaly_injection.rates;
        rates.total_rate = 0.20;
        rates.fraud_rate = 0.40;
        rates.error_rate = 0.40;
        rates.process_rate = 0.20;

        let stream_config = StreamConfig {
            buffer_size: 500,
            progress_interval: 50,
            ..Default::default()
        };
        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(stream_config);

        let (receiver, _control) = StreamingOrchestrator::new(streaming_config)
            .stream()
            .unwrap();

        let mut journal_entries = 0;
        let mut anomaly_labels = 0;
        let mut completed = false;

        for event in receiver {
            match event {
                StreamEvent::Data(GeneratedItem::JournalEntry(_)) => journal_entries += 1,
                StreamEvent::Data(GeneratedItem::AnomalyLabel(_)) => anomaly_labels += 1,
                StreamEvent::Complete(_) => {
                    completed = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(completed, "Stream should complete");
        assert!(journal_entries > 0, "Should generate journal entries");
        assert!(
            anomaly_labels > 0,
            "Should generate anomaly labels (got {} JEs, {} labels)",
            journal_entries,
            anomaly_labels
        );
    }

    /// With anomaly injection and fraud both disabled, the journal-entry
    /// phase emits no anomaly labels.
    #[test]
    fn test_streaming_no_anomalies_when_disabled() {
        let mut config = create_test_config();
        let master_data = &mut config.master_data;
        master_data.vendors.count = 3;
        master_data.customers.count = 3;
        master_data.employees.count = 3;
        config.global.period_months = 1;

        // Switch off both sources of anomalies.
        config.anomaly_injection.enabled = false;
        config.fraud.enabled = false;

        let stream_config = StreamConfig {
            buffer_size: 500,
            progress_interval: 50,
            ..Default::default()
        };
        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries])
            .with_stream_config(stream_config);

        let (receiver, _control) = StreamingOrchestrator::new(streaming_config)
            .stream()
            .unwrap();

        let mut anomaly_labels = 0;

        for event in receiver {
            if matches!(event, StreamEvent::Complete(_)) {
                break;
            }
            if matches!(event, StreamEvent::Data(GeneratedItem::AnomalyLabel(_))) {
                anomaly_labels += 1;
            }
        }

        assert_eq!(
            anomaly_labels, 0,
            "Should not generate anomaly labels when disabled"
        );
    }

    /// `type_name` returns the expected identifier for the chart-of-accounts
    /// and progress item variants.
    #[test]
    fn test_generated_item_type_name() {
        let chart = ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        );
        let coa_item = GeneratedItem::ChartOfAccounts(Box::new(chart));
        assert_eq!(coa_item.type_name(), "chart_of_accounts");

        let progress_item = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress_item.type_name(), "progress");
    }
}