// datasynth_runtime/streaming_orchestrator.rs

//! Streaming orchestrator for real-time data generation.
//!
//! This orchestrator provides streaming capabilities with backpressure,
//! progress reporting, and pause/cancel control for real-time data generation.
5
6use std::sync::Arc;
7use std::thread;
8use std::time::Instant;
9
10use chrono::NaiveDate;
11use tracing::{debug, info, warn};
12
13use datasynth_config::schema::GeneratorConfig;
14use datasynth_core::error::SynthResult;
15use datasynth_core::models::{
16    documents::{
17        CustomerInvoice, Delivery, GoodsReceipt, Payment, PurchaseOrder, SalesOrder, VendorInvoice,
18    },
19    ChartOfAccounts, Customer, Employee, JournalEntry, Material, Vendor,
20};
21use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
22use datasynth_core::traits::{
23    BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
24};
25
26/// Generated items that can be streamed.
27#[derive(Debug, Clone)]
28pub enum GeneratedItem {
29    /// Chart of Accounts.
30    ChartOfAccounts(Box<ChartOfAccounts>),
31    /// A vendor.
32    Vendor(Box<Vendor>),
33    /// A customer.
34    Customer(Box<Customer>),
35    /// A material.
36    Material(Box<Material>),
37    /// An employee.
38    Employee(Box<Employee>),
39    /// A journal entry.
40    JournalEntry(Box<JournalEntry>),
41    /// A purchase order (P2P).
42    PurchaseOrder(Box<PurchaseOrder>),
43    /// A goods receipt (P2P).
44    GoodsReceipt(Box<GoodsReceipt>),
45    /// A vendor invoice (P2P).
46    VendorInvoice(Box<VendorInvoice>),
47    /// A payment (P2P/O2C).
48    Payment(Box<Payment>),
49    /// A sales order (O2C).
50    SalesOrder(Box<SalesOrder>),
51    /// A delivery (O2C).
52    Delivery(Box<Delivery>),
53    /// A customer invoice (O2C).
54    CustomerInvoice(Box<CustomerInvoice>),
55    /// Progress update.
56    Progress(StreamProgress),
57    /// Phase completion marker.
58    PhaseComplete(String),
59}
60
61impl GeneratedItem {
62    /// Returns the item type name.
63    pub fn type_name(&self) -> &'static str {
64        match self {
65            GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
66            GeneratedItem::Vendor(_) => "vendor",
67            GeneratedItem::Customer(_) => "customer",
68            GeneratedItem::Material(_) => "material",
69            GeneratedItem::Employee(_) => "employee",
70            GeneratedItem::JournalEntry(_) => "journal_entry",
71            GeneratedItem::PurchaseOrder(_) => "purchase_order",
72            GeneratedItem::GoodsReceipt(_) => "goods_receipt",
73            GeneratedItem::VendorInvoice(_) => "vendor_invoice",
74            GeneratedItem::Payment(_) => "payment",
75            GeneratedItem::SalesOrder(_) => "sales_order",
76            GeneratedItem::Delivery(_) => "delivery",
77            GeneratedItem::CustomerInvoice(_) => "customer_invoice",
78            GeneratedItem::Progress(_) => "progress",
79            GeneratedItem::PhaseComplete(_) => "phase_complete",
80        }
81    }
82}
83
/// A stage of the generation pipeline that the orchestrator can be asked to run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GenerationPhase {
    /// Chart of Accounts generation.
    ChartOfAccounts,
    /// Master data generation (vendors, customers, etc.).
    MasterData,
    /// Document flow generation (P2P, O2C).
    DocumentFlows,
    /// OCPM event log generation.
    OcpmEvents,
    /// Journal entry generation.
    JournalEntries,
    /// Anomaly injection.
    AnomalyInjection,
    /// Balance validation.
    BalanceValidation,
    /// Data quality injection.
    DataQuality,
    /// Complete.
    Complete,
}

