//! A Rust implementation of [Celery](http://www.celeryproject.org/) for producing and consuming
//! asynchronous tasks with a distributed message queue.
//!
//! # Examples
//!
//! Define tasks by decorating functions with the [`macro@task`] attribute:
//!
//! ```rust
//! use celery::prelude::*;
//!
//! #[celery::task]
//! fn add(x: i32, y: i32) -> TaskResult<i32> {
//!     Ok(x + y)
//! }
//! ```
//!
//! Then create a [`Celery`] app with the [`app!`]
//! macro and register your tasks with it:
//!
//! ```rust,no_run
//! # use anyhow::Result;
//! # use celery::prelude::*;
//! # #[celery::task]
//! # fn add(x: i32, y: i32) -> celery::task::TaskResult<i32> {
//! # Ok(x + y)
//! # }
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! let my_app = celery::app!(
//!     broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
//!     tasks = [add],
//!     task_routes = [],
//! ).await?;
//! # Ok(())
//! # }
//! ```
//!
//! The [`Celery`] app can be used as either a producer or consumer (worker). To send tasks to a
//! queue for a worker to consume, use the [`Celery::send_task`] method:
//!
//! ```rust,no_run
//! # use anyhow::Result;
//! # use celery::prelude::*;
//! # #[celery::task]
//! # fn add(x: i32, y: i32) -> celery::task::TaskResult<i32> {
//! # Ok(x + y)
//! # }
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! # let my_app = celery::app!(
//! # broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
//! # tasks = [add],
//! # task_routes = [],
//! # ).await?;
//! my_app.send_task(add::new(1, 2)).await?;
//! # Ok(())
//! # }
//! ```
//!
//! And to act as a worker to consume tasks sent to a queue by a producer, use the
//! [`Celery::consume`] method:
//!
//! ```rust,no_run
//! # use anyhow::Result;
//! # use celery::prelude::*;
//! # #[celery::task]
//! # fn add(x: i32, y: i32) -> celery::task::TaskResult<i32> {
//! # Ok(x + y)
//! # }
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! # let my_app = celery::app!(
//! # broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
//! # tasks = [add],
//! # task_routes = [],
//! # ).await?;
//! my_app.consume().await?;
//! # Ok(())
//! # }
//! ```
// Crate-level rustdoc settings: the favicon and the logo used by the generated
// documentation pages, both served from the project's GitHub repository.
#![doc(
html_favicon_url = "https://raw.githubusercontent.com/rusty-celery/rusty-celery/main/img/favicon.ico"
)]
#![doc(
html_logo_url = "https://raw.githubusercontent.com/rusty-celery/rusty-celery/main/img/rusty-celery-logo-only.png"
)]
// Private modules: not nameable from outside the crate.
mod app;
mod routing;
// Re-export `Celery` and `CeleryBuilder` from the private `app` module so that they are
// reachable directly at the crate root (e.g. `celery::Celery`).
pub use app::{Celery, CeleryBuilder};
// Public modules that make up the crate's API surface.
pub mod beat;
pub mod broker;
pub mod error;
pub mod prelude;
pub mod protocol;
pub mod task;
// Only compiled when the `codegen` feature is enabled; the `task` attribute macro
// re-exported further down in this file comes from this module.
#[cfg(feature = "codegen")]
mod codegen;
/// A procedural macro for generating a [`Task`](crate::task::Task) from a function.
///
/// If the annotated function has a return value, the return value must be a
/// [`TaskResult<R>`](task/type.TaskResult.html).
///
/// # Parameters
///
/// - `name`: The name to use when registering the task. Should be unique. If not given the name
///   will be set to the name of the function being decorated.
/// - `time_limit`: Set a task-level [`TaskOptions::time_limit`](task/struct.TaskOptions.html#structfield.time_limit).
/// - `hard_time_limit`: Set a task-level [`TaskOptions::hard_time_limit`](task/struct.TaskOptions.html#structfield.hard_time_limit).
/// - `max_retries`: Set a task-level [`TaskOptions::max_retries`](task/struct.TaskOptions.html#structfield.max_retries).
/// - `min_retry_delay`: Set a task-level [`TaskOptions::min_retry_delay`](task/struct.TaskOptions.html#structfield.min_retry_delay).
/// - `max_retry_delay`: Set a task-level [`TaskOptions::max_retry_delay`](task/struct.TaskOptions.html#structfield.max_retry_delay).
/// - `retry_for_unexpected`: Set a task-level [`TaskOptions::retry_for_unexpected`](task/struct.TaskOptions.html#structfield.retry_for_unexpected).
/// - `acks_late`: Set a task-level [`TaskOptions::acks_late`](task/struct.TaskOptions.html#structfield.acks_late).
/// - `content_type`: Set a task-level [`TaskOptions::content_type`](task/struct.TaskOptions.html#structfield.content_type).
/// - `bind`: A bool. If true, the task will be run like an instance method and so the function's
///   first argument should be a reference to `Self`. Note however that Rust won't allow you to call
///   the argument `self`. Instead, you could use `task` or just `t`.
/// - `on_failure`: An async callback function to run when the task fails. Should accept a reference to
///   a task instance and a reference to a [`TaskError`](error/enum.TaskError.html).
/// - `on_success`: An async callback function to run when the task succeeds. Should accept a reference to
///   a task instance and a reference to the value returned by the task.
///
/// For more information see the [tasks chapter](https://rusty-celery.github.io/guide/defining-tasks.html)
/// in the Rusty Celery Book.
///
/// # Examples
///
/// Create a task named `add` with all of the default options:
///
/// ```rust
/// use celery::prelude::*;
///
/// #[celery::task]
/// fn add(x: i32, y: i32) -> TaskResult<i32> {
///     Ok(x + y)
/// }
/// ```
///
/// Use a name different from the function name:
///
/// ```rust
/// # use celery::prelude::*;
/// #[celery::task(name = "sum")]
/// fn add(x: i32, y: i32) -> TaskResult<i32> {
///     Ok(x + y)
/// }
/// ```
///
/// Customize the default retry behavior:
///
/// ```rust
/// # use celery::prelude::*;
/// #[celery::task(
///     time_limit = 3,
///     max_retries = 100,
///     min_retry_delay = 1,
///     max_retry_delay = 60,
/// )]
/// async fn io_task() -> TaskResult<()> {
///     // Do some async IO work that could possibly fail, such as an HTTP request...
///     Ok(())
/// }
/// ```
///
/// Bind the function to the task instance so it runs like an instance method:
///
/// ```rust
/// # use celery::prelude::*;
/// #[celery::task(bind = true)]
/// fn bound_task(task: &Self) {
///     println!("Hello, World! From {}", task.name());
/// }
/// ```
///
/// Run custom callbacks on failure and on success:
///
/// ```rust
/// # use celery::task::{Task, TaskResult};
/// # use celery::error::TaskError;
/// #[celery::task(on_failure = failure_callback, on_success = success_callback)]
/// fn task_with_callbacks() {}
///
/// async fn failure_callback<T: Task>(task: &T, err: &TaskError) {
///     println!("{} failed with {:?}", task.name(), err);
/// }
///
/// async fn success_callback<T: Task>(task: &T, ret: &T::Returns) {
///     println!("{} succeeded: {:?}", task.name(), ret);
/// }
/// ```
#[cfg(feature = "codegen")]
pub use codegen::task;
// `export` is public but hidden from the rendered documentation, and is only compiled
// when the `codegen` feature is enabled.
// NOTE(review): presumably this exists so that code expanded from the `task` macro can
// reach its dependencies through the `celery` crate — confirm against `export.rs`.
#[cfg(feature = "codegen")]
#[doc(hidden)]
pub mod export;
// Explicitly link the crates below, each only when the `codegen` feature is enabled.
// NOTE(review): these look like the dependencies of macro-generated task code — confirm.
#[cfg(feature = "codegen")]
extern crate async_trait;
#[cfg(feature = "codegen")]
extern crate serde;
#[cfg(feature = "codegen")]
extern crate tokio;