strut_rabbitmq/util/
amqp_properties.rs1use crate::util::field_table::Attempt;
2use crate::util::{PushHeader, HEADER_ATTEMPT};
3use crate::DeliveryMode;
4use lapin::protocol::basic::AMQPProperties;
5use lapin::types::FieldTable;
6use std::time::{SystemTime, UNIX_EPOCH};
7
8pub mod push;
9pub mod retrieve;
10
11pub trait RetrievePushMap {
14 fn retrieve_attempt(&self) -> Option<u32>;
20
21 fn retrieve_delivery_mode(&self) -> Option<DeliveryMode>;
24
25 fn retrieve_priority(&self) -> Option<u8>;
28
29 fn retrieve_timestamp(&self) -> Option<u64>;
32
33 fn push_attempt(self, attempt: u32) -> Self;
38
39 fn increment_attempt(self) -> Self;
45
46 fn push_delivery_mode(self, delivery_mode: DeliveryMode) -> Self;
48
49 fn push_priority(self, priority: u8) -> Self;
51
52 fn push_timestamp(self, timestamp: u64) -> Self;
55
56 fn push_current_timestamp(self) -> Self;
60}
61
62impl RetrievePushMap for AMQPProperties {
63 fn retrieve_attempt(&self) -> Option<u32> {
64 self.headers()
65 .as_ref()
66 .and_then(FieldTable::retrieve_attempt)
67 }
68
69 fn retrieve_delivery_mode(&self) -> Option<DeliveryMode> {
70 self.delivery_mode().map(DeliveryMode::from)
71 }
72
73 fn retrieve_priority(&self) -> Option<u8> {
74 *self.priority()
75 }
76
77 fn retrieve_timestamp(&self) -> Option<u64> {
78 *self.timestamp()
79 }
80
81 fn push_attempt(self, attempt: u32) -> Self {
82 self.push_header(HEADER_ATTEMPT, attempt)
83 }
84
85 fn increment_attempt(self) -> Self {
86 let current_attempt = self.retrieve_attempt().unwrap_or(0);
87
88 self.push_header(HEADER_ATTEMPT, current_attempt + 1)
89 }
90
91 fn push_delivery_mode(self, delivery_mode: DeliveryMode) -> Self {
92 self.with_delivery_mode(delivery_mode.rabbitmq_value())
93 }
94
95 fn push_priority(self, priority: u8) -> Self {
96 self.with_priority(priority)
97 }
98
99 fn push_timestamp(self, timestamp: u64) -> Self {
100 self.with_timestamp(timestamp)
101 }
102
103 fn push_current_timestamp(self) -> Self {
104 self.with_timestamp(
105 SystemTime::now()
106 .duration_since(UNIX_EPOCH)
107 .unwrap_or_default() .as_secs(),
109 )
110 }
111}