1use anyhow::Result;
2use std::collections::HashMap;
3
4use crate::message::PublishedMessage;
5use crate::model::ChannelNumber;
6use crate::processor;
7use crate::processor::{ClientRequest, ClientRequestSink, Param, WaitFor};
8use metalmq_codec::frame;
9
10pub struct Channel {
13 pub channel: ChannelNumber,
15 pub(crate) sink: ClientRequestSink,
16 consumers: HashMap<String, ClientRequest>,
18}
19
20impl std::fmt::Debug for Channel {
21 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 f.debug_struct("Channel")
23 .field("channel", &(self.channel as u16))
24 .finish()
25 }
26}
27
/// Kind of exchange requested by [`Channel::exchange_declare`].
///
/// Each variant converts into the lowercase name that is put into the
/// exchange-declare frame (see the `From<ExchangeType> for &'static str` impl).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExchangeType {
    /// Sent as `"direct"`.
    Direct,
    /// Sent as `"fanout"`.
    Fanout,
    /// Sent as `"topic"`.
    Topic,
    /// Sent as `"headers"`.
    Headers,
}

/// Maps an exchange type to the name used in the exchange-declare frame.
impl From<ExchangeType> for &'static str {
    fn from(et: ExchangeType) -> &'static str {
        // Exhaustive on purpose: adding a variant must force a decision about its name.
        match et {
            ExchangeType::Direct => "direct",
            ExchangeType::Fanout => "fanout",
            ExchangeType::Topic => "topic",
            ExchangeType::Headers => "headers",
        }
    }
}
55
/// Flag newtype for [`Channel::queue_delete`]; the wrapped `bool` is forwarded as the
/// `if_empty` argument of the queue-delete frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IfEmpty(pub bool);
/// Flag newtype for [`Channel::exchange_delete`] and [`Channel::queue_delete`]; the
/// wrapped `bool` is forwarded as the `if_unused` argument of the delete frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IfUnused(pub bool);
60
/// Option flags for [`Channel::exchange_declare`].
///
/// Every flag is `false` in the `Default` value. Each builder method consumes the
/// options and returns them with exactly one flag replaced, so calls can be chained:
/// `ExchangeDeclareOpts::default().durable(true).internal(true)`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ExchangeDeclareOpts {
    /// Forwarded as the `passive` flag of the exchange-declare frame.
    passive: bool,
    /// Forwarded as the `durable` flag of the exchange-declare frame.
    durable: bool,
    /// Forwarded as the `auto_delete` flag of the exchange-declare frame.
    auto_delete: bool,
    /// Forwarded as the `internal` flag of the exchange-declare frame.
    internal: bool,
}

impl ExchangeDeclareOpts {
    /// Returns the options with the `passive` flag set to `mode`.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn passive(self, mode: bool) -> Self {
        Self { passive: mode, ..self }
    }

    /// Returns the options with the `durable` flag set to `mode`.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn durable(self, mode: bool) -> Self {
        Self { durable: mode, ..self }
    }

    /// Returns the options with the `auto_delete` flag set to `mode`.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn auto_delete(self, mode: bool) -> Self {
        Self {
            auto_delete: mode,
            ..self
        }
    }

    /// Returns the options with the `internal` flag set to `mode`.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn internal(self, mode: bool) -> Self {
        Self { internal: mode, ..self }
    }
}
114
/// Option flags for [`Channel::queue_declare`].
///
/// Every flag is `false` in the `Default` value. Each builder method consumes the
/// options and returns them with exactly one flag replaced, so calls can be chained:
/// `QueueDeclareOpts::default().durable(true).exclusive(true)`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct QueueDeclareOpts {
    /// Forwarded as the `passive` flag of the queue-declare frame.
    passive: bool,
    /// Forwarded as the `durable` flag of the queue-declare frame.
    durable: bool,
    /// Forwarded as the `exclusive` flag of the queue-declare frame.
    exclusive: bool,
    /// Forwarded as the `auto_delete` flag of the queue-declare frame.
    auto_delete: bool,
}

