// pollen-types 0.1.0
//
// Shared types for the Pollen distributed task scheduler.
// Documentation
//! Task-related types.

use crate::{InstanceId, NodeId, TaskId};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::time::Duration;

/// Task definition - the blueprint for a scheduled task.
///
/// A definition bundles *when* a task runs ([`Schedule`]) with *how* it runs
/// ([`TaskConfig`]), plus bookkeeping fields. Individual executions are
/// represented separately by [`TaskInstance`], which points back to a
/// definition through its `task_id` field.
///
/// [`TaskDef::new`] fills in every field; the `with_*` builder methods then
/// adjust the configuration.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TaskDef {
    /// Unique identifier.
    ///
    /// Generated with `TaskId::new()` by [`TaskDef::new`].
    pub id: TaskId,
    /// Human-readable name (must be unique).
    ///
    /// NOTE(review): uniqueness is not checked by this type; presumably it is
    /// enforced by whatever stores the definitions - confirm.
    pub name: String,
    /// Schedule configuration.
    pub schedule: Schedule,
    /// Task configuration.
    ///
    /// Starts as `TaskConfig::default()` in [`TaskDef::new`].
    pub config: TaskConfig,
    /// Whether the task is enabled.
    ///
    /// [`TaskDef::new`] creates tasks enabled.
    pub enabled: bool,
    /// HLC timestamp for CRDT ordering.
    ///
    /// Starts at 0 in [`TaskDef::new`].
    /// NOTE(review): presumably a hybrid-logical-clock value stamped by the
    /// replication layer; nothing in this type updates it - confirm.
    pub hlc_timestamp: u64,
    /// Version number for optimistic locking.
    ///
    /// Starts at 0 in [`TaskDef::new`]; this type never increments it itself.
    pub version: u64,
    /// Creation time.
    pub created_at: DateTime<Utc>,
    /// Last update time.
    ///
    /// Equal to `created_at` for a definition fresh out of [`TaskDef::new`].
    pub updated_at: DateTime<Utc>,
}

impl TaskDef {
    /// Create a new task definition.
    pub fn new(name: impl Into<String>, schedule: Schedule) -> Self {
        let now = Utc::now();
        Self {
            id: TaskId::new(),
            name: name.into(),
            schedule,
            config: TaskConfig::default(),
            enabled: true,
            hlc_timestamp: 0,
            version: 0,
            created_at: now,
            updated_at: now,
        }
    }

    /// Set the task configuration.
    pub fn with_config(mut self, config: TaskConfig) -> Self {
        self.config = config;
        self
    }

    /// Set the retry policy.
    pub fn with_retry(mut self, retry: RetryPolicy) -> Self {
        self.config.retry = retry;
        self
    }

    /// Set the execution timeout.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.config.timeout = timeout;
        self
    }

    /// Set the payload data.
    pub fn with_payload(mut self, payload: Bytes) -> Self {
        self.config.payload = Some(payload);
        self
    }
}

/// Schedule configuration for a task.
///
/// Describes when a task should run. The variants only carry data; this type
/// does not compute fire times or validate what it is given.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum Schedule {
    /// Cron expression (e.g., "0 9 * * *").
    ///
    /// Stored as an unparsed string.
    Cron(String),
    /// Fixed interval between executions.
    Interval(Duration),
    /// One-time execution at a specific time.
    ///
    /// [`Schedule::delay`] also produces this variant, from a relative delay.
    Once(DateTime<Utc>),
}

impl Schedule {
    /// Create a cron schedule.
    pub fn cron(expr: impl Into<String>) -> Self {
        Schedule::Cron(expr.into())
    }

    /// Create an interval schedule.
    pub fn interval(duration: Duration) -> Self {
        Schedule::Interval(duration)
    }

    /// Create a one-time schedule.
    pub fn once(at: DateTime<Utc>) -> Self {
        Schedule::Once(at)
    }

