task_supervisor/task/
mod.rs

1use std::{
2    error::Error,
3    time::{Duration, Instant},
4};
5
6use tokio_util::sync::CancellationToken;
7
/// Boxed, type-erased task that can be duplicated through
/// [`CloneableSupervisedTask::clone_box`].
pub type DynTask = Box<dyn CloneableSupervisedTask>;
/// Boxed, thread-safe (`Send + Sync`) error type returned by
/// [`SupervisedTask::run`].
pub type TaskError = Box<dyn Error + Send + Sync>;
10
/// A unit of asynchronous work exposed through a single [`run`](Self::run)
/// entry point.
///
/// Implementors must be `Send + 'static`.
// NOTE(review): presumably these bounds exist so the task can be moved onto a
// spawned runtime task — confirm against the supervisor code.
#[async_trait::async_trait]
pub trait SupervisedTask: Send + 'static {
    /// Runs the task until completion or failure.
    ///
    /// Returns `Ok` with a [`TaskOutcome`] describing how the run ended, or
    /// `Err` with a boxed [`TaskError`].
    async fn run(&mut self) -> Result<TaskOutcome, TaskError>;
}
16
/// A [`SupervisedTask`] that can produce a boxed copy of itself.
///
/// `Clone` cannot be called through a trait object, so this trait exposes
/// cloning as a method returning a new box instead.
pub trait CloneableSupervisedTask: SupervisedTask {
    /// Returns a freshly boxed task produced from `self`.
    fn clone_box(&self) -> Box<dyn CloneableSupervisedTask>;
}
20
21impl<T> CloneableSupervisedTask for T
22where
23    T: SupervisedTask + Clone + Send + 'static,
24{
25    fn clone_box(&self) -> Box<dyn CloneableSupervisedTask> {
26        Box::new(self.clone())
27    }
28}
29
/// Represents the current state of a supervised task.
///
/// The type is a plain `Copy` value; its `Display` implementation renders each
/// variant as a lowercase word (for example `Healthy` as `healthy`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    /// Task has been created but not yet started.
    Created,
    /// Task is in the process of starting.
    Starting,
    /// Task is running and healthy.
    Healthy,
    /// Task has failed and is pending restart.
    ///
    /// This is the only state for which `is_restarting` returns `true`.
    Failed,
    /// Task has completed successfully.
    Completed,
    /// Task has failed too many times and is terminated.
    Dead,
}
46
47impl TaskStatus {
48    pub fn is_restarting(&self) -> bool {
49        matches!(self, TaskStatus::Failed)
50    }
51
52    pub fn is_healthy(&self) -> bool {
53        matches!(self, TaskStatus::Healthy)
54    }
55
56    pub fn is_dead(&self) -> bool {
57        matches!(self, TaskStatus::Dead)
58    }
59
60    pub fn has_completed(&self) -> bool {
61        matches!(self, TaskStatus::Completed)
62    }
63}
64
65impl std::fmt::Display for TaskStatus {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        match self {
68            Self::Created => write!(f, "created"),
69            Self::Starting => write!(f, "starting"),
70            Self::Healthy => write!(f, "healthy"),
71            Self::Failed => write!(f, "failed"),
72            Self::Completed => write!(f, "completed"),
73            Self::Dead => write!(f, "dead"),
74        }
75    }
76}
77
/// Outcome of a task's execution.
///
/// This is the `Ok` payload of `SupervisedTask::run`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskOutcome {
    /// Task completed successfully and should not be restarted.
    Completed,
    /// Task failed and may be restarted; carries the failure reason as text.
    Failed(String),
}
86
87impl std::fmt::Display for TaskOutcome {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        match self {
90            Self::Completed => write!(f, "completed"),
91            Self::Failed(e) => write!(f, "failed: {e}"),
92        }
93    }
94}
95
/// Bookkeeping record for one supervised task: its status, the boxed task
/// itself, runtime handles, heartbeat timing and restart policy.
pub(crate) struct TaskHandle {
    /// Current status; initialised to `TaskStatus::Created` and updated via `mark`.
    pub(crate) status: TaskStatus,
    /// The boxed task.
    // NOTE(review): presumably cloned with `clone_box` on each (re)start — confirm
    // against the supervisor.
    pub(crate) task: DynTask,
    /// Join handles associated with the task; `clean` aborts and drops them.
    pub(crate) handles: Option<Vec<tokio::task::JoinHandle<()>>>,
    /// Instant of the most recent heartbeat, set by `ticked_at` and cleared by `clean`.
    pub(crate) last_heartbeat: Option<Instant>,
    /// Restart counter used as the backoff exponent in `restart_delay` and compared
    /// against `max_restart_attempts`.
    // NOTE(review): not incremented anywhere in this type's methods — presumably
    // the supervisor bumps it; confirm.
    pub(crate) restart_attempts: u32,
    /// Cleared by `clean`.
    // NOTE(review): presumably the instant the task last became healthy, set by
    // the supervisor — confirm.
    pub(crate) healthy_since: Option<Instant>,
    /// Token that `clean` cancels and drops, if present.
    pub(crate) cancellation_token: Option<CancellationToken>,
    /// Limit checked by `has_exceeded_max_retries` (reached when
    /// `restart_attempts >= max_restart_attempts`).
    max_restart_attempts: u32,
    /// Base duration multiplied by the backoff factor in `restart_delay`.
    base_restart_delay: Duration,
}
107
108impl TaskHandle {
109    /// Creates a `TaskHandle` from a boxed task with default configuration.
110    pub(crate) fn new(
111        task: Box<dyn CloneableSupervisedTask>,
112        max_restart_attempts: u32,
113        base_restart_delay: Duration,
114    ) -> Self {
115        Self {
116            status: TaskStatus::Created,
117            task,
118            handles: None,
119            last_heartbeat: None,
120            restart_attempts: 0,
121            healthy_since: None,
122            cancellation_token: None,
123            max_restart_attempts,
124            base_restart_delay,
125        }
126    }
127
128    /// Creates a new `TaskHandle` with custom restart configuration.
129    pub(crate) fn from_task<T: CloneableSupervisedTask + 'static>(
130        task: T,
131        max_restart_attempts: u32,
132        base_restart_delay: Duration,
133    ) -> Self {
134        let task = Box::new(task);
135        Self::new(task, max_restart_attempts, base_restart_delay)
136    }
137
138    /// Updates the last heartbeat time.
139    pub(crate) fn ticked_at(&mut self, at: Instant) {
140        self.last_heartbeat = Some(at);
141    }
142
143    /// Calculates the time since the last heartbeat.
144    pub(crate) fn time_since_last_heartbeat(&self) -> Option<Duration> {
145        self.last_heartbeat
146            .map(|last| Instant::now().duration_since(last))
147    }
148
149    /// Checks if the task has crashed based on the timeout threshold.
150    pub(crate) fn has_crashed(&self, timeout_threshold: Duration) -> bool {
151        let Some(time_since_last_heartbeat) = self.time_since_last_heartbeat() else {
152            return !self.is_ko();
153        };
154        !self.is_ko() && time_since_last_heartbeat > timeout_threshold
155    }
156
157    /// Calculates the restart delay using exponential backoff.
158    pub(crate) fn restart_delay(&self) -> Duration {
159        let factor = 2u32.saturating_pow(self.restart_attempts.min(5));
160        self.base_restart_delay.saturating_mul(factor)
161    }
162
163    /// Checks if the task has exceeded its maximum restart attempts.
164    pub(crate) const fn has_exceeded_max_retries(&self) -> bool {
165        self.restart_attempts >= self.max_restart_attempts
166    }
167
168    /// Updates the task's status.
169    pub(crate) fn mark(&mut self, status: TaskStatus) {
170        self.status = status;
171    }
172
173    /// Cleans up the task by aborting its handle and resetting state.
174    pub(crate) async fn clean(&mut self) {
175        if let Some(token) = self.cancellation_token.take() {
176            token.cancel();
177        }
178        self.last_heartbeat = None;
179        self.healthy_since = None;
180        if let Some(handles) = self.handles.take() {
181            for handle in handles {
182                handle.abort();
183            }
184        }
185    }
186
187    /// Checks if the task is in a failed or dead state.
188    pub(crate) fn is_ko(&self) -> bool {
189        self.status == TaskStatus::Failed || self.status == TaskStatus::Dead
190    }
191}