1pub mod http_bridge;
8pub mod proto_parser;
9pub mod router;
10pub mod service_generator;
11
12use crate::reflection::{MockReflectionProxy, ProxyConfig};
13use proto_parser::ProtoParser;
14use service_generator::DynamicGrpcService;
15use std::collections::HashMap;
16use std::sync::Arc;
17use tonic::transport::Server;
18use tonic_reflection::server::Builder as ReflectionBuilder;
19use tracing::*;
20
/// TLS settings for the gRPC server.
///
/// Only file paths are stored here; constructing a value performs no file I/O.
#[derive(Debug, Clone)]
pub struct GrpcTlsConfig {
    /// Path to the server certificate file.
    pub cert_path: String,
    /// Path to the server private key file.
    pub key_path: String,
    /// Path to the CA certificate used to verify client certificates.
    /// `None` means no client CA is configured (no mutual TLS).
    pub client_ca_path: Option<String>,
}

impl GrpcTlsConfig {
    /// Creates a configuration with a server certificate and key and no client CA.
    pub fn new(cert_path: impl Into<String>, key_path: impl Into<String>) -> Self {
        let cert_path = cert_path.into();
        let key_path = key_path.into();
        Self {
            cert_path,
            key_path,
            client_ca_path: None,
        }
    }

    /// Creates a configuration for mutual TLS: server certificate, server key
    /// and the CA certificate that client certificates are checked against.
    pub fn with_mtls(
        cert_path: impl Into<String>,
        key_path: impl Into<String>,
        client_ca_path: impl Into<String>,
    ) -> Self {
        let mut tls = Self::new(cert_path, key_path);
        tls.client_ca_path = Some(client_ca_path.into());
        tls
    }

