1use core::any::TypeId;
2use core::fmt::{self, Debug};
3use core::future::{pending, Future};
4use core::hash::{Hash, Hasher};
5use core::iter::zip;
6use core::marker::PhantomData;
7use core::mem;
8use core::ops::{Deref, DerefMut};
9use core::pin::Pin;
10
11use bytes::{Buf as _, BufMut as _, Bytes, BytesMut};
12use futures::stream::{self, FuturesUnordered};
13use futures::{Stream, StreamExt as _, TryStreamExt as _};
14use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
15use tokio::select;
16use tokio::sync::{mpsc, oneshot};
17use tokio::task::JoinSet;
18use tokio_stream::wrappers::ReceiverStream;
19use tokio_util::codec::{Encoder as _, FramedRead};
20use tokio_util::io::StreamReader;
21use tracing::{debug, error, instrument, trace, Instrument as _, Span};
22use wasm_tokio::cm::{
23 BoolCodec, F32Codec, F64Codec, OptionDecoder, OptionEncoder, PrimValEncoder, ResultDecoder,
24 ResultEncoder, S16Codec, S32Codec, S64Codec, S8Codec, TupleDecoder, TupleEncoder, U16Codec,
25 U32Codec, U64Codec, U8Codec,
26};
27use wasm_tokio::{
28 CoreNameDecoder, CoreNameEncoder, CoreVecDecoder, CoreVecDecoderBytes, CoreVecEncoderBytes,
29 Leb128DecoderI128, Leb128DecoderI16, Leb128DecoderI32, Leb128DecoderI64, Leb128DecoderI8,
30 Leb128DecoderU128, Leb128DecoderU16, Leb128DecoderU32, Leb128DecoderU64, Leb128DecoderU8,
31 Leb128Encoder, Utf8Codec,
32};
33
34use crate::{Incoming, Index as _};
35
36#[repr(transparent)]
38pub struct ResourceBorrow<T: ?Sized> {
39 repr: Bytes,
40 _ty: PhantomData<T>,
41}
42
43impl<T: ?Sized> From<Bytes> for ResourceBorrow<T> {
44 fn from(repr: Bytes) -> Self {
45 Self {
46 repr,
47 _ty: PhantomData,
48 }
49 }
50}
51
52impl<T: ?Sized> From<Vec<u8>> for ResourceBorrow<T> {
53 fn from(repr: Vec<u8>) -> Self {
54 Self {
55 repr: repr.into(),
56 _ty: PhantomData,
57 }
58 }
59}
60
61impl<T: ?Sized> From<ResourceBorrow<T>> for Bytes {
62 fn from(ResourceBorrow { repr, .. }: ResourceBorrow<T>) -> Self {
63 repr
64 }
65}
66
67impl<T: ?Sized> PartialEq for ResourceBorrow<T> {
68 fn eq(&self, other: &Self) -> bool {
69 self.repr == other.repr
70 }
71}
72
73impl<T: ?Sized> Eq for ResourceBorrow<T> {}
74
75impl<T: ?Sized> Hash for ResourceBorrow<T> {
76 fn hash<H: Hasher>(&self, state: &mut H) {
77 self.repr.hash(state);
78 }
79}
80
81impl<T: ?Sized> AsRef<[u8]> for ResourceBorrow<T> {
82 fn as_ref(&self) -> &[u8] {
83 &self.repr
84 }
85}
86
87impl<T: ?Sized> AsRef<Bytes> for ResourceBorrow<T> {
88 fn as_ref(&self) -> &Bytes {
89 &self.repr
90 }
91}
92
93impl<T: ?Sized + 'static> Debug for ResourceBorrow<T> {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 write!(f, "borrow<{:?}>", TypeId::of::<T>())
96 }
97}
98
99impl<T: ?Sized> Clone for ResourceBorrow<T> {
100 fn clone(&self) -> Self {
101 Self {
102 repr: self.repr.clone(),
103 _ty: PhantomData,
104 }
105 }
106}
107
108impl<T: ?Sized> ResourceBorrow<T> {
109 pub fn new(repr: impl Into<Bytes>) -> Self {
111 Self::from(repr.into())
112 }
113}
114
115#[repr(transparent)]
117pub struct ResourceOwn<T: ?Sized> {
118 repr: Bytes,
119 _ty: PhantomData<T>,
120}
121
122impl<T: ?Sized> From<ResourceOwn<T>> for ResourceBorrow<T> {
123 fn from(ResourceOwn { repr, _ty }: ResourceOwn<T>) -> Self {
124 Self {
125 repr,
126 _ty: PhantomData,
127 }
128 }
129}
130
131impl<T: ?Sized> From<Bytes> for ResourceOwn<T> {
132 fn from(repr: Bytes) -> Self {
133 Self {
134 repr,
135 _ty: PhantomData,
136 }
137 }
138}
139
140impl<T: ?Sized> From<Vec<u8>> for ResourceOwn<T> {
141 fn from(repr: Vec<u8>) -> Self {
142 Self {
143 repr: repr.into(),
144 _ty: PhantomData,
145 }
146 }
147}
148
149impl<T: ?Sized> From<ResourceOwn<T>> for Bytes {
150 fn from(ResourceOwn { repr, .. }: ResourceOwn<T>) -> Self {
151 repr
152 }
153}
154
155impl<T: ?Sized> PartialEq for ResourceOwn<T> {
156 fn eq(&self, other: &Self) -> bool {
157 self.repr == other.repr
158 }
159}
160
161impl<T: ?Sized> Eq for ResourceOwn<T> {}
162
163impl<T: ?Sized> Hash for ResourceOwn<T> {
164 fn hash<H: Hasher>(&self, state: &mut H) {
165 self.repr.hash(state);
166 }
167}
168
169impl<T: ?Sized> AsRef<[u8]> for ResourceOwn<T> {
170 fn as_ref(&self) -> &[u8] {
171 &self.repr
172 }
173}
174
175impl<T: ?Sized> AsRef<Bytes> for ResourceOwn<T> {
176 fn as_ref(&self) -> &Bytes {
177 &self.repr
178 }
179}
180
181impl<T: ?Sized + 'static> Debug for ResourceOwn<T> {
182 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
183 write!(f, "own<{:?}>", TypeId::of::<T>())
184 }
185}
186
187impl<T: ?Sized> Clone for ResourceOwn<T> {
188 fn clone(&self) -> Self {
189 Self {
190 repr: self.repr.clone(),
191 _ty: PhantomData,
192 }
193 }
194}
195
196impl<T: ?Sized> ResourceOwn<T> {
197 pub fn new(repr: impl Into<Bytes>) -> Self {
199 Self::from(repr.into())
200 }
201
202 pub fn as_borrow(&self) -> ResourceBorrow<T> {
204 ResourceBorrow {
205 repr: self.repr.clone(),
206 _ty: PhantomData,
207 }
208 }
209}
210
/// Boxed, `Send`, one-shot callback representing work left over after a codec call.
///
/// The callback receives a value of type `T` and a `Vec<usize>` (named `path` where this
/// file invokes it) and returns a boxed, pinned, `Send` future resolving to
/// `std::io::Result<()>`.
pub type DeferredFn<T> = Box<
    dyn FnOnce(T, Vec<usize>) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send>> + Send,
>;

/// Implemented by codecs that may hold a [`DeferredFn`] after encoding or decoding a value.
pub trait Deferred<T> {
    /// Returns the pending deferred callback, or `None` if there is none.
    fn take_deferred(&mut self) -> Option<DeferredFn<T>>;
}
221
/// Implements [`Deferred`] for a codec type that never defers work: the generated
/// `take_deferred` always returns `None`, for every choice of the trait's type parameter.
macro_rules! impl_deferred_sync {
    ($codec:ty) => {
        impl<T> Deferred<T> for $codec {
            fn take_deferred(&mut self) -> Option<DeferredFn<T>> {
                Option::None
            }
        }
    };
}
231
// Codecs whose `take_deferred` always returns `None`: primitive codecs, name/byte codecs,
// the LEB128 codecs and a few codecs declared in this crate.
impl_deferred_sync!(BoolCodec);
impl_deferred_sync!(S8Codec);
impl_deferred_sync!(U8Codec);
impl_deferred_sync!(S16Codec);
impl_deferred_sync!(U16Codec);
impl_deferred_sync!(S32Codec);
impl_deferred_sync!(U32Codec);
impl_deferred_sync!(S64Codec);
impl_deferred_sync!(U64Codec);
impl_deferred_sync!(F32Codec);
impl_deferred_sync!(F64Codec);
impl_deferred_sync!(CoreNameDecoder);
impl_deferred_sync!(CoreNameEncoder);
impl_deferred_sync!(CoreVecDecoderBytes);
impl_deferred_sync!(CoreVecEncoderBytes);
impl_deferred_sync!(Utf8Codec);
impl_deferred_sync!(PrimValEncoder);
impl_deferred_sync!(Leb128Encoder);
impl_deferred_sync!(Leb128DecoderI8);
impl_deferred_sync!(Leb128DecoderU8);
impl_deferred_sync!(Leb128DecoderI16);
impl_deferred_sync!(Leb128DecoderU16);
impl_deferred_sync!(Leb128DecoderI32);
impl_deferred_sync!(Leb128DecoderU32);
impl_deferred_sync!(Leb128DecoderI64);
impl_deferred_sync!(Leb128DecoderU64);
impl_deferred_sync!(Leb128DecoderI128);
impl_deferred_sync!(Leb128DecoderU128);
impl_deferred_sync!(ResourceEncoder);
impl_deferred_sync!(UnitCodec);
impl_deferred_sync!(ListDecoderU8);

