1use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6
7#[derive(Error, Debug)]
9pub enum CeleryError {
10 #[error("at least one queue required to consume from")]
12 NoQueueToConsume,
13
14 #[error("forced shutdown")]
16 ForcedShutdown,
17
18 #[error("broker error")]
21 BrokerError(#[from] BrokerError),
22
23 #[error("IO error")]
25 IoError(#[from] std::io::Error),
26
27 #[error("protocol error")]
29 ProtocolError(#[from] ProtocolError),
30
31 #[error("there is already a task registered as '{0}'")]
33 TaskRegistrationError(String),
34
35 #[error("received unregistered task {0}")]
36 UnregisteredTaskError(String),
37}
38
39#[derive(Error, Debug)]
41pub enum BeatError {
42 #[error("broker error")]
44 BrokerError(#[from] BrokerError),
45
46 #[error("protocol error")]
48 ProtocolError(#[from] ProtocolError),
49
50 #[error("task schedule error")]
52 ScheduleError(#[from] ScheduleError),
53}
54
55#[derive(Error, Debug)]
57pub enum ScheduleError {
58 #[error("invalid cron schedule: {0}")]
60 CronScheduleError(String),
61}
62
63#[derive(Error, Debug, Serialize, Deserialize)]
65pub enum TaskError {
66 #[error("task raised expected error: {0}")]
77 ExpectedError(String),
78
79 #[error("task raised unexpected error: {0}")]
86 UnexpectedError(String),
87
88 #[error("task timed out")]
98 TimeoutError,
99
100 #[error("task retry triggered")]
106 Retry(Option<DateTime<Utc>>),
107}
108
109#[derive(Error, Debug)]
111pub(crate) enum TraceError {
112 #[error("task failed")]
114 TaskError(TaskError),
115
116 #[error("task expired")]
118 ExpirationError,
119
120 #[error("retrying task")]
122 Retry(Option<DateTime<Utc>>),
123}
124
125#[derive(Error, Debug)]
127pub enum BrokerError {
128 #[error("invalid broker URL '{0}'")]
130 InvalidBrokerUrl(String),
131
132 #[error("unknown queue '{0}'")]
134 UnknownQueue(String),
135
136 #[error("broker not connected")]
138 NotConnected,
139
140 #[error("IO error \"{0}\"")]
142 IoError(#[from] std::io::Error),
143
144 #[error("Deserialize error \"{0}\"")]
146 DeserializeError(#[from] serde_json::Error),
147
148 #[error("Routing pattern error \"{0}\"")]
150 BadRoutingPattern(#[from] BadRoutingPattern),
151
152 #[error("Protocol error \"{0}\"")]
154 ProtocolError(#[from] ProtocolError),
155
156 #[error("AMQP error \"{0}\"")]
158 AMQPError(#[from] lapin::Error),
159
160 #[error("Redis error \"{0}\"")]
162 RedisError(#[from] redis::RedisError),
163}
164
165impl BrokerError {
166 pub fn is_connection_error(&self) -> bool {
167 match self {
168 BrokerError::IoError(_) | BrokerError::NotConnected => true,
169 BrokerError::AMQPError(err) => matches!(
170 err,
171 _
174 ),
175 BrokerError::RedisError(err) => {
176 err.is_connection_dropped() || err.is_connection_refusal()
177 }
178 _ => false,
179 }
180 }
181}
182
183#[derive(Error, Debug)]
185#[error("invalid glob routing rule")]
186pub struct BadRoutingPattern(#[from] globset::Error);
187
188#[derive(Error, Debug)]
190pub enum ProtocolError {
191 #[error("missing required property '{0}'")]
193 MissingRequiredProperty(String),
194
195 #[error("missing headers")]
197 MissingHeaders,
198
199 #[error("missing required property '{0}'")]
201 MissingRequiredHeader(String),
202
203 #[error("message body serialization error")]
205 BodySerializationError(#[from] ContentTypeError),
206
207 #[error("invalid property '{0}'")]
209 InvalidProperty(String),
210}
211
212impl From<serde_json::Error> for ProtocolError {
213 fn from(err: serde_json::Error) -> Self {
214 Self::from(ContentTypeError::from(err))
215 }
216}
217
218#[cfg(any(test, feature = "extra_content_types"))]
219impl From<serde_yaml::Error> for ProtocolError {
220 fn from(err: serde_yaml::Error) -> Self {
221 Self::from(ContentTypeError::from(err))
222 }
223}
224
225#[cfg(any(test, feature = "extra_content_types"))]
226impl From<serde_pickle::error::Error> for ProtocolError {
227 fn from(err: serde_pickle::error::Error) -> Self {
228 Self::from(ContentTypeError::from(err))
229 }
230}
231
232#[cfg(any(test, feature = "extra_content_types"))]
233impl From<rmp_serde::decode::Error> for ProtocolError {
234 fn from(err: rmp_serde::decode::Error) -> Self {
235 Self::from(ContentTypeError::from(err))
236 }
237}
238
239#[cfg(any(test, feature = "extra_content_types"))]
240impl From<rmp_serde::encode::Error> for ProtocolError {
241 fn from(err: rmp_serde::encode::Error) -> Self {
242 Self::from(ContentTypeError::from(err))
243 }
244}
245
246#[cfg(any(test, feature = "extra_content_types"))]
247impl From<rmpv::ext::Error> for ProtocolError {
248 fn from(err: rmpv::ext::Error) -> Self {
249 Self::from(ContentTypeError::from(err))
250 }
251}
252
253#[derive(Error, Debug)]
254pub enum ContentTypeError {
255 #[error("JSON serialization error")]
256 Json(#[from] serde_json::Error),
257
258 #[cfg(any(test, feature = "extra_content_types"))]
259 #[error("YAML serialization error")]
260 Yaml(#[from] serde_yaml::Error),
261
262 #[cfg(any(test, feature = "extra_content_types"))]
263 #[error("Pickle serialization error")]
264 Pickle(#[from] serde_pickle::error::Error),
265
266 #[cfg(any(test, feature = "extra_content_types"))]
267 #[error("MessagePack decoding error")]
268 MsgPackDecode(#[from] rmp_serde::decode::Error),
269
270 #[cfg(any(test, feature = "extra_content_types"))]
271 #[error("MessagePack encoding error")]
272 MsgPackEncode(#[from] rmp_serde::encode::Error),
273
274 #[cfg(any(test, feature = "extra_content_types"))]
275 #[error("MessagePack value error")]
276 MsgPackValue(#[from] rmpv::ext::Error),
277
278 #[error("Unknown content type error")]
279 Unknown,
280}