//! amiquip is a RabbitMQ client written in pure Rust.
//!
//! amiquip supports most features of the AMQP spec and some RabbitMQ extensions (see a list of
//! [currently unsupported features](#unsupported-features) below). It aims to be robust: problems
//! on a channel or connection should lead to a relevant [error](enum.Error.html) being raised.
//! Most errors, however, do result in effectively killing the channel or connection on which they
//! occur.
//!
//! TLS support is enabled by default via the [native-tls](https://crates.io/crates/native-tls)
//! crate. To disable TLS support at build time, disable amiquip's default features:
//!
//! ```toml
//! [dependencies]
//! amiquip = { version = "0.4", default-features = false }
//! ```
//!
//! If you disable TLS support, the methods `Connection::open`, `Connection::open_tuned`, and
//! `Connection::open_tls_stream` will no longer be available, as all three only allow secure
//! connections. The methods `Connection::insecure_open`, `Connection::insecure_open_tuned`, and
//! `Connection::insecure_open_stream` will still be available, as these methods support
//! unencrypted connections.
//!
//! # Examples
//!
//! A "hello world" publisher:
//!
//! ```rust,no_run
//! use amiquip::{Connection, Exchange, Publish, Result};
//!
//! fn main() -> Result<()> {
//!     // Open connection.
//!     let mut connection = Connection::insecure_open("amqp://guest:guest@localhost:5672")?;
//!
//!     // Open a channel - None says let the library choose the channel ID.
//!     let channel = connection.open_channel(None)?;
//!
//!     // Get a handle to the direct exchange on our channel.
//!     let exchange = Exchange::direct(&channel);
//!
//!     // Publish a message to the "hello" queue.
//!     exchange.publish(Publish::new("hello there".as_bytes(), "hello"))?;
//!
//!     connection.close()
//! }
//! ```
//!
//! A corresponding "hello world" consumer:
//!
//! ```rust,no_run
//! // Port of https://www.rabbitmq.com/tutorials/tutorial-one-python.html. Run this
//! // in one shell, and run the hello_world_publish example in another.
//! use amiquip::{Connection, ConsumerMessage, ConsumerOptions, QueueDeclareOptions, Result};
//!
//! fn main() -> Result<()> {
//!     // Open connection.
//!     let mut connection = Connection::insecure_open("amqp://guest:guest@localhost:5672")?;
//!
//!     // Open a channel - None says let the library choose the channel ID.
//!     let channel = connection.open_channel(None)?;
//!
//!     // Declare the "hello" queue.
//!     let queue = channel.queue_declare("hello", QueueDeclareOptions::default())?;
//!
//!     // Start a consumer.
//!     let consumer = queue.consume(ConsumerOptions::default())?;
//!     println!("Waiting for messages. Press Ctrl-C to exit.");
//!
//!     for (i, message) in consumer.receiver().iter().enumerate() {
//!         match message {
//!             ConsumerMessage::Delivery(delivery) => {
//!                 let body = String::from_utf8_lossy(&delivery.body);
//!                 println!("({:>3}) Received [{}]", i, body);
//!                 consumer.ack(delivery)?;
//!             }
//!             other => {
//!                 println!("Consumer ended: {:?}", other);
//!                 break;
//!             }
//!         }
//!     }
//!
//!     connection.close()
//! }
//! ```
//!
//! Both of these examples are ports of the [RabbitMQ Hello World
//! tutorial](https://www.rabbitmq.com/tutorials/tutorial-one-python.html). Additional examples,
//! including ports of the other tutorials from that series, [are also
//! available](https://github.com/jgallagher/amiquip/tree/master/examples).
//!
//! # Design Details
//!
//! When a [connection](struct.Connection.html) is opened, a thread is created to manage all reads
//! and writes on the socket. Other documentation and code refers to this as the "I/O thread". Each
//! connection has exactly one I/O thread. The I/O thread uses [mio](https://crates.io/crates/mio)
//! to drive the nonblocking connection. The connection handle and other related handles
//! (particularly [channels](struct.Channel.html)) communicate with the I/O thread via [mio sync
//! channels](https://crates.io/crates/mio_extras) (to the I/O thread) and [crossbeam
//! channels](https://crates.io/crates/crossbeam-channel) (from the I/O thread).
//!
//! Heartbeats are entirely managed by the I/O thread; if heartbeats are enabled and the I/O thread
//! fails to receive communication from the server for too long, it will close the connection.
//!
//! amiquip uses the [log](https://crates.io/crates/log) crate internally. At the `trace` log
//! level, amiquip is quite noisy, but this may be valuable in debugging connection problems.
//!
//! ## Thread Support
//!
//! A [`Connection`](struct.Connection.html) is effectively bound to a single thread (it
//! technically implements both `Send` and `Sync`, but most relevant methods take `&mut self`). A
//! connection can open many [`Channel`](struct.Channel.html)s; a channel can only be used by a
//! single thread (it implements `Send` but not `Sync`). There is no tie between a connection and
//! its channels at the type system level; if the connection is closed (either intentionally or
//! because of an error), all the channels that it opened will end up returning errors shortly
//! thereafter. See the discussion on [`Connection::close`](struct.Connection.html#method.close)
//! for a bit more information about that.
//!
//! A channel is able to produce other handles ([queues](struct.Queue.html),
//! [exchanges](struct.Exchange.html), and [consumers](struct.Consumer.html)). These are mostly
//! thin convenience wrappers around the channel, and they do hold a reference back to the channel
//! that created them. This means if you want to use the connection to open a channel on one thread
//! then move it to another thread to do work, you will need to declare queues, exchanges, and
//! consumers from the thread where work will be done; e.g.,
//!
//! ```rust
//! use amiquip::{Connection, QueueDeclareOptions, ConsumerOptions, Result};
//! use std::thread;
//!
//! fn run_connection(mut connection: Connection) -> Result<()> {
//!     let channel = connection.open_channel(None)?;
//!
//!     // Declaring the queue outside the thread spawn will fail, as it cannot
//!     // be moved into the thread. Instead, wait to declare until inside the new thread.
//!
//!     // Would fail:
//!     // let queue = channel.queue_declare("hello", QueueDeclareOptions::default())?;
//!     thread::spawn(move || -> Result<()> {
//!         // Instead, declare once the channel is moved into this thread.
//!         let queue = channel.queue_declare("hello", QueueDeclareOptions::default())?;
//!         let consumer = queue.consume(ConsumerOptions::default())?;
//!         for message in consumer.receiver().iter() {
//!             // do something with message...
//! #           let _ = message;
//!         }
//!         Ok(())
//!     });
//!
//!     // do something to keep the connection open; if we drop the connection here,
//!     // it will be closed, killing the channel that we just moved into a new thread
//! #   Ok(())
//! }
//! ```
//!
//! # Unsupported Features
//!
//! * Connection recovery. If something goes wrong with a connection, it will be torn down, and
//!   errors will be returned from calls on the connection and any other handles (channels,
//!   consumers, etc.). A connection recovery strategy could be implemented on top of amiquip.
//! * Channel-level flow control. RabbitMQ, as of version 3.7.14 in March 2019, [does not
//!   support](https://www.rabbitmq.com/specification.html#rules) clients requesting channel flow
//!   control, and it does not send channel flow control messages to clients (using TCP backpressure
//!   instead).
//! * Setting up a [`Consumer`](struct.Consumer.html) with a user-provided consumer tag. If this is
//!   something you need, please [file an issue](https://github.com/jgallagher/amiquip/issues).
//! * `nowait` variants of [`Queue::consume`](struct.Queue.html#method.consume) and
//!   [`Consumer::cancel`](struct.Consumer.html#method.cancel). It is unlikely support for these will
//!   be added, as the synchronous versions are used to set up internal channels for consumer
//!   messages.
//! * `nowait` variant of [`Channel::recover`](struct.Channel.html#method.recover). The
//!   asynchronous version of `recover` is marked as deprecated in RabbitMQ's AMQP reference.

