1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
use crate::{Channel, Consumer, ConsumerOptions, Exchange, FieldTable, Get, Result};
use amq_protocol::protocol::queue::{Declare, Delete};

/// Options passed to the server when declaring a queue.
///
/// The [`default`](#impl-Default) implementation sets all boolean fields to false and has an empty
/// set of arguments.
#[derive(Clone, Debug, Default)]
pub struct QueueDeclareOptions {
    /// If true, declares queue as durable (survives server restarts); if false, declares queue as
    /// transient (will be deleted on a server restart).
    pub durable: bool,

    /// If true, declares queue as exclusive: the queue may only be accessed by the current
    /// connection, and it will be deleted when the current connection is closed.
    pub exclusive: bool,

    /// If true, declares queue as auto-delete: the server will delete it once the last consumer is
    /// disconnected (either by cancellation or by its channel being closed).
    ///
    /// NOTE: If a queue is declared as auto-delete but never has a consumer, it will not be
    /// deleted.
    pub auto_delete: bool,

    /// Extra arguments; these are optional in general, but may be needed for some plugins or
    /// server-specific features.
    pub arguments: FieldTable,
}

impl QueueDeclareOptions {
    pub(crate) fn into_declare(self, queue: String, passive: bool, nowait: bool) -> Declare {
        Declare {
            ticket: 0,
            queue,
            passive,
            durable: self.durable,
            exclusive: self.exclusive,
            auto_delete: self.auto_delete,
            nowait,
            arguments: self.arguments,
        }
    }
}

/// Options passed to the server when deleting a queue.
///
/// The [`default`](#impl-Default) implementation sets all boolean fields to false.
pub struct QueueDeleteOptions {
    /// If true, the server will only delete the queue if it has no consumers. If true and the
    /// queue _does_ have consumers, the server will close the current channel with an error.
    pub if_unused: bool,

    /// If true, the server will only delete the queue if it has no messages.
    pub if_empty: bool,
}

impl QueueDeleteOptions {
    pub(crate) fn into_delete(self, queue: String, nowait: bool) -> Delete {
        Delete {
            ticket: 0,
            queue,
            if_unused: self.if_unused,
            if_empty: self.if_empty,
            nowait,
        }
    }
}

/// Handle for a declared AMQP queue.
pub struct Queue<'a> {
    channel: &'a Channel,
    name: String,
    message_count: Option<u32>,
    consumer_count: Option<u32>,
}

impl<'a> Queue<'a> {
    pub(crate) fn new(
        channel: &Channel,
        name: String,
        message_count: Option<u32>,
        consumer_count: Option<u32>,
    ) -> Queue {
        Queue {
            channel,
            name,
            message_count,
            consumer_count,
        }
    }

    /// Name of the declared queue. Normally this is the name specified when you declare the queue;
    /// however, if you declare a queue with an empty name, the server will assign an autogenerated
    /// queue name.
    #[inline]
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Count of messages in the queue at the time when the queue was declared.
    ///
    /// This will be `Some(count)` if the queue was declared via
    /// [`Channel::queue_declare`](struct.Channel.html#method.queue_declare) or
    /// [`Channel::queue_declare_passive`](struct.Channel.html#method.queue_declare_passive), and
    /// will be `None` if the queue was declared via
    /// [`Channel::queue_declare_nowait`](struct.Channel.html#method.queue_declare_nowait).
    #[inline]
    pub fn declared_message_count(&self) -> Option<u32> {
        self.message_count
    }

    /// Count of consumers on the queue queue at the time when the queue was declared.
    ///
    /// This will be `Some(count)` if the queue was declared via
    /// [`Channel::queue_declare`](struct.Channel.html#method.queue_declare) or
    /// [`Channel::queue_declare_passive`](struct.Channel.html#method.queue_declare_passive), and
    /// will be `None` if the queue was declared via
    /// [`Channel::queue_declare_nowait`](struct.Channel.html#method.queue_declare_nowait).
    #[inline]
    pub fn declared_consumer_count(&self) -> Option<u32> {
        self.consumer_count
    }

    /// Synchronously get a single message from the queue.
    ///
    /// On success, returns `Some(message)` if there was a message in the queue or `None` if there
    /// were no messages in the queue. If `no_ack` is false, you are responsible for acknowledging
    /// the returned message, typically via [`Get::ack`](struct.Get.html#method.ack).
    ///
    /// Prefer using [`consume`](#method.consume) to allow the server to push messages to you on
    /// demand instead of polling with `get`.
    #[inline]
    pub fn get(&self, no_ack: bool) -> Result<Option<Get>> {
        self.channel.basic_get(self.name.clone(), no_ack)
    }

    /// Synchronously start a consumer on this queue.
    #[inline]
    pub fn consume(&self, options: ConsumerOptions) -> Result<Consumer<'a>> {
        self.channel.basic_consume(self.name.clone(), options)
    }

    /// Synchronously bind this queue to an exchange with the given routing key. `arguments` are
    /// typically optional, and are plugin / server dependent.
    #[inline]
    pub fn bind<S: Into<String>>(
        &self,
        exchange: &Exchange,
        routing_key: S,
        arguments: FieldTable,
    ) -> Result<()> {
        self.channel
            .queue_bind(self.name(), exchange.name(), routing_key, arguments)
    }

    /// Asynchronously bind this queue to an exchange with the given routing key. `arguments` are
    /// typically optional, and are plugin / server dependent.
    #[inline]
    pub fn bind_nowait<S: Into<String>>(
        &self,
        exchange: &Exchange,
        routing_key: S,
        arguments: FieldTable,
    ) -> Result<()> {
        self.channel
            .queue_bind_nowait(self.name(), exchange.name(), routing_key, arguments)
    }

    /// Synchronously unbind this queue from an exchange with the given routing key. `arguments`
    /// are typically optional, and are plugin / server dependent.
    #[inline]
    pub fn unbind<S: Into<String>>(
        &self,
        exchange: &Exchange,
        routing_key: S,
        arguments: FieldTable,
    ) -> Result<()> {
        self.channel
            .queue_unbind(self.name(), exchange.name(), routing_key, arguments)
    }

    /// Synchronously purge all messages from this queue. On success, returns the number of
    /// messages that were purged.
    #[inline]
    pub fn purge(&self) -> Result<u32> {
        self.channel.queue_purge(self.name())
    }

    /// Asynchronously purge all messages from this queue.
    #[inline]
    pub fn purge_nowait(&self) -> Result<()> {
        self.channel.queue_purge_nowait(self.name())
    }

    /// Synchronously delete this queue. On success, returns the number of messages that were in
    /// the queue when it was deleted.
    #[inline]
    pub fn delete(self, options: QueueDeleteOptions) -> Result<u32> {
        self.channel.queue_delete(self.name(), options)
    }

    /// Asynchronously delete this queue.
    #[inline]
    pub fn delete_nowait(self, options: QueueDeleteOptions) -> Result<()> {
        self.channel.queue_delete_nowait(self.name(), options)
    }
}