push_decode/
lib.rs

//! # Push-based decoding
//!
//! This crate provides abstractions for push-based decoding and pull-based encoding.
//! That means the caller is responsible for obtaining the bytes to decode and feeding them into
//! a decoder, or pulling bytes from an encoder and feeding them into a writer.
//!
//! The main advantage of this approach is that it's IO-agnostic, which implies both
//! **`async`-agnostic** and `no_std`. You can use the same code to deserialize from sync
//! and `async` readers and only need a tiny piece of code to connect the reader to a decoder. This
//! piece of code is provided by this crate for `std`, [`lgio`] (usable with `no_std`), `tokio`, `futures` and `async-std`.
//!
//! # Features
//!
//! * `std` - enables integration with the standard library - its IO and error traits
//! * `alloc` - enables integration with the standard `alloc` crate
//! * `lgio` - connects decoders to lgio IO.
//! * `tokio` - connects decoders to Tokio IO.
//! * `async-std` - connects decoders to async-std IO.
//! * `futures_0_3` - connects decoders to futures 0.3.x IO
20
21#![no_std]
22#![cfg_attr(docsrs, feature(doc_auto_cfg))]
23#![cfg_attr(docsrs, feature(doc_cfg))]
24
25#[cfg(feature = "std")]
26extern crate std;
27
28#[cfg(feature = "alloc")]
29#[cfg_attr(test, macro_use)]
30extern crate alloc;
31
32#[cfg(any(feature = "tokio", feature = "async-std", feature = "futures_0_3"))]
33use core::pin::Pin;
34
35#[cfg(feature = "tokio")]
36use actual_tokio as tokio;
37#[cfg(feature = "async-std")]
38use actual_async_std as async_std;
39
40pub mod decoders;
41pub mod encoders;
42pub mod error;
43pub mod int;
44#[cfg(any(feature = "tokio", feature = "async-std", feature = "futures_0_3"))]
45pub mod future;
46mod macros;
47
48use core::fmt;
49use core::ops::ControlFlow;
50
51/// Represents types responsible for decoding bytes pushed into it.
52///
53/// The types implementing this trait act like state machines (similar to futures) but instead of
54/// pulling data from some internal source they receive it in method calls. So they are actually
55/// much closer to the traditional state machines than futures.
56pub trait Decoder: Sized {
57    /// The type of value produced by this decoder.
58    type Value;
59    /// Decoding error.
60    type Error;
61
62    /// Processes nex chunk of bytes and updates the cursor.
63    ///
64    /// The decoder has to processes the chunk of bytes performing validation and transformation.
65    ///
66    /// If the bytes are valid the slice is updated to point to unread part. Thus if the slice is
67    /// non-epty after this method returns the decoder ended decoding.
68    ///
69    /// # Errors
70    ///
71    /// An error is returned in case the bytes are invalid. The validity is defined by the
72    /// implementor.
73    ///
74    /// **No** error may be returned if the number of bytes passed is not sufficient to decode the
75    /// value - the remaining bytes will be passed in the following call(s) of this method.
76    fn decode_chunk(&mut self, bytes: &mut &[u8]) -> Result<(), Self::Error>;
77
78    /// Called when decoding has ended or there are no more bytes.
79    ///
80    /// The decoder must validate the bytes passed in so far if it didn't do so yet and return the
81    /// decoded value or an error if the bytes were invalid.
82    ///
83    /// # Errors
84    ///
85    /// This returns an error if the bytes passed so far are invalid as defined by the decoder.
86    /// This commonly happens if the byte stream ended unexpectedly.
87    fn end(self) -> Result<Self::Value, Self::Error>;
88
89    /// Processes nex chunk of bytes without updating the cursor.
90    ///
91    /// This method is usually more convenient for the top-level callers which are receiving bytes
92    /// from buffered readers. Instead of modifying the slice this returns the number of bytes
93    /// consumed which can be passed to the `consume` method of a buffered reader.
94    fn bytes_received(&mut self, mut bytes: &[u8]) -> Result<usize, Self::Error> {
95        let prev_len = bytes.len();
96        self.decode_chunk(&mut bytes)?;
97        Ok(prev_len - bytes.len())
98    }
99
100    /// Chains another decoder after this one finishes such that the value of this one is used to
101    /// initialize the next one.
102    fn then<R: Decoder, F: FnOnce(Self::Value) -> R>(self, fun: F) -> decoders::combinators::Then<Self, R, F> {
103        decoders::combinators::Then::new(self, fun)
104    }
105
106    /// Chains another decoder after this one finishes such that the value of this one is used to
107    /// initialize the next one.
108    ///
109    /// Unlike `then` this combinator may also return an error and convert the errors into a custom
110    /// one.
111    fn then_try<E, R: Decoder, F: FnOnce(Self::Value) -> Result<R, E>>(self, fun: F) -> decoders::combinators::ThenTry<E, Self, R, F> where E: From<Self::Error> + From<R::Error> {
112        decoders::combinators::ThenTry::new(self, fun)
113    }
114
115    /// Chains another decoder after this one to decode two values.
116    fn chain<D: Decoder>(self, following: D) -> decoders::combinators::Chain<Self, D> {
117        decoders::combinators::Chain::new(self, following)
118    }
119
120    /// Resets the decoder returning the decoded value.
121    fn take(&mut self) -> Result<Self::Value, Self::Error> where Self: Default {
122        core::mem::take(self).end()
123    }
124
125    /// Decodes a value from lower-level decoder.
126    ///
127    /// When multiple decoders are chained one after another in a large state machine this method
128    /// can simplify delegation of decoding to the underlying decoder. You can wrap decoding in a
129    /// closure passed to [`Self::wrap_sub_decode`] and then just call `sub_decode()?` at the
130    /// beginning of each decoding state and continue working with the returned value.
131    ///
132    /// The method also accepts a function (closure) to convert the errors since using `map_err`
133    /// would be annoying because of double wrapping. In case no conversion is desired simply pass
134    /// in [`core::convert::identity`].
135    ///
136    /// Note that this requires the `Default` trait because it resets the decoder every time a
137    /// value is decoded. Apart from this resolving borrowing issues it also allows easily decoding
138    /// a stream of value in a loop. If you need to work with decoders that require a value (e.g.
139    /// [`VecDecoder`](decoders::VecDecoder)) it is recommended to create a specialized decoder that
140    /// will decode both (e.g. using [`Then`](decoders::combinators::then)) and call sub_deode on
141    /// that.
142    ///
143    /// You may notice this looks a lot like `await` and in principle it is very similar. The
144    /// differences are:
145    ///
146    /// * `await` also implements the state machine using `Future` trait. This doesn't. The
147    ///   `Future::poll` method would have to have another argument for us to be able to use it.
148    /// * This returns `ControlFlow` instead of `Poll<Result>` to make it return in case of "not
149    ///   ready" as well. The `Try` implementation on `Poll` only returns on `Err`, never on
150    ///   `Pending`. This is important for ergonomics.
151    /// * While it could be argued the type is morally `Poll<Error>` this one doesn't implement
152    ///   `Try` either so it's unsuitable for the purpose.
153    fn sub_decode<E, F: FnMut(Self::Error) -> E>(&mut self, bytes: &mut &[u8], mut map_err: F) -> ControlFlow<Result<(), E>, Self::Value> where Self: Default {
154        if let Err(error) = self.decode_chunk(bytes) {
155            return ControlFlow::Break(Err(map_err(error)));
156        }
157        if bytes.is_empty() {
158            ControlFlow::Break(Ok(()))
159        } else {
160            match self.take() {
161                Ok(value) => ControlFlow::Continue(value),
162                Err(error) => ControlFlow::Break(Err(map_err(error))),
163            }
164        }
165    }
166
167    /// Helper for using sub_decode.
168    ///
169    /// This can be used together with [`sub_decode`](Self::sub_decode) on sub-decoders to make
170    /// decoding easier. It helps with type inference and converts `ControlFlow` into `Result`.
171    ///
172    /// Note that this doesn't allow returning `ControlFlow::Continue` as that wouldn't make sense.
173    /// It is recommended to just return `ControlFlow::Break` with the result returned from
174    /// `decode_chunk` of the last decoder.
175    fn wrap_sub_decode<F: FnOnce() -> ControlFlow<Result<(), Self::Error>, core::convert::Infallible>>(f: F) -> Result<(), Self::Error> {
176        match f() {
177            ControlFlow::Continue(never) => match never {},
178            ControlFlow::Break(result) => result,
179        }
180    }
181}
182
183/// Represents types producing bytes of some encoded value.
184pub trait Encoder: Sized {
185    /// Provides next chunk of encoded bytes.
186    ///
187    /// The returned bytes represent a part of the value being encoded and should be written to a
188    /// writer by the consumer. Empty returned value indicates end - there are no more bytes to be
189    /// encoded.
190    ///
191    /// The returned value MUST be the same for all calls of `encoded_chunk` until
192    /// [`next()`](Self::next) is called. IOW it's not allowed to use interior mutability or global
193    /// state (randomness) to affect the returned value.
194    ///
195    /// It's recommended that this method returns bytes within the value if possible or minimal
196    /// required buffer otherwise. The *consumers* of the trait are responsible for buffering, so
197    /// buffering inside encoder would decrease performance.
198    #[must_use = "This method only returns bytes and doesn't modify the target"]
199    fn encoded_chunk(&self) -> &[u8];
200
201    /// Advances the state to get the next chunk of encoded bytes.
202    ///
203    /// Calling this method signals to the encoder that the current chunk was fully processed. The
204    /// encoder MUST either:
205    ///
206    /// * return `true` and provide the next chunk of data in folowing calls to
207    ///   [`encoded_chunk`](Self::encoded_chunk).
208    /// * return `false` indicating there is no more data.
209    ///
210    /// The encoder MUST NOT panic or cause similar undesirable behavior if `next` was called again
211    /// after it previously returned `false`.  It must simply return `false` again.
212    ///
213    /// Note that the implementor DOES NOT need to guarantee that `encoded_chunk` is empty after
214    /// this method returned `false` and for performance reasons it's not advisable to do in
215    /// release builds. The consumers are responsible for handling this situation. The consumers
216    /// may use [`track_position`](Self::track_position) to get a more convenient interface handling
217    /// these edge-cases and tracking byte position.
218    #[must_use = "Relying on encoded_chunk being empty is insufficient"]
219    fn next(&mut self) -> bool;
220
221    /// Returns a wrapper that tracks the position of processed bytes.
222    ///
223    /// The returned wrapper has a bit different interface that is more suitable for writing into
224    /// writers. It can handle partial writes and makes handling of end easier. Thus downstream
225    /// consumers are encouraged to use it where it makes sense.
226    fn track_position(self) -> EncoderPositionTracker<Self> {
227        EncoderPositionTracker::new(self)
228    }
229
230    fn write_to_slice(mut self, buf: &mut &mut [u8]) -> Result<(), error::BufferOverflow> {
231        while !self.encoded_chunk().is_empty() {
232            let chunk = self.encoded_chunk();
233            if chunk.len() > buf.len() {
234                return Err(error::BufferOverflow { bytes_past_end: chunk.len() - buf.len()});
235            }
236            buf[..chunk.len()].copy_from_slice(chunk);
237            *buf = &mut core::mem::take(buf)[chunk.len()..];
238            if !self.next() {
239                break;
240            }
241        }
242        Ok(())
243    }
244
245    /// Writes all encoded bytes to a vec.
246    ///
247    /// Note that this does **not** call `reserve` since there's no way to know the amount to
248    /// reserve. This should be handled by the code producing the decoder instead.
249    #[cfg(feature = "alloc")]
250    fn write_to_vec(mut self, buf: &mut alloc::vec::Vec<u8>) {
251        while !self.encoded_chunk().is_empty() {
252            buf.extend_from_slice(self.encoded_chunk());
253            if !self.next() {
254                break;
255            }
256        }
257    }
258
259    /// Writes all encoded bytes to the `std` writer.
260    #[cfg(feature = "std")]
261    fn write_all_sync<W: std::io::Write + BufWrite>(mut self, mut writer: W) -> std::io::Result<()> {
262        while !self.encoded_chunk().is_empty() {
263            writer.write_all(self.encoded_chunk())?;
264            if !self.next() {
265                break;
266            }
267        }
268        Ok(())
269    }
270
271    /// Writes all encoded bytes to the `std` writer.
272    #[cfg(feature = "lgio")]
273    fn write_all_sync_lgio<W: lgio::BufWrite>(mut self, mut writer: W) -> Result<(), W::WriteError> {
274        while !self.encoded_chunk().is_empty() {
275            writer.write_all(self.encoded_chunk())?;
276            if !self.next() {
277                break;
278            }
279        }
280        Ok(())
281    }
282
283    /// Writes all encoded bytes to the `tokio` async writer.
284    ///
285    /// The returned future resolves to `std::io::Result<()>`.
286    #[cfg(feature = "tokio")]
287    fn write_all_tokio<W: tokio::io::AsyncWrite + BufWrite>(self, writer: W) -> future::TokioEncodeFuture<W, Self> {
288        future::TokioEncodeFuture::new(writer, self)
289    }
290
291    /// Writes all encoded bytes to the `async-std` async writer.
292    ///
293    /// The returned future resolves to `std::io::Result<()>`.
294    #[cfg(feature = "async-std")]
295    fn write_all_async_std<W: async_std::io::Write + BufWrite>(self, writer: W) -> future::AsyncStdEncodeFuture<W, Self> {
296        future::AsyncStdEncodeFuture::new(writer, self)
297    }
298
299    /// Writes all encoded bytes to the `futures` 0.3 async writer.
300    ///
301    /// The returned future resolves to `std::io::Result<()>`.
302    #[cfg(feature = "futures_0_3")]
303    fn write_all_futures_0_3<W: futures_io_0_3::AsyncWrite + BufWrite>(self, writer: W) -> future::Futures0Dot3EncodeFuture<W, Self> {
304        future::Futures0Dot3EncodeFuture::new(writer, self)
305    }
306
307    /// Chains an encoder constructed by `second_encoder_constructor` after this one.
308    ///
309    /// This is similar to [`chain`](Self::chain) but only incurs the cost of creating the encoder
310    /// if it's actually needed. So if encoding stops before finishing (e.g. due to error) no CPU
311    /// time or memory is wasted.
312    ///
313    /// This will also save memory if the second encoder is larger than `F`.
314    ///
315    /// Note: `F` needs to be `FnMut` instead of `FnOnce` to correctly handle panics.
316    fn then<E: Encoder, F: FnMut() -> E>(self, second_encoder_constructor: F) -> encoders::combinators::Then<Self, E, F> {
317        encoders::combinators::Then::new(self, second_encoder_constructor)
318    }
319
320    /// Chains another encoder after this one.
321    ///
322    /// This requires second encoder to be eagerly created which may waste CPU time if encoding
323    /// stops early. You should consider using [`then`](Self::then) instead, which may save memory
324    /// as well.
325    fn chain<T: Encoder>(self, second_encoder: T) -> encoders::combinators::Chain<Self, T> {
326        encoders::combinators::Chain::new(self, second_encoder)
327    }
328}
329
/// Marker trait for writers that are either buffered or don't incur the cost of context switch.
///
/// The trait should be implemented for types which don't incur a (significant) performance penalty
/// when writing short chunks of data.
#[cfg(any(feature = "std", feature = "tokio", feature = "async-std", feature = "futures_0_3"))]
pub trait BufWrite {}

