mockforge_grpc/dynamic/
mod.rs

//! Dynamic gRPC service discovery and registration
//!
//! This module provides functionality to dynamically discover and register
//! gRPC services from proto files, making the gRPC mock system flexible
//! for different applications and developers.
6
7pub mod http_bridge;
8pub mod proto_parser;
9pub mod service_generator;
10
11use crate::reflection::{MockReflectionProxy, ProxyConfig};
12use proto_parser::ProtoParser;
13use service_generator::DynamicGrpcService;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tonic::transport::Server;
17use tonic_reflection::server::Builder as ReflectionBuilder;
18use tracing::*;
19
20/// Configuration for dynamic gRPC service discovery
21#[derive(Debug, Clone)]
22pub struct DynamicGrpcConfig {
23    /// Directory containing proto files
24    pub proto_dir: String,
25    /// Whether to enable reflection
26    pub enable_reflection: bool,
27    /// Services to exclude from discovery
28    pub excluded_services: Vec<String>,
29    /// HTTP bridge configuration
30    pub http_bridge: Option<http_bridge::HttpBridgeConfig>,
31}
32
33impl Default for DynamicGrpcConfig {
34    fn default() -> Self {
35        Self {
36            proto_dir: "proto".to_string(),
37            enable_reflection: false,
38            excluded_services: Vec::new(),
39            http_bridge: Some(http_bridge::HttpBridgeConfig {
40                enabled: true,
41                ..Default::default()
42            }),
43        }
44    }
45}
46
47/// A registry of discovered gRPC services
48#[derive(Clone)]
49pub struct ServiceRegistry {
50    /// Map of service names to their implementations
51    services: HashMap<String, Arc<DynamicGrpcService>>,
52    /// Descriptor pool containing parsed proto definitions
53    descriptor_pool: prost_reflect::DescriptorPool,
54}
55
56impl Default for ServiceRegistry {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62impl ServiceRegistry {
63    /// Get the descriptor pool
64    pub fn descriptor_pool(&self) -> &prost_reflect::DescriptorPool {
65        &self.descriptor_pool
66    }
67
68    /// Create a new service registry
69    pub fn new() -> Self {
70        Self {
71            services: HashMap::new(),
72            descriptor_pool: prost_reflect::DescriptorPool::new(),
73        }
74    }
75
76    /// Create a service registry with a descriptor pool
77    pub fn with_descriptor_pool(descriptor_pool: prost_reflect::DescriptorPool) -> Self {
78        Self {
79            services: HashMap::new(),
80            descriptor_pool,
81        }
82    }
83
84    /// Set the descriptor pool (useful when building registry incrementally)
85    pub fn set_descriptor_pool(&mut self, pool: prost_reflect::DescriptorPool) {
86        self.descriptor_pool = pool;
87    }
88
89    /// Register a service implementation
90    pub fn register(&mut self, name: String, service: DynamicGrpcService) {
91        self.services.insert(name, Arc::new(service));
92    }
93
94    /// Get a service by name
95    pub fn get(&self, name: &str) -> Option<&Arc<DynamicGrpcService>> {
96        self.services.get(name)
97    }
98
99    /// List all registered service names
100    pub fn service_names(&self) -> Vec<String> {
101        self.services.keys().cloned().collect()
102    }
103}
104
105/// Discover and register services from proto files
106pub async fn discover_services(
107    config: &DynamicGrpcConfig,
108) -> Result<ServiceRegistry, Box<dyn std::error::Error + Send + Sync>> {
109    use std::time::Instant;
110
111    let discovery_start = Instant::now();
112    info!("Discovering gRPC services from proto directory: {}", config.proto_dir);
113
114    // Parse proto files
115    let parse_start = Instant::now();
116    let mut parser = ProtoParser::new();
117    parser.parse_directory(&config.proto_dir).await?;
118    let parse_duration = parse_start.elapsed();
119    info!("Proto file parsing completed (took {:?})", parse_duration);
120
121    // Create registry with the descriptor pool from the parser
122    let registry_start = Instant::now();
123    let mut registry = ServiceRegistry::new();
124    // Extract services from parser and move descriptor pool
125    let services = parser.services().clone();
126    let descriptor_pool = parser.into_pool();
127
128    registry.set_descriptor_pool(descriptor_pool);
129    let registry_duration = registry_start.elapsed();
130    debug!("Registry creation completed (took {:?})", registry_duration);
131
132    // Create dynamic services from parsed proto definitions
133    let service_reg_start = Instant::now();
134    for (service_name, proto_service) in services {
135        // Skip excluded services
136        if config.excluded_services.contains(&service_name) {
137            info!("Skipping excluded service: {}", service_name);
138            continue;
139        }
140
141        // Create dynamic service
142        let dynamic_service = DynamicGrpcService::new(proto_service.clone(), None);
143        registry.register(service_name.clone(), dynamic_service);
144
145        debug!("Registered service: {}", service_name);
146    }
147    let service_reg_duration = service_reg_start.elapsed();
148    info!(
149        "Service registration completed for {} services (took {:?})",
150        registry.service_names().len(),
151        service_reg_duration
152    );
153
154    let total_discovery_duration = discovery_start.elapsed();
155    info!("Service discovery completed (total time: {:?})", total_discovery_duration);
156    Ok(registry)
157}
158
159/// Start a dynamic server with both gRPC and HTTP bridge support
160pub async fn start_dynamic_server(
161    port: u16,
162    config: DynamicGrpcConfig,
163    latency_profile: Option<mockforge_core::LatencyProfile>,
164) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
165    use std::time::Instant;
166
167    let startup_start = Instant::now();
168
169    #[cfg(feature = "data-faker")]
170    mockforge_data::provider::register_core_faker_provider();
171
172    let _latency_injector = latency_profile
173        .map(|profile| mockforge_core::latency::LatencyInjector::new(profile, Default::default()));
174
175    // Discover services
176    let registry = discover_services(&config).await?;
177    let registry_arc = Arc::new(registry);
178
179    // Use shared server utilities for consistent address creation
180    let addr = mockforge_core::wildcard_socket_addr(port);
181    info!(
182        "Dynamic server listening on {} with {} services",
183        addr,
184        registry_arc.service_names().len()
185    );
186
187    // Create proxy configuration
188    let proxy_config = ProxyConfig::default();
189
190    // Create mock reflection proxy
191    let reflection_start = Instant::now();
192    let mock_proxy = MockReflectionProxy::new(proxy_config, registry_arc.clone()).await?;
193    let reflection_duration = reflection_start.elapsed();
194    info!("gRPC reflection proxy created (took {:?})", reflection_duration);
195
196    let total_startup_duration = startup_start.elapsed();
197    info!("gRPC server startup completed (total time: {:?})", total_startup_duration);
198
199    // Start HTTP server (bridge) if enabled
200    // Currently, just start the gRPC server directly
201    // HTTP bridge functionality is disabled
202    start_grpc_only_server(port, &config, registry_arc.clone(), mock_proxy).await?;
203
204    // HTTP bridge is disabled, no server handle to wait for
205
206    Ok(())
207}
208
209/// Start a gRPC-only server (for backward compatibility)
210pub async fn start_dynamic_grpc_server(
211    port: u16,
212    config: DynamicGrpcConfig,
213    latency_profile: Option<mockforge_core::LatencyProfile>,
214) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
215    // Disable HTTP bridge
216    let mut grpc_only_config = config;
217    grpc_only_config.http_bridge = None;
218
219    start_dynamic_server(port, grpc_only_config, latency_profile).await
220}
221
222/// Start the gRPC-only server implementation
223async fn start_grpc_only_server(
224    port: u16,
225    config: &DynamicGrpcConfig,
226    registry_arc: Arc<ServiceRegistry>,
227    _mock_proxy: MockReflectionProxy,
228) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
229    // Create a gRPC server with the mock proxy
230    let mut server_builder = Server::builder();
231
232    // Start actual gRPC server on the specified port
233    info!(
234        "Starting gRPC server on {} with {} discovered services",
235        mockforge_core::wildcard_socket_addr(port),
236        registry_arc.service_names().len()
237    );
238
239    // Log discovered services
240    for service_name in registry_arc.service_names() {
241        info!("  - Service: {}", service_name);
242    }
243
244    // Create a basic gRPC server that at least starts successfully
245    // Full implementation would require generating actual service implementations
246    use std::net::SocketAddr;
247
248    let grpc_addr: SocketAddr = mockforge_core::wildcard_socket_addr(port);
249
250    info!("gRPC server listening on {} (basic implementation)", grpc_addr);
251    info!("Discovered services are logged but not yet fully implemented:");
252    for service_name in registry_arc.service_names() {
253        info!("  - {}", service_name);
254    }
255
256    // Create a basic gRPC server with the discovered services
257    use crate::generated::greeter_server::{Greeter, GreeterServer};
258    use crate::generated::{HelloReply, HelloRequest};
259    use tonic::{Request, Response, Status};
260
261    // Basic implementation of the Greeter service
262    #[derive(Debug, Default)]
263    pub struct MockGreeterService;
264
265    #[tonic::async_trait]
266    impl Greeter for MockGreeterService {
267        type SayHelloStreamStream = futures::stream::Empty<Result<HelloReply, Status>>;
268        type ChatStream = futures::stream::Empty<Result<HelloReply, Status>>;
269
270        async fn say_hello(
271            &self,
272            request: Request<HelloRequest>,
273        ) -> Result<Response<HelloReply>, Status> {
274            println!("Got a request: {:?}", request);
275
276            let req = request.into_inner();
277            let reply = HelloReply {
278                message: format!("Hello {}! This is a mock response from MockForge", req.name),
279                metadata: None,
280                items: vec![],
281            };
282
283            Ok(Response::new(reply))
284        }
285
286        async fn say_hello_stream(
287            &self,
288            _request: Request<HelloRequest>,
289        ) -> Result<Response<Self::SayHelloStreamStream>, Status> {
290            Err(Status::unimplemented("say_hello_stream not yet implemented"))
291        }
292
293        async fn say_hello_client_stream(
294            &self,
295            _request: Request<tonic::Streaming<HelloRequest>>,
296        ) -> Result<Response<HelloReply>, Status> {
297            Err(Status::unimplemented("say_hello_client_stream not yet implemented"))
298        }
299
300        async fn chat(
301            &self,
302            _request: Request<tonic::Streaming<HelloRequest>>,
303        ) -> Result<Response<Self::ChatStream>, Status> {
304            Err(Status::unimplemented("chat not yet implemented"))
305        }
306    }
307
308    let greeter = MockGreeterService;
309
310    info!("gRPC server listening on {} with Greeter service", grpc_addr);
311
312    // Build the server with services
313    let mut router = server_builder.add_service(GreeterServer::new(greeter));
314
315    // Add reflection service if enabled
316    if config.enable_reflection {
317        // Build reflection service from the descriptor pool
318        let encoded_fd_set = registry_arc.descriptor_pool().encode_to_vec();
319        let reflection_service = ReflectionBuilder::configure()
320            .register_encoded_file_descriptor_set(&encoded_fd_set)
321            .build_v1()
322            .map_err(|e| {
323                error!("Failed to build reflection service: {}", e);
324                Box::<dyn std::error::Error + Send + Sync>::from(format!(
325                    "Failed to build reflection service: {}",
326                    e
327                ))
328            })?;
329
330        router = router.add_service(reflection_service);
331        info!("gRPC reflection service enabled");
332    }
333
334    router.serve(grpc_addr).await?;
335
336    Ok(())
337}
338
339// start_combined_server removed - was a stub that was never implemented
340
#[cfg(test)]
mod tests {
    use super::{DynamicGrpcConfig, ServiceRegistry};

    /// Placeholder kept so the module is always exercised by the test build.
    #[test]
    fn test_module_compiles() {}

    /// The default configuration uses the `proto` directory, leaves
    /// reflection off, excludes nothing and carries an enabled bridge config.
    #[test]
    fn test_default_config_values() {
        let config = DynamicGrpcConfig::default();

        assert_eq!(config.proto_dir, "proto");
        assert!(!config.enable_reflection);
        assert!(config.excluded_services.is_empty());
        assert!(matches!(config.http_bridge, Some(ref bridge) if bridge.enabled));
    }

    /// A freshly created registry has no services, whichever constructor is used.
    #[test]
    fn test_new_registry_is_empty() {
        let registry = ServiceRegistry::new();
        assert!(registry.service_names().is_empty());
        assert!(registry.get("missing.Service").is_none());

        let default_registry = ServiceRegistry::default();
        assert!(default_registry.service_names().is_empty());
    }
}
346}