1use core::any::TypeId;
2use core::fmt::{self, Debug};
3use core::future::{pending, Future};
4use core::hash::{Hash, Hasher};
5use core::iter::zip;
6use core::marker::PhantomData;
7use core::mem;
8use core::ops::{Deref, DerefMut};
9use core::pin::Pin;
10
11use bytes::{Buf as _, BufMut as _, Bytes, BytesMut};
12use futures::stream::{self, FuturesUnordered};
13use futures::{Stream, StreamExt as _, TryStreamExt as _};
14use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
15use tokio::select;
16use tokio::sync::{mpsc, oneshot};
17use tokio::task::JoinSet;
18use tokio_stream::wrappers::ReceiverStream;
19use tokio_util::codec::{Encoder as _, FramedRead};
20use tokio_util::io::StreamReader;
21use tracing::{debug, error, instrument, trace, Instrument as _, Span};
22use wasm_tokio::cm::{
23 BoolCodec, F32Codec, F64Codec, OptionDecoder, OptionEncoder, PrimValEncoder, ResultDecoder,
24 ResultEncoder, S16Codec, S32Codec, S64Codec, S8Codec, TupleDecoder, TupleEncoder, U16Codec,
25 U32Codec, U64Codec, U8Codec,
26};
27use wasm_tokio::{
28 CoreNameDecoder, CoreNameEncoder, CoreVecDecoder, CoreVecDecoderBytes, CoreVecEncoderBytes,
29 Leb128DecoderI128, Leb128DecoderI16, Leb128DecoderI32, Leb128DecoderI64, Leb128DecoderI8,
30 Leb128DecoderU128, Leb128DecoderU16, Leb128DecoderU32, Leb128DecoderU64, Leb128DecoderU8,
31 Leb128Encoder, Utf8Codec,
32};
33
34use crate::{Incoming, Index as _};
35
/// Borrowed handle to a resource of type `T`.
///
/// Only the opaque byte representation of the handle is stored; `T` is a
/// compile-time marker and no value of `T` is ever held.
#[repr(transparent)]
pub struct ResourceBorrow<T: ?Sized> {
    /// Opaque byte representation of the handle.
    repr: Bytes,
    /// Zero-sized marker tying the handle to `T`.
    _ty: PhantomData<T>,
}
42
43impl<T: ?Sized> From<Bytes> for ResourceBorrow<T> {
44 fn from(repr: Bytes) -> Self {
45 Self {
46 repr,
47 _ty: PhantomData,
48 }
49 }
50}
51
52impl<T: ?Sized> From<Vec<u8>> for ResourceBorrow<T> {
53 fn from(repr: Vec<u8>) -> Self {
54 Self {
55 repr: repr.into(),
56 _ty: PhantomData,
57 }
58 }
59}
60
61impl<T: ?Sized> From<ResourceBorrow<T>> for Bytes {
62 fn from(ResourceBorrow { repr, .. }: ResourceBorrow<T>) -> Self {
63 repr
64 }
65}
66
67impl<T: ?Sized> PartialEq for ResourceBorrow<T> {
68 fn eq(&self, other: &Self) -> bool {
69 self.repr == other.repr
70 }
71}
72
73impl<T: ?Sized> Eq for ResourceBorrow<T> {}
74
75impl<T: ?Sized> Hash for ResourceBorrow<T> {
76 fn hash<H: Hasher>(&self, state: &mut H) {
77 self.repr.hash(state);
78 }
79}
80
81impl<T: ?Sized> AsRef<[u8]> for ResourceBorrow<T> {
82 fn as_ref(&self) -> &[u8] {
83 &self.repr
84 }
85}
86
87impl<T: ?Sized> AsRef<Bytes> for ResourceBorrow<T> {
88 fn as_ref(&self) -> &Bytes {
89 &self.repr
90 }
91}
92
93impl<T: ?Sized + 'static> Debug for ResourceBorrow<T> {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 write!(f, "borrow<{:?}>", TypeId::of::<T>())
96 }
97}
98
99impl<T: ?Sized> Clone for ResourceBorrow<T> {
100 fn clone(&self) -> Self {
101 Self {
102 repr: self.repr.clone(),
103 _ty: PhantomData,
104 }
105 }
106}
107
108impl<T: ?Sized> ResourceBorrow<T> {
109 pub fn new(repr: impl Into<Bytes>) -> Self {
111 Self::from(repr.into())
112 }
113}
114
/// Owned handle to a resource of type `T`.
///
/// Only the opaque byte representation of the handle is stored; `T` is a
/// compile-time marker and no value of `T` is ever held.
#[repr(transparent)]
pub struct ResourceOwn<T: ?Sized> {
    /// Opaque byte representation of the handle.
    repr: Bytes,
    /// Zero-sized marker tying the handle to `T`.
    _ty: PhantomData<T>,
}
121
122impl<T: ?Sized> From<ResourceOwn<T>> for ResourceBorrow<T> {
123 fn from(ResourceOwn { repr, _ty }: ResourceOwn<T>) -> Self {
124 Self {
125 repr,
126 _ty: PhantomData,
127 }
128 }
129}
130
131impl<T: ?Sized> From<Bytes> for ResourceOwn<T> {
132 fn from(repr: Bytes) -> Self {
133 Self {
134 repr,
135 _ty: PhantomData,
136 }
137 }
138}
139
140impl<T: ?Sized> From<Vec<u8>> for ResourceOwn<T> {
141 fn from(repr: Vec<u8>) -> Self {
142 Self {
143 repr: repr.into(),
144 _ty: PhantomData,
145 }
146 }
147}
148
149impl<T: ?Sized> From<ResourceOwn<T>> for Bytes {
150 fn from(ResourceOwn { repr, .. }: ResourceOwn<T>) -> Self {
151 repr
152 }
153}
154
155impl<T: ?Sized> PartialEq for ResourceOwn<T> {
156 fn eq(&self, other: &Self) -> bool {
157 self.repr == other.repr
158 }
159}
160
161impl<T: ?Sized> Eq for ResourceOwn<T> {}
162
163impl<T: ?Sized> Hash for ResourceOwn<T> {
164 fn hash<H: Hasher>(&self, state: &mut H) {
165 self.repr.hash(state);
166 }
167}
168
169impl<T: ?Sized> AsRef<[u8]> for ResourceOwn<T> {
170 fn as_ref(&self) -> &[u8] {
171 &self.repr
172 }
173}
174
175impl<T: ?Sized> AsRef<Bytes> for ResourceOwn<T> {
176 fn as_ref(&self) -> &Bytes {
177 &self.repr
178 }
179}
180
181impl<T: ?Sized + 'static> Debug for ResourceOwn<T> {
182 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
183 write!(f, "own<{:?}>", TypeId::of::<T>())
184 }
185}
186
187impl<T: ?Sized> Clone for ResourceOwn<T> {
188 fn clone(&self) -> Self {
189 Self {
190 repr: self.repr.clone(),
191 _ty: PhantomData,
192 }
193 }
194}
195
196impl<T: ?Sized> ResourceOwn<T> {
197 pub fn new(repr: impl Into<Bytes>) -> Self {
199 Self::from(repr.into())
200 }
201
202 pub fn as_borrow(&self) -> ResourceBorrow<T> {
204 ResourceBorrow {
205 repr: self.repr.clone(),
206 _ty: PhantomData,
207 }
208 }
209}
210
/// Boxed one-shot callback representing deferred asynchronous work.
///
/// It is invoked with a value of type `T` and a `Vec<usize>` path, and
/// returns a boxed, pinned, `Send` future resolving to `std::io::Result<()>`.
pub type DeferredFn<T> = Box<
    dyn FnOnce(T, Vec<usize>) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send>> + Send,
>;
215
/// Implemented by codecs that may produce deferred asynchronous work while
/// encoding or decoding a value.
pub trait Deferred<T> {
    /// Returns the pending deferred operation, if there is one.
    fn take_deferred(&mut self) -> Option<DeferredFn<T>>;
}
221
// Implements `Deferred<T>` for `$t`, for every `T`, with a `take_deferred`
// that always returns `None` (the codec never has deferred work).
macro_rules! impl_deferred_sync {
    ($t:ty) => {
        impl<T> Deferred<T> for $t {
            fn take_deferred(&mut self) -> Option<DeferredFn<T>> {
                None
            }
        }
    };
}
231
// Codecs that never have deferred work: each gets a `Deferred` impl whose
// `take_deferred` always returns `None`.
impl_deferred_sync!(BoolCodec);
impl_deferred_sync!(S8Codec);
impl_deferred_sync!(U8Codec);
impl_deferred_sync!(S16Codec);
impl_deferred_sync!(U16Codec);
impl_deferred_sync!(S32Codec);
impl_deferred_sync!(U32Codec);
impl_deferred_sync!(S64Codec);
impl_deferred_sync!(U64Codec);
impl_deferred_sync!(F32Codec);
impl_deferred_sync!(F64Codec);
impl_deferred_sync!(CoreNameDecoder);
impl_deferred_sync!(CoreNameEncoder);
impl_deferred_sync!(CoreVecDecoderBytes);
impl_deferred_sync!(CoreVecEncoderBytes);
impl_deferred_sync!(Utf8Codec);
impl_deferred_sync!(PrimValEncoder);
impl_deferred_sync!(Leb128Encoder);
impl_deferred_sync!(Leb128DecoderI8);
impl_deferred_sync!(Leb128DecoderU8);
impl_deferred_sync!(Leb128DecoderI16);
impl_deferred_sync!(Leb128DecoderU16);
impl_deferred_sync!(Leb128DecoderI32);
impl_deferred_sync!(Leb128DecoderU32);
impl_deferred_sync!(Leb128DecoderI64);
impl_deferred_sync!(Leb128DecoderU64);
impl_deferred_sync!(Leb128DecoderI128);
impl_deferred_sync!(Leb128DecoderU128);
impl_deferred_sync!(ResourceEncoder);
impl_deferred_sync!(UnitCodec);
impl_deferred_sync!(ListDecoderU8);

