1use std::sync::Arc;
7use std::thread;
8use std::time::Instant;
9
10use chrono::NaiveDate;
11use tracing::{debug, info, warn};
12
13use datasynth_config::schema::GeneratorConfig;
14use datasynth_core::error::SynthResult;
15use datasynth_core::models::{ChartOfAccounts, Customer, Employee, JournalEntry, Material, Vendor};
16use datasynth_core::streaming::{stream_channel, StreamReceiver, StreamSender};
17use datasynth_core::traits::{
18 BackpressureStrategy, StreamConfig, StreamControl, StreamEvent, StreamProgress, StreamSummary,
19};
20
21#[derive(Debug, Clone)]
23pub enum GeneratedItem {
24 ChartOfAccounts(Box<ChartOfAccounts>),
26 Vendor(Box<Vendor>),
28 Customer(Box<Customer>),
30 Material(Box<Material>),
32 Employee(Box<Employee>),
34 JournalEntry(Box<JournalEntry>),
36 Progress(StreamProgress),
38 PhaseComplete(String),
40}
41
42impl GeneratedItem {
43 pub fn type_name(&self) -> &'static str {
45 match self {
46 GeneratedItem::ChartOfAccounts(_) => "chart_of_accounts",
47 GeneratedItem::Vendor(_) => "vendor",
48 GeneratedItem::Customer(_) => "customer",
49 GeneratedItem::Material(_) => "material",
50 GeneratedItem::Employee(_) => "employee",
51 GeneratedItem::JournalEntry(_) => "journal_entry",
52 GeneratedItem::Progress(_) => "progress",
53 GeneratedItem::PhaseComplete(_) => "phase_complete",
54 }
55 }
56}
57
/// A stage of the generation pipeline.
///
/// The streaming orchestrator runs phases in the order listed in its
/// configuration. Only `ChartOfAccounts`, `MasterData` and `JournalEntries`
/// currently produce data when streaming; the remaining phases are skipped.
///
/// `Hash` is derived alongside `Eq` so phases can be used as set or map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum GenerationPhase {
    /// Generate the chart of accounts.
    ChartOfAccounts,
    /// Generate vendors, customers and employees.
    MasterData,
    /// Document flows (not yet streamed).
    DocumentFlows,
    /// Generate journal entries.
    JournalEntries,
    /// Anomaly injection (not yet streamed).
    AnomalyInjection,
    /// Balance validation (not yet streamed).
    BalanceValidation,
    /// Data-quality processing (not yet streamed).
    DataQuality,
    /// Terminal marker phase.
    Complete,
}

