1use crate::{Message, ValidationError};
6use chrono::{Duration, Utc};
7use uuid::Uuid;
8
9#[inline]
19pub fn is_message_expired(message: &Message) -> bool {
20 message
21 .headers
22 .expires
23 .map(|expires| expires < Utc::now())
24 .unwrap_or(false)
25}
26
27#[inline]
37pub fn is_ready_to_execute(message: &Message) -> bool {
38 message
39 .headers
40 .eta
41 .map(|eta| eta <= Utc::now())
42 .unwrap_or(true)
43}
44
45pub fn time_until_expiration(message: &Message) -> Option<Duration> {
55 message.headers.expires.and_then(|expires| {
56 let now = Utc::now();
57 if expires > now {
58 Some(expires - now)
59 } else {
60 None
61 }
62 })
63}
64
65pub fn time_until_eta(message: &Message) -> Option<Duration> {
75 message.headers.eta.and_then(|eta| {
76 let now = Utc::now();
77 if eta > now {
78 Some(eta - now)
79 } else {
80 None
81 }
82 })
83}
84
85pub fn message_age(message: &Message) -> Duration {
101 let now = Utc::now();
102
103 if let Some(eta) = message.headers.eta {
105 if eta < now {
106 return now - eta;
107 }
108 }
109
110 if let Some(expires) = message.headers.expires {
114 if expires > now {
115 let time_to_expire = expires - now;
116 let estimated_ttl = time_to_expire + Duration::hours(1);
117 return Duration::hours(1).min(estimated_ttl / 4);
118 }
119 return now - (expires - Duration::hours(1));
121 }
122
123 Duration::zero()
124}
125
126#[inline]
137pub fn can_retry(message: &Message, max_retries: u32) -> bool {
138 message.headers.retries.unwrap_or(0) < max_retries
139}
140
141pub fn create_retry_message(message: &Message, delay: Option<Duration>) -> Message {
152 let mut retry_msg = message.clone();
153 let current_retries = retry_msg.headers.retries.unwrap_or(0);
154 retry_msg.headers.retries = Some(current_retries + 1);
155
156 if let Some(delay) = delay {
157 retry_msg.headers.eta = Some(Utc::now() + delay);
158 }
159
160 retry_msg
161}
162
163pub fn clone_with_new_id(message: &Message) -> Message {
173 let mut new_msg = message.clone();
174 new_msg.headers.id = Uuid::new_v4();
175 new_msg
176}
177
178pub fn exponential_backoff(
190 retry_count: u32,
191 base_delay_secs: u32,
192 max_delay_secs: u32,
193) -> Duration {
194 let delay_secs = (base_delay_secs * 2_u32.pow(retry_count)).min(max_delay_secs);
195 Duration::seconds(delay_secs as i64)
196}
197
198pub fn validate_batch(messages: &[Message]) -> Result<(), ValidationError> {
208 for msg in messages {
209 msg.validate()?;
210 }
211 Ok(())
212}
213
214pub fn filter_by_task<'a>(messages: &'a [Message], pattern: &str) -> Vec<&'a Message> {
225 messages
226 .iter()
227 .filter(|msg| msg.headers.task.starts_with(pattern))
228 .collect()
229}
230
231pub fn group_by_task(messages: Vec<Message>) -> std::collections::HashMap<String, Vec<Message>> {
241 let mut groups = std::collections::HashMap::new();
242 for msg in messages {
243 groups
244 .entry(msg.headers.task.clone())
245 .or_insert_with(Vec::new)
246 .push(msg);
247 }
248 groups
249}
250
251pub fn sort_by_priority(messages: &mut [Message]) {
257 messages.sort_by(|a, b| {
258 let priority_a = a.properties.priority.unwrap_or(0);
259 let priority_b = b.properties.priority.unwrap_or(0);
260 priority_b.cmp(&priority_a) });
262}
263
264pub fn sort_by_eta(messages: &mut [Message]) {
270 messages.sort_by(|a, b| match (a.headers.eta, b.headers.eta) {
271 (Some(eta_a), Some(eta_b)) => eta_a.cmp(&eta_b),
272 (Some(_), None) => std::cmp::Ordering::Less,
273 (None, Some(_)) => std::cmp::Ordering::Greater,
274 (None, None) => std::cmp::Ordering::Equal,
275 });
276}
277
#[cfg(test)]
mod tests {
    use super::*;
    use crate::builder::MessageBuilder;

