metalmq_client/
channel_api.rs

1use anyhow::Result;
2use std::collections::HashMap;
3
4use crate::message::PublishedMessage;
5use crate::model::ChannelNumber;
6use crate::processor;
7use crate::processor::{ClientRequest, ClientRequestSink, Param, WaitFor};
8use metalmq_codec::frame;
9
10/// A channel is the main method of communicating with an AMQP server. Channels can be created on
11/// an open connection by calling the [`Client.channel_open`] function.
12pub struct Channel {
13    /// Channel number identifies the channel in a connection.
14    pub channel: ChannelNumber,
15    pub(crate) sink: ClientRequestSink,
16    /// Active consumers by consumer tag
17    consumers: HashMap<String, ClientRequest>,
18}
19
20impl std::fmt::Debug for Channel {
21    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22        f.debug_struct("Channel")
23            .field("channel", &(self.channel as u16))
24            .finish()
25    }
26}
27
28/// Represents the exchange binding type during `Queue.Bind`
29pub enum ExchangeType {
30    /// Messages are sent to the queue if the message routing key is equal to the binding routing
31    /// key.
32    Direct,
33    /// Messages are sent to all bound queue and the routing key is ignored.
34    Fanout,
35    /// Message routing key are matched to the routing key pattern of different queues bound to the
36    /// exchange, and the message is forwarded to the matching queues only. For example if the
37    /// message routing key is `stock.nyse.goog` the matching routing keys are `stock.*.*`,
38    /// `stock.nyse.*` or `stock.#` where hashmark matches more tags.
39    Topic,
40    /// Here the headers of the message are matched to the criteria defined by the binding. All or
41    /// any match is enough, based on the configuration.
42    Headers,
43}
44
45impl From<ExchangeType> for &'static str {
46    fn from(et: ExchangeType) -> &'static str {
47        match et {
48            ExchangeType::Direct => "direct",
49            ExchangeType::Fanout => "fanout",
50            ExchangeType::Topic => "topic",
51            ExchangeType::Headers => "headers",
52        }
53    }
54}
55
56/// Condition for stating that queue can be deleted if it is empty, doesn't have messages.
57pub struct IfEmpty(pub bool);
58/// Condition for deleting an exchange or a queue if they don't have active consumers.
59pub struct IfUnused(pub bool);
60
61/// Options for declaring exchanges.
62///
63/// It works in a builder style, one can chain the setter functions to build the option.
64/// ```no_run
65/// use metalmq_client::*;
66///
67/// async fn declare_exchange(channel: Channel) {
68///     channel.exchange_declare(
69///         "number-plates",
70///         ExchangeType::Direct,
71///         ExchangeDeclareOpts::default().durable(true)
72///         )
73///         .await
74///         .unwrap();
75/// }
76/// ```
77#[derive(Default)]
78pub struct ExchangeDeclareOpts {
79    passive: bool,
80    durable: bool,
81    auto_delete: bool,
82    internal: bool,
83}
84
85impl ExchangeDeclareOpts {
86    /// In passive declare the client can check if the exchange exists and it has been declared
87    /// with the same parameters (durable, auto delete and internal).
88    pub fn passive(mut self, mode: bool) -> Self {
89        self.passive = mode;
90        self
91    }
92
93    /// Durable exchanges survive the server restart.
94    pub fn durable(mut self, mode: bool) -> Self {
95        self.durable = mode;
96        self
97    }
98
99    /// `AutoDelete` queues are deleted when no longer used. When the last consumer closes the
100    /// connection, the server deletes the queue. Auto delete queues can be deleted explicitly. Auto
101    /// delete queues are not deleted if they are not yet used.
102    pub fn auto_delete(mut self, mode: bool) -> Self {
103        self.auto_delete = mode;
104        self
105    }
106
107    /// Clients cannot publish to internal exchanges only they can be bound to other exchanges and
108    /// exchanges can forward messages to them.
109    pub fn internal(mut self, mode: bool) -> Self {
110        self.internal = mode;
111        self
112    }
113}
114
115/// Builder style helper to specify options during queue declaration.
116///
117/// See [`ExchangeDeclareOpts`]
118#[derive(Default)]
119pub struct QueueDeclareOpts {
120    passive: bool,
121    durable: bool,
122    exclusive: bool,
123    auto_delete: bool,
124}
125
126impl QueueDeclareOpts {
127    /// In passive declare the client can check if the queue exists and it has been declared
128    /// with the same parameters (durable, auto delete and exclusive).
129    pub fn passive(mut self, mode: bool) -> Self {
130        self.passive = mode;
131        self
132    }
133
134    /// Durable queues survive the server restart.
135    pub fn durable(mut self, mode: bool) -> Self {
136        self.durable = mode;
137        self
138    }
139
140    /// Exclusive queues can be access by the declaring connection only, and they are deleted after
141    /// the connection terminates.
142    pub fn exclusive(mut self, mode: bool) -> Self {
143        self.exclusive = mode;
144        self
145    }
146
147    /// Auto delete queues are deleted after the last consumer terminates. If a queue has not been
148    /// consumed, it won't be deleted even if the declaring connection terminates.
149    pub fn auto_delete(mut self, mode: bool) -> Self {
150        self.auto_delete = mode;
151        self
152    }
153}
154
155/// Describe how many headers need to be matched in case of headers binding.
156#[derive(Debug)]
157pub enum HeaderMatch {
158    /// Any non `x-` header match is suffice.
159    Any,
160    /// All non `x-` header need to match.
161    All,
162    /// Any header match result in a success.
163    AnyWithX,
164    /// All header need to match including the ones start with `x-`.
165    AllWithX,
166}
167
168impl From<HeaderMatch> for frame::AMQPFieldValue {
169    fn from(value: HeaderMatch) -> Self {
170        match value {
171            HeaderMatch::Any => frame::AMQPFieldValue::LongString(String::from("any")),
172            HeaderMatch::All => frame::AMQPFieldValue::LongString(String::from("all")),
173            HeaderMatch::AnyWithX => frame::AMQPFieldValue::LongString(String::from("any-with-x")),
174            HeaderMatch::AllWithX => frame::AMQPFieldValue::LongString(String::from("all-with-x")),
175        }
176    }
177}
178
179/// Describe the queue binding.
180#[derive(Debug)]
181pub enum Binding {
182    /// Direct binding routes messages to a bound queue if the routing key of the message equals to
183    /// the routing key in the binding.
184    Direct(String),
185    /// In topic binding the message routed if the routing key of the message conforms to the topic
186    /// exchange pattern. The pattern can be an exact routing key in this case the match is
187    /// equality. It can contain asterisks which means any string separated by dots, like "stock.*.*"
188    /// matches "stock.nyse.goog". It can contain hashmark which covers multiple dot-separated
189    /// parts, like "stock.#" matchs to all substrings including one with dots.
190    Topic(String),
191    /// Fanout exchange broadcast message to each bound queues.
192    Fanout,
193    /// Headers binding matches the messages by their headers. Any or all header should match
194    /// depending on the [`HeaderMatch`] value. Currently header values are `String`s only.
195    Headers {
196        headers: HashMap<String, String>,
197        x_match: HeaderMatch,
198    },
199}
200
201impl Channel {
202    pub(crate) fn new(channel: ChannelNumber, sink: ClientRequestSink) -> Channel {
203        Channel {
204            channel,
205            sink,
206            consumers: HashMap::new(),
207        }
208    }
209
210    /// Declare exchange.
211    pub async fn exchange_declare(
212        &self,
213        exchange_name: &str,
214        exchange_type: ExchangeType,
215        opts: ExchangeDeclareOpts,
216    ) -> Result<()> {
217        let frame = frame::ExchangeDeclareArgs::default()
218            .exchange_name(exchange_name)
219            .exchange_type(exchange_type.into())
220            .passive(opts.passive)
221            .durable(opts.durable)
222            .auto_delete(opts.auto_delete)
223            .internal(opts.internal)
224            .frame(self.channel);
225
226        processor::call(&self.sink, frame).await
227    }
228
229    /// Delete exchange.
230    ///
231    /// ```no_run
232    /// use metalmq_client::{Client, IfUnused};
233    ///
234    /// # async fn foo() {
235    /// let mut c = Client::connect("localhost:5672", "guest", "guest").await.unwrap();
236    /// let ch = c.channel_open(1).await.unwrap();
237    ///
238    /// ch.exchange_delete("price-exchange", IfUnused(false)).await.unwrap();
239    /// # }
240    /// ```
241    pub async fn exchange_delete(&self, exchange_name: &str, if_unused: IfUnused) -> Result<()> {
242        let frame = frame::ExchangeDeleteArgs::default()
243            .exchange_name(exchange_name)
244            .if_unused(if_unused.0)
245            .frame(self.channel);
246
247        processor::call(&self.sink, frame).await
248    }
249
250    /// Declare queue.
251    pub async fn queue_declare(&self, queue_name: &str, opts: QueueDeclareOpts) -> Result<()> {
252        let frame = frame::QueueDeclareArgs::default()
253            .name(queue_name)
254            .passive(opts.passive)
255            .durable(opts.durable)
256            .exclusive(opts.exclusive)
257            .auto_delete(opts.auto_delete)
258            .frame(self.channel);
259
260        processor::call(&self.sink, frame).await
261    }
262
263    /// Bind queue to exchange.
264    pub async fn queue_bind(&self, queue_name: &str, exchange_name: &str, binding: Binding) -> Result<()> {
265        use frame::AMQPFieldValue;
266
267        let mut queue_binding = frame::QueueBindArgs::new(queue_name, exchange_name);
268
269        queue_binding = match binding {
270            Binding::Direct(routing_key) => queue_binding.routing_key(&routing_key),
271            Binding::Topic(routing_key) => queue_binding.routing_key(&routing_key),
272            Binding::Fanout => queue_binding,
273            Binding::Headers { headers, x_match } => {
274                let mut args = HashMap::new();
275
276                for (k, v) in headers.into_iter() {
277                    args.insert(k, AMQPFieldValue::LongString(v));
278                }
279
280                args.insert("x-match".to_string(), x_match.into());
281
282                queue_binding.args = Some(args);
283                queue_binding
284            }
285        };
286
287        processor::call(&self.sink, queue_binding.frame(self.channel)).await
288    }
289
290    pub async fn queue_unbind(&self, queue_name: &str, exchange_name: &str, routing_key: &str) -> Result<()> {
291        let frame = frame::QueueUnbindArgs::new(queue_name, exchange_name)
292            .routing_key(routing_key)
293            .frame(self.channel);
294
295        processor::call(&self.sink, frame).await
296    }
297
298    pub async fn queue_purge(&self, queue_name: &str) -> Result<()> {
299        // FIXME give back the number of messages arriving in the QueuePurgeOk frame
300        processor::call(
301            &self.sink,
302            frame::QueuePurgeArgs::default()
303                .queue_name(queue_name)
304                .frame(self.channel),
305        )
306        .await
307    }
308
309    pub async fn queue_delete(&self, queue_name: &str, if_unused: IfUnused, if_empty: IfEmpty) -> Result<()> {
310        let frame = frame::QueueDeleteArgs::default()
311            .queue_name(queue_name)
312            .if_empty(if_empty.0)
313            .if_unused(if_unused.0)
314            .frame(self.channel);
315
316        processor::call(&self.sink, frame).await
317    }
318
319    pub async fn basic_publish(&self, exchange_name: &str, routing_key: &str, message: PublishedMessage) -> Result<()> {
320        let frame = frame::BasicPublishArgs::new(exchange_name)
321            .routing_key(routing_key)
322            .immediate(message.immediate)
323            .mandatory(message.mandatory)
324            .frame(self.channel);
325
326        self.sink
327            .send(ClientRequest {
328                param: Param::Publish(frame, message.message),
329                response: WaitFor::Nothing,
330            })
331            .await?;
332
333        Ok(())
334    }
335
336    pub async fn confirm(&self) -> Result<()> {
337        processor::call(&self.sink, frame::confirm_select(self.channel)).await
338    }
339
340    /// Closes the channel.
341    pub async fn close(&self) -> Result<()> {
342        processor::call(
343            &self.sink,
344            frame::channel_close(self.channel, 200, "Normal close", frame::CHANNEL_CLOSE),
345        )
346        .await
347    }
348}