// The same, for `CoreVecDecoder` wrapped around each of those element decoders.
impl_deferred_sync!(CoreVecDecoder<BoolCodec>);
impl_deferred_sync!(CoreVecDecoder<S8Codec>);
impl_deferred_sync!(CoreVecDecoder<U8Codec>);
impl_deferred_sync!(CoreVecDecoder<S16Codec>);
impl_deferred_sync!(CoreVecDecoder<U16Codec>);
impl_deferred_sync!(CoreVecDecoder<S32Codec>);
impl_deferred_sync!(CoreVecDecoder<U32Codec>);
impl_deferred_sync!(CoreVecDecoder<S64Codec>);
impl_deferred_sync!(CoreVecDecoder<U64Codec>);
impl_deferred_sync!(CoreVecDecoder<F32Codec>);
impl_deferred_sync!(CoreVecDecoder<F64Codec>);
impl_deferred_sync!(CoreVecDecoder<CoreNameDecoder>);
impl_deferred_sync!(CoreVecDecoder<CoreVecDecoderBytes>);
impl_deferred_sync!(CoreVecDecoder<Utf8Codec>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI8>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU8>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI16>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU16>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI32>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU32>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI64>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU64>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI128>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU128>);
impl_deferred_sync!(CoreVecDecoder<UnitCodec>);
289
/// Transparent wrapper around a codec `T`.
///
/// Dereferences to the wrapped codec; the wrapped value is also reachable
/// through the public tuple field.
pub struct SyncCodec<T>(pub T);

impl<T> Deref for SyncCodec<T> {
    type Target = T;

    /// Borrows the wrapped codec.
    fn deref(&self) -> &T {
        let Self(inner) = self;
        inner
    }
}

impl<T> DerefMut for SyncCodec<T> {
    /// Mutably borrows the wrapped codec.
    fn deref_mut(&mut self) -> &mut T {
        let Self(inner) = self;
        inner
    }
}
309
310impl<T, C> Deferred<T> for SyncCodec<C> {
311 fn take_deferred(&mut self) -> Option<DeferredFn<T>> {
312 None
313 }
314}
315
316impl<T: Default> Default for SyncCodec<T> {
317 fn default() -> Self {
318 Self(T::default())
319 }
320}
321
322impl<T, I> tokio_util::codec::Encoder<I> for SyncCodec<T>
323where
324 T: tokio_util::codec::Encoder<I>,
325{
326 type Error = T::Error;
327
328 fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> {
329 self.0.encode(item, dst)
330 }
331}
332
333impl<T> tokio_util::codec::Decoder for SyncCodec<T>
334where
335 T: tokio_util::codec::Decoder,
336{
337 type Item = T::Item;
338 type Error = T::Error;
339
340 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
341 self.0.decode(src)
342 }
343
344 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
345 self.0.decode_eof(buf)
346 }
347
348 fn framed<IO: AsyncRead + AsyncWrite + Sized>(
349 self,
350 io: IO,
351 ) -> tokio_util::codec::Framed<IO, Self>
352 where
353 Self: Sized,
354 {
355 self.0.framed(io).map_codec(Self)
356 }
357}
358
359#[instrument(level = "trace", skip(w, deferred))]
360async fn handle_deferred<T, I>(w: T, deferred: I, mut path: Vec<usize>) -> std::io::Result<()>
361where
362 I: IntoIterator<Item = Option<DeferredFn<T>>>,
363 I::IntoIter: ExactSizeIterator,
364 T: crate::Index<T>,
365{
366 let mut futs = FuturesUnordered::default();
367 for (i, f) in zip(0.., deferred) {
368 if let Some(f) = f {
369 path.push(i);
370 let w = w.index(&path).map_err(std::io::Error::other)?;
371 path.pop();
372 futs.push(f(w, Vec::default()));
373 }
374 }
375 while let Some(()) = futs.try_next().await? {}
376 Ok(())
377}
378
379pub trait Encode<T>: Sized {
381 type Encoder: tokio_util::codec::Encoder<Self> + Deferred<T> + Default + Send;
383
384 #[instrument(level = "trace", skip(self, enc))]
386 fn encode(
387 self,
388 enc: &mut Self::Encoder,
389 dst: &mut BytesMut,
390 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
391 {
392 enc.encode(self, dst)?;
393 Ok(enc.take_deferred())
394 }
395
396 #[instrument(level = "trace", skip(items, enc))]
398 fn encode_iter_own<I>(
399 items: I,
400 enc: &mut Self::Encoder,
401 dst: &mut BytesMut,
402 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
403 where
404 I: IntoIterator<Item = Self>,
405 I::IntoIter: ExactSizeIterator,
406 T: crate::Index<T> + Send + Sync + 'static,
407 {
408 let items = items.into_iter();
409 dst.reserve(items.len());
410 let mut deferred = Vec::with_capacity(items.len());
411 for item in items {
412 enc.encode(item, dst)?;
413 deferred.push(enc.take_deferred());
414 }
415 if deferred.iter().any(Option::is_some) {
416 Ok(Some(Box::new(move |w, path| {
417 Box::pin(handle_deferred(w, deferred, path))
418 })))
419 } else {
420 Ok(None)
421 }
422 }
423
424 #[instrument(level = "trace", skip(items, enc))]
426 fn encode_iter_ref<'a, I>(
427 items: I,
428 enc: &mut Self::Encoder,
429 dst: &mut BytesMut,
430 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
431 where
432 I: IntoIterator<Item = &'a Self>,
433 I::IntoIter: ExactSizeIterator,
434 T: crate::Index<T> + Send + Sync + 'static,
435 Self::Encoder: tokio_util::codec::Encoder<&'a Self>,
436 {
437 let items = items.into_iter();
438 dst.reserve(items.len());
439 let mut deferred = Vec::with_capacity(items.len());
440 for item in items {
441 enc.encode(item, dst)?;
442 deferred.push(enc.take_deferred());
443 }
444 if deferred.iter().any(Option::is_some) {
445 Ok(Some(Box::new(move |w, path| {
446 Box::pin(handle_deferred(w, deferred, path))
447 })))
448 } else {
449 Ok(None)
450 }
451 }
452
453 #[instrument(level = "trace", skip(items, enc), fields(ty = "list"))]
455 fn encode_list_own(
456 items: Vec<Self>,
457 enc: &mut Self::Encoder,
458 dst: &mut BytesMut,
459 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
460 where
461 T: crate::Index<T> + Send + Sync + 'static,
462 {
463 let n = u32::try_from(items.len())
464 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
465 dst.reserve(5 + items.len());
466 Leb128Encoder.encode(n, dst)?;
467 Self::encode_iter_own(items, enc, dst)
468 }
469
470 #[instrument(level = "trace", skip(items, enc), fields(ty = "list"))]
472 fn encode_list_ref<'a>(
473 items: &'a [Self],
474 enc: &mut Self::Encoder,
475 dst: &mut BytesMut,
476 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
477 where
478 T: crate::Index<T> + Send + Sync + 'static,
479 Self::Encoder: tokio_util::codec::Encoder<&'a Self>,
480 {
481 let n = u32::try_from(items.len())
482 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
483 dst.reserve(5 + items.len());
484 Leb128Encoder.encode(n, dst)?;
485 Self::encode_iter_ref(items, enc, dst)
486 }
487}
488
/// Values that can be decoded from a byte buffer, possibly leaving deferred
/// asynchronous work to be run later against an `Incoming<T>`.
pub trait Decode<T>: Sized {
    /// Decoder producing a single `Self`, which also reports any deferred
    /// work it produced.
    type Decoder: tokio_util::codec::Decoder<Item = Self>
        + Deferred<Incoming<T>>
        + Default
        + Send
        + 'static;
    /// Decoder producing a whole `Vec<Self>`.
    type ListDecoder: tokio_util::codec::Decoder<Item = Vec<Self>> + Default + 'static;
}
500
501impl<T, W> Deferred<W> for OptionEncoder<T>
502where
503 T: Deferred<W>,
504{
505 fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
506 self.0.take_deferred()
507 }
508}
509
/// An owned `Option<T>` is encoded with `OptionEncoder` wrapped around `T`'s
/// encoder.
impl<T, W> Encode<W> for Option<T>
where
    T: Encode<W>,
{
    type Encoder = OptionEncoder<T::Encoder>;
}

/// A borrowed `Option<T>` uses the same encoder, provided `T`'s encoder can
/// also encode `&T`.
impl<'a, T, W> Encode<W> for &'a Option<T>
where
    T: Encode<W>,
    T::Encoder: tokio_util::codec::Encoder<&'a T>,
{
    type Encoder = OptionEncoder<T::Encoder>;
}
524
525impl<T, W> Deferred<W> for OptionDecoder<T>
526where
527 T: Deferred<W> + Default,
528{
529 fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
530 mem::take(self).into_inner().take_deferred()
531 }
532}
533
/// `Option<T>` is decoded with `OptionDecoder` wrapped around `T`'s decoder;
/// lists of options go through the generic [`ListDecoder`].
impl<T, R> Decode<R> for Option<T>
where
    T: Decode<R>,
    R: crate::Index<R> + Send + 'static,
{
    type Decoder = OptionDecoder<T::Decoder>;
    type ListDecoder = ListDecoder<Self::Decoder, R>;
}
542
543impl<O, E, W> Deferred<W> for ResultEncoder<O, E>
544where
545 O: Deferred<W>,
546 E: Deferred<W>,
547{
548 fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
549 match (self.ok.take_deferred(), self.err.take_deferred()) {
550 (None, None) => None,
551 (Some(ok), None) => Some(ok),
552 (None, Some(err)) => Some(err),
553 (Some(ok), Some(_)) => {
554 if cfg!(debug_assertions) {
555 panic!("both `result::ok` and `result::err` deferred function set")
556 }
557 Some(ok)
558 }
559 }
560 }
561}
562
/// An owned `Result<O, E>` is encoded with `ResultEncoder` built from the
/// encoders of `O` and `E`; both encoders' errors must convert into
/// `std::io::Error`.
impl<O, E, W> Encode<W> for Result<O, E>
where
    O: Encode<W>,
    E: Encode<W>,
    std::io::Error: From<<O::Encoder as tokio_util::codec::Encoder<O>>::Error>,
    std::io::Error: From<<E::Encoder as tokio_util::codec::Encoder<E>>::Error>,
{
    type Encoder = ResultEncoder<O::Encoder, E::Encoder>;
}