impl GenerationPhase {
    /// Returns the snake_case name of this phase.
    pub fn name(&self) -> &'static str {
        let label = match *self {
            Self::ChartOfAccounts => "chart_of_accounts",
            Self::MasterData => "master_data",
            Self::DocumentFlows => "document_flows",
            Self::OcpmEvents => "ocpm_events",
            Self::JournalEntries => "journal_entries",
            Self::AnomalyInjection => "anomaly_injection",
            Self::BalanceValidation => "balance_validation",
            Self::DataQuality => "data_quality",
            Self::Complete => "complete",
        };
        label
    }
}
123
124/// Configuration for streaming orchestration.
125#[derive(Debug, Clone)]
126pub struct StreamingOrchestratorConfig {
127    /// Generator configuration.
128    pub generator_config: GeneratorConfig,
129    /// Stream configuration.
130    pub stream_config: StreamConfig,
131    /// Phases to execute.
132    pub phases: Vec<GenerationPhase>,
133}
134
135impl StreamingOrchestratorConfig {
136    /// Creates a new streaming orchestrator configuration.
137    pub fn new(generator_config: GeneratorConfig) -> Self {
138        Self {
139            generator_config,
140            stream_config: StreamConfig::default(),
141            phases: vec![
142                GenerationPhase::ChartOfAccounts,
143                GenerationPhase::MasterData,
144                GenerationPhase::DocumentFlows,
145                GenerationPhase::JournalEntries,
146            ],
147        }
148    }
149
150    /// Creates a configuration with all phases enabled including OCPM.
151    pub fn with_all_phases(generator_config: GeneratorConfig) -> Self {
152        Self {
153            generator_config,
154            stream_config: StreamConfig::default(),
155            phases: vec![
156                GenerationPhase::ChartOfAccounts,
157                GenerationPhase::MasterData,
158                GenerationPhase::DocumentFlows,
159                GenerationPhase::OcpmEvents,
160                GenerationPhase::JournalEntries,
161                GenerationPhase::AnomalyInjection,
162                GenerationPhase::DataQuality,
163            ],
164        }
165    }
166
167    /// Sets the stream configuration.
168    pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
169        self.stream_config = config;
170        self
171    }
172
173    /// Sets the phases to execute.
174    pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
175        self.phases = phases;
176        self
177    }
178}
179
180/// Streaming orchestrator for real-time data generation.
181pub struct StreamingOrchestrator {
182    config: StreamingOrchestratorConfig,
183}
184
185impl StreamingOrchestrator {
186    /// Creates a new streaming orchestrator.
187    pub fn new(config: StreamingOrchestratorConfig) -> Self {
188        Self { config }
189    }
190
191    /// Creates a streaming orchestrator from generator config with defaults.
192    pub fn from_generator_config(config: GeneratorConfig) -> Self {
193        Self::new(StreamingOrchestratorConfig::new(config))
194    }
195
196    /// Starts streaming generation.
197    ///
198    /// Returns a receiver for stream events and a control handle.
199    pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
200        let (sender, receiver) = stream_channel(
201            self.config.stream_config.buffer_size,
202            self.config.stream_config.backpressure,
203        );
204
205        let control = Arc::new(StreamControl::new());
206        let control_clone = Arc::clone(&control);
207
208        let config = self.config.clone();
209
210        // Spawn generation thread
211        thread::spawn(move || {
212            let result = Self::run_generation(config, sender, control_clone);
213            if let Err(e) = result {
214                warn!("Streaming generation error: {}", e);
215            }
216        });
217
218        Ok((receiver, control))
219    }
220
221    /// Runs the generation process.
222    fn run_generation(
223        config: StreamingOrchestratorConfig,
224        sender: StreamSender<GeneratedItem>,
225        control: Arc<StreamControl>,
226    ) -> SynthResult<()> {
227        let start_time = Instant::now();
228        let mut items_generated: u64 = 0;
229        let mut phases_completed = Vec::new();
230
231        // Track stats
232        let progress_interval = config.stream_config.progress_interval;
233
234        // Send initial progress
235        let mut progress = StreamProgress::new("initializing");
236        sender.send(StreamEvent::Progress(progress.clone()))?;
237
238        for phase in &config.phases {
239            if control.is_cancelled() {
240                info!("Generation cancelled");
241                break;
242            }
243
244            // Handle pause
245            while control.is_paused() {
246                std::thread::sleep(std::time::Duration::from_millis(100));
247                if control.is_cancelled() {
248                    break;
249                }
250            }
251
252            progress.phase = phase.name().to_string();
253            sender.send(StreamEvent::Progress(progress.clone()))?;
254
255            match phase {
256                GenerationPhase::ChartOfAccounts => {
257                    let result =
258                        Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
259                    items_generated += result;
260                }
261                GenerationPhase::MasterData => {
262                    let result = Self::generate_master_data_phase(
263                        &config.generator_config,
264                        &sender,
265                        &control,
266                    )?;
267                    items_generated += result;
268                }
269                GenerationPhase::DocumentFlows => {
270                    let result = Self::generate_document_flows_phase(
271                        &config.generator_config,
272                        &sender,
273                        &control,
274                        progress_interval,
275                        &mut progress,
276                    )?;
277                    items_generated += result;
278                }
279                GenerationPhase::OcpmEvents => {
280                    // OCPM event generation is optional and requires document flows
281                    debug!("OCPM events phase - skipping (documents should be generated via P2P/O2C generators)");
282                }
283                GenerationPhase::JournalEntries => {
284                    let result = Self::generate_journal_entries_phase(
285                        &config.generator_config,
286                        &sender,
287                        &control,
288                        progress_interval,
289                        &mut progress,
290                    )?;
291                    items_generated += result;
292                }
293                GenerationPhase::AnomalyInjection | GenerationPhase::DataQuality => {
294                    // These phases modify existing data; not directly generating new items
295                    debug!(
296                        "Phase {:?} operates on existing data (not streaming new items)",
297                        phase
298                    );
299                }
300                GenerationPhase::BalanceValidation | GenerationPhase::Complete => {
301                    // Validation and completion phases don't generate items
302                    debug!("Phase {:?} is a validation/completion phase", phase);
303                }
304            }
305
306            // Send phase completion
307            sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
308                phase.name().to_string(),
309            )))?;
310            phases_completed.push(phase.name().to_string());
311
312            // Update progress
313            progress.items_generated = items_generated;
314            progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
315            if progress.elapsed_ms > 0 {
316                progress.items_per_second =
317                    (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
318            }
319            sender.send(StreamEvent::Progress(progress.clone()))?;
320        }
321
322        // Send completion
323        let stats = sender.stats();
324        let summary = StreamSummary {
325            total_items: items_generated,
326            total_time_ms: start_time.elapsed().as_millis() as u64,
327            avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
328                (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
329            } else {
330                0.0
331            },
332            error_count: 0,
333            dropped_count: stats.items_dropped,
334            peak_memory_mb: None,
335            phases_completed,
336        };
337
338        sender.send(StreamEvent::Complete(summary))?;
339        sender.close();
340
341        Ok(())
342    }
343
344    /// Generates Chart of Accounts phase.
345    fn generate_coa_phase(
346        config: &GeneratorConfig,
347        sender: &StreamSender<GeneratedItem>,
348        control: &Arc<StreamControl>,
349    ) -> SynthResult<u64> {
350        use datasynth_generators::ChartOfAccountsGenerator;
351
352        if control.is_cancelled() {
353            return Ok(0);
354        }
355
356        info!("Generating Chart of Accounts");
357        let seed = config.global.seed.unwrap_or(42);
358        let complexity = config.chart_of_accounts.complexity;
359        let industry = config.global.industry;
360
361        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
362        let coa = coa_gen.generate();
363
364        let account_count = coa.account_count() as u64;
365        sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
366            coa,
367        ))))?;
368
369        Ok(account_count)
370    }
371
372    /// Generates master data phase.
373    fn generate_master_data_phase(
374        config: &GeneratorConfig,
375        sender: &StreamSender<GeneratedItem>,
376        control: &Arc<StreamControl>,
377    ) -> SynthResult<u64> {
378        use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
379
380        let mut count: u64 = 0;
381        let seed = config.global.seed.unwrap_or(42);
382        let md_config = &config.master_data;
383        let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
384            .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).unwrap());
385
386        let company_code = config
387            .companies
388            .first()
389            .map(|c| c.code.as_str())
390            .unwrap_or("1000");
391
392        // Generate vendors
393        if control.is_cancelled() {
394            return Ok(count);
395        }
396
397        info!("Generating vendors");
398        let mut vendor_gen = VendorGenerator::new(seed);
399        for _ in 0..md_config.vendors.count {
400            if control.is_cancelled() {
401                break;
402            }
403            let vendor = vendor_gen.generate_vendor(company_code, effective_date);
404            sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
405            count += 1;
406        }
407
408        // Generate customers
409        if control.is_cancelled() {
410            return Ok(count);
411        }
412
413        info!("Generating customers");
414        let mut customer_gen = CustomerGenerator::new(seed + 1);
415        for _ in 0..md_config.customers.count {
416            if control.is_cancelled() {
417                break;
418            }
419            let customer = customer_gen.generate_customer(company_code, effective_date);
420            sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
421                customer,
422            ))))?;
423            count += 1;
424        }
425
426        // Generate employees
427        if control.is_cancelled() {
428            return Ok(count);
429        }
430
431        info!("Generating employees");
432        let mut employee_gen = EmployeeGenerator::new(seed + 4);
433        // Use first department from config
434        let dept = datasynth_generators::DepartmentDefinition {
435            code: "1000".to_string(),
436            name: "General".to_string(),
437            cost_center: "CC1000".to_string(),
438            headcount: 10,
439            system_roles: vec![],
440            transaction_codes: vec![],
441        };
442        for _ in 0..md_config.employees.count {
443            if control.is_cancelled() {
444                break;
445            }
446            let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
447            sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
448                employee,
449            ))))?;
450            count += 1;
451        }
452
453        Ok(count)
454    }
455
456    /// Generates journal entries phase.
457    ///
458    /// Note: This is a simplified version that generates basic journal entries.
459    /// For full-featured generation with all options, use EnhancedOrchestrator.
460    fn generate_journal_entries_phase(
461        config: &GeneratorConfig,
462        sender: &StreamSender<GeneratedItem>,
463        control: &Arc<StreamControl>,
464        progress_interval: u64,
465        progress: &mut StreamProgress,
466    ) -> SynthResult<u64> {
467        use datasynth_generators::{ChartOfAccountsGenerator, JournalEntryGenerator};
468        use std::sync::Arc;
469
470        let mut count: u64 = 0;
471        let seed = config.global.seed.unwrap_or(42);
472
473        // Calculate total entries to generate based on volume weights
474        let default_monthly = 500;
475        let total_entries: usize = config
476            .companies
477            .iter()
478            .map(|c| {
479                let monthly = (c.volume_weight * default_monthly as f64) as usize;
480                monthly.max(100) * config.global.period_months as usize
481            })
482            .sum();
483
484        progress.items_remaining = Some(total_entries as u64);
485        info!("Generating {} journal entries", total_entries);
486
487        // Generate a shared CoA for all companies
488        let complexity = config.chart_of_accounts.complexity;
489        let industry = config.global.industry;
490        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
491        let coa = Arc::new(coa_gen.generate());
492
493        // Parse start date
494        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
495            .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).unwrap());
496        let end_date =
497            start_date + chrono::Duration::days((config.global.period_months as i64) * 30);
498
499        // Create JE generator from config
500        let mut je_gen = JournalEntryGenerator::from_generator_config(
501            config,
502            Arc::clone(&coa),
503            start_date,
504            end_date,
505            seed,
506        );
507
508        for _ in 0..total_entries {
509            if control.is_cancelled() {
510                break;
511            }
512
513            // Handle pause
514            while control.is_paused() {
515                std::thread::sleep(std::time::Duration::from_millis(100));
516                if control.is_cancelled() {
517                    break;
518                }
519            }
520
521            let je = je_gen.generate();
522            sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
523            count += 1;
524
525            // Send progress updates
526            if count % progress_interval == 0 {
527                progress.items_generated = count;
528                progress.items_remaining = Some(total_entries as u64 - count);
529                sender.send(StreamEvent::Progress(progress.clone()))?;
530            }
531        }
532
533        Ok(count)
534    }
535
536    /// Generates document flows phase (P2P and O2C).
537    ///
538    /// Creates complete document chains:
539    /// - P2P: PurchaseOrder → GoodsReceipt → VendorInvoice → Payment
540    /// - O2C: SalesOrder → Delivery → CustomerInvoice
541    fn generate_document_flows_phase(
542        config: &GeneratorConfig,
543        sender: &StreamSender<GeneratedItem>,
544        control: &Arc<StreamControl>,
545        progress_interval: u64,
546        progress: &mut StreamProgress,
547    ) -> SynthResult<u64> {
548        use chrono::Datelike;
549        use datasynth_generators::{
550            CustomerGenerator, MaterialGenerator, O2CGenerator, P2PGenerator, VendorGenerator,
551        };
552
553        let mut count: u64 = 0;
554        let seed = config.global.seed.unwrap_or(42);
555        let df_config = &config.document_flows;
556        let md_config = &config.master_data;
557
558        // Parse dates
559        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
560            .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).unwrap());
561
562        let company_code = config
563            .companies
564            .first()
565            .map(|c| c.code.as_str())
566            .unwrap_or("1000");
567
568        // Use master data config counts for generating reference data
569        let vendor_count = md_config.vendors.count.min(100);
570        let customer_count = md_config.customers.count.min(100);
571        let material_count = md_config.materials.count.min(50);
572
573        // Generate some master data for document flows
574        let mut vendor_gen = VendorGenerator::new(seed);
575        let mut customer_gen = CustomerGenerator::new(seed + 1);
576        let mut material_gen = MaterialGenerator::new(seed + 2);
577
578        let vendors: Vec<_> = (0..vendor_count)
579            .map(|_| vendor_gen.generate_vendor(company_code, start_date))
580            .collect();
581
582        let customers: Vec<_> = (0..customer_count)
583            .map(|_| customer_gen.generate_customer(company_code, start_date))
584            .collect();
585
586        let materials: Vec<_> = (0..material_count)
587            .map(|_| material_gen.generate_material(company_code, start_date))
588            .collect();
589
590        // Determine number of chains based on transaction volume
591        // Use period months as a multiplier for document chains
592        let base_chains = (config.global.period_months as usize * 50).max(100);
593
594        // P2P Generation
595        if df_config.p2p.enabled && !vendors.is_empty() && !materials.is_empty() {
596            info!("Generating P2P document flows");
597            let mut p2p_gen = P2PGenerator::new(seed + 100);
598
599            let chains_to_generate = base_chains.min(1000);
600            progress.items_remaining = Some(chains_to_generate as u64);
601
602            for i in 0..chains_to_generate {
603                if control.is_cancelled() {
604                    break;
605                }
606
607                // Handle pause
608                while control.is_paused() {
609                    std::thread::sleep(std::time::Duration::from_millis(100));
610                    if control.is_cancelled() {
611                        break;
612                    }
613                }
614
615                let vendor = &vendors[i % vendors.len()];
616                let material_refs: Vec<&datasynth_core::models::Material> =
617                    vec![&materials[i % materials.len()]];
618
619                // Calculate posting date within the period
620                let days_offset = (i as i64 % (config.global.period_months as i64 * 30)).max(0);
621                let po_date = start_date + chrono::Duration::days(days_offset);
622                let fiscal_year = po_date.year() as u16;
623                let fiscal_period = po_date.month() as u8;
624
625                let chain = p2p_gen.generate_chain(
626                    company_code,
627                    vendor,
628                    &material_refs,
629                    po_date,
630                    fiscal_year,
631                    fiscal_period,
632                    "SYSTEM",
633                );
634
635                // Send each document in the chain
636                sender.send(StreamEvent::Data(GeneratedItem::PurchaseOrder(Box::new(
637                    chain.purchase_order,
638                ))))?;
639                count += 1;
640
641                for gr in chain.goods_receipts {
642                    sender.send(StreamEvent::Data(GeneratedItem::GoodsReceipt(Box::new(gr))))?;
643                    count += 1;
644                }
645
646                if let Some(vi) = chain.vendor_invoice {
647                    sender.send(StreamEvent::Data(GeneratedItem::VendorInvoice(Box::new(
648                        vi,
649                    ))))?;
650                    count += 1;
651                }
652
653                if let Some(payment) = chain.payment {
654                    sender.send(StreamEvent::Data(GeneratedItem::Payment(Box::new(payment))))?;
655                    count += 1;
656                }
657
658                if count % progress_interval == 0 {
659                    progress.items_generated = count;
660                    sender.send(StreamEvent::Progress(progress.clone()))?;
661                }
662            }
663        }
664
665        // O2C Generation
666        if df_config.o2c.enabled && !customers.is_empty() && !materials.is_empty() {
667            info!("Generating O2C document flows");
668            let mut o2c_gen = O2CGenerator::new(seed + 200);
669
670            let chains_to_generate = base_chains.min(1000);
671
672            for i in 0..chains_to_generate {
673                if control.is_cancelled() {
674                    break;
675                }
676
677                while control.is_paused() {
678                    std::thread::sleep(std::time::Duration::from_millis(100));
679                    if control.is_cancelled() {
680                        break;
681                    }
682                }
683
684                let customer = &customers[i % customers.len()];
685                let material_refs: Vec<&datasynth_core::models::Material> =
686                    vec![&materials[i % materials.len()]];
687
688                let days_offset = (i as i64 % (config.global.period_months as i64 * 30)).max(0);
689                let so_date = start_date + chrono::Duration::days(days_offset);
690                let fiscal_year = so_date.year() as u16;
691                let fiscal_period = so_date.month() as u8;
692
693                let chain = o2c_gen.generate_chain(
694                    company_code,
695                    customer,
696                    &material_refs,
697                    so_date,
698                    fiscal_year,
699                    fiscal_period,
700                    "SYSTEM",
701                );
702
703                sender.send(StreamEvent::Data(GeneratedItem::SalesOrder(Box::new(
704                    chain.sales_order,
705                ))))?;
706                count += 1;
707
708                for delivery in chain.deliveries {
709                    sender.send(StreamEvent::Data(GeneratedItem::Delivery(Box::new(
710                        delivery,
711                    ))))?;
712                    count += 1;
713                }
714
715                if let Some(ci) = chain.customer_invoice {
716                    sender.send(StreamEvent::Data(GeneratedItem::CustomerInvoice(Box::new(
717                        ci,
718                    ))))?;
719                    count += 1;
720                }
721
722                if count % progress_interval == 0 {
723                    progress.items_generated = count;
724                    sender.send(StreamEvent::Progress(progress.clone()))?;
725                }
726            }
727        }
728
729        Ok(count)
730    }
731
732    /// Returns the orchestrator configuration stats.
733    pub fn stats(&self) -> StreamingOrchestratorStats {
734        StreamingOrchestratorStats {
735            phases: self.config.phases.len(),
736            buffer_size: self.config.stream_config.buffer_size,
737            backpressure: self.config.stream_config.backpressure,
738        }
739    }
740}
741
742/// Statistics for the streaming orchestrator.
743#[derive(Debug, Clone)]
744pub struct StreamingOrchestratorStats {
745    /// Number of phases configured.
746    pub phases: usize,
747    /// Buffer size.
748    pub buffer_size: usize,
749    /// Backpressure strategy.
750    pub backpressure: BackpressureStrategy,
751}
752
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    /// Builds the small retail preset shared by the tests below.
    fn create_test_config() -> GeneratorConfig {
        create_preset(
            IndustrySector::Retail,
            2,
            3,
            CoAComplexity::Small,
            TransactionVolume::TenK,
        )
    }

    /// A default orchestrator reports at least one phase and a non-empty buffer.
    #[test]
    fn test_streaming_orchestrator_creation() {
        let stats = StreamingOrchestrator::from_generator_config(create_test_config()).stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    /// Streaming CoA + master data yields data items, the CoA, and a completion event.
    #[test]
    fn test_streaming_generation() {
        // Reduce volume for testing
        let mut generator_config = create_test_config();
        generator_config.master_data.vendors.count = 5;
        generator_config.master_data.customers.count = 5;
        generator_config.master_data.employees.count = 5;
        generator_config.global.period_months = 1;

        let phases = vec![
            GenerationPhase::ChartOfAccounts,
            GenerationPhase::MasterData,
        ];
        let stream_config = StreamConfig {
            buffer_size: 100,
            progress_interval: 10,
            ..Default::default()
        };
        let streaming_config = StreamingOrchestratorConfig::new(generator_config)
            .with_phases(phases)
            .with_stream_config(stream_config);

        let (receiver, _control) = StreamingOrchestrator::new(streaming_config)
            .stream()
            .unwrap();

        let mut data_events = 0;
        let mut saw_coa = false;
        let mut saw_completion = false;

        for event in receiver {
            match event {
                StreamEvent::Data(GeneratedItem::ChartOfAccounts(_)) => {
                    data_events += 1;
                    saw_coa = true;
                }
                StreamEvent::Data(_) => data_events += 1,
                StreamEvent::Complete(_) => {
                    saw_completion = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(data_events > 0);
        assert!(saw_coa);
        assert!(saw_completion);
    }

    /// Cancelling after a handful of items flips the control handle's flag.
    #[test]
    fn test_stream_cancellation() {
        let mut generator_config = create_test_config();
        generator_config.global.period_months = 12; // Longer generation

        let streaming_config = StreamingOrchestratorConfig::new(generator_config)
            .with_phases(vec![GenerationPhase::JournalEntries]);

        let (receiver, control) = StreamingOrchestrator::new(streaming_config)
            .stream()
            .unwrap();

        // Cancel after receiving some items
        let mut data_events = 0;
        for event in receiver {
            if !matches!(event, StreamEvent::Data(_)) {
                continue;
            }
            data_events += 1;
            if data_events >= 10 {
                control.cancel();
                break;
            }
        }

        assert!(control.is_cancelled());
    }

    /// `type_name` maps variants to their snake_case labels.
    #[test]
    fn test_generated_item_type_name() {
        let chart = ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        );
        let coa_item = GeneratedItem::ChartOfAccounts(Box::new(chart));
        assert_eq!(coa_item.type_name(), "chart_of_accounts");

        let progress_item = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress_item.type_name(), "progress");
    }
}