async_proto/impls/
mod.rs

1//! [`Protocol`] implementations for primitive and [`std`] types.
2
3#![allow(missing_docs)]
4
5use {
6    std::{
7        collections::{
8            BTreeMap,
9            BTreeSet,
10            HashMap,
11            HashSet,
12        },
13        convert::{
14            TryFrom as _,
15            TryInto as _,
16        },
17        future::Future,
18        hash::Hash,
19        io::prelude::*,
20        ops::{
21            Range,
22            RangeFrom,
23            RangeInclusive,
24            RangeTo,
25            RangeToInclusive,
26        },
27        pin::Pin,
28    },
29    byteorder::{
30        NetworkEndian,
31        ReadBytesExt as _,
32        WriteBytesExt as _,
33    },
34    fallible_collections::{
35        FallibleBox,
36        FallibleVec,
37    },
38    tokio::io::{
39        AsyncRead,
40        AsyncReadExt as _,
41        AsyncWrite,
42        AsyncWriteExt as _,
43    },
44    async_proto_derive::impl_protocol_for,
45    crate::{
46        ErrorContext,
47        LengthPrefixed,
48        Protocol,
49        ReadError,
50        ReadErrorKind,
51        WriteError,
52        WriteErrorKind,
53    },
54};
55
56#[cfg(feature = "bitvec")] mod bitvec;
57#[cfg(feature = "bytes")] mod bytes;
58#[cfg(feature = "chrono")] mod chrono;
59#[cfg(feature = "chrono-tz")] mod chrono_tz;
60#[cfg(feature = "doubloon")] mod doubloon;
61#[cfg(feature = "either")] mod either;
62#[cfg(feature = "enumset")] mod enumset;
63#[cfg(feature = "git2")] mod git2;
64#[cfg(feature = "gix-hash")] mod gix_hash;
65#[cfg(feature = "noisy_float")] mod noisy_float;
66#[cfg(feature = "rust_decimal")] mod rust_decimal;
67#[cfg(feature = "semver")] mod semver;
68#[cfg(feature = "serde_json")] mod serde_json;
69#[cfg(feature = "serenity")] mod serenity;
70#[cfg(feature = "uuid")] mod uuid;
71
72async fn read_len<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
73    let len = match max_len {
74        0 => 0,
75        1..=255 => u8::read(stream).await?.into(),
76        256..=65_535 => u16::read(stream).await?.into(),
77        65_536..=4_294_967_295 => u32::read(stream).await?.into(),
78        _ => u64::read(stream).await?,
79    };
80    if len > max_len {
81        Err(ReadError {
82            context: error_ctx(),
83            kind: ReadErrorKind::MaxLen { len, max_len },
84        })
85    } else {
86        usize::try_from(len).map_err(|e| ReadError {
87            context: error_ctx(),
88            kind: e.into(),
89        })
90    }
91}
92
93async fn write_len<'a, W: AsyncWrite + Unpin + Send + 'a>(sink: &'a mut W, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
94    let len = u64::try_from(len).map_err(|e| WriteError {
95        context: error_ctx(),
96        kind: e.into(),
97    })?;
98    if len > max_len {
99        return Err(WriteError {
100            context: error_ctx(),
101            kind: WriteErrorKind::MaxLen { len, max_len },
102        })
103    }
104    match max_len {
105        0 => {}
106        1..=255 => (len as u8).write(sink).await?,
107        256..=65_535 => (len as u16).write(sink).await?,
108        65_536..=4_294_967_295 => (len as u32).write(sink).await?,
109        _ => len.write(sink).await?,
110    }
111    Ok(())
112}
113
114fn read_len_sync(stream: &mut impl Read, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
115    let len = match max_len {
116        0 => 0,
117        1..=255 => u8::read_sync(stream)?.into(),
118        256..=65_535 => u16::read_sync(stream)?.into(),
119        65_536..=4_294_967_295 => u32::read_sync(stream)?.into(),
120        _ => u64::read_sync(stream)?,
121    };
122    if len > max_len {
123        Err(ReadError {
124            context: error_ctx(),
125            kind: ReadErrorKind::MaxLen { len, max_len },
126        })
127    } else {
128        usize::try_from(len).map_err(|e| ReadError {
129            context: error_ctx(),
130            kind: e.into(),
131        })
132    }
133}
134
135fn write_len_sync(sink: &mut impl Write, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
136    let len = u64::try_from(len).map_err(|e| WriteError {
137        context: error_ctx(),
138        kind: e.into(),
139    })?;
140    if len > max_len {
141        return Err(WriteError {
142            context: error_ctx(),
143            kind: WriteErrorKind::MaxLen { len, max_len },
144        })
145    }
146    match max_len {
147        0 => {}
148        1..=255 => (len as u8).write_sync(sink)?,
149        256..=65_535 => (len as u16).write_sync(sink)?,
150        65_536..=4_294_967_295 => (len as u32).write_sync(sink)?,
151        _ => len.write_sync(sink)?,
152    }
153    Ok(())
154}
155
156macro_rules! impl_protocol_primitive {
157    ($ty:ty, $read:ident, $write:ident$(, $endian:ty)?) => {
158        /// Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format.
159        impl Protocol for $ty {
160            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
161                Box::pin(async move {
162                    Ok(stream.$read().await.map_err(|e| ReadError {
163                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
164                        kind: e.into(),
165                    })?)
166                })
167            }
168
169            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
170                Box::pin(async move {
171                    Ok(sink.$write(*self).await.map_err(|e| WriteError {
172                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
173                        kind: e.into(),
174                    })?)
175                })
176            }
177
178            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
179                Ok(stream.$read$(::<$endian>)?().map_err(|e| ReadError {
180                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
181                    kind: e.into(),
182                })?)
183            }
184
185            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
186                Ok(sink.$write$(::<$endian>)?(*self).map_err(|e| WriteError {
187                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
188                    kind: e.into(),
189                })?)
190            }
191        }
192    };
193}
194
195impl_protocol_primitive!(u8, read_u8, write_u8);
196impl_protocol_primitive!(i8, read_i8, write_i8);
197impl_protocol_primitive!(u16, read_u16, write_u16, NetworkEndian);
198impl_protocol_primitive!(i16, read_i16, write_i16, NetworkEndian);
199impl_protocol_primitive!(u32, read_u32, write_u32, NetworkEndian);
200impl_protocol_primitive!(i32, read_i32, write_i32, NetworkEndian);
201impl_protocol_primitive!(u64, read_u64, write_u64, NetworkEndian);
202impl_protocol_primitive!(i64, read_i64, write_i64, NetworkEndian);
203impl_protocol_primitive!(u128, read_u128, write_u128, NetworkEndian);
204impl_protocol_primitive!(i128, read_i128, write_i128, NetworkEndian);
205
206impl<Idx: Protocol + Send + Sync> Protocol for RangeInclusive<Idx> { //TODO derive
207    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
208        Box::pin(async move {
209            Ok(Idx::read(stream).await?..=Idx::read(stream).await?)
210        })
211    }
212
213    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
214        Box::pin(async move {
215            self.start().write(sink).await?;
216            self.end().write(sink).await?;
217            Ok(())
218        })
219    }
220
221    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
222        Ok(Idx::read_sync(stream)?..=Idx::read_sync(stream)?)
223    }
224
225    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
226        self.start().write_sync(sink)?;
227        self.end().write_sync(sink)?;
228        Ok(())
229    }
230}
231
232macro_rules! impl_protocol_tuple {
233    ($($ty:ident),*) => {
234        #[allow(unused)]
235        impl<$($ty: Protocol + Send + Sync),*> Protocol for ($($ty,)*) {
236            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
237                Box::pin(async move {
238                    Ok((
239                        $($ty::read(stream).await?,)*
240                    ))
241                })
242            }
243
244            #[allow(non_snake_case)]
245            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
246                Box::pin(async move {
247                    let ($($ty,)*) = self;
248                    $(
249                        $ty.write(sink).await?;
250                    )*
251                    Ok(())
252                })
253            }
254
255            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
256                Ok((
257                    $($ty::read_sync(stream)?,)*
258                ))
259            }
260
261            #[allow(non_snake_case)]
262            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
263                let ($($ty,)*) = self;
264                $(
265                    $ty.write_sync(sink)?;
266                )*
267                Ok(())
268            }
269        }
270    };
271}
272
273impl_protocol_tuple!();
274impl_protocol_tuple!(A);
275impl_protocol_tuple!(A, B);
276impl_protocol_tuple!(A, B, C);
277impl_protocol_tuple!(A, B, C, D);
278impl_protocol_tuple!(A, B, C, D, E);
279impl_protocol_tuple!(A, B, C, D, E, F);
280impl_protocol_tuple!(A, B, C, D, E, F, G);
281impl_protocol_tuple!(A, B, C, D, E, F, G, H);
282impl_protocol_tuple!(A, B, C, D, E, F, G, H, I);
283impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J);
284impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K);
285impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K, L);
286
287impl<T: Protocol + Send + Sync, const N: usize> Protocol for [T; N] {
288    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
289        Box::pin(async move {
290            let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
291                context: ErrorContext::BuiltIn { for_type: "[T; N]" },
292                kind: e.into(),
293            })?;
294            for _ in 0..N {
295                vec.push(T::read(stream).await?);
296            }
297            Ok(match vec.try_into() {
298                Ok(array) => array,
299                Err(_) => panic!("wrong array length"),
300            })
301        })
302    }
303
304    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
305        Box::pin(async move {
306            for elt in self {
307                elt.write(sink).await?;
308            }
309            Ok(())
310        })
311    }
312
313    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
314        let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
315            context: ErrorContext::BuiltIn { for_type: "[T; N]" },
316            kind: e.into(),
317        })?;
318        for _ in 0..N {
319            vec.push(T::read_sync(stream)?);
320        }
321        Ok(match vec.try_into() {
322            Ok(array) => array,
323            Err(_) => panic!("wrong array length"),
324        })
325    }
326
327    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
328        for elt in self {
329            elt.write_sync(sink)?;
330        }
331        Ok(())
332    }
333}
334
335/// Represented as one byte, with `0` for `false` and `1` for `true`.
336impl Protocol for bool {
337    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
338        Box::pin(async move {
339            Ok(match u8::read(stream).await? {
340                0 => false,
341                1 => true,
342                n => return Err(ReadError {
343                    context: ErrorContext::BuiltIn { for_type: "bool" },
344                    kind: ReadErrorKind::UnknownVariant8(n),
345                }),
346            })
347        })
348    }
349
350    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
351        Box::pin(async move {
352            if *self { 1u8 } else { 0 }.write(sink).await
353        })
354    }
355
356    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
357        Ok(match u8::read_sync(stream)? {
358            0 => false,
359            1 => true,
360            n => return Err(ReadError {
361                context: ErrorContext::BuiltIn { for_type: "bool" },
362                kind: ReadErrorKind::UnknownVariant8(n),
363            }),
364        })
365    }
366
367    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
368        if *self { 1u8 } else { 0 }.write_sync(sink)
369    }
370}
371
372impl<T: Protocol> Protocol for Box<T> {
373    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
374        Box::pin(async move {
375            Ok(<Box<_> as FallibleBox<_>>::try_new(T::read(stream).await?).map_err(|e| ReadError {
376                context: ErrorContext::BuiltIn { for_type: "Box" },
377                kind: e.into(),
378            })?)
379        })
380    }
381
382    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
383        (**self).write(sink)
384    }
385
386    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
387        Ok(<Box<_> as FallibleBox<_>>::try_new(T::read_sync(stream)?).map_err(|e| ReadError {
388            context: ErrorContext::BuiltIn { for_type: "Box" },
389            kind: e.into(),
390        })?)
391    }
392
393    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
394        (**self).write_sync(sink)
395    }
396}
397
398/// A vector is prefixed with the length as a [`u64`].
399///
400/// Note that due to Rust's lack of [specialization](https://github.com/rust-lang/rust/issues/31844), this implementation is inefficient for `Vec<u8>`.
401/// Prefer [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) if possible.
402impl<T: Protocol + Send + Sync> Protocol for Vec<T> {
403        fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
404        Self::read_length_prefixed(stream, u64::MAX)
405    }
406
407    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
408        self.write_length_prefixed(sink, u64::MAX)
409    }
410
411    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
412        Self::read_length_prefixed_sync(stream, u64::MAX)
413    }
414
415    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
416        self.write_length_prefixed_sync(sink, u64::MAX)
417    }
418}
419
420/// Note that due to Rust's lack of [specialization](https://github.com/rust-lang/rust/issues/31844), this implementation is inefficient for `Vec<u8>`.
421/// Prefer [`Bytes`](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) if possible.
422impl<T: Protocol + Send + Sync> LengthPrefixed for Vec<T> {
423    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
424        Box::pin(async move {
425            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
426            let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
427                context: ErrorContext::BuiltIn { for_type: "Vec" },
428                kind: e.into(),
429            })?;
430            for _ in 0..len {
431                buf.push(T::read(stream).await?);
432            }
433            Ok(buf)
434        })
435    }
436
437    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
438        Box::pin(async move {
439            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
440            for elt in self {
441                elt.write(sink).await?;
442            }
443            Ok(())
444        })
445    }
446
447    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
448        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
449        let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
450            context: ErrorContext::BuiltIn { for_type: "Vec" },
451            kind: e.into(),
452        })?;
453        for _ in 0..len {
454            buf.push(T::read_sync(stream)?);
455        }
456        Ok(buf)
457    }
458
459    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
460        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
461        for elt in self {
462            elt.write_sync(sink)?;
463        }
464        Ok(())
465    }
466}
467
468/// A set is prefixed with the length as a [`u64`].
469impl<T: Protocol + Ord + Send + Sync + 'static> Protocol for BTreeSet<T> {
470        fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
471        Self::read_length_prefixed(stream, u64::MAX)
472    }
473
474    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
475        self.write_length_prefixed(sink, u64::MAX)
476    }
477
478    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
479        Self::read_length_prefixed_sync(stream, u64::MAX)
480    }
481
482    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
483        self.write_length_prefixed_sync(sink, u64::MAX)
484    }
485}
486
487impl<T: Protocol + Ord + Send + Sync + 'static> LengthPrefixed for BTreeSet<T> {
488    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
489        Box::pin(async move {
490            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
491            let mut set = Self::default();
492            for _ in 0..len {
493                set.insert(T::read(stream).await?); //TODO use fallible allocation once available
494            }
495            Ok(set)
496        })
497    }
498
499    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
500        Box::pin(async move {
501            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
502            for elt in self {
503                elt.write(sink).await?;
504            }
505            Ok(())
506        })
507    }
508
509    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
510        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;        let mut set = Self::default();
511        for _ in 0..len {
512            set.insert(T::read_sync(stream)?); //TODO use fallible allocation once available
513        }
514        Ok(set)
515    }
516
517    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
518        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;
519        for elt in self {
520            elt.write_sync(sink)?;
521        }
522        Ok(())
523    }
524}
525
526impl<T: Protocol + Eq + Hash + Send + Sync> Protocol for HashSet<T> {
527    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
528        Self::read_length_prefixed(stream, u64::MAX)
529    }
530
531    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
532        self.write_length_prefixed(sink, u64::MAX)
533    }
534
535    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
536        Self::read_length_prefixed_sync(stream, u64::MAX)
537    }
538
539    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
540        self.write_length_prefixed_sync(sink, u64::MAX)
541    }
542}
543
544impl<T: Protocol + Eq + Hash + Send + Sync> LengthPrefixed for HashSet<T> {
545    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
546        Box::pin(async move {
547            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
548            let mut set = Self::with_capacity(len); //TODO use fallible allocation once available
549            for _ in 0..len {
550                set.insert(T::read(stream).await?);
551            }
552            Ok(set)
553        })
554    }
555
556    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
557        Box::pin(async move {
558            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
559            for elt in self {
560                elt.write(sink).await?;
561            }
562            Ok(())
563        })
564    }
565
566    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
567        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
568        let mut set = Self::with_capacity(len); //TODO use fallible allocation once available
569        for _ in 0..len {
570            set.insert(T::read_sync(stream)?);
571        }
572        Ok(set)
573    }
574
575    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
576        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
577        for elt in self {
578            elt.write_sync(sink)?;
579        }
580        Ok(())
581    }
582}
583
584/// A string is encoded in UTF-8 and prefixed with the length in bytes as a [`u64`].
585impl Protocol for String {
586    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
587        Self::read_length_prefixed(stream, u64::MAX)
588    }
589
590    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
591        self.write_length_prefixed(sink, u64::MAX)
592    }
593
594    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
595        Self::read_length_prefixed_sync(stream, u64::MAX)
596    }
597
598    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
599        self.write_length_prefixed_sync(sink, u64::MAX)
600    }
601}
602
603/// A string is encoded in UTF-8 and prefixed with the length in bytes.
604impl LengthPrefixed for String {
605    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
606        Box::pin(async move {
607            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
608            let mut buf = Vec::default();
609            buf.try_resize(len, 0).map_err(|e| ReadError {
610                context: ErrorContext::BuiltIn { for_type: "String" },
611                kind: e.into(),
612            })?;
613            stream.read_exact(&mut buf).await.map_err(|e| ReadError {
614                context: ErrorContext::BuiltIn { for_type: "String" },
615                kind: e.into(),
616            })?;
617            Ok(Self::from_utf8(buf).map_err(|e| ReadError {
618                context: ErrorContext::BuiltIn { for_type: "String" },
619                kind: e.into(),
620            })?)
621        })
622    }
623
624    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
625        Box::pin(async move {
626            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
627            sink.write(self.as_bytes()).await.map_err(|e| WriteError {
628                context: ErrorContext::BuiltIn { for_type: "String" },
629                kind: e.into(),
630            })?;
631            Ok(())
632        })
633    }
634
635    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
636        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
637        let mut buf = Vec::default();
638        buf.try_resize(len, 0).map_err(|e| ReadError {
639            context: ErrorContext::BuiltIn { for_type: "String" },
640            kind: e.into(),
641        })?;
642        stream.read_exact(&mut buf).map_err(|e| ReadError {
643            context: ErrorContext::BuiltIn { for_type: "String" },
644            kind: e.into(),
645        })?;
646        Ok(Self::from_utf8(buf).map_err(|e| ReadError {
647            context: ErrorContext::BuiltIn { for_type: "String" },
648            kind: e.into(),
649        })?)
650    }
651
652    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
653        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
654        sink.write(self.as_bytes()).map_err(|e| WriteError {
655            context: ErrorContext::BuiltIn { for_type: "String" },
656            kind: e.into(),
657        })?;
658        Ok(())
659    }
660}
661
662impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> Protocol for BTreeMap<K, V> {
663    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
664        Self::read_length_prefixed(stream, u64::MAX)
665    }
666
667    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
668        self.write_length_prefixed(sink, u64::MAX)
669    }
670
671    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
672        Self::read_length_prefixed_sync(stream, u64::MAX)
673    }
674
675    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
676        self.write_length_prefixed_sync(sink, u64::MAX)
677    }
678}
679
680impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> LengthPrefixed for BTreeMap<K, V> {
681    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
682        Box::pin(async move {
683            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
684            let mut map = Self::default();
685            for _ in 0..len {
686                map.insert(K::read(stream).await?, V::read(stream).await?); //TODO use fallible allocation once available
687            }
688            Ok(map)
689        })
690    }
691
692    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
693        Box::pin(async move {
694            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
695            for (k, v) in self {
696                k.write(sink).await?;
697                v.write(sink).await?;
698            }
699            Ok(())
700        })
701    }
702
703    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
704        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
705        let mut map = Self::default();
706        for _ in 0..len {
707            map.insert(K::read_sync(stream)?, V::read_sync(stream)?); //TODO use fallible allocation once available
708        }
709        Ok(map)
710    }
711
712    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
713        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
714        for (k, v) in self {
715            k.write_sync(sink)?;
716            v.write_sync(sink)?;
717        }
718        Ok(())
719    }
720}
721
722/// A map is prefixed with the length as a [`u64`].
723impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> Protocol for HashMap<K, V> {
724    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
725        Self::read_length_prefixed(stream, u64::MAX)
726    }
727
728    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
729        self.write_length_prefixed(sink, u64::MAX)
730    }
731
732    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
733        Self::read_length_prefixed_sync(stream, u64::MAX)
734    }
735
736    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
737        self.write_length_prefixed_sync(sink, u64::MAX)
738    }
739}
740
741impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> LengthPrefixed for HashMap<K, V> {
742    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
743        Box::pin(async move {
744            let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
745            let mut map = Self::with_capacity(len); //TODO use fallible allocation once available
746            for _ in 0..len {
747                map.insert(K::read(stream).await?, V::read(stream).await?);
748            }
749            Ok(map)
750        })
751    }
752
753    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
754        Box::pin(async move {
755            write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
756            for (k, v) in self {
757                k.write(sink).await?;
758                v.write(sink).await?;
759            }
760            Ok(())
761        })
762    }
763
764    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
765        let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
766        let mut map = Self::with_capacity(len); //TODO use fallible allocation once available
767        for _ in 0..len {
768            map.insert(K::read_sync(stream)?, V::read_sync(stream)?);
769        }
770        Ok(map)
771    }
772
773    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
774        write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
775        for (k, v) in self {
776            k.write_sync(sink)?;
777            v.write_sync(sink)?;
778        }
779        Ok(())
780    }
781}
782
783/// A cow is represented like its owned variant.
784///
785/// Note that due to a restriction in the type system, writing a borrowed cow requires cloning it.
786impl<'cow, B: ToOwned + Sync + ?Sized> Protocol for std::borrow::Cow<'cow, B>
787where B::Owned: Protocol + Send + Sync {
788    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
789        Box::pin(async move {
790            Ok(Self::Owned(B::Owned::read(stream).await?))
791        })
792    }
793
794    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
795        Box::pin(async move {
796            match self {
797                Self::Borrowed(borrowed) => (*borrowed).to_owned().write(sink).await?,
798                Self::Owned(owned) => owned.write(sink).await?,
799            }
800            Ok(())
801        })
802    }
803
804    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
805        Ok(Self::Owned(B::Owned::read_sync(stream)?))
806    }
807
808    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
809        match self {
810            Self::Borrowed(borrowed) => (*borrowed).to_owned().write_sync(sink)?,
811            Self::Owned(owned) => owned.write_sync(sink)?,
812        }
813        Ok(())
814    }
815}
816
817/// A cow is represented like its owned variant.
818///
819/// Note that due to a restriction in the type system, writing a borrowed cow requires cloning it.
820impl<'cow, B: ToOwned + Sync + ?Sized> LengthPrefixed for std::borrow::Cow<'cow, B>
821where B::Owned: LengthPrefixed + Send + Sync {
822    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
823        Box::pin(async move {
824            Ok(Self::Owned(B::Owned::read_length_prefixed(stream, max_len).await?))
825        })
826    }
827
828    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
829        Box::pin(async move {
830            match self {
831                Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed(sink, max_len).await?,
832                Self::Owned(owned) => owned.write_length_prefixed(sink, max_len).await?,
833            }
834            Ok(())
835        })
836    }
837
838    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
839        Ok(Self::Owned(B::Owned::read_length_prefixed_sync(stream, max_len)?))
840    }
841
842    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
843        match self {
844            Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed_sync(sink, max_len)?,
845            Self::Owned(owned) => owned.write_length_prefixed_sync(sink, max_len)?,
846        }
847        Ok(())
848    }
849}
850
851#[derive(Protocol)]
852#[async_proto(internal)]
853struct F32Proxy([u8; 4]);
854
855impl From<F32Proxy> for f32 {
856    fn from(F32Proxy(bytes): F32Proxy) -> Self {
857        Self::from_be_bytes(bytes)
858    }
859}
860
861impl<'a> From<&'a f32> for F32Proxy {
862    fn from(val: &f32) -> Self {
863        Self(val.to_be_bytes())
864    }
865}
866
867#[derive(Protocol)]
868#[async_proto(internal)]
869struct F64Proxy([u8; 8]);
870
871impl From<F64Proxy> for f64 {
872    fn from(F64Proxy(bytes): F64Proxy) -> Self {
873        Self::from_be_bytes(bytes)
874    }
875}
876
877impl<'a> From<&'a f64> for F64Proxy {
878    fn from(val: &f64) -> Self {
879        Self(val.to_be_bytes())
880    }
881}
882
883#[derive(Protocol)]
884#[async_proto(internal)]
885struct DurationProxy {
886    secs: u64,
887    subsec_nanos: u32,
888}
889
890impl From<DurationProxy> for std::time::Duration {
891    fn from(DurationProxy { secs, subsec_nanos }: DurationProxy) -> Self {
892        Self::new(secs, subsec_nanos)
893    }
894}
895
896impl<'a> From<&'a std::time::Duration> for DurationProxy {
897    fn from(duration: &std::time::Duration) -> Self {
898        Self {
899            secs: duration.as_secs(),
900            subsec_nanos: duration.subsec_nanos(),
901        }
902    }
903}
904
905impl_protocol_for! {
906    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
907    #[async_proto(via = u8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
908    type std::num::NonZeroU8;
909
910    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
911    #[async_proto(via = i8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
912    type std::num::NonZeroI8;
913
914    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
915    #[async_proto(via = u16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
916    type std::num::NonZeroU16;
917
918    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
919    #[async_proto(via = i16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
920    type std::num::NonZeroI16;
921
922    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
923    #[async_proto(via = u32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
924    type std::num::NonZeroU32;
925
926    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
927    #[async_proto(via = i32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
928    type std::num::NonZeroI32;
929
930    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
931    #[async_proto(via = u64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
932    type std::num::NonZeroU64;
933
934    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
935    #[async_proto(via = i64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
936    type std::num::NonZeroI64;
937
938    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
939    #[async_proto(via = u128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
940    type std::num::NonZeroU128;
941
942    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
943    #[async_proto(via = i128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
944    type std::num::NonZeroI128;
945
946    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
947    #[async_proto(via = F32Proxy)]
948    type f32;
949
950    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
951    #[async_proto(via = F64Proxy)]
952    type f64;
953
954    #[async_proto(where(Idx: Protocol + Send + Sync))]
955    struct Range<Idx> {
956        start: Idx,
957        end: Idx,
958    }
959
960    #[async_proto(where(Idx: Protocol + Sync))]
961    struct RangeFrom<Idx> {
962        start: Idx,
963    }
964
965    #[async_proto(where(Idx: Protocol + Sync))]
966    struct RangeTo<Idx> {
967        end: Idx,
968    }
969
970    #[async_proto(where(Idx: Protocol + Sync))]
971    struct RangeToInclusive<Idx> {
972        end: Idx,
973    }
974
975    #[async_proto(where(T: Protocol + Sync))]
976    enum Option<T> {
977        None,
978        Some(T),
979    }
980
981    #[async_proto(where(T: Protocol + Sync, E: Protocol + Sync))]
982    enum Result<T, E> {
983        Ok(T),
984        Err(E),
985    }
986
987    enum std::convert::Infallible {}
988
989    #[async_proto(where(T: Sync))]
990    struct std::marker::PhantomData<T>;
991
992    struct std::ops::RangeFull;
993
994    #[async_proto(attr(doc = "A duration is represented as the number of whole seconds as a [`u64`] followed by the number of subsecond nanoseconds as a [`u32`]."))]
995    #[async_proto(via = DurationProxy)]
996    type std::time::Duration;
997}