use pollen::{Pollen, Schedule, Storage};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
/// Installs a `tracing_subscriber` formatter capped at `DEBUG` that writes
/// through the test writer.
///
/// The outcome of `try_init` is deliberately discarded, so a failed
/// installation never aborts the calling test.
fn init_tracing() {
    let subscriber = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_test_writer();

    // Ignore the result instead of unwrapping it.
    // NOTE(review): presumably this fails when a subscriber is already set — confirm.
    let _ = subscriber.try_init();
}
/// Registers a single 50 ms interval task on a scheduler backed by
/// `Storage::memory()`, checks that the task can be looked up and listed, and
/// verifies that the handler ran at least once after a 200 ms wait.
#[tokio::test]
async fn test_single_node_scheduler() {
    init_tracing();

    let scheduler = Pollen::builder()
        .storage(Storage::memory())
        .build()
        .await
        .unwrap();

    // Shared execution counter: one handle stays in the test, one moves into the handler.
    let counter = Arc::new(AtomicU32::new(0));
    let counter_clone = Arc::clone(&counter);

    scheduler
        .define("test_task")
        .schedule(Schedule::interval(Duration::from_millis(50)))
        .handler(move || {
            // Each invocation gets its own handle so the returned future owns its data.
            let c = Arc::clone(&counter_clone);
            async move {
                c.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        })
        .register()
        .await
        .unwrap();

    // A single `expect` replaces the `is_some()` check followed by `unwrap()`:
    // a missing task now fails with one descriptive message.
    let task = scheduler
        .get_task("test_task")
        .expect("test_task should be retrievable after registration");
    assert_eq!(task.name, "test_task");

    let tasks = scheduler.list_tasks();
    assert_eq!(tasks.len(), 1);

    // Wait for four interval periods before sampling the counter.
    tokio::time::sleep(Duration::from_millis(200)).await;

    let count = counter.load(Ordering::SeqCst);
    assert!(count >= 1, "Expected at least 1 execution, got {}", count);

    scheduler.shutdown().await;
}
/// Exercises the task builder with a cron schedule, a timeout and an
/// exponential retry policy, then checks that the registered task exposes the
/// configured name, timeout and retry attempt count.
#[tokio::test]
async fn test_task_definition_api() {
    let task_timeout = Duration::from_secs(60);

    let pollen = Pollen::builder()
        .storage(Storage::memory())
        .build()
        .await
        .unwrap();

    pollen
        .define("comprehensive_task")
        .schedule(Schedule::cron("0 * * * *"))
        .handler(|| async { Ok(()) })
        .timeout(task_timeout)
        .retry(pollen::RetryPolicy::exponential(3, Duration::from_secs(1)))
        .register()
        .await
        .unwrap();

    // Read the definition back and compare it with what was configured above.
    let registered = pollen.get_task("comprehensive_task").unwrap();
    assert_eq!(registered.name, "comprehensive_task");
    assert_eq!(registered.config.timeout, task_timeout);
    assert_eq!(registered.config.retry.max_attempts, 3);

    pollen.shutdown().await;
}
/// Registers two independent 30 ms interval tasks, each bumping its own
/// counter, and checks after 150 ms that both handlers ran at least once.
#[tokio::test]
async fn test_multiple_tasks() {
    let scheduler = Pollen::builder()
        .storage(Storage::memory())
        .build()
        .await
        .unwrap();

    let counter1 = Arc::new(AtomicU32::new(0));
    let counter2 = Arc::new(AtomicU32::new(0));

    // Both tasks share the same shape, so register them in order from a table.
    for (task_name, hits) in [("task1", &counter1), ("task2", &counter2)] {
        let shared = Arc::clone(hits);
        scheduler
            .define(task_name)
            .schedule(Schedule::interval(Duration::from_millis(30)))
            .handler(move || {
                // Give each produced future its own handle to the counter.
                let hits = Arc::clone(&shared);
                async move {
                    hits.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            })
            .register()
            .await
            .unwrap();
    }

    tokio::time::sleep(Duration::from_millis(150)).await;

    // Sample both counters before asserting on either one.
    let count1 = counter1.load(Ordering::SeqCst);
    let count2 = counter2.load(Ordering::SeqCst);
    assert!(count1 >= 1, "Task1 should have executed at least once");
    assert!(count2 >= 1, "Task2 should have executed at least once");

    scheduler.shutdown().await;
}
/// Registers a task scheduled with `Schedule::delay` of 50 ms and checks,
/// after waiting 200 ms, that its handler has run.
#[tokio::test]
async fn test_once_schedule() {
    let scheduler = Pollen::builder()
        .storage(Storage::memory())
        .build()
        .await
        .unwrap();

    let runs = Arc::new(AtomicU32::new(0));

    scheduler
        .define("one_shot")
        .schedule(Schedule::delay(Duration::from_millis(50)))
        .handler({
            // Clone inside a block so the test keeps its own handle for the final check.
            let runs = Arc::clone(&runs);
            move || {
                let runs = Arc::clone(&runs);
                async move {
                    runs.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                }
            }
        })
        .register()
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_millis(200)).await;

    // NOTE(review): only a lower bound is checked; presumably a one-shot task
    // should run exactly once — confirm before tightening this assertion.
    let observed = runs.load(Ordering::SeqCst);
    assert!(observed >= 1, "One-shot task should have executed");

    scheduler.shutdown().await;
}