1pub mod http_bridge;
8pub mod proto_parser;
9pub mod service_generator;
10
11use crate::reflection::{MockReflectionProxy, ProxyConfig};
12use proto_parser::ProtoParser;
13use service_generator::DynamicGrpcService;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tonic::transport::Server;
17use tonic_reflection::server::Builder as ReflectionBuilder;
18use tracing::*;
19
20#[derive(Debug, Clone)]
22pub struct GrpcTlsConfig {
23 pub cert_path: String,
25 pub key_path: String,
27 pub client_ca_path: Option<String>,
29}
30
31impl GrpcTlsConfig {
32 pub fn new(cert_path: impl Into<String>, key_path: impl Into<String>) -> Self {
34 Self {
35 cert_path: cert_path.into(),
36 key_path: key_path.into(),
37 client_ca_path: None,
38 }
39 }
40
41 pub fn with_mtls(
43 cert_path: impl Into<String>,
44 key_path: impl Into<String>,
45 client_ca_path: impl Into<String>,
46 ) -> Self {
47 Self {
48 cert_path: cert_path.into(),
49 key_path: key_path.into(),
50 client_ca_path: Some(client_ca_path.into()),
51 }
52 }
53
54 pub fn from_env() -> Option<Self> {
61 let cert_path = std::env::var("GRPC_TLS_CERT").ok()?;
62 let key_path = std::env::var("GRPC_TLS_KEY").ok()?;
63 let client_ca_path = std::env::var("GRPC_TLS_CLIENT_CA").ok();
64
65 Some(Self {
66 cert_path,
67 key_path,
68 client_ca_path,
69 })
70 }
71}
72
73#[derive(Debug, Clone)]
75pub struct DynamicGrpcConfig {
76 pub proto_dir: String,
78 pub enable_reflection: bool,
80 pub excluded_services: Vec<String>,
82 pub http_bridge: Option<http_bridge::HttpBridgeConfig>,
84 pub tls: Option<GrpcTlsConfig>,
86}
87
88impl Default for DynamicGrpcConfig {
89 fn default() -> Self {
90 Self {
91 proto_dir: "proto".to_string(),
92 enable_reflection: false,
93 excluded_services: Vec::new(),
94 http_bridge: Some(http_bridge::HttpBridgeConfig {
95 enabled: true,
96 ..Default::default()
97 }),
98 tls: GrpcTlsConfig::from_env(),
100 }
101 }
102}
103
104#[derive(Clone)]
106pub struct ServiceRegistry {
107 services: HashMap<String, Arc<DynamicGrpcService>>,
109 descriptor_pool: prost_reflect::DescriptorPool,
111}
112
113impl Default for ServiceRegistry {
114 fn default() -> Self {
115 Self::new()
116 }
117}
118
119impl ServiceRegistry {
120 pub fn descriptor_pool(&self) -> &prost_reflect::DescriptorPool {
122 &self.descriptor_pool
123 }
124
125 pub fn new() -> Self {
127 Self {
128 services: HashMap::new(),
129 descriptor_pool: prost_reflect::DescriptorPool::new(),
130 }
131 }
132
133 pub fn with_descriptor_pool(descriptor_pool: prost_reflect::DescriptorPool) -> Self {
135 Self {
136 services: HashMap::new(),
137 descriptor_pool,
138 }
139 }
140
141 pub fn set_descriptor_pool(&mut self, pool: prost_reflect::DescriptorPool) {
143 self.descriptor_pool = pool;
144 }
145
146 pub fn register(&mut self, name: String, service: DynamicGrpcService) {
148 self.services.insert(name, Arc::new(service));
149 }
150
151 pub fn get(&self, name: &str) -> Option<&Arc<DynamicGrpcService>> {
153 self.services.get(name)
154 }
155
156 pub fn service_names(&self) -> Vec<String> {
158 self.services.keys().cloned().collect()
159 }
160}
161
162pub async fn discover_services(
164 config: &DynamicGrpcConfig,
165) -> Result<ServiceRegistry, Box<dyn std::error::Error + Send + Sync>> {
166 use std::time::Instant;
167
168 let discovery_start = Instant::now();
169 info!("Discovering gRPC services from proto directory: {}", config.proto_dir);
170
171 let parse_start = Instant::now();
173 let mut parser = ProtoParser::new();
174 parser.parse_directory(&config.proto_dir).await?;
175 let parse_duration = parse_start.elapsed();
176 info!("Proto file parsing completed (took {:?})", parse_duration);
177
178 let registry_start = Instant::now();
180 let mut registry = ServiceRegistry::new();
181 let services = parser.services().clone();
183 let descriptor_pool = parser.into_pool();
184
185 registry.set_descriptor_pool(descriptor_pool);
186 let registry_duration = registry_start.elapsed();
187 debug!("Registry creation completed (took {:?})", registry_duration);
188
189 let service_reg_start = Instant::now();
191 for (service_name, proto_service) in services {
192 if config.excluded_services.contains(&service_name) {
194 info!("Skipping excluded service: {}", service_name);
195 continue;
196 }
197
198 let dynamic_service = DynamicGrpcService::new(proto_service.clone(), None);
200 registry.register(service_name.clone(), dynamic_service);
201
202 debug!("Registered service: {}", service_name);
203 }
204 let service_reg_duration = service_reg_start.elapsed();
205 info!(
206 "Service registration completed for {} services (took {:?})",
207 registry.service_names().len(),
208 service_reg_duration
209 );
210
211 let total_discovery_duration = discovery_start.elapsed();
212 info!("Service discovery completed (total time: {:?})", total_discovery_duration);
213 Ok(registry)
214}
215
216pub async fn start_dynamic_server(
218 port: u16,
219 config: DynamicGrpcConfig,
220 latency_profile: Option<mockforge_core::LatencyProfile>,
221) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
222 use std::time::Instant;
223
224 let startup_start = Instant::now();
225
226 #[cfg(feature = "data-faker")]
227 mockforge_data::provider::register_core_faker_provider();
228
229 let _latency_injector = latency_profile
230 .map(|profile| mockforge_core::latency::LatencyInjector::new(profile, Default::default()));
231
232 let registry = discover_services(&config).await?;
234 let registry_arc = Arc::new(registry);
235
236 let addr = mockforge_core::wildcard_socket_addr(port);
238 info!(
239 "Dynamic server listening on {} with {} services",
240 addr,
241 registry_arc.service_names().len()
242 );
243
244 let proxy_config = ProxyConfig::default();
246
247 let reflection_start = Instant::now();
249 let mock_proxy = MockReflectionProxy::new(proxy_config, registry_arc.clone()).await?;
250 let reflection_duration = reflection_start.elapsed();
251 info!("gRPC reflection proxy created (took {:?})", reflection_duration);
252
253 let total_startup_duration = startup_start.elapsed();
254 info!("gRPC server startup completed (total time: {:?})", total_startup_duration);
255
256 start_grpc_only_server(port, &config, registry_arc.clone(), mock_proxy).await?;
260
261 Ok(())
264}
265
266pub async fn start_dynamic_grpc_server(
268 port: u16,
269 config: DynamicGrpcConfig,
270 latency_profile: Option<mockforge_core::LatencyProfile>,
271) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
272 let mut grpc_only_config = config;
274 grpc_only_config.http_bridge = None;
275
276 start_dynamic_server(port, grpc_only_config, latency_profile).await
277}
278
279async fn start_grpc_only_server(
281 port: u16,
282 config: &DynamicGrpcConfig,
283 registry_arc: Arc<ServiceRegistry>,
284 _mock_proxy: MockReflectionProxy,
285) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
286 use tonic::transport::{Certificate, Identity, ServerTlsConfig};
287
288 let mut server_builder = if let Some(tls_config) = &config.tls {
290 info!("Configuring gRPC server with TLS");
291
292 let cert = tokio::fs::read(&tls_config.cert_path).await.map_err(|e| {
294 error!("Failed to read TLS certificate from {}: {}", tls_config.cert_path, e);
295 Box::<dyn std::error::Error + Send + Sync>::from(format!(
296 "Failed to read TLS certificate: {}",
297 e
298 ))
299 })?;
300
301 let key = tokio::fs::read(&tls_config.key_path).await.map_err(|e| {
302 error!("Failed to read TLS key from {}: {}", tls_config.key_path, e);
303 Box::<dyn std::error::Error + Send + Sync>::from(format!(
304 "Failed to read TLS key: {}",
305 e
306 ))
307 })?;
308
309 let identity = Identity::from_pem(cert, key);
310
311 let mut tls = ServerTlsConfig::new().identity(identity);
312
313 if let Some(client_ca_path) = &tls_config.client_ca_path {
315 info!("Configuring mutual TLS (mTLS) with client certificate verification");
316 let client_ca = tokio::fs::read(client_ca_path).await.map_err(|e| {
317 error!("Failed to read client CA from {}: {}", client_ca_path, e);
318 Box::<dyn std::error::Error + Send + Sync>::from(format!(
319 "Failed to read client CA: {}",
320 e
321 ))
322 })?;
323 tls = tls.client_ca_root(Certificate::from_pem(client_ca));
324 }
325
326 Server::builder().tls_config(tls).map_err(|e| {
327 error!("Failed to configure TLS: {}", e);
328 Box::<dyn std::error::Error + Send + Sync>::from(format!(
329 "Failed to configure TLS: {}",
330 e
331 ))
332 })?
333 } else {
334 info!("gRPC server running in plaintext mode (no TLS configured)");
335 Server::builder()
336 };
337
338 info!(
340 "Starting gRPC server on {} with {} discovered services",
341 mockforge_core::wildcard_socket_addr(port),
342 registry_arc.service_names().len()
343 );
344
345 for service_name in registry_arc.service_names() {
347 info!(" - Service: {}", service_name);
348 }
349
350 use std::net::SocketAddr;
353
354 let grpc_addr: SocketAddr = mockforge_core::wildcard_socket_addr(port);
355
356 info!("gRPC server listening on {} (basic implementation)", grpc_addr);
357 info!("Discovered services are logged but not yet fully implemented:");
358 for service_name in registry_arc.service_names() {
359 info!(" - {}", service_name);
360 }
361
362 use crate::generated::greeter_server::{Greeter, GreeterServer};
364 use crate::generated::{HelloReply, HelloRequest};
365 use tonic::{Request, Response, Status};
366
367 #[derive(Debug, Default)]
369 pub struct MockGreeterService;
370
371 use futures::StreamExt;
372 use std::pin::Pin;
373 use tokio_stream::wrappers::ReceiverStream;
374
375 #[tonic::async_trait]
376 impl Greeter for MockGreeterService {
377 type SayHelloStreamStream =
378 Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
379 type ChatStream = Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
380
381 async fn say_hello(
382 &self,
383 request: Request<HelloRequest>,
384 ) -> Result<Response<HelloReply>, Status> {
385 info!("gRPC say_hello request: {:?}", request);
386
387 let req = request.into_inner();
388 let reply = HelloReply {
389 message: format!("Hello {}! This is a mock response from MockForge", req.name),
390 metadata: None,
391 items: vec![],
392 };
393
394 Ok(Response::new(reply))
395 }
396
397 async fn say_hello_stream(
399 &self,
400 request: Request<HelloRequest>,
401 ) -> Result<Response<Self::SayHelloStreamStream>, Status> {
402 info!("gRPC say_hello_stream request: {:?}", request);
403 let req = request.into_inner();
404 let name = req.name.clone();
405
406 let (tx, rx) = tokio::sync::mpsc::channel(128);
408
409 tokio::spawn(async move {
411 for i in 1..=5 {
412 let reply = HelloReply {
413 message: format!(
414 "Hello {}! Stream message {} of 5 from MockForge",
415 name, i
416 ),
417 metadata: None,
418 items: vec![],
419 };
420
421 if tx.send(Ok(reply)).await.is_err() {
422 break;
424 }
425
426 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
428 }
429 });
430
431 let stream = ReceiverStream::new(rx);
432 Ok(Response::new(Box::pin(stream) as Self::SayHelloStreamStream))
433 }
434
435 async fn say_hello_client_stream(
437 &self,
438 request: Request<tonic::Streaming<HelloRequest>>,
439 ) -> Result<Response<HelloReply>, Status> {
440 info!("gRPC say_hello_client_stream started");
441
442 let mut stream = request.into_inner();
443 let mut names = Vec::new();
444 let mut count = 0;
445
446 while let Some(req) = stream.next().await {
448 match req {
449 Ok(hello_request) => {
450 info!("Received client stream message: {:?}", hello_request);
451 names.push(hello_request.name);
452 count += 1;
453 }
454 Err(e) => {
455 error!("Error receiving client stream message: {}", e);
456 return Err(Status::internal(format!("Stream error: {}", e)));
457 }
458 }
459 }
460
461 let message = if names.is_empty() {
463 "Hello! No names received in the stream.".to_string()
464 } else {
465 format!(
466 "Hello {}! Received {} messages from MockForge client stream.",
467 names.join(", "),
468 count
469 )
470 };
471
472 let reply = HelloReply {
473 message,
474 metadata: None,
475 items: vec![],
476 };
477
478 Ok(Response::new(reply))
479 }
480
481 async fn chat(
483 &self,
484 request: Request<tonic::Streaming<HelloRequest>>,
485 ) -> Result<Response<Self::ChatStream>, Status> {
486 info!("gRPC chat (bidirectional streaming) started");
487
488 let mut stream = request.into_inner();
489 let (tx, rx) = tokio::sync::mpsc::channel(128);
490
491 tokio::spawn(async move {
493 let mut message_count = 0;
494
495 while let Some(req) = stream.next().await {
496 match req {
497 Ok(hello_request) => {
498 message_count += 1;
499 info!("Chat received: {:?}", hello_request);
500
501 let reply = HelloReply {
502 message: format!(
503 "Chat response {}: Hello {}! from MockForge",
504 message_count, hello_request.name
505 ),
506 metadata: None,
507 items: vec![],
508 };
509
510 if tx.send(Ok(reply)).await.is_err() {
511 break;
513 }
514 }
515 Err(e) => {
516 error!("Chat stream error: {}", e);
517 let _ = tx
518 .send(Err(Status::internal(format!("Stream error: {}", e))))
519 .await;
520 break;
521 }
522 }
523 }
524
525 info!("Chat session ended after {} messages", message_count);
526 });
527
528 let output_stream = ReceiverStream::new(rx);
529 Ok(Response::new(Box::pin(output_stream) as Self::ChatStream))
530 }
531 }
532
533 let greeter = MockGreeterService;
534
535 info!("gRPC server listening on {} with Greeter service", grpc_addr);
536
537 let mut router = server_builder.add_service(GreeterServer::new(greeter));
539
540 if config.enable_reflection {
542 let encoded_fd_set = registry_arc.descriptor_pool().encode_to_vec();
544 let reflection_service = ReflectionBuilder::configure()
545 .register_encoded_file_descriptor_set(&encoded_fd_set)
546 .build_v1()
547 .map_err(|e| {
548 error!("Failed to build reflection service: {}", e);
549 Box::<dyn std::error::Error + Send + Sync>::from(format!(
550 "Failed to build reflection service: {}",
551 e
552 ))
553 })?;
554
555 router = router.add_service(reflection_service);
556 info!("gRPC reflection service enabled");
557 }
558
559 router.serve(grpc_addr).await?;
560
561 Ok(())
562}
563
564#[cfg(test)]
567mod tests {
568 #[test]
569 fn test_module_compiles() {
570 }
572}