Skip to main content

datasynth_server/grpc/
service.rs

1//! gRPC service implementation.
2
3use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17    ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18    TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
26/// Server state for tracking metrics and configuration.
27pub struct ServerState {
28    /// Current configuration
29    pub config: RwLock<GeneratorConfig>,
30    /// Configuration source for reloading
31    pub config_source: RwLock<crate::config_loader::ConfigSource>,
32    /// Server start time
33    start_time: Instant,
34    /// Total entries generated
35    pub total_entries: AtomicU64,
36    /// Total anomalies injected
37    pub total_anomalies: AtomicU64,
38    /// Active streams count
39    pub active_streams: AtomicU64,
40    /// Total stream events
41    pub total_stream_events: AtomicU64,
42    /// Stream control flag
43    pub stream_paused: AtomicBool,
44    /// Stream stop flag
45    pub stream_stopped: AtomicBool,
46    /// Stream events per second (0 = unlimited)
47    pub stream_events_per_second: AtomicU64,
48    /// Stream maximum events (0 = unlimited)
49    pub stream_max_events: AtomicU64,
50    /// Stream anomaly injection flag
51    pub stream_inject_anomalies: AtomicBool,
52    /// Triggered pattern name (if any) - will be applied to next generated entries
53    pub triggered_pattern: RwLock<Option<String>>,
54    /// Resource guard for memory and disk monitoring
55    pub resource_guard: Arc<ResourceGuard>,
56    /// Maximum concurrent generations allowed
57    max_concurrent_generations: AtomicU64,
58}
59
60impl ServerState {
61    pub fn new(config: GeneratorConfig) -> Self {
62        // Build resource guard from config
63        let memory_limit = config.global.memory_limit_mb;
64        let resource_guard = if memory_limit > 0 {
65            ResourceGuardBuilder::new()
66                .memory_limit(memory_limit)
67                .conservative()
68                .build()
69        } else {
70            // Default: 2GB limit for server
71            ResourceGuardBuilder::new()
72                .memory_limit(2048)
73                .conservative()
74                .build()
75        };
76
77        Self {
78            config: RwLock::new(config),
79            config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
80            start_time: Instant::now(),
81            total_entries: AtomicU64::new(0),
82            total_anomalies: AtomicU64::new(0),
83            active_streams: AtomicU64::new(0),
84            total_stream_events: AtomicU64::new(0),
85            stream_paused: AtomicBool::new(false),
86            stream_stopped: AtomicBool::new(false),
87            stream_events_per_second: AtomicU64::new(0),
88            stream_max_events: AtomicU64::new(0),
89            stream_inject_anomalies: AtomicBool::new(false),
90            triggered_pattern: RwLock::new(None),
91            resource_guard: Arc::new(resource_guard),
92            max_concurrent_generations: AtomicU64::new(4),
93        }
94    }
95
96    /// Create with custom resource limits.
97    pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
98        let resource_guard = ResourceGuardBuilder::new()
99            .memory_limit(memory_limit_mb)
100            .conservative()
101            .build();
102
103        Self {
104            config: RwLock::new(config),
105            config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
106            start_time: Instant::now(),
107            total_entries: AtomicU64::new(0),
108            total_anomalies: AtomicU64::new(0),
109            active_streams: AtomicU64::new(0),
110            total_stream_events: AtomicU64::new(0),
111            stream_paused: AtomicBool::new(false),
112            stream_stopped: AtomicBool::new(false),
113            stream_events_per_second: AtomicU64::new(0),
114            stream_max_events: AtomicU64::new(0),
115            stream_inject_anomalies: AtomicBool::new(false),
116            triggered_pattern: RwLock::new(None),
117            resource_guard: Arc::new(resource_guard),
118            max_concurrent_generations: AtomicU64::new(4),
119        }
120    }
121
122    pub fn uptime_seconds(&self) -> u64 {
123        self.start_time.elapsed().as_secs()
124    }
125
126    /// Check if resources are available for a new generation.
127    #[allow(clippy::result_large_err)] // tonic::Status is the idiomatic error type for gRPC
128    pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
129        // Check if too many concurrent generations
130        let active = self.active_streams.load(Ordering::Relaxed);
131        let max = self.max_concurrent_generations.load(Ordering::Relaxed);
132        if active >= max {
133            return Err(Status::resource_exhausted(format!(
134                "Too many concurrent generations ({active}/{max}). Try again later."
135            )));
136        }
137
138        // Check memory and other resources
139        match self.resource_guard.check() {
140            Ok(level) => {
141                if level == DegradationLevel::Emergency {
142                    Err(Status::resource_exhausted(
143                        "Server resources critically low. Generation not possible.",
144                    ))
145                } else if level == DegradationLevel::Minimal {
146                    warn!("Resources constrained, generation may be limited");
147                    Ok(level)
148                } else {
149                    Ok(level)
150                }
151            }
152            Err(e) => Err(Status::resource_exhausted(format!(
153                "Resource check failed: {e}"
154            ))),
155        }
156    }
157
158    /// Get current resource status for monitoring.
159    pub fn resource_status(&self) -> ResourceStatus {
160        let stats = self.resource_guard.stats();
161        ResourceStatus {
162            memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
163            memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
164            disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
165            degradation_level: stats.degradation_level.name().to_string(),
166            active_generations: self.active_streams.load(Ordering::Relaxed),
167        }
168    }
169}
170
/// Snapshot of resource usage exposed to monitoring endpoints.
#[derive(Debug, Clone)]
pub struct ResourceStatus {
    /// Resident memory, in megabytes.
    pub memory_usage_mb: u64,
    /// Peak resident memory, in megabytes.
    pub memory_peak_mb: u64,
    /// Available disk space, in megabytes.
    pub disk_available_mb: u64,
    /// Name of the current degradation level.
    pub degradation_level: String,
    /// Number of generations currently active.
    pub active_generations: u64,
}
180
181/// Main gRPC service implementation.
182pub struct SynthService {
183    pub state: Arc<ServerState>,
184}
185
186impl SynthService {
187    pub fn new(config: GeneratorConfig) -> Self {
188        Self {
189            state: Arc::new(ServerState::new(config)),
190        }
191    }
192
193    pub fn with_state(state: Arc<ServerState>) -> Self {
194        Self { state }
195    }
196
197    /// Convert a GenerationConfig proto to GeneratorConfig.
198    async fn proto_to_config(
199        &self,
200        proto: Option<GenerationConfig>,
201    ) -> Result<GeneratorConfig, Status> {
202        match proto {
203            Some(p) => {
204                let industry = match p.industry.to_lowercase().as_str() {
205                    "manufacturing" => IndustrySector::Manufacturing,
206                    "retail" => IndustrySector::Retail,
207                    "financial_services" | "financial" => IndustrySector::FinancialServices,
208                    "healthcare" => IndustrySector::Healthcare,
209                    "technology" => IndustrySector::Technology,
210                    "" => IndustrySector::Manufacturing, // empty defaults to manufacturing
211                    other => {
212                        return Err(Status::invalid_argument(format!(
213                            "Unknown industry '{other}'. Valid values: manufacturing, retail, financial_services, healthcare, technology"
214                        )));
215                    }
216                };
217
218                let complexity = match p.coa_complexity.to_lowercase().as_str() {
219                    "small" => CoAComplexity::Small,
220                    "medium" => CoAComplexity::Medium,
221                    "large" => CoAComplexity::Large,
222                    "" => CoAComplexity::Small, // empty defaults to small
223                    other => {
224                        return Err(Status::invalid_argument(format!(
225                            "Unknown coa_complexity '{other}'. Valid values: small, medium, large"
226                        )));
227                    }
228                };
229
230                let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
231                    vec![CompanyConfig {
232                        code: "1000".to_string(),
233                        name: "Default Company".to_string(),
234                        currency: "USD".to_string(),
235                        country: "US".to_string(),
236                        annual_transaction_volume: TransactionVolume::TenK,
237                        volume_weight: 1.0,
238                        fiscal_year_variant: "K4".to_string(),
239                    }]
240                } else {
241                    p.companies
242                        .into_iter()
243                        .map(|c| CompanyConfig {
244                            code: c.code,
245                            name: c.name,
246                            currency: c.currency,
247                            country: c.country,
248                            annual_transaction_volume: TransactionVolume::Custom(
249                                c.annual_transaction_volume,
250                            ),
251                            volume_weight: c.volume_weight as f64,
252                            fiscal_year_variant: "K4".to_string(),
253                        })
254                        .collect()
255                };
256
257                let mut config = GeneratorConfig {
258                    global: GlobalConfig {
259                        seed: if p.seed > 0 { Some(p.seed) } else { None },
260                        industry,
261                        start_date: if p.start_date.is_empty() {
262                            "2024-01-01".to_string()
263                        } else {
264                            p.start_date
265                        },
266                        period_months: if p.period_months == 0 {
267                            12
268                        } else {
269                            p.period_months
270                        },
271                        group_currency: "USD".to_string(),
272                        parallel: true,
273                        worker_threads: 0,
274                        memory_limit_mb: 0,
275                        fiscal_year_months: None,
276                    },
277                    companies,
278                    chart_of_accounts: ChartOfAccountsConfig {
279                        complexity,
280                        industry_specific: true,
281                        custom_accounts: None,
282                        min_hierarchy_depth: 2,
283                        max_hierarchy_depth: 5,
284                    },
285                    ..default_generator_config()
286                };
287
288                // Enable fraud if requested
289                if p.fraud_enabled {
290                    config.fraud.enabled = true;
291                    config.fraud.fraud_rate = p.fraud_rate as f64;
292                }
293
294                Ok(config)
295            }
296            None => {
297                // Use current server config
298                let config = self.state.config.read().await;
299                Ok(config.clone())
300            }
301        }
302    }
303
304    /// Convert a JournalEntry to proto format.
305    fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
306        JournalEntryProto {
307            document_id: entry.header.document_id.to_string(),
308            company_code: entry.header.company_code.clone(),
309            fiscal_year: entry.header.fiscal_year as u32,
310            fiscal_period: entry.header.fiscal_period as u32,
311            posting_date: entry.header.posting_date.to_string(),
312            document_date: entry.header.document_date.to_string(),
313            created_at: entry.header.created_at.to_rfc3339(),
314            source: format!("{:?}", entry.header.source),
315            business_process: entry.header.business_process.map(|bp| format!("{bp:?}")),
316            lines: entry
317                .lines
318                .iter()
319                .map(|line| {
320                    let amount = if line.is_debit() {
321                        line.debit_amount
322                    } else {
323                        line.credit_amount
324                    };
325                    JournalLineProto {
326                        line_number: line.line_number,
327                        account_number: line.gl_account.clone(),
328                        account_name: line.account_description.clone().unwrap_or_default(),
329                        amount: amount.to_string(),
330                        is_debit: line.is_debit(),
331                        cost_center: line.cost_center.clone(),
332                        profit_center: line.profit_center.clone(),
333                        // vendor_id/customer_id/material_id are not on JournalEntryLine;
334                        // populate from auxiliary_account_number when available (French GAAP)
335                        vendor_id: line.auxiliary_account_number.clone(),
336                        customer_id: None,
337                        material_id: None,
338                        text: line.line_text.clone().or_else(|| line.text.clone()),
339                    }
340                })
341                .collect(),
342            is_anomaly: entry.header.is_fraud,
343            anomaly_type: entry.header.fraud_type.map(|ft| format!("{ft:?}")),
344        }
345    }
346
347    /// Convert current config to proto format.
348    fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
349        GenerationConfig {
350            industry: format!("{:?}", config.global.industry),
351            start_date: config.global.start_date.clone(),
352            period_months: config.global.period_months,
353            seed: config.global.seed.unwrap_or(0),
354            coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
355            companies: config
356                .companies
357                .iter()
358                .map(|c| CompanyConfigProto {
359                    code: c.code.clone(),
360                    name: c.name.clone(),
361                    currency: c.currency.clone(),
362                    country: c.country.clone(),
363                    annual_transaction_volume: c.annual_transaction_volume.count(),
364                    volume_weight: c.volume_weight as f32,
365                })
366                .collect(),
367            fraud_enabled: config.fraud.enabled,
368            fraud_rate: config.fraud.fraud_rate as f32,
369            generate_master_data: config.master_data.vendors.count > 0
370                || config.master_data.customers.count > 0
371                || config.master_data.materials.count > 0,
372            generate_document_flows: config.document_flows.p2p.enabled
373                || config.document_flows.o2c.enabled,
374        }
375    }
376}
377
378#[tonic::async_trait]
379impl synthetic_data_service_server::SyntheticDataService for SynthService {
380    /// Bulk generation - generates all data at once and returns.
381    async fn bulk_generate(
382        &self,
383        request: Request<BulkGenerateRequest>,
384    ) -> Result<Response<BulkGenerateResponse>, Status> {
385        let req = request.into_inner();
386
387        // Validate entry_count bounds
388        const MAX_ENTRY_COUNT: u64 = 1_000_000;
389        if req.entry_count > MAX_ENTRY_COUNT {
390            return Err(Status::invalid_argument(format!(
391                "entry_count ({}) exceeds maximum allowed value ({})",
392                req.entry_count, MAX_ENTRY_COUNT
393            )));
394        }
395
396        // Check resources before starting generation
397        let degradation_level = self.state.check_resources()?;
398        if degradation_level != DegradationLevel::Normal {
399            warn!(
400                "Starting bulk generation under resource pressure (level: {:?})",
401                degradation_level
402            );
403        }
404
405        info!("Bulk generate request: {} entries", req.entry_count);
406
407        let config = self.proto_to_config(req.config).await?;
408        let start_time = Instant::now();
409
410        // Create orchestrator with appropriate phase config
411        let phase_config = PhaseConfig {
412            generate_master_data: req.include_master_data,
413            generate_document_flows: false,
414            generate_journal_entries: true,
415            inject_anomalies: req.inject_anomalies,
416            show_progress: false,
417            ..Default::default()
418        };
419
420        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
421            .map_err(|e| Status::internal(format!("Failed to create orchestrator: {e}")))?;
422
423        let result = orchestrator
424            .generate()
425            .map_err(|e| Status::internal(format!("Generation failed: {e}")))?;
426
427        let duration_ms = start_time.elapsed().as_millis() as u64;
428
429        // Update metrics
430        let entries_count = result.journal_entries.len() as u64;
431        self.state
432            .total_entries
433            .fetch_add(entries_count, Ordering::Relaxed);
434
435        let anomaly_count = result.anomaly_labels.labels.len() as u64;
436        self.state
437            .total_anomalies
438            .fetch_add(anomaly_count, Ordering::Relaxed);
439
440        // Convert to proto
441        let journal_entries: Vec<JournalEntryProto> = result
442            .journal_entries
443            .iter()
444            .map(Self::journal_entry_to_proto)
445            .collect();
446
447        let anomaly_labels: Vec<AnomalyLabelProto> = result
448            .anomaly_labels
449            .labels
450            .iter()
451            .map(|a| AnomalyLabelProto {
452                anomaly_id: a.anomaly_id.clone(),
453                document_id: a.document_id.clone(),
454                anomaly_type: format!("{:?}", a.anomaly_type),
455                anomaly_category: a.document_type.clone(),
456                description: a.description.clone(),
457                severity_score: a.severity as f32,
458            })
459            .collect();
460
461        // Compute stats
462        let mut total_debit = rust_decimal::Decimal::ZERO;
463        let mut total_credit = rust_decimal::Decimal::ZERO;
464        let mut total_lines = 0u64;
465        let mut entries_by_company = std::collections::HashMap::new();
466        let mut entries_by_source = std::collections::HashMap::new();
467
468        for entry in &result.journal_entries {
469            *entries_by_company
470                .entry(entry.header.company_code.clone())
471                .or_insert(0u64) += 1;
472            *entries_by_source
473                .entry(format!("{:?}", entry.header.source))
474                .or_insert(0u64) += 1;
475
476            for line in &entry.lines {
477                total_lines += 1;
478                total_debit += line.debit_amount;
479                total_credit += line.credit_amount;
480            }
481        }
482
483        let stats = GenerationStats {
484            total_entries: entries_count,
485            total_lines,
486            total_debit_amount: total_debit.to_string(),
487            total_credit_amount: total_credit.to_string(),
488            anomaly_count,
489            entries_by_company,
490            entries_by_source,
491        };
492
493        info!(
494            "Bulk generation complete: {} entries in {}ms",
495            entries_count, duration_ms
496        );
497
498        Ok(Response::new(BulkGenerateResponse {
499            entries_generated: entries_count,
500            duration_ms,
501            journal_entries,
502            anomaly_labels,
503            stats: Some(stats),
504        }))
505    }
506
507    type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
508
509    /// Streaming generation - continuously generates data events.
510    async fn stream_data(
511        &self,
512        request: Request<StreamDataRequest>,
513    ) -> Result<Response<Self::StreamDataStream>, Status> {
514        let req = request.into_inner();
515
516        // Validate events_per_second bounds
517        const MIN_EVENTS_PER_SECOND: u32 = 1;
518        const MAX_EVENTS_PER_SECOND: u32 = 10_000;
519        if req.events_per_second < MIN_EVENTS_PER_SECOND {
520            return Err(Status::invalid_argument(format!(
521                "events_per_second ({}) must be at least {}",
522                req.events_per_second, MIN_EVENTS_PER_SECOND
523            )));
524        }
525        if req.events_per_second > MAX_EVENTS_PER_SECOND {
526            return Err(Status::invalid_argument(format!(
527                "events_per_second ({}) exceeds maximum allowed value ({})",
528                req.events_per_second, MAX_EVENTS_PER_SECOND
529            )));
530        }
531
532        // Validate max_events if specified
533        const MAX_STREAM_EVENTS: u64 = 10_000_000;
534        if req.max_events > MAX_STREAM_EVENTS {
535            return Err(Status::invalid_argument(format!(
536                "max_events ({}) exceeds maximum allowed value ({})",
537                req.max_events, MAX_STREAM_EVENTS
538            )));
539        }
540
541        // Check resources before starting stream
542        let degradation_level = self.state.check_resources()?;
543        if degradation_level != DegradationLevel::Normal {
544            warn!(
545                "Starting stream under resource pressure (level: {:?})",
546                degradation_level
547            );
548        }
549
550        info!(
551            "Stream data request: {} events/sec, max {}",
552            req.events_per_second, req.max_events
553        );
554
555        let config = self.proto_to_config(req.config).await?;
556        let state = self.state.clone();
557
558        // Increment active streams
559        state.active_streams.fetch_add(1, Ordering::Relaxed);
560
561        // Reset control flags
562        state.stream_paused.store(false, Ordering::Relaxed);
563        state.stream_stopped.store(false, Ordering::Relaxed);
564
565        let (tx, rx) = mpsc::channel(100);
566
567        // Spawn background task to generate and stream data
568        let events_per_second = req.events_per_second;
569        let max_events = req.max_events;
570        let inject_anomalies = req.inject_anomalies;
571
572        tokio::spawn(async move {
573            let phase_config = PhaseConfig {
574                generate_master_data: false,
575                generate_document_flows: false,
576                generate_journal_entries: true,
577                inject_anomalies,
578                show_progress: false,
579                ..Default::default()
580            };
581
582            let mut sequence = 0u64;
583            let delay = if events_per_second > 0 {
584                Duration::from_micros(1_000_000 / events_per_second as u64)
585            } else {
586                Duration::from_millis(1)
587            };
588
589            // Create orchestrator once outside the loop to avoid per-iteration overhead
590            let mut orchestrator =
591                match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
592                    Ok(o) => o,
593                    Err(e) => {
594                        error!("Failed to create orchestrator: {}", e);
595                        return;
596                    }
597                };
598
599            loop {
600                // Check stop flag
601                if state.stream_stopped.load(Ordering::Relaxed) {
602                    info!("Stream stopped by control command");
603                    break;
604                }
605
606                // Check pause flag
607                while state.stream_paused.load(Ordering::Relaxed) {
608                    tokio::time::sleep(Duration::from_millis(100)).await;
609                    if state.stream_stopped.load(Ordering::Relaxed) {
610                        break;
611                    }
612                }
613
614                // Check max events
615                if max_events > 0 && sequence >= max_events {
616                    info!("Stream reached max events: {}", max_events);
617                    break;
618                }
619
620                // Generate a batch
621                let result = match orchestrator.generate() {
622                    Ok(r) => r,
623                    Err(e) => {
624                        error!("Generation failed: {}", e);
625                        break;
626                    }
627                };
628
629                // Stream each entry
630                for entry in result.journal_entries {
631                    sequence += 1;
632                    state.total_stream_events.fetch_add(1, Ordering::Relaxed);
633                    state.total_entries.fetch_add(1, Ordering::Relaxed);
634
635                    let timestamp = Timestamp {
636                        seconds: Utc::now().timestamp(),
637                        nanos: 0,
638                    };
639
640                    let event = DataEvent {
641                        sequence,
642                        timestamp: Some(timestamp),
643                        event: Some(data_event::Event::JournalEntry(
644                            SynthService::journal_entry_to_proto(&entry),
645                        )),
646                    };
647
648                    if tx.send(Ok(event)).await.is_err() {
649                        info!("Stream receiver dropped");
650                        break;
651                    }
652
653                    // Rate limiting
654                    tokio::time::sleep(delay).await;
655
656                    // Check max events
657                    if max_events > 0 && sequence >= max_events {
658                        break;
659                    }
660                }
661            }
662
663            // Decrement active streams
664            state.active_streams.fetch_sub(1, Ordering::Relaxed);
665        });
666
667        Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
668    }
669
670    /// Control commands for streaming.
671    async fn control(
672        &self,
673        request: Request<ControlCommand>,
674    ) -> Result<Response<ControlResponse>, Status> {
675        let cmd = request.into_inner();
676        let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
677
678        info!("Control command: {:?}", action);
679
680        let (success, message, status) = match action {
681            ControlAction::Pause => {
682                self.state.stream_paused.store(true, Ordering::Relaxed);
683                (true, "Stream paused".to_string(), StreamStatus::Paused)
684            }
685            ControlAction::Resume => {
686                self.state.stream_paused.store(false, Ordering::Relaxed);
687                (true, "Stream resumed".to_string(), StreamStatus::Running)
688            }
689            ControlAction::Stop => {
690                self.state.stream_stopped.store(true, Ordering::Relaxed);
691                (true, "Stream stopped".to_string(), StreamStatus::Stopped)
692            }
693            ControlAction::TriggerPattern => {
694                let pattern = cmd.pattern_name.unwrap_or_default();
695                if pattern.is_empty() {
696                    (
697                        false,
698                        "Pattern name is required for TriggerPattern action".to_string(),
699                        StreamStatus::Running,
700                    )
701                } else {
702                    // Valid patterns: year_end_spike, period_end_spike, holiday_cluster,
703                    // fraud_cluster, error_cluster, or any custom pattern name
704                    let valid_patterns = [
705                        "year_end_spike",
706                        "period_end_spike",
707                        "holiday_cluster",
708                        "fraud_cluster",
709                        "error_cluster",
710                        "uniform",
711                    ];
712                    let is_valid = valid_patterns.contains(&pattern.as_str())
713                        || pattern.starts_with("custom:");
714
715                    if is_valid {
716                        // Store the pattern for the stream generator to pick up
717                        if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
718                            *triggered = Some(pattern.clone());
719                        }
720                        info!("Pattern trigger activated: {}", pattern);
721                        (
722                            true,
723                            format!("Pattern '{pattern}' will be applied to upcoming entries"),
724                            StreamStatus::Running,
725                        )
726                    } else {
727                        (
728                            false,
729                            format!(
730                                "Unknown pattern '{pattern}'. Valid patterns: {valid_patterns:?}"
731                            ),
732                            StreamStatus::Running,
733                        )
734                    }
735                }
736            }
737            ControlAction::Unspecified => (
738                false,
739                "Unknown control action".to_string(),
740                StreamStatus::Unspecified,
741            ),
742        };
743
744        Ok(Response::new(ControlResponse {
745            success,
746            message,
747            current_status: status as i32,
748        }))
749    }
750
751    /// Get current configuration.
752    async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
753        let config = self.state.config.read().await;
754        let proto_config = Self::config_to_proto(&config);
755
756        Ok(Response::new(ConfigResponse {
757            success: true,
758            message: "Current configuration retrieved".to_string(),
759            current_config: Some(proto_config),
760        }))
761    }
762
763    /// Set configuration.
764    async fn set_config(
765        &self,
766        request: Request<ConfigRequest>,
767    ) -> Result<Response<ConfigResponse>, Status> {
768        let req = request.into_inner();
769
770        if let Some(proto_config) = req.config {
771            let new_config = self.proto_to_config(Some(proto_config)).await?;
772
773            let mut config = self.state.config.write().await;
774            *config = new_config.clone();
775
776            info!("Configuration updated");
777
778            Ok(Response::new(ConfigResponse {
779                success: true,
780                message: "Configuration updated".to_string(),
781                current_config: Some(Self::config_to_proto(&new_config)),
782            }))
783        } else {
784            Err(Status::invalid_argument("No configuration provided"))
785        }
786    }
787
788    /// Get server metrics.
789    async fn get_metrics(
790        &self,
791        _request: Request<()>,
792    ) -> Result<Response<MetricsResponse>, Status> {
793        let uptime = self.state.uptime_seconds();
794        let total_entries = self.state.total_entries.load(Ordering::Relaxed);
795
796        let entries_per_second = if uptime > 0 {
797            total_entries as f64 / uptime as f64
798        } else {
799            0.0
800        };
801
802        Ok(Response::new(MetricsResponse {
803            total_entries_generated: total_entries,
804            total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
805            uptime_seconds: uptime,
806            session_entries: total_entries,
807            session_entries_per_second: entries_per_second,
808            active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
809            total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
810        }))
811    }
812
813    /// Health check.
814    async fn health_check(
815        &self,
816        _request: Request<()>,
817    ) -> Result<Response<HealthResponse>, Status> {
818        Ok(Response::new(HealthResponse {
819            healthy: true,
820            version: env!("CARGO_PKG_VERSION").to_string(),
821            uptime_seconds: self.state.uptime_seconds(),
822        }))
823    }
824}
825
826/// Create a default GeneratorConfig.
827pub fn default_generator_config() -> GeneratorConfig {
828    GeneratorConfig {
829        global: GlobalConfig {
830            seed: None,
831            industry: IndustrySector::Manufacturing,
832            start_date: "2024-01-01".to_string(),
833            period_months: 12,
834            group_currency: "USD".to_string(),
835            parallel: true,
836            worker_threads: 0,
837            memory_limit_mb: 0,
838            fiscal_year_months: None,
839        },
840        companies: vec![CompanyConfig {
841            code: "1000".to_string(),
842            name: "Default Company".to_string(),
843            currency: "USD".to_string(),
844            country: "US".to_string(),
845            annual_transaction_volume: TransactionVolume::TenK,
846            volume_weight: 1.0,
847            fiscal_year_variant: "K4".to_string(),
848        }],
849        chart_of_accounts: ChartOfAccountsConfig {
850            complexity: CoAComplexity::Small,
851            industry_specific: true,
852            custom_accounts: None,
853            min_hierarchy_depth: 2,
854            max_hierarchy_depth: 5,
855        },
856        transactions: Default::default(),
857        output: OutputConfig::default(),
858        fraud: Default::default(),
859        internal_controls: Default::default(),
860        business_processes: Default::default(),
861        user_personas: Default::default(),
862        templates: Default::default(),
863        approval: Default::default(),
864        departments: Default::default(),
865        master_data: Default::default(),
866        document_flows: Default::default(),
867        intercompany: Default::default(),
868        balance: Default::default(),
869        ocpm: Default::default(),
870        audit: Default::default(),
871        banking: Default::default(),
872        data_quality: Default::default(),
873        scenario: Default::default(),
874        temporal: Default::default(),
875        graph_export: Default::default(),
876        streaming: Default::default(),
877        rate_limit: Default::default(),
878        temporal_attributes: Default::default(),
879        relationships: Default::default(),
880        accounting_standards: Default::default(),
881        audit_standards: Default::default(),
882        distributions: Default::default(),
883        temporal_patterns: Default::default(),
884        vendor_network: Default::default(),
885        customer_segmentation: Default::default(),
886        relationship_strength: Default::default(),
887        cross_process_links: Default::default(),
888        organizational_events: Default::default(),
889        behavioral_drift: Default::default(),
890        market_drift: Default::default(),
891        drift_labeling: Default::default(),
892        anomaly_injection: Default::default(),
893        industry_specific: Default::default(),
894        fingerprint_privacy: Default::default(),
895        quality_gates: Default::default(),
896        compliance: Default::default(),
897        webhooks: Default::default(),
898        llm: Default::default(),
899        diffusion: Default::default(),
900        causal: Default::default(),
901        source_to_pay: Default::default(),
902        financial_reporting: Default::default(),
903        hr: Default::default(),
904        manufacturing: Default::default(),
905        sales_quotes: Default::default(),
906        tax: Default::default(),
907        treasury: Default::default(),
908        project_accounting: Default::default(),
909        esg: Default::default(),
910        country_packs: None,
911        scenarios: Default::default(),
912        session: Default::default(),
913    }
914}
915
#[cfg(test)]
// Tests unwrap freely: a panic is exactly the failure signal we want here.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;

    // ==========================================================================
    // Shared helpers
    // ==========================================================================

    /// Build a service backed by the default generator configuration.
    fn default_service() -> SynthService {
        SynthService::new(default_generator_config())
    }

    /// Send a control command without a pattern name and return the
    /// unwrapped response body.
    async fn send_control(service: &SynthService, action: ControlAction) -> ControlResponse {
        let command = ControlCommand {
            action: action as i32,
            pattern_name: None,
        };
        service
            .control(Request::new(command))
            .await
            .unwrap()
            .into_inner()
    }

    /// Assert that `result` is an error whose status message contains
    /// `expected_fragment`.
    fn assert_rejected<T>(result: Result<T, Status>, expected_fragment: &str) {
        assert!(result.is_err());
        let status = result.err().unwrap();
        assert!(status.message().contains(expected_fragment));
    }

    // ==========================================================================
    // Service Creation Tests
    // ==========================================================================

    #[tokio::test]
    async fn test_service_creation() {
        let service = default_service();
        // A freshly created service reports a near-zero uptime.
        assert!(service.state.uptime_seconds() < 60);
    }

    #[tokio::test]
    async fn test_service_with_state() {
        let state = Arc::new(ServerState::new(default_generator_config()));
        let service = SynthService::with_state(Arc::clone(&state));

        // Writes through the original handle must be visible via the service.
        state.total_entries.store(100, Ordering::Relaxed);
        assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
    }

    // ==========================================================================
    // Health Check Tests
    // ==========================================================================

    #[tokio::test]
    async fn test_health_check() {
        let service = default_service();

        let health = service
            .health_check(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert!(health.healthy);
        assert!(!health.version.is_empty());
    }

    #[tokio::test]
    async fn test_health_check_returns_version() {
        let service = default_service();

        let health = service
            .health_check(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
    }

    // ==========================================================================
    // Configuration Tests
    // ==========================================================================

    #[tokio::test]
    async fn test_get_config() {
        let service = default_service();

        let config_response = service
            .get_config(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert!(config_response.success);
        assert!(config_response.current_config.is_some());
    }

    #[tokio::test]
    async fn test_get_config_returns_industry() {
        let service = default_service();

        let config_response = service
            .get_config(Request::new(()))
            .await
            .unwrap()
            .into_inner();
        let current = config_response.current_config.unwrap();

        assert_eq!(current.industry, "Manufacturing");
    }

    #[tokio::test]
    async fn test_set_config() {
        let service = default_service();

        let new_config = GenerationConfig {
            industry: "retail".to_string(),
            start_date: "2024-06-01".to_string(),
            period_months: 6,
            seed: 42,
            coa_complexity: "medium".to_string(),
            companies: vec![],
            fraud_enabled: true,
            fraud_rate: 0.05,
            generate_master_data: false,
            generate_document_flows: false,
        };
        let request = Request::new(ConfigRequest {
            config: Some(new_config),
        });

        let config_response = service.set_config(request).await.unwrap().into_inner();

        assert!(config_response.success);
    }

    #[tokio::test]
    async fn test_set_config_without_config_fails() {
        let service = default_service();
        let request = Request::new(ConfigRequest { config: None });

        let result = service.set_config(request).await;

        assert!(result.is_err());
    }

    // ==========================================================================
    // Metrics Tests
    // ==========================================================================

    #[tokio::test]
    async fn test_get_metrics_initial() {
        let service = default_service();

        let metrics = service
            .get_metrics(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(metrics.total_entries_generated, 0);
        assert_eq!(metrics.total_anomalies_injected, 0);
        assert_eq!(metrics.active_streams, 0);
    }

    #[tokio::test]
    async fn test_get_metrics_after_updates() {
        let service = default_service();

        // Simulate some activity directly on the shared counters.
        let state = &service.state;
        state.total_entries.store(1000, Ordering::Relaxed);
        state.total_anomalies.store(20, Ordering::Relaxed);
        state.active_streams.store(2, Ordering::Relaxed);

        let metrics = service
            .get_metrics(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(metrics.total_entries_generated, 1000);
        assert_eq!(metrics.total_anomalies_injected, 20);
        assert_eq!(metrics.active_streams, 2);
    }

    // ==========================================================================
    // Control Tests
    // ==========================================================================

    #[tokio::test]
    async fn test_control_pause() {
        let service = default_service();

        let control_response = send_control(&service, ControlAction::Pause).await;

        assert!(control_response.success);
        assert!(service.state.stream_paused.load(Ordering::Relaxed));
    }

    #[tokio::test]
    async fn test_control_resume() {
        let service = default_service();

        // Start from a paused state so that resume has something to undo.
        service.state.stream_paused.store(true, Ordering::Relaxed);

        let control_response = send_control(&service, ControlAction::Resume).await;

        assert!(control_response.success);
        assert!(!service.state.stream_paused.load(Ordering::Relaxed));
    }

    #[tokio::test]
    async fn test_control_stop() {
        let service = default_service();

        let control_response = send_control(&service, ControlAction::Stop).await;

        assert!(control_response.success);
        assert!(service.state.stream_stopped.load(Ordering::Relaxed));
    }

    // ==========================================================================
    // ServerState Tests
    // ==========================================================================

    #[test]
    fn test_server_state_creation() {
        let state = ServerState::new(default_generator_config());

        assert_eq!(state.total_entries.load(Ordering::Relaxed), 0);
        assert_eq!(state.total_anomalies.load(Ordering::Relaxed), 0);
        assert_eq!(state.active_streams.load(Ordering::Relaxed), 0);
        assert!(!state.stream_paused.load(Ordering::Relaxed));
        assert!(!state.stream_stopped.load(Ordering::Relaxed));
    }

    #[test]
    fn test_server_state_uptime() {
        let state = ServerState::new(default_generator_config());

        // The state was created an instant ago, so uptime must be small.
        assert!(state.uptime_seconds() < 60);
    }

    // ==========================================================================
    // Proto Conversion Tests
    // ==========================================================================

    #[test]
    fn test_default_generator_config() {
        let config = default_generator_config();

        assert_eq!(config.global.industry, IndustrySector::Manufacturing);
        assert_eq!(config.global.period_months, 12);
        assert!(!config.companies.is_empty());
        assert_eq!(config.companies[0].code, "1000");
    }

    #[test]
    fn test_config_to_proto() {
        let proto = SynthService::config_to_proto(&default_generator_config());

        assert_eq!(proto.industry, "Manufacturing");
        assert_eq!(proto.period_months, 12);
        assert!(!proto.companies.is_empty());
    }

    #[tokio::test]
    async fn test_proto_to_config_with_none() {
        let config = default_generator_config();
        let service = SynthService::new(config.clone());

        let result = service.proto_to_config(None).await.unwrap();

        // Passing `None` must fall back to the service's current config.
        assert_eq!(result.global.industry, config.global.industry);
    }

    #[tokio::test]
    async fn test_proto_to_config_with_retail() {
        let service = default_service();

        let proto = GenerationConfig {
            industry: "retail".to_string(),
            start_date: "2024-01-01".to_string(),
            period_months: 6,
            seed: 0,
            coa_complexity: "large".to_string(),
            companies: vec![],
            fraud_enabled: false,
            fraud_rate: 0.0,
            generate_master_data: false,
            generate_document_flows: false,
        };

        let result = service.proto_to_config(Some(proto)).await.unwrap();

        assert_eq!(result.global.industry, IndustrySector::Retail);
        assert_eq!(result.chart_of_accounts.complexity, CoAComplexity::Large);
    }

    #[tokio::test]
    async fn test_proto_to_config_with_healthcare() {
        let service = default_service();

        let proto = GenerationConfig {
            industry: "healthcare".to_string(),
            start_date: "2024-01-01".to_string(),
            period_months: 12,
            seed: 42,
            coa_complexity: "small".to_string(),
            companies: vec![],
            fraud_enabled: true,
            fraud_rate: 0.1,
            generate_master_data: true,
            generate_document_flows: true,
        };

        let result = service.proto_to_config(Some(proto)).await.unwrap();

        assert_eq!(result.global.industry, IndustrySector::Healthcare);
        assert_eq!(result.global.seed, Some(42));
        assert!(result.fraud.enabled);
        assert!((result.fraud.fraud_rate - 0.1).abs() < 0.001);
    }

    #[tokio::test]
    async fn test_proto_to_config_with_companies() {
        let service = default_service();

        let parent = CompanyConfigProto {
            code: "1000".to_string(),
            name: "Parent Corp".to_string(),
            currency: "USD".to_string(),
            country: "US".to_string(),
            annual_transaction_volume: 100000,
            volume_weight: 1.0,
        };
        let subsidiary = CompanyConfigProto {
            code: "2000".to_string(),
            name: "EU Sub".to_string(),
            currency: "EUR".to_string(),
            country: "DE".to_string(),
            annual_transaction_volume: 50000,
            volume_weight: 0.5,
        };

        let proto = GenerationConfig {
            industry: "technology".to_string(),
            start_date: "2024-01-01".to_string(),
            period_months: 12,
            seed: 0,
            coa_complexity: "medium".to_string(),
            companies: vec![parent, subsidiary],
            fraud_enabled: false,
            fraud_rate: 0.0,
            generate_master_data: false,
            generate_document_flows: false,
        };

        let result = service.proto_to_config(Some(proto)).await.unwrap();

        assert_eq!(result.companies.len(), 2);
        assert_eq!(result.companies[0].code, "1000");
        assert_eq!(result.companies[1].currency, "EUR");
    }

    // ==========================================================================
    // Input Validation Tests
    // ==========================================================================

    #[tokio::test]
    async fn test_bulk_generate_entry_count_validation() {
        let service = default_service();

        let request = BulkGenerateRequest {
            entry_count: 2_000_000, // Exceeds MAX_ENTRY_COUNT
            include_master_data: false,
            inject_anomalies: false,
            output_format: 0,
            config: None,
        };

        let result = service.bulk_generate(Request::new(request)).await;
        assert_rejected(result, "exceeds maximum allowed value");
    }

    #[tokio::test]
    async fn test_stream_data_events_per_second_too_low() {
        let service = default_service();

        let request = StreamDataRequest {
            events_per_second: 0, // Below MIN_EVENTS_PER_SECOND
            max_events: 100,
            inject_anomalies: false,
            anomaly_rate: 0.0,
            config: None,
        };

        let result = service.stream_data(Request::new(request)).await;
        assert_rejected(result, "must be at least");
    }

    #[tokio::test]
    async fn test_stream_data_events_per_second_too_high() {
        let service = default_service();

        let request = StreamDataRequest {
            events_per_second: 20_000, // Exceeds MAX_EVENTS_PER_SECOND
            max_events: 100,
            inject_anomalies: false,
            anomaly_rate: 0.0,
            config: None,
        };

        let result = service.stream_data(Request::new(request)).await;
        assert_rejected(result, "exceeds maximum allowed value");
    }

    #[tokio::test]
    async fn test_stream_data_max_events_too_high() {
        let service = default_service();

        let request = StreamDataRequest {
            events_per_second: 100,
            max_events: 100_000_000, // Exceeds MAX_STREAM_EVENTS
            inject_anomalies: false,
            anomaly_rate: 0.0,
            config: None,
        };

        let result = service.stream_data(Request::new(request)).await;
        assert_rejected(result, "max_events");
    }

    #[tokio::test]
    async fn test_stream_data_valid_request() {
        let service = default_service();

        let request = StreamDataRequest {
            events_per_second: 10,
            max_events: 5,
            inject_anomalies: false,
            anomaly_rate: 0.0,
            config: None,
        };

        // The stream contents are not inspected here; the test only verifies
        // that a well-formed request is accepted.
        let result = service.stream_data(Request::new(request)).await;
        assert!(result.is_ok());
    }
}