Skip to main content

datasynth_runtime/
streaming_orchestrator.rs

1//! Streaming orchestrator for real-time data generation.
2//!
3//! This orchestrator provides streaming capabilities with backpressure,
4//! progress reporting, and control for real-time data generation.
5
6use std::sync::Arc;
7use std::thread;
8use std::time::Instant;
9
10use chrono::NaiveDate;
11use tracing::{info, warn};
12
13use datasynth_config::schema::GeneratorConfig;
14use datasynth_core::error::SynthResult;
15use datasynth_core::models::{
16    documents::{
17        CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
18    },
19    ChartOfAccounts, Customer, Employee, JournalEntry, Material, Vendor,
20};
21use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
22use datasynth_core::traits::{
23    BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
24};
25
26/// Generated items that can be streamed.
27#[derive(Debug, Clone)]
28pub enum GeneratedItem {
29    /// Chart of Accounts.
30    ChartOfAccounts(Box<ChartOfAccounts>),
31    /// A vendor.
32    Vendor(Box<Vendor>),
33    /// A customer.
34    Customer(Box<Customer>),
35    /// A material.
36    Material(Box<Material>),
37    /// An employee.
38    Employee(Box<Employee>),
39    /// A journal entry.
40    JournalEntry(Box<JournalEntry>),
41    /// A purchase order (P2P).
42    PurchaseOrder(Box<PurchaseOrder>),
43    /// A goods receipt (P2P).
44    GoodsReceipt(Box<GoodsReceipt>),
45    /// A vendor invoice (P2P).
46    VendorInvoice(Box<VendorInvoice>),
47    /// A payment (P2P/O2C).
48    Payment(Box<Payment>),
49    /// A sales order (O2C).
50    SalesOrder(Box<SalesOrder>),
51    /// A delivery (O2C).
52    Delivery(Box<Delivery>),
53    /// A customer invoice (O2C).
54    CustomerInvoice(Box<CustomerInvoice>),
55    /// Progress update.
56    Progress(StreamProgress),
57    /// Phase completion marker.
58    PhaseComplete(String),
59}
60
61impl GeneratedItem {
62    /// Returns the item type name.
63    pub fn type_name(&self) -> &'static str {
64        match self {
65            GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
66            GeneratedItem::Vendor(_) => "vendor",
67            GeneratedItem::Customer(_) => "customer",
68            GeneratedItem::Material(_) => "material",
69            GeneratedItem::Employee(_) => "employee",
70            GeneratedItem::JournalEntry(_) => "journal_entry",
71            GeneratedItem::PurchaseOrder(_) => "purchase_order",
72            GeneratedItem::GoodsReceipt(_) => "goods_receipt",
73            GeneratedItem::VendorInvoice(_) => "vendor_invoice",
74            GeneratedItem::Payment(_) => "payment",
75            GeneratedItem::SalesOrder(_) => "sales_order",
76            GeneratedItem::Delivery(_) => "delivery",
77            GeneratedItem::CustomerInvoice(_) => "customer_invoice",
78            GeneratedItem::Progress(_) => "progress",
79            GeneratedItem::PhaseComplete(_) => "phase_complete",
80        }
81    }
82}
83
/// A stage of the generation pipeline.
///
/// The orchestrator executes the phases listed in
/// `StreamingOrchestratorConfig::phases`, in that order.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GenerationPhase {
    /// Chart of Accounts generation.
    ChartOfAccounts,
    /// Master data generation (vendors, customers, employees, ...).
    MasterData,
    /// Document flow generation (procure-to-pay and order-to-cash chains).
    DocumentFlows,
    /// OCPM event log generation.
    OcpmEvents,
    /// Journal entry generation.
    JournalEntries,
    /// Anomaly injection.
    AnomalyInjection,
    /// Balance validation.
    BalanceValidation,
    /// Data quality injection.
    DataQuality,
    /// Terminal marker phase.
    Complete,
}

