celery/task/
mod.rs

1//! Provides the [`Task`] trait as well as options for configuring tasks.
2
3use async_trait::async_trait;
4use chrono::{DateTime, NaiveDateTime, Utc};
5use rand::distributions::{Distribution, Uniform};
6use serde::{Deserialize, Serialize};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use crate::error::TaskError;
10
11mod async_result;
12mod options;
13mod request;
14mod signature;
15
16pub use async_result::AsyncResult;
17pub use options::TaskOptions;
18pub use request::Request;
19pub use signature::Signature;
20
21/// The return type for a task.
22pub type TaskResult<R> = Result<R, TaskError>;
23
24#[doc(hidden)]
25pub trait AsTaskResult {
26    type Returns: Send + Sync + std::fmt::Debug;
27}
28
29impl<R> AsTaskResult for TaskResult<R>
30where
31    R: Send + Sync + std::fmt::Debug,
32{
33    type Returns = R;
34}
35
36/// A `Task` represents a unit of work that a `Celery` app can produce or consume.
37///
38/// The recommended way to create tasks is through the [`task`](../attr.task.html) attribute macro, not by directly implementing
39/// this trait. For more information see the [tasks chapter](https://rusty-celery.github.io/guide/defining-tasks.html)
40/// in the Rusty Celery Book.
41#[async_trait]
42pub trait Task: Send + Sync + std::marker::Sized {
43    /// The name of the task. When a task is registered it will be registered with this name.
44    const NAME: &'static str;
45
46    /// For compatability with Python tasks. This keeps track of the order
47    /// of arguments for the task so that the task can be called from Python with
48    /// positional arguments.
49    const ARGS: &'static [&'static str];
50
51    /// Default task options.
52    const DEFAULTS: TaskOptions = TaskOptions {
53        time_limit: None,
54        hard_time_limit: None,
55        max_retries: None,
56        min_retry_delay: None,
57        max_retry_delay: None,
58        retry_for_unexpected: None,
59        acks_late: None,
60        content_type: None,
61    };
62
63    /// The parameters of the task.
64    type Params: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de>;
65
66    /// The return type of the task.
67    type Returns: Send + Sync + std::fmt::Debug;
68
69    /// Used to initialize a task instance from a request.
70    fn from_request(request: Request<Self>, options: TaskOptions) -> Self;
71
72    /// Get a reference to the request used to create this task instance.
73    fn request(&self) -> &Request<Self>;
74
75    /// Get a reference to the task's configuration options.
76    ///
77    /// This is a product of both app-level task options and the options configured specifically
78    /// for the given task. Options specified at the *task*-level take priority over options
79    /// specified at the app level. So, if the task was defined like this:
80    ///
81    /// ```rust
82    /// # use celery::prelude::*;
83    /// #[celery::task(time_limit = 3)]
84    /// fn add(x: i32, y: i32) -> TaskResult<i32> {
85    ///     Ok(x + y)
86    /// }
87    /// ```
88    ///
89    /// But the `Celery` app was built with a `task_time_limit` of 5, then
90    /// `Task::options().time_limit` would be `Some(3)`.
91    fn options(&self) -> &TaskOptions;
92
93    /// This function defines how a task executes.
94    async fn run(&self, params: Self::Params) -> TaskResult<Self::Returns>;
95
96    /// Callback that will run after a task fails.
97    #[allow(unused_variables)]
98    async fn on_failure(&self, err: &TaskError) {}
99
100    /// Callback that will run after a task completes successfully.
101    #[allow(unused_variables)]
102    async fn on_success(&self, returned: &Self::Returns) {}
103
104    /// Returns the registered name of the task.
105    fn name(&self) -> &'static str {
106        Self::NAME
107    }
108
109    /// This can be called from within a task function to trigger a retry in `countdown` seconds.
110    fn retry_with_countdown(&self, countdown: u32) -> TaskResult<Self::Returns> {
111        let eta = match SystemTime::now().duration_since(UNIX_EPOCH) {
112            Ok(now) => {
113                let now_secs = now.as_secs() as u32;
114                let now_millis = now.subsec_millis();
115                let eta_secs = now_secs + countdown;
116                Some(DateTime::<Utc>::from_naive_utc_and_offset(
117                    NaiveDateTime::from_timestamp_opt(eta_secs as i64, now_millis * 1000)
118                        .ok_or_else(|| {
119                            TaskError::UnexpectedError(format!(
120                                "Invalid countdown seconds {countdown}",
121                            ))
122                        })?,
123                    Utc,
124                ))
125            }
126            Err(_) => None,
127        };
128        Err(TaskError::Retry(eta))
129    }
130
131    /// This can be called from within a task function to trigger a retry at the specified `eta`.
132    fn retry_with_eta(&self, eta: DateTime<Utc>) -> TaskResult<Self::Returns> {
133        Err(TaskError::Retry(Some(eta)))
134    }
135
136    /// Get a future ETA at which time the task should be retried. By default this
137    /// uses a capped exponential backoff strategy.
138    fn retry_eta(&self) -> Option<DateTime<Utc>> {
139        let retries = self.request().retries;
140        let delay_secs = std::cmp::min(
141            2u32.checked_pow(retries)
142                .unwrap_or_else(|| self.max_retry_delay()),
143            self.max_retry_delay(),
144        );
145        let delay_secs = std::cmp::max(delay_secs, self.min_retry_delay());
146        let between = Uniform::from(0..1000);
147        let mut rng = rand::thread_rng();
148        let delay_millis = between.sample(&mut rng);
149        match SystemTime::now().duration_since(UNIX_EPOCH) {
150            Ok(now) => {
151                let now_secs = now.as_secs() as u32;
152                let now_millis = now.subsec_millis();
153                let eta_secs = now_secs + delay_secs;
154                let eta_millis = now_millis + delay_millis;
155                NaiveDateTime::from_timestamp_opt(eta_secs as i64, eta_millis * 1000)
156                    .map(|eta| DateTime::<Utc>::from_naive_utc_and_offset(eta, Utc))
157            }
158            Err(_) => None,
159        }
160    }
161
162    fn retry_for_unexpected(&self) -> bool {
163        Self::DEFAULTS
164            .retry_for_unexpected
165            .or(self.options().retry_for_unexpected)
166            .unwrap_or(true)
167    }
168
169    fn time_limit(&self) -> Option<u32> {
170        self.request().time_limit.or_else(|| {
171            // Take min or `time_limit` and `hard_time_limit`.
172            let time_limit = Self::DEFAULTS.time_limit.or(self.options().time_limit);
173            let hard_time_limit = Self::DEFAULTS
174                .hard_time_limit
175                .or(self.options().hard_time_limit);
176            match (time_limit, hard_time_limit) {
177                (Some(t1), Some(t2)) => Some(std::cmp::min(t1, t2)),
178                (Some(t1), None) => Some(t1),
179                (None, Some(t2)) => Some(t2),
180                _ => None,
181            }
182        })
183    }
184
185    fn max_retries(&self) -> Option<u32> {
186        Self::DEFAULTS.max_retries.or(self.options().max_retries)
187    }
188
189    fn min_retry_delay(&self) -> u32 {
190        Self::DEFAULTS
191            .min_retry_delay
192            .or(self.options().min_retry_delay)
193            .unwrap_or(0)
194    }
195
196    fn max_retry_delay(&self) -> u32 {
197        Self::DEFAULTS
198            .max_retry_delay
199            .or(self.options().max_retry_delay)
200            .unwrap_or(3600)
201    }
202
203    fn acks_late(&self) -> bool {
204        Self::DEFAULTS
205            .acks_late
206            .or(self.options().acks_late)
207            .unwrap_or(false)
208    }
209}
210
211#[derive(Clone, Debug)]
212pub(crate) enum TaskEvent {
213    StatusChange(TaskStatus),
214}
215
216#[derive(Clone, Debug)]
217pub(crate) enum TaskStatus {
218    Pending,
219    Finished,
220}
221
222/// Extension methods for `Result` types within a task body.
223///
224/// These methods can be used to convert a `Result<T, E>` to a `Result<T, TaskError>` with the
225/// appropriate [`TaskError`] variant. The trait has a blanket implementation for any error type that implements
226/// [`std::error::Error`](https://doc.rust-lang.org/std/error/trait.Error.html).
227///
228/// # Examples
229///
230/// ```rust
231/// # use celery::prelude::*;
232/// fn do_some_io() -> Result<(), std::io::Error> {
233///     unimplemented!()
234/// }
235///
236/// #[celery::task]
237/// fn fallible_io_task() -> TaskResult<()> {
238///     do_some_io().with_expected_err(|| "IO error")?;
239///     Ok(())
240/// }
241/// ```
242pub trait TaskResultExt<T, E, F, C> {
243    /// Convert the error type to a [`TaskError::ExpectedError`].
244    fn with_expected_err(self, f: F) -> Result<T, TaskError>;
245
246    /// Convert the error type to a [`TaskError::UnexpectedError`].
247    fn with_unexpected_err(self, f: F) -> Result<T, TaskError>;
248}
249
250impl<T, E, F, C> TaskResultExt<T, E, F, C> for Result<T, E>
251where
252    E: std::error::Error,
253    F: FnOnce() -> C,
254    C: std::fmt::Display + Send + Sync + 'static,
255{
256    fn with_expected_err(self, f: F) -> Result<T, TaskError> {
257        self.map_err(|e| TaskError::ExpectedError(format!("{} ➥ Cause: {:?}", f(), e)))
258    }
259
260    fn with_unexpected_err(self, f: F) -> Result<T, TaskError> {
261        self.map_err(|e| TaskError::UnexpectedError(format!("{} ➥ Cause: {:?}", f(), e)))
262    }
263}