use crate::{
message::{Message, Priority, QueueConfig},
operation_timing::OperationTimingStore,
storage::Storage,
};
use anyhow::Result;
use async_trait::async_trait;
use axum::{
extract::{ws::WebSocketUpgrade, Path, Query, State},
http::{Request, StatusCode},
middleware::Next,
response::{IntoResponse, Response},
routing::get_service,
Json, Router,
};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tower_http::services::{ServeDir, ServeFile};
/// One sample of a queue's depth and throughput, as kept by `QueueMetricsStore`.
///
/// Serialized unchanged into the `points` array of the queue-metrics endpoint.
#[derive(Debug, Clone, Serialize)]
struct QueueMetricPoint {
/// Sample time as a Unix timestamp in milliseconds.
t_ms: i64,
/// Message count reported by storage for the queue at sample time.
total: usize,
/// Locked-message count reported by storage for the queue at sample time.
locked: usize,
/// Publish rate in events per second (smoothed once stored by `push_sample`).
publish_rate_per_sec: f64,
/// Ack rate in events per second (smoothed once stored by `push_sample`).
ack_rate_per_sec: f64,
/// Nack rate in events per second (smoothed once stored by `push_sample`).
nack_rate_per_sec: f64,
}
#[derive(Debug, Default)]
struct QueueMetricsStore {
history: std::sync::Mutex<HashMap<String, VecDeque<QueueMetricPoint>>>,
}
impl QueueMetricsStore {
const WINDOW_SECONDS: usize = 10;
const EMA_ALPHA: f64 = 1.0 / 5.0;
fn push_sample(&self, queue: &str, point: QueueMetricPoint) {
let mut history = self.history.lock().expect("queue metrics mutex poisoned");
let buf = history.entry(queue.to_string()).or_default();
let smoothed_point = if let Some(prev) = buf.back() {
let a = Self::EMA_ALPHA;
QueueMetricPoint {
publish_rate_per_sec: a * point.publish_rate_per_sec
+ (1.0 - a) * prev.publish_rate_per_sec,
ack_rate_per_sec: a * point.ack_rate_per_sec + (1.0 - a) * prev.ack_rate_per_sec,
nack_rate_per_sec: a * point.nack_rate_per_sec + (1.0 - a) * prev.nack_rate_per_sec,
..point
}
} else {
point
};
buf.push_back(smoothed_point);
let limit = Self::WINDOW_SECONDS + 1;
while buf.len() > limit {
buf.pop_front();
}
}
fn get_points(&self, queue: &str) -> Vec<QueueMetricPoint> {
let history = self.history.lock().expect("queue metrics mutex poisoned");
history
.get(queue)
.map(|buf| buf.iter().cloned().collect())
.unwrap_or_default()
}
fn get_latest_rates(&self, queue: &str) -> (f64, f64, f64) {
let history = self.history.lock().expect("queue metrics mutex poisoned");
history
.get(queue)
.and_then(|buf| buf.back())
.map(|p| {
(
p.publish_rate_per_sec,
p.ack_rate_per_sec,
p.nack_rate_per_sec,
)
})
.unwrap_or((0.0, 0.0, 0.0))
}
}
/// Counter totals and rates for one queue as of the last `compute_rates`
/// call, or an all-zero baseline seeded by the queue's first recorded event.
#[derive(Debug, Clone, Copy)]
struct RateSnapshot {
    /// When this snapshot was taken.
    at: Instant,
    publish_total: u64,
    ack_total: u64,
    nack_total: u64,
    publish_rate_per_sec: f64,
    ack_rate_per_sec: f64,
    nack_rate_per_sec: f64,
}

/// Tracks lifetime publish/ack/nack counts per queue and derives per-second
/// rates from the change between consecutive `compute_rates` calls.
///
/// Lock order, wherever both mutexes are held at once, is `totals` then `last`.
#[derive(Debug, Default)]
pub(crate) struct QueueRateTracker {
    /// Queue name -> lifetime `(publish, ack, nack)` counts.
    totals: std::sync::Mutex<HashMap<String, (u64, u64, u64)>>,
    /// Queue name -> snapshot taken at the previous rate computation.
    last: std::sync::Mutex<HashMap<String, RateSnapshot>>,
}

impl QueueRateTracker {
    /// Counts one published message for `queue`.
    pub(crate) fn record_publish(&self, queue: &str) {
        self.bump(queue, |counts| counts.0 = counts.0.saturating_add(1));
    }

    /// Counts one acknowledged message for `queue`.
    pub(crate) fn record_ack(&self, queue: &str) {
        self.bump(queue, |counts| counts.1 = counts.1.saturating_add(1));
    }

    /// Counts one negatively-acknowledged message for `queue`.
    pub(crate) fn record_nack(&self, queue: &str) {
        self.bump(queue, |counts| counts.2 = counts.2.saturating_add(1));
    }

    /// Applies `apply` to `queue`'s counters (creating them at zero if
    /// needed), then makes sure a baseline snapshot exists.
    ///
    /// The queue name is only copied into an owned `String` the first time a
    /// queue is seen, so the steady-state path does not allocate.
    fn bump(&self, queue: &str, apply: impl FnOnce(&mut (u64, u64, u64))) {
        {
            let mut guard = self
                .totals
                .lock()
                .expect("queue rate totals mutex poisoned");
            let totals: &mut HashMap<String, (u64, u64, u64)> = &mut guard;
            if let Some(counts) = totals.get_mut(queue) {
                apply(counts);
            } else {
                let mut counts: (u64, u64, u64) = (0, 0, 0);
                apply(&mut counts);
                totals.insert(queue.to_string(), counts);
            }
        }
        // The totals lock is released before `last` is taken.
        self.seed_baseline(queue);
    }

    /// Inserts an all-zero snapshot stamped "now" for `queue` unless one
    /// already exists, so the first rate computation has a reference point.
    fn seed_baseline(&self, queue: &str) {
        let mut last = self.last.lock().expect("queue rate last mutex poisoned");
        if !last.contains_key(queue) {
            last.insert(
                queue.to_string(),
                RateSnapshot {
                    at: Instant::now(),
                    publish_total: 0,
                    ack_total: 0,
                    nack_total: 0,
                    publish_rate_per_sec: 0.0,
                    ack_rate_per_sec: 0.0,
                    nack_rate_per_sec: 0.0,
                },
            );
        }
    }

