pub mod converters;
pub mod handlers;
pub mod route_generator;
use crate::reflection::MockReflectionProxy;
use axum::{
body::Bytes,
extract::{Path, Query, State},
http::Method,
response::{IntoResponse, Json},
routing::{get, post},
Router,
};
use converters::ProtobufJsonConverter;
use route_generator::RouteGenerator;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tower_http::cors::{Any, CorsLayer};
use tracing::{debug, info, warn};
type BridgeHandlerFn = dyn Fn(
State<Arc<HttpBridge>>,
Path<HashMap<String, String>>,
Query<BridgeQuery>,
Bytes,
) -> Pin<Box<dyn Future<Output = axum::response::Response> + Send>>
+ Send
+ Sync;
struct BridgeRequestParams<'a> {
proxy: &'a MockReflectionProxy,
converter: &'a ProtobufJsonConverter,
service_name: &'a str,
method_name: &'a str,
server_streaming: bool,
body: Bytes,
}
#[derive(Debug, Clone)]
pub struct HttpBridgeConfig {
pub enabled: bool,
pub base_path: String,
pub enable_cors: bool,
pub max_request_size: usize,
pub timeout_seconds: u64,
pub route_pattern: String,
}
impl Default for HttpBridgeConfig {
fn default() -> Self {
Self {
enabled: true,
base_path: "/api".to_string(),
enable_cors: true,
max_request_size: 10 * 1024 * 1024, timeout_seconds: 30,
route_pattern: "/{service}/{method}".to_string(),
}
}
}
#[derive(Debug, Deserialize)]
pub struct BridgeQuery {
#[serde(default)]
pub stream: Option<String>,
#[serde(flatten)]
pub metadata: HashMap<String, String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct BridgeResponse<T> {
pub success: bool,
pub data: Option<T>,
pub error: Option<String>,
pub metadata: HashMap<String, String>,
}
#[derive(Debug, Serialize, Clone)]
pub struct BridgeStats {
pub requests_served: u64,
pub requests_successful: u64,
pub requests_failed: u64,
pub available_services: Vec<String>,
}
pub struct HttpBridge {
proxy: Arc<MockReflectionProxy>,
route_generator: RouteGenerator,
converter: ProtobufJsonConverter,
config: HttpBridgeConfig,
stats: Arc<std::sync::Mutex<BridgeStats>>,
}
impl HttpBridge {
pub fn new(
proxy: Arc<MockReflectionProxy>,
config: HttpBridgeConfig,
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let route_generator = RouteGenerator::new(config.clone());
let converter =
ProtobufJsonConverter::new(proxy.service_registry.descriptor_pool().clone());
let available_services = proxy.service_names();
let stats = BridgeStats {
requests_served: 0,
requests_successful: 0,
requests_failed: 0,
available_services,
};
Ok(Self {
proxy,
route_generator,
converter,
config,
stats: Arc::new(std::sync::Mutex::new(stats)),
})
}
pub fn create_router(&self) -> Router<Arc<HttpBridge>> {
let mut router = Router::new();
if self.config.enable_cors {
router = router.layer(
CorsLayer::new()
.allow_methods([
Method::GET,
Method::POST,
Method::PUT,
Method::DELETE,
Method::PATCH,
])
.allow_headers(Any)
.allow_origin(Any),
);
}
let bridge = Arc::new(self.clone());
router = router.with_state(bridge);
router =
router.route(&format!("{}/health", self.config.base_path), get(Self::health_check));
router = router.route(&format!("{}/stats", self.config.base_path), get(Self::get_stats));
router =
router.route(&format!("{}/services", self.config.base_path), get(Self::list_services));
router =
router.route(&format!("{}/docs", self.config.base_path), get(Self::get_openapi_spec));
let registry = self.proxy.service_registry();
router =
router.route(&self.config.route_pattern, post(Self::handle_generic_bridge_request));
router = router.route(&self.config.route_pattern, get(Self::handle_generic_bridge_request));
let available_services = registry.service_names();
let total_methods =
registry.services.values().map(|s| s.service().methods.len()).sum::<usize>();
info!(
"Created HTTP bridge router with {} services and {} dynamic endpoints",
available_services.len(),
total_methods
);
router
}
async fn health_check(State(_bridge): State<Arc<HttpBridge>>) -> Json<Value> {
Json(serde_json::json!({"status": "ok", "bridge": "healthy"}))
}
async fn get_stats(State(bridge): State<Arc<HttpBridge>>) -> Json<Value> {
let stats = bridge.stats.lock().unwrap_or_else(|poisoned| {
warn!("Statistics mutex is poisoned, using default values");
poisoned.into_inner()
});
Json(serde_json::json!({
"requests_served": stats.requests_served,
"requests_successful": stats.requests_successful,
"requests_failed": stats.requests_failed,
"available_services": stats.available_services
}))
}
async fn list_services(State(bridge): State<Arc<HttpBridge>>) -> Json<Value> {
Self::list_services_static(&bridge).await
}
async fn get_openapi_spec(State(bridge): State<Arc<HttpBridge>>) -> Json<Value> {
Self::get_openapi_spec_static(&bridge).await
}
async fn handle_generic_bridge_request(
State(state): State<Arc<HttpBridge>>,
Path(path_params): Path<HashMap<String, String>>,
_query: Query<BridgeQuery>,
body: Bytes,
) -> axum::response::Response {
let service_name = match path_params.get("service") {
Some(name) => name,
None => {
let error_response = BridgeResponse::<Value> {
success: false,
data: None,
error: Some("Missing 'service' parameter in path".to_string()),
metadata: HashMap::new(),
};
return (http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
}
};
let method_name = match path_params.get("method") {
Some(name) => name,
None => {
let error_response = BridgeResponse::<Value> {
success: false,
data: None,
error: Some("Missing 'method' parameter in path".to_string()),
metadata: HashMap::new(),
};
return (http::StatusCode::BAD_REQUEST, Json(error_response)).into_response();
}
};
let registry = state.proxy.service_registry();
let service_opt = registry.get(service_name);
let method_info = if let Some(service) = service_opt {
service.service().methods.iter().find(|m| m.name == *method_name)
} else {
let error_response = BridgeResponse::<Value> {
success: false,
data: None,
error: Some(format!("Service '{}' not found", service_name)),
metadata: HashMap::new(),
};
return (http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
};
let method_info = match method_info {
Some(method) => method,
None => {
let error_response = BridgeResponse::<Value> {
success: false,
data: None,
error: Some(format!(
"Method '{}' not found in service '{}'",
method_name, service_name
)),
metadata: HashMap::new(),
};
return (http::StatusCode::NOT_FOUND, Json(error_response)).into_response();
}
};
{
if let Ok(mut stats) = state.stats.lock() {
stats.requests_served += 1;
} else {
warn!("Failed to update request stats (mutex poisoned)");
}
}
let params = BridgeRequestParams {
proxy: &state.proxy,
converter: &state.converter,
service_name: service_name.as_str(),
method_name: method_name.as_str(),
server_streaming: method_info.server_streaming,
body,
};
let result = Self::handle_bridge_request(¶ms).await;
match result {
Ok(response) => {
{
if let Ok(mut stats) = state.stats.lock() {
stats.requests_successful += 1;
} else {
warn!("Failed to update success stats (mutex poisoned)");
}
}
(http::StatusCode::OK, Json(response)).into_response()
}
Err(err) => {
{
if let Ok(mut stats) = state.stats.lock() {
stats.requests_failed += 1;
} else {
warn!("Failed to update failure stats (mutex poisoned)");
}
}
warn!("Bridge request failed for {}.{}: {}", service_name, method_name, err);
let error_response = BridgeResponse::<Value> {
success: false,
data: None,
error: Some(err.to_string()),
metadata: HashMap::new(),
};
(http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
}
}
}
#[allow(dead_code)] fn create_bridge_handler(
&self,
service_name: String,
method_name: String,
_client_streaming: bool,
server_streaming: bool,
) -> Box<BridgeHandlerFn> {
Box::new(
move |state: State<Arc<Self>>,
_path: Path<HashMap<String, String>>,
_query: Query<BridgeQuery>,
body: Bytes| {
let service_name = service_name.clone();
let method_name = method_name.clone();
let stats = state.stats.clone();
let proxy = state.proxy.clone();
let converter = state.converter.clone();
Box::pin(async move {
{
if let Ok(mut stats) = stats.lock() {
stats.requests_served += 1;
} else {
warn!("Failed to update request stats (mutex poisoned)");
}
}
let params = BridgeRequestParams {
proxy: &proxy,
converter: &converter,
service_name: service_name.as_str(),
method_name: method_name.as_str(),
server_streaming,
body,
};
let result = Self::handle_bridge_request(¶ms).await;
match result {
Ok(response) => {
{
if let Ok(mut stats) = stats.lock() {
stats.requests_successful += 1;
} else {
warn!("Failed to update success stats (mutex poisoned)");
}
}
(http::StatusCode::OK, Json(response)).into_response()
}
Err(err) => {
{
if let Ok(mut stats) = stats.lock() {
stats.requests_failed += 1;
} else {
warn!("Failed to update failure stats (mutex poisoned)");
}
}
warn!(
"Bridge request failed for {}.{}: {}",
service_name, method_name, err
);
let error_response = BridgeResponse::<Value> {
success: false,
data: None,
error: Some(err.to_string()),
metadata: HashMap::new(),
};
(http::StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
.into_response()
}
}
})
},
)
}
#[allow(dead_code)] async fn get_stats_static(bridge: &Arc<HttpBridge>) -> Json<Value> {
let stats = bridge.stats.lock().unwrap_or_else(|poisoned| {
warn!("Statistics mutex is poisoned, using default values");
poisoned.into_inner()
});
Json(serde_json::json!({
"requests_served": stats.requests_served,
"requests_successful": stats.requests_successful,
"requests_failed": stats.requests_failed,
"available_services": stats.available_services
}))
}
async fn list_services_static(bridge: &Arc<HttpBridge>) -> Json<Value> {
let services = bridge.proxy.service_names();
Json(serde_json::json!({
"services": services
}))
}
async fn get_openapi_spec_static(bridge: &Arc<HttpBridge>) -> Json<Value> {
use crate::dynamic::proto_parser::ProtoService;
use std::collections::HashMap;
let services: HashMap<String, ProtoService> = bridge
.proxy
.service_registry()
.services
.iter()
.map(|(name, dyn_service)| (name.clone(), dyn_service.service().clone()))
.collect();
let spec = bridge.route_generator.generate_openapi_spec(&services);
Json(spec)
}
async fn handle_bridge_request(
params: &BridgeRequestParams<'_>,
) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
debug!("Handling bridge request for {}.{}", params.service_name, params.method_name);
let json_request: Value = if params.body.is_empty() {
Value::Null
} else {
serde_json::from_slice(¶ms.body).map_err(|e| {
Box::<dyn std::error::Error + Send + Sync>::from(format!(
"Failed to parse JSON request: {}",
e
))
})?
};
if params.server_streaming {
Self::handle_streaming_request(
params.proxy,
params.converter,
params.service_name,
params.method_name,
json_request,
)
.await
} else {
Self::handle_unary_request(
params.proxy,
params.converter,
params.service_name,
params.method_name,
json_request,
)
.await
}
}
async fn handle_unary_request(
proxy: &MockReflectionProxy,
converter: &ProtobufJsonConverter,
service_name: &str,
method_name: &str,
_json_request: Value,
) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
let registry = proxy.service_registry();
let service_registry = registry.clone();
let service = match service_registry.get(service_name) {
Some(s) => s,
None => {
return Err(format!("Service '{}' not found", service_name).into());
}
};
let method = match service.service().methods.iter().find(|m| m.name == method_name) {
Some(m) => m,
None => {
return Err(format!(
"Method '{}' not found in service '{}'",
method_name, service_name
)
.into());
}
};
let method_descriptor = proxy.cache().get_method(service_name, method_name).await.map_err(
|e| -> Box<dyn std::error::Error + Send + Sync> {
format!("Failed to get method descriptor: {}", e).into()
},
)?;
let output_descriptor = method_descriptor.output();
let mock_response = {
match proxy.smart_generator().lock() {
Ok(mut gen) => gen.generate_message(&output_descriptor),
Err(e) => {
return Err(format!("Failed to acquire smart generator lock: {}", e).into());
}
}
};
let json_response = converter
.protobuf_to_json(&output_descriptor, &mock_response)
.map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
format!("Failed to convert response to JSON: {}", e).into()
})?;
let mut metadata = HashMap::new();
metadata.insert("x-mockforge-bridge-mode".to_string(), "unary".to_string());
metadata.insert("x-mockforge-input-type".to_string(), method.input_type.clone());
metadata.insert("x-mockforge-output-type".to_string(), method.output_type.clone());
Ok(BridgeResponse {
success: true,
data: Some(json_response),
error: None,
metadata,
})
}
async fn handle_streaming_request(
proxy: &MockReflectionProxy,
converter: &ProtobufJsonConverter,
service_name: &str,
method_name: &str,
_json_request: Value,
) -> Result<BridgeResponse<Value>, Box<dyn std::error::Error + Send + Sync>> {
let method_descriptor = proxy.cache().get_method(service_name, method_name).await.map_err(
|e| -> Box<dyn std::error::Error + Send + Sync> {
format!("Failed to get method descriptor: {}", e).into()
},
)?;
let output_descriptor = method_descriptor.output();
let stream_count = 3;
let mut events = Vec::new();
for seq in 0..stream_count {
let mock_msg = {
match proxy.smart_generator().lock() {
Ok(mut gen) => gen.generate_message(&output_descriptor),
Err(e) => {
return Err(format!("Failed to acquire smart generator lock: {}", e).into());
}
}
};
let json_data = converter.protobuf_to_json(&output_descriptor, &mock_msg).map_err(
|e| -> Box<dyn std::error::Error + Send + Sync> {
format!("Failed to convert stream event to JSON: {}", e).into()
},
)?;
events.push(serde_json::json!({
"sequence": seq + 1,
"service": service_name,
"method": method_name,
"timestamp": chrono::Utc::now().to_rfc3339(),
"data": json_data
}));
}
let mut metadata = HashMap::new();
metadata.insert("x-mockforge-streaming-mode".to_string(), "json-envelope".to_string());
metadata.insert("x-mockforge-stream-count".to_string(), stream_count.to_string());
metadata.insert(
"x-mockforge-output-type".to_string(),
method_descriptor.output().full_name().to_string(),
);
Ok(BridgeResponse {
success: true,
data: Some(serde_json::json!({
"stream_type": "server",
"service": service_name,
"method": method_name,
"events": events
})),
error: None,
metadata,
})
}
}
impl Clone for HttpBridge {
fn clone(&self) -> Self {
Self {
proxy: self.proxy.clone(),
route_generator: self.route_generator.clone(),
converter: self.converter.clone(),
config: self.config.clone(),
stats: self.stats.clone(),
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn test_module_compiles() {
}
}