// A mutable reference to a `BufWrite` writer is itself accepted as `BufWrite`.
#[cfg(any(feature = "std", feature = "tokio", feature = "async-std", feature = "futures_0_3"))]
impl<T: BufWrite> BufWrite for &mut T {}

#[cfg(feature = "std")]
impl<T: std::io::Write> BufWrite for std::io::BufWriter<T> {}

#[cfg(feature = "tokio")]
impl<T: tokio::io::AsyncWrite> BufWrite for tokio::io::BufWriter<T> {}

#[cfg(feature = "async-std")]
impl<T: async_std::io::Write> BufWrite for async_std::io::BufWriter<T> {}
348
#[cfg(any(feature = "tokio", feature = "async-std", feature = "futures_0_3"))]
pin_project_lite::pin_project! {
    /// Wrapper for external types that are known to be buffered.
    ///
    /// Downstream users may use this to satisfy the constraint of `write_` methods when they
    /// themselves can't implement `BufWrite` for types from external crates due to orphan rules.
    pub struct AssumeBuffered<T> {
        #[pin]
        inner: T
    }
}

/// Wrapper for external types that are known to be buffered.
///
/// Downstream users may use this to satisfy the constraint of `write_` methods when they
/// themselves can't implement `BufWrite` for types from external crates due to orphan rules.
#[cfg(all(feature = "std", not(any(feature = "tokio", feature = "async-std", feature = "futures_0_3"))))]
pub struct AssumeBuffered<T> {
    inner: T,
}

