cloacina_workflow/task.rs
1/*
2 * Copyright 2025 Colliery Software
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
//! # Task Trait and State
//!
//! This module provides the core [`Task`] trait, which defines an executable
//! unit of work in a Cloacina workflow, and the [`TaskState`] enum, which
//! tracks a task's execution status.
21
22use crate::context::Context;
23use crate::error::{CheckpointError, TaskError};
24use crate::namespace::TaskNamespace;
25use crate::retry::RetryPolicy;
26use async_trait::async_trait;
27use chrono::{DateTime, Utc};
28use serde::{Deserialize, Serialize};
29
30/// Represents the execution state of a task throughout its lifecycle.
31///
32/// Tasks progress through these states during execution, providing visibility
33/// into the current status and enabling proper error handling and recovery.
34///
35/// # State Transitions
36///
37/// - `Pending` -> `Running`: When task execution begins
38/// - `Running` -> `Completed`: When task completes successfully
39/// - `Running` -> `Failed`: When task encounters an error
40/// - `Failed` -> `Running`: When task is retried
41///
42/// Terminal states (`Completed` and `Failed`) do not transition to other states
43/// unless a retry is attempted.
44#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
45pub enum TaskState {
46 /// Task is registered but not yet started
47 Pending,
48 /// Task is currently executing
49 Running { start_time: DateTime<Utc> },
50 /// Task finished successfully
51 Completed { completion_time: DateTime<Utc> },
52 /// Task encountered an error
53 Failed {
54 error: String,
55 failure_time: DateTime<Utc>,
56 },
57 /// Task was skipped (e.g., trigger rule not satisfied)
58 Skipped {
59 reason: String,
60 skip_time: DateTime<Utc>,
61 },
62}
63
64impl TaskState {
65 /// Returns true if the task is in the completed state
66 pub fn is_completed(&self) -> bool {
67 matches!(self, TaskState::Completed { .. })
68 }
69
70 /// Returns true if the task is in the failed state
71 pub fn is_failed(&self) -> bool {
72 matches!(self, TaskState::Failed { .. })
73 }
74
75 /// Returns true if the task is currently running
76 pub fn is_running(&self) -> bool {
77 matches!(self, TaskState::Running { .. })
78 }
79
80 /// Returns true if the task is pending execution
81 pub fn is_pending(&self) -> bool {
82 matches!(self, TaskState::Pending)
83 }
84
85 /// Returns true if the task was skipped
86 pub fn is_skipped(&self) -> bool {
87 matches!(self, TaskState::Skipped { .. })
88 }
89}
90
91/// Core trait that defines an executable task in a pipeline.
92///
93/// Tasks are the fundamental units of work in Cloacina. Most users should use the
94/// `#[task]` macro instead of implementing this trait directly, as the macro provides
95/// automatic registration, code fingerprinting, and convenient syntax.
96///
97/// # Task Execution Model
98///
99/// Tasks follow a simple but powerful execution model:
100///
101/// 1. **Input**: Receive a context containing data from previous tasks
102/// 2. **Processing**: Execute the task's business logic
103/// 3. **Output**: Update the context with results
104/// 4. **Completion**: Return success or failure
105///
106/// # Using the Macro (Recommended)
107///
108/// ```rust,ignore
109/// use cloacina_workflow::*;
110///
111/// #[task(id = "my_task", dependencies = [])]
112/// async fn my_task(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
113/// // Your task logic here
114/// Ok(())
115/// }
116/// ```
117#[async_trait]
118pub trait Task: Send + Sync {
119 /// Executes the task with the provided context.
120 ///
121 /// This is the main entry point for task execution. The method receives
122 /// a context containing data from previous tasks and should return an
123 /// updated context with any new or modified data.
124 ///
125 /// # Arguments
126 ///
127 /// * `context` - The execution context containing task data
128 ///
129 /// # Returns
130 ///
131 /// * `Ok(Context)` - Updated context with task results
132 /// * `Err(TaskError)` - If the task execution fails
133 async fn execute(
134 &self,
135 context: Context<serde_json::Value>,
136 ) -> Result<Context<serde_json::Value>, TaskError>;
137
138 /// Returns the unique identifier for this task.
139 ///
140 /// The task ID must be unique within a Workflow or TaskRegistry.
141 /// It's used for dependency resolution and task lookup.
142 fn id(&self) -> &str;
143
144 /// Returns the list of task namespaces that this task depends on.
145 ///
146 /// Dependencies define the execution order - this task will only
147 /// execute after all its dependencies have completed successfully.
148 fn dependencies(&self) -> &[TaskNamespace];
149
150 /// Saves a checkpoint for this task.
151 ///
152 /// This method is called to save intermediate state during task execution.
153 /// The default implementation is a no-op, but tasks can override this
154 /// to implement custom checkpointing logic.
155 ///
156 /// # Arguments
157 ///
158 /// * `context` - The current execution context
159 ///
160 /// # Returns
161 ///
162 /// * `Ok(())` - If checkpointing succeeds
163 /// * `Err(CheckpointError)` - If checkpointing fails
164 fn checkpoint(&self, _context: &Context<serde_json::Value>) -> Result<(), CheckpointError> {
165 // Default implementation - tasks can override for custom checkpointing
166 Ok(())
167 }
168
169 /// Returns the retry policy for this task.
170 ///
171 /// This method defines how the task should behave when it fails, including
172 /// the number of retry attempts, backoff strategy, and conditions under
173 /// which retries should be attempted.
174 ///
175 /// The default implementation returns a sensible production-ready policy
176 /// with exponential backoff and 3 retry attempts.
177 fn retry_policy(&self) -> RetryPolicy {
178 RetryPolicy::default()
179 }
180
181 /// Returns the trigger rules for this task.
182 ///
183 /// Trigger rules define the conditions under which this task should execute
184 /// beyond simple dependency satisfaction. The default implementation returns
185 /// an "Always" trigger rule, meaning the task executes whenever its dependencies
186 /// are satisfied.
187 ///
188 /// # Returns
189 ///
190 /// A JSON value representing the trigger rules for this task.
191 fn trigger_rules(&self) -> serde_json::Value {
192 serde_json::json!({"type": "Always"})
193 }
194
195 /// Returns a code fingerprint for content-based versioning.
196 ///
197 /// This method should return a hash of the task's implementation code,
198 /// enabling automatic detection of changes for Workflow versioning.
199 ///
200 /// The default implementation returns None, indicating that the task
201 /// doesn't support code fingerprinting. Tasks generated by the `#[task]`
202 /// macro automatically provide fingerprints.
203 ///
204 /// # Returns
205 ///
206 /// - `Some(String)` - A hex-encoded hash of the task's code content
207 /// - `None` - Task doesn't support code fingerprinting
208 fn code_fingerprint(&self) -> Option<String> {
209 None
210 }
211}