1pub mod converters;
7pub mod handlers;
8pub mod route_generator;
9
10use crate::reflection::MockReflectionProxy;
11use axum::{
12 body::Bytes,
13 extract::{Path, Query, State},
14 http::Method,
15 response::{IntoResponse, Json},
16 routing::{get, post},
17 Router,
18};
19use converters::ProtobufJsonConverter;
20use route_generator::RouteGenerator;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27use tower_http::cors::{Any, CorsLayer};
28use tracing::{debug, info, warn};
29
30type BridgeHandlerFn = dyn Fn(
32 State<Arc<HttpBridge>>,
33 Path<HashMap<String, String>>,
34 Query<BridgeQuery>,
35 Bytes,
36 ) -> Pin<Box<dyn Future<Output = axum::response::Response> + Send>>
37 + Send
38 + Sync;
39
40struct BridgeRequestParams<'a> {
42 proxy: &'a MockReflectionProxy,
43 converter: &'a ProtobufJsonConverter,
44 service_name: &'a str,
45 method_name: &'a str,
46 server_streaming: bool,
47 body: Bytes,
48}
49
50#[derive(Debug, Clone)]
52pub struct HttpBridgeConfig {
53 pub enabled: bool,
55 pub base_path: String,
57 pub enable_cors: bool,
59 pub max_request_size: usize,
61 pub timeout_seconds: u64,
63 pub route_pattern: String,
65}
66
67impl Default for HttpBridgeConfig {
68 fn default() -> Self {
69 Self {
70 enabled: true,
71 base_path: "/api".to_string(),
72 enable_cors: true,
73 max_request_size: 10 * 1024 * 1024, timeout_seconds: 30,
75 route_pattern: "/{service}/{method}".to_string(),
76 }
77 }
78}
79
80#[derive(Debug, Deserialize)]
82pub struct BridgeQuery {
83 #[serde(default)]
85 pub stream: Option<String>,
86 #[serde(flatten)]
88 pub metadata: HashMap<String, String>,
89}
90
91#[derive(Debug, Serialize, Deserialize)]
93pub struct BridgeResponse<T> {
94 pub success: bool,
96 pub data: Option<T>,
98 pub error: Option<String>,
100 pub metadata: HashMap<String, String>,
102}
103
104#[derive(Debug, Serialize, Clone)]
106pub struct BridgeStats {
107 pub requests_served: u64,
109 pub requests_successful: u64,
111 pub requests_failed: u64,
113 pub available_services: Vec<String>,
115}
116
117pub struct HttpBridge {
119 proxy: Arc<MockReflectionProxy>,
121 route_generator: RouteGenerator,
123 converter: ProtobufJsonConverter,
125 config: HttpBridgeConfig,
127 stats: Arc<std::sync::Mutex<BridgeStats>>,
129}
130
131impl HttpBridge {
132 pub fn new(
134 proxy: Arc<MockReflectionProxy>,
135 config: HttpBridgeConfig,
136 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
137 let route_generator = RouteGenerator::new(config.clone());
138 let converter =
139 ProtobufJsonConverter::new(proxy.service_registry.descriptor_pool().clone());
140 let available_services = proxy.service_names();
141
142 let stats = BridgeStats {
143 requests_served: 0,
144 requests_successful: 0,
145 requests_failed: 0,
146 available_services,
147 };
148
149 Ok(Self {
150 proxy,
151 route_generator,
152 converter,
153 config,
154 stats: Arc::new(std::sync::Mutex::new(stats)),
155 })
156 }
157
158 pub fn create_router(&self) -> Router<Arc<HttpBridge>> {
160 let mut router = Router::new();
161
162 if self.config.enable_cors {
164 router = router.layer(
165 CorsLayer::new()
166 .allow_methods([
167 Method::GET,
168 Method::POST,
169 Method::PUT,
170 Method::DELETE,
171 Method::PATCH,
172 ])
173 .allow_headers(Any)
174 .allow_origin(Any),
175 );
176 }
177
178 let bridge = Arc::new(self.clone());
180 router = router.with_state(bridge);
181
182 router =
184 router.route(&format!("{}/health", self.config.base_path), get(Self::health_check));
185
186 router = router.route(&format!("{}/stats", self.config.base_path), get(Self::get_stats));
188
189 router =
191 router.route(&format!("{}/services", self.config.base_path), get(Self::list_services));
192
193 router =
195 router.route(&format!("{}/docs", self.config.base_path), get(Self::get_openapi_spec));
196
197 let registry = self.proxy.service_registry();
199
200 router =
203 router.route(&self.config.route_pattern, post(Self::handle_generic_bridge_request));
204 router = router.route(&self.config.route_pattern, get(Self::handle_generic_bridge_request));
205
206 let available_services = registry.service_names();
207 let total_methods =
208 registry.services.values().map(|s| s.service().methods.len()).sum::<usize>();
209 info!(
210 "Created HTTP bridge router with {} services and {} dynamic endpoints",
211 available_services.len(),
212 total_methods
213 );
214
215 router
216 }
217
218 async fn health_check(
220 State(_bridge): State<Arc<HttpBridge>>,
221 ) -> axum::response::Json<serde_json::Value> {
222 axum::response::Json(serde_json::json!({"status": "ok", "bridge": "healthy"}))
223 }
224
225 async fn get_stats(
227 State(bridge): State<Arc<HttpBridge>>,
228 ) -> axum::response::Json<serde_json::Value> {
229 let stats = bridge.stats.lock().unwrap();
230 axum::response::Json(serde_json::json!({
231 "requests_served": stats.requests_served,
232 "requests_successful": stats.requests_successful,
233 "requests_failed": stats.requests_failed,
234 "available_services": stats.available_services
235 }))
236 }
237
238 async fn list_services(
240 State(bridge): State<Arc<HttpBridge>>,
241 ) -> axum::response::Json<serde_json::Value> {
242 Self::list_services_static(&bridge).await
243 }
244
245 async fn get_openapi_spec(
247 State(bridge): State<Arc<HttpBridge>>,
248 ) -> axum::response::Json<serde_json::Value> {
249 Self::get_openapi_spec_static(&bridge).await
250 }
251
252 async fn handle_generic_bridge_request(
254 State(state): State<Arc<HttpBridge>>,
255 Path(path_params): Path<HashMap<String, String>>,
256 _query: Query<BridgeQuery>,
257 body: Bytes,
258 ) -> axum::response::Response {
259 let service_name = match path_params.get("service") {
261 Some(name) => name,
262 None => {
263 let error_response = BridgeResponse::<Value> {
264 success: false,
265 data: None,
266 error: Some("Missing 'service' parameter in path".to_string()),
267 metadata: HashMap::new(),
268 };
269 return (axum::http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
270 }
271 };
272
273 let method_name = match path_params.get("method") {
274 Some(name) => name,
275 None => {
276 let error_response = BridgeResponse::<Value> {
277 success: false,
278 data: None,
279 error: Some("Missing 'method' parameter in path".to_string()),
280 metadata: HashMap::new(),
281 };
282 return (axum::http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
283 }
284 };
285
286 let registry = state.proxy.service_registry();
288 let service_opt = registry.get(service_name);
289 let method_info = if let Some(service) = service_opt {
290 service.service().methods.iter().find(|m| m.name == *method_name)
291 } else {
292 let error_response = BridgeResponse::<Value> {
293 success: false,
294 data: None,
295 error: Some(format!("Service '{}' not found", service_name)),
296 metadata: HashMap::new(),
297 };
298 return (axum::http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
299 };
300
301 let method_info = match method_info {
302 Some(method) => method,
303 None => {
304 let error_response = BridgeResponse::<Value> {
305 success: false,
306 data: None,
307 error: Some(format!(
308 "Method '{}' not found in service '{}'",
309 method_name, service_name
310 )),
311 metadata: HashMap::new(),
312 };
313 return (axum::http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
314 }
315 };
316
317 {
319 let mut stats = state.stats.lock().unwrap();
320 stats.requests_served += 1;
321 }
322
323 let params = BridgeRequestParams {
325 proxy: &state.proxy,
326 converter: &state.converter,
327 service_name: service_name.as_str(),
328 method_name: method_name.as_str(),
329 server_streaming: method_info.server_streaming,
330 body,
331 };
332 let result = Self::handle_bridge_request(¶ms).await;
333
334 match result {
335 Ok(response) => {
336 {
338 let mut stats = state.stats.lock().unwrap();
339 stats.requests_successful += 1;
340 }
341 (axum::http::StatusCode::OK, Json(response)).into_response()
342 }
343 Err(err) => {
344 {
346 let mut stats = state.stats.lock().unwrap();
347 stats.requests_failed += 1;
348 }
349 warn!("Bridge request failed for {}.{}: {}", service_name, method_name, err);
350 let error_response = BridgeResponse::<Value> {
351 success: false,
352 data: None,
353 error: Some(err.to_string()),
354 metadata: HashMap::new(),
355 };
356 (axum::http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
357 .into_response()
358 }
359 }
360 }
361
362 #[allow(dead_code)] fn create_bridge_handler(
367 &self,
368 service_name: String,
369 method_name: String,
370 _client_streaming: bool,
371 server_streaming: bool,
372 ) -> Box<BridgeHandlerFn> {
373 Box::new(
374 move |state: State<Arc<Self>>,
375 _path: Path<HashMap<String, String>>,
376 _query: Query<BridgeQuery>,
377 body: Bytes| {
378 let service_name = service_name.clone();
379 let method_name = method_name.clone();
380 let stats = state.stats.clone();
381 let proxy = state.proxy.clone();
382 let converter = state.converter.clone();
383
384 Box::pin(async move {
385 {
387 let mut stats = stats.lock().unwrap();
388 stats.requests_served += 1;
389 }
390
391 let params = BridgeRequestParams {
393 proxy: &proxy,
394 converter: &converter,
395 service_name: service_name.as_str(),
396 method_name: method_name.as_str(),
397 server_streaming,
398 body,
399 };
400 let result = Self::handle_bridge_request(¶ms).await;
401
402 match result {
403 Ok(response) => {
404 {
406 let mut stats = stats.lock().unwrap();
407 stats.requests_successful += 1;
408 }
409 (axum::http::StatusCode::OK, Json(response)).into_response()
410 }
411 Err(err) => {
412 {
414 let mut stats = stats.lock().unwrap();
415 stats.requests_failed += 1;
416 }
417 warn!(
418 "Bridge request failed for {}.{}: {}",
419 service_name, method_name, err
420 );
421 let error_response = BridgeResponse::<Value> {
422 success: false,
423 data: None,
424 error: Some(err.to_string()),
425 metadata: HashMap::new(),
426 };
427 (axum::http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
428 .into_response()
429 }
430 }
431 })
432 },
433 )
434 }
435
436 #[allow(dead_code)] async fn get_stats_static(bridge: &Arc<HttpBridge>) -> axum::response::Json<serde_json::Value> {
441 let stats = bridge.stats.lock().unwrap();
442 axum::response::Json(serde_json::json!({
443 "requests_served": stats.requests_served,
444 "requests_successful": stats.requests_successful,
445 "requests_failed": stats.requests_failed,
446 "available_services": stats.available_services
447 }))
448 }
449
450 async fn list_services_static(
452 bridge: &Arc<HttpBridge>,
453 ) -> axum::response::Json<serde_json::Value> {
454 let services = bridge.proxy.service_names();
455 axum::response::Json(serde_json::json!({
456 "services": services
457 }))
458 }
459
460 async fn get_openapi_spec_static(
462 bridge: &Arc<HttpBridge>,
463 ) -> axum::response::Json<serde_json::Value> {
464 use crate::dynamic::proto_parser::ProtoService;
465 use std::collections::HashMap;
466
467 let services: HashMap<String, ProtoService> = bridge
469 .proxy
470 .service_registry()
471 .services
472 .iter()
473 .map(|(name, dyn_service)| (name.clone(), dyn_service.service().clone()))
474 .collect();
475
476 let spec = bridge.route_generator.generate_openapi_spec(&services);
478 axum::response::Json(spec)
479 }
480
481 async fn handle_bridge_request(
483 params: &BridgeRequestParams<'_>,
484 ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
485 debug!("Handling bridge request for {}.{}", params.service_name, params.method_name);
486
487 let json_request: Value = if params.body.is_empty() {
489 Value::Null
490 } else {
491 serde_json::from_slice(¶ms.body).map_err(|e| {
492 Box::<dyn std::error::Error + Send + Sync>::from(format!(
493 "Failed to parse JSON request: {}",
494 e
495 ))
496 })?
497 };
498
499 if params.server_streaming {
501 Self::handle_streaming_request(
503 params.proxy,
504 params.converter,
505 params.service_name,
506 params.method_name,
507 json_request,
508 )
509 .await
510 } else {
511 Self::handle_unary_request(
513 params.proxy,
514 params.converter,
515 params.service_name,
516 params.method_name,
517 json_request,
518 )
519 .await
520 }
521 }
522
523 async fn handle_unary_request(
525 proxy: &MockReflectionProxy,
526 _converter: &ProtobufJsonConverter,
527 service_name: &str,
528 method_name: &str,
529 json_request: Value,
530 ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
531 let registry = proxy.service_registry();
533 let service_registry = registry.clone();
534
535 let service_opt = service_registry.get(service_name);
537 if service_opt.is_none() {
538 return Err(format!("Service '{}' not found", service_name).into());
539 }
540
541 let service = service_opt.unwrap();
542 let method_opt = service.service().methods.iter().find(|m| m.name == method_name);
543 if method_opt.is_none() {
544 return Err(format!(
545 "Method '{}' not found in service '{}'",
546 method_name, service_name
547 )
548 .into());
549 }
550
551 let json_response = serde_json::json!({
560 "message": format!("Hello! This is a mock response from {}.{} bridge", service_name, method_name),
561 "request_data": json_request,
562 "timestamp": chrono::Utc::now().to_rfc3339()
563 });
564
565 Ok(BridgeResponse {
566 success: true,
567 data: Some(json_response),
568 error: None,
569 metadata: HashMap::new(),
570 })
571 }
572
573 async fn handle_streaming_request(
575 _proxy: &MockReflectionProxy,
576 _converter: &ProtobufJsonConverter,
577 _service_name: &str,
578 _method_name: &str,
579 _json_request: Value,
580 ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
581 Err("Streaming responses via HTTP bridge are not yet implemented".into())
584 }
585}
586
587impl Clone for HttpBridge {
588 fn clone(&self) -> Self {
589 Self {
590 proxy: self.proxy.clone(),
591 route_generator: self.route_generator.clone(),
592 converter: self.converter.clone(),
593 config: self.config.clone(),
594 stats: self.stats.clone(),
595 }
596 }
597}
598
599#[cfg(test)]
600mod tests {
601
602 #[test]
603 fn test_module_compiles() {}
604}