1use crate::{
2 BasicProperties, Result,
3 acker::Acker,
4 error_holder::ErrorHolder,
5 internal_rpc::InternalRPCHandle,
6 killswitch::KillSwitch,
7 protocol::AMQPError,
8 types::ShortString,
9 types::{ChannelId, DeliveryTag, MessageCount, ReplyCode},
10};
11use std::ops::{Deref, DerefMut};
12
/// Crate `Result` whose success value is an optional [`Delivery`].
///
/// NOTE(review): presumably `Ok(None)` signals that no further delivery will
/// arrive (e.g. a cancelled consumer) — confirm against the code that
/// produces this value.
pub type DeliveryResult = Result<Option<Delivery>>;
19
/// A message together with the metadata that came with it and the [`Acker`]
/// associated with it.
///
/// `Delivery` dereferences to its [`Acker`], so the acker's methods can be
/// called directly on a `Delivery`.
#[derive(Debug, PartialEq)]
pub struct Delivery {
    /// Tag identifying this delivery; the same value is handed to the
    /// [`Acker`] when the delivery is constructed.
    pub delivery_tag: DeliveryTag,

    /// Exchange name carried by the delivery (presumably the exchange the
    /// message was published to — confirm against the protocol layer).
    pub exchange: ShortString,

    /// Routing key carried by the delivery.
    pub routing_key: ShortString,

    /// Redelivery flag carried by the delivery.
    pub redelivered: bool,

    /// Message properties; initialised to `BasicProperties::default()` by the
    /// constructor.
    pub properties: BasicProperties,

    /// Payload bytes; starts empty and grows as content chunks are received.
    pub data: Vec<u8>,

    /// Handle built from the channel id and delivery tag of this delivery.
    pub acker: Acker,
}
56
57impl Delivery {
58 pub fn mock(
60 delivery_tag: DeliveryTag,
61 exchange: ShortString,
62 routing_key: ShortString,
63 redelivered: bool,
64 data: Vec<u8>,
65 ) -> Self {
66 let mut this = Self::new(
67 0,
68 delivery_tag,
69 exchange,
70 routing_key,
71 redelivered,
72 None,
73 None,
74 KillSwitch::default(),
75 );
76 this.data = data;
77 this
78 }
79
80 pub(crate) fn new(
81 channel_id: ChannelId,
82 delivery_tag: DeliveryTag,
83 exchange: ShortString,
84 routing_key: ShortString,
85 redelivered: bool,
86 internal_rpc: Option<InternalRPCHandle>,
87 error: Option<ErrorHolder>,
88 killswitch: KillSwitch,
89 ) -> Self {
90 Self {
91 delivery_tag,
92 exchange,
93 routing_key,
94 redelivered,
95 properties: BasicProperties::default(),
96 data: Vec::default(),
97 acker: Acker::new(channel_id, delivery_tag, internal_rpc, error, killswitch),
98 }
99 }
100
101 pub(crate) fn receive_content(&mut self, data: Vec<u8>) {
102 self.data.extend(data);
103 }
104}
105
106impl Deref for Delivery {
107 type Target = Acker;
108
109 fn deref(&self) -> &Self::Target {
110 &self.acker
111 }
112}
113
/// A [`Delivery`] paired with a message count.
///
/// Dereferences (mutably and immutably) to the wrapped [`Delivery`].
#[derive(Debug, PartialEq)]
pub struct BasicGetMessage {
    /// The wrapped delivery.
    pub delivery: Delivery,
    /// Message count supplied at construction time (presumably the number of
    /// messages still in the queue as reported by the broker — confirm).
    pub message_count: MessageCount,
}
119
120impl BasicGetMessage {
121 pub(crate) fn new(
122 channel_id: ChannelId,
123 delivery_tag: DeliveryTag,
124 exchange: ShortString,
125 routing_key: ShortString,
126 redelivered: bool,
127 message_count: MessageCount,
128 internal_rpc: InternalRPCHandle,
129 killswitch: KillSwitch,
130 ) -> Self {
131 Self {
132 delivery: Delivery::new(
133 channel_id,
134 delivery_tag,
135 exchange,
136 routing_key,
137 redelivered,
138 Some(internal_rpc),
139 None,
140 killswitch,
141 ),
142 message_count,
143 }
144 }
145}
146
147impl Deref for BasicGetMessage {
148 type Target = Delivery;
149
150 fn deref(&self) -> &Self::Target {
151 &self.delivery
152 }
153}
154
155impl DerefMut for BasicGetMessage {
156 fn deref_mut(&mut self) -> &mut Self::Target {
157 &mut self.delivery
158 }
159}
160
/// A [`Delivery`] paired with a reply code and reply text.
///
/// Dereferences (mutably and immutably) to the wrapped [`Delivery`].
#[derive(Debug, PartialEq)]
pub struct BasicReturnMessage {
    /// The wrapped delivery; its acker is invalidated at construction time.
    pub delivery: Delivery,
    /// Reply code supplied at construction time; used by `error()` to look up
    /// an `AMQPError`.
    pub reply_code: ReplyCode,
    /// Reply text supplied at construction time; passed along to the
    /// `AMQPError` lookup in `error()`.
    pub reply_text: ShortString,
}
167
168impl BasicReturnMessage {
169 pub(crate) fn new(
170 exchange: ShortString,
171 routing_key: ShortString,
172 reply_code: ReplyCode,
173 reply_text: ShortString,
174 killswitch: KillSwitch,
175 ) -> Self {
176 let delivery = Delivery::new(0, 0, exchange, routing_key, false, None, None, killswitch);
177 delivery.acker.invalidate();
179 Self {
180 delivery,
181 reply_code,
182 reply_text,
183 }
184 }
185
186 pub fn error(&self) -> Option<AMQPError> {
187 AMQPError::from_id(self.reply_code, self.reply_text.clone())
188 }
189}
190
191impl Deref for BasicReturnMessage {
192 type Target = Delivery;
193
194 fn deref(&self) -> &Self::Target {
195 &self.delivery
196 }
197}
198
199impl DerefMut for BasicReturnMessage {
200 fn deref_mut(&mut self) -> &mut Self::Target {
201 &mut self.delivery
202 }
203}