1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
//! amiquip is a RabbitMQ client written in pure Rust.
//!
//! amiquip supports most features of the AMQP spec and some RabbitMQ extensions (see a list of
//! [currently unsupported features](#unsupported-features) below). It aims to be robust: problems
//! on a channel or connection should lead to a relevant [error](enum.ErrorKind.html) being raised.
//! Most errors, however, do result in effectively killing the channel or connection on which they
//! occur.
//!
//! TLS support is enabled by default via the [native-tls](https://crates.io/crates/native-tls)
//! crate. To enable disable TLS support at build time, disable amiquip's default features:
//!
//! ```toml
//! [dependencies]
//! amiquip = { version = "0.2", default-features = false }
//! ```
//!
//! If you disable TLS support, the methods `Connection::open`, `Connection::open_tuned`, and
//! `Connection::open_tls_stream` will no longer be available, as all three only allow secure
//! connections. The methods `Connection::insecure_open`, `Connection::insecure_open_tuned`, and
//! `Connection::insecure_open_stream` will still be available, as these methods support
//! unencrypted connections.
//!
//! # Examples
//!
//! A "hello world" publisher:
//!
//! ```rust,no_run
//! use amiquip::{Connection, Exchange, Publish, Result};
//!
//! fn main() -> Result<()> {
//!     // Open connection.
//!     let mut connection = Connection::insecure_open("amqp://guest:guest@localhost:5672")?;
//!
//!     // Open a channel - None says let the library choose the channel ID.
//!     let channel = connection.open_channel(None)?;
//!
//!     // Get a handle to the direct exchange on our channel.
//!     let exchange = Exchange::direct(&channel);
//!
//!     // Publish a message to the "hello" queue.
//!     exchange.publish(Publish::new("hello there".as_bytes(), "hello"))?;
//!
//!     connection.close()
//! }
//! ```
//!
//! A corresponding "hello world" consumer:
//!
//! ```rust,no_run
//! // Port of https://www.rabbitmq.com/tutorials/tutorial-one-python.html. Run this
//! // in one shell, and run the hello_world_publish example in another.
//! use amiquip::{Connection, ConsumerMessage, ConsumerOptions, QueueDeclareOptions, Result};
//!
//! fn main() -> Result<()> {
//!     // Open connection.
//!     let mut connection = Connection::insecure_open("amqp://guest:guest@localhost:5672")?;
//!
//!     // Open a channel - None says let the library choose the channel ID.
//!     let channel = connection.open_channel(None)?;
//!
//!     // Declare the "hello" queue.
//!     let queue = channel.queue_declare("hello", QueueDeclareOptions::default())?;
//!
//!     // Start a consumer.
//!     let consumer = queue.consume(ConsumerOptions::default())?;
//!     println!("Waiting for messages. Press Ctrl-C to exit.");
//!
//!     for (i, message) in consumer.receiver().iter().enumerate() {
//!         match message {
//!             ConsumerMessage::Delivery(delivery) => {
//!                 let body = String::from_utf8_lossy(&delivery.body);
//!                 println!("({:>3}) Received [{}]", i, body);
//!                 consumer.ack(delivery)?;
//!             }
//!             other => {
//!                 println!("Consumer ended: {:?}", other);
//!                 break;
//!             }
//!         }
//!     }
//!
//!     connection.close()
//! }
//! ```
//!
//! Both of these examples are ports of the [RabbitMQ Hello World
//! tutorial](https://www.rabbitmq.com/tutorials/tutorial-one-python.html). Additional examples,
//! including ports of the other tutorials from that series, [are also
//! available](https://github.com/jgallagher/amiquip/tree/master/examples).
//!
//! # Design Details
//!
//! When a [connection](struct.Connection.html) is opened, a thread is created to manage all reads
//! and writes on the socket. Other documentation and code refers to this as the "I/O thread". Each
//! connection has exactly one I/O thread. The I/O thread uses [mio](https://crates.io/crates/mio)
//! to drive nonblocking connection. The connection handle and other related handles (particularly
//! [channels](struct.Channel.html)) communicate with the I/O thread via [mio sync
//! channels](https://crates.io/crates/mio_extras) (to the I/O thread) and [crossbeam
//! channels](https://crates.io/crates/crossbeam-channel) (from the I/O thread).
//!
//! Heartbeats are entirely managed by the I/O thread; if heartbeats are enabled and the I/O thread
//! fails to receive communication from the server for too long, it will close the connection.
//!
//! amiquip uses the [log](https://crates.io/crates/log) crate internally. At the `trace` log
//! level, amiquip is quite noisy, but this may be valuable in debugging connection problems.
//!
//! ## Thread Support
//!
//! A [`Connection`](struct.Connection.html) is effectively bound to a single thread (it
//! technically implements both `Send` and `Sync`, but most relevant methods take `&mut self`). A
//! connection can open many [`Channel`](struct.Channel.html)s; a channel can only be used by a
//! single thread (it implements `Send` but not `Sync`). There is no tie between a connection and
//! its channels at the type system level; if the connection is closed (either intentionally or
//! because of an error), all the channels that it opened will end up returning errors shortly
//! thereafter. See the discussion on [`Connection::close`](struct.Connection.html#method.close)
//! for a bit more information about that.
//!
//! A channel is able to produce other handles ([queues](struct.Queue.html),
//! [exchanges](struct.Exchange.html), and [consumers](struct.Consumer.html)). These are mostly
//! thin convenience wrappers around the channel, and they do hold a reference back to the channel
//! that created them. This means if you want to use the connection to open a channel on one thread
//! then move it to another thread to do work, you will need to declare queues, exchanges, and
//! consumers from the thread where work will be done; e.g.,
//!
//! ```rust
//! use amiquip::{Connection, QueueDeclareOptions, ConsumerOptions, Result};
//! use std::thread;
//!
//! fn run_connection(mut connection: Connection) -> Result<()> {
//!     let channel = connection.open_channel(None)?;
//!
//!     // Declaring the queue outside the thread spawn will fail, as it cannot
//!     // be moved into the thread. Instead, wait to declare until inside the new thread.
//!
//!     // Would fail:
//!     // let queue = channel.queue_declare("hello", QueueDeclareOptions::default())?;
//!     thread::spawn(move || -> Result<()> {
//!         // Instead, declare once the channel is moved into this thread.
//!         let queue = channel.queue_declare("hello", QueueDeclareOptions::default())?;
//!         let consumer = queue.consume(ConsumerOptions::default())?;
//!         for message in consumer.receiver().iter() {
//!             // do something with message...
//!             # let _ = message;
//!         }
//!         Ok(())
//!     });
//!
//!     // do something to keep the connection open; if we drop the connection here,
//!     // it will be closed, killing the channel that we just moved into a new thread
//!     # Ok(())
//! }
//! ```
//!
//! # Unsupported Features
//!
//! * Connection recovery. If something goes wrong with a connection, it will be torn down, and
//! errors will be returned from calls on the connection and any other handles (channels,
//! consumers, etc.). A connection recovery strategy could be implemented on top of amiquip.
//! * Channel-level flow control. RabbitMQ, as of version 3.7.14 in March 2019, [does not
//! support](https://www.rabbitmq.com/specification.html#rules) clients requesting channel flow
//! control, and it does not send channel flow control messages to clients (using TCP backpressure
//! instead).
//! * Setting up a [`Consumer`](struct.Consumer.html) with a user-provided consumer tag. If this is
//! something you need, please [file an issue](https://github.com/jgallagher/amiquip/issues).
//! * `nowait` variants of [`Queue::consume`](struct.Queue.html#method.consume) and
//! [`Consumer::cancel`](struct.Consumer.html#method.cancel). It is unlikely support for these will
//! be added, as the synchronous versions are used to set up internal channels for consumer
//! messages.
//! * `nowait` variant of [`Channel::recover`](struct.Channel.html#method.recover). The
//! asynchronous version of `recover` is marked as deprecated in RabbitMQ's AMQP reference.

