Skip to main content

datasynth_runtime/
streaming_orchestrator.rs

1//! Streaming orchestrator for real-time data generation.
2//!
3//! This orchestrator provides streaming capabilities with backpressure,
4//! progress reporting, and control for real-time data generation.
5
6use std::sync::Arc;
7use std::thread;
8use std::time::Instant;
9
10use chrono::NaiveDate;
11use tracing::{debug, info, warn};
12
13use datasynth_config::schema::GeneratorConfig;
14use datasynth_core::error::SynthResult;
15use datasynth_core::models::{
16    documents::{
17        CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
18    },
19    ChartOfAccounts, Customer, Employee, JournalEntry, Material, Vendor,
20};
21use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
22use datasynth_core::traits::{
23    BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
24};
25
26/// Generated items that can be streamed.
27#[derive(Debug, Clone)]
28pub enum GeneratedItem {
29    /// Chart of Accounts.
30    ChartOfAccounts(Box<ChartOfAccounts>),
31    /// A vendor.
32    Vendor(Box<Vendor>),
33    /// A customer.
34    Customer(Box<Customer>),
35    /// A material.
36    Material(Box<Material>),
37    /// An employee.
38    Employee(Box<Employee>),
39    /// A journal entry.
40    JournalEntry(Box<JournalEntry>),
41    /// A purchase order (P2P).
42    PurchaseOrder(Box<PurchaseOrder>),
43    /// A goods receipt (P2P).
44    GoodsReceipt(Box<GoodsReceipt>),
45    /// A vendor invoice (P2P).
46    VendorInvoice(Box<VendorInvoice>),
47    /// A payment (P2P/O2C).
48    Payment(Box<Payment>),
49    /// A sales order (O2C).
50    SalesOrder(Box<SalesOrder>),
51    /// A delivery (O2C).
52    Delivery(Box<Delivery>),
53    /// A customer invoice (O2C).
54    CustomerInvoice(Box<CustomerInvoice>),
55    /// Progress update.
56    Progress(StreamProgress),
57    /// Phase completion marker.
58    PhaseComplete(String),
59}
60
61impl GeneratedItem {
62    /// Returns the item type name.
63    pub fn type_name(&self) -> &'static str {
64        match self {
65            GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
66            GeneratedItem::Vendor(_) => "vendor",
67            GeneratedItem::Customer(_) => "customer",
68            GeneratedItem::Material(_) => "material",
69            GeneratedItem::Employee(_) => "employee",
70            GeneratedItem::JournalEntry(_) => "journal_entry",
71            GeneratedItem::PurchaseOrder(_) => "purchase_order",
72            GeneratedItem::GoodsReceipt(_) => "goods_receipt",
73            GeneratedItem::VendorInvoice(_) => "vendor_invoice",
74            GeneratedItem::Payment(_) => "payment",
75            GeneratedItem::SalesOrder(_) => "sales_order",
76            GeneratedItem::Delivery(_) => "delivery",
77            GeneratedItem::CustomerInvoice(_) => "customer_invoice",
78            GeneratedItem::Progress(_) => "progress",
79            GeneratedItem::PhaseComplete(_) => "phase_complete",
80        }
81    }
82}
83
/// A stage of the generation pipeline.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GenerationPhase {
    /// Build the chart of accounts.
    ChartOfAccounts,
    /// Master data (vendors, customers, etc.).
    MasterData,
    /// Document chains (P2P, O2C).
    DocumentFlows,
    /// OCPM event log generation.
    OcpmEvents,
    /// Journal entries.
    JournalEntries,
    /// Injection of anomalies.
    AnomalyInjection,
    /// Validation of balances.
    BalanceValidation,
    /// Injection of data-quality issues.
    DataQuality,
    /// Terminal marker.
    Complete,
}

