Skip to main content

wrpc_runtime_wasmtime/
codec.rs

1use core::future::Future;
2use core::iter::zip;
3use core::ops::{BitOrAssign, Shl};
4use core::pin::{pin, Pin};
5
6use std::collections::HashSet;
7
8use bytes::{BufMut as _, BytesMut};
9use futures::stream::FuturesUnordered;
10use futures::TryStreamExt as _;
11use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
12use tokio_util::codec::{Encoder, FramedRead};
13use tokio_util::compat::FuturesAsyncReadCompatExt as _;
14use tracing::{error, instrument, trace, warn};
15use uuid::Uuid;
16use wasm_tokio::cm::AsyncReadValue as _;
17use wasm_tokio::{
18    AsyncReadCore as _, AsyncReadLeb128 as _, AsyncReadUtf8 as _, CoreNameEncoder,
19    CoreVecEncoderBytes, Leb128Encoder, Utf8Codec,
20};
21use wasmtime::bail;
22use wasmtime::component::types::{Case, Field};
23use wasmtime::component::{ResourceType, Type, Val};
24use wasmtime::error::Context as _;
25use wasmtime::{AsContextMut, StoreContextMut};
26use wasmtime_wasi::p2::pipe::AsyncReadStream;
27use wasmtime_wasi::p2::{DynInputStream, StreamError};
28use wrpc_transport::ListDecoderU8;
29
30use crate::{RemoteResource, WrpcView};
31
32pub struct ValEncoder<'a, T: 'static, W> {
33    pub store: StoreContextMut<'a, T>,
34    pub ty: &'a Type,
35    pub resources: &'a [ResourceType],
36    /// Resource types bridged to wRPC `stream<u8>` (`wasi:io` `input-stream`/
37    /// `output-stream`), identified by their possibly-uninstantiated type.
38    pub io_streams: &'a [ResourceType],
39    pub deferred: Option<
40        Box<dyn FnOnce(W) -> Pin<Box<dyn Future<Output = wasmtime::Result<()>> + Send>> + Send>,
41    >,
42}
43
44impl<T, W> ValEncoder<'_, T, W> {
45    #[must_use]
46    pub fn new<'a>(
47        store: StoreContextMut<'a, T>,
48        ty: &'a Type,
49        resources: &'a [ResourceType],
50        io_streams: &'a [ResourceType],
51    ) -> ValEncoder<'a, T, W> {
52        ValEncoder {
53            store,
54            ty,
55            resources,
56            io_streams,
57            deferred: None,
58        }
59    }
60
61    pub fn with_type<'a>(&'a mut self, ty: &'a Type) -> ValEncoder<'a, T, W> {
62        ValEncoder {
63            store: self.store.as_context_mut(),
64            ty,
65            resources: self.resources,
66            io_streams: self.io_streams,
67            deferred: None,
68        }
69    }
70}
71
72fn find_enum_discriminant<'a, T>(
73    iter: impl IntoIterator<Item = T>,
74    names: impl IntoIterator<Item = &'a str>,
75    discriminant: &str,
76) -> wasmtime::Result<T> {
77    zip(iter, names)
78        .find_map(|(i, name)| (name == discriminant).then_some(i))
79        .context("unknown enum discriminant")
80}
81
82fn find_variant_discriminant<'a, T>(
83    iter: impl IntoIterator<Item = T>,
84    cases: impl IntoIterator<Item = Case<'a>>,
85    discriminant: &str,
86) -> wasmtime::Result<(T, Option<Type>)> {
87    zip(iter, cases)
88        .find_map(|(i, Case { name, ty })| (name == discriminant).then_some((i, ty)))
89        .context("unknown variant discriminant")
90}
91
/// Builds a bitfield in which bit `i` is set when the `i`-th entry of `names`
/// is present in `flags`.
///
/// Entries of `flags` that do not appear in `names` are ignored.
#[inline]
fn flag_bits<'a, T: BitOrAssign + Shl<u8, Output = T> + From<u8>>(
    names: impl IntoIterator<Item = &'a str>,
    flags: impl IntoIterator<Item = &'a str>,
) -> T {
    // Collect once so that membership checks do not rescan `flags` per name.
    let enabled: HashSet<&str> = flags.into_iter().collect();
    zip(0u8.., names)
        .filter(|(_, name)| enabled.contains(name))
        .fold(T::from(0), |mut bits, (position, _)| {
            bits |= T::from(1) << position;
            bits
        })
}
106
107async fn write_deferred<W, I>(w: W, deferred: I) -> wasmtime::Result<()>
108where
109    W: wrpc_transport::Index<W> + Sync + Send + 'static,
110    I: IntoIterator,
111    I::IntoIter: ExactSizeIterator<
112        Item = Option<
113            Box<dyn FnOnce(W) -> Pin<Box<dyn Future<Output = wasmtime::Result<()>> + Send>> + Send>,
114        >,
115    >,
116{
117    let mut futs: FuturesUnordered<_> = zip(0.., deferred)
118        .filter_map(|(i, f)| f.map(|f| (w.index(&[i]), f)))
119        .map(|(w, f)| async move {
120            let w = w.map_err(wasmtime::Error::from_anyhow)?;
121            f(w).await
122        })
123        .collect();
124    while let Some(()) = futs.try_next().await? {}
125    Ok(())
126}
127
128impl<T, W> Encoder<&Val> for ValEncoder<'_, T, W>
129where
130    T: WrpcView,
131    W: AsyncWrite + wrpc_transport::Index<W> + Sync + Send + 'static,
132{
133    type Error = wasmtime::Error;
134
135    #[allow(clippy::too_many_lines)]
136    #[instrument(level = "trace", skip(self))]
137    fn encode(&mut self, v: &Val, dst: &mut BytesMut) -> Result<(), Self::Error> {
138        match (v, self.ty) {
139            (Val::Bool(v), Type::Bool) => {
140                dst.reserve(1);
141                dst.put_u8((*v).into());
142                Ok(())
143            }
144            (Val::S8(v), Type::S8) => {
145                dst.reserve(1);
146                dst.put_i8(*v);
147                Ok(())
148            }
149            (Val::U8(v), Type::U8) => {
150                dst.reserve(1);
151                dst.put_u8(*v);
152                Ok(())
153            }
154            (Val::S16(v), Type::S16) => Leb128Encoder
155                .encode(*v, dst)
156                .context("failed to encode s16"),
157            (Val::U16(v), Type::U16) => Leb128Encoder
158                .encode(*v, dst)
159                .context("failed to encode u16"),
160            (Val::S32(v), Type::S32) => Leb128Encoder
161                .encode(*v, dst)
162                .context("failed to encode s32"),
163            (Val::U32(v), Type::U32) => Leb128Encoder
164                .encode(*v, dst)
165                .context("failed to encode u32"),
166            (Val::S64(v), Type::S64) => Leb128Encoder
167                .encode(*v, dst)
168                .context("failed to encode s64"),
169            (Val::U64(v), Type::U64) => Leb128Encoder
170                .encode(*v, dst)
171                .context("failed to encode u64"),
172            (Val::Float32(v), Type::Float32) => {
173                dst.reserve(4);
174                dst.put_f32_le(*v);
175                Ok(())
176            }
177            (Val::Float64(v), Type::Float64) => {
178                dst.reserve(8);
179                dst.put_f64_le(*v);
180                Ok(())
181            }
182            (Val::Char(v), Type::Char) => {
183                Utf8Codec.encode(*v, dst).context("failed to encode char")
184            }
185            (Val::String(v), Type::String) => CoreNameEncoder
186                .encode(v.as_str(), dst)
187                .context("failed to encode string"),
188            (Val::List(vs), Type::List(ty)) => {
189                let ty = ty.ty();
190                let n = u32::try_from(vs.len()).context("list length does not fit in u32")?;
191                dst.reserve(5 + vs.len());
192                Leb128Encoder
193                    .encode(n, dst)
194                    .context("failed to encode list length")?;
195                let mut deferred = Vec::with_capacity(vs.len());
196                for v in vs {
197                    let mut enc = self.with_type(&ty);
198                    enc.encode(v, dst)
199                        .context("failed to encode list element")?;
200                    deferred.push(enc.deferred);
201                }
202                if deferred.iter().any(Option::is_some) {
203                    self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred))));
204                }
205                Ok(())
206            }
207            (Val::Record(vs), Type::Record(ty)) => {
208                dst.reserve(vs.len());
209                let mut deferred = Vec::with_capacity(vs.len());
210                for ((name, v), Field { ref ty, .. }) in zip(vs, ty.fields()) {
211                    let mut enc = self.with_type(ty);
212                    enc.encode(v, dst)
213                        .with_context(|| format!("failed to encode `{name}` field"))?;
214                    deferred.push(enc.deferred);
215                }
216                if deferred.iter().any(Option::is_some) {
217                    self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred))));
218                }
219                Ok(())
220            }
221            (Val::Tuple(vs), Type::Tuple(ty)) => {
222                dst.reserve(vs.len());
223                let mut deferred = Vec::with_capacity(vs.len());
224                for (v, ref ty) in zip(vs, ty.types()) {
225                    let mut enc = self.with_type(ty);
226                    enc.encode(v, dst)
227                        .context("failed to encode tuple element")?;
228                    deferred.push(enc.deferred);
229                }
230                if deferred.iter().any(Option::is_some) {
231                    self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred))));
232                }
233                Ok(())
234            }
235            (Val::Variant(discriminant, v), Type::Variant(ty)) => {
236                let cases = ty.cases();
237                let ty = match cases.len() {
238                    ..=0x0000_00ff => {
239                        let (discriminant, ty) =
240                            find_variant_discriminant(0u8.., cases, discriminant)?;
241                        dst.reserve(2 + usize::from(v.is_some()));
242                        Leb128Encoder.encode(discriminant, dst)?;
243                        ty
244                    }
245                    0x0000_0100..=0x0000_ffff => {
246                        let (discriminant, ty) =
247                            find_variant_discriminant(0u16.., cases, discriminant)?;
248                        dst.reserve(3 + usize::from(v.is_some()));
249                        Leb128Encoder.encode(discriminant, dst)?;
250                        ty
251                    }
252                    0x0001_0000..=0x00ff_ffff => {
253                        let (discriminant, ty) =
254                            find_variant_discriminant(0u32.., cases, discriminant)?;
255                        dst.reserve(4 + usize::from(v.is_some()));
256                        Leb128Encoder.encode(discriminant, dst)?;
257                        ty
258                    }
259                    0x0100_0000..=0xffff_ffff => {
260                        let (discriminant, ty) =
261                            find_variant_discriminant(0u32.., cases, discriminant)?;
262                        dst.reserve(5 + usize::from(v.is_some()));
263                        Leb128Encoder.encode(discriminant, dst)?;
264                        ty
265                    }
266                    0x1_0000_0000.. => bail!("case count does not fit in u32"),
267                };
268                if let Some(v) = v {
269                    let ty = ty.context("type missing for variant")?;
270                    let mut enc = self.with_type(&ty);
271                    enc.encode(v, dst)
272                        .context("failed to encode variant value")?;
273                    if let Some(f) = enc.deferred {
274                        self.deferred = Some(f);
275                    }
276                }
277                Ok(())
278            }
279            (Val::Enum(discriminant), Type::Enum(ty)) => {
280                let names = ty.names();
281                match names.len() {
282                    ..=0x0000_00ff => {
283                        let discriminant = find_enum_discriminant(0u8.., names, discriminant)?;
284                        dst.reserve(2);
285                        Leb128Encoder.encode(discriminant, dst)?;
286                    }
287                    0x0000_0100..=0x0000_ffff => {
288                        let discriminant = find_enum_discriminant(0u16.., names, discriminant)?;
289                        dst.reserve(3);
290                        Leb128Encoder.encode(discriminant, dst)?;
291                    }
292                    0x0001_0000..=0x00ff_ffff => {
293                        let discriminant = find_enum_discriminant(0u32.., names, discriminant)?;
294                        dst.reserve(4);
295                        Leb128Encoder.encode(discriminant, dst)?;
296                    }
297                    0x0100_0000..=0xffff_ffff => {
298                        let discriminant = find_enum_discriminant(0u32.., names, discriminant)?;
299                        dst.reserve(5);
300                        Leb128Encoder.encode(discriminant, dst)?;
301                    }
302                    0x1_0000_0000.. => bail!("name count does not fit in u32"),
303                }
304                Ok(())
305            }
306            (Val::Option(None), Type::Option(_)) => {
307                dst.reserve(1);
308                dst.put_u8(0);
309                Ok(())
310            }
311            (Val::Option(Some(v)), Type::Option(ty)) => {
312                dst.reserve(2);
313                dst.put_u8(1);
314                let ty = ty.ty();
315                let mut enc = self.with_type(&ty);
316                enc.encode(v, dst)
317                    .context("failed to encode `option::some` value")?;
318                if let Some(f) = enc.deferred {
319                    self.deferred = Some(f);
320                }
321                Ok(())
322            }
323            (Val::Result(v), Type::Result(ty)) => match v {
324                Ok(v) => match (v, ty.ok()) {
325                    (Some(v), Some(ty)) => {
326                        dst.reserve(2);
327                        dst.put_u8(0);
328                        let mut enc = self.with_type(&ty);
329                        enc.encode(v, dst)
330                            .context("failed to encode `result::ok` value")?;
331                        if let Some(f) = enc.deferred {
332                            self.deferred = Some(f);
333                        }
334                        Ok(())
335                    }
336                    (Some(_v), None) => bail!("`result::ok` value of unknown type"),
337                    (None, Some(_ty)) => bail!("`result::ok` value missing"),
338                    (None, None) => {
339                        dst.reserve(1);
340                        dst.put_u8(0);
341                        Ok(())
342                    }
343                },
344                Err(v) => match (v, ty.err()) {
345                    (Some(v), Some(ty)) => {
346                        dst.reserve(2);
347                        dst.put_u8(1);
348                        let mut enc = self.with_type(&ty);
349                        enc.encode(v, dst)
350                            .context("failed to encode `result::err` value")?;
351                        if let Some(f) = enc.deferred {
352                            self.deferred = Some(f);
353                        }
354                        Ok(())
355                    }
356                    (Some(_v), None) => bail!("`result::err` value of unknown type"),
357                    (None, Some(_ty)) => bail!("`result::err` value missing"),
358                    (None, None) => {
359                        dst.reserve(1);
360                        dst.put_u8(1);
361                        Ok(())
362                    }
363                },
364            },
365            (Val::Flags(vs), Type::Flags(ty)) => {
366                let names = ty.names();
367                let vs = vs.iter().map(String::as_str);
368                match names.len() {
369                    ..=8 => {
370                        dst.reserve(1);
371                        dst.put_u8(flag_bits(names, vs));
372                    }
373                    9..=16 => {
374                        dst.reserve(2);
375                        dst.put_u16_le(flag_bits(names, vs));
376                    }
377                    17..=24 => {
378                        dst.reserve(3);
379                        dst.put_slice(&u32::to_le_bytes(flag_bits(names, vs))[..3]);
380                    }
381                    25..=32 => {
382                        dst.reserve(4);
383                        dst.put_u32_le(flag_bits(names, vs));
384                    }
385                    33..=40 => {
386                        dst.reserve(5);
387                        dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..5]);
388                    }
389                    41..=48 => {
390                        dst.reserve(6);
391                        dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..6]);
392                    }
393                    49..=56 => {
394                        dst.reserve(7);
395                        dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..7]);
396                    }
397                    57..=64 => {
398                        dst.reserve(8);
399                        dst.put_u64_le(flag_bits(names, vs));
400                    }
401                    65..=72 => {
402                        dst.reserve(9);
403                        dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..9]);
404                    }
405                    73..=80 => {
406                        dst.reserve(10);
407                        dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..10]);
408                    }
409                    81..=88 => {
410                        dst.reserve(11);
411                        dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..11]);
412                    }
413                    89..=96 => {
414                        dst.reserve(12);
415                        dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..12]);
416                    }
417                    97..=104 => {
418                        dst.reserve(13);
419                        dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..13]);
420                    }
421                    105..=112 => {
422                        dst.reserve(14);
423                        dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..14]);
424                    }
425                    113..=120 => {
426                        dst.reserve(15);
427                        dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..15]);
428                    }
429                    121..=128 => {
430                        dst.reserve(16);
431                        dst.put_u128_le(flag_bits(names, vs));
432                    }
433                    bits @ 129.. => {
434                        let mut cap = bits / 8;
435                        if bits % 8 != 0 {
436                            cap = cap.saturating_add(1);
437                        }
438                        let mut buf = vec![0; cap];
439                        let flags: HashSet<&str> = vs.into_iter().collect();
440                        for (i, name) in names.enumerate() {
441                            if flags.contains(name) {
442                                buf[i / 8] |= 1 << (i % 8);
443                            }
444                        }
445                        dst.extend_from_slice(&buf);
446                    }
447                }
448                Ok(())
449            }
450            (Val::Resource(resource), Type::Own(ty) | Type::Borrow(ty)) => {
451                if *ty == ResourceType::host::<DynInputStream>() || self.io_streams.contains(ty) {
452                    let stream = resource
453                        .try_into_resource::<DynInputStream>(&mut self.store)
454                        .context("failed to downcast `wasi:io/input-stream`")?;
455                    if stream.owned() {
456                        let mut stream = self
457                            .store
458                            .data_mut()
459                            .wrpc()
460                            .table
461                            .delete(stream)
462                            .context("failed to delete input stream")?;
463                        self.deferred = Some(Box::new(|w| {
464                            Box::pin(async move {
465                                let mut w = pin!(w);
466                                loop {
467                                    stream.ready().await;
468                                    match stream.read(8096) {
469                                        Ok(buf) => {
470                                            let mut chunk = BytesMut::with_capacity(
471                                                buf.len().saturating_add(5),
472                                            );
473                                            CoreVecEncoderBytes
474                                                .encode(buf, &mut chunk)
475                                                .context("failed to encode input stream chunk")?;
476                                            w.write_all(&chunk).await?;
477                                        }
478                                        Err(StreamError::Closed) => {
479                                            w.write_all(&[0x00]).await?;
480                                        }
481                                        Err(err) => return Err(err.into()),
482                                    }
483                                }
484                            })
485                        }));
486                    } else {
487                        self.store
488                            .data_mut()
489                            .wrpc()
490                            .table
491                            .get_mut(&stream)
492                            .context("failed to get input stream")?;
493                        // NOTE: In order to handle this we'd need to know how many bytes the
494                        // receiver has read. That means that some kind of callback would be required from
495                        // the receiver. This is not trivial and generally should be a very rare use case.
496                        bail!("encoding borrowed `wasi:io/input-stream` not supported yet");
497                    };
498                    Ok(())
499                } else if resource.ty() == ResourceType::host::<RemoteResource>() {
500                    let resource = resource
501                        .try_into_resource(&mut self.store)
502                        .context("resource type mismatch")?;
503                    let table = self.store.data_mut().wrpc().table;
504                    if resource.owned() {
505                        let RemoteResource(buf) = table
506                            .delete(resource)
507                            .context("failed to delete remote resource")?;
508                        CoreVecEncoderBytes
509                            .encode(buf, dst)
510                            .context("failed to encode resource handle")
511                    } else {
512                        let RemoteResource(buf) = table
513                            .get(&resource)
514                            .context("failed to get remote resource")?;
515                        CoreVecEncoderBytes
516                            .encode(buf, dst)
517                            .context("failed to encode resource handle")
518                    }
519                } else if self.resources.contains(ty) {
520                    let id = Uuid::now_v7();
521                    CoreVecEncoderBytes
522                        .encode(id.to_bytes_le().as_slice(), dst)
523                        .context("failed to encode resource handle")?;
524                    trace!(?id, "store shared resource");
525                    if self
526                        .store
527                        .data_mut()
528                        .wrpc()
529                        .ctx
530                        .shared_resources()
531                        .0
532                        .insert(id, *resource)
533                        .is_some()
534                    {
535                        error!(?id, "duplicate resource ID generated");
536                    }
537                    Ok(())
538                } else {
539                    bail!("encoding host resources not supported yet")
540                }
541            }
542
543            (_, Type::Future(..) | Type::Stream(..) | Type::ErrorContext) => {
544                bail!("async not supported")
545            }
546            _ => bail!("value type mismatch"),
547        }
548    }
549}
550
551#[inline]
552async fn read_flags(n: usize, r: &mut (impl AsyncRead + Unpin)) -> std::io::Result<u128> {
553    let mut buf = 0u128.to_le_bytes();
554    r.read_exact(&mut buf[..n]).await?;
555    Ok(u128::from_le_bytes(buf))
556}
557
558/// Read encoded value of type [`Type`] from an [`AsyncRead`] into a [`Val`]
559#[instrument(level = "trace", skip_all, fields(ty, path))]
560#[allow(clippy::too_many_arguments)]
561pub async fn read_value<T, R>(
562    store: &mut impl AsContextMut<Data = T>,
563    r: &mut Pin<&mut R>,
564    resources: &[ResourceType],
565    io_streams: &[ResourceType],
566    val: &mut Val,
567    ty: &Type,
568    path: &[usize],
569) -> std::io::Result<()>
570where
571    T: WrpcView + 'static,
572    R: AsyncRead + wrpc_transport::Index<R> + Send + Unpin + 'static,
573{
574    match ty {
575        Type::Bool => {
576            let v = r.read_bool().await?;
577            *val = Val::Bool(v);
578            Ok(())
579        }
580        Type::S8 => {
581            let v = r.read_i8().await?;
582            *val = Val::S8(v);
583            Ok(())
584        }
585        Type::U8 => {
586            let v = r.read_u8().await?;
587            *val = Val::U8(v);
588            Ok(())
589        }
590        Type::S16 => {
591            let v = r.read_i16_leb128().await?;
592            *val = Val::S16(v);
593            Ok(())
594        }
595        Type::U16 => {
596            let v = r.read_u16_leb128().await?;
597            *val = Val::U16(v);
598            Ok(())
599        }
600        Type::S32 => {
601            let v = r.read_i32_leb128().await?;
602            *val = Val::S32(v);
603            Ok(())
604        }
605        Type::U32 => {
606            let v = r.read_u32_leb128().await?;
607            *val = Val::U32(v);
608            Ok(())
609        }
610        Type::S64 => {
611            let v = r.read_i64_leb128().await?;
612            *val = Val::S64(v);
613            Ok(())
614        }
615        Type::U64 => {
616            let v = r.read_u64_leb128().await?;
617            *val = Val::U64(v);
618            Ok(())
619        }
620        Type::Float32 => {
621            let v = r.read_f32_le().await?;
622            *val = Val::Float32(v);
623            Ok(())
624        }
625        Type::Float64 => {
626            let v = r.read_f64_le().await?;
627            *val = Val::Float64(v);
628            Ok(())
629        }
630        Type::Char => {
631            let v = r.read_char_utf8().await?;
632            *val = Val::Char(v);
633            Ok(())
634        }
635        Type::String => {
636            let mut s = String::default();
637            r.read_core_name(&mut s).await?;
638            *val = Val::String(s);
639            Ok(())
640        }
641        Type::List(ty) => {
642            let n = r.read_u32_leb128().await?;
643            let n = n.try_into().unwrap_or(usize::MAX);
644            let mut vs = Vec::with_capacity(n);
645            let ty = ty.ty();
646            let mut path = path.to_vec();
647            for i in 0..n {
648                let mut v = Val::Bool(false);
649                path.push(i);
650                trace!(i, "reading list element value");
651                Box::pin(read_value(
652                    store, r, resources, io_streams, &mut v, &ty, &path,
653                ))
654                .await?;
655                path.pop();
656                vs.push(v);
657            }
658            *val = Val::List(vs);
659            Ok(())
660        }
661        Type::Record(ty) => {
662            let fields = ty.fields();
663            let mut vs = Vec::with_capacity(fields.len());
664            let mut path = path.to_vec();
665            for (i, Field { name, ty }) in fields.enumerate() {
666                let mut v = Val::Bool(false);
667                path.push(i);
668                trace!(i, "reading struct field value");
669                Box::pin(read_value(
670                    store, r, resources, io_streams, &mut v, &ty, &path,
671                ))
672                .await?;
673                path.pop();
674                vs.push((name.to_string(), v));
675            }
676            *val = Val::Record(vs);
677            Ok(())
678        }
679        Type::Tuple(ty) => {
680            let types = ty.types();
681            let mut vs = Vec::with_capacity(types.len());
682            let mut path = path.to_vec();
683            for (i, ty) in types.enumerate() {
684                let mut v = Val::Bool(false);
685                path.push(i);
686                trace!(i, "reading tuple element value");
687                Box::pin(read_value(
688                    store, r, resources, io_streams, &mut v, &ty, &path,
689                ))
690                .await?;
691                path.pop();
692                vs.push(v);
693            }
694            *val = Val::Tuple(vs);
695            Ok(())
696        }
697        Type::Variant(ty) => {
698            let discriminant = r.read_u32_leb128().await?;
699            let discriminant = discriminant
700                .try_into()
701                .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
702            let Case { name, ty } = ty.cases().nth(discriminant).ok_or_else(|| {
703                std::io::Error::new(
704                    std::io::ErrorKind::InvalidInput,
705                    format!("unknown variant discriminant `{discriminant}`"),
706                )
707            })?;
708            let name = name.to_string();
709            if let Some(ty) = ty {
710                let mut v = Val::Bool(false);
711                trace!(variant = name, "reading nested variant value");
712                Box::pin(read_value(
713                    store, r, resources, io_streams, &mut v, &ty, path,
714                ))
715                .await?;
716                *val = Val::Variant(name, Some(Box::new(v)));
717            } else {
718                *val = Val::Variant(name, None);
719            }
720            Ok(())
721        }
722        Type::Enum(ty) => {
723            let discriminant = r.read_u32_leb128().await?;
724            let discriminant = discriminant
725                .try_into()
726                .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
727            let name = ty.names().nth(discriminant).ok_or_else(|| {
728                std::io::Error::new(
729                    std::io::ErrorKind::InvalidInput,
730                    format!("unknown enum discriminant `{discriminant}`"),
731                )
732            })?;
733            *val = Val::Enum(name.to_string());
734            Ok(())
735        }
736        Type::Option(ty) => {
737            let ok = r.read_option_status().await?;
738            if ok {
739                let mut v = Val::Bool(false);
740                trace!("reading nested `option::some` value");
741                Box::pin(read_value(
742                    store,
743                    r,
744                    resources,
745                    io_streams,
746                    &mut v,
747                    &ty.ty(),
748                    path,
749                ))
750                .await?;
751                *val = Val::Option(Some(Box::new(v)));
752            } else {
753                *val = Val::Option(None);
754            }
755            Ok(())
756        }
757        Type::Result(ty) => {
758            let ok = r.read_result_status().await?;
759            if ok {
760                if let Some(ty) = ty.ok() {
761                    let mut v = Val::Bool(false);
762                    trace!("reading nested `result::ok` value");
763                    Box::pin(read_value(
764                        store, r, resources, io_streams, &mut v, &ty, path,
765                    ))
766                    .await?;
767                    *val = Val::Result(Ok(Some(Box::new(v))));
768                } else {
769                    *val = Val::Result(Ok(None));
770                }
771            } else if let Some(ty) = ty.err() {
772                let mut v = Val::Bool(false);
773                trace!("reading nested `result::err` value");
774                Box::pin(read_value(
775                    store, r, resources, io_streams, &mut v, &ty, path,
776                ))
777                .await?;
778                *val = Val::Result(Err(Some(Box::new(v))));
779            } else {
780                *val = Val::Result(Err(None));
781            }
782            Ok(())
783        }
784        Type::Flags(ty) => {
785            let names = ty.names();
786            let flags = match names.len() {
787                ..=8 => read_flags(1, r).await?,
788                9..=16 => read_flags(2, r).await?,
789                17..=24 => read_flags(3, r).await?,
790                25..=32 => read_flags(4, r).await?,
791                33..=40 => read_flags(5, r).await?,
792                41..=48 => read_flags(6, r).await?,
793                49..=56 => read_flags(7, r).await?,
794                57..=64 => read_flags(8, r).await?,
795                65..=72 => read_flags(9, r).await?,
796                73..=80 => read_flags(10, r).await?,
797                81..=88 => read_flags(11, r).await?,
798                89..=96 => read_flags(12, r).await?,
799                97..=104 => read_flags(13, r).await?,
800                105..=112 => read_flags(14, r).await?,
801                113..=120 => read_flags(15, r).await?,
802                121..=128 => r.read_u128_le().await?,
803                bits @ 129.. => {
804                    let mut cap = bits / 8;
805                    if bits % 8 != 0 {
806                        cap = cap.saturating_add(1);
807                    }
808                    let mut buf = vec![0; cap];
809                    r.read_exact(&mut buf).await?;
810                    let mut vs = Vec::with_capacity(
811                        buf.iter()
812                            .map(|b| b.count_ones())
813                            .sum::<u32>()
814                            .try_into()
815                            .unwrap_or(usize::MAX),
816                    );
817                    for (i, name) in names.enumerate() {
818                        if buf[i / 8] & (1 << (i % 8)) != 0 {
819                            vs.push(name.to_string());
820                        }
821                    }
822                    *val = Val::Flags(vs);
823                    return Ok(());
824                }
825            };
826            let mut vs = Vec::with_capacity(flags.count_ones().try_into().unwrap_or(usize::MAX));
827            for (i, name) in zip(0.., names) {
828                if flags & (1 << i) != 0 {
829                    vs.push(name.to_string());
830                }
831            }
832            *val = Val::Flags(vs);
833            Ok(())
834        }
835        Type::Own(ty) | Type::Borrow(ty) => {
836            if *ty == ResourceType::host::<DynInputStream>() || io_streams.contains(ty) {
837                let mut store = store.as_context_mut();
838                let r = r.index(path).map_err(std::io::Error::other)?;
839                // TODO: Implement a custom reader, this approach ignores the stream end (`\0`),
840                // which could potentially break/hang with some transports
841                // The stream must be typed as `DynInputStream` (the host resource type),
842                // otherwise the resulting resource handle carries the concrete reader type
843                // and fails the guest's `own<input-stream>` type check.
844                let stream: DynInputStream = Box::new(AsyncReadStream::new(
845                    FramedRead::new(r, ListDecoderU8::default())
846                        .into_async_read()
847                        .compat(),
848                ));
849                let res = store
850                    .data_mut()
851                    .wrpc()
852                    .table
853                    .push(stream)
854                    .map_err(|err| std::io::Error::new(std::io::ErrorKind::OutOfMemory, err))?;
855                let v = res
856                    .try_into_resource_any(store)
857                    .map_err(std::io::Error::other)?;
858                *val = Val::Resource(v);
859                Ok(())
860            } else if resources.contains(ty) {
861                let mut store = store.as_context_mut();
862                let mut id = uuid::Bytes::default();
863                debug_assert_eq!(id.len(), 16);
864                let n = r.read_u8_leb128().await?;
865                if usize::from(n) != id.len() {
866                    return Err(std::io::Error::new(
867                        std::io::ErrorKind::InvalidInput,
868                        format!(
869                            "invalid guest resource handle length {n}, expected {}",
870                            id.len()
871                        ),
872                    ));
873                }
874                let n = r.read_exact(&mut id).await?;
875                if n != id.len() {
876                    return Err(std::io::Error::new(
877                        std::io::ErrorKind::InvalidInput,
878                        format!(
879                            "invalid amount of guest resource handle bytes read {n}, expected {}",
880                            id.len()
881                        ),
882                    ));
883                }
884
885                let id = Uuid::from_bytes_le(id);
886                trace!(?id, "lookup shared resource");
887                let resource = store
888                    .data_mut()
889                    .wrpc()
890                    .ctx
891                    .shared_resources()
892                    .0
893                    .get(&id)
894                    .ok_or_else(|| std::io::Error::from(std::io::ErrorKind::NotFound))?;
895                *val = Val::Resource(*resource);
896                Ok(())
897            } else {
898                let mut store = store.as_context_mut();
899                let n = r.read_u32_leb128().await?;
900                let n = usize::try_from(n)
901                    .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
902                let mut buf = Vec::with_capacity(n);
903                r.read_to_end(&mut buf).await?;
904                let table = store.data_mut().wrpc().table;
905                let resource = table
906                    .push(RemoteResource(buf.into()))
907                    .map_err(|err| std::io::Error::new(std::io::ErrorKind::OutOfMemory, err))?;
908                let resource = resource
909                    .try_into_resource_any(store)
910                    .map_err(std::io::Error::other)?;
911                *val = Val::Resource(resource);
912                Ok(())
913            }
914        }
915        Type::Future(..) | Type::Stream(..) | Type::ErrorContext => Err(std::io::Error::new(
916            std::io::ErrorKind::Unsupported,
917            "async not supported",
918        )),
919        Type::Map(..) => Err(std::io::Error::new(
920            std::io::ErrorKind::Unsupported,
921            "`map` type not supported",
922        )),
923    }
924}