use async_trait::async_trait;
use celers_protocol::Message;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
use crate::{Producer, Result};
/// Priority level that can be attached to a message.
///
/// Every variant has an explicit numeric discriminant (0, 3, 5, 7, 9), which is
/// the value returned by `Priority::as_u8`. The derived ordering follows those
/// discriminants, so `Lowest < Low < Normal < High < Highest`.
/// `Normal` is the value produced by `Priority::default()`.
#[derive(
Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
)]
pub enum Priority {
/// Lowest level; numeric value `0`.
Lowest = 0,
/// Below-normal level; numeric value `3`.
Low = 3,
/// Default level; numeric value `5`.
#[default]
Normal = 5,
/// Above-normal level; numeric value `7`.
High = 7,
/// Highest level; numeric value `9`.
Highest = 9,
}
/// Textual form of a [`Priority`]: the variant name in lowercase.
impl std::fmt::Display for Priority {
    /// Writes the lowercase variant name (`"lowest"` through `"highest"`) to `f`.
    ///
    /// The name is written verbatim; width, fill and alignment flags on the
    /// formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Resolve the label first so there is a single write call below.
        let label = match *self {
            Priority::Lowest => "lowest",
            Priority::Low => "low",
            Priority::Normal => "normal",
            Priority::High => "high",
            Priority::Highest => "highest",
        };
        f.write_str(label)
    }
}
impl Priority {
    /// Returns the numeric discriminant of this priority (0, 3, 5, 7 or 9).
    pub fn as_u8(&self) -> u8 {
        let level: Priority = *self;
        level as u8
    }

    /// Maps any `u8` onto a priority bucket.
    ///
    /// | input   | result    |
    /// |---------|-----------|
    /// | `0..=1` | `Lowest`  |
    /// | `2..=4` | `Low`     |
    /// | `5`     | `Normal`  |
    /// | `6..=8` | `High`    |
    /// | `9..`   | `Highest` |
    ///
    /// The conversion never fails; every value of 9 or above becomes `Highest`.
    pub fn from_u8(value: u8) -> Self {
        // Every arm is spelled out so the compiler checks that all 256
        // inputs are covered without a wildcard.
        match value {
            5 => Priority::Normal,
            0 | 1 => Priority::Lowest,
            2 | 3 | 4 => Priority::Low,
            6 | 7 | 8 => Priority::High,
            9..=u8::MAX => Priority::Highest,
        }
    }
}
/// Optional per-message settings passed alongside a message when publishing.
///
/// `Default` yields an instance with every `Option` set to `None`, an empty
/// header map and all flags `false`. The `with_*` builder methods on this type
/// set individual fields.
///
/// NOTE(review): the derived `Debug` and `Serialize` implementations include
/// the raw bytes of `signing_key` and `encryption_key` — confirm that is
/// acceptable wherever this struct is logged or serialized.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MessageOptions {
/// Priority for the message, if one was requested.
pub priority: Option<Priority>,
/// Time-to-live for the message, if any. This type only stores the value.
pub ttl: Option<Duration>,
/// Expiry timestamp checked by `is_expired` against a caller-supplied
/// current timestamp. The unit is not fixed here — presumably Unix time;
/// confirm against callers.
pub expires_at: Option<u64>,
/// Delay for the message, if any; `should_delay` reports whether it is set.
pub delay: Option<Duration>,
/// Correlation identifier, if any.
pub correlation_id: Option<String>,
/// Reply destination, if any (set from a queue name by `with_reply_to`).
pub reply_to: Option<String>,
/// String key/value headers; `with_header` inserts into this map.
pub headers: HashMap<String, String>,
/// Signing requested. `should_sign` additionally requires `signing_key`.
pub sign: bool,
/// Key bytes for signing, if provided.
pub signing_key: Option<Vec<u8>>,
/// Encryption requested. `should_encrypt` additionally requires `encryption_key`.
pub encrypt: bool,
/// Key bytes for encryption, if provided.
pub encryption_key: Option<Vec<u8>>,
/// Compression requested.
pub compress: bool,
}
impl MessageOptions {
    /// Creates an empty option set (identical to `MessageOptions::default()`).
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the options with `priority` set.
    pub fn with_priority(self, priority: Priority) -> Self {
        Self {
            priority: Some(priority),
            ..self
        }
    }

