pub struct Connection { /* private fields */ }
Expand description

An AMQP 1.0 Connection.

§Open a new Connection with default configuration

Below is an example with a local broker ( TestAmqpBroker) listening on the localhost. The broker is executed with the following command

./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1
let connection = Connection::open(
    "connection-1", // container id
    "amqp://guest:guest@localhost:5672" // url with username and password
).await.unwrap();

§Default configuration

FieldDefault Value
max_frame_sizeDEFAULT_MAX_FRAME_SIZE
channel_maxDEFAULT_CHANNEL_MAX
idle_time_outNone
outgoing_localesNone
incoming_localesNone
offered_capabilitiesNone
desired_capabilitiesNone
PropertiesNone

§Order of negotiation

The order of negotiation follows the priority below

  1. TLS
  2. SASL
  3. AMQP

§Customize configuration with Builder

The example above creates a connection with the default configuration. If the user needs to customize the configuration, the connection Builder should be used.

let connection = Connection::builder()
    .container_id("connection-1")
    .max_frame_size(4096)
    .channel_max(64)
    .idle_time_out(50_000 as u32)
    .open("amqp://guest:guest@localhost:5672")
    .await.unwrap();

§TLS

If “amqps” is found in url’s scheme, the connection will start with exchanging TLS protocol header ([‘A’, ‘M’, ‘Q’, ‘P’, 2, 1, 0, 0]). TLS support is only enabled by selecting one and only one of the following feature flags

  1. "rustls": enables TLS support with tokio-rustls
  2. "native-tls": enables TLS support with tokio-native-tls

§Alternative Establishment

The specification allows establishing Connection on a pure TLS stream without exchanging the TLS protocol header, and this can be accomplished using Builder’s open_with_stream. An example of establishing connection on a tokio_native_tls::TlsStream is shown below. The tls_stream can be replaced with a tokio_rustls::client::TlsStream.

let addr = "localhost:5671";
let domain = "localhost";
let stream = TcpStream::connect(addr).await.unwrap();
let connector = native_tls::TlsConnector::new();
let connector = tokio_native_tls::TlsConnector::from(connector);
let tls_stream = connector.connect(domain, stream).await.unwrap();

let mut connection = Connection::builder()
    .container_id("connection-1")
    .scheme("amqp")
    .sasl_profile(SaslProfile::Plain {
        username: "guest".into(),
        password: "guest".into()
    })
    .open_with_stream(tls_stream)
    .await
    .unwrap();

§TLS with feature "rustls" enabled

TLS connection can be established with a default connector or a custom tokio_rustls::TlsConnector. The following connector is used unless a custom connector is supplied to the builder.

let mut root_cert_store = RootCertStore::empty();
root_cert_store.add_server_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.0.iter().map(
    |ta| {
        OwnedTrustAnchor::from_subject_spki_name_constraints(
            ta.subject,
            ta.spki,
            ta.name_constraints,
        )
    },
));
let config = ClientConfig::builder()
    .with_safe_defaults()
    .with_root_certificates(root_cert_store)
    .with_no_client_auth();
let connector = TlsConnector::from(Arc::new(config));

Start TLS connection negotiation with default TLS connector

let connection = Connection::open("example-connection", "amqps://guest:guest@localhost:5671").await.unwrap();

Below shows how to use a custom tokio_rustls::TlsConnector for TLS.

let config = rustls::ClientConfig::builder()
    .with_safe_defaults()
    .with_root_certificates(root_cert_store)
    .with_no_client_auth(); // i guess this was previously the default?
let connector = TlsConnector::from(Arc::new(config));

let connection = Connection::builder()
    .container_id("connection-1")
    .tls_connector(connector)
    .open("amqps://guest:guest@localhost:5671")
    .await.unwrap();

§TLS with feature "native-tls" enabled

TLS connection can be established with a default connector or a custom tokio_native_tls::TlsConnector. The following connector is used unless a custom connector is supplied to the builder.

