[][src]Struct amiquip::Connection

pub struct Connection { /* fields omitted */ }

Handle for an AMQP connection.

Opening an AMQP connection creates at least one thread - the I/O thread which is responsible for driving the underlying socket and communicating back with the Connection handle and any open Channels. An additional thread may be created to handle heartbeat timers; this is an implementation detail that may not be true on all platforms.

Closing the connection (or dropping it) will attempt to join on the I/O thread. This can block; see the close for notes.

Tuning

Opening a connection requires specifying ConnectionTuning parameters. These control resources and backpressure between the I/O loop thread and its Connection handle and open channels. This structure has three fields:

  • mem_channel_bound controls the channel size for communication from a Connection and its channels into the I/O thread. Setting this to 0 means all communications from a handle into the I/O thread will block until the I/O thread is ready to receive the message; setting it to something higher than 0 means messages into the I/O thread will be buffered and will not block. Note that many methods are synchronous in the AMQP sense (e.g., Channel::queue_declare) and will see no benefit from buffering, as they must wait for a response from the I/O thread before they return. This bound may improve performance for asynchronous messages, but see the next two fields.

  • buffered_writes_high_water and buffered_writes_low_water control how much outgoing data the I/O thread is willing to buffer up before it starts applying backpressure to the in-memory channels that Connection and its AMQP channels use to send it messages. This prevents unbounded memory growth in the I/O thread if local clients are attempting to send data faster than the AMQP server is able to receive it. If the I/O thread buffers up more than buffered_writes_high_water bytes of data, it will stop polling channels until the amount of data drops below buffered_writes_low_water. These values combine with mem_channel_bound to apply two different kinds of buffering and backpressure.

For example, suppose a connection is used exclusively for publishing data, and it is attempting to publish data as quickly as possible. It sets mem_channel_bound to 16, buffered_writes_high_water to 16 MiB, and buffered_writes_low_water to 1 MiB. Once it has published enough data that the I/O thread crosses the 16 MiB mark for buffered outgoing data, the publisher will be able to send 16 more messages into the I/O thread (note that this does necessarily mean full data messages, as AMQP messages will be broken up into muliple framed messages internally), at which point additional sends into the I/O thread will block. Once the I/O thread's buffered data amount drops below 1 MiB, it will resume polling the in-memory channel, pulling from the 16 buffered messages, freeing up space and unblocking the publisher.

Thread Safety

Connection implements both Send and Sync; however, its most useful method (open_channel) takes &mut self, which requires unique ownership. The channels returned by open_channel themselves implement Send but not Sync. After opening a connection one thread, you are free to create any number of channels and send them to other threads for use. However, they are all tied back to the original connection; when it is closed or dropped, future operations on them will fail.

Methods

impl Connection[src]

pub fn open(url: &str) -> Result<Connection>[src]

Calls open_tuned with default ConnectionTuning settings.

pub fn open_tuned(url: &str, tuning: ConnectionTuning) -> Result<Connection>[src]

