mockforge_grpc/dynamic/http_bridge/
mod.rs

1//! HTTP to gRPC bridge implementation
2//!
3//! This module provides functionality to bridge HTTP requests to gRPC services,
4//! allowing RESTful APIs to be generated dynamically from protobuf definitions.
5
6pub mod converters;
7pub mod handlers;
8pub mod route_generator;
9
10use crate::reflection::MockReflectionProxy;
11use axum::{
12    body::Bytes,
13    extract::{Path, Query, State},
14    http::Method,
15    response::{IntoResponse, Json},
16    routing::{get, post},
17    Router,
18};
19use converters::ProtobufJsonConverter;
20use route_generator::RouteGenerator;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27use tower_http::cors::{Any, CorsLayer};
28use tracing::{debug, info, warn};
29
/// Type alias for the bridge handler function to reduce type complexity.
///
/// Describes a thread-safe (`Send + Sync`) callable that receives the same
/// four extractor values as the generic bridge handler in this module (shared
/// bridge state, path parameters, query parameters and the raw request body)
/// and returns a boxed, pinned, `Send` future resolving to an axum response.
/// The alias names an unsized `dyn Fn` type, so it is used behind a pointer
/// such as `Box<BridgeHandlerFn>`.
type BridgeHandlerFn = dyn Fn(
        State<Arc<HttpBridge>>,
        Path<HashMap<String, String>>,
        Query<BridgeQuery>,
        Bytes,
    ) -> Pin<Box<dyn Future<Output = axum::response::Response> + Send>>
    + Send
    + Sync;
39
/// Parameters for bridge request handling.
///
/// Bundles everything a single bridged call needs so it can be passed around
/// as one borrowed value instead of a long argument list.
struct BridgeRequestParams<'a> {
    /// Reflection proxy whose service registry is consulted for the call.
    proxy: &'a MockReflectionProxy,
    /// JSON <-> protobuf converter handed through to the request handlers.
    converter: &'a ProtobufJsonConverter,
    /// Name of the target gRPC service.
    service_name: &'a str,
    /// Name of the target method within `service_name`.
    method_name: &'a str,
    /// `true` routes the call to the streaming handler, `false` to the unary one.
    server_streaming: bool,
    /// Raw HTTP request body; parsed as JSON, with an empty body treated as JSON `null`.
    body: Bytes,
}
49
/// Settings that control how the HTTP bridge is exposed.
///
/// Use [`HttpBridgeConfig::default`] for the stock configuration and override
/// individual fields as needed.
#[derive(Debug, Clone)]
pub struct HttpBridgeConfig {
    /// Master switch for the HTTP bridge.
    pub enabled: bool,
    /// Prefix for the bridge's fixed endpoints (e.g., "/api").
    pub base_path: String,
    /// Whether CORS handling should be enabled.
    pub enable_cors: bool,
    /// Upper bound for a request body, in bytes.
    pub max_request_size: usize,
    /// Time budget for a bridged request, in seconds.
    pub timeout_seconds: u64,
    /// Route template for service calls (e.g., "/{service}/{method}").
    pub route_pattern: String,
}