    /// Builds a message for task `tasks.test` carrying a single JSON argument.
    fn create_test_message() -> Message {
        MessageBuilder::new("tasks.test")
            .args(vec![serde_json::json!(1)])
            .build()
            .unwrap()
    }

    #[test]
    fn test_is_message_expired() {
        let mut msg = create_test_message();

        // A freshly built message is not expired.
        assert!(!is_message_expired(&msg));

        // Expiration in the future: still live.
        msg.headers.expires = Some(Utc::now() + Duration::hours(1));
        assert!(!is_message_expired(&msg));

        // Expiration in the past: expired.
        msg.headers.expires = Some(Utc::now() - Duration::hours(1));
        assert!(is_message_expired(&msg));
    }

    #[test]
    fn test_is_ready_to_execute() {
        let mut msg = create_test_message();

        // A freshly built message is ready immediately.
        assert!(is_ready_to_execute(&msg));

        // ETA already passed: ready.
        msg.headers.eta = Some(Utc::now() - Duration::hours(1));
        assert!(is_ready_to_execute(&msg));

        // ETA still ahead: not ready.
        msg.headers.eta = Some(Utc::now() + Duration::hours(1));
        assert!(!is_ready_to_execute(&msg));
    }

    #[test]
    fn test_time_until_expiration() {
        let mut msg = create_test_message();

        // A freshly built message reports no remaining time.
        assert!(time_until_expiration(&msg).is_none());

        // Future expiration: roughly an hour remains.
        msg.headers.expires = Some(Utc::now() + Duration::hours(1));
        let remaining = time_until_expiration(&msg).expect("future expiration yields Some");
        assert!(remaining.num_minutes() > 50);

        // Past expiration: nothing remains.
        msg.headers.expires = Some(Utc::now() - Duration::hours(1));
        assert!(time_until_expiration(&msg).is_none());
    }

    #[test]
    fn test_can_retry() {
        let mut msg = create_test_message();

        // A freshly built message can be retried.
        assert!(can_retry(&msg, 3));

        // Below the limit.
        msg.headers.retries = Some(2);
        assert!(can_retry(&msg, 3));

        // At the limit.
        msg.headers.retries = Some(3);
        assert!(!can_retry(&msg, 3));

        // A limit of zero forbids retries even with no retries recorded.
        msg.headers.retries = None;
        assert!(!can_retry(&msg, 0));
    }

    #[test]
    fn test_create_retry_message() {
        let msg = create_test_message();

        // Without a delay: the counter is bumped, everything else is copied.
        let retry = create_retry_message(&msg, None);
        assert_eq!(retry.headers.retries, Some(1));
        assert_eq!(retry.headers.task, msg.headers.task);
        assert_eq!(retry.headers.id, msg.headers.id);
        assert_eq!(retry.headers.eta, msg.headers.eta);

        // Retrying a retry keeps counting.
        let second_retry = create_retry_message(&retry, None);
        assert_eq!(second_retry.headers.retries, Some(2));

        // With a delay: the ETA is pushed at least `delay` into the future.
        let delay = Duration::minutes(5);
        let earliest_eta = Utc::now() + delay;
        let retry_delayed = create_retry_message(&msg, Some(delay));
        assert_eq!(retry_delayed.headers.retries, Some(1));
        let eta = retry_delayed
            .headers
            .eta
            .expect("a delay must set the eta header");
        assert!(eta >= earliest_eta);
    }

