// Process-wide heap allocator: every allocation in this binary goes through
// jemalloc (tikv-jemallocator) instead of the platform's system allocator.
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
mod api;
mod log_buffer;
mod memory_monitor;
mod memory_storage;
mod message;
mod operation_timing;
mod payload_store;
mod storage;
mod ws;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::net::TcpListener;
use tower::limit::ConcurrencyLimitLayer;
use tower_http::cors::CorsLayer;
use crate::api::{ApiServer, StorageApi};
use crate::storage::Storage;
/// Target soft `RLIMIT_NOFILE` (maximum open file descriptors) requested by
/// `raise_fd_limit` at startup: 2^16 = 65 536.
const DESIRED_NOFILE: u64 = 1 << 16;
/// Process entry point.
///
/// Prints the banner, tries to raise the open-file limit, installs logging
/// (a stdout `fmt` layer plus a capture layer that mirrors events into an
/// in-memory [`crate::log_buffer::LogBuffer`]), reads `DATA_PATH` /
/// `BIND_ADDR` / `STORAGE_MODE`, and runs the server until
/// [`shutdown_signal`] resolves. The log buffer is handed to the API server.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    use tracing_subscriber::layer::SubscriberExt;

    print_banner();
    raise_fd_limit();

    let log_buffer = crate::log_buffer::LogBuffer::new();
    let capture = crate::log_buffer::LogCaptureLayer::new(log_buffer.clone());
    let fmt_layer = tracing_subscriber::fmt::layer();
    // Installing fails if a global subscriber already exists (tests install one
    // before calling `main`); in that case keep using the existing subscriber.
    let _ = tracing::subscriber::set_global_default(
        tracing_subscriber::registry().with(fmt_layer).with(capture),
    );

    let (data_path, bind_addr, storage_mode) = read_process_env_config();
    run_server(
        &data_path,
        &bind_addr,
        &storage_mode,
        shutdown_signal(),
        Some(log_buffer),
    )
    .await
}
/// Prints the ASCII-art startup banner and the project tagline to stdout.
///
/// `main` calls this before the tracing subscriber is installed, so it writes
/// with `println!` directly rather than through `tracing`.
fn print_banner() {
// The raw string below is emitted verbatim, including its leading and
// trailing newlines; do not reformat its contents.
println!(
r#"
___ _
/ _ \ _ __ _ _ ___| |_ _ _
| | | | '__| | | / __| __| | | |
| |_| | | | |_| \__ \ |_| |_| |
\__\_\_| \__,_|___/\__|\__, |
|___/
The trusty queue that never forgets 🦀
"#
);
}
/// Tries to raise the process's open-file (NOFILE) soft limit to
/// `DESIRED_NOFILE`.
///
/// Best effort: failures and partial increases are only reported on stderr
/// (logging is not initialised yet when `main` calls this), never fatal.
fn raise_fd_limit() {
    let granted = match rlimit::increase_nofile_limit(DESIRED_NOFILE) {
        Ok(limit) => limit,
        Err(err) => {
            eprintln!("Warning: failed to raise NOFILE limit: {err}");
            return;
        }
    };

    if granted >= DESIRED_NOFILE {
        return;
    }

    // The OS / hard limit capped us below the target; tell the operator how to
    // lift it when running in a container.
    eprintln!(
        "Warning: could only raise NOFILE soft limit to {granted} \
         (wanted {DESIRED_NOFILE}). Consider running with \
         --ulimit nofile={DESIRED_NOFILE}:{DESIRED_NOFILE}",
    );
}
/// Resolves the three primary settings, substituting a built-in default for
/// each one that is `None`.
///
/// Returns `(data_path, bind_addr, storage_mode)`, defaulting to `"/data"`,
/// `"0.0.0.0:6784"` and `"rocksdb"` respectively. Supplied values (even empty
/// strings) are passed through unchanged.
fn config_from_env(
    data_path: Option<String>,
    bind_addr: Option<String>,
    storage_mode: Option<String>,
) -> (String, String, String) {
    const DEFAULT_DATA_PATH: &str = "/data";
    const DEFAULT_BIND_ADDR: &str = "0.0.0.0:6784";
    const DEFAULT_STORAGE_MODE: &str = "rocksdb";

    let or_default =
        |value: Option<String>, fallback: &str| -> String { value.unwrap_or_else(|| fallback.to_owned()) };

    (
        or_default(data_path, DEFAULT_DATA_PATH),
        or_default(bind_addr, DEFAULT_BIND_ADDR),
        or_default(storage_mode, DEFAULT_STORAGE_MODE),
    )
}
/// Emits one structured `info` event ("Configuration summary") describing the
/// startup configuration.
///
/// The three resolved primary settings are passed in. Every other field is the
/// raw value of its environment variable, or the fallback literal written here
/// when the variable is unset or not valid Unicode.
///
/// NOTE(review): values are logged unparsed, and the fallbacks duplicate the
/// defaults used by the consuming code (e.g. `MAX_CONNECTIONS` in `serve`,
/// `QRUSTY_COMPACT_INTERVAL_SECS` in `run_with_listener`); keep them in sync.
fn log_config_summary(data_path: &str, bind_addr: &str, storage_mode: &str) {
    fn setting(key: &str, fallback: &str) -> String {
        match std::env::var(key) {
            Ok(value) => value,
            Err(_) => fallback.to_owned(),
        }
    }

    tracing::info!(
        data_path = data_path,
        bind_addr = bind_addr,
        storage_mode = storage_mode,
        max_connections = setting("MAX_CONNECTIONS", "10000").as_str(),
        rocksdb_cache_mb = setting("ROCKSDB_CACHE_MB", "256").as_str(),
        rocksdb_max_open_files = setting("ROCKSDB_MAX_OPEN_FILES", "256").as_str(),
        rocksdb_write_buffer_mb = setting("ROCKSDB_WRITE_BUFFER_MB", "16").as_str(),
        hot_tier_size = setting("QRUSTY_HOT_TIER_SIZE", "1000").as_str(),
        refill_threshold = setting("QRUSTY_REFILL_THRESHOLD", "250").as_str(),
        max_locked_index = setting("QRUSTY_MAX_LOCKED_INDEX", "500000").as_str(),
        compact_interval_secs = setting("QRUSTY_COMPACT_INTERVAL_SECS", "300").as_str(),
        segment_max_mb = setting("QRUSTY_SEGMENT_MAX_MB", "256").as_str(),
        externalize_min_bytes = setting("QRUSTY_EXTERNALIZE_MIN_BYTES", "4096").as_str(),
        memory_limit_mb = setting("QRUSTY_MEMORY_LIMIT_MB", "auto").as_str(),
        memory_pressure_threshold = setting("QRUSTY_MEMORY_PRESSURE_THRESHOLD", "0.85").as_str(),
        memory_pressure_exit = setting("QRUSTY_MEMORY_PRESSURE_EXIT_THRESHOLD", "0.75").as_str(),
        memory_critical_threshold = setting("QRUSTY_MEMORY_CRITICAL_THRESHOLD", "0.90").as_str(),
        ws_ping_interval_secs = setting("WS_PING_INTERVAL_SECS", "30").as_str(),
        ws_ping_timeout_secs = setting("WS_PING_TIMEOUT_SECS", "10").as_str(),
        webui_dir = setting("WEBUI_DIR", "/opt/qrusty/webui").as_str(),
        "Configuration summary"
    );
}
/// Reads `DATA_PATH`, `BIND_ADDR` and `STORAGE_MODE` from the process
/// environment and resolves them through [`config_from_env`].
///
/// A variable that is unset or not valid Unicode is treated as absent.
fn read_process_env_config() -> (String, String, String) {
    let lookup = |name: &str| std::env::var(name).ok();
    config_from_env(lookup("DATA_PATH"), lookup("BIND_ADDR"), lookup("STORAGE_MODE"))
}
/// Binds a TCP listener on `bind_addr` (e.g. `"0.0.0.0:6784"`, or
/// `"127.0.0.1:0"` for an ephemeral port).
///
/// # Errors
/// Returns the bind error with the requested address prepended, so a failure
/// such as "address already in use" names the address involved. The original
/// [`std::io::ErrorKind`] is preserved.
async fn bind_listener(bind_addr: &str) -> Result<TcpListener, Box<dyn std::error::Error>> {
    TcpListener::bind(bind_addr).await.map_err(|err| {
        std::io::Error::new(err.kind(), format!("failed to bind {bind_addr}: {err}")).into()
    })
}
async fn shutdown_signal() {
#[cfg(not(any(test, coverage)))]
{
let _ = tokio::signal::ctrl_c().await;
}
}
/// Binds `bind_addr`, logs the storage mode and the configuration summary, and
/// then hands off to [`run_with_listener`] until `shutdown` resolves.
///
/// Binding happens first, so an unusable address fails before any storage is
/// opened.
///
/// # Errors
/// Returns bind errors and any error from [`run_with_listener`].
async fn run_server(
    data_path: &str,
    bind_addr: &str,
    storage_mode: &str,
    shutdown: impl Future<Output = ()> + Send + 'static,
    log_buffer: Option<crate::log_buffer::LogBuffer>,
) -> Result<(), Box<dyn std::error::Error>> {
    let tcp = bind_listener(bind_addr).await?;

    tracing::info!("Storage mode: {}", storage_mode);
    log_config_summary(data_path, bind_addr, storage_mode);
    tracing::info!("Server listening on {}", bind_addr);

    run_with_listener(data_path, storage_mode, tcp, shutdown, log_buffer).await
}
async fn run_with_listener(
data_path: &str,
storage_mode: &str,
listener: TcpListener,
shutdown: impl Future<Output = ()> + Send + 'static,
log_buffer: Option<crate::log_buffer::LogBuffer>,
) -> Result<(), Box<dyn std::error::Error>> {
let initializing = Arc::new(AtomicBool::new(false));
let mut concrete_storage: Option<Arc<Storage>> = None;
let storage: Arc<dyn StorageApi> = match storage_mode {
"memory" => Arc::new(crate::memory_storage::MemoryStorage::new()),
_ => {
let s = Arc::new(Storage::open(data_path)?);
concrete_storage = Some(s.clone());
initializing.store(true, Ordering::SeqCst);
let s_clone = s.clone();
let init_flag = initializing.clone();
tokio::spawn(async move {
let result = tokio::task::spawn_blocking(move || s_clone.initialize_cache())
.await
.expect("initialize_cache task panicked");
if let Err(e) = result {
tracing::error!("Storage cache initialisation failed: {}", e);
}
init_flag.store(false, Ordering::SeqCst);
});
let compact_interval_secs: u64 = std::env::var("QRUSTY_COMPACT_INTERVAL_SECS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(300); if s.payload_store().is_some() {
let s_compact = s.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(
compact_interval_secs,
));
loop {
interval.tick().await;
let sc = s_compact.clone();
if let Err(e) = tokio::task::spawn_blocking(move || sc.compact_payloads())
.await
.expect("compact_payloads task panicked")
{
tracing::warn!("Payload compaction failed: {}", e);
}
}
});
}
s
}
};
let memory_pressure = Arc::new(AtomicBool::new(false));
let mut api = ApiServer::new(storage.clone())
.with_initializing(initializing)
.with_memory_pressure(memory_pressure.clone());
if let Some(buf) = log_buffer {
api = api.with_log_buffer(buf);
}
let storage_clone = storage.clone();
let timings_clone = Arc::clone(api.timings());
tokio::spawn(timeout_monitor(storage_clone, timings_clone));
if let Some(concrete) = concrete_storage {
let monitor = crate::memory_monitor::MemoryMonitor::new();
let pressure_flag = memory_pressure.clone();
tokio::spawn(async move {
use crate::memory_monitor::PressureLevel;
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let cooldown_ticks: u32 = 6; let mut remaining_cooldown: u32 = 0;
let mut prev_level = PressureLevel::None;
let log_throttle_ticks: u32 = 12;
let mut ticks_since_last_log: u32 = log_throttle_ticks;
loop {
interval.tick().await;
let state = monitor.state();
let level = state.pressure_level;
let pct = state.usage_ratio * 100.0;
pressure_flag.store(level >= PressureLevel::Critical, Ordering::SeqCst);
if prev_level > PressureLevel::None && level == PressureLevel::None {
tracing::info!(
event_type = "memory_pressure_resolved",
usage_pct = format!("{:.1}", pct).as_str(),
"Memory pressure resolved at {:.1}% — restoring caches",
pct,
);
let cc = concrete.clone();
let _ = tokio::task::spawn_blocking(move || {
cc.restore_cache_capacity();
})
.await;
remaining_cooldown = 0;
}
if level > PressureLevel::None {
let level_changed = level != prev_level;
let taking_action = remaining_cooldown == 0 || level > prev_level;
ticks_since_last_log += 1;
if level_changed || taking_action || ticks_since_last_log >= log_throttle_ticks
{
let action_label = match level {
PressureLevel::Warning => "flush_and_shrink_cache",
PressureLevel::Critical => {
"critical_release,shed_publishes,compact_payloads"
}
PressureLevel::None => unreachable!(),
};
let bd = concrete.memory_breakdown();
tracing::warn!(
event_type = "memory_pressure",
usage_bytes = state.usage_bytes,
limit_bytes = state.limit_bytes,
usage_pct = format!("{:.1}", pct).as_str(),
level = ?level,
action = action_label,
block_cache_mb = bd["block_cache_capacity_mb"],
hot_tier_entries = bd["hot_tier_entries"],
dedup_entries = bd["dedup_set_entries"],
locked_index_entries = bd["locked_index_entries"],
"Memory pressure: {:.1}% ({} MB / {} MB) [{:?}] cache={}MB hot={} dedup={} locked={}",
pct,
state.usage_bytes / (1024 * 1024),
state.limit_bytes / (1024 * 1024),
level,
bd["block_cache_capacity_mb"],
bd["hot_tier_entries"],
bd["dedup_set_entries"],
bd["locked_index_entries"],
);
ticks_since_last_log = 0;
}
if taking_action {
let cc = concrete.clone();
let _ = tokio::task::spawn_blocking(move || match level {
PressureLevel::Warning => {
cc.release_memory_warning();
}
PressureLevel::Critical => {
cc.release_memory_critical();
let _ = cc.compact_payloads();
}
PressureLevel::None => {}
})
.await;
remaining_cooldown = cooldown_ticks;
} else {
remaining_cooldown = remaining_cooldown.saturating_sub(1);
}
} else {
ticks_since_last_log = log_throttle_ticks;
}
prev_level = level;
}
});
}
serve(listener, api, shutdown).await
}
/// In-flight request limit used when `MAX_CONNECTIONS` is unset, unparseable
/// or zero.
const DEFAULT_MAX_CONNECTIONS: usize = 10_000;

