json_escape/
stream.rs

1//! A high-performance, allocation-free, streaming JSON string unescaper.
2//!
3//! This module provides utilities to unescape JSON strings, with a focus on
4//! performance and flexibility. It is designed to work with data sources that
5//! deliver content in chunks, such as network sockets or file readers, without
6//! requiring heap allocations or holding onto previous chunks.
7//!
8//! # Key Features
9//!
10//! - **Streaming Unescaping**: The main type, [`UnescapeStream`], processes byte
11//!   slices incrementally. This is ideal for I/O-bound applications.
12//! - **Zero Heap Allocations**: The entire process occurs on the stack, using a
13//!   small internal buffer to "stitch" together escape sequences that are split
14//!   across chunk boundaries.
15//! - **Data-Source Agnostic**: The API uses a "push" model. You provide
16//!   byte slices to the unescaper as you receive them, allowing the caller to
17//!   reuse their input buffers.
18//! - **Robust Error Handling**: Reports detailed errors, including the position
19//!   and kind of failure.
20//!
21//! # How It Works
22//!
23//! The core of the streaming logic is the [`UnescapeStream`] struct. When you
24//! process a slice using [`unescape_next`](UnescapeStream::unescape_next), it returns
25//! a tuple containing two parts:
26//!
27//! 1.  An `Option<Result<char, UnescapeError>>`: This handles the "continuity"
28//!     between the previous slice and the current one. It will be `Some(_)` only if
29//!     the previous slice ended with an incomplete escape sequence that was
30//!     resolved by the start of the new slice. The `Result` will contain the
31//!     unescaped character on success or an error if the combined bytes form an
32//!     invalid sequence.
33//! 2.  An `UnescapeNext` iterator: This iterator yields the unescaped parts
34//!     for the remainder of the current slice.
35//!
36//! After processing all slices, you **must** call [`finish`](UnescapeStream::finish) to check
37//! for any leftover partial escape sequences, which would indicate a malformed
38//! JSON string at the end of the stream.
39//!
40//! # Example
41//!
42//! ```rust
43//! use json_escape::{stream::UnescapeStream, token::UnescapedToken};
44//!
45//! fn main() -> Result<(), Box<dyn std::error::Error>> {
46//!     // A JSON string split into multiple parts.
47//!     // The surrogate pair `\uD83D\uDE00` (😀) is split across parts.
48//!     let parts = vec![
49//!         br#"{"message": "Hello, W\"orld! \uD83D"#.as_slice(),
50//!         br#"\uDE00"}"#.as_slice(),
51//!     ];
52//!
53//!     let mut unescaper = UnescapeStream::new();
54//!     let mut unescaped_string = String::new();
55//!
56//!     for part in parts {
57//!         // Process the next part of the stream.
58//!         let (boundary_char, rest_of_part) = unescaper.try_unescape_next(part)?;
59//!
60//!         // 1. Handle the character that may have spanned the boundary.
61//!         if let Some(boundary_char) = boundary_char {
62//!             unescaped_string.push(boundary_char);
63//!         }
64//!
65//!         // 2. Process the rest of the current part.
66//!         for result in rest_of_part {
67//!             let unescaped_part = result?;
68//!             match unescaped_part {
69//!                 UnescapedToken::Literal(literal) => {
70//!                     unescaped_string.push_str(std::str::from_utf8(literal)?)
71//!                 }
72//!                 UnescapedToken::Unescaped(ch) => unescaped_string.push(ch),
73//!             }
74//!         }
75//!     }
76//!
77//!     // IMPORTANT: Always call finish() to detect errors at the end of the stream.
78//!     unescaper.finish()?;
79//!
80//!     assert_eq!(unescaped_string, r#"{"message": "Hello, W"orld! 😀"}"#);
81//!
82//!     Ok(())
83//! }
84//! ```
85
86use core::convert::Infallible;
87#[cfg(feature = "std")]
88use std::vec::Vec;
89
90use crate::{
91    UnescapeError, UnescapeErrorKind,
92    token::{UnescapeTokens, UnescapedToken, unescape},
93};
94
95/// Drives an `UnescapeStream` with runtime-agnostic I/O expressions.
96///
97/// This macro provides an ergonomic, high-level wrapper for processing a stream
98/// of JSON string bytes. It hides the manual boilerplate of looping, calling
99/// `try_unescape_next`, and handling boundary conditions, and automatically calls
100/// `finish()` at the end.
101///
102/// # Rationale
103///
104/// The key feature of this macro is that it is **runtime-agnostic**.
105/// Because the `Read` and `Write` arguments are expressions, you can use
106/// this macro identically in both synchronous functions (e.g., with `std::io::Read`)
107/// and asynchronous functions (e.g., with `tokio::io::AsyncRead` and `.await`).
108///
109/// It avoids the "function coloring" problem, where you would otherwise need
110/// separate `unescape_sync` and `unescape_async` helper functions.
111///
112/// # Parameters
113///
/// - `Read: { ... }`: An expression that provides the next chunk of byte data.
///   It is evaluated once per loop iteration and **must** evaluate to an
///   `Option<&B>` where `B: AsRef<[u8]> + ?Sized` (for example `Option<&[u8]>`),
///   because the chunk is handed to `UnescapeStream::try_unescape_next` by reference.
///   - `Some(bytes)` provides the next chunk to the unescaper.
///   - `None` signals the end of the stream (EOF).
118///
119/// - `Write: |$token| { ... }`: A closure-like expression that processes an
120///   `UnescapedToken`. The token will be available under the identifier you provide
121///   (e.g., `$token`).
122///
123/// # Examples
124///
125/// ### Synchronous Example (from an iterator)
126///
127/// ```rust
128/// use json_escape::stream::{unescape_stream_into, UnescapeStream};
129/// use json_escape::token::UnescapedToken;
130/// use std::str;
131///
132/// fn sync_stream() -> Result<(), Box<dyn std::error::Error>> {
133///     // The surrogate pair `\uD83D\uDE00` (😀) is split across parts.
134///     let mut parts = vec![
135///         br#"{"message": "Hello, W\"orld! \uD83D"#.as_slice(),
136///         br#"\uDE00"}"#.as_slice(),
137///     ]
138///     .into_iter();
139///
140///     let mut unescaped_string = String::new();
141///
142///     unescape_stream_into! {
143///         Read: {
144///             parts.next() // This is sync
145///         },
146///         Write: |token| {
147///             match token { // This is sync
148///                  UnescapedToken::Literal(literal) => {
149///                      unescaped_string.push_str(str::from_utf8(literal)?)
150///                  }
151///                  UnescapedToken::Unescaped(ch) => unescaped_string.push(ch),
152///              }
153///         },
154///     };
155///
156///     assert_eq!(unescaped_string, r#"{"message": "Hello, W"orld! 😀"}"#);
157///     Ok(())
158/// }
159///
160/// sync_stream().unwrap();
161/// ```
162///
163/// ### Asynchronous Example (from an async stream)
164///
165/// ```rust
166/// use json_escape::stream::{unescape_stream_into, UnescapeStream};
167/// use json_escape::token::UnescapedToken;
168/// use std::str;
169///
170/// // A simple async iterator for the example
171/// struct AsyncIter<'a> {
172///     iter: <Vec<&'a [u8]> as IntoIterator>::IntoIter,
173/// }
174/// impl<'a> AsyncIter<'a> {
175///     async fn next(&mut self) -> Option<&'a [u8]> {
176///         self.iter.next()
177///     }
178/// }
179///
180/// // A simple async writer for the example
181/// struct AsyncWrite {
182///     write: String,
183/// }
184/// impl AsyncWrite {
185///     async fn write(
186///         &mut self,
187///         token: UnescapedToken<'_>,
188///     ) -> Result<(), Box<dyn std::error::Error>> {
189///         match token {
190///             UnescapedToken::Literal(literal) => {
191///                 self.write.push_str(str::from_utf8(literal)?)
192///             }
193///             UnescapedToken::Unescaped(ch) => self.write.push(ch),
194///         }
195///         Ok(())
196///     }
197/// }
198///
199/// async fn async_stream() -> Result<(), Box<dyn std::error::Error>> {
200///     let mut parts = AsyncIter {
201///         iter: vec![
202///             br#"{"message": "Hello, W\"orld! \uD83D"#.as_slice(),
203///             br#"\uDE00"}"#.as_slice(),
204///         ]
205///         .into_iter(),
206///     };
207///
208///     let mut unescaped_string = AsyncWrite {
209///         write: String::new(),
210///     };
211///
212///     unescape_stream_into! {
213///         Read: {
214///             parts.next().await // This is async
215///         },
216///         Write: |token| {
217///             unescaped_string.write(token).await? // This is async
218///         },
219///     };
220///
221///     assert_eq!(
222///         unescaped_string.write,
223///         r#"{"message": "Hello, W"orld! 😀"}"#
224///     );
225///     Ok(())
226/// }
227/// ```
#[macro_export]
macro_rules! unescape_stream_into {
    (
        $Read:ident: $read:expr,
        $Write:ident: |$token:ident| $write:expr $(,)?
    ) => {{
        // `$Read` / `$Write` are matched as identifiers (rather than literal
        // tokens) so the documented stub functions below can be named after
        // them, which lets an LSP surface this help text at the call site.
        //
        // The whole expansion lives in its own block: the stub items and the
        // locals stay scoped to a single invocation, so the macro can be used
        // more than once inside the same enclosing block without the stubs
        // colliding. The trailing comma after the `Write` arm is optional.

        /// Provide the read op (async/sync) as an expression resulting in Option<impl AsRef<[u8]>>
        /// like:
        /// ```rust
        /// # use json_escape::stream::unescape_stream_into;
        /// # fn dummy() -> Result<(), Box<dyn std::error::Error>> {
        /// # let mut src = std::io::Cursor::new(b"");
        /// # let buf = &mut [0u8; 10];
        /// # use std::io::Read;
        /// unescape_stream_into! {
        ///     Read: {
        ///         // perform read... like
        ///         let read = src.read(buf)?;
        ///         if read > 0 {
        ///             Some(&buf[..read])
        ///         } else {
        ///             None
        ///         }
        ///     },
        ///     Write: |token| {
        ///         // ...
        ///     }
        /// };
        /// # Ok(())
        /// # }
        /// ```
        /// Since we're in macro, the op can return error and other things the surrounding function can catch
        #[allow(dead_code)]
        #[allow(non_snake_case)]
        fn $Read() -> Option<impl AsRef<[u8]>> {
            unimplemented!();
            #[allow(unreachable_code)]
            Some(b"")
        }

        /// Provide the write op (async/sync) as an expression taking UnescapedToken<'_>
        /// like:
        /// ```rust
        /// # use json_escape::stream::unescape_stream_into;
        /// unescape_stream_into! {
        ///     Read: {
        ///         // ...
        ///         None::<&[u8]>
        ///     },
        ///     Write: |token| {
        ///         println!("{token:#?}")
        ///     }
        /// };
        /// ```
        #[allow(dead_code)]
        #[allow(non_snake_case)]
        #[allow(unused_variables)]
        fn $Write($token: $crate::token::UnescapedToken<'_>) {
            unimplemented!();
        }

        let mut unescaper = $crate::stream::UnescapeStream::new();
        loop {
            // `$read` is re-evaluated on every iteration; `None` ends the stream.
            if let Some(part) = $read {
                // Process the next part of the stream.
                let (boundary_char, rest_of_part) = unescaper.try_unescape_next(part)?;

                // 1. Handle the character that may have spanned the boundary.
                if let Some(boundary_char) = boundary_char {
                    let $token = $crate::token::UnescapedToken::Unescaped(boundary_char);
                    $write
                }

                // 2. Process the rest of the current part.
                for result in rest_of_part {
                    let unescaped_part = result?;
                    let $token = unescaped_part;
                    $write
                }
            } else {
                break;
            }
        }

        // IMPORTANT: Always call finish() to detect errors at the end of the stream.
        unescaper.finish()?;
    }};
}
320
/// Incremental JSON string unescaper fed one byte slice at a time.
///
/// The only state carried between slices is a small fixed-size "stitch"
/// buffer holding the bytes of an escape sequence that was cut off at the end
/// of the previous slice. No heap allocation is involved.
///
/// See the [module-level documentation](self) for examples and more details.
#[must_use = "UnescapeStream does nothing unless consumed"]
#[derive(Clone, Debug)]
pub struct UnescapeStream {
    /// Bytes of a pending, still-incomplete escape sequence.
    ///
    /// A full surrogate pair escape `\uXXXX\uYYYY` is 12 bytes, so this buffer
    /// is large enough to hold it and any other partial escape.
    stitch_buf: [u8; 12],
    /// The number of valid bytes in `stitch_buf`; `0` means nothing is pending.
    stitch_len: u8,
}
337
338impl Default for UnescapeStream {
339    #[inline]
340    fn default() -> Self {
341        Self::new()
342    }
343}
344
345impl UnescapeStream {
346    /// Creates a new, empty `UnescapeStream`.
347    #[inline]
348    pub fn new() -> Self {
349        Self {
350            stitch_buf: [0; 12],
351            stitch_len: 0,
352        }
353    }
354
355    /// Processes the next byte slice, returning a fallible result.
356    ///
357    /// This is a convenience wrapper around [`UnescapeStream::unescape_next`]. Instead of returning
358    /// an `Option<Result<...>>`, it "hoists" a potential boundary error into
359    /// the main `Result`.
360    ///
361    /// This simplifies error handling, as you can use the `?` operator to handle
362    /// errors from both the boundary character and the rest of the stream.
363    ///
364    /// # Returns
365    ///
366    /// - `Ok((Option<char>, UnescapeNext))` on success. The `Option<char>` contains
367    ///   the successfully unescaped character from a boundary-spanning sequence.
368    /// - `Err(UnescapeError)` if completing a boundary-spanning sequence results
369    ///   in a parsing error.
370    #[inline]
371    pub fn try_unescape_next<'a, 'b, I: AsRef<[u8]> + ?Sized>(
372        &'a mut self,
373        next_part: &'b I,
374    ) -> Result<(Option<char>, UnescapeNext<'a, 'b>), UnescapeError> {
375        let (boundary_result, new) = self.unescape_next(next_part);
376        let boundary_char = boundary_result.transpose()?;
377
378        Ok((boundary_char, new))
379    }
380
    /// Processes the next byte slice in the stream.
    ///
    /// This is the primary method for feeding data to the unescaper. It returns a tuple:
    ///
    /// 1.  An `Option<Result<char, UnescapeError>>` for the character that may have
    ///     spanned the boundary from the *previous* slice. `None` if the previous
    ///     slice ended cleanly, or if the pending escape is still incomplete even
    ///     after taking bytes from this slice.
    /// 2.  An `UnescapeNext` iterator for the remainder of the *current* slice
    ///     (everything not consumed while resolving the boundary escape).
    ///
    /// If the current slice ends with an incomplete escape sequence, that partial data is
    /// saved internally (by the returned iterator, when it reaches that point). It will be
    /// resolved on the next call to `unescape_next` with the subsequent slice, or reported
    /// as an error by `finish`.
    ///
    /// # Boundary errors
    ///
    /// When the stitched bytes form a definitively invalid sequence, the error is
    /// returned in the first tuple element, the pending bytes are kept as they were
    /// before the call, and no bytes of `next_part` are consumed: the returned
    /// iterator covers the whole slice.
    pub fn unescape_next<'a, 'b, I: AsRef<[u8]> + ?Sized>(
        &'a mut self,
        next_part: &'b I,
    ) -> (Option<Result<char, UnescapeError>>, UnescapeNext<'a, 'b>) {
        // The part of the new slice still to be handed to the iterator; it
        // shrinks below when bytes are consumed to finish a pending escape.
        let mut next_part_slice = next_part.as_ref();
        let boundary_char = if self.stitch_len > 0 {
            // We have a partial escape from the previous chunk. Try to complete it.
            let old_stitch_len = self.stitch_len as usize;

            // Determine how many bytes to copy from the new chunk.
            // We copy enough to fill our buffer, which is sized for the longest escape.
            let needed = self.stitch_buf.len() - old_stitch_len;
            let to_copy = needed.min(next_part_slice.len());

            // Append bytes from the new chunk to our buffer.
            self.stitch_buf[old_stitch_len..old_stitch_len + to_copy]
                .copy_from_slice(&next_part_slice[..to_copy]);
            // Cannot exceed the buffer length (12), so the `u8` cast is lossless.
            self.stitch_len += to_copy as u8;

            // Try to unescape the combined bytes.
            let mut unescaper = unescape(&self.stitch_buf[..self.stitch_len as usize]);
            let next = unescaper.next();

            match next {
                Some(Ok(token)) => {
                    // Success! The stitched part was resolved.
                    // Only the bytes the escape actually used are taken from the
                    // new slice; surplus lookahead bytes that were copied into the
                    // buffer stay in `next_part_slice` for the iterator.
                    let total_consumed = (self.stitch_len as usize) - unescaper.remnant().len();
                    let consumed_from_next = total_consumed - old_stitch_len;
                    next_part_slice = &next_part_slice[consumed_from_next..];

                    // The buffer was saved starting at a backslash, so the first
                    // token is expected to be an unescaped character, not a literal.
                    let unescaped_char = match token {
                        UnescapedToken::Unescaped(c) => c,
                        _ => unreachable!("unescaper should produce a char from an escape"),
                    };

                    // Clear the buffer and return the resolved character.
                    self.stitch_len = 0;
                    Some(Ok(unescaped_char))
                }
                Some(Err(err)) => {
                    if err.kind == UnescapeErrorKind::UnexpectedEof {
                        // Still not enough data. The new data was consumed into our
                        // buffer but it wasn't enough to resolve the sequence.
                        // We will wait for the next chunk.
                        next_part_slice = &next_part_slice[to_copy..]; // Consume all we copied
                        None // No character or error to report yet.
                    } else {
                        // A definitive error occurred.
                        // To keep this function pure for a given `next_part`, we roll back
                        // the change to `stitch_len` so that another call with the same
                        // input will produce the same error. The bytes from `next_part_slice`
                        // are treated as lookahead only and are not consumed.
                        // NOTE(review): `err` comes from parsing the stitch buffer, so any
                        // position it carries is presumably relative to that buffer rather
                        // than to the overall stream — confirm against `UnescapeError`.
                        self.stitch_len = old_stitch_len as u8;
                        Some(Err(err))
                    }
                }
                None => {
                    // This is unreachable because the buffer is non-empty. The unescaper
                    // would either yield a chunk or an UnexpectedEof error.
                    unreachable!();
                }
            }
        } else {
            // The previous chunk ended cleanly.
            None
        };

        // The iterator borrows `self` mutably so it can stash a trailing
        // partial escape from this slice into the stitch buffer.
        let iterator = UnescapeNext {
            stream: self,
            inner: UnescapeTokens::new(next_part_slice),
        };

        (boundary_char, iterator)
    }
467
468    /// Finalizes the unescaping process, checking for leftover incomplete data.
469    ///
470    /// This method **must** be called after all slices have been processed. It checks
471    /// if there is an incomplete escape sequence stored in its internal buffer. If so,
472    /// it means the stream ended unexpectedly, and an `Err(UnescapeError)` is returned.
473    ///
474    /// This method consumes the `UnescapeStream`, preventing further use.
475    pub fn finish(self) -> Result<(), UnescapeError> {
476        if self.stitch_len > 0 {
477            // If there are bytes left in the stitch buffer, it means the stream
478            // ended mid-escape. We re-run the parser on just this fragment
479            // to generate the correct EOF error.
480            let buf = &self.stitch_buf[..self.stitch_len as usize];
481            if let Some(Err(e)) = unescape(buf).next() {
482                // We expect an EOF error here specifically.
483                debug_assert_eq!(
484                    e,
485                    UnescapeError {
486                        kind: UnescapeErrorKind::UnexpectedEof,
487                        offset: self.stitch_len
488                    }
489                );
490                return Err(e);
491            }
492        }
493        Ok(())
494    }
495
    /// Discards any partial escape sequence held in the internal buffer.
    ///
    /// You might call this after encountering a non-fatal error if you want to
    /// drop the invalid partial data and continue processing the stream from
    /// a fresh state.
    ///
    /// **Warning**: If you `clear()` the state after an error and then call
    /// `finish()` without processing more data, `finish()` will return `Ok(())`
    /// because the partial state that caused the original error has been erased.
    pub fn clear(&mut self) {
        // Resetting the length is sufficient: the buffer is only ever read
        // through a `..stitch_len` slice, so stale bytes are never observed.
        self.stitch_len = 0;
    }
508
509    /// Unescapes a stream of byte chunks from a source function to a destination function.
510    ///
511    /// This function acts as a driver for the [`UnescapeStream`]. It repeatedly calls a
512    /// source function (`src`) to get the next chunk of data, unescapes it, and then
513    /// calls a destination function (`dst`) for each resulting [`UnescapedToken`].
514    ///
515    /// This provides a flexible way to connect any data source (like a file reader or
516    /// network socket) to any data sink without manually iterating.
517    ///
518    /// # Parameters
519    ///
520    /// - `self`: The `UnescapeStream` instance, which will be consumed.
521    /// - `src`: A closure or function that, when called, returns the next chunk of
522    ///   data as `Option<Result<B, SrcError>>`. It should return `Some(Ok(chunk))`
523    ///   for data, `Some(Err(e))` for a source error, and `None` to signal the
524    ///   end of the stream.
525    /// - `dst`: A closure or function that receives an [`UnescapedToken`]. It should
526    ///   return `Ok(())` on success or `Err(DstError)` on failure.
527    ///
528    /// # Errors
529    ///
530    /// Returns an [`UnescapeFnError`] if an error occurs at any point:
531    /// - [`UnescapeFnError::Src`] if the `src` function returns an error.
532    /// - [`UnescapeFnError::Unescape`] if the JSON string is malformed (e.g., an
533    ///   invalid escape sequence or incomplete data at the end of the stream).
534    /// - [`UnescapeFnError::Dst`] if the `dst` function returns an error.
535    #[inline]
536    #[deprecated(
537        since = "0.3.1",
538        note = "This sync-only function is superseded by the runtime-agnostic `unescape_stream_into!` macro."
539    )]
540    pub fn unescape_from_fn<Src, Dst, SrcError, DstError, B>(
541        self,
542        src: Src,
543        dst: Dst,
544    ) -> Result<(), UnescapeFnError<SrcError, DstError>>
545    where
546        Src: FnMut() -> Option<Result<B, SrcError>>,
547        Dst: FnMut(UnescapedToken<'_>) -> Result<(), DstError>,
548        B: AsRef<[u8]>,
549    {
550        #[allow(deprecated)]
551        self.unescape_from_source(
552            FnMutChunkSource {
553                closure: src,
554                _phantom: core::marker::PhantomData,
555            },
556            dst,
557        )
558    }
559
560    /// Processes a stream of byte chunks from a source and unescapes them to a destination.
561    ///
562    /// This function drives the unescaping process by repeatedly calling a `src`
563    /// (source) to get a chunk of bytes, unescaping the data, and then passing
564    /// the resulting `UnescapedToken`s to a `dst` (destination).
565    ///
566    /// This provides a flexible pipeline for connecting any byte source (e.g., a file
567    /// or network stream) to any byte sink without manual iteration.
568    ///
569    /// # Parameters
570    /// - `src`: A `ChunkSource` that provides the raw, escaped byte chunks.
571    /// - `dst`: A closure that receives each `UnescapedToken` and processes it.
572    ///
573    /// # Errors
574    /// This function returns an `UnescapeFnError` if an error occurs at any stage:
575    /// - `UnescapeFnError::Src`: An error from the source (`src`).
576    /// - `UnescapeFnError::Unescape`: The data is malformed (e.g., an invalid escape sequence).
577    /// - `UnescapeFnError::Dst`: An error from the destination (`dst`).
578    #[inline]
579    #[deprecated(
580        since = "0.3.1",
581        note = "This sync-only function is superseded by the runtime-agnostic `unescape_stream_into!` macro."
582    )]
583    #[allow(deprecated)]
584    pub fn unescape_from_source<Src, Dst, SrcError, DstError>(
585        mut self,
586        mut src: Src,
587        mut dst: Dst,
588    ) -> Result<(), UnescapeFnError<SrcError, DstError>>
589    where
590        Src: ChunkSource<Error = SrcError>,
591        Dst: FnMut(UnescapedToken<'_>) -> Result<(), DstError>,
592    {
593        while let Some(next) = src.next_chunk() {
594            let next = next.map_err(UnescapeFnError::Src)?;
595            let (boundary, next) = self
596                .try_unescape_next(next.as_ref())
597                .map_err(UnescapeFnError::Unescape)?;
598
599            if let Some(ch) = boundary {
600                dst(UnescapedToken::Unescaped(ch)).map_err(UnescapeFnError::Dst)?;
601            }
602
603            for token in next {
604                let token = token.map_err(UnescapeFnError::Unescape)?;
605                dst(token).map_err(UnescapeFnError::Dst)?;
606            }
607        }
608
609        self.finish().map_err(UnescapeFnError::Unescape)
610    }
611}
612
613/// An error that can occur during the `unescape_from_source` operation.
614///
615/// This enum consolidates errors from the three potential points of failure:
616/// 1. Reading from the source (`Src`).
617/// 2. The JSON unescaping process itself (`Unescape`).
618/// 3. Writing to the destination (`Dst`).
619#[derive(Clone, Debug)]
620pub enum UnescapeFnError<Src, Dst> {
621    /// An error occurred during the unescaping process.
622    Unescape(UnescapeError),
623    /// An error occurred while reading from the source.
624    Src(Src),
625    /// An error occurred while writing to the destination.
626    Dst(Dst),
627}
628
629impl<Src, Dst: core::fmt::Display> core::fmt::Display for UnescapeFnError<Src, Dst>
630where
631    Src: core::fmt::Display,
632{
633    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
634        match self {
635            UnescapeFnError::Unescape(e) => write!(f, "unescape error: {e}"),
636            UnescapeFnError::Src(e) => write!(f, "source error: {e}"),
637            UnescapeFnError::Dst(e) => write!(f, "destination error: {e}"),
638        }
639    }
640}
641
#[cfg(feature = "std")]
impl<Src, Dst> std::error::Error for UnescapeFnError<Src, Dst>
where
    Src: std::error::Error + 'static,
    Dst: std::error::Error + 'static,
{
    /// Exposes the wrapped error, whichever variant holds it, as the source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let inner: &(dyn std::error::Error + 'static) = match self {
            Self::Unescape(e) => e,
            Self::Src(e) => e,
            Self::Dst(e) => e,
        };
        Some(inner)
    }
}
656
657impl From<UnescapeFnError<Infallible, Infallible>> for UnescapeError {
658    fn from(value: UnescapeFnError<Infallible, Infallible>) -> Self {
659        match value {
660            UnescapeFnError::Unescape(unescape_error) => unescape_error,
661            UnescapeFnError::Src(i) => match i {},
662            UnescapeFnError::Dst(i) => match i {},
663        }
664    }
665}
666
/// An iterator over the unescaped parts of a single byte slice.
///
/// This struct is created by [`UnescapeStream::unescape_next`].
///
/// If the slice ends in the middle of an escape sequence, the iterator stores
/// the unfinished bytes in the parent stream when it reaches that point and
/// then ends. That hand-off happens inside `next()`, so it only takes place if
/// the iterator is driven to its end.
#[must_use = "iterators are lazy and do nothing unless consumed"]
#[derive(Debug)]
pub struct UnescapeNext<'a, 'b> {
    // Parent stream; written to when a trailing partial escape must be saved.
    stream: &'a mut UnescapeStream,
    // Token iterator over the current slice.
    inner: UnescapeTokens<'b>,
}
676
677impl<'a, 'b> Iterator for UnescapeNext<'a, 'b> {
678    type Item = Result<UnescapedToken<'b>, UnescapeError>;
679
680    #[inline]
681    fn next(&mut self) -> Option<Self::Item> {
682        match self.inner.next() {
683            // ASSUMPTION: Unfinished high surrogate will cause UnexpectedEof
684            Some(Err(e)) if e.kind == UnescapeErrorKind::UnexpectedEof => {
685                // The current chunk ends with an incomplete escape sequence.
686                // Save the remnant for the next call to `unescape_next`.
687                //
688                // ASSUMPTION: UnescapeTokens will not update state on error.
689                let remnant = self.inner.remnant();
690                debug_assert!(!remnant.is_empty() && remnant[0] == b'\\', "{remnant:?}");
691                debug_assert!(remnant.len() < self.stream.stitch_buf.len(), "{remnant:?}");
692
693                // Copy the remnant and update the length.
694                self.stream.stitch_buf[..remnant.len()].copy_from_slice(remnant);
695                self.stream.stitch_len = remnant.len() as u8;
696
697                // Stop iterating for this chunk.
698                None
699            }
700            // A definitive error or a valid chunk.
701            other => other,
702        }
703    }
704}
705
706// =============================================================================
707// Traits
708// =============================================================================
709
/// A pull-based source of byte chunks for the synchronous drivers.
///
/// The trait exists so that a source can lend out slices of a buffer it owns:
/// each returned chunk may borrow from the source itself. A plain closure
/// (`FnMut() -> Option<Result<B, E>>`) cannot express that relationship,
/// because the returned slice would have to outlive the closure call. Making
/// the source an object with a borrowing method solves this.
///
/// This trait is synchronous only. For asynchronous sources, drive
/// `UnescapeStream` directly or use the `unescape_stream_into!` macro.
#[deprecated(
    since = "0.3.1",
    note = "This sync-only trait is superseded by the runtime-agnostic `unescape_stream_into!` macro."
)]
pub trait ChunkSource {
    /// The error produced when reading a chunk fails.
    type Error;