    /// Computes `(publish, ack, nack)` rates per second for each of `queues`
    /// and stores a fresh snapshot for every one of them.
    ///
    /// A queue with no previous snapshot yields zeros. When no time has
    /// elapsed since the previous snapshot, the previous rates are repeated
    /// instead of dividing by zero.
    fn compute_rates<'a, I>(&self, queues: I) -> HashMap<String, (f64, f64, f64)>
    where
        I: IntoIterator<Item = &'a str>,
    {
        let now = Instant::now();
        let totals = self
            .totals
            .lock()
            .expect("queue rate totals mutex poisoned");
        let mut last = self.last.lock().expect("queue rate last mutex poisoned");

        let mut rates = HashMap::new();
        for queue in queues {
            let (publish_total, ack_total, nack_total) =
                totals.get(queue).copied().unwrap_or((0, 0, 0));

            let computed = match last.get(queue) {
                None => (0.0, 0.0, 0.0),
                Some(prev) => {
                    let dt = now.duration_since(prev.at).as_secs_f64();
                    if dt > 0.0 {
                        let per_sec = |current: u64, previous: u64| {
                            (current.saturating_sub(previous) as f64) / dt
                        };
                        (
                            per_sec(publish_total, prev.publish_total),
                            per_sec(ack_total, prev.ack_total),
                            per_sec(nack_total, prev.nack_total),
                        )
                    } else {
                        (
                            prev.publish_rate_per_sec,
                            prev.ack_rate_per_sec,
                            prev.nack_rate_per_sec,
                        )
                    }
                }
            };

            last.insert(
                queue.to_string(),
                RateSnapshot {
                    at: now,
                    publish_total,
                    ack_total,
                    nack_total,
                    publish_rate_per_sec: computed.0,
                    ack_rate_per_sec: computed.1,
                    nack_rate_per_sec: computed.2,
                },
            );
            rates.insert(queue.to_string(), computed);
        }
        rates
    }

    /// Names of every queue that has had at least one event recorded.
    fn active_queues(&self) -> Vec<String> {
        self.totals
            .lock()
            .expect("queue rate totals mutex poisoned")
            .keys()
            .cloned()
            .collect()
    }

