celers_kombu/priority.rs
1//! Priority, message options, and extended producer types.
2
3use async_trait::async_trait;
4use celers_protocol::Message;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::time::Duration;
8
9use crate::{Producer, Result};
10
11// =============================================================================
12// Message Options
13// =============================================================================
14
15/// Priority levels for messages
16///
17/// # Examples
18///
19/// ```
20/// use celers_kombu::Priority;
21///
22/// let normal = Priority::Normal;
23/// assert_eq!(normal.as_u8(), 5);
24/// assert_eq!(normal.to_string(), "normal");
25///
26/// let high = Priority::High;
27/// assert!(high > normal);
28/// assert_eq!(high.as_u8(), 7);
29///
30/// // Convert from numeric value
31/// let priority = Priority::from_u8(8);
32/// assert_eq!(priority, Priority::High);
33///
34/// // Default is Normal
35/// let default = Priority::default();
36/// assert_eq!(default, Priority::Normal);
37/// ```
38#[derive(
39 Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
40)]
41pub enum Priority {
42 /// Lowest priority (0)
43 Lowest = 0,
44 /// Low priority (3)
45 Low = 3,
46 /// Normal priority (5)
47 #[default]
48 Normal = 5,
49 /// High priority (7)
50 High = 7,
51 /// Highest priority (9)
52 Highest = 9,
53}
54
55impl std::fmt::Display for Priority {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 match self {
58 Priority::Lowest => write!(f, "lowest"),
59 Priority::Low => write!(f, "low"),
60 Priority::Normal => write!(f, "normal"),
61 Priority::High => write!(f, "high"),
62 Priority::Highest => write!(f, "highest"),
63 }
64 }
65}
66
67impl Priority {
68 /// Convert to numeric value (0-9)
69 pub fn as_u8(&self) -> u8 {
70 *self as u8
71 }
72
73 /// Create from numeric value (clamped to 0-9)
74 pub fn from_u8(value: u8) -> Self {
75 match value {
76 0..=1 => Priority::Lowest,
77 2..=4 => Priority::Low,
78 5 => Priority::Normal,
79 6..=8 => Priority::High,
80 _ => Priority::Highest,
81 }
82 }
83}
84
85/// Message-level options
86///
87/// # Examples
88///
89/// ```
90/// use celers_kombu::{MessageOptions, Priority};
91/// use std::time::Duration;
92///
93/// let options = MessageOptions::new()
94/// .with_priority(Priority::High)
95/// .with_ttl(Duration::from_secs(3600))
96/// .with_correlation_id("req-123".to_string())
97/// .with_reply_to("response_queue".to_string());
98///
99/// assert_eq!(options.priority, Some(Priority::High));
100/// assert_eq!(options.ttl, Some(Duration::from_secs(3600)));
101/// assert_eq!(options.correlation_id, Some("req-123".to_string()));
102///
103/// // Check if message should be signed
104/// let secure_options = MessageOptions::new()
105/// .with_signing(b"secret-key".to_vec());
106/// assert!(secure_options.should_sign());
107/// ```
108#[derive(Debug, Clone, Default, Serialize, Deserialize)]
109pub struct MessageOptions {
110 /// Message priority
111 pub priority: Option<Priority>,
112 /// Message TTL (time-to-live)
113 pub ttl: Option<Duration>,
114 /// Message expiration timestamp (absolute)
115 pub expires_at: Option<u64>,
116 /// Delay before message becomes visible
117 pub delay: Option<Duration>,
118 /// Correlation ID for request/response patterns
119 pub correlation_id: Option<String>,
120 /// Reply-to queue for RPC patterns
121 pub reply_to: Option<String>,
122 /// Custom headers
123 pub headers: HashMap<String, String>,
124 /// Enable message signing (HMAC)
125 pub sign: bool,
126 /// Signing key for HMAC (if signing is enabled)
127 pub signing_key: Option<Vec<u8>>,
128 /// Enable message encryption (AES-256-GCM)
129 pub encrypt: bool,
130 /// Encryption key (32 bytes for AES-256)
131 pub encryption_key: Option<Vec<u8>>,
132 /// Compression hint
133 pub compress: bool,
134}
135
136impl MessageOptions {
137 /// Create new message options
138 pub fn new() -> Self {
139 Self::default()
140 }
141
142 /// Set priority
143 pub fn with_priority(mut self, priority: Priority) -> Self {
144 self.priority = Some(priority);
145 self
146 }
147
148 /// Set TTL
149 pub fn with_ttl(mut self, ttl: Duration) -> Self {
150 self.ttl = Some(ttl);
151 self
152 }
153
154 /// Set expiration timestamp (Unix timestamp in seconds)
155 pub fn with_expires_at(mut self, timestamp: u64) -> Self {
156 self.expires_at = Some(timestamp);
157 self
158 }
159
160 /// Set delay
161 pub fn with_delay(mut self, delay: Duration) -> Self {
162 self.delay = Some(delay);
163 self
164 }
165
166 /// Set correlation ID
167 pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
168 self.correlation_id = Some(id.into());
169 self
170 }
171
172 /// Set reply-to queue
173 pub fn with_reply_to(mut self, queue: impl Into<String>) -> Self {
174 self.reply_to = Some(queue.into());
175 self
176 }
177
178 /// Add a custom header
179 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
180 self.headers.insert(key.into(), value.into());
181 self
182 }
183
184 /// Enable message signing with HMAC
185 pub fn with_signing(mut self, key: Vec<u8>) -> Self {
186 self.sign = true;
187 self.signing_key = Some(key);
188 self
189 }
190
191 /// Enable message encryption with AES-256-GCM
192 pub fn with_encryption(mut self, key: Vec<u8>) -> Self {
193 self.encrypt = true;
194 self.encryption_key = Some(key);
195 self
196 }
197
198 /// Enable compression
199 pub fn with_compression(mut self) -> Self {
200 self.compress = true;
201 self
202 }
203
204 /// Check if message has expired (based on expires_at)
205 pub fn is_expired(&self, current_timestamp: u64) -> bool {
206 self.expires_at.is_some_and(|exp| current_timestamp > exp)
207 }
208
209 /// Check if message should be delayed
210 pub fn should_delay(&self) -> bool {
211 self.delay.is_some()
212 }
213
214 /// Check if message should be signed
215 pub fn should_sign(&self) -> bool {
216 self.sign && self.signing_key.is_some()
217 }
218
219 /// Check if message should be encrypted
220 pub fn should_encrypt(&self) -> bool {
221 self.encrypt && self.encryption_key.is_some()
222 }
223
224 /// Check if message should be compressed
225 pub fn should_compress(&self) -> bool {
226 self.compress
227 }
228}
229
230// =============================================================================
231// Extended Producer Trait
232// =============================================================================
233
234/// Extended producer trait with message options support
235#[async_trait]
236pub trait ExtendedProducer: Producer {
237 /// Publish a message with options
238 async fn publish_with_options(
239 &mut self,
240 queue: &str,
241 message: Message,
242 options: MessageOptions,
243 ) -> Result<()>;
244
245 /// Publish a message with routing and options
246 async fn publish_with_routing_and_options(
247 &mut self,
248 exchange: &str,
249 routing_key: &str,
250 message: Message,
251 options: MessageOptions,
252 ) -> Result<()>;
253}