// The same for `CoreVecDecoder` instantiated with each of the non-deferring decoders.
impl_deferred_sync!(CoreVecDecoder<BoolCodec>);
impl_deferred_sync!(CoreVecDecoder<S8Codec>);
impl_deferred_sync!(CoreVecDecoder<U8Codec>);
impl_deferred_sync!(CoreVecDecoder<S16Codec>);
impl_deferred_sync!(CoreVecDecoder<U16Codec>);
impl_deferred_sync!(CoreVecDecoder<S32Codec>);
impl_deferred_sync!(CoreVecDecoder<U32Codec>);
impl_deferred_sync!(CoreVecDecoder<S64Codec>);
impl_deferred_sync!(CoreVecDecoder<U64Codec>);
impl_deferred_sync!(CoreVecDecoder<F32Codec>);
impl_deferred_sync!(CoreVecDecoder<F64Codec>);
impl_deferred_sync!(CoreVecDecoder<CoreNameDecoder>);
impl_deferred_sync!(CoreVecDecoder<CoreVecDecoderBytes>);
impl_deferred_sync!(CoreVecDecoder<Utf8Codec>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI8>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU8>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI16>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU16>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI32>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU32>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI64>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU64>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI128>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU128>);
impl_deferred_sync!(CoreVecDecoder<UnitCodec>);
289
290pub struct SyncCodec<T>(pub T);
295
296impl<T> Deref for SyncCodec<T> {
297 type Target = T;
298
299 fn deref(&self) -> &Self::Target {
300 &self.0
301 }
302}
303
304impl<T> DerefMut for SyncCodec<T> {
305 fn deref_mut(&mut self) -> &mut Self::Target {
306 &mut self.0
307 }
308}
309
310impl<T, C> Deferred<T> for SyncCodec<C> {
311 fn take_deferred(&mut self) -> Option<DeferredFn<T>> {
312 None
313 }
314}
315
316impl<T: Default> Default for SyncCodec<T> {
317 fn default() -> Self {
318 Self(T::default())
319 }
320}
321
322impl<T, I> tokio_util::codec::Encoder<I> for SyncCodec<T>
323where
324 T: tokio_util::codec::Encoder<I>,
325{
326 type Error = T::Error;
327
328 fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> {
329 self.0.encode(item, dst)
330 }
331}
332
333impl<T> tokio_util::codec::Decoder for SyncCodec<T>
334where
335 T: tokio_util::codec::Decoder,
336{
337 type Item = T::Item;
338 type Error = T::Error;
339
340 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
341 self.0.decode(src)
342 }
343
344 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
345 self.0.decode_eof(buf)
346 }
347
348 fn framed<IO: AsyncRead + AsyncWrite + Sized>(
349 self,
350 io: IO,
351 ) -> tokio_util::codec::Framed<IO, Self>
352 where
353 Self: Sized,
354 {
355 self.0.framed(io).map_codec(Self)
356 }
357}
358
359#[instrument(level = "trace", skip(w, deferred))]
360async fn handle_deferred<T, I>(w: T, deferred: I, mut path: Vec<usize>) -> std::io::Result<()>
361where
362 I: IntoIterator<Item = Option<DeferredFn<T>>>,
363 I::IntoIter: ExactSizeIterator,
364 T: crate::Index<T>,
365{
366 let mut futs = FuturesUnordered::default();
367 for (i, f) in zip(0.., deferred) {
368 if let Some(f) = f {
369 path.push(i);
370 let w = w.index(&path).map_err(std::io::Error::other)?;
371 path.pop();
372 futs.push(f(w, Vec::default()));
373 }
374 }
375 while let Some(()) = futs.try_next().await? {}
376 Ok(())
377}
378
379pub trait Encode<T>: Sized {
381 type Encoder: tokio_util::codec::Encoder<Self> + Deferred<T> + Default + Send;
383
384 #[instrument(level = "trace", skip(self, enc))]
386 fn encode(
387 self,
388 enc: &mut Self::Encoder,
389 dst: &mut BytesMut,
390 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
391 {
392 enc.encode(self, dst)?;
393 Ok(enc.take_deferred())
394 }
395
396 #[instrument(level = "trace", skip(items, enc))]
398 fn encode_iter_own<I>(
399 items: I,
400 enc: &mut Self::Encoder,
401 dst: &mut BytesMut,
402 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
403 where
404 I: IntoIterator<Item = Self>,
405 I::IntoIter: ExactSizeIterator,
406 T: crate::Index<T> + Send + Sync + 'static,
407 {
408 let items = items.into_iter();
409 dst.reserve(items.len());
410 let mut deferred = Vec::with_capacity(items.len());
411 for item in items {
412 enc.encode(item, dst)?;
413 deferred.push(enc.take_deferred());
414 }
415 if deferred.iter().any(Option::is_some) {
416 Ok(Some(Box::new(move |w, path| {
417 Box::pin(handle_deferred(w, deferred, path))
418 })))
419 } else {
420 Ok(None)
421 }
422 }
423
424 #[instrument(level = "trace", skip(items, enc))]
426 fn encode_iter_ref<'a, I>(
427 items: I,
428 enc: &mut Self::Encoder,
429 dst: &mut BytesMut,
430 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
431 where
432 I: IntoIterator<Item = &'a Self>,
433 I::IntoIter: ExactSizeIterator,
434 T: crate::Index<T> + Send + Sync + 'static,
435 Self::Encoder: tokio_util::codec::Encoder<&'a Self>,
436 {
437 let items = items.into_iter();
438 dst.reserve(items.len());
439 let mut deferred = Vec::with_capacity(items.len());
440 for item in items {
441 enc.encode(item, dst)?;
442 deferred.push(enc.take_deferred());
443 }
444 if deferred.iter().any(Option::is_some) {
445 Ok(Some(Box::new(move |w, path| {
446 Box::pin(handle_deferred(w, deferred, path))
447 })))
448 } else {
449 Ok(None)
450 }
451 }
452
453 #[instrument(level = "trace", skip(items, enc), fields(ty = "list"))]
455 fn encode_list_own(
456 items: Vec<Self>,
457 enc: &mut Self::Encoder,
458 dst: &mut BytesMut,
459 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
460 where
461 T: crate::Index<T> + Send + Sync + 'static,
462 {
463 let n = u32::try_from(items.len())
464 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
465 dst.reserve(5 + items.len());
466 Leb128Encoder.encode(n, dst)?;
467 Self::encode_iter_own(items, enc, dst)
468 }
469
470 #[instrument(level = "trace", skip(items, enc), fields(ty = "list"))]
472 fn encode_list_ref<'a>(
473 items: &'a [Self],
474 enc: &mut Self::Encoder,
475 dst: &mut BytesMut,
476 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
477 where
478 T: crate::Index<T> + Send + Sync + 'static,
479 Self::Encoder: tokio_util::codec::Encoder<&'a Self>,
480 {
481 let n = u32::try_from(items.len())
482 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
483 dst.reserve(5 + items.len());
484 Leb128Encoder.encode(n, dst)?;
485 Self::encode_iter_ref(items, enc, dst)
486 }
487}
488
/// Values that can be decoded with an associated codec.
///
/// `T` parameterizes the [`Incoming<T>`] value handed to deferred callbacks.
pub trait Decode<T>: Sized {
    /// Decoder producing a single `Self`; it must also implement [`Deferred`] for
    /// [`Incoming<T>`].
    type Decoder: tokio_util::codec::Decoder<Item = Self>
        + Deferred<Incoming<T>>
        + Default
        + Send
        + 'static;
    /// Decoder producing a whole `Vec<Self>`.
    type ListDecoder: tokio_util::codec::Decoder<Item = Vec<Self>> + Default + 'static;
}
500
501impl<T, W> Deferred<W> for OptionEncoder<T>
502where
503 T: Deferred<W>,
504{
505 fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
506 self.0.take_deferred()
507 }
508}
509
510impl<T, W> Encode<W> for Option<T>
511where
512 T: Encode<W>,
513{
514 type Encoder = OptionEncoder<T::Encoder>;
515}
516
517impl<'a, T, W> Encode<W> for &'a Option<T>
518where
519 T: Encode<W>,
520 T::Encoder: tokio_util::codec::Encoder<&'a T>,
521{
522 type Encoder = OptionEncoder<T::Encoder>;
523}
524
525impl<T, W> Deferred<W> for OptionDecoder<T>
526where
527 T: Deferred<W> + Default,
528{
529 fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
530 mem::take(self).into_inner().take_deferred()
531 }
532}
533
534impl<T, R> Decode<R> for Option<T>
535where
536 T: Decode<R>,
537 R: crate::Index<R> + Send + 'static,
538{
539 type Decoder = OptionDecoder<T::Decoder>;
540 type ListDecoder = ListDecoder<Self::Decoder, R>;
541}
542
543impl<O, E, W> Deferred<W> for ResultEncoder<O, E>
544where
545 O: Deferred<W>,
546 E: Deferred<W>,
547{
548 fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
549 match (self.ok.take_deferred(), self.err.take_deferred()) {
550 (None, None) => None,
551 (Some(ok), None) => Some(ok),
552 (None, Some(err)) => Some(err),
553 (Some(ok), Some(_)) => {
554 if cfg!(debug_assertions) {
555 panic!("both `result::ok` and `result::err` deferred function set")
556 }
557 Some(ok)
558 }
559 }
560 }
561}
562
563impl<O, E, W> Encode<W> for Result<O, E>
564where
565 O: Encode<W>,
566 E: Encode<W>,
567 std::io::Error: From<<O::Encoder as tokio_util::codec::Encoder<O>>::Error>,
568 std::io::Error: From<<E::Encoder as tokio_util::codec::Encoder<E>>::Error>,
569{
570 type Encoder = ResultEncoder<O::Encoder, E::Encoder>;
571}
572
573impl<'a, O, E, W> Encode<W> for &'a Result<O, E>
574where
575 O: Encode<W>,
576 O::Encoder: tokio_util::codec::Encoder<&'a O>,
577 E: Encode<W>,
578 E::Encoder: tokio_util::codec::Encoder<&'a E>,
579 std::io::Error: From<<O::Encoder as tokio_util::codec::Encoder<&'a O>>::Error>,
580 std::io::Error: From<<E::Encoder as tokio_util::codec::Encoder<&'a E>>::Error>,
581{
582 type Encoder = ResultEncoder<O::Encoder, E::Encoder>;
583}
584
585impl<O, E, W> Deferred<W> for ResultDecoder<O, E>
586where
587 O: Deferred<W> + Default,
588 E: Deferred<W> + Default,
589{
590 fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
591 let (mut ok, mut err) = mem::take(self).into_inner();
592 match (ok.take_deferred(), err.take_deferred()) {
593 (None, None) => None,
594 (Some(ok), None) => Some(ok),
595 (None, Some(err)) => Some(err),
596 (Some(ok), Some(_)) => {
597 if cfg!(debug_assertions) {
598 panic!("both `result::ok` and `result::err` deferred function set")
599 }
600 Some(ok)
601 }
602 }
603 }
604}
605
606impl<O, E, R> Decode<R> for Result<O, E>
607where
608 O: Decode<R>,
609 E: Decode<R>,
610 R: crate::Index<R> + Send + 'static,
611 std::io::Error: From<<O::Decoder as tokio_util::codec::Decoder>::Error>,
612 std::io::Error: From<<E::Decoder as tokio_util::codec::Decoder>::Error>,
613{
614 type Decoder = ResultDecoder<O::Decoder, E::Decoder>;
615 type ListDecoder = ListDecoder<Self::Decoder, R>;
616}
617
618pub struct ListEncoder<W> {
620 deferred: Option<DeferredFn<W>>,
621}
622
623impl<W> Default for ListEncoder<W> {
624 fn default() -> Self {
625 Self { deferred: None }
626 }
627}
628
629impl<W> Deferred<W> for ListEncoder<W> {
630 fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
631 self.deferred.take()
632 }
633}
634
635impl<T, W> tokio_util::codec::Encoder<Vec<T>> for ListEncoder<W>
636where
637 T: Encode<W>,
638 W: crate::Index<W> + Send + Sync + 'static,
639{
640 type Error = <T::Encoder as tokio_util::codec::Encoder<T>>::Error;
641
642 fn encode(&mut self, items: Vec<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
643 let mut enc = T::Encoder::default();
644 self.deferred = T::encode_list_own(items, &mut enc, dst)?;
645 Ok(())
646 }
647}
648
649impl<'a, T, W> tokio_util::codec::Encoder<&'a Vec<T>> for ListEncoder<W>
650where
651 T: Encode<W>,
652 T::Encoder: tokio_util::codec::Encoder<&'a T>,
653 W: crate::Index<W> + Send + Sync + 'static,
654{
655 type Error = <T::Encoder as tokio_util::codec::Encoder<&'a T>>::Error;
656
657 fn encode(&mut self, items: &'a Vec<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
658 let mut enc = T::Encoder::default();
659 self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
660 Ok(())
661 }
662}
663
664impl<'a, 'b, T, W> tokio_util::codec::Encoder<&'a &'b Vec<T>> for ListEncoder<W>
665where
666 T: Encode<W>,
667 T::Encoder: tokio_util::codec::Encoder<&'b T>,
668 W: crate::Index<W> + Send + Sync + 'static,
669{
670 type Error = <T::Encoder as tokio_util::codec::Encoder<&'b T>>::Error;
671
672 fn encode(&mut self, items: &'a &'b Vec<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
673 let mut enc = T::Encoder::default();
674 self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
675 Ok(())
676 }
677}
678
679impl<'a, T, W> tokio_util::codec::Encoder<&'a [T]> for ListEncoder<W>
680where
681 T: Encode<W>,
682 T::Encoder: tokio_util::codec::Encoder<&'a T>,
683 W: crate::Index<W> + Send + Sync + 'static,
684{
685 type Error = <T::Encoder as tokio_util::codec::Encoder<&'a T>>::Error;
686
687 fn encode(&mut self, items: &'a [T], dst: &mut BytesMut) -> Result<(), Self::Error> {
688 let mut enc = T::Encoder::default();
689 self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
690 Ok(())
691 }
692}
693
694impl<'a, 'b, T, W> tokio_util::codec::Encoder<&'a &'b [T]> for ListEncoder<W>
695where
696 T: Encode<W>,
697 T::Encoder: tokio_util::codec::Encoder<&'b T>,
698 W: crate::Index<W> + Send + Sync + 'static,
699{
700 type Error = <T::Encoder as tokio_util::codec::Encoder<&'b T>>::Error;
701
702 fn encode(&mut self, items: &'a &'b [T], dst: &mut BytesMut) -> Result<(), Self::Error> {
703 let mut enc = T::Encoder::default();
704 self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
705 Ok(())
706 }
707}
708
709impl<T, W> Encode<W> for Vec<T>
710where
711 T: Encode<W>,
712 W: crate::Index<W> + Send + Sync + 'static,
713{
714 type Encoder = ListEncoder<W>;
715}
716
717impl<'a, T, W> Encode<W> for &'a Vec<T>
718where
719 T: Encode<W>,
720 T::Encoder: tokio_util::codec::Encoder<&'a T>,
721 W: crate::Index<W> + Send + Sync + 'static,
722{
723 type Encoder = ListEncoder<W>;
724}
725
726impl<'a, T, W> Encode<W> for &'a [T]
727where
728 T: Encode<W>,
729 T::Encoder: tokio_util::codec::Encoder<&'a T>,
730 W: crate::Index<W> + Send + Sync + 'static,
731{
732 type Encoder = ListEncoder<W>;
733}
734
735pub struct ListDecoder<T, R>
737where
738 T: tokio_util::codec::Decoder,
739{
740 dec: T,
741 ret: Vec<T::Item>,
742 cap: usize,
743 deferred: Vec<Option<DeferredFn<Incoming<R>>>>,
744}
745
746impl<T, R> ListDecoder<T, R>
747where
748 T: tokio_util::codec::Decoder,
749{
750 pub fn new(dec: T) -> Self {
752 Self {
753 dec,
754 ret: Vec::default(),
755 cap: 0,
756 deferred: vec![],
757 }
758 }
759}
760
761impl<T, R> Default for ListDecoder<T, R>
762where
763 T: tokio_util::codec::Decoder + Default,
764{
765 fn default() -> Self {
766 Self::new(T::default())
767 }
768}
769
770impl<T, R> Deferred<Incoming<R>> for ListDecoder<T, R>
771where
772 T: tokio_util::codec::Decoder,
773 R: crate::Index<R> + Send + Sync + 'static,
774{
775 fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
776 let deferred = mem::take(&mut self.deferred);
777 if deferred.iter().any(Option::is_some) {
778 Some(Box::new(|r, path| {
779 Box::pin(handle_deferred(r, deferred, path))
780 }))
781 } else {
782 None
783 }
784 }
785}
786
787impl<T, R> tokio_util::codec::Decoder for ListDecoder<T, R>
788where
789 T: tokio_util::codec::Decoder + Deferred<Incoming<R>>,
790{
791 type Item = Vec<T::Item>;
792 type Error = T::Error;
793
794 #[instrument(level = "trace", skip(self), fields(ty = "list"))]
795 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
796 if self.cap == 0 {
797 let Some(len) = Leb128DecoderU32.decode(src)? else {
798 return Ok(None);
799 };
800 if len == 0 {
801 return Ok(Some(Vec::default()));
802 }
803 let len = len
804 .try_into()
805 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
806 self.ret = Vec::with_capacity(len);
807 self.deferred = Vec::with_capacity(len);
808 self.cap = len;
809 }
810 while self.cap > 0 {
811 let Some(v) = self.dec.decode(src)? else {
812 return Ok(None);
813 };
814 self.ret.push(v);
815 self.deferred.push(self.dec.take_deferred());
816 self.cap -= 1;
817 }
818 Ok(Some(mem::take(&mut self.ret)))
819 }
820}
821
/// A `Vec<T>` is decoded with `T`'s list decoder; a list of such vectors goes through the
/// generic [`ListDecoder`] wrapping that decoder.
impl<T, R> Decode<R> for Vec<T>
where
    T: Decode<R> + Send,
    T::ListDecoder: Deferred<Incoming<R>> + Send,
    R: crate::Index<R> + Send + 'static,
{
    type Decoder = T::ListDecoder;
    type ListDecoder = ListDecoder<Self::Decoder, R>;
}
831
/// Implements [`Encode`] for `$ty` and `&$ty`, and [`Decode`] for `$ty`, using `$codec`.
///
/// The generated `encode_iter_*` overrides encode every item in order and always return
/// `Ok(None)`: they never collect deferred callbacks. Lists are decoded with
/// `CoreVecDecoder<$codec>`.
macro_rules! impl_copy_codec {
    ($ty:ty, $codec:tt) => {
        impl<W> Encode<W> for $ty {
            type Encoder = $codec;

            #[instrument(level = "trace", skip(items))]
            fn encode_iter_own<I>(
                items: I,
                enc: &mut Self::Encoder,
                dst: &mut BytesMut,
            ) -> Result<
                Option<DeferredFn<W>>,
                <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error,
            >
            where
                I: IntoIterator<Item = Self>,
                I::IntoIter: ExactSizeIterator,
            {
                let mut values = items.into_iter();
                dst.reserve(values.len());
                values.try_for_each(|value| enc.encode(value, dst))?;
                Ok(None)
            }

            #[instrument(level = "trace", skip(items))]
            fn encode_iter_ref<'a, I>(
                items: I,
                enc: &mut Self::Encoder,
                dst: &mut BytesMut,
            ) -> Result<
                Option<DeferredFn<W>>,
                <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error,
            >
            where
                I: IntoIterator<Item = &'a Self>,
                I::IntoIter: ExactSizeIterator,
            {
                let mut values = items.into_iter();
                dst.reserve(values.len());
                // Items are `Copy`, so encode them by value.
                values.try_for_each(|value| enc.encode(*value, dst))?;
                Ok(None)
            }
        }

        impl<'b, W> Encode<W> for &'b $ty {
            type Encoder = $codec;

            #[instrument(level = "trace", skip(items))]
            fn encode_iter_own<I>(
                items: I,
                enc: &mut Self::Encoder,
                dst: &mut BytesMut,
            ) -> Result<
                Option<DeferredFn<W>>,
                <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error,
            >
            where
                I: IntoIterator<Item = Self>,
                I::IntoIter: ExactSizeIterator,
            {
                let mut values = items.into_iter();
                dst.reserve(values.len());
                // Items are references to `Copy` values, so encode them by value.
                values.try_for_each(|value| enc.encode(*value, dst))?;
                Ok(None)
            }

            #[instrument(level = "trace", skip(items))]
            fn encode_iter_ref<'a, I>(
                items: I,
                enc: &mut Self::Encoder,
                dst: &mut BytesMut,
            ) -> Result<
                Option<DeferredFn<W>>,
                <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error,
            >
            where
                I: IntoIterator<Item = &'a Self>,
                I::IntoIter: ExactSizeIterator,
                'b: 'a,
            {
                let mut values = items.into_iter();
                dst.reserve(values.len());
                values.try_for_each(|value| enc.encode(value, dst))?;
                Ok(None)
            }
        }

        impl<R> Decode<R> for $ty {
            type Decoder = $codec;
            type ListDecoder = CoreVecDecoder<Self::Decoder>;
        }
    };
}
933
/// Bit pattern substituted for every `f32` `NaN` before encoding.
const CANONICAL_NAN_F32: u32 = 0x7fc0_0000;
/// Bit pattern substituted for every `f64` `NaN` before encoding.
const CANONICAL_NAN_F64: u64 = 0x7ff8_0000_0000_0000;

