celery/
codegen.rs

1pub use celery_rs_codegen::task;
2
3#[doc(hidden)]
4#[macro_export]
5macro_rules! __app_internal {
6    (
7        $broker_type:ty { $broker_url:expr },
8        [ $( $t:ty ),* ],
9        [ $( $pattern:expr => $queue:expr ),* ],
10        $( $x:ident = $y:expr, )*
11    ) => {{
12        async fn _build_app(mut builder: $crate::CeleryBuilder) ->
13            $crate::export::Result<$crate::export::Arc<$crate::Celery>> {
14            let celery: $crate::Celery = builder.build().await?;
15
16            $(
17                celery.register_task::<$t>().await?;
18            )*
19
20            Ok($crate::export::Arc::new(celery))
21        }
22
23        let broker_url = $broker_url;
24
25        let mut builder = $crate::CeleryBuilder::new("celery", &broker_url);
26
27        $(
28            builder = builder.$x($y);
29        )*
30
31        $(
32            builder = builder.task_route($pattern, $queue);
33        )*
34
35        _build_app(builder)
36    }};
37}
38
39#[doc(hidden)]
40#[macro_export]
41macro_rules! __beat_internal {
42    (
43        $broker_type:ty { $broker_url:expr },
44        $scheduler_backend_type:ty { $scheduler_backend:expr },
45        [
46            $( $task_name:expr => {
47                $task_type:ty,
48                $schedule:expr,
49                ( $( $task_arg:expr ),* $(,)?)
50            } ),*
51        ],
52        [ $( $pattern:expr => $queue:expr ),* ],
53        $( $x:ident = $y:expr, )*
54    ) => {{
55        async fn _build_beat(mut builder: $crate::beat::BeatBuilder::<$scheduler_backend_type>) ->
56            $crate::export::BeatResult<$crate::beat::Beat::<$scheduler_backend_type>> {
57            let mut beat = builder.build().await?;
58
59            $(
60                beat.schedule_named_task($task_name.to_string(), <$task_type>::new( $( $task_arg ),* ), $schedule);
61            )*
62
63            Ok(beat)
64        }
65
66        let broker_url = $broker_url;
67
68        let mut builder = $crate::beat::Beat::<$scheduler_backend_type>::custom_builder("beat", &broker_url, $scheduler_backend);
69
70        $(
71            builder = builder.$x($y);
72        )*
73
74        $(
75            builder = builder.task_route($pattern, $queue);
76        )*
77
78        _build_beat(builder)
79    }};
80}
81
82/// A macro for creating a [`Celery`](struct.Celery.html) app.
83///
84/// At a minimum the `app!` macro requires these 3 arguments (in order):
85/// - `broker`: a broker type (currently only AMQP is supported) with an expression for the broker URL in brackets,
86/// - `tasks`: a list of tasks to register, and
87/// - `task_routes`: a list of routing rules in the form of `pattern => queue`.
88///
89/// # Optional parameters
90///
91/// Following the task routing rules there are a number of other optional parameters that
92/// may appear in arbitrary order (all of which correspond to a method on the
93///   [`CeleryBuilder`](struct.CeleryBuilder.html) struct):
94///
95/// - `default_queue`: Set the
96///   [`CeleryBuilder::default_queue`](struct.CeleryBuilder.html#method.default_queue).
97/// - `prefetch_count`: Set the [`CeleryBuilder::prefect_count`](struct.CeleryBuilder.html#method.prefect_count).
98/// - `heartbeat`: Set the [`CeleryBuilder::heartbeat`](struct.CeleryBuilder.html#method.heartbeat).
99/// - `task_time_limit`: Set an app-level [`TaskOptions::time_limit`](task/struct.TaskOptions.html#structfield.time_limit).
100/// - `task_hard_time_limit`: Set an app-level [`TaskOptions::hard_time_limit`](task/struct.TaskOptions.html#structfield.hard_time_limit).
101/// - `task_max_retries`: Set an app-level [`TaskOptions::max_retries`](task/struct.TaskOptions.html#structfield.max_retries).
102/// - `task_min_retry_delay`: Set an app-level [`TaskOptions::min_retry_delay`](task/struct.TaskOptions.html#structfield.min_retry_delay).
103/// - `task_max_retry_delay`: Set an app-level [`TaskOptions::max_retry_delay`](task/struct.TaskOptions.html#structfield.max_retry_delay).
104/// - `task_retry_for_unexpected`: Set an app-level [`TaskOptions::retry_for_unexpected`](task/struct.TaskOptions.html#structfield.retry_for_unexpected).
105/// - `acks_late`: Set an app-level [`TaskOptions::acks_late`](task/struct.TaskOptions.html#structfield.acks_late).
106/// - `result_backend`: Set the [`CeleryBuilder::result_backend`](struct.CeleryBuilder.html#method.result_backend).
107/// - `broker_connection_timeout`: Set the
108///   [`CeleryBuilder::broker_connection_timeout`](struct.CeleryBuilder.html#method.broker_connection_timeout).
109/// - `broker_connection_retry`: Set the
110///   [`CeleryBuilder::broker_connection_retry`](struct.CeleryBuilder.html#method.broker_connection_retry).
111/// - `broker_connection_max_retries`: Set the
112///   [`CeleryBuilder::broker_connection_max_retries`](struct.CeleryBuilder.html#method.broker_connection_max_retries).
113///
114/// # Examples
115///
116/// ```rust,no_run
117/// # #[macro_use] extern crate celery;
118/// # use anyhow::Result;
119/// use celery::prelude::*;
120///
121/// #[celery::task]
122/// fn add(x: i32, y: i32) -> TaskResult<i32> {
123///     Ok(x + y)
124/// }
125///
126/// # #[tokio::main]
127/// # async fn main() -> Result<()> {
128/// let app = celery::app!(
129///     broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
130///     tasks = [ add ],
131///     task_routes = [ "*" => "celery" ],
132/// ).await?;
133/// # Ok(())
134/// # }
135/// ```
136///
137/// ```rust,no_run
138/// # #[macro_use] extern crate celery;
139/// # use anyhow::Result;
140/// # use celery::prelude::*;
141/// # #[tokio::main]
142/// # async fn main() -> Result<()> {
143/// let app = celery::app!(
144///     broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
145///     tasks = [],
146///     task_routes = [],
147///     task_time_limit = 2
148/// ).await?;
149/// # Ok(())
150/// # }
151/// ```
152#[macro_export]
153macro_rules! app {
154    (
155        broker = $broker_type:ty { $broker_url:expr },
156        tasks = [ $( $t:ty ),* $(,)? ],
157        task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
158        $(, $x:ident = $y:expr )* $(,)?
159    ) => {
160        $crate::__app_internal!(
161            $broker_type { $broker_url },
162            [ $( $t ),* ],
163            [ $( $pattern => $queue ),* ],
164            $( $x = $y, )*
165        );
166    };
167}
168
169/// A macro for creating a [`Beat`](beat/struct.Beat.html) app.
170///
171/// At a minimum the `beat!` macro requires these 3 arguments (in order):
172/// - `broker`: a broker type (currently only AMQP is supported) with an expression for the broker URL in brackets,
173/// - `tasks`: a list of tasks together with their relative schedules (can be empty),
174/// - `task_routes`: a list of routing rules in the form of `pattern => queue`.
175///
176/// # Tasks
177///
178/// An entry in the task list has the following components:
179/// - The name of the task,
180/// - The instance of the task to execute,
181/// - The task schedule, which can be one of the provided schedules (e.g., [`CronSchedule`](crate::beat::CronSchedule))
182///   or any other struct that implements [`Schedule`](crate::beat::Schedule),
183/// - A list of arguments for the task in the form of a comma-separated list surrounded by parenthesis.
184///
185/// # Custom scheduler backend
186///
187/// A custom scheduler backend can be given as the second argument.
188/// If not given, the default [`LocalSchedulerBackend`](struct.LocalSchedulerBackend.html) will be used.
189///
190/// # Optional parameters
191///
192/// A number of other optional parameters can be passed as last arguments and in arbitrary order
193/// (all of which correspond to a method on the [`BeatBuilder`](beat/struct.BeatBuilder.html) struct):
194///
195/// - `default_queue`: Set the
196///   [`BeatBuilder::default_queue`](beat/struct.BeatBuilder.html#method.default_queue).
197/// - `heartbeat`: Set the [`BeatBuilder::heartbeat`](beat/struct.BeatBuilder.html#method.heartbeat).
198/// - `broker_connection_timeout`: Set the
199///   [`BeatBuilder::broker_connection_timeout`](beat/struct.BeatBuilder.html#method.broker_connection_timeout).
200/// - `broker_connection_retry`: Set the
201///   [`BeatBuilder::broker_connection_retry`](beat/struct.BeatBuilder.html#method.broker_connection_retry).
202/// - `broker_connection_max_retries`: Set the
203///   [`BeatBuilder::broker_connection_max_retries`](beat/struct.BeatBuilder.html#method.broker_connection_max_retries).
204///
205/// # Examples
206///
207/// Create a `beat` which will send all messages to the `celery` queue:
208///
209/// ```rust,no_run
210/// # #[macro_use] extern crate celery;
211/// # use anyhow::Result;
212/// # use celery::prelude::*;
213/// # #[tokio::main]
214/// # async fn main() -> Result<()> {
215/// let beat = celery::beat!(
216///     broker = AMQPBroker{ std::env::var("AMQP_ADDR").unwrap() },
217///     tasks = [],
218///     task_routes = [ "*" => "celery" ],
219/// ).await?;
220/// # Ok(())
221/// # }
222/// ```
223///
224/// Create a `beat` with a scheduled task:
225///
226/// ```rust,no_run
227/// # #[macro_use] extern crate celery;
228/// # use anyhow::Result;
229/// # use celery::prelude::*;
230/// # use celery::beat::CronSchedule;
231/// # #[tokio::main]
232/// # async fn main() -> Result<()> {
233/// #[celery::task]
234/// fn add(x: i32, y: i32) -> TaskResult<i32> {
235///     // It is enough to provide the implementation to the worker,
236///     // the beat does not need it.
237///     unimplemented!()
238/// }
239///
240/// let beat = celery::beat!(
241///     broker = AMQPBroker{ std::env::var("AMQP_ADDR").unwrap() },
242///     tasks = [
243///         "add_task" => {
244///             add,
245///             schedule = CronSchedule::from_string("*/3 * * * mon-fri")?,
246///             args = (1, 2)
247///         }
248///     ],
249///     task_routes = [ "*" => "celery" ],
250/// ).await?;
251/// # Ok(())
252/// # }
253/// ```
254///
255/// Create a `beat` with optional parameters:
256///
257/// ```rust,no_run
258/// # #[macro_use] extern crate celery;
259/// # use anyhow::Result;
260/// # use celery::prelude::*;
261/// # #[tokio::main]
262/// # async fn main() -> Result<()> {
263/// let beat = celery::beat!(
264///     broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
265///     tasks = [],
266///     task_routes = [],
267///     default_queue = "beat_queue"
268/// ).await?;
269/// # Ok(())
270/// # }
271/// ```
272///
273/// Create a `beat` with a custom scheduler backend:
274///
275/// ```rust,no_run
276/// # #[macro_use] extern crate celery;
277/// # use anyhow::Result;
278/// use celery::prelude::*;
279/// use celery::beat::*;
280/// use std::collections::BinaryHeap;
281///
282/// struct CustomSchedulerBackend {}
283///
284/// impl SchedulerBackend for CustomSchedulerBackend {
285///     fn should_sync(&self) -> bool {
286///         unimplemented!()
287///     }
288///
289///     fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError> {
290///         unimplemented!()
291///     }
292/// }
293///
294/// # #[tokio::main]
295/// # async fn main() -> Result<()> {
296/// let beat = celery::beat!(
297///     broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
298///     scheduler_backend = CustomSchedulerBackend { CustomSchedulerBackend {} },
299///     tasks = [],
300///     task_routes = [
301///         "*" => "beat_queue",
302///     ],
303/// ).await?;
304/// # Ok(())
305/// # }
306/// ```
307#[macro_export]
308macro_rules! beat {
309    (
310        broker = $broker_type:ty { $broker_url:expr },
311        tasks = [
312            $( $task_name:expr => {
313                $task_type:ty,
314                schedule = $schedule:expr,
315                args = $args:tt $(,)?
316            } ),* $(,)?
317        ],
318        task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
319        $(, $x:ident = $y:expr )* $(,)?
320    ) => {
321        $crate::__beat_internal!(
322            $broker_type { $broker_url },
323            $crate::beat::LocalSchedulerBackend { $crate::beat::LocalSchedulerBackend::new() },
324            [ $ (
325                $task_name => {
326                    $task_type,
327                    $schedule,
328                    $args
329                }
330            ),* ],
331            [ $( $pattern => $queue ),* ],
332            $( $x = $y, )*
333        );
334    };
335    (
336        broker = $broker_type:ty { $broker_url:expr },
337        scheduler_backend = $scheduler_backend_type:ty { $scheduler_backend:expr },
338        tasks = [
339            $( $task_name:expr => {
340                $task_type:ty,
341                schedule = $schedule:expr,
342                args = $args:tt $(,)?
343            } ),* $(,)?
344        ],
345        task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
346        $(, $x:ident = $y:expr )* $(,)?
347    ) => {
348        $crate::__beat_internal!(
349            $broker_type { $broker_url },
350            $scheduler_backend_type { $scheduler_backend },
351            [ $ (
352                $task_name => {
353                    $task_type,
354                    $schedule,
355                    $args
356                }
357            ),* ],
358            [ $( $pattern => $queue ),* ],
359            $( $x = $y, )*
360        );
361    };
362}