1pub mod http_bridge;
8pub mod proto_parser;
9pub mod service_generator;
10
11use crate::reflection::{MockReflectionProxy, ProxyConfig};
12use proto_parser::ProtoParser;
13use service_generator::DynamicGrpcService;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tonic::transport::Server;
17use tonic_reflection::server::Builder as ReflectionBuilder;
18use tracing::*;
19
/// File-system locations of the PEM material used to secure the gRPC server.
///
/// When `client_ca_path` is `Some`, the server additionally verifies client
/// certificates against that CA (mutual TLS).
#[derive(Debug, Clone)]
pub struct GrpcTlsConfig {
    /// Path to the server certificate (PEM).
    pub cert_path: String,
    /// Path to the server private key (PEM).
    pub key_path: String,
    /// Optional path to the CA certificate used to verify clients (enables mTLS).
    pub client_ca_path: Option<String>,
}

impl GrpcTlsConfig {
    /// Creates a server-only TLS configuration (no client certificate verification).
    pub fn new(cert_path: impl Into<String>, key_path: impl Into<String>) -> Self {
        let cert_path = cert_path.into();
        let key_path = key_path.into();
        Self {
            cert_path,
            key_path,
            client_ca_path: None,
        }
    }

    /// Creates a mutual-TLS configuration: server identity plus a client CA.
    pub fn with_mtls(
        cert_path: impl Into<String>,
        key_path: impl Into<String>,
        client_ca_path: impl Into<String>,
    ) -> Self {
        let mut tls = Self::new(cert_path, key_path);
        tls.client_ca_path = Some(client_ca_path.into());
        tls
    }

