Skip to main content

solti_model/resource/
spec.rs

1//! # Task specification.
2//!
3//! [`TaskSpec`] defines the desired state; constructed via [`TaskSpecBuilder`].
4
5use serde::{Deserialize, Serialize};
6
7use crate::{
8    AdmissionPolicy, BackoffPolicy, Labels, RestartPolicy, RunnerSelector, Slot, TaskKind, Timeout,
9    error::{ModelError, ModelResult},
10};
11
12/// Desired state specification.
13///
14/// `TaskSpec` describes *what* should be run and *how* it should be managed by the runtime.
15///
16/// Fields cover:
17/// - logical grouping (`slot`)
18/// - execution backend (`kind`)
19/// - concurrency control (`admission`)
20/// - lifecycle policies (`timeout`, `restart`, `backoff`)
21///
22/// ## Also
23///
24/// - [`TaskSpecBuilder`] validated builder (via [`TaskSpec::builder`]).
25/// - [`TaskKind`] execution backend variants.
26/// - [`RestartPolicy`] / [`BackoffPolicy`] lifecycle policies.
27/// - [`AdmissionPolicy`] duplicate handling.
28/// - [`RunnerSelector`] label-based runner routing.
29#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
30#[serde(rename_all = "camelCase")]
31#[serde(try_from = "raw::TaskSpecRaw")]
32pub struct TaskSpec {
33    slot: Slot,
34    kind: TaskKind,
35
36    timeout: Timeout,
37    restart: RestartPolicy,
38    backoff: BackoffPolicy,
39    admission: AdmissionPolicy,
40
41    #[serde(default, skip_serializing_if = "Option::is_none")]
42    runner_selector: Option<RunnerSelector>,
43    #[serde(default, skip_serializing_if = "Labels::is_empty")]
44    labels: Labels,
45}
46
47impl TaskSpec {
48    /// Logical slot name for concurrency control.
49    #[inline]
50    pub fn slot(&self) -> &Slot {
51        &self.slot
52    }
53
54    /// Execution backend used to run the task.
55    #[inline]
56    pub fn kind(&self) -> &TaskKind {
57        &self.kind
58    }
59
60    /// Hard timeout in milliseconds.
61    #[inline]
62    pub fn timeout(&self) -> Timeout {
63        self.timeout
64    }
65
66    /// Restart policy applied after completion or failure.
67    #[inline]
68    pub fn restart(&self) -> RestartPolicy {
69        self.restart
70    }
71
72    /// Backoff configuration between restart attempts.
73    #[inline]
74    pub fn backoff(&self) -> &BackoffPolicy {
75        &self.backoff
76    }
77
78    /// Admission policy for handling slot conflicts.
79    #[inline]
80    pub fn admission(&self) -> AdmissionPolicy {
81        self.admission
82    }
83
84    /// Label selector for runner routing (if present).
85    #[inline]
86    pub fn runner_selector(&self) -> Option<&RunnerSelector> {
87        self.runner_selector.as_ref()
88    }
89
90    /// Metadata labels for routing / scheduling / observability.
91    #[inline]
92    pub fn labels(&self) -> &Labels {
93        &self.labels
94    }
95}
96
97impl TaskSpec {
98    /// Create a [`TaskSpecBuilder`] with the three required fields.
99    ///
100    /// ```rust
101    /// use solti_model::{TaskSpec, TaskKind, SubprocessSpec, SubprocessMode, RestartPolicy};
102    ///
103    /// let spec = TaskSpec::builder(
104    ///     "my-slot",
105    ///     TaskKind::Subprocess(SubprocessSpec {
106    ///         mode: SubprocessMode::Command {
107    ///             command: "echo".into(),
108    ///             args: vec!["hello".into()],
109    ///         },
110    ///         env: Default::default(),
111    ///         cwd: None,
112    ///         fail_on_non_zero: Default::default(),
113    ///     }),
114    ///     5_000u64,
115    /// )
116    /// .restart(RestartPolicy::OnFailure)
117    /// .build()
118    /// .expect("valid spec");
119    /// ```
120    pub fn builder(
121        slot: impl Into<Slot>,
122        kind: TaskKind,
123        timeout: impl Into<Timeout>,
124    ) -> TaskSpecBuilder {
125        TaskSpecBuilder::new(slot, kind, timeout)
126    }
127}
128
129impl TaskSpec {
130    /// Attach a runner selector used by the router (consuming builder-style).
131    #[inline]
132    pub fn with_runner_selector(mut self, sel: RunnerSelector) -> Self {
133        self.runner_selector = Some(sel);
134        self
135    }
136}
137
138impl TaskSpec {
139    /// Validate the spec at the **submit boundary**.
140    pub fn validate(&self) -> ModelResult<()> {
141        self.validate_structural()?;
142        if matches!(self.kind, TaskKind::Embedded) {
143            return Err(ModelError::Invalid(
144                "TaskKind::Embedded cannot be submitted via runner; use submit_with_task".into(),
145            ));
146        }
147        Ok(())
148    }
149
150    /// Structural validation of all fields.
151    ///
152    /// Checks:
153    /// - `slot` is not empty
154    /// - `timeout` is greater than zero
155    /// - `kind` specific constraints (e.g. non-empty command)
156    /// - `backoff` parameters are sane
157    /// - `runner_selector` requirements are structurally valid
158    fn validate_structural(&self) -> ModelResult<()> {
159        self.slot.validate_format()?;
160        if self.timeout.as_millis() == 0 {
161            return Err(ModelError::Invalid(
162                "timeout must be greater than zero".into(),
163            ));
164        }
165        self.kind.validate()?;
166        self.backoff.validate()?;
167        if let Some(ref sel) = self.runner_selector {
168            for req in &sel.match_expressions {
169                req.validate()?;
170            }
171        }
172        Ok(())
173    }
174}
175
176/// Builder for [`TaskSpec`] that validates structural invariants on [`build`](TaskSpecBuilder::build).
177///
178/// Required fields (`slot`, `kind`, `timeout`) are set in the constructor.
179/// Optional fields have sensible defaults:
180/// - `backoff`: [`BackoffPolicy::default`] (full jitter, 1 s → 30 s, factor 2)
181/// - `admission`: [`AdmissionPolicy::DropIfRunning`]
182/// - `restart`: [`RestartPolicy::Never`]
183/// - `runner_selector`: `None`
184/// - `labels`: empty
185///
186/// ## Also
187///
188/// - [`TaskSpec::builder`] entry point.
189/// - [`TaskSpec::validate`] submit-boundary validation (rejects `Embedded`).
190pub struct TaskSpecBuilder {
191    runner_selector: Option<RunnerSelector>,
192
193    kind: TaskKind,
194    slot: Slot,
195
196    backoff: BackoffPolicy,
197    restart: RestartPolicy,
198    timeout: Timeout,
199
200    admission: AdmissionPolicy,
201    labels: Labels,
202}
203
204impl TaskSpecBuilder {
205    fn new(slot: impl Into<Slot>, kind: TaskKind, timeout: impl Into<Timeout>) -> Self {
206        Self {
207            runner_selector: None,
208
209            kind,
210            slot: slot.into(),
211
212            restart: RestartPolicy::default(),
213            backoff: BackoffPolicy::default(),
214            timeout: timeout.into(),
215
216            admission: AdmissionPolicy::default(),
217            labels: Labels::new(),
218        }
219    }
220
221    /// Set restart policy.
222    #[must_use]
223    pub fn restart(mut self, restart: RestartPolicy) -> Self {
224        self.restart = restart;
225        self
226    }
227
228    /// Set backoff configuration.
229    #[must_use]
230    pub fn backoff(mut self, backoff: BackoffPolicy) -> Self {
231        self.backoff = backoff;
232        self
233    }
234
235    /// Set admission policy.
236    #[must_use]
237    pub fn admission(mut self, admission: AdmissionPolicy) -> Self {
238        self.admission = admission;
239        self
240    }
241
242    /// Set runner selector.
243    #[must_use]
244    pub fn runner_selector(mut self, sel: RunnerSelector) -> Self {
245        self.runner_selector = Some(sel);
246        self
247    }
248
249    /// Set metadata labels.
250    #[must_use]
251    pub fn labels(mut self, labels: Labels) -> Self {
252        self.labels = labels;
253        self
254    }
255
256    /// Build the [`TaskSpec`], validating structural invariants.
257    ///
258    /// This checks everything **except** the [`TaskKind::Embedded`] business rule
259    /// (which is enforced at the submit boundary by [`TaskSpec::validate`]).
260    ///
261    /// # Errors
262    ///
263    /// Returns [`ModelError::Invalid`] if:
264    /// - `slot` is empty
265    /// - `timeout` is zero
266    /// - `kind` fails kind-specific validation
267    /// - `backoff` parameters are invalid
268    /// - `runner_selector` requirements are invalid
269    pub fn build(self) -> ModelResult<TaskSpec> {
270        let spec = TaskSpec {
271            runner_selector: self.runner_selector,
272
273            kind: self.kind,
274            slot: self.slot,
275
276            restart: self.restart,
277            backoff: self.backoff,
278            timeout: self.timeout,
279
280            admission: self.admission,
281            labels: self.labels,
282        };
283        spec.validate_structural()?;
284        Ok(spec)
285    }
286}
287
288mod raw {
289    use super::*;
290
291    #[derive(Deserialize)]
292    #[serde(rename_all = "camelCase")]
293    pub(super) struct TaskSpecRaw {
294        slot: Slot,
295        kind: TaskKind,
296        timeout: Timeout,
297        restart: RestartPolicy,
298        backoff: BackoffPolicy,
299        admission: AdmissionPolicy,
300
301        #[serde(default)]
302        labels: Labels,
303        #[serde(default)]
304        runner_selector: Option<RunnerSelector>,
305    }
306
307    impl TryFrom<TaskSpecRaw> for TaskSpec {
308        type Error = ModelError;
309
310        fn try_from(r: TaskSpecRaw) -> Result<Self, Self::Error> {
311            let spec = Self {
312                runner_selector: r.runner_selector,
313
314                kind: r.kind,
315                slot: r.slot,
316
317                restart: r.restart,
318                backoff: r.backoff,
319                timeout: r.timeout,
320
321                admission: r.admission,
322                labels: r.labels,
323            };
324            spec.validate_structural()?;
325            Ok(spec)
326        }
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Flag, SubprocessMode, SubprocessSpec, TaskEnv};

    /// Subprocess kind that runs `echo` with no arguments.
    fn echo_kind() -> TaskKind {
        let mode = SubprocessMode::Command {
            command: "echo".into(),
            args: vec![],
        };
        TaskKind::Subprocess(SubprocessSpec {
            mode,
            env: TaskEnv::default(),
            cwd: None,
            fail_on_non_zero: Flag::enabled(),
        })
    }

    /// A spec that passes both structural and submit-boundary validation.
    fn valid_spec() -> TaskSpec {
        TaskSpec::builder("test", echo_kind(), 5_000u64)
            .build()
            .expect("test spec must be valid")
    }

    /// Builder for an `Embedded` task with the given slot and timeout.
    fn embedded(slot: &'static str, timeout_ms: u64) -> TaskSpecBuilder {
        TaskSpec::builder(slot, TaskKind::Embedded, timeout_ms)
    }

    /// Serialize a valid spec, overwrite one JSON field, and return the error
    /// produced when deserializing the tampered document.
    fn deserialize_error_with(field: &str, value: serde_json::Value) -> serde_json::Error {
        let spec = valid_spec();
        let mut json: serde_json::Value = serde_json::to_value(&spec).unwrap();
        json[field] = value;
        serde_json::from_value::<TaskSpec>(json).unwrap_err()
    }

    #[test]
    fn valid_spec_passes() {
        let outcome = valid_spec().validate();
        assert!(outcome.is_ok());
    }

    #[test]
    fn builder_rejects_empty_slot() {
        let err = embedded("", 5_000u64).build().unwrap_err();
        assert!(err.to_string().contains("slot"));
    }

    #[test]
    fn builder_rejects_zero_timeout() {
        let err = embedded("test", 0u64).build().unwrap_err();
        assert!(err.to_string().contains("timeout"));
    }

    #[test]
    fn builder_allows_embedded_kind() {
        let spec = embedded("test", 5_000u64)
            .build()
            .expect("Embedded is structurally valid");
        assert!(matches!(spec.kind(), TaskKind::Embedded));
    }

    #[test]
    fn validate_rejects_embedded_kind() {
        let spec = embedded("test", 5_000u64).build().unwrap();
        let err = spec.validate().unwrap_err();
        assert!(err.to_string().contains("TaskKind::Embedded"));
    }

    #[test]
    fn getters_return_expected_values() {
        let spec = embedded("my-slot", 10_000u64)
            .restart(RestartPolicy::OnFailure)
            .admission(AdmissionPolicy::Replace)
            .build()
            .unwrap();

        assert_eq!(spec.slot(), "my-slot");
        assert_eq!(spec.timeout().as_millis(), 10_000);
        assert_eq!(spec.restart(), RestartPolicy::OnFailure);
        assert_eq!(spec.admission(), AdmissionPolicy::Replace);
    }

    #[test]
    fn serde_roundtrip() {
        let original = valid_spec();
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: TaskSpec = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn serde_rejects_empty_slot() {
        let err = deserialize_error_with("slot", serde_json::Value::String(String::new()));
        assert!(err.to_string().contains("slot"), "error: {err}");
    }

    #[test]
    fn serde_rejects_zero_timeout() {
        let err = deserialize_error_with("timeout", serde_json::json!(0));
        assert!(err.to_string().contains("timeout"), "error: {err}");
    }
}