persistent_scheduler/core/
task.rs

1use crate::core::model::TaskMeta;
2use crate::core::retry::{RetryPolicy, RetryStrategy};
3use crate::core::task_kind::TaskKind;
4use serde::{de::DeserializeOwned, Serialize};
5use std::future::Future;
6use std::pin::Pin;
7
8use super::cron::{is_valid_cron_string, is_valid_timezone};
9
10pub type TaskFuture = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
11
12pub trait Task: Serialize + DeserializeOwned + 'static {
13    /// A unique identifier for this task.
14    ///
15    /// This name must be unique and is used to identify the task.
16    const TASK_KEY: &'static str;
17
18    /// The default queue associated with this task.
19    ///
20    /// This can be overridden at the individual task level. If a non-existent queue is specified,
21    /// the task will not be processed.
22    const TASK_QUEUE: &'static str;
23
24    /// Returns the retry policy for this task instance.
25    /// Default is exponential backoff with base 2 and max 3 retries.
26    fn retry_policy(&self) -> RetryPolicy {
27        RetryPolicy {
28            strategy: RetryStrategy::Exponential { base: 2 },
29            max_retries: Some(3),
30        }
31    }
32
33    /// Returns the delay in seconds before executing a Once task.
34    /// Default is 3 seconds.
35    fn delay_seconds(&self) -> u32 {
36        3
37    }
38
39    /// Executes the task with the given parameters.
40    ///
41    /// Contains the logic required to perform the task. Takes parameters of type `Self::Params`
42    /// that can be used during execution.
43    fn run(self) -> TaskFuture;
44
45    /// Validates the parameters based on the task type.
46    ///
47    /// Checks that the necessary fields for the specific `TaskKind` are provided.
48    fn validate(&self, kind: &TaskKind) -> Result<(), String> {
49        if Self::TASK_QUEUE.is_empty() {
50            return Err("TASK_QUEUE must not be empty.".into());
51        }
52
53        match kind {
54            TaskKind::Cron { schedule, timezone } => {
55                if !is_valid_cron_string(&*schedule) {
56                    return Err(format!("Invalid SCHEDULE: '{}' for Cron tasks.", schedule).into());
57                }
58
59                if !is_valid_timezone(&*timezone) {
60                    return Err(format!("Invalid TIMEZONE: '{}' for Cron tasks.", schedule).into());
61                }
62            }
63            TaskKind::Repeat { interval_seconds } => {
64                if interval_seconds <= &0 {
65                    return Err("A valid REPEAT_INTERVAL must be defined for Repeat tasks.".into());
66                }
67            }
68            TaskKind::Once => {
69                // No additional checks needed for Once tasks
70            }
71        }
72        Ok(())
73    }
74
75    /// Creates a new metadata entry for the task.
76    ///
77    /// This method generates a `TaskMetaEntity` instance based on the task's properties.
78    /// It validates required fields and panics if validation fails.
79    fn new_meta(&self, kind: TaskKind) -> TaskMeta {
80        self.validate(&kind).unwrap_or_else(|err| {
81            panic!(
82                "Validation failed for task '{}': {}. This indicates a programming error.",
83                Self::TASK_KEY,
84                err
85            )
86        });
87
88        let is_repeating = matches!(kind, TaskKind::Repeat { .. });
89        TaskMeta::new(
90            Self::TASK_KEY.to_owned(),
91            serde_json::to_string(&self).expect(
92                "Serialization failed: this should never happen if all fields are serializable",
93            ),
94            Self::TASK_QUEUE.to_owned(),
95            kind,
96            self.retry_policy(),
97            is_repeating,
98            self.delay_seconds(),
99        )
100    }
101}