Skip to main content

datasynth_runtime/
streaming_orchestrator.rs

//! Streaming orchestrator for real-time data generation.
//!
//! This orchestrator streams generated items with backpressure, progress
//! reporting, and pause/cancel control for real-time data generation.

6use std::sync::Arc;
7use std::thread;
8use std::time::Instant;
9
10use chrono::NaiveDate;
11use tracing::{info, warn};
12
13use datasynth_config::schema::GeneratorConfig;
14
/// Fallback RNG seed used by every generator in this module when the
/// configuration leaves `global.seed` unset.
const DEFAULT_SEED: u64 = 42;
17use datasynth_core::error::SynthResult;
18use datasynth_core::models::{
19    documents::{
20        CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
21    },
22    ChartOfAccounts, Customer, Employee, JournalEntry, Material, Vendor,
23};
24use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
25use datasynth_core::traits::{
26    BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
27};
28
29/// Generated items that can be streamed.
30#[derive(Debug, Clone)]
31pub enum GeneratedItem {
32    /// Chart of Accounts.
33    ChartOfAccounts(Box<ChartOfAccounts>),
34    /// A vendor.
35    Vendor(Box<Vendor>),
36    /// A customer.
37    Customer(Box<Customer>),
38    /// A material.
39    Material(Box<Material>),
40    /// An employee.
41    Employee(Box<Employee>),
42    /// A journal entry.
43    JournalEntry(Box<JournalEntry>),
44    /// A purchase order (P2P).
45    PurchaseOrder(Box<PurchaseOrder>),
46    /// A goods receipt (P2P).
47    GoodsReceipt(Box<GoodsReceipt>),
48    /// A vendor invoice (P2P).
49    VendorInvoice(Box<VendorInvoice>),
50    /// A payment (P2P/O2C).
51    Payment(Box<Payment>),
52    /// A sales order (O2C).
53    SalesOrder(Box<SalesOrder>),
54    /// A delivery (O2C).
55    Delivery(Box<Delivery>),
56    /// A customer invoice (O2C).
57    CustomerInvoice(Box<CustomerInvoice>),
58    /// Progress update.
59    Progress(StreamProgress),
60    /// Phase completion marker.
61    PhaseComplete(String),
62}
63
64impl GeneratedItem {
65    /// Returns the item type name.
66    pub fn type_name(&self) -> &'static str {
67        match self {
68            GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
69            GeneratedItem::Vendor(_) => "vendor",
70            GeneratedItem::Customer(_) => "customer",
71            GeneratedItem::Material(_) => "material",
72            GeneratedItem::Employee(_) => "employee",
73            GeneratedItem::JournalEntry(_) => "journal_entry",
74            GeneratedItem::PurchaseOrder(_) => "purchase_order",
75            GeneratedItem::GoodsReceipt(_) => "goods_receipt",
76            GeneratedItem::VendorInvoice(_) => "vendor_invoice",
77            GeneratedItem::Payment(_) => "payment",
78            GeneratedItem::SalesOrder(_) => "sales_order",
79            GeneratedItem::Delivery(_) => "delivery",
80            GeneratedItem::CustomerInvoice(_) => "customer_invoice",
81            GeneratedItem::Progress(_) => "progress",
82            GeneratedItem::PhaseComplete(_) => "phase_complete",
83        }
84    }
85}
86
/// A stage of the generation pipeline.
///
/// Phases run in the order they appear in the orchestrator configuration's
/// phase list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GenerationPhase {
    /// Build the chart of accounts.
    ChartOfAccounts,
    /// Produce master data (vendors, customers, employees, ...).
    MasterData,
    /// Produce P2P and O2C document chains.
    DocumentFlows,
    /// Produce an OCPM event log.
    OcpmEvents,
    /// Produce journal entries.
    JournalEntries,
    /// Inject anomalies into generated data.
    AnomalyInjection,
    /// Validate account balances.
    BalanceValidation,
    /// Inject data-quality issues.
    DataQuality,
    /// Terminal marker: generation has finished.
    Complete,
}

