1use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17 ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18 TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
/// State shared by every request handler of the gRPC service.
///
/// Counters and flags are atomics so they can be updated through a shared
/// reference; the configuration and the triggered pattern sit behind async
/// read/write locks.
pub struct ServerState {
    /// Generator configuration currently held by the server.
    pub config: RwLock<GeneratorConfig>,
    /// Origin of the active configuration; initialised to `ConfigSource::Default`.
    pub config_source: RwLock<crate::config_loader::ConfigSource>,
    /// Instant captured at construction; basis for `uptime_seconds`.
    start_time: Instant,
    /// Running total of journal entries produced by bulk and streaming calls.
    pub total_entries: AtomicU64,
    /// Running total of anomaly labels produced by bulk generation.
    pub total_anomalies: AtomicU64,
    /// Number of streaming tasks currently running; also compared against the
    /// concurrency limit in `check_resources`.
    pub active_streams: AtomicU64,
    /// Running total of events pushed onto streams.
    pub total_stream_events: AtomicU64,
    /// Set by the `Pause` control action, cleared by `Resume` and on stream start.
    pub stream_paused: AtomicBool,
    /// Set by the `Stop` control action, cleared on stream start.
    pub stream_stopped: AtomicBool,
    /// Initialised to 0. NOTE(review): not read or written elsewhere in this
    /// part of the file - presumably a stream rate setting; confirm with callers.
    pub stream_events_per_second: AtomicU64,
    /// Initialised to 0. NOTE(review): not read or written elsewhere in this
    /// part of the file - presumably a stream event cap; confirm with callers.
    pub stream_max_events: AtomicU64,
    /// Initialised to `false`. NOTE(review): not read or written elsewhere in
    /// this part of the file - confirm intended use with callers.
    pub stream_inject_anomalies: AtomicBool,
    /// Name of the pattern most recently accepted by the `TriggerPattern`
    /// control action, if any.
    pub triggered_pattern: RwLock<Option<String>>,
    /// Resource guard consulted before a generation is started.
    pub resource_guard: Arc<ResourceGuard>,
    /// Upper bound on `active_streams` enforced by `check_resources`.
    max_concurrent_generations: AtomicU64,
}
59
60impl ServerState {
61 pub fn new(config: GeneratorConfig) -> Self {
62 let memory_limit = config.global.memory_limit_mb;
64 let resource_guard = if memory_limit > 0 {
65 ResourceGuardBuilder::new()
66 .memory_limit(memory_limit)
67 .conservative()
68 .build()
69 } else {
70 ResourceGuardBuilder::new()
72 .memory_limit(2048)
73 .conservative()
74 .build()
75 };
76
77 Self {
78 config: RwLock::new(config),
79 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
80 start_time: Instant::now(),
81 total_entries: AtomicU64::new(0),
82 total_anomalies: AtomicU64::new(0),
83 active_streams: AtomicU64::new(0),
84 total_stream_events: AtomicU64::new(0),
85 stream_paused: AtomicBool::new(false),
86 stream_stopped: AtomicBool::new(false),
87 stream_events_per_second: AtomicU64::new(0),
88 stream_max_events: AtomicU64::new(0),
89 stream_inject_anomalies: AtomicBool::new(false),
90 triggered_pattern: RwLock::new(None),
91 resource_guard: Arc::new(resource_guard),
92 max_concurrent_generations: AtomicU64::new(4),
93 }
94 }
95
96 pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
98 let resource_guard = ResourceGuardBuilder::new()
99 .memory_limit(memory_limit_mb)
100 .conservative()
101 .build();
102
103 Self {
104 config: RwLock::new(config),
105 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
106 start_time: Instant::now(),
107 total_entries: AtomicU64::new(0),
108 total_anomalies: AtomicU64::new(0),
109 active_streams: AtomicU64::new(0),
110 total_stream_events: AtomicU64::new(0),
111 stream_paused: AtomicBool::new(false),
112 stream_stopped: AtomicBool::new(false),
113 stream_events_per_second: AtomicU64::new(0),
114 stream_max_events: AtomicU64::new(0),
115 stream_inject_anomalies: AtomicBool::new(false),
116 triggered_pattern: RwLock::new(None),
117 resource_guard: Arc::new(resource_guard),
118 max_concurrent_generations: AtomicU64::new(4),
119 }
120 }
121
122 pub fn uptime_seconds(&self) -> u64 {
123 self.start_time.elapsed().as_secs()
124 }
125
126 #[allow(clippy::result_large_err)] pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
129 let active = self.active_streams.load(Ordering::Relaxed);
131 let max = self.max_concurrent_generations.load(Ordering::Relaxed);
132 if active >= max {
133 return Err(Status::resource_exhausted(format!(
134 "Too many concurrent generations ({}/{}). Try again later.",
135 active, max
136 )));
137 }
138
139 match self.resource_guard.check() {
141 Ok(level) => {
142 if level == DegradationLevel::Emergency {
143 Err(Status::resource_exhausted(
144 "Server resources critically low. Generation not possible.",
145 ))
146 } else if level == DegradationLevel::Minimal {
147 warn!("Resources constrained, generation may be limited");
148 Ok(level)
149 } else {
150 Ok(level)
151 }
152 }
153 Err(e) => Err(Status::resource_exhausted(format!(
154 "Resource check failed: {}",
155 e
156 ))),
157 }
158 }
159
160 pub fn resource_status(&self) -> ResourceStatus {
162 let stats = self.resource_guard.stats();
163 ResourceStatus {
164 memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
165 memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
166 disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
167 degradation_level: stats.degradation_level.name().to_string(),
168 active_generations: self.active_streams.load(Ordering::Relaxed),
169 }
170 }
171}
172
/// Point-in-time resource report produced by `ServerState::resource_status`.
#[derive(Debug, Clone)]
pub struct ResourceStatus {
    /// Resident memory, computed as resident bytes / (1024 * 1024).
    pub memory_usage_mb: u64,
    /// Peak resident memory, computed as peak resident bytes / (1024 * 1024).
    pub memory_peak_mb: u64,
    /// Available disk space, computed as available bytes / (1024 * 1024).
    pub disk_available_mb: u64,
    /// Name of the degradation level reported by the resource guard.
    pub degradation_level: String,
    /// Value of the server's `active_streams` counter when the report was taken.
    pub active_generations: u64,
}
182
/// gRPC service object; all mutable data lives in the shared [`ServerState`].
pub struct SynthService {
    /// Shared server state; may be handed in by the caller via `with_state`.
    pub state: Arc<ServerState>,
}
187
188impl SynthService {
189 pub fn new(config: GeneratorConfig) -> Self {
190 Self {
191 state: Arc::new(ServerState::new(config)),
192 }
193 }
194
195 pub fn with_state(state: Arc<ServerState>) -> Self {
196 Self { state }
197 }
198
199 async fn proto_to_config(
201 &self,
202 proto: Option<GenerationConfig>,
203 ) -> Result<GeneratorConfig, Status> {
204 match proto {
205 Some(p) => {
206 let industry = match p.industry.to_lowercase().as_str() {
207 "manufacturing" => IndustrySector::Manufacturing,
208 "retail" => IndustrySector::Retail,
209 "financial_services" | "financial" => IndustrySector::FinancialServices,
210 "healthcare" => IndustrySector::Healthcare,
211 "technology" => IndustrySector::Technology,
212 "" => IndustrySector::Manufacturing, other => {
214 return Err(Status::invalid_argument(format!(
215 "Unknown industry '{}'. Valid values: manufacturing, retail, financial_services, healthcare, technology",
216 other
217 )));
218 }
219 };
220
221 let complexity = match p.coa_complexity.to_lowercase().as_str() {
222 "small" => CoAComplexity::Small,
223 "medium" => CoAComplexity::Medium,
224 "large" => CoAComplexity::Large,
225 "" => CoAComplexity::Small, other => {
227 return Err(Status::invalid_argument(format!(
228 "Unknown coa_complexity '{}'. Valid values: small, medium, large",
229 other
230 )));
231 }
232 };
233
234 let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
235 vec![CompanyConfig {
236 code: "1000".to_string(),
237 name: "Default Company".to_string(),
238 currency: "USD".to_string(),
239 country: "US".to_string(),
240 annual_transaction_volume: TransactionVolume::TenK,
241 volume_weight: 1.0,
242 fiscal_year_variant: "K4".to_string(),
243 }]
244 } else {
245 p.companies
246 .into_iter()
247 .map(|c| CompanyConfig {
248 code: c.code,
249 name: c.name,
250 currency: c.currency,
251 country: c.country,
252 annual_transaction_volume: TransactionVolume::Custom(
253 c.annual_transaction_volume,
254 ),
255 volume_weight: c.volume_weight as f64,
256 fiscal_year_variant: "K4".to_string(),
257 })
258 .collect()
259 };
260
261 let mut config = GeneratorConfig {
262 global: GlobalConfig {
263 seed: if p.seed > 0 { Some(p.seed) } else { None },
264 industry,
265 start_date: if p.start_date.is_empty() {
266 "2024-01-01".to_string()
267 } else {
268 p.start_date
269 },
270 period_months: if p.period_months == 0 {
271 12
272 } else {
273 p.period_months
274 },
275 group_currency: "USD".to_string(),
276 parallel: true,
277 worker_threads: 0,
278 memory_limit_mb: 0,
279 },
280 companies,
281 chart_of_accounts: ChartOfAccountsConfig {
282 complexity,
283 industry_specific: true,
284 custom_accounts: None,
285 min_hierarchy_depth: 2,
286 max_hierarchy_depth: 5,
287 },
288 ..default_generator_config()
289 };
290
291 if p.fraud_enabled {
293 config.fraud.enabled = true;
294 config.fraud.fraud_rate = p.fraud_rate as f64;
295 }
296
297 Ok(config)
298 }
299 None => {
300 let config = self.state.config.read().await;
302 Ok(config.clone())
303 }
304 }
305 }
306
307 fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
309 JournalEntryProto {
310 document_id: entry.header.document_id.to_string(),
311 company_code: entry.header.company_code.clone(),
312 fiscal_year: entry.header.fiscal_year as u32,
313 fiscal_period: entry.header.fiscal_period as u32,
314 posting_date: entry.header.posting_date.to_string(),
315 document_date: entry.header.document_date.to_string(),
316 created_at: entry.header.created_at.to_rfc3339(),
317 source: format!("{:?}", entry.header.source),
318 business_process: entry.header.business_process.map(|bp| format!("{:?}", bp)),
319 lines: entry
320 .lines
321 .iter()
322 .map(|line| {
323 let amount = if line.is_debit() {
324 line.debit_amount
325 } else {
326 line.credit_amount
327 };
328 JournalLineProto {
329 line_number: line.line_number,
330 account_number: line.gl_account.clone(),
331 account_name: line.account_description.clone().unwrap_or_default(),
332 amount: amount.to_string(),
333 is_debit: line.is_debit(),
334 cost_center: line.cost_center.clone(),
335 profit_center: line.profit_center.clone(),
336 vendor_id: line.auxiliary_account_number.clone(),
339 customer_id: None,
340 material_id: None,
341 text: line.line_text.clone().or_else(|| line.text.clone()),
342 }
343 })
344 .collect(),
345 is_anomaly: entry.header.is_fraud,
346 anomaly_type: entry.header.fraud_type.map(|ft| format!("{:?}", ft)),
347 }
348 }
349
350 fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
352 GenerationConfig {
353 industry: format!("{:?}", config.global.industry),
354 start_date: config.global.start_date.clone(),
355 period_months: config.global.period_months,
356 seed: config.global.seed.unwrap_or(0),
357 coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
358 companies: config
359 .companies
360 .iter()
361 .map(|c| CompanyConfigProto {
362 code: c.code.clone(),
363 name: c.name.clone(),
364 currency: c.currency.clone(),
365 country: c.country.clone(),
366 annual_transaction_volume: c.annual_transaction_volume.count(),
367 volume_weight: c.volume_weight as f32,
368 })
369 .collect(),
370 fraud_enabled: config.fraud.enabled,
371 fraud_rate: config.fraud.fraud_rate as f32,
372 generate_master_data: config.master_data.vendors.count > 0
373 || config.master_data.customers.count > 0
374 || config.master_data.materials.count > 0,
375 generate_document_flows: config.document_flows.p2p.enabled
376 || config.document_flows.o2c.enabled,
377 }
378 }
379}
380
381#[tonic::async_trait]
382impl synthetic_data_service_server::SyntheticDataService for SynthService {
383 async fn bulk_generate(
385 &self,
386 request: Request<BulkGenerateRequest>,
387 ) -> Result<Response<BulkGenerateResponse>, Status> {
388 let req = request.into_inner();
389
390 const MAX_ENTRY_COUNT: u64 = 1_000_000;
392 if req.entry_count > MAX_ENTRY_COUNT {
393 return Err(Status::invalid_argument(format!(
394 "entry_count ({}) exceeds maximum allowed value ({})",
395 req.entry_count, MAX_ENTRY_COUNT
396 )));
397 }
398
399 let degradation_level = self.state.check_resources()?;
401 if degradation_level != DegradationLevel::Normal {
402 warn!(
403 "Starting bulk generation under resource pressure (level: {:?})",
404 degradation_level
405 );
406 }
407
408 info!("Bulk generate request: {} entries", req.entry_count);
409
410 let config = self.proto_to_config(req.config).await?;
411 let start_time = Instant::now();
412
413 let phase_config = PhaseConfig {
415 generate_master_data: req.include_master_data,
416 generate_document_flows: false,
417 generate_journal_entries: true,
418 inject_anomalies: req.inject_anomalies,
419 show_progress: false,
420 ..Default::default()
421 };
422
423 let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
424 .map_err(|e| Status::internal(format!("Failed to create orchestrator: {}", e)))?;
425
426 let result = orchestrator
427 .generate()
428 .map_err(|e| Status::internal(format!("Generation failed: {}", e)))?;
429
430 let duration_ms = start_time.elapsed().as_millis() as u64;
431
432 let entries_count = result.journal_entries.len() as u64;
434 self.state
435 .total_entries
436 .fetch_add(entries_count, Ordering::Relaxed);
437
438 let anomaly_count = result.anomaly_labels.labels.len() as u64;
439 self.state
440 .total_anomalies
441 .fetch_add(anomaly_count, Ordering::Relaxed);
442
443 let journal_entries: Vec<JournalEntryProto> = result
445 .journal_entries
446 .iter()
447 .map(Self::journal_entry_to_proto)
448 .collect();
449
450 let anomaly_labels: Vec<AnomalyLabelProto> = result
451 .anomaly_labels
452 .labels
453 .iter()
454 .map(|a| AnomalyLabelProto {
455 anomaly_id: a.anomaly_id.clone(),
456 document_id: a.document_id.clone(),
457 anomaly_type: format!("{:?}", a.anomaly_type),
458 anomaly_category: a.document_type.clone(),
459 description: a.description.clone(),
460 severity_score: a.severity as f32,
461 })
462 .collect();
463
464 let mut total_debit = rust_decimal::Decimal::ZERO;
466 let mut total_credit = rust_decimal::Decimal::ZERO;
467 let mut total_lines = 0u64;
468 let mut entries_by_company = std::collections::HashMap::new();
469 let mut entries_by_source = std::collections::HashMap::new();
470
471 for entry in &result.journal_entries {
472 *entries_by_company
473 .entry(entry.header.company_code.clone())
474 .or_insert(0u64) += 1;
475 *entries_by_source
476 .entry(format!("{:?}", entry.header.source))
477 .or_insert(0u64) += 1;
478
479 for line in &entry.lines {
480 total_lines += 1;
481 total_debit += line.debit_amount;
482 total_credit += line.credit_amount;
483 }
484 }
485
486 let stats = GenerationStats {
487 total_entries: entries_count,
488 total_lines,
489 total_debit_amount: total_debit.to_string(),
490 total_credit_amount: total_credit.to_string(),
491 anomaly_count,
492 entries_by_company,
493 entries_by_source,
494 };
495
496 info!(
497 "Bulk generation complete: {} entries in {}ms",
498 entries_count, duration_ms
499 );
500
501 Ok(Response::new(BulkGenerateResponse {
502 entries_generated: entries_count,
503 duration_ms,
504 journal_entries,
505 anomaly_labels,
506 stats: Some(stats),
507 }))
508 }
509
510 type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
511
512 async fn stream_data(
514 &self,
515 request: Request<StreamDataRequest>,
516 ) -> Result<Response<Self::StreamDataStream>, Status> {
517 let req = request.into_inner();
518
519 const MIN_EVENTS_PER_SECOND: u32 = 1;
521 const MAX_EVENTS_PER_SECOND: u32 = 10_000;
522 if req.events_per_second < MIN_EVENTS_PER_SECOND {
523 return Err(Status::invalid_argument(format!(
524 "events_per_second ({}) must be at least {}",
525 req.events_per_second, MIN_EVENTS_PER_SECOND
526 )));
527 }
528 if req.events_per_second > MAX_EVENTS_PER_SECOND {
529 return Err(Status::invalid_argument(format!(
530 "events_per_second ({}) exceeds maximum allowed value ({})",
531 req.events_per_second, MAX_EVENTS_PER_SECOND
532 )));
533 }
534
535 const MAX_STREAM_EVENTS: u64 = 10_000_000;
537 if req.max_events > MAX_STREAM_EVENTS {
538 return Err(Status::invalid_argument(format!(
539 "max_events ({}) exceeds maximum allowed value ({})",
540 req.max_events, MAX_STREAM_EVENTS
541 )));
542 }
543
544 let degradation_level = self.state.check_resources()?;
546 if degradation_level != DegradationLevel::Normal {
547 warn!(
548 "Starting stream under resource pressure (level: {:?})",
549 degradation_level
550 );
551 }
552
553 info!(
554 "Stream data request: {} events/sec, max {}",
555 req.events_per_second, req.max_events
556 );
557
558 let config = self.proto_to_config(req.config).await?;
559 let state = self.state.clone();
560
561 state.active_streams.fetch_add(1, Ordering::Relaxed);
563
564 state.stream_paused.store(false, Ordering::Relaxed);
566 state.stream_stopped.store(false, Ordering::Relaxed);
567
568 let (tx, rx) = mpsc::channel(100);
569
570 let events_per_second = req.events_per_second;
572 let max_events = req.max_events;
573 let inject_anomalies = req.inject_anomalies;
574
575 tokio::spawn(async move {
576 let phase_config = PhaseConfig {
577 generate_master_data: false,
578 generate_document_flows: false,
579 generate_journal_entries: true,
580 inject_anomalies,
581 show_progress: false,
582 ..Default::default()
583 };
584
585 let mut sequence = 0u64;
586 let delay = if events_per_second > 0 {
587 Duration::from_micros(1_000_000 / events_per_second as u64)
588 } else {
589 Duration::from_millis(1)
590 };
591
592 let mut orchestrator =
594 match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
595 Ok(o) => o,
596 Err(e) => {
597 error!("Failed to create orchestrator: {}", e);
598 return;
599 }
600 };
601
602 loop {
603 if state.stream_stopped.load(Ordering::Relaxed) {
605 info!("Stream stopped by control command");
606 break;
607 }
608
609 while state.stream_paused.load(Ordering::Relaxed) {
611 tokio::time::sleep(Duration::from_millis(100)).await;
612 if state.stream_stopped.load(Ordering::Relaxed) {
613 break;
614 }
615 }
616
617 if max_events > 0 && sequence >= max_events {
619 info!("Stream reached max events: {}", max_events);
620 break;
621 }
622
623 let result = match orchestrator.generate() {
625 Ok(r) => r,
626 Err(e) => {
627 error!("Generation failed: {}", e);
628 break;
629 }
630 };
631
632 for entry in result.journal_entries {
634 sequence += 1;
635 state.total_stream_events.fetch_add(1, Ordering::Relaxed);
636 state.total_entries.fetch_add(1, Ordering::Relaxed);
637
638 let timestamp = Timestamp {
639 seconds: Utc::now().timestamp(),
640 nanos: 0,
641 };
642
643 let event = DataEvent {
644 sequence,
645 timestamp: Some(timestamp),
646 event: Some(data_event::Event::JournalEntry(
647 SynthService::journal_entry_to_proto(&entry),
648 )),
649 };
650
651 if tx.send(Ok(event)).await.is_err() {
652 info!("Stream receiver dropped");
653 break;
654 }
655
656 tokio::time::sleep(delay).await;
658
659 if max_events > 0 && sequence >= max_events {
661 break;
662 }
663 }
664 }
665
666 state.active_streams.fetch_sub(1, Ordering::Relaxed);
668 });
669
670 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
671 }
672
673 async fn control(
675 &self,
676 request: Request<ControlCommand>,
677 ) -> Result<Response<ControlResponse>, Status> {
678 let cmd = request.into_inner();
679 let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
680
681 info!("Control command: {:?}", action);
682
683 let (success, message, status) = match action {
684 ControlAction::Pause => {
685 self.state.stream_paused.store(true, Ordering::Relaxed);
686 (true, "Stream paused".to_string(), StreamStatus::Paused)
687 }
688 ControlAction::Resume => {
689 self.state.stream_paused.store(false, Ordering::Relaxed);
690 (true, "Stream resumed".to_string(), StreamStatus::Running)
691 }
692 ControlAction::Stop => {
693 self.state.stream_stopped.store(true, Ordering::Relaxed);
694 (true, "Stream stopped".to_string(), StreamStatus::Stopped)
695 }
696 ControlAction::TriggerPattern => {
697 let pattern = cmd.pattern_name.unwrap_or_default();
698 if pattern.is_empty() {
699 (
700 false,
701 "Pattern name is required for TriggerPattern action".to_string(),
702 StreamStatus::Running,
703 )
704 } else {
705 let valid_patterns = [
708 "year_end_spike",
709 "period_end_spike",
710 "holiday_cluster",
711 "fraud_cluster",
712 "error_cluster",
713 "uniform",
714 ];
715 let is_valid = valid_patterns.contains(&pattern.as_str())
716 || pattern.starts_with("custom:");
717
718 if is_valid {
719 if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
721 *triggered = Some(pattern.clone());
722 }
723 info!("Pattern trigger activated: {}", pattern);
724 (
725 true,
726 format!("Pattern '{}' will be applied to upcoming entries", pattern),
727 StreamStatus::Running,
728 )
729 } else {
730 (
731 false,
732 format!(
733 "Unknown pattern '{}'. Valid patterns: {:?}",
734 pattern, valid_patterns
735 ),
736 StreamStatus::Running,
737 )
738 }
739 }
740 }
741 ControlAction::Unspecified => (
742 false,
743 "Unknown control action".to_string(),
744 StreamStatus::Unspecified,
745 ),
746 };
747
748 Ok(Response::new(ControlResponse {
749 success,
750 message,
751 current_status: status as i32,
752 }))
753 }
754
755 async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
757 let config = self.state.config.read().await;
758 let proto_config = Self::config_to_proto(&config);
759
760 Ok(Response::new(ConfigResponse {
761 success: true,
762 message: "Current configuration retrieved".to_string(),
763 current_config: Some(proto_config),
764 }))
765 }
766
767 async fn set_config(
769 &self,
770 request: Request<ConfigRequest>,
771 ) -> Result<Response<ConfigResponse>, Status> {
772 let req = request.into_inner();
773
774 if let Some(proto_config) = req.config {
775 let new_config = self.proto_to_config(Some(proto_config)).await?;
776
777 let mut config = self.state.config.write().await;
778 *config = new_config.clone();
779
780 info!("Configuration updated");
781
782 Ok(Response::new(ConfigResponse {
783 success: true,
784 message: "Configuration updated".to_string(),
785 current_config: Some(Self::config_to_proto(&new_config)),
786 }))
787 } else {
788 Err(Status::invalid_argument("No configuration provided"))
789 }
790 }
791
792 async fn get_metrics(
794 &self,
795 _request: Request<()>,
796 ) -> Result<Response<MetricsResponse>, Status> {
797 let uptime = self.state.uptime_seconds();
798 let total_entries = self.state.total_entries.load(Ordering::Relaxed);
799
800 let entries_per_second = if uptime > 0 {
801 total_entries as f64 / uptime as f64
802 } else {
803 0.0
804 };
805
806 Ok(Response::new(MetricsResponse {
807 total_entries_generated: total_entries,
808 total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
809 uptime_seconds: uptime,
810 session_entries: total_entries,
811 session_entries_per_second: entries_per_second,
812 active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
813 total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
814 }))
815 }
816
817 async fn health_check(
819 &self,
820 _request: Request<()>,
821 ) -> Result<Response<HealthResponse>, Status> {
822 Ok(Response::new(HealthResponse {
823 healthy: true,
824 version: env!("CARGO_PKG_VERSION").to_string(),
825 uptime_seconds: self.state.uptime_seconds(),
826 }))
827 }
828}
829
830pub fn default_generator_config() -> GeneratorConfig {
832 GeneratorConfig {
833 global: GlobalConfig {
834 seed: None,
835 industry: IndustrySector::Manufacturing,
836 start_date: "2024-01-01".to_string(),
837 period_months: 12,
838 group_currency: "USD".to_string(),
839 parallel: true,
840 worker_threads: 0,
841 memory_limit_mb: 0,
842 },
843 companies: vec![CompanyConfig {
844 code: "1000".to_string(),
845 name: "Default Company".to_string(),
846 currency: "USD".to_string(),
847 country: "US".to_string(),
848 annual_transaction_volume: TransactionVolume::TenK,
849 volume_weight: 1.0,
850 fiscal_year_variant: "K4".to_string(),
851 }],
852 chart_of_accounts: ChartOfAccountsConfig {
853 complexity: CoAComplexity::Small,
854 industry_specific: true,
855 custom_accounts: None,
856 min_hierarchy_depth: 2,
857 max_hierarchy_depth: 5,
858 },
859 transactions: Default::default(),
860 output: OutputConfig::default(),
861 fraud: Default::default(),
862 internal_controls: Default::default(),
863 business_processes: Default::default(),
864 user_personas: Default::default(),
865 templates: Default::default(),
866 approval: Default::default(),
867 departments: Default::default(),
868 master_data: Default::default(),
869 document_flows: Default::default(),
870 intercompany: Default::default(),
871 balance: Default::default(),
872 ocpm: Default::default(),
873 audit: Default::default(),
874 banking: Default::default(),
875 data_quality: Default::default(),
876 scenario: Default::default(),
877 temporal: Default::default(),
878 graph_export: Default::default(),
879 streaming: Default::default(),
880 rate_limit: Default::default(),
881 temporal_attributes: Default::default(),
882 relationships: Default::default(),
883 accounting_standards: Default::default(),
884 audit_standards: Default::default(),
885 distributions: Default::default(),
886 temporal_patterns: Default::default(),
887 vendor_network: Default::default(),
888 customer_segmentation: Default::default(),
889 relationship_strength: Default::default(),
890 cross_process_links: Default::default(),
891 organizational_events: Default::default(),
892 behavioral_drift: Default::default(),
893 market_drift: Default::default(),
894 drift_labeling: Default::default(),
895 anomaly_injection: Default::default(),
896 industry_specific: Default::default(),
897 fingerprint_privacy: Default::default(),
898 quality_gates: Default::default(),
899 compliance: Default::default(),
900 webhooks: Default::default(),
901 llm: Default::default(),
902 diffusion: Default::default(),
903 causal: Default::default(),
904 source_to_pay: Default::default(),
905 financial_reporting: Default::default(),
906 hr: Default::default(),
907 manufacturing: Default::default(),
908 sales_quotes: Default::default(),
909 tax: Default::default(),
910 treasury: Default::default(),
911 project_accounting: Default::default(),
912 esg: Default::default(),
913 country_packs: None,
914 }
915}
916
917#[cfg(test)]
918#[allow(clippy::unwrap_used)]
919mod tests {
920 use super::*;
921 use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;
922
923 #[tokio::test]
928 async fn test_service_creation() {
929 let config = default_generator_config();
930 let service = SynthService::new(config);
931 assert!(service.state.uptime_seconds() < 60);
933 }
934
935 #[tokio::test]
936 async fn test_service_with_state() {
937 let config = default_generator_config();
938 let state = Arc::new(ServerState::new(config));
939 let service = SynthService::with_state(Arc::clone(&state));
940
941 state.total_entries.store(100, Ordering::Relaxed);
943 assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
944 }
945
946 #[tokio::test]
951 async fn test_health_check() {
952 let config = default_generator_config();
953 let service = SynthService::new(config);
954
955 let response = service.health_check(Request::new(())).await.unwrap();
956 let health = response.into_inner();
957
958 assert!(health.healthy);
959 assert!(!health.version.is_empty());
960 }
961
962 #[tokio::test]
963 async fn test_health_check_returns_version() {
964 let config = default_generator_config();
965 let service = SynthService::new(config);
966
967 let response = service.health_check(Request::new(())).await.unwrap();
968 let health = response.into_inner();
969
970 assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
971 }
972
973 #[tokio::test]
978 async fn test_get_config() {
979 let config = default_generator_config();
980 let service = SynthService::new(config);
981
982 let response = service.get_config(Request::new(())).await.unwrap();
983 let config_response = response.into_inner();
984
985 assert!(config_response.success);
986 assert!(config_response.current_config.is_some());
987 }
988
989 #[tokio::test]
990 async fn test_get_config_returns_industry() {
991 let config = default_generator_config();
992 let service = SynthService::new(config);
993
994 let response = service.get_config(Request::new(())).await.unwrap();
995 let config_response = response.into_inner();
996 let current = config_response.current_config.unwrap();
997
998 assert_eq!(current.industry, "Manufacturing");
999 }
1000
1001 #[tokio::test]
1002 async fn test_set_config() {
1003 let config = default_generator_config();
1004 let service = SynthService::new(config);
1005
1006 let new_config = GenerationConfig {
1007 industry: "retail".to_string(),
1008 start_date: "2024-06-01".to_string(),
1009 period_months: 6,
1010 seed: 42,
1011 coa_complexity: "medium".to_string(),
1012 companies: vec![],
1013 fraud_enabled: true,
1014 fraud_rate: 0.05,
1015 generate_master_data: false,
1016 generate_document_flows: false,
1017 };
1018
1019 let response = service
1020 .set_config(Request::new(ConfigRequest {
1021 config: Some(new_config),
1022 }))
1023 .await
1024 .unwrap();
1025 let config_response = response.into_inner();
1026
1027 assert!(config_response.success);
1028 }
1029
1030 #[tokio::test]
1031 async fn test_set_config_without_config_fails() {
1032 let config = default_generator_config();
1033 let service = SynthService::new(config);
1034
1035 let result = service
1036 .set_config(Request::new(ConfigRequest { config: None }))
1037 .await;
1038
1039 assert!(result.is_err());
1040 }
1041
1042 #[tokio::test]
1047 async fn test_get_metrics_initial() {
1048 let config = default_generator_config();
1049 let service = SynthService::new(config);
1050
1051 let response = service.get_metrics(Request::new(())).await.unwrap();
1052 let metrics = response.into_inner();
1053
1054 assert_eq!(metrics.total_entries_generated, 0);
1055 assert_eq!(metrics.total_anomalies_injected, 0);
1056 assert_eq!(metrics.active_streams, 0);
1057 }
1058
1059 #[tokio::test]
1060 async fn test_get_metrics_after_updates() {
1061 let config = default_generator_config();
1062 let service = SynthService::new(config);
1063
1064 service.state.total_entries.store(1000, Ordering::Relaxed);
1066 service.state.total_anomalies.store(20, Ordering::Relaxed);
1067 service.state.active_streams.store(2, Ordering::Relaxed);
1068
1069 let response = service.get_metrics(Request::new(())).await.unwrap();
1070 let metrics = response.into_inner();
1071
1072 assert_eq!(metrics.total_entries_generated, 1000);
1073 assert_eq!(metrics.total_anomalies_injected, 20);
1074 assert_eq!(metrics.active_streams, 2);
1075 }
1076
1077 #[tokio::test]
1082 async fn test_control_pause() {
1083 let config = default_generator_config();
1084 let service = SynthService::new(config);
1085
1086 let response = service
1087 .control(Request::new(ControlCommand {
1088 action: ControlAction::Pause as i32,
1089 pattern_name: None,
1090 }))
1091 .await
1092 .unwrap();
1093 let control_response = response.into_inner();
1094
1095 assert!(control_response.success);
1096 assert!(service.state.stream_paused.load(Ordering::Relaxed));
1097 }
1098
1099 #[tokio::test]
1100 async fn test_control_resume() {
1101 let config = default_generator_config();
1102 let service = SynthService::new(config);
1103
1104 service.state.stream_paused.store(true, Ordering::Relaxed);
1106
1107 let response = service
1108 .control(Request::new(ControlCommand {
1109 action: ControlAction::Resume as i32,
1110 pattern_name: None,
1111 }))
1112 .await
1113 .unwrap();
1114 let control_response = response.into_inner();
1115
1116 assert!(control_response.success);
1117 assert!(!service.state.stream_paused.load(Ordering::Relaxed));
1118 }
1119
1120 #[tokio::test]
1121 async fn test_control_stop() {
1122 let config = default_generator_config();
1123 let service = SynthService::new(config);
1124
1125 let response = service
1126 .control(Request::new(ControlCommand {
1127 action: ControlAction::Stop as i32,
1128 pattern_name: None,
1129 }))
1130 .await
1131 .unwrap();
1132 let control_response = response.into_inner();
1133
1134 assert!(control_response.success);
1135 assert!(service.state.stream_stopped.load(Ordering::Relaxed));
1136 }
1137
1138 #[test]
1143 fn test_server_state_creation() {
1144 let config = default_generator_config();
1145 let state = ServerState::new(config);
1146
1147 assert_eq!(state.total_entries.load(Ordering::Relaxed), 0);
1148 assert_eq!(state.total_anomalies.load(Ordering::Relaxed), 0);
1149 assert_eq!(state.active_streams.load(Ordering::Relaxed), 0);
1150 assert!(!state.stream_paused.load(Ordering::Relaxed));
1151 assert!(!state.stream_stopped.load(Ordering::Relaxed));
1152 }
1153
/// The uptime reported immediately after constructing a `ServerState` is
/// below sixty seconds.
#[test]
fn test_server_state_uptime() {
    let uptime = ServerState::new(default_generator_config()).uptime_seconds();

    assert!(uptime < 60);
}
1162
/// `default_generator_config` yields a manufacturing configuration spanning
/// twelve months whose first company carries the code "1000".
#[test]
fn test_default_generator_config() {
    let defaults = default_generator_config();
    let global = &defaults.global;
    let companies = &defaults.companies;

    assert_eq!(global.industry, IndustrySector::Manufacturing);
    assert_eq!(global.period_months, 12);
    assert!(!companies.is_empty());
    assert_eq!(companies[0].code, "1000");
}
1176
/// Converting the default configuration with `SynthService::config_to_proto`
/// produces a message with industry "Manufacturing", a twelve-month period and
/// a non-empty company list.
#[test]
fn test_config_to_proto() {
    let message = SynthService::config_to_proto(&default_generator_config());

    assert_eq!(message.industry, "Manufacturing");
    assert_eq!(message.period_months, 12);
    let has_companies = !message.companies.is_empty();
    assert!(has_companies);
}
1186
1187 #[tokio::test]
1188 async fn test_proto_to_config_with_none() {
1189 let config = default_generator_config();
1190 let service = SynthService::new(config.clone());
1191
1192 let result = service.proto_to_config(None).await.unwrap();
1193
1194 assert_eq!(result.global.industry, config.global.industry);
1196 }
1197
1198 #[tokio::test]
1199 async fn test_proto_to_config_with_retail() {
1200 let config = default_generator_config();
1201 let service = SynthService::new(config);
1202
1203 let proto = GenerationConfig {
1204 industry: "retail".to_string(),
1205 start_date: "2024-01-01".to_string(),
1206 period_months: 6,
1207 seed: 0,
1208 coa_complexity: "large".to_string(),
1209 companies: vec![],
1210 fraud_enabled: false,
1211 fraud_rate: 0.0,
1212 generate_master_data: false,
1213 generate_document_flows: false,
1214 };
1215
1216 let result = service.proto_to_config(Some(proto)).await.unwrap();
1217
1218 assert_eq!(result.global.industry, IndustrySector::Retail);
1219 assert_eq!(result.chart_of_accounts.complexity, CoAComplexity::Large);
1220 }
1221
1222 #[tokio::test]
1223 async fn test_proto_to_config_with_healthcare() {
1224 let config = default_generator_config();
1225 let service = SynthService::new(config);
1226
1227 let proto = GenerationConfig {
1228 industry: "healthcare".to_string(),
1229 start_date: "2024-01-01".to_string(),
1230 period_months: 12,
1231 seed: 42,
1232 coa_complexity: "small".to_string(),
1233 companies: vec![],
1234 fraud_enabled: true,
1235 fraud_rate: 0.1,
1236 generate_master_data: true,
1237 generate_document_flows: true,
1238 };
1239
1240 let result = service.proto_to_config(Some(proto)).await.unwrap();
1241
1242 assert_eq!(result.global.industry, IndustrySector::Healthcare);
1243 assert_eq!(result.global.seed, Some(42));
1244 assert!(result.fraud.enabled);
1245 assert!((result.fraud.fraud_rate - 0.1).abs() < 0.001);
1246 }
1247
1248 #[tokio::test]
1249 async fn test_proto_to_config_with_companies() {
1250 let config = default_generator_config();
1251 let service = SynthService::new(config);
1252
1253 let proto = GenerationConfig {
1254 industry: "technology".to_string(),
1255 start_date: "2024-01-01".to_string(),
1256 period_months: 12,
1257 seed: 0,
1258 coa_complexity: "medium".to_string(),
1259 companies: vec![
1260 CompanyConfigProto {
1261 code: "1000".to_string(),
1262 name: "Parent Corp".to_string(),
1263 currency: "USD".to_string(),
1264 country: "US".to_string(),
1265 annual_transaction_volume: 100000,
1266 volume_weight: 1.0,
1267 },
1268 CompanyConfigProto {
1269 code: "2000".to_string(),
1270 name: "EU Sub".to_string(),
1271 currency: "EUR".to_string(),
1272 country: "DE".to_string(),
1273 annual_transaction_volume: 50000,
1274 volume_weight: 0.5,
1275 },
1276 ],
1277 fraud_enabled: false,
1278 fraud_rate: 0.0,
1279 generate_master_data: false,
1280 generate_document_flows: false,
1281 };
1282
1283 let result = service.proto_to_config(Some(proto)).await.unwrap();
1284
1285 assert_eq!(result.companies.len(), 2);
1286 assert_eq!(result.companies[0].code, "1000");
1287 assert_eq!(result.companies[1].currency, "EUR");
1288 }
1289
1290 #[tokio::test]
1295 async fn test_bulk_generate_entry_count_validation() {
1296 let config = default_generator_config();
1297 let service = SynthService::new(config);
1298
1299 let request = BulkGenerateRequest {
1300 entry_count: 2_000_000, include_master_data: false,
1302 inject_anomalies: false,
1303 output_format: 0,
1304 config: None,
1305 };
1306
1307 let result = service.bulk_generate(Request::new(request)).await;
1308 assert!(result.is_err());
1309 let err = result.err().unwrap();
1310 assert!(err.message().contains("exceeds maximum allowed value"));
1311 }
1312
1313 #[tokio::test]
1314 async fn test_stream_data_events_per_second_too_low() {
1315 let config = default_generator_config();
1316 let service = SynthService::new(config);
1317
1318 let request = StreamDataRequest {
1319 events_per_second: 0, max_events: 100,
1321 inject_anomalies: false,
1322 anomaly_rate: 0.0,
1323 config: None,
1324 };
1325
1326 let result = service.stream_data(Request::new(request)).await;
1327 assert!(result.is_err());
1328 let err = result.err().unwrap();
1329 assert!(err.message().contains("must be at least"));
1330 }
1331
1332 #[tokio::test]
1333 async fn test_stream_data_events_per_second_too_high() {
1334 let config = default_generator_config();
1335 let service = SynthService::new(config);
1336
1337 let request = StreamDataRequest {
1338 events_per_second: 20_000, max_events: 100,
1340 inject_anomalies: false,
1341 anomaly_rate: 0.0,
1342 config: None,
1343 };
1344
1345 let result = service.stream_data(Request::new(request)).await;
1346 assert!(result.is_err());
1347 let err = result.err().unwrap();
1348 assert!(err.message().contains("exceeds maximum allowed value"));
1349 }
1350
1351 #[tokio::test]
1352 async fn test_stream_data_max_events_too_high() {
1353 let config = default_generator_config();
1354 let service = SynthService::new(config);
1355
1356 let request = StreamDataRequest {
1357 events_per_second: 100,
1358 max_events: 100_000_000, inject_anomalies: false,
1360 anomaly_rate: 0.0,
1361 config: None,
1362 };
1363
1364 let result = service.stream_data(Request::new(request)).await;
1365 assert!(result.is_err());
1366 let err = result.err().unwrap();
1367 assert!(err.message().contains("max_events"));
1368 }
1369
1370 #[tokio::test]
1371 async fn test_stream_data_valid_request() {
1372 let config = default_generator_config();
1373 let service = SynthService::new(config);
1374
1375 let request = StreamDataRequest {
1376 events_per_second: 10,
1377 max_events: 5,
1378 inject_anomalies: false,
1379 anomaly_rate: 0.0,
1380 config: None,
1381 };
1382
1383 let result = service.stream_data(Request::new(request)).await;
1386 assert!(result.is_ok());
1387 }
1388}