soket/
lib.rs

1// Copyright (c) 2019 Parity Technologies (UK) Ltd.
2// Copyright (c) 2016 twist developers
3//
4// Licensed under the Apache License, Version 2.0
5// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
6// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
7// option. All files in the project carrying such notice may not be copied,
8// modified, or distributed except according to those terms.
9
10//! An implementation of the [RFC 6455][rfc6455] websocket protocol.
11//!
12//! To begin a websocket connection one first needs to perform a [handshake],
13//! either as [client] or [server], in order to upgrade from HTTP.
14//! Once successful, the client or server can transition to a connection,
15//! i.e. a [Sender]/[Receiver] pair and send and receive textual or
16//! binary data.
17//!
18//! **Note**: While it is possible to only receive websocket messages it is
19//! not possible to only send websocket messages. Receiving data is required
20//! in order to react to control frames such as PING or CLOSE. While those will be
21//! answered transparently they have to be received in the first place, so
22//! calling [`connection::Receiver::receive`] is imperative.
23//!
//! **Note**: None of the `async` methods are safe to cancel, so their `Future`s
//! must not be dropped before they have returned `Poll::Ready`.
26//!
27//! # Client example
28//!
29//! ```no_run
30//! # use tokio_util::compat::Tokio02AsyncReadCompatExt;
31//! # async fn doc() -> Result<(), soket::BoxedError> {
32//! use soket::handshake::{Client, ServerResponse};
33//!
34//! // First, we need to establish a TCP connection.
35//! let socket = tokio::net::TcpStream::connect("...").await?;
36//!
37//! // Then we configure the client handshake.
38//! let mut client = Client::new(socket.compat(), "...", "/");
39//!
40//! // And finally we perform the handshake and handle the result.
41//! let (mut sender, mut receiver) = match client.handshake().await? {
42//!     ServerResponse::Accepted { .. } => client.into_builder().finish(),
43//!     ServerResponse::Redirect { status_code, location } => unimplemented!("follow location URL"),
44//!     ServerResponse::Rejected { status_code } => unimplemented!("handle failure")
45//! };
46//!
47//! // Over the established websocket connection we can send
48//! sender.send_text("some text").await?;
49//! sender.send_text("some more text").await?;
50//! sender.flush().await?;
51//!
52//! // ... and receive data.
53//! let mut data = Vec::new();
54//! receiver.receive_data(&mut data).await?;
55//!
56//! # Ok(())
57//! # }
58//!
59//! ```
60//!
61//! # Server example
62//!
63//! ```no_run
64//! # use tokio_util::compat::Tokio02AsyncReadCompatExt;
65//! # use tokio::stream::StreamExt;
66//! # async fn doc() -> Result<(), soket::BoxedError> {
67//! use soket::{handshake::{Server, ClientRequest, server::Response}};
68//!
69//! // First, we listen for incoming connections.
70//! let mut listener = tokio::net::TcpListener::bind("...").await?;
71//! let mut incoming = listener.incoming();
72//!
73//! while let Some(socket) = incoming.next().await {
74//!     // For each incoming connection we perform a handshake.
75//!     let mut server = Server::new(socket?.compat());
76//!
77//!     let websocket_key = {
78//!         let req = server.receive_request().await?;
79//!         req.into_key()
80//!     };
81//!
82//!     // Here we accept the client unconditionally.
83//!     let accept = Response::Accept { key: &websocket_key, protocol: None };
84//!     server.send_response(&accept).await?;
85//!
86//!     // And we can finally transition to a websocket connection.
87//!     let (mut sender, mut receiver) = server.into_builder().finish();
88//!
89//!     let mut data = Vec::new();
90//!     let data_type = receiver.receive_data(&mut data).await?;
91//!
92//!     if data_type.is_text() {
93//!         sender.send_text(std::str::from_utf8(&data)?).await?
94//!     } else {
95//!         sender.send_binary(&data).await?
96//!     }
97//!
98//!     sender.close().await?
99//! }
100//!
101//! # Ok(())
102//! # }
103//!
104//! ```
105//! [client]: handshake::Client
106//! [server]: handshake::Server
107//! [Sender]: connection::Sender
108//! [Receiver]: connection::Receiver
109//! [rfc6455]: https://tools.ietf.org/html/rfc6455
110//! [handshake]: https://tools.ietf.org/html/rfc6455#section-4
111
112#![forbid(unsafe_code)]
113
114pub mod base;
115pub mod data;
116pub mod extension;
117pub mod handshake;
118pub mod connection;
119
120use bytes::BytesMut;
121use futures::io::{AsyncRead, AsyncReadExt};
122use std::io;
123
124pub use connection::{Mode, Receiver, Sender};
125pub use data::{Data, Incoming};
126
/// A heap-allocated, type-erased error that is `Send` and `Sync`.
///
/// Any concrete error type implementing `std::error::Error + Send + Sync`
/// converts into it, which makes it usable as a catch-all error type with `?`.
pub type BoxedError = Box<dyn std::error::Error + Send + Sync>;
128
/// The outcome of a parsing attempt.
///
/// `T` is the type of the successfully parsed value. `N` is the payload
/// carried when parsing could not finish; it defaults to the unit type.
#[derive(Clone, Debug)]
pub enum Parsing<T, N = ()> {
    /// The input was parsed successfully.
    Done {
        /// The value produced by parsing.
        value: T,
        /// The offset into the byte slice up to which input was consumed.
        offset: usize,
    },
    /// The input was incomplete; more data is required to continue.
    NeedMore(N),
}
142
/// Byte storage handed to `Extension` implementations.
///
/// The three variants cover borrowed read-only data, borrowed mutable data
/// and data owned by the value itself.
#[derive(Debug)]
pub enum Storage<'a> {
    /// Bytes borrowed immutably.
    Shared(&'a [u8]),
    /// Bytes borrowed mutably.
    Unique(&'a mut [u8]),
    /// Bytes held in an owned vector.
    Owned(Vec<u8>),
}
153
154impl AsRef<[u8]> for Storage<'_> {
155    fn as_ref(&self) -> &[u8] {
156        match self {
157            Storage::Shared(d) => d,
158            Storage::Unique(d) => d,
159            Storage::Owned(b) => b.as_ref()
160        }
161    }
162}
163
/// Widen a `usize` to a `u64`.
///
/// The function is only compiled for targets with a 32-bit or 64-bit pointer
/// width, where every `usize` value fits into a `u64`, so the cast below
/// cannot truncate.
#[cfg(any(target_pointer_width = "64", target_pointer_width = "32"))]
const fn as_u64(a: usize) -> u64 {
    a as u64
}
170
171/// Fill the buffer from the given `AsyncRead` impl with up to `max` bytes.
172async fn read<R>(reader: &mut R, dest: &mut BytesMut, max: usize) -> io::Result<()>
173where
174    R: AsyncRead + Unpin
175{
176    let i = dest.len();
177    dest.resize(i + max, 0u8);
178    let n = reader.read(&mut dest[i ..]).await?;
179    dest.truncate(i + n);
180    if n == 0 {
181        return Err(io::ErrorKind::UnexpectedEof.into())
182    }
183    log::trace!("read {} bytes", n);
184    Ok(())
185}
186