Trait celery::Task

pub trait Task: Send + Sync + Serialize + for<'de> Deserialize<'de> {
    type Returns: Send + Sync + Debug;

    const NAME: &'static str;
    const ARGS: &'static [&'static str];

    fn run<'life0, 'async_trait>(
        &'life0 mut self
    ) -> Pin<Box<dyn Future<Output = Result<Self::Returns, Error>> + Send + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait;

    fn on_failure<'life0, 'async_trait>(
        &'life0 mut self,
        err: Error
    ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait,
    { ... }

    fn on_success<'life0, 'async_trait>(
        &'life0 mut self,
        returned: Self::Returns
    ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait,
    { ... }

    fn timeout(&self) -> Option<usize> { ... }
    fn max_retries(&self) -> Option<usize> { ... }
    fn min_retry_delay(&self) -> usize { ... }
    fn max_retry_delay(&self) -> usize { ... }
}

A Task represents a unit of work that a Celery app can produce or consume.

The recommended way to define a task is through the task procedural macro:

use celery::task;

#[task(name = "add")]
fn add(x: i32, y: i32) -> i32 {
    x + y
}

However, if you need more fine-grained control, it is fairly straightforward to implement a task directly. The following is equivalent to the macro-based definition above:

use async_trait::async_trait;
use serde::{Serialize, Deserialize};
use celery::{Task, Error};

#[allow(non_camel_case_types)]
#[derive(Serialize, Deserialize)]
struct add {
    x: i32,
    y: i32,
}

#[async_trait]
impl Task for add {
    const NAME: &'static str = "add";
    const ARGS: &'static [&'static str] = &["x", "y"];

    type Returns = i32;

    async fn run(&mut self) -> Result<Self::Returns, Error> {
        Ok(self.x + self.y)
    }
}

fn add(x: i32, y: i32) -> add {
    add { x, y }
}

Within the Celery protocol, task parameters can be passed either as args (positional) or as kwargs (keyword-based). So, for example, the add task could be called from Python like

celery_app.send_task("add", args=[1, 2])

or

celery_app.send_task("add", kwargs={"x": 1, "y": 2})
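Both call styles can reach the same fields because the ARGS constant lists the parameter names in order. As a rough illustration only (this is not the crate's actual deserialization code, and `positional_to_named` is a hypothetical helper), positional values can be paired with names like this:

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: pairs positional values with parameter names, in order.
/// `arg_names` plays the role of `Task::ARGS`, e.g. `&["x", "y"]`.
fn positional_to_named(
    arg_names: &[&str],
    args: &[i32],
) -> Result<BTreeMap<String, i32>, String> {
    // More values than declared names cannot be mapped to any field.
    if args.len() > arg_names.len() {
        return Err(format!(
            "expected at most {} positional arguments, got {}",
            arg_names.len(),
            args.len()
        ));
    }
    // The i-th positional value is assigned to the i-th declared name.
    Ok(arg_names
        .iter()
        .zip(args.iter())
        .map(|(name, value)| ((*name).to_string(), *value))
        .collect())
}
```

Under this sketch, `positional_to_named(&["x", "y"], &[1, 2])` yields the same name/value pairs as the kwargs form of the call.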

Making task parameters optional

You can provide default values for task parameters through the deserialization mechanism. Currently this means you'll have to implement the task manually as opposed to using the #[task] macro.

So if we wanted to make the y parameter in the add task optional with a default value of 0, we could add the #[serde(default)] attribute to the y field:

#[allow(non_camel_case_types)]
#[derive(Serialize, Deserialize)]
struct add {
    x: i32,
    #[serde(default)]
    y: i32,
}
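To make the effect of the attribute concrete, here is a standard-library-only sketch of the fallback behaviour it arranges: a missing y takes `i32::default()`, which is 0, while a missing x is still an error. The `AddParams` struct and `params_from_kwargs` function are hypothetical stand-ins for what serde generates, not part of celery or serde.

```rust
use std::collections::HashMap;

/// Hypothetical mirror of the `add` struct's fields.
#[derive(Debug, PartialEq)]
struct AddParams {
    x: i32,
    y: i32,
}

/// Hypothetical stand-in for deserialization from keyword arguments.
fn params_from_kwargs(kwargs: &HashMap<&str, i32>) -> Result<AddParams, String> {
    // `x` has no default, so its absence is an error.
    let x = *kwargs
        .get("x")
        .ok_or_else(|| "missing field `x`".to_string())?;
    // `y` behaves like a `#[serde(default)]` field: fall back to `i32::default()`.
    let y = kwargs.get("y").copied().unwrap_or_default();
    Ok(AddParams { x, y })
}
```

With this behaviour, a Python caller could send only `kwargs={"x": 1}` and the task would run with y equal to 0.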

Error handling

As demonstrated above, the #[task] macro wraps the task's return value in Result<Self::Returns, Error> when the task is run. The recommended way to propagate errors when defining a task with #[task] is therefore to use .context("...")? on Result types within the task body.

For example:

use celery::{task, ResultExt};

#[task(name = "read_some_file")]
fn read_some_file() -> String {
    tokio::fs::read_to_string("some_file")
        .await
        .context("File does not exist")?
}

The .context method on a Result comes from the ResultExt trait. This is used to provide additional human-readable context to the error and also to convert it into the expected Error type.
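The general pattern behind such a method is an extension trait implemented for Result. The following standard-library-only sketch shows the idea; `TaskError`, `ContextExt`, and `parse_count` are hypothetical names chosen for illustration and are not celery's `Error` or `ResultExt`.

```rust
use std::fmt::{self, Display};

/// Hypothetical error type standing in for `celery::Error`.
#[derive(Debug)]
struct TaskError {
    message: String,
    cause: String,
}

impl Display for TaskError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}: {}", self.message, self.cause)
    }
}

/// Hypothetical extension trait, similar in spirit to `ResultExt`.
trait ContextExt<T> {
    fn context(self, message: &str) -> Result<T, TaskError>;
}

impl<T, E: Display> ContextExt<T> for Result<T, E> {
    fn context(self, message: &str) -> Result<T, TaskError> {
        // Attach the human-readable message and convert to the common error type.
        self.map_err(|cause| TaskError {
            message: message.to_string(),
            cause: cause.to_string(),
        })
    }
}

/// Example use: the `?` operator works because the error type now matches.
fn parse_count(raw: &str) -> Result<u32, TaskError> {
    let count = raw
        .trim()
        .parse::<u32>()
        .context("Count is not a valid number")?;
    Ok(count)
}
```

The conversion step is what lets `?` be used on results whose error type differs from the one the task is expected to return.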

Associated Types

type Returns: Send + Sync + Debug

The return type of the task.

Associated Constants

const NAME: &'static str

The name of the task. When a task is registered it will be registered with this name.

const ARGS: &'static [&'static str]

Provided for compatibility with Python tasks. This records the order of the task's arguments so that the task can be called from Python with positional arguments.

Required methods

fn run<'life0, 'async_trait>(
    &'life0 mut self
) -> Pin<Box<dyn Future<Output = Result<Self::Returns, Error>> + Send + 'async_trait>> where
    'life0: 'async_trait,
    Self: 'async_trait, 

This function defines how a task executes.

Provided methods

fn on_failure<'life0, 'async_trait>(
    &'life0 mut self,
    err: Error
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>> where
    'life0: 'async_trait,
    Self: 'async_trait, 

This function can be overridden to provide custom logic that will run after a task fails. The argument to the function is the error returned by the task.

fn on_success<'life0, 'async_trait>(
    &'life0 mut self,
    returned: Self::Returns
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'async_trait>> where
    'life0: 'async_trait,
    Self: 'async_trait, 

This function can be overridden to provide custom logic that will run after a task completes successfully. The argument to the function is the value returned by the task.
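The relationship between run and the two hooks can be pictured with a simplified, synchronous model. This is a sketch under stated assumptions, not the crate's executor: `MiniTask`, `Divide`, and `execute` are hypothetical, the hooks here take references rather than owned values, and String stands in for the real Error type.

```rust
/// Simplified, synchronous stand-in for the `Task` trait (hypothetical).
trait MiniTask {
    type Returns;

    fn run(&mut self) -> Result<Self::Returns, String>;

    // Provided methods: they do nothing unless an implementor overrides them.
    fn on_failure(&mut self, _err: &str) {}
    fn on_success(&mut self, _returned: &Self::Returns) {}
}

struct Divide {
    x: i32,
    y: i32,
    log: Vec<String>,
}

impl MiniTask for Divide {
    type Returns = i32;

    fn run(&mut self) -> Result<i32, String> {
        // `checked_div` returns None on division by zero or overflow.
        self.x
            .checked_div(self.y)
            .ok_or_else(|| "division by zero or overflow".to_string())
    }

    // Overridden hooks record what happened.
    fn on_failure(&mut self, err: &str) {
        self.log.push(format!("failed: {}", err));
    }

    fn on_success(&mut self, returned: &i32) {
        self.log.push(format!("succeeded: {}", returned));
    }
}

/// Runs the task, then calls exactly one of the two hooks.
fn execute<T: MiniTask>(task: &mut T) -> Result<T::Returns, String> {
    match task.run() {
        Ok(value) => {
            task.on_success(&value);
            Ok(value)
        }
        Err(err) => {
            task.on_failure(&err);
            Err(err)
        }
    }
}
```

In this model a task that overrides neither hook behaves exactly as if the hooks were absent, which matches the role of provided methods in the real trait.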

fn timeout(&self) -> Option<usize>

Default timeout for this task.

fn max_retries(&self) -> Option<usize>

Default maximum number of retries for this task.

fn min_retry_delay(&self) -> usize

Default minimum retry delay (in seconds) for this task (default is 0).

fn max_retry_delay(&self) -> usize

Default maximum retry delay (in seconds) for this task (default is 3600 seconds).
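This page does not specify how the delay between retries is derived from these two bounds. One plausible policy, shown here purely as an assumption, is exponential backoff clamped to the range from min_retry_delay to max_retry_delay. The `backoff_delay` function is hypothetical and is not taken from the crate.

```rust
/// Hypothetical policy: wait 2^retries seconds, clamped to [min_delay, max_delay].
/// `retries` is the number of attempts that have already failed.
fn backoff_delay(retries: u32, min_delay: usize, max_delay: usize) -> usize {
    // `checked_pow` returns None on overflow; treat that as "as long as possible".
    let exponential = 2usize.checked_pow(retries).unwrap_or(usize::MAX);
    // Apply the upper bound first, then the lower bound.
    exponential.min(max_delay).max(min_delay)
}
```

With the documented defaults of 0 and 3600 seconds, this sketch would wait 1 second before the first retry, 8 seconds before the fourth, and never more than an hour.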

Implementors