use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use uuid::Uuid;
pub(crate) const CONTENT_TYPE_JSON: &str = "application/json";
#[cfg(feature = "msgpack")]
pub(crate) const CONTENT_TYPE_MSGPACK: &str = "application/x-msgpack";
#[cfg(feature = "binary")]
pub(crate) const CONTENT_TYPE_BINARY: &str = "application/octet-stream";
pub(crate) const ENCODING_UTF8: &str = "utf-8";
pub(crate) const ENCODING_BINARY: &str = "binary";
pub(crate) const DEFAULT_LANG: &str = "rust";
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ValidationError {
EmptyTaskName,
RetryLimitExceeded { retries: u32, max: u32 },
EtaAfterExpiration,
InvalidDeliveryMode { mode: u8 },
InvalidPriority { priority: u8 },
EmptyContentType,
EmptyBody,
BodyTooLarge { size: usize, max: usize },
}
impl fmt::Display for ValidationError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ValidationError::EmptyTaskName => write!(f, "Task name cannot be empty"),
ValidationError::RetryLimitExceeded { retries, max } => {
write!(f, "Retries ({}) cannot exceed {}", retries, max)
}
ValidationError::EtaAfterExpiration => {
write!(f, "ETA cannot be after expiration time")
}
ValidationError::InvalidDeliveryMode { mode } => {
write!(
f,
"Invalid delivery mode ({}): must be 1 (non-persistent) or 2 (persistent)",
mode
)
}
ValidationError::InvalidPriority { priority } => {
write!(
f,
"Invalid priority ({}): must be between 0 and 9",
priority
)
}
ValidationError::EmptyContentType => write!(f, "Content type cannot be empty"),
ValidationError::EmptyBody => write!(f, "Message body cannot be empty"),
ValidationError::BodyTooLarge { size, max } => {
write!(
f,
"Message body too large: {} bytes (max {} bytes)",
size, max
)
}
}
}
}
impl std::error::Error for ValidationError {}
#[derive(
Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
)]
pub enum ProtocolVersion {
#[default]
V2,
V5,
}
impl std::fmt::Display for ProtocolVersion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ProtocolVersion::V2 => write!(f, "v2"),
ProtocolVersion::V5 => write!(f, "v5"),
}
}
}
impl std::str::FromStr for ProtocolVersion {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"v2" | "2" => Ok(ProtocolVersion::V2),
"v5" | "5" => Ok(ProtocolVersion::V5),
_ => Err(format!("Invalid protocol version: {}", s)),
}
}
}
impl ProtocolVersion {
#[inline]
pub const fn is_v2(self) -> bool {
matches!(self, ProtocolVersion::V2)
}
#[inline]
pub const fn is_v5(self) -> bool {
matches!(self, ProtocolVersion::V5)
}
#[inline]
pub const fn as_u8(self) -> u8 {
match self {
ProtocolVersion::V2 => 2,
ProtocolVersion::V5 => 5,
}
}
#[inline]
pub const fn as_number_str(self) -> &'static str {
match self {
ProtocolVersion::V2 => "2",
ProtocolVersion::V5 => "5",
}
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ContentType {
#[default]
Json,
#[cfg(feature = "msgpack")]
MessagePack,
#[cfg(feature = "binary")]
Binary,
Custom(String),
}
impl ContentType {
#[inline]
pub fn as_str(&self) -> &str {
match self {
ContentType::Json => CONTENT_TYPE_JSON,
#[cfg(feature = "msgpack")]
ContentType::MessagePack => CONTENT_TYPE_MSGPACK,
#[cfg(feature = "binary")]
ContentType::Binary => CONTENT_TYPE_BINARY,
ContentType::Custom(s) => s,
}
}
}
impl std::fmt::Display for ContentType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}
impl std::str::FromStr for ContentType {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
CONTENT_TYPE_JSON => Ok(ContentType::Json),
#[cfg(feature = "msgpack")]
CONTENT_TYPE_MSGPACK => Ok(ContentType::MessagePack),
#[cfg(feature = "binary")]
CONTENT_TYPE_BINARY => Ok(ContentType::Binary),
other => Ok(ContentType::Custom(other.to_string())),
}
}
}
impl From<&str> for ContentType {
fn from(s: &str) -> Self {
match s {
CONTENT_TYPE_JSON => ContentType::Json,
#[cfg(feature = "msgpack")]
CONTENT_TYPE_MSGPACK => ContentType::MessagePack,
#[cfg(feature = "binary")]
CONTENT_TYPE_BINARY => ContentType::Binary,
other => ContentType::Custom(other.to_string()),
}
}
}
impl AsRef<str> for ContentType {
fn as_ref(&self) -> &str {
self.as_str()
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ContentEncoding {
#[default]
Utf8,
Binary,
Custom(String),
}
impl ContentEncoding {
#[inline]
pub fn as_str(&self) -> &str {
match self {
ContentEncoding::Utf8 => ENCODING_UTF8,
ContentEncoding::Binary => ENCODING_BINARY,
ContentEncoding::Custom(s) => s,
}
}
}
impl std::fmt::Display for ContentEncoding {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}
impl std::str::FromStr for ContentEncoding {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
ENCODING_UTF8 => Ok(ContentEncoding::Utf8),
ENCODING_BINARY => Ok(ContentEncoding::Binary),
other => Ok(ContentEncoding::Custom(other.to_string())),
}
}
}
impl From<&str> for ContentEncoding {
fn from(s: &str) -> Self {
match s {
ENCODING_UTF8 => ContentEncoding::Utf8,
ENCODING_BINARY => ContentEncoding::Binary,
other => ContentEncoding::Custom(other.to_string()),
}
}
}
impl AsRef<str> for ContentEncoding {
fn as_ref(&self) -> &str {
self.as_str()
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MessageHeaders {
pub task: String,
pub id: Uuid,
#[serde(default = "default_lang")]
pub lang: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub root_id: Option<Uuid>,
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_id: Option<Uuid>,
#[serde(skip_serializing_if = "Option::is_none")]
pub group: Option<Uuid>,
#[serde(skip_serializing_if = "Option::is_none")]
pub retries: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub eta: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub expires: Option<DateTime<Utc>>,
#[serde(flatten)]
pub extra: HashMap<String, serde_json::Value>,
}
fn default_lang() -> String {
DEFAULT_LANG.to_string()
}
impl MessageHeaders {
pub fn new(task: String, id: Uuid) -> Self {
Self {
task,
id,
lang: default_lang(),
root_id: None,
parent_id: None,
group: None,
retries: None,
eta: None,
expires: None,
extra: HashMap::new(),
}
}
#[must_use]
pub fn with_lang(mut self, lang: String) -> Self {
self.lang = lang;
self
}
#[must_use]
pub fn with_root_id(mut self, root_id: Uuid) -> Self {
self.root_id = Some(root_id);
self
}
#[must_use]
pub fn with_parent_id(mut self, parent_id: Uuid) -> Self {
self.parent_id = Some(parent_id);
self
}
#[must_use]
pub fn with_group(mut self, group: Uuid) -> Self {
self.group = Some(group);
self
}
#[must_use]
pub fn with_retries(mut self, retries: u32) -> Self {
self.retries = Some(retries);
self
}
#[must_use]
pub fn with_eta(mut self, eta: DateTime<Utc>) -> Self {
self.eta = Some(eta);
self
}
#[must_use]
pub fn with_expires(mut self, expires: DateTime<Utc>) -> Self {
self.expires = Some(expires);
self
}
pub fn validate(&self) -> Result<(), ValidationError> {
if self.task.is_empty() {
return Err(ValidationError::EmptyTaskName);
}
if let Some(retries) = self.retries {
if retries > 1000 {
return Err(ValidationError::RetryLimitExceeded { retries, max: 1000 });
}
}
if let (Some(eta), Some(expires)) = (self.eta, self.expires) {
if eta > expires {
return Err(ValidationError::EtaAfterExpiration);
}
}
Ok(())
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MessageProperties {
#[serde(skip_serializing_if = "Option::is_none")]
pub correlation_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reply_to: Option<String>,
#[serde(default = "default_delivery_mode")]
pub delivery_mode: u8,
#[serde(skip_serializing_if = "Option::is_none")]
pub priority: Option<u8>,
}
const fn default_delivery_mode() -> u8 {
2 }
impl Default for MessageProperties {
fn default() -> Self {
Self {
correlation_id: None,
reply_to: None,
delivery_mode: default_delivery_mode(),
priority: None,
}
}
}
impl MessageProperties {
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
self.correlation_id = Some(correlation_id);
self
}
#[must_use]
pub fn with_reply_to(mut self, reply_to: String) -> Self {
self.reply_to = Some(reply_to);
self
}
#[must_use]
pub fn with_delivery_mode(mut self, delivery_mode: u8) -> Self {
self.delivery_mode = delivery_mode;
self
}
#[must_use]
pub fn with_priority(mut self, priority: u8) -> Self {
self.priority = Some(priority);
self
}
pub fn validate(&self) -> Result<(), ValidationError> {
if self.delivery_mode != 1 && self.delivery_mode != 2 {
return Err(ValidationError::InvalidDeliveryMode {
mode: self.delivery_mode,
});
}
if let Some(priority) = self.priority {
if priority > 9 {
return Err(ValidationError::InvalidPriority { priority });
}
}
Ok(())
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Message {
pub headers: MessageHeaders,
pub properties: MessageProperties,
#[serde(with = "serde_bytes_opt")]
pub body: Vec<u8>,
#[serde(rename = "content-type")]
pub content_type: String,
#[serde(rename = "content-encoding")]
pub content_encoding: String,
}
mod serde_bytes_opt {
use base64::Engine;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let encoded = base64::engine::general_purpose::STANDARD.encode(bytes);
serializer.serialize_str(&encoded)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
base64::engine::general_purpose::STANDARD
.decode(&s)
.map_err(Error::custom)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct TaskArgs {
#[serde(default)]
pub args: Vec<serde_json::Value>,
#[serde(default)]
pub kwargs: HashMap<String, serde_json::Value>,
}
impl TaskArgs {
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn with_args(mut self, args: Vec<serde_json::Value>) -> Self {
self.args = args;
self
}
#[must_use]
pub fn with_kwargs(mut self, kwargs: HashMap<String, serde_json::Value>) -> Self {
self.kwargs = kwargs;
self
}
pub fn add_arg(&mut self, arg: serde_json::Value) {
self.args.push(arg);
}
pub fn add_kwarg(&mut self, key: String, value: serde_json::Value) {
self.kwargs.insert(key, value);
}
#[inline(always)]
pub fn is_empty(&self) -> bool {
self.args.is_empty() && self.kwargs.is_empty()
}
#[inline(always)]
pub fn len(&self) -> usize {
self.args.len() + self.kwargs.len()
}
#[inline(always)]
pub fn has_args(&self) -> bool {
!self.args.is_empty()
}
#[inline(always)]
pub fn has_kwargs(&self) -> bool {
!self.kwargs.is_empty()
}
pub fn clear(&mut self) {
self.args.clear();
self.kwargs.clear();
}
#[inline]
pub fn get_arg(&self, index: usize) -> Option<&serde_json::Value> {
self.args.get(index)
}
#[inline]
pub fn get_kwarg(&self, key: &str) -> Option<&serde_json::Value> {
self.kwargs.get(key)
}
pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
serde_json::from_str(json)
}
pub fn to_json(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(self)
}
pub fn to_json_pretty(&self) -> Result<String, serde_json::Error> {
serde_json::to_string_pretty(self)
}
}
impl std::ops::Index<usize> for TaskArgs {
type Output = serde_json::Value;
#[inline]
fn index(&self, index: usize) -> &Self::Output {
&self.args[index]
}
}
impl std::ops::IndexMut<usize> for TaskArgs {
#[inline]
fn index_mut(&mut self, index: usize) -> &mut Self::Output {
&mut self.args[index]
}
}
impl std::ops::Index<&str> for TaskArgs {
type Output = serde_json::Value;
#[inline]
fn index(&self, key: &str) -> &Self::Output {
&self.kwargs[key]
}
}
impl IntoIterator for TaskArgs {
type Item = serde_json::Value;
type IntoIter = std::vec::IntoIter<serde_json::Value>;
fn into_iter(self) -> Self::IntoIter {
self.args.into_iter()
}
}
impl<'a> IntoIterator for &'a TaskArgs {
type Item = &'a serde_json::Value;
type IntoIter = std::slice::Iter<'a, serde_json::Value>;
fn into_iter(self) -> Self::IntoIter {
self.args.iter()
}
}
impl Extend<serde_json::Value> for TaskArgs {
fn extend<T: IntoIterator<Item = serde_json::Value>>(&mut self, iter: T) {
self.args.extend(iter);
}
}
impl Extend<(String, serde_json::Value)> for TaskArgs {
fn extend<T: IntoIterator<Item = (String, serde_json::Value)>>(&mut self, iter: T) {
self.kwargs.extend(iter);
}
}
impl FromIterator<serde_json::Value> for TaskArgs {
fn from_iter<T: IntoIterator<Item = serde_json::Value>>(iter: T) -> Self {
Self {
args: iter.into_iter().collect(),
kwargs: HashMap::new(),
}
}
}