Skip to main content

lapin/
message.rs

1use crate::{
2    BasicProperties, Result,
3    acker::Acker,
4    error_holder::ErrorHolder,
5    internal_rpc::InternalRPCHandle,
6    killswitch::KillSwitch,
7    protocol::AMQPError,
8    types::ShortString,
9    types::{ChannelId, DeliveryTag, MessageCount, ReplyCode},
10};
11use std::ops::{Deref, DerefMut};
12
13/// Type wrapping the output of a consumer
14///
15/// - Ok(Some(delivery)) carries the delivery alongside its channel
16/// - Ok(None) means that the consumer got canceled
17/// - Err(error) carries the error and is always followed by Ok(None)
18pub type DeliveryResult = Result<Option<Delivery>>;
19
20/// A received AMQP message.
21///
22/// The message has to be acknowledged after processing by calling
23/// [`Acker::ack`], [`Acker::nack`] or [`Acker::reject`].
24/// (Multiple acknowledgments are also possible).
25///
26/// [`Acker::ack`]: ../struct.Acker.html#method.ack
27/// [`Acker::nack`]: ../struct.Acker.html#method.nack
28/// [`Acker::reject`]: ../struct.Acker.html#method.reject
29#[derive(Debug, PartialEq)]
30pub struct Delivery {
31    /// The delivery tag of the message. Use this for
32    /// acknowledging the message.
33    pub delivery_tag: DeliveryTag,
34
35    /// The exchange of the message. May be an empty string
36    /// if the default exchange is used.
37    pub exchange: ShortString,
38
39    /// The routing key of the message. May be an empty string
40    /// if no routing key is specified.
41    pub routing_key: ShortString,
42
43    /// Whether this message was redelivered
44    pub redelivered: bool,
45
46    /// Contains the properties and the headers of the
47    /// message.
48    pub properties: BasicProperties,
49
50    /// The payload of the message in binary format.
51    pub data: Vec<u8>,
52
53    /// The acker used to ack/nack the message
54    pub acker: Acker,
55}
56
57impl Delivery {
58    /// Craft a new Delivery for mocking in integration testing
59    pub fn mock(
60        delivery_tag: DeliveryTag,
61        exchange: ShortString,
62        routing_key: ShortString,
63        redelivered: bool,
64        data: Vec<u8>,
65    ) -> Self {
66        let mut this = Self::new(
67            0,
68            delivery_tag,
69            exchange,
70            routing_key,
71            redelivered,
72            None,
73            None,
74            KillSwitch::default(),
75        );
76        this.data = data;
77        this
78    }
79
80    pub(crate) fn new(
81        channel_id: ChannelId,
82        delivery_tag: DeliveryTag,
83        exchange: ShortString,
84        routing_key: ShortString,
85        redelivered: bool,
86        internal_rpc: Option<InternalRPCHandle>,
87        error: Option<ErrorHolder>,
88        killswitch: KillSwitch,
89    ) -> Self {
90        Self {
91            delivery_tag,
92            exchange,
93            routing_key,
94            redelivered,
95            properties: BasicProperties::default(),
96            data: Vec::default(),
97            acker: Acker::new(channel_id, delivery_tag, internal_rpc, error, killswitch),
98        }
99    }
100
101    pub(crate) fn receive_content(&mut self, data: Vec<u8>) {
102        self.data.extend(data);
103    }
104}
105
106impl Deref for Delivery {
107    type Target = Acker;
108
109    fn deref(&self) -> &Self::Target {
110        &self.acker
111    }
112}
113
114#[derive(Debug, PartialEq)]
115pub struct BasicGetMessage {
116    pub delivery: Delivery,
117    pub message_count: MessageCount,
118}
119
120impl BasicGetMessage {
121    pub(crate) fn new(
122        channel_id: ChannelId,
123        delivery_tag: DeliveryTag,
124        exchange: ShortString,
125        routing_key: ShortString,
126        redelivered: bool,
127        message_count: MessageCount,
128        internal_rpc: InternalRPCHandle,
129        killswitch: KillSwitch,
130    ) -> Self {
131        Self {
132            delivery: Delivery::new(
133                channel_id,
134                delivery_tag,
135                exchange,
136                routing_key,
137                redelivered,
138                Some(internal_rpc),
139                None,
140                killswitch,
141            ),
142            message_count,
143        }
144    }
145}
146
147impl Deref for BasicGetMessage {
148    type Target = Delivery;
149
150    fn deref(&self) -> &Self::Target {
151        &self.delivery
152    }
153}
154
155impl DerefMut for BasicGetMessage {
156    fn deref_mut(&mut self) -> &mut Self::Target {
157        &mut self.delivery
158    }
159}
160
161#[derive(Debug, PartialEq)]
162pub struct BasicReturnMessage {
163    pub delivery: Delivery,
164    pub reply_code: ReplyCode,
165    pub reply_text: ShortString,
166}
167
168impl BasicReturnMessage {
169    pub(crate) fn new(
170        exchange: ShortString,
171        routing_key: ShortString,
172        reply_code: ReplyCode,
173        reply_text: ShortString,
174        killswitch: KillSwitch,
175    ) -> Self {
176        let delivery = Delivery::new(0, 0, exchange, routing_key, false, None, None, killswitch);
177        // We cannot ack a returned message
178        delivery.acker.invalidate();
179        Self {
180            delivery,
181            reply_code,
182            reply_text,
183        }
184    }
185
186    pub fn error(&self) -> Option<AMQPError> {
187        AMQPError::from_id(self.reply_code, self.reply_text.clone())
188    }
189}
190
191impl Deref for BasicReturnMessage {
192    type Target = Delivery;
193
194    fn deref(&self) -> &Self::Target {
195        &self.delivery
196    }
197}
198
199impl DerefMut for BasicReturnMessage {
200    fn deref_mut(&mut self) -> &mut Self::Target {
201        &mut self.delivery
202    }
203}