/// Serves `api` on `listener` until `shutdown` resolves, with permissive CORS
/// and a concurrency limit taken from `MAX_CONNECTIONS`.
///
/// # Errors
/// Propagates any error returned by `axum::serve`.
async fn serve(
    listener: TcpListener,
    api: ApiServer,
    shutdown: impl Future<Output = ()> + Send + 'static,
) -> Result<(), Box<dyn std::error::Error>> {
    let max_conn = max_connections_from_env();
    tracing::info!("Max concurrent connections: {max_conn}");
    let app = api
        .router()
        .layer(CorsLayer::permissive())
        // Limits concurrently in-flight *requests*, not TCP connections.
        // NOTE(review): axum's `Router::layer` wraps each route separately and
        // each `ConcurrencyLimitLayer::layer` call creates its own semaphore, so
        // this is likely a per-route limit. `tower::limit::GlobalConcurrencyLimitLayer`
        // shares one semaphore across routes; confirm and consider switching.
        .layer(ConcurrencyLimitLayer::new(max_conn));
    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown)
        .await?;
    Ok(())
}

/// Reads `MAX_CONNECTIONS`, falling back to [`DEFAULT_MAX_CONNECTIONS`] when it
/// is unset, not a positive integer, or not valid Unicode.
///
/// Zero is rejected because a concurrency limit of 0 never grants a permit, so
/// every request would wait forever.
fn max_connections_from_env() -> usize {
    match std::env::var("MAX_CONNECTIONS") {
        Err(_) => DEFAULT_MAX_CONNECTIONS,
        Ok(raw) => match raw.parse::<usize>() {
            Ok(limit) if limit > 0 => limit,
            _ => {
                tracing::warn!(
                    "Ignoring invalid MAX_CONNECTIONS={raw:?}; using {DEFAULT_MAX_CONNECTIONS}"
                );
                DEFAULT_MAX_CONNECTIONS
            }
        },
    }
}
/// Background sweep that never returns; it is meant to be spawned.
///
/// Every 5 seconds it calls `storage.unlock_expired_messages()`, records the
/// call's duration in `timings` under `"storage_unlock_expired_messages"`, and
/// reports the result through [`handle_timeout_monitor_result`]. The first
/// sweep happens after the first 5-second sleep.
async fn timeout_monitor(
    storage: Arc<dyn StorageApi>,
    timings: Arc<crate::operation_timing::OperationTimingStore>,
) {
    const SWEEP_PERIOD: tokio::time::Duration = tokio::time::Duration::from_secs(5);
    loop {
        tokio::time::sleep(SWEEP_PERIOD).await;
        let sweep_started = std::time::Instant::now();
        let outcome = storage.unlock_expired_messages().await;
        let sweep_took = sweep_started.elapsed();
        timings.record("storage_unlock_expired_messages", sweep_took);
        handle_timeout_monitor_result(outcome);
    }
}
/// Logs the outcome of one expired-lock sweep.
///
/// An error is logged at `error` level. A positive count is logged at `info`.
/// A count of zero produces no output.
fn handle_timeout_monitor_result(result: anyhow::Result<usize>) {
    let unlocked_count = match result {
        Err(e) => {
            tracing::error!("Error during timeout monitoring: {}", e);
            return;
        }
        Ok(count) => count,
    };
    if unlocked_count == 0 {
        return;
    }
    tracing::info!("Unlocked {} expired messages", unlocked_count);
}
// Tests for the server entry points. Several tests bind real sockets on
// 127.0.0.1:0 and use temporary directories. The timeout-monitor tests sleep
// for multiple seconds.
#[cfg(test)]
mod tests {
use super::*;
use crate::message::{Message, Priority};
use chrono::Utc;
use std::sync::Arc;
use std::sync::Mutex;
use tempfile::TempDir;
use tokio::sync::oneshot;
use tokio::time::Duration;
// Serialises tests that mutate process-wide environment variables.
static ENV_LOCK: Mutex<()> = Mutex::new(());
// Installs a DEBUG-level fmt subscriber that writes through the test harness.
// `try_init` errors are ignored, so repeated calls across tests are harmless.
fn init_tracing_for_tests() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_test_writer()
.try_init();
}
// Creates a `Storage` in a fresh temp directory. The `TempDir` is returned so
// the caller keeps the directory alive (it is removed when dropped).
fn create_test_storage() -> (Arc<Storage>, TempDir) {
let temp_dir = TempDir::new().expect("Failed to create temp directory");
let storage =
Storage::new(temp_dir.path().to_str().unwrap()).expect("Failed to create test storage");
(Arc::new(storage), temp_dir)
}
// Builds an unlocked message with a random UUID id, numeric priority,
// retry_count 0 and max_retries 3.
fn create_test_message(queue: &str, priority: u64, payload: &str) -> Message {
Message {
id: uuid::Uuid::new_v4().to_string(),
queue: queue.to_string(),
priority: Priority::Numeric(priority),
payload: payload.to_string(),
created_at: Utc::now(),
locked_until: None,
locked_by: None,
retry_count: 0,
max_retries: 3,
payload_ref: None,
payload_hash: None,
}
}
#[test]
fn test_config_from_env_defaults() {
let (data_path, bind_addr, storage_mode) = config_from_env(None, None, None);
assert_eq!(data_path, "/data");
assert_eq!(bind_addr, "0.0.0.0:6784");
assert_eq!(storage_mode, "rocksdb");
}
#[test]
fn test_config_from_env_storage_mode_memory() {
let (_, _, storage_mode) = config_from_env(None, None, Some("memory".to_string()));
assert_eq!(storage_mode, "memory");
}
// Runs the real `main` against a temp dir on an ephemeral port. Under
// cfg(test) `shutdown_signal` resolves immediately, so the server exits at once.
// NOTE(review): QRUSTY_TEST_IMMEDIATE_SHUTDOWN is not read by any code visible
// in this file. The env vars are also not restored if `main` panics.
#[test]
fn test_main_exits_immediately_in_tests() {
init_tracing_for_tests();
let _guard = ENV_LOCK.lock().unwrap();
let temp_dir = TempDir::new().expect("Failed to create temp directory");
std::env::set_var("QRUSTY_TEST_IMMEDIATE_SHUTDOWN", "1");
std::env::set_var("DATA_PATH", temp_dir.path().to_str().unwrap());
std::env::set_var("BIND_ADDR", "127.0.0.1:0");
let res = super::main();
std::env::remove_var("QRUSTY_TEST_IMMEDIATE_SHUTDOWN");
std::env::remove_var("DATA_PATH");
std::env::remove_var("BIND_ADDR");
assert!(res.is_ok());
}
#[tokio::test]
async fn test_bind_listener_ephemeral_port() {
let listener = bind_listener("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
assert_eq!(addr.ip().to_string(), "127.0.0.1");
}
// End-to-end: the RocksDB-backed server answers /health, then stops within 3 s
// of the shutdown signal.
#[tokio::test]
async fn test_server_serves_health_and_shuts_down() {
init_tracing_for_tests();
print_banner();
let temp_dir = TempDir::new().expect("Failed to create temp directory");
let listener = bind_listener("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let server_handle = tokio::spawn(async move {
run_with_listener(
temp_dir.path().to_str().unwrap(),
"rocksdb",
listener,
async {
let _ = shutdown_rx.await;
},
None,
)
.await
.unwrap()
});
let client = reqwest::Client::new();
let resp = client
.get(format!("http://{}/health", addr))
.send()
.await
.unwrap();
assert!(resp.status().is_success());
let _ = shutdown_tx.send(());
tokio::time::timeout(Duration::from_secs(3), server_handle)
.await
.unwrap()
.unwrap();
}
// Same as above for the in-memory backend. The data path is never touched.
#[tokio::test]
async fn test_server_memory_mode_serves_health() {
init_tracing_for_tests();
let listener = bind_listener("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let server_handle = tokio::spawn(async move {
run_with_listener(
"/unused",
"memory",
listener,
async {
let _ = shutdown_rx.await;
},
None,
)
.await
.unwrap()
});
let client = reqwest::Client::new();
let resp = client
.get(format!("http://{}/health", addr))
.send()
.await
.unwrap();
assert!(resp.status().is_success());
let _ = shutdown_tx.send(());
tokio::time::timeout(Duration::from_secs(3), server_handle)
.await
.unwrap()
.unwrap();
}
// Pops a message with a short lock (third `pop` argument, presumably seconds),
// then relies on the 5 s background monitor to release it. Sleeps 7 s.
// NOTE(review): the expected retry_count of 2 depends on Storage's increment
// rules, which are not visible here. Confirm against `Storage::pop` / unlock.
#[tokio::test]
async fn test_timeout_monitor_background_task() {
init_tracing_for_tests();
let (storage, _temp_dir) = create_test_storage();
let message = create_test_message("monitor_test", 100, "background_test");
let message_id = message.id.clone();
storage.push(message).unwrap();
storage
.pop("monitor_test", "test_consumer", 1)
.unwrap()
.unwrap();
let storage_clone: Arc<dyn StorageApi> = storage.clone();
let timings = Arc::new(crate::operation_timing::OperationTimingStore::new());
let _monitor_handle = tokio::spawn(timeout_monitor(storage_clone, timings));
tokio::time::sleep(Duration::from_secs(7)).await;
let recovered_msg = storage
.pop("monitor_test", "recovery_consumer", 30)
.unwrap()
.unwrap();
assert_eq!(recovered_msg.id, message_id);
assert_eq!(recovered_msg.retry_count, 2); assert_eq!(
recovered_msg.locked_by,
Some("recovery_consumer".to_string())
);
}
// Locks three messages with short locks. Expects none unlocked immediately
// and all three after a 2 s sleep.
// NOTE(review): despite the name, no error path is exercised here.
#[tokio::test]
async fn test_timeout_monitor_error_resilience() {
init_tracing_for_tests();
let (storage, _temp_dir) = create_test_storage();
for i in 0..3 {
let message = create_test_message("resilience_test", i * 10, &format!("msg_{}", i));
storage.push(message).unwrap();
storage
.pop("resilience_test", &format!("consumer_{}", i), 1)
.unwrap();
}
let unlocked_count = storage.unlock_expired_messages().unwrap();
assert_eq!(unlocked_count, 0);
tokio::time::sleep(Duration::from_secs(2)).await;
let unlocked_count = storage.unlock_expired_messages().unwrap();
assert_eq!(unlocked_count, 3);
}
// Under cfg(test) the body of `shutdown_signal` is compiled out, so awaiting it
// must complete without any signal being sent.
#[tokio::test]
async fn test_shutdown_signal_returns_immediately_in_tests_when_not_forced() {
init_tracing_for_tests();
{
let _guard = ENV_LOCK.lock().unwrap();
std::env::remove_var("QRUSTY_TEST_IMMEDIATE_SHUTDOWN");
}
shutdown_signal().await;
}
// Smoke test: each branch of the result handler runs without panicking.
#[test]
fn test_timeout_monitor_result_branches() {
init_tracing_for_tests();
handle_timeout_monitor_result(Ok(0));
handle_timeout_monitor_result(Ok(1));
handle_timeout_monitor_result(Err(anyhow::anyhow!("boom")));
}
// While the startup gate is raised, /health reports "starting", /publish gets
// a 503 with Retry-After: 2, and /queues still answers 200.
#[tokio::test]
async fn test_startup_gate_blocks_api_while_initializing() {
init_tracing_for_tests();
let listener = bind_listener("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let initializing = Arc::new(AtomicBool::new(true));
let storage: Arc<dyn StorageApi> = Arc::new(crate::memory_storage::MemoryStorage::new());
let api = ApiServer::new(storage).with_initializing(initializing);
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
tokio::spawn(async move {
serve(listener, api, async {
let _ = shutdown_rx.await;
})
.await
.unwrap();
});
let client = reqwest::Client::new();
let resp = client
.get(format!("http://{}/health", addr))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["status"], "starting");
let resp = client
.post(format!("http://{}/publish", addr))
.json(&serde_json::json!({
"queue": "q", "priority": 1, "payload": "x", "max_retries": 0
}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 503);
assert_eq!(
resp.headers().get("retry-after").unwrap().to_str().unwrap(),
"2"
);
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["status"], "starting");
assert_eq!(body["retry_after_secs"], 2);
let resp = client
.get(format!("http://{}/queues", addr))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let _ = shutdown_tx.send(());
}
// Lowering the gate flips /health from "starting" to "ok" on the very next
// request. No restart is needed.
#[tokio::test]
async fn test_startup_gate_lifts_after_init() {
init_tracing_for_tests();
let listener = bind_listener("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let initializing = Arc::new(AtomicBool::new(true));
let storage: Arc<dyn StorageApi> = Arc::new(crate::memory_storage::MemoryStorage::new());
let api = ApiServer::new(storage).with_initializing(initializing.clone());
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
tokio::spawn(async move {
serve(listener, api, async {
let _ = shutdown_rx.await;
})
.await
.unwrap();
});
let client = reqwest::Client::new();
let resp = client
.get(format!("http://{}/health", addr))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["status"], "starting");
initializing.store(false, Ordering::SeqCst);
let resp = client
.get(format!("http://{}/health", addr))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: serde_json::Value = resp.json().await.unwrap();
assert_eq!(body["status"], "ok");
let resp = client
.get(format!("http://{}/queues", addr))
.send()
.await
.unwrap();
assert!(resp.status().is_success());
let _ = shutdown_tx.send(());
}
}