use crate::config::{QueueConfig, trigger_shutdown};
use std::time::Duration;
use tracing::{info, warn};
/// Runs the qrush engine until a shutdown signal arrives, then waits out a grace period.
///
/// Steps, in order:
/// 1. If `queues` is empty, a single `default` queue (`QueueConfig::new("default", 10, 0)`)
///    is substituted and a warning is logged.
/// 2. `QueueConfig::initialize` is awaited with `redis_url` and the queue list.
/// 3. The task blocks until Ctrl+C is received or, on unix targets, SIGTERM is delivered.
/// 4. `trigger_shutdown` is called and the task sleeps for `shutdown_grace_secs` seconds
///    before returning.
///
/// # Errors
///
/// Returns the error produced by `QueueConfig::initialize`, if any. Failures to install
/// signal listeners are logged and never returned.
pub async fn run_engine(
    redis_url: String,
    queues: Vec<QueueConfig>,
    shutdown_grace_secs: u64,
) -> anyhow::Result<()> {
    let queues = if queues.is_empty() {
        warn!("No queues configured; defaulting to default:10");
        vec![QueueConfig::new("default", 10, 0)]
    } else {
        queues
    };

    // NOTE(review): the URL is logged verbatim; if it can embed credentials,
    // consider redacting it before logging.
    info!(redis = %redis_url, "Starting qrush-engine");
    QueueConfig::initialize(redis_url, queues).await?;
    info!("qrush-engine running. Press Ctrl+C to stop.");

    tokio::select! {
        result = tokio::signal::ctrl_c() => {
            match result {
                Ok(()) => info!("Received Ctrl+C"),
                // The listener could not be installed. Shutdown still proceeds (as before),
                // but the log no longer claims that a Ctrl+C was received.
                Err(error) => warn!(error = %error, "Failed to listen for Ctrl+C; shutting down"),
            }
        }
        _ = async {
            #[cfg(unix)]
            {
                use tokio::signal::unix::{signal, SignalKind};
                match signal(SignalKind::terminate()) {
                    Ok(mut sigterm) => {
                        sigterm.recv().await;
                        info!("Received SIGTERM");
                        return;
                    }
                    Err(error) => {
                        warn!(error = %error, "Failed to register SIGTERM handler; only Ctrl+C will stop the engine");
                    }
                }
            }
            // Without a usable SIGTERM listener this branch must never complete,
            // leaving Ctrl+C as the only way out of the select.
            std::future::pending::<()>().await
        } => {}
    }

    trigger_shutdown();

    let grace = Duration::from_secs(shutdown_grace_secs);
    info!(?grace, "Waiting for graceful shutdown");
    tokio::time::sleep(grace).await;

    info!("qrush-engine exited");
    Ok(())
}
/// Parses a comma-separated queue specification into queue configurations.
///
/// Each entry has the form `name[:concurrency[:priority]]`, for example
/// `"default:10,emails:8:2"`. Whitespace around entries and around each
/// colon-separated field is ignored, and blank entries are skipped.
///
/// Fallbacks applied per entry:
/// - an empty name becomes `"default"`;
/// - a missing or non-numeric concurrency becomes `5`;
/// - a missing or non-numeric priority becomes `0`.
///
/// Fields beyond the third are ignored.
pub fn parse_queues(spec: &str) -> Vec<QueueConfig> {
    spec.split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty())
        .map(|entry| {
            // Trim every field so that "emails : 8" parses instead of silently
            // falling back to the default concurrency.
            let mut fields = entry.split(':').map(str::trim);

            // `split` always yields a first item, so the fallback has to key on
            // emptiness rather than absence (e.g. the entry ":3").
            let name = match fields.next() {
                Some(name) if !name.is_empty() => name,
                _ => "default",
            };
            let concurrency = fields
                .next()
                .and_then(|value| value.parse::<usize>().ok())
                .unwrap_or(5);
            let priority = fields
                .next()
                .and_then(|value| value.parse::<usize>().ok())
                .unwrap_or(0);

            QueueConfig::new(name.to_string(), concurrency, priority)
        })
        .collect()
}