1use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6
7#[derive(Error, Debug)]
9pub enum CeleryError {
10 #[error("at least one queue required to consume from")]
12 NoQueueToConsume,
13
14 #[error("forced shutdown")]
16 ForcedShutdown,
17
18 #[error("broker error")]
21 BrokerError(#[from] BrokerError),
22
23 #[error("IO error")]
25 IoError(#[from] std::io::Error),
26
27 #[error("protocol error")]
29 ProtocolError(#[from] ProtocolError),
30
31 #[error("there is already a task registered as '{0}'")]
33 TaskRegistrationError(String),
34
35 #[error("received unregistered task {0}")]
36 UnregisteredTaskError(String),
37}
38
39#[derive(Error, Debug)]
41pub enum BeatError {
42 #[error("broker error")]
44 BrokerError(#[from] BrokerError),
45
46 #[error("protocol error")]
48 ProtocolError(#[from] ProtocolError),
49
50 #[error("task schedule error")]
52 ScheduleError(#[from] ScheduleError),
53
54 #[error("redis error: {0}")]
56 RedisError(String),
57}
58
59#[derive(Error, Debug)]
61pub enum ScheduleError {
62 #[error("invalid cron schedule: {0}")]
64 CronScheduleError(String),
65}
66
67#[derive(Error, Debug, Serialize, Deserialize)]
69pub enum TaskError {
70 #[error("task raised expected error: {0}")]
81 ExpectedError(String),
82
83 #[error("task raised unexpected error: {0}")]
90 UnexpectedError(String),
91
92 #[error("task timed out")]
102 TimeoutError,
103
104 #[error("task retry triggered")]
110 Retry(Option<DateTime<Utc>>),
111}
112
113#[derive(Error, Debug)]
115pub(crate) enum TraceError {
116 #[error("task failed")]
118 TaskError(TaskError),
119
120 #[error("task expired")]
122 ExpirationError,
123
124 #[error("retrying task")]
126 Retry(Option<DateTime<Utc>>),
127}
128
129#[derive(Error, Debug)]
131pub enum BrokerError {
132 #[error("invalid broker URL '{0}'")]
134 InvalidBrokerUrl(String),
135
136 #[error("unknown queue '{0}'")]
138 UnknownQueue(String),
139
140 #[error("broker not connected")]
142 NotConnected,
143
144 #[error("IO error \"{0}\"")]
146 IoError(#[from] std::io::Error),
147
148 #[error("Deserialize error \"{0}\"")]
150 DeserializeError(#[from] serde_json::Error),
151
152 #[error("Routing pattern error \"{0}\"")]
154 BadRoutingPattern(#[from] BadRoutingPattern),
155
156 #[error("Protocol error \"{0}\"")]
158 ProtocolError(#[from] ProtocolError),
159
160 #[error("AMQP error \"{0}\"")]
162 AMQPError(#[from] lapin::Error),
163
164 #[error("Redis error \"{0}\"")]
166 RedisError(#[from] redis::RedisError),
167}
168
169impl BrokerError {
170 pub fn is_connection_error(&self) -> bool {
171 match self {
172 BrokerError::IoError(_) | BrokerError::NotConnected => true,
173 BrokerError::AMQPError(err) => matches!(
174 err,
175 _
178 ),
179 BrokerError::RedisError(err) => {
180 err.is_connection_dropped() || err.is_connection_refusal()
181 }
182 _ => false,
183 }
184 }
185}
186
187#[derive(Error, Debug)]
189#[error("invalid glob routing rule")]
190pub struct BadRoutingPattern(#[from] globset::Error);
191
192#[derive(Error, Debug)]
194pub enum ProtocolError {
195 #[error("missing required property '{0}'")]
197 MissingRequiredProperty(String),
198
199 #[error("missing headers")]
201 MissingHeaders,
202
203 #[error("missing required property '{0}'")]
205 MissingRequiredHeader(String),
206
207 #[error("message body serialization error")]
209 BodySerializationError(#[from] ContentTypeError),
210
211 #[error("invalid property '{0}'")]
213 InvalidProperty(String),
214}
215
216impl From<serde_json::Error> for ProtocolError {
217 fn from(err: serde_json::Error) -> Self {
218 Self::from(ContentTypeError::from(err))
219 }
220}
221
222#[cfg(any(test, feature = "extra_content_types"))]
223impl From<serde_yaml::Error> for ProtocolError {
224 fn from(err: serde_yaml::Error) -> Self {
225 Self::from(ContentTypeError::from(err))
226 }
227}
228
229#[cfg(any(test, feature = "extra_content_types"))]
230impl From<serde_pickle::error::Error> for ProtocolError {
231 fn from(err: serde_pickle::error::Error) -> Self {
232 Self::from(ContentTypeError::from(err))
233 }
234}
235
236#[cfg(any(test, feature = "extra_content_types"))]
237impl From<rmp_serde::decode::Error> for ProtocolError {
238 fn from(err: rmp_serde::decode::Error) -> Self {
239 Self::from(ContentTypeError::from(err))
240 }
241}
242
243#[cfg(any(test, feature = "extra_content_types"))]
244impl From<rmp_serde::encode::Error> for ProtocolError {
245 fn from(err: rmp_serde::encode::Error) -> Self {
246 Self::from(ContentTypeError::from(err))
247 }
248}
249
250#[cfg(any(test, feature = "extra_content_types"))]
251impl From<rmpv::ext::Error> for ProtocolError {
252 fn from(err: rmpv::ext::Error) -> Self {
253 Self::from(ContentTypeError::from(err))
254 }
255}
256
257#[derive(Error, Debug)]
258pub enum ContentTypeError {
259 #[error("JSON serialization error")]
260 Json(#[from] serde_json::Error),
261
262 #[cfg(any(test, feature = "extra_content_types"))]
263 #[error("YAML serialization error")]
264 Yaml(#[from] serde_yaml::Error),
265
266 #[cfg(any(test, feature = "extra_content_types"))]
267 #[error("Pickle serialization error")]
268 Pickle(#[from] serde_pickle::error::Error),
269
270 #[cfg(any(test, feature = "extra_content_types"))]
271 #[error("MessagePack decoding error")]
272 MsgPackDecode(#[from] rmp_serde::decode::Error),
273
274 #[cfg(any(test, feature = "extra_content_types"))]
275 #[error("MessagePack encoding error")]
276 MsgPackEncode(#[from] rmp_serde::encode::Error),
277
278 #[cfg(any(test, feature = "extra_content_types"))]
279 #[error("MessagePack value error")]
280 MsgPackValue(#[from] rmpv::ext::Error),
281
282 #[error("Unknown content type error")]
283 Unknown,
284}