    /// The chunk type handed out; it may borrow from the source for `'a`.
    type Chunk<'a>: AsRef<[u8]> + 'a
    where
        Self: 'a;

    /// Fetches the next chunk of bytes.
    ///
    /// Returns `None` when the source is exhausted, `Some(Ok(bytes))` for a
    /// successful chunk, or `Some(Err(e))` if an error occurred.
    ///
    /// The returned chunk borrows the source mutably, so it must be released
    /// before `next_chunk` can be called again.
    fn next_chunk(&mut self) -> Option<Result<Self::Chunk<'_>, Self::Error>>;
}
739
740#[allow(deprecated)]
741impl<T> ChunkSource for &mut T
742where
743    T: ChunkSource,
744{
745    type Error = T::Error;
746
747    type Chunk<'a>
748        = T::Chunk<'a>
749    where
750        Self: 'a;
751
752    #[inline]
753    fn next_chunk<'a>(&'a mut self) -> Option<Result<Self::Chunk<'a>, Self::Error>> {
754        (*self).next_chunk()
755    }
756}
757
/// A `ChunkSource` that reads from any `std::io::Read` type.
///
/// This struct owns a buffer that is refilled on each read operation and then
/// lent out as the chunk. The buffer type `B` defaults to `Vec<u8>`; its size
/// bounds how much is read per chunk, letting you balance memory usage against
/// I/O performance.
#[cfg(feature = "std")]
#[deprecated(
    since = "0.3.1",
    note = "This sync-only struct is superseded by the runtime-agnostic `unescape_stream_into!` macro."
)]
pub struct ReadChunkSource<R, B = Vec<u8>> {
    // The underlying reader that chunks are pulled from.
    reader: R,
    // Scratch storage each chunk is read into.
    buffer: B,
}
772
#[cfg(feature = "std")]
#[allow(deprecated)]
impl<R, B> ReadChunkSource<R, B> {
    /// Creates a new `ReadChunkSource` from a reader and a caller-supplied buffer.
    ///
    /// Each chunk is read into the slice the buffer exposes through
    /// `AsMut<[u8]>`, so the maximum chunk size is the buffer's **length**, not
    /// its capacity. In particular, a `Vec::with_capacity(n)` has length `0`:
    /// every read into it returns zero bytes, which the source reports as end
    /// of stream. Pass an initialized buffer such as `vec![0u8; n]` or
    /// `[0u8; N]`.
    pub fn new(reader: R, buffer: B) -> Self {
        ReadChunkSource { reader, buffer }
    }

