futures_codec2/lib.rs
1#![doc(html_root_url = "https://docs.rs/futures_codec/0.6.0")]
2#![allow(clippy::needless_doctest_main)]
3#![warn(
4 missing_debug_implementations,
5 missing_docs,
6 rust_2018_idioms,
7 unreachable_pub
8)]
9#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
10#![doc(test(
11 no_crate_inject,
12 attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
13))]
14#![cfg_attr(docsrs, feature(doc_cfg))]
15
16//! Adaptors from AsyncRead/AsyncWrite to Stream/Sink
17//!
18//! Raw I/O objects work with byte sequences, but higher-level code usually
19//! wants to batch these into meaningful chunks, called "frames".
20//!
21//! This module contains adapters to go from streams of bytes, [`AsyncRead`] and
22//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
23//! Framed streams are also known as transports.
24//!
25//! # The Decoder trait
26//!
27//! A [`Decoder`] is used together with [`FramedRead`] or [`Framed`] to turn an
28//! [`AsyncRead`] into a [`Stream`]. The job of the decoder trait is to specify
29//! how sequences of bytes are turned into a sequence of frames, and to
30//! determine where the boundaries between frames are. The job of the
31//! `FramedRead` is to repeatedly switch between reading more data from the IO
32//! resource, and asking the decoder whether we have received enough data to
33//! decode another frame of data.
34//!
35//! The main method on the `Decoder` trait is the [`decode`] method. This method
36//! takes as argument the data that has been read so far, and when it is called,
37//! it will be in one of the following situations:
38//!
39//! 1. The buffer contains less than a full frame.
40//! 2. The buffer contains exactly a full frame.
41//! 3. The buffer contains more than a full frame.
42//!
43//! In the first situation, the decoder should return `Ok(None)`.
44//!
45//! In the second situation, the decoder should clear the provided buffer and
46//! return `Ok(Some(the_decoded_frame))`.
47//!
48//! In the third situation, the decoder should use a method such as [`split_to`]
49//! or [`advance`] to modify the buffer such that the frame is removed from the
50//! buffer, but any data in the buffer after that frame should still remain in
51//! the buffer. The decoder should also return `Ok(Some(the_decoded_frame))` in
52//! this case.
53//!
54//! Finally the decoder may return an error if the data is invalid in some way.
55//! The decoder should _not_ return an error just because it has yet to receive
56//! a full frame.
57//!
58//! It is guaranteed that, from one call to `decode` to another, the provided
59//! buffer will contain the exact same data as before, except that if more data
60//! has arrived through the IO resource, that data will have been appended to
61//! the buffer. This means that reading frames from a `FramedRead` is
62//! essentially equivalent to the following loop:
63//!
64//! ```no_run
65//! use futures::io::AsyncBufReadExt;
66//! # // This uses async_stream to create an example that compiles.
67//! # fn foo() -> impl futures_core::Stream<Item = std::io::Result<bytes::BytesMut>> { async_stream::try_stream! {
68//! # use futures_codec2::Decoder;
69//! # let mut decoder = futures_codec2::BytesCodec::new();
70//! # let io_resource = &mut &[0u8, 1, 2, 3][..];
71//!
72//! let mut buf = bytes::BytesMut::new();
73//! loop {
74//! // The fill_buf + extend_from_slice pair appends to buf rather than overwriting existing data.
75//! let len = {
76//! let rbuf = io_resource.fill_buf().await?;
77//! buf.extend_from_slice(rbuf);
78//! rbuf.len()
79//! };
80//! io_resource.consume_unpin(len);
81//!
82//! if len == 0 {
83//! while let Some(frame) = decoder.decode_eof(&mut buf)? {
84//! yield frame;
85//! }
86//! break;
87//! }
88//!
89//! while let Some(frame) = decoder.decode(&mut buf)? {
90//! yield frame;
91//! }
92//! }
93//! # }}
94//! ```
95//! The example above uses `yield` whenever the `Stream` produces an item.
96//!
97//! ## Example decoder
98//!
99//! As an example, consider a protocol that can be used to send strings where
100//! each frame is a four-byte little-endian integer holding the string's length,
101//! followed by that many bytes of string data. The decoder fails with an error
102//! if the string data is not valid UTF-8 or is too long.
103//!
104//! Such a decoder can be written like this:
105//! ```
106//! use futures_codec2::Decoder;
107//! use bytes::{BytesMut, Buf};
108//!
109//! struct MyStringDecoder {}
110//!
111//! const MAX: usize = 8 * 1024 * 1024;
112//!
113//! impl Decoder for MyStringDecoder {
114//! type Item = String;
115//! type Error = std::io::Error;
116//!
117//! fn decode(
118//! &mut self,
119//! src: &mut BytesMut
120//! ) -> Result<Option<Self::Item>, Self::Error> {
121//! if src.len() < 4 {
122//! // Not enough data to read length marker.
123//! return Ok(None);
124//! }
125//!
126//! // Read length marker.
127//! let mut length_bytes = [0u8; 4];
128//! length_bytes.copy_from_slice(&src[..4]);
129//! let length = u32::from_le_bytes(length_bytes) as usize;
130//!
131//! // Check that the length is not too large to avoid a denial of
132//! // service attack where the server runs out of memory.
133//! if length > MAX {
134//! return Err(std::io::Error::new(
135//! std::io::ErrorKind::InvalidData,
136//! format!("Frame of length {} is too large.", length)
137//! ));
138//! }
139//!
140//! if src.len() < 4 + length {
141//! // The full string has not yet arrived.
142//! //
143//! // We reserve more space in the buffer. This is not strictly
144//! // necessary, but is a good idea performance-wise.
145//! src.reserve(4 + length - src.len());
146//!
147//! // We inform the Framed that we need more bytes to form the next
148//! // frame.
149//! return Ok(None);
150//! }
151//!
152//! // Use advance to modify src such that it no longer contains
153//! // this frame.
154//! let data = src[4..4 + length].to_vec();
155//! src.advance(4 + length);
156//!
157//! // Convert the data to a string, or fail if it is not valid utf-8.
158//! match String::from_utf8(data) {
159//! Ok(string) => Ok(Some(string)),
160//! Err(utf8_error) => {
161//! Err(std::io::Error::new(
162//! std::io::ErrorKind::InvalidData,
163//! utf8_error.utf8_error(),
164//! ))
165//! },
166//! }
167//! }
168//! }
169//! ```
170//!
171//! # The Encoder trait
172//!
173//! An [`Encoder`] is used together with [`FramedWrite`] or [`Framed`] to turn
174//! an [`AsyncWrite`] into a [`Sink`]. The job of the encoder trait is to
175//! specify how frames are turned into a sequence of bytes. The job of the
176//! `FramedWrite` is to take the resulting sequence of bytes and write it to the
177//! IO resource.
178//!
179//! The main method on the `Encoder` trait is the [`encode`] method. This method
180//! takes an item that is being written, and a buffer to write the item to. The
181//! buffer may already contain data, and in this case, the encoder should append
182//! the new frame to the buffer rather than overwrite the existing data.
183//!
184//! It is guaranteed that, from one call to `encode` to another, the provided
185//! buffer will contain the exact same data as before, except that some of the
186//! data may have been removed from the front of the buffer. Writing to a
187//! `FramedWrite` is essentially equivalent to the following loop:
188//!
189//! ```no_run
190//! use futures::future::FutureExt;
191//! use futures::io::AsyncWriteExt;
192//! use bytes::Buf; // for advance
193//! # use futures_codec2::Encoder;
194//! # async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() }
195//! # async fn no_more_frames() { }
196//! # fn main() -> std::io::Result<()> {
197//! # tokio_test::block_on(async {
198//! # let mut io_resource = futures::io::sink();
199//! # let mut encoder = futures_codec2::BytesCodec::new();
200//!
201//! const MAX: usize = 8192;
202//!
203//! let mut buf = bytes::BytesMut::new();
204//! loop {
205//! futures::select! {
206//! num_written = io_resource.write(&buf).fuse() => {
207//! if !buf.is_empty() {
208//! buf.advance(num_written?);
209//! }
210//! },
211//! frame = next_frame().fuse() => {
212//! if buf.len() < MAX {
213//! encoder.encode(frame, &mut buf)?;
214//! }
215//! },
216//! _ = no_more_frames().fuse() => {
217//! io_resource.write_all(&buf).await?;
218//! io_resource.close().await?;
219//! return Ok(());
220//! },
221//! }
222//! }
223//! # })}
224//! ```
225//! Here the `next_frame` function corresponds to any frames you write to the
226//! `FramedWrite`. The `no_more_frames` function corresponds to closing the
227//! `FramedWrite` with [`SinkExt::close`].
228//!
229//! ## Example encoder
230//!
231//! As an example, consider a protocol that can be used to send strings where
232//! each frame is a four-byte little-endian integer holding the string's length,
233//! followed by that many bytes of string data. The encoder will fail if the
234//! string is too long.
235//!
236//! Such an encoder can be written like this:
237//! ```
238//! use futures_codec2::Encoder;
239//! use bytes::BytesMut;
240//!
241//! struct MyStringEncoder {}
242//!
243//! const MAX: usize = 8 * 1024 * 1024;
244//!
245//! impl Encoder<String> for MyStringEncoder {
246//! type Error = std::io::Error;
247//!
248//! fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
249//! // Don't send a string if it is longer than the other end will
250//! // accept.
251//! if item.len() > MAX {
252//! return Err(std::io::Error::new(
253//! std::io::ErrorKind::InvalidData,
254//! format!("Frame of length {} is too large.", item.len())
255//! ));
256//! }
257//!
258//! // Convert the length into a byte array.
259//! // The cast to u32 cannot overflow due to the length check above.
260//! let len_slice = u32::to_le_bytes(item.len() as u32);
261//!
262//! // Reserve space in the buffer.
263//! dst.reserve(4 + item.len());
264//!
265//! // Write the length and string to the buffer.
266//! dst.extend_from_slice(&len_slice);
267//! dst.extend_from_slice(item.as_bytes());
268//! Ok(())
269//! }
270//! }
271//! ```
272//!
273//! [`AsyncRead`]: futures_io::AsyncRead
274//! [`AsyncWrite`]: futures_io::AsyncWrite
275//! [`Stream`]: futures_core::stream::Stream
276//! [`Sink`]: futures_sink::Sink
277//! [`SinkExt::close`]: fn@futures_util::sink::SinkExt::close
278//! [`FramedRead`]: struct@crate::FramedRead
279//! [`FramedWrite`]: struct@crate::FramedWrite
280//! [`Framed`]: struct@crate::Framed
281//! [`Decoder`]: trait@crate::Decoder
282//! [`decode`]: fn@crate::Decoder::decode
283//! [`encode`]: fn@crate::Encoder::encode
284//! [`split_to`]: fn@bytes::BytesMut::split_to
285//! [`advance`]: fn@bytes::Buf::advance
286
287#[macro_use]
288mod cfg;
289
290cfg_json! {
291 mod json_codec;
292 pub use self::json_codec::{JsonCodec, JsonCodecError};
293}
294
295cfg_cbor! {
296 mod cbor_codec;
297 pub use self::cbor_codec::CborCodec;
298}
299
300mod bytes_codec;
301pub use self::bytes_codec::BytesCodec;
302
303mod decoder;
304pub use self::decoder::Decoder;
305
306mod encoder;
307pub use self::encoder::Encoder;
308
309mod framed_impl;
310pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
311
312mod framed;
313pub use self::framed::{Framed, FramedParts};
314
315mod framed_read;
316pub use self::framed_read::FramedRead;
317
318mod framed_write;
319pub use self::framed_write::FramedWrite;
320
321pub mod length_delimited;
322pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError};
323
324mod lines_codec;
325pub use self::lines_codec::{LinesCodec, LinesCodecError};