Skip to main content

celers_kombu/
priority.rs

1//! Priority, message options, and extended producer types.
2
3use async_trait::async_trait;
4use celers_protocol::Message;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::time::Duration;
8
9use crate::{Producer, Result};
10
11// =============================================================================
12// Message Options
13// =============================================================================
14
15/// Priority levels for messages
16///
17/// # Examples
18///
19/// ```
20/// use celers_kombu::Priority;
21///
22/// let normal = Priority::Normal;
23/// assert_eq!(normal.as_u8(), 5);
24/// assert_eq!(normal.to_string(), "normal");
25///
26/// let high = Priority::High;
27/// assert!(high > normal);
28/// assert_eq!(high.as_u8(), 7);
29///
30/// // Convert from numeric value
31/// let priority = Priority::from_u8(8);
32/// assert_eq!(priority, Priority::High);
33///
34/// // Default is Normal
35/// let default = Priority::default();
36/// assert_eq!(default, Priority::Normal);
37/// ```
38#[derive(
39    Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
40)]
41pub enum Priority {
42    /// Lowest priority (0)
43    Lowest = 0,
44    /// Low priority (3)
45    Low = 3,
46    /// Normal priority (5)
47    #[default]
48    Normal = 5,
49    /// High priority (7)
50    High = 7,
51    /// Highest priority (9)
52    Highest = 9,
53}
54
55impl std::fmt::Display for Priority {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        match self {
58            Priority::Lowest => write!(f, "lowest"),
59            Priority::Low => write!(f, "low"),
60            Priority::Normal => write!(f, "normal"),
61            Priority::High => write!(f, "high"),
62            Priority::Highest => write!(f, "highest"),
63        }
64    }
65}
66
67impl Priority {
68    /// Convert to numeric value (0-9)
69    pub fn as_u8(&self) -> u8 {
70        *self as u8
71    }
72
73    /// Create from numeric value (clamped to 0-9)
74    pub fn from_u8(value: u8) -> Self {
75        match value {
76            0..=1 => Priority::Lowest,
77            2..=4 => Priority::Low,
78            5 => Priority::Normal,
79            6..=8 => Priority::High,
80            _ => Priority::Highest,
81        }
82    }
83}
84
85/// Message-level options
86///
87/// # Examples
88///
89/// ```
90/// use celers_kombu::{MessageOptions, Priority};
91/// use std::time::Duration;
92///
93/// let options = MessageOptions::new()
94///     .with_priority(Priority::High)
95///     .with_ttl(Duration::from_secs(3600))
96///     .with_correlation_id("req-123".to_string())
97///     .with_reply_to("response_queue".to_string());
98///
99/// assert_eq!(options.priority, Some(Priority::High));
100/// assert_eq!(options.ttl, Some(Duration::from_secs(3600)));
101/// assert_eq!(options.correlation_id, Some("req-123".to_string()));
102///
103/// // Check if message should be signed
104/// let secure_options = MessageOptions::new()
105///     .with_signing(b"secret-key".to_vec());
106/// assert!(secure_options.should_sign());
107/// ```
108#[derive(Debug, Clone, Default, Serialize, Deserialize)]
109pub struct MessageOptions {
110    /// Message priority
111    pub priority: Option<Priority>,
112    /// Message TTL (time-to-live)
113    pub ttl: Option<Duration>,
114    /// Message expiration timestamp (absolute)
115    pub expires_at: Option<u64>,
116    /// Delay before message becomes visible
117    pub delay: Option<Duration>,
118    /// Correlation ID for request/response patterns
119    pub correlation_id: Option<String>,
120    /// Reply-to queue for RPC patterns
121    pub reply_to: Option<String>,
122    /// Custom headers
123    pub headers: HashMap<String, String>,
124    /// Enable message signing (HMAC)
125    pub sign: bool,
126    /// Signing key for HMAC (if signing is enabled)
127    pub signing_key: Option<Vec<u8>>,
128    /// Enable message encryption (AES-256-GCM)
129    pub encrypt: bool,
130    /// Encryption key (32 bytes for AES-256)
131    pub encryption_key: Option<Vec<u8>>,
132    /// Compression hint
133    pub compress: bool,
134}
135
136impl MessageOptions {
137    /// Create new message options
138    pub fn new() -> Self {
139        Self::default()
140    }
141
142    /// Set priority
143    pub fn with_priority(mut self, priority: Priority) -> Self {
144        self.priority = Some(priority);
145        self
146    }
147
148    /// Set TTL
149    pub fn with_ttl(mut self, ttl: Duration) -> Self {
150        self.ttl = Some(ttl);
151        self
152    }
153
154    /// Set expiration timestamp (Unix timestamp in seconds)
155    pub fn with_expires_at(mut self, timestamp: u64) -> Self {
156        self.expires_at = Some(timestamp);
157        self
158    }
159
160    /// Set delay
161    pub fn with_delay(mut self, delay: Duration) -> Self {
162        self.delay = Some(delay);
163        self
164    }
165
166    /// Set correlation ID
167    pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
168        self.correlation_id = Some(id.into());
169        self
170    }
171
172    /// Set reply-to queue
173    pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
174        self.reply_to = Some(queue.into());
175        self
176    }
177
178    /// Add a custom header
179    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
180        self.headers.insert(key.into(), value.into());
181        self
182    }
183
184    /// Enable message signing with HMAC
185    pub fn with_signing(mut self, key: Vec<u8>) -> Self {
186        self.sign = true;
187        self.signing_key = Some(key);
188        self
189    }
190
191    /// Enable message encryption with AES-256-GCM
192    pub fn with_encryption(mut self, key: Vec<u8>) -> Self {
193        self.encrypt = true;
194        self.encryption_key = Some(key);
195        self
196    }
197
198    /// Enable compression
199    pub fn with_compression(mut self) -> Self {
200        self.compress = true;
201        self
202    }
203
204    /// Check if message has expired (based on expires_at)
205    pub fn is_expired(&self, current_timestamp: u64) -> bool {
206        self.expires_at.is_some_and(|exp| current_timestamp > exp)
207    }
208
209    /// Check if message should be delayed
210    pub fn should_delay(&self) -> bool {
211        self.delay.is_some()
212    }
213
214    /// Check if message should be signed
215    pub fn should_sign(&self) -> bool {
216        self.sign && self.signing_key.is_some()
217    }
218
219    /// Check if message should be encrypted
220    pub fn should_encrypt(&self) -> bool {
221        self.encrypt && self.encryption_key.is_some()
222    }
223
224    /// Check if message should be compressed
225    pub fn should_compress(&self) -> bool {
226        self.compress
227    }
228}
229
230// =============================================================================
231// Extended Producer Trait
232// =============================================================================
233
234/// Extended producer trait with message options support
235#[async_trait]
236pub trait ExtendedProducer: Producer {
237    /// Publish a message with options
238    async fn publish_with_options(
239        &mut self,
240        queue: &str,
241        message: Message,
242        options: MessageOptions,
243    ) -> Result<()>;
244
245    /// Publish a message with routing and options
246    async fn publish_with_routing_and_options(
247        &mut self,
248        exchange: &str,
249        routing_key: &str,
250        message: Message,
251        options: MessageOptions,
252    ) -> Result<()>;
253}