    /// Creates a new `ReadChunkSource` backed by a `Vec<u8>` of `size` bytes.
    ///
    /// This is a convenience method that creates a zero-filled `Vec<u8>` with
    /// the given *length* (not just capacity) to use as the internal buffer.
    pub fn with_buffer_size(reader: R, size: usize) -> ReadChunkSource<R, Vec<u8>> {
        // `vec![0u8; size]` rather than `Vec::with_capacity(size)`: the latter
        // has `len() == 0`, and only the initialized `len()` bytes are ever
        // offered to the reader.
        let buffer = std::vec![0u8; size];
        ReadChunkSource { reader, buffer }
    }
}
792
// Gated on `std` like `ReadChunkSource` itself: without the gate this impl
// would name a type (and `std::io`) that does not exist when the feature is off.
#[cfg(feature = "std")]
#[allow(deprecated)]
impl<R, B> ChunkSource for ReadChunkSource<R, B>
where
    R: std::io::Read,
    B: AsMut<[u8]>,
{
    type Error = std::io::Error;
    type Chunk<'a>
        = &'a [u8]
    where
        Self: 'a;

    /// Reads once into the internal buffer and returns the filled prefix.
    ///
    /// # Returns
    ///
    /// - `Some(Ok(bytes))` with the bytes just read (borrowed from the buffer),
    /// - `None` when the reader reports `Ok(0)`, which is treated as end of
    ///   input (a zero-length buffer also makes `read` report `Ok(0)`, so such
    ///   a source yields no chunks),
    /// - `Some(Err(e))` for any I/O error other than an interruption.
    ///
    /// Reads that fail with [`std::io::ErrorKind::Interrupted`] are retried
    /// transparently, since that error kind is documented as non-fatal.
    #[inline]
    fn next_chunk<'a>(&'a mut self) -> Option<Result<Self::Chunk<'a>, Self::Error>> {
        // Borrow the buffer once, outside the retry loop, so the returned slice
        // can carry the full `'a` lifetime.
        let buffer = self.buffer.as_mut();
        let filled = loop {
            match self.reader.read(buffer) {
                Ok(n) => break n,
                // A signal interrupted the call before any data was read: retry.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Some(Err(e)),
            }
        };

        if filled == 0 {
            None // EOF
        } else {
            Some(Ok(&buffer[..filled]))
        }
    }
}
816
/// A `ChunkSource` implementation that wraps a mutable closure (`FnMut`).
///
/// This struct is generic over a lifetime `'s`, the closure type `F`,
/// the chunk type `B`, and the error type `E`. Every call to the closure
/// produces the next item of the stream: `None` for exhaustion, `Some(Ok(chunk))`
/// for data, `Some(Err(e))` for a failure. The chunk type is required to
/// outlive `'s` (`B: 's`), so chunks are guaranteed to live at least as long
/// as the `'s` lifetime associated with the struct.
pub struct FnMutChunkSource<'s, F, B, E>
where
    F: FnMut() -> Option<Result<B, E>>,
    B: AsRef<[u8]> + 's,
{
    /// The wrapped closure that produces chunks.
    closure: F,
    /// A marker to inform the compiler about the `'s` lifetime. This is
    /// necessary because the closure's return type `B` is tied to this
    /// lifetime, but the struct doesn't directly hold a reference with
    /// that lifetime.
    _phantom: core::marker::PhantomData<&'s ()>,
}
836
837impl<'s, F, B, E> FnMutChunkSource<'s, F, B, E>
838where
839    F: FnMut() -> Option<Result<B, E>>,
840    B: AsRef<[u8]> + 's,
841{
842    /// Creates a new `FnMutChunkSource`.
843    ///
844    /// This function takes a closure `F` that will be used to generate chunks. The
845    /// closure must return `Option<Result<B, E>>`, where `B` is a type that can be
846    /// borrowed as a `&[u8]` and `E` is the error type.    
847    pub fn new(closure: F) -> Self {
848        FnMutChunkSource {
849            closure,
850            _phantom: core::marker::PhantomData,
851        }
852    }
853}
854
855#[allow(deprecated)]
856impl<'s, F, B, E> ChunkSource for FnMutChunkSource<'s, F, B, E>
857where
858    F: FnMut() -> Option<Result<B, E>>,
859    B: AsRef<[u8]> + 's,
860{
861    type Error = E;
862    type Chunk<'a>
863        = B
864    where
865        Self: 'a;
866
867    #[inline(always)]
868    fn next_chunk<'a>(&'a mut self) -> Option<Result<Self::Chunk<'a>, Self::Error>> {
869        (self.closure)()
870    }
871}
872
// Here we test our assumptions about the behaviour of the non-streaming
// `unescape` iterator. NOTE(review): presumably the streaming code in this
// file depends on these properties - confirm against `UnescapeStream`.
#[cfg(test)]
mod assumptions_tests {
    use super::*;

