mockforge-grpc 0.3.107

gRPC protocol support for MockForge
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
//! Dynamic gRPC service discovery and registration
//!
//! This module provides functionality to dynamically discover and register
//! gRPC services from proto files, making the gRPC mock system flexible
//! for different applications and developers.

pub mod http_bridge;
pub mod proto_parser;
pub mod router;
pub mod service_generator;

use crate::reflection::{MockReflectionProxy, ProxyConfig};
use proto_parser::ProtoParser;
use service_generator::DynamicGrpcService;
use std::collections::HashMap;
use std::sync::Arc;
use tonic::transport::Server;
use tonic_reflection::server::Builder as ReflectionBuilder;
use tracing::*;

/// File locations of the PEM material used to serve gRPC over TLS
#[derive(Debug, Clone)]
pub struct GrpcTlsConfig {
    /// Location of the server certificate file (PEM format)
    pub cert_path: String,
    /// Location of the private key file matching the certificate (PEM format)
    pub key_path: String,
    /// Location of a CA certificate used to verify client certificates (mTLS);
    /// `None` when no client CA is configured
    pub client_ca_path: Option<String>,
}

impl GrpcTlsConfig {
    /// Build a TLS configuration from a certificate path and a key path.
    ///
    /// No client CA is set, so the result describes server-side TLS only.
    pub fn new(cert_path: impl Into<String>, key_path: impl Into<String>) -> Self {
        let (cert_path, key_path) = (cert_path.into(), key_path.into());
        Self {
            cert_path,
            key_path,
            client_ca_path: None,
        }
    }

    /// Build a TLS configuration that also names a client CA (mutual TLS).
    pub fn with_mtls(
        cert_path: impl Into<String>,
        key_path: impl Into<String>,
        client_ca_path: impl Into<String>,
    ) -> Self {
        let mut tls = Self::new(cert_path, key_path);
        tls.client_ca_path = Some(client_ca_path.into());
        tls
    }

    /// Assemble a TLS configuration from the process environment.
    ///
    /// Variables consulted:
    /// - `GRPC_TLS_CERT`: path to the certificate file (required)
    /// - `GRPC_TLS_KEY`: path to the private key file (required)
    /// - `GRPC_TLS_CLIENT_CA`: path to the client CA for mTLS (optional)
    ///
    /// Returns `None` when either required variable cannot be read
    /// (unset or not valid Unicode).
    pub fn from_env() -> Option<Self> {
        // A variable that is missing or unreadable is treated as absent.
        let lookup = |name: &str| std::env::var(name).ok();

        let cert_path = lookup("GRPC_TLS_CERT")?;
        let key_path = lookup("GRPC_TLS_KEY")?;

        Some(Self {
            cert_path,
            key_path,
            client_ca_path: lookup("GRPC_TLS_CLIENT_CA"),
        })
    }
}

/// Settings that control how gRPC services are discovered and served
#[derive(Debug, Clone)]
pub struct DynamicGrpcConfig {
    /// Directory that is scanned for proto files
    pub proto_dir: String,
    /// When `true`, the gRPC reflection service is registered on the server
    pub enable_reflection: bool,
    /// Names of services that discovery must skip
    pub excluded_services: Vec<String>,
    /// Settings for the HTTP bridge, if any
    pub http_bridge: Option<http_bridge::HttpBridgeConfig>,
    /// TLS settings for the server; `None` means plaintext
    pub tls: Option<GrpcTlsConfig>,
}

impl Default for DynamicGrpcConfig {
    /// Defaults: proto files under `proto`, reflection off, nothing excluded,
    /// an enabled HTTP bridge configuration, and TLS taken from the
    /// `GRPC_TLS_*` environment variables when they are present.
    fn default() -> Self {
        let bridge = http_bridge::HttpBridgeConfig {
            enabled: true,
            ..Default::default()
        };
        // TLS stays off unless the environment supplies certificate and key paths.
        let tls = GrpcTlsConfig::from_env();

        Self {
            proto_dir: String::from("proto"),
            enable_reflection: false,
            excluded_services: Vec::new(),
            http_bridge: Some(bridge),
            tls,
        }
    }
}

