1use async_trait::async_trait;
4use chrono::{DateTime, NaiveDateTime, Utc};
5use rand::distributions::{Distribution, Uniform};
6use serde::{Deserialize, Serialize};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use crate::error::TaskError;
10
11mod async_result;
12mod options;
13mod request;
14mod signature;
15
16pub use async_result::AsyncResult;
17pub use options::TaskOptions;
18pub use request::Request;
19pub use signature::Signature;
20
21pub type TaskResult<R> = Result<R, TaskError>;
23
24#[doc(hidden)]
25pub trait AsTaskResult {
26 type Returns: Send + Sync + std::fmt::Debug;
27}
28
29impl<R> AsTaskResult for TaskResult<R>
30where
31 R: Send + Sync + std::fmt::Debug,
32{
33 type Returns = R;
34}
35
36#[async_trait]
42pub trait Task: Send + Sync + std::marker::Sized {
43 const NAME: &'static str;
45
46 const ARGS: &'static [&'static str];
50
51 const DEFAULTS: TaskOptions = TaskOptions {
53 time_limit: None,
54 hard_time_limit: None,
55 max_retries: None,
56 min_retry_delay: None,
57 max_retry_delay: None,
58 retry_for_unexpected: None,
59 acks_late: None,
60 content_type: None,
61 };
62
63 type Params: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de>;
65
66 type Returns: Send + Sync + std::fmt::Debug;
68
69 fn from_request(request: Request<Self>, options: TaskOptions) -> Self;
71
72 fn request(&self) -> &Request<Self>;
74
75 fn options(&self) -> &TaskOptions;
92
93 async fn run(&self, params: Self::Params) -> TaskResult<Self::Returns>;
95
96 #[allow(unused_variables)]
98 async fn on_failure(&self, err: &TaskError) {}
99
100 #[allow(unused_variables)]
102 async fn on_success(&self, returned: &Self::Returns) {}
103
104 fn name(&self) -> &'static str {
106 Self::NAME
107 }
108
109 fn retry_with_countdown(&self, countdown: u32) -> TaskResult<Self::Returns> {
111 let eta = match SystemTime::now().duration_since(UNIX_EPOCH) {
112 Ok(now) => {
113 let now_secs = now.as_secs() as u32;
114 let now_millis = now.subsec_millis();
115 let eta_secs = now_secs + countdown;
116 Some(DateTime::<Utc>::from_naive_utc_and_offset(
117 NaiveDateTime::from_timestamp_opt(eta_secs as i64, now_millis * 1000)
118 .ok_or_else(|| {
119 TaskError::UnexpectedError(format!(
120 "Invalid countdown seconds {countdown}",
121 ))
122 })?,
123 Utc,
124 ))
125 }
126 Err(_) => None,
127 };
128 Err(TaskError::Retry(eta))
129 }
130
131 fn retry_with_eta(&self, eta: DateTime<Utc>) -> TaskResult<Self::Returns> {
133 Err(TaskError::Retry(Some(eta)))
134 }
135
136 fn retry_eta(&self) -> Option<DateTime<Utc>> {
139 let retries = self.request().retries;
140 let delay_secs = std::cmp::min(
141 2u32.checked_pow(retries)
142 .unwrap_or_else(|| self.max_retry_delay()),
143 self.max_retry_delay(),
144 );
145 let delay_secs = std::cmp::max(delay_secs, self.min_retry_delay());
146 let between = Uniform::from(0..1000);
147 let mut rng = rand::thread_rng();
148 let delay_millis = between.sample(&mut rng);
149 match SystemTime::now().duration_since(UNIX_EPOCH) {
150 Ok(now) => {
151 let now_secs = now.as_secs() as u32;
152 let now_millis = now.subsec_millis();
153 let eta_secs = now_secs + delay_secs;
154 let eta_millis = now_millis + delay_millis;
155 NaiveDateTime::from_timestamp_opt(eta_secs as i64, eta_millis * 1000)
156 .map(|eta| DateTime::<Utc>::from_naive_utc_and_offset(eta, Utc))
157 }
158 Err(_) => None,
159 }
160 }
161
162 fn retry_for_unexpected(&self) -> bool {
163 Self::DEFAULTS
164 .retry_for_unexpected
165 .or(self.options().retry_for_unexpected)
166 .unwrap_or(true)
167 }
168
169 fn time_limit(&self) -> Option<u32> {
170 self.request().time_limit.or_else(|| {
171 let time_limit = Self::DEFAULTS.time_limit.or(self.options().time_limit);
173 let hard_time_limit = Self::DEFAULTS
174 .hard_time_limit
175 .or(self.options().hard_time_limit);
176 match (time_limit, hard_time_limit) {
177 (Some(t1), Some(t2)) => Some(std::cmp::min(t1, t2)),
178 (Some(t1), None) => Some(t1),
179 (None, Some(t2)) => Some(t2),
180 _ => None,
181 }
182 })
183 }
184
185 fn max_retries(&self) -> Option<u32> {
186 Self::DEFAULTS.max_retries.or(self.options().max_retries)
187 }
188
189 fn min_retry_delay(&self) -> u32 {
190 Self::DEFAULTS
191 .min_retry_delay
192 .or(self.options().min_retry_delay)
193 .unwrap_or(0)
194 }
195
196 fn max_retry_delay(&self) -> u32 {
197 Self::DEFAULTS
198 .max_retry_delay
199 .or(self.options().max_retry_delay)
200 .unwrap_or(3600)
201 }
202
203 fn acks_late(&self) -> bool {
204 Self::DEFAULTS
205 .acks_late
206 .or(self.options().acks_late)
207 .unwrap_or(false)
208 }
209}
210
211#[derive(Clone, Debug)]
212pub(crate) enum TaskEvent {
213 StatusChange(TaskStatus),
214}
215
216#[derive(Clone, Debug)]
217pub(crate) enum TaskStatus {
218 Pending,
219 Finished,
220}
221
222pub trait TaskResultExt<T, E, F, C> {
243 fn with_expected_err(self, f: F) -> Result<T, TaskError>;
245
246 fn with_unexpected_err(self, f: F) -> Result<T, TaskError>;
248}
249
250impl<T, E, F, C> TaskResultExt<T, E, F, C> for Result<T, E>
251where
252 E: std::error::Error,
253 F: FnOnce() -> C,
254 C: std::fmt::Display + Send + Sync + 'static,
255{
256 fn with_expected_err(self, f: F) -> Result<T, TaskError> {
257 self.map_err(|e| TaskError::ExpectedError(format!("{} ➥ Cause: {:?}", f(), e)))
258 }
259
260 fn with_unexpected_err(self, f: F) -> Result<T, TaskError> {
261 self.map_err(|e| TaskError::UnexpectedError(format!("{} ➥ Cause: {:?}", f(), e)))
262 }
263}