Skip to main content

mockforge_grpc/dynamic/
mod.rs

1//! Dynamic gRPC service discovery and registration
2//!
3//! This module provides functionality to dynamically discover and register
4//! gRPC services from proto files, making the gRPC mock system flexible
5//! for different applications and developers.
6
7pub mod http_bridge;
8pub mod proto_parser;
9pub mod router;
10pub mod service_generator;
11
12use crate::reflection::{MockReflectionProxy, ProxyConfig};
13use proto_parser::ProtoParser;
14use service_generator::DynamicGrpcService;
15use std::collections::HashMap;
16use std::sync::Arc;
17use tonic::transport::Server;
18use tonic_reflection::server::Builder as ReflectionBuilder;
19use tracing::*;
20
21/// TLS configuration for gRPC server
22#[derive(Debug, Clone)]
23pub struct GrpcTlsConfig {
24    /// Path to the TLS certificate file (PEM format)
25    pub cert_path: String,
26    /// Path to the TLS private key file (PEM format)
27    pub key_path: String,
28    /// Optional path to CA certificate for client certificate verification (mTLS)
29    pub client_ca_path: Option<String>,
30}
31
32impl GrpcTlsConfig {
33    /// Create a new TLS configuration
34    pub fn new(cert_path: impl Into<String>, key_path: impl Into<String>) -> Self {
35        Self {
36            cert_path: cert_path.into(),
37            key_path: key_path.into(),
38            client_ca_path: None,
39        }
40    }
41
42    /// Create TLS configuration with mutual TLS (client certificate verification)
43    pub fn with_mtls(
44        cert_path: impl Into<String>,
45        key_path: impl Into<String>,
46        client_ca_path: impl Into<String>,
47    ) -> Self {
48        Self {
49            cert_path: cert_path.into(),
50            key_path: key_path.into(),
51            client_ca_path: Some(client_ca_path.into()),
52        }
53    }
54
55    /// Create TLS configuration from environment variables
56    ///
57    /// Uses:
58    /// - GRPC_TLS_CERT: Path to certificate file
59    /// - GRPC_TLS_KEY: Path to private key file
60    /// - GRPC_TLS_CLIENT_CA: Optional path to client CA for mTLS
61    pub fn from_env() -> Option<Self> {
62        let cert_path = std::env::var("GRPC_TLS_CERT").ok()?;
63        let key_path = std::env::var("GRPC_TLS_KEY").ok()?;
64        let client_ca_path = std::env::var("GRPC_TLS_CLIENT_CA").ok();
65
66        Some(Self {
67            cert_path,
68            key_path,
69            client_ca_path,
70        })
71    }
72}
73
74/// Configuration for dynamic gRPC service discovery
75#[derive(Debug, Clone)]
76pub struct DynamicGrpcConfig {
77    /// Directory containing proto files
78    pub proto_dir: String,
79    /// Whether to enable reflection
80    pub enable_reflection: bool,
81    /// Services to exclude from discovery
82    pub excluded_services: Vec<String>,
83    /// HTTP bridge configuration
84    pub http_bridge: Option<http_bridge::HttpBridgeConfig>,
85    /// TLS configuration (None for plaintext)
86    pub tls: Option<GrpcTlsConfig>,
87    /// Per-method response overrides. Populated by callers translating from
88    /// `mockforge_core::config::GrpcConfig`.
89    pub overrides: Vec<mockforge_core::config::GrpcOverride>,
90}
91
92impl Default for DynamicGrpcConfig {
93    fn default() -> Self {
94        Self {
95            proto_dir: "proto".to_string(),
96            enable_reflection: false,
97            excluded_services: Vec::new(),
98            overrides: Vec::new(),
99            http_bridge: Some(http_bridge::HttpBridgeConfig {
100                enabled: true,
101                ..Default::default()
102            }),
103            // Check for TLS configuration from environment
104            tls: GrpcTlsConfig::from_env(),
105        }
106    }
107}
108
109/// A registry of discovered gRPC services
110#[derive(Clone)]
111pub struct ServiceRegistry {
112    /// Map of service names to their implementations
113    services: HashMap<String, Arc<DynamicGrpcService>>,
114    /// Descriptor pool containing parsed proto definitions
115    descriptor_pool: prost_reflect::DescriptorPool,
116    /// Per-method override rules from config; the router checks these before
117    /// falling through to smart-mock generation.
118    overrides: Arc<Vec<mockforge_core::config::GrpcOverride>>,
119}
120
121impl Default for ServiceRegistry {
122    fn default() -> Self {
123        Self::new()
124    }
125}
126
127impl ServiceRegistry {
128    /// Get the descriptor pool
129    pub fn descriptor_pool(&self) -> &prost_reflect::DescriptorPool {
130        &self.descriptor_pool
131    }
132
133    /// Create a new service registry
134    pub fn new() -> Self {
135        Self {
136            services: HashMap::new(),
137            descriptor_pool: prost_reflect::DescriptorPool::new(),
138            overrides: Arc::new(Vec::new()),
139        }
140    }
141
142    /// Create a service registry with a descriptor pool
143    pub fn with_descriptor_pool(descriptor_pool: prost_reflect::DescriptorPool) -> Self {
144        Self {
145            services: HashMap::new(),
146            descriptor_pool,
147            overrides: Arc::new(Vec::new()),
148        }
149    }
150
151    /// Replace the override list with `overrides`. Typically called once at
152    /// server start with the rules from `GrpcConfig::overrides`.
153    pub fn set_overrides(&mut self, overrides: Vec<mockforge_core::config::GrpcOverride>) {
154        self.overrides = Arc::new(overrides);
155    }
156
157    /// Get the configured override list.
158    pub fn overrides(&self) -> &[mockforge_core::config::GrpcOverride] {
159        self.overrides.as_ref()
160    }
161
162    /// Set the descriptor pool (useful when building registry incrementally)
163    pub fn set_descriptor_pool(&mut self, pool: prost_reflect::DescriptorPool) {
164        self.descriptor_pool = pool;
165    }
166
167    /// Register a service implementation
168    pub fn register(&mut self, name: String, service: DynamicGrpcService) {
169        self.services.insert(name, Arc::new(service));
170    }
171
172    /// Get a service by name
173    pub fn get(&self, name: &str) -> Option<&Arc<DynamicGrpcService>> {
174        self.services.get(name)
175    }
176
177    /// List all registered service names
178    pub fn service_names(&self) -> Vec<String> {
179        self.services.keys().cloned().collect()
180    }
181}
182
183/// Discover and register services from proto files
184pub async fn discover_services(
185    config: &DynamicGrpcConfig,
186) -> Result<ServiceRegistry, Box<dyn std::error::Error + Send + Sync>> {
187    use std::time::Instant;
188
189    let discovery_start = Instant::now();
190    info!("Discovering gRPC services from proto directory: {}", config.proto_dir);
191
192    // Parse proto files
193    let parse_start = Instant::now();
194    let mut parser = ProtoParser::new();
195    parser.parse_directory(&config.proto_dir).await?;
196    let parse_duration = parse_start.elapsed();
197    info!("Proto file parsing completed (took {:?})", parse_duration);
198
199    // Create registry with the descriptor pool from the parser
200    let registry_start = Instant::now();
201    let mut registry = ServiceRegistry::new();
202    // Extract services from parser and move descriptor pool
203    let services = parser.services().clone();
204    let descriptor_pool = parser.into_pool();
205
206    registry.set_descriptor_pool(descriptor_pool);
207    if !config.overrides.is_empty() {
208        info!("Loaded {} gRPC method override rule(s) from config", config.overrides.len());
209        registry.set_overrides(config.overrides.clone());
210    }
211    let registry_duration = registry_start.elapsed();
212    debug!("Registry creation completed (took {:?})", registry_duration);
213
214    // Create dynamic services from parsed proto definitions
215    let service_reg_start = Instant::now();
216    for (service_name, proto_service) in services {
217        // Skip excluded services
218        if config.excluded_services.contains(&service_name) {
219            info!("Skipping excluded service: {}", service_name);
220            continue;
221        }
222
223        // Create dynamic service
224        let dynamic_service = DynamicGrpcService::new(proto_service.clone(), None);
225        registry.register(service_name.clone(), dynamic_service);
226
227        debug!("Registered service: {}", service_name);
228    }
229    let service_reg_duration = service_reg_start.elapsed();
230    info!(
231        "Service registration completed for {} services (took {:?})",
232        registry.service_names().len(),
233        service_reg_duration
234    );
235
236    let total_discovery_duration = discovery_start.elapsed();
237    info!("Service discovery completed (total time: {:?})", total_discovery_duration);
238    Ok(registry)
239}
240
241/// Start a dynamic server with both gRPC and HTTP bridge support
242pub async fn start_dynamic_server(
243    port: u16,
244    config: DynamicGrpcConfig,
245    latency_profile: Option<mockforge_foundation::latency::LatencyProfile>,
246) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
247    use std::time::Instant;
248
249    let startup_start = Instant::now();
250
251    #[cfg(feature = "data-faker")]
252    mockforge_data::provider::register_core_faker_provider();
253
254    let _latency_injector = latency_profile.map(|profile| {
255        mockforge_foundation::latency::LatencyInjector::new(profile, Default::default())
256    });
257
258    // Discover services
259    let registry = discover_services(&config).await?;
260    let registry_arc = Arc::new(registry);
261
262    // Use shared server utilities for consistent address creation
263    let addr = mockforge_core::wildcard_socket_addr(port);
264    info!(
265        "Dynamic server listening on {} with {} services",
266        addr,
267        registry_arc.service_names().len()
268    );
269
270    // Create proxy configuration
271    let proxy_config = ProxyConfig::default();
272
273    // Create mock reflection proxy
274    let reflection_start = Instant::now();
275    let mock_proxy = MockReflectionProxy::new(proxy_config, registry_arc.clone()).await?;
276    let reflection_duration = reflection_start.elapsed();
277    info!("gRPC reflection proxy created (took {:?})", reflection_duration);
278
279    let total_startup_duration = startup_start.elapsed();
280    info!("gRPC server startup completed (total time: {:?})", total_startup_duration);
281
282    // Start HTTP server (bridge) if enabled
283    // Currently, just start the gRPC server directly
284    // HTTP bridge functionality is disabled
285    start_grpc_only_server(port, &config, registry_arc.clone(), mock_proxy).await?;
286
287    // HTTP bridge is disabled, no server handle to wait for
288
289    Ok(())
290}
291
292/// Start a gRPC-only server (for backward compatibility)
293pub async fn start_dynamic_grpc_server(
294    port: u16,
295    config: DynamicGrpcConfig,
296    latency_profile: Option<mockforge_foundation::latency::LatencyProfile>,
297) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
298    // Disable HTTP bridge
299    let mut grpc_only_config = config;
300    grpc_only_config.http_bridge = None;
301
302    start_dynamic_server(port, grpc_only_config, latency_profile).await
303}
304
305/// Start the gRPC-only server implementation
306async fn start_grpc_only_server(
307    port: u16,
308    config: &DynamicGrpcConfig,
309    registry_arc: Arc<ServiceRegistry>,
310    _mock_proxy: MockReflectionProxy,
311) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
312    use tonic::transport::{Certificate, Identity, ServerTlsConfig};
313
314    // Create server builder with optional TLS
315    let mut server_builder = if let Some(tls_config) = &config.tls {
316        info!("Configuring gRPC server with TLS");
317
318        // Read certificate and key files
319        let cert = tokio::fs::read(&tls_config.cert_path).await.map_err(|e| {
320            error!("Failed to read TLS certificate from {}: {}", tls_config.cert_path, e);
321            Box::<dyn std::error::Error + Send + Sync>::from(format!(
322                "Failed to read TLS certificate: {}",
323                e
324            ))
325        })?;
326
327        let key = tokio::fs::read(&tls_config.key_path).await.map_err(|e| {
328            error!("Failed to read TLS key from {}: {}", tls_config.key_path, e);
329            Box::<dyn std::error::Error + Send + Sync>::from(format!(
330                "Failed to read TLS key: {}",
331                e
332            ))
333        })?;
334
335        let identity = Identity::from_pem(cert, key);
336
337        let mut tls = ServerTlsConfig::new().identity(identity);
338
339        // Add client CA for mTLS if configured
340        if let Some(client_ca_path) = &tls_config.client_ca_path {
341            info!("Configuring mutual TLS (mTLS) with client certificate verification");
342            let client_ca = tokio::fs::read(client_ca_path).await.map_err(|e| {
343                error!("Failed to read client CA from {}: {}", client_ca_path, e);
344                Box::<dyn std::error::Error + Send + Sync>::from(format!(
345                    "Failed to read client CA: {}",
346                    e
347                ))
348            })?;
349            tls = tls.client_ca_root(Certificate::from_pem(client_ca));
350        }
351
352        Server::builder().tls_config(tls).map_err(|e| {
353            error!("Failed to configure TLS: {}", e);
354            Box::<dyn std::error::Error + Send + Sync>::from(format!(
355                "Failed to configure TLS: {}",
356                e
357            ))
358        })?
359    } else {
360        info!("gRPC server running in plaintext mode (no TLS configured)");
361        Server::builder()
362    };
363
364    use std::net::SocketAddr;
365
366    let grpc_addr: SocketAddr = mockforge_core::wildcard_socket_addr(port);
367
368    // Log discovered services
369    info!(
370        "Starting gRPC server on {} with {} discovered services",
371        grpc_addr,
372        registry_arc.service_names().len()
373    );
374    for service_name in registry_arc.service_names() {
375        info!("  - Dynamic service: {}", service_name);
376    }
377
378    // Build the built-in Greeter service
379    use crate::generated::greeter_server::{Greeter, GreeterServer};
380    use crate::generated::{HelloReply, HelloRequest};
381    use tonic::{Request, Response, Status};
382
383    #[derive(Debug, Default)]
384    struct MockGreeterService;
385
386    use futures::StreamExt;
387    use std::pin::Pin;
388    use tokio_stream::wrappers::ReceiverStream;
389
390    #[tonic::async_trait]
391    impl Greeter for MockGreeterService {
392        type SayHelloStreamStream =
393            Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
394        type ChatStream = Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
395
396        async fn say_hello(
397            &self,
398            request: Request<HelloRequest>,
399        ) -> Result<Response<HelloReply>, Status> {
400            info!("gRPC say_hello request: {:?}", request);
401            let req = request.into_inner();
402            let reply = HelloReply {
403                message: format!("Hello {}! This is a mock response from MockForge", req.name),
404                metadata: None,
405                items: vec![],
406            };
407            Ok(Response::new(reply))
408        }
409
410        async fn say_hello_stream(
411            &self,
412            request: Request<HelloRequest>,
413        ) -> Result<Response<Self::SayHelloStreamStream>, Status> {
414            info!("gRPC say_hello_stream request: {:?}", request);
415            let name = request.into_inner().name;
416            let (tx, rx) = tokio::sync::mpsc::channel(128);
417            tokio::spawn(async move {
418                for i in 1..=5 {
419                    let reply = HelloReply {
420                        message: format!(
421                            "Hello {}! Stream message {} of 5 from MockForge",
422                            name, i
423                        ),
424                        metadata: None,
425                        items: vec![],
426                    };
427                    if tx.send(Ok(reply)).await.is_err() {
428                        break;
429                    }
430                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
431                }
432            });
433            let stream = ReceiverStream::new(rx);
434            Ok(Response::new(Box::pin(stream) as Self::SayHelloStreamStream))
435        }
436
437        async fn say_hello_client_stream(
438            &self,
439            request: Request<tonic::Streaming<HelloRequest>>,
440        ) -> Result<Response<HelloReply>, Status> {
441            info!("gRPC say_hello_client_stream started");
442            let mut stream = request.into_inner();
443            let mut names = Vec::new();
444            let mut count = 0;
445            while let Some(req) = stream.next().await {
446                match req {
447                    Ok(hello_request) => {
448                        names.push(hello_request.name);
449                        count += 1;
450                    }
451                    Err(e) => {
452                        error!("Error receiving client stream message: {}", e);
453                        return Err(Status::internal(format!("Stream error: {}", e)));
454                    }
455                }
456            }
457            let message = if names.is_empty() {
458                "Hello! No names received in the stream.".to_string()
459            } else {
460                format!(
461                    "Hello {}! Received {} messages from MockForge client stream.",
462                    names.join(", "),
463                    count
464                )
465            };
466            Ok(Response::new(HelloReply {
467                message,
468                metadata: None,
469                items: vec![],
470            }))
471        }
472
473        async fn chat(
474            &self,
475            request: Request<tonic::Streaming<HelloRequest>>,
476        ) -> Result<Response<Self::ChatStream>, Status> {
477            info!("gRPC chat (bidirectional streaming) started");
478            let mut stream = request.into_inner();
479            let (tx, rx) = tokio::sync::mpsc::channel(128);
480            tokio::spawn(async move {
481                let mut message_count = 0;
482                while let Some(req) = stream.next().await {
483                    match req {
484                        Ok(hello_request) => {
485                            message_count += 1;
486                            let reply = HelloReply {
487                                message: format!(
488                                    "Chat response {}: Hello {}! from MockForge",
489                                    message_count, hello_request.name
490                                ),
491                                metadata: None,
492                                items: vec![],
493                            };
494                            if tx.send(Ok(reply)).await.is_err() {
495                                break;
496                            }
497                        }
498                        Err(e) => {
499                            error!("Chat stream error: {}", e);
500                            let _ = tx
501                                .send(Err(Status::internal(format!("Stream error: {}", e))))
502                                .await;
503                            break;
504                        }
505                    }
506                }
507                info!("Chat session ended after {} messages", message_count);
508            });
509            let output_stream = ReceiverStream::new(rx);
510            Ok(Response::new(Box::pin(output_stream) as Self::ChatStream))
511        }
512    }
513
514    // Build tonic Routes with built-in services
515    let mut routes_builder = tonic::service::RoutesBuilder::default();
516    routes_builder.add_service(GreeterServer::new(MockGreeterService));
517    info!("Registered built-in Greeter service");
518
519    // Add reflection service if enabled
520    if config.enable_reflection {
521        let encoded_fd_set = registry_arc.descriptor_pool().encode_to_vec();
522        let reflection_service = ReflectionBuilder::configure()
523            .register_encoded_file_descriptor_set(&encoded_fd_set)
524            .build_v1()
525            .map_err(|e| {
526                error!("Failed to build reflection service: {}", e);
527                Box::<dyn std::error::Error + Send + Sync>::from(format!(
528                    "Failed to build reflection service: {}",
529                    e
530                ))
531            })?;
532        routes_builder.add_service(reflection_service);
533        info!("gRPC reflection service enabled");
534    }
535
536    // Convert to axum router and add dynamic service fallback
537    let registry_for_fallback = registry_arc.clone();
538    let axum_router =
539        routes_builder
540            .routes()
541            .into_axum_router()
542            .fallback(move |req: axum::extract::Request| {
543                let registry = registry_for_fallback.clone();
544                async move {
545                    let path = req.uri().path().to_string();
546                    match router::parse_grpc_path(&path) {
547                        Some((service_name, method_name)) => {
548                            // Collect the request body (limit to 4MB)
549                            let body = axum::body::to_bytes(req.into_body(), 4 * 1024 * 1024)
550                                .await
551                                .unwrap_or_default();
552
553                            match router::handle_dynamic_grpc_request(
554                                &registry,
555                                service_name,
556                                method_name,
557                                body,
558                            )
559                            .await
560                            {
561                                Ok(response) => response,
562                                Err(status) => router::create_grpc_error_response(status),
563                            }
564                        }
565                        None => router::create_grpc_error_response(Status::unimplemented(
566                            "Unknown path",
567                        )),
568                    }
569                }
570            });
571
572    // Convert back to tonic Routes and serve with the configured server
573    let routes = tonic::service::Routes::from(axum_router);
574    server_builder.add_routes(routes).serve(grpc_addr).await?;
575
576    info!("gRPC server stopped");
577    Ok(())
578}
579
580// start_combined_server removed - was a stub that was never implemented
581
582#[cfg(test)]
583mod tests {
584    #[test]
585    fn test_module_compiles() {
586        // Verify this module's types and imports are valid
587    }
588}