wasmer_config/app/
job.rs

1use std::{borrow::Cow, fmt::Display, str::FromStr};
2
3use anyhow::anyhow;
4use serde::{de::Error, Deserialize, Serialize};
5
6use indexmap::IndexMap;
7
8use crate::package::PackageSource;
9
10use super::{pretty_duration::PrettyDuration, AppConfigCapabilityMemoryV1, AppVolume, HttpRequest};
11
12/// Job configuration.
13#[derive(
14    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
15)]
16pub struct Job {
17    name: String,
18    trigger: JobTrigger,
19
20    #[serde(skip_serializing_if = "Option::is_none")]
21    pub timeout: Option<PrettyDuration>,
22
23    /// Don't start job if past the due time by this amount,
24    /// instead opting to wait for the next instance of it
25    /// to be triggered.
26    #[serde(skip_serializing_if = "Option::is_none")]
27    pub max_schedule_drift: Option<PrettyDuration>,
28
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub retries: Option<u32>,
31
32    /// Maximum percent of "jitter" to introduce between invocations.
33    ///
34    /// Value range: 0-100
35    ///
36    /// Jitter is used to spread out jobs over time.
37    /// The calculation works by multiplying the time between invocations
38    /// by a random amount, and taking the percentage of that random amount.
39    ///
40    /// See also [`Self::jitter_percent_min`] to set a minimum jitter.
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub jitter_percent_max: Option<u8>,
43
44    /// Minimum "jitter" to introduce between invocations.
45    ///
46    /// Value range: 0-100
47    ///
48    /// Jitter is used to spread out jobs over time.
49    /// The calculation works by multiplying the time between invocations
50    /// by a random amount, and taking the percentage of that random amount.
51    ///
52    /// If not specified while `jitter_percent_max` is, it will default to 10%.
53    ///
54    /// See also [`Self::jitter_percent_max`] to set a maximum jitter.
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub jitter_percent_min: Option<u8>,
57
58    action: JobAction,
59
60    /// Additional unknown fields.
61    ///
62    /// Exists for forward compatibility for newly added fields.
63    #[serde(flatten)]
64    pub other: IndexMap<String, serde_json::Value>,
65}
66
67// We need this wrapper struct to enable this formatting:
68// job:
69//   action:
70//     execute: ...
71#[derive(
72    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
73)]
74pub struct JobAction {
75    #[serde(flatten)]
76    action: JobActionCase,
77}
78
79#[derive(
80    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
81)]
82#[serde(rename_all = "lowercase")]
83pub enum JobActionCase {
84    Fetch(HttpRequest),
85    Execute(ExecutableJob),
86}
87
88#[derive(Clone, Debug, PartialEq, Eq)]
89pub struct CronExpression {
90    pub cron: saffron::parse::CronExpr,
91    // Keep the original string form around for serialization purposes.
92    pub parsed_from: String,
93}
94
95#[derive(Clone, Debug, PartialEq, Eq)]
96pub enum JobTrigger {
97    PreDeployment,
98    PostDeployment,
99    Cron(CronExpression),
100    Duration(PrettyDuration),
101}
102
103#[derive(
104    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
105)]
106pub struct ExecutableJob {
107    /// The package that contains the command to run. Defaults to the app config's package.
108    #[serde(skip_serializing_if = "Option::is_none")]
109    package: Option<PackageSource>,
110
111    /// The command to run. Defaults to the package's entrypoint.
112    #[serde(skip_serializing_if = "Option::is_none")]
113    command: Option<String>,
114
115    /// CLI arguments passed to the runner.
116    /// Only applicable for runners that accept CLI arguments.
117    #[serde(skip_serializing_if = "Option::is_none")]
118    cli_args: Option<Vec<String>>,
119
120    /// Environment variables.
121    #[serde(default, skip_serializing_if = "Option::is_none")]
122    pub env: Option<IndexMap<String, String>>,
123
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub capabilities: Option<ExecutableJobCompatibilityMapV1>,
126
127    #[serde(skip_serializing_if = "Option::is_none")]
128    pub volumes: Option<Vec<AppVolume>>,
129}
130
131#[derive(
132    serde::Serialize, serde::Deserialize, schemars::JsonSchema, Clone, Debug, PartialEq, Eq,
133)]
134pub struct ExecutableJobCompatibilityMapV1 {
135    /// Instance memory settings.
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub memory: Option<AppConfigCapabilityMemoryV1>,
138
139    /// Additional unknown capabilities.
140    ///
141    /// This provides a small bit of forwards compatibility for newly added
142    /// capabilities.
143    #[serde(flatten)]
144    pub other: IndexMap<String, serde_json::Value>,
145}
146
147impl Serialize for JobTrigger {
148    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
149    where
150        S: serde::Serializer,
151    {
152        self.to_string().serialize(serializer)
153    }
154}
155
156impl<'de> Deserialize<'de> for JobTrigger {
157    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
158    where
159        D: serde::Deserializer<'de>,
160    {
161        let repr: Cow<'de, str> = Cow::deserialize(deserializer)?;
162        repr.parse().map_err(D::Error::custom)
163    }
164}
165
166impl Display for JobTrigger {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        match self {
169            Self::PreDeployment => write!(f, "pre-deployment"),
170            Self::PostDeployment => write!(f, "post-deployment"),
171            Self::Cron(cron) => write!(f, "{}", cron.parsed_from),
172            Self::Duration(duration) => write!(f, "{duration}"),
173        }
174    }
175}
176
177impl FromStr for JobTrigger {
178    type Err = anyhow::Error;
179
180    fn from_str(s: &str) -> Result<Self, Self::Err> {
181        if s == "pre-deployment" {
182            Ok(Self::PreDeployment)
183        } else if s == "post-deployment" {
184            Ok(Self::PostDeployment)
185        } else if let Ok(expr) = s.parse::<CronExpression>() {
186            Ok(Self::Cron(expr))
187        } else if let Ok(duration) = s.parse::<PrettyDuration>() {
188            Ok(Self::Duration(duration))
189        } else {
190            Err(anyhow!(
191                "Invalid job trigger '{s}'. Must be 'pre-deployment', 'post-deployment', \
192                a valid cron expression such as '0 */5 * * *' or a duration such as '15m'.",
193            ))
194        }
195    }
196}
197
198impl FromStr for CronExpression {
199    type Err = Box<dyn std::error::Error + Send + Sync>;
200
201    fn from_str(s: &str) -> Result<Self, Self::Err> {
202        if let Some(predefined_sched) = s.strip_prefix('@') {
203            match predefined_sched {
204                "hourly" => Ok(Self {
205                    cron: "0 * * * *".parse().unwrap(),
206                    parsed_from: s.to_owned(),
207                }),
208                "daily" => Ok(Self {
209                    cron: "0 0 * * *".parse().unwrap(),
210                    parsed_from: s.to_owned(),
211                }),
212                "weekly" => Ok(Self {
213                    cron: "0 0 * * 1".parse().unwrap(),
214                    parsed_from: s.to_owned(),
215                }),
216                "monthly" => Ok(Self {
217                    cron: "0 0 1 * *".parse().unwrap(),
218                    parsed_from: s.to_owned(),
219                }),
220                "yearly" => Ok(Self {
221                    cron: "0 0 1 1 *".parse().unwrap(),
222                    parsed_from: s.to_owned(),
223                }),
224                _ => Err(format!("Invalid cron expression {s}").into()),
225            }
226        } else {
227            // Let's make sure the input string is valid...
228            match s.parse() {
229                Ok(expr) => Ok(Self {
230                    cron: expr,
231                    parsed_from: s.to_owned(),
232                }),
233                Err(_) => Err(format!("Invalid cron expression {s}").into()),
234            }
235        }
236    }
237}
238
239impl schemars::JsonSchema for JobTrigger {
240    fn schema_name() -> String {
241        "JobTrigger".to_owned()
242    }
243
244    fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
245        String::json_schema(gen)
246    }
247}
248
249#[cfg(test)]
250mod tests {
251    use std::time::Duration;
252
253    use super::*;
254
255    #[test]
256    pub fn job_trigger_serialization_roundtrip() {
257        fn assert_roundtrip(
258            serialized: &str,
259            description: Option<&str>,
260            duration: Option<Duration>,
261        ) {
262            let parsed = serialized.parse::<JobTrigger>().unwrap();
263            assert_eq!(&parsed.to_string(), serialized);
264
265            if let JobTrigger::Cron(expr) = &parsed {
266                assert_eq!(
267                    &expr
268                        .cron
269                        .describe(saffron::parse::English::default())
270                        .to_string(),
271                    description.unwrap()
272                );
273            } else {
274                assert!(description.is_none());
275            }
276
277            if let JobTrigger::Duration(d) = &parsed {
278                assert_eq!(d.as_duration(), duration.unwrap());
279            } else {
280                assert!(duration.is_none());
281            }
282        }
283
284        assert_roundtrip("pre-deployment", None, None);
285        assert_roundtrip("post-deployment", None, None);
286
287        assert_roundtrip("@hourly", Some("Every hour"), None);
288        assert_roundtrip("@daily", Some("At 12:00 AM"), None);
289        assert_roundtrip("@weekly", Some("At 12:00 AM on Sunday"), None);
290        assert_roundtrip(
291            "@monthly",
292            Some("At 12:00 AM on the 1st of every month"),
293            None,
294        );
295        assert_roundtrip("@yearly", Some("At 12:00 AM on the 1st of January"), None);
296
297        // Note: the parsing code should keep the formatting of the source string.
298        // This is tested in assert_roundtrip.
299        assert_roundtrip(
300            "0/2 12 * JAN-APR 2",
301            Some(
302                "At every 2nd minute from 0 through 59 minutes past the hour, \
303                between 12:00 PM and 12:59 PM on Monday of January to April",
304            ),
305            None,
306        );
307
308        assert_roundtrip("10s", None, Some(Duration::from_secs(10)));
309        assert_roundtrip("15m", None, Some(Duration::from_secs(15 * 60)));
310        assert_roundtrip("20h", None, Some(Duration::from_secs(20 * 60 * 60)));
311        assert_roundtrip("2d", None, Some(Duration::from_secs(2 * 60 * 60 * 24)));
312    }
313
314    #[test]
315    pub fn job_serialization_roundtrip() {
316        fn parse_cron(expr: &str) -> CronExpression {
317            CronExpression {
318                cron: expr.parse().unwrap(),
319                parsed_from: expr.to_owned(),
320            }
321        }
322
323        let job = Job {
324            name: "my-job".to_owned(),
325            trigger: JobTrigger::Cron(parse_cron("0/2 12 * JAN-APR 2")),
326            timeout: Some("1m".parse().unwrap()),
327            max_schedule_drift: Some("2h".parse().unwrap()),
328            jitter_percent_max: None,
329            jitter_percent_min: None,
330            retries: None,
331            action: JobAction {
332                action: JobActionCase::Execute(super::ExecutableJob {
333                    package: Some(crate::package::PackageSource::Ident(
334                        crate::package::PackageIdent::Named(crate::package::NamedPackageIdent {
335                            registry: None,
336                            namespace: Some("ns".to_owned()),
337                            name: "pkg".to_owned(),
338                            tag: None,
339                        }),
340                    )),
341                    command: Some("cmd".to_owned()),
342                    cli_args: Some(vec!["arg-1".to_owned(), "arg-2".to_owned()]),
343                    env: Some([("VAR1".to_owned(), "Value".to_owned())].into()),
344                    capabilities: Some(super::ExecutableJobCompatibilityMapV1 {
345                        memory: Some(crate::app::AppConfigCapabilityMemoryV1 {
346                            limit: Some(bytesize::ByteSize::gb(1)),
347                        }),
348                        other: Default::default(),
349                    }),
350                    volumes: Some(vec![crate::app::AppVolume {
351                        name: "vol".to_owned(),
352                        mount: "/path/to/volume".to_owned(),
353                    }]),
354                }),
355            },
356            other: Default::default(),
357        };
358
359        let serialized = r#"
360name: my-job
361trigger: 0/2 12 * JAN-APR 2
362timeout: 1m
363max_schedule_drift: 2h
364action:
365  execute:
366    package: ns/pkg
367    command: cmd
368    cli_args:
369    - arg-1
370    - arg-2
371    env:
372      VAR1: Value
373    capabilities:
374      memory:
375        limit: 1000.0 MB
376    volumes:
377    - name: vol
378      mount: /path/to/volume"#;
379
380        assert_eq!(
381            serialized.trim(),
382            serde_yaml::to_string(&job).unwrap().trim()
383        );
384        assert_eq!(job, serde_yaml::from_str(serialized).unwrap());
385    }
386}