json_escape/stream.rs
1//! A high-performance, allocation-free, streaming JSON string unescaper.
2//!
3//! This module provides utilities to unescape JSON strings, with a focus on
4//! performance and flexibility. It is designed to work with data sources that
5//! deliver content in chunks, such as network sockets or file readers, without
6//! requiring heap allocations or holding onto previous chunks.
7//!
8//! # Key Features
9//!
10//! - **Streaming Unescaping**: The main type, [`UnescapeStream`], processes byte
11//! slices incrementally. This is ideal for I/O-bound applications.
12//! - **Zero Heap Allocations**: The entire process occurs on the stack, using a
13//! small internal buffer to "stitch" together escape sequences that are split
14//! across chunk boundaries.
15//! - **Data-Source Agnostic**: The API uses a "push" model. You provide
16//! byte slices to the unescaper as you receive them, allowing the caller to
17//! reuse their input buffers.
18//! - **Robust Error Handling**: Reports detailed errors, including the position
19//! and kind of failure.
20//!
21//! # How It Works
22//!
23//! The core of the streaming logic is the [`UnescapeStream`] struct. When you
24//! process a slice using [`unescape_next`](UnescapeStream::unescape_next), it returns
25//! a tuple containing two parts:
26//!
27//! 1. An `Option<Result<char, UnescapeError>>`: This handles the "continuity"
28//! between the previous slice and the current one. It will be `Some(_)` only if
29//! the previous slice ended with an incomplete escape sequence that was
30//! resolved by the start of the new slice. The `Result` will contain the
31//! unescaped character on success or an error if the combined bytes form an
32//! invalid sequence.
33//! 2. An `UnescapeNext` iterator: This iterator yields the unescaped parts
34//! for the remainder of the current slice.
35//!
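//! After processing all slices, you **must** call [`finish`](UnescapeStream::finish) to check
//! for any leftover partial escape sequences, which would indicate a malformed
//! JSON string at the end of the stream.
//!
//! The example below uses [`try_unescape_next`](UnescapeStream::try_unescape_next), a
//! convenience wrapper around `unescape_next` that returns a boundary error as `Err` so it
//! can be propagated with `?`. Note that a partial escape at the end of a slice is only
//! carried over to the next slice once the returned iterator has been driven to the end
//! of that slice.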
39//!
40//! # Example
41//!
42//! ```rust
43//! use json_escape::{stream::UnescapeStream, token::UnescapedToken};
44//!
45//! fn main() -> Result<(), Box<dyn std::error::Error>> {
46//! // A JSON string split into multiple parts.
47//! // The surrogate pair `\uD83D\uDE00` (😀) is split across parts.
48//! let parts = vec![
49//! br#"{"message": "Hello, W\"orld! \uD83D"#.as_slice(),
50//! br#"\uDE00"}"#.as_slice(),
51//! ];
52//!
53//! let mut unescaper = UnescapeStream::new();
54//! let mut unescaped_string = String::new();
55//!
56//! for part in parts {
57//! // Process the next part of the stream.
58//! let (boundary_char, rest_of_part) = unescaper.try_unescape_next(part)?;
59//!
60//! // 1. Handle the character that may have spanned the boundary.
61//! if let Some(boundary_char) = boundary_char {
62//! unescaped_string.push(boundary_char);
63//! }
64//!
65//! // 2. Process the rest of the current part.
66//! for result in rest_of_part {
67//! let unescaped_part = result?;
68//! match unescaped_part {
69//! UnescapedToken::Literal(literal) => {
70//! unescaped_string.push_str(std::str::from_utf8(literal)?)
71//! }
72//! UnescapedToken::Unescaped(ch) => unescaped_string.push(ch),
73//! }
74//! }
75//! }
76//!
77//! // IMPORTANT: Always call finish() to detect errors at the end of the stream.
78//! unescaper.finish()?;
79//!
80//! assert_eq!(unescaped_string, r#"{"message": "Hello, W"orld! 😀"}"#);
81//!
82//! Ok(())
83//! }
84//! ```
85
86use core::convert::Infallible;
87#[cfg(feature = "std")]
88use std::vec::Vec;
89
90use crate::{
91 UnescapeError, UnescapeErrorKind,
92 token::{UnescapeTokens, UnescapedToken, unescape},
93};
94
/// Drives an `UnescapeStream` with runtime-agnostic I/O expressions.
///
/// This macro provides an ergonomic, high-level wrapper for processing a stream
/// of JSON string bytes. It hides the manual boilerplate of looping, calling
/// `try_unescape_next`, and handling boundary conditions, and automatically calls
/// `finish()` at the end.
///
/// # Rationale
///
/// The key feature of this macro is that it is **runtime-agnostic**.
/// Because the `Read` and `Write` arguments are expressions, you can use
/// this macro identically in both synchronous functions (e.g., with `std::io::Read`)
/// and asynchronous functions (e.g., with `tokio::io::AsyncRead` and `.await`).
///
/// It avoids the "function coloring" problem, where you would otherwise need
/// separate `unescape_sync` and `unescape_async` helper functions.
///
/// # Parameters
///
/// - `Read: { ... }`: An expression that provides the next chunk of byte data.
///   It is evaluated once per loop iteration and **must** evaluate to `Option<&T>`
///   where `T: AsRef<[u8]> + ?Sized` (e.g. `Option<&[u8]>` or `Option<&Vec<u8>>`).
///   The chunk is handed as-is to
///   [`UnescapeStream::try_unescape_next`](crate::stream::UnescapeStream::try_unescape_next),
///   which takes it by reference, so an owned `Option<Vec<u8>>` is not accepted.
///   - `Some(bytes)` provides the next chunk to the unescaper.
///   - `None` signals the end of the stream (EOF).
///
/// - `Write: |$token| { ... }`: A closure-like expression that processes an
///   `UnescapedToken`. The token will be available under the identifier you provide
///   (e.g., `$token`). The expression must evaluate to `()`.
///
/// A trailing comma after the `Write` expression is optional.
///
/// # Errors
///
/// The expansion applies `?` to every [`UnescapeError`](crate::UnescapeError), including
/// the one returned by `finish()`. The macro must therefore be invoked inside a function
/// (sync or `async`) whose error type implements `From<UnescapeError>`. The `Read` and
/// `Write` expressions may use `?` in the same way.
///
/// The expansion is a single block, so the macro can be invoked several times within
/// the same function.
///
/// # Examples
///
/// ### Synchronous Example (from an iterator)
///
/// ```rust
/// use json_escape::stream::{unescape_stream_into, UnescapeStream};
/// use json_escape::token::UnescapedToken;
/// use std::str;
///
/// fn sync_stream() -> Result<(), Box<dyn std::error::Error>> {
///     // The surrogate pair `\uD83D\uDE00` (😀) is split across parts.
///     let mut parts = vec![
///         br#"{"message": "Hello, W\"orld! \uD83D"#.as_slice(),
///         br#"\uDE00"}"#.as_slice(),
///     ]
///     .into_iter();
///
///     let mut unescaped_string = String::new();
///
///     unescape_stream_into! {
///         Read: {
///             parts.next() // This is sync
///         },
///         Write: |token| {
///             match token { // This is sync
///                 UnescapedToken::Literal(literal) => {
///                     unescaped_string.push_str(str::from_utf8(literal)?)
///                 }
///                 UnescapedToken::Unescaped(ch) => unescaped_string.push(ch),
///             }
///         },
///     };
///
///     assert_eq!(unescaped_string, r#"{"message": "Hello, W"orld! 😀"}"#);
///     Ok(())
/// }
///
/// sync_stream().unwrap();
/// ```
///
/// ### Asynchronous Example (from an async stream)
///
/// ```rust
/// use json_escape::stream::{unescape_stream_into, UnescapeStream};
/// use json_escape::token::UnescapedToken;
/// use std::str;
///
/// // A simple async iterator for the example
/// struct AsyncIter<'a> {
///     iter: <Vec<&'a [u8]> as IntoIterator>::IntoIter,
/// }
/// impl<'a> AsyncIter<'a> {
///     async fn next(&mut self) -> Option<&'a [u8]> {
///         self.iter.next()
///     }
/// }
///
/// // A simple async writer for the example
/// struct AsyncWrite {
///     write: String,
/// }
/// impl AsyncWrite {
///     async fn write(
///         &mut self,
///         token: UnescapedToken<'_>,
///     ) -> Result<(), Box<dyn std::error::Error>> {
///         match token {
///             UnescapedToken::Literal(literal) => {
///                 self.write.push_str(str::from_utf8(literal)?)
///             }
///             UnescapedToken::Unescaped(ch) => self.write.push(ch),
///         }
///         Ok(())
///     }
/// }
///
/// async fn async_stream() -> Result<(), Box<dyn std::error::Error>> {
///     let mut parts = AsyncIter {
///         iter: vec![
///             br#"{"message": "Hello, W\"orld! \uD83D"#.as_slice(),
///             br#"\uDE00"}"#.as_slice(),
///         ]
///         .into_iter(),
///     };
///
///     let mut unescaped_string = AsyncWrite {
///         write: String::new(),
///     };
///
///     unescape_stream_into! {
///         Read: {
///             parts.next().await // This is async
///         },
///         Write: |token| {
///             unescaped_string.write(token).await? // This is async
///         },
///     };
///
///     assert_eq!(
///         unescaped_string.write,
///         r#"{"message": "Hello, W"orld! 😀"}"#
///     );
///     Ok(())
/// }
/// ```
#[macro_export]
macro_rules! unescape_stream_into {
    (
        $Read:ident: $read:expr,
        $Write:ident: |$token:ident| $write:expr $(,)?
    ) => {{
        // `Read`/`Write` are matched as identifiers (rather than fixed tokens) so the
        // stub functions below take the caller's spelling and IDEs can show their
        // documentation on hover. The stubs are never called.
        //
        // Everything lives in one block so the stubs (and the `unescaper` local) are
        // scoped to this invocation. Without the block, two invocations in the same
        // function would declare `fn Read`/`fn Write` twice and fail to compile.

        /// Provide the read op (async/sync) as an expression that evaluates to
        /// `Option<&T>` with `T: AsRef<[u8]> + ?Sized` (`None` means end of stream),
        /// like:
        /// ```rust
        /// # use json_escape::stream::unescape_stream_into;
        /// # fn dummy() -> Result<(), Box<dyn std::error::Error>> {
        /// # let mut src = std::io::Cursor::new(b"");
        /// # let buf = &mut [0u8; 10];
        /// # use std::io::Read;
        /// unescape_stream_into! {
        ///     Read: {
        ///         // perform read... like
        ///         let read = src.read(buf)?;
        ///         if read > 0 {
        ///             Some(&buf[..read])
        ///         } else {
        ///             None
        ///         }
        ///     },
        ///     Write: |token| {
        ///         // ...
        ///     },
        /// };
        /// # Ok(())
        /// # }
        /// ```
        /// Because this is a macro, the expression may use `?` and other control flow
        /// that the surrounding function can handle.
        #[allow(dead_code)]
        #[allow(non_snake_case)]
        fn $Read() -> Option<impl AsRef<[u8]>> {
            unimplemented!();
            #[allow(unreachable_code)]
            Some(b"")
        }

        /// Provide the write op (async/sync) as an expression that consumes the
        /// `UnescapedToken<'_>` bound to the identifier between the pipes and
        /// evaluates to `()`, like:
        /// ```rust
        /// # use json_escape::stream::unescape_stream_into;
        /// # fn dummy() -> Result<(), Box<dyn std::error::Error>> {
        /// unescape_stream_into! {
        ///     Read: {
        ///         // ...
        ///         None::<&[u8]>
        ///     },
        ///     Write: |token| {
        ///         println!("{token:#?}")
        ///     }
        /// };
        /// # Ok(())
        /// # }
        /// ```
        #[allow(dead_code)]
        #[allow(non_snake_case)]
        #[allow(unused_variables)]
        fn $Write($token: $crate::token::UnescapedToken<'_>) {
            unimplemented!();
        }

        let mut unescaper = $crate::stream::UnescapeStream::new();
        while let Some(part) = $read {
            // Process the next part of the stream.
            let (boundary_char, rest_of_part) = unescaper.try_unescape_next(part)?;

            // 1. Handle the character that may have spanned the boundary.
            if let Some(boundary_char) = boundary_char {
                let $token = $crate::token::UnescapedToken::Unescaped(boundary_char);
                $write
            }

            // 2. Process the rest of the current part. Draining the iterator is what
            //    stashes a trailing partial escape for the next part.
            for result in rest_of_part {
                let $token = result?;
                $write
            }
        }

        // IMPORTANT: Always call finish() to detect errors at the end of the stream.
        unescaper.finish()?;
    }};
}
320
/// A streaming JSON string unescaper that operates over byte slices.
///
/// This struct is the main entry point for streaming unescaping. It keeps a small,
/// fixed-size inline buffer (12 bytes) for escape sequences that are split across
/// slice boundaries, so no heap allocation is required.
///
/// See the [module-level documentation](self) for examples and more details.
#[derive(Debug, Clone)]
#[must_use = "UnescapeStream does nothing unless consumed"]
pub struct UnescapeStream {
    // Holds the bytes of an escape sequence that started in an earlier slice but
    // could not be completed there.
    // A full surrogate pair escape `\uXXXX\uYYYY` is 12 bytes.
    // This buffer is large enough to hold it and any other partial escape.
    stitch_buf: [u8; 12],
    /// The number of valid bytes in `stitch_buf`; `0` means no escape is pending.
    stitch_len: u8,
}
337
338impl Default for UnescapeStream {
339 #[inline]
340 fn default() -> Self {
341 Self::new()
342 }
343}
344
345impl UnescapeStream {
346 /// Creates a new, empty `UnescapeStream`.
347 #[inline]
348 pub fn new() -> Self {
349 Self {
350 stitch_buf: [0; 12],
351 stitch_len: 0,
352 }
353 }
354
355 /// Processes the next byte slice, returning a fallible result.
356 ///
357 /// This is a convenience wrapper around [`UnescapeStream::unescape_next`]. Instead of returning
358 /// an `Option<Result<...>>`, it "hoists" a potential boundary error into
359 /// the main `Result`.
360 ///
361 /// This simplifies error handling, as you can use the `?` operator to handle
362 /// errors from both the boundary character and the rest of the stream.
363 ///
364 /// # Returns
365 ///
366 /// - `Ok((Option<char>, UnescapeNext))` on success. The `Option<char>` contains
367 /// the successfully unescaped character from a boundary-spanning sequence.
368 /// - `Err(UnescapeError)` if completing a boundary-spanning sequence results
369 /// in a parsing error.
370 #[inline]
371 pub fn try_unescape_next<'a, 'b, I: AsRef<[u8]> + ?Sized>(
372 &'a mut self,
373 next_part: &'b I,
374 ) -> Result<(Option<char>, UnescapeNext<'a, 'b>), UnescapeError> {
375 let (boundary_result, new) = self.unescape_next(next_part);
376 let boundary_char = boundary_result.transpose()?;
377
378 Ok((boundary_char, new))
379 }
380
    /// Processes the next byte slice in the stream.
    ///
    /// This is the primary method for feeding data to the unescaper. It returns a tuple:
    ///
    /// 1. An `Option<Result<char, UnescapeError>>` for the character that may have
    ///    spanned the boundary from the *previous* slice. This is `None` if the previous
    ///    slice ended cleanly. It is also `None` if `next_part` was still too short to
    ///    complete the pending escape; in that case the copied bytes are buffered
    ///    internally and skipped by the iterator.
    /// 2. An `UnescapeNext` iterator for the remainder of the *current* slice.
    ///
    /// If the current slice ends with an incomplete escape sequence, the partial data is
    /// saved internally when the returned iterator reaches it. The iterator must
    /// therefore be drained for the data to be carried over. It is then resolved by the
    /// next call to `unescape_next` with the subsequent slice, or reported as an error
    /// by `finish`.
    ///
    /// When the pending escape combined with the start of `next_part` forms an invalid
    /// sequence, the first element is `Some(Err(_))`. In that case:
    /// - the internal buffer length is rolled back to its previous value;
    /// - the returned iterator covers all of `next_part`;
    /// - the error's `offset` is relative to the internal stitch buffer, not to
    ///   `next_part`.
    pub fn unescape_next<'a, 'b, I: AsRef<[u8]> + ?Sized>(
        &'a mut self,
        next_part: &'b I,
    ) -> (Option<Result<char, UnescapeError>>, UnescapeNext<'a, 'b>) {
        let mut next_part_slice = next_part.as_ref();
        let boundary_char = if self.stitch_len > 0 {
            // We have a partial escape from the previous chunk. Try to complete it.
            let old_stitch_len = self.stitch_len as usize;

            // Determine how many bytes to copy from the new chunk.
            // We copy enough to fill our buffer, which is sized for the longest escape.
            // Copying more than the escape needs is harmless: the surplus is reported
            // back by `remnant()` below and is not treated as consumed.
            let needed = self.stitch_buf.len() - old_stitch_len;
            let to_copy = needed.min(next_part_slice.len());

            // Append bytes from the new chunk to our buffer.
            self.stitch_buf[old_stitch_len..old_stitch_len + to_copy]
                .copy_from_slice(&next_part_slice[..to_copy]);
            self.stitch_len += to_copy as u8;

            // Try to unescape the combined bytes. Only the first item matters: the
            // buffer starts with the escape that began in the previous chunk.
            let mut unescaper = unescape(&self.stitch_buf[..self.stitch_len as usize]);
            let next = unescaper.next();

            match next {
                Some(Ok(token)) => {
                    // Success! The stitched part was resolved.
                    // Everything before `remnant()` belonged to the escape. Subtracting
                    // the bytes that were already buffered gives how far into
                    // `next_part` the escape extended.
                    let total_consumed = (self.stitch_len as usize) - unescaper.remnant().len();
                    let consumed_from_next = total_consumed - old_stitch_len;
                    next_part_slice = &next_part_slice[consumed_from_next..];

                    let unescaped_char = match token {
                        UnescapedToken::Unescaped(c) => c,
                        _ => unreachable!("unescaper should produce a char from an escape"),
                    };

                    // Clear the buffer and return the resolved character.
                    self.stitch_len = 0;
                    Some(Ok(unescaped_char))
                }
                Some(Err(err)) => {
                    if err.kind == UnescapeErrorKind::UnexpectedEof {
                        // Still not enough data. The new data was consumed into our
                        // buffer but it wasn't enough to resolve the sequence.
                        // We will wait for the next chunk.
                        next_part_slice = &next_part_slice[to_copy..]; // Consume all we copied
                        None // No character or error to report yet.
                    } else {
                        // A definitive error occurred.
                        // To keep this function pure for a given `next_part`, we roll back
                        // the change to `stitch_len` so that another call with the same
                        // input will produce the same error. The bytes from `next_part_slice`
                        // are treated as lookahead only and are not consumed.
                        self.stitch_len = old_stitch_len as u8;
                        Some(Err(err))
                    }
                }
                None => {
                    // This is unreachable because the buffer is non-empty. The unescaper
                    // would either yield a chunk or an UnexpectedEof error.
                    unreachable!();
                }
            }
        } else {
            // The previous chunk ended cleanly.
            None
        };

        // The iterator keeps `self` so it can stash a trailing partial escape.
        let iterator = UnescapeNext {
            stream: self,
            inner: UnescapeTokens::new(next_part_slice),
        };

        (boundary_char, iterator)
    }
467
468 /// Finalizes the unescaping process, checking for leftover incomplete data.
469 ///
470 /// This method **must** be called after all slices have been processed. It checks
471 /// if there is an incomplete escape sequence stored in its internal buffer. If so,
472 /// it means the stream ended unexpectedly, and an `Err(UnescapeError)` is returned.
473 ///
474 /// This method consumes the `UnescapeStream`, preventing further use.
475 pub fn finish(self) -> Result<(), UnescapeError> {
476 if self.stitch_len > 0 {
477 // If there are bytes left in the stitch buffer, it means the stream
478 // ended mid-escape. We re-run the parser on just this fragment
479 // to generate the correct EOF error.
480 let buf = &self.stitch_buf[..self.stitch_len as usize];
481 if let Some(Err(e)) = unescape(buf).next() {
482 // We expect an EOF error here specifically.
483 debug_assert_eq!(
484 e,
485 UnescapeError {
486 kind: UnescapeErrorKind::UnexpectedEof,
487 offset: self.stitch_len
488 }
489 );
490 return Err(e);
491 }
492 }
493 Ok(())
494 }
495
    /// Clears any partial escape sequence data from the internal buffer.
    ///
    /// Only the recorded length is reset. The buffer bytes stay in place and are
    /// simply treated as invalid from then on.
    ///
    /// You might call this after encountering a non-fatal error if you want to
    /// discard the invalid partial data and continue processing the stream from
    /// a fresh state.
    ///
    /// **Warning**: If you `clear()` the state after an error and then call
    /// `finish()` without processing more data, `finish()` will return `Ok(())`
    /// because the partial state that caused the original error has been erased.
    pub fn clear(&mut self) {
        self.stitch_len = 0;
    }
508
509 /// Unescapes a stream of byte chunks from a source function to a destination function.
510 ///
511 /// This function acts as a driver for the [`UnescapeStream`]. It repeatedly calls a
512 /// source function (`src`) to get the next chunk of data, unescapes it, and then
513 /// calls a destination function (`dst`) for each resulting [`UnescapedToken`].
514 ///
515 /// This provides a flexible way to connect any data source (like a file reader or
516 /// network socket) to any data sink without manually iterating.
517 ///
518 /// # Parameters
519 ///
520 /// - `self`: The `UnescapeStream` instance, which will be consumed.
521 /// - `src`: A closure or function that, when called, returns the next chunk of
522 /// data as `Option<Result<B, SrcError>>`. It should return `Some(Ok(chunk))`
523 /// for data, `Some(Err(e))` for a source error, and `None` to signal the
524 /// end of the stream.
525 /// - `dst`: A closure or function that receives an [`UnescapedToken`]. It should
526 /// return `Ok(())` on success or `Err(DstError)` on failure.
527 ///
528 /// # Errors
529 ///
530 /// Returns an [`UnescapeFnError`] if an error occurs at any point:
531 /// - [`UnescapeFnError::Src`] if the `src` function returns an error.
532 /// - [`UnescapeFnError::Unescape`] if the JSON string is malformed (e.g., an
533 /// invalid escape sequence or incomplete data at the end of the stream).
534 /// - [`UnescapeFnError::Dst`] if the `dst` function returns an error.
535 #[inline]
536 #[deprecated(
537 since = "0.3.1",
538 note = "This sync-only function is superseded by the runtime-agnostic `unescape_stream_into!` macro."
539 )]
540 pub fn unescape_from_fn<Src, Dst, SrcError, DstError, B>(
541 self,
542 src: Src,
543 dst: Dst,
544 ) -> Result<(), UnescapeFnError<SrcError, DstError>>
545 where
546 Src: FnMut() -> Option<Result<B, SrcError>>,
547 Dst: FnMut(UnescapedToken<'_>) -> Result<(), DstError>,
548 B: AsRef<[u8]>,
549 {
550 #[allow(deprecated)]
551 self.unescape_from_source(
552 FnMutChunkSource {
553 closure: src,
554 _phantom: core::marker::PhantomData,
555 },
556 dst,
557 )
558 }
559
560 /// Processes a stream of byte chunks from a source and unescapes them to a destination.
561 ///
562 /// This function drives the unescaping process by repeatedly calling a `src`
563 /// (source) to get a chunk of bytes, unescaping the data, and then passing
564 /// the resulting `UnescapedToken`s to a `dst` (destination).
565 ///
566 /// This provides a flexible pipeline for connecting any byte source (e.g., a file
567 /// or network stream) to any byte sink without manual iteration.
568 ///
569 /// # Parameters
570 /// - `src`: A `ChunkSource` that provides the raw, escaped byte chunks.
571 /// - `dst`: A closure that receives each `UnescapedToken` and processes it.
572 ///
573 /// # Errors
574 /// This function returns an `UnescapeFnError` if an error occurs at any stage:
575 /// - `UnescapeFnError::Src`: An error from the source (`src`).
576 /// - `UnescapeFnError::Unescape`: The data is malformed (e.g., an invalid escape sequence).
577 /// - `UnescapeFnError::Dst`: An error from the destination (`dst`).
578 #[inline]
579 #[deprecated(
580 since = "0.3.1",
581 note = "This sync-only function is superseded by the runtime-agnostic `unescape_stream_into!` macro."
582 )]
583 #[allow(deprecated)]
584 pub fn unescape_from_source<Src, Dst, SrcError, DstError>(
585 mut self,
586 mut src: Src,
587 mut dst: Dst,
588 ) -> Result<(), UnescapeFnError<SrcError, DstError>>
589 where
590 Src: ChunkSource<Error = SrcError>,
591 Dst: FnMut(UnescapedToken<'_>) -> Result<(), DstError>,
592 {
593 while let Some(next) = src.next_chunk() {
594 let next = next.map_err(UnescapeFnError::Src)?;
595 let (boundary, next) = self
596 .try_unescape_next(next.as_ref())
597 .map_err(UnescapeFnError::Unescape)?;
598
599 if let Some(ch) = boundary {
600 dst(UnescapedToken::Unescaped(ch)).map_err(UnescapeFnError::Dst)?;
601 }
602
603 for token in next {
604 let token = token.map_err(UnescapeFnError::Unescape)?;
605 dst(token).map_err(UnescapeFnError::Dst)?;
606 }
607 }
608
609 self.finish().map_err(UnescapeFnError::Unescape)
610 }
611}
612
/// An error that can occur during the `unescape_from_source` / `unescape_from_fn`
/// operations.
///
/// This enum consolidates errors from the three potential points of failure:
/// 1. Reading from the source (`Src`).
/// 2. The JSON unescaping process itself (`Unescape`).
/// 3. Writing to the destination (`Dst`).
#[derive(Clone, Debug)]
pub enum UnescapeFnError<Src, Dst> {
    /// An error occurred during the unescaping process: a malformed escape sequence,
    /// or an incomplete escape left over at the end of the stream.
    Unescape(UnescapeError),
    /// An error occurred while reading from the source.
    Src(Src),
    /// An error occurred while writing to the destination.
    Dst(Dst),
}
628
629impl<Src, Dst: core::fmt::Display> core::fmt::Display for UnescapeFnError<Src, Dst>
630where
631 Src: core::fmt::Display,
632{
633 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
634 match self {
635 UnescapeFnError::Unescape(e) => write!(f, "unescape error: {e}"),
636 UnescapeFnError::Src(e) => write!(f, "source error: {e}"),
637 UnescapeFnError::Dst(e) => write!(f, "destination error: {e}"),
638 }
639 }
640}
641
#[cfg(feature = "std")]
impl<Src, Dst> std::error::Error for UnescapeFnError<Src, Dst>
where
    Src: std::error::Error + 'static,
    Dst: std::error::Error + 'static,
{
    /// Exposes the wrapped error of whichever stage failed as the cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = match self {
            UnescapeFnError::Unescape(e) => e,
            UnescapeFnError::Src(e) => e,
            UnescapeFnError::Dst(e) => e,
        };
        Some(cause)
    }
}
656
657impl From<UnescapeFnError<Infallible, Infallible>> for UnescapeError {
658 fn from(value: UnescapeFnError<Infallible, Infallible>) -> Self {
659 match value {
660 UnescapeFnError::Unescape(unescape_error) => unescape_error,
661 UnescapeFnError::Src(i) => match i {},
662 UnescapeFnError::Dst(i) => match i {},
663 }
664 }
665}
666
/// An iterator over the unescaped parts of a single byte slice.
///
/// This struct is created by [`UnescapeStream::unescape_next`] (and indirectly by
/// [`UnescapeStream::try_unescape_next`]). It yields tokens that borrow from the slice
/// (`'b`) and holds the stream mutably (`'a`). That way, if the slice ends partway
/// through an escape sequence, the unfinished bytes can be saved in the stream for the
/// next slice.
#[must_use = "iterators are lazy and do nothing unless consumed"]
#[derive(Debug)]
pub struct UnescapeNext<'a, 'b> {
    // The owning stream; receives any trailing partial escape.
    stream: &'a mut UnescapeStream,
    // Tokenizer over the remaining bytes of the current slice.
    inner: UnescapeTokens<'b>,
}
676
677impl<'a, 'b> Iterator for UnescapeNext<'a, 'b> {
678 type Item = Result<UnescapedToken<'b>, UnescapeError>;
679
680 #[inline]
681 fn next(&mut self) -> Option<Self::Item> {
682 match self.inner.next() {
683 // ASSUMPTION: Unfinished high surrogate will cause UnexpectedEof
684 Some(Err(e)) if e.kind == UnescapeErrorKind::UnexpectedEof => {
685 // The current chunk ends with an incomplete escape sequence.
686 // Save the remnant for the next call to `unescape_next`.
687 //
688 // ASSUMPTION: UnescapeTokens will not update state on error.
689 let remnant = self.inner.remnant();
690 debug_assert!(!remnant.is_empty() && remnant[0] == b'\\', "{remnant:?}");
691 debug_assert!(remnant.len() < self.stream.stitch_buf.len(), "{remnant:?}");
692
693 // Copy the remnant and update the length.
694 self.stream.stitch_buf[..remnant.len()].copy_from_slice(remnant);
695 self.stream.stitch_len = remnant.len() as u8;
696
697 // Stop iterating for this chunk.
698 None
699 }
700 // A definitive error or a valid chunk.
701 other => other,
702 }
703 }
704}
705
706// =============================================================================
707// Traits
708// =============================================================================
709
/// A pull-based source of byte chunks, consumed by `UnescapeStream::unescape_from_source`.
///
/// This trait is designed to handle byte streams efficiently, especially when the
/// source needs to lend out a slice of its own internal buffer. A plain closure
/// (`FnMut() -> Option<Result<B, E>>`) cannot express that relationship, because the
/// returned slice would have to borrow from the closure beyond the call itself.
/// The generic associated type `Chunk<'a>` solves this: each chunk's lifetime is tied
/// to the `&mut self` borrow of `next_chunk`.
///
/// This trait is synchronous only. For async sources, drive
/// `UnescapeStream::unescape_next` / `UnescapeStream::try_unescape_next` directly, or
/// use the `unescape_stream_into!` macro.
#[deprecated(
    since = "0.3.1",
    note = "This sync-only trait is superseded by the runtime-agnostic `unescape_stream_into!` macro."
)]
pub trait ChunkSource {
    /// The type of error that can occur when reading a chunk.
    type Error;

    /// The type of chunk returned, which must implement `AsRef<[u8]>`.
    ///
    /// It may borrow from the source for the lifetime `'a` of the `next_chunk` call.
    type Chunk<'a>: AsRef<[u8]> + 'a
    where
        Self: 'a;

    /// Get the next chunk of bytes.
    ///
    /// Returns `None` when the source is exhausted, `Some(Ok(bytes))` for a successful chunk,
    /// or `Some(Err(e))` if an error occurred.
    ///
    /// The returned chunk is valid until the next call to `next_chunk` or until the
    /// source is dropped.
    fn next_chunk<'a>(&'a mut self) -> Option<Result<Self::Chunk<'a>, Self::Error>>;
}
739
740#[allow(deprecated)]
741impl<T> ChunkSource for &mut T
742where
743 T: ChunkSource,
744{
745 type Error = T::Error;
746
747 type Chunk<'a>
748 = T::Chunk<'a>
749 where
750 Self: 'a;
751
752 #[inline]
753 fn next_chunk<'a>(&'a mut self) -> Option<Result<Self::Chunk<'a>, Self::Error>> {
754 (*self).next_chunk()
755 }
756}
757
/// A `ChunkSource` that reads from any `std::io::Read` type.
///
/// Each chunk is read into an internal buffer `B` (a `Vec<u8>` by default), which is
/// reused on every read. The chunk size is bounded by the buffer's *length* as exposed
/// through `AsMut<[u8]>`. Choose the buffer size to balance memory usage against I/O
/// performance.
#[cfg(feature = "std")]
#[deprecated(
    since = "0.3.1",
    note = "This sync-only struct is superseded by the runtime-agnostic `unescape_stream_into!` macro."
)]
pub struct ReadChunkSource<R, B = Vec<u8>> {
    // The underlying reader that chunks are pulled from.
    reader: R,
    // Scratch space each chunk is read into.
    buffer: B,
}
772
#[cfg(feature = "std")]
#[allow(deprecated)]
impl<R, B> ReadChunkSource<R, B> {
    /// Creates a `ReadChunkSource` that reads from `reader` into `buffer`.
    ///
    /// Each read fills at most `buffer.as_mut().len()` bytes. The chunk size is
    /// therefore bounded by the buffer's *length*; for a `Vec<u8>`, its capacity is
    /// irrelevant.
    pub fn new(reader: R, buffer: B) -> Self {
        ReadChunkSource { buffer, reader }
    }

    /// Creates a `ReadChunkSource` backed by a zero-filled `Vec<u8>` of `size` bytes.
    ///
    /// The vector is built with *length* `size`, not merely that capacity. Reading into
    /// an empty slice typically returns `Ok(0)`, which would be taken as end of stream.
    ///
    /// Note: `B` is a parameter of this `impl` block but does not appear in the
    /// signature, so it is not inferred from the arguments. Callers may need to name
    /// it, e.g. `ReadChunkSource::<_, Vec<u8>>::with_buffer_size(reader, 4096)`.
    pub fn with_buffer_size(reader: R, size: usize) -> ReadChunkSource<R, Vec<u8>> {
        let zero_filled = std::vec![0u8; size];
        ReadChunkSource::new(reader, zero_filled)
    }
}
792
// `ReadChunkSource` only exists with the `std` feature, so this impl must be gated
// the same way or builds without `std` fail to resolve the type.
#[cfg(feature = "std")]
#[allow(deprecated)]
impl<R, B> ChunkSource for ReadChunkSource<R, B>
where
    R: std::io::Read,
    B: AsMut<[u8]>,
{
    type Error = std::io::Error;
    type Chunk<'a>
        = &'a [u8]
    where
        Self: 'a;

    /// Reads the next chunk from the wrapped reader into the internal buffer.
    ///
    /// Returns:
    /// - `None` once the reader reports end-of-file (a read of 0 bytes);
    /// - `Some(Ok(bytes))` with the bytes just read;
    /// - `Some(Err(e))` for any I/O error other than `ErrorKind::Interrupted`.
    ///
    /// Interrupted reads are transient and are retried, following the convention of
    /// `std::io::Read::read_exact` and `read_to_end`.
    ///
    /// Note: with a zero-length buffer, `read` typically returns `Ok(0)`, which is
    /// reported as end of stream.
    #[inline]
    fn next_chunk<'a>(&'a mut self) -> Option<Result<Self::Chunk<'a>, Self::Error>> {
        let buffer = self.buffer.as_mut();
        loop {
            match self.reader.read(buffer) {
                Ok(0) => return None, // EOF
                Ok(n) => return Some(Ok(&buffer[..n])),
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Some(Err(e)),
            }
        }
    }
}
816
/// A `ChunkSource` implementation that wraps a mutable closure (`FnMut`).
///
/// Each call to `next_chunk` invokes the closure once and returns its result as-is.
///
/// Generic parameters:
/// - `'s`: a lifetime the chunk type must outlive (`B: 's`);
/// - `F`: the closure type;
/// - `B`: the chunk type;
/// - `E`: the error type.
///
/// The closure returns chunks by value (`B`), so the chunks are not tied to the borrow
/// of the source. This differs from chunks that other `ChunkSource` implementations
/// may lend from their own buffers.
pub struct FnMutChunkSource<'s, F, B, E>
where
    F: FnMut() -> Option<Result<B, E>>,
    B: AsRef<[u8]> + 's,
{
    /// The wrapped closure that produces chunks.
    closure: F,
    /// Marker that makes the struct use the `'s` lifetime. The struct holds no
    /// reference with that lifetime; `'s` only appears in the `B: 's` bound.
    _phantom: core::marker::PhantomData<&'s ()>,
}
836
837impl<'s, F, B, E> FnMutChunkSource<'s, F, B, E>
838where
839 F: FnMut() -> Option<Result<B, E>>,
840 B: AsRef<[u8]> + 's,
841{
842 /// Creates a new `FnMutChunkSource`.
843 ///
844 /// This function takes a closure `F` that will be used to generate chunks. The
845 /// closure must return `Option<Result<B, E>>`, where `B` is a type that can be
846 /// borrowed as a `&[u8]` and `E` is the error type.
847 pub fn new(closure: F) -> Self {
848 FnMutChunkSource {
849 closure,
850 _phantom: core::marker::PhantomData,
851 }
852 }
853}
854
855#[allow(deprecated)]
856impl<'s, F, B, E> ChunkSource for FnMutChunkSource<'s, F, B, E>
857where
858 F: FnMut() -> Option<Result<B, E>>,
859 B: AsRef<[u8]> + 's,
860{
861 type Error = E;
862 type Chunk<'a>
863 = B
864 where
865 Self: 'a;
866
867 #[inline(always)]
868 fn next_chunk<'a>(&'a mut self) -> Option<Result<Self::Chunk<'a>, Self::Error>> {
869 (self.closure)()
870 }
871}
872
// Here we test our assumptions about the underlying `unescape` tokenizer, which the
// streaming logic relies on.
#[cfg(test)]
mod assumptions_tests {
    use super::*;

    /// Returns the error carried by the first item the tokenizer yields for `input`.
    ///
    /// Panics if the tokenizer yields nothing, or yields a successful token first.
    fn first_error(input: &[u8]) -> UnescapeError {
        unescape(input).next().unwrap().unwrap_err()
    }

    // A high surrogate at the very end of the input, optionally followed by a lone
    // backslash, must be reported as "need more data" rather than as a lone
    // surrogate. Otherwise the stream could not wait for the next chunk.
    #[test]
    fn insufficient_data_is_not_lone_surrogate() {
        let eof_at = |offset| UnescapeError {
            kind: UnescapeErrorKind::UnexpectedEof,
            offset,
        };
        assert_eq!(first_error(br"\uD83D"), eof_at(6));
        assert_eq!(first_error(br"\uD83D\"), eof_at(7));
    }

    // Once a non-escape byte follows the high surrogate, the error is definitive.
    #[test]
    fn lone_surrogate_is_not_eof() {
        let expected = UnescapeError {
            kind: UnescapeErrorKind::LoneSurrogate(crate::LoneSurrogateError {
                surrogate: 0xD83D,
            }),
            offset: 6,
        };
        assert_eq!(first_error(br"\uD83Da"), expected);
    }

    // After an EOF error, the whole broken escape must still be available through
    // `remnant()` so the stream can stash it.
    #[test]
    fn unescape_does_not_commit_on_error() {
        let input = br"\uD83D";
        let mut tokens = unescape(input);
        let err = tokens.next().unwrap().unwrap_err();
        assert_eq!(
            err,
            UnescapeError {
                kind: UnescapeErrorKind::UnexpectedEof,
                offset: 6
            }
        );
        assert_eq!(tokens.remnant(), input);
    }

    // Errors are sticky: asking again yields the same error instead of skipping ahead.
    #[test]
    fn unescape_keeps_erroring() {
        let mut tokens = unescape(br"\z");
        let first = tokens.next().unwrap().unwrap_err();
        let second = tokens.next().unwrap().unwrap_err();
        assert_eq!(first, second)
    }
}
943
// Behavioral tests for `UnescapeStream`: chunk splits at every interesting
// position inside escapes and surrogate pairs, `finish` on leftover partial
// data, error reporting across chunk boundaries, recovery via `clear`, and
// the `unescape_from_fn` / `unescape_stream_into!` driver APIs.
#[cfg(test)]
mod tests {
    use super::*;
    use std::{str, string::String, vec::Vec};

    /// Helper to run a stream test and collect the output into a string.
    ///
    /// Each item of `parts` is fed to the unescaper as one chunk, and every
    /// emitted token is appended to the returned `String`. Literal tokens are
    /// expected to be valid UTF-8; `from_utf8(..).unwrap()` panics otherwise.
    fn run_stream_test<I, S>(parts: I) -> Result<String, UnescapeError>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<[u8]>,
    {
        let unescaper = UnescapeStream::new();
        let mut parts = parts.into_iter();
        let mut output = String::new();
        // Neither the chunk source (always `Ok`) nor the sink can fail, so two
        // of the generic parameters are pinned to `Infallible`.
        // NOTE(review): presumably these are the source/sink error types, which
        // is what lets `?` convert the result into a plain `UnescapeError` —
        // confirm against the `unescape_from_fn` signature.
        #[allow(deprecated)]
        unescaper.unescape_from_fn::<_, _, Infallible, Infallible, _>(
            || parts.next().map(Ok),
            |token| {
                match token {
                    UnescapedToken::Unescaped(c) => output.push(c),
                    UnescapedToken::Literal(s) => output.push_str(str::from_utf8(s).unwrap()),
                }
                Ok(())
            },
        )?;

        Ok(output)
    }

    #[test]
    fn test_single_chunk_no_escapes() {
        let parts = [br"hello world"];
        assert_eq!(run_stream_test(parts).unwrap(), "hello world");
    }

    #[test]
    fn test_single_chunk_with_escapes() {
        let parts = [br#"hello \"world\t\n\\"#];
        assert_eq!(run_stream_test(parts).unwrap(), "hello \"world\t\n\\");
    }

    #[test]
    fn test_multiple_chunks_no_escapes() {
        let parts = [&br"hello "[..], &br"world"[..]];
        assert_eq!(run_stream_test(parts).unwrap(), "hello world");
    }

    // Empty chunks in the middle of the stream must be harmless no-ops.
    #[test]
    fn test_empty_chunks() {
        let parts = [
            &br"hello"[..],
            &br""[..],
            &br" "[..],
            &br""[..],
            &br"world"[..],
        ];
        assert_eq!(run_stream_test(parts).unwrap(), "hello world");
    }

    // The escape starts exactly at the beginning of the second chunk.
    #[test]
    fn test_split_before_escape() {
        let parts = [&br"hello"[..], &br"\nworld"[..]];
        assert_eq!(run_stream_test(parts).unwrap(), "hello\nworld");
    }

    // The `\` ends the first chunk; its escape letter starts the second.
    #[test]
    fn test_split_during_simple_escape() {
        let parts = [&br"hello\"[..], &br"nworld"[..]];
        assert_eq!(run_stream_test(parts).unwrap(), "hello\nworld");
    }

    // The `\u` escape is cut after two of its four hex digits.
    #[test]
    fn test_split_during_unicode_escape() {
        // Euro symbol € is U+20AC
        let parts = [&br"price: \u20"[..], &br"AC"[..]];
        assert_eq!(run_stream_test(parts).unwrap(), "price: €");
    }

    // The high surrogate completes chunk one; the low surrogate is chunk two.
    #[test]
    fn test_split_during_surrogate_pair() {
        // Grinning face 😀 is U+1F600 -> \uD83D\uDE00
        let parts = [&br"emoji: \uD83D"[..], &br"\uDE00"[..]];
        assert_eq!(run_stream_test(parts).unwrap(), "emoji: 😀");
    }

    // The backslash of the low surrogate's escape is the last byte of chunk one.
    #[test]
    fn test_split_between_surrogate_pair_halves() {
        let parts = [&br"emoji: \uD83D\"[..], &br"uDE00"[..]];
        assert_eq!(run_stream_test(parts).unwrap(), "emoji: 😀");
    }

    // The split falls inside the hex digits of the low surrogate.
    #[test]
    fn test_split_in_second_surrogate() {
        // Grinning face 😀 is U+1F600 -> \uD83D\uDE00
        let parts = [&br"emoji: \uD83D\uDE"[..], &br"00"[..]];
        assert_eq!(run_stream_test(parts).unwrap(), "emoji: 😀");
    }

    // Worst case: every byte arrives as its own chunk.
    #[test]
    fn test_tiny_chunks_across_surrogate_pair() {
        // emoji: 😀 (\uD83D\uDE00)
        let parts = [
            br"e", br"m", br"o", br"j", br"i", br":", br" ", br"\", br"u", br"D", br"8", br"3",
            br"D", br"\", br"u", br"D", br"E", br"0", br"0",
        ];
        assert_eq!(run_stream_test(parts).unwrap(), "emoji: 😀");
    }

    // Drives `unescape_next` directly. Escape-free chunks produce no boundary
    // char, yield a single `Literal` borrowing the whole chunk, and leave
    // nothing pending for `finish`.
    #[test]
    fn test_finish_success() {
        let mut unescaper = UnescapeStream::new();
        let (boundary, rest) = unescaper.unescape_next(br"hello");
        assert!(boundary.is_none());
        assert_eq!(
            rest.map(|r| r.unwrap()).collect::<Vec<UnescapedToken>>(),
            alloc::vec![UnescapedToken::Literal(br"hello")]
        );

        let (boundary, rest) = unescaper.unescape_next(br" world");
        assert!(boundary.is_none());
        assert_eq!(
            rest.map(|r| r.unwrap()).collect::<Vec<UnescapedToken>>(),
            alloc::vec![UnescapedToken::Literal(br" world")]
        );

        // Now finish it, it should be successful.
        assert!(unescaper.finish().is_ok());
    }

    // A dangling `\` at the end of the final chunk must make `finish` fail.
    #[test]
    fn test_finish_error_on_incomplete_escape() {
        let mut unescaper = UnescapeStream::new();
        let (_, next) = unescaper.unescape_next(br"hello\");
        // Drain the iterator without errors; the trailing partial escape stays
        // pending, so it is `finish` that reports it.
        next.for_each(|r| {
            r.unwrap();
        });
        let err = unescaper.finish().unwrap_err();
        assert_eq!(err.kind, UnescapeErrorKind::UnexpectedEof);
    }

    // Same as above, but the leftover is a truncated `\u` escape.
    #[test]
    fn test_finish_error_on_incomplete_unicode() {
        let mut unescaper = UnescapeStream::new();
        let (_, next) = unescaper.unescape_next(br"hello\u12");
        // Drain the iterator without errors; the truncated `\u12` stays
        // pending, so it is `finish` that reports it.
        next.for_each(|r| {
            r.unwrap();
        });
        let err = unescaper.finish().unwrap_err();
        assert_eq!(err.kind, UnescapeErrorKind::UnexpectedEof);
    }

    // An invalid escape letter that arrives in the next chunk is still reported.
    #[test]
    fn test_error_across_boundary_invalid_escape() {
        let parts = [&br"oh\"[..], &br"z"[..]];
        let err = run_stream_test(parts).unwrap_err();
        assert_eq!(
            err.kind,
            UnescapeErrorKind::InvalidEscape(crate::InvalidEscapeError { found: b'z' })
        )
    }

    #[test]
    fn test_error_across_boundary_lone_surrogate() {
        // High surrogate followed by non-low-surrogate
        let parts = [&br"\uD83D"[..], &br"abc"[..]];
        let err = run_stream_test(parts).unwrap_err();
        assert_eq!(
            err.kind,
            UnescapeErrorKind::LoneSurrogate(crate::LoneSurrogateError { surrogate: 0xD83D })
        );
    }

    #[test]
    fn test_error_across_boundary_not_low_surrogate() {
        // High surrogate followed by another valid escape that is not a low surrogate
        let parts = [&br"\uD83D"[..], &br"\u0020"[..]]; // \u0020 is a space
        let err = run_stream_test(parts).unwrap_err();
        assert_eq!(
            err.kind,
            UnescapeErrorKind::LoneSurrogate(crate::LoneSurrogateError { surrogate: 0xD83D })
        );
    }

    // After a boundary error, `clear` discards the pending partial data so the
    // same unescaper can be reused for fresh input.
    #[test]
    fn test_clear_after_error() {
        let mut unescaper = UnescapeStream::new();
        let (_, next) = unescaper.try_unescape_next("abc\\").unwrap();

        // Drain the iterator; the trailing `\` stays pending.
        next.for_each(|r| {
            r.unwrap();
        });

        // This will now cause an error across the boundary
        let err = unescaper.try_unescape_next(br"z").unwrap_err();
        assert_eq!(
            err.kind,
            UnescapeErrorKind::InvalidEscape(crate::InvalidEscapeError { found: b'z' })
        );

        // After the error, the internal state contains the partial data that caused it.
        // Finishing it would report an error. We use clone() to check without consuming.
        assert!(unescaper.clone().finish().is_err());

        // But if we clear it, the state is reset.
        unescaper.clear();
        assert!(unescaper.clone().finish().is_ok());

        // And we can continue processing new data.
        let mut output = String::new();
        let (boundary, rest) = unescaper.try_unescape_next(br"good data").unwrap();
        assert!(boundary.is_none());
        for token in rest {
            match token {
                Ok(UnescapedToken::Literal(literal)) => {
                    output.push_str(str::from_utf8(literal).unwrap())
                }
                _ => unreachable!(),
            }
        }
        assert_eq!(output, "good data");
    }

    // A boundary char can resolve successfully while a later escape in the
    // same chunk fails. That error belongs to the chunk's iterator and leaves
    // no pending data behind.
    #[test]
    fn test_error_after_successful_boundary() {
        let mut unescaper = UnescapeStream::new();
        // First part is an incomplete surrogate
        let (_, mut rest) = unescaper.unescape_next(br"\uD83D");
        assert!(rest.next().is_none()); // The iterator is consumed into the stitch buffer

        // Second part completes the surrogate but has an error after it
        let (boundary, mut rest) = unescaper.unescape_next(br"\uDE00\z");

        // The boundary char should be resolved correctly
        let boundary_char = boundary.unwrap().unwrap();
        assert_eq!(boundary_char, '😀');

        // The next token from the iterator should be the error
        let err = rest.next().unwrap().unwrap_err();
        assert_eq!(
            err.kind,
            UnescapeErrorKind::InvalidEscape(crate::InvalidEscapeError { found: b'z' })
        );

        // Since the error did not happen at the EOF of the chunk,
        // the stitch buffer is empty.
        assert_eq!(unescaper.stitch_len, 0);

        // Therefore, finishing the stream now should be OK, as there's no partial data.
        // The error was contained entirely within the second chunk.
        assert!(unescaper.finish().is_ok());
    }

    // A source error is surfaced as `UnescapeFnError::Src` carrying the
    // original error value.
    #[test]
    #[allow(deprecated)]
    fn test_unescape_from_source_src_error() {
        let unescaper = UnescapeStream::new();
        let mut parts = alloc::vec![Ok(b"hello".as_slice()), Err("read error")].into_iter();
        let result =
            unescaper.unescape_from_fn(|| parts.next(), |_| -> Result<(), Infallible> { Ok(()) });
        match result {
            Err(UnescapeFnError::Src("read error")) => (), // pass
            _ => panic!("Expected a source error"),
        }
    }

    // A sink (write) error is surfaced as `UnescapeFnError::Dst` carrying the
    // original error value.
    #[test]
    #[allow(deprecated)]
    fn test_unescape_from_source_dst_error() {
        let unescaper = UnescapeStream::new();
        let mut parts = alloc::vec![Result::<_, ()>::Ok("hello")].into_iter();
        let result = unescaper.unescape_from_fn(
            || parts.next(),
            |_| -> Result<(), &str> { Err("write error") },
        );
        match result {
            Err(UnescapeFnError::Dst("write error")) => (), // pass
            _ => panic!("Expected a destination error"),
        }
    }

    // Exercises `unescape_stream_into!` with three kinds of sources:
    // - an in-memory iterator of chunks;
    // - a `std::io::Read` with a 5-byte buffer, which forces many splits;
    // - an async source and sink.
    // Inside the `Read`/`Write` arms, `?` propagates out of the enclosing
    // function and `.await` is legal in the async case. NOTE(review): this
    // suggests the macro expands those bodies inline rather than as real
    // closures — confirm against the macro definition.
    #[test]
    fn macro_stream() {
        fn sync_stream() -> Result<(), std::boxed::Box<dyn std::error::Error>> {
            // The surrogate pair `\uD83D\uDE00` (😀) is split across parts.
            let mut parts = std::vec![
                br#"{"message": "Hello, W\"orld! \uD83D"#.as_slice(),
                br#"\uDE00"}"#.as_slice(),
            ]
            .into_iter();

            let mut unescaped_string = std::string::String::new();

            unescape_stream_into! {
                Read: {
                    parts.next()
                },
                Write: |token| {
                    match token {
                        UnescapedToken::Literal(literal) => {
                            unescaped_string.push_str(std::str::from_utf8(literal)?)
                        }
                        UnescapedToken::Unescaped(ch) => unescaped_string.push(ch),
                    }
                },
            };

            assert_eq!(unescaped_string, r#"{"message": "Hello, W"orld! 😀"}"#);
            Ok(())
        }

        fn sync_read() -> Result<(), std::boxed::Box<dyn std::error::Error>> {
            let mut read = &br#"{"message": "Hello, W\"orld! \uD83D\uDE00"}"#[..];
            let buffer = &mut [0u8; 5][..];

            let mut unescaped_string = std::string::String::new();
            use std::io::Read;
            unescape_stream_into! {
                Read: {
                    let n = read.read(buffer)?;
                    if n == 0 {
                        // A zero-byte read means EOF; returning `None` ends the
                        // read loop.
                        None
                    } else {
                        Some(&buffer[..n])
                    }
                },
                Write: |token| {
                    match token {
                        UnescapedToken::Literal(literal) => {
                            unescaped_string.push_str(std::str::from_utf8(literal)?)
                        }
                        UnescapedToken::Unescaped(ch) => unescaped_string.push(ch),
                    }
                },
            };

            assert_eq!(unescaped_string, r#"{"message": "Hello, W"orld! 😀"}"#);
            Ok(())
        }

        async fn async_stream() -> Result<(), std::boxed::Box<dyn std::error::Error>> {
            // Minimal async chunk source; `next` never actually suspends.
            struct AsyncIter<'a> {
                iter: <Vec<&'a [u8]> as IntoIterator>::IntoIter,
            }

            impl<'a> AsyncIter<'a> {
                async fn next(&mut self) -> Option<&'a [u8]> {
                    self.iter.next()
                }
            }

            // Minimal async sink; `write` never actually suspends.
            struct AsyncWrite {
                write: std::string::String,
            }

            impl AsyncWrite {
                async fn write(
                    &mut self,
                    token: UnescapedToken<'_>,
                ) -> Result<(), std::boxed::Box<dyn std::error::Error>> {
                    match token {
                        UnescapedToken::Literal(literal) => {
                            self.write.push_str(std::str::from_utf8(literal)?)
                        }
                        UnescapedToken::Unescaped(ch) => self.write.push(ch),
                    }
                    Ok(())
                }
            }

            // The surrogate pair `\uD83D\uDE00` (😀) is split across parts.
            let mut parts = AsyncIter {
                iter: std::vec![
                    br#"{"message": "Hello, W\"orld! \uD83D"#.as_slice(),
                    br#"\uDE00"}"#.as_slice(),
                ]
                .into_iter(),
            };

            let mut unescaped_string = AsyncWrite {
                write: std::string::String::new(),
            };

            unescape_stream_into! {
                Read: {
                    parts.next().await
                },
                Write: |token| {
                    unescaped_string.write(token).await?
                },
            };

            assert_eq!(
                unescaped_string.write,
                r#"{"message": "Hello, W"orld! 😀"}"#
            );
            Ok(())
        }

        sync_stream().unwrap();
        sync_read().unwrap();

        // No executor: since nothing in `async_stream` suspends, a single poll
        // with a no-op waker is expected to run it to completion.
        let fut = std::pin::pin!(async_stream());
        use std::future::Future;
        let result = fut.poll(&mut std::task::Context::from_waker(std::task::Waker::noop()));
        assert!(matches!(result, std::task::Poll::Ready(Ok(()))))
    }
}