#[cfg(any(feature = "std", feature = "tokio", feature = "async-std", feature = "futures_0_3"))]
impl<T> AssumeBuffered<T> {
    /// Wraps `writer`, marking it as buffered.
    pub fn new(writer: T) -> Self {
        Self { inner: writer }
    }

    /// Returns a shared reference to the wrapped writer.
    pub fn inner(&self) -> &T {
        let Self { inner } = self;
        inner
    }

    /// Returns a mutable reference to the wrapped writer.
    pub fn inner_mut(&mut self) -> &mut T {
        let Self { inner } = self;
        inner
    }

    /// Unwraps the value, returning the wrapped writer.
    pub fn into_inner(self) -> T {
        let Self { inner } = self;
        inner
    }
}

// The whole point of the wrapper: the caller vouches for the buffering.
#[cfg(any(feature = "std", feature = "tokio", feature = "async-std", feature = "futures_0_3"))]
impl<T> BufWrite for AssumeBuffered<T> {}
393
/// Forwards all writes to the wrapped writer.
///
/// Besides the required `write` and `flush`, `write_all` and `write_vectored` are forwarded too
/// so that any specialised implementations of the wrapped writer are used instead of the
/// defaults provided by the trait.
#[cfg(feature = "std")]
impl<T: std::io::Write> std::io::Write for AssumeBuffered<T> {
    fn write(&mut self, bytes: &[u8]) -> std::io::Result<usize> {
        self.inner.write(bytes)
    }

    fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
        self.inner.write_vectored(bufs)
    }

    fn write_all(&mut self, bytes: &[u8]) -> std::io::Result<()> {
        self.inner.write_all(bytes)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
404
/// Forwards every polling method to the pinned wrapped writer.
#[cfg(feature = "async-std")]
impl<T: async_std::io::Write> async_std::io::Write for AssumeBuffered<T> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut core::task::Context, buf: &[u8]) -> core::task::Poll<std::io::Result<usize>> {
        let this = self.project();
        this.inner.poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut core::task::Context) -> core::task::Poll<std::io::Result<()>> {
        let this = self.project();
        this.inner.poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut core::task::Context) -> core::task::Poll<std::io::Result<()>> {
        let this = self.project();
        this.inner.poll_close(cx)
    }
}
419
/// Forwards every polling method to the pinned wrapped writer.
#[cfg(feature = "tokio")]
impl<T: tokio::io::AsyncWrite> tokio::io::AsyncWrite for AssumeBuffered<T> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut core::task::Context, buf: &[u8]) -> core::task::Poll<std::io::Result<usize>> {
        let this = self.project();
        this.inner.poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut core::task::Context) -> core::task::Poll<std::io::Result<()>> {
        let this = self.project();
        this.inner.poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut core::task::Context) -> core::task::Poll<std::io::Result<()>> {
        let this = self.project();
        this.inner.poll_shutdown(cx)
    }
}
434
435/// An `Encoder` wrapper that handles partial writes.
436///
437/// This wrapper internally tracks the position of encoded bytes which makes handling of partial
438/// writes very easy. It also simplifies handling of the end.
439#[derive(Debug, Clone)]
440pub struct EncoderPositionTracker<Enc> {
441    encoder: Enc,
442    pos: usize,
443}
444
445impl<Enc: Encoder> EncoderPositionTracker<Enc> {
446    fn new(encoder: Enc) -> Self {
447        EncoderPositionTracker {
448            encoder,
449            pos: 0,
450        }
451    }
452
453    /// Returns an unprocessed chunk of encoded data.
454    ///
455    /// The returned bytes should be processed - e.g. by writing them into a writer. Empty returned
456    /// value indicates there are no more bytes.
457    #[must_use = "This method only returns bytes and doesn't modify the target"]
458    pub fn encoded_chunk(&self) -> &[u8] {
459        &self.encoder.encoded_chunk()[self.pos..]
460    }
461
462    /// Marks the `amount` of bytes as processed.
463    ///
464    /// The consumer should call this method after (partially) processing the chunk. Usually this
465    /// is called after successful [`write`](std::io::Write::write) or analogous function. The
466    /// buffer returned by [`encoded_chunk`](Self::encoded_chunk) will by advanced by `amount`
467    /// bytes and, if it reached the end, the underlying encoder will be advanced to give the next
468    /// chunk.
469    ///
470    /// Calling this method with `amount` larger than `encoded_chunk().len()` will corrupt the
471    /// encoder and lead to a panic later. In debug builds this will panic instantly.
472    #[track_caller]
473    pub fn consume(&mut self, amount: usize) {
474        self.pos += amount;
475        if self.pos >= self.encoder.encoded_chunk().len() {
476            debug_assert_eq!(self.pos, self.encoder.encoded_chunk().len());
477            // Resetting only position when there are more chunks ensures `encoded_bytes` will be
478            // empty when this reaches the end.
479            if self.encoder.next() {
480                self.pos = 0;
481            }
482        }
483    }
484
485    /// Issues single write to te writer and advances the position accordingly.
486    ///
487    /// This can be used as a building block for various abstractions or protocols.
488    /// Returns the number of bytes written. Zero indicates the end of encoding.
489    #[cfg(feature = "std")]
490    pub fn write_once<W: std::io::Write>(&mut self, writer: &mut W) -> std::io::Result<usize> {
491        if self.encoded_chunk().is_empty() {
492            return Ok(0);
493        }
494        let amount = writer.write(self.encoded_chunk())?;
495        self.consume(amount);
496        Ok(amount)
497    }
498
499    /// Writes all bytes to the writer until the end or an error.
500    ///
501    /// This is similar to `Encoder::write_all` with one significant difference: it leaves the
502    /// state around so the operation can be restarted. This can be used to handle
503    /// [`ErrorKind::Interrupted`](std::io::ErrorKind::Interrupted) errors which are generally
504    /// recoverable but users may still wish to act on them (e.g. check a global flag set by a
505    /// signal).
506    #[cfg(feature = "std")]
507    pub fn write_all<W: std::io::Write>(&mut self, writer: &mut W) -> std::io::Result<()> {
508        while self.write_once(writer)? != 0 { }
509        Ok(())
510    }
511}
512
/// Synchronously decodes a value from the given reader using a custom decoder.
///
/// Buffers obtained from the reader are fed into the decoder until the decoder leaves some bytes
/// unconsumed or the reader reports the end of the stream; the decoder is then finished.
/// Reads failing with [`ErrorKind::Interrupted`](std::io::ErrorKind::Interrupted) are retried.
///
/// # Errors
///
/// Returns [`ReadError::Read`] if the reader fails and [`ReadError::Decode`] if the decoder
/// rejects the bytes or the stream ends prematurely.
#[cfg(feature = "std")]
pub fn decode_sync_with<D: Decoder, R: std::io::BufRead + ?Sized>(reader: &mut R, mut decoder: D) -> Result<D::Value, ReadError<D::Error>> {
    loop {
        let available = match reader.fill_buf() {
            Ok(available) => available,
            Err(error) if error.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(error) => return Err(ReadError::Read(error)),
        };
        let available_len = available.len();
        // An empty buffer means the end of the stream, so there's nothing to feed.
        if available_len != 0 {
            let used = decoder.bytes_received(available).map_err(ReadError::Decode)?;
            reader.consume(used);
            // The decoder took everything, so it may still want more.
            if used == available_len {
                continue;
            }
        }
        return decoder.end().map_err(ReadError::Decode);
    }
}
533
/// Synchronously decodes a value from the given reader.
///
/// The decoder is created using its `Default` implementation and driven by
/// [`decode_sync_with`].
#[cfg(feature = "std")]
pub fn decode_sync<D: Decoder + Default>(reader: &mut (impl std::io::BufRead + ?Sized)) -> Result<D::Value, ReadError<D::Error>> {
    let decoder = D::default();
    decode_sync_with(reader, decoder)
}
539
/// Synchronously decodes a value from the given `lgio` reader using a custom decoder.
///
/// Buffers obtained from the reader are fed into the decoder until the decoder leaves some bytes
/// unconsumed or the reader reports the end of the stream; the decoder is then finished.
///
/// # Errors
///
/// Returns [`ReadError::Read`] if the reader fails and [`ReadError::Decode`] if the decoder
/// rejects the bytes or the stream ends prematurely.
#[cfg(feature = "lgio")]
pub fn decode_sync_lgio_with<D: Decoder, R: lgio::BufRead + ?Sized>(reader: &mut R, mut decoder: D) -> Result<D::Value, ReadError<D::Error, R::ReadError>> {
    loop {
        let available = reader.fill_buf().map_err(ReadError::Read)?;
        let available_len = available.len();
        // An empty buffer means the end of the stream, so there's nothing to feed.
        if available_len != 0 {
            let used = decoder.bytes_received(available).map_err(ReadError::Decode)?;
            reader.consume(used);
            // The decoder took everything, so it may still want more.
            if used == available_len {
                continue;
            }
        }
        return decoder.end().map_err(ReadError::Decode);
    }
}
556
/// Synchronously decodes a value from the given `lgio` reader.
///
/// The decoder is created using its `Default` implementation and driven by
/// [`decode_sync_lgio_with`].
#[cfg(feature = "lgio")]
pub fn decode_sync_lgio<D: Decoder + Default, R: lgio::BufRead + ?Sized>(reader: &mut R) -> Result<D::Value, ReadError<D::Error, R::ReadError>> {
    let decoder = D::default();
    decode_sync_lgio_with(reader, decoder)
}
562
/// Asynchronously decodes a value from the given `futures` 0.3 reader using a custom decoder.
///
/// The work is delegated to `future::DecodeFuture`, wired to the reader's
/// `poll_fill_buf` and `consume` methods.
#[cfg(feature = "futures_0_3")]
pub async fn decode_futures_0_3_with<D: Decoder, R: futures_io_0_3::AsyncBufRead>(reader: R, decoder: D) -> Result<D::Value, ReadError<D::Error>> {
    let decoding = future::DecodeFuture {
        reader,
        poll_fn: <R as futures_io_0_3::AsyncBufRead>::poll_fill_buf,
        consume_fn: <R as futures_io_0_3::AsyncBufRead>::consume,
        decoder: Some(decoder),
    };
    decoding.await
}
576
/// Asynchronously decodes a value from the given `futures` 0.3 reader.
///
/// The decoder is created using its `Default` implementation and driven by
/// [`decode_futures_0_3_with`].
#[cfg(feature = "futures_0_3")]
pub async fn decode_futures_0_3<D: Decoder + Default>(reader: impl futures_io_0_3::AsyncBufRead) -> Result<D::Value, ReadError<D::Error>> {
    let decoder = D::default();
    decode_futures_0_3_with(reader, decoder).await
}
582
/// Asynchronously decodes a value from the given `tokio` reader using a custom decoder.
///
/// The work is delegated to `future::DecodeFuture`, wired to the reader's
/// `poll_fill_buf` and `consume` methods.
#[cfg(feature = "tokio")]
pub async fn decode_tokio_with<D: Decoder, R: tokio::io::AsyncBufRead>(reader: R, decoder: D) -> Result<D::Value, ReadError<D::Error>> {
    let decoding = future::DecodeFuture {
        reader,
        poll_fn: <R as tokio::io::AsyncBufRead>::poll_fill_buf,
        consume_fn: <R as tokio::io::AsyncBufRead>::consume,
        decoder: Some(decoder),
    };
    decoding.await
}
596
/// Asynchronously decodes a value from the given `tokio` reader.
///
/// The decoder is created using its `Default` implementation and driven by
/// [`decode_tokio_with`].
#[cfg(feature = "tokio")]
pub async fn decode_tokio<D: Decoder + Default>(reader: impl tokio::io::AsyncBufRead) -> Result<D::Value, ReadError<D::Error>> {
    let decoder = D::default();
    decode_tokio_with(reader, decoder).await
}
602
/// Asynchronously decodes a value from the given `async-std` reader using a custom decoder.
///
/// The work is delegated to `future::DecodeFuture`, wired to the reader's
/// `poll_fill_buf` and `consume` methods.
#[cfg(feature = "async-std")]
pub async fn decode_async_std_with<D: Decoder, R: async_std::io::BufRead>(reader: R, decoder: D) -> Result<D::Value, ReadError<D::Error>> {
    let decoding = future::DecodeFuture {
        reader,
        poll_fn: <R as async_std::io::BufRead>::poll_fill_buf,
        consume_fn: <R as async_std::io::BufRead>::consume,
        decoder: Some(decoder),
    };
    decoding.await
}
616
/// Asynchronously decodes a value from the given `async-std` reader.
///
/// The decoder is created using its `Default` implementation and driven by
/// [`decode_async_std_with`].
#[cfg(feature = "async-std")]
pub async fn decode_async_std<D: Decoder + Default>(reader: impl async_std::io::BufRead) -> Result<D::Value, ReadError<D::Error>> {
    let decoder = D::default();
    decode_async_std_with(reader, decoder).await
}
622
/// Returned when either reading or decoding fails.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
// A trick to hide (wrong) cfg doc
#[cfg_attr(docsrs, doc(cfg(all())))]
#[cfg(not(feature = "std"))]
pub enum ReadError<Decode, Read> {
    /// Reading from a reader failed.
    Read(Read),
    /// Decoding the value failed.
    Decode(Decode),
}