    /// Create a one-time schedule from a delay.
    pub fn delay(duration: Duration) -> Self {
        Schedule::Once(Utc::now() + chrono::Duration::from_std(duration).unwrap())
    }
}

/// Task configuration options.
///
/// The `Default` implementation gives a 300-second timeout, the default
/// [`RetryPolicy`], no payload and an empty `handler_id`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TaskConfig {
    /// Execution timeout.
    pub timeout: Duration,
    /// Retry policy.
    pub retry: RetryPolicy,
    /// Optional payload data.
    ///
    /// Opaque bytes as far as this type is concerned; `None` when the task
    /// carries no payload.
    pub payload: Option<Bytes>,
    /// Handler identifier (for routing to the correct handler).
    ///
    /// Empty by default.
    /// NOTE(review): how an empty id is routed is not visible here - confirm.
    pub handler_id: String,
}

impl Default for TaskConfig {
    fn default() -> Self {
        Self {
            timeout: Duration::from_secs(300), // 5 minutes
            retry: RetryPolicy::default(),
            payload: None,
            handler_id: String::new(),
        }
    }
}

/// Retry policy for failed tasks.
///
/// The wait before attempt `n` (for `n >= 1`) is
/// `initial_delay * multiplier^(n - 1)`, capped at `max_delay`; see
/// [`RetryPolicy::delay_for_attempt`]. The `Default` implementation gives
/// 3 attempts, a 1-second initial delay, a 300-second cap and a multiplier
/// of 2.0.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Maximum number of retry attempts.
    ///
    /// `TaskInstance::can_retry` allows a retry only while the instance's
    /// `attempt` is below this value, so 0 disables retries.
    pub max_attempts: u32,
    /// Initial delay before first retry.
    pub initial_delay: Duration,
    /// Maximum delay between retries.
    pub max_delay: Duration,
    /// Backoff multiplier.
    ///
    /// 1.0 keeps the delay constant; 2.0 doubles it on each attempt.
    pub multiplier: f64,
}

impl Default for RetryPolicy {
    fn default() -> Self {
        Self {
            max_attempts: 3,
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(300),
            multiplier: 2.0,
        }
    }
}

impl RetryPolicy {
    /// Create a policy with no retries.
    ///
    /// `max_attempts` is 0; the remaining fields keep their default values.
    pub fn none() -> Self {
        Self {
            max_attempts: 0,
            ..Default::default()
        }
    }

    /// Create an exponential backoff policy.
    ///
    /// Delays start at `initial_delay` and double on every further attempt
    /// (multiplier 2.0), capped at the default `max_delay`.
    pub fn exponential(max_attempts: u32, initial_delay: Duration) -> Self {
        Self {
            max_attempts,
            initial_delay,
            multiplier: 2.0,
            ..Default::default()
        }
    }

    /// Create a fixed delay policy.
    ///
    /// Every retry waits exactly `delay`: the multiplier is 1.0 and the cap
    /// equals the initial delay.
    pub fn fixed(max_attempts: u32, delay: Duration) -> Self {
        Self {
            max_attempts,
            initial_delay: delay,
            max_delay: delay,
            multiplier: 1.0,
        }
    }

    /// Calculate delay for a given attempt number.
    ///
    /// Attempt 0 (the initial run) never waits. For `attempt >= 1` the delay
    /// is `initial_delay * multiplier^(attempt - 1)`, limited to the range
    /// `[0, max_delay]`.
    ///
    /// This function does not panic, whatever the attempt number or policy
    /// values:
    /// * a product that is too large (including infinite) yields `max_delay`;
    /// * a negative product (negative multiplier) yields a zero delay;
    /// * a NaN product (NaN multiplier) yields `max_delay`, the most
    ///   conservative choice for a misconfigured policy.
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        if attempt == 0 || self.initial_delay == Duration::ZERO {
            // A zero base stays zero no matter how large the factor grows;
            // returning early also avoids the NaN from `0.0 * inf`.
            return Duration::ZERO;
        }