impl GenerationPhase {
    /// Returns the phase's snake_case identifier.
    ///
    /// This string is used in progress events, in `PhaseComplete` markers and
    /// in the stream summary.
    pub fn name(&self) -> &'static str {
        match self {
            GenerationPhase::ChartOfAccounts => "chart_of_accounts",
            GenerationPhase::MasterData => "master_data",
            GenerationPhase::DocumentFlows => "document_flows",
            GenerationPhase::JournalEntries => "journal_entries",
            GenerationPhase::AnomalyInjection => "anomaly_injection",
            GenerationPhase::BalanceValidation => "balance_validation",
            GenerationPhase::DataQuality => "data_quality",
            GenerationPhase::Complete => "complete",
        }
    }
}
94
95#[derive(Debug, Clone)]
97pub struct StreamingOrchestratorConfig {
98 pub generator_config: GeneratorConfig,
100 pub stream_config: StreamConfig,
102 pub phases: Vec<GenerationPhase>,
104}
105
106impl StreamingOrchestratorConfig {
107 pub fn new(generator_config: GeneratorConfig) -> Self {
109 Self {
110 generator_config,
111 stream_config: StreamConfig::default(),
112 phases: vec![
113 GenerationPhase::ChartOfAccounts,
114 GenerationPhase::MasterData,
115 GenerationPhase::JournalEntries,
116 ],
117 }
118 }
119
120 pub fn with_stream_config(mut self, config: StreamConfig) -> Self {
122 self.stream_config = config;
123 self
124 }
125
126 pub fn with_phases(mut self, phases: Vec<GenerationPhase>) -> Self {
128 self.phases = phases;
129 self
130 }
131}
132
133pub struct StreamingOrchestrator {
135 config: StreamingOrchestratorConfig,
136}
137
138impl StreamingOrchestrator {
139 pub fn new(config: StreamingOrchestratorConfig) -> Self {
141 Self { config }
142 }
143
144 pub fn from_generator_config(config: GeneratorConfig) -> Self {
146 Self::new(StreamingOrchestratorConfig::new(config))
147 }
148
149 pub fn stream(&self) -> SynthResult<(StreamReceiver<GeneratedItem>, Arc<StreamControl>)> {
153 let (sender, receiver) = stream_channel(
154 self.config.stream_config.buffer_size,
155 self.config.stream_config.backpressure,
156 );
157
158 let control = Arc::new(StreamControl::new());
159 let control_clone = Arc::clone(&control);
160
161 let config = self.config.clone();
162
163 thread::spawn(move || {
165 let result = Self::run_generation(config, sender, control_clone);
166 if let Err(e) = result {
167 warn!("Streaming generation error: {}", e);
168 }
169 });
170
171 Ok((receiver, control))
172 }
173
174 fn run_generation(
176 config: StreamingOrchestratorConfig,
177 sender: StreamSender<GeneratedItem>,
178 control: Arc<StreamControl>,
179 ) -> SynthResult<()> {
180 let start_time = Instant::now();
181 let mut items_generated: u64 = 0;
182 let mut phases_completed = Vec::new();
183
184 let progress_interval = config.stream_config.progress_interval;
186
187 let mut progress = StreamProgress::new("initializing");
189 sender.send(StreamEvent::Progress(progress.clone()))?;
190
191 for phase in &config.phases {
192 if control.is_cancelled() {
193 info!("Generation cancelled");
194 break;
195 }
196
197 while control.is_paused() {
199 std::thread::sleep(std::time::Duration::from_millis(100));
200 if control.is_cancelled() {
201 break;
202 }
203 }
204
205 progress.phase = phase.name().to_string();
206 sender.send(StreamEvent::Progress(progress.clone()))?;
207
208 match phase {
209 GenerationPhase::ChartOfAccounts => {
210 let result =
211 Self::generate_coa_phase(&config.generator_config, &sender, &control)?;
212 items_generated += result;
213 }
214 GenerationPhase::MasterData => {
215 let result = Self::generate_master_data_phase(
216 &config.generator_config,
217 &sender,
218 &control,
219 )?;
220 items_generated += result;
221 }
222 GenerationPhase::JournalEntries => {
223 let result = Self::generate_journal_entries_phase(
224 &config.generator_config,
225 &sender,
226 &control,
227 progress_interval,
228 &mut progress,
229 )?;
230 items_generated += result;
231 }
232 _ => {
233 debug!(
235 "Skipping phase {:?} (not yet implemented for streaming)",
236 phase
237 );
238 }
239 }
240
241 sender.send(StreamEvent::Data(GeneratedItem::PhaseComplete(
243 phase.name().to_string(),
244 )))?;
245 phases_completed.push(phase.name().to_string());
246
247 progress.items_generated = items_generated;
249 progress.elapsed_ms = start_time.elapsed().as_millis() as u64;
250 if progress.elapsed_ms > 0 {
251 progress.items_per_second =
252 (items_generated as f64) / (progress.elapsed_ms as f64 / 1000.0);
253 }
254 sender.send(StreamEvent::Progress(progress.clone()))?;
255 }
256
257 let stats = sender.stats();
259 let summary = StreamSummary {
260 total_items: items_generated,
261 total_time_ms: start_time.elapsed().as_millis() as u64,
262 avg_items_per_second: if start_time.elapsed().as_millis() > 0 {
263 (items_generated as f64) / (start_time.elapsed().as_millis() as f64 / 1000.0)
264 } else {
265 0.0
266 },
267 error_count: 0,
268 dropped_count: stats.items_dropped,
269 peak_memory_mb: None,
270 phases_completed,
271 };
272
273 sender.send(StreamEvent::Complete(summary))?;
274 sender.close();
275
276 Ok(())
277 }
278
279 fn generate_coa_phase(
281 config: &GeneratorConfig,
282 sender: &StreamSender<GeneratedItem>,
283 control: &Arc<StreamControl>,
284 ) -> SynthResult<u64> {
285 use datasynth_generators::ChartOfAccountsGenerator;
286
287 if control.is_cancelled() {
288 return Ok(0);
289 }
290
291 info!("Generating Chart of Accounts");
292 let seed = config.global.seed.unwrap_or(42);
293 let complexity = config.chart_of_accounts.complexity;
294 let industry = config.global.industry;
295
296 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
297 let coa = coa_gen.generate();
298
299 let account_count = coa.account_count() as u64;
300 sender.send(StreamEvent::Data(GeneratedItem::ChartOfAccounts(Box::new(
301 coa,
302 ))))?;
303
304 Ok(account_count)
305 }
306
307 fn generate_master_data_phase(
309 config: &GeneratorConfig,
310 sender: &StreamSender<GeneratedItem>,
311 control: &Arc<StreamControl>,
312 ) -> SynthResult<u64> {
313 use datasynth_generators::{CustomerGenerator, EmployeeGenerator, VendorGenerator};
314
315 let mut count: u64 = 0;
316 let seed = config.global.seed.unwrap_or(42);
317 let md_config = &config.master_data;
318 let effective_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
319 .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).unwrap());
320
321 let company_code = config
322 .companies
323 .first()
324 .map(|c| c.code.as_str())
325 .unwrap_or("1000");
326
327 if control.is_cancelled() {
329 return Ok(count);
330 }
331
332 info!("Generating vendors");
333 let mut vendor_gen = VendorGenerator::new(seed);
334 for _ in 0..md_config.vendors.count {
335 if control.is_cancelled() {
336 break;
337 }
338 let vendor = vendor_gen.generate_vendor(company_code, effective_date);
339 sender.send(StreamEvent::Data(GeneratedItem::Vendor(Box::new(vendor))))?;
340 count += 1;
341 }
342
343 if control.is_cancelled() {
345 return Ok(count);
346 }
347
348 info!("Generating customers");
349 let mut customer_gen = CustomerGenerator::new(seed + 1);
350 for _ in 0..md_config.customers.count {
351 if control.is_cancelled() {
352 break;
353 }
354 let customer = customer_gen.generate_customer(company_code, effective_date);
355 sender.send(StreamEvent::Data(GeneratedItem::Customer(Box::new(
356 customer,
357 ))))?;
358 count += 1;
359 }
360
361 if control.is_cancelled() {
363 return Ok(count);
364 }
365
366 info!("Generating employees");
367 let mut employee_gen = EmployeeGenerator::new(seed + 4);
368 let dept = datasynth_generators::DepartmentDefinition {
370 code: "1000".to_string(),
371 name: "General".to_string(),
372 cost_center: "CC1000".to_string(),
373 headcount: 10,
374 system_roles: vec![],
375 transaction_codes: vec![],
376 };
377 for _ in 0..md_config.employees.count {
378 if control.is_cancelled() {
379 break;
380 }
381 let employee = employee_gen.generate_employee(company_code, &dept, effective_date);
382 sender.send(StreamEvent::Data(GeneratedItem::Employee(Box::new(
383 employee,
384 ))))?;
385 count += 1;
386 }
387
388 Ok(count)
389 }
390
391 fn generate_journal_entries_phase(
396 config: &GeneratorConfig,
397 sender: &StreamSender<GeneratedItem>,
398 control: &Arc<StreamControl>,
399 progress_interval: u64,
400 progress: &mut StreamProgress,
401 ) -> SynthResult<u64> {
402 use datasynth_generators::{ChartOfAccountsGenerator, JournalEntryGenerator};
403 use std::sync::Arc;
404
405 let mut count: u64 = 0;
406 let seed = config.global.seed.unwrap_or(42);
407
408 let default_monthly = 500;
410 let total_entries: usize = config
411 .companies
412 .iter()
413 .map(|c| {
414 let monthly = (c.volume_weight * default_monthly as f64) as usize;
415 monthly.max(100) * config.global.period_months as usize
416 })
417 .sum();
418
419 progress.items_remaining = Some(total_entries as u64);
420 info!("Generating {} journal entries", total_entries);
421
422 let complexity = config.chart_of_accounts.complexity;
424 let industry = config.global.industry;
425 let mut coa_gen = ChartOfAccountsGenerator::new(complexity, industry, seed);
426 let coa = Arc::new(coa_gen.generate());
427
428 let start_date = NaiveDate::parse_from_str(&config.global.start_date, "%Y-%m-%d")
430 .unwrap_or_else(|_| NaiveDate::from_ymd_opt(2024, 1, 1).unwrap());
431 let end_date =
432 start_date + chrono::Duration::days((config.global.period_months as i64) * 30);
433
434 let mut je_gen = JournalEntryGenerator::from_generator_config(
436 config,
437 Arc::clone(&coa),
438 start_date,
439 end_date,
440 seed,
441 );
442
443 for _ in 0..total_entries {
444 if control.is_cancelled() {
445 break;
446 }
447
448 while control.is_paused() {
450 std::thread::sleep(std::time::Duration::from_millis(100));
451 if control.is_cancelled() {
452 break;
453 }
454 }
455
456 let je = je_gen.generate();
457 sender.send(StreamEvent::Data(GeneratedItem::JournalEntry(Box::new(je))))?;
458 count += 1;
459
460 if count % progress_interval == 0 {
462 progress.items_generated = count;
463 progress.items_remaining = Some(total_entries as u64 - count);
464 sender.send(StreamEvent::Progress(progress.clone()))?;
465 }
466 }
467
468 Ok(count)
469 }
470
471 pub fn stats(&self) -> StreamingOrchestratorStats {
473 StreamingOrchestratorStats {
474 phases: self.config.phases.len(),
475 buffer_size: self.config.stream_config.buffer_size,
476 backpressure: self.config.stream_config.backpressure,
477 }
478 }
479}
480
481#[derive(Debug, Clone)]
483pub struct StreamingOrchestratorStats {
484 pub phases: usize,
486 pub buffer_size: usize,
488 pub backpressure: BackpressureStrategy,
490}
491
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_config::presets::create_preset;
    use datasynth_config::schema::TransactionVolume;
    use datasynth_core::models::{CoAComplexity, IndustrySector};

    /// Small retail preset shared by the tests below.
    fn create_test_config() -> GeneratorConfig {
        create_preset(
            IndustrySector::Retail,
            2,
            3,
            CoAComplexity::Small,
            TransactionVolume::TenK,
        )
    }

    #[test]
    fn test_streaming_orchestrator_creation() {
        let config = create_test_config();
        let orchestrator = StreamingOrchestrator::from_generator_config(config);
        let stats = orchestrator.stats();

        assert!(stats.phases > 0);
        assert!(stats.buffer_size > 0);
    }

    #[test]
    fn test_streaming_generation() {
        let mut config = create_test_config();
        config.master_data.vendors.count = 5;
        config.master_data.customers.count = 5;
        config.master_data.employees.count = 5;
        config.global.period_months = 1;

        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![
                GenerationPhase::ChartOfAccounts,
                GenerationPhase::MasterData,
            ])
            .with_stream_config(StreamConfig {
                buffer_size: 100,
                progress_interval: 10,
                ..Default::default()
            });

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, _control) = orchestrator.stream().unwrap();

        let (mut coa, mut vendors, mut customers, mut employees) = (0, 0, 0, 0);
        let mut phase_markers = Vec::new();
        let mut summary = None;

        for event in receiver {
            match event {
                StreamEvent::Data(GeneratedItem::ChartOfAccounts(_)) => coa += 1,
                StreamEvent::Data(GeneratedItem::Vendor(_)) => vendors += 1,
                StreamEvent::Data(GeneratedItem::Customer(_)) => customers += 1,
                StreamEvent::Data(GeneratedItem::Employee(_)) => employees += 1,
                StreamEvent::Data(GeneratedItem::PhaseComplete(name)) => phase_markers.push(name),
                StreamEvent::Complete(s) => {
                    summary = Some(s);
                    break;
                }
                _ => {}
            }
        }

        // Exact counts pin the configured master-data volumes, not just "something arrived".
        assert_eq!(coa, 1);
        assert_eq!(vendors, 5);
        assert_eq!(customers, 5);
        assert_eq!(employees, 5);
        assert_eq!(phase_markers, ["chart_of_accounts", "master_data"]);

        let summary = summary.expect("stream should end with a Complete event");
        assert_eq!(summary.phases_completed, ["chart_of_accounts", "master_data"]);
    }

    #[test]
    fn test_stream_cancellation() {
        let mut config = create_test_config();
        config.global.period_months = 12;
        let streaming_config = StreamingOrchestratorConfig::new(config)
            .with_phases(vec![GenerationPhase::JournalEntries]);

        let orchestrator = StreamingOrchestrator::new(streaming_config);
        let (receiver, control) = orchestrator.stream().unwrap();

        let mut items_count = 0;
        for event in receiver {
            if let StreamEvent::Data(_) = event {
                items_count += 1;
                if items_count >= 10 {
                    control.cancel();
                    break;
                }
            }
        }

        // Reaching exactly 10 proves the stream was live and the early exit came
        // from cancel(), not from the channel closing first.
        assert_eq!(items_count, 10);
        assert!(control.is_cancelled());
    }

    #[test]
    fn test_generated_item_type_name() {
        let coa = GeneratedItem::ChartOfAccounts(Box::new(ChartOfAccounts::new(
            "TEST_COA".to_string(),
            "Test Chart of Accounts".to_string(),
            "US".to_string(),
            IndustrySector::Manufacturing,
            CoAComplexity::Small,
        )));
        assert_eq!(coa.type_name(), "chart_of_accounts");

        let progress = GeneratedItem::Progress(StreamProgress::new("test"));
        assert_eq!(progress.type_name(), "progress");

        let marker = GeneratedItem::PhaseComplete("master_data".to_string());
        assert_eq!(marker.type_name(), "phase_complete");
    }

    #[test]
    fn test_generation_phase_name() {
        assert_eq!(GenerationPhase::ChartOfAccounts.name(), "chart_of_accounts");
        assert_eq!(GenerationPhase::MasterData.name(), "master_data");
        assert_eq!(GenerationPhase::JournalEntries.name(), "journal_entries");
        assert_eq!(GenerationPhase::Complete.name(), "complete");
    }
}