/// A borrowed `Result<O, E>` uses the same encoder, provided the encoders of
/// `O` and `E` can also encode references.
impl<'a, O, E, W> Encode<W> for &'a Result<O, E>
where
    O: Encode<W>,
    O::Encoder: tokio_util::codec::Encoder<&'a O>,
    E: Encode<W>,
    E::Encoder: tokio_util::codec::Encoder<&'a E>,
    std::io::Error: From<<O::Encoder as tokio_util::codec::Encoder<&'a O>>::Error>,
    std::io::Error: From<<E::Encoder as tokio_util::codec::Encoder<&'a E>>::Error>,
{
    type Encoder = ResultEncoder<O::Encoder, E::Encoder>;
}
584
585impl<O, E, W> Deferred<W> for ResultDecoder<O, E>
586where
587 O: Deferred<W> + Default,
588 E: Deferred<W> + Default,
589{
590 fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
591 let (mut ok, mut err) = mem::take(self).into_inner();
592 match (ok.take_deferred(), err.take_deferred()) {
593 (None, None) => None,
594 (Some(ok), None) => Some(ok),
595 (None, Some(err)) => Some(err),
596 (Some(ok), Some(_)) => {
597 if cfg!(debug_assertions) {
598 panic!("both `result::ok` and `result::err` deferred function set")
599 }
600 Some(ok)
601 }
602 }
603 }
604}
605
/// `Result<O, E>` is decoded with `ResultDecoder` built from the decoders of
/// `O` and `E`; both decoders' errors must convert into `std::io::Error`.
/// Lists of results go through the generic [`ListDecoder`].
impl<O, E, R> Decode<R> for Result<O, E>
where
    O: Decode<R>,
    E: Decode<R>,
    R: crate::Index<R> + Send + 'static,
    std::io::Error: From<<O::Decoder as tokio_util::codec::Decoder>::Error>,
    std::io::Error: From<<E::Decoder as tokio_util::codec::Decoder>::Error>,
{
    type Decoder = ResultDecoder<O::Decoder, E::Decoder>;
    type ListDecoder = ListDecoder<Self::Decoder, R>;
}
617
/// Encoder for lists whose elements implement [`Encode`].
pub struct ListEncoder<W> {
    /// Deferred work produced by the most recent `encode` call, if any.
    deferred: Option<DeferredFn<W>>,
}
622
623impl<W> Default for ListEncoder<W> {
624 fn default() -> Self {
625 Self { deferred: None }
626 }
627}
628
629impl<W> Deferred<W> for ListEncoder<W> {
630 fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
631 self.deferred.take()
632 }
633}
634
635impl<T, W> tokio_util::codec::Encoder<Vec<T>> for ListEncoder<W>
636where
637 T: Encode<W>,
638 W: crate::Index<W> + Send + Sync + 'static,
639{
640 type Error = <T::Encoder as tokio_util::codec::Encoder<T>>::Error;
641
642 fn encode(&mut self, items: Vec<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
643 let mut enc = T::Encoder::default();
644 self.deferred = T::encode_list_own(items, &mut enc, dst)?;
645 Ok(())
646 }
647}
648
649impl<'a, T, W> tokio_util::codec::Encoder<&'a Vec<T>> for ListEncoder<W>
650where
651 T: Encode<W>,
652 T::Encoder: tokio_util::codec::Encoder<&'a T>,
653 W: crate::Index<W> + Send + Sync + 'static,
654{
655 type Error = <T::Encoder as tokio_util::codec::Encoder<&'a T>>::Error;
656
657 fn encode(&mut self, items: &'a Vec<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
658 let mut enc = T::Encoder::default();
659 self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
660 Ok(())
661 }
662}
663
664impl<'a, 'b, T, W> tokio_util::codec::Encoder<&'a &'b Vec<T>> for ListEncoder<W>
665where
666 T: Encode<W>,
667 T::Encoder: tokio_util::codec::Encoder<&'b T>,
668 W: crate::Index<W> + Send + Sync + 'static,
669{
670 type Error = <T::Encoder as tokio_util::codec::Encoder<&'b T>>::Error;
671
672 fn encode(&mut self, items: &'a &'b Vec<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
673 let mut enc = T::Encoder::default();
674 self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
675 Ok(())
676 }
677}
678
679impl<'a, T, W> tokio_util::codec::Encoder<&'a [T]> for ListEncoder<W>
680where
681 T: Encode<W>,
682 T::Encoder: tokio_util::codec::Encoder<&'a T>,
683 W: crate::Index<W> + Send + Sync + 'static,
684{
685 type Error = <T::Encoder as tokio_util::codec::Encoder<&'a T>>::Error;
686
687 fn encode(&mut self, items: &'a [T], dst: &mut BytesMut) -> Result<(), Self::Error> {
688 let mut enc = T::Encoder::default();
689 self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
690 Ok(())
691 }
692}
693
694impl<'a, 'b, T, W> tokio_util::codec::Encoder<&'a &'b [T]> for ListEncoder<W>
695where
696 T: Encode<W>,
697 T::Encoder: tokio_util::codec::Encoder<&'b T>,
698 W: crate::Index<W> + Send + Sync + 'static,
699{
700 type Error = <T::Encoder as tokio_util::codec::Encoder<&'b T>>::Error;
701
702 fn encode(&mut self, items: &'a &'b [T], dst: &mut BytesMut) -> Result<(), Self::Error> {
703 let mut enc = T::Encoder::default();
704 self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
705 Ok(())
706 }
707}
708
/// Owned vectors are encoded with [`ListEncoder`].
impl<T, W> Encode<W> for Vec<T>
where
    T: Encode<W>,
    W: crate::Index<W> + Send + Sync + 'static,
{
    type Encoder = ListEncoder<W>;
}

/// Borrowed vectors are encoded with [`ListEncoder`], provided the element
/// encoder can encode `&T`.
impl<'a, T, W> Encode<W> for &'a Vec<T>
where
    T: Encode<W>,
    T::Encoder: tokio_util::codec::Encoder<&'a T>,
    W: crate::Index<W> + Send + Sync + 'static,
{
    type Encoder = ListEncoder<W>;
}

