wrpc_transport/
value.rs

1use core::any::TypeId;
2use core::fmt::{self, Debug};
3use core::future::{pending, Future};
4use core::hash::{Hash, Hasher};
5use core::iter::zip;
6use core::marker::PhantomData;
7use core::mem;
8use core::ops::{Deref, DerefMut};
9use core::pin::Pin;
10
11use bytes::{Buf as _, BufMut as _, Bytes, BytesMut};
12use futures::stream::{self, FuturesUnordered};
13use futures::{Stream, StreamExt as _, TryStreamExt as _};
14use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
15use tokio::select;
16use tokio::sync::{mpsc, oneshot};
17use tokio::task::JoinSet;
18use tokio_stream::wrappers::ReceiverStream;
19use tokio_util::codec::{Encoder as _, FramedRead};
20use tokio_util::io::StreamReader;
21use tracing::{debug, error, instrument, trace, Instrument as _, Span};
22use wasm_tokio::cm::{
23    BoolCodec, F32Codec, F64Codec, OptionDecoder, OptionEncoder, PrimValEncoder, ResultDecoder,
24    ResultEncoder, S16Codec, S32Codec, S64Codec, S8Codec, TupleDecoder, TupleEncoder, U16Codec,
25    U32Codec, U64Codec, U8Codec,
26};
27use wasm_tokio::{
28    CoreNameDecoder, CoreNameEncoder, CoreVecDecoder, CoreVecDecoderBytes, CoreVecEncoderBytes,
29    Leb128DecoderI128, Leb128DecoderI16, Leb128DecoderI32, Leb128DecoderI64, Leb128DecoderI8,
30    Leb128DecoderU128, Leb128DecoderU16, Leb128DecoderU32, Leb128DecoderU64, Leb128DecoderU8,
31    Leb128Encoder, Utf8Codec,
32};
33
34use crate::{Incoming, Index as _};
35
/// Borrowed resource handle, represented as an opaque byte blob
///
/// The type parameter `T` only tags the handle at the type level: it is held through
/// [`PhantomData`], so no value of `T` is stored.
#[repr(transparent)]
pub struct ResourceBorrow<T: ?Sized> {
    /// Opaque bytes identifying the resource
    repr: Bytes,
    /// Zero-sized marker carrying the resource type `T`
    _ty: PhantomData<T>,
}
42
43impl<T: ?Sized> From<Bytes> for ResourceBorrow<T> {
44    fn from(repr: Bytes) -> Self {
45        Self {
46            repr,
47            _ty: PhantomData,
48        }
49    }
50}
51
52impl<T: ?Sized> From<Vec<u8>> for ResourceBorrow<T> {
53    fn from(repr: Vec<u8>) -> Self {
54        Self {
55            repr: repr.into(),
56            _ty: PhantomData,
57        }
58    }
59}
60
61impl<T: ?Sized> From<ResourceBorrow<T>> for Bytes {
62    fn from(ResourceBorrow { repr, .. }: ResourceBorrow<T>) -> Self {
63        repr
64    }
65}
66
67impl<T: ?Sized> PartialEq for ResourceBorrow<T> {
68    fn eq(&self, other: &Self) -> bool {
69        self.repr == other.repr
70    }
71}
72
73impl<T: ?Sized> Eq for ResourceBorrow<T> {}
74
75impl<T: ?Sized> Hash for ResourceBorrow<T> {
76    fn hash<H: Hasher>(&self, state: &mut H) {
77        self.repr.hash(state);
78    }
79}
80
81impl<T: ?Sized> AsRef<[u8]> for ResourceBorrow<T> {
82    fn as_ref(&self) -> &[u8] {
83        &self.repr
84    }
85}
86
87impl<T: ?Sized> AsRef<Bytes> for ResourceBorrow<T> {
88    fn as_ref(&self) -> &Bytes {
89        &self.repr
90    }
91}
92
93impl<T: ?Sized + 'static> Debug for ResourceBorrow<T> {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        write!(f, "borrow<{:?}>", TypeId::of::<T>())
96    }
97}
98
99impl<T: ?Sized> Clone for ResourceBorrow<T> {
100    fn clone(&self) -> Self {
101        Self {
102            repr: self.repr.clone(),
103            _ty: PhantomData,
104        }
105    }
106}
107
108impl<T: ?Sized> ResourceBorrow<T> {
109    /// Constructs a new borrowed resource handle
110    pub fn new(repr: impl Into<Bytes>) -> Self {
111        Self::from(repr.into())
112    }
113}
114
/// Owned resource handle, represented as an opaque byte blob
///
/// The type parameter `T` only tags the handle at the type level: it is held through
/// [`PhantomData`], so no value of `T` is stored.
#[repr(transparent)]
pub struct ResourceOwn<T: ?Sized> {
    /// Opaque bytes identifying the resource
    repr: Bytes,
    /// Zero-sized marker carrying the resource type `T`
    _ty: PhantomData<T>,
}
121
122impl<T: ?Sized> From<ResourceOwn<T>> for ResourceBorrow<T> {
123    fn from(ResourceOwn { repr, _ty }: ResourceOwn<T>) -> Self {
124        Self {
125            repr,
126            _ty: PhantomData,
127        }
128    }
129}
130
131impl<T: ?Sized> From<Bytes> for ResourceOwn<T> {
132    fn from(repr: Bytes) -> Self {
133        Self {
134            repr,
135            _ty: PhantomData,
136        }
137    }
138}
139
140impl<T: ?Sized> From<Vec<u8>> for ResourceOwn<T> {
141    fn from(repr: Vec<u8>) -> Self {
142        Self {
143            repr: repr.into(),
144            _ty: PhantomData,
145        }
146    }
147}
148
149impl<T: ?Sized> From<ResourceOwn<T>> for Bytes {
150    fn from(ResourceOwn { repr, .. }: ResourceOwn<T>) -> Self {
151        repr
152    }
153}
154
155impl<T: ?Sized> PartialEq for ResourceOwn<T> {
156    fn eq(&self, other: &Self) -> bool {
157        self.repr == other.repr
158    }
159}
160
161impl<T: ?Sized> Eq for ResourceOwn<T> {}
162
163impl<T: ?Sized> Hash for ResourceOwn<T> {
164    fn hash<H: Hasher>(&self, state: &mut H) {
165        self.repr.hash(state);
166    }
167}
168
169impl<T: ?Sized> AsRef<[u8]> for ResourceOwn<T> {
170    fn as_ref(&self) -> &[u8] {
171        &self.repr
172    }
173}
174
175impl<T: ?Sized> AsRef<Bytes> for ResourceOwn<T> {
176    fn as_ref(&self) -> &Bytes {
177        &self.repr
178    }
179}
180
181impl<T: ?Sized + 'static> Debug for ResourceOwn<T> {
182    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
183        write!(f, "own<{:?}>", TypeId::of::<T>())
184    }
185}
186
187impl<T: ?Sized> Clone for ResourceOwn<T> {
188    fn clone(&self) -> Self {
189        Self {
190            repr: self.repr.clone(),
191            _ty: PhantomData,
192        }
193    }
194}
195
196impl<T: ?Sized> ResourceOwn<T> {
197    /// Constructs a new owned resource handle
198    pub fn new(repr: impl Into<Bytes>) -> Self {
199        Self::from(repr.into())
200    }
201
202    /// Returns the owned handle as [`ResourceBorrow`]
203    pub fn as_borrow(&self) -> ResourceBorrow<T> {
204        ResourceBorrow {
205            repr: self.repr.clone(),
206            _ty: PhantomData,
207        }
208    }
209}
210
/// Deferred operation used for async value processing
///
/// A boxed, [`Send`], call-once closure. It receives a `T` together with a `Vec<usize>`
/// (named `path` at the call sites in this file) and returns a pinned, boxed, [`Send`]
/// future resolving to [`std::io::Result<()>`].
pub type DeferredFn<T> = Box<
    dyn FnOnce(T, Vec<usize>) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send>> + Send,
>;
215
/// Handles async processing state for codecs
///
/// `T` is the argument type of the [`DeferredFn`] a codec may hand out.
pub trait Deferred<T> {
    /// Takes a deferred async processing operation, if any
    ///
    /// Returns `None` when the codec has nothing to defer.
    fn take_deferred(&mut self) -> Option<DeferredFn<T>>;
}
221
// Implements `Deferred<T>` for every `T` on the given codec type, with a `take_deferred`
// that always yields `None`.
macro_rules! impl_deferred_sync {
    ($codec:ty) => {
        impl<T> Deferred<T> for $codec {
            fn take_deferred(&mut self) -> Option<DeferredFn<T>> {
                None
            }
        }
    };
}
231
// Codecs below get a `Deferred` impl whose `take_deferred` always returns `None`
// (see `impl_deferred_sync`).
impl_deferred_sync!(BoolCodec);
impl_deferred_sync!(S8Codec);
impl_deferred_sync!(U8Codec);
impl_deferred_sync!(S16Codec);
impl_deferred_sync!(U16Codec);
impl_deferred_sync!(S32Codec);
impl_deferred_sync!(U32Codec);
impl_deferred_sync!(S64Codec);
impl_deferred_sync!(U64Codec);
impl_deferred_sync!(F32Codec);
impl_deferred_sync!(F64Codec);
impl_deferred_sync!(CoreNameDecoder);
impl_deferred_sync!(CoreNameEncoder);
impl_deferred_sync!(CoreVecDecoderBytes);
impl_deferred_sync!(CoreVecEncoderBytes);
impl_deferred_sync!(Utf8Codec);
impl_deferred_sync!(PrimValEncoder);
impl_deferred_sync!(Leb128Encoder);
impl_deferred_sync!(Leb128DecoderI8);
impl_deferred_sync!(Leb128DecoderU8);
impl_deferred_sync!(Leb128DecoderI16);
impl_deferred_sync!(Leb128DecoderU16);
impl_deferred_sync!(Leb128DecoderI32);
impl_deferred_sync!(Leb128DecoderU32);
impl_deferred_sync!(Leb128DecoderI64);
impl_deferred_sync!(Leb128DecoderU64);
impl_deferred_sync!(Leb128DecoderI128);
impl_deferred_sync!(Leb128DecoderU128);
// `ResourceEncoder` and `UnitCodec` are not imported above; they are expected to be
// declared elsewhere in this file.
impl_deferred_sync!(ResourceEncoder);
impl_deferred_sync!(UnitCodec);
impl_deferred_sync!(ListDecoderU8);