/// A registry of discovered gRPC services
///
/// Holds the dynamic service implementations keyed by service name together
/// with the descriptor pool the services were parsed from.
#[derive(Clone)]
pub struct ServiceRegistry {
    /// Map of service names to their implementations
    services: HashMap<String, Arc<DynamicGrpcService>>,
    /// Descriptor pool containing parsed proto definitions
    descriptor_pool: prost_reflect::DescriptorPool,
}

impl Default for ServiceRegistry {
    /// Equivalent to [`ServiceRegistry::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl ServiceRegistry {
    /// Get the descriptor pool
    pub fn descriptor_pool(&self) -> &prost_reflect::DescriptorPool {
        &self.descriptor_pool
    }

    /// Create a new service registry with no services and an empty descriptor pool
    pub fn new() -> Self {
        Self {
            services: HashMap::new(),
            descriptor_pool: prost_reflect::DescriptorPool::new(),
        }
    }

    /// Create a service registry with no services and the given descriptor pool
    pub fn with_descriptor_pool(descriptor_pool: prost_reflect::DescriptorPool) -> Self {
        Self {
            services: HashMap::new(),
            descriptor_pool,
        }
    }

    /// Set the descriptor pool (useful when building registry incrementally)
    ///
    /// The previously stored pool is replaced.
    pub fn set_descriptor_pool(&mut self, pool: prost_reflect::DescriptorPool) {
        self.descriptor_pool = pool;
    }

    /// Register a service implementation
    ///
    /// A service already registered under `name` is replaced.
    pub fn register(&mut self, name: String, service: DynamicGrpcService) {
        self.services.insert(name, Arc::new(service));
    }

    /// Get a service by name, or `None` if no service is registered under it
    pub fn get(&self, name: &str) -> Option<&Arc<DynamicGrpcService>> {
        self.services.get(name)
    }

    /// List all registered service names in ascending order
    ///
    /// `HashMap` iteration order is arbitrary, so the names are sorted to give
    /// callers (and the startup logs built from this list) a stable order.
    pub fn service_names(&self) -> Vec<String> {
        let mut names: Vec<String> = self.services.keys().cloned().collect();
        names.sort_unstable();
        names
    }
}

/// Discover and register services from proto files
///
/// Parses every proto file under `config.proto_dir`, then registers one
/// [`DynamicGrpcService`] per parsed service, skipping any whose name appears
/// in `config.excluded_services`. The returned registry owns the parser's
/// descriptor pool. Timing for each phase is logged.
///
/// # Errors
///
/// Returns the error produced by the proto parser when the directory cannot
/// be parsed.
pub async fn discover_services(
    config: &DynamicGrpcConfig,
) -> Result<ServiceRegistry, Box<dyn std::error::Error + Send + Sync>> {
    use std::time::Instant;

    let discovery_start = Instant::now();
    info!("Discovering gRPC services from proto directory: {}", config.proto_dir);

    // Parse proto files
    let parse_start = Instant::now();
    let mut parser = ProtoParser::new();
    parser.parse_directory(&config.proto_dir).await?;
    let parse_duration = parse_start.elapsed();
    info!("Proto file parsing completed (took {:?})", parse_duration);

    // Create registry with the descriptor pool from the parser
    let registry_start = Instant::now();
    // The service map has to be copied out before `into_pool` consumes the parser.
    let services = parser.services().clone();
    // Hand the parser's pool straight to the registry instead of creating an
    // empty pool first and replacing it.
    let mut registry = ServiceRegistry::with_descriptor_pool(parser.into_pool());
    let registry_duration = registry_start.elapsed();
    debug!("Registry creation completed (took {:?})", registry_duration);

    // Create dynamic services from parsed proto definitions
    let service_reg_start = Instant::now();
    for (service_name, proto_service) in services {
        // Skip excluded services
        if config.excluded_services.contains(&service_name) {
            info!("Skipping excluded service: {}", service_name);
            continue;
        }

        // `proto_service` is already owned by this loop iteration, so it is
        // moved into the dynamic service rather than cloned a second time.
        let dynamic_service = DynamicGrpcService::new(proto_service, None);
        registry.register(service_name.clone(), dynamic_service);

        debug!("Registered service: {}", service_name);
    }
    let service_reg_duration = service_reg_start.elapsed();
    info!(
        "Service registration completed for {} services (took {:?})",
        registry.service_names().len(),
        service_reg_duration
    );

    let total_discovery_duration = discovery_start.elapsed();
    info!("Service discovery completed (total time: {:?})", total_discovery_duration);
    Ok(registry)
}

