1use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17 ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18 TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
26pub struct ServerState {
28 pub config: RwLock<GeneratorConfig>,
30 pub config_source: RwLock<crate::config_loader::ConfigSource>,
32 start_time: Instant,
34 pub total_entries: AtomicU64,
36 pub total_anomalies: AtomicU64,
38 pub active_streams: AtomicU64,
40 pub total_stream_events: AtomicU64,
42 pub stream_paused: AtomicBool,
44 pub stream_stopped: AtomicBool,
46 pub stream_events_per_second: AtomicU64,
48 pub stream_max_events: AtomicU64,
50 pub stream_inject_anomalies: AtomicBool,
52 pub triggered_pattern: RwLock<Option<String>>,
54 pub resource_guard: Arc<ResourceGuard>,
56 max_concurrent_generations: AtomicU64,
58}
59
60impl ServerState {
61 pub fn new(config: GeneratorConfig) -> Self {
62 let memory_limit = config.global.memory_limit_mb;
64 let resource_guard = if memory_limit > 0 {
65 ResourceGuardBuilder::new()
66 .memory_limit(memory_limit)
67 .conservative()
68 .build()
69 } else {
70 ResourceGuardBuilder::new()
72 .memory_limit(2048)
73 .conservative()
74 .build()
75 };
76
77 Self {
78 config: RwLock::new(config),
79 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
80 start_time: Instant::now(),
81 total_entries: AtomicU64::new(0),
82 total_anomalies: AtomicU64::new(0),
83 active_streams: AtomicU64::new(0),
84 total_stream_events: AtomicU64::new(0),
85 stream_paused: AtomicBool::new(false),
86 stream_stopped: AtomicBool::new(false),
87 stream_events_per_second: AtomicU64::new(0),
88 stream_max_events: AtomicU64::new(0),
89 stream_inject_anomalies: AtomicBool::new(false),
90 triggered_pattern: RwLock::new(None),
91 resource_guard: Arc::new(resource_guard),
92 max_concurrent_generations: AtomicU64::new(4),
93 }
94 }
95
96 pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
98 let resource_guard = ResourceGuardBuilder::new()
99 .memory_limit(memory_limit_mb)
100 .conservative()
101 .build();
102
103 Self {
104 config: RwLock::new(config),
105 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
106 start_time: Instant::now(),
107 total_entries: AtomicU64::new(0),
108 total_anomalies: AtomicU64::new(0),
109 active_streams: AtomicU64::new(0),
110 total_stream_events: AtomicU64::new(0),
111 stream_paused: AtomicBool::new(false),
112 stream_stopped: AtomicBool::new(false),
113 stream_events_per_second: AtomicU64::new(0),
114 stream_max_events: AtomicU64::new(0),
115 stream_inject_anomalies: AtomicBool::new(false),
116 triggered_pattern: RwLock::new(None),
117 resource_guard: Arc::new(resource_guard),
118 max_concurrent_generations: AtomicU64::new(4),
119 }
120 }
121
122 pub fn uptime_seconds(&self) -> u64 {
123 self.start_time.elapsed().as_secs()
124 }
125
126 #[allow(clippy::result_large_err)] pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
129 let active = self.active_streams.load(Ordering::Relaxed);
131 let max = self.max_concurrent_generations.load(Ordering::Relaxed);
132 if active >= max {
133 return Err(Status::resource_exhausted(format!(
134 "Too many concurrent generations ({active}/{max}). Try again later."
135 )));
136 }
137
138 match self.resource_guard.check() {
140 Ok(level) => {
141 if level == DegradationLevel::Emergency {
142 Err(Status::resource_exhausted(
143 "Server resources critically low. Generation not possible.",
144 ))
145 } else if level == DegradationLevel::Minimal {
146 warn!("Resources constrained, generation may be limited");
147 Ok(level)
148 } else {
149 Ok(level)
150 }
151 }
152 Err(e) => Err(Status::resource_exhausted(format!(
153 "Resource check failed: {e}"
154 ))),
155 }
156 }
157
158 pub fn resource_status(&self) -> ResourceStatus {
160 let stats = self.resource_guard.stats();
161 ResourceStatus {
162 memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
163 memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
164 disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
165 degradation_level: stats.degradation_level.name().to_string(),
166 active_generations: self.active_streams.load(Ordering::Relaxed),
167 }
168 }
169}
170
171#[derive(Debug, Clone)]
173pub struct ResourceStatus {
174 pub memory_usage_mb: u64,
175 pub memory_peak_mb: u64,
176 pub disk_available_mb: u64,
177 pub degradation_level: String,
178 pub active_generations: u64,
179}
180
181pub struct SynthService {
183 pub state: Arc<ServerState>,
184}
185
186impl SynthService {
187 pub fn new(config: GeneratorConfig) -> Self {
188 Self {
189 state: Arc::new(ServerState::new(config)),
190 }
191 }
192
193 pub fn with_state(state: Arc<ServerState>) -> Self {
194 Self { state }
195 }
196
197 async fn proto_to_config(
199 &self,
200 proto: Option<GenerationConfig>,
201 ) -> Result<GeneratorConfig, Status> {
202 match proto {
203 Some(p) => {
204 let industry = match p.industry.to_lowercase().as_str() {
205 "manufacturing" => IndustrySector::Manufacturing,
206 "retail" => IndustrySector::Retail,
207 "financial_services" | "financial" => IndustrySector::FinancialServices,
208 "healthcare" => IndustrySector::Healthcare,
209 "technology" => IndustrySector::Technology,
210 "" => IndustrySector::Manufacturing, other => {
212 return Err(Status::invalid_argument(format!(
213 "Unknown industry '{other}'. Valid values: manufacturing, retail, financial_services, healthcare, technology"
214 )));
215 }
216 };
217
218 let complexity = match p.coa_complexity.to_lowercase().as_str() {
219 "small" => CoAComplexity::Small,
220 "medium" => CoAComplexity::Medium,
221 "large" => CoAComplexity::Large,
222 "" => CoAComplexity::Small, other => {
224 return Err(Status::invalid_argument(format!(
225 "Unknown coa_complexity '{other}'. Valid values: small, medium, large"
226 )));
227 }
228 };
229
230 let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
231 vec![CompanyConfig {
232 code: "1000".to_string(),
233 name: "Default Company".to_string(),
234 currency: "USD".to_string(),
235 country: "US".to_string(),
236 annual_transaction_volume: TransactionVolume::TenK,
237 volume_weight: 1.0,
238 fiscal_year_variant: "K4".to_string(),
239 }]
240 } else {
241 p.companies
242 .into_iter()
243 .map(|c| CompanyConfig {
244 code: c.code,
245 name: c.name,
246 currency: c.currency,
247 country: c.country,
248 annual_transaction_volume: TransactionVolume::Custom(
249 c.annual_transaction_volume,
250 ),
251 volume_weight: c.volume_weight as f64,
252 fiscal_year_variant: "K4".to_string(),
253 })
254 .collect()
255 };
256
257 let mut config = GeneratorConfig {
258 global: GlobalConfig {
259 seed: if p.seed > 0 { Some(p.seed) } else { None },
260 industry,
261 start_date: if p.start_date.is_empty() {
262 "2024-01-01".to_string()
263 } else {
264 p.start_date
265 },
266 period_months: if p.period_months == 0 {
267 12
268 } else {
269 p.period_months
270 },
271 group_currency: "USD".to_string(),
272 parallel: true,
273 worker_threads: 0,
274 memory_limit_mb: 0,
275 fiscal_year_months: None,
276 },
277 companies,
278 chart_of_accounts: ChartOfAccountsConfig {
279 complexity,
280 industry_specific: true,
281 custom_accounts: None,
282 min_hierarchy_depth: 2,
283 max_hierarchy_depth: 5,
284 },
285 ..default_generator_config()
286 };
287
288 if p.fraud_enabled {
290 config.fraud.enabled = true;
291 config.fraud.fraud_rate = p.fraud_rate as f64;
292 }
293
294 Ok(config)
295 }
296 None => {
297 let config = self.state.config.read().await;
299 Ok(config.clone())
300 }
301 }
302 }
303
304 fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
306 JournalEntryProto {
307 document_id: entry.header.document_id.to_string(),
308 company_code: entry.header.company_code.clone(),
309 fiscal_year: entry.header.fiscal_year as u32,
310 fiscal_period: entry.header.fiscal_period as u32,
311 posting_date: entry.header.posting_date.to_string(),
312 document_date: entry.header.document_date.to_string(),
313 created_at: entry.header.created_at.to_rfc3339(),
314 source: format!("{:?}", entry.header.source),
315 business_process: entry.header.business_process.map(|bp| format!("{bp:?}")),
316 lines: entry
317 .lines
318 .iter()
319 .map(|line| {
320 let amount = if line.is_debit() {
321 line.debit_amount
322 } else {
323 line.credit_amount
324 };
325 JournalLineProto {
326 line_number: line.line_number,
327 account_number: line.gl_account.clone(),
328 account_name: line.account_description.clone().unwrap_or_default(),
329 amount: amount.to_string(),
330 is_debit: line.is_debit(),
331 cost_center: line.cost_center.clone(),
332 profit_center: line.profit_center.clone(),
333 vendor_id: line.auxiliary_account_number.clone(),
336 customer_id: None,
337 material_id: None,
338 text: line.line_text.clone().or_else(|| line.text.clone()),
339 }
340 })
341 .collect(),
342 is_anomaly: entry.header.is_fraud,
343 anomaly_type: entry.header.fraud_type.map(|ft| format!("{ft:?}")),
344 }
345 }
346
347 fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
349 GenerationConfig {
350 industry: format!("{:?}", config.global.industry),
351 start_date: config.global.start_date.clone(),
352 period_months: config.global.period_months,
353 seed: config.global.seed.unwrap_or(0),
354 coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
355 companies: config
356 .companies
357 .iter()
358 .map(|c| CompanyConfigProto {
359 code: c.code.clone(),
360 name: c.name.clone(),
361 currency: c.currency.clone(),
362 country: c.country.clone(),
363 annual_transaction_volume: c.annual_transaction_volume.count(),
364 volume_weight: c.volume_weight as f32,
365 })
366 .collect(),
367 fraud_enabled: config.fraud.enabled,
368 fraud_rate: config.fraud.fraud_rate as f32,
369 generate_master_data: config.master_data.vendors.count > 0
370 || config.master_data.customers.count > 0
371 || config.master_data.materials.count > 0,
372 generate_document_flows: config.document_flows.p2p.enabled
373 || config.document_flows.o2c.enabled,
374 }
375 }
376}
377
378#[tonic::async_trait]
379impl synthetic_data_service_server::SyntheticDataService for SynthService {
380 async fn bulk_generate(
382 &self,
383 request: Request<BulkGenerateRequest>,
384 ) -> Result<Response<BulkGenerateResponse>, Status> {
385 let req = request.into_inner();
386
387 const MAX_ENTRY_COUNT: u64 = 1_000_000;
389 if req.entry_count > MAX_ENTRY_COUNT {
390 return Err(Status::invalid_argument(format!(
391 "entry_count ({}) exceeds maximum allowed value ({})",
392 req.entry_count, MAX_ENTRY_COUNT
393 )));
394 }
395
396 let degradation_level = self.state.check_resources()?;
398 if degradation_level != DegradationLevel::Normal {
399 warn!(
400 "Starting bulk generation under resource pressure (level: {:?})",
401 degradation_level
402 );
403 }
404
405 info!("Bulk generate request: {} entries", req.entry_count);
406
407 let config = self.proto_to_config(req.config).await?;
408 let start_time = Instant::now();
409
410 let phase_config = PhaseConfig {
412 generate_master_data: req.include_master_data,
413 generate_document_flows: false,
414 generate_journal_entries: true,
415 inject_anomalies: req.inject_anomalies,
416 show_progress: false,
417 ..Default::default()
418 };
419
420 let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
421 .map_err(|e| Status::internal(format!("Failed to create orchestrator: {e}")))?;
422
423 let result = orchestrator
424 .generate()
425 .map_err(|e| Status::internal(format!("Generation failed: {e}")))?;
426
427 let duration_ms = start_time.elapsed().as_millis() as u64;
428
429 let entries_count = result.journal_entries.len() as u64;
431 self.state
432 .total_entries
433 .fetch_add(entries_count, Ordering::Relaxed);
434
435 let anomaly_count = result.anomaly_labels.labels.len() as u64;
436 self.state
437 .total_anomalies
438 .fetch_add(anomaly_count, Ordering::Relaxed);
439
440 let journal_entries: Vec<JournalEntryProto> = result
442 .journal_entries
443 .iter()
444 .map(Self::journal_entry_to_proto)
445 .collect();
446
447 let anomaly_labels: Vec<AnomalyLabelProto> = result
448 .anomaly_labels
449 .labels
450 .iter()
451 .map(|a| AnomalyLabelProto {
452 anomaly_id: a.anomaly_id.clone(),
453 document_id: a.document_id.clone(),
454 anomaly_type: format!("{:?}", a.anomaly_type),
455 anomaly_category: a.document_type.clone(),
456 description: a.description.clone(),
457 severity_score: a.severity as f32,
458 })
459 .collect();
460
461 let mut total_debit = rust_decimal::Decimal::ZERO;
463 let mut total_credit = rust_decimal::Decimal::ZERO;
464 let mut total_lines = 0u64;
465 let mut entries_by_company = std::collections::HashMap::new();
466 let mut entries_by_source = std::collections::HashMap::new();
467
468 for entry in &result.journal_entries {
469 *entries_by_company
470 .entry(entry.header.company_code.clone())
471 .or_insert(0u64) += 1;
472 *entries_by_source
473 .entry(format!("{:?}", entry.header.source))
474 .or_insert(0u64) += 1;
475
476 for line in &entry.lines {
477 total_lines += 1;
478 total_debit += line.debit_amount;
479 total_credit += line.credit_amount;
480 }
481 }
482
483 let stats = GenerationStats {
484 total_entries: entries_count,
485 total_lines,
486 total_debit_amount: total_debit.to_string(),
487 total_credit_amount: total_credit.to_string(),
488 anomaly_count,
489 entries_by_company,
490 entries_by_source,
491 };
492
493 info!(
494 "Bulk generation complete: {} entries in {}ms",
495 entries_count, duration_ms
496 );
497
498 Ok(Response::new(BulkGenerateResponse {
499 entries_generated: entries_count,
500 duration_ms,
501 journal_entries,
502 anomaly_labels,
503 stats: Some(stats),
504 }))
505 }
506
507 type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
508
509 async fn stream_data(
511 &self,
512 request: Request<StreamDataRequest>,
513 ) -> Result<Response<Self::StreamDataStream>, Status> {
514 let req = request.into_inner();
515
516 const MIN_EVENTS_PER_SECOND: u32 = 1;
518 const MAX_EVENTS_PER_SECOND: u32 = 10_000;
519 if req.events_per_second < MIN_EVENTS_PER_SECOND {
520 return Err(Status::invalid_argument(format!(
521 "events_per_second ({}) must be at least {}",
522 req.events_per_second, MIN_EVENTS_PER_SECOND
523 )));
524 }
525 if req.events_per_second > MAX_EVENTS_PER_SECOND {
526 return Err(Status::invalid_argument(format!(
527 "events_per_second ({}) exceeds maximum allowed value ({})",
528 req.events_per_second, MAX_EVENTS_PER_SECOND
529 )));
530 }
531
532 const MAX_STREAM_EVENTS: u64 = 10_000_000;
534 if req.max_events > MAX_STREAM_EVENTS {
535 return Err(Status::invalid_argument(format!(
536 "max_events ({}) exceeds maximum allowed value ({})",
537 req.max_events, MAX_STREAM_EVENTS
538 )));
539 }
540
541 let degradation_level = self.state.check_resources()?;
543 if degradation_level != DegradationLevel::Normal {
544 warn!(
545 "Starting stream under resource pressure (level: {:?})",
546 degradation_level
547 );
548 }
549
550 info!(
551 "Stream data request: {} events/sec, max {}",
552 req.events_per_second, req.max_events
553 );
554
555 let config = self.proto_to_config(req.config).await?;
556 let state = self.state.clone();
557
558 state.active_streams.fetch_add(1, Ordering::Relaxed);
560
561 state.stream_paused.store(false, Ordering::Relaxed);
563 state.stream_stopped.store(false, Ordering::Relaxed);
564
565 let (tx, rx) = mpsc::channel(100);
566
567 let events_per_second = req.events_per_second;
569 let max_events = req.max_events;
570 let inject_anomalies = req.inject_anomalies;
571
572 tokio::spawn(async move {
573 let phase_config = PhaseConfig {
574 generate_master_data: false,
575 generate_document_flows: false,
576 generate_journal_entries: true,
577 inject_anomalies,
578 show_progress: false,
579 ..Default::default()
580 };
581
582 let mut sequence = 0u64;
583 let delay = if events_per_second > 0 {
584 Duration::from_micros(1_000_000 / events_per_second as u64)
585 } else {
586 Duration::from_millis(1)
587 };
588
589 let mut orchestrator =
591 match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
592 Ok(o) => o,
593 Err(e) => {
594 error!("Failed to create orchestrator: {}", e);
595 return;
596 }
597 };
598
599 loop {
600 if state.stream_stopped.load(Ordering::Relaxed) {
602 info!("Stream stopped by control command");
603 break;
604 }
605
606 while state.stream_paused.load(Ordering::Relaxed) {
608 tokio::time::sleep(Duration::from_millis(100)).await;
609 if state.stream_stopped.load(Ordering::Relaxed) {
610 break;
611 }
612 }
613
614 if max_events > 0 && sequence >= max_events {
616 info!("Stream reached max events: {}", max_events);
617 break;
618 }
619
620 let result = match orchestrator.generate() {
622 Ok(r) => r,
623 Err(e) => {
624 error!("Generation failed: {}", e);
625 break;
626 }
627 };
628
629 for entry in result.journal_entries {
631 sequence += 1;
632 state.total_stream_events.fetch_add(1, Ordering::Relaxed);
633 state.total_entries.fetch_add(1, Ordering::Relaxed);
634
635 let timestamp = Timestamp {
636 seconds: Utc::now().timestamp(),
637 nanos: 0,
638 };
639
640 let event = DataEvent {
641 sequence,
642 timestamp: Some(timestamp),
643 event: Some(data_event::Event::JournalEntry(
644 SynthService::journal_entry_to_proto(&entry),
645 )),
646 };
647
648 if tx.send(Ok(event)).await.is_err() {
649 info!("Stream receiver dropped");
650 break;
651 }
652
653 tokio::time::sleep(delay).await;
655
656 if max_events > 0 && sequence >= max_events {
658 break;
659 }
660 }
661 }
662
663 state.active_streams.fetch_sub(1, Ordering::Relaxed);
665 });
666
667 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
668 }
669
670 async fn control(
672 &self,
673 request: Request<ControlCommand>,
674 ) -> Result<Response<ControlResponse>, Status> {
675 let cmd = request.into_inner();
676 let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
677
678 info!("Control command: {:?}", action);
679
680 let (success, message, status) = match action {
681 ControlAction::Pause => {
682 self.state.stream_paused.store(true, Ordering::Relaxed);
683 (true, "Stream paused".to_string(), StreamStatus::Paused)
684 }
685 ControlAction::Resume => {
686 self.state.stream_paused.store(false, Ordering::Relaxed);
687 (true, "Stream resumed".to_string(), StreamStatus::Running)
688 }
689 ControlAction::Stop => {
690 self.state.stream_stopped.store(true, Ordering::Relaxed);
691 (true, "Stream stopped".to_string(), StreamStatus::Stopped)
692 }
693 ControlAction::TriggerPattern => {
694 let pattern = cmd.pattern_name.unwrap_or_default();
695 if pattern.is_empty() {
696 (
697 false,
698 "Pattern name is required for TriggerPattern action".to_string(),
699 StreamStatus::Running,
700 )
701 } else {
702 let valid_patterns = [
705 "year_end_spike",
706 "period_end_spike",
707 "holiday_cluster",
708 "fraud_cluster",
709 "error_cluster",
710 "uniform",
711 ];
712 let is_valid = valid_patterns.contains(&pattern.as_str())
713 || pattern.starts_with("custom:");
714
715 if is_valid {
716 if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
718 *triggered = Some(pattern.clone());
719 }
720 info!("Pattern trigger activated: {}", pattern);
721 (
722 true,
723 format!("Pattern '{pattern}' will be applied to upcoming entries"),
724 StreamStatus::Running,
725 )
726 } else {
727 (
728 false,
729 format!(
730 "Unknown pattern '{pattern}'. Valid patterns: {valid_patterns:?}"
731 ),
732 StreamStatus::Running,
733 )
734 }
735 }
736 }
737 ControlAction::Unspecified => (
738 false,
739 "Unknown control action".to_string(),
740 StreamStatus::Unspecified,
741 ),
742 };
743
744 Ok(Response::new(ControlResponse {
745 success,
746 message,
747 current_status: status as i32,
748 }))
749 }
750
751 async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
753 let config = self.state.config.read().await;
754 let proto_config = Self::config_to_proto(&config);
755
756 Ok(Response::new(ConfigResponse {
757 success: true,
758 message: "Current configuration retrieved".to_string(),
759 current_config: Some(proto_config),
760 }))
761 }
762
763 async fn set_config(
765 &self,
766 request: Request<ConfigRequest>,
767 ) -> Result<Response<ConfigResponse>, Status> {
768 let req = request.into_inner();
769
770 if let Some(proto_config) = req.config {
771 let new_config = self.proto_to_config(Some(proto_config)).await?;
772
773 let mut config = self.state.config.write().await;
774 *config = new_config.clone();
775
776 info!("Configuration updated");
777
778 Ok(Response::new(ConfigResponse {
779 success: true,
780 message: "Configuration updated".to_string(),
781 current_config: Some(Self::config_to_proto(&new_config)),
782 }))
783 } else {
784 Err(Status::invalid_argument("No configuration provided"))
785 }
786 }
787
788 async fn get_metrics(
790 &self,
791 _request: Request<()>,
792 ) -> Result<Response<MetricsResponse>, Status> {
793 let uptime = self.state.uptime_seconds();
794 let total_entries = self.state.total_entries.load(Ordering::Relaxed);
795
796 let entries_per_second = if uptime > 0 {
797 total_entries as f64 / uptime as f64
798 } else {
799 0.0
800 };
801
802 Ok(Response::new(MetricsResponse {
803 total_entries_generated: total_entries,
804 total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
805 uptime_seconds: uptime,
806 session_entries: total_entries,
807 session_entries_per_second: entries_per_second,
808 active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
809 total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
810 }))
811 }
812
813 async fn health_check(
815 &self,
816 _request: Request<()>,
817 ) -> Result<Response<HealthResponse>, Status> {
818 Ok(Response::new(HealthResponse {
819 healthy: true,
820 version: env!("CARGO_PKG_VERSION").to_string(),
821 uptime_seconds: self.state.uptime_seconds(),
822 }))
823 }
824}
825
826pub fn default_generator_config() -> GeneratorConfig {
828 GeneratorConfig {
829 global: GlobalConfig {
830 seed: None,
831 industry: IndustrySector::Manufacturing,
832 start_date: "2024-01-01".to_string(),
833 period_months: 12,
834 group_currency: "USD".to_string(),
835 parallel: true,
836 worker_threads: 0,
837 memory_limit_mb: 0,
838 fiscal_year_months: None,
839 },
840 companies: vec![CompanyConfig {
841 code: "1000".to_string(),
842 name: "Default Company".to_string(),
843 currency: "USD".to_string(),
844 country: "US".to_string(),
845 annual_transaction_volume: TransactionVolume::TenK,
846 volume_weight: 1.0,
847 fiscal_year_variant: "K4".to_string(),
848 }],
849 chart_of_accounts: ChartOfAccountsConfig {
850 complexity: CoAComplexity::Small,
851 industry_specific: true,
852 custom_accounts: None,
853 min_hierarchy_depth: 2,
854 max_hierarchy_depth: 5,
855 },
856 transactions: Default::default(),
857 output: OutputConfig::default(),
858 fraud: Default::default(),
859 internal_controls: Default::default(),
860 business_processes: Default::default(),
861 user_personas: Default::default(),
862 templates: Default::default(),
863 approval: Default::default(),
864 departments: Default::default(),
865 master_data: Default::default(),
866 document_flows: Default::default(),
867 intercompany: Default::default(),
868 balance: Default::default(),
869 ocpm: Default::default(),
870 audit: Default::default(),
871 banking: Default::default(),
872 data_quality: Default::default(),
873 scenario: Default::default(),
874 temporal: Default::default(),
875 graph_export: Default::default(),
876 streaming: Default::default(),
877 rate_limit: Default::default(),
878 temporal_attributes: Default::default(),
879 relationships: Default::default(),
880 accounting_standards: Default::default(),
881 audit_standards: Default::default(),
882 distributions: Default::default(),
883 temporal_patterns: Default::default(),
884 vendor_network: Default::default(),
885 customer_segmentation: Default::default(),
886 relationship_strength: Default::default(),
887 cross_process_links: Default::default(),
888 organizational_events: Default::default(),
889 behavioral_drift: Default::default(),
890 market_drift: Default::default(),
891 drift_labeling: Default::default(),
892 anomaly_injection: Default::default(),
893 industry_specific: Default::default(),
894 fingerprint_privacy: Default::default(),
895 quality_gates: Default::default(),
896 compliance: Default::default(),
897 webhooks: Default::default(),
898 llm: Default::default(),
899 diffusion: Default::default(),
900 causal: Default::default(),
901 source_to_pay: Default::default(),
902 financial_reporting: Default::default(),
903 hr: Default::default(),
904 manufacturing: Default::default(),
905 sales_quotes: Default::default(),
906 tax: Default::default(),
907 treasury: Default::default(),
908 project_accounting: Default::default(),
909 esg: Default::default(),
910 country_packs: None,
911 scenarios: Default::default(),
912 session: Default::default(),
913 compliance_regulations: Default::default(),
914 }
915}
916
917#[cfg(test)]
918#[allow(clippy::unwrap_used)]
919mod tests {
920 use super::*;
921 use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;
922
923 #[tokio::test]
928 async fn test_service_creation() {
929 let config = default_generator_config();
930 let service = SynthService::new(config);
931 assert!(service.state.uptime_seconds() < 60);
933 }
934
935 #[tokio::test]
936 async fn test_service_with_state() {
937 let config = default_generator_config();
938 let state = Arc::new(ServerState::new(config));
939 let service = SynthService::with_state(Arc::clone(&state));
940
941 state.total_entries.store(100, Ordering::Relaxed);
943 assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
944 }
945
946 #[tokio::test]
951 async fn test_health_check() {
952 let config = default_generator_config();
953 let service = SynthService::new(config);
954
955 let response = service.health_check(Request::new(())).await.unwrap();
956 let health = response.into_inner();
957
958 assert!(health.healthy);
959 assert!(!health.version.is_empty());
960 }
961
962 #[tokio::test]
963 async fn test_health_check_returns_version() {
964 let config = default_generator_config();
965 let service = SynthService::new(config);
966
967 let response = service.health_check(Request::new(())).await.unwrap();
968 let health = response.into_inner();
969
970 assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
971 }
972
973 #[tokio::test]
978 async fn test_get_config() {
979 let config = default_generator_config();
980 let service = SynthService::new(config);
981
982 let response = service.get_config(Request::new(())).await.unwrap();
983 let config_response = response.into_inner();
984
985 assert!(config_response.success);
986 assert!(config_response.current_config.is_some());
987 }
988
989 #[tokio::test]
990 async fn test_get_config_returns_industry() {
991 let config = default_generator_config();
992 let service = SynthService::new(config);
993
994 let response = service.get_config(Request::new(())).await.unwrap();
995 let config_response = response.into_inner();
996 let current = config_response.current_config.unwrap();
997
998 assert_eq!(current.industry, "Manufacturing");
999 }
1000
1001 #[tokio::test]
1002 async fn test_set_config() {
1003 let config = default_generator_config();
1004 let service = SynthService::new(config);
1005
1006 let new_config = GenerationConfig {
1007 industry: "retail".to_string(),
1008 start_date: "2024-06-01".to_string(),
1009 period_months: 6,
1010 seed: 42,
1011 coa_complexity: "medium".to_string(),
1012 companies: vec![],
1013 fraud_enabled: true,
1014 fraud_rate: 0.05,
1015 generate_master_data: false,
1016 generate_document_flows: false,
1017 };
1018
1019 let response = service
1020 .set_config(Request::new(ConfigRequest {
1021 config: Some(new_config),
1022 }))
1023 .await
1024 .unwrap();
1025 let config_response = response.into_inner();
1026
1027 assert!(config_response.success);
1028 }
1029
1030 #[tokio::test]
1031 async fn test_set_config_without_config_fails() {
1032 let config = default_generator_config();
1033 let service = SynthService::new(config);
1034
1035 let result = service
1036 .set_config(Request::new(ConfigRequest { config: None }))
1037 .await;
1038
1039 assert!(result.is_err());
1040 }
1041
1042 #[tokio::test]
1047 async fn test_get_metrics_initial() {
1048 let config = default_generator_config();
1049 let service = SynthService::new(config);
1050
1051 let response = service.get_metrics(Request::new(())).await.unwrap();
1052 let metrics = response.into_inner();
1053
1054 assert_eq!(metrics.total_entries_generated, 0);
1055 assert_eq!(metrics.total_anomalies_injected, 0);
1056 assert_eq!(metrics.active_streams, 0);
1057 }
1058
1059 #[tokio::test]
1060 async fn test_get_metrics_after_updates() {
1061 let config = default_generator_config();
1062 let service = SynthService::new(config);
1063
1064 service.state.total_entries.store(1000, Ordering::Relaxed);
1066 service.state.total_anomalies.store(20, Ordering::Relaxed);
1067 service.state.active_streams.store(2, Ordering::Relaxed);
1068
1069 let response = service.get_metrics(Request::new(())).await.unwrap();
1070 let metrics = response.into_inner();
1071
1072 assert_eq!(metrics.total_entries_generated, 1000);
1073 assert_eq!(metrics.total_anomalies_injected, 20);
1074 assert_eq!(metrics.active_streams, 2);
1075 }
1076
1077 #[tokio::test]
1082 async fn test_control_pause() {
1083 let config = default_generator_config();
1084 let service = SynthService::new(config);
1085
1086 let response = service
1087 .control(Request::new(ControlCommand {
1088 action: ControlAction::Pause as i32,
1089 pattern_name: None,
1090 }))
1091 .await
1092 .unwrap();
1093 let control_response = response.into_inner();
1094
1095 assert!(control_response.success);
1096 assert!(service.state.stream_paused.load(Ordering::Relaxed));
1097 }
1098
1099 #[tokio::test]
1100 async fn test_control_resume() {
1101 let config = default_generator_config();
1102 let service = SynthService::new(config);
1103
1104 service.state.stream_paused.store(true, Ordering::Relaxed);
1106
1107 let response = service
1108 .control(Request::new(ControlCommand {
1109 action: ControlAction::Resume as i32,
1110 pattern_name: None,
1111 }))
1112 .await
1113 .unwrap();
1114 let control_response = response.into_inner();
1115
1116 assert!(control_response.success);
1117 assert!(!service.state.stream_paused.load(Ordering::Relaxed));
1118 }
1119
1120 #[tokio::test]
1121 async fn test_control_stop() {
1122 let config = default_generator_config();
1123 let service = SynthService::new(config);
1124
1125 let response = service
1126 .control(Request::new(ControlCommand {
1127 action: ControlAction::Stop as i32,
1128 pattern_name: None,
1129 }))
1130 .await
1131 .unwrap();
1132 let control_response = response.into_inner();
1133
1134 assert!(control_response.success);
1135 assert!(service.state.stream_stopped.load(Ordering::Relaxed));
1136 }
1137
1138 #[test]
1143 fn test_server_state_creation() {
1144 let config = default_generator_config();
1145 let state = ServerState::new(config);
1146
1147 assert_eq!(state.total_entries.load(Ordering::Relaxed), 0);
1148 assert_eq!(state.total_anomalies.load(Ordering::Relaxed), 0);
1149 assert_eq!(state.active_streams.load(Ordering::Relaxed), 0);
1150 assert!(!state.stream_paused.load(Ordering::Relaxed));
1151 assert!(!state.stream_stopped.load(Ordering::Relaxed));
1152 }
1153
1154 #[test]
1155 fn test_server_state_uptime() {
1156 let config = default_generator_config();
1157 let state = ServerState::new(config);
1158
1159 assert!(state.uptime_seconds() < 60);
1161 }
1162
1163 #[test]
1168 fn test_default_generator_config() {
1169 let config = default_generator_config();
1170
1171 assert_eq!(config.global.industry, IndustrySector::Manufacturing);
1172 assert_eq!(config.global.period_months, 12);
1173 assert!(!config.companies.is_empty());
1174 assert_eq!(config.companies[0].code, "1000");
1175 }
1176
1177 #[test]
1178 fn test_config_to_proto() {
1179 let config = default_generator_config();
1180 let proto = SynthService::config_to_proto(&config);
1181
1182 assert_eq!(proto.industry, "Manufacturing");
1183 assert_eq!(proto.period_months, 12);
1184 assert!(!proto.companies.is_empty());
1185 }
1186
1187 #[tokio::test]
1188 async fn test_proto_to_config_with_none() {
1189 let config = default_generator_config();
1190 let service = SynthService::new(config.clone());
1191
1192 let result = service.proto_to_config(None).await.unwrap();
1193
1194 assert_eq!(result.global.industry, config.global.industry);
1196 }
1197
1198 #[tokio::test]
1199 async fn test_proto_to_config_with_retail() {
1200 let config = default_generator_config();
1201 let service = SynthService::new(config);
1202
1203 let proto = GenerationConfig {
1204 industry: "retail".to_string(),
1205 start_date: "2024-01-01".to_string(),
1206 period_months: 6,
1207 seed: 0,
1208 coa_complexity: "large".to_string(),
1209 companies: vec![],
1210 fraud_enabled: false,
1211 fraud_rate: 0.0,
1212 generate_master_data: false,
1213 generate_document_flows: false,
1214 };
1215
1216 let result = service.proto_to_config(Some(proto)).await.unwrap();
1217
1218 assert_eq!(result.global.industry, IndustrySector::Retail);
1219 assert_eq!(result.chart_of_accounts.complexity, CoAComplexity::Large);
1220 }
1221
1222 #[tokio::test]
1223 async fn test_proto_to_config_with_healthcare() {
1224 let config = default_generator_config();
1225 let service = SynthService::new(config);
1226
1227 let proto = GenerationConfig {
1228 industry: "healthcare".to_string(),
1229 start_date: "2024-01-01".to_string(),
1230 period_months: 12,
1231 seed: 42,
1232 coa_complexity: "small".to_string(),
1233 companies: vec![],
1234 fraud_enabled: true,
1235 fraud_rate: 0.1,
1236 generate_master_data: true,
1237 generate_document_flows: true,
1238 };
1239
1240 let result = service.proto_to_config(Some(proto)).await.unwrap();
1241
1242 assert_eq!(result.global.industry, IndustrySector::Healthcare);
1243 assert_eq!(result.global.seed, Some(42));
1244 assert!(result.fraud.enabled);
1245 assert!((result.fraud.fraud_rate - 0.1).abs() < 0.001);
1246 }
1247
1248 #[tokio::test]
1249 async fn test_proto_to_config_with_companies() {
1250 let config = default_generator_config();
1251 let service = SynthService::new(config);
1252
1253 let proto = GenerationConfig {
1254 industry: "technology".to_string(),
1255 start_date: "2024-01-01".to_string(),
1256 period_months: 12,
1257 seed: 0,
1258 coa_complexity: "medium".to_string(),
1259 companies: vec![
1260 CompanyConfigProto {
1261 code: "1000".to_string(),
1262 name: "Parent Corp".to_string(),
1263 currency: "USD".to_string(),
1264 country: "US".to_string(),
1265 annual_transaction_volume: 100000,
1266 volume_weight: 1.0,
1267 },
1268 CompanyConfigProto {
1269 code: "2000".to_string(),
1270 name: "EU Sub".to_string(),
1271 currency: "EUR".to_string(),
1272 country: "DE".to_string(),
1273 annual_transaction_volume: 50000,
1274 volume_weight: 0.5,
1275 },
1276 ],
1277 fraud_enabled: false,
1278 fraud_rate: 0.0,
1279 generate_master_data: false,
1280 generate_document_flows: false,
1281 };
1282
1283 let result = service.proto_to_config(Some(proto)).await.unwrap();
1284
1285 assert_eq!(result.companies.len(), 2);
1286 assert_eq!(result.companies[0].code, "1000");
1287 assert_eq!(result.companies[1].currency, "EUR");
1288 }
1289
1290 #[tokio::test]
1295 async fn test_bulk_generate_entry_count_validation() {
1296 let config = default_generator_config();
1297 let service = SynthService::new(config);
1298
1299 let request = BulkGenerateRequest {
1300 entry_count: 2_000_000, include_master_data: false,
1302 inject_anomalies: false,
1303 output_format: 0,
1304 config: None,
1305 };
1306
1307 let result = service.bulk_generate(Request::new(request)).await;
1308 assert!(result.is_err());
1309 let err = result.err().unwrap();
1310 assert!(err.message().contains("exceeds maximum allowed value"));
1311 }
1312
1313 #[tokio::test]
1314 async fn test_stream_data_events_per_second_too_low() {
1315 let config = default_generator_config();
1316 let service = SynthService::new(config);
1317
1318 let request = StreamDataRequest {
1319 events_per_second: 0, max_events: 100,
1321 inject_anomalies: false,
1322 anomaly_rate: 0.0,
1323 config: None,
1324 };
1325
1326 let result = service.stream_data(Request::new(request)).await;
1327 assert!(result.is_err());
1328 let err = result.err().unwrap();
1329 assert!(err.message().contains("must be at least"));
1330 }
1331
1332 #[tokio::test]
1333 async fn test_stream_data_events_per_second_too_high() {
1334 let config = default_generator_config();
1335 let service = SynthService::new(config);
1336
1337 let request = StreamDataRequest {
1338 events_per_second: 20_000, max_events: 100,
1340 inject_anomalies: false,
1341 anomaly_rate: 0.0,
1342 config: None,
1343 };
1344
1345 let result = service.stream_data(Request::new(request)).await;
1346 assert!(result.is_err());
1347 let err = result.err().unwrap();
1348 assert!(err.message().contains("exceeds maximum allowed value"));
1349 }
1350
1351 #[tokio::test]
1352 async fn test_stream_data_max_events_too_high() {
1353 let config = default_generator_config();
1354 let service = SynthService::new(config);
1355
1356 let request = StreamDataRequest {
1357 events_per_second: 100,
1358 max_events: 100_000_000, inject_anomalies: false,
1360 anomaly_rate: 0.0,
1361 config: None,
1362 };
1363
1364 let result = service.stream_data(Request::new(request)).await;
1365 assert!(result.is_err());
1366 let err = result.err().unwrap();
1367 assert!(err.message().contains("max_events"));
1368 }
1369
1370 #[tokio::test]
1371 async fn test_stream_data_valid_request() {
1372 let config = default_generator_config();
1373 let service = SynthService::new(config);
1374
1375 let request = StreamDataRequest {
1376 events_per_second: 10,
1377 max_events: 5,
1378 inject_anomalies: false,
1379 anomaly_rate: 0.0,
1380 config: None,
1381 };
1382
1383 let result = service.stream_data(Request::new(request)).await;
1386 assert!(result.is_ok());
1387 }
1388}