task_supervisor/task/
mod.rs1use std::{
2 error::Error,
3 time::{Duration, Instant},
4};
5
6use tokio_util::sync::CancellationToken;
7
8pub type DynTask = Box<dyn CloneableSupervisedTask>;
/// Error returned from `SupervisedTask::run`: any boxed error that is `Send + Sync`.
pub type TaskError = Box<dyn Error + Send + Sync>;
10
11#[async_trait::async_trait]
12pub trait SupervisedTask: Send + 'static {
13 async fn run(&mut self) -> Result<TaskOutcome, TaskError>;
15}
16
17pub trait CloneableSupervisedTask: SupervisedTask {
18 fn clone_box(&self) -> Box<dyn CloneableSupervisedTask>;
19}
20
21impl<T> CloneableSupervisedTask for T
22where
23 T: SupervisedTask + Clone + Send + 'static,
24{
25 fn clone_box(&self) -> Box<dyn CloneableSupervisedTask> {
26 Box::new(self.clone())
27 }
28}
29
/// Lifecycle status of a supervised task.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TaskStatus {
    /// Status given to a freshly built `TaskHandle`.
    Created,
    /// Presumably set while the task is being spawned — confirm at the call sites.
    Starting,
    /// The task is considered healthy (`is_healthy` reports `true`).
    Healthy,
    /// The task failed; both `is_restarting` and `TaskHandle::is_ko` report `true`.
    Failed,
    /// The task finished (`has_completed` reports `true`).
    Completed,
    /// `is_dead` and `TaskHandle::is_ko` report `true`; presumably a terminal state
    /// (e.g. restart budget exhausted) — confirm.
    Dead,
}
46
47impl TaskStatus {
48 pub fn is_restarting(&self) -> bool {
49 matches!(self, TaskStatus::Failed)
50 }
51
52 pub fn is_healthy(&self) -> bool {
53 matches!(self, TaskStatus::Healthy)
54 }
55
56 pub fn is_dead(&self) -> bool {
57 matches!(self, TaskStatus::Dead)
58 }
59
60 pub fn has_completed(&self) -> bool {
61 matches!(self, TaskStatus::Completed)
62 }
63}
64
65impl std::fmt::Display for TaskStatus {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 match self {
68 Self::Created => write!(f, "created"),
69 Self::Starting => write!(f, "starting"),
70 Self::Healthy => write!(f, "healthy"),
71 Self::Failed => write!(f, "failed"),
72 Self::Completed => write!(f, "completed"),
73 Self::Dead => write!(f, "dead"),
74 }
75 }
76}
77
/// Value carried in the `Ok` variant returned by `SupervisedTask::run`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TaskOutcome {
    /// The run finished.
    Completed,
    /// The run reported a failure, described by the contained message.
    Failed(String),
}
86
87impl std::fmt::Display for TaskOutcome {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 match self {
90 Self::Completed => write!(f, "completed"),
91 Self::Failed(e) => write!(f, "failed: {e}"),
92 }
93 }
94}
95
96pub(crate) struct TaskHandle {
97 pub(crate) status: TaskStatus,
98 pub(crate) task: DynTask,
99 pub(crate) handles: Option<Vec<tokio::task::JoinHandle<()>>>,
100 pub(crate) last_heartbeat: Option<Instant>,
101 pub(crate) restart_attempts: u32,
102 pub(crate) healthy_since: Option<Instant>,
103 pub(crate) cancellation_token: Option<CancellationToken>,
104 max_restart_attempts: u32,
105 base_restart_delay: Duration,
106}
107
108impl TaskHandle {
109 pub(crate) fn new(
111 task: Box<dyn CloneableSupervisedTask>,
112 max_restart_attempts: u32,
113 base_restart_delay: Duration,
114 ) -> Self {
115 Self {
116 status: TaskStatus::Created,
117 task,
118 handles: None,
119 last_heartbeat: None,
120 restart_attempts: 0,
121 healthy_since: None,
122 cancellation_token: None,
123 max_restart_attempts,
124 base_restart_delay,
125 }
126 }
127
128 pub(crate) fn from_task<T: CloneableSupervisedTask + 'static>(
130 task: T,
131 max_restart_attempts: u32,
132 base_restart_delay: Duration,
133 ) -> Self {
134 let task = Box::new(task);
135 Self::new(task, max_restart_attempts, base_restart_delay)
136 }
137
138 pub(crate) fn ticked_at(&mut self, at: Instant) {
140 self.last_heartbeat = Some(at);
141 }
142
143 pub(crate) fn time_since_last_heartbeat(&self) -> Option<Duration> {
145 self.last_heartbeat
146 .map(|last| Instant::now().duration_since(last))
147 }
148
149 pub(crate) fn has_crashed(&self, timeout_threshold: Duration) -> bool {
151 let Some(time_since_last_heartbeat) = self.time_since_last_heartbeat() else {
152 return !self.is_ko();
153 };
154 !self.is_ko() && time_since_last_heartbeat > timeout_threshold
155 }
156
157 pub(crate) fn restart_delay(&self) -> Duration {
159 let factor = 2u32.saturating_pow(self.restart_attempts.min(5));
160 self.base_restart_delay.saturating_mul(factor)
161 }
162
163 pub(crate) const fn has_exceeded_max_retries(&self) -> bool {
165 self.restart_attempts >= self.max_restart_attempts
166 }
167
168 pub(crate) fn mark(&mut self, status: TaskStatus) {
170 self.status = status;
171 }
172
173 pub(crate) async fn clean(&mut self) {
175 if let Some(token) = self.cancellation_token.take() {
176 token.cancel();
177 }
178 self.last_heartbeat = None;
179 self.healthy_since = None;
180 if let Some(handles) = self.handles.take() {
181 for handle in handles {
182 handle.abort();
183 }
184 }
185 }
186
187 pub(crate) fn is_ko(&self) -> bool {
189 self.status == TaskStatus::Failed || self.status == TaskStatus::Dead
190 }
191}