strut_rabbitmq/util/
amqp_properties.rs

1use crate::util::field_table::Attempt;
2use crate::util::{PushHeader, HEADER_ATTEMPT};
3use crate::DeliveryMode;
4use lapin::protocol::basic::AMQPProperties;
5use lapin::types::FieldTable;
6use std::time::{SystemTime, UNIX_EPOCH};
7
8pub mod push;
9pub mod retrieve;
10
11/// Convenience layer around [`AMQPProperties`] that allows easier getting and
12/// setting of the common properties.
13pub trait RetrievePushMap {
14    /// Extracts the attempt number from these [`AMQPProperties`], if it is
15    /// present and can be coerced to a `u32`.
16    ///
17    /// The attempt number is not a recognized AMQP property, and is stored in
18    /// the headers under the key [`HEADER_ATTEMPT`].
19    fn retrieve_attempt(&self) -> Option<u32>;
20
21    /// Extracts the [`DeliveryMode`] from these [`AMQPProperties`], if it is
22    /// present.
23    fn retrieve_delivery_mode(&self) -> Option<DeliveryMode>;
24
25    /// Extracts the priority from these [`AMQPProperties`], if it is present
26    /// and can be coerced to a `u8`.
27    fn retrieve_priority(&self) -> Option<u8>;
28
29    /// Extracts the timestamp from these [`AMQPProperties`], if it is present
30    /// and can be coerced to a `u64` (a UNIX timestamp).
31    fn retrieve_timestamp(&self) -> Option<u64>;
32
33    /// Sets the attempt number in these [`AMQPProperties`] to the given value.
34    ///
35    /// The attempt number is not a recognized AMQP property, and is stored in
36    /// the headers under the key [`HEADER_ATTEMPT`].
37    fn push_attempt(self, attempt: u32) -> Self;
38
39    /// Increments the attempt number in these [`AMQPProperties`] by one. If no
40    /// value exists, sets the new value to one.
41    ///
42    /// The attempt number is not a recognized AMQP property, and is stored in
43    /// the headers under the key [`HEADER_ATTEMPT`].
44    fn increment_attempt(self) -> Self;
45
46    /// Sets the [`DeliveryMode`] in these [`AMQPProperties`] to the given value.
47    fn push_delivery_mode(self, delivery_mode: DeliveryMode) -> Self;
48
49    /// Sets the priority in these [`AMQPProperties`] to the given `u8` value.
50    fn push_priority(self, priority: u8) -> Self;
51
52    /// Sets the timestamp in these [`AMQPProperties`] to the given `u64` value
53    /// (UNIX timestamp).
54    fn push_timestamp(self, timestamp: u64) -> Self;
55
56    /// Sets the message ID in these [`AMQPProperties`] to the current timestamp
57    /// as reported by [`SystemTime`] and converted to a `u64` value (UNIX
58    /// timestamp).
59    fn push_current_timestamp(self) -> Self;
60}
61
62impl RetrievePushMap for AMQPProperties {
63    fn retrieve_attempt(&self) -> Option<u32> {
64        self.headers()
65            .as_ref()
66            .and_then(FieldTable::retrieve_attempt)
67    }
68
69    fn retrieve_delivery_mode(&self) -> Option<DeliveryMode> {
70        self.delivery_mode().map(DeliveryMode::from)
71    }
72
73    fn retrieve_priority(&self) -> Option<u8> {
74        *self.priority()
75    }
76
77    fn retrieve_timestamp(&self) -> Option<u64> {
78        *self.timestamp()
79    }
80
81    fn push_attempt(self, attempt: u32) -> Self {
82        self.push_header(HEADER_ATTEMPT, attempt)
83    }
84
85    fn increment_attempt(self) -> Self {
86        let current_attempt = self.retrieve_attempt().unwrap_or(0);
87
88        self.push_header(HEADER_ATTEMPT, current_attempt + 1)
89    }
90
91    fn push_delivery_mode(self, delivery_mode: DeliveryMode) -> Self {
92        self.with_delivery_mode(delivery_mode.rabbitmq_value())
93    }
94
95    fn push_priority(self, priority: u8) -> Self {
96        self.with_priority(priority)
97    }
98
99    fn push_timestamp(self, timestamp: u64) -> Self {
100        self.with_timestamp(timestamp)
101    }
102
103    fn push_current_timestamp(self) -> Self {
104        self.with_timestamp(
105            SystemTime::now()
106                .duration_since(UNIX_EPOCH)
107                .unwrap_or_default() // If system time is somehow before UNIX epoch, set to default of zero
108                .as_secs(),
109        )
110    }
111}