Skip to main content

aion_core/
schedule.rs

1//! Schedule identifiers, trigger specifications, and persisted schedule configuration.
2
3use std::fmt;
4use std::str::FromStr;
5use std::time::Duration;
6
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9
10use crate::Payload;
11
12/// Identifier for a persisted schedule resource.
13#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq, Eq, Hash)]
14pub struct ScheduleId(Uuid);
15
16impl ScheduleId {
17    /// Creates a schedule identifier from an existing UUID.
18    #[must_use]
19    pub const fn new(id: Uuid) -> Self {
20        Self(id)
21    }
22
23    /// Creates a schedule identifier with a random version 4 UUID.
24    #[must_use]
25    pub fn new_v4() -> Self {
26        Self(Uuid::new_v4())
27    }
28
29    /// Returns the UUID backing this identifier.
30    #[must_use]
31    pub const fn as_uuid(&self) -> Uuid {
32        self.0
33    }
34}
35
36impl fmt::Display for ScheduleId {
37    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
38        self.0.fmt(formatter)
39    }
40}
41
42impl FromStr for ScheduleId {
43    type Err = uuid::Error;
44
45    fn from_str(value: &str) -> Result<Self, Self::Err> {
46        Uuid::parse_str(value).map(Self)
47    }
48}
49
50/// Trigger definition for a persisted schedule.
51#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq, Eq)]
52pub enum TriggerSpec {
53    /// Fire according to a cron expression.
54    Cron {
55        /// Cron expression to evaluate in later scheduling layers.
56        expression: String,
57    },
58    /// Fire at a fixed interval.
59    Interval {
60        /// Duration between firings.
61        #[ts(type = "{ secs: number, nanos: number }")]
62        period: Duration,
63    },
64}
65
66/// Policy for handling a schedule tick while an earlier run is still active.
67#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, Default, PartialEq, Eq)]
68pub enum OverlapPolicy {
69    /// Skip ticks that overlap an active run.
70    #[default]
71    Skip,
72    /// Keep at most one buffered tick while a run is active.
73    BufferOne,
74    /// Cancel the previous run before starting the new one.
75    CancelPrevious,
76    /// Allow every tick to start a run.
77    AllowAll,
78}
79
80/// Policy for handling schedule ticks missed while the scheduler was unavailable.
81#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, Default, PartialEq, Eq)]
82pub enum CatchUpPolicy {
83    /// Start every missed tick.
84    All,
85    /// Start one representative missed tick.
86    #[default]
87    One,
88    /// Skip missed ticks.
89    Skip,
90}
91
92/// Persisted configuration for a schedule resource.
93#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq)]
94pub struct ScheduleConfig {
95    /// Trigger used to compute eligible fire times.
96    pub trigger: TriggerSpec,
97    /// Policy for overlapping firings.
98    #[serde(default)]
99    pub overlap_policy: OverlapPolicy,
100    /// Policy for missed firings.
101    #[serde(default)]
102    pub catch_up_policy: CatchUpPolicy,
103    /// Workflow type started when the schedule fires.
104    pub workflow_type: String,
105    /// Opaque workflow input supplied to triggered executions.
106    pub input: Payload,
107    /// Typed search attributes recorded on every triggered execution.
108    #[serde(default)]
109    pub search_attributes: std::collections::HashMap<String, crate::SearchAttributeValue>,
110}
111
112#[cfg(test)]
113mod tests {
114    use std::collections::HashMap;
115    use std::str::FromStr;
116    use std::time::Duration;
117
118    use serde::de::DeserializeOwned;
119    use serde_json::json;
120
121    use super::{CatchUpPolicy, OverlapPolicy, ScheduleConfig, ScheduleId, TriggerSpec};
122    use crate::Payload;
123
124    fn round_trip<T>(value: &T) -> Result<(), serde_json::Error>
125    where
126        T: DeserializeOwned + PartialEq + serde::Serialize + std::fmt::Debug,
127    {
128        let json = serde_json::to_string(value)?;
129        let decoded = serde_json::from_str::<T>(&json)?;
130        assert_eq!(*value, decoded);
131        Ok(())
132    }
133
134    fn payload(label: &str) -> Result<Payload, crate::PayloadError> {
135        Payload::from_json(&json!({ "label": label }))
136    }
137
138    fn config() -> Result<ScheduleConfig, crate::PayloadError> {
139        Ok(ScheduleConfig {
140            trigger: TriggerSpec::Cron {
141                expression: String::from("0 0 * * *"),
142            },
143            overlap_policy: OverlapPolicy::Skip,
144            catch_up_policy: CatchUpPolicy::One,
145            workflow_type: String::from("checkout"),
146            input: payload("schedule-input")?,
147            search_attributes: HashMap::from([(
148                String::from("aion.namespace"),
149                crate::SearchAttributeValue::String(String::from("tenant-a")),
150            )]),
151        })
152    }
153
154    #[test]
155    fn schedule_id_round_trips_through_json() -> Result<(), Box<dyn std::error::Error>> {
156        round_trip(&ScheduleId::new_v4())?;
157        Ok(())
158    }
159
160    #[test]
161    fn schedule_id_is_a_hash_map_key() {
162        let schedule_id = ScheduleId::new_v4();
163
164        let mut schedules = HashMap::new();
165        schedules.insert(schedule_id.clone(), "schedule");
166        assert_eq!(schedules.get(&schedule_id), Some(&"schedule"));
167    }
168
169    #[test]
170    fn schedule_id_display_and_from_str_use_uuid_format() -> Result<(), uuid::Error> {
171        let schedule_id = ScheduleId::new(uuid::Uuid::nil());
172
173        assert_eq!(
174            schedule_id.to_string(),
175            "00000000-0000-0000-0000-000000000000"
176        );
177        assert_eq!(
178            ScheduleId::from_str("00000000-0000-0000-0000-000000000000")?,
179            schedule_id
180        );
181        Ok(())
182    }
183
184    #[test]
185    fn trigger_specs_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
186        round_trip(&TriggerSpec::Cron {
187            expression: String::from("*/5 * * * *"),
188        })?;
189        round_trip(&TriggerSpec::Interval {
190            period: Duration::from_secs(300),
191        })?;
192        Ok(())
193    }
194
195    #[test]
196    fn overlap_policies_round_trip_through_json() -> Result<(), serde_json::Error> {
197        for policy in [
198            OverlapPolicy::Skip,
199            OverlapPolicy::BufferOne,
200            OverlapPolicy::CancelPrevious,
201            OverlapPolicy::AllowAll,
202        ] {
203            round_trip(&policy)?;
204        }
205        Ok(())
206    }
207
208    #[test]
209    fn catch_up_policies_round_trip_through_json() -> Result<(), serde_json::Error> {
210        for policy in [CatchUpPolicy::All, CatchUpPolicy::One, CatchUpPolicy::Skip] {
211            round_trip(&policy)?;
212        }
213        Ok(())
214    }
215
216    #[test]
217    fn schedule_config_round_trips_through_json() -> Result<(), Box<dyn std::error::Error>> {
218        round_trip(&config()?)?;
219        Ok(())
220    }
221
222    #[test]
223    fn schedule_config_defaults_missing_policy_fields() -> Result<(), Box<dyn std::error::Error>> {
224        let value = json!({
225            "trigger": { "Cron": { "expression": "0 0 * * *" } },
226            "workflow_type": "checkout",
227            "input": payload("schedule-input")?,
228        });
229
230        let config = serde_json::from_value::<ScheduleConfig>(value)?;
231
232        assert_eq!(config.overlap_policy, OverlapPolicy::Skip);
233        assert_eq!(config.catch_up_policy, CatchUpPolicy::One);
234        Ok(())
235    }
236}