    #[test]
    fn test_clone_with_new_id() {
        let msg = create_test_message();
        let original_id = msg.headers.id;

        let cloned = clone_with_new_id(&msg);
        assert_ne!(cloned.headers.id, original_id);
        assert_eq!(cloned.headers.task, msg.headers.task);

        // Each call generates its own id.
        let cloned_again = clone_with_new_id(&msg);
        assert_ne!(cloned_again.headers.id, cloned.headers.id);
    }

    #[test]
    fn test_exponential_backoff() {
        assert_eq!(exponential_backoff(0, 1, 60), Duration::seconds(1));
        assert_eq!(exponential_backoff(1, 1, 60), Duration::seconds(2));
        assert_eq!(exponential_backoff(2, 1, 60), Duration::seconds(4));
        assert_eq!(exponential_backoff(3, 1, 60), Duration::seconds(8));

        // A non-unit base scales the whole series.
        assert_eq!(exponential_backoff(2, 5, 60), Duration::seconds(20));

        // The cap applies once the series exceeds it.
        assert_eq!(exponential_backoff(10, 1, 60), Duration::seconds(60));
        assert_eq!(exponential_backoff(4, 5, 60), Duration::seconds(60));
    }

    #[test]
    fn test_validate_batch() {
        // An empty batch is trivially valid.
        assert!(validate_batch(&[]).is_ok());

        let msg1 = create_test_message();
        let msg2 = create_test_message();
        let messages = vec![msg1, msg2];

        assert!(validate_batch(&messages).is_ok());

        // An empty task name makes the message invalid.
        let mut invalid = create_test_message();
        invalid.headers.task = String::new();
        let messages_with_invalid = vec![create_test_message(), invalid];

        assert!(validate_batch(&messages_with_invalid).is_err());
    }

    #[test]
    fn test_filter_by_task() {
        let msg1 = MessageBuilder::new("tasks.add").build().unwrap();
        let msg2 = MessageBuilder::new("tasks.subtract").build().unwrap();
        let msg3 = MessageBuilder::new("email.send").build().unwrap();

        let messages = vec![msg1, msg2, msg3];
        let filtered = filter_by_task(&messages, "tasks.");

        // Both `tasks.` messages are returned, in input order.
        assert_eq!(filtered.len(), 2);
        assert_eq!(filtered[0].headers.task, "tasks.add");
        assert_eq!(filtered[1].headers.task, "tasks.subtract");

        // A prefix nothing starts with yields an empty result.
        assert!(filter_by_task(&messages, "reports.").is_empty());
    }

    #[test]
    fn test_sort_by_priority() {
        let mut msg1 = create_test_message();
        let mut msg2 = create_test_message();
        let mut msg3 = create_test_message();
        let mut msg4 = create_test_message();

        msg1.properties.priority = Some(5);
        msg2.properties.priority = Some(9);
        msg3.properties.priority = Some(1);
        msg4.properties.priority = None;

        let mut messages = vec![msg1, msg4, msg2, msg3];
        sort_by_priority(&mut messages);

        // Highest first; a missing priority counts as 0 and therefore sorts last.
        assert_eq!(messages[0].properties.priority, Some(9));
        assert_eq!(messages[1].properties.priority, Some(5));
        assert_eq!(messages[2].properties.priority, Some(1));
        assert_eq!(messages[3].properties.priority, None);
    }

    #[test]
    fn test_sort_by_eta() {
        let now = Utc::now();
        let mut msg1 = create_test_message();
        let mut msg2 = create_test_message();
        let mut msg3 = create_test_message();

        let later = now + Duration::hours(2);
        let sooner = now + Duration::hours(1);
        msg1.headers.eta = Some(later);
        msg2.headers.eta = Some(sooner);
        msg3.headers.eta = None;

        let mut messages = vec![msg1, msg3, msg2];
        sort_by_eta(&mut messages);

        // Earliest ETA first, messages without an ETA last.
        assert_eq!(messages[0].headers.eta, Some(sooner));
        assert_eq!(messages[1].headers.eta, Some(later));
        assert!(messages[2].headers.eta.is_none());
    }
}