1use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17 ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18 TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
/// Shared state for the gRPC server, handed to the service behind an `Arc`.
///
/// Counters and flags are atomics so they can be updated through a shared
/// reference; values that are replaced wholesale sit behind async `RwLock`s.
pub struct ServerState {
    /// Generator configuration used when a request does not carry its own.
    pub config: RwLock<GeneratorConfig>,
    /// Where `config` came from; initialised to `ConfigSource::Default`.
    pub config_source: RwLock<crate::config_loader::ConfigSource>,
    /// Instant at which this state was constructed; basis for `uptime_seconds`.
    start_time: Instant,
    /// Running total of journal entries produced by bulk and stream generation.
    pub total_entries: AtomicU64,
    /// Running total of anomaly labels produced by bulk generation.
    pub total_anomalies: AtomicU64,
    /// Number of stream tasks currently running; compared against
    /// `max_concurrent_generations` in `check_resources`.
    pub active_streams: AtomicU64,
    /// Running total of events emitted on streams.
    pub total_stream_events: AtomicU64,
    /// Set by the `Pause` control action, cleared by `Resume`; polled by stream tasks.
    pub stream_paused: AtomicBool,
    /// Set by the `Stop` control action; polled by stream tasks.
    pub stream_stopped: AtomicBool,
    /// Initialised to 0. NOTE(review): not read in this part of the file —
    /// presumably a stream rate setting; confirm against other users.
    pub stream_events_per_second: AtomicU64,
    /// Initialised to 0. NOTE(review): not read in this part of the file —
    /// presumably a stream event cap; confirm against other users.
    pub stream_max_events: AtomicU64,
    /// Initialised to `false`. NOTE(review): not read in this part of the file.
    pub stream_inject_anomalies: AtomicBool,
    /// Pattern name most recently accepted by the `TriggerPattern` control action.
    pub triggered_pattern: RwLock<Option<String>>,
    /// Resource monitor consulted before starting a generation.
    pub resource_guard: Arc<ResourceGuard>,
    /// Upper bound for `active_streams`; initialised to 4.
    max_concurrent_generations: AtomicU64,
}
59
60impl ServerState {
61 pub fn new(config: GeneratorConfig) -> Self {
62 let memory_limit = config.global.memory_limit_mb;
64 let resource_guard = if memory_limit > 0 {
65 ResourceGuardBuilder::new()
66 .memory_limit(memory_limit)
67 .conservative()
68 .build()
69 } else {
70 ResourceGuardBuilder::new()
72 .memory_limit(2048)
73 .conservative()
74 .build()
75 };
76
77 Self {
78 config: RwLock::new(config),
79 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
80 start_time: Instant::now(),
81 total_entries: AtomicU64::new(0),
82 total_anomalies: AtomicU64::new(0),
83 active_streams: AtomicU64::new(0),
84 total_stream_events: AtomicU64::new(0),
85 stream_paused: AtomicBool::new(false),
86 stream_stopped: AtomicBool::new(false),
87 stream_events_per_second: AtomicU64::new(0),
88 stream_max_events: AtomicU64::new(0),
89 stream_inject_anomalies: AtomicBool::new(false),
90 triggered_pattern: RwLock::new(None),
91 resource_guard: Arc::new(resource_guard),
92 max_concurrent_generations: AtomicU64::new(4),
93 }
94 }
95
96 pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
98 let resource_guard = ResourceGuardBuilder::new()
99 .memory_limit(memory_limit_mb)
100 .conservative()
101 .build();
102
103 Self {
104 config: RwLock::new(config),
105 config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
106 start_time: Instant::now(),
107 total_entries: AtomicU64::new(0),
108 total_anomalies: AtomicU64::new(0),
109 active_streams: AtomicU64::new(0),
110 total_stream_events: AtomicU64::new(0),
111 stream_paused: AtomicBool::new(false),
112 stream_stopped: AtomicBool::new(false),
113 stream_events_per_second: AtomicU64::new(0),
114 stream_max_events: AtomicU64::new(0),
115 stream_inject_anomalies: AtomicBool::new(false),
116 triggered_pattern: RwLock::new(None),
117 resource_guard: Arc::new(resource_guard),
118 max_concurrent_generations: AtomicU64::new(4),
119 }
120 }
121
122 pub fn uptime_seconds(&self) -> u64 {
123 self.start_time.elapsed().as_secs()
124 }
125
126 #[allow(clippy::result_large_err)] pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
129 let active = self.active_streams.load(Ordering::Relaxed);
131 let max = self.max_concurrent_generations.load(Ordering::Relaxed);
132 if active >= max {
133 return Err(Status::resource_exhausted(format!(
134 "Too many concurrent generations ({active}/{max}). Try again later."
135 )));
136 }
137
138 match self.resource_guard.check() {
140 Ok(level) => {
141 if level == DegradationLevel::Emergency {
142 Err(Status::resource_exhausted(
143 "Server resources critically low. Generation not possible.",
144 ))
145 } else if level == DegradationLevel::Minimal {
146 warn!("Resources constrained, generation may be limited");
147 Ok(level)
148 } else {
149 Ok(level)
150 }
151 }
152 Err(e) => Err(Status::resource_exhausted(format!(
153 "Resource check failed: {e}"
154 ))),
155 }
156 }
157
158 pub fn resource_status(&self) -> ResourceStatus {
160 let stats = self.resource_guard.stats();
161 ResourceStatus {
162 memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
163 memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
164 disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
165 degradation_level: stats.degradation_level.name().to_string(),
166 active_generations: self.active_streams.load(Ordering::Relaxed),
167 }
168 }
169}
170
/// Point-in-time resource summary produced by `ServerState::resource_status`.
#[derive(Debug, Clone)]
pub struct ResourceStatus {
    /// Resident memory, in whole MiB (bytes divided by 1024 * 1024).
    pub memory_usage_mb: u64,
    /// Peak resident memory, in whole MiB.
    pub memory_peak_mb: u64,
    /// Available disk space, in whole MiB.
    pub disk_available_mb: u64,
    /// Name of the current degradation level as reported by the resource guard.
    pub degradation_level: String,
    /// Value of the `active_streams` counter when the snapshot was taken.
    pub active_generations: u64,
}
180
/// gRPC service implementation; all handlers operate on the shared [`ServerState`].
pub struct SynthService {
    /// State shared between this service and any other holder of the same `Arc`.
    pub state: Arc<ServerState>,
}
185
186impl SynthService {
187 pub fn new(config: GeneratorConfig) -> Self {
188 Self {
189 state: Arc::new(ServerState::new(config)),
190 }
191 }
192
193 pub fn with_state(state: Arc<ServerState>) -> Self {
194 Self { state }
195 }
196
197 async fn proto_to_config(
199 &self,
200 proto: Option<GenerationConfig>,
201 ) -> Result<GeneratorConfig, Status> {
202 match proto {
203 Some(p) => {
204 let industry = match p.industry.to_lowercase().as_str() {
205 "manufacturing" => IndustrySector::Manufacturing,
206 "retail" => IndustrySector::Retail,
207 "financial_services" | "financial" => IndustrySector::FinancialServices,
208 "healthcare" => IndustrySector::Healthcare,
209 "technology" => IndustrySector::Technology,
210 "" => IndustrySector::Manufacturing, other => {
212 return Err(Status::invalid_argument(format!(
213 "Unknown industry '{other}'. Valid values: manufacturing, retail, financial_services, healthcare, technology"
214 )));
215 }
216 };
217
218 let complexity = match p.coa_complexity.to_lowercase().as_str() {
219 "small" => CoAComplexity::Small,
220 "medium" => CoAComplexity::Medium,
221 "large" => CoAComplexity::Large,
222 "" => CoAComplexity::Small, other => {
224 return Err(Status::invalid_argument(format!(
225 "Unknown coa_complexity '{other}'. Valid values: small, medium, large"
226 )));
227 }
228 };
229
230 let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
231 vec![CompanyConfig {
232 code: "1000".to_string(),
233 name: "Default Company".to_string(),
234 currency: "USD".to_string(),
235 functional_currency: None,
236 country: "US".to_string(),
237 annual_transaction_volume: TransactionVolume::TenK,
238 volume_weight: 1.0,
239 fiscal_year_variant: "K4".to_string(),
240 }]
241 } else {
242 p.companies
243 .into_iter()
244 .map(|c| CompanyConfig {
245 code: c.code,
246 name: c.name,
247 currency: c.currency,
248 functional_currency: None,
249 country: c.country,
250 annual_transaction_volume: TransactionVolume::Custom(
251 c.annual_transaction_volume,
252 ),
253 volume_weight: c.volume_weight as f64,
254 fiscal_year_variant: "K4".to_string(),
255 })
256 .collect()
257 };
258
259 let mut config = GeneratorConfig {
260 global: GlobalConfig {
261 seed: if p.seed > 0 { Some(p.seed) } else { None },
262 industry,
263 start_date: if p.start_date.is_empty() {
264 "2024-01-01".to_string()
265 } else {
266 p.start_date
267 },
268 period_months: if p.period_months == 0 {
269 12
270 } else {
271 p.period_months
272 },
273 group_currency: "USD".to_string(),
274 presentation_currency: None,
275 parallel: true,
276 worker_threads: 0,
277 memory_limit_mb: 0,
278 fiscal_year_months: None,
279 },
280 companies,
281 chart_of_accounts: ChartOfAccountsConfig {
282 complexity,
283 industry_specific: true,
284 custom_accounts: None,
285 min_hierarchy_depth: 2,
286 max_hierarchy_depth: 5,
287 expand_industry_subaccounts: false,
288 },
289 ..default_generator_config()
290 };
291
292 if p.fraud_enabled {
294 config.fraud.enabled = true;
295 config.fraud.fraud_rate = p.fraud_rate as f64;
296 }
297
298 Ok(config)
299 }
300 None => {
301 let config = self.state.config.read().await;
303 Ok(config.clone())
304 }
305 }
306 }
307
308 fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
310 JournalEntryProto {
311 document_id: entry.header.document_id.to_string(),
312 company_code: entry.header.company_code.clone(),
313 fiscal_year: entry.header.fiscal_year as u32,
314 fiscal_period: entry.header.fiscal_period as u32,
315 posting_date: entry.header.posting_date.to_string(),
316 document_date: entry.header.document_date.to_string(),
317 created_at: entry.header.created_at.to_rfc3339(),
318 source: format!("{:?}", entry.header.source),
319 business_process: entry.header.business_process.map(|bp| format!("{bp:?}")),
320 lines: entry
321 .lines
322 .iter()
323 .map(|line| {
324 let amount = if line.is_debit() {
325 line.debit_amount
326 } else {
327 line.credit_amount
328 };
329 JournalLineProto {
330 line_number: line.line_number,
331 account_number: line.gl_account.clone(),
332 account_name: line.account_description.clone().unwrap_or_default(),
333 amount: amount.to_string(),
334 is_debit: line.is_debit(),
335 cost_center: line.cost_center.clone(),
336 profit_center: line.profit_center.clone(),
337 vendor_id: line.auxiliary_account_number.clone(),
340 customer_id: None,
341 material_id: None,
342 text: line.line_text.clone().or_else(|| line.text.clone()),
343 }
344 })
345 .collect(),
346 is_anomaly: entry.header.is_fraud,
347 anomaly_type: entry.header.fraud_type.map(|ft| format!("{ft:?}")),
348 }
349 }
350
351 fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
353 GenerationConfig {
354 industry: format!("{:?}", config.global.industry),
355 start_date: config.global.start_date.clone(),
356 period_months: config.global.period_months,
357 seed: config.global.seed.unwrap_or(0),
358 coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
359 companies: config
360 .companies
361 .iter()
362 .map(|c| CompanyConfigProto {
363 code: c.code.clone(),
364 name: c.name.clone(),
365 currency: c.currency.clone(),
366 country: c.country.clone(),
367 annual_transaction_volume: c.annual_transaction_volume.count(),
368 volume_weight: c.volume_weight as f32,
369 })
370 .collect(),
371 fraud_enabled: config.fraud.enabled,
372 fraud_rate: config.fraud.fraud_rate as f32,
373 generate_master_data: config.master_data.vendors.count > 0
374 || config.master_data.customers.count > 0
375 || config.master_data.materials.count > 0,
376 generate_document_flows: config.document_flows.p2p.enabled
377 || config.document_flows.o2c.enabled,
378 }
379 }
380}
381
382#[tonic::async_trait]
383impl synthetic_data_service_server::SyntheticDataService for SynthService {
384 async fn bulk_generate(
386 &self,
387 request: Request<BulkGenerateRequest>,
388 ) -> Result<Response<BulkGenerateResponse>, Status> {
389 let req = request.into_inner();
390
391 const MAX_ENTRY_COUNT: u64 = 1_000_000;
393 if req.entry_count > MAX_ENTRY_COUNT {
394 return Err(Status::invalid_argument(format!(
395 "entry_count ({}) exceeds maximum allowed value ({})",
396 req.entry_count, MAX_ENTRY_COUNT
397 )));
398 }
399
400 let degradation_level = self.state.check_resources()?;
402 if degradation_level != DegradationLevel::Normal {
403 warn!(
404 "Starting bulk generation under resource pressure (level: {:?})",
405 degradation_level
406 );
407 }
408
409 info!("Bulk generate request: {} entries", req.entry_count);
410
411 let config = self.proto_to_config(req.config).await?;
412 let start_time = Instant::now();
413
414 let phase_config = {
416 let mut pc = PhaseConfig::from_config(&config);
417 pc.generate_master_data = req.include_master_data;
418 pc.generate_document_flows = false;
419 pc.generate_journal_entries = true;
420 pc.inject_anomalies = req.inject_anomalies;
421 pc.show_progress = false;
422 pc
423 };
424
425 let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
426 .map_err(|e| Status::internal(format!("Failed to create orchestrator: {e}")))?;
427
428 let result = orchestrator
429 .generate()
430 .map_err(|e| Status::internal(format!("Generation failed: {e}")))?;
431
432 let duration_ms = start_time.elapsed().as_millis() as u64;
433
434 let entries_count = result.journal_entries.len() as u64;
436 self.state
437 .total_entries
438 .fetch_add(entries_count, Ordering::Relaxed);
439
440 let anomaly_count = result.anomaly_labels.labels.len() as u64;
441 self.state
442 .total_anomalies
443 .fetch_add(anomaly_count, Ordering::Relaxed);
444
445 let journal_entries: Vec<JournalEntryProto> = result
447 .journal_entries
448 .iter()
449 .map(Self::journal_entry_to_proto)
450 .collect();
451
452 let anomaly_labels: Vec<AnomalyLabelProto> = result
453 .anomaly_labels
454 .labels
455 .iter()
456 .map(|a| AnomalyLabelProto {
457 anomaly_id: a.anomaly_id.clone(),
458 document_id: a.document_id.clone(),
459 anomaly_type: format!("{:?}", a.anomaly_type),
460 anomaly_category: a.document_type.clone(),
461 description: a.description.clone(),
462 severity_score: a.severity as f32,
463 })
464 .collect();
465
466 let mut total_debit = rust_decimal::Decimal::ZERO;
468 let mut total_credit = rust_decimal::Decimal::ZERO;
469 let mut total_lines = 0u64;
470 let mut entries_by_company = std::collections::HashMap::new();
471 let mut entries_by_source = std::collections::HashMap::new();
472
473 for entry in &result.journal_entries {
474 *entries_by_company
475 .entry(entry.header.company_code.clone())
476 .or_insert(0u64) += 1;
477 *entries_by_source
478 .entry(format!("{:?}", entry.header.source))
479 .or_insert(0u64) += 1;
480
481 for line in &entry.lines {
482 total_lines += 1;
483 total_debit += line.debit_amount;
484 total_credit += line.credit_amount;
485 }
486 }
487
488 let stats = GenerationStats {
489 total_entries: entries_count,
490 total_lines,
491 total_debit_amount: total_debit.to_string(),
492 total_credit_amount: total_credit.to_string(),
493 anomaly_count,
494 entries_by_company,
495 entries_by_source,
496 };
497
498 info!(
499 "Bulk generation complete: {} entries in {}ms",
500 entries_count, duration_ms
501 );
502
503 Ok(Response::new(BulkGenerateResponse {
504 entries_generated: entries_count,
505 duration_ms,
506 journal_entries,
507 anomaly_labels,
508 stats: Some(stats),
509 }))
510 }
511
512 type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
513
514 async fn stream_data(
516 &self,
517 request: Request<StreamDataRequest>,
518 ) -> Result<Response<Self::StreamDataStream>, Status> {
519 let req = request.into_inner();
520
521 const MIN_EVENTS_PER_SECOND: u32 = 1;
523 const MAX_EVENTS_PER_SECOND: u32 = 10_000;
524 if req.events_per_second < MIN_EVENTS_PER_SECOND {
525 return Err(Status::invalid_argument(format!(
526 "events_per_second ({}) must be at least {}",
527 req.events_per_second, MIN_EVENTS_PER_SECOND
528 )));
529 }
530 if req.events_per_second > MAX_EVENTS_PER_SECOND {
531 return Err(Status::invalid_argument(format!(
532 "events_per_second ({}) exceeds maximum allowed value ({})",
533 req.events_per_second, MAX_EVENTS_PER_SECOND
534 )));
535 }
536
537 const MAX_STREAM_EVENTS: u64 = 10_000_000;
539 if req.max_events > MAX_STREAM_EVENTS {
540 return Err(Status::invalid_argument(format!(
541 "max_events ({}) exceeds maximum allowed value ({})",
542 req.max_events, MAX_STREAM_EVENTS
543 )));
544 }
545
546 let degradation_level = self.state.check_resources()?;
548 if degradation_level != DegradationLevel::Normal {
549 warn!(
550 "Starting stream under resource pressure (level: {:?})",
551 degradation_level
552 );
553 }
554
555 info!(
556 "Stream data request: {} events/sec, max {}",
557 req.events_per_second, req.max_events
558 );
559
560 let config = self.proto_to_config(req.config).await?;
561 let state = self.state.clone();
562
563 state.active_streams.fetch_add(1, Ordering::Relaxed);
565
566 state.stream_paused.store(false, Ordering::Relaxed);
568 state.stream_stopped.store(false, Ordering::Relaxed);
569
570 let (tx, rx) = mpsc::channel(100);
571
572 let events_per_second = req.events_per_second;
574 let max_events = req.max_events;
575 let inject_anomalies = req.inject_anomalies;
576
577 tokio::spawn(async move {
578 let phase_config = {
579 let mut pc = PhaseConfig::from_config(&config);
580 pc.generate_master_data = false;
581 pc.generate_document_flows = false;
582 pc.generate_journal_entries = true;
583 pc.inject_anomalies = inject_anomalies;
584 pc.show_progress = false;
585 pc
586 };
587
588 let mut sequence = 0u64;
589 let delay = if events_per_second > 0 {
590 Duration::from_micros(1_000_000 / events_per_second as u64)
591 } else {
592 Duration::from_millis(1)
593 };
594
595 let mut orchestrator =
597 match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
598 Ok(o) => o,
599 Err(e) => {
600 error!("Failed to create orchestrator: {}", e);
601 return;
602 }
603 };
604
605 loop {
606 if state.stream_stopped.load(Ordering::Relaxed) {
608 info!("Stream stopped by control command");
609 break;
610 }
611
612 while state.stream_paused.load(Ordering::Relaxed) {
614 tokio::time::sleep(Duration::from_millis(100)).await;
615 if state.stream_stopped.load(Ordering::Relaxed) {
616 break;
617 }
618 }
619
620 if max_events > 0 && sequence >= max_events {
622 info!("Stream reached max events: {}", max_events);
623 break;
624 }
625
626 let result = match orchestrator.generate() {
628 Ok(r) => r,
629 Err(e) => {
630 error!("Generation failed: {}", e);
631 break;
632 }
633 };
634
635 for entry in result.journal_entries {
637 sequence += 1;
638 state.total_stream_events.fetch_add(1, Ordering::Relaxed);
639 state.total_entries.fetch_add(1, Ordering::Relaxed);
640
641 let timestamp = Timestamp {
642 seconds: Utc::now().timestamp(),
643 nanos: 0,
644 };
645
646 let event = DataEvent {
647 sequence,
648 timestamp: Some(timestamp),
649 event: Some(data_event::Event::JournalEntry(
650 SynthService::journal_entry_to_proto(&entry),
651 )),
652 };
653
654 if tx.send(Ok(event)).await.is_err() {
655 info!("Stream receiver dropped");
656 break;
657 }
658
659 tokio::time::sleep(delay).await;
661
662 if max_events > 0 && sequence >= max_events {
664 break;
665 }
666 }
667 }
668
669 state.active_streams.fetch_sub(1, Ordering::Relaxed);
671 });
672
673 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
674 }
675
676 async fn control(
678 &self,
679 request: Request<ControlCommand>,
680 ) -> Result<Response<ControlResponse>, Status> {
681 let cmd = request.into_inner();
682 let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
683
684 info!("Control command: {:?}", action);
685
686 let (success, message, status) = match action {
687 ControlAction::Pause => {
688 self.state.stream_paused.store(true, Ordering::Relaxed);
689 (true, "Stream paused".to_string(), StreamStatus::Paused)
690 }
691 ControlAction::Resume => {
692 self.state.stream_paused.store(false, Ordering::Relaxed);
693 (true, "Stream resumed".to_string(), StreamStatus::Running)
694 }
695 ControlAction::Stop => {
696 self.state.stream_stopped.store(true, Ordering::Relaxed);
697 (true, "Stream stopped".to_string(), StreamStatus::Stopped)
698 }
699 ControlAction::TriggerPattern => {
700 let pattern = cmd.pattern_name.unwrap_or_default();
701 if pattern.is_empty() {
702 (
703 false,
704 "Pattern name is required for TriggerPattern action".to_string(),
705 StreamStatus::Running,
706 )
707 } else {
708 let valid_patterns = [
711 "year_end_spike",
712 "period_end_spike",
713 "holiday_cluster",
714 "fraud_cluster",
715 "error_cluster",
716 "uniform",
717 ];
718 let is_valid = valid_patterns.contains(&pattern.as_str())
719 || pattern.starts_with("custom:");
720
721 if is_valid {
722 if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
724 *triggered = Some(pattern.clone());
725 }
726 info!("Pattern trigger activated: {}", pattern);
727 (
728 true,
729 format!("Pattern '{pattern}' will be applied to upcoming entries"),
730 StreamStatus::Running,
731 )
732 } else {
733 (
734 false,
735 format!(
736 "Unknown pattern '{pattern}'. Valid patterns: {valid_patterns:?}"
737 ),
738 StreamStatus::Running,
739 )
740 }
741 }
742 }
743 ControlAction::Unspecified => (
744 false,
745 "Unknown control action".to_string(),
746 StreamStatus::Unspecified,
747 ),
748 };
749
750 Ok(Response::new(ControlResponse {
751 success,
752 message,
753 current_status: status as i32,
754 }))
755 }
756
757 async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
759 let config = self.state.config.read().await;
760 let proto_config = Self::config_to_proto(&config);
761
762 Ok(Response::new(ConfigResponse {
763 success: true,
764 message: "Current configuration retrieved".to_string(),
765 current_config: Some(proto_config),
766 }))
767 }
768
769 async fn set_config(
771 &self,
772 request: Request<ConfigRequest>,
773 ) -> Result<Response<ConfigResponse>, Status> {
774 let req = request.into_inner();
775
776 if let Some(proto_config) = req.config {
777 let new_config = self.proto_to_config(Some(proto_config)).await?;
778
779 let mut config = self.state.config.write().await;
780 *config = new_config.clone();
781
782 info!("Configuration updated");
783
784 Ok(Response::new(ConfigResponse {
785 success: true,
786 message: "Configuration updated".to_string(),
787 current_config: Some(Self::config_to_proto(&new_config)),
788 }))
789 } else {
790 Err(Status::invalid_argument("No configuration provided"))
791 }
792 }
793
794 async fn get_metrics(
796 &self,
797 _request: Request<()>,
798 ) -> Result<Response<MetricsResponse>, Status> {
799 let uptime = self.state.uptime_seconds();
800 let total_entries = self.state.total_entries.load(Ordering::Relaxed);
801
802 let entries_per_second = if uptime > 0 {
803 total_entries as f64 / uptime as f64
804 } else {
805 0.0
806 };
807
808 Ok(Response::new(MetricsResponse {
809 total_entries_generated: total_entries,
810 total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
811 uptime_seconds: uptime,
812 session_entries: total_entries,
813 session_entries_per_second: entries_per_second,
814 active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
815 total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
816 }))
817 }
818
819 async fn health_check(
821 &self,
822 _request: Request<()>,
823 ) -> Result<Response<HealthResponse>, Status> {
824 Ok(Response::new(HealthResponse {
825 healthy: true,
826 version: env!("CARGO_PKG_VERSION").to_string(),
827 uptime_seconds: self.state.uptime_seconds(),
828 }))
829 }
830}
831
832pub fn default_generator_config() -> GeneratorConfig {
834 GeneratorConfig {
835 global: GlobalConfig {
836 seed: None,
837 industry: IndustrySector::Manufacturing,
838 start_date: "2024-01-01".to_string(),
839 period_months: 12,
840 group_currency: "USD".to_string(),
841 presentation_currency: None,
842 parallel: true,
843 worker_threads: 0,
844 memory_limit_mb: 0,
845 fiscal_year_months: None,
846 },
847 companies: vec![CompanyConfig {
848 code: "1000".to_string(),
849 name: "Default Company".to_string(),
850 currency: "USD".to_string(),
851 functional_currency: None,
852 country: "US".to_string(),
853 annual_transaction_volume: TransactionVolume::TenK,
854 volume_weight: 1.0,
855 fiscal_year_variant: "K4".to_string(),
856 }],
857 chart_of_accounts: ChartOfAccountsConfig {
858 complexity: CoAComplexity::Small,
859 industry_specific: true,
860 custom_accounts: None,
861 min_hierarchy_depth: 2,
862 max_hierarchy_depth: 5,
863 expand_industry_subaccounts: false,
864 },
865 transactions: Default::default(),
866 output: OutputConfig::default(),
867 fraud: Default::default(),
868 internal_controls: Default::default(),
869 business_processes: Default::default(),
870 user_personas: Default::default(),
871 templates: Default::default(),
872 approval: Default::default(),
873 departments: Default::default(),
874 master_data: Default::default(),
875 document_flows: Default::default(),
876 intercompany: Default::default(),
877 balance: Default::default(),
878 ocpm: Default::default(),
879 audit: Default::default(),
880 banking: Default::default(),
881 data_quality: Default::default(),
882 scenario: Default::default(),
883 temporal: Default::default(),
884 graph_export: Default::default(),
885 streaming: Default::default(),
886 rate_limit: Default::default(),
887 temporal_attributes: Default::default(),
888 relationships: Default::default(),
889 accounting_standards: Default::default(),
890 audit_standards: Default::default(),
891 distributions: Default::default(),
892 temporal_patterns: Default::default(),
893 vendor_network: Default::default(),
894 customer_segmentation: Default::default(),
895 relationship_strength: Default::default(),
896 cross_process_links: Default::default(),
897 organizational_events: Default::default(),
898 behavioral_drift: Default::default(),
899 market_drift: Default::default(),
900 drift_labeling: Default::default(),
901 anomaly_injection: Default::default(),
902 industry_specific: Default::default(),
903 fingerprint_privacy: Default::default(),
904 quality_gates: Default::default(),
905 compliance: Default::default(),
906 webhooks: Default::default(),
907 llm: Default::default(),
908 diffusion: Default::default(),
909 causal: Default::default(),
910 source_to_pay: Default::default(),
911 financial_reporting: Default::default(),
912 hr: Default::default(),
913 manufacturing: Default::default(),
914 sales_quotes: Default::default(),
915 tax: Default::default(),
916 treasury: Default::default(),
917 project_accounting: Default::default(),
918 esg: Default::default(),
919 country_packs: None,
920 scenarios: Default::default(),
921 session: Default::default(),
922 compliance_regulations: Default::default(),
923 analytics_metadata: Default::default(),
924 concentration: Default::default(),
925 }
926}
927
928#[cfg(test)]
929mod tests {
930 use super::*;
931 use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;
932
933 #[tokio::test]
938 async fn test_service_creation() {
939 let config = default_generator_config();
940 let service = SynthService::new(config);
941 assert!(service.state.uptime_seconds() < 60);
943 }
944
945 #[tokio::test]
946 async fn test_service_with_state() {
947 let config = default_generator_config();
948 let state = Arc::new(ServerState::new(config));
949 let service = SynthService::with_state(Arc::clone(&state));
950
951 state.total_entries.store(100, Ordering::Relaxed);
953 assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
954 }
955
956 #[tokio::test]
961 async fn test_health_check() {
962 let config = default_generator_config();
963 let service = SynthService::new(config);
964
965 let response = service.health_check(Request::new(())).await.unwrap();
966 let health = response.into_inner();
967
968 assert!(health.healthy);
969 assert!(!health.version.is_empty());
970 }
971
972 #[tokio::test]
973 async fn test_health_check_returns_version() {
974 let config = default_generator_config();
975 let service = SynthService::new(config);
976
977 let response = service.health_check(Request::new(())).await.unwrap();
978 let health = response.into_inner();
979
980 assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
981 }
982
983 #[tokio::test]
988 async fn test_get_config() {
989 let config = default_generator_config();
990 let service = SynthService::new(config);
991
992 let response = service.get_config(Request::new(())).await.unwrap();
993 let config_response = response.into_inner();
994
995 assert!(config_response.success);
996 assert!(config_response.current_config.is_some());
997 }
998
999 #[tokio::test]
1000 async fn test_get_config_returns_industry() {
1001 let config = default_generator_config();
1002 let service = SynthService::new(config);
1003
1004 let response = service.get_config(Request::new(())).await.unwrap();
1005 let config_response = response.into_inner();
1006 let current = config_response.current_config.unwrap();
1007
1008 assert_eq!(current.industry, "Manufacturing");
1009 }
1010
1011 #[tokio::test]
1012 async fn test_set_config() {
1013 let config = default_generator_config();
1014 let service = SynthService::new(config);
1015
1016 let new_config = GenerationConfig {
1017 industry: "retail".to_string(),
1018 start_date: "2024-06-01".to_string(),
1019 period_months: 6,
1020 seed: 42,
1021 coa_complexity: "medium".to_string(),
1022 companies: vec![],
1023 fraud_enabled: true,
1024 fraud_rate: 0.05,
1025 generate_master_data: false,
1026 generate_document_flows: false,
1027 };
1028
1029 let response = service
1030 .set_config(Request::new(ConfigRequest {
1031 config: Some(new_config),
1032 }))
1033 .await
1034 .unwrap();
1035 let config_response = response.into_inner();
1036
1037 assert!(config_response.success);
1038 }
1039
1040 #[tokio::test]
1041 async fn test_set_config_without_config_fails() {
1042 let config = default_generator_config();
1043 let service = SynthService::new(config);
1044
1045 let result = service
1046 .set_config(Request::new(ConfigRequest { config: None }))
1047 .await;
1048
1049 assert!(result.is_err());
1050 }
1051
1052 #[tokio::test]
1057 async fn test_get_metrics_initial() {
1058 let config = default_generator_config();
1059 let service = SynthService::new(config);
1060
1061 let response = service.get_metrics(Request::new(())).await.unwrap();
1062 let metrics = response.into_inner();
1063
1064 assert_eq!(metrics.total_entries_generated, 0);
1065 assert_eq!(metrics.total_anomalies_injected, 0);
1066 assert_eq!(metrics.active_streams, 0);
1067 }
1068
1069 #[tokio::test]
1070 async fn test_get_metrics_after_updates() {
1071 let config = default_generator_config();
1072 let service = SynthService::new(config);
1073
1074 service.state.total_entries.store(1000, Ordering::Relaxed);
1076 service.state.total_anomalies.store(20, Ordering::Relaxed);
1077 service.state.active_streams.store(2, Ordering::Relaxed);
1078
1079 let response = service.get_metrics(Request::new(())).await.unwrap();
1080 let metrics = response.into_inner();
1081
1082 assert_eq!(metrics.total_entries_generated, 1000);
1083 assert_eq!(metrics.total_anomalies_injected, 20);
1084 assert_eq!(metrics.active_streams, 2);
1085 }
1086
1087 #[tokio::test]
1092 async fn test_control_pause() {
1093 let config = default_generator_config();
1094 let service = SynthService::new(config);
1095
1096 let response = service
1097 .control(Request::new(ControlCommand {
1098 action: ControlAction::Pause as i32,
1099 pattern_name: None,
1100 }))
1101 .await
1102 .unwrap();
1103 let control_response = response.into_inner();
1104
1105 assert!(control_response.success);
1106 assert!(service.state.stream_paused.load(Ordering::Relaxed));
1107 }
1108
1109 #[tokio::test]
1110 async fn test_control_resume() {
1111 let config = default_generator_config();
1112 let service = SynthService::new(config);
1113
1114 service.state.stream_paused.store(true, Ordering::Relaxed);
1116
1117 let response = service
1118 .control(Request::new(ControlCommand {
1119 action: ControlAction::Resume as i32,
1120 pattern_name: None,
1121 }))
1122 .await
1123 .unwrap();
1124 let control_response = response.into_inner();
1125
1126 assert!(control_response.success);
1127 assert!(!service.state.stream_paused.load(Ordering::Relaxed));
1128 }
1129
1130 #[tokio::test]
1131 async fn test_control_stop() {
1132 let config = default_generator_config();
1133 let service = SynthService::new(config);
1134
1135 let response = service
1136 .control(Request::new(ControlCommand {
1137 action: ControlAction::Stop as i32,
1138 pattern_name: None,
1139 }))
1140 .await
1141 .unwrap();
1142 let control_response = response.into_inner();
1143
1144 assert!(control_response.success);
1145 assert!(service.state.stream_stopped.load(Ordering::Relaxed));
1146 }
1147
/// Checks that a freshly built `ServerState` starts with its entry, anomaly
/// and active-stream counters at zero and with the paused/stopped flags unset.
#[test]
fn test_server_state_creation() {
    let state = ServerState::new(default_generator_config());

    // All three counters share the same expectation, so check them in one pass.
    let counters = [
        &state.total_entries,
        &state.total_anomalies,
        &state.active_streams,
    ];
    for counter in counters.iter() {
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }

    let paused = state.stream_paused.load(Ordering::Relaxed);
    assert!(!paused);
    let stopped = state.stream_stopped.load(Ordering::Relaxed);
    assert!(!stopped);
}
1163
/// Checks that a newly created `ServerState` reports an uptime below 60.
#[test]
fn test_server_state_uptime() {
    let state = ServerState::new(default_generator_config());
    let uptime = state.uptime_seconds();

    assert!(uptime < 60);
}
1172
/// Checks the values produced by `default_generator_config`: manufacturing
/// industry, a 12-month period, and a first company with code `"1000"`.
#[test]
fn test_default_generator_config() {
    let defaults = default_generator_config();
    let global = &defaults.global;
    let companies = &defaults.companies;

    assert_eq!(global.industry, IndustrySector::Manufacturing);
    assert_eq!(global.period_months, 12);
    // Guard the index below: the list must contain at least one company.
    assert!(!companies.is_empty());
    assert_eq!(companies[0].code, "1000");
}
1186
/// Converts the default generator config with `SynthService::config_to_proto`
/// and checks the industry string, the period length, and that companies are
/// present in the result.
#[test]
fn test_config_to_proto() {
    let source = default_generator_config();
    let converted = SynthService::config_to_proto(&source);

    assert_eq!(converted.industry, "Manufacturing");
    assert_eq!(converted.period_months, 12);
    assert!(!converted.companies.is_empty());
}
1196
1197 #[tokio::test]
1198 async fn test_proto_to_config_with_none() {
1199 let config = default_generator_config();
1200 let service = SynthService::new(config.clone());
1201
1202 let result = service.proto_to_config(None).await.unwrap();
1203
1204 assert_eq!(result.global.industry, config.global.industry);
1206 }
1207
1208 #[tokio::test]
1209 async fn test_proto_to_config_with_retail() {
1210 let config = default_generator_config();
1211 let service = SynthService::new(config);
1212
1213 let proto = GenerationConfig {
1214 industry: "retail".to_string(),
1215 start_date: "2024-01-01".to_string(),
1216 period_months: 6,
1217 seed: 0,
1218 coa_complexity: "large".to_string(),
1219 companies: vec![],
1220 fraud_enabled: false,
1221 fraud_rate: 0.0,
1222 generate_master_data: false,
1223 generate_document_flows: false,
1224 };
1225
1226 let result = service.proto_to_config(Some(proto)).await.unwrap();
1227
1228 assert_eq!(result.global.industry, IndustrySector::Retail);
1229 assert_eq!(result.chart_of_accounts.complexity, CoAComplexity::Large);
1230 }
1231
1232 #[tokio::test]
1233 async fn test_proto_to_config_with_healthcare() {
1234 let config = default_generator_config();
1235 let service = SynthService::new(config);
1236
1237 let proto = GenerationConfig {
1238 industry: "healthcare".to_string(),
1239 start_date: "2024-01-01".to_string(),
1240 period_months: 12,
1241 seed: 42,
1242 coa_complexity: "small".to_string(),
1243 companies: vec![],
1244 fraud_enabled: true,
1245 fraud_rate: 0.1,
1246 generate_master_data: true,
1247 generate_document_flows: true,
1248 };
1249
1250 let result = service.proto_to_config(Some(proto)).await.unwrap();
1251
1252 assert_eq!(result.global.industry, IndustrySector::Healthcare);
1253 assert_eq!(result.global.seed, Some(42));
1254 assert!(result.fraud.enabled);
1255 assert!((result.fraud.fraud_rate - 0.1).abs() < 0.001);
1256 }
1257
1258 #[tokio::test]
1259 async fn test_proto_to_config_with_companies() {
1260 let config = default_generator_config();
1261 let service = SynthService::new(config);
1262
1263 let proto = GenerationConfig {
1264 industry: "technology".to_string(),
1265 start_date: "2024-01-01".to_string(),
1266 period_months: 12,
1267 seed: 0,
1268 coa_complexity: "medium".to_string(),
1269 companies: vec![
1270 CompanyConfigProto {
1271 code: "1000".to_string(),
1272 name: "Parent Corp".to_string(),
1273 currency: "USD".to_string(),
1274 country: "US".to_string(),
1275 annual_transaction_volume: 100000,
1276 volume_weight: 1.0,
1277 },
1278 CompanyConfigProto {
1279 code: "2000".to_string(),
1280 name: "EU Sub".to_string(),
1281 currency: "EUR".to_string(),
1282 country: "DE".to_string(),
1283 annual_transaction_volume: 50000,
1284 volume_weight: 0.5,
1285 },
1286 ],
1287 fraud_enabled: false,
1288 fraud_rate: 0.0,
1289 generate_master_data: false,
1290 generate_document_flows: false,
1291 };
1292
1293 let result = service.proto_to_config(Some(proto)).await.unwrap();
1294
1295 assert_eq!(result.companies.len(), 2);
1296 assert_eq!(result.companies[0].code, "1000");
1297 assert_eq!(result.companies[1].currency, "EUR");
1298 }
1299
1300 #[tokio::test]
1305 async fn test_bulk_generate_entry_count_validation() {
1306 let config = default_generator_config();
1307 let service = SynthService::new(config);
1308
1309 let request = BulkGenerateRequest {
1310 entry_count: 2_000_000, include_master_data: false,
1312 inject_anomalies: false,
1313 output_format: 0,
1314 config: None,
1315 };
1316
1317 let result = service.bulk_generate(Request::new(request)).await;
1318 assert!(result.is_err());
1319 let err = result.err().unwrap();
1320 assert!(err.message().contains("exceeds maximum allowed value"));
1321 }
1322
1323 #[tokio::test]
1324 async fn test_stream_data_events_per_second_too_low() {
1325 let config = default_generator_config();
1326 let service = SynthService::new(config);
1327
1328 let request = StreamDataRequest {
1329 events_per_second: 0, max_events: 100,
1331 inject_anomalies: false,
1332 anomaly_rate: 0.0,
1333 config: None,
1334 };
1335
1336 let result = service.stream_data(Request::new(request)).await;
1337 assert!(result.is_err());
1338 let err = result.err().unwrap();
1339 assert!(err.message().contains("must be at least"));
1340 }
1341
1342 #[tokio::test]
1343 async fn test_stream_data_events_per_second_too_high() {
1344 let config = default_generator_config();
1345 let service = SynthService::new(config);
1346
1347 let request = StreamDataRequest {
1348 events_per_second: 20_000, max_events: 100,
1350 inject_anomalies: false,
1351 anomaly_rate: 0.0,
1352 config: None,
1353 };
1354
1355 let result = service.stream_data(Request::new(request)).await;
1356 assert!(result.is_err());
1357 let err = result.err().unwrap();
1358 assert!(err.message().contains("exceeds maximum allowed value"));
1359 }
1360
1361 #[tokio::test]
1362 async fn test_stream_data_max_events_too_high() {
1363 let config = default_generator_config();
1364 let service = SynthService::new(config);
1365
1366 let request = StreamDataRequest {
1367 events_per_second: 100,
1368 max_events: 100_000_000, inject_anomalies: false,
1370 anomaly_rate: 0.0,
1371 config: None,
1372 };
1373
1374 let result = service.stream_data(Request::new(request)).await;
1375 assert!(result.is_err());
1376 let err = result.err().unwrap();
1377 assert!(err.message().contains("max_events"));
1378 }
1379
1380 #[tokio::test]
1381 async fn test_stream_data_valid_request() {
1382 let config = default_generator_config();
1383 let service = SynthService::new(config);
1384
1385 let request = StreamDataRequest {
1386 events_per_second: 10,
1387 max_events: 5,
1388 inject_anomalies: false,
1389 anomaly_rate: 0.0,
1390 config: None,
1391 };
1392
1393 let result = service.stream_data(Request::new(request)).await;
1396 assert!(result.is_ok());
1397 }
1398}