push_decode/lib.rs
1//! # Push-based decoding
2//!
3//! This crate provides abstractions for push-based decoding and pull-based encoding.
4//! That means, the caller is responsible for obtaining the bytes to decode and feeding them into
//! decoder or pulling bytes from encoder and feeding them into writer.
6//!
7//! The main advantage of this approach is that it's IO-agnostic, which implies both
8//! **`async`-agnostic** and `no_std`. You can use the same code to deserialize from sync
9//! and `async` readers and only need a tiny piece of code to connect the reader to a decoder. This
10//! piece of code is provided by this crate for `std`, [`lgio`] (usable with `no_std`), `tokio`, `futures` and `async-std`.
11//!
12//! # Features
13//!
//! * `std` - enables integration with the standard library - its IO and error traits
15//! * `alloc` - enables integration with the standard `alloc` crate
16//! * `lgio` - connects decoders to lgio IO.
17//! * `tokio` - connects decoders to Tokio IO.
18//! * `async-std` - connects decoders to async-std IO.
19//! * `futures_0_3` - connects decoders to futures 0.3.x IO
20
21#![no_std]
22#![cfg_attr(docsrs, feature(doc_auto_cfg))]
23#![cfg_attr(docsrs, feature(doc_cfg))]
24
25#[cfg(feature = "std")]
26extern crate std;
27
28#[cfg(feature = "alloc")]
29#[cfg_attr(test, macro_use)]
30extern crate alloc;
31
32#[cfg(any(feature = "tokio", feature = "async-std", feature = "futures_0_3"))]
33use core::pin::Pin;
34
35#[cfg(feature = "tokio")]
36use actual_tokio as tokio;
37#[cfg(feature = "async-std")]
38use actual_async_std as async_std;
39
40pub mod decoders;
41pub mod encoders;
42pub mod error;
43pub mod int;
44#[cfg(any(feature = "tokio", feature = "async-std", feature = "futures_0_3"))]
45pub mod future;
46mod macros;
47
48use core::fmt;
49use core::ops::ControlFlow;
50
/// Represents types responsible for decoding bytes pushed into it.
///
/// The types implementing this trait act like state machines (similar to futures) but instead of
/// pulling data from some internal source they receive it in method calls. So they are actually
/// much closer to the traditional state machines than futures.
pub trait Decoder: Sized {
    /// The type of value produced by this decoder.
    type Value;
    /// Decoding error.
    type Error;

    /// Processes the next chunk of bytes and updates the cursor.
    ///
    /// The decoder has to process the chunk of bytes performing validation and transformation.
    ///
    /// If the bytes are valid the slice is updated to point to the unread part. Thus if the slice
    /// is non-empty after this method returns, the decoder ended decoding.
    ///
    /// # Errors
    ///
    /// An error is returned in case the bytes are invalid. The validity is defined by the
    /// implementor.
    ///
    /// **No** error may be returned if the number of bytes passed is not sufficient to decode the
    /// value - the remaining bytes will be passed in the following call(s) of this method.
    fn decode_chunk(&mut self, bytes: &mut &[u8]) -> Result<(), Self::Error>;

    /// Called when decoding has ended or there are no more bytes.
    ///
    /// The decoder must validate the bytes passed in so far if it didn't do so yet and return the
    /// decoded value or an error if the bytes were invalid.
    ///
    /// # Errors
    ///
    /// This returns an error if the bytes passed so far are invalid as defined by the decoder.
    /// This commonly happens if the byte stream ended unexpectedly.
    fn end(self) -> Result<Self::Value, Self::Error>;

    /// Processes the next chunk of bytes without updating the cursor.
    ///
    /// This method is usually more convenient for the top-level callers which are receiving bytes
    /// from buffered readers. Instead of modifying the slice this returns the number of bytes
    /// consumed which can be passed to the `consume` method of a buffered reader.
    fn bytes_received(&mut self, mut bytes: &[u8]) -> Result<usize, Self::Error> {
        // The consumed count is the difference between the slice length before and after
        // `decode_chunk` advanced the (local copy of the) slice.
        let prev_len = bytes.len();
        self.decode_chunk(&mut bytes)?;
        Ok(prev_len - bytes.len())
    }

    /// Chains another decoder after this one finishes such that the value of this one is used to
    /// initialize the next one.
    fn then<R: Decoder, F: FnOnce(Self::Value) -> R>(self, fun: F) -> decoders::combinators::Then<Self, R, F> {
        decoders::combinators::Then::new(self, fun)
    }