impl GenerationPhase {
    /// Returns the stable snake_case identifier for this phase; the
    /// orchestrator uses it as the progress phase label and as the payload of
    /// phase-completion markers.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::ChartOfAccounts => "chart_of_accounts",
            Self::MasterData => "master_data",
            Self::DocumentFlows => "document_flows",
            Self::OcpmEvents => "ocpm_events",
            Self::JournalEntries => "journal_entries",
            Self::AnomalyInjection => "anomaly_injection",
            Self::BalanceValidation => "balance_validation",
            Self::DataQuality => "data_quality",
            Self::Complete => "complete",
        }
    }
}
126
127/// Configuration for streaming orchestration.
128#[derive(Debug, Clone)]
129pub struct StreamingOrchestratorConfig {
130    /// Generator configuration.
131    pub generator_config: GeneratorConfig,
132    /// Stream configuration.
133    pub stream_config: StreamConfig,
134    /// Phases to execute.
135    pub phases: Vec<GenerationPhase>,
136}
137
138impl StreamingOrchestratorConfig {
139    /// Creates a new streaming orchestrator configuration.
140    pub fn new(generator_config: GeneratorConfig) -> Self {
141        Self {
142            generator_config,
143            stream_config: StreamConfig::default(),
144            phases: vec![
145                GenerationPhase::ChartOfAccounts,
146                GenerationPhase::MasterData,
147                GenerationPhase::DocumentFlows,
148                GenerationPhase::JournalEntries,
149            ],
150        }
151    }
152
153    /// Creates a configuration with all phases enabled including OCPM.
154    pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
155        Self {
156            generator_config,
157            stream_config: StreamConfig::default(),
158            phases: vec![
159                GenerationPhase::ChartOfAccounts,
160                GenerationPhase::MasterData,
161                GenerationPhase::DocumentFlows,
162                GenerationPhase::OcpmEvents,
163                GenerationPhase::JournalEntries,
164                GenerationPhase::AnomalyInjection,
165                GenerationPhase::DataQuality,
166            ],
167        }
168    }
169
170    /// Sets the stream configuration.
171    pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
172        self.stream_config = config;
173        self
174    }
175
176    /// Sets the phases to execute.
177    pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
178        self.phases = phases;
179        self
180    }
181}
182
183/// Streaming orchestrator for real-time data generation.
184pub struct StreamingOrchestrator {
185    config: StreamingOrchestratorConfig,
186}
187
188impl StreamingOrchestrator {
189    /// Creates a new streaming orchestrator.
190    pub fn new(config: StreamingOrchestratorConfig) -> Self {
191        Self { config }
192    }
193
194    /// Creates a streaming orchestrator from generator config with defaults.
195    pub fn from_generator_config(config: GeneratorConfig) -> Self {
196        Self::new(StreamingOrchestratorConfig::new(config))
197    }
198
199    /// Starts streaming generation.
200    ///
201    /// Returns a receiver for stream events and a control handle.
202    pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
203        let (sender, receiver) = stream_channel(
204            self.config.stream_config.buffer_size,
205            self.config.stream_config.backpressure,
206        );
207
208        let control = Arc::new(StreamControl::new());
209        let control_clone = Arc::clone(&control);
210
211        let config = self.config.clone();
212
213        // Spawn generation thread
214        thread::spawn(move || {
215            let result = Self::run_generation(config, sender, control_clone);
216            if let Err(e) = result {
217                warn!("Streaming generation error: {}", e);
218            }
219        });
220
221        Ok((receiver, control))
222    }
223
224    /// Runs the generation process.
225    fn run_generation(
226        config: StreamingOrchestratorConfig,
227        sender: StreamSender<GeneratedItem>,
228        control: Arc<StreamControl>,
229    ) -> SynthResult<()> {
230        let start_time = Instant::now();
231        let mut items_generated: u64 = 0;
232        let mut phases_completed = Vec::new();
233
234        // Track stats
235        let progress_interval = config.stream_config.progress_interval;
236
237        // Send initial progress
238        let mut progress = StreamProgress::new("initializing");
239        sender.send(StreamEvent::Progress(progress.clone()))?;
240
241        for phase in &config.phases {
242            if control.is_cancelled() {
243                info!("Generation cancelled");
244                break;
245            }
246
247            // Handle pause
248            while control.is_paused() {
249                std::thread::sleep(std::time::Duration::from_millis(100));
250                if control.is_cancelled() {
251                    break;
252                }
253            }
254
255            progress.phase = phase.name().to_string();
256            sender.send(StreamEvent::Progress(progress.clone()))?;
257
258            match phase {
259                GenerationPhase::ChartOfAccounts => {
260                    let result =
261                        Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
262                    items_generated += result;
263                }
264                GenerationPhase::MasterData => {
265                    let result = Self::generate_master_data_phase(
266                        &config.generator_config,
267                        &sender,
268                        &control,
269                    )?;
270                    items_generated += result;
271                }
272                GenerationPhase::DocumentFlows => {
273                    let result = Self::generate_document_flows_phase(
274                        &config.generator_config,
275                        &sender,
276                        &control,
277                        progress_interval,
278                        &mut progress,
279                    )?;
280                    items_generated += result;
281                }
282                GenerationPhase::OcpmEvents => {
283                    warn!("OCPM event generation is not yet supported in streaming mode; skipping");
284                }
285                GenerationPhase::JournalEntries => {
286                    let result = Self::generate_journal_entries_phase(
287                        &config.generator_config,
288                        &sender,
289                        &control,
290                        progress_interval,
291                        &mut progress,
292                    )?;
293                    items_generated += result;
294                }
295                GenerationPhase::AnomalyInjection | GenerationPhase::DataQuality => {
296                    warn!(
297                        "Phase {:?} requires post-processing of existing data and is skipped in streaming mode",
298                        phase
299                    );
300                }
301                GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
302                    info!("Phase {:?} is not applicable in streaming mode", phase);
303                }
304            }
305
306            // Send phase completion
307            sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
308                phase.name().to_string(),
309            )))?;
310            phases_completed.push(phase.name().to_string());
311
312            // Update progress
313            progress.items_generated = items_generated;
314            progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
315            if progress.elapsed_ms > 0 {
316                progress.items_per_second =
317                    (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
318            }
319            sender.send(StreamEvent::Progress(progress.clone()))?;
320        }
321
322        // Send completion
323        let stats = sender.stats();
324        let summary = StreamSummary {
325            total_items: items_generated,
326            total_time_ms: start_time.elapsed().as_millis() as u64,
327            avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
328                (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
329            } else {
330                0.0
331            },
332            error_count: 0,
333            dropped_count: stats.items_dropped,
334            peak_memory_mb: None,
335            phases_completed,
336        };
337
338        sender.send(StreamEvent::Complete(summary))?;
339        sender.close();
340
341        Ok(())
342    }
343
344    /// Generates Chart of Accounts phase.
345    fn generate_coa_phase(
346        config: &GeneratorConfig,
347        sender: &StreamSender<GeneratedItem>,
348        control: &Arc<StreamControl>,
349    ) -> SynthResult<u64> {
350        use datasynth_generators::ChartOfAccountsGenerator;
351
352        if control.is_cancelled() {
353            return Ok(0);
354        }
355
356        info!("Generating Chart of Accounts");
357        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
358        let complexity = config.chart_of_accounts.complexity;
359        let industry = config.global.industry;
360        let coa_framework = resolve_coa_framework_from_config(config);
361
362        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
363            .with_coa_framework(coa_framework);
364        let coa = coa_gen.generate();
365
366        let account_count = coa.account_count() as u64;
367        sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
368            coa,
369        ))))?;
370
371        Ok(account_count)
372    }
373
374    /// Generates master data phase.
375    fn generate_master_data_phase(
376        config: &GeneratorConfig,
377        sender: &StreamSender<GeneratedItem>,
378        control: &Arc<StreamControl>,
379    ) -> SynthResult<u64> {
380        use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
381
382        let mut count: u64 = 0;
383        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
384        let md_config = &config.master_data;
385        let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
386            .unwrap_or_else(|e| {
387                tracing::warn!(
388                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
389                    config.global.start_date,
390                    e
391                );
392                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
393            });
394
395        let company_code = config
396            .companies
397            .first()
398            .map(|c| c.code.as_str())
399            .unwrap_or_else(|| {
400                tracing::warn!("No companies configured, defaulting to company code '1000'");
401                "1000"
402            });
403
404        // Generate vendors
405        if control.is_cancelled() {
406            return Ok(count);
407        }
408
409        info!("Generating vendors");
410        let mut vendor_gen = VendorGenerator::new(seed);
411        for _ in 0..md_config.vendors.count {
412            if control.is_cancelled() {
413                break;
414            }
415            let vendor = vendor_gen.generate_vendor(company_code, effective_date);
416            sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
417            count += 1;
418        }
419
420        // Generate customers
421        if control.is_cancelled() {
422            return Ok(count);
423        }
424
425        info!("Generating customers");
426        let mut customer_gen = CustomerGenerator::new(seed + 1);
427        for _ in 0..md_config.customers.count {
428            if control.is_cancelled() {
429                break;
430            }
431            let customer = customer_gen.generate_customer(company_code, effective_date);
432            sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
433                customer,
434            ))))?;
435            count += 1;
436        }
437
438        // Generate employees
439        if control.is_cancelled() {
440            return Ok(count);
441        }
442
443        info!("Generating employees");
444        let mut employee_gen = EmployeeGenerator::new(seed + 4);
445        // Use first department from config, falling back to a default
446        let dept = if let Some(first_custom) = config.departments.custom_departments.first() {
447            datasynth_generators::DepartmentDefinition {
448                code: first_custom.code.clone(),
449                name: first_custom.name.clone(),
450                cost_center: first_custom
451                    .cost_center
452                    .clone()
453                    .unwrap_or_else(|| format!("CC{}", first_custom.code)),
454                headcount: 10,
455                system_roles: vec![],
456                transaction_codes: vec![],
457            }
458        } else {
459            warn!("No departments configured, using default 'General' department");
460            datasynth_generators::DepartmentDefinition {
461                code: "1000".to_string(),
462                name: "General".to_string(),
463                cost_center: "CC1000".to_string(),
464                headcount: 10,
465                system_roles: vec![],
466                transaction_codes: vec![],
467            }
468        };
469        for _ in 0..md_config.employees.count {
470            if control.is_cancelled() {
471                break;
472            }
473            let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
474            sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
475                employee,
476            ))))?;
477            count += 1;
478        }
479
480        Ok(count)
481    }
482
483    /// Generates journal entries phase.
484    ///
485    /// Note: This is a simplified version that generates basic journal entries.
486    /// For full-featured generation with all options, use EnhancedOrchestrator.
487    fn generate_journal_entries_phase(
488        config: &GeneratorConfig,
489        sender: &StreamSender<GeneratedItem>,
490        control: &Arc<StreamControl>,
491        progress_interval: u64,
492        progress: &mut StreamProgress,
493    ) -> SynthResult<u64> {
494        use datasynth_generators::{ChartOfAccountsGenerator, JournalEntryGenerator};
495        use std::sync::Arc;
496
497        let mut count: u64 = 0;
498        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
499
500        // Calculate total entries to generate based on volume weights
501        let default_monthly = 500;
502        let total_entries: usize = config
503            .companies
504            .iter()
505            .map(|c| {
506                let monthly = (c.volume_weight * default_monthly as f64) as usize;
507                monthly.max(100) * config.global.period_months as usize
508            })
509            .sum();
510
511        progress.items_remaining = Some(total_entries as u64);
512        info!("Generating {} journal entries", total_entries);
513
514        // Generate a shared CoA for all companies
515        let complexity = config.chart_of_accounts.complexity;
516        let industry = config.global.industry;
517        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
518        let coa = Arc::new(coa_gen.generate());
519
520        // Parse start date
521        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
522            .unwrap_or_else(|e| {
523                tracing::warn!(
524                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
525                    config.global.start_date,
526                    e
527                );
528                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
529            });
530        let end_date = start_date
531            .checked_add_months(chrono::Months::new(config.global.period_months))
532            .unwrap_or(start_date + chrono::Duration::days(365));
533
534        // Create JE generator from config
535        let mut je_gen = JournalEntryGenerator::from_generator_config(
536            config,
537            Arc::clone(&coa),
538            start_date,
539            end_date,
540            seed,
541        );
542
543        for _ in 0..total_entries {
544            if control.is_cancelled() {
545                break;
546            }
547
548            // Handle pause
549            while control.is_paused() {
550                std::thread::sleep(std::time::Duration::from_millis(100));
551                if control.is_cancelled() {
552                    break;
553                }
554            }
555
556            let je = je_gen.generate();
557            sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
558            count += 1;
559
560            // Send progress updates
561            if count.is_multiple_of(progress_interval) {
562                progress.items_generated = count;
563                progress.items_remaining = Some(total_entries as u64 - count);
564                sender.send(StreamEvent::Progress(progress.clone()))?;
565            }
566        }
567
568        Ok(count)
569    }
570
571    /// Generates document flows phase (P2P and O2C).
572    ///
573    /// Creates complete document chains:
574    /// - P2P: PurchaseOrder → GoodsReceipt → VendorInvoice → Payment
575    /// - O2C: SalesOrder → Delivery → CustomerInvoice
576    fn generate_document_flows_phase(
577        config: &GeneratorConfig,
578        sender: &StreamSender<GeneratedItem>,
579        control: &Arc<StreamControl>,
580        progress_interval: u64,
581        progress: &mut StreamProgress,
582    ) -> SynthResult<u64> {
583        use chrono::Datelike;
584        use datasynth_generators::{
585            CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
586        };
587
588        let mut count: u64 = 0;
589        let seed = config.global.seed.unwrap_or(DEFAULT_SEED);
590        let df_config = &config.document_flows;
591        let md_config = &config.master_data;
592
593        // Parse dates
594        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
595            .unwrap_or_else(|e| {
596                tracing::warn!(
597                    "Failed to parse start_date '{}': {}. Defaulting to 2024-01-01",
598                    config.global.start_date,
599                    e
600                );
601                NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date")
602            });
603        let end_date = start_date
604            .checked_add_months(chrono::Months::new(config.global.period_months))
605            .unwrap_or(start_date + chrono::Duration::days(365));
606        let total_period_days = (end_date - start_date).num_days().max(1);
607
608        let company_code = config
609            .companies
610            .first()
611            .map(|c| c.code.as_str())
612            .unwrap_or_else(|| {
613                tracing::warn!("No companies configured, defaulting to company code '1000'");
614                "1000"
615            });
616
617        // Use master data config counts for generating reference data
618        let vendor_count = md_config.vendors.count.min(100);
619        let customer_count = md_config.customers.count.min(100);
620        let material_count = md_config.materials.count.min(50);
621
622        // Generate some master data for document flows
623        let mut vendor_gen = VendorGenerator::new(seed);
624        let mut customer_gen = CustomerGenerator::new(seed + 1);
625        let mut material_gen = MaterialGenerator::new(seed + 2);
626
627        let vendors: Vec<_> = (0..vendor_count)
628            .map(|_| vendor_gen.generate_vendor(company_code, start_date))
629            .collect();
630
631        let customers: Vec<_> = (0..customer_count)
632            .map(|_| customer_gen.generate_customer(company_code, start_date))
633            .collect();
634
635        let materials: Vec<_> = (0..material_count)
636            .map(|_| material_gen.generate_material(company_code, start_date))
637            .collect();
638
639        // Determine number of chains based on transaction volume
640        // Use period months as a multiplier for document chains
641        let base_chains = (config.global.period_months as usize * 50).max(100);
642
643        // P2P Generation
644        if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
645            info!("Generating P2P document flows");
646            let mut p2p_gen = P2PGenerator::new(seed + 100);
647
648            let chains_to_generate = base_chains.min(1000);
649            progress.items_remaining = Some(chains_to_generate as u64);
650
651            for i in 0..chains_to_generate {
652                if control.is_cancelled() {
653                    break;
654                }
655
656                // Handle pause
657                while control.is_paused() {
658                    std::thread::sleep(std::time::Duration::from_millis(100));
659                    if control.is_cancelled() {
660                        break;
661                    }
662                }
663
664                let vendor = &vendors[i % vendors.len()];
665                let material_refs: Vec<&datasynth_core::models::Material> =
666                    vec![&materials[i % materials.len()]];
667
668                // Calculate posting date within the period
669                let days_offset = (i as i64 % total_period_days).max(0);
670                let po_date = start_date + chrono::Duration::days(days_offset);
671                let fiscal_year = po_date.year() as u16;
672                let fiscal_period = po_date.month() as u8;
673
674                let chain = p2p_gen.generate_chain(
675                    company_code,
676                    vendor,
677                    &material_refs,
678                    po_date,
679                    fiscal_year,
680                    fiscal_period,
681                    "SYSTEM",
682                );
683
684                // Send each document in the chain
685                sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
686                    chain.purchase_order,
687                ))))?;
688                count += 1;
689
690                for gr in chain.goods_receipts {
691                    sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
692                    count += 1;
693                }
694
695                if let Some(vi) = chain.vendor_invoice {
696                    sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
697                        vi,
698                    ))))?;
699                    count += 1;
700                }
701
702                if let Some(payment) = chain.payment {
703                    sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
704                    count += 1;
705                }
706
707                if count.is_multiple_of(progress_interval) {
708                    progress.items_generated = count;
709                    sender.send(StreamEvent::Progress(progress.clone()))?;
710                }
711            }
712        }
713
714        // O2C Generation
715        if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
716            info!("Generating O2C document flows");
717            let mut o2c_gen = O2CGenerator::new(seed + 200);
718
719            let chains_to_generate = base_chains.min(1000);
720
721            for i in 0..chains_to_generate {
722                if control.is_cancelled() {
723                    break;
724                }
725
726                while control.is_paused() {
727                    std::thread::sleep(std::time::Duration::from_millis(100));
728                    if control.is_cancelled() {
729                        break;
730                    }
731                }
732
733                let customer = &customers[i % customers.len()];
734                let material_refs: Vec<&datasynth_core::models::Material> =
735                    vec![&materials[i % materials.len()]];
736
737                let days_offset = (i as i64 % total_period_days).max(0);
738                let so_date = start_date + chrono::Duration::days(days_offset);
739                let fiscal_year = so_date.year() as u16;
740                let fiscal_period = so_date.month() as u8;
741
742                let chain = o2c_gen.generate_chain(
743                    company_code,
744                    customer,
745                    &material_refs,
746                    so_date,
747                    fiscal_year,
748                    fiscal_period,
749                    "SYSTEM",
750                );
751
752                sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
753                    chain.sales_order,
754                ))))?;
755                count += 1;
756
757                for delivery in chain.deliveries {
758                    sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
759                        delivery,
760                    ))))?;
761                    count += 1;
762                }
763
764                if let Some(ci) = chain.customer_invoice {
765                    sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
766                        ci,
767                    ))))?;
768                    count += 1;
769                }
770
771                if count.is_multiple_of(progress_interval) {
772                    progress.items_generated = count;
773                    sender.send(StreamEvent::Progress(progress.clone()))?;
774                }
775            }
776        }
777
778        Ok(count)
779    }
780
781    /// Returns the orchestrator configuration stats.
782    pub fn stats(&self) -> StreamingOrchestratorStats {
783        StreamingOrchestratorStats {
784            phases: self.config.phases.len(),
785            buffer_size: self.config.stream_config.buffer_size,
786            backpressure: self.config.stream_config.backpressure,
787        }
788    }
789}
790
791/// Statistics for the streaming orchestrator.
792#[derive(Debug, Clone)]
793pub struct StreamingOrchestratorStats {
794    /// Number of phases configured.
795    pub phases: usize,
796    /// Buffer size.
797    pub buffer_size: usize,
798    /// Backpressure strategy.
799    pub backpressure: BackpressureStrategy,
800}
801
802/// Resolve CoA framework from a GeneratorConfig.
803fn resolve_coa_framework_from_config(
804    config: &GeneratorConfig,
805) -> datasynth_generators::coa_generator::CoAFramework {
806    use datasynth_generators::coa_generator::CoAFramework;
807    if config.accounting_standards.enabled {
808        match config.accounting_standards.framework {
809            Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
810                return CoAFramework::FrenchPcg;
811            }
812            Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
813                return CoAFramework::GermanSkr04;
814            }
815            _ => {}
816        }
817    }
818    CoAFramework::UsGaap
819}
820
#[cfg(test)]
// Tests unwrap freely: panicking on an unexpected `Err` is the intended failure mode here.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    /// Small retail preset shared by the tests below.
    fn create_test_config() -> GeneratorConfig {
        create_preset(
            IndustrySector::Retail,
            2,
            3,
            CoAComplexity::Small,
            TransactionVolume::TenK,
        )
    }

    /// An orchestrator built from a preset reports non-empty phases and buffer.
    #[test]
    fn test_streaming_orchestrator_creation() {
        let config = create_test_config();
        let orchestrator = StreamingOrchestrator::from_generator_config(config);
        let stats = orchestrator.stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    /// Streaming CoA + master data yields data items, a CoA item, and a
    /// completion event.
    #[test]
    fn test_streaming_generation() {
        let mut config = create_test_config();
        // Reduce volume for testing
        config.master_data.vendors.count = 5;
        config.master_data.customers.count = 5;
        config.master_data.employees.count = 5;
        config.global.period_months = 1;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![
                GenerationPhase::ChartOfAccounts,
                GenerationPhase::MasterData,
            ])
            .with_stream_config(StreamConfig {
                buffer_size: 100,
                progress_interval: 10,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut items_count = 0;
        let mut has_coa = false;
        let mut has_completion = false;

        for event in receiver {
            match event {
                StreamEvent::Data(item) => {
                    items_count += 1;
                    if matches!(item, GeneratedItem::ChartOfAccounts(_)) {
                        has_coa = true;
                    }
                }
                StreamEvent::Complete(_) => {
                    has_completion = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(items_count > 0);
        assert!(has_coa);
        assert!(has_completion);
    }

    /// Cancelling via the control handle after a few data items marks the
    /// stream as cancelled.
    #[test]
    fn test_stream_cancellation() {
        let mut config = create_test_config();
        config.global.period_months = 12; // Longer generation

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries]);

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, control) = orchestrator.stream().unwrap();

        // Cancel after receiving some items
        let mut items_count = 0;
        for event in receiver {
            if let StreamEvent::Data(_) = event {
                items_count += 1;
                if items_count >= 10 {
                    control.cancel();
                    break;
                }
            }
        }

        // If the stream ended before 10 data items, `cancel()` was never
        // called; say so explicitly rather than failing on `is_cancelled`.
        assert_eq!(
            items_count, 10,
            "stream ended after {items_count} data items, before cancellation was triggered"
        );
        assert!(control.is_cancelled());
    }

    /// `type_name` returns the expected snake_case tags.
    #[test]
    fn test_generated_item_type_name() {
        // `CoAComplexity` and `IndustrySector` are already imported at module level.
        let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        )));
        assert_eq!(coa.type_name(), "chart_of_accounts");

        let progress = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress.type_name(), "progress");
    }
}