mockforge_grpc/dynamic/
mod.rs

1//! Dynamic gRPC service discovery and registration
2//!
3//! This module provides functionality to dynamically discover and register
4//! gRPC services from proto files, making the gRPC mock system flexible
5//! for different applications and developers.
6
7pub mod http_bridge;
8pub mod proto_parser;
9pub mod service_generator;
10
11use crate::reflection::{MockReflectionProxy, ProxyConfig};
12use proto_parser::ProtoParser;
13use service_generator::DynamicGrpcService;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tonic::transport::Server;
17use tonic_reflection::server::Builder as ReflectionBuilder;
18use tracing::*;
19
20/// TLS configuration for gRPC server
21#[derive(Debug, Clone)]
22pub struct GrpcTlsConfig {
23    /// Path to the TLS certificate file (PEM format)
24    pub cert_path: String,
25    /// Path to the TLS private key file (PEM format)
26    pub key_path: String,
27    /// Optional path to CA certificate for client certificate verification (mTLS)
28    pub client_ca_path: Option<String>,
29}
30
31impl GrpcTlsConfig {
32    /// Create a new TLS configuration
33    pub fn new(cert_path: impl Into<String>, key_path: impl Into<String>) -> Self {
34        Self {
35            cert_path: cert_path.into(),
36            key_path: key_path.into(),
37            client_ca_path: None,
38        }
39    }
40
41    /// Create TLS configuration with mutual TLS (client certificate verification)
42    pub fn with_mtls(
43        cert_path: impl Into<String>,
44        key_path: impl Into<String>,
45        client_ca_path: impl Into<String>,
46    ) -> Self {
47        Self {
48            cert_path: cert_path.into(),
49            key_path: key_path.into(),
50            client_ca_path: Some(client_ca_path.into()),
51        }
52    }
53
54    /// Create TLS configuration from environment variables
55    ///
56    /// Uses:
57    /// - GRPC_TLS_CERT: Path to certificate file
58    /// - GRPC_TLS_KEY: Path to private key file
59    /// - GRPC_TLS_CLIENT_CA: Optional path to client CA for mTLS
60    pub fn from_env() -> Option<Self> {
61        let cert_path = std::env::var("GRPC_TLS_CERT").ok()?;
62        let key_path = std::env::var("GRPC_TLS_KEY").ok()?;
63        let client_ca_path = std::env::var("GRPC_TLS_CLIENT_CA").ok();
64
65        Some(Self {
66            cert_path,
67            key_path,
68            client_ca_path,
69        })
70    }
71}
72
73/// Configuration for dynamic gRPC service discovery
74#[derive(Debug, Clone)]
75pub struct DynamicGrpcConfig {
76    /// Directory containing proto files
77    pub proto_dir: String,
78    /// Whether to enable reflection
79    pub enable_reflection: bool,
80    /// Services to exclude from discovery
81    pub excluded_services: Vec<String>,
82    /// HTTP bridge configuration
83    pub http_bridge: Option<http_bridge::HttpBridgeConfig>,
84    /// TLS configuration (None for plaintext)
85    pub tls: Option<GrpcTlsConfig>,
86}
87
88impl Default for DynamicGrpcConfig {
89    fn default() -> Self {
90        Self {
91            proto_dir: "proto".to_string(),
92            enable_reflection: false,
93            excluded_services: Vec::new(),
94            http_bridge: Some(http_bridge::HttpBridgeConfig {
95                enabled: true,
96                ..Default::default()
97            }),
98            // Check for TLS configuration from environment
99            tls: GrpcTlsConfig::from_env(),
100        }
101    }
102}
103
104/// A registry of discovered gRPC services
105#[derive(Clone)]
106pub struct ServiceRegistry {
107    /// Map of service names to their implementations
108    services: HashMap<String, Arc<DynamicGrpcService>>,
109    /// Descriptor pool containing parsed proto definitions
110    descriptor_pool: prost_reflect::DescriptorPool,
111}
112
113impl Default for ServiceRegistry {
114    fn default() -> Self {
115        Self::new()
116    }
117}
118
119impl ServiceRegistry {
120    /// Get the descriptor pool
121    pub fn descriptor_pool(&self) -> &prost_reflect::DescriptorPool {
122        &self.descriptor_pool
123    }
124
125    /// Create a new service registry
126    pub fn new() -> Self {
127        Self {
128            services: HashMap::new(),
129            descriptor_pool: prost_reflect::DescriptorPool::new(),
130        }
131    }
132
133    /// Create a service registry with a descriptor pool
134    pub fn with_descriptor_pool(descriptor_pool: prost_reflect::DescriptorPool) -> Self {
135        Self {
136            services: HashMap::new(),
137            descriptor_pool,
138        }
139    }
140
141    /// Set the descriptor pool (useful when building registry incrementally)
142    pub fn set_descriptor_pool(&mut self, pool: prost_reflect::DescriptorPool) {
143        self.descriptor_pool = pool;
144    }
145
146    /// Register a service implementation
147    pub fn register(&mut self, name: String, service: DynamicGrpcService) {
148        self.services.insert(name, Arc::new(service));
149    }
150
151    /// Get a service by name
152    pub fn get(&self, name: &str) -> Option<&Arc<DynamicGrpcService>> {
153        self.services.get(name)
154    }
155
156    /// List all registered service names
157    pub fn service_names(&self) -> Vec<String> {
158        self.services.keys().cloned().collect()
159    }
160}
161
162/// Discover and register services from proto files
163pub async fn discover_services(
164    config: &DynamicGrpcConfig,
165) -> Result<ServiceRegistry, Box<dyn std::error::Error + Send + Sync>> {
166    use std::time::Instant;
167
168    let discovery_start = Instant::now();
169    info!("Discovering gRPC services from proto directory: {}", config.proto_dir);
170
171    // Parse proto files
172    let parse_start = Instant::now();
173    let mut parser = ProtoParser::new();
174    parser.parse_directory(&config.proto_dir).await?;
175    let parse_duration = parse_start.elapsed();
176    info!("Proto file parsing completed (took {:?})", parse_duration);
177
178    // Create registry with the descriptor pool from the parser
179    let registry_start = Instant::now();
180    let mut registry = ServiceRegistry::new();
181    // Extract services from parser and move descriptor pool
182    let services = parser.services().clone();
183    let descriptor_pool = parser.into_pool();
184
185    registry.set_descriptor_pool(descriptor_pool);
186    let registry_duration = registry_start.elapsed();
187    debug!("Registry creation completed (took {:?})", registry_duration);
188
189    // Create dynamic services from parsed proto definitions
190    let service_reg_start = Instant::now();
191    for (service_name, proto_service) in services {
192        // Skip excluded services
193        if config.excluded_services.contains(&service_name) {
194            info!("Skipping excluded service: {}", service_name);
195            continue;
196        }
197
198        // Create dynamic service
199        let dynamic_service = DynamicGrpcService::new(proto_service.clone(), None);
200        registry.register(service_name.clone(), dynamic_service);
201
202        debug!("Registered service: {}", service_name);
203    }
204    let service_reg_duration = service_reg_start.elapsed();
205    info!(
206        "Service registration completed for {} services (took {:?})",
207        registry.service_names().len(),
208        service_reg_duration
209    );
210
211    let total_discovery_duration = discovery_start.elapsed();
212    info!("Service discovery completed (total time: {:?})", total_discovery_duration);
213    Ok(registry)
214}
215
216/// Start a dynamic server with both gRPC and HTTP bridge support
217pub async fn start_dynamic_server(
218    port: u16,
219    config: DynamicGrpcConfig,
220    latency_profile: Option<mockforge_core::LatencyProfile>,
221) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
222    use std::path::Path;
223    use std::time::Instant;
224
225    // Check if proto directory exists before attempting to start
226    let proto_path = Path::new(&config.proto_dir);
227    if !proto_path.exists() {
228        info!(
229            "Proto directory '{}' does not exist. gRPC server will not start. \
230            This is normal when using only HTTP/OpenAPI mocking.",
231            config.proto_dir
232        );
233        // Return Ok to indicate graceful skip, not an error
234        return Ok(());
235    }
236
237    let startup_start = Instant::now();
238
239    #[cfg(feature = "data-faker")]
240    mockforge_data::provider::register_core_faker_provider();
241
242    let _latency_injector = latency_profile
243        .map(|profile| mockforge_core::latency::LatencyInjector::new(profile, Default::default()));
244
245    // Discover services
246    let registry = discover_services(&config).await?;
247    let registry_arc = Arc::new(registry);
248
249    // Use shared server utilities for consistent address creation
250    let addr = mockforge_core::wildcard_socket_addr(port);
251    info!(
252        "Dynamic server listening on {} with {} services",
253        addr,
254        registry_arc.service_names().len()
255    );
256
257    // Create proxy configuration
258    let proxy_config = ProxyConfig::default();
259
260    // Create mock reflection proxy
261    let reflection_start = Instant::now();
262    let mock_proxy = MockReflectionProxy::new(proxy_config, registry_arc.clone()).await?;
263    let reflection_duration = reflection_start.elapsed();
264    info!("gRPC reflection proxy created (took {:?})", reflection_duration);
265
266    let total_startup_duration = startup_start.elapsed();
267    info!("gRPC server startup completed (total time: {:?})", total_startup_duration);
268
269    // Start HTTP server (bridge) if enabled
270    // Currently, just start the gRPC server directly
271    // HTTP bridge functionality is disabled
272    start_grpc_only_server(port, &config, registry_arc.clone(), mock_proxy).await?;
273
274    // HTTP bridge is disabled, no server handle to wait for
275
276    Ok(())
277}
278
279/// Start a gRPC-only server (for backward compatibility)
280pub async fn start_dynamic_grpc_server(
281    port: u16,
282    config: DynamicGrpcConfig,
283    latency_profile: Option<mockforge_core::LatencyProfile>,
284) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
285    // Disable HTTP bridge
286    let mut grpc_only_config = config;
287    grpc_only_config.http_bridge = None;
288
289    start_dynamic_server(port, grpc_only_config, latency_profile).await
290}
291
292/// Start the gRPC-only server implementation
293async fn start_grpc_only_server(
294    port: u16,
295    config: &DynamicGrpcConfig,
296    registry_arc: Arc<ServiceRegistry>,
297    _mock_proxy: MockReflectionProxy,
298) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
299    use tonic::transport::{Certificate, Identity, ServerTlsConfig};
300
301    // Create server builder with optional TLS
302    let mut server_builder = if let Some(tls_config) = &config.tls {
303        info!("Configuring gRPC server with TLS");
304
305        // Read certificate and key files
306        let cert = tokio::fs::read(&tls_config.cert_path).await.map_err(|e| {
307            error!("Failed to read TLS certificate from {}: {}", tls_config.cert_path, e);
308            Box::<dyn std::error::Error + Send + Sync>::from(format!(
309                "Failed to read TLS certificate: {}",
310                e
311            ))
312        })?;
313
314        let key = tokio::fs::read(&tls_config.key_path).await.map_err(|e| {
315            error!("Failed to read TLS key from {}: {}", tls_config.key_path, e);
316            Box::<dyn std::error::Error + Send + Sync>::from(format!(
317                "Failed to read TLS key: {}",
318                e
319            ))
320        })?;
321
322        let identity = Identity::from_pem(cert, key);
323
324        let mut tls = ServerTlsConfig::new().identity(identity);
325
326        // Add client CA for mTLS if configured
327        if let Some(client_ca_path) = &tls_config.client_ca_path {
328            info!("Configuring mutual TLS (mTLS) with client certificate verification");
329            let client_ca = tokio::fs::read(client_ca_path).await.map_err(|e| {
330                error!("Failed to read client CA from {}: {}", client_ca_path, e);
331                Box::<dyn std::error::Error + Send + Sync>::from(format!(
332                    "Failed to read client CA: {}",
333                    e
334                ))
335            })?;
336            tls = tls.client_ca_root(Certificate::from_pem(client_ca));
337        }
338
339        Server::builder().tls_config(tls).map_err(|e| {
340            error!("Failed to configure TLS: {}", e);
341            Box::<dyn std::error::Error + Send + Sync>::from(format!(
342                "Failed to configure TLS: {}",
343                e
344            ))
345        })?
346    } else {
347        info!("gRPC server running in plaintext mode (no TLS configured)");
348        Server::builder()
349    };
350
351    // Start actual gRPC server on the specified port
352    info!(
353        "Starting gRPC server on {} with {} discovered services",
354        mockforge_core::wildcard_socket_addr(port),
355        registry_arc.service_names().len()
356    );
357
358    // Log discovered services
359    for service_name in registry_arc.service_names() {
360        info!("  - Service: {}", service_name);
361    }
362
363    // Create a basic gRPC server that at least starts successfully
364    // Full implementation would require generating actual service implementations
365    use std::net::SocketAddr;
366
367    let grpc_addr: SocketAddr = mockforge_core::wildcard_socket_addr(port);
368
369    info!("gRPC server listening on {} (basic implementation)", grpc_addr);
370    info!("Discovered services are logged but not yet fully implemented:");
371    for service_name in registry_arc.service_names() {
372        info!("  - {}", service_name);
373    }
374
375    // Create a basic gRPC server with the discovered services
376    use crate::generated::greeter_server::{Greeter, GreeterServer};
377    use crate::generated::{HelloReply, HelloRequest};
378    use tonic::{Request, Response, Status};
379
380    // Basic implementation of the Greeter service
381    #[derive(Debug, Default)]
382    pub struct MockGreeterService;
383
384    use futures::StreamExt;
385    use std::pin::Pin;
386    use tokio_stream::wrappers::ReceiverStream;
387
388    #[tonic::async_trait]
389    impl Greeter for MockGreeterService {
390        type SayHelloStreamStream =
391            Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
392        type ChatStream = Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
393
394        async fn say_hello(
395            &self,
396            request: Request<HelloRequest>,
397        ) -> Result<Response<HelloReply>, Status> {
398            info!("gRPC say_hello request: {:?}", request);
399
400            let req = request.into_inner();
401            let reply = HelloReply {
402                message: format!("Hello {}! This is a mock response from MockForge", req.name),
403                metadata: None,
404                items: vec![],
405            };
406
407            Ok(Response::new(reply))
408        }
409
410        /// Server streaming: Returns multiple responses for a single request
411        async fn say_hello_stream(
412            &self,
413            request: Request<HelloRequest>,
414        ) -> Result<Response<Self::SayHelloStreamStream>, Status> {
415            info!("gRPC say_hello_stream request: {:?}", request);
416            let req = request.into_inner();
417            let name = req.name.clone();
418
419            // Create a channel to send responses
420            let (tx, rx) = tokio::sync::mpsc::channel(128);
421
422            // Spawn a task to send multiple responses
423            tokio::spawn(async move {
424                for i in 1..=5 {
425                    let reply = HelloReply {
426                        message: format!(
427                            "Hello {}! Stream message {} of 5 from MockForge",
428                            name, i
429                        ),
430                        metadata: None,
431                        items: vec![],
432                    };
433
434                    if tx.send(Ok(reply)).await.is_err() {
435                        // Client disconnected
436                        break;
437                    }
438
439                    // Small delay between messages to simulate streaming
440                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
441                }
442            });
443
444            let stream = ReceiverStream::new(rx);
445            Ok(Response::new(Box::pin(stream) as Self::SayHelloStreamStream))
446        }
447
448        /// Client streaming: Collects multiple requests and returns a single response
449        async fn say_hello_client_stream(
450            &self,
451            request: Request<tonic::Streaming<HelloRequest>>,
452        ) -> Result<Response<HelloReply>, Status> {
453            info!("gRPC say_hello_client_stream started");
454
455            let mut stream = request.into_inner();
456            let mut names = Vec::new();
457            let mut count = 0;
458
459            // Collect all incoming requests
460            while let Some(req) = stream.next().await {
461                match req {
462                    Ok(hello_request) => {
463                        info!("Received client stream message: {:?}", hello_request);
464                        names.push(hello_request.name);
465                        count += 1;
466                    }
467                    Err(e) => {
468                        error!("Error receiving client stream message: {}", e);
469                        return Err(Status::internal(format!("Stream error: {}", e)));
470                    }
471                }
472            }
473
474            // Create aggregated response
475            let message = if names.is_empty() {
476                "Hello! No names received in the stream.".to_string()
477            } else {
478                format!(
479                    "Hello {}! Received {} messages from MockForge client stream.",
480                    names.join(", "),
481                    count
482                )
483            };
484
485            let reply = HelloReply {
486                message,
487                metadata: None,
488                items: vec![],
489            };
490
491            Ok(Response::new(reply))
492        }
493
494        /// Bidirectional streaming: Echo back responses for each request
495        async fn chat(
496            &self,
497            request: Request<tonic::Streaming<HelloRequest>>,
498        ) -> Result<Response<Self::ChatStream>, Status> {
499            info!("gRPC chat (bidirectional streaming) started");
500
501            let mut stream = request.into_inner();
502            let (tx, rx) = tokio::sync::mpsc::channel(128);
503
504            // Spawn a task to process incoming messages and send responses
505            tokio::spawn(async move {
506                let mut message_count = 0;
507
508                while let Some(req) = stream.next().await {
509                    match req {
510                        Ok(hello_request) => {
511                            message_count += 1;
512                            info!("Chat received: {:?}", hello_request);
513
514                            let reply = HelloReply {
515                                message: format!(
516                                    "Chat response {}: Hello {}! from MockForge",
517                                    message_count, hello_request.name
518                                ),
519                                metadata: None,
520                                items: vec![],
521                            };
522
523                            if tx.send(Ok(reply)).await.is_err() {
524                                // Client disconnected
525                                break;
526                            }
527                        }
528                        Err(e) => {
529                            error!("Chat stream error: {}", e);
530                            let _ = tx
531                                .send(Err(Status::internal(format!("Stream error: {}", e))))
532                                .await;
533                            break;
534                        }
535                    }
536                }
537
538                info!("Chat session ended after {} messages", message_count);
539            });
540
541            let output_stream = ReceiverStream::new(rx);
542            Ok(Response::new(Box::pin(output_stream) as Self::ChatStream))
543        }
544    }
545
546    let greeter = MockGreeterService;
547
548    info!("gRPC server listening on {} with Greeter service", grpc_addr);
549
550    // Build the server with services
551    let mut router = server_builder.add_service(GreeterServer::new(greeter));
552
553    // Add reflection service if enabled
554    if config.enable_reflection {
555        // Build reflection service from the descriptor pool
556        let encoded_fd_set = registry_arc.descriptor_pool().encode_to_vec();
557        let reflection_service = ReflectionBuilder::configure()
558            .register_encoded_file_descriptor_set(&encoded_fd_set)
559            .build_v1()
560            .map_err(|e| {
561                error!("Failed to build reflection service: {}", e);
562                Box::<dyn std::error::Error + Send + Sync>::from(format!(
563                    "Failed to build reflection service: {}",
564                    e
565                ))
566            })?;
567
568        router = router.add_service(reflection_service);
569        info!("gRPC reflection service enabled");
570    }
571
572    router.serve(grpc_addr).await?;
573
574    Ok(())
575}
576
577// start_combined_server removed - was a stub that was never implemented
578
579#[cfg(test)]
580mod tests {
581
582    #[test]
583    fn test_module_compiles() {}
584}