1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
//! A Rust implementation of [Celery](http://www.celeryproject.org/) for producing and consuming
//! asynchronous tasks with a distributed message queue.
//!
//! # Examples
//!
//! Define tasks by decorating functions with the [`macro@task`] attribute:
//!
//! ```rust
//! use celery::prelude::*;
//!
//! #[celery::task]
//! fn add(x: i32, y: i32) -> TaskResult<i32> {
//!     Ok(x + y)
//! }
//! ```
//!
//! Then create a [`Celery`] app with the [`app!`]
//! macro and register your tasks with it:
//!
//! ```rust,no_run
//! # use anyhow::Result;
//! # use celery::prelude::*;
//! # #[celery::task]
//! # fn add(x: i32, y: i32) -> celery::task::TaskResult<i32> {
//! #     Ok(x + y)
//! # }
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! let my_app = celery::app!(
//!     broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
//!     tasks = [add],
//!     task_routes = [],
//! ).await?;
//! # Ok(())
//! # }
//! ```
//!
//! The [`Celery`] app can be used as either a producer or consumer (worker). To send tasks to a
//! queue for a worker to consume, use the [`Celery::send_task`] method:
//!
//! ```rust,no_run
//! # use anyhow::Result;
//! # use celery::prelude::*;
//! # #[celery::task]
//! # fn add(x: i32, y: i32) -> celery::task::TaskResult<i32> {
//! #     Ok(x + y)
//! # }
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! # let my_app = celery::app!(
//! #     broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
//! #     tasks = [add],
//! #     task_routes = [],
//! # ).await?;
//! my_app.send_task(add::new(1, 2)).await?;
//! # Ok(())
//! # }
//! ```
//!
//! And to act as a worker to consume tasks sent to a queue by a producer, use the
//! [`Celery::consume`] method:
//!
//! ```rust,no_run
//! # use anyhow::Result;
//! # use celery::prelude::*;
//! # #[celery::task]
//! # fn add(x: i32, y: i32) -> celery::task::TaskResult<i32> {
//! #     Ok(x + y)
//! # }
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! # let my_app = celery::app!(
//! #     broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
//! #     tasks = [add],
//! #     task_routes = [],
//! # ).await?;
//! my_app.consume().await?;
//! # Ok(())
//! # }
//! ```

#![doc(
    html_favicon_url = "https://raw.githubusercontent.com/rusty-celery/rusty-celery/main/img/favicon.ico"
)]
#![doc(
    html_logo_url = "https://raw.githubusercontent.com/rusty-celery/rusty-celery/main/img/rusty-celery-logo-only.png"
)]

mod app;
mod routing;
pub use app::{Celery, CeleryBuilder};
pub mod beat;
pub mod broker;
pub mod error;
pub mod prelude;
pub mod protocol;
pub mod task;

#[cfg(feature = "codegen")]
mod codegen;

/// A procedural macro for generating a [`Task`](crate::task::Task) from a function.
///
/// If the annotated function has a return value, the return value must be a
/// [`TaskResult<R>`](task/type.TaskResult.html).
///
/// # Parameters
///
/// - `name`: The name to use when registering the task. Should be unique. If not given the name
/// will be set to the name of the function being decorated.
/// - `time_limit`: Set a task-level [`TaskOptions::time_limit`](task/struct.TaskOptions.html#structfield.time_limit).
/// - `hard_time_limit`: Set a task-level [`TaskOptions::hard_time_limit`](task/struct.TaskOptions.html#structfield.hard_time_limit).
/// - `max_retries`: Set a task-level [`TaskOptions::max_retries`](task/struct.TaskOptions.html#structfield.max_retries).
/// - `min_retry_delay`: Set a task-level [`TaskOptions::min_retry_delay`](task/struct.TaskOptions.html#structfield.min_retry_delay).
/// - `max_retry_delay`: Set a task-level [`TaskOptions::max_retry_delay`](task/struct.TaskOptions.html#structfield.max_retry_delay).
/// - `retry_for_unexpected`: Set a task-level [`TaskOptions::retry_for_unexpected`](task/struct.TaskOptions.html#structfield.retry_for_unexpected).
/// - `acks_late`: Set a task-level [`TaskOptions::acks_late`](task/struct.TaskOptions.html#structfield.acks_late).
/// - `content_type`: Set a task-level [`TaskOptions::content_type`](task/struct.TaskOptions.html#structfield.content_type).
/// - `bind`: A bool. If true, the task will be run like an instance method and so the function's
/// first argument should be a reference to `Self`. Note however that Rust won't allow you to call
/// the argument `self`. Instead, you could use `task` or just `t`.
/// - `on_failure`: An async callback function to run when the task fails. Should accept a reference to
/// a task instance and a reference to a [`TaskError`](error/enum.TaskError.html).
/// - `on_success`: An async callback function to run when the task succeeds. Should accept a reference to
/// a task instance and a reference to the value returned by the task.
///
/// For more information see the [tasks chapter](https://rusty-celery.github.io/guide/defining-tasks.html)
/// in the Rusty Celery Book.
///
/// ## Examples
///
/// Create a task named `add` with all of the default options:
///
/// ```rust
/// use celery::prelude::*;
///
/// #[celery::task]
/// fn add(x: i32, y: i32) -> TaskResult<i32> {
///     Ok(x + y)
/// }
/// ```
///
/// Use a name different from the function name:
///
/// ```rust
/// # use celery::prelude::*;
/// #[celery::task(name = "sum")]
/// fn add(x: i32, y: i32) -> TaskResult<i32> {
///     Ok(x + y)
/// }
/// ```
///
/// Customize the default retry behavior:
///
/// ```rust
/// # use celery::prelude::*;
/// #[celery::task(
///     time_limit = 3,
///     max_retries = 100,
///     min_retry_delay = 1,
///     max_retry_delay = 60,
/// )]
/// async fn io_task() -> TaskResult<()> {
///     // Do some async IO work that could possible fail, such as an HTTP request...
///     Ok(())
/// }
/// ```
///
/// Bind the function to the task instance so it runs like an instance method:
///
/// ```rust
/// # use celery::prelude::*;
/// #[celery::task(bind = true)]
/// fn bound_task(task: &Self) {
///     println!("Hello, World! From {}", task.name());
/// }
/// ```
///
/// Run custom callbacks on failure and on success:
///
/// ```rust
/// # use celery::task::{Task, TaskResult};
/// # use celery::error::TaskError;
/// #[celery::task(on_failure = failure_callback, on_success = success_callback)]
/// fn task_with_callbacks() {}
///
/// async fn failure_callback<T: Task>(task: &T, err: &TaskError) {
///     println!("{} failed with {:?}", task.name(), err);
/// }
///
/// async fn success_callback<T: Task>(task: &T, ret: &T::Returns) {
///     println!("{} succeeded: {:?}", task.name(), ret);
/// }
/// ```
#[cfg(feature = "codegen")]
pub use codegen::task;

#[cfg(feature = "codegen")]
#[doc(hidden)]
pub mod export;

#[cfg(feature = "codegen")]
extern crate async_trait;

#[cfg(feature = "codegen")]
extern crate serde;

#[cfg(feature = "codegen")]
extern crate tokio;