1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(missing_docs, missing_debug_implementations)]
#![warn(clippy::unused_async)]
//! A rust implementation of AMQP 1.0 protocol based on serde and tokio.
//!
//! [crates.io](https://crates.io/crates/fe2o3-amqp) |
//! [docs.rs](https://docs.rs/fe2o3-amqp/latest/fe2o3_amqp/) |
//! [Discord](https://discord.gg/YMkaETwnFW)
//!
//! - [Quick Start](#quick-start)
//! - [Documentation](https://docs.rs/fe2o3-amqp)
//! - [Changelog](https://github.com/minghuaw/fe2o3-amqp/blob/main/fe2o3-amqp/Changelog.md)
//! - [Examples](https://github.com/minghuaw/fe2o3-amqp/tree/main/examples)
//! - [0.7 migration guide](https://github.com/minghuaw/fe2o3-amqp/issues/120)
//!
//! # Feature flags
//!
//! ```toml
//! default = []
//! ```
//!
//! | Feature | Description |
//! |---------|-------------|
//! |`"rustls"`| enables TLS integration with `tokio-rustls` and `rustls` |
//! |`"native-tls"`| enables TLS integration with `tokio-native-tls` and `native-tls`|
//! |`"acceptor"`| enables `ConnectionAcceptor`, `SessionAcceptor`, and `LinkAcceptor`|
//! |`"transaction"`| enables `Controller`, `Transaction`, `OwnedTransaction` and `control_link_acceptor` |
//! |`"scram"`| enables SCRAM auth |
//! |`"tracing"`| enables logging with `tracing` |
//! |`"log"`| enables logging with `log` |
//!
//! # Quick start
//!
//! 1. [Client](#client)
//! 2. [Listener](#listener)
//! 3. [WebSocket binding](#websocket)
//!
//! More examples, including one showing how to use it with Azure Service Bus, can be found on the
//! [GitHub repo](https://github.com/minghuaw/fe2o3-amqp/tree/main/examples).
//!
//! ## Client
//!
//! Below is an example with a local broker
//! ([`TestAmqpBroker`](https://github.com/Azure/amqpnetlite/releases/download/test_broker.1609/TestAmqpBroker.zip))
//! listening on localhost. The broker is started with the following command:
//!
//! ```powershell
//! ./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1
//! ```
//!
//! The following code requires the [`tokio`] async runtime added to the dependencies.
//!
//! ```rust
//! use fe2o3_amqp::{Connection, Session, Sender, Receiver};
//! use fe2o3_amqp::types::messaging::Outcome;
//!
//! #[tokio::main]
//! async fn main() {
//! let mut connection = Connection::open(
//! "connection-1", // container id
//! "amqp://guest:guest@localhost:5672" // url
//! ).await.unwrap();
//!
//! let mut session = Session::begin(&mut connection).await.unwrap();
//!
//! // Create a sender
//! let mut sender = Sender::attach(
//! &mut session, // Session
//! "rust-sender-link-1", // link name
//! "q1" // target address
//! ).await.unwrap();
//!
//! // Create a receiver
//! let mut receiver = Receiver::attach(
//! &mut session,
//! "rust-receiver-link-1", // link name
//! "q1" // source address
//! ).await.unwrap();
//!
//! // Send a message to the broker and wait for outcome (Disposition)
//! let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
//! outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome
//!
//! // Send a message with batchable field set to true
//! let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
//! let outcome: Outcome = fut.await.unwrap(); // Wait for outcome (Disposition)
//! outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome
//!
//! // Receive the message from the broker
//! let delivery = receiver.recv::<String>().await.unwrap();
//! receiver.accept(&delivery).await.unwrap();
//!
//! sender.close().await.unwrap(); // Detach sender with closing Detach performatives
//! receiver.close().await.unwrap(); // Detach receiver with closing Detach performatives
//! session.end().await.unwrap(); // End the session
//! connection.close().await.unwrap(); // Close the connection
//! }
//! ```
//!
//! ## Listener
//!
//! ```rust
//! use tokio::net::TcpListener;
//! use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};
//!
//! #[tokio::main]
//! async fn main() {
//! let tcp_listener = TcpListener::bind("localhost:5672").await.unwrap();
//! let connection_acceptor = ConnectionAcceptor::new("example-listener");
//!
//! while let Ok((stream, addr)) = tcp_listener.accept().await {
//! let mut connection = connection_acceptor.accept(stream).await.unwrap();
//! let handle = tokio::spawn(async move {
//! let session_acceptor = SessionAcceptor::new();
//! while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
//! let handle = tokio::spawn(async move {
//! let link_acceptor = LinkAcceptor::new();
//! match link_acceptor.accept(&mut session).await.unwrap() {
//! LinkEndpoint::Sender(sender) => { },
//! LinkEndpoint::Receiver(recver) => { },
//! }
//! });
//! }
//! });
//! }
//! }
//! ```
//!
//! ## WebSocket
//!
//! [`fe2o3-amqp-ws`](https://crates.io/crates/fe2o3-amqp-ws) is needed for WebSocket binding
//!
//! ```rust
//! use fe2o3_amqp::{
//! types::{messaging::Outcome, primitives::Value},
//! Connection, Delivery, Receiver, Sender, Session,
//! };
//! use fe2o3_amqp_ws::WebSocketStream;
//!
//! #[tokio::main]
//! async fn main() {
//! let (ws_stream, _response) = WebSocketStream::connect("ws://localhost:5673")
//! .await
//! .unwrap();
//! let mut connection = Connection::builder()
//! .container_id("connection-1")
//! .open_with_stream(ws_stream)
//! .await
//! .unwrap();
//!
//! connection.close().await.unwrap();
//! }
//! ```
//!
//! # More examples
//!
//! More examples of sending and receiving can be found on the [GitHub
//! repo](https://github.com/minghuaw/fe2o3-amqp/tree/main/examples/). Please note that most
//! examples require a running local broker. One broker that can be used on Windows is
//! [TestAmqpBroker](https://azure.github.io/amqpnetlite/articles/hello_amqp.html).
//!
//! # WebAssembly support
//!
//! Experimental support for the `wasm32-unknown-unknown` target has been available since version
//! 0.8.11 and requires the use of `fe2o3-amqp-ws` to establish a WebSocket connection to the
//! broker. An example of sending and receiving messages in a browser tab can be found in
//! [examples/wasm32-in-browser](https://github.com/minghuaw/fe2o3-amqp/tree/main/examples/wasm32-in-browser).
//!
//! # Components
//!
//! | Name | Description |
//! |------|-------------|
//! |`serde_amqp_derive`| Custom derive macro for described types as defined in AMQP1.0 protocol |
//! |`serde_amqp`| AMQP1.0 serializer and deserializer as well as primitive types |
//! |`fe2o3-amqp-types`| AMQP1.0 data types |
//! |`fe2o3-amqp`| Implementation of AMQP1.0 `Connection`, `Session`, and `Link` |
//! |`fe2o3-amqp-ext`| Extension types and implementations |
//! |`fe2o3-amqp-ws` | WebSocket binding for `fe2o3-amqp` transport |
//! |`fe2o3-amqp-management`| Experimental implementation of AMQP1.0 management |
//! |`fe2o3-amqp-cbs`| Experimental implementation of AMQP1.0 CBS |
//!
//! # Minimum rust version supported
//!
//! 1.56.0 (i.e. 2021 edition)
// Crate-internal macros. `#[macro_use]` uses textual scoping, so the macros are only usable by
// code that appears after this declaration; keep it ahead of the other modules.
// NOTE(review): the `cfg_*!` macros invoked in this file are presumably defined here — confirm.
#[macro_use]
mod macros;
// Modules visible only inside this crate.
pub(crate) mod control;
pub(crate) mod endpoint;
pub(crate) mod util;
// Modules that are part of the public API and always compiled.
pub mod auth;
pub mod connection;
pub mod frames;
pub mod link;
pub mod sasl_profile;
pub mod session;
pub mod transport;
// NOTE(review): presumably compiled only when the "acceptor" feature is enabled (see the
// feature table in the crate docs) — confirm against the macro definition.
cfg_acceptor! {
pub mod acceptor;
}
// NOTE(review): presumably compiled only when the "transaction" feature is enabled (see the
// feature table in the crate docs) — confirm against the macro definition.
cfg_transaction! {
pub mod transaction;
}
/// Re-exports every public item of the `fe2o3-amqp-types` crate, so the AMQP 1.0 data types
/// can be reached through this crate without a separate dependency.
pub mod types {
    pub use fe2o3_amqp_types::*;
}
// Re-export the main entry-point types at the crate root so that users can write
// `use fe2o3_amqp::{Connection, Session, Sender, Receiver};` as shown in the quick start.
pub use connection::Connection;
pub use link::{
delivery::{Delivery, Sendable},
Receiver, Sender,
};
pub use session::Session;
// Crate-private shorthand for `bytes::Bytes`.
// NOTE(review): presumably the buffer type for frame/message payloads — confirm at use sites.
type Payload = bytes::Bytes;
cfg_not_wasm32! {
    /// Marker trait that stands in for a [`Send`] bound on targets other than wasm32.
    ///
    /// It has `Send` as a supertrait and is blanket-implemented for every `T: Send`, so it
    /// never needs to be implemented by hand.
    pub trait SendBound: Send {}

    impl<T: Send> SendBound for T {}
}
// Counterpart of the `SendBound` definition in the `cfg_not_wasm32!` block: same trait name,
// but without the `Send` supertrait.
// NOTE(review): presumably expanded only when compiling for wasm32 — confirm in `macros`.
cfg_wasm32! {
/// A marker trait that is blanket-implemented for every sized type on wasm32 targets.
///
/// It carries no `Send` requirement, so bounds written as `T: SendBound` accept any such type.
pub trait SendBound {}
impl<T> SendBound for T {}
}