/// Generates a codec `$name` wrapping `$inner` for the float type `$float`.
///
/// On encode, any `NaN` is replaced by the value with bit pattern `$nan_bits` before being
/// handed to the inner codec. Decoding is forwarded to the inner codec unchanged. The
/// generated codec and `CoreVecDecoder<$name>` are both registered as non-deferring.
macro_rules! impl_canonical_nan_codec {
    ($name:ident, $inner:ty, $float:ty, $nan_bits:expr) => {
        #[doc = concat!("Canonicalizes `NaN`s on encode, wrapping [`", stringify!($inner), "`].")]
        #[derive(Debug, Default)]
        pub struct $name($inner);

        impl tokio_util::codec::Encoder<$float> for $name {
            type Error = std::io::Error;

            fn encode(&mut self, item: $float, dst: &mut BytesMut) -> Result<(), Self::Error> {
                let mut value = item;
                if value.is_nan() {
                    value = <$float>::from_bits($nan_bits);
                }
                self.0.encode(value, dst)
            }
        }

        impl tokio_util::codec::Encoder<&$float> for $name {
            type Error = std::io::Error;

            fn encode(&mut self, item: &$float, dst: &mut BytesMut) -> Result<(), Self::Error> {
                let value: $float = *item;
                tokio_util::codec::Encoder::<$float>::encode(self, value, dst)
            }
        }

        impl tokio_util::codec::Encoder<&&$float> for $name {
            type Error = std::io::Error;

            fn encode(&mut self, item: &&$float, dst: &mut BytesMut) -> Result<(), Self::Error> {
                let value: $float = **item;
                tokio_util::codec::Encoder::<$float>::encode(self, value, dst)
            }
        }

        impl tokio_util::codec::Decoder for $name {
            type Item = $float;
            type Error = std::io::Error;

            fn decode(&mut self, src: &mut BytesMut) -> Result<Option<$float>, Self::Error> {
                let Self(inner) = self;
                inner.decode(src)
            }
        }

        impl_deferred_sync!($name);
        impl_deferred_sync!(CoreVecDecoder<$name>);
    };
}
994
// NaN-canonicalizing wrappers around the plain float codecs.
impl_canonical_nan_codec!(CanonicalNanF32Codec, F32Codec, f32, CANONICAL_NAN_F32);
impl_canonical_nan_codec!(CanonicalNanF64Codec, F64Codec, f64, CANONICAL_NAN_F64);