// `CoreVecDecoder` instantiated over each of the element codecs covered above
impl_deferred_sync!(CoreVecDecoder<BoolCodec>);
impl_deferred_sync!(CoreVecDecoder<S8Codec>);
impl_deferred_sync!(CoreVecDecoder<U8Codec>);
impl_deferred_sync!(CoreVecDecoder<S16Codec>);
impl_deferred_sync!(CoreVecDecoder<U16Codec>);
impl_deferred_sync!(CoreVecDecoder<S32Codec>);
impl_deferred_sync!(CoreVecDecoder<U32Codec>);
impl_deferred_sync!(CoreVecDecoder<S64Codec>);
impl_deferred_sync!(CoreVecDecoder<U64Codec>);
impl_deferred_sync!(CoreVecDecoder<F32Codec>);
impl_deferred_sync!(CoreVecDecoder<F64Codec>);
impl_deferred_sync!(CoreVecDecoder<CoreNameDecoder>);
impl_deferred_sync!(CoreVecDecoder<CoreVecDecoderBytes>);
impl_deferred_sync!(CoreVecDecoder<Utf8Codec>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI8>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU8>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI16>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU16>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI32>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU32>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI64>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU64>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI128>);
impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU128>);
impl_deferred_sync!(CoreVecDecoder<UnitCodec>);
289
/// Codec for synchronous values
///
/// A newtype around an arbitrary codec. It dereferences to the wrapped codec and is
/// given a no-op [Deferred] implementation regardless of what it wraps.
pub struct SyncCodec<T>(pub T);

impl<T> Deref for SyncCodec<T> {
    type Target = T;

    /// Borrows the wrapped codec
    fn deref(&self) -> &Self::Target {
        let Self(inner) = self;
        inner
    }
}