    /// Chains another decoder after this one finishes such that the value of this one is used to
    /// initialize the next one.
    ///
    /// Unlike `then` this combinator may also return an error and convert the errors into a custom
    /// one.
    fn then_try<E, R: Decoder, F: FnOnce(Self::Value) -> Result<R, E>>(self, fun: F) -> decoders::combinators::ThenTry<E, Self, R, F> where E: From<Self::Error> + From<R::Error> {
        decoders::combinators::ThenTry::new(self, fun)
    }

    /// Chains another decoder after this one to decode two values.
    fn chain<D: Decoder>(self, following: D) -> decoders::combinators::Chain<Self, D> {
        decoders::combinators::Chain::new(self, following)
    }

    /// Resets the decoder returning the decoded value.
    fn take(&mut self) -> Result<Self::Value, Self::Error> where Self: Default {
        core::mem::take(self).end()
    }

    /// Decodes a value from a lower-level decoder.
    ///
    /// When multiple decoders are chained one after another in a large state machine this method
    /// can simplify delegation of decoding to the underlying decoder. You can wrap decoding in a
    /// closure passed to [`Self::wrap_sub_decode`] and then just call `sub_decode()?` at the
    /// beginning of each decoding state and continue working with the returned value.
    ///
    /// The method also accepts a function (closure) to convert the errors since using `map_err`
    /// would be annoying because of double wrapping. In case no conversion is desired simply pass
    /// in [`core::convert::identity`].
    ///
    /// Note that this requires the `Default` trait because it resets the decoder every time a
    /// value is decoded. Apart from resolving borrowing issues it also allows easily decoding
    /// a stream of values in a loop. If you need to work with decoders that require a value (e.g.
    /// [`VecDecoder`](decoders::VecDecoder)) it is recommended to create a specialized decoder that
    /// will decode both (e.g. using [`Then`](decoders::combinators::Then)) and call `sub_decode` on
    /// that.
    ///
    /// You may notice this looks a lot like `await` and in principle it is very similar. The
    /// differences are:
    ///
    /// * `await` also implements the state machine using `Future` trait. This doesn't. The
    /// `Future::poll` method would have to have another argument for us to be able to use it.
    /// * This returns `ControlFlow` instead of `Poll<Result>` to make it return in case of "not
    /// ready" as well. The `Try` implementation on `Poll` only returns on `Err`, never on
    /// `Pending`. This is important for ergonomics.
    /// * While it could be argued the type is morally `Poll<Error>` this one doesn't implement
    /// `Try` either so it's unsuitable for the purpose.
    fn sub_decode<E, F: FnMut(Self::Error) -> E>(&mut self, bytes: &mut &[u8], mut map_err: F) -> ControlFlow<Result<(), E>, Self::Value> where Self: Default {
        if let Err(error) = self.decode_chunk(bytes) {
            return ControlFlow::Break(Err(map_err(error)));
        }
        if bytes.is_empty() {
            // Everything was consumed but decoding may not be finished yet - break out so the
            // caller returns `Ok` from its own `decode_chunk` and waits for more bytes.
            ControlFlow::Break(Ok(()))
        } else {
            // Leftover bytes signal that this decoder finished (see the `decode_chunk`
            // contract), so take out the value and continue with the remaining bytes.
            match self.take() {
                Ok(value) => ControlFlow::Continue(value),
                Err(error) => ControlFlow::Break(Err(map_err(error))),
            }
        }
    }

    /// Helper for using sub_decode.
    ///
    /// This can be used together with [`sub_decode`](Self::sub_decode) on sub-decoders to make
    /// decoding easier. It helps with type inference and converts `ControlFlow` into `Result`.
    ///
    /// Note that this doesn't allow returning `ControlFlow::Continue` as that wouldn't make sense.
    /// It is recommended to just return `ControlFlow::Break` with the result returned from
    /// `decode_chunk` of the last decoder.
    fn wrap_sub_decode<F: FnOnce() -> ControlFlow<Result<(), Self::Error>, core::convert::Infallible>>(f: F) -> Result<(), Self::Error> {
        match f() {
            // `Infallible` has no values, so `Continue` can never be constructed.
            ControlFlow::Continue(never) => match never {},
            ControlFlow::Break(result) => result,
        }
    }
}
182
/// Represents types producing bytes of some encoded value.
pub trait Encoder: Sized {
    /// Provides next chunk of encoded bytes.
    ///
    /// The returned bytes represent a part of the value being encoded and should be written to a
    /// writer by the consumer. Empty returned value indicates end - there are no more bytes to be
    /// encoded.
    ///
    /// The returned value MUST be the same for all calls of `encoded_chunk` until
    /// [`next()`](Self::next) is called. IOW it's not allowed to use interior mutability or global
    /// state (randomness) to affect the returned value.
    ///
    /// It's recommended that this method returns bytes within the value if possible or minimal
    /// required buffer otherwise. The *consumers* of the trait are responsible for buffering, so
    /// buffering inside encoder would decrease performance.
    #[must_use = "This method only returns bytes and doesn't modify the target"]
    fn encoded_chunk(&self) -> &[u8];