    // Test that a partial surrogate isn't reported as a lone surrogate but
    // as an unexpected EOF, both when the input stops right after the high
    // surrogate and when it stops after the backslash that follows it.
    #[test]
    fn insufficient_data_is_not_lone_surrogate() {
        let mut unescaper = unescape(br"\uD83D");
        assert_eq!(
            unescaper.next().unwrap().unwrap_err(),
            UnescapeError {
                kind: UnescapeErrorKind::UnexpectedEof,
                offset: 6
            }
        );

        let mut unescaper = unescape(br"\uD83D\");
        assert_eq!(
            unescaper.next().unwrap().unwrap_err(),
            UnescapeError {
                kind: UnescapeErrorKind::UnexpectedEof,
                offset: 7
            }
        );
    }

    // The converse: when a byte that cannot start a low surrogate follows the
    // high surrogate, the error is `LoneSurrogate`, not `UnexpectedEof`.
    #[test]
    fn lone_surrogate_is_not_eof() {
        let mut unescaper = unescape(br"\uD83Da");
        assert_eq!(
            unescaper.next().unwrap().unwrap_err(),
            UnescapeError {
                kind: UnescapeErrorKind::LoneSurrogate(crate::LoneSurrogateError {
                    surrogate: 0xD83D
                }),
                offset: 6
            }
        );
    }

    // An error must not consume input: after the failure, `remnant()` still
    // returns the whole original input.
    #[test]
    fn unescape_does_not_commit_on_error() {
        let err_input = br"\uD83D";
        let mut unescaper = unescape(err_input);
        let err = unescaper.next().unwrap().unwrap_err();
        assert_eq!(
            err,
            UnescapeError {
                kind: UnescapeErrorKind::UnexpectedEof,
                offset: 6
            }
        );

        assert_eq!(unescaper.remnant(), err_input);
    }

    #[test]
    fn unescape_keeps_erroring() {
        // This tests that an error is sticky.
        let err_input = br"\z";
        let mut unescaper = unescape(err_input);
        let err_1 = unescaper.next().unwrap().unwrap_err();
        // The iterator should continue to yield the same error.
        let err_2 = unescaper.next().unwrap().unwrap_err();

        assert_eq!(err_1, err_2)
    }
}
943
// Tests for the streaming unescaper: chunk-boundary handling, `finish`,
// error reporting, the deprecated closure-driven API and the
// `unescape_stream_into!` macro.
#[cfg(test)]
mod tests {
    use super::*;
    use std::{str, string::String, vec::Vec};

