celery/codegen.rs
1pub use celery_rs_codegen::task;
2
3#[doc(hidden)]
4#[macro_export]
5macro_rules! __app_internal {
6 (
7 $broker_type:ty { $broker_url:expr },
8 [ $( $t:ty ),* ],
9 [ $( $pattern:expr => $queue:expr ),* ],
10 $( $x:ident = $y:expr, )*
11 ) => {{
12 async fn _build_app(mut builder: $crate::CeleryBuilder) ->
13 $crate::export::Result<$crate::export::Arc<$crate::Celery>> {
14 let celery: $crate::Celery = builder.build().await?;
15
16 $(
17 celery.register_task::<$t>().await?;
18 )*
19
20 Ok($crate::export::Arc::new(celery))
21 }
22
23 let broker_url = $broker_url;
24
25 let mut builder = $crate::CeleryBuilder::new("celery", &broker_url);
26
27 $(
28 builder = builder.$x($y);
29 )*
30
31 $(
32 builder = builder.task_route($pattern, $queue);
33 )*
34
35 _build_app(builder)
36 }};
37}
38
39#[doc(hidden)]
40#[macro_export]
41macro_rules! __beat_internal {
42 (
43 $broker_type:ty { $broker_url:expr },
44 $scheduler_backend_type:ty { $scheduler_backend:expr },
45 [
46 $( $task_name:expr => {
47 $task_type:ty,
48 $schedule:expr,
49 ( $( $task_arg:expr ),* $(,)?)
50 } ),*
51 ],
52 [ $( $pattern:expr => $queue:expr ),* ],
53 $( $x:ident = $y:expr, )*
54 ) => {{
55 async fn _build_beat(mut builder: $crate::beat::BeatBuilder::<$scheduler_backend_type>) ->
56 $crate::export::BeatResult<$crate::beat::Beat::<$scheduler_backend_type>> {
57 let mut beat = builder.build().await?;
58
59 $(
60 beat.schedule_named_task($task_name.to_string(), <$task_type>::new( $( $task_arg ),* ), $schedule);
61 )*
62
63 Ok(beat)
64 }
65
66 let broker_url = $broker_url;
67
68 let mut builder = $crate::beat::Beat::<$scheduler_backend_type>::custom_builder("beat", &broker_url, $scheduler_backend);
69
70 $(
71 builder = builder.$x($y);
72 )*
73
74 $(
75 builder = builder.task_route($pattern, $queue);
76 )*
77
78 _build_beat(builder)
79 }};
80}
81
82/// A macro for creating a [`Celery`](struct.Celery.html) app.
83///
84/// At a minimum the `app!` macro requires these 3 arguments (in order):
85/// - `broker`: a broker type (currently only AMQP is supported) with an expression for the broker URL in brackets,
86/// - `tasks`: a list of tasks to register, and
87/// - `task_routes`: a list of routing rules in the form of `pattern => queue`.
88///
89/// # Optional parameters
90///
91/// Following the task routing rules there are a number of other optional parameters that
92/// may appear in arbitrary order (all of which correspond to a method on the
93/// [`CeleryBuilder`](struct.CeleryBuilder.html) struct):
94///
95/// - `default_queue`: Set the
96/// [`CeleryBuilder::default_queue`](struct.CeleryBuilder.html#method.default_queue).
97/// - `prefetch_count`: Set the [`CeleryBuilder::prefect_count`](struct.CeleryBuilder.html#method.prefect_count).
98/// - `heartbeat`: Set the [`CeleryBuilder::heartbeat`](struct.CeleryBuilder.html#method.heartbeat).
99/// - `task_time_limit`: Set an app-level [`TaskOptions::time_limit`](task/struct.TaskOptions.html#structfield.time_limit).
100/// - `task_hard_time_limit`: Set an app-level [`TaskOptions::hard_time_limit`](task/struct.TaskOptions.html#structfield.hard_time_limit).
101/// - `task_max_retries`: Set an app-level [`TaskOptions::max_retries`](task/struct.TaskOptions.html#structfield.max_retries).
102/// - `task_min_retry_delay`: Set an app-level [`TaskOptions::min_retry_delay`](task/struct.TaskOptions.html#structfield.min_retry_delay).
103/// - `task_max_retry_delay`: Set an app-level [`TaskOptions::max_retry_delay`](task/struct.TaskOptions.html#structfield.max_retry_delay).
104/// - `task_retry_for_unexpected`: Set an app-level [`TaskOptions::retry_for_unexpected`](task/struct.TaskOptions.html#structfield.retry_for_unexpected).
105/// - `acks_late`: Set an app-level [`TaskOptions::acks_late`](task/struct.TaskOptions.html#structfield.acks_late).
106/// - `result_backend`: Set the [`CeleryBuilder::result_backend`](struct.CeleryBuilder.html#method.result_backend).
107/// - `broker_connection_timeout`: Set the
108/// [`CeleryBuilder::broker_connection_timeout`](struct.CeleryBuilder.html#method.broker_connection_timeout).
109/// - `broker_connection_retry`: Set the
110/// [`CeleryBuilder::broker_connection_retry`](struct.CeleryBuilder.html#method.broker_connection_retry).
111/// - `broker_connection_max_retries`: Set the
112/// [`CeleryBuilder::broker_connection_max_retries`](struct.CeleryBuilder.html#method.broker_connection_max_retries).
113///
114/// # Examples
115///
116/// ```rust,no_run
117/// # #[macro_use] extern crate celery;
118/// # use anyhow::Result;
119/// use celery::prelude::*;
120///
121/// #[celery::task]
122/// fn add(x: i32, y: i32) -> TaskResult<i32> {
123/// Ok(x + y)
124/// }
125///
126/// # #[tokio::main]
127/// # async fn main() -> Result<()> {
128/// let app = celery::app!(
129/// broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
130/// tasks = [ add ],
131/// task_routes = [ "*" => "celery" ],
132/// ).await?;
133/// # Ok(())
134/// # }
135/// ```
136///
137/// ```rust,no_run
138/// # #[macro_use] extern crate celery;
139/// # use anyhow::Result;
140/// # use celery::prelude::*;
141/// # #[tokio::main]
142/// # async fn main() -> Result<()> {
143/// let app = celery::app!(
144/// broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
145/// tasks = [],
146/// task_routes = [],
147/// task_time_limit = 2
148/// ).await?;
149/// # Ok(())
150/// # }
151/// ```
152#[macro_export]
153macro_rules! app {
154 (
155 broker = $broker_type:ty { $broker_url:expr },
156 tasks = [ $( $t:ty ),* $(,)? ],
157 task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
158 $(, $x:ident = $y:expr )* $(,)?
159 ) => {
160 $crate::__app_internal!(
161 $broker_type { $broker_url },
162 [ $( $t ),* ],
163 [ $( $pattern => $queue ),* ],
164 $( $x = $y, )*
165 );
166 };
167}
168
169/// A macro for creating a [`Beat`](beat/struct.Beat.html) app.
170///
171/// At a minimum the `beat!` macro requires these 3 arguments (in order):
172/// - `broker`: a broker type (currently only AMQP is supported) with an expression for the broker URL in brackets,
173/// - `tasks`: a list of tasks together with their relative schedules (can be empty),
174/// - `task_routes`: a list of routing rules in the form of `pattern => queue`.
175///
176/// # Tasks
177///
178/// An entry in the task list has the following components:
179/// - The name of the task,
180/// - The instance of the task to execute,
181/// - The task schedule, which can be one of the provided schedules (e.g., [`CronSchedule`](crate::beat::CronSchedule))
182/// or any other struct that implements [`Schedule`](crate::beat::Schedule),
183/// - A list of arguments for the task in the form of a comma-separated list surrounded by parenthesis.
184///
185/// # Custom scheduler backend
186///
187/// A custom scheduler backend can be given as the second argument.
188/// If not given, the default [`LocalSchedulerBackend`](struct.LocalSchedulerBackend.html) will be used.
189///
190/// # Optional parameters
191///
192/// A number of other optional parameters can be passed as last arguments and in arbitrary order
193/// (all of which correspond to a method on the [`BeatBuilder`](beat/struct.BeatBuilder.html) struct):
194///
195/// - `default_queue`: Set the
196/// [`BeatBuilder::default_queue`](beat/struct.BeatBuilder.html#method.default_queue).
197/// - `heartbeat`: Set the [`BeatBuilder::heartbeat`](beat/struct.BeatBuilder.html#method.heartbeat).
198/// - `broker_connection_timeout`: Set the
199/// [`BeatBuilder::broker_connection_timeout`](beat/struct.BeatBuilder.html#method.broker_connection_timeout).
200/// - `broker_connection_retry`: Set the
201/// [`BeatBuilder::broker_connection_retry`](beat/struct.BeatBuilder.html#method.broker_connection_retry).
202/// - `broker_connection_max_retries`: Set the
203/// [`BeatBuilder::broker_connection_max_retries`](beat/struct.BeatBuilder.html#method.broker_connection_max_retries).
204///
205/// # Examples
206///
207/// Create a `beat` which will send all messages to the `celery` queue:
208///
209/// ```rust,no_run
210/// # #[macro_use] extern crate celery;
211/// # use anyhow::Result;
212/// # use celery::prelude::*;
213/// # #[tokio::main]
214/// # async fn main() -> Result<()> {
215/// let beat = celery::beat!(
216/// broker = AMQPBroker{ std::env::var("AMQP_ADDR").unwrap() },
217/// tasks = [],
218/// task_routes = [ "*" => "celery" ],
219/// ).await?;
220/// # Ok(())
221/// # }
222/// ```
223///
224/// Create a `beat` with a scheduled task:
225///
226/// ```rust,no_run
227/// # #[macro_use] extern crate celery;
228/// # use anyhow::Result;
229/// # use celery::prelude::*;
230/// # use celery::beat::CronSchedule;
231/// # #[tokio::main]
232/// # async fn main() -> Result<()> {
233/// #[celery::task]
234/// fn add(x: i32, y: i32) -> TaskResult<i32> {
235/// // It is enough to provide the implementation to the worker,
236/// // the beat does not need it.
237/// unimplemented!()
238/// }
239///
240/// let beat = celery::beat!(
241/// broker = AMQPBroker{ std::env::var("AMQP_ADDR").unwrap() },
242/// tasks = [
243/// "add_task" => {
244/// add,
245/// schedule = CronSchedule::from_string("*/3 * * * mon-fri")?,
246/// args = (1, 2)
247/// }
248/// ],
249/// task_routes = [ "*" => "celery" ],
250/// ).await?;
251/// # Ok(())
252/// # }
253/// ```
254///
255/// Create a `beat` with optional parameters:
256///
257/// ```rust,no_run
258/// # #[macro_use] extern crate celery;
259/// # use anyhow::Result;
260/// # use celery::prelude::*;
261/// # #[tokio::main]
262/// # async fn main() -> Result<()> {
263/// let beat = celery::beat!(
264/// broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
265/// tasks = [],
266/// task_routes = [],
267/// default_queue = "beat_queue"
268/// ).await?;
269/// # Ok(())
270/// # }
271/// ```
272///
273/// Create a `beat` with a custom scheduler backend:
274///
275/// ```rust,no_run
276/// # #[macro_use] extern crate celery;
277/// # use anyhow::Result;
278/// use celery::prelude::*;
279/// use celery::beat::*;
280/// use std::collections::BinaryHeap;
281///
282/// struct CustomSchedulerBackend {}
283///
284/// impl SchedulerBackend for CustomSchedulerBackend {
285/// fn should_sync(&self) -> bool {
286/// unimplemented!()
287/// }
288///
289/// fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError> {
290/// unimplemented!()
291/// }
292/// }
293///
294/// # #[tokio::main]
295/// # async fn main() -> Result<()> {
296/// let beat = celery::beat!(
297/// broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
298/// scheduler_backend = CustomSchedulerBackend { CustomSchedulerBackend {} },
299/// tasks = [],
300/// task_routes = [
301/// "*" => "beat_queue",
302/// ],
303/// ).await?;
304/// # Ok(())
305/// # }
306/// ```
307#[macro_export]
308macro_rules! beat {
309 (
310 broker = $broker_type:ty { $broker_url:expr },
311 tasks = [
312 $( $task_name:expr => {
313 $task_type:ty,
314 schedule = $schedule:expr,
315 args = $args:tt $(,)?
316 } ),* $(,)?
317 ],
318 task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
319 $(, $x:ident = $y:expr )* $(,)?
320 ) => {
321 $crate::__beat_internal!(
322 $broker_type { $broker_url },
323 $crate::beat::LocalSchedulerBackend { $crate::beat::LocalSchedulerBackend::new() },
324 [ $ (
325 $task_name => {
326 $task_type,
327 $schedule,
328 $args
329 }
330 ),* ],
331 [ $( $pattern => $queue ),* ],
332 $( $x = $y, )*
333 );
334 };
335 (
336 broker = $broker_type:ty { $broker_url:expr },
337 scheduler_backend = $scheduler_backend_type:ty { $scheduler_backend:expr },
338 tasks = [
339 $( $task_name:expr => {
340 $task_type:ty,
341 schedule = $schedule:expr,
342 args = $args:tt $(,)?
343 } ),* $(,)?
344 ],
345 task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
346 $(, $x:ident = $y:expr )* $(,)?
347 ) => {
348 $crate::__beat_internal!(
349 $broker_type { $broker_url },
350 $scheduler_backend_type { $scheduler_backend },
351 [ $ (
352 $task_name => {
353 $task_type,
354 $schedule,
355 $args
356 }
357 ),* ],
358 [ $( $pattern => $queue ),* ],
359 $( $x = $y, )*
360 );
361 };
362}