impl Default for HttpBridgeConfig {
    /// Stock configuration: bridge and CORS enabled, endpoints under `/api`,
    /// a 10MB body limit, a 30 second timeout and `/{service}/{method}` routing.
    fn default() -> Self {
        // 10MB expressed in bytes.
        let ten_megabytes: usize = 10 * 1024 * 1024;

        HttpBridgeConfig {
            enabled: true,
            base_path: String::from("/api"),
            enable_cors: true,
            max_request_size: ten_megabytes,
            timeout_seconds: 30,
            route_pattern: String::from("/{service}/{method}"),
        }
    }
}
79
/// Query parameters for HTTP requests.
///
/// Deserialized from the URL query string of a bridged request.
#[derive(Debug, Deserialize)]
pub struct BridgeQuery {
    /// Streaming mode (none, server, client, bidirectional).
    ///
    /// Optional; absent when the `stream` parameter is not supplied.
    #[serde(default)]
    pub stream: Option<String>,
    /// Metadata to pass to gRPC call as key=value pairs.
    ///
    /// Because of `#[serde(flatten)]`, every query parameter that is not
    /// matched by another field of this struct is collected into this map.
    #[serde(flatten)]
    pub metadata: HashMap<String, String>,
}
90
/// HTTP response wrapper.
///
/// JSON envelope used for bridged calls: the handlers in this module build it
/// with `success: true` and `data` set on success, and with `success: false`
/// and `error` set on failure.
#[derive(Debug, Serialize, Deserialize)]
pub struct BridgeResponse<T> {
    /// Whether the request was successful
    pub success: bool,
    /// The response data (`None` in the error envelopes built by this module)
    pub data: Option<T>,
    /// Error message if success is false
    pub error: Option<String>,
    /// Metadata from the gRPC response (the handlers in this module currently
    /// always supply an empty map)
    pub metadata: HashMap<String, String>,
}
103
/// Statistics about the HTTP bridge.
///
/// Held behind a mutex inside [`HttpBridge`] and reported by the stats
/// endpoint.
#[derive(Debug, Serialize, Clone)]
pub struct BridgeStats {
    /// Number of requests served (incremented when a bridged call is dispatched)
    pub requests_served: u64,
    /// Number of successful requests
    pub requests_successful: u64,
    /// Number of failed requests
    pub requests_failed: u64,
    /// Services available via the bridge (captured when the bridge is constructed)
    pub available_services: Vec<String>,
}
116
/// The HTTP bridge that provides RESTful API access to gRPC services.
///
/// Handlers receive the bridge as `State<Arc<HttpBridge>>`; the statistics are
/// additionally wrapped in their own `Arc<Mutex<_>>` so they can be updated
/// through a shared reference.
pub struct HttpBridge {
    /// The reflection proxy that handles gRPC calls
    proxy: Arc<MockReflectionProxy>,
    /// Route generator for creating HTTP routes (also produces the OpenAPI spec)
    route_generator: RouteGenerator,
    /// JSON to protobuf converter
    converter: ProtobufJsonConverter,
    /// Bridge configuration
    config: HttpBridgeConfig,
    /// Statistics, shared and guarded by a standard-library mutex
    stats: Arc<std::sync::Mutex<BridgeStats>>,
}
130
131impl HttpBridge {
132    /// Create a new HTTP bridge
133    pub fn new(
134        proxy: Arc<MockReflectionProxy>,
135        config: HttpBridgeConfig,
136    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
137        let route_generator = RouteGenerator::new(config.clone());
138        let converter =
139            ProtobufJsonConverter::new(proxy.service_registry.descriptor_pool().clone());
140        let available_services = proxy.service_names();
141
142        let stats = BridgeStats {
143            requests_served: 0,
144            requests_successful: 0,
145            requests_failed: 0,
146            available_services,
147        };
148
149        Ok(Self {
150            proxy,
151            route_generator,
152            converter,
153            config,
154            stats: Arc::new(std::sync::Mutex::new(stats)),
155        })
156    }
157
158    /// Create the HTTP router with all bridge routes
159    pub fn create_router(&self) -> Router<Arc<HttpBridge>> {
160        let mut router = Router::new();
161
162        // Add CORS if enabled
163        if self.config.enable_cors {
164            router = router.layer(
165                CorsLayer::new()
166                    .allow_methods([
167                        Method::GET,
168                        Method::POST,
169                        Method::PUT,
170                        Method::DELETE,
171                        Method::PATCH,
172                    ])
173                    .allow_headers(Any)
174                    .allow_origin(Any),
175            );
176        }
177
178        // Add state containing self reference
179        let bridge = Arc::new(self.clone());
180        router = router.with_state(bridge);
181
182        // Add health check endpoint
183        router =
184            router.route(&format!("{}/health", self.config.base_path), get(Self::health_check));
185
186        // Add statistics endpoint
187        router = router.route(&format!("{}/stats", self.config.base_path), get(Self::get_stats));
188
189        // Add services listing endpoint
190        router =
191            router.route(&format!("{}/services", self.config.base_path), get(Self::list_services));
192
193        // Add OpenAPI documentation endpoint
194        router =
195            router.route(&format!("{}/docs", self.config.base_path), get(Self::get_openapi_spec));
196
197        // Create dynamic bridge endpoints for all registered services
198        let registry = self.proxy.service_registry();
199
200        // Add a generic route that handles all service/method combinations
201        // The route pattern supports both GET (for streaming) and POST (for unary) requests
202        router =
203            router.route(&self.config.route_pattern, post(Self::handle_generic_bridge_request));
204        router = router.route(&self.config.route_pattern, get(Self::handle_generic_bridge_request));
205
206        let available_services = registry.service_names();
207        let total_methods =
208            registry.services.values().map(|s| s.service().methods.len()).sum::<usize>();
209        info!(
210            "Created HTTP bridge router with {} services and {} dynamic endpoints",
211            available_services.len(),
212            total_methods
213        );
214
215        router
216    }
217
218    /// Health check handler
219    async fn health_check(
220        State(_bridge): State<Arc<HttpBridge>>,
221    ) -> axum::response::Json<serde_json::Value> {
222        axum::response::Json(serde_json::json!({"status": "ok", "bridge": "healthy"}))
223    }
224
225    /// Get statistics handler
226    async fn get_stats(
227        State(bridge): State<Arc<HttpBridge>>,
228    ) -> axum::response::Json<serde_json::Value> {
229        let stats = bridge.stats.lock().unwrap();
230        axum::response::Json(serde_json::json!({
231            "requests_served": stats.requests_served,
232            "requests_successful": stats.requests_successful,
233            "requests_failed": stats.requests_failed,
234            "available_services": stats.available_services
235        }))
236    }
237
238    /// List services handler
239    async fn list_services(
240        State(bridge): State<Arc<HttpBridge>>,
241    ) -> axum::response::Json<serde_json::Value> {
242        Self::list_services_static(&bridge).await
243    }
244
245    /// Get OpenAPI spec handler
246    async fn get_openapi_spec(
247        State(bridge): State<Arc<HttpBridge>>,
248    ) -> axum::response::Json<serde_json::Value> {
249        Self::get_openapi_spec_static(&bridge).await
250    }
251
252    /// Generic bridge request handler that routes to specific services/methods
253    async fn handle_generic_bridge_request(
254        State(state): State<Arc<HttpBridge>>,
255        Path(path_params): Path<HashMap<String, String>>,
256        _query: Query<BridgeQuery>,
257        body: Bytes,
258    ) -> axum::response::Response {
259        // Extract service and method from path parameters
260        let service_name = match path_params.get("service") {
261            Some(name) => name,
262            None => {
263                let error_response = BridgeResponse::<Value> {
264                    success: false,
265                    data: None,
266                    error: Some("Missing 'service' parameter in path".to_string()),
267                    metadata: HashMap::new(),
268                };
269                return (axum::http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
270            }
271        };
272
273        let method_name = match path_params.get("method") {
274            Some(name) => name,
275            None => {
276                let error_response = BridgeResponse::<Value> {
277                    success: false,
278                    data: None,
279                    error: Some("Missing 'method' parameter in path".to_string()),
280                    metadata: HashMap::new(),
281                };
282                return (axum::http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
283            }
284        };
285
286        // Get method information from the registry
287        let registry = state.proxy.service_registry();
288        let service_opt = registry.get(service_name);
289        let method_info = if let Some(service) = service_opt {
290            service.service().methods.iter().find(|m| m.name == *method_name)
291        } else {
292            let error_response = BridgeResponse::<Value> {
293                success: false,
294                data: None,
295                error: Some(format!("Service '{}' not found", service_name)),
296                metadata: HashMap::new(),
297            };
298            return (axum::http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
299        };
300
301        let method_info = match method_info {
302            Some(method) => method,
303            None => {
304                let error_response = BridgeResponse::<Value> {
305                    success: false,
306                    data: None,
307                    error: Some(format!(
308                        "Method '{}' not found in service '{}'",
309                        method_name, service_name
310                    )),
311                    metadata: HashMap::new(),
312                };
313                return (axum::http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
314            }
315        };
316
317        // Update stats
318        {
319            let mut stats = state.stats.lock().unwrap();
320            stats.requests_served += 1;
321        }
322
323        // Handle the request
324        let params = BridgeRequestParams {
325            proxy: &state.proxy,
326            converter: &state.converter,
327            service_name: service_name.as_str(),
328            method_name: method_name.as_str(),
329            server_streaming: method_info.server_streaming,
330            body,
331        };
332        let result = Self::handle_bridge_request(&params).await;
333
334        match result {
335            Ok(response) => {
336                // Update successful stats
337                {
338                    let mut stats = state.stats.lock().unwrap();
339                    stats.requests_successful += 1;
340                }
341                (axum::http::StatusCode::OK, Json(response)).into_response()
342            }
343            Err(err) => {
344                // Update failed stats
345                {
346                    let mut stats = state.stats.lock().unwrap();
347                    stats.requests_failed += 1;
348                }
349                warn!("Bridge request failed for {}.{}: {}", service_name, method_name, err);
350                let error_response = BridgeResponse::<Value> {
351                    success: false,
352                    data: None,
353                    error: Some(err.to_string()),
354                    metadata: HashMap::new(),
355                };
356                (axum::http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
357                    .into_response()
358            }
359        }
360    }
361
362    /// Create a handler function for a specific gRPC method
363    ///
364    /// TODO: Use when dynamic HTTP bridge handler creation is fully implemented
365    #[allow(dead_code)] // TODO: Remove when HTTP bridge handler factory is complete
366    fn create_bridge_handler(
367        &self,
368        service_name: String,
369        method_name: String,
370        _client_streaming: bool,
371        server_streaming: bool,
372    ) -> Box<BridgeHandlerFn> {
373        Box::new(
374            move |state: State<Arc<Self>>,
375                  _path: Path<HashMap<String, String>>,
376                  _query: Query<BridgeQuery>,
377                  body: Bytes| {
378                let service_name = service_name.clone();
379                let method_name = method_name.clone();
380                let stats = state.stats.clone();
381                let proxy = state.proxy.clone();
382                let converter = state.converter.clone();
383
384                Box::pin(async move {
385                    // Update stats
386                    {
387                        let mut stats = stats.lock().unwrap();
388                        stats.requests_served += 1;
389                    }
390
391                    // Handle the request
392                    let params = BridgeRequestParams {
393                        proxy: &proxy,
394                        converter: &converter,
395                        service_name: service_name.as_str(),
396                        method_name: method_name.as_str(),
397                        server_streaming,
398                        body,
399                    };
400                    let result = Self::handle_bridge_request(&params).await;
401
402                    match result {
403                        Ok(response) => {
404                            // Update successful stats
405                            {
406                                let mut stats = stats.lock().unwrap();
407                                stats.requests_successful += 1;
408                            }
409                            (axum::http::StatusCode::OK, Json(response)).into_response()
410                        }
411                        Err(err) => {
412                            // Update failed stats
413                            {
414                                let mut stats = stats.lock().unwrap();
415                                stats.requests_failed += 1;
416                            }
417                            warn!(
418                                "Bridge request failed for {}.{}: {}",
419                                service_name, method_name, err
420                            );
421                            let error_response = BridgeResponse::<Value> {
422                                success: false,
423                                data: None,
424                                error: Some(err.to_string()),
425                                metadata: HashMap::new(),
426                            };
427                            (axum::http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
428                                .into_response()
429                        }
430                    }
431                })
432            },
433        )
434    }
435
436    /// Get bridge statistics (static method for handler)
437    ///
438    /// TODO: Integrate into HTTP bridge admin endpoints when stats API is implemented
439    #[allow(dead_code)] // TODO: Remove when bridge stats endpoint is complete
440    async fn get_stats_static(bridge: &Arc<HttpBridge>) -> axum::response::Json<serde_json::Value> {
441        let stats = bridge.stats.lock().unwrap();
442        axum::response::Json(serde_json::json!({
443            "requests_served": stats.requests_served,
444            "requests_successful": stats.requests_successful,
445            "requests_failed": stats.requests_failed,
446            "available_services": stats.available_services
447        }))
448    }
449
450    /// List available services (static method for handler)
451    async fn list_services_static(
452        bridge: &Arc<HttpBridge>,
453    ) -> axum::response::Json<serde_json::Value> {
454        let services = bridge.proxy.service_names();
455        axum::response::Json(serde_json::json!({
456            "services": services
457        }))
458    }
459
460    /// Get OpenAPI spec (static method for handler)
461    async fn get_openapi_spec_static(
462        bridge: &Arc<HttpBridge>,
463    ) -> axum::response::Json<serde_json::Value> {
464        use crate::dynamic::proto_parser::ProtoService;
465        use std::collections::HashMap;
466
467        // Extract services from the service registry
468        let services: HashMap<String, ProtoService> = bridge
469            .proxy
470            .service_registry()
471            .services
472            .iter()
473            .map(|(name, dyn_service)| (name.clone(), dyn_service.service().clone()))
474            .collect();
475
476        // Generate OpenAPI spec using the route generator
477        let spec = bridge.route_generator.generate_openapi_spec(&services);
478        axum::response::Json(spec)
479    }
480
481    /// Handle a bridge request by calling the appropriate gRPC method
482    async fn handle_bridge_request(
483        params: &BridgeRequestParams<'_>,
484    ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
485        debug!("Handling bridge request for {}.{}", params.service_name, params.method_name);
486
487        // Parse JSON request body
488        let json_request: Value = if params.body.is_empty() {
489            Value::Null
490        } else {
491            serde_json::from_slice(&params.body).map_err(|e| {
492                Box::<dyn std::error::Error + Send + Sync>::from(format!(
493                    "Failed to parse JSON request: {}",
494                    e
495                ))
496            })?
497        };
498
499        // Call appropriate gRPC method based on streaming type
500        if params.server_streaming {
501            // Handle streaming response
502            Self::handle_streaming_request(
503                params.proxy,
504                params.converter,
505                params.service_name,
506                params.method_name,
507                json_request,
508            )
509            .await
510        } else {
511            // Handle unary request
512            Self::handle_unary_request(
513                params.proxy,
514                params.converter,
515                params.service_name,
516                params.method_name,
517                json_request,
518            )
519            .await
520        }
521    }
522
523    /// Handle unary request (no streaming)
524    async fn handle_unary_request(
525        proxy: &MockReflectionProxy,
526        _converter: &ProtobufJsonConverter,
527        service_name: &str,
528        method_name: &str,
529        json_request: Value,
530    ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
531        // Get method descriptor from the service registry
532        let registry = proxy.service_registry();
533        let service_registry = registry.clone();
534
535        // Find the service and method
536        let service_opt = service_registry.get(service_name);
537        if service_opt.is_none() {
538            return Err(format!("Service '{}' not found", service_name).into());
539        }
540
541        let service = service_opt.unwrap();
542        let method_opt = service.service().methods.iter().find(|m| m.name == method_name);
543        if method_opt.is_none() {
544            return Err(format!(
545                "Method '{}' not found in service '{}'",
546                method_name, service_name
547            )
548            .into());
549        }
550
551        // For now, create a generic response since we don't have full descriptor integration
552        // In a complete implementation, this would:
553        // 1. Get input/output descriptor from proto parser
554        // 2. Convert JSON to protobuf message
555        // 3. Call the actual gRPC method via proxy
556        // 4. Convert protobuf response back to JSON
557
558        // Create a mock response for demonstration
559        let json_response = serde_json::json!({
560            "message": format!("Hello! This is a mock response from {}.{} bridge", service_name, method_name),
561            "request_data": json_request,
562            "timestamp": chrono::Utc::now().to_rfc3339()
563        });
564
565        Ok(BridgeResponse {
566            success: true,
567            data: Some(json_response),
568            error: None,
569            metadata: HashMap::new(),
570        })
571    }
572
573    /// Handle streaming request (returns SSE stream)
574    async fn handle_streaming_request(
575        _proxy: &MockReflectionProxy,
576        _converter: &ProtobufJsonConverter,
577        _service_name: &str,
578        _method_name: &str,
579        _json_request: Value,
580    ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
581        // For now, return an error indicating streaming is not yet implemented via HTTP
582        // Full streaming implementation would use Server-Sent Events
583        Err("Streaming responses via HTTP bridge are not yet implemented".into())
584    }
585}
586
587impl Clone for HttpBridge {
588    fn clone(&self) -> Self {
589        Self {
590            proxy: self.proxy.clone(),
591            route_generator: self.route_generator.clone(),
592            converter: self.converter.clone(),
593            config: self.config.clone(),
594            stats: self.stats.clone(),
595        }
596    }
597}
598
#[cfg(test)]
mod tests {

    // Placeholder smoke test: it has an empty body and asserts nothing, so it
    // only passes or fails with the build of this module in test configuration.
    #[test]
    fn test_module_compiles() {}
}