    /// Builds a configuration from environment variables.
    ///
    /// `GRPC_TLS_CERT` and `GRPC_TLS_KEY` are both required; if either cannot be
    /// read (unset or not valid Unicode) the result is `None`.
    /// `GRPC_TLS_CLIENT_CA` is optional and, when readable, enables mTLS.
    pub fn from_env() -> Option<Self> {
        let lookup = |name: &str| std::env::var(name).ok();

        match (lookup("GRPC_TLS_CERT"), lookup("GRPC_TLS_KEY")) {
            (Some(cert_path), Some(key_path)) => Some(Self {
                cert_path,
                key_path,
                client_ca_path: lookup("GRPC_TLS_CLIENT_CA"),
            }),
            _ => None,
        }
    }
}
72
73#[derive(Debug, Clone)]
75pub struct DynamicGrpcConfig {
76 pub proto_dir: String,
78 pub enable_reflection: bool,
80 pub excluded_services: Vec<String>,
82 pub http_bridge: Option<http_bridge::HttpBridgeConfig>,
84 pub tls: Option<GrpcTlsConfig>,
86}
87
88impl Default for DynamicGrpcConfig {
89 fn default() -> Self {
90 Self {
91 proto_dir: "proto".to_string(),
92 enable_reflection: false,
93 excluded_services: Vec::new(),
94 http_bridge: Some(http_bridge::HttpBridgeConfig {
95 enabled: true,
96 ..Default::default()
97 }),
98 tls: GrpcTlsConfig::from_env(),
100 }
101 }
102}
103
104#[derive(Clone)]
106pub struct ServiceRegistry {
107 services: HashMap<String, Arc<DynamicGrpcService>>,
109 descriptor_pool: prost_reflect::DescriptorPool,
111}
112
113impl Default for ServiceRegistry {
114 fn default() -> Self {
115 Self::new()
116 }
117}
118
119impl ServiceRegistry {
120 pub fn descriptor_pool(&self) -> &prost_reflect::DescriptorPool {
122 &self.descriptor_pool
123 }
124
125 pub fn new() -> Self {
127 Self {
128 services: HashMap::new(),
129 descriptor_pool: prost_reflect::DescriptorPool::new(),
130 }
131 }
132
133 pub fn with_descriptor_pool(descriptor_pool: prost_reflect::DescriptorPool) -> Self {
135 Self {
136 services: HashMap::new(),
137 descriptor_pool,
138 }
139 }
140
141 pub fn set_descriptor_pool(&mut self, pool: prost_reflect::DescriptorPool) {
143 self.descriptor_pool = pool;
144 }
145
146 pub fn register(&mut self, name: String, service: DynamicGrpcService) {
148 self.services.insert(name, Arc::new(service));
149 }
150
151 pub fn get(&self, name: &str) -> Option<&Arc<DynamicGrpcService>> {
153 self.services.get(name)
154 }
155
156 pub fn service_names(&self) -> Vec<String> {
158 self.services.keys().cloned().collect()
159 }
160}
161
162pub async fn discover_services(
164 config: &DynamicGrpcConfig,
165) -> Result<ServiceRegistry, Box<dyn std::error::Error + Send + Sync>> {
166 use std::time::Instant;
167
168 let discovery_start = Instant::now();
169 info!("Discovering gRPC services from proto directory: {}", config.proto_dir);
170
171 let parse_start = Instant::now();
173 let mut parser = ProtoParser::new();
174 parser.parse_directory(&config.proto_dir).await?;
175 let parse_duration = parse_start.elapsed();
176 info!("Proto file parsing completed (took {:?})", parse_duration);
177
178 let registry_start = Instant::now();
180 let mut registry = ServiceRegistry::new();
181 let services = parser.services().clone();
183 let descriptor_pool = parser.into_pool();
184
185 registry.set_descriptor_pool(descriptor_pool);
186 let registry_duration = registry_start.elapsed();
187 debug!("Registry creation completed (took {:?})", registry_duration);
188
189 let service_reg_start = Instant::now();
191 for (service_name, proto_service) in services {
192 if config.excluded_services.contains(&service_name) {
194 info!("Skipping excluded service: {}", service_name);
195 continue;
196 }
197
198 let dynamic_service = DynamicGrpcService::new(proto_service.clone(), None);
200 registry.register(service_name.clone(), dynamic_service);
201
202 debug!("Registered service: {}", service_name);
203 }
204 let service_reg_duration = service_reg_start.elapsed();
205 info!(
206 "Service registration completed for {} services (took {:?})",
207 registry.service_names().len(),
208 service_reg_duration
209 );
210
211 let total_discovery_duration = discovery_start.elapsed();
212 info!("Service discovery completed (total time: {:?})", total_discovery_duration);
213 Ok(registry)
214}
215
216pub async fn start_dynamic_server(
218 port: u16,
219 config: DynamicGrpcConfig,
220 latency_profile: Option<mockforge_core::LatencyProfile>,
221) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
222 use std::path::Path;
223 use std::time::Instant;
224
225 let proto_path = Path::new(&config.proto_dir);
227 if !proto_path.exists() {
228 info!(
229 "Proto directory '{}' does not exist. gRPC server will not start. \
230 This is normal when using only HTTP/OpenAPI mocking.",
231 config.proto_dir
232 );
233 return Ok(());
235 }
236
237 let startup_start = Instant::now();
238
239 #[cfg(feature = "data-faker")]
240 mockforge_data::provider::register_core_faker_provider();
241
242 let _latency_injector = latency_profile
243 .map(|profile| mockforge_core::latency::LatencyInjector::new(profile, Default::default()));
244
245 let registry = discover_services(&config).await?;
247 let registry_arc = Arc::new(registry);
248
249 let addr = mockforge_core::wildcard_socket_addr(port);
251 info!(
252 "Dynamic server listening on {} with {} services",
253 addr,
254 registry_arc.service_names().len()
255 );
256
257 let proxy_config = ProxyConfig::default();
259
260 let reflection_start = Instant::now();
262 let mock_proxy = MockReflectionProxy::new(proxy_config, registry_arc.clone()).await?;
263 let reflection_duration = reflection_start.elapsed();
264 info!("gRPC reflection proxy created (took {:?})", reflection_duration);
265
266 let total_startup_duration = startup_start.elapsed();
267 info!("gRPC server startup completed (total time: {:?})", total_startup_duration);
268
269 start_grpc_only_server(port, &config, registry_arc.clone(), mock_proxy).await?;
273
274 Ok(())
277}
278
279pub async fn start_dynamic_grpc_server(
281 port: u16,
282 config: DynamicGrpcConfig,
283 latency_profile: Option<mockforge_core::LatencyProfile>,
284) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
285 let mut grpc_only_config = config;
287 grpc_only_config.http_bridge = None;
288
289 start_dynamic_server(port, grpc_only_config, latency_profile).await
290}
291
292async fn start_grpc_only_server(
294 port: u16,
295 config: &DynamicGrpcConfig,
296 registry_arc: Arc<ServiceRegistry>,
297 _mock_proxy: MockReflectionProxy,
298) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
299 use tonic::transport::{Certificate, Identity, ServerTlsConfig};
300
301 let mut server_builder = if let Some(tls_config) = &config.tls {
303 info!("Configuring gRPC server with TLS");
304
305 let cert = tokio::fs::read(&tls_config.cert_path).await.map_err(|e| {
307 error!("Failed to read TLS certificate from {}: {}", tls_config.cert_path, e);
308 Box::<dyn std::error::Error + Send + Sync>::from(format!(
309 "Failed to read TLS certificate: {}",
310 e
311 ))
312 })?;
313
314 let key = tokio::fs::read(&tls_config.key_path).await.map_err(|e| {
315 error!("Failed to read TLS key from {}: {}", tls_config.key_path, e);
316 Box::<dyn std::error::Error + Send + Sync>::from(format!(
317 "Failed to read TLS key: {}",
318 e
319 ))
320 })?;
321
322 let identity = Identity::from_pem(cert, key);
323
324 let mut tls = ServerTlsConfig::new().identity(identity);
325
326 if let Some(client_ca_path) = &tls_config.client_ca_path {
328 info!("Configuring mutual TLS (mTLS) with client certificate verification");
329 let client_ca = tokio::fs::read(client_ca_path).await.map_err(|e| {
330 error!("Failed to read client CA from {}: {}", client_ca_path, e);
331 Box::<dyn std::error::Error + Send + Sync>::from(format!(
332 "Failed to read client CA: {}",
333 e
334 ))
335 })?;
336 tls = tls.client_ca_root(Certificate::from_pem(client_ca));
337 }
338
339 Server::builder().tls_config(tls).map_err(|e| {
340 error!("Failed to configure TLS: {}", e);
341 Box::<dyn std::error::Error + Send + Sync>::from(format!(
342 "Failed to configure TLS: {}",
343 e
344 ))
345 })?
346 } else {
347 info!("gRPC server running in plaintext mode (no TLS configured)");
348 Server::builder()
349 };
350
351 info!(
353 "Starting gRPC server on {} with {} discovered services",
354 mockforge_core::wildcard_socket_addr(port),
355 registry_arc.service_names().len()
356 );
357
358 for service_name in registry_arc.service_names() {
360 info!(" - Service: {}", service_name);
361 }
362
363 use std::net::SocketAddr;
366
367 let grpc_addr: SocketAddr = mockforge_core::wildcard_socket_addr(port);
368
369 info!("gRPC server listening on {} (basic implementation)", grpc_addr);
370 info!("Discovered services are logged but not yet fully implemented:");
371 for service_name in registry_arc.service_names() {
372 info!(" - {}", service_name);
373 }
374
375 use crate::generated::greeter_server::{Greeter, GreeterServer};
377 use crate::generated::{HelloReply, HelloRequest};
378 use tonic::{Request, Response, Status};
379
380 #[derive(Debug, Default)]
382 pub struct MockGreeterService;
383
384 use futures::StreamExt;
385 use std::pin::Pin;
386 use tokio_stream::wrappers::ReceiverStream;
387
388 #[tonic::async_trait]
389 impl Greeter for MockGreeterService {
390 type SayHelloStreamStream =
391 Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
392 type ChatStream = Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
393
394 async fn say_hello(
395 &self,
396 request: Request<HelloRequest>,
397 ) -> Result<Response<HelloReply>, Status> {
398 info!("gRPC say_hello request: {:?}", request);
399
400 let req = request.into_inner();
401 let reply = HelloReply {
402 message: format!("Hello {}! This is a mock response from MockForge", req.name),
403 metadata: None,
404 items: vec![],
405 };
406
407 Ok(Response::new(reply))
408 }
409
410 async fn say_hello_stream(
412 &self,
413 request: Request<HelloRequest>,
414 ) -> Result<Response<Self::SayHelloStreamStream>, Status> {
415 info!("gRPC say_hello_stream request: {:?}", request);
416 let req = request.into_inner();
417 let name = req.name.clone();
418
419 let (tx, rx) = tokio::sync::mpsc::channel(128);
421
422 tokio::spawn(async move {
424 for i in 1..=5 {
425 let reply = HelloReply {
426 message: format!(
427 "Hello {}! Stream message {} of 5 from MockForge",
428 name, i
429 ),
430 metadata: None,
431 items: vec![],
432 };
433
434 if tx.send(Ok(reply)).await.is_err() {
435 break;
437 }
438
439 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
441 }
442 });
443
444 let stream = ReceiverStream::new(rx);
445 Ok(Response::new(Box::pin(stream) as Self::SayHelloStreamStream))
446 }
447
448 async fn say_hello_client_stream(
450 &self,
451 request: Request<tonic::Streaming<HelloRequest>>,
452 ) -> Result<Response<HelloReply>, Status> {
453 info!("gRPC say_hello_client_stream started");
454
455 let mut stream = request.into_inner();
456 let mut names = Vec::new();
457 let mut count = 0;
458
459 while let Some(req) = stream.next().await {
461 match req {
462 Ok(hello_request) => {
463 info!("Received client stream message: {:?}", hello_request);
464 names.push(hello_request.name);
465 count += 1;
466 }
467 Err(e) => {
468 error!("Error receiving client stream message: {}", e);
469 return Err(Status::internal(format!("Stream error: {}", e)));
470 }
471 }
472 }
473
474 let message = if names.is_empty() {
476 "Hello! No names received in the stream.".to_string()
477 } else {
478 format!(
479 "Hello {}! Received {} messages from MockForge client stream.",
480 names.join(", "),
481 count
482 )
483 };
484
485 let reply = HelloReply {
486 message,
487 metadata: None,
488 items: vec![],
489 };
490
491 Ok(Response::new(reply))
492 }
493
494 async fn chat(
496 &self,
497 request: Request<tonic::Streaming<HelloRequest>>,
498 ) -> Result<Response<Self::ChatStream>, Status> {
499 info!("gRPC chat (bidirectional streaming) started");
500
501 let mut stream = request.into_inner();
502 let (tx, rx) = tokio::sync::mpsc::channel(128);
503
504 tokio::spawn(async move {
506 let mut message_count = 0;
507
508 while let Some(req) = stream.next().await {
509 match req {
510 Ok(hello_request) => {
511 message_count += 1;
512 info!("Chat received: {:?}", hello_request);
513
514 let reply = HelloReply {
515 message: format!(
516 "Chat response {}: Hello {}! from MockForge",
517 message_count, hello_request.name
518 ),
519 metadata: None,
520 items: vec![],
521 };
522
523 if tx.send(Ok(reply)).await.is_err() {
524 break;
526 }
527 }
528 Err(e) => {
529 error!("Chat stream error: {}", e);
530 let _ = tx
531 .send(Err(Status::internal(format!("Stream error: {}", e))))
532 .await;
533 break;
534 }
535 }
536 }
537
538 info!("Chat session ended after {} messages", message_count);
539 });
540
541 let output_stream = ReceiverStream::new(rx);
542 Ok(Response::new(Box::pin(output_stream) as Self::ChatStream))
543 }
544 }
545
546 let greeter = MockGreeterService;
547
548 info!("gRPC server listening on {} with Greeter service", grpc_addr);
549
550 let mut router = server_builder.add_service(GreeterServer::new(greeter));
552
553 if config.enable_reflection {
555 let encoded_fd_set = registry_arc.descriptor_pool().encode_to_vec();
557 let reflection_service = ReflectionBuilder::configure()
558 .register_encoded_file_descriptor_set(&encoded_fd_set)
559 .build_v1()
560 .map_err(|e| {
561 error!("Failed to build reflection service: {}", e);
562 Box::<dyn std::error::Error + Send + Sync>::from(format!(
563 "Failed to build reflection service: {}",
564 e
565 ))
566 })?;
567
568 router = router.add_service(reflection_service);
569 info!("gRPC reflection service enabled");
570 }
571
572 router.serve(grpc_addr).await?;
573
574 Ok(())
575}
576
577#[cfg(test)]
580mod tests {
581
582 #[test]
583 fn test_module_compiles() {}
584}