wrpc_runtime_wasmtime/
polyfill.rs

1use core::iter::zip;
2use core::pin::pin;
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use anyhow::{bail, ensure, Context as _};
8use bytes::BytesMut;
9use futures::future::try_join_all;
10use tokio::io::AsyncWriteExt as _;
11use tokio::time::Instant;
12use tokio::try_join;
13use tokio_util::codec::Encoder;
14use tracing::{debug, instrument, trace, warn, Instrument as _, Span};
15use wasmtime::component::{types, LinkerInstance, ResourceType, Type, Val};
16use wasmtime::{AsContextMut, Engine, StoreContextMut};
17use wasmtime_wasi::WasiView;
18use wrpc_transport::{Index as _, Invoke, InvokeExt as _};
19
20use crate::rpc::Error;
21use crate::{read_value, rpc_func_name, rpc_result_type, ValEncoder, WrpcView, WrpcViewExt as _};
22
23/// Polyfill [`types::ComponentItem`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
24#[instrument(level = "trace", skip_all)]
25pub fn link_item<V>(
26    engine: &Engine,
27    linker: &mut LinkerInstance<V>,
28    guest_resources: impl Into<Arc<[ResourceType]>>,
29    host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
30    ty: types::ComponentItem,
31    instance: impl Into<Arc<str>>,
32    name: impl Into<Arc<str>>,
33) -> wasmtime::Result<()>
34where
35    V: WasiView + WrpcView,
36{
37    let instance = instance.into();
38    let guest_resources = guest_resources.into();
39    let host_resources = host_resources.into();
40    match ty {
41        types::ComponentItem::ComponentFunc(ty) => {
42            let name = name.into();
43            debug!(?instance, ?name, "linking function");
44            link_function(
45                linker,
46                Arc::clone(&guest_resources),
47                Arc::clone(&host_resources),
48                ty,
49                instance,
50                name,
51            )?;
52        }
53        types::ComponentItem::CoreFunc(_) => {
54            bail!("polyfilling core functions not supported yet")
55        }
56        types::ComponentItem::Module(_) => bail!("polyfilling modules not supported yet"),
57        types::ComponentItem::Component(ty) => {
58            for (name, ty) in ty.imports(engine) {
59                debug!(?instance, name, "linking component item");
60                link_item(
61                    engine,
62                    linker,
63                    Arc::clone(&guest_resources),
64                    Arc::clone(&host_resources),
65                    ty,
66                    "",
67                    name,
68                )?;
69            }
70        }
71        types::ComponentItem::ComponentInstance(ty) => {
72            let name = name.into();
73            let mut linker = linker
74                .instance(&name)
75                .with_context(|| format!("failed to instantiate `{name}` in the linker"))?;
76            debug!(?instance, ?name, "linking instance");
77            link_instance(
78                engine,
79                &mut linker,
80                guest_resources,
81                host_resources,
82                ty,
83                name,
84            )?;
85        }
86        types::ComponentItem::Type(_) => {}
87        types::ComponentItem::Resource(ty) => {
88            let name = name.into();
89            let Some((guest_ty, host_ty)) = host_resources
90                .get(&*instance)
91                .and_then(|instance| instance.get(&*name))
92            else {
93                bail!("resource type for {instance}/{name} not defined");
94            };
95            ensure!(ty == *guest_ty, "{instance}/{name} resource type mismatch");
96
97            debug!(?instance, ?name, "linking resource");
98            linker.resource(&name, *host_ty, |_, _| Ok(()))?;
99        }
100    }
101    Ok(())
102}
103
104/// Polyfill [`types::ComponentInstance`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
105#[instrument(level = "trace", skip_all)]
106pub fn link_instance<V>(
107    engine: &Engine,
108    linker: &mut LinkerInstance<V>,
109    guest_resources: impl Into<Arc<[ResourceType]>>,
110    host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
111    ty: types::ComponentInstance,
112    name: impl Into<Arc<str>>,
113) -> wasmtime::Result<()>
114where
115    V: WrpcView + WasiView,
116{
117    let instance = name.into();
118    let guest_resources = guest_resources.into();
119    let host_resources = host_resources.into();
120    for (name, ty) in ty.exports(engine) {
121        debug!(name, "linking instance item");
122        link_item(
123            engine,
124            linker,
125            Arc::clone(&guest_resources),
126            Arc::clone(&host_resources),
127            ty,
128            Arc::clone(&instance),
129            name,
130        )?;
131    }
132    Ok(())
133}
134
135#[allow(clippy::too_many_arguments)]
136async fn invoke<T: WrpcView + WasiView>(
137    mut store: &mut StoreContextMut<'_, T>,
138    params: &[Val],
139    results: &mut [Val],
140    guest_resources: Arc<[ResourceType]>,
141    params_ty: impl IntoIterator<Item = (&str, Type)>,
142    results_ty: impl IntoIterator<Item = Type>,
143    instance: Arc<str>,
144    name: Arc<str>,
145) -> wasmtime::Result<anyhow::Result<()>> {
146    let mut buf = BytesMut::default();
147    let mut deferred = vec![];
148    for (v, (name, ref ty)) in zip(params, params_ty) {
149        let mut enc = ValEncoder::new(store.as_context_mut(), ty, &guest_resources);
150        enc.encode(v, &mut buf)
151            .with_context(|| format!("failed to encode parameter `{name}`"))?;
152        deferred.push(enc.deferred);
153    }
154    let clt = store.data().client();
155    let cx = store.data().context();
156    let timeout = store.data().timeout();
157    let buf = buf.freeze();
158    // TODO: set paths
159    let paths = &[[]; 0];
160    let rpc_name = rpc_func_name(&name);
161    let start = Instant::now();
162    let invocation = if let Some(timeout) = timeout {
163        clt.timeout(timeout)
164            .invoke(cx, &instance, rpc_name, buf, paths)
165            .await
166    } else {
167        clt.invoke(cx, &instance, rpc_name, buf, paths).await
168    }
169    .with_context(|| format!("failed to invoke `{instance}.{name}` polyfill via wRPC"));
170    let (outgoing, incoming) = match invocation {
171        Ok((outgoing, incoming)) => (outgoing, incoming),
172        Err(err) => return Ok(Err(err)),
173    };
174    let tx = async {
175        try_join_all(
176            zip(0.., deferred)
177                .filter_map(|(i, f)| f.map(|f| (outgoing.index(&[i]), f)))
178                .map(|(w, f)| async move {
179                    let w = w?;
180                    f(w).await
181                }),
182        )
183        .await
184        .context("failed to write asynchronous parameters")?;
185        let mut outgoing = pin!(outgoing);
186        outgoing
187            .flush()
188            .await
189            .context("failed to flush outgoing stream")?;
190        if let Err(err) = outgoing.shutdown().await {
191            trace!(?err, "failed to shutdown outgoing stream");
192        }
193        anyhow::Ok(())
194    };
195    let rx = async {
196        let mut incoming = pin!(incoming);
197        for (i, (v, ref ty)) in zip(results, results_ty).enumerate() {
198            read_value(&mut store, &mut incoming, &guest_resources, v, ty, &[i])
199                .await
200                .with_context(|| format!("failed to decode return value {i}"))?;
201        }
202        Ok(())
203    };
204    let res = if let Some(timeout) = timeout {
205        let timeout = timeout.saturating_sub(Instant::now().saturating_duration_since(start));
206        try_join!(
207            async {
208                tokio::time::timeout(timeout, tx)
209                    .await
210                    .context("data transmission timed out")?
211            },
212            async {
213                tokio::time::timeout(timeout, rx)
214                    .await
215                    .context("data receipt timed out")?
216            },
217        )
218    } else {
219        try_join!(tx, rx)
220    };
221    match res {
222        Ok(((), ())) => Ok(Ok(())),
223        Err(err) => Ok(Err(err)),
224    }
225}
226
227/// Polyfill [`types::ComponentFunc`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
228#[instrument(level = "trace", skip_all)]
229pub fn link_function<V>(
230    linker: &mut LinkerInstance<V>,
231    guest_resources: impl Into<Arc<[ResourceType]>>,
232    host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
233    ty: types::ComponentFunc,
234    instance: impl Into<Arc<str>>,
235    name: impl Into<Arc<str>>,
236) -> wasmtime::Result<()>
237where
238    V: WrpcView + WasiView,
239{
240    let span = Span::current();
241    let instance = instance.into();
242    let name = name.into();
243    let guest_resources = guest_resources.into();
244    let host_resources = host_resources.into();
245    match rpc_result_type(&host_resources, ty.results()) {
246        None => linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| {
247            let ty = ty.clone();
248            let instance = Arc::clone(&instance);
249            let name = Arc::clone(&name);
250            let resources = Arc::clone(&guest_resources);
251            Box::new(
252                async move {
253                    match invoke(
254                        &mut store,
255                        params,
256                        results,
257                        resources,
258                        ty.params(),
259                        ty.results(),
260                        instance,
261                        name,
262                    )
263                    .await
264                    {
265                        Ok(Ok(())) => Ok(()),
266                        Ok(Err(err)) => Err(err),
267                        Err(err) => Err(err),
268                    }
269                }
270                .instrument(span.clone()),
271            )
272        }),
273        // `result<_, rpc-eror>`
274        Some(None) => {
275            linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| {
276                let ty = ty.clone();
277                let instance = Arc::clone(&instance);
278                let name = Arc::clone(&name);
279                let resources = Arc::clone(&guest_resources);
280                Box::new(
281                    async move {
282                        let [result] = results else {
283                            bail!("result type mismatch");
284                        };
285                        match invoke(
286                            &mut store,
287                            params,
288                            &mut [],
289                            resources,
290                            ty.params(),
291                            None,
292                            instance,
293                            name,
294                        )
295                        .await?
296                        {
297                            Ok(()) => {
298                                *result = Val::Result(Ok(None));
299                            }
300                            Err(err) => {
301                                let err = store.data_mut().push_error(Error::Invoke(err))?;
302                                let err = err
303                                    .try_into_resource_any(&mut store)
304                                    .context("failed to lower error resource")?;
305                                *result = Val::Result(Err(Some(Box::new(Val::Resource(err)))));
306                            }
307                        }
308                        Ok(())
309                    }
310                    .instrument(span.clone()),
311                )
312            })
313        }
314        // `result<T, rpc-eror>`
315        Some(Some(result_ty)) => {
316            linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| {
317                let ty = ty.clone();
318                let instance = Arc::clone(&instance);
319                let name = Arc::clone(&name);
320                let resources = Arc::clone(&guest_resources);
321                let result_ty = result_ty.clone();
322                Box::new(
323                    async move {
324                        let [result] = results else {
325                            bail!("result type mismatch");
326                        };
327                        let mut ok = [Val::Bool(false); 1];
328                        match invoke(
329                            &mut store,
330                            params,
331                            ok.as_mut_slice(),
332                            resources,
333                            ty.params(),
334                            [result_ty],
335                            instance,
336                            name,
337                        )
338                        .await?
339                        {
340                            Ok(()) => {
341                                let [ok] = ok;
342                                *result = Val::Result(Ok(Some(Box::new(ok))));
343                            }
344                            Err(err) => {
345                                let err = store.data_mut().push_error(Error::Invoke(err))?;
346                                let err = err
347                                    .try_into_resource_any(&mut store)
348                                    .context("failed to lower error resource")?;
349                                *result = Val::Result(Err(Some(Box::new(Val::Resource(err)))));
350                            }
351                        }
352                        Ok(())
353                    }
354                    .instrument(span.clone()),
355                )
356            })
357        }
358    }
359}