/// Slices are encoded with [`ListEncoder`], provided the element encoder can
/// encode `&T`.
impl<'a, T, W> Encode<W> for &'a [T]
where
    T: Encode<W>,
    T::Encoder: tokio_util::codec::Encoder<&'a T>,
    W: crate::Index<W> + Send + Sync + 'static,
{
    type Encoder = ListEncoder<W>;
}
734
/// Decoder for a length-prefixed list, decoding elements one at a time with
/// the element decoder `T`.
pub struct ListDecoder<T, R>
where
    T: tokio_util::codec::Decoder,
{
    /// Element decoder.
    dec: T,
    /// Elements decoded so far for the list currently in progress.
    ret: Vec<T::Item>,
    /// Number of elements still to decode; `0` means the next `decode` call
    /// starts by reading a new length prefix.
    cap: usize,
    /// Deferred work collected from the element decoder, one slot per
    /// decoded element.
    deferred: Vec<Option<DeferredFn<Incoming<R>>>>,
}
745
746impl<T, R> ListDecoder<T, R>
747where
748 T: tokio_util::codec::Decoder,
749{
750 pub fn new(dec: T) -> Self {
752 Self {
753 dec,
754 ret: Vec::default(),
755 cap: 0,
756 deferred: vec![],
757 }
758 }
759}
760
761impl<T, R> Default for ListDecoder<T, R>
762where
763 T: tokio_util::codec::Decoder + Default,
764{
765 fn default() -> Self {
766 Self::new(T::default())
767 }
768}
769
770impl<T, R> Deferred<Incoming<R>> for ListDecoder<T, R>
771where
772 T: tokio_util::codec::Decoder,
773 R: crate::Index<R> + Send + Sync + 'static,
774{
775 fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
776 let deferred = mem::take(&mut self.deferred);
777 if deferred.iter().any(Option::is_some) {
778 Some(Box::new(|r, path| {
779 Box::pin(handle_deferred(r, deferred, path))
780 }))
781 } else {
782 None
783 }
784 }
785}
786
787impl<T, R> tokio_util::codec::Decoder for ListDecoder<T, R>
788where
789 T: tokio_util::codec::Decoder + Deferred<Incoming<R>>,
790{
791 type Item = Vec<T::Item>;
792 type Error = T::Error;
793
794 #[instrument(level = "trace", skip(self), fields(ty = "list"))]
795 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
796 if self.cap == 0 {
797 let Some(len) = Leb128DecoderU32.decode(src)? else {
798 return Ok(None);
799 };
800 if len == 0 {
801 return Ok(Some(Vec::default()));
802 }
803 let len = len
804 .try_into()
805 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
806 self.ret = Vec::with_capacity(len);
807 self.deferred = Vec::with_capacity(len);
808 self.cap = len;
809 }
810 while self.cap > 0 {
811 let Some(v) = self.dec.decode(src)? else {
812 return Ok(None);
813 };
814 self.ret.push(v);
815 self.deferred.push(self.dec.take_deferred());
816 self.cap -= 1;
817 }
818 Ok(Some(mem::take(&mut self.ret)))
819 }
820}
821
/// `Vec<T>` is decoded with `T`'s own list decoder; lists of lists go through
/// the generic [`ListDecoder`] wrapped around it.
impl<T, R> Decode<R> for Vec<T>
where
    T: Decode<R> + Send,
    T::ListDecoder: Deferred<Incoming<R>> + Send,
    R: crate::Index<R> + Send + 'static,
{
    type Decoder = T::ListDecoder;
    type ListDecoder = ListDecoder<Self::Decoder, R>;
}
831
// Implements `Encode` for `$t` and `&$t`, and `Decode` for `$t`, using the
// codec `$c`. The iterator encoders are overridden to encode items one by one
// by value and always return `Ok(None)`: these impls never yield deferred work.
macro_rules! impl_copy_codec {
    ($t:ty, $c:tt) => {
        impl<W> Encode<W> for $t {
            type Encoder = $c;

            #[instrument(level = "trace", skip(items))]
            fn encode_iter_own<I>(
                items: I,
                enc: &mut Self::Encoder,
                dst: &mut BytesMut,
            ) -> Result<
                Option<DeferredFn<W>>,
                <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error,
            >
            where
                I: IntoIterator<Item = Self>,
                I::IntoIter: ExactSizeIterator,
            {
                let iter = items.into_iter();
                dst.reserve(iter.len());
                for value in iter {
                    enc.encode(value, dst)?;
                }
                Ok(None)
            }

            #[instrument(level = "trace", skip(items))]
            fn encode_iter_ref<'a, I>(
                items: I,
                enc: &mut Self::Encoder,
                dst: &mut BytesMut,
            ) -> Result<
                Option<DeferredFn<W>>,
                <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error,
            >
            where
                I: IntoIterator<Item = &'a Self>,
                I::IntoIter: ExactSizeIterator,
            {
                let iter = items.into_iter();
                dst.reserve(iter.len());
                // Items are passed to the encoder by value.
                for value in iter.copied() {
                    enc.encode(value, dst)?;
                }
                Ok(None)
            }
        }

        impl<'b, W> Encode<W> for &'b $t {
            type Encoder = $c;

            #[instrument(level = "trace", skip(items))]
            fn encode_iter_own<I>(
                items: I,
                enc: &mut Self::Encoder,
                dst: &mut BytesMut,
            ) -> Result<
                Option<DeferredFn<W>>,
                <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error,
            >
            where
                I: IntoIterator<Item = Self>,
                I::IntoIter: ExactSizeIterator,
            {
                let iter = items.into_iter();
                dst.reserve(iter.len());
                // Items are passed to the encoder by value.
                for value in iter.copied() {
                    enc.encode(value, dst)?;
                }
                Ok(None)
            }

            #[instrument(level = "trace", skip(items))]
            fn encode_iter_ref<'a, I>(
                items: I,
                enc: &mut Self::Encoder,
                dst: &mut BytesMut,
            ) -> Result<
                Option<DeferredFn<W>>,
                <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error,
            >
            where
                I: IntoIterator<Item = &'a Self>,
                I::IntoIter: ExactSizeIterator,
                'b: 'a,
            {
                let iter = items.into_iter();
                dst.reserve(iter.len());
                for value in iter {
                    enc.encode(value, dst)?;
                }
                Ok(None)
            }
        }

        impl<R> Decode<R> for $t {
            type Decoder = $c;
            type ListDecoder = CoreVecDecoder<Self::Decoder>;
        }
    };
}
933
// Primitive types paired with their codecs. `u8` is absent here because it
// has hand-written impls.
impl_copy_codec!(bool, BoolCodec);
impl_copy_codec!(i8, S8Codec);
impl_copy_codec!(i16, S16Codec);
impl_copy_codec!(u16, U16Codec);
impl_copy_codec!(i32, S32Codec);
impl_copy_codec!(u32, U32Codec);
impl_copy_codec!(i64, S64Codec);
impl_copy_codec!(u64, U64Codec);
impl_copy_codec!(f32, F32Codec);
impl_copy_codec!(f64, F64Codec);
impl_copy_codec!(char, Utf8Codec);
945
946impl<T> Encode<T> for u8 {
947 type Encoder = U8Codec;
948
949 #[instrument(level = "trace", skip(items))]
950 fn encode_iter_own<I>(
951 items: I,
952 enc: &mut Self::Encoder,
953 dst: &mut BytesMut,
954 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
955 where
956 I: IntoIterator<Item = Self>,
957 I::IntoIter: ExactSizeIterator,
958 {
959 let items = items.into_iter();
960 dst.reserve(items.len());
961 dst.extend(items);
962 Ok(None)
963 }
964
965 #[instrument(level = "trace", skip(items))]
966 fn encode_iter_ref<'a, I>(
967 items: I,
968 enc: &mut Self::Encoder,
969 dst: &mut BytesMut,
970 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
971 where
972 I: IntoIterator<Item = &'a Self>,
973 I::IntoIter: ExactSizeIterator,
974 {
975 let items = items.into_iter();
976 dst.reserve(items.len());
977 dst.extend(items);
978 Ok(None)
979 }
980
981 #[instrument(level = "trace", skip(items), fields(ty = "list<u8>"))]
982 fn encode_list_own(
983 items: Vec<Self>,
984 enc: &mut Self::Encoder,
985 dst: &mut BytesMut,
986 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
987 {
988 CoreVecEncoderBytes.encode(items, dst)?;
989 Ok(None)
990 }
991
992 #[instrument(level = "trace", skip(items), fields(ty = "list<u8>"))]
993 fn encode_list_ref<'a>(
994 items: &'a [Self],
995 enc: &mut Self::Encoder,
996 dst: &mut BytesMut,
997 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
998 where
999 Self::Encoder: tokio_util::codec::Encoder<&'a Self>,
1000 {
1001 CoreVecEncoderBytes.encode(items, dst)?;
1002 Ok(None)
1003 }
1004}
1005
1006impl<'b, T> Encode<T> for &'b u8 {
1007 type Encoder = U8Codec;
1008
1009 #[instrument(level = "trace", skip(items))]
1010 fn encode_iter_own<I>(
1011 items: I,
1012 enc: &mut Self::Encoder,
1013 dst: &mut BytesMut,
1014 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
1015 where
1016 I: IntoIterator<Item = Self>,
1017 I::IntoIter: ExactSizeIterator,
1018 {
1019 let items = items.into_iter();
1020 dst.reserve(items.len());
1021 dst.extend(items);
1022 Ok(None)
1023 }
1024
1025 #[instrument(level = "trace", skip(items))]
1026 fn encode_iter_ref<'a, I>(
1027 items: I,
1028 enc: &mut Self::Encoder,
1029 dst: &mut BytesMut,
1030 ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
1031 where
1032 I: IntoIterator<Item = &'a Self>,
1033 I::IntoIter: ExactSizeIterator,
1034 'b: 'a,
1035 {
1036 let items = items.into_iter();
1037 dst.reserve(items.len());
1038 dst.extend(items.map(|b| **b));
1039 Ok(None)
1040 }
1041}
1042
/// Decoder for byte lists that yields `Vec<u8>`, wrapping
/// `CoreVecDecoderBytes`.
#[derive(Debug, Default)]
#[repr(transparent)]
pub struct ListDecoderU8(CoreVecDecoderBytes);
1047
1048impl tokio_util::codec::Decoder for ListDecoderU8 {
1049 type Item = Vec<u8>;
1050 type Error = <CoreVecDecoderBytes as tokio_util::codec::Decoder>::Error;
1051
1052 #[instrument(level = "trace", skip(self), fields(ty = "list<u8>"))]
1053 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1054 let Some(buf) = self.0.decode(src)? else {
1055 return Ok(None);
1056 };
1057 Ok(Some(buf.into()))
1058 }
1059}
1060
/// A single byte is decoded with `U8Codec`; byte lists use the dedicated
/// [`ListDecoderU8`].
impl<R> Decode<R> for u8 {
    type Decoder = U8Codec;
    type ListDecoder = ListDecoderU8;
}
1065
/// String slices are encoded with `CoreNameEncoder`.
impl<W> Encode<W> for &str {
    type Encoder = CoreNameEncoder;
}

/// References to string slices are encoded with `CoreNameEncoder`.
impl<W> Encode<W> for &&str {
    type Encoder = CoreNameEncoder;
}

/// Owned strings are encoded with `CoreNameEncoder`.
impl<W> Encode<W> for String {
    type Encoder = CoreNameEncoder;
}

/// Borrowed strings are encoded with `CoreNameEncoder`.
impl<W> Encode<W> for &String {
    type Encoder = CoreNameEncoder;
}
1081
/// Strings are decoded with `CoreNameDecoder`; lists of strings with
/// `CoreVecDecoder` wrapped around it.
impl<R> Decode<R> for String {
    type Decoder = CoreNameDecoder;
    type ListDecoder = CoreVecDecoder<Self::Decoder>;
}
1086
/// Owned byte buffers are encoded with `CoreVecEncoderBytes`.
impl<W> Encode<W> for Bytes {
    type Encoder = CoreVecEncoderBytes;
}

/// Borrowed byte buffers are encoded with `CoreVecEncoderBytes`.
impl<W> Encode<W> for &Bytes {
    type Encoder = CoreVecEncoderBytes;
}

/// Byte buffers are decoded with `CoreVecDecoderBytes`; lists of buffers with
/// `CoreVecDecoder` wrapped around it.
impl<R> Decode<R> for Bytes {
    type Decoder = CoreVecDecoderBytes;
    type ListDecoder = CoreVecDecoder<Self::Decoder>;
}
1099
/// Stateless encoder for [`ResourceOwn`] and [`ResourceBorrow`] handles.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
#[repr(transparent)]
pub struct ResourceEncoder;
1104
1105impl<T: ?Sized> tokio_util::codec::Encoder<ResourceOwn<T>> for ResourceEncoder {
1106 type Error = std::io::Error;
1107
1108 #[instrument(level = "trace", skip(self, item), ret, fields(ty = "own"))]
1109 fn encode(&mut self, item: ResourceOwn<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1110 CoreVecEncoderBytes.encode(item.repr, dst)
1111 }
1112}
1113
1114impl<T: ?Sized> tokio_util::codec::Encoder<&ResourceOwn<T>> for ResourceEncoder {
1115 type Error = std::io::Error;
1116
1117 #[instrument(level = "trace", skip(self, item), ret, fields(ty = "own"))]
1118 fn encode(&mut self, item: &ResourceOwn<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1119 CoreVecEncoderBytes.encode(&item.repr, dst)
1120 }
1121}
1122
/// Owned resource handles are encoded with [`ResourceEncoder`].
impl<T: ?Sized, W> Encode<W> for ResourceOwn<T> {
    type Encoder = ResourceEncoder;
}

/// References to owned resource handles are encoded with [`ResourceEncoder`].
impl<T: ?Sized, W> Encode<W> for &ResourceOwn<T> {
    type Encoder = ResourceEncoder;
}
1130
1131impl<T: ?Sized> tokio_util::codec::Encoder<ResourceBorrow<T>> for ResourceEncoder {
1132 type Error = std::io::Error;
1133
1134 #[instrument(level = "trace", skip(self, item), ret, fields(ty = "borrow"))]
1135 fn encode(&mut self, item: ResourceBorrow<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1136 CoreVecEncoderBytes.encode(item.repr, dst)
1137 }
1138}
1139
1140impl<T: ?Sized> tokio_util::codec::Encoder<&ResourceBorrow<T>> for ResourceEncoder {
1141 type Error = std::io::Error;
1142
1143 #[instrument(level = "trace", skip(self, item), ret, fields(ty = "borrow"))]
1144 fn encode(&mut self, item: &ResourceBorrow<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1145 CoreVecEncoderBytes.encode(&item.repr, dst)
1146 }
1147}
1148
/// Borrowed resource handles are encoded with [`ResourceEncoder`].
impl<T: ?Sized, W> Encode<W> for ResourceBorrow<T> {
    type Encoder = ResourceEncoder;
}

/// References to borrowed resource handles are encoded with
/// [`ResourceEncoder`].
impl<T: ?Sized, W> Encode<W> for &ResourceBorrow<T> {
    type Encoder = ResourceEncoder;
}
1156
/// Decoder producing [`ResourceBorrow<T>`] handles.
#[derive(Debug)]
#[repr(transparent)]
pub struct ResourceBorrowDecoder<T: ?Sized> {
    /// Decoder for the handle's byte representation.
    dec: CoreVecDecoderBytes,
    /// Zero-sized marker for the resource type.
    _ty: PhantomData<T>,
}
1164
1165impl<T: ?Sized> Default for ResourceBorrowDecoder<T> {
1166 fn default() -> Self {
1167 Self {
1168 dec: CoreVecDecoderBytes::default(),
1169 _ty: PhantomData,
1170 }
1171 }
1172}
1173
/// Decoding a borrowed resource handle never produces deferred work.
impl<R, T: ?Sized> Deferred<Incoming<R>> for ResourceBorrowDecoder<T> {
    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
        None
    }
}

/// Decoding a list of borrowed resource handles never produces deferred work.
impl<R, T: ?Sized> Deferred<Incoming<R>> for CoreVecDecoder<ResourceBorrowDecoder<T>> {
    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
        None
    }
}
1185
/// Borrowed resource handles are decoded with [`ResourceBorrowDecoder`];
/// lists of them with `CoreVecDecoder` wrapped around it.
impl<R, T: ?Sized + Send + 'static> Decode<R> for ResourceBorrow<T> {
    type Decoder = ResourceBorrowDecoder<T>;
    type ListDecoder = CoreVecDecoder<Self::Decoder>;
}
1190
1191impl<T: ?Sized> tokio_util::codec::Decoder for ResourceBorrowDecoder<T> {
1192 type Item = ResourceBorrow<T>;
1193 type Error = std::io::Error;
1194
1195 #[instrument(level = "trace", skip(self), fields(ty = "borrow"))]
1196 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1197 let repr = self.dec.decode(src)?;
1198 Ok(repr.map(Self::Item::from))
1199 }
1200}
1201
1202#[derive(Debug)]
1204#[repr(transparent)]
1205pub struct ResourceOwnDecoder<T: ?Sized> {
1206 dec: CoreVecDecoderBytes,
1207 _ty: PhantomData<T>,
1208}
1209
1210impl<T: ?Sized> Default for ResourceOwnDecoder<T> {
1211 fn default() -> Self {
1212 Self {
1213 dec: CoreVecDecoderBytes::default(),
1214 _ty: PhantomData,
1215 }
1216 }
1217}
1218
1219impl<R, T: ?Sized> Deferred<Incoming<R>> for ResourceOwnDecoder<T> {
1220 fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
1221 None
1222 }
1223}
1224
1225impl<R, T: ?Sized> Deferred<Incoming<R>> for CoreVecDecoder<ResourceOwnDecoder<T>> {
1226 fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
1227 None
1228 }
1229}
1230
1231impl<R, T: ?Sized + Send + 'static> Decode<R> for ResourceOwn<T> {
1232 type Decoder = ResourceOwnDecoder<T>;
1233 type ListDecoder = CoreVecDecoder<Self::Decoder>;
1234}
1235
1236impl<T: ?Sized> tokio_util::codec::Decoder for ResourceOwnDecoder<T> {
1237 type Item = ResourceOwn<T>;
1238 type Error = std::io::Error;
1239
1240 #[instrument(level = "trace", skip(self), fields(ty = "own"))]
1241 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1242 let repr = self.dec.decode(src)?;
1243 Ok(repr.map(Self::Item::from))
1244 }
1245}
1246
/// Codec for the unit type `()`.
///
/// Its encoder writes nothing and its decoder yields `()` without reading from the buffer.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
#[repr(transparent)]
pub struct UnitCodec;
1251
1252impl tokio_util::codec::Encoder<()> for UnitCodec {
1253 type Error = std::io::Error;
1254
1255 #[instrument(level = "trace", skip(self), ret)]
1256 fn encode(&mut self, (): (), dst: &mut BytesMut) -> std::io::Result<()> {
1257 Ok(())
1258 }
1259}
1260
1261impl tokio_util::codec::Encoder<&()> for UnitCodec {
1262 type Error = std::io::Error;
1263
1264 #[instrument(level = "trace", skip(self), ret)]
1265 fn encode(&mut self, (): &(), dst: &mut BytesMut) -> std::io::Result<()> {
1266 Ok(())
1267 }
1268}
1269
1270impl tokio_util::codec::Decoder for UnitCodec {
1271 type Item = ();
1272 type Error = std::io::Error;
1273
1274 #[instrument(level = "trace", skip(self))]
1275 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1276 Ok(Some(()))
1277 }
1278}
1279
/// Marker trait for [`Encode`] types that are tuples.
///
/// In this file it is implemented for `()` and for the tuples generated by
/// `impl_tuple_codec!`.
pub trait TupleEncode<W>: Encode<W> {}

