//! AMQP exchange support: exchange types, exchange declaration options, outgoing messages, and
//! the handle type for a declared exchange.

use crate::{AmqpProperties, Channel, FieldTable, Result};
use amq_protocol::protocol::exchange::Declare;

/// Types of AMQP exchanges.
///
/// The textual name of each type (the string used when an exchange of that type is declared) is
/// available through the `AsRef<str>` implementation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ExchangeType {
    /// Direct exchange; delivers messages to queues based on the routing key.
    Direct,

    /// Fanout exchange; delivers messages to all bound queues and ignores routing key.
    Fanout,

    /// Topic exchange; delivers messages based on matching between a message routing key and the
    /// pattern that was used to bind a queue to an exchange.
    Topic,

    /// Headers exchange; ignores routing key and routes based on message header fields.
    Headers,

    /// Custom exchange type; should begin with "x-". The contained string is used verbatim as
    /// the type name.
    Custom(String),
}

impl AsRef<str> for ExchangeType {
    /// Returns the name of this exchange type: a fixed lowercase name for the built-in variants,
    /// or the wrapped string (unchanged) for [`ExchangeType::Custom`].
    fn as_ref(&self) -> &str {
        match self {
            ExchangeType::Direct => "direct",
            ExchangeType::Fanout => "fanout",
            ExchangeType::Topic => "topic",
            ExchangeType::Headers => "headers",
            ExchangeType::Custom(name) => name.as_str(),
        }
    }
}

/// Options passed to the server when declaring an exchange.
///
/// The [`default`](#impl-Default) implementation sets all boolean fields to false and has an empty
/// set of arguments.
///
/// Every field is public, so individual options can be overridden with struct update syntax, e.g.
/// `ExchangeDeclareOptions { durable: true, ..ExchangeDeclareOptions::default() }`.
#[derive(Clone, Debug, Default)]
pub struct ExchangeDeclareOptions {
    /// If true, declares exchange as durable (survives server restarts); if false, declares
    /// exchange as transient (will be deleted on a server restart).
    pub durable: bool,

    /// If true, declare exchange as auto-delete: it will be deleted once no queues are bound to
    /// it. The server will keep the exchange around for "a short period of time" to allow queues
    /// to be bound after it is created.
    pub auto_delete: bool,

    /// If true, declare exchange as internal: it may not be used by publishers, but only for
    /// exchange-to-exchange bindings.
    pub internal: bool,

    /// Extra arguments; these are optional in general, but may be needed for some plugins or
    /// server-specific features.
    pub arguments: FieldTable,
}

impl ExchangeDeclareOptions {
    pub(crate) fn into_declare(
        self,
        type_: ExchangeType,
        name: String,
        passive: bool,
        nowait: bool,
    ) -> Declare {
        Declare {
            ticket: 0,
            exchange: name,
            passive,
            type_: type_.as_ref().to_string(),
            durable: self.durable,
            auto_delete: self.auto_delete,
            internal: self.internal,
            nowait,
            arguments: self.arguments,
        }
    }
}

/// Wrapper for a message to be published.
///
/// The message body is borrowed for the lifetime `'a`; the routing key and properties are owned.
/// [`Publish::new`] and [`Publish::with_properties`] build a value with `mandatory` and
/// `immediate` both false; since all fields are public they can be adjusted afterwards.
pub struct Publish<'a> {
    /// Body of content to send.
    pub body: &'a [u8],

    /// Routing key.
    pub routing_key: String,

    /// If true, return this message to us if it cannot be routed to a queue. See
    /// [`Channel::listen_for_returns`](struct.Channel.html) for receiving returned messages.
    pub mandatory: bool,

    /// If true, return this message to us if it cannot immediately be routed to a consumer. See
    /// [`Channel::listen_for_returns`](struct.Channel.html) for receiving returned messages.
    pub immediate: bool,

    /// Other properties of the message (e.g., headers).
    pub properties: AmqpProperties,
}

impl<'a> Publish<'a> {
    /// Helper to create a message with the given body and routing key; `mandatory` and
    /// `immediate` will be set to false, and `properties` will be empty.
    pub fn new<S: Into<String>>(body: &[u8], routing_key: S) -> Publish {
        Publish {
            body,
            routing_key: routing_key.into(),
            mandatory: false,
            immediate: false,
            properties: AmqpProperties::default(),
        }
    }

    /// Helper to create a message with the given body, routing key, and properties; `mandatory`
    /// and `immediate` will be set to false.
    pub fn with_properties<S: Into<String>>(
        body: &[u8],
        routing_key: S,
        properties: AmqpProperties,
    ) -> Publish {
        Publish {
            body,
            routing_key: routing_key.into(),
            mandatory: false,
            immediate: false,
            properties,
        }
    }
}

/// Handle for a declared AMQP exchange.
///
/// The handle borrows the [`Channel`](struct.Channel.html) it was created on; every operation on
/// the exchange (publish, bind, unbind, delete) is forwarded to that channel together with the
/// exchange's name.
pub struct Exchange<'a> {
    // Channel used to carry out all operations on this exchange.
    channel: &'a Channel,
    // Name of the exchange; the empty string for the handle returned by `Exchange::direct`.
    name: String,
}

