// rs_dagcuter/lib.rs

1mod executor;
2
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Duration;
8use thiserror::Error;
9use tokio::time::sleep;
10use tokio_util::sync::CancellationToken;
11
12/// Type alias for task execution results
13/// Uses HashMap to allow flexible key-value output data
14pub type TaskResult = HashMap<String, serde_json::Value>;
15
16/// Type alias for task input parameters
17/// Provides flexibility for passing various data types between tasks
18pub type TaskInput = HashMap<String, serde_json::Value>;
19
20/// Type alias for thread-safe task references
21/// Arc enables sharing tasks across threads safely
22pub type BoxTask = Arc<dyn Task>;
23
24/// Comprehensive error types for task execution system
25/// Uses thiserror for automatic error trait implementations
26#[derive(Error, Debug)]
27pub enum Error {
28    #[error("Circular dependency detected")]
29    CircularDependency,
30    #[error("Task execution failed: {0}")]
31    TaskExecution(String),
32    #[error("Context cancelled: {0}")]
33    ContextCancelled(String),
34    #[error("Retry failed: {0}")]
35    RetryFailed(String),
36}
37
38/// Configuration for exponential backoff retry mechanism
39/// Implements intelligent retry logic with configurable parameters
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct RetryPolicy {
42    /// Initial delay between retry attempts
43    pub interval: Duration,
44    /// Maximum delay to prevent excessively long waits
45    pub max_interval: Duration,
46    /// Maximum number of retry attempts (-1 for unlimited)
47    pub max_attempts: i32,
48    /// Exponential backoff multiplier (2.0 = double delay each retry)
49    pub multiplier: f64,
50}
51
52impl Default for RetryPolicy {
53    /// Provides sensible defaults for retry behavior
54    /// Starts with 1s delay, max 30s, exponential backoff with 2x multiplier
55    fn default() -> Self {
56        Self {
57            interval: Duration::from_secs(1),
58            max_interval: Duration::from_secs(30),
59            max_attempts: -1, // Unlimited retries by default
60            multiplier: 2.0,  // Exponential backoff
61        }
62    }
63}
64
65/// Retry mechanism implementation with exponential backoff
66/// Handles transient failures gracefully with configurable policies
67struct Retry {
68    policy: RetryPolicy,
69}
70
71impl Retry {
72    /// Creates a new retry instance with validated policy parameters
73    /// Applies sensible defaults and boundary checks to prevent invalid configurations
74    fn new(policy: Option<RetryPolicy>) -> Self {
75        let mut policy = policy.unwrap_or_default();
76
77        // Validate and set safe default values to prevent edge cases
78        if policy.interval.is_zero() {
79            policy.interval = Duration::from_secs(1);
80        }
81        if policy.max_interval.is_zero() {
82            policy.max_interval = Duration::from_secs(30);
83        }
84        if policy.multiplier <= 0.0 {
85            policy.multiplier = 2.0;
86        }
87        // Prevent extremely long delays that could hang the system
88        if policy.max_interval > Duration::from_secs(150) {
89            policy.max_interval = Duration::from_secs(150);
90        }
91
92        Self { policy }
93    }
94
95    /// Executes an operation with retry logic and exponential backoff
96    /// Generic over operation type to support any async function
97    /// Respects cancellation tokens for graceful shutdown
98    async fn execute_with_retry<F, Fut, T>(
99        &self,
100        ctx: CancellationToken,
101        task_name: &str,
102        mut operation: F,
103    ) -> Result<T, Error>
104    where
105        F: FnMut(i32) -> Fut,
106        Fut: std::future::Future<Output = Result<T, Error>>,
107    {
108        // If retries disabled, execute once
109        if self.policy.max_attempts <= 0 {
110            return operation(0).await;
111        }
112
113        let mut last_error = None;
114
115        // Attempt execution up to max_attempts times
116        for attempt in 1..=self.policy.max_attempts {
117            // Check for cancellation before each attempt
118            if ctx.is_cancelled() {
119                return Err(Error::ContextCancelled(format!(
120                    "Context cancelled during retry attempt {}",
121                    attempt
122                )));
123            }
124
125            match operation(attempt).await {
126                Ok(result) => return Ok(result), // Success - return immediately
127                Err(e) => last_error = Some(e),  // Store error for final report
128            }
129
130            // Wait before next attempt (except after last attempt)
131            if attempt < self.policy.max_attempts {
132                let wait_time = self.calculate_backoff(attempt);
133                tokio::select! {
134                    // Respect cancellation during wait
135                    _ = ctx.cancelled() => {
136                        return Err(Error::ContextCancelled(
137                            "Context cancelled during retry wait".to_string()
138                        ));
139                    }
140                    // Wait for calculated backoff duration
141                    _ = sleep(wait_time) => {}
142                }
143            }
144        }
145
146        // All attempts failed - return comprehensive error
147        Err(Error::RetryFailed(format!(
148            "Task {} failed after {} attempts, last error: {:?}",
149            task_name, self.policy.max_attempts, last_error
150        )))
151    }
152
153    /// Calculates exponential backoff delay for given attempt number
154    /// Applies multiplier and respects maximum interval limit
155    fn calculate_backoff(&self, attempt: i32) -> Duration {
156        let backoff = self.policy.interval.as_secs_f64() * self.policy.multiplier.powi(attempt - 1);
157        let result = Duration::from_secs_f64(backoff);
158        // Ensure we don't exceed maximum interval
159        result.min(self.policy.max_interval)
160    }
161}
162
163/// Core trait defining task behavior and lifecycle
164/// All tasks must implement this trait to be executable by the DAG
165/// Provides hooks for custom pre/post processing logic
166#[async_trait]
167pub trait Task: Send + Sync {
168    /// Returns the unique name identifier for this task
169    fn name(&self) -> &str;
170
171    /// Returns list of task names this task depends on
172    /// Used to build the dependency graph
173    fn dependencies(&self) -> Vec<String>;
174
175    /// Returns retry policy for this specific task
176    /// None means use system defaults
177    fn retry_policy(&self) -> Option<RetryPolicy>;
178
179    /// Pre-execution hook called before main task execution
180    /// Useful for setup, validation, or logging
181    /// Default implementation does nothing
182    async fn pre_execution(
183        &self,
184        _ctx: CancellationToken,
185        _input: &TaskInput,
186    ) -> Result<(), Error> {
187        Ok(())
188    }
189
190    /// Main task execution logic - must be implemented by each task
191    /// Receives input from dependencies and returns results
192    /// This is the core business logic of the task
193    async fn execute(&self, ctx: CancellationToken, input: &TaskInput)
194    -> Result<TaskResult, Error>;
195
196    /// Post-execution hook called after successful main execution
197    /// Useful for cleanup, notifications, or result processing
198    /// Default implementation does nothing
199    async fn post_execution(
200        &self,
201        _ctx: CancellationToken,
202        _output: &TaskResult,
203    ) -> Result<(), Error> {
204        Ok(())
205    }
206}
207
208// Re-export the DAG executor for external use
209pub use crate::executor::Dag;