impl<T> DerefMut for SyncCodec<T> {
    /// Mutably borrows the wrapped codec
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(inner) = self;
        inner
    }
}
309
310impl<T, C> Deferred<T> for SyncCodec<C> {
311    fn take_deferred(&mut self) -> Option<DeferredFn<T>> {
312        None
313    }
314}
315
316impl<T: Default> Default for SyncCodec<T> {
317    fn default() -> Self {
318        Self(T::default())
319    }
320}
321
322impl<T, I> tokio_util::codec::Encoder<I> for SyncCodec<T>
323where
324    T: tokio_util::codec::Encoder<I>,
325{
326    type Error = T::Error;
327
328    fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> {
329        self.0.encode(item, dst)
330    }
331}
332
333impl<T> tokio_util::codec::Decoder for SyncCodec<T>
334where
335    T: tokio_util::codec::Decoder,
336{
337    type Item = T::Item;
338    type Error = T::Error;
339
340    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
341        self.0.decode(src)
342    }
343
344    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
345        self.0.decode_eof(buf)
346    }
347
348    fn framed<IO: AsyncRead + AsyncWrite + Sized>(
349        self,
350        io: IO,
351    ) -> tokio_util::codec::Framed<IO, Self>
352    where
353        Self: Sized,
354    {
355        self.0.framed(io).map_codec(Self)
356    }
357}
358
359#[instrument(level = "trace", skip(w, deferred))]
360async fn handle_deferred<T, I>(w: T, deferred: I, mut path: Vec<usize>) -> std::io::Result<()>
361where
362    I: IntoIterator<Item = Option<DeferredFn<T>>>,
363    I::IntoIter: ExactSizeIterator,
364    T: crate::Index<T>,
365{
366    let mut futs = FuturesUnordered::default();
367    for (i, f) in zip(0.., deferred) {
368        if let Some(f) = f {
369            path.push(i);
370            let w = w.index(&path).map_err(std::io::Error::other)?;
371            path.pop();
372            futs.push(f(w, Vec::default()));
373        }
374    }
375    while let Some(()) = futs.try_next().await? {}
376    Ok(())
377}
378
379/// Defines value encoding
380pub trait Encode<T>: Sized {
381    /// Encoder used to encode the value
382    type Encoder: tokio_util::codec::Encoder<Self> + Deferred<T> + Default + Send;
383
384    /// Convenience function for encoding a value
385    #[instrument(level = "trace", skip(self, enc))]
386    fn encode(
387        self,
388        enc: &mut Self::Encoder,
389        dst: &mut BytesMut,
390    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
391    {
392        enc.encode(self, dst)?;
393        Ok(enc.take_deferred())
394    }
395
396    /// Encode an iterator of owned values
397    #[instrument(level = "trace", skip(items, enc))]
398    fn encode_iter_own<I>(
399        items: I,
400        enc: &mut Self::Encoder,
401        dst: &mut BytesMut,
402    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
403    where
404        I: IntoIterator<Item = Self>,
405        I::IntoIter: ExactSizeIterator,
406        T: crate::Index<T> + Send + Sync + 'static,
407    {
408        let items = items.into_iter();
409        dst.reserve(items.len());
410        let mut deferred = Vec::with_capacity(items.len());
411        for item in items {
412            enc.encode(item, dst)?;
413            deferred.push(enc.take_deferred());
414        }
415        if deferred.iter().any(Option::is_some) {
416            Ok(Some(Box::new(move |w, path| {
417                Box::pin(handle_deferred(w, deferred, path))
418            })))
419        } else {
420            Ok(None)
421        }
422    }
423
424    /// Encode an iterator of value references
425    #[instrument(level = "trace", skip(items, enc))]
426    fn encode_iter_ref<'a, I>(
427        items: I,
428        enc: &mut Self::Encoder,
429        dst: &mut BytesMut,
430    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
431    where
432        I: IntoIterator<Item = &'a Self>,
433        I::IntoIter: ExactSizeIterator,
434        T: crate::Index<T> + Send + Sync + 'static,
435        Self::Encoder: tokio_util::codec::Encoder<&'a Self>,
436    {
437        let items = items.into_iter();
438        dst.reserve(items.len());
439        let mut deferred = Vec::with_capacity(items.len());
440        for item in items {
441            enc.encode(item, dst)?;
442            deferred.push(enc.take_deferred());
443        }
444        if deferred.iter().any(Option::is_some) {
445            Ok(Some(Box::new(move |w, path| {
446                Box::pin(handle_deferred(w, deferred, path))
447            })))
448        } else {
449            Ok(None)
450        }
451    }
452
453    /// Encode a list of owned values
454    #[instrument(level = "trace", skip(items, enc), fields(ty = "list"))]
455    fn encode_list_own(
456        items: Vec<Self>,
457        enc: &mut Self::Encoder,
458        dst: &mut BytesMut,
459    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
460    where
461        T: crate::Index<T> + Send + Sync + 'static,
462    {
463        let n = u32::try_from(items.len())
464            .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
465        dst.reserve(5 + items.len());
466        Leb128Encoder.encode(n, dst)?;
467        Self::encode_iter_own(items, enc, dst)
468    }
469
470    /// Encode a list of value references
471    #[instrument(level = "trace", skip(items, enc), fields(ty = "list"))]
472    fn encode_list_ref<'a>(
473        items: &'a [Self],
474        enc: &mut Self::Encoder,
475        dst: &mut BytesMut,
476    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
477    where
478        T: crate::Index<T> + Send + Sync + 'static,
479        Self::Encoder: tokio_util::codec::Encoder<&'a Self>,
480    {
481        let n = u32::try_from(items.len())
482            .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
483        dst.reserve(5 + items.len());
484        Leb128Encoder.encode(n, dst)?;
485        Self::encode_iter_ref(items, enc, dst)
486    }
487}
488
/// Defines value decoding
///
/// `T` parameterizes [`Incoming`], the argument type of the deferred operations that the
/// decoder may produce.
pub trait Decode<T>: Sized {
    /// Decoder used to decode value
    type Decoder: tokio_util::codec::Decoder<Item = Self>
        + Deferred<Incoming<T>>
        + Default
        + Send
        + 'static;
    /// Decoder used to decode lists of value
    ///
    /// Kept separate from [`Decode::Decoder`] so that an implementor can choose a
    /// dedicated list decoder (in this file, `u8` uses `ListDecoderU8`).
    type ListDecoder: tokio_util::codec::Decoder<Item = Vec<Self>> + Default + 'static;
}
500
501impl<T, W> Deferred<W> for OptionEncoder<T>
502where
503    T: Deferred<W>,
504{
505    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
506        self.0.take_deferred()
507    }
508}
509
/// An `Option<T>` is encoded with an [`OptionEncoder`] wrapping the encoder of `T`
impl<T, W> Encode<W> for Option<T>
where
    T: Encode<W>,
{
    type Encoder = OptionEncoder<T::Encoder>;
}
516
/// A borrowed `Option<T>` uses the same [`OptionEncoder`], provided the encoder of `T`
/// can also encode `&T`
impl<'a, T, W> Encode<W> for &'a Option<T>
where
    T: Encode<W>,
    T::Encoder: tokio_util::codec::Encoder<&'a T>,
{
    type Encoder = OptionEncoder<T::Encoder>;
}
524
525impl<T, W> Deferred<W> for OptionDecoder<T>
526where
527    T: Deferred<W> + Default,
528{
529    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
530        mem::take(self).into_inner().take_deferred()
531    }
532}
533
/// An `Option<T>` is decoded with an [`OptionDecoder`] wrapping the decoder of `T`;
/// lists of options go through the generic [`ListDecoder`]
impl<T, R> Decode<R> for Option<T>
where
    T: Decode<R>,
    R: crate::Index<R> + Send + 'static,
{
    type Decoder = OptionDecoder<T::Decoder>;
    type ListDecoder = ListDecoder<Self::Decoder, R>;
}
542
543impl<O, E, W> Deferred<W> for ResultEncoder<O, E>
544where
545    O: Deferred<W>,
546    E: Deferred<W>,
547{
548    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
549        match (self.ok.take_deferred(), self.err.take_deferred()) {
550            (None, None) => None,
551            (Some(ok), None) => Some(ok),
552            (None, Some(err)) => Some(err),
553            (Some(ok), Some(_)) => {
554                if cfg!(debug_assertions) {
555                    panic!("both `result::ok` and `result::err` deferred function set")
556                }
557                Some(ok)
558            }
559        }
560    }
561}
562
/// A `Result<O, E>` is encoded with a [`ResultEncoder`] built from the encoders of `O`
/// and `E`; both encoder error types must convert into [`std::io::Error`]
impl<O, E, W> Encode<W> for Result<O, E>
where
    O: Encode<W>,
    E: Encode<W>,
    std::io::Error: From<<O::Encoder as tokio_util::codec::Encoder<O>>::Error>,
    std::io::Error: From<<E::Encoder as tokio_util::codec::Encoder<E>>::Error>,
{
    type Encoder = ResultEncoder<O::Encoder, E::Encoder>;
}
572
/// A borrowed `Result<O, E>` uses the same [`ResultEncoder`], provided both encoders can
/// encode references and their error types convert into [`std::io::Error`]
impl<'a, O, E, W> Encode<W> for &'a Result<O, E>
where
    O: Encode<W>,
    O::Encoder: tokio_util::codec::Encoder<&'a O>,
    E: Encode<W>,
    E::Encoder: tokio_util::codec::Encoder<&'a E>,
    std::io::Error: From<<O::Encoder as tokio_util::codec::Encoder<&'a O>>::Error>,
    std::io::Error: From<<E::Encoder as tokio_util::codec::Encoder<&'a E>>::Error>,
{
    type Encoder = ResultEncoder<O::Encoder, E::Encoder>;
}
584
585impl<O, E, W> Deferred<W> for ResultDecoder<O, E>
586where
587    O: Deferred<W> + Default,
588    E: Deferred<W> + Default,
589{
590    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
591        let (mut ok, mut err) = mem::take(self).into_inner();
592        match (ok.take_deferred(), err.take_deferred()) {
593            (None, None) => None,
594            (Some(ok), None) => Some(ok),
595            (None, Some(err)) => Some(err),
596            (Some(ok), Some(_)) => {
597                if cfg!(debug_assertions) {
598                    panic!("both `result::ok` and `result::err` deferred function set")
599                }
600                Some(ok)
601            }
602        }
603    }
604}
605
/// A `Result<O, E>` is decoded with a [`ResultDecoder`] built from the decoders of `O`
/// and `E`; lists of results go through the generic [`ListDecoder`]
impl<O, E, R> Decode<R> for Result<O, E>
where
    O: Decode<R>,
    E: Decode<R>,
    R: crate::Index<R> + Send + 'static,
    std::io::Error: From<<O::Decoder as tokio_util::codec::Decoder>::Error>,
    std::io::Error: From<<E::Decoder as tokio_util::codec::Decoder>::Error>,
{
    type Decoder = ResultDecoder<O::Decoder, E::Decoder>;
    type ListDecoder = ListDecoder<Self::Decoder, R>;
}
617
/// Encoder for `list<T>`
///
/// The encoder itself only stores the deferred operation produced by the most recent
/// successful `encode` call; element encoders are created per call.
pub struct ListEncoder<W> {
    /// Deferred operation left over from the last encoded list, if any
    deferred: Option<DeferredFn<W>>,
}
622
623impl<W> Default for ListEncoder<W> {
624    fn default() -> Self {
625        Self { deferred: None }
626    }
627}
628
629impl<W> Deferred<W> for ListEncoder<W> {
630    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
631        self.deferred.take()
632    }
633}
634
635impl<T, W> tokio_util::codec::Encoder<Vec<T>> for ListEncoder<W>
636where
637    T: Encode<W>,
638    W: crate::Index<W> + Send + Sync + 'static,
639{
640    type Error = <T::Encoder as tokio_util::codec::Encoder<T>>::Error;
641
642    fn encode(&mut self, items: Vec<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
643        let mut enc = T::Encoder::default();
644        self.deferred = T::encode_list_own(items, &mut enc, dst)?;
645        Ok(())
646    }
647}
648
649impl<'a, T, W> tokio_util::codec::Encoder<&'a Vec<T>> for ListEncoder<W>
650where
651    T: Encode<W>,
652    T::Encoder: tokio_util::codec::Encoder<&'a T>,
653    W: crate::Index<W> + Send + Sync + 'static,
654{
655    type Error = <T::Encoder as tokio_util::codec::Encoder<&'a T>>::Error;
656
657    fn encode(&mut self, items: &'a Vec<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
658        let mut enc = T::Encoder::default();
659        self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
660        Ok(())
661    }
662}
663
664impl<'a, 'b, T, W> tokio_util::codec::Encoder<&'a &'b Vec<T>> for ListEncoder<W>
665where
666    T: Encode<W>,
667    T::Encoder: tokio_util::codec::Encoder<&'b T>,
668    W: crate::Index<W> + Send + Sync + 'static,
669{
670    type Error = <T::Encoder as tokio_util::codec::Encoder<&'b T>>::Error;
671
672    fn encode(&mut self, items: &'a &'b Vec<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
673        let mut enc = T::Encoder::default();
674        self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
675        Ok(())
676    }
677}
678
679impl<'a, T, W> tokio_util::codec::Encoder<&'a [T]> for ListEncoder<W>
680where
681    T: Encode<W>,
682    T::Encoder: tokio_util::codec::Encoder<&'a T>,
683    W: crate::Index<W> + Send + Sync + 'static,
684{
685    type Error = <T::Encoder as tokio_util::codec::Encoder<&'a T>>::Error;
686
687    fn encode(&mut self, items: &'a [T], dst: &mut BytesMut) -> Result<(), Self::Error> {
688        let mut enc = T::Encoder::default();
689        self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
690        Ok(())
691    }
692}
693
694impl<'a, 'b, T, W> tokio_util::codec::Encoder<&'a &'b [T]> for ListEncoder<W>
695where
696    T: Encode<W>,
697    T::Encoder: tokio_util::codec::Encoder<&'b T>,
698    W: crate::Index<W> + Send + Sync + 'static,
699{
700    type Error = <T::Encoder as tokio_util::codec::Encoder<&'b T>>::Error;
701
702    fn encode(&mut self, items: &'a &'b [T], dst: &mut BytesMut) -> Result<(), Self::Error> {
703        let mut enc = T::Encoder::default();
704        self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
705        Ok(())
706    }
707}
708
/// An owned `Vec<T>` is encoded with [`ListEncoder`]
impl<T, W> Encode<W> for Vec<T>
where
    T: Encode<W>,
    W: crate::Index<W> + Send + Sync + 'static,
{
    type Encoder = ListEncoder<W>;
}
716
/// A borrowed `Vec<T>` is encoded with [`ListEncoder`], provided the encoder of `T` can
/// also encode `&T`
impl<'a, T, W> Encode<W> for &'a Vec<T>
where
    T: Encode<W>,
    T::Encoder: tokio_util::codec::Encoder<&'a T>,
    W: crate::Index<W> + Send + Sync + 'static,
{
    type Encoder = ListEncoder<W>;
}
725
/// A slice of `T` is encoded with [`ListEncoder`], provided the encoder of `T` can also
/// encode `&T`
impl<'a, T, W> Encode<W> for &'a [T]
where
    T: Encode<W>,
    T::Encoder: tokio_util::codec::Encoder<&'a T>,
    W: crate::Index<W> + Send + Sync + 'static,
{
    type Encoder = ListEncoder<W>;
}
734
/// Decoder for `list<T>`
///
/// Decoding is resumable: the state kept here lets `decode` be called again once more
/// input is available.
pub struct ListDecoder<T, R>
where
    T: tokio_util::codec::Decoder,
{
    /// Decoder used for the individual elements
    dec: T,
    /// Elements decoded so far for the list in progress
    ret: Vec<T::Item>,
    /// Number of elements still to decode; `0` when no list is in progress
    cap: usize,
    /// Deferred operation taken from `dec` after each decoded element, in element order
    deferred: Vec<Option<DeferredFn<Incoming<R>>>>,
}
745
746impl<T, R> ListDecoder<T, R>
747where
748    T: tokio_util::codec::Decoder,
749{
750    /// Constructs a new list decoder
751    pub fn new(dec: T) -> Self {
752        Self {
753            dec,
754            ret: Vec::default(),
755            cap: 0,
756            deferred: vec![],
757        }
758    }
759}
760
761impl<T, R> Default for ListDecoder<T, R>
762where
763    T: tokio_util::codec::Decoder + Default,
764{
765    fn default() -> Self {
766        Self::new(T::default())
767    }
768}
769
770impl<T, R> Deferred<Incoming<R>> for ListDecoder<T, R>
771where
772    T: tokio_util::codec::Decoder,
773    R: crate::Index<R> + Send + Sync + 'static,
774{
775    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
776        let deferred = mem::take(&mut self.deferred);
777        if deferred.iter().any(Option::is_some) {
778            Some(Box::new(|r, path| {
779                Box::pin(handle_deferred(r, deferred, path))
780            }))
781        } else {
782            None
783        }
784    }
785}
786
787impl<T, R> tokio_util::codec::Decoder for ListDecoder<T, R>
788where
789    T: tokio_util::codec::Decoder + Deferred<Incoming<R>>,
790{
791    type Item = Vec<T::Item>;
792    type Error = T::Error;
793
794    #[instrument(level = "trace", skip(self), fields(ty = "list"))]
795    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
796        if self.cap == 0 {
797            let Some(len) = Leb128DecoderU32.decode(src)? else {
798                return Ok(None);
799            };
800            if len == 0 {
801                return Ok(Some(Vec::default()));
802            }
803            let len = len
804                .try_into()
805                .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
806            self.ret = Vec::with_capacity(len);
807            self.deferred = Vec::with_capacity(len);
808            self.cap = len;
809        }
810        while self.cap > 0 {
811            let Some(v) = self.dec.decode(src)? else {
812                return Ok(None);
813            };
814            self.ret.push(v);
815            self.deferred.push(self.dec.take_deferred());
816            self.cap -= 1;
817        }
818        Ok(Some(mem::take(&mut self.ret)))
819    }
820}
821
/// A `Vec<T>` is decoded with the list decoder chosen by `T`; lists of lists go through
/// the generic [`ListDecoder`] wrapped around that decoder
impl<T, R> Decode<R> for Vec<T>
where
    T: Decode<R> + Send,
    T::ListDecoder: Deferred<Incoming<R>> + Send,
    R: crate::Index<R> + Send + 'static,
{
    type Decoder = T::ListDecoder;
    type ListDecoder = ListDecoder<Self::Decoder, R>;
}
831
// Implements `Encode` for `$ty` and `&$ty`, and `Decode` for `$ty`, all backed by the
// codec `$codec`. The iterator encoders are overridden to encode item by item and always
// return `Ok(None)`, i.e. no deferred operation. Lists of `$ty` are decoded with
// `CoreVecDecoder<$codec>`.
macro_rules! impl_copy_codec {
    ($ty:ty, $codec:tt) => {
        impl<W> Encode<W> for $ty {
            type Encoder = $codec;

            #[instrument(level = "trace", skip(items))]
            fn encode_iter_own<I>(
                items: I,
                enc: &mut Self::Encoder,
                dst: &mut BytesMut,
            ) -> Result<
                Option<DeferredFn<W>>,
                <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error,
            >
            where
                I: IntoIterator<Item = Self>,
                I::IntoIter: ExactSizeIterator,
            {
                let mut values = items.into_iter();
                dst.reserve(values.len());
                values.try_for_each(|value| enc.encode(value, dst))?;
                Ok(None)
            }

            #[instrument(level = "trace", skip(items))]
            fn encode_iter_ref<'a, I>(
                items: I,
                enc: &mut Self::Encoder,
                dst: &mut BytesMut,
            ) -> Result<
                Option<DeferredFn<W>>,
                <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error,
            >
            where
                I: IntoIterator<Item = &'a Self>,
                I::IntoIter: ExactSizeIterator,
            {
                let mut values = items.into_iter();
                dst.reserve(values.len());
                // Items are copied out of the reference and encoded by value
                values.try_for_each(|value| enc.encode(*value, dst))?;
                Ok(None)
            }
        }

        impl<'b, W> Encode<W> for &'b $ty {
            type Encoder = $codec;

            #[instrument(level = "trace", skip(items))]
            fn encode_iter_own<I>(
                items: I,
                enc: &mut Self::Encoder,
                dst: &mut BytesMut,
            ) -> Result<
                Option<DeferredFn<W>>,
                <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error,
            >
            where
                I: IntoIterator<Item = Self>,
                I::IntoIter: ExactSizeIterator,
            {
                let mut values = items.into_iter();
                dst.reserve(values.len());
                // Items are copied out of the reference and encoded by value
                values.try_for_each(|value| enc.encode(*value, dst))?;
                Ok(None)
            }

            #[instrument(level = "trace", skip(items))]
            fn encode_iter_ref<'a, I>(
                items: I,
                enc: &mut Self::Encoder,
                dst: &mut BytesMut,
            ) -> Result<
                Option<DeferredFn<W>>,
                <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error,
            >
            where
                I: IntoIterator<Item = &'a Self>,
                I::IntoIter: ExactSizeIterator,
                'b: 'a,
            {
                let mut values = items.into_iter();
                dst.reserve(values.len());
                values.try_for_each(|value| enc.encode(value, dst))?;
                Ok(None)
            }
        }

        impl<R> Decode<R> for $ty {
            type Decoder = $codec;
            type ListDecoder = CoreVecDecoder<Self::Decoder>;
        }
    };
}
933
// Primitive types and the codec each one is encoded and decoded with.
// `u8` is not listed here; it has dedicated impls in this file.
impl_copy_codec!(bool, BoolCodec);
impl_copy_codec!(i8, S8Codec);
impl_copy_codec!(i16, S16Codec);
impl_copy_codec!(u16, U16Codec);
impl_copy_codec!(i32, S32Codec);
impl_copy_codec!(u32, U32Codec);
impl_copy_codec!(i64, S64Codec);
impl_copy_codec!(u64, U64Codec);
impl_copy_codec!(f32, F32Codec);
impl_copy_codec!(f64, F64Codec);
impl_copy_codec!(char, Utf8Codec);
945
946impl<T> Encode<T> for u8 {
947    type Encoder = U8Codec;
948
949    #[instrument(level = "trace", skip(items))]
950    fn encode_iter_own<I>(
951        items: I,
952        enc: &mut Self::Encoder,
953        dst: &mut BytesMut,
954    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
955    where
956        I: IntoIterator<Item = Self>,
957        I::IntoIter: ExactSizeIterator,
958    {
959        let items = items.into_iter();
960        dst.reserve(items.len());
961        dst.extend(items);
962        Ok(None)
963    }
964
965    #[instrument(level = "trace", skip(items))]
966    fn encode_iter_ref<'a, I>(
967        items: I,
968        enc: &mut Self::Encoder,
969        dst: &mut BytesMut,
970    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
971    where
972        I: IntoIterator<Item = &'a Self>,
973        I::IntoIter: ExactSizeIterator,
974    {
975        let items = items.into_iter();
976        dst.reserve(items.len());
977        dst.extend(items);
978        Ok(None)
979    }
980
981    #[instrument(level = "trace", skip(items), fields(ty = "list<u8>"))]
982    fn encode_list_own(
983        items: Vec<Self>,
984        enc: &mut Self::Encoder,
985        dst: &mut BytesMut,
986    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
987    {
988        CoreVecEncoderBytes.encode(items, dst)?;
989        Ok(None)
990    }
991
992    #[instrument(level = "trace", skip(items), fields(ty = "list<u8>"))]
993    fn encode_list_ref<'a>(
994        items: &'a [Self],
995        enc: &mut Self::Encoder,
996        dst: &mut BytesMut,
997    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
998    where
999        Self::Encoder: tokio_util::codec::Encoder<&'a Self>,
1000    {
1001        CoreVecEncoderBytes.encode(items, dst)?;
1002        Ok(None)
1003    }
1004}
1005
1006impl<'b, T> Encode<T> for &'b u8 {
1007    type Encoder = U8Codec;
1008
1009    #[instrument(level = "trace", skip(items))]
1010    fn encode_iter_own<I>(
1011        items: I,
1012        enc: &mut Self::Encoder,
1013        dst: &mut BytesMut,
1014    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
1015    where
1016        I: IntoIterator<Item = Self>,
1017        I::IntoIter: ExactSizeIterator,
1018    {
1019        let items = items.into_iter();
1020        dst.reserve(items.len());
1021        dst.extend(items);
1022        Ok(None)
1023    }
1024
1025    #[instrument(level = "trace", skip(items))]
1026    fn encode_iter_ref<'a, I>(
1027        items: I,
1028        enc: &mut Self::Encoder,
1029        dst: &mut BytesMut,
1030    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
1031    where
1032        I: IntoIterator<Item = &'a Self>,
1033        I::IntoIter: ExactSizeIterator,
1034        'b: 'a,
1035    {
1036        let items = items.into_iter();
1037        dst.reserve(items.len());
1038        dst.extend(items.map(|b| **b));
1039        Ok(None)
1040    }
1041}
1042
1043/// Decoder for `list<u8>`
1044#[derive(Debug, Default)]
1045#[repr(transparent)]
1046pub struct ListDecoderU8(CoreVecDecoderBytes);
1047
1048impl tokio_util::codec::Decoder for ListDecoderU8 {
1049    type Item = Vec<u8>;
1050    type Error = <CoreVecDecoderBytes as tokio_util::codec::Decoder>::Error;
1051
1052    #[instrument(level = "trace", skip(self), fields(ty = "list<u8>"))]
1053    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1054        let Some(buf) = self.0.decode(src)? else {
1055            return Ok(None);
1056        };
1057        Ok(Some(buf.into()))
1058    }
1059}
1060
1061impl<R> Decode<R> for u8 {
1062    type Decoder = U8Codec;
1063    type ListDecoder = ListDecoderU8;
1064}
1065
// String-like values, owned or borrowed, are all encoded with `CoreNameEncoder`.
impl<W> Encode<W> for &str {
    type Encoder = CoreNameEncoder;
}

// Reference-to-reference variant of the `&str` implementation.
impl<W> Encode<W> for &&str {
    type Encoder = CoreNameEncoder;
}

// Owned strings use the same encoder as string slices.
impl<W> Encode<W> for String {
    type Encoder = CoreNameEncoder;
}

// Borrowed `String`s use the same encoder as string slices.
impl<W> Encode<W> for &String {
    type Encoder = CoreNameEncoder;
}

// Strings are decoded with `CoreNameDecoder`; lists of strings wrap it in `CoreVecDecoder`.
impl<R> Decode<R> for String {
    type Decoder = CoreNameDecoder;
    type ListDecoder = CoreVecDecoder<Self::Decoder>;
}
1086
// Owned byte buffers are encoded with `CoreVecEncoderBytes`.
impl<W> Encode<W> for Bytes {
    type Encoder = CoreVecEncoderBytes;
}

// Borrowed byte buffers use the same encoder as owned ones.
impl<W> Encode<W> for &Bytes {
    type Encoder = CoreVecEncoderBytes;
}

// Byte buffers are decoded with `CoreVecDecoderBytes`; lists of buffers wrap it in
// `CoreVecDecoder`.
impl<R> Decode<R> for Bytes {
    type Decoder = CoreVecDecoderBytes;
    type ListDecoder = CoreVecDecoder<Self::Decoder>;
}
1099
1100/// Encoder for `resource` types
1101#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
1102#[repr(transparent)]
1103pub struct ResourceEncoder;
1104
1105impl<T: ?Sized> tokio_util::codec::Encoder<ResourceOwn<T>> for ResourceEncoder {
1106    type Error = std::io::Error;
1107
1108    #[instrument(level = "trace", skip(self, item), ret, fields(ty = "own"))]
1109    fn encode(&mut self, item: ResourceOwn<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1110        CoreVecEncoderBytes.encode(item.repr, dst)
1111    }
1112}
1113
1114impl<T: ?Sized> tokio_util::codec::Encoder<&ResourceOwn<T>> for ResourceEncoder {
1115    type Error = std::io::Error;
1116
1117    #[instrument(level = "trace", skip(self, item), ret, fields(ty = "own"))]
1118    fn encode(&mut self, item: &ResourceOwn<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1119        CoreVecEncoderBytes.encode(&item.repr, dst)
1120    }
1121}
1122
1123impl<T: ?Sized, W> Encode<W> for ResourceOwn<T> {
1124    type Encoder = ResourceEncoder;
1125}
1126
1127impl<T: ?Sized, W> Encode<W> for &ResourceOwn<T> {
1128    type Encoder = ResourceEncoder;
1129}
1130
1131impl<T: ?Sized> tokio_util::codec::Encoder<ResourceBorrow<T>> for ResourceEncoder {
1132    type Error = std::io::Error;
1133
1134    #[instrument(level = "trace", skip(self, item), ret, fields(ty = "borrow"))]
1135    fn encode(&mut self, item: ResourceBorrow<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1136        CoreVecEncoderBytes.encode(item.repr, dst)
1137    }
1138}
1139
1140impl<T: ?Sized> tokio_util::codec::Encoder<&ResourceBorrow<T>> for ResourceEncoder {
1141    type Error = std::io::Error;
1142
1143    #[instrument(level = "trace", skip(self, item), ret, fields(ty = "borrow"))]
1144    fn encode(&mut self, item: &ResourceBorrow<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1145        CoreVecEncoderBytes.encode(&item.repr, dst)
1146    }
1147}
1148
1149impl<T: ?Sized, W> Encode<W> for ResourceBorrow<T> {
1150    type Encoder = ResourceEncoder;
1151}
1152
1153impl<T: ?Sized, W> Encode<W> for &ResourceBorrow<T> {
1154    type Encoder = ResourceEncoder;
1155}
1156
1157/// Decoder for borrowed resource types
1158#[derive(Debug)]
1159#[repr(transparent)]
1160pub struct ResourceBorrowDecoder<T: ?Sized> {
1161    dec: CoreVecDecoderBytes,
1162    _ty: PhantomData<T>,
1163}
1164
1165impl<T: ?Sized> Default for ResourceBorrowDecoder<T> {
1166    fn default() -> Self {
1167        Self {
1168            dec: CoreVecDecoderBytes::default(),
1169            _ty: PhantomData,
1170        }
1171    }
1172}
1173
1174impl<R, T: ?Sized> Deferred<Incoming<R>> for ResourceBorrowDecoder<T> {
1175    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
1176        None
1177    }
1178}
1179
1180impl<R, T: ?Sized> Deferred<Incoming<R>> for CoreVecDecoder<ResourceBorrowDecoder<T>> {
1181    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
1182        None
1183    }
1184}
1185
1186impl<R, T: ?Sized + Send + 'static> Decode<R> for ResourceBorrow<T> {
1187    type Decoder = ResourceBorrowDecoder<T>;
1188    type ListDecoder = CoreVecDecoder<Self::Decoder>;
1189}
1190
1191impl<T: ?Sized> tokio_util::codec::Decoder for ResourceBorrowDecoder<T> {
1192    type Item = ResourceBorrow<T>;
1193    type Error = std::io::Error;
1194
1195    #[instrument(level = "trace", skip(self), fields(ty = "borrow"))]
1196    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1197        let repr = self.dec.decode(src)?;
1198        Ok(repr.map(Self::Item::from))
1199    }
1200}
1201
1202/// Decoder for owned resource types
1203#[derive(Debug)]
1204#[repr(transparent)]
1205pub struct ResourceOwnDecoder<T: ?Sized> {
1206    dec: CoreVecDecoderBytes,
1207    _ty: PhantomData<T>,
1208}
1209
1210impl<T: ?Sized> Default for ResourceOwnDecoder<T> {
1211    fn default() -> Self {
1212        Self {
1213            dec: CoreVecDecoderBytes::default(),
1214            _ty: PhantomData,
1215        }
1216    }
1217}
1218
1219impl<R, T: ?Sized> Deferred<Incoming<R>> for ResourceOwnDecoder<T> {
1220    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
1221        None
1222    }
1223}
1224
1225impl<R, T: ?Sized> Deferred<Incoming<R>> for CoreVecDecoder<ResourceOwnDecoder<T>> {
1226    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
1227        None
1228    }
1229}
1230
1231impl<R, T: ?Sized + Send + 'static> Decode<R> for ResourceOwn<T> {
1232    type Decoder = ResourceOwnDecoder<T>;
1233    type ListDecoder = CoreVecDecoder<Self::Decoder>;
1234}
1235
1236impl<T: ?Sized> tokio_util::codec::Decoder for ResourceOwnDecoder<T> {
1237    type Item = ResourceOwn<T>;
1238    type Error = std::io::Error;
1239
1240    #[instrument(level = "trace", skip(self), fields(ty = "own"))]
1241    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1242        let repr = self.dec.decode(src)?;
1243        Ok(repr.map(Self::Item::from))
1244    }
1245}
1246
1247/// Codec for `()`
1248#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
1249#[repr(transparent)]
1250pub struct UnitCodec;
1251
1252impl tokio_util::codec::Encoder<()> for UnitCodec {
1253    type Error = std::io::Error;
1254
1255    #[instrument(level = "trace", skip(self), ret)]
1256    fn encode(&mut self, (): (), dst: &mut BytesMut) -> std::io::Result<()> {
1257        Ok(())
1258    }
1259}
1260
1261impl tokio_util::codec::Encoder<&()> for UnitCodec {
1262    type Error = std::io::Error;
1263
1264    #[instrument(level = "trace", skip(self), ret)]
1265    fn encode(&mut self, (): &(), dst: &mut BytesMut) -> std::io::Result<()> {
1266        Ok(())
1267    }
1268}
1269
1270impl tokio_util::codec::Decoder for UnitCodec {
1271    type Item = ();
1272    type Error = std::io::Error;
1273
1274    #[instrument(level = "trace", skip(self))]
1275    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1276        Ok(Some(()))
1277    }
1278}
1279
/// Marker trait for [Encode] tuple types
///
/// Declares no items of its own; it only tags [Encode] implementors that are tuples.
pub trait TupleEncode<W>: Encode<W> {}

