1pub mod http_bridge;
8pub mod proto_parser;
9pub mod router;
10pub mod service_generator;
11
12use crate::reflection::{MockReflectionProxy, ProxyConfig};
13use proto_parser::ProtoParser;
14use service_generator::DynamicGrpcService;
15use std::collections::HashMap;
16use std::sync::Arc;
17use tonic::transport::Server;
18use tonic_reflection::server::Builder as ReflectionBuilder;
19use tracing::*;
20
/// TLS settings for the gRPC server.
///
/// Only filesystem paths are stored here; nothing in this type opens or
/// validates the referenced files.
#[derive(Debug, Clone)]
pub struct GrpcTlsConfig {
    /// Path to the server certificate file.
    pub cert_path: String,
    /// Path to the server private key file.
    pub key_path: String,
    /// Optional path to a client CA certificate; `Some` requests mutual TLS.
    pub client_ca_path: Option<String>,
}

impl GrpcTlsConfig {
    /// Builds a server-only TLS configuration (no client CA).
    pub fn new(cert_path: impl Into<String>, key_path: impl Into<String>) -> Self {
        let cert_path = cert_path.into();
        let key_path = key_path.into();
        Self {
            cert_path,
            key_path,
            client_ca_path: None,
        }
    }

    /// Builds a mutual-TLS configuration: like [`GrpcTlsConfig::new`], plus a
    /// client CA path.
    pub fn with_mtls(
        cert_path: impl Into<String>,
        key_path: impl Into<String>,
        client_ca_path: impl Into<String>,
    ) -> Self {
        let mut tls = Self::new(cert_path, key_path);
        tls.client_ca_path = Some(client_ca_path.into());
        tls
    }

