use hammerwork::{
Result,
job::Job,
queue::JobQueue,
rate_limit::{RateLimit, ThrottleConfig},
stats::{InMemoryStatsCollector, StatisticsCollector},
worker::{Worker, WorkerPool},
};
use serde_json::json;
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use tracing::info;
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    // Initialize structured logging so the `info!` output below is visible.
    tracing_subscriber::fmt().init();

    // Connect to Postgres; DATABASE_URL overrides the local test default.
    let database_url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgresql://localhost/hammerwork_test".to_string());
    let pool = Pool::<Postgres>::connect(&database_url).await?;
    let queue = Arc::new(JobQueue::new(pool));

    // Shared in-memory statistics sink used by every worker and the pool.
    let stats_collector =
        Arc::new(InMemoryStatsCollector::new_default()) as Arc<dyn StatisticsCollector>;

    // Job handler shared by all workers: logs the job, simulates one second of
    // work, and fails on purpose when the payload contains {"action": "fail"}
    // so that retry/dead-job handling can be demonstrated.
    let handler = Arc::new(|job: Job| {
        Box::pin(async move {
            info!("Processing job: {} with payload: {}", job.id, job.payload);
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            if job.payload.get("action").and_then(|v| v.as_str()) == Some("fail") {
                return Err(hammerwork::HammerworkError::Worker {
                    message: "Simulated failure".to_string(),
                });
            }
            info!("Job {} completed successfully", job.id);
            Ok(()) as Result<()>
        }) as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
    });

    // Worker 1: email queue — token-bucket rate limit of 2 jobs/second.
    let worker1 = Worker::new(queue.clone(), "email".to_string(), handler.clone())
        .with_poll_interval(tokio::time::Duration::from_secs(1))
        .with_max_retries(3)
        .with_default_timeout(tokio::time::Duration::from_secs(30))
        .with_rate_limit(RateLimit::per_second(2))
        .with_stats_collector(Arc::clone(&stats_collector));

    // Worker 2: notifications queue — throttled to 30/minute with error backoff.
    let worker2 = Worker::new(queue.clone(), "notifications".to_string(), handler.clone())
        .with_poll_interval(tokio::time::Duration::from_secs(2))
        .with_max_retries(2)
        .with_default_timeout(tokio::time::Duration::from_secs(60))
        .with_throttle_config(
            ThrottleConfig::new()
                .rate_per_minute(30)
                .backoff_on_error(tokio::time::Duration::from_secs(10)),
        )
        .with_stats_collector(Arc::clone(&stats_collector));

    // Worker 3: api_calls queue — fast polling with a bursty rate limit.
    let api_worker = Worker::new(queue.clone(), "api_calls".to_string(), handler.clone())
        .with_poll_interval(tokio::time::Duration::from_millis(100))
        .with_rate_limit(RateLimit::per_second(10).with_burst_limit(50))
        .with_stats_collector(Arc::clone(&stats_collector));

    let mut worker_pool = WorkerPool::new().with_stats_collector(Arc::clone(&stats_collector));
    worker_pool.add_worker(worker1);
    worker_pool.add_worker(worker2);
    worker_pool.add_worker(api_worker);

    // Everything below requires the Postgres-backed DatabaseQueue implementation.
    #[cfg(feature = "postgres")]
    {
        use hammerwork::queue::DatabaseQueue;

        // Demo jobs: a plain job, a job with a custom timeout, two jobs that
        // deliberately fail, and a long-running job with extra retry attempts.
        let job1 = Job::new(
            "email".to_string(),
            json!({"to": "user@example.com", "subject": "Hello"}),
        );
        let job2 = Job::new(
            "notifications".to_string(),
            json!({"message": "Welcome!", "user_id": 123}),
        )
        .with_timeout(std::time::Duration::from_secs(10));
        let job3 = Job::new("email".to_string(), json!({"action": "fail"}));
        let job4 = Job::new("email".to_string(), json!({"action": "fail"}))
            .with_timeout(std::time::Duration::from_secs(5));
        let job5 = Job::new(
            "email".to_string(),
            json!({"to": "admin@example.com", "subject": "Heavy Processing"}),
        )
        .with_timeout(std::time::Duration::from_secs(300))
        .with_max_attempts(5);

        // Queue-level throttle for api_calls, independent of the worker's own
        // rate limit: caps concurrency and the global dequeue rate.
        queue
            .set_throttle(
                "api_calls",
                ThrottleConfig::new()
                    .max_concurrent(5)
                    .rate_per_minute(300)
                    .backoff_on_error(tokio::time::Duration::from_secs(30)),
            )
            .await?;
        info!("Configured throttling for api_calls queue: max 5 concurrent, 300/min global rate");

        let job1_id = queue.enqueue(job1).await?;
        let job2_id = queue.enqueue(job2).await?;
        let job3_id = queue.enqueue(job3).await?;
        let job4_id = queue.enqueue(job4).await?;
        let job5_id = queue.enqueue(job5).await?;

        // A burst of API jobs so the rate limiter's effect is observable.
        for i in 1..=10 {
            let api_job = Job::new(
                "api_calls".to_string(),
                json!({
                    "endpoint": format!("/api/users/{}", i),
                    "method": "GET",
                    "user_id": i
                }),
            );
            queue.enqueue(api_job).await?;
        }
        info!("Enqueued 10 API call jobs to demonstrate rate limiting");

        info!("Enqueued test jobs with timeouts:");
        info!("  {} - uses worker default timeout (30s)", job1_id);
        info!("  {} - custom 10s timeout", job2_id);
        info!("  {} - will fail (uses default timeout)", job3_id);
        info!("  {} - will fail with 5s timeout", job4_id);
        info!("  {} - heavy processing with 5min timeout", job5_id);

        // Let the workers run briefly before dumping statistics.
        info!("Processing jobs for 10 seconds to demonstrate rate limiting...");
        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;

        info!("=== Job Processing Statistics (Including Timeouts) ===");
        let system_stats = stats_collector
            .get_system_statistics(std::time::Duration::from_secs(300))
            .await?;
        info!(
            "System Stats - Total: {}, Completed: {}, Failed: {}, Dead: {}, Timed Out: {}, Error Rate: {:.2}%",
            system_stats.total_processed,
            system_stats.completed,
            system_stats.failed,
            system_stats.dead,
            system_stats.timed_out,
            system_stats.error_rate * 100.0
        );

        // Per-queue statistics from the in-memory collector.
        let email_stats = stats_collector
            .get_queue_statistics("email", std::time::Duration::from_secs(300))
            .await?;
        info!(
            "Email Queue Stats - Total: {}, Completed: {}, Failed: {}, Timed Out: {}, Avg Processing Time: {:.2}ms",
            email_stats.total_processed,
            email_stats.completed,
            email_stats.failed,
            email_stats.timed_out,
            email_stats.avg_processing_time_ms
        );
        let notifications_stats = stats_collector
            .get_queue_statistics("notifications", std::time::Duration::from_secs(300))
            .await?;
        info!(
            "Notifications Queue Stats - Total: {}, Completed: {}, Timed Out: {}, Avg Processing Time: {:.2}ms",
            notifications_stats.total_processed,
            notifications_stats.completed,
            notifications_stats.timed_out,
            notifications_stats.avg_processing_time_ms
        );
        let api_stats = stats_collector
            .get_queue_statistics("api_calls", std::time::Duration::from_secs(300))
            .await?;
        info!(
            "API Calls Queue Stats - Total: {}, Completed: {}, Rate Limiting Effects Visible: {}",
            api_stats.total_processed,
            api_stats.completed,
            if api_stats.total_processed < 10 {
                "Yes (some jobs still processing due to rate limits)"
            } else {
                "No"
            }
        );

        info!("=== Current Throttling Configurations ===");
        let throttle_configs = queue.get_all_throttles().await;
        for (queue_name, config) in throttle_configs {
            info!(
                "Queue '{}': max_concurrent={:?}, rate_per_minute={:?}, backoff_on_error={:?}",
                queue_name, config.max_concurrent, config.rate_per_minute, config.backoff_on_error
            );
        }

        // Inspect jobs that exhausted their retries.
        info!("=== Dead Job Management ===");
        let dead_jobs = queue.get_dead_jobs(Some(10), None).await?;
        info!("Found {} dead jobs", dead_jobs.len());
        for dead_job in &dead_jobs {
            info!(
                "Dead Job {} in queue '{}': {:?}",
                dead_job.id, dead_job.queue_name, dead_job.error_message
            );
        }
        let dead_summary = queue.get_dead_job_summary().await?;
        info!(
            "Dead Job Summary - Total: {}, By Queue: {:?}, Error Patterns: {:?}",
            dead_summary.total_dead_jobs,
            dead_summary.dead_jobs_by_queue,
            dead_summary.error_patterns
        );

        // Database-side counters, as opposed to the in-memory collector above.
        let queue_stats = queue.get_queue_stats("email").await?;
        info!(
            "Email Queue DB Stats - Pending: {}, Running: {}, Dead: {}, Timed Out: {}, Completed: {}",
            queue_stats.pending_count,
            queue_stats.running_count,
            queue_stats.dead_count,
            queue_stats.timed_out_count,
            queue_stats.completed_count
        );
        let all_queue_stats = queue.get_all_queue_stats().await?;
        info!("=== All Queue Statistics ===");
        for stats in &all_queue_stats {
            info!(
                "Queue '{}' - Pending: {}, Running: {}, Dead: {}, Timed Out: {}, Completed: {}",
                stats.queue_name,
                stats.pending_count,
                stats.running_count,
                stats.dead_count,
                stats.timed_out_count,
                stats.completed_count
            );
        }

        // Demonstrate resurrecting a dead job back into the pending state.
        if !dead_jobs.is_empty() {
            let dead_job_id = dead_jobs[0].id;
            info!("Attempting to retry dead job: {}", dead_job_id);
            queue.retry_dead_job(dead_job_id).await?;
            info!("Dead job {} has been reset for retry", dead_job_id);
        }
    }

    info!("Example completed successfully!");
    info!("Demonstrated features:");
    info!("  ✓ Job timeout configuration (per-job and worker defaults)");
    info!("  ✓ Timeout statistics tracking and monitoring");
    info!("  ✓ Dead job management and retry functionality");
    info!("  ✓ Comprehensive job processing statistics");
    info!("");
    info!("Timeout Features Shown:");
    info!("  • Worker default timeouts (30s for email, 60s for notifications)");
    info!("  • Job-specific timeouts (10s, 5s, 300s examples)");
    info!("  • Timeout event tracking in statistics");
    info!("  • Timeout counts in database queue statistics");
    info!("");
    info!("In a real application, you would start the worker pool to run indefinitely:");
    info!("worker_pool.start().await?;");
    Ok(())
}