// `Encode`/`Decode` for the `Copy` primitives. The floats go through the canonicalizing
// codecs generated above; `u8` is not listed here because it has hand-written impls.
impl_copy_codec!(bool, BoolCodec);
impl_copy_codec!(i8, S8Codec);
impl_copy_codec!(i16, S16Codec);
impl_copy_codec!(u16, U16Codec);
impl_copy_codec!(i32, S32Codec);
impl_copy_codec!(u32, U32Codec);
impl_copy_codec!(i64, S64Codec);
impl_copy_codec!(u64, U64Codec);
impl_copy_codec!(f32, CanonicalNanF32Codec);
impl_copy_codec!(f64, CanonicalNanF64Codec);
impl_copy_codec!(char, Utf8Codec);
1009
1010impl<T> Encode<T> for u8 {
1011 type Encoder = U8Codec;
1012
1013 #[instrument(level = "trace", skip(items))]
1014 fn encode_iter_own<I>(
1015 items: I,
1016 enc: &mut Self::Encoder,
1017 dst: &mut BytesMut,
1018 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
1019 where
1020 I: IntoIterator<Item = Self>,
1021 I::IntoIter: ExactSizeIterator,
1022 {
1023 let items = items.into_iter();
1024 dst.reserve(items.len());
1025 dst.extend(items);
1026 Ok(None)
1027 }
1028
1029 #[instrument(level = "trace", skip(items))]
1030 fn encode_iter_ref<'a, I>(
1031 items: I,
1032 enc: &mut Self::Encoder,
1033 dst: &mut BytesMut,
1034 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
1035 where
1036 I: IntoIterator<Item = &'a Self>,
1037 I::IntoIter: ExactSizeIterator,
1038 {
1039 let items = items.into_iter();
1040 dst.reserve(items.len());
1041 dst.extend(items);
1042 Ok(None)
1043 }
1044
1045 #[instrument(level = "trace", skip(items), fields(ty = "list<u8>"))]
1046 fn encode_list_own(
1047 items: Vec<Self>,
1048 enc: &mut Self::Encoder,
1049 dst: &mut BytesMut,
1050 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
1051 {
1052 CoreVecEncoderBytes.encode(items, dst)?;
1053 Ok(None)
1054 }
1055
1056 #[instrument(level = "trace", skip(items), fields(ty = "list<u8>"))]
1057 fn encode_list_ref<'a>(
1058 items: &'a [Self],
1059 enc: &mut Self::Encoder,
1060 dst: &mut BytesMut,
1061 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
1062 where
1063 Self::Encoder: tokio_util::codec::Encoder<&'a Self>,
1064 {
1065 CoreVecEncoderBytes.encode(items, dst)?;
1066 Ok(None)
1067 }
1068}
1069
1070impl<'b, T> Encode<T> for &'b u8 {
1071 type Encoder = U8Codec;
1072
1073 #[instrument(level = "trace", skip(items))]
1074 fn encode_iter_own<I>(
1075 items: I,
1076 enc: &mut Self::Encoder,
1077 dst: &mut BytesMut,
1078 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
1079 where
1080 I: IntoIterator<Item = Self>,
1081 I::IntoIter: ExactSizeIterator,
1082 {
1083 let items = items.into_iter();
1084 dst.reserve(items.len());
1085 dst.extend(items);
1086 Ok(None)
1087 }
1088
1089 #[instrument(level = "trace", skip(items))]
1090 fn encode_iter_ref<'a, I>(
1091 items: I,
1092 enc: &mut Self::Encoder,
1093 dst: &mut BytesMut,
1094 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
1095 where
1096 I: IntoIterator<Item = &'a Self>,
1097 I::IntoIter: ExactSizeIterator,
1098 'b: 'a,
1099 {
1100 let items = items.into_iter();
1101 dst.reserve(items.len());
1102 dst.extend(items.map(|b| **b));
1103 Ok(None)
1104 }
1105}
1106
1107#[derive(Debug, Default)]
1109#[repr(transparent)]
1110pub struct ListDecoderU8(CoreVecDecoderBytes);
1111
1112impl tokio_util::codec::Decoder for ListDecoderU8 {
1113 type Item = Vec<u8>;
1114 type Error = <CoreVecDecoderBytes as tokio_util::codec::Decoder>::Error;
1115
1116 #[instrument(level = "trace", skip(self), fields(ty = "list<u8>"))]
1117 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1118 let Some(buf) = self.0.decode(src)? else {
1119 return Ok(None);
1120 };
1121 Ok(Some(buf.into()))
1122 }
1123}
1124
1125impl<R> Decode<R> for u8 {
1126 type Decoder = U8Codec;
1127 type ListDecoder = ListDecoderU8;
1128}
1129
// String-like types are all encoded with `CoreNameEncoder`.
impl<W> Encode<W> for &str {
    type Encoder = CoreNameEncoder;
}

impl<W> Encode<W> for &&str {
    type Encoder = CoreNameEncoder;
}

impl<W> Encode<W> for String {
    type Encoder = CoreNameEncoder;
}

impl<W> Encode<W> for &String {
    type Encoder = CoreNameEncoder;
}

/// `String` is decoded with `CoreNameDecoder`; lists of strings use `CoreVecDecoder` over it.
impl<R> Decode<R> for String {
    type Decoder = CoreNameDecoder;
    type ListDecoder = CoreVecDecoder<Self::Decoder>;
}

// `Bytes` values are encoded with `CoreVecEncoderBytes`.
impl<W> Encode<W> for Bytes {
    type Encoder = CoreVecEncoderBytes;
}

impl<W> Encode<W> for &Bytes {
    type Encoder = CoreVecEncoderBytes;
}

/// `Bytes` is decoded with `CoreVecDecoderBytes`; lists of `Bytes` use `CoreVecDecoder`
/// over it.
impl<R> Decode<R> for Bytes {
    type Decoder = CoreVecDecoderBytes;
    type ListDecoder = CoreVecDecoder<Self::Decoder>;
}
1163
1164#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
1166#[repr(transparent)]
1167pub struct ResourceEncoder;
1168
1169impl<T: ?Sized> tokio_util::codec::Encoder<ResourceOwn<T>> for ResourceEncoder {
1170 type Error = std::io::Error;
1171
1172 #[instrument(level = "trace", skip(self, item), ret, fields(ty = "own"))]
1173 fn encode(&mut self, item: ResourceOwn<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1174 CoreVecEncoderBytes.encode(item.repr, dst)
1175 }
1176}
1177
1178impl<T: ?Sized> tokio_util::codec::Encoder<&ResourceOwn<T>> for ResourceEncoder {
1179 type Error = std::io::Error;
1180
1181 #[instrument(level = "trace", skip(self, item), ret, fields(ty = "own"))]
1182 fn encode(&mut self, item: &ResourceOwn<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1183 CoreVecEncoderBytes.encode(&item.repr, dst)
1184 }
1185}
1186
/// Owned resource handles passed by value are encoded with `ResourceEncoder`.
impl<T: ?Sized, W> Encode<W> for ResourceOwn<T> {
    type Encoder = ResourceEncoder;
}

/// Owned resource handles passed by reference are encoded with `ResourceEncoder`.
impl<T: ?Sized, W> Encode<W> for &ResourceOwn<T> {
    type Encoder = ResourceEncoder;
}
1194
1195impl<T: ?Sized> tokio_util::codec::Encoder<ResourceBorrow<T>> for ResourceEncoder {
1196 type Error = std::io::Error;
1197
1198 #[instrument(level = "trace", skip(self, item), ret, fields(ty = "borrow"))]
1199 fn encode(&mut self, item: ResourceBorrow<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1200 CoreVecEncoderBytes.encode(item.repr, dst)
1201 }
1202}
1203
1204impl<T: ?Sized> tokio_util::codec::Encoder<&ResourceBorrow<T>> for ResourceEncoder {
1205 type Error = std::io::Error;
1206
1207 #[instrument(level = "trace", skip(self, item), ret, fields(ty = "borrow"))]
1208 fn encode(&mut self, item: &ResourceBorrow<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1209 CoreVecEncoderBytes.encode(&item.repr, dst)
1210 }
1211}
1212
/// Borrowed resource handles passed by value are encoded with `ResourceEncoder`.
impl<T: ?Sized, W> Encode<W> for ResourceBorrow<T> {
    type Encoder = ResourceEncoder;
}

/// Borrowed resource handles passed by reference are encoded with `ResourceEncoder`.
impl<T: ?Sized, W> Encode<W> for &ResourceBorrow<T> {
    type Encoder = ResourceEncoder;
}
1220
/// Decoder producing `ResourceBorrow<T>` handles.
///
/// Wraps a `CoreVecDecoderBytes`; `T` is carried only at the type level.
#[derive(Debug)]
#[repr(transparent)]
pub struct ResourceBorrowDecoder<T: ?Sized> {
    // Decoder for the handle's raw byte representation.
    dec: CoreVecDecoderBytes,
    // Ties the decoder to the resource type without storing a `T`.
    _ty: PhantomData<T>,
}
1228
1229impl<T: ?Sized> Default for ResourceBorrowDecoder<T> {
1230 fn default() -> Self {
1231 Self {
1232 dec: CoreVecDecoderBytes::default(),
1233 _ty: PhantomData,
1234 }
1235 }
1236}
1237
/// A borrow decoder never reports deferred work: `take_deferred` always returns `None`.
impl<R, T: ?Sized> Deferred<Incoming<R>> for ResourceBorrowDecoder<T> {
    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
        None
    }
}

/// A list of borrow decoders likewise never reports deferred work.
impl<R, T: ?Sized> Deferred<Incoming<R>> for CoreVecDecoder<ResourceBorrowDecoder<T>> {
    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
        None
    }
}
1249
/// Borrowed resource handles are decoded with `ResourceBorrowDecoder`; lists of them
/// wrap that decoder in `CoreVecDecoder`.
impl<R, T: ?Sized + Send + 'static> Decode<R> for ResourceBorrow<T> {
    type Decoder = ResourceBorrowDecoder<T>;
    type ListDecoder = CoreVecDecoder<Self::Decoder>;
}
1254
1255impl<T: ?Sized> tokio_util::codec::Decoder for ResourceBorrowDecoder<T> {
1256 type Item = ResourceBorrow<T>;
1257 type Error = std::io::Error;
1258
1259 #[instrument(level = "trace", skip(self), fields(ty = "borrow"))]
1260 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1261 let repr = self.dec.decode(src)?;
1262 Ok(repr.map(Self::Item::from))
1263 }
1264}
1265
/// Decoder producing `ResourceOwn<T>` handles.
///
/// Wraps a `CoreVecDecoderBytes`; `T` is carried only at the type level.
#[derive(Debug)]
#[repr(transparent)]
pub struct ResourceOwnDecoder<T: ?Sized> {
    // Decoder for the handle's raw byte representation.
    dec: CoreVecDecoderBytes,
    // Ties the decoder to the resource type without storing a `T`.
    _ty: PhantomData<T>,
}
1273
1274impl<T: ?Sized> Default for ResourceOwnDecoder<T> {
1275 fn default() -> Self {
1276 Self {
1277 dec: CoreVecDecoderBytes::default(),
1278 _ty: PhantomData,
1279 }
1280 }
1281}
1282
/// An own decoder never reports deferred work: `take_deferred` always returns `None`.
impl<R, T: ?Sized> Deferred<Incoming<R>> for ResourceOwnDecoder<T> {
    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
        None
    }
}

