celers_core/broker.rs
1use crate::{Result, SerializedTask, TaskId};
2
3/// Message envelope for broker operations
4#[derive(Debug, Clone)]
5pub struct BrokerMessage {
6 /// The serialized task
7 pub task: SerializedTask,
8
9 /// Receipt handle for acknowledging/rejecting the message
10 pub receipt_handle: Option<String>,
11}
12
13impl BrokerMessage {
14 /// Create a new broker message
15 #[must_use]
16 pub const fn new(task: SerializedTask) -> Self {
17 Self {
18 task,
19 receipt_handle: None,
20 }
21 }
22
23 /// Create a new broker message with a receipt handle
24 #[must_use]
25 pub const fn with_receipt_handle(task: SerializedTask, receipt_handle: String) -> Self {
26 Self {
27 task,
28 receipt_handle: Some(receipt_handle),
29 }
30 }
31
32 /// Check if message has a receipt handle
33 #[inline]
34 #[must_use]
35 pub const fn has_receipt_handle(&self) -> bool {
36 self.receipt_handle.is_some()
37 }
38
39 /// Get task ID
40 #[inline]
41 #[must_use]
42 pub const fn task_id(&self) -> crate::TaskId {
43 self.task.metadata.id
44 }
45
46 /// Get task name
47 #[inline]
48 #[must_use]
49 pub fn task_name(&self) -> &str {
50 &self.task.metadata.name
51 }
52
53 /// Get task priority
54 #[inline]
55 #[must_use]
56 pub const fn priority(&self) -> i32 {
57 self.task.metadata.priority
58 }
59
60 /// Check if task is expired
61 #[inline]
62 #[must_use]
63 pub fn is_expired(&self) -> bool {
64 self.task.is_expired()
65 }
66
67 /// Get task age
68 #[inline]
69 #[must_use]
70 pub fn age(&self) -> chrono::Duration {
71 self.task.age()
72 }
73}
74
75impl std::fmt::Display for BrokerMessage {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 write!(f, "BrokerMessage[task={}]", self.task)?;
78 if let Some(ref handle) = self.receipt_handle {
79 write!(f, " receipt={}", &handle[..handle.len().min(8)])?;
80 }
81 Ok(())
82 }
83}
84
85/// Core trait for task queue brokers
86#[async_trait::async_trait]
87pub trait Broker: Send + Sync {
88 /// Enqueue a task to the broker
89 async fn enqueue(&self, task: SerializedTask) -> Result<TaskId>;
90
91 /// Dequeue a task from the broker (blocking/waiting)
92 async fn dequeue(&self) -> Result<Option<BrokerMessage>>;
93
94 /// Acknowledge successful processing of a task
95 async fn ack(&self, task_id: &TaskId, receipt_handle: Option<&str>) -> Result<()>;
96
97 /// Reject a task and potentially requeue it
98 async fn reject(
99 &self,
100 task_id: &TaskId,
101 receipt_handle: Option<&str>,
102 requeue: bool,
103 ) -> Result<()>;
104
105 /// Get the current queue size
106 async fn queue_size(&self) -> Result<usize>;
107
108 /// Cancel a pending task
109 async fn cancel(&self, task_id: &TaskId) -> Result<bool>;
110
111 // Batch Operations (optional, with default implementations)
112
113 /// Enqueue multiple tasks in a single operation (batch)
114 ///
115 /// Default implementation calls `enqueue()` for each task.
116 /// Brokers should override this for better performance.
117 async fn enqueue_batch(&self, tasks: Vec<SerializedTask>) -> Result<Vec<TaskId>> {
118 let mut task_ids = Vec::with_capacity(tasks.len());
119 for task in tasks {
120 task_ids.push(self.enqueue(task).await?);
121 }
122 Ok(task_ids)
123 }
124
125 /// Dequeue multiple tasks in a single operation (batch)
126 ///
127 /// Returns up to `count` messages from the queue.
128 /// Default implementation calls `dequeue()` multiple times.
129 /// Brokers should override this for better performance.
130 async fn dequeue_batch(&self, count: usize) -> Result<Vec<BrokerMessage>> {
131 let mut messages = Vec::with_capacity(count);
132 for _ in 0..count {
133 if let Some(msg) = self.dequeue().await? {
134 messages.push(msg);
135 } else {
136 break;
137 }
138 }
139 Ok(messages)
140 }
141
142 /// Acknowledge multiple tasks in a single operation (batch)
143 ///
144 /// Default implementation calls `ack()` for each task.
145 async fn ack_batch(&self, tasks: &[(TaskId, Option<String>)]) -> Result<()> {
146 for (task_id, receipt_handle) in tasks {
147 self.ack(task_id, receipt_handle.as_deref()).await?;
148 }
149 Ok(())
150 }
151
152 // Delayed Task Execution (optional, with default implementations)
153
154 /// Schedule a task for execution at a specific Unix timestamp (seconds)
155 ///
156 /// Default implementation executes the task immediately via `enqueue()`.
157 /// Brokers with scheduling support should override this.
158 async fn enqueue_at(&self, task: SerializedTask, _execute_at: i64) -> Result<TaskId> {
159 // Default: execute immediately
160 self.enqueue(task).await
161 }
162
163 /// Schedule a task for execution after a delay (seconds)
164 ///
165 /// Default implementation executes the task immediately via `enqueue()`.
166 /// Brokers with scheduling support should override this.
167 async fn enqueue_after(&self, task: SerializedTask, _delay_secs: u64) -> Result<TaskId> {
168 // Default: execute immediately
169 self.enqueue(task).await
170 }
171}
172
173/// Batch utilities for `BrokerMessage` collections
174pub mod broker_batch {
175 use super::{BrokerMessage, TaskId};
176 use std::collections::HashMap;
177
178 /// Sort messages by priority (highest first)
179 ///
180 /// # Example
181 /// ```
182 /// use celers_core::{BrokerMessage, SerializedTask, broker::broker_batch};
183 ///
184 /// let mut messages = vec![
185 /// BrokerMessage::new(SerializedTask::new("task1".to_string(), vec![1]).with_priority(1)),
186 /// BrokerMessage::new(SerializedTask::new("task2".to_string(), vec![2]).with_priority(10)),
187 /// BrokerMessage::new(SerializedTask::new("task3".to_string(), vec![3]).with_priority(5)),
188 /// ];
189 ///
190 /// broker_batch::sort_by_priority(&mut messages);
191 /// assert_eq!(messages[0].priority(), 10);
192 /// assert_eq!(messages[2].priority(), 1);
193 /// ```
194 #[inline]
195 pub fn sort_by_priority(messages: &mut [BrokerMessage]) {
196 messages.sort_by_key(|b| std::cmp::Reverse(b.priority()));
197 }
198
199 /// Group messages by task name
200 ///
201 /// # Example
202 /// ```
203 /// use celers_core::{BrokerMessage, SerializedTask, broker::broker_batch};
204 ///
205 /// let messages = vec![
206 /// BrokerMessage::new(SerializedTask::new("task1".to_string(), vec![1])),
207 /// BrokerMessage::new(SerializedTask::new("task2".to_string(), vec![2])),
208 /// BrokerMessage::new(SerializedTask::new("task1".to_string(), vec![3])),
209 /// ];
210 ///
211 /// let grouped = broker_batch::group_by_task_name(&messages);
212 /// assert_eq!(grouped.get("task1").unwrap().len(), 2);
213 /// assert_eq!(grouped.get("task2").unwrap().len(), 1);
214 /// ```
215 #[inline]
216 #[must_use]
217 pub fn group_by_task_name(messages: &[BrokerMessage]) -> HashMap<String, Vec<&BrokerMessage>> {
218 let mut map: HashMap<String, Vec<&BrokerMessage>> = HashMap::new();
219 for msg in messages {
220 map.entry(msg.task_name().to_string())
221 .or_default()
222 .push(msg);
223 }
224 map
225 }
226
227 /// Filter messages by task name pattern
228 ///
229 /// # Example
230 /// ```
231 /// use celers_core::{BrokerMessage, SerializedTask, broker::broker_batch};
232 ///
233 /// let messages = vec![
234 /// BrokerMessage::new(SerializedTask::new("process_data".to_string(), vec![1])),
235 /// BrokerMessage::new(SerializedTask::new("send_email".to_string(), vec![2])),
236 /// BrokerMessage::new(SerializedTask::new("process_image".to_string(), vec![3])),
237 /// ];
238 ///
239 /// let process_messages = broker_batch::filter_by_name_prefix(&messages, "process");
240 /// assert_eq!(process_messages.len(), 2);
241 /// ```
242 #[inline]
243 #[must_use]
244 pub fn filter_by_name_prefix<'a>(
245 messages: &'a [BrokerMessage],
246 prefix: &str,
247 ) -> Vec<&'a BrokerMessage> {
248 messages
249 .iter()
250 .filter(|msg| msg.task_name().starts_with(prefix))
251 .collect()
252 }
253
254 /// Get total payload size of all messages
255 ///
256 /// # Example
257 /// ```
258 /// use celers_core::{BrokerMessage, SerializedTask, broker::broker_batch};
259 ///
260 /// let messages = vec![
261 /// BrokerMessage::new(SerializedTask::new("task1".to_string(), vec![1, 2, 3])),
262 /// BrokerMessage::new(SerializedTask::new("task2".to_string(), vec![4, 5])),
263 /// ];
264 ///
265 /// let total_size = broker_batch::total_payload_size(&messages);
266 /// assert_eq!(total_size, 5);
267 /// ```
268 #[inline]
269 #[must_use]
270 pub fn total_payload_size(messages: &[BrokerMessage]) -> usize {
271 messages.iter().map(|msg| msg.task.payload.len()).sum()
272 }
273
274 /// Filter expired messages
275 ///
276 /// # Example
277 /// ```
278 /// use celers_core::{BrokerMessage, SerializedTask, broker::broker_batch};
279 ///
280 /// let messages = vec![
281 /// BrokerMessage::new(SerializedTask::new("task1".to_string(), vec![1])),
282 /// BrokerMessage::new(SerializedTask::new("task2".to_string(), vec![2])),
283 /// ];
284 ///
285 /// let expired = broker_batch::filter_expired(&messages);
286 /// // Newly created tasks should not be expired
287 /// assert_eq!(expired.len(), 0);
288 /// ```
289 #[inline]
290 #[must_use]
291 pub fn filter_expired(messages: &[BrokerMessage]) -> Vec<&BrokerMessage> {
292 messages.iter().filter(|msg| msg.is_expired()).collect()
293 }
294
295 /// Extract task IDs and receipt handles for batch acknowledgement
296 ///
297 /// # Example
298 /// ```
299 /// use celers_core::{BrokerMessage, SerializedTask, broker::broker_batch};
300 ///
301 /// let messages = vec![
302 /// BrokerMessage::with_receipt_handle(
303 /// SerializedTask::new("task1".to_string(), vec![1]),
304 /// "receipt1".to_string()
305 /// ),
306 /// BrokerMessage::with_receipt_handle(
307 /// SerializedTask::new("task2".to_string(), vec![2]),
308 /// "receipt2".to_string()
309 /// ),
310 /// ];
311 ///
312 /// let ack_data = broker_batch::prepare_ack_batch(&messages);
313 /// assert_eq!(ack_data.len(), 2);
314 /// ```
315 #[inline]
316 #[must_use]
317 pub fn prepare_ack_batch(messages: &[BrokerMessage]) -> Vec<(TaskId, Option<String>)> {
318 messages
319 .iter()
320 .map(|msg| (msg.task_id(), msg.receipt_handle.clone()))
321 .collect()
322 }
323}
324
325#[cfg(test)]
326mod tests {
327 use super::*;
328 use crate::task::SerializedTask;
329
330 fn create_test_task() -> SerializedTask {
331 SerializedTask::new("test_task".to_string(), vec![1, 2, 3])
332 }
333
334 #[test]
335 fn test_broker_message_new() {
336 let task = create_test_task();
337 let task_id = task.metadata.id;
338 let msg = BrokerMessage::new(task);
339
340 assert_eq!(msg.task_id(), task_id);
341 assert_eq!(msg.task_name(), "test_task");
342 assert!(!msg.has_receipt_handle());
343 assert!(msg.receipt_handle.is_none());
344 }
345
346 #[test]
347 fn test_broker_message_with_receipt_handle() {
348 let task = create_test_task();
349 let receipt = "receipt-123456".to_string();
350 let msg = BrokerMessage::with_receipt_handle(task, receipt.clone());
351
352 assert!(msg.has_receipt_handle());
353 assert_eq!(msg.receipt_handle, Some(receipt));
354 }
355
356 #[test]
357 fn test_broker_message_task_accessors() {
358 let task = create_test_task().with_priority(5);
359 let task_id = task.metadata.id;
360 let msg = BrokerMessage::new(task);
361
362 assert_eq!(msg.task_id(), task_id);
363 assert_eq!(msg.task_name(), "test_task");
364 assert_eq!(msg.priority(), 5);
365 }
366
367 #[test]
368 fn test_broker_message_is_expired() {
369 let task = create_test_task();
370 let msg = BrokerMessage::new(task);
371
372 // Newly created task should not be expired
373 assert!(!msg.is_expired());
374 }
375
376 #[test]
377 fn test_broker_message_age() {
378 let task = create_test_task();
379 let msg = BrokerMessage::new(task);
380
381 // Age should be very small for newly created task
382 let age = msg.age();
383 assert!(age.num_seconds() < 1);
384 }
385
386 #[test]
387 fn test_broker_message_display() {
388 let task = create_test_task();
389 let msg = BrokerMessage::new(task);
390
391 let display = format!("{msg}");
392 assert!(display.contains("BrokerMessage"));
393 assert!(display.contains("task="));
394 }
395
396 #[test]
397 fn test_broker_message_display_with_receipt() {
398 let task = create_test_task();
399 let receipt = "very-long-receipt-handle-12345678901234567890".to_string();
400 let msg = BrokerMessage::with_receipt_handle(task, receipt);
401
402 let display = format!("{msg}");
403 assert!(display.contains("BrokerMessage"));
404 assert!(display.contains("receipt="));
405 // Should truncate to 8 characters
406 assert!(display.contains("very-lon"));
407 }
408}