let connector = native_tls::TlsConnector::new().unwrap();
let connector = tokio_native_tls::TlsConnector::from(connector);

Below shows how to use a custom tokio_native_tls::TlsConnector.

let connector = native_tls::TlsConnector::new().unwrap();
let connector = tokio_native_tls::TlsConnector::from(connector);

let connection = Connection::builder()
    .container_id("connection-1")
    .tls_connector(connector)
    .open("amqps://guest:guest@localhost:5671")
    .await.unwrap();

§SASL

If username and password are supplied with the url, the connection negotiation will start with SASL PLAIN negotiation. Other than filling username and password in the url, one could also supply the information with sasl_profile field of the Builder. Please note that the SASL profile found in the url will override whatever SaslProfile supplied to the Builder.

The examples below shows two ways of starting the connection with SASL negotiation.

  1. Start SASL negotiation with SASL PLAIN profile extracted from the url

    let connection = Connection::open("connection-1", "amqp://guest:guest@localhost:5672").await.unwrap();
  2. Start SASL negotiation with the builder. Please note that tf the url contains username and password, the profile supplied to the builder will be overriden.

    // This is equivalent to the line above
    let profile = SaslProfile::Plain {
        username: "guest".to_string(),
        password: "guest".to_string()
    };
    let connection = Connection::builder()
        .container_id("connection-1")
        .sasl_profile(profile)
        .open("amqp://localhost:5672")
        .await.unwrap();

Implementations§

source§

impl Connection

source

pub fn builder<'a>() -> Builder<'a, ConnectorNoId, ()>

Creates a Builder for Connection

source

pub async fn open( container_id: impl Into<String>, url: impl TryInto<Url, Error = impl Into<OpenError>> ) -> Result<ConnectionHandle<()>, OpenError>

Available on non-WebAssembly only.

Negotiate and open a Connection with the default configuration

§Default configuration
FieldDefault Value
max_frame_sizeDEFAULT_MAX_FRAME_SIZE
channel_maxDEFAULT_CHANNEL_MAX
idle_time_outNone
outgoing_localesNone
incoming_localesNone
offered_capabilitiesNone
desired_capabilitiesNone
PropertiesNone

The negotiation depends on the url supplied.

§Raw AMQP
let connection = Connection::open("connection-1", "amqp://localhost:5672").await.unwrap();
§TLS

TLS support is enabled by selecting one and only one of the following feature flags

  1. "rustls": enables TLS support with tokio-rustls
  2. "native-tls": enables TLS support with tokio-native-tls
let connection = Connection::open("connection-1", "amqps://localhost:5671").await.unwrap();
§TLS with feature "rustls" enabled

TLS connection can be established with a default connector or a custom tokio_rustls::TlsConnector. The following connector is used unless a custom connector is supplied to the builder.

let mut root_cert_store = RootCertStore::empty();
root_cert_store.add_server_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.0.iter().map(
    |ta| {
        OwnedTrustAnchor::from_subject_spki_name_constraints(
            ta.subject,
            ta.spki,
            ta.name_constraints,
        )
    },
));
let config = ClientConfig::builder()
    .with_safe_defaults()
    .with_root_certificates(root_cert_store)
    .with_no_client_auth();
let connector = TlsConnector::from(Arc::new(config));
§TLS with feature "native-tls" enabled

TLS connection can be established with a default connector or a custom tokio_native_tls::TlsConnector. The following connector is used unless a custom connector is supplied to the builder.

let connector = native_tls::TlsConnector::new().unwrap();
let connector = tokio_native_tls::TlsConnector::from(connector);
§SASL

Start SASL negotiation with SASL PLAIN profile extracted from the url

let connection = Connection::open("connection-1", "amqp://guest:guest@localhost:5672").await.unwrap();

Trait Implementations§

source§

impl Debug for Connection

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

impl<T> SendBound for T
where T: Send,