1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
//! Schedules for repeated task executions.

use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use serde_json::Value;
use time::Duration;

use crate::task::TaskDefinition;

/// A schedule supports repeatedly spawning jobs
/// based on the given settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[must_use]
pub struct ScheduleDefinition {
    /// The task schedule policy.
    pub policy: SchedulePolicy,
    /// Whether to immediately spawn a task
    /// when the schedule is first processed.
    pub immediate: bool,
    /// The policy for missed tasks.
    #[serde(default)]
    pub missed_tasks: MissedTasksPolicy,
    /// Parameters for newly spawned tasks.
    pub new_task: NewTask,
    /// Schedule labels.
    #[serde(default)]
    pub labels: HashMap<String, Value>,
}

impl ScheduleDefinition {
    /// Create a new schedule.
    pub fn new(policy: SchedulePolicy, new_task: NewTask) -> Self {
        Self {
            missed_tasks: Default::default(),
            policy,
            new_task,
            immediate: false,
            labels: Default::default(),
        }
    }

    /// Set whether a task should be immediately scheduled
    /// when the schedule is added.
    pub fn immediate(mut self, immediate: bool) -> Self {
        self.immediate = immediate;
        self
    }

    /// Set the missed tasks policy.
    pub fn on_missed_tasks(mut self, policy: MissedTasksPolicy) -> Self {
        self.missed_tasks = policy;
        self
    }

    /// Set a label value.
    ///
    /// # Panics
    ///
    /// Panics if the value is not JSON-serializable.
    pub fn with_label(mut self, name: &str, value: impl Serialize) -> Self {
        self.labels
            .insert(name.into(), serde_json::to_value(value).unwrap());
        self
    }
}

/// Task spawning policy of the schedule.
///
/// It is used to configure how and when new
/// tasks are spawned.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[must_use]
pub enum SchedulePolicy {
    /// Repeatedly spawn tasks.
    Repeat {
        /// The interval between tasks.
        interval_ns: u64,
    },
    /// Repeat using a cron expression.
    Cron {
        /// A cron expression.
        expr: String,
    },
}

impl SchedulePolicy {
    /// Repeat tasks with the given interval.
    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    pub fn repeat(interval: Duration) -> Self {
        Self::Repeat {
            interval_ns: interval.whole_nanoseconds() as _,
        }
    }

    /// Repeat tasks according to the given cron expression.
    ///
    /// # Errors
    ///
    /// Returns an error if the cron expression is not valid.
    pub fn cron(expr: &str) -> Result<Self, cron::error::Error> {
        expr.parse::<cron::Schedule>()?;

        Ok(Self::Cron {
            expr: expr.to_string(),
        })
    }
}

/// The policy that is used to determine
/// the execution target time of newly spawned
/// tasks when the schedule is behind.
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MissedTasksPolicy {
    /// Queue all missed tasks.
    Burst,
    /// Skip all missed tasks and set
    /// the next task at a multiple of the interval.
    #[default]
    Skip,
}

/// Parameters for newly spawned tasks.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NewTask {
    /// Spawn the same task repeatedly.
    Repeat {
        /// The task to be repeated.
        task: TaskDefinition,
    },
}

impl NewTask {
    /// Use the same task for each run.
    #[must_use]
    pub fn repeat<T>(task: TaskDefinition<T>) -> Self {
        Self::Repeat { task: task.cast() }
    }
}