/// A list of own decoders likewise never reports deferred work.
impl<R, T: ?Sized> Deferred<Incoming<R>> for CoreVecDecoder<ResourceOwnDecoder<T>> {
    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
        None
    }
}
1294
/// Owned resource handles are decoded with `ResourceOwnDecoder`; lists of them
/// wrap that decoder in `CoreVecDecoder`.
impl<R, T: ?Sized + Send + 'static> Decode<R> for ResourceOwn<T> {
    type Decoder = ResourceOwnDecoder<T>;
    type ListDecoder = CoreVecDecoder<Self::Decoder>;
}
1299
1300impl<T: ?Sized> tokio_util::codec::Decoder for ResourceOwnDecoder<T> {
1301 type Item = ResourceOwn<T>;
1302 type Error = std::io::Error;
1303
1304 #[instrument(level = "trace", skip(self), fields(ty = "own"))]
1305 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1306 let repr = self.dec.decode(src)?;
1307 Ok(repr.map(Self::Item::from))
1308 }
1309}
1310
/// Codec for the unit type `()`.
///
/// Encoding leaves the destination buffer untouched and decoding yields `()`
/// without consuming anything from the source buffer.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
#[repr(transparent)]
pub struct UnitCodec;
1315
/// Encoding `()` by value writes no bytes.
impl tokio_util::codec::Encoder<()> for UnitCodec {
    type Error = std::io::Error;

    /// Always succeeds and leaves `dst` unchanged.
    #[instrument(level = "trace", skip(self), ret)]
    fn encode(&mut self, (): (), dst: &mut BytesMut) -> std::io::Result<()> {
        Ok(())
    }
}

/// Encoding `&()` writes no bytes.
impl tokio_util::codec::Encoder<&()> for UnitCodec {
    type Error = std::io::Error;

    /// Always succeeds and leaves `dst` unchanged.
    #[instrument(level = "trace", skip(self), ret)]
    fn encode(&mut self, (): &(), dst: &mut BytesMut) -> std::io::Result<()> {
        Ok(())
    }
}
1333
/// Decoding `()` needs no input bytes.
impl tokio_util::codec::Decoder for UnitCodec {
    type Item = ();
    type Error = std::io::Error;

    /// Immediately yields `()` without reading from or advancing `src`.
    #[instrument(level = "trace", skip(self))]
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        Ok(Some(()))
    }
}
1343
/// Marker trait for `Encode` types that are tuples.
///
/// In this file it is implemented for `()` and for the tuple arities generated by
/// `impl_tuple_codec!`.
pub trait TupleEncode<W>: Encode<W> {}

/// Marker trait for `Decode` types that are tuples.
///
/// In this file it is implemented for `()` and for the tuple arities generated by
/// `impl_tuple_codec!`.
pub trait TupleDecode<R>: Decode<R> {}
1349
/// The unit type is encoded with `UnitCodec`.
impl<W> Encode<W> for () {
    type Encoder = UnitCodec;
}

/// The unit type counts as a tuple for encoding purposes.
impl<W> TupleEncode<W> for () {}

/// The unit type is decoded with `UnitCodec`; lists of units wrap it in `CoreVecDecoder`.
impl<R> Decode<R> for () {
    type Decoder = UnitCodec;
    type ListDecoder = CoreVecDecoder<Self::Decoder>;
}

/// The unit type counts as a tuple for decoding purposes.
impl<R> TupleDecode<R> for () {}
1362
// Generates the codec plumbing for one tuple arity.
//
// Arguments are four `;`-separated identifier lists of equal length:
// - `$vn`: value names (accepted by the matcher, not referenced in the expansion),
// - `$vt`: element type parameters,
// - `$cn`: local names bound to the per-element codecs,
// - `$ct`: per-element codec type parameters.
//
// The expansion provides `Deferred`, `Encode`, `TupleEncode`, `Decode` and `TupleDecode`
// impls for the tuple type and for the matching `TupleEncoder` / `TupleDecoder`.
macro_rules! impl_tuple_codec {
    ($($vn:ident),+; $($vt:ident),+; $($cn:ident),+; $($ct:ident),+) => {
        // Deferred work of a tuple encoder is the combination of the deferred work of
        // each element encoder.
        impl<W, $($ct),+> Deferred<W> for TupleEncoder::<($($ct),+,)>
        where
            W: crate::Index<W> + Send + Sync + 'static,
            $($ct: Deferred<W> + Default + 'static),+
        {
            fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
                // Move the element encoders out, leaving a default encoder in `self`.
                let Self(($(mut $cn),+,)) = mem::take(self);
                let deferred = [ $($cn.take_deferred()),+ ];
                // Only hand back a closure when at least one element has pending work.
                if deferred.iter().any(Option::is_some) {
                    Some(Box::new(|r, path| Box::pin(handle_deferred(r, deferred, path))))
                } else {
                    None
                }
            }
        }

        // Tuples passed by value: every element encoder must share the error type `E`.
        impl<W, E, $($vt),+> Encode<W> for ($($vt),+,)
        where
            W: crate::Index<W> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Encode<W>,
                $vt::Encoder: tokio_util::codec::Encoder<$vt, Error = E> + 'static,
            )+
        {
            type Encoder = TupleEncoder::<($($vt::Encoder),+,)>;
        }

        impl<W, E, $($vt),+> TupleEncode<W> for ($($vt),+,)
        where
            W: crate::Index<W> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Encode<W>,
                $vt::Encoder: tokio_util::codec::Encoder<$vt, Error = E> + 'static,
            )+
        {
        }

        // Tuples passed by reference: element encoders must accept `&'a` elements.
        impl<'a, W, E, $($vt),+> Encode<W> for &'a ($($vt),+,)
        where
            W: crate::Index<W> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Encode<W>,
                $vt::Encoder: tokio_util::codec::Encoder<&'a $vt, Error = E> + 'static,
            )+
        {
            type Encoder = TupleEncoder::<($($vt::Encoder),+,)>;
        }

        // Deferred work of a tuple decoder is the combination of the deferred work of
        // each element decoder.
        impl<R, $($vt),+> Deferred<Incoming<R>> for TupleDecoder::<($($vt::Decoder),+,), ($(Option<$vt>),+,)>
        where
            R: crate::Index<R> + Send + Sync + 'static,
            $($vt: Decode<R>),+
        {
            fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
                // Move the element decoders out, leaving a default decoder in `self`.
                let ($(mut $cn),+,) = mem::take(self).into_inner();
                let deferred = [ $($cn.take_deferred()),+ ];
                if deferred.iter().any(Option::is_some) {
                    Some(Box::new(|r, path| Box::pin(handle_deferred(r, deferred, path))))
                } else {
                    None
                }
            }
        }

        // Tuple decoding: every element decoder must share the error type `E`.
        impl<R, E, $($vt),+> Decode<R> for ($($vt),+,)
        where
            R: crate::Index<R> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Decode<R> + Send + 'static,
                $vt::Decoder: tokio_util::codec::Decoder<Error = E> + Send + 'static,
            )+
        {
            type Decoder = TupleDecoder::<($($vt::Decoder),+,), ($(Option<$vt>),+,)>;
            type ListDecoder = ListDecoder<Self::Decoder, R>;
        }

        impl<R, E, $($vt),+> TupleDecode<R> for ($($vt),+,)
        where
            R: crate::Index<R> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Decode<R> + Send + 'static,
                $vt::Decoder: tokio_util::codec::Decoder<Error = E> + Send + 'static,
            )+
        {
        }
    };
}
1457
// Instantiate the tuple codec impls for tuples of 1 through 16 elements.
impl_tuple_codec!(
    v0;
    V0;
    c0;
    C0
);

impl_tuple_codec!(
    v0, v1;
    V0, V1;
    c0, c1;
    C0, C1
);

impl_tuple_codec!(
    v0, v1, v2;
    V0, V1, V2;
    c0, c1, c2;
    C0, C1, C2
);

impl_tuple_codec!(
    v0, v1, v2, v3;
    V0, V1, V2, V3;
    c0, c1, c2, c3;
    C0, C1, C2, C3
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4;
    V0, V1, V2, V3, V4;
    c0, c1, c2, c3, c4;
    C0, C1, C2, C3, C4
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5;
    V0, V1, V2, V3, V4, V5;
    c0, c1, c2, c3, c4, c5;
    C0, C1, C2, C3, C4, C5
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6;
    V0, V1, V2, V3, V4, V5, V6;
    c0, c1, c2, c3, c4, c5, c6;
    C0, C1, C2, C3, C4, C5, C6
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7;
    V0, V1, V2, V3, V4, V5, V6, V7;
    c0, c1, c2, c3, c4, c5, c6, c7;
    C0, C1, C2, C3, C4, C5, C6, C7
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8;
    V0, V1, V2, V3, V4, V5, V6, V7, V8;
    c0, c1, c2, c3, c4, c5, c6, c7, c8;
    C0, C1, C2, C3, C4, C5, C6, C7, C8
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14, V15;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15
);
1569
/// Encoder for future values.
///
/// Encoding stores a deferred task that awaits the future and writes its output;
/// the task is retrieved through `Deferred::take_deferred`.
pub struct FutureEncoder<W> {
    // Task set by the most recent `encode` call, if it has not been taken yet.
    deferred: Option<DeferredFn<W>>,
}

