Skip to main content

async_proto/impls/
mod.rs

1//! [`Protocol`] implementations for primitive and [`std`] types.
2
3#![allow(missing_docs)]
4
5use {
6    std::{
7        collections::{
8            BTreeMap,
9            BTreeSet,
10            HashMap,
11            HashSet,
12        },
13        convert::{
14            TryFrom as _,
15            TryInto as _,
16        },
17        future::Future,
18        hash::Hash,
19        io::prelude::*,
20        ops::{
21            Range,
22            RangeFrom,
23            RangeInclusive,
24            RangeTo,
25            RangeToInclusive,
26        },
27        pin::Pin,
28    },
29    byteorder::{
30        NetworkEndian,
31        ReadBytesExt as _,
32        WriteBytesExt as _,
33    },
34    fallible_collections::{
35        FallibleBox,
36        FallibleVec,
37    },
38    tokio::io::{
39        AsyncRead,
40        AsyncReadExt as _,
41        AsyncWrite,
42        AsyncWriteExt as _,
43    },
44    async_proto_derive::impl_protocol_for,
45    crate::{
46        ErrorContext,
47        LengthPrefixed,
48        Protocol,
49        ReadError,
50        ReadErrorKind,
51        WriteError,
52        WriteErrorKind,
53    },
54};
55
56#[cfg(feature = "bitvec")] mod bitvec;
57#[cfg(feature = "bytes")] mod bytes;
58#[cfg(feature = "bytesize")] mod bytesize;
59#[cfg(feature = "chrono")] mod chrono;
60#[cfg(feature = "chrono-tz")] mod chrono_tz;
61#[cfg(feature = "doubloon")] mod doubloon;
62#[cfg(feature = "either")] mod either;
63#[cfg(feature = "enumset")] mod enumset;
64#[cfg(feature = "git2")] mod git2;
65#[cfg(feature = "gix-hash")] mod gix_hash;
66#[cfg(feature = "noisy_float")] mod noisy_float;
67#[cfg(feature = "nonempty-collections")] mod nonempty_collections;
68#[cfg(feature = "os_info")] mod os_info;
69#[cfg(feature = "rust_decimal")] mod rust_decimal;
70#[cfg(feature = "semver")] mod semver;
71#[cfg(feature = "serde_json")] mod serde_json;
72#[cfg(feature = "serenity")] mod serenity;
73#[cfg(feature = "hematite-nbt")] mod hematite_nbt;
74#[cfg(feature = "url")] mod url;
75#[cfg(feature = "uuid")] mod uuid;
76
77async fn read_len<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
78    let len = match max_len {
79        0 => 0,
80        1..=255 => u8::read(stream).await?.into(),
81        256..=65_535 => u16::read(stream).await?.into(),
82        65_536..=4_294_967_295 => u32::read(stream).await?.into(),
83        _ => u64::read(stream).await?,
84    };
85    if len > max_len {
86        Err(ReadError {
87            context: error_ctx(),
88            kind: ReadErrorKind::MaxLen { len, max_len },
89        })
90    } else {
91        usize::try_from(len).map_err(|e| ReadError {
92            context: error_ctx(),
93            kind: e.into(),
94        })
95    }
96}
97
98async fn write_len<'a, W: AsyncWrite + Unpin + Send + 'a>(sink: &'a mut W, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
99    let len = u64::try_from(len).map_err(|e| WriteError {
100        context: error_ctx(),
101        kind: e.into(),
102    })?;
103    if len > max_len {
104        return Err(WriteError {
105            context: error_ctx(),
106            kind: WriteErrorKind::MaxLen { len, max_len },
107        })
108    }
109    match max_len {
110        0 => {}
111        1..=255 => (len as u8).write(sink).await?,
112        256..=65_535 => (len as u16).write(sink).await?,
113        65_536..=4_294_967_295 => (len as u32).write(sink).await?,
114        _ => len.write(sink).await?,
115    }
116    Ok(())
117}
118
119fn read_len_sync(stream: &mut impl Read, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
120    let len = match max_len {
121        0 => 0,
122        1..=255 => u8::read_sync(stream)?.into(),
123        256..=65_535 => u16::read_sync(stream)?.into(),
124        65_536..=4_294_967_295 => u32::read_sync(stream)?.into(),
125        _ => u64::read_sync(stream)?,
126    };
127    if len > max_len {
128        Err(ReadError {
129            context: error_ctx(),
130            kind: ReadErrorKind::MaxLen { len, max_len },
131        })
132    } else {
133        usize::try_from(len).map_err(|e| ReadError {
134            context: error_ctx(),
135            kind: e.into(),
136        })
137    }
138}
139
140fn write_len_sync(sink: &mut impl Write, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
141    let len = u64::try_from(len).map_err(|e| WriteError {
142        context: error_ctx(),
143        kind: e.into(),
144    })?;
145    if len > max_len {
146        return Err(WriteError {
147            context: error_ctx(),
148            kind: WriteErrorKind::MaxLen { len, max_len },
149        })
150    }
151    match max_len {
152        0 => {}
153        1..=255 => (len as u8).write_sync(sink)?,
154        256..=65_535 => (len as u16).write_sync(sink)?,
155        65_536..=4_294_967_295 => (len as u32).write_sync(sink)?,
156        _ => len.write_sync(sink)?,
157    }
158    Ok(())
159}
160
161macro_rules! impl_protocol_primitive {
162    ($ty:ty, $read:ident, $write:ident$(, $endian:ty)?) => {
163        /// Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format.
164        impl Protocol for $ty {
165            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
166                Box::pin(async move {
167                    Ok(stream.$read().await.map_err(|e| ReadError {
168                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
169                        kind: e.into(),
170                    })?)
171                })
172            }
173
174            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
175                Box::pin(async move {
176                    Ok(sink.$write(*self).await.map_err(|e| WriteError {
177                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
178                        kind: e.into(),
179                    })?)
180                })
181            }
182
183            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
184                Ok(stream.$read$(::<$endian>)?().map_err(|e| ReadError {
185                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
186                    kind: e.into(),
187                })?)
188            }
189
190            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
191                Ok(sink.$write$(::<$endian>)?(*self).map_err(|e| WriteError {
192                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
193                    kind: e.into(),
194                })?)
195            }
196        }
197    };
198}
199
200impl_protocol_primitive!(u8, read_u8, write_u8);
201impl_protocol_primitive!(i8, read_i8, write_i8);
202impl_protocol_primitive!(u16, read_u16, write_u16, NetworkEndian);
203impl_protocol_primitive!(i16, read_i16, write_i16, NetworkEndian);
204impl_protocol_primitive!(u32, read_u32, write_u32, NetworkEndian);
205impl_protocol_primitive!(i32, read_i32, write_i32, NetworkEndian);
206impl_protocol_primitive!(u64, read_u64, write_u64, NetworkEndian);
207impl_protocol_primitive!(i64, read_i64, write_i64, NetworkEndian);
208impl_protocol_primitive!(u128, read_u128, write_u128, NetworkEndian);
209impl_protocol_primitive!(i128, read_i128, write_i128, NetworkEndian);
210
211impl<Idx: Protocol + Send + Sync> Protocol for RangeInclusive<Idx> { //TODO derive
212    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
213        Box::pin(async move {
214            Ok(Idx::read(stream).await?..=Idx::read(stream).await?)
215        })
216    }
217
218    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
219        Box::pin(async move {
220            self.start().write(sink).await?;
221            self.end().write(sink).await?;
222            Ok(())
223        })
224    }
225
226    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
227        Ok(Idx::read_sync(stream)?..=Idx::read_sync(stream)?)
228    }
229
230    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
231        self.start().write_sync(sink)?;
232        self.end().write_sync(sink)?;
233        Ok(())
234    }
235}
236
237macro_rules! impl_protocol_tuple {
238    ($($ty:ident),*) => {
239        #[allow(unused)]
240        impl<$($ty: Protocol + Send + Sync),*> Protocol for ($($ty,)*) {
241            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
242                Box::pin(async move {
243                    Ok((
244                        $($ty::read(stream).await?,)*
245                    ))
246                })
247            }
248
249            #[allow(non_snake_case)]
250            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
251                Box::pin(async move {
252                    let ($($ty,)*) = self;
253                    $(
254                        $ty.write(sink).await?;
255                    )*
256                    Ok(())
257                })
258            }
259
260            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
261                Ok((
262                    $($ty::read_sync(stream)?,)*
263                ))
264            }
265
266            #[allow(non_snake_case)]
267            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
268                let ($($ty,)*) = self;
269                $(
270                    $ty.write_sync(sink)?;
271                )*
272                Ok(())
273            }
274        }
275    };
276}
277
278impl_protocol_tuple!();
279impl_protocol_tuple!(A);
280impl_protocol_tuple!(A, B);
281impl_protocol_tuple!(A, B, C);
282impl_protocol_tuple!(A, B, C, D);
283impl_protocol_tuple!(A, B, C, D, E);
284impl_protocol_tuple!(A, B, C, D, E, F);
285impl_protocol_tuple!(A, B, C, D, E, F, G);
286impl_protocol_tuple!(A, B, C, D, E, F, G, H);
287impl_protocol_tuple!(A, B, C, D, E, F, G, H, I);
288impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J);
289impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K);
290impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K, L);
291
292impl<T: Protocol + Send + Sync, const N: usize> Protocol for [T; N] {
293    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
294        Box::pin(async move {
295            let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
296                context: ErrorContext::BuiltIn { for_type: "[T; N]" },
297                kind: e.into(),
298            })?;
299            for _ in 0..N {
300                vec.push(T::read(stream).await?);
301            }
302            Ok(match vec.try_into() {
303                Ok(array) => array,
304                Err(_) => panic!("wrong array length"),
305            })
306        })
307    }
308
309    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
310        Box::pin(async move {
311            for elt in self {
312                elt.write(sink).await?;
313            }
314            Ok(())
315        })
316    }
317
318    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
319        let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
320            context: ErrorContext::BuiltIn { for_type: "[T; N]" },
321            kind: e.into(),
322        })?;
323        for _ in 0..N {
324            vec.push(T::read_sync(stream)?);
325        }
326        Ok(match vec.try_into() {
327            Ok(array) => array,
328            Err(_) => panic!("wrong array length"),
329        })
330    }
331
332    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
333        for elt in self {
334            elt.write_sync(sink)?;
335        }
336        Ok(())
337    }
338}
339
340/// Represented as one byte, with `0` for `false` and `1` for `true`.
341impl Protocol for bool {
342    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
343        Box::pin(async move {
344            Ok(match u8::read(stream).await? {
345                0 => false,
346                1 => true,
347                n => return Err(ReadError {
348                    context: ErrorContext::BuiltIn { for_type: "bool" },
349                    kind: ReadErrorKind::UnknownVariant8(n),
350                }),
351            })
352        })
353    }
354
355    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
356        Box::pin(async move {
357            if *self { 1u8 } else { 0 }.write(sink).await
358        })
359    }
360
361    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
362        Ok(match u8::read_sync(stream)? {
363            0 => false,
364            1 => true,
365            n => return Err(ReadError {
366                context: ErrorContext::BuiltIn { for_type: "bool" },
367                kind: ReadErrorKind::UnknownVariant8(n),
368            }),
369        })
370    }
371
372    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
373        if *self { 1u8 } else { 0 }.write_sync(sink)
374    }
375}
376
377impl<T: Protocol> Protocol for Box<T> {
378    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
379        Box::pin(async move {
380            Ok(<Box<_> as FallibleBox<_>>::try_new(T::read(stream).await?).map_err(|e| ReadError {
381                context: ErrorContext::BuiltIn { for_type: "Box" },
382                kind: e.into(),
383            })?)
384        })
385    }
386
387    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
388        (**self).write(sink)
389    }
390
391    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
392        Ok(<Box<_> as FallibleBox<_>>::try_new(T::read_sync(stream)?).map_err(|e| ReadError {
393            context: ErrorContext::BuiltIn { for_type: "Box" },
394            kind: e.into(),
395        })?)
396    }
397
398    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
399        (**self).write_sync(sink)
400    }
401}
402
403/// A vector is prefixed with the length as a [`u64`].
404///
405/// Note that due to Rust's lack of [specialization](https://github.com/rust-lang/rust/issues/31844), this implementation is inefficient for `Vec<u8>`.
406/// Prefer [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) if possible.
407impl<T: Protocol + Send + Sync> Protocol for Vec<T> {
408        fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
409        Self::read_length_prefixed(stream, u64::MAX)
410    }
411
412    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
413        self.write_length_prefixed(sink, u64::MAX)
414    }
415
416    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
417        Self::read_length_prefixed_sync(stream, u64::MAX)
418    }
419
420    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
421        self.write_length_prefixed_sync(sink, u64::MAX)
422    }
423}
424
425/// Note that due to Rust's lack of [specialization](https://github.com/rust-lang/rust/issues/31844), this implementation is inefficient for `Vec<u8>`.
426/// Prefer [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) if possible.
427impl<T: Protocol + Send + Sync> LengthPrefixed for Vec<T> {
428    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
429        Box::pin(async move {
430            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
431            let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
432                context: ErrorContext::BuiltIn { for_type: "Vec" },
433                kind: e.into(),
434            })?;
435            for _ in 0..len {
436                buf.push(T::read(stream).await?);
437            }
438            Ok(buf)
439        })
440    }
441
442    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
443        Box::pin(async move {
444            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
445            for elt in self {
446                elt.write(sink).await?;
447            }
448            Ok(())
449        })
450    }
451
452    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
453        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
454        let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
455            context: ErrorContext::BuiltIn { for_type: "Vec" },
456            kind: e.into(),
457        })?;
458        for _ in 0..len {
459            buf.push(T::read_sync(stream)?);
460        }
461        Ok(buf)
462    }
463
464    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
465        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
466        for elt in self {
467            elt.write_sync(sink)?;
468        }
469        Ok(())
470    }
471}
472
473/// A set is prefixed with the length as a [`u64`].
474impl<T: Protocol + Ord + Send + Sync + 'static> Protocol for BTreeSet<T> {
475        fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
476        Self::read_length_prefixed(stream, u64::MAX)
477    }
478
479    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
480        self.write_length_prefixed(sink, u64::MAX)
481    }
482
483    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
484        Self::read_length_prefixed_sync(stream, u64::MAX)
485    }
486
487    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
488        self.write_length_prefixed_sync(sink, u64::MAX)
489    }
490}
491
492impl<T: Protocol + Ord + Send + Sync + 'static> LengthPrefixed for BTreeSet<T> {
493    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
494        Box::pin(async move {
495            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
496            let mut set = Self::default();
497            for _ in 0..len {
498                set.insert(T::read(stream).await?); //TODO use fallible allocation once available
499            }
500            Ok(set)
501        })
502    }
503
504    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
505        Box::pin(async move {
506            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
507            for elt in self {
508                elt.write(sink).await?;
509            }
510            Ok(())
511        })
512    }
513
514    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
515        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;        let mut set = Self::default();
516        for _ in 0..len {
517            set.insert(T::read_sync(stream)?); //TODO use fallible allocation once available
518        }
519        Ok(set)
520    }
521
522    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
523        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;
524        for elt in self {
525            elt.write_sync(sink)?;
526        }
527        Ok(())
528    }
529}
530
531/// A set is prefixed with the length as a [`u64`].
532impl<T: Protocol + Eq + Hash + Send + Sync> Protocol for HashSet<T> {
533    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
534        Self::read_length_prefixed(stream, u64::MAX)
535    }
536
537    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
538        self.write_length_prefixed(sink, u64::MAX)
539    }
540
541    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
542        Self::read_length_prefixed_sync(stream, u64::MAX)
543    }
544
545    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
546        self.write_length_prefixed_sync(sink, u64::MAX)
547    }
548}
549
550impl<T: Protocol + Eq + Hash + Send + Sync> LengthPrefixed for HashSet<T> {
551    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
552        Box::pin(async move {
553            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
554            let mut set = Self::with_capacity(len); //TODO use fallible allocation once available
555            for _ in 0..len {
556                set.insert(T::read(stream).await?);
557            }
558            Ok(set)
559        })
560    }
561
562    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
563        Box::pin(async move {
564            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
565            for elt in self {
566                elt.write(sink).await?;
567            }
568            Ok(())
569        })
570    }
571
572    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
573        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
574        let mut set = Self::with_capacity(len); //TODO use fallible allocation once available
575        for _ in 0..len {
576            set.insert(T::read_sync(stream)?);
577        }
578        Ok(set)
579    }
580
581    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
582        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
583        for elt in self {
584            elt.write_sync(sink)?;
585        }
586        Ok(())
587    }
588}
589
590/// A string is encoded in UTF-8 and prefixed with the length in bytes as a [`u64`].
591impl Protocol for String {
592    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
593        Self::read_length_prefixed(stream, u64::MAX)
594    }
595
596    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
597        self.write_length_prefixed(sink, u64::MAX)
598    }
599
600    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
601        Self::read_length_prefixed_sync(stream, u64::MAX)
602    }
603
604    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
605        self.write_length_prefixed_sync(sink, u64::MAX)
606    }
607}
608
609/// A string is encoded in UTF-8 and prefixed with the length in bytes.
610impl LengthPrefixed for String {
611    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
612        Box::pin(async move {
613            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
614            let mut buf = Vec::default();
615            buf.try_resize(len, 0).map_err(|e| ReadError {
616                context: ErrorContext::BuiltIn { for_type: "String" },
617                kind: e.into(),
618            })?;
619            stream.read_exact(&mut buf).await.map_err(|e| ReadError {
620                context: ErrorContext::BuiltIn { for_type: "String" },
621                kind: e.into(),
622            })?;
623            Ok(Self::from_utf8(buf).map_err(|e| ReadError {
624                context: ErrorContext::BuiltIn { for_type: "String" },
625                kind: e.into(),
626            })?)
627        })
628    }
629
630    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
631        Box::pin(async move {
632            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
633            sink.write(self.as_bytes()).await.map_err(|e| WriteError {
634                context: ErrorContext::BuiltIn { for_type: "String" },
635                kind: e.into(),
636            })?;
637            Ok(())
638        })
639    }
640
641    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
642        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
643        let mut buf = Vec::default();
644        buf.try_resize(len, 0).map_err(|e| ReadError {
645            context: ErrorContext::BuiltIn { for_type: "String" },
646            kind: e.into(),
647        })?;
648        stream.read_exact(&mut buf).map_err(|e| ReadError {
649            context: ErrorContext::BuiltIn { for_type: "String" },
650            kind: e.into(),
651        })?;
652        Ok(Self::from_utf8(buf).map_err(|e| ReadError {
653            context: ErrorContext::BuiltIn { for_type: "String" },
654            kind: e.into(),
655        })?)
656    }
657
658    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
659        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
660        sink.write(self.as_bytes()).map_err(|e| WriteError {
661            context: ErrorContext::BuiltIn { for_type: "String" },
662            kind: e.into(),
663        })?;
664        Ok(())
665    }
666}
667
668impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> Protocol for BTreeMap<K, V> {
669    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
670        Self::read_length_prefixed(stream, u64::MAX)
671    }
672
673    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
674        self.write_length_prefixed(sink, u64::MAX)
675    }
676
677    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
678        Self::read_length_prefixed_sync(stream, u64::MAX)
679    }
680
681    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
682        self.write_length_prefixed_sync(sink, u64::MAX)
683    }
684}
685
686impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> LengthPrefixed for BTreeMap<K, V> {
687    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
688        Box::pin(async move {
689            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
690            let mut map = Self::default();
691            for _ in 0..len {
692                map.insert(K::read(stream).await?, V::read(stream).await?); //TODO use fallible allocation once available
693            }
694            Ok(map)
695        })
696    }
697
698    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
699        Box::pin(async move {
700            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
701            for (k, v) in self {
702                k.write(sink).await?;
703                v.write(sink).await?;
704            }
705            Ok(())
706        })
707    }
708
709    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
710        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
711        let mut map = Self::default();
712        for _ in 0..len {
713            map.insert(K::read_sync(stream)?, V::read_sync(stream)?); //TODO use fallible allocation once available
714        }
715        Ok(map)
716    }
717
718    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
719        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
720        for (k, v) in self {
721            k.write_sync(sink)?;
722            v.write_sync(sink)?;
723        }
724        Ok(())
725    }
726}
727
728/// A map is prefixed with the length as a [`u64`].
729impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> Protocol for HashMap<K, V> {
730    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
731        Self::read_length_prefixed(stream, u64::MAX)
732    }
733
734    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
735        self.write_length_prefixed(sink, u64::MAX)
736    }
737
738    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
739        Self::read_length_prefixed_sync(stream, u64::MAX)
740    }
741
742    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
743        self.write_length_prefixed_sync(sink, u64::MAX)
744    }
745}
746
747impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> LengthPrefixed for HashMap<K, V> {
748    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
749        Box::pin(async move {
750            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
751            let mut map = Self::with_capacity(len); //TODO use fallible allocation once available
752            for _ in 0..len {
753                map.insert(K::read(stream).await?, V::read(stream).await?);
754            }
755            Ok(map)
756        })
757    }
758
759    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
760        Box::pin(async move {
761            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
762            for (k, v) in self {
763                k.write(sink).await?;
764                v.write(sink).await?;
765            }
766            Ok(())
767        })
768    }
769
770    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
771        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
772        let mut map = Self::with_capacity(len); //TODO use fallible allocation once available
773        for _ in 0..len {
774            map.insert(K::read_sync(stream)?, V::read_sync(stream)?);
775        }
776        Ok(map)
777    }
778
779    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
780        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
781        for (k, v) in self {
782            k.write_sync(sink)?;
783            v.write_sync(sink)?;
784        }
785        Ok(())
786    }
787}
788
789/// A cow is represented like its owned variant.
790///
791/// Note that due to a restriction in the type system, writing a borrowed cow requires cloning it.
792impl<'cow, B: ToOwned + Sync + ?Sized> Protocol for std::borrow::Cow<'cow, B>
793where B::Owned: Protocol + Send + Sync {
794    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
795        Box::pin(async move {
796            Ok(Self::Owned(B::Owned::read(stream).await?))
797        })
798    }
799
800    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
801        Box::pin(async move {
802            match self {
803                Self::Borrowed(borrowed) => (*borrowed).to_owned().write(sink).await?,
804                Self::Owned(owned) => owned.write(sink).await?,
805            }
806            Ok(())
807        })
808    }
809
810    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
811        Ok(Self::Owned(B::Owned::read_sync(stream)?))
812    }
813
814    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
815        match self {
816            Self::Borrowed(borrowed) => (*borrowed).to_owned().write_sync(sink)?,
817            Self::Owned(owned) => owned.write_sync(sink)?,
818        }
819        Ok(())
820    }
821}
822
823/// A cow is represented like its owned variant.
824///
825/// Note that due to a restriction in the type system, writing a borrowed cow requires cloning it.
826impl<'cow, B: ToOwned + Sync + ?Sized> LengthPrefixed for std::borrow::Cow<'cow, B>
827where B::Owned: LengthPrefixed + Send + Sync {
828    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
829        Box::pin(async move {
830            Ok(Self::Owned(B::Owned::read_length_prefixed(stream, max_len).await?))
831        })
832    }
833
834    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
835        Box::pin(async move {
836            match self {
837                Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed(sink, max_len).await?,
838                Self::Owned(owned) => owned.write_length_prefixed(sink, max_len).await?,
839            }
840            Ok(())
841        })
842    }
843
844    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
845        Ok(Self::Owned(B::Owned::read_length_prefixed_sync(stream, max_len)?))
846    }
847
848    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
849        match self {
850            Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed_sync(sink, max_len)?,
851            Self::Owned(owned) => owned.write_length_prefixed_sync(sink, max_len)?,
852        }
853        Ok(())
854    }
855}
856
857#[derive(Protocol)]
858#[async_proto(internal)]
859struct F32Proxy([u8; 4]);
860
861impl From<F32Proxy> for f32 {
862    fn from(F32Proxy(bytes): F32Proxy) -> Self {
863        Self::from_be_bytes(bytes)
864    }
865}
866
867impl<'a> From<&'a f32> for F32Proxy {
868    fn from(val: &f32) -> Self {
869        Self(val.to_be_bytes())
870    }
871}
872
873#[derive(Protocol)]
874#[async_proto(internal)]
875struct F64Proxy([u8; 8]);
876
877impl From<F64Proxy> for f64 {
878    fn from(F64Proxy(bytes): F64Proxy) -> Self {
879        Self::from_be_bytes(bytes)
880    }
881}
882
883impl<'a> From<&'a f64> for F64Proxy {
884    fn from(val: &f64) -> Self {
885        Self(val.to_be_bytes())
886    }
887}
888
889#[derive(Protocol)]
890#[async_proto(internal)]
891struct DurationProxy {
892    secs: u64,
893    subsec_nanos: u32,
894}
895
896impl From<DurationProxy> for std::time::Duration {
897    fn from(DurationProxy { secs, subsec_nanos }: DurationProxy) -> Self {
898        Self::new(secs, subsec_nanos)
899    }
900}
901
902impl<'a> From<&'a std::time::Duration> for DurationProxy {
903    fn from(duration: &std::time::Duration) -> Self {
904        Self {
905            secs: duration.as_secs(),
906            subsec_nanos: duration.subsec_nanos(),
907        }
908    }
909}
910
911impl_protocol_for! {
912    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
913    #[async_proto(via = u8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
914    type std::num::NonZeroU8;
915
916    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
917    #[async_proto(via = i8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
918    type std::num::NonZeroI8;
919
920    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
921    #[async_proto(via = u16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
922    type std::num::NonZeroU16;
923
924    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
925    #[async_proto(via = i16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
926    type std::num::NonZeroI16;
927
928    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
929    #[async_proto(via = u32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
930    type std::num::NonZeroU32;
931
932    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
933    #[async_proto(via = i32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
934    type std::num::NonZeroI32;
935
936    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
937    #[async_proto(via = u64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
938    type std::num::NonZeroU64;
939
940    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
941    #[async_proto(via = i64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
942    type std::num::NonZeroI64;
943
944    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
945    #[async_proto(via = u128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
946    type std::num::NonZeroU128;
947
948    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
949    #[async_proto(via = i128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
950    type std::num::NonZeroI128;
951
952    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
953    #[async_proto(via = F32Proxy)]
954    type f32;
955
956    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
957    #[async_proto(via = F64Proxy)]
958    type f64;
959
960    #[async_proto(where(Idx: Protocol + Send + Sync))]
961    struct Range<Idx> {
962        start: Idx,
963        end: Idx,
964    }
965
966    #[async_proto(where(Idx: Protocol + Sync))]
967    struct RangeFrom<Idx> {
968        start: Idx,
969    }
970
971    #[async_proto(where(Idx: Protocol + Sync))]
972    struct RangeTo<Idx> {
973        end: Idx,
974    }
975
976    #[async_proto(where(Idx: Protocol + Sync))]
977    struct RangeToInclusive<Idx> {
978        end: Idx,
979    }
980
981    #[async_proto(where(T: Protocol + Sync))]
982    enum Option<T> {
983        None,
984        Some(T),
985    }
986
987    #[async_proto(where(T: Protocol + Sync, E: Protocol + Sync))]
988    enum Result<T, E> {
989        Ok(T),
990        Err(E),
991    }
992
993    enum std::convert::Infallible {}
994
995    #[async_proto(where(T: Sync))]
996    struct std::marker::PhantomData<T>;
997
998    struct std::ops::RangeFull;
999
1000    #[async_proto(attr(doc = "A duration is represented as the number of whole seconds as a [`u64`] followed by the number of subsecond nanoseconds as a [`u32`]."))]
1001    #[async_proto(via = DurationProxy)]
1002    type std::time::Duration;
1003}