mod auth;
mod channel;
mod confirm;
mod connection;
mod connection_options;
mod consumer;
mod delivery;
mod errors;
mod exchange;
mod frame_buffer;
mod get;
mod heartbeats;
mod io_loop;
mod queue;
mod return_;
mod serialize;
mod stream;

pub use auth::{Auth, Sasl};
pub use channel::Channel;
pub use confirm::{Confirm, ConfirmPayload, ConfirmSmoother};
pub use connection::{Connection, ConnectionBlockedNotification, ConnectionTuning};
pub use connection_options::ConnectionOptions;
pub use consumer::{Consumer, ConsumerMessage, ConsumerOptions};
pub use delivery::Delivery;
pub use errors::{Error, ErrorKind, Result};
pub use exchange::{Exchange, ExchangeDeclareOptions, ExchangeType, Publish};
pub use get::Get;
pub use queue::{Queue, QueueDeclareOptions, QueueDeleteOptions};
pub use return_::Return;
pub use stream::IoStream;

#[cfg(feature = "native-tls")]
pub use stream::TlsConnector;

pub use amq_protocol::protocol::basic::AMQPProperties as AmqpProperties;
pub use amq_protocol::types::AMQPValue as AmqpValue;
pub use amq_protocol::types::FieldTable;

#[allow(dead_code)]
mod built_info {
    include!(concat!(env!("OUT_DIR"), "/built.rs"));
}

#[cfg(test)]
mod integration_tests;