1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
use crate::{Channel, Consumer, ConsumerOptions, Exchange, FieldTable, Get, Result}; use amq_protocol::protocol::queue::{Declare, Delete}; /// Options passed to the server when declaring a queue. /// /// The [`default`](#impl-Default) implementation sets all boolean fields to false and has an empty /// set of arguments. #[derive(Clone, Debug, Default)] pub struct QueueDeclareOptions { /// If true, declares queue as durable (survives server restarts); if false, declares queue as /// transient (will be deleted on a server restart). pub durable: bool, /// If true, declares queue as exclusive: the queue may only be accessed by the current /// connection, and it will be deleted when the current connection is closed. pub exclusive: bool, /// If true, declares queue as auto-delete: the server will delete it once the last consumer is /// disconnected (either by cancellation or by its channel being closed). /// /// NOTE: If a queue is declared as auto-delete but never has a consumer, it will not be /// deleted. pub auto_delete: bool, /// Extra arguments; these are optional in general, but may be needed for some plugins or /// server-specific features. pub arguments: FieldTable, } impl QueueDeclareOptions { pub(crate) fn into_declare(self, queue: String, passive: bool, nowait: bool) -> Declare { Declare { ticket: 0, queue, passive, durable: self.durable, exclusive: self.exclusive, auto_delete: self.auto_delete, nowait, arguments: self.arguments, } } } /// Options passed to the server when deleting a queue. /// /// The [`default`](#impl-Default) implementation sets all boolean fields to false. pub struct QueueDeleteOptions { /// If true, the server will only delete the queue if it has no consumers. If true and the /// queue _does_ have consumers, the server will close the current channel with an error. pub if_unused: bool, /// If true, the server will only delete the queue if it has no messages. pub if_empty: bool, } impl QueueDeleteOptions { pub(crate) fn into_delete(self, queue: String, nowait: bool) -> Delete { Delete { ticket: 0, queue, if_unused: self.if_unused, if_empty: self.if_empty, nowait, } } } /// Handle for a declared AMQP queue. pub struct Queue<'a> { channel: &'a Channel, name: String, message_count: Option<u32>, consumer_count: Option<u32>, } impl<'a> Queue<'a> { pub(crate) fn new( channel: &Channel, name: String, message_count: Option<u32>, consumer_count: Option<u32>, ) -> Queue { Queue { channel, name, message_count, consumer_count, } } /// Name of the declared queue. Normally this is the name specified when you declare the queue; /// however, if you declare a queue with an empty name, the server will assign an autogenerated /// queue name. #[inline] pub fn name(&self) -> &str { &self.name } /// Count of messages in the queue at the time when the queue was declared. /// /// This will be `Some(count)` if the queue was declared via /// [`Channel::queue_declare`](struct.Channel.html#method.queue_declare) or /// [`Channel::queue_declare_passive`](struct.Channel.html#method.queue_declare_passive), and /// will be `None` if the queue was declared via /// [`Channel::queue_declare_nowait`](struct.Channel.html#method.queue_declare_nowait). #[inline] pub fn declared_message_count(&self) -> Option<u32> { self.message_count } /// Count of consumers on the queue queue at the time when the queue was declared. /// /// This will be `Some(count)` if the queue was declared via /// [`Channel::queue_declare`](struct.Channel.html#method.queue_declare) or /// [`Channel::queue_declare_passive`](struct.Channel.html#method.queue_declare_passive), and /// will be `None` if the queue was declared via /// [`Channel::queue_declare_nowait`](struct.Channel.html#method.queue_declare_nowait). #[inline] pub fn declared_consumer_count(&self) -> Option<u32> { self.consumer_count } /// Synchronously get a single message from the queue. /// /// On success, returns `Some(message)` if there was a message in the queue or `None` if there /// were no messages in the queue. If `no_ack` is false, you are responsible for acknowledging /// the returned message, typically via [`Get::ack`](struct.Get.html#method.ack). /// /// Prefer using [`consume`](#method.consume) to allow the server to push messages to you on /// demand instead of polling with `get`. #[inline] pub fn get(&self, no_ack: bool) -> Result<Option<Get>> { self.channel.basic_get(self.name.clone(), no_ack) } /// Synchronously start a consumer on this queue. #[inline] pub fn consume(&self, options: ConsumerOptions) -> Result<Consumer<'a>> { self.channel.basic_consume(self.name.clone(), options) } /// Synchronously bind this queue to an exchange with the given routing key. `arguments` are /// typically optional, and are plugin / server dependent. #[inline] pub fn bind<S: Into<String>>( &self, exchange: &Exchange, routing_key: S, arguments: FieldTable, ) -> Result<()> { self.channel .queue_bind(self.name(), exchange.name(), routing_key, arguments) } /// Asynchronously bind this queue to an exchange with the given routing key. `arguments` are /// typically optional, and are plugin / server dependent. #[inline] pub fn bind_nowait<S: Into<String>>( &self, exchange: &Exchange, routing_key: S, arguments: FieldTable, ) -> Result<()> { self.channel .queue_bind_nowait(self.name(), exchange.name(), routing_key, arguments) } /// Synchronously unbind this queue from an exchange with the given routing key. `arguments` /// are typically optional, and are plugin / server dependent. #[inline] pub fn unbind<S: Into<String>>( &self, exchange: &Exchange, routing_key: S, arguments: FieldTable, ) -> Result<()> { self.channel .queue_unbind(self.name(), exchange.name(), routing_key, arguments) } /// Synchronously purge all messages from this queue. On success, returns the number of /// messages that were purged. #[inline] pub fn purge(&self) -> Result<u32> { self.channel.queue_purge(self.name()) } /// Asynchronously purge all messages from this queue. #[inline] pub fn purge_nowait(&self) -> Result<()> { self.channel.queue_purge_nowait(self.name()) } /// Synchronously delete this queue. On success, returns the number of messages that were in /// the queue when it was deleted. #[inline] pub fn delete(self, options: QueueDeleteOptions) -> Result<u32> { self.channel.queue_delete(self.name(), options) } /// Asynchronously delete this queue. #[inline] pub fn delete_nowait(self, options: QueueDeleteOptions) -> Result<()> { self.channel.queue_delete_nowait(self.name(), options) } }