1use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17 ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18 TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
26pub struct ServerState {
28 pub config: RwLock<GeneratorConfig>,
30 start_time: Instant,
32 pub total_entries: AtomicU64,
34 pub total_anomalies: AtomicU64,
36 pub active_streams: AtomicU64,
38 pub total_stream_events: AtomicU64,
40 pub stream_paused: AtomicBool,
42 pub stream_stopped: AtomicBool,
44 pub triggered_pattern: RwLock<Option<String>>,
46 pub resource_guard: Arc<ResourceGuard>,
48 max_concurrent_generations: AtomicU64,
50}
51
52impl ServerState {
53 pub fn new(config: GeneratorConfig) -> Self {
54 let memory_limit = config.global.memory_limit_mb;
56 let resource_guard = if memory_limit > 0 {
57 ResourceGuardBuilder::new()
58 .memory_limit(memory_limit)
59 .conservative()
60 .build()
61 } else {
62 ResourceGuardBuilder::new()
64 .memory_limit(2048)
65 .conservative()
66 .build()
67 };
68
69 Self {
70 config: RwLock::new(config),
71 start_time: Instant::now(),
72 total_entries: AtomicU64::new(0),
73 total_anomalies: AtomicU64::new(0),
74 active_streams: AtomicU64::new(0),
75 total_stream_events: AtomicU64::new(0),
76 stream_paused: AtomicBool::new(false),
77 stream_stopped: AtomicBool::new(false),
78 triggered_pattern: RwLock::new(None),
79 resource_guard: Arc::new(resource_guard),
80 max_concurrent_generations: AtomicU64::new(4),
81 }
82 }
83
84 pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
86 let resource_guard = ResourceGuardBuilder::new()
87 .memory_limit(memory_limit_mb)
88 .conservative()
89 .build();
90
91 Self {
92 config: RwLock::new(config),
93 start_time: Instant::now(),
94 total_entries: AtomicU64::new(0),
95 total_anomalies: AtomicU64::new(0),
96 active_streams: AtomicU64::new(0),
97 total_stream_events: AtomicU64::new(0),
98 stream_paused: AtomicBool::new(false),
99 stream_stopped: AtomicBool::new(false),
100 triggered_pattern: RwLock::new(None),
101 resource_guard: Arc::new(resource_guard),
102 max_concurrent_generations: AtomicU64::new(4),
103 }
104 }
105
106 pub fn uptime_seconds(&self) -> u64 {
107 self.start_time.elapsed().as_secs()
108 }
109
110 #[allow(clippy::result_large_err)] pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
113 let active = self.active_streams.load(Ordering::Relaxed);
115 let max = self.max_concurrent_generations.load(Ordering::Relaxed);
116 if active >= max {
117 return Err(Status::resource_exhausted(format!(
118 "Too many concurrent generations ({}/{}). Try again later.",
119 active, max
120 )));
121 }
122
123 match self.resource_guard.check() {
125 Ok(level) => {
126 if level == DegradationLevel::Emergency {
127 Err(Status::resource_exhausted(
128 "Server resources critically low. Generation not possible.",
129 ))
130 } else if level == DegradationLevel::Minimal {
131 warn!("Resources constrained, generation may be limited");
132 Ok(level)
133 } else {
134 Ok(level)
135 }
136 }
137 Err(e) => Err(Status::resource_exhausted(format!(
138 "Resource check failed: {}",
139 e
140 ))),
141 }
142 }
143
144 pub fn resource_status(&self) -> ResourceStatus {
146 let stats = self.resource_guard.stats();
147 ResourceStatus {
148 memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
149 memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
150 disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
151 degradation_level: stats.degradation_level.name().to_string(),
152 active_generations: self.active_streams.load(Ordering::Relaxed),
153 }
154 }
155}
156
157#[derive(Debug, Clone)]
159pub struct ResourceStatus {
160 pub memory_usage_mb: u64,
161 pub memory_peak_mb: u64,
162 pub disk_available_mb: u64,
163 pub degradation_level: String,
164 pub active_generations: u64,
165}
166
167pub struct SynthService {
169 pub state: Arc<ServerState>,
170}
171
172impl SynthService {
173 pub fn new(config: GeneratorConfig) -> Self {
174 Self {
175 state: Arc::new(ServerState::new(config)),
176 }
177 }
178
179 pub fn with_state(state: Arc<ServerState>) -> Self {
180 Self { state }
181 }
182
183 async fn proto_to_config(
185 &self,
186 proto: Option<GenerationConfig>,
187 ) -> Result<GeneratorConfig, Status> {
188 match proto {
189 Some(p) => {
190 let industry = match p.industry.to_lowercase().as_str() {
191 "manufacturing" => IndustrySector::Manufacturing,
192 "retail" => IndustrySector::Retail,
193 "financial_services" | "financial" => IndustrySector::FinancialServices,
194 "healthcare" => IndustrySector::Healthcare,
195 "technology" => IndustrySector::Technology,
196 _ => IndustrySector::Manufacturing,
197 };
198
199 let complexity = match p.coa_complexity.to_lowercase().as_str() {
200 "small" => CoAComplexity::Small,
201 "medium" => CoAComplexity::Medium,
202 "large" => CoAComplexity::Large,
203 _ => CoAComplexity::Small,
204 };
205
206 let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
207 vec![CompanyConfig {
208 code: "1000".to_string(),
209 name: "Default Company".to_string(),
210 currency: "USD".to_string(),
211 country: "US".to_string(),
212 annual_transaction_volume: TransactionVolume::TenK,
213 volume_weight: 1.0,
214 fiscal_year_variant: "K4".to_string(),
215 }]
216 } else {
217 p.companies
218 .into_iter()
219 .map(|c| CompanyConfig {
220 code: c.code,
221 name: c.name,
222 currency: c.currency,
223 country: c.country,
224 annual_transaction_volume: TransactionVolume::Custom(
225 c.annual_transaction_volume,
226 ),
227 volume_weight: c.volume_weight as f64,
228 fiscal_year_variant: "K4".to_string(),
229 })
230 .collect()
231 };
232
233 let mut config = GeneratorConfig {
234 global: GlobalConfig {
235 seed: if p.seed > 0 { Some(p.seed) } else { None },
236 industry,
237 start_date: if p.start_date.is_empty() {
238 "2024-01-01".to_string()
239 } else {
240 p.start_date
241 },
242 period_months: if p.period_months == 0 {
243 12
244 } else {
245 p.period_months
246 },
247 group_currency: "USD".to_string(),
248 parallel: true,
249 worker_threads: 0,
250 memory_limit_mb: 0,
251 },
252 companies,
253 chart_of_accounts: ChartOfAccountsConfig {
254 complexity,
255 industry_specific: true,
256 custom_accounts: None,
257 min_hierarchy_depth: 2,
258 max_hierarchy_depth: 5,
259 },
260 ..default_generator_config()
261 };
262
263 if p.fraud_enabled {
265 config.fraud.enabled = true;
266 config.fraud.fraud_rate = p.fraud_rate as f64;
267 }
268
269 Ok(config)
270 }
271 None => {
272 let config = self.state.config.read().await;
274 Ok(config.clone())
275 }
276 }
277 }
278
279 fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
281 JournalEntryProto {
282 document_id: entry.header.document_id.to_string(),
283 company_code: entry.header.company_code.clone(),
284 fiscal_year: entry.header.fiscal_year as u32,
285 fiscal_period: entry.header.fiscal_period as u32,
286 posting_date: entry.header.posting_date.to_string(),
287 document_date: entry.header.document_date.to_string(),
288 created_at: entry.header.created_at.to_rfc3339(),
289 source: format!("{:?}", entry.header.source),
290 business_process: entry.header.business_process.map(|bp| format!("{:?}", bp)),
291 lines: entry
292 .lines
293 .iter()
294 .map(|line| {
295 let amount = if line.is_debit() {
296 line.debit_amount
297 } else {
298 line.credit_amount
299 };
300 JournalLineProto {
301 line_number: line.line_number,
302 account_number: line.gl_account.clone(),
303 account_name: line.account_description.clone().unwrap_or_default(),
304 amount: amount.to_string(),
305 is_debit: line.is_debit(),
306 cost_center: line.cost_center.clone(),
307 profit_center: line.profit_center.clone(),
308 vendor_id: None,
309 customer_id: None,
310 material_id: None,
311 text: None,
312 }
313 })
314 .collect(),
315 is_anomaly: entry.header.is_fraud,
316 anomaly_type: entry.header.fraud_type.map(|ft| format!("{:?}", ft)),
317 }
318 }
319
320 fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
322 GenerationConfig {
323 industry: format!("{:?}", config.global.industry),
324 start_date: config.global.start_date.clone(),
325 period_months: config.global.period_months,
326 seed: config.global.seed.unwrap_or(0),
327 coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
328 companies: config
329 .companies
330 .iter()
331 .map(|c| CompanyConfigProto {
332 code: c.code.clone(),
333 name: c.name.clone(),
334 currency: c.currency.clone(),
335 country: c.country.clone(),
336 annual_transaction_volume: c.annual_transaction_volume.count(),
337 volume_weight: c.volume_weight as f32,
338 })
339 .collect(),
340 fraud_enabled: config.fraud.enabled,
341 fraud_rate: config.fraud.fraud_rate as f32,
342 generate_master_data: true,
343 generate_document_flows: true,
344 }
345 }
346}
347
348#[tonic::async_trait]
349impl synthetic_data_service_server::SyntheticDataService for SynthService {
350 async fn bulk_generate(
352 &self,
353 request: Request<BulkGenerateRequest>,
354 ) -> Result<Response<BulkGenerateResponse>, Status> {
355 let req = request.into_inner();
356
357 const MAX_ENTRY_COUNT: u64 = 1_000_000;
359 if req.entry_count > MAX_ENTRY_COUNT {
360 return Err(Status::invalid_argument(format!(
361 "entry_count ({}) exceeds maximum allowed value ({})",
362 req.entry_count, MAX_ENTRY_COUNT
363 )));
364 }
365
366 let degradation_level = self.state.check_resources()?;
368 if degradation_level != DegradationLevel::Normal {
369 warn!(
370 "Starting bulk generation under resource pressure (level: {:?})",
371 degradation_level
372 );
373 }
374
375 info!("Bulk generate request: {} entries", req.entry_count);
376
377 let config = self.proto_to_config(req.config).await?;
378 let start_time = Instant::now();
379
380 let phase_config = PhaseConfig {
382 generate_master_data: req.include_master_data,
383 generate_document_flows: false,
384 generate_journal_entries: true,
385 inject_anomalies: req.inject_anomalies,
386 show_progress: false,
387 ..Default::default()
388 };
389
390 let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
391 .map_err(|e| Status::internal(format!("Failed to create orchestrator: {}", e)))?;
392
393 let result = orchestrator
394 .generate()
395 .map_err(|e| Status::internal(format!("Generation failed: {}", e)))?;
396
397 let duration_ms = start_time.elapsed().as_millis() as u64;
398
399 let entries_count = result.journal_entries.len() as u64;
401 self.state
402 .total_entries
403 .fetch_add(entries_count, Ordering::Relaxed);
404
405 let anomaly_count = result.anomaly_labels.labels.len() as u64;
406 self.state
407 .total_anomalies
408 .fetch_add(anomaly_count, Ordering::Relaxed);
409
410 let journal_entries: Vec<JournalEntryProto> = result
412 .journal_entries
413 .iter()
414 .map(Self::journal_entry_to_proto)
415 .collect();
416
417 let anomaly_labels: Vec<AnomalyLabelProto> = result
418 .anomaly_labels
419 .labels
420 .iter()
421 .map(|a| AnomalyLabelProto {
422 anomaly_id: a.anomaly_id.clone(),
423 document_id: a.document_id.clone(),
424 anomaly_type: format!("{:?}", a.anomaly_type),
425 anomaly_category: a.document_type.clone(),
426 description: a.description.clone(),
427 severity_score: a.severity as f32,
428 })
429 .collect();
430
431 let mut total_debit = rust_decimal::Decimal::ZERO;
433 let mut total_credit = rust_decimal::Decimal::ZERO;
434 let mut total_lines = 0u64;
435 let mut entries_by_company = std::collections::HashMap::new();
436 let mut entries_by_source = std::collections::HashMap::new();
437
438 for entry in &result.journal_entries {
439 *entries_by_company
440 .entry(entry.header.company_code.clone())
441 .or_insert(0u64) += 1;
442 *entries_by_source
443 .entry(format!("{:?}", entry.header.source))
444 .or_insert(0u64) += 1;
445
446 for line in &entry.lines {
447 total_lines += 1;
448 total_debit += line.debit_amount;
449 total_credit += line.credit_amount;
450 }
451 }
452
453 let stats = GenerationStats {
454 total_entries: entries_count,
455 total_lines,
456 total_debit_amount: total_debit.to_string(),
457 total_credit_amount: total_credit.to_string(),
458 anomaly_count,
459 entries_by_company,
460 entries_by_source,
461 };
462
463 info!(
464 "Bulk generation complete: {} entries in {}ms",
465 entries_count, duration_ms
466 );
467
468 Ok(Response::new(BulkGenerateResponse {
469 entries_generated: entries_count,
470 duration_ms,
471 journal_entries,
472 anomaly_labels,
473 stats: Some(stats),
474 }))
475 }
476
477 type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
478
479 async fn stream_data(
481 &self,
482 request: Request<StreamDataRequest>,
483 ) -> Result<Response<Self::StreamDataStream>, Status> {
484 let req = request.into_inner();
485
486 const MIN_EVENTS_PER_SECOND: u32 = 1;
488 const MAX_EVENTS_PER_SECOND: u32 = 10_000;
489 if req.events_per_second < MIN_EVENTS_PER_SECOND {
490 return Err(Status::invalid_argument(format!(
491 "events_per_second ({}) must be at least {}",
492 req.events_per_second, MIN_EVENTS_PER_SECOND
493 )));
494 }
495 if req.events_per_second > MAX_EVENTS_PER_SECOND {
496 return Err(Status::invalid_argument(format!(
497 "events_per_second ({}) exceeds maximum allowed value ({})",
498 req.events_per_second, MAX_EVENTS_PER_SECOND
499 )));
500 }
501
502 const MAX_STREAM_EVENTS: u64 = 10_000_000;
504 if req.max_events > MAX_STREAM_EVENTS {
505 return Err(Status::invalid_argument(format!(
506 "max_events ({}) exceeds maximum allowed value ({})",
507 req.max_events, MAX_STREAM_EVENTS
508 )));
509 }
510
511 let degradation_level = self.state.check_resources()?;
513 if degradation_level != DegradationLevel::Normal {
514 warn!(
515 "Starting stream under resource pressure (level: {:?})",
516 degradation_level
517 );
518 }
519
520 info!(
521 "Stream data request: {} events/sec, max {}",
522 req.events_per_second, req.max_events
523 );
524
525 let config = self.proto_to_config(req.config).await?;
526 let state = self.state.clone();
527
528 state.active_streams.fetch_add(1, Ordering::Relaxed);
530
531 state.stream_paused.store(false, Ordering::Relaxed);
533 state.stream_stopped.store(false, Ordering::Relaxed);
534
535 let (tx, rx) = mpsc::channel(100);
536
537 let events_per_second = req.events_per_second;
539 let max_events = req.max_events;
540 let inject_anomalies = req.inject_anomalies;
541
542 tokio::spawn(async move {
543 let phase_config = PhaseConfig {
544 generate_master_data: false,
545 generate_document_flows: false,
546 generate_journal_entries: true,
547 inject_anomalies,
548 show_progress: false,
549 ..Default::default()
550 };
551
552 let mut sequence = 0u64;
553 let delay = if events_per_second > 0 {
554 Duration::from_micros(1_000_000 / events_per_second as u64)
555 } else {
556 Duration::from_millis(1)
557 };
558
559 loop {
560 if state.stream_stopped.load(Ordering::Relaxed) {
562 info!("Stream stopped by control command");
563 break;
564 }
565
566 while state.stream_paused.load(Ordering::Relaxed) {
568 tokio::time::sleep(Duration::from_millis(100)).await;
569 if state.stream_stopped.load(Ordering::Relaxed) {
570 break;
571 }
572 }
573
574 if max_events > 0 && sequence >= max_events {
576 info!("Stream reached max events: {}", max_events);
577 break;
578 }
579
580 let mut orchestrator =
582 match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
583 Ok(o) => o,
584 Err(e) => {
585 error!("Failed to create orchestrator: {}", e);
586 break;
587 }
588 };
589
590 let result = match orchestrator.generate() {
591 Ok(r) => r,
592 Err(e) => {
593 error!("Generation failed: {}", e);
594 break;
595 }
596 };
597
598 for entry in result.journal_entries {
600 sequence += 1;
601 state.total_stream_events.fetch_add(1, Ordering::Relaxed);
602 state.total_entries.fetch_add(1, Ordering::Relaxed);
603
604 let timestamp = Timestamp {
605 seconds: Utc::now().timestamp(),
606 nanos: 0,
607 };
608
609 let event = DataEvent {
610 sequence,
611 timestamp: Some(timestamp),
612 event: Some(data_event::Event::JournalEntry(
613 SynthService::journal_entry_to_proto(&entry),
614 )),
615 };
616
617 if tx.send(Ok(event)).await.is_err() {
618 info!("Stream receiver dropped");
619 break;
620 }
621
622 tokio::time::sleep(delay).await;
624
625 if max_events > 0 && sequence >= max_events {
627 break;
628 }
629 }
630 }
631
632 state.active_streams.fetch_sub(1, Ordering::Relaxed);
634 });
635
636 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
637 }
638
639 async fn control(
641 &self,
642 request: Request<ControlCommand>,
643 ) -> Result<Response<ControlResponse>, Status> {
644 let cmd = request.into_inner();
645 let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
646
647 info!("Control command: {:?}", action);
648
649 let (success, message, status) = match action {
650 ControlAction::Pause => {
651 self.state.stream_paused.store(true, Ordering::Relaxed);
652 (true, "Stream paused".to_string(), StreamStatus::Paused)
653 }
654 ControlAction::Resume => {
655 self.state.stream_paused.store(false, Ordering::Relaxed);
656 (true, "Stream resumed".to_string(), StreamStatus::Running)
657 }
658 ControlAction::Stop => {
659 self.state.stream_stopped.store(true, Ordering::Relaxed);
660 (true, "Stream stopped".to_string(), StreamStatus::Stopped)
661 }
662 ControlAction::TriggerPattern => {
663 let pattern = cmd.pattern_name.unwrap_or_default();
664 if pattern.is_empty() {
665 (
666 false,
667 "Pattern name is required for TriggerPattern action".to_string(),
668 StreamStatus::Running,
669 )
670 } else {
671 let valid_patterns = [
674 "year_end_spike",
675 "period_end_spike",
676 "holiday_cluster",
677 "fraud_cluster",
678 "error_cluster",
679 "uniform",
680 ];
681 let is_valid = valid_patterns.contains(&pattern.as_str())
682 || pattern.starts_with("custom:");
683
684 if is_valid {
685 if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
687 *triggered = Some(pattern.clone());
688 }
689 info!("Pattern trigger activated: {}", pattern);
690 (
691 true,
692 format!("Pattern '{}' will be applied to upcoming entries", pattern),
693 StreamStatus::Running,
694 )
695 } else {
696 (
697 false,
698 format!(
699 "Unknown pattern '{}'. Valid patterns: {:?}",
700 pattern, valid_patterns
701 ),
702 StreamStatus::Running,
703 )
704 }
705 }
706 }
707 ControlAction::Unspecified => (
708 false,
709 "Unknown control action".to_string(),
710 StreamStatus::Unspecified,
711 ),
712 };
713
714 Ok(Response::new(ControlResponse {
715 success,
716 message,
717 current_status: status as i32,
718 }))
719 }
720
721 async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
723 let config = self.state.config.read().await;
724 let proto_config = Self::config_to_proto(&config);
725
726 Ok(Response::new(ConfigResponse {
727 success: true,
728 message: "Current configuration retrieved".to_string(),
729 current_config: Some(proto_config),
730 }))
731 }
732
733 async fn set_config(
735 &self,
736 request: Request<ConfigRequest>,
737 ) -> Result<Response<ConfigResponse>, Status> {
738 let req = request.into_inner();
739
740 if let Some(proto_config) = req.config {
741 let new_config = self.proto_to_config(Some(proto_config)).await?;
742
743 let mut config = self.state.config.write().await;
744 *config = new_config.clone();
745
746 info!("Configuration updated");
747
748 Ok(Response::new(ConfigResponse {
749 success: true,
750 message: "Configuration updated".to_string(),
751 current_config: Some(Self::config_to_proto(&new_config)),
752 }))
753 } else {
754 Err(Status::invalid_argument("No configuration provided"))
755 }
756 }
757
758 async fn get_metrics(
760 &self,
761 _request: Request<()>,
762 ) -> Result<Response<MetricsResponse>, Status> {
763 let uptime = self.state.uptime_seconds();
764 let total_entries = self.state.total_entries.load(Ordering::Relaxed);
765
766 let entries_per_second = if uptime > 0 {
767 total_entries as f64 / uptime as f64
768 } else {
769 0.0
770 };
771
772 Ok(Response::new(MetricsResponse {
773 total_entries_generated: total_entries,
774 total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
775 uptime_seconds: uptime,
776 session_entries: total_entries,
777 session_entries_per_second: entries_per_second,
778 active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
779 total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
780 }))
781 }
782
783 async fn health_check(
785 &self,
786 _request: Request<()>,
787 ) -> Result<Response<HealthResponse>, Status> {
788 Ok(Response::new(HealthResponse {
789 healthy: true,
790 version: env!("CARGO_PKG_VERSION").to_string(),
791 uptime_seconds: self.state.uptime_seconds(),
792 }))
793 }
794}
795
796pub fn default_generator_config() -> GeneratorConfig {
798 GeneratorConfig {
799 global: GlobalConfig {
800 seed: None,
801 industry: IndustrySector::Manufacturing,
802 start_date: "2024-01-01".to_string(),
803 period_months: 12,
804 group_currency: "USD".to_string(),
805 parallel: true,
806 worker_threads: 0,
807 memory_limit_mb: 0,
808 },
809 companies: vec![CompanyConfig {
810 code: "1000".to_string(),
811 name: "Default Company".to_string(),
812 currency: "USD".to_string(),
813 country: "US".to_string(),
814 annual_transaction_volume: TransactionVolume::TenK,
815 volume_weight: 1.0,
816 fiscal_year_variant: "K4".to_string(),
817 }],
818 chart_of_accounts: ChartOfAccountsConfig {
819 complexity: CoAComplexity::Small,
820 industry_specific: true,
821 custom_accounts: None,
822 min_hierarchy_depth: 2,
823 max_hierarchy_depth: 5,
824 },
825 transactions: Default::default(),
826 output: OutputConfig::default(),
827 fraud: Default::default(),
828 internal_controls: Default::default(),
829 business_processes: Default::default(),
830 user_personas: Default::default(),
831 templates: Default::default(),
832 approval: Default::default(),
833 departments: Default::default(),
834 master_data: Default::default(),
835 document_flows: Default::default(),
836 intercompany: Default::default(),
837 balance: Default::default(),
838 ocpm: Default::default(),
839 audit: Default::default(),
840 banking: Default::default(),
841 data_quality: Default::default(),
842 scenario: Default::default(),
843 temporal: Default::default(),
844 graph_export: Default::default(),
845 streaming: Default::default(),
846 rate_limit: Default::default(),
847 temporal_attributes: Default::default(),
848 relationships: Default::default(),
849 accounting_standards: Default::default(),
850 audit_standards: Default::default(),
851 distributions: Default::default(),
852 temporal_patterns: Default::default(),
853 vendor_network: Default::default(),
854 customer_segmentation: Default::default(),
855 relationship_strength: Default::default(),
856 cross_process_links: Default::default(),
857 organizational_events: Default::default(),
858 behavioral_drift: Default::default(),
859 market_drift: Default::default(),
860 drift_labeling: Default::default(),
861 anomaly_injection: Default::default(),
862 industry_specific: Default::default(),
863 fingerprint_privacy: Default::default(),
864 quality_gates: Default::default(),
865 compliance: Default::default(),
866 webhooks: Default::default(),
867 llm: Default::default(),
868 diffusion: Default::default(),
869 causal: Default::default(),
870 source_to_pay: Default::default(),
871 financial_reporting: Default::default(),
872 hr: Default::default(),
873 manufacturing: Default::default(),
874 sales_quotes: Default::default(),
875 tax: Default::default(),
876 treasury: Default::default(),
877 project_accounting: Default::default(),
878 esg: Default::default(),
879 }
880}
881
882#[cfg(test)]
883#[allow(clippy::unwrap_used)]
884mod tests {
885 use super::*;
886 use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;
887
888 #[tokio::test]
893 async fn test_service_creation() {
894 let config = default_generator_config();
895 let service = SynthService::new(config);
896 assert!(service.state.uptime_seconds() < 60);
898 }
899
900 #[tokio::test]
901 async fn test_service_with_state() {
902 let config = default_generator_config();
903 let state = Arc::new(ServerState::new(config));
904 let service = SynthService::with_state(Arc::clone(&state));
905
906 state.total_entries.store(100, Ordering::Relaxed);
908 assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
909 }
910
911 #[tokio::test]
916 async fn test_health_check() {
917 let config = default_generator_config();
918 let service = SynthService::new(config);
919
920 let response = service.health_check(Request::new(())).await.unwrap();
921 let health = response.into_inner();
922
923 assert!(health.healthy);
924 assert!(!health.version.is_empty());
925 }
926
927 #[tokio::test]
928 async fn test_health_check_returns_version() {
929 let config = default_generator_config();
930 let service = SynthService::new(config);
931
932 let response = service.health_check(Request::new(())).await.unwrap();
933 let health = response.into_inner();
934
935 assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
936 }
937
938 #[tokio::test]
943 async fn test_get_config() {
944 let config = default_generator_config();
945 let service = SynthService::new(config);
946
947 let response = service.get_config(Request::new(())).await.unwrap();
948 let config_response = response.into_inner();
949
950 assert!(config_response.success);
951 assert!(config_response.current_config.is_some());
952 }
953
954 #[tokio::test]
955 async fn test_get_config_returns_industry() {
956 let config = default_generator_config();
957 let service = SynthService::new(config);
958
959 let response = service.get_config(Request::new(())).await.unwrap();
960 let config_response = response.into_inner();
961 let current = config_response.current_config.unwrap();
962
963 assert_eq!(current.industry, "Manufacturing");
964 }
965
966 #[tokio::test]
967 async fn test_set_config() {
968 let config = default_generator_config();
969 let service = SynthService::new(config);
970
971 let new_config = GenerationConfig {
972 industry: "retail".to_string(),
973 start_date: "2024-06-01".to_string(),
974 period_months: 6,
975 seed: 42,
976 coa_complexity: "medium".to_string(),
977 companies: vec![],
978 fraud_enabled: true,
979 fraud_rate: 0.05,
980 generate_master_data: false,
981 generate_document_flows: false,
982 };
983
984 let response = service
985 .set_config(Request::new(ConfigRequest {
986 config: Some(new_config),
987 }))
988 .await
989 .unwrap();
990 let config_response = response.into_inner();
991
992 assert!(config_response.success);
993 }
994
995 #[tokio::test]
996 async fn test_set_config_without_config_fails() {
997 let config = default_generator_config();
998 let service = SynthService::new(config);
999
1000 let result = service
1001 .set_config(Request::new(ConfigRequest { config: None }))
1002 .await;
1003
1004 assert!(result.is_err());
1005 }
1006
1007 #[tokio::test]
1012 async fn test_get_metrics_initial() {
1013 let config = default_generator_config();
1014 let service = SynthService::new(config);
1015
1016 let response = service.get_metrics(Request::new(())).await.unwrap();
1017 let metrics = response.into_inner();
1018
1019 assert_eq!(metrics.total_entries_generated, 0);
1020 assert_eq!(metrics.total_anomalies_injected, 0);
1021 assert_eq!(metrics.active_streams, 0);
1022 }
1023
1024 #[tokio::test]
1025 async fn test_get_metrics_after_updates() {
1026 let config = default_generator_config();
1027 let service = SynthService::new(config);
1028
1029 service.state.total_entries.store(1000, Ordering::Relaxed);
1031 service.state.total_anomalies.store(20, Ordering::Relaxed);
1032 service.state.active_streams.store(2, Ordering::Relaxed);
1033
1034 let response = service.get_metrics(Request::new(())).await.unwrap();
1035 let metrics = response.into_inner();
1036
1037 assert_eq!(metrics.total_entries_generated, 1000);
1038 assert_eq!(metrics.total_anomalies_injected, 20);
1039 assert_eq!(metrics.active_streams, 2);
1040 }
1041
1042 #[tokio::test]
1047 async fn test_control_pause() {
1048 let config = default_generator_config();
1049 let service = SynthService::new(config);
1050
1051 let response = service
1052 .control(Request::new(ControlCommand {
1053 action: ControlAction::Pause as i32,
1054 pattern_name: None,
1055 }))
1056 .await
1057 .unwrap();
1058 let control_response = response.into_inner();
1059
1060 assert!(control_response.success);
1061 assert!(service.state.stream_paused.load(Ordering::Relaxed));
1062 }
1063
1064 #[tokio::test]
1065 async fn test_control_resume() {
1066 let config = default_generator_config();
1067 let service = SynthService::new(config);
1068
1069 service.state.stream_paused.store(true, Ordering::Relaxed);
1071
1072 let response = service
1073 .control(Request::new(ControlCommand {
1074 action: ControlAction::Resume as i32,
1075 pattern_name: None,
1076 }))
1077 .await
1078 .unwrap();
1079 let control_response = response.into_inner();
1080
1081 assert!(control_response.success);
1082 assert!(!service.state.stream_paused.load(Ordering::Relaxed));
1083 }
1084
1085 #[tokio::test]
1086 async fn test_control_stop() {
1087 let config = default_generator_config();
1088 let service = SynthService::new(config);
1089
1090 let response = service
1091 .control(Request::new(ControlCommand {
1092 action: ControlAction::Stop as i32,
1093 pattern_name: None,
1094 }))
1095 .await
1096 .unwrap();
1097 let control_response = response.into_inner();
1098
1099 assert!(control_response.success);
1100 assert!(service.state.stream_stopped.load(Ordering::Relaxed));
1101 }
1102
1103 #[test]
1108 fn test_server_state_creation() {
1109 let config = default_generator_config();
1110 let state = ServerState::new(config);
1111
1112 assert_eq!(state.total_entries.load(Ordering::Relaxed), 0);
1113 assert_eq!(state.total_anomalies.load(Ordering::Relaxed), 0);
1114 assert_eq!(state.active_streams.load(Ordering::Relaxed), 0);
1115 assert!(!state.stream_paused.load(Ordering::Relaxed));
1116 assert!(!state.stream_stopped.load(Ordering::Relaxed));
1117 }
1118
1119 #[test]
1120 fn test_server_state_uptime() {
1121 let config = default_generator_config();
1122 let state = ServerState::new(config);
1123
1124 assert!(state.uptime_seconds() < 60);
1126 }
1127
1128 #[test]
1133 fn test_default_generator_config() {
1134 let config = default_generator_config();
1135
1136 assert_eq!(config.global.industry, IndustrySector::Manufacturing);
1137 assert_eq!(config.global.period_months, 12);
1138 assert!(!config.companies.is_empty());
1139 assert_eq!(config.companies[0].code, "1000");
1140 }
1141
1142 #[test]
1143 fn test_config_to_proto() {
1144 let config = default_generator_config();
1145 let proto = SynthService::config_to_proto(&config);
1146
1147 assert_eq!(proto.industry, "Manufacturing");
1148 assert_eq!(proto.period_months, 12);
1149 assert!(!proto.companies.is_empty());
1150 }
1151
1152 #[tokio::test]
1153 async fn test_proto_to_config_with_none() {
1154 let config = default_generator_config();
1155 let service = SynthService::new(config.clone());
1156
1157 let result = service.proto_to_config(None).await.unwrap();
1158
1159 assert_eq!(result.global.industry, config.global.industry);
1161 }
1162
1163 #[tokio::test]
1164 async fn test_proto_to_config_with_retail() {
1165 let config = default_generator_config();
1166 let service = SynthService::new(config);
1167
1168 let proto = GenerationConfig {
1169 industry: "retail".to_string(),
1170 start_date: "2024-01-01".to_string(),
1171 period_months: 6,
1172 seed: 0,
1173 coa_complexity: "large".to_string(),
1174 companies: vec![],
1175 fraud_enabled: false,
1176 fraud_rate: 0.0,
1177 generate_master_data: false,
1178 generate_document_flows: false,
1179 };
1180
1181 let result = service.proto_to_config(Some(proto)).await.unwrap();
1182
1183 assert_eq!(result.global.industry, IndustrySector::Retail);
1184 assert_eq!(result.chart_of_accounts.complexity, CoAComplexity::Large);
1185 }
1186
1187 #[tokio::test]
1188 async fn test_proto_to_config_with_healthcare() {
1189 let config = default_generator_config();
1190 let service = SynthService::new(config);
1191
1192 let proto = GenerationConfig {
1193 industry: "healthcare".to_string(),
1194 start_date: "2024-01-01".to_string(),
1195 period_months: 12,
1196 seed: 42,
1197 coa_complexity: "small".to_string(),
1198 companies: vec![],
1199 fraud_enabled: true,
1200 fraud_rate: 0.1,
1201 generate_master_data: true,
1202 generate_document_flows: true,
1203 };
1204
1205 let result = service.proto_to_config(Some(proto)).await.unwrap();
1206
1207 assert_eq!(result.global.industry, IndustrySector::Healthcare);
1208 assert_eq!(result.global.seed, Some(42));
1209 assert!(result.fraud.enabled);
1210 assert!((result.fraud.fraud_rate - 0.1).abs() < 0.001);
1211 }
1212
1213 #[tokio::test]
1214 async fn test_proto_to_config_with_companies() {
1215 let config = default_generator_config();
1216 let service = SynthService::new(config);
1217
1218 let proto = GenerationConfig {
1219 industry: "technology".to_string(),
1220 start_date: "2024-01-01".to_string(),
1221 period_months: 12,
1222 seed: 0,
1223 coa_complexity: "medium".to_string(),
1224 companies: vec![
1225 CompanyConfigProto {
1226 code: "1000".to_string(),
1227 name: "Parent Corp".to_string(),
1228 currency: "USD".to_string(),
1229 country: "US".to_string(),
1230 annual_transaction_volume: 100000,
1231 volume_weight: 1.0,
1232 },
1233 CompanyConfigProto {
1234 code: "2000".to_string(),
1235 name: "EU Sub".to_string(),
1236 currency: "EUR".to_string(),
1237 country: "DE".to_string(),
1238 annual_transaction_volume: 50000,
1239 volume_weight: 0.5,
1240 },
1241 ],
1242 fraud_enabled: false,
1243 fraud_rate: 0.0,
1244 generate_master_data: false,
1245 generate_document_flows: false,
1246 };
1247
1248 let result = service.proto_to_config(Some(proto)).await.unwrap();
1249
1250 assert_eq!(result.companies.len(), 2);
1251 assert_eq!(result.companies[0].code, "1000");
1252 assert_eq!(result.companies[1].currency, "EUR");
1253 }
1254
1255 #[tokio::test]
1260 async fn test_bulk_generate_entry_count_validation() {
1261 let config = default_generator_config();
1262 let service = SynthService::new(config);
1263
1264 let request = BulkGenerateRequest {
1265 entry_count: 2_000_000, include_master_data: false,
1267 inject_anomalies: false,
1268 output_format: 0,
1269 config: None,
1270 };
1271
1272 let result = service.bulk_generate(Request::new(request)).await;
1273 assert!(result.is_err());
1274 let err = result.err().unwrap();
1275 assert!(err.message().contains("exceeds maximum allowed value"));
1276 }
1277
1278 #[tokio::test]
1279 async fn test_stream_data_events_per_second_too_low() {
1280 let config = default_generator_config();
1281 let service = SynthService::new(config);
1282
1283 let request = StreamDataRequest {
1284 events_per_second: 0, max_events: 100,
1286 inject_anomalies: false,
1287 anomaly_rate: 0.0,
1288 config: None,
1289 };
1290
1291 let result = service.stream_data(Request::new(request)).await;
1292 assert!(result.is_err());
1293 let err = result.err().unwrap();
1294 assert!(err.message().contains("must be at least"));
1295 }
1296
1297 #[tokio::test]
1298 async fn test_stream_data_events_per_second_too_high() {
1299 let config = default_generator_config();
1300 let service = SynthService::new(config);
1301
1302 let request = StreamDataRequest {
1303 events_per_second: 20_000, max_events: 100,
1305 inject_anomalies: false,
1306 anomaly_rate: 0.0,
1307 config: None,
1308 };
1309
1310 let result = service.stream_data(Request::new(request)).await;
1311 assert!(result.is_err());
1312 let err = result.err().unwrap();
1313 assert!(err.message().contains("exceeds maximum allowed value"));
1314 }
1315
1316 #[tokio::test]
1317 async fn test_stream_data_max_events_too_high() {
1318 let config = default_generator_config();
1319 let service = SynthService::new(config);
1320
1321 let request = StreamDataRequest {
1322 events_per_second: 100,
1323 max_events: 100_000_000, inject_anomalies: false,
1325 anomaly_rate: 0.0,
1326 config: None,
1327 };
1328
1329 let result = service.stream_data(Request::new(request)).await;
1330 assert!(result.is_err());
1331 let err = result.err().unwrap();
1332 assert!(err.message().contains("max_events"));
1333 }
1334
1335 #[tokio::test]
1336 async fn test_stream_data_valid_request() {
1337 let config = default_generator_config();
1338 let service = SynthService::new(config);
1339
1340 let request = StreamDataRequest {
1341 events_per_second: 10,
1342 max_events: 5,
1343 inject_anomalies: false,
1344 anomaly_rate: 0.0,
1345 config: None,
1346 };
1347
1348 let result = service.stream_data(Request::new(request)).await;
1351 assert!(result.is_ok());
1352 }
1353}