/// Marker trait for [`Decode`] types that are tuples.
///
/// In this file it is implemented for `()` and for the tuples generated by
/// `impl_tuple_codec!`.
pub trait TupleDecode<R>: Decode<R> {}
1285
// The unit type is encoded with `UnitCodec`.
impl<W> Encode<W> for () {
    type Encoder = UnitCodec;
}

// `()` counts as a tuple for encoding purposes.
impl<W> TupleEncode<W> for () {}

// The unit type is decoded with `UnitCodec`; lists of units wrap it in a `CoreVecDecoder`.
impl<R> Decode<R> for () {
    type Decoder = UnitCodec;
    type ListDecoder = CoreVecDecoder<Self::Decoder>;
}

// `()` counts as a tuple for decoding purposes.
impl<R> TupleDecode<R> for () {}
1298
/// Generates the codec trait implementations for a tuple of one particular arity.
///
/// The macro takes four semicolon-separated identifier lists of equal length: value bindings
/// (currently unused by the expansion), value type parameters, codec bindings and codec type
/// parameters.
macro_rules! impl_tuple_codec {
    ($($vn:ident),+; $($vt:ident),+; $($cn:ident),+; $($ct:ident),+) => {
        // Deferred writes of a tuple encoder: collect the deferred write of every element
        // encoder and, if any exists, run them together through `handle_deferred`.
        impl<W, $($ct),+> Deferred<W> for TupleEncoder<($($ct),+,)>
        where
            W: crate::Index<W> + Send + Sync + 'static,
            $($ct: Deferred<W> + Default + 'static),+
        {
            fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
                // Taking `self` leaves a default encoder behind.
                let Self(($(mut $cn),+,)) = mem::take(self);
                let deferred = [ $($cn.take_deferred()),+ ];
                if deferred.iter().all(Option::is_none) {
                    return None;
                }
                Some(Box::new(|r, path| Box::pin(handle_deferred(r, deferred, path))))
            }
        }

        // Owned tuples are encoded element-wise by a `TupleEncoder` of the element encoders.
        impl<W, E, $($vt),+> Encode<W> for ($($vt),+,)
        where
            W: crate::Index<W> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Encode<W>,
                $vt::Encoder: tokio_util::codec::Encoder<$vt, Error = E> + 'static,
            )+
        {
            type Encoder = TupleEncoder<($($vt::Encoder),+,)>;
        }

        impl<W, E, $($vt),+> TupleEncode<W> for ($($vt),+,)
        where
            W: crate::Index<W> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Encode<W>,
                $vt::Encoder: tokio_util::codec::Encoder<$vt, Error = E> + 'static,
            )+
        {
        }

        // Borrowed tuples use the same encoder type; the element encoders must accept
        // references to the elements.
        impl<'a, W, E, $($vt),+> Encode<W> for &'a ($($vt),+,)
        where
            W: crate::Index<W> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Encode<W>,
                $vt::Encoder: tokio_util::codec::Encoder<&'a $vt, Error = E> + 'static,
            )+
        {
            type Encoder = TupleEncoder<($($vt::Encoder),+,)>;
        }

        // Deferred reads of a tuple decoder: same scheme as for the encoder above.
        impl<R, $($vt),+> Deferred<Incoming<R>> for TupleDecoder<($($vt::Decoder),+,), ($(Option<$vt>),+,)>
        where
            R: crate::Index<R> + Send + Sync + 'static,
            $($vt: Decode<R>),+
        {
            fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
                // Taking `self` leaves a default decoder behind.
                let ($(mut $cn),+,) = mem::take(self).into_inner();
                let deferred = [ $($cn.take_deferred()),+ ];
                if deferred.iter().all(Option::is_none) {
                    return None;
                }
                Some(Box::new(|r, path| Box::pin(handle_deferred(r, deferred, path))))
            }
        }

        // Tuples are decoded element-wise by a `TupleDecoder` of the element decoders.
        impl<R, E, $($vt),+> Decode<R> for ($($vt),+,)
        where
            R: crate::Index<R> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Decode<R> + Send + 'static,
                $vt::Decoder: tokio_util::codec::Decoder<Error = E> + Send + 'static,
            )+
        {
            type Decoder = TupleDecoder<($($vt::Decoder),+,), ($(Option<$vt>),+,)>;
            type ListDecoder = ListDecoder<Self::Decoder, R>;
        }

        impl<R, E, $($vt),+> TupleDecode<R> for ($($vt),+,)
        where
            R: crate::Index<R> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Decode<R> + Send + 'static,
                $vt::Decoder: tokio_util::codec::Decoder<Error = E> + Send + 'static,
            )+
        {
        }
    };
}
1393
// Instantiate the tuple codec implementations for tuples of 1 through 16 elements.
impl_tuple_codec!(
    v0;
    V0;
    c0;
    C0
);

