1use chrono::{DateTime, Utc};
4use deadpool_redis::PoolError;
5use serde::{Deserialize, Serialize};
6use thiserror::Error;
7
8#[derive(Error, Debug)]
10pub enum CeleryError {
11 #[error("at least one queue required to consume from")]
13 NoQueueToConsume,
14
15 #[error("forced shutdown")]
17 ForcedShutdown,
18
19 #[error("broker error")]
22 BrokerError(#[from] BrokerError),
23
24 #[error("IO error")]
26 IoError(#[from] std::io::Error),
27
28 #[error("protocol error")]
30 ProtocolError(#[from] ProtocolError),
31
32 #[error("there is already a task registered as '{0}'")]
34 TaskRegistrationError(String),
35
36 #[error("received unregistered task {0}")]
37 UnregisteredTaskError(String),
38
39 #[error("backend error")]
41 BackendError(#[from] BackendError),
42}
43
44#[derive(Error, Debug)]
46pub enum BeatError {
47 #[error("broker error")]
49 BrokerError(#[from] BrokerError),
50
51 #[error("protocol error")]
53 ProtocolError(#[from] ProtocolError),
54
55 #[error("task schedule error")]
57 ScheduleError(#[from] ScheduleError),
58
59 #[error("redis error: {0}")]
61 RedisError(String),
62}
63
64#[derive(Error, Debug)]
66pub enum BackendError {
67 #[error("result backend not configured")]
69 NotConfigured,
70
71 #[error("timeout waiting for task result")]
73 Timeout,
74
75 #[error("backend serialization error: {0}")]
77 Serialization(#[from] serde_json::Error),
78
79 #[error("redis error: {0}")]
81 Redis(#[from] redis::RedisError),
82
83 #[error("redis pool error: {0}")]
85 Pool(#[from] PoolError),
86
87 #[error("redis pool creation error: {0}")]
89 PoolCreationError(String),
90
91 #[error("task failed: {0}")]
93 TaskFailed(String),
94}
95
96#[derive(Error, Debug)]
98pub enum ScheduleError {
99 #[error("invalid cron schedule: {0}")]
101 CronScheduleError(String),
102}
103
104#[derive(Error, Debug, Serialize, Deserialize)]
106pub enum TaskError {
107 #[error("task raised expected error: {0}")]
118 ExpectedError(String),
119
120 #[error("task raised unexpected error: {0}")]
127 UnexpectedError(String),
128
129 #[error("task timed out")]
139 TimeoutError,
140
141 #[error("task retry triggered")]
147 Retry(Option<DateTime<Utc>>),
148}
149
150#[derive(Error, Debug)]
152pub(crate) enum TraceError {
153 #[error("task failed")]
155 TaskError(TaskError),
156
157 #[error("task expired")]
159 ExpirationError,
160
161 #[error("retrying task")]
163 Retry(Option<DateTime<Utc>>),
164}
165
166#[derive(Error, Debug)]
168pub enum BrokerError {
169 #[error("invalid broker URL '{0}'")]
171 InvalidBrokerUrl(String),
172
173 #[error("unknown queue '{0}'")]
175 UnknownQueue(String),
176
177 #[error("broker not connected")]
179 NotConnected,
180
181 #[error("IO error \"{0}\"")]
183 IoError(#[from] std::io::Error),
184
185 #[error("Deserialize error \"{0}\"")]
187 DeserializeError(#[from] serde_json::Error),
188
189 #[error("Routing pattern error \"{0}\"")]
191 BadRoutingPattern(#[from] BadRoutingPattern),
192
193 #[error("Protocol error \"{0}\"")]
195 ProtocolError(#[from] ProtocolError),
196
197 #[error("AMQP error \"{0}\"")]
199 AMQPError(#[from] lapin::Error),
200
201 #[error("Redis error \"{0}\"")]
203 RedisError(#[from] redis::RedisError),
204}
205
206impl BrokerError {
207 pub fn is_connection_error(&self) -> bool {
208 match self {
209 BrokerError::IoError(_) | BrokerError::NotConnected => true,
210 BrokerError::AMQPError(err) => matches!(
211 err,
212 _
215 ),
216 BrokerError::RedisError(err) => {
217 err.is_connection_dropped() || err.is_connection_refusal()
218 }
219 _ => false,
220 }
221 }
222}
223
224#[derive(Error, Debug)]
226#[error("invalid glob routing rule")]
227pub struct BadRoutingPattern(#[from] globset::Error);
228
229#[derive(Error, Debug)]
231pub enum ProtocolError {
232 #[error("missing required property '{0}'")]
234 MissingRequiredProperty(String),
235
236 #[error("missing headers")]
238 MissingHeaders,
239
240 #[error("missing required property '{0}'")]
242 MissingRequiredHeader(String),
243
244 #[error("message body serialization error")]
246 BodySerializationError(#[from] ContentTypeError),
247
248 #[error("invalid property '{0}'")]
250 InvalidProperty(String),
251}
252
253impl From<serde_json::Error> for ProtocolError {
254 fn from(err: serde_json::Error) -> Self {
255 Self::from(ContentTypeError::from(err))
256 }
257}
258
/// Converts a YAML error into a [`ProtocolError::BodySerializationError`] by first turning
/// it into a [`ContentTypeError`].
#[cfg(any(test, feature = "extra_content_types"))]
impl From<serde_yaml::Error> for ProtocolError {
    fn from(source: serde_yaml::Error) -> Self {
        Self::BodySerializationError(source.into())
    }
}
265
/// Converts a Pickle error into a [`ProtocolError::BodySerializationError`] by first turning
/// it into a [`ContentTypeError`].
#[cfg(any(test, feature = "extra_content_types"))]
impl From<serde_pickle::error::Error> for ProtocolError {
    fn from(source: serde_pickle::error::Error) -> Self {
        Self::BodySerializationError(source.into())
    }
}
272
/// Converts a MessagePack decoding error into a [`ProtocolError::BodySerializationError`]
/// by first turning it into a [`ContentTypeError`].
#[cfg(any(test, feature = "extra_content_types"))]
impl From<rmp_serde::decode::Error> for ProtocolError {
    fn from(source: rmp_serde::decode::Error) -> Self {
        Self::BodySerializationError(source.into())
    }
}
279
/// Converts a MessagePack encoding error into a [`ProtocolError::BodySerializationError`]
/// by first turning it into a [`ContentTypeError`].
#[cfg(any(test, feature = "extra_content_types"))]
impl From<rmp_serde::encode::Error> for ProtocolError {
    fn from(source: rmp_serde::encode::Error) -> Self {
        Self::BodySerializationError(source.into())
    }
}
286
/// Converts a MessagePack value error into a [`ProtocolError::BodySerializationError`]
/// by first turning it into a [`ContentTypeError`].
#[cfg(any(test, feature = "extra_content_types"))]
impl From<rmpv::ext::Error> for ProtocolError {
    fn from(source: rmpv::ext::Error) -> Self {
        Self::BodySerializationError(source.into())
    }
}
293
294#[derive(Error, Debug)]
295pub enum ContentTypeError {
296 #[error("JSON serialization error")]
297 Json(#[from] serde_json::Error),
298
299 #[cfg(any(test, feature = "extra_content_types"))]
300 #[error("YAML serialization error")]
301 Yaml(#[from] serde_yaml::Error),
302
303 #[cfg(any(test, feature = "extra_content_types"))]
304 #[error("Pickle serialization error")]
305 Pickle(#[from] serde_pickle::error::Error),
306
307 #[cfg(any(test, feature = "extra_content_types"))]
308 #[error("MessagePack decoding error")]
309 MsgPackDecode(#[from] rmp_serde::decode::Error),
310
311 #[cfg(any(test, feature = "extra_content_types"))]
312 #[error("MessagePack encoding error")]
313 MsgPackEncode(#[from] rmp_serde::encode::Error),
314
315 #[cfg(any(test, feature = "extra_content_types"))]
316 #[error("MessagePack value error")]
317 MsgPackValue(#[from] rmpv::ext::Error),
318
319 #[error("Unknown content type error")]
320 Unknown,
321}