1#![allow(missing_docs)]
4
5use {
6 std::{
7 collections::{
8 BTreeMap,
9 BTreeSet,
10 HashMap,
11 HashSet,
12 },
13 convert::{
14 TryFrom as _,
15 TryInto as _,
16 },
17 future::Future,
18 hash::Hash,
19 io::prelude::*,
20 ops::{
21 Range,
22 RangeFrom,
23 RangeInclusive,
24 RangeTo,
25 RangeToInclusive,
26 },
27 pin::Pin,
28 },
29 byteorder::{
30 NetworkEndian,
31 ReadBytesExt as _,
32 WriteBytesExt as _,
33 },
34 fallible_collections::{
35 FallibleBox,
36 FallibleVec,
37 },
38 tokio::io::{
39 AsyncRead,
40 AsyncReadExt as _,
41 AsyncWrite,
42 AsyncWriteExt as _,
43 },
44 async_proto_derive::impl_protocol_for,
45 crate::{
46 ErrorContext,
47 LengthPrefixed,
48 Protocol,
49 ReadError,
50 ReadErrorKind,
51 WriteError,
52 WriteErrorKind,
53 },
54};
55
// Feature-gated submodules: each one is compiled only when the Cargo feature
// named in its `cfg` attribute is enabled.
// NOTE(review): presumably each submodule provides `Protocol` impls for the
// matching third-party crate's types — confirm against the submodules.
#[cfg(feature = "bitvec")] mod bitvec;
#[cfg(feature = "bytes")] mod bytes;
#[cfg(feature = "bytesize")] mod bytesize;
#[cfg(feature = "chrono")] mod chrono;
#[cfg(feature = "chrono-tz")] mod chrono_tz;
#[cfg(feature = "doubloon")] mod doubloon;
#[cfg(feature = "either")] mod either;
#[cfg(feature = "enumset")] mod enumset;
#[cfg(feature = "git2")] mod git2;
#[cfg(feature = "gix-hash")] mod gix_hash;
#[cfg(feature = "noisy_float")] mod noisy_float;
#[cfg(feature = "nonempty-collections")] mod nonempty_collections;
#[cfg(feature = "rust_decimal")] mod rust_decimal;
#[cfg(feature = "semver")] mod semver;
#[cfg(feature = "serde_json")] mod serde_json;
#[cfg(feature = "serenity")] mod serenity;
#[cfg(feature = "hematite-nbt")] mod hematite_nbt;
#[cfg(feature = "url")] mod url;
#[cfg(feature = "uuid")] mod uuid;
75
76async fn read_len<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
77 let len = match max_len {
78 0 => 0,
79 1..=255 => u8::read(stream).await?.into(),
80 256..=65_535 => u16::read(stream).await?.into(),
81 65_536..=4_294_967_295 => u32::read(stream).await?.into(),
82 _ => u64::read(stream).await?,
83 };
84 if len > max_len {
85 Err(ReadError {
86 context: error_ctx(),
87 kind: ReadErrorKind::MaxLen { len, max_len },
88 })
89 } else {
90 usize::try_from(len).map_err(|e| ReadError {
91 context: error_ctx(),
92 kind: e.into(),
93 })
94 }
95}
96
97async fn write_len<'a, W: AsyncWrite + Unpin + Send + 'a>(sink: &'a mut W, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
98 let len = u64::try_from(len).map_err(|e| WriteError {
99 context: error_ctx(),
100 kind: e.into(),
101 })?;
102 if len > max_len {
103 return Err(WriteError {
104 context: error_ctx(),
105 kind: WriteErrorKind::MaxLen { len, max_len },
106 })
107 }
108 match max_len {
109 0 => {}
110 1..=255 => (len as u8).write(sink).await?,
111 256..=65_535 => (len as u16).write(sink).await?,
112 65_536..=4_294_967_295 => (len as u32).write(sink).await?,
113 _ => len.write(sink).await?,
114 }
115 Ok(())
116}
117
118fn read_len_sync(stream: &mut impl Read, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
119 let len = match max_len {
120 0 => 0,
121 1..=255 => u8::read_sync(stream)?.into(),
122 256..=65_535 => u16::read_sync(stream)?.into(),
123 65_536..=4_294_967_295 => u32::read_sync(stream)?.into(),
124 _ => u64::read_sync(stream)?,
125 };
126 if len > max_len {
127 Err(ReadError {
128 context: error_ctx(),
129 kind: ReadErrorKind::MaxLen { len, max_len },
130 })
131 } else {
132 usize::try_from(len).map_err(|e| ReadError {
133 context: error_ctx(),
134 kind: e.into(),
135 })
136 }
137}
138
139fn write_len_sync(sink: &mut impl Write, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
140 let len = u64::try_from(len).map_err(|e| WriteError {
141 context: error_ctx(),
142 kind: e.into(),
143 })?;
144 if len > max_len {
145 return Err(WriteError {
146 context: error_ctx(),
147 kind: WriteErrorKind::MaxLen { len, max_len },
148 })
149 }
150 match max_len {
151 0 => {}
152 1..=255 => (len as u8).write_sync(sink)?,
153 256..=65_535 => (len as u16).write_sync(sink)?,
154 65_536..=4_294_967_295 => (len as u32).write_sync(sink)?,
155 _ => len.write_sync(sink)?,
156 }
157 Ok(())
158}
159
// Implements `Protocol` for the integer type `$ty`.
//
// `$read` / `$write` name the reader/writer extension methods to call. The optional `$endian`
// type is passed as a turbofish argument to the blocking calls only; the async calls take
// no byte-order parameter.
macro_rules! impl_protocol_primitive {
    ($ty:ty, $read:ident, $write:ident$(, $endian:ty)?) => {
        impl Protocol for $ty {
            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
                Box::pin(async move {
                    stream.$read().await.map_err(|err| ReadError {
                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
                        kind: err.into(),
                    })
                })
            }

            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
                Box::pin(async move {
                    sink.$write(*self).await.map_err(|err| WriteError {
                        context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
                        kind: err.into(),
                    })
                })
            }

            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
                stream.$read$(::<$endian>)?().map_err(|err| ReadError {
                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
                    kind: err.into(),
                })
            }

            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
                sink.$write$(::<$endian>)?(*self).map_err(|err| WriteError {
                    context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
                    kind: err.into(),
                })
            }
        }
    };
}
198
// `Protocol` impls for the built-in integer types. The single-byte types take no byte
// order; every wider type passes `NetworkEndian` for the blocking read/write calls.
impl_protocol_primitive!(u8, read_u8, write_u8);
impl_protocol_primitive!(i8, read_i8, write_i8);
impl_protocol_primitive!(u16, read_u16, write_u16, NetworkEndian);
impl_protocol_primitive!(i16, read_i16, write_i16, NetworkEndian);
impl_protocol_primitive!(u32, read_u32, write_u32, NetworkEndian);
impl_protocol_primitive!(i32, read_i32, write_i32, NetworkEndian);
impl_protocol_primitive!(u64, read_u64, write_u64, NetworkEndian);
impl_protocol_primitive!(i64, read_i64, write_i64, NetworkEndian);
impl_protocol_primitive!(u128, read_u128, write_u128, NetworkEndian);
impl_protocol_primitive!(i128, read_i128, write_i128, NetworkEndian);
209
210impl<Idx: Protocol + Send + Sync> Protocol for RangeInclusive<Idx> { fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
212 Box::pin(async move {
213 Ok(Idx::read(stream).await?..=Idx::read(stream).await?)
214 })
215 }
216
217 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
218 Box::pin(async move {
219 self.start().write(sink).await?;
220 self.end().write(sink).await?;
221 Ok(())
222 })
223 }
224
225 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
226 Ok(Idx::read_sync(stream)?..=Idx::read_sync(stream)?)
227 }
228
229 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
230 self.start().write_sync(sink)?;
231 self.end().write_sync(sink)?;
232 Ok(())
233 }
234}
235
// Implements `Protocol` for the tuple whose element types are the given identifiers.
//
// Elements are read and written one after another in the order the identifiers are listed;
// this code adds nothing before, between, or after them.
macro_rules! impl_protocol_tuple {
    ($($ty:ident),*) => {
        // NOTE(review): presumably needed for the zero-element expansion, where `stream`
        // and `sink` end up unused — confirm.
        #[allow(unused)]
        impl<$($ty: Protocol + Send + Sync),*> Protocol for ($($ty,)*) {
            fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
                Box::pin(async move {
                    Ok((
                        $($ty::read(stream).await?,)*
                    ))
                })
            }

            // The destructuring below reuses the type identifiers as variable names.
            #[allow(non_snake_case)]
            fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
                Box::pin(async move {
                    let ($($ty,)*) = self;
                    $(
                        $ty.write(sink).await?;
                    )*
                    Ok(())
                })
            }

            fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
                Ok((
                    $($ty::read_sync(stream)?,)*
                ))
            }

            // The destructuring below reuses the type identifiers as variable names.
            #[allow(non_snake_case)]
            fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
                let ($($ty,)*) = self;
                $(
                    $ty.write_sync(sink)?;
                )*
                Ok(())
            }
        }
    };
}
276
// `Protocol` impls for tuples of zero through twelve elements.
impl_protocol_tuple!();
impl_protocol_tuple!(A);
impl_protocol_tuple!(A, B);
impl_protocol_tuple!(A, B, C);
impl_protocol_tuple!(A, B, C, D);
impl_protocol_tuple!(A, B, C, D, E);
impl_protocol_tuple!(A, B, C, D, E, F);
impl_protocol_tuple!(A, B, C, D, E, F, G);
impl_protocol_tuple!(A, B, C, D, E, F, G, H);
impl_protocol_tuple!(A, B, C, D, E, F, G, H, I);
impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J);
impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K);
impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K, L);
290
291impl<T: Protocol + Send + Sync, const N: usize> Protocol for [T; N] {
292 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
293 Box::pin(async move {
294 let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
295 context: ErrorContext::BuiltIn { for_type: "[T; N]" },
296 kind: e.into(),
297 })?;
298 for _ in 0..N {
299 vec.push(T::read(stream).await?);
300 }
301 Ok(match vec.try_into() {
302 Ok(array) => array,
303 Err(_) => panic!("wrong array length"),
304 })
305 })
306 }
307
308 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
309 Box::pin(async move {
310 for elt in self {
311 elt.write(sink).await?;
312 }
313 Ok(())
314 })
315 }
316
317 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
318 let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
319 context: ErrorContext::BuiltIn { for_type: "[T; N]" },
320 kind: e.into(),
321 })?;
322 for _ in 0..N {
323 vec.push(T::read_sync(stream)?);
324 }
325 Ok(match vec.try_into() {
326 Ok(array) => array,
327 Err(_) => panic!("wrong array length"),
328 })
329 }
330
331 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
332 for elt in self {
333 elt.write_sync(sink)?;
334 }
335 Ok(())
336 }
337}
338
339impl Protocol for bool {
341 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
342 Box::pin(async move {
343 Ok(match u8::read(stream).await? {
344 0 => false,
345 1 => true,
346 n => return Err(ReadError {
347 context: ErrorContext::BuiltIn { for_type: "bool" },
348 kind: ReadErrorKind::UnknownVariant8(n),
349 }),
350 })
351 })
352 }
353
354 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
355 Box::pin(async move {
356 if *self { 1u8 } else { 0 }.write(sink).await
357 })
358 }
359
360 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
361 Ok(match u8::read_sync(stream)? {
362 0 => false,
363 1 => true,
364 n => return Err(ReadError {
365 context: ErrorContext::BuiltIn { for_type: "bool" },
366 kind: ReadErrorKind::UnknownVariant8(n),
367 }),
368 })
369 }
370
371 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
372 if *self { 1u8 } else { 0 }.write_sync(sink)
373 }
374}
375
376impl<T: Protocol> Protocol for Box<T> {
377 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
378 Box::pin(async move {
379 Ok(<Box<_> as FallibleBox<_>>::try_new(T::read(stream).await?).map_err(|e| ReadError {
380 context: ErrorContext::BuiltIn { for_type: "Box" },
381 kind: e.into(),
382 })?)
383 })
384 }
385
386 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
387 (**self).write(sink)
388 }
389
390 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
391 Ok(<Box<_> as FallibleBox<_>>::try_new(T::read_sync(stream)?).map_err(|e| ReadError {
392 context: ErrorContext::BuiltIn { for_type: "Box" },
393 kind: e.into(),
394 })?)
395 }
396
397 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
398 (**self).write_sync(sink)
399 }
400}
401
402impl<T: Protocol + Send + Sync> Protocol for Vec<T> {
407 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
408 Self::read_length_prefixed(stream, u64::MAX)
409 }
410
411 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
412 self.write_length_prefixed(sink, u64::MAX)
413 }
414
415 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
416 Self::read_length_prefixed_sync(stream, u64::MAX)
417 }
418
419 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
420 self.write_length_prefixed_sync(sink, u64::MAX)
421 }
422}
423
424impl<T: Protocol + Send + Sync> LengthPrefixed for Vec<T> {
427 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
428 Box::pin(async move {
429 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
430 let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
431 context: ErrorContext::BuiltIn { for_type: "Vec" },
432 kind: e.into(),
433 })?;
434 for _ in 0..len {
435 buf.push(T::read(stream).await?);
436 }
437 Ok(buf)
438 })
439 }
440
441 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
442 Box::pin(async move {
443 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
444 for elt in self {
445 elt.write(sink).await?;
446 }
447 Ok(())
448 })
449 }
450
451 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
452 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
453 let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
454 context: ErrorContext::BuiltIn { for_type: "Vec" },
455 kind: e.into(),
456 })?;
457 for _ in 0..len {
458 buf.push(T::read_sync(stream)?);
459 }
460 Ok(buf)
461 }
462
463 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
464 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
465 for elt in self {
466 elt.write_sync(sink)?;
467 }
468 Ok(())
469 }
470}
471
472impl<T: Protocol + Ord + Send + Sync + 'static> Protocol for BTreeSet<T> {
474 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
475 Self::read_length_prefixed(stream, u64::MAX)
476 }
477
478 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
479 self.write_length_prefixed(sink, u64::MAX)
480 }
481
482 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
483 Self::read_length_prefixed_sync(stream, u64::MAX)
484 }
485
486 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
487 self.write_length_prefixed_sync(sink, u64::MAX)
488 }
489}
490
491impl<T: Protocol + Ord + Send + Sync + 'static> LengthPrefixed for BTreeSet<T> {
492 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
493 Box::pin(async move {
494 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
495 let mut set = Self::default();
496 for _ in 0..len {
497 set.insert(T::read(stream).await?); }
499 Ok(set)
500 })
501 }
502
503 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
504 Box::pin(async move {
505 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
506 for elt in self {
507 elt.write(sink).await?;
508 }
509 Ok(())
510 })
511 }
512
513 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
514 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?; let mut set = Self::default();
515 for _ in 0..len {
516 set.insert(T::read_sync(stream)?); }
518 Ok(set)
519 }
520
521 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
522 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;
523 for elt in self {
524 elt.write_sync(sink)?;
525 }
526 Ok(())
527 }
528}
529
530impl<T: Protocol + Eq + Hash + Send + Sync> Protocol for HashSet<T> {
532 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
533 Self::read_length_prefixed(stream, u64::MAX)
534 }
535
536 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
537 self.write_length_prefixed(sink, u64::MAX)
538 }
539
540 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
541 Self::read_length_prefixed_sync(stream, u64::MAX)
542 }
543
544 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
545 self.write_length_prefixed_sync(sink, u64::MAX)
546 }
547}
548
549impl<T: Protocol + Eq + Hash + Send + Sync> LengthPrefixed for HashSet<T> {
550 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
551 Box::pin(async move {
552 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
553 let mut set = Self::with_capacity(len); for _ in 0..len {
555 set.insert(T::read(stream).await?);
556 }
557 Ok(set)
558 })
559 }
560
561 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
562 Box::pin(async move {
563 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
564 for elt in self {
565 elt.write(sink).await?;
566 }
567 Ok(())
568 })
569 }
570
571 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
572 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
573 let mut set = Self::with_capacity(len); for _ in 0..len {
575 set.insert(T::read_sync(stream)?);
576 }
577 Ok(set)
578 }
579
580 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
581 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
582 for elt in self {
583 elt.write_sync(sink)?;
584 }
585 Ok(())
586 }
587}
588
589impl Protocol for String {
591 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
592 Self::read_length_prefixed(stream, u64::MAX)
593 }
594
595 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
596 self.write_length_prefixed(sink, u64::MAX)
597 }
598
599 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
600 Self::read_length_prefixed_sync(stream, u64::MAX)
601 }
602
603 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
604 self.write_length_prefixed_sync(sink, u64::MAX)
605 }
606}
607
608impl LengthPrefixed for String {
610 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
611 Box::pin(async move {
612 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
613 let mut buf = Vec::default();
614 buf.try_resize(len, 0).map_err(|e| ReadError {
615 context: ErrorContext::BuiltIn { for_type: "String" },
616 kind: e.into(),
617 })?;
618 stream.read_exact(&mut buf).await.map_err(|e| ReadError {
619 context: ErrorContext::BuiltIn { for_type: "String" },
620 kind: e.into(),
621 })?;
622 Ok(Self::from_utf8(buf).map_err(|e| ReadError {
623 context: ErrorContext::BuiltIn { for_type: "String" },
624 kind: e.into(),
625 })?)
626 })
627 }
628
629 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
630 Box::pin(async move {
631 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
632 sink.write(self.as_bytes()).await.map_err(|e| WriteError {
633 context: ErrorContext::BuiltIn { for_type: "String" },
634 kind: e.into(),
635 })?;
636 Ok(())
637 })
638 }
639
640 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
641 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
642 let mut buf = Vec::default();
643 buf.try_resize(len, 0).map_err(|e| ReadError {
644 context: ErrorContext::BuiltIn { for_type: "String" },
645 kind: e.into(),
646 })?;
647 stream.read_exact(&mut buf).map_err(|e| ReadError {
648 context: ErrorContext::BuiltIn { for_type: "String" },
649 kind: e.into(),
650 })?;
651 Ok(Self::from_utf8(buf).map_err(|e| ReadError {
652 context: ErrorContext::BuiltIn { for_type: "String" },
653 kind: e.into(),
654 })?)
655 }
656
657 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
658 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
659 sink.write(self.as_bytes()).map_err(|e| WriteError {
660 context: ErrorContext::BuiltIn { for_type: "String" },
661 kind: e.into(),
662 })?;
663 Ok(())
664 }
665}
666
667impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> Protocol for BTreeMap<K, V> {
668 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
669 Self::read_length_prefixed(stream, u64::MAX)
670 }
671
672 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
673 self.write_length_prefixed(sink, u64::MAX)
674 }
675
676 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
677 Self::read_length_prefixed_sync(stream, u64::MAX)
678 }
679
680 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
681 self.write_length_prefixed_sync(sink, u64::MAX)
682 }
683}
684
685impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> LengthPrefixed for BTreeMap<K, V> {
686 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
687 Box::pin(async move {
688 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
689 let mut map = Self::default();
690 for _ in 0..len {
691 map.insert(K::read(stream).await?, V::read(stream).await?); }
693 Ok(map)
694 })
695 }
696
697 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
698 Box::pin(async move {
699 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
700 for (k, v) in self {
701 k.write(sink).await?;
702 v.write(sink).await?;
703 }
704 Ok(())
705 })
706 }
707
708 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
709 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
710 let mut map = Self::default();
711 for _ in 0..len {
712 map.insert(K::read_sync(stream)?, V::read_sync(stream)?); }
714 Ok(map)
715 }
716
717 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
718 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
719 for (k, v) in self {
720 k.write_sync(sink)?;
721 v.write_sync(sink)?;
722 }
723 Ok(())
724 }
725}
726
727impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> Protocol for HashMap<K, V> {
729 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
730 Self::read_length_prefixed(stream, u64::MAX)
731 }
732
733 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
734 self.write_length_prefixed(sink, u64::MAX)
735 }
736
737 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
738 Self::read_length_prefixed_sync(stream, u64::MAX)
739 }
740
741 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
742 self.write_length_prefixed_sync(sink, u64::MAX)
743 }
744}
745
746impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> LengthPrefixed for HashMap<K, V> {
747 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
748 Box::pin(async move {
749 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
750 let mut map = Self::with_capacity(len); for _ in 0..len {
752 map.insert(K::read(stream).await?, V::read(stream).await?);
753 }
754 Ok(map)
755 })
756 }
757
758 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
759 Box::pin(async move {
760 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
761 for (k, v) in self {
762 k.write(sink).await?;
763 v.write(sink).await?;
764 }
765 Ok(())
766 })
767 }
768
769 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
770 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
771 let mut map = Self::with_capacity(len); for _ in 0..len {
773 map.insert(K::read_sync(stream)?, V::read_sync(stream)?);
774 }
775 Ok(map)
776 }
777
778 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
779 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
780 for (k, v) in self {
781 k.write_sync(sink)?;
782 v.write_sync(sink)?;
783 }
784 Ok(())
785 }
786}
787
788impl<'cow, B: ToOwned + Sync + ?Sized> Protocol for std::borrow::Cow<'cow, B>
792where B::Owned: Protocol + Send + Sync {
793 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
794 Box::pin(async move {
795 Ok(Self::Owned(B::Owned::read(stream).await?))
796 })
797 }
798
799 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
800 Box::pin(async move {
801 match self {
802 Self::Borrowed(borrowed) => (*borrowed).to_owned().write(sink).await?,
803 Self::Owned(owned) => owned.write(sink).await?,
804 }
805 Ok(())
806 })
807 }
808
809 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
810 Ok(Self::Owned(B::Owned::read_sync(stream)?))
811 }
812
813 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
814 match self {
815 Self::Borrowed(borrowed) => (*borrowed).to_owned().write_sync(sink)?,
816 Self::Owned(owned) => owned.write_sync(sink)?,
817 }
818 Ok(())
819 }
820}
821
822impl<'cow, B: ToOwned + Sync + ?Sized> LengthPrefixed for std::borrow::Cow<'cow, B>
826where B::Owned: LengthPrefixed + Send + Sync {
827 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
828 Box::pin(async move {
829 Ok(Self::Owned(B::Owned::read_length_prefixed(stream, max_len).await?))
830 })
831 }
832
833 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
834 Box::pin(async move {
835 match self {
836 Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed(sink, max_len).await?,
837 Self::Owned(owned) => owned.write_length_prefixed(sink, max_len).await?,
838 }
839 Ok(())
840 })
841 }
842
843 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
844 Ok(Self::Owned(B::Owned::read_length_prefixed_sync(stream, max_len)?))
845 }
846
847 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
848 match self {
849 Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed_sync(sink, max_len)?,
850 Self::Owned(owned) => owned.write_length_prefixed_sync(sink, max_len)?,
851 }
852 Ok(())
853 }
854}
855
// Wire proxy for `f32`: holds the four bytes of the float in big-endian order
// (see the `from_be_bytes` / `to_be_bytes` conversions implemented for it).
#[derive(Protocol)]
#[async_proto(internal)]
struct F32Proxy([u8; 4]);
859
860impl From<F32Proxy> for f32 {
861 fn from(F32Proxy(bytes): F32Proxy) -> Self {
862 Self::from_be_bytes(bytes)
863 }
864}
865
866impl<'a> From<&'a f32> for F32Proxy {
867 fn from(val: &f32) -> Self {
868 Self(val.to_be_bytes())
869 }
870}
871
// Wire proxy for `f64`: holds the eight bytes of the float in big-endian order
// (see the `from_be_bytes` / `to_be_bytes` conversions implemented for it).
#[derive(Protocol)]
#[async_proto(internal)]
struct F64Proxy([u8; 8]);
875
876impl From<F64Proxy> for f64 {
877 fn from(F64Proxy(bytes): F64Proxy) -> Self {
878 Self::from_be_bytes(bytes)
879 }
880}
881
882impl<'a> From<&'a f64> for F64Proxy {
883 fn from(val: &f64) -> Self {
884 Self(val.to_be_bytes())
885 }
886}
887
// Wire proxy for `std::time::Duration`: the whole seconds followed by the
// subsecond nanoseconds, matching the `From` conversions implemented for it.
#[derive(Protocol)]
#[async_proto(internal)]
struct DurationProxy {
    // Whole seconds of the duration.
    secs: u64,
    // Fractional part of the duration, in nanoseconds.
    subsec_nanos: u32,
}
894
895impl From<DurationProxy> for std::time::Duration {
896 fn from(DurationProxy { secs, subsec_nanos }: DurationProxy) -> Self {
897 Self::new(secs, subsec_nanos)
898 }
899}
900
901impl<'a> From<&'a std::time::Duration> for DurationProxy {
902 fn from(duration: &std::time::Duration) -> Self {
903 Self {
904 secs: duration.as_secs(),
905 subsec_nanos: duration.subsec_nanos(),
906 }
907 }
908}
909
// `Protocol` impls generated by the `impl_protocol_for!` macro for types this crate
// does not define itself.
impl_protocol_for! {
    // Nonzero integers go through the matching plain integer type.
    // NOTE(review): `map_err` presumably reports a zero on the wire as
    // `UnknownVariantN(0)` — confirm against the macro's `via`/`map_err` handling.
    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
    type std::num::NonZeroU8;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
    type std::num::NonZeroI8;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
    type std::num::NonZeroU16;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
    type std::num::NonZeroI16;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
    type std::num::NonZeroU32;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
    type std::num::NonZeroI32;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
    type std::num::NonZeroU64;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
    type std::num::NonZeroI64;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = u128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
    type std::num::NonZeroU128;

    #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
    #[async_proto(via = i128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
    type std::num::NonZeroI128;

    // Floats go through the byte-array proxies, which use big-endian byte order.
    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
    #[async_proto(via = F32Proxy)]
    type f32;

    #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
    #[async_proto(via = F64Proxy)]
    type f64;

    // Range types, described field by field.
    // NOTE(review): `Range` is the only one here with two fields and the only one that
    // also requires `Send`; presumably because the first field is held while the second
    // is awaited — confirm before unifying the bounds.
    #[async_proto(where(Idx: Protocol + Send + Sync))]
    struct Range<Idx> {
        start: Idx,
        end: Idx,
    }

    #[async_proto(where(Idx: Protocol + Sync))]
    struct RangeFrom<Idx> {
        start: Idx,
    }

    #[async_proto(where(Idx: Protocol + Sync))]
    struct RangeTo<Idx> {
        end: Idx,
    }

    #[async_proto(where(Idx: Protocol + Sync))]
    struct RangeToInclusive<Idx> {
        end: Idx,
    }

    // Standard enums, described variant by variant in the order listed.
    #[async_proto(where(T: Protocol + Sync))]
    enum Option<T> {
        None,
        Some(T),
    }

    #[async_proto(where(T: Protocol + Sync, E: Protocol + Sync))]
    enum Result<T, E> {
        Ok(T),
        Err(E),
    }

    // An enum with no variants.
    enum std::convert::Infallible {}

    // Unit-like types with no fields.
    #[async_proto(where(T: Sync))]
    struct std::marker::PhantomData<T>;

    struct std::ops::RangeFull;

    // Durations go through `DurationProxy` (whole seconds, then subsecond nanoseconds).
    #[async_proto(attr(doc = "A duration is represented as the number of whole seconds as a [`u64`] followed by the number of subsecond nanoseconds as a [`u32`]."))]
    #[async_proto(via = DurationProxy)]
    type std::time::Duration;
}