1#![allow(missing_docs)]
4
5use {
6 std::{
7 collections::{
8 BTreeMap,
9 BTreeSet,
10 HashMap,
11 HashSet,
12 },
13 convert::{
14 TryFrom as _,
15 TryInto as _,
16 },
17 future::Future,
18 hash::Hash,
19 io::prelude::*,
20 ops::{
21 Range,
22 RangeFrom,
23 RangeInclusive,
24 RangeTo,
25 RangeToInclusive,
26 },
27 pin::Pin,
28 },
29 byteorder::{
30 NetworkEndian,
31 ReadBytesExt as _,
32 WriteBytesExt as _,
33 },
34 fallible_collections::{
35 FallibleBox,
36 FallibleVec,
37 },
38 tokio::io::{
39 AsyncRead,
40 AsyncReadExt as _,
41 AsyncWrite,
42 AsyncWriteExt as _,
43 },
44 async_proto_derive::impl_protocol_for,
45 crate::{
46 ErrorContext,
47 LengthPrefixed,
48 Protocol,
49 ReadError,
50 ReadErrorKind,
51 WriteError,
52 WriteErrorKind,
53 },
54};
55
// Optional submodules. Each one is compiled only when the Cargo feature named
// in its `cfg` attribute is enabled.
// NOTE(review): presumably each holds the `Protocol` impls for the matching
// third-party crate's types — confirm against the submodule sources.
#[cfg(feature = "bitvec")] mod bitvec;
#[cfg(feature = "bytes")] mod bytes;
#[cfg(feature = "chrono")] mod chrono;
#[cfg(feature = "chrono-tz")] mod chrono_tz;
#[cfg(feature = "doubloon")] mod doubloon;
#[cfg(feature = "either")] mod either;
#[cfg(feature = "enumset")] mod enumset;
#[cfg(feature = "git2")] mod git2;
#[cfg(feature = "gix-hash")] mod gix_hash;
#[cfg(feature = "noisy_float")] mod noisy_float;
#[cfg(feature = "rust_decimal")] mod rust_decimal;
#[cfg(feature = "semver")] mod semver;
#[cfg(feature = "serde_json")] mod serde_json;
#[cfg(feature = "serenity")] mod serenity;
#[cfg(feature = "uuid")] mod uuid;
71
72async fn read_len<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
73 let len = match max_len {
74 0 => 0,
75 1..=255 => u8::read(stream).await?.into(),
76 256..=65_535 => u16::read(stream).await?.into(),
77 65_536..=4_294_967_295 => u32::read(stream).await?.into(),
78 _ => u64::read(stream).await?,
79 };
80 if len > max_len {
81 Err(ReadError {
82 context: error_ctx(),
83 kind: ReadErrorKind::MaxLen { len, max_len },
84 })
85 } else {
86 usize::try_from(len).map_err(|e| ReadError {
87 context: error_ctx(),
88 kind: e.into(),
89 })
90 }
91}
92
93async fn write_len<'a, W: AsyncWrite + Unpin + Send + 'a>(sink: &'a mut W, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
94 let len = u64::try_from(len).map_err(|e| WriteError {
95 context: error_ctx(),
96 kind: e.into(),
97 })?;
98 if len > max_len {
99 return Err(WriteError {
100 context: error_ctx(),
101 kind: WriteErrorKind::MaxLen { len, max_len },
102 })
103 }
104 match max_len {
105 0 => {}
106 1..=255 => (len as u8).write(sink).await?,
107 256..=65_535 => (len as u16).write(sink).await?,
108 65_536..=4_294_967_295 => (len as u32).write(sink).await?,
109 _ => len.write(sink).await?,
110 }
111 Ok(())
112}
113
114fn read_len_sync(stream: &mut impl Read, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
115 let len = match max_len {
116 0 => 0,
117 1..=255 => u8::read_sync(stream)?.into(),
118 256..=65_535 => u16::read_sync(stream)?.into(),
119 65_536..=4_294_967_295 => u32::read_sync(stream)?.into(),
120 _ => u64::read_sync(stream)?,
121 };
122 if len > max_len {
123 Err(ReadError {
124 context: error_ctx(),
125 kind: ReadErrorKind::MaxLen { len, max_len },
126 })
127 } else {
128 usize::try_from(len).map_err(|e| ReadError {
129 context: error_ctx(),
130 kind: e.into(),
131 })
132 }
133}
134
135fn write_len_sync(sink: &mut impl Write, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
136 let len = u64::try_from(len).map_err(|e| WriteError {
137 context: error_ctx(),
138 kind: e.into(),
139 })?;
140 if len > max_len {
141 return Err(WriteError {
142 context: error_ctx(),
143 kind: WriteErrorKind::MaxLen { len, max_len },
144 })
145 }
146 match max_len {
147 0 => {}
148 1..=255 => (len as u8).write_sync(sink)?,
149 256..=65_535 => (len as u16).write_sync(sink)?,
150 65_536..=4_294_967_295 => (len as u32).write_sync(sink)?,
151 _ => len.write_sync(sink)?,
152 }
153 Ok(())
154}
155
/// Implements `Protocol` for a primitive integer type.
///
/// * `$ty` — the integer type.
/// * `$read` / `$write` — names of the reader/writer methods; the same identifiers are invoked on
///   the async stream/sink and on the blocking stream/sink.
/// * `$endian` — optional byte-order type, passed as a turbofish to the blocking methods only.
///
/// Every I/O error is wrapped with an `ErrorContext::BuiltIn` naming `$ty`.
macro_rules! impl_protocol_primitive {
    ($ty:ty, $read:ident, $write:ident$(, $endian:ty)?) => {
        impl Protocol for $ty {
            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
                Box::pin(async move {
                    stream.$read().await.map_err(|e| ReadError {
                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
                        kind: e.into(),
                    })
                })
            }

            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
                Box::pin(async move {
                    sink.$write(*self).await.map_err(|e| WriteError {
                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
                        kind: e.into(),
                    })
                })
            }

            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
                stream.$read$(::<$endian>)?().map_err(|e| ReadError {
                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
                    kind: e.into(),
                })
            }

            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
                sink.$write$(::<$endian>)?(*self).map_err(|e| WriteError {
                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
                    kind: e.into(),
                })
            }
        }
    };
}
194
// `Protocol` impls for the built-in integer types. The single-byte types take no
// byte-order argument; every wider type passes `NetworkEndian` to the blocking
// reader/writer methods.
impl_protocol_primitive!(u8, read_u8, write_u8);
impl_protocol_primitive!(i8, read_i8, write_i8);
impl_protocol_primitive!(u16, read_u16, write_u16, NetworkEndian);
impl_protocol_primitive!(i16, read_i16, write_i16, NetworkEndian);
impl_protocol_primitive!(u32, read_u32, write_u32, NetworkEndian);
impl_protocol_primitive!(i32, read_i32, write_i32, NetworkEndian);
impl_protocol_primitive!(u64, read_u64, write_u64, NetworkEndian);
impl_protocol_primitive!(i64, read_i64, write_i64, NetworkEndian);
impl_protocol_primitive!(u128, read_u128, write_u128, NetworkEndian);
impl_protocol_primitive!(i128, read_i128, write_i128, NetworkEndian);
205
206impl<Idx: Protocol + Send + Sync> Protocol for RangeInclusive<Idx> { fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
208 Box::pin(async move {
209 Ok(Idx::read(stream).await?..=Idx::read(stream).await?)
210 })
211 }
212
213 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
214 Box::pin(async move {
215 self.start().write(sink).await?;
216 self.end().write(sink).await?;
217 Ok(())
218 })
219 }
220
221 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
222 Ok(Idx::read_sync(stream)?..=Idx::read_sync(stream)?)
223 }
224
225 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
226 self.start().write_sync(sink)?;
227 self.end().write_sync(sink)?;
228 Ok(())
229 }
230}
231
// Implements `Protocol` for a tuple whose element types are the given identifiers.
//
// Elements are read and written one after another, in declaration order.
// The identifiers passed as `$ty` are used both as generic type parameters and,
// in the `write` methods, as the names of the bindings destructured from `self`;
// that is why those methods carry `#[allow(non_snake_case)]`.
macro_rules! impl_protocol_tuple {
    ($($ty:ident),*) => {
        // With zero identifiers the expansion contains no reads or writes, so the
        // `stream`/`sink` parameters would otherwise be reported as unused.
        #[allow(unused)]
        impl<$($ty: Protocol + Send + Sync),*> Protocol for ($($ty,)*) {
            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
                Box::pin(async move {
                    Ok((
                        $($ty::read(stream).await?,)*
                    ))
                })
            }

            #[allow(non_snake_case)]
            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
                Box::pin(async move {
                    // Binds each element to a variable named after its type parameter.
                    let ($($ty,)*) = self;
                    $(
                        $ty.write(sink).await?;
                    )*
                    Ok(())
                })
            }

            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
                Ok((
                    $($ty::read_sync(stream)?,)*
                ))
            }

            #[allow(non_snake_case)]
            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
                // Binds each element to a variable named after its type parameter.
                let ($($ty,)*) = self;
                $(
                    $ty.write_sync(sink)?;
                )*
                Ok(())
            }
        }
    };
}
272
// `Protocol` impls for tuples of every arity from 0 (the unit type) up to 12.
impl_protocol_tuple!();
impl_protocol_tuple!(A);
impl_protocol_tuple!(A, B);
impl_protocol_tuple!(A, B, C);
impl_protocol_tuple!(A, B, C, D);
impl_protocol_tuple!(A, B, C, D, E);
impl_protocol_tuple!(A, B, C, D, E, F);
impl_protocol_tuple!(A, B, C, D, E, F, G);
impl_protocol_tuple!(A, B, C, D, E, F, G, H);
impl_protocol_tuple!(A, B, C, D, E, F, G, H, I);
impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J);
impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K);
impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K, L);
286
287impl<T: Protocol + Send + Sync, const N: usize> Protocol for [T; N] {
288 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
289 Box::pin(async move {
290 let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
291 context: ErrorContext::BuiltIn { for_type: "[T; N]" },
292 kind: e.into(),
293 })?;
294 for _ in 0..N {
295 vec.push(T::read(stream).await?);
296 }
297 Ok(match vec.try_into() {
298 Ok(array) => array,
299 Err(_) => panic!("wrong array length"),
300 })
301 })
302 }
303
304 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
305 Box::pin(async move {
306 for elt in self {
307 elt.write(sink).await?;
308 }
309 Ok(())
310 })
311 }
312
313 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
314 let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
315 context: ErrorContext::BuiltIn { for_type: "[T; N]" },
316 kind: e.into(),
317 })?;
318 for _ in 0..N {
319 vec.push(T::read_sync(stream)?);
320 }
321 Ok(match vec.try_into() {
322 Ok(array) => array,
323 Err(_) => panic!("wrong array length"),
324 })
325 }
326
327 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
328 for elt in self {
329 elt.write_sync(sink)?;
330 }
331 Ok(())
332 }
333}
334
335impl Protocol for bool {
337 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
338 Box::pin(async move {
339 Ok(match u8::read(stream).await? {
340 0 => false,
341 1 => true,
342 n => return Err(ReadError {
343 context: ErrorContext::BuiltIn { for_type: "bool" },
344 kind: ReadErrorKind::UnknownVariant8(n),
345 }),
346 })
347 })
348 }
349
350 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
351 Box::pin(async move {
352 if *self { 1u8 } else { 0 }.write(sink).await
353 })
354 }
355
356 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
357 Ok(match u8::read_sync(stream)? {
358 0 => false,
359 1 => true,
360 n => return Err(ReadError {
361 context: ErrorContext::BuiltIn { for_type: "bool" },
362 kind: ReadErrorKind::UnknownVariant8(n),
363 }),
364 })
365 }
366
367 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
368 if *self { 1u8 } else { 0 }.write_sync(sink)
369 }
370}
371
372impl<T: Protocol> Protocol for Box<T> {
373 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
374 Box::pin(async move {
375 Ok(<Box<_> as FallibleBox<_>>::try_new(T::read(stream).await?).map_err(|e| ReadError {
376 context: ErrorContext::BuiltIn { for_type: "Box" },
377 kind: e.into(),
378 })?)
379 })
380 }
381
382 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
383 (**self).write(sink)
384 }
385
386 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
387 Ok(<Box<_> as FallibleBox<_>>::try_new(T::read_sync(stream)?).map_err(|e| ReadError {
388 context: ErrorContext::BuiltIn { for_type: "Box" },
389 kind: e.into(),
390 })?)
391 }
392
393 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
394 (**self).write_sync(sink)
395 }
396}
397
398impl<T: Protocol + Send + Sync> Protocol for Vec<T> {
403 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
404 Self::read_length_prefixed(stream, u64::MAX)
405 }
406
407 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
408 self.write_length_prefixed(sink, u64::MAX)
409 }
410
411 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
412 Self::read_length_prefixed_sync(stream, u64::MAX)
413 }
414
415 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
416 self.write_length_prefixed_sync(sink, u64::MAX)
417 }
418}
419
420impl<T: Protocol + Send + Sync> LengthPrefixed for Vec<T> {
423 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
424 Box::pin(async move {
425 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
426 let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
427 context: ErrorContext::BuiltIn { for_type: "Vec" },
428 kind: e.into(),
429 })?;
430 for _ in 0..len {
431 buf.push(T::read(stream).await?);
432 }
433 Ok(buf)
434 })
435 }
436
437 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
438 Box::pin(async move {
439 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
440 for elt in self {
441 elt.write(sink).await?;
442 }
443 Ok(())
444 })
445 }
446
447 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
448 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
449 let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
450 context: ErrorContext::BuiltIn { for_type: "Vec" },
451 kind: e.into(),
452 })?;
453 for _ in 0..len {
454 buf.push(T::read_sync(stream)?);
455 }
456 Ok(buf)
457 }
458
459 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
460 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
461 for elt in self {
462 elt.write_sync(sink)?;
463 }
464 Ok(())
465 }
466}
467
468impl<T: Protocol + Ord + Send + Sync + 'static> Protocol for BTreeSet<T> {
470 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
471 Self::read_length_prefixed(stream, u64::MAX)
472 }
473
474 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
475 self.write_length_prefixed(sink, u64::MAX)
476 }
477
478 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
479 Self::read_length_prefixed_sync(stream, u64::MAX)
480 }
481
482 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
483 self.write_length_prefixed_sync(sink, u64::MAX)
484 }
485}
486
487impl<T: Protocol + Ord + Send + Sync + 'static> LengthPrefixed for BTreeSet<T> {
488 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
489 Box::pin(async move {
490 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
491 let mut set = Self::default();
492 for _ in 0..len {
493 set.insert(T::read(stream).await?); }
495 Ok(set)
496 })
497 }
498
499 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
500 Box::pin(async move {
501 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
502 for elt in self {
503 elt.write(sink).await?;
504 }
505 Ok(())
506 })
507 }
508
509 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
510 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?; let mut set = Self::default();
511 for _ in 0..len {
512 set.insert(T::read_sync(stream)?); }
514 Ok(set)
515 }
516
517 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
518 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;
519 for elt in self {
520 elt.write_sync(sink)?;
521 }
522 Ok(())
523 }
524}
525
526impl<T: Protocol + Eq + Hash + Send + Sync> Protocol for HashSet<T> {
527 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
528 Self::read_length_prefixed(stream, u64::MAX)
529 }
530
531 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
532 self.write_length_prefixed(sink, u64::MAX)
533 }
534
535 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
536 Self::read_length_prefixed_sync(stream, u64::MAX)
537 }
538
539 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
540 self.write_length_prefixed_sync(sink, u64::MAX)
541 }
542}
543
544impl<T: Protocol + Eq + Hash + Send + Sync> LengthPrefixed for HashSet<T> {
545 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
546 Box::pin(async move {
547 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
548 let mut set = Self::with_capacity(len); for _ in 0..len {
550 set.insert(T::read(stream).await?);
551 }
552 Ok(set)
553 })
554 }
555
556 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
557 Box::pin(async move {
558 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
559 for elt in self {
560 elt.write(sink).await?;
561 }
562 Ok(())
563 })
564 }
565
566 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
567 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
568 let mut set = Self::with_capacity(len); for _ in 0..len {
570 set.insert(T::read_sync(stream)?);
571 }
572 Ok(set)
573 }
574
575 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
576 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
577 for elt in self {
578 elt.write_sync(sink)?;
579 }
580 Ok(())
581 }
582}
583
584impl Protocol for String {
586 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
587 Self::read_length_prefixed(stream, u64::MAX)
588 }
589
590 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
591 self.write_length_prefixed(sink, u64::MAX)
592 }
593
594 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
595 Self::read_length_prefixed_sync(stream, u64::MAX)
596 }
597
598 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
599 self.write_length_prefixed_sync(sink, u64::MAX)
600 }
601}
602
603impl LengthPrefixed for String {
605 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
606 Box::pin(async move {
607 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
608 let mut buf = Vec::default();
609 buf.try_resize(len, 0).map_err(|e| ReadError {
610 context: ErrorContext::BuiltIn { for_type: "String" },
611 kind: e.into(),
612 })?;
613 stream.read_exact(&mut buf).await.map_err(|e| ReadError {
614 context: ErrorContext::BuiltIn { for_type: "String" },
615 kind: e.into(),
616 })?;
617 Ok(Self::from_utf8(buf).map_err(|e| ReadError {
618 context: ErrorContext::BuiltIn { for_type: "String" },
619 kind: e.into(),
620 })?)
621 })
622 }
623
624 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
625 Box::pin(async move {
626 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
627 sink.write(self.as_bytes()).await.map_err(|e| WriteError {
628 context: ErrorContext::BuiltIn { for_type: "String" },
629 kind: e.into(),
630 })?;
631 Ok(())
632 })
633 }
634
635 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
636 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
637 let mut buf = Vec::default();
638 buf.try_resize(len, 0).map_err(|e| ReadError {
639 context: ErrorContext::BuiltIn { for_type: "String" },
640 kind: e.into(),
641 })?;
642 stream.read_exact(&mut buf).map_err(|e| ReadError {
643 context: ErrorContext::BuiltIn { for_type: "String" },
644 kind: e.into(),
645 })?;
646 Ok(Self::from_utf8(buf).map_err(|e| ReadError {
647 context: ErrorContext::BuiltIn { for_type: "String" },
648 kind: e.into(),
649 })?)
650 }
651
652 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
653 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
654 sink.write(self.as_bytes()).map_err(|e| WriteError {
655 context: ErrorContext::BuiltIn { for_type: "String" },
656 kind: e.into(),
657 })?;
658 Ok(())
659 }
660}
661
662impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> Protocol for BTreeMap<K, V> {
663 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
664 Self::read_length_prefixed(stream, u64::MAX)
665 }
666
667 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
668 self.write_length_prefixed(sink, u64::MAX)
669 }
670
671 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
672 Self::read_length_prefixed_sync(stream, u64::MAX)
673 }
674
675 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
676 self.write_length_prefixed_sync(sink, u64::MAX)
677 }
678}
679
680impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> LengthPrefixed for BTreeMap<K, V> {
681 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
682 Box::pin(async move {
683 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
684 let mut map = Self::default();
685 for _ in 0..len {
686 map.insert(K::read(stream).await?, V::read(stream).await?); }
688 Ok(map)
689 })
690 }
691
692 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
693 Box::pin(async move {
694 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
695 for (k, v) in self {
696 k.write(sink).await?;
697 v.write(sink).await?;
698 }
699 Ok(())
700 })
701 }
702
703 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
704 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
705 let mut map = Self::default();
706 for _ in 0..len {
707 map.insert(K::read_sync(stream)?, V::read_sync(stream)?); }
709 Ok(map)
710 }
711
712 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
713 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
714 for (k, v) in self {
715 k.write_sync(sink)?;
716 v.write_sync(sink)?;
717 }
718 Ok(())
719 }
720}
721
722impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> Protocol for HashMap<K, V> {
724 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
725 Self::read_length_prefixed(stream, u64::MAX)
726 }
727
728 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
729 self.write_length_prefixed(sink, u64::MAX)
730 }
731
732 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
733 Self::read_length_prefixed_sync(stream, u64::MAX)
734 }
735
736 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
737 self.write_length_prefixed_sync(sink, u64::MAX)
738 }
739}
740
741impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> LengthPrefixed for HashMap<K, V> {
742 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
743 Box::pin(async move {
744 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
745 let mut map = Self::with_capacity(len); for _ in 0..len {
747 map.insert(K::read(stream).await?, V::read(stream).await?);
748 }
749 Ok(map)
750 })
751 }
752
753 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
754 Box::pin(async move {
755 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
756 for (k, v) in self {
757 k.write(sink).await?;
758 v.write(sink).await?;
759 }
760 Ok(())
761 })
762 }
763
764 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
765 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
766 let mut map = Self::with_capacity(len); for _ in 0..len {
768 map.insert(K::read_sync(stream)?, V::read_sync(stream)?);
769 }
770 Ok(map)
771 }
772
773 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
774 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
775 for (k, v) in self {
776 k.write_sync(sink)?;
777 v.write_sync(sink)?;
778 }
779 Ok(())
780 }
781}
782
783impl<'cow, B: ToOwned + Sync + ?Sized> Protocol for std::borrow::Cow<'cow, B>
787where B::Owned: Protocol + Send + Sync {
788 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
789 Box::pin(async move {
790 Ok(Self::Owned(B::Owned::read(stream).await?))
791 })
792 }
793
794 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
795 Box::pin(async move {
796 match self {
797 Self::Borrowed(borrowed) => (*borrowed).to_owned().write(sink).await?,
798 Self::Owned(owned) => owned.write(sink).await?,
799 }
800 Ok(())
801 })
802 }
803
804 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
805 Ok(Self::Owned(B::Owned::read_sync(stream)?))
806 }
807
808 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
809 match self {
810 Self::Borrowed(borrowed) => (*borrowed).to_owned().write_sync(sink)?,
811 Self::Owned(owned) => owned.write_sync(sink)?,
812 }
813 Ok(())
814 }
815}
816
817impl<'cow, B: ToOwned + Sync + ?Sized> LengthPrefixed for std::borrow::Cow<'cow, B>
821where B::Owned: LengthPrefixed + Send + Sync {
822 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
823 Box::pin(async move {
824 Ok(Self::Owned(B::Owned::read_length_prefixed(stream, max_len).await?))
825 })
826 }
827
828 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
829 Box::pin(async move {
830 match self {
831 Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed(sink, max_len).await?,
832 Self::Owned(owned) => owned.write_length_prefixed(sink, max_len).await?,
833 }
834 Ok(())
835 })
836 }
837
838 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
839 Ok(Self::Owned(B::Owned::read_length_prefixed_sync(stream, max_len)?))
840 }
841
842 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
843 match self {
844 Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed_sync(sink, max_len)?,
845 Self::Owned(owned) => owned.write_length_prefixed_sync(sink, max_len)?,
846 }
847 Ok(())
848 }
849}
850
851#[derive(Protocol)]
852#[async_proto(internal)]
853struct F32Proxy([u8; 4]);
854
855impl From<F32Proxy> for f32 {
856 fn from(F32Proxy(bytes): F32Proxy) -> Self {
857 Self::from_be_bytes(bytes)
858 }
859}
860
861impl<'a> From<&'a f32> for F32Proxy {
862 fn from(val: &f32) -> Self {
863 Self(val.to_be_bytes())
864 }
865}
866
867#[derive(Protocol)]
868#[async_proto(internal)]
869struct F64Proxy([u8; 8]);
870
871impl From<F64Proxy> for f64 {
872 fn from(F64Proxy(bytes): F64Proxy) -> Self {
873 Self::from_be_bytes(bytes)
874 }
875}
876
877impl<'a> From<&'a f64> for F64Proxy {
878 fn from(val: &f64) -> Self {
879 Self(val.to_be_bytes())
880 }
881}
882
883#[derive(Protocol)]
884#[async_proto(internal)]
885struct DurationProxy {
886 secs: u64,
887 subsec_nanos: u32,
888}
889
890impl From<DurationProxy> for std::time::Duration {
891 fn from(DurationProxy { secs, subsec_nanos }: DurationProxy) -> Self {
892 Self::new(secs, subsec_nanos)
893 }
894}
895
896impl<'a> From<&'a std::time::Duration> for DurationProxy {
897 fn from(duration: &std::time::Duration) -> Self {
898 Self {
899 secs: duration.as_secs(),
900 subsec_nanos: duration.subsec_nanos(),
901 }
902 }
903}
904
// `Protocol` impls generated by the `impl_protocol_for!` macro for types from the standard library.
// `via = X` entries name another type (`X`) through which the listed type is encoded;
// struct and enum entries repeat the shape of the type they describe.
impl_protocol_for! {
    // Nonzero integers: encoded via the plain integer of the same width. `map_err` supplies the
    // `UnknownVariantN(0)` error kind used when the conversion from the plain integer fails.
    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
    type std::num::NonZeroU8;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
    type std::num::NonZeroI8;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
    type std::num::NonZeroU16;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
    type std::num::NonZeroI16;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
    type std::num::NonZeroU32;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
    type std::num::NonZeroI32;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
    type std::num::NonZeroU64;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
    type std::num::NonZeroI64;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
    type std::num::NonZeroU128;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
    type std::num::NonZeroI128;

    // Floats: encoded via the proxy types, which hold the big-endian bytes.
    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
    #[async_proto(via = F32Proxy)]
    type f32;

    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
    #[async_proto(via = F64Proxy)]
    type f64;

    // Range types.
    // NOTE(review): `Range` is the only entry here with two fields and the only one whose bound
    // also requires `Send`; presumably the first field is held across an await while the second
    // is read — confirm against the macro implementation.
    #[async_proto(where(Idx: Protocol + Send + Sync))]
    struct Range<Idx> {
        start: Idx,
        end: Idx,
    }

    #[async_proto(where(Idx: Protocol + Sync))]
    struct RangeFrom<Idx> {
        start: Idx,
    }

    #[async_proto(where(Idx: Protocol + Sync))]
    struct RangeTo<Idx> {
        end: Idx,
    }

    #[async_proto(where(Idx: Protocol + Sync))]
    struct RangeToInclusive<Idx> {
        end: Idx,
    }

    // Common enums.
    #[async_proto(where(T: Protocol + Sync))]
    enum Option<T> {
        None,
        Some(T),
    }

    #[async_proto(where(T: Protocol + Sync, E: Protocol + Sync))]
    enum Result<T, E> {
        Ok(T),
        Err(E),
    }

    // An enum without variants.
    enum std::convert::Infallible {}

    // Unit-like structs (no fields).
    #[async_proto(where(T: Sync))]
    struct std::marker::PhantomData<T>;

    struct std::ops::RangeFull;

    // Durations: encoded via `DurationProxy` (seconds, then subsecond nanoseconds).
    #[async_proto(attr(doc = "A duration is represented as the number of whole seconds as a [`u64`] followed by the number of subsecond nanoseconds as a [`u32`]."))]
    #[async_proto(via = DurationProxy)]
    type std::time::Duration;
}