1use core::future::Future;
2use core::iter::zip;
3use core::ops::{BitOrAssign, Shl};
4use core::pin::{pin, Pin};
5
6use std::collections::HashSet;
7
8use bytes::{BufMut as _, BytesMut};
9use futures::stream::FuturesUnordered;
10use futures::TryStreamExt as _;
11use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
12use tokio_util::codec::{Encoder, FramedRead};
13use tokio_util::compat::FuturesAsyncReadCompatExt as _;
14use tracing::{error, instrument, trace, warn};
15use uuid::Uuid;
16use wasm_tokio::cm::AsyncReadValue as _;
17use wasm_tokio::{
18 AsyncReadCore as _, AsyncReadLeb128 as _, AsyncReadUtf8 as _, CoreNameEncoder,
19 CoreVecEncoderBytes, Leb128Encoder, Utf8Codec,
20};
21use wasmtime::bail;
22use wasmtime::component::types::{Case, Field};
23use wasmtime::component::{ResourceType, Type, Val};
24use wasmtime::error::Context as _;
25use wasmtime::{AsContextMut, StoreContextMut};
26use wasmtime_wasi::p2::pipe::AsyncReadStream;
27use wasmtime_wasi::p2::{DynInputStream, StreamError};
28use wrpc_transport::ListDecoderU8;
29
30use crate::{RemoteResource, WrpcView};
31
/// Encoder state for writing a single component [`Val`] of a known [`Type`].
///
/// `W` is the writer type handed to the deferred closure, if one is produced
/// while encoding.
pub struct ValEncoder<'a, T: 'static, W> {
    /// Store context used to reach the resource table and shared resources
    /// while encoding resource handles.
    pub store: StoreContextMut<'a, T>,
    /// Type the value being encoded is matched against.
    pub ty: &'a Type,
    /// Resource types whose handles are encoded as freshly generated IDs and
    /// recorded in the shared resource map.
    pub resources: &'a [ResourceType],
    /// Resource types that are handled as `wasi:io/input-stream` resources.
    pub io_streams: &'a [ResourceType],
    /// Work that could not be written synchronously into the destination
    /// buffer. When set by `encode`, it must be invoked with a writer to
    /// finish transmitting the value; `None` means nothing is pending.
    pub deferred: Option<
        Box<dyn FnOnce(W) -> Pin<Box<dyn Future<Output = wasmtime::Result<()>> + Send>> + Send>,
    >,
}
43
44impl<T, W> ValEncoder<'_, T, W> {
45 #[must_use]
46 pub fn new<'a>(
47 store: StoreContextMut<'a, T>,
48 ty: &'a Type,
49 resources: &'a [ResourceType],
50 io_streams: &'a [ResourceType],
51 ) -> ValEncoder<'a, T, W> {
52 ValEncoder {
53 store,
54 ty,
55 resources,
56 io_streams,
57 deferred: None,
58 }
59 }
60
61 pub fn with_type<'a>(&'a mut self, ty: &'a Type) -> ValEncoder<'a, T, W> {
62 ValEncoder {
63 store: self.store.as_context_mut(),
64 ty,
65 resources: self.resources,
66 io_streams: self.io_streams,
67 deferred: None,
68 }
69 }
70}
71
72fn find_enum_discriminant<'a, T>(
73 iter: impl IntoIterator<Item = T>,
74 names: impl IntoIterator<Item = &'a str>,
75 discriminant: &str,
76) -> wasmtime::Result<T> {
77 zip(iter, names)
78 .find_map(|(i, name)| (name == discriminant).then_some(i))
79 .context("unknown enum discriminant")
80}
81
82fn find_variant_discriminant<'a, T>(
83 iter: impl IntoIterator<Item = T>,
84 cases: impl IntoIterator<Item = Case<'a>>,
85 discriminant: &str,
86) -> wasmtime::Result<(T, Option<Type>)> {
87 zip(iter, cases)
88 .find_map(|(i, Case { name, ty })| (name == discriminant).then_some((i, ty)))
89 .context("unknown variant discriminant")
90}
91
/// Packs a set of flag names into a bitset.
///
/// Bit `i` of the result is set when the `i`-th entry of `names` is present
/// in `flags`. Entries of `flags` that do not occur in `names` are ignored.
#[inline]
fn flag_bits<'a, T: BitOrAssign + Shl<u8, Output = T> + From<u8>>(
    names: impl IntoIterator<Item = &'a str>,
    flags: impl IntoIterator<Item = &'a str>,
) -> T {
    let enabled: HashSet<&str> = flags.into_iter().collect();
    zip(0u8.., names)
        .filter(|(_, name)| enabled.contains(name))
        .fold(T::from(0), |mut bits, (position, _)| {
            bits |= T::from(1) << position;
            bits
        })
}
106
107async fn write_deferred<W, I>(w: W, deferred: I) -> wasmtime::Result<()>
108where
109 W: wrpc_transport::Index<W> + Sync + Send + 'static,
110 I: IntoIterator,
111 I::IntoIter: ExactSizeIterator<
112 Item = Option<
113 Box<dyn FnOnce(W) -> Pin<Box<dyn Future<Output = wasmtime::Result<()>> + Send>> + Send>,
114 >,
115 >,
116{
117 let mut futs: FuturesUnordered<_> = zip(0.., deferred)
118 .filter_map(|(i, f)| f.map(|f| (w.index(&[i]), f)))
119 .map(|(w, f)| async move {
120 let w = w.map_err(wasmtime::Error::from_anyhow)?;
121 f(w).await
122 })
123 .collect();
124 while let Some(()) = futs.try_next().await? {}
125 Ok(())
126}
127
128impl<T, W> Encoder<&Val> for ValEncoder<'_, T, W>
129where
130 T: WrpcView,
131 W: AsyncWrite + wrpc_transport::Index<W> + Sync + Send + 'static,
132{
133 type Error = wasmtime::Error;
134
135 #[allow(clippy::too_many_lines)]
136 #[instrument(level = "trace", skip(self))]
137 fn encode(&mut self, v: &Val, dst: &mut BytesMut) -> Result<(), Self::Error> {
138 match (v, self.ty) {
139 (Val::Bool(v), Type::Bool) => {
140 dst.reserve(1);
141 dst.put_u8((*v).into());
142 Ok(())
143 }
144 (Val::S8(v), Type::S8) => {
145 dst.reserve(1);
146 dst.put_i8(*v);
147 Ok(())
148 }
149 (Val::U8(v), Type::U8) => {
150 dst.reserve(1);
151 dst.put_u8(*v);
152 Ok(())
153 }
154 (Val::S16(v), Type::S16) => Leb128Encoder
155 .encode(*v, dst)
156 .context("failed to encode s16"),
157 (Val::U16(v), Type::U16) => Leb128Encoder
158 .encode(*v, dst)
159 .context("failed to encode u16"),
160 (Val::S32(v), Type::S32) => Leb128Encoder
161 .encode(*v, dst)
162 .context("failed to encode s32"),
163 (Val::U32(v), Type::U32) => Leb128Encoder
164 .encode(*v, dst)
165 .context("failed to encode u32"),
166 (Val::S64(v), Type::S64) => Leb128Encoder
167 .encode(*v, dst)
168 .context("failed to encode s64"),
169 (Val::U64(v), Type::U64) => Leb128Encoder
170 .encode(*v, dst)
171 .context("failed to encode u64"),
172 (Val::Float32(v), Type::Float32) => {
173 dst.reserve(4);
174 dst.put_f32_le(*v);
175 Ok(())
176 }
177 (Val::Float64(v), Type::Float64) => {
178 dst.reserve(8);
179 dst.put_f64_le(*v);
180 Ok(())
181 }
182 (Val::Char(v), Type::Char) => {
183 Utf8Codec.encode(*v, dst).context("failed to encode char")
184 }
185 (Val::String(v), Type::String) => CoreNameEncoder
186 .encode(v.as_str(), dst)
187 .context("failed to encode string"),
188 (Val::List(vs), Type::List(ty)) => {
189 let ty = ty.ty();
190 let n = u32::try_from(vs.len()).context("list length does not fit in u32")?;
191 dst.reserve(5 + vs.len());
192 Leb128Encoder
193 .encode(n, dst)
194 .context("failed to encode list length")?;
195 let mut deferred = Vec::with_capacity(vs.len());
196 for v in vs {
197 let mut enc = self.with_type(&ty);
198 enc.encode(v, dst)
199 .context("failed to encode list element")?;
200 deferred.push(enc.deferred);
201 }
202 if deferred.iter().any(Option::is_some) {
203 self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred))));
204 }
205 Ok(())
206 }
207 (Val::Record(vs), Type::Record(ty)) => {
208 dst.reserve(vs.len());
209 let mut deferred = Vec::with_capacity(vs.len());
210 for ((name, v), Field { ref ty, .. }) in zip(vs, ty.fields()) {
211 let mut enc = self.with_type(ty);
212 enc.encode(v, dst)
213 .with_context(|| format!("failed to encode `{name}` field"))?;
214 deferred.push(enc.deferred);
215 }
216 if deferred.iter().any(Option::is_some) {
217 self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred))));
218 }
219 Ok(())
220 }
221 (Val::Tuple(vs), Type::Tuple(ty)) => {
222 dst.reserve(vs.len());
223 let mut deferred = Vec::with_capacity(vs.len());
224 for (v, ref ty) in zip(vs, ty.types()) {
225 let mut enc = self.with_type(ty);
226 enc.encode(v, dst)
227 .context("failed to encode tuple element")?;
228 deferred.push(enc.deferred);
229 }
230 if deferred.iter().any(Option::is_some) {
231 self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred))));
232 }
233 Ok(())
234 }
235 (Val::Variant(discriminant, v), Type::Variant(ty)) => {
236 let cases = ty.cases();
237 let ty = match cases.len() {
238 ..=0x0000_00ff => {
239 let (discriminant, ty) =
240 find_variant_discriminant(0u8.., cases, discriminant)?;
241 dst.reserve(2 + usize::from(v.is_some()));
242 Leb128Encoder.encode(discriminant, dst)?;
243 ty
244 }
245 0x0000_0100..=0x0000_ffff => {
246 let (discriminant, ty) =
247 find_variant_discriminant(0u16.., cases, discriminant)?;
248 dst.reserve(3 + usize::from(v.is_some()));
249 Leb128Encoder.encode(discriminant, dst)?;
250 ty
251 }
252 0x0001_0000..=0x00ff_ffff => {
253 let (discriminant, ty) =
254 find_variant_discriminant(0u32.., cases, discriminant)?;
255 dst.reserve(4 + usize::from(v.is_some()));
256 Leb128Encoder.encode(discriminant, dst)?;
257 ty
258 }
259 0x0100_0000..=0xffff_ffff => {
260 let (discriminant, ty) =
261 find_variant_discriminant(0u32.., cases, discriminant)?;
262 dst.reserve(5 + usize::from(v.is_some()));
263 Leb128Encoder.encode(discriminant, dst)?;
264 ty
265 }
266 0x1_0000_0000.. => bail!("case count does not fit in u32"),
267 };
268 if let Some(v) = v {
269 let ty = ty.context("type missing for variant")?;
270 let mut enc = self.with_type(&ty);
271 enc.encode(v, dst)
272 .context("failed to encode variant value")?;
273 if let Some(f) = enc.deferred {
274 self.deferred = Some(f);
275 }
276 }
277 Ok(())
278 }
279 (Val::Enum(discriminant), Type::Enum(ty)) => {
280 let names = ty.names();
281 match names.len() {
282 ..=0x0000_00ff => {
283 let discriminant = find_enum_discriminant(0u8.., names, discriminant)?;
284 dst.reserve(2);
285 Leb128Encoder.encode(discriminant, dst)?;
286 }
287 0x0000_0100..=0x0000_ffff => {
288 let discriminant = find_enum_discriminant(0u16.., names, discriminant)?;
289 dst.reserve(3);
290 Leb128Encoder.encode(discriminant, dst)?;
291 }
292 0x0001_0000..=0x00ff_ffff => {
293 let discriminant = find_enum_discriminant(0u32.., names, discriminant)?;
294 dst.reserve(4);
295 Leb128Encoder.encode(discriminant, dst)?;
296 }
297 0x0100_0000..=0xffff_ffff => {
298 let discriminant = find_enum_discriminant(0u32.., names, discriminant)?;
299 dst.reserve(5);
300 Leb128Encoder.encode(discriminant, dst)?;
301 }
302 0x1_0000_0000.. => bail!("name count does not fit in u32"),
303 }
304 Ok(())
305 }
306 (Val::Option(None), Type::Option(_)) => {
307 dst.reserve(1);
308 dst.put_u8(0);
309 Ok(())
310 }
311 (Val::Option(Some(v)), Type::Option(ty)) => {
312 dst.reserve(2);
313 dst.put_u8(1);
314 let ty = ty.ty();
315 let mut enc = self.with_type(&ty);
316 enc.encode(v, dst)
317 .context("failed to encode `option::some` value")?;
318 if let Some(f) = enc.deferred {
319 self.deferred = Some(f);
320 }
321 Ok(())
322 }
323 (Val::Result(v), Type::Result(ty)) => match v {
324 Ok(v) => match (v, ty.ok()) {
325 (Some(v), Some(ty)) => {
326 dst.reserve(2);
327 dst.put_u8(0);
328 let mut enc = self.with_type(&ty);
329 enc.encode(v, dst)
330 .context("failed to encode `result::ok` value")?;
331 if let Some(f) = enc.deferred {
332 self.deferred = Some(f);
333 }
334 Ok(())
335 }
336 (Some(_v), None) => bail!("`result::ok` value of unknown type"),
337 (None, Some(_ty)) => bail!("`result::ok` value missing"),
338 (None, None) => {
339 dst.reserve(1);
340 dst.put_u8(0);
341 Ok(())
342 }
343 },
344 Err(v) => match (v, ty.err()) {
345 (Some(v), Some(ty)) => {
346 dst.reserve(2);
347 dst.put_u8(1);
348 let mut enc = self.with_type(&ty);
349 enc.encode(v, dst)
350 .context("failed to encode `result::err` value")?;
351 if let Some(f) = enc.deferred {
352 self.deferred = Some(f);
353 }
354 Ok(())
355 }
356 (Some(_v), None) => bail!("`result::err` value of unknown type"),
357 (None, Some(_ty)) => bail!("`result::err` value missing"),
358 (None, None) => {
359 dst.reserve(1);
360 dst.put_u8(1);
361 Ok(())
362 }
363 },
364 },
365 (Val::Flags(vs), Type::Flags(ty)) => {
366 let names = ty.names();
367 let vs = vs.iter().map(String::as_str);
368 match names.len() {
369 ..=8 => {
370 dst.reserve(1);
371 dst.put_u8(flag_bits(names, vs));
372 }
373 9..=16 => {
374 dst.reserve(2);
375 dst.put_u16_le(flag_bits(names, vs));
376 }
377 17..=24 => {
378 dst.reserve(3);
379 dst.put_slice(&u32::to_le_bytes(flag_bits(names, vs))[..3]);
380 }
381 25..=32 => {
382 dst.reserve(4);
383 dst.put_u32_le(flag_bits(names, vs));
384 }
385 33..=40 => {
386 dst.reserve(5);
387 dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..5]);
388 }
389 41..=48 => {
390 dst.reserve(6);
391 dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..6]);
392 }
393 49..=56 => {
394 dst.reserve(7);
395 dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..7]);
396 }
397 57..=64 => {
398 dst.reserve(8);
399 dst.put_u64_le(flag_bits(names, vs));
400 }
401 65..=72 => {
402 dst.reserve(9);
403 dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..9]);
404 }
405 73..=80 => {
406 dst.reserve(10);
407 dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..10]);
408 }
409 81..=88 => {
410 dst.reserve(11);
411 dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..11]);
412 }
413 89..=96 => {
414 dst.reserve(12);
415 dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..12]);
416 }
417 97..=104 => {
418 dst.reserve(13);
419 dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..13]);
420 }
421 105..=112 => {
422 dst.reserve(14);
423 dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..14]);
424 }
425 113..=120 => {
426 dst.reserve(15);
427 dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..15]);
428 }
429 121..=128 => {
430 dst.reserve(16);
431 dst.put_u128_le(flag_bits(names, vs));
432 }
433 bits @ 129.. => {
434 let mut cap = bits / 8;
435 if bits % 8 != 0 {
436 cap = cap.saturating_add(1);
437 }
438 let mut buf = vec![0; cap];
439 let flags: HashSet<&str> = vs.into_iter().collect();
440 for (i, name) in names.enumerate() {
441 if flags.contains(name) {
442 buf[i / 8] |= 1 << (i % 8);
443 }
444 }
445 dst.extend_from_slice(&buf);
446 }
447 }
448 Ok(())
449 }
450 (Val::Resource(resource), Type::Own(ty) | Type::Borrow(ty)) => {
451 if *ty == ResourceType::host::<DynInputStream>() || self.io_streams.contains(ty) {
452 let stream = resource
453 .try_into_resource::<DynInputStream>(&mut self.store)
454 .context("failed to downcast `wasi:io/input-stream`")?;
455 if stream.owned() {
456 let mut stream = self
457 .store
458 .data_mut()
459 .wrpc()
460 .table
461 .delete(stream)
462 .context("failed to delete input stream")?;
463 self.deferred = Some(Box::new(|w| {
464 Box::pin(async move {
465 let mut w = pin!(w);
466 loop {
467 stream.ready().await;
468 match stream.read(8096) {
469 Ok(buf) => {
470 let mut chunk = BytesMut::with_capacity(
471 buf.len().saturating_add(5),
472 );
473 CoreVecEncoderBytes
474 .encode(buf, &mut chunk)
475 .context("failed to encode input stream chunk")?;
476 w.write_all(&chunk).await?;
477 }
478 Err(StreamError::Closed) => {
479 w.write_all(&[0x00]).await?;
480 }
481 Err(err) => return Err(err.into()),
482 }
483 }
484 })
485 }));
486 } else {
487 self.store
488 .data_mut()
489 .wrpc()
490 .table
491 .get_mut(&stream)
492 .context("failed to get input stream")?;
493 bail!("encoding borrowed `wasi:io/input-stream` not supported yet");
497 };
498 Ok(())
499 } else if resource.ty() == ResourceType::host::<RemoteResource>() {
500 let resource = resource
501 .try_into_resource(&mut self.store)
502 .context("resource type mismatch")?;
503 let table = self.store.data_mut().wrpc().table;
504 if resource.owned() {
505 let RemoteResource(buf) = table
506 .delete(resource)
507 .context("failed to delete remote resource")?;
508 CoreVecEncoderBytes
509 .encode(buf, dst)
510 .context("failed to encode resource handle")
511 } else {
512 let RemoteResource(buf) = table
513 .get(&resource)
514 .context("failed to get remote resource")?;
515 CoreVecEncoderBytes
516 .encode(buf, dst)
517 .context("failed to encode resource handle")
518 }
519 } else if self.resources.contains(ty) {
520 let id = Uuid::now_v7();
521 CoreVecEncoderBytes
522 .encode(id.to_bytes_le().as_slice(), dst)
523 .context("failed to encode resource handle")?;
524 trace!(?id, "store shared resource");
525 if self
526 .store
527 .data_mut()
528 .wrpc()
529 .ctx
530 .shared_resources()
531 .0
532 .insert(id, *resource)
533 .is_some()
534 {
535 error!(?id, "duplicate resource ID generated");
536 }
537 Ok(())
538 } else {
539 bail!("encoding host resources not supported yet")
540 }
541 }
542
543 (_, Type::Future(..) | Type::Stream(..) | Type::ErrorContext) => {
544 bail!("async not supported")
545 }
546 _ => bail!("value type mismatch"),
547 }
548 }
549}
550
551#[inline]
552async fn read_flags(n: usize, r: &mut (impl AsyncRead + Unpin)) -> std::io::Result<u128> {
553 let mut buf = 0u128.to_le_bytes();
554 r.read_exact(&mut buf[..n]).await?;
555 Ok(u128::from_le_bytes(buf))
556}
557
558#[instrument(level = "trace", skip_all, fields(ty, path))]
560#[allow(clippy::too_many_arguments)]
561pub async fn read_value<T, R>(
562 store: &mut impl AsContextMut<Data = T>,
563 r: &mut Pin<&mut R>,
564 resources: &[ResourceType],
565 io_streams: &[ResourceType],
566 val: &mut Val,
567 ty: &Type,
568 path: &[usize],
569) -> std::io::Result<()>
570where
571 T: WrpcView + 'static,
572 R: AsyncRead + wrpc_transport::Index<R> + Send + Unpin + 'static,
573{
574 match ty {
575 Type::Bool => {
576 let v = r.read_bool().await?;
577 *val = Val::Bool(v);
578 Ok(())
579 }
580 Type::S8 => {
581 let v = r.read_i8().await?;
582 *val = Val::S8(v);
583 Ok(())
584 }
585 Type::U8 => {
586 let v = r.read_u8().await?;
587 *val = Val::U8(v);
588 Ok(())
589 }
590 Type::S16 => {
591 let v = r.read_i16_leb128().await?;
592 *val = Val::S16(v);
593 Ok(())
594 }
595 Type::U16 => {
596 let v = r.read_u16_leb128().await?;
597 *val = Val::U16(v);
598 Ok(())
599 }
600 Type::S32 => {
601 let v = r.read_i32_leb128().await?;
602 *val = Val::S32(v);
603 Ok(())
604 }
605 Type::U32 => {
606 let v = r.read_u32_leb128().await?;
607 *val = Val::U32(v);
608 Ok(())
609 }
610 Type::S64 => {
611 let v = r.read_i64_leb128().await?;
612 *val = Val::S64(v);
613 Ok(())
614 }
615 Type::U64 => {
616 let v = r.read_u64_leb128().await?;
617 *val = Val::U64(v);
618 Ok(())
619 }
620 Type::Float32 => {
621 let v = r.read_f32_le().await?;
622 *val = Val::Float32(v);
623 Ok(())
624 }
625 Type::Float64 => {
626 let v = r.read_f64_le().await?;
627 *val = Val::Float64(v);
628 Ok(())
629 }
630 Type::Char => {
631 let v = r.read_char_utf8().await?;
632 *val = Val::Char(v);
633 Ok(())
634 }
635 Type::String => {
636 let mut s = String::default();
637 r.read_core_name(&mut s).await?;
638 *val = Val::String(s);
639 Ok(())
640 }
641 Type::List(ty) => {
642 let n = r.read_u32_leb128().await?;
643 let n = n.try_into().unwrap_or(usize::MAX);
644 let mut vs = Vec::with_capacity(n);
645 let ty = ty.ty();
646 let mut path = path.to_vec();
647 for i in 0..n {
648 let mut v = Val::Bool(false);
649 path.push(i);
650 trace!(i, "reading list element value");
651 Box::pin(read_value(
652 store, r, resources, io_streams, &mut v, &ty, &path,
653 ))
654 .await?;
655 path.pop();
656 vs.push(v);
657 }
658 *val = Val::List(vs);
659 Ok(())
660 }
661 Type::Record(ty) => {
662 let fields = ty.fields();
663 let mut vs = Vec::with_capacity(fields.len());
664 let mut path = path.to_vec();
665 for (i, Field { name, ty }) in fields.enumerate() {
666 let mut v = Val::Bool(false);
667 path.push(i);
668 trace!(i, "reading struct field value");
669 Box::pin(read_value(
670 store, r, resources, io_streams, &mut v, &ty, &path,
671 ))
672 .await?;
673 path.pop();
674 vs.push((name.to_string(), v));
675 }
676 *val = Val::Record(vs);
677 Ok(())
678 }
679 Type::Tuple(ty) => {
680 let types = ty.types();
681 let mut vs = Vec::with_capacity(types.len());
682 let mut path = path.to_vec();
683 for (i, ty) in types.enumerate() {
684 let mut v = Val::Bool(false);
685 path.push(i);
686 trace!(i, "reading tuple element value");
687 Box::pin(read_value(
688 store, r, resources, io_streams, &mut v, &ty, &path,
689 ))
690 .await?;
691 path.pop();
692 vs.push(v);
693 }
694 *val = Val::Tuple(vs);
695 Ok(())
696 }
697 Type::Variant(ty) => {
698 let discriminant = r.read_u32_leb128().await?;
699 let discriminant = discriminant
700 .try_into()
701 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
702 let Case { name, ty } = ty.cases().nth(discriminant).ok_or_else(|| {
703 std::io::Error::new(
704 std::io::ErrorKind::InvalidInput,
705 format!("unknown variant discriminant `{discriminant}`"),
706 )
707 })?;
708 let name = name.to_string();
709 if let Some(ty) = ty {
710 let mut v = Val::Bool(false);
711 trace!(variant = name, "reading nested variant value");
712 Box::pin(read_value(
713 store, r, resources, io_streams, &mut v, &ty, path,
714 ))
715 .await?;
716 *val = Val::Variant(name, Some(Box::new(v)));
717 } else {
718 *val = Val::Variant(name, None);
719 }
720 Ok(())
721 }
722 Type::Enum(ty) => {
723 let discriminant = r.read_u32_leb128().await?;
724 let discriminant = discriminant
725 .try_into()
726 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
727 let name = ty.names().nth(discriminant).ok_or_else(|| {
728 std::io::Error::new(
729 std::io::ErrorKind::InvalidInput,
730 format!("unknown enum discriminant `{discriminant}`"),
731 )
732 })?;
733 *val = Val::Enum(name.to_string());
734 Ok(())
735 }
736 Type::Option(ty) => {
737 let ok = r.read_option_status().await?;
738 if ok {
739 let mut v = Val::Bool(false);
740 trace!("reading nested `option::some` value");
741 Box::pin(read_value(
742 store,
743 r,
744 resources,
745 io_streams,
746 &mut v,
747 &ty.ty(),
748 path,
749 ))
750 .await?;
751 *val = Val::Option(Some(Box::new(v)));
752 } else {
753 *val = Val::Option(None);
754 }
755 Ok(())
756 }
757 Type::Result(ty) => {
758 let ok = r.read_result_status().await?;
759 if ok {
760 if let Some(ty) = ty.ok() {
761 let mut v = Val::Bool(false);
762 trace!("reading nested `result::ok` value");
763 Box::pin(read_value(
764 store, r, resources, io_streams, &mut v, &ty, path,
765 ))
766 .await?;
767 *val = Val::Result(Ok(Some(Box::new(v))));
768 } else {
769 *val = Val::Result(Ok(None));
770 }
771 } else if let Some(ty) = ty.err() {
772 let mut v = Val::Bool(false);
773 trace!("reading nested `result::err` value");
774 Box::pin(read_value(
775 store, r, resources, io_streams, &mut v, &ty, path,
776 ))
777 .await?;
778 *val = Val::Result(Err(Some(Box::new(v))));
779 } else {
780 *val = Val::Result(Err(None));
781 }
782 Ok(())
783 }
784 Type::Flags(ty) => {
785 let names = ty.names();
786 let flags = match names.len() {
787 ..=8 => read_flags(1, r).await?,
788 9..=16 => read_flags(2, r).await?,
789 17..=24 => read_flags(3, r).await?,
790 25..=32 => read_flags(4, r).await?,
791 33..=40 => read_flags(5, r).await?,
792 41..=48 => read_flags(6, r).await?,
793 49..=56 => read_flags(7, r).await?,
794 57..=64 => read_flags(8, r).await?,
795 65..=72 => read_flags(9, r).await?,
796 73..=80 => read_flags(10, r).await?,
797 81..=88 => read_flags(11, r).await?,
798 89..=96 => read_flags(12, r).await?,
799 97..=104 => read_flags(13, r).await?,
800 105..=112 => read_flags(14, r).await?,
801 113..=120 => read_flags(15, r).await?,
802 121..=128 => r.read_u128_le().await?,
803 bits @ 129.. => {
804 let mut cap = bits / 8;
805 if bits % 8 != 0 {
806 cap = cap.saturating_add(1);
807 }
808 let mut buf = vec![0; cap];
809 r.read_exact(&mut buf).await?;
810 let mut vs = Vec::with_capacity(
811 buf.iter()
812 .map(|b| b.count_ones())
813 .sum::<u32>()
814 .try_into()
815 .unwrap_or(usize::MAX),
816 );
817 for (i, name) in names.enumerate() {
818 if buf[i / 8] & (1 << (i % 8)) != 0 {
819 vs.push(name.to_string());
820 }
821 }
822 *val = Val::Flags(vs);
823 return Ok(());
824 }
825 };
826 let mut vs = Vec::with_capacity(flags.count_ones().try_into().unwrap_or(usize::MAX));
827 for (i, name) in zip(0.., names) {
828 if flags & (1 << i) != 0 {
829 vs.push(name.to_string());
830 }
831 }
832 *val = Val::Flags(vs);
833 Ok(())
834 }
835 Type::Own(ty) | Type::Borrow(ty) => {
836 if *ty == ResourceType::host::<DynInputStream>() || io_streams.contains(ty) {
837 let mut store = store.as_context_mut();
838 let r = r.index(path).map_err(std::io::Error::other)?;
839 let stream: DynInputStream = Box::new(AsyncReadStream::new(
845 FramedRead::new(r, ListDecoderU8::default())
846 .into_async_read()
847 .compat(),
848 ));
849 let res = store
850 .data_mut()
851 .wrpc()
852 .table
853 .push(stream)
854 .map_err(|err| std::io::Error::new(std::io::ErrorKind::OutOfMemory, err))?;
855 let v = res
856 .try_into_resource_any(store)
857 .map_err(std::io::Error::other)?;
858 *val = Val::Resource(v);
859 Ok(())
860 } else if resources.contains(ty) {
861 let mut store = store.as_context_mut();
862 let mut id = uuid::Bytes::default();
863 debug_assert_eq!(id.len(), 16);
864 let n = r.read_u8_leb128().await?;
865 if usize::from(n) != id.len() {
866 return Err(std::io::Error::new(
867 std::io::ErrorKind::InvalidInput,
868 format!(
869 "invalid guest resource handle length {n}, expected {}",
870 id.len()
871 ),
872 ));
873 }
874 let n = r.read_exact(&mut id).await?;
875 if n != id.len() {
876 return Err(std::io::Error::new(
877 std::io::ErrorKind::InvalidInput,
878 format!(
879 "invalid amount of guest resource handle bytes read {n}, expected {}",
880 id.len()
881 ),
882 ));
883 }
884
885 let id = Uuid::from_bytes_le(id);
886 trace!(?id, "lookup shared resource");
887 let resource = store
888 .data_mut()
889 .wrpc()
890 .ctx
891 .shared_resources()
892 .0
893 .get(&id)
894 .ok_or_else(|| std::io::Error::from(std::io::ErrorKind::NotFound))?;
895 *val = Val::Resource(*resource);
896 Ok(())
897 } else {
898 let mut store = store.as_context_mut();
899 let n = r.read_u32_leb128().await?;
900 let n = usize::try_from(n)
901 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
902 let mut buf = Vec::with_capacity(n);
903 r.read_to_end(&mut buf).await?;
904 let table = store.data_mut().wrpc().table;
905 let resource = table
906 .push(RemoteResource(buf.into()))
907 .map_err(|err| std::io::Error::new(std::io::ErrorKind::OutOfMemory, err))?;
908 let resource = resource
909 .try_into_resource_any(store)
910 .map_err(std::io::Error::other)?;
911 *val = Val::Resource(resource);
912 Ok(())
913 }
914 }
915 Type::Future(..) | Type::Stream(..) | Type::ErrorContext => Err(std::io::Error::new(
916 std::io::ErrorKind::Unsupported,
917 "async not supported",
918 )),
919 Type::Map(..) => Err(std::io::Error::new(
920 std::io::ErrorKind::Unsupported,
921 "`map` type not supported",
922 )),
923 }
924}