macro_rules! beat {
    (
        broker = $broker_type:ty { $broker_url:expr },
        tasks = [
            $( $task_name:expr => {
                $task_type:ty,
                schedule = $schedule:expr,
                args = $args:tt $(,)?
            } ),* $(,)?
        ],
        task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
        $(, $x:ident = $y:expr )* $(,)?
    ) => { ... };
    (
        broker = $broker_type:ty { $broker_url:expr },
        scheduler_backend = $scheduler_backend_type:ty { $scheduler_backend:expr },
        tasks = [
            $( $task_name:expr => {
                $task_type:ty,
                schedule = $schedule:expr,
                args = $args:tt $(,)?
            } ),* $(,)?
        ],
        task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
        $(, $x:ident = $y:expr )* $(,)?
    ) => { ... };
}
A macro for creating a Beat app.
At a minimum the beat! macro requires these 3 arguments (in order):
- broker: a broker type (currently only AMQP is supported) with an expression for the broker URL in braces,
- tasks: a list of tasks together with their relative schedules (can be empty),
- task_routes: a list of routing rules in the form of pattern => queue.
§Tasks
An entry in the task list has the following components:
- The name of the task,
- The type of the task to execute (for example, a function annotated with #[celery::task]),
- The task schedule, which can be one of the provided schedules (e.g., CronSchedule) or any other struct that implements Schedule,
- A list of arguments for the task in the form of a comma-separated list surrounded by parentheses.
§Custom scheduler backend
A custom scheduler backend can be given as the second argument.
If not given, the default LocalSchedulerBackend will be used.
§Optional parameters
Several other optional parameters can be passed as the final arguments, in any order.
Each corresponds to a method on the BeatBuilder struct:
- default_queue: Set the BeatBuilder::default_queue.
- heartbeat: Set the BeatBuilder::heartbeat.
- broker_connection_timeout: Set the BeatBuilder::broker_connection_timeout.
- broker_connection_retry: Set the BeatBuilder::broker_connection_retry.
- broker_connection_max_retries: Set the BeatBuilder::broker_connection_max_retries.
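To illustrate how a trailing `$x:ident = $y:expr` list can map onto builder methods, here is a minimal, self-contained sketch. It does not use the celery crate: `ToyBuilder`, `toy_beat!` and `configured` are hypothetical names invented for this illustration, and the expansion shown is an assumption about the general technique, not the actual implementation of beat!.

```rust
// Hypothetical stand-in for a builder type; NOT the real `BeatBuilder`.
#[derive(Debug, Default)]
struct ToyBuilder {
    default_queue: String,
    heartbeat: Option<u16>,
    broker_connection_timeout: u32,
}

impl ToyBuilder {
    fn default_queue(mut self, name: &str) -> Self {
        self.default_queue = name.to_string();
        self
    }

    fn heartbeat(mut self, secs: Option<u16>) -> Self {
        self.heartbeat = secs;
        self
    }

    fn broker_connection_timeout(mut self, secs: u32) -> Self {
        self.broker_connection_timeout = secs;
        self
    }
}

// Hypothetical macro: each `name = value` pair becomes the method call
// `builder.name(value)`, applied in the order the pairs were written.
macro_rules! toy_beat {
    ( $( $x:ident = $y:expr ),* $(,)? ) => {{
        let builder = ToyBuilder::default();
        $( let builder = builder.$x($y); )*
        builder
    }};
}

// The pairs may appear in any order because each one is an independent call.
fn configured() -> ToyBuilder {
    toy_beat!(
        heartbeat = Some(10),
        default_queue = "beat_queue",
        broker_connection_timeout = 5,
    )
}

fn main() {
    let builder = configured();
    println!("{:?}", builder);
}
```

In a scheme like this, a parameter name that does not match a builder method fails at compile time with a "no method named ..." error, which is one reason to forward the identifiers directly instead of matching them as strings.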
§Examples
Create a beat which will send all messages to the celery queue:
let beat = celery::beat!(
    broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
    tasks = [],
    task_routes = [ "*" => "celery" ],
).await?;
Create a beat with a scheduled task:
#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    // It is enough to provide the implementation to the worker,
    // the beat does not need it.
    unimplemented!()
}
let beat = celery::beat!(
    broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
    tasks = [
        "add_task" => {
            add,
            schedule = CronSchedule::from_string("*/3 * * * mon-fri")?,
            args = (1, 2)
        }
    ],
    task_routes = [ "*" => "celery" ],
).await?;
Create a beat with optional parameters:
let beat = celery::beat!(
    broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
    tasks = [],
    task_routes = [],
    default_queue = "beat_queue"
).await?;
Create a beat with a custom scheduler backend:
use celery::prelude::*;
use celery::beat::*;
use std::collections::BinaryHeap;
struct CustomSchedulerBackend {}
impl SchedulerBackend for CustomSchedulerBackend {
    fn should_sync(&self) -> bool {
        unimplemented!()
    }
    fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError> {
        unimplemented!()
    }
}
let beat = celery::beat!(
    broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
    scheduler_backend = CustomSchedulerBackend { CustomSchedulerBackend {} },
    tasks = [],
    task_routes = [
        "*" => "beat_queue",
    ],
).await?;