/// Marker trait for [Decode] tuple types
///
/// Declares no items of its own; it only tags [Decode] implementors that are tuples.
pub trait TupleDecode<R>: Decode<R> {}

// The unit type is handled by `UnitCodec`.
impl<W> Encode<W> for () {
    type Encoder = UnitCodec;
}

// `()` is tagged as a tuple type for encoding.
impl<W> TupleEncode<W> for () {}

// Decoding `()` uses `UnitCodec`; lists of `()` wrap it in `CoreVecDecoder`.
impl<R> Decode<R> for () {
    type Decoder = UnitCodec;
    type ListDecoder = CoreVecDecoder<Self::Decoder>;
}

// `()` is tagged as a tuple type for decoding.
impl<R> TupleDecode<R> for () {}
1298
// Generates the codec plumbing for one tuple arity.
//
// The macro takes four `;`-separated identifier lists of equal length:
// * `$vn` - value binding names (accepted by the matcher but not used in the expansion),
// * `$vt` - generic type parameters naming the tuple element types,
// * `$cn` - binding names for the per-element codecs,
// * `$ct` - generic type parameters naming the per-element codec types.
//
// For the tuple `($vt,+,)` it emits `Deferred` impls for the matching `TupleEncoder` and
// `TupleDecoder`, `Encode`/`TupleEncode` for the owned tuple, `Encode` for a reference to the
// tuple, and `Decode`/`TupleDecode` for the owned tuple.
macro_rules! impl_tuple_codec {
    ($($vn:ident),+; $($vt:ident),+; $($cn:ident),+; $($ct:ident),+) => {
        impl<W, $($ct),+> Deferred<W> for TupleEncoder::<($($ct),+,)>
        where
            W: crate::Index<W> + Send + Sync + 'static,
            $($ct: Deferred<W> + Default + 'static),+
        {
            fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
                // Swap the encoder for its default value to obtain the element codecs by value.
                let Self(($(mut $cn),+,)) = mem::take(self);
                // One slot per tuple element, in element order.
                let deferred = [ $($cn.take_deferred()),+ ];
                // Only produce a deferred closure if at least one element has pending work.
                if deferred.iter().any(Option::is_some) {
                    Some(Box::new(|r, path| Box::pin(handle_deferred(r, deferred, path))))
                } else {
                    None
                }
            }
        }

        // Owned tuples: every element encoder must report the same error type `E`.
        impl<W, E, $($vt),+> Encode<W> for ($($vt),+,)
        where
            W: crate::Index<W> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Encode<W>,
                $vt::Encoder: tokio_util::codec::Encoder<$vt, Error = E> + 'static,
            )+
        {
            type Encoder = TupleEncoder::<($($vt::Encoder),+,)>;
        }

        impl<W, E, $($vt),+> TupleEncode<W> for ($($vt),+,)
        where
            W: crate::Index<W> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Encode<W>,
                $vt::Encoder: tokio_util::codec::Encoder<$vt, Error = E> + 'static,
            )+
        {
        }

        // Borrowed tuples: the element encoders must accept `&'a` element references.
        impl<'a, W, E, $($vt),+> Encode<W> for &'a ($($vt),+,)
        where
            W: crate::Index<W> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Encode<W>,
                $vt::Encoder: tokio_util::codec::Encoder<&'a $vt, Error = E> + 'static,
            )+
        {
            type Encoder = TupleEncoder::<($($vt::Encoder),+,)>;
        }

        impl<R, $($vt),+> Deferred<Incoming<R>> for TupleDecoder::<($($vt::Decoder),+,), ($(Option<$vt>),+,)>
        where
            R: crate::Index<R> + Send + Sync + 'static,
            $($vt: Decode<R>),+
        {
            fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
                // Swap the decoder for its default value to obtain the element decoders by value.
                let ($(mut $cn),+,) = mem::take(self).into_inner();
                // One slot per tuple element, in element order.
                let deferred = [ $($cn.take_deferred()),+ ];
                // Only produce a deferred closure if at least one element has pending work.
                if deferred.iter().any(Option::is_some) {
                    Some(Box::new(|r, path| Box::pin(handle_deferred(r, deferred, path))))
                } else {
                    None
                }
            }
        }

        // Owned tuples: every element decoder must report the same error type `E`.
        impl<R, E, $($vt),+> Decode<R> for ($($vt),+,)
        where
            R: crate::Index<R> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Decode<R> + Send + 'static,
                $vt::Decoder: tokio_util::codec::Decoder<Error = E> + Send + 'static,
            )+
        {
            type Decoder = TupleDecoder::<($($vt::Decoder),+,), ($(Option<$vt>),+,)>;
            type ListDecoder = ListDecoder<Self::Decoder, R>;
        }

        impl<R, E, $($vt),+> TupleDecode<R> for ($($vt),+,)
        where
            R: crate::Index<R> + Send + Sync + 'static,
            E: From<std::io::Error>,
            $(
                $vt: Decode<R> + Send + 'static,
                $vt::Decoder: tokio_util::codec::Decoder<Error = E> + Send + 'static,
            )+
        {
        }
    };
}
1393
// Tuple codec implementations for arities 1 through 16, one macro invocation per arity.
// Each invocation passes four identifier lists: value names, value types, codec names and
// codec types.
impl_tuple_codec!(
    v0;
    V0;
    c0;
    C0
);

