use hammerwork::{
AutoscaleConfig, Job, JobQueue, Result, Worker, WorkerPool, queue::DatabaseQueue,
worker::JobHandler,
};
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::info;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
info!("Starting autoscaling worker pool example");
#[cfg(all(feature = "postgres", not(feature = "mysql")))]
let pool = sqlx::PgPool::connect("postgresql://localhost/hammerwork")
.await
.map_err(hammerwork::HammerworkError::Database)?;
#[cfg(all(feature = "mysql", not(feature = "postgres")))]
let pool = sqlx::MySqlPool::connect("mysql://localhost/hammerwork")
.await
.map_err(hammerwork::HammerworkError::Database)?;
#[cfg(all(feature = "postgres", feature = "mysql"))]
let pool = sqlx::PgPool::connect("postgresql://localhost/hammerwork")
.await
.map_err(hammerwork::HammerworkError::Database)?;
#[cfg(not(any(feature = "postgres", feature = "mysql")))]
compile_error!("This example requires either 'postgres' or 'mysql' feature to be enabled");
let queue = Arc::new(JobQueue::new(pool));
let handler: JobHandler = Arc::new(|job: Job| {
Box::pin(async move {
info!("Processing job {} with payload: {:?}", job.id, job.payload);
let processing_time = job
.payload
.get("processing_time")
.and_then(|v| v.as_u64())
.unwrap_or(1000);
sleep(Duration::from_millis(processing_time)).await;
info!("Completed job {}", job.id);
Ok(())
})
});
info!("=== Example 1: Default Autoscaling ===");
run_default_autoscaling_example(&queue, handler.clone()).await?;
info!("=== Example 2: Conservative Autoscaling ===");
run_conservative_autoscaling_example(&queue, handler.clone()).await?;
info!("=== Example 3: Aggressive Autoscaling ===");
run_aggressive_autoscaling_example(&queue, handler.clone()).await?;
info!("=== Example 4: Custom Autoscaling ===");
run_custom_autoscaling_example(&queue, handler.clone()).await?;
info!("=== Example 5: Disabled Autoscaling ===");
run_static_worker_pool_example(&queue, handler).await?;
info!("Autoscaling examples completed!");
Ok(())
}
async fn run_default_autoscaling_example<DB>(
queue: &Arc<JobQueue<DB>>,
handler: JobHandler,
) -> Result<()>
where
DB: sqlx::Database + Send + Sync + 'static,
JobQueue<DB>: DatabaseQueue<Database = DB> + Send + Sync,
{
info!("Creating worker pool with default autoscaling settings");
let worker_template = Worker::new(
Arc::clone(queue),
"autoscaling_default".to_string(),
handler,
)
.with_poll_interval(Duration::from_millis(100));
let mut pool = WorkerPool::new()
.with_worker_template(worker_template.clone())
.with_autoscaling(AutoscaleConfig::default());
pool.add_worker(worker_template);
enqueue_test_jobs(queue, "autoscaling_default", 20).await?;
tokio::select! {
_ = pool.start() => {},
_ = sleep(Duration::from_secs(30)) => {
info!("Stopping default autoscaling example");
pool.shutdown().await?;
}
}
let metrics = pool.get_autoscale_metrics();
info!(
"Final metrics - Active workers: {}, Avg queue depth: {:.1}",
metrics.active_workers, metrics.avg_queue_depth
);
Ok(())
}
async fn run_conservative_autoscaling_example<DB>(
queue: &Arc<JobQueue<DB>>,
handler: JobHandler,
) -> Result<()>
where
DB: sqlx::Database + Send + Sync + 'static,
JobQueue<DB>: DatabaseQueue<Database = DB> + Send + Sync,
{
info!("Creating worker pool with conservative autoscaling settings");
let worker_template = Worker::new(
Arc::clone(queue),
"autoscaling_conservative".to_string(),
handler,
)
.with_poll_interval(Duration::from_millis(100));
let mut pool = WorkerPool::new()
.with_worker_template(worker_template.clone())
.with_autoscaling(AutoscaleConfig::conservative());
pool.add_worker(worker_template);
enqueue_test_jobs(queue, "autoscaling_conservative", 50).await?;
tokio::select! {
_ = pool.start() => {},
_ = sleep(Duration::from_secs(45)) => {
info!("Stopping conservative autoscaling example");
pool.shutdown().await?;
}
}
let metrics = pool.get_autoscale_metrics();
info!(
"Conservative final metrics - Active workers: {}, Avg queue depth: {:.1}",
metrics.active_workers, metrics.avg_queue_depth
);
Ok(())
}
async fn run_aggressive_autoscaling_example<DB>(
queue: &Arc<JobQueue<DB>>,
handler: JobHandler,
) -> Result<()>
where
DB: sqlx::Database + Send + Sync + 'static,
JobQueue<DB>: DatabaseQueue<Database = DB> + Send + Sync,
{
info!("Creating worker pool with aggressive autoscaling settings");
let worker_template = Worker::new(
Arc::clone(queue),
"autoscaling_aggressive".to_string(),
handler,
)
.with_poll_interval(Duration::from_millis(100));
let mut pool = WorkerPool::new()
.with_worker_template(worker_template.clone())
.with_autoscaling(AutoscaleConfig::aggressive());
pool.add_worker(worker_template);
enqueue_test_jobs(queue, "autoscaling_aggressive", 15).await?;
tokio::select! {
_ = pool.start() => {},
_ = sleep(Duration::from_secs(25)) => {
info!("Stopping aggressive autoscaling example");
pool.shutdown().await?;
}
}
let metrics = pool.get_autoscale_metrics();
info!(
"Aggressive final metrics - Active workers: {}, Avg queue depth: {:.1}",
metrics.active_workers, metrics.avg_queue_depth
);
Ok(())
}
async fn run_custom_autoscaling_example<DB>(
queue: &Arc<JobQueue<DB>>,
handler: JobHandler,
) -> Result<()>
where
DB: sqlx::Database + Send + Sync + 'static,
JobQueue<DB>: DatabaseQueue<Database = DB> + Send + Sync,
{
info!("Creating worker pool with custom autoscaling settings");
let worker_template = Worker::new(Arc::clone(queue), "autoscaling_custom".to_string(), handler)
.with_poll_interval(Duration::from_millis(100));
let custom_config = AutoscaleConfig::new()
.with_min_workers(2)
.with_max_workers(8)
.with_scale_up_threshold(6)
.with_scale_down_threshold(1)
.with_cooldown_period(Duration::from_secs(45))
.with_scale_step(2)
.with_evaluation_window(Duration::from_secs(20))
.with_idle_timeout(Duration::from_secs(180));
let mut pool = WorkerPool::new()
.with_worker_template(worker_template.clone())
.with_autoscaling(custom_config);
pool.add_worker(worker_template);
enqueue_test_jobs(queue, "autoscaling_custom", 25).await?;
sleep(Duration::from_secs(10)).await;
enqueue_test_jobs(queue, "autoscaling_custom", 30).await?;
tokio::select! {
_ = pool.start() => {},
_ = sleep(Duration::from_secs(60)) => {
info!("Stopping custom autoscaling example");
pool.shutdown().await?;
}
}
let metrics = pool.get_autoscale_metrics();
info!(
"Custom final metrics - Active workers: {}, Avg queue depth: {:.1}",
metrics.active_workers, metrics.avg_queue_depth
);
Ok(())
}
async fn run_static_worker_pool_example<DB>(
queue: &Arc<JobQueue<DB>>,
handler: JobHandler,
) -> Result<()>
where
DB: sqlx::Database + Send + Sync + 'static,
JobQueue<DB>: DatabaseQueue<Database = DB> + Send + Sync,
{
info!("Creating static worker pool (autoscaling disabled)");
let worker1 = Worker::new(
Arc::clone(queue),
"static_pool".to_string(),
handler.clone(),
)
.with_poll_interval(Duration::from_millis(100));
let worker2 = Worker::new(Arc::clone(queue), "static_pool".to_string(), handler)
.with_poll_interval(Duration::from_millis(100));
let mut pool = WorkerPool::new().without_autoscaling();
pool.add_worker(worker1);
pool.add_worker(worker2);
enqueue_test_jobs(queue, "static_pool", 30).await?;
tokio::select! {
_ = pool.start() => {},
_ = sleep(Duration::from_secs(20)) => {
info!("Stopping static worker pool example");
pool.shutdown().await?;
}
}
let metrics = pool.get_autoscale_metrics();
info!(
"Static pool final metrics - Active workers: {}, Avg queue depth: {:.1}",
metrics.active_workers, metrics.avg_queue_depth
);
Ok(())
}
async fn enqueue_test_jobs<DB>(
queue: &Arc<JobQueue<DB>>,
queue_name: &str,
count: u32,
) -> Result<()>
where
DB: sqlx::Database + Send + Sync + 'static,
JobQueue<DB>: DatabaseQueue<Database = DB> + Send + Sync,
{
info!("Enqueuing {} test jobs to queue '{}'", count, queue_name);
for i in 0..count {
let job = Job::new(
queue_name.to_string(),
json!({
"task_id": i,
"message": format!("Test job {}", i),
"processing_time": 500 + (i % 3) * 500 }),
);
queue.enqueue(job).await?;
}
Ok(())
}
#[allow(dead_code)]
async fn monitor_autoscaling_metrics<DB>(pool: &WorkerPool<DB>)
where
DB: sqlx::Database + Send + Sync + 'static,
JobQueue<DB>: DatabaseQueue<Database = DB> + Send + Sync,
{
let mut interval = tokio::time::interval(Duration::from_secs(5));
for _ in 0..12 {
interval.tick().await;
let metrics = pool.get_autoscale_metrics();
info!(
"Autoscaling metrics - Workers: {}, Queue depth: {}/{:.1} (current/avg), Jobs/sec: {:.2}, Utilization: {:.1}%",
metrics.active_workers,
metrics.current_queue_depth,
metrics.avg_queue_depth,
metrics.jobs_per_second,
metrics.worker_utilization * 100.0
);
if let Some(last_scale) = metrics.last_scale_time {
let time_since = chrono::Utc::now() - last_scale;
info!(
"Time since last scaling action: {}s",
time_since.num_seconds()
);
}
}
}