Skip to main content

actionqueue_core/task/
task_spec.rs

1//! Task specification - the user's durable intent.
2
3use crate::ids::{TaskId, TenantId};
4use crate::task::constraints::{TaskConstraints, TaskConstraintsError};
5use crate::task::metadata::TaskMetadata;
6use crate::task::run_policy::{RunPolicy, RunPolicyError};
7
8/// Typed validation errors for [`TaskSpec`].
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub enum TaskSpecError {
11    /// The provided task identifier is nil/empty and therefore invalid at task admission.
12    InvalidTaskId {
13        /// Rejected task identifier.
14        task_id: TaskId,
15    },
16    /// The provided constraints payload violates task-constraint invariants.
17    InvalidConstraints(TaskConstraintsError),
18    /// The provided run policy violates contract-level run-policy invariants.
19    InvalidRunPolicy(RunPolicyError),
20}
21
22impl std::fmt::Display for TaskSpecError {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        match self {
25            TaskSpecError::InvalidTaskId { task_id } => {
26                write!(f, "invalid task identifier for task admission: {task_id}")
27            }
28            TaskSpecError::InvalidConstraints(error) => {
29                write!(f, "invalid task constraints: {error}")
30            }
31            TaskSpecError::InvalidRunPolicy(error) => {
32                write!(f, "invalid run policy: {error}")
33            }
34        }
35    }
36}
37
38impl std::error::Error for TaskSpecError {}
39
40/// Opaque payload bytes bundled with an optional content-type hint.
41///
42/// Groups the two fields that are always set together when constructing or
43/// updating a task's executable content.
44#[derive(Debug, Clone, PartialEq, Eq)]
45#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
46pub struct TaskPayload {
47    /// Opaque payload bytes to be executed.
48    bytes: Vec<u8>,
49    /// Optional content type hint for the payload.
50    content_type: Option<String>,
51}
52
53impl TaskPayload {
54    /// Creates a new payload with no content-type hint.
55    pub fn new(bytes: Vec<u8>) -> Self {
56        Self { bytes, content_type: None }
57    }
58
59    /// Creates a new payload with a content-type hint.
60    pub fn with_content_type(bytes: Vec<u8>, content_type: impl Into<String>) -> Self {
61        Self { bytes, content_type: Some(content_type.into()) }
62    }
63
64    /// Returns the opaque payload bytes.
65    pub fn bytes(&self) -> &[u8] {
66        &self.bytes
67    }
68
69    /// Returns the optional content-type hint.
70    pub fn content_type(&self) -> Option<&str> {
71        self.content_type.as_deref()
72    }
73}
74
75/// The user's durable intent - what should be executed.
76#[derive(Debug, Clone, PartialEq, Eq)]
77#[cfg_attr(feature = "serde", derive(serde::Serialize))]
78#[must_use]
79pub struct TaskSpec {
80    /// Unique identifier for this task.
81    id: TaskId,
82    /// Opaque payload bytes and optional content-type hint.
83    payload: TaskPayload,
84    /// The run policy governs how many times this task should execute.
85    run_policy: RunPolicy,
86    /// Constraints that control execution behavior.
87    constraints: TaskConstraints,
88    /// Metadata for organization and priority.
89    metadata: TaskMetadata,
90    /// Optional parent task identifier for task hierarchy.
91    ///
92    /// When set, this task is a child of the named parent. The workflow
93    /// extension (`actionqueue-workflow`) enforces cascading cancellation and
94    /// completion gating based on this relationship.
95    #[cfg_attr(feature = "serde", serde(default))]
96    parent_task_id: Option<TaskId>,
97    /// Optional tenant identifier for multi-tenant isolation.
98    ///
99    /// When set, this task is scoped to the named tenant. The platform
100    /// extension (`actionqueue-platform`) enforces tenant isolation at query
101    /// and mutation boundaries. `None` in single-tenant mode.
102    #[cfg_attr(feature = "serde", serde(default))]
103    tenant_id: Option<TenantId>,
104}
105
106impl TaskSpec {
107    /// Creates a task specification with validated constraints.
108    ///
109    /// `parent_task_id` defaults to `None`. Use [`with_parent`](Self::with_parent)
110    /// to attach a parent before submission.
111    pub fn new(
112        id: TaskId,
113        payload: TaskPayload,
114        run_policy: RunPolicy,
115        constraints: TaskConstraints,
116        metadata: TaskMetadata,
117    ) -> Result<Self, TaskSpecError> {
118        let spec = Self {
119            id,
120            payload,
121            run_policy,
122            constraints,
123            metadata,
124            parent_task_id: None,
125            tenant_id: None,
126        };
127        spec.validate()?;
128        Ok(spec)
129    }
130
131    /// Attaches a parent task identifier, returning the modified spec.
132    ///
133    /// Used by the workflow extension to declare parent-child relationships.
134    /// The parent must exist in the projection when the child is submitted.
135    ///
136    /// Uses `debug_assert` (not `Result`) because this is only called from
137    /// internal workflow submission paths where the parent ID has already been
138    /// validated against the projection. A nil parent indicates a logic error
139    /// in the submission pipeline, not invalid user input.
140    pub fn with_parent(mut self, parent_task_id: TaskId) -> Self {
141        debug_assert!(!parent_task_id.is_nil());
142        self.parent_task_id = Some(parent_task_id);
143        self
144    }
145
146    /// Validates this task specification against invariant-sensitive checks.
147    pub fn validate(&self) -> Result<(), TaskSpecError> {
148        Self::validate_task_id(self.id)?;
149        self.run_policy.validate().map_err(TaskSpecError::InvalidRunPolicy)?;
150        self.constraints.validate().map_err(TaskSpecError::InvalidConstraints)
151    }
152
153    /// Validates task identifier shape invariants at task-admission boundaries.
154    fn validate_task_id(task_id: TaskId) -> Result<(), TaskSpecError> {
155        if task_id.is_nil() {
156            return Err(TaskSpecError::InvalidTaskId { task_id });
157        }
158
159        Ok(())
160    }
161
162    /// Returns this task's unique identifier.
163    pub fn id(&self) -> TaskId {
164        self.id
165    }
166
167    /// Returns the task payload bytes.
168    pub fn payload(&self) -> &[u8] {
169        self.payload.bytes()
170    }
171
172    /// Returns the optional payload content-type hint.
173    pub fn content_type(&self) -> Option<&str> {
174        self.payload.content_type()
175    }
176
177    /// Returns a reference to the full task payload (bytes + content-type).
178    pub fn task_payload(&self) -> &TaskPayload {
179        &self.payload
180    }
181
182    /// Returns the run policy snapshot for this task.
183    pub fn run_policy(&self) -> &RunPolicy {
184        &self.run_policy
185    }
186
187    /// Returns the constraints snapshot for this task.
188    pub fn constraints(&self) -> &TaskConstraints {
189        &self.constraints
190    }
191
192    /// Returns task metadata.
193    pub fn metadata(&self) -> &TaskMetadata {
194        &self.metadata
195    }
196
197    /// Returns the optional parent task identifier.
198    pub fn parent_task_id(&self) -> Option<TaskId> {
199        self.parent_task_id
200    }
201
202    /// Returns the optional tenant identifier.
203    pub fn tenant_id(&self) -> Option<TenantId> {
204        self.tenant_id
205    }
206
207    /// Attaches a tenant identifier, returning the modified spec.
208    pub fn with_tenant(mut self, tenant_id: TenantId) -> Self {
209        self.tenant_id = Some(tenant_id);
210        self
211    }
212
213    /// Replaces task constraints after validating invariants.
214    pub fn set_constraints(&mut self, constraints: TaskConstraints) -> Result<(), TaskSpecError> {
215        Self::validate_task_id(self.id)?;
216        constraints.validate().map_err(TaskSpecError::InvalidConstraints)?;
217        self.constraints = constraints;
218        Ok(())
219    }
220
221    /// Replaces task metadata.
222    pub fn set_metadata(&mut self, metadata: TaskMetadata) {
223        self.metadata = metadata;
224    }
225
226    /// Replaces payload bytes and optional content-type hint.
227    pub fn set_payload(&mut self, payload: TaskPayload) {
228        self.payload = payload;
229    }
230
231    /// Replaces run policy after validating policy-shape invariants.
232    pub fn set_run_policy(&mut self, run_policy: RunPolicy) -> Result<(), TaskSpecError> {
233        Self::validate_task_id(self.id)?;
234        run_policy.validate().map_err(TaskSpecError::InvalidRunPolicy)?;
235        self.run_policy = run_policy;
236        Ok(())
237    }
238}
239
240#[cfg(feature = "serde")]
241impl<'de> serde::Deserialize<'de> for TaskSpec {
242    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
243    where
244        D: serde::Deserializer<'de>,
245    {
246        #[derive(serde::Deserialize)]
247        struct TaskSpecWire {
248            id: TaskId,
249            payload: TaskPayload,
250            run_policy: RunPolicy,
251            constraints: TaskConstraints,
252            metadata: TaskMetadata,
253            #[serde(default)]
254            parent_task_id: Option<TaskId>,
255            #[serde(default)]
256            tenant_id: Option<TenantId>,
257        }
258
259        let wire = TaskSpecWire::deserialize(deserializer)?;
260        let spec = TaskSpec {
261            id: wire.id,
262            payload: wire.payload,
263            run_policy: wire.run_policy,
264            constraints: wire.constraints,
265            metadata: wire.metadata,
266            parent_task_id: wire.parent_task_id,
267            tenant_id: wire.tenant_id,
268        };
269        spec.validate().map_err(serde::de::Error::custom)?;
270        Ok(spec)
271    }
272}
273
274#[cfg(test)]
275mod tests {
276    use uuid::Uuid;
277
278    use crate::ids::TaskId;
279    use crate::task::constraints::TaskConstraints;
280    use crate::task::metadata::TaskMetadata;
281    use crate::task::run_policy::{RepeatPolicy, RunPolicy, RunPolicyError};
282    use crate::task::task_spec::{TaskPayload, TaskSpec, TaskSpecError};
283
284    #[test]
285    fn task_spec_new_rejects_nil_task_id() {
286        let task_id = TaskId::from_uuid(Uuid::nil());
287
288        let result = TaskSpec::new(
289            task_id,
290            TaskPayload::new(b"payload".to_vec()),
291            RunPolicy::Once,
292            TaskConstraints::default(),
293            TaskMetadata::default(),
294        );
295
296        assert_eq!(result, Err(TaskSpecError::InvalidTaskId { task_id }));
297    }
298
299    #[test]
300    fn task_spec_validate_rejects_nil_task_id_from_externally_shaped_payload() {
301        let task_id = TaskId::from_uuid(Uuid::nil());
302        let externally_shaped = TaskSpec {
303            id: task_id,
304            payload: TaskPayload::new(b"payload".to_vec()),
305            run_policy: RunPolicy::Once,
306            constraints: TaskConstraints::default(),
307            metadata: TaskMetadata::default(),
308            parent_task_id: None,
309            tenant_id: None,
310        };
311
312        assert_eq!(externally_shaped.validate(), Err(TaskSpecError::InvalidTaskId { task_id }));
313    }
314
315    #[test]
316    fn repeat_policy_rejects_zero_count() {
317        let result = RepeatPolicy::new(0, 60);
318        assert_eq!(result, Err(RunPolicyError::InvalidRepeatCount { count: 0 }));
319    }
320
321    #[test]
322    fn repeat_policy_rejects_zero_interval() {
323        let result = RepeatPolicy::new(3, 0);
324        assert_eq!(result, Err(RunPolicyError::InvalidRepeatIntervalSecs { interval_secs: 0 }));
325    }
326
327    #[test]
328    fn set_run_policy_accepts_valid_repeat_payload() {
329        let mut task_spec = TaskSpec::new(
330            TaskId::new(),
331            TaskPayload::new(b"payload".to_vec()),
332            RunPolicy::Once,
333            TaskConstraints::default(),
334            TaskMetadata::default(),
335        )
336        .expect("baseline task spec should be valid");
337
338        task_spec
339            .set_run_policy(RunPolicy::repeat(4, 30).expect("repeat policy should be valid"))
340            .expect("run policy mutation should succeed");
341
342        assert_eq!(task_spec.run_policy(), &RunPolicy::Repeat(RepeatPolicy::new(4, 30).unwrap()));
343    }
344
345    #[test]
346    fn set_run_policy_rejects_mutation_for_nil_task_id() {
347        let task_id = TaskId::from_uuid(Uuid::nil());
348        let mut externally_shaped = TaskSpec {
349            id: task_id,
350            payload: TaskPayload::new(b"payload".to_vec()),
351            run_policy: RunPolicy::Once,
352            constraints: TaskConstraints::default(),
353            metadata: TaskMetadata::default(),
354            parent_task_id: None,
355            tenant_id: None,
356        };
357
358        let result = externally_shaped
359            .set_run_policy(RunPolicy::repeat(2, 60).expect("repeat policy should be valid"));
360
361        assert_eq!(result, Err(TaskSpecError::InvalidTaskId { task_id }));
362        assert_eq!(externally_shaped.run_policy(), &RunPolicy::Once);
363    }
364}