Skip to main content

datasynth_server/grpc/
service.rs

1//! gRPC service implementation.
2
3use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17    ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18    TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
26/// Server state for tracking metrics and configuration.
27pub struct ServerState {
28    /// Current configuration
29    pub config: RwLock<GeneratorConfig>,
30    /// Configuration source for reloading
31    pub config_source: RwLock<crate::config_loader::ConfigSource>,
32    /// Server start time
33    start_time: Instant,
34    /// Total entries generated
35    pub total_entries: AtomicU64,
36    /// Total anomalies injected
37    pub total_anomalies: AtomicU64,
38    /// Active streams count
39    pub active_streams: AtomicU64,
40    /// Total stream events
41    pub total_stream_events: AtomicU64,
42    /// Stream control flag
43    pub stream_paused: AtomicBool,
44    /// Stream stop flag
45    pub stream_stopped: AtomicBool,
46    /// Stream events per second (0 = unlimited)
47    pub stream_events_per_second: AtomicU64,
48    /// Stream maximum events (0 = unlimited)
49    pub stream_max_events: AtomicU64,
50    /// Stream anomaly injection flag
51    pub stream_inject_anomalies: AtomicBool,
52    /// Triggered pattern name (if any) - will be applied to next generated entries
53    pub triggered_pattern: RwLock<Option<String>>,
54    /// Resource guard for memory and disk monitoring
55    pub resource_guard: Arc<ResourceGuard>,
56    /// Maximum concurrent generations allowed
57    max_concurrent_generations: AtomicU64,
58}
59
60impl ServerState {
61    pub fn new(config: GeneratorConfig) -> Self {
62        // Build resource guard from config
63        let memory_limit = config.global.memory_limit_mb;
64        let resource_guard = if memory_limit > 0 {
65            ResourceGuardBuilder::new()
66                .memory_limit(memory_limit)
67                .conservative()
68                .build()
69        } else {
70            // Default: 2GB limit for server
71            ResourceGuardBuilder::new()
72                .memory_limit(2048)
73                .conservative()
74                .build()
75        };
76
77        Self {
78            config: RwLock::new(config),
79            config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
80            start_time: Instant::now(),
81            total_entries: AtomicU64::new(0),
82            total_anomalies: AtomicU64::new(0),
83            active_streams: AtomicU64::new(0),
84            total_stream_events: AtomicU64::new(0),
85            stream_paused: AtomicBool::new(false),
86            stream_stopped: AtomicBool::new(false),
87            stream_events_per_second: AtomicU64::new(0),
88            stream_max_events: AtomicU64::new(0),
89            stream_inject_anomalies: AtomicBool::new(false),
90            triggered_pattern: RwLock::new(None),
91            resource_guard: Arc::new(resource_guard),
92            max_concurrent_generations: AtomicU64::new(4),
93        }
94    }
95
96    /// Create with custom resource limits.
97    pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
98        let resource_guard = ResourceGuardBuilder::new()
99            .memory_limit(memory_limit_mb)
100            .conservative()
101            .build();
102
103        Self {
104            config: RwLock::new(config),
105            config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
106            start_time: Instant::now(),
107            total_entries: AtomicU64::new(0),
108            total_anomalies: AtomicU64::new(0),
109            active_streams: AtomicU64::new(0),
110            total_stream_events: AtomicU64::new(0),
111            stream_paused: AtomicBool::new(false),
112            stream_stopped: AtomicBool::new(false),
113            stream_events_per_second: AtomicU64::new(0),
114            stream_max_events: AtomicU64::new(0),
115            stream_inject_anomalies: AtomicBool::new(false),
116            triggered_pattern: RwLock::new(None),
117            resource_guard: Arc::new(resource_guard),
118            max_concurrent_generations: AtomicU64::new(4),
119        }
120    }
121
122    pub fn uptime_seconds(&self) -> u64 {
123        self.start_time.elapsed().as_secs()
124    }
125
126    /// Check if resources are available for a new generation.
127    #[allow(clippy::result_large_err)] // tonic::Status is the idiomatic error type for gRPC
128    pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
129        // Check if too many concurrent generations
130        let active = self.active_streams.load(Ordering::Relaxed);
131        let max = self.max_concurrent_generations.load(Ordering::Relaxed);
132        if active >= max {
133            return Err(Status::resource_exhausted(format!(
134                "Too many concurrent generations ({}/{}). Try again later.",
135                active, max
136            )));
137        }
138
139        // Check memory and other resources
140        match self.resource_guard.check() {
141            Ok(level) => {
142                if level == DegradationLevel::Emergency {
143                    Err(Status::resource_exhausted(
144                        "Server resources critically low. Generation not possible.",
145                    ))
146                } else if level == DegradationLevel::Minimal {
147                    warn!("Resources constrained, generation may be limited");
148                    Ok(level)
149                } else {
150                    Ok(level)
151                }
152            }
153            Err(e) => Err(Status::resource_exhausted(format!(
154                "Resource check failed: {}",
155                e
156            ))),
157        }
158    }
159
160    /// Get current resource status for monitoring.
161    pub fn resource_status(&self) -> ResourceStatus {
162        let stats = self.resource_guard.stats();
163        ResourceStatus {
164            memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
165            memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
166            disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
167            degradation_level: stats.degradation_level.name().to_string(),
168            active_generations: self.active_streams.load(Ordering::Relaxed),
169        }
170    }
171}
172
173/// Resource status for monitoring endpoints.
174#[derive(Debug, Clone)]
175pub struct ResourceStatus {
176    pub memory_usage_mb: u64,
177    pub memory_peak_mb: u64,
178    pub disk_available_mb: u64,
179    pub degradation_level: String,
180    pub active_generations: u64,
181}
182
183/// Main gRPC service implementation.
184pub struct SynthService {
185    pub state: Arc<ServerState>,
186}
187
188impl SynthService {
189    pub fn new(config: GeneratorConfig) -> Self {
190        Self {
191            state: Arc::new(ServerState::new(config)),
192        }
193    }
194
195    pub fn with_state(state: Arc<ServerState>) -> Self {
196        Self { state }
197    }
198
199    /// Convert a GenerationConfig proto to GeneratorConfig.
200    async fn proto_to_config(
201        &self,
202        proto: Option<GenerationConfig>,
203    ) -> Result<GeneratorConfig, Status> {
204        match proto {
205            Some(p) => {
206                let industry = match p.industry.to_lowercase().as_str() {
207                    "manufacturing" => IndustrySector::Manufacturing,
208                    "retail" => IndustrySector::Retail,
209                    "financial_services" | "financial" => IndustrySector::FinancialServices,
210                    "healthcare" => IndustrySector::Healthcare,
211                    "technology" => IndustrySector::Technology,
212                    "" => IndustrySector::Manufacturing, // empty defaults to manufacturing
213                    other => {
214                        return Err(Status::invalid_argument(format!(
215                            "Unknown industry '{}'. Valid values: manufacturing, retail, financial_services, healthcare, technology",
216                            other
217                        )));
218                    }
219                };
220
221                let complexity = match p.coa_complexity.to_lowercase().as_str() {
222                    "small" => CoAComplexity::Small,
223                    "medium" => CoAComplexity::Medium,
224                    "large" => CoAComplexity::Large,
225                    "" => CoAComplexity::Small, // empty defaults to small
226                    other => {
227                        return Err(Status::invalid_argument(format!(
228                            "Unknown coa_complexity '{}'. Valid values: small, medium, large",
229                            other
230                        )));
231                    }
232                };
233
234                let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
235                    vec![CompanyConfig {
236                        code: "1000".to_string(),
237                        name: "Default Company".to_string(),
238                        currency: "USD".to_string(),
239                        country: "US".to_string(),
240                        annual_transaction_volume: TransactionVolume::TenK,
241                        volume_weight: 1.0,
242                        fiscal_year_variant: "K4".to_string(),
243                    }]
244                } else {
245                    p.companies
246                        .into_iter()
247                        .map(|c| CompanyConfig {
248                            code: c.code,
249                            name: c.name,
250                            currency: c.currency,
251                            country: c.country,
252                            annual_transaction_volume: TransactionVolume::Custom(
253                                c.annual_transaction_volume,
254                            ),
255                            volume_weight: c.volume_weight as f64,
256                            fiscal_year_variant: "K4".to_string(),
257                        })
258                        .collect()
259                };
260
261                let mut config = GeneratorConfig {
262                    global: GlobalConfig {
263                        seed: if p.seed > 0 { Some(p.seed) } else { None },
264                        industry,
265                        start_date: if p.start_date.is_empty() {
266                            "2024-01-01".to_string()
267                        } else {
268                            p.start_date
269                        },
270                        period_months: if p.period_months == 0 {
271                            12
272                        } else {
273                            p.period_months
274                        },
275                        group_currency: "USD".to_string(),
276                        parallel: true,
277                        worker_threads: 0,
278                        memory_limit_mb: 0,
279                    },
280                    companies,
281                    chart_of_accounts: ChartOfAccountsConfig {
282                        complexity,
283                        industry_specific: true,
284                        custom_accounts: None,
285                        min_hierarchy_depth: 2,
286                        max_hierarchy_depth: 5,
287                    },
288                    ..default_generator_config()
289                };
290
291                // Enable fraud if requested
292                if p.fraud_enabled {
293                    config.fraud.enabled = true;
294                    config.fraud.fraud_rate = p.fraud_rate as f64;
295                }
296
297                Ok(config)
298            }
299            None => {
300                // Use current server config
301                let config = self.state.config.read().await;
302                Ok(config.clone())
303            }
304        }
305    }
306
307    /// Convert a JournalEntry to proto format.
308    fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
309        JournalEntryProto {
310            document_id: entry.header.document_id.to_string(),
311            company_code: entry.header.company_code.clone(),
312            fiscal_year: entry.header.fiscal_year as u32,
313            fiscal_period: entry.header.fiscal_period as u32,
314            posting_date: entry.header.posting_date.to_string(),
315            document_date: entry.header.document_date.to_string(),
316            created_at: entry.header.created_at.to_rfc3339(),
317            source: format!("{:?}", entry.header.source),
318            business_process: entry.header.business_process.map(|bp| format!("{:?}", bp)),
319            lines: entry
320                .lines
321                .iter()
322                .map(|line| {
323                    let amount = if line.is_debit() {
324                        line.debit_amount
325                    } else {
326                        line.credit_amount
327                    };
328                    JournalLineProto {
329                        line_number: line.line_number,
330                        account_number: line.gl_account.clone(),
331                        account_name: line.account_description.clone().unwrap_or_default(),
332                        amount: amount.to_string(),
333                        is_debit: line.is_debit(),
334                        cost_center: line.cost_center.clone(),
335                        profit_center: line.profit_center.clone(),
336                        // vendor_id/customer_id/material_id are not on JournalEntryLine;
337                        // populate from auxiliary_account_number when available (French GAAP)
338                        vendor_id: line.auxiliary_account_number.clone(),
339                        customer_id: None,
340                        material_id: None,
341                        text: line.line_text.clone().or_else(|| line.text.clone()),
342                    }
343                })
344                .collect(),
345            is_anomaly: entry.header.is_fraud,
346            anomaly_type: entry.header.fraud_type.map(|ft| format!("{:?}", ft)),
347        }
348    }
349
350    /// Convert current config to proto format.
351    fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
352        GenerationConfig {
353            industry: format!("{:?}", config.global.industry),
354            start_date: config.global.start_date.clone(),
355            period_months: config.global.period_months,
356            seed: config.global.seed.unwrap_or(0),
357            coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
358            companies: config
359                .companies
360                .iter()
361                .map(|c| CompanyConfigProto {
362                    code: c.code.clone(),
363                    name: c.name.clone(),
364                    currency: c.currency.clone(),
365                    country: c.country.clone(),
366                    annual_transaction_volume: c.annual_transaction_volume.count(),
367                    volume_weight: c.volume_weight as f32,
368                })
369                .collect(),
370            fraud_enabled: config.fraud.enabled,
371            fraud_rate: config.fraud.fraud_rate as f32,
372            generate_master_data: config.master_data.vendors.count > 0
373                || config.master_data.customers.count > 0
374                || config.master_data.materials.count > 0,
375            generate_document_flows: config.document_flows.p2p.enabled
376                || config.document_flows.o2c.enabled,
377        }
378    }
379}
380
381#[tonic::async_trait]
382impl synthetic_data_service_server::SyntheticDataService for SynthService {
383    /// Bulk generation - generates all data at once and returns.
384    async fn bulk_generate(
385        &self,
386        request: Request<BulkGenerateRequest>,
387    ) -> Result<Response<BulkGenerateResponse>, Status> {
388        let req = request.into_inner();
389
390        // Validate entry_count bounds
391        const MAX_ENTRY_COUNT: u64 = 1_000_000;
392        if req.entry_count > MAX_ENTRY_COUNT {
393            return Err(Status::invalid_argument(format!(
394                "entry_count ({}) exceeds maximum allowed value ({})",
395                req.entry_count, MAX_ENTRY_COUNT
396            )));
397        }
398
399        // Check resources before starting generation
400        let degradation_level = self.state.check_resources()?;
401        if degradation_level != DegradationLevel::Normal {
402            warn!(
403                "Starting bulk generation under resource pressure (level: {:?})",
404                degradation_level
405            );
406        }
407
408        info!("Bulk generate request: {} entries", req.entry_count);
409
410        let config = self.proto_to_config(req.config).await?;
411        let start_time = Instant::now();
412
413        // Create orchestrator with appropriate phase config
414        let phase_config = PhaseConfig {
415            generate_master_data: req.include_master_data,
416            generate_document_flows: false,
417            generate_journal_entries: true,
418            inject_anomalies: req.inject_anomalies,
419            show_progress: false,
420            ..Default::default()
421        };
422
423        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
424            .map_err(|e| Status::internal(format!("Failed to create orchestrator: {}", e)))?;
425
426        let result = orchestrator
427            .generate()
428            .map_err(|e| Status::internal(format!("Generation failed: {}", e)))?;
429
430        let duration_ms = start_time.elapsed().as_millis() as u64;
431
432        // Update metrics
433        let entries_count = result.journal_entries.len() as u64;
434        self.state
435            .total_entries
436            .fetch_add(entries_count, Ordering::Relaxed);
437
438        let anomaly_count = result.anomaly_labels.labels.len() as u64;
439        self.state
440            .total_anomalies
441            .fetch_add(anomaly_count, Ordering::Relaxed);
442
443        // Convert to proto
444        let journal_entries: Vec<JournalEntryProto> = result
445            .journal_entries
446            .iter()
447            .map(Self::journal_entry_to_proto)
448            .collect();
449
450        let anomaly_labels: Vec<AnomalyLabelProto> = result
451            .anomaly_labels
452            .labels
453            .iter()
454            .map(|a| AnomalyLabelProto {
455                anomaly_id: a.anomaly_id.clone(),
456                document_id: a.document_id.clone(),
457                anomaly_type: format!("{:?}", a.anomaly_type),
458                anomaly_category: a.document_type.clone(),
459                description: a.description.clone(),
460                severity_score: a.severity as f32,
461            })
462            .collect();
463
464        // Compute stats
465        let mut total_debit = rust_decimal::Decimal::ZERO;
466        let mut total_credit = rust_decimal::Decimal::ZERO;
467        let mut total_lines = 0u64;
468        let mut entries_by_company = std::collections::HashMap::new();
469        let mut entries_by_source = std::collections::HashMap::new();
470
471        for entry in &result.journal_entries {
472            *entries_by_company
473                .entry(entry.header.company_code.clone())
474                .or_insert(0u64) += 1;
475            *entries_by_source
476                .entry(format!("{:?}", entry.header.source))
477                .or_insert(0u64) += 1;
478
479            for line in &entry.lines {
480                total_lines += 1;
481                total_debit += line.debit_amount;
482                total_credit += line.credit_amount;
483            }
484        }
485
486        let stats = GenerationStats {
487            total_entries: entries_count,
488            total_lines,
489            total_debit_amount: total_debit.to_string(),
490            total_credit_amount: total_credit.to_string(),
491            anomaly_count,
492            entries_by_company,
493            entries_by_source,
494        };
495
496        info!(
497            "Bulk generation complete: {} entries in {}ms",
498            entries_count, duration_ms
499        );
500
501        Ok(Response::new(BulkGenerateResponse {
502            entries_generated: entries_count,
503            duration_ms,
504            journal_entries,
505            anomaly_labels,
506            stats: Some(stats),
507        }))
508    }
509
510    type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
511
512    /// Streaming generation - continuously generates data events.
513    async fn stream_data(
514        &self,
515        request: Request<StreamDataRequest>,
516    ) -> Result<Response<Self::StreamDataStream>, Status> {
517        let req = request.into_inner();
518
519        // Validate events_per_second bounds
520        const MIN_EVENTS_PER_SECOND: u32 = 1;
521        const MAX_EVENTS_PER_SECOND: u32 = 10_000;
522        if req.events_per_second < MIN_EVENTS_PER_SECOND {
523            return Err(Status::invalid_argument(format!(
524                "events_per_second ({}) must be at least {}",
525                req.events_per_second, MIN_EVENTS_PER_SECOND
526            )));
527        }
528        if req.events_per_second > MAX_EVENTS_PER_SECOND {
529            return Err(Status::invalid_argument(format!(
530                "events_per_second ({}) exceeds maximum allowed value ({})",
531                req.events_per_second, MAX_EVENTS_PER_SECOND
532            )));
533        }
534
535        // Validate max_events if specified
536        const MAX_STREAM_EVENTS: u64 = 10_000_000;
537        if req.max_events > MAX_STREAM_EVENTS {
538            return Err(Status::invalid_argument(format!(
539                "max_events ({}) exceeds maximum allowed value ({})",
540                req.max_events, MAX_STREAM_EVENTS
541            )));
542        }
543
544        // Check resources before starting stream
545        let degradation_level = self.state.check_resources()?;
546        if degradation_level != DegradationLevel::Normal {
547            warn!(
548                "Starting stream under resource pressure (level: {:?})",
549                degradation_level
550            );
551        }
552
553        info!(
554            "Stream data request: {} events/sec, max {}",
555            req.events_per_second, req.max_events
556        );
557
558        let config = self.proto_to_config(req.config).await?;
559        let state = self.state.clone();
560
561        // Increment active streams
562        state.active_streams.fetch_add(1, Ordering::Relaxed);
563
564        // Reset control flags
565        state.stream_paused.store(false, Ordering::Relaxed);
566        state.stream_stopped.store(false, Ordering::Relaxed);
567
568        let (tx, rx) = mpsc::channel(100);
569
570        // Spawn background task to generate and stream data
571        let events_per_second = req.events_per_second;
572        let max_events = req.max_events;
573        let inject_anomalies = req.inject_anomalies;
574
575        tokio::spawn(async move {
576            let phase_config = PhaseConfig {
577                generate_master_data: false,
578                generate_document_flows: false,
579                generate_journal_entries: true,
580                inject_anomalies,
581                show_progress: false,
582                ..Default::default()
583            };
584
585            let mut sequence = 0u64;
586            let delay = if events_per_second > 0 {
587                Duration::from_micros(1_000_000 / events_per_second as u64)
588            } else {
589                Duration::from_millis(1)
590            };
591
592            // Create orchestrator once outside the loop to avoid per-iteration overhead
593            let mut orchestrator =
594                match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
595                    Ok(o) => o,
596                    Err(e) => {
597                        error!("Failed to create orchestrator: {}", e);
598                        return;
599                    }
600                };
601
602            loop {
603                // Check stop flag
604                if state.stream_stopped.load(Ordering::Relaxed) {
605                    info!("Stream stopped by control command");
606                    break;
607                }
608
609                // Check pause flag
610                while state.stream_paused.load(Ordering::Relaxed) {
611                    tokio::time::sleep(Duration::from_millis(100)).await;
612                    if state.stream_stopped.load(Ordering::Relaxed) {
613                        break;
614                    }
615                }
616
617                // Check max events
618                if max_events > 0 && sequence >= max_events {
619                    info!("Stream reached max events: {}", max_events);
620                    break;
621                }
622
623                // Generate a batch
624                let result = match orchestrator.generate() {
625                    Ok(r) => r,
626                    Err(e) => {
627                        error!("Generation failed: {}", e);
628                        break;
629                    }
630                };
631
632                // Stream each entry
633                for entry in result.journal_entries {
634                    sequence += 1;
635                    state.total_stream_events.fetch_add(1, Ordering::Relaxed);
636                    state.total_entries.fetch_add(1, Ordering::Relaxed);
637
638                    let timestamp = Timestamp {
639                        seconds: Utc::now().timestamp(),
640                        nanos: 0,
641                    };
642
643                    let event = DataEvent {
644                        sequence,
645                        timestamp: Some(timestamp),
646                        event: Some(data_event::Event::JournalEntry(
647                            SynthService::journal_entry_to_proto(&entry),
648                        )),
649                    };
650
651                    if tx.send(Ok(event)).await.is_err() {
652                        info!("Stream receiver dropped");
653                        break;
654                    }
655
656                    // Rate limiting
657                    tokio::time::sleep(delay).await;
658
659                    // Check max events
660                    if max_events > 0 && sequence >= max_events {
661                        break;
662                    }
663                }
664            }
665
666            // Decrement active streams
667            state.active_streams.fetch_sub(1, Ordering::Relaxed);
668        });
669
670        Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
671    }
672
673    /// Control commands for streaming.
674    async fn control(
675        &self,
676        request: Request<ControlCommand>,
677    ) -> Result<Response<ControlResponse>, Status> {
678        let cmd = request.into_inner();
679        let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
680
681        info!("Control command: {:?}", action);
682
683        let (success, message, status) = match action {
684            ControlAction::Pause => {
685                self.state.stream_paused.store(true, Ordering::Relaxed);
686                (true, "Stream paused".to_string(), StreamStatus::Paused)
687            }
688            ControlAction::Resume => {
689                self.state.stream_paused.store(false, Ordering::Relaxed);
690                (true, "Stream resumed".to_string(), StreamStatus::Running)
691            }
692            ControlAction::Stop => {
693                self.state.stream_stopped.store(true, Ordering::Relaxed);
694                (true, "Stream stopped".to_string(), StreamStatus::Stopped)
695            }
696            ControlAction::TriggerPattern => {
697                let pattern = cmd.pattern_name.unwrap_or_default();
698                if pattern.is_empty() {
699                    (
700                        false,
701                        "Pattern name is required for TriggerPattern action".to_string(),
702                        StreamStatus::Running,
703                    )
704                } else {
705                    // Valid patterns: year_end_spike, period_end_spike, holiday_cluster,
706                    // fraud_cluster, error_cluster, or any custom pattern name
707                    let valid_patterns = [
708                        "year_end_spike",
709                        "period_end_spike",
710                        "holiday_cluster",
711                        "fraud_cluster",
712                        "error_cluster",
713                        "uniform",
714                    ];
715                    let is_valid = valid_patterns.contains(&pattern.as_str())
716                        || pattern.starts_with("custom:");
717
718                    if is_valid {
719                        // Store the pattern for the stream generator to pick up
720                        if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
721                            *triggered = Some(pattern.clone());
722                        }
723                        info!("Pattern trigger activated: {}", pattern);
724                        (
725                            true,
726                            format!("Pattern '{}' will be applied to upcoming entries", pattern),
727                            StreamStatus::Running,
728                        )
729                    } else {
730                        (
731                            false,
732                            format!(
733                                "Unknown pattern '{}'. Valid patterns: {:?}",
734                                pattern, valid_patterns
735                            ),
736                            StreamStatus::Running,
737                        )
738                    }
739                }
740            }
741            ControlAction::Unspecified => (
742                false,
743                "Unknown control action".to_string(),
744                StreamStatus::Unspecified,
745            ),
746        };
747
748        Ok(Response::new(ControlResponse {
749            success,
750            message,
751            current_status: status as i32,
752        }))
753    }
754
755    /// Get current configuration.
756    async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
757        let config = self.state.config.read().await;
758        let proto_config = Self::config_to_proto(&config);
759
760        Ok(Response::new(ConfigResponse {
761            success: true,
762            message: "Current configuration retrieved".to_string(),
763            current_config: Some(proto_config),
764        }))
765    }
766
767    /// Set configuration.
768    async fn set_config(
769        &self,
770        request: Request<ConfigRequest>,
771    ) -> Result<Response<ConfigResponse>, Status> {
772        let req = request.into_inner();
773
774        if let Some(proto_config) = req.config {
775            let new_config = self.proto_to_config(Some(proto_config)).await?;
776
777            let mut config = self.state.config.write().await;
778            *config = new_config.clone();
779
780            info!("Configuration updated");
781
782            Ok(Response::new(ConfigResponse {
783                success: true,
784                message: "Configuration updated".to_string(),
785                current_config: Some(Self::config_to_proto(&new_config)),
786            }))
787        } else {
788            Err(Status::invalid_argument("No configuration provided"))
789        }
790    }
791
792    /// Get server metrics.
793    async fn get_metrics(
794        &self,
795        _request: Request<()>,
796    ) -> Result<Response<MetricsResponse>, Status> {
797        let uptime = self.state.uptime_seconds();
798        let total_entries = self.state.total_entries.load(Ordering::Relaxed);
799
800        let entries_per_second = if uptime > 0 {
801            total_entries as f64 / uptime as f64
802        } else {
803            0.0
804        };
805
806        Ok(Response::new(MetricsResponse {
807            total_entries_generated: total_entries,
808            total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
809            uptime_seconds: uptime,
810            session_entries: total_entries,
811            session_entries_per_second: entries_per_second,
812            active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
813            total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
814        }))
815    }
816
817    /// Health check.
818    async fn health_check(
819        &self,
820        _request: Request<()>,
821    ) -> Result<Response<HealthResponse>, Status> {
822        Ok(Response::new(HealthResponse {
823            healthy: true,
824            version: env!("CARGO_PKG_VERSION").to_string(),
825            uptime_seconds: self.state.uptime_seconds(),
826        }))
827    }
828}
829
830/// Create a default GeneratorConfig.
831pub fn default_generator_config() -> GeneratorConfig {
832    GeneratorConfig {
833        global: GlobalConfig {
834            seed: None,
835            industry: IndustrySector::Manufacturing,
836            start_date: "2024-01-01".to_string(),
837            period_months: 12,
838            group_currency: "USD".to_string(),
839            parallel: true,
840            worker_threads: 0,
841            memory_limit_mb: 0,
842        },
843        companies: vec![CompanyConfig {
844            code: "1000".to_string(),
845            name: "Default Company".to_string(),
846            currency: "USD".to_string(),
847            country: "US".to_string(),
848            annual_transaction_volume: TransactionVolume::TenK,
849            volume_weight: 1.0,
850            fiscal_year_variant: "K4".to_string(),
851        }],
852        chart_of_accounts: ChartOfAccountsConfig {
853            complexity: CoAComplexity::Small,
854            industry_specific: true,
855            custom_accounts: None,
856            min_hierarchy_depth: 2,
857            max_hierarchy_depth: 5,
858        },
859        transactions: Default::default(),
860        output: OutputConfig::default(),
861        fraud: Default::default(),
862        internal_controls: Default::default(),
863        business_processes: Default::default(),
864        user_personas: Default::default(),
865        templates: Default::default(),
866        approval: Default::default(),
867        departments: Default::default(),
868        master_data: Default::default(),
869        document_flows: Default::default(),
870        intercompany: Default::default(),
871        balance: Default::default(),
872        ocpm: Default::default(),
873        audit: Default::default(),
874        banking: Default::default(),
875        data_quality: Default::default(),
876        scenario: Default::default(),
877        temporal: Default::default(),
878        graph_export: Default::default(),
879        streaming: Default::default(),
880        rate_limit: Default::default(),
881        temporal_attributes: Default::default(),
882        relationships: Default::default(),
883        accounting_standards: Default::default(),
884        audit_standards: Default::default(),
885        distributions: Default::default(),
886        temporal_patterns: Default::default(),
887        vendor_network: Default::default(),
888        customer_segmentation: Default::default(),
889        relationship_strength: Default::default(),
890        cross_process_links: Default::default(),
891        organizational_events: Default::default(),
892        behavioral_drift: Default::default(),
893        market_drift: Default::default(),
894        drift_labeling: Default::default(),
895        anomaly_injection: Default::default(),
896        industry_specific: Default::default(),
897        fingerprint_privacy: Default::default(),
898        quality_gates: Default::default(),
899        compliance: Default::default(),
900        webhooks: Default::default(),
901        llm: Default::default(),
902        diffusion: Default::default(),
903        causal: Default::default(),
904        source_to_pay: Default::default(),
905        financial_reporting: Default::default(),
906        hr: Default::default(),
907        manufacturing: Default::default(),
908        sales_quotes: Default::default(),
909        tax: Default::default(),
910        treasury: Default::default(),
911        project_accounting: Default::default(),
912        esg: Default::default(),
913        country_packs: None,
914    }
915}
916
917#[cfg(test)]
918#[allow(clippy::unwrap_used)]
919mod tests {
920    use super::*;
921    use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;
922
923    // ==========================================================================
924    // Service Creation Tests
925    // ==========================================================================
926
927    #[tokio::test]
928    async fn test_service_creation() {
929        let config = default_generator_config();
930        let service = SynthService::new(config);
931        // Service should start with zero or very small uptime (test completes quickly)
932        assert!(service.state.uptime_seconds() < 60);
933    }
934
935    #[tokio::test]
936    async fn test_service_with_state() {
937        let config = default_generator_config();
938        let state = Arc::new(ServerState::new(config));
939        let service = SynthService::with_state(Arc::clone(&state));
940
941        // Should share the same state
942        state.total_entries.store(100, Ordering::Relaxed);
943        assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
944    }
945
946    // ==========================================================================
947    // Health Check Tests
948    // ==========================================================================
949
950    #[tokio::test]
951    async fn test_health_check() {
952        let config = default_generator_config();
953        let service = SynthService::new(config);
954
955        let response = service.health_check(Request::new(())).await.unwrap();
956        let health = response.into_inner();
957
958        assert!(health.healthy);
959        assert!(!health.version.is_empty());
960    }
961
962    #[tokio::test]
963    async fn test_health_check_returns_version() {
964        let config = default_generator_config();
965        let service = SynthService::new(config);
966
967        let response = service.health_check(Request::new(())).await.unwrap();
968        let health = response.into_inner();
969
970        assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
971    }
972
973    // ==========================================================================
974    // Configuration Tests
975    // ==========================================================================
976
977    #[tokio::test]
978    async fn test_get_config() {
979        let config = default_generator_config();
980        let service = SynthService::new(config);
981
982        let response = service.get_config(Request::new(())).await.unwrap();
983        let config_response = response.into_inner();
984
985        assert!(config_response.success);
986        assert!(config_response.current_config.is_some());
987    }
988
989    #[tokio::test]
990    async fn test_get_config_returns_industry() {
991        let config = default_generator_config();
992        let service = SynthService::new(config);
993
994        let response = service.get_config(Request::new(())).await.unwrap();
995        let config_response = response.into_inner();
996        let current = config_response.current_config.unwrap();
997
998        assert_eq!(current.industry, "Manufacturing");
999    }
1000
1001    #[tokio::test]
1002    async fn test_set_config() {
1003        let config = default_generator_config();
1004        let service = SynthService::new(config);
1005
1006        let new_config = GenerationConfig {
1007            industry: "retail".to_string(),
1008            start_date: "2024-06-01".to_string(),
1009            period_months: 6,
1010            seed: 42,
1011            coa_complexity: "medium".to_string(),
1012            companies: vec![],
1013            fraud_enabled: true,
1014            fraud_rate: 0.05,
1015            generate_master_data: false,
1016            generate_document_flows: false,
1017        };
1018
1019        let response = service
1020            .set_config(Request::new(ConfigRequest {
1021                config: Some(new_config),
1022            }))
1023            .await
1024            .unwrap();
1025        let config_response = response.into_inner();
1026
1027        assert!(config_response.success);
1028    }
1029
1030    #[tokio::test]
1031    async fn test_set_config_without_config_fails() {
1032        let config = default_generator_config();
1033        let service = SynthService::new(config);
1034
1035        let result = service
1036            .set_config(Request::new(ConfigRequest { config: None }))
1037            .await;
1038
1039        assert!(result.is_err());
1040    }
1041
1042    // ==========================================================================
1043    // Metrics Tests
1044    // ==========================================================================
1045
1046    #[tokio::test]
1047    async fn test_get_metrics_initial() {
1048        let config = default_generator_config();
1049        let service = SynthService::new(config);
1050
1051        let response = service.get_metrics(Request::new(())).await.unwrap();
1052        let metrics = response.into_inner();
1053
1054        assert_eq!(metrics.total_entries_generated, 0);
1055        assert_eq!(metrics.total_anomalies_injected, 0);
1056        assert_eq!(metrics.active_streams, 0);
1057    }
1058
1059    #[tokio::test]
1060    async fn test_get_metrics_after_updates() {
1061        let config = default_generator_config();
1062        let service = SynthService::new(config);
1063
1064        // Simulate some activity
1065        service.state.total_entries.store(1000, Ordering::Relaxed);
1066        service.state.total_anomalies.store(20, Ordering::Relaxed);
1067        service.state.active_streams.store(2, Ordering::Relaxed);
1068
1069        let response = service.get_metrics(Request::new(())).await.unwrap();
1070        let metrics = response.into_inner();
1071
1072        assert_eq!(metrics.total_entries_generated, 1000);
1073        assert_eq!(metrics.total_anomalies_injected, 20);
1074        assert_eq!(metrics.active_streams, 2);
1075    }
1076
1077    // ==========================================================================
1078    // Control Tests
1079    // ==========================================================================
1080
1081    #[tokio::test]
1082    async fn test_control_pause() {
1083        let config = default_generator_config();
1084        let service = SynthService::new(config);
1085
1086        let response = service
1087            .control(Request::new(ControlCommand {
1088                action: ControlAction::Pause as i32,
1089                pattern_name: None,
1090            }))
1091            .await
1092            .unwrap();
1093        let control_response = response.into_inner();
1094
1095        assert!(control_response.success);
1096        assert!(service.state.stream_paused.load(Ordering::Relaxed));
1097    }
1098
1099    #[tokio::test]
1100    async fn test_control_resume() {
1101        let config = default_generator_config();
1102        let service = SynthService::new(config);
1103
1104        // First pause
1105        service.state.stream_paused.store(true, Ordering::Relaxed);
1106
1107        let response = service
1108            .control(Request::new(ControlCommand {
1109                action: ControlAction::Resume as i32,
1110                pattern_name: None,
1111            }))
1112            .await
1113            .unwrap();
1114        let control_response = response.into_inner();
1115
1116        assert!(control_response.success);
1117        assert!(!service.state.stream_paused.load(Ordering::Relaxed));
1118    }
1119
1120    #[tokio::test]
1121    async fn test_control_stop() {
1122        let config = default_generator_config();
1123        let service = SynthService::new(config);
1124
1125        let response = service
1126            .control(Request::new(ControlCommand {
1127                action: ControlAction::Stop as i32,
1128                pattern_name: None,
1129            }))
1130            .await
1131            .unwrap();
1132        let control_response = response.into_inner();
1133
1134        assert!(control_response.success);
1135        assert!(service.state.stream_stopped.load(Ordering::Relaxed));
1136    }
1137
1138    // ==========================================================================
1139    // ServerState Tests
1140    // ==========================================================================
1141
1142    #[test]
1143    fn test_server_state_creation() {
1144        let config = default_generator_config();
1145        let state = ServerState::new(config);
1146
1147        assert_eq!(state.total_entries.load(Ordering::Relaxed), 0);
1148        assert_eq!(state.total_anomalies.load(Ordering::Relaxed), 0);
1149        assert_eq!(state.active_streams.load(Ordering::Relaxed), 0);
1150        assert!(!state.stream_paused.load(Ordering::Relaxed));
1151        assert!(!state.stream_stopped.load(Ordering::Relaxed));
1152    }
1153
1154    #[test]
1155    fn test_server_state_uptime() {
1156        let config = default_generator_config();
1157        let state = ServerState::new(config);
1158
1159        // Uptime should be small since we just created the state
1160        assert!(state.uptime_seconds() < 60);
1161    }
1162
1163    // ==========================================================================
1164    // Proto Conversion Tests
1165    // ==========================================================================
1166
1167    #[test]
1168    fn test_default_generator_config() {
1169        let config = default_generator_config();
1170
1171        assert_eq!(config.global.industry, IndustrySector::Manufacturing);
1172        assert_eq!(config.global.period_months, 12);
1173        assert!(!config.companies.is_empty());
1174        assert_eq!(config.companies[0].code, "1000");
1175    }
1176
1177    #[test]
1178    fn test_config_to_proto() {
1179        let config = default_generator_config();
1180        let proto = SynthService::config_to_proto(&config);
1181
1182        assert_eq!(proto.industry, "Manufacturing");
1183        assert_eq!(proto.period_months, 12);
1184        assert!(!proto.companies.is_empty());
1185    }
1186
1187    #[tokio::test]
1188    async fn test_proto_to_config_with_none() {
1189        let config = default_generator_config();
1190        let service = SynthService::new(config.clone());
1191
1192        let result = service.proto_to_config(None).await.unwrap();
1193
1194        // Should return current config when None is passed
1195        assert_eq!(result.global.industry, config.global.industry);
1196    }
1197
1198    #[tokio::test]
1199    async fn test_proto_to_config_with_retail() {
1200        let config = default_generator_config();
1201        let service = SynthService::new(config);
1202
1203        let proto = GenerationConfig {
1204            industry: "retail".to_string(),
1205            start_date: "2024-01-01".to_string(),
1206            period_months: 6,
1207            seed: 0,
1208            coa_complexity: "large".to_string(),
1209            companies: vec![],
1210            fraud_enabled: false,
1211            fraud_rate: 0.0,
1212            generate_master_data: false,
1213            generate_document_flows: false,
1214        };
1215
1216        let result = service.proto_to_config(Some(proto)).await.unwrap();
1217
1218        assert_eq!(result.global.industry, IndustrySector::Retail);
1219        assert_eq!(result.chart_of_accounts.complexity, CoAComplexity::Large);
1220    }
1221
1222    #[tokio::test]
1223    async fn test_proto_to_config_with_healthcare() {
1224        let config = default_generator_config();
1225        let service = SynthService::new(config);
1226
1227        let proto = GenerationConfig {
1228            industry: "healthcare".to_string(),
1229            start_date: "2024-01-01".to_string(),
1230            period_months: 12,
1231            seed: 42,
1232            coa_complexity: "small".to_string(),
1233            companies: vec![],
1234            fraud_enabled: true,
1235            fraud_rate: 0.1,
1236            generate_master_data: true,
1237            generate_document_flows: true,
1238        };
1239
1240        let result = service.proto_to_config(Some(proto)).await.unwrap();
1241
1242        assert_eq!(result.global.industry, IndustrySector::Healthcare);
1243        assert_eq!(result.global.seed, Some(42));
1244        assert!(result.fraud.enabled);
1245        assert!((result.fraud.fraud_rate - 0.1).abs() < 0.001);
1246    }
1247
1248    #[tokio::test]
1249    async fn test_proto_to_config_with_companies() {
1250        let config = default_generator_config();
1251        let service = SynthService::new(config);
1252
1253        let proto = GenerationConfig {
1254            industry: "technology".to_string(),
1255            start_date: "2024-01-01".to_string(),
1256            period_months: 12,
1257            seed: 0,
1258            coa_complexity: "medium".to_string(),
1259            companies: vec![
1260                CompanyConfigProto {
1261                    code: "1000".to_string(),
1262                    name: "Parent Corp".to_string(),
1263                    currency: "USD".to_string(),
1264                    country: "US".to_string(),
1265                    annual_transaction_volume: 100000,
1266                    volume_weight: 1.0,
1267                },
1268                CompanyConfigProto {
1269                    code: "2000".to_string(),
1270                    name: "EU Sub".to_string(),
1271                    currency: "EUR".to_string(),
1272                    country: "DE".to_string(),
1273                    annual_transaction_volume: 50000,
1274                    volume_weight: 0.5,
1275                },
1276            ],
1277            fraud_enabled: false,
1278            fraud_rate: 0.0,
1279            generate_master_data: false,
1280            generate_document_flows: false,
1281        };
1282
1283        let result = service.proto_to_config(Some(proto)).await.unwrap();
1284
1285        assert_eq!(result.companies.len(), 2);
1286        assert_eq!(result.companies[0].code, "1000");
1287        assert_eq!(result.companies[1].currency, "EUR");
1288    }
1289
1290    // ==========================================================================
1291    // Input Validation Tests
1292    // ==========================================================================
1293
1294    #[tokio::test]
1295    async fn test_bulk_generate_entry_count_validation() {
1296        let config = default_generator_config();
1297        let service = SynthService::new(config);
1298
1299        let request = BulkGenerateRequest {
1300            entry_count: 2_000_000, // Exceeds MAX_ENTRY_COUNT
1301            include_master_data: false,
1302            inject_anomalies: false,
1303            output_format: 0,
1304            config: None,
1305        };
1306
1307        let result = service.bulk_generate(Request::new(request)).await;
1308        assert!(result.is_err());
1309        let err = result.err().unwrap();
1310        assert!(err.message().contains("exceeds maximum allowed value"));
1311    }
1312
1313    #[tokio::test]
1314    async fn test_stream_data_events_per_second_too_low() {
1315        let config = default_generator_config();
1316        let service = SynthService::new(config);
1317
1318        let request = StreamDataRequest {
1319            events_per_second: 0, // Below MIN_EVENTS_PER_SECOND
1320            max_events: 100,
1321            inject_anomalies: false,
1322            anomaly_rate: 0.0,
1323            config: None,
1324        };
1325
1326        let result = service.stream_data(Request::new(request)).await;
1327        assert!(result.is_err());
1328        let err = result.err().unwrap();
1329        assert!(err.message().contains("must be at least"));
1330    }
1331
1332    #[tokio::test]
1333    async fn test_stream_data_events_per_second_too_high() {
1334        let config = default_generator_config();
1335        let service = SynthService::new(config);
1336
1337        let request = StreamDataRequest {
1338            events_per_second: 20_000, // Exceeds MAX_EVENTS_PER_SECOND
1339            max_events: 100,
1340            inject_anomalies: false,
1341            anomaly_rate: 0.0,
1342            config: None,
1343        };
1344
1345        let result = service.stream_data(Request::new(request)).await;
1346        assert!(result.is_err());
1347        let err = result.err().unwrap();
1348        assert!(err.message().contains("exceeds maximum allowed value"));
1349    }
1350
1351    #[tokio::test]
1352    async fn test_stream_data_max_events_too_high() {
1353        let config = default_generator_config();
1354        let service = SynthService::new(config);
1355
1356        let request = StreamDataRequest {
1357            events_per_second: 100,
1358            max_events: 100_000_000, // Exceeds MAX_STREAM_EVENTS
1359            inject_anomalies: false,
1360            anomaly_rate: 0.0,
1361            config: None,
1362        };
1363
1364        let result = service.stream_data(Request::new(request)).await;
1365        assert!(result.is_err());
1366        let err = result.err().unwrap();
1367        assert!(err.message().contains("max_events"));
1368    }
1369
1370    #[tokio::test]
1371    async fn test_stream_data_valid_request() {
1372        let config = default_generator_config();
1373        let service = SynthService::new(config);
1374
1375        let request = StreamDataRequest {
1376            events_per_second: 10,
1377            max_events: 5,
1378            inject_anomalies: false,
1379            anomaly_rate: 0.0,
1380            config: None,
1381        };
1382
1383        // This should succeed - we can't easily test the stream output,
1384        // but we verify the request is accepted
1385        let result = service.stream_data(Request::new(request)).await;
1386        assert!(result.is_ok());
1387    }
1388}