celers_protocol/
utils.rs

1//! Message utility helpers
2//!
3//! This module provides convenient helper functions for common message operations.
4
5use crate::{Message, ValidationError};
6use chrono::{Duration, Utc};
7use uuid::Uuid;
8
9/// Check if a message is expired based on current time
10///
11/// # Arguments
12///
13/// * `message` - The message to check
14///
15/// # Returns
16///
17/// `true` if the message has an expiration time that has passed, `false` otherwise
18#[inline]
19pub fn is_message_expired(message: &Message) -> bool {
20    message
21        .headers
22        .expires
23        .map(|expires| expires < Utc::now())
24        .unwrap_or(false)
25}
26
27/// Check if a message should be executed now (ETA has passed)
28///
29/// # Arguments
30///
31/// * `message` - The message to check
32///
33/// # Returns
34///
35/// `true` if the message has no ETA or the ETA has passed, `false` otherwise
36#[inline]
37pub fn is_ready_to_execute(message: &Message) -> bool {
38    message
39        .headers
40        .eta
41        .map(|eta| eta <= Utc::now())
42        .unwrap_or(true)
43}
44
45/// Get the time remaining until a message expires
46///
47/// # Arguments
48///
49/// * `message` - The message to check
50///
51/// # Returns
52///
53/// `Some(Duration)` if the message has an expiration time in the future, `None` otherwise
54pub fn time_until_expiration(message: &Message) -> Option<Duration> {
55    message.headers.expires.and_then(|expires| {
56        let now = Utc::now();
57        if expires > now {
58            Some(expires - now)
59        } else {
60            None
61        }
62    })
63}
64
65/// Get the time remaining until a message should be executed
66///
67/// # Arguments
68///
69/// * `message` - The message to check
70///
71/// # Returns
72///
73/// `Some(Duration)` if the message has an ETA in the future, `None` otherwise
74pub fn time_until_eta(message: &Message) -> Option<Duration> {
75    message.headers.eta.and_then(|eta| {
76        let now = Utc::now();
77        if eta > now {
78            Some(eta - now)
79        } else {
80            None
81        }
82    })
83}
84
85/// Calculate the age of a message based on its ETA or expires timestamp
86///
87/// # Arguments
88///
89/// * `message` - The message to check
90///
91/// # Returns
92///
93/// `Duration` representing the estimated age of the message.
94/// If the message has an ETA in the past, returns the duration since that ETA.
95/// If the message has an expires timestamp, estimates age as 1/4 of time until expiration.
96/// Otherwise returns zero.
97///
98/// Note: This is an estimation since messages don't carry creation timestamps.
99/// For accurate message age tracking, add a custom header with creation timestamp.
100pub fn message_age(message: &Message) -> Duration {
101    let now = Utc::now();
102
103    // If ETA is in the past, assume message was created around that time
104    if let Some(eta) = message.headers.eta {
105        if eta < now {
106            return now - eta;
107        }
108    }
109
110    // If expires is set, estimate age based on typical TTL patterns
111    // This is a heuristic: assume message was created 1 hour before expiration
112    // or 25% of the time to expiration, whichever is smaller
113    if let Some(expires) = message.headers.expires {
114        if expires > now {
115            let time_to_expire = expires - now;
116            let estimated_ttl = time_to_expire + Duration::hours(1);
117            return Duration::hours(1).min(estimated_ttl / 4);
118        }
119        // Message is expired, estimate it was created 1 hour before expiration
120        return now - (expires - Duration::hours(1));
121    }
122
123    Duration::zero()
124}
125
126/// Check if a message should be retried based on retry count
127///
128/// # Arguments
129///
130/// * `message` - The message to check
131/// * `max_retries` - Maximum number of retries allowed
132///
133/// # Returns
134///
135/// `true` if the message can be retried, `false` otherwise
136#[inline]
137pub fn can_retry(message: &Message, max_retries: u32) -> bool {
138    message.headers.retries.unwrap_or(0) < max_retries
139}
140
141/// Create a retry message with incremented retry count
142///
143/// # Arguments
144///
145/// * `message` - The original message
146/// * `delay` - Optional delay before retry
147///
148/// # Returns
149///
150/// A new message with incremented retry count and optional ETA
151pub fn create_retry_message(message: &Message, delay: Option<Duration>) -> Message {
152    let mut retry_msg = message.clone();
153    let current_retries = retry_msg.headers.retries.unwrap_or(0);
154    retry_msg.headers.retries = Some(current_retries + 1);
155
156    if let Some(delay) = delay {
157        retry_msg.headers.eta = Some(Utc::now() + delay);
158    }
159
160    retry_msg
161}
162
163/// Clone a message with a new task ID (useful for retries or re-queuing)
164///
165/// # Arguments
166///
167/// * `message` - The message to clone
168///
169/// # Returns
170///
171/// A new message with a new UUID
172pub fn clone_with_new_id(message: &Message) -> Message {
173    let mut new_msg = message.clone();
174    new_msg.headers.id = Uuid::new_v4();
175    new_msg
176}
177
178/// Calculate exponential backoff delay for retries
179///
180/// # Arguments
181///
182/// * `retry_count` - Current retry attempt number (0-indexed)
183/// * `base_delay_secs` - Base delay in seconds
184/// * `max_delay_secs` - Maximum delay in seconds
185///
186/// # Returns
187///
188/// Duration for the backoff delay
189pub fn exponential_backoff(
190    retry_count: u32,
191    base_delay_secs: u32,
192    max_delay_secs: u32,
193) -> Duration {
194    let delay_secs = (base_delay_secs * 2_u32.pow(retry_count)).min(max_delay_secs);
195    Duration::seconds(delay_secs as i64)
196}
197
198/// Validate multiple messages in batch
199///
200/// # Arguments
201///
202/// * `messages` - Slice of messages to validate
203///
204/// # Returns
205///
206/// `Ok(())` if all messages are valid, `Err(ValidationError)` for the first invalid message
207pub fn validate_batch(messages: &[Message]) -> Result<(), ValidationError> {
208    for msg in messages {
209        msg.validate()?;
210    }
211    Ok(())
212}
213
214/// Filter messages by task name pattern
215///
216/// # Arguments
217///
218/// * `messages` - Slice of messages to filter
219/// * `pattern` - Task name pattern (supports prefix matching)
220///
221/// # Returns
222///
223/// Vector of references to messages matching the pattern
224pub fn filter_by_task<'a>(messages: &'a [Message], pattern: &str) -> Vec<&'a Message> {
225    messages
226        .iter()
227        .filter(|msg| msg.headers.task.starts_with(pattern))
228        .collect()
229}
230
231/// Group messages by task name
232///
233/// # Arguments
234///
235/// * `messages` - Slice of messages to group
236///
237/// # Returns
238///
239/// HashMap mapping task names to vectors of messages
240pub fn group_by_task(messages: Vec<Message>) -> std::collections::HashMap<String, Vec<Message>> {
241    let mut groups = std::collections::HashMap::new();
242    for msg in messages {
243        groups
244            .entry(msg.headers.task.clone())
245            .or_insert_with(Vec::new)
246            .push(msg);
247    }
248    groups
249}
250
251/// Sort messages by priority (highest first)
252///
253/// # Arguments
254///
255/// * `messages` - Mutable slice of messages to sort
256pub fn sort_by_priority(messages: &mut [Message]) {
257    messages.sort_by(|a, b| {
258        let priority_a = a.properties.priority.unwrap_or(0);
259        let priority_b = b.properties.priority.unwrap_or(0);
260        priority_b.cmp(&priority_a) // Reverse order (highest first)
261    });
262}
263
264/// Sort messages by ETA (earliest first)
265///
266/// # Arguments
267///
268/// * `messages` - Mutable slice of messages to sort
269pub fn sort_by_eta(messages: &mut [Message]) {
270    messages.sort_by(|a, b| match (a.headers.eta, b.headers.eta) {
271        (Some(eta_a), Some(eta_b)) => eta_a.cmp(&eta_b),
272        (Some(_), None) => std::cmp::Ordering::Less,
273        (None, Some(_)) => std::cmp::Ordering::Greater,
274        (None, None) => std::cmp::Ordering::Equal,
275    });
276}
277
278#[cfg(test)]
279mod tests {
280    use super::*;
281    use crate::builder::MessageBuilder;
282
283    fn create_test_message() -> Message {
284        MessageBuilder::new("tasks.test")
285            .args(vec![serde_json::json!(1)])
286            .build()
287            .unwrap()
288    }
289
290    #[test]
291    fn test_is_message_expired() {
292        let mut msg = create_test_message();
293
294        // Message without expiration is not expired
295        assert!(!is_message_expired(&msg));
296
297        // Message with future expiration is not expired
298        msg.headers.expires = Some(Utc::now() + Duration::hours(1));
299        assert!(!is_message_expired(&msg));
300
301        // Message with past expiration is expired
302        msg.headers.expires = Some(Utc::now() - Duration::hours(1));
303        assert!(is_message_expired(&msg));
304    }
305
306    #[test]
307    fn test_is_ready_to_execute() {
308        let mut msg = create_test_message();
309
310        // Message without ETA is ready
311        assert!(is_ready_to_execute(&msg));
312
313        // Message with past ETA is ready
314        msg.headers.eta = Some(Utc::now() - Duration::hours(1));
315        assert!(is_ready_to_execute(&msg));
316
317        // Message with future ETA is not ready
318        msg.headers.eta = Some(Utc::now() + Duration::hours(1));
319        assert!(!is_ready_to_execute(&msg));
320    }
321
322    #[test]
323    fn test_time_until_expiration() {
324        let mut msg = create_test_message();
325
326        // No expiration
327        assert!(time_until_expiration(&msg).is_none());
328
329        // Future expiration
330        msg.headers.expires = Some(Utc::now() + Duration::hours(1));
331        let remaining = time_until_expiration(&msg);
332        assert!(remaining.is_some());
333        assert!(remaining.unwrap().num_minutes() > 50);
334
335        // Past expiration
336        msg.headers.expires = Some(Utc::now() - Duration::hours(1));
337        assert!(time_until_expiration(&msg).is_none());
338    }
339
340    #[test]
341    fn test_can_retry() {
342        let mut msg = create_test_message();
343
344        // No retries yet
345        assert!(can_retry(&msg, 3));
346
347        // Some retries
348        msg.headers.retries = Some(2);
349        assert!(can_retry(&msg, 3));
350
351        // Max retries reached
352        msg.headers.retries = Some(3);
353        assert!(!can_retry(&msg, 3));
354    }
355
356    #[test]
357    fn test_create_retry_message() {
358        let msg = create_test_message();
359
360        // Without delay
361        let retry = create_retry_message(&msg, None);
362        assert_eq!(retry.headers.retries, Some(1));
363        assert_eq!(retry.headers.task, msg.headers.task);
364
365        // With delay
366        let delay = Duration::minutes(5);
367        let retry_delayed = create_retry_message(&msg, Some(delay));
368        assert_eq!(retry_delayed.headers.retries, Some(1));
369        assert!(retry_delayed.headers.eta.is_some());
370    }
371
372    #[test]
373    fn test_clone_with_new_id() {
374        let msg = create_test_message();
375        let original_id = msg.headers.id;
376
377        let cloned = clone_with_new_id(&msg);
378        assert_ne!(cloned.headers.id, original_id);
379        assert_eq!(cloned.headers.task, msg.headers.task);
380    }
381
382    #[test]
383    fn test_exponential_backoff() {
384        assert_eq!(exponential_backoff(0, 1, 60), Duration::seconds(1));
385        assert_eq!(exponential_backoff(1, 1, 60), Duration::seconds(2));
386        assert_eq!(exponential_backoff(2, 1, 60), Duration::seconds(4));
387        assert_eq!(exponential_backoff(3, 1, 60), Duration::seconds(8));
388
389        // Test max cap
390        assert_eq!(exponential_backoff(10, 1, 60), Duration::seconds(60));
391    }
392
393    #[test]
394    fn test_validate_batch() {
395        let msg1 = create_test_message();
396        let msg2 = create_test_message();
397        let messages = vec![msg1, msg2];
398
399        assert!(validate_batch(&messages).is_ok());
400
401        // Test with invalid message
402        let mut invalid = create_test_message();
403        invalid.headers.task = String::new();
404        let messages_with_invalid = vec![create_test_message(), invalid];
405
406        assert!(validate_batch(&messages_with_invalid).is_err());
407    }
408
409    #[test]
410    fn test_filter_by_task() {
411        let msg1 = MessageBuilder::new("tasks.add").build().unwrap();
412        let msg2 = MessageBuilder::new("tasks.subtract").build().unwrap();
413        let msg3 = MessageBuilder::new("email.send").build().unwrap();
414
415        let messages = vec![msg1, msg2, msg3];
416        let filtered = filter_by_task(&messages, "tasks.");
417
418        assert_eq!(filtered.len(), 2);
419    }
420
421    #[test]
422    fn test_sort_by_priority() {
423        let mut msg1 = create_test_message();
424        let mut msg2 = create_test_message();
425        let mut msg3 = create_test_message();
426
427        msg1.properties.priority = Some(5);
428        msg2.properties.priority = Some(9);
429        msg3.properties.priority = Some(1);
430
431        let mut messages = vec![msg1, msg2, msg3];
432        sort_by_priority(&mut messages);
433
434        assert_eq!(messages[0].properties.priority, Some(9));
435        assert_eq!(messages[1].properties.priority, Some(5));
436        assert_eq!(messages[2].properties.priority, Some(1));
437    }
438
439    #[test]
440    fn test_sort_by_eta() {
441        let now = Utc::now();
442        let mut msg1 = create_test_message();
443        let mut msg2 = create_test_message();
444        let mut msg3 = create_test_message();
445
446        msg1.headers.eta = Some(now + Duration::hours(2));
447        msg2.headers.eta = Some(now + Duration::hours(1));
448        msg3.headers.eta = None;
449
450        let mut messages = vec![msg1, msg2, msg3];
451        sort_by_eta(&mut messages);
452
453        assert!(messages[0].headers.eta.is_some());
454        assert!(messages[1].headers.eta.is_some());
455        assert!(messages[2].headers.eta.is_none());
456    }
457}