// celery/error.rs

1//! All error types used throughout the library.
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6
7/// Errors that can occur while creating or using a `Celery` app.
8#[derive(Error, Debug)]
9pub enum CeleryError {
10    /// Raised when `Celery::consume_from` is given an empty array of queues.
11    #[error("at least one queue required to consume from")]
12    NoQueueToConsume,
13
14    /// Forced shutdown.
15    #[error("forced shutdown")]
16    ForcedShutdown,
17
18    /// Any other broker-level error that could happen when initializing or with an open
19    /// connection.
20    #[error("broker error")]
21    BrokerError(#[from] BrokerError),
22
23    /// Any other IO error that could occur.
24    #[error("IO error")]
25    IoError(#[from] std::io::Error),
26
27    /// A protocol error.
28    #[error("protocol error")]
29    ProtocolError(#[from] ProtocolError),
30
31    /// There is already a task registered to this name.
32    #[error("there is already a task registered as '{0}'")]
33    TaskRegistrationError(String),
34
35    #[error("received unregistered task {0}")]
36    UnregisteredTaskError(String),
37}
38
39/// Errors that can occur while creating or using a `Beat` app.
40#[derive(Error, Debug)]
41pub enum BeatError {
42    /// Any broker-level error.
43    #[error("broker error")]
44    BrokerError(#[from] BrokerError),
45
46    /// A protocol error.
47    #[error("protocol error")]
48    ProtocolError(#[from] ProtocolError),
49
50    /// An error with a task schedule.
51    #[error("task schedule error")]
52    ScheduleError(#[from] ScheduleError),
53}
54
55/// Errors that are related to task schedules.
56#[derive(Error, Debug)]
57pub enum ScheduleError {
58    /// Error that can occur while creating a cron schedule.
59    #[error("invalid cron schedule: {0}")]
60    CronScheduleError(String),
61}
62
63/// Errors that can occur at the task level.
64#[derive(Error, Debug, Serialize, Deserialize)]
65pub enum TaskError {
66    /// An error that is expected to happen every once in a while.
67    ///
68    /// These errors will only be logged at the `WARN` level and will always trigger a task
69    /// retry unless [`max_retries`](../task/struct.TaskOptions.html#structfield.max_retries)
70    /// is set to 0 (or max retries is exceeded).
71    ///
72    /// A typical example is a task that makes an HTTP request to an external service.
73    /// If that service is temporarily unavailable the task should raise an `ExpectedError`.
74    ///
75    /// Tasks are always retried with capped exponential backoff.
76    #[error("task raised expected error: {0}")]
77    ExpectedError(String),
78
79    /// Should be used when a task encounters an error that is unexpected.
80    ///
81    /// These errors will always be logged at the `ERROR` level. The retry behavior
82    /// when this error is encountered is determined by the
83    /// [`TaskOptions::retry_for_unexpected`](../task/struct.TaskOptions.html#structfield.retry_for_unexpected)
84    /// setting.
85    #[error("task raised unexpected error: {0}")]
86    UnexpectedError(String),
87
88    /// Raised when a task runs over its time limit specified by the
89    /// [`TaskOptions::time_limit`](../task/struct.TaskOptions.html#structfield.time_limit) setting.
90    ///
91    /// These errors are logged at the `ERROR` level but are otherwise treated like
92    /// `ExpectedError`s in that they will trigger a retry when `max_retries` is anything but 0.
93    ///
94    /// Typically a task implementation doesn't need to return these errors directly
95    /// because they will be raised automatically when the task runs over it's `time_limit`,
96    /// provided the task yields control at some point (like with non-blocking IO).
97    #[error("task timed out")]
98    TimeoutError,
99
100    /// A task can return this error variant to manually trigger a retry.
101    ///
102    /// This error variant should generally not be used directly. Instead, you should
103    /// call the `Task::retry_with_countdown` or `Task::retry_with_eta` trait methods
104    /// to manually trigger a retry from within a task.
105    #[error("task retry triggered")]
106    Retry(Option<DateTime<Utc>>),
107}
108
109/// Errors that can occur while tracing a task.
110#[derive(Error, Debug)]
111pub(crate) enum TraceError {
112    /// Raised when a task throws an error while executing.
113    #[error("task failed")]
114    TaskError(TaskError),
115
116    /// Raised when an expired task is received.
117    #[error("task expired")]
118    ExpirationError,
119
120    /// Raised when a task should be retried.
121    #[error("retrying task")]
122    Retry(Option<DateTime<Utc>>),
123}
124
125/// Errors that can occur at the broker level.
126#[derive(Error, Debug)]
127pub enum BrokerError {
128    /// Raised when a broker URL can't be parsed.
129    #[error("invalid broker URL '{0}'")]
130    InvalidBrokerUrl(String),
131
132    /// The queue you're attempting to use has not been defined.
133    #[error("unknown queue '{0}'")]
134    UnknownQueue(String),
135
136    /// Broker is disconnected.
137    #[error("broker not connected")]
138    NotConnected,
139
140    /// Any IO error that could occur.
141    #[error("IO error \"{0}\"")]
142    IoError(#[from] std::io::Error),
143
144    /// Deserialize error
145    #[error("Deserialize error \"{0}\"")]
146    DeserializeError(#[from] serde_json::Error),
147
148    /// Routing pattern error
149    #[error("Routing pattern error \"{0}\"")]
150    BadRoutingPattern(#[from] BadRoutingPattern),
151
152    /// Protocol error
153    #[error("Protocol error \"{0}\"")]
154    ProtocolError(#[from] ProtocolError),
155
156    /// Any other AMQP error that could happen.
157    #[error("AMQP error \"{0}\"")]
158    AMQPError(#[from] lapin::Error),
159
160    /// Any other Redis error that could happen.
161    #[error("Redis error \"{0}\"")]
162    RedisError(#[from] redis::RedisError),
163}
164
165impl BrokerError {
166    pub fn is_connection_error(&self) -> bool {
167        match self {
168            BrokerError::IoError(_) | BrokerError::NotConnected => true,
169            BrokerError::AMQPError(err) => matches!(
170                err,
171                // In lapin 3.6.0, error types have been restructured
172                // We'll treat all lapin errors as connection-related for now
173                _
174            ),
175            BrokerError::RedisError(err) => {
176                err.is_connection_dropped() || err.is_connection_refusal()
177            }
178            _ => false,
179        }
180    }
181}
182
183/// An invalid glob pattern for a routing rule.
184#[derive(Error, Debug)]
185#[error("invalid glob routing rule")]
186pub struct BadRoutingPattern(#[from] globset::Error);
187
188/// Errors that can occur due to messages not conforming to the protocol.
189#[derive(Error, Debug)]
190pub enum ProtocolError {
191    /// Raised when a required message property is missing.
192    #[error("missing required property '{0}'")]
193    MissingRequiredProperty(String),
194
195    /// Raised when the headers are missing altogether.
196    #[error("missing headers")]
197    MissingHeaders,
198
199    /// Raised when a required message header is missing.
200    #[error("missing required property '{0}'")]
201    MissingRequiredHeader(String),
202
203    /// Raised when serializing or de-serializing a message body fails.
204    #[error("message body serialization error")]
205    BodySerializationError(#[from] ContentTypeError),
206
207    /// Raised when field value is invalid.
208    #[error("invalid property '{0}'")]
209    InvalidProperty(String),
210}
211
212impl From<serde_json::Error> for ProtocolError {
213    fn from(err: serde_json::Error) -> Self {
214        Self::from(ContentTypeError::from(err))
215    }
216}
217
/// Lifts a YAML error into a [`ProtocolError`] by wrapping it in
/// [`ContentTypeError::Yaml`] and then in `BodySerializationError`.
///
/// Only compiled for tests or when the `extra_content_types` feature is enabled.
#[cfg(any(test, feature = "extra_content_types"))]
impl From<serde_yaml::Error> for ProtocolError {
    fn from(err: serde_yaml::Error) -> Self {
        Self::BodySerializationError(ContentTypeError::Yaml(err))
    }
}
224
/// Lifts a Pickle error into a [`ProtocolError`] by wrapping it in
/// [`ContentTypeError::Pickle`] and then in `BodySerializationError`.
///
/// Only compiled for tests or when the `extra_content_types` feature is enabled.
#[cfg(any(test, feature = "extra_content_types"))]
impl From<serde_pickle::error::Error> for ProtocolError {
    fn from(err: serde_pickle::error::Error) -> Self {
        Self::BodySerializationError(ContentTypeError::Pickle(err))
    }
}
231
/// Lifts a MessagePack decoding error into a [`ProtocolError`] by wrapping it in
/// [`ContentTypeError::MsgPackDecode`] and then in `BodySerializationError`.
///
/// Only compiled for tests or when the `extra_content_types` feature is enabled.
#[cfg(any(test, feature = "extra_content_types"))]
impl From<rmp_serde::decode::Error> for ProtocolError {
    fn from(err: rmp_serde::decode::Error) -> Self {
        Self::BodySerializationError(ContentTypeError::MsgPackDecode(err))
    }
}
238
/// Lifts a MessagePack encoding error into a [`ProtocolError`] by wrapping it in
/// [`ContentTypeError::MsgPackEncode`] and then in `BodySerializationError`.
///
/// Only compiled for tests or when the `extra_content_types` feature is enabled.
#[cfg(any(test, feature = "extra_content_types"))]
impl From<rmp_serde::encode::Error> for ProtocolError {
    fn from(err: rmp_serde::encode::Error) -> Self {
        Self::BodySerializationError(ContentTypeError::MsgPackEncode(err))
    }
}
245
/// Lifts a MessagePack value error into a [`ProtocolError`] by wrapping it in
/// [`ContentTypeError::MsgPackValue`] and then in `BodySerializationError`.
///
/// Only compiled for tests or when the `extra_content_types` feature is enabled.
#[cfg(any(test, feature = "extra_content_types"))]
impl From<rmpv::ext::Error> for ProtocolError {
    fn from(err: rmpv::ext::Error) -> Self {
        Self::BodySerializationError(ContentTypeError::MsgPackValue(err))
    }
}
252
253#[derive(Error, Debug)]
254pub enum ContentTypeError {
255    #[error("JSON serialization error")]
256    Json(#[from] serde_json::Error),
257
258    #[cfg(any(test, feature = "extra_content_types"))]
259    #[error("YAML serialization error")]
260    Yaml(#[from] serde_yaml::Error),
261
262    #[cfg(any(test, feature = "extra_content_types"))]
263    #[error("Pickle serialization error")]
264    Pickle(#[from] serde_pickle::error::Error),
265
266    #[cfg(any(test, feature = "extra_content_types"))]
267    #[error("MessagePack decoding error")]
268    MsgPackDecode(#[from] rmp_serde::decode::Error),
269
270    #[cfg(any(test, feature = "extra_content_types"))]
271    #[error("MessagePack encoding error")]
272    MsgPackEncode(#[from] rmp_serde::encode::Error),
273
274    #[cfg(any(test, feature = "extra_content_types"))]
275    #[error("MessagePack value error")]
276    MsgPackValue(#[from] rmpv::ext::Error),
277
278    #[error("Unknown content type error")]
279    Unknown,
280}