1pub mod converters;
7pub mod handlers;
8pub mod route_generator;
9
10use crate::reflection::MockReflectionProxy;
11use axum::{
12 body::Bytes,
13 extract::{Path, Query, State},
14 http::Method,
15 response::{IntoResponse, Json},
16 routing::{get, post},
17 Router,
18};
19use converters::ProtobufJsonConverter;
20use route_generator::RouteGenerator;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27use tower_http::cors::{Any, CorsLayer};
28use tracing::{debug, info, warn};
29
30type BridgeHandlerFn = dyn Fn(
32 State<Arc<HttpBridge>>,
33 Path<HashMap<String, String>>,
34 Query<BridgeQuery>,
35 Bytes,
36 ) -> Pin<Box<dyn Future<Output = axum::response::Response> + Send>>
37 + Send
38 + Sync;
39
40struct BridgeRequestParams<'a> {
42 proxy: &'a MockReflectionProxy,
43 converter: &'a ProtobufJsonConverter,
44 service_name: &'a str,
45 method_name: &'a str,
46 server_streaming: bool,
47 body: Bytes,
48}
49
50#[derive(Debug, Clone)]
52pub struct HttpBridgeConfig {
53 pub enabled: bool,
55 pub base_path: String,
57 pub enable_cors: bool,
59 pub max_request_size: usize,
61 pub timeout_seconds: u64,
63 pub route_pattern: String,
65}
66
67impl Default for HttpBridgeConfig {
68 fn default() -> Self {
69 Self {
70 enabled: true,
71 base_path: "/api".to_string(),
72 enable_cors: true,
73 max_request_size: 10 * 1024 * 1024, timeout_seconds: 30,
75 route_pattern: "/{service}/{method}".to_string(),
76 }
77 }
78}
79
80#[derive(Debug, Deserialize)]
82pub struct BridgeQuery {
83 #[serde(default)]
85 pub stream: Option<String>,
86 #[serde(flatten)]
88 pub metadata: HashMap<String, String>,
89}
90
91#[derive(Debug, Serialize, Deserialize)]
93pub struct BridgeResponse<T> {
94 pub success: bool,
96 pub data: Option<T>,
98 pub error: Option<String>,
100 pub metadata: HashMap<String, String>,
102}
103
104#[derive(Debug, Serialize, Clone)]
106pub struct BridgeStats {
107 pub requests_served: u64,
109 pub requests_successful: u64,
111 pub requests_failed: u64,
113 pub available_services: Vec<String>,
115}
116
117pub struct HttpBridge {
119 proxy: Arc<MockReflectionProxy>,
121 route_generator: RouteGenerator,
123 converter: ProtobufJsonConverter,
125 config: HttpBridgeConfig,
127 stats: Arc<std::sync::Mutex<BridgeStats>>,
129}
130
131impl HttpBridge {
132 pub fn new(
134 proxy: Arc<MockReflectionProxy>,
135 config: HttpBridgeConfig,
136 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
137 let route_generator = RouteGenerator::new(config.clone());
138 let converter =
139 ProtobufJsonConverter::new(proxy.service_registry.descriptor_pool().clone());
140 let available_services = proxy.service_names();
141
142 let stats = BridgeStats {
143 requests_served: 0,
144 requests_successful: 0,
145 requests_failed: 0,
146 available_services,
147 };
148
149 Ok(Self {
150 proxy,
151 route_generator,
152 converter,
153 config,
154 stats: Arc::new(std::sync::Mutex::new(stats)),
155 })
156 }
157
158 pub fn create_router(&self) -> Router<Arc<HttpBridge>> {
160 let mut router = Router::new();
161
162 if self.config.enable_cors {
164 router = router.layer(
165 CorsLayer::new()
166 .allow_methods([
167 Method::GET,
168 Method::POST,
169 Method::PUT,
170 Method::DELETE,
171 Method::PATCH,
172 ])
173 .allow_headers(Any)
174 .allow_origin(Any),
175 );
176 }
177
178 let bridge = Arc::new(self.clone());
180 router = router.with_state(bridge);
181
182 router =
184 router.route(&format!("{}/health", self.config.base_path), get(Self::health_check));
185
186 router = router.route(&format!("{}/stats", self.config.base_path), get(Self::get_stats));
188
189 router =
191 router.route(&format!("{}/services", self.config.base_path), get(Self::list_services));
192
193 router =
195 router.route(&format!("{}/docs", self.config.base_path), get(Self::get_openapi_spec));
196
197 let registry = self.proxy.service_registry();
199
200 router =
203 router.route(&self.config.route_pattern, post(Self::handle_generic_bridge_request));
204 router = router.route(&self.config.route_pattern, get(Self::handle_generic_bridge_request));
205
206 let available_services = registry.service_names();
207 let total_methods =
208 registry.services.values().map(|s| s.service().methods.len()).sum::<usize>();
209 info!(
210 "Created HTTP bridge router with {} services and {} dynamic endpoints",
211 available_services.len(),
212 total_methods
213 );
214
215 router
216 }
217
218 async fn health_check(
220 State(_bridge): State<Arc<HttpBridge>>,
221 ) -> axum::response::Json<serde_json::Value> {
222 axum::response::Json(serde_json::json!({"status": "ok", "bridge": "healthy"}))
223 }
224
225 async fn get_stats(
227 State(bridge): State<Arc<HttpBridge>>,
228 ) -> axum::response::Json<serde_json::Value> {
229 let stats = bridge.stats.lock().unwrap_or_else(|poisoned| {
231 warn!("Statistics mutex is poisoned, using default values");
232 poisoned.into_inner()
233 });
234 axum::response::Json(serde_json::json!({
235 "requests_served": stats.requests_served,
236 "requests_successful": stats.requests_successful,
237 "requests_failed": stats.requests_failed,
238 "available_services": stats.available_services
239 }))
240 }
241
242 async fn list_services(
244 State(bridge): State<Arc<HttpBridge>>,
245 ) -> axum::response::Json<serde_json::Value> {
246 Self::list_services_static(&bridge).await
247 }
248
249 async fn get_openapi_spec(
251 State(bridge): State<Arc<HttpBridge>>,
252 ) -> axum::response::Json<serde_json::Value> {
253 Self::get_openapi_spec_static(&bridge).await
254 }
255
256 async fn handle_generic_bridge_request(
258 State(state): State<Arc<HttpBridge>>,
259 Path(path_params): Path<HashMap<String, String>>,
260 _query: Query<BridgeQuery>,
261 body: Bytes,
262 ) -> axum::response::Response {
263 let service_name = match path_params.get("service") {
265 Some(name) => name,
266 None => {
267 let error_response = BridgeResponse::<Value> {
268 success: false,
269 data: None,
270 error: Some("Missing 'service' parameter in path".to_string()),
271 metadata: HashMap::new(),
272 };
273 return (axum::http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
274 }
275 };
276
277 let method_name = match path_params.get("method") {
278 Some(name) => name,
279 None => {
280 let error_response = BridgeResponse::<Value> {
281 success: false,
282 data: None,
283 error: Some("Missing 'method' parameter in path".to_string()),
284 metadata: HashMap::new(),
285 };
286 return (axum::http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
287 }
288 };
289
290 let registry = state.proxy.service_registry();
292 let service_opt = registry.get(service_name);
293 let method_info = if let Some(service) = service_opt {
294 service.service().methods.iter().find(|m| m.name == *method_name)
295 } else {
296 let error_response = BridgeResponse::<Value> {
297 success: false,
298 data: None,
299 error: Some(format!("Service '{}' not found", service_name)),
300 metadata: HashMap::new(),
301 };
302 return (axum::http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
303 };
304
305 let method_info = match method_info {
306 Some(method) => method,
307 None => {
308 let error_response = BridgeResponse::<Value> {
309 success: false,
310 data: None,
311 error: Some(format!(
312 "Method '{}' not found in service '{}'",
313 method_name, service_name
314 )),
315 metadata: HashMap::new(),
316 };
317 return (axum::http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
318 }
319 };
320
321 {
323 if let Ok(mut stats) = state.stats.lock() {
324 stats.requests_served += 1;
325 } else {
326 warn!("Failed to update request stats (mutex poisoned)");
327 }
328 }
329
330 let params = BridgeRequestParams {
332 proxy: &state.proxy,
333 converter: &state.converter,
334 service_name: service_name.as_str(),
335 method_name: method_name.as_str(),
336 server_streaming: method_info.server_streaming,
337 body,
338 };
339 let result = Self::handle_bridge_request(¶ms).await;
340
341 match result {
342 Ok(response) => {
343 {
345 if let Ok(mut stats) = state.stats.lock() {
346 stats.requests_successful += 1;
347 } else {
348 warn!("Failed to update success stats (mutex poisoned)");
349 }
350 }
351 (axum::http::StatusCode::OK, Json(response)).into_response()
352 }
353 Err(err) => {
354 {
356 if let Ok(mut stats) = state.stats.lock() {
357 stats.requests_failed += 1;
358 } else {
359 warn!("Failed to update failure stats (mutex poisoned)");
360 }
361 }
362 warn!("Bridge request failed for {}.{}: {}", service_name, method_name, err);
363 let error_response = BridgeResponse::<Value> {
364 success: false,
365 data: None,
366 error: Some(err.to_string()),
367 metadata: HashMap::new(),
368 };
369 (axum::http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
370 .into_response()
371 }
372 }
373 }
374
375 #[allow(dead_code)] fn create_bridge_handler(
380 &self,
381 service_name: String,
382 method_name: String,
383 _client_streaming: bool,
384 server_streaming: bool,
385 ) -> Box<BridgeHandlerFn> {
386 Box::new(
387 move |state: State<Arc<Self>>,
388 _path: Path<HashMap<String, String>>,
389 _query: Query<BridgeQuery>,
390 body: Bytes| {
391 let service_name = service_name.clone();
392 let method_name = method_name.clone();
393 let stats = state.stats.clone();
394 let proxy = state.proxy.clone();
395 let converter = state.converter.clone();
396
397 Box::pin(async move {
398 {
400 if let Ok(mut stats) = stats.lock() {
401 stats.requests_served += 1;
402 } else {
403 warn!("Failed to update request stats (mutex poisoned)");
404 }
405 }
406
407 let params = BridgeRequestParams {
409 proxy: &proxy,
410 converter: &converter,
411 service_name: service_name.as_str(),
412 method_name: method_name.as_str(),
413 server_streaming,
414 body,
415 };
416 let result = Self::handle_bridge_request(¶ms).await;
417
418 match result {
419 Ok(response) => {
420 {
422 if let Ok(mut stats) = stats.lock() {
423 stats.requests_successful += 1;
424 } else {
425 warn!("Failed to update success stats (mutex poisoned)");
426 }
427 }
428 (axum::http::StatusCode::OK, Json(response)).into_response()
429 }
430 Err(err) => {
431 {
433 if let Ok(mut stats) = stats.lock() {
434 stats.requests_failed += 1;
435 } else {
436 warn!("Failed to update failure stats (mutex poisoned)");
437 }
438 }
439 warn!(
440 "Bridge request failed for {}.{}: {}",
441 service_name, method_name, err
442 );
443 let error_response = BridgeResponse::<Value> {
444 success: false,
445 data: None,
446 error: Some(err.to_string()),
447 metadata: HashMap::new(),
448 };
449 (axum::http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
450 .into_response()
451 }
452 }
453 })
454 },
455 )
456 }
457
458 #[allow(dead_code)] async fn get_stats_static(bridge: &Arc<HttpBridge>) -> axum::response::Json<serde_json::Value> {
463 let stats = bridge.stats.lock().unwrap_or_else(|poisoned| {
465 warn!("Statistics mutex is poisoned, using default values");
466 poisoned.into_inner()
467 });
468 axum::response::Json(serde_json::json!({
469 "requests_served": stats.requests_served,
470 "requests_successful": stats.requests_successful,
471 "requests_failed": stats.requests_failed,
472 "available_services": stats.available_services
473 }))
474 }
475
476 async fn list_services_static(
478 bridge: &Arc<HttpBridge>,
479 ) -> axum::response::Json<serde_json::Value> {
480 let services = bridge.proxy.service_names();
481 axum::response::Json(serde_json::json!({
482 "services": services
483 }))
484 }
485
486 async fn get_openapi_spec_static(
488 bridge: &Arc<HttpBridge>,
489 ) -> axum::response::Json<serde_json::Value> {
490 use crate::dynamic::proto_parser::ProtoService;
491 use std::collections::HashMap;
492
493 let services: HashMap<String, ProtoService> = bridge
495 .proxy
496 .service_registry()
497 .services
498 .iter()
499 .map(|(name, dyn_service)| (name.clone(), dyn_service.service().clone()))
500 .collect();
501
502 let spec = bridge.route_generator.generate_openapi_spec(&services);
504 axum::response::Json(spec)
505 }
506
507 async fn handle_bridge_request(
509 params: &BridgeRequestParams<'_>,
510 ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
511 debug!("Handling bridge request for {}.{}", params.service_name, params.method_name);
512
513 let json_request: Value = if params.body.is_empty() {
515 Value::Null
516 } else {
517 serde_json::from_slice(¶ms.body).map_err(|e| {
518 Box::<dyn std::error::Error + Send + Sync>::from(format!(
519 "Failed to parse JSON request: {}",
520 e
521 ))
522 })?
523 };
524
525 if params.server_streaming {
527 Self::handle_streaming_request(
529 params.proxy,
530 params.converter,
531 params.service_name,
532 params.method_name,
533 json_request,
534 )
535 .await
536 } else {
537 Self::handle_unary_request(
539 params.proxy,
540 params.converter,
541 params.service_name,
542 params.method_name,
543 json_request,
544 )
545 .await
546 }
547 }
548
549 async fn handle_unary_request(
551 proxy: &MockReflectionProxy,
552 _converter: &ProtobufJsonConverter,
553 service_name: &str,
554 method_name: &str,
555 json_request: Value,
556 ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
557 let registry = proxy.service_registry();
559 let service_registry = registry.clone();
560
561 let service = match service_registry.get(service_name) {
563 Some(s) => s,
564 None => {
565 return Err(format!("Service '{}' not found", service_name).into());
566 }
567 };
568
569 let method = match service.service().methods.iter().find(|m| m.name == method_name) {
570 Some(m) => m,
571 None => {
572 return Err(format!(
573 "Method '{}' not found in service '{}'",
574 method_name, service_name
575 )
576 .into());
577 }
578 };
579
580 let _method = method;
582
583 let json_response = serde_json::json!({
592 "message": format!("Hello! This is a mock response from {}.{} bridge", service_name, method_name),
593 "request_data": json_request,
594 "timestamp": chrono::Utc::now().to_rfc3339()
595 });
596
597 Ok(BridgeResponse {
598 success: true,
599 data: Some(json_response),
600 error: None,
601 metadata: HashMap::new(),
602 })
603 }
604
605 async fn handle_streaming_request(
607 _proxy: &MockReflectionProxy,
608 _converter: &ProtobufJsonConverter,
609 _service_name: &str,
610 _method_name: &str,
611 _json_request: Value,
612 ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
613 Err("Streaming responses via HTTP bridge are not yet implemented".into())
616 }
617}
618
619impl Clone for HttpBridge {
620 fn clone(&self) -> Self {
621 Self {
622 proxy: self.proxy.clone(),
623 route_generator: self.route_generator.clone(),
624 converter: self.converter.clone(),
625 config: self.config.clone(),
626 stats: self.stats.clone(),
627 }
628 }
629}
630
631#[cfg(test)]
632mod tests {
633
634 #[test]
635 fn test_module_compiles() {}
636}