        // `powi` takes an i32. Saturate instead of casting so attempt numbers
        // above i32::MAX cannot wrap to a negative exponent.
        let exponent = (attempt - 1).min(i32::MAX as u32) as i32;
        let ceiling = self.max_delay.as_secs_f64();
        let scaled = self.initial_delay.as_secs_f64() * self.multiplier.powi(exponent);

        // Apply the cap while still in f64 space: `Duration::from_secs_f64`
        // panics on values that are too large, infinite, NaN or negative.
        if scaled.is_nan() || scaled >= ceiling {
            return self.max_delay;
        }
        if scaled <= 0.0 {
            return Duration::ZERO;
        }

        // `scaled` is finite, positive and below the cap here. The `min`
        // guards against f64 rounding nudging the result past `max_delay`.
        Duration::from_secs_f64(scaled).min(self.max_delay)
    }
}

/// Task instance - a single execution of a task.
///
/// Created in the `Pending` state by [`TaskInstance::new`] with every
/// optional field empty and both counters at zero. This type only holds the
/// state; moving an instance between statuses is done by code elsewhere.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TaskInstance {
    /// Unique identifier for this instance.
    pub id: InstanceId,
    /// The task this instance belongs to.
    pub task_id: TaskId,
    /// Scheduled execution time.
    ///
    /// [`TaskInstance::is_ready`] compares this against the current UTC time.
    pub scheduled_at: DateTime<Utc>,
    /// Current status.
    pub status: TaskStatus,
    /// Node that claimed this instance.
    ///
    /// `None` until a node claims it.
    pub claimed_by: Option<NodeId>,
    /// Version for optimistic locking.
    ///
    /// Starts at 0.
    /// NOTE(review): presumably bumped on each claim attempt - confirm.
    pub claim_version: u64,
    /// Actual start time.
    pub started_at: Option<DateTime<Utc>>,
    /// Completion time.
    pub completed_at: Option<DateTime<Utc>>,
    /// Result data (if successful).
    pub result: Option<Bytes>,
    /// Error message (if failed).
    pub error: Option<String>,
    /// Current attempt number.
    ///
    /// Starts at 0; [`TaskInstance::can_retry`] compares it against
    /// `RetryPolicy::max_attempts`.
    pub attempt: u32,
}

impl TaskInstance {
    /// Create a new pending task instance.
    pub fn new(task_id: TaskId, scheduled_at: DateTime<Utc>) -> Self {
        Self {
            id: InstanceId::new(),
            task_id,
            scheduled_at,
            status: TaskStatus::Pending,
            claimed_by: None,
            claim_version: 0,
            started_at: None,
            completed_at: None,
            result: None,
            error: None,
            attempt: 0,
        }
    }

    /// Check if this instance is ready to execute.
    pub fn is_ready(&self) -> bool {
        self.status == TaskStatus::Pending && self.scheduled_at <= Utc::now()
    }

    /// Check if this instance can be retried.
    pub fn can_retry(&self, policy: &RetryPolicy) -> bool {
        self.status == TaskStatus::Failed && self.attempt < policy.max_attempts
    }
}

/// Status of a task instance.
///
/// Each variant has a lowercase string form, produced by
/// [`TaskStatus::as_str`] and parsed back by the `FromStr` implementation.
/// Only `Success` and `Cancelled` are reported as terminal by
/// [`TaskStatus::is_terminal`]; `Failed` is not, because a failed instance
/// may still be retried.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskStatus {
    /// Waiting to be claimed.
    ///
    /// The status every new [`TaskInstance`] starts in.
    Pending,
    /// Claimed by a node, waiting to start.
    Claimed,
    /// Currently executing.
    Running,
    /// Completed successfully.
    Success,
    /// Failed (may be retried).
    Failed,
    /// Cancelled by user.
    Cancelled,
}

