Crate fe2o3_amqp

source ·
Expand description

A rust implementation of AMQP 1.0 protocol based on serde and tokio.

crate_version docs_version discord

§Feature flags

default = []
FeatureDescription
"rustls"enables TLS integration with tokio-rustls and rustls
"native-tls"enables TLS integration with tokio-native-tls and native-tls
"acceptor"enables ConnectionAcceptor, SessionAcceptor, and LinkAcceptor
"transaction"enables Controller, Transaction, OwnedTransaction and control_link_acceptor
"scram"enables SCRAM auth
"tracing"enables logging with tracing
"log"enables logging with log

§Quick start

  1. Client
  2. Listener
  3. WebSocket binding

More examples including one showing how to use it with Azure Service Bus can be found on the GitHub repo.

§Client

Below is an example with a local broker (TestAmqpBroker) listening on the localhost. The broker is executed with the following command

./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1

The following code requires the tokio async runtime added to the dependencies.

use fe2o3_amqp::{Connection, Session, Sender, Receiver};
use fe2o3_amqp::types::messaging::Outcome;

#[tokio::main]
async fn main() {
    let mut connection = Connection::open(
        "connection-1",                     // container id
        "amqp://guest:guest@localhost:5672" // url
    ).await.unwrap();
     
    let mut session = Session::begin(&mut connection).await.unwrap();
     
    // Create a sender
    let mut sender = Sender::attach(
        &mut session,           // Session
        "rust-sender-link-1",   // link name
        "q1"                    // target address
    ).await.unwrap();
     
    // Create a receiver
    let mut receiver = Receiver::attach(
        &mut session,
        "rust-receiver-link-1", // link name
        "q1"                    // source address
    ).await.unwrap();
     
    // Send a message to the broker and wait for outcome (Disposition)
    let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
    outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome

    // Send a message with batchable field set to true
    let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
    let outcome: Outcome = fut.await.unwrap(); // Wait for outcome (Disposition)
    outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome

    // Receive the message from the broker
    let delivery = receiver.recv::<String>().await.unwrap();
    receiver.accept(&delivery).await.unwrap();

    sender.close().await.unwrap(); // Detach sender with closing Detach performatives
    receiver.close().await.unwrap(); // Detach receiver with closing Detach performatives
    session.end().await.unwrap(); // End the session
    connection.close().await.unwrap(); // Close the connection
}

§Listener

use tokio::net::TcpListener;
use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};

#[tokio::main]
async fn main() {
    let tcp_listener = TcpListener::bind("localhost:5672").await.unwrap();
    let connection_acceptor = ConnectionAcceptor::new("example-listener");

    while let Ok((stream, addr)) = tcp_listener.accept().await {
        let mut connection = connection_acceptor.accept(stream).await.unwrap();
        let handle = tokio::spawn(async move {
            let session_acceptor = SessionAcceptor::new();
            while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
                let handle = tokio::spawn(async move {
                    let link_acceptor = LinkAcceptor::new();
                    match link_acceptor.accept(&mut session).await.unwrap() {
                        LinkEndpoint::Sender(sender) => { },
                        LinkEndpoint::Receiver(recver) => { },
                    }
                });
            }
        });
    }
}

§WebSocket

fe2o3-amqp-ws is needed for WebSocket binding

use fe2o3_amqp::{
    types::{messaging::Outcome, primitives::Value},
    Connection, Delivery, Receiver, Sender, Session,
};
use fe2o3_amqp_ws::WebSocketStream;

#[tokio::main]
async fn main() {
    let (ws_stream, _response) = WebSocketStream::connect("ws://localhost:5673")
        .await
        .unwrap();
    let mut connection = Connection::builder()
        .container_id("connection-1")
        .open_with_stream(ws_stream)
        .await
        .unwrap();

    connection.close().await.unwrap();
}

§More examples

More examples of sending and receiving can be found on the GitHub repo. Please note that most examples requires a local broker running. One broker that can be used on Windows is TestAmqpBroker.

§WebAssembly support

Experimental support for wasm32-unknown-unknown target is added since “0.8.11” and requires use of fe2o3-amqp-ws to establish WebSocket connection to the broker. An example of sending and receiving message in a browser tab can be found examples/wasm32-in-browser.

§Components

NameDescription
serde_amqp_deriveCustom derive macro for described types as defined in AMQP1.0 protocol
serde_amqpAMQP1.0 serializer and deserializer as well as primitive types
fe2o3-amqp-typesAMQP1.0 data types
fe2o3-amqpImplementation of AMQP1.0 Connection, Session, and Link
fe2o3-amqp-extExtension types and implementations
fe2o3-amqp-wsWebSocket binding for fe2o3-amqp transport
fe2o3-amqp-managementExperimental implementation of AMQP1.0 management
fe2o3-amqp-cbsExperimental implementation of AMQP1.0 CBS

§Minimum rust version supported

1.75.0

Re-exports§

Modules§

  • acceptoracceptor
    Acceptors for fine control over incoming connections, sessions, and links
  • Implements SCRAM for SASL-SCRAM-SHA-1 and SASL-SCRAM-SHA-256 auth
  • Implements AMQP1.0 Connection
  • Implements frame encoder and decoder
  • Implements AMQP1.0 Link
  • Implements SASL profile
  • Implements AMQP1.0 Session
  • transactiontransaction
    Transaction
  • Implements low level transport framing
  • Re-exporting fe2o3-amqp-types

Traits§

  • SendBoundNon-WebAssembly
    A marker trait to indicate that the type is Send bound in non-wasm32 targets