celers_core/
broker.rs

1use crate::{Result, SerializedTask, TaskId};
2
3/// Message envelope for broker operations
4#[derive(Debug, Clone)]
5pub struct BrokerMessage {
6    /// The serialized task
7    pub task: SerializedTask,
8
9    /// Receipt handle for acknowledging/rejecting the message
10    pub receipt_handle: Option<String>,
11}
12
13impl BrokerMessage {
14    /// Create a new broker message
15    #[must_use]
16    pub const fn new(task: SerializedTask) -> Self {
17        Self {
18            task,
19            receipt_handle: None,
20        }
21    }
22
23    /// Create a new broker message with a receipt handle
24    #[must_use]
25    pub const fn with_receipt_handle(task: SerializedTask, receipt_handle: String) -> Self {
26        Self {
27            task,
28            receipt_handle: Some(receipt_handle),
29        }
30    }
31
32    /// Check if message has a receipt handle
33    #[inline]
34    #[must_use]
35    pub const fn has_receipt_handle(&self) -> bool {
36        self.receipt_handle.is_some()
37    }
38
39    /// Get task ID
40    #[inline]
41    #[must_use]
42    pub const fn task_id(&self) -> crate::TaskId {
43        self.task.metadata.id
44    }
45
46    /// Get task name
47    #[inline]
48    #[must_use]
49    pub fn task_name(&self) -> &str {
50        &self.task.metadata.name
51    }
52
53    /// Get task priority
54    #[inline]
55    #[must_use]
56    pub const fn priority(&self) -> i32 {
57        self.task.metadata.priority
58    }
59
60    /// Check if task is expired
61    #[inline]
62    #[must_use]
63    pub fn is_expired(&self) -> bool {
64        self.task.is_expired()
65    }
66
67    /// Get task age
68    #[inline]
69    #[must_use]
70    pub fn age(&self) -> chrono::Duration {
71        self.task.age()
72    }
73}
74
75impl std::fmt::Display for BrokerMessage {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        write!(f, "BrokerMessage[task={}]", self.task)?;
78        if let Some(ref handle) = self.receipt_handle {
79            write!(f, " receipt={}", &handle[..handle.len().min(8)])?;
80        }
81        Ok(())
82    }
83}
84
85/// Core trait for task queue brokers
86#[async_trait::async_trait]
87pub trait Broker: Send + Sync {
88    /// Enqueue a task to the broker
89    async fn enqueue(&self, task: SerializedTask) -> Result<TaskId>;
90
91    /// Dequeue a task from the broker (blocking/waiting)
92    async fn dequeue(&self) -> Result<Option<BrokerMessage>>;
93
94    /// Acknowledge successful processing of a task
95    async fn ack(&self, task_id: &TaskId, receipt_handle: Option<&str>) -> Result<()>;
96
97    /// Reject a task and potentially requeue it
98    async fn reject(
99        &self,
100        task_id: &TaskId,
101        receipt_handle: Option<&str>,
102        requeue: bool,
103    ) -> Result<()>;
104
105    /// Get the current queue size
106    async fn queue_size(&self) -> Result<usize>;
107
108    /// Cancel a pending task
109    async fn cancel(&self, task_id: &TaskId) -> Result<bool>;
110
111    // Batch Operations (optional, with default implementations)
112
113    /// Enqueue multiple tasks in a single operation (batch)
114    ///
115    /// Default implementation calls `enqueue()` for each task.
116    /// Brokers should override this for better performance.
117    async fn enqueue_batch(&self, tasks: Vec<SerializedTask>) -> Result<Vec<TaskId>> {
118        let mut task_ids = Vec::with_capacity(tasks.len());
119        for task in tasks {
120            task_ids.push(self.enqueue(task).await?);
121        }
122        Ok(task_ids)
123    }
124
125    /// Dequeue multiple tasks in a single operation (batch)
126    ///
127    /// Returns up to `count` messages from the queue.
128    /// Default implementation calls `dequeue()` multiple times.
129    /// Brokers should override this for better performance.
130    async fn dequeue_batch(&self, count: usize) -> Result<Vec<BrokerMessage>> {
131        let mut messages = Vec::with_capacity(count);
132        for _ in 0..count {
133            if let Some(msg) = self.dequeue().await? {
134                messages.push(msg);
135            } else {
136                break;
137            }
138        }
139        Ok(messages)
140    }
141
142    /// Acknowledge multiple tasks in a single operation (batch)
143    ///
144    /// Default implementation calls `ack()` for each task.
145    async fn ack_batch(&self, tasks: &[(TaskId, Option<String>)]) -> Result<()> {
146        for (task_id, receipt_handle) in tasks {
147            self.ack(task_id, receipt_handle.as_deref()).await?;
148        }
149        Ok(())
150    }
151
152    // Delayed Task Execution (optional, with default implementations)
153
154    /// Schedule a task for execution at a specific Unix timestamp (seconds)
155    ///
156    /// Default implementation executes the task immediately via `enqueue()`.
157    /// Brokers with scheduling support should override this.
158    async fn enqueue_at(&self, task: SerializedTask, _execute_at: i64) -> Result<TaskId> {
159        // Default: execute immediately
160        self.enqueue(task).await
161    }
162
163    /// Schedule a task for execution after a delay (seconds)
164    ///
165    /// Default implementation executes the task immediately via `enqueue()`.
166    /// Brokers with scheduling support should override this.
167    async fn enqueue_after(&self, task: SerializedTask, _delay_secs: u64) -> Result<TaskId> {
168        // Default: execute immediately
169        self.enqueue(task).await
170    }
171}
172
173/// Batch utilities for `BrokerMessage` collections
174pub mod broker_batch {
175    use super::{BrokerMessage, TaskId};
176    use std::collections::HashMap;
177
178    /// Sort messages by priority (highest first)
179    ///
180    /// # Example
181    /// ```
182    /// use celers_core::{BrokerMessage, SerializedTask, broker::broker_batch};
183    ///
184    /// let mut messages = vec![
185    ///     BrokerMessage::new(SerializedTask::new("task1".to_string(), vec![1]).with_priority(1)),
186    ///     BrokerMessage::new(SerializedTask::new("task2".to_string(), vec![2]).with_priority(10)),
187    ///     BrokerMessage::new(SerializedTask::new("task3".to_string(), vec![3]).with_priority(5)),
188    /// ];
189    ///
190    /// broker_batch::sort_by_priority(&mut messages);
191    /// assert_eq!(messages[0].priority(), 10);
192    /// assert_eq!(messages[2].priority(), 1);
193    /// ```
194    #[inline]
195    pub fn sort_by_priority(messages: &mut [BrokerMessage]) {
196        messages.sort_by_key(|b| std::cmp::Reverse(b.priority()));
197    }
198
199    /// Group messages by task name
200    ///
201    /// # Example
202    /// ```
203    /// use celers_core::{BrokerMessage, SerializedTask, broker::broker_batch};
204    ///
205    /// let messages = vec![
206    ///     BrokerMessage::new(SerializedTask::new("task1".to_string(), vec![1])),
207    ///     BrokerMessage::new(SerializedTask::new("task2".to_string(), vec![2])),
208    ///     BrokerMessage::new(SerializedTask::new("task1".to_string(), vec![3])),
209    /// ];
210    ///
211    /// let grouped = broker_batch::group_by_task_name(&messages);
212    /// assert_eq!(grouped.get("task1").unwrap().len(), 2);
213    /// assert_eq!(grouped.get("task2").unwrap().len(), 1);
214    /// ```
215    #[inline]
216    #[must_use]
217    pub fn group_by_task_name(messages: &[BrokerMessage]) -> HashMap<String, Vec<&BrokerMessage>> {
218        let mut map: HashMap<String, Vec<&BrokerMessage>> = HashMap::new();
219        for msg in messages {
220            map.entry(msg.task_name().to_string())
221                .or_default()
222                .push(msg);
223        }
224        map
225    }
226
227    /// Filter messages by task name pattern
228    ///
229    /// # Example
230    /// ```
231    /// use celers_core::{BrokerMessage, SerializedTask, broker::broker_batch};
232    ///
233    /// let messages = vec![
234    ///     BrokerMessage::new(SerializedTask::new("process_data".to_string(), vec![1])),
235    ///     BrokerMessage::new(SerializedTask::new("send_email".to_string(), vec![2])),
236    ///     BrokerMessage::new(SerializedTask::new("process_image".to_string(), vec![3])),
237    /// ];
238    ///
239    /// let process_messages = broker_batch::filter_by_name_prefix(&messages, "process");
240    /// assert_eq!(process_messages.len(), 2);
241    /// ```
242    #[inline]
243    #[must_use]
244    pub fn filter_by_name_prefix<'a>(
245        messages: &'a [BrokerMessage],
246        prefix: &str,
247    ) -> Vec<&'a BrokerMessage> {
248        messages
249            .iter()
250            .filter(|msg| msg.task_name().starts_with(prefix))
251            .collect()
252    }
253
254    /// Get total payload size of all messages
255    ///
256    /// # Example
257    /// ```
258    /// use celers_core::{BrokerMessage, SerializedTask, broker::broker_batch};
259    ///
260    /// let messages = vec![
261    ///     BrokerMessage::new(SerializedTask::new("task1".to_string(), vec![1, 2, 3])),
262    ///     BrokerMessage::new(SerializedTask::new("task2".to_string(), vec![4, 5])),
263    /// ];
264    ///
265    /// let total_size = broker_batch::total_payload_size(&messages);
266    /// assert_eq!(total_size, 5);
267    /// ```
268    #[inline]
269    #[must_use]
270    pub fn total_payload_size(messages: &[BrokerMessage]) -> usize {
271        messages.iter().map(|msg| msg.task.payload.len()).sum()
272    }
273
274    /// Filter expired messages
275    ///
276    /// # Example
277    /// ```
278    /// use celers_core::{BrokerMessage, SerializedTask, broker::broker_batch};
279    ///
280    /// let messages = vec![
281    ///     BrokerMessage::new(SerializedTask::new("task1".to_string(), vec![1])),
282    ///     BrokerMessage::new(SerializedTask::new("task2".to_string(), vec![2])),
283    /// ];
284    ///
285    /// let expired = broker_batch::filter_expired(&messages);
286    /// // Newly created tasks should not be expired
287    /// assert_eq!(expired.len(), 0);
288    /// ```
289    #[inline]
290    #[must_use]
291    pub fn filter_expired(messages: &[BrokerMessage]) -> Vec<&BrokerMessage> {
292        messages.iter().filter(|msg| msg.is_expired()).collect()
293    }
294
295    /// Extract task IDs and receipt handles for batch acknowledgement
296    ///
297    /// # Example
298    /// ```
299    /// use celers_core::{BrokerMessage, SerializedTask, broker::broker_batch};
300    ///
301    /// let messages = vec![
302    ///     BrokerMessage::with_receipt_handle(
303    ///         SerializedTask::new("task1".to_string(), vec![1]),
304    ///         "receipt1".to_string()
305    ///     ),
306    ///     BrokerMessage::with_receipt_handle(
307    ///         SerializedTask::new("task2".to_string(), vec![2]),
308    ///         "receipt2".to_string()
309    ///     ),
310    /// ];
311    ///
312    /// let ack_data = broker_batch::prepare_ack_batch(&messages);
313    /// assert_eq!(ack_data.len(), 2);
314    /// ```
315    #[inline]
316    #[must_use]
317    pub fn prepare_ack_batch(messages: &[BrokerMessage]) -> Vec<(TaskId, Option<String>)> {
318        messages
319            .iter()
320            .map(|msg| (msg.task_id(), msg.receipt_handle.clone()))
321            .collect()
322    }
323}
324
325#[cfg(test)]
326mod tests {
327    use super::*;
328    use crate::task::SerializedTask;
329
330    fn create_test_task() -> SerializedTask {
331        SerializedTask::new("test_task".to_string(), vec![1, 2, 3])
332    }
333
334    #[test]
335    fn test_broker_message_new() {
336        let task = create_test_task();
337        let task_id = task.metadata.id;
338        let msg = BrokerMessage::new(task);
339
340        assert_eq!(msg.task_id(), task_id);
341        assert_eq!(msg.task_name(), "test_task");
342        assert!(!msg.has_receipt_handle());
343        assert!(msg.receipt_handle.is_none());
344    }
345
346    #[test]
347    fn test_broker_message_with_receipt_handle() {
348        let task = create_test_task();
349        let receipt = "receipt-123456".to_string();
350        let msg = BrokerMessage::with_receipt_handle(task, receipt.clone());
351
352        assert!(msg.has_receipt_handle());
353        assert_eq!(msg.receipt_handle, Some(receipt));
354    }
355
356    #[test]
357    fn test_broker_message_task_accessors() {
358        let task = create_test_task().with_priority(5);
359        let task_id = task.metadata.id;
360        let msg = BrokerMessage::new(task);
361
362        assert_eq!(msg.task_id(), task_id);
363        assert_eq!(msg.task_name(), "test_task");
364        assert_eq!(msg.priority(), 5);
365    }
366
367    #[test]
368    fn test_broker_message_is_expired() {
369        let task = create_test_task();
370        let msg = BrokerMessage::new(task);
371
372        // Newly created task should not be expired
373        assert!(!msg.is_expired());
374    }
375
376    #[test]
377    fn test_broker_message_age() {
378        let task = create_test_task();
379        let msg = BrokerMessage::new(task);
380
381        // Age should be very small for newly created task
382        let age = msg.age();
383        assert!(age.num_seconds() < 1);
384    }
385
386    #[test]
387    fn test_broker_message_display() {
388        let task = create_test_task();
389        let msg = BrokerMessage::new(task);
390
391        let display = format!("{msg}");
392        assert!(display.contains("BrokerMessage"));
393        assert!(display.contains("task="));
394    }
395
396    #[test]
397    fn test_broker_message_display_with_receipt() {
398        let task = create_test_task();
399        let receipt = "very-long-receipt-handle-12345678901234567890".to_string();
400        let msg = BrokerMessage::with_receipt_handle(task, receipt);
401
402        let display = format!("{msg}");
403        assert!(display.contains("BrokerMessage"));
404        assert!(display.contains("receipt="));
405        // Should truncate to 8 characters
406        assert!(display.contains("very-lon"));
407    }
408}