Skip to main content

datasynth_runtime/
streaming_orchestrator.rs

1//! Streaming orchestrator for real-time data generation.
2//!
3//! This orchestrator provides streaming capabilities with backpressure,
4//! progress reporting, and control for real-time data generation.
5
6use std::sync::Arc;
7use std::thread;
8use std::time::Instant;
9
10use chrono::NaiveDate;
11use tracing::{debug, info, warn};
12
13use datasynth_config::schema::GeneratorConfig;
14use datasynth_core::error::SynthResult;
15use datasynth_core::models::{ChartOfAccounts, Customer, Employee, JournalEntry, Material, Vendor};
16use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
17use datasynth_core::traits::{
18    BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
19};
20
21/// Generated items that can be streamed.
22#[derive(Debug, Clone)]
23pub enum GeneratedItem {
24    /// Chart of Accounts.
25    ChartOfAccounts(Box<ChartOfAccounts>),
26    /// A vendor.
27    Vendor(Box<Vendor>),
28    /// A customer.
29    Customer(Box<Customer>),
30    /// A material.
31    Material(Box<Material>),
32    /// An employee.
33    Employee(Box<Employee>),
34    /// A journal entry.
35    JournalEntry(Box<JournalEntry>),
36    /// Progress update.
37    Progress(StreamProgress),
38    /// Phase completion marker.
39    PhaseComplete(String),
40}
41
42impl GeneratedItem {
43    /// Returns the item type name.
44    pub fn type_name(&self) -> &'static str {
45        match self {
46            GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
47            GeneratedItem::Vendor(_) => "vendor",
48            GeneratedItem::Customer(_) => "customer",
49            GeneratedItem::Material(_) => "material",
50            GeneratedItem::Employee(_) => "employee",
51            GeneratedItem::JournalEntry(_) => "journal_entry",
52            GeneratedItem::Progress(_) => "progress",
53            GeneratedItem::PhaseComplete(_) => "phase_complete",
54        }
55    }
56}
57
/// A stage of the generation pipeline.
///
/// The orchestrator runs the configured phases in list order. Phases without a
/// streaming implementation are skipped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GenerationPhase {
    /// Builds the chart of accounts.
    ChartOfAccounts,
    /// Builds master data such as vendors, customers and employees.
    MasterData,
    /// Builds document flows (P2P, O2C).
    DocumentFlows,
    /// Builds journal entries.
    JournalEntries,
    /// Injects anomalies.
    AnomalyInjection,
    /// Validates balances.
    BalanceValidation,
    /// Injects data-quality issues.
    DataQuality,
    /// Terminal marker: generation finished.
    Complete,
}
78
79impl GenerationPhase {
80    /// Returns the phase name.
81    pub fn name(&self) -> &'static str {
82        match self {
83            GenerationPhase::ChartOfAccounts => "chart_of_accounts",
84            GenerationPhase::MasterData => "master_data",
85            GenerationPhase::DocumentFlows => "document_flows",
86            GenerationPhase::JournalEntries => "journal_entries",
87            GenerationPhase::AnomalyInjection => "anomaly_injection",
88            GenerationPhase::BalanceValidation => "balance_validation",
89            GenerationPhase::DataQuality => "data_quality",
90            GenerationPhase::Complete => "complete",
91        }
92    }
93}
94
95/// Configuration for streaming orchestration.
96#[derive(Debug, Clone)]
97pub struct StreamingOrchestratorConfig {
98    /// Generator configuration.
99    pub generator_config: GeneratorConfig,
100    /// Stream configuration.
101    pub stream_config: StreamConfig,
102    /// Phases to execute.
103    pub phases: Vec<GenerationPhase>,
104}
105
106impl StreamingOrchestratorConfig {
107    /// Creates a new streaming orchestrator configuration.
108    pub fn new(generator_config: GeneratorConfig) -> Self {
109        Self {
110            generator_config,
111            stream_config: StreamConfig::default(),
112            phases: vec![
113                GenerationPhase::ChartOfAccounts,
114                GenerationPhase::MasterData,
115                GenerationPhase::JournalEntries,
116            ],
117        }
118    }
119
120    /// Sets the stream configuration.
121    pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
122        self.stream_config = config;
123        self
124    }
125
126    /// Sets the phases to execute.
127    pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
128        self.phases = phases;
129        self
130    }
131}
132
133/// Streaming orchestrator for real-time data generation.
134pub struct StreamingOrchestrator {
135    config: StreamingOrchestratorConfig,
136}
137
138impl StreamingOrchestrator {
139    /// Creates a new streaming orchestrator.
140    pub fn new(config: StreamingOrchestratorConfig) -> Self {
141        Self { config }
142    }
143
144    /// Creates a streaming orchestrator from generator config with defaults.
145    pub fn from_generator_config(config: GeneratorConfig) -> Self {
146        Self::new(StreamingOrchestratorConfig::new(config))
147    }
148
149    /// Starts streaming generation.
150    ///
151    /// Returns a receiver for stream events and a control handle.
152    pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
153        let (sender, receiver) = stream_channel(
154            self.config.stream_config.buffer_size,
155            self.config.stream_config.backpressure,
156        );
157
158        let control = Arc::new(StreamControl::new());
159        let control_clone = Arc::clone(&control);
160
161        let config = self.config.clone();
162
163        // Spawn generation thread
164        thread::spawn(move || {
165            let result = Self::run_generation(config, sender, control_clone);
166            if let Err(e) = result {
167                warn!("Streaming generation error: {}", e);
168            }
169        });
170
171        Ok((receiver, control))
172    }
173
174    /// Runs the generation process.
175    fn run_generation(
176        config: StreamingOrchestratorConfig,
177        sender: StreamSender<GeneratedItem>,
178        control: Arc<StreamControl>,
179    ) -> SynthResult<()> {
180        let start_time = Instant::now();
181        let mut items_generated: u64 = 0;
182        let mut phases_completed = Vec::new();
183
184        // Track stats
185        let progress_interval = config.stream_config.progress_interval;
186
187        // Send initial progress
188        let mut progress = StreamProgress::new("initializing");
189        sender.send(StreamEvent::Progress(progress.clone()))?;
190
191        for phase in &config.phases {
192            if control.is_cancelled() {
193                info!("Generation cancelled");
194                break;
195            }
196
197            // Handle pause
198            while control.is_paused() {
199                std::thread::sleep(std::time::Duration::from_millis(100));
200                if control.is_cancelled() {
201                    break;
202                }
203            }
204
205            progress.phase = phase.name().to_string();
206            sender.send(StreamEvent::Progress(progress.clone()))?;
207
208            match phase {
209                GenerationPhase::ChartOfAccounts => {
210                    let result =
211                        Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
212                    items_generated += result;
213                }
214                GenerationPhase::MasterData => {
215                    let result = Self::generate_master_data_phase(
216                        &config.generator_config,
217                        &sender,
218                        &control,
219                    )?;
220                    items_generated += result;
221                }
222                GenerationPhase::JournalEntries => {
223                    let result = Self::generate_journal_entries_phase(
224                        &config.generator_config,
225                        &sender,
226                        &control,
227                        progress_interval,
228                        &mut progress,
229                    )?;
230                    items_generated += result;
231                }
232                _ => {
233                    // Other phases can be added incrementally
234                    debug!(
235                        "Skipping phase {:?} (not yet implemented for streaming)",
236                        phase
237                    );
238                }
239            }
240
241            // Send phase completion
242            sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
243                phase.name().to_string(),
244            )))?;
245            phases_completed.push(phase.name().to_string());
246
247            // Update progress
248            progress.items_generated = items_generated;
249            progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
250            if progress.elapsed_ms > 0 {
251                progress.items_per_second =
252                    (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
253            }
254            sender.send(StreamEvent::Progress(progress.clone()))?;
255        }
256
257        // Send completion
258        let stats = sender.stats();
259        let summary = StreamSummary {
260            total_items: items_generated,
261            total_time_ms: start_time.elapsed().as_millis() as u64,
262            avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
263                (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
264            } else {
265                0.0
266            },
267            error_count: 0,
268            dropped_count: stats.items_dropped,
269            peak_memory_mb: None,
270            phases_completed,
271        };
272
273        sender.send(StreamEvent::Complete(summary))?;
274        sender.close();
275
276        Ok(())
277    }
278
279    /// Generates Chart of Accounts phase.
280    fn generate_coa_phase(
281        config: &GeneratorConfig,
282        sender: &StreamSender<GeneratedItem>,
283        control: &Arc<StreamControl>,
284    ) -> SynthResult<u64> {
285        use datasynth_generators::ChartOfAccountsGenerator;
286
287        if control.is_cancelled() {
288            return Ok(0);
289        }
290
291        info!("Generating Chart of Accounts");
292        let seed = config.global.seed.unwrap_or(42);
293        let complexity = config.chart_of_accounts.complexity;
294        let industry = config.global.industry;
295
296        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
297        let coa = coa_gen.generate();
298
299        let account_count = coa.account_count() as u64;
300        sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
301            coa,
302        ))))?;
303
304        Ok(account_count)
305    }
306
307    /// Generates master data phase.
308    fn generate_master_data_phase(
309        config: &GeneratorConfig,
310        sender: &StreamSender<GeneratedItem>,
311        control: &Arc<StreamControl>,
312    ) -> SynthResult<u64> {
313        use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
314
315        let mut count: u64 = 0;
316        let seed = config.global.seed.unwrap_or(42);
317        let md_config = &config.master_data;
318        let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
319            .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).unwrap());
320
321        let company_code = config
322            .companies
323            .first()
324            .map(|c| c.code.as_str())
325            .unwrap_or("1000");
326
327        // Generate vendors
328        if control.is_cancelled() {
329            return Ok(count);
330        }
331
332        info!("Generating vendors");
333        let mut vendor_gen = VendorGenerator::new(seed);
334        for _ in 0..md_config.vendors.count {
335            if control.is_cancelled() {
336                break;
337            }
338            let vendor = vendor_gen.generate_vendor(company_code, effective_date);
339            sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
340            count += 1;
341        }
342
343        // Generate customers
344        if control.is_cancelled() {
345            return Ok(count);
346        }
347
348        info!("Generating customers");
349        let mut customer_gen = CustomerGenerator::new(seed + 1);
350        for _ in 0..md_config.customers.count {
351            if control.is_cancelled() {
352                break;
353            }
354            let customer = customer_gen.generate_customer(company_code, effective_date);
355            sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
356                customer,
357            ))))?;
358            count += 1;
359        }
360
361        // Generate employees
362        if control.is_cancelled() {
363            return Ok(count);
364        }
365
366        info!("Generating employees");
367        let mut employee_gen = EmployeeGenerator::new(seed + 4);
368        // Use first department from config
369        let dept = datasynth_generators::DepartmentDefinition {
370            code: "1000".to_string(),
371            name: "General".to_string(),
372            cost_center: "CC1000".to_string(),
373            headcount: 10,
374            system_roles: vec![],
375            transaction_codes: vec![],
376        };
377        for _ in 0..md_config.employees.count {
378            if control.is_cancelled() {
379                break;
380            }
381            let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
382            sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
383                employee,
384            ))))?;
385            count += 1;
386        }
387
388        Ok(count)
389    }
390
391    /// Generates journal entries phase.
392    ///
393    /// Note: This is a simplified version that generates basic journal entries.
394    /// For full-featured generation with all options, use EnhancedOrchestrator.
395    fn generate_journal_entries_phase(
396        config: &GeneratorConfig,
397        sender: &StreamSender<GeneratedItem>,
398        control: &Arc<StreamControl>,
399        progress_interval: u64,
400        progress: &mut StreamProgress,
401    ) -> SynthResult<u64> {
402        use datasynth_generators::{ChartOfAccountsGenerator, JournalEntryGenerator};
403        use std::sync::Arc;
404
405        let mut count: u64 = 0;
406        let seed = config.global.seed.unwrap_or(42);
407
408        // Calculate total entries to generate based on volume weights
409        let default_monthly = 500;
410        let total_entries: usize = config
411            .companies
412            .iter()
413            .map(|c| {
414                let monthly = (c.volume_weight * default_monthly as f64) as usize;
415                monthly.max(100) * config.global.period_months as usize
416            })
417            .sum();
418
419        progress.items_remaining = Some(total_entries as u64);
420        info!("Generating {} journal entries", total_entries);
421
422        // Generate a shared CoA for all companies
423        let complexity = config.chart_of_accounts.complexity;
424        let industry = config.global.industry;
425        let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
426        let coa = Arc::new(coa_gen.generate());
427
428        // Parse start date
429        let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
430            .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).unwrap());
431        let end_date =
432            start_date + chrono::Duration::days((config.global.period_months as i64) * 30);
433
434        // Create JE generator from config
435        let mut je_gen = JournalEntryGenerator::from_generator_config(
436            config,
437            Arc::clone(&coa),
438            start_date,
439            end_date,
440            seed,
441        );
442
443        for _ in 0..total_entries {
444            if control.is_cancelled() {
445                break;
446            }
447
448            // Handle pause
449            while control.is_paused() {
450                std::thread::sleep(std::time::Duration::from_millis(100));
451                if control.is_cancelled() {
452                    break;
453                }
454            }
455
456            let je = je_gen.generate();
457            sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
458            count += 1;
459
460            // Send progress updates
461            if count % progress_interval == 0 {
462                progress.items_generated = count;
463                progress.items_remaining = Some(total_entries as u64 - count);
464                sender.send(StreamEvent::Progress(progress.clone()))?;
465            }
466        }
467
468        Ok(count)
469    }
470
471    /// Returns the orchestrator configuration stats.
472    pub fn stats(&self) -> StreamingOrchestratorStats {
473        StreamingOrchestratorStats {
474            phases: self.config.phases.len(),
475            buffer_size: self.config.stream_config.buffer_size,
476            backpressure: self.config.stream_config.backpressure,
477        }
478    }
479}
480
481/// Statistics for the streaming orchestrator.
482#[derive(Debug, Clone)]
483pub struct StreamingOrchestratorStats {
484    /// Number of phases configured.
485    pub phases: usize,
486    /// Buffer size.
487    pub buffer_size: usize,
488    /// Backpressure strategy.
489    pub backpressure: BackpressureStrategy,
490}
491
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    /// Shared preset for these tests: retail industry, small CoA, TenK volume.
    fn create_test_config() -> GeneratorConfig {
        create_preset(
            IndustrySector::Retail,
            2,
            3,
            CoAComplexity::Small,
            TransactionVolume::TenK,
        )
    }

    #[test]
    fn test_streaming_orchestrator_creation() {
        let stats = StreamingOrchestrator::from_generator_config(create_test_config()).stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    #[test]
    fn test_streaming_generation() {
        let mut config = create_test_config();
        // Keep the run tiny.
        config.master_data.vendors.count = 5;
        config.master_data.customers.count = 5;
        config.master_data.employees.count = 5;
        config.global.period_months = 1;

        let stream_settings = StreamConfig {
            buffer_size: 100,
            progress_interval: 10,
            ..Default::default()
        };
        let phases = vec![GenerationPhase::ChartOfAccounts, GenerationPhase::MasterData];
        let orchestrator = StreamingOrchestrator::new(
            StreamingOrchestratorConfig::new(config)
                .with_phases(phases)
                .with_stream_config(stream_settings),
        );
        let (receiver, _control) = orchestrator.stream().unwrap();

        let mut data_items = 0;
        let mut saw_coa = false;
        let mut saw_complete = false;

        for event in receiver {
            match event {
                StreamEvent::Complete(_) => {
                    saw_complete = true;
                    break;
                }
                StreamEvent::Data(item) => {
                    data_items += 1;
                    saw_coa |= matches!(item, GeneratedItem::ChartOfAccounts(_));
                }
                _ => {}
            }
        }

        assert!(data_items > 0);
        assert!(saw_coa);
        assert!(saw_complete);
    }

    #[test]
    fn test_stream_cancellation() {
        let mut config = create_test_config();
        // Long enough that cancellation happens mid-stream.
        config.global.period_months = 12;

        let orchestrator = StreamingOrchestrator::new(
            StreamingOrchestratorConfig::new(config)
                .with_phases(vec![GenerationPhase::JournalEntries]),
        );
        let (receiver, control) = orchestrator.stream().unwrap();

        // Cancel once ten data items have arrived.
        let mut seen = 0;
        for event in receiver {
            if !matches!(event, StreamEvent::Data(_)) {
                continue;
            }
            seen += 1;
            if seen >= 10 {
                control.cancel();
                break;
            }
        }

        assert!(control.is_cancelled());
    }

    #[test]
    fn test_generated_item_type_name() {
        let chart = ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        );
        let coa_item = GeneratedItem::ChartOfAccounts(Box::new(chart));
        assert_eq!(coa_item.type_name(), "chart_of_accounts");

        let progress_item = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress_item.type_name(), "progress");
    }
}