1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
//! Stream type for splitting a delimited stream
//! ```rust
//! # use tokio_util::io::ReaderStream;
//! # use delimiter_slice::DelimiterSlice;
//! # use futures_util::StreamExt;
//! # #[tokio::main]
//! # async fn main() {
//! const TEST: &[u8] = b"FOOBARFOOBARBAZFOO";
//! const DELIM: &[u8] = b"BAZ";
//!
//! let stream = ReaderStream::new(TEST);
//! let mut slice_stream = DelimiterSlice::new(stream, DELIM);
//! let data = slice_stream.next().await.unwrap().unwrap();
//! assert_eq!(&data, &TEST[0..12]);
//! let data = slice_stream.next().await.unwrap().unwrap();
//! assert_eq!(&data, &TEST[15..]);
//! # }
//! ```
use std::{
pin::Pin,
task::{Context, Poll},
};
use bytes::{Buf, Bytes, BytesMut};
use futures_core::{ready, Stream};
use pin_project_lite::pin_project;
pin_project! {
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct DelimiterSlice<St, D> {
#[pin]
stream: St,
buf: BytesMut,
delimiter: D,
found: bool,
}
}
impl<St, D> DelimiterSlice<St, D> {
/// Create a new `DelimiterSlice` based on the provided stream and delimiter.
///
/// This defaults instantiating the underlying buffer with a capacity of 8,192 bytes.
pub fn new(stream: St, delimiter: D) -> Self {
Self::with_capacity(stream, 8_192, delimiter)
}
/// Create a new `DelimiterSlice` based on the provided stream, delimiter, and capacity.
pub fn with_capacity(stream: St, capacity: usize, delimiter: D) -> Self {
Self {
stream,
buf: BytesMut::with_capacity(capacity),
delimiter,
found: false,
}
}
/// Return the wrapped stream.
///
/// This is useful once the delimiter has been returned and the internal buffer has been cleared
/// by calling `next()` again.
///
/// # Panics
///
/// Panics if the internal buffer is not empty when this is called. The stated purpose of this
/// library is to provide a simple and safe way to extract data from a delimited stream, but
/// allow the stream to continue producing the data in the order it was received.
///
/// If you've called into_inner before a second call to `next` this is likely an error.
pub fn into_inner(self) -> St {
assert!(self.buf.is_empty());
self.stream
}
}
impl<St, D> Stream for DelimiterSlice<St, D>
where
D: AsRef<[u8]>,
St: Stream<Item = Result<Bytes, std::io::Error>>,
{
type Item = Result<Bytes, std::io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
let mut this = self.project();
if *this.found {
if this.buf.is_empty() {
return this.stream.poll_next(cx);
} else {
return Poll::Ready(Some(Ok(this.buf.split().freeze())));
}
}
let mut exhausted = false;
loop {
let delim = this.delimiter.as_ref();
if let Some(index) = this
.buf
.windows(delim.len())
.position(|window| window.eq(delim))
{
let data = this.buf.split_to(index).freeze();
this.buf.advance(delim.len());
*this.found = true;
return Poll::Ready(Some(Ok(data)));
}
if exhausted {
return Poll::Ready(None);
}
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(data)) => this.buf.extend_from_slice(&data),
Some(error) => return Poll::Ready(Some(error)),
None => exhausted = true,
}
}
}
}