Crate fe2o3_amqp
source ·Expand description
A rust implementation of AMQP 1.0 protocol based on serde and tokio.
§Feature flags
default = []
Feature | Description |
---|---|
"rustls" | enables TLS integration with tokio-rustls and rustls |
"native-tls" | enables TLS integration with tokio-native-tls and native-tls |
"acceptor" | enables ConnectionAcceptor , SessionAcceptor , and LinkAcceptor |
"transaction" | enables Controller , Transaction , OwnedTransaction and control_link_acceptor |
"scram" | enables SCRAM auth |
"tracing" | enables logging with tracing |
"log" | enables logging with log |
§Quick start
More examples including one showing how to use it with Azure Service Bus can be found on the GitHub repo.
§Client
Below is an example with a local broker
(TestAmqpBroker
)
listening on the localhost. The broker is executed with the following command
./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1
The following code requires the tokio
async runtime added to the dependencies.
use fe2o3_amqp::{Connection, Session, Sender, Receiver};
use fe2o3_amqp::types::messaging::Outcome;
#[tokio::main]
async fn main() {
let mut connection = Connection::open(
"connection-1", // container id
"amqp://guest:guest@localhost:5672" // url
).await.unwrap();
let mut session = Session::begin(&mut connection).await.unwrap();
// Create a sender
let mut sender = Sender::attach(
&mut session, // Session
"rust-sender-link-1", // link name
"q1" // target address
).await.unwrap();
// Create a receiver
let mut receiver = Receiver::attach(
&mut session,
"rust-receiver-link-1", // link name
"q1" // source address
).await.unwrap();
// Send a message to the broker and wait for outcome (Disposition)
let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome
// Send a message with batchable field set to true
let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
let outcome: Outcome = fut.await.unwrap(); // Wait for outcome (Disposition)
outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome
// Receive the message from the broker
let delivery = receiver.recv::<String>().await.unwrap();
receiver.accept(&delivery).await.unwrap();
sender.close().await.unwrap(); // Detach sender with closing Detach performatives
receiver.close().await.unwrap(); // Detach receiver with closing Detach performatives
session.end().await.unwrap(); // End the session
connection.close().await.unwrap(); // Close the connection
}
§Listener
use tokio::net::TcpListener;
use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};
#[tokio::main]
async fn main() {
let tcp_listener = TcpListener::bind("localhost:5672").await.unwrap();
let connection_acceptor = ConnectionAcceptor::new("example-listener");
while let Ok((stream, addr)) = tcp_listener.accept().await {
let mut connection = connection_acceptor.accept(stream).await.unwrap();
let handle = tokio::spawn(async move {
let session_acceptor = SessionAcceptor::new();
while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
let handle = tokio::spawn(async move {
let link_acceptor = LinkAcceptor::new();
match link_acceptor.accept(&mut session).await.unwrap() {
LinkEndpoint::Sender(sender) => { },
LinkEndpoint::Receiver(recver) => { },
}
});
}
});
}
}
§WebSocket
fe2o3-amqp-ws
is needed for WebSocket binding
use fe2o3_amqp::{
types::{messaging::Outcome, primitives::Value},
Connection, Delivery, Receiver, Sender, Session,
};
use fe2o3_amqp_ws::WebSocketStream;
#[tokio::main]
async fn main() {
let (ws_stream, _response) = WebSocketStream::connect("ws://localhost:5673")
.await
.unwrap();
let mut connection = Connection::builder()
.container_id("connection-1")
.open_with_stream(ws_stream)
.await
.unwrap();
connection.close().await.unwrap();
}
§More examples
More examples of sending and receiving can be found on the GitHub repo. Please note that most examples requires a local broker running. One broker that can be used on Windows is TestAmqpBroker.
§WebAssembly support
Experimental support for wasm32-unknown-unknown
target is added since “0.8.11” and requires use of
fe2o3-amqp-ws
to establish WebSocket connection to the broker. An example of sending and
receiving message in a browser tab can be found
examples/wasm32-in-browser.
§Components
Name | Description |
---|---|
serde_amqp_derive | Custom derive macro for described types as defined in AMQP1.0 protocol |
serde_amqp | AMQP1.0 serializer and deserializer as well as primitive types |
fe2o3-amqp-types | AMQP1.0 data types |
fe2o3-amqp | Implementation of AMQP1.0 Connection , Session , and Link |
fe2o3-amqp-ext | Extension types and implementations |
fe2o3-amqp-ws | WebSocket binding for fe2o3-amqp transport |
fe2o3-amqp-management | Experimental implementation of AMQP1.0 management |
fe2o3-amqp-cbs | Experimental implementation of AMQP1.0 CBS |
§Minimum rust version supported
1.75.0
Re-exports§
pub use connection::Connection;
pub use link::delivery::Delivery;
pub use link::delivery::Sendable;
pub use link::Receiver;
pub use link::Sender;
pub use session::Session;
Modules§
- acceptor
acceptor
Acceptors for fine control over incoming connections, sessions, and links - Implements SCRAM for SASL-SCRAM-SHA-1 and SASL-SCRAM-SHA-256 auth
- Implements AMQP1.0 Connection
- Implements frame encoder and decoder
- Implements AMQP1.0 Link
- Implements SASL profile
- Implements AMQP1.0 Session
- transaction
transaction
Transaction - Implements low level transport framing
- Re-exporting
fe2o3-amqp-types
Traits§
- Send
Bound Non-WebAssembly A marker trait to indicate that the type isSend
bound in non-wasm32 targets