Skip to main content

mockforge_grpc/dynamic/
mod.rs

//! Dynamic gRPC service discovery and registration
//!
//! This module provides functionality to dynamically discover and register
//! gRPC services from proto files, making the gRPC mock system flexible
//! for different applications and developers.
6
7pub mod http_bridge;
8pub mod proto_parser;
9pub mod router;
10pub mod service_generator;
11
12use crate::reflection::{MockReflectionProxy, ProxyConfig};
13use proto_parser::ProtoParser;
14use service_generator::DynamicGrpcService;
15use std::collections::HashMap;
16use std::sync::Arc;
17use tonic::transport::Server;
18use tonic_reflection::server::Builder as ReflectionBuilder;
19use tracing::*;
20
/// TLS configuration for gRPC server
///
/// Only file paths are stored here; none of the constructors open or validate
/// the referenced files.
#[derive(Debug, Clone)]
pub struct GrpcTlsConfig {
    /// Path to the TLS certificate file (PEM format)
    pub cert_path: String,
    /// Path to the TLS private key file (PEM format)
    pub key_path: String,
    /// Optional path to CA certificate for client certificate verification (mTLS)
    pub client_ca_path: Option<String>,
}

impl GrpcTlsConfig {
    /// Create a new TLS configuration without client certificate verification
    pub fn new(cert_path: impl Into<String>, key_path: impl Into<String>) -> Self {
        Self {
            cert_path: cert_path.into(),
            key_path: key_path.into(),
            client_ca_path: None,
        }
    }

    /// Create TLS configuration with mutual TLS (client certificate verification)
    pub fn with_mtls(
        cert_path: impl Into<String>,
        key_path: impl Into<String>,
        client_ca_path: impl Into<String>,
    ) -> Self {
        Self {
            cert_path: cert_path.into(),
            key_path: key_path.into(),
            client_ca_path: Some(client_ca_path.into()),
        }
    }

    /// Create TLS configuration from environment variables
    ///
    /// Uses:
    /// - GRPC_TLS_CERT: Path to certificate file
    /// - GRPC_TLS_KEY: Path to private key file
    /// - GRPC_TLS_CLIENT_CA: Optional path to client CA for mTLS
    ///
    /// A variable that is unset, not valid Unicode, empty, or made up only of
    /// whitespace counts as "not configured". Returns `None` unless both the
    /// certificate and the key are configured; the client CA is optional.
    pub fn from_env() -> Option<Self> {
        let cert_path = Self::non_empty_env("GRPC_TLS_CERT")?;
        let key_path = Self::non_empty_env("GRPC_TLS_KEY")?;
        let client_ca_path = Self::non_empty_env("GRPC_TLS_CLIENT_CA");

        Some(Self {
            cert_path,
            key_path,
            client_ca_path,
        })
    }

