pg-api 0.2.0

A high-performance PostgreSQL REST API driver with rate limiting, connection pooling, and observability
use axum::{
    extract::{Request, State},
    http::StatusCode,
    middleware::Next,
    response::{IntoResponse, Response},
    Json,
};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::timeout;
use tracing::{debug, warn};
use uuid::Uuid;

use crate::models::AppState;
use crate::queue::{QueuedRequest, QueryResult, RequestPriority};

/// Middleware que gerencia a fila global de requisições
pub async fn queue_middleware(
    State(state): State<Arc<AppState>>,
    request: Request,
    next: Next,
) -> Result<Response, StatusCode> {
    let path = request.uri().path().to_string();
    
    // Health check não entra na fila
    if path == "/health" || path == "/" {
        return Ok(next.run(request).await);
    }
    
    // Verifica se fila está habilitada
    if !state.global_pool.config().enable_queue {
        // Fila desabilitada, tenta executar imediatamente
        return Ok(next.run(request).await);
    }
    
    // Tenta obter conexão do pool sem bloquear
    match state.global_pool.try_get_connection().await {
        Some(_conn) => {
            // ✅ Conexão disponível, executa imediatamente
            debug!("Connection available, executing immediately: {}", path);
            let response = next.run(request).await;
            // Conexão é liberada automaticamente quando _conn sai de escopo
            Ok(response)
        }
        None => {
            // ❌ Pool esgotado, enfileira
            debug!("Pool exhausted, enqueueing request: {}", path);
            enqueue_and_wait(state, request, path).await
        }
    }
}

/// Enfileira a requisição e aguarda resultado
async fn enqueue_and_wait(
    state: Arc<AppState>,
    request: Request,
    path: String,
) -> Result<Response, StatusCode> {
    // Cria channel para resposta async
    let (tx, rx) = tokio::sync::oneshot::channel();
    
    // Determina prioridade
    let priority = RequestPriority::from_path(&path);
    
    // Cria job na fila
    let queued = QueuedRequest {
        id: Uuid::new_v4(),
        request,
        response_tx: tx,
        enqueue_time: std::time::Instant::now(),
        priority,
    };
    
    // Envia para fila global
    if let Some(ref queue_tx) = state.job_queue {
        if let Err(e) = queue_tx.send(queued).await {
            warn!("Failed to enqueue request: {}", e);
            return Err(StatusCode::SERVICE_UNAVAILABLE);
        }
        
        // Atualiza estatísticas
        state.queue_stats.record_enqueue();
        
        // Aguarda resultado com timeout
        let queue_timeout_ms = state.global_pool.config().queue_timeout_ms;
        
        match timeout(Duration::from_millis(queue_timeout_ms), rx).await {
            Ok(Ok(result)) => {
                // ✅ Sucesso
                Ok(build_response(result))
            }
            Ok(Err(_)) => {
                // ❌ Worker falhou
                Err(StatusCode::INTERNAL_SERVER_ERROR)
            }
            Err(_) => {
                // ⏱️ Timeout
                state.queue_stats.record_timeout();
                Err(StatusCode::GATEWAY_TIMEOUT)
            }
        }
    } else {
        // Fila não inicializada
        warn!("Job queue not initialized");
        Err(StatusCode::SERVICE_UNAVAILABLE)
    }
}

/// Constrói a resposta HTTP a partir do resultado
fn build_response(result: QueryResult) -> Response {
    let status = if result.success {
        StatusCode::OK
    } else {
        StatusCode::INTERNAL_SERVER_ERROR
    };
    
    let body = Json(serde_json::json!({
        "success": result.success,
        "data": result.data,
        "error": result.error,
        "execution_time_ms": result.execution_time_ms,
    }));
    
    (status, body).into_response()
}