/// Returned when either reading or decoding fails.
///
/// Note that the `Read` type param only defaults to [`std::io::Error`] with `std` feature enabled.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
// A trick to hide (wrong) cfg doc
#[cfg_attr(docsrs, doc(cfg(all())))]
#[cfg(feature = "std")]
pub enum ReadError<Decode, Read = std::io::Error> {
    /// Reading from a reader failed.
    Read(Read),
    /// Decoding the value failed.
    Decode(Decode),
}

impl<D, R> ReadError<D, R> {
    /// Converts whichever error is inside into the common type `E`.
    pub fn convert_either<E>(self) -> E
    where
        D: Into<E>,
        R: Into<E>,
    {
        match self {
            Self::Decode(decode_error) => decode_error.into(),
            Self::Read(read_error) => read_error.into(),
        }
    }

    /// Converts the read error using a closure.
    ///
    /// This is analogous to [`Result::map`]/[`Result::map_err`] and leaves `Decode` intact.
    pub fn map_read<E, F>(self, map: F) -> ReadError<D, E>
    where
        F: FnOnce(R) -> E,
    {
        match self {
            Self::Decode(decode_error) => ReadError::Decode(decode_error),
            Self::Read(read_error) => ReadError::Read(map(read_error)),
        }
    }

    /// Converts the decode error using a closure.
    ///
    /// This is analogous to [`Result::map`]/[`Result::map_err`] and leaves `Read` intact.
    pub fn map_decode<E, F>(self, map: F) -> ReadError<E, R>
    where
        F: FnOnce(D) -> E,
    {
        match self {
            Self::Decode(decode_error) => ReadError::Decode(map(decode_error)),
            Self::Read(read_error) => ReadError::Read(read_error),
        }
    }
}
678
679impl<E> ReadError<E, core::convert::Infallible> {
680    /// Statically proves that reading is infallible and converts to decode error.
681    pub fn into_decode(self) -> E {
682        match self {
683            ReadError::Read(never) => match never {},
684            ReadError::Decode(error) => error,
685        }
686    }
687}
688
689impl<E> ReadError<core::convert::Infallible, E> {
690    /// Statically proves that decoding is infallible and converts to read error.
691    pub fn into_read(self) -> E {
692        match self {
693            ReadError::Read(error) => error,
694            ReadError::Decode(never) => match never {},
695        }
696    }
697}
698
699impl From<ReadError<core::convert::Infallible, core::convert::Infallible>> for core::convert::Infallible {
700    fn from(error: ReadError<core::convert::Infallible, core::convert::Infallible>) -> Self {
701        match error {
702            ReadError::Read(error) => error,
703            ReadError::Decode(error) => error,
704        }
705    }
706}
707
/// Read errors are passed through unchanged; decode errors are wrapped into an IO error of kind
/// [`InvalidData`](std::io::ErrorKind::InvalidData).
#[cfg(feature = "std")]
impl<E: std::error::Error + Send + Sync + 'static> From<ReadError<E, std::io::Error>> for std::io::Error {
    fn from(error: ReadError<E, std::io::Error>) -> Self {
        match error {
            ReadError::Decode(invalid) => std::io::Error::new(std::io::ErrorKind::InvalidData, invalid),
            ReadError::Read(io_error) => io_error,
        }
    }
}
719
720impl<D, R> fmt::Display for ReadError<D, R> {
721    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
722        match self {
723            ReadError::Read(_) => write!(f, "reading failed"),
724            ReadError::Decode(_) => write!(f, "decoding failed"),
725        }
726    }
727}
728
/// Exposes the wrapped read or decode error as the source.
#[cfg(feature = "std")]
impl<D: std::error::Error + 'static, R: std::error::Error + 'static> std::error::Error for ReadError<D, R> {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = match self {
            ReadError::Decode(decode_error) => decode_error,
            ReadError::Read(read_error) => read_error,
        };
        Some(cause)
    }
}
737}