    /// Moves the counters and the snapshot of `from` to `to`.
    ///
    /// Does nothing when the names are equal; entries missing under `from`
    /// are simply not created under `to`. Both maps are updated while both
    /// locks are held (same order as `compute_rates`), so a concurrent rate
    /// computation never sees the counters moved without the snapshot.
    fn rename_queue(&self, from: &str, to: &str) {
        if from == to {
            return;
        }
        let mut totals = self
            .totals
            .lock()
            .expect("queue rate totals mutex poisoned");
        let mut last = self.last.lock().expect("queue rate last mutex poisoned");
        if let Some(counts) = totals.remove(from) {
            totals.insert(to.to_string(), counts);
        }
        if let Some(snapshot) = last.remove(from) {
            last.insert(to.to_string(), snapshot);
        }
    }
}
/// JSON body accepted by the `create_queue` handler (`POST /create-queue`).
#[derive(Deserialize)]
pub struct CreateQueueRequest {
/// Name of the queue to create; the handler answers 409 if it already exists.
name: String,
/// Configuration handed unchanged to `StorageApi::create_queue`.
config: QueueConfig,
}
/// JSON body accepted by the `update_queue` handler (`POST /update-queue`).
#[derive(Deserialize)]
pub struct UpdateQueueRequest {
/// Current name of the queue to update; must be non-blank and must exist.
name: String,
/// The changes to apply; every field is optional.
config: UpdateQueueConfig,
}
/// Optional changes carried by an `UpdateQueueRequest`.
#[derive(Deserialize)]
pub struct UpdateQueueConfig {
/// New queue name; a rename is attempted when present and different from the current name.
name: Option<String>,
/// When present, the queue's config is rewritten with this value while
/// keeping its current `ordering` and `priority_kind`.
allow_duplicates: Option<bool>,
}
/// JSON body accepted by the `publish` handler (`POST /publish`).
#[derive(Deserialize)]
struct PublishRequest {
/// Destination queue name.
queue: String,
/// Priority copied onto the stored `Message`.
priority: Priority,
/// Message payload, stored as given.
payload: String,
/// Retry limit for the message; the handler uses 3 when omitted.
max_retries: Option<u32>,
}
/// Query-string parameters of the `operation_timings` handler.
#[derive(Deserialize)]
struct OperationTimingsQuery {
/// Look-back window in seconds; the handler falls back to
/// `operation_timing::DEFAULT_WINDOW_SECS` and clamps the value to 1..=3600.
seconds: Option<u64>,
}
/// JSON body shared by the `ack_batch` and `nack_batch` handlers.
#[derive(Deserialize)]
struct BatchAckNackRequest {
/// Consumer identifier forwarded to storage with the batch.
consumer_id: String,
/// Ids of the messages to ack or nack.
message_ids: Vec<String>,
}
/// JSON body of the `consume` handler; the `ack` and `nack` handlers reuse it
/// and read only `consumer_id`.
#[derive(Deserialize)]
struct ConsumeRequest {
/// Consumer identifier forwarded to storage.
consumer_id: String,
/// Timeout in seconds passed to `StorageApi::pop`; `consume` uses 30 when omitted.
timeout_seconds: Option<u64>,
}
/// JSON response of the `publish` handler.
#[derive(Serialize, Deserialize)]
pub struct PublishResponse {
/// Message id returned by `StorageApi::push`.
pub id: String,
}
/// JSON response of the `consume` handler when a message was popped
/// (the handler returns JSON `null` when storage yields no message).
#[derive(Serialize, Deserialize)]
pub struct ConsumeResponse {
/// Id of the popped message.
pub id: String,
/// Payload of the popped message.
pub payload: String,
/// The message's `retry_count` as reported by storage.
pub retry_count: u32,
}
/// JSON response of the `delete_queue` handler.
#[derive(Serialize)]
struct DeleteQueueResponse {
/// Name of the queue that was deleted.
queue: String,
/// Count returned by `StorageApi::delete_queue`.
deleted_messages: usize,
}
/// JSON response of the `purge_queue` handler.
#[derive(Serialize)]
struct PurgeQueueResponse {
/// Name of the queue that was purged.
queue: String,
/// Count returned by `StorageApi::purge_queue`.
purged_messages: usize,
}
/// JSON response of the `purge_all` handler.
#[derive(Serialize)]
struct PurgeAllResponse {
/// Number of queues whose purge succeeded (failed queues are only logged).
purged_queues: usize,
/// Sum of the counts returned by the successful purges.
purged_messages: usize,
}
/// JSON response of the `delete_all` handler.
#[derive(Serialize)]
struct DeleteAllResponse {
/// Number of queues whose deletion succeeded (failed queues are only logged).
deleted_queues: usize,
/// Sum of the counts returned by the successful deletions.
deleted_messages: usize,
}
/// Shared state behind every HTTP and WebSocket handler in this module.
///
/// Cloning is cheap: every field is an `Arc` (or an optional handle), so
/// clones share the same storage, counters and flags.
#[derive(Clone)]
pub struct ApiServer {
/// Storage backend used by all handlers.
storage: Arc<dyn StorageApi>,
/// Publish/ack/nack counters updated by the handlers and read by the sampler.
rate_tracker: Arc<QueueRateTracker>,
/// Sampled per-queue history read by the stats and metrics endpoints.
metrics: Arc<QueueMetricsStore>,
/// Set once the background metrics sampler has been spawned, so it is spawned at most once.
metrics_sampler_started: Arc<AtomicBool>,
/// Operation timing store, recorded into by the handlers and the sampler.
timings: Arc<OperationTimingStore>,
/// Optional log buffer, forwarded to WebSocket connections.
log_buffer: Option<crate::log_buffer::LogBuffer>,
/// While true, `startup_gate` rejects mutating requests with 503 and
/// `health` reports `"starting"`.
initializing: Arc<AtomicBool>,
/// While true, `publish` answers 503; also reported by `health`.
memory_pressure: Arc<AtomicBool>,
}
/// Async storage interface the API layer depends on.
///
/// `ApiServer` holds an `Arc<dyn StorageApi>`, so handlers work against any
/// implementation of this trait; this file implements it for `Storage`.
/// Implementations must be `Send + Sync`.
#[async_trait]
pub trait StorageApi: Send + Sync {
/// Reports whether a queue named `queue_name` exists.
async fn queue_exists(&self, queue_name: &str) -> Result<bool>;
/// Creates the queue with `config`. The `update_queue` handler also calls
/// this on an existing queue to change its config.
// NOTE(review): presumably an upsert of the config for an existing queue — confirm against the storage implementation.
async fn create_queue(&self, queue_name: &str, config: QueueConfig);
/// Renames queue `from` to `to`.
async fn rename_queue(
&self,
from: &str,
to: &str,
) -> std::result::Result<(), crate::storage::RenameQueueError>;
/// Stores `msg` and returns an id string (the `publish` handler returns it as the message id).
async fn push(&self, msg: Message) -> Result<String>;
/// Takes the next message from `queue` for `consumer_id`, or `None` when storage has nothing to hand out.
// NOTE(review): `timeout_secs` is presumably the lock duration — confirm against the storage implementation.
async fn pop(
&self,
queue: &str,
consumer_id: &str,
timeout_secs: u64,
) -> Result<Option<Message>>;
/// Acknowledges `message_id`; the `ack` handler maps `false` to 404.
async fn ack(&self, queue: &str, message_id: &str, consumer_id: &str) -> Result<bool>;
/// Negatively acknowledges `message_id`; the `nack` handler maps `false` to 404.
async fn nack(&self, queue: &str, message_id: &str, consumer_id: &str) -> Result<bool>;
/// Renews `message_id` for `consumer_id` with `timeout_secs`.
// NOTE(review): presumably extends the message lock — not exercised by any handler in this file; confirm.
async fn renew(
&self,
queue: &str,
message_id: &str,
consumer_id: &str,
timeout_secs: u64,
) -> Result<bool>;
/// Batch form of `ack`.
async fn batch_ack(
&self,
queue: &str,
consumer_id: &str,
message_ids: &[String],
) -> Result<crate::message::BatchAckResult>;
/// Batch form of `nack`.
async fn batch_nack(
&self,
queue: &str,
consumer_id: &str,
message_ids: &[String],
) -> Result<crate::message::BatchNackResult>;
/// Deletes the queue; handlers report the returned count as the number of deleted messages.
async fn delete_queue(&self, queue_name: &str) -> Result<usize>;
/// Purges the queue; handlers report the returned count as the number of purged messages.
async fn purge_queue(&self, queue_name: &str) -> Result<usize>;
/// Returns stats for every queue.
async fn get_all_queue_stats(&self) -> Result<Vec<crate::message::QueueStats>>;
/// Returns stats for one queue; the stats handlers treat an error as "queue not found".
async fn get_queue_stats(&self, queue_name: &str) -> Result<crate::message::QueueStats>;
/// Returns the names of all queues.
async fn list_queues(&self) -> Result<Vec<String>>;
/// Unlocks messages whose lock has expired and returns a count.
// NOTE(review): semantics inferred from the name; not called in this file — confirm.
async fn unlock_expired_messages(&self) -> Result<usize>;
/// Force-unlocks messages on `queue_name`; the handler reports the returned count as `unlocked`.
async fn force_unlock_queue(&self, queue_name: &str) -> Result<usize>;
/// Extra backend metrics embedded under `"storage"` in the stats summary.
/// The default implementation returns an empty JSON object.
async fn storage_metrics(&self) -> serde_json::Value {
serde_json::json!({})
}
}
#[async_trait]
impl StorageApi for Storage {
async fn queue_exists(&self, queue_name: &str) -> Result<bool> {
let s = self.clone();
let q = queue_name.to_string();
tokio::task::spawn_blocking(move || s.queue_exists(&q))
.await
.expect("spawn_blocking panicked")
}
async fn create_queue(&self, queue_name: &str, config: QueueConfig) {
let s = self.clone();
let q = queue_name.to_string();
tokio::task::spawn_blocking(move || s.create_queue(&q, config))
.await
.expect("spawn_blocking panicked");
}
async fn rename_queue(
&self,
from: &str,
to: &str,
) -> std::result::Result<(), crate::storage::RenameQueueError> {
let s = self.clone();
let f = from.to_string();
let t = to.to_string();
tokio::task::spawn_blocking(move || s.rename_queue(&f, &t))
.await
.expect("spawn_blocking panicked")
}
async fn push(&self, msg: Message) -> Result<String> {
let s = self.clone();
tokio::task::spawn_blocking(move || s.push(msg))
.await
.expect("spawn_blocking panicked")
}
async fn pop(
&self,
queue: &str,
consumer_id: &str,
timeout_secs: u64,
) -> Result<Option<Message>> {
let s = self.clone();
let q = queue.to_string();
let c = consumer_id.to_string();
tokio::task::spawn_blocking(move || s.pop(&q, &c, timeout_secs))
.await
.expect("spawn_blocking panicked")
}
async fn ack(&self, queue: &str, message_id: &str, consumer_id: &str) -> Result<bool> {
let s = self.clone();
let q = queue.to_string();
let m = message_id.to_string();
let c = consumer_id.to_string();
tokio::task::spawn_blocking(move || s.ack(&q, &m, &c))
.await
.expect("spawn_blocking panicked")
}
async fn nack(&self, queue: &str, message_id: &str, consumer_id: &str) -> Result<bool> {
let s = self.clone();
let q = queue.to_string();
let m = message_id.to_string();
let c = consumer_id.to_string();
tokio::task::spawn_blocking(move || s.nack(&q, &m, &c))
.await
.expect("spawn_blocking panicked")
}
async fn renew(
&self,
queue: &str,
message_id: &str,
consumer_id: &str,
timeout_secs: u64,
) -> Result<bool> {
let s = self.clone();
let q = queue.to_string();
let m = message_id.to_string();
let c = consumer_id.to_string();
tokio::task::spawn_blocking(move || s.renew(&q, &m, &c, timeout_secs))
.await
.expect("spawn_blocking panicked")
}
async fn batch_ack(
&self,
queue: &str,
consumer_id: &str,
message_ids: &[String],
) -> Result<crate::message::BatchAckResult> {
let s = self.clone();
let q = queue.to_string();
let c = consumer_id.to_string();
let ids = message_ids.to_vec();
tokio::task::spawn_blocking(move || s.batch_ack(&q, &c, &ids))
.await
.expect("spawn_blocking panicked")
}
async fn batch_nack(
&self,
queue: &str,
consumer_id: &str,
message_ids: &[String],
) -> Result<crate::message::BatchNackResult> {
let s = self.clone();
let q = queue.to_string();
let c = consumer_id.to_string();
let ids = message_ids.to_vec();
tokio::task::spawn_blocking(move || s.batch_nack(&q, &c, &ids))
.await
.expect("spawn_blocking panicked")
}
async fn delete_queue(&self, queue_name: &str) -> Result<usize> {
let s = self.clone();
let q = queue_name.to_string();
tokio::task::spawn_blocking(move || s.delete_queue(&q))
.await
.expect("spawn_blocking panicked")
}
async fn purge_queue(&self, queue_name: &str) -> Result<usize> {
let s = self.clone();
let q = queue_name.to_string();
tokio::task::spawn_blocking(move || s.purge_queue(&q))
.await
.expect("spawn_blocking panicked")
}
async fn get_all_queue_stats(&self) -> Result<Vec<crate::message::QueueStats>> {
let s = self.clone();
tokio::task::spawn_blocking(move || s.get_all_queue_stats())
.await
.expect("spawn_blocking panicked")
}
async fn get_queue_stats(&self, queue_name: &str) -> Result<crate::message::QueueStats> {
let s = self.clone();
let q = queue_name.to_string();
tokio::task::spawn_blocking(move || s.get_queue_stats(&q))
.await
.expect("spawn_blocking panicked")
}
async fn list_queues(&self) -> Result<Vec<String>> {
let s = self.clone();
tokio::task::spawn_blocking(move || s.list_queues())
.await
.expect("spawn_blocking panicked")
}
async fn unlock_expired_messages(&self) -> Result<usize> {
let s = self.clone();
tokio::task::spawn_blocking(move || s.unlock_expired_messages())
.await
.expect("spawn_blocking panicked")
}
async fn force_unlock_queue(&self, queue_name: &str) -> Result<usize> {
let s = self.clone();
let q = queue_name.to_string();
tokio::task::spawn_blocking(move || s.force_unlock_queue_sync(&q))
.await
.expect("spawn_blocking panicked")
}
async fn storage_metrics(&self) -> serde_json::Value {
let s = self.clone();
tokio::task::spawn_blocking(move || s.extended_metrics())
.await
.expect("spawn_blocking panicked")
}
}
/// Middleware that rejects state-changing requests while the server is
/// still initialising.
///
/// While `ApiServer::initializing` is set, any request whose path is one of
/// the mutating routes gets `503 Service Unavailable` with a `Retry-After: 2`
/// header and a JSON body. Every other request (and every request once
/// initialisation has finished) is passed on to `next` unchanged.
async fn startup_gate(
    State(server): State<Arc<ApiServer>>,
    request: Request<axum::body::Body>,
    next: Next,
) -> Response {
    /// Mutating routes matched by their full path.
    const MUTATING_PATHS: [&str; 5] = [
        "/create-queue",
        "/update-queue",
        "/publish",
        "/purge-all",
        "/delete-all",
    ];
    /// Mutating routes that carry path parameters, matched by prefix.
    /// `/force-unlock/` is listed because it alters lock state in storage
    /// just like the other routes here.
    const MUTATING_PREFIXES: [&str; 8] = [
        "/delete-queue/",
        "/purge-queue/",
        "/force-unlock/",
        "/consume/",
        "/ack/",
        "/nack/",
        "/ack-batch/",
        "/nack-batch/",
    ];

    if server.initializing.load(Ordering::Relaxed) {
        let path = request.uri().path();
        let is_mutation = MUTATING_PATHS.iter().any(|exact| *exact == path)
            || MUTATING_PREFIXES
                .iter()
                .any(|prefix| path.starts_with(*prefix));
        if is_mutation {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                [(axum::http::header::RETRY_AFTER, "2")],
                Json(serde_json::json!({
                    "status": "starting",
                    "message": "Qrusty is initialising. Retry shortly.",
                    "retry_after_secs": 2
                })),
            )
                .into_response();
        }
    }
    next.run(request).await
}
impl ApiServer {
/// Builds a server around `storage` with fresh counters, metrics and
/// timings, no log buffer, and both the `initializing` and
/// `memory_pressure` flags cleared.
pub fn new(storage: Arc<dyn StorageApi>) -> Self {
    let cleared_flag = || Arc::new(AtomicBool::new(false));
    Self {
        storage,
        rate_tracker: Arc::default(),
        metrics: Arc::default(),
        metrics_sampler_started: cleared_flag(),
        timings: Arc::new(OperationTimingStore::new()),
        log_buffer: None,
        initializing: cleared_flag(),
        memory_pressure: cleared_flag(),
    }
}
pub fn with_initializing(mut self, flag: Arc<AtomicBool>) -> Self {
self.initializing = flag;
self
}
pub fn with_memory_pressure(mut self, flag: Arc<AtomicBool>) -> Self {
self.memory_pressure = flag;
self
}
pub fn with_log_buffer(mut self, log_buffer: crate::log_buffer::LogBuffer) -> Self {
self.log_buffer = Some(log_buffer);
self
}
/// Borrows the shared operation-timing store used by the handlers.
pub fn timings(&self) -> &Arc<OperationTimingStore> {
&self.timings
}
fn ensure_metrics_sampler_started(&self) {
if self
.metrics_sampler_started
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return;
}
let storage_weak = Arc::downgrade(&self.storage);
let rate_tracker_weak = Arc::downgrade(&self.rate_tracker);
let metrics_weak = Arc::downgrade(&self.metrics);
let timings_weak = Arc::downgrade(&self.timings);
tokio::spawn(async move {
let start = tokio::time::Instant::now() + std::time::Duration::from_secs(1);
let mut ticker = tokio::time::interval_at(start, std::time::Duration::from_secs(1));
loop {
ticker.tick().await;
let Some(storage) = storage_weak.upgrade() else {
break;
};
let Some(rate_tracker) = rate_tracker_weak.upgrade() else {
break;
};
let Some(metrics) = metrics_weak.upgrade() else {
break;
};
let storage_start = Instant::now();
let all_stats = match storage.get_all_queue_stats().await {
Ok(s) => s,
Err(_) => continue,
};
if let Some(timings) = timings_weak.upgrade() {
timings.record("storage_get_all_queue_stats", storage_start.elapsed());
}
let stats_by_name: std::collections::HashMap<&str, _> =
all_stats.iter().map(|s| (s.name.as_str(), s)).collect();
let active = rate_tracker.active_queues();
let all_queue_names: std::collections::HashSet<&str> = stats_by_name
.keys()
.copied()
.chain(active.iter().map(|s| s.as_str()))
.collect();
if all_queue_names.is_empty() {
continue;
}
let rates = rate_tracker.compute_rates(all_queue_names.iter().copied());
let now_ms = Utc::now().timestamp_millis();
for queue_name in &all_queue_names {
let (publish_rate_per_sec, ack_rate_per_sec, nack_rate_per_sec) =
rates.get(*queue_name).copied().unwrap_or((0.0, 0.0, 0.0));
let (total, locked) = stats_by_name
.get(queue_name)
.map(|s| (s.total, s.locked))
.unwrap_or((0, 0));
metrics.push_sample(
queue_name,
QueueMetricPoint {
t_ms: now_ms,
total,
locked,
publish_rate_per_sec,
ack_rate_per_sec,
nack_rate_per_sec,
},
);
}
}
});
}
pub fn router(&self) -> Router {
let webui_dir =
std::env::var("WEBUI_DIR").unwrap_or_else(|_| "/opt/qrusty/webui".to_string());
let state = Arc::new(self.clone());
self.ensure_metrics_sampler_started();
Router::new()
.route(
"/create-queue",
axum::routing::post(ApiServer::create_queue),
)
.route(
"/update-queue",
axum::routing::post(ApiServer::update_queue),
)
.route(
"/delete-queue/{queue}",
axum::routing::delete(ApiServer::delete_queue),
)
.route(
"/purge-queue/{queue}",
axum::routing::post(ApiServer::purge_queue),
)
.route(
"/force-unlock/{queue}",
axum::routing::post(ApiServer::force_unlock_queue),
)
.route("/purge-all", axum::routing::post(ApiServer::purge_all))
.route("/delete-all", axum::routing::post(ApiServer::delete_all))
.route("/publish", axum::routing::post(ApiServer::publish))
.route("/consume/{queue}", axum::routing::post(ApiServer::consume))
.route("/ack/{queue}/{id}", axum::routing::post(ApiServer::ack))
.route("/nack/{queue}/{id}", axum::routing::post(ApiServer::nack))
.route(
"/ack-batch/{queue}",
axum::routing::post(ApiServer::ack_batch),
)
.route(
"/nack-batch/{queue}",
axum::routing::post(ApiServer::nack_batch),
)
.route("/stats", axum::routing::get(ApiServer::stats))
.route("/health", axum::routing::get(ApiServer::health))
.route(
"/queue-stats/{queue}",
axum::routing::get(ApiServer::queue_stats),
)
.route(
"/queues/{queue}/metrics",
axum::routing::get(ApiServer::queue_metrics),
)
.route("/queues", axum::routing::get(ApiServer::queues))
.route(
"/operation-timings",
axum::routing::get(ApiServer::operation_timings),
)
.route("/ws", axum::routing::get(ApiServer::ws_upgrade))
.nest_service(
"/ui",
get_service(
ServeDir::new(webui_dir.clone()).not_found_service(get_service(
ServeFile::new(format!("{}/index.html", webui_dir)),
)),
)
.handle_error(|_error| async { StatusCode::INTERNAL_SERVER_ERROR }),
)
.nest_service(
"/assets",
get_service(ServeDir::new(format!("{}/assets", webui_dir)))
.handle_error(|_error| async { StatusCode::INTERNAL_SERVER_ERROR }),
)
.nest_service(
"/favicon.ico",
get_service(ServeFile::new(format!("{}/favicon.ico", webui_dir))),
)
.nest_service(
"/qrusty_192x192.png",
get_service(ServeFile::new(format!("{}/qrusty_192x192.png", webui_dir))),
)
.layer(axum::middleware::from_fn_with_state(
state.clone(),
startup_gate,
))
.with_state(state)
}
/// Liveness endpoint; always answers `200 OK`.
///
/// While the server is initialising the body reports `"starting"`;
/// afterwards it reports `"ok"` together with the current memory-pressure flag.
async fn health(State(server): State<Arc<ApiServer>>) -> impl IntoResponse {
    let body = if server.initializing.load(Ordering::Relaxed) {
        serde_json::json!({
            "status": "starting",
            "message": "Server is loading persistent storage cache. Please wait."
        })
    } else {
        let pressure = server.memory_pressure.load(Ordering::Relaxed);
        serde_json::json!({
            "status": "ok",
            "memory_pressure": pressure
        })
    };
    (StatusCode::OK, Json(body)).into_response()
}
/// Upgrades the request to a WebSocket and hands the socket to
/// `crate::ws::handle_connection`, along with shared handles to the storage,
/// rate tracker, optional log buffer and memory-pressure flag.
async fn ws_upgrade(
    State(server): State<Arc<ApiServer>>,
    ws: WebSocketUpgrade,
) -> impl IntoResponse {
    let storage = Arc::clone(&server.storage);
    let rate_tracker = Arc::clone(&server.rate_tracker);
    let log_buffer = server.log_buffer.clone();
    let pressure = Arc::clone(&server.memory_pressure);
    ws.on_upgrade(move |socket| {
        crate::ws::handle_connection(socket, storage, rate_tracker, log_buffer, pressure)
    })
}
/// `POST /create-queue` — creates a new queue.
///
/// Responses:
/// * `400 Bad Request` when the name is empty or only whitespace. This
///   mirrors the check in `update_queue`, which refuses to operate on such
///   names, so a blank-named queue could never be updated once created.
/// * `409 Conflict` when a queue with that name already exists.
/// * `500 Internal Server Error` when the existence check fails.
/// * `200 OK` once the queue has been created.
pub async fn create_queue(
    State(server): State<Arc<ApiServer>>,
    Json(req): Json<CreateQueueRequest>,
) -> Result<StatusCode, StatusCode> {
    if req.name.trim().is_empty() {
        return Err(StatusCode::BAD_REQUEST);
    }
    let exists = server
        .storage
        .queue_exists(&req.name)
        .await
        .map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?;
    if exists {
        return Err(StatusCode::CONFLICT);
    }
    server.storage.create_queue(&req.name, req.config).await;
    Ok(StatusCode::OK)
}
pub async fn update_queue(
State(server): State<Arc<ApiServer>>,
Json(req): Json<UpdateQueueRequest>,
) -> Result<StatusCode, StatusCode> {
if req.name.trim().is_empty() {
return Err(StatusCode::BAD_REQUEST);
}
if !server
.storage
.queue_exists(&req.name)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
{
return Err(StatusCode::NOT_FOUND);
}
let current_config = server
.storage
.get_queue_stats(&req.name)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.config;
let target_name = req.config.name.as_deref().unwrap_or(&req.name);
if let Some(ref new_name) = req.config.name {
if new_name.trim().is_empty() {
return Err(StatusCode::BAD_REQUEST);
}
if new_name != &req.name {
match server.storage.rename_queue(&req.name, new_name).await {
Ok(()) => {
server.rate_tracker.rename_queue(&req.name, new_name);
}
Err(crate::storage::RenameQueueError::NotFound) => {
return Err(StatusCode::NOT_FOUND);
}
Err(crate::storage::RenameQueueError::AlreadyExists) => {
return Err(StatusCode::CONFLICT);
}
Err(crate::storage::RenameQueueError::Storage(_)) => {
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
}
}
}
if let Some(allow_duplicates) = req.config.allow_duplicates {
let new_config = QueueConfig {
ordering: current_config.ordering, allow_duplicates,
priority_kind: current_config.priority_kind, };
server.storage.create_queue(target_name, new_config).await;
}
Ok(StatusCode::OK)
}
/// `DELETE /delete-queue/{queue}` — deletes a queue.
///
/// Returns the queue name and the count reported by storage, or `500` with
/// a JSON `error` message when storage fails. Both outcomes are logged.
async fn delete_queue(
    Path(queue): Path<String>,
    State(api): State<Arc<ApiServer>>,
) -> Result<Json<DeleteQueueResponse>, (StatusCode, Json<serde_json::Value>)> {
    let deleted_messages = api.storage.delete_queue(&queue).await.map_err(|e| {
        tracing::error!("Failed to delete queue '{}': {}", queue, e);
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": format!("Failed to delete queue: {}", e)
            })),
        )
    })?;
    tracing::info!(
        "Deleted queue '{}' with {} messages",
        queue,
        deleted_messages
    );
    Ok(Json(DeleteQueueResponse {
        queue,
        deleted_messages,
    }))
}
/// `POST /purge-queue/{queue}` — purges a queue.
///
/// Returns the queue name and the count reported by storage, or `500` with
/// a JSON `error` message when storage fails. Both outcomes are logged.
async fn purge_queue(
    Path(queue): Path<String>,
    State(api): State<Arc<ApiServer>>,
) -> Result<Json<PurgeQueueResponse>, (StatusCode, Json<serde_json::Value>)> {
    let purged_messages = api.storage.purge_queue(&queue).await.map_err(|e| {
        tracing::error!("Failed to purge queue '{}': {}", queue, e);
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": format!("Failed to purge queue: {}", e)
            })),
        )
    })?;
    tracing::info!("Purged {} messages from queue '{}'", purged_messages, queue);
    Ok(Json(PurgeQueueResponse {
        queue,
        purged_messages,
    }))
}
/// `POST /force-unlock/{queue}` — force-unlocks messages on a queue.
///
/// Returns `{"queue": ..., "unlocked": ...}` with the count reported by
/// storage, or `500` with a JSON `error` message when storage fails.
async fn force_unlock_queue(
    Path(queue): Path<String>,
    State(api): State<Arc<ApiServer>>,
) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
    let unlocked = api.storage.force_unlock_queue(&queue).await.map_err(|e| {
        tracing::error!("Failed to force-unlock queue '{}': {}", queue, e);
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": format!("Failed to force-unlock queue: {}", e)
            })),
        )
    })?;
    tracing::info!("Force-unlocked {} messages on queue '{}'", unlocked, queue);
    Ok(Json(serde_json::json!({
        "queue": queue,
        "unlocked": unlocked,
    })))
}
async fn purge_all(
State(api): State<Arc<ApiServer>>,
) -> Result<Json<PurgeAllResponse>, (StatusCode, Json<serde_json::Value>)> {
let stats = api.storage.get_all_queue_stats().await.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": format!("Failed to list queues: {}", e) })),
)
})?;
let queues: Vec<String> = stats.into_iter().map(|s| s.name).collect();
let mut purged_queues = 0usize;
let mut purged_messages = 0usize;
for q in &queues {
match api.storage.purge_queue(q).await {
Ok(count) => {
purged_queues += 1;
purged_messages += count;
}
Err(e) => {
tracing::error!("Failed to purge queue '{}': {}", q, e);
}
}
}
tracing::info!(
"Purged all: {} queues, {} messages",
purged_queues,
purged_messages
);
Ok(Json(PurgeAllResponse {
purged_queues,
purged_messages,
}))
}
async fn delete_all(
State(api): State<Arc<ApiServer>>,
) -> Result<Json<DeleteAllResponse>, (StatusCode, Json<serde_json::Value>)> {
let stats = api.storage.get_all_queue_stats().await.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": format!("Failed to list queues: {}", e) })),
)
})?;
let queues: Vec<String> = stats.into_iter().map(|s| s.name).collect();
let mut deleted_queues = 0usize;
let mut deleted_messages = 0usize;
for q in &queues {
match api.storage.delete_queue(q).await {
Ok(count) => {
deleted_queues += 1;
deleted_messages += count;
}
Err(e) => {
tracing::error!("Failed to delete queue '{}': {}", q, e);
}
}
}
tracing::info!(
"Deleted all: {} queues, {} messages",
deleted_queues,
deleted_messages
);
Ok(Json(DeleteAllResponse {
deleted_queues,
deleted_messages,
}))
}
async fn publish(
State(server): State<Arc<ApiServer>>,
Json(req): Json<PublishRequest>,
) -> Result<Json<PublishResponse>, (StatusCode, Json<serde_json::Value>)> {
if server.memory_pressure.load(Ordering::Relaxed) {
return Err((
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({
"error": "Server is under memory pressure. Try again later."
})),
));
}
let api_start = Instant::now();
let queue_name = req.queue.clone();
let msg = Message {
id: uuid::Uuid::new_v4().to_string(),
queue: queue_name.clone(),
priority: req.priority,
payload: req.payload,
created_at: Utc::now(),
locked_until: None,
locked_by: None,
retry_count: 0,
max_retries: req.max_retries.unwrap_or(3),
payload_ref: None,
payload_hash: None,
};
let storage_start = Instant::now();
let id = server.storage.push(msg).await.map_err(|e| {
let msg = e.to_string();
if msg.contains("Priority kind mismatch")
|| msg.contains("Text priority must not")
|| msg.contains("Duplicate payload rejected")
{
(
StatusCode::BAD_REQUEST,
Json(serde_json::json!({ "error": msg })),
)
} else {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": msg })),
)
}
})?;
server
.timings
.record("storage_push", storage_start.elapsed());
server.rate_tracker.record_publish(&queue_name);
server.timings.record("publish", api_start.elapsed());
Ok(Json(PublishResponse { id }))
}
/// `POST /consume/{queue}` — pops the next message for a consumer.
///
/// Uses a 30-second timeout when the request does not give one. Responds
/// with the message's id, payload and retry count, JSON `null` when storage
/// returns no message, or `500` when storage fails.
async fn consume(
    State(server): State<Arc<ApiServer>>,
    Path(queue): Path<String>,
    Json(req): Json<ConsumeRequest>,
) -> Result<Json<Option<ConsumeResponse>>, StatusCode> {
    let request_started = Instant::now();
    let timeout_secs = req.timeout_seconds.unwrap_or(30);

    let pop_started = Instant::now();
    let popped = server
        .storage
        .pop(&queue, &req.consumer_id, timeout_secs)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    server.timings.record("storage_pop", pop_started.elapsed());
    server.timings.record("consume", request_started.elapsed());

    let response = popped.map(|message| ConsumeResponse {
        id: message.id,
        payload: message.payload,
        retry_count: message.retry_count,
    });
    Ok(Json(response))
}
/// `POST /ack/{queue}/{id}` — acknowledges one message.
///
/// `200` when storage reports success (the ack is then counted in the rate
/// tracker), `404` when it reports `false`, `500` when storage fails.
async fn ack(
    State(server): State<Arc<ApiServer>>,
    Path((queue, id)): Path<(String, String)>,
    Json(req): Json<ConsumeRequest>,
) -> Result<StatusCode, StatusCode> {
    let request_started = Instant::now();
    let storage_started = Instant::now();
    let acked = server
        .storage
        .ack(&queue, &id, &req.consumer_id)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    server
        .timings
        .record("storage_ack", storage_started.elapsed());

    let status = if acked {
        server.rate_tracker.record_ack(&queue);
        StatusCode::OK
    } else {
        StatusCode::NOT_FOUND
    };
    server.timings.record("ack", request_started.elapsed());
    Ok(status)
}
/// `POST /nack/{queue}/{id}` — negatively acknowledges one message.
///
/// `200` when storage reports success (the nack is then counted in the rate
/// tracker), `404` when it reports `false`, `500` when storage fails.
async fn nack(
    State(server): State<Arc<ApiServer>>,
    Path((queue, id)): Path<(String, String)>,
    Json(req): Json<ConsumeRequest>,
) -> Result<StatusCode, StatusCode> {
    let request_started = Instant::now();
    let storage_started = Instant::now();
    let nacked = server
        .storage
        .nack(&queue, &id, &req.consumer_id)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    server
        .timings
        .record("storage_nack", storage_started.elapsed());

    let status = if nacked {
        server.rate_tracker.record_nack(&queue);
        StatusCode::OK
    } else {
        StatusCode::NOT_FOUND
    };
    server.timings.record("nack", request_started.elapsed());
    Ok(status)
}
/// `POST /ack-batch/{queue}` — acknowledges several messages at once.
///
/// Returns storage's batch result as JSON, or `500` when storage fails. One
/// ack is counted in the rate tracker per entry in the result's `acked` list.
async fn ack_batch(
    State(server): State<Arc<ApiServer>>,
    Path(queue): Path<String>,
    Json(req): Json<BatchAckNackRequest>,
) -> Result<Json<crate::message::BatchAckResult>, StatusCode> {
    let request_started = Instant::now();
    let storage_started = Instant::now();
    let outcome = server
        .storage
        .batch_ack(&queue, &req.consumer_id, &req.message_ids)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    server
        .timings
        .record("storage_batch_ack", storage_started.elapsed());

    outcome
        .acked
        .iter()
        .for_each(|_| server.rate_tracker.record_ack(&queue));

    server
        .timings
        .record("ack_batch", request_started.elapsed());
    Ok(Json(outcome))
}
/// `POST /nack-batch/{queue}` — negatively acknowledges several messages.
///
/// Returns storage's batch result as JSON, or `500` when storage fails. One
/// nack is counted in the rate tracker for every message listed under
/// `unlocked`, `dead_lettered` or `dropped` in the result.
async fn nack_batch(
    State(server): State<Arc<ApiServer>>,
    Path(queue): Path<String>,
    Json(req): Json<BatchAckNackRequest>,
) -> Result<Json<crate::message::BatchNackResult>, StatusCode> {
    let request_started = Instant::now();
    let storage_started = Instant::now();
    let outcome = server
        .storage
        .batch_nack(&queue, &req.consumer_id, &req.message_ids)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    server
        .timings
        .record("storage_batch_nack", storage_started.elapsed());

    let handled = outcome.unlocked.len() + outcome.dead_lettered.len() + outcome.dropped.len();
    (0..handled).for_each(|_| server.rate_tracker.record_nack(&queue));

    server
        .timings
        .record("nack_batch", request_started.elapsed());
    Ok(Json(outcome))
}
async fn queue_stats(
Path(queue): Path<String>,
State(server): State<Arc<ApiServer>>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let api_start = Instant::now();
let storage_start = Instant::now();
let stats_result = server.storage.get_queue_stats(&queue).await;
server
.timings
.record("storage_get_queue_stats", storage_start.elapsed());
match stats_result {
Ok(stats) => {
let (publish_rate_per_sec, ack_rate_per_sec, nack_rate_per_sec) =
server.metrics.get_latest_rates(&queue);
let mut value = serde_json::to_value(stats)
.unwrap_or(serde_json::json!({"error": "serialization"}));
if let serde_json::Value::Object(ref mut map) = value {
map.insert(
"publish_rate_per_sec".to_string(),
serde_json::json!(publish_rate_per_sec),
);
map.insert(
"ack_rate_per_sec".to_string(),
serde_json::json!(ack_rate_per_sec),
);
map.insert(
"nack_rate_per_sec".to_string(),
serde_json::json!(nack_rate_per_sec),
);
}
server.timings.record("queue_stats", api_start.elapsed());
Ok(Json(value))
}
Err(_) => {
server.timings.record("queue_stats", api_start.elapsed());
Err(StatusCode::NOT_FOUND)
}
}
}
async fn queues(State(server): State<Arc<ApiServer>>) -> Result<Json<Vec<String>>, StatusCode> {
match server.storage.list_queues().await {
Ok(queues) => Ok(Json(queues)),
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
}
/// `GET /operation-timings` — recorded operation timings as JSON.
///
/// The look-back window is the `seconds` query parameter, defaulting to
/// `operation_timing::DEFAULT_WINDOW_SECS` and clamped to 1..=3600.
async fn operation_timings(
    State(server): State<Arc<ApiServer>>,
    Query(params): Query<OperationTimingsQuery>,
) -> Json<serde_json::Value> {
    let requested = params
        .seconds
        .unwrap_or(crate::operation_timing::DEFAULT_WINDOW_SECS);
    let window_secs = requested.clamp(1, 3600);
    let report = server.timings.query_all(window_secs);
    Json(serde_json::json!(report))
}
/// `GET /queues/{queue}/metrics` — the retained metric samples for a queue.
///
/// Answers `404` when storage cannot return stats for the queue; otherwise
/// returns the queue name, the sampling resolution, the window length and
/// the samples.
async fn queue_metrics(
    Path(queue): Path<String>,
    State(server): State<Arc<ApiServer>>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    // The stats themselves are not used; the call only checks the queue is known.
    if server.storage.get_queue_stats(&queue).await.is_err() {
        return Err(StatusCode::NOT_FOUND);
    }
    let points = server.metrics.get_points(&queue);
    let body = serde_json::json!({
        "queue": queue,
        "resolution_seconds": 1,
        "window_seconds": QueueMetricsStore::WINDOW_SECONDS,
        "points": points,
    });
    Ok(Json(body))
}
pub async fn stats(
State(server): State<Arc<ApiServer>>,
) -> Result<Json<serde_json::Value>, StatusCode> {
match server.storage.get_all_queue_stats().await {
Ok(queue_stats_vec) => {
let total_queues = queue_stats_vec.len();
let total_messages: usize = queue_stats_vec.iter().map(|q| q.total).sum();
let total_available: usize = queue_stats_vec.iter().map(|q| q.available).sum();
let total_locked: usize = queue_stats_vec.iter().map(|q| q.locked).sum();
let queues_json: Vec<serde_json::Value> = queue_stats_vec
.iter()
.map(|q| {
let mut value = serde_json::to_value(q)
.unwrap_or(serde_json::json!({"error": "serialization"}));
if let serde_json::Value::Object(ref mut map) = value {
let (publish_rate_per_sec, ack_rate_per_sec, nack_rate_per_sec) =
server.metrics.get_latest_rates(&q.name);
map.insert(
"publish_rate_per_sec".to_string(),
serde_json::json!(publish_rate_per_sec),
);
map.insert(
"ack_rate_per_sec".to_string(),
serde_json::json!(ack_rate_per_sec),
);
map.insert(
"nack_rate_per_sec".to_string(),
serde_json::json!(nack_rate_per_sec),
);
}
value
})
.collect();
let (mem_usage, mem_limit) = crate::memory_monitor::MemoryMonitor::usage_snapshot();
let storage_metrics = server.storage.storage_metrics().await;
let summary = serde_json::json!({
"total_queues": total_queues,
"total_messages": total_messages,
"total_available": total_available,
"total_locked": total_locked,
"memory_usage_bytes": mem_usage,
"memory_limit_bytes": mem_limit,
"memory_pressure": mem_usage as f64 / mem_limit.max(1) as f64 > 0.85,
"storage": storage_metrics
});
Ok(Json(serde_json::json!({
"queues": queues_json,
"summary": summary
})))
}
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use httpmock::Method::{GET, POST};
use httpmock::MockServer;
/// Sends a `POST /create-queue` request with a queue definition and checks that
/// the stubbed endpoint was hit and answered with status 200.
///
/// NOTE(review): the request goes to an `httpmock` stub, so this does not
/// exercise the real handler.
#[tokio::test]
async fn test_create_queue_endpoint() {
    let mock_server = MockServer::start();
    let create_queue_mock = mock_server.mock(|when, then| {
        when.method(POST).path("/create-queue");
        then.status(200);
    });

    let request_body = serde_json::json!({
        "name": "test-queue",
        "config": {
            "ordering": "MinFirst"
        }
    });
    let response = reqwest::Client::new()
        .post(mock_server.url("/create-queue"))
        .json(&request_body)
        .send()
        .await
        .unwrap();

    create_queue_mock.assert();
    assert_eq!(response.status(), 200);
}
/// Sends a `POST /publish` request and checks that the stubbed endpoint was hit,
/// answered with status 200, and that the body deserializes into a
/// `PublishResponse` carrying the stubbed message id.
///
/// NOTE(review): the request goes to an `httpmock` stub, so this does not
/// exercise the real handler.
#[tokio::test]
async fn test_publish_endpoint() {
    let mock_server = MockServer::start();
    let publish_mock = mock_server.mock(|when, then| {
        when.method(POST).path("/publish");
        then.status(200).json_body_obj(&serde_json::json!({
            "id": "01234567-89ab-cdef-0123-456789abcdef"
        }));
    });

    let request_body = serde_json::json!({
        "queue": "orders",
        "priority": 100,
        "payload": "{\"order_id\": 456}",
        "max_retries": 3
    });
    let response = reqwest::Client::new()
        .post(mock_server.url("/publish"))
        .json(&request_body)
        .send()
        .await
        .unwrap();

    publish_mock.assert();
    assert_eq!(response.status(), 200);

    let published: PublishResponse = response.json().await.unwrap();
    assert_eq!(published.id, "01234567-89ab-cdef-0123-456789abcdef");
}
/// Sends a `POST /consume/orders` request and checks that the stubbed endpoint
/// was hit, answered with status 200, and that the body deserializes into a
/// `ConsumeResponse` with the stubbed id, payload and retry count.
///
/// NOTE(review): the request goes to an `httpmock` stub, so this does not
/// exercise the real handler.
#[tokio::test]
async fn test_consume_endpoint() {
    let server = MockServer::start();
    let mock = server.mock(|when, then| {
        when.method(POST).path("/consume/orders");
        then.status(200).json_body_obj(&serde_json::json!({
            "id": "01234567-89ab-cdef-0123-456789abcdef",
            "payload": "{\"order_id\": 456}",
            "retry_count": 0
        }));
    });

    let client = reqwest::Client::new();
    let resp = client
        .post(format!("{}/consume/orders", server.url("")))
        .json(&serde_json::json!({
            "consumer_id": "worker-node-1",
            "timeout_seconds": 30
        }))
        .send()
        .await
        .unwrap();

    mock.assert();
    assert_eq!(resp.status(), 200);

    // The body is decoded as an `Option`; a single `expect` both checks that a
    // message is present and yields it, instead of `is_some()` followed by `unwrap()`.
    let consume_response: ConsumeResponse = resp
        .json::<Option<ConsumeResponse>>()
        .await
        .unwrap()
        .expect("consume response body should contain a message");
    assert_eq!(consume_response.id, "01234567-89ab-cdef-0123-456789abcdef");
    assert_eq!(consume_response.payload, "{\"order_id\": 456}");
    assert_eq!(consume_response.retry_count, 0);
}
/// Sends a `POST /ack/orders/<id>` request with a consumer id and checks that
/// the stubbed endpoint was hit and answered with status 200.
///
/// NOTE(review): the request goes to an `httpmock` stub, so this does not
/// exercise the real handler.
#[tokio::test]
async fn test_ack_endpoint() {
    // Single source of truth for the route used by both the stub and the request.
    let ack_path = "/ack/orders/01234567-89ab-cdef-0123-456789abcdef";

    let mock_server = MockServer::start();
    let ack_mock = mock_server.mock(|when, then| {
        when.method(POST).path(ack_path);
        then.status(200);
    });

    let request_body = serde_json::json!({
        "consumer_id": "worker-node-1"
    });
    let response = reqwest::Client::new()
        .post(mock_server.url(ack_path))
        .json(&request_body)
        .send()
        .await
        .unwrap();

    ack_mock.assert();
    assert_eq!(response.status(), 200);
}
/// Sends a `POST /nack/orders/<id>` request with a consumer id and checks that
/// the stubbed endpoint was hit and answered with status 200.
///
/// NOTE(review): the request goes to an `httpmock` stub, so this does not
/// exercise the real handler.
#[tokio::test]
async fn test_nack_endpoint() {
    // Single source of truth for the route used by both the stub and the request.
    let nack_path = "/nack/orders/01234567-89ab-cdef-0123-456789abcdef";

    let mock_server = MockServer::start();
    let nack_mock = mock_server.mock(|when, then| {
        when.method(POST).path(nack_path);
        then.status(200);
    });

    let request_body = serde_json::json!({
        "consumer_id": "worker-node-1"
    });
    let response = reqwest::Client::new()
        .post(mock_server.url(nack_path))
        .json(&request_body)
        .send()
        .await
        .unwrap();

    nack_mock.assert();
    assert_eq!(response.status(), 200);
}
#[tokio::test]
async fn test_stats_endpoint() {
let server = MockServer::start();
let _mock = server.mock(|when, then| {
when.method(GET).path("/stats");
then.status(200).json_body_obj(&serde_json::json!({
"queues": [
{
"name": "orders",
"available": 10,
"locked": 2,
"total": 12
}
]
}));
});
}
}