mockforge_grpc/dynamic/http_bridge/
mod.rs

1//! HTTP to gRPC bridge implementation
2//!
3//! This module provides functionality to bridge HTTP requests to gRPC services,
4//! allowing RESTful APIs to be generated dynamically from protobuf definitions.
5
6pub mod converters;
7pub mod handlers;
8pub mod route_generator;
9
10use crate::reflection::MockReflectionProxy;
11use axum::{
12    body::Bytes,
13    extract::{Path, Query, State},
14    http::Method,
15    response::{IntoResponse, Json},
16    routing::{get, post},
17    Router,
18};
19use converters::ProtobufJsonConverter;
20use route_generator::RouteGenerator;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27use tower_http::cors::{Any, CorsLayer};
28use tracing::{debug, info, warn};
29
30/// Type alias for the bridge handler function to reduce type complexity
31type BridgeHandlerFn = dyn Fn(
32        State<Arc<HttpBridge>>,
33        Path<HashMap<String, String>>,
34        Query<BridgeQuery>,
35        Bytes,
36    ) -> Pin<Box<dyn Future<Output = axum::response::Response> + Send>>
37    + Send
38    + Sync;
39
40/// Parameters for bridge request handling
41struct BridgeRequestParams<'a> {
42    proxy: &'a MockReflectionProxy,
43    converter: &'a ProtobufJsonConverter,
44    service_name: &'a str,
45    method_name: &'a str,
46    server_streaming: bool,
47    body: Bytes,
48}
49
/// Settings that control how the HTTP bridge is exposed
#[derive(Debug, Clone)]
pub struct HttpBridgeConfig {
    /// Switches the HTTP bridge on or off
    pub enabled: bool,
    /// Prefix for the bridge's HTTP routes (e.g., "/api")
    pub base_path: String,
    /// Whether a CORS layer should be added
    pub enable_cors: bool,
    /// Upper bound on the request size, in bytes
    pub max_request_size: usize,
    /// How long a bridge request may take, in seconds
    pub timeout_seconds: u64,
    /// Template for service routes (e.g., "/{service}/{method}")
    pub route_pattern: String,
}

