twitter_stream/
util.rs

1use std::fmt::{self, Display, Formatter};
2use std::mem;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use bytes::{Buf, Bytes};
7use futures_core::{ready, Stream};
8use http_body::Body;
9use pin_project_lite::pin_project;
10
11use crate::error::Error;
12
13/// Creates an enum with `AsRef<str>` impl.
14macro_rules! str_enum {
15    (
16        $(#[$attr:meta])*
17        pub enum $E:ident {
18            $(
19                $(#[$v_attr:meta])*
20                $V:ident = $by:expr
21            ),*$(,)?
22        }
23    ) => {
24        $(#[$attr])*
25        pub enum $E {
26            $(
27                $(#[$v_attr])*
28                $V,
29            )*
30        }
31
32        impl std::convert::AsRef<str> for $E {
33            fn as_ref(&self) -> &str {
34                match *self {
35                    $($E::$V => $by,)*
36                }
37            }
38        }
39    }
40}
41
42pin_project! {
43    pub struct Lines<B> {
44        #[pin]
45        body: B,
46        body_done: bool,
47        buf: Bytes,
48    }
49}
50
51impl<B: Body> Lines<B> {
52    pub fn new(body: B) -> Self {
53        Lines {
54            body,
55            body_done: false,
56            buf: Bytes::new(),
57        }
58    }
59
60    fn poll_body(
61        self: Pin<&mut Self>,
62        cx: &mut Context<'_>,
63    ) -> Poll<Option<Result<B::Data, Error<B::Error>>>> {
64        let this = self.project();
65        if *this.body_done {
66            Poll::Ready(None)
67        } else if let Some(result) = ready!(this.body.poll_data(cx)) {
68            Poll::Ready(Some(result.map_err(Error::Service)))
69        } else {
70            *this.body_done = true;
71            Poll::Ready(None)
72        }
73    }
74}
75
76impl<B: Body> Stream for Lines<B> {
77    type Item = Result<Bytes, Error<B::Error>>;
78
79    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80        if let Some(line) = remove_first_line(self.as_mut().project().buf) {
81            return Poll::Ready(Some(Ok(line)));
82        }
83
84        // Now `self.buf` does not have a CRLF.
85        // Extend the buffer until a CRLF is found.
86
87        loop {
88            let mut chunk = loop {
89                if let Some(c) = ready!(self.as_mut().poll_body(cx)?) {
90                    if c.has_remaining() {
91                        break c;
92                    }
93                } else if self.buf.is_empty() {
94                    return Poll::Ready(None);
95                } else {
96                    // `self.buf` does not have CRLF so it is safe to return its content as-is.
97                    let ret = mem::take(self.as_mut().project().buf);
98                    return Poll::Ready(Some(Ok(ret)));
99                }
100            };
101
102            let this = self.as_mut().project();
103
104            if chunk.chunk()[0] == b'\n' && this.buf.last() == Some(&b'\r') {
105                // Drop the CRLF
106                this.buf.truncate(this.buf.len() - 1);
107                chunk.advance(1);
108
109                let chunk = chunk.copy_to_bytes(chunk.remaining());
110                return Poll::Ready(Some(Ok(mem::replace(this.buf, chunk))));
111            } else {
112                let mut chunk = chunk.copy_to_bytes(chunk.remaining());
113                if let Some(line) = remove_first_line(&mut chunk) {
114                    let ret = if this.buf.is_empty() {
115                        line
116                    } else {
117                        let mut ret = Vec::with_capacity(this.buf.len() + line.len());
118                        ret.extend_from_slice(&this.buf);
119                        ret.extend_from_slice(&line);
120                        ret.into()
121                    };
122                    *this.buf = chunk;
123                    return Poll::Ready(Some(Ok(ret)));
124                } else {
125                    *this.buf = if this.buf.is_empty() {
126                        chunk
127                    } else {
128                        let mut buf = Vec::with_capacity(this.buf.len() + chunk.len());
129                        buf.extend_from_slice(&this.buf);
130                        buf.extend_from_slice(&chunk);
131                        buf.into()
132                    }
133                }
134            }
135        }
136    }
137}
138
139pub fn fmt_join<T: Display>(t: &[T], sep: &str, f: &mut Formatter<'_>) -> fmt::Result {
140    let mut iter = t.iter();
141    if let Some(t) = iter.next() {
142        Display::fmt(t, f)?;
143        for t in iter {
144            write!(f, "{}{}", sep, t)?;
145        }
146    }
147    Ok(())
148}
149
150fn remove_first_line(buf: &mut Bytes) -> Option<Bytes> {
151    if buf.len() < 2 {
152        return None;
153    }
154
155    if let Some(i) = memchr::memchr(b'\n', &buf[1..]) {
156        if buf[i] == b'\r' {
157            let mut line = buf.split_to(i + 2);
158            line.truncate(i); // Drop the CRLF
159            return Some(line);
160        }
161    }
162
163    None
164}
165
166#[cfg(test)]
167mod test {
168    use super::*;
169    use bytes::Bytes;
170    use futures::executor::block_on_stream;
171    use futures::stream::{self, StreamExt, TryStream};
172
173    pin_project! {
174            struct StreamBody<S> {
175                #[pin]
176                stream: S,
177            }
178    }
179
180    impl<S: TryStream> Body for StreamBody<S>
181    where
182        S::Ok: Buf,
183    {
184        type Data = S::Ok;
185        type Error = S::Error;
186
187        fn poll_data(
188            self: Pin<&mut Self>,
189            cx: &mut Context<'_>,
190        ) -> Poll<Option<Result<S::Ok, S::Error>>> {
191            self.project().stream.try_poll_next(cx)
192        }
193
194        fn poll_trailers(
195            self: Pin<&mut Self>,
196            _: &mut Context<'_>,
197        ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
198            Poll::Ready(Ok(None))
199        }
200    }
201
202    #[test]
203    fn lines() {
204        let body = [
205            "abc\r\n",
206            "d\r\nefg\r\n",
207            "hi",
208            "jk",
209            "",
210            "\r\n",
211            "\r\n",
212            "lmn\r\nop",
213            "q\rrs\r",
214            "\n\n\rtuv\r\r\n",
215            "wxyz\n",
216        ];
217
218        let concat = body.concat();
219        let expected = concat.split("\r\n");
220        let lines = Lines::new(StreamBody {
221            stream: stream::iter(&body).map(|&c| Ok(Bytes::from_static(c.as_bytes()))),
222        });
223        let lines = block_on_stream(lines)
224            .map(|s: Result<_, Error>| String::from_utf8(s.unwrap().to_vec()).unwrap());
225
226        assert_eq!(lines.collect::<Vec<_>>(), expected.collect::<Vec<_>>());
227    }
228}