1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(missing_docs, missing_debug_implementations)]
#![warn(clippy::unused_async)]

//! A rust implementation of AMQP 1.0 protocol based on serde and tokio.
//!
//! [![crate_version](https://img.shields.io/crates/v/fe2o3-amqp.svg?style=flat)](https://crates.io/crates/fe2o3-amqp)
//! [![docs_version](https://img.shields.io/badge/docs-latest-blue.svg?style=flat)](https://docs.rs/fe2o3-amqp/latest/fe2o3_amqp/)
//! [![discord](https://img.shields.io/discord/1016422034592497665?label=&logo=discord&logoColor=ffffff&color=7389D8&labelColor=6A7EC2")](https://discord.gg/YMkaETwnFW)
//!
//! - [Quick Start](#quick-start)
//! - [Documentation](https://docs.rs/fe2o3-amqp)
//! - [Changelog](https://github.com/minghuaw/fe2o3-amqp/blob/main/fe2o3-amqp/Changelog.md)
//! - [Examples](https://github.com/minghuaw/fe2o3-amqp/tree/main/examples)
//! - [0.7 migration guide](https://github.com/minghuaw/fe2o3-amqp/issues/120)
//!
//! # Feature flags
//!
//! ```toml
//! default = []
//! ```
//!
//! | Feature | Description |
//! |---------|-------------|
//! |`"rustls"`| enables TLS integration with `tokio-rustls` and `rustls` |
//! |`"native-tls"`| enables TLS integration with `tokio-native-tls` and `native-tls`|
//! |`"acceptor"`| enables `ConnectionAcceptor`, `SessionAcceptor`, and `LinkAcceptor`|
//! |`"transaction"`| enables `Controller`, `Transaction`, `OwnedTransaction` and `control_link_acceptor` |
//! |`"scram"`| enables SCRAM auth |
//! |`"tracing"`| enables logging with `tracing` |
//! |`"log"`| enables logging with `log` |
//!
//! # Quick start
//!
//! 1. [Client](#client)
//! 2. [Listener](#listener)
//! 3. [WebSocket binding](#websocket)
//!
//! More examples including one showing how to use it with Azure Serivce Bus can be found on the
//! [GitHub repo](https://github.com/minghuaw/fe2o3-amqp/tree/main/examples).
//!
//! ## Client
//!
//! Below is an example with a local broker
//! ([`TestAmqpBroker`](https://github.com/Azure/amqpnetlite/releases/download/test_broker.1609/TestAmqpBroker.zip))
//! listening on the localhost. The broker is executed with the following command
//!
//! ```powershell
//! ./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1
//! ```
//!
//! The following code requires the [`tokio`] async runtime added to the dependencies.
//!
//! ```rust
//! use fe2o3_amqp::{Connection, Session, Sender, Receiver};
//! use fe2o3_amqp::types::messaging::Outcome;
//!
//! #[tokio::main]
//! async fn main() {
//!     let mut connection = Connection::open(
//!         "connection-1",                     // container id
//!         "amqp://guest:guest@localhost:5672" // url
//!     ).await.unwrap();
//!     
//!     let mut session = Session::begin(&mut connection).await.unwrap();
//!     
//!     // Create a sender
//!     let mut sender = Sender::attach(
//!         &mut session,           // Session
//!         "rust-sender-link-1",   // link name
//!         "q1"                    // target address
//!     ).await.unwrap();
//!     
//!     // Create a receiver
//!     let mut receiver = Receiver::attach(
//!         &mut session,
//!         "rust-receiver-link-1", // link name
//!         "q1"                    // source address
//!     ).await.unwrap();
//!     
//!     // Send a message to the broker and wait for outcome (Disposition)
//!     let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
//!     outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome
//!
//!     // Send a message with batchable field set to true
//!     let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
//!     let outcome: Outcome = fut.await.unwrap(); // Wait for outcome (Disposition)
//!     outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome
//!
//!     // Receive the message from the broker
//!     let delivery = receiver.recv::<String>().await.unwrap();
//!     receiver.accept(&delivery).await.unwrap();
//!
//!     sender.close().await.unwrap(); // Detach sender with closing Detach performatives
//!     receiver.close().await.unwrap(); // Detach receiver with closing Detach performatives
//!     session.end().await.unwrap(); // End the session
//!     connection.close().await.unwrap(); // Close the connection
//! }
//! ```
//!
//! ## Listener
//!
//! ```rust
//! use tokio::net::TcpListener;
//! use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};
//!
//! #[tokio::main]
//! async fn main() {
//!     let tcp_listener = TcpListener::bind("localhost:5672").await.unwrap();
//!     let connection_acceptor = ConnectionAcceptor::new("example-listener");
//!
//!     while let Ok((stream, addr)) = tcp_listener.accept().await {
//!         let mut connection = connection_acceptor.accept(stream).await.unwrap();
//!         let handle = tokio::spawn(async move {
//!             let session_acceptor = SessionAcceptor::new();
//!             while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
//!                 let handle = tokio::spawn(async move {
//!                     let link_acceptor = LinkAcceptor::new();
//!                     match link_acceptor.accept(&mut session).await.unwrap() {
//!                         LinkEndpoint::Sender(sender) => { },
//!                         LinkEndpoint::Receiver(recver) => { },
//!                     }
//!                 });
//!             }
//!         });
//!     }
//! }
//! ```
//!
//! ## WebSocket
//!
//! [`fe2o3-amqp-ws`](https://crates.io/crates/fe2o3-amqp-ws) is needed for WebSocket binding
//!
//! ```rust
//! use fe2o3_amqp::{
//!     types::{messaging::Outcome, primitives::Value},
//!     Connection, Delivery, Receiver, Sender, Session,
//! };
//! use fe2o3_amqp_ws::WebSocketStream;
//!
//! #[tokio::main]
//! async fn main() {
//!     let (ws_stream, _response) = WebSocketStream::connect("ws://localhost:5673")
//!         .await
//!         .unwrap();
//!     let mut connection = Connection::builder()
//!         .container_id("connection-1")
//!         .open_with_stream(ws_stream)
//!         .await
//!         .unwrap();
//!
//!     connection.close().await.unwrap();
//! }
//! ```
//!
//! # More examples
//!
//! More examples of sending and receiving can be found on the [GitHub
//! repo](https://github.com/minghuaw/fe2o3-amqp/tree/main/examples/). Please note that most
//! examples requires a local broker running. One broker that can be used on Windows is
//! [TestAmqpBroker](https://azure.github.io/amqpnetlite/articles/hello_amqp.html).
//!
//! # WebAssembly support
//!
//! Experimental support for `wasm32-unknown-unknown` target is added since "0.8.11" and requires use of
//! `fe2o3-amqp-ws` to establish WebSocket connection to the broker. An example of sending and
//! receiving message in a browser tab can be found
//! [examples/wasm32-in-browser](https://github.com/minghuaw/fe2o3-amqp/tree/main/examples/wasm32-in-browser).
//!
//! # Components
//!
//! | Name | Description |
//! |------|-------------|
//! |`serde_amqp_derive`| Custom derive macro for described types as defined in AMQP1.0 protocol |
//! |`serde_amqp`| AMQP1.0 serializer and deserializer as well as primitive types |
//! |`fe2o3-amqp-types`| AMQP1.0 data types |
//! |`fe2o3-amqp`| Implementation of AMQP1.0 `Connection`, `Session`, and `Link` |
//! |`fe2o3-amqp-ext`| Extension types and implementations |
//! |`fe2o3-amqp-ws` | WebSocket binding for `fe2o3-amqp` transport |
//! |`fe2o3-amqp-management`| Experimental implementation of AMQP1.0 management |
//! |`fe2o3-amqp-cbs`| Experimental implementation of AMQP1.0 CBS |
//!
//! # Minimum rust version supported
//!
//! 1.56.0 (ie. 2021 edition)

#[macro_use]
mod macros;

pub(crate) mod control;
pub(crate) mod endpoint;
pub(crate) mod util;

pub mod auth;
pub mod connection;
pub mod frames;
pub mod link;
pub mod sasl_profile;
pub mod session;
pub mod transport;

cfg_acceptor! {
    pub mod acceptor;
}

cfg_transaction! {
    pub mod transaction;
}

pub mod types {
    //! Re-exporting `fe2o3-amqp-types`
    pub use fe2o3_amqp_types::*;
}

pub use connection::Connection;
pub use link::{
    delivery::{Delivery, Sendable},
    Receiver, Sender,
};
pub use session::Session;

type Payload = bytes::Bytes;

cfg_not_wasm32! {
    /// A marker trait to indicate that the type is `Send` bound in non-wasm32 targets
    pub trait SendBound: Send {}
    impl<T> SendBound for T where T: Send {}
}

cfg_wasm32! {
    /// A marker trait that is implemented for all types in wasm32 targets
    pub trait SendBound {}
    impl<T> SendBound for T {}
}