[][src]Crate amiquip

amiquip is a RabbitMQ client written in pure Rust.

amiquip supports most features of the AMQP spec and some RabbitMQ extensions (see a list of currently unsupported features below). It aims to be robust: problems on a channel or connection should lead to a relevant error being raised. Most errors, however, do result in effectively killing the channel or connection on which they occur.

TLS support is enabled by default via the native-tls crate. To enable disable TLS support at build time, disable amiquip's default features:

[dependencies]
amiquip = { version = "0.2", default-features = false }

If you disable TLS support, the methods Connection::open, Connection::open_tuned, and Connection::open_tls_stream will no longer be available, as all three only allow secure connections. The methods Connection::insecure_open, Connection::insecure_open_tuned, and Connection::insecure_open_stream will still be available, as these methods support unencrypted connections.

Examples

A "hello world" publisher:

use amiquip::{Connection, Exchange, Publish, Result};

fn main() -> Result<()> {
    // Open connection.
    let mut connection = Connection::insecure_open("amqp://guest:guest@localhost:5672")?;

    // Open a channel - None says let the library choose the channel ID.
    let channel = connection.open_channel(None)?;

    // Get a handle to the direct exchange on our channel.
    let exchange = Exchange::direct(&channel);

    // Publish a message to the "hello" queue.
    exchange.publish(Publish::new("hello there".as_bytes(), "hello"))?;

    connection.close()
}

A corresponding "hello world" consumer:

// Port of https://www.rabbitmq.com/tutorials/tutorial-one-python.html. Run this
// in one shell, and run the hello_world_publish example in another.
use amiquip::{Connection, ConsumerMessage, ConsumerOptions, QueueDeclareOptions, Result};

fn main() -> Result<()> {
    // Open connection.
    let mut connection = Connection::insecure_open("amqp://guest:guest@localhost:5672")?;

    // Open a channel - None says let the library choose the channel ID.
    let channel = connection.open_channel(None)?;

    // Declare the "hello" queue.
    let queue = channel.queue_declare("hello", QueueDeclareOptions::default())?;

    // Start a consumer.
    let consumer = queue.consume(ConsumerOptions::default())?;
    println!("Waiting for messages. Press Ctrl-C to exit.");

    for (i, message) in consumer.receiver().iter().enumerate() {
        match message {
            ConsumerMessage::Delivery(delivery) => {
                let body = String::from_utf8_lossy(&delivery.body);
                println!("({:>3}) Received [{}]", i, body);
                consumer.ack(delivery)?;
            }
            other => {
                println!("Consumer ended: {:?}", other);
                break;
            }
        }
    }

    connection.close()
}

Both of these examples are ports of the RabbitMQ Hello World tutorial. Additional examples, including ports of the other tutorials from that series, are also available.

Design Details

When a connection is opened, a thread is created to manage all reads and writes on the socket. Other documentation and code refers to this as the "I/O thread". Each connection has exactly one I/O thread. The I/O thread uses mio to drive nonblocking connection. The connection handle and other related handles (particularly channels) communicate with the I/O thread via mio sync channels (to the I/O thread) and crossbeam channels (from the I/O thread).

Heartbeats are entirely managed by the I/O thread; if heartbeats are enabled and the I/O thread fails to receive communication from the server for too long, it will close the connection.

amiquip uses the log crate internally. At the trace log level, amiquip is quite noisy, but this may be valuable in debugging connection problems.

Thread Support

A Connection is effectively bound to a single thread (it technically implements both Send and Sync, but most relevant methods take &mut self). A connection can open many Channels; a channel can only be used by a single thread (it implements Send but not Sync). There is no tie between a connection and its channels at the type system level; if the connection is closed (either intentionally or because of an error), all the channels that it opened will end up returning errors shortly thereafter. See the discussion on Connection::close for a bit more information about that.

