1use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17 ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18 TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
26pub struct ServerState {
28 pub config: RwLock<GeneratorConfig>,
30 pub config_source: RwLock<crate::config_loader::ConfigSource>,
32 start_time: Instant,
34 pub total_entries: AtomicU64,
36 pub total_anomalies: AtomicU64,
38 pub active_streams: AtomicU64,
40 pub total_stream_events: AtomicU64,
42 pub stream_paused: AtomicBool,
44 pub stream_stopped: AtomicBool,
46 pub stream_events_per_second: AtomicU64,
48 pub stream_max_events: AtomicU64,
50 pub stream_inject_anomalies: AtomicBool,
52 pub triggered_pattern: RwLock<Option<String>>,
54 pub resource_guard: Arc<ResourceGuard>,
56 max_concurrent_generations: AtomicU64,
58}
59
60impl ServerState {
61 pub fn new(config: GeneratorConfig) -> Self {
62 let memory_limit = config.global.memory_limit_mb;
64 let resource_guard = if memory_limit > 0 {
65 ResourceGuardBuilder::new()
66 .memory_limit(memory_limit)
67 .conservative()
68 .build()
69 } else {
70 ResourceGuardBuilder::new()
72 .memory_limit(2048)
73 .conservative()
74 .build()
75 };
76
77 Self {
78 config: RwLock::new(config),
79 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
80 start_time: Instant::now(),
81 total_entries: AtomicU64::new(0),
82 total_anomalies: AtomicU64::new(0),
83 active_streams: AtomicU64::new(0),
84 total_stream_events: AtomicU64::new(0),
85 stream_paused: AtomicBool::new(false),
86 stream_stopped: AtomicBool::new(false),
87 stream_events_per_second: AtomicU64::new(0),
88 stream_max_events: AtomicU64::new(0),
89 stream_inject_anomalies: AtomicBool::new(false),
90 triggered_pattern: RwLock::new(None),
91 resource_guard: Arc::new(resource_guard),
92 max_concurrent_generations: AtomicU64::new(4),
93 }
94 }
95
96 pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
98 let resource_guard = ResourceGuardBuilder::new()
99 .memory_limit(memory_limit_mb)
100 .conservative()
101 .build();
102
103 Self {
104 config: RwLock::new(config),
105 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
106 start_time: Instant::now(),
107 total_entries: AtomicU64::new(0),
108 total_anomalies: AtomicU64::new(0),
109 active_streams: AtomicU64::new(0),
110 total_stream_events: AtomicU64::new(0),
111 stream_paused: AtomicBool::new(false),
112 stream_stopped: AtomicBool::new(false),
113 stream_events_per_second: AtomicU64::new(0),
114 stream_max_events: AtomicU64::new(0),
115 stream_inject_anomalies: AtomicBool::new(false),
116 triggered_pattern: RwLock::new(None),
117 resource_guard: Arc::new(resource_guard),
118 max_concurrent_generations: AtomicU64::new(4),
119 }
120 }
121
122 pub fn uptime_seconds(&self) -> u64 {
123 self.start_time.elapsed().as_secs()
124 }
125
126 #[allow(clippy::result_large_err)] pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
129 let active = self.active_streams.load(Ordering::Relaxed);
131 let max = self.max_concurrent_generations.load(Ordering::Relaxed);
132 if active >= max {
133 return Err(Status::resource_exhausted(format!(
134 "Too many concurrent generations ({active}/{max}). Try again later."
135 )));
136 }
137
138 match self.resource_guard.check() {
140 Ok(level) => {
141 if level == DegradationLevel::Emergency {
142 Err(Status::resource_exhausted(
143 "Server resources critically low. Generation not possible.",
144 ))
145 } else if level == DegradationLevel::Minimal {
146 warn!("Resources constrained, generation may be limited");
147 Ok(level)
148 } else {
149 Ok(level)
150 }
151 }
152 Err(e) => Err(Status::resource_exhausted(format!(
153 "Resource check failed: {e}"
154 ))),
155 }
156 }
157
158 pub fn resource_status(&self) -> ResourceStatus {
160 let stats = self.resource_guard.stats();
161 ResourceStatus {
162 memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
163 memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
164 disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
165 degradation_level: stats.degradation_level.name().to_string(),
166 active_generations: self.active_streams.load(Ordering::Relaxed),
167 }
168 }
169}
170
171#[derive(Debug, Clone)]
173pub struct ResourceStatus {
174 pub memory_usage_mb: u64,
175 pub memory_peak_mb: u64,
176 pub disk_available_mb: u64,
177 pub degradation_level: String,
178 pub active_generations: u64,
179}
180
181pub struct SynthService {
183 pub state: Arc<ServerState>,
184}
185
186impl SynthService {
187 pub fn new(config: GeneratorConfig) -> Self {
188 Self {
189 state: Arc::new(ServerState::new(config)),
190 }
191 }
192
193 pub fn with_state(state: Arc<ServerState>) -> Self {
194 Self { state }
195 }
196
197 async fn proto_to_config(
199 &self,
200 proto: Option<GenerationConfig>,
201 ) -> Result<GeneratorConfig, Status> {
202 match proto {
203 Some(p) => {
204 let industry = match p.industry.to_lowercase().as_str() {
205 "manufacturing" => IndustrySector::Manufacturing,
206 "retail" => IndustrySector::Retail,
207 "financial_services" | "financial" => IndustrySector::FinancialServices,
208 "healthcare" => IndustrySector::Healthcare,
209 "technology" => IndustrySector::Technology,
210 "" => IndustrySector::Manufacturing, other => {
212 return Err(Status::invalid_argument(format!(
213 "Unknown industry '{other}'. Valid values: manufacturing, retail, financial_services, healthcare, technology"
214 )));
215 }
216 };
217
218 let complexity = match p.coa_complexity.to_lowercase().as_str() {
219 "small" => CoAComplexity::Small,
220 "medium" => CoAComplexity::Medium,
221 "large" => CoAComplexity::Large,
222 "" => CoAComplexity::Small, other => {
224 return Err(Status::invalid_argument(format!(
225 "Unknown coa_complexity '{other}'. Valid values: small, medium, large"
226 )));
227 }
228 };
229
230 let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
231 vec![CompanyConfig {
232 code: "1000".to_string(),
233 name: "Default Company".to_string(),
234 currency: "USD".to_string(),
235 country: "US".to_string(),
236 annual_transaction_volume: TransactionVolume::TenK,
237 volume_weight: 1.0,
238 fiscal_year_variant: "K4".to_string(),
239 }]
240 } else {
241 p.companies
242 .into_iter()
243 .map(|c| CompanyConfig {
244 code: c.code,
245 name: c.name,
246 currency: c.currency,
247 country: c.country,
248 annual_transaction_volume: TransactionVolume::Custom(
249 c.annual_transaction_volume,
250 ),
251 volume_weight: c.volume_weight as f64,
252 fiscal_year_variant: "K4".to_string(),
253 })
254 .collect()
255 };
256
257 let mut config = GeneratorConfig {
258 global: GlobalConfig {
259 seed: if p.seed > 0 { Some(p.seed) } else { None },
260 industry,
261 start_date: if p.start_date.is_empty() {
262 "2024-01-01".to_string()
263 } else {
264 p.start_date
265 },
266 period_months: if p.period_months == 0 {
267 12
268 } else {
269 p.period_months
270 },
271 group_currency: "USD".to_string(),
272 parallel: true,
273 worker_threads: 0,
274 memory_limit_mb: 0,
275 fiscal_year_months: None,
276 },
277 companies,
278 chart_of_accounts: ChartOfAccountsConfig {
279 complexity,
280 industry_specific: true,
281 custom_accounts: None,
282 min_hierarchy_depth: 2,
283 max_hierarchy_depth: 5,
284 },
285 ..default_generator_config()
286 };
287
288 if p.fraud_enabled {
290 config.fraud.enabled = true;
291 config.fraud.fraud_rate = p.fraud_rate as f64;
292 }
293
294 Ok(config)
295 }
296 None => {
297 let config = self.state.config.read().await;
299 Ok(config.clone())
300 }
301 }
302 }
303
304 fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
306 JournalEntryProto {
307 document_id: entry.header.document_id.to_string(),
308 company_code: entry.header.company_code.clone(),
309 fiscal_year: entry.header.fiscal_year as u32,
310 fiscal_period: entry.header.fiscal_period as u32,
311 posting_date: entry.header.posting_date.to_string(),
312 document_date: entry.header.document_date.to_string(),
313 created_at: entry.header.created_at.to_rfc3339(),
314 source: format!("{:?}", entry.header.source),
315 business_process: entry.header.business_process.map(|bp| format!("{bp:?}")),
316 lines: entry
317 .lines
318 .iter()
319 .map(|line| {
320 let amount = if line.is_debit() {
321 line.debit_amount
322 } else {
323 line.credit_amount
324 };
325 JournalLineProto {
326 line_number: line.line_number,
327 account_number: line.gl_account.clone(),
328 account_name: line.account_description.clone().unwrap_or_default(),
329 amount: amount.to_string(),
330 is_debit: line.is_debit(),
331 cost_center: line.cost_center.clone(),
332 profit_center: line.profit_center.clone(),
333 vendor_id: line.auxiliary_account_number.clone(),
336 customer_id: None,
337 material_id: None,
338 text: line.line_text.clone().or_else(|| line.text.clone()),
339 }
340 })
341 .collect(),
342 is_anomaly: entry.header.is_fraud,
343 anomaly_type: entry.header.fraud_type.map(|ft| format!("{ft:?}")),
344 }
345 }
346
347 fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
349 GenerationConfig {
350 industry: format!("{:?}", config.global.industry),
351 start_date: config.global.start_date.clone(),
352 period_months: config.global.period_months,
353 seed: config.global.seed.unwrap_or(0),
354 coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
355 companies: config
356 .companies
357 .iter()
358 .map(|c| CompanyConfigProto {
359 code: c.code.clone(),
360 name: c.name.clone(),
361 currency: c.currency.clone(),
362 country: c.country.clone(),
363 annual_transaction_volume: c.annual_transaction_volume.count(),
364 volume_weight: c.volume_weight as f32,
365 })
366 .collect(),
367 fraud_enabled: config.fraud.enabled,
368 fraud_rate: config.fraud.fraud_rate as f32,
369 generate_master_data: config.master_data.vendors.count > 0
370 || config.master_data.customers.count > 0
371 || config.master_data.materials.count > 0,
372 generate_document_flows: config.document_flows.p2p.enabled
373 || config.document_flows.o2c.enabled,
374 }
375 }
376}
377
378#[tonic::async_trait]
379impl synthetic_data_service_server::SyntheticDataService for SynthService {
380 async fn bulk_generate(
382 &self,
383 request: Request<BulkGenerateRequest>,
384 ) -> Result<Response<BulkGenerateResponse>, Status> {
385 let req = request.into_inner();
386
387 const MAX_ENTRY_COUNT: u64 = 1_000_000;
389 if req.entry_count > MAX_ENTRY_COUNT {
390 return Err(Status::invalid_argument(format!(
391 "entry_count ({}) exceeds maximum allowed value ({})",
392 req.entry_count, MAX_ENTRY_COUNT
393 )));
394 }
395
396 let degradation_level = self.state.check_resources()?;
398 if degradation_level != DegradationLevel::Normal {
399 warn!(
400 "Starting bulk generation under resource pressure (level: {:?})",
401 degradation_level
402 );
403 }
404
405 info!("Bulk generate request: {} entries", req.entry_count);
406
407 let config = self.proto_to_config(req.config).await?;
408 let start_time = Instant::now();
409
410 let phase_config = PhaseConfig {
412 generate_master_data: req.include_master_data,
413 generate_document_flows: false,
414 generate_journal_entries: true,
415 inject_anomalies: req.inject_anomalies,
416 show_progress: false,
417 ..Default::default()
418 };
419
420 let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
421 .map_err(|e| Status::internal(format!("Failed to create orchestrator: {e}")))?;
422
423 let result = orchestrator
424 .generate()
425 .map_err(|e| Status::internal(format!("Generation failed: {e}")))?;
426
427 let duration_ms = start_time.elapsed().as_millis() as u64;
428
429 let entries_count = result.journal_entries.len() as u64;
431 self.state
432 .total_entries
433 .fetch_add(entries_count, Ordering::Relaxed);
434
435 let anomaly_count = result.anomaly_labels.labels.len() as u64;
436 self.state
437 .total_anomalies
438 .fetch_add(anomaly_count, Ordering::Relaxed);
439
440 let journal_entries: Vec<JournalEntryProto> = result
442 .journal_entries
443 .iter()
444 .map(Self::journal_entry_to_proto)
445 .collect();
446
447 let anomaly_labels: Vec<AnomalyLabelProto> = result
448 .anomaly_labels
449 .labels
450 .iter()
451 .map(|a| AnomalyLabelProto {
452 anomaly_id: a.anomaly_id.clone(),
453 document_id: a.document_id.clone(),
454 anomaly_type: format!("{:?}", a.anomaly_type),
455 anomaly_category: a.document_type.clone(),
456 description: a.description.clone(),
457 severity_score: a.severity as f32,
458 })
459 .collect();
460
461 let mut total_debit = rust_decimal::Decimal::ZERO;
463 let mut total_credit = rust_decimal::Decimal::ZERO;
464 let mut total_lines = 0u64;
465 let mut entries_by_company = std::collections::HashMap::new();
466 let mut entries_by_source = std::collections::HashMap::new();
467
468 for entry in &result.journal_entries {
469 *entries_by_company
470 .entry(entry.header.company_code.clone())
471 .or_insert(0u64) += 1;
472 *entries_by_source
473 .entry(format!("{:?}", entry.header.source))
474 .or_insert(0u64) += 1;
475
476 for line in &entry.lines {
477 total_lines += 1;
478 total_debit += line.debit_amount;
479 total_credit += line.credit_amount;
480 }
481 }
482
483 let stats = GenerationStats {
484 total_entries: entries_count,
485 total_lines,
486 total_debit_amount: total_debit.to_string(),
487 total_credit_amount: total_credit.to_string(),
488 anomaly_count,
489 entries_by_company,
490 entries_by_source,
491 };
492
493 info!(
494 "Bulk generation complete: {} entries in {}ms",
495 entries_count, duration_ms
496 );
497
498 Ok(Response::new(BulkGenerateResponse {
499 entries_generated: entries_count,
500 duration_ms,
501 journal_entries,
502 anomaly_labels,
503 stats: Some(stats),
504 }))
505 }
506
507 type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
508
509 async fn stream_data(
511 &self,
512 request: Request<StreamDataRequest>,
513 ) -> Result<Response<Self::StreamDataStream>, Status> {
514 let req = request.into_inner();
515
516 const MIN_EVENTS_PER_SECOND: u32 = 1;
518 const MAX_EVENTS_PER_SECOND: u32 = 10_000;
519 if req.events_per_second < MIN_EVENTS_PER_SECOND {
520 return Err(Status::invalid_argument(format!(
521 "events_per_second ({}) must be at least {}",
522 req.events_per_second, MIN_EVENTS_PER_SECOND
523 )));
524 }
525 if req.events_per_second > MAX_EVENTS_PER_SECOND {
526 return Err(Status::invalid_argument(format!(
527 "events_per_second ({}) exceeds maximum allowed value ({})",
528 req.events_per_second, MAX_EVENTS_PER_SECOND
529 )));
530 }
531
532 const MAX_STREAM_EVENTS: u64 = 10_000_000;
534 if req.max_events > MAX_STREAM_EVENTS {
535 return Err(Status::invalid_argument(format!(
536 "max_events ({}) exceeds maximum allowed value ({})",
537 req.max_events, MAX_STREAM_EVENTS
538 )));
539 }
540
541 let degradation_level = self.state.check_resources()?;
543 if degradation_level != DegradationLevel::Normal {
544 warn!(
545 "Starting stream under resource pressure (level: {:?})",
546 degradation_level
547 );
548 }
549
550 info!(
551 "Stream data request: {} events/sec, max {}",
552 req.events_per_second, req.max_events
553 );
554
555 let config = self.proto_to_config(req.config).await?;
556 let state = self.state.clone();
557
558 state.active_streams.fetch_add(1, Ordering::Relaxed);
560
561 state.stream_paused.store(false, Ordering::Relaxed);
563 state.stream_stopped.store(false, Ordering::Relaxed);
564
565 let (tx, rx) = mpsc::channel(100);
566
567 let events_per_second = req.events_per_second;
569 let max_events = req.max_events;
570 let inject_anomalies = req.inject_anomalies;
571
572 tokio::spawn(async move {
573 let phase_config = PhaseConfig {
574 generate_master_data: false,
575 generate_document_flows: false,
576 generate_journal_entries: true,
577 inject_anomalies,
578 show_progress: false,
579 ..Default::default()
580 };
581
582 let mut sequence = 0u64;
583 let delay = if events_per_second > 0 {
584 Duration::from_micros(1_000_000 / events_per_second as u64)
585 } else {
586 Duration::from_millis(1)
587 };
588
589 let mut orchestrator =
591 match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
592 Ok(o) => o,
593 Err(e) => {
594 error!("Failed to create orchestrator: {}", e);
595 return;
596 }
597 };
598
599 loop {
600 if state.stream_stopped.load(Ordering::Relaxed) {
602 info!("Stream stopped by control command");
603 break;
604 }
605
606 while state.stream_paused.load(Ordering::Relaxed) {
608 tokio::time::sleep(Duration::from_millis(100)).await;
609 if state.stream_stopped.load(Ordering::Relaxed) {
610 break;
611 }
612 }
613
614 if max_events > 0 && sequence >= max_events {
616 info!("Stream reached max events: {}", max_events);
617 break;
618 }
619
620 let result = match orchestrator.generate() {
622 Ok(r) => r,
623 Err(e) => {
624 error!("Generation failed: {}", e);
625 break;
626 }
627 };
628
629 for entry in result.journal_entries {
631 sequence += 1;
632 state.total_stream_events.fetch_add(1, Ordering::Relaxed);
633 state.total_entries.fetch_add(1, Ordering::Relaxed);
634
635 let timestamp = Timestamp {
636 seconds: Utc::now().timestamp(),
637 nanos: 0,
638 };
639
640 let event = DataEvent {
641 sequence,
642 timestamp: Some(timestamp),
643 event: Some(data_event::Event::JournalEntry(
644 SynthService::journal_entry_to_proto(&entry),
645 )),
646 };
647
648 if tx.send(Ok(event)).await.is_err() {
649 info!("Stream receiver dropped");
650 break;
651 }
652
653 tokio::time::sleep(delay).await;
655
656 if max_events > 0 && sequence >= max_events {
658 break;
659 }
660 }
661 }
662
663 state.active_streams.fetch_sub(1, Ordering::Relaxed);
665 });
666
667 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
668 }
669
670 async fn control(
672 &self,
673 request: Request<ControlCommand>,
674 ) -> Result<Response<ControlResponse>, Status> {
675 let cmd = request.into_inner();
676 let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
677
678 info!("Control command: {:?}", action);
679
680 let (success, message, status) = match action {
681 ControlAction::Pause => {
682 self.state.stream_paused.store(true, Ordering::Relaxed);
683 (true, "Stream paused".to_string(), StreamStatus::Paused)
684 }
685 ControlAction::Resume => {
686 self.state.stream_paused.store(false, Ordering::Relaxed);
687 (true, "Stream resumed".to_string(), StreamStatus::Running)
688 }
689 ControlAction::Stop => {
690 self.state.stream_stopped.store(true, Ordering::Relaxed);
691 (true, "Stream stopped".to_string(), StreamStatus::Stopped)
692 }
693 ControlAction::TriggerPattern => {
694 let pattern = cmd.pattern_name.unwrap_or_default();
695 if pattern.is_empty() {
696 (
697 false,
698 "Pattern name is required for TriggerPattern action".to_string(),
699 StreamStatus::Running,
700 )
701 } else {
702 let valid_patterns = [
705 "year_end_spike",
706 "period_end_spike",
707 "holiday_cluster",
708 "fraud_cluster",
709 "error_cluster",
710 "uniform",
711 ];
712 let is_valid = valid_patterns.contains(&pattern.as_str())
713 || pattern.starts_with("custom:");
714
715 if is_valid {
716 if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
718 *triggered = Some(pattern.clone());
719 }
720 info!("Pattern trigger activated: {}", pattern);
721 (
722 true,
723 format!("Pattern '{pattern}' will be applied to upcoming entries"),
724 StreamStatus::Running,
725 )
726 } else {
727 (
728 false,
729 format!(
730 "Unknown pattern '{pattern}'. Valid patterns: {valid_patterns:?}"
731 ),
732 StreamStatus::Running,
733 )
734 }
735 }
736 }
737 ControlAction::Unspecified => (
738 false,
739 "Unknown control action".to_string(),
740 StreamStatus::Unspecified,
741 ),
742 };
743
744 Ok(Response::new(ControlResponse {
745 success,
746 message,
747 current_status: status as i32,
748 }))
749 }
750
751 async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
753 let config = self.state.config.read().await;
754 let proto_config = Self::config_to_proto(&config);
755
756 Ok(Response::new(ConfigResponse {
757 success: true,
758 message: "Current configuration retrieved".to_string(),
759 current_config: Some(proto_config),
760 }))
761 }
762
763 async fn set_config(
765 &self,
766 request: Request<ConfigRequest>,
767 ) -> Result<Response<ConfigResponse>, Status> {
768 let req = request.into_inner();
769
770 if let Some(proto_config) = req.config {
771 let new_config = self.proto_to_config(Some(proto_config)).await?;
772
773 let mut config = self.state.config.write().await;
774 *config = new_config.clone();
775
776 info!("Configuration updated");
777
778 Ok(Response::new(ConfigResponse {
779 success: true,
780 message: "Configuration updated".to_string(),
781 current_config: Some(Self::config_to_proto(&new_config)),
782 }))
783 } else {
784 Err(Status::invalid_argument("No configuration provided"))
785 }
786 }
787
788 async fn get_metrics(
790 &self,
791 _request: Request<()>,
792 ) -> Result<Response<MetricsResponse>, Status> {
793 let uptime = self.state.uptime_seconds();
794 let total_entries = self.state.total_entries.load(Ordering::Relaxed);
795
796 let entries_per_second = if uptime > 0 {
797 total_entries as f64 / uptime as f64
798 } else {
799 0.0
800 };
801
802 Ok(Response::new(MetricsResponse {
803 total_entries_generated: total_entries,
804 total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
805 uptime_seconds: uptime,
806 session_entries: total_entries,
807 session_entries_per_second: entries_per_second,
808 active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
809 total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
810 }))
811 }
812
813 async fn health_check(
815 &self,
816 _request: Request<()>,
817 ) -> Result<Response<HealthResponse>, Status> {
818 Ok(Response::new(HealthResponse {
819 healthy: true,
820 version: env!("CARGO_PKG_VERSION").to_string(),
821 uptime_seconds: self.state.uptime_seconds(),
822 }))
823 }
824}
825
826pub fn default_generator_config() -> GeneratorConfig {
828 GeneratorConfig {
829 global: GlobalConfig {
830 seed: None,
831 industry: IndustrySector::Manufacturing,
832 start_date: "2024-01-01".to_string(),
833 period_months: 12,
834 group_currency: "USD".to_string(),
835 parallel: true,
836 worker_threads: 0,
837 memory_limit_mb: 0,
838 fiscal_year_months: None,
839 },
840 companies: vec![CompanyConfig {
841 code: "1000".to_string(),
842 name: "Default Company".to_string(),
843 currency: "USD".to_string(),
844 country: "US".to_string(),
845 annual_transaction_volume: TransactionVolume::TenK,
846 volume_weight: 1.0,
847 fiscal_year_variant: "K4".to_string(),
848 }],
849 chart_of_accounts: ChartOfAccountsConfig {
850 complexity: CoAComplexity::Small,
851 industry_specific: true,
852 custom_accounts: None,
853 min_hierarchy_depth: 2,
854 max_hierarchy_depth: 5,
855 },
856 transactions: Default::default(),
857 output: OutputConfig::default(),
858 fraud: Default::default(),
859 internal_controls: Default::default(),
860 business_processes: Default::default(),
861 user_personas: Default::default(),
862 templates: Default::default(),
863 approval: Default::default(),
864 departments: Default::default(),
865 master_data: Default::default(),
866 document_flows: Default::default(),
867 intercompany: Default::default(),
868 balance: Default::default(),
869 ocpm: Default::default(),
870 audit: Default::default(),
871 banking: Default::default(),
872 data_quality: Default::default(),
873 scenario: Default::default(),
874 temporal: Default::default(),
875 graph_export: Default::default(),
876 streaming: Default::default(),
877 rate_limit: Default::default(),
878 temporal_attributes: Default::default(),
879 relationships: Default::default(),
880 accounting_standards: Default::default(),
881 audit_standards: Default::default(),
882 distributions: Default::default(),
883 temporal_patterns: Default::default(),
884 vendor_network: Default::default(),
885 customer_segmentation: Default::default(),
886 relationship_strength: Default::default(),
887 cross_process_links: Default::default(),
888 organizational_events: Default::default(),
889 behavioral_drift: Default::default(),
890 market_drift: Default::default(),
891 drift_labeling: Default::default(),
892 anomaly_injection: Default::default(),
893 industry_specific: Default::default(),
894 fingerprint_privacy: Default::default(),
895 quality_gates: Default::default(),
896 compliance: Default::default(),
897 webhooks: Default::default(),
898 llm: Default::default(),
899 diffusion: Default::default(),
900 causal: Default::default(),
901 source_to_pay: Default::default(),
902 financial_reporting: Default::default(),
903 hr: Default::default(),
904 manufacturing: Default::default(),
905 sales_quotes: Default::default(),
906 tax: Default::default(),
907 treasury: Default::default(),
908 project_accounting: Default::default(),
909 esg: Default::default(),
910 country_packs: None,
911 scenarios: Default::default(),
912 session: Default::default(),
913 }
914}
915
916#[cfg(test)]
917#[allow(clippy::unwrap_used)]
918mod tests {
919 use super::*;
920 use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;
921
922 #[tokio::test]
927 async fn test_service_creation() {
928 let config = default_generator_config();
929 let service = SynthService::new(config);
930 assert!(service.state.uptime_seconds() < 60);
932 }
933
934 #[tokio::test]
935 async fn test_service_with_state() {
936 let config = default_generator_config();
937 let state = Arc::new(ServerState::new(config));
938 let service = SynthService::with_state(Arc::clone(&state));
939
940 state.total_entries.store(100, Ordering::Relaxed);
942 assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
943 }
944
945 #[tokio::test]
950 async fn test_health_check() {
951 let config = default_generator_config();
952 let service = SynthService::new(config);
953
954 let response = service.health_check(Request::new(())).await.unwrap();
955 let health = response.into_inner();
956
957 assert!(health.healthy);
958 assert!(!health.version.is_empty());
959 }
960
961 #[tokio::test]
962 async fn test_health_check_returns_version() {
963 let config = default_generator_config();
964 let service = SynthService::new(config);
965
966 let response = service.health_check(Request::new(())).await.unwrap();
967 let health = response.into_inner();
968
969 assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
970 }
971
972 #[tokio::test]
977 async fn test_get_config() {
978 let config = default_generator_config();
979 let service = SynthService::new(config);
980
981 let response = service.get_config(Request::new(())).await.unwrap();
982 let config_response = response.into_inner();
983
984 assert!(config_response.success);
985 assert!(config_response.current_config.is_some());
986 }
987
988 #[tokio::test]
989 async fn test_get_config_returns_industry() {
990 let config = default_generator_config();
991 let service = SynthService::new(config);
992
993 let response = service.get_config(Request::new(())).await.unwrap();
994 let config_response = response.into_inner();
995 let current = config_response.current_config.unwrap();
996
997 assert_eq!(current.industry, "Manufacturing");
998 }
999
1000 #[tokio::test]
1001 async fn test_set_config() {
1002 let config = default_generator_config();
1003 let service = SynthService::new(config);
1004
1005 let new_config = GenerationConfig {
1006 industry: "retail".to_string(),
1007 start_date: "2024-06-01".to_string(),
1008 period_months: 6,
1009 seed: 42,
1010 coa_complexity: "medium".to_string(),
1011 companies: vec![],
1012 fraud_enabled: true,
1013 fraud_rate: 0.05,
1014 generate_master_data: false,
1015 generate_document_flows: false,
1016 };
1017
1018 let response = service
1019 .set_config(Request::new(ConfigRequest {
1020 config: Some(new_config),
1021 }))
1022 .await
1023 .unwrap();
1024 let config_response = response.into_inner();
1025
1026 assert!(config_response.success);
1027 }
1028
1029 #[tokio::test]
1030 async fn test_set_config_without_config_fails() {
1031 let config = default_generator_config();
1032 let service = SynthService::new(config);
1033
1034 let result = service
1035 .set_config(Request::new(ConfigRequest { config: None }))
1036 .await;
1037
1038 assert!(result.is_err());
1039 }
1040
1041 #[tokio::test]
1046 async fn test_get_metrics_initial() {
1047 let config = default_generator_config();
1048 let service = SynthService::new(config);
1049
1050 let response = service.get_metrics(Request::new(())).await.unwrap();
1051 let metrics = response.into_inner();
1052
1053 assert_eq!(metrics.total_entries_generated, 0);
1054 assert_eq!(metrics.total_anomalies_injected, 0);
1055 assert_eq!(metrics.active_streams, 0);
1056 }
1057
1058 #[tokio::test]
1059 async fn test_get_metrics_after_updates() {
1060 let config = default_generator_config();
1061 let service = SynthService::new(config);
1062
1063 service.state.total_entries.store(1000, Ordering::Relaxed);
1065 service.state.total_anomalies.store(20, Ordering::Relaxed);
1066 service.state.active_streams.store(2, Ordering::Relaxed);
1067
1068 let response = service.get_metrics(Request::new(())).await.unwrap();
1069 let metrics = response.into_inner();
1070
1071 assert_eq!(metrics.total_entries_generated, 1000);
1072 assert_eq!(metrics.total_anomalies_injected, 20);
1073 assert_eq!(metrics.active_streams, 2);
1074 }
1075
1076 #[tokio::test]
1081 async fn test_control_pause() {
1082 let config = default_generator_config();
1083 let service = SynthService::new(config);
1084
1085 let response = service
1086 .control(Request::new(ControlCommand {
1087 action: ControlAction::Pause as i32,
1088 pattern_name: None,
1089 }))
1090 .await
1091 .unwrap();
1092 let control_response = response.into_inner();
1093
1094 assert!(control_response.success);
1095 assert!(service.state.stream_paused.load(Ordering::Relaxed));
1096 }
1097
1098 #[tokio::test]
1099 async fn test_control_resume() {
1100 let config = default_generator_config();
1101 let service = SynthService::new(config);
1102
1103 service.state.stream_paused.store(true, Ordering::Relaxed);
1105
1106 let response = service
1107 .control(Request::new(ControlCommand {
1108 action: ControlAction::Resume as i32,
1109 pattern_name: None,
1110 }))
1111 .await
1112 .unwrap();
1113 let control_response = response.into_inner();
1114
1115 assert!(control_response.success);
1116 assert!(!service.state.stream_paused.load(Ordering::Relaxed));
1117 }
1118
1119 #[tokio::test]
1120 async fn test_control_stop() {
1121 let config = default_generator_config();
1122 let service = SynthService::new(config);
1123
1124 let response = service
1125 .control(Request::new(ControlCommand {
1126 action: ControlAction::Stop as i32,
1127 pattern_name: None,
1128 }))
1129 .await
1130 .unwrap();
1131 let control_response = response.into_inner();
1132
1133 assert!(control_response.success);
1134 assert!(service.state.stream_stopped.load(Ordering::Relaxed));
1135 }
1136
1137 #[test]
1142 fn test_server_state_creation() {
1143 let config = default_generator_config();
1144 let state = ServerState::new(config);
1145
1146 assert_eq!(state.total_entries.load(Ordering::Relaxed), 0);
1147 assert_eq!(state.total_anomalies.load(Ordering::Relaxed), 0);
1148 assert_eq!(state.active_streams.load(Ordering::Relaxed), 0);
1149 assert!(!state.stream_paused.load(Ordering::Relaxed));
1150 assert!(!state.stream_stopped.load(Ordering::Relaxed));
1151 }
1152
1153 #[test]
1154 fn test_server_state_uptime() {
1155 let config = default_generator_config();
1156 let state = ServerState::new(config);
1157
1158 assert!(state.uptime_seconds() < 60);
1160 }
1161
1162 #[test]
1167 fn test_default_generator_config() {
1168 let config = default_generator_config();
1169
1170 assert_eq!(config.global.industry, IndustrySector::Manufacturing);
1171 assert_eq!(config.global.period_months, 12);
1172 assert!(!config.companies.is_empty());
1173 assert_eq!(config.companies[0].code, "1000");
1174 }
1175
1176 #[test]
1177 fn test_config_to_proto() {
1178 let config = default_generator_config();
1179 let proto = SynthService::config_to_proto(&config);
1180
1181 assert_eq!(proto.industry, "Manufacturing");
1182 assert_eq!(proto.period_months, 12);
1183 assert!(!proto.companies.is_empty());
1184 }
1185
1186 #[tokio::test]
1187 async fn test_proto_to_config_with_none() {
1188 let config = default_generator_config();
1189 let service = SynthService::new(config.clone());
1190
1191 let result = service.proto_to_config(None).await.unwrap();
1192
1193 assert_eq!(result.global.industry, config.global.industry);
1195 }
1196
1197 #[tokio::test]
1198 async fn test_proto_to_config_with_retail() {
1199 let config = default_generator_config();
1200 let service = SynthService::new(config);
1201
1202 let proto = GenerationConfig {
1203 industry: "retail".to_string(),
1204 start_date: "2024-01-01".to_string(),
1205 period_months: 6,
1206 seed: 0,
1207 coa_complexity: "large".to_string(),
1208 companies: vec![],
1209 fraud_enabled: false,
1210 fraud_rate: 0.0,
1211 generate_master_data: false,
1212 generate_document_flows: false,
1213 };
1214
1215 let result = service.proto_to_config(Some(proto)).await.unwrap();
1216
1217 assert_eq!(result.global.industry, IndustrySector::Retail);
1218 assert_eq!(result.chart_of_accounts.complexity, CoAComplexity::Large);
1219 }
1220
1221 #[tokio::test]
1222 async fn test_proto_to_config_with_healthcare() {
1223 let config = default_generator_config();
1224 let service = SynthService::new(config);
1225
1226 let proto = GenerationConfig {
1227 industry: "healthcare".to_string(),
1228 start_date: "2024-01-01".to_string(),
1229 period_months: 12,
1230 seed: 42,
1231 coa_complexity: "small".to_string(),
1232 companies: vec![],
1233 fraud_enabled: true,
1234 fraud_rate: 0.1,
1235 generate_master_data: true,
1236 generate_document_flows: true,
1237 };
1238
1239 let result = service.proto_to_config(Some(proto)).await.unwrap();
1240
1241 assert_eq!(result.global.industry, IndustrySector::Healthcare);
1242 assert_eq!(result.global.seed, Some(42));
1243 assert!(result.fraud.enabled);
1244 assert!((result.fraud.fraud_rate - 0.1).abs() < 0.001);
1245 }
1246
1247 #[tokio::test]
1248 async fn test_proto_to_config_with_companies() {
1249 let config = default_generator_config();
1250 let service = SynthService::new(config);
1251
1252 let proto = GenerationConfig {
1253 industry: "technology".to_string(),
1254 start_date: "2024-01-01".to_string(),
1255 period_months: 12,
1256 seed: 0,
1257 coa_complexity: "medium".to_string(),
1258 companies: vec![
1259 CompanyConfigProto {
1260 code: "1000".to_string(),
1261 name: "Parent Corp".to_string(),
1262 currency: "USD".to_string(),
1263 country: "US".to_string(),
1264 annual_transaction_volume: 100000,
1265 volume_weight: 1.0,
1266 },
1267 CompanyConfigProto {
1268 code: "2000".to_string(),
1269 name: "EU Sub".to_string(),
1270 currency: "EUR".to_string(),
1271 country: "DE".to_string(),
1272 annual_transaction_volume: 50000,
1273 volume_weight: 0.5,
1274 },
1275 ],
1276 fraud_enabled: false,
1277 fraud_rate: 0.0,
1278 generate_master_data: false,
1279 generate_document_flows: false,
1280 };
1281
1282 let result = service.proto_to_config(Some(proto)).await.unwrap();
1283
1284 assert_eq!(result.companies.len(), 2);
1285 assert_eq!(result.companies[0].code, "1000");
1286 assert_eq!(result.companies[1].currency, "EUR");
1287 }
1288
1289 #[tokio::test]
1294 async fn test_bulk_generate_entry_count_validation() {
1295 let config = default_generator_config();
1296 let service = SynthService::new(config);
1297
1298 let request = BulkGenerateRequest {
1299 entry_count: 2_000_000, include_master_data: false,
1301 inject_anomalies: false,
1302 output_format: 0,
1303 config: None,
1304 };
1305
1306 let result = service.bulk_generate(Request::new(request)).await;
1307 assert!(result.is_err());
1308 let err = result.err().unwrap();
1309 assert!(err.message().contains("exceeds maximum allowed value"));
1310 }
1311
1312 #[tokio::test]
1313 async fn test_stream_data_events_per_second_too_low() {
1314 let config = default_generator_config();
1315 let service = SynthService::new(config);
1316
1317 let request = StreamDataRequest {
1318 events_per_second: 0, max_events: 100,
1320 inject_anomalies: false,
1321 anomaly_rate: 0.0,
1322 config: None,
1323 };
1324
1325 let result = service.stream_data(Request::new(request)).await;
1326 assert!(result.is_err());
1327 let err = result.err().unwrap();
1328 assert!(err.message().contains("must be at least"));
1329 }
1330
1331 #[tokio::test]
1332 async fn test_stream_data_events_per_second_too_high() {
1333 let config = default_generator_config();
1334 let service = SynthService::new(config);
1335
1336 let request = StreamDataRequest {
1337 events_per_second: 20_000, max_events: 100,
1339 inject_anomalies: false,
1340 anomaly_rate: 0.0,
1341 config: None,
1342 };
1343
1344 let result = service.stream_data(Request::new(request)).await;
1345 assert!(result.is_err());
1346 let err = result.err().unwrap();
1347 assert!(err.message().contains("exceeds maximum allowed value"));
1348 }
1349
1350 #[tokio::test]
1351 async fn test_stream_data_max_events_too_high() {
1352 let config = default_generator_config();
1353 let service = SynthService::new(config);
1354
1355 let request = StreamDataRequest {
1356 events_per_second: 100,
1357 max_events: 100_000_000, inject_anomalies: false,
1359 anomaly_rate: 0.0,
1360 config: None,
1361 };
1362
1363 let result = service.stream_data(Request::new(request)).await;
1364 assert!(result.is_err());
1365 let err = result.err().unwrap();
1366 assert!(err.message().contains("max_events"));
1367 }
1368
1369 #[tokio::test]
1370 async fn test_stream_data_valid_request() {
1371 let config = default_generator_config();
1372 let service = SynthService::new(config);
1373
1374 let request = StreamDataRequest {
1375 events_per_second: 10,
1376 max_events: 5,
1377 inject_anomalies: false,
1378 anomaly_rate: 0.0,
1379 config: None,
1380 };
1381
1382 let result = service.stream_data(Request::new(request)).await;
1385 assert!(result.is_ok());
1386 }
1387}