async_utf8_decoder/
decoder.rs

1use crate::error::DecodeError;
2use futures_core::{ready, Stream};
3use futures_io::AsyncRead;
4use pin_project_lite::pin_project;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
/// Buffer capacity in bytes (8 KiB) used when no explicit capacity is given.
const DEFAULT_BUF_SIZE: usize = 8_192;
/// Smallest buffer capacity accepted by the decoder: the maximum number of
/// bytes a single UTF-8 encoded character can occupy.
const MINIMUM_BUF_SIZE: usize = 4;
10
11pub type Result<T> = std::result::Result<T, DecodeError>;
12
13pin_project! {
14    pub struct Utf8Decoder<R> {
15        #[pin]
16        reader: R,
17        buf: Box<[u8]>,
18        remains: usize,
19    }
20}
21
22impl<R> Utf8Decoder<R> {
23    /// Create a new incremental UTF-8 decoder from `reader`
24    pub fn new(reader: R) -> Self {
25        Utf8Decoder::with_capacity(DEFAULT_BUF_SIZE, reader)
26    }
27
28    /// Create a new incremental UTF-8 decoder from `reader` with specified capacity
29    pub fn with_capacity(capacity: usize, reader: R) -> Self {
30        debug_assert!(
31            capacity >= MINIMUM_BUF_SIZE,
32            "capacity must be at least {} but {} is specified",
33            MINIMUM_BUF_SIZE,
34            capacity,
35        );
36        let buffer = vec![0; capacity];
37        Self {
38            reader,
39            buf: buffer.into_boxed_slice(),
40            remains: 0,
41        }
42    }
43
44    /// Consumes this decoder, returning the underlying reader.
45    pub fn into_inner(self) -> R {
46        self.reader
47    }
48
49    /// Acquires a reference to the underlying reader that this
50    /// decoder is pulling from.
51    pub fn get_ref(&self) -> &R {
52        &self.reader
53    }
54
55    /// Acquires a mutable reference to the underlying reader that
56    /// this decoder is pulling from.
57    pub fn get_mut(&mut self) -> &mut R {
58        &mut self.reader
59    }
60}
61
62impl<R> Stream for Utf8Decoder<R>
63where
64    R: AsyncRead + Unpin,
65{
66    type Item = Result<String>;
67
68    fn poll_next(
69        self: Pin<&mut Self>,
70        cx: &mut Context<'_>,
71    ) -> Poll<Option<<Self as Stream>::Item>> {
72        let mut this = self.project();
73        let buf = this.buf;
74        loop {
75            let remains = *this.remains;
76            let reader = this.reader.as_mut();
77            match ready!(decode_next(reader, cx, buf, remains)) {
78                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
79                Some(Ok((decoded, remains))) => {
80                    *this.remains = remains;
81                    if decoded.is_empty() {
82                        continue;
83                    }
84                    return Poll::Ready(Some(Ok(decoded)));
85                }
86                None => {
87                    if remains > 0 {
88                        let remains = buf[..remains].to_vec();
89                        let err = DecodeError::IncompleteUtf8Sequence(remains);
90                        return Poll::Ready(Some(Err(err)));
91                    }
92                    return Poll::Ready(None);
93                }
94            }
95        }
96    }
97}
98
99fn decode_next<'a, R>(
100    reader: Pin<&mut R>,
101    cx: &mut Context<'_>,
102    buf: &'a mut [u8],
103    s: usize,
104) -> Poll<Option<Result<(String, usize)>>>
105where
106    R: AsyncRead,
107{
108    debug_assert!(buf.len() > s);
109    let n = ready!(reader.poll_read(cx, &mut buf[s..]))?;
110    // The upstream is closed
111    if n == 0 {
112        return Poll::Ready(None);
113    }
114    let e = s + n;
115    debug_assert!(buf.len() >= e);
116    let result = match std::str::from_utf8(&buf[..e]) {
117        Ok(decoded) => Ok((decoded.to_string(), 0)),
118        Err(err) => match err.error_len() {
119            Some(_) => {
120                // An unexpected byte was encounted. While this decoder is not
121                // lossy decoding, return the error itself and stop decoding.
122                Err(err.into())
123            }
124            None => {
125                // The end of the input was reached unexpectedly. This is what
126                // this decoder exists for.
127                let (valid, after_valid) = buf.split_at(err.valid_up_to());
128                // Copy 'valid' into the Heap as String
129                let decoded = unsafe { std::str::from_utf8_unchecked(valid) };
130                let decoded = decoded.to_string();
131                // Copy 'after_valid' at the front of the 'buf'
132                let remains = e - valid.len();
133                unsafe {
134                    // +-------------------------------------------------------------+
135                    // |                            buf                              |
136                    // +----------------+--------------------------------------------+
137                    // |     valid      | after_valid                                |
138                    // +----------------+--------------------------------------------+
139                    // |////////////////|#####.......................................|
140                    // +----------------+--------------------------------------------+
141                    //                               |
142                    //                               v
143                    // +-------------------------------------------------------------+
144                    // |                            buf                              |
145                    // +----------------+--------------------------------------------+
146                    // |     valid      | after_valid                                |
147                    // +----------------+--------------------------------------------+
148                    // |#####...........|............................                |
149                    // +----------------+--------------------------------------------+
150                    //
151                    // XXX: Can we use 'copy_nonoverlapping' here?
152                    // std::ptr::copy_nonoverlapping(after_valid.as_ptr(), buf.as_mut_ptr(), remains);
153                    std::ptr::copy(after_valid.as_ptr(), buf.as_mut_ptr(), remains);
154                }
155                Ok((decoded, remains))
156            }
157        },
158    };
159    Poll::Ready(Some(result))
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use futures::channel::mpsc;
    use futures::io;
    use futures::prelude::*;

    /// UTF-8 bytes of four characters that are 1, 2, 3 and 4 bytes long.
    const MIXED_WIDTHS: [u8; 10] = [0x24, 0xC2, 0xA2, 0xE0, 0xA4, 0xB9, 0xF0, 0x90, 0x8D, 0x88];

    /// Asserts that the decoder yields nothing before the timeout expires.
    macro_rules! assert_pending {
        ($decoder:ident) => {
            assert!(timeout($decoder.next()).await.is_err())
        };
    }

    /// Asserts that the next item yielded by the decoder equals `$expected`.
    macro_rules! assert_decoded {
        ($expected:expr, $decoder:ident) => {
            assert_eq!($expected, timeout($decoder.next()).await?.unwrap()?)
        };
    }

    /// Awaits `future`, failing if it does not complete within 100 ms.
    async fn timeout<T>(future: impl Future<Output = T> + Unpin) -> Result<T> {
        let limit = std::time::Duration::from_millis(100);
        let output = async_std::future::timeout(limit, future).await?;
        Ok(output)
    }

    #[async_std::test]
    async fn decoder_decode_demo() -> Result<()> {
        let (mut tx, rx) = mpsc::unbounded::<io::Result<Vec<u8>>>();
        let mut decoder = Utf8Decoder::new(rx.into_async_read());

        // Nothing is produced until the last byte of the character arrives.
        for &byte in &[240u8, 159, 146] {
            tx.send(Ok(vec![byte])).await?;
            assert_pending!(decoder);
        }
        tx.send(Ok(vec![150])).await?;
        assert_decoded!("💖", decoder);
        assert_pending!(decoder);

        Ok(())
    }

    #[async_std::test]
    async fn decoder_decode_background() -> Result<()> {
        let (mut tx, rx) = mpsc::unbounded::<io::Result<Vec<u8>>>();
        let mut decoder = Utf8Decoder::new(rx.into_async_read());

        let consumer = async_std::task::spawn(async move { decoder.next().await });
        for &byte in &[240u8, 159, 146, 150] {
            tx.send(Ok(vec![byte])).await?;
        }
        assert_eq!("💖", timeout(consumer).await?.unwrap()?);

        Ok(())
    }

    #[async_std::test]
    async fn decoder_decode_1byte_character() -> Result<()> {
        let (mut tx, rx) = mpsc::unbounded::<io::Result<Vec<u8>>>();
        let mut decoder = Utf8Decoder::new(rx.into_async_read());

        tx.send(Ok(vec![0x24])).await?;
        assert_decoded!("\u{0024}", decoder);
        assert_pending!(decoder);

        Ok(())
    }

    #[async_std::test]
    async fn decoder_decode_2byte_character() -> Result<()> {
        let (mut tx, rx) = mpsc::unbounded::<io::Result<Vec<u8>>>();
        let mut decoder = Utf8Decoder::new(rx.into_async_read());

        // Whole character in a single chunk
        tx.send(Ok(vec![0xC2, 0xA2])).await?;
        assert_decoded!("\u{00A2}", decoder);
        assert_pending!(decoder);

        // One byte per chunk
        tx.send(Ok(vec![0xC2])).await?;
        assert_pending!(decoder);
        tx.send(Ok(vec![0xA2])).await?;
        assert_decoded!("\u{00A2}", decoder);
        assert_pending!(decoder);

        Ok(())
    }

    #[async_std::test]
    async fn decoder_decode_3byte_character() -> Result<()> {
        let (mut tx, rx) = mpsc::unbounded::<io::Result<Vec<u8>>>();
        let mut decoder = Utf8Decoder::new(rx.into_async_read());

        // Whole character in a single chunk
        tx.send(Ok(vec![0xE0, 0xA4, 0xB9])).await?;
        assert_decoded!("\u{0939}", decoder);
        assert_pending!(decoder);

        // One byte per chunk
        for &byte in &[0xE0u8, 0xA4] {
            tx.send(Ok(vec![byte])).await?;
            assert_pending!(decoder);
        }
        tx.send(Ok(vec![0xB9])).await?;
        assert_decoded!("\u{0939}", decoder);
        assert_pending!(decoder);

        Ok(())
    }

    #[async_std::test]
    async fn decoder_decode_4byte_character() -> Result<()> {
        let (mut tx, rx) = mpsc::unbounded::<io::Result<Vec<u8>>>();
        let mut decoder = Utf8Decoder::new(rx.into_async_read());

        // Whole character in a single chunk
        tx.send(Ok(vec![0xF0, 0x90, 0x8D, 0x88])).await?;
        assert_decoded!("\u{10348}", decoder);
        assert_pending!(decoder);

        // One byte per chunk
        for &byte in &[0xF0u8, 0x90, 0x8D] {
            tx.send(Ok(vec![byte])).await?;
            assert_pending!(decoder);
        }
        tx.send(Ok(vec![0x88])).await?;
        assert_decoded!("\u{10348}", decoder);
        assert_pending!(decoder);

        Ok(())
    }

    #[async_std::test]
    async fn decoder_decode_ok() -> Result<()> {
        let (mut tx, rx) = mpsc::unbounded::<io::Result<Vec<u8>>>();
        let mut decoder = Utf8Decoder::new(rx.into_async_read());

        for _ in 0..3 {
            tx.send(Ok(MIXED_WIDTHS.to_vec())).await?;
        }
        for _ in 0..3 {
            assert_decoded!("\u{0024}\u{00A2}\u{0939}\u{10348}", decoder);
        }
        assert_pending!(decoder);

        Ok(())
    }

    #[async_std::test]
    async fn decoder_decode_ok_with_minimum_capacity() -> Result<()> {
        let (mut tx, rx) = mpsc::unbounded::<io::Result<Vec<u8>>>();
        let mut decoder = Utf8Decoder::with_capacity(MINIMUM_BUF_SIZE, rx.into_async_read());

        for _ in 0..3 {
            tx.send(Ok(MIXED_WIDTHS.to_vec())).await?;
        }
        // With a 4-byte buffer each 10-byte chunk comes out in three pieces.
        for _ in 0..3 {
            assert_decoded!("\u{0024}\u{00A2}", decoder);
            assert_decoded!("\u{0939}", decoder);
            assert_decoded!("\u{10348}", decoder);
        }
        assert_pending!(decoder);

        Ok(())
    }
}