actix_web/types/
readlines.rs1use std::{
4 borrow::Cow,
5 pin::Pin,
6 str,
7 task::{Context, Poll},
8};
9
10use bytes::{Bytes, BytesMut};
11use encoding_rs::{Encoding, UTF_8};
12use futures_core::{ready, stream::Stream};
13
14use crate::{
15 dev::Payload,
16 error::{PayloadError, ReadlinesError},
17 HttpMessage,
18};
19
20pub struct Readlines<T: HttpMessage> {
22 stream: Payload<T::Stream>,
23 buf: BytesMut,
24 limit: usize,
25 checked_buff: bool,
26 encoding: &'static Encoding,
27 err: Option<ReadlinesError>,
28}
29
30impl<T> Readlines<T>
31where
32 T: HttpMessage,
33 T::Stream: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
34{
35 pub fn new(req: &mut T) -> Self {
37 let encoding = match req.encoding() {
38 Ok(enc) => enc,
39 Err(err) => return Self::err(err.into()),
40 };
41
42 Readlines {
43 stream: req.take_payload(),
44 buf: BytesMut::with_capacity(262_144),
45 limit: 262_144,
46 checked_buff: true,
47 err: None,
48 encoding,
49 }
50 }
51
52 pub fn limit(mut self, limit: usize) -> Self {
54 self.limit = limit;
55 self
56 }
57
58 fn err(err: ReadlinesError) -> Self {
59 Readlines {
60 stream: Payload::None,
61 buf: BytesMut::new(),
62 limit: 262_144,
63 checked_buff: true,
64 encoding: UTF_8,
65 err: Some(err),
66 }
67 }
68
69 fn decode(encoding: &'static Encoding, bytes: &[u8]) -> Result<String, ReadlinesError> {
74 if encoding == UTF_8 {
75 str::from_utf8(bytes)
76 .map_err(|_| ReadlinesError::EncodingError)
77 .map(str::to_owned)
78 } else {
79 encoding
80 .decode_without_bom_handling_and_without_replacement(bytes)
81 .map(Cow::into_owned)
82 .ok_or(ReadlinesError::EncodingError)
83 }
84 }
85}
86
87impl<T> Stream for Readlines<T>
88where
89 T: HttpMessage,
90 T::Stream: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
91{
92 type Item = Result<String, ReadlinesError>;
93
94 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95 let this = self.get_mut();
96
97 if let Some(err) = this.err.take() {
98 return Poll::Ready(Some(Err(err)));
99 }
100
101 if !this.checked_buff {
103 let mut found: Option<usize> = None;
104 for (ind, b) in this.buf.iter().enumerate() {
105 if *b == b'\n' {
106 found = Some(ind);
107 break;
108 }
109 }
110 if let Some(ind) = found {
111 if ind + 1 > this.limit {
113 return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
114 }
115 let line = Self::decode(this.encoding, &this.buf.split_to(ind + 1))?;
116 return Poll::Ready(Some(Ok(line)));
117 }
118 this.checked_buff = true;
119 }
120
121 match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
123 Some(Ok(mut bytes)) => {
124 let mut found: Option<usize> = None;
126 for (ind, b) in bytes.iter().enumerate() {
127 if *b == b'\n' {
128 found = Some(ind);
129 break;
130 }
131 }
132 if let Some(ind) = found {
133 if this.buf.len() + ind + 1 > this.limit {
135 return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
136 }
137
138 this.buf.extend_from_slice(&bytes.split_to(ind + 1));
139 let line = Self::decode(this.encoding, &this.buf)?;
140 this.buf.clear();
141
142 this.buf.extend_from_slice(&bytes);
144 this.checked_buff = this.buf.is_empty();
145 return Poll::Ready(Some(Ok(line)));
146 }
147 this.buf.extend_from_slice(&bytes);
148 Poll::Pending
149 }
150
151 None => {
152 if this.buf.is_empty() {
153 return Poll::Ready(None);
154 }
155 if this.buf.len() > this.limit {
156 return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
157 }
158 let line = Self::decode(this.encoding, &this.buf)?;
159 this.buf.clear();
160 Poll::Ready(Some(Ok(line)))
161 }
162
163 Some(Err(err)) => Poll::Ready(Some(Err(ReadlinesError::from(err)))),
164 }
165 }
166}
167
#[cfg(test)]
mod tests {
    use std::{
        pin::Pin,
        task::{Context, Poll},
    };

    use actix_http::{h1, Request};
    use futures_util::{task::noop_waker_ref, StreamExt as _};

    use super::*;
    use crate::{error::ReadlinesError, test::TestRequest};

    /// Polls `stream` exactly once using a waker that does nothing.
    fn poll_once<S>(stream: &mut S) -> Poll<Option<S::Item>>
    where
        S: Stream + Unpin,
    {
        let mut cx = Context::from_waker(noop_waker_ref());
        Pin::new(stream).poll_next(&mut cx)
    }

    /// A payload delivered in one piece is split at every `\n`, with the unterminated tail
    /// yielded as the last line.
    #[actix_rt::test]
    async fn test_readlines() {
        let body = Bytes::from_static(
            b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\
            industry. Lorem Ipsum has been the industry's standard dummy\n\
            Contrary to popular belief, Lorem Ipsum is not simply random text.",
        );
        let mut req = TestRequest::default().set_payload(body).to_request();
        let mut stream = Readlines::new(&mut req);

        let expected = [
            "Lorem Ipsum is simply dummy text of the printing and typesetting\n",
            "industry. Lorem Ipsum has been the industry's standard dummy\n",
            "Contrary to popular belief, Lorem Ipsum is not simply random text.",
        ];

        for &want in &expected {
            let got = stream.next().await.unwrap().unwrap();
            assert_eq!(got, want);
        }
    }

    /// The limit applies to the whole line, including bytes that arrived in earlier chunks.
    #[test]
    fn test_readlines_limit_across_chunks() {
        let (mut sender, payload) = h1::Payload::create(false);
        let payload: actix_http::Payload = payload.into();
        let mut req = Request::with_payload(payload);
        let mut stream = Readlines::new(&mut req).limit(10);

        // Exactly at the limit and no newline yet: nothing can be yielded.
        sender.feed_data(Bytes::from_static(b"AAAAAAAAAA"));
        let first = poll_once(&mut stream);
        assert!(matches!(first, Poll::Pending));

        // The newline completes a line that is longer than the limit.
        sender.feed_data(Bytes::from_static(b"A\n"));
        let second = poll_once(&mut stream);
        assert!(matches!(
            second,
            Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)))
        ));
    }

    /// A line spread over two chunks is yielded whole, not just its final fragment.
    #[test]
    fn test_readlines_returns_full_line_across_chunks() {
        let (mut sender, payload) = h1::Payload::create(false);
        let payload: actix_http::Payload = payload.into();
        let mut req = Request::with_payload(payload);
        let mut stream = Readlines::new(&mut req);

        sender.feed_data(Bytes::from_static(b"hello "));
        let first = poll_once(&mut stream);
        assert!(matches!(first, Poll::Pending));

        sender.feed_data(Bytes::from_static(b"world\nnext"));
        let second = poll_once(&mut stream);
        assert!(matches!(
            second,
            Poll::Ready(Some(Ok(ref line))) if line == "hello world\n"
        ));
    }
}