persistent_scheduler/core/task.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
use crate::core::model::TaskMeta;
use crate::core::retry::{RetryPolicy, RetryStrategy};
use crate::core::task_kind::TaskKind;
use serde::{de::DeserializeOwned, Serialize};
use std::future::Future;
use std::pin::Pin;
use super::cron::{is_valid_cron_string, is_valid_timezone};
pub type TaskFuture = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
pub trait Task: Serialize + DeserializeOwned + 'static {
/// A unique identifier for this task.
///
/// This name must be unique and is used to identify the task.
const TASK_KEY: &'static str;
/// The default queue associated with this task.
///
/// This can be overridden at the individual task level. If a non-existent queue is specified,
/// the task will not be processed.
const TASK_QUEUE: &'static str;
/// The retry policy for this task.
///
/// Defines the strategy and maximum retry attempts in case of failure.
/// Default is exponential backoff with a base of 2 and a maximum of 5 retries.
const RETRY_POLICY: RetryPolicy = RetryPolicy {
strategy: RetryStrategy::Exponential { base: 2 },
max_retries: Some(3),
};
/// Delay before executing a Once task, in seconds.
///
/// Specifies the delay before starting a Once task after it is scheduled.
const DELAY_SECONDS: u32 = 3;
/// Executes the task with the given parameters.
///
/// Contains the logic required to perform the task. Takes parameters of type `Self::Params`
/// that can be used during execution.
fn run(self) -> TaskFuture;
/// Validates the parameters based on the task type.
///
/// Checks that the necessary fields for the specific `TaskKind` are provided.
fn validate(&self, kind: &TaskKind) -> Result<(), String> {
if Self::TASK_QUEUE.is_empty() {
return Err("TASK_QUEUE must not be empty.".into());
}
match kind {
TaskKind::Cron { schedule, timezone } => {
if !is_valid_cron_string(&*schedule) {
return Err(
format!("Invalid SCHEDULE: '{}' for Cron tasks.", schedule).into(),
);
}
if !is_valid_timezone(&*timezone) {
return Err(
format!("Invalid TIMEZONE: '{}' for Cron tasks.", schedule).into(),
);
}
}
TaskKind::Repeat { interval_seconds } => {
if interval_seconds <= &0 {
return Err("A valid REPEAT_INTERVAL must be defined for Repeat tasks.".into());
}
}
TaskKind::Once => {
// No additional checks needed for Once tasks
}
}
Ok(())
}
/// Creates a new metadata entry for the task.
///
/// This method generates a `TaskMetaEntity` instance based on the task's properties.
/// It validates required fields and panics if validation fails.
fn new_meta(&self, kind: TaskKind) -> TaskMeta {
self.validate(&kind).unwrap_or_else(|err| {
panic!(
"Validation failed for task '{}': {}. This indicates a programming error.",
Self::TASK_KEY,
err
)
});
let is_repeating = matches!(kind, TaskKind::Repeat { .. });
TaskMeta::new(
Self::TASK_KEY.to_owned(),
serde_json::to_string(&self).expect(
"Serialization failed: this should never happen if all fields are serializable",
),
Self::TASK_QUEUE.to_owned(),
kind,
Self::RETRY_POLICY,
is_repeating,
Self::DELAY_SECONDS,
)
}
}