    /// Helper to run a stream test and collect the output into a string.
    ///
    /// Feeds `parts` one by one through the (deprecated) `unescape_from_fn`
    /// driver. Literal tokens are appended as UTF-8 (panicking if they are not
    /// valid UTF-8) and unescaped characters are pushed as-is. Any unescape
    /// error is returned to the caller.
    fn run_stream_test<I, S>(parts: I) -> Result<String, UnescapeError>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<[u8]>,
    {
        let unescaper = UnescapeStream::new();
        let mut parts = parts.into_iter();
        let mut output = String::new();
        #[allow(deprecated)]
        unescaper.unescape_from_fn::<_, _, Infallible, Infallible, _>(
            // Source: never fails, ends when the iterator is exhausted.
            || parts.next().map(Ok),
            // Sink: never fails, accumulates into `output`.
            |token| {
                match token {
                    UnescapedToken::Unescaped(c) => output.push(c),
                    UnescapedToken::Literal(s) => output.push_str(str::from_utf8(s).unwrap()),
                }
                Ok(())
            },
        )?;

        Ok(output)
    }

    /// A single chunk without escapes passes through unchanged.
    #[test]
    fn test_single_chunk_no_escapes() {
        let parts = [br"hello world"];
        assert_eq!(run_stream_test(parts).unwrap(), "hello world");
    }

