Skip to main content

datasynth_server/grpc/
service.rs

1//! gRPC service implementation.
2
3use std::pin::Pin;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use chrono::Utc;
9use futures::Stream;
10use prost_types::Timestamp;
11use tokio::sync::{mpsc, RwLock};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{Request, Response, Status};
14use tracing::{error, info, warn};
15
16use datasynth_config::schema::{
17    ChartOfAccountsConfig, CompanyConfig, GeneratorConfig, GlobalConfig, OutputConfig,
18    TransactionVolume,
19};
20use datasynth_core::models::{CoAComplexity, IndustrySector, JournalEntry};
21use datasynth_core::{DegradationLevel, ResourceGuard, ResourceGuardBuilder};
22use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
23
24use super::synth::*;
25
/// Server state for tracking metrics and configuration.
///
/// One instance is shared (behind an `Arc`) by the gRPC service and the stream tasks it
/// spawns. Every field that changes after construction is either an atomic or wrapped in
/// a `tokio::sync::RwLock`, so it is only ever mutated through `&self`.
pub struct ServerState {
    /// Current configuration. Used as the generation config when a request carries none,
    /// returned by `get_config`, and replaced by `set_config`.
    pub config: RwLock<GeneratorConfig>,
    /// Configuration source for reloading. Initialised to `ConfigSource::Default` by both
    /// constructors.
    pub config_source: RwLock<crate::config_loader::ConfigSource>,
    /// Server start time; `uptime_seconds` measures from this instant.
    start_time: Instant,
    /// Total entries generated (bulk requests and streams both add to this counter).
    pub total_entries: AtomicU64,
    /// Total anomalies injected (bulk generation adds the number of anomaly labels).
    pub total_anomalies: AtomicU64,
    /// Active streams count. Incremented when a stream starts, decremented when its
    /// background task ends, and compared against `max_concurrent_generations` in
    /// `check_resources`.
    pub active_streams: AtomicU64,
    /// Total stream events sent across all streams.
    pub total_stream_events: AtomicU64,
    /// Stream control flag: set by the `Pause` control action, cleared by `Resume` and
    /// whenever a new stream starts. It is a single flag shared by every stream.
    pub stream_paused: AtomicBool,
    /// Stream stop flag: set by the `Stop` control action and cleared whenever a new
    /// stream starts. It is a single flag shared by every stream.
    pub stream_stopped: AtomicBool,
    /// Stream events per second (0 = unlimited)
    // NOTE(review): not read by the streaming code visible in this part of the file;
    // `stream_data` takes its rate from the request - confirm where this is consumed.
    pub stream_events_per_second: AtomicU64,
    /// Stream maximum events (0 = unlimited)
    // NOTE(review): same as above - the visible streaming code uses the request value.
    pub stream_max_events: AtomicU64,
    /// Stream anomaly injection flag
    // NOTE(review): same as above - the visible streaming code uses the request value.
    pub stream_inject_anomalies: AtomicBool,
    /// Triggered pattern name (if any) - will be applied to next generated entries.
    /// Written by the `TriggerPattern` control action.
    // NOTE(review): no reader of this field is visible in this part of the file.
    pub triggered_pattern: RwLock<Option<String>>,
    /// Resource guard for memory and disk monitoring
    pub resource_guard: Arc<ResourceGuard>,
    /// Maximum concurrent generations allowed (both constructors set this to 4).
    max_concurrent_generations: AtomicU64,
}
59
60impl ServerState {
61    pub fn new(config: GeneratorConfig) -> Self {
62        // Build resource guard from config
63        let memory_limit = config.global.memory_limit_mb;
64        let resource_guard = if memory_limit > 0 {
65            ResourceGuardBuilder::new()
66                .memory_limit(memory_limit)
67                .conservative()
68                .build()
69        } else {
70            // Default: 2GB limit for server
71            ResourceGuardBuilder::new()
72                .memory_limit(2048)
73                .conservative()
74                .build()
75        };
76
77        Self {
78            config: RwLock::new(config),
79            config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
80            start_time: Instant::now(),
81            total_entries: AtomicU64::new(0),
82            total_anomalies: AtomicU64::new(0),
83            active_streams: AtomicU64::new(0),
84            total_stream_events: AtomicU64::new(0),
85            stream_paused: AtomicBool::new(false),
86            stream_stopped: AtomicBool::new(false),
87            stream_events_per_second: AtomicU64::new(0),
88            stream_max_events: AtomicU64::new(0),
89            stream_inject_anomalies: AtomicBool::new(false),
90            triggered_pattern: RwLock::new(None),
91            resource_guard: Arc::new(resource_guard),
92            max_concurrent_generations: AtomicU64::new(4),
93        }
94    }
95
96    /// Create with custom resource limits.
97    pub fn with_resource_limits(config: GeneratorConfig, memory_limit_mb: usize) -> Self {
98        let resource_guard = ResourceGuardBuilder::new()
99            .memory_limit(memory_limit_mb)
100            .conservative()
101            .build();
102
103        Self {
104            config: RwLock::new(config),
105            config_source: RwLock::new(crate::config_loader::ConfigSource::Default),
106            start_time: Instant::now(),
107            total_entries: AtomicU64::new(0),
108            total_anomalies: AtomicU64::new(0),
109            active_streams: AtomicU64::new(0),
110            total_stream_events: AtomicU64::new(0),
111            stream_paused: AtomicBool::new(false),
112            stream_stopped: AtomicBool::new(false),
113            stream_events_per_second: AtomicU64::new(0),
114            stream_max_events: AtomicU64::new(0),
115            stream_inject_anomalies: AtomicBool::new(false),
116            triggered_pattern: RwLock::new(None),
117            resource_guard: Arc::new(resource_guard),
118            max_concurrent_generations: AtomicU64::new(4),
119        }
120    }
121
122    pub fn uptime_seconds(&self) -> u64 {
123        self.start_time.elapsed().as_secs()
124    }
125
126    /// Check if resources are available for a new generation.
127    #[allow(clippy::result_large_err)] // tonic::Status is the idiomatic error type for gRPC
128    pub fn check_resources(&self) -> Result<DegradationLevel, Status> {
129        // Check if too many concurrent generations
130        let active = self.active_streams.load(Ordering::Relaxed);
131        let max = self.max_concurrent_generations.load(Ordering::Relaxed);
132        if active >= max {
133            return Err(Status::resource_exhausted(format!(
134                "Too many concurrent generations ({active}/{max}). Try again later."
135            )));
136        }
137
138        // Check memory and other resources
139        match self.resource_guard.check() {
140            Ok(level) => {
141                if level == DegradationLevel::Emergency {
142                    Err(Status::resource_exhausted(
143                        "Server resources critically low. Generation not possible.",
144                    ))
145                } else if level == DegradationLevel::Minimal {
146                    warn!("Resources constrained, generation may be limited");
147                    Ok(level)
148                } else {
149                    Ok(level)
150                }
151            }
152            Err(e) => Err(Status::resource_exhausted(format!(
153                "Resource check failed: {e}"
154            ))),
155        }
156    }
157
158    /// Get current resource status for monitoring.
159    pub fn resource_status(&self) -> ResourceStatus {
160        let stats = self.resource_guard.stats();
161        ResourceStatus {
162            memory_usage_mb: stats.memory.resident_bytes / (1024 * 1024),
163            memory_peak_mb: stats.memory.peak_resident_bytes / (1024 * 1024),
164            disk_available_mb: stats.disk.available_bytes / (1024 * 1024),
165            degradation_level: stats.degradation_level.name().to_string(),
166            active_generations: self.active_streams.load(Ordering::Relaxed),
167        }
168    }
169}
170
/// Resource status for monitoring endpoints.
///
/// A plain-data snapshot produced by `ServerState::resource_status`.
#[derive(Debug, Clone)]
pub struct ResourceStatus {
    /// Resident memory in MB (guard's resident byte count divided by 1024 * 1024).
    pub memory_usage_mb: u64,
    /// Peak resident memory in MB (guard's peak resident byte count divided by 1024 * 1024).
    pub memory_peak_mb: u64,
    /// Available disk space in MB (guard's available byte count divided by 1024 * 1024).
    pub disk_available_mb: u64,
    /// Name of the current degradation level, as reported by the resource guard.
    pub degradation_level: String,
    /// Value of the server's active-streams counter at snapshot time.
    pub active_generations: u64,
}
180
/// Main gRPC service implementation.
///
/// Holds only a shared handle to [`ServerState`]; all configuration, metrics, and stream
/// control flags live there.
pub struct SynthService {
    /// Shared server state; stream tasks receive a clone of this handle.
    pub state: Arc<ServerState>,
}
185
186impl SynthService {
187    pub fn new(config: GeneratorConfig) -> Self {
188        Self {
189            state: Arc::new(ServerState::new(config)),
190        }
191    }
192
193    pub fn with_state(state: Arc<ServerState>) -> Self {
194        Self { state }
195    }
196
197    /// Convert a GenerationConfig proto to GeneratorConfig.
198    async fn proto_to_config(
199        &self,
200        proto: Option<GenerationConfig>,
201    ) -> Result<GeneratorConfig, Status> {
202        match proto {
203            Some(p) => {
204                let industry = match p.industry.to_lowercase().as_str() {
205                    "manufacturing" => IndustrySector::Manufacturing,
206                    "retail" => IndustrySector::Retail,
207                    "financial_services" | "financial" => IndustrySector::FinancialServices,
208                    "healthcare" => IndustrySector::Healthcare,
209                    "technology" => IndustrySector::Technology,
210                    "" => IndustrySector::Manufacturing, // empty defaults to manufacturing
211                    other => {
212                        return Err(Status::invalid_argument(format!(
213                            "Unknown industry '{other}'. Valid values: manufacturing, retail, financial_services, healthcare, technology"
214                        )));
215                    }
216                };
217
218                let complexity = match p.coa_complexity.to_lowercase().as_str() {
219                    "small" => CoAComplexity::Small,
220                    "medium" => CoAComplexity::Medium,
221                    "large" => CoAComplexity::Large,
222                    "" => CoAComplexity::Small, // empty defaults to small
223                    other => {
224                        return Err(Status::invalid_argument(format!(
225                            "Unknown coa_complexity '{other}'. Valid values: small, medium, large"
226                        )));
227                    }
228                };
229
230                let companies: Vec<CompanyConfig> = if p.companies.is_empty() {
231                    vec![CompanyConfig {
232                        code: "1000".to_string(),
233                        name: "Default Company".to_string(),
234                        currency: "USD".to_string(),
235                        functional_currency: None,
236                        country: "US".to_string(),
237                        annual_transaction_volume: TransactionVolume::TenK,
238                        volume_weight: 1.0,
239                        fiscal_year_variant: "K4".to_string(),
240                    }]
241                } else {
242                    p.companies
243                        .into_iter()
244                        .map(|c| CompanyConfig {
245                            code: c.code,
246                            name: c.name,
247                            currency: c.currency,
248                            functional_currency: None,
249                            country: c.country,
250                            annual_transaction_volume: TransactionVolume::Custom(
251                                c.annual_transaction_volume,
252                            ),
253                            volume_weight: c.volume_weight as f64,
254                            fiscal_year_variant: "K4".to_string(),
255                        })
256                        .collect()
257                };
258
259                let mut config = GeneratorConfig {
260                    global: GlobalConfig {
261                        seed: if p.seed > 0 { Some(p.seed) } else { None },
262                        industry,
263                        start_date: if p.start_date.is_empty() {
264                            "2024-01-01".to_string()
265                        } else {
266                            p.start_date
267                        },
268                        period_months: if p.period_months == 0 {
269                            12
270                        } else {
271                            p.period_months
272                        },
273                        group_currency: "USD".to_string(),
274                        presentation_currency: None,
275                        parallel: true,
276                        worker_threads: 0,
277                        memory_limit_mb: 0,
278                        fiscal_year_months: None,
279                    },
280                    companies,
281                    chart_of_accounts: ChartOfAccountsConfig {
282                        complexity,
283                        industry_specific: true,
284                        custom_accounts: None,
285                        min_hierarchy_depth: 2,
286                        max_hierarchy_depth: 5,
287                        expand_industry_subaccounts: false,
288                    },
289                    ..default_generator_config()
290                };
291
292                // Enable fraud if requested
293                if p.fraud_enabled {
294                    config.fraud.enabled = true;
295                    config.fraud.fraud_rate = p.fraud_rate as f64;
296                }
297
298                Ok(config)
299            }
300            None => {
301                // Use current server config
302                let config = self.state.config.read().await;
303                Ok(config.clone())
304            }
305        }
306    }
307
308    /// Convert a JournalEntry to proto format.
309    fn journal_entry_to_proto(entry: &JournalEntry) -> JournalEntryProto {
310        JournalEntryProto {
311            document_id: entry.header.document_id.to_string(),
312            company_code: entry.header.company_code.clone(),
313            fiscal_year: entry.header.fiscal_year as u32,
314            fiscal_period: entry.header.fiscal_period as u32,
315            posting_date: entry.header.posting_date.to_string(),
316            document_date: entry.header.document_date.to_string(),
317            created_at: entry.header.created_at.to_rfc3339(),
318            source: format!("{:?}", entry.header.source),
319            business_process: entry.header.business_process.map(|bp| format!("{bp:?}")),
320            lines: entry
321                .lines
322                .iter()
323                .map(|line| {
324                    let amount = if line.is_debit() {
325                        line.debit_amount
326                    } else {
327                        line.credit_amount
328                    };
329                    JournalLineProto {
330                        line_number: line.line_number,
331                        account_number: line.gl_account.clone(),
332                        account_name: line.account_description.clone().unwrap_or_default(),
333                        amount: amount.to_string(),
334                        is_debit: line.is_debit(),
335                        cost_center: line.cost_center.clone(),
336                        profit_center: line.profit_center.clone(),
337                        // vendor_id/customer_id/material_id are not on JournalEntryLine;
338                        // populate from auxiliary_account_number when available (French GAAP)
339                        vendor_id: line.auxiliary_account_number.clone(),
340                        customer_id: None,
341                        material_id: None,
342                        text: line.line_text.clone().or_else(|| line.text.clone()),
343                    }
344                })
345                .collect(),
346            is_anomaly: entry.header.is_fraud,
347            anomaly_type: entry.header.fraud_type.map(|ft| format!("{ft:?}")),
348        }
349    }
350
351    /// Convert current config to proto format.
352    fn config_to_proto(config: &GeneratorConfig) -> GenerationConfig {
353        GenerationConfig {
354            industry: format!("{:?}", config.global.industry),
355            start_date: config.global.start_date.clone(),
356            period_months: config.global.period_months,
357            seed: config.global.seed.unwrap_or(0),
358            coa_complexity: format!("{:?}", config.chart_of_accounts.complexity),
359            companies: config
360                .companies
361                .iter()
362                .map(|c| CompanyConfigProto {
363                    code: c.code.clone(),
364                    name: c.name.clone(),
365                    currency: c.currency.clone(),
366                    country: c.country.clone(),
367                    annual_transaction_volume: c.annual_transaction_volume.count(),
368                    volume_weight: c.volume_weight as f32,
369                })
370                .collect(),
371            fraud_enabled: config.fraud.enabled,
372            fraud_rate: config.fraud.fraud_rate as f32,
373            generate_master_data: config.master_data.vendors.count > 0
374                || config.master_data.customers.count > 0
375                || config.master_data.materials.count > 0,
376            generate_document_flows: config.document_flows.p2p.enabled
377                || config.document_flows.o2c.enabled,
378        }
379    }
380}
381
382#[tonic::async_trait]
383impl synthetic_data_service_server::SyntheticDataService for SynthService {
384    /// Bulk generation - generates all data at once and returns.
385    async fn bulk_generate(
386        &self,
387        request: Request<BulkGenerateRequest>,
388    ) -> Result<Response<BulkGenerateResponse>, Status> {
389        let req = request.into_inner();
390
391        // Validate entry_count bounds
392        const MAX_ENTRY_COUNT: u64 = 1_000_000;
393        if req.entry_count > MAX_ENTRY_COUNT {
394            return Err(Status::invalid_argument(format!(
395                "entry_count ({}) exceeds maximum allowed value ({})",
396                req.entry_count, MAX_ENTRY_COUNT
397            )));
398        }
399
400        // Check resources before starting generation
401        let degradation_level = self.state.check_resources()?;
402        if degradation_level != DegradationLevel::Normal {
403            warn!(
404                "Starting bulk generation under resource pressure (level: {:?})",
405                degradation_level
406            );
407        }
408
409        info!("Bulk generate request: {} entries", req.entry_count);
410
411        let config = self.proto_to_config(req.config).await?;
412        let start_time = Instant::now();
413
414        // Create orchestrator with appropriate phase config
415        let phase_config = {
416            let mut pc = PhaseConfig::from_config(&config);
417            pc.generate_master_data = req.include_master_data;
418            pc.generate_document_flows = false;
419            pc.generate_journal_entries = true;
420            pc.inject_anomalies = req.inject_anomalies;
421            pc.show_progress = false;
422            pc
423        };
424
425        let mut orchestrator = EnhancedOrchestrator::new(config, phase_config)
426            .map_err(|e| Status::internal(format!("Failed to create orchestrator: {e}")))?;
427
428        let result = orchestrator
429            .generate()
430            .map_err(|e| Status::internal(format!("Generation failed: {e}")))?;
431
432        let duration_ms = start_time.elapsed().as_millis() as u64;
433
434        // Update metrics
435        let entries_count = result.journal_entries.len() as u64;
436        self.state
437            .total_entries
438            .fetch_add(entries_count, Ordering::Relaxed);
439
440        let anomaly_count = result.anomaly_labels.labels.len() as u64;
441        self.state
442            .total_anomalies
443            .fetch_add(anomaly_count, Ordering::Relaxed);
444
445        // Convert to proto
446        let journal_entries: Vec<JournalEntryProto> = result
447            .journal_entries
448            .iter()
449            .map(Self::journal_entry_to_proto)
450            .collect();
451
452        let anomaly_labels: Vec<AnomalyLabelProto> = result
453            .anomaly_labels
454            .labels
455            .iter()
456            .map(|a| AnomalyLabelProto {
457                anomaly_id: a.anomaly_id.clone(),
458                document_id: a.document_id.clone(),
459                anomaly_type: format!("{:?}", a.anomaly_type),
460                anomaly_category: a.document_type.clone(),
461                description: a.description.clone(),
462                severity_score: a.severity as f32,
463            })
464            .collect();
465
466        // Compute stats
467        let mut total_debit = rust_decimal::Decimal::ZERO;
468        let mut total_credit = rust_decimal::Decimal::ZERO;
469        let mut total_lines = 0u64;
470        let mut entries_by_company = std::collections::HashMap::new();
471        let mut entries_by_source = std::collections::HashMap::new();
472
473        for entry in &result.journal_entries {
474            *entries_by_company
475                .entry(entry.header.company_code.clone())
476                .or_insert(0u64) += 1;
477            *entries_by_source
478                .entry(format!("{:?}", entry.header.source))
479                .or_insert(0u64) += 1;
480
481            for line in &entry.lines {
482                total_lines += 1;
483                total_debit += line.debit_amount;
484                total_credit += line.credit_amount;
485            }
486        }
487
488        let stats = GenerationStats {
489            total_entries: entries_count,
490            total_lines,
491            total_debit_amount: total_debit.to_string(),
492            total_credit_amount: total_credit.to_string(),
493            anomaly_count,
494            entries_by_company,
495            entries_by_source,
496        };
497
498        info!(
499            "Bulk generation complete: {} entries in {}ms",
500            entries_count, duration_ms
501        );
502
503        Ok(Response::new(BulkGenerateResponse {
504            entries_generated: entries_count,
505            duration_ms,
506            journal_entries,
507            anomaly_labels,
508            stats: Some(stats),
509        }))
510    }
511
    /// Boxed, pinned stream of `DataEvent` results returned by `stream_data`.
    type StreamDataStream = Pin<Box<dyn Stream<Item = Result<DataEvent, Status>> + Send + 'static>>;
513
514    /// Streaming generation - continuously generates data events.
515    async fn stream_data(
516        &self,
517        request: Request<StreamDataRequest>,
518    ) -> Result<Response<Self::StreamDataStream>, Status> {
519        let req = request.into_inner();
520
521        // Validate events_per_second bounds
522        const MIN_EVENTS_PER_SECOND: u32 = 1;
523        const MAX_EVENTS_PER_SECOND: u32 = 10_000;
524        if req.events_per_second < MIN_EVENTS_PER_SECOND {
525            return Err(Status::invalid_argument(format!(
526                "events_per_second ({}) must be at least {}",
527                req.events_per_second, MIN_EVENTS_PER_SECOND
528            )));
529        }
530        if req.events_per_second > MAX_EVENTS_PER_SECOND {
531            return Err(Status::invalid_argument(format!(
532                "events_per_second ({}) exceeds maximum allowed value ({})",
533                req.events_per_second, MAX_EVENTS_PER_SECOND
534            )));
535        }
536
537        // Validate max_events if specified
538        const MAX_STREAM_EVENTS: u64 = 10_000_000;
539        if req.max_events > MAX_STREAM_EVENTS {
540            return Err(Status::invalid_argument(format!(
541                "max_events ({}) exceeds maximum allowed value ({})",
542                req.max_events, MAX_STREAM_EVENTS
543            )));
544        }
545
546        // Check resources before starting stream
547        let degradation_level = self.state.check_resources()?;
548        if degradation_level != DegradationLevel::Normal {
549            warn!(
550                "Starting stream under resource pressure (level: {:?})",
551                degradation_level
552            );
553        }
554
555        info!(
556            "Stream data request: {} events/sec, max {}",
557            req.events_per_second, req.max_events
558        );
559
560        let config = self.proto_to_config(req.config).await?;
561        let state = self.state.clone();
562
563        // Increment active streams
564        state.active_streams.fetch_add(1, Ordering::Relaxed);
565
566        // Reset control flags
567        state.stream_paused.store(false, Ordering::Relaxed);
568        state.stream_stopped.store(false, Ordering::Relaxed);
569
570        let (tx, rx) = mpsc::channel(100);
571
572        // Spawn background task to generate and stream data
573        let events_per_second = req.events_per_second;
574        let max_events = req.max_events;
575        let inject_anomalies = req.inject_anomalies;
576
577        tokio::spawn(async move {
578            let phase_config = {
579                let mut pc = PhaseConfig::from_config(&config);
580                pc.generate_master_data = false;
581                pc.generate_document_flows = false;
582                pc.generate_journal_entries = true;
583                pc.inject_anomalies = inject_anomalies;
584                pc.show_progress = false;
585                pc
586            };
587
588            let mut sequence = 0u64;
589            let delay = if events_per_second > 0 {
590                Duration::from_micros(1_000_000 / events_per_second as u64)
591            } else {
592                Duration::from_millis(1)
593            };
594
595            // Create orchestrator once outside the loop to avoid per-iteration overhead
596            let mut orchestrator =
597                match EnhancedOrchestrator::new(config.clone(), phase_config.clone()) {
598                    Ok(o) => o,
599                    Err(e) => {
600                        error!("Failed to create orchestrator: {}", e);
601                        return;
602                    }
603                };
604
605            loop {
606                // Check stop flag
607                if state.stream_stopped.load(Ordering::Relaxed) {
608                    info!("Stream stopped by control command");
609                    break;
610                }
611
612                // Check pause flag
613                while state.stream_paused.load(Ordering::Relaxed) {
614                    tokio::time::sleep(Duration::from_millis(100)).await;
615                    if state.stream_stopped.load(Ordering::Relaxed) {
616                        break;
617                    }
618                }
619
620                // Check max events
621                if max_events > 0 && sequence >= max_events {
622                    info!("Stream reached max events: {}", max_events);
623                    break;
624                }
625
626                // Generate a batch
627                let result = match orchestrator.generate() {
628                    Ok(r) => r,
629                    Err(e) => {
630                        error!("Generation failed: {}", e);
631                        break;
632                    }
633                };
634
635                // Stream each entry
636                for entry in result.journal_entries {
637                    sequence += 1;
638                    state.total_stream_events.fetch_add(1, Ordering::Relaxed);
639                    state.total_entries.fetch_add(1, Ordering::Relaxed);
640
641                    let timestamp = Timestamp {
642                        seconds: Utc::now().timestamp(),
643                        nanos: 0,
644                    };
645
646                    let event = DataEvent {
647                        sequence,
648                        timestamp: Some(timestamp),
649                        event: Some(data_event::Event::JournalEntry(
650                            SynthService::journal_entry_to_proto(&entry),
651                        )),
652                    };
653
654                    if tx.send(Ok(event)).await.is_err() {
655                        info!("Stream receiver dropped");
656                        break;
657                    }
658
659                    // Rate limiting
660                    tokio::time::sleep(delay).await;
661
662                    // Check max events
663                    if max_events > 0 && sequence >= max_events {
664                        break;
665                    }
666                }
667            }
668
669            // Decrement active streams
670            state.active_streams.fetch_sub(1, Ordering::Relaxed);
671        });
672
673        Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
674    }
675
676    /// Control commands for streaming.
677    async fn control(
678        &self,
679        request: Request<ControlCommand>,
680    ) -> Result<Response<ControlResponse>, Status> {
681        let cmd = request.into_inner();
682        let action = ControlAction::try_from(cmd.action).unwrap_or(ControlAction::Unspecified);
683
684        info!("Control command: {:?}", action);
685
686        let (success, message, status) = match action {
687            ControlAction::Pause => {
688                self.state.stream_paused.store(true, Ordering::Relaxed);
689                (true, "Stream paused".to_string(), StreamStatus::Paused)
690            }
691            ControlAction::Resume => {
692                self.state.stream_paused.store(false, Ordering::Relaxed);
693                (true, "Stream resumed".to_string(), StreamStatus::Running)
694            }
695            ControlAction::Stop => {
696                self.state.stream_stopped.store(true, Ordering::Relaxed);
697                (true, "Stream stopped".to_string(), StreamStatus::Stopped)
698            }
699            ControlAction::TriggerPattern => {
700                let pattern = cmd.pattern_name.unwrap_or_default();
701                if pattern.is_empty() {
702                    (
703                        false,
704                        "Pattern name is required for TriggerPattern action".to_string(),
705                        StreamStatus::Running,
706                    )
707                } else {
708                    // Valid patterns: year_end_spike, period_end_spike, holiday_cluster,
709                    // fraud_cluster, error_cluster, or any custom pattern name
710                    let valid_patterns = [
711                        "year_end_spike",
712                        "period_end_spike",
713                        "holiday_cluster",
714                        "fraud_cluster",
715                        "error_cluster",
716                        "uniform",
717                    ];
718                    let is_valid = valid_patterns.contains(&pattern.as_str())
719                        || pattern.starts_with("custom:");
720
721                    if is_valid {
722                        // Store the pattern for the stream generator to pick up
723                        if let Ok(mut triggered) = self.state.triggered_pattern.try_write() {
724                            *triggered = Some(pattern.clone());
725                        }
726                        info!("Pattern trigger activated: {}", pattern);
727                        (
728                            true,
729                            format!("Pattern '{pattern}' will be applied to upcoming entries"),
730                            StreamStatus::Running,
731                        )
732                    } else {
733                        (
734                            false,
735                            format!(
736                                "Unknown pattern '{pattern}'. Valid patterns: {valid_patterns:?}"
737                            ),
738                            StreamStatus::Running,
739                        )
740                    }
741                }
742            }
743            ControlAction::Unspecified => (
744                false,
745                "Unknown control action".to_string(),
746                StreamStatus::Unspecified,
747            ),
748        };
749
750        Ok(Response::new(ControlResponse {
751            success,
752            message,
753            current_status: status as i32,
754        }))
755    }
756
757    /// Get current configuration.
758    async fn get_config(&self, _request: Request<()>) -> Result<Response<ConfigResponse>, Status> {
759        let config = self.state.config.read().await;
760        let proto_config = Self::config_to_proto(&config);
761
762        Ok(Response::new(ConfigResponse {
763            success: true,
764            message: "Current configuration retrieved".to_string(),
765            current_config: Some(proto_config),
766        }))
767    }
768
769    /// Set configuration.
770    async fn set_config(
771        &self,
772        request: Request<ConfigRequest>,
773    ) -> Result<Response<ConfigResponse>, Status> {
774        let req = request.into_inner();
775
776        if let Some(proto_config) = req.config {
777            let new_config = self.proto_to_config(Some(proto_config)).await?;
778
779            let mut config = self.state.config.write().await;
780            *config = new_config.clone();
781
782            info!("Configuration updated");
783
784            Ok(Response::new(ConfigResponse {
785                success: true,
786                message: "Configuration updated".to_string(),
787                current_config: Some(Self::config_to_proto(&new_config)),
788            }))
789        } else {
790            Err(Status::invalid_argument("No configuration provided"))
791        }
792    }
793
794    /// Get server metrics.
795    async fn get_metrics(
796        &self,
797        _request: Request<()>,
798    ) -> Result<Response<MetricsResponse>, Status> {
799        let uptime = self.state.uptime_seconds();
800        let total_entries = self.state.total_entries.load(Ordering::Relaxed);
801
802        let entries_per_second = if uptime > 0 {
803            total_entries as f64 / uptime as f64
804        } else {
805            0.0
806        };
807
808        Ok(Response::new(MetricsResponse {
809            total_entries_generated: total_entries,
810            total_anomalies_injected: self.state.total_anomalies.load(Ordering::Relaxed),
811            uptime_seconds: uptime,
812            session_entries: total_entries,
813            session_entries_per_second: entries_per_second,
814            active_streams: self.state.active_streams.load(Ordering::Relaxed) as u32,
815            total_stream_events: self.state.total_stream_events.load(Ordering::Relaxed),
816        }))
817    }
818
819    /// Health check.
820    async fn health_check(
821        &self,
822        _request: Request<()>,
823    ) -> Result<Response<HealthResponse>, Status> {
824        Ok(Response::new(HealthResponse {
825            healthy: true,
826            version: env!("CARGO_PKG_VERSION").to_string(),
827            uptime_seconds: self.state.uptime_seconds(),
828        }))
829    }
830}
831
832/// Create a default GeneratorConfig.
833pub fn default_generator_config() -> GeneratorConfig {
834    GeneratorConfig {
835        global: GlobalConfig {
836            seed: None,
837            industry: IndustrySector::Manufacturing,
838            start_date: "2024-01-01".to_string(),
839            period_months: 12,
840            group_currency: "USD".to_string(),
841            presentation_currency: None,
842            parallel: true,
843            worker_threads: 0,
844            memory_limit_mb: 0,
845            fiscal_year_months: None,
846        },
847        companies: vec![CompanyConfig {
848            code: "1000".to_string(),
849            name: "Default Company".to_string(),
850            currency: "USD".to_string(),
851            functional_currency: None,
852            country: "US".to_string(),
853            annual_transaction_volume: TransactionVolume::TenK,
854            volume_weight: 1.0,
855            fiscal_year_variant: "K4".to_string(),
856        }],
857        chart_of_accounts: ChartOfAccountsConfig {
858            complexity: CoAComplexity::Small,
859            industry_specific: true,
860            custom_accounts: None,
861            min_hierarchy_depth: 2,
862            max_hierarchy_depth: 5,
863            expand_industry_subaccounts: false,
864        },
865        transactions: Default::default(),
866        output: OutputConfig::default(),
867        fraud: Default::default(),
868        internal_controls: Default::default(),
869        business_processes: Default::default(),
870        user_personas: Default::default(),
871        templates: Default::default(),
872        approval: Default::default(),
873        departments: Default::default(),
874        master_data: Default::default(),
875        document_flows: Default::default(),
876        intercompany: Default::default(),
877        balance: Default::default(),
878        ocpm: Default::default(),
879        audit: Default::default(),
880        banking: Default::default(),
881        data_quality: Default::default(),
882        scenario: Default::default(),
883        temporal: Default::default(),
884        graph_export: Default::default(),
885        streaming: Default::default(),
886        rate_limit: Default::default(),
887        temporal_attributes: Default::default(),
888        relationships: Default::default(),
889        accounting_standards: Default::default(),
890        audit_standards: Default::default(),
891        distributions: Default::default(),
892        temporal_patterns: Default::default(),
893        vendor_network: Default::default(),
894        customer_segmentation: Default::default(),
895        relationship_strength: Default::default(),
896        cross_process_links: Default::default(),
897        organizational_events: Default::default(),
898        behavioral_drift: Default::default(),
899        market_drift: Default::default(),
900        drift_labeling: Default::default(),
901        anomaly_injection: Default::default(),
902        industry_specific: Default::default(),
903        fingerprint_privacy: Default::default(),
904        quality_gates: Default::default(),
905        compliance: Default::default(),
906        webhooks: Default::default(),
907        llm: Default::default(),
908        diffusion: Default::default(),
909        causal: Default::default(),
910        source_to_pay: Default::default(),
911        financial_reporting: Default::default(),
912        hr: Default::default(),
913        manufacturing: Default::default(),
914        sales_quotes: Default::default(),
915        tax: Default::default(),
916        treasury: Default::default(),
917        project_accounting: Default::default(),
918        esg: Default::default(),
919        country_packs: None,
920        scenarios: Default::default(),
921        session: Default::default(),
922        compliance_regulations: Default::default(),
923        analytics_metadata: Default::default(),
924        concentration: Default::default(),
925    }
926}
927
#[cfg(test)]
mod tests {
    use super::*;
    use crate::grpc::synth::synthetic_data_service_server::SyntheticDataService;

