rabbitmesh_gateway/
router.rs

1use axum::{
2    extract::{Path, Query, State},
3    http::{StatusCode},
4    response::{Json, Response},
5    routing::{get, post, put, delete},
6    Router,
7};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::time::Duration;
12use tracing::{debug, error, info, warn};
13
14use rabbitmesh::{ServiceClient, RabbitMeshError};
15use crate::registry::ServiceRegistry;
16
17/// Gateway state containing the RabbitMQ client and service registry
18#[derive(Debug, Clone)]
19pub struct GatewayState {
20    /// Client for calling microservices via RabbitMQ
21    pub service_client: Arc<ServiceClient>,
22    /// Registry of available services and their methods
23    pub service_registry: Arc<ServiceRegistry>,
24}
25
26/// Create the auto-router that handles all REST API calls
27/// 
28/// ## How it works:
29/// 1. Frontend sends HTTP request to gateway
30/// 2. Gateway parses the request and extracts service/method  
31/// 3. Gateway calls microservice via RabbitMQ (NO PORTS!)
32/// 4. Microservice processes request and responds via RabbitMQ
33/// 5. Gateway returns HTTP response to frontend
34/// 
35/// Example flow:
36/// ```
37/// Frontend -> GET /api/v1/user-service/users/123
38///          -> Gateway extracts: service="user-service", method="get_user", params={id: 123}
39///          -> Gateway calls service via RabbitMQ
40///          -> user-service processes request 
41///          -> user-service responds via RabbitMQ
42///          -> Gateway returns JSON to frontend
43/// ```
44pub async fn create_auto_router(amqp_url: impl Into<String>) -> Result<Router, RabbitMeshError> {
45    info!("🌐 Creating auto-router gateway");
46    
47    // Create service client that talks to microservices via RabbitMQ
48    let service_client = Arc::new(
49        ServiceClient::new("api-gateway", amqp_url).await?
50    );
51    
52    let service_registry = Arc::new(ServiceRegistry::new());
53    
54    let state = GatewayState {
55        service_client,
56        service_registry,
57    };
58
59    let router = Router::new()
60        // Health check endpoint
61        .route("/health", get(health_check))
62        .route("/health/:service", get(service_health_check))
63        
64        // Auto-generated REST API routes
65        // Pattern: /api/v1/{service}/{method}/{params...}
66        .route("/api/v1/:service/:method", post(handle_rpc_call))
67        .route("/api/v1/:service/:method", get(handle_rpc_call))
68        .route("/api/v1/:service/:method", put(handle_rpc_call))
69        .route("/api/v1/:service/:method", delete(handle_rpc_call))
70        
71        // Parameterized routes: /api/v1/user-service/users/123
72        .route("/api/v1/:service/:method/:param", get(handle_rpc_call_with_param))
73        .route("/api/v1/:service/:method/:param", post(handle_rpc_call_with_param))
74        .route("/api/v1/:service/:method/:param", put(handle_rpc_call_with_param))
75        .route("/api/v1/:service/:method/:param", delete(handle_rpc_call_with_param))
76        
77        // Service registry endpoints
78        .route("/registry/services", get(list_services))
79        .route("/registry/services/:service", get(describe_service))
80        
81        .with_state(state);
82
83    info!("✅ Auto-router created, ready to proxy HTTP -> RabbitMQ -> Microservices");
84    Ok(router)
85}
86
87/// Handle RPC call without path parameters
88/// 
89/// Examples:
90/// - POST /api/v1/user-service/create_user  
91/// - GET /api/v1/product-service/list_products
92async fn handle_rpc_call(
93    State(state): State<GatewayState>,
94    Path((service, method)): Path<(String, String)>,
95    Query(query_params): Query<HashMap<String, String>>,
96    body: Option<Json<Value>>,
97) -> Result<Json<Value>, GatewayError> {
98    debug!("🔄 RPC call: {}.{}", service, method);
99    
100    // Prepare parameters from query params and body
101    let params = prepare_params(query_params, body);
102    
103    // In a real implementation, you'd extract headers here
104    let params_with_meta = params;
105    
106    // Call microservice via RabbitMQ (THIS IS THE MAGIC!)
107    let response = state
108        .service_client
109        .call_with_timeout(&service, &method, params_with_meta, Duration::from_secs(30))
110        .await?;
111    
112    // Convert RPC response to HTTP response
113    match response {
114        rabbitmesh::message::RpcResponse::Success { data, processing_time_ms } => {
115            debug!("✅ RPC success: {}.{} ({}ms)", service, method, processing_time_ms);
116            Ok(Json(data))
117        }
118        rabbitmesh::message::RpcResponse::Error { error, code, details } => {
119            warn!("❌ RPC error: {}.{} - {}", service, method, error);
120            Err(GatewayError::ServiceError {
121                service: service.clone(),
122                method: method.clone(),
123                error,
124                code,
125                details,
126            })
127        }
128    }
129}
130
131/// Handle RPC call with single path parameter
132/// 
133/// Examples:
134/// - GET /api/v1/user-service/get_user/123 -> get_user(user_id: 123)
135/// - DELETE /api/v1/order-service/cancel_order/abc-456 -> cancel_order(order_id: "abc-456")
136async fn handle_rpc_call_with_param(
137    State(state): State<GatewayState>,
138    Path((service, method, param)): Path<(String, String, String)>,
139    Query(query_params): Query<HashMap<String, String>>,
140    body: Option<Json<Value>>,
141) -> Result<Json<Value>, GatewayError> {
142    debug!("🔄 RPC call with param: {}.{}({})", service, method, param);
143    
144    // Add path parameter to query params
145    let mut all_params = query_params;
146    
147    // Try to parse param as different types
148    let param_value = if let Ok(num) = param.parse::<i64>() {
149        Value::Number(serde_json::Number::from(num))
150    } else if let Ok(num) = param.parse::<f64>() {
151        Value::Number(serde_json::Number::from_f64(num).unwrap_or_else(|| serde_json::Number::from(0)))
152    } else if param == "true" || param == "false" {
153        Value::Bool(param == "true")
154    } else {
155        Value::String(param)
156    };
157    
158    // Common parameter names based on method patterns
159    let param_key = match method.as_str() {
160        m if m.contains("user") => "user_id",
161        m if m.contains("order") => "order_id", 
162        m if m.contains("product") => "product_id",
163        _ => "id", // Default fallback
164    };
165    
166    all_params.insert(param_key.to_string(), param_value.to_string());
167    
168    let params = prepare_params(all_params, body);
169    let params_with_meta = params;
170    
171    // Call microservice via RabbitMQ
172    let response = state
173        .service_client
174        .call_with_timeout(&service, &method, params_with_meta, Duration::from_secs(30))
175        .await?;
176    
177    match response {
178        rabbitmesh::message::RpcResponse::Success { data, processing_time_ms } => {
179            debug!("✅ RPC success: {}.{} ({}ms)", service, method, processing_time_ms);
180            Ok(Json(data))
181        }
182        rabbitmesh::message::RpcResponse::Error { error, code, details } => {
183            warn!("❌ RPC error: {}.{} - {}", service, method, error);
184            Err(GatewayError::ServiceError {
185                service,
186                method,
187                error,
188                code,
189                details,
190            })
191        }
192    }
193}
194
195/// Prepare parameters from query params and request body
196fn prepare_params(
197    query_params: HashMap<String, String>,
198    body: Option<Json<Value>>,
199) -> Value {
200    let mut params = serde_json::Map::new();
201    
202    // Add query parameters
203    for (key, value) in query_params {
204        // Try to parse value as JSON, fallback to string
205        let parsed_value = serde_json::from_str(&value)
206            .unwrap_or_else(|_| Value::String(value));
207        params.insert(key, parsed_value);
208    }
209    
210    // Add body parameters
211    if let Some(Json(body_value)) = body {
212        if let Value::Object(body_map) = body_value {
213            for (key, value) in body_map {
214                params.insert(key, value);
215            }
216        }
217    }
218    
219    Value::Object(params)
220}
221
222
223/// Health check for the gateway itself
224async fn health_check(State(state): State<GatewayState>) -> Result<Json<Value>, GatewayError> {
225    let is_healthy = state.service_client.is_healthy().await;
226    let stats = state.service_client.get_stats().await;
227    
228    Ok(Json(serde_json::json!({
229        "status": if is_healthy { "healthy" } else { "unhealthy" },
230        "gateway": "rabbitmesh-gateway",
231        "version": env!("CARGO_PKG_VERSION"),
232        "connection": stats.connection_stats,
233        "rpc": stats.rpc_stats
234    })))
235}
236
237/// Health check for specific service
238async fn service_health_check(
239    State(state): State<GatewayState>,
240    Path(service): Path<String>,
241) -> Result<Json<Value>, GatewayError> {
242    // Try to ping the service
243    match state.service_client.call_with_timeout(
244        &service,
245        "ping",
246        serde_json::json!({}),
247        Duration::from_secs(5)
248    ).await {
249        Ok(_) => Ok(Json(serde_json::json!({
250            "service": service,
251            "status": "healthy"
252        }))),
253        Err(_) => Ok(Json(serde_json::json!({
254            "service": service,
255            "status": "unhealthy"
256        })))
257    }
258}
259
260/// List all registered services
261async fn list_services(State(state): State<GatewayState>) -> Json<Value> {
262    let services = state.service_registry.list_services().await;
263    Json(serde_json::json!({ "services": services }))
264}
265
266/// Describe a specific service and its methods
267async fn describe_service(
268    State(state): State<GatewayState>,
269    Path(service): Path<String>,
270) -> Result<Json<Value>, GatewayError> {
271    match state.service_registry.get_service(&service).await {
272        Some(service_info) => Ok(Json(serde_json::json!(service_info))),
273        None => Err(GatewayError::ServiceNotFound(service)),
274    }
275}
276
277/// Gateway-specific errors
278#[derive(Debug)]
279pub enum GatewayError {
280    ServiceError {
281        service: String,
282        method: String,
283        error: String,
284        code: Option<String>,
285        details: Option<Value>,
286    },
287    ServiceNotFound(String),
288    RabbitMeshError(RabbitMeshError),
289}
290
291impl From<RabbitMeshError> for GatewayError {
292    fn from(err: RabbitMeshError) -> Self {
293        Self::RabbitMeshError(err)
294    }
295}
296
297impl axum::response::IntoResponse for GatewayError {
298    fn into_response(self) -> Response {
299        let (status, error_message) = match self {
300            GatewayError::ServiceError { service, method, error, code, details } => {
301                let status = match code.as_deref() {
302                    Some("NOT_FOUND") => StatusCode::NOT_FOUND,
303                    Some("UNAUTHORIZED") => StatusCode::UNAUTHORIZED,
304                    Some("FORBIDDEN") => StatusCode::FORBIDDEN,
305                    Some("BAD_REQUEST") => StatusCode::BAD_REQUEST,
306                    _ => StatusCode::INTERNAL_SERVER_ERROR,
307                };
308                
309                let body = serde_json::json!({
310                    "error": error,
311                    "service": service,
312                    "method": method,
313                    "code": code,
314                    "details": details
315                });
316                
317                (status, body)
318            }
319            GatewayError::ServiceNotFound(service) => {
320                (
321                    StatusCode::NOT_FOUND,
322                    serde_json::json!({
323                        "error": format!("Service '{}' not found", service),
324                        "code": "SERVICE_NOT_FOUND"
325                    })
326                )
327            }
328            GatewayError::RabbitMeshError(err) => {
329                error!("Gateway error: {}", err);
330                (
331                    StatusCode::INTERNAL_SERVER_ERROR,
332                    serde_json::json!({
333                        "error": "Internal gateway error",
334                        "code": "GATEWAY_ERROR"
335                    })
336                )
337            }
338        };
339
340        (status, Json(error_message)).into_response()
341    }
342}