    /// Read an environment variable, treating blank values like missing ones.
    ///
    /// An exported-but-empty variable (e.g. `GRPC_TLS_CERT=`) would otherwise be
    /// used as a file path. The value itself is returned unmodified.
    fn non_empty_env(name: &str) -> Option<String> {
        std::env::var(name).ok().filter(|value| !value.trim().is_empty())
    }
}
73
74/// Configuration for dynamic gRPC service discovery
75#[derive(Debug, Clone)]
76pub struct DynamicGrpcConfig {
77    /// Directory containing proto files
78    pub proto_dir: String,
79    /// Whether to enable reflection
80    pub enable_reflection: bool,
81    /// Services to exclude from discovery
82    pub excluded_services: Vec<String>,
83    /// HTTP bridge configuration
84    pub http_bridge: Option<http_bridge::HttpBridgeConfig>,
85    /// TLS configuration (None for plaintext)
86    pub tls: Option<GrpcTlsConfig>,
87}
88
89impl Default for DynamicGrpcConfig {
90    fn default() -> Self {
91        Self {
92            proto_dir: "proto".to_string(),
93            enable_reflection: false,
94            excluded_services: Vec::new(),
95            http_bridge: Some(http_bridge::HttpBridgeConfig {
96                enabled: true,
97                ..Default::default()
98            }),
99            // Check for TLS configuration from environment
100            tls: GrpcTlsConfig::from_env(),
101        }
102    }
103}
104
105/// A registry of discovered gRPC services
106#[derive(Clone)]
107pub struct ServiceRegistry {
108    /// Map of service names to their implementations
109    services: HashMap<String, Arc<DynamicGrpcService>>,
110    /// Descriptor pool containing parsed proto definitions
111    descriptor_pool: prost_reflect::DescriptorPool,
112}
113
114impl Default for ServiceRegistry {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
120impl ServiceRegistry {
121    /// Get the descriptor pool
122    pub fn descriptor_pool(&self) -> &prost_reflect::DescriptorPool {
123        &self.descriptor_pool
124    }
125
126    /// Create a new service registry
127    pub fn new() -> Self {
128        Self {
129            services: HashMap::new(),
130            descriptor_pool: prost_reflect::DescriptorPool::new(),
131        }
132    }
133
134    /// Create a service registry with a descriptor pool
135    pub fn with_descriptor_pool(descriptor_pool: prost_reflect::DescriptorPool) -> Self {
136        Self {
137            services: HashMap::new(),
138            descriptor_pool,
139        }
140    }
141
142    /// Set the descriptor pool (useful when building registry incrementally)
143    pub fn set_descriptor_pool(&mut self, pool: prost_reflect::DescriptorPool) {
144        self.descriptor_pool = pool;
145    }
146
147    /// Register a service implementation
148    pub fn register(&mut self, name: String, service: DynamicGrpcService) {
149        self.services.insert(name, Arc::new(service));
150    }
151
152    /// Get a service by name
153    pub fn get(&self, name: &str) -> Option<&Arc<DynamicGrpcService>> {
154        self.services.get(name)
155    }
156
157    /// List all registered service names
158    pub fn service_names(&self) -> Vec<String> {
159        self.services.keys().cloned().collect()
160    }
161}
162
163/// Discover and register services from proto files
164pub async fn discover_services(
165    config: &DynamicGrpcConfig,
166) -> Result<ServiceRegistry, Box<dyn std::error::Error + Send + Sync>> {
167    use std::time::Instant;
168
169    let discovery_start = Instant::now();
170    info!("Discovering gRPC services from proto directory: {}", config.proto_dir);
171
172    // Parse proto files
173    let parse_start = Instant::now();
174    let mut parser = ProtoParser::new();
175    parser.parse_directory(&config.proto_dir).await?;
176    let parse_duration = parse_start.elapsed();
177    info!("Proto file parsing completed (took {:?})", parse_duration);
178
179    // Create registry with the descriptor pool from the parser
180    let registry_start = Instant::now();
181    let mut registry = ServiceRegistry::new();
182    // Extract services from parser and move descriptor pool
183    let services = parser.services().clone();
184    let descriptor_pool = parser.into_pool();
185
186    registry.set_descriptor_pool(descriptor_pool);
187    let registry_duration = registry_start.elapsed();
188    debug!("Registry creation completed (took {:?})", registry_duration);
189
190    // Create dynamic services from parsed proto definitions
191    let service_reg_start = Instant::now();
192    for (service_name, proto_service) in services {
193        // Skip excluded services
194        if config.excluded_services.contains(&service_name) {
195            info!("Skipping excluded service: {}", service_name);
196            continue;
197        }
198
199        // Create dynamic service
200        let dynamic_service = DynamicGrpcService::new(proto_service.clone(), None);
201        registry.register(service_name.clone(), dynamic_service);
202
203        debug!("Registered service: {}", service_name);
204    }
205    let service_reg_duration = service_reg_start.elapsed();
206    info!(
207        "Service registration completed for {} services (took {:?})",
208        registry.service_names().len(),
209        service_reg_duration
210    );
211
212    let total_discovery_duration = discovery_start.elapsed();
213    info!("Service discovery completed (total time: {:?})", total_discovery_duration);
214    Ok(registry)
215}
216
217/// Start a dynamic server with both gRPC and HTTP bridge support
218pub async fn start_dynamic_server(
219    port: u16,
220    config: DynamicGrpcConfig,
221    latency_profile: Option<mockforge_foundation::latency::LatencyProfile>,
222) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
223    use std::time::Instant;
224
225    let startup_start = Instant::now();
226
227    #[cfg(feature = "data-faker")]
228    mockforge_data::provider::register_core_faker_provider();
229
230    let _latency_injector = latency_profile.map(|profile| {
231        mockforge_foundation::latency::LatencyInjector::new(profile, Default::default())
232    });
233
234    // Discover services
235    let registry = discover_services(&config).await?;
236    let registry_arc = Arc::new(registry);
237
238    // Use shared server utilities for consistent address creation
239    let addr = mockforge_core::wildcard_socket_addr(port);
240    info!(
241        "Dynamic server listening on {} with {} services",
242        addr,
243        registry_arc.service_names().len()
244    );
245
246    // Create proxy configuration
247    let proxy_config = ProxyConfig::default();
248
249    // Create mock reflection proxy
250    let reflection_start = Instant::now();
251    let mock_proxy = MockReflectionProxy::new(proxy_config, registry_arc.clone()).await?;
252    let reflection_duration = reflection_start.elapsed();
253    info!("gRPC reflection proxy created (took {:?})", reflection_duration);
254
255    let total_startup_duration = startup_start.elapsed();
256    info!("gRPC server startup completed (total time: {:?})", total_startup_duration);
257
258    // Start HTTP server (bridge) if enabled
259    // Currently, just start the gRPC server directly
260    // HTTP bridge functionality is disabled
261    start_grpc_only_server(port, &config, registry_arc.clone(), mock_proxy).await?;
262
263    // HTTP bridge is disabled, no server handle to wait for
264
265    Ok(())
266}
267
268/// Start a gRPC-only server (for backward compatibility)
269pub async fn start_dynamic_grpc_server(
270    port: u16,
271    config: DynamicGrpcConfig,
272    latency_profile: Option<mockforge_foundation::latency::LatencyProfile>,
273) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
274    // Disable HTTP bridge
275    let mut grpc_only_config = config;
276    grpc_only_config.http_bridge = None;
277
278    start_dynamic_server(port, grpc_only_config, latency_profile).await
279}
280
281/// Start the gRPC-only server implementation
282async fn start_grpc_only_server(
283    port: u16,
284    config: &DynamicGrpcConfig,
285    registry_arc: Arc<ServiceRegistry>,
286    _mock_proxy: MockReflectionProxy,
287) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
288    use tonic::transport::{Certificate, Identity, ServerTlsConfig};
289
290    // Create server builder with optional TLS
291    let mut server_builder = if let Some(tls_config) = &config.tls {
292        info!("Configuring gRPC server with TLS");
293
294        // Read certificate and key files
295        let cert = tokio::fs::read(&tls_config.cert_path).await.map_err(|e| {
296            error!("Failed to read TLS certificate from {}: {}", tls_config.cert_path, e);
297            Box::<dyn std::error::Error + Send + Sync>::from(format!(
298                "Failed to read TLS certificate: {}",
299                e
300            ))
301        })?;
302
303        let key = tokio::fs::read(&tls_config.key_path).await.map_err(|e| {
304            error!("Failed to read TLS key from {}: {}", tls_config.key_path, e);
305            Box::<dyn std::error::Error + Send + Sync>::from(format!(
306                "Failed to read TLS key: {}",
307                e
308            ))
309        })?;
310
311        let identity = Identity::from_pem(cert, key);
312
313        let mut tls = ServerTlsConfig::new().identity(identity);
314
315        // Add client CA for mTLS if configured
316        if let Some(client_ca_path) = &tls_config.client_ca_path {
317            info!("Configuring mutual TLS (mTLS) with client certificate verification");
318            let client_ca = tokio::fs::read(client_ca_path).await.map_err(|e| {
319                error!("Failed to read client CA from {}: {}", client_ca_path, e);
320                Box::<dyn std::error::Error + Send + Sync>::from(format!(
321                    "Failed to read client CA: {}",
322                    e
323                ))
324            })?;
325            tls = tls.client_ca_root(Certificate::from_pem(client_ca));
326        }
327
328        Server::builder().tls_config(tls).map_err(|e| {
329            error!("Failed to configure TLS: {}", e);
330            Box::<dyn std::error::Error + Send + Sync>::from(format!(
331                "Failed to configure TLS: {}",
332                e
333            ))
334        })?
335    } else {
336        info!("gRPC server running in plaintext mode (no TLS configured)");
337        Server::builder()
338    };
339
340    use std::net::SocketAddr;
341
342    let grpc_addr: SocketAddr = mockforge_core::wildcard_socket_addr(port);
343
344    // Log discovered services
345    info!(
346        "Starting gRPC server on {} with {} discovered services",
347        grpc_addr,
348        registry_arc.service_names().len()
349    );
350    for service_name in registry_arc.service_names() {
351        info!("  - Dynamic service: {}", service_name);
352    }
353
354    // Build the built-in Greeter service
355    use crate::generated::greeter_server::{Greeter, GreeterServer};
356    use crate::generated::{HelloReply, HelloRequest};
357    use tonic::{Request, Response, Status};
358
359    #[derive(Debug, Default)]
360    struct MockGreeterService;
361
362    use futures::StreamExt;
363    use std::pin::Pin;
364    use tokio_stream::wrappers::ReceiverStream;
365
366    #[tonic::async_trait]
367    impl Greeter for MockGreeterService {
368        type SayHelloStreamStream =
369            Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
370        type ChatStream = Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
371
372        async fn say_hello(
373            &self,
374            request: Request<HelloRequest>,
375        ) -> Result<Response<HelloReply>, Status> {
376            info!("gRPC say_hello request: {:?}", request);
377            let req = request.into_inner();
378            let reply = HelloReply {
379                message: format!("Hello {}! This is a mock response from MockForge", req.name),
380                metadata: None,
381                items: vec![],
382            };
383            Ok(Response::new(reply))
384        }
385
386        async fn say_hello_stream(
387            &self,
388            request: Request<HelloRequest>,
389        ) -> Result<Response<Self::SayHelloStreamStream>, Status> {
390            info!("gRPC say_hello_stream request: {:?}", request);
391            let name = request.into_inner().name;
392            let (tx, rx) = tokio::sync::mpsc::channel(128);
393            tokio::spawn(async move {
394                for i in 1..=5 {
395                    let reply = HelloReply {
396                        message: format!(
397                            "Hello {}! Stream message {} of 5 from MockForge",
398                            name, i
399                        ),
400                        metadata: None,
401                        items: vec![],
402                    };
403                    if tx.send(Ok(reply)).await.is_err() {
404                        break;
405                    }
406                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
407                }
408            });
409            let stream = ReceiverStream::new(rx);
410            Ok(Response::new(Box::pin(stream) as Self::SayHelloStreamStream))
411        }
412
413        async fn say_hello_client_stream(
414            &self,
415            request: Request<tonic::Streaming<HelloRequest>>,
416        ) -> Result<Response<HelloReply>, Status> {
417            info!("gRPC say_hello_client_stream started");
418            let mut stream = request.into_inner();
419            let mut names = Vec::new();
420            let mut count = 0;
421            while let Some(req) = stream.next().await {
422                match req {
423                    Ok(hello_request) => {
424                        names.push(hello_request.name);
425                        count += 1;
426                    }
427                    Err(e) => {
428                        error!("Error receiving client stream message: {}", e);
429                        return Err(Status::internal(format!("Stream error: {}", e)));
430                    }
431                }
432            }
433            let message = if names.is_empty() {
434                "Hello! No names received in the stream.".to_string()
435            } else {
436                format!(
437                    "Hello {}! Received {} messages from MockForge client stream.",
438                    names.join(", "),
439                    count
440                )
441            };
442            Ok(Response::new(HelloReply {
443                message,
444                metadata: None,
445                items: vec![],
446            }))
447        }
448
449        async fn chat(
450            &self,
451            request: Request<tonic::Streaming<HelloRequest>>,
452        ) -> Result<Response<Self::ChatStream>, Status> {
453            info!("gRPC chat (bidirectional streaming) started");
454            let mut stream = request.into_inner();
455            let (tx, rx) = tokio::sync::mpsc::channel(128);
456            tokio::spawn(async move {
457                let mut message_count = 0;
458                while let Some(req) = stream.next().await {
459                    match req {
460                        Ok(hello_request) => {
461                            message_count += 1;
462                            let reply = HelloReply {
463                                message: format!(
464                                    "Chat response {}: Hello {}! from MockForge",
465                                    message_count, hello_request.name
466                                ),
467                                metadata: None,
468                                items: vec![],
469                            };
470                            if tx.send(Ok(reply)).await.is_err() {
471                                break;
472                            }
473                        }
474                        Err(e) => {
475                            error!("Chat stream error: {}", e);
476                            let _ = tx
477                                .send(Err(Status::internal(format!("Stream error: {}", e))))
478                                .await;
479                            break;
480                        }
481                    }
482                }
483                info!("Chat session ended after {} messages", message_count);
484            });
485            let output_stream = ReceiverStream::new(rx);
486            Ok(Response::new(Box::pin(output_stream) as Self::ChatStream))
487        }
488    }
489
490    // Build tonic Routes with built-in services
491    let mut routes_builder = tonic::service::RoutesBuilder::default();
492    routes_builder.add_service(GreeterServer::new(MockGreeterService));
493    info!("Registered built-in Greeter service");
494
495    // Add reflection service if enabled
496    if config.enable_reflection {
497        let encoded_fd_set = registry_arc.descriptor_pool().encode_to_vec();
498        let reflection_service = ReflectionBuilder::configure()
499            .register_encoded_file_descriptor_set(&encoded_fd_set)
500            .build_v1()
501            .map_err(|e| {
502                error!("Failed to build reflection service: {}", e);
503                Box::<dyn std::error::Error + Send + Sync>::from(format!(
504                    "Failed to build reflection service: {}",
505                    e
506                ))
507            })?;
508        routes_builder.add_service(reflection_service);
509        info!("gRPC reflection service enabled");
510    }
511
512    // Convert to axum router and add dynamic service fallback
513    let registry_for_fallback = registry_arc.clone();
514    let axum_router =
515        routes_builder
516            .routes()
517            .into_axum_router()
518            .fallback(move |req: axum::extract::Request| {
519                let registry = registry_for_fallback.clone();
520                async move {
521                    let path = req.uri().path().to_string();
522                    match router::parse_grpc_path(&path) {
523                        Some((service_name, method_name)) => {
524                            // Collect the request body (limit to 4MB)
525                            let body = axum::body::to_bytes(req.into_body(), 4 * 1024 * 1024)
526                                .await
527                                .unwrap_or_default();
528
529                            match router::handle_dynamic_grpc_request(
530                                &registry,
531                                service_name,
532                                method_name,
533                                body,
534                            )
535                            .await
536                            {
537                                Ok(response) => response,
538                                Err(status) => router::create_grpc_error_response(status),
539                            }
540                        }
541                        None => router::create_grpc_error_response(Status::unimplemented(
542                            "Unknown path",
543                        )),
544                    }
545                }
546            });
547
548    // Convert back to tonic Routes and serve with the configured server
549    let routes = tonic::service::Routes::from(axum_router);
550    server_builder.add_routes(routes).serve(grpc_addr).await?;
551
552    info!("gRPC server stopped");
553    Ok(())
554}
555
556// start_combined_server removed - was a stub that was never implemented
557
#[cfg(test)]
mod tests {
    /// Placeholder test with no assertions: it passes whenever the test build
    /// of this module compiles.
    #[test]
    fn test_module_compiles() {
        // Intentionally empty. Compiling the crate for tests is the only check
        // that this module's types and imports are valid.
    }
}