1use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17 ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18 TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
/// State shared by every request handler of the gRPC service.
///
/// Counters and flags are atomics, so handlers update them through a shared
/// `Arc<ServerState>` without taking a lock.
pub struct ServerState {
    /// Server-side generator configuration; used when a request carries no
    /// configuration of its own and replaced by `set_config`.
    pub config: RwLock<GeneratorConfig>,
    /// Instant at which this state was constructed; basis for `uptime_seconds`.
    start_time: Instant,
    /// Running total of journal entries produced by bulk and stream generation.
    pub total_entries: AtomicU64,
    /// Running total of anomaly labels produced by bulk generation.
    pub total_anomalies: AtomicU64,
    /// Number of streaming tasks currently running; this is also the value
    /// compared against `max_concurrent_generations` in `check_resources`.
    pub active_streams: AtomicU64,
    /// Running total of events emitted on data streams.
    pub total_stream_events: AtomicU64,
    /// Set by the `Pause`/`Resume` control actions and polled by streaming tasks.
    pub stream_paused: AtomicBool,
    /// Set by the `Stop` control action and polled by streaming tasks.
    pub stream_stopped: AtomicBool,
    /// Name of the most recently triggered pattern, written by the
    /// `TriggerPattern` control action.
    pub triggered_pattern: RwLock<Option<String>>,
    /// Guard consulted by `check_resources` and `resource_status`.
    pub resource_guard: Arc<ResourceGuard>,
    /// Upper bound on `active_streams` enforced by `check_resources`
    /// (initialised to 4 by the constructors).
    max_concurrent_generations: AtomicU64,
}
51
52impl ServerState {
53 pub fn new(config: GeneratorConfig) -> Self {
54 let memory_limit = config.global.memory_limit_mb;
56 let resource_guard = if memory_limit > 0 {
57 ResourceGuardBuilder::new()
58 .memory_limit(memory_limit)
59 .conservative()
60 .build()
61 } else {
62 ResourceGuardBuilder::new()
64 .memory_limit(2048)
65 .conservative()
66 .build()
67 };
68
69 Self {
70 config: RwLock::new(config),
71 start_time: Instant::now(),
72 total_entries: AtomicU64::new(0),
73 total_anomalies: AtomicU64::new(0),
74 active_streams: AtomicU64::new(0),
75 total_stream_events: AtomicU64::new(0),
76 stream_paused: AtomicBool::new(false),
77 stream_stopped: AtomicBool::new(false),
78 triggered_pattern: RwLock::new(None),
79 resource_guard: Arc::new(resource_guard),
80 max_concurrent_generations: AtomicU64::new(4),
81 }
82 }
83
84 pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
86 let resource_guard = ResourceGuardBuilder::new()
87 .memory_limit(memory_limit_mb)
88 .conservative()
89 .build();
90
91 Self {
92 config: RwLock::new(config),
93 start_time: Instant::now(),
94 total_entries: AtomicU64::new(0),
95 total_anomalies: AtomicU64::new(0),
96 active_streams: AtomicU64::new(0),
97 total_stream_events: AtomicU64::new(0),
98 stream_paused: AtomicBool::new(false),
99 stream_stopped: AtomicBool::new(false),
100 triggered_pattern: RwLock::new(None),
101 resource_guard: Arc::new(resource_guard),
102 max_concurrent_generations: AtomicU64::new(4),
103 }
104 }
105
106 pub fn uptime_seconds(&self) -> u64 {
107 self.start_time.elapsed().as_secs()
108 }
109
110 #[allow(clippy::result_large_err)] pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
113 let active = self.active_streams.load(Ordering::Relaxed);
115 let max = self.max_concurrent_generations.load(Ordering::Relaxed);
116 if active >= max {
117 return Err(Status::resource_exhausted(format!(
118 "Too many concurrent generations ({}/{}). Try again later.",
119 active, max
120 )));
121 }
122
123 match self.resource_guard.check() {
125 Ok(level) => {
126 if level == DegradationLevel::Emergency {
127 Err(Status::resource_exhausted(
128 "Server resources critically low. Generation not possible.",
129 ))
130 } else if level == DegradationLevel::Minimal {
131 warn!("Resources constrained, generation may be limited");
132 Ok(level)
133 } else {
134 Ok(level)
135 }
136 }
137 Err(e) => Err(Status::resource_exhausted(format!(
138 "Resource check failed: {}",
139 e
140 ))),
141 }
142 }
143
144 pub fn resource_status(&self) -> ResourceStatus {
146 let stats = self.resource_guard.stats();
147 ResourceStatus {
148 memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
149 memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
150 disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
151 degradation_level: stats.degradation_level.name().to_string(),
152 active_generations: self.active_streams.load(Ordering::Relaxed),
153 }
154 }
155}
156
/// Snapshot of resource usage produced by `ServerState::resource_status`.
#[derive(Debug, Clone)]
pub struct ResourceStatus {
    /// Resident memory reported by the resource guard, in bytes / 1024².
    pub memory_usage_mb: u64,
    /// Peak resident memory reported by the resource guard, in bytes / 1024².
    pub memory_peak_mb: u64,
    /// Available disk space reported by the resource guard, in bytes / 1024².
    pub disk_available_mb: u64,
    /// Name of the degradation level reported by the resource guard.
    pub degradation_level: String,
    /// Value of the `active_streams` counter when the snapshot was taken.
    pub active_generations: u64,
}
166
/// gRPC service implementation; all mutable data lives in the shared state.
pub struct SynthService {
    /// State shared between this service and any other holder of the `Arc`.
    pub state: Arc<ServerState>,
}
171
172impl SynthService {
173 pub fn new(config: GeneratorConfig) -> Self {
174 Self {
175 state: Arc::new(ServerState::new(config)),
176 }
177 }
178
179 pub fn with_state(state: Arc<ServerState>) -> Self {
180 Self { state }
181 }
182
183 async fn proto_to_config(
185 &self,
186 proto: Option<GenerationConfig>,
187 ) -> Result<GeneratorConfig, Status> {
188 match proto {
189 Some(p) => {
190 let industry = match p.industry.to_lowercase().as_str() {
191 "manufacturing" => IndustrySector::Manufacturing,
192 "retail" => IndustrySector::Retail,
193 "financial_services" | "financial" => IndustrySector::FinancialServices,
194 "healthcare" => IndustrySector::Healthcare,
195 "technology" => IndustrySector::Technology,
196 _ => IndustrySector::Manufacturing,
197 };
198
199 let complexity = match p.coa_complexity.to_lowercase().as_str() {
200 "small" => CoAComplexity::Small,
201 "medium" => CoAComplexity::Medium,
202 "large" => CoAComplexity::Large,
203 _ => CoAComplexity::Small,
204 };
205
206 let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
207 vec![CompanyConfig {
208 code: "1000".to_string(),
209 name: "Default Company".to_string(),
210 currency: "USD".to_string(),
211 country: "US".to_string(),
212 annual_transaction_volume: TransactionVolume::TenK,
213 volume_weight: 1.0,
214 fiscal_year_variant: "K4".to_string(),
215 }]
216 } else {
217 p.companies
218 .into_iter()
219 .map(|c| CompanyConfig {
220 code: c.code,
221 name: c.name,
222 currency: c.currency,
223 country: c.country,
224 annual_transaction_volume: TransactionVolume::Custom(
225 c.annual_transaction_volume,
226 ),
227 volume_weight: c.volume_weight as f64,
228 fiscal_year_variant: "K4".to_string(),
229 })
230 .collect()
231 };
232
233 let mut config = GeneratorConfig {
234 global: GlobalConfig {
235 seed: if p.seed > 0 { Some(p.seed) } else { None },
236 industry,
237 start_date: if p.start_date.is_empty() {
238 "2024-01-01".to_string()
239 } else {
240 p.start_date
241 },
242 period_months: if p.period_months == 0 {
243 12
244 } else {
245 p.period_months
246 },
247 group_currency: "USD".to_string(),
248 parallel: true,
249 worker_threads: 0,
250 memory_limit_mb: 0,
251 },
252 companies,
253 chart_of_accounts: ChartOfAccountsConfig {
254 complexity,
255 industry_specific: true,
256 custom_accounts: None,
257 min_hierarchy_depth: 2,
258 max_hierarchy_depth: 5,
259 },
260 ..default_generator_config()
261 };
262
263 if p.fraud_enabled {
265 config.fraud.enabled = true;
266 config.fraud.fraud_rate = p.fraud_rate as f64;
267 }
268
269 Ok(config)
270 }
271 None => {
272 let config = self.state.config.read().await;
274 Ok(config.clone())
275 }
276 }
277 }
278
279 fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
281 JournalEntryProto {
282 document_id: entry.header.document_id.to_string(),
283 company_code: entry.header.company_code.clone(),
284 fiscal_year: entry.header.fiscal_year as u32,
285 fiscal_period: entry.header.fiscal_period as u32,
286 posting_date: entry.header.posting_date.to_string(),
287 document_date: entry.header.document_date.to_string(),
288 created_at: entry.header.created_at.to_rfc3339(),
289 source: format!("{:?}", entry.header.source),
290 business_process: entry.header.business_process.map(|bp| format!("{:?}", bp)),
291 lines: entry
292 .lines
293 .iter()
294 .map(|line| {
295 let amount = if line.is_debit() {
296 line.debit_amount
297 } else {
298 line.credit_amount
299 };
300 JournalLineProto {
301 line_number: line.line_number,
302 account_number: line.gl_account.clone(),
303 account_name: line.account_description.clone().unwrap_or_default(),
304 amount: amount.to_string(),
305 is_debit: line.is_debit(),
306 cost_center: line.cost_center.clone(),
307 profit_center: line.profit_center.clone(),
308 vendor_id: None,
309 customer_id: None,
310 material_id: None,
311 text: None,
312 }
313 })
314 .collect(),
315 is_anomaly: entry.header.is_fraud,
316 anomaly_type: entry.header.fraud_type.map(|ft| format!("{:?}", ft)),
317 }
318 }
319
320 fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
322 GenerationConfig {
323 industry: format!("{:?}", config.global.industry),
324 start_date: config.global.start_date.clone(),
325 period_months: config.global.period_months,
326 seed: config.global.seed.unwrap_or(0),
327 coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
328 companies: config
329 .companies
330 .iter()
331 .map(|c| CompanyConfigProto {
332 code: c.code.clone(),
333 name: c.name.clone(),
334 currency: c.currency.clone(),
335 country: c.country.clone(),
336 annual_transaction_volume: c.annual_transaction_volume.count(),
337 volume_weight: c.volume_weight as f32,
338 })
339 .collect(),
340 fraud_enabled: config.fraud.enabled,
341 fraud_rate: config.fraud.fraud_rate as f32,
342 generate_master_data: true,
343 generate_document_flows: true,
344 }
345 }
346}
347
348#[tonic::async_trait]
349impl synthetic_data_service_server::SyntheticDataService for SynthService {
350 async fn bulk_generate(
352 &self,
353 request: Request<BulkGenerateRequest>,
354 ) -> Result<Response<BulkGenerateResponse>, Status> {
355 let req = request.into_inner();
356
357 const MAX_ENTRY_COUNT: u64 = 1_000_000;
359 if req.entry_count > MAX_ENTRY_COUNT {
360 return Err(Status::invalid_argument(format!(
361 "entry_count ({}) exceeds maximum allowed value ({})",
362 req.entry_count, MAX_ENTRY_COUNT
363 )));
364 }
365
366 let degradation_level = self.state.check_resources()?;
368 if degradation_level != DegradationLevel::Normal {
369 warn!(
370 "Starting bulk generation under resource pressure (level: {:?})",
371 degradation_level
372 );
373 }
374
375 info!("Bulk generate request: {} entries", req.entry_count);
376
377 let config = self.proto_to_config(req.config).await?;
378 let start_time = Instant::now();
379
380 let phase_config = PhaseConfig {
382 generate_master_data: req.include_master_data,
383 generate_document_flows: false,
384 generate_journal_entries: true,
385 inject_anomalies: req.inject_anomalies,
386 show_progress: false,
387 ..Default::default()
388 };
389
390 let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
391 .map_err(|e| Status::internal(format!("Failed to create orchestrator: {}", e)))?;
392
393 let result = orchestrator
394 .generate()
395 .map_err(|e| Status::internal(format!("Generation failed: {}", e)))?;
396
397 let duration_ms = start_time.elapsed().as_millis() as u64;
398
399 let entries_count = result.journal_entries.len() as u64;
401 self.state
402 .total_entries
403 .fetch_add(entries_count, Ordering::Relaxed);
404
405 let anomaly_count = result.anomaly_labels.labels.len() as u64;
406 self.state
407 .total_anomalies
408 .fetch_add(anomaly_count, Ordering::Relaxed);
409
410 let journal_entries: Vec<JournalEntryProto> = result
412 .journal_entries
413 .iter()
414 .map(Self::journal_entry_to_proto)
415 .collect();
416
417 let anomaly_labels: Vec<AnomalyLabelProto> = result
418 .anomaly_labels
419 .labels
420 .iter()
421 .map(|a| AnomalyLabelProto {
422 anomaly_id: a.anomaly_id.clone(),
423 document_id: a.document_id.clone(),
424 anomaly_type: format!("{:?}", a.anomaly_type),
425 anomaly_category: a.document_type.clone(),
426 description: a.description.clone(),
427 severity_score: a.severity as f32,
428 })
429 .collect();
430
431 let mut total_debit = rust_decimal::Decimal::ZERO;
433 let mut total_credit = rust_decimal::Decimal::ZERO;
434 let mut total_lines = 0u64;
435 let mut entries_by_company = std::collections::HashMap::new();
436 let mut entries_by_source = std::collections::HashMap::new();
437
438 for entry in &result.journal_entries {
439 *entries_by_company
440 .entry(entry.header.company_code.clone())
441 .or_insert(0u64) += 1;
442 *entries_by_source
443 .entry(format!("{:?}", entry.header.source))
444 .or_insert(0u64) += 1;
445
446 for line in &entry.lines {
447 total_lines += 1;
448 total_debit += line.debit_amount;
449 total_credit += line.credit_amount;
450 }
451 }
452
453 let stats = GenerationStats {
454 total_entries: entries_count,
455 total_lines,
456 total_debit_amount: total_debit.to_string(),
457 total_credit_amount: total_credit.to_string(),
458 anomaly_count,
459 entries_by_company,
460 entries_by_source,
461 };
462
463 info!(
464 "Bulk generation complete: {} entries in {}ms",
465 entries_count, duration_ms
466 );
467
468 Ok(Response::new(BulkGenerateResponse {
469 entries_generated: entries_count,
470 duration_ms,
471 journal_entries,
472 anomaly_labels,
473 stats: Some(stats),
474 }))
475 }
476
477 type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
478
479 async fn stream_data(
481 &self,
482 request: Request<StreamDataRequest>,
483 ) -> Result<Response<Self::StreamDataStream>, Status> {
484 let req = request.into_inner();
485
486 const MIN_EVENTS_PER_SECOND: u32 = 1;
488 const MAX_EVENTS_PER_SECOND: u32 = 10_000;
489 if req.events_per_second < MIN_EVENTS_PER_SECOND {
490 return Err(Status::invalid_argument(format!(
491 "events_per_second ({}) must be at least {}",
492 req.events_per_second, MIN_EVENTS_PER_SECOND
493 )));
494 }
495 if req.events_per_second > MAX_EVENTS_PER_SECOND {
496 return Err(Status::invalid_argument(format!(
497 "events_per_second ({}) exceeds maximum allowed value ({})",
498 req.events_per_second, MAX_EVENTS_PER_SECOND
499 )));
500 }
501
502 const MAX_STREAM_EVENTS: u64 = 10_000_000;
504 if req.max_events > MAX_STREAM_EVENTS {
505 return Err(Status::invalid_argument(format!(
506 "max_events ({}) exceeds maximum allowed value ({})",
507 req.max_events, MAX_STREAM_EVENTS
508 )));
509 }
510
511 let degradation_level = self.state.check_resources()?;
513 if degradation_level != DegradationLevel::Normal {
514 warn!(
515 "Starting stream under resource pressure (level: {:?})",
516 degradation_level
517 );
518 }
519
520 info!(
521 "Stream data request: {} events/sec, max {}",
522 req.events_per_second, req.max_events
523 );
524
525 let config = self.proto_to_config(req.config).await?;
526 let state = self.state.clone();
527
528 state.active_streams.fetch_add(1, Ordering::Relaxed);
530
531 state.stream_paused.store(false, Ordering::Relaxed);
533 state.stream_stopped.store(false, Ordering::Relaxed);
534
535 let (tx, rx) = mpsc::channel(100);
536
537 let events_per_second = req.events_per_second;
539 let max_events = req.max_events;
540 let inject_anomalies = req.inject_anomalies;
541
542 tokio::spawn(async move {
543 let phase_config = PhaseConfig {
544 generate_master_data: false,
545 generate_document_flows: false,
546 generate_journal_entries: true,
547 inject_anomalies,
548 show_progress: false,
549 ..Default::default()
550 };
551
552 let mut sequence = 0u64;
553 let delay = if events_per_second > 0 {
554 Duration::from_micros(1_000_000 / events_per_second as u64)
555 } else {
556 Duration::from_millis(1)
557 };
558
559 loop {
560 if state.stream_stopped.load(Ordering::Relaxed) {
562 info!("Stream stopped by control command");
563 break;
564 }
565
566 while state.stream_paused.load(Ordering::Relaxed) {
568 tokio::time::sleep(Duration::from_millis(100)).await;
569 if state.stream_stopped.load(Ordering::Relaxed) {
570 break;
571 }
572 }
573
574 if max_events > 0 && sequence >= max_events {
576 info!("Stream reached max events: {}", max_events);
577 break;
578 }
579
580 let mut orchestrator =
582 match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
583 Ok(o) => o,
584 Err(e) => {
585 error!("Failed to create orchestrator: {}", e);
586 break;
587 }
588 };
589
590 let result = match orchestrator.generate() {
591 Ok(r) => r,
592 Err(e) => {
593 error!("Generation failed: {}", e);
594 break;
595 }
596 };
597
598 for entry in result.journal_entries {
600 sequence += 1;
601 state.total_stream_events.fetch_add(1, Ordering::Relaxed);
602 state.total_entries.fetch_add(1, Ordering::Relaxed);
603
604 let timestamp = Timestamp {
605 seconds: Utc::now().timestamp(),
606 nanos: 0,
607 };
608
609 let event = DataEvent {
610 sequence,
611 timestamp: Some(timestamp),
612 event: Some(data_event::Event::JournalEntry(
613 SynthService::journal_entry_to_proto(&entry),
614 )),
615 };
616
617 if tx.send(Ok(event)).await.is_err() {
618 info!("Stream receiver dropped");
619 break;
620 }
621
622 tokio::time::sleep(delay).await;
624
625 if max_events > 0 && sequence >= max_events {
627 break;
628 }
629 }
630 }
631
632 state.active_streams.fetch_sub(1, Ordering::Relaxed);
634 });
635
636 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
637 }
638
639 async fn control(
641 &self,
642 request: Request<ControlCommand>,
643 ) -> Result<Response<ControlResponse>, Status> {
644 let cmd = request.into_inner();
645 let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
646
647 info!("Control command: {:?}", action);
648
649 let (success, message, status) = match action {
650 ControlAction::Pause => {
651 self.state.stream_paused.store(true, Ordering::Relaxed);
652 (true, "Stream paused".to_string(), StreamStatus::Paused)
653 }
654 ControlAction::Resume => {
655 self.state.stream_paused.store(false, Ordering::Relaxed);
656 (true, "Stream resumed".to_string(), StreamStatus::Running)
657 }
658 ControlAction::Stop => {
659 self.state.stream_stopped.store(true, Ordering::Relaxed);
660 (true, "Stream stopped".to_string(), StreamStatus::Stopped)
661 }
662 ControlAction::TriggerPattern => {
663 let pattern = cmd.pattern_name.unwrap_or_default();
664 if pattern.is_empty() {
665 (
666 false,
667 "Pattern name is required for TriggerPattern action".to_string(),
668 StreamStatus::Running,
669 )
670 } else {
671 let valid_patterns = [
674 "year_end_spike",
675 "period_end_spike",
676 "holiday_cluster",
677 "fraud_cluster",
678 "error_cluster",
679 "uniform",
680 ];
681 let is_valid = valid_patterns.contains(&pattern.as_str())
682 || pattern.starts_with("custom:");
683
684 if is_valid {
685 if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
687 *triggered = Some(pattern.clone());
688 }
689 info!("Pattern trigger activated: {}", pattern);
690 (
691 true,
692 format!("Pattern '{}' will be applied to upcoming entries", pattern),
693 StreamStatus::Running,
694 )
695 } else {
696 (
697 false,
698 format!(
699 "Unknown pattern '{}'. Valid patterns: {:?}",
700 pattern, valid_patterns
701 ),
702 StreamStatus::Running,
703 )
704 }
705 }
706 }
707 ControlAction::Unspecified => (
708 false,
709 "Unknown control action".to_string(),
710 StreamStatus::Unspecified,
711 ),
712 };
713
714 Ok(Response::new(ControlResponse {
715 success,
716 message,
717 current_status: status as i32,
718 }))
719 }
720
721 async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
723 let config = self.state.config.read().await;
724 let proto_config = Self::config_to_proto(&config);
725
726 Ok(Response::new(ConfigResponse {
727 success: true,
728 message: "Current configuration retrieved".to_string(),
729 current_config: Some(proto_config),
730 }))
731 }
732
733 async fn set_config(
735 &self,
736 request: Request<ConfigRequest>,
737 ) -> Result<Response<ConfigResponse>, Status> {
738 let req = request.into_inner();
739
740 if let Some(proto_config) = req.config {
741 let new_config = self.proto_to_config(Some(proto_config)).await?;
742
743 let mut config = self.state.config.write().await;
744 *config = new_config.clone();
745
746 info!("Configuration updated");
747
748 Ok(Response::new(ConfigResponse {
749 success: true,
750 message: "Configuration updated".to_string(),
751 current_config: Some(Self::config_to_proto(&new_config)),
752 }))
753 } else {
754 Err(Status::invalid_argument("No configuration provided"))
755 }
756 }
757
758 async fn get_metrics(
760 &self,
761 _request: Request<()>,
762 ) -> Result<Response<MetricsResponse>, Status> {
763 let uptime = self.state.uptime_seconds();
764 let total_entries = self.state.total_entries.load(Ordering::Relaxed);
765
766 let entries_per_second = if uptime > 0 {
767 total_entries as f64 / uptime as f64
768 } else {
769 0.0
770 };
771
772 Ok(Response::new(MetricsResponse {
773 total_entries_generated: total_entries,
774 total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
775 uptime_seconds: uptime,
776 session_entries: total_entries,
777 session_entries_per_second: entries_per_second,
778 active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
779 total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
780 }))
781 }
782
783 async fn health_check(
785 &self,
786 _request: Request<()>,
787 ) -> Result<Response<HealthResponse>, Status> {
788 Ok(Response::new(HealthResponse {
789 healthy: true,
790 version: env!("CARGO_PKG_VERSION").to_string(),
791 uptime_seconds: self.state.uptime_seconds(),
792 }))
793 }
794}
795
/// Builds the baseline generator configuration used by the server.
///
/// The result describes a single US/USD manufacturing company (`1000`) with a
/// twelve-month period starting on 2024-01-01 and a small chart of accounts.
/// Every other section uses its `Default` value and `country_packs` is unset.
pub fn default_generator_config() -> GeneratorConfig {
    GeneratorConfig {
        // Global settings: unseeded, parallel, no explicit thread or memory limit.
        global: GlobalConfig {
            seed: None,
            industry: IndustrySector::Manufacturing,
            start_date: "2024-01-01".to_string(),
            period_months: 12,
            group_currency: "USD".to_string(),
            parallel: true,
            worker_threads: 0,
            memory_limit_mb: 0,
        },
        // A single default company.
        companies: vec![CompanyConfig {
            code: "1000".to_string(),
            name: "Default Company".to_string(),
            currency: "USD".to_string(),
            country: "US".to_string(),
            annual_transaction_volume: TransactionVolume::TenK,
            volume_weight: 1.0,
            fiscal_year_variant: "K4".to_string(),
        }],
        chart_of_accounts: ChartOfAccountsConfig {
            complexity: CoAComplexity::Small,
            industry_specific: true,
            custom_accounts: None,
            min_hierarchy_depth: 2,
            max_hierarchy_depth: 5,
        },
        // All remaining sections keep their defaults.
        transactions: Default::default(),
        output: OutputConfig::default(),
        fraud: Default::default(),
        internal_controls: Default::default(),
        business_processes: Default::default(),
        user_personas: Default::default(),
        templates: Default::default(),
        approval: Default::default(),
        departments: Default::default(),
        master_data: Default::default(),
        document_flows: Default::default(),
        intercompany: Default::default(),
        balance: Default::default(),
        ocpm: Default::default(),
        audit: Default::default(),
        banking: Default::default(),
        data_quality: Default::default(),
        scenario: Default::default(),
        temporal: Default::default(),
        graph_export: Default::default(),
        streaming: Default::default(),
        rate_limit: Default::default(),
        temporal_attributes: Default::default(),
        relationships: Default::default(),
        accounting_standards: Default::default(),
        audit_standards: Default::default(),
        distributions: Default::default(),
        temporal_patterns: Default::default(),
        vendor_network: Default::default(),
        customer_segmentation: Default::default(),
        relationship_strength: Default::default(),
        cross_process_links: Default::default(),
        organizational_events: Default::default(),
        behavioral_drift: Default::default(),
        market_drift: Default::default(),
        drift_labeling: Default::default(),
        anomaly_injection: Default::default(),
        industry_specific: Default::default(),
        fingerprint_privacy: Default::default(),
        quality_gates: Default::default(),
        compliance: Default::default(),
        webhooks: Default::default(),
        llm: Default::default(),
        diffusion: Default::default(),
        causal: Default::default(),
        source_to_pay: Default::default(),
        financial_reporting: Default::default(),
        hr: Default::default(),
        manufacturing: Default::default(),
        sales_quotes: Default::default(),
        tax: Default::default(),
        treasury: Default::default(),
        project_accounting: Default::default(),
        esg: Default::default(),
        country_packs: None,
    }
}
882
883#[cfg(test)]
884#[allow(clippy::unwrap_used)]
885mod tests {
886 use super::*;
887 use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;
888
889 #[tokio::test]
894 async fn test_service_creation() {
895 let config = default_generator_config();
896 let service = SynthService::new(config);
897 assert!(service.state.uptime_seconds() < 60);
899 }
900
901 #[tokio::test]
902 async fn test_service_with_state() {
903 let config = default_generator_config();
904 let state = Arc::new(ServerState::new(config));
905 let service = SynthService::with_state(Arc::clone(&state));
906
907 state.total_entries.store(100, Ordering::Relaxed);
909 assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
910 }
911
912 #[tokio::test]
917 async fn test_health_check() {
918 let config = default_generator_config();
919 let service = SynthService::new(config);
920
921 let response = service.health_check(Request::new(())).await.unwrap();
922 let health = response.into_inner();
923
924 assert!(health.healthy);
925 assert!(!health.version.is_empty());
926 }
927
928 #[tokio::test]
929 async fn test_health_check_returns_version() {
930 let config = default_generator_config();
931 let service = SynthService::new(config);
932
933 let response = service.health_check(Request::new(())).await.unwrap();
934 let health = response.into_inner();
935
936 assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
937 }
938
939 #[tokio::test]
944 async fn test_get_config() {
945 let config = default_generator_config();
946 let service = SynthService::new(config);
947
948 let response = service.get_config(Request::new(())).await.unwrap();
949 let config_response = response.into_inner();
950
951 assert!(config_response.success);
952 assert!(config_response.current_config.is_some());
953 }
954
955 #[tokio::test]
956 async fn test_get_config_returns_industry() {
957 let config = default_generator_config();
958 let service = SynthService::new(config);
959
960 let response = service.get_config(Request::new(())).await.unwrap();
961 let config_response = response.into_inner();
962 let current = config_response.current_config.unwrap();
963
964 assert_eq!(current.industry, "Manufacturing");
965 }
966
967 #[tokio::test]
968 async fn test_set_config() {
969 let config = default_generator_config();
970 let service = SynthService::new(config);
971
972 let new_config = GenerationConfig {
973 industry: "retail".to_string(),
974 start_date: "2024-06-01".to_string(),
975 period_months: 6,
976 seed: 42,
977 coa_complexity: "medium".to_string(),
978 companies: vec![],
979 fraud_enabled: true,
980 fraud_rate: 0.05,
981 generate_master_data: false,
982 generate_document_flows: false,
983 };
984
985 let response = service
986 .set_config(Request::new(ConfigRequest {
987 config: Some(new_config),
988 }))
989 .await
990 .unwrap();
991 let config_response = response.into_inner();
992
993 assert!(config_response.success);
994 }
995
996 #[tokio::test]
997 async fn test_set_config_without_config_fails() {
998 let config = default_generator_config();
999 let service = SynthService::new(config);
1000
1001 let result = service
1002 .set_config(Request::new(ConfigRequest { config: None }))
1003 .await;
1004
1005 assert!(result.is_err());
1006 }
1007
1008 #[tokio::test]
1013 async fn test_get_metrics_initial() {
1014 let config = default_generator_config();
1015 let service = SynthService::new(config);
1016
1017 let response = service.get_metrics(Request::new(())).await.unwrap();
1018 let metrics = response.into_inner();
1019
1020 assert_eq!(metrics.total_entries_generated, 0);
1021 assert_eq!(metrics.total_anomalies_injected, 0);
1022 assert_eq!(metrics.active_streams, 0);
1023 }
1024
1025 #[tokio::test]
1026 async fn test_get_metrics_after_updates() {
1027 let config = default_generator_config();
1028 let service = SynthService::new(config);
1029
1030 service.state.total_entries.store(1000, Ordering::Relaxed);
1032 service.state.total_anomalies.store(20, Ordering::Relaxed);
1033 service.state.active_streams.store(2, Ordering::Relaxed);
1034
1035 let response = service.get_metrics(Request::new(())).await.unwrap();
1036 let metrics = response.into_inner();
1037
1038 assert_eq!(metrics.total_entries_generated, 1000);
1039 assert_eq!(metrics.total_anomalies_injected, 20);
1040 assert_eq!(metrics.active_streams, 2);
1041 }
1042
1043 #[tokio::test]
1048 async fn test_control_pause() {
1049 let config = default_generator_config();
1050 let service = SynthService::new(config);
1051
1052 let response = service
1053 .control(Request::new(ControlCommand {
1054 action: ControlAction::Pause as i32,
1055 pattern_name: None,
1056 }))
1057 .await
1058 .unwrap();
1059 let control_response = response.into_inner();
1060
1061 assert!(control_response.success);
1062 assert!(service.state.stream_paused.load(Ordering::Relaxed));
1063 }
1064
1065 #[tokio::test]
1066 async fn test_control_resume() {
1067 let config = default_generator_config();
1068 let service = SynthService::new(config);
1069
1070 service.state.stream_paused.store(true, Ordering::Relaxed);
1072
1073 let response = service
1074 .control(Request::new(ControlCommand {
1075 action: ControlAction::Resume as i32,
1076 pattern_name: None,
1077 }))
1078 .await
1079 .unwrap();
1080 let control_response = response.into_inner();
1081
1082 assert!(control_response.success);
1083 assert!(!service.state.stream_paused.load(Ordering::Relaxed));
1084 }
1085
1086 #[tokio::test]
1087 async fn test_control_stop() {
1088 let config = default_generator_config();
1089 let service = SynthService::new(config);
1090
1091 let response = service
1092 .control(Request::new(ControlCommand {
1093 action: ControlAction::Stop as i32,
1094 pattern_name: None,
1095 }))
1096 .await
1097 .unwrap();
1098 let control_response = response.into_inner();
1099
1100 assert!(control_response.success);
1101 assert!(service.state.stream_stopped.load(Ordering::Relaxed));
1102 }
1103
1104 #[test]
1109 fn test_server_state_creation() {
1110 let config = default_generator_config();
1111 let state = ServerState::new(config);
1112
1113 assert_eq!(state.total_entries.load(Ordering::Relaxed), 0);
1114 assert_eq!(state.total_anomalies.load(Ordering::Relaxed), 0);
1115 assert_eq!(state.active_streams.load(Ordering::Relaxed), 0);
1116 assert!(!state.stream_paused.load(Ordering::Relaxed));
1117 assert!(!state.stream_stopped.load(Ordering::Relaxed));
1118 }
1119
1120 #[test]
1121 fn test_server_state_uptime() {
1122 let config = default_generator_config();
1123 let state = ServerState::new(config);
1124
1125 assert!(state.uptime_seconds() < 60);
1127 }
1128
1129 #[test]
1134 fn test_default_generator_config() {
1135 let config = default_generator_config();
1136
1137 assert_eq!(config.global.industry, IndustrySector::Manufacturing);
1138 assert_eq!(config.global.period_months, 12);
1139 assert!(!config.companies.is_empty());
1140 assert_eq!(config.companies[0].code, "1000");
1141 }
1142
1143 #[test]
1144 fn test_config_to_proto() {
1145 let config = default_generator_config();
1146 let proto = SynthService::config_to_proto(&config);
1147
1148 assert_eq!(proto.industry, "Manufacturing");
1149 assert_eq!(proto.period_months, 12);
1150 assert!(!proto.companies.is_empty());
1151 }
1152
1153 #[tokio::test]
1154 async fn test_proto_to_config_with_none() {
1155 let config = default_generator_config();
1156 let service = SynthService::new(config.clone());
1157
1158 let result = service.proto_to_config(None).await.unwrap();
1159
1160 assert_eq!(result.global.industry, config.global.industry);
1162 }
1163
1164 #[tokio::test]
1165 async fn test_proto_to_config_with_retail() {
1166 let config = default_generator_config();
1167 let service = SynthService::new(config);
1168
1169 let proto = GenerationConfig {
1170 industry: "retail".to_string(),
1171 start_date: "2024-01-01".to_string(),
1172 period_months: 6,
1173 seed: 0,
1174 coa_complexity: "large".to_string(),
1175 companies: vec![],
1176 fraud_enabled: false,
1177 fraud_rate: 0.0,
1178 generate_master_data: false,
1179 generate_document_flows: false,
1180 };
1181
1182 let result = service.proto_to_config(Some(proto)).await.unwrap();
1183
1184 assert_eq!(result.global.industry, IndustrySector::Retail);
1185 assert_eq!(result.chart_of_accounts.complexity, CoAComplexity::Large);
1186 }
1187
1188 #[tokio::test]
1189 async fn test_proto_to_config_with_healthcare() {
1190 let config = default_generator_config();
1191 let service = SynthService::new(config);
1192
1193 let proto = GenerationConfig {
1194 industry: "healthcare".to_string(),
1195 start_date: "2024-01-01".to_string(),
1196 period_months: 12,
1197 seed: 42,
1198 coa_complexity: "small".to_string(),
1199 companies: vec![],
1200 fraud_enabled: true,
1201 fraud_rate: 0.1,
1202 generate_master_data: true,
1203 generate_document_flows: true,
1204 };
1205
1206 let result = service.proto_to_config(Some(proto)).await.unwrap();
1207
1208 assert_eq!(result.global.industry, IndustrySector::Healthcare);
1209 assert_eq!(result.global.seed, Some(42));
1210 assert!(result.fraud.enabled);
1211 assert!((result.fraud.fraud_rate - 0.1).abs() < 0.001);
1212 }
1213
1214 #[tokio::test]
1215 async fn test_proto_to_config_with_companies() {
1216 let config = default_generator_config();
1217 let service = SynthService::new(config);
1218
1219 let proto = GenerationConfig {
1220 industry: "technology".to_string(),
1221 start_date: "2024-01-01".to_string(),
1222 period_months: 12,
1223 seed: 0,
1224 coa_complexity: "medium".to_string(),
1225 companies: vec![
1226 CompanyConfigProto {
1227 code: "1000".to_string(),
1228 name: "Parent Corp".to_string(),
1229 currency: "USD".to_string(),
1230 country: "US".to_string(),
1231 annual_transaction_volume: 100000,
1232 volume_weight: 1.0,
1233 },
1234 CompanyConfigProto {
1235 code: "2000".to_string(),
1236 name: "EU Sub".to_string(),
1237 currency: "EUR".to_string(),
1238 country: "DE".to_string(),
1239 annual_transaction_volume: 50000,
1240 volume_weight: 0.5,
1241 },
1242 ],
1243 fraud_enabled: false,
1244 fraud_rate: 0.0,
1245 generate_master_data: false,
1246 generate_document_flows: false,
1247 };
1248
1249 let result = service.proto_to_config(Some(proto)).await.unwrap();
1250
1251 assert_eq!(result.companies.len(), 2);
1252 assert_eq!(result.companies[0].code, "1000");
1253 assert_eq!(result.companies[1].currency, "EUR");
1254 }
1255
1256 #[tokio::test]
1261 async fn test_bulk_generate_entry_count_validation() {
1262 let config = default_generator_config();
1263 let service = SynthService::new(config);
1264
1265 let request = BulkGenerateRequest {
1266 entry_count: 2_000_000, include_master_data: false,
1268 inject_anomalies: false,
1269 output_format: 0,
1270 config: None,
1271 };
1272
1273 let result = service.bulk_generate(Request::new(request)).await;
1274 assert!(result.is_err());
1275 let err = result.err().unwrap();
1276 assert!(err.message().contains("exceeds maximum allowed value"));
1277 }
1278
1279 #[tokio::test]
1280 async fn test_stream_data_events_per_second_too_low() {
1281 let config = default_generator_config();
1282 let service = SynthService::new(config);
1283
1284 let request = StreamDataRequest {
1285 events_per_second: 0, max_events: 100,
1287 inject_anomalies: false,
1288 anomaly_rate: 0.0,
1289 config: None,
1290 };
1291
1292 let result = service.stream_data(Request::new(request)).await;
1293 assert!(result.is_err());
1294 let err = result.err().unwrap();
1295 assert!(err.message().contains("must be at least"));
1296 }
1297
1298 #[tokio::test]
1299 async fn test_stream_data_events_per_second_too_high() {
1300 let config = default_generator_config();
1301 let service = SynthService::new(config);
1302
1303 let request = StreamDataRequest {
1304 events_per_second: 20_000, max_events: 100,
1306 inject_anomalies: false,
1307 anomaly_rate: 0.0,
1308 config: None,
1309 };
1310
1311 let result = service.stream_data(Request::new(request)).await;
1312 assert!(result.is_err());
1313 let err = result.err().unwrap();
1314 assert!(err.message().contains("exceeds maximum allowed value"));
1315 }
1316
1317 #[tokio::test]
1318 async fn test_stream_data_max_events_too_high() {
1319 let config = default_generator_config();
1320 let service = SynthService::new(config);
1321
1322 let request = StreamDataRequest {
1323 events_per_second: 100,
1324 max_events: 100_000_000, inject_anomalies: false,
1326 anomaly_rate: 0.0,
1327 config: None,
1328 };
1329
1330 let result = service.stream_data(Request::new(request)).await;
1331 assert!(result.is_err());
1332 let err = result.err().unwrap();
1333 assert!(err.message().contains("max_events"));
1334 }
1335
1336 #[tokio::test]
1337 async fn test_stream_data_valid_request() {
1338 let config = default_generator_config();
1339 let service = SynthService::new(config);
1340
1341 let request = StreamDataRequest {
1342 events_per_second: 10,
1343 max_events: 5,
1344 inject_anomalies: false,
1345 anomaly_rate: 0.0,
1346 config: None,
1347 };
1348
1349 let result = service.stream_data(Request::new(request)).await;
1352 assert!(result.is_ok());
1353 }
1354}