1use axum::{
2 extract::{Path, Query, State},
3 http::{StatusCode},
4 response::{Json, Response},
5 routing::{get, post, put, delete},
6 Router,
7};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::time::Duration;
12use tracing::{debug, error, info, warn};
13
14use rabbitmesh::{ServiceClient, RabbitMeshError};
15use crate::registry::ServiceRegistry;
16
17#[derive(Debug, Clone)]
19pub struct GatewayState {
20 pub service_client: Arc<ServiceClient>,
22 pub service_registry: Arc<ServiceRegistry>,
24}
25
26pub async fn create_auto_router(amqp_url: impl Into<String>) -> Result<Router, RabbitMeshError> {
45 info!("🌐 Creating auto-router gateway");
46
47 let service_client = Arc::new(
49 ServiceClient::new("api-gateway", amqp_url).await?
50 );
51
52 let service_registry = Arc::new(ServiceRegistry::new());
53
54 let state = GatewayState {
55 service_client,
56 service_registry,
57 };
58
59 let router = Router::new()
60 .route("/health", get(health_check))
62 .route("/health/:service", get(service_health_check))
63
64 .route("/api/v1/:service/:method", post(handle_rpc_call))
67 .route("/api/v1/:service/:method", get(handle_rpc_call))
68 .route("/api/v1/:service/:method", put(handle_rpc_call))
69 .route("/api/v1/:service/:method", delete(handle_rpc_call))
70
71 .route("/api/v1/:service/:method/:param", get(handle_rpc_call_with_param))
73 .route("/api/v1/:service/:method/:param", post(handle_rpc_call_with_param))
74 .route("/api/v1/:service/:method/:param", put(handle_rpc_call_with_param))
75 .route("/api/v1/:service/:method/:param", delete(handle_rpc_call_with_param))
76
77 .route("/registry/services", get(list_services))
79 .route("/registry/services/:service", get(describe_service))
80
81 .with_state(state);
82
83 info!("✅ Auto-router created, ready to proxy HTTP -> RabbitMQ -> Microservices");
84 Ok(router)
85}
86
87async fn handle_rpc_call(
93 State(state): State<GatewayState>,
94 Path((service, method)): Path<(String, String)>,
95 Query(query_params): Query<HashMap<String, String>>,
96 body: Option<Json<Value>>,
97) -> Result<Json<Value>, GatewayError> {
98 debug!("🔄 RPC call: {}.{}", service, method);
99
100 let params = prepare_params(query_params, body);
102
103 let params_with_meta = params;
105
106 let response = state
108 .service_client
109 .call_with_timeout(&service, &method, params_with_meta, Duration::from_secs(30))
110 .await?;
111
112 match response {
114 rabbitmesh::message::RpcResponse::Success { data, processing_time_ms } => {
115 debug!("✅ RPC success: {}.{} ({}ms)", service, method, processing_time_ms);
116 Ok(Json(data))
117 }
118 rabbitmesh::message::RpcResponse::Error { error, code, details } => {
119 warn!("❌ RPC error: {}.{} - {}", service, method, error);
120 Err(GatewayError::ServiceError {
121 service: service.clone(),
122 method: method.clone(),
123 error,
124 code,
125 details,
126 })
127 }
128 }
129}
130
131async fn handle_rpc_call_with_param(
137 State(state): State<GatewayState>,
138 Path((service, method, param)): Path<(String, String, String)>,
139 Query(query_params): Query<HashMap<String, String>>,
140 body: Option<Json<Value>>,
141) -> Result<Json<Value>, GatewayError> {
142 debug!("🔄 RPC call with param: {}.{}({})", service, method, param);
143
144 let mut all_params = query_params;
146
147 let param_value = if let Ok(num) = param.parse::<i64>() {
149 Value::Number(serde_json::Number::from(num))
150 } else if let Ok(num) = param.parse::<f64>() {
151 Value::Number(serde_json::Number::from_f64(num).unwrap_or_else(|| serde_json::Number::from(0)))
152 } else if param == "true" || param == "false" {
153 Value::Bool(param == "true")
154 } else {
155 Value::String(param)
156 };
157
158 let param_key = match method.as_str() {
160 m if m.contains("user") => "user_id",
161 m if m.contains("order") => "order_id",
162 m if m.contains("product") => "product_id",
163 _ => "id", };
165
166 all_params.insert(param_key.to_string(), param_value.to_string());
167
168 let params = prepare_params(all_params, body);
169 let params_with_meta = params;
170
171 let response = state
173 .service_client
174 .call_with_timeout(&service, &method, params_with_meta, Duration::from_secs(30))
175 .await?;
176
177 match response {
178 rabbitmesh::message::RpcResponse::Success { data, processing_time_ms } => {
179 debug!("✅ RPC success: {}.{} ({}ms)", service, method, processing_time_ms);
180 Ok(Json(data))
181 }
182 rabbitmesh::message::RpcResponse::Error { error, code, details } => {
183 warn!("❌ RPC error: {}.{} - {}", service, method, error);
184 Err(GatewayError::ServiceError {
185 service,
186 method,
187 error,
188 code,
189 details,
190 })
191 }
192 }
193}
194
195fn prepare_params(
197 query_params: HashMap<String, String>,
198 body: Option<Json<Value>>,
199) -> Value {
200 let mut params = serde_json::Map::new();
201
202 for (key, value) in query_params {
204 let parsed_value = serde_json::from_str(&value)
206 .unwrap_or_else(|_| Value::String(value));
207 params.insert(key, parsed_value);
208 }
209
210 if let Some(Json(body_value)) = body {
212 if let Value::Object(body_map) = body_value {
213 for (key, value) in body_map {
214 params.insert(key, value);
215 }
216 }
217 }
218
219 Value::Object(params)
220}
221
222
223async fn health_check(State(state): State<GatewayState>) -> Result<Json<Value>, GatewayError> {
225 let is_healthy = state.service_client.is_healthy().await;
226 let stats = state.service_client.get_stats().await;
227
228 Ok(Json(serde_json::json!({
229 "status": if is_healthy { "healthy" } else { "unhealthy" },
230 "gateway": "rabbitmesh-gateway",
231 "version": env!("CARGO_PKG_VERSION"),
232 "connection": stats.connection_stats,
233 "rpc": stats.rpc_stats
234 })))
235}
236
237async fn service_health_check(
239 State(state): State<GatewayState>,
240 Path(service): Path<String>,
241) -> Result<Json<Value>, GatewayError> {
242 match state.service_client.call_with_timeout(
244 &service,
245 "ping",
246 serde_json::json!({}),
247 Duration::from_secs(5)
248 ).await {
249 Ok(_) => Ok(Json(serde_json::json!({
250 "service": service,
251 "status": "healthy"
252 }))),
253 Err(_) => Ok(Json(serde_json::json!({
254 "service": service,
255 "status": "unhealthy"
256 })))
257 }
258}
259
260async fn list_services(State(state): State<GatewayState>) -> Json<Value> {
262 let services = state.service_registry.list_services().await;
263 Json(serde_json::json!({ "services": services }))
264}
265
266async fn describe_service(
268 State(state): State<GatewayState>,
269 Path(service): Path<String>,
270) -> Result<Json<Value>, GatewayError> {
271 match state.service_registry.get_service(&service).await {
272 Some(service_info) => Ok(Json(serde_json::json!(service_info))),
273 None => Err(GatewayError::ServiceNotFound(service)),
274 }
275}
276
277#[derive(Debug)]
279pub enum GatewayError {
280 ServiceError {
281 service: String,
282 method: String,
283 error: String,
284 code: Option<String>,
285 details: Option<Value>,
286 },
287 ServiceNotFound(String),
288 RabbitMeshError(RabbitMeshError),
289}
290
291impl From<RabbitMeshError> for GatewayError {
292 fn from(err: RabbitMeshError) -> Self {
293 Self::RabbitMeshError(err)
294 }
295}
296
297impl axum::response::IntoResponse for GatewayError {
298 fn into_response(self) -> Response {
299 let (status, error_message) = match self {
300 GatewayError::ServiceError { service, method, error, code, details } => {
301 let status = match code.as_deref() {
302 Some("NOT_FOUND") => StatusCode::NOT_FOUND,
303 Some("UNAUTHORIZED") => StatusCode::UNAUTHORIZED,
304 Some("FORBIDDEN") => StatusCode::FORBIDDEN,
305 Some("BAD_REQUEST") => StatusCode::BAD_REQUEST,
306 _ => StatusCode::INTERNAL_SERVER_ERROR,
307 };
308
309 let body = serde_json::json!({
310 "error": error,
311 "service": service,
312 "method": method,
313 "code": code,
314 "details": details
315 });
316
317 (status, body)
318 }
319 GatewayError::ServiceNotFound(service) => {
320 (
321 StatusCode::NOT_FOUND,
322 serde_json::json!({
323 "error": format!("Service '{}' not found", service),
324 "code": "SERVICE_NOT_FOUND"
325 })
326 )
327 }
328 GatewayError::RabbitMeshError(err) => {
329 error!("Gateway error: {}", err);
330 (
331 StatusCode::INTERNAL_SERVER_ERROR,
332 serde_json::json!({
333 "error": "Internal gateway error",
334 "code": "GATEWAY_ERROR"
335 })
336 )
337 }
338 };
339
340 (status, Json(error_message)).into_response()
341 }
342}