Skip to main content

actix_web/types/
readlines.rs

1//! For request line reader documentation, see [`Readlines`].
2
3use std::{
4    borrow::Cow,
5    pin::Pin,
6    str,
7    task::{Context, Poll},
8};
9
10use bytes::{Bytes, BytesMut};
11use encoding_rs::{Encoding, UTF_8};
12use futures_core::{ready, stream::Stream};
13
14use crate::{
15    dev::Payload,
16    error::{PayloadError, ReadlinesError},
17    HttpMessage,
18};
19
20/// Stream that reads request line by line.
21pub struct Readlines<T: HttpMessage> {
22    stream: Payload<T::Stream>,
23    buf: BytesMut,
24    limit: usize,
25    checked_buff: bool,
26    encoding: &'static Encoding,
27    err: Option<ReadlinesError>,
28}
29
30impl<T> Readlines<T>
31where
32    T: HttpMessage,
33    T::Stream: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
34{
35    /// Create a new stream to read request line by line.
36    pub fn new(req: &mut T) -> Self {
37        let encoding = match req.encoding() {
38            Ok(enc) => enc,
39            Err(err) => return Self::err(err.into()),
40        };
41
42        Readlines {
43            stream: req.take_payload(),
44            buf: BytesMut::with_capacity(262_144),
45            limit: 262_144,
46            checked_buff: true,
47            err: None,
48            encoding,
49        }
50    }
51
52    /// Set maximum accepted payload size. The default limit is 256kB.
53    pub fn limit(mut self, limit: usize) -> Self {
54        self.limit = limit;
55        self
56    }
57
58    fn err(err: ReadlinesError) -> Self {
59        Readlines {
60            stream: Payload::None,
61            buf: BytesMut::new(),
62            limit: 262_144,
63            checked_buff: true,
64            encoding: UTF_8,
65            err: Some(err),
66        }
67    }
68
69    /// Decodes one complete logical line using the request's configured encoding.
70    ///
71    /// Callers are expected to pass only the bytes that belong to the line being yielded,
72    /// whether they came from the internal buffer, the current payload chunk, or both.
73    fn decode(encoding: &'static Encoding, bytes: &[u8]) -> Result<String, ReadlinesError> {
74        if encoding == UTF_8 {
75            str::from_utf8(bytes)
76                .map_err(|_| ReadlinesError::EncodingError)
77                .map(str::to_owned)
78        } else {
79            encoding
80                .decode_without_bom_handling_and_without_replacement(bytes)
81                .map(Cow::into_owned)
82                .ok_or(ReadlinesError::EncodingError)
83        }
84    }
85}
86
87impl<T> Stream for Readlines<T>
88where
89    T: HttpMessage,
90    T::Stream: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
91{
92    type Item = Result<String, ReadlinesError>;
93
94    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95        let this = self.get_mut();
96
97        if let Some(err) = this.err.take() {
98            return Poll::Ready(Some(Err(err)));
99        }
100
101        // check if there is a newline in the buffer
102        if !this.checked_buff {
103            let mut found: Option<usize> = None;
104            for (ind, b) in this.buf.iter().enumerate() {
105                if *b == b'\n' {
106                    found = Some(ind);
107                    break;
108                }
109            }
110            if let Some(ind) = found {
111                // check if line is longer than limit
112                if ind + 1 > this.limit {
113                    return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
114                }
115                let line = Self::decode(this.encoding, &this.buf.split_to(ind + 1))?;
116                return Poll::Ready(Some(Ok(line)));
117            }
118            this.checked_buff = true;
119        }
120
121        // poll req for more bytes
122        match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
123            Some(Ok(mut bytes)) => {
124                // check if there is a newline in bytes
125                let mut found: Option<usize> = None;
126                for (ind, b) in bytes.iter().enumerate() {
127                    if *b == b'\n' {
128                        found = Some(ind);
129                        break;
130                    }
131                }
132                if let Some(ind) = found {
133                    // check if line is longer than limit
134                    if this.buf.len() + ind + 1 > this.limit {
135                        return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
136                    }
137
138                    this.buf.extend_from_slice(&bytes.split_to(ind + 1));
139                    let line = Self::decode(this.encoding, &this.buf)?;
140                    this.buf.clear();
141
142                    // buffer bytes following the returned line
143                    this.buf.extend_from_slice(&bytes);
144                    this.checked_buff = this.buf.is_empty();
145                    return Poll::Ready(Some(Ok(line)));
146                }
147                this.buf.extend_from_slice(&bytes);
148                Poll::Pending
149            }
150
151            None => {
152                if this.buf.is_empty() {
153                    return Poll::Ready(None);
154                }
155                if this.buf.len() > this.limit {
156                    return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
157                }
158                let line = Self::decode(this.encoding, &this.buf)?;
159                this.buf.clear();
160                Poll::Ready(Some(Ok(line)))
161            }
162
163            Some(Err(err)) => Poll::Ready(Some(Err(ReadlinesError::from(err)))),
164        }
165    }
166}
167
168#[cfg(test)]
169mod tests {
170    use std::{
171        pin::Pin,
172        task::{Context, Poll},
173    };
174
175    use actix_http::{h1, Request};
176    use futures_util::{task::noop_waker_ref, StreamExt as _};
177
178    use super::*;
179    use crate::{error::ReadlinesError, test::TestRequest};
180
181    #[actix_rt::test]
182    async fn test_readlines() {
183        let mut req = TestRequest::default()
184            .set_payload(Bytes::from_static(
185                b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\
186                  industry. Lorem Ipsum has been the industry's standard dummy\n\
187                  Contrary to popular belief, Lorem Ipsum is not simply random text.",
188            ))
189            .to_request();
190
191        let mut stream = Readlines::new(&mut req);
192        assert_eq!(
193            stream.next().await.unwrap().unwrap(),
194            "Lorem Ipsum is simply dummy text of the printing and typesetting\n"
195        );
196
197        assert_eq!(
198            stream.next().await.unwrap().unwrap(),
199            "industry. Lorem Ipsum has been the industry's standard dummy\n"
200        );
201
202        assert_eq!(
203            stream.next().await.unwrap().unwrap(),
204            "Contrary to popular belief, Lorem Ipsum is not simply random text."
205        );
206    }
207
208    #[test]
209    fn test_readlines_limit_across_chunks() {
210        let (mut sender, payload) = h1::Payload::create(false);
211        let payload: actix_http::Payload = payload.into();
212        let mut req = Request::with_payload(payload);
213        let mut stream = Readlines::new(&mut req).limit(10);
214        let mut cx = Context::from_waker(noop_waker_ref());
215
216        sender.feed_data(Bytes::from_static(b"AAAAAAAAAA"));
217        assert!(matches!(
218            Pin::new(&mut stream).poll_next(&mut cx),
219            Poll::Pending
220        ));
221
222        sender.feed_data(Bytes::from_static(b"A\n"));
223        assert!(matches!(
224            Pin::new(&mut stream).poll_next(&mut cx),
225            Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)))
226        ));
227    }
228
229    #[test]
230    fn test_readlines_returns_full_line_across_chunks() {
231        let (mut sender, payload) = h1::Payload::create(false);
232        let payload: actix_http::Payload = payload.into();
233        let mut req = Request::with_payload(payload);
234        let mut stream = Readlines::new(&mut req);
235        let mut cx = Context::from_waker(noop_waker_ref());
236
237        sender.feed_data(Bytes::from_static(b"hello "));
238        assert!(matches!(
239            Pin::new(&mut stream).poll_next(&mut cx),
240            Poll::Pending
241        ));
242
243        sender.feed_data(Bytes::from_static(b"world\nnext"));
244        assert!(matches!(
245            Pin::new(&mut stream).poll_next(&mut cx),
246            Poll::Ready(Some(Ok(ref line))) if line == "hello world\n"
247        ));
248    }
249}