1use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17 ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18 TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
/// Shared state of the gRPC server: configuration, counters, stream control
/// flags and the resource guard.
///
/// Counters and flags are atomics and the configuration sits behind async
/// `RwLock`s; the service holds this state in an `Arc` (see `SynthService::state`).
pub struct ServerState {
    /// Active generator configuration; read by `get_config`, replaced by `set_config`.
    pub config: RwLock<GeneratorConfig>,
    /// Origin of the active configuration; initialised to `ConfigSource::Default`.
    pub config_source: RwLock<crate::config_loader::ConfigSource>,
    /// Moment this state was created; `uptime_seconds` measures from here.
    start_time: Instant,
    /// Running total of journal entries produced by bulk and streaming generation.
    pub total_entries: AtomicU64,
    /// Running total of anomaly labels produced by bulk generation.
    pub total_anomalies: AtomicU64,
    /// Number of streaming tasks currently running; `check_resources` compares it
    /// against `max_concurrent_generations`.
    pub active_streams: AtomicU64,
    /// Running total of events emitted by `stream_data`.
    pub total_stream_events: AtomicU64,
    /// Set by the `Pause` control action; cleared by `Resume` and whenever a new stream starts.
    pub stream_paused: AtomicBool,
    /// Set by the `Stop` control action; cleared whenever a new stream starts.
    pub stream_stopped: AtomicBool,
    /// Initialised to 0. NOTE(review): never read in the visible part of this file —
    /// presumably a runtime override for the stream rate; confirm against other users.
    pub stream_events_per_second: AtomicU64,
    /// Initialised to 0. NOTE(review): never read in the visible part of this file —
    /// presumably a runtime override for the per-stream event cap; confirm.
    pub stream_max_events: AtomicU64,
    /// Initialised to `false`. NOTE(review): never read in the visible part of this
    /// file — presumably toggles anomaly injection for streams; confirm.
    pub stream_inject_anomalies: AtomicBool,
    /// Pattern name stored by the `TriggerPattern` control action, if any.
    pub triggered_pattern: RwLock<Option<String>>,
    /// Guard consulted by `check_resources` and `resource_status`.
    pub resource_guard: Arc<ResourceGuard>,
    /// Upper bound for `active_streams` enforced by `check_resources`; initialised to 4.
    max_concurrent_generations: AtomicU64,
}
59
60impl ServerState {
61 pub fn new(config: GeneratorConfig) -> Self {
62 let memory_limit = config.global.memory_limit_mb;
64 let resource_guard = if memory_limit > 0 {
65 ResourceGuardBuilder::new()
66 .memory_limit(memory_limit)
67 .conservative()
68 .build()
69 } else {
70 ResourceGuardBuilder::new()
72 .memory_limit(2048)
73 .conservative()
74 .build()
75 };
76
77 Self {
78 config: RwLock::new(config),
79 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
80 start_time: Instant::now(),
81 total_entries: AtomicU64::new(0),
82 total_anomalies: AtomicU64::new(0),
83 active_streams: AtomicU64::new(0),
84 total_stream_events: AtomicU64::new(0),
85 stream_paused: AtomicBool::new(false),
86 stream_stopped: AtomicBool::new(false),
87 stream_events_per_second: AtomicU64::new(0),
88 stream_max_events: AtomicU64::new(0),
89 stream_inject_anomalies: AtomicBool::new(false),
90 triggered_pattern: RwLock::new(None),
91 resource_guard: Arc::new(resource_guard),
92 max_concurrent_generations: AtomicU64::new(4),
93 }
94 }
95
96 pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
98 let resource_guard = ResourceGuardBuilder::new()
99 .memory_limit(memory_limit_mb)
100 .conservative()
101 .build();
102
103 Self {
104 config: RwLock::new(config),
105 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
106 start_time: Instant::now(),
107 total_entries: AtomicU64::new(0),
108 total_anomalies: AtomicU64::new(0),
109 active_streams: AtomicU64::new(0),
110 total_stream_events: AtomicU64::new(0),
111 stream_paused: AtomicBool::new(false),
112 stream_stopped: AtomicBool::new(false),
113 stream_events_per_second: AtomicU64::new(0),
114 stream_max_events: AtomicU64::new(0),
115 stream_inject_anomalies: AtomicBool::new(false),
116 triggered_pattern: RwLock::new(None),
117 resource_guard: Arc::new(resource_guard),
118 max_concurrent_generations: AtomicU64::new(4),
119 }
120 }
121
122 pub fn uptime_seconds(&self) -> u64 {
123 self.start_time.elapsed().as_secs()
124 }
125
126 #[allow(clippy::result_large_err)] pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
129 let active = self.active_streams.load(Ordering::Relaxed);
131 let max = self.max_concurrent_generations.load(Ordering::Relaxed);
132 if active >= max {
133 return Err(Status::resource_exhausted(format!(
134 "Too many concurrent generations ({active}/{max}). Try again later."
135 )));
136 }
137
138 match self.resource_guard.check() {
140 Ok(level) => {
141 if level == DegradationLevel::Emergency {
142 Err(Status::resource_exhausted(
143 "Server resources critically low. Generation not possible.",
144 ))
145 } else if level == DegradationLevel::Minimal {
146 warn!("Resources constrained, generation may be limited");
147 Ok(level)
148 } else {
149 Ok(level)
150 }
151 }
152 Err(e) => Err(Status::resource_exhausted(format!(
153 "Resource check failed: {e}"
154 ))),
155 }
156 }
157
158 pub fn resource_status(&self) -> ResourceStatus {
160 let stats = self.resource_guard.stats();
161 ResourceStatus {
162 memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
163 memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
164 disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
165 degradation_level: stats.degradation_level.name().to_string(),
166 active_generations: self.active_streams.load(Ordering::Relaxed),
167 }
168 }
169}
170
/// Point-in-time resource summary produced by `ServerState::resource_status`.
#[derive(Debug, Clone)]
pub struct ResourceStatus {
    /// Resident memory, in whole megabytes (bytes / 1024 / 1024).
    pub memory_usage_mb: u64,
    /// Peak resident memory, in whole megabytes.
    pub memory_peak_mb: u64,
    /// Available disk space, in whole megabytes.
    pub disk_available_mb: u64,
    /// Name of the current degradation level as reported by the resource guard.
    pub degradation_level: String,
    /// Value of `ServerState::active_streams` at the time of the snapshot.
    pub active_generations: u64,
}
180
/// gRPC service implementation; every handler works against the shared `state`.
pub struct SynthService {
    /// Shared server state (configuration, counters, stream control flags).
    pub state: Arc<ServerState>,
}
185
186impl SynthService {
187 pub fn new(config: GeneratorConfig) -> Self {
188 Self {
189 state: Arc::new(ServerState::new(config)),
190 }
191 }
192
193 pub fn with_state(state: Arc<ServerState>) -> Self {
194 Self { state }
195 }
196
197 async fn proto_to_config(
199 &self,
200 proto: Option<GenerationConfig>,
201 ) -> Result<GeneratorConfig, Status> {
202 match proto {
203 Some(p) => {
204 let industry = match p.industry.to_lowercase().as_str() {
205 "manufacturing" => IndustrySector::Manufacturing,
206 "retail" => IndustrySector::Retail,
207 "financial_services" | "financial" => IndustrySector::FinancialServices,
208 "healthcare" => IndustrySector::Healthcare,
209 "technology" => IndustrySector::Technology,
210 "" => IndustrySector::Manufacturing, other => {
212 return Err(Status::invalid_argument(format!(
213 "Unknown industry '{other}'. Valid values: manufacturing, retail, financial_services, healthcare, technology"
214 )));
215 }
216 };
217
218 let complexity = match p.coa_complexity.to_lowercase().as_str() {
219 "small" => CoAComplexity::Small,
220 "medium" => CoAComplexity::Medium,
221 "large" => CoAComplexity::Large,
222 "" => CoAComplexity::Small, other => {
224 return Err(Status::invalid_argument(format!(
225 "Unknown coa_complexity '{other}'. Valid values: small, medium, large"
226 )));
227 }
228 };
229
230 let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
231 vec![CompanyConfig {
232 code: "1000".to_string(),
233 name: "Default Company".to_string(),
234 currency: "USD".to_string(),
235 functional_currency: None,
236 country: "US".to_string(),
237 annual_transaction_volume: TransactionVolume::TenK,
238 volume_weight: 1.0,
239 fiscal_year_variant: "K4".to_string(),
240 }]
241 } else {
242 p.companies
243 .into_iter()
244 .map(|c| CompanyConfig {
245 code: c.code,
246 name: c.name,
247 currency: c.currency,
248 functional_currency: None,
249 country: c.country,
250 annual_transaction_volume: TransactionVolume::Custom(
251 c.annual_transaction_volume,
252 ),
253 volume_weight: c.volume_weight as f64,
254 fiscal_year_variant: "K4".to_string(),
255 })
256 .collect()
257 };
258
259 let mut config = GeneratorConfig {
260 global: GlobalConfig {
261 seed: if p.seed > 0 { Some(p.seed) } else { None },
262 industry,
263 start_date: if p.start_date.is_empty() {
264 "2024-01-01".to_string()
265 } else {
266 p.start_date
267 },
268 period_months: if p.period_months == 0 {
269 12
270 } else {
271 p.period_months
272 },
273 group_currency: "USD".to_string(),
274 presentation_currency: None,
275 parallel: true,
276 worker_threads: 0,
277 memory_limit_mb: 0,
278 fiscal_year_months: None,
279 },
280 companies,
281 chart_of_accounts: ChartOfAccountsConfig {
282 complexity,
283 industry_specific: true,
284 custom_accounts: None,
285 min_hierarchy_depth: 2,
286 max_hierarchy_depth: 5,
287 },
288 ..default_generator_config()
289 };
290
291 if p.fraud_enabled {
293 config.fraud.enabled = true;
294 config.fraud.fraud_rate = p.fraud_rate as f64;
295 }
296
297 Ok(config)
298 }
299 None => {
300 let config = self.state.config.read().await;
302 Ok(config.clone())
303 }
304 }
305 }
306
307 fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
309 JournalEntryProto {
310 document_id: entry.header.document_id.to_string(),
311 company_code: entry.header.company_code.clone(),
312 fiscal_year: entry.header.fiscal_year as u32,
313 fiscal_period: entry.header.fiscal_period as u32,
314 posting_date: entry.header.posting_date.to_string(),
315 document_date: entry.header.document_date.to_string(),
316 created_at: entry.header.created_at.to_rfc3339(),
317 source: format!("{:?}", entry.header.source),
318 business_process: entry.header.business_process.map(|bp| format!("{bp:?}")),
319 lines: entry
320 .lines
321 .iter()
322 .map(|line| {
323 let amount = if line.is_debit() {
324 line.debit_amount
325 } else {
326 line.credit_amount
327 };
328 JournalLineProto {
329 line_number: line.line_number,
330 account_number: line.gl_account.clone(),
331 account_name: line.account_description.clone().unwrap_or_default(),
332 amount: amount.to_string(),
333 is_debit: line.is_debit(),
334 cost_center: line.cost_center.clone(),
335 profit_center: line.profit_center.clone(),
336 vendor_id: line.auxiliary_account_number.clone(),
339 customer_id: None,
340 material_id: None,
341 text: line.line_text.clone().or_else(|| line.text.clone()),
342 }
343 })
344 .collect(),
345 is_anomaly: entry.header.is_fraud,
346 anomaly_type: entry.header.fraud_type.map(|ft| format!("{ft:?}")),
347 }
348 }
349
350 fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
352 GenerationConfig {
353 industry: format!("{:?}", config.global.industry),
354 start_date: config.global.start_date.clone(),
355 period_months: config.global.period_months,
356 seed: config.global.seed.unwrap_or(0),
357 coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
358 companies: config
359 .companies
360 .iter()
361 .map(|c| CompanyConfigProto {
362 code: c.code.clone(),
363 name: c.name.clone(),
364 currency: c.currency.clone(),
365 country: c.country.clone(),
366 annual_transaction_volume: c.annual_transaction_volume.count(),
367 volume_weight: c.volume_weight as f32,
368 })
369 .collect(),
370 fraud_enabled: config.fraud.enabled,
371 fraud_rate: config.fraud.fraud_rate as f32,
372 generate_master_data: config.master_data.vendors.count > 0
373 || config.master_data.customers.count > 0
374 || config.master_data.materials.count > 0,
375 generate_document_flows: config.document_flows.p2p.enabled
376 || config.document_flows.o2c.enabled,
377 }
378 }
379}
380
381#[tonic::async_trait]
382impl synthetic_data_service_server::SyntheticDataService for SynthService {
383 async fn bulk_generate(
385 &self,
386 request: Request<BulkGenerateRequest>,
387 ) -> Result<Response<BulkGenerateResponse>, Status> {
388 let req = request.into_inner();
389
390 const MAX_ENTRY_COUNT: u64 = 1_000_000;
392 if req.entry_count > MAX_ENTRY_COUNT {
393 return Err(Status::invalid_argument(format!(
394 "entry_count ({}) exceeds maximum allowed value ({})",
395 req.entry_count, MAX_ENTRY_COUNT
396 )));
397 }
398
399 let degradation_level = self.state.check_resources()?;
401 if degradation_level != DegradationLevel::Normal {
402 warn!(
403 "Starting bulk generation under resource pressure (level: {:?})",
404 degradation_level
405 );
406 }
407
408 info!("Bulk generate request: {} entries", req.entry_count);
409
410 let config = self.proto_to_config(req.config).await?;
411 let start_time = Instant::now();
412
413 let phase_config = {
415 let mut pc = PhaseConfig::from_config(&config);
416 pc.generate_master_data = req.include_master_data;
417 pc.generate_document_flows = false;
418 pc.generate_journal_entries = true;
419 pc.inject_anomalies = req.inject_anomalies;
420 pc.show_progress = false;
421 pc
422 };
423
424 let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
425 .map_err(|e| Status::internal(format!("Failed to create orchestrator: {e}")))?;
426
427 let result = orchestrator
428 .generate()
429 .map_err(|e| Status::internal(format!("Generation failed: {e}")))?;
430
431 let duration_ms = start_time.elapsed().as_millis() as u64;
432
433 let entries_count = result.journal_entries.len() as u64;
435 self.state
436 .total_entries
437 .fetch_add(entries_count, Ordering::Relaxed);
438
439 let anomaly_count = result.anomaly_labels.labels.len() as u64;
440 self.state
441 .total_anomalies
442 .fetch_add(anomaly_count, Ordering::Relaxed);
443
444 let journal_entries: Vec<JournalEntryProto> = result
446 .journal_entries
447 .iter()
448 .map(Self::journal_entry_to_proto)
449 .collect();
450
451 let anomaly_labels: Vec<AnomalyLabelProto> = result
452 .anomaly_labels
453 .labels
454 .iter()
455 .map(|a| AnomalyLabelProto {
456 anomaly_id: a.anomaly_id.clone(),
457 document_id: a.document_id.clone(),
458 anomaly_type: format!("{:?}", a.anomaly_type),
459 anomaly_category: a.document_type.clone(),
460 description: a.description.clone(),
461 severity_score: a.severity as f32,
462 })
463 .collect();
464
465 let mut total_debit = rust_decimal::Decimal::ZERO;
467 let mut total_credit = rust_decimal::Decimal::ZERO;
468 let mut total_lines = 0u64;
469 let mut entries_by_company = std::collections::HashMap::new();
470 let mut entries_by_source = std::collections::HashMap::new();
471
472 for entry in &result.journal_entries {
473 *entries_by_company
474 .entry(entry.header.company_code.clone())
475 .or_insert(0u64) += 1;
476 *entries_by_source
477 .entry(format!("{:?}", entry.header.source))
478 .or_insert(0u64) += 1;
479
480 for line in &entry.lines {
481 total_lines += 1;
482 total_debit += line.debit_amount;
483 total_credit += line.credit_amount;
484 }
485 }
486
487 let stats = GenerationStats {
488 total_entries: entries_count,
489 total_lines,
490 total_debit_amount: total_debit.to_string(),
491 total_credit_amount: total_credit.to_string(),
492 anomaly_count,
493 entries_by_company,
494 entries_by_source,
495 };
496
497 info!(
498 "Bulk generation complete: {} entries in {}ms",
499 entries_count, duration_ms
500 );
501
502 Ok(Response::new(BulkGenerateResponse {
503 entries_generated: entries_count,
504 duration_ms,
505 journal_entries,
506 anomaly_labels,
507 stats: Some(stats),
508 }))
509 }
510
    /// Response stream type of `stream_data`: a boxed, pinned, `Send + 'static`
    /// stream yielding `Result<DataEvent, Status>` items.
    type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
512
513 async fn stream_data(
515 &self,
516 request: Request<StreamDataRequest>,
517 ) -> Result<Response<Self::StreamDataStream>, Status> {
518 let req = request.into_inner();
519
520 const MIN_EVENTS_PER_SECOND: u32 = 1;
522 const MAX_EVENTS_PER_SECOND: u32 = 10_000;
523 if req.events_per_second < MIN_EVENTS_PER_SECOND {
524 return Err(Status::invalid_argument(format!(
525 "events_per_second ({}) must be at least {}",
526 req.events_per_second, MIN_EVENTS_PER_SECOND
527 )));
528 }
529 if req.events_per_second > MAX_EVENTS_PER_SECOND {
530 return Err(Status::invalid_argument(format!(
531 "events_per_second ({}) exceeds maximum allowed value ({})",
532 req.events_per_second, MAX_EVENTS_PER_SECOND
533 )));
534 }
535
536 const MAX_STREAM_EVENTS: u64 = 10_000_000;
538 if req.max_events > MAX_STREAM_EVENTS {
539 return Err(Status::invalid_argument(format!(
540 "max_events ({}) exceeds maximum allowed value ({})",
541 req.max_events, MAX_STREAM_EVENTS
542 )));
543 }
544
545 let degradation_level = self.state.check_resources()?;
547 if degradation_level != DegradationLevel::Normal {
548 warn!(
549 "Starting stream under resource pressure (level: {:?})",
550 degradation_level
551 );
552 }
553
554 info!(
555 "Stream data request: {} events/sec, max {}",
556 req.events_per_second, req.max_events
557 );
558
559 let config = self.proto_to_config(req.config).await?;
560 let state = self.state.clone();
561
562 state.active_streams.fetch_add(1, Ordering::Relaxed);
564
565 state.stream_paused.store(false, Ordering::Relaxed);
567 state.stream_stopped.store(false, Ordering::Relaxed);
568
569 let (tx, rx) = mpsc::channel(100);
570
571 let events_per_second = req.events_per_second;
573 let max_events = req.max_events;
574 let inject_anomalies = req.inject_anomalies;
575
576 tokio::spawn(async move {
577 let phase_config = {
578 let mut pc = PhaseConfig::from_config(&config);
579 pc.generate_master_data = false;
580 pc.generate_document_flows = false;
581 pc.generate_journal_entries = true;
582 pc.inject_anomalies = inject_anomalies;
583 pc.show_progress = false;
584 pc
585 };
586
587 let mut sequence = 0u64;
588 let delay = if events_per_second > 0 {
589 Duration::from_micros(1_000_000 / events_per_second as u64)
590 } else {
591 Duration::from_millis(1)
592 };
593
594 let mut orchestrator =
596 match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
597 Ok(o) => o,
598 Err(e) => {
599 error!("Failed to create orchestrator: {}", e);
600 return;
601 }
602 };
603
604 loop {
605 if state.stream_stopped.load(Ordering::Relaxed) {
607 info!("Stream stopped by control command");
608 break;
609 }
610
611 while state.stream_paused.load(Ordering::Relaxed) {
613 tokio::time::sleep(Duration::from_millis(100)).await;
614 if state.stream_stopped.load(Ordering::Relaxed) {
615 break;
616 }
617 }
618
619 if max_events > 0 && sequence >= max_events {
621 info!("Stream reached max events: {}", max_events);
622 break;
623 }
624
625 let result = match orchestrator.generate() {
627 Ok(r) => r,
628 Err(e) => {
629 error!("Generation failed: {}", e);
630 break;
631 }
632 };
633
634 for entry in result.journal_entries {
636 sequence += 1;
637 state.total_stream_events.fetch_add(1, Ordering::Relaxed);
638 state.total_entries.fetch_add(1, Ordering::Relaxed);
639
640 let timestamp = Timestamp {
641 seconds: Utc::now().timestamp(),
642 nanos: 0,
643 };
644
645 let event = DataEvent {
646 sequence,
647 timestamp: Some(timestamp),
648 event: Some(data_event::Event::JournalEntry(
649 SynthService::journal_entry_to_proto(&entry),
650 )),
651 };
652
653 if tx.send(Ok(event)).await.is_err() {
654 info!("Stream receiver dropped");
655 break;
656 }
657
658 tokio::time::sleep(delay).await;
660
661 if max_events > 0 && sequence >= max_events {
663 break;
664 }
665 }
666 }
667
668 state.active_streams.fetch_sub(1, Ordering::Relaxed);
670 });
671
672 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
673 }
674
675 async fn control(
677 &self,
678 request: Request<ControlCommand>,
679 ) -> Result<Response<ControlResponse>, Status> {
680 let cmd = request.into_inner();
681 let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
682
683 info!("Control command: {:?}", action);
684
685 let (success, message, status) = match action {
686 ControlAction::Pause => {
687 self.state.stream_paused.store(true, Ordering::Relaxed);
688 (true, "Stream paused".to_string(), StreamStatus::Paused)
689 }
690 ControlAction::Resume => {
691 self.state.stream_paused.store(false, Ordering::Relaxed);
692 (true, "Stream resumed".to_string(), StreamStatus::Running)
693 }
694 ControlAction::Stop => {
695 self.state.stream_stopped.store(true, Ordering::Relaxed);
696 (true, "Stream stopped".to_string(), StreamStatus::Stopped)
697 }
698 ControlAction::TriggerPattern => {
699 let pattern = cmd.pattern_name.unwrap_or_default();
700 if pattern.is_empty() {
701 (
702 false,
703 "Pattern name is required for TriggerPattern action".to_string(),
704 StreamStatus::Running,
705 )
706 } else {
707 let valid_patterns = [
710 "year_end_spike",
711 "period_end_spike",
712 "holiday_cluster",
713 "fraud_cluster",
714 "error_cluster",
715 "uniform",
716 ];
717 let is_valid = valid_patterns.contains(&pattern.as_str())
718 || pattern.starts_with("custom:");
719
720 if is_valid {
721 if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
723 *triggered = Some(pattern.clone());
724 }
725 info!("Pattern trigger activated: {}", pattern);
726 (
727 true,
728 format!("Pattern '{pattern}' will be applied to upcoming entries"),
729 StreamStatus::Running,
730 )
731 } else {
732 (
733 false,
734 format!(
735 "Unknown pattern '{pattern}'. Valid patterns: {valid_patterns:?}"
736 ),
737 StreamStatus::Running,
738 )
739 }
740 }
741 }
742 ControlAction::Unspecified => (
743 false,
744 "Unknown control action".to_string(),
745 StreamStatus::Unspecified,
746 ),
747 };
748
749 Ok(Response::new(ControlResponse {
750 success,
751 message,
752 current_status: status as i32,
753 }))
754 }
755
756 async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
758 let config = self.state.config.read().await;
759 let proto_config = Self::config_to_proto(&config);
760
761 Ok(Response::new(ConfigResponse {
762 success: true,
763 message: "Current configuration retrieved".to_string(),
764 current_config: Some(proto_config),
765 }))
766 }
767
768 async fn set_config(
770 &self,
771 request: Request<ConfigRequest>,
772 ) -> Result<Response<ConfigResponse>, Status> {
773 let req = request.into_inner();
774
775 if let Some(proto_config) = req.config {
776 let new_config = self.proto_to_config(Some(proto_config)).await?;
777
778 let mut config = self.state.config.write().await;
779 *config = new_config.clone();
780
781 info!("Configuration updated");
782
783 Ok(Response::new(ConfigResponse {
784 success: true,
785 message: "Configuration updated".to_string(),
786 current_config: Some(Self::config_to_proto(&new_config)),
787 }))
788 } else {
789 Err(Status::invalid_argument("No configuration provided"))
790 }
791 }
792
793 async fn get_metrics(
795 &self,
796 _request: Request<()>,
797 ) -> Result<Response<MetricsResponse>, Status> {
798 let uptime = self.state.uptime_seconds();
799 let total_entries = self.state.total_entries.load(Ordering::Relaxed);
800
801 let entries_per_second = if uptime > 0 {
802 total_entries as f64 / uptime as f64
803 } else {
804 0.0
805 };
806
807 Ok(Response::new(MetricsResponse {
808 total_entries_generated: total_entries,
809 total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
810 uptime_seconds: uptime,
811 session_entries: total_entries,
812 session_entries_per_second: entries_per_second,
813 active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
814 total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
815 }))
816 }
817
818 async fn health_check(
820 &self,
821 _request: Request<()>,
822 ) -> Result<Response<HealthResponse>, Status> {
823 Ok(Response::new(HealthResponse {
824 healthy: true,
825 version: env!("CARGO_PKG_VERSION").to_string(),
826 uptime_seconds: self.state.uptime_seconds(),
827 }))
828 }
829}
830
831pub fn default_generator_config() -> GeneratorConfig {
833 GeneratorConfig {
834 global: GlobalConfig {
835 seed: None,
836 industry: IndustrySector::Manufacturing,
837 start_date: "2024-01-01".to_string(),
838 period_months: 12,
839 group_currency: "USD".to_string(),
840 presentation_currency: None,
841 parallel: true,
842 worker_threads: 0,
843 memory_limit_mb: 0,
844 fiscal_year_months: None,
845 },
846 companies: vec![CompanyConfig {
847 code: "1000".to_string(),
848 name: "Default Company".to_string(),
849 currency: "USD".to_string(),
850 functional_currency: None,
851 country: "US".to_string(),
852 annual_transaction_volume: TransactionVolume::TenK,
853 volume_weight: 1.0,
854 fiscal_year_variant: "K4".to_string(),
855 }],
856 chart_of_accounts: ChartOfAccountsConfig {
857 complexity: CoAComplexity::Small,
858 industry_specific: true,
859 custom_accounts: None,
860 min_hierarchy_depth: 2,
861 max_hierarchy_depth: 5,
862 },
863 transactions: Default::default(),
864 output: OutputConfig::default(),
865 fraud: Default::default(),
866 internal_controls: Default::default(),
867 business_processes: Default::default(),
868 user_personas: Default::default(),
869 templates: Default::default(),
870 approval: Default::default(),
871 departments: Default::default(),
872 master_data: Default::default(),
873 document_flows: Default::default(),
874 intercompany: Default::default(),
875 balance: Default::default(),
876 ocpm: Default::default(),
877 audit: Default::default(),
878 banking: Default::default(),
879 data_quality: Default::default(),
880 scenario: Default::default(),
881 temporal: Default::default(),
882 graph_export: Default::default(),
883 streaming: Default::default(),
884 rate_limit: Default::default(),
885 temporal_attributes: Default::default(),
886 relationships: Default::default(),
887 accounting_standards: Default::default(),
888 audit_standards: Default::default(),
889 distributions: Default::default(),
890 temporal_patterns: Default::default(),
891 vendor_network: Default::default(),
892 customer_segmentation: Default::default(),
893 relationship_strength: Default::default(),
894 cross_process_links: Default::default(),
895 organizational_events: Default::default(),
896 behavioral_drift: Default::default(),
897 market_drift: Default::default(),
898 drift_labeling: Default::default(),
899 anomaly_injection: Default::default(),
900 industry_specific: Default::default(),
901 fingerprint_privacy: Default::default(),
902 quality_gates: Default::default(),
903 compliance: Default::default(),
904 webhooks: Default::default(),
905 llm: Default::default(),
906 diffusion: Default::default(),
907 causal: Default::default(),
908 source_to_pay: Default::default(),
909 financial_reporting: Default::default(),
910 hr: Default::default(),
911 manufacturing: Default::default(),
912 sales_quotes: Default::default(),
913 tax: Default::default(),
914 treasury: Default::default(),
915 project_accounting: Default::default(),
916 esg: Default::default(),
917 country_packs: None,
918 scenarios: Default::default(),
919 session: Default::default(),
920 compliance_regulations: Default::default(),
921 analytics_metadata: Default::default(),
922 }
923}
924
925#[cfg(test)]
926#[allow(clippy::unwrap_used)]
927mod tests {
928 use super::*;
929 use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;
930
931 #[tokio::test]
936 async fn test_service_creation() {
937 let config = default_generator_config();
938 let service = SynthService::new(config);
939 assert!(service.state.uptime_seconds() < 60);
941 }
942
943 #[tokio::test]
944 async fn test_service_with_state() {
945 let config = default_generator_config();
946 let state = Arc::new(ServerState::new(config));
947 let service = SynthService::with_state(Arc::clone(&state));
948
949 state.total_entries.store(100, Ordering::Relaxed);
951 assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
952 }
953
954 #[tokio::test]
959 async fn test_health_check() {
960 let config = default_generator_config();
961 let service = SynthService::new(config);
962
963 let response = service.health_check(Request::new(())).await.unwrap();
964 let health = response.into_inner();
965
966 assert!(health.healthy);
967 assert!(!health.version.is_empty());
968 }
969
970 #[tokio::test]
971 async fn test_health_check_returns_version() {
972 let config = default_generator_config();
973 let service = SynthService::new(config);
974
975 let response = service.health_check(Request::new(())).await.unwrap();
976 let health = response.into_inner();
977
978 assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
979 }
980
981 #[tokio::test]
986 async fn test_get_config() {
987 let config = default_generator_config();
988 let service = SynthService::new(config);
989
990 let response = service.get_config(Request::new(())).await.unwrap();
991 let config_response = response.into_inner();
992
993 assert!(config_response.success);
994 assert!(config_response.current_config.is_some());
995 }
996
997 #[tokio::test]
998 async fn test_get_config_returns_industry() {
999 let config = default_generator_config();
1000 let service = SynthService::new(config);
1001
1002 let response = service.get_config(Request::new(())).await.unwrap();
1003 let config_response = response.into_inner();
1004 let current = config_response.current_config.unwrap();
1005
1006 assert_eq!(current.industry, "Manufacturing");
1007 }
1008
1009 #[tokio::test]
1010 async fn test_set_config() {
1011 let config = default_generator_config();
1012 let service = SynthService::new(config);
1013
1014 let new_config = GenerationConfig {
1015 industry: "retail".to_string(),
1016 start_date: "2024-06-01".to_string(),
1017 period_months: 6,
1018 seed: 42,
1019 coa_complexity: "medium".to_string(),
1020 companies: vec![],
1021 fraud_enabled: true,
1022 fraud_rate: 0.05,
1023 generate_master_data: false,
1024 generate_document_flows: false,
1025 };
1026
1027 let response = service
1028 .set_config(Request::new(ConfigRequest {
1029 config: Some(new_config),
1030 }))
1031 .await
1032 .unwrap();
1033 let config_response = response.into_inner();
1034
1035 assert!(config_response.success);
1036 }
1037
1038 #[tokio::test]
1039 async fn test_set_config_without_config_fails() {
1040 let config = default_generator_config();
1041 let service = SynthService::new(config);
1042
1043 let result = service
1044 .set_config(Request::new(ConfigRequest { config: None }))
1045 .await;
1046
1047 assert!(result.is_err());
1048 }
1049
1050 #[tokio::test]
1055 async fn test_get_metrics_initial() {
1056 let config = default_generator_config();
1057 let service = SynthService::new(config);
1058
1059 let response = service.get_metrics(Request::new(())).await.unwrap();
1060 let metrics = response.into_inner();
1061
1062 assert_eq!(metrics.total_entries_generated, 0);
1063 assert_eq!(metrics.total_anomalies_injected, 0);
1064 assert_eq!(metrics.active_streams, 0);
1065 }
1066
1067 #[tokio::test]
1068 async fn test_get_metrics_after_updates() {
1069 let config = default_generator_config();
1070 let service = SynthService::new(config);
1071
1072 service.state.total_entries.store(1000, Ordering::Relaxed);
1074 service.state.total_anomalies.store(20, Ordering::Relaxed);
1075 service.state.active_streams.store(2, Ordering::Relaxed);
1076
1077 let response = service.get_metrics(Request::new(())).await.unwrap();
1078 let metrics = response.into_inner();
1079
1080 assert_eq!(metrics.total_entries_generated, 1000);
1081 assert_eq!(metrics.total_anomalies_injected, 20);
1082 assert_eq!(metrics.active_streams, 2);
1083 }
1084
1085 #[tokio::test]
1090 async fn test_control_pause() {
1091 let config = default_generator_config();
1092 let service = SynthService::new(config);
1093
1094 let response = service
1095 .control(Request::new(ControlCommand {
1096 action: ControlAction::Pause as i32,
1097 pattern_name: None,
1098 }))
1099 .await
1100 .unwrap();
1101 let control_response = response.into_inner();
1102
1103 assert!(control_response.success);
1104 assert!(service.state.stream_paused.load(Ordering::Relaxed));
1105 }
1106
1107 #[tokio::test]
1108 async fn test_control_resume() {
1109 let config = default_generator_config();
1110 let service = SynthService::new(config);
1111
1112 service.state.stream_paused.store(true, Ordering::Relaxed);
1114
1115 let response = service
1116 .control(Request::new(ControlCommand {
1117 action: ControlAction::Resume as i32,
1118 pattern_name: None,
1119 }))
1120 .await
1121 .unwrap();
1122 let control_response = response.into_inner();
1123
1124 assert!(control_response.success);
1125 assert!(!service.state.stream_paused.load(Ordering::Relaxed));
1126 }
1127
1128 #[tokio::test]
1129 async fn test_control_stop() {
1130 let config = default_generator_config();
1131 let service = SynthService::new(config);
1132
1133 let response = service
1134 .control(Request::new(ControlCommand {
1135 action: ControlAction::Stop as i32,
1136 pattern_name: None,
1137 }))
1138 .await
1139 .unwrap();
1140 let control_response = response.into_inner();
1141
1142 assert!(control_response.success);
1143 assert!(service.state.stream_stopped.load(Ordering::Relaxed));
1144 }
1145
/// A freshly constructed `ServerState` must report zero for its entry,
/// anomaly and active-stream counters, with the pause and stop flags cleared.
#[test]
fn test_server_state_creation() {
    let state = ServerState::new(default_generator_config());

    // Every counter starts at zero.
    for counter in [
        &state.total_entries,
        &state.total_anomalies,
        &state.active_streams,
    ] {
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }

    // Neither stream control flag is raised initially.
    for flag in [&state.stream_paused, &state.stream_stopped] {
        assert!(!flag.load(Ordering::Relaxed));
    }
}
1161
/// The uptime reported right after constructing a `ServerState` must be
/// below 60 seconds.
#[test]
fn test_server_state_uptime() {
    let uptime = ServerState::new(default_generator_config()).uptime_seconds();

    assert!(uptime < 60);
}
1170
/// Pins the defaults produced by `default_generator_config`: Manufacturing
/// industry, a 12-month period, and a first company with code "1000".
#[test]
fn test_default_generator_config() {
    let defaults = default_generator_config();
    let global = &defaults.global;
    let companies = &defaults.companies;

    assert_eq!(global.industry, IndustrySector::Manufacturing);
    assert_eq!(global.period_months, 12);
    // Check non-emptiness first so the index below fails with a clear assert.
    assert!(!companies.is_empty());
    assert_eq!(companies[0].code, "1000");
}
1184
/// Converting the default configuration with `SynthService::config_to_proto`
/// must yield industry "Manufacturing", 12 period months and at least one
/// company.
#[test]
fn test_config_to_proto() {
    let source = default_generator_config();
    let converted = SynthService::config_to_proto(&source);

    assert_eq!(converted.industry, "Manufacturing");
    assert_eq!(converted.period_months, 12);
    assert!(!converted.companies.is_empty());
}
1194
1195 #[tokio::test]
1196 async fn test_proto_to_config_with_none() {
1197 let config = default_generator_config();
1198 let service = SynthService::new(config.clone());
1199
1200 let result = service.proto_to_config(None).await.unwrap();
1201
1202 assert_eq!(result.global.industry, config.global.industry);
1204 }
1205
1206 #[tokio::test]
1207 async fn test_proto_to_config_with_retail() {
1208 let config = default_generator_config();
1209 let service = SynthService::new(config);
1210
1211 let proto = GenerationConfig {
1212 industry: "retail".to_string(),
1213 start_date: "2024-01-01".to_string(),
1214 period_months: 6,
1215 seed: 0,
1216 coa_complexity: "large".to_string(),
1217 companies: vec![],
1218 fraud_enabled: false,
1219 fraud_rate: 0.0,
1220 generate_master_data: false,
1221 generate_document_flows: false,
1222 };
1223
1224 let result = service.proto_to_config(Some(proto)).await.unwrap();
1225
1226 assert_eq!(result.global.industry, IndustrySector::Retail);
1227 assert_eq!(result.chart_of_accounts.complexity, CoAComplexity::Large);
1228 }
1229
1230 #[tokio::test]
1231 async fn test_proto_to_config_with_healthcare() {
1232 let config = default_generator_config();
1233 let service = SynthService::new(config);
1234
1235 let proto = GenerationConfig {
1236 industry: "healthcare".to_string(),
1237 start_date: "2024-01-01".to_string(),
1238 period_months: 12,
1239 seed: 42,
1240 coa_complexity: "small".to_string(),
1241 companies: vec![],
1242 fraud_enabled: true,
1243 fraud_rate: 0.1,
1244 generate_master_data: true,
1245 generate_document_flows: true,
1246 };
1247
1248 let result = service.proto_to_config(Some(proto)).await.unwrap();
1249
1250 assert_eq!(result.global.industry, IndustrySector::Healthcare);
1251 assert_eq!(result.global.seed, Some(42));
1252 assert!(result.fraud.enabled);
1253 assert!((result.fraud.fraud_rate - 0.1).abs() < 0.001);
1254 }
1255
1256 #[tokio::test]
1257 async fn test_proto_to_config_with_companies() {
1258 let config = default_generator_config();
1259 let service = SynthService::new(config);
1260
1261 let proto = GenerationConfig {
1262 industry: "technology".to_string(),
1263 start_date: "2024-01-01".to_string(),
1264 period_months: 12,
1265 seed: 0,
1266 coa_complexity: "medium".to_string(),
1267 companies: vec![
1268 CompanyConfigProto {
1269 code: "1000".to_string(),
1270 name: "Parent Corp".to_string(),
1271 currency: "USD".to_string(),
1272 country: "US".to_string(),
1273 annual_transaction_volume: 100000,
1274 volume_weight: 1.0,
1275 },
1276 CompanyConfigProto {
1277 code: "2000".to_string(),
1278 name: "EU Sub".to_string(),
1279 currency: "EUR".to_string(),
1280 country: "DE".to_string(),
1281 annual_transaction_volume: 50000,
1282 volume_weight: 0.5,
1283 },
1284 ],
1285 fraud_enabled: false,
1286 fraud_rate: 0.0,
1287 generate_master_data: false,
1288 generate_document_flows: false,
1289 };
1290
1291 let result = service.proto_to_config(Some(proto)).await.unwrap();
1292
1293 assert_eq!(result.companies.len(), 2);
1294 assert_eq!(result.companies[0].code, "1000");
1295 assert_eq!(result.companies[1].currency, "EUR");
1296 }
1297
1298 #[tokio::test]
1303 async fn test_bulk_generate_entry_count_validation() {
1304 let config = default_generator_config();
1305 let service = SynthService::new(config);
1306
1307 let request = BulkGenerateRequest {
1308 entry_count: 2_000_000, include_master_data: false,
1310 inject_anomalies: false,
1311 output_format: 0,
1312 config: None,
1313 };
1314
1315 let result = service.bulk_generate(Request::new(request)).await;
1316 assert!(result.is_err());
1317 let err = result.err().unwrap();
1318 assert!(err.message().contains("exceeds maximum allowed value"));
1319 }
1320
1321 #[tokio::test]
1322 async fn test_stream_data_events_per_second_too_low() {
1323 let config = default_generator_config();
1324 let service = SynthService::new(config);
1325
1326 let request = StreamDataRequest {
1327 events_per_second: 0, max_events: 100,
1329 inject_anomalies: false,
1330 anomaly_rate: 0.0,
1331 config: None,
1332 };
1333
1334 let result = service.stream_data(Request::new(request)).await;
1335 assert!(result.is_err());
1336 let err = result.err().unwrap();
1337 assert!(err.message().contains("must be at least"));
1338 }
1339
1340 #[tokio::test]
1341 async fn test_stream_data_events_per_second_too_high() {
1342 let config = default_generator_config();
1343 let service = SynthService::new(config);
1344
1345 let request = StreamDataRequest {
1346 events_per_second: 20_000, max_events: 100,
1348 inject_anomalies: false,
1349 anomaly_rate: 0.0,
1350 config: None,
1351 };
1352
1353 let result = service.stream_data(Request::new(request)).await;
1354 assert!(result.is_err());
1355 let err = result.err().unwrap();
1356 assert!(err.message().contains("exceeds maximum allowed value"));
1357 }
1358
1359 #[tokio::test]
1360 async fn test_stream_data_max_events_too_high() {
1361 let config = default_generator_config();
1362 let service = SynthService::new(config);
1363
1364 let request = StreamDataRequest {
1365 events_per_second: 100,
1366 max_events: 100_000_000, inject_anomalies: false,
1368 anomaly_rate: 0.0,
1369 config: None,
1370 };
1371
1372 let result = service.stream_data(Request::new(request)).await;
1373 assert!(result.is_err());
1374 let err = result.err().unwrap();
1375 assert!(err.message().contains("max_events"));
1376 }
1377
1378 #[tokio::test]
1379 async fn test_stream_data_valid_request() {
1380 let config = default_generator_config();
1381 let service = SynthService::new(config);
1382
1383 let request = StreamDataRequest {
1384 events_per_second: 10,
1385 max_events: 5,
1386 inject_anomalies: false,
1387 anomaly_rate: 0.0,
1388 config: None,
1389 };
1390
1391 let result = service.stream_data(Request::new(request)).await;
1394 assert!(result.is_ok());
1395 }
1396}