impl_tuple_codec!(
    v0, v1;
    V0, V1;
    c0, c1;
    C0, C1
);

impl_tuple_codec!(
    v0, v1, v2;
    V0, V1, V2;
    c0, c1, c2;
    C0, C1, C2
);

impl_tuple_codec!(
    v0, v1, v2, v3;
    V0, V1, V2, V3;
    c0, c1, c2, c3;
    C0, C1, C2, C3
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4;
    V0, V1, V2, V3, V4;
    c0, c1, c2, c3, c4;
    C0, C1, C2, C3, C4
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5;
    V0, V1, V2, V3, V4, V5;
    c0, c1, c2, c3, c4, c5;
    C0, C1, C2, C3, C4, C5
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6;
    V0, V1, V2, V3, V4, V5, V6;
    c0, c1, c2, c3, c4, c5, c6;
    C0, C1, C2, C3, C4, C5, C6
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7;
    V0, V1, V2, V3, V4, V5, V6, V7;
    c0, c1, c2, c3, c4, c5, c6, c7;
    C0, C1, C2, C3, C4, C5, C6, C7
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8;
    V0, V1, V2, V3, V4, V5, V6, V7, V8;
    c0, c1, c2, c3, c4, c5, c6, c7, c8;
    C0, C1, C2, C3, C4, C5, C6, C7, C8
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14, V15;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15
);
1505
1506pub struct FutureEncoder<W> {
1508 deferred: Option<DeferredFn<W>>,
1509}
1510
1511impl<W> Default for FutureEncoder<W> {
1512 fn default() -> Self {
1513 Self { deferred: None }
1514 }
1515}
1516
1517impl<W> Deferred<W> for FutureEncoder<W> {
1518 fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
1519 self.deferred.take()
1520 }
1521}
1522
1523impl<T, W, Fut> tokio_util::codec::Encoder<Fut> for FutureEncoder<W>
1524where
1525 T: Encode<W>,
1526 W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1527 Fut: Future<Output = T> + Send + 'static,
1528 std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
1529{
1530 type Error = std::io::Error;
1531
1532 #[instrument(level = "trace", skip(self, item), fields(ty = "future"))]
1533 fn encode(&mut self, item: Fut, dst: &mut BytesMut) -> std::io::Result<()> {
1534 dst.reserve(1);
1536 dst.put_u8(0x00);
1537 let span = Span::current();
1538 self.deferred = Some(Box::new(|mut w, path| {
1539 Box::pin(
1540 async move {
1541 if !path.is_empty() {
1542 w = w.index(&path).map_err(std::io::Error::other)?;
1543 };
1544 let item = item.await;
1545 let mut enc = T::Encoder::default();
1546 let mut buf = BytesMut::default();
1547 enc.encode(item, &mut buf)?;
1548 w.write_all(&buf).await?;
1549 if let Some(f) = enc.take_deferred() {
1550 f(w, Vec::default()).await
1551 } else {
1552 Ok(())
1553 }
1554 }
1555 .instrument(span),
1556 )
1557 }));
1558 Ok(())
1559 }
1560}
1561
// Boxed, sendable futures are encoded with `FutureEncoder`.
impl<T, W> Encode<W> for Pin<Box<dyn Future<Output = T> + Send>>
where
    T: Encode<W> + 'static,
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
    std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
{
    type Encoder = FutureEncoder<W>;
}
1570
1571pub struct FutureDecoder<T, R>
1573where
1574 T: Decode<R>,
1575{
1576 dec: OptionDecoder<T::Decoder>,
1577 deferred: Option<DeferredFn<Incoming<R>>>,
1578 _ty: PhantomData<T>,
1579}
1580
1581impl<T, R> Default for FutureDecoder<T, R>
1582where
1583 T: Decode<R>,
1584{
1585 fn default() -> Self {
1586 Self {
1587 dec: OptionDecoder::default(),
1588 deferred: None,
1589 _ty: PhantomData,
1590 }
1591 }
1592}
1593
1594impl<T, R> Deferred<Incoming<R>> for FutureDecoder<T, R>
1595where
1596 T: Decode<R>,
1597{
1598 fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
1599 self.deferred.take()
1600 }
1601}
1602
1603impl<T, R> tokio_util::codec::Decoder for FutureDecoder<T, R>
1604where
1605 T: Decode<R> + Send + 'static,
1606 R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
1607 std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
1608{
1609 type Item = Pin<Box<dyn Future<Output = T> + Send>>;
1610 type Error = <T::Decoder as tokio_util::codec::Decoder>::Error;
1611
1612 #[instrument(level = "trace", skip(self), fields(ty = "future"))]
1613 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1614 let Some(item) = self.dec.decode(src)? else {
1615 return Ok(None);
1616 };
1617 if let Some(item) = item {
1618 self.deferred = self.dec.take_deferred();
1619 return Ok(Some(Box::pin(async { item })));
1620 }
1621
1622 let (tx, rx) = oneshot::channel();
1624 let dec = mem::take(&mut self.dec).into_inner();
1625 let span = Span::current();
1626 self.deferred = Some(Box::new(|mut r, path| {
1627 Box::pin(
1628 async move {
1629 if !path.is_empty() {
1630 r = r.index(&path).map_err(std::io::Error::other)?;
1631 };
1632 let mut dec = FramedRead::new(r, dec);
1633 trace!(?path, "receiving future element");
1634 let Some(item) = dec.next().await else {
1635 return Err(std::io::ErrorKind::UnexpectedEof.into());
1636 };
1637 let item = item?;
1638 if tx.send(item).is_err() {
1639 debug!("future receiver closed, discard data");
1640 return Ok(());
1641 }
1642 if let Some(rx) = dec.decoder_mut().take_deferred() {
1643 let buf = mem::take(dec.read_buffer_mut());
1644 let mut r = dec.into_inner();
1645 if !r.buffer.is_empty() {
1646 r.buffer.unsplit(buf);
1647 } else {
1648 r.buffer = buf;
1649 }
1650 rx(r, Vec::default()).await?;
1651 }
1652 Ok(())
1653 }
1654 .instrument(span),
1655 )
1656 }));
1657 return Ok(Some(Box::pin(async {
1658 let Ok(ret) = rx.await else {
1659 error!("future I/O dropped");
1660 return pending().await;
1661 };
1662 ret
1663 })));
1664 }
1665}
1666
// Boxed, sendable futures are decoded with `FutureDecoder`; lists of them with `ListDecoder`.
impl<T, R> Decode<R> for Pin<Box<dyn Future<Output = T> + Send>>
where
    T: Decode<R> + Send + 'static,
    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
    std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
{
    type Decoder = FutureDecoder<T, R>;
    type ListDecoder = ListDecoder<Self::Decoder, R>;
}
1676
1677pub struct StreamEncoder<W> {
1679 deferred: Option<DeferredFn<W>>,
1680}
1681
1682impl<W> Default for StreamEncoder<W> {
1683 fn default() -> Self {
1684 Self { deferred: None }
1685 }
1686}
1687
1688impl<W> Deferred<W> for StreamEncoder<W> {
1689 fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
1690 self.deferred.take()
1691 }
1692}
1693
1694impl<T, W, S> tokio_util::codec::Encoder<S> for StreamEncoder<W>
1695where
1696 T: Encode<W> + Send + 'static,
1697 W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1698 S: Stream<Item = Vec<T>> + Send + Unpin + 'static,
1699 std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
1700{
1701 type Error = std::io::Error;
1702
1703 #[instrument(level = "trace", skip(self, items), fields(ty = "stream"))]
1704 fn encode(&mut self, mut items: S, dst: &mut BytesMut) -> std::io::Result<()> {
1705 dst.reserve(1);
1707 dst.put_u8(0x00);
1708 let span = Span::current();
1709 self.deferred = Some(Box::new(|mut w, path| {
1710 Box::pin(async move {
1711 if !path.is_empty() {
1712 w = w.index(&path).map_err(std::io::Error::other)?;
1713 };
1714 let mut enc = T::Encoder::default();
1715 let mut buf = BytesMut::default();
1716 let mut tasks = JoinSet::new();
1717 let mut i = 0_u64;
1718 loop {
1719 select! {
1720 chunk = items.next() => {
1721 let Some(chunk) = chunk else {
1722 trace!("writing stream end");
1723 buf.reserve(1);
1724 buf.put_u8(0x00);
1725 w.write_all(&buf).await?;
1726 while let Some(res) = tasks.join_next().await {
1727 trace!(?res, "receiver task finished");
1728 res??;
1729 }
1730 return Ok(())
1731 };
1732 let n = u32::try_from(chunk.len()).map_err(|err| {
1733 std::io::Error::new(std::io::ErrorKind::InvalidInput, err)
1734 })?;
1735 let end = i.checked_add(n.into()).ok_or_else(|| {
1736 std::io::Error::new(
1737 std::io::ErrorKind::InvalidInput,
1738 "stream element index would overflow u64",
1739 )
1740 })?;
1741 trace!(n, "encoding chunk length");
1742 Leb128Encoder.encode(n, &mut buf)?;
1743 trace!(i, buf = format!("{buf:02x?}"), "writing stream chunk items");
1744
1745 buf.reserve(chunk.len());
1746 for (i, item) in zip(i.., chunk) {
1747 enc.encode(item, &mut buf)?;
1748 if let Some(f) = enc.take_deferred() {
1749 let i = i
1750 .try_into()
1751 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
1752 let w = w.index(&[i]).map_err(std::io::Error::other)?;
1753 trace!("spawning transmit task");
1754 tasks.spawn(f(w, Vec::default()));
1755 }
1756 }
1757 i = end;
1758 }
1759 Some(res) = tasks.join_next() => {
1760 trace!(?res, "receiver task finished");
1761 res??;
1762 }
1763 res = w.write(&buf), if !buf.is_empty() => {
1764 let n = res?;
1765 trace!(?buf, n, "wrote bytes from buffer");
1766 buf.advance(n);
1767 }
1768 }
1769 }
1770 }.instrument(span))
1771 }));
1772 Ok(())
1773 }
1774}
1775
// Boxed, sendable streams of item chunks are encoded with `StreamEncoder`.
impl<T, W> Encode<W> for Pin<Box<dyn Stream<Item = Vec<T>> + Send>>
where
    T: Encode<W> + Send + 'static,
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
    std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
{
    type Encoder = StreamEncoder<W>;
}
1784
1785pub struct StreamEncoderBytes<W> {
1787 deferred: Option<DeferredFn<W>>,
1788}
1789
1790impl<W> Default for StreamEncoderBytes<W> {
1791 fn default() -> Self {
1792 Self { deferred: None }
1793 }
1794}
1795
1796impl<W> Deferred<W> for StreamEncoderBytes<W> {
1797 fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
1798 self.deferred.take()
1799 }
1800}
1801
1802impl<W, S> tokio_util::codec::Encoder<S> for StreamEncoderBytes<W>
1803where
1804 W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1805 S: Stream<Item = Bytes> + Send + Unpin + 'static,
1806{
1807 type Error = std::io::Error;
1808
1809 #[instrument(level = "trace", skip(self, items), fields(ty = "stream<u8>"))]
1810 fn encode(&mut self, mut items: S, dst: &mut BytesMut) -> std::io::Result<()> {
1811 dst.reserve(1);
1813 dst.put_u8(0x00);
1814 self.deferred = Some(Box::new(|mut w, path| {
1815 Box::pin(async move {
1816 if !path.is_empty() {
1817 w = w.index(&path).map_err(std::io::Error::other)?;
1818 };
1819 let mut buf = BytesMut::default();
1820 loop {
1821 select! {
1822 chunk = items.next() => {
1823 let Some(chunk) = chunk else {
1824 trace!("writing stream end");
1825 buf.reserve(1);
1826 buf.put_u8(0x00);
1827 return w.write_all(&buf).await
1828 };
1829 let n = u32::try_from(chunk.len()).map_err(|err| {
1830 std::io::Error::new(std::io::ErrorKind::InvalidInput, err)
1831 })?;
1832 trace!(n, "encoding chunk length");
1833 Leb128Encoder.encode(n, &mut buf)?;
1834 buf.extend_from_slice(&chunk);
1835 }
1836 res = w.write(&buf), if !buf.is_empty() => {
1837 let n = res?;
1838 buf.advance(n);
1839 }
1840 }
1841 }
1842 })
1843 }));
1844 Ok(())
1845 }
1846}
1847
// Boxed, sendable streams of `Bytes` chunks are encoded with `StreamEncoderBytes`.
impl<W> Encode<W> for Pin<Box<dyn Stream<Item = Bytes> + Send>>
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderBytes<W>;
}
1854
1855pub struct StreamEncoderRead<W> {
1857 deferred: Option<DeferredFn<W>>,
1858}
1859
1860impl<W> Default for StreamEncoderRead<W> {
1861 fn default() -> Self {
1862 Self { deferred: None }
1863 }
1864}
1865
1866impl<W> Deferred<W> for StreamEncoderRead<W> {
1867 fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
1868 self.deferred.take()
1869 }
1870}
1871
1872impl<W, S> tokio_util::codec::Encoder<S> for StreamEncoderRead<W>
1873where
1874 W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1875 S: AsyncRead + Send + Unpin + 'static,
1876{
1877 type Error = std::io::Error;
1878
1879 #[instrument(level = "trace", skip(self, items), fields(ty = "stream<u8>"))]
1880 fn encode(&mut self, mut items: S, dst: &mut BytesMut) -> std::io::Result<()> {
1881 dst.reserve(1);
1883 dst.put_u8(0x00);
1884 self.deferred = Some(Box::new(|mut w, path| {
1885 Box::pin(async move {
1886 if !path.is_empty() {
1887 w = w.index(&path).map_err(std::io::Error::other)?;
1888 };
1889 let mut buf = BytesMut::default();
1890 let mut chunk = BytesMut::default();
1891 loop {
1892 select! {
1893 res = items.read_buf(&mut chunk) => {
1894 let n = res?;
1895 if n == 0 {
1896 trace!("writing stream end");
1897 buf.reserve(1);
1898 buf.put_u8(0x00);
1899 return w.write_all(&buf).await
1900 }
1901 let n = u32::try_from(n).map_err(|err| {
1902 std::io::Error::new(std::io::ErrorKind::InvalidInput, err)
1903 })?;
1904 trace!(n, "encoding chunk length");
1905 Leb128Encoder.encode(n, &mut buf)?;
1906 buf.extend_from_slice(&chunk);
1907 chunk.clear();
1908 }
1909 res = w.write(&buf), if !buf.is_empty() => {
1910 let n = res?;
1911 buf.advance(n);
1912 }
1913 }
1914 }
1915 })
1916 }));
1917 Ok(())
1918 }
1919}
1920
// The reader types below are all encoded as byte streams with `StreamEncoderRead`.

