Skip to main content

futures_codec2/
lib.rs

1#![doc(html_root_url = "https://docs.rs/futures_codec/0.6.0")]
2#![allow(clippy::needless_doctest_main)]
3#![warn(
4    missing_debug_implementations,
5    missing_docs,
6    rust_2018_idioms,
7    unreachable_pub
8)]
9#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
10#![doc(test(
11    no_crate_inject,
12    attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
13))]
14#![cfg_attr(docsrs, feature(doc_cfg))]
15
16//! Adaptors from AsyncRead/AsyncWrite to Stream/Sink
17//!
18//! Raw I/O objects work with byte sequences, but higher-level code usually
19//! wants to batch these into meaningful chunks, called "frames".
20//!
21//! This module contains adapters to go from streams of bytes, [`AsyncRead`] and
22//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
23//! Framed streams are also known as transports.
24//!
25//! # The Decoder trait
26//!
27//! A [`Decoder`] is used together with [`FramedRead`] or [`Framed`] to turn an
28//! [`AsyncRead`] into a [`Stream`]. The job of the decoder trait is to specify
29//! how sequences of bytes are turned into a sequence of frames, and to
30//! determine where the boundaries between frames are.  The job of the
31//! `FramedRead` is to repeatedly switch between reading more data from the IO
32//! resource, and asking the decoder whether we have received enough data to
33//! decode another frame of data.
34//!
35//! The main method on the `Decoder` trait is the [`decode`] method. This method
36//! takes as argument the data that has been read so far, and when it is called,
37//! it will be in one of the following situations:
38//!
39//!  1. The buffer contains less than a full frame.
40//!  2. The buffer contains exactly a full frame.
41//!  3. The buffer contains more than a full frame.
42//!
43//! In the first situation, the decoder should return `Ok(None)`.
44//!
45//! In the second situation, the decoder should clear the provided buffer and
46//! return `Ok(Some(the_decoded_frame))`.
47//!
48//! In the third situation, the decoder should use a method such as [`split_to`]
49//! or [`advance`] to modify the buffer such that the frame is removed from the
50//! buffer, but any data in the buffer after that frame should still remain in
51//! the buffer. The decoder should also return `Ok(Some(the_decoded_frame))` in
52//! this case.
53//!
54//! Finally the decoder may return an error if the data is invalid in some way.
55//! The decoder should _not_ return an error just because it has yet to receive
56//! a full frame.
57//!
58//! It is guaranteed that, from one call to `decode` to another, the provided
59//! buffer will contain the exact same data as before, except that if more data
60//! has arrived through the IO resource, that data will have been appended to
61//! the buffer.  This means that reading frames from a `FramedRead` is
62//! essentially equivalent to the following loop:
63//!
64//! ```no_run
65//! use futures::io::AsyncBufReadExt;
66//! # // This uses async_stream to create an example that compiles.
67//! # fn foo() -> impl futures_core::Stream<Item = std::io::Result<bytes::BytesMut>> { async_stream::try_stream! {
68//! # use futures_codec2::Decoder;
69//! # let mut decoder = futures_codec2::BytesCodec::new();
70//! # let io_resource = &mut &[0u8, 1, 2, 3][..];
71//!
72//! let mut buf = bytes::BytesMut::new();
73//! loop {
74//!     // extend_from_slice appends to buf rather than overwriting existing data.
75//!     let len = {
76//!         let rbuf = io_resource.fill_buf().await?;
77//!         buf.extend_from_slice(rbuf);
78//!         rbuf.len()
79//!     };
80//!     io_resource.consume_unpin(len);
81//!
82//!     if len == 0 {
83//!         while let Some(frame) = decoder.decode_eof(&mut buf)? {
84//!             yield frame;
85//!         }
86//!         break;
87//!     }
88//!
89//!     while let Some(frame) = decoder.decode(&mut buf)? {
90//!         yield frame;
91//!     }
92//! }
93//! # }}
94//! ```
95//! The example above uses `yield` whenever the `Stream` produces an item.
96//!
97//! ## Example decoder
98//!
99//! As an example, consider a protocol that can be used to send strings where
100//! each frame is a four-byte little-endian integer holding the length of the
101//! string, followed by that many bytes of string data. The decoder fails with
102//! an error if the string data is not valid UTF-8 or is too long.
103//!
104//! Such a decoder can be written like this:
105//! ```
106//! use futures_codec2::Decoder;
107//! use bytes::{BytesMut, Buf};
108//!
109//! struct MyStringDecoder {}
110//!
111//! const MAX: usize = 8 * 1024 * 1024;
112//!
113//! impl Decoder for MyStringDecoder {
114//!     type Item = String;
115//!     type Error = std::io::Error;
116//!
117//!     fn decode(
118//!         &mut self,
119//!         src: &mut BytesMut
120//!     ) -> Result<Option<Self::Item>, Self::Error> {
121//!         if src.len() < 4 {
122//!             // Not enough data to read length marker.
123//!             return Ok(None);
124//!         }
125//!
126//!         // Read length marker.
127//!         let mut length_bytes = [0u8; 4];
128//!         length_bytes.copy_from_slice(&src[..4]);
129//!         let length = u32::from_le_bytes(length_bytes) as usize;
130//!
131//!         // Check that the length is not too large to avoid a denial of
132//!         // service attack where the server runs out of memory.
133//!         if length > MAX {
134//!             return Err(std::io::Error::new(
135//!                 std::io::ErrorKind::InvalidData,
136//!                 format!("Frame of length {} is too large.", length)
137//!             ));
138//!         }
139//!
140//!         if src.len() < 4 + length {
141//!             // The full string has not yet arrived.
142//!             //
143//!             // We reserve more space in the buffer. This is not strictly
144//!             // necessary, but is a good idea performance-wise.
145//!             src.reserve(4 + length - src.len());
146//!
147//!             // We inform the Framed that we need more bytes to form the next
148//!             // frame.
149//!             return Ok(None);
150//!         }
151//!
152//!         // Use advance to modify src such that it no longer contains
153//!         // this frame.
154//!         let data = src[4..4 + length].to_vec();
155//!         src.advance(4 + length);
156//!
157//!         // Convert the data to a string, or fail if it is not valid utf-8.
158//!         match String::from_utf8(data) {
159//!             Ok(string) => Ok(Some(string)),
160//!             Err(utf8_error) => {
161//!                 Err(std::io::Error::new(
162//!                     std::io::ErrorKind::InvalidData,
163//!                     utf8_error.utf8_error(),
164//!                 ))
165//!             },
166//!         }
167//!     }
168//! }
169//! ```
170//!
171//! # The Encoder trait
172//!
173//! An [`Encoder`] is used together with [`FramedWrite`] or [`Framed`] to turn
174//! an [`AsyncWrite`] into a [`Sink`]. The job of the encoder trait is to
175//! specify how frames are turned into a sequence of bytes.  The job of the
176//! `FramedWrite` is to take the resulting sequence of bytes and write it to the
177//! IO resource.
178//!
179//! The main method on the `Encoder` trait is the [`encode`] method. This method
180//! takes an item that is being written, and a buffer to write the item to. The
181//! buffer may already contain data, and in this case, the encoder should append
182//! the new frame to the buffer rather than overwrite the existing data.
183//!
184//! It is guaranteed that, from one call to `encode` to another, the provided
185//! buffer will contain the exact same data as before, except that some of the
186//! data may have been removed from the front of the buffer. Writing to a
187//! `FramedWrite` is essentially equivalent to the following loop:
188//!
189//! ```no_run
190//! use futures::future::FutureExt;
191//! use futures::io::AsyncWriteExt;
192//! use bytes::Buf; // for advance
193//! # use futures_codec2::Encoder;
194//! # async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() }
195//! # async fn no_more_frames() { }
196//! # fn main() -> std::io::Result<()> {
197//! # tokio_test::block_on(async {
198//! # let mut io_resource = futures::io::sink();
199//! # let mut encoder = futures_codec2::BytesCodec::new();
200//!
201//! const MAX: usize = 8192;
202//!
203//! let mut buf = bytes::BytesMut::new();
204//! loop {
205//!     futures::select! {
206//!         num_written = io_resource.write(&buf).fuse() => {
207//!             if !buf.is_empty() {
208//!                 buf.advance(num_written?);
209//!             }
210//!         },
211//!         frame = next_frame().fuse() => {
212//!             if buf.len() < MAX {
213//!                 encoder.encode(frame, &mut buf)?;
214//!             }
215//!         },
216//!         _ = no_more_frames().fuse() => {
217//!             io_resource.write_all(&buf).await?;
218//!             io_resource.close().await?;
219//!             return Ok(());
220//!         },
221//!     }
222//! }
223//! # })}
224//! ```
225//! Here the `next_frame` method corresponds to any frames you write to the
226//! `FramedWrite`. The `no_more_frames` method corresponds to closing the
227//! `FramedWrite` with [`SinkExt::close`].
228//!
229//! ## Example encoder
230//!
231//! As an example, consider a protocol that can be used to send strings where
232//! each frame is a four-byte little-endian integer holding the length of the
233//! string, followed by that many bytes of string data. The encoder will fail if
234//! the string is too long.
235//!
236//! Such an encoder can be written like this:
237//! ```
238//! use futures_codec2::Encoder;
239//! use bytes::BytesMut;
240//!
241//! struct MyStringEncoder {}
242//!
243//! const MAX: usize = 8 * 1024 * 1024;
244//!
245//! impl Encoder<String> for MyStringEncoder {
246//!     type Error = std::io::Error;
247//!
248//!     fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
249//!         // Don't send a string if it is longer than the other end will
250//!         // accept.
251//!         if item.len() > MAX {
252//!             return Err(std::io::Error::new(
253//!                 std::io::ErrorKind::InvalidData,
254//!                 format!("Frame of length {} is too large.", item.len())
255//!             ));
256//!         }
257//!
258//!         // Convert the length into a byte array.
259//!         // The cast to u32 cannot overflow due to the length check above.
260//!         let len_slice = u32::to_le_bytes(item.len() as u32);
261//!
262//!         // Reserve space in the buffer.
263//!         dst.reserve(4 + item.len());
264//!
265//!         // Write the length and string to the buffer.
266//!         dst.extend_from_slice(&len_slice);
267//!         dst.extend_from_slice(item.as_bytes());
268//!         Ok(())
269//!     }
270//! }
271//! ```
272//!
273//! [`AsyncRead`]: futures_io::AsyncRead
274//! [`AsyncWrite`]: futures_io::AsyncWrite
275//! [`Stream`]: futures_core::stream::Stream
276//! [`Sink`]: futures_sink::Sink
277//! [`SinkExt::close`]: fn@futures_util::sink::SinkExt::close
278//! [`FramedRead`]: struct@crate::FramedRead
279//! [`FramedWrite`]: struct@crate::FramedWrite
280//! [`Framed`]: struct@crate::Framed
281//! [`Decoder`]: trait@crate::Decoder
282//! [`decode`]: fn@crate::Decoder::decode
283//! [`encode`]: fn@crate::Encoder::encode
284//! [`split_to`]: fn@bytes::BytesMut::split_to
285//! [`advance`]: fn@bytes::Buf::advance
286
287#[macro_use]
288mod cfg;
289
290cfg_json! {
291    mod json_codec;
292    pub use self::json_codec::{JsonCodec, JsonCodecError};
293}
294
295cfg_cbor! {
296    mod cbor_codec;
297    pub use self::cbor_codec::CborCodec;
298}
299
300mod bytes_codec;
301pub use self::bytes_codec::BytesCodec;
302
303mod decoder;
304pub use self::decoder::Decoder;
305
306mod encoder;
307pub use self::encoder::Encoder;
308
309mod framed_impl;
310pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
311
312mod framed;
313pub use self::framed::{Framed, FramedParts};
314
315mod framed_read;
316pub use self::framed_read::FramedRead;
317
318mod framed_write;
319pub use self::framed_write::FramedWrite;
320
321pub mod length_delimited;
322pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError};
323
324mod lines_codec;
325pub use self::lines_codec::{LinesCodec, LinesCodecError};