    /// Simple escapes inside a single chunk are decoded.
    #[test]
    fn test_single_chunk_with_escapes() {
        let parts = [br#"hello \"world\t\n\\"#];
        assert_eq!(run_stream_test(parts).unwrap(), "hello \"world\t\n\\");
    }

    /// Plain text split over two chunks is concatenated.
    #[test]
    fn test_multiple_chunks_no_escapes() {
        let parts = [&br"hello "[..], &br"world"[..]];
        assert_eq!(run_stream_test(parts).unwrap(), "hello world");
    }

    /// Empty chunks in the middle of the stream are harmless.
    #[test]
    fn test_empty_chunks() {
        let parts = [
            &br"hello"[..],
            &br""[..],
            &br" "[..],
            &br""[..],
            &br"world"[..],
        ];
        assert_eq!(run_stream_test(parts).unwrap(), "hello world");
    }

    /// The chunk boundary falls right before the backslash.
    #[test]
    fn test_split_before_escape() {
        let parts = [&br"hello"[..], &br"\nworld"[..]];
        assert_eq!(run_stream_test(parts).unwrap(), "hello\nworld");
    }

    /// The chunk boundary falls between the backslash and the escape letter.
    #[test]
    fn test_split_during_simple_escape() {
        let parts = [&br"hello\"[..], &br"nworld"[..]];
        assert_eq!(run_stream_test(parts).unwrap(), "hello\nworld");
    }

    /// The chunk boundary falls inside the four hex digits of a `\u` escape.
    #[test]
    fn test_split_during_unicode_escape() {
        // Euro symbol € is U+20AC
        let parts = [&br"price: \u20"[..], &br"AC"[..]];
        assert_eq!(run_stream_test(parts).unwrap(), "price: €");
    }

    /// The chunk boundary falls between the two `\u` escapes of a surrogate pair.
    #[test]
    fn test_split_during_surrogate_pair() {
        // Grinning face 😀 is U+1F600 -> \uD83D\uDE00
        let parts = [&br"emoji: \uD83D"[..], &br"\uDE00"[..]];
        assert_eq!(run_stream_test(parts).unwrap(), "emoji: 😀");
    }

    /// The chunk boundary falls after the backslash of the low surrogate.
    #[test]
    fn test_split_between_surrogate_pair_halves() {
        let parts = [&br"emoji: \uD83D\"[..], &br"uDE00"[..]];
        assert_eq!(run_stream_test(parts).unwrap(), "emoji: 😀");
    }

    /// The chunk boundary falls inside the hex digits of the low surrogate.
    #[test]
    fn test_split_in_second_surrogate() {
        // Grinning face 😀 is U+1F600 -> \uD83D\uDE00
        let parts = [&br"emoji: \uD83D\uDE"[..], &br"00"[..]];
        assert_eq!(run_stream_test(parts).unwrap(), "emoji: 😀");
    }

    /// Worst case: every chunk is a single byte, including through the pair.
    #[test]
    fn test_tiny_chunks_across_surrogate_pair() {
        // emoji: 😀 (\uD83D\uDE00)
        let parts = [
            br"e", br"m", br"o", br"j", br"i", br":", br" ", br"\", br"u", br"D", br"8", br"3",
            br"D", br"\", br"u", br"D", br"E", br"0", br"0",
        ];
        assert_eq!(run_stream_test(parts).unwrap(), "emoji: 😀");
    }

    /// `finish` succeeds when no chunk left an incomplete escape behind.
    #[test]
    fn test_finish_success() {
        let mut unescaper = UnescapeStream::new();
        let (boundary, rest) = unescaper.unescape_next(br"hello");
        assert!(boundary.is_none());
        assert_eq!(
            rest.map(|r| r.unwrap()).collect::<Vec<UnescapedToken>>(),
            alloc::vec![UnescapedToken::Literal(br"hello")]
        );

        let (boundary, rest) = unescaper.unescape_next(br" world");
        assert!(boundary.is_none());
        assert_eq!(
            rest.map(|r| r.unwrap()).collect::<Vec<UnescapedToken>>(),
            alloc::vec![UnescapedToken::Literal(br" world")]
        );

        // Now finish it, it should be successful.
        assert!(unescaper.finish().is_ok());
    }

    /// A stream that ends on a lone backslash fails in `finish` with
    /// `UnexpectedEof`, while the chunk iterator itself yields only `Ok` items.
    #[test]
    fn test_finish_error_on_incomplete_escape() {
        let mut unescaper = UnescapeStream::new();
        let (_, next) = unescaper.unescape_next(br"hello\");
        // Drain the iterator; every item must be `Ok`.
        next.for_each(|r| {
            r.unwrap();
        });
        let err = unescaper.finish().unwrap_err();
        assert_eq!(err.kind, UnescapeErrorKind::UnexpectedEof);
    }

    /// Same as above, but the stream ends in the middle of a `\u` escape.
    #[test]
    fn test_finish_error_on_incomplete_unicode() {
        let mut unescaper = UnescapeStream::new();
        let (_, next) = unescaper.unescape_next(br"hello\u12");
        // Drain the iterator; every item must be `Ok`.
        next.for_each(|r| {
            r.unwrap();
        });
        let err = unescaper.finish().unwrap_err();
        assert_eq!(err.kind, UnescapeErrorKind::UnexpectedEof);
    }

    /// An invalid escape letter arriving in the next chunk is still reported
    /// as `InvalidEscape` with the offending byte.
    #[test]
    fn test_error_across_boundary_invalid_escape() {
        let parts = [&br"oh\"[..], &br"z"[..]];
        let err = run_stream_test(parts).unwrap_err();
        assert_eq!(
            err.kind,
            UnescapeErrorKind::InvalidEscape(crate::InvalidEscapeError { found: b'z' })
        )
    }

    #[test]
    fn test_error_across_boundary_lone_surrogate() {
        // High surrogate followed by non-low-surrogate
        let parts = [&br"\uD83D"[..], &br"abc"[..]];
        let err = run_stream_test(parts).unwrap_err();
        assert_eq!(
            err.kind,
            UnescapeErrorKind::LoneSurrogate(crate::LoneSurrogateError { surrogate: 0xD83D })
        );
    }

    #[test]
    fn test_error_across_boundary_not_low_surrogate() {
        // High surrogate followed by another valid escape that is not a low surrogate
        let parts = [&br"\uD83D"[..], &br"\u0020"[..]]; // \u0020 is a space
        let err = run_stream_test(parts).unwrap_err();
        assert_eq!(
            err.kind,
            UnescapeErrorKind::LoneSurrogate(crate::LoneSurrogateError { surrogate: 0xD83D })
        );
    }

    /// After a boundary error, `clear` resets the stream so it can be reused.
    #[test]
    fn test_clear_after_error() {
        let mut unescaper = UnescapeStream::new();
        let (_, next) = unescaper.try_unescape_next("abc\\").unwrap();

        // Drain the iterator; every item must be `Ok`.
        next.for_each(|r| {
            r.unwrap();
        });

        // This will now cause an error across the boundary
        let err = unescaper.try_unescape_next(br"z").unwrap_err();
        assert_eq!(
            err.kind,
            UnescapeErrorKind::InvalidEscape(crate::InvalidEscapeError { found: b'z' })
        );

        // After the error, the internal state contains the partial data that caused it.
        // Finishing it would report an error. We use clone() to check without consuming.
        assert!(unescaper.clone().finish().is_err());

        // But if we clear it, the state is reset.
        unescaper.clear();
        assert!(unescaper.clone().finish().is_ok());

        // And we can continue processing new data.
        let mut output = String::new();
        let (boundary, rest) = unescaper.try_unescape_next(br"good data").unwrap();
        assert!(boundary.is_none());
        for token in rest {
            match token {
                Ok(UnescapedToken::Literal(literal)) => {
                    output.push_str(str::from_utf8(literal).unwrap())
                }
                // The input has no escapes, so only literal tokens are expected.
                _ => unreachable!(),
            }
        }
        assert_eq!(output, "good data");
    }

    /// A boundary character that resolves successfully does not hide an error
    /// located later in the same chunk.
    #[test]
    fn test_error_after_successful_boundary() {
        let mut unescaper = UnescapeStream::new();
        // First part is an incomplete surrogate
        let (_, mut rest) = unescaper.unescape_next(br"\uD83D");
        assert!(rest.next().is_none()); // The iterator is consumed into the stitch buffer

        // Second part completes the surrogate but has an error after it
        let (boundary, mut rest) = unescaper.unescape_next(br"\uDE00\z");

        // The boundary char should be resolved correctly
        let boundary_char = boundary.unwrap().unwrap();
        assert_eq!(boundary_char, '😀');

        // The next token from the iterator should be the error
        let err = rest.next().unwrap().unwrap_err();
        assert_eq!(
            err.kind,
            UnescapeErrorKind::InvalidEscape(crate::InvalidEscapeError { found: b'z' })
        );

        // Since the error did not happen at the EOF of the chunk,
        // the stitch buffer is empty.
        assert_eq!(unescaper.stitch_len, 0);

        // Therefore, finishing the stream now should be OK, as there's no partial data.
        // The error was contained entirely within the second chunk.
        assert!(unescaper.finish().is_ok());
    }

    /// An error returned by the source closure of `unescape_from_fn` surfaces
    /// as `UnescapeFnError::Src`.
    #[test]
    #[allow(deprecated)]
    fn test_unescape_from_source_src_error() {
        let unescaper = UnescapeStream::new();
        let mut parts = alloc::vec![Ok(b"hello".as_slice()), Err("read error")].into_iter();
        let result =
            unescaper.unescape_from_fn(|| parts.next(), |_| -> Result<(), Infallible> { Ok(()) });
        match result {
            Err(UnescapeFnError::Src("read error")) => (), // pass
            _ => panic!("Expected a source error"),
        }
    }

    /// An error returned by the sink closure of `unescape_from_fn` surfaces
    /// as `UnescapeFnError::Dst`.
    #[test]
    #[allow(deprecated)]
    fn test_unescape_from_source_dst_error() {
        let unescaper = UnescapeStream::new();
        let mut parts = alloc::vec![Result::<_, ()>::Ok("hello")].into_iter();
        let result = unescaper.unescape_from_fn(
            || parts.next(),
            |_| -> Result<(), &str> { Err("write error") },
        );
        match result {
            Err(UnescapeFnError::Dst("write error")) => (), // pass
            _ => panic!("Expected a destination error"),
        }
    }

    /// Exercises `unescape_stream_into!` in three settings: a synchronous
    /// iterator of chunks, a synchronous `std::io::Read` with a tiny buffer,
    /// and an `async` reader/writer pair.
    #[test]
    fn macro_stream() {
        fn sync_stream() -> Result<(), std::boxed::Box<dyn std::error::Error>> {
            // The surrogate pair `\uD83D\uDE00` (😀) is split across parts.
            let mut parts = std::vec![
                br#"{"message": "Hello, W\"orld! \uD83D"#.as_slice(),
                br#"\uDE00"}"#.as_slice(),
            ]
            .into_iter();

            let mut unescaped_string = std::string::String::new();

            unescape_stream_into! {
                Read: {
                    parts.next()
                },
                Write: |token| {
                    match token {
                         UnescapedToken::Literal(literal) => {
                             unescaped_string.push_str(std::str::from_utf8(literal)?)
                         }
                         UnescapedToken::Unescaped(ch) => unescaped_string.push(ch),
                     }
                },
            };

            assert_eq!(unescaped_string, r#"{"message": "Hello, W"orld! 😀"}"#);
            Ok(())
        }

        fn sync_read() -> Result<(), std::boxed::Box<dyn std::error::Error>> {
            let mut read = &br#"{"message": "Hello, W\"orld! \uD83D\uDE00"}"#[..];
            // A 5-byte buffer forces the input to be split into many chunks.
            let buffer = &mut [0u8; 5][..];

            let mut unescaped_string = std::string::String::new();
            use std::io::Read;
            unescape_stream_into! {
                Read: {
                    let n = read.read(buffer)?;
                    if n == 0 {
                        // End of input: `None` ends the stream.
                        None
                    } else {
                        Some(&buffer[..n])
                    }
                },
                Write: |token| {
                    match token {
                         UnescapedToken::Literal(literal) => {
                             unescaped_string.push_str(std::str::from_utf8(literal)?)
                         }
                         UnescapedToken::Unescaped(ch) => unescaped_string.push(ch),
                     }
                },
            };

            assert_eq!(unescaped_string, r#"{"message": "Hello, W"orld! 😀"}"#);
            Ok(())
        }

        async fn async_stream() -> Result<(), std::boxed::Box<dyn std::error::Error>> {
            // Minimal async chunk producer backed by a `Vec` iterator.
            struct AsyncIter<'a> {
                iter: <Vec<&'a [u8]> as IntoIterator>::IntoIter,
            }

            impl<'a> AsyncIter<'a> {
                async fn next(&mut self) -> Option<&'a [u8]> {
                    self.iter.next()
                }
            }

            // Minimal async sink that accumulates tokens into a `String`.
            struct AsyncWrite {
                write: std::string::String,
            }

            impl AsyncWrite {
                async fn write(
                    &mut self,
                    token: UnescapedToken<'_>,
                ) -> Result<(), std::boxed::Box<dyn std::error::Error>> {
                    match token {
                        UnescapedToken::Literal(literal) => {
                            self.write.push_str(std::str::from_utf8(literal)?)
                        }
                        UnescapedToken::Unescaped(ch) => self.write.push(ch),
                    }
                    Ok(())
                }
            }

            // The surrogate pair `\uD83D\uDE00` (😀) is split across parts.
            let mut parts = AsyncIter {
                iter: std::vec![
                    br#"{"message": "Hello, W\"orld! \uD83D"#.as_slice(),
                    br#"\uDE00"}"#.as_slice(),
                ]
                .into_iter(),
            };

            let mut unescaped_string = AsyncWrite {
                write: std::string::String::new(),
            };

            unescape_stream_into! {
                Read: {
                    parts.next().await
                },
                Write: |token| {
                    unescaped_string.write(token).await?
                },
            };

            assert_eq!(
                unescaped_string.write,
                r#"{"message": "Hello, W"orld! 😀"}"#
            );
            Ok(())
        }

        sync_stream().unwrap();
        sync_read().unwrap();

        // None of the async helpers above ever suspends, so a single poll with
        // a no-op waker is expected to drive the future to completion.
        let fut = std::pin::pin!(async_stream());
        use std::future::Future;
        let result = fut.poll(&mut std::task::Context::from_waker(std::task::Waker::noop()));
        assert!(matches!(result, std::task::Poll::Ready(Ok(()))))
    }
}