pg-api 0.2.0

A high-performance PostgreSQL REST API driver with rate limiting, connection pooling, and observability
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{info, error, debug, warn};

use crate::models::AppState;
use crate::queue::{QueuedRequest, QueryResult};

/// Inicia os workers que processam a fila global
pub async fn start_workers(
    state: Arc<AppState>,
    rx: mpsc::Receiver<QueuedRequest>,
    num_workers: usize,
) {
    info!("Starting {} workers for global queue", num_workers);
    
    // Compartilha o receiver entre workers usando Arc<Mutex<>>
    let rx = Arc::new(tokio::sync::Mutex::new(rx));
    
    for worker_id in 0..num_workers {
        let state = state.clone();
        let rx = rx.clone();
        
        tokio::spawn(async move {
            info!("Worker {} started", worker_id);
            
            loop {
                let request = {
                    let mut rx_guard = rx.lock().await;
                    rx_guard.recv().await
                };
                
                match request {
                    Some(request) => {
                        let wait_time = request.enqueue_time.elapsed();
                        let wait_time_ms = wait_time.as_millis() as u64;
                        
                        debug!(
                            "Worker {} processing request {} (waited {}ms)",
                            worker_id, request.id, wait_time_ms
                        );
                        
                        // Processa a requisição
                        let result = process_request(&state, request.request).await;
                        
                        // Envia resultado de volta
                        if request.response_tx.send(result).is_err() {
                            warn!("Worker {}: Client disconnected before receiving response", worker_id);
                        }
                        
                        // Atualiza estatísticas
                        state.queue_stats.record_process(wait_time_ms);
                    }
                    None => {
                        info!("Worker {}: Channel closed, stopping", worker_id);
                        break;
                    }
                }
            }
        });
    }
}

/// Processa uma requisição enfileirada
async fn process_request(state: &AppState, _request: axum::extract::Request) -> QueryResult {
    // Obtém conexão do pool (pode aguardar se necessário)
    let start = std::time::Instant::now();
    
    match state.global_pool.get_connection().await {
        Ok(_conn) => {
            // TODO: Aqui executaríamos a query real
            // Por enquanto, simula sucesso
            let elapsed = start.elapsed().as_millis() as u64;
            
            QueryResult {
                success: true,
                data: Some(serde_json::json!({"processed": true})),
                error: None,
                execution_time_ms: elapsed,
            }
        }
        Err(e) => {
            error!("Failed to get connection from pool: {}", e);
            QueryResult {
                success: false,
                data: None,
                error: Some(format!("Pool error: {}", e)),
                execution_time_ms: 0,
            }
        }
    }
}