1use std::{
20 pin::Pin,
21 task::{Context, Poll},
22};
23
24use bytes::{Buf, Bytes, BytesMut};
25use futures_core::{ready, Stream};
26use pin_project_lite::pin_project;
27
pin_project! {
    /// Stream adapter that cuts an inner byte stream at the first occurrence of a delimiter.
    ///
    /// Until the delimiter is seen, incoming chunks are accumulated in an internal buffer.
    /// Once it is found, the bytes that precede it are yielded as a single item and the
    /// delimiter itself is dropped. Subsequent polls first yield whatever buffered bytes
    /// followed the delimiter and then forward the inner stream's items unchanged.
    ///
    /// If the inner stream ends before the delimiter is found, this stream ends as well
    /// without yielding the bytes buffered so far.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct DelimiterSlice<St, D> {
        // Wrapped source stream; structurally pinned because it is polled in place.
        #[pin]
        stream: St,
        // Bytes received from `stream` that have not been yielded yet.
        buf: BytesMut,
        // Byte sequence searched for inside `buf`.
        delimiter: D,
        // Set once the delimiter has been located; from then on the adapter only drains
        // `buf` and forwards `stream`.
        found: bool,
        // Buffered-size threshold: a chunk that would bring `buf` to this many bytes or
        // more makes the stream yield an error instead of buffering it.
        limit: usize,
    }
}
40
41impl<St, D> DelimiterSlice<St, D> {
42 const CAPACITY: usize = 8_192;
43
44 const LIMIT: usize = usize::MAX;
45
46 pub fn new(stream: St, delimiter: D) -> Self {
50 Self::with_capacity_and_limit(stream, delimiter, Self::CAPACITY, Self::LIMIT)
51 }
52
53 pub fn with_capacity(stream: St, delimiter: D, capacity: usize) -> Self {
54 Self::with_capacity_and_limit(stream, delimiter, capacity, Self::LIMIT)
55 }
56
57 pub fn with_limit(stream: St, delimiter: D, limit: usize) -> Self {
58 Self::with_capacity_and_limit(stream, delimiter, Self::CAPACITY, limit)
59 }
60
61 pub fn with_capacity_and_limit(
62 stream: St,
63 delimiter: D,
64 capacity: usize,
65 limit: usize,
66 ) -> Self {
67 Self {
68 stream,
69 buf: BytesMut::with_capacity(capacity),
70 delimiter,
71 found: false,
72 limit,
73 }
74 }
75
76 pub fn into_inner(self) -> St {
89 assert!(self.buf.is_empty());
90 self.stream
91 }
92
93 fn limit_reached_err() -> std::io::Error {
94 std::io::Error::new(std::io::ErrorKind::OutOfMemory, "Limit exceeded")
95 }
96}
97
98impl<St, D> Stream for DelimiterSlice<St, D>
99where
100 D: AsRef<[u8]>,
101 St: Stream<Item = Result<Bytes, std::io::Error>>,
102{
103 type Item = Result<Bytes, std::io::Error>;
104
105 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
106 let mut this = self.project();
107
108 if *this.found {
109 if this.buf.is_empty() {
110 return this.stream.poll_next(cx);
111 } else {
112 return Poll::Ready(Some(Ok(this.buf.split().freeze())));
113 }
114 }
115
116 let mut exhausted = false;
117
118 loop {
119 let delim = this.delimiter.as_ref();
120 if let Some(index) = this
121 .buf
122 .windows(delim.len())
123 .position(|window| window.eq(delim))
124 {
125 let data = this.buf.split_to(index).freeze();
126 this.buf.advance(delim.len());
127 *this.found = true;
128 return Poll::Ready(Some(Ok(data)));
129 }
130
131 if exhausted {
132 return Poll::Ready(None);
133 }
134
135 match ready!(this.stream.as_mut().poll_next(cx)) {
136 Some(Ok(data)) => {
137 if this.buf.len().saturating_add(data.len()).ge(this.limit) {
138 return Poll::Ready(Some(Err(Self::limit_reached_err())));
139 } else {
140 this.buf.extend_from_slice(&data);
141 continue;
142 }
143 }
144 Some(error) => return Poll::Ready(Some(error)),
145 None => exhausted = true,
146 }
147 }
148 }
149}