impl Exchange<'_> {
    /// Wrap the exchange called `name` on `channel` in a handle. This only builds the handle; it
    /// does not declare anything.
    pub(crate) fn new(channel: &Channel, name: String) -> Exchange<'_> {
        Exchange { channel, name }
    }

    /// Construct a handle for the default direct exchange on the given `channel`. This is an
    /// entirely local operation; the default exchange (named `""`) is guaranteed to exist and
    /// does not need to be declared.
    pub fn direct(channel: &Channel) -> Exchange<'_> {
        // The default exchange is identified by the empty name.
        Exchange::new(channel, String::new())
    }

    /// Name of this exchange.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Publish a message to this exchange.
    pub fn publish(&self, publish: Publish<'_>) -> Result<()> {
        self.channel.basic_publish(self.name(), publish)
    }

    /// Synchronously bind this exchange (as destination) to the `source` exchange with the given
    /// routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you can
    /// examine the connection's [server
    /// properties](struct.Connection.html#method.server_properties) to see if the current
    /// connection supports this feature.
    pub fn bind_to_source<S: Into<String>>(
        &self,
        source: &Exchange<'_>,
        routing_key: S,
        arguments: FieldTable,
    ) -> Result<()> {
        // Argument order for the channel call is (destination, source, ...).
        self.channel
            .exchange_bind(self.name(), source.name(), routing_key, arguments)
    }

    /// Asynchronously bind this exchange (as destination) to the `source` exchange with the given
    /// routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you can
    /// examine the connection's [server
    /// properties](struct.Connection.html#method.server_properties) to see if the current
    /// connection supports this feature.
    pub fn bind_to_source_nowait<S: Into<String>>(
        &self,
        source: &Exchange<'_>,
        routing_key: S,
        arguments: FieldTable,
    ) -> Result<()> {
        // Argument order for the channel call is (destination, source, ...).
        self.channel
            .exchange_bind_nowait(self.name(), source.name(), routing_key, arguments)
    }

    /// Synchronously bind this exchange (as source) to the `destination` exchange with the given
    /// routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you can
    /// examine the connection's [server
    /// properties](struct.Connection.html#method.server_properties) to see if the current
    /// connection supports this feature.
    pub fn bind_to_destination<S: Into<String>>(
        &self,
        destination: &Exchange<'_>,
        routing_key: S,
        arguments: FieldTable,
    ) -> Result<()> {
        // Argument order for the channel call is (destination, source, ...).
        self.channel
            .exchange_bind(destination.name(), self.name(), routing_key, arguments)
    }

    /// Asynchronously bind this exchange (as source) to the `destination` exchange with the given
    /// routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you can
    /// examine the connection's [server
    /// properties](struct.Connection.html#method.server_properties) to see if the current
    /// connection supports this feature.
    pub fn bind_to_destination_nowait<S: Into<String>>(
        &self,
        destination: &Exchange<'_>,
        routing_key: S,
        arguments: FieldTable,
    ) -> Result<()> {
        // Argument order for the channel call is (destination, source, ...).
        self.channel
            .exchange_bind_nowait(destination.name(), self.name(), routing_key, arguments)
    }

    /// Synchronously unbind this exchange (as destination) from the `source` exchange with the
    /// given routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you
    /// can examine the connection's [server
    /// properties](struct.Connection.html#method.server_properties) to see if the current
    /// connection supports this feature.
    pub fn unbind_from_source<S: Into<String>>(
        &self,
        source: &Exchange<'_>,
        routing_key: S,
        arguments: FieldTable,
    ) -> Result<()> {
        // Argument order for the channel call is (destination, source, ...).
        self.channel
            .exchange_unbind(self.name(), source.name(), routing_key, arguments)
    }

    /// Asynchronously unbind this exchange (as destination) from the `source` exchange with the
    /// given routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you
    /// can examine the connection's [server
    /// properties](struct.Connection.html#method.server_properties) to see if the current
    /// connection supports this feature.
    pub fn unbind_from_source_nowait<S: Into<String>>(
        &self,
        source: &Exchange<'_>,
        routing_key: S,
        arguments: FieldTable,
    ) -> Result<()> {
        // Argument order for the channel call is (destination, source, ...).
        self.channel
            .exchange_unbind_nowait(self.name(), source.name(), routing_key, arguments)
    }

    /// Synchronously unbind this exchange (as source) from the `destination` exchange with the
    /// given routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you
    /// can examine the connection's [server
    /// properties](struct.Connection.html#method.server_properties) to see if the current
    /// connection supports this feature.
    pub fn unbind_from_destination<S: Into<String>>(
        &self,
        destination: &Exchange<'_>,
        routing_key: S,
        arguments: FieldTable,
    ) -> Result<()> {
        // Argument order for the channel call is (destination, source, ...).
        self.channel
            .exchange_unbind(destination.name(), self.name(), routing_key, arguments)
    }

    /// Asynchronously unbind this exchange (as source) from the `destination` exchange with the
    /// given routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you
    /// can examine the connection's [server
    /// properties](struct.Connection.html#method.server_properties) to see if the current
    /// connection supports this feature.
    pub fn unbind_from_destination_nowait<S: Into<String>>(
        &self,
        destination: &Exchange<'_>,
        routing_key: S,
        arguments: FieldTable,
    ) -> Result<()> {
        // Argument order for the channel call is (destination, source, ...).
        self.channel
            .exchange_unbind_nowait(destination.name(), self.name(), routing_key, arguments)
    }

    /// Synchronously delete this exchange, consuming the handle. If `if_unused` is true, the
    /// exchange will only be deleted if it has no queue bindings; if `if_unused` is true and the
    /// exchange still has queue bindings, the server will close this channel.
    pub fn delete(self, if_unused: bool) -> Result<()> {
        self.channel.exchange_delete(self.name(), if_unused)
    }

    /// Asynchronously delete this exchange, consuming the handle. If `if_unused` is true, the
    /// exchange will only be deleted if it has no queue bindings; if `if_unused` is true and the
    /// exchange still has queue bindings, the server will close this channel.
    pub fn delete_nowait(self, if_unused: bool) -> Result<()> {
        self.channel.exchange_delete_nowait(self.name(), if_unused)
    }
}