pollen-rs 0.1.0

Embedded decentralized distributed task scheduler
Documentation
//! Integration tests for Pollen scheduler.

use pollen::{Pollen, Schedule, Storage};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;

fn init_tracing() {
    let _ = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_test_writer()
        .try_init();
}

#[tokio::test]
async fn test_single_node_scheduler() {
    init_tracing();
    // Initialize scheduler with in-memory storage
    let scheduler = Pollen::builder()
        .storage(Storage::memory())
        .build()
        .await
        .unwrap();

    // Counter to track executions
    let counter = Arc::new(AtomicU32::new(0));
    let counter_clone = Arc::clone(&counter);

    // Define a task that runs every 50ms
    scheduler
        .define("test_task")
        .schedule(Schedule::interval(Duration::from_millis(50)))
        .handler(move || {
            let c = Arc::clone(&counter_clone);
            async move {
                c.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        })
        .register()
        .await
        .unwrap();

    // Verify task was registered
    let task = scheduler.get_task("test_task");
    assert!(task.is_some());
    assert_eq!(task.unwrap().name, "test_task");

    // List tasks
    let tasks = scheduler.list_tasks();
    assert_eq!(tasks.len(), 1);

    // Wait for some executions
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Verify executions happened
    let count = counter.load(Ordering::SeqCst);
    assert!(count >= 1, "Expected at least 1 execution, got {}", count);

    // Shutdown gracefully
    scheduler.shutdown().await;
}

#[tokio::test]
async fn test_task_definition_api() {
    let scheduler = Pollen::builder()
        .storage(Storage::memory())
        .build()
        .await
        .unwrap();

    // Define task with all options
    scheduler
        .define("comprehensive_task")
        .schedule(Schedule::cron("0 * * * *"))
        .handler(|| async { Ok(()) })
        .timeout(Duration::from_secs(60))
        .retry(pollen::RetryPolicy::exponential(3, Duration::from_secs(1)))
        .register()
        .await
        .unwrap();

    let task = scheduler.get_task("comprehensive_task").unwrap();
    assert_eq!(task.name, "comprehensive_task");
    assert_eq!(task.config.timeout, Duration::from_secs(60));
    assert_eq!(task.config.retry.max_attempts, 3);

    scheduler.shutdown().await;
}

#[tokio::test]
async fn test_multiple_tasks() {
    let scheduler = Pollen::builder()
        .storage(Storage::memory())
        .build()
        .await
        .unwrap();

    let counter1 = Arc::new(AtomicU32::new(0));
    let counter2 = Arc::new(AtomicU32::new(0));

    let c1 = Arc::clone(&counter1);
    scheduler
        .define("task1")
        .schedule(Schedule::interval(Duration::from_millis(30)))
        .handler(move || {
            let c = Arc::clone(&c1);
            async move {
                c.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        })
        .register()
        .await
        .unwrap();

    let c2 = Arc::clone(&counter2);
    scheduler
        .define("task2")
        .schedule(Schedule::interval(Duration::from_millis(30)))
        .handler(move || {
            let c = Arc::clone(&c2);
            async move {
                c.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        })
        .register()
        .await
        .unwrap();

    // Wait for executions
    tokio::time::sleep(Duration::from_millis(150)).await;

    let count1 = counter1.load(Ordering::SeqCst);
    let count2 = counter2.load(Ordering::SeqCst);

    assert!(count1 >= 1, "Task1 should have executed at least once");
    assert!(count2 >= 1, "Task2 should have executed at least once");

    scheduler.shutdown().await;
}

#[tokio::test]
async fn test_once_schedule() {
    let scheduler = Pollen::builder()
        .storage(Storage::memory())
        .build()
        .await
        .unwrap();

    let executed = Arc::new(AtomicU32::new(0));
    let executed_clone = Arc::clone(&executed);

    // Schedule a one-time task to run 50ms from now
    scheduler
        .define("one_shot")
        .schedule(Schedule::delay(Duration::from_millis(50)))
        .handler(move || {
            let e = Arc::clone(&executed_clone);
            async move {
                e.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        })
        .register()
        .await
        .unwrap();

    // Wait for execution
    tokio::time::sleep(Duration::from_millis(200)).await;

    let count = executed.load(Ordering::SeqCst);
    // One-time task should execute exactly once
    assert!(count >= 1, "One-shot task should have executed");

    scheduler.shutdown().await;
}