impl TaskStatus {
    /// Check if this is a terminal status.
    pub fn is_terminal(&self) -> bool {
        matches!(self, TaskStatus::Success | TaskStatus::Cancelled)
    }

    /// Convert to string representation.
    pub fn as_str(&self) -> &'static str {
        match self {
            TaskStatus::Pending => "pending",
            TaskStatus::Claimed => "claimed",
            TaskStatus::Running => "running",
            TaskStatus::Success => "success",
            TaskStatus::Failed => "failed",
            TaskStatus::Cancelled => "cancelled",
        }
    }
}

impl std::str::FromStr for TaskStatus {
    type Err = String;

    /// Parse the lowercase status names produced by `TaskStatus::as_str`.
    ///
    /// Matching is exact (case-sensitive, no trimming). Any other input
    /// yields an error message that includes the rejected text.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let status = match s {
            "pending" => TaskStatus::Pending,
            "claimed" => TaskStatus::Claimed,
            "running" => TaskStatus::Running,
            "success" => TaskStatus::Success,
            "failed" => TaskStatus::Failed,
            "cancelled" => TaskStatus::Cancelled,
            other => return Err(format!("unknown task status: {}", other)),
        };
        Ok(status)
    }
}

/// Execution context passed to task handlers.
///
/// Carries the identity and timing of the instance being executed together
/// with its payload. Unlike the other types in this file it derives only
/// `Clone` and `Debug`, so it is not serializable.
#[derive(Clone, Debug)]
pub struct TaskContext {
    /// Task ID.
    pub task_id: TaskId,
    /// Instance ID.
    pub instance_id: InstanceId,
    /// Scheduled execution time.
    pub scheduled_at: DateTime<Utc>,
    /// Current attempt number.
    pub attempt: u32,
    /// Payload data.
    ///
    /// Private: read it through [`TaskContext::payload_bytes`] (raw bytes) or
    /// [`TaskContext::payload`] (bincode-decoded value).
    payload: Bytes,
}

impl TaskContext {
    /// Create a new task context.
    pub fn new(
        task_id: TaskId,
        instance_id: InstanceId,
        scheduled_at: DateTime<Utc>,
        attempt: u32,
        payload: Bytes,
    ) -> Self {
        Self {
            task_id,
            instance_id,
            scheduled_at,
            attempt,
            payload,
        }
    }

    /// Get the raw payload bytes.
    pub fn payload_bytes(&self) -> &Bytes {
        &self.payload
    }

    /// Deserialize the payload.
    pub fn payload<T: serde::de::DeserializeOwned>(&self) -> Result<T, bincode::Error> {
        bincode::deserialize(&self.payload)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Exponential backoff from one second doubles on each attempt, and
    /// attempt zero never waits.
    #[test]
    fn test_retry_policy_delay() {
        let policy = RetryPolicy::exponential(5, Duration::from_secs(1));

        let expectations = [
            (0, Duration::ZERO),
            (1, Duration::from_secs(1)),
            (2, Duration::from_secs(2)),
            (3, Duration::from_secs(4)),
        ];

        for &(attempt, expected) in expectations.iter() {
            assert_eq!(policy.delay_for_attempt(attempt), expected);
        }
    }

    /// Lowercase status names parse back into the matching variants.
    #[test]
    fn test_task_status_parse() {
        let pending: TaskStatus = "pending".parse().unwrap();
        assert_eq!(pending, TaskStatus::Pending);

        let running: TaskStatus = "running".parse().unwrap();
        assert_eq!(running, TaskStatus::Running);
    }

    /// A pending instance is ready once its scheduled time has passed, and
    /// not before.
    #[test]
    fn test_task_instance_ready() {
        let one_hour = chrono::Duration::hours(1);
        let task_id = TaskId::new();

        let overdue = TaskInstance::new(task_id.clone(), Utc::now() - one_hour);
        let upcoming = TaskInstance::new(task_id, Utc::now() + one_hour);

        assert!(overdue.is_ready());
        assert!(!upcoming.is_ready());
    }
}