    /// Reads the configuration from the process environment.
    ///
    /// `GRPC_TLS_CERT` and `GRPC_TLS_KEY` are both required; if either cannot
    /// be read (unset or not valid Unicode) this returns `None`.
    /// `GRPC_TLS_CLIENT_CA` is optional and maps to `client_ca_path`.
    pub fn from_env() -> Option<Self> {
        let read = |name: &str| std::env::var(name).ok();

        let cert_path = read("GRPC_TLS_CERT")?;
        let key_path = read("GRPC_TLS_KEY")?;

        Some(Self {
            cert_path,
            key_path,
            client_ca_path: read("GRPC_TLS_CLIENT_CA"),
        })
    }
}
73
74#[derive(Debug, Clone)]
76pub struct DynamicGrpcConfig {
77 pub proto_dir: String,
79 pub enable_reflection: bool,
81 pub excluded_services: Vec<String>,
83 pub http_bridge: Option<http_bridge::HttpBridgeConfig>,
85 pub tls: Option<GrpcTlsConfig>,
87}
88
89impl Default for DynamicGrpcConfig {
90 fn default() -> Self {
91 Self {
92 proto_dir: "proto".to_string(),
93 enable_reflection: false,
94 excluded_services: Vec::new(),
95 http_bridge: Some(http_bridge::HttpBridgeConfig {
96 enabled: true,
97 ..Default::default()
98 }),
99 tls: GrpcTlsConfig::from_env(),
101 }
102 }
103}
104
105#[derive(Clone)]
107pub struct ServiceRegistry {
108 services: HashMap<String, Arc<DynamicGrpcService>>,
110 descriptor_pool: prost_reflect::DescriptorPool,
112}
113
114impl Default for ServiceRegistry {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120impl ServiceRegistry {
121 pub fn descriptor_pool(&self) -> &prost_reflect::DescriptorPool {
123 &self.descriptor_pool
124 }
125
126 pub fn new() -> Self {
128 Self {
129 services: HashMap::new(),
130 descriptor_pool: prost_reflect::DescriptorPool::new(),
131 }
132 }
133
134 pub fn with_descriptor_pool(descriptor_pool: prost_reflect::DescriptorPool) -> Self {
136 Self {
137 services: HashMap::new(),
138 descriptor_pool,
139 }
140 }
141
142 pub fn set_descriptor_pool(&mut self, pool: prost_reflect::DescriptorPool) {
144 self.descriptor_pool = pool;
145 }
146
147 pub fn register(&mut self, name: String, service: DynamicGrpcService) {
149 self.services.insert(name, Arc::new(service));
150 }
151
152 pub fn get(&self, name: &str) -> Option<&Arc<DynamicGrpcService>> {
154 self.services.get(name)
155 }
156
157 pub fn service_names(&self) -> Vec<String> {
159 self.services.keys().cloned().collect()
160 }
161}
162
163pub async fn discover_services(
165 config: &DynamicGrpcConfig,
166) -> Result<ServiceRegistry, Box<dyn std::error::Error + Send + Sync>> {
167 use std::time::Instant;
168
169 let discovery_start = Instant::now();
170 info!("Discovering gRPC services from proto directory: {}", config.proto_dir);
171
172 let parse_start = Instant::now();
174 let mut parser = ProtoParser::new();
175 parser.parse_directory(&config.proto_dir).await?;
176 let parse_duration = parse_start.elapsed();
177 info!("Proto file parsing completed (took {:?})", parse_duration);
178
179 let registry_start = Instant::now();
181 let mut registry = ServiceRegistry::new();
182 let services = parser.services().clone();
184 let descriptor_pool = parser.into_pool();
185
186 registry.set_descriptor_pool(descriptor_pool);
187 let registry_duration = registry_start.elapsed();
188 debug!("Registry creation completed (took {:?})", registry_duration);
189
190 let service_reg_start = Instant::now();
192 for (service_name, proto_service) in services {
193 if config.excluded_services.contains(&service_name) {
195 info!("Skipping excluded service: {}", service_name);
196 continue;
197 }
198
199 let dynamic_service = DynamicGrpcService::new(proto_service.clone(), None);
201 registry.register(service_name.clone(), dynamic_service);
202
203 debug!("Registered service: {}", service_name);
204 }
205 let service_reg_duration = service_reg_start.elapsed();
206 info!(
207 "Service registration completed for {} services (took {:?})",
208 registry.service_names().len(),
209 service_reg_duration
210 );
211
212 let total_discovery_duration = discovery_start.elapsed();
213 info!("Service discovery completed (total time: {:?})", total_discovery_duration);
214 Ok(registry)
215}
216
217pub async fn start_dynamic_server(
219 port: u16,
220 config: DynamicGrpcConfig,
221 latency_profile: Option<mockforge_foundation::latency::LatencyProfile>,
222) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
223 use std::time::Instant;
224
225 let startup_start = Instant::now();
226
227 #[cfg(feature = "data-faker")]
228 mockforge_data::provider::register_core_faker_provider();
229
230 let _latency_injector = latency_profile.map(|profile| {
231 mockforge_foundation::latency::LatencyInjector::new(profile, Default::default())
232 });
233
234 let registry = discover_services(&config).await?;
236 let registry_arc = Arc::new(registry);
237
238 let addr = mockforge_core::wildcard_socket_addr(port);
240 info!(
241 "Dynamic server listening on {} with {} services",
242 addr,
243 registry_arc.service_names().len()
244 );
245
246 let proxy_config = ProxyConfig::default();
248
249 let reflection_start = Instant::now();
251 let mock_proxy = MockReflectionProxy::new(proxy_config, registry_arc.clone()).await?;
252 let reflection_duration = reflection_start.elapsed();
253 info!("gRPC reflection proxy created (took {:?})", reflection_duration);
254
255 let total_startup_duration = startup_start.elapsed();
256 info!("gRPC server startup completed (total time: {:?})", total_startup_duration);
257
258 start_grpc_only_server(port, &config, registry_arc.clone(), mock_proxy).await?;
262
263 Ok(())
266}
267
268pub async fn start_dynamic_grpc_server(
270 port: u16,
271 config: DynamicGrpcConfig,
272 latency_profile: Option<mockforge_foundation::latency::LatencyProfile>,
273) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
274 let mut grpc_only_config = config;
276 grpc_only_config.http_bridge = None;
277
278 start_dynamic_server(port, grpc_only_config, latency_profile).await
279}
280
281async fn start_grpc_only_server(
283 port: u16,
284 config: &DynamicGrpcConfig,
285 registry_arc: Arc<ServiceRegistry>,
286 _mock_proxy: MockReflectionProxy,
287) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
288 use tonic::transport::{Certificate, Identity, ServerTlsConfig};
289
290 let mut server_builder = if let Some(tls_config) = &config.tls {
292 info!("Configuring gRPC server with TLS");
293
294 let cert = tokio::fs::read(&tls_config.cert_path).await.map_err(|e| {
296 error!("Failed to read TLS certificate from {}: {}", tls_config.cert_path, e);
297 Box::<dyn std::error::Error + Send + Sync>::from(format!(
298 "Failed to read TLS certificate: {}",
299 e
300 ))
301 })?;
302
303 let key = tokio::fs::read(&tls_config.key_path).await.map_err(|e| {
304 error!("Failed to read TLS key from {}: {}", tls_config.key_path, e);
305 Box::<dyn std::error::Error + Send + Sync>::from(format!(
306 "Failed to read TLS key: {}",
307 e
308 ))
309 })?;
310
311 let identity = Identity::from_pem(cert, key);
312
313 let mut tls = ServerTlsConfig::new().identity(identity);
314
315 if let Some(client_ca_path) = &tls_config.client_ca_path {
317 info!("Configuring mutual TLS (mTLS) with client certificate verification");
318 let client_ca = tokio::fs::read(client_ca_path).await.map_err(|e| {
319 error!("Failed to read client CA from {}: {}", client_ca_path, e);
320 Box::<dyn std::error::Error + Send + Sync>::from(format!(
321 "Failed to read client CA: {}",
322 e
323 ))
324 })?;
325 tls = tls.client_ca_root(Certificate::from_pem(client_ca));
326 }
327
328 Server::builder().tls_config(tls).map_err(|e| {
329 error!("Failed to configure TLS: {}", e);
330 Box::<dyn std::error::Error + Send + Sync>::from(format!(
331 "Failed to configure TLS: {}",
332 e
333 ))
334 })?
335 } else {
336 info!("gRPC server running in plaintext mode (no TLS configured)");
337 Server::builder()
338 };
339
340 use std::net::SocketAddr;
341
342 let grpc_addr: SocketAddr = mockforge_core::wildcard_socket_addr(port);
343
344 info!(
346 "Starting gRPC server on {} with {} discovered services",
347 grpc_addr,
348 registry_arc.service_names().len()
349 );
350 for service_name in registry_arc.service_names() {
351 info!(" - Dynamic service: {}", service_name);
352 }
353
354 use crate::generated::greeter_server::{Greeter, GreeterServer};
356 use crate::generated::{HelloReply, HelloRequest};
357 use tonic::{Request, Response, Status};
358
359 #[derive(Debug, Default)]
360 struct MockGreeterService;
361
362 use futures::StreamExt;
363 use std::pin::Pin;
364 use tokio_stream::wrappers::ReceiverStream;
365
366 #[tonic::async_trait]
367 impl Greeter for MockGreeterService {
368 type SayHelloStreamStream =
369 Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
370 type ChatStream = Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
371
372 async fn say_hello(
373 &self,
374 request: Request<HelloRequest>,
375 ) -> Result<Response<HelloReply>, Status> {
376 info!("gRPC say_hello request: {:?}", request);
377 let req = request.into_inner();
378 let reply = HelloReply {
379 message: format!("Hello {}! This is a mock response from MockForge", req.name),
380 metadata: None,
381 items: vec![],
382 };
383 Ok(Response::new(reply))
384 }
385
386 async fn say_hello_stream(
387 &self,
388 request: Request<HelloRequest>,
389 ) -> Result<Response<Self::SayHelloStreamStream>, Status> {
390 info!("gRPC say_hello_stream request: {:?}", request);
391 let name = request.into_inner().name;
392 let (tx, rx) = tokio::sync::mpsc::channel(128);
393 tokio::spawn(async move {
394 for i in 1..=5 {
395 let reply = HelloReply {
396 message: format!(
397 "Hello {}! Stream message {} of 5 from MockForge",
398 name, i
399 ),
400 metadata: None,
401 items: vec![],
402 };
403 if tx.send(Ok(reply)).await.is_err() {
404 break;
405 }
406 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
407 }
408 });
409 let stream = ReceiverStream::new(rx);
410 Ok(Response::new(Box::pin(stream) as Self::SayHelloStreamStream))
411 }
412
413 async fn say_hello_client_stream(
414 &self,
415 request: Request<tonic::Streaming<HelloRequest>>,
416 ) -> Result<Response<HelloReply>, Status> {
417 info!("gRPC say_hello_client_stream started");
418 let mut stream = request.into_inner();
419 let mut names = Vec::new();
420 let mut count = 0;
421 while let Some(req) = stream.next().await {
422 match req {
423 Ok(hello_request) => {
424 names.push(hello_request.name);
425 count += 1;
426 }
427 Err(e) => {
428 error!("Error receiving client stream message: {}", e);
429 return Err(Status::internal(format!("Stream error: {}", e)));
430 }
431 }
432 }
433 let message = if names.is_empty() {
434 "Hello! No names received in the stream.".to_string()
435 } else {
436 format!(
437 "Hello {}! Received {} messages from MockForge client stream.",
438 names.join(", "),
439 count
440 )
441 };
442 Ok(Response::new(HelloReply {
443 message,
444 metadata: None,
445 items: vec![],
446 }))
447 }
448
449 async fn chat(
450 &self,
451 request: Request<tonic::Streaming<HelloRequest>>,
452 ) -> Result<Response<Self::ChatStream>, Status> {
453 info!("gRPC chat (bidirectional streaming) started");
454 let mut stream = request.into_inner();
455 let (tx, rx) = tokio::sync::mpsc::channel(128);
456 tokio::spawn(async move {
457 let mut message_count = 0;
458 while let Some(req) = stream.next().await {
459 match req {
460 Ok(hello_request) => {
461 message_count += 1;
462 let reply = HelloReply {
463 message: format!(
464 "Chat response {}: Hello {}! from MockForge",
465 message_count, hello_request.name
466 ),
467 metadata: None,
468 items: vec![],
469 };
470 if tx.send(Ok(reply)).await.is_err() {
471 break;
472 }
473 }
474 Err(e) => {
475 error!("Chat stream error: {}", e);
476 let _ = tx
477 .send(Err(Status::internal(format!("Stream error: {}", e))))
478 .await;
479 break;
480 }
481 }
482 }
483 info!("Chat session ended after {} messages", message_count);
484 });
485 let output_stream = ReceiverStream::new(rx);
486 Ok(Response::new(Box::pin(output_stream) as Self::ChatStream))
487 }
488 }
489
490 let mut routes_builder = tonic::service::RoutesBuilder::default();
492 routes_builder.add_service(GreeterServer::new(MockGreeterService));
493 info!("Registered built-in Greeter service");
494
495 if config.enable_reflection {
497 let encoded_fd_set = registry_arc.descriptor_pool().encode_to_vec();
498 let reflection_service = ReflectionBuilder::configure()
499 .register_encoded_file_descriptor_set(&encoded_fd_set)
500 .build_v1()
501 .map_err(|e| {
502 error!("Failed to build reflection service: {}", e);
503 Box::<dyn std::error::Error + Send + Sync>::from(format!(
504 "Failed to build reflection service: {}",
505 e
506 ))
507 })?;
508 routes_builder.add_service(reflection_service);
509 info!("gRPC reflection service enabled");
510 }
511
512 let registry_for_fallback = registry_arc.clone();
514 let axum_router =
515 routes_builder
516 .routes()
517 .into_axum_router()
518 .fallback(move |req: axum::extract::Request| {
519 let registry = registry_for_fallback.clone();
520 async move {
521 let path = req.uri().path().to_string();
522 match router::parse_grpc_path(&path) {
523 Some((service_name, method_name)) => {
524 let body = axum::body::to_bytes(req.into_body(), 4 * 1024 * 1024)
526 .await
527 .unwrap_or_default();
528
529 match router::handle_dynamic_grpc_request(
530 ®istry,
531 service_name,
532 method_name,
533 body,
534 )
535 .await
536 {
537 Ok(response) => response,
538 Err(status) => router::create_grpc_error_response(status),
539 }
540 }
541 None => router::create_grpc_error_response(Status::unimplemented(
542 "Unknown path",
543 )),
544 }
545 }
546 });
547
548 let routes = tonic::service::Routes::from(axum_router);
550 server_builder.add_routes(routes).serve(grpc_addr).await?;
551
552 info!("gRPC server stopped");
553 Ok(())
554}
555
#[cfg(test)]
mod tests {
    /// Smoke test with an empty body: it passes whenever this module
    /// compiles under the test configuration.
    #[test]
    fn test_module_compiles() {
    }
}