1use crate::error::DecodeError;
2use futures_core::{ready, Stream};
3use futures_io::AsyncRead;
4use pin_project_lite::pin_project;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8const DEFAULT_BUF_SIZE: usize = 8 * 1024;
9const MINIMUM_BUF_SIZE: usize = 4; pub type Result<T> = std::result::Result<T, DecodeError>;
12
13pin_project! {
14 pub struct Utf8Decoder<R> {
15 #[pin]
16 reader: R,
17 buf: Box<[u8]>,
18 remains: usize,
19 }
20}
21
22impl<R> Utf8Decoder<R> {
23 pub fn new(reader: R) -> Self {
25 Utf8Decoder::with_capacity(DEFAULT_BUF_SIZE, reader)
26 }
27
28 pub fn with_capacity(capacity: usize, reader: R) -> Self {
30 debug_assert!(
31 capacity >= MINIMUM_BUF_SIZE,
32 "capacity must be at least {} but {} is specified",
33 MINIMUM_BUF_SIZE,
34 capacity,
35 );
36 let buffer = vec![0; capacity];
37 Self {
38 reader,
39 buf: buffer.into_boxed_slice(),
40 remains: 0,
41 }
42 }
43
44 pub fn into_inner(self) -> R {
46 self.reader
47 }
48
49 pub fn get_ref(&self) -> &R {
52 &self.reader
53 }
54
55 pub fn get_mut(&mut self) -> &mut R {
58 &mut self.reader
59 }
60}
61
62impl<R> Stream for Utf8Decoder<R>
63where
64 R: AsyncRead + Unpin,
65{
66 type Item = Result<String>;
67
68 fn poll_next(
69 self: Pin<&mut Self>,
70 cx: &mut Context<'_>,
71 ) -> Poll<Option<<Self as Stream>::Item>> {
72 let mut this = self.project();
73 let buf = this.buf;
74 loop {
75 let remains = *this.remains;
76 let reader = this.reader.as_mut();
77 match ready!(decode_next(reader, cx, buf, remains)) {
78 Some(Err(err)) => return Poll::Ready(Some(Err(err))),
79 Some(Ok((decoded, remains))) => {
80 *this.remains = remains;
81 if decoded.is_empty() {
82 continue;
83 }
84 return Poll::Ready(Some(Ok(decoded)));
85 }
86 None => {
87 if remains > 0 {
88 let remains = buf[..remains].to_vec();
89 let err = DecodeError::IncompleteUtf8Sequence(remains);
90 return Poll::Ready(Some(Err(err)));
91 }
92 return Poll::Ready(None);
93 }
94 }
95 }
96 }
97}
98
99fn decode_next<'a, R>(
100 reader: Pin<&mut R>,
101 cx: &mut Context<'_>,
102 buf: &'a mut [u8],
103 s: usize,
104) -> Poll<Option<Result<(String, usize)>>>
105where
106 R: AsyncRead,
107{
108 debug_assert!(buf.len() > s);
109 let n = ready!(reader.poll_read(cx, &mut buf[s..]))?;
110 if n == 0 {
112 return Poll::Ready(None);
113 }
114 let e = s + n;
115 debug_assert!(buf.len() >= e);
116 let result = match std::str::from_utf8(&buf[..e]) {
117 Ok(decoded) => Ok((decoded.to_string(), 0)),
118 Err(err) => match err.error_len() {
119 Some(_) => {
120 Err(err.into())
123 }
124 None => {
125 let (valid, after_valid) = buf.split_at(err.valid_up_to());
128 let decoded = unsafe { std::str::from_utf8_unchecked(valid) };
130 let decoded = decoded.to_string();
131 let remains = e - valid.len();
133 unsafe {
134 std::ptr::copy(after_valid.as_ptr(), buf.as_mut_ptr(), remains);
154 }
155 Ok((decoded, remains))
156 }
157 },
158 };
159 Poll::Ready(Some(result))
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use futures::channel::mpsc;
    use futures::io;
    use futures::prelude::*;

    /// Sending side of the channel that supplies byte chunks to a decoder under test.
    type Sender = mpsc::UnboundedSender<io::Result<Vec<u8>>>;

    /// One chunk holding a 1-, 2-, 3- and 4-byte character, in that order.
    const MIXED: [u8; 10] = [0x24, 0xC2, 0xA2, 0xE0, 0xA4, 0xB9, 0xF0, 0x90, 0x8D, 0x88];

    /// Text that `MIXED` decodes to.
    const MIXED_TEXT: &str = "\u{0024}\u{00A2}\u{0939}\u{10348}";

    /// Creates a decoder with a `capacity`-byte buffer whose reader is fed through a channel.
    fn channel_decoder(capacity: usize) -> (Sender, Utf8Decoder<impl AsyncRead + Unpin>) {
        let (tx, rx) = mpsc::unbounded::<io::Result<Vec<u8>>>();
        (tx, Utf8Decoder::with_capacity(capacity, rx.into_async_read()))
    }

    /// Awaits `future`, failing if it does not complete within 100 ms.
    async fn timeout<T>(future: impl Future<Output = T> + Unpin) -> Result<T> {
        let limit = std::time::Duration::from_millis(100);
        Ok(async_std::future::timeout(limit, future).await?)
    }

    /// Asserts that the decoder produces nothing within the timeout.
    async fn assert_pending<R: AsyncRead + Unpin>(decoder: &mut Utf8Decoder<R>) {
        assert!(timeout(decoder.next()).await.is_err());
    }

    /// Waits for the next item and returns its text, propagating timeouts and decode errors.
    async fn next_text<R: AsyncRead + Unpin>(decoder: &mut Utf8Decoder<R>) -> Result<String> {
        Ok(timeout(decoder.next()).await?.unwrap()?)
    }

    /// Checks that `bytes` decodes to `expected` when sent as one chunk. It then checks the
    /// same when sent one byte per chunk, with nothing produced before the final byte.
    async fn assert_decodes_whole_then_bytewise(bytes: &[u8], expected: &str) -> Result<()> {
        let (mut tx, mut decoder) = channel_decoder(DEFAULT_BUF_SIZE);

        tx.send(Ok(bytes.to_vec())).await?;
        assert_eq!(expected, next_text(&mut decoder).await?);
        assert_pending(&mut decoder).await;

        let (&last, leading) = bytes.split_last().expect("test input must not be empty");
        for &byte in leading {
            tx.send(Ok(vec![byte])).await?;
            assert_pending(&mut decoder).await;
        }
        tx.send(Ok(vec![last])).await?;
        assert_eq!(expected, next_text(&mut decoder).await?);
        assert_pending(&mut decoder).await;

        Ok(())
    }

    #[async_std::test]
    async fn decoder_decode_demo() -> Result<()> {
        let (mut tx, mut decoder) = channel_decoder(DEFAULT_BUF_SIZE);

        // U+1F496 arrives one byte per chunk; only the final byte completes it.
        for &byte in &[240u8, 159, 146] {
            tx.send(Ok(vec![byte])).await?;
            assert_pending(&mut decoder).await;
        }
        tx.send(Ok(vec![150])).await?;
        assert_eq!("💖", next_text(&mut decoder).await?);
        assert_pending(&mut decoder).await;

        Ok(())
    }

    #[async_std::test]
    async fn decoder_decode_background() -> Result<()> {
        let (mut tx, mut decoder) = channel_decoder(DEFAULT_BUF_SIZE);

        // The consumer is already waiting before any byte is sent.
        let consumer = async_std::task::spawn(async move { decoder.next().await });
        for &byte in &[240u8, 159, 146, 150] {
            tx.send(Ok(vec![byte])).await?;
        }
        assert_eq!("💖", timeout(consumer).await?.unwrap()?);

        Ok(())
    }

    #[async_std::test]
    async fn decoder_decode_1byte_character() -> Result<()> {
        let (mut tx, mut decoder) = channel_decoder(DEFAULT_BUF_SIZE);

        tx.send(Ok(vec![0x24])).await?;
        assert_eq!("\u{0024}", next_text(&mut decoder).await?);
        assert_pending(&mut decoder).await;

        Ok(())
    }

    #[async_std::test]
    async fn decoder_decode_2byte_character() -> Result<()> {
        assert_decodes_whole_then_bytewise(&[0xC2, 0xA2], "\u{00A2}").await
    }

    #[async_std::test]
    async fn decoder_decode_3byte_character() -> Result<()> {
        assert_decodes_whole_then_bytewise(&[0xE0, 0xA4, 0xB9], "\u{0939}").await
    }

    #[async_std::test]
    async fn decoder_decode_4byte_character() -> Result<()> {
        assert_decodes_whole_then_bytewise(&[0xF0, 0x90, 0x8D, 0x88], "\u{10348}").await
    }

    #[async_std::test]
    async fn decoder_decode_ok() -> Result<()> {
        let (mut tx, mut decoder) = channel_decoder(DEFAULT_BUF_SIZE);

        for _ in 0..3 {
            tx.send(Ok(MIXED.to_vec())).await?;
        }
        for _ in 0..3 {
            assert_eq!(MIXED_TEXT, next_text(&mut decoder).await?);
        }
        assert_pending(&mut decoder).await;

        Ok(())
    }

    #[async_std::test]
    async fn decoder_decode_ok_with_minimum_capacity() -> Result<()> {
        let (mut tx, mut decoder) = channel_decoder(MINIMUM_BUF_SIZE);

        for _ in 0..3 {
            tx.send(Ok(MIXED.to_vec())).await?;
        }
        // With a 4-byte buffer, each chunk is decoded across three reads.
        for _ in 0..3 {
            for expected in &["\u{0024}\u{00A2}", "\u{0939}", "\u{10348}"] {
                assert_eq!(*expected, next_text(&mut decoder).await?);
            }
        }
        assert_pending(&mut decoder).await;

        Ok(())
    }
}