Skip to main content

mockforge_grpc/dynamic/
mod.rs

1//! Dynamic gRPC service discovery and registration
2//!
3//! This module provides functionality to dynamically discover and register
4//! gRPC services from proto files, making the gRPC mock system flexible
5//! for different applications and developers.
6
7pub mod http_bridge;
8pub mod proto_parser;
9pub mod router;
10pub mod service_generator;
11
12use crate::reflection::{MockReflectionProxy, ProxyConfig};
13use proto_parser::ProtoParser;
14use service_generator::DynamicGrpcService;
15use std::collections::HashMap;
16use std::sync::Arc;
17use tonic::transport::Server;
18use tonic_reflection::server::Builder as ReflectionBuilder;
19use tracing::*;
20
21/// TLS configuration for gRPC server
22#[derive(Debug, Clone)]
23pub struct GrpcTlsConfig {
24    /// Path to the TLS certificate file (PEM format)
25    pub cert_path: String,
26    /// Path to the TLS private key file (PEM format)
27    pub key_path: String,
28    /// Optional path to CA certificate for client certificate verification (mTLS)
29    pub client_ca_path: Option<String>,
30}
31
32impl GrpcTlsConfig {
33    /// Create a new TLS configuration
34    pub fn new(cert_path: impl Into<String>, key_path: impl Into<String>) -> Self {
35        Self {
36            cert_path: cert_path.into(),
37            key_path: key_path.into(),
38            client_ca_path: None,
39        }
40    }
41
42    /// Create TLS configuration with mutual TLS (client certificate verification)
43    pub fn with_mtls(
44        cert_path: impl Into<String>,
45        key_path: impl Into<String>,
46        client_ca_path: impl Into<String>,
47    ) -> Self {
48        Self {
49            cert_path: cert_path.into(),
50            key_path: key_path.into(),
51            client_ca_path: Some(client_ca_path.into()),
52        }
53    }
54
55    /// Create TLS configuration from environment variables
56    ///
57    /// Uses:
58    /// - GRPC_TLS_CERT: Path to certificate file
59    /// - GRPC_TLS_KEY: Path to private key file
60    /// - GRPC_TLS_CLIENT_CA: Optional path to client CA for mTLS
61    pub fn from_env() -> Option<Self> {
62        let cert_path = std::env::var("GRPC_TLS_CERT").ok()?;
63        let key_path = std::env::var("GRPC_TLS_KEY").ok()?;
64        let client_ca_path = std::env::var("GRPC_TLS_CLIENT_CA").ok();
65
66        Some(Self {
67            cert_path,
68            key_path,
69            client_ca_path,
70        })
71    }
72}
73
74/// Configuration for dynamic gRPC service discovery
75#[derive(Debug, Clone)]
76pub struct DynamicGrpcConfig {
77    /// Directory containing proto files
78    pub proto_dir: String,
79    /// Whether to enable reflection
80    pub enable_reflection: bool,
81    /// Services to exclude from discovery
82    pub excluded_services: Vec<String>,
83    /// HTTP bridge configuration
84    pub http_bridge: Option<http_bridge::HttpBridgeConfig>,
85    /// TLS configuration (None for plaintext)
86    pub tls: Option<GrpcTlsConfig>,
87}
88
89impl Default for DynamicGrpcConfig {
90    fn default() -> Self {
91        Self {
92            proto_dir: "proto".to_string(),
93            enable_reflection: false,
94            excluded_services: Vec::new(),
95            http_bridge: Some(http_bridge::HttpBridgeConfig {
96                enabled: true,
97                ..Default::default()
98            }),
99            // Check for TLS configuration from environment
100            tls: GrpcTlsConfig::from_env(),
101        }
102    }
103}
104
105/// A registry of discovered gRPC services
106#[derive(Clone)]
107pub struct ServiceRegistry {
108    /// Map of service names to their implementations
109    services: HashMap<String, Arc<DynamicGrpcService>>,
110    /// Descriptor pool containing parsed proto definitions
111    descriptor_pool: prost_reflect::DescriptorPool,
112}
113
114impl Default for ServiceRegistry {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
120impl ServiceRegistry {
121    /// Get the descriptor pool
122    pub fn descriptor_pool(&self) -> &prost_reflect::DescriptorPool {
123        &self.descriptor_pool
124    }
125
126    /// Create a new service registry
127    pub fn new() -> Self {
128        Self {
129            services: HashMap::new(),
130            descriptor_pool: prost_reflect::DescriptorPool::new(),
131        }
132    }
133
134    /// Create a service registry with a descriptor pool
135    pub fn with_descriptor_pool(descriptor_pool: prost_reflect::DescriptorPool) -> Self {
136        Self {
137            services: HashMap::new(),
138            descriptor_pool,
139        }
140    }
141
142    /// Set the descriptor pool (useful when building registry incrementally)
143    pub fn set_descriptor_pool(&mut self, pool: prost_reflect::DescriptorPool) {
144        self.descriptor_pool = pool;
145    }
146
147    /// Register a service implementation
148    pub fn register(&mut self, name: String, service: DynamicGrpcService) {
149        self.services.insert(name, Arc::new(service));
150    }
151
152    /// Get a service by name
153    pub fn get(&self, name: &str) -> Option<&Arc<DynamicGrpcService>> {
154        self.services.get(name)
155    }
156
157    /// List all registered service names
158    pub fn service_names(&self) -> Vec<String> {
159        self.services.keys().cloned().collect()
160    }
161}
162
163/// Discover and register services from proto files
164pub async fn discover_services(
165    config: &DynamicGrpcConfig,
166) -> Result<ServiceRegistry, Box<dyn std::error::Error + Send + Sync>> {
167    use std::time::Instant;
168
169    let discovery_start = Instant::now();
170    info!("Discovering gRPC services from proto directory: {}", config.proto_dir);
171
172    // Parse proto files
173    let parse_start = Instant::now();
174    let mut parser = ProtoParser::new();
175    parser.parse_directory(&config.proto_dir).await?;
176    let parse_duration = parse_start.elapsed();
177    info!("Proto file parsing completed (took {:?})", parse_duration);
178
179    // Create registry with the descriptor pool from the parser
180    let registry_start = Instant::now();
181    let mut registry = ServiceRegistry::new();
182    // Extract services from parser and move descriptor pool
183    let services = parser.services().clone();
184    let descriptor_pool = parser.into_pool();
185
186    registry.set_descriptor_pool(descriptor_pool);
187    let registry_duration = registry_start.elapsed();
188    debug!("Registry creation completed (took {:?})", registry_duration);
189
190    // Create dynamic services from parsed proto definitions
191    let service_reg_start = Instant::now();
192    for (service_name, proto_service) in services {
193        // Skip excluded services
194        if config.excluded_services.contains(&service_name) {
195            info!("Skipping excluded service: {}", service_name);
196            continue;
197        }
198
199        // Create dynamic service
200        let dynamic_service = DynamicGrpcService::new(proto_service.clone(), None);
201        registry.register(service_name.clone(), dynamic_service);
202
203        debug!("Registered service: {}", service_name);
204    }
205    let service_reg_duration = service_reg_start.elapsed();
206    info!(
207        "Service registration completed for {} services (took {:?})",
208        registry.service_names().len(),
209        service_reg_duration
210    );
211
212    let total_discovery_duration = discovery_start.elapsed();
213    info!("Service discovery completed (total time: {:?})", total_discovery_duration);
214    Ok(registry)
215}
216
217/// Start a dynamic server with both gRPC and HTTP bridge support
218pub async fn start_dynamic_server(
219    port: u16,
220    config: DynamicGrpcConfig,
221    latency_profile: Option<mockforge_core::LatencyProfile>,
222) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
223    use std::time::Instant;
224
225    let startup_start = Instant::now();
226
227    #[cfg(feature = "data-faker")]
228    mockforge_data::provider::register_core_faker_provider();
229
230    let _latency_injector = latency_profile
231        .map(|profile| mockforge_core::latency::LatencyInjector::new(profile, Default::default()));
232
233    // Discover services
234    let registry = discover_services(&config).await?;
235    let registry_arc = Arc::new(registry);
236
237    // Use shared server utilities for consistent address creation
238    let addr = mockforge_core::wildcard_socket_addr(port);
239    info!(
240        "Dynamic server listening on {} with {} services",
241        addr,
242        registry_arc.service_names().len()
243    );
244
245    // Create proxy configuration
246    let proxy_config = ProxyConfig::default();
247
248    // Create mock reflection proxy
249    let reflection_start = Instant::now();
250    let mock_proxy = MockReflectionProxy::new(proxy_config, registry_arc.clone()).await?;
251    let reflection_duration = reflection_start.elapsed();
252    info!("gRPC reflection proxy created (took {:?})", reflection_duration);
253
254    let total_startup_duration = startup_start.elapsed();
255    info!("gRPC server startup completed (total time: {:?})", total_startup_duration);
256
257    // Start HTTP server (bridge) if enabled
258    // Currently, just start the gRPC server directly
259    // HTTP bridge functionality is disabled
260    start_grpc_only_server(port, &config, registry_arc.clone(), mock_proxy).await?;
261
262    // HTTP bridge is disabled, no server handle to wait for
263
264    Ok(())
265}
266
267/// Start a gRPC-only server (for backward compatibility)
268pub async fn start_dynamic_grpc_server(
269    port: u16,
270    config: DynamicGrpcConfig,
271    latency_profile: Option<mockforge_core::LatencyProfile>,
272) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
273    // Disable HTTP bridge
274    let mut grpc_only_config = config;
275    grpc_only_config.http_bridge = None;
276
277    start_dynamic_server(port, grpc_only_config, latency_profile).await
278}
279
280/// Start the gRPC-only server implementation
281async fn start_grpc_only_server(
282    port: u16,
283    config: &DynamicGrpcConfig,
284    registry_arc: Arc<ServiceRegistry>,
285    _mock_proxy: MockReflectionProxy,
286) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
287    use tonic::transport::{Certificate, Identity, ServerTlsConfig};
288
289    // Create server builder with optional TLS
290    let mut server_builder = if let Some(tls_config) = &config.tls {
291        info!("Configuring gRPC server with TLS");
292
293        // Read certificate and key files
294        let cert = tokio::fs::read(&tls_config.cert_path).await.map_err(|e| {
295            error!("Failed to read TLS certificate from {}: {}", tls_config.cert_path, e);
296            Box::<dyn std::error::Error + Send + Sync>::from(format!(
297                "Failed to read TLS certificate: {}",
298                e
299            ))
300        })?;
301
302        let key = tokio::fs::read(&tls_config.key_path).await.map_err(|e| {
303            error!("Failed to read TLS key from {}: {}", tls_config.key_path, e);
304            Box::<dyn std::error::Error + Send + Sync>::from(format!(
305                "Failed to read TLS key: {}",
306                e
307            ))
308        })?;
309
310        let identity = Identity::from_pem(cert, key);
311
312        let mut tls = ServerTlsConfig::new().identity(identity);
313
314        // Add client CA for mTLS if configured
315        if let Some(client_ca_path) = &tls_config.client_ca_path {
316            info!("Configuring mutual TLS (mTLS) with client certificate verification");
317            let client_ca = tokio::fs::read(client_ca_path).await.map_err(|e| {
318                error!("Failed to read client CA from {}: {}", client_ca_path, e);
319                Box::<dyn std::error::Error + Send + Sync>::from(format!(
320                    "Failed to read client CA: {}",
321                    e
322                ))
323            })?;
324            tls = tls.client_ca_root(Certificate::from_pem(client_ca));
325        }
326
327        Server::builder().tls_config(tls).map_err(|e| {
328            error!("Failed to configure TLS: {}", e);
329            Box::<dyn std::error::Error + Send + Sync>::from(format!(
330                "Failed to configure TLS: {}",
331                e
332            ))
333        })?
334    } else {
335        info!("gRPC server running in plaintext mode (no TLS configured)");
336        Server::builder()
337    };
338
339    use std::net::SocketAddr;
340
341    let grpc_addr: SocketAddr = mockforge_core::wildcard_socket_addr(port);
342
343    // Log discovered services
344    info!(
345        "Starting gRPC server on {} with {} discovered services",
346        grpc_addr,
347        registry_arc.service_names().len()
348    );
349    for service_name in registry_arc.service_names() {
350        info!("  - Dynamic service: {}", service_name);
351    }
352
353    // Build the built-in Greeter service
354    use crate::generated::greeter_server::{Greeter, GreeterServer};
355    use crate::generated::{HelloReply, HelloRequest};
356    use tonic::{Request, Response, Status};
357
358    #[derive(Debug, Default)]
359    struct MockGreeterService;
360
361    use futures::StreamExt;
362    use std::pin::Pin;
363    use tokio_stream::wrappers::ReceiverStream;
364
365    #[tonic::async_trait]
366    impl Greeter for MockGreeterService {
367        type SayHelloStreamStream =
368            Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
369        type ChatStream = Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
370
371        async fn say_hello(
372            &self,
373            request: Request<HelloRequest>,
374        ) -> Result<Response<HelloReply>, Status> {
375            info!("gRPC say_hello request: {:?}", request);
376            let req = request.into_inner();
377            let reply = HelloReply {
378                message: format!("Hello {}! This is a mock response from MockForge", req.name),
379                metadata: None,
380                items: vec![],
381            };
382            Ok(Response::new(reply))
383        }
384
385        async fn say_hello_stream(
386            &self,
387            request: Request<HelloRequest>,
388        ) -> Result<Response<Self::SayHelloStreamStream>, Status> {
389            info!("gRPC say_hello_stream request: {:?}", request);
390            let name = request.into_inner().name;
391            let (tx, rx) = tokio::sync::mpsc::channel(128);
392            tokio::spawn(async move {
393                for i in 1..=5 {
394                    let reply = HelloReply {
395                        message: format!(
396                            "Hello {}! Stream message {} of 5 from MockForge",
397                            name, i
398                        ),
399                        metadata: None,
400                        items: vec![],
401                    };
402                    if tx.send(Ok(reply)).await.is_err() {
403                        break;
404                    }
405                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
406                }
407            });
408            let stream = ReceiverStream::new(rx);
409            Ok(Response::new(Box::pin(stream) as Self::SayHelloStreamStream))
410        }
411
412        async fn say_hello_client_stream(
413            &self,
414            request: Request<tonic::Streaming<HelloRequest>>,
415        ) -> Result<Response<HelloReply>, Status> {
416            info!("gRPC say_hello_client_stream started");
417            let mut stream = request.into_inner();
418            let mut names = Vec::new();
419            let mut count = 0;
420            while let Some(req) = stream.next().await {
421                match req {
422                    Ok(hello_request) => {
423                        names.push(hello_request.name);
424                        count += 1;
425                    }
426                    Err(e) => {
427                        error!("Error receiving client stream message: {}", e);
428                        return Err(Status::internal(format!("Stream error: {}", e)));
429                    }
430                }
431            }
432            let message = if names.is_empty() {
433                "Hello! No names received in the stream.".to_string()
434            } else {
435                format!(
436                    "Hello {}! Received {} messages from MockForge client stream.",
437                    names.join(", "),
438                    count
439                )
440            };
441            Ok(Response::new(HelloReply {
442                message,
443                metadata: None,
444                items: vec![],
445            }))
446        }
447
448        async fn chat(
449            &self,
450            request: Request<tonic::Streaming<HelloRequest>>,
451        ) -> Result<Response<Self::ChatStream>, Status> {
452            info!("gRPC chat (bidirectional streaming) started");
453            let mut stream = request.into_inner();
454            let (tx, rx) = tokio::sync::mpsc::channel(128);
455            tokio::spawn(async move {
456                let mut message_count = 0;
457                while let Some(req) = stream.next().await {
458                    match req {
459                        Ok(hello_request) => {
460                            message_count += 1;
461                            let reply = HelloReply {
462                                message: format!(
463                                    "Chat response {}: Hello {}! from MockForge",
464                                    message_count, hello_request.name
465                                ),
466                                metadata: None,
467                                items: vec![],
468                            };
469                            if tx.send(Ok(reply)).await.is_err() {
470                                break;
471                            }
472                        }
473                        Err(e) => {
474                            error!("Chat stream error: {}", e);
475                            let _ = tx
476                                .send(Err(Status::internal(format!("Stream error: {}", e))))
477                                .await;
478                            break;
479                        }
480                    }
481                }
482                info!("Chat session ended after {} messages", message_count);
483            });
484            let output_stream = ReceiverStream::new(rx);
485            Ok(Response::new(Box::pin(output_stream) as Self::ChatStream))
486        }
487    }
488
489    // Build tonic Routes with built-in services
490    let mut routes_builder = tonic::service::RoutesBuilder::default();
491    routes_builder.add_service(GreeterServer::new(MockGreeterService));
492    info!("Registered built-in Greeter service");
493
494    // Add reflection service if enabled
495    if config.enable_reflection {
496        let encoded_fd_set = registry_arc.descriptor_pool().encode_to_vec();
497        let reflection_service = ReflectionBuilder::configure()
498            .register_encoded_file_descriptor_set(&encoded_fd_set)
499            .build_v1()
500            .map_err(|e| {
501                error!("Failed to build reflection service: {}", e);
502                Box::<dyn std::error::Error + Send + Sync>::from(format!(
503                    "Failed to build reflection service: {}",
504                    e
505                ))
506            })?;
507        routes_builder.add_service(reflection_service);
508        info!("gRPC reflection service enabled");
509    }
510
511    // Convert to axum router and add dynamic service fallback
512    let registry_for_fallback = registry_arc.clone();
513    let axum_router =
514        routes_builder
515            .routes()
516            .into_axum_router()
517            .fallback(move |req: axum::extract::Request| {
518                let registry = registry_for_fallback.clone();
519                async move {
520                    let path = req.uri().path().to_string();
521                    match router::parse_grpc_path(&path) {
522                        Some((service_name, method_name)) => {
523                            // Collect the request body (limit to 4MB)
524                            let body = axum::body::to_bytes(req.into_body(), 4 * 1024 * 1024)
525                                .await
526                                .unwrap_or_default();
527
528                            match router::handle_dynamic_grpc_request(
529                                &registry,
530                                service_name,
531                                method_name,
532                                body,
533                            )
534                            .await
535                            {
536                                Ok(response) => response,
537                                Err(status) => router::create_grpc_error_response(status),
538                            }
539                        }
540                        None => router::create_grpc_error_response(Status::unimplemented(
541                            "Unknown path",
542                        )),
543                    }
544                }
545            });
546
547    // Convert back to tonic Routes and serve with the configured server
548    let routes = tonic::service::Routes::from(axum_router);
549    server_builder.add_routes(routes).serve(grpc_addr).await?;
550
551    info!("gRPC server stopped");
552    Ok(())
553}
554
555// start_combined_server removed - was a stub that was never implemented
556
557#[cfg(test)]
558mod tests {
559    #[test]
560    fn test_module_compiles() {
561        // Verify this module's types and imports are valid
562    }
563}