impl QueueDeclareOpts {
    /// Returns the options with the `passive` flag set to `mode`.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn passive(self, mode: bool) -> Self {
        Self { passive: mode, ..self }
    }

    /// Returns the options with the `durable` flag set to `mode`.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn durable(self, mode: bool) -> Self {
        Self { durable: mode, ..self }
    }

    /// Returns the options with the `exclusive` flag set to `mode`.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn exclusive(self, mode: bool) -> Self {
        Self {
            exclusive: mode,
            ..self
        }
    }

    /// Returns the options with the `auto_delete` flag set to `mode`.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn auto_delete(self, mode: bool) -> Self {
        Self {
            auto_delete: mode,
            ..self
        }
    }
}
154
155#[derive(Debug)]
157pub enum HeaderMatch {
158 Any,
160 All,
162 AnyWithX,
164 AllWithX,
166}
167
168impl From<HeaderMatch> for frame::AMQPFieldValue {
169 fn from(value: HeaderMatch) -> Self {
170 match value {
171 HeaderMatch::Any => frame::AMQPFieldValue::LongString(String::from("any")),
172 HeaderMatch::All => frame::AMQPFieldValue::LongString(String::from("all")),
173 HeaderMatch::AnyWithX => frame::AMQPFieldValue::LongString(String::from("any-with-x")),
174 HeaderMatch::AllWithX => frame::AMQPFieldValue::LongString(String::from("all-with-x")),
175 }
176 }
177}
178
179#[derive(Debug)]
181pub enum Binding {
182 Direct(String),
185 Topic(String),
191 Fanout,
193 Headers {
196 headers: HashMap<String, String>,
197 x_match: HeaderMatch,
198 },
199}
200
201impl Channel {
202 pub(crate) fn new(channel: ChannelNumber, sink: ClientRequestSink) -> Channel {
203 Channel {
204 channel,
205 sink,
206 consumers: HashMap::new(),
207 }
208 }
209
210 pub async fn exchange_declare(
212 &self,
213 exchange_name: &str,
214 exchange_type: ExchangeType,
215 opts: ExchangeDeclareOpts,
216 ) -> Result<()> {
217 let frame = frame::ExchangeDeclareArgs::default()
218 .exchange_name(exchange_name)
219 .exchange_type(exchange_type.into())
220 .passive(opts.passive)
221 .durable(opts.durable)
222 .auto_delete(opts.auto_delete)
223 .internal(opts.internal)
224 .frame(self.channel);
225
226 processor::call(&self.sink, frame).await
227 }
228
229 pub async fn exchange_delete(&self, exchange_name: &str, if_unused: IfUnused) -> Result<()> {
242 let frame = frame::ExchangeDeleteArgs::default()
243 .exchange_name(exchange_name)
244 .if_unused(if_unused.0)
245 .frame(self.channel);
246
247 processor::call(&self.sink, frame).await
248 }
249
250 pub async fn queue_declare(&self, queue_name: &str, opts: QueueDeclareOpts) -> Result<()> {
252 let frame = frame::QueueDeclareArgs::default()
253 .name(queue_name)
254 .passive(opts.passive)
255 .durable(opts.durable)
256 .exclusive(opts.exclusive)
257 .auto_delete(opts.auto_delete)
258 .frame(self.channel);
259
260 processor::call(&self.sink, frame).await
261 }
262
263 pub async fn queue_bind(&self, queue_name: &str, exchange_name: &str, binding: Binding) -> Result<()> {
265 use frame::AMQPFieldValue;
266
267 let mut queue_binding = frame::QueueBindArgs::new(queue_name, exchange_name);
268
269 queue_binding = match binding {
270 Binding::Direct(routing_key) => queue_binding.routing_key(&routing_key),
271 Binding::Topic(routing_key) => queue_binding.routing_key(&routing_key),
272 Binding::Fanout => queue_binding,
273 Binding::Headers { headers, x_match } => {
274 let mut args = HashMap::new();
275
276 for (k, v) in headers.into_iter() {
277 args.insert(k, AMQPFieldValue::LongString(v));
278 }
279
280 args.insert("x-match".to_string(), x_match.into());
281
282 queue_binding.args = Some(args);
283 queue_binding
284 }
285 };
286
287 processor::call(&self.sink, queue_binding.frame(self.channel)).await
288 }
289
290 pub async fn queue_unbind(&self, queue_name: &str, exchange_name: &str, routing_key: &str) -> Result<()> {
291 let frame = frame::QueueUnbindArgs::new(queue_name, exchange_name)
292 .routing_key(routing_key)
293 .frame(self.channel);
294
295 processor::call(&self.sink, frame).await
296 }
297
298 pub async fn queue_purge(&self, queue_name: &str) -> Result<()> {
299 processor::call(
301 &self.sink,
302 frame::QueuePurgeArgs::default()
303 .queue_name(queue_name)
304 .frame(self.channel),
305 )
306 .await
307 }
308
309 pub async fn queue_delete(&self, queue_name: &str, if_unused: IfUnused, if_empty: IfEmpty) -> Result<()> {
310 let frame = frame::QueueDeleteArgs::default()
311 .queue_name(queue_name)
312 .if_empty(if_empty.0)
313 .if_unused(if_unused.0)
314 .frame(self.channel);
315
316 processor::call(&self.sink, frame).await
317 }
318
319 pub async fn basic_publish(&self, exchange_name: &str, routing_key: &str, message: PublishedMessage) -> Result<()> {
320 let frame = frame::BasicPublishArgs::new(exchange_name)
321 .routing_key(routing_key)
322 .immediate(message.immediate)
323 .mandatory(message.mandatory)
324 .frame(self.channel);
325
326 self.sink
327 .send(ClientRequest {
328 param: Param::Publish(frame, message.message),
329 response: WaitFor::Nothing,
330 })
331 .await?;
332
333 Ok(())
334 }
335
336 pub async fn confirm(&self) -> Result<()> {
337 processor::call(&self.sink, frame::confirm_select(self.channel)).await
338 }
339
340 pub async fn close(&self) -> Result<()> {
342 processor::call(
343 &self.sink,
344 frame::channel_close(self.channel, 200, "Normal close", frame::CHANNEL_CLOSE),
345 )
346 .await
347 }
348}