impl_tuple_codec!(
    v0, v1;
    V0, V1;
    c0, c1;
    C0, C1
);

impl_tuple_codec!(
    v0, v1, v2;
    V0, V1, V2;
    c0, c1, c2;
    C0, C1, C2
);

impl_tuple_codec!(
    v0, v1, v2, v3;
    V0, V1, V2, V3;
    c0, c1, c2, c3;
    C0, C1, C2, C3
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4;
    V0, V1, V2, V3, V4;
    c0, c1, c2, c3, c4;
    C0, C1, C2, C3, C4
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5;
    V0, V1, V2, V3, V4, V5;
    c0, c1, c2, c3, c4, c5;
    C0, C1, C2, C3, C4, C5
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6;
    V0, V1, V2, V3, V4, V5, V6;
    c0, c1, c2, c3, c4, c5, c6;
    C0, C1, C2, C3, C4, C5, C6
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7;
    V0, V1, V2, V3, V4, V5, V6, V7;
    c0, c1, c2, c3, c4, c5, c6, c7;
    C0, C1, C2, C3, C4, C5, C6, C7
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8;
    V0, V1, V2, V3, V4, V5, V6, V7, V8;
    c0, c1, c2, c3, c4, c5, c6, c7, c8;
    C0, C1, C2, C3, C4, C5, C6, C7, C8
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14
);

