1pub mod http_bridge;
8pub mod proto_parser;
9pub mod router;
10pub mod service_generator;
11
12use crate::reflection::{MockReflectionProxy, ProxyConfig};
13use proto_parser::ProtoParser;
14use service_generator::DynamicGrpcService;
15use std::collections::HashMap;
16use std::sync::Arc;
17use tonic::transport::Server;
18use tonic_reflection::server::Builder as ReflectionBuilder;
19use tracing::*;
20
21#[derive(Debug, Clone)]
23pub struct GrpcTlsConfig {
24 pub cert_path: String,
26 pub key_path: String,
28 pub client_ca_path: Option<String>,
30}
31
32impl GrpcTlsConfig {
33 pub fn new(cert_path: impl Into<String>, key_path: impl Into<String>) -> Self {
35 Self {
36 cert_path: cert_path.into(),
37 key_path: key_path.into(),
38 client_ca_path: None,
39 }
40 }
41
42 pub fn with_mtls(
44 cert_path: impl Into<String>,
45 key_path: impl Into<String>,
46 client_ca_path: impl Into<String>,
47 ) -> Self {
48 Self {
49 cert_path: cert_path.into(),
50 key_path: key_path.into(),
51 client_ca_path: Some(client_ca_path.into()),
52 }
53 }
54
55 pub fn from_env() -> Option<Self> {
62 let cert_path = std::env::var("GRPC_TLS_CERT").ok()?;
63 let key_path = std::env::var("GRPC_TLS_KEY").ok()?;
64 let client_ca_path = std::env::var("GRPC_TLS_CLIENT_CA").ok();
65
66 Some(Self {
67 cert_path,
68 key_path,
69 client_ca_path,
70 })
71 }
72}
73
74#[derive(Debug, Clone)]
76pub struct DynamicGrpcConfig {
77 pub proto_dir: String,
79 pub enable_reflection: bool,
81 pub excluded_services: Vec<String>,
83 pub http_bridge: Option<http_bridge::HttpBridgeConfig>,
85 pub tls: Option<GrpcTlsConfig>,
87}
88
89impl Default for DynamicGrpcConfig {
90 fn default() -> Self {
91 Self {
92 proto_dir: "proto".to_string(),
93 enable_reflection: false,
94 excluded_services: Vec::new(),
95 http_bridge: Some(http_bridge::HttpBridgeConfig {
96 enabled: true,
97 ..Default::default()
98 }),
99 tls: GrpcTlsConfig::from_env(),
101 }
102 }
103}
104
105#[derive(Clone)]
107pub struct ServiceRegistry {
108 services: HashMap<String, Arc<DynamicGrpcService>>,
110 descriptor_pool: prost_reflect::DescriptorPool,
112}
113
114impl Default for ServiceRegistry {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120impl ServiceRegistry {
121 pub fn descriptor_pool(&self) -> &prost_reflect::DescriptorPool {
123 &self.descriptor_pool
124 }
125
126 pub fn new() -> Self {
128 Self {
129 services: HashMap::new(),
130 descriptor_pool: prost_reflect::DescriptorPool::new(),
131 }
132 }
133
134 pub fn with_descriptor_pool(descriptor_pool: prost_reflect::DescriptorPool) -> Self {
136 Self {
137 services: HashMap::new(),
138 descriptor_pool,
139 }
140 }
141
142 pub fn set_descriptor_pool(&mut self, pool: prost_reflect::DescriptorPool) {
144 self.descriptor_pool = pool;
145 }
146
147 pub fn register(&mut self, name: String, service: DynamicGrpcService) {
149 self.services.insert(name, Arc::new(service));
150 }
151
152 pub fn get(&self, name: &str) -> Option<&Arc<DynamicGrpcService>> {
154 self.services.get(name)
155 }
156
157 pub fn service_names(&self) -> Vec<String> {
159 self.services.keys().cloned().collect()
160 }
161}
162
163pub async fn discover_services(
165 config: &DynamicGrpcConfig,
166) -> Result<ServiceRegistry, Box<dyn std::error::Error + Send + Sync>> {
167 use std::time::Instant;
168
169 let discovery_start = Instant::now();
170 info!("Discovering gRPC services from proto directory: {}", config.proto_dir);
171
172 let parse_start = Instant::now();
174 let mut parser = ProtoParser::new();
175 parser.parse_directory(&config.proto_dir).await?;
176 let parse_duration = parse_start.elapsed();
177 info!("Proto file parsing completed (took {:?})", parse_duration);
178
179 let registry_start = Instant::now();
181 let mut registry = ServiceRegistry::new();
182 let services = parser.services().clone();
184 let descriptor_pool = parser.into_pool();
185
186 registry.set_descriptor_pool(descriptor_pool);
187 let registry_duration = registry_start.elapsed();
188 debug!("Registry creation completed (took {:?})", registry_duration);
189
190 let service_reg_start = Instant::now();
192 for (service_name, proto_service) in services {
193 if config.excluded_services.contains(&service_name) {
195 info!("Skipping excluded service: {}", service_name);
196 continue;
197 }
198
199 let dynamic_service = DynamicGrpcService::new(proto_service.clone(), None);
201 registry.register(service_name.clone(), dynamic_service);
202
203 debug!("Registered service: {}", service_name);
204 }
205 let service_reg_duration = service_reg_start.elapsed();
206 info!(
207 "Service registration completed for {} services (took {:?})",
208 registry.service_names().len(),
209 service_reg_duration
210 );
211
212 let total_discovery_duration = discovery_start.elapsed();
213 info!("Service discovery completed (total time: {:?})", total_discovery_duration);
214 Ok(registry)
215}
216
217pub async fn start_dynamic_server(
219 port: u16,
220 config: DynamicGrpcConfig,
221 latency_profile: Option<mockforge_core::LatencyProfile>,
222) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
223 use std::time::Instant;
224
225 let startup_start = Instant::now();
226
227 #[cfg(feature = "data-faker")]
228 mockforge_data::provider::register_core_faker_provider();
229
230 let _latency_injector = latency_profile
231 .map(|profile| mockforge_core::latency::LatencyInjector::new(profile, Default::default()));
232
233 let registry = discover_services(&config).await?;
235 let registry_arc = Arc::new(registry);
236
237 let addr = mockforge_core::wildcard_socket_addr(port);
239 info!(
240 "Dynamic server listening on {} with {} services",
241 addr,
242 registry_arc.service_names().len()
243 );
244
245 let proxy_config = ProxyConfig::default();
247
248 let reflection_start = Instant::now();
250 let mock_proxy = MockReflectionProxy::new(proxy_config, registry_arc.clone()).await?;
251 let reflection_duration = reflection_start.elapsed();
252 info!("gRPC reflection proxy created (took {:?})", reflection_duration);
253
254 let total_startup_duration = startup_start.elapsed();
255 info!("gRPC server startup completed (total time: {:?})", total_startup_duration);
256
257 start_grpc_only_server(port, &config, registry_arc.clone(), mock_proxy).await?;
261
262 Ok(())
265}
266
267pub async fn start_dynamic_grpc_server(
269 port: u16,
270 config: DynamicGrpcConfig,
271 latency_profile: Option<mockforge_core::LatencyProfile>,
272) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
273 let mut grpc_only_config = config;
275 grpc_only_config.http_bridge = None;
276
277 start_dynamic_server(port, grpc_only_config, latency_profile).await
278}
279
280async fn start_grpc_only_server(
282 port: u16,
283 config: &DynamicGrpcConfig,
284 registry_arc: Arc<ServiceRegistry>,
285 _mock_proxy: MockReflectionProxy,
286) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
287 use tonic::transport::{Certificate, Identity, ServerTlsConfig};
288
289 let mut server_builder = if let Some(tls_config) = &config.tls {
291 info!("Configuring gRPC server with TLS");
292
293 let cert = tokio::fs::read(&tls_config.cert_path).await.map_err(|e| {
295 error!("Failed to read TLS certificate from {}: {}", tls_config.cert_path, e);
296 Box::<dyn std::error::Error + Send + Sync>::from(format!(
297 "Failed to read TLS certificate: {}",
298 e
299 ))
300 })?;
301
302 let key = tokio::fs::read(&tls_config.key_path).await.map_err(|e| {
303 error!("Failed to read TLS key from {}: {}", tls_config.key_path, e);
304 Box::<dyn std::error::Error + Send + Sync>::from(format!(
305 "Failed to read TLS key: {}",
306 e
307 ))
308 })?;
309
310 let identity = Identity::from_pem(cert, key);
311
312 let mut tls = ServerTlsConfig::new().identity(identity);
313
314 if let Some(client_ca_path) = &tls_config.client_ca_path {
316 info!("Configuring mutual TLS (mTLS) with client certificate verification");
317 let client_ca = tokio::fs::read(client_ca_path).await.map_err(|e| {
318 error!("Failed to read client CA from {}: {}", client_ca_path, e);
319 Box::<dyn std::error::Error + Send + Sync>::from(format!(
320 "Failed to read client CA: {}",
321 e
322 ))
323 })?;
324 tls = tls.client_ca_root(Certificate::from_pem(client_ca));
325 }
326
327 Server::builder().tls_config(tls).map_err(|e| {
328 error!("Failed to configure TLS: {}", e);
329 Box::<dyn std::error::Error + Send + Sync>::from(format!(
330 "Failed to configure TLS: {}",
331 e
332 ))
333 })?
334 } else {
335 info!("gRPC server running in plaintext mode (no TLS configured)");
336 Server::builder()
337 };
338
339 use std::net::SocketAddr;
340
341 let grpc_addr: SocketAddr = mockforge_core::wildcard_socket_addr(port);
342
343 info!(
345 "Starting gRPC server on {} with {} discovered services",
346 grpc_addr,
347 registry_arc.service_names().len()
348 );
349 for service_name in registry_arc.service_names() {
350 info!(" - Dynamic service: {}", service_name);
351 }
352
353 use crate::generated::greeter_server::{Greeter, GreeterServer};
355 use crate::generated::{HelloReply, HelloRequest};
356 use tonic::{Request, Response, Status};
357
358 #[derive(Debug, Default)]
359 struct MockGreeterService;
360
361 use futures::StreamExt;
362 use std::pin::Pin;
363 use tokio_stream::wrappers::ReceiverStream;
364
365 #[tonic::async_trait]
366 impl Greeter for MockGreeterService {
367 type SayHelloStreamStream =
368 Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
369 type ChatStream = Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
370
371 async fn say_hello(
372 &self,
373 request: Request<HelloRequest>,
374 ) -> Result<Response<HelloReply>, Status> {
375 info!("gRPC say_hello request: {:?}", request);
376 let req = request.into_inner();
377 let reply = HelloReply {
378 message: format!("Hello {}! This is a mock response from MockForge", req.name),
379 metadata: None,
380 items: vec![],
381 };
382 Ok(Response::new(reply))
383 }
384
385 async fn say_hello_stream(
386 &self,
387 request: Request<HelloRequest>,
388 ) -> Result<Response<Self::SayHelloStreamStream>, Status> {
389 info!("gRPC say_hello_stream request: {:?}", request);
390 let name = request.into_inner().name;
391 let (tx, rx) = tokio::sync::mpsc::channel(128);
392 tokio::spawn(async move {
393 for i in 1..=5 {
394 let reply = HelloReply {
395 message: format!(
396 "Hello {}! Stream message {} of 5 from MockForge",
397 name, i
398 ),
399 metadata: None,
400 items: vec![],
401 };
402 if tx.send(Ok(reply)).await.is_err() {
403 break;
404 }
405 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
406 }
407 });
408 let stream = ReceiverStream::new(rx);
409 Ok(Response::new(Box::pin(stream) as Self::SayHelloStreamStream))
410 }
411
412 async fn say_hello_client_stream(
413 &self,
414 request: Request<tonic::Streaming<HelloRequest>>,
415 ) -> Result<Response<HelloReply>, Status> {
416 info!("gRPC say_hello_client_stream started");
417 let mut stream = request.into_inner();
418 let mut names = Vec::new();
419 let mut count = 0;
420 while let Some(req) = stream.next().await {
421 match req {
422 Ok(hello_request) => {
423 names.push(hello_request.name);
424 count += 1;
425 }
426 Err(e) => {
427 error!("Error receiving client stream message: {}", e);
428 return Err(Status::internal(format!("Stream error: {}", e)));
429 }
430 }
431 }
432 let message = if names.is_empty() {
433 "Hello! No names received in the stream.".to_string()
434 } else {
435 format!(
436 "Hello {}! Received {} messages from MockForge client stream.",
437 names.join(", "),
438 count
439 )
440 };
441 Ok(Response::new(HelloReply {
442 message,
443 metadata: None,
444 items: vec![],
445 }))
446 }
447
448 async fn chat(
449 &self,
450 request: Request<tonic::Streaming<HelloRequest>>,
451 ) -> Result<Response<Self::ChatStream>, Status> {
452 info!("gRPC chat (bidirectional streaming) started");
453 let mut stream = request.into_inner();
454 let (tx, rx) = tokio::sync::mpsc::channel(128);
455 tokio::spawn(async move {
456 let mut message_count = 0;
457 while let Some(req) = stream.next().await {
458 match req {
459 Ok(hello_request) => {
460 message_count += 1;
461 let reply = HelloReply {
462 message: format!(
463 "Chat response {}: Hello {}! from MockForge",
464 message_count, hello_request.name
465 ),
466 metadata: None,
467 items: vec![],
468 };
469 if tx.send(Ok(reply)).await.is_err() {
470 break;
471 }
472 }
473 Err(e) => {
474 error!("Chat stream error: {}", e);
475 let _ = tx
476 .send(Err(Status::internal(format!("Stream error: {}", e))))
477 .await;
478 break;
479 }
480 }
481 }
482 info!("Chat session ended after {} messages", message_count);
483 });
484 let output_stream = ReceiverStream::new(rx);
485 Ok(Response::new(Box::pin(output_stream) as Self::ChatStream))
486 }
487 }
488
489 let mut routes_builder = tonic::service::RoutesBuilder::default();
491 routes_builder.add_service(GreeterServer::new(MockGreeterService));
492 info!("Registered built-in Greeter service");
493
494 if config.enable_reflection {
496 let encoded_fd_set = registry_arc.descriptor_pool().encode_to_vec();
497 let reflection_service = ReflectionBuilder::configure()
498 .register_encoded_file_descriptor_set(&encoded_fd_set)
499 .build_v1()
500 .map_err(|e| {
501 error!("Failed to build reflection service: {}", e);
502 Box::<dyn std::error::Error + Send + Sync>::from(format!(
503 "Failed to build reflection service: {}",
504 e
505 ))
506 })?;
507 routes_builder.add_service(reflection_service);
508 info!("gRPC reflection service enabled");
509 }
510
511 let registry_for_fallback = registry_arc.clone();
513 let axum_router =
514 routes_builder
515 .routes()
516 .into_axum_router()
517 .fallback(move |req: axum::extract::Request| {
518 let registry = registry_for_fallback.clone();
519 async move {
520 let path = req.uri().path().to_string();
521 match router::parse_grpc_path(&path) {
522 Some((service_name, method_name)) => {
523 let body = axum::body::to_bytes(req.into_body(), 4 * 1024 * 1024)
525 .await
526 .unwrap_or_default();
527
528 match router::handle_dynamic_grpc_request(
529 ®istry,
530 service_name,
531 method_name,
532 body,
533 )
534 .await
535 {
536 Ok(response) => response,
537 Err(status) => router::create_grpc_error_response(status),
538 }
539 }
540 None => router::create_grpc_error_response(Status::unimplemented(
541 "Unknown path",
542 )),
543 }
544 }
545 });
546
547 let routes = tonic::service::Routes::from(axum_router);
549 server_builder.add_routes(routes).serve(grpc_addr).await?;
550
551 info!("gRPC server stopped");
552 Ok(())
553}
554
555#[cfg(test)]
558mod tests {
559 #[test]
560 fn test_module_compiles() {
561 }
563}