Skip to main content

pollen_types/
task.rs

1//! Task-related types.
2
3use crate::{InstanceId, NodeId, TaskId};
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use std::time::Duration;
8
9/// Task definition - the blueprint for a scheduled task.
10#[derive(Clone, Debug, Serialize, Deserialize)]
11pub struct TaskDef {
12    /// Unique identifier.
13    pub id: TaskId,
14    /// Human-readable name (must be unique).
15    pub name: String,
16    /// Schedule configuration.
17    pub schedule: Schedule,
18    /// Task configuration.
19    pub config: TaskConfig,
20    /// Whether the task is enabled.
21    pub enabled: bool,
22    /// HLC timestamp for CRDT ordering.
23    pub hlc_timestamp: u64,
24    /// Version number for optimistic locking.
25    pub version: u64,
26    /// Creation time.
27    pub created_at: DateTime<Utc>,
28    /// Last update time.
29    pub updated_at: DateTime<Utc>,
30}
31
32impl TaskDef {
33    /// Create a new task definition.
34    pub fn new(name: impl Into<String>, schedule: Schedule) -> Self {
35        let now = Utc::now();
36        Self {
37            id: TaskId::new(),
38            name: name.into(),
39            schedule,
40            config: TaskConfig::default(),
41            enabled: true,
42            hlc_timestamp: 0,
43            version: 0,
44            created_at: now,
45            updated_at: now,
46        }
47    }
48
49    /// Set the task configuration.
50    pub fn with_config(mut self, config: TaskConfig) -> Self {
51        self.config = config;
52        self
53    }
54
55    /// Set the retry policy.
56    pub fn with_retry(mut self, retry: RetryPolicy) -> Self {
57        self.config.retry = retry;
58        self
59    }
60
61    /// Set the execution timeout.
62    pub fn with_timeout(mut self, timeout: Duration) -> Self {
63        self.config.timeout = timeout;
64        self
65    }
66
67    /// Set the payload data.
68    pub fn with_payload(mut self, payload: Bytes) -> Self {
69        self.config.payload = Some(payload);
70        self
71    }
72}
73
74/// Schedule configuration for a task.
75#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
76pub enum Schedule {
77    /// Cron expression (e.g., "0 9 * * *").
78    Cron(String),
79    /// Fixed interval between executions.
80    Interval(Duration),
81    /// One-time execution at a specific time.
82    Once(DateTime<Utc>),
83}
84
85impl Schedule {
86    /// Create a cron schedule.
87    pub fn cron(expr: impl Into<String>) -> Self {
88        Schedule::Cron(expr.into())
89    }
90
91    /// Create an interval schedule.
92    pub fn interval(duration: Duration) -> Self {
93        Schedule::Interval(duration)
94    }
95
96    /// Create a one-time schedule.
97    pub fn once(at: DateTime<Utc>) -> Self {
98        Schedule::Once(at)
99    }
100
101    /// Create a one-time schedule from a delay.
102    pub fn delay(duration: Duration) -> Self {
103        Schedule::Once(Utc::now() + chrono::Duration::from_std(duration).unwrap())
104    }
105}
106
107/// Task configuration options.
108#[derive(Clone, Debug, Serialize, Deserialize)]
109pub struct TaskConfig {
110    /// Execution timeout.
111    pub timeout: Duration,
112    /// Retry policy.
113    pub retry: RetryPolicy,
114    /// Optional payload data.
115    pub payload: Option<Bytes>,
116    /// Handler identifier (for routing to the correct handler).
117    pub handler_id: String,
118}
119
120impl Default for TaskConfig {
121    fn default() -> Self {
122        Self {
123            timeout: Duration::from_secs(300), // 5 minutes
124            retry: RetryPolicy::default(),
125            payload: None,
126            handler_id: String::new(),
127        }
128    }
129}
130
131/// Retry policy for failed tasks.
132#[derive(Clone, Debug, Serialize, Deserialize)]
133pub struct RetryPolicy {
134    /// Maximum number of retry attempts.
135    pub max_attempts: u32,
136    /// Initial delay before first retry.
137    pub initial_delay: Duration,
138    /// Maximum delay between retries.
139    pub max_delay: Duration,
140    /// Backoff multiplier.
141    pub multiplier: f64,
142}
143
144impl Default for RetryPolicy {
145    fn default() -> Self {
146        Self {
147            max_attempts: 3,
148            initial_delay: Duration::from_secs(1),
149            max_delay: Duration::from_secs(300),
150            multiplier: 2.0,
151        }
152    }
153}
154
155impl RetryPolicy {
156    /// Create a policy with no retries.
157    pub fn none() -> Self {
158        Self {
159            max_attempts: 0,
160            ..Default::default()
161        }
162    }
163
164    /// Create an exponential backoff policy.
165    pub fn exponential(max_attempts: u32, initial_delay: Duration) -> Self {
166        Self {
167            max_attempts,
168            initial_delay,
169            multiplier: 2.0,
170            ..Default::default()
171        }
172    }
173
174    /// Create a fixed delay policy.
175    pub fn fixed(max_attempts: u32, delay: Duration) -> Self {
176        Self {
177            max_attempts,
178            initial_delay: delay,
179            max_delay: delay,
180            multiplier: 1.0,
181        }
182    }
183
184    /// Calculate delay for a given attempt number.
185    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
186        if attempt == 0 {
187            return Duration::ZERO;
188        }
189        let delay = self.initial_delay.as_secs_f64() * self.multiplier.powi(attempt as i32 - 1);
190        let delay = Duration::from_secs_f64(delay);
191        std::cmp::min(delay, self.max_delay)
192    }
193}
194
195/// Task instance - a single execution of a task.
196#[derive(Clone, Debug, Serialize, Deserialize)]
197pub struct TaskInstance {
198    /// Unique identifier for this instance.
199    pub id: InstanceId,
200    /// The task this instance belongs to.
201    pub task_id: TaskId,
202    /// Scheduled execution time.
203    pub scheduled_at: DateTime<Utc>,
204    /// Current status.
205    pub status: TaskStatus,
206    /// Node that claimed this instance.
207    pub claimed_by: Option<NodeId>,
208    /// Version for optimistic locking.
209    pub claim_version: u64,
210    /// Actual start time.
211    pub started_at: Option<DateTime<Utc>>,
212    /// Completion time.
213    pub completed_at: Option<DateTime<Utc>>,
214    /// Result data (if successful).
215    pub result: Option<Bytes>,
216    /// Error message (if failed).
217    pub error: Option<String>,
218    /// Current attempt number.
219    pub attempt: u32,
220}
221
222impl TaskInstance {
223    /// Create a new pending task instance.
224    pub fn new(task_id: TaskId, scheduled_at: DateTime<Utc>) -> Self {
225        Self {
226            id: InstanceId::new(),
227            task_id,
228            scheduled_at,
229            status: TaskStatus::Pending,
230            claimed_by: None,
231            claim_version: 0,
232            started_at: None,
233            completed_at: None,
234            result: None,
235            error: None,
236            attempt: 0,
237        }
238    }
239
240    /// Check if this instance is ready to execute.
241    pub fn is_ready(&self) -> bool {
242        self.status == TaskStatus::Pending && self.scheduled_at <= Utc::now()
243    }
244
245    /// Check if this instance can be retried.
246    pub fn can_retry(&self, policy: &RetryPolicy) -> bool {
247        self.status == TaskStatus::Failed && self.attempt < policy.max_attempts
248    }
249}
250
251/// Status of a task instance.
252#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
253pub enum TaskStatus {
254    /// Waiting to be claimed.
255    Pending,
256    /// Claimed by a node, waiting to start.
257    Claimed,
258    /// Currently executing.
259    Running,
260    /// Completed successfully.
261    Success,
262    /// Failed (may be retried).
263    Failed,
264    /// Cancelled by user.
265    Cancelled,
266}
267
268impl TaskStatus {
269    /// Check if this is a terminal status.
270    pub fn is_terminal(&self) -> bool {
271        matches!(self, TaskStatus::Success | TaskStatus::Cancelled)
272    }
273
274    /// Convert to string representation.
275    pub fn as_str(&self) -> &'static str {
276        match self {
277            TaskStatus::Pending => "pending",
278            TaskStatus::Claimed => "claimed",
279            TaskStatus::Running => "running",
280            TaskStatus::Success => "success",
281            TaskStatus::Failed => "failed",
282            TaskStatus::Cancelled => "cancelled",
283        }
284    }
285}
286
287impl std::str::FromStr for TaskStatus {
288    type Err = String;
289
290    fn from_str(s: &str) -> Result<Self, Self::Err> {
291        match s {
292            "pending" => Ok(TaskStatus::Pending),
293            "claimed" => Ok(TaskStatus::Claimed),
294            "running" => Ok(TaskStatus::Running),
295            "success" => Ok(TaskStatus::Success),
296            "failed" => Ok(TaskStatus::Failed),
297            "cancelled" => Ok(TaskStatus::Cancelled),
298            _ => Err(format!("unknown task status: {}", s)),
299        }
300    }
301}
302
303/// Execution context passed to task handlers.
304#[derive(Clone, Debug)]
305pub struct TaskContext {
306    /// Task ID.
307    pub task_id: TaskId,
308    /// Instance ID.
309    pub instance_id: InstanceId,
310    /// Scheduled execution time.
311    pub scheduled_at: DateTime<Utc>,
312    /// Current attempt number.
313    pub attempt: u32,
314    /// Payload data.
315    payload: Bytes,
316}
317
318impl TaskContext {
319    /// Create a new task context.
320    pub fn new(
321        task_id: TaskId,
322        instance_id: InstanceId,
323        scheduled_at: DateTime<Utc>,
324        attempt: u32,
325        payload: Bytes,
326    ) -> Self {
327        Self {
328            task_id,
329            instance_id,
330            scheduled_at,
331            attempt,
332            payload,
333        }
334    }
335
336    /// Get the raw payload bytes.
337    pub fn payload_bytes(&self) -> &Bytes {
338        &self.payload
339    }
340
341    /// Deserialize the payload.
342    pub fn payload<T: serde::de::DeserializeOwned>(&self) -> Result<T, bincode::Error> {
343        bincode::deserialize(&self.payload)
344    }
345}
346
347#[cfg(test)]
348mod tests {
349    use super::*;
350
351    #[test]
352    fn test_retry_policy_delay() {
353        let policy = RetryPolicy::exponential(5, Duration::from_secs(1));
354
355        assert_eq!(policy.delay_for_attempt(0), Duration::ZERO);
356        assert_eq!(policy.delay_for_attempt(1), Duration::from_secs(1));
357        assert_eq!(policy.delay_for_attempt(2), Duration::from_secs(2));
358        assert_eq!(policy.delay_for_attempt(3), Duration::from_secs(4));
359    }
360
361    #[test]
362    fn test_task_status_parse() {
363        assert_eq!("pending".parse::<TaskStatus>().unwrap(), TaskStatus::Pending);
364        assert_eq!("running".parse::<TaskStatus>().unwrap(), TaskStatus::Running);
365    }
366
367    #[test]
368    fn test_task_instance_ready() {
369        let task_id = TaskId::new();
370        let past = Utc::now() - chrono::Duration::hours(1);
371        let future = Utc::now() + chrono::Duration::hours(1);
372
373        let past_instance = TaskInstance::new(task_id.clone(), past);
374        let future_instance = TaskInstance::new(task_id, future);
375
376        assert!(past_instance.is_ready());
377        assert!(!future_instance.is_ready());
378    }
379}