tokio_transports/framed/
lines_codec.rs

// Forked from `tokio-util`'s `LinesCodec`, with the `Decoder` and `Encoder` modified to work on
// `BytesMut`/`Bytes` instead of `String`.
2
3use std::{cmp, io, str, usize};
4
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6use tokio_util::codec::{Decoder, Encoder, LinesCodecError};
7
8/// A simple [`Decoder`] and [`Encoder`] implementation that splits up data into lines.
9///
10/// [`Decoder`]: tokio_util::codec::Decoder
11/// [`Encoder`]: tokio_util::codec::Encoder
/// Splits a byte stream into `\n`-delimited lines, and writes lines back out
/// with a `\n` terminator.
///
/// Decoding yields each line as raw bytes with its line ending removed;
/// encoding appends a single `\n` after each line.
///
/// [`Decoder`]: tokio_util::codec::Decoder
/// [`Encoder`]: tokio_util::codec::Encoder
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LinesCodec {
    /// Offset at which the next search for a `\n` resumes.
    ///
    /// Bytes before this offset were already scanned by an earlier `decode`
    /// call and contain no newline. For example, after `decode` sees `abc`
    /// this holds `3`, so a later call with `abcde\n` only scans `de\n`.
    next_index: usize,

    /// Longest accepted line, in bytes, not counting the `\n` terminator.
    /// `usize::MAX` means lines are read until a `\n` arrives, however long.
    max_length: usize,

    /// Set while the decoder is skipping the remainder of a line that went
    /// over `max_length`.
    is_discarding: bool,
}
30
31impl LinesCodec {
32    /// Returns a `LinesCodec` for splitting up data into lines.
33    ///
34    /// # Note
35    ///
36    /// The returned `LinesCodec` will not have an upper bound on the length
37    /// of a buffered line. See the documentation for [`new_with_max_length`]
38    /// for information on why this could be a potential security risk.
39    ///
40    /// [`new_with_max_length`]: crate::codec::LinesCodec::new_with_max_length()
41    pub fn new() -> LinesCodec {
42        LinesCodec {
43            next_index: 0,
44            max_length: usize::MAX,
45            is_discarding: false,
46        }
47    }
48
49    /// Returns a `LinesCodec` with a maximum line length limit.
50    ///
51    /// If this is set, calls to `LinesCodec::decode` will return a
52    /// [`LinesCodecError`] when a line exceeds the length limit. Subsequent calls
53    /// will discard up to `limit` bytes from that line until a newline
54    /// character is reached, returning `None` until the line over the limit
55    /// has been fully discarded. After that point, calls to `decode` will
56    /// function as normal.
57    ///
58    /// # Note
59    ///
60    /// Setting a length limit is highly recommended for any `LinesCodec` which
61    /// will be exposed to untrusted input. Otherwise, the size of the buffer
62    /// that holds the line currently being read is unbounded. An attacker could
63    /// exploit this unbounded buffer by sending an unbounded amount of input
64    /// without any `\n` characters, causing unbounded memory consumption.
65    ///
66    /// [`LinesCodecError`]: crate::codec::LinesCodecError
67    pub fn new_with_max_length(max_length: usize) -> Self {
68        LinesCodec {
69            max_length,
70            ..LinesCodec::new()
71        }
72    }
73
74    /// Returns the maximum line length when decoding.
75    ///
76    /// ```
77    /// use std::usize;
78    /// use tokio_util::codec::LinesCodec;
79    ///
80    /// let codec = LinesCodec::new();
81    /// assert_eq!(codec.max_length(), usize::MAX);
82    /// ```
83    /// ```
84    /// use tokio_util::codec::LinesCodec;
85    ///
86    /// let codec = LinesCodec::new_with_max_length(256);
87    /// assert_eq!(codec.max_length(), 256);
88    /// ```
89    pub fn max_length(&self) -> usize {
90        self.max_length
91    }
92}
93
/// Validates that `buf` is UTF-8 and borrows it as `&str`.
///
/// # Errors
///
/// Returns an [`io::Error`] of kind [`io::ErrorKind::InvalidData`] when `buf`
/// is not valid UTF-8. In this file the helper is only used by the encoder,
/// so the message describes an encoding failure.
#[inline]
fn utf8(buf: &[u8]) -> io::Result<&str> {
    str::from_utf8(buf)
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Unable to encode input as UTF8"))
}
99
100#[inline]
101fn without_carriage_return(mut s: BytesMut) -> BytesMut {
102    if let Some(&b'\r') = s.last() {
103        s.split_to(s.len() - 1)
104    } else {
105        s
106    }
107}
108
109impl Decoder for LinesCodec {
110    type Item = BytesMut;
111    type Error = LinesCodecError;
112
113    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, LinesCodecError> {
114        loop {
115            // Determine how far into the buffer we'll search for a newline. If
116            // there's no max_length set, we'll read to the end of the buffer.
117            let read_to = cmp::min(self.max_length.saturating_add(1), buf.len());
118
119            let newline_offset = buf[self.next_index..read_to]
120                .iter()
121                .position(|b| *b == b'\n');
122
123            match (self.is_discarding, newline_offset) {
124                (true, Some(offset)) => {
125                    // If we found a newline, discard up to that offset and
126                    // then stop discarding. On the next iteration, we'll try
127                    // to read a line normally.
128                    buf.advance(offset + self.next_index + 1);
129                    self.is_discarding = false;
130                    self.next_index = 0;
131                }
132                (true, None) => {
133                    // Otherwise, we didn't find a newline, so we'll discard
134                    // everything we read. On the next iteration, we'll continue
135                    // discarding up to max_len bytes unless we find a newline.
136                    buf.advance(read_to);
137                    self.next_index = 0;
138                    if buf.is_empty() {
139                        return Ok(None);
140                    }
141                }
142                (false, Some(offset)) => {
143                    // Found a line!
144                    let newline_index = offset + self.next_index;
145                    self.next_index = 0;
146                    let mut line = buf.split_to(newline_index + 1);
147                    let line = line.split_to(line.len() - 1);
148                    let line = without_carriage_return(line);
149                    return Ok(Some(line));
150                }
151                (false, None) if buf.len() > self.max_length => {
152                    // Reached the maximum length without finding a
153                    // newline, return an error and start discarding on the
154                    // next call.
155                    self.is_discarding = true;
156                    return Err(LinesCodecError::MaxLineLengthExceeded);
157                }
158                (false, None) => {
159                    // We didn't find a line or reach the length limit, so the next
160                    // call will resume searching at the current offset.
161                    self.next_index = read_to;
162                    return Ok(None);
163                }
164            }
165        }
166    }
167
168    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, LinesCodecError> {
169        Ok(match self.decode(buf)? {
170            Some(frame) => Some(frame),
171            None => {
172                // No terminating newline - return remaining data, if any
173                if buf.is_empty() || buf == &b"\r"[..] {
174                    None
175                } else {
176                    let line = buf.split_to(buf.len());
177                    let line = without_carriage_return(line);
178                    self.next_index = 0;
179                    Some(line)
180                }
181            }
182        })
183    }
184}
185
186impl Encoder<Bytes> for LinesCodec {
187    type Error = LinesCodecError;
188
189    fn encode(&mut self, line: Bytes, buf: &mut BytesMut) -> Result<(), LinesCodecError> {
190        let line = utf8(&line)?;
191        buf.reserve(line.len() + 1);
192        buf.put(line.as_bytes());
193        buf.put_u8(b'\n');
194        Ok(())
195    }
196}
197
198impl Default for LinesCodec {
199    fn default() -> Self {
200        Self::new()
201    }
202}