Macro celery::beat

source ·
macro_rules! beat {
    (
        broker = $broker_type:ty { $broker_url:expr },
        tasks = [
            $( $task_name:expr => {
                $task_type:ty,
                schedule = $schedule:expr,
                args = $args:tt $(,)?
            } ),* $(,)?
        ],
        task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
        $(, $x:ident = $y:expr )* $(,)?
    ) => { ... };
    (
        broker = $broker_type:ty { $broker_url:expr },
        scheduler_backend = $scheduler_backend_type:ty { $scheduler_backend:expr },
        tasks = [
            $( $task_name:expr => {
                $task_type:ty,
                schedule = $schedule:expr,
                args = $args:tt $(,)?
            } ),* $(,)?
        ],
        task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
        $(, $x:ident = $y:expr )* $(,)?
    ) => { ... };
}
Expand description

A macro for creating a Beat app.

At a minimum the beat! macro requires these 3 arguments (in order):

  • broker: a broker type (currently only AMQP is supported) with an expression for the broker URL in brackets,
  • tasks: a list of tasks together with their relative schedules (can be empty),
  • task_routes: a list of routing rules in the form of pattern => queue.

Tasks

An entry in the task list has the following components:

  • The name of the task,
  • The instance of the task to execute,
  • The task schedule, which can be one of the provided schedules (e.g., CronSchedule) or any other struct that implements Schedule,
  • A list of arguments for the task in the form of a comma-separated list surrounded by parenthesis.

Custom scheduler backend

A custom scheduler backend can be given as the second argument. If not given, the default LocalSchedulerBackend will be used.

Optional parameters

A number of other optional parameters can be passed as last arguments and in arbitrary order (all of which correspond to a method on the BeatBuilder struct):

Examples

Create a beat which will send all messages to the celery queue:

let beat = celery::beat!(
    broker = AMQPBroker{ std::env::var("AMQP_ADDR").unwrap() },
    tasks = [],
    task_routes = [ "*" => "celery" ],
).await?;

Create a beat with a scheduled task:

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    // It is enough to provide the implementation to the worker,
    // the beat does not need it.
    unimplemented!()
}

let beat = celery::beat!(
    broker = AMQPBroker{ std::env::var("AMQP_ADDR").unwrap() },
    tasks = [
        "add_task" => {
            add,
            schedule = CronSchedule::from_string("*/3 * * * mon-fri")?,
            args = (1, 2)
        }
    ],
    task_routes = [ "*" => "celery" ],
).await?;

Create a beat with optional parameters:

let beat = celery::beat!(
    broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
    tasks = [],
    task_routes = [],
    default_queue = "beat_queue"
).await?;

Create a beat with a custom scheduler backend:

use celery::prelude::*;
use celery::beat::*;
use std::collections::BinaryHeap;

struct CustomSchedulerBackend {}

impl SchedulerBackend for CustomSchedulerBackend {
    fn should_sync(&self) -> bool {
        unimplemented!()
    }

    fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError> {
        unimplemented!()
    }
}

let beat = celery::beat!(
    broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
    scheduler_backend = CustomSchedulerBackend { CustomSchedulerBackend {} },
    tasks = [],
    task_routes = [
        "*" => "beat_queue",
    ],
).await?;