use crate::codec::decoder::Decoder;
use crate::codec::encoder::Encoder;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::{cmp, fmt, io};

const DEFAULT_SEEK_DELIMITERS: &[u8] = b",;\n\r";
const DEFAULT_SEQUENCE_WRITER: &[u8] = b",";

/// A simple [`Decoder`] and [`Encoder`] implementation that splits up data into chunks based on any character in the given delimiter string.
///
/// [`Decoder`]: crate::codec::Decoder
/// [`Encoder`]: crate::codec::Encoder
///
/// # Example
/// Decodes a string of bytes containing various delimiters.
///
/// [`BytesMut`]: bytes::BytesMut
/// [`Error`]: std::io::Error
///
/// ```
/// use tokio_util::codec::{AnyDelimiterCodec, Decoder};
/// use bytes::{BufMut, BytesMut};
///
/// #
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), std::io::Error> {
/// let mut codec = AnyDelimiterCodec::new(b",;\r\n".to_vec(), b";".to_vec());
/// let buf = &mut BytesMut::new();
/// buf.reserve(200);
/// buf.put_slice(b"chunk 1,chunk 2;chunk 3\n\r");
/// assert_eq!("chunk 1", codec.decode(buf).unwrap().unwrap());
/// assert_eq!("chunk 2", codec.decode(buf).unwrap().unwrap());
/// assert_eq!("chunk 3", codec.decode(buf).unwrap().unwrap());
/// assert_eq!("", codec.decode(buf).unwrap().unwrap());
/// assert_eq!(None, codec.decode(buf).unwrap());
/// # Ok(())
/// # }
/// ```
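///
/// The codec is typically driven through a [`FramedRead`] or [`FramedWrite`].
/// A minimal sketch of stream-based decoding (this sketch assumes the
/// `futures` crate is available for [`StreamExt`]):
///
/// ```
/// use tokio_util::codec::{AnyDelimiterCodec, FramedRead};
/// use futures::StreamExt;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// // An in-memory reader stands in for a socket or file here.
/// let input = &b"chunk 1,chunk 2;"[..];
/// let mut framed = FramedRead::new(input, AnyDelimiterCodec::new(b",;".to_vec(), b";".to_vec()));
/// assert_eq!("chunk 1", framed.next().await.unwrap().unwrap());
/// assert_eq!("chunk 2", framed.next().await.unwrap().unwrap());
/// assert!(framed.next().await.is_none());
/// # }
/// ```
///
/// [`FramedRead`]: crate::codec::FramedRead
/// [`FramedWrite`]: crate::codec::FramedWrite
/// [`StreamExt`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html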
///
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AnyDelimiterCodec {
    // Stored index of the next byte to examine for a delimiter character.
    // This is used to optimize searching.
    // For example, if `decode` was called with `abc` and the delimiters are `{` and `}`,
    // it would hold `3`, because that is the next index to examine.
    // The next time `decode` is called with `abcde}`, the method will
    // only look at `de}` before returning.
    next_index: usize,

    /// The maximum length for a given chunk. If `usize::MAX`, chunks will be
    /// read until a delimiter character is reached.
    max_length: usize,

    /// Are we currently discarding the remainder of a chunk which was over
    /// the length limit?
    is_discarding: bool,

    /// The bytes used to search for a delimiter during decode.
    seek_delimiters: Vec<u8>,

    /// The bytes appended as a delimiter when encoding.
    sequence_writer: Vec<u8>,
}

impl AnyDelimiterCodec {
    /// Returns an `AnyDelimiterCodec` for splitting up data into chunks.
    ///
    /// # Note
    ///
    /// The returned `AnyDelimiterCodec` will not have an upper bound on the length
    /// of a buffered chunk. See the documentation for [`new_with_max_length`]
    /// for information on why this could be a potential security risk.
    ///
    /// [`new_with_max_length`]: crate::codec::AnyDelimiterCodec::new_with_max_length()
    pub fn new(seek_delimiters: Vec<u8>, sequence_writer: Vec<u8>) -> AnyDelimiterCodec {
        AnyDelimiterCodec {
            next_index: 0,
            max_length: usize::MAX,
            is_discarding: false,
            seek_delimiters,
            sequence_writer,
        }
    }

    /// Returns an `AnyDelimiterCodec` with a maximum chunk length limit.
    ///
    /// If this is set, calls to `AnyDelimiterCodec::decode` will return an
    /// [`AnyDelimiterCodecError`] when a chunk exceeds the length limit. Subsequent calls
    /// will discard up to `max_length` bytes from that chunk until a delimiter
    /// character is reached, returning `None` until the chunk over the limit
    /// has been fully discarded. After that point, calls to `decode` will
    /// function as normal.
    ///
    /// # Note
    ///
    /// Setting a length limit is highly recommended for any `AnyDelimiterCodec` which
    /// will be exposed to untrusted input. Otherwise, the size of the buffer
    /// that holds the chunk currently being read is unbounded. An attacker could
    /// exploit this unbounded buffer by sending an unbounded amount of input
    /// without any delimiter characters, causing unbounded memory consumption.
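    ///
    /// # Example
    ///
    /// A minimal sketch of the discard behavior described above (the single
    /// `,` delimiter and the 2-byte limit are arbitrary choices for
    /// illustration):
    ///
    /// ```
    /// use tokio_util::codec::{AnyDelimiterCodec, AnyDelimiterCodecError, Decoder};
    /// use bytes::BytesMut;
    ///
    /// let mut codec = AnyDelimiterCodec::new_with_max_length(b",".to_vec(), b",".to_vec(), 2);
    /// let buf = &mut BytesMut::from("chunk 1,a,");
    /// // "chunk 1" exceeds the limit, so `decode` reports an error and
    /// // starts discarding the oversized chunk.
    /// assert!(matches!(
    ///     codec.decode(buf),
    ///     Err(AnyDelimiterCodecError::MaxChunkLengthExceeded)
    /// ));
    /// // Once the oversized chunk has been discarded, decoding resumes.
    /// assert_eq!("a", codec.decode(buf).unwrap().unwrap());
    /// ```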
    ///
    /// [`AnyDelimiterCodecError`]: crate::codec::AnyDelimiterCodecError
    pub fn new_with_max_length(
        seek_delimiters: Vec<u8>,
        sequence_writer: Vec<u8>,
        max_length: usize,
    ) -> Self {
        AnyDelimiterCodec {
            max_length,
            ..AnyDelimiterCodec::new(seek_delimiters, sequence_writer)
        }
    }

    /// Returns the maximum chunk length when decoding.
    ///
    /// ```
    /// use tokio_util::codec::AnyDelimiterCodec;
    ///
    /// let codec = AnyDelimiterCodec::new(b",;\n".to_vec(), b";".to_vec());
    /// assert_eq!(codec.max_length(), usize::MAX);
    /// ```
    /// ```
    /// use tokio_util::codec::AnyDelimiterCodec;
    ///
    /// let codec = AnyDelimiterCodec::new_with_max_length(b",;\n".to_vec(), b";".to_vec(), 256);
    /// assert_eq!(codec.max_length(), 256);
    /// ```
    pub fn max_length(&self) -> usize {
        self.max_length
    }
}

impl Decoder for AnyDelimiterCodec {
    type Item = Bytes;
    type Error = AnyDelimiterCodecError;

    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, AnyDelimiterCodecError> {
        loop {
            // Determine how far into the buffer we'll search for a delimiter. If
            // there's no max_length set, we'll read to the end of the buffer.
            let read_to = cmp::min(self.max_length.saturating_add(1), buf.len());

            let new_chunk_offset = buf[self.next_index..read_to].iter().position(|b| {
                self.seek_delimiters
                    .iter()
                    .any(|delimiter| *b == *delimiter)
            });

            match (self.is_discarding, new_chunk_offset) {
                (true, Some(offset)) => {
                    // If we found a new chunk, discard up to that offset and
                    // then stop discarding. On the next iteration, we'll try
                    // to read a chunk normally.
                    buf.advance(offset + self.next_index + 1);
                    self.is_discarding = false;
                    self.next_index = 0;
                }
                (true, None) => {
                    // Otherwise, we didn't find a new chunk, so we'll discard
                    // everything we read. On the next iteration, we'll continue
                    // discarding up to max_len bytes unless we find a new chunk.
                    buf.advance(read_to);
                    self.next_index = 0;
                    if buf.is_empty() {
                        return Ok(None);
                    }
                }
                (false, Some(offset)) => {
                    // Found a chunk!
                    let new_chunk_index = offset + self.next_index;
                    self.next_index = 0;
                    let mut chunk = buf.split_to(new_chunk_index + 1);
                    chunk.truncate(chunk.len() - 1);
                    let chunk = chunk.freeze();
                    return Ok(Some(chunk));
                }
                (false, None) if buf.len() > self.max_length => {
                    // Reached the maximum length without finding a
                    // new chunk, return an error and start discarding on the
                    // next call.
                    self.is_discarding = true;
                    return Err(AnyDelimiterCodecError::MaxChunkLengthExceeded);
                }
                (false, None) => {
                    // We didn't find a chunk or reach the length limit, so the next
                    // call will resume searching at the current offset.
                    self.next_index = read_to;
                    return Ok(None);
                }
            }
        }
    }

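    /// A small sketch of the EOF behavior (the `,` delimiter is an arbitrary
    /// choice): any bytes still buffered when the stream ends are returned as
    /// a final chunk, even without a trailing delimiter.
    ///
    /// ```
    /// use tokio_util::codec::{AnyDelimiterCodec, Decoder};
    /// use bytes::BytesMut;
    ///
    /// let mut codec = AnyDelimiterCodec::new(b",".to_vec(), b",".to_vec());
    /// let buf = &mut BytesMut::from("a,trailing");
    /// assert_eq!("a", codec.decode_eof(buf).unwrap().unwrap());
    /// // "trailing" has no delimiter after it, but at EOF it is still returned.
    /// assert_eq!("trailing", codec.decode_eof(buf).unwrap().unwrap());
    /// assert_eq!(None, codec.decode_eof(buf).unwrap());
    /// ```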
    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, AnyDelimiterCodecError> {
        Ok(match self.decode(buf)? {
            Some(frame) => Some(frame),
            None => {
                // Return remaining data, if any.
                if buf.is_empty() {
                    None
                } else {
                    let chunk = buf.split_to(buf.len());
                    self.next_index = 0;
                    Some(chunk.freeze())
                }
            }
        })
    }
}

impl<T> Encoder<T> for AnyDelimiterCodec
where
    T: AsRef<str>,
{
    type Error = AnyDelimiterCodecError;

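    /// A small sketch of encoding (the `;` delimiters are arbitrary choices):
    /// each chunk is appended to the buffer, followed by the sequence writer.
    ///
    /// ```
    /// use tokio_util::codec::{AnyDelimiterCodec, Encoder};
    /// use bytes::BytesMut;
    ///
    /// let mut codec = AnyDelimiterCodec::new(b";".to_vec(), b";".to_vec());
    /// let buf = &mut BytesMut::new();
    /// codec.encode("chunk 1", buf).unwrap();
    /// codec.encode("chunk 2", buf).unwrap();
    /// assert_eq!(&buf[..], b"chunk 1;chunk 2;");
    /// ```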
    fn encode(&mut self, chunk: T, buf: &mut BytesMut) -> Result<(), AnyDelimiterCodecError> {
        let chunk = chunk.as_ref();
        buf.reserve(chunk.len() + 1);
        buf.put(chunk.as_bytes());
        buf.put(self.sequence_writer.as_ref());

        Ok(())
    }
}

impl Default for AnyDelimiterCodec {
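    /// A small sketch of the default configuration: decoding splits on any of
    /// `,`, `;`, `\n`, and `\r`, and encoding writes `,` between chunks.
    ///
    /// ```
    /// use tokio_util::codec::{AnyDelimiterCodec, Decoder};
    /// use bytes::BytesMut;
    ///
    /// let mut codec = AnyDelimiterCodec::default();
    /// let buf = &mut BytesMut::from("a,b;c\n");
    /// assert_eq!("a", codec.decode(buf).unwrap().unwrap());
    /// assert_eq!("b", codec.decode(buf).unwrap().unwrap());
    /// assert_eq!("c", codec.decode(buf).unwrap().unwrap());
    /// ```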
    fn default() -> Self {
        Self::new(
            DEFAULT_SEEK_DELIMITERS.to_vec(),
            DEFAULT_SEQUENCE_WRITER.to_vec(),
        )
    }
}

/// An error occurred while encoding or decoding a chunk.
#[derive(Debug)]
pub enum AnyDelimiterCodecError {
    /// The maximum chunk length was exceeded.
    MaxChunkLengthExceeded,
    /// An IO error occurred.
    Io(io::Error),
}

impl fmt::Display for AnyDelimiterCodecError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AnyDelimiterCodecError::MaxChunkLengthExceeded => {
                write!(f, "max chunk length exceeded")
            }
            AnyDelimiterCodecError::Io(e) => write!(f, "{}", e),
        }
    }
}

impl From<io::Error> for AnyDelimiterCodecError {
    fn from(e: io::Error) -> AnyDelimiterCodecError {
        AnyDelimiterCodecError::Io(e)
    }
}

impl std::error::Error for AnyDelimiterCodecError {}