1use std::fmt::{self, Display, Formatter};
2use std::mem;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use bytes::{Buf, Bytes};
7use futures_core::{ready, Stream};
8use http_body::Body;
9use pin_project_lite::pin_project;
10
11use crate::error::Error;
12
13macro_rules! str_enum {
15 (
16 $(#[$attr:meta])*
17 pub enum $E:ident {
18 $(
19 $(#[$v_attr:meta])*
20 $V:ident = $by:expr
21 ),*$(,)?
22 }
23 ) => {
24 $(#[$attr])*
25 pub enum $E {
26 $(
27 $(#[$v_attr])*
28 $V,
29 )*
30 }
31
32 impl std::convert::AsRef<str> for $E {
33 fn as_ref(&self) -> &str {
34 match *self {
35 $($E::$V => $by,)*
36 }
37 }
38 }
39 }
40}
41
42pin_project! {
43 pub struct Lines<B> {
44 #[pin]
45 body: B,
46 body_done: bool,
47 buf: Bytes,
48 }
49}
50
51impl<B: Body> Lines<B> {
52 pub fn new(body: B) -> Self {
53 Lines {
54 body,
55 body_done: false,
56 buf: Bytes::new(),
57 }
58 }
59
60 fn poll_body(
61 self: Pin<&mut Self>,
62 cx: &mut Context<'_>,
63 ) -> Poll<Option<Result<B::Data, Error<B::Error>>>> {
64 let this = self.project();
65 if *this.body_done {
66 Poll::Ready(None)
67 } else if let Some(result) = ready!(this.body.poll_data(cx)) {
68 Poll::Ready(Some(result.map_err(Error::Service)))
69 } else {
70 *this.body_done = true;
71 Poll::Ready(None)
72 }
73 }
74}
75
76impl<B: Body> Stream for Lines<B> {
77 type Item = Result<Bytes, Error<B::Error>>;
78
79 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80 if let Some(line) = remove_first_line(self.as_mut().project().buf) {
81 return Poll::Ready(Some(Ok(line)));
82 }
83
84 loop {
88 let mut chunk = loop {
89 if let Some(c) = ready!(self.as_mut().poll_body(cx)?) {
90 if c.has_remaining() {
91 break c;
92 }
93 } else if self.buf.is_empty() {
94 return Poll::Ready(None);
95 } else {
96 let ret = mem::take(self.as_mut().project().buf);
98 return Poll::Ready(Some(Ok(ret)));
99 }
100 };
101
102 let this = self.as_mut().project();
103
104 if chunk.chunk()[0] == b'\n' && this.buf.last() == Some(&b'\r') {
105 this.buf.truncate(this.buf.len() - 1);
107 chunk.advance(1);
108
109 let chunk = chunk.copy_to_bytes(chunk.remaining());
110 return Poll::Ready(Some(Ok(mem::replace(this.buf, chunk))));
111 } else {
112 let mut chunk = chunk.copy_to_bytes(chunk.remaining());
113 if let Some(line) = remove_first_line(&mut chunk) {
114 let ret = if this.buf.is_empty() {
115 line
116 } else {
117 let mut ret = Vec::with_capacity(this.buf.len() + line.len());
118 ret.extend_from_slice(&this.buf);
119 ret.extend_from_slice(&line);
120 ret.into()
121 };
122 *this.buf = chunk;
123 return Poll::Ready(Some(Ok(ret)));
124 } else {
125 *this.buf = if this.buf.is_empty() {
126 chunk
127 } else {
128 let mut buf = Vec::with_capacity(this.buf.len() + chunk.len());
129 buf.extend_from_slice(&this.buf);
130 buf.extend_from_slice(&chunk);
131 buf.into()
132 }
133 }
134 }
135 }
136 }
137}
138
139pub fn fmt_join<T: Display>(t: &[T], sep: &str, f: &mut Formatter<'_>) -> fmt::Result {
140 let mut iter = t.iter();
141 if let Some(t) = iter.next() {
142 Display::fmt(t, f)?;
143 for t in iter {
144 write!(f, "{}{}", sep, t)?;
145 }
146 }
147 Ok(())
148}
149
150fn remove_first_line(buf: &mut Bytes) -> Option<Bytes> {
151 if buf.len() < 2 {
152 return None;
153 }
154
155 if let Some(i) = memchr::memchr(b'\n', &buf[1..]) {
156 if buf[i] == b'\r' {
157 let mut line = buf.split_to(i + 2);
158 line.truncate(i); return Some(line);
160 }
161 }
162
163 None
164}
165
166#[cfg(test)]
167mod test {
168 use super::*;
169 use bytes::Bytes;
170 use futures::executor::block_on_stream;
171 use futures::stream::{self, StreamExt, TryStream};
172
173 pin_project! {
174 struct StreamBody<S> {
175 #[pin]
176 stream: S,
177 }
178 }
179
180 impl<S: TryStream> Body for StreamBody<S>
181 where
182 S::Ok: Buf,
183 {
184 type Data = S::Ok;
185 type Error = S::Error;
186
187 fn poll_data(
188 self: Pin<&mut Self>,
189 cx: &mut Context<'_>,
190 ) -> Poll<Option<Result<S::Ok, S::Error>>> {
191 self.project().stream.try_poll_next(cx)
192 }
193
194 fn poll_trailers(
195 self: Pin<&mut Self>,
196 _: &mut Context<'_>,
197 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
198 Poll::Ready(Ok(None))
199 }
200 }
201
202 #[test]
203 fn lines() {
204 let body = [
205 "abc\r\n",
206 "d\r\nefg\r\n",
207 "hi",
208 "jk",
209 "",
210 "\r\n",
211 "\r\n",
212 "lmn\r\nop",
213 "q\rrs\r",
214 "\n\n\rtuv\r\r\n",
215 "wxyz\n",
216 ];
217
218 let concat = body.concat();
219 let expected = concat.split("\r\n");
220 let lines = Lines::new(StreamBody {
221 stream: stream::iter(&body).map(|&c| Ok(Bytes::from_static(c.as_bytes()))),
222 });
223 let lines = block_on_stream(lines)
224 .map(|s: Result<_, Error>| String::from_utf8(s.unwrap().to_vec()).unwrap());
225
226 assert_eq!(lines.collect::<Vec<_>>(), expected.collect::<Vec<_>>());
227 }
228}