//! opensearch-api 0.1.0
//!
//! High-performance REST API gateway for OpenSearch with security, observability, and multi-tenant support.
use axum::{
    extract::State,
    http::{Request, StatusCode},
    middleware::Next,
    response::Response,
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::{
    sync::Arc,
    time::{Duration, Instant},
};
use uuid::Uuid;

/// One per-request metric document, built by `metrics_middleware` and posted to OpenSearch.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MetricEvent {
    /// Time the event was recorded; serialized under the JSON key `@timestamp`.
    #[serde(rename = "@timestamp")]
    pub timestamp: DateTime<Utc>,
    /// Unique identifier of this event (the middleware generates a v4 UUID string).
    pub event_id: String,
    /// Event category; the middleware sets this to `"api_request"`.
    pub event_type: String,
    /// HTTP method of the request.
    pub method: String,
    /// Path component of the request URI.
    pub path: String,
    /// HTTP status code of the response.
    pub status_code: u16,
    /// Time spent handling the request, in milliseconds.
    pub duration_ms: u64,
    /// Client address taken from the `x-forwarded-for` or `x-real-ip` request header, if present.
    pub client_ip: Option<String>,
    /// Value of the `user-agent` request header, if present.
    pub user_agent: Option<String>,
    /// Short error description; the middleware fills it with `HTTP <code>` for statuses >= 400.
    pub error_message: Option<String>,
    /// Abbreviated form of the `x-api-key` request header, if present.
    pub api_key_id: Option<String>,
}

/// Periodic process/system sample posted to OpenSearch by `collect_system_metrics`.
#[derive(Debug, Serialize, Deserialize)]
pub struct SystemMetrics {
    /// Time the sample was taken; serialized under the JSON key `@timestamp`.
    #[serde(rename = "@timestamp")]
    pub timestamp: DateTime<Utc>,
    /// Sample category; `collect_system_metrics` sets this to `"system"`.
    pub metric_type: String,
    /// CPU usage as reported by `get_cpu_usage` (unit not established here — TODO confirm).
    pub cpu_usage: f32,
    /// Memory usage as reported by `get_memory_usage` (presumably megabytes, per the field name).
    pub memory_usage_mb: u64,
    /// Number of active connections as reported by `get_active_connections`.
    pub active_connections: u32,
    /// Request counter as reported by `get_total_requests`.
    pub total_requests: u64,
    /// Error rate as reported by `calculate_error_rate` (scale not established here — TODO confirm).
    pub error_rate: f32,
    /// Average response time as reported by `calculate_avg_response_time`, in milliseconds per the field name.
    pub avg_response_time_ms: f64,
}

/// Middleware that records one [`MetricEvent`] per request.
///
/// The request is timed from entry into this middleware until the inner
/// service returns a response. The resulting event is posted to OpenSearch
/// from a detached task, so metric delivery never delays or fails the
/// response.
///
/// # Errors
///
/// Never returns `Err`; the `Result` return type is kept so existing router
/// wiring continues to compile.
pub async fn metrics_middleware(
    State(state): State<Arc<crate::AppState>>,
    request: Request<axum::body::Body>,
    next: Next,
) -> Result<Response, StatusCode> {
    let start = Instant::now();
    let method = request.method().to_string();
    let path = request.uri().path().to_string();

    let headers = request.headers();

    // `x-forwarded-for` may carry a whole proxy chain ("client, proxy1, proxy2").
    // Only the first entry is the client address, and the index template maps
    // `client_ip` as an `ip` field, which cannot hold a comma-separated list.
    // NOTE(review): both headers are client-controlled unless a trusted proxy
    // overwrites them — confirm the deployment does so.
    let client_ip = header_text(headers, "x-forwarded-for")
        .and_then(first_forwarded_address)
        .or_else(|| {
            header_text(headers, "x-real-ip")
                .map(str::trim)
                .filter(|ip| !ip.is_empty())
        })
        .map(str::to_owned);

    let user_agent = header_text(headers, "user-agent").map(str::to_owned);

    // Only an abbreviated form of the key is stored, never the full secret.
    // NOTE(review): this still persists the first 8 characters of the key;
    // consider storing a hash or a server-side key id instead.
    let api_key_id = header_text(headers, "x-api-key")
        .map(|key| format!("key_{}...", key.get(..8).unwrap_or(key)));

    // Run the rest of the stack.
    let response = next.run(request).await;

    let duration = start.elapsed();
    let status_code = response.status().as_u16();

    let metric_event = MetricEvent {
        timestamp: Utc::now(),
        event_id: Uuid::new_v4().to_string(),
        event_type: "api_request".to_string(),
        method,
        path,
        status_code,
        // Saturate instead of silently truncating the u128 millisecond count.
        duration_ms: u64::try_from(duration.as_millis()).unwrap_or(u64::MAX),
        client_ip,
        user_agent,
        error_message: (status_code >= 400).then(|| format!("HTTP {}", status_code)),
        api_key_id,
    };

    // Fire-and-forget: a metrics outage must not affect request handling,
    // so the indexing result is intentionally discarded.
    tokio::spawn(async move {
        let _ = index_metric(&state, metric_event).await;
    });

    Ok(response)
}

