Skip to main content

mockforge_grpc/dynamic/http_bridge/
mod.rs

1//! HTTP to gRPC bridge implementation
2//!
3//! This module provides functionality to bridge HTTP requests to gRPC services,
4//! allowing RESTful APIs to be generated dynamically from protobuf definitions.
5
6pub mod converters;
7pub mod handlers;
8pub mod route_generator;
9
10use crate::reflection::MockReflectionProxy;
11use axum::{
12    body::Bytes,
13    extract::{Path, Query, State},
14    http::Method,
15    response::{IntoResponse, Json},
16    routing::{get, post},
17    Router,
18};
19use converters::ProtobufJsonConverter;
20use route_generator::RouteGenerator;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27use tower_http::cors::{Any, CorsLayer};
28use tracing::{debug, info, warn};
29
30/// Type alias for the bridge handler function to reduce type complexity
31type BridgeHandlerFn = dyn Fn(
32        State<Arc<HttpBridge>>,
33        Path<HashMap<String, String>>,
34        Query<BridgeQuery>,
35        Bytes,
36    ) -> Pin<Box<dyn Future<Output = axum::response::Response> + Send>>
37    + Send
38    + Sync;
39
40/// Parameters for bridge request handling
41struct BridgeRequestParams<'a> {
42    proxy: &'a MockReflectionProxy,
43    converter: &'a ProtobufJsonConverter,
44    service_name: &'a str,
45    method_name: &'a str,
46    server_streaming: bool,
47    body: Bytes,
48}
49
50/// Configuration for the HTTP bridge
51#[derive(Debug, Clone)]
52pub struct HttpBridgeConfig {
53    /// Whether the HTTP bridge is enabled
54    pub enabled: bool,
55    /// Base path for HTTP routes (e.g., "/api")
56    pub base_path: String,
57    /// Whether to enable CORS
58    pub enable_cors: bool,
59    /// Maximum request size in bytes
60    pub max_request_size: usize,
61    /// Timeout for bridge requests in seconds
62    pub timeout_seconds: u64,
63    /// Path pattern for service routes (e.g., "/{service}/{method}")
64    pub route_pattern: String,
65}
66
67impl Default for HttpBridgeConfig {
68    fn default() -> Self {
69        Self {
70            enabled: true,
71            base_path: "/api".to_string(),
72            enable_cors: true,
73            max_request_size: 10 * 1024 * 1024, // 10MB
74            timeout_seconds: 30,
75            route_pattern: "/{service}/{method}".to_string(),
76        }
77    }
78}
79
80/// Query parameters for HTTP requests
81#[derive(Debug, Deserialize)]
82pub struct BridgeQuery {
83    /// Streaming mode (none, server, client, bidirectional)
84    #[serde(default)]
85    pub stream: Option<String>,
86    /// Metadata to pass to gRPC call as key=value pairs
87    #[serde(flatten)]
88    pub metadata: HashMap<String, String>,
89}
90
91/// HTTP response wrapper
92#[derive(Debug, Serialize, Deserialize)]
93pub struct BridgeResponse<T> {
94    /// Whether the request was successful
95    pub success: bool,
96    /// The response data
97    pub data: Option<T>,
98    /// Error message if success is false
99    pub error: Option<String>,
100    /// Metadata from the gRPC response
101    pub metadata: HashMap<String, String>,
102}
103
104/// Statistics about the HTTP bridge
105#[derive(Debug, Serialize, Clone)]
106pub struct BridgeStats {
107    /// Number of requests served
108    pub requests_served: u64,
109    /// Number of successful requests
110    pub requests_successful: u64,
111    /// Number of failed requests
112    pub requests_failed: u64,
113    /// Services available via the bridge
114    pub available_services: Vec<String>,
115}
116
117/// The HTTP bridge that provides RESTful API access to gRPC services
118pub struct HttpBridge {
119    /// The reflection proxy that handles gRPC calls
120    proxy: Arc<MockReflectionProxy>,
121    /// Route generator for creating HTTP routes
122    route_generator: RouteGenerator,
123    /// JSON to protobuf converter
124    converter: ProtobufJsonConverter,
125    /// Bridge configuration
126    config: HttpBridgeConfig,
127    /// Statistics
128    stats: Arc<std::sync::Mutex<BridgeStats>>,
129}
130
131impl HttpBridge {
132    /// Create a new HTTP bridge
133    pub fn new(
134        proxy: Arc<MockReflectionProxy>,
135        config: HttpBridgeConfig,
136    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
137        let route_generator = RouteGenerator::new(config.clone());
138        let converter =
139            ProtobufJsonConverter::new(proxy.service_registry.descriptor_pool().clone());
140        let available_services = proxy.service_names();
141
142        let stats = BridgeStats {
143            requests_served: 0,
144            requests_successful: 0,
145            requests_failed: 0,
146            available_services,
147        };
148
149        Ok(Self {
150            proxy,
151            route_generator,
152            converter,
153            config,
154            stats: Arc::new(std::sync::Mutex::new(stats)),
155        })
156    }
157
158    /// Create the HTTP router with all bridge routes
159    pub fn create_router(&self) -> Router<Arc<HttpBridge>> {
160        let mut router = Router::new();
161
162        // Add CORS if enabled
163        if self.config.enable_cors {
164            router = router.layer(
165                CorsLayer::new()
166                    .allow_methods([
167                        Method::GET,
168                        Method::POST,
169                        Method::PUT,
170                        Method::DELETE,
171                        Method::PATCH,
172                    ])
173                    .allow_headers(Any)
174                    .allow_origin(Any),
175            );
176        }
177
178        // Add state containing self reference
179        let bridge = Arc::new(self.clone());
180        router = router.with_state(bridge);
181
182        // Add health check endpoint
183        router =
184            router.route(&format!("{}/health", self.config.base_path), get(Self::health_check));
185
186        // Add statistics endpoint
187        router = router.route(&format!("{}/stats", self.config.base_path), get(Self::get_stats));
188
189        // Add services listing endpoint
190        router =
191            router.route(&format!("{}/services", self.config.base_path), get(Self::list_services));
192
193        // Add OpenAPI documentation endpoint
194        router =
195            router.route(&format!("{}/docs", self.config.base_path), get(Self::get_openapi_spec));
196
197        // Create dynamic bridge endpoints for all registered services
198        let registry = self.proxy.service_registry();
199
200        // Add a generic route that handles all service/method combinations
201        // The route pattern supports both GET (for streaming) and POST (for unary) requests
202        router =
203            router.route(&self.config.route_pattern, post(Self::handle_generic_bridge_request));
204        router = router.route(&self.config.route_pattern, get(Self::handle_generic_bridge_request));
205
206        let available_services = registry.service_names();
207        let total_methods =
208            registry.services.values().map(|s| s.service().methods.len()).sum::<usize>();
209        info!(
210            "Created HTTP bridge router with {} services and {} dynamic endpoints",
211            available_services.len(),
212            total_methods
213        );
214
215        router
216    }
217
218    /// Health check handler
219    async fn health_check(State(_bridge): State<Arc<HttpBridge>>) -> Json<Value> {
220        Json(serde_json::json!({"status": "ok", "bridge": "healthy"}))
221    }
222
223    /// Get statistics handler
224    async fn get_stats(State(bridge): State<Arc<HttpBridge>>) -> Json<Value> {
225        // Handle poisoned mutex gracefully - if mutex is poisoned, use default stats
226        let stats = bridge.stats.lock().unwrap_or_else(|poisoned| {
227            warn!("Statistics mutex is poisoned, using default values");
228            poisoned.into_inner()
229        });
230        Json(serde_json::json!({
231            "requests_served": stats.requests_served,
232            "requests_successful": stats.requests_successful,
233            "requests_failed": stats.requests_failed,
234            "available_services": stats.available_services
235        }))
236    }
237
238    /// List services handler
239    async fn list_services(State(bridge): State<Arc<HttpBridge>>) -> Json<Value> {
240        Self::list_services_static(&bridge).await
241    }
242
243    /// Get OpenAPI spec handler
244    async fn get_openapi_spec(State(bridge): State<Arc<HttpBridge>>) -> Json<Value> {
245        Self::get_openapi_spec_static(&bridge).await
246    }
247
248    /// Generic bridge request handler that routes to specific services/methods
249    async fn handle_generic_bridge_request(
250        State(state): State<Arc<HttpBridge>>,
251        Path(path_params): Path<HashMap<String, String>>,
252        _query: Query<BridgeQuery>,
253        body: Bytes,
254    ) -> axum::response::Response {
255        // Extract service and method from path parameters
256        let service_name = match path_params.get("service") {
257            Some(name) => name,
258            None => {
259                let error_response = BridgeResponse::<Value> {
260                    success: false,
261                    data: None,
262                    error: Some("Missing 'service' parameter in path".to_string()),
263                    metadata: HashMap::new(),
264                };
265                return (http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
266            }
267        };
268
269        let method_name = match path_params.get("method") {
270            Some(name) => name,
271            None => {
272                let error_response = BridgeResponse::<Value> {
273                    success: false,
274                    data: None,
275                    error: Some("Missing 'method' parameter in path".to_string()),
276                    metadata: HashMap::new(),
277                };
278                return (http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
279            }
280        };
281
282        // Get method information from the registry
283        let registry = state.proxy.service_registry();
284        let service_opt = registry.get(service_name);
285        let method_info = if let Some(service) = service_opt {
286            service.service().methods.iter().find(|m| m.name == *method_name)
287        } else {
288            let error_response = BridgeResponse::<Value> {
289                success: false,
290                data: None,
291                error: Some(format!("Service '{}' not found", service_name)),
292                metadata: HashMap::new(),
293            };
294            return (http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
295        };
296
297        let method_info = match method_info {
298            Some(method) => method,
299            None => {
300                let error_response = BridgeResponse::<Value> {
301                    success: false,
302                    data: None,
303                    error: Some(format!(
304                        "Method '{}' not found in service '{}'",
305                        method_name, service_name
306                    )),
307                    metadata: HashMap::new(),
308                };
309                return (http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
310            }
311        };
312
313        // Update stats - handle poisoned mutex gracefully
314        {
315            if let Ok(mut stats) = state.stats.lock() {
316                stats.requests_served += 1;
317            } else {
318                warn!("Failed to update request stats (mutex poisoned)");
319            }
320        }
321
322        // Handle the request
323        let params = BridgeRequestParams {
324            proxy: &state.proxy,
325            converter: &state.converter,
326            service_name: service_name.as_str(),
327            method_name: method_name.as_str(),
328            server_streaming: method_info.server_streaming,
329            body,
330        };
331        let result = Self::handle_bridge_request(&params).await;
332
333        match result {
334            Ok(response) => {
335                // Update successful stats - handle poisoned mutex gracefully
336                {
337                    if let Ok(mut stats) = state.stats.lock() {
338                        stats.requests_successful += 1;
339                    } else {
340                        warn!("Failed to update success stats (mutex poisoned)");
341                    }
342                }
343                (http::StatusCode::OK, Json(response)).into_response()
344            }
345            Err(err) => {
346                // Update failed stats - handle poisoned mutex gracefully
347                {
348                    if let Ok(mut stats) = state.stats.lock() {
349                        stats.requests_failed += 1;
350                    } else {
351                        warn!("Failed to update failure stats (mutex poisoned)");
352                    }
353                }
354                warn!("Bridge request failed for {}.{}: {}", service_name, method_name, err);
355                let error_response = BridgeResponse::<Value> {
356                    success: false,
357                    data: None,
358                    error: Some(err.to_string()),
359                    metadata: HashMap::new(),
360                };
361                (http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
362            }
363        }
364    }
365
366    /// Create a handler function for a specific gRPC method
367    ///
368    /// Reserved for dynamic HTTP bridge handler creation paths.
369    #[allow(dead_code)] // Retained for planned handler-factory wiring.
370    fn create_bridge_handler(
371        &self,
372        service_name: String,
373        method_name: String,
374        _client_streaming: bool,
375        server_streaming: bool,
376    ) -> Box<BridgeHandlerFn> {
377        Box::new(
378            move |state: State<Arc<Self>>,
379                  _path: Path<HashMap<String, String>>,
380                  _query: Query<BridgeQuery>,
381                  body: Bytes| {
382                let service_name = service_name.clone();
383                let method_name = method_name.clone();
384                let stats = state.stats.clone();
385                let proxy = state.proxy.clone();
386                let converter = state.converter.clone();
387
388                Box::pin(async move {
389                    // Update stats - handle poisoned mutex gracefully
390                    {
391                        if let Ok(mut stats) = stats.lock() {
392                            stats.requests_served += 1;
393                        } else {
394                            warn!("Failed to update request stats (mutex poisoned)");
395                        }
396                    }
397
398                    // Handle the request
399                    let params = BridgeRequestParams {
400                        proxy: &proxy,
401                        converter: &converter,
402                        service_name: service_name.as_str(),
403                        method_name: method_name.as_str(),
404                        server_streaming,
405                        body,
406                    };
407                    let result = Self::handle_bridge_request(&params).await;
408
409                    match result {
410                        Ok(response) => {
411                            // Update successful stats - handle poisoned mutex gracefully
412                            {
413                                if let Ok(mut stats) = stats.lock() {
414                                    stats.requests_successful += 1;
415                                } else {
416                                    warn!("Failed to update success stats (mutex poisoned)");
417                                }
418                            }
419                            (http::StatusCode::OK, Json(response)).into_response()
420                        }
421                        Err(err) => {
422                            // Update failed stats - handle poisoned mutex gracefully
423                            {
424                                if let Ok(mut stats) = stats.lock() {
425                                    stats.requests_failed += 1;
426                                } else {
427                                    warn!("Failed to update failure stats (mutex poisoned)");
428                                }
429                            }
430                            warn!(
431                                "Bridge request failed for {}.{}: {}",
432                                service_name, method_name, err
433                            );
434                            let error_response = BridgeResponse::<Value> {
435                                success: false,
436                                data: None,
437                                error: Some(err.to_string()),
438                                metadata: HashMap::new(),
439                            };
440                            (http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
441                                .into_response()
442                        }
443                    }
444                })
445            },
446        )
447    }
448
449    /// Get bridge statistics (static method for handler)
450    ///
451    /// Reserved for HTTP bridge admin endpoint stats integration.
452    #[allow(dead_code)] // Retained for planned bridge stats endpoint wiring.
453    async fn get_stats_static(bridge: &Arc<HttpBridge>) -> Json<Value> {
454        // Handle poisoned mutex gracefully - if mutex is poisoned, use default stats
455        let stats = bridge.stats.lock().unwrap_or_else(|poisoned| {
456            warn!("Statistics mutex is poisoned, using default values");
457            poisoned.into_inner()
458        });
459        Json(serde_json::json!({
460            "requests_served": stats.requests_served,
461            "requests_successful": stats.requests_successful,
462            "requests_failed": stats.requests_failed,
463            "available_services": stats.available_services
464        }))
465    }
466
467    /// List available services (static method for handler)
468    async fn list_services_static(bridge: &Arc<HttpBridge>) -> Json<Value> {
469        let services = bridge.proxy.service_names();
470        Json(serde_json::json!({
471            "services": services
472        }))
473    }
474
475    /// Get OpenAPI spec (static method for handler)
476    async fn get_openapi_spec_static(bridge: &Arc<HttpBridge>) -> Json<Value> {
477        use crate::dynamic::proto_parser::ProtoService;
478        use std::collections::HashMap;
479
480        // Extract services from the service registry
481        let services: HashMap<String, ProtoService> = bridge
482            .proxy
483            .service_registry()
484            .services
485            .iter()
486            .map(|(name, dyn_service)| (name.clone(), dyn_service.service().clone()))
487            .collect();
488
489        // Generate OpenAPI spec using the route generator
490        let spec = bridge.route_generator.generate_openapi_spec(&services);
491        Json(spec)
492    }
493
494    /// Handle a bridge request by calling the appropriate gRPC method
495    async fn handle_bridge_request(
496        params: &BridgeRequestParams<'_>,
497    ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
498        debug!("Handling bridge request for {}.{}", params.service_name, params.method_name);
499
500        // Parse JSON request body
501        let json_request: Value = if params.body.is_empty() {
502            Value::Null
503        } else {
504            serde_json::from_slice(&params.body).map_err(|e| {
505                Box::<dyn std::error::Error + Send + Sync>::from(format!(
506                    "Failed to parse JSON request: {}",
507                    e
508                ))
509            })?
510        };
511
512        // Call appropriate gRPC method based on streaming type
513        if params.server_streaming {
514            // Handle streaming response
515            Self::handle_streaming_request(
516                params.proxy,
517                params.converter,
518                params.service_name,
519                params.method_name,
520                json_request,
521            )
522            .await
523        } else {
524            // Handle unary request
525            Self::handle_unary_request(
526                params.proxy,
527                params.converter,
528                params.service_name,
529                params.method_name,
530                json_request,
531            )
532            .await
533        }
534    }
535
536    /// Handle unary request (no streaming)
537    async fn handle_unary_request(
538        proxy: &MockReflectionProxy,
539        _converter: &ProtobufJsonConverter,
540        service_name: &str,
541        method_name: &str,
542        json_request: Value,
543    ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
544        // Get method descriptor from the service registry
545        let registry = proxy.service_registry();
546        let service_registry = registry.clone();
547
548        // Find the service and method
549        let service = match service_registry.get(service_name) {
550            Some(s) => s,
551            None => {
552                return Err(format!("Service '{}' not found", service_name).into());
553            }
554        };
555
556        let method = match service.service().methods.iter().find(|m| m.name == method_name) {
557            Some(m) => m,
558            None => {
559                return Err(format!(
560                    "Method '{}' not found in service '{}'",
561                    method_name, service_name
562                )
563                .into());
564            }
565        };
566
567        // Use method for future implementation
568        let _method = method;
569
570        // For now, create a generic response since we don't have full descriptor integration
571        // In a complete implementation, this would:
572        // 1. Get input/output descriptor from proto parser
573        // 2. Convert JSON to protobuf message
574        // 3. Call the actual gRPC method via proxy
575        // 4. Convert protobuf response back to JSON
576
577        // Create a mock response for demonstration
578        let json_response = serde_json::json!({
579            "message": format!("Hello! This is a mock response from {}.{} bridge", service_name, method_name),
580            "request_data": json_request,
581            "timestamp": chrono::Utc::now().to_rfc3339()
582        });
583
584        Ok(BridgeResponse {
585            success: true,
586            data: Some(json_response),
587            error: None,
588            metadata: HashMap::new(),
589        })
590    }
591
592    /// Handle streaming request (returns SSE stream)
593    async fn handle_streaming_request(
594        proxy: &MockReflectionProxy,
595        _converter: &ProtobufJsonConverter,
596        service_name: &str,
597        method_name: &str,
598        json_request: Value,
599    ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
600        // SSE/websocket streaming is not wired yet, but provide a deterministic
601        // JSON stream payload so HTTP clients can exercise streaming contracts.
602        let mut events = Vec::new();
603        for seq in 0..3 {
604            events.push(serde_json::json!({
605                "sequence": seq + 1,
606                "service": service_name,
607                "method": method_name,
608                "timestamp": chrono::Utc::now().to_rfc3339(),
609                "data": {
610                    "message": format!("mock stream event {} from {}.{}", seq + 1, service_name, method_name),
611                    "request_echo": json_request.clone()
612                }
613            }));
614        }
615
616        let mut metadata = HashMap::new();
617        metadata.insert("x-mockforge-streaming-mode".to_string(), "json-envelope".to_string());
618        metadata.insert("x-mockforge-stream-count".to_string(), "3".to_string());
619        metadata.insert(
620            "x-mockforge-service-count".to_string(),
621            proxy.service_names().len().to_string(),
622        );
623
624        Ok(BridgeResponse {
625            success: true,
626            data: Some(serde_json::json!({
627                "stream_type": "server",
628                "service": service_name,
629                "method": method_name,
630                "events": events
631            })),
632            error: None,
633            metadata,
634        })
635    }
636}
637
638impl Clone for HttpBridge {
639    fn clone(&self) -> Self {
640        Self {
641            proxy: self.proxy.clone(),
642            route_generator: self.route_generator.clone(),
643            converter: self.converter.clone(),
644            config: self.config.clone(),
645            stats: self.stats.clone(),
646        }
647    }
648}
649
650#[cfg(test)]
651mod tests {
652    #[test]
653    fn test_module_compiles() {
654        // Verify this module's types and imports are valid
655    }
656}