fe2o3_amqp_ws/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![deny(missing_docs, missing_debug_implementations)]
3
4//! WebSocket adapter for AMQP 1.0 websocket binding
5//!
6//! This provides a thin wrapper over `tokio_tungstenite::WebSocketStream`, and the wrapper performs
7//! the WebSocket handshake with the "Sec-WebSocket-Protocol" HTTP header set to "amqp".
8//!
9//! The wrapper type [`WebSocketStream`] could also be used for non-AMQP applications; however, the
10//! user should establish websocket stream with raw `tokio_tungstenite` API and then wrap the stream
11//! with the wrapper by `fe2o3_amqp_ws::WebSocketStream::from(ws_stream)`.
12//!
13//! # Feature flags
14//!
15//! ```toml
16//! default = []
17//! ```
18//!
19//! | Feature | Description |
20//! |---------|-------------|
21//! | `native-tls` | Enables "tokio-tungstenite/native-tls" |
22//! | `native-tls-vendored` | Enables "tokio-tungstenite/native-tls-vendored" |
23//! | `rustls-tls-native-roots` | Enables "tokio-tungstenite/rustls-tls-native-roots" |
24//! | `rustls-tls-webpki-roots` | Enables "tokio-tungstenite/rustls-tls-webpki-roots" |
25//!
26//! # Example
27//!
28//! ```rust,no_run
29//! use fe2o3_amqp::{
30//!     types::{messaging::Outcome, primitives::Value},
31//!     Connection, Delivery, Receiver, Sender, Session,
32//! };
33//! use fe2o3_amqp_ws::WebSocketStream;
34//!
35//! #[tokio::main]
36//! async fn main() {
37//!     let ws_stream = WebSocketStream::connect("ws://localhost:5673")
38//!         .await
39//!         .unwrap();
40//!     let mut connection = Connection::builder()
41//!         .container_id("connection-1")
42//!         .open_with_stream(ws_stream)
43//!         .await
44//!         .unwrap();
45//!     let mut session = Session::begin(&mut connection).await.unwrap();
46//!
47//!     let mut sender = Sender::attach(&mut session, "rust-sender-link-1", "q1")
48//!         .await
49//!         .unwrap();
50//!     let mut receiver = Receiver::attach(&mut session, "rust-recver-1", "q1")
51//!         .await
52//!         .unwrap();
53//!
54//!     let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
55//!
56//!     let delivery: Delivery<Value> = receiver.recv().await.unwrap();
57//!     receiver.accept(&delivery).await.unwrap();
58//!
59//!     let outcome: Outcome = fut.await.unwrap();
60//!     outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome
61//!
62//!     sender.close().await.unwrap();
63//!     receiver.close().await.unwrap();
64//!     session.end().await.unwrap();
65//!     connection.close().await.unwrap();
66//! }
67//! ```
68//!
69//! ## WebAssembly support
70//!
71//! Experimental support for `wasm32-unknown-unknown` target has been added since "0.3.0" and uses a
72//! `web_sys::WebSocket` internally. An example of this can be found in
73//! [examples/wasm32-in-browser](https://github.com/minghuaw/fe2o3-amqp/tree/main/examples/wasm32-in-browser).
74
75use std::{
76    io::{self, Cursor, Read},
77    task::Poll,
78};
79
80use bytes::Bytes;
81use futures_util::{ready, Sink, Stream};
82use pin_project_lite::pin_project;
83use tokio::io::{AsyncRead, AsyncWrite};
84
85mod error;
86pub use error::Error;
87
88#[macro_use]
89mod macros;
90
91cfg_not_wasm32! {
92    pub mod native;
93}
94
95cfg_wasm32! {
96    pub mod wasm;
97}
98
99const SEC_WEBSOCKET_PROTOCOL_AMQP: &str = "amqp";
100
101/// This a wrapper around `tungstenite::Message`
102#[derive(Debug, Eq, PartialEq, Clone)]
103pub struct WsMessage(pub tungstenite::Message);
104
105pin_project! {
106    /// A wrapper over [`tokio_tungstenite::WebSoccketStream`] that implements
107    /// `tokio::io::AsyncRead` and `tokio::io::AsyncWrite`.
108    ///
109    /// The public APIs all internally call their equivalent in `tokio_tungstenite` and checks the
110    /// response. The only difference is that the APIs will set "Sec-WebSocket-Protocol" HTTP header
111    /// to "amqp".
112    ///
113    /// The "Sec-WebSocket-Protocol" HTTP header identifies the WebSocket subprotocol. For this
114    /// AMQP WebSocket binding, the value MUST be set to the US-ASCII text string “amqp” which
115    /// refers to the 1.0 version of the AMQP 1.0 or greater, with version negotiation as
116    /// defined by AMQP 1.0.
117    ///
118    /// If the Client does not receive a response with HTTP status code 101 and an HTTP
119    /// Sec-WebSocket-Protocol equal to the US-ASCII text string "amqp" then the Client MUST close
120    /// the socket connection
121    ///
122    /// # Example
123    ///
124    /// ```rust,no_run
125    /// use fe2o3_amqp::{
126    ///     types::{messaging::Outcome, primitives::Value},
127    ///     Connection, Delivery, Receiver, Sender, Session,
128    /// };
129    /// use fe2o3_amqp_ws::WebSocketStream;
130    ///
131    /// #[tokio::main]
132    /// async fn main() {
133    ///     let ws_stream = WebSocketStream::connect("ws://localhost:5673")
134    ///         .await
135    ///         .unwrap();
136    ///     let mut connection = Connection::builder()
137    ///         .container_id("connection-1")
138    ///         .open_with_stream(ws_stream)
139    ///         .await
140    ///         .unwrap();
141    ///
142    ///     // ...
143    ///
144    ///     connection.close().await.unwrap();
145    /// }
146    /// ```
147    #[derive(Debug)]
148    pub struct WebSocketStream<S> {
149        #[pin]
150        inner: S,
151        current_binary: Option<std::io::Cursor<Bytes>>,
152    }
153}
154
155// Reference implementations:
156//
157// - `tokio-rw-stream-sink`
158// - `rw-stream-sink`
159// - `ws_stream_tungstenite`
160impl<S> AsyncRead for WebSocketStream<S>
161where
162    S: Stream<Item = Result<WsMessage, tungstenite::Error>>,
163{
164    fn poll_read(
165        self: std::pin::Pin<&mut Self>,
166        cx: &mut std::task::Context<'_>,
167        buf: &mut tokio::io::ReadBuf<'_>,
168    ) -> std::task::Poll<std::io::Result<()>> {
169        let this = self.project();
170        let mut inner = this.inner;
171
172        let (item_to_copy, len_to_read) = loop {
173            if let Some(cursor) = this.current_binary {
174                let len = cursor.get_ref().len() as u64;
175                let pos = cursor.position();
176                if pos < len {
177                    break (cursor, len - pos);
178                }
179            }
180
181            let msg = match ready!(inner.as_mut().poll_next(cx)) {
182                Some(Ok(msg)) => msg,
183                Some(Err(err)) => return Poll::Ready(Err(map_tungstenite_error(err))),
184                None => return Poll::Ready(Ok(())), // EOF
185            };
186
187            match msg.0 {
188                tungstenite::Message::Text(_) => {
189                    return Poll::Ready(Err(io::Error::new(
190                        io::ErrorKind::InvalidData,
191                        "Text messsage is not supported",
192                    )))
193                }
194                tungstenite::Message::Binary(buf) => *this.current_binary = Some(Cursor::new(buf)),
195
196                // This is already handled by tungstenite
197                tungstenite::Message::Ping(_) => {}
198                tungstenite::Message::Pong(_) => {}
199
200                // Let tungstenite perform close handshake
201                tungstenite::Message::Close(_) => {}
202
203                // Raw frame. Note, that you’re not going to get this value while reading the message.
204                tungstenite::Message::Frame(_) => unreachable!(),
205            }
206        };
207
208        // Copy it!
209        let len_to_read = buf
210            .remaining()
211            .min(len_to_read.min(usize::MAX as u64) as usize);
212        let unfilled_buf = buf.initialize_unfilled_to(len_to_read);
213        let len = item_to_copy.read(unfilled_buf)?;
214        buf.advance(len);
215        Poll::Ready(Ok(()))
216    }
217}
218
219impl<S> AsyncWrite for WebSocketStream<S>
220where
221    S: Sink<WsMessage, Error = tungstenite::Error>,
222{
223    fn poll_write(
224        self: std::pin::Pin<&mut Self>,
225        cx: &mut std::task::Context<'_>,
226        buf: &[u8],
227    ) -> std::task::Poll<Result<usize, std::io::Error>> {
228        let mut this = self.project();
229        ready!(this.inner.as_mut().poll_ready(cx)).map_err(map_tungstenite_error)?;
230        let n = buf.len();
231        let bin = Bytes::copy_from_slice(buf);
232        let item = tungstenite::Message::binary(bin);
233        let item = WsMessage(item);
234        match this.inner.start_send(item) {
235            Ok(_) => Poll::Ready(Ok(n)),
236            Err(error) => Poll::Ready(Err(map_tungstenite_error(error))),
237        }
238    }
239
240    fn poll_flush(
241        self: std::pin::Pin<&mut Self>,
242        cx: &mut std::task::Context<'_>,
243    ) -> std::task::Poll<Result<(), std::io::Error>> {
244        let this = self.project();
245        this.inner.poll_flush(cx).map_err(map_tungstenite_error)
246    }
247
248    fn poll_shutdown(
249        self: std::pin::Pin<&mut Self>,
250        cx: &mut std::task::Context<'_>,
251    ) -> std::task::Poll<Result<(), std::io::Error>> {
252        let this = self.project();
253        this.inner.poll_close(cx).map_err(map_tungstenite_error)
254    }
255}
256
257fn map_tungstenite_error(error: tungstenite::Error) -> io::Error {
258    match error {
259        tungstenite::Error::ConnectionClosed | tungstenite::Error::AlreadyClosed => {
260            io::ErrorKind::NotConnected.into()
261        }
262        tungstenite::Error::Io(err) => err,
263        tungstenite::Error::Capacity(err) => io::Error::new(io::ErrorKind::InvalidData, err),
264        _ => io::Error::new(io::ErrorKind::Other, error),
265    }
266}