celery/
codegen.rs

1pub use celery_rs_codegen::task;
2
3#[doc(hidden)]
4#[macro_export]
5macro_rules! __app_internal {
6    (
7        $broker_type:ty { $broker_url:expr },
8        [ $( $t:ty ),* ],
9        [ $( $pattern:expr => $queue:expr ),* ],
10        $( $x:ident = $y:expr, )*
11    ) => {{
12        async fn _build_app(mut builder: $crate::CeleryBuilder) ->
13            $crate::export::Result<$crate::export::Arc<$crate::Celery>> {
14            let celery: $crate::Celery = builder.build().await?;
15
16            $(
17                celery.register_task::<$t>().await?;
18            )*
19
20            Ok($crate::export::Arc::new(celery))
21        }
22
23        let broker_url = $broker_url;
24
25        let mut builder = $crate::CeleryBuilder::new("celery", &broker_url);
26
27        $(
28            builder = builder.$x($y);
29        )*
30
31        $(
32            builder = builder.task_route($pattern, $queue);
33        )*
34
35        _build_app(builder)
36    }};
37}
38
39#[doc(hidden)]
40#[macro_export]
41macro_rules! __beat_internal {
42    (
43        $broker_type:ty { $broker_url:expr },
44        $scheduler_backend_type:ty { $scheduler_backend:expr },
45        [
46            $( $task_name:expr => {
47                $task_type:ty,
48                $schedule:expr,
49                ( $( $task_arg:expr ),* $(,)?)
50            } ),*
51        ],
52        [ $( $pattern:expr => $queue:expr ),* ],
53        $( $x:ident = $y:expr, )*
54    ) => {{
55        async fn _build_beat(mut builder: $crate::beat::BeatBuilder::<$scheduler_backend_type>) ->
56            $crate::export::BeatResult<$crate::beat::Beat::<$scheduler_backend_type>> {
57            let mut beat = builder.build().await?;
58
59            $(
60                beat.schedule_named_task($task_name.to_string(), <$task_type>::new( $( $task_arg ),* ), $schedule);
61            )*
62
63            Ok(beat)
64        }
65
66        let broker_url = $broker_url;
67
68        let mut builder = $crate::beat::Beat::<$scheduler_backend_type>::custom_builder("beat", &broker_url, $scheduler_backend);
69
70        $(
71            builder = builder.$x($y);
72        )*
73
74        $(
75            builder = builder.task_route($pattern, $queue);
76        )*
77
78        _build_beat(builder)
79    }};
80}
81
82/// A macro for creating a [`Celery`](struct.Celery.html) app.
83///
84/// At a minimum the `app!` macro requires these 3 arguments (in order):
85/// - `broker`: a broker type (currently only AMQP is supported) with an expression for the broker URL in brackets,
86/// - `tasks`: a list of tasks to register, and
87/// - `task_routes`: a list of routing rules in the form of `pattern => queue`.
88///
89/// # Optional parameters
90///
91/// Following the task routing rules there are a number of other optional parameters that
92/// may appear in arbitrary order (all of which correspond to a method on the
93///   [`CeleryBuilder`](struct.CeleryBuilder.html) struct):
94///
95/// - `default_queue`: Set the
96///   [`CeleryBuilder::default_queue`](struct.CeleryBuilder.html#method.default_queue).
97/// - `prefetch_count`: Set the [`CeleryBuilder::prefect_count`](struct.CeleryBuilder.html#method.prefect_count).
98/// - `heartbeat`: Set the [`CeleryBuilder::heartbeat`](struct.CeleryBuilder.html#method.heartbeat).
99/// - `task_time_limit`: Set an app-level [`TaskOptions::time_limit`](task/struct.TaskOptions.html#structfield.time_limit).
100/// - `task_hard_time_limit`: Set an app-level [`TaskOptions::hard_time_limit`](task/struct.TaskOptions.html#structfield.hard_time_limit).
101/// - `task_max_retries`: Set an app-level [`TaskOptions::max_retries`](task/struct.TaskOptions.html#structfield.max_retries).
102/// - `task_min_retry_delay`: Set an app-level [`TaskOptions::min_retry_delay`](task/struct.TaskOptions.html#structfield.min_retry_delay).
103/// - `task_max_retry_delay`: Set an app-level [`TaskOptions::max_retry_delay`](task/struct.TaskOptions.html#structfield.max_retry_delay).
104/// - `task_retry_for_unexpected`: Set an app-level [`TaskOptions::retry_for_unexpected`](task/struct.TaskOptions.html#structfield.retry_for_unexpected).
105/// - `acks_late`: Set an app-level [`TaskOptions::acks_late`](task/struct.TaskOptions.html#structfield.acks_late).
106/// - `broker_connection_timeout`: Set the
107///   [`CeleryBuilder::broker_connection_timeout`](struct.CeleryBuilder.html#method.broker_connection_timeout).
108/// - `broker_connection_retry`: Set the
109///   [`CeleryBuilder::broker_connection_retry`](struct.CeleryBuilder.html#method.broker_connection_retry).
110/// - `broker_connection_max_retries`: Set the
111///   [`CeleryBuilder::broker_connection_max_retries`](struct.CeleryBuilder.html#method.broker_connection_max_retries).
112///
113/// # Examples
114///
115/// ```rust,no_run
116/// # #[macro_use] extern crate celery;
117/// # use anyhow::Result;
118/// use celery::prelude::*;
119///
120/// #[celery::task]
121/// fn add(x: i32, y: i32) -> TaskResult<i32> {
122///     Ok(x + y)
123/// }
124///
125/// # #[tokio::main]
126/// # async fn main() -> Result<()> {
127/// let app = celery::app!(
128///     broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
129///     tasks = [ add ],
130///     task_routes = [ "*" => "celery" ],
131/// ).await?;
132/// # Ok(())
133/// # }
134/// ```
135///
136/// ```rust,no_run
137/// # #[macro_use] extern crate celery;
138/// # use anyhow::Result;
139/// # use celery::prelude::*;
140/// # #[tokio::main]
141/// # async fn main() -> Result<()> {
142/// let app = celery::app!(
143///     broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
144///     tasks = [],
145///     task_routes = [],
146///     task_time_limit = 2
147/// ).await?;
148/// # Ok(())
149/// # }
150/// ```
151#[macro_export]
152macro_rules! app {
153    (
154        broker = $broker_type:ty { $broker_url:expr },
155        tasks = [ $( $t:ty ),* $(,)? ],
156        task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
157        $(, $x:ident = $y:expr )* $(,)?
158    ) => {
159        $crate::__app_internal!(
160            $broker_type { $broker_url },
161            [ $( $t ),* ],
162            [ $( $pattern => $queue ),* ],
163            $( $x = $y, )*
164        );
165    };
166}
167
168/// A macro for creating a [`Beat`](beat/struct.Beat.html) app.
169///
170/// At a minimum the `beat!` macro requires these 3 arguments (in order):
171/// - `broker`: a broker type (currently only AMQP is supported) with an expression for the broker URL in brackets,
172/// - `tasks`: a list of tasks together with their relative schedules (can be empty),
173/// - `task_routes`: a list of routing rules in the form of `pattern => queue`.
174///
175/// # Tasks
176///
177/// An entry in the task list has the following components:
178/// - The name of the task,
179/// - The instance of the task to execute,
180/// - The task schedule, which can be one of the provided schedules (e.g., [`CronSchedule`](crate::beat::CronSchedule))
181///   or any other struct that implements [`Schedule`](crate::beat::Schedule),
182/// - A list of arguments for the task in the form of a comma-separated list surrounded by parenthesis.
183///
184/// # Custom scheduler backend
185///
186/// A custom scheduler backend can be given as the second argument.
187/// If not given, the default [`LocalSchedulerBackend`](struct.LocalSchedulerBackend.html) will be used.
188///
189/// # Optional parameters
190///
191/// A number of other optional parameters can be passed as last arguments and in arbitrary order
192/// (all of which correspond to a method on the [`BeatBuilder`](beat/struct.BeatBuilder.html) struct):
193///
194/// - `default_queue`: Set the
195///   [`BeatBuilder::default_queue`](beat/struct.BeatBuilder.html#method.default_queue).
196/// - `heartbeat`: Set the [`BeatBuilder::heartbeat`](beat/struct.BeatBuilder.html#method.heartbeat).
197/// - `broker_connection_timeout`: Set the
198///   [`BeatBuilder::broker_connection_timeout`](beat/struct.BeatBuilder.html#method.broker_connection_timeout).
199/// - `broker_connection_retry`: Set the
200///   [`BeatBuilder::broker_connection_retry`](beat/struct.BeatBuilder.html#method.broker_connection_retry).
201/// - `broker_connection_max_retries`: Set the
202///   [`BeatBuilder::broker_connection_max_retries`](beat/struct.BeatBuilder.html#method.broker_connection_max_retries).
203///
204/// # Examples
205///
206/// Create a `beat` which will send all messages to the `celery` queue:
207///
208/// ```rust,no_run
209/// # #[macro_use] extern crate celery;
210/// # use anyhow::Result;
211/// # use celery::prelude::*;
212/// # #[tokio::main]
213/// # async fn main() -> Result<()> {
214/// let beat = celery::beat!(
215///     broker = AMQPBroker{ std::env::var("AMQP_ADDR").unwrap() },
216///     tasks = [],
217///     task_routes = [ "*" => "celery" ],
218/// ).await?;
219/// # Ok(())
220/// # }
221/// ```
222///
223/// Create a `beat` with a scheduled task:
224///
225/// ```rust,no_run
226/// # #[macro_use] extern crate celery;
227/// # use anyhow::Result;
228/// # use celery::prelude::*;
229/// # use celery::beat::CronSchedule;
230/// # #[tokio::main]
231/// # async fn main() -> Result<()> {
232/// #[celery::task]
233/// fn add(x: i32, y: i32) -> TaskResult<i32> {
234///     // It is enough to provide the implementation to the worker,
235///     // the beat does not need it.
236///     unimplemented!()
237/// }
238///
239/// let beat = celery::beat!(
240///     broker = AMQPBroker{ std::env::var("AMQP_ADDR").unwrap() },
241///     tasks = [
242///         "add_task" => {
243///             add,
244///             schedule = CronSchedule::from_string("*/3 * * * mon-fri")?,
245///             args = (1, 2)
246///         }
247///     ],
248///     task_routes = [ "*" => "celery" ],
249/// ).await?;
250/// # Ok(())
251/// # }
252/// ```
253///
254/// Create a `beat` with optional parameters:
255///
256/// ```rust,no_run
257/// # #[macro_use] extern crate celery;
258/// # use anyhow::Result;
259/// # use celery::prelude::*;
260/// # #[tokio::main]
261/// # async fn main() -> Result<()> {
262/// let beat = celery::beat!(
263///     broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
264///     tasks = [],
265///     task_routes = [],
266///     default_queue = "beat_queue"
267/// ).await?;
268/// # Ok(())
269/// # }
270/// ```
271///
272/// Create a `beat` with a custom scheduler backend:
273///
274/// ```rust,no_run
275/// # #[macro_use] extern crate celery;
276/// # use anyhow::Result;
277/// use celery::prelude::*;
278/// use celery::beat::*;
279/// use std::collections::BinaryHeap;
280///
281/// struct CustomSchedulerBackend {}
282///
283/// impl SchedulerBackend for CustomSchedulerBackend {
284///     fn should_sync(&self) -> bool {
285///         unimplemented!()
286///     }
287///
288///     fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError> {
289///         unimplemented!()
290///     }
291/// }
292///
293/// # #[tokio::main]
294/// # async fn main() -> Result<()> {
295/// let beat = celery::beat!(
296///     broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
297///     scheduler_backend = CustomSchedulerBackend { CustomSchedulerBackend {} },
298///     tasks = [],
299///     task_routes = [
300///         "*" => "beat_queue",
301///     ],
302/// ).await?;
303/// # Ok(())
304/// # }
305/// ```
306#[macro_export]
307macro_rules! beat {
308    (
309        broker = $broker_type:ty { $broker_url:expr },
310        tasks = [
311            $( $task_name:expr => {
312                $task_type:ty,
313                schedule = $schedule:expr,
314                args = $args:tt $(,)?
315            } ),* $(,)?
316        ],
317        task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
318        $(, $x:ident = $y:expr )* $(,)?
319    ) => {
320        $crate::__beat_internal!(
321            $broker_type { $broker_url },
322            $crate::beat::LocalSchedulerBackend { $crate::beat::LocalSchedulerBackend::new() },
323            [ $ (
324                $task_name => {
325                    $task_type,
326                    $schedule,
327                    $args
328                }
329            ),* ],
330            [ $( $pattern => $queue ),* ],
331            $( $x = $y, )*
332        );
333    };
334    (
335        broker = $broker_type:ty { $broker_url:expr },
336        scheduler_backend = $scheduler_backend_type:ty { $scheduler_backend:expr },
337        tasks = [
338            $( $task_name:expr => {
339                $task_type:ty,
340                schedule = $schedule:expr,
341                args = $args:tt $(,)?
342            } ),* $(,)?
343        ],
344        task_routes = [ $( $pattern:expr => $queue:expr ),* $(,)? ]
345        $(, $x:ident = $y:expr )* $(,)?
346    ) => {
347        $crate::__beat_internal!(
348            $broker_type { $broker_url },
349            $scheduler_backend_type { $scheduler_backend },
350            [ $ (
351                $task_name => {
352                    $task_type,
353                    $schedule,
354                    $args
355                }
356            ),* ],
357            [ $( $pattern => $queue ),* ],
358            $( $x = $y, )*
359        );
360    };
361}