use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use metrics_exporter_prometheus::PrometheusBuilder;
use rust_job_queue_api_worker_system::{
connect, migrate,
worker::{recover_stale_at_startup, SimulatedExecutor, WorkerRuntime},
PoolConfig,
};
use tokio::signal;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing_subscriber::EnvFilter;
/// Worker process entry point.
///
/// Startup sequence:
/// 1. Install tracing and the Prometheus metrics listener.
/// 2. Read `DATABASE_URL` (required) and `WORKER_CONCURRENCY` (optional).
/// 3. Connect to the database, run migrations and recover stale jobs.
/// 4. Run the worker runtime until a shutdown signal cancels it.
///
/// # Errors
///
/// Returns an error if metrics installation fails, `DATABASE_URL` is not
/// set, or connecting, migrating or stale-job recovery fails.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    init_tracing();
    init_metrics()?;

    let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?;

    // An unset, unparsable or zero WORKER_CONCURRENCY falls back to 4.
    let concurrency = match std::env::var("WORKER_CONCURRENCY") {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(parsed) if parsed > 0 => parsed,
            _ => 4,
        },
        Err(_) => 4,
    };

    // Pool size is the concurrency plus two, saturating at `u32::MAX`.
    // NOTE(review): the two extra connections are presumably headroom for
    // non-job queries — confirm.
    let max_conns = u32::try_from(concurrency).map_or(u32::MAX, |n| n.saturating_add(2));
    let pool_config = PoolConfig::from_url(database_url).with_max_connections(max_conns);
    let pool = connect(&pool_config).await?;
    migrate(&pool).await?;

    // NOTE(review): 300s is presumably the age after which an in-flight job
    // counts as stale — confirm against `recover_stale_at_startup`.
    let recovered = recover_stale_at_startup(&pool, Duration::from_secs(300)).await?;
    info!(recovered, concurrency, "worker startup complete");

    // A detached task turns the first shutdown signal into a cancellation of
    // the token that the runtime is driven by.
    let cancel = CancellationToken::new();
    tokio::spawn({
        let shutdown_token = cancel.clone();
        async move {
            wait_shutdown().await;
            info!("shutdown signal received");
            shutdown_token.cancel();
        }
    });

    let worker = WorkerRuntime::new(pool, Arc::new(SimulatedExecutor)).with_concurrency(concurrency);
    worker.run(cancel).await;

    info!("worker exiting");
    Ok(())
}
/// Installs the Prometheus exporter with an HTTP listener.
///
/// The listen address is read from `WORKER_METRICS_BIND_ADDR` and defaults to
/// `0.0.0.0:9091` when the variable cannot be read.
///
/// # Errors
///
/// Returns an error if the address does not parse as a socket address or the
/// exporter cannot be installed.
fn init_metrics() -> anyhow::Result<()> {
    let raw_addr = match std::env::var("WORKER_METRICS_BIND_ADDR") {
        Ok(value) => value,
        Err(_) => String::from("0.0.0.0:9091"),
    };
    let listen_addr = raw_addr
        .parse::<SocketAddr>()
        .context("WORKER_METRICS_BIND_ADDR must be a socket address (host:port)")?;

    PrometheusBuilder::new()
        .with_http_listener(listen_addr)
        .install()
        .context("install prometheus exporter")?;

    // The field name stays `bind` so the emitted log record is unchanged.
    info!(bind = %listen_addr, "metrics listener installed");
    Ok(())
}
/// Initialises the global tracing subscriber.
///
/// The filter comes from the default environment filter configuration and
/// falls back to `info,sqlx=warn` when that cannot be built. Output is JSON
/// when `RUST_LOG_FORMAT` is exactly `json`, otherwise the default format.
fn init_tracing() {
    let filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new("info,sqlx=warn"),
    };

    // Any value other than the exact string "json" (or an unreadable
    // variable) selects the default formatter.
    let wants_json = matches!(std::env::var("RUST_LOG_FORMAT").as_deref(), Ok("json"));

    let subscriber = tracing_subscriber::fmt().with_env_filter(filter);
    if wants_json {
        subscriber.json().init();
    } else {
        subscriber.init();
    }
}
async fn wait_shutdown() {
let ctrl_c = async {
let _ = signal::ctrl_c().await;
};
#[cfg(unix)]
let term = async {
let mut sig = signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("install sigterm handler");
sig.recv().await;
};
#[cfg(not(unix))]
let term = std::future::pending::<()>();
tokio::select! {
() = ctrl_c => {}
() = term => {}
}
}