async_mq/
client.rs

1// SPDX-License-Identifier: Apache-2.0 AND MIT
2//! `Client` and `Connection` structs
3use std::default::Default;
4
5/// A [non-consuming] [Connection] builder.
6///
7/// [Connection]: struct.Connection.html
8/// [non-consuming]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html#non-consuming-builders-(preferred):
9pub struct Client {
10    props: lapin::ConnectionProperties,
11}
12
13impl Client {
14    pub fn new() -> Self {
15        Self {
16            ..Default::default()
17        }
18    }
19    pub async fn connect(&self, uri: &str) -> crate::Result<Connection> {
20        let c = lapin::Connection::connect(uri, self.props.clone())
21            .await
22            .map_err(crate::Error::from)?;
23        Ok(Connection(c))
24    }
25}
26
27impl Default for Client {
28    fn default() -> Self {
29        Self {
30            props: lapin::ConnectionProperties::default(),
31        }
32    }
33}
34
35/// A [non-consuming] [ProducerBuilder] and [ConsumerBuilder] builder.
36///
37/// [ProducerBuilder]: ../produce/struct.ProducerBuilder.html
38/// [ConsumerBuilder]: ../consume/struct.ConsumerBuilder.html
39/// [non-consuming]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html#non-consuming-builders-(preferred):
40#[derive(Clone)]
41pub struct Connection(lapin::Connection);
42
43#[derive(Clone)]
44pub struct QueueOptions {
45    pub kind: lapin::ExchangeKind,
46    pub ex_opts: lapin::options::ExchangeDeclareOptions,
47    pub ex_field: lapin::types::FieldTable,
48    pub queue_opts: lapin::options::QueueDeclareOptions,
49    pub queue_field: lapin::types::FieldTable,
50    pub bind_opts: lapin::options::QueueBindOptions,
51    pub bind_field: lapin::types::FieldTable,
52}
53
54impl Connection {
55    /// Build a [non-consuming] [ProducerBuilder].
56    ///
57    /// [ProducerBuilder]: ../consume/struct.ProducerBuilder.html
58    /// [non-consuming]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html#non-consuming-builders-(preferred):
59    pub fn producer_builder(&self) -> crate::ProducerBuilder {
60        crate::ProducerBuilder::new(self.clone())
61    }
62    /// Build a [non-consuming] [ConsumerBuilder].
63    ///
64    /// [ConsumerBuilder]: ../consume/struct.ConsumerBuilder.html
65    /// [non-consuming]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html#non-consuming-builders-(preferred):
66    pub fn consumer_builder(&self) -> crate::ConsumerBuilder {
67        crate::ConsumerBuilder::new(self.clone())
68    }
69    /// channel creates a channel over the [Connection]
70    /// and returns the `Future<Output = <lapin::Channel>>`.
71    pub async fn channel(&self) -> crate::Result<lapin::Channel> {
72        self.0.create_channel().await.map_err(crate::Error::from)
73    }
74    /// queue creates a channel and a queue over the [Connection]
75    /// and returns the `Future<Output = <lapin::Channel, lapin::Queue>>`.
76    pub async fn queue(
77        &self,
78        ex: &str,
79        queue: &str,
80        opts: QueueOptions,
81    ) -> crate::Result<(lapin::Channel, lapin::Queue)> {
82        let ch = self.0.create_channel().await.map_err(crate::Error::from)?;
83        let q = ch
84            .queue_declare(queue, opts.queue_opts, opts.queue_field)
85            .await
86            .map_err(crate::Error::from)?;
87        if Self::is_default_exchange(ex) {
88            // We don't need to bind to the exchange in case of the default
89            // exchange.
90            return Ok((ch, q));
91        }
92        ch.exchange_declare(ex, opts.kind, opts.ex_opts, opts.ex_field)
93            .await
94            .map_err(crate::Error::from)?;
95        let routing_key = if Self::is_ephemeral_queue(queue) {
96            q.name().as_str()
97        } else {
98            queue
99        };
100        ch.queue_bind(
101            queue,
102            ex,
103            routing_key,
104            opts.bind_opts.clone(),
105            opts.bind_field.clone(),
106        )
107        .await
108        .map_err(crate::Error::from)?;
109        Ok((ch, q))
110    }
111    fn is_default_exchange(name: &str) -> bool {
112        name == crate::DEFAULT_EXCHANGE
113    }
114    fn is_ephemeral_queue(name: &str) -> bool {
115        name == crate::EPHEMERAL_QUEUE
116    }
117}