impl_tuple_codec!(
    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15;
    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14, V15;
    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15;
    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15
);
1505
1506/// Encoder for `future<T>`
1507pub struct FutureEncoder<W> {
1508    deferred: Option<DeferredFn<W>>,
1509}
1510
1511impl<W> Default for FutureEncoder<W> {
1512    fn default() -> Self {
1513        Self { deferred: None }
1514    }
1515}
1516
1517impl<W> Deferred<W> for FutureEncoder<W> {
1518    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
1519        self.deferred.take()
1520    }
1521}
1522
1523impl<T, W, Fut> tokio_util::codec::Encoder<Fut> for FutureEncoder<W>
1524where
1525    T: Encode<W>,
1526    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1527    Fut: Future<Output = T> + Send + 'static,
1528    std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
1529{
1530    type Error = std::io::Error;
1531
1532    #[instrument(level = "trace", skip(self, item), fields(ty = "future"))]
1533    fn encode(&mut self, item: Fut, dst: &mut BytesMut) -> std::io::Result<()> {
1534        // TODO: Check if future is resolved
1535        dst.reserve(1);
1536        dst.put_u8(0x00);
1537        let span = Span::current();
1538        self.deferred = Some(Box::new(|mut w, path| {
1539            Box::pin(
1540                async move {
1541                    if !path.is_empty() {
1542                        w = w.index(&path).map_err(std::io::Error::other)?;
1543                    };
1544                    let item = item.await;
1545                    let mut enc = T::Encoder::default();
1546                    let mut buf = BytesMut::default();
1547                    enc.encode(item, &mut buf)?;
1548                    w.write_all(&buf).await?;
1549                    if let Some(f) = enc.take_deferred() {
1550                        f(w, Vec::default()).await
1551                    } else {
1552                        Ok(())
1553                    }
1554                }
1555                .instrument(span),
1556            )
1557        }));
1558        Ok(())
1559    }
1560}
1561
1562impl<T, W> Encode<W> for Pin<Box<dyn Future<Output = T> + Send>>
1563where
1564    T: Encode<W> + 'static,
1565    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1566    std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
1567{
1568    type Encoder = FutureEncoder<W>;
1569}
1570
1571/// Decoder for `future<T>`
1572pub struct FutureDecoder<T, R>
1573where
1574    T: Decode<R>,
1575{
1576    dec: OptionDecoder<T::Decoder>,
1577    deferred: Option<DeferredFn<Incoming<R>>>,
1578    _ty: PhantomData<T>,
1579}
1580
1581impl<T, R> Default for FutureDecoder<T, R>
1582where
1583    T: Decode<R>,
1584{
1585    fn default() -> Self {
1586        Self {
1587            dec: OptionDecoder::default(),
1588            deferred: None,
1589            _ty: PhantomData,
1590        }
1591    }
1592}
1593
1594impl<T, R> Deferred<Incoming<R>> for FutureDecoder<T, R>
1595where
1596    T: Decode<R>,
1597{
1598    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
1599        self.deferred.take()
1600    }
1601}
1602
1603impl<T, R> tokio_util::codec::Decoder for FutureDecoder<T, R>
1604where
1605    T: Decode<R> + Send + 'static,
1606    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
1607    std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
1608{
1609    type Item = Pin<Box<dyn Future<Output = T> + Send>>;
1610    type Error = <T::Decoder as tokio_util::codec::Decoder>::Error;
1611
1612    #[instrument(level = "trace", skip(self), fields(ty = "future"))]
1613    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1614        let Some(item) = self.dec.decode(src)? else {
1615            return Ok(None);
1616        };
1617        if let Some(item) = item {
1618            self.deferred = self.dec.take_deferred();
1619            return Ok(Some(Box::pin(async { item })));
1620        }
1621
1622        // future is pending
1623        let (tx, rx) = oneshot::channel();
1624        let dec = mem::take(&mut self.dec).into_inner();
1625        let span = Span::current();
1626        self.deferred = Some(Box::new(|mut r, path| {
1627            Box::pin(
1628                async move {
1629                    if !path.is_empty() {
1630                        r = r.index(&path).map_err(std::io::Error::other)?;
1631                    };
1632                    let mut dec = FramedRead::new(r, dec);
1633                    trace!(?path, "receiving future element");
1634                    let Some(item) = dec.next().await else {
1635                        return Err(std::io::ErrorKind::UnexpectedEof.into());
1636                    };
1637                    let item = item?;
1638                    if tx.send(item).is_err() {
1639                        debug!("future receiver closed, discard data");
1640                        return Ok(());
1641                    }
1642                    if let Some(rx) = dec.decoder_mut().take_deferred() {
1643                        let buf = mem::take(dec.read_buffer_mut());
1644                        let mut r = dec.into_inner();
1645                        if !r.buffer.is_empty() {
1646                            r.buffer.unsplit(buf);
1647                        } else {
1648                            r.buffer = buf;
1649                        }
1650                        rx(r, Vec::default()).await?;
1651                    }
1652                    Ok(())
1653                }
1654                .instrument(span),
1655            )
1656        }));
1657        return Ok(Some(Box::pin(async {
1658            let Ok(ret) = rx.await else {
1659                error!("future I/O dropped");
1660                return pending().await;
1661            };
1662            ret
1663        })));
1664    }
1665}
1666
1667impl<T, R> Decode<R> for Pin<Box<dyn Future<Output = T> + Send>>
1668where
1669    T: Decode<R> + Send + 'static,
1670    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
1671    std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
1672{
1673    type Decoder = FutureDecoder<T, R>;
1674    type ListDecoder = ListDecoder<Self::Decoder, R>;
1675}
1676
1677/// Encoder for `stream<T>`
1678pub struct StreamEncoder<W> {
1679    deferred: Option<DeferredFn<W>>,
1680}
1681
1682impl<W> Default for StreamEncoder<W> {
1683    fn default() -> Self {
1684        Self { deferred: None }
1685    }
1686}
1687
1688impl<W> Deferred<W> for StreamEncoder<W> {
1689    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
1690        self.deferred.take()
1691    }
1692}
1693
1694impl<T, W, S> tokio_util::codec::Encoder<S> for StreamEncoder<W>
1695where
1696    T: Encode<W> + Send + 'static,
1697    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1698    S: Stream<Item = Vec<T>> + Send + Unpin + 'static,
1699    std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
1700{
1701    type Error = std::io::Error;
1702
1703    #[instrument(level = "trace", skip(self, items), fields(ty = "stream"))]
1704    fn encode(&mut self, mut items: S, dst: &mut BytesMut) -> std::io::Result<()> {
1705        // TODO: Check if stream is resolved
1706        dst.reserve(1);
1707        dst.put_u8(0x00);
1708        let span = Span::current();
1709        self.deferred = Some(Box::new(|mut w, path| {
1710            Box::pin(async move {
1711                if !path.is_empty() {
1712                    w = w.index(&path).map_err(std::io::Error::other)?;
1713                };
1714                let mut enc = T::Encoder::default();
1715                let mut buf = BytesMut::default();
1716                let mut tasks = JoinSet::new();
1717                let mut i = 0_u64;
1718                loop {
1719                    select! {
1720                        chunk = items.next() => {
1721                            let Some(chunk) = chunk else {
1722                                trace!("writing stream end");
1723                                buf.reserve(1);
1724                                buf.put_u8(0x00);
1725                                w.write_all(&buf).await?;
1726                                while let Some(res) = tasks.join_next().await {
1727                                    trace!(?res, "receiver task finished");
1728                                    res??;
1729                                }
1730                                return Ok(())
1731                            };
1732                            let n = u32::try_from(chunk.len()).map_err(|err| {
1733                                std::io::Error::new(std::io::ErrorKind::InvalidInput, err)
1734                            })?;
1735                            let end = i.checked_add(n.into()).ok_or_else(|| {
1736                                std::io::Error::new(
1737                                    std::io::ErrorKind::InvalidInput,
1738                                    "stream element index would overflow u64",
1739                                )
1740                            })?;
1741                            trace!(n, "encoding chunk length");
1742                            Leb128Encoder.encode(n, &mut buf)?;
1743                            trace!(i, buf = format!("{buf:02x?}"), "writing stream chunk items");
1744
1745                            buf.reserve(chunk.len());
1746                            for (i, item) in zip(i.., chunk) {
1747                                enc.encode(item, &mut buf)?;
1748                                if let Some(f) = enc.take_deferred() {
1749                                    let i = i
1750                                        .try_into()
1751                                        .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
1752                                    let w = w.index(&[i]).map_err(std::io::Error::other)?;
1753                                    trace!("spawning transmit task");
1754                                    tasks.spawn(f(w, Vec::default()));
1755                                }
1756                            }
1757                            i = end;
1758                        }
1759                        Some(res) = tasks.join_next() => {
1760                            trace!(?res, "receiver task finished");
1761                            res??;
1762                        }
1763                        res = w.write(&buf), if !buf.is_empty() => {
1764                            let n = res?;
1765                            trace!(?buf, n, "wrote bytes from buffer");
1766                            buf.advance(n);
1767                        }
1768                    }
1769                }
1770            }.instrument(span))
1771        }));
1772        Ok(())
1773    }
1774}
1775
1776impl<T, W> Encode<W> for Pin<Box<dyn Stream<Item = Vec<T>> + Send>>
1777where
1778    T: Encode<W> + Send + 'static,
1779    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1780    std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
1781{
1782    type Encoder = StreamEncoder<W>;
1783}
1784
1785/// Encoder for `stream<list<u8>>`
1786pub struct StreamEncoderBytes<W> {
1787    deferred: Option<DeferredFn<W>>,
1788}
1789
1790impl<W> Default for StreamEncoderBytes<W> {
1791    fn default() -> Self {
1792        Self { deferred: None }
1793    }
1794}
1795
1796impl<W> Deferred<W> for StreamEncoderBytes<W> {
1797    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
1798        self.deferred.take()
1799    }
1800}
1801
1802impl<W, S> tokio_util::codec::Encoder<S> for StreamEncoderBytes<W>
1803where
1804    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1805    S: Stream<Item = Bytes> + Send + Unpin + 'static,
1806{
1807    type Error = std::io::Error;
1808
1809    #[instrument(level = "trace", skip(self, items), fields(ty = "stream<u8>"))]
1810    fn encode(&mut self, mut items: S, dst: &mut BytesMut) -> std::io::Result<()> {
1811        // TODO: Check if reader is resolved
1812        dst.reserve(1);
1813        dst.put_u8(0x00);
1814        self.deferred = Some(Box::new(|mut w, path| {
1815            Box::pin(async move {
1816                if !path.is_empty() {
1817                    w = w.index(&path).map_err(std::io::Error::other)?;
1818                };
1819                let mut buf = BytesMut::default();
1820                loop {
1821                    select! {
1822                        chunk = items.next() => {
1823                            let Some(chunk) = chunk else {
1824                                trace!("writing stream end");
1825                                buf.reserve(1);
1826                                buf.put_u8(0x00);
1827                                return w.write_all(&buf).await
1828                            };
1829                            let n = u32::try_from(chunk.len()).map_err(|err| {
1830                                std::io::Error::new(std::io::ErrorKind::InvalidInput, err)
1831                            })?;
1832                            trace!(n, "encoding chunk length");
1833                            Leb128Encoder.encode(n, &mut buf)?;
1834                            buf.extend_from_slice(&chunk);
1835                        }
1836                        res = w.write(&buf), if !buf.is_empty() => {
1837                            let n = res?;
1838                            buf.advance(n);
1839                        }
1840                    }
1841                }
1842            })
1843        }));
1844        Ok(())
1845    }
1846}
1847
1848impl<W> Encode<W> for Pin<Box<dyn Stream<Item = Bytes> + Send>>
1849where
1850    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1851{
1852    type Encoder = StreamEncoderBytes<W>;
1853}
1854
1855/// Encoder for `stream<list<u8>>` with [`AsyncRead`] support
1856pub struct StreamEncoderRead<W> {
1857    deferred: Option<DeferredFn<W>>,
1858}
1859
1860impl<W> Default for StreamEncoderRead<W> {
1861    fn default() -> Self {
1862        Self { deferred: None }
1863    }
1864}
1865
1866impl<W> Deferred<W> for StreamEncoderRead<W> {
1867    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
1868        self.deferred.take()
1869    }
1870}
1871
1872impl<W, S> tokio_util::codec::Encoder<S> for StreamEncoderRead<W>
1873where
1874    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1875    S: AsyncRead + Send + Unpin + 'static,
1876{
1877    type Error = std::io::Error;
1878
1879    #[instrument(level = "trace", skip(self, items), fields(ty = "stream<u8>"))]
1880    fn encode(&mut self, mut items: S, dst: &mut BytesMut) -> std::io::Result<()> {
1881        // TODO: Check if reader is resolved
1882        dst.reserve(1);
1883        dst.put_u8(0x00);
1884        self.deferred = Some(Box::new(|mut w, path| {
1885            Box::pin(async move {
1886                if !path.is_empty() {
1887                    w = w.index(&path).map_err(std::io::Error::other)?;
1888                };
1889                let mut buf = BytesMut::default();
1890                let mut chunk = BytesMut::default();
1891                loop {
1892                    select! {
1893                        res = items.read_buf(&mut chunk) => {
1894                            let n = res?;
1895                            if n == 0 {
1896                                trace!("writing stream end");
1897                                buf.reserve(1);
1898                                buf.put_u8(0x00);
1899                                return w.write_all(&buf).await
1900                            }
1901                            let n = u32::try_from(n).map_err(|err| {
1902                                std::io::Error::new(std::io::ErrorKind::InvalidInput, err)
1903                            })?;
1904                            trace!(n, "encoding chunk length");
1905                            Leb128Encoder.encode(n, &mut buf)?;
1906                            buf.extend_from_slice(&chunk);
1907                            chunk.clear();
1908                        }
1909                        res = w.write(&buf), if !buf.is_empty() => {
1910                            let n = res?;
1911                            buf.advance(n);
1912                        }
1913                    }
1914                }
1915            })
1916        }));
1917        Ok(())
1918    }
1919}
1920
1921impl<W> Encode<W> for Pin<Box<dyn AsyncRead + Send>>
1922where
1923    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1924{
1925    type Encoder = StreamEncoderRead<W>;
1926}
1927
1928impl<T, W> Encode<W> for std::io::Cursor<T>
1929where
1930    T: AsRef<[u8]> + Send + Unpin + 'static,
1931    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1932{
1933    type Encoder = StreamEncoderRead<W>;
1934}
1935
1936impl<W> Encode<W> for tokio::io::Empty
1937where
1938    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1939{
1940    type Encoder = StreamEncoderRead<W>;
1941}
1942
/// [`tokio::io::Stdin`] is encoded with [`StreamEncoderRead`].
#[cfg(feature = "io-std")]
impl<W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static> Encode<W>
    for tokio::io::Stdin
{
    type Encoder = StreamEncoderRead<W>;
}
1950
/// [`tokio::fs::File`] is encoded with [`StreamEncoderRead`].
#[cfg(feature = "fs")]
impl<W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static> Encode<W>
    for tokio::fs::File
{
    type Encoder = StreamEncoderRead<W>;
}
1958
/// [`tokio::net::TcpStream`] is encoded with [`StreamEncoderRead`].
#[cfg(feature = "net")]
impl<W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static> Encode<W>
    for tokio::net::TcpStream
{
    type Encoder = StreamEncoderRead<W>;
}
1966
/// [`tokio::net::UnixStream`] is encoded with [`StreamEncoderRead`].
#[cfg(all(unix, feature = "net"))]
impl<W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static> Encode<W>
    for tokio::net::UnixStream
{
    type Encoder = StreamEncoderRead<W>;
}
1974
/// [`tokio::net::unix::pipe::Receiver`] is encoded with [`StreamEncoderRead`].
#[cfg(all(unix, feature = "net"))]
impl<W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static> Encode<W>
    for tokio::net::unix::pipe::Receiver
{
    type Encoder = StreamEncoderRead<W>;
}
1982
/// Decoder for `stream<T>`
pub struct StreamDecoder<T, R>
where
    T: Decode<R>,
{
    /// Decodes the leading chunk of the stream as a list of `T`.
    dec: T::ListDecoder,
    /// Deferred read set by `decode` and handed out once through `take_deferred`.
    deferred: Option<DeferredFn<Incoming<R>>>,
    // Ties the decoder to its element type without storing a `T`.
    _ty: PhantomData<T>,
}
1992
1993impl<T, R> Default for StreamDecoder<T, R>
1994where
1995    T: Decode<R>,
1996{
1997    fn default() -> Self {
1998        Self {
1999            dec: T::ListDecoder::default(),
2000            deferred: None,
2001            _ty: PhantomData,
2002        }
2003    }
2004}
2005
2006impl<T, R> Deferred<Incoming<R>> for StreamDecoder<T, R>
2007where
2008    T: Decode<R>,
2009{
2010    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
2011        self.deferred.take()
2012    }
2013}
2014
2015#[instrument(level = "trace", skip(dec, r, tx), ret)]
2016async fn handle_deferred_stream<C, T, R>(
2017    dec: C,
2018    mut r: Incoming<R>,
2019    path: Vec<usize>,
2020    tx: mpsc::Sender<Vec<T>>,
2021) -> std::io::Result<()>
2022where
2023    C: tokio_util::codec::Decoder<Item = T> + Deferred<Incoming<R>>,
2024    R: AsyncRead + crate::Index<R> + Send + Unpin + 'static,
2025    std::io::Error: From<C::Error>,
2026{
2027    let dec = ListDecoder::new(dec);
2028    if !path.is_empty() {
2029        r = r.index(&path).map_err(std::io::Error::other)?;
2030    };
2031    let mut framed = FramedRead::new(r, dec);
2032    let mut tasks = JoinSet::new();
2033    let mut i = 0_usize;
2034    loop {
2035        trace!("receiving pending stream chunk");
2036        select! {
2037            Some(chunk) = framed.next() => {
2038                let chunk = chunk?;
2039                if chunk.is_empty() {
2040                    trace!("received stream end");
2041                    while let Some(res) = tasks.join_next().await {
2042                        res??;
2043                    }
2044                    return Ok(())
2045                }
2046                let end = i.checked_add(chunk.len()).ok_or_else(|| {
2047                    std::io::Error::new(
2048                        std::io::ErrorKind::InvalidInput,
2049                        "stream element index would overflow usize",
2050                    )
2051                })?;
2052                trace!(i, end, "received stream chunk");
2053                if tx.send(chunk).await.is_err() {
2054                    debug!("stream receiver closed, discard data");
2055                    return Ok(())
2056                }
2057                for (i, deferred) in zip(i.., mem::take(&mut framed.decoder_mut().deferred)) {
2058                    if let Some(deferred) = deferred {
2059                        let r = framed.get_ref().index(&[i]).map_err(std::io::Error::other)?;
2060                        trace!("spawning receive task");
2061                        tasks.spawn(deferred(r, Vec::default()));
2062                    }
2063                }
2064                i = end;
2065            },
2066            Some(res) = tasks.join_next() => {
2067                trace!(?res, "receiver task finished");
2068                res??;
2069            }
2070            else => {
2071                return Ok(());
2072            }
2073        }
2074    }
2075}
2076
2077impl<T, R> tokio_util::codec::Decoder for StreamDecoder<T, R>
2078where
2079    T: Decode<R> + Send + 'static,
2080    T::ListDecoder: Deferred<Incoming<R>>,
2081    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2082    <T::Decoder as tokio_util::codec::Decoder>::Error: Send,
2083    std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
2084{
2085    type Item = Pin<Box<dyn Stream<Item = Vec<T>> + Send>>;
2086    type Error = <<T as Decode<R>>::ListDecoder as tokio_util::codec::Decoder>::Error;
2087
2088    #[instrument(level = "trace", skip(self), fields(ty = "stream"))]
2089    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
2090        let Some(chunk) = self.dec.decode(src)? else {
2091            return Ok(None);
2092        };
2093        if !chunk.is_empty() {
2094            self.deferred = self.dec.take_deferred();
2095            return Ok(Some(Box::pin(stream::iter([chunk]))));
2096        }
2097
2098        // stream is pending
2099        let (tx, rx) = mpsc::channel(128);
2100        self.deferred = Some(Box::new(|r, path| {
2101            Box::pin(
2102                async move { handle_deferred_stream(T::Decoder::default(), r, path, tx).await },
2103            )
2104        }));
2105        return Ok(Some(Box::pin(ReceiverStream::new(rx))));
2106    }
2107}
2108
2109impl<T, R> Decode<R> for Pin<Box<dyn Stream<Item = Vec<T>> + Send>>
2110where
2111    T: Decode<R> + Send + 'static,
2112    T::ListDecoder: Deferred<Incoming<R>> + Send,
2113    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2114    <T::Decoder as tokio_util::codec::Decoder>::Error: Send,
2115    std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
2116{
2117    type Decoder = StreamDecoder<T, R>;
2118    type ListDecoder = ListDecoder<Self::Decoder, R>;
2119}
2120
/// Decoder for `stream<list<u8>>`
pub struct StreamDecoderBytes<R> {
    /// Decodes the leading chunk; moved into the deferred read when the stream is pending.
    dec: CoreVecDecoderBytes,
    /// Deferred read set by `decode` and handed out once through `take_deferred`.
    deferred: Option<DeferredFn<Incoming<R>>>,
}
2126
2127impl<R> Default for StreamDecoderBytes<R> {
2128    fn default() -> Self {
2129        Self {
2130            dec: CoreVecDecoderBytes::default(),
2131            deferred: None,
2132        }
2133    }
2134}
2135
2136impl<R> Deferred<Incoming<R>> for StreamDecoderBytes<R> {
2137    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
2138        self.deferred.take()
2139    }
2140}
2141
2142impl<R> tokio_util::codec::Decoder for StreamDecoderBytes<R>
2143where
2144    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2145{
2146    type Item = Pin<Box<dyn Stream<Item = Bytes> + Send>>;
2147    type Error = std::io::Error;
2148
2149    #[instrument(level = "trace", skip(self), fields(ty = "stream<u8>"))]
2150    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
2151        let Some(chunk) = self.dec.decode(src)? else {
2152            return Ok(None);
2153        };
2154        if !chunk.is_empty() {
2155            return Ok(Some(Box::pin(stream::iter([chunk]))));
2156        }
2157
2158        // stream is pending
2159        let (tx, rx) = mpsc::channel(128);
2160        let dec = mem::take(&mut self.dec);
2161        let span = Span::current();
2162        self.deferred = Some(Box::new(|mut r, path| {
2163            Box::pin(
2164                async move {
2165                    if !path.is_empty() {
2166                        r = r.index(&path).map_err(std::io::Error::other)?;
2167                    };
2168                    let mut framed = FramedRead::new(r, dec);
2169                    trace!(?path, "receiving pending byte stream chunk");
2170                    while let Some(chunk) = framed.next().await {
2171                        let chunk = chunk?;
2172                        if chunk.is_empty() {
2173                            trace!("received stream end");
2174                            return Ok(());
2175                        }
2176                        trace!(?chunk, "received pending byte stream chunk");
2177                        if tx.send(chunk).await.is_err() {
2178                            debug!("stream receiver closed, discard data");
2179                            return Ok(());
2180                        }
2181                    }
2182                    Ok(())
2183                }
2184                .instrument(span),
2185            )
2186        }));
2187        return Ok(Some(Box::pin(ReceiverStream::new(rx))));
2188    }
2189}
2190
2191impl<R> Decode<R> for Pin<Box<dyn Stream<Item = Bytes> + Send>>
2192where
2193    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2194{
2195    type Decoder = StreamDecoderBytes<R>;
2196    type ListDecoder = ListDecoder<Self::Decoder, R>;
2197}
2198
/// Decoder for `stream<list<u8>>` with [`AsyncRead`] support
pub struct StreamDecoderRead<R> {
    /// Decodes the leading chunk; moved into the deferred read when the stream is pending.
    dec: CoreVecDecoderBytes,
    /// Deferred read set by `decode` and handed out once through `take_deferred`.
    deferred: Option<DeferredFn<Incoming<R>>>,
}
2204
2205impl<R> Default for StreamDecoderRead<R> {
2206    fn default() -> Self {
2207        Self {
2208            dec: CoreVecDecoderBytes::default(),
2209            deferred: None,
2210        }
2211    }
2212}
2213
2214impl<R> Deferred<Incoming<R>> for StreamDecoderRead<R> {
2215    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
2216        self.deferred.take()
2217    }
2218}
2219
2220impl<R> tokio_util::codec::Decoder for StreamDecoderRead<R>
2221where
2222    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2223{
2224    type Item = Pin<Box<dyn AsyncRead + Send>>;
2225    type Error = std::io::Error;
2226
2227    #[instrument(level = "trace", skip(self), fields(ty = "stream<u8>"))]
2228    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
2229        let Some(chunk) = self.dec.decode(src)? else {
2230            return Ok(None);
2231        };
2232        if !chunk.is_empty() {
2233            return Ok(Some(Box::pin(std::io::Cursor::new(chunk))));
2234        }
2235
2236        // stream is pending
2237        let (tx, rx) = mpsc::channel(128);
2238        let dec = mem::take(&mut self.dec);
2239        self.deferred = Some(Box::new(|mut r, path| {
2240            Box::pin(async move {
2241                if !path.is_empty() {
2242                    r = r.index(&path).map_err(std::io::Error::other)?;
2243                };
2244                let mut framed = FramedRead::new(r, dec);
2245                trace!("receiving pending byte stream chunk");
2246                while let Some(chunk) = framed.next().await {
2247                    let chunk = chunk?;
2248                    if chunk.is_empty() {
2249                        trace!("received stream end");
2250                        return Ok(());
2251                    }
2252                    trace!(?chunk, "received byte stream chunk");
2253                    if tx.send(std::io::Result::Ok(chunk)).await.is_err() {
2254                        debug!("stream receiver closed, discard data");
2255                        return Ok(());
2256                    }
2257                }
2258                Ok(())
2259            })
2260        }));
2261        return Ok(Some(Box::pin(StreamReader::new(ReceiverStream::new(rx)))));
2262    }
2263}
2264
2265impl<R> Decode<R> for Pin<Box<dyn AsyncRead + Send>>
2266where
2267    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2268{
2269    type Decoder = StreamDecoderRead<R>;
2270    type ListDecoder = ListDecoder<Self::Decoder, R>;
2271}
2272
#[cfg(test)]
mod tests {
    use anyhow::bail;

    use super::*;

    /// Stand-in for a writer; indexing it is treated as a test failure.
    struct NoopStream;

    impl crate::Index<Self> for NoopStream {
        fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
            panic!("index should not be called with path {path:?}")
        }
    }

    /// Encoding a `(u8, u32)` tuple writes both values directly into the buffer
    /// and leaves no deferred write on the encoder.
    #[test_log::test(tokio::test)]
    async fn codec() -> anyhow::Result<()> {
        let mut enc = <(u8, u32) as Encode<NoopStream>>::Encoder::default();
        let mut buf = BytesMut::new();
        enc.encode((0x42, 0x42), &mut buf)?;

        let deferred = Deferred::<NoopStream>::take_deferred(&mut enc);
        if deferred.is_some() {
            bail!("no deferred write should have been returned");
        }
        assert_eq!(buf.as_ref(), b"\x42\x42");
        Ok(())
    }
}