use hammerwork::{
Result,
cron::{CronSchedule, presets},
job::Job,
queue::JobQueue,
stats::{InMemoryStatsCollector, StatisticsCollector},
worker::{Worker, WorkerPool},
};
use serde_json::json;
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use tracing::info;
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt().init();
let database_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgresql://localhost/hammerwork_test".to_string());
let pool = Pool::<Postgres>::connect(&database_url).await?;
let queue = Arc::new(JobQueue::new(pool));
let stats_collector =
Arc::new(InMemoryStatsCollector::new_default()) as Arc<dyn StatisticsCollector>;
let report_handler = Arc::new(|job: Job| {
Box::pin(async move {
info!(
"Generating report: {} with payload: {}",
job.id, job.payload
);
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
if let Some(report_type) = job.payload.get("report_type").and_then(|v| v.as_str()) {
match report_type {
"daily" => info!("Generated daily sales report"),
"weekly" => info!("Generated weekly analytics report"),
"monthly" => info!("Generated monthly summary report"),
_ => info!("Generated generic report"),
}
}
info!("Report {} completed successfully", job.id);
Ok(()) as Result<()>
}) as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
});
let cleanup_handler = Arc::new(|job: Job| {
Box::pin(async move {
info!(
"Running cleanup task: {} with payload: {}",
job.id, job.payload
);
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
if let Some(cleanup_type) = job.payload.get("cleanup_type").and_then(|v| v.as_str()) {
match cleanup_type {
"temp_files" => info!("Cleaned up temporary files"),
"logs" => info!("Rotated and archived log files"),
"cache" => info!("Cleared expired cache entries"),
_ => info!("Performed generic cleanup"),
}
}
info!("Cleanup task {} completed successfully", job.id);
Ok(()) as Result<()>
}) as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
});
let report_worker = Worker::new(queue.clone(), "reports".to_string(), report_handler)
.with_poll_interval(tokio::time::Duration::from_secs(2))
.with_max_retries(3)
.with_default_timeout(tokio::time::Duration::from_secs(60))
.with_stats_collector(Arc::clone(&stats_collector));
let cleanup_worker = Worker::new(queue.clone(), "cleanup".to_string(), cleanup_handler)
.with_poll_interval(tokio::time::Duration::from_secs(3))
.with_max_retries(2)
.with_default_timeout(tokio::time::Duration::from_secs(30))
.with_stats_collector(Arc::clone(&stats_collector));
let mut worker_pool = WorkerPool::new().with_stats_collector(Arc::clone(&stats_collector));
worker_pool.add_worker(report_worker);
worker_pool.add_worker(cleanup_worker);
#[cfg(feature = "postgres")]
{
use hammerwork::queue::DatabaseQueue;
info!("=== Creating Cron Jobs ===");
let daily_report_schedule = CronSchedule::new("0 9 * * *")?; let daily_report_job = Job::with_cron_schedule(
"reports".to_string(),
json!({
"report_type": "daily",
"description": "Daily sales report"
}),
daily_report_schedule,
)?;
let daily_job_id = queue.enqueue_cron_job(daily_report_job).await?;
info!("Enqueued daily report job: {}", daily_job_id);
let weekly_report_schedule = CronSchedule::new("0 8 * * 1")?; let weekly_report_job = Job::with_cron_schedule(
"reports".to_string(),
json!({
"report_type": "weekly",
"description": "Weekly analytics report"
}),
weekly_report_schedule,
)?;
let weekly_job_id = queue.enqueue_cron_job(weekly_report_job).await?;
info!("Enqueued weekly report job: {}", weekly_job_id);
let monthly_report_schedule = CronSchedule::new("0 7 1 * *")?; let monthly_report_job = Job::with_cron_schedule(
"reports".to_string(),
json!({
"report_type": "monthly",
"description": "Monthly summary report"
}),
monthly_report_schedule,
)?;
let monthly_job_id = queue.enqueue_cron_job(monthly_report_job).await?;
info!("Enqueued monthly report job: {}", monthly_job_id);
let cleanup_schedule = CronSchedule::new("*/30 * * * *")?; let cleanup_job = Job::with_cron_schedule(
"cleanup".to_string(),
json!({
"cleanup_type": "temp_files",
"description": "Clean up temporary files"
}),
cleanup_schedule,
)?;
let cleanup_job_id = queue.enqueue_cron_job(cleanup_job).await?;
info!("Enqueued cleanup job: {}", cleanup_job_id);
let log_rotation_job = Job::new(
"cleanup".to_string(),
json!({
"cleanup_type": "logs",
"description": "Rotate log files"
}),
)
.with_cron(presets::every_hour())?;
let log_job_id = queue.enqueue_cron_job(log_rotation_job).await?;
info!("Enqueued log rotation job: {}", log_job_id);
let ny_cleanup_schedule = CronSchedule::with_timezone("0 2 * * *", "America/New_York")?; let cache_cleanup_job = Job::with_cron_schedule(
"cleanup".to_string(),
json!({
"cleanup_type": "cache",
"description": "Clear expired cache entries",
"timezone": "America/New_York"
}),
ny_cleanup_schedule,
)?;
let cache_job_id = queue.enqueue_cron_job(cache_cleanup_job).await?;
info!("Enqueued cache cleanup job (NY timezone): {}", cache_job_id);
let demo_schedule = CronSchedule::new("* * * * *")?; let demo_job = Job::with_cron_schedule(
"reports".to_string(),
json!({
"report_type": "demo",
"description": "Demo job that runs every minute"
}),
demo_schedule,
)?;
let demo_job_id = queue.enqueue_cron_job(demo_job).await?;
info!("Enqueued demo job (every minute): {}", demo_job_id);
info!("=== Recurring Jobs ===");
let recurring_reports = queue.get_recurring_jobs("reports").await?;
let recurring_cleanup = queue.get_recurring_jobs("cleanup").await?;
info!(
"Report queue has {} recurring jobs:",
recurring_reports.len()
);
for job in &recurring_reports {
info!(
" Job {}: {} (next run: {:?})",
job.id,
job.payload.get("description").unwrap_or(&json!("Unknown")),
job.next_run_at
);
}
info!(
"Cleanup queue has {} recurring jobs:",
recurring_cleanup.len()
);
for job in &recurring_cleanup {
info!(
" Job {}: {} (next run: {:?})",
job.id,
job.payload.get("description").unwrap_or(&json!("Unknown")),
job.next_run_at
);
}
info!("=== Due Cron Jobs ===");
let due_jobs = queue.get_due_cron_jobs(None).await?;
info!("Found {} jobs due for execution", due_jobs.len());
for job in &due_jobs {
info!(
" Due job {}: {} (scheduled: {})",
job.id,
job.payload.get("description").unwrap_or(&json!("Unknown")),
job.scheduled_at
);
}
info!("=== Processing Jobs ===");
info!("Starting workers to process jobs...");
let worker_handle = tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_secs(120)).await;
info!("Stopping workers after demonstration period");
});
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
info!("=== Job Processing Statistics ===");
let system_stats = stats_collector
.get_system_statistics(std::time::Duration::from_secs(300))
.await?;
info!(
"System Stats - Total: {}, Completed: {}, Failed: {}, Error Rate: {:.2}%",
system_stats.total_processed,
system_stats.completed,
system_stats.failed,
system_stats.error_rate * 100.0
);
let report_stats = stats_collector
.get_queue_statistics("reports", std::time::Duration::from_secs(300))
.await?;
info!(
"Reports Queue - Total: {}, Completed: {}, Avg Processing Time: {:.2}ms",
report_stats.total_processed,
report_stats.completed,
report_stats.avg_processing_time_ms
);
let cleanup_stats = stats_collector
.get_queue_statistics("cleanup", std::time::Duration::from_secs(300))
.await?;
info!(
"Cleanup Queue - Total: {}, Completed: {}, Avg Processing Time: {:.2}ms",
cleanup_stats.total_processed,
cleanup_stats.completed,
cleanup_stats.avg_processing_time_ms
);
info!("=== Cron Job Management ===");
queue.disable_recurring_job(demo_job_id).await?;
info!("Disabled demo job {} from further executions", demo_job_id);
let updated_reports = queue.get_recurring_jobs("reports").await?;
let active_count = updated_reports.iter().filter(|j| j.recurring).count();
info!(
"Reports queue now has {} active recurring jobs",
active_count
);
worker_handle.await?;
}
info!("=== Cron Jobs Example Completed ===");
info!("Demonstrated features:");
info!(" ✓ Creating cron jobs with various schedules");
info!(" ✓ Using preset cron schedules");
info!(" ✓ Timezone-aware cron scheduling");
info!(" ✓ Automatic job rescheduling after completion");
info!(" ✓ Listing and managing recurring jobs");
info!(" ✓ Enabling/disabling recurring jobs");
info!(" ✓ Getting due jobs for execution");
info!("");
info!("Cron Expression Examples Used:");
info!(" • '0 9 * * *' - Daily at 9:00 AM");
info!(" • '0 8 * * 1' - Weekly on Monday at 8:00 AM");
info!(" • '0 7 1 * *' - Monthly on 1st day at 7:00 AM");
info!(" • '*/30 * * * *' - Every 30 minutes");
info!(" • '0 * * * *' - Every hour (preset)");
info!(" • '* * * * *' - Every minute (demo)");
info!("");
info!("In a production environment, workers would run continuously:");
info!("worker_pool.start().await?;");
Ok(())
}