mockforge_grpc/dynamic/http_bridge/
mod.rs

1//! HTTP to gRPC bridge implementation
2//!
3//! This module provides functionality to bridge HTTP requests to gRPC services,
4//! allowing RESTful APIs to be generated dynamically from protobuf definitions.
5
6pub mod converters;
7pub mod handlers;
8pub mod route_generator;
9
10use crate::reflection::MockReflectionProxy;
11use axum::{
12    body::Bytes,
13    extract::{Path, Query, State},
14    http::Method,
15    response::{IntoResponse, Json},
16    routing::{get, post},
17    Router,
18};
19use converters::ProtobufJsonConverter;
20use route_generator::RouteGenerator;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27use tower_http::cors::{Any, CorsLayer};
28use tracing::{debug, info, warn};
29
30/// Type alias for the bridge handler function to reduce type complexity
31type BridgeHandlerFn = dyn Fn(
32        State<Arc<HttpBridge>>,
33        Path<HashMap<String, String>>,
34        Query<BridgeQuery>,
35        Bytes,
36    ) -> Pin<Box<dyn Future<Output = axum::response::Response> + Send>>
37    + Send
38    + Sync;
39
40/// Parameters for bridge request handling
41struct BridgeRequestParams<'a> {
42    proxy: &'a MockReflectionProxy,
43    converter: &'a ProtobufJsonConverter,
44    service_name: &'a str,
45    method_name: &'a str,
46    server_streaming: bool,
47    body: Bytes,
48}
49
50/// Configuration for the HTTP bridge
51#[derive(Debug, Clone)]
52pub struct HttpBridgeConfig {
53    /// Whether the HTTP bridge is enabled
54    pub enabled: bool,
55    /// Base path for HTTP routes (e.g., "/api")
56    pub base_path: String,
57    /// Whether to enable CORS
58    pub enable_cors: bool,
59    /// Maximum request size in bytes
60    pub max_request_size: usize,
61    /// Timeout for bridge requests in seconds
62    pub timeout_seconds: u64,
63    /// Path pattern for service routes (e.g., "/{service}/{method}")
64    pub route_pattern: String,
65}
66
67impl Default for HttpBridgeConfig {
68    fn default() -> Self {
69        Self {
70            enabled: true,
71            base_path: "/api".to_string(),
72            enable_cors: true,
73            max_request_size: 10 * 1024 * 1024, // 10MB
74            timeout_seconds: 30,
75            route_pattern: "/{service}/{method}".to_string(),
76        }
77    }
78}
79
80/// Query parameters for HTTP requests
81#[derive(Debug, Deserialize)]
82pub struct BridgeQuery {
83    /// Streaming mode (none, server, client, bidirectional)
84    #[serde(default)]
85    pub stream: Option<String>,
86    /// Metadata to pass to gRPC call as key=value pairs
87    #[serde(flatten)]
88    pub metadata: HashMap<String, String>,
89}
90
91/// HTTP response wrapper
92#[derive(Debug, Serialize, Deserialize)]
93pub struct BridgeResponse<T> {
94    /// Whether the request was successful
95    pub success: bool,
96    /// The response data
97    pub data: Option<T>,
98    /// Error message if success is false
99    pub error: Option<String>,
100    /// Metadata from the gRPC response
101    pub metadata: HashMap<String, String>,
102}
103
104/// Statistics about the HTTP bridge
105#[derive(Debug, Serialize, Clone)]
106pub struct BridgeStats {
107    /// Number of requests served
108    pub requests_served: u64,
109    /// Number of successful requests
110    pub requests_successful: u64,
111    /// Number of failed requests
112    pub requests_failed: u64,
113    /// Services available via the bridge
114    pub available_services: Vec<String>,
115}
116
117/// The HTTP bridge that provides RESTful API access to gRPC services
118pub struct HttpBridge {
119    /// The reflection proxy that handles gRPC calls
120    proxy: Arc<MockReflectionProxy>,
121    /// Route generator for creating HTTP routes
122    route_generator: RouteGenerator,
123    /// JSON to protobuf converter
124    converter: ProtobufJsonConverter,
125    /// Bridge configuration
126    config: HttpBridgeConfig,
127    /// Statistics
128    stats: Arc<std::sync::Mutex<BridgeStats>>,
129}
130
131impl HttpBridge {
132    /// Create a new HTTP bridge
133    pub fn new(
134        proxy: Arc<MockReflectionProxy>,
135        config: HttpBridgeConfig,
136    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
137        let route_generator = RouteGenerator::new(config.clone());
138        let converter =
139            ProtobufJsonConverter::new(proxy.service_registry.descriptor_pool().clone());
140        let available_services = proxy.service_names();
141
142        let stats = BridgeStats {
143            requests_served: 0,
144            requests_successful: 0,
145            requests_failed: 0,
146            available_services,
147        };
148
149        Ok(Self {
150            proxy,
151            route_generator,
152            converter,
153            config,
154            stats: Arc::new(std::sync::Mutex::new(stats)),
155        })
156    }
157
158    /// Create the HTTP router with all bridge routes
159    pub fn create_router(&self) -> Router<Arc<HttpBridge>> {
160        let mut router = Router::new();
161
162        // Add CORS if enabled
163        if self.config.enable_cors {
164            router = router.layer(
165                CorsLayer::new()
166                    .allow_methods([
167                        Method::GET,
168                        Method::POST,
169                        Method::PUT,
170                        Method::DELETE,
171                        Method::PATCH,
172                    ])
173                    .allow_headers(Any)
174                    .allow_origin(Any),
175            );
176        }
177
178        // Add state containing self reference
179        let bridge = Arc::new(self.clone());
180        router = router.with_state(bridge);
181
182        // Add health check endpoint
183        router =
184            router.route(&format!("{}/health", self.config.base_path), get(Self::health_check));
185
186        // Add statistics endpoint
187        router = router.route(&format!("{}/stats", self.config.base_path), get(Self::get_stats));
188
189        // Add services listing endpoint
190        router =
191            router.route(&format!("{}/services", self.config.base_path), get(Self::list_services));
192
193        // Add OpenAPI documentation endpoint
194        router =
195            router.route(&format!("{}/docs", self.config.base_path), get(Self::get_openapi_spec));
196
197        // Create dynamic bridge endpoints for all registered services
198        let registry = self.proxy.service_registry();
199
200        // Add a generic route that handles all service/method combinations
201        // The route pattern supports both GET (for streaming) and POST (for unary) requests
202        router =
203            router.route(&self.config.route_pattern, post(Self::handle_generic_bridge_request));
204        router = router.route(&self.config.route_pattern, get(Self::handle_generic_bridge_request));
205
206        let available_services = registry.service_names();
207        let total_methods =
208            registry.services.values().map(|s| s.service().methods.len()).sum::<usize>();
209        info!(
210            "Created HTTP bridge router with {} services and {} dynamic endpoints",
211            available_services.len(),
212            total_methods
213        );
214
215        router
216    }
217
218    /// Health check handler
219    async fn health_check(
220        State(_bridge): State<Arc<HttpBridge>>,
221    ) -> axum::response::Json<serde_json::Value> {
222        axum::response::Json(serde_json::json!({"status": "ok", "bridge": "healthy"}))
223    }
224
225    /// Get statistics handler
226    async fn get_stats(
227        State(bridge): State<Arc<HttpBridge>>,
228    ) -> axum::response::Json<serde_json::Value> {
229        // Handle poisoned mutex gracefully - if mutex is poisoned, use default stats
230        let stats = bridge.stats.lock().unwrap_or_else(|poisoned| {
231            warn!("Statistics mutex is poisoned, using default values");
232            poisoned.into_inner()
233        });
234        axum::response::Json(serde_json::json!({
235            "requests_served": stats.requests_served,
236            "requests_successful": stats.requests_successful,
237            "requests_failed": stats.requests_failed,
238            "available_services": stats.available_services
239        }))
240    }
241
242    /// List services handler
243    async fn list_services(
244        State(bridge): State<Arc<HttpBridge>>,
245    ) -> axum::response::Json<serde_json::Value> {
246        Self::list_services_static(&bridge).await
247    }
248
249    /// Get OpenAPI spec handler
250    async fn get_openapi_spec(
251        State(bridge): State<Arc<HttpBridge>>,
252    ) -> axum::response::Json<serde_json::Value> {
253        Self::get_openapi_spec_static(&bridge).await
254    }
255
256    /// Generic bridge request handler that routes to specific services/methods
257    async fn handle_generic_bridge_request(
258        State(state): State<Arc<HttpBridge>>,
259        Path(path_params): Path<HashMap<String, String>>,
260        _query: Query<BridgeQuery>,
261        body: Bytes,
262    ) -> axum::response::Response {
263        // Extract service and method from path parameters
264        let service_name = match path_params.get("service") {
265            Some(name) => name,
266            None => {
267                let error_response = BridgeResponse::<Value> {
268                    success: false,
269                    data: None,
270                    error: Some("Missing 'service' parameter in path".to_string()),
271                    metadata: HashMap::new(),
272                };
273                return (axum::http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
274            }
275        };
276
277        let method_name = match path_params.get("method") {
278            Some(name) => name,
279            None => {
280                let error_response = BridgeResponse::<Value> {
281                    success: false,
282                    data: None,
283                    error: Some("Missing 'method' parameter in path".to_string()),
284                    metadata: HashMap::new(),
285                };
286                return (axum::http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
287            }
288        };
289
290        // Get method information from the registry
291        let registry = state.proxy.service_registry();
292        let service_opt = registry.get(service_name);
293        let method_info = if let Some(service) = service_opt {
294            service.service().methods.iter().find(|m| m.name == *method_name)
295        } else {
296            let error_response = BridgeResponse::<Value> {
297                success: false,
298                data: None,
299                error: Some(format!("Service '{}' not found", service_name)),
300                metadata: HashMap::new(),
301            };
302            return (axum::http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
303        };
304
305        let method_info = match method_info {
306            Some(method) => method,
307            None => {
308                let error_response = BridgeResponse::<Value> {
309                    success: false,
310                    data: None,
311                    error: Some(format!(
312                        "Method '{}' not found in service '{}'",
313                        method_name, service_name
314                    )),
315                    metadata: HashMap::new(),
316                };
317                return (axum::http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
318            }
319        };
320
321        // Update stats - handle poisoned mutex gracefully
322        {
323            if let Ok(mut stats) = state.stats.lock() {
324                stats.requests_served += 1;
325            } else {
326                warn!("Failed to update request stats (mutex poisoned)");
327            }
328        }
329
330        // Handle the request
331        let params = BridgeRequestParams {
332            proxy: &state.proxy,
333            converter: &state.converter,
334            service_name: service_name.as_str(),
335            method_name: method_name.as_str(),
336            server_streaming: method_info.server_streaming,
337            body,
338        };
339        let result = Self::handle_bridge_request(&params).await;
340
341        match result {
342            Ok(response) => {
343                // Update successful stats - handle poisoned mutex gracefully
344                {
345                    if let Ok(mut stats) = state.stats.lock() {
346                        stats.requests_successful += 1;
347                    } else {
348                        warn!("Failed to update success stats (mutex poisoned)");
349                    }
350                }
351                (axum::http::StatusCode::OK, Json(response)).into_response()
352            }
353            Err(err) => {
354                // Update failed stats - handle poisoned mutex gracefully
355                {
356                    if let Ok(mut stats) = state.stats.lock() {
357                        stats.requests_failed += 1;
358                    } else {
359                        warn!("Failed to update failure stats (mutex poisoned)");
360                    }
361                }
362                warn!("Bridge request failed for {}.{}: {}", service_name, method_name, err);
363                let error_response = BridgeResponse::<Value> {
364                    success: false,
365                    data: None,
366                    error: Some(err.to_string()),
367                    metadata: HashMap::new(),
368                };
369                (axum::http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
370                    .into_response()
371            }
372        }
373    }
374
375    /// Create a handler function for a specific gRPC method
376    ///
377    /// TODO: Use when dynamic HTTP bridge handler creation is fully implemented
378    #[allow(dead_code)] // TODO: Remove when HTTP bridge handler factory is complete
379    fn create_bridge_handler(
380        &self,
381        service_name: String,
382        method_name: String,
383        _client_streaming: bool,
384        server_streaming: bool,
385    ) -> Box<BridgeHandlerFn> {
386        Box::new(
387            move |state: State<Arc<Self>>,
388                  _path: Path<HashMap<String, String>>,
389                  _query: Query<BridgeQuery>,
390                  body: Bytes| {
391                let service_name = service_name.clone();
392                let method_name = method_name.clone();
393                let stats = state.stats.clone();
394                let proxy = state.proxy.clone();
395                let converter = state.converter.clone();
396
397                Box::pin(async move {
398                    // Update stats - handle poisoned mutex gracefully
399                    {
400                        if let Ok(mut stats) = stats.lock() {
401                            stats.requests_served += 1;
402                        } else {
403                            warn!("Failed to update request stats (mutex poisoned)");
404                        }
405                    }
406
407                    // Handle the request
408                    let params = BridgeRequestParams {
409                        proxy: &proxy,
410                        converter: &converter,
411                        service_name: service_name.as_str(),
412                        method_name: method_name.as_str(),
413                        server_streaming,
414                        body,
415                    };
416                    let result = Self::handle_bridge_request(&params).await;
417
418                    match result {
419                        Ok(response) => {
420                            // Update successful stats - handle poisoned mutex gracefully
421                            {
422                                if let Ok(mut stats) = stats.lock() {
423                                    stats.requests_successful += 1;
424                                } else {
425                                    warn!("Failed to update success stats (mutex poisoned)");
426                                }
427                            }
428                            (axum::http::StatusCode::OK, Json(response)).into_response()
429                        }
430                        Err(err) => {
431                            // Update failed stats - handle poisoned mutex gracefully
432                            {
433                                if let Ok(mut stats) = stats.lock() {
434                                    stats.requests_failed += 1;
435                                } else {
436                                    warn!("Failed to update failure stats (mutex poisoned)");
437                                }
438                            }
439                            warn!(
440                                "Bridge request failed for {}.{}: {}",
441                                service_name, method_name, err
442                            );
443                            let error_response = BridgeResponse::<Value> {
444                                success: false,
445                                data: None,
446                                error: Some(err.to_string()),
447                                metadata: HashMap::new(),
448                            };
449                            (axum::http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
450                                .into_response()
451                        }
452                    }
453                })
454            },
455        )
456    }
457
458    /// Get bridge statistics (static method for handler)
459    ///
460    /// TODO: Integrate into HTTP bridge admin endpoints when stats API is implemented
461    #[allow(dead_code)] // TODO: Remove when bridge stats endpoint is complete
462    async fn get_stats_static(bridge: &Arc<HttpBridge>) -> axum::response::Json<serde_json::Value> {
463        // Handle poisoned mutex gracefully - if mutex is poisoned, use default stats
464        let stats = bridge.stats.lock().unwrap_or_else(|poisoned| {
465            warn!("Statistics mutex is poisoned, using default values");
466            poisoned.into_inner()
467        });
468        axum::response::Json(serde_json::json!({
469            "requests_served": stats.requests_served,
470            "requests_successful": stats.requests_successful,
471            "requests_failed": stats.requests_failed,
472            "available_services": stats.available_services
473        }))
474    }
475
476    /// List available services (static method for handler)
477    async fn list_services_static(
478        bridge: &Arc<HttpBridge>,
479    ) -> axum::response::Json<serde_json::Value> {
480        let services = bridge.proxy.service_names();
481        axum::response::Json(serde_json::json!({
482            "services": services
483        }))
484    }
485
486    /// Get OpenAPI spec (static method for handler)
487    async fn get_openapi_spec_static(
488        bridge: &Arc<HttpBridge>,
489    ) -> axum::response::Json<serde_json::Value> {
490        use crate::dynamic::proto_parser::ProtoService;
491        use std::collections::HashMap;
492
493        // Extract services from the service registry
494        let services: HashMap<String, ProtoService> = bridge
495            .proxy
496            .service_registry()
497            .services
498            .iter()
499            .map(|(name, dyn_service)| (name.clone(), dyn_service.service().clone()))
500            .collect();
501
502        // Generate OpenAPI spec using the route generator
503        let spec = bridge.route_generator.generate_openapi_spec(&services);
504        axum::response::Json(spec)
505    }
506
507    /// Handle a bridge request by calling the appropriate gRPC method
508    async fn handle_bridge_request(
509        params: &BridgeRequestParams<'_>,
510    ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
511        debug!("Handling bridge request for {}.{}", params.service_name, params.method_name);
512
513        // Parse JSON request body
514        let json_request: Value = if params.body.is_empty() {
515            Value::Null
516        } else {
517            serde_json::from_slice(&params.body).map_err(|e| {
518                Box::<dyn std::error::Error + Send + Sync>::from(format!(
519                    "Failed to parse JSON request: {}",
520                    e
521                ))
522            })?
523        };
524
525        // Call appropriate gRPC method based on streaming type
526        if params.server_streaming {
527            // Handle streaming response
528            Self::handle_streaming_request(
529                params.proxy,
530                params.converter,
531                params.service_name,
532                params.method_name,
533                json_request,
534            )
535            .await
536        } else {
537            // Handle unary request
538            Self::handle_unary_request(
539                params.proxy,
540                params.converter,
541                params.service_name,
542                params.method_name,
543                json_request,
544            )
545            .await
546        }
547    }
548
549    /// Handle unary request (no streaming)
550    async fn handle_unary_request(
551        proxy: &MockReflectionProxy,
552        _converter: &ProtobufJsonConverter,
553        service_name: &str,
554        method_name: &str,
555        json_request: Value,
556    ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
557        // Get method descriptor from the service registry
558        let registry = proxy.service_registry();
559        let service_registry = registry.clone();
560
561        // Find the service and method
562        let service = match service_registry.get(service_name) {
563            Some(s) => s,
564            None => {
565                return Err(format!("Service '{}' not found", service_name).into());
566            }
567        };
568
569        let method = match service.service().methods.iter().find(|m| m.name == method_name) {
570            Some(m) => m,
571            None => {
572                return Err(format!(
573                    "Method '{}' not found in service '{}'",
574                    method_name, service_name
575                )
576                .into());
577            }
578        };
579
580        // Use method for future implementation
581        let _method = method;
582
583        // For now, create a generic response since we don't have full descriptor integration
584        // In a complete implementation, this would:
585        // 1. Get input/output descriptor from proto parser
586        // 2. Convert JSON to protobuf message
587        // 3. Call the actual gRPC method via proxy
588        // 4. Convert protobuf response back to JSON
589
590        // Create a mock response for demonstration
591        let json_response = serde_json::json!({
592            "message": format!("Hello! This is a mock response from {}.{} bridge", service_name, method_name),
593            "request_data": json_request,
594            "timestamp": chrono::Utc::now().to_rfc3339()
595        });
596
597        Ok(BridgeResponse {
598            success: true,
599            data: Some(json_response),
600            error: None,
601            metadata: HashMap::new(),
602        })
603    }
604
605    /// Handle streaming request (returns SSE stream)
606    async fn handle_streaming_request(
607        _proxy: &MockReflectionProxy,
608        _converter: &ProtobufJsonConverter,
609        _service_name: &str,
610        _method_name: &str,
611        _json_request: Value,
612    ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
613        // For now, return an error indicating streaming is not yet implemented via HTTP
614        // Full streaming implementation would use Server-Sent Events
615        Err("Streaming responses via HTTP bridge are not yet implemented".into())
616    }
617}
618
619impl Clone for HttpBridge {
620    fn clone(&self) -> Self {
621        Self {
622            proxy: self.proxy.clone(),
623            route_generator: self.route_generator.clone(),
624            converter: self.converter.clone(),
625            config: self.config.clone(),
626            stats: self.stats.clone(),
627        }
628    }
629}
630
631#[cfg(test)]
632mod tests {
633
634    #[test]
635    fn test_module_compiles() {}
636}