lapin/
message.rs

1use crate::{
2    BasicProperties, Result,
3    acker::Acker,
4    error_holder::ErrorHolder,
5    internal_rpc::InternalRPCHandle,
6    killswitch::KillSwitch,
7    protocol::AMQPError,
8    types::ShortString,
9    types::{ChannelId, DeliveryTag, MessageCount, ReplyCode},
10};
11use std::ops::{Deref, DerefMut};
12
13/// Type wrapping the output of a consumer
14///
/// - Ok(Some(delivery)) carries the delivery, including its acker
16/// - Ok(None) means that the consumer got canceled
17/// - Err(error) carries the error and is always followed by Ok(None)
18pub type DeliveryResult = Result<Option<Delivery>>;
19
20/// A received AMQP message.
21///
22/// The message has to be acknowledged after processing by calling
23/// [`Acker::ack`], [`Acker::nack`] or [`Acker::reject`].
24/// (Multiple acknowledgments are also possible).
25///
26/// [`Acker::ack`]: ../struct.Acker.html#method.ack
27/// [`Acker::nack`]: ../struct.Acker.html#method.nack
28/// [`Acker::reject`]: ../struct.Acker.html#method.reject
29#[derive(Debug, PartialEq)]
30pub struct Delivery {
31    /// The delivery tag of the message. Use this for
32    /// acknowledging the message.
33    pub delivery_tag: DeliveryTag,
34
35    /// The exchange of the message. May be an empty string
36    /// if the default exchange is used.
37    pub exchange: ShortString,
38
39    /// The routing key of the message. May be an empty string
40    /// if no routing key is specified.
41    pub routing_key: ShortString,
42
43    /// Whether this message was redelivered
44    pub redelivered: bool,
45
46    /// Contains the properties and the headers of the
47    /// message.
48    pub properties: BasicProperties,
49
50    /// The payload of the message in binary format.
51    pub data: Vec<u8>,
52
53    /// The acker used to ack/nack the message
54    pub acker: Acker,
55}
56
57impl Delivery {
58    pub(crate) fn new(
59        channel_id: ChannelId,
60        delivery_tag: DeliveryTag,
61        exchange: ShortString,
62        routing_key: ShortString,
63        redelivered: bool,
64        internal_rpc: Option<InternalRPCHandle>,
65        error: Option<ErrorHolder>,
66        killswitch: KillSwitch,
67    ) -> Self {
68        Self {
69            delivery_tag,
70            exchange,
71            routing_key,
72            redelivered,
73            properties: BasicProperties::default(),
74            data: Vec::default(),
75            acker: Acker::new(channel_id, delivery_tag, internal_rpc, error, killswitch),
76        }
77    }
78
79    pub(crate) fn receive_content(&mut self, data: Vec<u8>) {
80        self.data.extend(data);
81    }
82}
83
84impl Deref for Delivery {
85    type Target = Acker;
86
87    fn deref(&self) -> &Self::Target {
88        &self.acker
89    }
90}
91
92#[derive(Debug, PartialEq)]
93pub struct BasicGetMessage {
94    pub delivery: Delivery,
95    pub message_count: MessageCount,
96}
97
98impl BasicGetMessage {
99    pub(crate) fn new(
100        channel_id: ChannelId,
101        delivery_tag: DeliveryTag,
102        exchange: ShortString,
103        routing_key: ShortString,
104        redelivered: bool,
105        message_count: MessageCount,
106        internal_rpc: InternalRPCHandle,
107        killswitch: KillSwitch,
108    ) -> Self {
109        Self {
110            delivery: Delivery::new(
111                channel_id,
112                delivery_tag,
113                exchange,
114                routing_key,
115                redelivered,
116                Some(internal_rpc),
117                None,
118                killswitch,
119            ),
120            message_count,
121        }
122    }
123}
124
125impl Deref for BasicGetMessage {
126    type Target = Delivery;
127
128    fn deref(&self) -> &Self::Target {
129        &self.delivery
130    }
131}
132
133impl DerefMut for BasicGetMessage {
134    fn deref_mut(&mut self) -> &mut Self::Target {
135        &mut self.delivery
136    }
137}
138
139#[derive(Debug, PartialEq)]
140pub struct BasicReturnMessage {
141    pub delivery: Delivery,
142    pub reply_code: ReplyCode,
143    pub reply_text: ShortString,
144}
145
146impl BasicReturnMessage {
147    pub(crate) fn new(
148        exchange: ShortString,
149        routing_key: ShortString,
150        reply_code: ReplyCode,
151        reply_text: ShortString,
152        killswitch: KillSwitch,
153    ) -> Self {
154        let delivery = Delivery::new(0, 0, exchange, routing_key, false, None, None, killswitch);
155        // We cannot ack a returned message
156        delivery.acker.invalidate();
157        Self {
158            delivery,
159            reply_code,
160            reply_text,
161        }
162    }
163
164    pub fn error(&self) -> Option<AMQPError> {
165        AMQPError::from_id(self.reply_code, self.reply_text.clone())
166    }
167}
168
169impl Deref for BasicReturnMessage {
170    type Target = Delivery;
171
172    fn deref(&self) -> &Self::Target {
173        &self.delivery
174    }
175}
176
177impl DerefMut for BasicReturnMessage {
178    fn deref_mut(&mut self) -> &mut Self::Target {
179        &mut self.delivery
180    }
181}