    /// Advances the state to get the next chunk of encoded bytes.
    ///
    /// Calling this method signals to the encoder that the current chunk was fully processed. The
    /// encoder MUST either:
    ///
    /// * return `true` and provide the next chunk of data in following calls to
    /// [`encoded_chunk`](Self::encoded_chunk).
    /// * return `false` indicating there is no more data.
    ///
    /// The encoder MUST NOT panic or cause similar undesirable behavior if `next` was called again
    /// after it previously returned `false`. It must simply return `false` again.
    ///
    /// Note that the implementor DOES NOT need to guarantee that `encoded_chunk` is empty after
    /// this method returned `false` and for performance reasons it's not advisable to do in
    /// release builds. The consumers are responsible for handling this situation. The consumers
    /// may use [`track_position`](Self::track_position) to get a more convenient interface handling
    /// these edge-cases and tracking byte position.
    #[must_use = "Relying on encoded_chunk being empty is insufficient"]
    fn next(&mut self) -> bool;

    /// Returns a wrapper that tracks the position of processed bytes.
    ///
    /// The returned wrapper has a slightly different interface that is more suitable for writing
    /// into writers. It can handle partial writes and makes handling of end easier. Thus
    /// downstream consumers are encouraged to use it where it makes sense.
    fn track_position(self) -> EncoderPositionTracker<Self> {
        EncoderPositionTracker::new(self)
    }

    /// Writes all encoded bytes into the given slice, advancing the slice past the written bytes.
    ///
    /// # Errors
    ///
    /// Returns [`error::BufferOverflow`] if the remaining slice is too short to hold the current
    /// chunk, reporting how many bytes of that chunk did not fit.
    fn write_to_slice(mut self, buf: &mut &mut [u8]) -> Result<(), error::BufferOverflow> {
        while !self.encoded_chunk().is_empty() {
            let chunk = self.encoded_chunk();
            if chunk.len() > buf.len() {
                return Err(error::BufferOverflow { bytes_past_end: chunk.len() - buf.len()});
            }
            buf[..chunk.len()].copy_from_slice(chunk);
            // `mem::take` moves the `&mut` slice out of the reference so it can be re-sliced
            // without upsetting the borrow checker.
            *buf = &mut core::mem::take(buf)[chunk.len()..];
            if !self.next() {
                break;
            }
        }
        Ok(())
    }

    /// Writes all encoded bytes to a vec.
    ///
    /// Note that this does **not** call `reserve` since there's no way to know the amount to
    /// reserve. This should be handled by the code producing the encoder instead.
    #[cfg(feature = "alloc")]
    fn write_to_vec(mut self, buf: &mut alloc::vec::Vec<u8>) {
        while !self.encoded_chunk().is_empty() {
            buf.extend_from_slice(self.encoded_chunk());
            if !self.next() {
                break;
            }
        }
    }

    /// Writes all encoded bytes to the `std` writer.
    #[cfg(feature = "std")]
    fn write_all_sync<W: std::io::Write + BufWrite>(mut self, mut writer: W) -> std::io::Result<()> {
        while !self.encoded_chunk().is_empty() {
            writer.write_all(self.encoded_chunk())?;
            if !self.next() {
                break;
            }
        }
        Ok(())
    }

    /// Writes all encoded bytes to the `lgio` writer.
    #[cfg(feature = "lgio")]
    fn write_all_sync_lgio<W: lgio::BufWrite>(mut self, mut writer: W) -> Result<(), W::WriteError> {
        while !self.encoded_chunk().is_empty() {
            writer.write_all(self.encoded_chunk())?;
            if !self.next() {
                break;
            }
        }
        Ok(())
    }