/// A fresh encoder has no deferred work.
impl<W> Default for FutureEncoder<W> {
    fn default() -> Self {
        Self { deferred: None }
    }
}

/// Hands out the deferred task at most once per `encode` call.
impl<W> Deferred<W> for FutureEncoder<W> {
    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
        self.deferred.take()
    }
}
1586
1587impl<T, W, Fut> tokio_util::codec::Encoder<Fut> for FutureEncoder<W>
1588where
1589 T: Encode<W>,
1590 W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1591 Fut: Future<Output = T> + Send + 'static,
1592 std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
1593{
1594 type Error = std::io::Error;
1595
1596 #[instrument(level = "trace", skip(self, item), fields(ty = "future"))]
1597 fn encode(&mut self, item: Fut, dst: &mut BytesMut) -> std::io::Result<()> {
1598 dst.reserve(1);
1600 dst.put_u8(0x00);
1601 let span = Span::current();
1602 self.deferred = Some(Box::new(|mut w, path| {
1603 Box::pin(
1604 async move {
1605 if !path.is_empty() {
1606 w = w.index(&path).map_err(std::io::Error::other)?;
1607 };
1608 let item = item.await;
1609 let mut enc = T::Encoder::default();
1610 let mut buf = BytesMut::default();
1611 enc.encode(item, &mut buf)?;
1612 w.write_all(&buf).await?;
1613 if let Some(f) = enc.take_deferred() {
1614 f(w, Vec::default()).await
1615 } else {
1616 Ok(())
1617 }
1618 }
1619 .instrument(span),
1620 )
1621 }));
1622 Ok(())
1623 }
1624}
1625
/// Boxed, pinned futures are encoded with `FutureEncoder`.
impl<T, W> Encode<W> for Pin<Box<dyn Future<Output = T> + Send>>
where
    T: Encode<W> + 'static,
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
    std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
{
    type Encoder = FutureEncoder<W>;
}
1634
/// Decoder for future values.
///
/// The inline data is read with an `OptionDecoder` over `T`'s decoder; when no value
/// is present inline, a deferred task is stored that receives it later.
pub struct FutureDecoder<T, R>
where
    T: Decode<R>,
{
    // Decoder for the inline, optional value.
    dec: OptionDecoder<T::Decoder>,
    // Task set by the most recent `decode` call, if it has not been taken yet.
    deferred: Option<DeferredFn<Incoming<R>>>,
    _ty: PhantomData<T>,
}

/// A fresh decoder has default inner state and no deferred work.
impl<T, R> Default for FutureDecoder<T, R>
where
    T: Decode<R>,
{
    fn default() -> Self {
        Self {
            dec: OptionDecoder::default(),
            deferred: None,
            _ty: PhantomData,
        }
    }
}

/// Hands out the deferred task at most once per `decode` call.
impl<T, R> Deferred<Incoming<R>> for FutureDecoder<T, R>
where
    T: Decode<R>,
{
    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
        self.deferred.take()
    }
}
1666
/// Decodes a future whose value either arrives inline or is received later by a
/// deferred task.
impl<T, R> tokio_util::codec::Decoder for FutureDecoder<T, R>
where
    T: Decode<R> + Send + 'static,
    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
    std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
{
    type Item = Pin<Box<dyn Future<Output = T> + Send>>;
    type Error = <T::Decoder as tokio_util::codec::Decoder>::Error;

    /// Returns `Ok(None)` until the inline optional value has been decoded.
    ///
    /// If a value is present inline, the returned future resolves to it immediately and
    /// any deferred work of the inner decoder is exposed through `take_deferred`.
    /// Otherwise a deferred task is stored that reads one `T` from the incoming stream
    /// and passes it to the returned future over a oneshot channel. If that task is
    /// dropped without sending, the returned future logs an error and never resolves.
    ///
    /// # Errors
    ///
    /// Propagates errors from the inline decoder; errors of the deferred task
    /// (including `UnexpectedEof` when the stream ends before a value arrives) are
    /// reported by that task.
    #[instrument(level = "trace", skip(self), fields(ty = "future"))]
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let Some(item) = self.dec.decode(src)? else {
            return Ok(None);
        };
        if let Some(item) = item {
            // Value was sent inline: resolve right away.
            self.deferred = self.dec.take_deferred();
            return Ok(Some(Box::pin(async { item })));
        }

        // Value is pending: hand the receiving end to the caller and keep the sending
        // end in the deferred task.
        let (tx, rx) = oneshot::channel();
        let dec = mem::take(&mut self.dec).into_inner();
        let span = Span::current();
        self.deferred = Some(Box::new(|mut r, path| {
            Box::pin(
                async move {
                    if !path.is_empty() {
                        r = r.index(&path).map_err(std::io::Error::other)?;
                    };
                    let mut dec = FramedRead::new(r, dec);
                    trace!(?path, "receiving future element");
                    let Some(item) = dec.next().await else {
                        return Err(std::io::ErrorKind::UnexpectedEof.into());
                    };
                    let item = item?;
                    if tx.send(item).is_err() {
                        debug!("future receiver closed, discard data");
                        return Ok(());
                    }
                    if let Some(rx) = dec.decoder_mut().take_deferred() {
                        // Return bytes the framed reader already buffered to the
                        // incoming stream before the nested deferred task reads on.
                        // NOTE(review): `unsplit` appends `buf` after whatever is still
                        // in `r.buffer` — confirm this matches the order in which
                        // `Incoming` serves its buffered bytes.
                        let buf = mem::take(dec.read_buffer_mut());
                        let mut r = dec.into_inner();
                        if !r.buffer.is_empty() {
                            r.buffer.unsplit(buf);
                        } else {
                            r.buffer = buf;
                        }
                        rx(r, Vec::default()).await?;
                    }
                    Ok(())
                }
                .instrument(span),
            )
        }));
        Ok(Some(Box::pin(async {
            let Ok(ret) = rx.await else {
                // The sender was dropped without a value; stay pending forever.
                error!("future I/O dropped");
                return pending().await;
            };
            ret
        })))
    }
}
1730
/// Boxed, pinned futures are decoded with `FutureDecoder`; lists of futures use the
/// generic `ListDecoder`.
impl<T, R> Decode<R> for Pin<Box<dyn Future<Output = T> + Send>>
where
    T: Decode<R> + Send + 'static,
    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
    std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
{
    type Decoder = FutureDecoder<T, R>;
    type ListDecoder = ListDecoder<Self::Decoder, R>;
}
1740
/// Encoder for streams of element chunks.
///
/// Encoding stores a deferred task that transmits the stream; the task is retrieved
/// through `Deferred::take_deferred`.
pub struct StreamEncoder<W> {
    // Task set by the most recent `encode` call, if it has not been taken yet.
    deferred: Option<DeferredFn<W>>,
}

/// A fresh encoder has no deferred work.
impl<W> Default for StreamEncoder<W> {
    fn default() -> Self {
        Self { deferred: None }
    }
}

/// Hands out the deferred task at most once per `encode` call.
impl<W> Deferred<W> for StreamEncoder<W> {
    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
        self.deferred.take()
    }
}
1757
1758impl<T, W, S> tokio_util::codec::Encoder<S> for StreamEncoder<W>
1759where
1760 T: Encode<W> + Send + 'static,
1761 W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1762 S: Stream<Item = Vec<T>> + Send + Unpin + 'static,
1763 std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
1764{
1765 type Error = std::io::Error;
1766
1767 #[instrument(level = "trace", skip(self, items), fields(ty = "stream"))]
1768 fn encode(&mut self, mut items: S, dst: &mut BytesMut) -> std::io::Result<()> {
1769 dst.reserve(1);
1771 dst.put_u8(0x00);
1772 let span = Span::current();
1773 self.deferred = Some(Box::new(|mut w, path| {
1774 Box::pin(async move {
1775 if !path.is_empty() {
1776 w = w.index(&path).map_err(std::io::Error::other)?;
1777 };
1778 let mut enc = T::Encoder::default();
1779 let mut buf = BytesMut::default();
1780 let mut tasks = JoinSet::new();
1781 let mut i = 0_u64;
1782 loop {
1783 select! {
1784 chunk = items.next() => {
1785 let Some(chunk) = chunk else {
1786 trace!("writing stream end");
1787 buf.reserve(1);
1788 buf.put_u8(0x00);
1789 w.write_all(&buf).await?;
1790 while let Some(res) = tasks.join_next().await {
1791 trace!(?res, "receiver task finished");
1792 res??;
1793 }
1794 return Ok(())
1795 };
1796 let n = u32::try_from(chunk.len()).map_err(|err| {
1797 std::io::Error::new(std::io::ErrorKind::InvalidInput, err)
1798 })?;
1799 let end = i.checked_add(n.into()).ok_or_else(|| {
1800 std::io::Error::new(
1801 std::io::ErrorKind::InvalidInput,
1802 "stream element index would overflow u64",
1803 )
1804 })?;
1805 trace!(n, "encoding chunk length");
1806 Leb128Encoder.encode(n, &mut buf)?;
1807 trace!(i, buf = format!("{buf:02x?}"), "writing stream chunk items");
1808
1809 buf.reserve(chunk.len());
1810 for (i, item) in zip(i.., chunk) {
1811 enc.encode(item, &mut buf)?;
1812 if let Some(f) = enc.take_deferred() {
1813 let i = i
1814 .try_into()
1815 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
1816 let w = w.index(&[i]).map_err(std::io::Error::other)?;
1817 trace!("spawning transmit task");
1818 tasks.spawn(f(w, Vec::default()));
1819 }
1820 }
1821 i = end;
1822 }
1823 Some(res) = tasks.join_next() => {
1824 trace!(?res, "receiver task finished");
1825 res??;
1826 }
1827 res = w.write(&buf), if !buf.is_empty() => {
1828 let n = res?;
1829 trace!(?buf, n, "wrote bytes from buffer");
1830 buf.advance(n);
1831 }
1832 }
1833 }
1834 }.instrument(span))
1835 }));
1836 Ok(())
1837 }
1838}
1839
/// Boxed, pinned streams of element chunks are encoded with `StreamEncoder`.
impl<T, W> Encode<W> for Pin<Box<dyn Stream<Item = Vec<T>> + Send>>
where
    T: Encode<W> + Send + 'static,
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
    std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
{
    type Encoder = StreamEncoder<W>;
}
1848
/// Encoder for byte streams expressed as a `Stream` of `Bytes` chunks.
///
/// Encoding stores a deferred task that transmits the stream; the task is retrieved
/// through `Deferred::take_deferred`.
pub struct StreamEncoderBytes<W> {
    // Task set by the most recent `encode` call, if it has not been taken yet.
    deferred: Option<DeferredFn<W>>,
}