impl GenerationPhase {
    /// Returns the `snake_case` identifier used in progress events and phase markers.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::ChartOfAccounts => "chart_of_accounts",
            Self::MasterData => "master_data",
            Self::DocumentFlows => "document_flows",
            Self::OcpmEvents => "ocpm_events",
            Self::JournalEntries => "journal_entries",
            Self::AnomalyInjection => "anomaly_injection",
            Self::BalanceValidation => "balance_validation",
            Self::DataQuality => "data_quality",
            Self::Complete => "complete",
        }
    }
}
123
124/// Configuration for streaming orchestration.
125#[derive(Debug, Clone)]
126pub struct StreamingOrchestratorConfig {
127    /// Generator configuration.
128    pub generator_config: GeneratorConfig,
129    /// Stream configuration.
130    pub stream_config: StreamConfig,
131    /// Phases to execute.
132    pub phases: Vec<GenerationPhase>,
133}
134
135impl StreamingOrchestratorConfig {
136    /// Creates a new streaming orchestrator configuration.
137    pub fn new(generator_config: GeneratorConfig) -> Self {
138        Self {
139            generator_config,
140            stream_config: StreamConfig::default(),
141            phases: vec![
142                GenerationPhase::ChartOfAccounts,
143                GenerationPhase::MasterData,
144                GenerationPhase::DocumentFlows,
145                GenerationPhase::JournalEntries,
146            ],
147        }
148    }
149
150    /// Creates a configuration with all phases enabled including OCPM.
151    pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
152        Self {
153            generator_config,
154            stream_config: StreamConfig::default(),
155            phases: vec![
156                GenerationPhase::ChartOfAccounts,
157                GenerationPhase::MasterData,
158                GenerationPhase::DocumentFlows,
159                GenerationPhase::OcpmEvents,
160                GenerationPhase::JournalEntries,
161                GenerationPhase::AnomalyInjection,
162                GenerationPhase::DataQuality,
163            ],
164        }
165    }
166
167    /// Sets the stream configuration.
168    pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
169        self.stream_config = config;
170        self
171    }
172
173    /// Sets the phases to execute.
174    pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
175        self.phases = phases;
176        self
177    }
178}
179
180/// Streaming orchestrator for real-time data generation.
181pub struct StreamingOrchestrator {
182    config: StreamingOrchestratorConfig,
183}
184
185impl StreamingOrchestrator {
186    /// Creates a new streaming orchestrator.
187    pub fn new(config: StreamingOrchestratorConfig) -> Self {
188        Self { config }
189    }
190
191    /// Creates a streaming orchestrator from generator config with defaults.
192    pub fn from_generator_config(config: GeneratorConfig) -> Self {
193        Self::new(StreamingOrchestratorConfig::new(config))
194    }
195
196    /// Starts streaming generation.
197    ///
198    /// Returns a receiver for stream events and a control handle.
199    pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
200        let (sender, receiver) = stream_channel(
201            self.config.stream_config.buffer_size,
202            self.config.stream_config.backpressure,
203        );
204
205        let control = Arc::new(StreamControl::new());
206        let control_clone = Arc::clone(&control);
207
208        let config = self.config.clone();
209
210        // Spawn generation thread
211        thread::spawn(move || {
212            let result = Self::run_generation(config, sender, control_clone);
213            if let Err(e) = result {
214                warn!("Streaming generation error: {}", e);
215            }
216        });
217
218        Ok((receiver, control))
219    }
220
221    /// Runs the generation process.
222    fn run_generation(
223        config: StreamingOrchestratorConfig,
224        sender: StreamSender<GeneratedItem>,
225        control: Arc<StreamControl>,
226    ) -> SynthResult<()> {
227        let start_time = Instant::now();
228        let mut items_generated: u64 = 0;
229        let mut phases_completed = Vec::new();
230
231        // Track stats
232        let progress_interval = config.stream_config.progress_interval;
233
234        // Send initial progress
235        let mut progress = StreamProgress::new("initializing");
236        sender.send(StreamEvent::Progress(progress.clone()))?;
237
238        for phase in &config.phases {
239            if control.is_cancelled() {
240                info!("Generation cancelled");
241                break;
242            }
243
244            // Handle pause
245            while control.is_paused() {
246                std::thread::sleep(std::time::Duration::from_millis(100));
247                if control.is_cancelled() {
248                    break;
249                }
250            }
251
252            progress.phase = phase.name().to_string();
253            sender.send(StreamEvent::Progress(progress.clone()))?;
254
255            match phase {
256                GenerationPhase::ChartOfAccounts => {
257                    let result =
258                        Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
259                    items_generated += result;
260                }
261                GenerationPhase::MasterData => {
262                    let result = Self::generate_master_data_phase(
263                        &config.generator_config,
264                        &sender,
265                        &control,
266                    )?;
267                    items_generated += result;
268                }
269                GenerationPhase::DocumentFlows => {
270                    let result = Self::generate_document_flows_phase(
271                        &config.generator_config,
272                        &sender,
273                        &control,
274                        progress_interval,
275                        &mut progress,
276                    )?;
277                    items_generated += result;
278                }
279                GenerationPhase::OcpmEvents => {
280                    warn!("OCPM event generation is not yet supported in streaming mode; skipping");
281                }
282                GenerationPhase::JournalEntries => {
283                    let result = Self::generate_journal_entries_phase(
284                        &config.generator_config,
285                        &sender,
286                        &control,
287                        progress_interval,
288                        &mut progress,
289                    )?;
290                    items_generated += result;
291                }
292                GenerationPhase::AnomalyInjection | GenerationPhase::DataQuality => {
293                    warn!(
294                        "Phase {:?} requires post-processing of existing data and is skipped in streaming mode",
295                        phase
296                    );
297                }
298                GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
299                    info!("Phase {:?} is not applicable in streaming mode", phase);
300                }
301            }
302
303            // Send phase completion
304            sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
305                phase.name().to_string(),
306            )))?;
307            phases_completed.push(phase.name().to_string());
308
309            // Update progress
310            progress.items_generated = items_generated;
311            progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
312            if progress.elapsed_ms > 0 {
313                progress.items_per_second =
314                    (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
315            }
316            sender.send(StreamEvent::Progress(progress.clone()))?;
317        }
318
319        // Send completion
320        let stats = sender.stats();
321        let summary = StreamSummary {
322            total_items: items_generated,
323            total_time_ms: start_time.elapsed().as_millis() as u64,
324            avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
325                (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
326            } else {
327                0.0
328            },
329            error_count: 0,
330            dropped_count: stats.items_dropped,
331            peak_memory_mb: None,
332            phases_completed,
333        };
334
335        sender.send(StreamEvent::Complete(summary))?;
336        sender.close();
337
338        Ok(())
339    }
340
341    /// Generates Chart of Accounts phase.
342    fn generate_coa_phase(
343        config: &GeneratorConfig,
344        sender: &StreamSender<GeneratedItem>,
345        control: &Arc<StreamControl>,
346    ) -> SynthResult<u64> {
347        use datasynth_generators::ChartOfAccountsGenerator;
348
349        if control.is_cancelled() {
350            return Ok(0);
351        }
352
353        info!("Generating Chart of Accounts");
354        let seed = config.global.seed.unwrap_or(42);
355        let complexity = config.chart_of_accounts.complexity;
356        let industry = config.global.industry;
357        let coa_framework = resolve_coa_framework_from_config(config);
358
359        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
360            .with_coa_framework(coa_framework);
361        let coa = coa_gen.generate();
362
363        let account_count = coa.account_count() as u64;
364        sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
365            coa,
366        ))))?;
367
368        Ok(account_count)
369    }
370
371    /// Generates master data phase.
372    fn generate_master_data_phase(
373        config: &GeneratorConfig,
374        sender: &StreamSender<GeneratedItem>,
375        control: &Arc<StreamControl>,
376    ) -> SynthResult<u64> {
377        use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
378
379        let mut count: u64 = 0;
380        let seed = config.global.seed.unwrap_or(42);
381        let md_config = &config.master_data;
382        let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
383            .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
384
385        let company_code = config
386            .companies
387            .first()
388            .map(|c| c.code.as_str())
389            .unwrap_or("1000");
390
391        // Generate vendors
392        if control.is_cancelled() {
393            return Ok(count);
394        }
395
396        info!("Generating vendors");
397        let mut vendor_gen = VendorGenerator::new(seed);
398        for _ in 0..md_config.vendors.count {
399            if control.is_cancelled() {
400                break;
401            }
402            let vendor = vendor_gen.generate_vendor(company_code, effective_date);
403            sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
404            count += 1;
405        }
406
407        // Generate customers
408        if control.is_cancelled() {
409            return Ok(count);
410        }
411
412        info!("Generating customers");
413        let mut customer_gen = CustomerGenerator::new(seed + 1);
414        for _ in 0..md_config.customers.count {
415            if control.is_cancelled() {
416                break;
417            }
418            let customer = customer_gen.generate_customer(company_code, effective_date);
419            sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
420                customer,
421            ))))?;
422            count += 1;
423        }
424
425        // Generate employees
426        if control.is_cancelled() {
427            return Ok(count);
428        }
429
430        info!("Generating employees");
431        let mut employee_gen = EmployeeGenerator::new(seed + 4);
432        // Use first department from config, falling back to a default
433        let dept = if let Some(first_custom) = config.departments.custom_departments.first() {
434            datasynth_generators::DepartmentDefinition {
435                code: first_custom.code.clone(),
436                name: first_custom.name.clone(),
437                cost_center: first_custom
438                    .cost_center
439                    .clone()
440                    .unwrap_or_else(|| format!("CC{}", first_custom.code)),
441                headcount: 10,
442                system_roles: vec![],
443                transaction_codes: vec![],
444            }
445        } else {
446            warn!("No departments configured, using default 'General' department");
447            datasynth_generators::DepartmentDefinition {
448                code: "1000".to_string(),
449                name: "General".to_string(),
450                cost_center: "CC1000".to_string(),
451                headcount: 10,
452                system_roles: vec![],
453                transaction_codes: vec![],
454            }
455        };
456        for _ in 0..md_config.employees.count {
457            if control.is_cancelled() {
458                break;
459            }
460            let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
461            sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
462                employee,
463            ))))?;
464            count += 1;
465        }
466
467        Ok(count)
468    }
469
470    /// Generates journal entries phase.
471    ///
472    /// Note: This is a simplified version that generates basic journal entries.
473    /// For full-featured generation with all options, use EnhancedOrchestrator.
474    fn generate_journal_entries_phase(
475        config: &GeneratorConfig,
476        sender: &StreamSender<GeneratedItem>,
477        control: &Arc<StreamControl>,
478        progress_interval: u64,
479        progress: &mut StreamProgress,
480    ) -> SynthResult<u64> {
481        use datasynth_generators::{ChartOfAccountsGenerator, JournalEntryGenerator};
482        use std::sync::Arc;
483
484        let mut count: u64 = 0;
485        let seed = config.global.seed.unwrap_or(42);
486
487        // Calculate total entries to generate based on volume weights
488        let default_monthly = 500;
489        let total_entries: usize = config
490            .companies
491            .iter()
492            .map(|c| {
493                let monthly = (c.volume_weight * default_monthly as f64) as usize;
494                monthly.max(100) * config.global.period_months as usize
495            })
496            .sum();
497
498        progress.items_remaining = Some(total_entries as u64);
499        info!("Generating {} journal entries", total_entries);
500
501        // Generate a shared CoA for all companies
502        let complexity = config.chart_of_accounts.complexity;
503        let industry = config.global.industry;
504        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
505        let coa = Arc::new(coa_gen.generate());
506
507        // Parse start date
508        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
509            .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
510        let end_date = start_date
511            .checked_add_months(chrono::Months::new(config.global.period_months))
512            .unwrap_or(start_date + chrono::Duration::days(365));
513
514        // Create JE generator from config
515        let mut je_gen = JournalEntryGenerator::from_generator_config(
516            config,
517            Arc::clone(&coa),
518            start_date,
519            end_date,
520            seed,
521        );
522
523        for _ in 0..total_entries {
524            if control.is_cancelled() {
525                break;
526            }
527
528            // Handle pause
529            while control.is_paused() {
530                std::thread::sleep(std::time::Duration::from_millis(100));
531                if control.is_cancelled() {
532                    break;
533                }
534            }
535
536            let je = je_gen.generate();
537            sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
538            count += 1;
539
540            // Send progress updates
541            if count.is_multiple_of(progress_interval) {
542                progress.items_generated = count;
543                progress.items_remaining = Some(total_entries as u64 - count);
544                sender.send(StreamEvent::Progress(progress.clone()))?;
545            }
546        }
547
548        Ok(count)
549    }
550
551    /// Generates document flows phase (P2P and O2C).
552    ///
553    /// Creates complete document chains:
554    /// - P2P: PurchaseOrder → GoodsReceipt → VendorInvoice → Payment
555    /// - O2C: SalesOrder → Delivery → CustomerInvoice
556    fn generate_document_flows_phase(
557        config: &GeneratorConfig,
558        sender: &StreamSender<GeneratedItem>,
559        control: &Arc<StreamControl>,
560        progress_interval: u64,
561        progress: &mut StreamProgress,
562    ) -> SynthResult<u64> {
563        use chrono::Datelike;
564        use datasynth_generators::{
565            CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
566        };
567
568        let mut count: u64 = 0;
569        let seed = config.global.seed.unwrap_or(42);
570        let df_config = &config.document_flows;
571        let md_config = &config.master_data;
572
573        // Parse dates
574        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
575            .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
576        let end_date = start_date
577            .checked_add_months(chrono::Months::new(config.global.period_months))
578            .unwrap_or(start_date + chrono::Duration::days(365));
579        let total_period_days = (end_date - start_date).num_days().max(1);
580
581        let company_code = config
582            .companies
583            .first()
584            .map(|c| c.code.as_str())
585            .unwrap_or("1000");
586
587        // Use master data config counts for generating reference data
588        let vendor_count = md_config.vendors.count.min(100);
589        let customer_count = md_config.customers.count.min(100);
590        let material_count = md_config.materials.count.min(50);
591
592        // Generate some master data for document flows
593        let mut vendor_gen = VendorGenerator::new(seed);
594        let mut customer_gen = CustomerGenerator::new(seed + 1);
595        let mut material_gen = MaterialGenerator::new(seed + 2);
596
597        let vendors: Vec<_> = (0..vendor_count)
598            .map(|_| vendor_gen.generate_vendor(company_code, start_date))
599            .collect();
600
601        let customers: Vec<_> = (0..customer_count)
602            .map(|_| customer_gen.generate_customer(company_code, start_date))
603            .collect();
604
605        let materials: Vec<_> = (0..material_count)
606            .map(|_| material_gen.generate_material(company_code, start_date))
607            .collect();
608
609        // Determine number of chains based on transaction volume
610        // Use period months as a multiplier for document chains
611        let base_chains = (config.global.period_months as usize * 50).max(100);
612
613        // P2P Generation
614        if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
615            info!("Generating P2P document flows");
616            let mut p2p_gen = P2PGenerator::new(seed + 100);
617
618            let chains_to_generate = base_chains.min(1000);
619            progress.items_remaining = Some(chains_to_generate as u64);
620
621            for i in 0..chains_to_generate {
622                if control.is_cancelled() {
623                    break;
624                }
625
626                // Handle pause
627                while control.is_paused() {
628                    std::thread::sleep(std::time::Duration::from_millis(100));
629                    if control.is_cancelled() {
630                        break;
631                    }
632                }
633
634                let vendor = &vendors[i % vendors.len()];
635                let material_refs: Vec<&datasynth_core::models::Material> =
636                    vec![&materials[i % materials.len()]];
637
638                // Calculate posting date within the period
639                let days_offset = (i as i64 % total_period_days).max(0);
640                let po_date = start_date + chrono::Duration::days(days_offset);
641                let fiscal_year = po_date.year() as u16;
642                let fiscal_period = po_date.month() as u8;
643
644                let chain = p2p_gen.generate_chain(
645                    company_code,
646                    vendor,
647                    &material_refs,
648                    po_date,
649                    fiscal_year,
650                    fiscal_period,
651                    "SYSTEM",
652                );
653
654                // Send each document in the chain
655                sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
656                    chain.purchase_order,
657                ))))?;
658                count += 1;
659
660                for gr in chain.goods_receipts {
661                    sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
662                    count += 1;
663                }
664
665                if let Some(vi) = chain.vendor_invoice {
666                    sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
667                        vi,
668                    ))))?;
669                    count += 1;
670                }
671
672                if let Some(payment) = chain.payment {
673                    sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
674                    count += 1;
675                }
676
677                if count.is_multiple_of(progress_interval) {
678                    progress.items_generated = count;
679                    sender.send(StreamEvent::Progress(progress.clone()))?;
680                }
681            }
682        }
683
684        // O2C Generation
685        if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
686            info!("Generating O2C document flows");
687            let mut o2c_gen = O2CGenerator::new(seed + 200);
688
689            let chains_to_generate = base_chains.min(1000);
690
691            for i in 0..chains_to_generate {
692                if control.is_cancelled() {
693                    break;
694                }
695
696                while control.is_paused() {
697                    std::thread::sleep(std::time::Duration::from_millis(100));
698                    if control.is_cancelled() {
699                        break;
700                    }
701                }
702
703                let customer = &customers[i % customers.len()];
704                let material_refs: Vec<&datasynth_core::models::Material> =
705                    vec![&materials[i % materials.len()]];
706
707                let days_offset = (i as i64 % total_period_days).max(0);
708                let so_date = start_date + chrono::Duration::days(days_offset);
709                let fiscal_year = so_date.year() as u16;
710                let fiscal_period = so_date.month() as u8;
711
712                let chain = o2c_gen.generate_chain(
713                    company_code,
714                    customer,
715                    &material_refs,
716                    so_date,
717                    fiscal_year,
718                    fiscal_period,
719                    "SYSTEM",
720                );
721
722                sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
723                    chain.sales_order,
724                ))))?;
725                count += 1;
726
727                for delivery in chain.deliveries {
728                    sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
729                        delivery,
730                    ))))?;
731                    count += 1;
732                }
733
734                if let Some(ci) = chain.customer_invoice {
735                    sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
736                        ci,
737                    ))))?;
738                    count += 1;
739                }
740
741                if count.is_multiple_of(progress_interval) {
742                    progress.items_generated = count;
743                    sender.send(StreamEvent::Progress(progress.clone()))?;
744                }
745            }
746        }
747
748        Ok(count)
749    }
750
751    /// Returns the orchestrator configuration stats.
752    pub fn stats(&self) -> StreamingOrchestratorStats {
753        StreamingOrchestratorStats {
754            phases: self.config.phases.len(),
755            buffer_size: self.config.stream_config.buffer_size,
756            backpressure: self.config.stream_config.backpressure,
757        }
758    }
759}
760
761/// Statistics for the streaming orchestrator.
762#[derive(Debug, Clone)]
763pub struct StreamingOrchestratorStats {
764    /// Number of phases configured.
765    pub phases: usize,
766    /// Buffer size.
767    pub buffer_size: usize,
768    /// Backpressure strategy.
769    pub backpressure: BackpressureStrategy,
770}
771
772/// Resolve CoA framework from a GeneratorConfig.
773fn resolve_coa_framework_from_config(
774    config: &GeneratorConfig,
775) -> datasynth_generators::coa_generator::CoAFramework {
776    use datasynth_generators::coa_generator::CoAFramework;
777    if config.accounting_standards.enabled {
778        match config.accounting_standards.framework {
779            Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap) => {
780                return CoAFramework::FrenchPcg;
781            }
782            Some(datasynth_config::schema::AccountingFrameworkConfig::GermanGaap) => {
783                return CoAFramework::GermanSkr04;
784            }
785            _ => {}
786        }
787    }
788    CoAFramework::UsGaap
789}
790
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    fn create_test_config() -> GeneratorConfig {
        create_preset(
            IndustrySector::Retail,
            2,
            3,
            CoAComplexity::Small,
            TransactionVolume::TenK,
        )
    }

    #[test]
    fn test_streaming_orchestrator_creation() {
        let config = create_test_config();
        let orchestrator = StreamingOrchestrator::from_generator_config(config);
        let stats = orchestrator.stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    #[test]
    fn test_streaming_generation() {
        let mut config = create_test_config();
        // Reduce volume for testing
        config.master_data.vendors.count = 5;
        config.master_data.customers.count = 5;
        config.master_data.employees.count = 5;
        config.global.period_months = 1;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![
                GenerationPhase::ChartOfAccounts,
                GenerationPhase::MasterData,
            ])
            .with_stream_config(StreamConfig {
                buffer_size: 100,
                progress_interval: 10,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut items_count = 0;
        let mut has_coa = false;
        let mut summary = None;

        for event in receiver {
            match event {
                StreamEvent::Data(item) => {
                    items_count += 1;
                    if matches!(item, GeneratedItem::ChartOfAccounts(_)) {
                        has_coa = true;
                    }
                }
                StreamEvent::Complete(s) => {
                    summary = Some(s);
                    break;
                }
                _ => {}
            }
        }

        assert!(items_count > 0);
        assert!(has_coa);
        let summary = summary.expect("stream should finish with a Complete event");
        // 5 vendors + 5 customers + 5 employees, plus the CoA's accounts.
        assert!(summary.total_items >= 15);
        assert_eq!(
            summary.phases_completed,
            vec!["chart_of_accounts".to_string(), "master_data".to_string()]
        );
    }

    #[test]
    fn test_stream_cancellation() {
        let mut config = create_test_config();
        config.global.period_months = 12; // Longer generation

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries]);

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, control) = orchestrator.stream().unwrap();

        // Cancel after receiving some items
        let mut items_count = 0;
        for event in receiver {
            if let StreamEvent::Data(_) = event {
                items_count += 1;
                if items_count >= 10 {
                    control.cancel();
                    break;
                }
            }
        }

        assert!(control.is_cancelled());
    }

    #[test]
    fn test_generated_item_type_name() {
        let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        )));
        assert_eq!(coa.type_name(), "chart_of_accounts");

        let progress = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress.type_name(), "progress");

        let marker = GeneratedItem::PhaseComplete("master_data".to_string());
        assert_eq!(marker.type_name(), "phase_complete");
    }

    #[test]
    fn test_generation_phase_names_are_unique() {
        let phases = [
            GenerationPhase::ChartOfAccounts,
            GenerationPhase::MasterData,
            GenerationPhase::DocumentFlows,
            GenerationPhase::OcpmEvents,
            GenerationPhase::JournalEntries,
            GenerationPhase::AnomalyInjection,
            GenerationPhase::BalanceValidation,
            GenerationPhase::DataQuality,
            GenerationPhase::Complete,
        ];
        let names: std::collections::HashSet<&str> = phases.iter().map(|p| p.name()).collect();
        assert_eq!(names.len(), phases.len());
    }

    #[test]
    fn test_with_all_phases_includes_ocpm() {
        let config = StreamingOrchestratorConfig::with_all_phases(create_test_config());
        assert_eq!(config.phases.first(), Some(&GenerationPhase::ChartOfAccounts));
        assert!(config.phases.contains(&GenerationPhase::OcpmEvents));
        assert!(config.phases.contains(&GenerationPhase::JournalEntries));
    }
}