mod auth;
mod channel;
mod confirm;
mod connection;
mod connection_options;
mod consumer;
mod delivery;
mod errors;
mod exchange;
mod frame_buffer;
mod get;
mod heartbeats;
mod io_loop;
mod queue;
mod return_;
mod serialize;
mod stream;

pub use auth::{Auth, Sasl};
pub use channel::Channel;
pub use confirm::{Confirm, ConfirmPayload, ConfirmSmoother};
pub use connection::{Connection, ConnectionBlockedNotification, ConnectionTuning};
pub use connection_options::ConnectionOptions;
pub use consumer::{Consumer, ConsumerMessage, ConsumerOptions};
pub use delivery::Delivery;
pub use errors::{Error, Result};
pub use exchange::{Exchange, ExchangeDeclareOptions, ExchangeType, Publish};
pub use get::Get;
pub use queue::{Queue, QueueDeclareOptions, QueueDeleteOptions};
pub use return_::Return;
pub use stream::IoStream;

#[cfg(feature = "native-tls")]
pub use stream::TlsConnector;

pub use amq_protocol::protocol::basic::AMQPProperties as AmqpProperties;
pub use amq_protocol::types::AMQPValue as AmqpValue;
pub use amq_protocol::types::FieldTable;

#[allow(dead_code)]
mod built_info {
    include!(concat!(env!("OUT_DIR"), "/built.rs"));
}

#[cfg(test)]
mod integration_tests;