/// A fresh encoder has no deferred work.
impl<W> Default for StreamEncoderBytes<W> {
    fn default() -> Self {
        Self { deferred: None }
    }
}

/// Hands out the deferred task at most once per `encode` call.
impl<W> Deferred<W> for StreamEncoderBytes<W> {
    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
        self.deferred.take()
    }
}
1865
1866impl<W, S> tokio_util::codec::Encoder<S> for StreamEncoderBytes<W>
1867where
1868 W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1869 S: Stream<Item = Bytes> + Send + Unpin + 'static,
1870{
1871 type Error = std::io::Error;
1872
1873 #[instrument(level = "trace", skip(self, items), fields(ty = "stream<u8>"))]
1874 fn encode(&mut self, mut items: S, dst: &mut BytesMut) -> std::io::Result<()> {
1875 dst.reserve(1);
1877 dst.put_u8(0x00);
1878 self.deferred = Some(Box::new(|mut w, path| {
1879 Box::pin(async move {
1880 if !path.is_empty() {
1881 w = w.index(&path).map_err(std::io::Error::other)?;
1882 };
1883 let mut buf = BytesMut::default();
1884 loop {
1885 select! {
1886 chunk = items.next() => {
1887 let Some(chunk) = chunk else {
1888 trace!("writing stream end");
1889 buf.reserve(1);
1890 buf.put_u8(0x00);
1891 return w.write_all(&buf).await
1892 };
1893 let n = u32::try_from(chunk.len()).map_err(|err| {
1894 std::io::Error::new(std::io::ErrorKind::InvalidInput, err)
1895 })?;
1896 trace!(n, "encoding chunk length");
1897 Leb128Encoder.encode(n, &mut buf)?;
1898 buf.extend_from_slice(&chunk);
1899 }
1900 res = w.write(&buf), if !buf.is_empty() => {
1901 let n = res?;
1902 buf.advance(n);
1903 }
1904 }
1905 }
1906 })
1907 }));
1908 Ok(())
1909 }
1910}
1911
/// Boxed, pinned streams of `Bytes` are encoded with `StreamEncoderBytes`.
impl<W> Encode<W> for Pin<Box<dyn Stream<Item = Bytes> + Send>>
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderBytes<W>;
}
1918
/// Encoder for byte streams expressed as an `AsyncRead`.
///
/// Encoding stores a deferred task that transmits the reader's contents; the task is
/// retrieved through `Deferred::take_deferred`.
pub struct StreamEncoderRead<W> {
    // Task set by the most recent `encode` call, if it has not been taken yet.
    deferred: Option<DeferredFn<W>>,
}

/// A fresh encoder has no deferred work.
impl<W> Default for StreamEncoderRead<W> {
    fn default() -> Self {
        Self { deferred: None }
    }
}

/// Hands out the deferred task at most once per `encode` call.
impl<W> Deferred<W> for StreamEncoderRead<W> {
    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
        self.deferred.take()
    }
}
1935
1936impl<W, S> tokio_util::codec::Encoder<S> for StreamEncoderRead<W>
1937where
1938 W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1939 S: AsyncRead + Send + Unpin + 'static,
1940{
1941 type Error = std::io::Error;
1942
1943 #[instrument(level = "trace", skip(self, items), fields(ty = "stream<u8>"))]
1944 fn encode(&mut self, mut items: S, dst: &mut BytesMut) -> std::io::Result<()> {
1945 dst.reserve(1);
1947 dst.put_u8(0x00);
1948 self.deferred = Some(Box::new(|mut w, path| {
1949 Box::pin(async move {
1950 if !path.is_empty() {
1951 w = w.index(&path).map_err(std::io::Error::other)?;
1952 };
1953 let mut buf = BytesMut::default();
1954 let mut chunk = BytesMut::default();
1955 loop {
1956 select! {
1957 res = items.read_buf(&mut chunk) => {
1958 let n = res?;
1959 if n == 0 {
1960 trace!("writing stream end");
1961 buf.reserve(1);
1962 buf.put_u8(0x00);
1963 return w.write_all(&buf).await
1964 }
1965 let n = u32::try_from(n).map_err(|err| {
1966 std::io::Error::new(std::io::ErrorKind::InvalidInput, err)
1967 })?;
1968 trace!(n, "encoding chunk length");
1969 Leb128Encoder.encode(n, &mut buf)?;
1970 buf.extend_from_slice(&chunk);
1971 chunk.clear();
1972 }
1973 res = w.write(&buf), if !buf.is_empty() => {
1974 let n = res?;
1975 buf.advance(n);
1976 }
1977 }
1978 }
1979 })
1980 }));
1981 Ok(())
1982 }
1983}
1984
/// Boxed, pinned readers are encoded as byte streams with `StreamEncoderRead`.
impl<W> Encode<W> for Pin<Box<dyn AsyncRead + Send>>
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}

/// In-memory cursors are encoded as byte streams with `StreamEncoderRead`.
impl<T, W> Encode<W> for std::io::Cursor<T>
where
    T: AsRef<[u8]> + Send + Unpin + 'static,
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}

/// `tokio::io::Empty` is encoded as a byte stream with `StreamEncoderRead`.
impl<W> Encode<W> for tokio::io::Empty
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}
2006
/// Standard input is encoded as a byte stream (requires the `io-std` feature).
#[cfg(feature = "io-std")]
impl<W> Encode<W> for tokio::io::Stdin
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}

/// Files are encoded as byte streams (requires the `fs` feature).
#[cfg(feature = "fs")]
impl<W> Encode<W> for tokio::fs::File
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}

/// TCP streams are encoded as byte streams (requires the `net` feature).
#[cfg(feature = "net")]
impl<W> Encode<W> for tokio::net::TcpStream
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}

/// Unix domain socket streams are encoded as byte streams (Unix only, requires the
/// `net` feature).
#[cfg(all(unix, feature = "net"))]
impl<W> Encode<W> for tokio::net::UnixStream
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}

/// Unix pipe receivers are encoded as byte streams (Unix only, requires the `net`
/// feature).
#[cfg(all(unix, feature = "net"))]
impl<W> Encode<W> for tokio::net::unix::pipe::Receiver
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}
2046
/// Decoder for streams of element chunks.
///
/// The inline data is read with `T`'s list decoder; when it yields an empty list, a
/// deferred task is stored that receives the chunks later.
pub struct StreamDecoder<T, R>
where
    T: Decode<R>,
{
    // Decoder for the inline chunk.
    dec: T::ListDecoder,
    // Task set by the most recent `decode` call, if it has not been taken yet.
    deferred: Option<DeferredFn<Incoming<R>>>,
    _ty: PhantomData<T>,
}

/// A fresh decoder has default inner state and no deferred work.
impl<T, R> Default for StreamDecoder<T, R>
where
    T: Decode<R>,
{
    fn default() -> Self {
        Self {
            dec: T::ListDecoder::default(),
            deferred: None,
            _ty: PhantomData,
        }
    }
}

/// Hands out the deferred task at most once per `decode` call.
impl<T, R> Deferred<Incoming<R>> for StreamDecoder<T, R>
where
    T: Decode<R>,
{
    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
        self.deferred.take()
    }
}
2078
/// Receives the chunks of a pending stream from `r` and forwards them to `tx`.
///
/// `r` is first indexed by `path` when `path` is non-empty. Chunks are then decoded
/// with a `ListDecoder` built around `dec` and sent to `tx` in arrival order. For every
/// element whose decoder left deferred work, a task is spawned on a reader indexed by
/// that element's position in the whole stream.
///
/// The function returns `Ok(())` when an empty chunk (the end-of-stream marker) has
/// been received and all spawned tasks have finished, when the receiving side of `tx`
/// has been dropped, or when the reader is exhausted and no tasks remain.
///
/// # Errors
///
/// Fails on indexing or decoding errors, when the running element index would
/// overflow `usize`, and when a spawned task fails or panics.
#[instrument(level = "trace", skip(dec, r, tx), ret)]
async fn handle_deferred_stream<C, T, R>(
    dec: C,
    mut r: Incoming<R>,
    path: Vec<usize>,
    tx: mpsc::Sender<Vec<T>>,
) -> std::io::Result<()>
where
    C: tokio_util::codec::Decoder<Item = T> + Deferred<Incoming<R>>,
    R: AsyncRead + crate::Index<R> + Send + Unpin + 'static,
    std::io::Error: From<C::Error>,
{
    let dec = ListDecoder::new(dec);
    if !path.is_empty() {
        r = r.index(&path).map_err(std::io::Error::other)?;
    };
    let mut framed = FramedRead::new(r, dec);
    let mut tasks = JoinSet::new();
    // Index of the next element, counted across all chunks.
    let mut i = 0_usize;
    loop {
        trace!("receiving pending stream chunk");
        select! {
            Some(chunk) = framed.next() => {
                let chunk = chunk?;
                if chunk.is_empty() {
                    trace!("received stream end");
                    // Let outstanding per-element tasks finish before reporting success.
                    while let Some(res) = tasks.join_next().await {
                        res??;
                    }
                    return Ok(())
                }
                let end = i.checked_add(chunk.len()).ok_or_else(|| {
                    std::io::Error::new(
                        std::io::ErrorKind::InvalidInput,
                        "stream element index would overflow usize",
                    )
                })?;
                trace!(i, end, "received stream chunk");
                if tx.send(chunk).await.is_err() {
                    debug!("stream receiver closed, discard data");
                    return Ok(())
                }
                // Pair each element's deferred work with its absolute stream index.
                for (i, deferred) in zip(i.., mem::take(&mut framed.decoder_mut().deferred)) {
                    if let Some(deferred) = deferred {
                        let r = framed.get_ref().index(&[i]).map_err(std::io::Error::other)?;
                        trace!("spawning receive task");
                        tasks.spawn(deferred(r, Vec::default()));
                    }
                }
                i = end;
            },
            Some(res) = tasks.join_next() => {
                trace!(?res, "receiver task finished");
                res??;
            }
            // Neither a chunk nor a finished task is available any more.
            else => {
                return Ok(());
            }
        }
    }
}
2140
2141impl<T, R> tokio_util::codec::Decoder for StreamDecoder<T, R>
2142where
2143 T: Decode<R> + Send + 'static,
2144 T::ListDecoder: Deferred<Incoming<R>>,
2145 R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2146 <T::Decoder as tokio_util::codec::Decoder>::Error: Send,
2147 std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
2148{
2149 type Item = Pin<Box<dyn Stream<Item = Vec<T>> + Send>>;
2150 type Error = <<T as Decode<R>>::ListDecoder as tokio_util::codec::Decoder>::Error;
2151
2152 #[instrument(level = "trace", skip(self), fields(ty = "stream"))]
2153 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
2154 let Some(chunk) = self.dec.decode(src)? else {
2155 return Ok(None);
2156 };
2157 if !chunk.is_empty() {
2158 self.deferred = self.dec.take_deferred();
2159 return Ok(Some(Box::pin(stream::iter([chunk]))));
2160 }
2161
2162 let (tx, rx) = mpsc::channel(128);
2164 self.deferred = Some(Box::new(|r, path| {
2165 Box::pin(
2166 async move { handle_deferred_stream(T::Decoder::default(), r, path, tx).await },
2167 )
2168 }));
2169 Ok(Some(Box::pin(ReceiverStream::new(rx))))
2170 }
2171}
2172
/// Boxed, pinned streams of element chunks are decoded with `StreamDecoder`; lists of
/// such streams use the generic `ListDecoder`.
impl<T, R> Decode<R> for Pin<Box<dyn Stream<Item = Vec<T>> + Send>>
where
    T: Decode<R> + Send + 'static,
    T::ListDecoder: Deferred<Incoming<R>> + Send,
    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
    <T::Decoder as tokio_util::codec::Decoder>::Error: Send,
    std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
{
    type Decoder = StreamDecoder<T, R>;
    type ListDecoder = ListDecoder<Self::Decoder, R>;
}
2184
/// Decoder for byte streams, yielding a `Stream` of `Bytes` chunks.
///
/// The inline data is read with `CoreVecDecoderBytes`; when it yields an empty buffer,
/// a deferred task is stored that receives the chunks later.
pub struct StreamDecoderBytes<R> {
    // Decoder for the inline chunk; moved into the deferred task for pending streams.
    dec: CoreVecDecoderBytes,
    // Task set by the most recent `decode` call, if it has not been taken yet.
    deferred: Option<DeferredFn<Incoming<R>>>,
}

