1pub mod converters;
7pub mod handlers;
8pub mod route_generator;
9
10use crate::reflection::MockReflectionProxy;
11use axum::{
12 body::Bytes,
13 extract::{Path, Query, State},
14 http::Method,
15 response::{IntoResponse, Json},
16 routing::{get, post},
17 Router,
18};
19use converters::ProtobufJsonConverter;
20use route_generator::RouteGenerator;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27use tower_http::cors::{Any, CorsLayer};
28use tracing::{debug, info, warn};
29
30type BridgeHandlerFn = dyn Fn(
32 State<Arc<HttpBridge>>,
33 Path<HashMap<String, String>>,
34 Query<BridgeQuery>,
35 Bytes,
36 ) -> Pin<Box<dyn Future<Output = axum::response::Response> + Send>>
37 + Send
38 + Sync;
39
40struct BridgeRequestParams<'a> {
42 proxy: &'a MockReflectionProxy,
43 converter: &'a ProtobufJsonConverter,
44 service_name: &'a str,
45 method_name: &'a str,
46 server_streaming: bool,
47 body: Bytes,
48}
49
50#[derive(Debug, Clone)]
52pub struct HttpBridgeConfig {
53 pub enabled: bool,
55 pub base_path: String,
57 pub enable_cors: bool,
59 pub max_request_size: usize,
61 pub timeout_seconds: u64,
63 pub route_pattern: String,
65}
66
67impl Default for HttpBridgeConfig {
68 fn default() -> Self {
69 Self {
70 enabled: true,
71 base_path: "/api".to_string(),
72 enable_cors: true,
73 max_request_size: 10 * 1024 * 1024, timeout_seconds: 30,
75 route_pattern: "/{service}/{method}".to_string(),
76 }
77 }
78}
79
80#[derive(Debug, Deserialize)]
82pub struct BridgeQuery {
83 #[serde(default)]
85 pub stream: Option<String>,
86 #[serde(flatten)]
88 pub metadata: HashMap<String, String>,
89}
90
91#[derive(Debug, Serialize, Deserialize)]
93pub struct BridgeResponse<T> {
94 pub success: bool,
96 pub data: Option<T>,
98 pub error: Option<String>,
100 pub metadata: HashMap<String, String>,
102}
103
104#[derive(Debug, Serialize, Clone)]
106pub struct BridgeStats {
107 pub requests_served: u64,
109 pub requests_successful: u64,
111 pub requests_failed: u64,
113 pub available_services: Vec<String>,
115}
116
117pub struct HttpBridge {
119 proxy: Arc<MockReflectionProxy>,
121 route_generator: RouteGenerator,
123 converter: ProtobufJsonConverter,
125 config: HttpBridgeConfig,
127 stats: Arc<std::sync::Mutex<BridgeStats>>,
129}
130
131impl HttpBridge {
132 pub fn new(
134 proxy: Arc<MockReflectionProxy>,
135 config: HttpBridgeConfig,
136 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
137 let route_generator = RouteGenerator::new(config.clone());
138 let converter =
139 ProtobufJsonConverter::new(proxy.service_registry.descriptor_pool().clone());
140 let available_services = proxy.service_names();
141
142 let stats = BridgeStats {
143 requests_served: 0,
144 requests_successful: 0,
145 requests_failed: 0,
146 available_services,
147 };
148
149 Ok(Self {
150 proxy,
151 route_generator,
152 converter,
153 config,
154 stats: Arc::new(std::sync::Mutex::new(stats)),
155 })
156 }
157
158 pub fn create_router(&self) -> Router<Arc<HttpBridge>> {
160 let mut router = Router::new();
161
162 if self.config.enable_cors {
164 router = router.layer(
165 CorsLayer::new()
166 .allow_methods([
167 Method::GET,
168 Method::POST,
169 Method::PUT,
170 Method::DELETE,
171 Method::PATCH,
172 ])
173 .allow_headers(Any)
174 .allow_origin(Any),
175 );
176 }
177
178 let bridge = Arc::new(self.clone());
180 router = router.with_state(bridge);
181
182 router =
184 router.route(&format!("{}/health", self.config.base_path), get(Self::health_check));
185
186 router = router.route(&format!("{}/stats", self.config.base_path), get(Self::get_stats));
188
189 router =
191 router.route(&format!("{}/services", self.config.base_path), get(Self::list_services));
192
193 router =
195 router.route(&format!("{}/docs", self.config.base_path), get(Self::get_openapi_spec));
196
197 let registry = self.proxy.service_registry();
199
200 router =
203 router.route(&self.config.route_pattern, post(Self::handle_generic_bridge_request));
204 router = router.route(&self.config.route_pattern, get(Self::handle_generic_bridge_request));
205
206 let available_services = registry.service_names();
207 let total_methods =
208 registry.services.values().map(|s| s.service().methods.len()).sum::<usize>();
209 info!(
210 "Created HTTP bridge router with {} services and {} dynamic endpoints",
211 available_services.len(),
212 total_methods
213 );
214
215 router
216 }
217
218 async fn health_check(
220 State(_bridge): State<Arc<HttpBridge>>,
221 ) -> axum::response::Json<serde_json::Value> {
222 axum::response::Json(serde_json::json!({"status": "ok", "bridge": "healthy"}))
223 }
224
225 async fn get_stats(
227 State(bridge): State<Arc<HttpBridge>>,
228 ) -> axum::response::Json<serde_json::Value> {
229 let stats = bridge.stats.lock().unwrap();
230 axum::response::Json(serde_json::json!({
231 "requests_served": stats.requests_served,
232 "requests_successful": stats.requests_successful,
233 "requests_failed": stats.requests_failed,
234 "available_services": stats.available_services
235 }))
236 }
237
238 async fn list_services(
240 State(bridge): State<Arc<HttpBridge>>,
241 ) -> axum::response::Json<serde_json::Value> {
242 Self::list_services_static(&bridge).await
243 }
244
245 async fn get_openapi_spec(
247 State(bridge): State<Arc<HttpBridge>>,
248 ) -> axum::response::Json<serde_json::Value> {
249 Self::get_openapi_spec_static(&bridge).await
250 }
251
252 async fn handle_generic_bridge_request(
254 State(state): State<Arc<HttpBridge>>,
255 Path(path_params): Path<HashMap<String, String>>,
256 _query: Query<BridgeQuery>,
257 body: Bytes,
258 ) -> axum::response::Response {
259 let service_name = match path_params.get("service") {
261 Some(name) => name,
262 None => {
263 let error_response = BridgeResponse::<Value> {
264 success: false,
265 data: None,
266 error: Some("Missing 'service' parameter in path".to_string()),
267 metadata: HashMap::new(),
268 };
269 return (axum::http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
270 }
271 };
272
273 let method_name = match path_params.get("method") {
274 Some(name) => name,
275 None => {
276 let error_response = BridgeResponse::<Value> {
277 success: false,
278 data: None,
279 error: Some("Missing 'method' parameter in path".to_string()),
280 metadata: HashMap::new(),
281 };
282 return (axum::http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
283 }
284 };
285
286 let registry = state.proxy.service_registry();
288 let service_opt = registry.get(service_name);
289 let method_info = if let Some(service) = service_opt {
290 service.service().methods.iter().find(|m| m.name == *method_name)
291 } else {
292 let error_response = BridgeResponse::<Value> {
293 success: false,
294 data: None,
295 error: Some(format!("Service '{}' not found", service_name)),
296 metadata: HashMap::new(),
297 };
298 return (axum::http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
299 };
300
301 let method_info = match method_info {
302 Some(method) => method,
303 None => {
304 let error_response = BridgeResponse::<Value> {
305 success: false,
306 data: None,
307 error: Some(format!(
308 "Method '{}' not found in service '{}'",
309 method_name, service_name
310 )),
311 metadata: HashMap::new(),
312 };
313 return (axum::http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
314 }
315 };
316
317 {
319 let mut stats = state.stats.lock().unwrap();
320 stats.requests_served += 1;
321 }
322
323 let params = BridgeRequestParams {
325 proxy: &state.proxy,
326 converter: &state.converter,
327 service_name: service_name.as_str(),
328 method_name: method_name.as_str(),
329 server_streaming: method_info.server_streaming,
330 body,
331 };
332 let result = Self::handle_bridge_request(¶ms).await;
333
334 match result {
335 Ok(response) => {
336 {
338 let mut stats = state.stats.lock().unwrap();
339 stats.requests_successful += 1;
340 }
341 (axum::http::StatusCode::OK, Json(response)).into_response()
342 }
343 Err(err) => {
344 {
346 let mut stats = state.stats.lock().unwrap();
347 stats.requests_failed += 1;
348 }
349 warn!("Bridge request failed for {}.{}: {}", service_name, method_name, err);
350 let error_response = BridgeResponse::<Value> {
351 success: false,
352 data: None,
353 error: Some(err.to_string()),
354 metadata: HashMap::new(),
355 };
356 (axum::http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
357 .into_response()
358 }
359 }
360 }
361
362 #[allow(dead_code)] fn create_bridge_handler(
365 &self,
366 service_name: String,
367 method_name: String,
368 _client_streaming: bool,
369 server_streaming: bool,
370 ) -> Box<BridgeHandlerFn> {
371 Box::new(
372 move |state: State<Arc<Self>>,
373 _path: Path<HashMap<String, String>>,
374 _query: Query<BridgeQuery>,
375 body: Bytes| {
376 let service_name = service_name.clone();
377 let method_name = method_name.clone();
378 let stats = state.stats.clone();
379 let proxy = state.proxy.clone();
380 let converter = state.converter.clone();
381
382 Box::pin(async move {
383 {
385 let mut stats = stats.lock().unwrap();
386 stats.requests_served += 1;
387 }
388
389 let params = BridgeRequestParams {
391 proxy: &proxy,
392 converter: &converter,
393 service_name: service_name.as_str(),
394 method_name: method_name.as_str(),
395 server_streaming,
396 body,
397 };
398 let result = Self::handle_bridge_request(¶ms).await;
399
400 match result {
401 Ok(response) => {
402 {
404 let mut stats = stats.lock().unwrap();
405 stats.requests_successful += 1;
406 }
407 (axum::http::StatusCode::OK, Json(response)).into_response()
408 }
409 Err(err) => {
410 {
412 let mut stats = stats.lock().unwrap();
413 stats.requests_failed += 1;
414 }
415 warn!(
416 "Bridge request failed for {}.{}: {}",
417 service_name, method_name, err
418 );
419 let error_response = BridgeResponse::<Value> {
420 success: false,
421 data: None,
422 error: Some(err.to_string()),
423 metadata: HashMap::new(),
424 };
425 (axum::http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
426 .into_response()
427 }
428 }
429 })
430 },
431 )
432 }
433
434 #[allow(dead_code)] async fn get_stats_static(bridge: &Arc<HttpBridge>) -> axum::response::Json<serde_json::Value> {
437 let stats = bridge.stats.lock().unwrap();
438 axum::response::Json(serde_json::json!({
439 "requests_served": stats.requests_served,
440 "requests_successful": stats.requests_successful,
441 "requests_failed": stats.requests_failed,
442 "available_services": stats.available_services
443 }))
444 }
445
446 async fn list_services_static(
448 bridge: &Arc<HttpBridge>,
449 ) -> axum::response::Json<serde_json::Value> {
450 let services = bridge.proxy.service_names();
451 axum::response::Json(serde_json::json!({
452 "services": services
453 }))
454 }
455
456 async fn get_openapi_spec_static(
458 bridge: &Arc<HttpBridge>,
459 ) -> axum::response::Json<serde_json::Value> {
460 use crate::dynamic::proto_parser::ProtoService;
461 use std::collections::HashMap;
462
463 let services: HashMap<String, ProtoService> = bridge
465 .proxy
466 .service_registry()
467 .services
468 .iter()
469 .map(|(name, dyn_service)| (name.clone(), dyn_service.service().clone()))
470 .collect();
471
472 let spec = bridge.route_generator.generate_openapi_spec(&services);
474 axum::response::Json(spec)
475 }
476
477 async fn handle_bridge_request(
479 params: &BridgeRequestParams<'_>,
480 ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
481 debug!("Handling bridge request for {}.{}", params.service_name, params.method_name);
482
483 let json_request: Value = if params.body.is_empty() {
485 Value::Null
486 } else {
487 serde_json::from_slice(¶ms.body).map_err(|e| {
488 Box::<dyn std::error::Error + Send + Sync>::from(format!(
489 "Failed to parse JSON request: {}",
490 e
491 ))
492 })?
493 };
494
495 if params.server_streaming {
497 Self::handle_streaming_request(
499 params.proxy,
500 params.converter,
501 params.service_name,
502 params.method_name,
503 json_request,
504 )
505 .await
506 } else {
507 Self::handle_unary_request(
509 params.proxy,
510 params.converter,
511 params.service_name,
512 params.method_name,
513 json_request,
514 )
515 .await
516 }
517 }
518
519 async fn handle_unary_request(
521 proxy: &MockReflectionProxy,
522 _converter: &ProtobufJsonConverter,
523 service_name: &str,
524 method_name: &str,
525 json_request: Value,
526 ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
527 let registry = proxy.service_registry();
529 let service_registry = registry.clone();
530
531 let service_opt = service_registry.get(service_name);
533 if service_opt.is_none() {
534 return Err(format!("Service '{}' not found", service_name).into());
535 }
536
537 let service = service_opt.unwrap();
538 let method_opt = service.service().methods.iter().find(|m| m.name == method_name);
539 if method_opt.is_none() {
540 return Err(format!(
541 "Method '{}' not found in service '{}'",
542 method_name, service_name
543 )
544 .into());
545 }
546
547 let json_response = serde_json::json!({
556 "message": format!("Hello! This is a mock response from {}.{} bridge", service_name, method_name),
557 "request_data": json_request,
558 "timestamp": chrono::Utc::now().to_rfc3339()
559 });
560
561 Ok(BridgeResponse {
562 success: true,
563 data: Some(json_response),
564 error: None,
565 metadata: HashMap::new(),
566 })
567 }
568
569 async fn handle_streaming_request(
571 _proxy: &MockReflectionProxy,
572 _converter: &ProtobufJsonConverter,
573 _service_name: &str,
574 _method_name: &str,
575 _json_request: Value,
576 ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
577 Err("Streaming responses via HTTP bridge are not yet implemented".into())
580 }
581}
582
583impl Clone for HttpBridge {
584 fn clone(&self) -> Self {
585 Self {
586 proxy: self.proxy.clone(),
587 route_generator: self.route_generator.clone(),
588 converter: self.converter.clone(),
589 config: self.config.clone(),
590 stats: self.stats.clone(),
591 }
592 }
593}
594
595#[cfg(test)]
596mod tests {
597
598 #[test]
599 fn test_module_compiles() {}
600}