metalmq_codec/frame/
basic.rs

1use super::{AMQPFrame, Channel, FieldTable, MethodFrameArgs};
2
3bitflags! {
4    pub struct BasicConsumeFlags: u8 {
5        const NO_LOCAL = 0b00000001;
6        const NO_ACK = 0b00000010;
7        const EXCLUSIVE = 0b00000100;
8        const NO_WAIT = 0b00001000;
9    }
10}
11
12impl Default for BasicConsumeFlags {
13    fn default() -> Self {
14        BasicConsumeFlags::empty()
15    }
16}
17
/// Arguments carried by `MethodFrameArgs::BasicConsume`.
///
/// The derived `Default` yields empty strings, an empty flag set and no
/// argument table; the chained setters on this type fill in the rest.
#[derive(Debug, Default)]
pub struct BasicConsumeArgs {
    /// Queue name.
    pub queue: String,
    /// Consumer tag.
    pub consumer_tag: String,
    /// Option bits, see `BasicConsumeFlags`.
    pub flags: BasicConsumeFlags,
    /// Optional table of additional arguments.
    pub args: Option<FieldTable>,
}
25
26impl BasicConsumeArgs {
27    pub fn queue(mut self, queue: &str) -> Self {
28        self.queue = queue.to_string();
29        self
30    }
31
32    pub fn consumer_tag(mut self, consumer_tag: &str) -> Self {
33        self.consumer_tag = consumer_tag.to_string();
34        self
35    }
36
37    pub fn exclusive(mut self, mode: bool) -> Self {
38        self.flags.set(BasicConsumeFlags::EXCLUSIVE, mode);
39        self
40    }
41
42    pub fn no_ack(mut self, mode: bool) -> Self {
43        self.flags.set(BasicConsumeFlags::NO_ACK, mode);
44        self
45    }
46
47    pub fn no_local(mut self, mode: bool) -> Self {
48        self.flags.set(BasicConsumeFlags::NO_LOCAL, mode);
49        self
50    }
51
52    pub fn frame(self, channel: Channel) -> AMQPFrame {
53        AMQPFrame::Method(channel, super::BASIC_CONSUME, MethodFrameArgs::BasicConsume(self))
54    }
55}
56
/// Arguments carried by `MethodFrameArgs::BasicConsumeOk`.
#[derive(Clone, Debug, Default)]
pub struct BasicConsumeOkArgs {
    /// Consumer tag.
    pub consumer_tag: String,
}
61
62impl BasicConsumeOkArgs {
63    pub fn new(consumer_tag: &str) -> Self {
64        Self {
65            consumer_tag: consumer_tag.to_string(),
66        }
67    }
68
69    pub fn frame(self, channel: Channel) -> AMQPFrame {
70        AMQPFrame::Method(channel, super::BASIC_CONSUME_OK, MethodFrameArgs::BasicConsumeOk(self))
71    }
72}
73
/// Arguments carried by `MethodFrameArgs::BasicCancel`.
#[derive(Clone, Debug, Default)]
pub struct BasicCancelArgs {
    /// Consumer tag.
    pub consumer_tag: String,
    /// No-wait bit; `BasicCancelArgs::new` initialises it to `false`.
    pub no_wait: bool,
}
79
80impl BasicCancelArgs {
81    pub fn new(consumer_tag: &str) -> Self {
82        Self {
83            consumer_tag: consumer_tag.to_string(),
84            no_wait: false,
85        }
86    }
87
88    pub fn frame(self, channel: Channel) -> AMQPFrame {
89        AMQPFrame::Method(channel, super::BASIC_CANCEL, super::MethodFrameArgs::BasicCancel(self))
90    }
91}
92
/// Arguments carried by `MethodFrameArgs::BasicCancelOk`.
#[derive(Clone, Debug, Default)]
pub struct BasicCancelOkArgs {
    /// Consumer tag.
    pub consumer_tag: String,
}
97
98impl BasicCancelOkArgs {
99    pub fn new(consumer_tag: &str) -> Self {
100        Self {
101            consumer_tag: consumer_tag.to_string(),
102        }
103    }
104
105    pub fn frame(self, channel: Channel) -> AMQPFrame {
106        AMQPFrame::Method(
107            channel,
108            super::BASIC_CANCEL_OK,
109            super::MethodFrameArgs::BasicCancelOk(self),
110        )
111    }
112}
113
/// Arguments carried by `MethodFrameArgs::BasicGet`.
#[derive(Clone, Debug, Default)]
pub struct BasicGetArgs {
    /// Queue name.
    pub queue: String,
    /// No-ack bit; `BasicGetArgs::new` initialises it to `false`.
    pub no_ack: bool,
}
119
120impl BasicGetArgs {
121    pub fn new(queue: &str) -> Self {
122        Self {
123            queue: queue.to_string(),
124            no_ack: false,
125        }
126    }
127
128    pub fn no_ack(mut self, mode: bool) -> Self {
129        self.no_ack = mode;
130        self
131    }
132
133    pub fn frame(self, channel: Channel) -> AMQPFrame {
134        AMQPFrame::Method(channel, super::BASIC_GET, super::MethodFrameArgs::BasicGet(self))
135    }
136}
137
/// Arguments carried by `MethodFrameArgs::BasicGetOk`.
#[derive(Clone, Debug, Default)]
pub struct BasicGetOkArgs {
    /// Delivery tag.
    pub delivery_tag: u64,
    /// Redelivered bit.
    pub redelivered: bool,
    /// Exchange name.
    pub exchange_name: String,
    /// Routing key.
    pub routing_key: String,
    /// Message count.
    pub message_count: u32,
}
146
147impl BasicGetOkArgs {
148    pub fn new(delivery_tag: u64, exchange_name: &str) -> Self {
149        Self {
150            delivery_tag,
151            exchange_name: exchange_name.to_string(),
152            ..Default::default()
153        }
154    }
155
156    pub fn redelivered(mut self, mode: bool) -> Self {
157        self.redelivered = mode;
158        self
159    }
160
161    pub fn routing_key(mut self, routing_key: &str) -> Self {
162        self.routing_key = routing_key.to_string();
163        self
164    }
165
166    pub fn message_count(mut self, message_count: u32) -> Self {
167        self.message_count = message_count;
168        self
169    }
170
171    pub fn frame(self, channel: Channel) -> AMQPFrame {
172        AMQPFrame::Method(channel, super::BASIC_GET_OK, super::MethodFrameArgs::BasicGetOk(self))
173    }
174}
175
176bitflags! {
177    pub struct BasicPublishFlags: u8 {
178        const MANDATORY = 0b00000001;
179        const IMMEDIATE = 0b00000010;
180    }
181}
182
183impl Default for BasicPublishFlags {
184    fn default() -> Self {
185        BasicPublishFlags::empty()
186    }
187}
188
/// Arguments carried by `MethodFrameArgs::BasicPublish`.
#[derive(Clone, Debug, Default)]
pub struct BasicPublishArgs {
    /// Exchange name.
    pub exchange_name: String,
    /// Routing key.
    pub routing_key: String,
    /// Option bits, see `BasicPublishFlags`.
    pub flags: BasicPublishFlags,
}
195
196impl BasicPublishArgs {
197    pub fn new(exchange_name: &str) -> Self {
198        Self {
199            exchange_name: exchange_name.to_string(),
200            ..Default::default()
201        }
202    }
203
204    pub fn routing_key(mut self, routing_key: &str) -> Self {
205        self.routing_key = routing_key.to_string();
206        self
207    }
208
209    pub fn immediate(mut self, mode: bool) -> Self {
210        self.flags.set(BasicPublishFlags::IMMEDIATE, mode);
211        self
212    }
213
214    pub fn mandatory(mut self, mode: bool) -> Self {
215        self.flags.set(BasicPublishFlags::MANDATORY, mode);
216        self
217    }
218
219    pub fn frame(self, channel: Channel) -> AMQPFrame {
220        AMQPFrame::Method(
221            channel,
222            super::BASIC_PUBLISH,
223            super::MethodFrameArgs::BasicPublish(self),
224        )
225    }
226}
227
/// Arguments carried by `MethodFrameArgs::BasicReturn`.
#[derive(Clone, Debug, Default)]
pub struct BasicReturnArgs {
    /// Numeric reply code.
    pub reply_code: u16,
    /// Reply text.
    pub reply_text: String,
    /// Exchange name.
    pub exchange_name: String,
    /// Routing key.
    pub routing_key: String,
}
235
236impl BasicReturnArgs {
237    pub fn frame(self, channel: Channel) -> AMQPFrame {
238        AMQPFrame::Method(channel, super::BASIC_RETURN, super::MethodFrameArgs::BasicReturn(self))
239    }
240}
241
/// Arguments carried by `MethodFrameArgs::BasicDeliver`.
#[derive(Clone, Debug, Default)]
pub struct BasicDeliverArgs {
    /// Consumer tag.
    pub consumer_tag: String,
    /// Delivery tag.
    pub delivery_tag: u64,
    /// Redelivered bit.
    pub redelivered: bool,
    /// Exchange name.
    pub exchange_name: String,
    /// Routing key.
    pub routing_key: String,
}
250
251impl BasicDeliverArgs {
252    pub fn new(consumer_tag: &str, delivery_tag: u64, exchange_name: &str) -> Self {
253        Self {
254            consumer_tag: consumer_tag.to_string(),
255            delivery_tag,
256            exchange_name: exchange_name.to_string(),
257            ..Default::default()
258        }
259    }
260
261    pub fn redelivered(mut self, mode: bool) -> Self {
262        self.redelivered = mode;
263        self
264    }
265
266    pub fn routing_key(mut self, routing_key: &str) -> Self {
267        self.routing_key = routing_key.to_string();
268        self
269    }
270
271    pub fn frame(self, channel: Channel) -> AMQPFrame {
272        AMQPFrame::Method(channel, super::BASIC_DELIVER, MethodFrameArgs::BasicDeliver(self))
273    }
274}
275
/// Arguments carried by `MethodFrameArgs::BasicAck`.
#[derive(Clone, Debug, Default)]
pub struct BasicAckArgs {
    /// Delivery tag.
    pub delivery_tag: u64,
    /// Multiple bit.
    pub multiple: bool,
}
281
282impl BasicAckArgs {
283    pub fn delivery_tag(mut self, delivery_tag: u64) -> Self {
284        self.delivery_tag = delivery_tag;
285        self
286    }
287
288    pub fn multiple(mut self, mode: bool) -> Self {
289        self.multiple = mode;
290        self
291    }
292
293    pub fn frame(self, channel: Channel) -> AMQPFrame {
294        AMQPFrame::Method(channel, super::BASIC_ACK, super::MethodFrameArgs::BasicAck(self))
295    }
296}
297
/// Delivery tag plus requeue bit.
///
/// NOTE(review): presumably the arguments of the AMQP `basic.reject` method;
/// the visible part of this file defines no builder methods or frame
/// constructor for it — confirm where it is used.
#[derive(Clone, Debug, Default)]
pub struct BasicRejectArgs {
    /// Delivery tag.
    pub delivery_tag: u64,
    /// Requeue bit.
    pub requeue: bool,
}
303
/// Arguments carried by `MethodFrameArgs::ConfirmSelect`.
#[derive(Clone, Debug, Default)]
pub struct ConfirmSelectArgs {
    /// No-wait bit; `confirm_select` always builds the frame with `false`.
    pub no_wait: bool,
}
308
309pub fn basic_get_empty(channel: Channel) -> AMQPFrame {
310    AMQPFrame::Method(channel, super::BASIC_GET_EMPTY, MethodFrameArgs::BasicGetEmpty)
311}
312
313pub fn confirm_select(channel: Channel) -> AMQPFrame {
314    AMQPFrame::Method(
315        channel,
316        super::CONFIRM_SELECT,
317        MethodFrameArgs::ConfirmSelect(ConfirmSelectArgs { no_wait: false }),
318    )
319}
320
321pub fn confirm_select_ok(channel: Channel) -> AMQPFrame {
322    AMQPFrame::Method(channel, super::CONFIRM_SELECT_OK, MethodFrameArgs::ConfirmSelectOk)
323}