async_proto/impls/
mod.rs

1//! [`Protocol`] implementations for primitive and [`std`] types.
2
3#![allow(missing_docs)]
4
5use {
6    std::{
7        collections::{
8            BTreeMap,
9            BTreeSet,
10            HashMap,
11            HashSet,
12        },
13        convert::{
14            TryFrom as _,
15            TryInto as _,
16        },
17        future::Future,
18        hash::Hash,
19        io::prelude::*,
20        ops::{
21            Range,
22            RangeFrom,
23            RangeInclusive,
24            RangeTo,
25            RangeToInclusive,
26        },
27        pin::Pin,
28    },
29    byteorder::{
30        NetworkEndian,
31        ReadBytesExt as _,
32        WriteBytesExt as _,
33    },
34    fallible_collections::{
35        FallibleBox,
36        FallibleVec,
37    },
38    tokio::io::{
39        AsyncRead,
40        AsyncReadExt as _,
41        AsyncWrite,
42        AsyncWriteExt as _,
43    },
44    async_proto_derive::impl_protocol_for,
45    crate::{
46        ErrorContext,
47        LengthPrefixed,
48        Protocol,
49        ReadError,
50        ReadErrorKind,
51        WriteError,
52        WriteErrorKind,
53    },
54};
55
56#[cfg(feature = "bitvec")] mod bitvec;
57#[cfg(feature = "bytes")] mod bytes;
58#[cfg(feature = "bytesize")] mod bytesize;
59#[cfg(feature = "chrono")] mod chrono;
60#[cfg(feature = "chrono-tz")] mod chrono_tz;
61#[cfg(feature = "doubloon")] mod doubloon;
62#[cfg(feature = "either")] mod either;
63#[cfg(feature = "enumset")] mod enumset;
64#[cfg(feature = "git2")] mod git2;
65#[cfg(feature = "gix-hash")] mod gix_hash;
66#[cfg(feature = "noisy_float")] mod noisy_float;
67#[cfg(feature = "nonempty-collections")] mod nonempty_collections;
68#[cfg(feature = "rust_decimal")] mod rust_decimal;
69#[cfg(feature = "semver")] mod semver;
70#[cfg(feature = "serde_json")] mod serde_json;
71#[cfg(feature = "serenity")] mod serenity;
72#[cfg(feature = "url")] mod url;
73#[cfg(feature = "uuid")] mod uuid;
74
75async fn read_len<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
76    let len = match max_len {
77        0 => 0,
78        1..=255 => u8::read(stream).await?.into(),
79        256..=65_535 => u16::read(stream).await?.into(),
80        65_536..=4_294_967_295 => u32::read(stream).await?.into(),
81        _ => u64::read(stream).await?,
82    };
83    if len > max_len {
84        Err(ReadError {
85            context: error_ctx(),
86            kind: ReadErrorKind::MaxLen { len, max_len },
87        })
88    } else {
89        usize::try_from(len).map_err(|e| ReadError {
90            context: error_ctx(),
91            kind: e.into(),
92        })
93    }
94}
95
96async fn write_len<'a, W: AsyncWrite + Unpin + Send + 'a>(sink: &'a mut W, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
97    let len = u64::try_from(len).map_err(|e| WriteError {
98        context: error_ctx(),
99        kind: e.into(),
100    })?;
101    if len > max_len {
102        return Err(WriteError {
103            context: error_ctx(),
104            kind: WriteErrorKind::MaxLen { len, max_len },
105        })
106    }
107    match max_len {
108        0 => {}
109        1..=255 => (len as u8).write(sink).await?,
110        256..=65_535 => (len as u16).write(sink).await?,
111        65_536..=4_294_967_295 => (len as u32).write(sink).await?,
112        _ => len.write(sink).await?,
113    }
114    Ok(())
115}
116
117fn read_len_sync(stream: &mut impl Read, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
118    let len = match max_len {
119        0 => 0,
120        1..=255 => u8::read_sync(stream)?.into(),
121        256..=65_535 => u16::read_sync(stream)?.into(),
122        65_536..=4_294_967_295 => u32::read_sync(stream)?.into(),
123        _ => u64::read_sync(stream)?,
124    };
125    if len > max_len {
126        Err(ReadError {
127            context: error_ctx(),
128            kind: ReadErrorKind::MaxLen { len, max_len },
129        })
130    } else {
131        usize::try_from(len).map_err(|e| ReadError {
132            context: error_ctx(),
133            kind: e.into(),
134        })
135    }
136}
137
138fn write_len_sync(sink: &mut impl Write, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
139    let len = u64::try_from(len).map_err(|e| WriteError {
140        context: error_ctx(),
141        kind: e.into(),
142    })?;
143    if len > max_len {
144        return Err(WriteError {
145            context: error_ctx(),
146            kind: WriteErrorKind::MaxLen { len, max_len },
147        })
148    }
149    match max_len {
150        0 => {}
151        1..=255 => (len as u8).write_sync(sink)?,
152        256..=65_535 => (len as u16).write_sync(sink)?,
153        65_536..=4_294_967_295 => (len as u32).write_sync(sink)?,
154        _ => len.write_sync(sink)?,
155    }
156    Ok(())
157}
158
159macro_rules! impl_protocol_primitive {
160    ($ty:ty, $read:ident, $write:ident$(, $endian:ty)?) => {
161        /// Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format.
162        impl Protocol for $ty {
163            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
164                Box::pin(async move {
165                    Ok(stream.$read().await.map_err(|e| ReadError {
166                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
167                        kind: e.into(),
168                    })?)
169                })
170            }
171
172            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
173                Box::pin(async move {
174                    Ok(sink.$write(*self).await.map_err(|e| WriteError {
175                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
176                        kind: e.into(),
177                    })?)
178                })
179            }
180
181            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
182                Ok(stream.$read$(::<$endian>)?().map_err(|e| ReadError {
183                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
184                    kind: e.into(),
185                })?)
186            }
187
188            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
189                Ok(sink.$write$(::<$endian>)?(*self).map_err(|e| WriteError {
190                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
191                    kind: e.into(),
192                })?)
193            }
194        }
195    };
196}
197
198impl_protocol_primitive!(u8, read_u8, write_u8);
199impl_protocol_primitive!(i8, read_i8, write_i8);
200impl_protocol_primitive!(u16, read_u16, write_u16, NetworkEndian);
201impl_protocol_primitive!(i16, read_i16, write_i16, NetworkEndian);
202impl_protocol_primitive!(u32, read_u32, write_u32, NetworkEndian);
203impl_protocol_primitive!(i32, read_i32, write_i32, NetworkEndian);
204impl_protocol_primitive!(u64, read_u64, write_u64, NetworkEndian);
205impl_protocol_primitive!(i64, read_i64, write_i64, NetworkEndian);
206impl_protocol_primitive!(u128, read_u128, write_u128, NetworkEndian);
207impl_protocol_primitive!(i128, read_i128, write_i128, NetworkEndian);
208
209impl<Idx: Protocol + Send + Sync> Protocol for RangeInclusive<Idx> { //TODO derive
210    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
211        Box::pin(async move {
212            Ok(Idx::read(stream).await?..=Idx::read(stream).await?)
213        })
214    }
215
216    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
217        Box::pin(async move {
218            self.start().write(sink).await?;
219            self.end().write(sink).await?;
220            Ok(())
221        })
222    }
223
224    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
225        Ok(Idx::read_sync(stream)?..=Idx::read_sync(stream)?)
226    }
227
228    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
229        self.start().write_sync(sink)?;
230        self.end().write_sync(sink)?;
231        Ok(())
232    }
233}
234
235macro_rules! impl_protocol_tuple {
236    ($($ty:ident),*) => {
237        #[allow(unused)]
238        impl<$($ty: Protocol + Send + Sync),*> Protocol for ($($ty,)*) {
239            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
240                Box::pin(async move {
241                    Ok((
242                        $($ty::read(stream).await?,)*
243                    ))
244                })
245            }
246
247            #[allow(non_snake_case)]
248            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
249                Box::pin(async move {
250                    let ($($ty,)*) = self;
251                    $(
252                        $ty.write(sink).await?;
253                    )*
254                    Ok(())
255                })
256            }
257
258            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
259                Ok((
260                    $($ty::read_sync(stream)?,)*
261                ))
262            }
263
264            #[allow(non_snake_case)]
265            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
266                let ($($ty,)*) = self;
267                $(
268                    $ty.write_sync(sink)?;
269                )*
270                Ok(())
271            }
272        }
273    };
274}
275
276impl_protocol_tuple!();
277impl_protocol_tuple!(A);
278impl_protocol_tuple!(A, B);
279impl_protocol_tuple!(A, B, C);
280impl_protocol_tuple!(A, B, C, D);
281impl_protocol_tuple!(A, B, C, D, E);
282impl_protocol_tuple!(A, B, C, D, E, F);
283impl_protocol_tuple!(A, B, C, D, E, F, G);
284impl_protocol_tuple!(A, B, C, D, E, F, G, H);
285impl_protocol_tuple!(A, B, C, D, E, F, G, H, I);
286impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J);
287impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K);
288impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K, L);
289
290impl<T: Protocol + Send + Sync, const N: usize> Protocol for [T; N] {
291    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
292        Box::pin(async move {
293            let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
294                context: ErrorContext::BuiltIn { for_type: "[T; N]" },
295                kind: e.into(),
296            })?;
297            for _ in 0..N {
298                vec.push(T::read(stream).await?);
299            }
300            Ok(match vec.try_into() {
301                Ok(array) => array,
302                Err(_) => panic!("wrong array length"),
303            })
304        })
305    }
306
307    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
308        Box::pin(async move {
309            for elt in self {
310                elt.write(sink).await?;
311            }
312            Ok(())
313        })
314    }
315
316    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
317        let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
318            context: ErrorContext::BuiltIn { for_type: "[T; N]" },
319            kind: e.into(),
320        })?;
321        for _ in 0..N {
322            vec.push(T::read_sync(stream)?);
323        }
324        Ok(match vec.try_into() {
325            Ok(array) => array,
326            Err(_) => panic!("wrong array length"),
327        })
328    }
329
330    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
331        for elt in self {
332            elt.write_sync(sink)?;
333        }
334        Ok(())
335    }
336}
337
338/// Represented as one byte, with `0` for `false` and `1` for `true`.
339impl Protocol for bool {
340    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
341        Box::pin(async move {
342            Ok(match u8::read(stream).await? {
343                0 => false,
344                1 => true,
345                n => return Err(ReadError {
346                    context: ErrorContext::BuiltIn { for_type: "bool" },
347                    kind: ReadErrorKind::UnknownVariant8(n),
348                }),
349            })
350        })
351    }
352
353    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
354        Box::pin(async move {
355            if *self { 1u8 } else { 0 }.write(sink).await
356        })
357    }
358
359    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
360        Ok(match u8::read_sync(stream)? {
361            0 => false,
362            1 => true,
363            n => return Err(ReadError {
364                context: ErrorContext::BuiltIn { for_type: "bool" },
365                kind: ReadErrorKind::UnknownVariant8(n),
366            }),
367        })
368    }
369
370    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
371        if *self { 1u8 } else { 0 }.write_sync(sink)
372    }
373}
374
375impl<T: Protocol> Protocol for Box<T> {
376    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
377        Box::pin(async move {
378            Ok(<Box<_> as FallibleBox<_>>::try_new(T::read(stream).await?).map_err(|e| ReadError {
379                context: ErrorContext::BuiltIn { for_type: "Box" },
380                kind: e.into(),
381            })?)
382        })
383    }
384
385    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
386        (**self).write(sink)
387    }
388
389    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
390        Ok(<Box<_> as FallibleBox<_>>::try_new(T::read_sync(stream)?).map_err(|e| ReadError {
391            context: ErrorContext::BuiltIn { for_type: "Box" },
392            kind: e.into(),
393        })?)
394    }
395
396    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
397        (**self).write_sync(sink)
398    }
399}
400
401/// A vector is prefixed with the length as a [`u64`].
402///
403/// Note that due to Rust's lack of [specialization](https://github.com/rust-lang/rust/issues/31844), this implementation is inefficient for `Vec<u8>`.
404/// Prefer [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) if possible.
405impl<T: Protocol + Send + Sync> Protocol for Vec<T> {
406        fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
407        Self::read_length_prefixed(stream, u64::MAX)
408    }
409
410    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
411        self.write_length_prefixed(sink, u64::MAX)
412    }
413
414    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
415        Self::read_length_prefixed_sync(stream, u64::MAX)
416    }
417
418    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
419        self.write_length_prefixed_sync(sink, u64::MAX)
420    }
421}
422
423/// Note that due to Rust's lack of [specialization](https://github.com/rust-lang/rust/issues/31844), this implementation is inefficient for `Vec<u8>`.
424/// Prefer [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) if possible.
425impl<T: Protocol + Send + Sync> LengthPrefixed for Vec<T> {
426    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
427        Box::pin(async move {
428            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
429            let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
430                context: ErrorContext::BuiltIn { for_type: "Vec" },
431                kind: e.into(),
432            })?;
433            for _ in 0..len {
434                buf.push(T::read(stream).await?);
435            }
436            Ok(buf)
437        })
438    }
439
440    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
441        Box::pin(async move {
442            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
443            for elt in self {
444                elt.write(sink).await?;
445            }
446            Ok(())
447        })
448    }
449
450    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
451        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
452        let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
453            context: ErrorContext::BuiltIn { for_type: "Vec" },
454            kind: e.into(),
455        })?;
456        for _ in 0..len {
457            buf.push(T::read_sync(stream)?);
458        }
459        Ok(buf)
460    }
461
462    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
463        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
464        for elt in self {
465            elt.write_sync(sink)?;
466        }
467        Ok(())
468    }
469}
470
471/// A set is prefixed with the length as a [`u64`].
472impl<T: Protocol + Ord + Send + Sync + 'static> Protocol for BTreeSet<T> {
473        fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
474        Self::read_length_prefixed(stream, u64::MAX)
475    }
476
477    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
478        self.write_length_prefixed(sink, u64::MAX)
479    }
480
481    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
482        Self::read_length_prefixed_sync(stream, u64::MAX)
483    }
484
485    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
486        self.write_length_prefixed_sync(sink, u64::MAX)
487    }
488}
489
490impl<T: Protocol + Ord + Send + Sync + 'static> LengthPrefixed for BTreeSet<T> {
491    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
492        Box::pin(async move {
493            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
494            let mut set = Self::default();
495            for _ in 0..len {
496                set.insert(T::read(stream).await?); //TODO use fallible allocation once available
497            }
498            Ok(set)
499        })
500    }
501
502    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
503        Box::pin(async move {
504            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
505            for elt in self {
506                elt.write(sink).await?;
507            }
508            Ok(())
509        })
510    }
511
512    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
513        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;        let mut set = Self::default();
514        for _ in 0..len {
515            set.insert(T::read_sync(stream)?); //TODO use fallible allocation once available
516        }
517        Ok(set)
518    }
519
520    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
521        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;
522        for elt in self {
523            elt.write_sync(sink)?;
524        }
525        Ok(())
526    }
527}
528
529/// A set is prefixed with the length as a [`u64`].
530impl<T: Protocol + Eq + Hash + Send + Sync> Protocol for HashSet<T> {
531    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
532        Self::read_length_prefixed(stream, u64::MAX)
533    }
534
535    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
536        self.write_length_prefixed(sink, u64::MAX)
537    }
538
539    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
540        Self::read_length_prefixed_sync(stream, u64::MAX)
541    }
542
543    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
544        self.write_length_prefixed_sync(sink, u64::MAX)
545    }
546}
547
548impl<T: Protocol + Eq + Hash + Send + Sync> LengthPrefixed for HashSet<T> {
549    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
550        Box::pin(async move {
551            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
552            let mut set = Self::with_capacity(len); //TODO use fallible allocation once available
553            for _ in 0..len {
554                set.insert(T::read(stream).await?);
555            }
556            Ok(set)
557        })
558    }
559
560    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
561        Box::pin(async move {
562            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
563            for elt in self {
564                elt.write(sink).await?;
565            }
566            Ok(())
567        })
568    }
569
570    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
571        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
572        let mut set = Self::with_capacity(len); //TODO use fallible allocation once available
573        for _ in 0..len {
574            set.insert(T::read_sync(stream)?);
575        }
576        Ok(set)
577    }
578
579    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
580        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
581        for elt in self {
582            elt.write_sync(sink)?;
583        }
584        Ok(())
585    }
586}
587
588/// A string is encoded in UTF-8 and prefixed with the length in bytes as a [`u64`].
589impl Protocol for String {
590    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
591        Self::read_length_prefixed(stream, u64::MAX)
592    }
593
594    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
595        self.write_length_prefixed(sink, u64::MAX)
596    }
597
598    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
599        Self::read_length_prefixed_sync(stream, u64::MAX)
600    }
601
602    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
603        self.write_length_prefixed_sync(sink, u64::MAX)
604    }
605}
606
607/// A string is encoded in UTF-8 and prefixed with the length in bytes.
608impl LengthPrefixed for String {
609    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
610        Box::pin(async move {
611            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
612            let mut buf = Vec::default();
613            buf.try_resize(len, 0).map_err(|e| ReadError {
614                context: ErrorContext::BuiltIn { for_type: "String" },
615                kind: e.into(),
616            })?;
617            stream.read_exact(&mut buf).await.map_err(|e| ReadError {
618                context: ErrorContext::BuiltIn { for_type: "String" },
619                kind: e.into(),
620            })?;
621            Ok(Self::from_utf8(buf).map_err(|e| ReadError {
622                context: ErrorContext::BuiltIn { for_type: "String" },
623                kind: e.into(),
624            })?)
625        })
626    }
627
628    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
629        Box::pin(async move {
630            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
631            sink.write(self.as_bytes()).await.map_err(|e| WriteError {
632                context: ErrorContext::BuiltIn { for_type: "String" },
633                kind: e.into(),
634            })?;
635            Ok(())
636        })
637    }
638
639    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
640        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
641        let mut buf = Vec::default();
642        buf.try_resize(len, 0).map_err(|e| ReadError {
643            context: ErrorContext::BuiltIn { for_type: "String" },
644            kind: e.into(),
645        })?;
646        stream.read_exact(&mut buf).map_err(|e| ReadError {
647            context: ErrorContext::BuiltIn { for_type: "String" },
648            kind: e.into(),
649        })?;
650        Ok(Self::from_utf8(buf).map_err(|e| ReadError {
651            context: ErrorContext::BuiltIn { for_type: "String" },
652            kind: e.into(),
653        })?)
654    }
655
656    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
657        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
658        sink.write(self.as_bytes()).map_err(|e| WriteError {
659            context: ErrorContext::BuiltIn { for_type: "String" },
660            kind: e.into(),
661        })?;
662        Ok(())
663    }
664}
665
666impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> Protocol for BTreeMap<K, V> {
667    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
668        Self::read_length_prefixed(stream, u64::MAX)
669    }
670
671    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
672        self.write_length_prefixed(sink, u64::MAX)
673    }
674
675    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
676        Self::read_length_prefixed_sync(stream, u64::MAX)
677    }
678
679    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
680        self.write_length_prefixed_sync(sink, u64::MAX)
681    }
682}
683
684impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> LengthPrefixed for BTreeMap<K, V> {
685    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
686        Box::pin(async move {
687            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
688            let mut map = Self::default();
689            for _ in 0..len {
690                map.insert(K::read(stream).await?, V::read(stream).await?); //TODO use fallible allocation once available
691            }
692            Ok(map)
693        })
694    }
695
696    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
697        Box::pin(async move {
698            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
699            for (k, v) in self {
700                k.write(sink).await?;
701                v.write(sink).await?;
702            }
703            Ok(())
704        })
705    }
706
707    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
708        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
709        let mut map = Self::default();
710        for _ in 0..len {
711            map.insert(K::read_sync(stream)?, V::read_sync(stream)?); //TODO use fallible allocation once available
712        }
713        Ok(map)
714    }
715
716    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
717        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
718        for (k, v) in self {
719            k.write_sync(sink)?;
720            v.write_sync(sink)?;
721        }
722        Ok(())
723    }
724}
725
726/// A map is prefixed with the length as a [`u64`].
727impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> Protocol for HashMap<K, V> {
728    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
729        Self::read_length_prefixed(stream, u64::MAX)
730    }
731
732    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
733        self.write_length_prefixed(sink, u64::MAX)
734    }
735
736    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
737        Self::read_length_prefixed_sync(stream, u64::MAX)
738    }
739
740    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
741        self.write_length_prefixed_sync(sink, u64::MAX)
742    }
743}
744
745impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> LengthPrefixed for HashMap<K, V> {
746    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
747        Box::pin(async move {
748            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
749            let mut map = Self::with_capacity(len); //TODO use fallible allocation once available
750            for _ in 0..len {
751                map.insert(K::read(stream).await?, V::read(stream).await?);
752            }
753            Ok(map)
754        })
755    }
756
757    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
758        Box::pin(async move {
759            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
760            for (k, v) in self {
761                k.write(sink).await?;
762                v.write(sink).await?;
763            }
764            Ok(())
765        })
766    }
767
768    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
769        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
770        let mut map = Self::with_capacity(len); //TODO use fallible allocation once available
771        for _ in 0..len {
772            map.insert(K::read_sync(stream)?, V::read_sync(stream)?);
773        }
774        Ok(map)
775    }
776
777    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
778        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
779        for (k, v) in self {
780            k.write_sync(sink)?;
781            v.write_sync(sink)?;
782        }
783        Ok(())
784    }
785}
786
787/// A cow is represented like its owned variant.
788///
789/// Note that due to a restriction in the type system, writing a borrowed cow requires cloning it.
790impl<'cow, B: ToOwned + Sync + ?Sized> Protocol for std::borrow::Cow<'cow, B>
791where B::Owned: Protocol + Send + Sync {
792    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
793        Box::pin(async move {
794            Ok(Self::Owned(B::Owned::read(stream).await?))
795        })
796    }
797
798    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
799        Box::pin(async move {
800            match self {
801                Self::Borrowed(borrowed) => (*borrowed).to_owned().write(sink).await?,
802                Self::Owned(owned) => owned.write(sink).await?,
803            }
804            Ok(())
805        })
806    }
807
808    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
809        Ok(Self::Owned(B::Owned::read_sync(stream)?))
810    }
811
812    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
813        match self {
814            Self::Borrowed(borrowed) => (*borrowed).to_owned().write_sync(sink)?,
815            Self::Owned(owned) => owned.write_sync(sink)?,
816        }
817        Ok(())
818    }
819}
820
821/// A cow is represented like its owned variant.
822///
823/// Note that due to a restriction in the type system, writing a borrowed cow requires cloning it.
824impl<'cow, B: ToOwned + Sync + ?Sized> LengthPrefixed for std::borrow::Cow<'cow, B>
825where B::Owned: LengthPrefixed + Send + Sync {
826    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
827        Box::pin(async move {
828            Ok(Self::Owned(B::Owned::read_length_prefixed(stream, max_len).await?))
829        })
830    }
831
832    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
833        Box::pin(async move {
834            match self {
835                Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed(sink, max_len).await?,
836                Self::Owned(owned) => owned.write_length_prefixed(sink, max_len).await?,
837            }
838            Ok(())
839        })
840    }
841
842    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
843        Ok(Self::Owned(B::Owned::read_length_prefixed_sync(stream, max_len)?))
844    }
845
846    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
847        match self {
848            Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed_sync(sink, max_len)?,
849            Self::Owned(owned) => owned.write_length_prefixed_sync(sink, max_len)?,
850        }
851        Ok(())
852    }
853}
854
855#[derive(Protocol)]
856#[async_proto(internal)]
857struct F32Proxy([u8; 4]);
858
859impl From<F32Proxy> for f32 {
860    fn from(F32Proxy(bytes): F32Proxy) -> Self {
861        Self::from_be_bytes(bytes)
862    }
863}
864
865impl<'a> From<&'a f32> for F32Proxy {
866    fn from(val: &f32) -> Self {
867        Self(val.to_be_bytes())
868    }
869}
870
871#[derive(Protocol)]
872#[async_proto(internal)]
873struct F64Proxy([u8; 8]);
874
875impl From<F64Proxy> for f64 {
876    fn from(F64Proxy(bytes): F64Proxy) -> Self {
877        Self::from_be_bytes(bytes)
878    }
879}
880
881impl<'a> From<&'a f64> for F64Proxy {
882    fn from(val: &f64) -> Self {
883        Self(val.to_be_bytes())
884    }
885}
886
887#[derive(Protocol)]
888#[async_proto(internal)]
889struct DurationProxy {
890    secs: u64,
891    subsec_nanos: u32,
892}
893
894impl From<DurationProxy> for std::time::Duration {
895    fn from(DurationProxy { secs, subsec_nanos }: DurationProxy) -> Self {
896        Self::new(secs, subsec_nanos)
897    }
898}
899
900impl<'a> From<&'a std::time::Duration> for DurationProxy {
901    fn from(duration: &std::time::Duration) -> Self {
902        Self {
903            secs: duration.as_secs(),
904            subsec_nanos: duration.subsec_nanos(),
905        }
906    }
907}
908
909impl_protocol_for! {
910    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
911    #[async_proto(via = u8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
912    type std::num::NonZeroU8;
913
914    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
915    #[async_proto(via = i8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
916    type std::num::NonZeroI8;
917
918    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
919    #[async_proto(via = u16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
920    type std::num::NonZeroU16;
921
922    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
923    #[async_proto(via = i16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
924    type std::num::NonZeroI16;
925
926    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
927    #[async_proto(via = u32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
928    type std::num::NonZeroU32;
929
930    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
931    #[async_proto(via = i32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
932    type std::num::NonZeroI32;
933
934    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
935    #[async_proto(via = u64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
936    type std::num::NonZeroU64;
937
938    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
939    #[async_proto(via = i64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
940    type std::num::NonZeroI64;
941
942    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
943    #[async_proto(via = u128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
944    type std::num::NonZeroU128;
945
946    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
947    #[async_proto(via = i128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
948    type std::num::NonZeroI128;
949
950    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
951    #[async_proto(via = F32Proxy)]
952    type f32;
953
954    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
955    #[async_proto(via = F64Proxy)]
956    type f64;
957
958    #[async_proto(where(Idx: Protocol + Send + Sync))]
959    struct Range<Idx> {
960        start: Idx,
961        end: Idx,
962    }
963
964    #[async_proto(where(Idx: Protocol + Sync))]
965    struct RangeFrom<Idx> {
966        start: Idx,
967    }
968
969    #[async_proto(where(Idx: Protocol + Sync))]
970    struct RangeTo<Idx> {
971        end: Idx,
972    }
973
974    #[async_proto(where(Idx: Protocol + Sync))]
975    struct RangeToInclusive<Idx> {
976        end: Idx,
977    }
978
979    #[async_proto(where(T: Protocol + Sync))]
980    enum Option<T> {
981        None,
982        Some(T),
983    }
984
985    #[async_proto(where(T: Protocol + Sync, E: Protocol + Sync))]
986    enum Result<T, E> {
987        Ok(T),
988        Err(E),
989    }
990
991    enum std::convert::Infallible {}
992
993    #[async_proto(where(T: Sync))]
994    struct std::marker::PhantomData<T>;
995
996    struct std::ops::RangeFull;
997
998    #[async_proto(attr(doc = "A duration is represented as the number of whole seconds as a [`u64`] followed by the number of subsecond nanoseconds as a [`u32`]."))]
999    #[async_proto(via = DurationProxy)]
1000    type std::time::Duration;
1001}