// Boxed, sendable `AsyncRead` trait objects.
impl<W> Encode<W> for Pin<Box<dyn AsyncRead + Send>>
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}

// In-memory cursors over anything viewable as a byte slice.
impl<T, W> Encode<W> for std::io::Cursor<T>
where
    T: AsRef<[u8]> + Send + Unpin + 'static,
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}

// The empty reader.
impl<W> Encode<W> for tokio::io::Empty
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}

// Standard input, available with the `io-std` feature.
#[cfg(feature = "io-std")]
impl<W> Encode<W> for tokio::io::Stdin
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}

// Files, available with the `fs` feature.
#[cfg(feature = "fs")]
impl<W> Encode<W> for tokio::fs::File
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}

// TCP streams, available with the `net` feature.
#[cfg(feature = "net")]
impl<W> Encode<W> for tokio::net::TcpStream
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}

// Unix domain socket streams, available on Unix with the `net` feature.
#[cfg(all(unix, feature = "net"))]
impl<W> Encode<W> for tokio::net::UnixStream
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}

// Receiving ends of Unix pipes, available on Unix with the `net` feature.
#[cfg(all(unix, feature = "net"))]
impl<W> Encode<W> for tokio::net::unix::pipe::Receiver
where
    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
{
    type Encoder = StreamEncoderRead<W>;
}
1982
1983pub struct StreamDecoder<T, R>
1985where
1986 T: Decode<R>,
1987{
1988 dec: T::ListDecoder,
1989 deferred: Option<DeferredFn<Incoming<R>>>,
1990 _ty: PhantomData<T>,
1991}
1992
1993impl<T, R> Default for StreamDecoder<T, R>
1994where
1995 T: Decode<R>,
1996{
1997 fn default() -> Self {
1998 Self {
1999 dec: T::ListDecoder::default(),
2000 deferred: None,
2001 _ty: PhantomData,
2002 }
2003 }
2004}
2005
2006impl<T, R> Deferred<Incoming<R>> for StreamDecoder<T, R>
2007where
2008 T: Decode<R>,
2009{
2010 fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
2011 self.deferred.take()
2012 }
2013}
2014
2015#[instrument(level = "trace", skip(dec, r, tx), ret)]
2016async fn handle_deferred_stream<C, T, R>(
2017 dec: C,
2018 mut r: Incoming<R>,
2019 path: Vec<usize>,
2020 tx: mpsc::Sender<Vec<T>>,
2021) -> std::io::Result<()>
2022where
2023 C: tokio_util::codec::Decoder<Item = T> + Deferred<Incoming<R>>,
2024 R: AsyncRead + crate::Index<R> + Send + Unpin + 'static,
2025 std::io::Error: From<C::Error>,
2026{
2027 let dec = ListDecoder::new(dec);
2028 if !path.is_empty() {
2029 r = r.index(&path).map_err(std::io::Error::other)?;
2030 };
2031 let mut framed = FramedRead::new(r, dec);
2032 let mut tasks = JoinSet::new();
2033 let mut i = 0_usize;
2034 loop {
2035 trace!("receiving pending stream chunk");
2036 select! {
2037 Some(chunk) = framed.next() => {
2038 let chunk = chunk?;
2039 if chunk.is_empty() {
2040 trace!("received stream end");
2041 while let Some(res) = tasks.join_next().await {
2042 res??;
2043 }
2044 return Ok(())
2045 }
2046 let end = i.checked_add(chunk.len()).ok_or_else(|| {
2047 std::io::Error::new(
2048 std::io::ErrorKind::InvalidInput,
2049 "stream element index would overflow usize",
2050 )
2051 })?;
2052 trace!(i, end, "received stream chunk");
2053 if tx.send(chunk).await.is_err() {
2054 debug!("stream receiver closed, discard data");
2055 return Ok(())
2056 }
2057 for (i, deferred) in zip(i.., mem::take(&mut framed.decoder_mut().deferred)) {
2058 if let Some(deferred) = deferred {
2059 let r = framed.get_ref().index(&[i]).map_err(std::io::Error::other)?;
2060 trace!("spawning receive task");
2061 tasks.spawn(deferred(r, Vec::default()));
2062 }
2063 }
2064 i = end;
2065 },
2066 Some(res) = tasks.join_next() => {
2067 trace!(?res, "receiver task finished");
2068 res??;
2069 }
2070 else => {
2071 return Ok(());
2072 }
2073 }
2074 }
2075}
2076
2077impl<T, R> tokio_util::codec::Decoder for StreamDecoder<T, R>
2078where
2079 T: Decode<R> + Send + 'static,
2080 T::ListDecoder: Deferred<Incoming<R>>,
2081 R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2082 <T::Decoder as tokio_util::codec::Decoder>::Error: Send,
2083 std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
2084{
2085 type Item = Pin<Box<dyn Stream<Item = Vec<T>> + Send>>;
2086 type Error = <<T as Decode<R>>::ListDecoder as tokio_util::codec::Decoder>::Error;
2087
2088 #[instrument(level = "trace", skip(self), fields(ty = "stream"))]
2089 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
2090 let Some(chunk) = self.dec.decode(src)? else {
2091 return Ok(None);
2092 };
2093 if !chunk.is_empty() {
2094 self.deferred = self.dec.take_deferred();
2095 return Ok(Some(Box::pin(stream::iter([chunk]))));
2096 }
2097
2098 let (tx, rx) = mpsc::channel(128);
2100 self.deferred = Some(Box::new(|r, path| {
2101 Box::pin(
2102 async move { handle_deferred_stream(T::Decoder::default(), r, path, tx).await },
2103 )
2104 }));
2105 return Ok(Some(Box::pin(ReceiverStream::new(rx))));
2106 }
2107}
2108
// Boxed, sendable streams of item chunks are decoded with `StreamDecoder`; lists of them with
// `ListDecoder`.
impl<T, R> Decode<R> for Pin<Box<dyn Stream<Item = Vec<T>> + Send>>
where
    T: Decode<R> + Send + 'static,
    T::ListDecoder: Deferred<Incoming<R>> + Send,
    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
    <T::Decoder as tokio_util::codec::Decoder>::Error: Send,
    std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
{
    type Decoder = StreamDecoder<T, R>;
    type ListDecoder = ListDecoder<Self::Decoder, R>;
}
2120
2121pub struct StreamDecoderBytes<R> {
2123 dec: CoreVecDecoderBytes,
2124 deferred: Option<DeferredFn<Incoming<R>>>,
2125}
2126
2127impl<R> Default for StreamDecoderBytes<R> {
2128 fn default() -> Self {
2129 Self {
2130 dec: CoreVecDecoderBytes::default(),
2131 deferred: None,
2132 }
2133 }
2134}
2135
2136impl<R> Deferred<Incoming<R>> for StreamDecoderBytes<R> {
2137 fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
2138 self.deferred.take()
2139 }
2140}
2141
2142impl<R> tokio_util::codec::Decoder for StreamDecoderBytes<R>
2143where
2144 R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2145{
2146 type Item = Pin<Box<dyn Stream<Item = Bytes> + Send>>;
2147 type Error = std::io::Error;
2148
2149 #[instrument(level = "trace", skip(self), fields(ty = "stream<u8>"))]
2150 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
2151 let Some(chunk) = self.dec.decode(src)? else {
2152 return Ok(None);
2153 };
2154 if !chunk.is_empty() {
2155 return Ok(Some(Box::pin(stream::iter([chunk]))));
2156 }
2157
2158 let (tx, rx) = mpsc::channel(128);
2160 let dec = mem::take(&mut self.dec);
2161 let span = Span::current();
2162 self.deferred = Some(Box::new(|mut r, path| {
2163 Box::pin(
2164 async move {
2165 if !path.is_empty() {
2166 r = r.index(&path).map_err(std::io::Error::other)?;
2167 };
2168 let mut framed = FramedRead::new(r, dec);
2169 trace!(?path, "receiving pending byte stream chunk");
2170 while let Some(chunk) = framed.next().await {
2171 let chunk = chunk?;
2172 if chunk.is_empty() {
2173 trace!("received stream end");
2174 return Ok(());
2175 }
2176 trace!(?chunk, "received pending byte stream chunk");
2177 if tx.send(chunk).await.is_err() {
2178 debug!("stream receiver closed, discard data");
2179 return Ok(());
2180 }
2181 }
2182 Ok(())
2183 }
2184 .instrument(span),
2185 )
2186 }));
2187 return Ok(Some(Box::pin(ReceiverStream::new(rx))));
2188 }
2189}
2190
// Boxed, sendable streams of `Bytes` chunks are decoded with `StreamDecoderBytes`; lists of
// them with `ListDecoder`.
impl<R> Decode<R> for Pin<Box<dyn Stream<Item = Bytes> + Send>>
where
    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
{
    type Decoder = StreamDecoderBytes<R>;
    type ListDecoder = ListDecoder<Self::Decoder, R>;
}
2198
2199pub struct StreamDecoderRead<R> {
2201 dec: CoreVecDecoderBytes,
2202 deferred: Option<DeferredFn<Incoming<R>>>,
2203}
2204
2205impl<R> Default for StreamDecoderRead<R> {
2206 fn default() -> Self {
2207 Self {
2208 dec: CoreVecDecoderBytes::default(),
2209 deferred: None,
2210 }
2211 }
2212}
2213
2214impl<R> Deferred<Incoming<R>> for StreamDecoderRead<R> {
2215 fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
2216 self.deferred.take()
2217 }
2218}
2219
2220impl<R> tokio_util::codec::Decoder for StreamDecoderRead<R>
2221where
2222 R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2223{
2224 type Item = Pin<Box<dyn AsyncRead + Send>>;
2225 type Error = std::io::Error;
2226
2227 #[instrument(level = "trace", skip(self), fields(ty = "stream<u8>"))]
2228 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
2229 let Some(chunk) = self.dec.decode(src)? else {
2230 return Ok(None);
2231 };
2232 if !chunk.is_empty() {
2233 return Ok(Some(Box::pin(std::io::Cursor::new(chunk))));
2234 }
2235
2236 let (tx, rx) = mpsc::channel(128);
2238 let dec = mem::take(&mut self.dec);
2239 self.deferred = Some(Box::new(|mut r, path| {
2240 Box::pin(async move {
2241 if !path.is_empty() {
2242 r = r.index(&path).map_err(std::io::Error::other)?;
2243 };
2244 let mut framed = FramedRead::new(r, dec);
2245 trace!("receiving pending byte stream chunk");
2246 while let Some(chunk) = framed.next().await {
2247 let chunk = chunk?;
2248 if chunk.is_empty() {
2249 trace!("received stream end");
2250 return Ok(());
2251 }
2252 trace!(?chunk, "received byte stream chunk");
2253 if tx.send(std::io::Result::Ok(chunk)).await.is_err() {
2254 debug!("stream receiver closed, discard data");
2255 return Ok(());
2256 }
2257 }
2258 Ok(())
2259 })
2260 }));
2261 return Ok(Some(Box::pin(StreamReader::new(ReceiverStream::new(rx)))));
2262 }
2263}
2264
// Boxed, sendable `AsyncRead` trait objects are decoded with `StreamDecoderRead`; lists of
// them with `ListDecoder`.
impl<R> Decode<R> for Pin<Box<dyn AsyncRead + Send>>
where
    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
{
    type Decoder = StreamDecoderRead<R>;
    type ListDecoder = ListDecoder<Self::Decoder, R>;
}
2272
#[cfg(test)]
mod tests {
    use anyhow::bail;

    use super::*;

    /// Stand-in for a writer; the test never expects it to be indexed.
    struct NoopStream;

    impl crate::Index<Self> for NoopStream {
        fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
            panic!("index should not be called with path {path:?}")
        }
    }

    /// A tuple of plain integers is encoded inline and leaves no deferred write behind.
    #[test_log::test(tokio::test)]
    async fn codec() -> anyhow::Result<()> {
        let mut enc = <(u8, u32) as Encode<NoopStream>>::Encoder::default();
        let mut buf = BytesMut::new();
        enc.encode((0x42, 0x42), &mut buf)?;
        let deferred = Deferred::<NoopStream>::take_deferred(&mut enc);
        if deferred.is_some() {
            bail!("no deferred write should have been returned");
        }
        assert_eq!(buf.as_ref(), b"\x42\x42");
        Ok(())
    }
}