/// Returns the named header as text, or `None` if it is absent or not valid visible ASCII.
fn header_text<'a>(headers: &'a axum::http::HeaderMap, name: &str) -> Option<&'a str> {
    headers.get(name).and_then(|value| value.to_str().ok())
}

/// Returns the first, trimmed, non-empty entry of a comma-separated `x-forwarded-for` value.
fn first_forwarded_address(raw: &str) -> Option<&str> {
    raw.split(',')
        .next()
        .map(str::trim)
        .filter(|entry| !entry.is_empty())
}

/// Indexes a single request metric into the monthly `opensearch-api-metrics-YYYY.MM` index.
///
/// The target index is derived from the event's own timestamp, so an event
/// recorded just before a month boundary still lands in the index matching
/// its `@timestamp`.
///
/// # Errors
///
/// Returns the `reqwest::Error` if the request cannot be sent, or if
/// OpenSearch answers with a 4xx/5xx status (for example a mapping
/// rejection). Previously such HTTP-level failures were reported as success.
async fn index_metric(
    state: &Arc<crate::AppState>,
    metric: MetricEvent,
) -> Result<(), reqwest::Error> {
    let index_name = format!(
        "opensearch-api-metrics-{}",
        metric.timestamp.format("%Y.%m")
    );
    let url = format!("{}/{}/_doc", state.config.opensearch_url, index_name);

    state
        .client
        .post(&url)
        .json(&metric)
        .send()
        .await?
        // `send` only fails on transport errors; turn HTTP error statuses into `Err` too.
        .error_for_status()?;

    Ok(())
}

/// Samples system metrics once per minute and posts each sample to OpenSearch.
///
/// Runs forever; intended to be spawned as a background task. Each sample is
/// written to the monthly `opensearch-api-system-YYYY.MM` index. Delivery is
/// best-effort: the outcome of the HTTP call is discarded.
pub async fn collect_system_metrics(state: Arc<crate::AppState>) {
    // One sample every 60 seconds.
    let mut ticker = tokio::time::interval(Duration::from_secs(60));

    loop {
        ticker.tick().await;

        // Take the sample.
        let snapshot = SystemMetrics {
            timestamp: Utc::now(),
            metric_type: "system".to_string(),
            cpu_usage: get_cpu_usage(),
            memory_usage_mb: get_memory_usage(),
            active_connections: get_active_connections(),
            total_requests: get_total_requests(),
            error_rate: calculate_error_rate(),
            avg_response_time_ms: calculate_avg_response_time(),
        };

        // Build the document endpoint for the current month's index.
        let endpoint = format!(
            "{}/opensearch-api-system-{}/_doc",
            state.config.opensearch_url,
            Utc::now().format("%Y.%m")
        );

        // Best-effort delivery: whatever the outcome, keep sampling.
        let outcome = state.client.post(&endpoint).json(&snapshot).send().await;
        drop(outcome);
    }
}