/// Start a dynamic server with both gRPC and HTTP bridge support
pub async fn start_dynamic_server(
    port: u16,
    config: DynamicGrpcConfig,
    latency_profile: Option<mockforge_core::LatencyProfile>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    use std::time::Instant;

    let startup_start = Instant::now();

    #[cfg(feature = "data-faker")]
    mockforge_data::provider::register_core_faker_provider();

    let _latency_injector = latency_profile
        .map(|profile| mockforge_core::latency::LatencyInjector::new(profile, Default::default()));

    // Discover services
    let registry = discover_services(&config).await?;
    let registry_arc = Arc::new(registry);

    // Use shared server utilities for consistent address creation
    let addr = mockforge_core::wildcard_socket_addr(port);
    info!(
        "Dynamic server listening on {} with {} services",
        addr,
        registry_arc.service_names().len()
    );

    // Create proxy configuration
    let proxy_config = ProxyConfig::default();

    // Create mock reflection proxy
    let reflection_start = Instant::now();
    let mock_proxy = MockReflectionProxy::new(proxy_config, registry_arc.clone()).await?;
    let reflection_duration = reflection_start.elapsed();
    info!("gRPC reflection proxy created (took {:?})", reflection_duration);

    let total_startup_duration = startup_start.elapsed();
    info!("gRPC server startup completed (total time: {:?})", total_startup_duration);

    // Start HTTP server (bridge) if enabled
    // Currently, just start the gRPC server directly
    // HTTP bridge functionality is disabled
    start_grpc_only_server(port, &config, registry_arc.clone(), mock_proxy).await?;

    // HTTP bridge is disabled, no server handle to wait for

    Ok(())
}

/// Backward-compatible entry point that starts the server without an HTTP bridge.
///
/// Clears `http_bridge` on the supplied configuration and delegates to
/// [`start_dynamic_server`] with the same `port` and `latency_profile`.
pub async fn start_dynamic_grpc_server(
    port: u16,
    config: DynamicGrpcConfig,
    latency_profile: Option<mockforge_core::LatencyProfile>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Any bridge settings the caller supplied are discarded before delegating.
    let bridge_free_config = {
        let mut stripped = config;
        stripped.http_bridge = None;
        stripped
    };

    start_dynamic_server(port, bridge_free_config, latency_profile).await
}

