1use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17 ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18 TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
26pub struct ServerState {
28 pub config: RwLock<GeneratorConfig>,
30 pub config_source: RwLock<crate::config_loader::ConfigSource>,
32 start_time: Instant,
34 pub total_entries: AtomicU64,
36 pub total_anomalies: AtomicU64,
38 pub active_streams: AtomicU64,
40 pub total_stream_events: AtomicU64,
42 pub stream_paused: AtomicBool,
44 pub stream_stopped: AtomicBool,
46 pub stream_events_per_second: AtomicU64,
48 pub stream_max_events: AtomicU64,
50 pub stream_inject_anomalies: AtomicBool,
52 pub triggered_pattern: RwLock<Option<String>>,
54 pub resource_guard: Arc<ResourceGuard>,
56 max_concurrent_generations: AtomicU64,
58}
59
60impl ServerState {
61 pub fn new(config: GeneratorConfig) -> Self {
62 let memory_limit = config.global.memory_limit_mb;
64 let resource_guard = if memory_limit > 0 {
65 ResourceGuardBuilder::new()
66 .memory_limit(memory_limit)
67 .conservative()
68 .build()
69 } else {
70 ResourceGuardBuilder::new()
72 .memory_limit(2048)
73 .conservative()
74 .build()
75 };
76
77 Self {
78 config: RwLock::new(config),
79 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
80 start_time: Instant::now(),
81 total_entries: AtomicU64::new(0),
82 total_anomalies: AtomicU64::new(0),
83 active_streams: AtomicU64::new(0),
84 total_stream_events: AtomicU64::new(0),
85 stream_paused: AtomicBool::new(false),
86 stream_stopped: AtomicBool::new(false),
87 stream_events_per_second: AtomicU64::new(0),
88 stream_max_events: AtomicU64::new(0),
89 stream_inject_anomalies: AtomicBool::new(false),
90 triggered_pattern: RwLock::new(None),
91 resource_guard: Arc::new(resource_guard),
92 max_concurrent_generations: AtomicU64::new(4),
93 }
94 }
95
96 pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
98 let resource_guard = ResourceGuardBuilder::new()
99 .memory_limit(memory_limit_mb)
100 .conservative()
101 .build();
102
103 Self {
104 config: RwLock::new(config),
105 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
106 start_time: Instant::now(),
107 total_entries: AtomicU64::new(0),
108 total_anomalies: AtomicU64::new(0),
109 active_streams: AtomicU64::new(0),
110 total_stream_events: AtomicU64::new(0),
111 stream_paused: AtomicBool::new(false),
112 stream_stopped: AtomicBool::new(false),
113 stream_events_per_second: AtomicU64::new(0),
114 stream_max_events: AtomicU64::new(0),
115 stream_inject_anomalies: AtomicBool::new(false),
116 triggered_pattern: RwLock::new(None),
117 resource_guard: Arc::new(resource_guard),
118 max_concurrent_generations: AtomicU64::new(4),
119 }
120 }
121
122 pub fn uptime_seconds(&self) -> u64 {
123 self.start_time.elapsed().as_secs()
124 }
125
126 #[allow(clippy::result_large_err)] pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
129 let active = self.active_streams.load(Ordering::Relaxed);
131 let max = self.max_concurrent_generations.load(Ordering::Relaxed);
132 if active >= max {
133 return Err(Status::resource_exhausted(format!(
134 "Too many concurrent generations ({active}/{max}). Try again later."
135 )));
136 }
137
138 match self.resource_guard.check() {
140 Ok(level) => {
141 if level == DegradationLevel::Emergency {
142 Err(Status::resource_exhausted(
143 "Server resources critically low. Generation not possible.",
144 ))
145 } else if level == DegradationLevel::Minimal {
146 warn!("Resources constrained, generation may be limited");
147 Ok(level)
148 } else {
149 Ok(level)
150 }
151 }
152 Err(e) => Err(Status::resource_exhausted(format!(
153 "Resource check failed: {e}"
154 ))),
155 }
156 }
157
158 pub fn resource_status(&self) -> ResourceStatus {
160 let stats = self.resource_guard.stats();
161 ResourceStatus {
162 memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
163 memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
164 disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
165 degradation_level: stats.degradation_level.name().to_string(),
166 active_generations: self.active_streams.load(Ordering::Relaxed),
167 }
168 }
169}
170
171#[derive(Debug, Clone)]
173pub struct ResourceStatus {
174 pub memory_usage_mb: u64,
175 pub memory_peak_mb: u64,
176 pub disk_available_mb: u64,
177 pub degradation_level: String,
178 pub active_generations: u64,
179}
180
181pub struct SynthService {
183 pub state: Arc<ServerState>,
184}
185
186impl SynthService {
187 pub fn new(config: GeneratorConfig) -> Self {
188 Self {
189 state: Arc::new(ServerState::new(config)),
190 }
191 }
192
193 pub fn with_state(state: Arc<ServerState>) -> Self {
194 Self { state }
195 }
196
197 async fn proto_to_config(
199 &self,
200 proto: Option<GenerationConfig>,
201 ) -> Result<GeneratorConfig, Status> {
202 match proto {
203 Some(p) => {
204 let industry = match p.industry.to_lowercase().as_str() {
205 "manufacturing" => IndustrySector::Manufacturing,
206 "retail" => IndustrySector::Retail,
207 "financial_services" | "financial" => IndustrySector::FinancialServices,
208 "healthcare" => IndustrySector::Healthcare,
209 "technology" => IndustrySector::Technology,
210 "" => IndustrySector::Manufacturing, other => {
212 return Err(Status::invalid_argument(format!(
213 "Unknown industry '{other}'. Valid values: manufacturing, retail, financial_services, healthcare, technology"
214 )));
215 }
216 };
217
218 let complexity = match p.coa_complexity.to_lowercase().as_str() {
219 "small" => CoAComplexity::Small,
220 "medium" => CoAComplexity::Medium,
221 "large" => CoAComplexity::Large,
222 "" => CoAComplexity::Small, other => {
224 return Err(Status::invalid_argument(format!(
225 "Unknown coa_complexity '{other}'. Valid values: small, medium, large"
226 )));
227 }
228 };
229
230 let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
231 vec![CompanyConfig {
232 code: "1000".to_string(),
233 name: "Default Company".to_string(),
234 currency: "USD".to_string(),
235 functional_currency: None,
236 country: "US".to_string(),
237 annual_transaction_volume: TransactionVolume::TenK,
238 volume_weight: 1.0,
239 fiscal_year_variant: "K4".to_string(),
240 }]
241 } else {
242 p.companies
243 .into_iter()
244 .map(|c| CompanyConfig {
245 code: c.code,
246 name: c.name,
247 currency: c.currency,
248 functional_currency: None,
249 country: c.country,
250 annual_transaction_volume: TransactionVolume::Custom(
251 c.annual_transaction_volume,
252 ),
253 volume_weight: c.volume_weight as f64,
254 fiscal_year_variant: "K4".to_string(),
255 })
256 .collect()
257 };
258
259 let mut config = GeneratorConfig {
260 global: GlobalConfig {
261 seed: if p.seed > 0 { Some(p.seed) } else { None },
262 industry,
263 start_date: if p.start_date.is_empty() {
264 "2024-01-01".to_string()
265 } else {
266 p.start_date
267 },
268 period_months: if p.period_months == 0 {
269 12
270 } else {
271 p.period_months
272 },
273 group_currency: "USD".to_string(),
274 presentation_currency: None,
275 parallel: true,
276 worker_threads: 0,
277 memory_limit_mb: 0,
278 fiscal_year_months: None,
279 },
280 companies,
281 chart_of_accounts: ChartOfAccountsConfig {
282 complexity,
283 industry_specific: true,
284 custom_accounts: None,
285 min_hierarchy_depth: 2,
286 max_hierarchy_depth: 5,
287 },
288 ..default_generator_config()
289 };
290
291 if p.fraud_enabled {
293 config.fraud.enabled = true;
294 config.fraud.fraud_rate = p.fraud_rate as f64;
295 }
296
297 Ok(config)
298 }
299 None => {
300 let config = self.state.config.read().await;
302 Ok(config.clone())
303 }
304 }
305 }
306
307 fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
309 JournalEntryProto {
310 document_id: entry.header.document_id.to_string(),
311 company_code: entry.header.company_code.clone(),
312 fiscal_year: entry.header.fiscal_year as u32,
313 fiscal_period: entry.header.fiscal_period as u32,
314 posting_date: entry.header.posting_date.to_string(),
315 document_date: entry.header.document_date.to_string(),
316 created_at: entry.header.created_at.to_rfc3339(),
317 source: format!("{:?}", entry.header.source),
318 business_process: entry.header.business_process.map(|bp| format!("{bp:?}")),
319 lines: entry
320 .lines
321 .iter()
322 .map(|line| {
323 let amount = if line.is_debit() {
324 line.debit_amount
325 } else {
326 line.credit_amount
327 };
328 JournalLineProto {
329 line_number: line.line_number,
330 account_number: line.gl_account.clone(),
331 account_name: line.account_description.clone().unwrap_or_default(),
332 amount: amount.to_string(),
333 is_debit: line.is_debit(),
334 cost_center: line.cost_center.clone(),
335 profit_center: line.profit_center.clone(),
336 vendor_id: line.auxiliary_account_number.clone(),
339 customer_id: None,
340 material_id: None,
341 text: line.line_text.clone().or_else(|| line.text.clone()),
342 }
343 })
344 .collect(),
345 is_anomaly: entry.header.is_fraud,
346 anomaly_type: entry.header.fraud_type.map(|ft| format!("{ft:?}")),
347 }
348 }
349
350 fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
352 GenerationConfig {
353 industry: format!("{:?}", config.global.industry),
354 start_date: config.global.start_date.clone(),
355 period_months: config.global.period_months,
356 seed: config.global.seed.unwrap_or(0),
357 coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
358 companies: config
359 .companies
360 .iter()
361 .map(|c| CompanyConfigProto {
362 code: c.code.clone(),
363 name: c.name.clone(),
364 currency: c.currency.clone(),
365 country: c.country.clone(),
366 annual_transaction_volume: c.annual_transaction_volume.count(),
367 volume_weight: c.volume_weight as f32,
368 })
369 .collect(),
370 fraud_enabled: config.fraud.enabled,
371 fraud_rate: config.fraud.fraud_rate as f32,
372 generate_master_data: config.master_data.vendors.count > 0
373 || config.master_data.customers.count > 0
374 || config.master_data.materials.count > 0,
375 generate_document_flows: config.document_flows.p2p.enabled
376 || config.document_flows.o2c.enabled,
377 }
378 }
379}
380
381#[tonic::async_trait]
382impl synthetic_data_service_server::SyntheticDataService for SynthService {
383 async fn bulk_generate(
385 &self,
386 request: Request<BulkGenerateRequest>,
387 ) -> Result<Response<BulkGenerateResponse>, Status> {
388 let req = request.into_inner();
389
390 const MAX_ENTRY_COUNT: u64 = 1_000_000;
392 if req.entry_count > MAX_ENTRY_COUNT {
393 return Err(Status::invalid_argument(format!(
394 "entry_count ({}) exceeds maximum allowed value ({})",
395 req.entry_count, MAX_ENTRY_COUNT
396 )));
397 }
398
399 let degradation_level = self.state.check_resources()?;
401 if degradation_level != DegradationLevel::Normal {
402 warn!(
403 "Starting bulk generation under resource pressure (level: {:?})",
404 degradation_level
405 );
406 }
407
408 info!("Bulk generate request: {} entries", req.entry_count);
409
410 let config = self.proto_to_config(req.config).await?;
411 let start_time = Instant::now();
412
413 let phase_config = {
415 let mut pc = PhaseConfig::from_config(&config);
416 pc.generate_master_data = req.include_master_data;
417 pc.generate_document_flows = false;
418 pc.generate_journal_entries = true;
419 pc.inject_anomalies = req.inject_anomalies;
420 pc.show_progress = false;
421 pc
422 };
423
424 let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
425 .map_err(|e| Status::internal(format!("Failed to create orchestrator: {e}")))?;
426
427 let result = orchestrator
428 .generate()
429 .map_err(|e| Status::internal(format!("Generation failed: {e}")))?;
430
431 let duration_ms = start_time.elapsed().as_millis() as u64;
432
433 let entries_count = result.journal_entries.len() as u64;
435 self.state
436 .total_entries
437 .fetch_add(entries_count, Ordering::Relaxed);
438
439 let anomaly_count = result.anomaly_labels.labels.len() as u64;
440 self.state
441 .total_anomalies
442 .fetch_add(anomaly_count, Ordering::Relaxed);
443
444 let journal_entries: Vec<JournalEntryProto> = result
446 .journal_entries
447 .iter()
448 .map(Self::journal_entry_to_proto)
449 .collect();
450
451 let anomaly_labels: Vec<AnomalyLabelProto> = result
452 .anomaly_labels
453 .labels
454 .iter()
455 .map(|a| AnomalyLabelProto {
456 anomaly_id: a.anomaly_id.clone(),
457 document_id: a.document_id.clone(),
458 anomaly_type: format!("{:?}", a.anomaly_type),
459 anomaly_category: a.document_type.clone(),
460 description: a.description.clone(),
461 severity_score: a.severity as f32,
462 })
463 .collect();
464
465 let mut total_debit = rust_decimal::Decimal::ZERO;
467 let mut total_credit = rust_decimal::Decimal::ZERO;
468 let mut total_lines = 0u64;
469 let mut entries_by_company = std::collections::HashMap::new();
470 let mut entries_by_source = std::collections::HashMap::new();
471
472 for entry in &result.journal_entries {
473 *entries_by_company
474 .entry(entry.header.company_code.clone())
475 .or_insert(0u64) += 1;
476 *entries_by_source
477 .entry(format!("{:?}", entry.header.source))
478 .or_insert(0u64) += 1;
479
480 for line in &entry.lines {
481 total_lines += 1;
482 total_debit += line.debit_amount;
483 total_credit += line.credit_amount;
484 }
485 }
486
487 let stats = GenerationStats {
488 total_entries: entries_count,
489 total_lines,
490 total_debit_amount: total_debit.to_string(),
491 total_credit_amount: total_credit.to_string(),
492 anomaly_count,
493 entries_by_company,
494 entries_by_source,
495 };
496
497 info!(
498 "Bulk generation complete: {} entries in {}ms",
499 entries_count, duration_ms
500 );
501
502 Ok(Response::new(BulkGenerateResponse {
503 entries_generated: entries_count,
504 duration_ms,
505 journal_entries,
506 anomaly_labels,
507 stats: Some(stats),
508 }))
509 }
510
511 type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
512
513 async fn stream_data(
515 &self,
516 request: Request<StreamDataRequest>,
517 ) -> Result<Response<Self::StreamDataStream>, Status> {
518 let req = request.into_inner();
519
520 const MIN_EVENTS_PER_SECOND: u32 = 1;
522 const MAX_EVENTS_PER_SECOND: u32 = 10_000;
523 if req.events_per_second < MIN_EVENTS_PER_SECOND {
524 return Err(Status::invalid_argument(format!(
525 "events_per_second ({}) must be at least {}",
526 req.events_per_second, MIN_EVENTS_PER_SECOND
527 )));
528 }
529 if req.events_per_second > MAX_EVENTS_PER_SECOND {
530 return Err(Status::invalid_argument(format!(
531 "events_per_second ({}) exceeds maximum allowed value ({})",
532 req.events_per_second, MAX_EVENTS_PER_SECOND
533 )));
534 }
535
536 const MAX_STREAM_EVENTS: u64 = 10_000_000;
538 if req.max_events > MAX_STREAM_EVENTS {
539 return Err(Status::invalid_argument(format!(
540 "max_events ({}) exceeds maximum allowed value ({})",
541 req.max_events, MAX_STREAM_EVENTS
542 )));
543 }
544
545 let degradation_level = self.state.check_resources()?;
547 if degradation_level != DegradationLevel::Normal {
548 warn!(
549 "Starting stream under resource pressure (level: {:?})",
550 degradation_level
551 );
552 }
553
554 info!(
555 "Stream data request: {} events/sec, max {}",
556 req.events_per_second, req.max_events
557 );
558
559 let config = self.proto_to_config(req.config).await?;
560 let state = self.state.clone();
561
562 state.active_streams.fetch_add(1, Ordering::Relaxed);
564
565 state.stream_paused.store(false, Ordering::Relaxed);
567 state.stream_stopped.store(false, Ordering::Relaxed);
568
569 let (tx, rx) = mpsc::channel(100);
570
571 let events_per_second = req.events_per_second;
573 let max_events = req.max_events;
574 let inject_anomalies = req.inject_anomalies;
575
576 tokio::spawn(async move {
577 let phase_config = {
578 let mut pc = PhaseConfig::from_config(&config);
579 pc.generate_master_data = false;
580 pc.generate_document_flows = false;
581 pc.generate_journal_entries = true;
582 pc.inject_anomalies = inject_anomalies;
583 pc.show_progress = false;
584 pc
585 };
586
587 let mut sequence = 0u64;
588 let delay = if events_per_second > 0 {
589 Duration::from_micros(1_000_000 / events_per_second as u64)
590 } else {
591 Duration::from_millis(1)
592 };
593
594 let mut orchestrator =
596 match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
597 Ok(o) => o,
598 Err(e) => {
599 error!("Failed to create orchestrator: {}", e);
600 return;
601 }
602 };
603
604 loop {
605 if state.stream_stopped.load(Ordering::Relaxed) {
607 info!("Stream stopped by control command");
608 break;
609 }
610
611 while state.stream_paused.load(Ordering::Relaxed) {
613 tokio::time::sleep(Duration::from_millis(100)).await;
614 if state.stream_stopped.load(Ordering::Relaxed) {
615 break;
616 }
617 }
618
619 if max_events > 0 && sequence >= max_events {
621 info!("Stream reached max events: {}", max_events);
622 break;
623 }
624
625 let result = match orchestrator.generate() {
627 Ok(r) => r,
628 Err(e) => {
629 error!("Generation failed: {}", e);
630 break;
631 }
632 };
633
634 for entry in result.journal_entries {
636 sequence += 1;
637 state.total_stream_events.fetch_add(1, Ordering::Relaxed);
638 state.total_entries.fetch_add(1, Ordering::Relaxed);
639
640 let timestamp = Timestamp {
641 seconds: Utc::now().timestamp(),
642 nanos: 0,
643 };
644
645 let event = DataEvent {
646 sequence,
647 timestamp: Some(timestamp),
648 event: Some(data_event::Event::JournalEntry(
649 SynthService::journal_entry_to_proto(&entry),
650 )),
651 };
652
653 if tx.send(Ok(event)).await.is_err() {
654 info!("Stream receiver dropped");
655 break;
656 }
657
658 tokio::time::sleep(delay).await;
660
661 if max_events > 0 && sequence >= max_events {
663 break;
664 }
665 }
666 }
667
668 state.active_streams.fetch_sub(1, Ordering::Relaxed);
670 });
671
672 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
673 }
674
675 async fn control(
677 &self,
678 request: Request<ControlCommand>,
679 ) -> Result<Response<ControlResponse>, Status> {
680 let cmd = request.into_inner();
681 let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
682
683 info!("Control command: {:?}", action);
684
685 let (success, message, status) = match action {
686 ControlAction::Pause => {
687 self.state.stream_paused.store(true, Ordering::Relaxed);
688 (true, "Stream paused".to_string(), StreamStatus::Paused)
689 }
690 ControlAction::Resume => {
691 self.state.stream_paused.store(false, Ordering::Relaxed);
692 (true, "Stream resumed".to_string(), StreamStatus::Running)
693 }
694 ControlAction::Stop => {
695 self.state.stream_stopped.store(true, Ordering::Relaxed);
696 (true, "Stream stopped".to_string(), StreamStatus::Stopped)
697 }
698 ControlAction::TriggerPattern => {
699 let pattern = cmd.pattern_name.unwrap_or_default();
700 if pattern.is_empty() {
701 (
702 false,
703 "Pattern name is required for TriggerPattern action".to_string(),
704 StreamStatus::Running,
705 )
706 } else {
707 let valid_patterns = [
710 "year_end_spike",
711 "period_end_spike",
712 "holiday_cluster",
713 "fraud_cluster",
714 "error_cluster",
715 "uniform",
716 ];
717 let is_valid = valid_patterns.contains(&pattern.as_str())
718 || pattern.starts_with("custom:");
719
720 if is_valid {
721 if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
723 *triggered = Some(pattern.clone());
724 }
725 info!("Pattern trigger activated: {}", pattern);
726 (
727 true,
728 format!("Pattern '{pattern}' will be applied to upcoming entries"),
729 StreamStatus::Running,
730 )
731 } else {
732 (
733 false,
734 format!(
735 "Unknown pattern '{pattern}'. Valid patterns: {valid_patterns:?}"
736 ),
737 StreamStatus::Running,
738 )
739 }
740 }
741 }
742 ControlAction::Unspecified => (
743 false,
744 "Unknown control action".to_string(),
745 StreamStatus::Unspecified,
746 ),
747 };
748
749 Ok(Response::new(ControlResponse {
750 success,
751 message,
752 current_status: status as i32,
753 }))
754 }
755
756 async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
758 let config = self.state.config.read().await;
759 let proto_config = Self::config_to_proto(&config);
760
761 Ok(Response::new(ConfigResponse {
762 success: true,
763 message: "Current configuration retrieved".to_string(),
764 current_config: Some(proto_config),
765 }))
766 }
767
768 async fn set_config(
770 &self,
771 request: Request<ConfigRequest>,
772 ) -> Result<Response<ConfigResponse>, Status> {
773 let req = request.into_inner();
774
775 if let Some(proto_config) = req.config {
776 let new_config = self.proto_to_config(Some(proto_config)).await?;
777
778 let mut config = self.state.config.write().await;
779 *config = new_config.clone();
780
781 info!("Configuration updated");
782
783 Ok(Response::new(ConfigResponse {
784 success: true,
785 message: "Configuration updated".to_string(),
786 current_config: Some(Self::config_to_proto(&new_config)),
787 }))
788 } else {
789 Err(Status::invalid_argument("No configuration provided"))
790 }
791 }
792
793 async fn get_metrics(
795 &self,
796 _request: Request<()>,
797 ) -> Result<Response<MetricsResponse>, Status> {
798 let uptime = self.state.uptime_seconds();
799 let total_entries = self.state.total_entries.load(Ordering::Relaxed);
800
801 let entries_per_second = if uptime > 0 {
802 total_entries as f64 / uptime as f64
803 } else {
804 0.0
805 };
806
807 Ok(Response::new(MetricsResponse {
808 total_entries_generated: total_entries,
809 total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
810 uptime_seconds: uptime,
811 session_entries: total_entries,
812 session_entries_per_second: entries_per_second,
813 active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
814 total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
815 }))
816 }
817
818 async fn health_check(
820 &self,
821 _request: Request<()>,
822 ) -> Result<Response<HealthResponse>, Status> {
823 Ok(Response::new(HealthResponse {
824 healthy: true,
825 version: env!("CARGO_PKG_VERSION").to_string(),
826 uptime_seconds: self.state.uptime_seconds(),
827 }))
828 }
829}
830
831pub fn default_generator_config() -> GeneratorConfig {
833 GeneratorConfig {
834 global: GlobalConfig {
835 seed: None,
836 industry: IndustrySector::Manufacturing,
837 start_date: "2024-01-01".to_string(),
838 period_months: 12,
839 group_currency: "USD".to_string(),
840 presentation_currency: None,
841 parallel: true,
842 worker_threads: 0,
843 memory_limit_mb: 0,
844 fiscal_year_months: None,
845 },
846 companies: vec![CompanyConfig {
847 code: "1000".to_string(),
848 name: "Default Company".to_string(),
849 currency: "USD".to_string(),
850 functional_currency: None,
851 country: "US".to_string(),
852 annual_transaction_volume: TransactionVolume::TenK,
853 volume_weight: 1.0,
854 fiscal_year_variant: "K4".to_string(),
855 }],
856 chart_of_accounts: ChartOfAccountsConfig {
857 complexity: CoAComplexity::Small,
858 industry_specific: true,
859 custom_accounts: None,
860 min_hierarchy_depth: 2,
861 max_hierarchy_depth: 5,
862 },
863 transactions: Default::default(),
864 output: OutputConfig::default(),
865 fraud: Default::default(),
866 internal_controls: Default::default(),
867 business_processes: Default::default(),
868 user_personas: Default::default(),
869 templates: Default::default(),
870 approval: Default::default(),
871 departments: Default::default(),
872 master_data: Default::default(),
873 document_flows: Default::default(),
874 intercompany: Default::default(),
875 balance: Default::default(),
876 ocpm: Default::default(),
877 audit: Default::default(),
878 banking: Default::default(),
879 data_quality: Default::default(),
880 scenario: Default::default(),
881 temporal: Default::default(),
882 graph_export: Default::default(),
883 streaming: Default::default(),
884 rate_limit: Default::default(),
885 temporal_attributes: Default::default(),
886 relationships: Default::default(),
887 accounting_standards: Default::default(),
888 audit_standards: Default::default(),
889 distributions: Default::default(),
890 temporal_patterns: Default::default(),
891 vendor_network: Default::default(),
892 customer_segmentation: Default::default(),
893 relationship_strength: Default::default(),
894 cross_process_links: Default::default(),
895 organizational_events: Default::default(),
896 behavioral_drift: Default::default(),
897 market_drift: Default::default(),
898 drift_labeling: Default::default(),
899 anomaly_injection: Default::default(),
900 industry_specific: Default::default(),
901 fingerprint_privacy: Default::default(),
902 quality_gates: Default::default(),
903 compliance: Default::default(),
904 webhooks: Default::default(),
905 llm: Default::default(),
906 diffusion: Default::default(),
907 causal: Default::default(),
908 source_to_pay: Default::default(),
909 financial_reporting: Default::default(),
910 hr: Default::default(),
911 manufacturing: Default::default(),
912 sales_quotes: Default::default(),
913 tax: Default::default(),
914 treasury: Default::default(),
915 project_accounting: Default::default(),
916 esg: Default::default(),
917 country_packs: None,
918 scenarios: Default::default(),
919 session: Default::default(),
920 compliance_regulations: Default::default(),
921 }
922}
923
924#[cfg(test)]
925#[allow(clippy::unwrap_used)]
926mod tests {
927 use super::*;
928 use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;
929
930 #[tokio::test]
935 async fn test_service_creation() {
936 let config = default_generator_config();
937 let service = SynthService::new(config);
938 assert!(service.state.uptime_seconds() < 60);
940 }
941
942 #[tokio::test]
943 async fn test_service_with_state() {
944 let config = default_generator_config();
945 let state = Arc::new(ServerState::new(config));
946 let service = SynthService::with_state(Arc::clone(&state));
947
948 state.total_entries.store(100, Ordering::Relaxed);
950 assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
951 }
952
953 #[tokio::test]
958 async fn test_health_check() {
959 let config = default_generator_config();
960 let service = SynthService::new(config);
961
962 let response = service.health_check(Request::new(())).await.unwrap();
963 let health = response.into_inner();
964
965 assert!(health.healthy);
966 assert!(!health.version.is_empty());
967 }
968
969 #[tokio::test]
970 async fn test_health_check_returns_version() {
971 let config = default_generator_config();
972 let service = SynthService::new(config);
973
974 let response = service.health_check(Request::new(())).await.unwrap();
975 let health = response.into_inner();
976
977 assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
978 }
979
980 #[tokio::test]
985 async fn test_get_config() {
986 let config = default_generator_config();
987 let service = SynthService::new(config);
988
989 let response = service.get_config(Request::new(())).await.unwrap();
990 let config_response = response.into_inner();
991
992 assert!(config_response.success);
993 assert!(config_response.current_config.is_some());
994 }
995
996 #[tokio::test]
997 async fn test_get_config_returns_industry() {
998 let config = default_generator_config();
999 let service = SynthService::new(config);
1000
1001 let response = service.get_config(Request::new(())).await.unwrap();
1002 let config_response = response.into_inner();
1003 let current = config_response.current_config.unwrap();
1004
1005 assert_eq!(current.industry, "Manufacturing");
1006 }
1007
1008 #[tokio::test]
1009 async fn test_set_config() {
1010 let config = default_generator_config();
1011 let service = SynthService::new(config);
1012
1013 let new_config = GenerationConfig {
1014 industry: "retail".to_string(),
1015 start_date: "2024-06-01".to_string(),
1016 period_months: 6,
1017 seed: 42,
1018 coa_complexity: "medium".to_string(),
1019 companies: vec![],
1020 fraud_enabled: true,
1021 fraud_rate: 0.05,
1022 generate_master_data: false,
1023 generate_document_flows: false,
1024 };
1025
1026 let response = service
1027 .set_config(Request::new(ConfigRequest {
1028 config: Some(new_config),
1029 }))
1030 .await
1031 .unwrap();
1032 let config_response = response.into_inner();
1033
1034 assert!(config_response.success);
1035 }
1036
1037 #[tokio::test]
1038 async fn test_set_config_without_config_fails() {
1039 let config = default_generator_config();
1040 let service = SynthService::new(config);
1041
1042 let result = service
1043 .set_config(Request::new(ConfigRequest { config: None }))
1044 .await;
1045
1046 assert!(result.is_err());
1047 }
1048
1049 #[tokio::test]
1054 async fn test_get_metrics_initial() {
1055 let config = default_generator_config();
1056 let service = SynthService::new(config);
1057
1058 let response = service.get_metrics(Request::new(())).await.unwrap();
1059 let metrics = response.into_inner();
1060
1061 assert_eq!(metrics.total_entries_generated, 0);
1062 assert_eq!(metrics.total_anomalies_injected, 0);
1063 assert_eq!(metrics.active_streams, 0);
1064 }
1065
1066 #[tokio::test]
1067 async fn test_get_metrics_after_updates() {
1068 let config = default_generator_config();
1069 let service = SynthService::new(config);
1070
1071 service.state.total_entries.store(1000, Ordering::Relaxed);
1073 service.state.total_anomalies.store(20, Ordering::Relaxed);
1074 service.state.active_streams.store(2, Ordering::Relaxed);
1075
1076 let response = service.get_metrics(Request::new(())).await.unwrap();
1077 let metrics = response.into_inner();
1078
1079 assert_eq!(metrics.total_entries_generated, 1000);
1080 assert_eq!(metrics.total_anomalies_injected, 20);
1081 assert_eq!(metrics.active_streams, 2);
1082 }
1083
1084 #[tokio::test]
1089 async fn test_control_pause() {
1090 let config = default_generator_config();
1091 let service = SynthService::new(config);
1092
1093 let response = service
1094 .control(Request::new(ControlCommand {
1095 action: ControlAction::Pause as i32,
1096 pattern_name: None,
1097 }))
1098 .await
1099 .unwrap();
1100 let control_response = response.into_inner();
1101
1102 assert!(control_response.success);
1103 assert!(service.state.stream_paused.load(Ordering::Relaxed));
1104 }
1105
1106 #[tokio::test]
1107 async fn test_control_resume() {
1108 let config = default_generator_config();
1109 let service = SynthService::new(config);
1110
1111 service.state.stream_paused.store(true, Ordering::Relaxed);
1113
1114 let response = service
1115 .control(Request::new(ControlCommand {
1116 action: ControlAction::Resume as i32,
1117 pattern_name: None,
1118 }))
1119 .await
1120 .unwrap();
1121 let control_response = response.into_inner();
1122
1123 assert!(control_response.success);
1124 assert!(!service.state.stream_paused.load(Ordering::Relaxed));
1125 }
1126
1127 #[tokio::test]
1128 async fn test_control_stop() {
1129 let config = default_generator_config();
1130 let service = SynthService::new(config);
1131
1132 let response = service
1133 .control(Request::new(ControlCommand {
1134 action: ControlAction::Stop as i32,
1135 pattern_name: None,
1136 }))
1137 .await
1138 .unwrap();
1139 let control_response = response.into_inner();
1140
1141 assert!(control_response.success);
1142 assert!(service.state.stream_stopped.load(Ordering::Relaxed));
1143 }
1144
1145 #[test]
1150 fn test_server_state_creation() {
1151 let config = default_generator_config();
1152 let state = ServerState::new(config);
1153
1154 assert_eq!(state.total_entries.load(Ordering::Relaxed), 0);
1155 assert_eq!(state.total_anomalies.load(Ordering::Relaxed), 0);
1156 assert_eq!(state.active_streams.load(Ordering::Relaxed), 0);
1157 assert!(!state.stream_paused.load(Ordering::Relaxed));
1158 assert!(!state.stream_stopped.load(Ordering::Relaxed));
1159 }
1160
1161 #[test]
1162 fn test_server_state_uptime() {
1163 let config = default_generator_config();
1164 let state = ServerState::new(config);
1165
1166 assert!(state.uptime_seconds() < 60);
1168 }
1169
1170 #[test]
1175 fn test_default_generator_config() {
1176 let config = default_generator_config();
1177
1178 assert_eq!(config.global.industry, IndustrySector::Manufacturing);
1179 assert_eq!(config.global.period_months, 12);
1180 assert!(!config.companies.is_empty());
1181 assert_eq!(config.companies[0].code, "1000");
1182 }
1183
1184 #[test]
1185 fn test_config_to_proto() {
1186 let config = default_generator_config();
1187 let proto = SynthService::config_to_proto(&config);
1188
1189 assert_eq!(proto.industry, "Manufacturing");
1190 assert_eq!(proto.period_months, 12);
1191 assert!(!proto.companies.is_empty());
1192 }
1193
1194 #[tokio::test]
1195 async fn test_proto_to_config_with_none() {
1196 let config = default_generator_config();
1197 let service = SynthService::new(config.clone());
1198
1199 let result = service.proto_to_config(None).await.unwrap();
1200
1201 assert_eq!(result.global.industry, config.global.industry);
1203 }
1204
1205 #[tokio::test]
1206 async fn test_proto_to_config_with_retail() {
1207 let config = default_generator_config();
1208 let service = SynthService::new(config);
1209
1210 let proto = GenerationConfig {
1211 industry: "retail".to_string(),
1212 start_date: "2024-01-01".to_string(),
1213 period_months: 6,
1214 seed: 0,
1215 coa_complexity: "large".to_string(),
1216 companies: vec![],
1217 fraud_enabled: false,
1218 fraud_rate: 0.0,
1219 generate_master_data: false,
1220 generate_document_flows: false,
1221 };
1222
1223 let result = service.proto_to_config(Some(proto)).await.unwrap();
1224
1225 assert_eq!(result.global.industry, IndustrySector::Retail);
1226 assert_eq!(result.chart_of_accounts.complexity, CoAComplexity::Large);
1227 }
1228
1229 #[tokio::test]
1230 async fn test_proto_to_config_with_healthcare() {
1231 let config = default_generator_config();
1232 let service = SynthService::new(config);
1233
1234 let proto = GenerationConfig {
1235 industry: "healthcare".to_string(),
1236 start_date: "2024-01-01".to_string(),
1237 period_months: 12,
1238 seed: 42,
1239 coa_complexity: "small".to_string(),
1240 companies: vec![],
1241 fraud_enabled: true,
1242 fraud_rate: 0.1,
1243 generate_master_data: true,
1244 generate_document_flows: true,
1245 };
1246
1247 let result = service.proto_to_config(Some(proto)).await.unwrap();
1248
1249 assert_eq!(result.global.industry, IndustrySector::Healthcare);
1250 assert_eq!(result.global.seed, Some(42));
1251 assert!(result.fraud.enabled);
1252 assert!((result.fraud.fraud_rate - 0.1).abs() < 0.001);
1253 }
1254
1255 #[tokio::test]
1256 async fn test_proto_to_config_with_companies() {
1257 let config = default_generator_config();
1258 let service = SynthService::new(config);
1259
1260 let proto = GenerationConfig {
1261 industry: "technology".to_string(),
1262 start_date: "2024-01-01".to_string(),
1263 period_months: 12,
1264 seed: 0,
1265 coa_complexity: "medium".to_string(),
1266 companies: vec![
1267 CompanyConfigProto {
1268 code: "1000".to_string(),
1269 name: "Parent Corp".to_string(),
1270 currency: "USD".to_string(),
1271 country: "US".to_string(),
1272 annual_transaction_volume: 100000,
1273 volume_weight: 1.0,
1274 },
1275 CompanyConfigProto {
1276 code: "2000".to_string(),
1277 name: "EU Sub".to_string(),
1278 currency: "EUR".to_string(),
1279 country: "DE".to_string(),
1280 annual_transaction_volume: 50000,
1281 volume_weight: 0.5,
1282 },
1283 ],
1284 fraud_enabled: false,
1285 fraud_rate: 0.0,
1286 generate_master_data: false,
1287 generate_document_flows: false,
1288 };
1289
1290 let result = service.proto_to_config(Some(proto)).await.unwrap();
1291
1292 assert_eq!(result.companies.len(), 2);
1293 assert_eq!(result.companies[0].code, "1000");
1294 assert_eq!(result.companies[1].currency, "EUR");
1295 }
1296
1297 #[tokio::test]
1302 async fn test_bulk_generate_entry_count_validation() {
1303 let config = default_generator_config();
1304 let service = SynthService::new(config);
1305
1306 let request = BulkGenerateRequest {
1307 entry_count: 2_000_000, include_master_data: false,
1309 inject_anomalies: false,
1310 output_format: 0,
1311 config: None,
1312 };
1313
1314 let result = service.bulk_generate(Request::new(request)).await;
1315 assert!(result.is_err());
1316 let err = result.err().unwrap();
1317 assert!(err.message().contains("exceeds maximum allowed value"));
1318 }
1319
1320 #[tokio::test]
1321 async fn test_stream_data_events_per_second_too_low() {
1322 let config = default_generator_config();
1323 let service = SynthService::new(config);
1324
1325 let request = StreamDataRequest {
1326 events_per_second: 0, max_events: 100,
1328 inject_anomalies: false,
1329 anomaly_rate: 0.0,
1330 config: None,
1331 };
1332
1333 let result = service.stream_data(Request::new(request)).await;
1334 assert!(result.is_err());
1335 let err = result.err().unwrap();
1336 assert!(err.message().contains("must be at least"));
1337 }
1338
1339 #[tokio::test]
1340 async fn test_stream_data_events_per_second_too_high() {
1341 let config = default_generator_config();
1342 let service = SynthService::new(config);
1343
1344 let request = StreamDataRequest {
1345 events_per_second: 20_000, max_events: 100,
1347 inject_anomalies: false,
1348 anomaly_rate: 0.0,
1349 config: None,
1350 };
1351
1352 let result = service.stream_data(Request::new(request)).await;
1353 assert!(result.is_err());
1354 let err = result.err().unwrap();
1355 assert!(err.message().contains("exceeds maximum allowed value"));
1356 }
1357
1358 #[tokio::test]
1359 async fn test_stream_data_max_events_too_high() {
1360 let config = default_generator_config();
1361 let service = SynthService::new(config);
1362
1363 let request = StreamDataRequest {
1364 events_per_second: 100,
1365 max_events: 100_000_000, inject_anomalies: false,
1367 anomaly_rate: 0.0,
1368 config: None,
1369 };
1370
1371 let result = service.stream_data(Request::new(request)).await;
1372 assert!(result.is_err());
1373 let err = result.err().unwrap();
1374 assert!(err.message().contains("max_events"));
1375 }
1376
1377 #[tokio::test]
1378 async fn test_stream_data_valid_request() {
1379 let config = default_generator_config();
1380 let service = SynthService::new(config);
1381
1382 let request = StreamDataRequest {
1383 events_per_second: 10,
1384 max_events: 5,
1385 inject_anomalies: false,
1386 anomaly_rate: 0.0,
1387 config: None,
1388 };
1389
1390 let result = service.stream_data(Request::new(request)).await;
1393 assert!(result.is_ok());
1394 }
1395}