async_proto/impls/
mod.rs

1//! [`Protocol`] implementations for primitive and [`std`] types.
2
3#![allow(missing_docs)]
4
5use {
6    std::{
7        collections::{
8            BTreeMap,
9            BTreeSet,
10            HashMap,
11            HashSet,
12        },
13        convert::{
14            TryFrom as _,
15            TryInto as _,
16        },
17        future::Future,
18        hash::Hash,
19        io::prelude::*,
20        ops::{
21            Range,
22            RangeFrom,
23            RangeInclusive,
24            RangeTo,
25            RangeToInclusive,
26        },
27        pin::Pin,
28    },
29    byteorder::{
30        NetworkEndian,
31        ReadBytesExt as _,
32        WriteBytesExt as _,
33    },
34    fallible_collections::{
35        FallibleBox,
36        FallibleVec,
37    },
38    tokio::io::{
39        AsyncRead,
40        AsyncReadExt as _,
41        AsyncWrite,
42        AsyncWriteExt as _,
43    },
44    async_proto_derive::impl_protocol_for,
45    crate::{
46        ErrorContext,
47        LengthPrefixed,
48        Protocol,
49        ReadError,
50        ReadErrorKind,
51        WriteError,
52        WriteErrorKind,
53    },
54};
55
56#[cfg(feature = "bitvec")] mod bitvec;
57#[cfg(feature = "bytes")] mod bytes;
58#[cfg(feature = "bytesize")] mod bytesize;
59#[cfg(feature = "chrono")] mod chrono;
60#[cfg(feature = "chrono-tz")] mod chrono_tz;
61#[cfg(feature = "doubloon")] mod doubloon;
62#[cfg(feature = "either")] mod either;
63#[cfg(feature = "enumset")] mod enumset;
64#[cfg(feature = "git2")] mod git2;
65#[cfg(feature = "gix-hash")] mod gix_hash;
66#[cfg(feature = "noisy_float")] mod noisy_float;
67#[cfg(feature = "nonempty-collections")] mod nonempty_collections;
68#[cfg(feature = "rust_decimal")] mod rust_decimal;
69#[cfg(feature = "semver")] mod semver;
70#[cfg(feature = "serde_json")] mod serde_json;
71#[cfg(feature = "serenity")] mod serenity;
72#[cfg(feature = "hematite-nbt")] mod hematite_nbt;
73#[cfg(feature = "url")] mod url;
74#[cfg(feature = "uuid")] mod uuid;
75
76async fn read_len<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
77    let len = match max_len {
78        0 => 0,
79        1..=255 => u8::read(stream).await?.into(),
80        256..=65_535 => u16::read(stream).await?.into(),
81        65_536..=4_294_967_295 => u32::read(stream).await?.into(),
82        _ => u64::read(stream).await?,
83    };
84    if len > max_len {
85        Err(ReadError {
86            context: error_ctx(),
87            kind: ReadErrorKind::MaxLen { len, max_len },
88        })
89    } else {
90        usize::try_from(len).map_err(|e| ReadError {
91            context: error_ctx(),
92            kind: e.into(),
93        })
94    }
95}
96
97async fn write_len<'a, W: AsyncWrite + Unpin + Send + 'a>(sink: &'a mut W, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
98    let len = u64::try_from(len).map_err(|e| WriteError {
99        context: error_ctx(),
100        kind: e.into(),
101    })?;
102    if len > max_len {
103        return Err(WriteError {
104            context: error_ctx(),
105            kind: WriteErrorKind::MaxLen { len, max_len },
106        })
107    }
108    match max_len {
109        0 => {}
110        1..=255 => (len as u8).write(sink).await?,
111        256..=65_535 => (len as u16).write(sink).await?,
112        65_536..=4_294_967_295 => (len as u32).write(sink).await?,
113        _ => len.write(sink).await?,
114    }
115    Ok(())
116}
117
118fn read_len_sync(stream: &mut impl Read, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
119    let len = match max_len {
120        0 => 0,
121        1..=255 => u8::read_sync(stream)?.into(),
122        256..=65_535 => u16::read_sync(stream)?.into(),
123        65_536..=4_294_967_295 => u32::read_sync(stream)?.into(),
124        _ => u64::read_sync(stream)?,
125    };
126    if len > max_len {
127        Err(ReadError {
128            context: error_ctx(),
129            kind: ReadErrorKind::MaxLen { len, max_len },
130        })
131    } else {
132        usize::try_from(len).map_err(|e| ReadError {
133            context: error_ctx(),
134            kind: e.into(),
135        })
136    }
137}
138
139fn write_len_sync(sink: &mut impl Write, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
140    let len = u64::try_from(len).map_err(|e| WriteError {
141        context: error_ctx(),
142        kind: e.into(),
143    })?;
144    if len > max_len {
145        return Err(WriteError {
146            context: error_ctx(),
147            kind: WriteErrorKind::MaxLen { len, max_len },
148        })
149    }
150    match max_len {
151        0 => {}
152        1..=255 => (len as u8).write_sync(sink)?,
153        256..=65_535 => (len as u16).write_sync(sink)?,
154        65_536..=4_294_967_295 => (len as u32).write_sync(sink)?,
155        _ => len.write_sync(sink)?,
156    }
157    Ok(())
158}
159
// Implements `Protocol` for the integer type `$ty` by forwarding to the stream/sink methods
// named `$read` and `$write`. The same method names are used for the async and the sync
// implementations; the optional `$endian` type is only passed to the sync calls.
// Any I/O error is wrapped with a `BuiltIn` context naming `$ty`.
macro_rules! impl_protocol_primitive {
    ($ty:ty, $read:ident, $write:ident$(, $endian:ty)?) => {
        /// Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format.
        impl Protocol for $ty {
            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
                Box::pin(async move {
                    stream.$read().await.map_err(|err| ReadError {
                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
                        kind: err.into(),
                    })
                })
            }

            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
                Box::pin(async move {
                    sink.$write(*self).await.map_err(|err| WriteError {
                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
                        kind: err.into(),
                    })
                })
            }

            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
                stream.$read$(::<$endian>)?().map_err(|err| ReadError {
                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
                    kind: err.into(),
                })
            }

            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
                sink.$write$(::<$endian>)?(*self).map_err(|err| WriteError {
                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
                    kind: err.into(),
                })
            }
        }
    };
}
198
// Instantiate the macro for every fixed-width integer type.
// The single-byte types take no endianness parameter; all wider types are passed `NetworkEndian`.
impl_protocol_primitive!(u8, read_u8, write_u8);
impl_protocol_primitive!(i8, read_i8, write_i8);
impl_protocol_primitive!(u16, read_u16, write_u16, NetworkEndian);
impl_protocol_primitive!(i16, read_i16, write_i16, NetworkEndian);
impl_protocol_primitive!(u32, read_u32, write_u32, NetworkEndian);
impl_protocol_primitive!(i32, read_i32, write_i32, NetworkEndian);
impl_protocol_primitive!(u64, read_u64, write_u64, NetworkEndian);
impl_protocol_primitive!(i64, read_i64, write_i64, NetworkEndian);
impl_protocol_primitive!(u128, read_u128, write_u128, NetworkEndian);
impl_protocol_primitive!(i128, read_i128, write_i128, NetworkEndian);
209
210impl<Idx: Protocol + Send + Sync> Protocol for RangeInclusive<Idx> { //TODO derive
211    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
212        Box::pin(async move {
213            Ok(Idx::read(stream).await?..=Idx::read(stream).await?)
214        })
215    }
216
217    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
218        Box::pin(async move {
219            self.start().write(sink).await?;
220            self.end().write(sink).await?;
221            Ok(())
222        })
223    }
224
225    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
226        Ok(Idx::read_sync(stream)?..=Idx::read_sync(stream)?)
227    }
228
229    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
230        self.start().write_sync(sink)?;
231        self.end().write_sync(sink)?;
232        Ok(())
233    }
234}
235
// Implements `Protocol` for a tuple whose element types are the given identifiers.
// Elements are read and written one after another, in declaration order, with nothing in between.
// The type parameter names double as local variable names, hence the `non_snake_case` allowances;
// `unused` is allowed because the empty tuple never touches the stream or sink.
macro_rules! impl_protocol_tuple {
    ($($ty:ident),*) => {
        #[allow(unused)]
        impl<$($ty: Protocol + Send + Sync),*> Protocol for ($($ty,)*) {
            #[allow(non_snake_case)]
            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
                Box::pin(async move {
                    $(
                        let $ty = $ty::read(stream).await?;
                    )*
                    Ok(($($ty,)*))
                })
            }

            #[allow(non_snake_case)]
            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
                Box::pin(async move {
                    let &($(ref $ty,)*) = self;
                    $(
                        $ty.write(sink).await?;
                    )*
                    Ok(())
                })
            }

            #[allow(non_snake_case)]
            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
                $(
                    let $ty = $ty::read_sync(stream)?;
                )*
                Ok(($($ty,)*))
            }

            #[allow(non_snake_case)]
            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
                let &($(ref $ty,)*) = self;
                $(
                    $ty.write_sync(sink)?;
                )*
                Ok(())
            }
        }
    };
}
276
// Implement `Protocol` for tuples of every arity from 0 (the unit type) through 12.
impl_protocol_tuple!();
impl_protocol_tuple!(A);
impl_protocol_tuple!(A, B);
impl_protocol_tuple!(A, B, C);
impl_protocol_tuple!(A, B, C, D);
impl_protocol_tuple!(A, B, C, D, E);
impl_protocol_tuple!(A, B, C, D, E, F);
impl_protocol_tuple!(A, B, C, D, E, F, G);
impl_protocol_tuple!(A, B, C, D, E, F, G, H);
impl_protocol_tuple!(A, B, C, D, E, F, G, H, I);
impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J);
impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K);
impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K, L);
290
291impl<T: Protocol + Send + Sync, const N: usize> Protocol for [T; N] {
292    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
293        Box::pin(async move {
294            let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
295                context: ErrorContext::BuiltIn { for_type: "[T; N]" },
296                kind: e.into(),
297            })?;
298            for _ in 0..N {
299                vec.push(T::read(stream).await?);
300            }
301            Ok(match vec.try_into() {
302                Ok(array) => array,
303                Err(_) => panic!("wrong array length"),
304            })
305        })
306    }
307
308    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
309        Box::pin(async move {
310            for elt in self {
311                elt.write(sink).await?;
312            }
313            Ok(())
314        })
315    }
316
317    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
318        let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
319            context: ErrorContext::BuiltIn { for_type: "[T; N]" },
320            kind: e.into(),
321        })?;
322        for _ in 0..N {
323            vec.push(T::read_sync(stream)?);
324        }
325        Ok(match vec.try_into() {
326            Ok(array) => array,
327            Err(_) => panic!("wrong array length"),
328        })
329    }
330
331    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
332        for elt in self {
333            elt.write_sync(sink)?;
334        }
335        Ok(())
336    }
337}
338
339/// Represented as one byte, with `0` for `false` and `1` for `true`.
340impl Protocol for bool {
341    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
342        Box::pin(async move {
343            Ok(match u8::read(stream).await? {
344                0 => false,
345                1 => true,
346                n => return Err(ReadError {
347                    context: ErrorContext::BuiltIn { for_type: "bool" },
348                    kind: ReadErrorKind::UnknownVariant8(n),
349                }),
350            })
351        })
352    }
353
354    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
355        Box::pin(async move {
356            if *self { 1u8 } else { 0 }.write(sink).await
357        })
358    }
359
360    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
361        Ok(match u8::read_sync(stream)? {
362            0 => false,
363            1 => true,
364            n => return Err(ReadError {
365                context: ErrorContext::BuiltIn { for_type: "bool" },
366                kind: ReadErrorKind::UnknownVariant8(n),
367            }),
368        })
369    }
370
371    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
372        if *self { 1u8 } else { 0 }.write_sync(sink)
373    }
374}
375
376impl<T: Protocol> Protocol for Box<T> {
377    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
378        Box::pin(async move {
379            Ok(<Box<_> as FallibleBox<_>>::try_new(T::read(stream).await?).map_err(|e| ReadError {
380                context: ErrorContext::BuiltIn { for_type: "Box" },
381                kind: e.into(),
382            })?)
383        })
384    }
385
386    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
387        (**self).write(sink)
388    }
389
390    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
391        Ok(<Box<_> as FallibleBox<_>>::try_new(T::read_sync(stream)?).map_err(|e| ReadError {
392            context: ErrorContext::BuiltIn { for_type: "Box" },
393            kind: e.into(),
394        })?)
395    }
396
397    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
398        (**self).write_sync(sink)
399    }
400}
401
402/// A vector is prefixed with the length as a [`u64`].
403///
404/// Note that due to Rust's lack of [specialization](https://github.com/rust-lang/rust/issues/31844), this implementation is inefficient for `Vec<u8>`.
405/// Prefer [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) if possible.
406impl<T: Protocol + Send + Sync> Protocol for Vec<T> {
407        fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
408        Self::read_length_prefixed(stream, u64::MAX)
409    }
410
411    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
412        self.write_length_prefixed(sink, u64::MAX)
413    }
414
415    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
416        Self::read_length_prefixed_sync(stream, u64::MAX)
417    }
418
419    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
420        self.write_length_prefixed_sync(sink, u64::MAX)
421    }
422}
423
424/// Note that due to Rust's lack of [specialization](https://github.com/rust-lang/rust/issues/31844), this implementation is inefficient for `Vec<u8>`.
425/// Prefer [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) if possible.
426impl<T: Protocol + Send + Sync> LengthPrefixed for Vec<T> {
427    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
428        Box::pin(async move {
429            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
430            let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
431                context: ErrorContext::BuiltIn { for_type: "Vec" },
432                kind: e.into(),
433            })?;
434            for _ in 0..len {
435                buf.push(T::read(stream).await?);
436            }
437            Ok(buf)
438        })
439    }
440
441    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
442        Box::pin(async move {
443            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
444            for elt in self {
445                elt.write(sink).await?;
446            }
447            Ok(())
448        })
449    }
450
451    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
452        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
453        let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
454            context: ErrorContext::BuiltIn { for_type: "Vec" },
455            kind: e.into(),
456        })?;
457        for _ in 0..len {
458            buf.push(T::read_sync(stream)?);
459        }
460        Ok(buf)
461    }
462
463    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
464        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
465        for elt in self {
466            elt.write_sync(sink)?;
467        }
468        Ok(())
469    }
470}
471
472/// A set is prefixed with the length as a [`u64`].
473impl<T: Protocol + Ord + Send + Sync + 'static> Protocol for BTreeSet<T> {
474        fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
475        Self::read_length_prefixed(stream, u64::MAX)
476    }
477
478    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
479        self.write_length_prefixed(sink, u64::MAX)
480    }
481
482    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
483        Self::read_length_prefixed_sync(stream, u64::MAX)
484    }
485
486    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
487        self.write_length_prefixed_sync(sink, u64::MAX)
488    }
489}
490
491impl<T: Protocol + Ord + Send + Sync + 'static> LengthPrefixed for BTreeSet<T> {
492    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
493        Box::pin(async move {
494            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
495            let mut set = Self::default();
496            for _ in 0..len {
497                set.insert(T::read(stream).await?); //TODO use fallible allocation once available
498            }
499            Ok(set)
500        })
501    }
502
503    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
504        Box::pin(async move {
505            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
506            for elt in self {
507                elt.write(sink).await?;
508            }
509            Ok(())
510        })
511    }
512
513    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
514        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;        let mut set = Self::default();
515        for _ in 0..len {
516            set.insert(T::read_sync(stream)?); //TODO use fallible allocation once available
517        }
518        Ok(set)
519    }
520
521    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
522        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;
523        for elt in self {
524            elt.write_sync(sink)?;
525        }
526        Ok(())
527    }
528}
529
530/// A set is prefixed with the length as a [`u64`].
531impl<T: Protocol + Eq + Hash + Send + Sync> Protocol for HashSet<T> {
532    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
533        Self::read_length_prefixed(stream, u64::MAX)
534    }
535
536    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
537        self.write_length_prefixed(sink, u64::MAX)
538    }
539
540    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
541        Self::read_length_prefixed_sync(stream, u64::MAX)
542    }
543
544    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
545        self.write_length_prefixed_sync(sink, u64::MAX)
546    }
547}
548
549impl<T: Protocol + Eq + Hash + Send + Sync> LengthPrefixed for HashSet<T> {
550    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
551        Box::pin(async move {
552            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
553            let mut set = Self::with_capacity(len); //TODO use fallible allocation once available
554            for _ in 0..len {
555                set.insert(T::read(stream).await?);
556            }
557            Ok(set)
558        })
559    }
560
561    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
562        Box::pin(async move {
563            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
564            for elt in self {
565                elt.write(sink).await?;
566            }
567            Ok(())
568        })
569    }
570
571    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
572        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
573        let mut set = Self::with_capacity(len); //TODO use fallible allocation once available
574        for _ in 0..len {
575            set.insert(T::read_sync(stream)?);
576        }
577        Ok(set)
578    }
579
580    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
581        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
582        for elt in self {
583            elt.write_sync(sink)?;
584        }
585        Ok(())
586    }
587}
588
589/// A string is encoded in UTF-8 and prefixed with the length in bytes as a [`u64`].
590impl Protocol for String {
591    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
592        Self::read_length_prefixed(stream, u64::MAX)
593    }
594
595    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
596        self.write_length_prefixed(sink, u64::MAX)
597    }
598
599    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
600        Self::read_length_prefixed_sync(stream, u64::MAX)
601    }
602
603    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
604        self.write_length_prefixed_sync(sink, u64::MAX)
605    }
606}
607
608/// A string is encoded in UTF-8 and prefixed with the length in bytes.
609impl LengthPrefixed for String {
610    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
611        Box::pin(async move {
612            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
613            let mut buf = Vec::default();
614            buf.try_resize(len, 0).map_err(|e| ReadError {
615                context: ErrorContext::BuiltIn { for_type: "String" },
616                kind: e.into(),
617            })?;
618            stream.read_exact(&mut buf).await.map_err(|e| ReadError {
619                context: ErrorContext::BuiltIn { for_type: "String" },
620                kind: e.into(),
621            })?;
622            Ok(Self::from_utf8(buf).map_err(|e| ReadError {
623                context: ErrorContext::BuiltIn { for_type: "String" },
624                kind: e.into(),
625            })?)
626        })
627    }
628
629    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
630        Box::pin(async move {
631            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
632            sink.write(self.as_bytes()).await.map_err(|e| WriteError {
633                context: ErrorContext::BuiltIn { for_type: "String" },
634                kind: e.into(),
635            })?;
636            Ok(())
637        })
638    }
639
640    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
641        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
642        let mut buf = Vec::default();
643        buf.try_resize(len, 0).map_err(|e| ReadError {
644            context: ErrorContext::BuiltIn { for_type: "String" },
645            kind: e.into(),
646        })?;
647        stream.read_exact(&mut buf).map_err(|e| ReadError {
648            context: ErrorContext::BuiltIn { for_type: "String" },
649            kind: e.into(),
650        })?;
651        Ok(Self::from_utf8(buf).map_err(|e| ReadError {
652            context: ErrorContext::BuiltIn { for_type: "String" },
653            kind: e.into(),
654        })?)
655    }
656
657    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
658        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
659        sink.write(self.as_bytes()).map_err(|e| WriteError {
660            context: ErrorContext::BuiltIn { for_type: "String" },
661            kind: e.into(),
662        })?;
663        Ok(())
664    }
665}
666
667impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> Protocol for BTreeMap<K, V> {
668    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
669        Self::read_length_prefixed(stream, u64::MAX)
670    }
671
672    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
673        self.write_length_prefixed(sink, u64::MAX)
674    }
675
676    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
677        Self::read_length_prefixed_sync(stream, u64::MAX)
678    }
679
680    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
681        self.write_length_prefixed_sync(sink, u64::MAX)
682    }
683}
684
685impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> LengthPrefixed for BTreeMap<K, V> {
686    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
687        Box::pin(async move {
688            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
689            let mut map = Self::default();
690            for _ in 0..len {
691                map.insert(K::read(stream).await?, V::read(stream).await?); //TODO use fallible allocation once available
692            }
693            Ok(map)
694        })
695    }
696
697    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
698        Box::pin(async move {
699            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
700            for (k, v) in self {
701                k.write(sink).await?;
702                v.write(sink).await?;
703            }
704            Ok(())
705        })
706    }
707
708    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
709        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
710        let mut map = Self::default();
711        for _ in 0..len {
712            map.insert(K::read_sync(stream)?, V::read_sync(stream)?); //TODO use fallible allocation once available
713        }
714        Ok(map)
715    }
716
717    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
718        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
719        for (k, v) in self {
720            k.write_sync(sink)?;
721            v.write_sync(sink)?;
722        }
723        Ok(())
724    }
725}
726
727/// A map is prefixed with the length as a [`u64`].
728impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> Protocol for HashMap<K, V> {
729    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
730        Self::read_length_prefixed(stream, u64::MAX)
731    }
732
733    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
734        self.write_length_prefixed(sink, u64::MAX)
735    }
736
737    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
738        Self::read_length_prefixed_sync(stream, u64::MAX)
739    }
740
741    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
742        self.write_length_prefixed_sync(sink, u64::MAX)
743    }
744}
745
746impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> LengthPrefixed for HashMap<K, V> {
747    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
748        Box::pin(async move {
749            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
750            let mut map = Self::with_capacity(len); //TODO use fallible allocation once available
751            for _ in 0..len {
752                map.insert(K::read(stream).await?, V::read(stream).await?);
753            }
754            Ok(map)
755        })
756    }
757
758    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
759        Box::pin(async move {
760            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
761            for (k, v) in self {
762                k.write(sink).await?;
763                v.write(sink).await?;
764            }
765            Ok(())
766        })
767    }
768
769    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
770        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
771        let mut map = Self::with_capacity(len); //TODO use fallible allocation once available
772        for _ in 0..len {
773            map.insert(K::read_sync(stream)?, V::read_sync(stream)?);
774        }
775        Ok(map)
776    }
777
778    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
779        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
780        for (k, v) in self {
781            k.write_sync(sink)?;
782            v.write_sync(sink)?;
783        }
784        Ok(())
785    }
786}
787
788/// A cow is represented like its owned variant.
789///
790/// Note that due to a restriction in the type system, writing a borrowed cow requires cloning it.
791impl<'cow, B: ToOwned + Sync + ?Sized> Protocol for std::borrow::Cow<'cow, B>
792where B::Owned: Protocol + Send + Sync {
793    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
794        Box::pin(async move {
795            Ok(Self::Owned(B::Owned::read(stream).await?))
796        })
797    }
798
799    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
800        Box::pin(async move {
801            match self {
802                Self::Borrowed(borrowed) => (*borrowed).to_owned().write(sink).await?,
803                Self::Owned(owned) => owned.write(sink).await?,
804            }
805            Ok(())
806        })
807    }
808
809    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
810        Ok(Self::Owned(B::Owned::read_sync(stream)?))
811    }
812
813    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
814        match self {
815            Self::Borrowed(borrowed) => (*borrowed).to_owned().write_sync(sink)?,
816            Self::Owned(owned) => owned.write_sync(sink)?,
817        }
818        Ok(())
819    }
820}
821
822/// A cow is represented like its owned variant.
823///
824/// Note that due to a restriction in the type system, writing a borrowed cow requires cloning it.
825impl<'cow, B: ToOwned + Sync + ?Sized> LengthPrefixed for std::borrow::Cow<'cow, B>
826where B::Owned: LengthPrefixed + Send + Sync {
827    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
828        Box::pin(async move {
829            Ok(Self::Owned(B::Owned::read_length_prefixed(stream, max_len).await?))
830        })
831    }
832
833    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
834        Box::pin(async move {
835            match self {
836                Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed(sink, max_len).await?,
837                Self::Owned(owned) => owned.write_length_prefixed(sink, max_len).await?,
838            }
839            Ok(())
840        })
841    }
842
843    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
844        Ok(Self::Owned(B::Owned::read_length_prefixed_sync(stream, max_len)?))
845    }
846
847    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
848        match self {
849            Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed_sync(sink, max_len)?,
850            Self::Owned(owned) => owned.write_length_prefixed_sync(sink, max_len)?,
851        }
852        Ok(())
853    }
854}
855
/// Private stand-in for [`f32`]: the value's four bytes in big-endian order.
///
/// The `From` impls in this module convert between this type and `f32` with
/// `from_be_bytes`/`to_be_bytes`, and the `impl_protocol_for!` entry for `f32` names this type
/// in its `via` attribute.
#[derive(Protocol)]
#[async_proto(internal)]
struct F32Proxy([u8; 4]);
859
860impl From<F32Proxy> for f32 {
861    fn from(F32Proxy(bytes): F32Proxy) -> Self {
862        Self::from_be_bytes(bytes)
863    }
864}
865
866impl<'a> From<&'a f32> for F32Proxy {
867    fn from(val: &f32) -> Self {
868        Self(val.to_be_bytes())
869    }
870}
871
/// Private stand-in for [`f64`]: the value's eight bytes in big-endian order.
///
/// The `From` impls in this module convert between this type and `f64` with
/// `from_be_bytes`/`to_be_bytes`, and the `impl_protocol_for!` entry for `f64` names this type
/// in its `via` attribute.
#[derive(Protocol)]
#[async_proto(internal)]
struct F64Proxy([u8; 8]);
875
876impl From<F64Proxy> for f64 {
877    fn from(F64Proxy(bytes): F64Proxy) -> Self {
878        Self::from_be_bytes(bytes)
879    }
880}
881
882impl<'a> From<&'a f64> for F64Proxy {
883    fn from(val: &f64) -> Self {
884        Self(val.to_be_bytes())
885    }
886}
887
/// Private stand-in for [`std::time::Duration`], named by the `via` attribute of the
/// `impl_protocol_for!` entry for `Duration`.
///
/// The `From` impls in this module fill the fields from `Duration::as_secs` and
/// `Duration::subsec_nanos`, and rebuild a `Duration` from them.
#[derive(Protocol)]
#[async_proto(internal)]
struct DurationProxy {
    // Number of whole seconds.
    secs: u64,
    // Nanoseconds beyond the whole seconds.
    subsec_nanos: u32,
}
894
895impl From<DurationProxy> for std::time::Duration {
896    fn from(DurationProxy { secs, subsec_nanos }: DurationProxy) -> Self {
897        Self::new(secs, subsec_nanos)
898    }
899}
900
901impl<'a> From<&'a std::time::Duration> for DurationProxy {
902    fn from(duration: &std::time::Duration) -> Self {
903        Self {
904            secs: duration.as_secs(),
905            subsec_nanos: duration.subsec_nanos(),
906        }
907    }
908}
909
// `Protocol` implementations for std types, generated by `impl_protocol_for!` from
// `async_proto_derive`.
//
// Three kinds of entries appear below:
// * `type T;` entries carry `via = P`, naming another type `P` used to implement `Protocol` for `T`.
// * `struct`/`enum` entries restate the fields or variants of a std type.
// * `where(...)` lists the bounds required of the generic parameters.
//
// NOTE(review): the exact meaning of `via`, `clone`, `map_err`, `attr(...)` and `where(...)` is
// defined by the macro, which is not visible from this file — confirm there before changing entries.
// Only `//` comments are used inside the invocation so that no extra tokens reach the macro.
impl_protocol_for! {
    // Nonzero integers: each goes `via` the primitive of the same width and signedness. The
    // `map_err` closure discards the conversion error and reports `UnknownVariantN(0)`, with `N`
    // matching the bit width (presumably raised when the value read is zero — confirm).
    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
    type std::num::NonZeroU8;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
    type std::num::NonZeroI8;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
    type std::num::NonZeroU16;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
    type std::num::NonZeroI16;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
    type std::num::NonZeroU32;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
    type std::num::NonZeroI32;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
    type std::num::NonZeroU64;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
    type std::num::NonZeroI64;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
    type std::num::NonZeroU128;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
    type std::num::NonZeroI128;

    // Floats go via the byte-array proxies defined in this module (`F32Proxy`, `F64Proxy`), which
    // convert with `from_be_bytes`/`to_be_bytes`.
    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
    #[async_proto(via = F32Proxy)]
    type f32;

    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
    #[async_proto(via = F64Proxy)]
    type f64;

    // Range types, restated with their public fields.
    // NOTE(review): `Range` additionally requires `Idx: Send`, unlike the single-field ranges
    // below — presumably deliberate; confirm against the macro's generated code.
    #[async_proto(where(Idx: Protocol + Send + Sync))]
    struct Range<Idx> {
        start: Idx,
        end: Idx,
    }

    #[async_proto(where(Idx: Protocol + Sync))]
    struct RangeFrom<Idx> {
        start: Idx,
    }

    #[async_proto(where(Idx: Protocol + Sync))]
    struct RangeTo<Idx> {
        end: Idx,
    }

    #[async_proto(where(Idx: Protocol + Sync))]
    struct RangeToInclusive<Idx> {
        end: Idx,
    }

    // Std enums, restated with their variants.
    // NOTE(review): the listed variant order presumably determines the encoded discriminant —
    // confirm before reordering.
    #[async_proto(where(T: Protocol + Sync))]
    enum Option<T> {
        None,
        Some(T),
    }

    #[async_proto(where(T: Protocol + Sync, E: Protocol + Sync))]
    enum Result<T, E> {
        Ok(T),
        Err(E),
    }

    // An enum without variants.
    enum std::convert::Infallible {}

    // Field-less structs.
    #[async_proto(where(T: Sync))]
    struct std::marker::PhantomData<T>;

    struct std::ops::RangeFull;

    // `Duration` goes via `DurationProxy` (whole seconds as `u64`, subsecond nanoseconds as `u32`).
    #[async_proto(attr(doc = "A duration is represented as the number of whole seconds as a [`u64`] followed by the number of subsecond nanoseconds as a [`u32`]."))]
    #[async_proto(via = DurationProxy)]
    type std::time::Duration;
}