Skip to main content

wrpc_transport/
value.rs

1use core::any::TypeId;
2use core::fmt::{self, Debug};
3use core::future::{pending, Future};
4use core::hash::{Hash, Hasher};
5use core::iter::zip;
6use core::marker::PhantomData;
7use core::mem;
8use core::ops::{Deref, DerefMut};
9use core::pin::Pin;
10
11use bytes::{Buf as _, BufMut as _, Bytes, BytesMut};
12use futures::stream::{self, FuturesUnordered};
13use futures::{Stream, StreamExt as _, TryStreamExt as _};
14use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
15use tokio::select;
16use tokio::sync::{mpsc, oneshot};
17use tokio::task::JoinSet;
18use tokio_stream::wrappers::ReceiverStream;
19use tokio_util::codec::{Encoder as _, FramedRead};
20use tokio_util::io::StreamReader;
21use tracing::{debug, error, instrument, trace, Instrument as _, Span};
22use wasm_tokio::cm::{
23    BoolCodec, F32Codec, F64Codec, OptionDecoder, OptionEncoder, PrimValEncoder, ResultDecoder,
24    ResultEncoder, S16Codec, S32Codec, S64Codec, S8Codec, TupleDecoder, TupleEncoder, U16Codec,
25    U32Codec, U64Codec, U8Codec,
26};
27use wasm_tokio::{
28    CoreNameDecoder, CoreNameEncoder, CoreVecDecoder, CoreVecDecoderBytes, CoreVecEncoderBytes,
29    Leb128DecoderI128, Leb128DecoderI16, Leb128DecoderI32, Leb128DecoderI64, Leb128DecoderI8,
30    Leb128DecoderU128, Leb128DecoderU16, Leb128DecoderU32, Leb128DecoderU64, Leb128DecoderU8,
31    Leb128Encoder, Utf8Codec,
32};
33
34use crate::{Incoming, Index as _};
35
36/// Borrowed resource handle, represented as an opaque byte blob
37#[repr(transparent)]
38pub struct ResourceBorrow<T: ?Sized> {
39    repr: Bytes,
40    _ty: PhantomData<T>,
41}
42
43impl<T: ?Sized> From<Bytes> for ResourceBorrow<T> {
44    fn from(repr: Bytes) -> Self {
45        Self {
46            repr,
47            _ty: PhantomData,
48        }
49    }
50}
51
52impl<T: ?Sized> From<Vec<u8>> for ResourceBorrow<T> {
53    fn from(repr: Vec<u8>) -> Self {
54        Self {
55            repr: repr.into(),
56            _ty: PhantomData,
57        }
58    }
59}
60
61impl<T: ?Sized> From<ResourceBorrow<T>> for Bytes {
62    fn from(ResourceBorrow { repr, .. }: ResourceBorrow<T>) -> Self {
63        repr
64    }
65}
66
67impl<T: ?Sized> PartialEq for ResourceBorrow<T> {
68    fn eq(&self, other: &Self) -> bool {
69        self.repr == other.repr
70    }
71}
72
73impl<T: ?Sized> Eq for ResourceBorrow<T> {}
74
75impl<T: ?Sized> Hash for ResourceBorrow<T> {
76    fn hash<H: Hasher>(&self, state: &mut H) {
77        self.repr.hash(state);
78    }
79}
80
81impl<T: ?Sized> AsRef<[u8]> for ResourceBorrow<T> {
82    fn as_ref(&self) -> &[u8] {
83        &self.repr
84    }
85}
86
87impl<T: ?Sized> AsRef<Bytes> for ResourceBorrow<T> {
88    fn as_ref(&self) -> &Bytes {
89        &self.repr
90    }
91}
92
93impl<T: ?Sized + 'static> Debug for ResourceBorrow<T> {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        write!(f, "borrow<{:?}>", TypeId::of::<T>())
96    }
97}
98
99impl<T: ?Sized> Clone for ResourceBorrow<T> {
100    fn clone(&self) -> Self {
101        Self {
102            repr: self.repr.clone(),
103            _ty: PhantomData,
104        }
105    }
106}
107
108impl<T: ?Sized> ResourceBorrow<T> {
109    /// Constructs a new borrowed resource handle
110    pub fn new(repr: impl Into<Bytes>) -> Self {
111        Self::from(repr.into())
112    }
113}
114
115/// Owned resource handle, represented as an opaque byte blob
116#[repr(transparent)]
117pub struct ResourceOwn<T: ?Sized> {
118    repr: Bytes,
119    _ty: PhantomData<T>,
120}
121
122impl<T: ?Sized> From<ResourceOwn<T>> for ResourceBorrow<T> {
123    fn from(ResourceOwn { repr, _ty }: ResourceOwn<T>) -> Self {
124        Self {
125            repr,
126            _ty: PhantomData,
127        }
128    }
129}
130
131impl<T: ?Sized> From<Bytes> for ResourceOwn<T> {
132    fn from(repr: Bytes) -> Self {
133        Self {
134            repr,
135            _ty: PhantomData,
136        }
137    }
138}
139
140impl<T: ?Sized> From<Vec<u8>> for ResourceOwn<T> {
141    fn from(repr: Vec<u8>) -> Self {
142        Self {
143            repr: repr.into(),
144            _ty: PhantomData,
145        }
146    }
147}
148
149impl<T: ?Sized> From<ResourceOwn<T>> for Bytes {
150    fn from(ResourceOwn { repr, .. }: ResourceOwn<T>) -> Self {
151        repr
152    }
153}
154
155impl<T: ?Sized> PartialEq for ResourceOwn<T> {
156    fn eq(&self, other: &Self) -> bool {
157        self.repr == other.repr
158    }
159}
160
161impl<T: ?Sized> Eq for ResourceOwn<T> {}
162
163impl<T: ?Sized> Hash for ResourceOwn<T> {
164    fn hash<H: Hasher>(&self, state: &mut H) {
165        self.repr.hash(state);
166    }
167}
168
169impl<T: ?Sized> AsRef<[u8]> for ResourceOwn<T> {
170    fn as_ref(&self) -> &[u8] {
171        &self.repr
172    }
173}
174
175impl<T: ?Sized> AsRef<Bytes> for ResourceOwn<T> {
176    fn as_ref(&self) -> &Bytes {
177        &self.repr
178    }
179}
180
181impl<T: ?Sized + 'static> Debug for ResourceOwn<T> {
182    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
183        write!(f, "own<{:?}>", TypeId::of::<T>())
184    }
185}
186
187impl<T: ?Sized> Clone for ResourceOwn<T> {
188    fn clone(&self) -> Self {
189        Self {
190            repr: self.repr.clone(),
191            _ty: PhantomData,
192        }
193    }
194}
195
196impl<T: ?Sized> ResourceOwn<T> {
197    /// Constructs a new owned resource handle
198    pub fn new(repr: impl Into<Bytes>) -> Self {
199        Self::from(repr.into())
200    }
201
202    /// Returns the owned handle as [`ResourceBorrow`]
203    pub fn as_borrow(&self) -> ResourceBorrow<T> {
204        ResourceBorrow {
205            repr: self.repr.clone(),
206            _ty: PhantomData,
207        }
208    }
209}
210
211/// Deferred operation used for async value processing
212pub type DeferredFn<T> = Box<
213    dyn FnOnce(T, Vec<usize>) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send>> + Send,
214>;
215
216/// Handles async processing state for codecs
217pub trait Deferred<T> {
218    /// Takes a deferred async processing operation, if any
219    fn take_deferred(&mut self) -> Option<DeferredFn<T>>;
220}
221
222macro_rules! impl_deferred_sync {
223    ($t:ty) => {
224        impl<T> Deferred<T> for $t {
225            fn take_deferred(&mut self) -> Option<DeferredFn<T>> {
226                None
227            }
228        }
229    };
230}
231
232impl_deferred_sync!(BoolCodec);
233impl_deferred_sync!(S8Codec);
234impl_deferred_sync!(U8Codec);
235impl_deferred_sync!(S16Codec);
236impl_deferred_sync!(U16Codec);
237impl_deferred_sync!(S32Codec);
238impl_deferred_sync!(U32Codec);
239impl_deferred_sync!(S64Codec);
240impl_deferred_sync!(U64Codec);
241impl_deferred_sync!(F32Codec);
242impl_deferred_sync!(F64Codec);
243impl_deferred_sync!(CoreNameDecoder);
244impl_deferred_sync!(CoreNameEncoder);
245impl_deferred_sync!(CoreVecDecoderBytes);
246impl_deferred_sync!(CoreVecEncoderBytes);
247impl_deferred_sync!(Utf8Codec);
248impl_deferred_sync!(PrimValEncoder);
249impl_deferred_sync!(Leb128Encoder);
250impl_deferred_sync!(Leb128DecoderI8);
251impl_deferred_sync!(Leb128DecoderU8);
252impl_deferred_sync!(Leb128DecoderI16);
253impl_deferred_sync!(Leb128DecoderU16);
254impl_deferred_sync!(Leb128DecoderI32);
255impl_deferred_sync!(Leb128DecoderU32);
256impl_deferred_sync!(Leb128DecoderI64);
257impl_deferred_sync!(Leb128DecoderU64);
258impl_deferred_sync!(Leb128DecoderI128);
259impl_deferred_sync!(Leb128DecoderU128);
260impl_deferred_sync!(ResourceEncoder);
261impl_deferred_sync!(UnitCodec);
262impl_deferred_sync!(ListDecoderU8);
263
264impl_deferred_sync!(CoreVecDecoder<BoolCodec>);
265impl_deferred_sync!(CoreVecDecoder<S8Codec>);
266impl_deferred_sync!(CoreVecDecoder<U8Codec>);
267impl_deferred_sync!(CoreVecDecoder<S16Codec>);
268impl_deferred_sync!(CoreVecDecoder<U16Codec>);
269impl_deferred_sync!(CoreVecDecoder<S32Codec>);
270impl_deferred_sync!(CoreVecDecoder<U32Codec>);
271impl_deferred_sync!(CoreVecDecoder<S64Codec>);
272impl_deferred_sync!(CoreVecDecoder<U64Codec>);
273impl_deferred_sync!(CoreVecDecoder<F32Codec>);
274impl_deferred_sync!(CoreVecDecoder<F64Codec>);
275impl_deferred_sync!(CoreVecDecoder<CoreNameDecoder>);
276impl_deferred_sync!(CoreVecDecoder<CoreVecDecoderBytes>);
277impl_deferred_sync!(CoreVecDecoder<Utf8Codec>);
278impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI8>);
279impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU8>);
280impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI16>);
281impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU16>);
282impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI32>);
283impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU32>);
284impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI64>);
285impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU64>);
286impl_deferred_sync!(CoreVecDecoder<Leb128DecoderI128>);
287impl_deferred_sync!(CoreVecDecoder<Leb128DecoderU128>);
288impl_deferred_sync!(CoreVecDecoder<UnitCodec>);
289
290/// Codec for synchronous values
291///
292/// This is a wrapper struct, which provides a no-op [Deferred] implementation
293/// for any codec.
294pub struct SyncCodec<T>(pub T);
295
296impl<T> Deref for SyncCodec<T> {
297    type Target = T;
298
299    fn deref(&self) -> &Self::Target {
300        &self.0
301    }
302}
303
304impl<T> DerefMut for SyncCodec<T> {
305    fn deref_mut(&mut self) -> &mut Self::Target {
306        &mut self.0
307    }
308}
309
310impl<T, C> Deferred<T> for SyncCodec<C> {
311    fn take_deferred(&mut self) -> Option<DeferredFn<T>> {
312        None
313    }
314}
315
316impl<T: Default> Default for SyncCodec<T> {
317    fn default() -> Self {
318        Self(T::default())
319    }
320}
321
322impl<T, I> tokio_util::codec::Encoder<I> for SyncCodec<T>
323where
324    T: tokio_util::codec::Encoder<I>,
325{
326    type Error = T::Error;
327
328    fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> {
329        self.0.encode(item, dst)
330    }
331}
332
333impl<T> tokio_util::codec::Decoder for SyncCodec<T>
334where
335    T: tokio_util::codec::Decoder,
336{
337    type Item = T::Item;
338    type Error = T::Error;
339
340    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
341        self.0.decode(src)
342    }
343
344    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
345        self.0.decode_eof(buf)
346    }
347
348    fn framed<IO: AsyncRead + AsyncWrite + Sized>(
349        self,
350        io: IO,
351    ) -> tokio_util::codec::Framed<IO, Self>
352    where
353        Self: Sized,
354    {
355        self.0.framed(io).map_codec(Self)
356    }
357}
358
359#[instrument(level = "trace", skip(w, deferred))]
360async fn handle_deferred<T, I>(w: T, deferred: I, mut path: Vec<usize>) -> std::io::Result<()>
361where
362    I: IntoIterator<Item = Option<DeferredFn<T>>>,
363    I::IntoIter: ExactSizeIterator,
364    T: crate::Index<T>,
365{
366    let mut futs = FuturesUnordered::default();
367    for (i, f) in zip(0.., deferred) {
368        if let Some(f) = f {
369            path.push(i);
370            let w = w.index(&path).map_err(std::io::Error::other)?;
371            path.pop();
372            futs.push(f(w, Vec::default()));
373        }
374    }
375    while let Some(()) = futs.try_next().await? {}
376    Ok(())
377}
378
379/// Defines value encoding
380pub trait Encode<T>: Sized {
381    /// Encoder used to encode the value
382    type Encoder: tokio_util::codec::Encoder<Self> + Deferred<T> + Default + Send;
383
384    /// Convenience function for encoding a value
385    #[instrument(level = "trace", skip(self, enc))]
386    fn encode(
387        self,
388        enc: &mut Self::Encoder,
389        dst: &mut BytesMut,
390    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
391    {
392        enc.encode(self, dst)?;
393        Ok(enc.take_deferred())
394    }
395
396    /// Encode an iterator of owned values
397    #[instrument(level = "trace", skip(items, enc))]
398    fn encode_iter_own<I>(
399        items: I,
400        enc: &mut Self::Encoder,
401        dst: &mut BytesMut,
402    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
403    where
404        I: IntoIterator<Item = Self>,
405        I::IntoIter: ExactSizeIterator,
406        T: crate::Index<T> + Send + Sync + 'static,
407    {
408        let items = items.into_iter();
409        dst.reserve(items.len());
410        let mut deferred = Vec::with_capacity(items.len());
411        for item in items {
412            enc.encode(item, dst)?;
413            deferred.push(enc.take_deferred());
414        }
415        if deferred.iter().any(Option::is_some) {
416            Ok(Some(Box::new(move |w, path| {
417                Box::pin(handle_deferred(w, deferred, path))
418            })))
419        } else {
420            Ok(None)
421        }
422    }
423
424    /// Encode an iterator of value references
425    #[instrument(level = "trace", skip(items, enc))]
426    fn encode_iter_ref<'a, I>(
427        items: I,
428        enc: &mut Self::Encoder,
429        dst: &mut BytesMut,
430    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
431    where
432        I: IntoIterator<Item = &'a Self>,
433        I::IntoIter: ExactSizeIterator,
434        T: crate::Index<T> + Send + Sync + 'static,
435        Self::Encoder: tokio_util::codec::Encoder<&'a Self>,
436    {
437        let items = items.into_iter();
438        dst.reserve(items.len());
439        let mut deferred = Vec::with_capacity(items.len());
440        for item in items {
441            enc.encode(item, dst)?;
442            deferred.push(enc.take_deferred());
443        }
444        if deferred.iter().any(Option::is_some) {
445            Ok(Some(Box::new(move |w, path| {
446                Box::pin(handle_deferred(w, deferred, path))
447            })))
448        } else {
449            Ok(None)
450        }
451    }
452
453    /// Encode a list of owned values
454    #[instrument(level = "trace", skip(items, enc), fields(ty = "list"))]
455    fn encode_list_own(
456        items: Vec<Self>,
457        enc: &mut Self::Encoder,
458        dst: &mut BytesMut,
459    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
460    where
461        T: crate::Index<T> + Send + Sync + 'static,
462    {
463        let n = u32::try_from(items.len())
464            .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
465        dst.reserve(5 + items.len());
466        Leb128Encoder.encode(n, dst)?;
467        Self::encode_iter_own(items, enc, dst)
468    }
469
470    /// Encode a list of value references
471    #[instrument(level = "trace", skip(items, enc), fields(ty = "list"))]
472    fn encode_list_ref<'a>(
473        items: &'a [Self],
474        enc: &mut Self::Encoder,
475        dst: &mut BytesMut,
476    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
477    where
478        T: crate::Index<T> + Send + Sync + 'static,
479        Self::Encoder: tokio_util::codec::Encoder<&'a Self>,
480    {
481        let n = u32::try_from(items.len())
482            .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
483        dst.reserve(5 + items.len());
484        Leb128Encoder.encode(n, dst)?;
485        Self::encode_iter_ref(items, enc, dst)
486    }
487}
488
489/// Defines value decoding
490pub trait Decode<T>: Sized {
491    /// Decoder used to decode value
492    type Decoder: tokio_util::codec::Decoder<Item = Self>
493        + Deferred<Incoming<T>>
494        + Default
495        + Send
496        + 'static;
497    /// Decoder used to decode lists of value
498    type ListDecoder: tokio_util::codec::Decoder<Item = Vec<Self>> + Default + 'static;
499}
500
501impl<T, W> Deferred<W> for OptionEncoder<T>
502where
503    T: Deferred<W>,
504{
505    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
506        self.0.take_deferred()
507    }
508}
509
510impl<T, W> Encode<W> for Option<T>
511where
512    T: Encode<W>,
513{
514    type Encoder = OptionEncoder<T::Encoder>;
515}
516
517impl<'a, T, W> Encode<W> for &'a Option<T>
518where
519    T: Encode<W>,
520    T::Encoder: tokio_util::codec::Encoder<&'a T>,
521{
522    type Encoder = OptionEncoder<T::Encoder>;
523}
524
525impl<T, W> Deferred<W> for OptionDecoder<T>
526where
527    T: Deferred<W> + Default,
528{
529    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
530        mem::take(self).into_inner().take_deferred()
531    }
532}
533
534impl<T, R> Decode<R> for Option<T>
535where
536    T: Decode<R>,
537    R: crate::Index<R> + Send + 'static,
538{
539    type Decoder = OptionDecoder<T::Decoder>;
540    type ListDecoder = ListDecoder<Self::Decoder, R>;
541}
542
543impl<O, E, W> Deferred<W> for ResultEncoder<O, E>
544where
545    O: Deferred<W>,
546    E: Deferred<W>,
547{
548    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
549        match (self.ok.take_deferred(), self.err.take_deferred()) {
550            (None, None) => None,
551            (Some(ok), None) => Some(ok),
552            (None, Some(err)) => Some(err),
553            (Some(ok), Some(_)) => {
554                if cfg!(debug_assertions) {
555                    panic!("both `result::ok` and `result::err` deferred function set")
556                }
557                Some(ok)
558            }
559        }
560    }
561}
562
563impl<O, E, W> Encode<W> for Result<O, E>
564where
565    O: Encode<W>,
566    E: Encode<W>,
567    std::io::Error: From<<O::Encoder as tokio_util::codec::Encoder<O>>::Error>,
568    std::io::Error: From<<E::Encoder as tokio_util::codec::Encoder<E>>::Error>,
569{
570    type Encoder = ResultEncoder<O::Encoder, E::Encoder>;
571}
572
573impl<'a, O, E, W> Encode<W> for &'a Result<O, E>
574where
575    O: Encode<W>,
576    O::Encoder: tokio_util::codec::Encoder<&'a O>,
577    E: Encode<W>,
578    E::Encoder: tokio_util::codec::Encoder<&'a E>,
579    std::io::Error: From<<O::Encoder as tokio_util::codec::Encoder<&'a O>>::Error>,
580    std::io::Error: From<<E::Encoder as tokio_util::codec::Encoder<&'a E>>::Error>,
581{
582    type Encoder = ResultEncoder<O::Encoder, E::Encoder>;
583}
584
585impl<O, E, W> Deferred<W> for ResultDecoder<O, E>
586where
587    O: Deferred<W> + Default,
588    E: Deferred<W> + Default,
589{
590    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
591        let (mut ok, mut err) = mem::take(self).into_inner();
592        match (ok.take_deferred(), err.take_deferred()) {
593            (None, None) => None,
594            (Some(ok), None) => Some(ok),
595            (None, Some(err)) => Some(err),
596            (Some(ok), Some(_)) => {
597                if cfg!(debug_assertions) {
598                    panic!("both `result::ok` and `result::err` deferred function set")
599                }
600                Some(ok)
601            }
602        }
603    }
604}
605
606impl<O, E, R> Decode<R> for Result<O, E>
607where
608    O: Decode<R>,
609    E: Decode<R>,
610    R: crate::Index<R> + Send + 'static,
611    std::io::Error: From<<O::Decoder as tokio_util::codec::Decoder>::Error>,
612    std::io::Error: From<<E::Decoder as tokio_util::codec::Decoder>::Error>,
613{
614    type Decoder = ResultDecoder<O::Decoder, E::Decoder>;
615    type ListDecoder = ListDecoder<Self::Decoder, R>;
616}
617
618/// Encoder for `list<T>`
619pub struct ListEncoder<W> {
620    deferred: Option<DeferredFn<W>>,
621}
622
623impl<W> Default for ListEncoder<W> {
624    fn default() -> Self {
625        Self { deferred: None }
626    }
627}
628
629impl<W> Deferred<W> for ListEncoder<W> {
630    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
631        self.deferred.take()
632    }
633}
634
635impl<T, W> tokio_util::codec::Encoder<Vec<T>> for ListEncoder<W>
636where
637    T: Encode<W>,
638    W: crate::Index<W> + Send + Sync + 'static,
639{
640    type Error = <T::Encoder as tokio_util::codec::Encoder<T>>::Error;
641
642    fn encode(&mut self, items: Vec<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
643        let mut enc = T::Encoder::default();
644        self.deferred = T::encode_list_own(items, &mut enc, dst)?;
645        Ok(())
646    }
647}
648
649impl<'a, T, W> tokio_util::codec::Encoder<&'a Vec<T>> for ListEncoder<W>
650where
651    T: Encode<W>,
652    T::Encoder: tokio_util::codec::Encoder<&'a T>,
653    W: crate::Index<W> + Send + Sync + 'static,
654{
655    type Error = <T::Encoder as tokio_util::codec::Encoder<&'a T>>::Error;
656
657    fn encode(&mut self, items: &'a Vec<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
658        let mut enc = T::Encoder::default();
659        self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
660        Ok(())
661    }
662}
663
664impl<'a, 'b, T, W> tokio_util::codec::Encoder<&'a &'b Vec<T>> for ListEncoder<W>
665where
666    T: Encode<W>,
667    T::Encoder: tokio_util::codec::Encoder<&'b T>,
668    W: crate::Index<W> + Send + Sync + 'static,
669{
670    type Error = <T::Encoder as tokio_util::codec::Encoder<&'b T>>::Error;
671
672    fn encode(&mut self, items: &'a &'b Vec<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
673        let mut enc = T::Encoder::default();
674        self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
675        Ok(())
676    }
677}
678
679impl<'a, T, W> tokio_util::codec::Encoder<&'a [T]> for ListEncoder<W>
680where
681    T: Encode<W>,
682    T::Encoder: tokio_util::codec::Encoder<&'a T>,
683    W: crate::Index<W> + Send + Sync + 'static,
684{
685    type Error = <T::Encoder as tokio_util::codec::Encoder<&'a T>>::Error;
686
687    fn encode(&mut self, items: &'a [T], dst: &mut BytesMut) -> Result<(), Self::Error> {
688        let mut enc = T::Encoder::default();
689        self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
690        Ok(())
691    }
692}
693
694impl<'a, 'b, T, W> tokio_util::codec::Encoder<&'a &'b [T]> for ListEncoder<W>
695where
696    T: Encode<W>,
697    T::Encoder: tokio_util::codec::Encoder<&'b T>,
698    W: crate::Index<W> + Send + Sync + 'static,
699{
700    type Error = <T::Encoder as tokio_util::codec::Encoder<&'b T>>::Error;
701
702    fn encode(&mut self, items: &'a &'b [T], dst: &mut BytesMut) -> Result<(), Self::Error> {
703        let mut enc = T::Encoder::default();
704        self.deferred = T::encode_list_ref(items, &mut enc, dst)?;
705        Ok(())
706    }
707}
708
709impl<T, W> Encode<W> for Vec<T>
710where
711    T: Encode<W>,
712    W: crate::Index<W> + Send + Sync + 'static,
713{
714    type Encoder = ListEncoder<W>;
715}
716
717impl<'a, T, W> Encode<W> for &'a Vec<T>
718where
719    T: Encode<W>,
720    T::Encoder: tokio_util::codec::Encoder<&'a T>,
721    W: crate::Index<W> + Send + Sync + 'static,
722{
723    type Encoder = ListEncoder<W>;
724}
725
726impl<'a, T, W> Encode<W> for &'a [T]
727where
728    T: Encode<W>,
729    T::Encoder: tokio_util::codec::Encoder<&'a T>,
730    W: crate::Index<W> + Send + Sync + 'static,
731{
732    type Encoder = ListEncoder<W>;
733}
734
735/// Decoder for `list<T>`
736pub struct ListDecoder<T, R>
737where
738    T: tokio_util::codec::Decoder,
739{
740    dec: T,
741    ret: Vec<T::Item>,
742    cap: usize,
743    deferred: Vec<Option<DeferredFn<Incoming<R>>>>,
744}
745
746impl<T, R> ListDecoder<T, R>
747where
748    T: tokio_util::codec::Decoder,
749{
750    /// Constructs a new list decoder
751    pub fn new(dec: T) -> Self {
752        Self {
753            dec,
754            ret: Vec::default(),
755            cap: 0,
756            deferred: vec![],
757        }
758    }
759}
760
761impl<T, R> Default for ListDecoder<T, R>
762where
763    T: tokio_util::codec::Decoder + Default,
764{
765    fn default() -> Self {
766        Self::new(T::default())
767    }
768}
769
770impl<T, R> Deferred<Incoming<R>> for ListDecoder<T, R>
771where
772    T: tokio_util::codec::Decoder,
773    R: crate::Index<R> + Send + Sync + 'static,
774{
775    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
776        let deferred = mem::take(&mut self.deferred);
777        if deferred.iter().any(Option::is_some) {
778            Some(Box::new(|r, path| {
779                Box::pin(handle_deferred(r, deferred, path))
780            }))
781        } else {
782            None
783        }
784    }
785}
786
787impl<T, R> tokio_util::codec::Decoder for ListDecoder<T, R>
788where
789    T: tokio_util::codec::Decoder + Deferred<Incoming<R>>,
790{
791    type Item = Vec<T::Item>;
792    type Error = T::Error;
793
794    #[instrument(level = "trace", skip(self), fields(ty = "list"))]
795    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
796        if self.cap == 0 {
797            let Some(len) = Leb128DecoderU32.decode(src)? else {
798                return Ok(None);
799            };
800            if len == 0 {
801                return Ok(Some(Vec::default()));
802            }
803            let len = len
804                .try_into()
805                .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
806            self.ret = Vec::with_capacity(len);
807            self.deferred = Vec::with_capacity(len);
808            self.cap = len;
809        }
810        while self.cap > 0 {
811            let Some(v) = self.dec.decode(src)? else {
812                return Ok(None);
813            };
814            self.ret.push(v);
815            self.deferred.push(self.dec.take_deferred());
816            self.cap -= 1;
817        }
818        Ok(Some(mem::take(&mut self.ret)))
819    }
820}
821
822impl<T, R> Decode<R> for Vec<T>
823where
824    T: Decode<R> + Send,
825    T::ListDecoder: Deferred<Incoming<R>> + Send,
826    R: crate::Index<R> + Send + 'static,
827{
828    type Decoder = T::ListDecoder;
829    type ListDecoder = ListDecoder<Self::Decoder, R>;
830}
831
832macro_rules! impl_copy_codec {
833    ($t:ty, $c:tt) => {
834        impl<W> Encode<W> for $t {
835            type Encoder = $c;
836
837            #[instrument(level = "trace", skip(items))]
838            fn encode_iter_own<I>(
839                items: I,
840                enc: &mut Self::Encoder,
841                dst: &mut BytesMut,
842            ) -> Result<
843                Option<DeferredFn<W>>,
844                <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error,
845            >
846            where
847                I: IntoIterator<Item = Self>,
848                I::IntoIter: ExactSizeIterator,
849            {
850                let items = items.into_iter();
851                dst.reserve(items.len());
852                for item in items {
853                    enc.encode(item, dst)?;
854                }
855                Ok(None)
856            }
857
858            #[instrument(level = "trace", skip(items))]
859            fn encode_iter_ref<'a, I>(
860                items: I,
861                enc: &mut Self::Encoder,
862                dst: &mut BytesMut,
863            ) -> Result<
864                Option<DeferredFn<W>>,
865                <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error,
866            >
867            where
868                I: IntoIterator<Item = &'a Self>,
869                I::IntoIter: ExactSizeIterator,
870            {
871                let items = items.into_iter();
872                dst.reserve(items.len());
873                for item in items {
874                    enc.encode(*item, dst)?;
875                }
876                Ok(None)
877            }
878        }
879
880        impl<'b, W> Encode<W> for &'b $t {
881            type Encoder = $c;
882
883            #[instrument(level = "trace", skip(items))]
884            fn encode_iter_own<I>(
885                items: I,
886                enc: &mut Self::Encoder,
887                dst: &mut BytesMut,
888            ) -> Result<
889                Option<DeferredFn<W>>,
890                <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error,
891            >
892            where
893                I: IntoIterator<Item = Self>,
894                I::IntoIter: ExactSizeIterator,
895            {
896                let items = items.into_iter();
897                dst.reserve(items.len());
898                for item in items {
899                    enc.encode(*item, dst)?;
900                }
901                Ok(None)
902            }
903
904            #[instrument(level = "trace", skip(items))]
905            fn encode_iter_ref<'a, I>(
906                items: I,
907                enc: &mut Self::Encoder,
908                dst: &mut BytesMut,
909            ) -> Result<
910                Option<DeferredFn<W>>,
911                <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error,
912            >
913            where
914                I: IntoIterator<Item = &'a Self>,
915                I::IntoIter: ExactSizeIterator,
916                'b: 'a,
917            {
918                let items = items.into_iter();
919                dst.reserve(items.len());
920                for item in items {
921                    enc.encode(item, dst)?;
922                }
923                Ok(None)
924            }
925        }
926
927        impl<R> Decode<R> for $t {
928            type Decoder = $c;
929            type ListDecoder = CoreVecDecoder<Self::Decoder>;
930        }
931    };
932}
933
934// The Component Model canonical ABI mandates a single canonical `NaN`
935// representation for floating point values. Encoding canonicalizes `NaN`s to
936// match; decoding is lenient and accepts any `NaN` representation.
937//
938// See `canonicalize_nan{32,64}` in
939// <https://github.com/WebAssembly/component-model/blob/main/design/mvp/canonical-abi/definitions.py>.
940const CANONICAL_NAN_F32: u32 = 0x7fc0_0000;
941const CANONICAL_NAN_F64: u64 = 0x7ff8_0000_0000_0000;
942
943/// Defines a floating-point codec that canonicalizes `NaN` values on encode to
944/// match the Component Model canonical ABI, delegating the actual byte encoding
945/// and decoding to the wrapped `wasm-tokio` codec.
946macro_rules! impl_canonical_nan_codec {
947    ($name:ident, $inner:ty, $t:ty, $canon:expr) => {
948        #[doc = concat!("Canonicalizes `NaN`s on encode, wrapping [`", stringify!($inner), "`].")]
949        #[derive(Debug, Default)]
950        pub struct $name($inner);
951
952        impl tokio_util::codec::Encoder<$t> for $name {
953            type Error = std::io::Error;
954
955            fn encode(&mut self, item: $t, dst: &mut BytesMut) -> Result<(), Self::Error> {
956                let item = if item.is_nan() {
957                    <$t>::from_bits($canon)
958                } else {
959                    item
960                };
961                self.0.encode(item, dst)
962            }
963        }
964
965        impl tokio_util::codec::Encoder<&$t> for $name {
966            type Error = std::io::Error;
967
968            fn encode(&mut self, item: &$t, dst: &mut BytesMut) -> Result<(), Self::Error> {
969                tokio_util::codec::Encoder::<$t>::encode(self, *item, dst)
970            }
971        }
972
973        impl tokio_util::codec::Encoder<&&$t> for $name {
974            type Error = std::io::Error;
975
976            fn encode(&mut self, item: &&$t, dst: &mut BytesMut) -> Result<(), Self::Error> {
977                tokio_util::codec::Encoder::<$t>::encode(self, **item, dst)
978            }
979        }
980
981        impl tokio_util::codec::Decoder for $name {
982            type Item = $t;
983            type Error = std::io::Error;
984
985            fn decode(&mut self, src: &mut BytesMut) -> Result<Option<$t>, Self::Error> {
986                self.0.decode(src)
987            }
988        }
989
990        impl_deferred_sync!($name);
991        impl_deferred_sync!(CoreVecDecoder<$name>);
992    };
993}
994
995impl_canonical_nan_codec!(CanonicalNanF32Codec, F32Codec, f32, CANONICAL_NAN_F32);
996impl_canonical_nan_codec!(CanonicalNanF64Codec, F64Codec, f64, CANONICAL_NAN_F64);
997
998impl_copy_codec!(bool, BoolCodec);
999impl_copy_codec!(i8, S8Codec);
1000impl_copy_codec!(i16, S16Codec);
1001impl_copy_codec!(u16, U16Codec);
1002impl_copy_codec!(i32, S32Codec);
1003impl_copy_codec!(u32, U32Codec);
1004impl_copy_codec!(i64, S64Codec);
1005impl_copy_codec!(u64, U64Codec);
1006impl_copy_codec!(f32, CanonicalNanF32Codec);
1007impl_copy_codec!(f64, CanonicalNanF64Codec);
1008impl_copy_codec!(char, Utf8Codec);
1009
1010impl<T> Encode<T> for u8 {
1011    type Encoder = U8Codec;
1012
1013    #[instrument(level = "trace", skip(items))]
1014    fn encode_iter_own<I>(
1015        items: I,
1016        enc: &mut Self::Encoder,
1017        dst: &mut BytesMut,
1018    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
1019    where
1020        I: IntoIterator<Item = Self>,
1021        I::IntoIter: ExactSizeIterator,
1022    {
1023        let items = items.into_iter();
1024        dst.reserve(items.len());
1025        dst.extend(items);
1026        Ok(None)
1027    }
1028
1029    #[instrument(level = "trace", skip(items))]
1030    fn encode_iter_ref<'a, I>(
1031        items: I,
1032        enc: &mut Self::Encoder,
1033        dst: &mut BytesMut,
1034    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
1035    where
1036        I: IntoIterator<Item = &'a Self>,
1037        I::IntoIter: ExactSizeIterator,
1038    {
1039        let items = items.into_iter();
1040        dst.reserve(items.len());
1041        dst.extend(items);
1042        Ok(None)
1043    }
1044
1045    #[instrument(level = "trace", skip(items), fields(ty = "list<u8>"))]
1046    fn encode_list_own(
1047        items: Vec<Self>,
1048        enc: &mut Self::Encoder,
1049        dst: &mut BytesMut,
1050    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
1051    {
1052        CoreVecEncoderBytes.encode(items, dst)?;
1053        Ok(None)
1054    }
1055
1056    #[instrument(level = "trace", skip(items), fields(ty = "list<u8>"))]
1057    fn encode_list_ref<'a>(
1058        items: &'a [Self],
1059        enc: &mut Self::Encoder,
1060        dst: &mut BytesMut,
1061    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
1062    where
1063        Self::Encoder: tokio_util::codec::Encoder<&'a Self>,
1064    {
1065        CoreVecEncoderBytes.encode(items, dst)?;
1066        Ok(None)
1067    }
1068}
1069
1070impl<'b, T> Encode<T> for &'b u8 {
1071    type Encoder = U8Codec;
1072
1073    #[instrument(level = "trace", skip(items))]
1074    fn encode_iter_own<I>(
1075        items: I,
1076        enc: &mut Self::Encoder,
1077        dst: &mut BytesMut,
1078    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<Self>>::Error>
1079    where
1080        I: IntoIterator<Item = Self>,
1081        I::IntoIter: ExactSizeIterator,
1082    {
1083        let items = items.into_iter();
1084        dst.reserve(items.len());
1085        dst.extend(items);
1086        Ok(None)
1087    }
1088
1089    #[instrument(level = "trace", skip(items))]
1090    fn encode_iter_ref<'a, I>(
1091        items: I,
1092        enc: &mut Self::Encoder,
1093        dst: &mut BytesMut,
1094    ) -> Result<Option<DeferredFn<T>>, <Self::Encoder as tokio_util::codec::Encoder<&'a Self>>::Error>
1095    where
1096        I: IntoIterator<Item = &'a Self>,
1097        I::IntoIter: ExactSizeIterator,
1098        'b: 'a,
1099    {
1100        let items = items.into_iter();
1101        dst.reserve(items.len());
1102        dst.extend(items.map(|b| **b));
1103        Ok(None)
1104    }
1105}
1106
1107/// Decoder for `list<u8>`
1108#[derive(Debug, Default)]
1109#[repr(transparent)]
1110pub struct ListDecoderU8(CoreVecDecoderBytes);
1111
1112impl tokio_util::codec::Decoder for ListDecoderU8 {
1113    type Item = Vec<u8>;
1114    type Error = <CoreVecDecoderBytes as tokio_util::codec::Decoder>::Error;
1115
1116    #[instrument(level = "trace", skip(self), fields(ty = "list<u8>"))]
1117    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1118        let Some(buf) = self.0.decode(src)? else {
1119            return Ok(None);
1120        };
1121        Ok(Some(buf.into()))
1122    }
1123}
1124
1125impl<R> Decode<R> for u8 {
1126    type Decoder = U8Codec;
1127    type ListDecoder = ListDecoderU8;
1128}
1129
1130impl<W> Encode<W> for &str {
1131    type Encoder = CoreNameEncoder;
1132}
1133
1134impl<W> Encode<W> for &&str {
1135    type Encoder = CoreNameEncoder;
1136}
1137
1138impl<W> Encode<W> for String {
1139    type Encoder = CoreNameEncoder;
1140}
1141
1142impl<W> Encode<W> for &String {
1143    type Encoder = CoreNameEncoder;
1144}
1145
1146impl<R> Decode<R> for String {
1147    type Decoder = CoreNameDecoder;
1148    type ListDecoder = CoreVecDecoder<Self::Decoder>;
1149}
1150
1151impl<W> Encode<W> for Bytes {
1152    type Encoder = CoreVecEncoderBytes;
1153}
1154
1155impl<W> Encode<W> for &Bytes {
1156    type Encoder = CoreVecEncoderBytes;
1157}
1158
1159impl<R> Decode<R> for Bytes {
1160    type Decoder = CoreVecDecoderBytes;
1161    type ListDecoder = CoreVecDecoder<Self::Decoder>;
1162}
1163
1164/// Encoder for `resource` types
1165#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
1166#[repr(transparent)]
1167pub struct ResourceEncoder;
1168
1169impl<T: ?Sized> tokio_util::codec::Encoder<ResourceOwn<T>> for ResourceEncoder {
1170    type Error = std::io::Error;
1171
1172    #[instrument(level = "trace", skip(self, item), ret, fields(ty = "own"))]
1173    fn encode(&mut self, item: ResourceOwn<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1174        CoreVecEncoderBytes.encode(item.repr, dst)
1175    }
1176}
1177
1178impl<T: ?Sized> tokio_util::codec::Encoder<&ResourceOwn<T>> for ResourceEncoder {
1179    type Error = std::io::Error;
1180
1181    #[instrument(level = "trace", skip(self, item), ret, fields(ty = "own"))]
1182    fn encode(&mut self, item: &ResourceOwn<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1183        CoreVecEncoderBytes.encode(&item.repr, dst)
1184    }
1185}
1186
1187impl<T: ?Sized, W> Encode<W> for ResourceOwn<T> {
1188    type Encoder = ResourceEncoder;
1189}
1190
1191impl<T: ?Sized, W> Encode<W> for &ResourceOwn<T> {
1192    type Encoder = ResourceEncoder;
1193}
1194
1195impl<T: ?Sized> tokio_util::codec::Encoder<ResourceBorrow<T>> for ResourceEncoder {
1196    type Error = std::io::Error;
1197
1198    #[instrument(level = "trace", skip(self, item), ret, fields(ty = "borrow"))]
1199    fn encode(&mut self, item: ResourceBorrow<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1200        CoreVecEncoderBytes.encode(item.repr, dst)
1201    }
1202}
1203
1204impl<T: ?Sized> tokio_util::codec::Encoder<&ResourceBorrow<T>> for ResourceEncoder {
1205    type Error = std::io::Error;
1206
1207    #[instrument(level = "trace", skip(self, item), ret, fields(ty = "borrow"))]
1208    fn encode(&mut self, item: &ResourceBorrow<T>, dst: &mut BytesMut) -> std::io::Result<()> {
1209        CoreVecEncoderBytes.encode(&item.repr, dst)
1210    }
1211}
1212
1213impl<T: ?Sized, W> Encode<W> for ResourceBorrow<T> {
1214    type Encoder = ResourceEncoder;
1215}
1216
1217impl<T: ?Sized, W> Encode<W> for &ResourceBorrow<T> {
1218    type Encoder = ResourceEncoder;
1219}
1220
1221/// Decoder for borrowed resource types
1222#[derive(Debug)]
1223#[repr(transparent)]
1224pub struct ResourceBorrowDecoder<T: ?Sized> {
1225    dec: CoreVecDecoderBytes,
1226    _ty: PhantomData<T>,
1227}
1228
1229impl<T: ?Sized> Default for ResourceBorrowDecoder<T> {
1230    fn default() -> Self {
1231        Self {
1232            dec: CoreVecDecoderBytes::default(),
1233            _ty: PhantomData,
1234        }
1235    }
1236}
1237
1238impl<R, T: ?Sized> Deferred<Incoming<R>> for ResourceBorrowDecoder<T> {
1239    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
1240        None
1241    }
1242}
1243
1244impl<R, T: ?Sized> Deferred<Incoming<R>> for CoreVecDecoder<ResourceBorrowDecoder<T>> {
1245    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
1246        None
1247    }
1248}
1249
1250impl<R, T: ?Sized + Send + 'static> Decode<R> for ResourceBorrow<T> {
1251    type Decoder = ResourceBorrowDecoder<T>;
1252    type ListDecoder = CoreVecDecoder<Self::Decoder>;
1253}
1254
1255impl<T: ?Sized> tokio_util::codec::Decoder for ResourceBorrowDecoder<T> {
1256    type Item = ResourceBorrow<T>;
1257    type Error = std::io::Error;
1258
1259    #[instrument(level = "trace", skip(self), fields(ty = "borrow"))]
1260    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1261        let repr = self.dec.decode(src)?;
1262        Ok(repr.map(Self::Item::from))
1263    }
1264}
1265
1266/// Decoder for owned resource types
1267#[derive(Debug)]
1268#[repr(transparent)]
1269pub struct ResourceOwnDecoder<T: ?Sized> {
1270    dec: CoreVecDecoderBytes,
1271    _ty: PhantomData<T>,
1272}
1273
1274impl<T: ?Sized> Default for ResourceOwnDecoder<T> {
1275    fn default() -> Self {
1276        Self {
1277            dec: CoreVecDecoderBytes::default(),
1278            _ty: PhantomData,
1279        }
1280    }
1281}
1282
1283impl<R, T: ?Sized> Deferred<Incoming<R>> for ResourceOwnDecoder<T> {
1284    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
1285        None
1286    }
1287}
1288
1289impl<R, T: ?Sized> Deferred<Incoming<R>> for CoreVecDecoder<ResourceOwnDecoder<T>> {
1290    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
1291        None
1292    }
1293}
1294
1295impl<R, T: ?Sized + Send + 'static> Decode<R> for ResourceOwn<T> {
1296    type Decoder = ResourceOwnDecoder<T>;
1297    type ListDecoder = CoreVecDecoder<Self::Decoder>;
1298}
1299
1300impl<T: ?Sized> tokio_util::codec::Decoder for ResourceOwnDecoder<T> {
1301    type Item = ResourceOwn<T>;
1302    type Error = std::io::Error;
1303
1304    #[instrument(level = "trace", skip(self), fields(ty = "own"))]
1305    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1306        let repr = self.dec.decode(src)?;
1307        Ok(repr.map(Self::Item::from))
1308    }
1309}
1310
1311/// Codec for `()`
1312#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
1313#[repr(transparent)]
1314pub struct UnitCodec;
1315
1316impl tokio_util::codec::Encoder<()> for UnitCodec {
1317    type Error = std::io::Error;
1318
1319    #[instrument(level = "trace", skip(self), ret)]
1320    fn encode(&mut self, (): (), dst: &mut BytesMut) -> std::io::Result<()> {
1321        Ok(())
1322    }
1323}
1324
1325impl tokio_util::codec::Encoder<&()> for UnitCodec {
1326    type Error = std::io::Error;
1327
1328    #[instrument(level = "trace", skip(self), ret)]
1329    fn encode(&mut self, (): &(), dst: &mut BytesMut) -> std::io::Result<()> {
1330        Ok(())
1331    }
1332}
1333
1334impl tokio_util::codec::Decoder for UnitCodec {
1335    type Item = ();
1336    type Error = std::io::Error;
1337
1338    #[instrument(level = "trace", skip(self))]
1339    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1340        Ok(Some(()))
1341    }
1342}
1343
1344/// Marker trait for [Encode] tuple types
1345pub trait TupleEncode<W>: Encode<W> {}
1346
1347/// Marker trait for [Decode] tuple types
1348pub trait TupleDecode<R>: Decode<R> {}
1349
1350impl<W> Encode<W> for () {
1351    type Encoder = UnitCodec;
1352}
1353
1354impl<W> TupleEncode<W> for () {}
1355
1356impl<R> Decode<R> for () {
1357    type Decoder = UnitCodec;
1358    type ListDecoder = CoreVecDecoder<Self::Decoder>;
1359}
1360
1361impl<R> TupleDecode<R> for () {}
1362
1363macro_rules! impl_tuple_codec {
1364    ($($vn:ident),+; $($vt:ident),+; $($cn:ident),+; $($ct:ident),+) => {
1365        impl<W, $($ct),+> Deferred<W> for TupleEncoder::<($($ct),+,)>
1366        where
1367            W: crate::Index<W> + Send + Sync + 'static,
1368            $($ct: Deferred<W> + Default + 'static),+
1369        {
1370            fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
1371                let Self(($(mut $cn),+,)) = mem::take(self);
1372                let deferred = [ $($cn.take_deferred()),+ ];
1373                if deferred.iter().any(Option::is_some) {
1374                    Some(Box::new(|r, path| Box::pin(handle_deferred(r, deferred, path))))
1375                } else {
1376                    None
1377                }
1378            }
1379        }
1380
1381        impl<W, E, $($vt),+> Encode<W> for ($($vt),+,)
1382        where
1383            W: crate::Index<W> + Send + Sync + 'static,
1384            E: From<std::io::Error>,
1385            $(
1386                $vt: Encode<W>,
1387                $vt::Encoder: tokio_util::codec::Encoder<$vt, Error = E> + 'static,
1388            )+
1389        {
1390            type Encoder = TupleEncoder::<($($vt::Encoder),+,)>;
1391        }
1392
1393        impl<W, E, $($vt),+> TupleEncode<W> for ($($vt),+,)
1394        where
1395            W: crate::Index<W> + Send + Sync + 'static,
1396            E: From<std::io::Error>,
1397            $(
1398                $vt: Encode<W>,
1399                $vt::Encoder: tokio_util::codec::Encoder<$vt, Error = E> + 'static,
1400            )+
1401        {
1402        }
1403
1404        impl<'a, W, E, $($vt),+> Encode<W> for &'a ($($vt),+,)
1405        where
1406            W: crate::Index<W> + Send + Sync + 'static,
1407            E: From<std::io::Error>,
1408            $(
1409                $vt: Encode<W>,
1410                $vt::Encoder: tokio_util::codec::Encoder<&'a $vt, Error = E> + 'static,
1411            )+
1412        {
1413            type Encoder = TupleEncoder::<($($vt::Encoder),+,)>;
1414        }
1415
1416        impl<R, $($vt),+> Deferred<Incoming<R>> for TupleDecoder::<($($vt::Decoder),+,), ($(Option<$vt>),+,)>
1417        where
1418            R: crate::Index<R> + Send + Sync + 'static,
1419            $($vt: Decode<R>),+
1420        {
1421            fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
1422                let ($(mut $cn),+,) = mem::take(self).into_inner();
1423                let deferred = [ $($cn.take_deferred()),+ ];
1424                if deferred.iter().any(Option::is_some) {
1425                    Some(Box::new(|r, path| Box::pin(handle_deferred(r, deferred, path))))
1426                } else {
1427                    None
1428                }
1429            }
1430        }
1431
1432        impl<R, E, $($vt),+> Decode<R> for ($($vt),+,)
1433        where
1434            R: crate::Index<R> + Send + Sync + 'static,
1435            E: From<std::io::Error>,
1436            $(
1437                $vt: Decode<R> + Send + 'static,
1438                $vt::Decoder: tokio_util::codec::Decoder<Error = E> + Send + 'static,
1439            )+
1440        {
1441            type Decoder = TupleDecoder::<($($vt::Decoder),+,), ($(Option<$vt>),+,)>;
1442            type ListDecoder = ListDecoder<Self::Decoder, R>;
1443        }
1444
1445        impl<R, E, $($vt),+> TupleDecode<R> for ($($vt),+,)
1446        where
1447            R: crate::Index<R> + Send + Sync + 'static,
1448            E: From<std::io::Error>,
1449            $(
1450                $vt: Decode<R> + Send + 'static,
1451                $vt::Decoder: tokio_util::codec::Decoder<Error = E> + Send + 'static,
1452            )+
1453        {
1454        }
1455    };
1456}
1457
1458impl_tuple_codec!(
1459    v0;
1460    V0;
1461    c0;
1462    C0
1463);
1464
1465impl_tuple_codec!(
1466    v0, v1;
1467    V0, V1;
1468    c0, c1;
1469    C0, C1
1470);
1471
1472impl_tuple_codec!(
1473    v0, v1, v2;
1474    V0, V1, V2;
1475    c0, c1, c2;
1476    C0, C1, C2
1477);
1478
1479impl_tuple_codec!(
1480    v0, v1, v2, v3;
1481    V0, V1, V2, V3;
1482    c0, c1, c2, c3;
1483    C0, C1, C2, C3
1484);
1485
1486impl_tuple_codec!(
1487    v0, v1, v2, v3, v4;
1488    V0, V1, V2, V3, V4;
1489    c0, c1, c2, c3, c4;
1490    C0, C1, C2, C3, C4
1491);
1492
1493impl_tuple_codec!(
1494    v0, v1, v2, v3, v4, v5;
1495    V0, V1, V2, V3, V4, V5;
1496    c0, c1, c2, c3, c4, c5;
1497    C0, C1, C2, C3, C4, C5
1498);
1499
1500impl_tuple_codec!(
1501    v0, v1, v2, v3, v4, v5, v6;
1502    V0, V1, V2, V3, V4, V5, V6;
1503    c0, c1, c2, c3, c4, c5, c6;
1504    C0, C1, C2, C3, C4, C5, C6
1505);
1506
1507impl_tuple_codec!(
1508    v0, v1, v2, v3, v4, v5, v6, v7;
1509    V0, V1, V2, V3, V4, V5, V6, V7;
1510    c0, c1, c2, c3, c4, c5, c6, c7;
1511    C0, C1, C2, C3, C4, C5, C6, C7
1512);
1513
1514impl_tuple_codec!(
1515    v0, v1, v2, v3, v4, v5, v6, v7, v8;
1516    V0, V1, V2, V3, V4, V5, V6, V7, V8;
1517    c0, c1, c2, c3, c4, c5, c6, c7, c8;
1518    C0, C1, C2, C3, C4, C5, C6, C7, C8
1519);
1520
1521impl_tuple_codec!(
1522    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9;
1523    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9;
1524    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9;
1525    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9
1526);
1527
1528impl_tuple_codec!(
1529    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10;
1530    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10;
1531    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10;
1532    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10
1533);
1534
1535impl_tuple_codec!(
1536    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11;
1537    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11;
1538    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11;
1539    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11
1540);
1541
1542impl_tuple_codec!(
1543    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12;
1544    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12;
1545    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12;
1546    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12
1547);
1548
1549impl_tuple_codec!(
1550    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13;
1551    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13;
1552    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13;
1553    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13
1554);
1555
1556impl_tuple_codec!(
1557    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14;
1558    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14;
1559    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14;
1560    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14
1561);
1562
1563impl_tuple_codec!(
1564    v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15;
1565    V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14, V15;
1566    c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15;
1567    C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15
1568);
1569
1570/// Encoder for `future<T>`
1571pub struct FutureEncoder<W> {
1572    deferred: Option<DeferredFn<W>>,
1573}
1574
1575impl<W> Default for FutureEncoder<W> {
1576    fn default() -> Self {
1577        Self { deferred: None }
1578    }
1579}
1580
1581impl<W> Deferred<W> for FutureEncoder<W> {
1582    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
1583        self.deferred.take()
1584    }
1585}
1586
1587impl<T, W, Fut> tokio_util::codec::Encoder<Fut> for FutureEncoder<W>
1588where
1589    T: Encode<W>,
1590    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1591    Fut: Future<Output = T> + Send + 'static,
1592    std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
1593{
1594    type Error = std::io::Error;
1595
1596    #[instrument(level = "trace", skip(self, item), fields(ty = "future"))]
1597    fn encode(&mut self, item: Fut, dst: &mut BytesMut) -> std::io::Result<()> {
1598        // TODO: Check if future is resolved
1599        dst.reserve(1);
1600        dst.put_u8(0x00);
1601        let span = Span::current();
1602        self.deferred = Some(Box::new(|mut w, path| {
1603            Box::pin(
1604                async move {
1605                    if !path.is_empty() {
1606                        w = w.index(&path).map_err(std::io::Error::other)?;
1607                    };
1608                    let item = item.await;
1609                    let mut enc = T::Encoder::default();
1610                    let mut buf = BytesMut::default();
1611                    enc.encode(item, &mut buf)?;
1612                    w.write_all(&buf).await?;
1613                    if let Some(f) = enc.take_deferred() {
1614                        f(w, Vec::default()).await
1615                    } else {
1616                        Ok(())
1617                    }
1618                }
1619                .instrument(span),
1620            )
1621        }));
1622        Ok(())
1623    }
1624}
1625
1626impl<T, W> Encode<W> for Pin<Box<dyn Future<Output = T> + Send>>
1627where
1628    T: Encode<W> + 'static,
1629    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1630    std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
1631{
1632    type Encoder = FutureEncoder<W>;
1633}
1634
1635/// Decoder for `future<T>`
1636pub struct FutureDecoder<T, R>
1637where
1638    T: Decode<R>,
1639{
1640    dec: OptionDecoder<T::Decoder>,
1641    deferred: Option<DeferredFn<Incoming<R>>>,
1642    _ty: PhantomData<T>,
1643}
1644
1645impl<T, R> Default for FutureDecoder<T, R>
1646where
1647    T: Decode<R>,
1648{
1649    fn default() -> Self {
1650        Self {
1651            dec: OptionDecoder::default(),
1652            deferred: None,
1653            _ty: PhantomData,
1654        }
1655    }
1656}
1657
1658impl<T, R> Deferred<Incoming<R>> for FutureDecoder<T, R>
1659where
1660    T: Decode<R>,
1661{
1662    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
1663        self.deferred.take()
1664    }
1665}
1666
1667impl<T, R> tokio_util::codec::Decoder for FutureDecoder<T, R>
1668where
1669    T: Decode<R> + Send + 'static,
1670    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
1671    std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
1672{
1673    type Item = Pin<Box<dyn Future<Output = T> + Send>>;
1674    type Error = <T::Decoder as tokio_util::codec::Decoder>::Error;
1675
1676    #[instrument(level = "trace", skip(self), fields(ty = "future"))]
1677    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
1678        let Some(item) = self.dec.decode(src)? else {
1679            return Ok(None);
1680        };
1681        if let Some(item) = item {
1682            self.deferred = self.dec.take_deferred();
1683            return Ok(Some(Box::pin(async { item })));
1684        }
1685
1686        // future is pending
1687        let (tx, rx) = oneshot::channel();
1688        let dec = mem::take(&mut self.dec).into_inner();
1689        let span = Span::current();
1690        self.deferred = Some(Box::new(|mut r, path| {
1691            Box::pin(
1692                async move {
1693                    if !path.is_empty() {
1694                        r = r.index(&path).map_err(std::io::Error::other)?;
1695                    };
1696                    let mut dec = FramedRead::new(r, dec);
1697                    trace!(?path, "receiving future element");
1698                    let Some(item) = dec.next().await else {
1699                        return Err(std::io::ErrorKind::UnexpectedEof.into());
1700                    };
1701                    let item = item?;
1702                    if tx.send(item).is_err() {
1703                        debug!("future receiver closed, discard data");
1704                        return Ok(());
1705                    }
1706                    if let Some(rx) = dec.decoder_mut().take_deferred() {
1707                        let buf = mem::take(dec.read_buffer_mut());
1708                        let mut r = dec.into_inner();
1709                        if !r.buffer.is_empty() {
1710                            r.buffer.unsplit(buf);
1711                        } else {
1712                            r.buffer = buf;
1713                        }
1714                        rx(r, Vec::default()).await?;
1715                    }
1716                    Ok(())
1717                }
1718                .instrument(span),
1719            )
1720        }));
1721        Ok(Some(Box::pin(async {
1722            let Ok(ret) = rx.await else {
1723                error!("future I/O dropped");
1724                return pending().await;
1725            };
1726            ret
1727        })))
1728    }
1729}
1730
1731impl<T, R> Decode<R> for Pin<Box<dyn Future<Output = T> + Send>>
1732where
1733    T: Decode<R> + Send + 'static,
1734    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
1735    std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
1736{
1737    type Decoder = FutureDecoder<T, R>;
1738    type ListDecoder = ListDecoder<Self::Decoder, R>;
1739}
1740
1741/// Encoder for `stream<T>`
1742pub struct StreamEncoder<W> {
1743    deferred: Option<DeferredFn<W>>,
1744}
1745
1746impl<W> Default for StreamEncoder<W> {
1747    fn default() -> Self {
1748        Self { deferred: None }
1749    }
1750}
1751
1752impl<W> Deferred<W> for StreamEncoder<W> {
1753    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
1754        self.deferred.take()
1755    }
1756}
1757
1758impl<T, W, S> tokio_util::codec::Encoder<S> for StreamEncoder<W>
1759where
1760    T: Encode<W> + Send + 'static,
1761    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1762    S: Stream<Item = Vec<T>> + Send + Unpin + 'static,
1763    std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
1764{
1765    type Error = std::io::Error;
1766
1767    #[instrument(level = "trace", skip(self, items), fields(ty = "stream"))]
1768    fn encode(&mut self, mut items: S, dst: &mut BytesMut) -> std::io::Result<()> {
1769        // TODO: Check if stream is resolved
1770        dst.reserve(1);
1771        dst.put_u8(0x00);
1772        let span = Span::current();
1773        self.deferred = Some(Box::new(|mut w, path| {
1774            Box::pin(async move {
1775                if !path.is_empty() {
1776                    w = w.index(&path).map_err(std::io::Error::other)?;
1777                };
1778                let mut enc = T::Encoder::default();
1779                let mut buf = BytesMut::default();
1780                let mut tasks = JoinSet::new();
1781                let mut i = 0_u64;
1782                loop {
1783                    select! {
1784                        chunk = items.next() => {
1785                            let Some(chunk) = chunk else {
1786                                trace!("writing stream end");
1787                                buf.reserve(1);
1788                                buf.put_u8(0x00);
1789                                w.write_all(&buf).await?;
1790                                while let Some(res) = tasks.join_next().await {
1791                                    trace!(?res, "receiver task finished");
1792                                    res??;
1793                                }
1794                                return Ok(())
1795                            };
1796                            let n = u32::try_from(chunk.len()).map_err(|err| {
1797                                std::io::Error::new(std::io::ErrorKind::InvalidInput, err)
1798                            })?;
1799                            let end = i.checked_add(n.into()).ok_or_else(|| {
1800                                std::io::Error::new(
1801                                    std::io::ErrorKind::InvalidInput,
1802                                    "stream element index would overflow u64",
1803                                )
1804                            })?;
1805                            trace!(n, "encoding chunk length");
1806                            Leb128Encoder.encode(n, &mut buf)?;
1807                            trace!(i, buf = format!("{buf:02x?}"), "writing stream chunk items");
1808
1809                            buf.reserve(chunk.len());
1810                            for (i, item) in zip(i.., chunk) {
1811                                enc.encode(item, &mut buf)?;
1812                                if let Some(f) = enc.take_deferred() {
1813                                    let i = i
1814                                        .try_into()
1815                                        .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
1816                                    let w = w.index(&[i]).map_err(std::io::Error::other)?;
1817                                    trace!("spawning transmit task");
1818                                    tasks.spawn(f(w, Vec::default()));
1819                                }
1820                            }
1821                            i = end;
1822                        }
1823                        Some(res) = tasks.join_next() => {
1824                            trace!(?res, "receiver task finished");
1825                            res??;
1826                        }
1827                        res = w.write(&buf), if !buf.is_empty() => {
1828                            let n = res?;
1829                            trace!(?buf, n, "wrote bytes from buffer");
1830                            buf.advance(n);
1831                        }
1832                    }
1833                }
1834            }.instrument(span))
1835        }));
1836        Ok(())
1837    }
1838}
1839
1840impl<T, W> Encode<W> for Pin<Box<dyn Stream<Item = Vec<T>> + Send>>
1841where
1842    T: Encode<W> + Send + 'static,
1843    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1844    std::io::Error: From<<T::Encoder as tokio_util::codec::Encoder<T>>::Error>,
1845{
1846    type Encoder = StreamEncoder<W>;
1847}
1848
1849/// Encoder for `stream<list<u8>>`
1850pub struct StreamEncoderBytes<W> {
1851    deferred: Option<DeferredFn<W>>,
1852}
1853
1854impl<W> Default for StreamEncoderBytes<W> {
1855    fn default() -> Self {
1856        Self { deferred: None }
1857    }
1858}
1859
1860impl<W> Deferred<W> for StreamEncoderBytes<W> {
1861    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
1862        self.deferred.take()
1863    }
1864}
1865
1866impl<W, S> tokio_util::codec::Encoder<S> for StreamEncoderBytes<W>
1867where
1868    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1869    S: Stream<Item = Bytes> + Send + Unpin + 'static,
1870{
1871    type Error = std::io::Error;
1872
1873    #[instrument(level = "trace", skip(self, items), fields(ty = "stream<u8>"))]
1874    fn encode(&mut self, mut items: S, dst: &mut BytesMut) -> std::io::Result<()> {
1875        // TODO: Check if reader is resolved
1876        dst.reserve(1);
1877        dst.put_u8(0x00);
1878        self.deferred = Some(Box::new(|mut w, path| {
1879            Box::pin(async move {
1880                if !path.is_empty() {
1881                    w = w.index(&path).map_err(std::io::Error::other)?;
1882                };
1883                let mut buf = BytesMut::default();
1884                loop {
1885                    select! {
1886                        chunk = items.next() => {
1887                            let Some(chunk) = chunk else {
1888                                trace!("writing stream end");
1889                                buf.reserve(1);
1890                                buf.put_u8(0x00);
1891                                return w.write_all(&buf).await
1892                            };
1893                            let n = u32::try_from(chunk.len()).map_err(|err| {
1894                                std::io::Error::new(std::io::ErrorKind::InvalidInput, err)
1895                            })?;
1896                            trace!(n, "encoding chunk length");
1897                            Leb128Encoder.encode(n, &mut buf)?;
1898                            buf.extend_from_slice(&chunk);
1899                        }
1900                        res = w.write(&buf), if !buf.is_empty() => {
1901                            let n = res?;
1902                            buf.advance(n);
1903                        }
1904                    }
1905                }
1906            })
1907        }));
1908        Ok(())
1909    }
1910}
1911
1912impl<W> Encode<W> for Pin<Box<dyn Stream<Item = Bytes> + Send>>
1913where
1914    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1915{
1916    type Encoder = StreamEncoderBytes<W>;
1917}
1918
1919/// Encoder for `stream<list<u8>>` with [`AsyncRead`] support
1920pub struct StreamEncoderRead<W> {
1921    deferred: Option<DeferredFn<W>>,
1922}
1923
1924impl<W> Default for StreamEncoderRead<W> {
1925    fn default() -> Self {
1926        Self { deferred: None }
1927    }
1928}
1929
1930impl<W> Deferred<W> for StreamEncoderRead<W> {
1931    fn take_deferred(&mut self) -> Option<DeferredFn<W>> {
1932        self.deferred.take()
1933    }
1934}
1935
1936impl<W, S> tokio_util::codec::Encoder<S> for StreamEncoderRead<W>
1937where
1938    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1939    S: AsyncRead + Send + Unpin + 'static,
1940{
1941    type Error = std::io::Error;
1942
1943    #[instrument(level = "trace", skip(self, items), fields(ty = "stream<u8>"))]
1944    fn encode(&mut self, mut items: S, dst: &mut BytesMut) -> std::io::Result<()> {
1945        // TODO: Check if reader is resolved
1946        dst.reserve(1);
1947        dst.put_u8(0x00);
1948        self.deferred = Some(Box::new(|mut w, path| {
1949            Box::pin(async move {
1950                if !path.is_empty() {
1951                    w = w.index(&path).map_err(std::io::Error::other)?;
1952                };
1953                let mut buf = BytesMut::default();
1954                let mut chunk = BytesMut::default();
1955                loop {
1956                    select! {
1957                        res = items.read_buf(&mut chunk) => {
1958                            let n = res?;
1959                            if n == 0 {
1960                                trace!("writing stream end");
1961                                buf.reserve(1);
1962                                buf.put_u8(0x00);
1963                                return w.write_all(&buf).await
1964                            }
1965                            let n = u32::try_from(n).map_err(|err| {
1966                                std::io::Error::new(std::io::ErrorKind::InvalidInput, err)
1967                            })?;
1968                            trace!(n, "encoding chunk length");
1969                            Leb128Encoder.encode(n, &mut buf)?;
1970                            buf.extend_from_slice(&chunk);
1971                            chunk.clear();
1972                        }
1973                        res = w.write(&buf), if !buf.is_empty() => {
1974                            let n = res?;
1975                            buf.advance(n);
1976                        }
1977                    }
1978                }
1979            })
1980        }));
1981        Ok(())
1982    }
1983}
1984
1985impl<W> Encode<W> for Pin<Box<dyn AsyncRead + Send>>
1986where
1987    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1988{
1989    type Encoder = StreamEncoderRead<W>;
1990}
1991
1992impl<T, W> Encode<W> for std::io::Cursor<T>
1993where
1994    T: AsRef<[u8]> + Send + Unpin + 'static,
1995    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
1996{
1997    type Encoder = StreamEncoderRead<W>;
1998}
1999
2000impl<W> Encode<W> for tokio::io::Empty
2001where
2002    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
2003{
2004    type Encoder = StreamEncoderRead<W>;
2005}
2006
2007#[cfg(feature = "io-std")]
2008impl<W> Encode<W> for tokio::io::Stdin
2009where
2010    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
2011{
2012    type Encoder = StreamEncoderRead<W>;
2013}
2014
2015#[cfg(feature = "fs")]
2016impl<W> Encode<W> for tokio::fs::File
2017where
2018    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
2019{
2020    type Encoder = StreamEncoderRead<W>;
2021}
2022
2023#[cfg(feature = "net")]
2024impl<W> Encode<W> for tokio::net::TcpStream
2025where
2026    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
2027{
2028    type Encoder = StreamEncoderRead<W>;
2029}
2030
2031#[cfg(all(unix, feature = "net"))]
2032impl<W> Encode<W> for tokio::net::UnixStream
2033where
2034    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
2035{
2036    type Encoder = StreamEncoderRead<W>;
2037}
2038
2039#[cfg(all(unix, feature = "net"))]
2040impl<W> Encode<W> for tokio::net::unix::pipe::Receiver
2041where
2042    W: AsyncWrite + crate::Index<W> + Send + Sync + Unpin + 'static,
2043{
2044    type Encoder = StreamEncoderRead<W>;
2045}
2046
2047/// Decoder for `stream<T>`
2048pub struct StreamDecoder<T, R>
2049where
2050    T: Decode<R>,
2051{
2052    dec: T::ListDecoder,
2053    deferred: Option<DeferredFn<Incoming<R>>>,
2054    _ty: PhantomData<T>,
2055}
2056
2057impl<T, R> Default for StreamDecoder<T, R>
2058where
2059    T: Decode<R>,
2060{
2061    fn default() -> Self {
2062        Self {
2063            dec: T::ListDecoder::default(),
2064            deferred: None,
2065            _ty: PhantomData,
2066        }
2067    }
2068}
2069
2070impl<T, R> Deferred<Incoming<R>> for StreamDecoder<T, R>
2071where
2072    T: Decode<R>,
2073{
2074    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
2075        self.deferred.take()
2076    }
2077}
2078
2079#[instrument(level = "trace", skip(dec, r, tx), ret)]
2080async fn handle_deferred_stream<C, T, R>(
2081    dec: C,
2082    mut r: Incoming<R>,
2083    path: Vec<usize>,
2084    tx: mpsc::Sender<Vec<T>>,
2085) -> std::io::Result<()>
2086where
2087    C: tokio_util::codec::Decoder<Item = T> + Deferred<Incoming<R>>,
2088    R: AsyncRead + crate::Index<R> + Send + Unpin + 'static,
2089    std::io::Error: From<C::Error>,
2090{
2091    let dec = ListDecoder::new(dec);
2092    if !path.is_empty() {
2093        r = r.index(&path).map_err(std::io::Error::other)?;
2094    };
2095    let mut framed = FramedRead::new(r, dec);
2096    let mut tasks = JoinSet::new();
2097    let mut i = 0_usize;
2098    loop {
2099        trace!("receiving pending stream chunk");
2100        select! {
2101            Some(chunk) = framed.next() => {
2102                let chunk = chunk?;
2103                if chunk.is_empty() {
2104                    trace!("received stream end");
2105                    while let Some(res) = tasks.join_next().await {
2106                        res??;
2107                    }
2108                    return Ok(())
2109                }
2110                let end = i.checked_add(chunk.len()).ok_or_else(|| {
2111                    std::io::Error::new(
2112                        std::io::ErrorKind::InvalidInput,
2113                        "stream element index would overflow usize",
2114                    )
2115                })?;
2116                trace!(i, end, "received stream chunk");
2117                if tx.send(chunk).await.is_err() {
2118                    debug!("stream receiver closed, discard data");
2119                    return Ok(())
2120                }
2121                for (i, deferred) in zip(i.., mem::take(&mut framed.decoder_mut().deferred)) {
2122                    if let Some(deferred) = deferred {
2123                        let r = framed.get_ref().index(&[i]).map_err(std::io::Error::other)?;
2124                        trace!("spawning receive task");
2125                        tasks.spawn(deferred(r, Vec::default()));
2126                    }
2127                }
2128                i = end;
2129            },
2130            Some(res) = tasks.join_next() => {
2131                trace!(?res, "receiver task finished");
2132                res??;
2133            }
2134            else => {
2135                return Ok(());
2136            }
2137        }
2138    }
2139}
2140
2141impl<T, R> tokio_util::codec::Decoder for StreamDecoder<T, R>
2142where
2143    T: Decode<R> + Send + 'static,
2144    T::ListDecoder: Deferred<Incoming<R>>,
2145    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2146    <T::Decoder as tokio_util::codec::Decoder>::Error: Send,
2147    std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
2148{
2149    type Item = Pin<Box<dyn Stream<Item = Vec<T>> + Send>>;
2150    type Error = <<T as Decode<R>>::ListDecoder as tokio_util::codec::Decoder>::Error;
2151
2152    #[instrument(level = "trace", skip(self), fields(ty = "stream"))]
2153    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
2154        let Some(chunk) = self.dec.decode(src)? else {
2155            return Ok(None);
2156        };
2157        if !chunk.is_empty() {
2158            self.deferred = self.dec.take_deferred();
2159            return Ok(Some(Box::pin(stream::iter([chunk]))));
2160        }
2161
2162        // stream is pending
2163        let (tx, rx) = mpsc::channel(128);
2164        self.deferred = Some(Box::new(|r, path| {
2165            Box::pin(
2166                async move { handle_deferred_stream(T::Decoder::default(), r, path, tx).await },
2167            )
2168        }));
2169        Ok(Some(Box::pin(ReceiverStream::new(rx))))
2170    }
2171}
2172
2173impl<T, R> Decode<R> for Pin<Box<dyn Stream<Item = Vec<T>> + Send>>
2174where
2175    T: Decode<R> + Send + 'static,
2176    T::ListDecoder: Deferred<Incoming<R>> + Send,
2177    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2178    <T::Decoder as tokio_util::codec::Decoder>::Error: Send,
2179    std::io::Error: From<<T::Decoder as tokio_util::codec::Decoder>::Error>,
2180{
2181    type Decoder = StreamDecoder<T, R>;
2182    type ListDecoder = ListDecoder<Self::Decoder, R>;
2183}
2184
2185/// Decoder for `stream<list<u8>>`
2186pub struct StreamDecoderBytes<R> {
2187    dec: CoreVecDecoderBytes,
2188    deferred: Option<DeferredFn<Incoming<R>>>,
2189}
2190
2191impl<R> Default for StreamDecoderBytes<R> {
2192    fn default() -> Self {
2193        Self {
2194            dec: CoreVecDecoderBytes::default(),
2195            deferred: None,
2196        }
2197    }
2198}
2199
2200impl<R> Deferred<Incoming<R>> for StreamDecoderBytes<R> {
2201    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
2202        self.deferred.take()
2203    }
2204}
2205
2206impl<R> tokio_util::codec::Decoder for StreamDecoderBytes<R>
2207where
2208    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2209{
2210    type Item = Pin<Box<dyn Stream<Item = Bytes> + Send>>;
2211    type Error = std::io::Error;
2212
2213    #[instrument(level = "trace", skip(self), fields(ty = "stream<u8>"))]
2214    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
2215        let Some(chunk) = self.dec.decode(src)? else {
2216            return Ok(None);
2217        };
2218        if !chunk.is_empty() {
2219            return Ok(Some(Box::pin(stream::iter([chunk]))));
2220        }
2221
2222        // stream is pending
2223        let (tx, rx) = mpsc::channel(128);
2224        let dec = mem::take(&mut self.dec);
2225        let span = Span::current();
2226        self.deferred = Some(Box::new(|mut r, path| {
2227            Box::pin(
2228                async move {
2229                    if !path.is_empty() {
2230                        r = r.index(&path).map_err(std::io::Error::other)?;
2231                    };
2232                    let mut framed = FramedRead::new(r, dec);
2233                    trace!(?path, "receiving pending byte stream chunk");
2234                    while let Some(chunk) = framed.next().await {
2235                        let chunk = chunk?;
2236                        if chunk.is_empty() {
2237                            trace!("received stream end");
2238                            return Ok(());
2239                        }
2240                        trace!(?chunk, "received pending byte stream chunk");
2241                        if tx.send(chunk).await.is_err() {
2242                            debug!("stream receiver closed, discard data");
2243                            return Ok(());
2244                        }
2245                    }
2246                    Ok(())
2247                }
2248                .instrument(span),
2249            )
2250        }));
2251        Ok(Some(Box::pin(ReceiverStream::new(rx))))
2252    }
2253}
2254
2255impl<R> Decode<R> for Pin<Box<dyn Stream<Item = Bytes> + Send>>
2256where
2257    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2258{
2259    type Decoder = StreamDecoderBytes<R>;
2260    type ListDecoder = ListDecoder<Self::Decoder, R>;
2261}
2262
2263/// Decoder for `stream<list<u8>>` with [`AsyncRead`] support
2264pub struct StreamDecoderRead<R> {
2265    dec: CoreVecDecoderBytes,
2266    deferred: Option<DeferredFn<Incoming<R>>>,
2267}
2268
2269impl<R> Default for StreamDecoderRead<R> {
2270    fn default() -> Self {
2271        Self {
2272            dec: CoreVecDecoderBytes::default(),
2273            deferred: None,
2274        }
2275    }
2276}
2277
2278impl<R> Deferred<Incoming<R>> for StreamDecoderRead<R> {
2279    fn take_deferred(&mut self) -> Option<DeferredFn<Incoming<R>>> {
2280        self.deferred.take()
2281    }
2282}
2283
2284impl<R> tokio_util::codec::Decoder for StreamDecoderRead<R>
2285where
2286    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2287{
2288    type Item = Pin<Box<dyn AsyncRead + Send>>;
2289    type Error = std::io::Error;
2290
2291    #[instrument(level = "trace", skip(self), fields(ty = "stream<u8>"))]
2292    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
2293        let Some(chunk) = self.dec.decode(src)? else {
2294            return Ok(None);
2295        };
2296        if !chunk.is_empty() {
2297            return Ok(Some(Box::pin(std::io::Cursor::new(chunk))));
2298        }
2299
2300        // stream is pending
2301        let (tx, rx) = mpsc::channel(128);
2302        let dec = mem::take(&mut self.dec);
2303        self.deferred = Some(Box::new(|mut r, path| {
2304            Box::pin(async move {
2305                if !path.is_empty() {
2306                    r = r.index(&path).map_err(std::io::Error::other)?;
2307                };
2308                let mut framed = FramedRead::new(r, dec);
2309                trace!("receiving pending byte stream chunk");
2310                while let Some(chunk) = framed.next().await {
2311                    let chunk = chunk?;
2312                    if chunk.is_empty() {
2313                        trace!("received stream end");
2314                        return Ok(());
2315                    }
2316                    trace!(?chunk, "received byte stream chunk");
2317                    if tx.send(std::io::Result::Ok(chunk)).await.is_err() {
2318                        debug!("stream receiver closed, discard data");
2319                        return Ok(());
2320                    }
2321                }
2322                Ok(())
2323            })
2324        }));
2325        Ok(Some(Box::pin(StreamReader::new(ReceiverStream::new(rx)))))
2326    }
2327}
2328
2329impl<R> Decode<R> for Pin<Box<dyn AsyncRead + Send>>
2330where
2331    R: AsyncRead + crate::Index<R> + Send + Sync + Unpin + 'static,
2332{
2333    type Decoder = StreamDecoderRead<R>;
2334    type ListDecoder = ListDecoder<Self::Decoder, R>;
2335}
2336
2337#[cfg(test)]
2338mod tests {
2339    use anyhow::bail;
2340
2341    use super::*;
2342
2343    struct NoopStream;
2344
2345    impl crate::Index<Self> for NoopStream {
2346        fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
2347            panic!("index should not be called with path {path:?}")
2348        }
2349    }
2350
2351    #[test_log::test(tokio::test)]
2352    async fn codec() -> anyhow::Result<()> {
2353        let mut buf = BytesMut::new();
2354        let mut enc = <(u8, u32) as Encode<NoopStream>>::Encoder::default();
2355        enc.encode((0x42, 0x42), &mut buf)?;
2356        if let Some(_f) = Deferred::<NoopStream>::take_deferred(&mut enc) {
2357            bail!("no deferred write should have been returned");
2358        }
2359        assert_eq!(buf.as_ref(), b"\x42\x42");
2360        Ok(())
2361    }
2362
2363    #[test]
2364    fn canonical_nan_f32() {
2365        let mut enc = <f32 as Encode<NoopStream>>::Encoder::default();
2366
2367        // A non-canonical (e.g. signalling) `NaN` is canonicalized on encode.
2368        let mut buf = BytesMut::new();
2369        enc.encode(f32::from_bits(0x7f80_0001), &mut buf).unwrap();
2370        assert_eq!(buf.as_ref(), CANONICAL_NAN_F32.to_le_bytes());
2371
2372        // A negative `NaN` is canonicalized to the (positive) canonical `NaN`.
2373        let mut buf = BytesMut::new();
2374        enc.encode(f32::from_bits(0xffc0_0000), &mut buf).unwrap();
2375        assert_eq!(buf.as_ref(), CANONICAL_NAN_F32.to_le_bytes());
2376
2377        // Non-`NaN` values are encoded unchanged.
2378        let mut buf = BytesMut::new();
2379        enc.encode(1.5_f32, &mut buf).unwrap();
2380        assert_eq!(buf.as_ref(), 1.5_f32.to_bits().to_le_bytes());
2381    }
2382
2383    #[test]
2384    fn canonical_nan_f64() {
2385        let mut enc = <f64 as Encode<NoopStream>>::Encoder::default();
2386
2387        let mut buf = BytesMut::new();
2388        enc.encode(f64::from_bits(0x7ff0_0000_0000_0001), &mut buf)
2389            .unwrap();
2390        assert_eq!(buf.as_ref(), CANONICAL_NAN_F64.to_le_bytes());
2391
2392        let mut buf = BytesMut::new();
2393        enc.encode(f64::from_bits(0xfff8_0000_0000_0000), &mut buf)
2394            .unwrap();
2395        assert_eq!(buf.as_ref(), CANONICAL_NAN_F64.to_le_bytes());
2396
2397        let mut buf = BytesMut::new();
2398        enc.encode(1.5_f64, &mut buf).unwrap();
2399        assert_eq!(buf.as_ref(), 1.5_f64.to_bits().to_le_bytes());
2400    }
2401}