1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{info, error, debug, warn};
use crate::models::AppState;
use crate::queue::{QueuedRequest, QueryResult};
/// Inicia os workers que processam a fila global
pub async fn start_workers(
state: Arc<AppState>,
rx: mpsc::Receiver<QueuedRequest>,
num_workers: usize,
) {
info!("Starting {} workers for global queue", num_workers);
// Compartilha o receiver entre workers usando Arc<Mutex<>>
let rx = Arc::new(tokio::sync::Mutex::new(rx));
for worker_id in 0..num_workers {
let state = state.clone();
let rx = rx.clone();
tokio::spawn(async move {
info!("Worker {} started", worker_id);
loop {
let request = {
let mut rx_guard = rx.lock().await;
rx_guard.recv().await
};
match request {
Some(request) => {
let wait_time = request.enqueue_time.elapsed();
let wait_time_ms = wait_time.as_millis() as u64;
debug!(
"Worker {} processing request {} (waited {}ms)",
worker_id, request.id, wait_time_ms
);
// Processa a requisição
let result = process_request(&state, request.request).await;
// Envia resultado de volta
if request.response_tx.send(result).is_err() {
warn!("Worker {}: Client disconnected before receiving response", worker_id);
}
// Atualiza estatísticas
state.queue_stats.record_process(wait_time_ms);
}
None => {
info!("Worker {}: Channel closed, stopping", worker_id);
break;
}
}
}
});
}
}
/// Processa uma requisição enfileirada
async fn process_request(state: &AppState, _request: axum::extract::Request) -> QueryResult {
// Obtém conexão do pool (pode aguardar se necessário)
let start = std::time::Instant::now();
match state.global_pool.get_connection().await {
Ok(_conn) => {
// TODO: Aqui executaríamos a query real
// Por enquanto, simula sucesso
let elapsed = start.elapsed().as_millis() as u64;
QueryResult {
success: true,
data: Some(serde_json::json!({"processed": true})),
error: None,
execution_time_ms: elapsed,
}
}
Err(e) => {
error!("Failed to get connection from pool: {}", e);
QueryResult {
success: false,
data: None,
error: Some(format!("Pool error: {}", e)),
execution_time_ms: 0,
}
}
}
}