    /// Writes all encoded bytes to the `tokio` async writer.
    ///
    /// The returned future resolves to `std::io::Result<()>`.
    #[cfg(feature = "tokio")]
    fn write_all_tokio<W: tokio::io::AsyncWrite + BufWrite>(self, writer: W) -> future::TokioEncodeFuture<W, Self> {
        future::TokioEncodeFuture::new(writer, self)
    }

    /// Writes all encoded bytes to the `async-std` async writer.
    ///
    /// The returned future resolves to `std::io::Result<()>`.
    #[cfg(feature = "async-std")]
    fn write_all_async_std<W: async_std::io::Write + BufWrite>(self, writer: W) -> future::AsyncStdEncodeFuture<W, Self> {
        future::AsyncStdEncodeFuture::new(writer, self)
    }

    /// Writes all encoded bytes to the `futures` 0.3 async writer.
    ///
    /// The returned future resolves to `std::io::Result<()>`.
    #[cfg(feature = "futures_0_3")]
    fn write_all_futures_0_3<W: futures_io_0_3::AsyncWrite + BufWrite>(self, writer: W) -> future::Futures0Dot3EncodeFuture<W, Self> {
        future::Futures0Dot3EncodeFuture::new(writer, self)
    }

    /// Chains an encoder constructed by `second_encoder_constructor` after this one.
    ///
    /// This is similar to [`chain`](Self::chain) but only incurs the cost of creating the encoder
    /// if it's actually needed. So if encoding stops before finishing (e.g. due to error) no CPU
    /// time or memory is wasted.
    ///
    /// This will also save memory if the second encoder is larger than `F`.
    ///
    /// Note: `F` needs to be `FnMut` instead of `FnOnce` to correctly handle panics.
    fn then<E: Encoder, F: FnMut() -> E>(self, second_encoder_constructor: F) -> encoders::combinators::Then<Self, E, F> {
        encoders::combinators::Then::new(self, second_encoder_constructor)
    }

    /// Chains another encoder after this one.
    ///
    /// This requires second encoder to be eagerly created which may waste CPU time if encoding
    /// stops early. You should consider using [`then`](Self::then) instead, which may save memory
    /// as well.
    fn chain<T: Encoder>(self, second_encoder: T) -> encoders::combinators::Chain<Self, T> {
        encoders::combinators::Chain::new(self, second_encoder)
    }
}
329
/// Marker trait for writers that are either buffered or don't incur the cost of context switch.
///
/// The trait should be implemented for types which don't incur a (significant) performance penalty
/// when writing short chunks of data.
///
/// The `write_all_*` methods on [`Encoder`] require this bound because they may issue many small
/// writes (one per chunk).
#[cfg(any(feature = "std", feature = "tokio", feature = "async-std", feature = "futures_0_3"))]
pub trait BufWrite {}
336
// A mutable reference to a buffered writer is itself buffered.
//
// Fix: the lifetime parameter was previously declared but not used in the self type
// (`for &mut T`), which is rejected by the compiler as an unconstrained lifetime
// parameter (E0207). Binding it in the reference type makes the impl compile.
#[cfg(any(feature = "std", feature = "tokio", feature = "async-std", feature = "futures_0_3"))]
impl<'a, T: BufWrite> BufWrite for &'a mut T {}

// `std::io::BufWriter` buffers writes by definition.
#[cfg(feature = "std")]
impl<T: std::io::Write> BufWrite for std::io::BufWriter<T> {}

// `tokio::io::BufWriter` buffers writes by definition.
#[cfg(feature = "tokio")]
impl<T: tokio::io::AsyncWrite> BufWrite for tokio::io::BufWriter<T> {}

// `async_std::io::BufWriter` buffers writes by definition.
#[cfg(feature = "async-std")]
impl<T: async_std::io::Write> BufWrite for async_std::io::BufWriter<T> {}
348
#[cfg(any(feature = "tokio", feature = "async-std", feature = "futures_0_3"))]
pin_project_lite::pin_project! {
    /// Wrapper for external types that are known to be buffered.
    ///
    /// Downstream users may use this to satisfy the constraint of `write_` methods when they
    /// themselves can't implement `BufWrite` for types from external crates due to orphan rules.
    pub struct AssumeBuffered<T> {
        // Pinned so the async `poll_*` forwarding impls below can project to the inner writer.
        #[pin]
        inner: T
    }
}
360
/// Wrapper for external types that are known to be buffered.
///
/// Downstream users may use this to satisfy the constraint of `write_` methods when they
/// themselves can't implement `BufWrite` for types from external crates due to orphan rules.
// Non-pinned variant used when no async feature is enabled - no pin projection is needed then.
#[cfg(all(feature = "std", not(any(feature = "tokio", feature = "async-std", feature = "futures_0_3"))))]
pub struct AssumeBuffered<T> {
    // The wrapped writer which the caller asserts behaves as buffered.
    inner: T,
}
369
#[cfg(any(feature = "std", feature = "tokio", feature = "async-std", feature = "futures_0_3"))]
impl<T> AssumeBuffered<T> {
    /// Wraps a writer, declaring that it should be treated as buffered.
    pub fn new(writer: T) -> Self {
        AssumeBuffered {
            inner: writer,
        }
    }

