Skip to main content

solti_model/resource/
task.rs

1//! # Task resource.
2//!
3//! [`Task`] is the K8s-style aggregate: metadata + spec + status.
4
5use serde::{Deserialize, Serialize};
6
7use crate::{
8    Labels, ObjectMeta, Slot, TaskId, TaskPhase, TaskSpec, TaskStatus,
9    error::{ModelError, ModelResult},
10};
11
12/// Unified task resource.
13///
14/// Every task is represented as a single resource with three sections:
15/// - `status`   - observed state: current phase, attempts, errors ([`TaskStatus`])
16/// - `spec`     - desired state: what to run and how ([`TaskSpec`])
17/// - `metadata` - identity, versioning, timestamps ([`ObjectMeta`])
18///
19/// ## Also
20///
21/// - [`TaskSpec`] desired state (what to run, how to restart).
22/// - [`TaskStatus`] observed state (phase, attempt, exit code).
23/// - [`TaskRun`](crate::TaskRun) per-attempt execution record.
24/// - [`ObjectMeta`] identity and versioning.
25/// - [`TaskPhase`] lifecycle state machine.
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
27#[serde(rename_all = "camelCase")]
28pub struct Task {
29    metadata: ObjectMeta,
30    status: TaskStatus,
31    spec: TaskSpec,
32}
33
34impl Task {
35    /// Create a new task in [`TaskPhase::Pending`] phase.
36    pub fn new(id: TaskId, spec: TaskSpec) -> Self {
37        Self {
38            metadata: ObjectMeta::new(id),
39            status: TaskStatus::pending(),
40            spec,
41        }
42    }
43
44    /// Resource metadata (identity, `resource_version`, timestamps).
45    #[inline]
46    pub fn metadata(&self) -> &ObjectMeta {
47        &self.metadata
48    }
49
50    /// Observed state (phase, attempt, exit code, error).
51    #[inline]
52    pub fn status(&self) -> &TaskStatus {
53        &self.status
54    }
55
56    /// Desired state (what to run, how to restart).
57    #[inline]
58    pub fn spec(&self) -> &TaskSpec {
59        &self.spec
60    }
61
62    /// Destructure into `(metadata, spec, status)`. Used by transport
63    /// layers that need owned fields for serialization into wire types.
64    #[inline]
65    pub fn into_parts(self) -> (ObjectMeta, TaskSpec, TaskStatus) {
66        (self.metadata, self.spec, self.status)
67    }
68
69    /// Transition the task into a new attempt: bumps attempt counter, sets phase to `Running`, clears error/exit_code.
70    pub fn transition_starting(&mut self) {
71        self.increment_attempt();
72        self.update_phase(TaskPhase::Running, None, None);
73    }
74
75    /// Transition the current attempt into a terminal phase with optional error and exit code.
76    ///
77    /// Rejects illegal transitions:
78    /// - target phase must be terminal (see [`TaskPhase::is_terminal`]);
79    ///   finishing into `Pending` or `Running` is a logic bug upstream.
80    ///
81    /// Bumps `resource_version`.
82    pub fn transition_finished(
83        &mut self,
84        phase: TaskPhase,
85        error: Option<String>,
86        exit_code: Option<i32>,
87    ) -> ModelResult<()> {
88        if !phase.is_terminal() {
89            return Err(ModelError::Invalid(
90                format!("transition_finished requires a terminal phase, got {phase}").into(),
91            ));
92        }
93        self.update_phase(phase, error, exit_code);
94        Ok(())
95    }
96
97    /// Raw phase setter.
98    pub(crate) fn update_phase(
99        &mut self,
100        phase: TaskPhase,
101        error: Option<String>,
102        exit_code: Option<i32>,
103    ) {
104        self.metadata.bump_resource_version();
105        self.status.phase = phase;
106        self.status.error = error;
107        self.status.exit_code = exit_code;
108    }
109
110    /// Raw attempt bump. Crate-private (see [`update_phase`]).
111    pub(crate) fn increment_attempt(&mut self) {
112        self.metadata.bump_resource_version();
113        self.status.attempt += 1;
114    }
115
116    /// Task identifier (shortcut for `metadata.id`).
117    #[inline]
118    pub fn id(&self) -> &TaskId {
119        &self.metadata.id
120    }
121
122    /// Slot (shortcut for `spec.slot()`).
123    #[inline]
124    pub fn slot(&self) -> &Slot {
125        self.spec.slot()
126    }
127
128    /// Labels (shortcut for `spec.labels()`).
129    #[inline]
130    pub fn labels(&self) -> &Labels {
131        self.spec.labels()
132    }
133
134    /// Current phase (shortcut for `status.phase`).
135    #[inline]
136    pub fn phase(&self) -> &TaskPhase {
137        &self.status.phase
138    }
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TaskKind;

    /// Build a minimal valid embedded-task spec bound to `slot`.
    fn spec_in(slot: &'static str) -> TaskSpec {
        TaskSpec::builder(slot, TaskKind::Embedded, 5_000u64)
            .build()
            .expect("test spec must be valid")
    }

    /// Default spec used by tests that do not care about the slot name.
    fn test_spec() -> TaskSpec {
        spec_in("slot-a")
    }

    #[test]
    fn new_creates_pending_task() {
        let task = Task::new("task-1".into(), test_spec());

        assert_eq!(task.status().phase, TaskPhase::Pending);
        assert_eq!(task.metadata().resource_version, 1);
        assert_eq!(task.metadata().id, "task-1");
        assert!(task.status().error.is_none());
        assert_eq!(task.status().attempt, 0);
        assert_eq!(task.slot(), "slot-a");
    }

    #[test]
    fn transition_starting_sets_running_and_bumps() {
        let mut task = Task::new("task-1".into(), test_spec());
        task.transition_starting();

        assert_eq!(task.status().phase, TaskPhase::Running);
        assert_eq!(task.status().attempt, 1);
        // One bump for the attempt increment, one for the phase update.
        assert_eq!(task.metadata().resource_version, 3);
        // Starting an attempt clears any previous outcome.
        assert!(task.status().error.is_none());
        assert!(task.status().exit_code.is_none());
    }

    #[test]
    fn transition_finished_accepts_terminal_and_carries_error() {
        let mut task = Task::new("task-1".into(), test_spec());
        task.transition_starting();
        task.transition_finished(TaskPhase::Failed, Some("boom".into()), Some(1))
            .unwrap();

        assert_eq!(task.status().phase, TaskPhase::Failed);
        assert_eq!(task.status().error.as_deref(), Some("boom"));
        assert_eq!(task.status().exit_code, Some(1));
        // Finishing does not start a new attempt but does bump the version once.
        assert_eq!(task.status().attempt, 1);
        assert_eq!(task.metadata().resource_version, 4);
    }

    #[test]
    fn transition_finished_rejects_non_terminal_phase() {
        let mut task = Task::new("task-1".into(), test_spec());
        let err = task
            .transition_finished(TaskPhase::Running, None, None)
            .unwrap_err();
        assert!(err.to_string().contains("terminal phase"));

        // A rejected transition must leave the task untouched.
        assert_eq!(task.status().phase, TaskPhase::Pending);
        assert_eq!(task.status().attempt, 0);
        assert_eq!(task.metadata().resource_version, 1);
    }

    #[test]
    fn convenience_accessors() {
        let task = Task::new("id-1".into(), spec_in("slot-1"));

        assert_eq!(task.slot(), &Slot::from("slot-1"));
        assert_eq!(task.id(), &TaskId::from("id-1"));
        assert_eq!(*task.phase(), TaskPhase::Pending);
    }

    #[test]
    fn into_parts_returns_owned_sections() {
        let task = Task::new("id-1".into(), spec_in("slot-1"));
        let (metadata, spec, status) = task.into_parts();

        assert_eq!(metadata.id, "id-1");
        assert_eq!(spec.slot(), "slot-1");
        assert_eq!(status.phase, TaskPhase::Pending);
    }

    #[test]
    fn serde_roundtrip() {
        let task = Task::new("id-1".into(), spec_in("slot-1"));
        let json = serde_json::to_string(&task).unwrap();
        let back: Task = serde_json::from_str(&json).unwrap();

        assert_eq!(back.status().phase, TaskPhase::Pending);
        assert_eq!(back.metadata().resource_version, 1);
        assert_eq!(back.metadata().id, "id-1");

        // The three sections are serialized as top-level keys.
        let value: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert!(value.get("metadata").is_some());
        assert!(value.get("status").is_some());
        assert!(value.get("spec").is_some());
    }
}