    /// Returns the options with `ttl` set.
    pub fn with_ttl(self, ttl: Duration) -> Self {
        Self {
            ttl: Some(ttl),
            ..self
        }
    }

    /// Returns the options with `expires_at` set to `timestamp`.
    pub fn with_expires_at(self, timestamp: u64) -> Self {
        Self {
            expires_at: Some(timestamp),
            ..self
        }
    }

    /// Returns the options with `delay` set.
    pub fn with_delay(self, delay: Duration) -> Self {
        Self {
            delay: Some(delay),
            ..self
        }
    }

    /// Returns the options with `correlation_id` set.
    pub fn with_correlation_id(self, id: impl Into<String>) -> Self {
        Self {
            correlation_id: Some(id.into()),
            ..self
        }
    }

    /// Returns the options with `reply_to` set to `queue`.
    pub fn with_reply_to(self, queue: impl Into<String>) -> Self {
        Self {
            reply_to: Some(queue.into()),
            ..self
        }
    }

    /// Returns the options with header `key` set to `value`.
    ///
    /// An existing header with the same key is overwritten.
    pub fn with_header(self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let mut updated = self;
        let name: String = key.into();
        let content: String = value.into();
        updated.headers.insert(name, content);
        updated
    }

    /// Turns signing on and stores `key` as the signing key.
    pub fn with_signing(self, key: Vec<u8>) -> Self {
        Self {
            sign: true,
            signing_key: Some(key),
            ..self
        }
    }

    /// Turns encryption on and stores `key` as the encryption key.
    pub fn with_encryption(self, key: Vec<u8>) -> Self {
        Self {
            encrypt: true,
            encryption_key: Some(key),
            ..self
        }
    }

    /// Turns compression on.
    pub fn with_compression(self) -> Self {
        Self {
            compress: true,
            ..self
        }
    }

    /// Returns `true` when `expires_at` is set and `current_timestamp` is
    /// strictly greater than it. With no expiry set this is always `false`.
    pub fn is_expired(&self, current_timestamp: u64) -> bool {
        match self.expires_at {
            Some(deadline) => current_timestamp > deadline,
            None => false,
        }
    }

    /// Returns `true` when a delay has been set.
    pub fn should_delay(&self) -> bool {
        !self.delay.is_none()
    }

    /// Returns `true` only when signing is enabled *and* a signing key is present.
    pub fn should_sign(&self) -> bool {
        matches!((self.sign, &self.signing_key), (true, Some(_)))
    }

    /// Returns `true` only when encryption is enabled *and* an encryption key is present.
    pub fn should_encrypt(&self) -> bool {
        matches!((self.encrypt, &self.encryption_key), (true, Some(_)))
    }

    /// Returns `true` when compression has been requested.
    pub fn should_compress(&self) -> bool {
        self.compress
    }
}
/// Extension of [`Producer`] whose publish methods additionally accept a
/// [`MessageOptions`] value.
///
/// `#[async_trait]` is applied so the trait can declare `async fn` methods.
/// This trait only declares the signatures; how each option (priority, TTL,
/// delay, signing, …) is honored is up to the implementor — presumably
/// broker-specific; verify against the concrete implementations.
#[async_trait]
pub trait ExtendedProducer: Producer {
/// Publishes `message` to the queue named `queue`, passing `options` along.
///
/// Returns the crate-level `Result<()>`; the error conditions are defined by
/// the implementor.
async fn publish_with_options(
&mut self,
queue: &str,
message: Message,
options: MessageOptions,
) -> Result<()>;
/// Publishes `message` using an `exchange` and `routing_key` instead of a
/// queue name, passing `options` along.
///
/// Returns the crate-level `Result<()>`; the error conditions are defined by
/// the implementor.
async fn publish_with_routing_and_options(
&mut self,
exchange: &str,
routing_key: &str,
message: Message,
options: MessageOptions,
) -> Result<()>;
}