cloacina_workflow/
task.rs

/*
 *  Copyright 2025 Colliery Software
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
16
//! # Task Trait and State
//!
//! This module provides the core `Task` trait for defining executable tasks in
//! Cloacina workflows, and the `TaskState` enum for tracking their execution state.
21
22use crate::context::Context;
23use crate::error::{CheckpointError, TaskError};
24use crate::namespace::TaskNamespace;
25use crate::retry::RetryPolicy;
26use async_trait::async_trait;
27use chrono::{DateTime, Utc};
28use serde::{Deserialize, Serialize};
29
30/// Represents the execution state of a task throughout its lifecycle.
31///
32/// Tasks progress through these states during execution, providing visibility
33/// into the current status and enabling proper error handling and recovery.
34///
35/// # State Transitions
36///
37/// - `Pending` -> `Running`: When task execution begins
38/// - `Running` -> `Completed`: When task completes successfully
39/// - `Running` -> `Failed`: When task encounters an error
40/// - `Failed` -> `Running`: When task is retried
41///
42/// Terminal states (`Completed` and `Failed`) do not transition to other states
43/// unless a retry is attempted.
44#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
45pub enum TaskState {
46    /// Task is registered but not yet started
47    Pending,
48    /// Task is currently executing
49    Running { start_time: DateTime<Utc> },
50    /// Task finished successfully
51    Completed { completion_time: DateTime<Utc> },
52    /// Task encountered an error
53    Failed {
54        error: String,
55        failure_time: DateTime<Utc>,
56    },
57    /// Task was skipped (e.g., trigger rule not satisfied)
58    Skipped {
59        reason: String,
60        skip_time: DateTime<Utc>,
61    },
62}
63
64impl TaskState {
65    /// Returns true if the task is in the completed state
66    pub fn is_completed(&self) -> bool {
67        matches!(self, TaskState::Completed { .. })
68    }
69
70    /// Returns true if the task is in the failed state
71    pub fn is_failed(&self) -> bool {
72        matches!(self, TaskState::Failed { .. })
73    }
74
75    /// Returns true if the task is currently running
76    pub fn is_running(&self) -> bool {
77        matches!(self, TaskState::Running { .. })
78    }
79
80    /// Returns true if the task is pending execution
81    pub fn is_pending(&self) -> bool {
82        matches!(self, TaskState::Pending)
83    }
84
85    /// Returns true if the task was skipped
86    pub fn is_skipped(&self) -> bool {
87        matches!(self, TaskState::Skipped { .. })
88    }
89}
90
91/// Core trait that defines an executable task in a pipeline.
92///
93/// Tasks are the fundamental units of work in Cloacina. Most users should use the
94/// `#[task]` macro instead of implementing this trait directly, as the macro provides
95/// automatic registration, code fingerprinting, and convenient syntax.
96///
97/// # Task Execution Model
98///
99/// Tasks follow a simple but powerful execution model:
100///
101/// 1. **Input**: Receive a context containing data from previous tasks
102/// 2. **Processing**: Execute the task's business logic
103/// 3. **Output**: Update the context with results
104/// 4. **Completion**: Return success or failure
105///
106/// # Using the Macro (Recommended)
107///
108/// ```rust,ignore
109/// use cloacina_workflow::*;
110///
111/// #[task(id = "my_task", dependencies = [])]
112/// async fn my_task(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
113///     // Your task logic here
114///     Ok(())
115/// }
116/// ```
117#[async_trait]
118pub trait Task: Send + Sync {
119    /// Executes the task with the provided context.
120    ///
121    /// This is the main entry point for task execution. The method receives
122    /// a context containing data from previous tasks and should return an
123    /// updated context with any new or modified data.
124    ///
125    /// # Arguments
126    ///
127    /// * `context` - The execution context containing task data
128    ///
129    /// # Returns
130    ///
131    /// * `Ok(Context)` - Updated context with task results
132    /// * `Err(TaskError)` - If the task execution fails
133    async fn execute(
134        &self,
135        context: Context<serde_json::Value>,
136    ) -> Result<Context<serde_json::Value>, TaskError>;
137
138    /// Returns the unique identifier for this task.
139    ///
140    /// The task ID must be unique within a Workflow or TaskRegistry.
141    /// It's used for dependency resolution and task lookup.
142    fn id(&self) -> &str;
143
144    /// Returns the list of task namespaces that this task depends on.
145    ///
146    /// Dependencies define the execution order - this task will only
147    /// execute after all its dependencies have completed successfully.
148    fn dependencies(&self) -> &[TaskNamespace];
149
150    /// Saves a checkpoint for this task.
151    ///
152    /// This method is called to save intermediate state during task execution.
153    /// The default implementation is a no-op, but tasks can override this
154    /// to implement custom checkpointing logic.
155    ///
156    /// # Arguments
157    ///
158    /// * `context` - The current execution context
159    ///
160    /// # Returns
161    ///
162    /// * `Ok(())` - If checkpointing succeeds
163    /// * `Err(CheckpointError)` - If checkpointing fails
164    fn checkpoint(&self, _context: &Context<serde_json::Value>) -> Result<(), CheckpointError> {
165        // Default implementation - tasks can override for custom checkpointing
166        Ok(())
167    }
168
169    /// Returns the retry policy for this task.
170    ///
171    /// This method defines how the task should behave when it fails, including
172    /// the number of retry attempts, backoff strategy, and conditions under
173    /// which retries should be attempted.
174    ///
175    /// The default implementation returns a sensible production-ready policy
176    /// with exponential backoff and 3 retry attempts.
177    fn retry_policy(&self) -> RetryPolicy {
178        RetryPolicy::default()
179    }
180
181    /// Returns the trigger rules for this task.
182    ///
183    /// Trigger rules define the conditions under which this task should execute
184    /// beyond simple dependency satisfaction. The default implementation returns
185    /// an "Always" trigger rule, meaning the task executes whenever its dependencies
186    /// are satisfied.
187    ///
188    /// # Returns
189    ///
190    /// A JSON value representing the trigger rules for this task.
191    fn trigger_rules(&self) -> serde_json::Value {
192        serde_json::json!({"type": "Always"})
193    }
194
195    /// Returns a code fingerprint for content-based versioning.
196    ///
197    /// This method should return a hash of the task's implementation code,
198    /// enabling automatic detection of changes for Workflow versioning.
199    ///
200    /// The default implementation returns None, indicating that the task
201    /// doesn't support code fingerprinting. Tasks generated by the `#[task]`
202    /// macro automatically provide fingerprints.
203    ///
204    /// # Returns
205    ///
206    /// - `Some(String)` - A hex-encoded hash of the task's code content
207    /// - `None` - Task doesn't support code fingerprinting
208    fn code_fingerprint(&self) -> Option<String> {
209        None
210    }
211}