delimiter_slice/
lib.rs

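//! Stream adapter that splits a byte stream at the first occurrence of a delimiter.
//!
//! Everything before the delimiter is yielded as one item; the bytes after it follow unchanged.
//!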
2//! ```rust
3//! # use tokio_util::io::ReaderStream;
4//! # use delimiter_slice::DelimiterSlice;
5//! # use futures_util::StreamExt;
6//! # #[tokio::main]
7//! # async fn main() {
8//! const TEST: &[u8] = b"FOOBARFOOBARBAZFOO";
9//! const DELIM: &[u8] = b"BAZ";
10//!
11//! let stream = ReaderStream::new(TEST);
12//! let mut slice_stream = DelimiterSlice::new(stream, DELIM);
13//! let data = slice_stream.next().await.unwrap().unwrap();
14//! assert_eq!(&data, &TEST[0..12]);
15//! let data = slice_stream.next().await.unwrap().unwrap();
16//! assert_eq!(&data, &TEST[15..]);
17//! # }
18//! ```
19use std::{
20    pin::Pin,
21    task::{Context, Poll},
22};
23
24use bytes::{Buf, Bytes, BytesMut};
25use futures_core::{ready, Stream};
26use pin_project_lite::pin_project;
27
pin_project! {
    /// Stream adapter that buffers an inner byte stream until a delimiter is seen.
    ///
    /// The first item yielded is everything that preceded the delimiter. The delimiter itself
    /// is discarded, any bytes already buffered after it are yielded next, and from then on
    /// the inner stream is polled directly.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct DelimiterSlice<St, D> {
        // The wrapped stream; marked `#[pin]` because it is polled in place.
        #[pin]
        stream: St,
        // Bytes received from `stream` that have not been yielded yet.
        buf: BytesMut,
        // Byte sequence to search for (read through `AsRef<[u8]>` when polling).
        delimiter: D,
        // Set once the delimiter has been located and removed from `buf`.
        found: bool,
        // Bound checked against `buf.len()` plus each incoming chunk while searching.
        limit: usize,
    }
}
40
41impl<St, D> DelimiterSlice<St, D> {
42    const CAPACITY: usize = 8_192;
43
44    const LIMIT: usize = usize::MAX;
45
46    /// Create a new `DelimiterSlice` based on the provided stream and delimiter.
47    ///
48    /// This defaults instantiating the underlying buffer with a capacity of 8,192 bytes.
49    pub fn new(stream: St, delimiter: D) -> Self {
50        Self::with_capacity_and_limit(stream, delimiter, Self::CAPACITY, Self::LIMIT)
51    }
52
53    pub fn with_capacity(stream: St, delimiter: D, capacity: usize) -> Self {
54        Self::with_capacity_and_limit(stream, delimiter, capacity, Self::LIMIT)
55    }
56
57    pub fn with_limit(stream: St, delimiter: D, limit: usize) -> Self {
58        Self::with_capacity_and_limit(stream, delimiter, Self::CAPACITY, limit)
59    }
60
61    pub fn with_capacity_and_limit(
62        stream: St,
63        delimiter: D,
64        capacity: usize,
65        limit: usize,
66    ) -> Self {
67        Self {
68            stream,
69            buf: BytesMut::with_capacity(capacity),
70            delimiter,
71            found: false,
72            limit,
73        }
74    }
75
76    /// Return the wrapped stream.
77    ///
78    /// This is useful once the delimiter has been returned and the internal buffer has been cleared
79    /// by calling `next()` again.
80    ///
81    /// # Panics
82    ///
83    /// Panics if the internal buffer is not empty when this is called. The stated purpose of this
84    /// library is to provide a simple and safe way to extract data from a delimited stream, but
85    /// allow the stream to continue producing the data in the order it was received.
86    ///
87    /// If you've called into_inner before a second call to `next` this is likely an error.
88    pub fn into_inner(self) -> St {
89        assert!(self.buf.is_empty());
90        self.stream
91    }
92
93    fn limit_reached_err() -> std::io::Error {
94        std::io::Error::new(std::io::ErrorKind::OutOfMemory, "Limit exceeded")
95    }
96}
97
98impl<St, D> Stream for DelimiterSlice<St, D>
99where
100    D: AsRef<[u8]>,
101    St: Stream<Item = Result<Bytes, std::io::Error>>,
102{
103    type Item = Result<Bytes, std::io::Error>;
104
105    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
106        let mut this = self.project();
107
108        if *this.found {
109            if this.buf.is_empty() {
110                return this.stream.poll_next(cx);
111            } else {
112                return Poll::Ready(Some(Ok(this.buf.split().freeze())));
113            }
114        }
115
116        let mut exhausted = false;
117
118        loop {
119            let delim = this.delimiter.as_ref();
120            if let Some(index) = this
121                .buf
122                .windows(delim.len())
123                .position(|window| window.eq(delim))
124            {
125                let data = this.buf.split_to(index).freeze();
126                this.buf.advance(delim.len());
127                *this.found = true;
128                return Poll::Ready(Some(Ok(data)));
129            }
130
131            if exhausted {
132                return Poll::Ready(None);
133            }
134
135            match ready!(this.stream.as_mut().poll_next(cx)) {
136                Some(Ok(data)) => {
137                    if this.buf.len().saturating_add(data.len()).ge(this.limit) {
138                        return Poll::Ready(Some(Err(Self::limit_reached_err())));
139                    } else {
140                        this.buf.extend_from_slice(&data);
141                        continue;
142                    }
143                }
144                Some(error) => return Poll::Ready(Some(error)),
145                None => exhausted = true,
146            }
147        }
148    }
149}