celery/codegen.rs
1pub use celery_rs_codegen::task;
2
3#[doc(hidden)]
4#[macro_export]
5macro_rules! __app_internal {
6 (
7 $broker_type:ty { $broker_url:expr },
8 [ $( $t:ty ),* ],
9 [ $( $pattern:expr => $queue:expr ),* ],
10 $( $x:ident = $y:expr, )*
11 ) => {{
12 async fn _build_app(mut builder: $crate::CeleryBuilder) ->
13 $crate::export::Result<$crate::export::Arc<$crate::Celery>> {
14 let celery: $crate::Celery = builder.build().await?;
15
16 $(
17 celery.register_task::<$t>().await?;
18 )*
19
20 Ok($crate::export::Arc::new(celery))
21 }
22
23 let broker_url = $broker_url;
24
25 let mut builder = $crate::CeleryBuilder::new("celery", &broker_url);
26
27 $(
28 builder = builder.$x($y);
29 )*
30
31 $(
32 builder = builder.task_route($pattern, $queue);
33 )*
34
35 _build_app(builder)
36 }};
37}
38
39#[doc(hidden)]
40#[macro_export]
41macro_rules! __beat_internal {
42 (
43 $broker_type:ty { $broker_url:expr },
44 $scheduler_backend_type:ty { $scheduler_backend:expr },
45 [
46 $( $task_name:expr => {
47 $task_type:ty,
48 $schedule:expr,
49 ( $( $task_arg:expr ),* $(,)?)
50 } ),*
51 ],
52 [ $( $pattern:expr => $queue:expr ),* ],
53 $( $x:ident = $y:expr, )*
54 ) => {{
55 async fn _build_beat(mut builder: $crate::beat::BeatBuilder::<$scheduler_backend_type>) ->
56 $crate::export::BeatResult<$crate::beat::Beat::<$scheduler_backend_type>> {
57 let mut beat = builder.build().await?;
58
59 $(
60 beat.schedule_named_task($task_name.to_string(), <$task_type>::new( $( $task_arg ),* ), $schedule);
61 )*
62
63 Ok(beat)
64 }
65
66 let broker_url = $broker_url;
67
68 let mut builder = $crate::beat::Beat::<$scheduler_backend_type>::custom_builder("beat", &broker_url, $scheduler_backend);
69
70 $(
71 builder = builder.$x($y);
72 )*
73
74 $(
75 builder = builder.task_route($pattern, $queue);
76 )*
77
78 _build_beat(builder)
79 }};
80}
81
82/// A macro for creating a [`Celery`](struct.Celery.html) app.
83///
84/// At a minimum the `app!` macro requires these 3 arguments (in order):
85/// - `broker`: a broker type (currently only AMQP is supported) with an expression for the broker URL in brackets,
86/// - `tasks`: a list of tasks to register, and
87/// - `task_routes`: a list of routing rules in the form of `pattern => queue`.
88///
89/// # Optional parameters
90///
91/// Following the task routing rules there are a number of other optional parameters that
92/// may appear in arbitrary order (all of which correspond to a method on the
93/// [`CeleryBuilder`](struct.CeleryBuilder.html) struct):
94///
95/// - `default_queue`: Set the
96/// [`CeleryBuilder::default_queue`](struct.CeleryBuilder.html#method.default_queue).
97/// - `prefetch_count`: Set the [`CeleryBuilder::prefect_count`](struct.CeleryBuilder.html#method.prefect_count).
98/// - `heartbeat`: Set the [`CeleryBuilder::heartbeat`](struct.CeleryBuilder.html#method.heartbeat).
99/// - `task_time_limit`: Set an app-level [`TaskOptions::time_limit`](task/struct.TaskOptions.html#structfield.time_limit).
100/// - `task_hard_time_limit`: Set an app-level [`TaskOptions::hard_time_limit`](task/struct.TaskOptions.html#structfield.hard_time_limit).
101/// - `task_max_retries`: Set an app-level [`TaskOptions::max_retries`](task/struct.TaskOptions.html#structfield.max_retries).
102/// - `task_min_retry_delay`: Set an app-level [`TaskOptions::min_retry_delay`](task/struct.TaskOptions.html#structfield.min_retry_delay).
103/// - `task_max_retry_delay`: Set an app-level [`TaskOptions::max_retry_delay`](task/struct.TaskOptions.html#structfield.max_retry_delay).
104/// - `task_retry_for_unexpected`: Set an app-level [`TaskOptions::retry_for_unexpected`](task/struct.TaskOptions.html#structfield.retry_for_unexpected).
105/// - `acks_late`: Set an app-level [`TaskOptions::acks_late`](task/struct.TaskOptions.html#structfield.acks_late).
106/// - `broker_connection_timeout`: Set the
107/// [`CeleryBuilder::broker_connection_timeout`](struct.CeleryBuilder.html#method.broker_connection_timeout).
108/// - `broker_connection_retry`: Set the
109/// [`CeleryBuilder::broker_connection_retry`](struct.CeleryBuilder.html#method.broker_connection_retry).
110/// - `broker_connection_max_retries`: Set the
111/// [`CeleryBuilder::broker_connection_max_retries`](struct.CeleryBuilder.html#method.broker_connection_max_retries).
112///
113/// # Examples
114///
115/// ```rust,no_run
116/// # #[macro_use] extern crate celery;
117/// # use anyhow::Result;
118/// use celery::prelude::*;
119///
120/// #[celery::task]
121/// fn add(x: i32, y: i32) -> TaskResult<i32> {
122/// Ok(x + y)
123/// }
124///
125/// # #[tokio::main]
126/// # async fn main() -> Result<()> {
127/// let app = celery::app!(
128/// broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
129/// tasks = [ add ],
130/// task_routes = [ "*" => "celery" ],
131/// ).await?;
132/// # Ok(())
133/// # }
134/// ```
135///
136/// ```rust,no_run
137/// # #[macro_use] extern crate celery;
138/// # use anyhow::Result;
139/// # use celery::prelude::*;
140/// # #[tokio::main]
141/// # async fn main() -> Result<()> {
142/// let app = celery::app!(
143/// broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
144/// tasks = [],
145/// task_routes = [],
146/// task_time_limit = 2
147/// ).await?;
148/// # Ok(())
149/// # }
150/// ```
151#[macro_export]
152macro_rules! app {
153 (
154 broker = $broker_type:ty { $broker_url:expr },
155 tasks = [ $( $t:ty ),* $(,)? ],
156 task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
157 $(, $x:ident = $y:expr )* $(,)?
158 ) => {
159 $crate::__app_internal!(
160 $broker_type { $broker_url },
161 [ $( $t ),* ],
162 [ $( $pattern => $queue ),* ],
163 $( $x = $y, )*
164 );
165 };
166}
167
168/// A macro for creating a [`Beat`](beat/struct.Beat.html) app.
169///
170/// At a minimum the `beat!` macro requires these 3 arguments (in order):
171/// - `broker`: a broker type (currently only AMQP is supported) with an expression for the broker URL in brackets,
172/// - `tasks`: a list of tasks together with their relative schedules (can be empty),
173/// - `task_routes`: a list of routing rules in the form of `pattern => queue`.
174///
175/// # Tasks
176///
177/// An entry in the task list has the following components:
178/// - The name of the task,
179/// - The instance of the task to execute,
180/// - The task schedule, which can be one of the provided schedules (e.g., [`CronSchedule`](crate::beat::CronSchedule))
181/// or any other struct that implements [`Schedule`](crate::beat::Schedule),
182/// - A list of arguments for the task in the form of a comma-separated list surrounded by parenthesis.
183///
184/// # Custom scheduler backend
185///
186/// A custom scheduler backend can be given as the second argument.
187/// If not given, the default [`LocalSchedulerBackend`](struct.LocalSchedulerBackend.html) will be used.
188///
189/// # Optional parameters
190///
191/// A number of other optional parameters can be passed as last arguments and in arbitrary order
192/// (all of which correspond to a method on the [`BeatBuilder`](beat/struct.BeatBuilder.html) struct):
193///
194/// - `default_queue`: Set the
195/// [`BeatBuilder::default_queue`](beat/struct.BeatBuilder.html#method.default_queue).
196/// - `heartbeat`: Set the [`BeatBuilder::heartbeat`](beat/struct.BeatBuilder.html#method.heartbeat).
197/// - `broker_connection_timeout`: Set the
198/// [`BeatBuilder::broker_connection_timeout`](beat/struct.BeatBuilder.html#method.broker_connection_timeout).
199/// - `broker_connection_retry`: Set the
200/// [`BeatBuilder::broker_connection_retry`](beat/struct.BeatBuilder.html#method.broker_connection_retry).
201/// - `broker_connection_max_retries`: Set the
202/// [`BeatBuilder::broker_connection_max_retries`](beat/struct.BeatBuilder.html#method.broker_connection_max_retries).
203///
204/// # Examples
205///
206/// Create a `beat` which will send all messages to the `celery` queue:
207///
208/// ```rust,no_run
209/// # #[macro_use] extern crate celery;
210/// # use anyhow::Result;
211/// # use celery::prelude::*;
212/// # #[tokio::main]
213/// # async fn main() -> Result<()> {
214/// let beat = celery::beat!(
215/// broker = AMQPBroker{ std::env::var("AMQP_ADDR").unwrap() },
216/// tasks = [],
217/// task_routes = [ "*" => "celery" ],
218/// ).await?;
219/// # Ok(())
220/// # }
221/// ```
222///
223/// Create a `beat` with a scheduled task:
224///
225/// ```rust,no_run
226/// # #[macro_use] extern crate celery;
227/// # use anyhow::Result;
228/// # use celery::prelude::*;
229/// # use celery::beat::CronSchedule;
230/// # #[tokio::main]
231/// # async fn main() -> Result<()> {
232/// #[celery::task]
233/// fn add(x: i32, y: i32) -> TaskResult<i32> {
234/// // It is enough to provide the implementation to the worker,
235/// // the beat does not need it.
236/// unimplemented!()
237/// }
238///
239/// let beat = celery::beat!(
240/// broker = AMQPBroker{ std::env::var("AMQP_ADDR").unwrap() },
241/// tasks = [
242/// "add_task" => {
243/// add,
244/// schedule = CronSchedule::from_string("*/3 * * * mon-fri")?,
245/// args = (1, 2)
246/// }
247/// ],
248/// task_routes = [ "*" => "celery" ],
249/// ).await?;
250/// # Ok(())
251/// # }
252/// ```
253///
254/// Create a `beat` with optional parameters:
255///
256/// ```rust,no_run
257/// # #[macro_use] extern crate celery;
258/// # use anyhow::Result;
259/// # use celery::prelude::*;
260/// # #[tokio::main]
261/// # async fn main() -> Result<()> {
262/// let beat = celery::beat!(
263/// broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
264/// tasks = [],
265/// task_routes = [],
266/// default_queue = "beat_queue"
267/// ).await?;
268/// # Ok(())
269/// # }
270/// ```
271///
272/// Create a `beat` with a custom scheduler backend:
273///
274/// ```rust,no_run
275/// # #[macro_use] extern crate celery;
276/// # use anyhow::Result;
277/// use celery::prelude::*;
278/// use celery::beat::*;
279/// use std::collections::BinaryHeap;
280///
281/// struct CustomSchedulerBackend {}
282///
283/// impl SchedulerBackend for CustomSchedulerBackend {
284/// fn should_sync(&self) -> bool {
285/// unimplemented!()
286/// }
287///
288/// fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError> {
289/// unimplemented!()
290/// }
291/// }
292///
293/// # #[tokio::main]
294/// # async fn main() -> Result<()> {
295/// let beat = celery::beat!(
296/// broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
297/// scheduler_backend = CustomSchedulerBackend { CustomSchedulerBackend {} },
298/// tasks = [],
299/// task_routes = [
300/// "*" => "beat_queue",
301/// ],
302/// ).await?;
303/// # Ok(())
304/// # }
305/// ```
306#[macro_export]
307macro_rules! beat {
308 (
309 broker = $broker_type:ty { $broker_url:expr },
310 tasks = [
311 $( $task_name:expr => {
312 $task_type:ty,
313 schedule = $schedule:expr,
314 args = $args:tt $(,)?
315 } ),* $(,)?
316 ],
317 task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
318 $(, $x:ident = $y:expr )* $(,)?
319 ) => {
320 $crate::__beat_internal!(
321 $broker_type { $broker_url },
322 $crate::beat::LocalSchedulerBackend { $crate::beat::LocalSchedulerBackend::new() },
323 [ $ (
324 $task_name => {
325 $task_type,
326 $schedule,
327 $args
328 }
329 ),* ],
330 [ $( $pattern => $queue ),* ],
331 $( $x = $y, )*
332 );
333 };
334 (
335 broker = $broker_type:ty { $broker_url:expr },
336 scheduler_backend = $scheduler_backend_type:ty { $scheduler_backend:expr },
337 tasks = [
338 $( $task_name:expr => {
339 $task_type:ty,
340 schedule = $schedule:expr,
341 args = $args:tt $(,)?
342 } ),* $(,)?
343 ],
344 task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
345 $(, $x:ident = $y:expr )* $(,)?
346 ) => {
347 $crate::__beat_internal!(
348 $broker_type { $broker_url },
349 $scheduler_backend_type { $scheduler_backend },
350 [ $ (
351 $task_name => {
352 $task_type,
353 $schedule,
354 $args
355 }
356 ),* ],
357 [ $( $pattern => $queue ),* ],
358 $( $x = $y, )*
359 );
360 };
361}