/// Start the gRPC-only server implementation
async fn start_grpc_only_server(
    port: u16,
    config: &DynamicGrpcConfig,
    registry_arc: Arc<ServiceRegistry>,
    _mock_proxy: MockReflectionProxy,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    use tonic::transport::{Certificate, Identity, ServerTlsConfig};

    // Create server builder with optional TLS
    let mut server_builder = if let Some(tls_config) = &config.tls {
        info!("Configuring gRPC server with TLS");

        // Read certificate and key files
        let cert = tokio::fs::read(&tls_config.cert_path).await.map_err(|e| {
            error!("Failed to read TLS certificate from {}: {}", tls_config.cert_path, e);
            Box::<dyn std::error::Error + Send + Sync>::from(format!(
                "Failed to read TLS certificate: {}",
                e
            ))
        })?;

        let key = tokio::fs::read(&tls_config.key_path).await.map_err(|e| {
            error!("Failed to read TLS key from {}: {}", tls_config.key_path, e);
            Box::<dyn std::error::Error + Send + Sync>::from(format!(
                "Failed to read TLS key: {}",
                e
            ))
        })?;

        let identity = Identity::from_pem(cert, key);

        let mut tls = ServerTlsConfig::new().identity(identity);

        // Add client CA for mTLS if configured
        if let Some(client_ca_path) = &tls_config.client_ca_path {
            info!("Configuring mutual TLS (mTLS) with client certificate verification");
            let client_ca = tokio::fs::read(client_ca_path).await.map_err(|e| {
                error!("Failed to read client CA from {}: {}", client_ca_path, e);
                Box::<dyn std::error::Error + Send + Sync>::from(format!(
                    "Failed to read client CA: {}",
                    e
                ))
            })?;
            tls = tls.client_ca_root(Certificate::from_pem(client_ca));
        }

        Server::builder().tls_config(tls).map_err(|e| {
            error!("Failed to configure TLS: {}", e);
            Box::<dyn std::error::Error + Send + Sync>::from(format!(
                "Failed to configure TLS: {}",
                e
            ))
        })?
    } else {
        info!("gRPC server running in plaintext mode (no TLS configured)");
        Server::builder()
    };

    use std::net::SocketAddr;

    let grpc_addr: SocketAddr = mockforge_core::wildcard_socket_addr(port);

    // Log discovered services
    info!(
        "Starting gRPC server on {} with {} discovered services",
        grpc_addr,
        registry_arc.service_names().len()
    );
    for service_name in registry_arc.service_names() {
        info!("  - Dynamic service: {}", service_name);
    }

    // Build the built-in Greeter service
    use crate::generated::greeter_server::{Greeter, GreeterServer};
    use crate::generated::{HelloReply, HelloRequest};
    use tonic::{Request, Response, Status};

    #[derive(Debug, Default)]
    struct MockGreeterService;

    use futures::StreamExt;
    use std::pin::Pin;
    use tokio_stream::wrappers::ReceiverStream;

    #[tonic::async_trait]
    impl Greeter for MockGreeterService {
        type SayHelloStreamStream =
            Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
        type ChatStream = Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;

        async fn say_hello(
            &self,
            request: Request<HelloRequest>,
        ) -> Result<Response<HelloReply>, Status> {
            info!("gRPC say_hello request: {:?}", request);
            let req = request.into_inner();
            let reply = HelloReply {
                message: format!("Hello {}! This is a mock response from MockForge", req.name),
                metadata: None,
                items: vec![],
            };
            Ok(Response::new(reply))
        }

        async fn say_hello_stream(
            &self,
            request: Request<HelloRequest>,
        ) -> Result<Response<Self::SayHelloStreamStream>, Status> {
            info!("gRPC say_hello_stream request: {:?}", request);
            let name = request.into_inner().name;
            let (tx, rx) = tokio::sync::mpsc::channel(128);
            tokio::spawn(async move {
                for i in 1..=5 {
                    let reply = HelloReply {
                        message: format!(
                            "Hello {}! Stream message {} of 5 from MockForge",
                            name, i
                        ),
                        metadata: None,
                        items: vec![],
                    };
                    if tx.send(Ok(reply)).await.is_err() {
                        break;
                    }
                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                }
            });
            let stream = ReceiverStream::new(rx);
            Ok(Response::new(Box::pin(stream) as Self::SayHelloStreamStream))
        }

        async fn say_hello_client_stream(
            &self,
            request: Request<tonic::Streaming<HelloRequest>>,
        ) -> Result<Response<HelloReply>, Status> {
            info!("gRPC say_hello_client_stream started");
            let mut stream = request.into_inner();
            let mut names = Vec::new();
            let mut count = 0;
            while let Some(req) = stream.next().await {
                match req {
                    Ok(hello_request) => {
                        names.push(hello_request.name);
                        count += 1;
                    }
                    Err(e) => {
                        error!("Error receiving client stream message: {}", e);
                        return Err(Status::internal(format!("Stream error: {}", e)));
                    }
                }
            }
            let message = if names.is_empty() {
                "Hello! No names received in the stream.".to_string()
            } else {
                format!(
                    "Hello {}! Received {} messages from MockForge client stream.",
                    names.join(", "),
                    count
                )
            };
            Ok(Response::new(HelloReply {
                message,
                metadata: None,
                items: vec![],
            }))
        }

        async fn chat(
            &self,
            request: Request<tonic::Streaming<HelloRequest>>,
        ) -> Result<Response<Self::ChatStream>, Status> {
            info!("gRPC chat (bidirectional streaming) started");
            let mut stream = request.into_inner();
            let (tx, rx) = tokio::sync::mpsc::channel(128);
            tokio::spawn(async move {
                let mut message_count = 0;
                while let Some(req) = stream.next().await {
                    match req {
                        Ok(hello_request) => {
                            message_count += 1;
                            let reply = HelloReply {
                                message: format!(
                                    "Chat response {}: Hello {}! from MockForge",
                                    message_count, hello_request.name
                                ),
                                metadata: None,
                                items: vec![],
                            };
                            if tx.send(Ok(reply)).await.is_err() {
                                break;
                            }
                        }
                        Err(e) => {
                            error!("Chat stream error: {}", e);
                            let _ = tx
                                .send(Err(Status::internal(format!("Stream error: {}", e))))
                                .await;
                            break;
                        }
                    }
                }
                info!("Chat session ended after {} messages", message_count);
            });
            let output_stream = ReceiverStream::new(rx);
            Ok(Response::new(Box::pin(output_stream) as Self::ChatStream))
        }
    }

    // Build tonic Routes with built-in services
    let mut routes_builder = tonic::service::RoutesBuilder::default();
    routes_builder.add_service(GreeterServer::new(MockGreeterService));
    info!("Registered built-in Greeter service");

    // Add reflection service if enabled
    if config.enable_reflection {
        let encoded_fd_set = registry_arc.descriptor_pool().encode_to_vec();
        let reflection_service = ReflectionBuilder::configure()
            .register_encoded_file_descriptor_set(&encoded_fd_set)
            .build_v1()
            .map_err(|e| {
                error!("Failed to build reflection service: {}", e);
                Box::<dyn std::error::Error + Send + Sync>::from(format!(
                    "Failed to build reflection service: {}",
                    e
                ))
            })?;
        routes_builder.add_service(reflection_service);
        info!("gRPC reflection service enabled");
    }

    // Convert to axum router and add dynamic service fallback
    let registry_for_fallback = registry_arc.clone();
    let axum_router =
        routes_builder
            .routes()
            .into_axum_router()
            .fallback(move |req: axum::extract::Request| {
                let registry = registry_for_fallback.clone();
                async move {
                    let path = req.uri().path().to_string();
                    match router::parse_grpc_path(&path) {
                        Some((service_name, method_name)) => {
                            // Collect the request body (limit to 4MB)
                            let body = axum::body::to_bytes(req.into_body(), 4 * 1024 * 1024)
                                .await
                                .unwrap_or_default();

                            match router::handle_dynamic_grpc_request(
                                &registry,
                                service_name,
                                method_name,
                                body,
                            )
                            .await
                            {
                                Ok(response) => response,
                                Err(status) => router::create_grpc_error_response(status),
                            }
                        }
                        None => router::create_grpc_error_response(Status::unimplemented(
                            "Unknown path",
                        )),
                    }
                }
            });

    // Convert back to tonic Routes and serve with the configured server
    let routes = tonic::service::Routes::from(axum_router);
    server_builder.add_routes(routes).serve(grpc_addr).await?;

    info!("gRPC server stopped");
    Ok(())
}

// start_combined_server removed - was a stub that was never implemented

// Placeholder test module: the single test has an empty body and makes no
// assertions, so it passes whenever the crate's test build compiles.
#[cfg(test)]
mod tests {
    /// Compile-only smoke test; it exercises no runtime behavior.
    #[test]
    fn test_module_compiles() {
        // Verify this module's types and imports are valid
    }
}