A channel is able to produce other handles (queues, exchanges, and consumers). These are mostly thin convenience wrappers around the channel, and they do hold a reference back to the channel that created them. This means if you want to use the connection to open a channel on one thread then move it to another thread to do work, you will need to declare queues, exchanges, and consumers from the thread where work will be done; e.g.,

use amiquip::{Connection, QueueDeclareOptions, ConsumerOptions, Result};
use std::thread;

fn run_connection(mut connection: Connection) -> Result<()> {
    let channel = connection.open_channel(None)?;

    // Declaring the queue outside the thread spawn will fail, as it cannot
    // be moved into the thread. Instead, wait to declare until inside the new thread.

    // Would fail:
    // let queue = channel.queue_declare("hello", QueueDeclareOptions::default())?;
    thread::spawn(move || -> Result<()> {
        // Instead, declare once the channel is moved into this thread.
        let queue = channel.queue_declare("hello", QueueDeclareOptions::default())?;
        let consumer = queue.consume(ConsumerOptions::default())?;
        for message in consumer.receiver().iter() {
            // do something with message...
        }
        Ok(())
    });

    // do something to keep the connection open; if we drop the connection here,
    // it will be closed, killing the channel that we just moved into a new thread
}

Unsupported Features

  • Connection recovery. If something goes wrong with a connection, it will be torn down, and errors will be returned from calls on the connection and any other handles (channels, consumers, etc.). A connection recovery strategy could be implemented on top of amiquip.
  • Channel-level flow control. RabbitMQ, as of version 3.7.14 in March 2019, does not support clients requesting channel flow control, and it does not send channel flow control messages to clients (using TCP backpressure instead).
  • Setting up a Consumer with a user-provided consumer tag. If this is something you need, please file an issue.
  • nowait variants of Queue::consume and Consumer::cancel. It is unlikely support for these will be added, as the synchronous versions are used to set up internal channels for consumer messages.
  • nowait variant of Channel::recover. The asynchronous version of recover is marked as deprecated in RabbitMQ's AMQP reference.

Structs

AmqpProperties

basic properties (Generated)

Channel

Handle for an AMQP channel.

ConfirmPayload

Payload for a publisher confirmation message (either an ack or a nack) from the server.

ConfirmSmoother

Helper to smooth out of order and/or multiple: true publisher confirmation messages.

Connection

Handle for an AMQP connection.

ConnectionOptions

Options that control the overall AMQP connection.

ConnectionTuning

Tuning parameters for the amiquip client.

Consumer

A message consumer associated with an AMQP queue.

ConsumerOptions

Options passed to the server when starting a consumer.

Delivery

A message delivered to a consumer.

Error

An error that can occur from amiquip.

Exchange

Handle for a declared AMQP exchange.

ExchangeDeclareOptions

Options passed to the server when declaring an exchange.

Get

A message delivered in response to a get request.

Publish

Wrapper for a message to be published.

Queue

Handle for a declared AMQP queue.

QueueDeclareOptions

Options passed to the server when declaring a queue.

QueueDeleteOptions

Options passed to the server when deleting a queue.

Return

An unpublished message returned to the publishing channel.

TlsConnector

Newtype wrapper around a native_tls::TlsConnector to make it usable by amiquip's I/O loop.

Enums

AmqpValue

Enumeration referencing the possible AMQP values depending on the types

Auth

Built-in authentication mechanisms.

Confirm

A publisher confirmation message from the server.

ConnectionBlockedNotification

Asynchronous notifications sent by the server when it temporarily blocks a connection, typically due to a resource alarm.

ConsumerMessage

Messages delivered to consumers.

ErrorKind

Specific error cases returned by amiquip.

ExchangeType

Types of AMQP exchanges.

Traits

IoStream

Combination trait for readable, writable streams that can be polled by mio.

Sasl

A trait encapsulating the operations required to authenticate to an AMQP server.

Type Definitions

FieldTable

A Map<String, AMQPValue>

Result

A type alias for handling errors throughout amiquip.