/// Handler returning aggregated request metrics for the last five minutes.
///
/// Queries every `opensearch-api-metrics-*` index for documents whose
/// `@timestamp` is within `now-5m` and reports the total hit count, the
/// average `duration_ms`, a per-status-code breakdown, and the ten most
/// requested paths.
///
/// # Errors
///
/// Returns `500 Internal Server Error` if the search request cannot be sent,
/// if OpenSearch answers with a non-success status, or if the response body
/// is not valid JSON.
pub async fn get_metrics(
    State(state): State<Arc<crate::AppState>>,
) -> Result<axum::Json<MetricsResponse>, StatusCode> {
    // Aggregation-only search (`size: 0`) over the last five minutes.
    let query = serde_json::json!({
        // Without this the reported hit total is capped (10,000 by default),
        // which would under-report `total_requests` on a busy gateway.
        "track_total_hits": true,
        "query": {
            "range": {
                "@timestamp": {
                    "gte": "now-5m"
                }
            }
        },
        "aggs": {
            "avg_duration": {
                "avg": {
                    "field": "duration_ms"
                }
            },
            "status_codes": {
                "terms": {
                    "field": "status_code"
                }
            },
            "top_endpoints": {
                "terms": {
                    "field": "path.keyword",
                    "size": 10
                }
            }
        },
        "size": 0
    });

    let url = format!(
        "{}/opensearch-api-metrics-*/_search",
        state.config.opensearch_url
    );

    let response = state
        .client
        .post(&url)
        .json(&query)
        .send()
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    if !response.status().is_success() {
        return Err(StatusCode::INTERNAL_SERVER_ERROR);
    }

    let body: serde_json::Value = response
        .json()
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    // Missing or null values (e.g. no matching documents) fall back to zero / empty.
    let aggregations = &body["aggregations"];

    let avg_duration = aggregations["avg_duration"]["value"]
        .as_f64()
        .unwrap_or(0.0);

    let total_requests = body["hits"]["total"]["value"].as_u64().unwrap_or(0);

    let status_codes: Vec<StatusCodeCount> = aggregation_buckets(&aggregations["status_codes"])
        .iter()
        .map(|bucket| StatusCodeCount {
            // Checked conversion: an out-of-range key becomes 0 instead of wrapping.
            status_code: bucket["key"]
                .as_u64()
                .and_then(|code| u16::try_from(code).ok())
                .unwrap_or(0),
            count: bucket["doc_count"].as_u64().unwrap_or(0),
        })
        .collect();

    let top_endpoints: Vec<EndpointCount> = aggregation_buckets(&aggregations["top_endpoints"])
        .iter()
        .map(|bucket| EndpointCount {
            endpoint: bucket["key"].as_str().unwrap_or("").to_string(),
            count: bucket["doc_count"].as_u64().unwrap_or(0),
        })
        .collect();

    Ok(axum::Json(MetricsResponse {
        period: "last_5_minutes".to_string(),
        total_requests,
        avg_duration_ms: avg_duration,
        status_codes,
        top_endpoints,
    }))
}

/// Returns the `buckets` array of a terms aggregation, or an empty slice if it is missing.
fn aggregation_buckets(aggregation: &serde_json::Value) -> &[serde_json::Value] {
    aggregation["buckets"]
        .as_array()
        .map(Vec::as_slice)
        .unwrap_or(&[])
}

/// JSON payload returned by the `get_metrics` handler.
#[derive(Serialize)]
pub struct MetricsResponse {
    /// Label of the reporting window; `get_metrics` sets this to `"last_5_minutes"`.
    pub period: String,
    /// Number of matching request documents, read from the search response's `hits.total.value`.
    pub total_requests: u64,
    /// Average of `duration_ms` over the matching documents (0.0 when unavailable).
    pub avg_duration_ms: f64,
    /// Request counts grouped by HTTP status code.
    pub status_codes: Vec<StatusCodeCount>,
    /// Request counts for the most frequently requested paths.
    pub top_endpoints: Vec<EndpointCount>,
}

/// Number of requests observed for one HTTP status code.
#[derive(Serialize)]
pub struct StatusCodeCount {
    /// HTTP status code (the aggregation bucket key).
    pub status_code: u16,
    /// Number of documents in the bucket.
    pub count: u64,
}

