use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum CeleryError {
#[error("at least one queue required to consume from")]
NoQueueToConsume,
#[error("forced shutdown")]
ForcedShutdown,
#[error("broker error")]
BrokerError(#[from] BrokerError),
#[error("IO error")]
IoError(#[from] std::io::Error),
#[error("protocol error")]
ProtocolError(#[from] ProtocolError),
#[error("there is already a task registered as '{0}'")]
TaskRegistrationError(String),
#[error("received unregistered task {0}")]
UnregisteredTaskError(String),
}
#[derive(Error, Debug)]
pub enum BeatError {
#[error("broker error")]
BrokerError(#[from] BrokerError),
#[error("protocol error")]
ProtocolError(#[from] ProtocolError),
#[error("task schedule error")]
ScheduleError(#[from] ScheduleError),
}
#[derive(Error, Debug)]
pub enum ScheduleError {
#[error("invalid cron schedule: {0}")]
CronScheduleError(String),
}
#[derive(Error, Debug, Serialize, Deserialize)]
pub enum TaskError {
#[error("task raised expected error: {0}")]
ExpectedError(String),
#[error("task raised unexpected error: {0}")]
UnexpectedError(String),
#[error("task timed out")]
TimeoutError,
#[error("task retry triggered")]
Retry(Option<DateTime<Utc>>),
}
#[derive(Error, Debug)]
pub(crate) enum TraceError {
#[error("task failed")]
TaskError(TaskError),
#[error("task expired")]
ExpirationError,
#[error("retrying task")]
Retry(Option<DateTime<Utc>>),
}
#[derive(Error, Debug)]
pub enum BrokerError {
#[error("invalid broker URL '{0}'")]
InvalidBrokerUrl(String),
#[error("unknown queue '{0}'")]
UnknownQueue(String),
#[error("broker not connected")]
NotConnected,
#[error("IO error \"{0}\"")]
IoError(#[from] std::io::Error),
#[error("Deserialize error \"{0}\"")]
DeserializeError(#[from] serde_json::Error),
#[error("Routing pattern error \"{0}\"")]
BadRoutingPattern(#[from] BadRoutingPattern),
#[error("Protocol error \"{0}\"")]
ProtocolError(#[from] ProtocolError),
#[error("AMQP error \"{0}\"")]
AMQPError(#[from] lapin::Error),
#[error("Redis error \"{0}\"")]
RedisError(#[from] redis::RedisError),
}
impl BrokerError {
pub fn is_connection_error(&self) -> bool {
match self {
BrokerError::IoError(_) | BrokerError::NotConnected => true,
BrokerError::AMQPError(err) => matches!(
err,
lapin::Error::ProtocolError(_)
| lapin::Error::InvalidConnectionState(_)
| lapin::Error::InvalidChannelState(_)
),
BrokerError::RedisError(err) => {
err.is_connection_dropped() || err.is_connection_refusal()
}
_ => false,
}
}
}
#[derive(Error, Debug)]
#[error("invalid glob routing rule")]
pub struct BadRoutingPattern(#[from] globset::Error);
#[derive(Error, Debug)]
pub enum ProtocolError {
#[error("missing required property '{0}'")]
MissingRequiredProperty(String),
#[error("missing headers")]
MissingHeaders,
#[error("missing required property '{0}'")]
MissingRequiredHeader(String),
#[error("message body serialization error")]
BodySerializationError(#[from] ContentTypeError),
#[error("invalid property '{0}'")]
InvalidProperty(String),
}
impl From<serde_json::Error> for ProtocolError {
fn from(err: serde_json::Error) -> Self {
Self::from(ContentTypeError::from(err))
}
}
#[cfg(any(test, feature = "extra_content_types"))]
impl From<serde_yaml::Error> for ProtocolError {
fn from(err: serde_yaml::Error) -> Self {
Self::from(ContentTypeError::from(err))
}
}
#[cfg(any(test, feature = "extra_content_types"))]
impl From<serde_pickle::error::Error> for ProtocolError {
fn from(err: serde_pickle::error::Error) -> Self {
Self::from(ContentTypeError::from(err))
}
}
#[cfg(any(test, feature = "extra_content_types"))]
impl From<rmp_serde::decode::Error> for ProtocolError {
fn from(err: rmp_serde::decode::Error) -> Self {
Self::from(ContentTypeError::from(err))
}
}
#[cfg(any(test, feature = "extra_content_types"))]
impl From<rmp_serde::encode::Error> for ProtocolError {
fn from(err: rmp_serde::encode::Error) -> Self {
Self::from(ContentTypeError::from(err))
}
}
#[cfg(any(test, feature = "extra_content_types"))]
impl From<rmpv::ext::Error> for ProtocolError {
fn from(err: rmpv::ext::Error) -> Self {
Self::from(ContentTypeError::from(err))
}
}
#[derive(Error, Debug)]
pub enum ContentTypeError {
#[error("JSON serialization error")]
Json(#[from] serde_json::Error),
#[cfg(any(test, feature = "extra_content_types"))]
#[error("YAML serialization error")]
Yaml(#[from] serde_yaml::Error),
#[cfg(any(test, feature = "extra_content_types"))]
#[error("Pickle serialization error")]
Pickle(#[from] serde_pickle::error::Error),
#[cfg(any(test, feature = "extra_content_types"))]
#[error("MessagePack decoding error")]
MsgPackDecode(#[from] rmp_serde::decode::Error),
#[cfg(any(test, feature = "extra_content_types"))]
#[error("MessagePack encoding error")]
MsgPackEncode(#[from] rmp_serde::encode::Error),
#[cfg(any(test, feature = "extra_content_types"))]
#[error("MessagePack value error")]
MsgPackValue(#[from] rmpv::ext::Error),
#[error("Unknown content type error")]
Unknown,
}