    /// Builds a configuration from environment variables.
    ///
    /// Reads `GRPC_TLS_CERT` and `GRPC_TLS_KEY` (both required) and
    /// `GRPC_TLS_CLIENT_CA` (optional). Returns `None` when either required
    /// variable cannot be read, which includes values that are not valid Unicode.
    pub fn from_env() -> Option<Self> {
        let lookup = |name: &str| std::env::var(name).ok();

        lookup("GRPC_TLS_CERT")
            .zip(lookup("GRPC_TLS_KEY"))
            .map(|(cert_path, key_path)| Self {
                cert_path,
                key_path,
                client_ca_path: lookup("GRPC_TLS_CLIENT_CA"),
            })
    }
}
73
74#[derive(Debug, Clone)]
76pub struct DynamicGrpcConfig {
77 pub proto_dir: String,
79 pub enable_reflection: bool,
81 pub excluded_services: Vec<String>,
83 pub http_bridge: Option<http_bridge::HttpBridgeConfig>,
85 pub tls: Option<GrpcTlsConfig>,
87 pub overrides: Vec<mockforge_core::config::GrpcOverride>,
90}
91
92impl Default for DynamicGrpcConfig {
93 fn default() -> Self {
94 Self {
95 proto_dir: "proto".to_string(),
96 enable_reflection: false,
97 excluded_services: Vec::new(),
98 overrides: Vec::new(),
99 http_bridge: Some(http_bridge::HttpBridgeConfig {
100 enabled: true,
101 ..Default::default()
102 }),
103 tls: GrpcTlsConfig::from_env(),
105 }
106 }
107}
108
109#[derive(Clone)]
111pub struct ServiceRegistry {
112 services: HashMap<String, Arc<DynamicGrpcService>>,
114 descriptor_pool: prost_reflect::DescriptorPool,
116 overrides: Arc<Vec<mockforge_core::config::GrpcOverride>>,
119}
120
121impl Default for ServiceRegistry {
122 fn default() -> Self {
123 Self::new()
124 }
125}
126
127impl ServiceRegistry {
128 pub fn descriptor_pool(&self) -> &prost_reflect::DescriptorPool {
130 &self.descriptor_pool
131 }
132
133 pub fn new() -> Self {
135 Self {
136 services: HashMap::new(),
137 descriptor_pool: prost_reflect::DescriptorPool::new(),
138 overrides: Arc::new(Vec::new()),
139 }
140 }
141
142 pub fn with_descriptor_pool(descriptor_pool: prost_reflect::DescriptorPool) -> Self {
144 Self {
145 services: HashMap::new(),
146 descriptor_pool,
147 overrides: Arc::new(Vec::new()),
148 }
149 }
150
151 pub fn set_overrides(&mut self, overrides: Vec<mockforge_core::config::GrpcOverride>) {
154 self.overrides = Arc::new(overrides);
155 }
156
157 pub fn overrides(&self) -> &[mockforge_core::config::GrpcOverride] {
159 self.overrides.as_ref()
160 }
161
162 pub fn set_descriptor_pool(&mut self, pool: prost_reflect::DescriptorPool) {
164 self.descriptor_pool = pool;
165 }
166
167 pub fn register(&mut self, name: String, service: DynamicGrpcService) {
169 self.services.insert(name, Arc::new(service));
170 }
171
172 pub fn get(&self, name: &str) -> Option<&Arc<DynamicGrpcService>> {
174 self.services.get(name)
175 }
176
177 pub fn service_names(&self) -> Vec<String> {
179 self.services.keys().cloned().collect()
180 }
181}
182
183pub async fn discover_services(
185 config: &DynamicGrpcConfig,
186) -> Result<ServiceRegistry, Box<dyn std::error::Error + Send + Sync>> {
187 use std::time::Instant;
188
189 let discovery_start = Instant::now();
190 info!("Discovering gRPC services from proto directory: {}", config.proto_dir);
191
192 let parse_start = Instant::now();
194 let mut parser = ProtoParser::new();
195 parser.parse_directory(&config.proto_dir).await?;
196 let parse_duration = parse_start.elapsed();
197 info!("Proto file parsing completed (took {:?})", parse_duration);
198
199 let registry_start = Instant::now();
201 let mut registry = ServiceRegistry::new();
202 let services = parser.services().clone();
204 let descriptor_pool = parser.into_pool();
205
206 registry.set_descriptor_pool(descriptor_pool);
207 if !config.overrides.is_empty() {
208 info!("Loaded {} gRPC method override rule(s) from config", config.overrides.len());
209 registry.set_overrides(config.overrides.clone());
210 }
211 let registry_duration = registry_start.elapsed();
212 debug!("Registry creation completed (took {:?})", registry_duration);
213
214 let service_reg_start = Instant::now();
216 for (service_name, proto_service) in services {
217 if config.excluded_services.contains(&service_name) {
219 info!("Skipping excluded service: {}", service_name);
220 continue;
221 }
222
223 let dynamic_service = DynamicGrpcService::new(proto_service.clone(), None);
225 registry.register(service_name.clone(), dynamic_service);
226
227 debug!("Registered service: {}", service_name);
228 }
229 let service_reg_duration = service_reg_start.elapsed();
230 info!(
231 "Service registration completed for {} services (took {:?})",
232 registry.service_names().len(),
233 service_reg_duration
234 );
235
236 let total_discovery_duration = discovery_start.elapsed();
237 info!("Service discovery completed (total time: {:?})", total_discovery_duration);
238 Ok(registry)
239}
240
241pub async fn start_dynamic_server(
243 port: u16,
244 config: DynamicGrpcConfig,
245 latency_profile: Option<mockforge_foundation::latency::LatencyProfile>,
246) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
247 use std::time::Instant;
248
249 let startup_start = Instant::now();
250
251 #[cfg(feature = "data-faker")]
252 mockforge_data::provider::register_core_faker_provider();
253
254 let _latency_injector = latency_profile.map(|profile| {
255 mockforge_foundation::latency::LatencyInjector::new(profile, Default::default())
256 });
257
258 let registry = discover_services(&config).await?;
260 let registry_arc = Arc::new(registry);
261
262 let addr = mockforge_core::wildcard_socket_addr(port);
264 info!(
265 "Dynamic server listening on {} with {} services",
266 addr,
267 registry_arc.service_names().len()
268 );
269
270 let proxy_config = ProxyConfig::default();
272
273 let reflection_start = Instant::now();
275 let mock_proxy = MockReflectionProxy::new(proxy_config, registry_arc.clone()).await?;
276 let reflection_duration = reflection_start.elapsed();
277 info!("gRPC reflection proxy created (took {:?})", reflection_duration);
278
279 let total_startup_duration = startup_start.elapsed();
280 info!("gRPC server startup completed (total time: {:?})", total_startup_duration);
281
282 start_grpc_only_server(port, &config, registry_arc.clone(), mock_proxy).await?;
286
287 Ok(())
290}
291
292pub async fn start_dynamic_grpc_server(
294 port: u16,
295 config: DynamicGrpcConfig,
296 latency_profile: Option<mockforge_foundation::latency::LatencyProfile>,
297) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
298 let mut grpc_only_config = config;
300 grpc_only_config.http_bridge = None;
301
302 start_dynamic_server(port, grpc_only_config, latency_profile).await
303}
304
305async fn start_grpc_only_server(
307 port: u16,
308 config: &DynamicGrpcConfig,
309 registry_arc: Arc<ServiceRegistry>,
310 _mock_proxy: MockReflectionProxy,
311) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
312 use tonic::transport::{Certificate, Identity, ServerTlsConfig};
313
314 let mut server_builder = if let Some(tls_config) = &config.tls {
316 info!("Configuring gRPC server with TLS");
317
318 let cert = tokio::fs::read(&tls_config.cert_path).await.map_err(|e| {
320 error!("Failed to read TLS certificate from {}: {}", tls_config.cert_path, e);
321 Box::<dyn std::error::Error + Send + Sync>::from(format!(
322 "Failed to read TLS certificate: {}",
323 e
324 ))
325 })?;
326
327 let key = tokio::fs::read(&tls_config.key_path).await.map_err(|e| {
328 error!("Failed to read TLS key from {}: {}", tls_config.key_path, e);
329 Box::<dyn std::error::Error + Send + Sync>::from(format!(
330 "Failed to read TLS key: {}",
331 e
332 ))
333 })?;
334
335 let identity = Identity::from_pem(cert, key);
336
337 let mut tls = ServerTlsConfig::new().identity(identity);
338
339 if let Some(client_ca_path) = &tls_config.client_ca_path {
341 info!("Configuring mutual TLS (mTLS) with client certificate verification");
342 let client_ca = tokio::fs::read(client_ca_path).await.map_err(|e| {
343 error!("Failed to read client CA from {}: {}", client_ca_path, e);
344 Box::<dyn std::error::Error + Send + Sync>::from(format!(
345 "Failed to read client CA: {}",
346 e
347 ))
348 })?;
349 tls = tls.client_ca_root(Certificate::from_pem(client_ca));
350 }
351
352 Server::builder().tls_config(tls).map_err(|e| {
353 error!("Failed to configure TLS: {}", e);
354 Box::<dyn std::error::Error + Send + Sync>::from(format!(
355 "Failed to configure TLS: {}",
356 e
357 ))
358 })?
359 } else {
360 info!("gRPC server running in plaintext mode (no TLS configured)");
361 Server::builder()
362 };
363
364 use std::net::SocketAddr;
365
366 let grpc_addr: SocketAddr = mockforge_core::wildcard_socket_addr(port);
367
368 info!(
370 "Starting gRPC server on {} with {} discovered services",
371 grpc_addr,
372 registry_arc.service_names().len()
373 );
374 for service_name in registry_arc.service_names() {
375 info!(" - Dynamic service: {}", service_name);
376 }
377
378 use crate::generated::greeter_server::{Greeter, GreeterServer};
380 use crate::generated::{HelloReply, HelloRequest};
381 use tonic::{Request, Response, Status};
382
383 #[derive(Debug, Default)]
384 struct MockGreeterService;
385
386 use futures::StreamExt;
387 use std::pin::Pin;
388 use tokio_stream::wrappers::ReceiverStream;
389
390 #[tonic::async_trait]
391 impl Greeter for MockGreeterService {
392 type SayHelloStreamStream =
393 Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
394 type ChatStream = Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
395
396 async fn say_hello(
397 &self,
398 request: Request<HelloRequest>,
399 ) -> Result<Response<HelloReply>, Status> {
400 info!("gRPC say_hello request: {:?}", request);
401 let req = request.into_inner();
402 let reply = HelloReply {
403 message: format!("Hello {}! This is a mock response from MockForge", req.name),
404 metadata: None,
405 items: vec![],
406 };
407 Ok(Response::new(reply))
408 }
409
410 async fn say_hello_stream(
411 &self,
412 request: Request<HelloRequest>,
413 ) -> Result<Response<Self::SayHelloStreamStream>, Status> {
414 info!("gRPC say_hello_stream request: {:?}", request);
415 let name = request.into_inner().name;
416 let (tx, rx) = tokio::sync::mpsc::channel(128);
417 tokio::spawn(async move {
418 for i in 1..=5 {
419 let reply = HelloReply {
420 message: format!(
421 "Hello {}! Stream message {} of 5 from MockForge",
422 name, i
423 ),
424 metadata: None,
425 items: vec![],
426 };
427 if tx.send(Ok(reply)).await.is_err() {
428 break;
429 }
430 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
431 }
432 });
433 let stream = ReceiverStream::new(rx);
434 Ok(Response::new(Box::pin(stream) as Self::SayHelloStreamStream))
435 }
436
437 async fn say_hello_client_stream(
438 &self,
439 request: Request<tonic::Streaming<HelloRequest>>,
440 ) -> Result<Response<HelloReply>, Status> {
441 info!("gRPC say_hello_client_stream started");
442 let mut stream = request.into_inner();
443 let mut names = Vec::new();
444 let mut count = 0;
445 while let Some(req) = stream.next().await {
446 match req {
447 Ok(hello_request) => {
448 names.push(hello_request.name);
449 count += 1;
450 }
451 Err(e) => {
452 error!("Error receiving client stream message: {}", e);
453 return Err(Status::internal(format!("Stream error: {}", e)));
454 }
455 }
456 }
457 let message = if names.is_empty() {
458 "Hello! No names received in the stream.".to_string()
459 } else {
460 format!(
461 "Hello {}! Received {} messages from MockForge client stream.",
462 names.join(", "),
463 count
464 )
465 };
466 Ok(Response::new(HelloReply {
467 message,
468 metadata: None,
469 items: vec![],
470 }))
471 }
472
473 async fn chat(
474 &self,
475 request: Request<tonic::Streaming<HelloRequest>>,
476 ) -> Result<Response<Self::ChatStream>, Status> {
477 info!("gRPC chat (bidirectional streaming) started");
478 let mut stream = request.into_inner();
479 let (tx, rx) = tokio::sync::mpsc::channel(128);
480 tokio::spawn(async move {
481 let mut message_count = 0;
482 while let Some(req) = stream.next().await {
483 match req {
484 Ok(hello_request) => {
485 message_count += 1;
486 let reply = HelloReply {
487 message: format!(
488 "Chat response {}: Hello {}! from MockForge",
489 message_count, hello_request.name
490 ),
491 metadata: None,
492 items: vec![],
493 };
494 if tx.send(Ok(reply)).await.is_err() {
495 break;
496 }
497 }
498 Err(e) => {
499 error!("Chat stream error: {}", e);
500 let _ = tx
501 .send(Err(Status::internal(format!("Stream error: {}", e))))
502 .await;
503 break;
504 }
505 }
506 }
507 info!("Chat session ended after {} messages", message_count);
508 });
509 let output_stream = ReceiverStream::new(rx);
510 Ok(Response::new(Box::pin(output_stream) as Self::ChatStream))
511 }
512 }
513
514 let mut routes_builder = tonic::service::RoutesBuilder::default();
516 routes_builder.add_service(GreeterServer::new(MockGreeterService));
517 info!("Registered built-in Greeter service");
518
519 if config.enable_reflection {
521 let encoded_fd_set = registry_arc.descriptor_pool().encode_to_vec();
522 let reflection_service = ReflectionBuilder::configure()
523 .register_encoded_file_descriptor_set(&encoded_fd_set)
524 .build_v1()
525 .map_err(|e| {
526 error!("Failed to build reflection service: {}", e);
527 Box::<dyn std::error::Error + Send + Sync>::from(format!(
528 "Failed to build reflection service: {}",
529 e
530 ))
531 })?;
532 routes_builder.add_service(reflection_service);
533 info!("gRPC reflection service enabled");
534 }
535
536 let registry_for_fallback = registry_arc.clone();
538 let axum_router =
539 routes_builder
540 .routes()
541 .into_axum_router()
542 .fallback(move |req: axum::extract::Request| {
543 let registry = registry_for_fallback.clone();
544 async move {
545 let path = req.uri().path().to_string();
546 match router::parse_grpc_path(&path) {
547 Some((service_name, method_name)) => {
548 let body = axum::body::to_bytes(req.into_body(), 4 * 1024 * 1024)
550 .await
551 .unwrap_or_default();
552
553 match router::handle_dynamic_grpc_request(
554 ®istry,
555 service_name,
556 method_name,
557 body,
558 )
559 .await
560 {
561 Ok(response) => response,
562 Err(status) => router::create_grpc_error_response(status),
563 }
564 }
565 None => router::create_grpc_error_response(Status::unimplemented(
566 "Unknown path",
567 )),
568 }
569 }
570 });
571
572 let routes = tonic::service::Routes::from(axum_router);
574 server_builder.add_routes(routes).serve(grpc_addr).await?;
575
576 info!("gRPC server stopped");
577 Ok(())
578}
579
580#[cfg(test)]
583mod tests {
584 #[test]
585 fn test_module_compiles() {
586 }
588}