strut_rabbitmq/repr/
delivery.rs

1use crate::transport::inbound::delivery::{abandon_delivery, backwash_delivery, complete_delivery};
2use lapin::acker::Acker;
3use lapin::types::ShortShortUInt;
4use strut_factory::Deserialize as StrutDeserialize;
5
6/// Defines whether RabbitMQ persists the messages to disk, which affects
7/// whether such messages are able to survive a broker restart.
8#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, StrutDeserialize)]
9#[strut(eq_fn = strut_deserialize::Slug::eq_as_slugs)]
10pub enum DeliveryMode {
11    /// Delivery mode `1`: non-persistent (transient): messages sent with this
12    /// mode will **not** survive a broker restart.
13    Transient,
14    /// Delivery mode `2`: persistent (durable): messages sent with this mode
15    /// will be written to disk and, if they are **also** routed to a **durable
16    /// queue**, they **will** survive a broker restart.
17    Durable,
18}
19
20/// Represents the supporting ways of finalizing an incoming RabbitMQ message.
21/// Relevant only when the inbound messages are set to be
22/// [manually](crate::AckingBehavior::Manual) acknowledged on the
23/// [`Ingress`](crate::Ingress).
24#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, StrutDeserialize)]
25#[strut(eq_fn = strut_deserialize::Slug::eq_as_slugs)]
26pub enum FinalizationKind {
27    /// Positively acknowledge the message.
28    Complete,
29    /// Reject (negatively acknowledge) the message and re-queue it.
30    Backwash,
31    /// Reject (negatively acknowledge) the message without re-queueing.
32    Abandon,
33}
34
35impl DeliveryMode {
36    /// Returns the appropriate `u8` value recognized by RabbitMQ.
37    pub const fn rabbitmq_value(&self) -> u8 {
38        match self {
39            DeliveryMode::Transient => 1,
40            DeliveryMode::Durable => 2,
41        }
42    }
43}
44
45impl From<ShortShortUInt> for DeliveryMode {
46    fn from(value: ShortShortUInt) -> Self {
47        match value {
48            2 => DeliveryMode::Durable,
49            _ => DeliveryMode::Transient,
50        }
51    }
52}
53
54impl FinalizationKind {
55    pub(crate) async fn apply(self, subscriber: &str, acker: &Acker, bytes: &[u8]) {
56        match self {
57            FinalizationKind::Complete => complete_delivery(subscriber, &acker, &bytes).await,
58            FinalizationKind::Backwash => backwash_delivery(subscriber, &acker, &bytes).await,
59            FinalizationKind::Abandon => abandon_delivery(subscriber, &acker, &bytes).await,
60        };
61    }
62}