1pub mod converters;
7pub mod handlers;
8pub mod route_generator;
9
10use crate::reflection::MockReflectionProxy;
11use axum::{
12 body::Bytes,
13 extract::{Path, Query, State},
14 http::Method,
15 response::{IntoResponse, Json},
16 routing::{get, post},
17 Router,
18};
19use converters::ProtobufJsonConverter;
20use route_generator::RouteGenerator;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27use tower_http::cors::{Any, CorsLayer};
28use tracing::{debug, info, warn};
29
30type BridgeHandlerFn = dyn Fn(
32 State<Arc<HttpBridge>>,
33 Path<HashMap<String, String>>,
34 Query<BridgeQuery>,
35 Bytes,
36 ) -> Pin<Box<dyn Future<Output = axum::response::Response> + Send>>
37 + Send
38 + Sync;
39
40struct BridgeRequestParams<'a> {
42 proxy: &'a MockReflectionProxy,
43 converter: &'a ProtobufJsonConverter,
44 service_name: &'a str,
45 method_name: &'a str,
46 server_streaming: bool,
47 body: Bytes,
48}
49
50#[derive(Debug, Clone)]
52pub struct HttpBridgeConfig {
53 pub enabled: bool,
55 pub base_path: String,
57 pub enable_cors: bool,
59 pub max_request_size: usize,
61 pub timeout_seconds: u64,
63 pub route_pattern: String,
65}
66
67impl Default for HttpBridgeConfig {
68 fn default() -> Self {
69 Self {
70 enabled: true,
71 base_path: "/api".to_string(),
72 enable_cors: true,
73 max_request_size: 10 * 1024 * 1024, timeout_seconds: 30,
75 route_pattern: "/{service}/{method}".to_string(),
76 }
77 }
78}
79
80#[derive(Debug, Deserialize)]
82pub struct BridgeQuery {
83 #[serde(default)]
85 pub stream: Option<String>,
86 #[serde(flatten)]
88 pub metadata: HashMap<String, String>,
89}
90
91#[derive(Debug, Serialize, Deserialize)]
93pub struct BridgeResponse<T> {
94 pub success: bool,
96 pub data: Option<T>,
98 pub error: Option<String>,
100 pub metadata: HashMap<String, String>,
102}
103
104#[derive(Debug, Serialize, Clone)]
106pub struct BridgeStats {
107 pub requests_served: u64,
109 pub requests_successful: u64,
111 pub requests_failed: u64,
113 pub available_services: Vec<String>,
115}
116
117pub struct HttpBridge {
119 proxy: Arc<MockReflectionProxy>,
121 route_generator: RouteGenerator,
123 converter: ProtobufJsonConverter,
125 config: HttpBridgeConfig,
127 stats: Arc<std::sync::Mutex<BridgeStats>>,
129}
130
131impl HttpBridge {
132 pub fn new(
134 proxy: Arc<MockReflectionProxy>,
135 config: HttpBridgeConfig,
136 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
137 let route_generator = RouteGenerator::new(config.clone());
138 let converter =
139 ProtobufJsonConverter::new(proxy.service_registry.descriptor_pool().clone());
140 let available_services = proxy.service_names();
141
142 let stats = BridgeStats {
143 requests_served: 0,
144 requests_successful: 0,
145 requests_failed: 0,
146 available_services,
147 };
148
149 Ok(Self {
150 proxy,
151 route_generator,
152 converter,
153 config,
154 stats: Arc::new(std::sync::Mutex::new(stats)),
155 })
156 }
157
158 pub fn create_router(&self) -> Router<Arc<HttpBridge>> {
160 let mut router = Router::new();
161
162 if self.config.enable_cors {
164 router = router.layer(
165 CorsLayer::new()
166 .allow_methods([
167 Method::GET,
168 Method::POST,
169 Method::PUT,
170 Method::DELETE,
171 Method::PATCH,
172 ])
173 .allow_headers(Any)
174 .allow_origin(Any),
175 );
176 }
177
178 let bridge = Arc::new(self.clone());
180 router = router.with_state(bridge);
181
182 router =
184 router.route(&format!("{}/health", self.config.base_path), get(Self::health_check));
185
186 router = router.route(&format!("{}/stats", self.config.base_path), get(Self::get_stats));
188
189 router =
191 router.route(&format!("{}/services", self.config.base_path), get(Self::list_services));
192
193 router =
195 router.route(&format!("{}/docs", self.config.base_path), get(Self::get_openapi_spec));
196
197 let registry = self.proxy.service_registry();
199
200 router =
203 router.route(&self.config.route_pattern, post(Self::handle_generic_bridge_request));
204 router = router.route(&self.config.route_pattern, get(Self::handle_generic_bridge_request));
205
206 let available_services = registry.service_names();
207 let total_methods =
208 registry.services.values().map(|s| s.service().methods.len()).sum::<usize>();
209 info!(
210 "Created HTTP bridge router with {} services and {} dynamic endpoints",
211 available_services.len(),
212 total_methods
213 );
214
215 router
216 }
217
218 async fn health_check(State(_bridge): State<Arc<HttpBridge>>) -> Json<Value> {
220 Json(serde_json::json!({"status": "ok", "bridge": "healthy"}))
221 }
222
223 async fn get_stats(State(bridge): State<Arc<HttpBridge>>) -> Json<Value> {
225 let stats = bridge.stats.lock().unwrap_or_else(|poisoned| {
227 warn!("Statistics mutex is poisoned, using default values");
228 poisoned.into_inner()
229 });
230 Json(serde_json::json!({
231 "requests_served": stats.requests_served,
232 "requests_successful": stats.requests_successful,
233 "requests_failed": stats.requests_failed,
234 "available_services": stats.available_services
235 }))
236 }
237
238 async fn list_services(State(bridge): State<Arc<HttpBridge>>) -> Json<Value> {
240 Self::list_services_static(&bridge).await
241 }
242
243 async fn get_openapi_spec(State(bridge): State<Arc<HttpBridge>>) -> Json<Value> {
245 Self::get_openapi_spec_static(&bridge).await
246 }
247
248 async fn handle_generic_bridge_request(
250 State(state): State<Arc<HttpBridge>>,
251 Path(path_params): Path<HashMap<String, String>>,
252 _query: Query<BridgeQuery>,
253 body: Bytes,
254 ) -> axum::response::Response {
255 let service_name = match path_params.get("service") {
257 Some(name) => name,
258 None => {
259 let error_response = BridgeResponse::<Value> {
260 success: false,
261 data: None,
262 error: Some("Missing 'service' parameter in path".to_string()),
263 metadata: HashMap::new(),
264 };
265 return (http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
266 }
267 };
268
269 let method_name = match path_params.get("method") {
270 Some(name) => name,
271 None => {
272 let error_response = BridgeResponse::<Value> {
273 success: false,
274 data: None,
275 error: Some("Missing 'method' parameter in path".to_string()),
276 metadata: HashMap::new(),
277 };
278 return (http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
279 }
280 };
281
282 let registry = state.proxy.service_registry();
284 let service_opt = registry.get(service_name);
285 let method_info = if let Some(service) = service_opt {
286 service.service().methods.iter().find(|m| m.name == *method_name)
287 } else {
288 let error_response = BridgeResponse::<Value> {
289 success: false,
290 data: None,
291 error: Some(format!("Service '{}' not found", service_name)),
292 metadata: HashMap::new(),
293 };
294 return (http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
295 };
296
297 let method_info = match method_info {
298 Some(method) => method,
299 None => {
300 let error_response = BridgeResponse::<Value> {
301 success: false,
302 data: None,
303 error: Some(format!(
304 "Method '{}' not found in service '{}'",
305 method_name, service_name
306 )),
307 metadata: HashMap::new(),
308 };
309 return (http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
310 }
311 };
312
313 {
315 if let Ok(mut stats) = state.stats.lock() {
316 stats.requests_served += 1;
317 } else {
318 warn!("Failed to update request stats (mutex poisoned)");
319 }
320 }
321
322 let params = BridgeRequestParams {
324 proxy: &state.proxy,
325 converter: &state.converter,
326 service_name: service_name.as_str(),
327 method_name: method_name.as_str(),
328 server_streaming: method_info.server_streaming,
329 body,
330 };
331 let result = Self::handle_bridge_request(¶ms).await;
332
333 match result {
334 Ok(response) => {
335 {
337 if let Ok(mut stats) = state.stats.lock() {
338 stats.requests_successful += 1;
339 } else {
340 warn!("Failed to update success stats (mutex poisoned)");
341 }
342 }
343 (http::StatusCode::OK, Json(response)).into_response()
344 }
345 Err(err) => {
346 {
348 if let Ok(mut stats) = state.stats.lock() {
349 stats.requests_failed += 1;
350 } else {
351 warn!("Failed to update failure stats (mutex poisoned)");
352 }
353 }
354 warn!("Bridge request failed for {}.{}: {}", service_name, method_name, err);
355 let error_response = BridgeResponse::<Value> {
356 success: false,
357 data: None,
358 error: Some(err.to_string()),
359 metadata: HashMap::new(),
360 };
361 (http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
362 }
363 }
364 }
365
366 #[allow(dead_code)] fn create_bridge_handler(
371 &self,
372 service_name: String,
373 method_name: String,
374 _client_streaming: bool,
375 server_streaming: bool,
376 ) -> Box<BridgeHandlerFn> {
377 Box::new(
378 move |state: State<Arc<Self>>,
379 _path: Path<HashMap<String, String>>,
380 _query: Query<BridgeQuery>,
381 body: Bytes| {
382 let service_name = service_name.clone();
383 let method_name = method_name.clone();
384 let stats = state.stats.clone();
385 let proxy = state.proxy.clone();
386 let converter = state.converter.clone();
387
388 Box::pin(async move {
389 {
391 if let Ok(mut stats) = stats.lock() {
392 stats.requests_served += 1;
393 } else {
394 warn!("Failed to update request stats (mutex poisoned)");
395 }
396 }
397
398 let params = BridgeRequestParams {
400 proxy: &proxy,
401 converter: &converter,
402 service_name: service_name.as_str(),
403 method_name: method_name.as_str(),
404 server_streaming,
405 body,
406 };
407 let result = Self::handle_bridge_request(¶ms).await;
408
409 match result {
410 Ok(response) => {
411 {
413 if let Ok(mut stats) = stats.lock() {
414 stats.requests_successful += 1;
415 } else {
416 warn!("Failed to update success stats (mutex poisoned)");
417 }
418 }
419 (http::StatusCode::OK, Json(response)).into_response()
420 }
421 Err(err) => {
422 {
424 if let Ok(mut stats) = stats.lock() {
425 stats.requests_failed += 1;
426 } else {
427 warn!("Failed to update failure stats (mutex poisoned)");
428 }
429 }
430 warn!(
431 "Bridge request failed for {}.{}: {}",
432 service_name, method_name, err
433 );
434 let error_response = BridgeResponse::<Value> {
435 success: false,
436 data: None,
437 error: Some(err.to_string()),
438 metadata: HashMap::new(),
439 };
440 (http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
441 .into_response()
442 }
443 }
444 })
445 },
446 )
447 }
448
449 #[allow(dead_code)] async fn get_stats_static(bridge: &Arc<HttpBridge>) -> Json<Value> {
454 let stats = bridge.stats.lock().unwrap_or_else(|poisoned| {
456 warn!("Statistics mutex is poisoned, using default values");
457 poisoned.into_inner()
458 });
459 Json(serde_json::json!({
460 "requests_served": stats.requests_served,
461 "requests_successful": stats.requests_successful,
462 "requests_failed": stats.requests_failed,
463 "available_services": stats.available_services
464 }))
465 }
466
467 async fn list_services_static(bridge: &Arc<HttpBridge>) -> Json<Value> {
469 let services = bridge.proxy.service_names();
470 Json(serde_json::json!({
471 "services": services
472 }))
473 }
474
475 async fn get_openapi_spec_static(bridge: &Arc<HttpBridge>) -> Json<Value> {
477 use crate::dynamic::proto_parser::ProtoService;
478 use std::collections::HashMap;
479
480 let services: HashMap<String, ProtoService> = bridge
482 .proxy
483 .service_registry()
484 .services
485 .iter()
486 .map(|(name, dyn_service)| (name.clone(), dyn_service.service().clone()))
487 .collect();
488
489 let spec = bridge.route_generator.generate_openapi_spec(&services);
491 Json(spec)
492 }
493
494 async fn handle_bridge_request(
496 params: &BridgeRequestParams<'_>,
497 ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
498 debug!("Handling bridge request for {}.{}", params.service_name, params.method_name);
499
500 let json_request: Value = if params.body.is_empty() {
502 Value::Null
503 } else {
504 serde_json::from_slice(¶ms.body).map_err(|e| {
505 Box::<dyn std::error::Error + Send + Sync>::from(format!(
506 "Failed to parse JSON request: {}",
507 e
508 ))
509 })?
510 };
511
512 if params.server_streaming {
514 Self::handle_streaming_request(
516 params.proxy,
517 params.converter,
518 params.service_name,
519 params.method_name,
520 json_request,
521 )
522 .await
523 } else {
524 Self::handle_unary_request(
526 params.proxy,
527 params.converter,
528 params.service_name,
529 params.method_name,
530 json_request,
531 )
532 .await
533 }
534 }
535
536 async fn handle_unary_request(
538 proxy: &MockReflectionProxy,
539 _converter: &ProtobufJsonConverter,
540 service_name: &str,
541 method_name: &str,
542 json_request: Value,
543 ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
544 let registry = proxy.service_registry();
546 let service_registry = registry.clone();
547
548 let service = match service_registry.get(service_name) {
550 Some(s) => s,
551 None => {
552 return Err(format!("Service '{}' not found", service_name).into());
553 }
554 };
555
556 let method = match service.service().methods.iter().find(|m| m.name == method_name) {
557 Some(m) => m,
558 None => {
559 return Err(format!(
560 "Method '{}' not found in service '{}'",
561 method_name, service_name
562 )
563 .into());
564 }
565 };
566
567 let _method = method;
569
570 let json_response = serde_json::json!({
579 "message": format!("Hello! This is a mock response from {}.{} bridge", service_name, method_name),
580 "request_data": json_request,
581 "timestamp": chrono::Utc::now().to_rfc3339()
582 });
583
584 Ok(BridgeResponse {
585 success: true,
586 data: Some(json_response),
587 error: None,
588 metadata: HashMap::new(),
589 })
590 }
591
592 async fn handle_streaming_request(
594 proxy: &MockReflectionProxy,
595 _converter: &ProtobufJsonConverter,
596 service_name: &str,
597 method_name: &str,
598 json_request: Value,
599 ) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
600 let mut events = Vec::new();
603 for seq in 0..3 {
604 events.push(serde_json::json!({
605 "sequence": seq + 1,
606 "service": service_name,
607 "method": method_name,
608 "timestamp": chrono::Utc::now().to_rfc3339(),
609 "data": {
610 "message": format!("mock stream event {} from {}.{}", seq + 1, service_name, method_name),
611 "request_echo": json_request.clone()
612 }
613 }));
614 }
615
616 let mut metadata = HashMap::new();
617 metadata.insert("x-mockforge-streaming-mode".to_string(), "json-envelope".to_string());
618 metadata.insert("x-mockforge-stream-count".to_string(), "3".to_string());
619 metadata.insert(
620 "x-mockforge-service-count".to_string(),
621 proxy.service_names().len().to_string(),
622 );
623
624 Ok(BridgeResponse {
625 success: true,
626 data: Some(serde_json::json!({
627 "stream_type": "server",
628 "service": service_name,
629 "method": method_name,
630 "events": events
631 })),
632 error: None,
633 metadata,
634 })
635 }
636}
637
638impl Clone for HttpBridge {
639 fn clone(&self) -> Self {
640 Self {
641 proxy: self.proxy.clone(),
642 route_generator: self.route_generator.clone(),
643 converter: self.converter.clone(),
644 config: self.config.clone(),
645 stats: self.stats.clone(),
646 }
647 }
648}
649
650#[cfg(test)]
651mod tests {
652 #[test]
653 fn test_module_compiles() {
654 }
656}