macro_rules! beat { ( broker = $broker_type:ty { $broker_url:expr }, tasks = [ $( $task_name:expr => { $task_type:ty, schedule = $schedule:expr, args = $args:tt $(,)? } ),* $(,)? ], task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ] $(, $x:ident = $y:expr )* $(,)? ) => { ... }; ( broker = $broker_type:ty { $broker_url:expr }, scheduler_backend = $scheduler_backend_type:ty { $scheduler_backend:expr }, tasks = [ $( $task_name:expr => { $task_type:ty, schedule = $schedule:expr, args = $args:tt $(,)? } ),* $(,)? ], task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ] $(, $x:ident = $y:expr )* $(,)? ) => { ... }; }
Expand description
A macro for creating a Beat
app.
At a minimum the beat!
macro requires these 3 arguments (in order):
broker
: a broker type (currently only AMQP is supported) with an expression for the broker URL in brackets,tasks
: a list of tasks together with their relative schedules (can be empty),task_routes
: a list of routing rules in the form ofpattern => queue
.
Tasks
An entry in the task list has the following components:
- The name of the task,
- The instance of the task to execute,
- The task schedule, which can be one of the provided schedules (e.g.,
CronSchedule
) or any other struct that implementsSchedule
, - A list of arguments for the task in the form of a comma-separated list surrounded by parenthesis.
Custom scheduler backend
A custom scheduler backend can be given as the second argument.
If not given, the default LocalSchedulerBackend
will be used.
Optional parameters
A number of other optional parameters can be passed as last arguments and in arbitrary order
(all of which correspond to a method on the BeatBuilder
struct):
default_queue
: Set theBeatBuilder::default_queue
.heartbeat
: Set theBeatBuilder::heartbeat
.broker_connection_timeout
: Set theBeatBuilder::broker_connection_timeout
.broker_connection_retry
: Set theBeatBuilder::broker_connection_retry
.broker_connection_max_retries
: Set theBeatBuilder::broker_connection_max_retries
.
Examples
Create a beat
which will send all messages to the celery
queue:
let beat = celery::beat!(
broker = AMQPBroker{ std::env::var("AMQP_ADDR").unwrap() },
tasks = [],
task_routes = [ "*" => "celery" ],
).await?;
Create a beat
with a scheduled task:
#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
// It is enough to provide the implementation to the worker,
// the beat does not need it.
unimplemented!()
}
let beat = celery::beat!(
broker = AMQPBroker{ std::env::var("AMQP_ADDR").unwrap() },
tasks = [
"add_task" => {
add,
schedule = CronSchedule::from_string("*/3 * * * mon-fri")?,
args = (1, 2)
}
],
task_routes = [ "*" => "celery" ],
).await?;
Create a beat
with optional parameters:
let beat = celery::beat!(
broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
tasks = [],
task_routes = [],
default_queue = "beat_queue"
).await?;
Create a beat
with a custom scheduler backend:
use celery::prelude::*;
use celery::beat::*;
use std::collections::BinaryHeap;
struct CustomSchedulerBackend {}
impl SchedulerBackend for CustomSchedulerBackend {
fn should_sync(&self) -> bool {
unimplemented!()
}
fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError> {
unimplemented!()
}
}
let beat = celery::beat!(
broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
scheduler_backend = CustomSchedulerBackend { CustomSchedulerBackend {} },
tasks = [],
task_routes = [
"*" => "beat_queue",
],
).await?;