rs_dagcuter/lib.rs
1mod executor;
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Duration;
8use thiserror::Error;
9use tokio::time::sleep;
10use tokio_util::sync::CancellationToken;
11
12/// Type alias for task execution results
13/// Uses HashMap to allow flexible key-value output data
14pub type TaskResult = HashMap<String, serde_json::Value>;
15
16/// Type alias for task input parameters
17/// Provides flexibility for passing various data types between tasks
18pub type TaskInput = HashMap<String, serde_json::Value>;
19
20/// Type alias for thread-safe task references
21/// Arc enables sharing tasks across threads safely
22pub type BoxTask = Arc<dyn Task>;
23
24/// Comprehensive error types for task execution system
25/// Uses thiserror for automatic error trait implementations
26#[derive(Error, Debug)]
27pub enum Error {
28 #[error("Circular dependency detected")]
29 CircularDependency,
30 #[error("Task execution failed: {0}")]
31 TaskExecution(String),
32 #[error("Context cancelled: {0}")]
33 ContextCancelled(String),
34 #[error("Retry failed: {0}")]
35 RetryFailed(String),
36}
37
38/// Configuration for exponential backoff retry mechanism
39/// Implements intelligent retry logic with configurable parameters
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct RetryPolicy {
42 /// Initial delay between retry attempts
43 pub interval: Duration,
44 /// Maximum delay to prevent excessively long waits
45 pub max_interval: Duration,
46 /// Maximum number of retry attempts (-1 for unlimited)
47 pub max_attempts: i32,
48 /// Exponential backoff multiplier (2.0 = double delay each retry)
49 pub multiplier: f64,
50}
51
52impl Default for RetryPolicy {
53 /// Provides sensible defaults for retry behavior
54 /// Starts with 1s delay, max 30s, exponential backoff with 2x multiplier
55 fn default() -> Self {
56 Self {
57 interval: Duration::from_secs(1),
58 max_interval: Duration::from_secs(30),
59 max_attempts: -1, // Unlimited retries by default
60 multiplier: 2.0, // Exponential backoff
61 }
62 }
63}
64
65/// Retry mechanism implementation with exponential backoff
66/// Handles transient failures gracefully with configurable policies
67struct Retry {
68 policy: RetryPolicy,
69}
70
71impl Retry {
72 /// Creates a new retry instance with validated policy parameters
73 /// Applies sensible defaults and boundary checks to prevent invalid configurations
74 fn new(policy: Option<RetryPolicy>) -> Self {
75 let mut policy = policy.unwrap_or_default();
76
77 // Validate and set safe default values to prevent edge cases
78 if policy.interval.is_zero() {
79 policy.interval = Duration::from_secs(1);
80 }
81 if policy.max_interval.is_zero() {
82 policy.max_interval = Duration::from_secs(30);
83 }
84 if policy.multiplier <= 0.0 {
85 policy.multiplier = 2.0;
86 }
87 // Prevent extremely long delays that could hang the system
88 if policy.max_interval > Duration::from_secs(150) {
89 policy.max_interval = Duration::from_secs(150);
90 }
91
92 Self { policy }
93 }
94
95 /// Executes an operation with retry logic and exponential backoff
96 /// Generic over operation type to support any async function
97 /// Respects cancellation tokens for graceful shutdown
98 async fn execute_with_retry<F, Fut, T>(
99 &self,
100 ctx: CancellationToken,
101 task_name: &str,
102 mut operation: F,
103 ) -> Result<T, Error>
104 where
105 F: FnMut(i32) -> Fut,
106 Fut: std::future::Future<Output = Result<T, Error>>,
107 {
108 // If retries disabled, execute once
109 if self.policy.max_attempts <= 0 {
110 return operation(0).await;
111 }
112
113 let mut last_error = None;
114
115 // Attempt execution up to max_attempts times
116 for attempt in 1..=self.policy.max_attempts {
117 // Check for cancellation before each attempt
118 if ctx.is_cancelled() {
119 return Err(Error::ContextCancelled(format!(
120 "Context cancelled during retry attempt {}",
121 attempt
122 )));
123 }
124
125 match operation(attempt).await {
126 Ok(result) => return Ok(result), // Success - return immediately
127 Err(e) => last_error = Some(e), // Store error for final report
128 }
129
130 // Wait before next attempt (except after last attempt)
131 if attempt < self.policy.max_attempts {
132 let wait_time = self.calculate_backoff(attempt);
133 tokio::select! {
134 // Respect cancellation during wait
135 _ = ctx.cancelled() => {
136 return Err(Error::ContextCancelled(
137 "Context cancelled during retry wait".to_string()
138 ));
139 }
140 // Wait for calculated backoff duration
141 _ = sleep(wait_time) => {}
142 }
143 }
144 }
145
146 // All attempts failed - return comprehensive error
147 Err(Error::RetryFailed(format!(
148 "Task {} failed after {} attempts, last error: {:?}",
149 task_name, self.policy.max_attempts, last_error
150 )))
151 }
152
153 /// Calculates exponential backoff delay for given attempt number
154 /// Applies multiplier and respects maximum interval limit
155 fn calculate_backoff(&self, attempt: i32) -> Duration {
156 let backoff = self.policy.interval.as_secs_f64() * self.policy.multiplier.powi(attempt - 1);
157 let result = Duration::from_secs_f64(backoff);
158 // Ensure we don't exceed maximum interval
159 result.min(self.policy.max_interval)
160 }
161}
162
163/// Core trait defining task behavior and lifecycle
164/// All tasks must implement this trait to be executable by the DAG
165/// Provides hooks for custom pre/post processing logic
166#[async_trait]
167pub trait Task: Send + Sync {
168 /// Returns the unique name identifier for this task
169 fn name(&self) -> &str;
170
171 /// Returns list of task names this task depends on
172 /// Used to build the dependency graph
173 fn dependencies(&self) -> Vec<String>;
174
175 /// Returns retry policy for this specific task
176 /// None means use system defaults
177 fn retry_policy(&self) -> Option<RetryPolicy>;
178
179 /// Pre-execution hook called before main task execution
180 /// Useful for setup, validation, or logging
181 /// Default implementation does nothing
182 async fn pre_execution(
183 &self,
184 _ctx: CancellationToken,
185 _input: &TaskInput,
186 ) -> Result<(), Error> {
187 Ok(())
188 }
189
190 /// Main task execution logic - must be implemented by each task
191 /// Receives input from dependencies and returns results
192 /// This is the core business logic of the task
193 async fn execute(&self, ctx: CancellationToken, input: &TaskInput)
194 -> Result<TaskResult, Error>;
195
196 /// Post-execution hook called after successful main execution
197 /// Useful for cleanup, notifications, or result processing
198 /// Default implementation does nothing
199 async fn post_execution(
200 &self,
201 _ctx: CancellationToken,
202 _output: &TaskResult,
203 ) -> Result<(), Error> {
204 Ok(())
205 }
206}
207
208// Re-export the DAG executor for external use
209pub use crate::executor::Dag;