Skip to main content

datasynth_server/grpc/
service.rs

1//! gRPC service implementation.
2
3use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17    ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18    TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
26/// Server state for tracking metrics and configuration.
27pub struct ServerState {
28    /// Current configuration
29    pub config: RwLock<GeneratorConfig>,
30    /// Server start time
31    start_time: Instant,
32    /// Total entries generated
33    pub total_entries: AtomicU64,
34    /// Total anomalies injected
35    pub total_anomalies: AtomicU64,
36    /// Active streams count
37    pub active_streams: AtomicU64,
38    /// Total stream events
39    pub total_stream_events: AtomicU64,
40    /// Stream control flag
41    pub stream_paused: AtomicBool,
42    /// Stream stop flag
43    pub stream_stopped: AtomicBool,
44    /// Triggered pattern name (if any) - will be applied to next generated entries
45    pub triggered_pattern: RwLock<Option<String>>,
46    /// Resource guard for memory and disk monitoring
47    pub resource_guard: Arc<ResourceGuard>,
48    /// Maximum concurrent generations allowed
49    max_concurrent_generations: AtomicU64,
50}
51
52impl ServerState {
53    pub fn new(config: GeneratorConfig) -> Self {
54        // Build resource guard from config
55        let memory_limit = config.global.memory_limit_mb;
56        let resource_guard = if memory_limit > 0 {
57            ResourceGuardBuilder::new()
58                .memory_limit(memory_limit)
59                .conservative()
60                .build()
61        } else {
62            // Default: 2GB limit for server
63            ResourceGuardBuilder::new()
64                .memory_limit(2048)
65                .conservative()
66                .build()
67        };
68
69        Self {
70            config: RwLock::new(config),
71            start_time: Instant::now(),
72            total_entries: AtomicU64::new(0),
73            total_anomalies: AtomicU64::new(0),
74            active_streams: AtomicU64::new(0),
75            total_stream_events: AtomicU64::new(0),
76            stream_paused: AtomicBool::new(false),
77            stream_stopped: AtomicBool::new(false),
78            triggered_pattern: RwLock::new(None),
79            resource_guard: Arc::new(resource_guard),
80            max_concurrent_generations: AtomicU64::new(4),
81        }
82    }
83
84    /// Create with custom resource limits.
85    pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
86        let resource_guard = ResourceGuardBuilder::new()
87            .memory_limit(memory_limit_mb)
88            .conservative()
89            .build();
90
91        Self {
92            config: RwLock::new(config),
93            start_time: Instant::now(),
94            total_entries: AtomicU64::new(0),
95            total_anomalies: AtomicU64::new(0),
96            active_streams: AtomicU64::new(0),
97            total_stream_events: AtomicU64::new(0),
98            stream_paused: AtomicBool::new(false),
99            stream_stopped: AtomicBool::new(false),
100            triggered_pattern: RwLock::new(None),
101            resource_guard: Arc::new(resource_guard),
102            max_concurrent_generations: AtomicU64::new(4),
103        }
104    }
105
106    pub fn uptime_seconds(&self) -> u64 {
107        self.start_time.elapsed().as_secs()
108    }
109
110    /// Check if resources are available for a new generation.
111    #[allow(clippy::result_large_err)] // tonic::Status is the idiomatic error type for gRPC
112    pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
113        // Check if too many concurrent generations
114        let active = self.active_streams.load(Ordering::Relaxed);
115        let max = self.max_concurrent_generations.load(Ordering::Relaxed);
116        if active >= max {
117            return Err(Status::resource_exhausted(format!(
118                "Too many concurrent generations ({}/{}). Try again later.",
119                active, max
120            )));
121        }
122
123        // Check memory and other resources
124        match self.resource_guard.check() {
125            Ok(level) => {
126                if level == DegradationLevel::Emergency {
127                    Err(Status::resource_exhausted(
128                        "Server resources critically low. Generation not possible.",
129                    ))
130                } else if level == DegradationLevel::Minimal {
131                    warn!("Resources constrained, generation may be limited");
132                    Ok(level)
133                } else {
134                    Ok(level)
135                }
136            }
137            Err(e) => Err(Status::resource_exhausted(format!(
138                "Resource check failed: {}",
139                e
140            ))),
141        }
142    }
143
144    /// Get current resource status for monitoring.
145    pub fn resource_status(&self) -> ResourceStatus {
146        let stats = self.resource_guard.stats();
147        ResourceStatus {
148            memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
149            memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
150            disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
151            degradation_level: stats.degradation_level.name().to_string(),
152            active_generations: self.active_streams.load(Ordering::Relaxed),
153        }
154    }
155}
156
157/// Resource status for monitoring endpoints.
158#[derive(Debug, Clone)]
159pub struct ResourceStatus {
160    pub memory_usage_mb: u64,
161    pub memory_peak_mb: u64,
162    pub disk_available_mb: u64,
163    pub degradation_level: String,
164    pub active_generations: u64,
165}
166
167/// Main gRPC service implementation.
168pub struct SynthService {
169    pub state: Arc<ServerState>,
170}
171
172impl SynthService {
173    pub fn new(config: GeneratorConfig) -> Self {
174        Self {
175            state: Arc::new(ServerState::new(config)),
176        }
177    }
178
179    pub fn with_state(state: Arc<ServerState>) -> Self {
180        Self { state }
181    }
182
183    /// Convert a GenerationConfig proto to GeneratorConfig.
184    async fn proto_to_config(
185        &self,
186        proto: Option<GenerationConfig>,
187    ) -> Result<GeneratorConfig, Status> {
188        match proto {
189            Some(p) => {
190                let industry = match p.industry.to_lowercase().as_str() {
191                    "manufacturing" => IndustrySector::Manufacturing,
192                    "retail" => IndustrySector::Retail,
193                    "financial_services" | "financial" => IndustrySector::FinancialServices,
194                    "healthcare" => IndustrySector::Healthcare,
195                    "technology" => IndustrySector::Technology,
196                    _ => IndustrySector::Manufacturing,
197                };
198
199                let complexity = match p.coa_complexity.to_lowercase().as_str() {
200                    "small" => CoAComplexity::Small,
201                    "medium" => CoAComplexity::Medium,
202                    "large" => CoAComplexity::Large,
203                    _ => CoAComplexity::Small,
204                };
205
206                let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
207                    vec![CompanyConfig {
208                        code: "1000".to_string(),
209                        name: "Default Company".to_string(),
210                        currency: "USD".to_string(),
211                        country: "US".to_string(),
212                        annual_transaction_volume: TransactionVolume::TenK,
213                        volume_weight: 1.0,
214                        fiscal_year_variant: "K4".to_string(),
215                    }]
216                } else {
217                    p.companies
218                        .into_iter()
219                        .map(|c| CompanyConfig {
220                            code: c.code,
221                            name: c.name,
222                            currency: c.currency,
223                            country: c.country,
224                            annual_transaction_volume: TransactionVolume::Custom(
225                                c.annual_transaction_volume,
226                            ),
227                            volume_weight: c.volume_weight as f64,
228                            fiscal_year_variant: "K4".to_string(),
229                        })
230                        .collect()
231                };
232
233                let mut config = GeneratorConfig {
234                    global: GlobalConfig {
235                        seed: if p.seed > 0 { Some(p.seed) } else { None },
236                        industry,
237                        start_date: if p.start_date.is_empty() {
238                            "2024-01-01".to_string()
239                        } else {
240                            p.start_date
241                        },
242                        period_months: if p.period_months == 0 {
243                            12
244                        } else {
245                            p.period_months
246                        },
247                        group_currency: "USD".to_string(),
248                        parallel: true,
249                        worker_threads: 0,
250                        memory_limit_mb: 0,
251                    },
252                    companies,
253                    chart_of_accounts: ChartOfAccountsConfig {
254                        complexity,
255                        industry_specific: true,
256                        custom_accounts: None,
257                        min_hierarchy_depth: 2,
258                        max_hierarchy_depth: 5,
259                    },
260                    ..default_generator_config()
261                };
262
263                // Enable fraud if requested
264                if p.fraud_enabled {
265                    config.fraud.enabled = true;
266                    config.fraud.fraud_rate = p.fraud_rate as f64;
267                }
268
269                Ok(config)
270            }
271            None => {
272                // Use current server config
273                let config = self.state.config.read().await;
274                Ok(config.clone())
275            }
276        }
277    }
278
279    /// Convert a JournalEntry to proto format.
280    fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
281        JournalEntryProto {
282            document_id: entry.header.document_id.to_string(),
283            company_code: entry.header.company_code.clone(),
284            fiscal_year: entry.header.fiscal_year as u32,
285            fiscal_period: entry.header.fiscal_period as u32,
286            posting_date: entry.header.posting_date.to_string(),
287            document_date: entry.header.document_date.to_string(),
288            created_at: entry.header.created_at.to_rfc3339(),
289            source: format!("{:?}", entry.header.source),
290            business_process: entry.header.business_process.map(|bp| format!("{:?}", bp)),
291            lines: entry
292                .lines
293                .iter()
294                .map(|line| {
295                    let amount = if line.is_debit() {
296                        line.debit_amount
297                    } else {
298                        line.credit_amount
299                    };
300                    JournalLineProto {
301                        line_number: line.line_number,
302                        account_number: line.gl_account.clone(),
303                        account_name: line.account_description.clone().unwrap_or_default(),
304                        amount: amount.to_string(),
305                        is_debit: line.is_debit(),
306                        cost_center: line.cost_center.clone(),
307                        profit_center: line.profit_center.clone(),
308                        vendor_id: None,
309                        customer_id: None,
310                        material_id: None,
311                        text: None,
312                    }
313                })
314                .collect(),
315            is_anomaly: entry.header.is_fraud,
316            anomaly_type: entry.header.fraud_type.map(|ft| format!("{:?}", ft)),
317        }
318    }
319
320    /// Convert current config to proto format.
321    fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
322        GenerationConfig {
323            industry: format!("{:?}", config.global.industry),
324            start_date: config.global.start_date.clone(),
325            period_months: config.global.period_months,
326            seed: config.global.seed.unwrap_or(0),
327            coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
328            companies: config
329                .companies
330                .iter()
331                .map(|c| CompanyConfigProto {
332                    code: c.code.clone(),
333                    name: c.name.clone(),
334                    currency: c.currency.clone(),
335                    country: c.country.clone(),
336                    annual_transaction_volume: c.annual_transaction_volume.count(),
337                    volume_weight: c.volume_weight as f32,
338                })
339                .collect(),
340            fraud_enabled: config.fraud.enabled,
341            fraud_rate: config.fraud.fraud_rate as f32,
342            generate_master_data: true,
343            generate_document_flows: true,
344        }
345    }
346}
347
348#[tonic::async_trait]
349impl synthetic_data_service_server::SyntheticDataService for SynthService {
350    /// Bulk generation - generates all data at once and returns.
351    async fn bulk_generate(
352        &self,
353        request: Request<BulkGenerateRequest>,
354    ) -> Result<Response<BulkGenerateResponse>, Status> {
355        let req = request.into_inner();
356
357        // Validate entry_count bounds
358        const MAX_ENTRY_COUNT: u64 = 1_000_000;
359        if req.entry_count > MAX_ENTRY_COUNT {
360            return Err(Status::invalid_argument(format!(
361                "entry_count ({}) exceeds maximum allowed value ({})",
362                req.entry_count, MAX_ENTRY_COUNT
363            )));
364        }
365
366        // Check resources before starting generation
367        let degradation_level = self.state.check_resources()?;
368        if degradation_level != DegradationLevel::Normal {
369            warn!(
370                "Starting bulk generation under resource pressure (level: {:?})",
371                degradation_level
372            );
373        }
374
375        info!("Bulk generate request: {} entries", req.entry_count);
376
377        let config = self.proto_to_config(req.config).await?;
378        let start_time = Instant::now();
379
380        // Create orchestrator with appropriate phase config
381        let phase_config = PhaseConfig {
382            generate_master_data: req.include_master_data,
383            generate_document_flows: false,
384            generate_journal_entries: true,
385            inject_anomalies: req.inject_anomalies,
386            show_progress: false,
387            ..Default::default()
388        };
389
390        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
391            .map_err(|e| Status::internal(format!("Failed to create orchestrator: {}", e)))?;
392
393        let result = orchestrator
394            .generate()
395            .map_err(|e| Status::internal(format!("Generation failed: {}", e)))?;
396
397        let duration_ms = start_time.elapsed().as_millis() as u64;
398
399        // Update metrics
400        let entries_count = result.journal_entries.len() as u64;
401        self.state
402            .total_entries
403            .fetch_add(entries_count, Ordering::Relaxed);
404
405        let anomaly_count = result.anomaly_labels.labels.len() as u64;
406        self.state
407            .total_anomalies
408            .fetch_add(anomaly_count, Ordering::Relaxed);
409
410        // Convert to proto
411        let journal_entries: Vec<JournalEntryProto> = result
412            .journal_entries
413            .iter()
414            .map(Self::journal_entry_to_proto)
415            .collect();
416
417        let anomaly_labels: Vec<AnomalyLabelProto> = result
418            .anomaly_labels
419            .labels
420            .iter()
421            .map(|a| AnomalyLabelProto {
422                anomaly_id: a.anomaly_id.clone(),
423                document_id: a.document_id.clone(),
424                anomaly_type: format!("{:?}", a.anomaly_type),
425                anomaly_category: a.document_type.clone(),
426                description: a.description.clone(),
427                severity_score: a.severity as f32,
428            })
429            .collect();
430
431        // Compute stats
432        let mut total_debit = rust_decimal::Decimal::ZERO;
433        let mut total_credit = rust_decimal::Decimal::ZERO;
434        let mut total_lines = 0u64;
435        let mut entries_by_company = std::collections::HashMap::new();
436        let mut entries_by_source = std::collections::HashMap::new();
437
438        for entry in &result.journal_entries {
439            *entries_by_company
440                .entry(entry.header.company_code.clone())
441                .or_insert(0u64) += 1;
442            *entries_by_source
443                .entry(format!("{:?}", entry.header.source))
444                .or_insert(0u64) += 1;
445
446            for line in &entry.lines {
447                total_lines += 1;
448                total_debit += line.debit_amount;
449                total_credit += line.credit_amount;
450            }
451        }
452
453        let stats = GenerationStats {
454            total_entries: entries_count,
455            total_lines,
456            total_debit_amount: total_debit.to_string(),
457            total_credit_amount: total_credit.to_string(),
458            anomaly_count,
459            entries_by_company,
460            entries_by_source,
461        };
462
463        info!(
464            "Bulk generation complete: {} entries in {}ms",
465            entries_count, duration_ms
466        );
467
468        Ok(Response::new(BulkGenerateResponse {
469            entries_generated: entries_count,
470            duration_ms,
471            journal_entries,
472            anomaly_labels,
473            stats: Some(stats),
474        }))
475    }
476
477    type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
478
479    /// Streaming generation - continuously generates data events.
480    async fn stream_data(
481        &self,
482        request: Request<StreamDataRequest>,
483    ) -> Result<Response<Self::StreamDataStream>, Status> {
484        let req = request.into_inner();
485
486        // Validate events_per_second bounds
487        const MIN_EVENTS_PER_SECOND: u32 = 1;
488        const MAX_EVENTS_PER_SECOND: u32 = 10_000;
489        if req.events_per_second < MIN_EVENTS_PER_SECOND {
490            return Err(Status::invalid_argument(format!(
491                "events_per_second ({}) must be at least {}",
492                req.events_per_second, MIN_EVENTS_PER_SECOND
493            )));
494        }
495        if req.events_per_second > MAX_EVENTS_PER_SECOND {
496            return Err(Status::invalid_argument(format!(
497                "events_per_second ({}) exceeds maximum allowed value ({})",
498                req.events_per_second, MAX_EVENTS_PER_SECOND
499            )));
500        }
501
502        // Validate max_events if specified
503        const MAX_STREAM_EVENTS: u64 = 10_000_000;
504        if req.max_events > MAX_STREAM_EVENTS {
505            return Err(Status::invalid_argument(format!(
506                "max_events ({}) exceeds maximum allowed value ({})",
507                req.max_events, MAX_STREAM_EVENTS
508            )));
509        }
510
511        // Check resources before starting stream
512        let degradation_level = self.state.check_resources()?;
513        if degradation_level != DegradationLevel::Normal {
514            warn!(
515                "Starting stream under resource pressure (level: {:?})",
516                degradation_level
517            );
518        }
519
520        info!(
521            "Stream data request: {} events/sec, max {}",
522            req.events_per_second, req.max_events
523        );
524
525        let config = self.proto_to_config(req.config).await?;
526        let state = self.state.clone();
527
528        // Increment active streams
529        state.active_streams.fetch_add(1, Ordering::Relaxed);
530
531        // Reset control flags
532        state.stream_paused.store(false, Ordering::Relaxed);
533        state.stream_stopped.store(false, Ordering::Relaxed);
534
535        let (tx, rx) = mpsc::channel(100);
536
537        // Spawn background task to generate and stream data
538        let events_per_second = req.events_per_second;
539        let max_events = req.max_events;
540        let inject_anomalies = req.inject_anomalies;
541
542        tokio::spawn(async move {
543            let phase_config = PhaseConfig {
544                generate_master_data: false,
545                generate_document_flows: false,
546                generate_journal_entries: true,
547                inject_anomalies,
548                show_progress: false,
549                ..Default::default()
550            };
551
552            let mut sequence = 0u64;
553            let delay = if events_per_second > 0 {
554                Duration::from_micros(1_000_000 / events_per_second as u64)
555            } else {
556                Duration::from_millis(1)
557            };
558
559            loop {
560                // Check stop flag
561                if state.stream_stopped.load(Ordering::Relaxed) {
562                    info!("Stream stopped by control command");
563                    break;
564                }
565
566                // Check pause flag
567                while state.stream_paused.load(Ordering::Relaxed) {
568                    tokio::time::sleep(Duration::from_millis(100)).await;
569                    if state.stream_stopped.load(Ordering::Relaxed) {
570                        break;
571                    }
572                }
573
574                // Check max events
575                if max_events > 0 && sequence >= max_events {
576                    info!("Stream reached max events: {}", max_events);
577                    break;
578                }
579
580                // Generate a batch
581                let mut orchestrator =
582                    match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
583                        Ok(o) => o,
584                        Err(e) => {
585                            error!("Failed to create orchestrator: {}", e);
586                            break;
587                        }
588                    };
589
590                let result = match orchestrator.generate() {
591                    Ok(r) => r,
592                    Err(e) => {
593                        error!("Generation failed: {}", e);
594                        break;
595                    }
596                };
597
598                // Stream each entry
599                for entry in result.journal_entries {
600                    sequence += 1;
601                    state.total_stream_events.fetch_add(1, Ordering::Relaxed);
602                    state.total_entries.fetch_add(1, Ordering::Relaxed);
603
604                    let timestamp = Timestamp {
605                        seconds: Utc::now().timestamp(),
606                        nanos: 0,
607                    };
608
609                    let event = DataEvent {
610                        sequence,
611                        timestamp: Some(timestamp),
612                        event: Some(data_event::Event::JournalEntry(
613                            SynthService::journal_entry_to_proto(&entry),
614                        )),
615                    };
616
617                    if tx.send(Ok(event)).await.is_err() {
618                        info!("Stream receiver dropped");
619                        break;
620                    }
621
622                    // Rate limiting
623                    tokio::time::sleep(delay).await;
624
625                    // Check max events
626                    if max_events > 0 && sequence >= max_events {
627                        break;
628                    }
629                }
630            }
631
632            // Decrement active streams
633            state.active_streams.fetch_sub(1, Ordering::Relaxed);
634        });
635
636        Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
637    }
638
639    /// Control commands for streaming.
640    async fn control(
641        &self,
642        request: Request<ControlCommand>,
643    ) -> Result<Response<ControlResponse>, Status> {
644        let cmd = request.into_inner();
645        let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
646
647        info!("Control command: {:?}", action);
648
649        let (success, message, status) = match action {
650            ControlAction::Pause => {
651                self.state.stream_paused.store(true, Ordering::Relaxed);
652                (true, "Stream paused".to_string(), StreamStatus::Paused)
653            }
654            ControlAction::Resume => {
655                self.state.stream_paused.store(false, Ordering::Relaxed);
656                (true, "Stream resumed".to_string(), StreamStatus::Running)
657            }
658            ControlAction::Stop => {
659                self.state.stream_stopped.store(true, Ordering::Relaxed);
660                (true, "Stream stopped".to_string(), StreamStatus::Stopped)
661            }
662            ControlAction::TriggerPattern => {
663                let pattern = cmd.pattern_name.unwrap_or_default();
664                if pattern.is_empty() {
665                    (
666                        false,
667                        "Pattern name is required for TriggerPattern action".to_string(),
668                        StreamStatus::Running,
669                    )
670                } else {
671                    // Valid patterns: year_end_spike, period_end_spike, holiday_cluster,
672                    // fraud_cluster, error_cluster, or any custom pattern name
673                    let valid_patterns = [
674                        "year_end_spike",
675                        "period_end_spike",
676                        "holiday_cluster",
677                        "fraud_cluster",
678                        "error_cluster",
679                        "uniform",
680                    ];
681                    let is_valid = valid_patterns.contains(&pattern.as_str())
682                        || pattern.starts_with("custom:");
683
684                    if is_valid {
685                        // Store the pattern for the stream generator to pick up
686                        if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
687                            *triggered = Some(pattern.clone());
688                        }
689                        info!("Pattern trigger activated: {}", pattern);
690                        (
691                            true,
692                            format!("Pattern '{}' will be applied to upcoming entries", pattern),
693                            StreamStatus::Running,
694                        )
695                    } else {
696                        (
697                            false,
698                            format!(
699                                "Unknown pattern '{}'. Valid patterns: {:?}",
700                                pattern, valid_patterns
701                            ),
702                            StreamStatus::Running,
703                        )
704                    }
705                }
706            }
707            ControlAction::Unspecified => (
708                false,
709                "Unknown control action".to_string(),
710                StreamStatus::Unspecified,
711            ),
712        };
713
714        Ok(Response::new(ControlResponse {
715            success,
716            message,
717            current_status: status as i32,
718        }))
719    }
720
721    /// Get current configuration.
722    async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
723        let config = self.state.config.read().await;
724        let proto_config = Self::config_to_proto(&config);
725
726        Ok(Response::new(ConfigResponse {
727            success: true,
728            message: "Current configuration retrieved".to_string(),
729            current_config: Some(proto_config),
730        }))
731    }
732
733    /// Set configuration.
734    async fn set_config(
735        &self,
736        request: Request<ConfigRequest>,
737    ) -> Result<Response<ConfigResponse>, Status> {
738        let req = request.into_inner();
739
740        if let Some(proto_config) = req.config {
741            let new_config = self.proto_to_config(Some(proto_config)).await?;
742
743            let mut config = self.state.config.write().await;
744            *config = new_config.clone();
745
746            info!("Configuration updated");
747
748            Ok(Response::new(ConfigResponse {
749                success: true,
750                message: "Configuration updated".to_string(),
751                current_config: Some(Self::config_to_proto(&new_config)),
752            }))
753        } else {
754            Err(Status::invalid_argument("No configuration provided"))
755        }
756    }
757
758    /// Get server metrics.
759    async fn get_metrics(
760        &self,
761        _request: Request<()>,
762    ) -> Result<Response<MetricsResponse>, Status> {
763        let uptime = self.state.uptime_seconds();
764        let total_entries = self.state.total_entries.load(Ordering::Relaxed);
765
766        let entries_per_second = if uptime > 0 {
767            total_entries as f64 / uptime as f64
768        } else {
769            0.0
770        };
771
772        Ok(Response::new(MetricsResponse {
773            total_entries_generated: total_entries,
774            total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
775            uptime_seconds: uptime,
776            session_entries: total_entries,
777            session_entries_per_second: entries_per_second,
778            active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
779            total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
780        }))
781    }
782
783    /// Health check.
784    async fn health_check(
785        &self,
786        _request: Request<()>,
787    ) -> Result<Response<HealthResponse>, Status> {
788        Ok(Response::new(HealthResponse {
789            healthy: true,
790            version: env!("CARGO_PKG_VERSION").to_string(),
791            uptime_seconds: self.state.uptime_seconds(),
792        }))
793    }
794}
795
796/// Create a default GeneratorConfig.
797pub fn default_generator_config() -> GeneratorConfig {
798    GeneratorConfig {
799        global: GlobalConfig {
800            seed: None,
801            industry: IndustrySector::Manufacturing,
802            start_date: "2024-01-01".to_string(),
803            period_months: 12,
804            group_currency: "USD".to_string(),
805            parallel: true,
806            worker_threads: 0,
807            memory_limit_mb: 0,
808        },
809        companies: vec![CompanyConfig {
810            code: "1000".to_string(),
811            name: "Default Company".to_string(),
812            currency: "USD".to_string(),
813            country: "US".to_string(),
814            annual_transaction_volume: TransactionVolume::TenK,
815            volume_weight: 1.0,
816            fiscal_year_variant: "K4".to_string(),
817        }],
818        chart_of_accounts: ChartOfAccountsConfig {
819            complexity: CoAComplexity::Small,
820            industry_specific: true,
821            custom_accounts: None,
822            min_hierarchy_depth: 2,
823            max_hierarchy_depth: 5,
824        },
825        transactions: Default::default(),
826        output: OutputConfig::default(),
827        fraud: Default::default(),
828        internal_controls: Default::default(),
829        business_processes: Default::default(),
830        user_personas: Default::default(),
831        templates: Default::default(),
832        approval: Default::default(),
833        departments: Default::default(),
834        master_data: Default::default(),
835        document_flows: Default::default(),
836        intercompany: Default::default(),
837        balance: Default::default(),
838        ocpm: Default::default(),
839        audit: Default::default(),
840        banking: Default::default(),
841        data_quality: Default::default(),
842        scenario: Default::default(),
843        temporal: Default::default(),
844        graph_export: Default::default(),
845        streaming: Default::default(),
846        rate_limit: Default::default(),
847        temporal_attributes: Default::default(),
848        relationships: Default::default(),
849        accounting_standards: Default::default(),
850        audit_standards: Default::default(),
851        distributions: Default::default(),
852        temporal_patterns: Default::default(),
853        vendor_network: Default::default(),
854        customer_segmentation: Default::default(),
855        relationship_strength: Default::default(),
856        cross_process_links: Default::default(),
857        organizational_events: Default::default(),
858        behavioral_drift: Default::default(),
859        market_drift: Default::default(),
860        drift_labeling: Default::default(),
861        anomaly_injection: Default::default(),
862        industry_specific: Default::default(),
863        fingerprint_privacy: Default::default(),
864        quality_gates: Default::default(),
865        compliance: Default::default(),
866        webhooks: Default::default(),
867        llm: Default::default(),
868        diffusion: Default::default(),
869        causal: Default::default(),
870        source_to_pay: Default::default(),
871        financial_reporting: Default::default(),
872        hr: Default::default(),
873        manufacturing: Default::default(),
874        sales_quotes: Default::default(),
875        tax: Default::default(),
876        treasury: Default::default(),
877        project_accounting: Default::default(),
878        esg: Default::default(),
879        country_packs: None,
880    }
881}
882
883#[cfg(test)]
884#[allow(clippy::unwrap_used)]
885mod tests {
886    use super::*;
887    use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;
888
889    // ==========================================================================
890    // Service Creation Tests
891    // ==========================================================================
892
893    #[tokio::test]
894    async fn test_service_creation() {
895        let config = default_generator_config();
896        let service = SynthService::new(config);
897        // Service should start with zero or very small uptime (test completes quickly)
898        assert!(service.state.uptime_seconds() < 60);
899    }
900
901    #[tokio::test]
902    async fn test_service_with_state() {
903        let config = default_generator_config();
904        let state = Arc::new(ServerState::new(config));
905        let service = SynthService::with_state(Arc::clone(&state));
906
907        // Should share the same state
908        state.total_entries.store(100, Ordering::Relaxed);
909        assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
910    }
911
912    // ==========================================================================
913    // Health Check Tests
914    // ==========================================================================
915
916    #[tokio::test]
917    async fn test_health_check() {
918        let config = default_generator_config();
919        let service = SynthService::new(config);
920
921        let response = service.health_check(Request::new(())).await.unwrap();
922        let health = response.into_inner();
923
924        assert!(health.healthy);
925        assert!(!health.version.is_empty());
926    }
927
928    #[tokio::test]
929    async fn test_health_check_returns_version() {
930        let config = default_generator_config();
931        let service = SynthService::new(config);
932
933        let response = service.health_check(Request::new(())).await.unwrap();
934        let health = response.into_inner();
935
936        assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
937    }
938
939    // ==========================================================================
940    // Configuration Tests
941    // ==========================================================================
942
943    #[tokio::test]
944    async fn test_get_config() {
945        let config = default_generator_config();
946        let service = SynthService::new(config);
947
948        let response = service.get_config(Request::new(())).await.unwrap();
949        let config_response = response.into_inner();
950
951        assert!(config_response.success);
952        assert!(config_response.current_config.is_some());
953    }
954
955    #[tokio::test]
956    async fn test_get_config_returns_industry() {
957        let config = default_generator_config();
958        let service = SynthService::new(config);
959
960        let response = service.get_config(Request::new(())).await.unwrap();
961        let config_response = response.into_inner();
962        let current = config_response.current_config.unwrap();
963
964        assert_eq!(current.industry, "Manufacturing");
965    }
966
967    #[tokio::test]
968    async fn test_set_config() {
969        let config = default_generator_config();
970        let service = SynthService::new(config);
971
972        let new_config = GenerationConfig {
973            industry: "retail".to_string(),
974            start_date: "2024-06-01".to_string(),
975            period_months: 6,
976            seed: 42,
977            coa_complexity: "medium".to_string(),
978            companies: vec![],
979            fraud_enabled: true,
980            fraud_rate: 0.05,
981            generate_master_data: false,
982            generate_document_flows: false,
983        };
984
985        let response = service
986            .set_config(Request::new(ConfigRequest {
987                config: Some(new_config),
988            }))
989            .await
990            .unwrap();
991        let config_response = response.into_inner();
992
993        assert!(config_response.success);
994    }
995
996    #[tokio::test]
997    async fn test_set_config_without_config_fails() {
998        let config = default_generator_config();
999        let service = SynthService::new(config);
1000
1001        let result = service
1002            .set_config(Request::new(ConfigRequest { config: None }))
1003            .await;
1004
1005        assert!(result.is_err());
1006    }
1007
1008    // ==========================================================================
1009    // Metrics Tests
1010    // ==========================================================================
1011
1012    #[tokio::test]
1013    async fn test_get_metrics_initial() {
1014        let config = default_generator_config();
1015        let service = SynthService::new(config);
1016
1017        let response = service.get_metrics(Request::new(())).await.unwrap();
1018        let metrics = response.into_inner();
1019
1020        assert_eq!(metrics.total_entries_generated, 0);
1021        assert_eq!(metrics.total_anomalies_injected, 0);
1022        assert_eq!(metrics.active_streams, 0);
1023    }
1024
1025    #[tokio::test]
1026    async fn test_get_metrics_after_updates() {
1027        let config = default_generator_config();
1028        let service = SynthService::new(config);
1029
1030        // Simulate some activity
1031        service.state.total_entries.store(1000, Ordering::Relaxed);
1032        service.state.total_anomalies.store(20, Ordering::Relaxed);
1033        service.state.active_streams.store(2, Ordering::Relaxed);
1034
1035        let response = service.get_metrics(Request::new(())).await.unwrap();
1036        let metrics = response.into_inner();
1037
1038        assert_eq!(metrics.total_entries_generated, 1000);
1039        assert_eq!(metrics.total_anomalies_injected, 20);
1040        assert_eq!(metrics.active_streams, 2);
1041    }
1042
1043    // ==========================================================================
1044    // Control Tests
1045    // ==========================================================================
1046
1047    #[tokio::test]
1048    async fn test_control_pause() {
1049        let config = default_generator_config();
1050        let service = SynthService::new(config);
1051
1052        let response = service
1053            .control(Request::new(ControlCommand {
1054                action: ControlAction::Pause as i32,
1055                pattern_name: None,
1056            }))
1057            .await
1058            .unwrap();
1059        let control_response = response.into_inner();
1060
1061        assert!(control_response.success);
1062        assert!(service.state.stream_paused.load(Ordering::Relaxed));
1063    }
1064
1065    #[tokio::test]
1066    async fn test_control_resume() {
1067        let config = default_generator_config();
1068        let service = SynthService::new(config);
1069
1070        // First pause
1071        service.state.stream_paused.store(true, Ordering::Relaxed);
1072
1073        let response = service
1074            .control(Request::new(ControlCommand {
1075                action: ControlAction::Resume as i32,
1076                pattern_name: None,
1077            }))
1078            .await
1079            .unwrap();
1080        let control_response = response.into_inner();
1081
1082        assert!(control_response.success);
1083        assert!(!service.state.stream_paused.load(Ordering::Relaxed));
1084    }
1085
1086    #[tokio::test]
1087    async fn test_control_stop() {
1088        let config = default_generator_config();
1089        let service = SynthService::new(config);
1090
1091        let response = service
1092            .control(Request::new(ControlCommand {
1093                action: ControlAction::Stop as i32,
1094                pattern_name: None,
1095            }))
1096            .await
1097            .unwrap();
1098        let control_response = response.into_inner();
1099
1100        assert!(control_response.success);
1101        assert!(service.state.stream_stopped.load(Ordering::Relaxed));
1102    }
1103
1104    // ==========================================================================
1105    // ServerState Tests
1106    // ==========================================================================
1107
1108    #[test]
1109    fn test_server_state_creation() {
1110        let config = default_generator_config();
1111        let state = ServerState::new(config);
1112
1113        assert_eq!(state.total_entries.load(Ordering::Relaxed), 0);
1114        assert_eq!(state.total_anomalies.load(Ordering::Relaxed), 0);
1115        assert_eq!(state.active_streams.load(Ordering::Relaxed), 0);
1116        assert!(!state.stream_paused.load(Ordering::Relaxed));
1117        assert!(!state.stream_stopped.load(Ordering::Relaxed));
1118    }
1119
1120    #[test]
1121    fn test_server_state_uptime() {
1122        let config = default_generator_config();
1123        let state = ServerState::new(config);
1124
1125        // Uptime should be small since we just created the state
1126        assert!(state.uptime_seconds() < 60);
1127    }
1128
1129    // ==========================================================================
1130    // Proto Conversion Tests
1131    // ==========================================================================
1132
1133    #[test]
1134    fn test_default_generator_config() {
1135        let config = default_generator_config();
1136
1137        assert_eq!(config.global.industry, IndustrySector::Manufacturing);
1138        assert_eq!(config.global.period_months, 12);
1139        assert!(!config.companies.is_empty());
1140        assert_eq!(config.companies[0].code, "1000");
1141    }
1142
1143    #[test]
1144    fn test_config_to_proto() {
1145        let config = default_generator_config();
1146        let proto = SynthService::config_to_proto(&config);
1147
1148        assert_eq!(proto.industry, "Manufacturing");
1149        assert_eq!(proto.period_months, 12);
1150        assert!(!proto.companies.is_empty());
1151    }
1152
1153    #[tokio::test]
1154    async fn test_proto_to_config_with_none() {
1155        let config = default_generator_config();
1156        let service = SynthService::new(config.clone());
1157
1158        let result = service.proto_to_config(None).await.unwrap();
1159
1160        // Should return current config when None is passed
1161        assert_eq!(result.global.industry, config.global.industry);
1162    }
1163
1164    #[tokio::test]
1165    async fn test_proto_to_config_with_retail() {
1166        let config = default_generator_config();
1167        let service = SynthService::new(config);
1168
1169        let proto = GenerationConfig {
1170            industry: "retail".to_string(),
1171            start_date: "2024-01-01".to_string(),
1172            period_months: 6,
1173            seed: 0,
1174            coa_complexity: "large".to_string(),
1175            companies: vec![],
1176            fraud_enabled: false,
1177            fraud_rate: 0.0,
1178            generate_master_data: false,
1179            generate_document_flows: false,
1180        };
1181
1182        let result = service.proto_to_config(Some(proto)).await.unwrap();
1183
1184        assert_eq!(result.global.industry, IndustrySector::Retail);
1185        assert_eq!(result.chart_of_accounts.complexity, CoAComplexity::Large);
1186    }
1187
1188    #[tokio::test]
1189    async fn test_proto_to_config_with_healthcare() {
1190        let config = default_generator_config();
1191        let service = SynthService::new(config);
1192
1193        let proto = GenerationConfig {
1194            industry: "healthcare".to_string(),
1195            start_date: "2024-01-01".to_string(),
1196            period_months: 12,
1197            seed: 42,
1198            coa_complexity: "small".to_string(),
1199            companies: vec![],
1200            fraud_enabled: true,
1201            fraud_rate: 0.1,
1202            generate_master_data: true,
1203            generate_document_flows: true,
1204        };
1205
1206        let result = service.proto_to_config(Some(proto)).await.unwrap();
1207
1208        assert_eq!(result.global.industry, IndustrySector::Healthcare);
1209        assert_eq!(result.global.seed, Some(42));
1210        assert!(result.fraud.enabled);
1211        assert!((result.fraud.fraud_rate - 0.1).abs() < 0.001);
1212    }
1213
1214    #[tokio::test]
1215    async fn test_proto_to_config_with_companies() {
1216        let config = default_generator_config();
1217        let service = SynthService::new(config);
1218
1219        let proto = GenerationConfig {
1220            industry: "technology".to_string(),
1221            start_date: "2024-01-01".to_string(),
1222            period_months: 12,
1223            seed: 0,
1224            coa_complexity: "medium".to_string(),
1225            companies: vec![
1226                CompanyConfigProto {
1227                    code: "1000".to_string(),
1228                    name: "Parent Corp".to_string(),
1229                    currency: "USD".to_string(),
1230                    country: "US".to_string(),
1231                    annual_transaction_volume: 100000,
1232                    volume_weight: 1.0,
1233                },
1234                CompanyConfigProto {
1235                    code: "2000".to_string(),
1236                    name: "EU Sub".to_string(),
1237                    currency: "EUR".to_string(),
1238                    country: "DE".to_string(),
1239                    annual_transaction_volume: 50000,
1240                    volume_weight: 0.5,
1241                },
1242            ],
1243            fraud_enabled: false,
1244            fraud_rate: 0.0,
1245            generate_master_data: false,
1246            generate_document_flows: false,
1247        };
1248
1249        let result = service.proto_to_config(Some(proto)).await.unwrap();
1250
1251        assert_eq!(result.companies.len(), 2);
1252        assert_eq!(result.companies[0].code, "1000");
1253        assert_eq!(result.companies[1].currency, "EUR");
1254    }
1255
1256    // ==========================================================================
1257    // Input Validation Tests
1258    // ==========================================================================
1259
1260    #[tokio::test]
1261    async fn test_bulk_generate_entry_count_validation() {
1262        let config = default_generator_config();
1263        let service = SynthService::new(config);
1264
1265        let request = BulkGenerateRequest {
1266            entry_count: 2_000_000, // Exceeds MAX_ENTRY_COUNT
1267            include_master_data: false,
1268            inject_anomalies: false,
1269            output_format: 0,
1270            config: None,
1271        };
1272
1273        let result = service.bulk_generate(Request::new(request)).await;
1274        assert!(result.is_err());
1275        let err = result.err().unwrap();
1276        assert!(err.message().contains("exceeds maximum allowed value"));
1277    }
1278
1279    #[tokio::test]
1280    async fn test_stream_data_events_per_second_too_low() {
1281        let config = default_generator_config();
1282        let service = SynthService::new(config);
1283
1284        let request = StreamDataRequest {
1285            events_per_second: 0, // Below MIN_EVENTS_PER_SECOND
1286            max_events: 100,
1287            inject_anomalies: false,
1288            anomaly_rate: 0.0,
1289            config: None,
1290        };
1291
1292        let result = service.stream_data(Request::new(request)).await;
1293        assert!(result.is_err());
1294        let err = result.err().unwrap();
1295        assert!(err.message().contains("must be at least"));
1296    }
1297
1298    #[tokio::test]
1299    async fn test_stream_data_events_per_second_too_high() {
1300        let config = default_generator_config();
1301        let service = SynthService::new(config);
1302
1303        let request = StreamDataRequest {
1304            events_per_second: 20_000, // Exceeds MAX_EVENTS_PER_SECOND
1305            max_events: 100,
1306            inject_anomalies: false,
1307            anomaly_rate: 0.0,
1308            config: None,
1309        };
1310
1311        let result = service.stream_data(Request::new(request)).await;
1312        assert!(result.is_err());
1313        let err = result.err().unwrap();
1314        assert!(err.message().contains("exceeds maximum allowed value"));
1315    }
1316
1317    #[tokio::test]
1318    async fn test_stream_data_max_events_too_high() {
1319        let config = default_generator_config();
1320        let service = SynthService::new(config);
1321
1322        let request = StreamDataRequest {
1323            events_per_second: 100,
1324            max_events: 100_000_000, // Exceeds MAX_STREAM_EVENTS
1325            inject_anomalies: false,
1326            anomaly_rate: 0.0,
1327            config: None,
1328        };
1329
1330        let result = service.stream_data(Request::new(request)).await;
1331        assert!(result.is_err());
1332        let err = result.err().unwrap();
1333        assert!(err.message().contains("max_events"));
1334    }
1335
1336    #[tokio::test]
1337    async fn test_stream_data_valid_request() {
1338        let config = default_generator_config();
1339        let service = SynthService::new(config);
1340
1341        let request = StreamDataRequest {
1342            events_per_second: 10,
1343            max_events: 5,
1344            inject_anomalies: false,
1345            anomaly_rate: 0.0,
1346            config: None,
1347        };
1348
1349        // This should succeed - we can't easily test the stream output,
1350        // but we verify the request is accepted
1351        let result = service.stream_data(Request::new(request)).await;
1352        assert!(result.is_ok());
1353    }
1354}