celery/
error.rs

1//! All error types used throughout the library.
2
3use chrono::{DateTime, Utc};
4use deadpool_redis::PoolError;
5use serde::{Deserialize, Serialize};
6use thiserror::Error;
7
8/// Errors that can occur while creating or using a `Celery` app.
9#[derive(Error, Debug)]
10pub enum CeleryError {
11    /// Raised when `Celery::consume_from` is given an empty array of queues.
12    #[error("at least one queue required to consume from")]
13    NoQueueToConsume,
14
15    /// Forced shutdown.
16    #[error("forced shutdown")]
17    ForcedShutdown,
18
19    /// Any other broker-level error that could happen when initializing or with an open
20    /// connection.
21    #[error("broker error")]
22    BrokerError(#[from] BrokerError),
23
24    /// Any other IO error that could occur.
25    #[error("IO error")]
26    IoError(#[from] std::io::Error),
27
28    /// A protocol error.
29    #[error("protocol error")]
30    ProtocolError(#[from] ProtocolError),
31
32    /// There is already a task registered to this name.
33    #[error("there is already a task registered as '{0}'")]
34    TaskRegistrationError(String),
35
36    #[error("received unregistered task {0}")]
37    UnregisteredTaskError(String),
38
39    /// Any result backend related error.
40    #[error("backend error")]
41    BackendError(#[from] BackendError),
42}
43
44/// Errors that can occur while creating or using a `Beat` app.
45#[derive(Error, Debug)]
46pub enum BeatError {
47    /// Any broker-level error.
48    #[error("broker error")]
49    BrokerError(#[from] BrokerError),
50
51    /// A protocol error.
52    #[error("protocol error")]
53    ProtocolError(#[from] ProtocolError),
54
55    /// An error with a task schedule.
56    #[error("task schedule error")]
57    ScheduleError(#[from] ScheduleError),
58
59    /// Redis-related error.
60    #[error("redis error: {0}")]
61    RedisError(String),
62}
63
64/// Errors that can occur while storing or retrieving task results.
65#[derive(Error, Debug)]
66pub enum BackendError {
67    /// Raised when a result backend was requested but not configured.
68    #[error("result backend not configured")]
69    NotConfigured,
70
71    /// Raised when waiting for a task exceeded the provided timeout.
72    #[error("timeout waiting for task result")]
73    Timeout,
74
75    /// Raised when deserializing or serializing backend payloads failed.
76    #[error("backend serialization error: {0}")]
77    Serialization(#[from] serde_json::Error),
78
79    /// Raised when a Redis command fails.
80    #[error("redis error: {0}")]
81    Redis(#[from] redis::RedisError),
82
83    /// Raised when acquiring a connection from the Redis pool failed.
84    #[error("redis pool error: {0}")]
85    Pool(#[from] PoolError),
86
87    /// Raised when constructing the Redis pool failed.
88    #[error("redis pool creation error: {0}")]
89    PoolCreationError(String),
90
91    /// Raised when the task finished with a failure state.
92    #[error("task failed: {0}")]
93    TaskFailed(String),
94}
95
96/// Errors that are related to task schedules.
97#[derive(Error, Debug)]
98pub enum ScheduleError {
99    /// Error that can occur while creating a cron schedule.
100    #[error("invalid cron schedule: {0}")]
101    CronScheduleError(String),
102}
103
104/// Errors that can occur at the task level.
105#[derive(Error, Debug, Serialize, Deserialize)]
106pub enum TaskError {
107    /// An error that is expected to happen every once in a while.
108    ///
109    /// These errors will only be logged at the `WARN` level and will always trigger a task
110    /// retry unless [`max_retries`](../task/struct.TaskOptions.html#structfield.max_retries)
111    /// is set to 0 (or max retries is exceeded).
112    ///
113    /// A typical example is a task that makes an HTTP request to an external service.
114    /// If that service is temporarily unavailable the task should raise an `ExpectedError`.
115    ///
116    /// Tasks are always retried with capped exponential backoff.
117    #[error("task raised expected error: {0}")]
118    ExpectedError(String),
119
120    /// Should be used when a task encounters an error that is unexpected.
121    ///
122    /// These errors will always be logged at the `ERROR` level. The retry behavior
123    /// when this error is encountered is determined by the
124    /// [`TaskOptions::retry_for_unexpected`](../task/struct.TaskOptions.html#structfield.retry_for_unexpected)
125    /// setting.
126    #[error("task raised unexpected error: {0}")]
127    UnexpectedError(String),
128
129    /// Raised when a task runs over its time limit specified by the
130    /// [`TaskOptions::time_limit`](../task/struct.TaskOptions.html#structfield.time_limit) setting.
131    ///
132    /// These errors are logged at the `ERROR` level but are otherwise treated like
133    /// `ExpectedError`s in that they will trigger a retry when `max_retries` is anything but 0.
134    ///
135    /// Typically a task implementation doesn't need to return these errors directly
136    /// because they will be raised automatically when the task runs over it's `time_limit`,
137    /// provided the task yields control at some point (like with non-blocking IO).
138    #[error("task timed out")]
139    TimeoutError,
140
141    /// A task can return this error variant to manually trigger a retry.
142    ///
143    /// This error variant should generally not be used directly. Instead, you should
144    /// call the `Task::retry_with_countdown` or `Task::retry_with_eta` trait methods
145    /// to manually trigger a retry from within a task.
146    #[error("task retry triggered")]
147    Retry(Option<DateTime<Utc>>),
148}
149
150/// Errors that can occur while tracing a task.
151#[derive(Error, Debug)]
152pub(crate) enum TraceError {
153    /// Raised when a task throws an error while executing.
154    #[error("task failed")]
155    TaskError(TaskError),
156
157    /// Raised when an expired task is received.
158    #[error("task expired")]
159    ExpirationError,
160
161    /// Raised when a task should be retried.
162    #[error("retrying task")]
163    Retry(Option<DateTime<Utc>>),
164}
165
166/// Errors that can occur at the broker level.
167#[derive(Error, Debug)]
168pub enum BrokerError {
169    /// Raised when a broker URL can't be parsed.
170    #[error("invalid broker URL '{0}'")]
171    InvalidBrokerUrl(String),
172
173    /// The queue you're attempting to use has not been defined.
174    #[error("unknown queue '{0}'")]
175    UnknownQueue(String),
176
177    /// Broker is disconnected.
178    #[error("broker not connected")]
179    NotConnected,
180
181    /// Any IO error that could occur.
182    #[error("IO error \"{0}\"")]
183    IoError(#[from] std::io::Error),
184
185    /// Deserialize error
186    #[error("Deserialize error \"{0}\"")]
187    DeserializeError(#[from] serde_json::Error),
188
189    /// Routing pattern error
190    #[error("Routing pattern error \"{0}\"")]
191    BadRoutingPattern(#[from] BadRoutingPattern),
192
193    /// Protocol error
194    #[error("Protocol error \"{0}\"")]
195    ProtocolError(#[from] ProtocolError),
196
197    /// Any other AMQP error that could happen.
198    #[error("AMQP error \"{0}\"")]
199    AMQPError(#[from] lapin::Error),
200
201    /// Any other Redis error that could happen.
202    #[error("Redis error \"{0}\"")]
203    RedisError(#[from] redis::RedisError),
204}
205
206impl BrokerError {
207    pub fn is_connection_error(&self) -> bool {
208        match self {
209            BrokerError::IoError(_) | BrokerError::NotConnected => true,
210            BrokerError::AMQPError(err) => matches!(
211                err,
212                // In lapin 3.6.0, error types have been restructured
213                // We'll treat all lapin errors as connection-related for now
214                _
215            ),
216            BrokerError::RedisError(err) => {
217                err.is_connection_dropped() || err.is_connection_refusal()
218            }
219            _ => false,
220        }
221    }
222}
223
224/// An invalid glob pattern for a routing rule.
225#[derive(Error, Debug)]
226#[error("invalid glob routing rule")]
227pub struct BadRoutingPattern(#[from] globset::Error);
228
229/// Errors that can occur due to messages not conforming to the protocol.
230#[derive(Error, Debug)]
231pub enum ProtocolError {
232    /// Raised when a required message property is missing.
233    #[error("missing required property '{0}'")]
234    MissingRequiredProperty(String),
235
236    /// Raised when the headers are missing altogether.
237    #[error("missing headers")]
238    MissingHeaders,
239
240    /// Raised when a required message header is missing.
241    #[error("missing required property '{0}'")]
242    MissingRequiredHeader(String),
243
244    /// Raised when serializing or de-serializing a message body fails.
245    #[error("message body serialization error")]
246    BodySerializationError(#[from] ContentTypeError),
247
248    /// Raised when field value is invalid.
249    #[error("invalid property '{0}'")]
250    InvalidProperty(String),
251}
252
253impl From<serde_json::Error> for ProtocolError {
254    fn from(err: serde_json::Error) -> Self {
255        Self::from(ContentTypeError::from(err))
256    }
257}
258
259#[cfg(any(test, feature = "extra_content_types"))]
260impl From<serde_yaml::Error> for ProtocolError {
261    fn from(err: serde_yaml::Error) -> Self {
262        Self::from(ContentTypeError::from(err))
263    }
264}
265
266#[cfg(any(test, feature = "extra_content_types"))]
267impl From<serde_pickle::error::Error> for ProtocolError {
268    fn from(err: serde_pickle::error::Error) -> Self {
269        Self::from(ContentTypeError::from(err))
270    }
271}
272
273#[cfg(any(test, feature = "extra_content_types"))]
274impl From<rmp_serde::decode::Error> for ProtocolError {
275    fn from(err: rmp_serde::decode::Error) -> Self {
276        Self::from(ContentTypeError::from(err))
277    }
278}
279
280#[cfg(any(test, feature = "extra_content_types"))]
281impl From<rmp_serde::encode::Error> for ProtocolError {
282    fn from(err: rmp_serde::encode::Error) -> Self {
283        Self::from(ContentTypeError::from(err))
284    }
285}
286
287#[cfg(any(test, feature = "extra_content_types"))]
288impl From<rmpv::ext::Error> for ProtocolError {
289    fn from(err: rmpv::ext::Error) -> Self {
290        Self::from(ContentTypeError::from(err))
291    }
292}
293
294#[derive(Error, Debug)]
295pub enum ContentTypeError {
296    #[error("JSON serialization error")]
297    Json(#[from] serde_json::Error),
298
299    #[cfg(any(test, feature = "extra_content_types"))]
300    #[error("YAML serialization error")]
301    Yaml(#[from] serde_yaml::Error),
302
303    #[cfg(any(test, feature = "extra_content_types"))]
304    #[error("Pickle serialization error")]
305    Pickle(#[from] serde_pickle::error::Error),
306
307    #[cfg(any(test, feature = "extra_content_types"))]
308    #[error("MessagePack decoding error")]
309    MsgPackDecode(#[from] rmp_serde::decode::Error),
310
311    #[cfg(any(test, feature = "extra_content_types"))]
312    #[error("MessagePack encoding error")]
313    MsgPackEncode(#[from] rmp_serde::encode::Error),
314
315    #[cfg(any(test, feature = "extra_content_types"))]
316    #[error("MessagePack value error")]
317    MsgPackValue(#[from] rmpv::ext::Error),
318
319    #[error("Unknown content type error")]
320    Unknown,
321}