use axum::extract::Request;
use serde_json::Value;
use std::time::Instant;
use tokio::sync::oneshot;
use uuid::Uuid;
/// A single HTTP request bundled with the bookkeeping carried alongside it.
///
/// The struct owns the request and the sending half of a one-shot channel
/// typed to carry a [`QueryResult`].
// NOTE(review): presumably the receiving half is held by the handler awaiting
// the outcome — confirm against the code that constructs this value.
#[derive(Debug)]
pub struct QueuedRequest {
/// Identifier for this entry.
pub id: Uuid,
/// The owned axum request.
pub request: Request,
/// One-shot sender for the [`QueryResult`]; sending consumes it, so at most
/// one result can be delivered.
pub response_tx: oneshot::Sender<QueryResult>,
/// Monotonic timestamp for this entry.
// NOTE(review): presumably captured at enqueue time and used to derive the
// queue wait time — confirm at the construction site.
pub enqueue_time: Instant,
/// Priority class assigned to the request.
pub priority: RequestPriority,
}
/// Priority class of a queued request.
///
/// Variants are declared from most to least urgent and carry explicit
/// discriminants, so the derived `Ord` places `Critical` first and
/// `Background` last.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RequestPriority {
    /// Discriminant 0; compares lowest.
    Critical = 0,
    /// Discriminant 1.
    High = 1,
    /// Discriminant 2; the default class chosen by [`RequestPriority::from_path`].
    Normal = 2,
    /// Discriminant 3; never produced by [`RequestPriority::from_path`].
    Low = 3,
    /// Discriminant 4; never produced by [`RequestPriority::from_path`].
    Background = 4,
}

impl RequestPriority {
    /// Maps a request path to its priority class.
    ///
    /// The comparison is an exact, case-sensitive string match:
    ///
    /// * `"/health"` yields [`RequestPriority::Critical`]
    /// * `"/v1/transaction"` yields [`RequestPriority::High`]
    /// * anything else, including `"/v1/query"` and `"/v1/batch"`, yields
    ///   [`RequestPriority::Normal`]
    pub fn from_path(path: &str) -> Self {
        match path {
            "/health" => Self::Critical,
            "/v1/transaction" => Self::High,
            // `/v1/query`, `/v1/batch` and every unrecognised path share the
            // default class.
            _ => Self::Normal,
        }
    }
}
/// Outcome of a query, as carried over the response channel.
///
/// The derived `Default` produces `success: false`, no data, no error and an
/// execution time of zero.
#[derive(Debug, Clone, Default)]
pub struct QueryResult {
    /// Whether the query succeeded.
    pub success: bool,
    /// JSON payload, if any.
    pub data: Option<Value>,
    /// Error message, if any.
    pub error: Option<String>,
    /// Execution time in milliseconds.
    pub execution_time_ms: u64,
}
/// Lock-free counters describing queue activity.
///
/// Every field is an atomic, so the recording methods take `&self` and can be
/// called through a shared reference. All operations use `Relaxed` ordering:
/// each counter is individually consistent, but no ordering is established
/// between different counters.
#[derive(Debug, Default)]
pub struct QueueStats {
    /// Incremented once per [`QueueStats::record_enqueue`] call.
    pub total_enqueued: std::sync::atomic::AtomicU64,
    /// Incremented once per [`QueueStats::record_process`] call.
    pub total_processed: std::sync::atomic::AtomicU64,
    /// Incremented once per [`QueueStats::record_timeout`] call.
    pub total_timeout: std::sync::atomic::AtomicU64,
    /// Enqueues minus processed/timed-out requests.
    ///
    /// The decrement wraps around on underflow, so the value is transiently
    /// huge if a removal is recorded before its matching enqueue; it becomes
    /// correct again once the enqueue is recorded.
    pub current_size: std::sync::atomic::AtomicU64,
    /// Exponentially weighted moving average of the wait time in
    /// milliseconds (9 parts previous average, 1 part new sample, integer
    /// division). Starts at zero.
    pub avg_wait_time_ms: std::sync::atomic::AtomicU64,
}

impl QueueStats {
    /// Records that one request entered the queue.
    pub fn record_enqueue(&self) {
        self.total_enqueued
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        self.current_size
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }

    /// Records that one request left the queue and was processed after
    /// waiting `wait_time_ms` milliseconds.
    ///
    /// The moving average is updated as `(avg * 9 + wait_time_ms) / 10`.
    /// The update is a single atomic read-modify-write, so concurrent callers
    /// cannot overwrite each other's samples, and the arithmetic is done in
    /// `u128`, so arbitrarily large wait times cannot overflow.
    pub fn record_process(&self, wait_time_ms: u64) {
        self.total_processed
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        self.current_size
            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);

        // The closure always returns `Some`, so `fetch_update` cannot fail;
        // the returned previous value is not needed.
        let _ = self.avg_wait_time_ms.fetch_update(
            std::sync::atomic::Ordering::Relaxed,
            std::sync::atomic::Ordering::Relaxed,
            |current_avg| {
                let blended = (u128::from(current_avg) * 9 + u128::from(wait_time_ms)) / 10;
                // A 9:1 weighted mean never exceeds the larger input, so the
                // conversion always succeeds; the fallback only avoids a
                // panic path.
                Some(u64::try_from(blended).unwrap_or(u64::MAX))
            },
        );
    }

    /// Records that one request left the queue because it timed out.
    pub fn record_timeout(&self) {
        self.total_timeout
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        self.current_size
            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
    }
}