impl Default for HttpBridgeConfig {
    /// Enabled bridge under "/api" with CORS on, a 10 MiB request limit,
    /// a 30 second timeout and the "/{service}/{method}" route template.
    fn default() -> Self {
        const DEFAULT_MAX_REQUEST_SIZE: usize = 10 * 1024 * 1024; // 10MB
        const DEFAULT_TIMEOUT_SECONDS: u64 = 30;

        HttpBridgeConfig {
            enabled: true,
            base_path: String::from("/api"),
            enable_cors: true,
            max_request_size: DEFAULT_MAX_REQUEST_SIZE,
            timeout_seconds: DEFAULT_TIMEOUT_SECONDS,
            route_pattern: String::from("/{service}/{method}"),
        }
    }
}
79
80/// Query parameters for HTTP requests
81#[derive(Debug, Deserialize)]
82pub struct BridgeQuery {
83    /// Streaming mode (none, server, client, bidirectional)
84    #[serde(default)]
85    pub stream: Option<String>,
86    /// Metadata to pass to gRPC call as key=value pairs
87    #[serde(flatten)]
88    pub metadata: HashMap<String, String>,
89}
90
91/// HTTP response wrapper
92#[derive(Debug, Serialize, Deserialize)]
93pub struct BridgeResponse<T> {
94    /// Whether the request was successful
95    pub success: bool,
96    /// The response data
97    pub data: Option<T>,
98    /// Error message if success is false
99    pub error: Option<String>,
100    /// Metadata from the gRPC response
101    pub metadata: HashMap<String, String>,
102}
103
104/// Statistics about the HTTP bridge
105#[derive(Debug, Serialize, Clone)]
106pub struct BridgeStats {
107    /// Number of requests served
108    pub requests_served: u64,
109    /// Number of successful requests
110    pub requests_successful: u64,
111    /// Number of failed requests
112    pub requests_failed: u64,
113    /// Services available via the bridge
114    pub available_services: Vec<String>,
115}
116
117/// The HTTP bridge that provides RESTful API access to gRPC services
118pub struct HttpBridge {
119    /// The reflection proxy that handles gRPC calls
120    proxy: Arc<MockReflectionProxy>,
121    /// Route generator for creating HTTP routes
122    route_generator: RouteGenerator,
123    /// JSON to protobuf converter
124    converter: ProtobufJsonConverter,
125    /// Bridge configuration
126    config: HttpBridgeConfig,
127    /// Statistics
128    stats: Arc<std::sync::Mutex<BridgeStats>>,
129}
130
131impl HttpBridge {
132    /// Create a new HTTP bridge
133    pub fn new(
134        proxy: Arc<MockReflectionProxy>,
135        config: HttpBridgeConfig,
136    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
137        let route_generator = RouteGenerator::new(config.clone());
138        let converter =
139            ProtobufJsonConverter::new(proxy.service_registry.descriptor_pool().clone());
140        let available_services = proxy.service_names();
141
142        let stats = BridgeStats {
143            requests_served: 0,
144            requests_successful: 0,
145            requests_failed: 0,
146            available_services,
147        };
148
149        Ok(Self {
150            proxy,
151            route_generator,
152            converter,
153            config,
154            stats: Arc::new(std::sync::Mutex::new(stats)),
155        })
156    }
157
158    /// Create the HTTP router with all bridge routes
159    pub fn create_router(&self) -> Router<Arc<HttpBridge>> {
160        let mut router = Router::new();
161
162        // Add CORS if enabled
163        if self.config.enable_cors {
164            router = router.layer(
165                CorsLayer::new()
166                    .allow_methods([
167                        Method::GET,
168                        Method::POST,
169                        Method::PUT,
170                        Method::DELETE,
171                        Method::PATCH,
172                    ])
173                    .allow_headers(Any)
174                    .allow_origin(Any),
175            );
176        }
177
178        // Add state containing self reference
179        let bridge = Arc::new(self.clone());
180        router = router.with_state(bridge);
181
182        // Add health check endpoint
183        router =
184            router.route(&format!("{}/health", self.config.base_path), get(Self::health_check));
185
186        // Add statistics endpoint
187        router = router.route(&format!("{}/stats", self.config.base_path), get(Self::get_stats));
188
189        // Add services listing endpoint
190        router =
191            router.route(&format!("{}/services", self.config.base_path), get(Self::list_services));
192
193        // Add OpenAPI documentation endpoint
194        router =
195            router.route(&format!("{}/docs", self.config.base_path), get(Self::get_openapi_spec));
196
197        // Create dynamic bridge endpoints for all registered services
198        let registry = self.proxy.service_registry();
199
200        // Add a generic route that handles all service/method combinations
201        // The route pattern supports both GET (for streaming) and POST (for unary) requests
202        router =
203            router.route(&self.config.route_pattern, post(Self::handle_generic_bridge_request));
204        router = router.route(&self.config.route_pattern, get(Self::handle_generic_bridge_request));
205
206        let available_services = registry.service_names();
207        let total_methods =
208            registry.services.values().map(|s| s.service().methods.len()).sum::<usize>();
209        info!(
210            "Created HTTP bridge router with {} services and {} dynamic endpoints",
211            available_services.len(),
212            total_methods
213        );
214
215        router
216    }
217
218    /// Health check handler
219    async fn health_check(
220        State(_bridge): State<Arc<HttpBridge>>,
221    ) -> axum::response::Json<serde_json::Value> {
222        axum::response::Json(serde_json::json!({"status": "ok", "bridge": "healthy"}))
223    }
224
225    /// Get statistics handler
226    async fn get_stats(
227        State(bridge): State<Arc<HttpBridge>>,
228    ) -> axum::response::Json<serde_json::Value> {
229        let stats = bridge.stats.lock().unwrap();
230        axum::response::Json(serde_json::json!({
231            "requests_served": stats.requests_served,
232            "requests_successful": stats.requests_successful,
233            "requests_failed": stats.requests_failed,
234            "available_services": stats.available_services
235        }))
236    }
237
238    /// List services handler
239    async fn list_services(
240        State(bridge): State<Arc<HttpBridge>>,
241    ) -> axum::response::Json<serde_json::Value> {
242        Self::list_services_static(&bridge).await
243    }
244
245    /// Get OpenAPI spec handler
246    async fn get_openapi_spec(
247        State(bridge): State<Arc<HttpBridge>>,
248    ) -> axum::response::Json<serde_json::Value> {
249        Self::get_openapi_spec_static(&bridge).await
250    }
251
252    /// Generic bridge request handler that routes to specific services/methods
253    async fn handle_generic_bridge_request(
254        State(state): State<Arc<HttpBridge>>,
255        Path(path_params): Path<HashMap<String, String>>,
256        _query: Query<BridgeQuery>,
257        body: Bytes,
258    ) -> axum::response::Response {
259        // Extract service and method from path parameters
260        let service_name = match path_params.get("service") {
261            Some(name) => name,
262            None => {
263                let error_response = BridgeResponse::<Value> {
264                    success: false,
265                    data: None,
266                    error: Some("Missing 'service' parameter in path".to_string()),
267                    metadata: HashMap::new(),
268                };
269                return (axum::http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
270            }
271        };
272
273        let method_name = match path_params.get("method") {
274            Some(name) => name,
275            None => {
276                let error_response = BridgeResponse::<Value> {
277                    success: false,
278                    data: None,
279                    error: Some("Missing 'method' parameter in path".to_string()),
280                    metadata: HashMap::new(),
281                };
282                return (axum::http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
283            }
284        };
285
286        // Get method information from the registry
287        let registry = state.proxy.service_registry();
288        let service_opt = registry.get(service_name);
289        let method_info = if let Some(service) = service_opt {
290            service.service().methods.iter().find(|m| m.name == *method_name)
291        } else {
292            let error_response = BridgeResponse::<Value> {
293                success: false,
294                data: None,
295                error: Some(format!("Service '{}' not found", service_name)),
296                metadata: HashMap::new(),
297            };
298            return (axum::http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
299        };
300
301        let method_info = match method_info {
302            Some(method) => method,
303            None => {
304                let error_response = BridgeResponse::<Value> {
305                    success: false,
306                    data: None,
307                    error: Some(format!(
308                        "Method '{}' not found in service '{}'",
309                        method_name, service_name
310                    )),
311                    metadata: HashMap::new(),
312                };
313                return (axum::http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
314            }
315        };
316
317        // Update stats
318        {
319            let mut stats = state.stats.lock().unwrap();
320            stats.requests_served += 1;
321        }
322
323        // Handle the request
324        let params = BridgeRequestParams {
325            proxy: &state.proxy,
326            converter: &state.converter,
327            service_name: service_name.as_str(),
328            method_name: method_name.as_str(),
329            server_streaming: method_info.server_streaming,
330            body,
331        };
332        let result = Self::handle_bridge_request(&params).await;
333
334        match result {
335            Ok(response) => {
336                // Update successful stats
337                {
338                    let mut stats = state.stats.lock().unwrap();
339                    stats.requests_successful += 1;
340                }
341                (axum::http::StatusCode::OK, Json(response)).into_response()
342            }
343            Err(err) => {
344                // Update failed stats
345                {
346                    let mut stats = state.stats.lock().unwrap();
347                    stats.requests_failed += 1;
348                }
349                warn!("Bridge request failed for {}.{}: {}", service_name, method_name, err);
350                let error_response = BridgeResponse::<Value> {
351                    success: false,
352                    data: None,
353                    error: Some(err.to_string()),
354                    metadata: HashMap::new(),
355                };
356                (axum::http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
357                    .into_response()
358            }
359        }
360    }
361
362    /// Create a handler function for a specific gRPC method
363    #[allow(dead_code)] // Used in future HTTP bridge implementations
364    fn create_bridge_handler(
365        &self,
366        service_name: String,
367        method_name: String,
368        _client_streaming: bool,
369        server_streaming: bool,
370    ) -> Box<BridgeHandlerFn> {
371        Box::new(
372            move |state: State<Arc<Self>>,
373                  _path: Path<HashMap<String, String>>,
374                  _query: Query<BridgeQuery>,
375                  body: Bytes| {
376                let service_name = service_name.clone();
377                let method_name = method_name.clone();
378                let stats = state.stats.clone();
379                let proxy = state.proxy.clone();
380                let converter = state.converter.clone();
381
382                Box::pin(async move {
383                    // Update stats
384                    {
385                        let mut stats = stats.lock().unwrap();
386                        stats.requests_served += 1;
387                    }
388
389                    // Handle the request
390                    let params = BridgeRequestParams {
391                        proxy: &proxy,
392                        converter: &converter,
393                        service_name: service_name.as_str(),
394                        method_name: method_name.as_str(),
395                        server_streaming,
396                        body,
397                    };
398                    let result = Self::handle_bridge_request(&params).await;
399
400                    match result {
401                        Ok(response) => {
402                            // Update successful stats
403                            {
404                                let mut stats = stats.lock().unwrap();
405                                stats.requests_successful += 1;
406                            }
407                            (axum::http::StatusCode::OK, Json(response)).into_response()
408                        }
409                        Err(err) => {
410                            // Update failed stats
411                            {
412                                let mut stats = stats.lock().unwrap();
413                                stats.requests_failed += 1;
414                            }
415                            warn!(
416                                "Bridge request failed for {}.{}: {}",
417                                service_name, method_name, err
418                            );
419                            let error_response = BridgeResponse::<Value> {
420                                success: false,
421                                data: None,
422                                error: Some(err.to_string()),
423                                metadata: HashMap::new(),
424                            };
425                            (axum::http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
426                                .into_response()
427                        }
428                    }
429                })
430            },
431        )
432    }
433
434    /// Get bridge statistics (static method for handler)
435    #[allow(dead_code)] // Used in future HTTP bridge implementations
436    async fn get_stats_static(bridge: &Arc<HttpBridge>) -> axum::response::Json<serde_json::Value> {
437        let stats = bridge.stats.lock().unwrap();
438        axum::response::Json(serde_json::json!({
439            "requests_served": stats.requests_served,
440            "requests_successful": stats.requests_successful,
441            "requests_failed": stats.requests_failed,
442            "available_services": stats.available_services
443        }))
444    }
445
446    /// List available services (static method for handler)
447    async fn list_services_static(
448        bridge: &Arc<HttpBridge>,
449    ) -> axum::response::Json<serde_json::Value> {
450        let services = bridge.proxy.service_names();
451        axum::response::Json(serde_json::json!({
452            "services": services
453        }))
454    }
455
456    /// Get OpenAPI spec (static method for handler)
457    async fn get_openapi_spec_static(
458        bridge: &Arc<HttpBridge>,
459    ) -> axum::response::Json<serde_json::Value> {
460        use crate::dynamic::proto_parser::ProtoService;
461        use std::collections::HashMap;
462
463        // Extract services from the service registry
464        let services: HashMap<String, ProtoService> = bridge
465            .proxy
466            .service_registry()
467            .services
468            .iter()
469            .map(|(name, dyn_service)| (name.clone(), dyn_service.service().clone()))
470            .collect();
471
472        // Generate OpenAPI spec using the route generator
473        let spec = bridge.route_generator.generate_openapi_spec(&services);
474        axum::response::Json(spec)
475    }
476
477    /// Handle a bridge request by calling the appropriate gRPC method
478    async fn handle_bridge_request(
479        params: &BridgeRequestParams<'_>,
480    ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
481        debug!("Handling bridge request for {}.{}", params.service_name, params.method_name);
482
483        // Parse JSON request body
484        let json_request: Value = if params.body.is_empty() {
485            Value::Null
486        } else {
487            serde_json::from_slice(&params.body).map_err(|e| {
488                Box::<dyn std::error::Error + Send + Sync>::from(format!(
489                    "Failed to parse JSON request: {}",
490                    e
491                ))
492            })?
493        };
494
495        // Call appropriate gRPC method based on streaming type
496        if params.server_streaming {
497            // Handle streaming response
498            Self::handle_streaming_request(
499                params.proxy,
500                params.converter,
501                params.service_name,
502                params.method_name,
503                json_request,
504            )
505            .await
506        } else {
507            // Handle unary request
508            Self::handle_unary_request(
509                params.proxy,
510                params.converter,
511                params.service_name,
512                params.method_name,
513                json_request,
514            )
515            .await
516        }
517    }
518
519    /// Handle unary request (no streaming)
520    async fn handle_unary_request(
521        proxy: &MockReflectionProxy,
522        _converter: &ProtobufJsonConverter,
523        service_name: &str,
524        method_name: &str,
525        json_request: Value,
526    ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
527        // Get method descriptor from the service registry
528        let registry = proxy.service_registry();
529        let service_registry = registry.clone();
530
531        // Find the service and method
532        let service_opt = service_registry.get(service_name);
533        if service_opt.is_none() {
534            return Err(format!("Service '{}' not found", service_name).into());
535        }
536
537        let service = service_opt.unwrap();
538        let method_opt = service.service().methods.iter().find(|m| m.name == method_name);
539        if method_opt.is_none() {
540            return Err(format!(
541                "Method '{}' not found in service '{}'",
542                method_name, service_name
543            )
544            .into());
545        }
546
547        // For now, create a generic response since we don't have full descriptor integration
548        // In a complete implementation, this would:
549        // 1. Get input/output descriptor from proto parser
550        // 2. Convert JSON to protobuf message
551        // 3. Call the actual gRPC method via proxy
552        // 4. Convert protobuf response back to JSON
553
554        // Create a mock response for demonstration
555        let json_response = serde_json::json!({
556            "message": format!("Hello! This is a mock response from {}.{} bridge", service_name, method_name),
557            "request_data": json_request,
558            "timestamp": chrono::Utc::now().to_rfc3339()
559        });
560
561        Ok(BridgeResponse {
562            success: true,
563            data: Some(json_response),
564            error: None,
565            metadata: HashMap::new(),
566        })
567    }
568
569    /// Handle streaming request (returns SSE stream)
570    async fn handle_streaming_request(
571        _proxy: &MockReflectionProxy,
572        _converter: &ProtobufJsonConverter,
573        _service_name: &str,
574        _method_name: &str,
575        _json_request: Value,
576    ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
577        // For now, return an error indicating streaming is not yet implemented via HTTP
578        // Full streaming implementation would use Server-Sent Events
579        Err("Streaming responses via HTTP bridge are not yet implemented".into())
580    }
581}
582
583impl Clone for HttpBridge {
584    fn clone(&self) -> Self {
585        Self {
586            proxy: self.proxy.clone(),
587            route_generator: self.route_generator.clone(),
588            converter: self.converter.clone(),
589            config: self.config.clone(),
590            stats: self.stats.clone(),
591        }
592    }
593}
594
#[cfg(test)]
mod tests {
    /// Smoke test: passes whenever this module builds under `cfg(test)`
    #[test]
    fn test_module_compiles() {}
}