celery/
error.rs

1//! All error types used throughout the library.
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6
7/// Errors that can occur while creating or using a `Celery` app.
8#[derive(Error, Debug)]
9pub enum CeleryError {
10    /// Raised when `Celery::consume_from` is given an empty array of queues.
11    #[error("at least one queue required to consume from")]
12    NoQueueToConsume,
13
14    /// Forced shutdown.
15    #[error("forced shutdown")]
16    ForcedShutdown,
17
18    /// Any other broker-level error that could happen when initializing or with an open
19    /// connection.
20    #[error("broker error")]
21    BrokerError(#[from] BrokerError),
22
23    /// Any other IO error that could occur.
24    #[error("IO error")]
25    IoError(#[from] std::io::Error),
26
27    /// A protocol error.
28    #[error("protocol error")]
29    ProtocolError(#[from] ProtocolError),
30
31    /// There is already a task registered to this name.
32    #[error("there is already a task registered as '{0}'")]
33    TaskRegistrationError(String),
34
35    #[error("received unregistered task {0}")]
36    UnregisteredTaskError(String),
37}
38
39/// Errors that can occur while creating or using a `Beat` app.
40#[derive(Error, Debug)]
41pub enum BeatError {
42    /// Any broker-level error.
43    #[error("broker error")]
44    BrokerError(#[from] BrokerError),
45
46    /// A protocol error.
47    #[error("protocol error")]
48    ProtocolError(#[from] ProtocolError),
49
50    /// An error with a task schedule.
51    #[error("task schedule error")]
52    ScheduleError(#[from] ScheduleError),
53
54    /// Redis-related error.
55    #[error("redis error: {0}")]
56    RedisError(String),
57}
58
59/// Errors that are related to task schedules.
60#[derive(Error, Debug)]
61pub enum ScheduleError {
62    /// Error that can occur while creating a cron schedule.
63    #[error("invalid cron schedule: {0}")]
64    CronScheduleError(String),
65}
66
67/// Errors that can occur at the task level.
68#[derive(Error, Debug, Serialize, Deserialize)]
69pub enum TaskError {
70    /// An error that is expected to happen every once in a while.
71    ///
72    /// These errors will only be logged at the `WARN` level and will always trigger a task
73    /// retry unless [`max_retries`](../task/struct.TaskOptions.html#structfield.max_retries)
74    /// is set to 0 (or max retries is exceeded).
75    ///
76    /// A typical example is a task that makes an HTTP request to an external service.
77    /// If that service is temporarily unavailable the task should raise an `ExpectedError`.
78    ///
79    /// Tasks are always retried with capped exponential backoff.
80    #[error("task raised expected error: {0}")]
81    ExpectedError(String),
82
83    /// Should be used when a task encounters an error that is unexpected.
84    ///
85    /// These errors will always be logged at the `ERROR` level. The retry behavior
86    /// when this error is encountered is determined by the
87    /// [`TaskOptions::retry_for_unexpected`](../task/struct.TaskOptions.html#structfield.retry_for_unexpected)
88    /// setting.
89    #[error("task raised unexpected error: {0}")]
90    UnexpectedError(String),
91
92    /// Raised when a task runs over its time limit specified by the
93    /// [`TaskOptions::time_limit`](../task/struct.TaskOptions.html#structfield.time_limit) setting.
94    ///
95    /// These errors are logged at the `ERROR` level but are otherwise treated like
96    /// `ExpectedError`s in that they will trigger a retry when `max_retries` is anything but 0.
97    ///
98    /// Typically a task implementation doesn't need to return these errors directly
99    /// because they will be raised automatically when the task runs over it's `time_limit`,
100    /// provided the task yields control at some point (like with non-blocking IO).
101    #[error("task timed out")]
102    TimeoutError,
103
104    /// A task can return this error variant to manually trigger a retry.
105    ///
106    /// This error variant should generally not be used directly. Instead, you should
107    /// call the `Task::retry_with_countdown` or `Task::retry_with_eta` trait methods
108    /// to manually trigger a retry from within a task.
109    #[error("task retry triggered")]
110    Retry(Option<DateTime<Utc>>),
111}
112
113/// Errors that can occur while tracing a task.
114#[derive(Error, Debug)]
115pub(crate) enum TraceError {
116    /// Raised when a task throws an error while executing.
117    #[error("task failed")]
118    TaskError(TaskError),
119
120    /// Raised when an expired task is received.
121    #[error("task expired")]
122    ExpirationError,
123
124    /// Raised when a task should be retried.
125    #[error("retrying task")]
126    Retry(Option<DateTime<Utc>>),
127}
128
129/// Errors that can occur at the broker level.
130#[derive(Error, Debug)]
131pub enum BrokerError {
132    /// Raised when a broker URL can't be parsed.
133    #[error("invalid broker URL '{0}'")]
134    InvalidBrokerUrl(String),
135
136    /// The queue you're attempting to use has not been defined.
137    #[error("unknown queue '{0}'")]
138    UnknownQueue(String),
139
140    /// Broker is disconnected.
141    #[error("broker not connected")]
142    NotConnected,
143
144    /// Any IO error that could occur.
145    #[error("IO error \"{0}\"")]
146    IoError(#[from] std::io::Error),
147
148    /// Deserialize error
149    #[error("Deserialize error \"{0}\"")]
150    DeserializeError(#[from] serde_json::Error),
151
152    /// Routing pattern error
153    #[error("Routing pattern error \"{0}\"")]
154    BadRoutingPattern(#[from] BadRoutingPattern),
155
156    /// Protocol error
157    #[error("Protocol error \"{0}\"")]
158    ProtocolError(#[from] ProtocolError),
159
160    /// Any other AMQP error that could happen.
161    #[error("AMQP error \"{0}\"")]
162    AMQPError(#[from] lapin::Error),
163
164    /// Any other Redis error that could happen.
165    #[error("Redis error \"{0}\"")]
166    RedisError(#[from] redis::RedisError),
167}
168
169impl BrokerError {
170    pub fn is_connection_error(&self) -> bool {
171        match self {
172            BrokerError::IoError(_) | BrokerError::NotConnected => true,
173            BrokerError::AMQPError(err) => matches!(
174                err,
175                // In lapin 3.6.0, error types have been restructured
176                // We'll treat all lapin errors as connection-related for now
177                _
178            ),
179            BrokerError::RedisError(err) => {
180                err.is_connection_dropped() || err.is_connection_refusal()
181            }
182            _ => false,
183        }
184    }
185}
186
187/// An invalid glob pattern for a routing rule.
188#[derive(Error, Debug)]
189#[error("invalid glob routing rule")]
190pub struct BadRoutingPattern(#[from] globset::Error);
191
192/// Errors that can occur due to messages not conforming to the protocol.
193#[derive(Error, Debug)]
194pub enum ProtocolError {
195    /// Raised when a required message property is missing.
196    #[error("missing required property '{0}'")]
197    MissingRequiredProperty(String),
198
199    /// Raised when the headers are missing altogether.
200    #[error("missing headers")]
201    MissingHeaders,
202
203    /// Raised when a required message header is missing.
204    #[error("missing required property '{0}'")]
205    MissingRequiredHeader(String),
206
207    /// Raised when serializing or de-serializing a message body fails.
208    #[error("message body serialization error")]
209    BodySerializationError(#[from] ContentTypeError),
210
211    /// Raised when field value is invalid.
212    #[error("invalid property '{0}'")]
213    InvalidProperty(String),
214}
215
216impl From<serde_json::Error> for ProtocolError {
217    fn from(err: serde_json::Error) -> Self {
218        Self::from(ContentTypeError::from(err))
219    }
220}
221
/// Wraps a YAML error as a body serialization error.
#[cfg(any(test, feature = "extra_content_types"))]
impl From<serde_yaml::Error> for ProtocolError {
    fn from(cause: serde_yaml::Error) -> Self {
        ProtocolError::BodySerializationError(ContentTypeError::Yaml(cause))
    }
}
228
/// Wraps a Pickle error as a body serialization error.
#[cfg(any(test, feature = "extra_content_types"))]
impl From<serde_pickle::error::Error> for ProtocolError {
    fn from(cause: serde_pickle::error::Error) -> Self {
        ProtocolError::BodySerializationError(ContentTypeError::Pickle(cause))
    }
}
235
/// Wraps a MessagePack decoding error as a body serialization error.
#[cfg(any(test, feature = "extra_content_types"))]
impl From<rmp_serde::decode::Error> for ProtocolError {
    fn from(cause: rmp_serde::decode::Error) -> Self {
        ProtocolError::BodySerializationError(ContentTypeError::MsgPackDecode(cause))
    }
}
242
/// Wraps a MessagePack encoding error as a body serialization error.
#[cfg(any(test, feature = "extra_content_types"))]
impl From<rmp_serde::encode::Error> for ProtocolError {
    fn from(cause: rmp_serde::encode::Error) -> Self {
        ProtocolError::BodySerializationError(ContentTypeError::MsgPackEncode(cause))
    }
}
249
/// Wraps a MessagePack value error as a body serialization error.
#[cfg(any(test, feature = "extra_content_types"))]
impl From<rmpv::ext::Error> for ProtocolError {
    fn from(cause: rmpv::ext::Error) -> Self {
        ProtocolError::BodySerializationError(ContentTypeError::MsgPackValue(cause))
    }
}
256
257#[derive(Error, Debug)]
258pub enum ContentTypeError {
259    #[error("JSON serialization error")]
260    Json(#[from] serde_json::Error),
261
262    #[cfg(any(test, feature = "extra_content_types"))]
263    #[error("YAML serialization error")]
264    Yaml(#[from] serde_yaml::Error),
265
266    #[cfg(any(test, feature = "extra_content_types"))]
267    #[error("Pickle serialization error")]
268    Pickle(#[from] serde_pickle::error::Error),
269
270    #[cfg(any(test, feature = "extra_content_types"))]
271    #[error("MessagePack decoding error")]
272    MsgPackDecode(#[from] rmp_serde::decode::Error),
273
274    #[cfg(any(test, feature = "extra_content_types"))]
275    #[error("MessagePack encoding error")]
276    MsgPackEncode(#[from] rmp_serde::encode::Error),
277
278    #[cfg(any(test, feature = "extra_content_types"))]
279    #[error("MessagePack value error")]
280    MsgPackValue(#[from] rmpv::ext::Error),
281
282    #[error("Unknown content type error")]
283    Unknown,
284}