Crate fe2o3_amqp

source ·
Expand description

A rust implementation of AMQP 1.0 protocol based on serde and tokio.

crate_version docs_version

Feature flags

default = []
FeatureDescription
"rustls"enables TLS integration with tokio-rustls and rustls
"native-tls"enables TLS integration with tokio-native-tls and native-tls
"acceptor"enables ConnectionAcceptor, SessionAcceptor, and LinkAcceptor
"transaction"enables Controller, Transaction, OwnedTransaction and control_link_acceptor
"scram"enables SCRAM auth
"tracing"enables logging with tracing
"log"enables logging with log

Quick start

  1. Client
  2. Listener
  3. WebSocket binding

More examples including one showing how to use it with Azure Serivce Bus can be found on the GitHub repo.

Client

Below is an example with a local broker (TestAmqpBroker) listening on the localhost. The broker is executed with the following command

./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1

The following code requires the tokio async runtime added to the dependencies.

use fe2o3_amqp::{Connection, Session, Sender, Receiver};
use fe2o3_amqp::types::messaging::Outcome;

#[tokio::main]
async fn main() {
    let mut connection = Connection::open(
        "connection-1",                     // container id
        "amqp://guest:guest@localhost:5672" // url
    ).await.unwrap();
     
    let mut session = Session::begin(&mut connection).await.unwrap();
     
    // Create a sender
    let mut sender = Sender::attach(
        &mut session,           // Session
        "rust-sender-link-1",   // link name
        "q1"                    // target address
    ).await.unwrap();
     
    // Create a receiver
    let mut receiver = Receiver::attach(
        &mut session,
        "rust-receiver-link-1", // link name
        "q1"                    // source address
    ).await.unwrap();
     
    // Send a message to the broker and wait for outcome (Disposition)
    let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
    outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome

    // Send a message with batchable field set to true
    let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
    let outcome: Outcome = fut.await.unwrap(); // Wait for outcome (Disposition)
    outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome

    // Receive the message from the broker
    let delivery = receiver.recv::<String>().await.unwrap();
    receiver.accept(&delivery).await.unwrap();

    sender.close().await.unwrap(); // Detach sender with closing Detach performatives
    receiver.close().await.unwrap(); // Detach receiver with closing Detach performatives
    session.end().await.unwrap(); // End the session
    connection.close().await.unwrap(); // Close the connection
}

Listener

use tokio::net::TcpListener;
use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};

#[tokio::main]
async fn main() {
    let tcp_listener = TcpListener::bind("localhost:5672").await.unwrap();
    let connection_acceptor = ConnectionAcceptor::new("example-listener");

    while let Ok((stream, addr)) = tcp_listener.accept().await {
        let mut connection = connection_acceptor.accept(stream).await.unwrap();
        let handle = tokio::spawn(async move {
            let session_acceptor = SessionAcceptor::new();
            while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
                let handle = tokio::spawn(async move {
                    let link_acceptor = LinkAcceptor::new();
                    match link_acceptor.accept(&mut session).await.unwrap() {
                        LinkEndpoint::Sender(sender) => { },
                        LinkEndpoint::Receiver(recver) => { },
                    }
                });
            }
        });
    }
}

WebSocket

fe2o3-amqp-ws is needed for WebSocket binding

use fe2o3_amqp::{
    types::{messaging::Outcome, primitives::Value},
    Connection, Delivery, Receiver, Sender, Session,
};
use fe2o3_amqp_ws::WebSocketStream;

#[tokio::main]
async fn main() {
    let (ws_stream, _response) = WebSocketStream::connect("ws://localhost:5673")
        .await
        .unwrap();
    let mut connection = Connection::builder()
        .container_id("connection-1")
        .open_with_stream(ws_stream)
        .await
        .unwrap();

    connection.close().await.unwrap();
}

More examples

More examples of sending and receiving can be found on the GitHub repo. Please note that most examples requires a local broker running. One broker that can be used on Windows is TestAmqpBroker.

WebAssembly support

Experimental support for wasm32-unknown-unknown target is added since “0.8.11” and requires use of fe2o3-amqp-ws to establish WebSocket connection to the broker. An example of sending and receiving message in a browser tab can be found examples/wasm32-in-browser.

Components

NameDescription
serde_amqp_deriveCustom derive macro for described types as defined in AMQP1.0 protocol
serde_amqpAMQP1.0 serializer and deserializer as well as primitive types
fe2o3-amqp-typesAMQP1.0 data types
fe2o3-amqpImplementation of AMQP1.0 Connection, Session, and Link
fe2o3-amqp-extExtension types and implementations
fe2o3-amqp-wsWebSocket binding for fe2o3-amqp transport
fe2o3-amqp-managementExperimental implementation of AMQP1.0 management
fe2o3-amqp-cbsExperimental implementation of AMQP1.0 CBS

Minimum rust version supported

1.56.0 (ie. 2021 edition)

Re-exports

pub use connection::Connection;
pub use link::delivery::Delivery;
pub use link::delivery::Sendable;
pub use link::Receiver;
pub use link::Sender;
pub use session::Session;

Modules

acceptoracceptor
Acceptors for fine control over incoming connections, sessions, and links
Implements SCRAM for SASL-SCRAM-SHA-1 and SASL-SCRAM-SHA-256 auth
Implements AMQP1.0 Connection
Implements frame encoder and decoder
Implements AMQP1.0 Link
Implements SASL profile
Implements AMQP1.0 Session
transactiontransaction
Transaction
Implements low level transport framing
Re-exporting fe2o3-amqp-types

Traits

A marker trait to indicate that the type is Send bound in non-wasm32 targets