1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// SPDX-License-Identifier: Apache-2.0 AND MIT
//! `Client` and `Connection` structs
use std::default::Default;

/// A [non-consuming] [Connection] builder.
///
/// Holds the `lapin::ConnectionProperties` that `connect` clones for every
/// connection attempt, so the same `Client` can be used for more than one
/// `connect` call.
///
/// [Connection]: struct.Connection.html
/// [non-consuming]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html#non-consuming-builders-(preferred):
pub struct Client {
    // Properties passed (as a clone) to `lapin::Connection::connect`.
    props: lapin::ConnectionProperties,
}

impl Client {
    pub fn new() -> Self {
        Self {
            ..Default::default()
        }
    }
    pub async fn connect(&self, uri: &str) -> crate::Result<Connection> {
        let c = lapin::Connection::connect(uri, self.props.clone())
            .await
            .map_err(crate::Error::from)?;
        Ok(Connection(c))
    }
}

impl Default for Client {
    /// Builds a `Client` from `lapin::ConnectionProperties::default()`.
    fn default() -> Self {
        let props = lapin::ConnectionProperties::default();
        Self { props }
    }
}

/// A [non-consuming] [ProducerBuilder] and [ConsumerBuilder] builder.
///
/// Newtype wrapper around `lapin::Connection`. It is `Clone`; the builder
/// constructors on this type hand a clone of the connection to each builder
/// they create.
///
/// [ProducerBuilder]: ../produce/struct.ProducerBuilder.html
/// [ConsumerBuilder]: ../consume/struct.ConsumerBuilder.html
/// [non-consuming]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html#non-consuming-builders-(preferred):
#[derive(Clone)]
pub struct Connection(lapin::Connection);

/// Options consumed by `Connection::queue` when it declares a queue, declares
/// an exchange, and binds the two together.
#[derive(Clone)]
pub struct QueueOptions {
    /// Exchange kind passed to `exchange_declare`.
    pub kind: lapin::ExchangeKind,
    /// Options passed to `exchange_declare`.
    pub ex_opts: lapin::options::ExchangeDeclareOptions,
    /// Argument table passed to `exchange_declare`.
    pub ex_field: lapin::types::FieldTable,
    /// Options passed to `queue_declare`.
    pub queue_opts: lapin::options::QueueDeclareOptions,
    /// Argument table passed to `queue_declare`.
    pub queue_field: lapin::types::FieldTable,
    /// Options passed to `queue_bind`.
    pub bind_opts: lapin::options::QueueBindOptions,
    /// Argument table passed to `queue_bind`.
    pub bind_field: lapin::types::FieldTable,
}

impl Connection {
    /// Build a [non-consuming] [ProducerBuilder].
    ///
    /// The builder is handed its own clone of this connection.
    ///
    /// [ProducerBuilder]: ../produce/struct.ProducerBuilder.html
    /// [non-consuming]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html#non-consuming-builders-(preferred):
    pub fn producer_builder(&self) -> crate::ProducerBuilder {
        crate::ProducerBuilder::new(self.clone())
    }

    /// Build a [non-consuming] [ConsumerBuilder].
    ///
    /// The builder is handed its own clone of this connection.
    ///
    /// [ConsumerBuilder]: ../consume/struct.ConsumerBuilder.html
    /// [non-consuming]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html#non-consuming-builders-(preferred):
    pub fn consumer_builder(&self) -> crate::ConsumerBuilder {
        crate::ConsumerBuilder::new(self.clone())
    }

    /// Creates a new channel over this connection.
    ///
    /// # Errors
    ///
    /// Any error reported by `lapin` while creating the channel, converted
    /// with `crate::Error::from`.
    pub async fn channel(&self) -> crate::Result<lapin::Channel> {
        self.0.create_channel().await.map_err(crate::Error::from)
    }

    /// Creates a channel, declares `queue` on it and, unless `ex` is the
    /// default exchange, declares `ex` and binds the queue to it.
    ///
    /// Returns the channel together with the declared queue.
    ///
    /// When `queue` equals `crate::EPHEMERAL_QUEUE`, the name reported back by
    /// the queue declaration is used as the routing key; otherwise `queue`
    /// itself is the routing key.
    ///
    /// # Errors
    ///
    /// The first error reported by `lapin` while creating the channel,
    /// declaring the queue, declaring the exchange, or binding, converted
    /// with `crate::Error::from`.
    pub async fn queue(
        &self,
        ex: &str,
        queue: &str,
        opts: QueueOptions,
    ) -> crate::Result<(lapin::Channel, lapin::Queue)> {
        let ch = self.channel().await?;
        let q = ch
            .queue_declare(queue, opts.queue_opts, opts.queue_field)
            .await
            .map_err(crate::Error::from)?;
        if Self::is_default_exchange(ex) {
            // We don't need to bind to the exchange in case of the default
            // exchange.
            return Ok((ch, q));
        }
        ch.exchange_declare(ex, opts.kind, opts.ex_opts, opts.ex_field)
            .await
            .map_err(crate::Error::from)?;
        let routing_key = if Self::is_ephemeral_queue(queue) {
            q.name().as_str()
        } else {
            queue
        };
        // NOTE(review): the bind still names the queue by the caller-supplied
        // `queue`; for an ephemeral queue this presumably relies on the broker
        // resolving it to the queue just declared on this channel — confirm.
        //
        // `opts` is owned, so the bind options and arguments are moved in
        // rather than cloned.
        ch.queue_bind(queue, ex, routing_key, opts.bind_opts, opts.bind_field)
            .await
            .map_err(crate::Error::from)?;
        Ok((ch, q))
    }

    /// Returns `true` when `name` equals `crate::DEFAULT_EXCHANGE`.
    fn is_default_exchange(name: &str) -> bool {
        name == crate::DEFAULT_EXCHANGE
    }

    /// Returns `true` when `name` equals `crate::EPHEMERAL_QUEUE`.
    fn is_ephemeral_queue(name: &str) -> bool {
        name == crate::EPHEMERAL_QUEUE
    }
}