use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use rocketmq_rust::schedule::executor::ExecutorConfig;
use rocketmq_rust::CronTrigger;
use rocketmq_rust::IntervalTrigger;
use rocketmq_rust::SchedulerConfig;
use rocketmq_rust::Task;
use rocketmq_rust::TaskResult;
use rocketmq_rust::TaskScheduler;
use tokio::time::sleep;
use tracing::error;
use tracing::info;
use tracing::Level;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
let config = SchedulerConfig {
executor_config: ExecutorConfig {
max_concurrent_tasks: 5,
default_timeout: Duration::from_secs(30),
enable_metrics: true,
},
executor_pool_size: 2,
check_interval: Duration::from_millis(500),
max_scheduler_threads: 1,
enable_persistence: true,
persistence_interval: Duration::from_secs(10),
};
let scheduler = Arc::new(TaskScheduler::new(config));
let counter = Arc::new(AtomicU32::new(0));
let counter_clone = counter.clone();
let data_task = Arc::new(
Task::new("data_processor", "Data Processor", move |ctx| {
let counter = counter_clone.clone();
async move {
let count = counter.fetch_add(1, Ordering::Relaxed);
info!("Processing data batch #{} at {:?}", count, ctx.scheduled_time);
sleep(Duration::from_millis(200)).await;
if count % 7 == 6 {
TaskResult::Failed("Simulated processing error".to_string())
} else {
TaskResult::Success(Some(format!("Processed batch #{}", count)))
}
}
})
.with_description("Processes data batches periodically")
.with_group("data")
.with_priority(2)
.with_max_retry(3)
.with_timeout(Duration::from_secs(10)),
);
let health_task = Arc::new(
Task::new("health_check", "Health Check", |ctx| async move {
info!("Performing health check: {}", ctx.execution_id);
sleep(Duration::from_millis(50)).await;
TaskResult::Success(Some("All systems healthy".to_string()))
})
.with_description("Checks system health")
.with_group("monitoring")
.with_priority(1),
);
let cleanup_task = Arc::new(
Task::new("cleanup", "Cleanup Task", |ctx| async move {
info!("Running cleanup: {}", ctx.execution_id);
sleep(Duration::from_millis(100)).await;
TaskResult::Success(Some("Cleanup completed".to_string()))
})
.with_description("Cleans up temporary files")
.with_group("maintenance")
.with_priority(0),
);
let counter_c = counter.clone();
let report_task = Arc::new(
Task::new("report_generator", "Report Generator", move |_| {
let value = counter_c.clone();
async move {
let count = value.load(Ordering::Relaxed);
info!("Generating report with {} processed items", count);
sleep(Duration::from_millis(300)).await;
TaskResult::Success(Some(format!("Report generated with {} items", count)))
}
})
.with_description("Generates periodic reports")
.with_group("reporting")
.with_priority(1),
);
scheduler.start().await?;
let data_job_id = scheduler
.schedule_job(data_task, Arc::new(IntervalTrigger::every_seconds(5)))
.await?;
let _health_job_id = scheduler
.schedule_job(health_task, Arc::new(IntervalTrigger::every_seconds(10)))
.await?;
let _cleanup_job_id = scheduler
.schedule_job(cleanup_task, Arc::new(CronTrigger::new("0 * * * * *")?))
.await?;
let report_job_id = scheduler
.schedule_job(
report_task,
Arc::new(
IntervalTrigger::every_seconds(30)
.with_start_time(std::time::SystemTime::now() + Duration::from_secs(10)),
),
)
.await?;
info!("All jobs scheduled successfully");
let arc = scheduler.clone();
tokio::spawn(async move {
sleep(Duration::from_secs(20)).await;
if let Err(e) = arc.set_job_enabled(&data_job_id, false).await {
error!("Failed to disable job: {}", e);
} else {
info!("Data processing job disabled");
}
sleep(Duration::from_secs(10)).await;
if let Err(e) = arc.set_job_enabled(&data_job_id, true).await {
error!("Failed to enable job: {}", e);
} else {
info!("Data processing job re-enabled");
}
sleep(Duration::from_secs(10)).await;
match arc.execute_job_now(&report_job_id).await {
Ok(execution_id) => info!("Report job executed immediately: {}", execution_id),
Err(e) => error!("Failed to execute report job: {}", e),
}
});
let clone = scheduler.clone();
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(15)).await;
let status = clone.get_status().await;
info!(
"Scheduler Status - Running: {}, Total Jobs: {}, Enabled: {}, Running Tasks: {}",
status.running, status.total_jobs, status.enabled_jobs, status.running_tasks
);
let monitoring_jobs = clone.get_jobs_by_group("monitoring").await;
info!("Monitoring jobs count: {}", monitoring_jobs.len());
}
});
sleep(Duration::from_secs(60)).await;
info!("Stopping scheduler...");
scheduler.stop().await?;
info!("Final counter value: {}", counter.load(Ordering::Relaxed));
Ok(())
}