Trait celery::Task
A Task
represents a unit of work that a Celery
app can produce or consume.
The recommended way to define a task is through the task
procedural macro:
use celery::task;

#[task(name = "add")]
fn add(x: i32, y: i32) -> i32 {
    x + y
}
However, if you need more fine-grained control, it is fairly straightforward to implement a task directly. This would be equivalent to the above:
use async_trait::async_trait;
use serde::{Serialize, Deserialize};
use celery::{Task, Error};

#[allow(non_camel_case_types)]
#[derive(Serialize, Deserialize)]
struct add {
    x: i32,
    y: i32,
}

#[async_trait]
impl Task for add {
    const NAME: &'static str = "add";
    const ARGS: &'static [&'static str] = &["x", "y"];
    type Returns = i32;

    async fn run(&mut self) -> Result<Self::Returns, Error> {
        Ok(self.x + self.y)
    }
}

fn add(x: i32, y: i32) -> add {
    add { x, y }
}
Within the Celery protocol
the task parameters can be treated as either args
(positional) or kwargs
(keyword-based).
So, for example, from Python the add
task could be called like
celery_app.send_task("add", args=[1, 2])
or
celery_app.send_task("add", kwargs={"x": 1, "y": 2})
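To see why the order of names in ARGS matters, here is a minimal std-only sketch of how positional arguments could be paired with parameter names so that both call styles end up with the same named values. The `bind_params` helper and its use of plain `i32` values are hypothetical and only illustrate the idea; this is not how the celery crate itself decodes messages.

```rust
use std::collections::HashMap;

/// Parameter names in declaration order, mirroring `ARGS` for the `add` task.
const ARGS: &[&str] = &["x", "y"];

/// Merge positional `args` into `kwargs` by pairing each positional value with
/// the parameter name at the same index.
/// Hypothetical helper for illustration, not part of the celery crate.
fn bind_params(
    names: &[&str],
    args: &[i32],
    mut kwargs: HashMap<String, i32>,
) -> Result<HashMap<String, i32>, String> {
    if args.len() > names.len() {
        return Err(format!(
            "expected at most {} positional arguments, got {}",
            names.len(),
            args.len()
        ));
    }
    for (name, value) in names.iter().zip(args) {
        // A parameter may not be supplied both positionally and by keyword.
        if kwargs.insert(name.to_string(), *value).is_some() {
            return Err(format!("parameter `{}` given twice", name));
        }
    }
    Ok(kwargs)
}
```

Under these assumptions, `bind_params(ARGS, &[1, 2], HashMap::new())` and a call with empty `args` and kwargs `{"x": 1, "y": 2}` produce the same map, which corresponds to the two Python calls shown above.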
Making task parameters optional
You can provide default values for task parameters through the
deserialization mechanism. Currently this means
you'll have to implement the task manually as opposed to using the #[task]
macro.
So if we wanted to make the y
parameter in the add
task optional with a default
value of 0, we could add the #[serde(default)]
attribute to the y
field:
#[allow(non_camel_case_types)]
#[derive(Serialize, Deserialize)]
struct add {
    x: i32,
    #[serde(default)]
    y: i32,
}
Error handling
As demonstrated above, the #[task]
macro tries to wrap the return value in Result<Self::Returns, Error>
when the task is run. Therefore, the recommended way to propagate errors when defining a task with
#[task]
is to use .context("...")?
on Result
types within the task body.
For example:
use celery::{task, ResultExt};

#[task(name = "read_some_file")]
fn read_some_file() -> String {
    tokio::fs::read_to_string("some_file")
        .await
        .context("File does not exist")?
}
The .context
method on a Result
comes from the ResultExt
trait.
This is used to provide additional human-readable context to the error and also
to convert it into the expected Error
type.
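The mechanism described above can be sketched with the standard library alone. The `TaskError` type and `Context` trait below are hypothetical stand-ins for the crate's Error type and ResultExt trait; they only illustrate how an extension method on Result can attach a message and convert the error type so that `?` works, and they may differ from the crate's real implementation.

```rust
use std::fmt::Display;

/// Hypothetical stand-in for the crate's `Error` type:
/// a human-readable message plus the text of the original error.
#[derive(Debug)]
struct TaskError {
    context: String,
    cause: String,
}

/// Simplified analogue of `ResultExt`: adds `.context(...)` to any `Result`
/// whose error type can be displayed.
trait Context<T> {
    fn context(self, msg: &str) -> Result<T, TaskError>;
}

impl<T, E: Display> Context<T> for Result<T, E> {
    fn context(self, msg: &str) -> Result<T, TaskError> {
        // Convert the original error into `TaskError`, keeping its text.
        self.map_err(|e| TaskError {
            context: msg.to_string(),
            cause: e.to_string(),
        })
    }
}

/// Parse a number; the `ParseIntError` becomes a `TaskError`, so `?` applies.
fn parse_count(input: &str) -> Result<i32, TaskError> {
    let n = input.trim().parse::<i32>().context("Input is not a number")?;
    Ok(n)
}
```

Because the conversion happens inside `.context(...)`, the calling function can return a single error type regardless of which underlying error occurred.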
Associated Types
Associated Constants
const NAME: &'static str
The name of the task. When a task is registered it will be registered with this name.
const ARGS: &'static [&'static str]
For compatibility with Python tasks. This keeps track of the order of arguments for the task so that the task can be called from Python with positional arguments.
Required methods
fn run<'life0, 'async_trait>(
&'life0 mut self
) -> Pin<Box<dyn Future<Output = Result<Self::Returns, Error>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
This function defines how a task executes.
Provided methods
fn on_failure<'life0, 'async_trait>(
&'life0 mut self,
err: Error
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
This function can be overridden to provide custom logic that will run after a task fails. The argument to the function is the error returned by the task.
fn on_success<'life0, 'async_trait>(
&'life0 mut self,
returned: Self::Returns
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>> where
'life0: 'async_trait,
Self: 'async_trait,
This function can be overridden to provide custom logic that will run after a task completes successfully. The argument to the function is the returned value of the task.
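As a rough illustration of how the on_failure and on_success hooks relate to run, here is a synchronous, std-only sketch. `SimpleTask`, `execute`, and `Divide` are hypothetical names; the real trait methods are async, take their arguments by value, and are invoked by the crate's own machinery, whose exact behavior is not described here.

```rust
/// Synchronous stand-in for the `Task` trait (assumption: simplified shape).
trait SimpleTask {
    type Returns;

    fn run(&mut self) -> Result<Self::Returns, String>;

    /// Default hooks do nothing; implementors may override them.
    fn on_failure(&mut self, _err: &str) {}
    fn on_success(&mut self, _returned: &Self::Returns) {}
}

/// Hypothetical driver: run the task once, then call the matching hook.
fn execute<T: SimpleTask>(task: &mut T) -> Result<T::Returns, String> {
    let outcome = task.run();
    match &outcome {
        Ok(value) => task.on_success(value),
        Err(err) => task.on_failure(err),
    }
    outcome
}

/// Example task that records which hook was called.
struct Divide {
    x: i32,
    y: i32,
    log: Vec<String>,
}

impl SimpleTask for Divide {
    type Returns = i32;

    fn run(&mut self) -> Result<i32, String> {
        if self.y == 0 {
            Err("division by zero".to_string())
        } else {
            Ok(self.x / self.y)
        }
    }

    fn on_failure(&mut self, err: &str) {
        self.log.push(format!("failed: {}", err));
    }

    fn on_success(&mut self, returned: &i32) {
        self.log.push(format!("ok: {}", returned));
    }
}
```

In this sketch, exactly one of the two hooks runs per execution, and a task that does not override them simply inherits the no-op defaults.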
fn timeout(&self) -> Option<usize>
Default timeout for this task.
fn max_retries(&self) -> Option<usize>
Default maximum number of retries for this task.
fn min_retry_delay(&self) -> usize
Default minimum retry delay (in seconds) for this task (default is 0).
fn max_retry_delay(&self) -> usize
Default maximum retry delay (in seconds) for this task (default is 3600 seconds).
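The four option methods above are provided methods, so a task only overrides the ones it cares about. The sketch below mirrors their signatures on a hypothetical `TaskOptions` trait to show the pattern with the standard library alone. The retry delay defaults (0 and 3600) come from the descriptions above; returning `None` from `timeout` and `max_retries` by default is an assumption, as are the example values in `SlowTask`.

```rust
/// Hypothetical mirror of the option methods on `Task`.
trait TaskOptions {
    /// Assumption: no timeout unless overridden.
    fn timeout(&self) -> Option<usize> {
        None
    }

    /// Assumption: no explicit retry limit unless overridden.
    fn max_retries(&self) -> Option<usize> {
        None
    }

    /// Documented default: 0 seconds.
    fn min_retry_delay(&self) -> usize {
        0
    }

    /// Documented default: 3600 seconds.
    fn max_retry_delay(&self) -> usize {
        3600
    }
}

/// A task that accepts every default.
struct UsesDefaults;

impl TaskOptions for UsesDefaults {}

/// A task that overrides some options and inherits the rest.
struct SlowTask;

impl TaskOptions for SlowTask {
    fn timeout(&self) -> Option<usize> {
        Some(300)
    }

    fn max_retries(&self) -> Option<usize> {
        Some(5)
    }

    fn min_retry_delay(&self) -> usize {
        10
    }
}
```

Here `SlowTask` still reports a maximum retry delay of 3600 because it does not override `max_retry_delay`.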