/// A fresh decoder has default inner state and no deferred work.
impl<R> Default for StreamDecoderBytes<R> {
    fn default() -> Self {
        Self {
            dec: CoreVecDecoderBytes::default(),
            deferred: None,
        }
    }
}

/// Hands out the deferred task at most once per `decode` call.
impl<R> Deferred<Incoming<R>> for StreamDecoderBytes<R> {
    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
        self.deferred.take()
    }
}
2205
/// Decodes a byte stream whose data either arrives inline or is received later by a
/// deferred task.
impl<R> tokio_util::codec::Decoder for StreamDecoderBytes<R>
where
    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
{
    type Item = Pin<Box<dyn Stream<Item = Bytes> + Send>>;
    type Error = std::io::Error;

    /// Returns `Ok(None)` until the inline byte buffer has been decoded.
    ///
    /// A non-empty inline buffer becomes a one-chunk stream. An empty inline buffer
    /// means the stream is pending: a channel-backed stream is returned and a deferred
    /// task is stored that forwards received chunks until it sees an empty chunk, the
    /// reader ends, or the receiver is dropped.
    ///
    /// # Errors
    ///
    /// Propagates errors from the inline decoder; indexing and decoding errors of the
    /// deferred task are reported by that task.
    #[instrument(level = "trace", skip(self), fields(ty = "stream<u8>"))]
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let Some(chunk) = self.dec.decode(src)? else {
            return Ok(None);
        };
        if !chunk.is_empty() {
            // Ready stream: the single inline chunk is the whole stream.
            return Ok(Some(Box::pin(stream::iter([chunk]))));
        }

        // Pending stream: chunks are delivered through a bounded channel.
        let (tx, rx) = mpsc::channel(128);
        let dec = mem::take(&mut self.dec);
        let span = Span::current();
        self.deferred = Some(Box::new(|mut r, path| {
            Box::pin(
                async move {
                    if !path.is_empty() {
                        r = r.index(&path).map_err(std::io::Error::other)?;
                    };
                    let mut framed = FramedRead::new(r, dec);
                    trace!(?path, "receiving pending byte stream chunk");
                    while let Some(chunk) = framed.next().await {
                        let chunk = chunk?;
                        // An empty chunk marks the end of the stream.
                        if chunk.is_empty() {
                            trace!("received stream end");
                            return Ok(());
                        }
                        trace!(?chunk, "received pending byte stream chunk");
                        if tx.send(chunk).await.is_err() {
                            debug!("stream receiver closed, discard data");
                            return Ok(());
                        }
                    }
                    Ok(())
                }
                .instrument(span),
            )
        }));
        Ok(Some(Box::pin(ReceiverStream::new(rx))))
    }
}
2254
/// Boxed, pinned streams of `Bytes` are decoded with `StreamDecoderBytes`; lists of
/// such streams use the generic `ListDecoder`.
impl<R> Decode<R> for Pin<Box<dyn Stream<Item = Bytes> + Send>>
where
    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
{
    type Decoder = StreamDecoderBytes<R>;
    type ListDecoder = ListDecoder<Self::Decoder, R>;
}
2262
/// Decoder for byte streams, yielding an `AsyncRead`.
///
/// The inline data is read with `CoreVecDecoderBytes`; when it yields an empty buffer,
/// a deferred task is stored that receives the chunks later.
pub struct StreamDecoderRead<R> {
    // Decoder for the inline chunk; moved into the deferred task for pending streams.
    dec: CoreVecDecoderBytes,
    // Task set by the most recent `decode` call, if it has not been taken yet.
    deferred: Option<DeferredFn<Incoming<R>>>,
}

/// A fresh decoder has default inner state and no deferred work.
impl<R> Default for StreamDecoderRead<R> {
    fn default() -> Self {
        Self {
            dec: CoreVecDecoderBytes::default(),
            deferred: None,
        }
    }
}

/// Hands out the deferred task at most once per `decode` call.
impl<R> Deferred<Incoming<R>> for StreamDecoderRead<R> {
    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
        self.deferred.take()
    }
}
2283
2284impl<R> tokio_util::codec::Decoder for StreamDecoderRead<R>
2285where
2286 R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2287{
2288 type Item = Pin<Box<dyn AsyncRead + Send>>;
2289 type Error = std::io::Error;
2290
2291 #[instrument(level = "trace", skip(self), fields(ty = "stream<u8>"))]
2292 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
2293 let Some(chunk) = self.dec.decode(src)? else {
2294 return Ok(None);
2295 };
2296 if !chunk.is_empty() {
2297 return Ok(Some(Box::pin(std::io::Cursor::new(chunk))));
2298 }
2299
2300 let (tx, rx) = mpsc::channel(128);
2302 let dec = mem::take(&mut self.dec);
2303 self.deferred = Some(Box::new(|mut r, path| {
2304 Box::pin(async move {
2305 if !path.is_empty() {
2306 r = r.index(&path).map_err(std::io::Error::other)?;
2307 };
2308 let mut framed = FramedRead::new(r, dec);
2309 trace!("receiving pending byte stream chunk");
2310 while let Some(chunk) = framed.next().await {
2311 let chunk = chunk?;
2312 if chunk.is_empty() {
2313 trace!("received stream end");
2314 return Ok(());
2315 }
2316 trace!(?chunk, "received byte stream chunk");
2317 if tx.send(std::io::Result::Ok(chunk)).await.is_err() {
2318 debug!("stream receiver closed, discard data");
2319 return Ok(());
2320 }
2321 }
2322 Ok(())
2323 })
2324 }));
2325 Ok(Some(Box::pin(StreamReader::new(ReceiverStream::new(rx)))))
2326 }
2327}
2328
2329impl<R> Decode<R> for Pin<Box<dyn AsyncRead + Send>>
2330where
2331 R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2332{
2333 type Decoder = StreamDecoderRead<R>;
2334 type ListDecoder = ListDecoder<Self::Decoder, R>;
2335}
2336
#[cfg(test)]
mod tests {
    use anyhow::bail;

    use super::*;

    /// Placeholder transport type used only to select encoder types; indexing it is a test
    /// failure.
    struct NoopStream;

    impl crate::Index<Self> for NoopStream {
        fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
            panic!("index should not be called with path {path:?}")
        }
    }

    /// Encoding a `(u8, u32)` tuple writes both fields inline and schedules no deferred write.
    #[test_log::test(tokio::test)]
    async fn codec() -> anyhow::Result<()> {
        let mut enc = <(u8, u32) as Encode<NoopStream>>::Encoder::default();
        let mut buf = BytesMut::new();
        enc.encode((0x42, 0x42), &mut buf)?;
        if Deferred::<NoopStream>::take_deferred(&mut enc).is_some() {
            bail!("no deferred write should have been returned");
        }
        assert_eq!(buf.as_ref(), b"\x42\x42");
        Ok(())
    }

    /// `f32` NaNs are encoded as the canonical NaN; ordinary values keep their exact bits.
    #[test]
    fn canonical_nan_f32() {
        let mut enc = <f32 as Encode<NoopStream>>::Encoder::default();
        // Encodes into a fresh buffer each time while sharing a single encoder instance.
        let mut encoded = |value: f32| {
            let mut buf = BytesMut::new();
            enc.encode(value, &mut buf).unwrap();
            buf
        };

        // NaN with the quiet bit clear and a non-zero payload.
        assert_eq!(
            encoded(f32::from_bits(0x7f80_0001)).as_ref(),
            CANONICAL_NAN_F32.to_le_bytes()
        );

        // Quiet NaN with the sign bit set.
        assert_eq!(
            encoded(f32::from_bits(0xffc0_0000)).as_ref(),
            CANONICAL_NAN_F32.to_le_bytes()
        );

        // A non-NaN value must be written unchanged.
        assert_eq!(
            encoded(1.5_f32).as_ref(),
            1.5_f32.to_bits().to_le_bytes()
        );
    }

    /// `f64` NaNs are encoded as the canonical NaN; ordinary values keep their exact bits.
    #[test]
    fn canonical_nan_f64() {
        let mut enc = <f64 as Encode<NoopStream>>::Encoder::default();
        // Encodes into a fresh buffer each time while sharing a single encoder instance.
        let mut encoded = |value: f64| {
            let mut buf = BytesMut::new();
            enc.encode(value, &mut buf).unwrap();
            buf
        };

        // NaN with the quiet bit clear and a non-zero payload.
        assert_eq!(
            encoded(f64::from_bits(0x7ff0_0000_0000_0001)).as_ref(),
            CANONICAL_NAN_F64.to_le_bytes()
        );

        // Quiet NaN with the sign bit set.
        assert_eq!(
            encoded(f64::from_bits(0xfff8_0000_0000_0000)).as_ref(),
            CANONICAL_NAN_F64.to_le_bytes()
        );

        // A non-NaN value must be written unchanged.
        assert_eq!(
            encoded(1.5_f64).as_ref(),
            1.5_f64.to_bits().to_le_bytes()
        );
    }
}