async_proto/impls/
mod.rs

1//! [`Protocol`] implementations for primitive and [`std`] types.
2
3#![allow(missing_docs)]
4
5use {
6    std::{
7        collections::{
8            BTreeMap,
9            BTreeSet,
10            HashMap,
11            HashSet,
12        },
13        convert::{
14            TryFrom as _,
15            TryInto as _,
16        },
17        future::Future,
18        hash::Hash,
19        io::prelude::*,
20        ops::{
21            Range,
22            RangeFrom,
23            RangeInclusive,
24            RangeTo,
25            RangeToInclusive,
26        },
27        pin::Pin,
28    },
29    byteorder::{
30        NetworkEndian,
31        ReadBytesExt as _,
32        WriteBytesExt as _,
33    },
34    fallible_collections::{
35        FallibleBox,
36        FallibleVec,
37    },
38    tokio::io::{
39        AsyncRead,
40        AsyncReadExt as _,
41        AsyncWrite,
42        AsyncWriteExt as _,
43    },
44    async_proto_derive::impl_protocol_for,
45    crate::{
46        ErrorContext,
47        LengthPrefixed,
48        Protocol,
49        ReadError,
50        ReadErrorKind,
51        WriteError,
52        WriteErrorKind,
53    },
54};
55
56#[cfg(feature = "bitvec")] mod bitvec;
57#[cfg(feature = "bytes")] mod bytes;
58#[cfg(feature = "bytesize")] mod bytesize;
59#[cfg(feature = "chrono")] mod chrono;
60#[cfg(feature = "chrono-tz")] mod chrono_tz;
61#[cfg(feature = "doubloon")] mod doubloon;
62#[cfg(feature = "either")] mod either;
63#[cfg(feature = "enumset")] mod enumset;
64#[cfg(feature = "git2")] mod git2;
65#[cfg(feature = "gix-hash")] mod gix_hash;
66#[cfg(feature = "noisy_float")] mod noisy_float;
67#[cfg(feature = "rust_decimal")] mod rust_decimal;
68#[cfg(feature = "semver")] mod semver;
69#[cfg(feature = "serde_json")] mod serde_json;
70#[cfg(feature = "serenity")] mod serenity;
71#[cfg(feature = "uuid")] mod uuid;
72
73async fn read_len<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
74    let len = match max_len {
75        0 => 0,
76        1..=255 => u8::read(stream).await?.into(),
77        256..=65_535 => u16::read(stream).await?.into(),
78        65_536..=4_294_967_295 => u32::read(stream).await?.into(),
79        _ => u64::read(stream).await?,
80    };
81    if len > max_len {
82        Err(ReadError {
83            context: error_ctx(),
84            kind: ReadErrorKind::MaxLen { len, max_len },
85        })
86    } else {
87        usize::try_from(len).map_err(|e| ReadError {
88            context: error_ctx(),
89            kind: e.into(),
90        })
91    }
92}
93
94async fn write_len<'a, W: AsyncWrite + Unpin + Send + 'a>(sink: &'a mut W, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
95    let len = u64::try_from(len).map_err(|e| WriteError {
96        context: error_ctx(),
97        kind: e.into(),
98    })?;
99    if len > max_len {
100        return Err(WriteError {
101            context: error_ctx(),
102            kind: WriteErrorKind::MaxLen { len, max_len },
103        })
104    }
105    match max_len {
106        0 => {}
107        1..=255 => (len as u8).write(sink).await?,
108        256..=65_535 => (len as u16).write(sink).await?,
109        65_536..=4_294_967_295 => (len as u32).write(sink).await?,
110        _ => len.write(sink).await?,
111    }
112    Ok(())
113}
114
115fn read_len_sync(stream: &mut impl Read, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
116    let len = match max_len {
117        0 => 0,
118        1..=255 => u8::read_sync(stream)?.into(),
119        256..=65_535 => u16::read_sync(stream)?.into(),
120        65_536..=4_294_967_295 => u32::read_sync(stream)?.into(),
121        _ => u64::read_sync(stream)?,
122    };
123    if len > max_len {
124        Err(ReadError {
125            context: error_ctx(),
126            kind: ReadErrorKind::MaxLen { len, max_len },
127        })
128    } else {
129        usize::try_from(len).map_err(|e| ReadError {
130            context: error_ctx(),
131            kind: e.into(),
132        })
133    }
134}
135
136fn write_len_sync(sink: &mut impl Write, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
137    let len = u64::try_from(len).map_err(|e| WriteError {
138        context: error_ctx(),
139        kind: e.into(),
140    })?;
141    if len > max_len {
142        return Err(WriteError {
143            context: error_ctx(),
144            kind: WriteErrorKind::MaxLen { len, max_len },
145        })
146    }
147    match max_len {
148        0 => {}
149        1..=255 => (len as u8).write_sync(sink)?,
150        256..=65_535 => (len as u16).write_sync(sink)?,
151        65_536..=4_294_967_295 => (len as u32).write_sync(sink)?,
152        _ => len.write_sync(sink)?,
153    }
154    Ok(())
155}
156
157macro_rules! impl_protocol_primitive {
158    ($ty:ty, $read:ident, $write:ident$(, $endian:ty)?) => {
159        /// Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format.
160        impl Protocol for $ty {
161            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
162                Box::pin(async move {
163                    Ok(stream.$read().await.map_err(|e| ReadError {
164                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
165                        kind: e.into(),
166                    })?)
167                })
168            }
169
170            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
171                Box::pin(async move {
172                    Ok(sink.$write(*self).await.map_err(|e| WriteError {
173                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
174                        kind: e.into(),
175                    })?)
176                })
177            }
178
179            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
180                Ok(stream.$read$(::<$endian>)?().map_err(|e| ReadError {
181                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
182                    kind: e.into(),
183                })?)
184            }
185
186            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
187                Ok(sink.$write$(::<$endian>)?(*self).map_err(|e| WriteError {
188                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
189                    kind: e.into(),
190                })?)
191            }
192        }
193    };
194}
195
196impl_protocol_primitive!(u8, read_u8, write_u8);
197impl_protocol_primitive!(i8, read_i8, write_i8);
198impl_protocol_primitive!(u16, read_u16, write_u16, NetworkEndian);
199impl_protocol_primitive!(i16, read_i16, write_i16, NetworkEndian);
200impl_protocol_primitive!(u32, read_u32, write_u32, NetworkEndian);
201impl_protocol_primitive!(i32, read_i32, write_i32, NetworkEndian);
202impl_protocol_primitive!(u64, read_u64, write_u64, NetworkEndian);
203impl_protocol_primitive!(i64, read_i64, write_i64, NetworkEndian);
204impl_protocol_primitive!(u128, read_u128, write_u128, NetworkEndian);
205impl_protocol_primitive!(i128, read_i128, write_i128, NetworkEndian);
206
207impl<Idx: Protocol + Send + Sync> Protocol for RangeInclusive<Idx> { //TODO derive
208    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
209        Box::pin(async move {
210            Ok(Idx::read(stream).await?..=Idx::read(stream).await?)
211        })
212    }
213
214    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
215        Box::pin(async move {
216            self.start().write(sink).await?;
217            self.end().write(sink).await?;
218            Ok(())
219        })
220    }
221
222    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
223        Ok(Idx::read_sync(stream)?..=Idx::read_sync(stream)?)
224    }
225
226    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
227        self.start().write_sync(sink)?;
228        self.end().write_sync(sink)?;
229        Ok(())
230    }
231}
232
233macro_rules! impl_protocol_tuple {
234    ($($ty:ident),*) => {
235        #[allow(unused)]
236        impl<$($ty: Protocol + Send + Sync),*> Protocol for ($($ty,)*) {
237            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
238                Box::pin(async move {
239                    Ok((
240                        $($ty::read(stream).await?,)*
241                    ))
242                })
243            }
244
245            #[allow(non_snake_case)]
246            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
247                Box::pin(async move {
248                    let ($($ty,)*) = self;
249                    $(
250                        $ty.write(sink).await?;
251                    )*
252                    Ok(())
253                })
254            }
255
256            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
257                Ok((
258                    $($ty::read_sync(stream)?,)*
259                ))
260            }
261
262            #[allow(non_snake_case)]
263            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
264                let ($($ty,)*) = self;
265                $(
266                    $ty.write_sync(sink)?;
267                )*
268                Ok(())
269            }
270        }
271    };
272}
273
274impl_protocol_tuple!();
275impl_protocol_tuple!(A);
276impl_protocol_tuple!(A, B);
277impl_protocol_tuple!(A, B, C);
278impl_protocol_tuple!(A, B, C, D);
279impl_protocol_tuple!(A, B, C, D, E);
280impl_protocol_tuple!(A, B, C, D, E, F);
281impl_protocol_tuple!(A, B, C, D, E, F, G);
282impl_protocol_tuple!(A, B, C, D, E, F, G, H);
283impl_protocol_tuple!(A, B, C, D, E, F, G, H, I);
284impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J);
285impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K);
286impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K, L);
287
288impl<T: Protocol + Send + Sync, const N: usize> Protocol for [T; N] {
289    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
290        Box::pin(async move {
291            let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
292                context: ErrorContext::BuiltIn { for_type: "[T; N]" },
293                kind: e.into(),
294            })?;
295            for _ in 0..N {
296                vec.push(T::read(stream).await?);
297            }
298            Ok(match vec.try_into() {
299                Ok(array) => array,
300                Err(_) => panic!("wrong array length"),
301            })
302        })
303    }
304
305    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
306        Box::pin(async move {
307            for elt in self {
308                elt.write(sink).await?;
309            }
310            Ok(())
311        })
312    }
313
314    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
315        let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
316            context: ErrorContext::BuiltIn { for_type: "[T; N]" },
317            kind: e.into(),
318        })?;
319        for _ in 0..N {
320            vec.push(T::read_sync(stream)?);
321        }
322        Ok(match vec.try_into() {
323            Ok(array) => array,
324            Err(_) => panic!("wrong array length"),
325        })
326    }
327
328    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
329        for elt in self {
330            elt.write_sync(sink)?;
331        }
332        Ok(())
333    }
334}
335
336/// Represented as one byte, with `0` for `false` and `1` for `true`.
337impl Protocol for bool {
338    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
339        Box::pin(async move {
340            Ok(match u8::read(stream).await? {
341                0 => false,
342                1 => true,
343                n => return Err(ReadError {
344                    context: ErrorContext::BuiltIn { for_type: "bool" },
345                    kind: ReadErrorKind::UnknownVariant8(n),
346                }),
347            })
348        })
349    }
350
351    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
352        Box::pin(async move {
353            if *self { 1u8 } else { 0 }.write(sink).await
354        })
355    }
356
357    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
358        Ok(match u8::read_sync(stream)? {
359            0 => false,
360            1 => true,
361            n => return Err(ReadError {
362                context: ErrorContext::BuiltIn { for_type: "bool" },
363                kind: ReadErrorKind::UnknownVariant8(n),
364            }),
365        })
366    }
367
368    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
369        if *self { 1u8 } else { 0 }.write_sync(sink)
370    }
371}
372
373impl<T: Protocol> Protocol for Box<T> {
374    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
375        Box::pin(async move {
376            Ok(<Box<_> as FallibleBox<_>>::try_new(T::read(stream).await?).map_err(|e| ReadError {
377                context: ErrorContext::BuiltIn { for_type: "Box" },
378                kind: e.into(),
379            })?)
380        })
381    }
382
383    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
384        (**self).write(sink)
385    }
386
387    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
388        Ok(<Box<_> as FallibleBox<_>>::try_new(T::read_sync(stream)?).map_err(|e| ReadError {
389            context: ErrorContext::BuiltIn { for_type: "Box" },
390            kind: e.into(),
391        })?)
392    }
393
394    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
395        (**self).write_sync(sink)
396    }
397}
398
399/// A vector is prefixed with the length as a [`u64`].
400///
401/// Note that due to Rust's lack of [specialization](https://github.com/rust-lang/rust/issues/31844), this implementation is inefficient for `Vec<u8>`.
402/// Prefer [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) if possible.
403impl<T: Protocol + Send + Sync> Protocol for Vec<T> {
404        fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
405        Self::read_length_prefixed(stream, u64::MAX)
406    }
407
408    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
409        self.write_length_prefixed(sink, u64::MAX)
410    }
411
412    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
413        Self::read_length_prefixed_sync(stream, u64::MAX)
414    }
415
416    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
417        self.write_length_prefixed_sync(sink, u64::MAX)
418    }
419}
420
421/// Note that due to Rust's lack of [specialization](https://github.com/rust-lang/rust/issues/31844), this implementation is inefficient for `Vec<u8>`.
422/// Prefer [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) if possible.
423impl<T: Protocol + Send + Sync> LengthPrefixed for Vec<T> {
424    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
425        Box::pin(async move {
426            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
427            let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
428                context: ErrorContext::BuiltIn { for_type: "Vec" },
429                kind: e.into(),
430            })?;
431            for _ in 0..len {
432                buf.push(T::read(stream).await?);
433            }
434            Ok(buf)
435        })
436    }
437
438    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
439        Box::pin(async move {
440            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
441            for elt in self {
442                elt.write(sink).await?;
443            }
444            Ok(())
445        })
446    }
447
448    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
449        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
450        let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
451            context: ErrorContext::BuiltIn { for_type: "Vec" },
452            kind: e.into(),
453        })?;
454        for _ in 0..len {
455            buf.push(T::read_sync(stream)?);
456        }
457        Ok(buf)
458    }
459
460    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
461        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
462        for elt in self {
463            elt.write_sync(sink)?;
464        }
465        Ok(())
466    }
467}
468
469/// A set is prefixed with the length as a [`u64`].
470impl<T: Protocol + Ord + Send + Sync + 'static> Protocol for BTreeSet<T> {
471        fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
472        Self::read_length_prefixed(stream, u64::MAX)
473    }
474
475    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
476        self.write_length_prefixed(sink, u64::MAX)
477    }
478
479    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
480        Self::read_length_prefixed_sync(stream, u64::MAX)
481    }
482
483    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
484        self.write_length_prefixed_sync(sink, u64::MAX)
485    }
486}
487
488impl<T: Protocol + Ord + Send + Sync + 'static> LengthPrefixed for BTreeSet<T> {
489    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
490        Box::pin(async move {
491            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
492            let mut set = Self::default();
493            for _ in 0..len {
494                set.insert(T::read(stream).await?); //TODO use fallible allocation once available
495            }
496            Ok(set)
497        })
498    }
499
500    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
501        Box::pin(async move {
502            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
503            for elt in self {
504                elt.write(sink).await?;
505            }
506            Ok(())
507        })
508    }
509
510    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
511        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;        let mut set = Self::default();
512        for _ in 0..len {
513            set.insert(T::read_sync(stream)?); //TODO use fallible allocation once available
514        }
515        Ok(set)
516    }
517
518    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
519        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;
520        for elt in self {
521            elt.write_sync(sink)?;
522        }
523        Ok(())
524    }
525}
526
527impl<T: Protocol + Eq + Hash + Send + Sync> Protocol for HashSet<T> {
528    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
529        Self::read_length_prefixed(stream, u64::MAX)
530    }
531
532    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
533        self.write_length_prefixed(sink, u64::MAX)
534    }
535
536    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
537        Self::read_length_prefixed_sync(stream, u64::MAX)
538    }
539
540    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
541        self.write_length_prefixed_sync(sink, u64::MAX)
542    }
543}
544
545impl<T: Protocol + Eq + Hash + Send + Sync> LengthPrefixed for HashSet<T> {
546    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
547        Box::pin(async move {
548            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
549            let mut set = Self::with_capacity(len); //TODO use fallible allocation once available
550            for _ in 0..len {
551                set.insert(T::read(stream).await?);
552            }
553            Ok(set)
554        })
555    }
556
557    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
558        Box::pin(async move {
559            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
560            for elt in self {
561                elt.write(sink).await?;
562            }
563            Ok(())
564        })
565    }
566
567    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
568        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
569        let mut set = Self::with_capacity(len); //TODO use fallible allocation once available
570        for _ in 0..len {
571            set.insert(T::read_sync(stream)?);
572        }
573        Ok(set)
574    }
575
576    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
577        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
578        for elt in self {
579            elt.write_sync(sink)?;
580        }
581        Ok(())
582    }
583}
584
585/// A string is encoded in UTF-8 and prefixed with the length in bytes as a [`u64`].
586impl Protocol for String {
587    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
588        Self::read_length_prefixed(stream, u64::MAX)
589    }
590
591    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
592        self.write_length_prefixed(sink, u64::MAX)
593    }
594
595    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
596        Self::read_length_prefixed_sync(stream, u64::MAX)
597    }
598
599    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
600        self.write_length_prefixed_sync(sink, u64::MAX)
601    }
602}
603
604/// A string is encoded in UTF-8 and prefixed with the length in bytes.
605impl LengthPrefixed for String {
606    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
607        Box::pin(async move {
608            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
609            let mut buf = Vec::default();
610            buf.try_resize(len, 0).map_err(|e| ReadError {
611                context: ErrorContext::BuiltIn { for_type: "String" },
612                kind: e.into(),
613            })?;
614            stream.read_exact(&mut buf).await.map_err(|e| ReadError {
615                context: ErrorContext::BuiltIn { for_type: "String" },
616                kind: e.into(),
617            })?;
618            Ok(Self::from_utf8(buf).map_err(|e| ReadError {
619                context: ErrorContext::BuiltIn { for_type: "String" },
620                kind: e.into(),
621            })?)
622        })
623    }
624
625    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
626        Box::pin(async move {
627            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
628            sink.write(self.as_bytes()).await.map_err(|e| WriteError {
629                context: ErrorContext::BuiltIn { for_type: "String" },
630                kind: e.into(),
631            })?;
632            Ok(())
633        })
634    }
635
636    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
637        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
638        let mut buf = Vec::default();
639        buf.try_resize(len, 0).map_err(|e| ReadError {
640            context: ErrorContext::BuiltIn { for_type: "String" },
641            kind: e.into(),
642        })?;
643        stream.read_exact(&mut buf).map_err(|e| ReadError {
644            context: ErrorContext::BuiltIn { for_type: "String" },
645            kind: e.into(),
646        })?;
647        Ok(Self::from_utf8(buf).map_err(|e| ReadError {
648            context: ErrorContext::BuiltIn { for_type: "String" },
649            kind: e.into(),
650        })?)
651    }
652
653    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
654        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
655        sink.write(self.as_bytes()).map_err(|e| WriteError {
656            context: ErrorContext::BuiltIn { for_type: "String" },
657            kind: e.into(),
658        })?;
659        Ok(())
660    }
661}
662
663impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> Protocol for BTreeMap<K, V> {
664    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
665        Self::read_length_prefixed(stream, u64::MAX)
666    }
667
668    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
669        self.write_length_prefixed(sink, u64::MAX)
670    }
671
672    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
673        Self::read_length_prefixed_sync(stream, u64::MAX)
674    }
675
676    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
677        self.write_length_prefixed_sync(sink, u64::MAX)
678    }
679}
680
681impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> LengthPrefixed for BTreeMap<K, V> {
682    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
683        Box::pin(async move {
684            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
685            let mut map = Self::default();
686            for _ in 0..len {
687                map.insert(K::read(stream).await?, V::read(stream).await?); //TODO use fallible allocation once available
688            }
689            Ok(map)
690        })
691    }
692
693    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
694        Box::pin(async move {
695            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
696            for (k, v) in self {
697                k.write(sink).await?;
698                v.write(sink).await?;
699            }
700            Ok(())
701        })
702    }
703
704    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
705        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
706        let mut map = Self::default();
707        for _ in 0..len {
708            map.insert(K::read_sync(stream)?, V::read_sync(stream)?); //TODO use fallible allocation once available
709        }
710        Ok(map)
711    }
712
713    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
714        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
715        for (k, v) in self {
716            k.write_sync(sink)?;
717            v.write_sync(sink)?;
718        }
719        Ok(())
720    }
721}
722
723/// A map is prefixed with the length as a [`u64`].
724impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> Protocol for HashMap<K, V> {
725    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
726        Self::read_length_prefixed(stream, u64::MAX)
727    }
728
729    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
730        self.write_length_prefixed(sink, u64::MAX)
731    }
732
733    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
734        Self::read_length_prefixed_sync(stream, u64::MAX)
735    }
736
737    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
738        self.write_length_prefixed_sync(sink, u64::MAX)
739    }
740}
741
742impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> LengthPrefixed for HashMap<K, V> {
743    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
744        Box::pin(async move {
745            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
746            let mut map = Self::with_capacity(len); //TODO use fallible allocation once available
747            for _ in 0..len {
748                map.insert(K::read(stream).await?, V::read(stream).await?);
749            }
750            Ok(map)
751        })
752    }
753
754    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
755        Box::pin(async move {
756            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
757            for (k, v) in self {
758                k.write(sink).await?;
759                v.write(sink).await?;
760            }
761            Ok(())
762        })
763    }
764
765    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
766        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
767        let mut map = Self::with_capacity(len); //TODO use fallible allocation once available
768        for _ in 0..len {
769            map.insert(K::read_sync(stream)?, V::read_sync(stream)?);
770        }
771        Ok(map)
772    }
773
774    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
775        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
776        for (k, v) in self {
777            k.write_sync(sink)?;
778            v.write_sync(sink)?;
779        }
780        Ok(())
781    }
782}
783
784/// A cow is represented like its owned variant.
785///
786/// Note that due to a restriction in the type system, writing a borrowed cow requires cloning it.
787impl<'cow, B: ToOwned + Sync + ?Sized> Protocol for std::borrow::Cow<'cow, B>
788where B::Owned: Protocol + Send + Sync {
789    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
790        Box::pin(async move {
791            Ok(Self::Owned(B::Owned::read(stream).await?))
792        })
793    }
794
795    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
796        Box::pin(async move {
797            match self {
798                Self::Borrowed(borrowed) => (*borrowed).to_owned().write(sink).await?,
799                Self::Owned(owned) => owned.write(sink).await?,
800            }
801            Ok(())
802        })
803    }
804
805    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
806        Ok(Self::Owned(B::Owned::read_sync(stream)?))
807    }
808
809    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
810        match self {
811            Self::Borrowed(borrowed) => (*borrowed).to_owned().write_sync(sink)?,
812            Self::Owned(owned) => owned.write_sync(sink)?,
813        }
814        Ok(())
815    }
816}
817
818/// A cow is represented like its owned variant.
819///
820/// Note that due to a restriction in the type system, writing a borrowed cow requires cloning it.
821impl<'cow, B: ToOwned + Sync + ?Sized> LengthPrefixed for std::borrow::Cow<'cow, B>
822where B::Owned: LengthPrefixed + Send + Sync {
823    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
824        Box::pin(async move {
825            Ok(Self::Owned(B::Owned::read_length_prefixed(stream, max_len).await?))
826        })
827    }
828
829    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
830        Box::pin(async move {
831            match self {
832                Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed(sink, max_len).await?,
833                Self::Owned(owned) => owned.write_length_prefixed(sink, max_len).await?,
834            }
835            Ok(())
836        })
837    }
838
839    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
840        Ok(Self::Owned(B::Owned::read_length_prefixed_sync(stream, max_len)?))
841    }
842
843    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
844        match self {
845            Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed_sync(sink, max_len)?,
846            Self::Owned(owned) => owned.write_length_prefixed_sync(sink, max_len)?,
847        }
848        Ok(())
849    }
850}
851
852#[derive(Protocol)]
853#[async_proto(internal)]
854struct F32Proxy([u8; 4]);
855
856impl From<F32Proxy> for f32 {
857    fn from(F32Proxy(bytes): F32Proxy) -> Self {
858        Self::from_be_bytes(bytes)
859    }
860}
861
862impl<'a> From<&'a f32> for F32Proxy {
863    fn from(val: &f32) -> Self {
864        Self(val.to_be_bytes())
865    }
866}
867
868#[derive(Protocol)]
869#[async_proto(internal)]
870struct F64Proxy([u8; 8]);
871
872impl From<F64Proxy> for f64 {
873    fn from(F64Proxy(bytes): F64Proxy) -> Self {
874        Self::from_be_bytes(bytes)
875    }
876}
877
878impl<'a> From<&'a f64> for F64Proxy {
879    fn from(val: &f64) -> Self {
880        Self(val.to_be_bytes())
881    }
882}
883
884#[derive(Protocol)]
885#[async_proto(internal)]
886struct DurationProxy {
887    secs: u64,
888    subsec_nanos: u32,
889}
890
891impl From<DurationProxy> for std::time::Duration {
892    fn from(DurationProxy { secs, subsec_nanos }: DurationProxy) -> Self {
893        Self::new(secs, subsec_nanos)
894    }
895}
896
897impl<'a> From<&'a std::time::Duration> for DurationProxy {
898    fn from(duration: &std::time::Duration) -> Self {
899        Self {
900            secs: duration.as_secs(),
901            subsec_nanos: duration.subsec_nanos(),
902        }
903    }
904}
905
906impl_protocol_for! {
907    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
908    #[async_proto(via = u8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
909    type std::num::NonZeroU8;
910
911    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
912    #[async_proto(via = i8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
913    type std::num::NonZeroI8;
914
915    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
916    #[async_proto(via = u16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
917    type std::num::NonZeroU16;
918
919    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
920    #[async_proto(via = i16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
921    type std::num::NonZeroI16;
922
923    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
924    #[async_proto(via = u32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
925    type std::num::NonZeroU32;
926
927    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
928    #[async_proto(via = i32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
929    type std::num::NonZeroI32;
930
931    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
932    #[async_proto(via = u64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
933    type std::num::NonZeroU64;
934
935    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
936    #[async_proto(via = i64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
937    type std::num::NonZeroI64;
938
939    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
940    #[async_proto(via = u128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
941    type std::num::NonZeroU128;
942
943    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
944    #[async_proto(via = i128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
945    type std::num::NonZeroI128;
946
947    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
948    #[async_proto(via = F32Proxy)]
949    type f32;
950
951    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
952    #[async_proto(via = F64Proxy)]
953    type f64;
954
955    #[async_proto(where(Idx: Protocol + Send + Sync))]
956    struct Range<Idx> {
957        start: Idx,
958        end: Idx,
959    }
960
961    #[async_proto(where(Idx: Protocol + Sync))]
962    struct RangeFrom<Idx> {
963        start: Idx,
964    }
965
966    #[async_proto(where(Idx: Protocol + Sync))]
967    struct RangeTo<Idx> {
968        end: Idx,
969    }
970
971    #[async_proto(where(Idx: Protocol + Sync))]
972    struct RangeToInclusive<Idx> {
973        end: Idx,
974    }
975
976    #[async_proto(where(T: Protocol + Sync))]
977    enum Option<T> {
978        None,
979        Some(T),
980    }
981
982    #[async_proto(where(T: Protocol + Sync, E: Protocol + Sync))]
983    enum Result<T, E> {
984        Ok(T),
985        Err(E),
986    }
987
988    enum std::convert::Infallible {}
989
990    #[async_proto(where(T: Sync))]
991    struct std::marker::PhantomData<T>;
992
993    struct std::ops::RangeFull;
994
995    #[async_proto(attr(doc = "A duration is represented as the number of whole seconds as a [`u64`] followed by the number of subsecond nanoseconds as a [`u32`]."))]
996    #[async_proto(via = DurationProxy)]
997    type std::time::Duration;
998}