1use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17 ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18 TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
26pub struct ServerState {
28 pub config: RwLock<GeneratorConfig>,
30 pub config_source: RwLock<crate::config_loader::ConfigSource>,
32 start_time: Instant,
34 pub total_entries: AtomicU64,
36 pub total_anomalies: AtomicU64,
38 pub active_streams: AtomicU64,
40 pub total_stream_events: AtomicU64,
42 pub stream_paused: AtomicBool,
44 pub stream_stopped: AtomicBool,
46 pub stream_events_per_second: AtomicU64,
48 pub stream_max_events: AtomicU64,
50 pub stream_inject_anomalies: AtomicBool,
52 pub triggered_pattern: RwLock<Option<String>>,
54 pub resource_guard: Arc<ResourceGuard>,
56 max_concurrent_generations: AtomicU64,
58}
59
60impl ServerState {
61 pub fn new(config: GeneratorConfig) -> Self {
62 let memory_limit = config.global.memory_limit_mb;
64 let resource_guard = if memory_limit > 0 {
65 ResourceGuardBuilder::new()
66 .memory_limit(memory_limit)
67 .conservative()
68 .build()
69 } else {
70 ResourceGuardBuilder::new()
72 .memory_limit(2048)
73 .conservative()
74 .build()
75 };
76
77 Self {
78 config: RwLock::new(config),
79 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
80 start_time: Instant::now(),
81 total_entries: AtomicU64::new(0),
82 total_anomalies: AtomicU64::new(0),
83 active_streams: AtomicU64::new(0),
84 total_stream_events: AtomicU64::new(0),
85 stream_paused: AtomicBool::new(false),
86 stream_stopped: AtomicBool::new(false),
87 stream_events_per_second: AtomicU64::new(0),
88 stream_max_events: AtomicU64::new(0),
89 stream_inject_anomalies: AtomicBool::new(false),
90 triggered_pattern: RwLock::new(None),
91 resource_guard: Arc::new(resource_guard),
92 max_concurrent_generations: AtomicU64::new(4),
93 }
94 }
95
96 pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
98 let resource_guard = ResourceGuardBuilder::new()
99 .memory_limit(memory_limit_mb)
100 .conservative()
101 .build();
102
103 Self {
104 config: RwLock::new(config),
105 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
106 start_time: Instant::now(),
107 total_entries: AtomicU64::new(0),
108 total_anomalies: AtomicU64::new(0),
109 active_streams: AtomicU64::new(0),
110 total_stream_events: AtomicU64::new(0),
111 stream_paused: AtomicBool::new(false),
112 stream_stopped: AtomicBool::new(false),
113 stream_events_per_second: AtomicU64::new(0),
114 stream_max_events: AtomicU64::new(0),
115 stream_inject_anomalies: AtomicBool::new(false),
116 triggered_pattern: RwLock::new(None),
117 resource_guard: Arc::new(resource_guard),
118 max_concurrent_generations: AtomicU64::new(4),
119 }
120 }
121
122 pub fn uptime_seconds(&self) -> u64 {
123 self.start_time.elapsed().as_secs()
124 }
125
126 #[allow(clippy::result_large_err)] pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
129 let active = self.active_streams.load(Ordering::Relaxed);
131 let max = self.max_concurrent_generations.load(Ordering::Relaxed);
132 if active >= max {
133 return Err(Status::resource_exhausted(format!(
134 "Too many concurrent generations ({}/{}). Try again later.",
135 active, max
136 )));
137 }
138
139 match self.resource_guard.check() {
141 Ok(level) => {
142 if level == DegradationLevel::Emergency {
143 Err(Status::resource_exhausted(
144 "Server resources critically low. Generation not possible.",
145 ))
146 } else if level == DegradationLevel::Minimal {
147 warn!("Resources constrained, generation may be limited");
148 Ok(level)
149 } else {
150 Ok(level)
151 }
152 }
153 Err(e) => Err(Status::resource_exhausted(format!(
154 "Resource check failed: {}",
155 e
156 ))),
157 }
158 }
159
160 pub fn resource_status(&self) -> ResourceStatus {
162 let stats = self.resource_guard.stats();
163 ResourceStatus {
164 memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
165 memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
166 disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
167 degradation_level: stats.degradation_level.name().to_string(),
168 active_generations: self.active_streams.load(Ordering::Relaxed),
169 }
170 }
171}
172
173#[derive(Debug, Clone)]
175pub struct ResourceStatus {
176 pub memory_usage_mb: u64,
177 pub memory_peak_mb: u64,
178 pub disk_available_mb: u64,
179 pub degradation_level: String,
180 pub active_generations: u64,
181}
182
183pub struct SynthService {
185 pub state: Arc<ServerState>,
186}
187
188impl SynthService {
189 pub fn new(config: GeneratorConfig) -> Self {
190 Self {
191 state: Arc::new(ServerState::new(config)),
192 }
193 }
194
195 pub fn with_state(state: Arc<ServerState>) -> Self {
196 Self { state }
197 }
198
199 async fn proto_to_config(
201 &self,
202 proto: Option<GenerationConfig>,
203 ) -> Result<GeneratorConfig, Status> {
204 match proto {
205 Some(p) => {
206 let industry = match p.industry.to_lowercase().as_str() {
207 "manufacturing" => IndustrySector::Manufacturing,
208 "retail" => IndustrySector::Retail,
209 "financial_services" | "financial" => IndustrySector::FinancialServices,
210 "healthcare" => IndustrySector::Healthcare,
211 "technology" => IndustrySector::Technology,
212 "" => IndustrySector::Manufacturing, other => {
214 return Err(Status::invalid_argument(format!(
215 "Unknown industry '{}'. Valid values: manufacturing, retail, financial_services, healthcare, technology",
216 other
217 )));
218 }
219 };
220
221 let complexity = match p.coa_complexity.to_lowercase().as_str() {
222 "small" => CoAComplexity::Small,
223 "medium" => CoAComplexity::Medium,
224 "large" => CoAComplexity::Large,
225 "" => CoAComplexity::Small, other => {
227 return Err(Status::invalid_argument(format!(
228 "Unknown coa_complexity '{}'. Valid values: small, medium, large",
229 other
230 )));
231 }
232 };
233
234 let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
235 vec![CompanyConfig {
236 code: "1000".to_string(),
237 name: "Default Company".to_string(),
238 currency: "USD".to_string(),
239 country: "US".to_string(),
240 annual_transaction_volume: TransactionVolume::TenK,
241 volume_weight: 1.0,
242 fiscal_year_variant: "K4".to_string(),
243 }]
244 } else {
245 p.companies
246 .into_iter()
247 .map(|c| CompanyConfig {
248 code: c.code,
249 name: c.name,
250 currency: c.currency,
251 country: c.country,
252 annual_transaction_volume: TransactionVolume::Custom(
253 c.annual_transaction_volume,
254 ),
255 volume_weight: c.volume_weight as f64,
256 fiscal_year_variant: "K4".to_string(),
257 })
258 .collect()
259 };
260
261 let mut config = GeneratorConfig {
262 global: GlobalConfig {
263 seed: if p.seed > 0 { Some(p.seed) } else { None },
264 industry,
265 start_date: if p.start_date.is_empty() {
266 "2024-01-01".to_string()
267 } else {
268 p.start_date
269 },
270 period_months: if p.period_months == 0 {
271 12
272 } else {
273 p.period_months
274 },
275 group_currency: "USD".to_string(),
276 parallel: true,
277 worker_threads: 0,
278 memory_limit_mb: 0,
279 fiscal_year_months: None,
280 },
281 companies,
282 chart_of_accounts: ChartOfAccountsConfig {
283 complexity,
284 industry_specific: true,
285 custom_accounts: None,
286 min_hierarchy_depth: 2,
287 max_hierarchy_depth: 5,
288 },
289 ..default_generator_config()
290 };
291
292 if p.fraud_enabled {
294 config.fraud.enabled = true;
295 config.fraud.fraud_rate = p.fraud_rate as f64;
296 }
297
298 Ok(config)
299 }
300 None => {
301 let config = self.state.config.read().await;
303 Ok(config.clone())
304 }
305 }
306 }
307
308 fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
310 JournalEntryProto {
311 document_id: entry.header.document_id.to_string(),
312 company_code: entry.header.company_code.clone(),
313 fiscal_year: entry.header.fiscal_year as u32,
314 fiscal_period: entry.header.fiscal_period as u32,
315 posting_date: entry.header.posting_date.to_string(),
316 document_date: entry.header.document_date.to_string(),
317 created_at: entry.header.created_at.to_rfc3339(),
318 source: format!("{:?}", entry.header.source),
319 business_process: entry.header.business_process.map(|bp| format!("{:?}", bp)),
320 lines: entry
321 .lines
322 .iter()
323 .map(|line| {
324 let amount = if line.is_debit() {
325 line.debit_amount
326 } else {
327 line.credit_amount
328 };
329 JournalLineProto {
330 line_number: line.line_number,
331 account_number: line.gl_account.clone(),
332 account_name: line.account_description.clone().unwrap_or_default(),
333 amount: amount.to_string(),
334 is_debit: line.is_debit(),
335 cost_center: line.cost_center.clone(),
336 profit_center: line.profit_center.clone(),
337 vendor_id: line.auxiliary_account_number.clone(),
340 customer_id: None,
341 material_id: None,
342 text: line.line_text.clone().or_else(|| line.text.clone()),
343 }
344 })
345 .collect(),
346 is_anomaly: entry.header.is_fraud,
347 anomaly_type: entry.header.fraud_type.map(|ft| format!("{:?}", ft)),
348 }
349 }
350
351 fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
353 GenerationConfig {
354 industry: format!("{:?}", config.global.industry),
355 start_date: config.global.start_date.clone(),
356 period_months: config.global.period_months,
357 seed: config.global.seed.unwrap_or(0),
358 coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
359 companies: config
360 .companies
361 .iter()
362 .map(|c| CompanyConfigProto {
363 code: c.code.clone(),
364 name: c.name.clone(),
365 currency: c.currency.clone(),
366 country: c.country.clone(),
367 annual_transaction_volume: c.annual_transaction_volume.count(),
368 volume_weight: c.volume_weight as f32,
369 })
370 .collect(),
371 fraud_enabled: config.fraud.enabled,
372 fraud_rate: config.fraud.fraud_rate as f32,
373 generate_master_data: config.master_data.vendors.count > 0
374 || config.master_data.customers.count > 0
375 || config.master_data.materials.count > 0,
376 generate_document_flows: config.document_flows.p2p.enabled
377 || config.document_flows.o2c.enabled,
378 }
379 }
380}
381
382#[tonic::async_trait]
383impl synthetic_data_service_server::SyntheticDataService for SynthService {
384 async fn bulk_generate(
386 &self,
387 request: Request<BulkGenerateRequest>,
388 ) -> Result<Response<BulkGenerateResponse>, Status> {
389 let req = request.into_inner();
390
391 const MAX_ENTRY_COUNT: u64 = 1_000_000;
393 if req.entry_count > MAX_ENTRY_COUNT {
394 return Err(Status::invalid_argument(format!(
395 "entry_count ({}) exceeds maximum allowed value ({})",
396 req.entry_count, MAX_ENTRY_COUNT
397 )));
398 }
399
400 let degradation_level = self.state.check_resources()?;
402 if degradation_level != DegradationLevel::Normal {
403 warn!(
404 "Starting bulk generation under resource pressure (level: {:?})",
405 degradation_level
406 );
407 }
408
409 info!("Bulk generate request: {} entries", req.entry_count);
410
411 let config = self.proto_to_config(req.config).await?;
412 let start_time = Instant::now();
413
414 let phase_config = PhaseConfig {
416 generate_master_data: req.include_master_data,
417 generate_document_flows: false,
418 generate_journal_entries: true,
419 inject_anomalies: req.inject_anomalies,
420 show_progress: false,
421 ..Default::default()
422 };
423
424 let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
425 .map_err(|e| Status::internal(format!("Failed to create orchestrator: {}", e)))?;
426
427 let result = orchestrator
428 .generate()
429 .map_err(|e| Status::internal(format!("Generation failed: {}", e)))?;
430
431 let duration_ms = start_time.elapsed().as_millis() as u64;
432
433 let entries_count = result.journal_entries.len() as u64;
435 self.state
436 .total_entries
437 .fetch_add(entries_count, Ordering::Relaxed);
438
439 let anomaly_count = result.anomaly_labels.labels.len() as u64;
440 self.state
441 .total_anomalies
442 .fetch_add(anomaly_count, Ordering::Relaxed);
443
444 let journal_entries: Vec<JournalEntryProto> = result
446 .journal_entries
447 .iter()
448 .map(Self::journal_entry_to_proto)
449 .collect();
450
451 let anomaly_labels: Vec<AnomalyLabelProto> = result
452 .anomaly_labels
453 .labels
454 .iter()
455 .map(|a| AnomalyLabelProto {
456 anomaly_id: a.anomaly_id.clone(),
457 document_id: a.document_id.clone(),
458 anomaly_type: format!("{:?}", a.anomaly_type),
459 anomaly_category: a.document_type.clone(),
460 description: a.description.clone(),
461 severity_score: a.severity as f32,
462 })
463 .collect();
464
465 let mut total_debit = rust_decimal::Decimal::ZERO;
467 let mut total_credit = rust_decimal::Decimal::ZERO;
468 let mut total_lines = 0u64;
469 let mut entries_by_company = std::collections::HashMap::new();
470 let mut entries_by_source = std::collections::HashMap::new();
471
472 for entry in &result.journal_entries {
473 *entries_by_company
474 .entry(entry.header.company_code.clone())
475 .or_insert(0u64) += 1;
476 *entries_by_source
477 .entry(format!("{:?}", entry.header.source))
478 .or_insert(0u64) += 1;
479
480 for line in &entry.lines {
481 total_lines += 1;
482 total_debit += line.debit_amount;
483 total_credit += line.credit_amount;
484 }
485 }
486
487 let stats = GenerationStats {
488 total_entries: entries_count,
489 total_lines,
490 total_debit_amount: total_debit.to_string(),
491 total_credit_amount: total_credit.to_string(),
492 anomaly_count,
493 entries_by_company,
494 entries_by_source,
495 };
496
497 info!(
498 "Bulk generation complete: {} entries in {}ms",
499 entries_count, duration_ms
500 );
501
502 Ok(Response::new(BulkGenerateResponse {
503 entries_generated: entries_count,
504 duration_ms,
505 journal_entries,
506 anomaly_labels,
507 stats: Some(stats),
508 }))
509 }
510
511 type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
512
513 async fn stream_data(
515 &self,
516 request: Request<StreamDataRequest>,
517 ) -> Result<Response<Self::StreamDataStream>, Status> {
518 let req = request.into_inner();
519
520 const MIN_EVENTS_PER_SECOND: u32 = 1;
522 const MAX_EVENTS_PER_SECOND: u32 = 10_000;
523 if req.events_per_second < MIN_EVENTS_PER_SECOND {
524 return Err(Status::invalid_argument(format!(
525 "events_per_second ({}) must be at least {}",
526 req.events_per_second, MIN_EVENTS_PER_SECOND
527 )));
528 }
529 if req.events_per_second > MAX_EVENTS_PER_SECOND {
530 return Err(Status::invalid_argument(format!(
531 "events_per_second ({}) exceeds maximum allowed value ({})",
532 req.events_per_second, MAX_EVENTS_PER_SECOND
533 )));
534 }
535
536 const MAX_STREAM_EVENTS: u64 = 10_000_000;
538 if req.max_events > MAX_STREAM_EVENTS {
539 return Err(Status::invalid_argument(format!(
540 "max_events ({}) exceeds maximum allowed value ({})",
541 req.max_events, MAX_STREAM_EVENTS
542 )));
543 }
544
545 let degradation_level = self.state.check_resources()?;
547 if degradation_level != DegradationLevel::Normal {
548 warn!(
549 "Starting stream under resource pressure (level: {:?})",
550 degradation_level
551 );
552 }
553
554 info!(
555 "Stream data request: {} events/sec, max {}",
556 req.events_per_second, req.max_events
557 );
558
559 let config = self.proto_to_config(req.config).await?;
560 let state = self.state.clone();
561
562 state.active_streams.fetch_add(1, Ordering::Relaxed);
564
565 state.stream_paused.store(false, Ordering::Relaxed);
567 state.stream_stopped.store(false, Ordering::Relaxed);
568
569 let (tx, rx) = mpsc::channel(100);
570
571 let events_per_second = req.events_per_second;
573 let max_events = req.max_events;
574 let inject_anomalies = req.inject_anomalies;
575
576 tokio::spawn(async move {
577 let phase_config = PhaseConfig {
578 generate_master_data: false,
579 generate_document_flows: false,
580 generate_journal_entries: true,
581 inject_anomalies,
582 show_progress: false,
583 ..Default::default()
584 };
585
586 let mut sequence = 0u64;
587 let delay = if events_per_second > 0 {
588 Duration::from_micros(1_000_000 / events_per_second as u64)
589 } else {
590 Duration::from_millis(1)
591 };
592
593 let mut orchestrator =
595 match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
596 Ok(o) => o,
597 Err(e) => {
598 error!("Failed to create orchestrator: {}", e);
599 return;
600 }
601 };
602
603 loop {
604 if state.stream_stopped.load(Ordering::Relaxed) {
606 info!("Stream stopped by control command");
607 break;
608 }
609
610 while state.stream_paused.load(Ordering::Relaxed) {
612 tokio::time::sleep(Duration::from_millis(100)).await;
613 if state.stream_stopped.load(Ordering::Relaxed) {
614 break;
615 }
616 }
617
618 if max_events > 0 && sequence >= max_events {
620 info!("Stream reached max events: {}", max_events);
621 break;
622 }
623
624 let result = match orchestrator.generate() {
626 Ok(r) => r,
627 Err(e) => {
628 error!("Generation failed: {}", e);
629 break;
630 }
631 };
632
633 for entry in result.journal_entries {
635 sequence += 1;
636 state.total_stream_events.fetch_add(1, Ordering::Relaxed);
637 state.total_entries.fetch_add(1, Ordering::Relaxed);
638
639 let timestamp = Timestamp {
640 seconds: Utc::now().timestamp(),
641 nanos: 0,
642 };
643
644 let event = DataEvent {
645 sequence,
646 timestamp: Some(timestamp),
647 event: Some(data_event::Event::JournalEntry(
648 SynthService::journal_entry_to_proto(&entry),
649 )),
650 };
651
652 if tx.send(Ok(event)).await.is_err() {
653 info!("Stream receiver dropped");
654 break;
655 }
656
657 tokio::time::sleep(delay).await;
659
660 if max_events > 0 && sequence >= max_events {
662 break;
663 }
664 }
665 }
666
667 state.active_streams.fetch_sub(1, Ordering::Relaxed);
669 });
670
671 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
672 }
673
674 async fn control(
676 &self,
677 request: Request<ControlCommand>,
678 ) -> Result<Response<ControlResponse>, Status> {
679 let cmd = request.into_inner();
680 let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
681
682 info!("Control command: {:?}", action);
683
684 let (success, message, status) = match action {
685 ControlAction::Pause => {
686 self.state.stream_paused.store(true, Ordering::Relaxed);
687 (true, "Stream paused".to_string(), StreamStatus::Paused)
688 }
689 ControlAction::Resume => {
690 self.state.stream_paused.store(false, Ordering::Relaxed);
691 (true, "Stream resumed".to_string(), StreamStatus::Running)
692 }
693 ControlAction::Stop => {
694 self.state.stream_stopped.store(true, Ordering::Relaxed);
695 (true, "Stream stopped".to_string(), StreamStatus::Stopped)
696 }
697 ControlAction::TriggerPattern => {
698 let pattern = cmd.pattern_name.unwrap_or_default();
699 if pattern.is_empty() {
700 (
701 false,
702 "Pattern name is required for TriggerPattern action".to_string(),
703 StreamStatus::Running,
704 )
705 } else {
706 let valid_patterns = [
709 "year_end_spike",
710 "period_end_spike",
711 "holiday_cluster",
712 "fraud_cluster",
713 "error_cluster",
714 "uniform",
715 ];
716 let is_valid = valid_patterns.contains(&pattern.as_str())
717 || pattern.starts_with("custom:");
718
719 if is_valid {
720 if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
722 *triggered = Some(pattern.clone());
723 }
724 info!("Pattern trigger activated: {}", pattern);
725 (
726 true,
727 format!("Pattern '{}' will be applied to upcoming entries", pattern),
728 StreamStatus::Running,
729 )
730 } else {
731 (
732 false,
733 format!(
734 "Unknown pattern '{}'. Valid patterns: {:?}",
735 pattern, valid_patterns
736 ),
737 StreamStatus::Running,
738 )
739 }
740 }
741 }
742 ControlAction::Unspecified => (
743 false,
744 "Unknown control action".to_string(),
745 StreamStatus::Unspecified,
746 ),
747 };
748
749 Ok(Response::new(ControlResponse {
750 success,
751 message,
752 current_status: status as i32,
753 }))
754 }
755
756 async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
758 let config = self.state.config.read().await;
759 let proto_config = Self::config_to_proto(&config);
760
761 Ok(Response::new(ConfigResponse {
762 success: true,
763 message: "Current configuration retrieved".to_string(),
764 current_config: Some(proto_config),
765 }))
766 }
767
768 async fn set_config(
770 &self,
771 request: Request<ConfigRequest>,
772 ) -> Result<Response<ConfigResponse>, Status> {
773 let req = request.into_inner();
774
775 if let Some(proto_config) = req.config {
776 let new_config = self.proto_to_config(Some(proto_config)).await?;
777
778 let mut config = self.state.config.write().await;
779 *config = new_config.clone();
780
781 info!("Configuration updated");
782
783 Ok(Response::new(ConfigResponse {
784 success: true,
785 message: "Configuration updated".to_string(),
786 current_config: Some(Self::config_to_proto(&new_config)),
787 }))
788 } else {
789 Err(Status::invalid_argument("No configuration provided"))
790 }
791 }
792
793 async fn get_metrics(
795 &self,
796 _request: Request<()>,
797 ) -> Result<Response<MetricsResponse>, Status> {
798 let uptime = self.state.uptime_seconds();
799 let total_entries = self.state.total_entries.load(Ordering::Relaxed);
800
801 let entries_per_second = if uptime > 0 {
802 total_entries as f64 / uptime as f64
803 } else {
804 0.0
805 };
806
807 Ok(Response::new(MetricsResponse {
808 total_entries_generated: total_entries,
809 total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
810 uptime_seconds: uptime,
811 session_entries: total_entries,
812 session_entries_per_second: entries_per_second,
813 active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
814 total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
815 }))
816 }
817
818 async fn health_check(
820 &self,
821 _request: Request<()>,
822 ) -> Result<Response<HealthResponse>, Status> {
823 Ok(Response::new(HealthResponse {
824 healthy: true,
825 version: env!("CARGO_PKG_VERSION").to_string(),
826 uptime_seconds: self.state.uptime_seconds(),
827 }))
828 }
829}
830
831pub fn default_generator_config() -> GeneratorConfig {
833 GeneratorConfig {
834 global: GlobalConfig {
835 seed: None,
836 industry: IndustrySector::Manufacturing,
837 start_date: "2024-01-01".to_string(),
838 period_months: 12,
839 group_currency: "USD".to_string(),
840 parallel: true,
841 worker_threads: 0,
842 memory_limit_mb: 0,
843 fiscal_year_months: None,
844 },
845 companies: vec![CompanyConfig {
846 code: "1000".to_string(),
847 name: "Default Company".to_string(),
848 currency: "USD".to_string(),
849 country: "US".to_string(),
850 annual_transaction_volume: TransactionVolume::TenK,
851 volume_weight: 1.0,
852 fiscal_year_variant: "K4".to_string(),
853 }],
854 chart_of_accounts: ChartOfAccountsConfig {
855 complexity: CoAComplexity::Small,
856 industry_specific: true,
857 custom_accounts: None,
858 min_hierarchy_depth: 2,
859 max_hierarchy_depth: 5,
860 },
861 transactions: Default::default(),
862 output: OutputConfig::default(),
863 fraud: Default::default(),
864 internal_controls: Default::default(),
865 business_processes: Default::default(),
866 user_personas: Default::default(),
867 templates: Default::default(),
868 approval: Default::default(),
869 departments: Default::default(),
870 master_data: Default::default(),
871 document_flows: Default::default(),
872 intercompany: Default::default(),
873 balance: Default::default(),
874 ocpm: Default::default(),
875 audit: Default::default(),
876 banking: Default::default(),
877 data_quality: Default::default(),
878 scenario: Default::default(),
879 temporal: Default::default(),
880 graph_export: Default::default(),
881 streaming: Default::default(),
882 rate_limit: Default::default(),
883 temporal_attributes: Default::default(),
884 relationships: Default::default(),
885 accounting_standards: Default::default(),
886 audit_standards: Default::default(),
887 distributions: Default::default(),
888 temporal_patterns: Default::default(),
889 vendor_network: Default::default(),
890 customer_segmentation: Default::default(),
891 relationship_strength: Default::default(),
892 cross_process_links: Default::default(),
893 organizational_events: Default::default(),
894 behavioral_drift: Default::default(),
895 market_drift: Default::default(),
896 drift_labeling: Default::default(),
897 anomaly_injection: Default::default(),
898 industry_specific: Default::default(),
899 fingerprint_privacy: Default::default(),
900 quality_gates: Default::default(),
901 compliance: Default::default(),
902 webhooks: Default::default(),
903 llm: Default::default(),
904 diffusion: Default::default(),
905 causal: Default::default(),
906 source_to_pay: Default::default(),
907 financial_reporting: Default::default(),
908 hr: Default::default(),
909 manufacturing: Default::default(),
910 sales_quotes: Default::default(),
911 tax: Default::default(),
912 treasury: Default::default(),
913 project_accounting: Default::default(),
914 esg: Default::default(),
915 country_packs: None,
916 scenarios: Default::default(),
917 session: Default::default(),
918 }
919}
920
921#[cfg(test)]
922#[allow(clippy::unwrap_used)]
923mod tests {
924 use super::*;
925 use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;
926
927 #[tokio::test]
932 async fn test_service_creation() {
933 let config = default_generator_config();
934 let service = SynthService::new(config);
935 assert!(service.state.uptime_seconds() < 60);
937 }
938
939 #[tokio::test]
940 async fn test_service_with_state() {
941 let config = default_generator_config();
942 let state = Arc::new(ServerState::new(config));
943 let service = SynthService::with_state(Arc::clone(&state));
944
945 state.total_entries.store(100, Ordering::Relaxed);
947 assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
948 }
949
950 #[tokio::test]
955 async fn test_health_check() {
956 let config = default_generator_config();
957 let service = SynthService::new(config);
958
959 let response = service.health_check(Request::new(())).await.unwrap();
960 let health = response.into_inner();
961
962 assert!(health.healthy);
963 assert!(!health.version.is_empty());
964 }
965
966 #[tokio::test]
967 async fn test_health_check_returns_version() {
968 let config = default_generator_config();
969 let service = SynthService::new(config);
970
971 let response = service.health_check(Request::new(())).await.unwrap();
972 let health = response.into_inner();
973
974 assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
975 }
976
977 #[tokio::test]
982 async fn test_get_config() {
983 let config = default_generator_config();
984 let service = SynthService::new(config);
985
986 let response = service.get_config(Request::new(())).await.unwrap();
987 let config_response = response.into_inner();
988
989 assert!(config_response.success);
990 assert!(config_response.current_config.is_some());
991 }
992
993 #[tokio::test]
994 async fn test_get_config_returns_industry() {
995 let config = default_generator_config();
996 let service = SynthService::new(config);
997
998 let response = service.get_config(Request::new(())).await.unwrap();
999 let config_response = response.into_inner();
1000 let current = config_response.current_config.unwrap();
1001
1002 assert_eq!(current.industry, "Manufacturing");
1003 }
1004
1005 #[tokio::test]
1006 async fn test_set_config() {
1007 let config = default_generator_config();
1008 let service = SynthService::new(config);
1009
1010 let new_config = GenerationConfig {
1011 industry: "retail".to_string(),
1012 start_date: "2024-06-01".to_string(),
1013 period_months: 6,
1014 seed: 42,
1015 coa_complexity: "medium".to_string(),
1016 companies: vec![],
1017 fraud_enabled: true,
1018 fraud_rate: 0.05,
1019 generate_master_data: false,
1020 generate_document_flows: false,
1021 };
1022
1023 let response = service
1024 .set_config(Request::new(ConfigRequest {
1025 config: Some(new_config),
1026 }))
1027 .await
1028 .unwrap();
1029 let config_response = response.into_inner();
1030
1031 assert!(config_response.success);
1032 }
1033
1034 #[tokio::test]
1035 async fn test_set_config_without_config_fails() {
1036 let config = default_generator_config();
1037 let service = SynthService::new(config);
1038
1039 let result = service
1040 .set_config(Request::new(ConfigRequest { config: None }))
1041 .await;
1042
1043 assert!(result.is_err());
1044 }
1045
1046 #[tokio::test]
1051 async fn test_get_metrics_initial() {
1052 let config = default_generator_config();
1053 let service = SynthService::new(config);
1054
1055 let response = service.get_metrics(Request::new(())).await.unwrap();
1056 let metrics = response.into_inner();
1057
1058 assert_eq!(metrics.total_entries_generated, 0);
1059 assert_eq!(metrics.total_anomalies_injected, 0);
1060 assert_eq!(metrics.active_streams, 0);
1061 }
1062
1063 #[tokio::test]
1064 async fn test_get_metrics_after_updates() {
1065 let config = default_generator_config();
1066 let service = SynthService::new(config);
1067
1068 service.state.total_entries.store(1000, Ordering::Relaxed);
1070 service.state.total_anomalies.store(20, Ordering::Relaxed);
1071 service.state.active_streams.store(2, Ordering::Relaxed);
1072
1073 let response = service.get_metrics(Request::new(())).await.unwrap();
1074 let metrics = response.into_inner();
1075
1076 assert_eq!(metrics.total_entries_generated, 1000);
1077 assert_eq!(metrics.total_anomalies_injected, 20);
1078 assert_eq!(metrics.active_streams, 2);
1079 }
1080
1081 #[tokio::test]
1086 async fn test_control_pause() {
1087 let config = default_generator_config();
1088 let service = SynthService::new(config);
1089
1090 let response = service
1091 .control(Request::new(ControlCommand {
1092 action: ControlAction::Pause as i32,
1093 pattern_name: None,
1094 }))
1095 .await
1096 .unwrap();
1097 let control_response = response.into_inner();
1098
1099 assert!(control_response.success);
1100 assert!(service.state.stream_paused.load(Ordering::Relaxed));
1101 }
1102
1103 #[tokio::test]
1104 async fn test_control_resume() {
1105 let config = default_generator_config();
1106 let service = SynthService::new(config);
1107
1108 service.state.stream_paused.store(true, Ordering::Relaxed);
1110
1111 let response = service
1112 .control(Request::new(ControlCommand {
1113 action: ControlAction::Resume as i32,
1114 pattern_name: None,
1115 }))
1116 .await
1117 .unwrap();
1118 let control_response = response.into_inner();
1119
1120 assert!(control_response.success);
1121 assert!(!service.state.stream_paused.load(Ordering::Relaxed));
1122 }
1123
1124 #[tokio::test]
1125 async fn test_control_stop() {
1126 let config = default_generator_config();
1127 let service = SynthService::new(config);
1128
1129 let response = service
1130 .control(Request::new(ControlCommand {
1131 action: ControlAction::Stop as i32,
1132 pattern_name: None,
1133 }))
1134 .await
1135 .unwrap();
1136 let control_response = response.into_inner();
1137
1138 assert!(control_response.success);
1139 assert!(service.state.stream_stopped.load(Ordering::Relaxed));
1140 }
1141
1142 #[test]
1147 fn test_server_state_creation() {
1148 let config = default_generator_config();
1149 let state = ServerState::new(config);
1150
1151 assert_eq!(state.total_entries.load(Ordering::Relaxed), 0);
1152 assert_eq!(state.total_anomalies.load(Ordering::Relaxed), 0);
1153 assert_eq!(state.active_streams.load(Ordering::Relaxed), 0);
1154 assert!(!state.stream_paused.load(Ordering::Relaxed));
1155 assert!(!state.stream_stopped.load(Ordering::Relaxed));
1156 }
1157
1158 #[test]
1159 fn test_server_state_uptime() {
1160 let config = default_generator_config();
1161 let state = ServerState::new(config);
1162
1163 assert!(state.uptime_seconds() < 60);
1165 }
1166
1167 #[test]
1172 fn test_default_generator_config() {
1173 let config = default_generator_config();
1174
1175 assert_eq!(config.global.industry, IndustrySector::Manufacturing);
1176 assert_eq!(config.global.period_months, 12);
1177 assert!(!config.companies.is_empty());
1178 assert_eq!(config.companies[0].code, "1000");
1179 }
1180
1181 #[test]
1182 fn test_config_to_proto() {
1183 let config = default_generator_config();
1184 let proto = SynthService::config_to_proto(&config);
1185
1186 assert_eq!(proto.industry, "Manufacturing");
1187 assert_eq!(proto.period_months, 12);
1188 assert!(!proto.companies.is_empty());
1189 }
1190
1191 #[tokio::test]
1192 async fn test_proto_to_config_with_none() {
1193 let config = default_generator_config();
1194 let service = SynthService::new(config.clone());
1195
1196 let result = service.proto_to_config(None).await.unwrap();
1197
1198 assert_eq!(result.global.industry, config.global.industry);
1200 }
1201
1202 #[tokio::test]
1203 async fn test_proto_to_config_with_retail() {
1204 let config = default_generator_config();
1205 let service = SynthService::new(config);
1206
1207 let proto = GenerationConfig {
1208 industry: "retail".to_string(),
1209 start_date: "2024-01-01".to_string(),
1210 period_months: 6,
1211 seed: 0,
1212 coa_complexity: "large".to_string(),
1213 companies: vec![],
1214 fraud_enabled: false,
1215 fraud_rate: 0.0,
1216 generate_master_data: false,
1217 generate_document_flows: false,
1218 };
1219
1220 let result = service.proto_to_config(Some(proto)).await.unwrap();
1221
1222 assert_eq!(result.global.industry, IndustrySector::Retail);
1223 assert_eq!(result.chart_of_accounts.complexity, CoAComplexity::Large);
1224 }
1225
1226 #[tokio::test]
1227 async fn test_proto_to_config_with_healthcare() {
1228 let config = default_generator_config();
1229 let service = SynthService::new(config);
1230
1231 let proto = GenerationConfig {
1232 industry: "healthcare".to_string(),
1233 start_date: "2024-01-01".to_string(),
1234 period_months: 12,
1235 seed: 42,
1236 coa_complexity: "small".to_string(),
1237 companies: vec![],
1238 fraud_enabled: true,
1239 fraud_rate: 0.1,
1240 generate_master_data: true,
1241 generate_document_flows: true,
1242 };
1243
1244 let result = service.proto_to_config(Some(proto)).await.unwrap();
1245
1246 assert_eq!(result.global.industry, IndustrySector::Healthcare);
1247 assert_eq!(result.global.seed, Some(42));
1248 assert!(result.fraud.enabled);
1249 assert!((result.fraud.fraud_rate - 0.1).abs() < 0.001);
1250 }
1251
1252 #[tokio::test]
1253 async fn test_proto_to_config_with_companies() {
1254 let config = default_generator_config();
1255 let service = SynthService::new(config);
1256
1257 let proto = GenerationConfig {
1258 industry: "technology".to_string(),
1259 start_date: "2024-01-01".to_string(),
1260 period_months: 12,
1261 seed: 0,
1262 coa_complexity: "medium".to_string(),
1263 companies: vec![
1264 CompanyConfigProto {
1265 code: "1000".to_string(),
1266 name: "Parent Corp".to_string(),
1267 currency: "USD".to_string(),
1268 country: "US".to_string(),
1269 annual_transaction_volume: 100000,
1270 volume_weight: 1.0,
1271 },
1272 CompanyConfigProto {
1273 code: "2000".to_string(),
1274 name: "EU Sub".to_string(),
1275 currency: "EUR".to_string(),
1276 country: "DE".to_string(),
1277 annual_transaction_volume: 50000,
1278 volume_weight: 0.5,
1279 },
1280 ],
1281 fraud_enabled: false,
1282 fraud_rate: 0.0,
1283 generate_master_data: false,
1284 generate_document_flows: false,
1285 };
1286
1287 let result = service.proto_to_config(Some(proto)).await.unwrap();
1288
1289 assert_eq!(result.companies.len(), 2);
1290 assert_eq!(result.companies[0].code, "1000");
1291 assert_eq!(result.companies[1].currency, "EUR");
1292 }
1293
1294 #[tokio::test]
1299 async fn test_bulk_generate_entry_count_validation() {
1300 let config = default_generator_config();
1301 let service = SynthService::new(config);
1302
1303 let request = BulkGenerateRequest {
1304 entry_count: 2_000_000, include_master_data: false,
1306 inject_anomalies: false,
1307 output_format: 0,
1308 config: None,
1309 };
1310
1311 let result = service.bulk_generate(Request::new(request)).await;
1312 assert!(result.is_err());
1313 let err = result.err().unwrap();
1314 assert!(err.message().contains("exceeds maximum allowed value"));
1315 }
1316
1317 #[tokio::test]
1318 async fn test_stream_data_events_per_second_too_low() {
1319 let config = default_generator_config();
1320 let service = SynthService::new(config);
1321
1322 let request = StreamDataRequest {
1323 events_per_second: 0, max_events: 100,
1325 inject_anomalies: false,
1326 anomaly_rate: 0.0,
1327 config: None,
1328 };
1329
1330 let result = service.stream_data(Request::new(request)).await;
1331 assert!(result.is_err());
1332 let err = result.err().unwrap();
1333 assert!(err.message().contains("must be at least"));
1334 }
1335
1336 #[tokio::test]
1337 async fn test_stream_data_events_per_second_too_high() {
1338 let config = default_generator_config();
1339 let service = SynthService::new(config);
1340
1341 let request = StreamDataRequest {
1342 events_per_second: 20_000, max_events: 100,
1344 inject_anomalies: false,
1345 anomaly_rate: 0.0,
1346 config: None,
1347 };
1348
1349 let result = service.stream_data(Request::new(request)).await;
1350 assert!(result.is_err());
1351 let err = result.err().unwrap();
1352 assert!(err.message().contains("exceeds maximum allowed value"));
1353 }
1354
1355 #[tokio::test]
1356 async fn test_stream_data_max_events_too_high() {
1357 let config = default_generator_config();
1358 let service = SynthService::new(config);
1359
1360 let request = StreamDataRequest {
1361 events_per_second: 100,
1362 max_events: 100_000_000, inject_anomalies: false,
1364 anomaly_rate: 0.0,
1365 config: None,
1366 };
1367
1368 let result = service.stream_data(Request::new(request)).await;
1369 assert!(result.is_err());
1370 let err = result.err().unwrap();
1371 assert!(err.message().contains("max_events"));
1372 }
1373
1374 #[tokio::test]
1375 async fn test_stream_data_valid_request() {
1376 let config = default_generator_config();
1377 let service = SynthService::new(config);
1378
1379 let request = StreamDataRequest {
1380 events_per_second: 10,
1381 max_events: 5,
1382 inject_anomalies: false,
1383 anomaly_rate: 0.0,
1384 config: None,
1385 };
1386
1387 let result = service.stream_data(Request::new(request)).await;
1390 assert!(result.is_ok());
1391 }
1392}