Skip to main content

datasynth_server/grpc/
service.rs

1//! gRPC service implementation.
2
3use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17    ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18    TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
26/// Server state for tracking metrics and configuration.
27pub struct ServerState {
28    /// Current configuration
29    pub config: RwLock<GeneratorConfig>,
30    /// Configuration source for reloading
31    pub config_source: RwLock<crate::config_loader::ConfigSource>,
32    /// Server start time
33    start_time: Instant,
34    /// Total entries generated
35    pub total_entries: AtomicU64,
36    /// Total anomalies injected
37    pub total_anomalies: AtomicU64,
38    /// Active streams count
39    pub active_streams: AtomicU64,
40    /// Total stream events
41    pub total_stream_events: AtomicU64,
42    /// Stream control flag
43    pub stream_paused: AtomicBool,
44    /// Stream stop flag
45    pub stream_stopped: AtomicBool,
46    /// Stream events per second (0 = unlimited)
47    pub stream_events_per_second: AtomicU64,
48    /// Stream maximum events (0 = unlimited)
49    pub stream_max_events: AtomicU64,
50    /// Stream anomaly injection flag
51    pub stream_inject_anomalies: AtomicBool,
52    /// Triggered pattern name (if any) - will be applied to next generated entries
53    pub triggered_pattern: RwLock<Option<String>>,
54    /// Resource guard for memory and disk monitoring
55    pub resource_guard: Arc<ResourceGuard>,
56    /// Maximum concurrent generations allowed
57    max_concurrent_generations: AtomicU64,
58}
59
60impl ServerState {
61    pub fn new(config: GeneratorConfig) -> Self {
62        // Build resource guard from config
63        let memory_limit = config.global.memory_limit_mb;
64        let resource_guard = if memory_limit > 0 {
65            ResourceGuardBuilder::new()
66                .memory_limit(memory_limit)
67                .conservative()
68                .build()
69        } else {
70            // Default: 2GB limit for server
71            ResourceGuardBuilder::new()
72                .memory_limit(2048)
73                .conservative()
74                .build()
75        };
76
77        Self {
78            config: RwLock::new(config),
79            config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
80            start_time: Instant::now(),
81            total_entries: AtomicU64::new(0),
82            total_anomalies: AtomicU64::new(0),
83            active_streams: AtomicU64::new(0),
84            total_stream_events: AtomicU64::new(0),
85            stream_paused: AtomicBool::new(false),
86            stream_stopped: AtomicBool::new(false),
87            stream_events_per_second: AtomicU64::new(0),
88            stream_max_events: AtomicU64::new(0),
89            stream_inject_anomalies: AtomicBool::new(false),
90            triggered_pattern: RwLock::new(None),
91            resource_guard: Arc::new(resource_guard),
92            max_concurrent_generations: AtomicU64::new(4),
93        }
94    }
95
96    /// Create with custom resource limits.
97    pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
98        let resource_guard = ResourceGuardBuilder::new()
99            .memory_limit(memory_limit_mb)
100            .conservative()
101            .build();
102
103        Self {
104            config: RwLock::new(config),
105            config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
106            start_time: Instant::now(),
107            total_entries: AtomicU64::new(0),
108            total_anomalies: AtomicU64::new(0),
109            active_streams: AtomicU64::new(0),
110            total_stream_events: AtomicU64::new(0),
111            stream_paused: AtomicBool::new(false),
112            stream_stopped: AtomicBool::new(false),
113            stream_events_per_second: AtomicU64::new(0),
114            stream_max_events: AtomicU64::new(0),
115            stream_inject_anomalies: AtomicBool::new(false),
116            triggered_pattern: RwLock::new(None),
117            resource_guard: Arc::new(resource_guard),
118            max_concurrent_generations: AtomicU64::new(4),
119        }
120    }
121
122    pub fn uptime_seconds(&self) -> u64 {
123        self.start_time.elapsed().as_secs()
124    }
125
126    /// Check if resources are available for a new generation.
127    #[allow(clippy::result_large_err)] // tonic::Status is the idiomatic error type for gRPC
128    pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
129        // Check if too many concurrent generations
130        let active = self.active_streams.load(Ordering::Relaxed);
131        let max = self.max_concurrent_generations.load(Ordering::Relaxed);
132        if active >= max {
133            return Err(Status::resource_exhausted(format!(
134                "Too many concurrent generations ({}/{}). Try again later.",
135                active, max
136            )));
137        }
138
139        // Check memory and other resources
140        match self.resource_guard.check() {
141            Ok(level) => {
142                if level == DegradationLevel::Emergency {
143                    Err(Status::resource_exhausted(
144                        "Server resources critically low. Generation not possible.",
145                    ))
146                } else if level == DegradationLevel::Minimal {
147                    warn!("Resources constrained, generation may be limited");
148                    Ok(level)
149                } else {
150                    Ok(level)
151                }
152            }
153            Err(e) => Err(Status::resource_exhausted(format!(
154                "Resource check failed: {}",
155                e
156            ))),
157        }
158    }
159
160    /// Get current resource status for monitoring.
161    pub fn resource_status(&self) -> ResourceStatus {
162        let stats = self.resource_guard.stats();
163        ResourceStatus {
164            memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
165            memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
166            disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
167            degradation_level: stats.degradation_level.name().to_string(),
168            active_generations: self.active_streams.load(Ordering::Relaxed),
169        }
170    }
171}
172
/// Snapshot of resource usage exposed to monitoring endpoints.
#[derive(Debug, Clone)]
pub struct ResourceStatus {
    /// Resident memory in megabytes.
    pub memory_usage_mb: u64,
    /// Peak resident memory in megabytes.
    pub memory_peak_mb: u64,
    /// Available disk space in megabytes.
    pub disk_available_mb: u64,
    /// Name of the current degradation level.
    pub degradation_level: String,
    /// Number of generations active when the snapshot was taken.
    pub active_generations: u64,
}
182
183/// Main gRPC service implementation.
184pub struct SynthService {
185    pub state: Arc<ServerState>,
186}
187
188impl SynthService {
189    pub fn new(config: GeneratorConfig) -> Self {
190        Self {
191            state: Arc::new(ServerState::new(config)),
192        }
193    }
194
195    pub fn with_state(state: Arc<ServerState>) -> Self {
196        Self { state }
197    }
198
199    /// Convert a GenerationConfig proto to GeneratorConfig.
200    async fn proto_to_config(
201        &self,
202        proto: Option<GenerationConfig>,
203    ) -> Result<GeneratorConfig, Status> {
204        match proto {
205            Some(p) => {
206                let industry = match p.industry.to_lowercase().as_str() {
207                    "manufacturing" => IndustrySector::Manufacturing,
208                    "retail" => IndustrySector::Retail,
209                    "financial_services" | "financial" => IndustrySector::FinancialServices,
210                    "healthcare" => IndustrySector::Healthcare,
211                    "technology" => IndustrySector::Technology,
212                    "" => IndustrySector::Manufacturing, // empty defaults to manufacturing
213                    other => {
214                        return Err(Status::invalid_argument(format!(
215                            "Unknown industry '{}'. Valid values: manufacturing, retail, financial_services, healthcare, technology",
216                            other
217                        )));
218                    }
219                };
220
221                let complexity = match p.coa_complexity.to_lowercase().as_str() {
222                    "small" => CoAComplexity::Small,
223                    "medium" => CoAComplexity::Medium,
224                    "large" => CoAComplexity::Large,
225                    "" => CoAComplexity::Small, // empty defaults to small
226                    other => {
227                        return Err(Status::invalid_argument(format!(
228                            "Unknown coa_complexity '{}'. Valid values: small, medium, large",
229                            other
230                        )));
231                    }
232                };
233
234                let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
235                    vec![CompanyConfig {
236                        code: "1000".to_string(),
237                        name: "Default Company".to_string(),
238                        currency: "USD".to_string(),
239                        country: "US".to_string(),
240                        annual_transaction_volume: TransactionVolume::TenK,
241                        volume_weight: 1.0,
242                        fiscal_year_variant: "K4".to_string(),
243                    }]
244                } else {
245                    p.companies
246                        .into_iter()
247                        .map(|c| CompanyConfig {
248                            code: c.code,
249                            name: c.name,
250                            currency: c.currency,
251                            country: c.country,
252                            annual_transaction_volume: TransactionVolume::Custom(
253                                c.annual_transaction_volume,
254                            ),
255                            volume_weight: c.volume_weight as f64,
256                            fiscal_year_variant: "K4".to_string(),
257                        })
258                        .collect()
259                };
260
261                let mut config = GeneratorConfig {
262                    global: GlobalConfig {
263                        seed: if p.seed > 0 { Some(p.seed) } else { None },
264                        industry,
265                        start_date: if p.start_date.is_empty() {
266                            "2024-01-01".to_string()
267                        } else {
268                            p.start_date
269                        },
270                        period_months: if p.period_months == 0 {
271                            12
272                        } else {
273                            p.period_months
274                        },
275                        group_currency: "USD".to_string(),
276                        parallel: true,
277                        worker_threads: 0,
278                        memory_limit_mb: 0,
279                        fiscal_year_months: None,
280                    },
281                    companies,
282                    chart_of_accounts: ChartOfAccountsConfig {
283                        complexity,
284                        industry_specific: true,
285                        custom_accounts: None,
286                        min_hierarchy_depth: 2,
287                        max_hierarchy_depth: 5,
288                    },
289                    ..default_generator_config()
290                };
291
292                // Enable fraud if requested
293                if p.fraud_enabled {
294                    config.fraud.enabled = true;
295                    config.fraud.fraud_rate = p.fraud_rate as f64;
296                }
297
298                Ok(config)
299            }
300            None => {
301                // Use current server config
302                let config = self.state.config.read().await;
303                Ok(config.clone())
304            }
305        }
306    }
307
308    /// Convert a JournalEntry to proto format.
309    fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
310        JournalEntryProto {
311            document_id: entry.header.document_id.to_string(),
312            company_code: entry.header.company_code.clone(),
313            fiscal_year: entry.header.fiscal_year as u32,
314            fiscal_period: entry.header.fiscal_period as u32,
315            posting_date: entry.header.posting_date.to_string(),
316            document_date: entry.header.document_date.to_string(),
317            created_at: entry.header.created_at.to_rfc3339(),
318            source: format!("{:?}", entry.header.source),
319            business_process: entry.header.business_process.map(|bp| format!("{:?}", bp)),
320            lines: entry
321                .lines
322                .iter()
323                .map(|line| {
324                    let amount = if line.is_debit() {
325                        line.debit_amount
326                    } else {
327                        line.credit_amount
328                    };
329                    JournalLineProto {
330                        line_number: line.line_number,
331                        account_number: line.gl_account.clone(),
332                        account_name: line.account_description.clone().unwrap_or_default(),
333                        amount: amount.to_string(),
334                        is_debit: line.is_debit(),
335                        cost_center: line.cost_center.clone(),
336                        profit_center: line.profit_center.clone(),
337                        // vendor_id/customer_id/material_id are not on JournalEntryLine;
338                        // populate from auxiliary_account_number when available (French GAAP)
339                        vendor_id: line.auxiliary_account_number.clone(),
340                        customer_id: None,
341                        material_id: None,
342                        text: line.line_text.clone().or_else(|| line.text.clone()),
343                    }
344                })
345                .collect(),
346            is_anomaly: entry.header.is_fraud,
347            anomaly_type: entry.header.fraud_type.map(|ft| format!("{:?}", ft)),
348        }
349    }
350
351    /// Convert current config to proto format.
352    fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
353        GenerationConfig {
354            industry: format!("{:?}", config.global.industry),
355            start_date: config.global.start_date.clone(),
356            period_months: config.global.period_months,
357            seed: config.global.seed.unwrap_or(0),
358            coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
359            companies: config
360                .companies
361                .iter()
362                .map(|c| CompanyConfigProto {
363                    code: c.code.clone(),
364                    name: c.name.clone(),
365                    currency: c.currency.clone(),
366                    country: c.country.clone(),
367                    annual_transaction_volume: c.annual_transaction_volume.count(),
368                    volume_weight: c.volume_weight as f32,
369                })
370                .collect(),
371            fraud_enabled: config.fraud.enabled,
372            fraud_rate: config.fraud.fraud_rate as f32,
373            generate_master_data: config.master_data.vendors.count > 0
374                || config.master_data.customers.count > 0
375                || config.master_data.materials.count > 0,
376            generate_document_flows: config.document_flows.p2p.enabled
377                || config.document_flows.o2c.enabled,
378        }
379    }
380}
381
382#[tonic::async_trait]
383impl synthetic_data_service_server::SyntheticDataService for SynthService {
384    /// Bulk generation - generates all data at once and returns.
385    async fn bulk_generate(
386        &self,
387        request: Request<BulkGenerateRequest>,
388    ) -> Result<Response<BulkGenerateResponse>, Status> {
389        let req = request.into_inner();
390
391        // Validate entry_count bounds
392        const MAX_ENTRY_COUNT: u64 = 1_000_000;
393        if req.entry_count > MAX_ENTRY_COUNT {
394            return Err(Status::invalid_argument(format!(
395                "entry_count ({}) exceeds maximum allowed value ({})",
396                req.entry_count, MAX_ENTRY_COUNT
397            )));
398        }
399
400        // Check resources before starting generation
401        let degradation_level = self.state.check_resources()?;
402        if degradation_level != DegradationLevel::Normal {
403            warn!(
404                "Starting bulk generation under resource pressure (level: {:?})",
405                degradation_level
406            );
407        }
408
409        info!("Bulk generate request: {} entries", req.entry_count);
410
411        let config = self.proto_to_config(req.config).await?;
412        let start_time = Instant::now();
413
414        // Create orchestrator with appropriate phase config
415        let phase_config = PhaseConfig {
416            generate_master_data: req.include_master_data,
417            generate_document_flows: false,
418            generate_journal_entries: true,
419            inject_anomalies: req.inject_anomalies,
420            show_progress: false,
421            ..Default::default()
422        };
423
424        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
425            .map_err(|e| Status::internal(format!("Failed to create orchestrator: {}", e)))?;
426
427        let result = orchestrator
428            .generate()
429            .map_err(|e| Status::internal(format!("Generation failed: {}", e)))?;
430
431        let duration_ms = start_time.elapsed().as_millis() as u64;
432
433        // Update metrics
434        let entries_count = result.journal_entries.len() as u64;
435        self.state
436            .total_entries
437            .fetch_add(entries_count, Ordering::Relaxed);
438
439        let anomaly_count = result.anomaly_labels.labels.len() as u64;
440        self.state
441            .total_anomalies
442            .fetch_add(anomaly_count, Ordering::Relaxed);
443
444        // Convert to proto
445        let journal_entries: Vec<JournalEntryProto> = result
446            .journal_entries
447            .iter()
448            .map(Self::journal_entry_to_proto)
449            .collect();
450
451        let anomaly_labels: Vec<AnomalyLabelProto> = result
452            .anomaly_labels
453            .labels
454            .iter()
455            .map(|a| AnomalyLabelProto {
456                anomaly_id: a.anomaly_id.clone(),
457                document_id: a.document_id.clone(),
458                anomaly_type: format!("{:?}", a.anomaly_type),
459                anomaly_category: a.document_type.clone(),
460                description: a.description.clone(),
461                severity_score: a.severity as f32,
462            })
463            .collect();
464
465        // Compute stats
466        let mut total_debit = rust_decimal::Decimal::ZERO;
467        let mut total_credit = rust_decimal::Decimal::ZERO;
468        let mut total_lines = 0u64;
469        let mut entries_by_company = std::collections::HashMap::new();
470        let mut entries_by_source = std::collections::HashMap::new();
471
472        for entry in &result.journal_entries {
473            *entries_by_company
474                .entry(entry.header.company_code.clone())
475                .or_insert(0u64) += 1;
476            *entries_by_source
477                .entry(format!("{:?}", entry.header.source))
478                .or_insert(0u64) += 1;
479
480            for line in &entry.lines {
481                total_lines += 1;
482                total_debit += line.debit_amount;
483                total_credit += line.credit_amount;
484            }
485        }
486
487        let stats = GenerationStats {
488            total_entries: entries_count,
489            total_lines,
490            total_debit_amount: total_debit.to_string(),
491            total_credit_amount: total_credit.to_string(),
492            anomaly_count,
493            entries_by_company,
494            entries_by_source,
495        };
496
497        info!(
498            "Bulk generation complete: {} entries in {}ms",
499            entries_count, duration_ms
500        );
501
502        Ok(Response::new(BulkGenerateResponse {
503            entries_generated: entries_count,
504            duration_ms,
505            journal_entries,
506            anomaly_labels,
507            stats: Some(stats),
508        }))
509    }
510
511    type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
512
513    /// Streaming generation - continuously generates data events.
514    async fn stream_data(
515        &self,
516        request: Request<StreamDataRequest>,
517    ) -> Result<Response<Self::StreamDataStream>, Status> {
518        let req = request.into_inner();
519
520        // Validate events_per_second bounds
521        const MIN_EVENTS_PER_SECOND: u32 = 1;
522        const MAX_EVENTS_PER_SECOND: u32 = 10_000;
523        if req.events_per_second < MIN_EVENTS_PER_SECOND {
524            return Err(Status::invalid_argument(format!(
525                "events_per_second ({}) must be at least {}",
526                req.events_per_second, MIN_EVENTS_PER_SECOND
527            )));
528        }
529        if req.events_per_second > MAX_EVENTS_PER_SECOND {
530            return Err(Status::invalid_argument(format!(
531                "events_per_second ({}) exceeds maximum allowed value ({})",
532                req.events_per_second, MAX_EVENTS_PER_SECOND
533            )));
534        }
535
536        // Validate max_events if specified
537        const MAX_STREAM_EVENTS: u64 = 10_000_000;
538        if req.max_events > MAX_STREAM_EVENTS {
539            return Err(Status::invalid_argument(format!(
540                "max_events ({}) exceeds maximum allowed value ({})",
541                req.max_events, MAX_STREAM_EVENTS
542            )));
543        }
544
545        // Check resources before starting stream
546        let degradation_level = self.state.check_resources()?;
547        if degradation_level != DegradationLevel::Normal {
548            warn!(
549                "Starting stream under resource pressure (level: {:?})",
550                degradation_level
551            );
552        }
553
554        info!(
555            "Stream data request: {} events/sec, max {}",
556            req.events_per_second, req.max_events
557        );
558
559        let config = self.proto_to_config(req.config).await?;
560        let state = self.state.clone();
561
562        // Increment active streams
563        state.active_streams.fetch_add(1, Ordering::Relaxed);
564
565        // Reset control flags
566        state.stream_paused.store(false, Ordering::Relaxed);
567        state.stream_stopped.store(false, Ordering::Relaxed);
568
569        let (tx, rx) = mpsc::channel(100);
570
571        // Spawn background task to generate and stream data
572        let events_per_second = req.events_per_second;
573        let max_events = req.max_events;
574        let inject_anomalies = req.inject_anomalies;
575
576        tokio::spawn(async move {
577            let phase_config = PhaseConfig {
578                generate_master_data: false,
579                generate_document_flows: false,
580                generate_journal_entries: true,
581                inject_anomalies,
582                show_progress: false,
583                ..Default::default()
584            };
585
586            let mut sequence = 0u64;
587            let delay = if events_per_second > 0 {
588                Duration::from_micros(1_000_000 / events_per_second as u64)
589            } else {
590                Duration::from_millis(1)
591            };
592
593            // Create orchestrator once outside the loop to avoid per-iteration overhead
594            let mut orchestrator =
595                match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
596                    Ok(o) => o,
597                    Err(e) => {
598                        error!("Failed to create orchestrator: {}", e);
599                        return;
600                    }
601                };
602
603            loop {
604                // Check stop flag
605                if state.stream_stopped.load(Ordering::Relaxed) {
606                    info!("Stream stopped by control command");
607                    break;
608                }
609
610                // Check pause flag
611                while state.stream_paused.load(Ordering::Relaxed) {
612                    tokio::time::sleep(Duration::from_millis(100)).await;
613                    if state.stream_stopped.load(Ordering::Relaxed) {
614                        break;
615                    }
616                }
617
618                // Check max events
619                if max_events > 0 && sequence >= max_events {
620                    info!("Stream reached max events: {}", max_events);
621                    break;
622                }
623
624                // Generate a batch
625                let result = match orchestrator.generate() {
626                    Ok(r) => r,
627                    Err(e) => {
628                        error!("Generation failed: {}", e);
629                        break;
630                    }
631                };
632
633                // Stream each entry
634                for entry in result.journal_entries {
635                    sequence += 1;
636                    state.total_stream_events.fetch_add(1, Ordering::Relaxed);
637                    state.total_entries.fetch_add(1, Ordering::Relaxed);
638
639                    let timestamp = Timestamp {
640                        seconds: Utc::now().timestamp(),
641                        nanos: 0,
642                    };
643
644                    let event = DataEvent {
645                        sequence,
646                        timestamp: Some(timestamp),
647                        event: Some(data_event::Event::JournalEntry(
648                            SynthService::journal_entry_to_proto(&entry),
649                        )),
650                    };
651
652                    if tx.send(Ok(event)).await.is_err() {
653                        info!("Stream receiver dropped");
654                        break;
655                    }
656
657                    // Rate limiting
658                    tokio::time::sleep(delay).await;
659
660                    // Check max events
661                    if max_events > 0 && sequence >= max_events {
662                        break;
663                    }
664                }
665            }
666
667            // Decrement active streams
668            state.active_streams.fetch_sub(1, Ordering::Relaxed);
669        });
670
671        Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
672    }
673
674    /// Control commands for streaming.
675    async fn control(
676        &self,
677        request: Request<ControlCommand>,
678    ) -> Result<Response<ControlResponse>, Status> {
679        let cmd = request.into_inner();
680        let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
681
682        info!("Control command: {:?}", action);
683
684        let (success, message, status) = match action {
685            ControlAction::Pause => {
686                self.state.stream_paused.store(true, Ordering::Relaxed);
687                (true, "Stream paused".to_string(), StreamStatus::Paused)
688            }
689            ControlAction::Resume => {
690                self.state.stream_paused.store(false, Ordering::Relaxed);
691                (true, "Stream resumed".to_string(), StreamStatus::Running)
692            }
693            ControlAction::Stop => {
694                self.state.stream_stopped.store(true, Ordering::Relaxed);
695                (true, "Stream stopped".to_string(), StreamStatus::Stopped)
696            }
697            ControlAction::TriggerPattern => {
698                let pattern = cmd.pattern_name.unwrap_or_default();
699                if pattern.is_empty() {
700                    (
701                        false,
702                        "Pattern name is required for TriggerPattern action".to_string(),
703                        StreamStatus::Running,
704                    )
705                } else {
706                    // Valid patterns: year_end_spike, period_end_spike, holiday_cluster,
707                    // fraud_cluster, error_cluster, or any custom pattern name
708                    let valid_patterns = [
709                        "year_end_spike",
710                        "period_end_spike",
711                        "holiday_cluster",
712                        "fraud_cluster",
713                        "error_cluster",
714                        "uniform",
715                    ];
716                    let is_valid = valid_patterns.contains(&pattern.as_str())
717                        || pattern.starts_with("custom:");
718
719                    if is_valid {
720                        // Store the pattern for the stream generator to pick up
721                        if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
722                            *triggered = Some(pattern.clone());
723                        }
724                        info!("Pattern trigger activated: {}", pattern);
725                        (
726                            true,
727                            format!("Pattern '{}' will be applied to upcoming entries", pattern),
728                            StreamStatus::Running,
729                        )
730                    } else {
731                        (
732                            false,
733                            format!(
734                                "Unknown pattern '{}'. Valid patterns: {:?}",
735                                pattern, valid_patterns
736                            ),
737                            StreamStatus::Running,
738                        )
739                    }
740                }
741            }
742            ControlAction::Unspecified => (
743                false,
744                "Unknown control action".to_string(),
745                StreamStatus::Unspecified,
746            ),
747        };
748
749        Ok(Response::new(ControlResponse {
750            success,
751            message,
752            current_status: status as i32,
753        }))
754    }
755
756    /// Get current configuration.
757    async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
758        let config = self.state.config.read().await;
759        let proto_config = Self::config_to_proto(&config);
760
761        Ok(Response::new(ConfigResponse {
762            success: true,
763            message: "Current configuration retrieved".to_string(),
764            current_config: Some(proto_config),
765        }))
766    }
767
768    /// Set configuration.
769    async fn set_config(
770        &self,
771        request: Request<ConfigRequest>,
772    ) -> Result<Response<ConfigResponse>, Status> {
773        let req = request.into_inner();
774
775        if let Some(proto_config) = req.config {
776            let new_config = self.proto_to_config(Some(proto_config)).await?;
777
778            let mut config = self.state.config.write().await;
779            *config = new_config.clone();
780
781            info!("Configuration updated");
782
783            Ok(Response::new(ConfigResponse {
784                success: true,
785                message: "Configuration updated".to_string(),
786                current_config: Some(Self::config_to_proto(&new_config)),
787            }))
788        } else {
789            Err(Status::invalid_argument("No configuration provided"))
790        }
791    }
792
793    /// Get server metrics.
794    async fn get_metrics(
795        &self,
796        _request: Request<()>,
797    ) -> Result<Response<MetricsResponse>, Status> {
798        let uptime = self.state.uptime_seconds();
799        let total_entries = self.state.total_entries.load(Ordering::Relaxed);
800
801        let entries_per_second = if uptime > 0 {
802            total_entries as f64 / uptime as f64
803        } else {
804            0.0
805        };
806
807        Ok(Response::new(MetricsResponse {
808            total_entries_generated: total_entries,
809            total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
810            uptime_seconds: uptime,
811            session_entries: total_entries,
812            session_entries_per_second: entries_per_second,
813            active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
814            total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
815        }))
816    }
817
818    /// Health check.
819    async fn health_check(
820        &self,
821        _request: Request<()>,
822    ) -> Result<Response<HealthResponse>, Status> {
823        Ok(Response::new(HealthResponse {
824            healthy: true,
825            version: env!("CARGO_PKG_VERSION").to_string(),
826            uptime_seconds: self.state.uptime_seconds(),
827        }))
828    }
829}
830
831/// Create a default GeneratorConfig.
832pub fn default_generator_config() -> GeneratorConfig {
833    GeneratorConfig {
834        global: GlobalConfig {
835            seed: None,
836            industry: IndustrySector::Manufacturing,
837            start_date: "2024-01-01".to_string(),
838            period_months: 12,
839            group_currency: "USD".to_string(),
840            parallel: true,
841            worker_threads: 0,
842            memory_limit_mb: 0,
843            fiscal_year_months: None,
844        },
845        companies: vec![CompanyConfig {
846            code: "1000".to_string(),
847            name: "Default Company".to_string(),
848            currency: "USD".to_string(),
849            country: "US".to_string(),
850            annual_transaction_volume: TransactionVolume::TenK,
851            volume_weight: 1.0,
852            fiscal_year_variant: "K4".to_string(),
853        }],
854        chart_of_accounts: ChartOfAccountsConfig {
855            complexity: CoAComplexity::Small,
856            industry_specific: true,
857            custom_accounts: None,
858            min_hierarchy_depth: 2,
859            max_hierarchy_depth: 5,
860        },
861        transactions: Default::default(),
862        output: OutputConfig::default(),
863        fraud: Default::default(),
864        internal_controls: Default::default(),
865        business_processes: Default::default(),
866        user_personas: Default::default(),
867        templates: Default::default(),
868        approval: Default::default(),
869        departments: Default::default(),
870        master_data: Default::default(),
871        document_flows: Default::default(),
872        intercompany: Default::default(),
873        balance: Default::default(),
874        ocpm: Default::default(),
875        audit: Default::default(),
876        banking: Default::default(),
877        data_quality: Default::default(),
878        scenario: Default::default(),
879        temporal: Default::default(),
880        graph_export: Default::default(),
881        streaming: Default::default(),
882        rate_limit: Default::default(),
883        temporal_attributes: Default::default(),
884        relationships: Default::default(),
885        accounting_standards: Default::default(),
886        audit_standards: Default::default(),
887        distributions: Default::default(),
888        temporal_patterns: Default::default(),
889        vendor_network: Default::default(),
890        customer_segmentation: Default::default(),
891        relationship_strength: Default::default(),
892        cross_process_links: Default::default(),
893        organizational_events: Default::default(),
894        behavioral_drift: Default::default(),
895        market_drift: Default::default(),
896        drift_labeling: Default::default(),
897        anomaly_injection: Default::default(),
898        industry_specific: Default::default(),
899        fingerprint_privacy: Default::default(),
900        quality_gates: Default::default(),
901        compliance: Default::default(),
902        webhooks: Default::default(),
903        llm: Default::default(),
904        diffusion: Default::default(),
905        causal: Default::default(),
906        source_to_pay: Default::default(),
907        financial_reporting: Default::default(),
908        hr: Default::default(),
909        manufacturing: Default::default(),
910        sales_quotes: Default::default(),
911        tax: Default::default(),
912        treasury: Default::default(),
913        project_accounting: Default::default(),
914        esg: Default::default(),
915        country_packs: None,
916        scenarios: Default::default(),
917        session: Default::default(),
918    }
919}
920
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;

    // ==========================================================================
    // Shared Helpers
    // ==========================================================================

    /// Build a service backed by `default_generator_config()`.
    fn default_service() -> SynthService {
        SynthService::new(default_generator_config())
    }

    /// Send a control command without a pattern name and return the decoded reply.
    async fn send_control(service: &SynthService, action: ControlAction) -> ControlResponse {
        let command = ControlCommand {
            action: action as i32,
            pattern_name: None,
        };
        service
            .control(Request::new(command))
            .await
            .unwrap()
            .into_inner()
    }

    /// Submit `request` to `stream_data` on a fresh default service, require
    /// that it is rejected, and return the resulting status.
    async fn stream_rejection(request: StreamDataRequest) -> Status {
        let service = default_service();
        let outcome = service.stream_data(Request::new(request)).await;
        assert!(outcome.is_err());
        outcome.err().unwrap()
    }

    // ==========================================================================
    // Service Creation Tests
    // ==========================================================================

    #[tokio::test]
    async fn test_service_creation() {
        let service = default_service();
        // A freshly created service reports zero or very small uptime
        // (the test completes quickly).
        assert!(service.state.uptime_seconds() < 60);
    }

    #[tokio::test]
    async fn test_service_with_state() {
        let shared = Arc::new(ServerState::new(default_generator_config()));
        let service = SynthService::with_state(Arc::clone(&shared));

        // A write through the original handle must be visible via the service.
        shared.total_entries.store(100, Ordering::Relaxed);
        assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
    }

    // ==========================================================================
    // Health Check Tests
    // ==========================================================================

    #[tokio::test]
    async fn test_health_check() {
        let service = default_service();

        let health = service
            .health_check(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert!(health.healthy);
        assert!(!health.version.is_empty());
    }

    #[tokio::test]
    async fn test_health_check_returns_version() {
        let service = default_service();

        let health = service
            .health_check(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
    }

    // ==========================================================================
    // Configuration Tests
    // ==========================================================================

    #[tokio::test]
    async fn test_get_config() {
        let service = default_service();

        let reply = service
            .get_config(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert!(reply.success);
        assert!(reply.current_config.is_some());
    }

    #[tokio::test]
    async fn test_get_config_returns_industry() {
        let service = default_service();

        let reply = service
            .get_config(Request::new(()))
            .await
            .unwrap()
            .into_inner();
        let current = reply.current_config.unwrap();

        assert_eq!(current.industry, "Manufacturing");
    }

    #[tokio::test]
    async fn test_set_config() {
        let service = default_service();

        let request = ConfigRequest {
            config: Some(GenerationConfig {
                industry: "retail".to_string(),
                start_date: "2024-06-01".to_string(),
                period_months: 6,
                seed: 42,
                coa_complexity: "medium".to_string(),
                companies: vec![],
                fraud_enabled: true,
                fraud_rate: 0.05,
                generate_master_data: false,
                generate_document_flows: false,
            }),
        };

        let reply = service
            .set_config(Request::new(request))
            .await
            .unwrap()
            .into_inner();

        assert!(reply.success);
    }

    #[tokio::test]
    async fn test_set_config_without_config_fails() {
        let service = default_service();

        let outcome = service
            .set_config(Request::new(ConfigRequest { config: None }))
            .await;

        assert!(outcome.is_err());
    }

    // ==========================================================================
    // Metrics Tests
    // ==========================================================================

    #[tokio::test]
    async fn test_get_metrics_initial() {
        let service = default_service();

        let metrics = service
            .get_metrics(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(metrics.total_entries_generated, 0);
        assert_eq!(metrics.total_anomalies_injected, 0);
        assert_eq!(metrics.active_streams, 0);
    }

    #[tokio::test]
    async fn test_get_metrics_after_updates() {
        let service = default_service();

        // Simulate some activity by writing the counters directly.
        let state = &service.state;
        state.total_entries.store(1000, Ordering::Relaxed);
        state.total_anomalies.store(20, Ordering::Relaxed);
        state.active_streams.store(2, Ordering::Relaxed);

        let metrics = service
            .get_metrics(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(metrics.total_entries_generated, 1000);
        assert_eq!(metrics.total_anomalies_injected, 20);
        assert_eq!(metrics.active_streams, 2);
    }

    // ==========================================================================
    // Control Tests
    // ==========================================================================

    #[tokio::test]
    async fn test_control_pause() {
        let service = default_service();

        let reply = send_control(&service, ControlAction::Pause).await;

        assert!(reply.success);
        assert!(service.state.stream_paused.load(Ordering::Relaxed));
    }

    #[tokio::test]
    async fn test_control_resume() {
        let service = default_service();

        // Start from a paused stream so that resuming has something to undo.
        service.state.stream_paused.store(true, Ordering::Relaxed);

        let reply = send_control(&service, ControlAction::Resume).await;

        assert!(reply.success);
        assert!(!service.state.stream_paused.load(Ordering::Relaxed));
    }

    #[tokio::test]
    async fn test_control_stop() {
        let service = default_service();

        let reply = send_control(&service, ControlAction::Stop).await;

        assert!(reply.success);
        assert!(service.state.stream_stopped.load(Ordering::Relaxed));
    }

    // ==========================================================================
    // ServerState Tests
    // ==========================================================================

    #[test]
    fn test_server_state_creation() {
        let state = ServerState::new(default_generator_config());

        // All counters start at zero and both control flags start cleared.
        assert_eq!(state.total_entries.load(Ordering::Relaxed), 0);
        assert_eq!(state.total_anomalies.load(Ordering::Relaxed), 0);
        assert_eq!(state.active_streams.load(Ordering::Relaxed), 0);
        assert!(!state.stream_paused.load(Ordering::Relaxed));
        assert!(!state.stream_stopped.load(Ordering::Relaxed));
    }

    #[test]
    fn test_server_state_uptime() {
        let state = ServerState::new(default_generator_config());

        // The state was only just created, so the uptime must be small.
        assert!(state.uptime_seconds() < 60);
    }

    // ==========================================================================
    // Proto Conversion Tests
    // ==========================================================================

    #[test]
    fn test_default_generator_config() {
        let defaults = default_generator_config();

        assert_eq!(defaults.global.industry, IndustrySector::Manufacturing);
        assert_eq!(defaults.global.period_months, 12);
        assert!(!defaults.companies.is_empty());
        assert_eq!(defaults.companies[0].code, "1000");
    }

    #[test]
    fn test_config_to_proto() {
        let proto = SynthService::config_to_proto(&default_generator_config());

        assert_eq!(proto.industry, "Manufacturing");
        assert_eq!(proto.period_months, 12);
        assert!(!proto.companies.is_empty());
    }

    #[tokio::test]
    async fn test_proto_to_config_with_none() {
        let baseline = default_generator_config();
        let service = SynthService::new(baseline.clone());

        let converted = service.proto_to_config(None).await.unwrap();

        // Passing `None` should hand back the service's current configuration.
        assert_eq!(converted.global.industry, baseline.global.industry);
    }

    #[tokio::test]
    async fn test_proto_to_config_with_retail() {
        let service = default_service();

        let retail = GenerationConfig {
            industry: "retail".to_string(),
            start_date: "2024-01-01".to_string(),
            period_months: 6,
            seed: 0,
            coa_complexity: "large".to_string(),
            companies: vec![],
            fraud_enabled: false,
            fraud_rate: 0.0,
            generate_master_data: false,
            generate_document_flows: false,
        };

        let converted = service.proto_to_config(Some(retail)).await.unwrap();

        assert_eq!(converted.global.industry, IndustrySector::Retail);
        assert_eq!(converted.chart_of_accounts.complexity, CoAComplexity::Large);
    }

    #[tokio::test]
    async fn test_proto_to_config_with_healthcare() {
        let service = default_service();

        let healthcare = GenerationConfig {
            industry: "healthcare".to_string(),
            start_date: "2024-01-01".to_string(),
            period_months: 12,
            seed: 42,
            coa_complexity: "small".to_string(),
            companies: vec![],
            fraud_enabled: true,
            fraud_rate: 0.1,
            generate_master_data: true,
            generate_document_flows: true,
        };

        let converted = service.proto_to_config(Some(healthcare)).await.unwrap();

        assert_eq!(converted.global.industry, IndustrySector::Healthcare);
        assert_eq!(converted.global.seed, Some(42));
        assert!(converted.fraud.enabled);
        assert!((converted.fraud.fraud_rate - 0.1).abs() < 0.001);
    }

    #[tokio::test]
    async fn test_proto_to_config_with_companies() {
        let service = default_service();

        let parent = CompanyConfigProto {
            code: "1000".to_string(),
            name: "Parent Corp".to_string(),
            currency: "USD".to_string(),
            country: "US".to_string(),
            annual_transaction_volume: 100000,
            volume_weight: 1.0,
        };
        let subsidiary = CompanyConfigProto {
            code: "2000".to_string(),
            name: "EU Sub".to_string(),
            currency: "EUR".to_string(),
            country: "DE".to_string(),
            annual_transaction_volume: 50000,
            volume_weight: 0.5,
        };

        let technology = GenerationConfig {
            industry: "technology".to_string(),
            start_date: "2024-01-01".to_string(),
            period_months: 12,
            seed: 0,
            coa_complexity: "medium".to_string(),
            companies: vec![parent, subsidiary],
            fraud_enabled: false,
            fraud_rate: 0.0,
            generate_master_data: false,
            generate_document_flows: false,
        };

        let converted = service.proto_to_config(Some(technology)).await.unwrap();

        assert_eq!(converted.companies.len(), 2);
        assert_eq!(converted.companies[0].code, "1000");
        assert_eq!(converted.companies[1].currency, "EUR");
    }

    // ==========================================================================
    // Input Validation Tests
    // ==========================================================================

    #[tokio::test]
    async fn test_bulk_generate_entry_count_validation() {
        let service = default_service();

        let oversized = BulkGenerateRequest {
            entry_count: 2_000_000, // Exceeds MAX_ENTRY_COUNT
            include_master_data: false,
            inject_anomalies: false,
            output_format: 0,
            config: None,
        };

        let outcome = service.bulk_generate(Request::new(oversized)).await;
        assert!(outcome.is_err());
        let status = outcome.err().unwrap();
        assert!(status.message().contains("exceeds maximum allowed value"));
    }

    #[tokio::test]
    async fn test_stream_data_events_per_second_too_low() {
        let status = stream_rejection(StreamDataRequest {
            events_per_second: 0, // Below MIN_EVENTS_PER_SECOND
            max_events: 100,
            inject_anomalies: false,
            anomaly_rate: 0.0,
            config: None,
        })
        .await;

        assert!(status.message().contains("must be at least"));
    }

    #[tokio::test]
    async fn test_stream_data_events_per_second_too_high() {
        let status = stream_rejection(StreamDataRequest {
            events_per_second: 20_000, // Exceeds MAX_EVENTS_PER_SECOND
            max_events: 100,
            inject_anomalies: false,
            anomaly_rate: 0.0,
            config: None,
        })
        .await;

        assert!(status.message().contains("exceeds maximum allowed value"));
    }

    #[tokio::test]
    async fn test_stream_data_max_events_too_high() {
        let status = stream_rejection(StreamDataRequest {
            events_per_second: 100,
            max_events: 100_000_000, // Exceeds MAX_STREAM_EVENTS
            inject_anomalies: false,
            anomaly_rate: 0.0,
            config: None,
        })
        .await;

        assert!(status.message().contains("max_events"));
    }

    #[tokio::test]
    async fn test_stream_data_valid_request() {
        let service = default_service();

        let acceptable = StreamDataRequest {
            events_per_second: 10,
            max_events: 5,
            inject_anomalies: false,
            anomaly_rate: 0.0,
            config: None,
        };

        // The stream output itself is hard to inspect here, so this only
        // verifies that a well-formed request is accepted.
        let outcome = service.stream_data(Request::new(acceptable)).await;
        assert!(outcome.is_ok());
    }
}