1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
use crate::{AmqpProperties, Channel, FieldTable, Result}; use amq_protocol::protocol::exchange::Declare; /// Types of AMQP exchanges. pub enum ExchangeType { /// Direct exchange; delivers messages to queues based on the routing key. Direct, /// Fanout exchange; delivers messages to all bound queues and ignores routing key. Fanout, /// Topic exchange; delivers messages based on matching between a message routing key and the /// pattern that was used to bind a queue to an exchange. Topic, /// Headers exchanges; ignores routing key and routes based on message header fields. Headers, /// Custom exchange type; should begin with "x-". Custom(String), } impl AsRef<str> for ExchangeType { fn as_ref(&self) -> &str { use self::ExchangeType::*; match self { Direct => "direct", Fanout => "fanout", Topic => "topic", Headers => "headers", Custom(s) => s, } } } /// Options passed to the server when declaring an exchange. /// /// The [`default`](#impl-Default) implementation sets all boolean fields to false and has an empty /// set of arguments. #[derive(Clone, Debug, Default)] pub struct ExchangeDeclareOptions { /// If true, declares exchange as durable (survives server restarts); if false, declares /// exchange as transient (will be deleted on a server restart). pub durable: bool, /// If true, declare exchange as auto-delete: it will be deleted once no queues are bound to /// it. The server will keep the exchange around for "a short period of time" to allow queues /// to be bound after it is created. pub auto_delete: bool, /// If true, declare exchange as internal: it may not be used by publishers, but only for /// exchange-to-exchange bindings. pub internal: bool, /// Extra arguments; these are optional in general, but may be needed for some plugins or /// server-specific features. pub arguments: FieldTable, } impl ExchangeDeclareOptions { pub(crate) fn into_declare( self, type_: ExchangeType, name: String, passive: bool, nowait: bool, ) -> Declare { Declare { ticket: 0, exchange: name, passive, type_: type_.as_ref().to_string(), durable: self.durable, auto_delete: self.auto_delete, internal: self.internal, nowait, arguments: self.arguments, } } } /// Wrapper for a message to be published. pub struct Publish<'a> { /// Body of content to send. pub body: &'a [u8], /// Routing key. pub routing_key: String, /// If true, return this message to us if it cannot be routed to a queue. See /// [`Channel::listen_for_returns`](struct.Channel.html) for receiving returned messages. pub mandatory: bool, /// If true, return this message to us if it cannot immediately be routed to a consumer. See /// [`Channel::listen_for_returns`](struct.Channel.html) for receiving returned messages. pub immediate: bool, /// Other properties of the message (e.g., headers). pub properties: AmqpProperties, } impl<'a> Publish<'a> { /// Helper to create a message with the given body and routing key; `mandatory` and /// `immediate` will be set to false, and `properties` will be empty. pub fn new<S: Into<String>>(body: &[u8], routing_key: S) -> Publish { Publish { body, routing_key: routing_key.into(), mandatory: false, immediate: false, properties: AmqpProperties::default(), } } /// Helper to create a message with the given body, routing key, and properties; `mandatory` /// and `immediate` will be set to false. pub fn with_properties<S: Into<String>>( body: &[u8], routing_key: S, properties: AmqpProperties, ) -> Publish { Publish { body, routing_key: routing_key.into(), mandatory: false, immediate: false, properties, } } } /// Handle for a declared AMQP exchange. pub struct Exchange<'a> { channel: &'a Channel, name: String, } impl Exchange<'_> { pub(crate) fn new(channel: &Channel, name: String) -> Exchange { Exchange { channel, name } } /// Construct a handle for the direct exchange on the given `channel`. This is an entirely /// local operation; the default exchange (named `""`) is guaranteed to exist and does not need /// to be declared. pub fn direct(channel: &Channel) -> Exchange { let name = "".to_string(); Exchange { channel, name } } /// Name of this exchange. pub fn name(&self) -> &str { &self.name } /// Publish a message to this exchange. pub fn publish(&self, publish: Publish) -> Result<()> { self.channel.basic_publish(self.name(), publish) } /// Synchronously bind this exchange (as destination) to the `source` exchange with the given /// routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you can /// examine the connection's [server /// properties](struct.Connection.html#method.server_properties) to see if the current /// connection supports this feature. pub fn bind_to_source<S: Into<String>>( &self, source: &Exchange, routing_key: S, arguments: FieldTable, ) -> Result<()> { self.channel .exchange_bind(self.name(), source.name(), routing_key, arguments) } /// Asynchronously bind this exchange (as destination) to the `source` exchange with the given /// routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you can /// examine the connection's [server /// properties](struct.Connection.html#method.server_properties) to see if the current /// connection supports this feature. pub fn bind_to_source_nowait<S: Into<String>>( &self, source: &Exchange, routing_key: S, arguments: FieldTable, ) -> Result<()> { self.channel .exchange_bind_nowait(self.name(), source.name(), routing_key, arguments) } /// Synchronously bind this exchange (as source) to the `destination` exchange with the given /// routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you can /// examine the connection's [server /// properties](struct.Connection.html#method.server_properties) to see if the current /// connection supports this feature. pub fn bind_to_destination<S: Into<String>>( &self, destination: &Exchange, routing_key: S, arguments: FieldTable, ) -> Result<()> { self.channel .exchange_bind(destination.name(), self.name(), routing_key, arguments) } /// Asynchronously bind this exchange (as source) to the `destination` exchange with the given /// routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you can /// examine the connection's [server /// properties](struct.Connection.html#method.server_properties) to see if the current /// connection supports this feature. pub fn bind_to_destination_nowait<S: Into<String>>( &self, destination: &Exchange, routing_key: S, arguments: FieldTable, ) -> Result<()> { self.channel .exchange_bind_nowait(destination.name(), self.name(), routing_key, arguments) } /// Synchronously unbind this exchange (as destination) from the `source` exchange with the /// given routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you /// can examine the connection's [server /// properties](struct.Connection.html#method.server_properties) to see if the current /// connection supports this feature. pub fn unbind_from_source<S: Into<String>>( &self, source: &Exchange, routing_key: S, arguments: FieldTable, ) -> Result<()> { self.channel .exchange_unbind(self.name(), source.name(), routing_key, arguments) } /// Asynchronously unbind this exchange (as destination) from the `source` exchange with the /// given routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you /// can examine the connection's [server /// properties](struct.Connection.html#method.server_properties) to see if the current /// connection supports this feature. pub fn unbind_from_source_nowait<S: Into<String>>( &self, source: &Exchange, routing_key: S, arguments: FieldTable, ) -> Result<()> { self.channel .exchange_unbind_nowait(self.name(), source.name(), routing_key, arguments) } /// Synchronously unbind this exchange (as source) from the `destination` exchange with the /// given routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you /// can examine the connection's [server /// properties](struct.Connection.html#method.server_properties) to see if the current /// connection supports this feature. pub fn unbind_from_destination<S: Into<String>>( &self, destination: &Exchange, routing_key: S, arguments: FieldTable, ) -> Result<()> { self.channel .exchange_unbind(destination.name(), self.name(), routing_key, arguments) } /// Asynchronously unbind this exchange (as source) from the `destination` exchange with the /// given routing key and arguments. Exchange-to-exchange binding is a RabbitMQ extension; you /// can examine the connection's [server /// properties](struct.Connection.html#method.server_properties) to see if the current /// connection supports this feature. pub fn unbind_from_destination_nowait<S: Into<String>>( &self, destination: &Exchange, routing_key: S, arguments: FieldTable, ) -> Result<()> { self.channel .exchange_unbind_nowait(destination.name(), self.name(), routing_key, arguments) } /// Synchronously delete this exchange. If `if_unused` is true, the exchange will only be /// deleted if it has no queue bindings; if `if_unused` is true and the exchange still has /// queue bindings, the server will close this channel. pub fn delete(self, if_unused: bool) -> Result<()> { self.channel.exchange_delete(self.name(), if_unused) } /// Asynchronously delete this exchange. If `if_unused` is true, the exchange will only be /// deleted if it has no queue bindings; if `if_unused` is true and the exchange still has /// queue bindings, the server will close this channel. pub fn delete_nowait(self, if_unused: bool) -> Result<()> { self.channel.exchange_delete_nowait(self.name(), if_unused) } }