    /// Returns a shared reference to the wrapped writer.
    pub fn inner(&self) -> &T {
        &self.inner
    }

    /// Returns a mutable reference to the wrapped writer.
    pub fn inner_mut(&mut self) -> &mut T {
        &mut self.inner
    }

    /// Consumes the wrapper, returning the wrapped writer.
    pub fn into_inner(self) -> T {
        self.inner
    }
}

// The whole point of the wrapper: the caller vouches that `T` is effectively buffered.
#[cfg(any(feature = "std", feature = "tokio", feature = "async-std", feature = "futures_0_3"))]
impl<T> BufWrite for AssumeBuffered<T> {}
393
// Plain delegation so the wrapper can be used anywhere the inner writer could.
#[cfg(feature = "std")]
impl<T: std::io::Write> std::io::Write for AssumeBuffered<T> {
    fn write(&mut self, bytes: &[u8]) -> std::io::Result<usize> {
        self.inner.write(bytes)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
404
// Pin-projecting delegation so the wrapper can be used anywhere the inner writer could.
#[cfg(feature = "async-std")]
impl<T: async_std::io::Write> async_std::io::Write for AssumeBuffered<T> {
    fn poll_write(self: Pin<&mut Self>, ctx: &mut core::task::Context, bytes: &[u8]) -> core::task::Poll<std::io::Result<usize>> {
        self.project().inner.poll_write(ctx, bytes)
    }

    fn poll_flush(self: Pin<&mut Self>, ctx: &mut core::task::Context) -> core::task::Poll<std::io::Result<()>> {
        self.project().inner.poll_flush(ctx)
    }

    fn poll_close(self: Pin<&mut Self>, ctx: &mut core::task::Context) -> core::task::Poll<std::io::Result<()>> {
        self.project().inner.poll_close(ctx)
    }
}
419
// Pin-projecting delegation so the wrapper can be used anywhere the inner writer could.
#[cfg(feature = "tokio")]
impl<T: tokio::io::AsyncWrite> tokio::io::AsyncWrite for AssumeBuffered<T> {
    fn poll_write(self: Pin<&mut Self>, ctx: &mut core::task::Context, bytes: &[u8]) -> core::task::Poll<std::io::Result<usize>> {
        self.project().inner.poll_write(ctx, bytes)
    }

    fn poll_flush(self: Pin<&mut Self>, ctx: &mut core::task::Context) -> core::task::Poll<std::io::Result<()>> {
        self.project().inner.poll_flush(ctx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut core::task::Context) -> core::task::Poll<std::io::Result<()>> {
        self.project().inner.poll_shutdown(ctx)
    }
}
434
/// An `Encoder` wrapper that handles partial writes.
///
/// This wrapper internally tracks the position of encoded bytes which makes handling of partial
/// writes very easy. It also simplifies handling of the end.
#[derive(Debug, Clone)]
pub struct EncoderPositionTracker<Enc> {
    /// The wrapped encoder.
    encoder: Enc,
    /// How many bytes of the encoder's current chunk were already processed.
    pos: usize,
}
444
impl<Enc: Encoder> EncoderPositionTracker<Enc> {
    /// Creates the tracker positioned at the start of the encoder's first chunk.
    fn new(encoder: Enc) -> Self {
        EncoderPositionTracker {
            encoder,
            pos: 0,
        }
    }

    /// Returns an unprocessed chunk of encoded data.
    ///
    /// The returned bytes should be processed - e.g. by writing them into a writer. Empty returned
    /// value indicates there are no more bytes.
    #[must_use = "This method only returns bytes and doesn't modify the target"]
    pub fn encoded_chunk(&self) -> &[u8] {
        &self.encoder.encoded_chunk()[self.pos..]
    }

    /// Marks the `amount` of bytes as processed.
    ///
    /// The consumer should call this method after (partially) processing the chunk. Usually this
    /// is called after successful [`write`](std::io::Write::write) or analogous function. The
    /// buffer returned by [`encoded_chunk`](Self::encoded_chunk) will be advanced by `amount`
    /// bytes and, if it reached the end, the underlying encoder will be advanced to give the next
    /// chunk.
    ///
    /// Calling this method with `amount` larger than `encoded_chunk().len()` will corrupt the
    /// encoder and lead to a panic later. In debug builds this will panic instantly.
    #[track_caller]
    pub fn consume(&mut self, amount: usize) {
        self.pos += amount;
        if self.pos >= self.encoder.encoded_chunk().len() {
            debug_assert_eq!(self.pos, self.encoder.encoded_chunk().len());
            // Resetting only position when there are more chunks ensures `encoded_chunk` will be
            // empty when this reaches the end.
            if self.encoder.next() {
                self.pos = 0;
            }
        }
    }

    /// Issues a single write to the writer and advances the position accordingly.
    ///
    /// This can be used as a building block for various abstractions or protocols.
    /// Returns the number of bytes written. Zero indicates the end of encoding.
    #[cfg(feature = "std")]
    pub fn write_once<W: std::io::Write>(&mut self, writer: &mut W) -> std::io::Result<usize> {
        if self.encoded_chunk().is_empty() {
            return Ok(0);
        }
        let amount = writer.write(self.encoded_chunk())?;
        self.consume(amount);
        Ok(amount)
    }

    /// Writes all bytes to the writer until the end or an error.
    ///
    /// This is similar to [`Encoder::write_all_sync`] with one significant difference: it leaves
    /// the state around so the operation can be restarted. This can be used to handle
    /// [`ErrorKind::Interrupted`](std::io::ErrorKind::Interrupted) errors which are generally
    /// recoverable but users may still wish to act on them (e.g. check a global flag set by a
    /// signal).
    #[cfg(feature = "std")]
    pub fn write_all<W: std::io::Write>(&mut self, writer: &mut W) -> std::io::Result<()> {
        while self.write_once(writer)? != 0 { }
        Ok(())
    }
}
512
/// Synchronously decodes a value from the given reader using a custom decoder.
///
/// Reads buffered chunks from `reader` and feeds them to `decoder` until either the decoder
/// stops consuming bytes (the value is complete) or the reader reaches EOF.
#[cfg(feature = "std")]
pub fn decode_sync_with<D: Decoder, R: std::io::BufRead + ?Sized>(reader: &mut R, mut decoder: D) -> Result<D::Value, ReadError<D::Error>> {
    loop {
        let buf = match reader.fill_buf() {
            Ok(buf) => buf,
            // `Interrupted` is transient (e.g. a signal arrived) - just retry the read.
            Err(error) if error.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(error) => return Err(ReadError::Read(error)),
        };
        // An empty buffer from `fill_buf` means EOF - let the decoder finalize.
        if buf.is_empty() {
            break decoder.end().map_err(ReadError::Decode);
        }
        let num = decoder.bytes_received(buf).map_err(ReadError::Decode)?;
        let buf_len = buf.len();
        reader.consume(num);
        // Consuming less than the whole buffer signals the decoder finished decoding.
        if num < buf_len {
            break decoder.end().map_err(ReadError::Decode);
        }
    }
}
533
/// Synchronously decodes a value from the given reader.
///
/// Convenience shortcut for [`decode_sync_with`] using the decoder's default state.
#[cfg(feature = "std")]
pub fn decode_sync<D: Decoder + Default>(reader: &mut (impl std::io::BufRead + ?Sized)) -> Result<D::Value, ReadError<D::Error>> {
    let decoder = D::default();
    decode_sync_with(reader, decoder)
}
539
/// Synchronously decodes a value from the given reader using a custom decoder.
///
/// `lgio` counterpart of [`decode_sync_with`]: reads buffered chunks and feeds them to `decoder`
/// until either the decoder stops consuming bytes (the value is complete) or EOF.
#[cfg(feature = "lgio")]
pub fn decode_sync_lgio_with<D: Decoder, R: lgio::BufRead + ?Sized>(reader: &mut R, mut decoder: D) -> Result<D::Value, ReadError<D::Error, R::ReadError>> {
    loop {
        let buf = reader.fill_buf().map_err(ReadError::Read)?;
        // An empty buffer from `fill_buf` means EOF - let the decoder finalize.
        if buf.is_empty() {
            break decoder.end().map_err(ReadError::Decode);
        }
        let num = decoder.bytes_received(buf).map_err(ReadError::Decode)?;
        let buf_len = buf.len();
        reader.consume(num);
        // Consuming less than the whole buffer signals the decoder finished decoding.
        if num < buf_len {
            break decoder.end().map_err(ReadError::Decode);
        }
    }
}
556
/// Synchronously decodes a value from the given reader.
///
/// Convenience shortcut for [`decode_sync_lgio_with`] using the decoder's default state.
#[cfg(feature = "lgio")]
pub fn decode_sync_lgio<D: Decoder + Default, R: lgio::BufRead + ?Sized>(reader: &mut R) -> Result<D::Value, ReadError<D::Error, R::ReadError>> {
    let decoder = D::default();
    decode_sync_lgio_with(reader, decoder)
}
562
/// Asynchronously decodes a value from the given reader using a custom decoder.
#[cfg(feature = "futures_0_3")]
pub async fn decode_futures_0_3_with<D: Decoder, R: futures_io_0_3::AsyncBufRead>(reader: R, decoder: D) -> Result<D::Value, ReadError<D::Error>> {
    use futures_io_0_3::AsyncBufRead;

    // `DecodeFuture` drives the decoder generically; it is parameterized by the reader trait's
    // `poll_fill_buf`/`consume` functions so one future type serves all async IO crates.
    future::DecodeFuture {
        reader,
        poll_fn: <R as AsyncBufRead>::poll_fill_buf,
        consume_fn: <R as AsyncBufRead>::consume,
        decoder: Some(decoder),
    }
    .await
}
576
/// Asynchronously decodes a value from the given reader.
///
/// Convenience shortcut for [`decode_futures_0_3_with`] using the decoder's default state.
#[cfg(feature = "futures_0_3")]
pub async fn decode_futures_0_3<D: Decoder + Default>(reader: impl futures_io_0_3::AsyncBufRead) -> Result<D::Value, ReadError<D::Error>> {
    let decoder = D::default();
    decode_futures_0_3_with(reader, decoder).await
}
582
/// Asynchronously decodes a value from the given reader using a custom decoder.
#[cfg(feature = "tokio")]
pub async fn decode_tokio_with<D: Decoder, R: tokio::io::AsyncBufRead>(reader: R, decoder: D) -> Result<D::Value, ReadError<D::Error>> {
    use tokio::io::AsyncBufRead;

    // `DecodeFuture` drives the decoder generically; it is parameterized by the reader trait's
    // `poll_fill_buf`/`consume` functions so one future type serves all async IO crates.
    future::DecodeFuture {
        reader,
        poll_fn: <R as AsyncBufRead>::poll_fill_buf,
        consume_fn: <R as AsyncBufRead>::consume,
        decoder: Some(decoder),
    }
    .await
}
596
/// Asynchronously decodes a value from the given reader.
///
/// Convenience shortcut for [`decode_tokio_with`] using the decoder's default state.
#[cfg(feature = "tokio")]
pub async fn decode_tokio<D: Decoder + Default>(reader: impl tokio::io::AsyncBufRead) -> Result<D::Value, ReadError<D::Error>> {
    let decoder = D::default();
    decode_tokio_with(reader, decoder).await
}
602
/// Asynchronously decodes a value from the given reader using a custom decoder.
#[cfg(feature = "async-std")]
pub async fn decode_async_std_with<D: Decoder, R: async_std::io::BufRead>(reader: R, decoder: D) -> Result<D::Value, ReadError<D::Error>> {
    use async_std::io::BufRead as AsyncBufRead;

    // `DecodeFuture` drives the decoder generically; it is parameterized by the reader trait's
    // `poll_fill_buf`/`consume` functions so one future type serves all async IO crates.
    future::DecodeFuture {
        reader,
        poll_fn: <R as AsyncBufRead>::poll_fill_buf,
        consume_fn: <R as AsyncBufRead>::consume,
        decoder: Some(decoder),
    }
    .await
}
616
/// Asynchronously decodes a value from the given reader.
///
/// Convenience shortcut for [`decode_async_std_with`] using the decoder's default state.
#[cfg(feature = "async-std")]
pub async fn decode_async_std<D: Decoder + Default>(reader: impl async_std::io::BufRead) -> Result<D::Value, ReadError<D::Error>> {
    let decoder = D::default();
    decode_async_std_with(reader, decoder).await
}
622
/// Returned when either reading or decoding fails.
// Without `std` there is no `std::io::Error` to default the `Read` parameter to, hence this
// separate definition without a default type parameter.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
// A trick to hide (wrong) cfg doc
#[cfg_attr(docsrs, doc(cfg(all())))]
#[cfg(not(feature = "std"))]
pub enum ReadError<Decode, Read> {
    /// Reading from a reader failed.
    Read(Read),
    /// Decoding the value failed.
    Decode(Decode),
}
634
/// Returned when either reading or decoding fails.
///
/// Note that the `Read` type param only defaults to [`std::io::Error`] with `std` feature enabled.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
// A trick to hide (wrong) cfg doc
#[cfg_attr(docsrs, doc(cfg(all())))]
#[cfg(feature = "std")]
pub enum ReadError<Decode, Read = std::io::Error> {
    /// Reading from a reader failed.
    Read(Read),
    /// Decoding the value failed.
    Decode(Decode),
}
648
649impl<D, R> ReadError<D, R> {
650 /// Converts the inner errors to another type `E`.
651 pub fn convert_either<E>(self) -> E where D: Into<E>, R: Into<E> {
652 match self {
653 ReadError::Read(error) => error.into(),
654 ReadError::Decode(error) => error.into(),
655 }
656 }
657
658 /// Converts the read error using a closure.
659 ///
660 /// This is analogous to [`Result::map`]/[`Result::map_err`] and leaves `Decode` intact.
661 pub fn map_read<E, F>(self, map: F) -> ReadError<D, E> where F: FnOnce(R) -> E {
662 match self {
663 ReadError::Read(error) => ReadError::Read(map(error)),
664 ReadError::Decode(error) => ReadError::Decode(error),
665 }
666 }
667
668 /// Converts the decode error using a closure.
669 ///
670 /// This is analogous to [`Result::map`]/[`Result::map_err`] and leaves `Read` intact.
671 pub fn map_decode<E, F>(self, map: F) -> ReadError<E, R> where F: FnOnce(D) -> E {
672 match self {
673 ReadError::Read(error) => ReadError::Read(error),
674 ReadError::Decode(error) => ReadError::Decode(map(error)),
675 }
676 }
677}
678
679impl<E> ReadError<E, core::convert::Infallible> {
680 /// Statically proves that reading is infallible and converts to decode error.
681 pub fn into_decode(self) -> E {
682 match self {
683 ReadError::Read(never) => match never {},
684 ReadError::Decode(error) => error,
685 }
686 }
687}
688
689impl<E> ReadError<core::convert::Infallible, E> {
690 /// Statically proves that decoding is infallible and converts to read error.
691 pub fn into_read(self) -> E {
692 match self {
693 ReadError::Read(error) => error,
694 ReadError::Decode(never) => match never {},
695 }
696 }
697}
698
699impl From<ReadError<core::convert::Infallible, core::convert::Infallible>> for core::convert::Infallible {
700 fn from(error: ReadError<core::convert::Infallible, core::convert::Infallible>) -> Self {
701 match error {
702 ReadError::Read(error) => error,
703 ReadError::Decode(error) => error,
704 }
705 }
706}
707
#[cfg(feature = "std")]
impl<E: std::error::Error + Send + Sync + 'static> From<ReadError<E, std::io::Error>> for std::io::Error {
    fn from(error: ReadError<E, std::io::Error>) -> Self {
        match error {
            // A read error already is an IO error - pass it through unchanged.
            ReadError::Read(io_error) => io_error,
            // A decode error becomes the source of an `InvalidData` IO error.
            ReadError::Decode(decode_error) => {
                std::io::Error::new(std::io::ErrorKind::InvalidData, decode_error)
            },
        }
    }
}
719
720impl<D, R> fmt::Display for ReadError<D, R> {
721 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
722 match self {
723 ReadError::Read(_) => write!(f, "reading failed"),
724 ReadError::Decode(_) => write!(f, "decoding failed"),
725 }
726 }
727}
728
#[cfg(feature = "std")]
impl<D: std::error::Error + 'static, R: std::error::Error + 'static> std::error::Error for ReadError<D, R> {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Either variant wraps the underlying error; expose it as the source.
        let source: &(dyn std::error::Error + 'static) = match self {
            ReadError::Read(read_error) => read_error,
            ReadError::Decode(decode_error) => decode_error,
        };
        Some(source)
    }
}