1use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17 ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18 TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
26pub struct ServerState {
28 pub config: RwLock<GeneratorConfig>,
30 pub config_source: RwLock<crate::config_loader::ConfigSource>,
32 start_time: Instant,
34 pub total_entries: AtomicU64,
36 pub total_anomalies: AtomicU64,
38 pub active_streams: AtomicU64,
40 pub total_stream_events: AtomicU64,
42 pub stream_paused: AtomicBool,
44 pub stream_stopped: AtomicBool,
46 pub stream_events_per_second: AtomicU64,
48 pub stream_max_events: AtomicU64,
50 pub stream_inject_anomalies: AtomicBool,
52 pub triggered_pattern: RwLock<Option<String>>,
54 pub resource_guard: Arc<ResourceGuard>,
56 max_concurrent_generations: AtomicU64,
58}
59
60impl ServerState {
61 pub fn new(config: GeneratorConfig) -> Self {
62 let memory_limit = config.global.memory_limit_mb;
64 let resource_guard = if memory_limit > 0 {
65 ResourceGuardBuilder::new()
66 .memory_limit(memory_limit)
67 .conservative()
68 .build()
69 } else {
70 ResourceGuardBuilder::new()
72 .memory_limit(2048)
73 .conservative()
74 .build()
75 };
76
77 Self {
78 config: RwLock::new(config),
79 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
80 start_time: Instant::now(),
81 total_entries: AtomicU64::new(0),
82 total_anomalies: AtomicU64::new(0),
83 active_streams: AtomicU64::new(0),
84 total_stream_events: AtomicU64::new(0),
85 stream_paused: AtomicBool::new(false),
86 stream_stopped: AtomicBool::new(false),
87 stream_events_per_second: AtomicU64::new(0),
88 stream_max_events: AtomicU64::new(0),
89 stream_inject_anomalies: AtomicBool::new(false),
90 triggered_pattern: RwLock::new(None),
91 resource_guard: Arc::new(resource_guard),
92 max_concurrent_generations: AtomicU64::new(4),
93 }
94 }
95
96 pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
98 let resource_guard = ResourceGuardBuilder::new()
99 .memory_limit(memory_limit_mb)
100 .conservative()
101 .build();
102
103 Self {
104 config: RwLock::new(config),
105 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
106 start_time: Instant::now(),
107 total_entries: AtomicU64::new(0),
108 total_anomalies: AtomicU64::new(0),
109 active_streams: AtomicU64::new(0),
110 total_stream_events: AtomicU64::new(0),
111 stream_paused: AtomicBool::new(false),
112 stream_stopped: AtomicBool::new(false),
113 stream_events_per_second: AtomicU64::new(0),
114 stream_max_events: AtomicU64::new(0),
115 stream_inject_anomalies: AtomicBool::new(false),
116 triggered_pattern: RwLock::new(None),
117 resource_guard: Arc::new(resource_guard),
118 max_concurrent_generations: AtomicU64::new(4),
119 }
120 }
121
122 pub fn uptime_seconds(&self) -> u64 {
123 self.start_time.elapsed().as_secs()
124 }
125
126 #[allow(clippy::result_large_err)] pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
129 let active = self.active_streams.load(Ordering::Relaxed);
131 let max = self.max_concurrent_generations.load(Ordering::Relaxed);
132 if active >= max {
133 return Err(Status::resource_exhausted(format!(
134 "Too many concurrent generations ({active}/{max}). Try again later."
135 )));
136 }
137
138 match self.resource_guard.check() {
140 Ok(level) => {
141 if level == DegradationLevel::Emergency {
142 Err(Status::resource_exhausted(
143 "Server resources critically low. Generation not possible.",
144 ))
145 } else if level == DegradationLevel::Minimal {
146 warn!("Resources constrained, generation may be limited");
147 Ok(level)
148 } else {
149 Ok(level)
150 }
151 }
152 Err(e) => Err(Status::resource_exhausted(format!(
153 "Resource check failed: {e}"
154 ))),
155 }
156 }
157
158 pub fn resource_status(&self) -> ResourceStatus {
160 let stats = self.resource_guard.stats();
161 ResourceStatus {
162 memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
163 memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
164 disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
165 degradation_level: stats.degradation_level.name().to_string(),
166 active_generations: self.active_streams.load(Ordering::Relaxed),
167 }
168 }
169}
170
171#[derive(Debug, Clone)]
173pub struct ResourceStatus {
174 pub memory_usage_mb: u64,
175 pub memory_peak_mb: u64,
176 pub disk_available_mb: u64,
177 pub degradation_level: String,
178 pub active_generations: u64,
179}
180
181pub struct SynthService {
183 pub state: Arc<ServerState>,
184}
185
186impl SynthService {
187 pub fn new(config: GeneratorConfig) -> Self {
188 Self {
189 state: Arc::new(ServerState::new(config)),
190 }
191 }
192
193 pub fn with_state(state: Arc<ServerState>) -> Self {
194 Self { state }
195 }
196
197 async fn proto_to_config(
199 &self,
200 proto: Option<GenerationConfig>,
201 ) -> Result<GeneratorConfig, Status> {
202 match proto {
203 Some(p) => {
204 let industry = match p.industry.to_lowercase().as_str() {
205 "manufacturing" => IndustrySector::Manufacturing,
206 "retail" => IndustrySector::Retail,
207 "financial_services" | "financial" => IndustrySector::FinancialServices,
208 "healthcare" => IndustrySector::Healthcare,
209 "technology" => IndustrySector::Technology,
210 "" => IndustrySector::Manufacturing, other => {
212 return Err(Status::invalid_argument(format!(
213 "Unknown industry '{other}'. Valid values: manufacturing, retail, financial_services, healthcare, technology"
214 )));
215 }
216 };
217
218 let complexity = match p.coa_complexity.to_lowercase().as_str() {
219 "small" => CoAComplexity::Small,
220 "medium" => CoAComplexity::Medium,
221 "large" => CoAComplexity::Large,
222 "" => CoAComplexity::Small, other => {
224 return Err(Status::invalid_argument(format!(
225 "Unknown coa_complexity '{other}'. Valid values: small, medium, large"
226 )));
227 }
228 };
229
230 let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
231 vec![CompanyConfig {
232 code: "1000".to_string(),
233 name: "Default Company".to_string(),
234 currency: "USD".to_string(),
235 functional_currency: None,
236 country: "US".to_string(),
237 annual_transaction_volume: TransactionVolume::TenK,
238 volume_weight: 1.0,
239 fiscal_year_variant: "K4".to_string(),
240 }]
241 } else {
242 p.companies
243 .into_iter()
244 .map(|c| CompanyConfig {
245 code: c.code,
246 name: c.name,
247 currency: c.currency,
248 functional_currency: None,
249 country: c.country,
250 annual_transaction_volume: TransactionVolume::Custom(
251 c.annual_transaction_volume,
252 ),
253 volume_weight: c.volume_weight as f64,
254 fiscal_year_variant: "K4".to_string(),
255 })
256 .collect()
257 };
258
259 let mut config = GeneratorConfig {
260 global: GlobalConfig {
261 seed: if p.seed > 0 { Some(p.seed) } else { None },
262 industry,
263 start_date: if p.start_date.is_empty() {
264 "2024-01-01".to_string()
265 } else {
266 p.start_date
267 },
268 period_months: if p.period_months == 0 {
269 12
270 } else {
271 p.period_months
272 },
273 group_currency: "USD".to_string(),
274 presentation_currency: None,
275 parallel: true,
276 worker_threads: 0,
277 memory_limit_mb: 0,
278 fiscal_year_months: None,
279 },
280 companies,
281 chart_of_accounts: ChartOfAccountsConfig {
282 complexity,
283 industry_specific: true,
284 custom_accounts: None,
285 min_hierarchy_depth: 2,
286 max_hierarchy_depth: 5,
287 },
288 ..default_generator_config()
289 };
290
291 if p.fraud_enabled {
293 config.fraud.enabled = true;
294 config.fraud.fraud_rate = p.fraud_rate as f64;
295 }
296
297 Ok(config)
298 }
299 None => {
300 let config = self.state.config.read().await;
302 Ok(config.clone())
303 }
304 }
305 }
306
307 fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
309 JournalEntryProto {
310 document_id: entry.header.document_id.to_string(),
311 company_code: entry.header.company_code.clone(),
312 fiscal_year: entry.header.fiscal_year as u32,
313 fiscal_period: entry.header.fiscal_period as u32,
314 posting_date: entry.header.posting_date.to_string(),
315 document_date: entry.header.document_date.to_string(),
316 created_at: entry.header.created_at.to_rfc3339(),
317 source: format!("{:?}", entry.header.source),
318 business_process: entry.header.business_process.map(|bp| format!("{bp:?}")),
319 lines: entry
320 .lines
321 .iter()
322 .map(|line| {
323 let amount = if line.is_debit() {
324 line.debit_amount
325 } else {
326 line.credit_amount
327 };
328 JournalLineProto {
329 line_number: line.line_number,
330 account_number: line.gl_account.clone(),
331 account_name: line.account_description.clone().unwrap_or_default(),
332 amount: amount.to_string(),
333 is_debit: line.is_debit(),
334 cost_center: line.cost_center.clone(),
335 profit_center: line.profit_center.clone(),
336 vendor_id: line.auxiliary_account_number.clone(),
339 customer_id: None,
340 material_id: None,
341 text: line.line_text.clone().or_else(|| line.text.clone()),
342 }
343 })
344 .collect(),
345 is_anomaly: entry.header.is_fraud,
346 anomaly_type: entry.header.fraud_type.map(|ft| format!("{ft:?}")),
347 }
348 }
349
350 fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
352 GenerationConfig {
353 industry: format!("{:?}", config.global.industry),
354 start_date: config.global.start_date.clone(),
355 period_months: config.global.period_months,
356 seed: config.global.seed.unwrap_or(0),
357 coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
358 companies: config
359 .companies
360 .iter()
361 .map(|c| CompanyConfigProto {
362 code: c.code.clone(),
363 name: c.name.clone(),
364 currency: c.currency.clone(),
365 country: c.country.clone(),
366 annual_transaction_volume: c.annual_transaction_volume.count(),
367 volume_weight: c.volume_weight as f32,
368 })
369 .collect(),
370 fraud_enabled: config.fraud.enabled,
371 fraud_rate: config.fraud.fraud_rate as f32,
372 generate_master_data: config.master_data.vendors.count > 0
373 || config.master_data.customers.count > 0
374 || config.master_data.materials.count > 0,
375 generate_document_flows: config.document_flows.p2p.enabled
376 || config.document_flows.o2c.enabled,
377 }
378 }
379}
380
381#[tonic::async_trait]
382impl synthetic_data_service_server::SyntheticDataService for SynthService {
383 async fn bulk_generate(
385 &self,
386 request: Request<BulkGenerateRequest>,
387 ) -> Result<Response<BulkGenerateResponse>, Status> {
388 let req = request.into_inner();
389
390 const MAX_ENTRY_COUNT: u64 = 1_000_000;
392 if req.entry_count > MAX_ENTRY_COUNT {
393 return Err(Status::invalid_argument(format!(
394 "entry_count ({}) exceeds maximum allowed value ({})",
395 req.entry_count, MAX_ENTRY_COUNT
396 )));
397 }
398
399 let degradation_level = self.state.check_resources()?;
401 if degradation_level != DegradationLevel::Normal {
402 warn!(
403 "Starting bulk generation under resource pressure (level: {:?})",
404 degradation_level
405 );
406 }
407
408 info!("Bulk generate request: {} entries", req.entry_count);
409
410 let config = self.proto_to_config(req.config).await?;
411 let start_time = Instant::now();
412
413 let phase_config = PhaseConfig {
415 generate_master_data: req.include_master_data,
416 generate_document_flows: false,
417 generate_journal_entries: true,
418 inject_anomalies: req.inject_anomalies,
419 show_progress: false,
420 ..Default::default()
421 };
422
423 let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
424 .map_err(|e| Status::internal(format!("Failed to create orchestrator: {e}")))?;
425
426 let result = orchestrator
427 .generate()
428 .map_err(|e| Status::internal(format!("Generation failed: {e}")))?;
429
430 let duration_ms = start_time.elapsed().as_millis() as u64;
431
432 let entries_count = result.journal_entries.len() as u64;
434 self.state
435 .total_entries
436 .fetch_add(entries_count, Ordering::Relaxed);
437
438 let anomaly_count = result.anomaly_labels.labels.len() as u64;
439 self.state
440 .total_anomalies
441 .fetch_add(anomaly_count, Ordering::Relaxed);
442
443 let journal_entries: Vec<JournalEntryProto> = result
445 .journal_entries
446 .iter()
447 .map(Self::journal_entry_to_proto)
448 .collect();
449
450 let anomaly_labels: Vec<AnomalyLabelProto> = result
451 .anomaly_labels
452 .labels
453 .iter()
454 .map(|a| AnomalyLabelProto {
455 anomaly_id: a.anomaly_id.clone(),
456 document_id: a.document_id.clone(),
457 anomaly_type: format!("{:?}", a.anomaly_type),
458 anomaly_category: a.document_type.clone(),
459 description: a.description.clone(),
460 severity_score: a.severity as f32,
461 })
462 .collect();
463
464 let mut total_debit = rust_decimal::Decimal::ZERO;
466 let mut total_credit = rust_decimal::Decimal::ZERO;
467 let mut total_lines = 0u64;
468 let mut entries_by_company = std::collections::HashMap::new();
469 let mut entries_by_source = std::collections::HashMap::new();
470
471 for entry in &result.journal_entries {
472 *entries_by_company
473 .entry(entry.header.company_code.clone())
474 .or_insert(0u64) += 1;
475 *entries_by_source
476 .entry(format!("{:?}", entry.header.source))
477 .or_insert(0u64) += 1;
478
479 for line in &entry.lines {
480 total_lines += 1;
481 total_debit += line.debit_amount;
482 total_credit += line.credit_amount;
483 }
484 }
485
486 let stats = GenerationStats {
487 total_entries: entries_count,
488 total_lines,
489 total_debit_amount: total_debit.to_string(),
490 total_credit_amount: total_credit.to_string(),
491 anomaly_count,
492 entries_by_company,
493 entries_by_source,
494 };
495
496 info!(
497 "Bulk generation complete: {} entries in {}ms",
498 entries_count, duration_ms
499 );
500
501 Ok(Response::new(BulkGenerateResponse {
502 entries_generated: entries_count,
503 duration_ms,
504 journal_entries,
505 anomaly_labels,
506 stats: Some(stats),
507 }))
508 }
509
510 type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
511
512 async fn stream_data(
514 &self,
515 request: Request<StreamDataRequest>,
516 ) -> Result<Response<Self::StreamDataStream>, Status> {
517 let req = request.into_inner();
518
519 const MIN_EVENTS_PER_SECOND: u32 = 1;
521 const MAX_EVENTS_PER_SECOND: u32 = 10_000;
522 if req.events_per_second < MIN_EVENTS_PER_SECOND {
523 return Err(Status::invalid_argument(format!(
524 "events_per_second ({}) must be at least {}",
525 req.events_per_second, MIN_EVENTS_PER_SECOND
526 )));
527 }
528 if req.events_per_second > MAX_EVENTS_PER_SECOND {
529 return Err(Status::invalid_argument(format!(
530 "events_per_second ({}) exceeds maximum allowed value ({})",
531 req.events_per_second, MAX_EVENTS_PER_SECOND
532 )));
533 }
534
535 const MAX_STREAM_EVENTS: u64 = 10_000_000;
537 if req.max_events > MAX_STREAM_EVENTS {
538 return Err(Status::invalid_argument(format!(
539 "max_events ({}) exceeds maximum allowed value ({})",
540 req.max_events, MAX_STREAM_EVENTS
541 )));
542 }
543
544 let degradation_level = self.state.check_resources()?;
546 if degradation_level != DegradationLevel::Normal {
547 warn!(
548 "Starting stream under resource pressure (level: {:?})",
549 degradation_level
550 );
551 }
552
553 info!(
554 "Stream data request: {} events/sec, max {}",
555 req.events_per_second, req.max_events
556 );
557
558 let config = self.proto_to_config(req.config).await?;
559 let state = self.state.clone();
560
561 state.active_streams.fetch_add(1, Ordering::Relaxed);
563
564 state.stream_paused.store(false, Ordering::Relaxed);
566 state.stream_stopped.store(false, Ordering::Relaxed);
567
568 let (tx, rx) = mpsc::channel(100);
569
570 let events_per_second = req.events_per_second;
572 let max_events = req.max_events;
573 let inject_anomalies = req.inject_anomalies;
574
575 tokio::spawn(async move {
576 let phase_config = PhaseConfig {
577 generate_master_data: false,
578 generate_document_flows: false,
579 generate_journal_entries: true,
580 inject_anomalies,
581 show_progress: false,
582 ..Default::default()
583 };
584
585 let mut sequence = 0u64;
586 let delay = if events_per_second > 0 {
587 Duration::from_micros(1_000_000 / events_per_second as u64)
588 } else {
589 Duration::from_millis(1)
590 };
591
592 let mut orchestrator =
594 match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
595 Ok(o) => o,
596 Err(e) => {
597 error!("Failed to create orchestrator: {}", e);
598 return;
599 }
600 };
601
602 loop {
603 if state.stream_stopped.load(Ordering::Relaxed) {
605 info!("Stream stopped by control command");
606 break;
607 }
608
609 while state.stream_paused.load(Ordering::Relaxed) {
611 tokio::time::sleep(Duration::from_millis(100)).await;
612 if state.stream_stopped.load(Ordering::Relaxed) {
613 break;
614 }
615 }
616
617 if max_events > 0 && sequence >= max_events {
619 info!("Stream reached max events: {}", max_events);
620 break;
621 }
622
623 let result = match orchestrator.generate() {
625 Ok(r) => r,
626 Err(e) => {
627 error!("Generation failed: {}", e);
628 break;
629 }
630 };
631
632 for entry in result.journal_entries {
634 sequence += 1;
635 state.total_stream_events.fetch_add(1, Ordering::Relaxed);
636 state.total_entries.fetch_add(1, Ordering::Relaxed);
637
638 let timestamp = Timestamp {
639 seconds: Utc::now().timestamp(),
640 nanos: 0,
641 };
642
643 let event = DataEvent {
644 sequence,
645 timestamp: Some(timestamp),
646 event: Some(data_event::Event::JournalEntry(
647 SynthService::journal_entry_to_proto(&entry),
648 )),
649 };
650
651 if tx.send(Ok(event)).await.is_err() {
652 info!("Stream receiver dropped");
653 break;
654 }
655
656 tokio::time::sleep(delay).await;
658
659 if max_events > 0 && sequence >= max_events {
661 break;
662 }
663 }
664 }
665
666 state.active_streams.fetch_sub(1, Ordering::Relaxed);
668 });
669
670 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
671 }
672
673 async fn control(
675 &self,
676 request: Request<ControlCommand>,
677 ) -> Result<Response<ControlResponse>, Status> {
678 let cmd = request.into_inner();
679 let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
680
681 info!("Control command: {:?}", action);
682
683 let (success, message, status) = match action {
684 ControlAction::Pause => {
685 self.state.stream_paused.store(true, Ordering::Relaxed);
686 (true, "Stream paused".to_string(), StreamStatus::Paused)
687 }
688 ControlAction::Resume => {
689 self.state.stream_paused.store(false, Ordering::Relaxed);
690 (true, "Stream resumed".to_string(), StreamStatus::Running)
691 }
692 ControlAction::Stop => {
693 self.state.stream_stopped.store(true, Ordering::Relaxed);
694 (true, "Stream stopped".to_string(), StreamStatus::Stopped)
695 }
696 ControlAction::TriggerPattern => {
697 let pattern = cmd.pattern_name.unwrap_or_default();
698 if pattern.is_empty() {
699 (
700 false,
701 "Pattern name is required for TriggerPattern action".to_string(),
702 StreamStatus::Running,
703 )
704 } else {
705 let valid_patterns = [
708 "year_end_spike",
709 "period_end_spike",
710 "holiday_cluster",
711 "fraud_cluster",
712 "error_cluster",
713 "uniform",
714 ];
715 let is_valid = valid_patterns.contains(&pattern.as_str())
716 || pattern.starts_with("custom:");
717
718 if is_valid {
719 if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
721 *triggered = Some(pattern.clone());
722 }
723 info!("Pattern trigger activated: {}", pattern);
724 (
725 true,
726 format!("Pattern '{pattern}' will be applied to upcoming entries"),
727 StreamStatus::Running,
728 )
729 } else {
730 (
731 false,
732 format!(
733 "Unknown pattern '{pattern}'. Valid patterns: {valid_patterns:?}"
734 ),
735 StreamStatus::Running,
736 )
737 }
738 }
739 }
740 ControlAction::Unspecified => (
741 false,
742 "Unknown control action".to_string(),
743 StreamStatus::Unspecified,
744 ),
745 };
746
747 Ok(Response::new(ControlResponse {
748 success,
749 message,
750 current_status: status as i32,
751 }))
752 }
753
754 async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
756 let config = self.state.config.read().await;
757 let proto_config = Self::config_to_proto(&config);
758
759 Ok(Response::new(ConfigResponse {
760 success: true,
761 message: "Current configuration retrieved".to_string(),
762 current_config: Some(proto_config),
763 }))
764 }
765
766 async fn set_config(
768 &self,
769 request: Request<ConfigRequest>,
770 ) -> Result<Response<ConfigResponse>, Status> {
771 let req = request.into_inner();
772
773 if let Some(proto_config) = req.config {
774 let new_config = self.proto_to_config(Some(proto_config)).await?;
775
776 let mut config = self.state.config.write().await;
777 *config = new_config.clone();
778
779 info!("Configuration updated");
780
781 Ok(Response::new(ConfigResponse {
782 success: true,
783 message: "Configuration updated".to_string(),
784 current_config: Some(Self::config_to_proto(&new_config)),
785 }))
786 } else {
787 Err(Status::invalid_argument("No configuration provided"))
788 }
789 }
790
791 async fn get_metrics(
793 &self,
794 _request: Request<()>,
795 ) -> Result<Response<MetricsResponse>, Status> {
796 let uptime = self.state.uptime_seconds();
797 let total_entries = self.state.total_entries.load(Ordering::Relaxed);
798
799 let entries_per_second = if uptime > 0 {
800 total_entries as f64 / uptime as f64
801 } else {
802 0.0
803 };
804
805 Ok(Response::new(MetricsResponse {
806 total_entries_generated: total_entries,
807 total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
808 uptime_seconds: uptime,
809 session_entries: total_entries,
810 session_entries_per_second: entries_per_second,
811 active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
812 total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
813 }))
814 }
815
816 async fn health_check(
818 &self,
819 _request: Request<()>,
820 ) -> Result<Response<HealthResponse>, Status> {
821 Ok(Response::new(HealthResponse {
822 healthy: true,
823 version: env!("CARGO_PKG_VERSION").to_string(),
824 uptime_seconds: self.state.uptime_seconds(),
825 }))
826 }
827}
828
829pub fn default_generator_config() -> GeneratorConfig {
831 GeneratorConfig {
832 global: GlobalConfig {
833 seed: None,
834 industry: IndustrySector::Manufacturing,
835 start_date: "2024-01-01".to_string(),
836 period_months: 12,
837 group_currency: "USD".to_string(),
838 presentation_currency: None,
839 parallel: true,
840 worker_threads: 0,
841 memory_limit_mb: 0,
842 fiscal_year_months: None,
843 },
844 companies: vec![CompanyConfig {
845 code: "1000".to_string(),
846 name: "Default Company".to_string(),
847 currency: "USD".to_string(),
848 functional_currency: None,
849 country: "US".to_string(),
850 annual_transaction_volume: TransactionVolume::TenK,
851 volume_weight: 1.0,
852 fiscal_year_variant: "K4".to_string(),
853 }],
854 chart_of_accounts: ChartOfAccountsConfig {
855 complexity: CoAComplexity::Small,
856 industry_specific: true,
857 custom_accounts: None,
858 min_hierarchy_depth: 2,
859 max_hierarchy_depth: 5,
860 },
861 transactions: Default::default(),
862 output: OutputConfig::default(),
863 fraud: Default::default(),
864 internal_controls: Default::default(),
865 business_processes: Default::default(),
866 user_personas: Default::default(),
867 templates: Default::default(),
868 approval: Default::default(),
869 departments: Default::default(),
870 master_data: Default::default(),
871 document_flows: Default::default(),
872 intercompany: Default::default(),
873 balance: Default::default(),
874 ocpm: Default::default(),
875 audit: Default::default(),
876 banking: Default::default(),
877 data_quality: Default::default(),
878 scenario: Default::default(),
879 temporal: Default::default(),
880 graph_export: Default::default(),
881 streaming: Default::default(),
882 rate_limit: Default::default(),
883 temporal_attributes: Default::default(),
884 relationships: Default::default(),
885 accounting_standards: Default::default(),
886 audit_standards: Default::default(),
887 distributions: Default::default(),
888 temporal_patterns: Default::default(),
889 vendor_network: Default::default(),
890 customer_segmentation: Default::default(),
891 relationship_strength: Default::default(),
892 cross_process_links: Default::default(),
893 organizational_events: Default::default(),
894 behavioral_drift: Default::default(),
895 market_drift: Default::default(),
896 drift_labeling: Default::default(),
897 anomaly_injection: Default::default(),
898 industry_specific: Default::default(),
899 fingerprint_privacy: Default::default(),
900 quality_gates: Default::default(),
901 compliance: Default::default(),
902 webhooks: Default::default(),
903 llm: Default::default(),
904 diffusion: Default::default(),
905 causal: Default::default(),
906 source_to_pay: Default::default(),
907 financial_reporting: Default::default(),
908 hr: Default::default(),
909 manufacturing: Default::default(),
910 sales_quotes: Default::default(),
911 tax: Default::default(),
912 treasury: Default::default(),
913 project_accounting: Default::default(),
914 esg: Default::default(),
915 country_packs: None,
916 scenarios: Default::default(),
917 session: Default::default(),
918 compliance_regulations: Default::default(),
919 }
920}
921
922#[cfg(test)]
923#[allow(clippy::unwrap_used)]
924mod tests {
925 use super::*;
926 use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;
927
928 #[tokio::test]
933 async fn test_service_creation() {
934 let config = default_generator_config();
935 let service = SynthService::new(config);
936 assert!(service.state.uptime_seconds() < 60);
938 }
939
940 #[tokio::test]
941 async fn test_service_with_state() {
942 let config = default_generator_config();
943 let state = Arc::new(ServerState::new(config));
944 let service = SynthService::with_state(Arc::clone(&state));
945
946 state.total_entries.store(100, Ordering::Relaxed);
948 assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
949 }
950
951 #[tokio::test]
956 async fn test_health_check() {
957 let config = default_generator_config();
958 let service = SynthService::new(config);
959
960 let response = service.health_check(Request::new(())).await.unwrap();
961 let health = response.into_inner();
962
963 assert!(health.healthy);
964 assert!(!health.version.is_empty());
965 }
966
967 #[tokio::test]
968 async fn test_health_check_returns_version() {
969 let config = default_generator_config();
970 let service = SynthService::new(config);
971
972 let response = service.health_check(Request::new(())).await.unwrap();
973 let health = response.into_inner();
974
975 assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
976 }
977
978 #[tokio::test]
983 async fn test_get_config() {
984 let config = default_generator_config();
985 let service = SynthService::new(config);
986
987 let response = service.get_config(Request::new(())).await.unwrap();
988 let config_response = response.into_inner();
989
990 assert!(config_response.success);
991 assert!(config_response.current_config.is_some());
992 }
993
994 #[tokio::test]
995 async fn test_get_config_returns_industry() {
996 let config = default_generator_config();
997 let service = SynthService::new(config);
998
999 let response = service.get_config(Request::new(())).await.unwrap();
1000 let config_response = response.into_inner();
1001 let current = config_response.current_config.unwrap();
1002
1003 assert_eq!(current.industry, "Manufacturing");
1004 }
1005
1006 #[tokio::test]
1007 async fn test_set_config() {
1008 let config = default_generator_config();
1009 let service = SynthService::new(config);
1010
1011 let new_config = GenerationConfig {
1012 industry: "retail".to_string(),
1013 start_date: "2024-06-01".to_string(),
1014 period_months: 6,
1015 seed: 42,
1016 coa_complexity: "medium".to_string(),
1017 companies: vec![],
1018 fraud_enabled: true,
1019 fraud_rate: 0.05,
1020 generate_master_data: false,
1021 generate_document_flows: false,
1022 };
1023
1024 let response = service
1025 .set_config(Request::new(ConfigRequest {
1026 config: Some(new_config),
1027 }))
1028 .await
1029 .unwrap();
1030 let config_response = response.into_inner();
1031
1032 assert!(config_response.success);
1033 }
1034
1035 #[tokio::test]
1036 async fn test_set_config_without_config_fails() {
1037 let config = default_generator_config();
1038 let service = SynthService::new(config);
1039
1040 let result = service
1041 .set_config(Request::new(ConfigRequest { config: None }))
1042 .await;
1043
1044 assert!(result.is_err());
1045 }
1046
1047 #[tokio::test]
1052 async fn test_get_metrics_initial() {
1053 let config = default_generator_config();
1054 let service = SynthService::new(config);
1055
1056 let response = service.get_metrics(Request::new(())).await.unwrap();
1057 let metrics = response.into_inner();
1058
1059 assert_eq!(metrics.total_entries_generated, 0);
1060 assert_eq!(metrics.total_anomalies_injected, 0);
1061 assert_eq!(metrics.active_streams, 0);
1062 }
1063
1064 #[tokio::test]
1065 async fn test_get_metrics_after_updates() {
1066 let config = default_generator_config();
1067 let service = SynthService::new(config);
1068
1069 service.state.total_entries.store(1000, Ordering::Relaxed);
1071 service.state.total_anomalies.store(20, Ordering::Relaxed);
1072 service.state.active_streams.store(2, Ordering::Relaxed);
1073
1074 let response = service.get_metrics(Request::new(())).await.unwrap();
1075 let metrics = response.into_inner();
1076
1077 assert_eq!(metrics.total_entries_generated, 1000);
1078 assert_eq!(metrics.total_anomalies_injected, 20);
1079 assert_eq!(metrics.active_streams, 2);
1080 }
1081
1082 #[tokio::test]
1087 async fn test_control_pause() {
1088 let config = default_generator_config();
1089 let service = SynthService::new(config);
1090
1091 let response = service
1092 .control(Request::new(ControlCommand {
1093 action: ControlAction::Pause as i32,
1094 pattern_name: None,
1095 }))
1096 .await
1097 .unwrap();
1098 let control_response = response.into_inner();
1099
1100 assert!(control_response.success);
1101 assert!(service.state.stream_paused.load(Ordering::Relaxed));
1102 }
1103
1104 #[tokio::test]
1105 async fn test_control_resume() {
1106 let config = default_generator_config();
1107 let service = SynthService::new(config);
1108
1109 service.state.stream_paused.store(true, Ordering::Relaxed);
1111
1112 let response = service
1113 .control(Request::new(ControlCommand {
1114 action: ControlAction::Resume as i32,
1115 pattern_name: None,
1116 }))
1117 .await
1118 .unwrap();
1119 let control_response = response.into_inner();
1120
1121 assert!(control_response.success);
1122 assert!(!service.state.stream_paused.load(Ordering::Relaxed));
1123 }
1124
1125 #[tokio::test]
1126 async fn test_control_stop() {
1127 let config = default_generator_config();
1128 let service = SynthService::new(config);
1129
1130 let response = service
1131 .control(Request::new(ControlCommand {
1132 action: ControlAction::Stop as i32,
1133 pattern_name: None,
1134 }))
1135 .await
1136 .unwrap();
1137 let control_response = response.into_inner();
1138
1139 assert!(control_response.success);
1140 assert!(service.state.stream_stopped.load(Ordering::Relaxed));
1141 }
1142
1143 #[test]
1148 fn test_server_state_creation() {
1149 let config = default_generator_config();
1150 let state = ServerState::new(config);
1151
1152 assert_eq!(state.total_entries.load(Ordering::Relaxed), 0);
1153 assert_eq!(state.total_anomalies.load(Ordering::Relaxed), 0);
1154 assert_eq!(state.active_streams.load(Ordering::Relaxed), 0);
1155 assert!(!state.stream_paused.load(Ordering::Relaxed));
1156 assert!(!state.stream_stopped.load(Ordering::Relaxed));
1157 }
1158
1159 #[test]
1160 fn test_server_state_uptime() {
1161 let config = default_generator_config();
1162 let state = ServerState::new(config);
1163
1164 assert!(state.uptime_seconds() < 60);
1166 }
1167
1168 #[test]
1173 fn test_default_generator_config() {
1174 let config = default_generator_config();
1175
1176 assert_eq!(config.global.industry, IndustrySector::Manufacturing);
1177 assert_eq!(config.global.period_months, 12);
1178 assert!(!config.companies.is_empty());
1179 assert_eq!(config.companies[0].code, "1000");
1180 }
1181
1182 #[test]
1183 fn test_config_to_proto() {
1184 let config = default_generator_config();
1185 let proto = SynthService::config_to_proto(&config);
1186
1187 assert_eq!(proto.industry, "Manufacturing");
1188 assert_eq!(proto.period_months, 12);
1189 assert!(!proto.companies.is_empty());
1190 }
1191
1192 #[tokio::test]
1193 async fn test_proto_to_config_with_none() {
1194 let config = default_generator_config();
1195 let service = SynthService::new(config.clone());
1196
1197 let result = service.proto_to_config(None).await.unwrap();
1198
1199 assert_eq!(result.global.industry, config.global.industry);
1201 }
1202
1203 #[tokio::test]
1204 async fn test_proto_to_config_with_retail() {
1205 let config = default_generator_config();
1206 let service = SynthService::new(config);
1207
1208 let proto = GenerationConfig {
1209 industry: "retail".to_string(),
1210 start_date: "2024-01-01".to_string(),
1211 period_months: 6,
1212 seed: 0,
1213 coa_complexity: "large".to_string(),
1214 companies: vec![],
1215 fraud_enabled: false,
1216 fraud_rate: 0.0,
1217 generate_master_data: false,
1218 generate_document_flows: false,
1219 };
1220
1221 let result = service.proto_to_config(Some(proto)).await.unwrap();
1222
1223 assert_eq!(result.global.industry, IndustrySector::Retail);
1224 assert_eq!(result.chart_of_accounts.complexity, CoAComplexity::Large);
1225 }
1226
1227 #[tokio::test]
1228 async fn test_proto_to_config_with_healthcare() {
1229 let config = default_generator_config();
1230 let service = SynthService::new(config);
1231
1232 let proto = GenerationConfig {
1233 industry: "healthcare".to_string(),
1234 start_date: "2024-01-01".to_string(),
1235 period_months: 12,
1236 seed: 42,
1237 coa_complexity: "small".to_string(),
1238 companies: vec![],
1239 fraud_enabled: true,
1240 fraud_rate: 0.1,
1241 generate_master_data: true,
1242 generate_document_flows: true,
1243 };
1244
1245 let result = service.proto_to_config(Some(proto)).await.unwrap();
1246
1247 assert_eq!(result.global.industry, IndustrySector::Healthcare);
1248 assert_eq!(result.global.seed, Some(42));
1249 assert!(result.fraud.enabled);
1250 assert!((result.fraud.fraud_rate - 0.1).abs() < 0.001);
1251 }
1252
1253 #[tokio::test]
1254 async fn test_proto_to_config_with_companies() {
1255 let config = default_generator_config();
1256 let service = SynthService::new(config);
1257
1258 let proto = GenerationConfig {
1259 industry: "technology".to_string(),
1260 start_date: "2024-01-01".to_string(),
1261 period_months: 12,
1262 seed: 0,
1263 coa_complexity: "medium".to_string(),
1264 companies: vec![
1265 CompanyConfigProto {
1266 code: "1000".to_string(),
1267 name: "Parent Corp".to_string(),
1268 currency: "USD".to_string(),
1269 country: "US".to_string(),
1270 annual_transaction_volume: 100000,
1271 volume_weight: 1.0,
1272 },
1273 CompanyConfigProto {
1274 code: "2000".to_string(),
1275 name: "EU Sub".to_string(),
1276 currency: "EUR".to_string(),
1277 country: "DE".to_string(),
1278 annual_transaction_volume: 50000,
1279 volume_weight: 0.5,
1280 },
1281 ],
1282 fraud_enabled: false,
1283 fraud_rate: 0.0,
1284 generate_master_data: false,
1285 generate_document_flows: false,
1286 };
1287
1288 let result = service.proto_to_config(Some(proto)).await.unwrap();
1289
1290 assert_eq!(result.companies.len(), 2);
1291 assert_eq!(result.companies[0].code, "1000");
1292 assert_eq!(result.companies[1].currency, "EUR");
1293 }
1294
1295 #[tokio::test]
1300 async fn test_bulk_generate_entry_count_validation() {
1301 let config = default_generator_config();
1302 let service = SynthService::new(config);
1303
1304 let request = BulkGenerateRequest {
1305 entry_count: 2_000_000, include_master_data: false,
1307 inject_anomalies: false,
1308 output_format: 0,
1309 config: None,
1310 };
1311
1312 let result = service.bulk_generate(Request::new(request)).await;
1313 assert!(result.is_err());
1314 let err = result.err().unwrap();
1315 assert!(err.message().contains("exceeds maximum allowed value"));
1316 }
1317
1318 #[tokio::test]
1319 async fn test_stream_data_events_per_second_too_low() {
1320 let config = default_generator_config();
1321 let service = SynthService::new(config);
1322
1323 let request = StreamDataRequest {
1324 events_per_second: 0, max_events: 100,
1326 inject_anomalies: false,
1327 anomaly_rate: 0.0,
1328 config: None,
1329 };
1330
1331 let result = service.stream_data(Request::new(request)).await;
1332 assert!(result.is_err());
1333 let err = result.err().unwrap();
1334 assert!(err.message().contains("must be at least"));
1335 }
1336
1337 #[tokio::test]
1338 async fn test_stream_data_events_per_second_too_high() {
1339 let config = default_generator_config();
1340 let service = SynthService::new(config);
1341
1342 let request = StreamDataRequest {
1343 events_per_second: 20_000, max_events: 100,
1345 inject_anomalies: false,
1346 anomaly_rate: 0.0,
1347 config: None,
1348 };
1349
1350 let result = service.stream_data(Request::new(request)).await;
1351 assert!(result.is_err());
1352 let err = result.err().unwrap();
1353 assert!(err.message().contains("exceeds maximum allowed value"));
1354 }
1355
1356 #[tokio::test]
1357 async fn test_stream_data_max_events_too_high() {
1358 let config = default_generator_config();
1359 let service = SynthService::new(config);
1360
1361 let request = StreamDataRequest {
1362 events_per_second: 100,
1363 max_events: 100_000_000, inject_anomalies: false,
1365 anomaly_rate: 0.0,
1366 config: None,
1367 };
1368
1369 let result = service.stream_data(Request::new(request)).await;
1370 assert!(result.is_err());
1371 let err = result.err().unwrap();
1372 assert!(err.message().contains("max_events"));
1373 }
1374
1375 #[tokio::test]
1376 async fn test_stream_data_valid_request() {
1377 let config = default_generator_config();
1378 let service = SynthService::new(config);
1379
1380 let request = StreamDataRequest {
1381 events_per_second: 10,
1382 max_events: 5,
1383 inject_anomalies: false,
1384 anomaly_rate: 0.0,
1385 config: None,
1386 };
1387
1388 let result = service.stream_data(Request::new(request)).await;
1391 assert!(result.is_ok());
1392 }
1393}