async_proto/impls/
mod.rs

1//! [`Protocol`] implementations for primitive and [`std`] types.
2
3#![allow(missing_docs)]
4
5use {
6    std::{
7        collections::{
8            BTreeMap,
9            BTreeSet,
10            HashMap,
11            HashSet,
12        },
13        convert::{
14            TryFrom as _,
15            TryInto as _,
16        },
17        future::Future,
18        hash::Hash,
19        io::prelude::*,
20        ops::{
21            Range,
22            RangeFrom,
23            RangeInclusive,
24            RangeTo,
25            RangeToInclusive,
26        },
27        pin::Pin,
28    },
29    byteorder::{
30        NetworkEndian,
31        ReadBytesExt as _,
32        WriteBytesExt as _,
33    },
34    fallible_collections::{
35        FallibleBox,
36        FallibleVec,
37    },
38    tokio::io::{
39        AsyncRead,
40        AsyncReadExt as _,
41        AsyncWrite,
42        AsyncWriteExt as _,
43    },
44    async_proto_derive::impl_protocol_for,
45    crate::{
46        ErrorContext,
47        LengthPrefixed,
48        Protocol,
49        ReadError,
50        ReadErrorKind,
51        WriteError,
52        WriteErrorKind,
53    },
54};
55
56#[cfg(feature = "bitvec")] mod bitvec;
57#[cfg(feature = "bytes")] mod bytes;
58#[cfg(feature = "bytesize")] mod bytesize;
59#[cfg(feature = "chrono")] mod chrono;
60#[cfg(feature = "chrono-tz")] mod chrono_tz;
61#[cfg(feature = "doubloon")] mod doubloon;
62#[cfg(feature = "either")] mod either;
63#[cfg(feature = "enumset")] mod enumset;
64#[cfg(feature = "git2")] mod git2;
65#[cfg(feature = "gix-hash")] mod gix_hash;
66#[cfg(feature = "noisy_float")] mod noisy_float;
67#[cfg(feature = "rust_decimal")] mod rust_decimal;
68#[cfg(feature = "semver")] mod semver;
69#[cfg(feature = "serde_json")] mod serde_json;
70#[cfg(feature = "serenity")] mod serenity;
71#[cfg(feature = "url")] mod url;
72#[cfg(feature = "uuid")] mod uuid;
73
74async fn read_len<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
75    let len = match max_len {
76        0 => 0,
77        1..=255 => u8::read(stream).await?.into(),
78        256..=65_535 => u16::read(stream).await?.into(),
79        65_536..=4_294_967_295 => u32::read(stream).await?.into(),
80        _ => u64::read(stream).await?,
81    };
82    if len > max_len {
83        Err(ReadError {
84            context: error_ctx(),
85            kind: ReadErrorKind::MaxLen { len, max_len },
86        })
87    } else {
88        usize::try_from(len).map_err(|e| ReadError {
89            context: error_ctx(),
90            kind: e.into(),
91        })
92    }
93}
94
95async fn write_len<'a, W: AsyncWrite + Unpin + Send + 'a>(sink: &'a mut W, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
96    let len = u64::try_from(len).map_err(|e| WriteError {
97        context: error_ctx(),
98        kind: e.into(),
99    })?;
100    if len > max_len {
101        return Err(WriteError {
102            context: error_ctx(),
103            kind: WriteErrorKind::MaxLen { len, max_len },
104        })
105    }
106    match max_len {
107        0 => {}
108        1..=255 => (len as u8).write(sink).await?,
109        256..=65_535 => (len as u16).write(sink).await?,
110        65_536..=4_294_967_295 => (len as u32).write(sink).await?,
111        _ => len.write(sink).await?,
112    }
113    Ok(())
114}
115
116fn read_len_sync(stream: &mut impl Read, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
117    let len = match max_len {
118        0 => 0,
119        1..=255 => u8::read_sync(stream)?.into(),
120        256..=65_535 => u16::read_sync(stream)?.into(),
121        65_536..=4_294_967_295 => u32::read_sync(stream)?.into(),
122        _ => u64::read_sync(stream)?,
123    };
124    if len > max_len {
125        Err(ReadError {
126            context: error_ctx(),
127            kind: ReadErrorKind::MaxLen { len, max_len },
128        })
129    } else {
130        usize::try_from(len).map_err(|e| ReadError {
131            context: error_ctx(),
132            kind: e.into(),
133        })
134    }
135}
136
137fn write_len_sync(sink: &mut impl Write, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
138    let len = u64::try_from(len).map_err(|e| WriteError {
139        context: error_ctx(),
140        kind: e.into(),
141    })?;
142    if len > max_len {
143        return Err(WriteError {
144            context: error_ctx(),
145            kind: WriteErrorKind::MaxLen { len, max_len },
146        })
147    }
148    match max_len {
149        0 => {}
150        1..=255 => (len as u8).write_sync(sink)?,
151        256..=65_535 => (len as u16).write_sync(sink)?,
152        65_536..=4_294_967_295 => (len as u32).write_sync(sink)?,
153        _ => len.write_sync(sink)?,
154    }
155    Ok(())
156}
157
158macro_rules! impl_protocol_primitive {
159    ($ty:ty, $read:ident, $write:ident$(, $endian:ty)?) => {
160        /// Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format.
161        impl Protocol for $ty {
162            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
163                Box::pin(async move {
164                    Ok(stream.$read().await.map_err(|e| ReadError {
165                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
166                        kind: e.into(),
167                    })?)
168                })
169            }
170
171            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
172                Box::pin(async move {
173                    Ok(sink.$write(*self).await.map_err(|e| WriteError {
174                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
175                        kind: e.into(),
176                    })?)
177                })
178            }
179
180            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
181                Ok(stream.$read$(::<$endian>)?().map_err(|e| ReadError {
182                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
183                    kind: e.into(),
184                })?)
185            }
186
187            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
188                Ok(sink.$write$(::<$endian>)?(*self).map_err(|e| WriteError {
189                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
190                    kind: e.into(),
191                })?)
192            }
193        }
194    };
195}
196
197impl_protocol_primitive!(u8, read_u8, write_u8);
198impl_protocol_primitive!(i8, read_i8, write_i8);
199impl_protocol_primitive!(u16, read_u16, write_u16, NetworkEndian);
200impl_protocol_primitive!(i16, read_i16, write_i16, NetworkEndian);
201impl_protocol_primitive!(u32, read_u32, write_u32, NetworkEndian);
202impl_protocol_primitive!(i32, read_i32, write_i32, NetworkEndian);
203impl_protocol_primitive!(u64, read_u64, write_u64, NetworkEndian);
204impl_protocol_primitive!(i64, read_i64, write_i64, NetworkEndian);
205impl_protocol_primitive!(u128, read_u128, write_u128, NetworkEndian);
206impl_protocol_primitive!(i128, read_i128, write_i128, NetworkEndian);
207
208impl<Idx: Protocol + Send + Sync> Protocol for RangeInclusive<Idx> { //TODO derive
209    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
210        Box::pin(async move {
211            Ok(Idx::read(stream).await?..=Idx::read(stream).await?)
212        })
213    }
214
215    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
216        Box::pin(async move {
217            self.start().write(sink).await?;
218            self.end().write(sink).await?;
219            Ok(())
220        })
221    }
222
223    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
224        Ok(Idx::read_sync(stream)?..=Idx::read_sync(stream)?)
225    }
226
227    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
228        self.start().write_sync(sink)?;
229        self.end().write_sync(sink)?;
230        Ok(())
231    }
232}
233
234macro_rules! impl_protocol_tuple {
235    ($($ty:ident),*) => {
236        #[allow(unused)]
237        impl<$($ty: Protocol + Send + Sync),*> Protocol for ($($ty,)*) {
238            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
239                Box::pin(async move {
240                    Ok((
241                        $($ty::read(stream).await?,)*
242                    ))
243                })
244            }
245
246            #[allow(non_snake_case)]
247            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
248                Box::pin(async move {
249                    let ($($ty,)*) = self;
250                    $(
251                        $ty.write(sink).await?;
252                    )*
253                    Ok(())
254                })
255            }
256
257            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
258                Ok((
259                    $($ty::read_sync(stream)?,)*
260                ))
261            }
262
263            #[allow(non_snake_case)]
264            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
265                let ($($ty,)*) = self;
266                $(
267                    $ty.write_sync(sink)?;
268                )*
269                Ok(())
270            }
271        }
272    };
273}
274
275impl_protocol_tuple!();
276impl_protocol_tuple!(A);
277impl_protocol_tuple!(A, B);
278impl_protocol_tuple!(A, B, C);
279impl_protocol_tuple!(A, B, C, D);
280impl_protocol_tuple!(A, B, C, D, E);
281impl_protocol_tuple!(A, B, C, D, E, F);
282impl_protocol_tuple!(A, B, C, D, E, F, G);
283impl_protocol_tuple!(A, B, C, D, E, F, G, H);
284impl_protocol_tuple!(A, B, C, D, E, F, G, H, I);
285impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J);
286impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K);
287impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K, L);
288
289impl<T: Protocol + Send + Sync, const N: usize> Protocol for [T; N] {
290    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
291        Box::pin(async move {
292            let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
293                context: ErrorContext::BuiltIn { for_type: "[T; N]" },
294                kind: e.into(),
295            })?;
296            for _ in 0..N {
297                vec.push(T::read(stream).await?);
298            }
299            Ok(match vec.try_into() {
300                Ok(array) => array,
301                Err(_) => panic!("wrong array length"),
302            })
303        })
304    }
305
306    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
307        Box::pin(async move {
308            for elt in self {
309                elt.write(sink).await?;
310            }
311            Ok(())
312        })
313    }
314
315    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
316        let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
317            context: ErrorContext::BuiltIn { for_type: "[T; N]" },
318            kind: e.into(),
319        })?;
320        for _ in 0..N {
321            vec.push(T::read_sync(stream)?);
322        }
323        Ok(match vec.try_into() {
324            Ok(array) => array,
325            Err(_) => panic!("wrong array length"),
326        })
327    }
328
329    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
330        for elt in self {
331            elt.write_sync(sink)?;
332        }
333        Ok(())
334    }
335}
336
337/// Represented as one byte, with `0` for `false` and `1` for `true`.
338impl Protocol for bool {
339    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
340        Box::pin(async move {
341            Ok(match u8::read(stream).await? {
342                0 => false,
343                1 => true,
344                n => return Err(ReadError {
345                    context: ErrorContext::BuiltIn { for_type: "bool" },
346                    kind: ReadErrorKind::UnknownVariant8(n),
347                }),
348            })
349        })
350    }
351
352    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
353        Box::pin(async move {
354            if *self { 1u8 } else { 0 }.write(sink).await
355        })
356    }
357
358    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
359        Ok(match u8::read_sync(stream)? {
360            0 => false,
361            1 => true,
362            n => return Err(ReadError {
363                context: ErrorContext::BuiltIn { for_type: "bool" },
364                kind: ReadErrorKind::UnknownVariant8(n),
365            }),
366        })
367    }
368
369    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
370        if *self { 1u8 } else { 0 }.write_sync(sink)
371    }
372}
373
374impl<T: Protocol> Protocol for Box<T> {
375    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
376        Box::pin(async move {
377            Ok(<Box<_> as FallibleBox<_>>::try_new(T::read(stream).await?).map_err(|e| ReadError {
378                context: ErrorContext::BuiltIn { for_type: "Box" },
379                kind: e.into(),
380            })?)
381        })
382    }
383
384    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
385        (**self).write(sink)
386    }
387
388    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
389        Ok(<Box<_> as FallibleBox<_>>::try_new(T::read_sync(stream)?).map_err(|e| ReadError {
390            context: ErrorContext::BuiltIn { for_type: "Box" },
391            kind: e.into(),
392        })?)
393    }
394
395    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
396        (**self).write_sync(sink)
397    }
398}
399
400/// A vector is prefixed with the length as a [`u64`].
401///
402/// Note that due to Rust's lack of [specialization](https://github.com/rust-lang/rust/issues/31844), this implementation is inefficient for `Vec<u8>`.
403/// Prefer [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) if possible.
404impl<T: Protocol + Send + Sync> Protocol for Vec<T> {
405        fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
406        Self::read_length_prefixed(stream, u64::MAX)
407    }
408
409    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
410        self.write_length_prefixed(sink, u64::MAX)
411    }
412
413    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
414        Self::read_length_prefixed_sync(stream, u64::MAX)
415    }
416
417    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
418        self.write_length_prefixed_sync(sink, u64::MAX)
419    }
420}
421
422/// Note that due to Rust's lack of [specialization](https://github.com/rust-lang/rust/issues/31844), this implementation is inefficient for `Vec<u8>`.
423/// Prefer [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) if possible.
424impl<T: Protocol + Send + Sync> LengthPrefixed for Vec<T> {
425    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
426        Box::pin(async move {
427            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
428            let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
429                context: ErrorContext::BuiltIn { for_type: "Vec" },
430                kind: e.into(),
431            })?;
432            for _ in 0..len {
433                buf.push(T::read(stream).await?);
434            }
435            Ok(buf)
436        })
437    }
438
439    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
440        Box::pin(async move {
441            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
442            for elt in self {
443                elt.write(sink).await?;
444            }
445            Ok(())
446        })
447    }
448
449    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
450        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
451        let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
452            context: ErrorContext::BuiltIn { for_type: "Vec" },
453            kind: e.into(),
454        })?;
455        for _ in 0..len {
456            buf.push(T::read_sync(stream)?);
457        }
458        Ok(buf)
459    }
460
461    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
462        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
463        for elt in self {
464            elt.write_sync(sink)?;
465        }
466        Ok(())
467    }
468}
469
470/// A set is prefixed with the length as a [`u64`].
471impl<T: Protocol + Ord + Send + Sync + 'static> Protocol for BTreeSet<T> {
472        fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
473        Self::read_length_prefixed(stream, u64::MAX)
474    }
475
476    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
477        self.write_length_prefixed(sink, u64::MAX)
478    }
479
480    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
481        Self::read_length_prefixed_sync(stream, u64::MAX)
482    }
483
484    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
485        self.write_length_prefixed_sync(sink, u64::MAX)
486    }
487}
488
489impl<T: Protocol + Ord + Send + Sync + 'static> LengthPrefixed for BTreeSet<T> {
490    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
491        Box::pin(async move {
492            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
493            let mut set = Self::default();
494            for _ in 0..len {
495                set.insert(T::read(stream).await?); //TODO use fallible allocation once available
496            }
497            Ok(set)
498        })
499    }
500
501    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
502        Box::pin(async move {
503            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
504            for elt in self {
505                elt.write(sink).await?;
506            }
507            Ok(())
508        })
509    }
510
511    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
512        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;        let mut set = Self::default();
513        for _ in 0..len {
514            set.insert(T::read_sync(stream)?); //TODO use fallible allocation once available
515        }
516        Ok(set)
517    }
518
519    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
520        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;
521        for elt in self {
522            elt.write_sync(sink)?;
523        }
524        Ok(())
525    }
526}
527
528impl<T: Protocol + Eq + Hash + Send + Sync> Protocol for HashSet<T> {
529    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
530        Self::read_length_prefixed(stream, u64::MAX)
531    }
532
533    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
534        self.write_length_prefixed(sink, u64::MAX)
535    }
536
537    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
538        Self::read_length_prefixed_sync(stream, u64::MAX)
539    }
540
541    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
542        self.write_length_prefixed_sync(sink, u64::MAX)
543    }
544}
545
546impl<T: Protocol + Eq + Hash + Send + Sync> LengthPrefixed for HashSet<T> {
547    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
548        Box::pin(async move {
549            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
550            let mut set = Self::with_capacity(len); //TODO use fallible allocation once available
551            for _ in 0..len {
552                set.insert(T::read(stream).await?);
553            }
554            Ok(set)
555        })
556    }
557
558    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
559        Box::pin(async move {
560            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
561            for elt in self {
562                elt.write(sink).await?;
563            }
564            Ok(())
565        })
566    }
567
568    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
569        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
570        let mut set = Self::with_capacity(len); //TODO use fallible allocation once available
571        for _ in 0..len {
572            set.insert(T::read_sync(stream)?);
573        }
574        Ok(set)
575    }
576
577    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
578        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
579        for elt in self {
580            elt.write_sync(sink)?;
581        }
582        Ok(())
583    }
584}
585
586/// A string is encoded in UTF-8 and prefixed with the length in bytes as a [`u64`].
587impl Protocol for String {
588    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
589        Self::read_length_prefixed(stream, u64::MAX)
590    }
591
592    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
593        self.write_length_prefixed(sink, u64::MAX)
594    }
595
596    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
597        Self::read_length_prefixed_sync(stream, u64::MAX)
598    }
599
600    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
601        self.write_length_prefixed_sync(sink, u64::MAX)
602    }
603}
604
605/// A string is encoded in UTF-8 and prefixed with the length in bytes.
606impl LengthPrefixed for String {
607    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
608        Box::pin(async move {
609            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
610            let mut buf = Vec::default();
611            buf.try_resize(len, 0).map_err(|e| ReadError {
612                context: ErrorContext::BuiltIn { for_type: "String" },
613                kind: e.into(),
614            })?;
615            stream.read_exact(&mut buf).await.map_err(|e| ReadError {
616                context: ErrorContext::BuiltIn { for_type: "String" },
617                kind: e.into(),
618            })?;
619            Ok(Self::from_utf8(buf).map_err(|e| ReadError {
620                context: ErrorContext::BuiltIn { for_type: "String" },
621                kind: e.into(),
622            })?)
623        })
624    }
625
626    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
627        Box::pin(async move {
628            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
629            sink.write(self.as_bytes()).await.map_err(|e| WriteError {
630                context: ErrorContext::BuiltIn { for_type: "String" },
631                kind: e.into(),
632            })?;
633            Ok(())
634        })
635    }
636
637    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
638        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
639        let mut buf = Vec::default();
640        buf.try_resize(len, 0).map_err(|e| ReadError {
641            context: ErrorContext::BuiltIn { for_type: "String" },
642            kind: e.into(),
643        })?;
644        stream.read_exact(&mut buf).map_err(|e| ReadError {
645            context: ErrorContext::BuiltIn { for_type: "String" },
646            kind: e.into(),
647        })?;
648        Ok(Self::from_utf8(buf).map_err(|e| ReadError {
649            context: ErrorContext::BuiltIn { for_type: "String" },
650            kind: e.into(),
651        })?)
652    }
653
654    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
655        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
656        sink.write(self.as_bytes()).map_err(|e| WriteError {
657            context: ErrorContext::BuiltIn { for_type: "String" },
658            kind: e.into(),
659        })?;
660        Ok(())
661    }
662}
663
664impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> Protocol for BTreeMap<K, V> {
665    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
666        Self::read_length_prefixed(stream, u64::MAX)
667    }
668
669    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
670        self.write_length_prefixed(sink, u64::MAX)
671    }
672
673    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
674        Self::read_length_prefixed_sync(stream, u64::MAX)
675    }
676
677    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
678        self.write_length_prefixed_sync(sink, u64::MAX)
679    }
680}
681
682impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> LengthPrefixed for BTreeMap<K, V> {
683    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
684        Box::pin(async move {
685            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
686            let mut map = Self::default();
687            for _ in 0..len {
688                map.insert(K::read(stream).await?, V::read(stream).await?); //TODO use fallible allocation once available
689            }
690            Ok(map)
691        })
692    }
693
694    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
695        Box::pin(async move {
696            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
697            for (k, v) in self {
698                k.write(sink).await?;
699                v.write(sink).await?;
700            }
701            Ok(())
702        })
703    }
704
705    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
706        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
707        let mut map = Self::default();
708        for _ in 0..len {
709            map.insert(K::read_sync(stream)?, V::read_sync(stream)?); //TODO use fallible allocation once available
710        }
711        Ok(map)
712    }
713
714    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
715        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
716        for (k, v) in self {
717            k.write_sync(sink)?;
718            v.write_sync(sink)?;
719        }
720        Ok(())
721    }
722}
723
724/// A map is prefixed with the length as a [`u64`].
725impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> Protocol for HashMap<K, V> {
726    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
727        Self::read_length_prefixed(stream, u64::MAX)
728    }
729
730    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
731        self.write_length_prefixed(sink, u64::MAX)
732    }
733
734    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
735        Self::read_length_prefixed_sync(stream, u64::MAX)
736    }
737
738    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
739        self.write_length_prefixed_sync(sink, u64::MAX)
740    }
741}
742
743impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> LengthPrefixed for HashMap<K, V> {
744    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
745        Box::pin(async move {
746            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
747            let mut map = Self::with_capacity(len); //TODO use fallible allocation once available
748            for _ in 0..len {
749                map.insert(K::read(stream).await?, V::read(stream).await?);
750            }
751            Ok(map)
752        })
753    }
754
755    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
756        Box::pin(async move {
757            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
758            for (k, v) in self {
759                k.write(sink).await?;
760                v.write(sink).await?;
761            }
762            Ok(())
763        })
764    }
765
766    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
767        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
768        let mut map = Self::with_capacity(len); //TODO use fallible allocation once available
769        for _ in 0..len {
770            map.insert(K::read_sync(stream)?, V::read_sync(stream)?);
771        }
772        Ok(map)
773    }
774
775    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
776        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
777        for (k, v) in self {
778            k.write_sync(sink)?;
779            v.write_sync(sink)?;
780        }
781        Ok(())
782    }
783}
784
785/// A cow is represented like its owned variant.
786///
787/// Note that due to a restriction in the type system, writing a borrowed cow requires cloning it.
788impl<'cow, B: ToOwned + Sync + ?Sized> Protocol for std::borrow::Cow<'cow, B>
789where B::Owned: Protocol + Send + Sync {
790    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
791        Box::pin(async move {
792            Ok(Self::Owned(B::Owned::read(stream).await?))
793        })
794    }
795
796    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
797        Box::pin(async move {
798            match self {
799                Self::Borrowed(borrowed) => (*borrowed).to_owned().write(sink).await?,
800                Self::Owned(owned) => owned.write(sink).await?,
801            }
802            Ok(())
803        })
804    }
805
806    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
807        Ok(Self::Owned(B::Owned::read_sync(stream)?))
808    }
809
810    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
811        match self {
812            Self::Borrowed(borrowed) => (*borrowed).to_owned().write_sync(sink)?,
813            Self::Owned(owned) => owned.write_sync(sink)?,
814        }
815        Ok(())
816    }
817}
818
819/// A cow is represented like its owned variant.
820///
821/// Note that due to a restriction in the type system, writing a borrowed cow requires cloning it.
822impl<'cow, B: ToOwned + Sync + ?Sized> LengthPrefixed for std::borrow::Cow<'cow, B>
823where B::Owned: LengthPrefixed + Send + Sync {
824    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
825        Box::pin(async move {
826            Ok(Self::Owned(B::Owned::read_length_prefixed(stream, max_len).await?))
827        })
828    }
829
830    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
831        Box::pin(async move {
832            match self {
833                Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed(sink, max_len).await?,
834                Self::Owned(owned) => owned.write_length_prefixed(sink, max_len).await?,
835            }
836            Ok(())
837        })
838    }
839
840    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
841        Ok(Self::Owned(B::Owned::read_length_prefixed_sync(stream, max_len)?))
842    }
843
844    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
845        match self {
846            Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed_sync(sink, max_len)?,
847            Self::Owned(owned) => owned.write_length_prefixed_sync(sink, max_len)?,
848        }
849        Ok(())
850    }
851}
852
853#[derive(Protocol)]
854#[async_proto(internal)]
855struct F32Proxy([u8; 4]);
856
857impl From<F32Proxy> for f32 {
858    fn from(F32Proxy(bytes): F32Proxy) -> Self {
859        Self::from_be_bytes(bytes)
860    }
861}
862
863impl<'a> From<&'a f32> for F32Proxy {
864    fn from(val: &f32) -> Self {
865        Self(val.to_be_bytes())
866    }
867}
868
869#[derive(Protocol)]
870#[async_proto(internal)]
871struct F64Proxy([u8; 8]);
872
873impl From<F64Proxy> for f64 {
874    fn from(F64Proxy(bytes): F64Proxy) -> Self {
875        Self::from_be_bytes(bytes)
876    }
877}
878
879impl<'a> From<&'a f64> for F64Proxy {
880    fn from(val: &f64) -> Self {
881        Self(val.to_be_bytes())
882    }
883}
884
885#[derive(Protocol)]
886#[async_proto(internal)]
887struct DurationProxy {
888    secs: u64,
889    subsec_nanos: u32,
890}
891
892impl From<DurationProxy> for std::time::Duration {
893    fn from(DurationProxy { secs, subsec_nanos }: DurationProxy) -> Self {
894        Self::new(secs, subsec_nanos)
895    }
896}
897
898impl<'a> From<&'a std::time::Duration> for DurationProxy {
899    fn from(duration: &std::time::Duration) -> Self {
900        Self {
901            secs: duration.as_secs(),
902            subsec_nanos: duration.subsec_nanos(),
903        }
904    }
905}
906
907impl_protocol_for! {
908    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
909    #[async_proto(via = u8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
910    type std::num::NonZeroU8;
911
912    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
913    #[async_proto(via = i8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
914    type std::num::NonZeroI8;
915
916    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
917    #[async_proto(via = u16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
918    type std::num::NonZeroU16;
919
920    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
921    #[async_proto(via = i16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
922    type std::num::NonZeroI16;
923
924    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
925    #[async_proto(via = u32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
926    type std::num::NonZeroU32;
927
928    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
929    #[async_proto(via = i32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
930    type std::num::NonZeroI32;
931
932    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
933    #[async_proto(via = u64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
934    type std::num::NonZeroU64;
935
936    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
937    #[async_proto(via = i64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
938    type std::num::NonZeroI64;
939
940    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
941    #[async_proto(via = u128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
942    type std::num::NonZeroU128;
943
944    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
945    #[async_proto(via = i128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
946    type std::num::NonZeroI128;
947
948    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
949    #[async_proto(via = F32Proxy)]
950    type f32;
951
952    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
953    #[async_proto(via = F64Proxy)]
954    type f64;
955
956    #[async_proto(where(Idx: Protocol + Send + Sync))]
957    struct Range<Idx> {
958        start: Idx,
959        end: Idx,
960    }
961
962    #[async_proto(where(Idx: Protocol + Sync))]
963    struct RangeFrom<Idx> {
964        start: Idx,
965    }
966
967    #[async_proto(where(Idx: Protocol + Sync))]
968    struct RangeTo<Idx> {
969        end: Idx,
970    }
971
972    #[async_proto(where(Idx: Protocol + Sync))]
973    struct RangeToInclusive<Idx> {
974        end: Idx,
975    }
976
977    #[async_proto(where(T: Protocol + Sync))]
978    enum Option<T> {
979        None,
980        Some(T),
981    }
982
983    #[async_proto(where(T: Protocol + Sync, E: Protocol + Sync))]
984    enum Result<T, E> {
985        Ok(T),
986        Err(E),
987    }
988
989    enum std::convert::Infallible {}
990
991    #[async_proto(where(T: Sync))]
992    struct std::marker::PhantomData<T>;
993
994    struct std::ops::RangeFull;
995
996    #[async_proto(attr(doc = "A duration is represented as the number of whole seconds as a [`u64`] followed by the number of subsecond nanoseconds as a [`u32`]."))]
997    #[async_proto(via = DurationProxy)]
998    type std::time::Duration;
999}