/// Number of requests observed for one request path.
#[derive(Serialize)]
pub struct EndpointCount {
    /// Request path (the aggregation bucket key).
    pub endpoint: String,
    /// Number of documents in the bucket.
    pub count: u64,
}

// Helper functions (simplified placeholders — production code should use appropriate crates)

/// Returns the current CPU usage.
///
/// Placeholder: always reports `0.0`. In production, use the `sysinfo` crate.
fn get_cpu_usage() -> f32 {
    const UNMEASURED: f32 = 0.0;
    UNMEASURED
}

/// Returns the current memory usage.
///
/// Placeholder: always reports `0`. In production, use the `sysinfo` crate.
fn get_memory_usage() -> u64 {
    const UNMEASURED: u64 = 0;
    UNMEASURED
}

/// Returns the number of active connections.
///
/// Placeholder: always reports `0` until an active-connection counter is implemented.
fn get_active_connections() -> u32 {
    const UNTRACKED: u32 = 0;
    UNTRACKED
}

/// Returns the total number of requests.
///
/// Placeholder: always reports `0` until a total-request counter is implemented.
fn get_total_requests() -> u64 {
    const UNTRACKED: u64 = 0;
    UNTRACKED
}

/// Returns the error rate of recent requests.
///
/// Placeholder: always reports `0.0` until the calculation is implemented.
fn calculate_error_rate() -> f32 {
    const UNCOMPUTED: f32 = 0.0;
    UNCOMPUTED
}

/// Returns the average response time.
///
/// Placeholder: always reports `0.0` until the calculation is implemented.
fn calculate_avg_response_time() -> f64 {
    const UNCOMPUTED: f64 = 0.0;
    UNCOMPUTED
}

/// Creates (or updates) the index template shared by the request-metric and
/// system-metric indices.
///
/// The template applies to `opensearch-api-metrics-*` and
/// `opensearch-api-system-*` and is stored under the name
/// `opensearch-api-metrics` via the composable `_index_template` API.
///
/// # Errors
///
/// Returns the `reqwest::Error` if the request cannot be sent, or if
/// OpenSearch answers with a 4xx/5xx status (for example because the template
/// body is rejected). Previously HTTP-level failures were reported as success.
pub async fn setup_metrics_indices(state: &Arc<crate::AppState>) -> Result<(), reqwest::Error> {
    // The composable `_index_template` API expects settings and mappings to be
    // nested under a `template` object; top-level `settings` / `mappings`
    // belong to the legacy `_template` API and are rejected here.
    let template = serde_json::json!({
        "index_patterns": ["opensearch-api-metrics-*", "opensearch-api-system-*"],
        "template": {
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0,
                "index.refresh_interval": "5s"
            },
            "mappings": {
                "properties": {
                    "@timestamp": {
                        "type": "date"
                    },
                    "event_id": {
                        "type": "keyword"
                    },
                    "event_type": {
                        "type": "keyword"
                    },
                    "method": {
                        "type": "keyword"
                    },
                    "path": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword"
                            }
                        }
                    },
                    "status_code": {
                        "type": "integer"
                    },
                    "duration_ms": {
                        "type": "long"
                    },
                    "client_ip": {
                        "type": "ip",
                        // The value comes from request headers; a non-IP value
                        // must not cause the whole metric document to be rejected.
                        "ignore_malformed": true
                    },
                    "user_agent": {
                        "type": "text"
                    },
                    "error_message": {
                        "type": "text"
                    },
                    "api_key_id": {
                        "type": "keyword"
                    },
                    "cpu_usage": {
                        "type": "float"
                    },
                    "memory_usage_mb": {
                        "type": "long"
                    },
                    "active_connections": {
                        "type": "integer"
                    },
                    "total_requests": {
                        "type": "long"
                    },
                    "error_rate": {
                        "type": "float"
                    },
                    "avg_response_time_ms": {
                        "type": "float"
                    }
                }
            }
        }
    });

    // Create or replace the template.
    let url = format!(
        "{}/_index_template/opensearch-api-metrics",
        state.config.opensearch_url
    );
    state
        .client
        .put(&url)
        .json(&template)
        .send()
        .await?
        // `send` only fails on transport errors; surface HTTP error statuses as well.
        .error_for_status()?;

    Ok(())
}