// pollen-scheduler 0.1.0
//
// Task scheduler for Pollen.
// Documentation:
//! Task instance generator.

use chrono::{DateTime, Utc};
use pollen_store::StoreBackend;
use pollen_types::{Result, TaskDef, TaskInstance};
use std::sync::Arc;
use tracing::debug;

/// Generates task instances based on schedules.
///
/// The generator owns a shared handle to the store; it uses that handle both
/// to list a task's existing instances and to insert newly created ones.
pub struct InstanceGenerator {
    /// Shared store backend through which instances are read and written.
    store: Arc<StoreBackend>,
}

impl InstanceGenerator {
    /// Create a new instance generator.
    pub fn new(store: Arc<StoreBackend>) -> Self {
        Self { store }
    }

    /// Ensure an instance exists for the given task at the scheduled time.
    pub async fn ensure_instance(&self, task: &TaskDef, scheduled_at: DateTime<Utc>) -> Result<bool> {
        // Check if instance already exists
        let task_id = task.id.clone();
        let existing = self.store.read(move |r| {
            r.list_instances_for_task(&task_id, 10)
        }).await?;

        // Check if we already have an instance for this time
        for instance in &existing {
            // Allow 1 minute tolerance
            let diff = (instance.scheduled_at - scheduled_at).num_seconds().abs();
            if diff < 60 {
                return Ok(false);
            }
        }

        // Create new instance
        let instance = TaskInstance::new(task.id.clone(), scheduled_at);
        let instance_clone = instance.clone();

        self.store.write(move |w| {
            w.insert_instance(&instance_clone)
        }).await?;

        debug!(
            "Generated instance {} for task {} at {}",
            instance.id, task.name, scheduled_at
        );

        Ok(true)
    }

    /// Generate instances for all enabled tasks up to a given horizon.
    pub async fn generate_upcoming(&self, tasks: &[TaskDef], horizon: DateTime<Utc>) -> Result<usize> {
        let now = Utc::now();
        let mut count = 0;

        for task in tasks {
            if !task.enabled {
                continue;
            }

            // Get next execution time
            if let Some(next) = super::compute_next_execution(&task.schedule, now) {
                if next <= horizon
                    && self.ensure_instance(task, next).await? {
                        count += 1;
                    }
            }
        }

        Ok(count)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use pollen_store::{MemoryStore, StoreBackend};
    use pollen_types::Schedule;
    use std::time::Duration;

    /// The first `ensure_instance` call for a slot creates an instance; a
    /// repeated call for the same slot reports that nothing was created.
    #[tokio::test]
    async fn test_generate_instance() {
        let backend = Arc::new(StoreBackend::Memory(MemoryStore::new()));

        // Register the task definition in the store before generating instances.
        let every_minute = Schedule::interval(Duration::from_secs(60));
        let task = TaskDef::new("test", every_minute);
        let persisted = task.clone();
        backend
            .write(move |w| w.insert_task(&persisted))
            .await
            .unwrap();

        let generator = InstanceGenerator::new(Arc::clone(&backend));
        let slot = Utc::now() + chrono::Duration::minutes(5);

        // First request for the slot: an instance is created.
        let first = generator.ensure_instance(&task, slot).await.unwrap();
        assert!(first);

        // Same slot again: the existing instance is detected, no duplicate.
        let second = generator.ensure_instance(&task, slot).await.unwrap();
        assert!(!second);
    }
}