impl GenerationPhase {
    /// Returns the snake_case identifier for this phase, as used in progress
    /// events and phase-completion markers.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::ChartOfAccounts => "chart_of_accounts",
            Self::MasterData => "master_data",
            Self::DocumentFlows => "document_flows",
            Self::OcpmEvents => "ocpm_events",
            Self::JournalEntries => "journal_entries",
            Self::AnomalyInjection => "anomaly_injection",
            Self::BalanceValidation => "balance_validation",
            Self::DataQuality => "data_quality",
            Self::Complete => "complete",
        }
    }
}
123
124/// Configuration for streaming orchestration.
125#[derive(Debug, Clone)]
126pub struct StreamingOrchestratorConfig {
127    /// Generator configuration.
128    pub generator_config: GeneratorConfig,
129    /// Stream configuration.
130    pub stream_config: StreamConfig,
131    /// Phases to execute.
132    pub phases: Vec<GenerationPhase>,
133}
134
135impl StreamingOrchestratorConfig {
136    /// Creates a new streaming orchestrator configuration.
137    pub fn new(generator_config: GeneratorConfig) -> Self {
138        Self {
139            generator_config,
140            stream_config: StreamConfig::default(),
141            phases: vec![
142                GenerationPhase::ChartOfAccounts,
143                GenerationPhase::MasterData,
144                GenerationPhase::DocumentFlows,
145                GenerationPhase::JournalEntries,
146            ],
147        }
148    }
149
150    /// Creates a configuration with all phases enabled including OCPM.
151    pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
152        Self {
153            generator_config,
154            stream_config: StreamConfig::default(),
155            phases: vec![
156                GenerationPhase::ChartOfAccounts,
157                GenerationPhase::MasterData,
158                GenerationPhase::DocumentFlows,
159                GenerationPhase::OcpmEvents,
160                GenerationPhase::JournalEntries,
161                GenerationPhase::AnomalyInjection,
162                GenerationPhase::DataQuality,
163            ],
164        }
165    }
166
167    /// Sets the stream configuration.
168    pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
169        self.stream_config = config;
170        self
171    }
172
173    /// Sets the phases to execute.
174    pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
175        self.phases = phases;
176        self
177    }
178}
179
180/// Streaming orchestrator for real-time data generation.
181pub struct StreamingOrchestrator {
182    config: StreamingOrchestratorConfig,
183}
184
185impl StreamingOrchestrator {
186    /// Creates a new streaming orchestrator.
187    pub fn new(config: StreamingOrchestratorConfig) -> Self {
188        Self { config }
189    }
190
191    /// Creates a streaming orchestrator from generator config with defaults.
192    pub fn from_generator_config(config: GeneratorConfig) -> Self {
193        Self::new(StreamingOrchestratorConfig::new(config))
194    }
195
196    /// Starts streaming generation.
197    ///
198    /// Returns a receiver for stream events and a control handle.
199    pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
200        let (sender, receiver) = stream_channel(
201            self.config.stream_config.buffer_size,
202            self.config.stream_config.backpressure,
203        );
204
205        let control = Arc::new(StreamControl::new());
206        let control_clone = Arc::clone(&control);
207
208        let config = self.config.clone();
209
210        // Spawn generation thread
211        thread::spawn(move || {
212            let result = Self::run_generation(config, sender, control_clone);
213            if let Err(e) = result {
214                warn!("Streaming generation error: {}", e);
215            }
216        });
217
218        Ok((receiver, control))
219    }
220
221    /// Runs the generation process.
222    fn run_generation(
223        config: StreamingOrchestratorConfig,
224        sender: StreamSender<GeneratedItem>,
225        control: Arc<StreamControl>,
226    ) -> SynthResult<()> {
227        let start_time = Instant::now();
228        let mut items_generated: u64 = 0;
229        let mut phases_completed = Vec::new();
230
231        // Track stats
232        let progress_interval = config.stream_config.progress_interval;
233
234        // Send initial progress
235        let mut progress = StreamProgress::new("initializing");
236        sender.send(StreamEvent::Progress(progress.clone()))?;
237
238        for phase in &config.phases {
239            if control.is_cancelled() {
240                info!("Generation cancelled");
241                break;
242            }
243
244            // Handle pause
245            while control.is_paused() {
246                std::thread::sleep(std::time::Duration::from_millis(100));
247                if control.is_cancelled() {
248                    break;
249                }
250            }
251
252            progress.phase = phase.name().to_string();
253            sender.send(StreamEvent::Progress(progress.clone()))?;
254
255            match phase {
256                GenerationPhase::ChartOfAccounts => {
257                    let result =
258                        Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
259                    items_generated += result;
260                }
261                GenerationPhase::MasterData => {
262                    let result = Self::generate_master_data_phase(
263                        &config.generator_config,
264                        &sender,
265                        &control,
266                    )?;
267                    items_generated += result;
268                }
269                GenerationPhase::DocumentFlows => {
270                    let result = Self::generate_document_flows_phase(
271                        &config.generator_config,
272                        &sender,
273                        &control,
274                        progress_interval,
275                        &mut progress,
276                    )?;
277                    items_generated += result;
278                }
279                GenerationPhase::OcpmEvents => {
280                    // OCPM event generation is optional and requires document flows
281                    debug!("OCPM events phase - skipping (documents should be generated via P2P/O2C generators)");
282                }
283                GenerationPhase::JournalEntries => {
284                    let result = Self::generate_journal_entries_phase(
285                        &config.generator_config,
286                        &sender,
287                        &control,
288                        progress_interval,
289                        &mut progress,
290                    )?;
291                    items_generated += result;
292                }
293                GenerationPhase::AnomalyInjection | GenerationPhase::DataQuality => {
294                    // These phases modify existing data; not directly generating new items
295                    debug!(
296                        "Phase {:?} operates on existing data (not streaming new items)",
297                        phase
298                    );
299                }
300                GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
301                    // Validation and completion phases don't generate items
302                    debug!("Phase {:?} is a validation/completion phase", phase);
303                }
304            }
305
306            // Send phase completion
307            sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
308                phase.name().to_string(),
309            )))?;
310            phases_completed.push(phase.name().to_string());
311
312            // Update progress
313            progress.items_generated = items_generated;
314            progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
315            if progress.elapsed_ms > 0 {
316                progress.items_per_second =
317                    (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
318            }
319            sender.send(StreamEvent::Progress(progress.clone()))?;
320        }
321
322        // Send completion
323        let stats = sender.stats();
324        let summary = StreamSummary {
325            total_items: items_generated,
326            total_time_ms: start_time.elapsed().as_millis() as u64,
327            avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
328                (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
329            } else {
330                0.0
331            },
332            error_count: 0,
333            dropped_count: stats.items_dropped,
334            peak_memory_mb: None,
335            phases_completed,
336        };
337
338        sender.send(StreamEvent::Complete(summary))?;
339        sender.close();
340
341        Ok(())
342    }
343
344    /// Generates Chart of Accounts phase.
345    fn generate_coa_phase(
346        config: &GeneratorConfig,
347        sender: &StreamSender<GeneratedItem>,
348        control: &Arc<StreamControl>,
349    ) -> SynthResult<u64> {
350        use datasynth_generators::ChartOfAccountsGenerator;
351
352        if control.is_cancelled() {
353            return Ok(0);
354        }
355
356        info!("Generating Chart of Accounts");
357        let seed = config.global.seed.unwrap_or(42);
358        let complexity = config.chart_of_accounts.complexity;
359        let industry = config.global.industry;
360        let use_french_pcg = config.accounting_standards.enabled
361            && matches!(
362                config.accounting_standards.framework,
363                Some(datasynth_config::schema::AccountingFrameworkConfig::FrenchGaap)
364            );
365
366        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed)
367            .with_french_pcg(use_french_pcg);
368        let coa = coa_gen.generate();
369
370        let account_count = coa.account_count() as u64;
371        sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
372            coa,
373        ))))?;
374
375        Ok(account_count)
376    }
377
378    /// Generates master data phase.
379    fn generate_master_data_phase(
380        config: &GeneratorConfig,
381        sender: &StreamSender<GeneratedItem>,
382        control: &Arc<StreamControl>,
383    ) -> SynthResult<u64> {
384        use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
385
386        let mut count: u64 = 0;
387        let seed = config.global.seed.unwrap_or(42);
388        let md_config = &config.master_data;
389        let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
390            .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
391
392        let company_code = config
393            .companies
394            .first()
395            .map(|c| c.code.as_str())
396            .unwrap_or("1000");
397
398        // Generate vendors
399        if control.is_cancelled() {
400            return Ok(count);
401        }
402
403        info!("Generating vendors");
404        let mut vendor_gen = VendorGenerator::new(seed);
405        for _ in 0..md_config.vendors.count {
406            if control.is_cancelled() {
407                break;
408            }
409            let vendor = vendor_gen.generate_vendor(company_code, effective_date);
410            sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
411            count += 1;
412        }
413
414        // Generate customers
415        if control.is_cancelled() {
416            return Ok(count);
417        }
418
419        info!("Generating customers");
420        let mut customer_gen = CustomerGenerator::new(seed + 1);
421        for _ in 0..md_config.customers.count {
422            if control.is_cancelled() {
423                break;
424            }
425            let customer = customer_gen.generate_customer(company_code, effective_date);
426            sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
427                customer,
428            ))))?;
429            count += 1;
430        }
431
432        // Generate employees
433        if control.is_cancelled() {
434            return Ok(count);
435        }
436
437        info!("Generating employees");
438        let mut employee_gen = EmployeeGenerator::new(seed + 4);
439        // Use first department from config
440        let dept = datasynth_generators::DepartmentDefinition {
441            code: "1000".to_string(),
442            name: "General".to_string(),
443            cost_center: "CC1000".to_string(),
444            headcount: 10,
445            system_roles: vec![],
446            transaction_codes: vec![],
447        };
448        for _ in 0..md_config.employees.count {
449            if control.is_cancelled() {
450                break;
451            }
452            let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
453            sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
454                employee,
455            ))))?;
456            count += 1;
457        }
458
459        Ok(count)
460    }
461
462    /// Generates journal entries phase.
463    ///
464    /// Note: This is a simplified version that generates basic journal entries.
465    /// For full-featured generation with all options, use EnhancedOrchestrator.
466    fn generate_journal_entries_phase(
467        config: &GeneratorConfig,
468        sender: &StreamSender<GeneratedItem>,
469        control: &Arc<StreamControl>,
470        progress_interval: u64,
471        progress: &mut StreamProgress,
472    ) -> SynthResult<u64> {
473        use datasynth_generators::{ChartOfAccountsGenerator, JournalEntryGenerator};
474        use std::sync::Arc;
475
476        let mut count: u64 = 0;
477        let seed = config.global.seed.unwrap_or(42);
478
479        // Calculate total entries to generate based on volume weights
480        let default_monthly = 500;
481        let total_entries: usize = config
482            .companies
483            .iter()
484            .map(|c| {
485                let monthly = (c.volume_weight * default_monthly as f64) as usize;
486                monthly.max(100) * config.global.period_months as usize
487            })
488            .sum();
489
490        progress.items_remaining = Some(total_entries as u64);
491        info!("Generating {} journal entries", total_entries);
492
493        // Generate a shared CoA for all companies
494        let complexity = config.chart_of_accounts.complexity;
495        let industry = config.global.industry;
496        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
497        let coa = Arc::new(coa_gen.generate());
498
499        // Parse start date
500        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
501            .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
502        let end_date =
503            start_date + chrono::Duration::days((config.global.period_months as i64) * 30);
504
505        // Create JE generator from config
506        let mut je_gen = JournalEntryGenerator::from_generator_config(
507            config,
508            Arc::clone(&coa),
509            start_date,
510            end_date,
511            seed,
512        );
513
514        for _ in 0..total_entries {
515            if control.is_cancelled() {
516                break;
517            }
518
519            // Handle pause
520            while control.is_paused() {
521                std::thread::sleep(std::time::Duration::from_millis(100));
522                if control.is_cancelled() {
523                    break;
524                }
525            }
526
527            let je = je_gen.generate();
528            sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
529            count += 1;
530
531            // Send progress updates
532            if count.is_multiple_of(progress_interval) {
533                progress.items_generated = count;
534                progress.items_remaining = Some(total_entries as u64 - count);
535                sender.send(StreamEvent::Progress(progress.clone()))?;
536            }
537        }
538
539        Ok(count)
540    }
541
542    /// Generates document flows phase (P2P and O2C).
543    ///
544    /// Creates complete document chains:
545    /// - P2P: PurchaseOrder → GoodsReceipt → VendorInvoice → Payment
546    /// - O2C: SalesOrder → Delivery → CustomerInvoice
547    fn generate_document_flows_phase(
548        config: &GeneratorConfig,
549        sender: &StreamSender<GeneratedItem>,
550        control: &Arc<StreamControl>,
551        progress_interval: u64,
552        progress: &mut StreamProgress,
553    ) -> SynthResult<u64> {
554        use chrono::Datelike;
555        use datasynth_generators::{
556            CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
557        };
558
559        let mut count: u64 = 0;
560        let seed = config.global.seed.unwrap_or(42);
561        let df_config = &config.document_flows;
562        let md_config = &config.master_data;
563
564        // Parse dates
565        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
566            .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).expect("valid default date"));
567
568        let company_code = config
569            .companies
570            .first()
571            .map(|c| c.code.as_str())
572            .unwrap_or("1000");
573
574        // Use master data config counts for generating reference data
575        let vendor_count = md_config.vendors.count.min(100);
576        let customer_count = md_config.customers.count.min(100);
577        let material_count = md_config.materials.count.min(50);
578
579        // Generate some master data for document flows
580        let mut vendor_gen = VendorGenerator::new(seed);
581        let mut customer_gen = CustomerGenerator::new(seed + 1);
582        let mut material_gen = MaterialGenerator::new(seed + 2);
583
584        let vendors: Vec<_> = (0..vendor_count)
585            .map(|_| vendor_gen.generate_vendor(company_code, start_date))
586            .collect();
587
588        let customers: Vec<_> = (0..customer_count)
589            .map(|_| customer_gen.generate_customer(company_code, start_date))
590            .collect();
591
592        let materials: Vec<_> = (0..material_count)
593            .map(|_| material_gen.generate_material(company_code, start_date))
594            .collect();
595
596        // Determine number of chains based on transaction volume
597        // Use period months as a multiplier for document chains
598        let base_chains = (config.global.period_months as usize * 50).max(100);
599
600        // P2P Generation
601        if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
602            info!("Generating P2P document flows");
603            let mut p2p_gen = P2PGenerator::new(seed + 100);
604
605            let chains_to_generate = base_chains.min(1000);
606            progress.items_remaining = Some(chains_to_generate as u64);
607
608            for i in 0..chains_to_generate {
609                if control.is_cancelled() {
610                    break;
611                }
612
613                // Handle pause
614                while control.is_paused() {
615                    std::thread::sleep(std::time::Duration::from_millis(100));
616                    if control.is_cancelled() {
617                        break;
618                    }
619                }
620
621                let vendor = &vendors[i % vendors.len()];
622                let material_refs: Vec<&datasynth_core::models::Material> =
623                    vec![&materials[i % materials.len()]];
624
625                // Calculate posting date within the period
626                let days_offset = (i as i64 % (config.global.period_months as i64 * 30)).max(0);
627                let po_date = start_date + chrono::Duration::days(days_offset);
628                let fiscal_year = po_date.year() as u16;
629                let fiscal_period = po_date.month() as u8;
630
631                let chain = p2p_gen.generate_chain(
632                    company_code,
633                    vendor,
634                    &material_refs,
635                    po_date,
636                    fiscal_year,
637                    fiscal_period,
638                    "SYSTEM",
639                );
640
641                // Send each document in the chain
642                sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
643                    chain.purchase_order,
644                ))))?;
645                count += 1;
646
647                for gr in chain.goods_receipts {
648                    sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
649                    count += 1;
650                }
651
652                if let Some(vi) = chain.vendor_invoice {
653                    sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
654                        vi,
655                    ))))?;
656                    count += 1;
657                }
658
659                if let Some(payment) = chain.payment {
660                    sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
661                    count += 1;
662                }
663
664                if count.is_multiple_of(progress_interval) {
665                    progress.items_generated = count;
666                    sender.send(StreamEvent::Progress(progress.clone()))?;
667                }
668            }
669        }
670
671        // O2C Generation
672        if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
673            info!("Generating O2C document flows");
674            let mut o2c_gen = O2CGenerator::new(seed + 200);
675
676            let chains_to_generate = base_chains.min(1000);
677
678            for i in 0..chains_to_generate {
679                if control.is_cancelled() {
680                    break;
681                }
682
683                while control.is_paused() {
684                    std::thread::sleep(std::time::Duration::from_millis(100));
685                    if control.is_cancelled() {
686                        break;
687                    }
688                }
689
690                let customer = &customers[i % customers.len()];
691                let material_refs: Vec<&datasynth_core::models::Material> =
692                    vec![&materials[i % materials.len()]];
693
694                let days_offset = (i as i64 % (config.global.period_months as i64 * 30)).max(0);
695                let so_date = start_date + chrono::Duration::days(days_offset);
696                let fiscal_year = so_date.year() as u16;
697                let fiscal_period = so_date.month() as u8;
698
699                let chain = o2c_gen.generate_chain(
700                    company_code,
701                    customer,
702                    &material_refs,
703                    so_date,
704                    fiscal_year,
705                    fiscal_period,
706                    "SYSTEM",
707                );
708
709                sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
710                    chain.sales_order,
711                ))))?;
712                count += 1;
713
714                for delivery in chain.deliveries {
715                    sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
716                        delivery,
717                    ))))?;
718                    count += 1;
719                }
720
721                if let Some(ci) = chain.customer_invoice {
722                    sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
723                        ci,
724                    ))))?;
725                    count += 1;
726                }
727
728                if count.is_multiple_of(progress_interval) {
729                    progress.items_generated = count;
730                    sender.send(StreamEvent::Progress(progress.clone()))?;
731                }
732            }
733        }
734
735        Ok(count)
736    }
737
738    /// Returns the orchestrator configuration stats.
739    pub fn stats(&self) -> StreamingOrchestratorStats {
740        StreamingOrchestratorStats {
741            phases: self.config.phases.len(),
742            buffer_size: self.config.stream_config.buffer_size,
743            backpressure: self.config.stream_config.backpressure,
744        }
745    }
746}
747
748/// Statistics for the streaming orchestrator.
749#[derive(Debug, Clone)]
750pub struct StreamingOrchestratorStats {
751    /// Number of phases configured.
752    pub phases: usize,
753    /// Buffer size.
754    pub buffer_size: usize,
755    /// Backpressure strategy.
756    pub backpressure: BackpressureStrategy,
757}
758
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    /// Small retail preset shared by the tests below.
    fn create_test_config() -> GeneratorConfig {
        create_preset(
            IndustrySector::Retail,
            2,
            3,
            CoAComplexity::Small,
            TransactionVolume::TenK,
        )
    }

    #[test]
    fn test_streaming_orchestrator_creation() {
        let config = create_test_config();
        let orchestrator = StreamingOrchestrator::from_generator_config(config);
        let stats = orchestrator.stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    #[test]
    fn test_config_phase_presets() {
        let config = create_test_config();

        let core = StreamingOrchestratorConfig::new(config.clone()).phases;
        assert_eq!(
            core,
            vec![
                GenerationPhase::ChartOfAccounts,
                GenerationPhase::MasterData,
                GenerationPhase::DocumentFlows,
                GenerationPhase::JournalEntries,
            ]
        );

        let all = StreamingOrchestratorConfig::with_all_phases(config).phases;
        assert!(all.contains(&GenerationPhase::OcpmEvents));
        assert!(all.contains(&GenerationPhase::DataQuality));
        assert!(!all.contains(&GenerationPhase::BalanceValidation));
    }

    #[test]
    fn test_streaming_generation() {
        let mut config = create_test_config();
        // Reduce volume for testing
        config.master_data.vendors.count = 5;
        config.master_data.customers.count = 5;
        config.master_data.materials.count = 5;
        config.master_data.employees.count = 5;
        config.global.period_months = 1;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![
                GenerationPhase::ChartOfAccounts,
                GenerationPhase::MasterData,
            ])
            .with_stream_config(StreamConfig {
                buffer_size: 100,
                progress_interval: 10,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut items_count = 0;
        let mut has_coa = false;
        let mut phase_markers: Vec<String> = Vec::new();
        let mut completed_phases: Option<Vec<String>> = None;

        for event in receiver {
            match event {
                StreamEvent::Data(item) => {
                    items_count += 1;
                    match item {
                        GeneratedItem::ChartOfAccounts(_) => has_coa = true,
                        GeneratedItem::PhaseComplete(name) => phase_markers.push(name),
                        _ => {}
                    }
                }
                StreamEvent::Complete(summary) => {
                    completed_phases = Some(summary.phases_completed);
                    break;
                }
                _ => {}
            }
        }

        let expected = vec!["chart_of_accounts".to_string(), "master_data".to_string()];
        assert!(items_count > 0);
        assert!(has_coa);
        assert_eq!(phase_markers, expected);
        assert_eq!(completed_phases, Some(expected));
    }

    #[test]
    fn test_stream_cancellation() {
        let mut config = create_test_config();
        config.global.period_months = 12; // Longer generation

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries]);

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, control) = orchestrator.stream().unwrap();

        // Cancel after receiving some items
        let mut items_count = 0;
        for event in receiver {
            if let StreamEvent::Data(_) = event {
                items_count += 1;
                if items_count >= 10 {
                    control.cancel();
                    break;
                }
            }
        }

        assert!(control.is_cancelled());
    }

    #[test]
    fn test_generation_phase_names() {
        assert_eq!(GenerationPhase::ChartOfAccounts.name(), "chart_of_accounts");
        assert_eq!(GenerationPhase::MasterData.name(), "master_data");
        assert_eq!(GenerationPhase::DocumentFlows.name(), "document_flows");
        assert_eq!(GenerationPhase::JournalEntries.name(), "journal_entries");
        assert_eq!(GenerationPhase::Complete.name(), "complete");
    }

    #[test]
    fn test_generated_item_type_name() {
        // CoAComplexity and IndustrySector are already imported at module level.
        let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        )));
        assert_eq!(coa.type_name(), "chart_of_accounts");

        let progress = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress.type_name(), "progress");

        let marker = GeneratedItem::PhaseComplete("master_data".to_string());
        assert_eq!(marker.type_name(), "phase_complete");
    }
}