persistent_scheduler/core/
task.rs

1use crate::core::model::TaskMeta;
2use crate::core::retry::{RetryPolicy, RetryStrategy};
3use crate::core::task_kind::TaskKind;
4use serde::{de::DeserializeOwned, Serialize};
5use std::future::Future;
6use std::pin::Pin;
7
8use super::cron::{is_valid_cron_string, is_valid_timezone};
9
10pub type TaskFuture = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
11
12pub trait Task: Serialize + DeserializeOwned + 'static {
13    /// A unique identifier for this task.
14    ///
15    /// This name must be unique and is used to identify the task.
16    const TASK_KEY: &'static str;
17
18    /// The default queue associated with this task.
19    ///
20    /// This can be overridden at the individual task level. If a non-existent queue is specified,
21    /// the task will not be processed.
22    const TASK_QUEUE: &'static str;
23
24    /// The retry policy for this task.
25    ///
26    /// Defines the strategy and maximum retry attempts in case of failure.
27    /// Default is exponential backoff with a base of 2 and a maximum of 3 retries.
28    const RETRY_POLICY: RetryPolicy = RetryPolicy {
29        strategy: RetryStrategy::Exponential { base: 2 },
30        max_retries: Some(3),
31    };
32
33    /// Delay before executing a Once task, in seconds.
34    ///
35    /// Specifies the delay before starting a Once task after it is scheduled.
36    const DELAY_SECONDS: u32 = 3;
37
38    /// Executes the task with the given parameters.
39    ///
40    /// Contains the logic required to perform the task. Takes parameters of type `Self::Params`
41    /// that can be used during execution.
42    fn run(self) -> TaskFuture;
43
44    /// Validates the parameters based on the task type.
45    ///
46    /// Checks that the necessary fields for the specific `TaskKind` are provided.
47    fn validate(&self, kind: &TaskKind) -> Result<(), String> {
48        if Self::TASK_QUEUE.is_empty() {
49            return Err("TASK_QUEUE must not be empty.".into());
50        }
51
52        match kind {
53            TaskKind::Cron { schedule, timezone } => {
54                if !is_valid_cron_string(&*schedule) {
55                    return Err(
56                        format!("Invalid SCHEDULE: '{}' for Cron tasks.", schedule).into(),
57                    );
58                }
59
60                if !is_valid_timezone(&*timezone) {
61                    return Err(
62                        format!("Invalid TIMEZONE: '{}' for Cron tasks.", schedule).into(),
63                    );
64                }
65            }
66            TaskKind::Repeat { interval_seconds } => {
67                if interval_seconds <= &0 {
68                    return Err("A valid REPEAT_INTERVAL must be defined for Repeat tasks.".into());
69                }
70            }
71            TaskKind::Once => {
72                // No additional checks needed for Once tasks
73            }
74        }
75        Ok(())
76    }
77
78    /// Creates a new metadata entry for the task.
79    ///
80    /// This method generates a `TaskMetaEntity` instance based on the task's properties.
81    /// It validates required fields and panics if validation fails.
82    fn new_meta(&self, kind: TaskKind) -> TaskMeta {
83        self.validate(&kind).unwrap_or_else(|err| {
84            panic!(
85                "Validation failed for task '{}': {}. This indicates a programming error.",
86                Self::TASK_KEY,
87                err
88            )
89        });
90
91        let is_repeating = matches!(kind, TaskKind::Repeat { .. });
92        TaskMeta::new(
93            Self::TASK_KEY.to_owned(),
94            serde_json::to_string(&self).expect(
95                "Serialization failed: this should never happen if all fields are serializable",
96            ),
97            Self::TASK_QUEUE.to_owned(),
98            kind,
99            Self::RETRY_POLICY,
100            is_repeating,
101            Self::DELAY_SECONDS,
102        )
103    }
104}