    // --------------------------------------------------------------------------
    // Shared fixtures
    // --------------------------------------------------------------------------

    /// Service backed by the default generator configuration.
    fn default_service() -> SynthService {
        SynthService::new(default_generator_config())
    }

    /// Baseline protobuf configuration; individual tests override the fields
    /// they care about with struct-update syntax.
    fn base_proto_config() -> GenerationConfig {
        GenerationConfig {
            industry: String::new(),
            start_date: "2024-01-01".to_string(),
            period_months: 12,
            seed: 0,
            coa_complexity: String::new(),
            companies: Vec::new(),
            fraud_enabled: false,
            fraud_rate: 0.0,
            generate_master_data: false,
            generate_document_flows: false,
        }
    }

    /// Baseline stream request without anomalies or a config override.
    fn base_stream_request() -> StreamDataRequest {
        StreamDataRequest {
            events_per_second: 100,
            max_events: 100,
            inject_anomalies: false,
            anomaly_rate: 0.0,
            config: None,
        }
    }

    /// Send a control command without a pattern name and unwrap the reply.
    async fn send_control(service: &SynthService, action: ControlAction) -> ControlResponse {
        let command = ControlCommand {
            action: action as i32,
            pattern_name: None,
        };
        service
            .control(Request::new(command))
            .await
            .unwrap()
            .into_inner()
    }

