pub mod config;
pub mod worker;
pub mod scheduler;
pub mod janitor;
pub use config::{ServerBuilder, ServerConfig, ServerState};
pub use worker::{Worker, WorkerMetadata};
pub use scheduler::Scheduler;
pub use janitor::{Janitor, JanitorConfig};
use crate::{Error, Result};
use crate::processor::Mux;
use crate::observability::RediqMetrics;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
/// A job-processing server: owns the shared state, the shutdown flag and the
/// bookkeeping needed to run workers plus the optional scheduler, janitor and
/// metrics endpoint.
pub struct Server {
/// Shared server state; the visible code reads `config` and `redis` from it.
state: Arc<ServerState>,
/// Set to `true` to request shutdown; a clone is handed to every worker.
shutdown: Arc<AtomicBool>,
/// Incremented for each spawned worker and decremented when its task returns.
worker_count: Arc<AtomicUsize>,
/// Metrics handle; `None` until `enable_metrics` has been called.
metrics: Option<Arc<RediqMetrics>>,
/// Address recorded by `enable_metrics` for the metrics HTTP endpoint.
metrics_bind_address: Option<SocketAddr>,
}
impl Server {
fn new(state: ServerState) -> Self {
Self {
state: Arc::new(state),
shutdown: Arc::new(AtomicBool::new(false)),
worker_count: Arc::new(AtomicUsize::new(0)),
metrics: None,
metrics_bind_address: None,
}
}
pub fn enable_metrics(&mut self, bind_address: impl Into<SocketAddr>) -> Result<()> {
let metrics = RediqMetrics::new()
.map_err(|e| Error::Metrics(e.to_string()))?;
self.metrics = Some(Arc::new(metrics));
let addr = bind_address.into();
self.metrics_bind_address = Some(addr);
tracing::info!("Metrics enabled on http://{}", addr);
tracing::info!("Metrics endpoint: http://{}/metrics", addr);
Ok(())
}
pub fn enable_metrics_on(&mut self, bind_address: impl Into<String>) -> Result<()> {
let addr_str = bind_address.into();
let addr = addr_str.parse::<SocketAddr>()
.map_err(|e| Error::Config(format!("Invalid metrics address '{}': {}", addr_str, e)))?;
self.enable_metrics(addr)
}
pub async fn run(self, mux: Mux) -> Result<()> {
tracing::info!("Starting Rediq Server: {}", self.state.config.server_name);
tracing::info!("Queues: {:?}", self.state.config.queues);
tracing::info!("Concurrency: {}", self.state.config.concurrency);
let mux = Arc::new(tokio::sync::Mutex::new(mux));
let mut join_set = JoinSet::new();
#[cfg(feature = "metrics-http")]
if let (Some(metrics), Some(bind_address)) = (self.metrics.clone(), self.metrics_bind_address) {
use crate::observability::http_server::MetricsServer;
let metrics_server = MetricsServer::new(metrics, bind_address);
let metrics_server_shutdown = self.shutdown.clone();
join_set.spawn(async move {
let result = metrics_server.run().await;
metrics_server_shutdown.store(true, Ordering::SeqCst);
result
});
tracing::info!("Metrics HTTP server started on http://{}", bind_address);
}
if self.state.config.enable_scheduler {
let scheduler = Scheduler::new(
self.state.redis.clone(),
self.state.config.queues.clone(),
);
let scheduler_shutdown = self.shutdown.clone();
tokio::spawn(async move {
let result = scheduler.run().await;
scheduler_shutdown.store(true, Ordering::SeqCst);
result
});
tracing::info!("Scheduler started");
}
if let Some(janitor_config) = &self.state.config.janitor_config {
let janitor = Janitor::new(self.state.redis.clone(), janitor_config.clone());
let janitor_shutdown = janitor.shutdown_handle();
let _shutdown = self.shutdown.clone();
tokio::spawn(async move {
janitor.run().await;
_shutdown.store(true, Ordering::SeqCst);
});
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
janitor_shutdown.store(true, Ordering::SeqCst);
});
tracing::info!("Janitor started (interval: {:?})", janitor_config.interval);
}
for i in 0..self.state.config.concurrency {
let worker = self.create_worker(i, mux.clone())?;
let _shutdown = self.shutdown.clone();
let count = self.worker_count.clone();
count.fetch_add(1, Ordering::Relaxed);
join_set.spawn(async move {
let result = worker.run().await;
count.fetch_sub(1, Ordering::Relaxed);
result
});
}
tracing::info!("Started {} workers", self.state.config.concurrency);
tokio::select! {
_ = tokio::signal::ctrl_c() => {
tracing::info!("Received shutdown signal");
self.shutdown.store(true, Ordering::SeqCst);
}
}
self.graceful_shutdown(&mut join_set).await?;
tracing::info!("Server stopped");
Ok(())
}
fn create_worker(&self, index: usize, mux: Arc<Mutex<Mux>>) -> Result<Worker> {
let worker_id = format!(
"{}-worker-{}",
self.state.config.server_name,
index
);
Ok(Worker::new(
worker_id,
self.state.clone(),
self.shutdown.clone(),
mux,
))
}
async fn graceful_shutdown(&self, join_set: &mut JoinSet<Result<()>>) -> Result<()> {
tracing::info!("Initiating graceful shutdown");
let timeout = Duration::from_secs(30);
let start = std::time::Instant::now();
let initial_count = self.worker_count.load(Ordering::Relaxed);
while initial_count > 0 && start.elapsed() < timeout {
if let Some(result) = join_set.join_next().await {
if let Err(e) = result {
tracing::error!("Worker error during shutdown: {}", e);
}
} else {
break;
}
}
let remaining = self.worker_count.load(Ordering::Relaxed);
if remaining > 0 {
tracing::warn!("Force shutting down {} workers", remaining);
}
while let Some(result) = join_set.join_next().await {
if let Err(e) = result {
tracing::error!("Worker error: {}", e);
}
}
Ok(())
}
pub fn stats(&self) -> ServerStats {
ServerStats {
server_name: self.state.config.server_name.clone(),
active_workers: self.worker_count.load(Ordering::Relaxed),
queues: self.state.config.queues.clone(),
}
}
}
/// Point-in-time snapshot of a server, as returned by `Server::stats`.
///
/// Implements `PartialEq`/`Eq` so snapshots can be compared directly, e.g. in
/// tests and assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerStats {
    /// Name of the server taken from its configuration.
    pub server_name: String,
    /// Number of worker tasks running when the snapshot was taken.
    pub active_workers: usize,
    /// Names of the queues the server is configured with.
    pub queues: Vec<String>,
}
impl From<ServerState> for Server {
fn from(state: ServerState) -> Self {
Self::new(state)
}
}
pub async fn run_server(
redis_url: impl Into<String>,
queues: &[&str],
mux: Mux,
) -> Result<()> {
let state = ServerBuilder::new()
.redis_url(redis_url)
.queues(queues)
.build()
.await?;
let server = Server::from(state);
server.run(mux).await
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a server against a live Redis instance (address taken from
    /// `REDIS_URL`, falling back to localhost) and checks that its stats
    /// report the configured queue.
    #[tokio::test]
    #[ignore = "Requires Redis server"]
    async fn test_server_creation() {
        let redis_url = match std::env::var("REDIS_URL") {
            Ok(url) => url,
            Err(_) => "redis://localhost:6379".to_string(),
        };

        let state = ServerBuilder::new()
            .redis_url(&redis_url)
            .queues(&["default"])
            .concurrency(5)
            .build()
            .await
            .unwrap();

        let stats = Server::new(state).stats();
        assert_eq!(stats.queues, vec!["default"]);
    }
}