monoio_codec/length_delimited.rs
1// Modified tokio's implementation to use monoio io traits natively
2
3//! Frame a stream of bytes based on a length prefix
4//!
5//! Many protocols delimit their frames by prefacing frame data with a
6//! frame head that specifies the length of the frame. The
7//! `length_delimited` module provides utilities for handling the length
8//! based framing. This allows the consumer to work with entire frames
9//! without having to worry about buffering or other framing logic.
10//!
11//! # Getting started
12//!
13//! If implementing a protocol from scratch, using length delimited framing
14//! is an easy way to get started. [`LengthDelimitedCodec::new()`] will
15//! return a length delimited codec using default configuration values.
16//! This can then be used to construct a framer to adapt a full-duplex
17//! byte stream into a stream of frames.
18//!
19//! ```
20//! use monoio::io::{AsyncReadRent, AsyncWriteRent};
21//! use monoio_codec::{length_delimited::LengthDelimitedCodec, Framed};
22//!
23//! fn bind_transport<T: AsyncReadRent + AsyncWriteRent>(io: T) -> Framed<T, LengthDelimitedCodec> {
24//! Framed::new(io, LengthDelimitedCodec::new())
25//! }
26//! # pub fn main() {}
27//! ```
28//!
29//! The returned transport implements `Sink + Stream` for `BytesMut`. It
30//! encodes the frame with a big-endian `u32` header denoting the frame
31//! payload length:
32//!
33//! ```text
34//! +----------+--------------------------------+
35//! | len: u32 | frame payload |
36//! +----------+--------------------------------+
37//! ```
38//!
39//! Specifically, given the following:
40//!
41//! ```
42//! use bytes::Bytes;
43//! use monoio::io::{sink::Sink, AsyncReadRent, AsyncWriteRent};
44//! use monoio_codec::{length_delimited::LengthDelimitedCodec, Framed};
45//!
46//! async fn write_frame<T>(io: T) -> Result<(), Box<dyn std::error::Error>>
47//! where
48//! T: AsyncReadRent + AsyncWriteRent + Unpin,
49//! {
50//! let mut transport = Framed::new(io, LengthDelimitedCodec::new());
51//! let frame = Bytes::from("hello world");
52//!
53//! transport.send(frame).await?;
54//! Ok(())
55//! }
56//! ```
57//!
58//! The encoded frame will look like this:
59//!
60//! ```text
61//! +---- len: u32 ----+---- data ----+
62//! | \x00\x00\x00\x0b | hello world |
63//! +------------------+--------------+
64//! ```
65//!
66//! # Decoding
67//!
68//! [`FramedRead`] adapts an [`AsyncReadRent`] into a `Stream` of [`BytesMut`],
69//! such that each yielded [`BytesMut`] value contains the contents of an
70//! entire frame. There are many configuration parameters enabling
71//! [`FramedRead`] to handle a wide range of protocols. Here are some
72//! examples that will cover the various options at a high level.
73//!
74//! ## Example 1
75//!
76//! The following will parse a `u16` length field at offset 0, including the
77//! frame head in the yielded `BytesMut`.
78//!
79//! ```
80//! # use monoio::io::AsyncReadRent;
81//! # use monoio_codec::length_delimited::LengthDelimitedCodec;
82//! # fn bind_read<T: AsyncReadRent>(io: T) {
83//! LengthDelimitedCodec::builder()
84//! .length_field_offset(0) // default value
85//! .length_field_type::<u16>()
86//! .length_adjustment(0) // default value
87//! .num_skip(0) // Do not strip frame header
88//! .new_read(io);
89//! # }
90//! # pub fn main() {}
91//! ```
92//!
93//! The following frame will be decoded as such:
94//!
95//! ```text
96//! INPUT DECODED
97//! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+
98//! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world |
99//! +----------+---------------+ +----------+---------------+
100//! ```
101//!
102//! The value of the length field is 11 (`\x0B`) which represents the length
103//! of the payload, `hello world`. By default, [`FramedRead`] assumes that
104//! the length field represents the number of bytes that **follows** the
105//! length field. Thus, the entire frame has a length of 13: 2 bytes for the
106//! frame head + 11 bytes for the payload.
107//!
108//! ## Example 2
109//!
110//! The following will parse a `u16` length field at offset 0, omitting the
111//! frame head in the yielded `BytesMut`.
112//!
113//! ```
114//! # use monoio::io::AsyncReadRent;
115//! # use monoio_codec::length_delimited::LengthDelimitedCodec;
116//! # fn bind_read<T: AsyncReadRent>(io: T) {
117//! LengthDelimitedCodec::builder()
118//! .length_field_offset(0) // default value
119//! .length_field_type::<u16>()
120//! .length_adjustment(0) // default value
121//! // `num_skip` is not needed, the default is to skip
122//! .new_read(io);
123//! # }
124//! # pub fn main() {}
125//! ```
126//!
127//! The following frame will be decoded as such:
128//!
129//! ```text
130//! INPUT DECODED
131//! +-- len ---+--- Payload ---+ +--- Payload ---+
132//! | \x00\x0B | Hello world | --> | Hello world |
133//! +----------+---------------+ +---------------+
134//! ```
135//!
136//! This is similar to the first example, the only difference is that the
137//! frame head is **not** included in the yielded `BytesMut` value.
138//!
139//! ## Example 3
140//!
141//! The following will parse a `u16` length field at offset 0, including the
142//! frame head in the yielded `BytesMut`. In this case, the length field
143//! **includes** the frame head length.
144//!
145//! ```
146//! # use monoio::io::AsyncReadRent;
147//! # use monoio_codec::length_delimited::LengthDelimitedCodec;
148//! # fn bind_read<T: AsyncReadRent>(io: T) {
149//! LengthDelimitedCodec::builder()
150//! .length_field_offset(0) // default value
151//! .length_field_type::<u16>()
152//! .length_adjustment(-2) // size of head
153//! .num_skip(0)
154//! .new_read(io);
155//! # }
156//! # pub fn main() {}
157//! ```
158//!
159//! The following frame will be decoded as such:
160//!
161//! ```text
162//! INPUT DECODED
163//! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+
164//! | \x00\x0D | Hello world | --> | \x00\x0D | Hello world |
165//! +----------+---------------+ +----------+---------------+
166//! ```
167//!
168//! In most cases, the length field represents the length of the payload
169//! only, as shown in the previous examples. However, in some protocols the
170//! length field represents the length of the whole frame, including the
171//! head. In such cases, we specify a negative `length_adjustment` to adjust
172//! the value provided in the frame head to represent the payload length.
173//!
174//! ## Example 4
175//!
176//! The following will parse a 3 byte length field at offset 0 in a 5 byte
177//! frame head, including the frame head in the yielded `BytesMut`.
178//!
179//! ```
180//! # use monoio::io::AsyncReadRent;
181//! # use monoio_codec::length_delimited::LengthDelimitedCodec;
182//! # fn bind_read<T: AsyncReadRent>(io: T) {
183//! LengthDelimitedCodec::builder()
184//! .length_field_offset(0) // default value
185//! .length_field_length(3)
186//! .length_adjustment(2) // remaining head
187//! .num_skip(0)
188//! .new_read(io);
189//! # }
190//! # pub fn main() {}
191//! ```
192//!
193//! The following frame will be decoded as such:
194//!
195//! ```text
196//! INPUT
197//! +---- len -----+- head -+--- Payload ---+
198//! | \x00\x00\x0B | \xCAFE | Hello world |
199//! +--------------+--------+---------------+
200//!
201//! DECODED
202//! +---- len -----+- head -+--- Payload ---+
203//! | \x00\x00\x0B | \xCAFE | Hello world |
204//! +--------------+--------+---------------+
205//! ```
206//!
207//! A more advanced example that shows a case where there is extra frame
208//! head data between the length field and the payload. In such cases, it is
209//! usually desirable to include the frame head as part of the yielded
210//! `BytesMut`. This lets consumers of the length delimited framer to
211//! process the frame head as needed.
212//!
213//! The positive `length_adjustment` value lets `FramedRead` factor in the
214//! additional head into the frame length calculation.
215//!
216//! ## Example 5
217//!
218//! The following will parse a `u16` length field at offset 1 of a 4 byte
219//! frame head. The first byte and the length field will be omitted from the
220//! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be
221//! included.
222//!
223//! ```
224//! # use monoio::io::AsyncReadRent;
225//! # use monoio_codec::length_delimited::LengthDelimitedCodec;
226//! # fn bind_read<T: AsyncReadRent>(io: T) {
227//! LengthDelimitedCodec::builder()
228//! .length_field_offset(1) // length of hdr1
229//! .length_field_type::<u16>()
230//! .length_adjustment(1) // length of hdr2
231//! .num_skip(3) // length of hdr1 + LEN
232//! .new_read(io);
233//! # }
234//! # pub fn main() {}
235//! ```
236//!
237//! The following frame will be decoded as such:
238//!
239//! ```text
240//! INPUT
241//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
242//! | \xCA | \x00\x0B | \xFE | Hello world |
243//! +--------+----------+--------+---------------+
244//!
245//! DECODED
246//! +- hdr2 -+--- Payload ---+
247//! | \xFE | Hello world |
248//! +--------+---------------+
249//! ```
250//!
251//! The length field is situated in the middle of the frame head. In this
252//! case, the first byte in the frame head could be a version or some other
253//! identifier that is not needed for processing. On the other hand, the
254//! second half of the head is needed.
255//!
256//! `length_field_offset` indicates how many bytes to skip before starting
257//! to read the length field. `length_adjustment` is the number of bytes to
258//! skip starting at the end of the length field. In this case, it is the
259//! second half of the head.
260//!
261//! ## Example 6
262//!
263//! The following will parse a `u16` length field at offset 1 of a 4 byte
264//! frame head. The first byte and the length field will be omitted from the
265//! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be
266//! included. In this case, the length field **includes** the frame head
267//! length.
268//!
269//! ```
270//! # use monoio::io::AsyncReadRent;
271//! # use monoio_codec::length_delimited::LengthDelimitedCodec;
272//! # fn bind_read<T: AsyncReadRent>(io: T) {
273//! LengthDelimitedCodec::builder()
274//! .length_field_offset(1) // length of hdr1
275//! .length_field_type::<u16>()
276//! .length_adjustment(-3) // length of hdr1 + LEN, negative
277//! .num_skip(3)
278//! .new_read(io);
279//! # }
280//! ```
281//!
282//! The following frame will be decoded as such:
283//!
284//! ```text
285//! INPUT
286//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
287//! | \xCA | \x00\x0F | \xFE | Hello world |
288//! +--------+----------+--------+---------------+
289//!
290//! DECODED
291//! +- hdr2 -+--- Payload ---+
292//! | \xFE | Hello world |
293//! +--------+---------------+
294//! ```
295//!
296//! Similar to the example above, the difference is that the length field
297//! represents the length of the entire frame instead of just the payload.
298//! The length of `hdr1` and `len` must be counted in `length_adjustment`.
299//! Note that the length of `hdr2` does **not** need to be explicitly set
300//! anywhere because it already is factored into the total frame length that
301//! is read from the byte stream.
302//!
303//! ## Example 7
304//!
305//! The following will parse a 3 byte length field at offset 0 in a 4 byte
306//! frame head, excluding the 4th byte from the yielded `BytesMut`.
307//!
308//! ```
309//! # use monoio::io::AsyncReadRent;
310//! # use monoio_codec::length_delimited::LengthDelimitedCodec;
311//! # fn bind_read<T: AsyncReadRent>(io: T) {
312//! LengthDelimitedCodec::builder()
313//! .length_field_offset(0) // default value
314//! .length_field_length(3)
315//! .length_adjustment(0) // default value
316//! .num_skip(4) // skip the first 4 bytes
317//! .new_read(io);
318//! # }
319//! # pub fn main() {}
320//! ```
321//!
322//! The following frame will be decoded as such:
323//!
324//! ```text
325//! INPUT DECODED
326//! +------- len ------+--- Payload ---+ +--- Payload ---+
327//! | \x00\x00\x0B\xFF | Hello world | => | Hello world |
328//! +------------------+---------------+ +---------------+
329//! ```
330//!
331//! A simple example where there are unused bytes between the length field
332//! and the payload.
333//!
334//! # Encoding
335//!
336//! [`FramedWrite`] adapts an [`AsyncWriteRent`] into a `Sink` of [`BytesMut`],
337//! such that each submitted [`BytesMut`] is prefaced by a length field.
338//! There are fewer configuration options than [`FramedRead`]. Given
339//! protocols that have more complex frame heads, an encoder should probably
340//! be written by hand using [`Encoder`].
341//!
342//! Here is a simple example, given a `FramedWrite` with the following
343//! configuration:
344//!
345//! ```
346//! # use monoio::io::AsyncWriteRent;
347//! # use monoio_codec::length_delimited::LengthDelimitedCodec;
348//! # fn write_frame<T: AsyncWriteRent>(io: T) {
349//! # let _ =
350//! LengthDelimitedCodec::builder()
351//! .length_field_type::<u16>()
352//! .new_write(io);
353//! # }
354//! # pub fn main() {}
355//! ```
356//!
357//! A payload of `hello world` will be encoded as:
358//!
359//! ```text
360//! +- len: u16 -+---- data ----+
361//! | \x00\x0b | hello world |
362//! +------------+--------------+
363//! ```
364//!
365//! [`LengthDelimitedCodec::new()`]: method@LengthDelimitedCodec::new
366//! [`FramedRead`]: struct@FramedRead
367//! [`FramedWrite`]: struct@FramedWrite
368//! [`AsyncReadRent`]: trait@monoio::io::AsyncReadRent
369//! [`AsyncWriteRent`]: trait@monoio::io::AsyncWriteRent
370//! [`Encoder`]: trait@Encoder
371//! [`BytesMut`]: bytes::BytesMut
372
373use std::{
374 cmp,
375 error::Error as StdError,
376 fmt,
377 io::{self, Cursor},
378 mem,
379};
380
381use bytes::{Buf, BufMut, Bytes, BytesMut};
382use monoio::io::{AsyncReadRent, AsyncWriteRent};
383
384use crate::{Decoded, Decoder, Encoder, Framed, FramedRead, FramedWrite};
385
386/// Configure length delimited `LengthDelimitedCodec`s.
387///
388/// `Builder` enables constructing configured length delimited codecs. Note
389/// that not all configuration settings apply to both encoding and decoding. See
390/// the documentation for specific methods for more detail.
391#[derive(Debug, Clone, Copy)]
392pub struct Builder {
393 // Maximum frame length
394 max_frame_len: usize,
395
396 // Number of bytes representing the field length
397 length_field_len: usize,
398
399 // Number of bytes in the header before the length field
400 length_field_offset: usize,
401
402 // Adjust the length specified in the header field by this amount
403 length_adjustment: isize,
404
405 // Total number of bytes to skip before reading the payload, if not set,
406 // `length_field_len + length_field_offset`
407 num_skip: Option<usize>,
408
409 // Length field byte order (little or big endian)
410 length_field_is_big_endian: bool,
411}
412
413/// An error when the number of bytes read is more than max frame length.
414pub struct LengthDelimitedCodecError {
415 _priv: (),
416}
417
418/// A codec for frames delimited by a frame head specifying their lengths.
419///
420/// This allows the consumer to work with entire frames without having to worry
421/// about buffering or other framing logic.
422///
423/// See [module level] documentation for more detail.
424///
425/// [module level]: index.html
426#[derive(Debug, Clone)]
427pub struct LengthDelimitedCodec {
428 // Configuration values
429 builder: Builder,
430
431 // Read state
432 state: DecodeState,
433}
434
435#[derive(Debug, Clone, Copy)]
436enum DecodeState {
437 Head,
438 Data(usize),
439}
440
441// ===== impl LengthDelimitedCodec ======
442
443impl LengthDelimitedCodec {
444 /// Creates a new `LengthDelimitedCodec` with the default configuration values.
445 pub fn new() -> Self {
446 Self {
447 builder: Builder::new(),
448 state: DecodeState::Head,
449 }
450 }
451
452 /// Creates a new length delimited codec builder with default configuration
453 /// values.
454 pub fn builder() -> Builder {
455 Builder::new()
456 }
457
458 /// Returns the current max frame setting
459 ///
460 /// This is the largest size this codec will accept from the wire. Larger
461 /// frames will be rejected.
462 pub fn max_frame_length(&self) -> usize {
463 self.builder.max_frame_len
464 }
465
466 /// Updates the max frame setting.
467 ///
468 /// The change takes effect the next time a frame is decoded. In other
469 /// words, if a frame is currently in process of being decoded with a frame
470 /// size greater than `val` but less than the max frame length in effect
471 /// before calling this function, then the frame will be allowed.
472 pub fn set_max_frame_length(&mut self, val: usize) {
473 self.builder.max_frame_length(val);
474 }
475
476 fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Decoded<usize>> {
477 let head_len = self.builder.num_head_bytes();
478 let field_len = self.builder.length_field_len;
479
480 if src.len() < head_len {
481 // Not enough data
482 return Ok(Decoded::InsufficientAtLeast(head_len));
483 }
484
485 let n = {
486 let mut src = Cursor::new(&mut *src);
487
488 // Skip the required bytes
489 src.advance(self.builder.length_field_offset);
490
491 // match endianness
492 let n = if self.builder.length_field_is_big_endian {
493 src.get_uint(field_len)
494 } else {
495 src.get_uint_le(field_len)
496 };
497
498 if n > self.builder.max_frame_len as u64 {
499 return Err(io::Error::new(
500 io::ErrorKind::InvalidData,
501 LengthDelimitedCodecError { _priv: () },
502 ));
503 }
504
505 // The check above ensures there is no overflow
506 let n = n as usize;
507
508 // Adjust `n` with bounds checking
509 let n = if self.builder.length_adjustment < 0 {
510 n.checked_sub(-self.builder.length_adjustment as usize)
511 } else {
512 n.checked_add(self.builder.length_adjustment as usize)
513 };
514
515 // Error handling
516 match n {
517 Some(n) => n,
518 None => {
519 return Err(io::Error::new(
520 io::ErrorKind::InvalidInput,
521 "provided length would overflow after adjustment",
522 ));
523 }
524 }
525 };
526
527 let num_skip = self.builder.get_num_skip();
528
529 if num_skip > 0 {
530 src.advance(num_skip);
531 }
532
533 // Ensure that the buffer has enough space to read the incoming
534 // payload
535 src.reserve(n);
536
537 Ok(Decoded::Some(n))
538 }
539
540 fn decode_data(&self, n: usize, src: &mut BytesMut) -> Decoded<BytesMut> {
541 // At this point, the buffer has already had the required capacity
542 // reserved. All there is to do is read.
543 if src.len() < n {
544 return Decoded::InsufficientAtLeast(n);
545 }
546
547 Decoded::Some(src.split_to(n))
548 }
549}
550
551impl Decoder for LengthDelimitedCodec {
552 type Item = BytesMut;
553 type Error = io::Error;
554
555 fn decode(&mut self, src: &mut BytesMut) -> io::Result<Decoded<BytesMut>> {
556 let n = match self.state {
557 DecodeState::Head => match self.decode_head(src)? {
558 Decoded::Some(n) => {
559 self.state = DecodeState::Data(n);
560 n
561 }
562 Decoded::Insufficient => return Ok(Decoded::Insufficient),
563 Decoded::InsufficientAtLeast(n) => return Ok(Decoded::InsufficientAtLeast(n)),
564 },
565 DecodeState::Data(n) => n,
566 };
567
568 match self.decode_data(n, src) {
569 Decoded::Some(data) => {
570 // Update the decode state
571 self.state = DecodeState::Head;
572
573 // Make sure the buffer has enough space to read the next head
574 src.reserve(self.builder.num_head_bytes());
575
576 Ok(Decoded::Some(data))
577 }
578 Decoded::Insufficient => Ok(Decoded::Insufficient),
579 Decoded::InsufficientAtLeast(n) => Ok(Decoded::InsufficientAtLeast(n)),
580 }
581 }
582}
583
584impl Encoder<Bytes> for LengthDelimitedCodec {
585 type Error = io::Error;
586
587 fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> {
588 let n = data.len();
589
590 if n > self.builder.max_frame_len {
591 return Err(io::Error::new(
592 io::ErrorKind::InvalidInput,
593 LengthDelimitedCodecError { _priv: () },
594 ));
595 }
596
597 // Adjust `n` with bounds checking
598 let n = if self.builder.length_adjustment < 0 {
599 n.checked_add(-self.builder.length_adjustment as usize)
600 } else {
601 n.checked_sub(self.builder.length_adjustment as usize)
602 };
603
604 let n = n.ok_or_else(|| {
605 io::Error::new(
606 io::ErrorKind::InvalidInput,
607 "provided length would overflow after adjustment",
608 )
609 })?;
610
611 // Reserve capacity in the destination buffer to fit the frame and
612 // length field (plus adjustment).
613 dst.reserve(self.builder.length_field_len + n);
614
615 if self.builder.length_field_is_big_endian {
616 dst.put_uint(n as u64, self.builder.length_field_len);
617 } else {
618 dst.put_uint_le(n as u64, self.builder.length_field_len);
619 }
620
621 // Write the frame to the buffer
622 dst.extend_from_slice(&data[..]);
623
624 Ok(())
625 }
626}
627
628impl Default for LengthDelimitedCodec {
629 fn default() -> Self {
630 Self::new()
631 }
632}
633
634// ===== impl Builder =====
635
636mod builder {
637 /// Types that can be used with `Builder::length_field_type`.
638 pub trait LengthFieldType {}
639
640 impl LengthFieldType for u8 {}
641 impl LengthFieldType for u16 {}
642 impl LengthFieldType for u32 {}
643 impl LengthFieldType for u64 {}
644
645 #[cfg(any(
646 target_pointer_width = "8",
647 target_pointer_width = "16",
648 target_pointer_width = "32",
649 target_pointer_width = "64",
650 ))]
651 impl LengthFieldType for usize {}
652}
653
654impl Builder {
655 /// Creates a new length delimited codec builder with default configuration
656 /// values.
657 ///
658 /// # Examples
659 ///
660 /// ```
661 /// # use monoio::io::AsyncReadRent;
662 /// use monoio_codec::length_delimited::LengthDelimitedCodec;
663 ///
664 /// # fn bind_read<T: AsyncReadRent>(io: T) {
665 /// LengthDelimitedCodec::builder()
666 /// .length_field_offset(0)
667 /// .length_field_type::<u16>()
668 /// .length_adjustment(0)
669 /// .num_skip(0)
670 /// .new_read(io);
671 /// # }
672 /// # pub fn main() {}
673 /// ```
674 pub fn new() -> Builder {
675 Builder {
676 // Default max frame length of 8MB
677 max_frame_len: 8 * 1_024 * 1_024,
678
679 // Default byte length of 4
680 length_field_len: 4,
681
682 // Default to the header field being at the start of the header.
683 length_field_offset: 0,
684
685 length_adjustment: 0,
686
687 // Total number of bytes to skip before reading the payload, if not set,
688 // `length_field_len + length_field_offset`
689 num_skip: None,
690
691 // Default to reading the length field in network (big) endian.
692 length_field_is_big_endian: true,
693 }
694 }
695
696 /// Read the length field as a big endian integer
697 ///
698 /// This is the default setting.
699 ///
700 /// This configuration option applies to both encoding and decoding.
701 ///
702 /// # Examples
703 ///
704 /// ```
705 /// # use monoio::io::AsyncReadRent;
706 /// use monoio_codec::length_delimited::LengthDelimitedCodec;
707 ///
708 /// # fn bind_read<T: AsyncReadRent>(io: T) {
709 /// LengthDelimitedCodec::builder().big_endian().new_read(io);
710 /// # }
711 /// # pub fn main() {}
712 /// ```
713 pub fn big_endian(&mut self) -> &mut Self {
714 self.length_field_is_big_endian = true;
715 self
716 }
717
718 /// Read the length field as a little endian integer
719 ///
720 /// The default setting is big endian.
721 ///
722 /// This configuration option applies to both encoding and decoding.
723 ///
724 /// # Examples
725 ///
726 /// ```
727 /// # use monoio::io::AsyncReadRent;
728 /// use monoio_codec::length_delimited::LengthDelimitedCodec;
729 ///
730 /// # fn bind_read<T: AsyncReadRent>(io: T) {
731 /// LengthDelimitedCodec::builder().little_endian().new_read(io);
732 /// # }
733 /// # pub fn main() {}
734 /// ```
735 pub fn little_endian(&mut self) -> &mut Self {
736 self.length_field_is_big_endian = false;
737 self
738 }
739
740 /// Read the length field as a native endian integer
741 ///
742 /// The default setting is big endian.
743 ///
744 /// This configuration option applies to both encoding and decoding.
745 ///
746 /// # Examples
747 ///
748 /// ```
749 /// # use monoio::io::AsyncReadRent;
750 /// use monoio_codec::length_delimited::LengthDelimitedCodec;
751 ///
752 /// # fn bind_read<T: AsyncReadRent>(io: T) {
753 /// LengthDelimitedCodec::builder().native_endian().new_read(io);
754 /// # }
755 /// # pub fn main() {}
756 /// ```
757 pub fn native_endian(&mut self) -> &mut Self {
758 if cfg!(target_endian = "big") {
759 self.big_endian()
760 } else {
761 self.little_endian()
762 }
763 }
764
765 /// Sets the max frame length in bytes
766 ///
767 /// This configuration option applies to both encoding and decoding. The
768 /// default value is 8MB.
769 ///
770 /// When decoding, the length field read from the byte stream is checked
771 /// against this setting **before** any adjustments are applied. When
772 /// encoding, the length of the submitted payload is checked against this
773 /// setting.
774 ///
775 /// When frames exceed the max length, an `io::Error` with the custom value
776 /// of the `LengthDelimitedCodecError` type will be returned.
777 ///
778 /// # Examples
779 ///
780 /// ```
781 /// # use monoio::io::AsyncReadRent;
782 /// use monoio_codec::length_delimited::LengthDelimitedCodec;
783 ///
784 /// # fn bind_read<T: AsyncReadRent>(io: T) {
785 /// LengthDelimitedCodec::builder()
786 /// .max_frame_length(8 * 1024 * 1024)
787 /// .new_read(io);
788 /// # }
789 /// # pub fn main() {}
790 /// ```
791 pub fn max_frame_length(&mut self, val: usize) -> &mut Self {
792 self.max_frame_len = val;
793 self
794 }
795
796 /// Sets the unsigned integer type used to represent the length field.
797 ///
798 /// The default type is [`u32`]. The max type is [`u64`] (or [`usize`] on
799 /// 64-bit targets).
800 ///
801 /// # Examples
802 ///
803 /// ```
804 /// # use monoio::io::AsyncReadRent;
805 /// use monoio_codec::length_delimited::LengthDelimitedCodec;
806 ///
807 /// # fn bind_read<T: AsyncReadRent>(io: T) {
808 /// LengthDelimitedCodec::builder()
809 /// .length_field_type::<u32>()
810 /// .new_read(io);
811 /// # }
812 /// # pub fn main() {}
813 /// ```
814 ///
815 /// Unlike [`Builder::length_field_length`], this does not fail at runtime
816 /// and instead produces a compile error:
817 ///
818 /// ```compile_fail
819 /// # use monoio::io::AsyncReadRent;
820 /// # use monoio_codec::length_delimited::LengthDelimitedCodec;
821 /// # fn bind_read<T: AsyncReadRent>(io: T) {
822 /// LengthDelimitedCodec::builder()
823 /// .length_field_type::<u128>()
824 /// .new_read(io);
825 /// # }
826 /// # pub fn main() {}
827 /// ```
828 pub fn length_field_type<T: builder::LengthFieldType>(&mut self) -> &mut Self {
829 self.length_field_length(mem::size_of::<T>())
830 }
831
832 /// Sets the number of bytes used to represent the length field
833 ///
834 /// The default value is `4`. The max value is `8`.
835 ///
836 /// This configuration option applies to both encoding and decoding.
837 ///
838 /// # Examples
839 ///
840 /// ```
841 /// # use monoio::io::AsyncReadRent;
842 /// use monoio_codec::length_delimited::LengthDelimitedCodec;
843 ///
844 /// # fn bind_read<T: AsyncReadRent>(io: T) {
845 /// LengthDelimitedCodec::builder()
846 /// .length_field_length(4)
847 /// .new_read(io);
848 /// # }
849 /// # pub fn main() {}
850 /// ```
851 pub fn length_field_length(&mut self, val: usize) -> &mut Self {
852 assert!(val > 0 && val <= 8, "invalid length field length");
853 self.length_field_len = val;
854 self
855 }
856
857 /// Sets the number of bytes in the header before the length field
858 ///
859 /// This configuration option only applies to decoding.
860 ///
861 /// # Examples
862 ///
863 /// ```
864 /// # use monoio::io::AsyncReadRent;
865 /// use monoio_codec::length_delimited::LengthDelimitedCodec;
866 ///
867 /// # fn bind_read<T: AsyncReadRent>(io: T) {
868 /// LengthDelimitedCodec::builder()
869 /// .length_field_offset(1)
870 /// .new_read(io);
871 /// # }
872 /// # pub fn main() {}
873 /// ```
874 pub fn length_field_offset(&mut self, val: usize) -> &mut Self {
875 self.length_field_offset = val;
876 self
877 }
878
879 /// Delta between the payload length specified in the header and the real
880 /// payload length
881 ///
882 /// # Examples
883 ///
884 /// ```
885 /// # use monoio::io::AsyncReadRent;
886 /// use monoio_codec::length_delimited::LengthDelimitedCodec;
887 ///
888 /// # fn bind_read<T: AsyncReadRent>(io: T) {
889 /// LengthDelimitedCodec::builder()
890 /// .length_adjustment(-2)
891 /// .new_read(io);
892 /// # }
893 /// # pub fn main() {}
894 /// ```
895 pub fn length_adjustment(&mut self, val: isize) -> &mut Self {
896 self.length_adjustment = val;
897 self
898 }
899
900 /// Sets the number of bytes to skip before reading the payload
901 ///
902 /// Default value is `length_field_len + length_field_offset`
903 ///
904 /// This configuration option only applies to decoding
905 ///
906 /// # Examples
907 ///
908 /// ```
909 /// # use monoio::io::AsyncReadRent;
910 /// use monoio_codec::length_delimited::LengthDelimitedCodec;
911 ///
912 /// # fn bind_read<T: AsyncReadRent>(io: T) {
913 /// LengthDelimitedCodec::builder().num_skip(4).new_read(io);
914 /// # }
915 /// # pub fn main() {}
916 /// ```
917 pub fn num_skip(&mut self, val: usize) -> &mut Self {
918 self.num_skip = Some(val);
919 self
920 }
921
922 /// Create a configured length delimited `LengthDelimitedCodec`
923 ///
924 /// # Examples
925 ///
926 /// ```
927 /// use monoio_codec::length_delimited::LengthDelimitedCodec;
928 /// # pub fn main() {
929 /// LengthDelimitedCodec::builder()
930 /// .length_field_offset(0)
931 /// .length_field_type::<u16>()
932 /// .length_adjustment(0)
933 /// .num_skip(0)
934 /// .new_codec();
935 /// # }
936 /// ```
937 pub fn new_codec(&self) -> LengthDelimitedCodec {
938 LengthDelimitedCodec {
939 builder: *self,
940 state: DecodeState::Head,
941 }
942 }
943
944 /// Create a configured length delimited `FramedRead`
945 ///
946 /// # Examples
947 ///
948 /// ```
949 /// # use monoio::io::AsyncReadRent;
950 /// use monoio_codec::length_delimited::LengthDelimitedCodec;
951 ///
952 /// # fn bind_read<T: AsyncReadRent>(io: T) {
953 /// LengthDelimitedCodec::builder()
954 /// .length_field_offset(0)
955 /// .length_field_type::<u16>()
956 /// .length_adjustment(0)
957 /// .num_skip(0)
958 /// .new_read(io);
959 /// # }
960 /// # pub fn main() {}
961 /// ```
962 pub fn new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec>
963 where
964 T: AsyncReadRent,
965 {
966 FramedRead::new(upstream, self.new_codec())
967 }
968
969 /// Create a configured length delimited `FramedWrite`
970 ///
971 /// # Examples
972 ///
973 /// ```
974 /// # use monoio::io::AsyncWriteRent;
975 /// # use monoio_codec::length_delimited::LengthDelimitedCodec;
976 /// # fn write_frame<T: AsyncWriteRent>(io: T) {
977 /// LengthDelimitedCodec::builder()
978 /// .length_field_type::<u16>()
979 /// .new_write(io);
980 /// # }
981 /// # pub fn main() {}
982 /// ```
983 pub fn new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec>
984 where
985 T: AsyncWriteRent,
986 {
987 FramedWrite::new(inner, self.new_codec())
988 }
989
990 /// Create a configured length delimited `Framed`
991 ///
992 /// # Examples
993 ///
994 /// ```
995 /// # use monoio::io::{AsyncReadRent, AsyncWriteRent};
996 /// # use monoio_codec::length_delimited::LengthDelimitedCodec;
997 /// # fn write_frame<T: AsyncReadRent + AsyncWriteRent>(io: T) {
998 /// # let _ =
999 /// LengthDelimitedCodec::builder()
1000 /// .length_field_type::<u16>()
1001 /// .new_framed(io);
1002 /// # }
1003 /// # pub fn main() {}
1004 /// ```
1005 pub fn new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec>
1006 where
1007 T: AsyncReadRent + AsyncWriteRent,
1008 {
1009 Framed::new(inner, self.new_codec())
1010 }
1011
1012 fn num_head_bytes(&self) -> usize {
1013 let num = self.length_field_offset + self.length_field_len;
1014 cmp::max(num, self.num_skip.unwrap_or(0))
1015 }
1016
1017 fn get_num_skip(&self) -> usize {
1018 self.num_skip
1019 .unwrap_or(self.length_field_offset + self.length_field_len)
1020 }
1021}
1022
1023impl Default for Builder {
1024 fn default() -> Self {
1025 Self::new()
1026 }
1027}
1028
1029// ===== impl LengthDelimitedCodecError =====
1030
1031impl fmt::Debug for LengthDelimitedCodecError {
1032 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1033 f.debug_struct("LengthDelimitedCodecError").finish()
1034 }
1035}
1036
1037impl fmt::Display for LengthDelimitedCodecError {
1038 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1039 f.write_str("frame size too big")
1040 }
1041}
1042
1043impl StdError for LengthDelimitedCodecError {}