Equivalent to insecure_open_tuned, except only secure URLs (amqps://...) are allowed. Calling this method with an insecure (amqp://...) URL will return an error with kind InsecureUrl.

pub fn insecure_open(url: &str) -> Result<Connection>[src]

Calls insecure_open_tuned with default ConnectionTuning settings.

pub fn insecure_open_tuned(
    url: &str,
    tuning: ConnectionTuning
) -> Result<Connection>
[src]

Open an AMQP connection from an amqp://... or amqps://... URL. Mostly follows the RabbitMQ URI Specification, with the following differences:

  • If the username and password are omitted, a username/password of guest/guest is used.
  • There is no way to specify a vhost of "" (the empty string). Passing a URL without a vhost will result in an open request to the default AMQP vhost of /.

A subset of the RabbitMQ query parameters are supported:

  • heartbeat
  • connection_timeout
  • channel_max
  • auth_mechanism (partial); the only allowed value is external, and if this query parameter is given any username or password on the URL will be ignored.

Using amqps URLs requires amiquip to be built with the native-tls feature (which is enabled by default). The TLS-related RabbitMQ query parameters are not supported; use open_tls_stream with a configured TlsConnector if you need control over the TLS configuration.

Examples

use amiquip::{Auth, Connection, ConnectionOptions, ConnectionTuning, Result};
use std::time::Duration;

// Examples below assume a helper function to open a TcpStream from an address string with
// a signature like this is available:
//   fn tcp_stream(addr: &str) -> Result<mio::net::TcpStream>;

// Empty amqp URL is equivalent to default options; handy for initial debugging and
// development.
let conn1 = Connection::insecure_open("amqp://")?;
let conn1 = Connection::insecure_open_stream(
    tcp_stream("localhost:5672")?,
    ConnectionOptions::<Auth>::default(),
    ConnectionTuning::default(),
)?;

// All possible options specified in the URL except auth_mechanism=external (which would
// override the username and password).
let conn3 = Connection::insecure_open(
    "amqp://user:pass@example.com:12345/myvhost?heartbeat=30&channel_max=1024&connection_timeout=10000",
)?;
let conn3 = Connection::insecure_open_stream(
    tcp_stream("example.com:12345")?,
    ConnectionOptions::default()
        .auth(Auth::Plain {
            username: "user".to_string(),
            password: "pass".to_string(),
        })
        .heartbeat(30)
        .channel_max(1024)
        .connection_timeout(Some(Duration::from_millis(10_000))),
    ConnectionTuning::default(),
)?;

pub fn open_tls_stream<Auth: Sasl, C: Into<TlsConnector>, S: IoStream>(
    connector: C,
    domain: &str,
    stream: S,
    options: ConnectionOptions<Auth>,
    tuning: ConnectionTuning
) -> Result<Connection>
[src]

Open an encrypted AMQP connection on a stream (typically a mio::net::TcpStream) using the provided TlsConnector.

pub fn insecure_open_stream<Auth: Sasl, S: IoStream>(
    stream: S,
    options: ConnectionOptions<Auth>,
    tuning: ConnectionTuning
) -> Result<Connection>
[src]

Open an AMQP connection on an insecure stream (typically a mio::net::TcpStream).

Consider using open_tls_stream instead, unless you are sure an insecure connection is acceptable (e.g., you're connecting to localhost).

pub fn server_properties(&self) -> &FieldTable[src]

Get the properties reported by the server during the initial AMQP handshake. This typically includes string fields like:

  • cluster_name
  • copyright
  • platform
  • product
  • version

It also typically includes a nested FieldTable under the key capabilities that describes extensions supported by the server. Relevant capabilities to amiquip include:

  • basic.nack - required to use Delivery::nack
  • connection.blocked - required for listen_for_connection_blocked to receive notifications.
  • consumer_cancel_notify - if present, the server may cancel consumers (e.g., if its queue is deleted)
  • exchange_exchange_bindings - required for Exchange::bind_to_source and related exchange-to-exchange binding methods.

pub fn open_channel(&mut self, channel_id: Option<u16>) -> Result<Channel>[src]

Open an AMQP channel on this connection. If channel_id is Some, the returned channel will have the request ID if possible, or an error will be returned if that channel ID not available. If channel_id is None, the connection will choose an available channel ID (unless all available channel IDs are exhausted, in which case this method will return ErrorKind::ExhaustedChannelIds.

The returned channel is tied to this connection in a logical sense but not in any ownership way. For example, it may be passed to a thread for use. However, closing (or dropping, which also closes) this connection will cause operations on all opened channels to fail.

pub fn listen_for_connection_blocked(
    &mut self
) -> Result<Receiver<ConnectionBlockedNotification>>
[src]

Open a crossbeam channel to receive connection blocked notifications from the server.

There can be only one connection blocked listener. If you call this method a second (or more) time, the I/O thread will drop the sending side of previously returned channels.

Dropping the Receiver returned by this method is harmless. If the I/O loop receives a connection blocked notification and there is no listener registered or the previously-registered listener has been dropped, it will discard the notification.

pub fn close(self) -> Result<()>[src]

Close this connection. This method will join on the I/O thread handle, so it may block for a nontrivial amount of time. If heartbeats are not enabled, it is possible this method could block indefinitely waiting for the server to respond to our close request.

Closing a connection will cause future operations on any channels opened on this connection to fail.

If the I/O thread panics, this method will return an error with its kind set to ErrorKind::IoThreadPanic. (Note - the I/O thread should not panic. If it does, please file an issue.) For this reason, applications that want more detail about errors to separate the use of the connection from closing it. For example:

use amiquip::{Connection, Result, ErrorKind};

fn use_connection(connection: &mut Connection) -> Result<()> {
    // ...do all the things...
}

fn run_connection(mut connection: Connection) -> Result<()> {
    // capture any errors from using the connection, but don't immediately return.
    let use_result = use_connection(&mut connection);

    // close the connection; if this fails, it is probably the most useful error
    // message (since it may indicate an I/O thread panic or other fatal error).
    connection.close()?;

    // if close completed succsssfully, return `use_result`, which might still contain
    // some other kind of error.
    use_result
}

Trait Implementations

impl Drop for Connection[src]

impl Debug for Connection[src]

Auto Trait Implementations

impl Send for Connection

impl Sync for Connection

Blanket Implementations

impl<T> From for T[src]

impl<T, U> Into for T where
    U: From<T>, 
[src]

impl<T, U> TryFrom for T where
    U: Into<T>, 
[src]

type Error = Infallible

The type returned in the event of a conversion error.

impl<T> Borrow for T where
    T: ?Sized
[src]

impl<T> Any for T where
    T: 'static + ?Sized
[src]

impl<T> BorrowMut for T where
    T: ?Sized
[src]

impl<T, U> TryInto for T where
    U: TryFrom<T>, 
[src]

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.