use axum::{
extract::{Request, State},
http::StatusCode,
middleware::Next,
response::{IntoResponse, Response},
Json,
};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::timeout;
use tracing::{debug, warn};
use uuid::Uuid;
use crate::models::AppState;
use crate::queue::{QueuedRequest, QueryResult, RequestPriority};
/// Middleware that either forwards a request straight to the next handler or
/// hands it to the job queue when no pool connection can be obtained.
///
/// The request is forwarded without touching the pool when its path is
/// exactly `/health` or `/`, or when `enable_queue` is false in the global
/// pool configuration. Otherwise `try_get_connection` is consulted: on
/// `Some`, the downstream handler runs while the returned handle stays bound;
/// on `None`, the request is passed to [`enqueue_and_wait`].
///
/// # Errors
///
/// This function itself never produces an error status; any `Err` comes from
/// [`enqueue_and_wait`].
pub async fn queue_middleware(
    State(state): State<Arc<AppState>>,
    request: Request,
    next: Next,
) -> Result<Response, StatusCode> {
    let path = request.uri().path().to_string();

    // `||` short-circuits, so the pool configuration is only read for paths
    // that are not unconditionally bypassed.
    let bypass_path = matches!(path.as_str(), "/health" | "/");
    if bypass_path || !state.global_pool.config().enable_queue {
        return Ok(next.run(request).await);
    }

    // A named binding (rather than `_`) keeps the handle alive until the
    // downstream handler has finished.
    if let Some(_held_conn) = state.global_pool.try_get_connection().await {
        debug!("Connection available, executing immediately: {}", path);
        return Ok(next.run(request).await);
    }

    debug!("Pool exhausted, enqueueing request: {}", path);
    enqueue_and_wait(state, request, path).await
}
/// Wraps `request` in a [`QueuedRequest`], sends it to the job queue and waits
/// for the result on a oneshot channel, bounded by `queue_timeout_ms` from the
/// global pool configuration.
///
/// The priority is derived from `path` via `RequestPriority::from_path`.
///
/// # Errors
///
/// * `503 Service Unavailable` - `state.job_queue` is `None`, or sending the
///   request to the queue failed.
/// * `500 Internal Server Error` - the response channel's sender was dropped
///   before a result was delivered.
/// * `504 Gateway Timeout` - no result arrived within the configured timeout
///   (also recorded in `queue_stats`).
async fn enqueue_and_wait(
    state: Arc<AppState>,
    request: Request,
    path: String,
) -> Result<Response, StatusCode> {
    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
    let priority = RequestPriority::from_path(&path);
    let job = QueuedRequest {
        id: Uuid::new_v4(),
        request,
        response_tx,
        enqueue_time: std::time::Instant::now(),
        priority,
    };

    let Some(ref sender) = state.job_queue else {
        warn!("Job queue not initialized");
        return Err(StatusCode::SERVICE_UNAVAILABLE);
    };

    if let Err(e) = sender.send(job).await {
        warn!("Failed to enqueue request: {}", e);
        return Err(StatusCode::SERVICE_UNAVAILABLE);
    }
    // Only counted once the queue has actually accepted the request.
    state.queue_stats.record_enqueue();

    let wait_limit = Duration::from_millis(state.global_pool.config().queue_timeout_ms);
    match timeout(wait_limit, response_rx).await {
        // Timer fired before anything arrived on the channel.
        Err(_) => {
            state.queue_stats.record_timeout();
            Err(StatusCode::GATEWAY_TIMEOUT)
        }
        // Channel closed without a value being sent.
        Ok(Err(_)) => Err(StatusCode::INTERNAL_SERVER_ERROR),
        Ok(Ok(result)) => Ok(build_response(result)),
    }
}
/// Converts a [`QueryResult`] into an HTTP response with a JSON body.
///
/// The status is `200 OK` when `result.success` is true and
/// `500 Internal Server Error` otherwise. The body carries the `success`,
/// `data`, `error` and `execution_time_ms` fields of the result.
fn build_response(result: QueryResult) -> Response {
    let payload = serde_json::json!({
        "success": result.success,
        "data": result.data,
        "error": result.error,
        "execution_time_ms": result.execution_time_ms,
    });

    let status = match result.success {
        true => StatusCode::OK,
        false => StatusCode::INTERNAL_SERVER_ERROR,
    };

    (status, Json(payload)).into_response()
}