fe2o3_amqp_ws/lib.rs
1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![deny(missing_docs, missing_debug_implementations)]
3
4//! WebSocket adapter for AMQP 1.0 websocket binding
5//!
6//! This provides a thin wrapper over `tokio_tungstenite::WebSocketStream`, and the wrapper performs
7//! the WebSocket handshake with the "Sec-WebSocket-Protocol" HTTP header set to "amqp".
8//!
//! The wrapper type [`WebSocketStream`] can also be used for non-AMQP applications; in that case,
//! the user should establish the WebSocket stream with the raw `tokio_tungstenite` API and then
//! wrap that stream via `fe2o3_amqp_ws::WebSocketStream::from(ws_stream)`.
12//!
13//! # Feature flags
14//!
15//! ```toml
16//! default = []
17//! ```
18//!
19//! | Feature | Description |
20//! |---------|-------------|
21//! | `native-tls` | Enables "tokio-tungstenite/native-tls" |
22//! | `native-tls-vendored` | Enables "tokio-tungstenite/native-tls-vendored" |
23//! | `rustls-tls-native-roots` | Enables "tokio-tungstenite/rustls-tls-native-roots" |
24//! | `rustls-tls-webpki-roots` | Enables "tokio-tungstenite/rustls-tls-webpki-roots" |
25//!
26//! # Example
27//!
28//! ```rust,no_run
29//! use fe2o3_amqp::{
30//! types::{messaging::Outcome, primitives::Value},
31//! Connection, Delivery, Receiver, Sender, Session,
32//! };
33//! use fe2o3_amqp_ws::WebSocketStream;
34//!
35//! #[tokio::main]
36//! async fn main() {
37//! let ws_stream = WebSocketStream::connect("ws://localhost:5673")
38//! .await
39//! .unwrap();
40//! let mut connection = Connection::builder()
41//! .container_id("connection-1")
42//! .open_with_stream(ws_stream)
43//! .await
44//! .unwrap();
45//! let mut session = Session::begin(&mut connection).await.unwrap();
46//!
47//! let mut sender = Sender::attach(&mut session, "rust-sender-link-1", "q1")
48//! .await
49//! .unwrap();
50//! let mut receiver = Receiver::attach(&mut session, "rust-recver-1", "q1")
51//! .await
52//! .unwrap();
53//!
54//! let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
55//!
56//! let delivery: Delivery<Value> = receiver.recv().await.unwrap();
57//! receiver.accept(&delivery).await.unwrap();
58//!
59//! let outcome: Outcome = fut.await.unwrap();
60//! outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome
61//!
62//! sender.close().await.unwrap();
63//! receiver.close().await.unwrap();
64//! session.end().await.unwrap();
65//! connection.close().await.unwrap();
66//! }
67//! ```
68//!
69//! ## WebAssembly support
70//!
71//! Experimental support for `wasm32-unknown-unknown` target has been added since "0.3.0" and uses a
72//! `web_sys::WebSocket` internally. An example of this can be found in
73//! [examples/wasm32-in-browser](https://github.com/minghuaw/fe2o3-amqp/tree/main/examples/wasm32-in-browser).
74
75use std::{
76 io::{self, Cursor, Read},
77 task::Poll,
78};
79
80use bytes::Bytes;
81use futures_util::{ready, Sink, Stream};
82use pin_project_lite::pin_project;
83use tokio::io::{AsyncRead, AsyncWrite};
84
85mod error;
86pub use error::Error;
87
88#[macro_use]
89mod macros;
90
91cfg_not_wasm32! {
92 pub mod native;
93}
94
95cfg_wasm32! {
96 pub mod wasm;
97}
98
// Subprotocol name for the AMQP WebSocket binding. The crate docs state that the handshake sets
// the "Sec-WebSocket-Protocol" HTTP header to "amqp"; presumably the platform-specific modules
// use this constant for that header (confirm in `native` / `wasm`).
const SEC_WEBSOCKET_PROTOCOL_AMQP: &str = "amqp";
100
101/// This a wrapper around `tungstenite::Message`
102#[derive(Debug, Eq, PartialEq, Clone)]
103pub struct WsMessage(pub tungstenite::Message);
104
105pin_project! {
106 /// A wrapper over [`tokio_tungstenite::WebSoccketStream`] that implements
107 /// `tokio::io::AsyncRead` and `tokio::io::AsyncWrite`.
108 ///
109 /// The public APIs all internally call their equivalent in `tokio_tungstenite` and checks the
110 /// response. The only difference is that the APIs will set "Sec-WebSocket-Protocol" HTTP header
111 /// to "amqp".
112 ///
113 /// The "Sec-WebSocket-Protocol" HTTP header identifies the WebSocket subprotocol. For this
114 /// AMQP WebSocket binding, the value MUST be set to the US-ASCII text string “amqp” which
115 /// refers to the 1.0 version of the AMQP 1.0 or greater, with version negotiation as
116 /// defined by AMQP 1.0.
117 ///
118 /// If the Client does not receive a response with HTTP status code 101 and an HTTP
119 /// Sec-WebSocket-Protocol equal to the US-ASCII text string "amqp" then the Client MUST close
120 /// the socket connection
121 ///
122 /// # Example
123 ///
124 /// ```rust,no_run
125 /// use fe2o3_amqp::{
126 /// types::{messaging::Outcome, primitives::Value},
127 /// Connection, Delivery, Receiver, Sender, Session,
128 /// };
129 /// use fe2o3_amqp_ws::WebSocketStream;
130 ///
131 /// #[tokio::main]
132 /// async fn main() {
133 /// let ws_stream = WebSocketStream::connect("ws://localhost:5673")
134 /// .await
135 /// .unwrap();
136 /// let mut connection = Connection::builder()
137 /// .container_id("connection-1")
138 /// .open_with_stream(ws_stream)
139 /// .await
140 /// .unwrap();
141 ///
142 /// // ...
143 ///
144 /// connection.close().await.unwrap();
145 /// }
146 /// ```
147 #[derive(Debug)]
148 pub struct WebSocketStream<S> {
149 #[pin]
150 inner: S,
151 current_binary: Option<std::io::Cursor<Bytes>>,
152 }
153}
154
155// Reference implementations:
156//
157// - `tokio-rw-stream-sink`
158// - `rw-stream-sink`
159// - `ws_stream_tungstenite`
160impl<S> AsyncRead for WebSocketStream<S>
161where
162 S: Stream<Item = Result<WsMessage, tungstenite::Error>>,
163{
164 fn poll_read(
165 self: std::pin::Pin<&mut Self>,
166 cx: &mut std::task::Context<'_>,
167 buf: &mut tokio::io::ReadBuf<'_>,
168 ) -> std::task::Poll<std::io::Result<()>> {
169 let this = self.project();
170 let mut inner = this.inner;
171
172 let (item_to_copy, len_to_read) = loop {
173 if let Some(cursor) = this.current_binary {
174 let len = cursor.get_ref().len() as u64;
175 let pos = cursor.position();
176 if pos < len {
177 break (cursor, len - pos);
178 }
179 }
180
181 let msg = match ready!(inner.as_mut().poll_next(cx)) {
182 Some(Ok(msg)) => msg,
183 Some(Err(err)) => return Poll::Ready(Err(map_tungstenite_error(err))),
184 None => return Poll::Ready(Ok(())), // EOF
185 };
186
187 match msg.0 {
188 tungstenite::Message::Text(_) => {
189 return Poll::Ready(Err(io::Error::new(
190 io::ErrorKind::InvalidData,
191 "Text messsage is not supported",
192 )))
193 }
194 tungstenite::Message::Binary(buf) => *this.current_binary = Some(Cursor::new(buf)),
195
196 // This is already handled by tungstenite
197 tungstenite::Message::Ping(_) => {}
198 tungstenite::Message::Pong(_) => {}
199
200 // Let tungstenite perform close handshake
201 tungstenite::Message::Close(_) => {}
202
203 // Raw frame. Note, that you’re not going to get this value while reading the message.
204 tungstenite::Message::Frame(_) => unreachable!(),
205 }
206 };
207
208 // Copy it!
209 let len_to_read = buf
210 .remaining()
211 .min(len_to_read.min(usize::MAX as u64) as usize);
212 let unfilled_buf = buf.initialize_unfilled_to(len_to_read);
213 let len = item_to_copy.read(unfilled_buf)?;
214 buf.advance(len);
215 Poll::Ready(Ok(()))
216 }
217}
218
219impl<S> AsyncWrite for WebSocketStream<S>
220where
221 S: Sink<WsMessage, Error = tungstenite::Error>,
222{
223 fn poll_write(
224 self: std::pin::Pin<&mut Self>,
225 cx: &mut std::task::Context<'_>,
226 buf: &[u8],
227 ) -> std::task::Poll<Result<usize, std::io::Error>> {
228 let mut this = self.project();
229 ready!(this.inner.as_mut().poll_ready(cx)).map_err(map_tungstenite_error)?;
230 let n = buf.len();
231 let bin = Bytes::copy_from_slice(buf);
232 let item = tungstenite::Message::binary(bin);
233 let item = WsMessage(item);
234 match this.inner.start_send(item) {
235 Ok(_) => Poll::Ready(Ok(n)),
236 Err(error) => Poll::Ready(Err(map_tungstenite_error(error))),
237 }
238 }
239
240 fn poll_flush(
241 self: std::pin::Pin<&mut Self>,
242 cx: &mut std::task::Context<'_>,
243 ) -> std::task::Poll<Result<(), std::io::Error>> {
244 let this = self.project();
245 this.inner.poll_flush(cx).map_err(map_tungstenite_error)
246 }
247
248 fn poll_shutdown(
249 self: std::pin::Pin<&mut Self>,
250 cx: &mut std::task::Context<'_>,
251 ) -> std::task::Poll<Result<(), std::io::Error>> {
252 let this = self.project();
253 this.inner.poll_close(cx).map_err(map_tungstenite_error)
254 }
255}
256
257fn map_tungstenite_error(error: tungstenite::Error) -> io::Error {
258 match error {
259 tungstenite::Error::ConnectionClosed | tungstenite::Error::AlreadyClosed => {
260 io::ErrorKind::NotConnected.into()
261 }
262 tungstenite::Error::Io(err) => err,
263 tungstenite::Error::Capacity(err) => io::Error::new(io::ErrorKind::InvalidData, err),
264 _ => io::Error::new(io::ErrorKind::Other, error),
265 }
266}