    /// Assert that a call was rejected and hand back the rejection status.
    fn rejection<T>(outcome: Result<T, Status>) -> Status {
        assert!(outcome.is_err());
        outcome.err().unwrap()
    }

    // --------------------------------------------------------------------------
    // Service creation
    // --------------------------------------------------------------------------

    #[tokio::test]
    async fn test_service_creation() {
        let service = default_service();
        // The test finishes quickly, so the uptime must still be tiny.
        assert!(service.state.uptime_seconds() < 60);
    }

    #[tokio::test]
    async fn test_service_with_state() {
        let state = Arc::new(ServerState::new(default_generator_config()));
        let service = SynthService::with_state(Arc::clone(&state));

        // A write through the original handle must be visible via the service.
        state.total_entries.store(100, Ordering::Relaxed);
        assert_eq!(service.state.total_entries.load(Ordering::Relaxed), 100);
    }

    // --------------------------------------------------------------------------
    // Health check
    // --------------------------------------------------------------------------

    #[tokio::test]
    async fn test_health_check() {
        let service = default_service();

        let health = service
            .health_check(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert!(health.healthy);
        assert!(!health.version.is_empty());
    }

    #[tokio::test]
    async fn test_health_check_returns_version() {
        let service = default_service();

        let health = service
            .health_check(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
    }

    // --------------------------------------------------------------------------
    // Configuration
    // --------------------------------------------------------------------------

    #[tokio::test]
    async fn test_get_config() {
        let service = default_service();

        let reply = service
            .get_config(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert!(reply.success);
        assert!(reply.current_config.is_some());
    }

    #[tokio::test]
    async fn test_get_config_returns_industry() {
        let service = default_service();

        let reply = service
            .get_config(Request::new(()))
            .await
            .unwrap()
            .into_inner();
        let current = reply.current_config.unwrap();

        assert_eq!(current.industry, "Manufacturing");
    }

    #[tokio::test]
    async fn test_set_config() {
        let service = default_service();

        let new_config = GenerationConfig {
            industry: "retail".to_string(),
            start_date: "2024-06-01".to_string(),
            period_months: 6,
            seed: 42,
            coa_complexity: "medium".to_string(),
            fraud_enabled: true,
            fraud_rate: 0.05,
            ..base_proto_config()
        };
        let request = Request::new(ConfigRequest {
            config: Some(new_config),
        });

        let reply = service.set_config(request).await.unwrap().into_inner();

        assert!(reply.success);
    }

    #[tokio::test]
    async fn test_set_config_without_config_fails() {
        let service = default_service();
        let request = Request::new(ConfigRequest { config: None });

        let outcome = service.set_config(request).await;

        assert!(outcome.is_err());
    }

    // --------------------------------------------------------------------------
    // Metrics
    // --------------------------------------------------------------------------

    #[tokio::test]
    async fn test_get_metrics_initial() {
        let service = default_service();

        let metrics = service
            .get_metrics(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(metrics.total_entries_generated, 0);
        assert_eq!(metrics.total_anomalies_injected, 0);
        assert_eq!(metrics.active_streams, 0);
    }

    #[tokio::test]
    async fn test_get_metrics_after_updates() {
        let service = default_service();

        // Pretend the server has already done some work.
        let state = &service.state;
        state.total_entries.store(1000, Ordering::Relaxed);
        state.total_anomalies.store(20, Ordering::Relaxed);
        state.active_streams.store(2, Ordering::Relaxed);

        let metrics = service
            .get_metrics(Request::new(()))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(metrics.total_entries_generated, 1000);
        assert_eq!(metrics.total_anomalies_injected, 20);
        assert_eq!(metrics.active_streams, 2);
    }

    // --------------------------------------------------------------------------
    // Stream control
    // --------------------------------------------------------------------------

    #[tokio::test]
    async fn test_control_pause() {
        let service = default_service();

        let reply = send_control(&service, ControlAction::Pause).await;

        assert!(reply.success);
        assert!(service.state.stream_paused.load(Ordering::Relaxed));
    }

    #[tokio::test]
    async fn test_control_resume() {
        let service = default_service();

        // Start from a paused stream.
        service.state.stream_paused.store(true, Ordering::Relaxed);

        let reply = send_control(&service, ControlAction::Resume).await;

        assert!(reply.success);
        assert!(!service.state.stream_paused.load(Ordering::Relaxed));
    }

    #[tokio::test]
    async fn test_control_stop() {
        let service = default_service();

        let reply = send_control(&service, ControlAction::Stop).await;

        assert!(reply.success);
        assert!(service.state.stream_stopped.load(Ordering::Relaxed));
    }

    // --------------------------------------------------------------------------
    // ServerState
    // --------------------------------------------------------------------------

    #[test]
    fn test_server_state_creation() {
        let state = ServerState::new(default_generator_config());

        for counter in [
            &state.total_entries,
            &state.total_anomalies,
            &state.active_streams,
        ] {
            assert_eq!(counter.load(Ordering::Relaxed), 0);
        }
        assert!(!state.stream_paused.load(Ordering::Relaxed));
        assert!(!state.stream_stopped.load(Ordering::Relaxed));
    }

    #[test]
    fn test_server_state_uptime() {
        let state = ServerState::new(default_generator_config());

        // Freshly created state: the uptime has to be small.
        assert!(state.uptime_seconds() < 60);
    }

    // --------------------------------------------------------------------------
    // Proto conversion
    // --------------------------------------------------------------------------

    #[test]
    fn test_default_generator_config() {
        let config = default_generator_config();

        assert_eq!(config.global.industry, IndustrySector::Manufacturing);
        assert_eq!(config.global.period_months, 12);
        assert!(!config.companies.is_empty());
        assert_eq!(config.companies[0].code, "1000");
    }

    #[test]
    fn test_config_to_proto() {
        let proto = SynthService::config_to_proto(&default_generator_config());

        assert_eq!(proto.industry, "Manufacturing");
        assert_eq!(proto.period_months, 12);
        assert!(!proto.companies.is_empty());
    }

    #[tokio::test]
    async fn test_proto_to_config_with_none() {
        let config = default_generator_config();
        let service = SynthService::new(config.clone());

        let converted = service.proto_to_config(None).await.unwrap();

        // Passing `None` must fall back to the service's current configuration.
        assert_eq!(converted.global.industry, config.global.industry);
    }

    #[tokio::test]
    async fn test_proto_to_config_with_retail() {
        let service = default_service();

        let proto = GenerationConfig {
            industry: "retail".to_string(),
            period_months: 6,
            coa_complexity: "large".to_string(),
            ..base_proto_config()
        };

        let converted = service.proto_to_config(Some(proto)).await.unwrap();

        assert_eq!(converted.global.industry, IndustrySector::Retail);
        assert_eq!(converted.chart_of_accounts.complexity, CoAComplexity::Large);
    }

    #[tokio::test]
    async fn test_proto_to_config_with_healthcare() {
        let service = default_service();

        let proto = GenerationConfig {
            industry: "healthcare".to_string(),
            seed: 42,
            coa_complexity: "small".to_string(),
            fraud_enabled: true,
            fraud_rate: 0.1,
            generate_master_data: true,
            generate_document_flows: true,
            ..base_proto_config()
        };

        let converted = service.proto_to_config(Some(proto)).await.unwrap();

        assert_eq!(converted.global.industry, IndustrySector::Healthcare);
        assert_eq!(converted.global.seed, Some(42));
        assert!(converted.fraud.enabled);
        assert!((converted.fraud.fraud_rate - 0.1).abs() < 0.001);
    }

    #[tokio::test]
    async fn test_proto_to_config_with_companies() {
        let service = default_service();

        let parent = CompanyConfigProto {
            code: "1000".to_string(),
            name: "Parent Corp".to_string(),
            currency: "USD".to_string(),
            country: "US".to_string(),
            annual_transaction_volume: 100000,
            volume_weight: 1.0,
        };
        let subsidiary = CompanyConfigProto {
            code: "2000".to_string(),
            name: "EU Sub".to_string(),
            currency: "EUR".to_string(),
            country: "DE".to_string(),
            annual_transaction_volume: 50000,
            volume_weight: 0.5,
        };
        let proto = GenerationConfig {
            industry: "technology".to_string(),
            coa_complexity: "medium".to_string(),
            companies: vec![parent, subsidiary],
            ..base_proto_config()
        };

        let converted = service.proto_to_config(Some(proto)).await.unwrap();

        assert_eq!(converted.companies.len(), 2);
        assert_eq!(converted.companies[0].code, "1000");
        assert_eq!(converted.companies[1].currency, "EUR");
    }

    // --------------------------------------------------------------------------
    // Input validation
    // --------------------------------------------------------------------------

    #[tokio::test]
    async fn test_bulk_generate_entry_count_validation() {
        let service = default_service();

        let request = BulkGenerateRequest {
            entry_count: 2_000_000, // above MAX_ENTRY_COUNT
            include_master_data: false,
            inject_anomalies: false,
            output_format: 0,
            config: None,
        };

        let status = rejection(service.bulk_generate(Request::new(request)).await);
        assert!(status.message().contains("exceeds maximum allowed value"));
    }

    #[tokio::test]
    async fn test_stream_data_events_per_second_too_low() {
        let service = default_service();

        let request = StreamDataRequest {
            events_per_second: 0, // below MIN_EVENTS_PER_SECOND
            ..base_stream_request()
        };

        let status = rejection(service.stream_data(Request::new(request)).await);
        assert!(status.message().contains("must be at least"));
    }

    #[tokio::test]
    async fn test_stream_data_events_per_second_too_high() {
        let service = default_service();

        let request = StreamDataRequest {
            events_per_second: 20_000, // above MAX_EVENTS_PER_SECOND
            ..base_stream_request()
        };

        let status = rejection(service.stream_data(Request::new(request)).await);
        assert!(status.message().contains("exceeds maximum allowed value"));
    }

    #[tokio::test]
    async fn test_stream_data_max_events_too_high() {
        let service = default_service();

        let request = StreamDataRequest {
            max_events: 100_000_000, // above MAX_STREAM_EVENTS
            ..base_stream_request()
        };

        let status = rejection(service.stream_data(Request::new(request)).await);
        assert!(status.message().contains("max_events"));
    }

    #[tokio::test]
    async fn test_stream_data_valid_request() {
        let service = default_service();

        let request = StreamDataRequest {
            events_per_second: 10,
            max_events: 5,
            ..base_stream_request()
        };

        // The stream contents are hard to inspect here; it is enough that the
        // request passes validation and a stream is handed back.
        let outcome = service.stream_data(Request::new(request)).await;
        assert!(outcome.is_ok());
    }
}