1#![allow(missing_docs)]
4
5use {
6 std::{
7 collections::{
8 BTreeMap,
9 BTreeSet,
10 HashMap,
11 HashSet,
12 },
13 convert::{
14 TryFrom as _,
15 TryInto as _,
16 },
17 future::Future,
18 hash::Hash,
19 io::prelude::*,
20 ops::{
21 Range,
22 RangeFrom,
23 RangeInclusive,
24 RangeTo,
25 RangeToInclusive,
26 },
27 pin::Pin,
28 },
29 byteorder::{
30 NetworkEndian,
31 ReadBytesExt as _,
32 WriteBytesExt as _,
33 },
34 fallible_collections::{
35 FallibleBox,
36 FallibleVec,
37 },
38 tokio::io::{
39 AsyncRead,
40 AsyncReadExt as _,
41 AsyncWrite,
42 AsyncWriteExt as _,
43 },
44 async_proto_derive::impl_protocol_for,
45 crate::{
46 ErrorContext,
47 LengthPrefixed,
48 Protocol,
49 ReadError,
50 ReadErrorKind,
51 WriteError,
52 WriteErrorKind,
53 },
54};
55
56#[cfg(feature = "bitvec")] mod bitvec;
57#[cfg(feature = "bytes")] mod bytes;
58#[cfg(feature = "bytesize")] mod bytesize;
59#[cfg(feature = "chrono")] mod chrono;
60#[cfg(feature = "chrono-tz")] mod chrono_tz;
61#[cfg(feature = "doubloon")] mod doubloon;
62#[cfg(feature = "either")] mod either;
63#[cfg(feature = "enumset")] mod enumset;
64#[cfg(feature = "git2")] mod git2;
65#[cfg(feature = "gix-hash")] mod gix_hash;
66#[cfg(feature = "noisy_float")] mod noisy_float;
67#[cfg(feature = "nonempty-collections")] mod nonempty_collections;
68#[cfg(feature = "os_info")] mod os_info;
69#[cfg(feature = "rust_decimal")] mod rust_decimal;
70#[cfg(feature = "semver")] mod semver;
71#[cfg(feature = "serde_json")] mod serde_json;
72#[cfg(feature = "serenity")] mod serenity;
73#[cfg(feature = "hematite-nbt")] mod hematite_nbt;
74#[cfg(feature = "url")] mod url;
75#[cfg(feature = "uuid")] mod uuid;
76
77async fn read_len<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
78 let len = match max_len {
79 0 => 0,
80 1..=255 => u8::read(stream).await?.into(),
81 256..=65_535 => u16::read(stream).await?.into(),
82 65_536..=4_294_967_295 => u32::read(stream).await?.into(),
83 _ => u64::read(stream).await?,
84 };
85 if len > max_len {
86 Err(ReadError {
87 context: error_ctx(),
88 kind: ReadErrorKind::MaxLen { len, max_len },
89 })
90 } else {
91 usize::try_from(len).map_err(|e| ReadError {
92 context: error_ctx(),
93 kind: e.into(),
94 })
95 }
96}
97
98async fn write_len<'a, W: AsyncWrite + Unpin + Send + 'a>(sink: &'a mut W, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
99 let len = u64::try_from(len).map_err(|e| WriteError {
100 context: error_ctx(),
101 kind: e.into(),
102 })?;
103 if len > max_len {
104 return Err(WriteError {
105 context: error_ctx(),
106 kind: WriteErrorKind::MaxLen { len, max_len },
107 })
108 }
109 match max_len {
110 0 => {}
111 1..=255 => (len as u8).write(sink).await?,
112 256..=65_535 => (len as u16).write(sink).await?,
113 65_536..=4_294_967_295 => (len as u32).write(sink).await?,
114 _ => len.write(sink).await?,
115 }
116 Ok(())
117}
118
119fn read_len_sync(stream: &mut impl Read, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<usize, ReadError> {
120 let len = match max_len {
121 0 => 0,
122 1..=255 => u8::read_sync(stream)?.into(),
123 256..=65_535 => u16::read_sync(stream)?.into(),
124 65_536..=4_294_967_295 => u32::read_sync(stream)?.into(),
125 _ => u64::read_sync(stream)?,
126 };
127 if len > max_len {
128 Err(ReadError {
129 context: error_ctx(),
130 kind: ReadErrorKind::MaxLen { len, max_len },
131 })
132 } else {
133 usize::try_from(len).map_err(|e| ReadError {
134 context: error_ctx(),
135 kind: e.into(),
136 })
137 }
138}
139
140fn write_len_sync(sink: &mut impl Write, len: usize, max_len: u64, error_ctx: impl Fn() -> ErrorContext) -> Result<(), WriteError> {
141 let len = u64::try_from(len).map_err(|e| WriteError {
142 context: error_ctx(),
143 kind: e.into(),
144 })?;
145 if len > max_len {
146 return Err(WriteError {
147 context: error_ctx(),
148 kind: WriteErrorKind::MaxLen { len, max_len },
149 })
150 }
151 match max_len {
152 0 => {}
153 1..=255 => (len as u8).write_sync(sink)?,
154 256..=65_535 => (len as u16).write_sync(sink)?,
155 65_536..=4_294_967_295 => (len as u32).write_sync(sink)?,
156 _ => len.write_sync(sink)?,
157 }
158 Ok(())
159}
160
161macro_rules! impl_protocol_primitive {
162 ($ty:ty, $read:ident, $write:ident$(, $endian:ty)?) => {
163 impl Protocol for $ty {
165 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
166 Box::pin(async move {
167 Ok(stream.$read().await.map_err(|e| ReadError {
168 context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
169 kind: e.into(),
170 })?)
171 })
172 }
173
174 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
175 Box::pin(async move {
176 Ok(sink.$write(*self).await.map_err(|e| WriteError {
177 context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
178 kind: e.into(),
179 })?)
180 })
181 }
182
183 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
184 Ok(stream.$read$(::<$endian>)?().map_err(|e| ReadError {
185 context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
186 kind: e.into(),
187 })?)
188 }
189
190 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
191 Ok(sink.$write$(::<$endian>)?(*self).map_err(|e| WriteError {
192 context: ErrorContext::BuiltIn { for_type: stringify!($ty) },
193 kind: e.into(),
194 })?)
195 }
196 }
197 };
198}
199
200impl_protocol_primitive!(u8, read_u8, write_u8);
201impl_protocol_primitive!(i8, read_i8, write_i8);
202impl_protocol_primitive!(u16, read_u16, write_u16, NetworkEndian);
203impl_protocol_primitive!(i16, read_i16, write_i16, NetworkEndian);
204impl_protocol_primitive!(u32, read_u32, write_u32, NetworkEndian);
205impl_protocol_primitive!(i32, read_i32, write_i32, NetworkEndian);
206impl_protocol_primitive!(u64, read_u64, write_u64, NetworkEndian);
207impl_protocol_primitive!(i64, read_i64, write_i64, NetworkEndian);
208impl_protocol_primitive!(u128, read_u128, write_u128, NetworkEndian);
209impl_protocol_primitive!(i128, read_i128, write_i128, NetworkEndian);
210
211impl<Idx: Protocol + Send + Sync> Protocol for RangeInclusive<Idx> { fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
213 Box::pin(async move {
214 Ok(Idx::read(stream).await?..=Idx::read(stream).await?)
215 })
216 }
217
218 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
219 Box::pin(async move {
220 self.start().write(sink).await?;
221 self.end().write(sink).await?;
222 Ok(())
223 })
224 }
225
226 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
227 Ok(Idx::read_sync(stream)?..=Idx::read_sync(stream)?)
228 }
229
230 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
231 self.start().write_sync(sink)?;
232 self.end().write_sync(sink)?;
233 Ok(())
234 }
235}
236
237macro_rules! impl_protocol_tuple {
238 ($($ty:ident),*) => {
239 #[allow(unused)]
240 impl<$($ty: Protocol + Send + Sync),*> Protocol for ($($ty,)*) {
241 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
242 Box::pin(async move {
243 Ok((
244 $($ty::read(stream).await?,)*
245 ))
246 })
247 }
248
249 #[allow(non_snake_case)]
250 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
251 Box::pin(async move {
252 let ($($ty,)*) = self;
253 $(
254 $ty.write(sink).await?;
255 )*
256 Ok(())
257 })
258 }
259
260 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
261 Ok((
262 $($ty::read_sync(stream)?,)*
263 ))
264 }
265
266 #[allow(non_snake_case)]
267 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
268 let ($($ty,)*) = self;
269 $(
270 $ty.write_sync(sink)?;
271 )*
272 Ok(())
273 }
274 }
275 };
276}
277
278impl_protocol_tuple!();
279impl_protocol_tuple!(A);
280impl_protocol_tuple!(A, B);
281impl_protocol_tuple!(A, B, C);
282impl_protocol_tuple!(A, B, C, D);
283impl_protocol_tuple!(A, B, C, D, E);
284impl_protocol_tuple!(A, B, C, D, E, F);
285impl_protocol_tuple!(A, B, C, D, E, F, G);
286impl_protocol_tuple!(A, B, C, D, E, F, G, H);
287impl_protocol_tuple!(A, B, C, D, E, F, G, H, I);
288impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J);
289impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K);
290impl_protocol_tuple!(A, B, C, D, E, F, G, H, I, J, K, L);
291
292impl<T: Protocol + Send + Sync, const N: usize> Protocol for [T; N] {
293 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
294 Box::pin(async move {
295 let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
296 context: ErrorContext::BuiltIn { for_type: "[T; N]" },
297 kind: e.into(),
298 })?;
299 for _ in 0..N {
300 vec.push(T::read(stream).await?);
301 }
302 Ok(match vec.try_into() {
303 Ok(array) => array,
304 Err(_) => panic!("wrong array length"),
305 })
306 })
307 }
308
309 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
310 Box::pin(async move {
311 for elt in self {
312 elt.write(sink).await?;
313 }
314 Ok(())
315 })
316 }
317
318 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
319 let mut vec = <Vec<_> as FallibleVec<_>>::try_with_capacity(N).map_err(|e| ReadError {
320 context: ErrorContext::BuiltIn { for_type: "[T; N]" },
321 kind: e.into(),
322 })?;
323 for _ in 0..N {
324 vec.push(T::read_sync(stream)?);
325 }
326 Ok(match vec.try_into() {
327 Ok(array) => array,
328 Err(_) => panic!("wrong array length"),
329 })
330 }
331
332 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
333 for elt in self {
334 elt.write_sync(sink)?;
335 }
336 Ok(())
337 }
338}
339
340impl Protocol for bool {
342 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
343 Box::pin(async move {
344 Ok(match u8::read(stream).await? {
345 0 => false,
346 1 => true,
347 n => return Err(ReadError {
348 context: ErrorContext::BuiltIn { for_type: "bool" },
349 kind: ReadErrorKind::UnknownVariant8(n),
350 }),
351 })
352 })
353 }
354
355 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
356 Box::pin(async move {
357 if *self { 1u8 } else { 0 }.write(sink).await
358 })
359 }
360
361 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
362 Ok(match u8::read_sync(stream)? {
363 0 => false,
364 1 => true,
365 n => return Err(ReadError {
366 context: ErrorContext::BuiltIn { for_type: "bool" },
367 kind: ReadErrorKind::UnknownVariant8(n),
368 }),
369 })
370 }
371
372 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
373 if *self { 1u8 } else { 0 }.write_sync(sink)
374 }
375}
376
377impl<T: Protocol> Protocol for Box<T> {
378 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
379 Box::pin(async move {
380 Ok(<Box<_> as FallibleBox<_>>::try_new(T::read(stream).await?).map_err(|e| ReadError {
381 context: ErrorContext::BuiltIn { for_type: "Box" },
382 kind: e.into(),
383 })?)
384 })
385 }
386
387 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
388 (**self).write(sink)
389 }
390
391 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
392 Ok(<Box<_> as FallibleBox<_>>::try_new(T::read_sync(stream)?).map_err(|e| ReadError {
393 context: ErrorContext::BuiltIn { for_type: "Box" },
394 kind: e.into(),
395 })?)
396 }
397
398 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
399 (**self).write_sync(sink)
400 }
401}
402
403impl<T: Protocol + Send + Sync> Protocol for Vec<T> {
408 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
409 Self::read_length_prefixed(stream, u64::MAX)
410 }
411
412 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
413 self.write_length_prefixed(sink, u64::MAX)
414 }
415
416 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
417 Self::read_length_prefixed_sync(stream, u64::MAX)
418 }
419
420 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
421 self.write_length_prefixed_sync(sink, u64::MAX)
422 }
423}
424
425impl<T: Protocol + Send + Sync> LengthPrefixed for Vec<T> {
428 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
429 Box::pin(async move {
430 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
431 let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
432 context: ErrorContext::BuiltIn { for_type: "Vec" },
433 kind: e.into(),
434 })?;
435 for _ in 0..len {
436 buf.push(T::read(stream).await?);
437 }
438 Ok(buf)
439 })
440 }
441
442 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
443 Box::pin(async move {
444 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" }).await?;
445 for elt in self {
446 elt.write(sink).await?;
447 }
448 Ok(())
449 })
450 }
451
452 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
453 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
454 let mut buf = <Self as FallibleVec<_>>::try_with_capacity(len).map_err(|e| ReadError {
455 context: ErrorContext::BuiltIn { for_type: "Vec" },
456 kind: e.into(),
457 })?;
458 for _ in 0..len {
459 buf.push(T::read_sync(stream)?);
460 }
461 Ok(buf)
462 }
463
464 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
465 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "Vec" })?;
466 for elt in self {
467 elt.write_sync(sink)?;
468 }
469 Ok(())
470 }
471}
472
473impl<T: Protocol + Ord + Send + Sync + 'static> Protocol for BTreeSet<T> {
475 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
476 Self::read_length_prefixed(stream, u64::MAX)
477 }
478
479 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
480 self.write_length_prefixed(sink, u64::MAX)
481 }
482
483 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
484 Self::read_length_prefixed_sync(stream, u64::MAX)
485 }
486
487 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
488 self.write_length_prefixed_sync(sink, u64::MAX)
489 }
490}
491
492impl<T: Protocol + Ord + Send + Sync + 'static> LengthPrefixed for BTreeSet<T> {
493 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
494 Box::pin(async move {
495 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
496 let mut set = Self::default();
497 for _ in 0..len {
498 set.insert(T::read(stream).await?); }
500 Ok(set)
501 })
502 }
503
504 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
505 Box::pin(async move {
506 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" }).await?;
507 for elt in self {
508 elt.write(sink).await?;
509 }
510 Ok(())
511 })
512 }
513
514 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
515 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?; let mut set = Self::default();
516 for _ in 0..len {
517 set.insert(T::read_sync(stream)?); }
519 Ok(set)
520 }
521
522 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
523 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeSet" })?;
524 for elt in self {
525 elt.write_sync(sink)?;
526 }
527 Ok(())
528 }
529}
530
531impl<T: Protocol + Eq + Hash + Send + Sync> Protocol for HashSet<T> {
533 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
534 Self::read_length_prefixed(stream, u64::MAX)
535 }
536
537 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
538 self.write_length_prefixed(sink, u64::MAX)
539 }
540
541 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
542 Self::read_length_prefixed_sync(stream, u64::MAX)
543 }
544
545 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
546 self.write_length_prefixed_sync(sink, u64::MAX)
547 }
548}
549
550impl<T: Protocol + Eq + Hash + Send + Sync> LengthPrefixed for HashSet<T> {
551 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
552 Box::pin(async move {
553 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
554 let mut set = Self::with_capacity(len); for _ in 0..len {
556 set.insert(T::read(stream).await?);
557 }
558 Ok(set)
559 })
560 }
561
562 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
563 Box::pin(async move {
564 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" }).await?;
565 for elt in self {
566 elt.write(sink).await?;
567 }
568 Ok(())
569 })
570 }
571
572 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
573 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
574 let mut set = Self::with_capacity(len); for _ in 0..len {
576 set.insert(T::read_sync(stream)?);
577 }
578 Ok(set)
579 }
580
581 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
582 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashSet" })?;
583 for elt in self {
584 elt.write_sync(sink)?;
585 }
586 Ok(())
587 }
588}
589
590impl Protocol for String {
592 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
593 Self::read_length_prefixed(stream, u64::MAX)
594 }
595
596 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
597 self.write_length_prefixed(sink, u64::MAX)
598 }
599
600 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
601 Self::read_length_prefixed_sync(stream, u64::MAX)
602 }
603
604 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
605 self.write_length_prefixed_sync(sink, u64::MAX)
606 }
607}
608
609impl LengthPrefixed for String {
611 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
612 Box::pin(async move {
613 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
614 let mut buf = Vec::default();
615 buf.try_resize(len, 0).map_err(|e| ReadError {
616 context: ErrorContext::BuiltIn { for_type: "String" },
617 kind: e.into(),
618 })?;
619 stream.read_exact(&mut buf).await.map_err(|e| ReadError {
620 context: ErrorContext::BuiltIn { for_type: "String" },
621 kind: e.into(),
622 })?;
623 Ok(Self::from_utf8(buf).map_err(|e| ReadError {
624 context: ErrorContext::BuiltIn { for_type: "String" },
625 kind: e.into(),
626 })?)
627 })
628 }
629
630 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
631 Box::pin(async move {
632 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" }).await?;
633 sink.write(self.as_bytes()).await.map_err(|e| WriteError {
634 context: ErrorContext::BuiltIn { for_type: "String" },
635 kind: e.into(),
636 })?;
637 Ok(())
638 })
639 }
640
641 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
642 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
643 let mut buf = Vec::default();
644 buf.try_resize(len, 0).map_err(|e| ReadError {
645 context: ErrorContext::BuiltIn { for_type: "String" },
646 kind: e.into(),
647 })?;
648 stream.read_exact(&mut buf).map_err(|e| ReadError {
649 context: ErrorContext::BuiltIn { for_type: "String" },
650 kind: e.into(),
651 })?;
652 Ok(Self::from_utf8(buf).map_err(|e| ReadError {
653 context: ErrorContext::BuiltIn { for_type: "String" },
654 kind: e.into(),
655 })?)
656 }
657
658 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
659 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "String" })?;
660 sink.write(self.as_bytes()).map_err(|e| WriteError {
661 context: ErrorContext::BuiltIn { for_type: "String" },
662 kind: e.into(),
663 })?;
664 Ok(())
665 }
666}
667
668impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> Protocol for BTreeMap<K, V> {
669 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
670 Self::read_length_prefixed(stream, u64::MAX)
671 }
672
673 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
674 self.write_length_prefixed(sink, u64::MAX)
675 }
676
677 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
678 Self::read_length_prefixed_sync(stream, u64::MAX)
679 }
680
681 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
682 self.write_length_prefixed_sync(sink, u64::MAX)
683 }
684}
685
686impl<K: Protocol + Ord + Send + Sync + 'static, V: Protocol + Send + Sync + 'static> LengthPrefixed for BTreeMap<K, V> {
687 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
688 Box::pin(async move {
689 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
690 let mut map = Self::default();
691 for _ in 0..len {
692 map.insert(K::read(stream).await?, V::read(stream).await?); }
694 Ok(map)
695 })
696 }
697
698 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
699 Box::pin(async move {
700 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" }).await?;
701 for (k, v) in self {
702 k.write(sink).await?;
703 v.write(sink).await?;
704 }
705 Ok(())
706 })
707 }
708
709 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
710 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
711 let mut map = Self::default();
712 for _ in 0..len {
713 map.insert(K::read_sync(stream)?, V::read_sync(stream)?); }
715 Ok(map)
716 }
717
718 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
719 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "BTreeMap" })?;
720 for (k, v) in self {
721 k.write_sync(sink)?;
722 v.write_sync(sink)?;
723 }
724 Ok(())
725 }
726}
727
728impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> Protocol for HashMap<K, V> {
730 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
731 Self::read_length_prefixed(stream, u64::MAX)
732 }
733
734 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
735 self.write_length_prefixed(sink, u64::MAX)
736 }
737
738 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
739 Self::read_length_prefixed_sync(stream, u64::MAX)
740 }
741
742 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
743 self.write_length_prefixed_sync(sink, u64::MAX)
744 }
745}
746
747impl<K: Protocol + Eq + Hash + Send + Sync, V: Protocol + Send + Sync> LengthPrefixed for HashMap<K, V> {
748 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
749 Box::pin(async move {
750 let len = read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
751 let mut map = Self::with_capacity(len); for _ in 0..len {
753 map.insert(K::read(stream).await?, V::read(stream).await?);
754 }
755 Ok(map)
756 })
757 }
758
759 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
760 Box::pin(async move {
761 write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" }).await?;
762 for (k, v) in self {
763 k.write(sink).await?;
764 v.write(sink).await?;
765 }
766 Ok(())
767 })
768 }
769
770 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
771 let len = read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
772 let mut map = Self::with_capacity(len); for _ in 0..len {
774 map.insert(K::read_sync(stream)?, V::read_sync(stream)?);
775 }
776 Ok(map)
777 }
778
779 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
780 write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "HashMap" })?;
781 for (k, v) in self {
782 k.write_sync(sink)?;
783 v.write_sync(sink)?;
784 }
785 Ok(())
786 }
787}
788
789impl<'cow, B: ToOwned + Sync + ?Sized> Protocol for std::borrow::Cow<'cow, B>
793where B::Owned: Protocol + Send + Sync {
794 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
795 Box::pin(async move {
796 Ok(Self::Owned(B::Owned::read(stream).await?))
797 })
798 }
799
800 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
801 Box::pin(async move {
802 match self {
803 Self::Borrowed(borrowed) => (*borrowed).to_owned().write(sink).await?,
804 Self::Owned(owned) => owned.write(sink).await?,
805 }
806 Ok(())
807 })
808 }
809
810 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
811 Ok(Self::Owned(B::Owned::read_sync(stream)?))
812 }
813
814 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
815 match self {
816 Self::Borrowed(borrowed) => (*borrowed).to_owned().write_sync(sink)?,
817 Self::Owned(owned) => owned.write_sync(sink)?,
818 }
819 Ok(())
820 }
821}
822
823impl<'cow, B: ToOwned + Sync + ?Sized> LengthPrefixed for std::borrow::Cow<'cow, B>
827where B::Owned: LengthPrefixed + Send + Sync {
828 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
829 Box::pin(async move {
830 Ok(Self::Owned(B::Owned::read_length_prefixed(stream, max_len).await?))
831 })
832 }
833
834 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
835 Box::pin(async move {
836 match self {
837 Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed(sink, max_len).await?,
838 Self::Owned(owned) => owned.write_length_prefixed(sink, max_len).await?,
839 }
840 Ok(())
841 })
842 }
843
844 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
845 Ok(Self::Owned(B::Owned::read_length_prefixed_sync(stream, max_len)?))
846 }
847
848 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
849 match self {
850 Self::Borrowed(borrowed) => (*borrowed).to_owned().write_length_prefixed_sync(sink, max_len)?,
851 Self::Owned(owned) => owned.write_length_prefixed_sync(sink, max_len)?,
852 }
853 Ok(())
854 }
855}
856
857#[derive(Protocol)]
858#[async_proto(internal)]
859struct F32Proxy([u8; 4]);
860
861impl From<F32Proxy> for f32 {
862 fn from(F32Proxy(bytes): F32Proxy) -> Self {
863 Self::from_be_bytes(bytes)
864 }
865}
866
867impl<'a> From<&'a f32> for F32Proxy {
868 fn from(val: &f32) -> Self {
869 Self(val.to_be_bytes())
870 }
871}
872
873#[derive(Protocol)]
874#[async_proto(internal)]
875struct F64Proxy([u8; 8]);
876
877impl From<F64Proxy> for f64 {
878 fn from(F64Proxy(bytes): F64Proxy) -> Self {
879 Self::from_be_bytes(bytes)
880 }
881}
882
883impl<'a> From<&'a f64> for F64Proxy {
884 fn from(val: &f64) -> Self {
885 Self(val.to_be_bytes())
886 }
887}
888
889#[derive(Protocol)]
890#[async_proto(internal)]
891struct DurationProxy {
892 secs: u64,
893 subsec_nanos: u32,
894}
895
896impl From<DurationProxy> for std::time::Duration {
897 fn from(DurationProxy { secs, subsec_nanos }: DurationProxy) -> Self {
898 Self::new(secs, subsec_nanos)
899 }
900}
901
902impl<'a> From<&'a std::time::Duration> for DurationProxy {
903 fn from(duration: &std::time::Duration) -> Self {
904 Self {
905 secs: duration.as_secs(),
906 subsec_nanos: duration.subsec_nanos(),
907 }
908 }
909}
910
911impl_protocol_for! {
912 #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
913 #[async_proto(via = u8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
914 type std::num::NonZeroU8;
915
916 #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
917 #[async_proto(via = i8, clone, map_err = |_| ReadErrorKind::UnknownVariant8(0))]
918 type std::num::NonZeroI8;
919
920 #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
921 #[async_proto(via = u16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
922 type std::num::NonZeroU16;
923
924 #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
925 #[async_proto(via = i16, clone, map_err = |_| ReadErrorKind::UnknownVariant16(0))]
926 type std::num::NonZeroI16;
927
928 #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
929 #[async_proto(via = u32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
930 type std::num::NonZeroU32;
931
932 #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
933 #[async_proto(via = i32, clone, map_err = |_| ReadErrorKind::UnknownVariant32(0))]
934 type std::num::NonZeroI32;
935
936 #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
937 #[async_proto(via = u64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
938 type std::num::NonZeroU64;
939
940 #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
941 #[async_proto(via = i64, clone, map_err = |_| ReadErrorKind::UnknownVariant64(0))]
942 type std::num::NonZeroI64;
943
944 #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
945 #[async_proto(via = u128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
946 type std::num::NonZeroU128;
947
948 #[async_proto(attr(doc = "A nonzero integer is represented like its value."))]
949 #[async_proto(via = i128, clone, map_err = |_| ReadErrorKind::UnknownVariant128(0))]
950 type std::num::NonZeroI128;
951
952 #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
953 #[async_proto(via = F32Proxy)]
954 type f32;
955
956 #[async_proto(attr(doc = "Primitive number types are encoded in [big-endian](https://en.wikipedia.org/wiki/Big-endian) format."))]
957 #[async_proto(via = F64Proxy)]
958 type f64;
959
960 #[async_proto(where(Idx: Protocol + Send + Sync))]
961 struct Range<Idx> {
962 start: Idx,
963 end: Idx,
964 }
965
966 #[async_proto(where(Idx: Protocol + Sync))]
967 struct RangeFrom<Idx> {
968 start: Idx,
969 }
970
971 #[async_proto(where(Idx: Protocol + Sync))]
972 struct RangeTo<Idx> {
973 end: Idx,
974 }
975
976 #[async_proto(where(Idx: Protocol + Sync))]
977 struct RangeToInclusive<Idx> {
978 end: Idx,
979 }
980
981 #[async_proto(where(T: Protocol + Sync))]
982 enum Option<T> {
983 None,
984 Some(T),
985 }
986
987 #[async_proto(where(T: Protocol + Sync, E: Protocol + Sync))]
988 enum Result<T, E> {
989 Ok(T),
990 Err(E),
991 }
992
993 enum std::convert::Infallible {}
994
995 #[async_proto(where(T: Sync))]
996 struct std::marker::PhantomData<T>;
997
998 struct std::ops::RangeFull;
999
1000 #[async_proto(attr(doc = "A duration is represented as the number of whole seconds as a [`u64`] followed by the number of subsecond nanoseconds as a [`u32`]."))]
1001 #[async_proto(via = DurationProxy)]
1002 type std::time::Duration;
1003}