Skip to main content

wrpc_runtime_wasmtime/
polyfill.rs

1use core::iter::zip;
2use core::pin::pin;
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use bytes::BytesMut;
8use futures::future::try_join_all;
9use tokio::io::AsyncWriteExt as _;
10use tokio::time::Instant;
11use tokio::try_join;
12use tokio_util::codec::Encoder;
13use tracing::{debug, instrument, trace, warn, Instrument as _, Span};
14use wasmtime::component::{types, LinkerInstance, ResourceType, Type, Val};
15use wasmtime::error::Context as _;
16use wasmtime::{bail, ensure, AsContextMut, Engine, StoreContextMut};
17use wrpc_transport::{Index as _, Invoke, InvokeExt as _};
18
19use crate::rpc::Error;
20use crate::{read_value, rpc_func_name, rpc_result_type, ValEncoder, WrpcView, WrpcViewExt as _};
21
22/// Polyfill [`types::ComponentItem`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
23#[instrument(level = "trace", skip_all)]
24pub fn link_item<V>(
25    engine: &Engine,
26    linker: &mut LinkerInstance<V>,
27    guest_resources: impl Into<Arc<[ResourceType]>>,
28    host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
29    ty: types::ComponentItem,
30    instance: impl Into<Arc<str>>,
31    name: impl Into<Arc<str>>,
32) -> wasmtime::Result<()>
33where
34    V: WrpcView,
35{
36    let instance = instance.into();
37    let guest_resources = guest_resources.into();
38    let host_resources = host_resources.into();
39    match ty {
40        types::ComponentItem::ComponentFunc(ty) => {
41            let name = name.into();
42            debug!(?instance, ?name, "linking function");
43            link_function(
44                linker,
45                Arc::clone(&guest_resources),
46                Arc::clone(&host_resources),
47                ty,
48                instance,
49                name,
50            )?;
51        }
52        types::ComponentItem::CoreFunc(_) => {
53            bail!("polyfilling core functions not supported yet")
54        }
55        types::ComponentItem::Module(_) => bail!("polyfilling modules not supported yet"),
56        types::ComponentItem::Component(ty) => {
57            for (name, ty) in ty.imports(engine) {
58                debug!(?instance, name, "linking component item");
59                link_item(
60                    engine,
61                    linker,
62                    Arc::clone(&guest_resources),
63                    Arc::clone(&host_resources),
64                    ty,
65                    "",
66                    name,
67                )?;
68            }
69        }
70        types::ComponentItem::ComponentInstance(ty) => {
71            let name = name.into();
72            let mut linker = linker
73                .instance(&name)
74                .with_context(|| format!("failed to instantiate `{name}` in the linker"))?;
75            debug!(?instance, ?name, "linking instance");
76            link_instance(
77                engine,
78                &mut linker,
79                guest_resources,
80                host_resources,
81                ty,
82                name,
83            )?;
84        }
85        types::ComponentItem::Type(_) => {}
86        types::ComponentItem::Resource(ty) => {
87            let name = name.into();
88            let Some((guest_ty, host_ty)) = host_resources
89                .get(&*instance)
90                .and_then(|instance| instance.get(&*name))
91            else {
92                bail!("resource type for {instance}/{name} not defined");
93            };
94            ensure!(ty == *guest_ty, "{instance}/{name} resource type mismatch");
95
96            debug!(?instance, ?name, "linking resource");
97            linker.resource(&name, *host_ty, |_, _| Ok(()))?;
98        }
99    }
100    Ok(())
101}
102
103/// Polyfill [`types::ComponentInstance`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
104#[instrument(level = "trace", skip_all)]
105pub fn link_instance<V>(
106    engine: &Engine,
107    linker: &mut LinkerInstance<V>,
108    guest_resources: impl Into<Arc<[ResourceType]>>,
109    host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
110    ty: types::ComponentInstance,
111    name: impl Into<Arc<str>>,
112) -> wasmtime::Result<()>
113where
114    V: WrpcView,
115{
116    let instance = name.into();
117    let guest_resources = guest_resources.into();
118    let host_resources = host_resources.into();
119    for (name, ty) in ty.exports(engine) {
120        debug!(name, "linking instance item");
121        link_item(
122            engine,
123            linker,
124            Arc::clone(&guest_resources),
125            Arc::clone(&host_resources),
126            ty,
127            Arc::clone(&instance),
128            name,
129        )?;
130    }
131    Ok(())
132}
133
134#[allow(clippy::too_many_arguments)]
135async fn invoke<T: WrpcView>(
136    mut store: &mut StoreContextMut<'_, T>,
137    params: &[Val],
138    results: &mut [Val],
139    guest_resources: Arc<[ResourceType]>,
140    params_ty: impl IntoIterator<Item = (&str, Type)>,
141    results_ty: impl IntoIterator<Item = Type>,
142    instance: Arc<str>,
143    name: Arc<str>,
144) -> wasmtime::Result<anyhow::Result<()>> {
145    let mut buf = BytesMut::default();
146    let mut deferred = vec![];
147    for (v, (name, ref ty)) in zip(params, params_ty) {
148        let mut enc = ValEncoder::new(store.as_context_mut(), ty, &guest_resources, &[]);
149        enc.encode(v, &mut buf)
150            .with_context(|| format!("failed to encode parameter `{name}`"))?;
151        deferred.push(enc.deferred);
152    }
153    let view = store.data_mut().wrpc();
154    let clt = view.ctx.client();
155    let cx = view.ctx.context();
156    let timeout = view.ctx.timeout();
157    let buf = buf.freeze();
158    // TODO: set paths
159    let paths = &[[]; 0];
160    let rpc_name = rpc_func_name(&name);
161    let start = Instant::now();
162    let invocation = if let Some(timeout) = timeout {
163        clt.timeout(timeout)
164            .invoke(cx, &instance, rpc_name, buf, paths)
165            .await
166    } else {
167        clt.invoke(cx, &instance, rpc_name, buf, paths).await
168    };
169    let (outgoing, incoming) = match invocation {
170        Ok((outgoing, incoming)) => (outgoing, incoming),
171        Err(err) => {
172            return Ok(Err(err.context(format!(
173                "failed to invoke `{instance}.{name}` polyfill via wRPC"
174            ))))
175        }
176    };
177    let tx = async {
178        try_join_all(
179            zip(0.., deferred)
180                .filter_map(|(i, f)| f.map(|f| (outgoing.index(&[i]), f)))
181                .map(|(w, f)| async move {
182                    let w = w.map_err(wasmtime::Error::from_anyhow)?;
183                    f(w).await
184                }),
185        )
186        .await
187        .context("failed to write asynchronous parameters")?;
188        let mut outgoing = pin!(outgoing);
189        outgoing
190            .flush()
191            .await
192            .context("failed to flush outgoing stream")?;
193        if let Err(err) = outgoing.shutdown().await {
194            trace!(?err, "failed to shutdown outgoing stream");
195        }
196        wasmtime::error::Ok(())
197    };
198    let rx = async {
199        let mut incoming = pin!(incoming);
200        for (i, (v, ref ty)) in zip(results, results_ty).enumerate() {
201            read_value(
202                &mut store,
203                &mut incoming,
204                &guest_resources,
205                &[],
206                v,
207                ty,
208                &[i],
209            )
210            .await
211            .with_context(|| format!("failed to decode return value {i}"))?;
212        }
213        wasmtime::error::Ok(())
214    };
215    let res = if let Some(timeout) = timeout {
216        let timeout = timeout.saturating_sub(Instant::now().saturating_duration_since(start));
217        try_join!(
218            async {
219                tokio::time::timeout(timeout, tx)
220                    .await
221                    .context("data transmission timed out")?
222            },
223            async {
224                tokio::time::timeout(timeout, rx)
225                    .await
226                    .context("data receipt timed out")?
227            },
228        )
229    } else {
230        try_join!(tx, rx)
231    };
232    match res {
233        Ok(((), ())) => Ok(Ok(())),
234        Err(err) => Ok(Err(err.into())),
235    }
236}
237
238/// Polyfill [`types::ComponentFunc`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
239#[instrument(level = "trace", skip_all)]
240pub fn link_function<V>(
241    linker: &mut LinkerInstance<V>,
242    guest_resources: impl Into<Arc<[ResourceType]>>,
243    host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
244    ty: types::ComponentFunc,
245    instance: impl Into<Arc<str>>,
246    name: impl Into<Arc<str>>,
247) -> wasmtime::Result<()>
248where
249    V: WrpcView,
250{
251    let span = Span::current();
252    let instance = instance.into();
253    let name = name.into();
254    let guest_resources = guest_resources.into();
255    let host_resources = host_resources.into();
256    match rpc_result_type(&host_resources, ty.results()) {
257        None => linker.func_new_async(&Arc::clone(&name), move |mut store, ty, params, results| {
258            let instance = Arc::clone(&instance);
259            let name = Arc::clone(&name);
260            let resources = Arc::clone(&guest_resources);
261            Box::new(
262                async move {
263                    match invoke(
264                        &mut store,
265                        params,
266                        results,
267                        resources,
268                        ty.params(),
269                        ty.results(),
270                        instance,
271                        name,
272                    )
273                    .await
274                    {
275                        Ok(Ok(())) => Ok(()),
276                        Ok(Err(err)) => Err(wasmtime::Error::from_anyhow(err)),
277                        Err(err) => Err(err),
278                    }
279                }
280                .instrument(span.clone()),
281            )
282        }),
283        // `result<_, rpc-eror>`
284        Some(None) => {
285            linker.func_new_async(&Arc::clone(&name), move |mut store, ty, params, results| {
286                let instance = Arc::clone(&instance);
287                let name = Arc::clone(&name);
288                let resources = Arc::clone(&guest_resources);
289                Box::new(
290                    async move {
291                        let [result] = results else {
292                            bail!("result type mismatch");
293                        };
294                        match invoke(
295                            &mut store,
296                            params,
297                            &mut [],
298                            resources,
299                            ty.params(),
300                            None,
301                            instance,
302                            name,
303                        )
304                        .await?
305                        {
306                            Ok(()) => {
307                                *result = Val::Result(Ok(None));
308                            }
309                            Err(err) => {
310                                let err = store.data_mut().push_error(Error::Invoke(err))?;
311                                let err = err
312                                    .try_into_resource_any(&mut store)
313                                    .context("failed to lower error resource")?;
314                                *result = Val::Result(Err(Some(Box::new(Val::Resource(err)))));
315                            }
316                        }
317                        Ok(())
318                    }
319                    .instrument(span.clone()),
320                )
321            })
322        }
323        // `result<T, rpc-eror>`
324        Some(Some(result_ty)) => {
325            linker.func_new_async(&Arc::clone(&name), move |mut store, ty, params, results| {
326                let instance = Arc::clone(&instance);
327                let name = Arc::clone(&name);
328                let resources = Arc::clone(&guest_resources);
329                let result_ty = result_ty.clone();
330                Box::new(
331                    async move {
332                        let [result] = results else {
333                            bail!("result type mismatch");
334                        };
335                        let mut ok = [Val::Bool(false); 1];
336                        match invoke(
337                            &mut store,
338                            params,
339                            ok.as_mut_slice(),
340                            resources,
341                            ty.params(),
342                            [result_ty],
343                            instance,
344                            name,
345                        )
346                        .await?
347                        {
348                            Ok(()) => {
349                                let [ok] = ok;
350                                *result = Val::Result(Ok(Some(Box::new(ok))));
351                            }
352                            Err(err) => {
353                                let err = store.data_mut().push_error(Error::Invoke(err))?;
354                                let err = err
355                                    .try_into_resource_any(&mut store)
356                                    .context("failed to lower error resource")?;
357                                *result = Val::Result(Err(Some(Box::new(Val::Resource(err)))));
358                            }
359                        }
360                        Ok(())
361                    }
362                    .instrument(span.clone()),
363                )
364            })
365        }
366    }
367}