Struct amiquip::Connection
Handle for an AMQP connection.
Opening an AMQP connection creates at least one thread: the I/O thread, which is responsible for driving the underlying socket and communicating back with the Connection handle and any open Channels. An additional thread may be created to handle heartbeat timers; this is an implementation detail that may not be true on all platforms.
Closing the connection (or dropping it) will attempt to join on the I/O thread. This can block; see the close method for notes.
Tuning
Opening a connection requires specifying ConnectionTuning parameters. These control resources and backpressure between the I/O loop thread and its Connection handle and open channels. This structure has three fields:
- mem_channel_bound controls the channel size for communication from a Connection and its channels into the I/O thread. Setting this to 0 means all communications from a handle into the I/O thread will block until the I/O thread is ready to receive the message; setting it to something higher than 0 means messages into the I/O thread will be buffered and will not block. Note that many methods are synchronous in the AMQP sense (e.g., Channel::queue_declare) and will see no benefit from buffering, as they must wait for a response from the I/O thread before they return. This bound may improve performance for asynchronous messages, but see the next two fields.
- buffered_writes_high_water and buffered_writes_low_water control how much outgoing data the I/O thread is willing to buffer up before it starts applying backpressure to the in-memory channels that Connection and its AMQP channels use to send it messages. This prevents unbounded memory growth in the I/O thread if local clients are attempting to send data faster than the AMQP server is able to receive it. If the I/O thread buffers up more than buffered_writes_high_water bytes of data, it will stop polling channels until the amount of data drops below buffered_writes_low_water. These values combine with mem_channel_bound to apply two different kinds of buffering and backpressure.
For example, suppose a connection is used exclusively for publishing data, and it is attempting to publish data as quickly as possible. It sets mem_channel_bound to 16, buffered_writes_high_water to 16 MiB, and buffered_writes_low_water to 1 MiB. Once it has published enough data that the I/O thread crosses the 16 MiB mark for buffered outgoing data, the publisher will be able to send 16 more messages into the I/O thread (note that this does not necessarily mean full data messages, as AMQP messages will be broken up into multiple framed messages internally), at which point additional sends into the I/O thread will block. Once the I/O thread's buffered data amount drops below 1 MiB, it will resume polling the in-memory channel, pulling from the 16 buffered messages, freeing up space and unblocking the publisher.
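The scenario above can be replayed with a small std-only model. This is a sketch, not amiquip's implementation: `WriteBuffer`, `sends_before_blocking`, and `replay_scenario` are hypothetical names, and `std::sync::mpsc::sync_channel` stands in for whatever in-memory channel the library uses internally.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

const MIB: usize = 1024 * 1024;

/// Hypothetical model of the I/O thread's outgoing-byte accounting.
pub struct WriteBuffer {
    buffered: usize,
    high_water: usize,
    low_water: usize,
    polling: bool,
}

impl WriteBuffer {
    pub fn new(high_water: usize, low_water: usize) -> Self {
        WriteBuffer { buffered: 0, high_water, low_water, polling: true }
    }

    /// Bytes were queued for the socket; stop polling above the high-water mark.
    pub fn queue(&mut self, bytes: usize) {
        self.buffered += bytes;
        if self.buffered > self.high_water {
            self.polling = false;
        }
    }

    /// Bytes were written to the socket; resume polling below the low-water mark.
    pub fn flush(&mut self, bytes: usize) {
        self.buffered = self.buffered.saturating_sub(bytes);
        if self.buffered < self.low_water {
            self.polling = true;
        }
    }

    pub fn is_polling(&self) -> bool {
        self.polling
    }
}

/// Counts how many messages can be enqueued before a send would block.
pub fn sends_before_blocking(tx: &SyncSender<Vec<u8>>) -> usize {
    let mut accepted = 0;
    loop {
        match tx.try_send(vec![0u8; 8]) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => return accepted,
        }
    }
}

/// Replays the scenario from the text. Returns (sends accepted while the
/// I/O side is paused, polling state with just over 2 MiB buffered,
/// polling state with 1 byte buffered).
pub fn replay_scenario() -> (usize, bool, bool) {
    let mut buf = WriteBuffer::new(16 * MIB, MIB);
    // Stand-in for mem_channel_bound = 16; `_rx` keeps the channel connected.
    let (tx, _rx) = sync_channel::<Vec<u8>>(16);
    buf.queue(16 * MIB + 1); // cross the high-water mark: polling stops
    let accepted = sends_before_blocking(&tx);
    buf.flush(14 * MIB); // still above the low-water mark
    let above_low_water = buf.is_polling();
    buf.flush(2 * MIB); // now below the low-water mark: polling resumes
    let below_low_water = buf.is_polling();
    (accepted, above_low_water, below_low_water)
}
```

In this model the two limits are independent: the channel bound caps how many messages wait in memory, while the water marks add hysteresis so the I/O side does not flip between polling and pausing on every write.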
Thread Safety
Connection implements both Send and Sync; however, its most useful method (open_channel) takes &mut self, which requires unique ownership. The channels returned by open_channel themselves implement Send but not Sync. After opening a connection on one thread, you are free to create any number of channels and send them to other threads for use. However, they are all tied back to the original connection; when it is closed or dropped, future operations on them will fail.
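The ownership pattern described here (create handles on one thread, move each to a worker, and see failures once the owning side is gone) can be illustrated without a broker. The sketch below uses only the standard library; `FakeChannel`, `publish_from_workers`, and `publish_after_close` are hypothetical stand-ins and do not reflect how amiquip's `Channel` is built.

```rust
use std::cell::Cell;
use std::marker::PhantomData;
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical stand-in for a channel handle: `Send` but not `Sync`.
pub struct FakeChannel {
    id: u16,
    to_io_thread: Sender<String>,
    // `Cell<()>` is `Send` but not `Sync`, so this marker removes `Sync`.
    _not_sync: PhantomData<Cell<()>>,
}

impl FakeChannel {
    /// Sends a message to the owning side; fails once that side is gone.
    pub fn publish(&self, body: &str) -> Result<(), String> {
        self.to_io_thread
            .send(format!("channel {}: {}", self.id, body))
            .map_err(|_| "connection closed".to_string())
    }
}

/// Creates `count` handles on the calling thread, moves each into its own
/// worker thread, and returns what the owning side received (sorted).
pub fn publish_from_workers(count: u16) -> Vec<String> {
    let (tx, rx) = channel();
    let workers: Vec<_> = (1..=count)
        .map(|id| {
            let ch = FakeChannel { id, to_io_thread: tx.clone(), _not_sync: PhantomData };
            // Moving `ch` into the thread compiles only because it is `Send`.
            thread::spawn(move || ch.publish("hello"))
        })
        .collect();
    drop(tx);
    for worker in workers {
        worker.join().unwrap().unwrap();
    }
    let mut received: Vec<String> = rx.iter().collect();
    received.sort();
    received
}

/// Models "closed or dropped": once the receiving side is gone, publishing fails.
pub fn publish_after_close() -> Result<(), String> {
    let (tx, rx) = channel();
    let ch = FakeChannel { id: 1, to_io_thread: tx, _not_sync: PhantomData };
    drop(rx);
    ch.publish("too late")
}
```

Sharing a `&FakeChannel` across threads would be rejected by the compiler because the type is not `Sync`; moving the handle into exactly one thread is the pattern the text describes.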
Methods
impl Connection
pub fn open(url: &str) -> Result<Connection>
Calls open_tuned with default ConnectionTuning settings.
pub fn open_tuned(url: &str, tuning: ConnectionTuning) -> Result<Connection>
Equivalent to insecure_open_tuned, except only secure URLs (amqps://...) are allowed. Calling this method with an insecure (amqp://...) URL will return an error with kind InsecureUrl.
pub fn insecure_open(url: &str) -> Result<Connection>
Calls insecure_open_tuned with default ConnectionTuning settings.
pub fn insecure_open_tuned(
url: &str,
tuning: ConnectionTuning
) -> Result<Connection>
Open an AMQP connection from an amqp://... or amqps://... URL. Mostly follows the RabbitMQ URI Specification, with the following differences:
- If the username and password are omitted, a username/password of guest/guest is used.
- There is no way to specify a vhost of "" (the empty string). Passing a URL without a vhost will result in an open request to the default AMQP vhost of /.
A subset of the RabbitMQ query parameters are supported:
- heartbeat
- connection_timeout
- channel_max
- auth_mechanism (partial); the only allowed value is external, and if this query parameter is given any username or password on the URL will be ignored.
Using amqps URLs requires amiquip to be built with the native-tls feature (which is enabled by default). The TLS-related RabbitMQ query parameters are not supported; use open_tls_stream with a configured TlsConnector if you need control over the TLS configuration.
Examples
    use amiquip::{Auth, Connection, ConnectionOptions, ConnectionTuning, Result};
    use std::time::Duration;

    // Examples below assume a helper function to open a TcpStream from an address string with
    // a signature like this is available:
    //   fn tcp_stream(addr: &str) -> Result<mio::net::TcpStream>;

    // Empty amqp URL is equivalent to default options; handy for initial debugging and
    // development.
    let conn1 = Connection::insecure_open("amqp://")?;
    let conn1 = Connection::insecure_open_stream(
        tcp_stream("localhost:5672")?,
        ConnectionOptions::<Auth>::default(),
        ConnectionTuning::default(),
    )?;

    // All possible options specified in the URL except auth_mechanism=external (which would
    // override the username and password).
    let conn3 = Connection::insecure_open(
        "amqp://user:pass@example.com:12345/myvhost?heartbeat=30&channel_max=1024&connection_timeout=10000",
    )?;
    let conn3 = Connection::insecure_open_stream(
        tcp_stream("example.com:12345")?,
        ConnectionOptions::default()
            .auth(Auth::Plain {
                username: "user".to_string(),
                password: "pass".to_string(),
            })
            .heartbeat(30)
            .channel_max(1024)
            .connection_timeout(Some(Duration::from_millis(10_000))),
        ConnectionTuning::default(),
    )?;
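To make the URL defaults described above concrete, here is a rough std-only model of how such a URL might be decomposed. It is not amiquip's parser: `AmqpUrl`, `parse_amqp_url`, and `parse_secure_only` are hypothetical. The model skips percent-decoding and IPv6 literals, and only reads the heartbeat query parameter. Rejecting a username without a password and defaulting amqps to port 5671 are assumptions of this sketch.

```rust
/// Hypothetical, simplified view of the pieces of an AMQP URL.
#[derive(Debug, PartialEq)]
pub struct AmqpUrl {
    pub secure: bool,
    pub username: String,
    pub password: String,
    pub host: String,
    pub port: u16,
    pub vhost: String,
    pub heartbeat: Option<u16>,
}

pub fn parse_amqp_url(url: &str) -> Result<AmqpUrl, String> {
    let (secure, rest) = if let Some(r) = url.strip_prefix("amqps://") {
        (true, r)
    } else if let Some(r) = url.strip_prefix("amqp://") {
        (false, r)
    } else {
        return Err(format!("unsupported scheme in {}", url));
    };
    // Split off the query string first, then the vhost path.
    let (rest, query) = rest.split_once('?').unwrap_or((rest, ""));
    let (authority, vhost) = match rest.split_once('/') {
        Some((a, v)) if !v.is_empty() => (a, v),
        // No vhost (or an empty one) falls back to the default vhost "/".
        Some((a, _)) => (a, "/"),
        None => (rest, "/"),
    };
    let (userinfo, hostport) = authority.rsplit_once('@').unwrap_or(("", authority));
    // Omitted credentials default to guest/guest.
    let (username, password) = if userinfo.is_empty() {
        ("guest", "guest")
    } else {
        // Assumption: this sketch rejects a username given without a password.
        userinfo
            .split_once(':')
            .ok_or_else(|| "username given without a password".to_string())?
    };
    let (host, port) = match hostport.rsplit_once(':') {
        Some((h, p)) => (h, p.parse::<u16>().map_err(|e| e.to_string())?),
        None => (hostport, if secure { 5671 } else { 5672 }),
    };
    let host = if host.is_empty() { "localhost" } else { host };
    let mut heartbeat = None;
    for pair in query.split('&').filter(|p| !p.is_empty()) {
        if let Some(("heartbeat", value)) = pair.split_once('=') {
            heartbeat = Some(value.parse::<u16>().map_err(|e| e.to_string())?);
        }
    }
    Ok(AmqpUrl {
        secure,
        username: username.to_string(),
        password: password.to_string(),
        host: host.to_string(),
        port,
        vhost: vhost.to_string(),
        heartbeat,
    })
}

/// Models the split between the secure and insecure openers: reject non-TLS URLs.
pub fn parse_secure_only(url: &str) -> Result<AmqpUrl, String> {
    let parsed = parse_amqp_url(url)?;
    if parsed.secure {
        Ok(parsed)
    } else {
        Err("insecure URL".to_string())
    }
}
```

Under this model, `parse_amqp_url("amqp://")` yields guest/guest on localhost:5672 with vhost `/`, matching the defaults the first pair of examples relies on.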
pub fn open_tls_stream<Auth: Sasl, C: Into<TlsConnector>, S: IoStream>(
connector: C,
domain: &str,
stream: S,
options: ConnectionOptions<Auth>,
tuning: ConnectionTuning
) -> Result<Connection>
Open an encrypted AMQP connection on a stream (typically a mio::net::TcpStream) using the provided TlsConnector.
pub fn insecure_open_stream<Auth: Sasl, S: IoStream>(
stream: S,
options: ConnectionOptions<Auth>,
tuning: ConnectionTuning
) -> Result<Connection>
Open an AMQP connection on an insecure stream (typically a mio::net::TcpStream). Consider using open_tls_stream instead, unless you are sure an insecure connection is acceptable (e.g., you're connecting to localhost).
pub fn server_properties(&self) -> &FieldTable
Get the properties reported by the server during the initial AMQP handshake. This typically includes string fields like:
- cluster_name
- copyright
- platform
- product
- version
It also typically includes a nested FieldTable under the key capabilities that describes extensions supported by the server. Relevant capabilities to amiquip include:
- basic.nack: required to use Delivery::nack.
- connection.blocked: required for listen_for_connection_blocked to receive notifications.
- consumer_cancel_notify: if present, the server may cancel consumers (e.g., if its queue is deleted).
- exchange_exchange_bindings: required for Exchange::bind_to_source and related exchange-to-exchange binding methods.
pub fn open_channel(&mut self, channel_id: Option<u16>) -> Result<Channel>
Open an AMQP channel on this connection. If channel_id is Some, the returned channel will have the requested ID if possible, or an error will be returned if that channel ID is not available. If channel_id is None, the connection will choose an available channel ID (unless all available channel IDs are exhausted, in which case this method will return ErrorKind::ExhaustedChannelIds).
The returned channel is tied to this connection in a logical sense but not in any ownership way. For example, it may be passed to a thread for use. However, closing (or dropping, which also closes) this connection will cause operations on all opened channels to fail.
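The ID-selection rules above can be sketched as a small allocator. This is an illustrative model only: `ChannelIds` and `AllocError` are hypothetical names. Picking the lowest free ID when none is requested, and treating `channel_max` as a hard upper bound, are assumptions rather than documented behaviour. Channel 0 is excluded because AMQP reserves it for connection-level traffic.

```rust
use std::collections::BTreeSet;

#[derive(Debug, PartialEq)]
pub enum AllocError {
    /// The requested ID is already in use or out of range.
    Unavailable(u16),
    /// Every ID from 1 to `channel_max` is in use.
    Exhausted,
}

/// Hypothetical bookkeeping for the channel IDs open on one connection.
pub struct ChannelIds {
    in_use: BTreeSet<u16>,
    channel_max: u16,
}

impl ChannelIds {
    pub fn new(channel_max: u16) -> Self {
        ChannelIds { in_use: BTreeSet::new(), channel_max }
    }

    /// `Some(id)` asks for a specific ID; `None` lets the allocator choose.
    pub fn allocate(&mut self, requested: Option<u16>) -> Result<u16, AllocError> {
        let id = match requested {
            Some(id) if id == 0 || id > self.channel_max || self.in_use.contains(&id) => {
                return Err(AllocError::Unavailable(id));
            }
            Some(id) => id,
            // Assumption: choose the lowest free ID, skipping reserved channel 0.
            None => (1..=self.channel_max)
                .find(|id| !self.in_use.contains(id))
                .ok_or(AllocError::Exhausted)?,
        };
        self.in_use.insert(id);
        Ok(id)
    }

    /// Marks an ID as free again, e.g. after its channel is closed.
    pub fn release(&mut self, id: u16) {
        self.in_use.remove(&id);
    }
}
```

With `ChannelIds::new(3)`, requesting `Some(2)` twice succeeds and then fails, and three successful allocations in total exhaust the pool until an ID is released.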
pub fn listen_for_connection_blocked(
&mut self
) -> Result<Receiver<ConnectionBlockedNotification>>
Open a crossbeam channel to receive connection blocked notifications from the server.
There can be only one connection blocked listener. If you call this method a second (or more) time, the I/O thread will drop the sending side of previously returned channels.
Dropping the Receiver returned by this method is harmless. If the I/O loop receives a connection blocked notification and there is no listener registered or the previously-registered listener has been dropped, it will discard the notification.
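The single-listener behaviour can be modelled with a standard-library channel. In this sketch, `IoLoop` and `Notification` are hypothetical stand-ins, and `std::sync::mpsc` replaces the crossbeam channel the method actually returns. The point it illustrates is that registering a new listener drops the previous sender, so the earlier receiver observes a disconnect.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical notification type for this model.
#[derive(Debug, Clone, PartialEq)]
pub enum Notification {
    Blocked(String),
    Unblocked,
}

/// Hypothetical stand-in for the I/O loop's listener bookkeeping.
pub struct IoLoop {
    listener: Option<Sender<Notification>>,
}

impl IoLoop {
    pub fn new() -> Self {
        IoLoop { listener: None }
    }

    /// Registers a listener. Replacing the stored sender drops the previous
    /// one, which disconnects any receiver returned by an earlier call.
    pub fn listen(&mut self) -> Receiver<Notification> {
        let (tx, rx) = channel();
        self.listener = Some(tx);
        rx
    }

    /// Delivers a notification if a live listener exists; otherwise discards
    /// it. Returns whether the notification was delivered.
    pub fn notify(&mut self, notification: Notification) -> bool {
        let delivered = match &self.listener {
            Some(tx) => tx.send(notification).is_ok(),
            None => false,
        };
        if !delivered {
            // The receiver was dropped (or never registered); forget the sender.
            self.listener = None;
        }
        delivered
    }
}
```

After two calls to `listen`, only the second receiver sees notifications; the first one's `recv` returns an error because its sender was dropped. Dropping the second receiver simply causes later notifications to be discarded.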
pub fn close(self) -> Result<()>
Close this connection. This method will join on the I/O thread handle, so it may block for a nontrivial amount of time. If heartbeats are not enabled, it is possible this method could block indefinitely waiting for the server to respond to our close request.
Closing a connection will cause future operations on any channels opened on this connection to fail.
If the I/O thread panics, this method will return an error with its kind set to ErrorKind::IoThreadPanic. (Note: the I/O thread should not panic. If it does, please file an issue.) For this reason, applications that want more detail about errors should separate the use of the connection from closing it. For example:
    use amiquip::{Connection, Result, ErrorKind};

    fn use_connection(connection: &mut Connection) -> Result<()> {
        // ...do all the things...
        Ok(())
    }

    fn run_connection(mut connection: Connection) -> Result<()> {
        // capture any errors from using the connection, but don't immediately return.
        let use_result = use_connection(&mut connection);

        // close the connection; if this fails, it is probably the most useful error
        // message (since it may indicate an I/O thread panic or other fatal error).
        connection.close()?;

        // if close completed successfully, return `use_result`, which might still contain
        // some other kind of error.
        use_result
    }
Trait Implementations
impl Drop for Connection
impl Debug for Connection
Auto Trait Implementations
impl Send for Connection
impl Sync for Connection
Blanket Implementations
impl<T> From<T> for T
impl<T, U> Into<U> for T where
U: From<T>,
impl<T, U> TryFrom<U> for T where
U: Into<T>,
type Error = Infallible
The type returned in the event of a conversion error.
fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>
impl<T> Borrow<T> for T where
T: ?Sized,
impl<T> Any for T where
T: 'static + ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T, U> TryInto<U> for T where
U: TryFrom<T>,